From 1a44a34c177e398c28a5109b796bbb391e23219c Mon Sep 17 00:00:00 2001 From: Lorenze Jay Date: Mon, 8 Jul 2024 14:36:41 -0700 Subject: [PATCH] WIP: need to fix json encoder --- src/crewai/crew.py | 169 ++++++++++++++++++++++++++++++++++++++++++--- src/crewai/task.py | 4 +- 2 files changed, 163 insertions(+), 10 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 5a1fc87fe..78a8c698f 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -1,6 +1,7 @@ import asyncio import json import uuid +from datetime import datetime from concurrent.futures import Future from typing import Any, Dict, List, Optional, Tuple, Union @@ -32,6 +33,7 @@ from crewai.telemetry import Telemetry from crewai.tools.agent_tools import AgentTools from crewai.utilities import I18N, FileHandler, Logger, RPMController from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE +from crewai.utilities.crew_json_encoder import CrewJSONEncoder from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs from crewai.utilities.training_handler import CrewTrainingHandler @@ -132,6 +134,16 @@ class Crew(BaseModel): default=False, description="output_log_file", ) + task_execution_output_json_files: Optional[List[str]] = Field( + default=None, + description="List of file paths for task execution JSON files.", + ) + execution_logs: List[Dict[str, Any]] = Field( + default=[], + description="List of execution logs for tasks", + ) + + _log_file: str = PrivateAttr(default="crew_tasks_output.json") @field_validator("id", mode="before") @classmethod @@ -436,12 +448,25 @@ class Crew(BaseModel): return results + def _store_execution_log(self, task, output, task_index): + print("output passeed in", output) + log = { + "task_id": str(task.id), + "description": task.description, + "agent_role": task.agent.role if task.agent else "None", + "output": output, + "timestamp": datetime.now().isoformat(), + "task_index": task_index, + } + self.execution_logs.append(log) + print("execution_logs", self.execution_logs) + def _run_sequential_process(self) -> CrewOutput: """Executes tasks sequentially and returns the final output.""" task_outputs: List[TaskOutput] = [] futures: List[Tuple[Task, Future[TaskOutput]]] = [] - - for task in self.tasks: + # execution_logs: List[Dict[str, Any]] = [] + for task_index, task in enumerate(self.tasks): if task.agent and task.agent.allow_delegation: agents_for_delegation = [ agent for agent in self.agents if agent != task.agent @@ -469,11 +494,14 @@ class Crew(BaseModel): else: # Before executing a synchronous task, wait for all async tasks to complete if futures: + print("futures for sync task", futures) # Clear task_outputs before processing async tasks task_outputs = [] for future_task, future in futures: task_output = future.result() task_outputs.append(task_output) + self._store_execution_log(future_task, task_output, task_index) + self._process_task_result(future_task, task_output) # Clear the futures list after processing all async results @@ -485,18 +513,23 @@ class Crew(BaseModel): ) task_outputs = [task_output] self._process_task_result(task, task_output) + self._store_execution_log(task, task_output, task_index) if futures: + print("there are some async tasks we need to eecute in the future", futures) # Clear task_outputs before processing async tasks - task_outputs = [] - for future_task, future in futures: - task_output = future.result() - task_outputs.append(task_output) - self._process_task_result(future_task, task_output) + task_outputs = self._process_async_tasks(futures, len(self.tasks)) + print("task_outputs from futures", task_outputs) + # task_outputs = [] + # for future_task, future in futures: + # task_output = future.result() + # task_outputs.append(task_output) + # self._process_task_result(future_task, task_output) final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs) self._finish_execution(final_string_output) - + print("self.execution_logs", self.execution_logs) + self.save_execution_logs() token_usage = self.calculate_usage_metrics() return self._format_output(task_outputs, token_usage) @@ -507,6 +540,126 @@ class Crew(BaseModel): if self.output_log_file: self._file_handler.log(agent=role, task=output, status="completed") + def _process_async_tasks( + self, + futures: List[Tuple[Task, Future[TaskOutput]]], + task_index: int, + ) -> List[TaskOutput]: + task_outputs = [] + for future_task, future in futures: + task_output = future.result() + task_outputs.append(task_output) + self._process_task_result(future_task, task_output) + self._store_execution_log(future_task, task_output, task_index) + + return task_outputs + + def _create_execution_log( + self, task: Task, output: TaskOutput, task_index: int + ) -> Dict[str, Any]: + return { + "task_id": str(task.id), + "task_index": task_index, + "task_description": task.description, + "agent_role": task.agent.role if task.agent else "None", + "output": output.raw_output, + "timestamp": datetime.now().isoformat(), + "task": task.model_dump(), + } + + def replay_from_task(self, task_id: UUID4, use_stored_logs: bool = False): + """Replay execution from a specific task and continue through subsequent tasks.""" + task_outputs: List[TaskOutput] = [] + futures: List[Tuple[Task, Future[TaskOutput]]] = [] + execution_logs: List[Dict[str, Any]] = [] + + if use_stored_logs: + self.load_execution_logs() + + # Load the task outputs from the crew_tasks_output.json file + with open("crew_tasks_output.json", "r") as f: + stored_outputs = json.load(f) + # Find the index of the task with the given task_id + start_index = next( + ( + index + for (index, d) in enumerate(stored_outputs) + if d["task_id"] == str(task_id) + ), + None, + ) + + if start_index is None: + raise ValueError(f"Task with id {task_id} not found in the task outputs.") + + # Run the tasks sequentially starting from the task_id + for task_index, stored_output in enumerate(stored_outputs[start_index:]): + task = Task(**stored_output["task"]) + if task.async_execution: + context = aggregate_raw_outputs_from_task_outputs(stored_outputs) + future = task.execute_async( + agent=task.agent, context=context, tools=task.tools + ) + futures.append((task, future)) + else: + # Before executing a synchronous task, wait for all async tasks to complete + if futures: + print("futures for sync task", futures) + # Clear task_outputs before processing async tasks + task_outputs = [] + for future_task, future in futures: + task_output = future.result() + task_outputs.append(task_output) + execution_logs.append( + self._create_execution_log( + future_task, task_output, task_index + ) + ) + self._process_task_result(future_task, task_output) + + # Clear the futures list after processing all async results + futures.clear() + + context = aggregate_raw_outputs_from_task_outputs(task_outputs) + task_output = task.execute_sync( + agent=task.agent, context=context, tools=task.tools + ) + task_outputs = [task_output] + self._process_task_result(task, task_output) + execution_logs.append( + self._create_execution_log(task, task_output, task_index) + ) + + def save_execution_logs(self, filename: str | None = None): + """Save execution logs to a file.""" + if filename: + self._log_file = filename + try: + with open(self._log_file, "w") as f: + json.dump(self.execution_logs, f, indent=2, cls=CrewJSONEncoder) + except Exception as e: + self._logger.log("error", f"Failed to save execution logs: {str(e)}") + + def load_execution_logs(self, filename: str | None = None): + """Load execution logs from a file.""" + if filename: + self._log_file = filename + try: + with open(self._log_file, "r") as f: + self.execution_logs = json.load(f) + except FileNotFoundError: + self._logger.log( + "warning", + f"Log file {self._log_file} not found. Starting with empty logs.", + ) + self.execution_logs = [] + except json.JSONDecodeError: + self._logger.log( + "error", + f"Failed to parse log file {self._log_file}. Starting with empty logs.", + ) + self.execution_logs = [] + def _run_hierarchical_process(self) -> Tuple[CrewOutput, Dict[str, Any]]: """Creates and assigns a manager agent to make sure the crew completes the tasks.""" i18n = I18N(prompt_file=self.prompt_file) diff --git a/src/crewai/task.py b/src/crewai/task.py index 06d2808df..fc018ade0 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -222,8 +222,8 @@ class Task(BaseModel): if self.context: task_outputs: List[TaskOutput] = [] for task in self.context: - if task.async_execution: - task.wait_for_completion() + # if task.async_execution: + # task.wait_for_completion() if task.output: task_outputs.append(task.output) context = aggregate_raw_outputs_from_task_outputs(task_outputs)