From 946c56494e21a726a08652f8acc26d5311b950d2 Mon Sep 17 00:00:00 2001 From: "Brandon Hancock (bhancock_ai)" <109994880+bhancockio@users.noreply.github.com> Date: Tue, 11 Jun 2024 11:51:39 -0400 Subject: [PATCH] Feature/kickoff for each sync (#680) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Sync with deep copy working now * async working!! * Clean up code for review * Fix naming --------- Co-authored-by: João Moura --- src/crewai/agent.py | 47 +++++++++++++++--- src/crewai/crew.py | 116 ++++++++++++++++++++++++++++++++++++++------ src/crewai/task.py | 66 +++++++++++++++++++------ 3 files changed, 190 insertions(+), 39 deletions(-) diff --git a/src/crewai/agent.py b/src/crewai/agent.py index d8b2cdec5..9fb70b3fd 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -1,3 +1,4 @@ +from copy import deepcopy import os import uuid from typing import Any, Dict, List, Optional, Tuple @@ -97,7 +98,8 @@ class Agent(BaseModel): agent_executor: InstanceOf[CrewAgentExecutor] = Field( default=None, description="An instance of the CrewAgentExecutor class." ) - crew: Any = Field(default=None, description="Crew to which the agent belongs.") + crew: Any = Field( + default=None, description="Crew to which the agent belongs.") tools_handler: InstanceOf[ToolsHandler] = Field( default=None, description="An instance of the ToolsHandler class." ) @@ -108,7 +110,8 @@ class Agent(BaseModel): default=None, description="Callback to be executed after each step of the agent execution.", ) - i18n: I18N = Field(default=I18N(), description="Internationalization settings.") + i18n: I18N = Field( + default=I18N(), description="Internationalization settings.") llm: Any = Field( default_factory=lambda: ChatOpenAI( model=os.environ.get("OPENAI_MODEL_NAME", "gpt-4o") @@ -169,7 +172,8 @@ class Agent(BaseModel): def set_agent_executor(self) -> "Agent": """set agent executor is set.""" if hasattr(self.llm, "model_name"): - token_handler = TokenCalcHandler(self.llm.model_name, self._token_process) + token_handler = TokenCalcHandler( + self.llm.model_name, self._token_process) # Ensure self.llm.callbacks is a list if not isinstance(self.llm.callbacks, list): @@ -204,7 +208,8 @@ class Agent(BaseModel): Output of the agent """ if self.tools_handler: - self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling") + # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling") + self.tools_handler.last_used_tool = {} task_prompt = task.prompt() @@ -224,13 +229,15 @@ class Agent(BaseModel): task_prompt += self.i18n.slice("memory").format(memory=memory) tools = tools or self.tools - parsed_tools = self._parse_tools(tools) # type: ignore # Argument 1 to "_parse_tools" of "Agent" has incompatible type "list[Any] | None"; expected "list[Any]" + # type: ignore # Argument 1 to "_parse_tools" of "Agent" has incompatible type "list[Any] | None"; expected "list[Any]" + parsed_tools = self._parse_tools(tools) self.create_agent_executor(tools=tools) self.agent_executor.tools = parsed_tools self.agent_executor.task = task - self.agent_executor.tools_description = render_text_description(parsed_tools) + self.agent_executor.tools_description = render_text_description( + parsed_tools) self.agent_executor.tools_names = self.__tools_names(parsed_tools) result = self.agent_executor.invoke( @@ -328,7 +335,8 @@ class Agent(BaseModel): ) bind = self.llm.bind(stop=stop_words) - inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(agent=self) + inner_agent = agent_args | execution_prompt | bind | CrewAgentParser( + agent=self) self.agent_executor = CrewAgentExecutor( agent=RunnableAgent(runnable=inner_agent), **executor_args ) @@ -363,8 +371,31 @@ class Agent(BaseModel): thoughts += action.log thoughts += f"\n{observation_prefix}{observation}\n{llm_prefix}" return thoughts + + def copy(self): + """Create a deep copy of the Agent.""" + exclude = { + "id", + "_logger", + "_rpm_controller", + "_request_within_rpm_limit", + "_token_process", + "agent_executor", + "tools", + "tools_handler", + "cache_handler", + } - def _parse_tools(self, tools: List[Any]) -> List[LangChainTool]: # type: ignore # Function "langchain_core.tools.tool" is not valid as a type + copied_data = self.model_dump(exclude=exclude) + copied_data = {k: v for k, v in copied_data.items() if v is not None} + + copied_agent = Agent(**copied_data) + copied_agent.tools = deepcopy(self.tools) + + return copied_agent + + # type: ignore # Function "langchain_core.tools.tool" is not valid as a type + def _parse_tools(self, tools: List[Any]) -> List[LangChainTool]: """Parse tools to be used for the task.""" # tentatively try to import from crewai_tools import BaseTool as CrewAITool tools_list = [] diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 61ebbcf14..974fdb9b3 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -1,3 +1,4 @@ +import asyncio import json import uuid from typing import Any, Dict, List, Optional, Union @@ -58,7 +59,8 @@ class Crew(BaseModel): _rpm_controller: RPMController = PrivateAttr() _logger: Logger = PrivateAttr() _file_handler: FileHandler = PrivateAttr() - _cache_handler: InstanceOf[CacheHandler] = PrivateAttr(default=CacheHandler()) + _cache_handler: InstanceOf[CacheHandler] = PrivateAttr( + default=CacheHandler()) _short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr() _long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr() _entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr() @@ -153,7 +155,8 @@ class Crew(BaseModel): self._logger = Logger(self.verbose) if self.output_log_file: self._file_handler = FileHandler(self.output_log_file) - self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger) + self._rpm_controller = RPMController( + max_rpm=self.max_rpm, logger=self._logger) self._telemetry = Telemetry() self._telemetry.set_tracer() self._telemetry.crew_creation(self) @@ -164,9 +167,7 @@ class Crew(BaseModel): """Set private attributes.""" if self.memory: self._long_term_memory = LongTermMemory() - self._short_term_memory = ShortTermMemory( - crew=self, embedder_config=self.embedder - ) + self._short_term_memory = ShortTermMemory(crew=self, embedder_config=self.embedder) self._entity_memory = EntityMemory(crew=self, embedder_config=self.embedder) return self @@ -244,7 +245,8 @@ class Crew(BaseModel): def kickoff(self, inputs: Optional[Dict[str, Any]] = {}) -> str: """Starts the crew to work on its assigned tasks.""" self._execution_span = self._telemetry.crew_execution_span(self) - self._interpolate_inputs(inputs) # type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]" + # type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]" + self._interpolate_inputs(inputs) self._set_tasks_callbacks() i18n = I18N(prompt_file=self.prompt_file) @@ -265,8 +267,10 @@ class Crew(BaseModel): if self.process == Process.sequential: result = self._run_sequential_process() elif self.process == Process.hierarchical: - result, manager_metrics = self._run_hierarchical_process() # type: ignore # Unpacking a string is disallowed - metrics.append(manager_metrics) # type: ignore # Cannot determine type of "manager_metrics" + # type: ignore # Unpacking a string is disallowed + result, manager_metrics = self._run_hierarchical_process() + # type: ignore # Cannot determine type of "manager_metrics" + metrics.append(manager_metrics) else: raise NotImplementedError( @@ -282,6 +286,45 @@ class Crew(BaseModel): return result + def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List: + """Executes the Crew's workflow for each input in the list and aggregates results.""" + results = [] + + for input_data in inputs: + crew = self.copy() + + for task in crew.tasks: + task.interpolate_inputs(input_data) + for agent in crew.agents: + agent.interpolate_inputs(input_data) + + output = crew.kickoff() + results.append(output) + + return results + + async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> Union[str, Dict]: + """Asynchronous kickoff method to start the crew execution.""" + return await asyncio.to_thread(self.kickoff, inputs) + + async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[Any]: + async def run_crew(input_data): + crew = self.copy() + + for task in crew.tasks: + task.interpolate_inputs(input_data) + for agent in crew.agents: + agent.interpolate_inputs(input_data) + + return await crew.kickoff_async() + + tasks = [asyncio.create_task(run_crew(input_data)) + for input_data in inputs] + + results = await asyncio.gather(*tasks) + + return results + def train(self, n_iterations: int) -> None: # TODO: Implement training pass @@ -298,7 +341,8 @@ class Crew(BaseModel): task.tools += AgentTools(agents=agents_for_delegation).tools() role = task.agent.role if task.agent is not None else "None" - self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple") + self._logger.log("debug", f"== Working Agent: { + role}", color="bold_purple") self._logger.log( "info", f"== Starting Task: {task.description}", color="bold_purple" ) @@ -313,14 +357,17 @@ class Crew(BaseModel): task_output = output role = task.agent.role if task.agent is not None else "None" - self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n") + self._logger.log("debug", f"== [{role}] Task output: { + task_output}\n\n") if self.output_log_file: - self._file_handler.log(agent=role, task=task_output, status="completed") + self._file_handler.log( + agent=role, task=task_output, status="completed") self._finish_execution(task_output) return self._format_output(task_output) + def _run_hierarchical_process(self) -> str: """Creates and assigns a manager agent to make sure the crew completes the tasks.""" @@ -335,7 +382,8 @@ class Crew(BaseModel): manager = Agent( role=i18n.retrieve("hierarchical_manager_agent", "role"), goal=i18n.retrieve("hierarchical_manager_agent", "goal"), - backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"), + backstory=i18n.retrieve( + "hierarchical_manager_agent", "backstory"), tools=AgentTools(agents=self.agents).tools(), llm=self.manager_llm, verbose=True, @@ -355,7 +403,8 @@ class Crew(BaseModel): agent=manager, context=task_output, tools=manager.tools ) - self._logger.log("debug", f"[{manager.role}] Task output: {task_output}") + self._logger.log( + "debug", f"[{manager.role}] Task output: {task_output}") if self.output_log_file: self._file_handler.log( @@ -363,7 +412,39 @@ class Crew(BaseModel): ) self._finish_execution(task_output) - return self._format_output(task_output), manager._token_process.get_summary() # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str") + # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str") + return self._format_output(task_output), manager._token_process.get_summary() + + def copy(self): + """Create a deep copy of the Crew.""" + + exclude = { + "id", + "_rpm_controller", + "_logger", + "_execution_span", + "_file_handler", + "_cache_handler", + "_short_term_memory", + "_long_term_memory", + "_entity_memory" + "agents", + "tasks", + } + + cloned_agents = [agent.copy() for agent in self.agents] + cloned_tasks = [task.copy() for task in self.tasks] + + copied_data = self.model_dump(exclude=exclude) + copied_data = {k: v for k, v in copied_data.items() if v is not None} + + copied_data.pop("agents", None) + copied_data.pop("tasks", None) + + copied_crew = Crew(**copied_data, agents=cloned_agents, tasks=cloned_tasks) + + return copied_crew + def _set_tasks_callbacks(self) -> None: """Sets callback for every task suing task_callback""" @@ -373,8 +454,11 @@ class Crew(BaseModel): def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None: """Interpolates the inputs in the tasks and agents.""" - [task.interpolate_inputs(inputs) for task in self.tasks] # type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None) - [agent.interpolate_inputs(inputs) for agent in self.agents] # type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None) + [task.interpolate_inputs( + # type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None) + inputs) for task in self.tasks] + # type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None) + [agent.interpolate_inputs(inputs) for agent in self.agents] def _format_output(self, output: str) -> str: """Formats the output of the crew execution.""" diff --git a/src/crewai/task.py b/src/crewai/task.py index 715b09534..2c618fe8d 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -1,3 +1,4 @@ +from copy import deepcopy import os import re import threading @@ -163,13 +164,16 @@ class Task(BaseModel): ) if self.context: - context = [] # type: ignore # Incompatible types in assignment (expression has type "list[Never]", variable has type "str | None") + # type: ignore # Incompatible types in assignment (expression has type "list[Never]", variable has type "str | None") + context = [] for task in self.context: if task.async_execution: task.thread.join() # type: ignore # Item "None" of "Thread | None" has no attribute "join" if task and task.output: - context.append(task.output.raw_output) # type: ignore # Item "str" of "str | None" has no attribute "append" - context = "\n".join(context) # type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]" + # type: ignore # Item "str" of "str | None" has no attribute "append" + context.append(task.output.raw_output) + # type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]" + context = "\n".join(context) self.prompt_context = context tools = tools or self.tools @@ -232,7 +236,8 @@ class Task(BaseModel): if inputs: self.description = self._original_description.format(**inputs) - self.expected_output = self._original_expected_output.format(**inputs) + self.expected_output = self._original_expected_output.format( + **inputs) def increment_tools_errors(self) -> None: """Increment the tools errors counter.""" @@ -242,6 +247,25 @@ class Task(BaseModel): """Increment the delegations counter.""" self.delegations += 1 + def copy(self): + """Create a deep copy of the Task.""" + exclude = { + "id", + "agent", + "context", + "tools", + } + + copied_data = self.model_dump(exclude=exclude) + copied_data = {k: v for k, v in copied_data.items() if v is not None} + + cloned_context = [task.copy() for task in self.context] if self.context else None + cloned_agent = self.agent.copy() if self.agent else None + cloned_tools = deepcopy(self.tools) if self.tools else None + + copied_task = Task(**copied_data, context=cloned_context, agent=cloned_agent, tools=cloned_tools) + return copied_task + def _export_output(self, result: str) -> Any: exported_result = result instructions = "I'm gonna convert this raw text into valid JSON." @@ -251,27 +275,35 @@ class Task(BaseModel): # try to convert task_output directly to pydantic/json try: - exported_result = model.model_validate_json(result) # type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json" + # type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json" + exported_result = model.model_validate_json(result) if self.output_json: - return exported_result.model_dump() # type: ignore # "str" has no attribute "model_dump" + # type: ignore # "str" has no attribute "model_dump" + return exported_result.model_dump() return exported_result except Exception: # sometimes the response contains valid JSON in the middle of text match = re.search(r"({.*})", result, re.DOTALL) if match: try: - exported_result = model.model_validate_json(match.group(0)) # type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json" + # type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json" + exported_result = model.model_validate_json( + match.group(0)) if self.output_json: - return exported_result.model_dump() # type: ignore # "str" has no attribute "model_dump" + # type: ignore # "str" has no attribute "model_dump" + return exported_result.model_dump() return exported_result except Exception: pass - llm = self.agent.function_calling_llm or self.agent.llm # type: ignore # Item "None" of "Agent | None" has no attribute "function_calling_llm" + # type: ignore # Item "None" of "Agent | None" has no attribute "function_calling_llm" + llm = self.agent.function_calling_llm or self.agent.llm if not self._is_gpt(llm): - model_schema = PydanticSchemaParser(model=model).get_schema() # type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]" - instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}" + # type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]" + model_schema = PydanticSchemaParser(model=model).get_schema() + instructions = f"{ + instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}" converter = Converter( llm=llm, text=result, model=model, instructions=instructions @@ -284,14 +316,16 @@ class Task(BaseModel): if isinstance(exported_result, ConverterError): Printer().print( - content=f"{exported_result.message} Using raw output instead.", + content=f"{ + exported_result.message} Using raw output instead.", color="red", ) exported_result = result if self.output_file: content = ( - exported_result if not self.output_pydantic else exported_result.json() # type: ignore # "str" has no attribute "json" + # type: ignore # "str" has no attribute "json" + exported_result if not self.output_pydantic else exported_result.json() ) self._save_file(content) @@ -301,12 +335,14 @@ class Task(BaseModel): return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None def _save_file(self, result: Any) -> None: - directory = os.path.dirname(self.output_file) # type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None" + # type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None" + directory = os.path.dirname(self.output_file) if directory and not os.path.exists(directory): os.makedirs(directory) - with open(self.output_file, "w", encoding="utf-8") as file: # type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]" + # type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]" + with open(self.output_file, "w", encoding='utf-8') as file: file.write(result) return None