# mindcraft/tasks/analyze_crafting_tasks.py
# 2025-04-19 14:49:20 -05:00

# 379 lines
# No EOL
# 16 KiB
# Python

import boto3
import os
import json
import re
from botocore.exceptions import ClientError
import json
import argparse
from tqdm import tqdm
import glob
from prettytable import PrettyTable
import pandas as pd
# Calculate project root directory: the parent of the directory that contains this file
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Define output directory for analysis results: <project_root>/experiments/analysis_results
analysis_output_dir = os.path.join(project_root, "experiments", "analysis_results")
# Ensure the output directory exists.
# NOTE: this runs at import time, so merely importing this module creates the directory.
os.makedirs(analysis_output_dir, exist_ok=True)
def download_s3_folders(bucket_name, s3_prefix, local_base_dir):
    """
    Downloads groups of folders from S3 based on the next level of prefixes.

    Every "folder" (common prefix) found directly under ``s3_prefix`` is mirrored to
    ``<local_base_dir>/<last component of s3_prefix>/<folder name>``. All objects below
    a folder are written flat into that local folder (only the key's basename is kept).

    Args:
        bucket_name (str): Name of the S3 bucket.
        s3_prefix (str): Prefix where the folders are located (e.g., 'my-experiments/').
            An empty prefix or one without '/' is accepted; in that case no grouping
            sub-directory is inserted.
        local_base_dir (str): Local directory to download the folders to. Relative
            paths are resolved against the project root.
    Returns:
        list: List of downloaded local folder paths. Empty when no folders exist
        under the prefix or when S3 reports a client error.
    """
    s3_client = boto3.client('s3')
    downloaded_folders = []

    # Ensure local_base_dir is relative to project root if not absolute
    if not os.path.isabs(local_base_dir):
        local_base_dir = os.path.join(project_root, local_base_dir)

    # 'my-experiments/' -> 'my-experiments'. A prefix without any '/' has no
    # second-to-last component, so fall back to no grouping directory instead of
    # raising IndexError.
    prefix_parts = s3_prefix.split('/')
    subfolder = prefix_parts[-2] if len(prefix_parts) >= 2 else ""

    # A single list_objects_v2 call returns a limited page of results; paginate so
    # that large experiment sets are not silently truncated.
    paginator = s3_client.get_paginator('list_objects_v2')

    try:
        # List objects with the prefix, delimited by '/' to find sub-prefixes (folders)
        s3_folder_prefixes = [
            common_prefix['Prefix']
            for page in paginator.paginate(Bucket=bucket_name, Prefix=s3_prefix, Delimiter='/')
            for common_prefix in page.get('CommonPrefixes', [])
        ]
        if not s3_folder_prefixes:
            print(f"No folders found under s3://{bucket_name}/{s3_prefix}")
            return downloaded_folders

        for s3_folder_prefix in tqdm(s3_folder_prefixes):
            folder_name = s3_folder_prefix.split('/')[-2]  # Extract folder name
            local_folder_path = os.path.join(local_base_dir, subfolder, folder_name)
            os.makedirs(local_folder_path, exist_ok=True)
            downloaded_folders.append(local_folder_path)

            # Download files within the folder
            found_files = False
            for page in paginator.paginate(Bucket=bucket_name, Prefix=s3_folder_prefix):
                for obj in page.get('Contents', []):
                    s3_key = obj['Key']
                    file_name = os.path.basename(s3_key)
                    if not file_name:
                        # Keys ending in '/' have no file name; downloading them
                        # would target the directory path itself and fail.
                        continue
                    found_files = True
                    local_file_path = os.path.join(local_folder_path, file_name)
                    try:
                        s3_client.download_file(bucket_name, s3_key, local_file_path)
                    except Exception as e:
                        # Best effort: one failed object must not abort the whole download.
                        print(f"Error downloading {s3_key}: {e}")
            if not found_files:
                print(f"No files found in {s3_folder_prefix}")
    except ClientError as e:
        print(f"Error accessing S3: {e}")
        return []
    return downloaded_folders
