Merge pull request #1635 from crewAIInc/gui/kickoff-callbacks

Move kickoff callbacks to crew's domain
This commit is contained in:
Gui Vieira
2024-11-20 14:37:52 -03:00
committed by GitHub
10 changed files with 2056 additions and 565 deletions

View File

@@ -5,7 +5,7 @@ import uuid
import warnings
from concurrent.futures import Future
from hashlib import md5
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union
from pydantic import (
UUID4,
@@ -36,9 +36,7 @@ from crewai.telemetry import Telemetry
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import (
TRAINING_DATA_FILE,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.formatter import (
@@ -165,6 +163,16 @@ class Crew(BaseModel):
default=None,
description="Callback to be executed after each task for all agents execution.",
)
before_kickoff_callbacks: List[
Callable[[Optional[Dict[str, Any]]], Optional[Dict[str, Any]]]
] = Field(
default_factory=list,
description="List of callbacks to be executed before crew kickoff. It may be used to adjust inputs before the crew is executed.",
)
after_kickoff_callbacks: List[Callable[[CrewOutput], CrewOutput]] = Field(
default_factory=list,
description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.",
)
max_rpm: Optional[int] = Field(
default=None,
description="Maximum number of requests per minute for the crew execution to be respected.",
@@ -478,6 +486,9 @@ class Crew(BaseModel):
self,
inputs: Optional[Dict[str, Any]] = None,
) -> CrewOutput:
for callback in self.before_kickoff_callbacks:
inputs = callback(inputs)
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
self._task_output_handler.reset()
@@ -520,6 +531,9 @@ class Crew(BaseModel):
f"The process '{self.process}' is not implemented yet."
)
for callback in self.after_kickoff_callbacks:
result = callback(result)
metrics += [agent._token_process.get_summary() for agent in self.agents]
self.usage_metrics = UsageMetrics()

View File

@@ -1,5 +1,7 @@
from .annotations import (
after_kickoff,
agent,
before_kickoff,
cache_handler,
callback,
crew,
@@ -9,8 +11,6 @@ from .annotations import (
pipeline,
task,
tool,
before_crew,
after_crew,
)
from .crew_base import CrewBase
from .pipeline_base import PipelineBase
@@ -28,6 +28,6 @@ __all__ = [
"llm",
"cache_handler",
"pipeline",
"before_crew",
"after_crew",
"before_kickoff",
"after_kickoff",
]

View File

@@ -5,13 +5,13 @@ from crewai import Crew
from crewai.project.utils import memoize
def before_crew(func):
func.is_before_crew = True
def before_kickoff(func):
func.is_before_kickoff = True
return func
def after_crew(func):
func.is_after_crew = True
def after_kickoff(func):
func.is_after_kickoff = True
return func
@@ -109,6 +109,19 @@ def crew(func) -> Callable[..., Crew]:
self.agents = instantiated_agents
self.tasks = instantiated_tasks
return func(self, *args, **kwargs)
crew = func(self, *args, **kwargs)
return wrapper
def callback_wrapper(callback, instance):
def wrapper(*args, **kwargs):
return callback(instance, *args, **kwargs)
return wrapper
for _, callback in self._before_kickoff.items():
crew.before_kickoff_callbacks.append(callback_wrapper(callback, self))
for _, callback in self._after_kickoff.items():
crew.after_kickoff_callbacks.append(callback_wrapper(callback, self))
return crew
return memoize(wrapper)

View File

@@ -43,8 +43,8 @@ def CrewBase(cls: T) -> T:
for attr in [
"is_task",
"is_agent",
"is_before_crew",
"is_after_crew",
"is_before_kickoff",
"is_after_kickoff",
"is_kickoff",
]
)
@@ -57,11 +57,11 @@ def CrewBase(cls: T) -> T:
self._original_agents = self._filter_functions(
self._original_functions, "is_agent"
)
self._before_crew = self._filter_functions(
self._original_functions, "is_before_crew"
self._before_kickoff = self._filter_functions(
self._original_functions, "is_before_kickoff"
)
self._after_crew = self._filter_functions(
self._original_functions, "is_after_crew"
self._after_kickoff = self._filter_functions(
self._original_functions, "is_after_kickoff"
)
self._kickoff = self._filter_functions(
self._original_functions, "is_kickoff"
@@ -213,25 +213,4 @@ def CrewBase(cls: T) -> T:
callback_functions[callback]() for callback in callbacks
]
def kickoff(self, inputs=None):
# Execute before_crew functions and allow them to modify inputs
for _, func in self._before_crew.items():
modified_inputs = func(self, inputs)
if modified_inputs is not None:
inputs = modified_inputs
# Get the crew instance
crew_instance = self.crew()
# Execute the crew's tasks
result = crew_instance.kickoff(inputs=inputs)
# Execute after_crew functions and allow them to modify the output
for _, func in self._after_crew.items():
modified_result = func(self, result)
if modified_result is not None:
result = modified_result
return result
return cast(T, WrappedClass)