From f7947ec3c21f00439a2b9e9a93a954d7f8c7b656 Mon Sep 17 00:00:00 2001
From: Johnathan Walker
Date: Sun, 15 Jun 2025 23:12:34 -0400
Subject: [PATCH] refactor: Eliminate code duplication and enhance development
 workflow

- Created tasks/experiment_utils.py for shared utility functions
- Streamlined entry point scripts by moving common code to utils
- Centralized result aggregation and summary reporting in tasks/evaluation.py
- Enhanced .gitignore with standard Python patterns (__pycache__/, *.pyc,
  editor backups)
---
 .gitignore                 |   5 +-
 tasks/analyse_results.py   |  76 +++---
 tasks/evaluation.py        | 119 ++++++++-
 tasks/evaluation_script.py | 533 ++-----------------------------------
 tasks/experiment_utils.py  | 377 ++++++++++++++++++++++++++
 5 files changed, 549 insertions(+), 561 deletions(-)
 create mode 100644 tasks/experiment_utils.py

diff --git a/.gitignore b/.gitignore
index b100f56..3e86393 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,8 +27,11 @@ tasks/construction_tasks/test/**
 tasks/construction_tasks/train/**
 server_data*
 **/.DS_Store
+# Python
 .venv/
-tasks/__pycache__/
+__pycache__/
+*.pyc
+*~
 
 # npm cache
 .npm-cache/
diff --git a/tasks/analyse_results.py b/tasks/analyse_results.py
index 085ab2a..bf67295 100644
--- a/tasks/analyse_results.py
+++ b/tasks/analyse_results.py
@@ -14,8 +14,7 @@ import concurrent.futures
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
 from tasks.evaluation import (
-    extract_task_outcome,
-    aggregate_results_to_dataframe,
+    aggregate_results,
 )
 
 # --- Constants and Setup ---
@@ -100,54 +99,45 @@ def download_s3_folders(bucket_name: str, s3_prefix: str, local_base_dir: str, m
 
     return downloaded_folders
 
-
-def aggregate_results(local_folders: List[str], task_definitions: Dict[str, Any]) -> pd.DataFrame:
+def analyze_results_with_model_extraction(local_folders: List[str], task_definitions: Dict[str, Any]) -> pd.DataFrame:
     """
-    Aggregates experiment results from a list of local folders into a DataFrame.
-
-    This function serves as the core analysis engine, iterating through each task
-    folder, extracting outcomes, and compiling them into a single, comprehensive
-    DataFrame for further analysis.
-
+    Analyzes experiment results and attempts to extract model names from the folder structure.
+
+    This function wraps the centralized aggregate_results function but adds
+    model name extraction specific to the analysis script's needs.
+
     Args:
         local_folders (List[str]): A list of paths to the task run folders.
         task_definitions (Dict[str, Any]): A dictionary of all task definitions,
                                            keyed by task_id.
-
+
     Returns:
-        pd.DataFrame: A DataFrame containing the detailed evaluation results.
+        pd.DataFrame: A DataFrame containing the detailed evaluation results with model names.
     """
-    task_outcomes = []
-    for folder_path in tqdm(local_folders, desc="Analyzing task folders"):
-        task_id = os.path.basename(folder_path.strip(os.sep))
-        task_def = task_definitions.get(task_id)
-
-        if not task_def:
-            logging.warning(f"No task definition found for task_id '{task_id}'. Skipping folder '{folder_path}'.")
-            continue
+    # Use the centralized function with progress bar enabled
+    results_df = aggregate_results(local_folders, task_definitions, use_tqdm=True)
+
+    # Extract model names from folder paths if possible
+    if not results_df.empty and 'task_id' in results_df.columns:
+        model_names = []
+        folder_map = {os.path.basename(folder.strip(os.sep)): folder for folder in local_folders}
 
-        if 'task_id' not in task_def:
-            task_def['task_id'] = task_id
-
-        try:
-            # Use the core evaluation function
-            outcome = extract_task_outcome(folder_path, task_def)
-            # The model name is often part of the folder structure, let's try to extract it
-            # This is an example, and might need to be adapted based on the actual folder structure
-            try:
-                # e.g. experiments/my_exp_date/claude-3-5-sonnet-latest/task_1
-                model_name = folder_path.split(os.sep)[-2]
-                outcome.model_name = model_name
-            except IndexError:
-                outcome.model_name = "unknown"
-
-            task_outcomes.append(outcome)
-        except Exception as e:
-            logging.error(f"Error processing folder {folder_path}: {e}")
-
-    # Convert the list of dictionaries to a DataFrame
-    return aggregate_results_to_dataframe(task_outcomes)
-
+        for task_id in results_df['task_id']:
+            matching_folder = folder_map.get(task_id)
+
+            if matching_folder:
+                # e.g. experiments/my_exp_date/claude-3-5-sonnet-latest/task_1
+                # os.path.dirname/os.path.basename never raise IndexError, so
+                # guard against an empty result instead of catching a dead exception.
+                model_name = os.path.basename(os.path.dirname(matching_folder))
+                model_names.append(model_name or "unknown")
+            else:
+                model_names.append("unknown")
+
+        results_df['model_name'] = model_names
+
+    return results_df
 
 def get_immediate_subdirectories(a_dir: str) -> List[str]:
     """
@@ -213,7 +203,7 @@ def main() -> None:
         return
 
     # --- Step 3: Aggregate Results into a DataFrame ---
-    results_df = aggregate_results(folders_to_analyze, task_definitions)
+    results_df = analyze_results_with_model_extraction(folders_to_analyze, task_definitions)
 
     if results_df.empty:
         logging.warning("Analysis generated no results. Exiting.")
diff --git a/tasks/evaluation.py b/tasks/evaluation.py
index 3e2d054..6169340 100644
--- a/tasks/evaluation.py
+++ b/tasks/evaluation.py
@@ -67,6 +67,8 @@ class TaskRunOutcome:
 
 import json
 import re
+import pandas as pd
+from tqdm import tqdm
 
 def analyze_agent_log(file_path: str) -> AgentOutcome:
     """
@@ -206,34 +208,129 @@ def extract_task_outcome(folder_path: str, task_definition: Dict[str, Any]) -> T
 def aggregate_results_to_dataframe(task_outcomes: List[TaskRunOutcome]) -> pd.DataFrame:
     """
     Converts a list of TaskRunOutcome objects into a Pandas DataFrame.
-
     This function is a key step in the analysis pipeline, transforming the raw
     outcome objects into a structured DataFrame suitable for advanced analysis,
     visualization, and reporting. It flattens nested metric dictionaries for
     easier access.
-
     Args:
         task_outcomes (List[TaskRunOutcome]): A list of task outcome objects to be
                                               aggregated.
-
     Returns:
         pd.DataFrame: A DataFrame where each row represents a single task run.
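+    Example (illustrative sketch; assumes ``outcomes`` is a non-empty list of
+    TaskRunOutcome objects):
+        >>> df = aggregate_results_to_dataframe(outcomes)
+        >>> [c for c in df.columns if c.startswith('metric_')]  # flattened metrics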
""" if not task_outcomes: return pd.DataFrame() - # Convert list of dataclasses to list of dicts outcome_dicts = [vars(outcome) for outcome in task_outcomes] - - # Create DataFrame df = pd.DataFrame(outcome_dicts) - - # Flatten the 'task_definition_metrics' dictionary into separate columns + if 'task_definition_metrics' in df.columns: metrics_df = df['task_definition_metrics'].apply(pd.Series) metrics_df = metrics_df.add_prefix('metric_') df = pd.concat([df.drop(['task_definition_metrics'], axis=1), metrics_df], axis=1) - # The 'agent_outcomes' is a complex object (list of dataclasses). - # For now, we'll leave it as is, but it can be flattened further if needed. + # Convert Enum members to their string values for CSV compatibility + if 'overall_completion_status' in df.columns: + df['overall_completion_status'] = df['overall_completion_status'].apply(lambda x: x.value) + + return df + +def aggregate_results(local_folders: List[str], task_definitions: Dict[str, Any], use_tqdm: bool = False) -> pd.DataFrame: + """ + Aggregates experiment results from local folders into a DataFrame. + This function iterates through a list of folders, each representing a single + task run. It uses the `extract_task_outcome` function to analyze the agent + logs within each folder and compiles the results into a structured DataFrame. + Args: + local_folders (List[str]): A list of paths to the task run folders. + task_definitions (Dict[str, Any]): A dictionary of all task definitions, + keyed by task_id. + use_tqdm (bool): If True, display a progress bar. + Returns: + pd.DataFrame: A DataFrame containing the detailed evaluation results. + """ + task_outcomes = [] - return df \ No newline at end of file + iterable = tqdm(local_folders, desc="Analyzing task folders") if use_tqdm else local_folders + + for folder_path in iterable: + task_id = os.path.basename(folder_path.strip(os.sep)) + task_def = task_definitions.get(task_id) + + if not task_def: + logging.warning(f"No task definition found for task_id '{task_id}'. Skipping folder '{folder_path}'.") + continue + + if 'task_id' not in task_def: + task_def['task_id'] = task_id + + try: + outcome = extract_task_outcome(folder_path, task_def) + task_outcomes.append(outcome) + except Exception as e: + logging.error(f"Error processing folder {folder_path}: {e}") + + return aggregate_results_to_dataframe(task_outcomes) + + +def check_folder_results(folder_path: str, task_file_path: str) -> pd.DataFrame: + """ + Evaluates all subfolders in a given directory and prints a summary. + This function serves as a high-level entry point for analyzing an experiment + folder. It finds all immediate subdirectories, loads task definitions, + aggregates results, and prints a summary of success rates and completion + statuses. + Args: + folder_path (str): The path to the main experiment folder containing subfolders + for each task run. + task_file_path (str): The path to the JSON file containing task definitions. + Returns: + pd.DataFrame: A DataFrame with the full evaluation results, or None if a + critical error occurs. 
+ """ + logging.info(f"Checking results in folder: {folder_path}") + + if not os.path.exists(folder_path) or not os.path.isdir(folder_path): + logging.error(f"Folder not found or is not a directory: {folder_path}") + return None + + try: + with open(task_file_path, 'r') as f: + task_definitions = json.load(f) + except (FileNotFoundError, json.JSONDecodeError) as e: + logging.error(f"Error reading or parsing task definition file {task_file_path}: {e}") + return None + + subfolders = [f.path for f in os.scandir(folder_path) if f.is_dir()] + if not subfolders: + logging.warning("No subfolders found to evaluate.") + return pd.DataFrame() + + logging.info(f"Found {len(subfolders)} subfolders to evaluate.") + results_df = aggregate_results(subfolders, task_definitions) + + if results_df.empty: + logging.warning("No results were generated.") + return results_df + + # Calculate and print summary statistics from the DataFrame + total_tasks = len(results_df) + successful_tasks = results_df['overall_is_successful'].sum() + success_rate = (successful_tasks / total_tasks) if total_tasks > 0 else 0.0 + + logging.info("\n=== Evaluation Results Summary ===") + logging.info(f"Total tasks evaluated: {total_tasks}") + logging.info(f"Successful tasks: {successful_tasks}") + logging.info(f"Overall Success Rate: {success_rate:.2%}") + + # You can add more detailed analysis here, e.g., by task type + if 'task_type' in results_df.columns: + logging.info("\n--- Success Rate by Task Type ---") + type_success = results_df.groupby('task_type')['overall_is_successful'].mean().map("{:.2%}".format) + logging.info(type_success) + + if 'overall_completion_status' in results_df.columns: + logging.info("\n--- Completion Status Distribution ---") + status_dist = results_df['overall_completion_status'].value_counts(normalize=True).map("{:.2%}".format) + logging.info(status_dist) + + return results_df \ No newline at end of file diff --git a/tasks/evaluation_script.py b/tasks/evaluation_script.py index 992705b..77514be 100644 --- a/tasks/evaluation_script.py +++ b/tasks/evaluation_script.py @@ -1,11 +1,8 @@ import argparse import json -import shutil import subprocess import time from datetime import datetime -import re -import sys import os import logging import pandas as pd @@ -14,10 +11,25 @@ import pandas as pd logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') from tasks.evaluation import ( - extract_task_outcome, - aggregate_results_to_dataframe, + aggregate_results, + check_folder_results, ) +from tasks.experiment_utils import ( + update_keys_json, + set_environment_variable_tmux_session, + make_profiles, + create_server_files, + edit_file, + clean_up_server_files, + launch_world, + make_ops, + make_script_file_and_run, +) + +from typing import List, Dict, Any, Tuple + +# Task-specific blocked actions constants BLOCKED_ACTIONS_COOKING = [ '!activate', '!attackPlayer', '!checkBlueprint', '!checkBlueprintLevel', '!clearChat', '!clearFurnace', '!consume', '!craftable', '!discard', @@ -42,185 +54,6 @@ BLOCKED_ACTIONS_CONSTRUCTION = [ '!stop', '!takeFromChest', '!viewChest', '!craftRecipe', '!smeltItem' ] - -from typing import List, Dict, Any, Tuple - -def aggregate_results(local_folders: List[str], task_definitions: Dict[str, Any]) -> pd.DataFrame: - """ - Aggregates experiment results from local folders into a DataFrame. - - This function iterates through a list of folders, each representing a single - task run. 
It uses the `extract_task_outcome` function to analyze the agent - logs within each folder and compiles the results into a structured DataFrame. - - Args: - local_folders (List[str]): A list of paths to the task run folders. - task_definitions (Dict[str, Any]): A dictionary of all task definitions, - keyed by task_id. - - Returns: - pd.DataFrame: A DataFrame containing the detailed evaluation results. - """ - task_outcomes = [] - for folder_path in local_folders: - # Extract the task_id from the folder name. This assumes the folder is named after the task_id. - task_id = os.path.basename(folder_path.strip(os.sep)) - task_def = task_definitions.get(task_id) - - if not task_def: - logging.warning(f"No task definition found for task_id '{task_id}'. Skipping folder '{folder_path}'.") - continue - - # The task definition from the file might not have the task_id in it, so we add it. - if 'task_id' not in task_def: - task_def['task_id'] = task_id - - try: - outcome = extract_task_outcome(folder_path, task_def) - task_outcomes.append(outcome) - except Exception as e: - logging.error(f"Error processing folder {folder_path}: {e}") - - return aggregate_results_to_dataframe(task_outcomes) - - -def check_folder_results(folder_path: str, task_file_path: str) -> pd.DataFrame: - """ - Evaluates all subfolders in a given directory and prints a summary. - - This function serves as a high-level entry point for analyzing an experiment - folder. It finds all immediate subdirectories, loads task definitions, - aggregates results, and prints a summary of success rates and completion - statuses. - - Args: - folder_path (str): The path to the main experiment folder containing subfolders - for each task run. - task_file_path (str): The path to the JSON file containing task definitions. - - Returns: - pd.DataFrame: A DataFrame with the full evaluation results, or None if a - critical error occurs. 
- """ - logging.info(f"Checking results in folder: {folder_path}") - - if not os.path.exists(folder_path) or not os.path.isdir(folder_path): - logging.error(f"Folder not found or is not a directory: {folder_path}") - return None - - try: - with open(task_file_path, 'r') as f: - task_definitions = json.load(f) - except (FileNotFoundError, json.JSONDecodeError) as e: - logging.error(f"Error reading or parsing task definition file {task_file_path}: {e}") - return None - - subfolders = [f.path for f in os.scandir(folder_path) if f.is_dir()] - if not subfolders: - logging.warning("No subfolders found to evaluate.") - return pd.DataFrame() - - logging.info(f"Found {len(subfolders)} subfolders to evaluate.") - results_df = aggregate_results(subfolders, task_definitions) - - if results_df.empty: - logging.warning("No results were generated.") - return results_df - - # Calculate and print summary statistics from the DataFrame - total_tasks = len(results_df) - successful_tasks = results_df['overall_is_successful'].sum() - success_rate = (successful_tasks / total_tasks) if total_tasks > 0 else 0.0 - - logging.info("\n=== Evaluation Results Summary ===") - logging.info(f"Total tasks evaluated: {total_tasks}") - logging.info(f"Successful tasks: {successful_tasks}") - logging.info(f"Overall Success Rate: {success_rate:.2%}") - - # You can add more detailed analysis here, e.g., by task type - if 'task_type' in results_df.columns: - logging.info("\n--- Success Rate by Task Type ---") - type_success = results_df.groupby('task_type')['overall_is_successful'].mean().map("{:.2%}".format) - logging.info(type_success) - - if 'overall_completion_status' in results_df.columns: - logging.info("\n--- Completion Status Distribution ---") - status_dist = results_df['overall_completion_status'].value_counts(normalize=True).map("{:.2%}".format) - logging.info(status_dist) - - return results_df - -def read_settings(file_path: str) -> List[str]: - """ - Reads and parses a settings.js file to extract agent profile names. - - This function is designed to handle the JavaScript export format by stripping - comments, trailing commas, and the 'export default' statement before parsing - it as JSON. - - Args: - file_path (str): The path to the settings.js file. - - Returns: - List[str]: A list of agent names extracted from the profiles. - """ - with open(file_path, 'r', encoding='utf-8') as file: - content = file.read() - - # Remove `export default` and trailing commas - content = re.sub(r'export\s+default', '', content) - content = re.sub(r',\s*(?=[}\]])', '', content) - - # Remove JavaScript comments - content = re.sub(r'//.*', '', content) - - # Remove trailing commas (e.g., before } or ]) - content = re.sub(r',\s*(?=[}\]])', '', content) - - # Strip leading and trailing whitespace - content = content.strip() - - json_data = json.loads(content) - - profiles = json_data['profiles'] - - ## profiles is a list of strings like "./andy.json" and "./bob.json" - - agent_names = [profile.split('/')[-1].split('.')[0] for profile in profiles] - return agent_names - -def update_keys_json() -> None: - """ - Updates the keys.json file with values from environment variables. - - This function reads `keys.example.json`, iterates through its keys, and - replaces the values with corresponding environment variables if they exist. - The result is written to `keys.json`. 
- """ - with open("keys.example.json", 'r', encoding='utf-8') as file: - content = file.read() - data = json.loads(content) - - # Update keys with environment variables - for key in data.keys(): - env_value = os.getenv(key) # Fetch from environment variables - if env_value: # If the variable exists, update it - data[key] = env_value - - with open("keys.json", 'w', encoding='utf-8') as file: - json.dump(data, file, indent=4) - -def set_environment_variable_tmux_session(session_name: str, key: str, value: Any) -> None: - """ - Sets an environment variable within a running tmux session. - - Args: - session_name (str): The name of the target tmux session. - key (str): The environment variable key to set. - value (Any): The value to assign to the key. - """ - subprocess.run(["tmux", "send-keys", "-t", session_name, f"export {key}={value}", "C-m"]) - def launch_parallel_experiments(task_path: str, num_exp: int, exp_name: str, @@ -283,6 +116,8 @@ def launch_parallel_experiments(task_path: str, world_name = "Forest" elif task_type == "construction": world_name = "Superflat" + else: + world_name = "Forest" # Default fallback if run_in_tmux: servers = create_server_files("./tasks/server_data/", num_parallel, world_name=world_name) @@ -300,7 +135,7 @@ def launch_parallel_experiments(task_path: str, s3_path = f"{bucket_name}/{task_type}/{model}/{task_path_name}/{exp_name}" - # start wandb + # start experiments os.makedirs(experiments_folder, exist_ok=True) for i, server in enumerate(servers): launch_server_experiment(task_path, @@ -355,8 +190,7 @@ def launch_parallel_experiments(task_path: str, total_run = len(results_df) success_rate = results_df['overall_is_successful'].mean() status_dist = results_df['overall_completion_status'].value_counts(normalize=True).to_dict() - status_dist_str = ", ".join([f"{k.value}: {v:.2%}" for k, v in status_dist.items()]) - + status_dist_str = ", ".join([f"{k}: {v:.2%}" for k, v in status_dist.items()]) logging.info(f"\n--- Progress Update ({datetime.now().strftime('%H:%M:%S')}) ---") logging.info(f"Total tasks run: {total_run}/{total_num_experiments}") @@ -381,12 +215,9 @@ def launch_parallel_experiments(task_path: str, # Save summary and detailed results with open(f"{experiments_folder}/results.json", "w") as f: - json.dump(results_summary, f, indent=4) + json.dump(results_summary, f, indent=4, default=str) if not results_df.empty: - # Convert Enum members to their string values for CSV compatibility - df_for_csv = results_df.copy() - df_for_csv['overall_completion_status'] = df_for_csv['overall_completion_status'].apply(lambda x: x.value) - df_for_csv.to_csv(f"{experiments_folder}/detailed_results.csv", index=False) + results_df.to_csv(f"{experiments_folder}/detailed_results.csv", index=False) if s3: cmd_results = f"aws s3 cp {experiments_folder}/results.json s3://{s3_path}/results.json" @@ -488,11 +319,12 @@ def launch_server_experiment(task_path: str, agent_profiles_str += f'\"{agent}\", ' agent_profiles_str += f"\"{agent_profiles[-1]}\"]'" logging.info(agent_profiles_str) + if run_in_tmux: logging.info("run in tmux is true") - launch_world(server_path, session_name="server_" + session_name, agent_names=agent_names, port=server_port) - + launch_world(server_path, session_name="server_" + session_name, port=server_port) subprocess.run(['tmux', 'new-session', '-d', '-s', session_name], check=True) + # set environment variables if run_in_tmux: set_environment_variable_tmux_session(session_name, "MINECRAFT_PORT", server_port) @@ -564,7 +396,6 @@ def 
run_script(task_path: str, logging.info(f"Created directory: {task_folder}") cmd = f"node main.js --task_path \'{task_path}\' --task_id {task_id}" - cp_cmd = f"cp {agent_names[0]}.json {server_path}bots/{agent_names[0]}/profile.json" for _ in range(num_exp): script_content += f"{cmd}\n" script_content += "sleep 2\n" @@ -590,315 +421,6 @@ def run_script(task_path: str, script_file = f"./tmp/experiment_script_{session_name}.sh" make_script_file_and_run(script_content, script_file, session_name=session_name, run_in_tmux=run_in_tmux) - -def make_ops(agent_names: List[str], session_name: str) -> None: - """ - Makes the specified agents operators (ops) in the Minecraft world. - - This is achieved by running a debug task to get the agents into the server, - then issuing the /op command from the server console. - - Args: - agent_names (List[str]): A list of agent names to be made ops. - session_name (str): The tmux session name where the agents are running. - """ - logging.info('Making agents operators...') - - cmd = f"node main.js --task_path tasks/example_tasks.json --task_id debug_{len(agent_names)}_agent_timeout" - - subprocess.run(["tmux", "send-keys", "-t", session_name, cmd, "C-m"]) - - time.sleep(30) - - subprocess.run(["tmux", "send-keys", "-t", "server_" + session_name, f"/op @a", "C-m"]) - - agents_op = check_agent_ops(agent_names, ops_file=f"./tasks/server_data_{session_name}/ops.json") - if agents_op: - logging.info("Agents are operators! You are good to go :D") - else: - logging.warning("Agents are not operators! We will need to try making them operators again!") - make_ops(agent_names, session_name) - -def check_agent_ops(agent_names: List[str], ops_file: str = "ops.json") -> bool: - """ - Checks the ops.json file to verify that all agents are operators. - - Args: - agent_names (List[str]): The list of agent names to check. - ops_file (str): The path to the ops.json file. - - Returns: - bool: True if all agents are listed in the ops file, False otherwise. - """ - with open(ops_file, "r") as f: - ops_data = json.load(f) - - ops_names = [op["name"] for op in ops_data] - - for agent in agent_names: - if agent not in ops_names: - return False - return True - -def make_script_file_and_run(script_content: str, - file_name: str, - session_name: str = "0", - run_in_tmux: bool = True) -> None: - """ - Writes content to a script file and executes it. - - Args: - script_content (str): The shell script content to write. - file_name (str): The path to the script file to be created. - session_name (str): The tmux session to run the script in. - run_in_tmux (bool): If True, run via tmux; otherwise, run directly. 
- """ - script_dir = os.path.dirname(file_name) - os.makedirs(script_dir, exist_ok=True) - assert os.path.exists(script_dir), f"Script directory {script_dir} was not created" - logging.info(f"Created script directory: {script_dir}") - - # Call the function before writing the script file - with open(file_name, 'w') as f: - f.write(script_content) - assert os.path.exists(file_name), f"Script file {file_name} was not created" - - script_file_run = "bash " + file_name - - # Execute the shell script using subprocess - if run_in_tmux: - subprocess.run(["tmux", "send-keys", "-t", session_name, script_file_run, "C-m"]) - else: - subprocess.run(script_file_run.split()) - -def make_profiles(agent_names: List[str], - models: List[str], - apis: List[str], - template_profile: str = "profiles/collab_profile.json", - url: str = "http://127.0.0.1:8000/v1") -> None: - """ - Generates JSON profile files for each agent based on a template. - - Args: - agent_names (List[str]): List of agent names. - models (List[str]): List of model names corresponding to each agent. - apis (List[str]): List of API providers for each agent. - template_profile (str): Path to the template profile JSON file. - url (str): The API URL to use for vLLM models. - """ - assert len(agent_names) == len(models) - - with open(template_profile, 'r') as f: - content = f.read() - - profile = json.loads(content) - - for index in range(len(agent_names)): - profile["name"] = agent_names[index] - if apis[index] == "vllm": - profile["model"] = { - "api": "vllm", - "model": models[index], - "url": url - } - elif apis[index] == "ollama": - profile["model"] = { - "api": "ollama", - "model": models[index], - "embedding": "ollama" - } - else: - profile["model"] = models[index] - - with open(f"{agent_names[index]}.json", 'w') as f: - json.dump(profile, f, indent=4) - -def create_server_files(source_path: str, num_copies: int, world_name: str = "Forest") -> List[Tuple[str, int]]: - """ - Creates multiple copies of server files for parallel experiments. - - Args: - source_path (str): The path to the source server files directory. - num_copies (int): The number of server copies to create. - world_name (str): The name of the world to set in server.properties. - - Returns: - List[Tuple[str, int]]: A list of tuples, each containing the path and port - of a created server instance. - """ - logging.info("Creating server files...") - logging.info(num_copies) - servers = [] - for i in range(num_copies): - dest_path = f"./tasks/server_data_{i}/" - copy_server_files(source_path, dest_path) - logging.info(dest_path) - edit_file(dest_path + "server.properties", {"server-port": 55916 + i, - "level-name": world_name}) - # edit_server_properties_file(dest_path, 55916 + i) - servers.append((dest_path, 55916 + i)) - return servers - -def edit_file(file: str, content_dict: Dict[str, Any]) -> None: - """ - Edits a properties-style file by replacing values for given keys. - - Args: - file (str): The path to the file to edit. - content_dict (Dict[str, Any]): A dictionary of key-value pairs to update. 
- """ - try: - with open(file, 'r') as f: - lines = f.readlines() - with open(file, 'w') as f: - for line in lines: - for key, value in content_dict.items(): - if line.startswith(key): - f.write(f"{key}={value}\n") - else: - f.write(line) - logging.info(f"{file} updated with {content_dict}") - except Exception as e: - logging.error(f"Error editing file {file}: {e}") - -def clean_up_server_files(num_copies: int) -> None: - """ - Deletes the server file directories created for parallel experiments. - - Args: - num_copies (int): The number of server directories to delete. - """ - for i in range(num_copies): - dest_path = f"./tasks/server_data_{i}/" - delete_server_files(dest_path) - -def copy_server_files(source_path: str, dest_path: str) -> None: - """ - Recursively copies server files from a source to a destination. - - Args: - source_path (str): The source directory. - dest_path (str): The destination directory. - """ - try: - shutil.copytree(source_path, dest_path) - logging.info(f"Server files copied to {dest_path}") - except Exception as e: - logging.error(f"Error copying server files: {e}") - time.sleep(10) - - same_files = check_same_files(source_path, dest_path) - if not same_files: - copy_server_files(source_path, dest_path) - logging.warning("The destination path does not contain all the same files as the source path.") - else: - logging.info("The destination path contains all the same files as the source path.") - -def check_same_files(d1: str, d2: str) -> bool: - """ - Checks if two directories contain the same set of file and directory names. - This is a shallow check and does not compare file contents. - - Args: - d1 (str): Path to the first directory. - d2 (str): Path to the second directory. - - Returns: - bool: True if the contents are the same, False otherwise. - """ - try: - items1 = set(os.listdir(d1)) - items2 = set(os.listdir(d2)) - return items1 == items2 - except FileNotFoundError as e: - logging.error(f"Directory not found for comparison: {e}") - return False - -def delete_server_files(dest_path: str) -> None: - """ - Deletes the server files at the specified destination path. - - Args: - dest_path (str): The path to the server directory to delete. - """ - try: - shutil.rmtree(dest_path) - logging.info(f"Server files deleted from {dest_path}") - except Exception as e: - logging.error(f"Error deleting server files: {e}") - if not os.path.exists(dest_path): - logging.info("Server files deleted successfully.") - # else: - # logging.error("Error deleting server files.") - # delete_server_files(dest_path) - - -def launch_world(server_path: str = "./tasks/server_data/", - agent_names: List[str] = ["andy", "jill"], - session_name: str = "server", - port: int = 55916) -> None: - """ - Launches the Minecraft server in a new tmux session. - - Args: - server_path (str): The path to the server directory. - agent_names (List[str]): A list of agent names (used for logging). - session_name (str): The name for the new tmux session. - port (int): The port the server will run on. - """ - logging.info(f"Launching Minecraft world with port {port}...") - cmd = f"cd {server_path} && java -jar server.jar" - subprocess.run(['tmux', 'new-session', '-d', '-s', session_name], check=True) - subprocess.run(["tmux", "send-keys", "-t", session_name, cmd, "C-m"]) - time.sleep(30) # Increased sleep time to ensure server starts - logging.info("Server launch command sent. 
Continuing with experiment setup.") - -def kill_world(session_name: str = "server") -> None: - """ - Kills the Minecraft server's tmux session. - - Args: - session_name (str): The name of the tmux session to kill. - """ - subprocess.run(["tmux", "send-keys", "-t", session_name, "stop", "C-m"]) - time.sleep(5) - subprocess.run(["tmux", "kill-session", "-t", session_name]) - -def detach_process(command: List[str]) -> int | None: - """ - Launches a subprocess and detaches it to run independently. - - Args: - command (List[str]): A list of strings representing the command to execute. - - Returns: - Optional[int]: The PID of the detached process, or None on failure. - """ - - try: - # Create a new process group so the child doesn't get signals intended for the parent. - # This is crucial for proper detachment. - kwargs = {} - if sys.platform == 'win32': - kwargs.update(creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) # Windows specific - - process = subprocess.Popen(command, - stdin=subprocess.PIPE, # Prevent stdin blocking - stdout=subprocess.PIPE, # Redirect stdout - stderr=subprocess.PIPE, # Redirect stderr - close_fds=True, # Close open file descriptors - **kwargs) - - logging.info(f"Process launched with PID: {process.pid}") - return process.pid # Return the PID of the detached process - - except FileNotFoundError: - logging.error(f"Error: Command not found: {command}") - return None - except Exception as e: - logging.error(f"An error occurred: {e}") - return None - def main() -> None: """ Main entry point for the evaluation script. @@ -919,7 +441,6 @@ def main() -> None: parser.add_argument('--template_profile', default="profiles/tasks/crafting_profile.json", help='Model to use for the agents') parser.add_argument('--model', default="gpt-4o-mini", help='Model to use for the agents') parser.add_argument('--api', default="openai", help='API to use for the agents') - # parser.add_argument('--world_name', default="Forest", help='Name of the world') parser.add_argument('--insecure_coding', action='store_true', help='Enable insecure coding') parser.add_argument('--url', default="http://127.0.0.1:8000/v1") parser.add_argument('--max_messages', default=15, type=int, help='Maximum number of messages before summarizing') @@ -941,7 +462,7 @@ def main() -> None: if not args.no_launch_world: try: subprocess.run(['tmux', 'kill-server'], check=True) - except: + except subprocess.CalledProcessError: logging.info("No tmux session to kill") # delete all server files diff --git a/tasks/experiment_utils.py b/tasks/experiment_utils.py new file mode 100644 index 0000000..304d1b4 --- /dev/null +++ b/tasks/experiment_utils.py @@ -0,0 +1,377 @@ +import json +import logging +import os +import re +import shutil +import subprocess +import sys +import time +from typing import Any, Dict, List, Tuple + +# Set up basic logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def read_settings(file_path: str) -> List[str]: + """ + Reads and parses a settings.js file to extract agent profile names. + This function is designed to handle the JavaScript export format by stripping + comments, trailing commas, and the 'export default' statement before parsing + it as JSON. + Args: + file_path (str): The path to the settings.js file. + Returns: + List[str]: A list of agent names extracted from the profiles. 
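+    Example (illustrative; assumes settings.js declares profiles such as
+    "./andy.json" and "./bob.json"):
+        >>> read_settings("settings.js")
+        ['andy', 'bob']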
+    """
+    with open(file_path, 'r', encoding='utf-8') as file:
+        content = file.read()
+
+    # Remove JavaScript comments first so they cannot hide trailing commas or braces
+    content = re.sub(r'//.*', '', content)
+
+    # Remove `export default`
+    content = re.sub(r'export\s+default', '', content)
+
+    # Remove trailing commas (e.g., before } or ])
+    content = re.sub(r',\s*(?=[}\]])', '', content)
+
+    # Strip leading and trailing whitespace
+    content = content.strip()
+
+    json_data = json.loads(content)
+
+    profiles = json_data['profiles']
+
+    # profiles is a list of strings like "./andy.json" and "./bob.json"
+    agent_names = [profile.split('/')[-1].split('.')[0] for profile in profiles]
+    return agent_names
+
+def update_keys_json() -> None:
+    """
+    Updates the keys.json file with values from environment variables.
+    This function reads `keys.example.json`, iterates through its keys, and
+    replaces the values with corresponding environment variables if they exist.
+    The result is written to `keys.json`.
+    """
+    with open("keys.example.json", 'r', encoding='utf-8') as file:
+        content = file.read()
+    data = json.loads(content)
+
+    # Update keys with environment variables
+    for key in data.keys():
+        env_value = os.getenv(key)  # Fetch from environment variables
+        if env_value:  # If the variable exists, update it
+            data[key] = env_value
+
+    with open("keys.json", 'w', encoding='utf-8') as file:
+        json.dump(data, file, indent=4)
+
+def set_environment_variable_tmux_session(session_name: str, key: str, value: Any) -> None:
+    """
+    Sets an environment variable within a running tmux session.
+    Args:
+        session_name (str): The name of the target tmux session.
+        key (str): The environment variable key to set.
+        value (Any): The value to assign to the key.
+    """
+    subprocess.run(["tmux", "send-keys", "-t", session_name, f"export {key}={value}", "C-m"])
+
+def make_profiles(agent_names: List[str],
+                  models: List[str],
+                  apis: List[str],
+                  template_profile: str = "profiles/collab_profile.json",
+                  url: str = "http://127.0.0.1:8000/v1") -> None:
+    """
+    Generates JSON profile files for each agent based on a template.
+    Args:
+        agent_names (List[str]): List of agent names.
+        models (List[str]): List of model names corresponding to each agent.
+        apis (List[str]): List of API providers for each agent.
+        template_profile (str): Path to the template profile JSON file.
+        url (str): The API URL to use for vLLM models.
+    """
+    assert len(agent_names) == len(models) == len(apis), \
+        "agent_names, models, and apis must have the same length"
+
+    with open(template_profile, 'r') as f:
+        content = f.read()
+
+    profile = json.loads(content)
+
+    for index in range(len(agent_names)):
+        profile["name"] = agent_names[index]
+        if apis[index] == "vllm":
+            profile["model"] = {
+                "api": "vllm",
+                "model": models[index],
+                "url": url
+            }
+        elif apis[index] == "ollama":
+            profile["model"] = {
+                "api": "ollama",
+                "model": models[index],
+                "embedding": "ollama"
+            }
+        else:
+            profile["model"] = models[index]
+
+        with open(f"{agent_names[index]}.json", 'w') as f:
+            json.dump(profile, f, indent=4)
+
+def create_server_files(source_path: str, num_copies: int, world_name: str = "Forest") -> List[Tuple[str, int]]:
+    """
+    Creates multiple copies of server files for parallel experiments.
+    Args:
+        source_path (str): The path to the source server files directory.
+        num_copies (int): The number of server copies to create.
+        world_name (str): The name of the world to set in server.properties.
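+    Note:
+        Ports are assigned sequentially starting at 55916, one per copy, and
+        written into each copy's server.properties.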
+ Returns: + List[Tuple[str, int]]: A list of tuples, each containing the path and port + of a created server instance. + """ + logging.info("Creating server files...") + logging.info(num_copies) + servers = [] + for i in range(num_copies): + dest_path = f"./tasks/server_data_{i}/" + copy_server_files(source_path, dest_path) + logging.info(dest_path) + edit_file(dest_path + "server.properties", {"server-port": 55916 + i, + "level-name": world_name}) + servers.append((dest_path, 55916 + i)) + return servers + +def edit_file(file: str, content_dict: Dict[str, Any]) -> None: + """ + Edits a properties-style file by replacing values for given keys. + Args: + file (str): The path to the file to edit. + content_dict (Dict[str, Any]): A dictionary of key-value pairs to update. + """ + try: + with open(file, 'r') as f: + lines = f.readlines() + with open(file, 'w') as f: + for line in lines: + written = False + for key, value in content_dict.items(): + if line.startswith(key + "="): + f.write(f"{key}={value}\n") + written = True + break + if not written: + f.write(line) + logging.info(f"{file} updated with {content_dict}") + except Exception as e: + logging.error(f"Error editing file {file}: {e}") + + +def clean_up_server_files(num_copies: int) -> None: + """ + Deletes the server file directories created for parallel experiments. + Args: + num_copies (int): The number of server directories to delete. + """ + for i in range(num_copies): + dest_path = f"./tasks/server_data_{i}/" + delete_server_files(dest_path) + +def copy_server_files(source_path: str, dest_path: str) -> None: + """ + Recursively copies server files from a source to a destination. + Args: + source_path (str): The source directory. + dest_path (str): The destination directory. + """ + try: + shutil.copytree(source_path, dest_path) + logging.info(f"Server files copied to {dest_path}") + except Exception as e: + logging.error(f"Error copying server files: {e}") + time.sleep(1) # Give a moment for filesystem to catch up + + if not check_same_files(source_path, dest_path): + logging.warning("File copy incomplete, retrying...") + time.sleep(5) + shutil.rmtree(dest_path) + copy_server_files(source_path, dest_path) + else: + logging.info("Server files copied successfully.") + + +def check_same_files(d1: str, d2: str) -> bool: + """ + Checks if two directories contain the same set of file and directory names. + This is a shallow check and does not compare file contents. + Args: + d1 (str): Path to the first directory. + d2 (str): Path to the second directory. + Returns: + bool: True if the contents are the same, False otherwise. + """ + try: + items1 = set(os.listdir(d1)) + items2 = set(os.listdir(d2)) + return items1 == items2 + except FileNotFoundError as e: + logging.error(f"Directory not found for comparison: {e}") + return False + +def delete_server_files(dest_path: str) -> None: + """ + Deletes the server files at the specified destination path. + Args: + dest_path (str): The path to the server directory to delete. + """ + try: + if os.path.exists(dest_path): + shutil.rmtree(dest_path) + logging.info(f"Server files deleted from {dest_path}") + except Exception as e: + logging.error(f"Error deleting server files at {dest_path}: {e}") + + +def launch_world(server_path: str = "./tasks/server_data/", + session_name: str = "server", + port: int = 55916) -> None: + """ + Launches the Minecraft server in a new tmux session. + Args: + server_path (str): The path to the server directory. + session_name (str): The name for the new tmux session. 
+ port (int): The port the server will run on. + """ + logging.info(f"Launching Minecraft world with port {port}...") + cmd = f"cd {server_path} && java -jar server.jar" + subprocess.run(['tmux', 'new-session', '-d', '-s', session_name], check=True) + subprocess.run(["tmux", "send-keys", "-t", session_name, cmd, "C-m"]) + time.sleep(30) # Increased sleep time to ensure server starts + logging.info("Server launch command sent. Continuing with experiment setup.") + +def kill_world(session_name: str = "server") -> None: + """ + Kills the Minecraft server's tmux session. + Args: + session_name (str): The name of the tmux session to kill. + """ + try: + subprocess.run(["tmux", "send-keys", "-t", session_name, "stop", "C-m"]) + time.sleep(5) + subprocess.run(["tmux", "kill-session", "-t", session_name], check=True) + logging.info(f"Successfully killed tmux session: {session_name}") + except subprocess.CalledProcessError: + logging.warning(f"tmux session {session_name} not found or already killed.") + + +def make_ops(agent_names: List[str], session_name: str) -> None: + """ + Makes the specified agents operators (ops) in the Minecraft world. + This is achieved by running a debug task to get the agents into the server, + then issuing the /op command from the server console. + Args: + agent_names (List[str]): A list of agent names to be made ops. + session_name (str): The tmux session name where the agents are running. + """ + logging.info('Making agents operators...') + + cmd = f"node main.js --task_path tasks/example_tasks.json --task_id debug_{len(agent_names)}_agent_timeout" + + subprocess.run(["tmux", "send-keys", "-t", session_name, cmd, "C-m"]) + + time.sleep(30) + + subprocess.run(["tmux", "send-keys", "-t", "server_" + session_name, f"/op @a", "C-m"]) + + ops_file_path = f"./tasks/server_data_{session_name}/ops.json" + + # Wait for ops.json to be created and populated + max_wait_time = 60 # seconds + start_time = time.time() + while time.time() - start_time < max_wait_time: + if os.path.exists(ops_file_path) and check_agent_ops(agent_names, ops_file=ops_file_path): + logging.info("Agents are operators! You are good to go :D") + return + time.sleep(5) + + logging.error("Failed to make agents operators within the time limit. Retrying...") + make_ops(agent_names, session_name) + + +def check_agent_ops(agent_names: List[str], ops_file: str = "ops.json") -> bool: + """ + Checks the ops.json file to verify that all agents are operators. + Args: + agent_names (List[str]): The list of agent names to check. + ops_file (str): The path to the ops.json file. + Returns: + bool: True if all agents are listed in the ops file, False otherwise. + """ + try: + with open(ops_file, "r") as f: + ops_data = json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + return False + + ops_names = [op["name"] for op in ops_data] + + return all(agent in ops_names for agent in agent_names) + +def make_script_file_and_run(script_content: str, + file_name: str, + session_name: str = "0", + run_in_tmux: bool = True) -> None: + """ + Writes content to a script file and executes it. + Args: + script_content (str): The shell script content to write. + file_name (str): The path to the script file to be created. + session_name (str): The tmux session to run the script in. + run_in_tmux (bool): If True, run via tmux; otherwise, run directly. 
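+    Example (illustrative):
+        >>> make_script_file_and_run("echo hello", "./tmp/hello.sh", run_in_tmux=False)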
+ """ + script_dir = os.path.dirname(file_name) + os.makedirs(script_dir, exist_ok=True) + assert os.path.exists(script_dir), f"Script directory {script_dir} was not created" + logging.info(f"Created script directory: {script_dir}") + + with open(file_name, 'w') as f: + f.write(script_content) + assert os.path.exists(file_name), f"Script file {file_name} was not created" + + script_file_run = "bash " + file_name + + if run_in_tmux: + subprocess.run(["tmux", "send-keys", "-t", session_name, script_file_run, "C-m"]) + else: + subprocess.run(script_file_run, shell=True) + +def detach_process(command: List[str]) -> int | None: + """ + Launches a subprocess and detaches it to run independently. + Args: + command (List[str]): A list of strings representing the command to execute. + Returns: + Optional[int]: The PID of the detached process, or None on failure. + """ + try: + kwargs = {} + if sys.platform == 'win32': + kwargs.update(creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) + else: + kwargs.update(preexec_fn=os.setsid) + + process = subprocess.Popen(command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + **kwargs) + + logging.info(f"Process launched with PID: {process.pid}") + return process.pid + + except FileNotFoundError: + logging.error(f"Error: Command not found: {command}") + return None + except Exception as e: + logging.error(f"An error occurred: {e}") + return None \ No newline at end of file