def analyze_json_file(file_path):
    """
    Inspect one JSON log and report whether it records a successful task.

    The file counts as successful when any entry of its top-level 'turns' list has
    role 'system' and a string 'content' containing one of the success markers.

    Args:
        file_path (str): Path to the JSON file.
    Returns:
        True if a success marker was found, False if none was found (including when
        there is no 'turns' list), None if the file is missing, is not valid JSON,
        or any other error occurs while processing it.
    """
    success_markers = (
        "Task successful ended with code : 2",
        "Task ended with score : 1",
        "Task ended in score: 1",
    )
    try:
        with open(file_path, 'r') as handle:
            payload = json.load(handle)

        # Nothing to scan unless there is a list of turns.
        if 'turns' not in payload or not isinstance(payload['turns'], list):
            return False

        # Every turn is examined, in order, until the first match.
        for entry in payload['turns']:
            if entry.get('role') != 'system':
                continue
            text = entry.get('content')
            if not isinstance(text, str):
                continue
            if any(marker in text for marker in success_markers):
                return True
        return False
    except FileNotFoundError:
        print(f"Error: File not found: {file_path}")
        return None
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON format in: {file_path}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred while processing {file_path}: {e}")
        return None
def extract_result(folder_path):
    """
    Determine the outcome of the task whose logs live in a folder.

    Args:
        folder_path (str): Folder that holds the task's ``*.json`` files.
    Returns:
        None if the folder contains no JSON files; otherwise True as soon as any
        file is judged successful by analyze_json_file, else False.
    """
    candidates = glob.glob(os.path.join(folder_path, "*.json"))
    if not candidates:
        print(f"No JSON files found in {os.path.basename(folder_path)}")
        return None
    # A single successful file is enough; files analysed as False/None do not count.
    return any(analyze_json_file(candidate) for candidate in candidates)
def is_base(folder_path):
    """Return True when the path contains "full_plan" and "depth_0" but not "missing"."""
    if "missing" in folder_path:
        return False
    return "full_plan" in folder_path and "depth_0" in folder_path
def base_without_plan(folder_path):
    """Return True when the path contains all of "no_plan", "depth_0" and "missing"."""
    required_tags = ("no_plan", "depth_0", "missing")
    return all(tag in folder_path for tag in required_tags)
