From 0f12b701b264f156a97b907c7653611264d5154b Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Thu, 17 Oct 2024 12:51:38 -0400 Subject: [PATCH] Still making WIP --- src/crewai/crew.py | 20 ++++++++++++++++---- src/crewai/crews/crew_output.py | 13 +++++++++++++ src/crewai/tasks/task_output.py | 15 +++++++++++++++ src/crewai/utilities/event_emitter.py | 9 --------- src/crewai/utilities/event_helpers.py | 20 +++++++++++--------- 5 files changed, 55 insertions(+), 22 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index f66ff3250..b2c47d5a4 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -41,6 +41,7 @@ from crewai.utilities.constants import ( from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.event_helpers import ( + emit_crew_finish, emit_crew_start, emit_task_finish, emit_task_start, @@ -510,7 +511,7 @@ class Crew(BaseModel): for metric in metrics: self.usage_metrics.add_usage_metrics(metric) - # TODO: ADD CREW FINISH EVENT + emit_crew_finish(self, result) return result @@ -852,10 +853,19 @@ class Crew(BaseModel): for future_task, future, task_index in futures: task_output = future.result() task_outputs.append(task_output) - self._process_task_result(future_task, task_output) - self._store_execution_log( - future_task, task_output, task_index, was_replayed + emit_task_finish( + future_task, + self._inputs if self._inputs else {}, + task_output, + task_index, + was_replayed, ) + + # TODO: ADD ELSEWHERE + # self._process_task_result(future_task, task_output) + # self._store_execution_log( + # future_task, task_output, task_index, was_replayed + # ) return task_outputs def _find_task_index( @@ -1040,6 +1050,7 @@ class Crew(BaseModel): inputs: Optional[Dict[str, Any]] = None, ) -> None: """Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures.""" + # TODO: Event Emit test execution start self._test_execution_span = self._telemetry.test_execution_span( self, n_iterations, @@ -1053,6 +1064,7 @@ class Crew(BaseModel): self.kickoff(inputs=inputs) evaluator.print_crew_evaluation_result() + # TODO: def __rshift__(self, other: "Crew") -> "Pipeline": """ diff --git a/src/crewai/crews/crew_output.py b/src/crewai/crews/crew_output.py index c9a92a0d0..71b91e850 100644 --- a/src/crewai/crews/crew_output.py +++ b/src/crewai/crews/crew_output.py @@ -41,6 +41,19 @@ class CrewOutput(BaseModel): output_dict.update(self.pydantic.model_dump()) return output_dict + def serialize(self) -> Dict[str, Any]: + """Serialize the CrewOutput into a dictionary excluding complex objects.""" + serialized_data = { + "raw": self.raw, + "pydantic": self.pydantic.model_dump() if self.pydantic else None, + "json_dict": self.json_dict, + "tasks_output": [ + task_output.serialize() for task_output in self.tasks_output + ], + "token_usage": self.token_usage.model_dump(), + } + return {k: v for k, v in serialized_data.items() if v is not None} + def __getitem__(self, key): if self.pydantic and hasattr(self.pydantic, key): return getattr(self.pydantic, key) diff --git a/src/crewai/tasks/task_output.py b/src/crewai/tasks/task_output.py index b0e8aecd4..9a9a321fb 100644 --- a/src/crewai/tasks/task_output.py +++ b/src/crewai/tasks/task_output.py @@ -56,6 +56,21 @@ class TaskOutput(BaseModel): output_dict.update(self.pydantic.model_dump()) return output_dict + def serialize(self) -> Dict[str, Any]: + """Serialize the TaskOutput into a dictionary excluding complex objects.""" + serialized_data = { + "description": self.description, + "name": self.name, + "expected_output": self.expected_output, + "summary": self.summary, + "raw": self.raw, + "pydantic": self.pydantic.model_dump() if self.pydantic else None, + "json_dict": self.json_dict, + "agent": self.agent, + "output_format": self.output_format.value, + } + return {k: v for k, v in serialized_data.items() if v is not None} + def __str__(self) -> str: if self.pydantic: return str(self.pydantic) diff --git a/src/crewai/utilities/event_emitter.py b/src/crewai/utilities/event_emitter.py index 71e120e3b..a662200c2 100644 --- a/src/crewai/utilities/event_emitter.py +++ b/src/crewai/utilities/event_emitter.py @@ -21,21 +21,15 @@ class CrewEventEmitter: self._all_signal = signal("all") def on(self, event_name: CrewEvents | str, callback: Callable) -> None: - print("Connecting signal:", event_name) if event_name == "*" or event_name == "all": self._all_signal.connect(callback, weak=False) - print("Connected to all_signal") else: signal( event_name.value if isinstance(event_name, CrewEvents) else event_name ).connect(callback, weak=False) def emit(self, event_name: CrewEvents, *args: Any, **kwargs: Any) -> None: - print(f"Emitting signal: {event_name.value}") - print("args", args) - print("kwargs", kwargs) signal(event_name.value).send(*args, **kwargs) - print(f"Emitting all signal for: {event_name.value}") self._all_signal.send(*args, event=event_name.value, **kwargs) @@ -43,9 +37,6 @@ crew_events = CrewEventEmitter() def emit(event_name: CrewEvents, *args: Any, **kwargs: Any) -> None: - print("Calling emit", event_name) - print("Args:", args) - print("Kwargs:", kwargs) try: crew_events.emit(event_name, *args, **kwargs) except Exception as e: diff --git a/src/crewai/utilities/event_helpers.py b/src/crewai/utilities/event_helpers.py index 00787a76f..4dfe69d3f 100644 --- a/src/crewai/utilities/event_helpers.py +++ b/src/crewai/utilities/event_helpers.py @@ -7,6 +7,7 @@ from crewai.utilities.event_emitter import CrewEvents, emit if TYPE_CHECKING: from crewai.crew import Crew + from crewai.crews.crew_output import CrewOutput from crewai.task import Task from crewai.tasks.task_output import TaskOutput @@ -25,15 +26,16 @@ def emit_crew_start( ) -def emit_crew_finish(crew_id: str, name: str, result: Any, duration: float) -> None: +def emit_crew_finish(crew: "Crew", result: "CrewOutput") -> None: + serialized_crew = crew.serialize() + serialized_result = result.serialize() + print("emit crew finish") + emit( CrewEvents.CREW_FINISH, { - "crew_id": crew_id, - "name": name, - "finish_time": datetime.now().isoformat(), - "result": result, - "duration": duration, + **serialized_crew, + "result": serialized_result, }, ) @@ -56,7 +58,7 @@ def emit_crew_failure( def emit_task_start( - task: Task, + task: "Task", agent_role: str = "None", ) -> None: serialized_task = task.serialize() @@ -70,9 +72,9 @@ def emit_task_start( def emit_task_finish( - task: Task, + task: "Task", inputs: Dict[str, Any], - output: TaskOutput, + output: "TaskOutput", task_index: int, was_replayed: bool = False, ) -> None: