mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 15:48:29 +00:00
Still making WIP
This commit is contained in:
@@ -41,6 +41,7 @@ from crewai.utilities.constants import (
|
||||
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
|
||||
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
|
||||
from crewai.utilities.event_helpers import (
|
||||
emit_crew_finish,
|
||||
emit_crew_start,
|
||||
emit_task_finish,
|
||||
emit_task_start,
|
||||
@@ -510,7 +511,7 @@ class Crew(BaseModel):
|
||||
for metric in metrics:
|
||||
self.usage_metrics.add_usage_metrics(metric)
|
||||
|
||||
# TODO: ADD CREW FINISH EVENT
|
||||
emit_crew_finish(self, result)
|
||||
|
||||
return result
|
||||
|
||||
@@ -852,10 +853,19 @@ class Crew(BaseModel):
|
||||
for future_task, future, task_index in futures:
|
||||
task_output = future.result()
|
||||
task_outputs.append(task_output)
|
||||
self._process_task_result(future_task, task_output)
|
||||
self._store_execution_log(
|
||||
future_task, task_output, task_index, was_replayed
|
||||
emit_task_finish(
|
||||
future_task,
|
||||
self._inputs if self._inputs else {},
|
||||
task_output,
|
||||
task_index,
|
||||
was_replayed,
|
||||
)
|
||||
|
||||
# TODO: ADD ELSEWHERE
|
||||
# self._process_task_result(future_task, task_output)
|
||||
# self._store_execution_log(
|
||||
# future_task, task_output, task_index, was_replayed
|
||||
# )
|
||||
return task_outputs
|
||||
|
||||
def _find_task_index(
|
||||
@@ -1040,6 +1050,7 @@ class Crew(BaseModel):
|
||||
inputs: Optional[Dict[str, Any]] = None,
|
||||
) -> None:
|
||||
"""Test and evaluate the Crew with the given inputs for n iterations concurrently using concurrent.futures."""
|
||||
# TODO: Event Emit test execution start
|
||||
self._test_execution_span = self._telemetry.test_execution_span(
|
||||
self,
|
||||
n_iterations,
|
||||
@@ -1053,6 +1064,7 @@ class Crew(BaseModel):
|
||||
self.kickoff(inputs=inputs)
|
||||
|
||||
evaluator.print_crew_evaluation_result()
|
||||
# TODO:
|
||||
|
||||
def __rshift__(self, other: "Crew") -> "Pipeline":
|
||||
"""
|
||||
|
||||
@@ -41,6 +41,19 @@ class CrewOutput(BaseModel):
|
||||
output_dict.update(self.pydantic.model_dump())
|
||||
return output_dict
|
||||
|
||||
def serialize(self) -> Dict[str, Any]:
|
||||
"""Serialize the CrewOutput into a dictionary excluding complex objects."""
|
||||
serialized_data = {
|
||||
"raw": self.raw,
|
||||
"pydantic": self.pydantic.model_dump() if self.pydantic else None,
|
||||
"json_dict": self.json_dict,
|
||||
"tasks_output": [
|
||||
task_output.serialize() for task_output in self.tasks_output
|
||||
],
|
||||
"token_usage": self.token_usage.model_dump(),
|
||||
}
|
||||
return {k: v for k, v in serialized_data.items() if v is not None}
|
||||
|
||||
def __getitem__(self, key):
|
||||
if self.pydantic and hasattr(self.pydantic, key):
|
||||
return getattr(self.pydantic, key)
|
||||
|
||||
@@ -56,6 +56,21 @@ class TaskOutput(BaseModel):
|
||||
output_dict.update(self.pydantic.model_dump())
|
||||
return output_dict
|
||||
|
||||
def serialize(self) -> Dict[str, Any]:
|
||||
"""Serialize the TaskOutput into a dictionary excluding complex objects."""
|
||||
serialized_data = {
|
||||
"description": self.description,
|
||||
"name": self.name,
|
||||
"expected_output": self.expected_output,
|
||||
"summary": self.summary,
|
||||
"raw": self.raw,
|
||||
"pydantic": self.pydantic.model_dump() if self.pydantic else None,
|
||||
"json_dict": self.json_dict,
|
||||
"agent": self.agent,
|
||||
"output_format": self.output_format.value,
|
||||
}
|
||||
return {k: v for k, v in serialized_data.items() if v is not None}
|
||||
|
||||
def __str__(self) -> str:
|
||||
if self.pydantic:
|
||||
return str(self.pydantic)
|
||||
|
||||
@@ -21,21 +21,15 @@ class CrewEventEmitter:
|
||||
self._all_signal = signal("all")
|
||||
|
||||
def on(self, event_name: CrewEvents | str, callback: Callable) -> None:
|
||||
print("Connecting signal:", event_name)
|
||||
if event_name == "*" or event_name == "all":
|
||||
self._all_signal.connect(callback, weak=False)
|
||||
print("Connected to all_signal")
|
||||
else:
|
||||
signal(
|
||||
event_name.value if isinstance(event_name, CrewEvents) else event_name
|
||||
).connect(callback, weak=False)
|
||||
|
||||
def emit(self, event_name: CrewEvents, *args: Any, **kwargs: Any) -> None:
|
||||
print(f"Emitting signal: {event_name.value}")
|
||||
print("args", args)
|
||||
print("kwargs", kwargs)
|
||||
signal(event_name.value).send(*args, **kwargs)
|
||||
print(f"Emitting all signal for: {event_name.value}")
|
||||
self._all_signal.send(*args, event=event_name.value, **kwargs)
|
||||
|
||||
|
||||
@@ -43,9 +37,6 @@ crew_events = CrewEventEmitter()
|
||||
|
||||
|
||||
def emit(event_name: CrewEvents, *args: Any, **kwargs: Any) -> None:
|
||||
print("Calling emit", event_name)
|
||||
print("Args:", args)
|
||||
print("Kwargs:", kwargs)
|
||||
try:
|
||||
crew_events.emit(event_name, *args, **kwargs)
|
||||
except Exception as e:
|
||||
|
||||
@@ -7,6 +7,7 @@ from crewai.utilities.event_emitter import CrewEvents, emit
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from crewai.crew import Crew
|
||||
from crewai.crews.crew_output import CrewOutput
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
|
||||
@@ -25,15 +26,16 @@ def emit_crew_start(
|
||||
)
|
||||
|
||||
|
||||
def emit_crew_finish(crew_id: str, name: str, result: Any, duration: float) -> None:
|
||||
def emit_crew_finish(crew: "Crew", result: "CrewOutput") -> None:
|
||||
serialized_crew = crew.serialize()
|
||||
serialized_result = result.serialize()
|
||||
print("emit crew finish")
|
||||
|
||||
emit(
|
||||
CrewEvents.CREW_FINISH,
|
||||
{
|
||||
"crew_id": crew_id,
|
||||
"name": name,
|
||||
"finish_time": datetime.now().isoformat(),
|
||||
"result": result,
|
||||
"duration": duration,
|
||||
**serialized_crew,
|
||||
"result": serialized_result,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -56,7 +58,7 @@ def emit_crew_failure(
|
||||
|
||||
|
||||
def emit_task_start(
|
||||
task: Task,
|
||||
task: "Task",
|
||||
agent_role: str = "None",
|
||||
) -> None:
|
||||
serialized_task = task.serialize()
|
||||
@@ -70,9 +72,9 @@ def emit_task_start(
|
||||
|
||||
|
||||
def emit_task_finish(
|
||||
task: Task,
|
||||
task: "Task",
|
||||
inputs: Dict[str, Any],
|
||||
output: TaskOutput,
|
||||
output: "TaskOutput",
|
||||
task_index: int,
|
||||
was_replayed: bool = False,
|
||||
) -> None:
|
||||
|
||||
Reference in New Issue
Block a user