Feature/kickoff for each sync (#680)

* Sync with deep copy working now

* async working!!

* Clean up code for review

* Fix naming

---------

Co-authored-by: João Moura <joaomdmoura@gmail.com>
This commit is contained in:
Brandon Hancock (bhancock_ai)
2024-06-11 11:51:39 -04:00
committed by GitHub
parent 3a77b418cb
commit e202592715
3 changed files with 190 additions and 39 deletions

View File

@@ -1,3 +1,4 @@
from copy import deepcopy
import os
import uuid
from typing import Any, Dict, List, Optional, Tuple
@@ -97,7 +98,8 @@ class Agent(BaseModel):
agent_executor: InstanceOf[CrewAgentExecutor] = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
crew: Any = Field(default=None, description="Crew to which the agent belongs.")
crew: Any = Field(
default=None, description="Crew to which the agent belongs.")
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
)
@@ -108,7 +110,8 @@ class Agent(BaseModel):
default=None,
description="Callback to be executed after each step of the agent execution.",
)
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
i18n: I18N = Field(
default=I18N(), description="Internationalization settings.")
llm: Any = Field(
default_factory=lambda: ChatOpenAI(
model=os.environ.get("OPENAI_MODEL_NAME", "gpt-4o")
@@ -169,7 +172,8 @@ class Agent(BaseModel):
def set_agent_executor(self) -> "Agent":
"""set agent executor is set."""
if hasattr(self.llm, "model_name"):
token_handler = TokenCalcHandler(self.llm.model_name, self._token_process)
token_handler = TokenCalcHandler(
self.llm.model_name, self._token_process)
# Ensure self.llm.callbacks is a list
if not isinstance(self.llm.callbacks, list):
@@ -204,7 +208,8 @@ class Agent(BaseModel):
Output of the agent
"""
if self.tools_handler:
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")
# type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")
self.tools_handler.last_used_tool = {}
task_prompt = task.prompt()
@@ -224,13 +229,15 @@ class Agent(BaseModel):
task_prompt += self.i18n.slice("memory").format(memory=memory)
tools = tools or self.tools
parsed_tools = self._parse_tools(tools) # type: ignore # Argument 1 to "_parse_tools" of "Agent" has incompatible type "list[Any] | None"; expected "list[Any]"
# type: ignore # Argument 1 to "_parse_tools" of "Agent" has incompatible type "list[Any] | None"; expected "list[Any]"
parsed_tools = self._parse_tools(tools)
self.create_agent_executor(tools=tools)
self.agent_executor.tools = parsed_tools
self.agent_executor.task = task
self.agent_executor.tools_description = render_text_description(parsed_tools)
self.agent_executor.tools_description = render_text_description(
parsed_tools)
self.agent_executor.tools_names = self.__tools_names(parsed_tools)
result = self.agent_executor.invoke(
@@ -328,7 +335,8 @@ class Agent(BaseModel):
)
bind = self.llm.bind(stop=stop_words)
inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(agent=self)
inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(
agent=self)
self.agent_executor = CrewAgentExecutor(
agent=RunnableAgent(runnable=inner_agent), **executor_args
)
@@ -363,8 +371,31 @@ class Agent(BaseModel):
thoughts += action.log
thoughts += f"\n{observation_prefix}{observation}\n{llm_prefix}"
return thoughts
def copy(self):
"""Create a deep copy of the Agent."""
exclude = {
"id",
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"agent_executor",
"tools",
"tools_handler",
"cache_handler",
}
def _parse_tools(self, tools: List[Any]) -> List[LangChainTool]: # type: ignore # Function "langchain_core.tools.tool" is not valid as a type
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
copied_agent = Agent(**copied_data)
copied_agent.tools = deepcopy(self.tools)
return copied_agent
# type: ignore # Function "langchain_core.tools.tool" is not valid as a type
def _parse_tools(self, tools: List[Any]) -> List[LangChainTool]:
"""Parse tools to be used for the task."""
# tentatively try to import from crewai_tools import BaseTool as CrewAITool
tools_list = []

View File

@@ -1,3 +1,4 @@
import asyncio
import json
import uuid
from typing import Any, Dict, List, Optional, Union
@@ -58,7 +59,8 @@ class Crew(BaseModel):
_rpm_controller: RPMController = PrivateAttr()
_logger: Logger = PrivateAttr()
_file_handler: FileHandler = PrivateAttr()
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler())
_cache_handler: InstanceOf[CacheHandler] = PrivateAttr(
default=CacheHandler())
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
@@ -153,7 +155,8 @@ class Crew(BaseModel):
self._logger = Logger(self.verbose)
if self.output_log_file:
self._file_handler = FileHandler(self.output_log_file)
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self._telemetry.crew_creation(self)
@@ -164,9 +167,7 @@ class Crew(BaseModel):
"""Set private attributes."""
if self.memory:
self._long_term_memory = LongTermMemory()
self._short_term_memory = ShortTermMemory(
crew=self, embedder_config=self.embedder
)
self._short_term_memory = ShortTermMemory(crew=self, embedder_config=self.embedder)
self._entity_memory = EntityMemory(crew=self, embedder_config=self.embedder)
return self
@@ -244,7 +245,8 @@ class Crew(BaseModel):
def kickoff(self, inputs: Optional[Dict[str, Any]] = {}) -> str:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self)
self._interpolate_inputs(inputs) # type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
self._interpolate_inputs(inputs)
self._set_tasks_callbacks()
i18n = I18N(prompt_file=self.prompt_file)
@@ -265,8 +267,10 @@ class Crew(BaseModel):
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result, manager_metrics = self._run_hierarchical_process() # type: ignore # Unpacking a string is disallowed
metrics.append(manager_metrics) # type: ignore # Cannot determine type of "manager_metrics"
# type: ignore # Unpacking a string is disallowed
result, manager_metrics = self._run_hierarchical_process()
# type: ignore # Cannot determine type of "manager_metrics"
metrics.append(manager_metrics)
else:
raise NotImplementedError(
@@ -282,6 +286,45 @@ class Crew(BaseModel):
return result
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List:
"""Executes the Crew's workflow for each input in the list and aggregates results."""
results = []
for input_data in inputs:
crew = self.copy()
for task in crew.tasks:
task.interpolate_inputs(input_data)
for agent in crew.agents:
agent.interpolate_inputs(input_data)
output = crew.kickoff()
results.append(output)
return results
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> Union[str, Dict]:
"""Asynchronous kickoff method to start the crew execution."""
return await asyncio.to_thread(self.kickoff, inputs)
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[Any]:
async def run_crew(input_data):
crew = self.copy()
for task in crew.tasks:
task.interpolate_inputs(input_data)
for agent in crew.agents:
agent.interpolate_inputs(input_data)
return await crew.kickoff_async()
tasks = [asyncio.create_task(run_crew(input_data))
for input_data in inputs]
results = await asyncio.gather(*tasks)
return results
def train(self, n_iterations: int) -> None:
# TODO: Implement training
pass
@@ -298,7 +341,8 @@ class Crew(BaseModel):
task.tools += AgentTools(agents=agents_for_delegation).tools()
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple")
self._logger.log("debug", f"== Working Agent: {
role}", color="bold_purple")
self._logger.log(
"info", f"== Starting Task: {task.description}", color="bold_purple"
)
@@ -313,14 +357,17 @@ class Crew(BaseModel):
task_output = output
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n")
self._logger.log("debug", f"== [{role}] Task output: {
task_output}\n\n")
if self.output_log_file:
self._file_handler.log(agent=role, task=task_output, status="completed")
self._file_handler.log(
agent=role, task=task_output, status="completed")
self._finish_execution(task_output)
return self._format_output(task_output)
def _run_hierarchical_process(self) -> str:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
@@ -335,7 +382,8 @@ class Crew(BaseModel):
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
backstory=i18n.retrieve(
"hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
llm=self.manager_llm,
verbose=True,
@@ -355,7 +403,8 @@ class Crew(BaseModel):
agent=manager, context=task_output, tools=manager.tools
)
self._logger.log("debug", f"[{manager.role}] Task output: {task_output}")
self._logger.log(
"debug", f"[{manager.role}] Task output: {task_output}")
if self.output_log_file:
self._file_handler.log(
@@ -363,7 +412,39 @@ class Crew(BaseModel):
)
self._finish_execution(task_output)
return self._format_output(task_output), manager._token_process.get_summary() # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
return self._format_output(task_output), manager._token_process.get_summary()
def copy(self):
"""Create a deep copy of the Crew."""
exclude = {
"id",
"_rpm_controller",
"_logger",
"_execution_span",
"_file_handler",
"_cache_handler",
"_short_term_memory",
"_long_term_memory",
"_entity_memory"
"agents",
"tasks",
}
cloned_agents = [agent.copy() for agent in self.agents]
cloned_tasks = [task.copy() for task in self.tasks]
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
copied_data.pop("agents", None)
copied_data.pop("tasks", None)
copied_crew = Crew(**copied_data, agents=cloned_agents, tasks=cloned_tasks)
return copied_crew
def _set_tasks_callbacks(self) -> None:
"""Sets callback for every task suing task_callback"""
@@ -373,8 +454,11 @@ class Crew(BaseModel):
def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolates the inputs in the tasks and agents."""
[task.interpolate_inputs(inputs) for task in self.tasks] # type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None)
[agent.interpolate_inputs(inputs) for agent in self.agents] # type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None)
[task.interpolate_inputs(
# type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None)
inputs) for task in self.tasks]
# type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None)
[agent.interpolate_inputs(inputs) for agent in self.agents]
def _format_output(self, output: str) -> str:
"""Formats the output of the crew execution."""

View File

@@ -1,3 +1,4 @@
from copy import deepcopy
import os
import re
import threading
@@ -163,13 +164,16 @@ class Task(BaseModel):
)
if self.context:
context = [] # type: ignore # Incompatible types in assignment (expression has type "list[Never]", variable has type "str | None")
# type: ignore # Incompatible types in assignment (expression has type "list[Never]", variable has type "str | None")
context = []
for task in self.context:
if task.async_execution:
task.thread.join() # type: ignore # Item "None" of "Thread | None" has no attribute "join"
if task and task.output:
context.append(task.output.raw_output) # type: ignore # Item "str" of "str | None" has no attribute "append"
context = "\n".join(context) # type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]"
# type: ignore # Item "str" of "str | None" has no attribute "append"
context.append(task.output.raw_output)
# type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]"
context = "\n".join(context)
self.prompt_context = context
tools = tools or self.tools
@@ -232,7 +236,8 @@ class Task(BaseModel):
if inputs:
self.description = self._original_description.format(**inputs)
self.expected_output = self._original_expected_output.format(**inputs)
self.expected_output = self._original_expected_output.format(
**inputs)
def increment_tools_errors(self) -> None:
"""Increment the tools errors counter."""
@@ -242,6 +247,25 @@ class Task(BaseModel):
"""Increment the delegations counter."""
self.delegations += 1
def copy(self):
"""Create a deep copy of the Task."""
exclude = {
"id",
"agent",
"context",
"tools",
}
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
cloned_context = [task.copy() for task in self.context] if self.context else None
cloned_agent = self.agent.copy() if self.agent else None
cloned_tools = deepcopy(self.tools) if self.tools else None
copied_task = Task(**copied_data, context=cloned_context, agent=cloned_agent, tools=cloned_tools)
return copied_task
def _export_output(self, result: str) -> Any:
exported_result = result
instructions = "I'm gonna convert this raw text into valid JSON."
@@ -251,27 +275,35 @@ class Task(BaseModel):
# try to convert task_output directly to pydantic/json
try:
exported_result = model.model_validate_json(result) # type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
# type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
exported_result = model.model_validate_json(result)
if self.output_json:
return exported_result.model_dump() # type: ignore # "str" has no attribute "model_dump"
# type: ignore # "str" has no attribute "model_dump"
return exported_result.model_dump()
return exported_result
except Exception:
# sometimes the response contains valid JSON in the middle of text
match = re.search(r"({.*})", result, re.DOTALL)
if match:
try:
exported_result = model.model_validate_json(match.group(0)) # type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
# type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
exported_result = model.model_validate_json(
match.group(0))
if self.output_json:
return exported_result.model_dump() # type: ignore # "str" has no attribute "model_dump"
# type: ignore # "str" has no attribute "model_dump"
return exported_result.model_dump()
return exported_result
except Exception:
pass
llm = self.agent.function_calling_llm or self.agent.llm # type: ignore # Item "None" of "Agent | None" has no attribute "function_calling_llm"
# type: ignore # Item "None" of "Agent | None" has no attribute "function_calling_llm"
llm = self.agent.function_calling_llm or self.agent.llm
if not self._is_gpt(llm):
model_schema = PydanticSchemaParser(model=model).get_schema() # type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]"
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
# type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]"
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{
instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
converter = Converter(
llm=llm, text=result, model=model, instructions=instructions
@@ -284,14 +316,16 @@ class Task(BaseModel):
if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{exported_result.message} Using raw output instead.",
content=f"{
exported_result.message} Using raw output instead.",
color="red",
)
exported_result = result
if self.output_file:
content = (
exported_result if not self.output_pydantic else exported_result.json() # type: ignore # "str" has no attribute "json"
# type: ignore # "str" has no attribute "json"
exported_result if not self.output_pydantic else exported_result.json()
)
self._save_file(content)
@@ -301,12 +335,14 @@ class Task(BaseModel):
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def _save_file(self, result: Any) -> None:
directory = os.path.dirname(self.output_file) # type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None"
# type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None"
directory = os.path.dirname(self.output_file)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(self.output_file, "w", encoding="utf-8") as file: # type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]"
# type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]"
with open(self.output_file, "w", encoding='utf-8') as file:
file.write(result)
return None