WIP: need to fix json encoder

Lorenze Jay
2024-07-08 14:36:41 -07:00
parent 363ce5e9ce
commit 1a44a34c17
2 changed files with 163 additions and 10 deletions

View File

@@ -1,6 +1,7 @@
 import asyncio
 import json
 import uuid
+from datetime import datetime
 from concurrent.futures import Future
 from typing import Any, Dict, List, Optional, Tuple, Union
@@ -32,6 +33,7 @@ from crewai.telemetry import Telemetry
 from crewai.tools.agent_tools import AgentTools
 from crewai.utilities import I18N, FileHandler, Logger, RPMController
 from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
+from crewai.utilities.crew_json_encoder import CrewJSONEncoder
 from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
 from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs
 from crewai.utilities.training_handler import CrewTrainingHandler
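The `crew_json_encoder` module itself is not part of this diff, and the commit title says the encoder still needs fixing. As a rough sketch only: an encoder compatible with the values these logs contain (UUIDs, datetimes, and pydantic models such as `TaskOutput`) might look like the following. The class name comes from the import above; the body is an assumption, not the committed implementation.

```python
import json
from datetime import datetime
from uuid import UUID

from pydantic import BaseModel


class CrewJSONEncoder(json.JSONEncoder):
    """Sketch only: serialize the non-JSON-native values that appear
    in execution logs (UUIDs, datetimes, pydantic models)."""

    def default(self, obj):
        if isinstance(obj, UUID):
            return str(obj)
        if isinstance(obj, datetime):
            return obj.isoformat()
        if isinstance(obj, BaseModel):
            # The returned dict is re-encoded, so nested UUIDs/datetimes
            # pass back through this hook.
            return obj.model_dump()
        return super().default(obj)
```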
@@ -132,6 +134,16 @@ class Crew(BaseModel):
         default=False,
         description="output_log_file",
     )
+    task_execution_output_json_files: Optional[List[str]] = Field(
+        default=None,
+        description="List of file paths for task execution JSON files.",
+    )
+    execution_logs: List[Dict[str, Any]] = Field(
+        default=[],
+        description="List of execution logs for tasks",
+    )
+    _log_file: str = PrivateAttr(default="crew_tasks_output.json")
+
     @field_validator("id", mode="before")
     @classmethod
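One note on the new `execution_logs` field: pydantic copies mutable defaults per instance, so `default=[]` behaves correctly here, but `default_factory=list` expresses that intent directly. A minimal standalone illustration (not the committed code):

```python
from typing import Any, Dict, List

from pydantic import BaseModel, Field


class LogHolder(BaseModel):
    # Same behavior as default=[]; the factory makes the per-instance copy explicit.
    execution_logs: List[Dict[str, Any]] = Field(default_factory=list)


a, b = LogHolder(), LogHolder()
a.execution_logs.append({"task_id": "1"})
assert b.execution_logs == []  # each instance owns its own list
```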
@@ -436,12 +448,25 @@ class Crew(BaseModel):
         return results

+    def _store_execution_log(self, task, output, task_index):
+        print("output passed in", output)
+        log = {
+            "task_id": str(task.id),
+            "description": task.description,
+            "agent_role": task.agent.role if task.agent else "None",
+            "output": output,
+            "timestamp": datetime.now().isoformat(),
+            "task_index": task_index,
+        }
+        self.execution_logs.append(log)
+        print("execution_logs", self.execution_logs)
+
     def _run_sequential_process(self) -> CrewOutput:
         """Executes tasks sequentially and returns the final output."""
         task_outputs: List[TaskOutput] = []
         futures: List[Tuple[Task, Future[TaskOutput]]] = []
+        # execution_logs: List[Dict[str, Any]] = []

-        for task in self.tasks:
+        for task_index, task in enumerate(self.tasks):
             if task.agent and task.agent.allow_delegation:
                 agents_for_delegation = [
                     agent for agent in self.agents if agent != task.agent
@@ -469,11 +494,14 @@ class Crew(BaseModel):
             else:
                 # Before executing a synchronous task, wait for all async tasks to complete
                 if futures:
+                    print("futures for sync task", futures)
                     # Clear task_outputs before processing async tasks
                     task_outputs = []
                     for future_task, future in futures:
                         task_output = future.result()
                         task_outputs.append(task_output)
+                        self._store_execution_log(future_task, task_output, task_index)
                         self._process_task_result(future_task, task_output)

                     # Clear the futures list after processing all async results
@@ -485,18 +513,23 @@
                 )
                 task_outputs = [task_output]
                 self._process_task_result(task, task_output)
+                self._store_execution_log(task, task_output, task_index)

         if futures:
+            print("there are some async tasks we need to execute in the future", futures)
             # Clear task_outputs before processing async tasks
-            task_outputs = []
-            for future_task, future in futures:
-                task_output = future.result()
-                task_outputs.append(task_output)
-                self._process_task_result(future_task, task_output)
+            task_outputs = self._process_async_tasks(futures, len(self.tasks))
+            print("task_outputs from futures", task_outputs)
+            # task_outputs = []
+            # for future_task, future in futures:
+            #     task_output = future.result()
+            #     task_outputs.append(task_output)
+            #     self._process_task_result(future_task, task_output)

         final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs)
         self._finish_execution(final_string_output)
+        print("self.execution_logs", self.execution_logs)
+        self.save_execution_logs()
         token_usage = self.calculate_usage_metrics()
         return self._format_output(task_outputs, token_usage)
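For reference, `_store_execution_log` above gives each entry of `crew.execution_logs` (and, once `save_execution_logs()` runs, of `crew_tasks_output.json`) roughly the following shape. All values here are made up; note that `output` holds the full `TaskOutput` object, which is exactly what the custom JSON encoder has to serialize.

```python
# Illustrative shape of one execution-log entry (all values made up).
example_entry = {
    "task_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",  # str(task.id)
    "description": "Summarize the findings",            # task.description
    "agent_role": "Researcher",                         # or "None" without an agent
    "output": "...a TaskOutput object...",  # not a plain string in reality, which
                                            # is why json.dump needs cls=CrewJSONEncoder
    "timestamp": "2024-07-08T14:36:41.123456",          # datetime.now().isoformat()
    "task_index": 0,
}
```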
@@ -507,6 +540,126 @@ class Crew(BaseModel):
         if self.output_log_file:
             self._file_handler.log(agent=role, task=output, status="completed")

+    def _process_async_tasks(
+        self,
+        futures: List[Tuple[Task, Future[TaskOutput]]],
+        task_index: int,
+    ) -> List[TaskOutput]:
+        task_outputs = []
+        for future_task, future in futures:
+            task_output = future.result()
+            task_outputs.append(task_output)
+            self._process_task_result(future_task, task_output)
+            self._store_execution_log(future_task, task_output, task_index)
+        return task_outputs
+
+    def _create_execution_log(
+        self, task: Task, output: TaskOutput, task_index: int
+    ) -> Dict[str, Any]:
+        return {
+            "task_id": str(task.id),
+            "task_index": task_index,
+            "task_description": task.description,
+            "agent_role": task.agent.role if task.agent else "None",
+            "output": output.raw_output,
+            "timestamp": datetime.now().isoformat(),
+            "task": task.model_dump(),
+        }
+
+    def replay_from_task(self, task_id: UUID4, use_stored_logs: bool = False):
+        """Replay execution from a specific task and continue through subsequent tasks."""
+        task_outputs: List[TaskOutput] = []
+        futures: List[Tuple[Task, Future[TaskOutput]]] = []
+        execution_logs: List[Dict[str, Any]] = []
+
+        if use_stored_logs:
+            self.load_execution_logs()
+
+        # Load the task outputs from the crew_tasks_output.json file
+        with open("crew_tasks_output.json", "r") as f:
+            stored_outputs = json.load(f)
+
+        # Find the index of the task with the given task_id
+        start_index = next(
+            (
+                index
+                for (index, d) in enumerate(stored_outputs)
+                if d["task_id"] == str(task_id)
+            ),
+            None,
+        )
+        if start_index is None:
+            raise ValueError(f"Task with id {task_id} not found in the task outputs.")
+
+        # Run the tasks sequentially starting from the task_id
+        for task_index, stored_output in enumerate(stored_outputs[start_index:]):
+            task = Task(**stored_output["task"])
+            if task.async_execution:
+                context = aggregate_raw_outputs_from_task_outputs(stored_outputs)
+                future = task.execute_async(
+                    agent=task.agent, context=context, tools=task.tools
+                )
+                futures.append((task, future))
+            else:
+                # Before executing a synchronous task, wait for all async tasks to complete
+                if futures:
+                    print("futures for sync task", futures)
+                    # Clear task_outputs before processing async tasks
+                    task_outputs = []
+                    for future_task, future in futures:
+                        task_output = future.result()
+                        task_outputs.append(task_output)
+                        execution_logs.append(
+                            self._create_execution_log(
+                                future_task, task_output, task_index
+                            )
+                        )
+                        self._process_task_result(future_task, task_output)
+
+                    # Clear the futures list after processing all async results
+                    futures.clear()
+
+                context = aggregate_raw_outputs_from_task_outputs(task_outputs)
+                task_output = task.execute_sync(
+                    agent=task.agent, context=context, tools=task.tools
+                )
+                task_outputs = [task_output]
+                self._process_task_result(task, task_output)
+                execution_logs.append(
+                    self._create_execution_log(task, task_output, task_index)
+                )
+
+    def save_execution_logs(self, filename: str | None = None):
+        """Save execution logs to a file."""
+        if filename:
+            self._log_file = filename
+        try:
+            with open(self._log_file, "w") as f:
+                json.dump(self.execution_logs, f, indent=2, cls=CrewJSONEncoder)
+        except Exception as e:
+            self._logger.log("error", f"Failed to save execution logs: {str(e)}")
+
+    def load_execution_logs(self, filename: str | None = None):
+        """Load execution logs from a file."""
+        if filename:
+            self._log_file = filename
+        try:
+            with open(self._log_file, "r") as f:
+                self.execution_logs = json.load(f)
+        except FileNotFoundError:
+            self._logger.log(
+                "warning",
+                f"Log file {self._log_file} not found. Starting with empty logs.",
+            )
+            self.execution_logs = []
+        except json.JSONDecodeError:
+            self._logger.log(
+                "error",
+                f"Failed to parse log file {self._log_file}. Starting with empty logs.",
+            )
+            self.execution_logs = []
+
     def _run_hierarchical_process(self) -> Tuple[CrewOutput, Dict[str, Any]]:
         """Creates and assigns a manager agent to make sure the crew completes the tasks."""
         i18n = I18N(prompt_file=self.prompt_file)
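Taken together, the new methods suggest a save/replay round trip along these lines. This is a hypothetical usage sketch, with `my_crew` assumed to be an already-configured `Crew`; note that `replay_from_task` currently reads the hardcoded `crew_tasks_output.json` rather than `self._log_file`.

```python
# Hypothetical usage of the logging/replay API added in this commit.
result = my_crew.kickoff()     # sequential runs now end with save_execution_logs()

# Later (possibly in a new process), replay from the third task onward:
my_crew.load_execution_logs()  # reads crew_tasks_output.json by default
my_crew.replay_from_task(my_crew.tasks[2].id, use_stored_logs=True)
```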

View File

@@ -222,8 +222,8 @@ class Task(BaseModel):
         if self.context:
             task_outputs: List[TaskOutput] = []
             for task in self.context:
-                if task.async_execution:
-                    task.wait_for_completion()
+                # if task.async_execution:
+                #     task.wait_for_completion()
                 if task.output:
                     task_outputs.append(task.output)
             context = aggregate_raw_outputs_from_task_outputs(task_outputs)