def aggregate_results(local_folders):
    """
    Aggregates task outcomes over all folders, overall and per category.

    Categories are derived from substrings of the folder path ("full_plan",
    "partial_plan", "no_plan", "missing", "depth_0/1/2"). A folder can belong to
    several categories, but to at most one of the depth_0 / depth_1 / depth_2
    buckets (the first one that matches, in that order).

    Folders for which no result can be extracted (no JSON files, or an error while
    reading them) are reported and left out of every counter, so the overall total
    always agrees with the per-category totals.

    Args:
        local_folders (list): List of local folder paths containing the JSON files.
    Returns:
        dict: Flat dictionary with "total", "successful" and "success_rate", plus
        "<category>_total", "<category>_successful" and "<category>_success_rate"
        for base, base_no_plan, missing, full_plan, partial_plan, no_plan,
        high_depth, depth_0, depth_1 and depth_2. A rate is 0 when its total is 0.
    """
    # (category name, membership test); order defines the key order of the result.
    category_checks = [
        ("base", is_base),
        ("base_no_plan", base_without_plan),
        ("missing", lambda path: "missing" in path),
        ("full_plan", lambda path: "full_plan" in path),
        ("partial_plan", lambda path: "partial_plan" in path),
        ("no_plan", lambda path: "no_plan" in path),
        ("high_depth", lambda path: "depth_1" in path or "depth_2" in path),
    ]
    # Depth buckets are mutually exclusive: only the first matching one is counted.
    depth_names = ("depth_0", "depth_1", "depth_2")

    ordered_names = [name for name, _ in category_checks] + list(depth_names)
    totals = dict.fromkeys(ordered_names, 0)
    successes = dict.fromkeys(ordered_names, 0)
    total = 0
    successful = 0

    for folder_path in tqdm(local_folders):
        folder_name = os.path.basename(folder_path)
        try:
            # Analyse the folder exactly once; it reads every JSON file inside.
            outcome = extract_result(folder_path)
        except Exception as e:
            # Best effort: one unreadable folder must not abort the aggregation.
            print(f"Error processing {folder_name}: {e}")
            continue
        if outcome is None:
            print(f"Skipping {folder_name}: no result could be extracted")
            continue

        success = int(outcome)
        print(f"Folder: {folder_name} -> {success}")

        matched = [name for name, belongs in category_checks if belongs(folder_path)]
        for depth_name in depth_names:
            if depth_name in folder_path:
                matched.append(depth_name)
                break

        # Counters are only touched once everything above succeeded, so a folder
        # is either counted everywhere it belongs or nowhere.
        total += 1
        successful += success
        for name in matched:
            totals[name] += 1
            successes[name] += success

    def rate(hits, count):
        return hits / count if count > 0 else 0

    summary = {
        "total": total,
        "successful": successful,
        "success_rate": rate(successful, total),
    }
    for name in ordered_names:
        summary[f"{name}_total"] = totals[name]
        summary[f"{name}_successful"] = successes[name]
        summary[f"{name}_success_rate"] = rate(successes[name], totals[name])
    return summary
def get_immediate_subdirectories(a_dir):
    """
    List the directories located directly inside a_dir.

    Args:
        a_dir (str): Directory to scan; a relative path is resolved against the
            project root.
    Returns:
        list: Paths (a_dir joined with the entry name) of every entry that is a
        directory, in the order os.listdir reports them.
    """
    base_dir = a_dir if os.path.isabs(a_dir) else os.path.join(project_root, a_dir)
    subdirectories = []
    for entry_name in os.listdir(base_dir):
        candidate = os.path.join(base_dir, entry_name)
        if os.path.isdir(candidate):
            subdirectories.append(candidate)
    return subdirectories
def format_percentage(value):
    """Render a fraction as a percentage string with two decimals, e.g. 0.5 -> "50.00%"."""
    scaled = value * 100
    return "{:.2f}%".format(scaled)
def create_pretty_tables(results):
    """
    Render the aggregated results as three text tables.

    Args:
        results (dict): Dictionary with aggregated results; for every reported row
            it must hold the "<prefix>total", "<prefix>successful" and
            "<prefix>success_rate" entries.
    Returns:
        str: The "Overall Metrics", "Metrics by Depth" and "Metrics by Plan
        Availability" tables, separated by a blank line.
    """
    def render(title, first_column, rows):
        # rows: (row label, key prefix in `results`) pairs, in display order.
        table = PrettyTable()
        table.title = title
        table.field_names = [first_column, "Total", "Successful", "Success Rate"]
        for label, prefix in rows:
            table.add_row([
                label,
                results[prefix + "total"],
                results[prefix + "successful"],
                format_percentage(results[prefix + "success_rate"]),
            ])
        return table.get_string()

    # Table 1: Overall Metrics
    overall = render("Overall Metrics", "Metric", [
        ("All Tests", ""),
        ("Base", "base_"),
        ("Base (No Plan)", "base_no_plan_"),
        ("Missing", "missing_"),
        ("High Depth", "high_depth_"),
    ])
    # Table 2: Depth-based Metrics
    by_depth = render("Metrics by Depth", "Depth", [
        ("Depth 0", "depth_0_"),
        ("Depth 1", "depth_1_"),
        ("Depth 2", "depth_2_"),
    ])
    # Table 3: Plan Availability Metrics
    by_plan = render("Metrics by Plan Availability", "Plan Type", [
        ("Full Plan", "full_plan_"),
        ("Partial Plan", "partial_plan_"),
        ("No Plan", "no_plan_"),
    ])
    return "\n\n".join([overall, by_depth, by_plan])
def analyze_crafting_log(log_file):
    """Placeholder for crafting-log analysis; the visible body does nothing and returns None."""
    # ... existing code ...
    # NOTE(review): the marker above suggests the real implementation was elided — confirm.
    return None
def main():
    """
    Command-line entry point: collect task folders, aggregate outcomes, save reports.

    Folders are either downloaded from S3 (--s3_download) or taken from the
    immediate sub-directories of --local_download_dir. The aggregated metrics are
    printed and written to two text files inside ``analysis_output_dir``.

    Raises:
        SystemExit: When no folders were found or downloaded.
    """
    # 1. Download folders from AWS or use local directory
    parser = argparse.ArgumentParser()
    parser.add_argument('--s3_download', action="store_true", help='Download folders from S3')
    parser.add_argument('--aws_bucket_name', default="mindcraft", type=str, help='AWS bucket name')
    parser.add_argument('--s3_folder_prefix', default="", type=str, help='S3 folder prefix')
    # Default input dir is 'experiments' relative to project root
    parser.add_argument('--local_download_dir', default="experiments", type=str, help='Local directory containing results (relative to project root)')
    args = parser.parse_args()

    aws_bucket_name = args.aws_bucket_name
    s3_folder_prefix = args.s3_folder_prefix

    # Resolve local_download_dir relative to project root
    local_download_dir_abs = args.local_download_dir
    if not os.path.isabs(local_download_dir_abs):
        local_download_dir_abs = os.path.join(project_root, local_download_dir_abs)

    if args.s3_download:
        # Report the base directory that is actually handed to the downloader;
        # download_s3_folders creates its own sub-folders below it.
        print(f"Downloading folders from s3://{aws_bucket_name}/{s3_folder_prefix} to {local_download_dir_abs}...")
        folders = download_s3_folders(aws_bucket_name, s3_folder_prefix, local_download_dir_abs)
    else:
        # Use the absolute path to get subdirectories
        folders = get_immediate_subdirectories(local_download_dir_abs)
        # The report directory lives under the default input directory; it holds
        # this script's own output, not task logs, so it must not be analysed.
        output_dir_real = os.path.realpath(analysis_output_dir)
        folders = [folder for folder in folders if os.path.realpath(folder) != output_dir_real]
        print(f"Found local folders: {folders}")

    if not folders:
        print("No folders found or downloaded. Exiting.")
        # Same effect as exit(), without relying on the helper injected by `site`.
        raise SystemExit()

    results = aggregate_results(folders)
    print(results)

    # Create pretty tables
    tables_output = create_pretty_tables(results)
    print("\n" + tables_output)

    # 2. Save results to files within the experiments/analysis_results/ directory
    # Determine filename based on S3 prefix or local dir name if possible
    if s3_folder_prefix:
        results_filename_base = s3_folder_prefix.replace('/', '_').rstrip('_')
    else:
        # normpath drops a trailing separator, which would otherwise yield an empty name.
        results_filename_base = os.path.basename(os.path.normpath(local_download_dir_abs)) or "local"
    results_filename_base = f"crafting_analysis_{results_filename_base}"

    # Save raw results
    results_file_path = os.path.join(analysis_output_dir, f"{results_filename_base}_results.txt")
    with open(results_file_path, "w", encoding="utf-8") as file:
        file.write("Results\n")
        for key, value in results.items():
            file.write(f"{key}: {value}\n")

    # Save pretty tables
    tables_file_path = os.path.join(analysis_output_dir, f"{results_filename_base}_tables.txt")
    with open(tables_file_path, "w", encoding="utf-8") as file:
        file.write(tables_output)

    print(f"Results saved to {results_file_path} and tables saved to {tables_file_path}")
# Run the analysis only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()