mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
Merge branch 'gl/feat/async-crew-support' into gl/feat/async-flow-kickoff
This commit is contained in:
@@ -2,7 +2,6 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
from collections.abc import Sequence
|
from collections.abc import Sequence
|
||||||
import json
|
|
||||||
import shutil
|
import shutil
|
||||||
import subprocess
|
import subprocess
|
||||||
import time
|
import time
|
||||||
@@ -19,6 +18,19 @@ from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator
|
|||||||
from typing_extensions import Self
|
from typing_extensions import Self
|
||||||
|
|
||||||
from crewai.a2a.config import A2AConfig
|
from crewai.a2a.config import A2AConfig
|
||||||
|
from crewai.agent.utils import (
|
||||||
|
ahandle_knowledge_retrieval,
|
||||||
|
apply_training_data,
|
||||||
|
build_task_prompt_with_schema,
|
||||||
|
format_task_with_context,
|
||||||
|
get_knowledge_config,
|
||||||
|
handle_knowledge_retrieval,
|
||||||
|
handle_reasoning,
|
||||||
|
prepare_tools,
|
||||||
|
process_tool_results,
|
||||||
|
save_last_messages,
|
||||||
|
validate_max_execution_time,
|
||||||
|
)
|
||||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||||
from crewai.agents.cache.cache_handler import CacheHandler
|
from crewai.agents.cache.cache_handler import CacheHandler
|
||||||
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
||||||
@@ -27,9 +39,6 @@ from crewai.events.types.knowledge_events import (
|
|||||||
KnowledgeQueryCompletedEvent,
|
KnowledgeQueryCompletedEvent,
|
||||||
KnowledgeQueryFailedEvent,
|
KnowledgeQueryFailedEvent,
|
||||||
KnowledgeQueryStartedEvent,
|
KnowledgeQueryStartedEvent,
|
||||||
KnowledgeRetrievalCompletedEvent,
|
|
||||||
KnowledgeRetrievalStartedEvent,
|
|
||||||
KnowledgeSearchQueryFailedEvent,
|
|
||||||
)
|
)
|
||||||
from crewai.events.types.memory_events import (
|
from crewai.events.types.memory_events import (
|
||||||
MemoryRetrievalCompletedEvent,
|
MemoryRetrievalCompletedEvent,
|
||||||
@@ -37,7 +46,6 @@ from crewai.events.types.memory_events import (
|
|||||||
)
|
)
|
||||||
from crewai.knowledge.knowledge import Knowledge
|
from crewai.knowledge.knowledge import Knowledge
|
||||||
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
|
||||||
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
|
|
||||||
from crewai.lite_agent import LiteAgent
|
from crewai.lite_agent import LiteAgent
|
||||||
from crewai.llms.base_llm import BaseLLM
|
from crewai.llms.base_llm import BaseLLM
|
||||||
from crewai.mcp import (
|
from crewai.mcp import (
|
||||||
@@ -61,7 +69,7 @@ from crewai.utilities.agent_utils import (
|
|||||||
render_text_description_and_args,
|
render_text_description_and_args,
|
||||||
)
|
)
|
||||||
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
|
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
|
||||||
from crewai.utilities.converter import Converter, generate_model_description
|
from crewai.utilities.converter import Converter
|
||||||
from crewai.utilities.guardrail_types import GuardrailType
|
from crewai.utilities.guardrail_types import GuardrailType
|
||||||
from crewai.utilities.llm_utils import create_llm
|
from crewai.utilities.llm_utils import create_llm
|
||||||
from crewai.utilities.prompts import Prompts
|
from crewai.utilities.prompts import Prompts
|
||||||
@@ -295,53 +303,15 @@ class Agent(BaseAgent):
|
|||||||
ValueError: If the max execution time is not a positive integer.
|
ValueError: If the max execution time is not a positive integer.
|
||||||
RuntimeError: If the agent execution fails for other reasons.
|
RuntimeError: If the agent execution fails for other reasons.
|
||||||
"""
|
"""
|
||||||
if self.reasoning:
|
handle_reasoning(self, task)
|
||||||
try:
|
|
||||||
from crewai.utilities.reasoning_handler import (
|
|
||||||
AgentReasoning,
|
|
||||||
AgentReasoningOutput,
|
|
||||||
)
|
|
||||||
|
|
||||||
reasoning_handler = AgentReasoning(task=task, agent=self)
|
|
||||||
reasoning_output: AgentReasoningOutput = (
|
|
||||||
reasoning_handler.handle_agent_reasoning()
|
|
||||||
)
|
|
||||||
|
|
||||||
# Add the reasoning plan to the task description
|
|
||||||
task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"
|
|
||||||
except Exception as e:
|
|
||||||
self._logger.log("error", f"Error during reasoning process: {e!s}")
|
|
||||||
self._inject_date_to_task(task)
|
self._inject_date_to_task(task)
|
||||||
|
|
||||||
if self.tools_handler:
|
if self.tools_handler:
|
||||||
self.tools_handler.last_used_tool = None
|
self.tools_handler.last_used_tool = None
|
||||||
|
|
||||||
task_prompt = task.prompt()
|
task_prompt = task.prompt()
|
||||||
|
task_prompt = build_task_prompt_with_schema(task, task_prompt, self.i18n)
|
||||||
# If the task requires output in JSON or Pydantic format,
|
task_prompt = format_task_with_context(task_prompt, context, self.i18n)
|
||||||
# append specific instructions to the task prompt to ensure
|
|
||||||
# that the final answer does not include any code block markers
|
|
||||||
# Skip this if task.response_model is set, as native structured outputs handle schema automatically
|
|
||||||
if (task.output_json or task.output_pydantic) and not task.response_model:
|
|
||||||
# Generate the schema based on the output format
|
|
||||||
if task.output_json:
|
|
||||||
schema_dict = generate_model_description(task.output_json)
|
|
||||||
schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2)
|
|
||||||
task_prompt += "\n" + self.i18n.slice(
|
|
||||||
"formatted_task_instructions"
|
|
||||||
).format(output_format=schema)
|
|
||||||
|
|
||||||
elif task.output_pydantic:
|
|
||||||
schema_dict = generate_model_description(task.output_pydantic)
|
|
||||||
schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2)
|
|
||||||
task_prompt += "\n" + self.i18n.slice(
|
|
||||||
"formatted_task_instructions"
|
|
||||||
).format(output_format=schema)
|
|
||||||
|
|
||||||
if context:
|
|
||||||
task_prompt = self.i18n.slice("task_with_context").format(
|
|
||||||
task=task_prompt, context=context
|
|
||||||
)
|
|
||||||
|
|
||||||
if self._is_any_available_memory():
|
if self._is_any_available_memory():
|
||||||
crewai_event_bus.emit(
|
crewai_event_bus.emit(
|
||||||
@@ -379,84 +349,20 @@ class Agent(BaseAgent):
|
|||||||
from_task=task,
|
from_task=task,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
knowledge_config = (
|
|
||||||
self.knowledge_config.model_dump() if self.knowledge_config else {}
|
knowledge_config = get_knowledge_config(self)
|
||||||
|
task_prompt = handle_knowledge_retrieval(
|
||||||
|
self,
|
||||||
|
task,
|
||||||
|
task_prompt,
|
||||||
|
knowledge_config,
|
||||||
|
self.knowledge.query if self.knowledge else lambda *a, **k: None,
|
||||||
|
self.crew.query_knowledge if self.crew else lambda *a, **k: None,
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.knowledge or (self.crew and self.crew.knowledge):
|
prepare_tools(self, tools, task)
|
||||||
crewai_event_bus.emit(
|
task_prompt = apply_training_data(self, task_prompt)
|
||||||
self,
|
|
||||||
event=KnowledgeRetrievalStartedEvent(
|
|
||||||
from_task=task,
|
|
||||||
from_agent=self,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
self.knowledge_search_query = self._get_knowledge_search_query(
|
|
||||||
task_prompt, task
|
|
||||||
)
|
|
||||||
if self.knowledge_search_query:
|
|
||||||
# Quering agent specific knowledge
|
|
||||||
if self.knowledge:
|
|
||||||
agent_knowledge_snippets = self.knowledge.query(
|
|
||||||
[self.knowledge_search_query], **knowledge_config
|
|
||||||
)
|
|
||||||
if agent_knowledge_snippets:
|
|
||||||
self.agent_knowledge_context = extract_knowledge_context(
|
|
||||||
agent_knowledge_snippets
|
|
||||||
)
|
|
||||||
if self.agent_knowledge_context:
|
|
||||||
task_prompt += self.agent_knowledge_context
|
|
||||||
|
|
||||||
# Quering crew specific knowledge
|
|
||||||
knowledge_snippets = self.crew.query_knowledge(
|
|
||||||
[self.knowledge_search_query], **knowledge_config
|
|
||||||
)
|
|
||||||
if knowledge_snippets:
|
|
||||||
self.crew_knowledge_context = extract_knowledge_context(
|
|
||||||
knowledge_snippets
|
|
||||||
)
|
|
||||||
if self.crew_knowledge_context:
|
|
||||||
task_prompt += self.crew_knowledge_context
|
|
||||||
|
|
||||||
crewai_event_bus.emit(
|
|
||||||
self,
|
|
||||||
event=KnowledgeRetrievalCompletedEvent(
|
|
||||||
query=self.knowledge_search_query,
|
|
||||||
from_task=task,
|
|
||||||
from_agent=self,
|
|
||||||
retrieved_knowledge=(
|
|
||||||
(self.agent_knowledge_context or "")
|
|
||||||
+ (
|
|
||||||
"\n"
|
|
||||||
if self.agent_knowledge_context
|
|
||||||
and self.crew_knowledge_context
|
|
||||||
else ""
|
|
||||||
)
|
|
||||||
+ (self.crew_knowledge_context or "")
|
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
crewai_event_bus.emit(
|
|
||||||
self,
|
|
||||||
event=KnowledgeSearchQueryFailedEvent(
|
|
||||||
query=self.knowledge_search_query or "",
|
|
||||||
error=str(e),
|
|
||||||
from_task=task,
|
|
||||||
from_agent=self,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
tools = tools or self.tools or []
|
|
||||||
self.create_agent_executor(tools=tools, task=task)
|
|
||||||
|
|
||||||
if self.crew and self.crew._train:
|
|
||||||
task_prompt = self._training_handler(task_prompt=task_prompt)
|
|
||||||
else:
|
|
||||||
task_prompt = self._use_trained_data(task_prompt=task_prompt)
|
|
||||||
|
|
||||||
# Import agent events locally to avoid circular imports
|
|
||||||
from crewai.events.types.agent_events import (
|
from crewai.events.types.agent_events import (
|
||||||
AgentExecutionCompletedEvent,
|
AgentExecutionCompletedEvent,
|
||||||
AgentExecutionErrorEvent,
|
AgentExecutionErrorEvent,
|
||||||
@@ -474,15 +380,8 @@ class Agent(BaseAgent):
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
# Determine execution method based on timeout setting
|
validate_max_execution_time(self.max_execution_time)
|
||||||
if self.max_execution_time is not None:
|
if self.max_execution_time is not None:
|
||||||
if (
|
|
||||||
not isinstance(self.max_execution_time, int)
|
|
||||||
or self.max_execution_time <= 0
|
|
||||||
):
|
|
||||||
raise ValueError(
|
|
||||||
"Max Execution time must be a positive integer greater than zero"
|
|
||||||
)
|
|
||||||
result = self._execute_with_timeout(
|
result = self._execute_with_timeout(
|
||||||
task_prompt, task, self.max_execution_time
|
task_prompt, task, self.max_execution_time
|
||||||
)
|
)
|
||||||
@@ -490,7 +389,6 @@ class Agent(BaseAgent):
|
|||||||
result = self._execute_without_timeout(task_prompt, task)
|
result = self._execute_without_timeout(task_prompt, task)
|
||||||
|
|
||||||
except TimeoutError as e:
|
except TimeoutError as e:
|
||||||
# Propagate TimeoutError without retry
|
|
||||||
crewai_event_bus.emit(
|
crewai_event_bus.emit(
|
||||||
self,
|
self,
|
||||||
event=AgentExecutionErrorEvent(
|
event=AgentExecutionErrorEvent(
|
||||||
@@ -502,7 +400,6 @@ class Agent(BaseAgent):
|
|||||||
raise e
|
raise e
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if e.__class__.__module__.startswith("litellm"):
|
if e.__class__.__module__.startswith("litellm"):
|
||||||
# Do not retry on litellm errors
|
|
||||||
crewai_event_bus.emit(
|
crewai_event_bus.emit(
|
||||||
self,
|
self,
|
||||||
event=AgentExecutionErrorEvent(
|
event=AgentExecutionErrorEvent(
|
||||||
@@ -528,23 +425,13 @@ class Agent(BaseAgent):
|
|||||||
if self.max_rpm and self._rpm_controller:
|
if self.max_rpm and self._rpm_controller:
|
||||||
self._rpm_controller.stop_rpm_counter()
|
self._rpm_controller.stop_rpm_counter()
|
||||||
|
|
||||||
# If there was any tool in self.tools_results that had result_as_answer
|
result = process_tool_results(self, result)
|
||||||
# set to True, return the results of the last tool that had
|
|
||||||
# result_as_answer set to True
|
|
||||||
for tool_result in self.tools_results:
|
|
||||||
if tool_result.get("result_as_answer", False):
|
|
||||||
result = tool_result["result"]
|
|
||||||
crewai_event_bus.emit(
|
crewai_event_bus.emit(
|
||||||
self,
|
self,
|
||||||
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
|
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
|
||||||
)
|
)
|
||||||
|
|
||||||
self._last_messages = (
|
save_last_messages(self)
|
||||||
self.agent_executor.messages.copy()
|
|
||||||
if self.agent_executor and hasattr(self.agent_executor, "messages")
|
|
||||||
else []
|
|
||||||
)
|
|
||||||
|
|
||||||
self._cleanup_mcp_clients()
|
self._cleanup_mcp_clients()
|
||||||
|
|
||||||
return result
|
return result
|
||||||
@@ -625,47 +512,15 @@ class Agent(BaseAgent):
|
|||||||
ValueError: If the max execution time is not a positive integer.
|
ValueError: If the max execution time is not a positive integer.
|
||||||
RuntimeError: If the agent execution fails for other reasons.
|
RuntimeError: If the agent execution fails for other reasons.
|
||||||
"""
|
"""
|
||||||
if self.reasoning:
|
handle_reasoning(self, task)
|
||||||
try:
|
|
||||||
from crewai.utilities.reasoning_handler import (
|
|
||||||
AgentReasoning,
|
|
||||||
AgentReasoningOutput,
|
|
||||||
)
|
|
||||||
|
|
||||||
reasoning_handler = AgentReasoning(task=task, agent=self)
|
|
||||||
reasoning_output: AgentReasoningOutput = (
|
|
||||||
reasoning_handler.handle_agent_reasoning()
|
|
||||||
)
|
|
||||||
|
|
||||||
task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"
|
|
||||||
except Exception as e:
|
|
||||||
self._logger.log("error", f"Error during reasoning process: {e!s}")
|
|
||||||
self._inject_date_to_task(task)
|
self._inject_date_to_task(task)
|
||||||
|
|
||||||
if self.tools_handler:
|
if self.tools_handler:
|
||||||
self.tools_handler.last_used_tool = None
|
self.tools_handler.last_used_tool = None
|
||||||
|
|
||||||
task_prompt = task.prompt()
|
task_prompt = task.prompt()
|
||||||
|
task_prompt = build_task_prompt_with_schema(task, task_prompt, self.i18n)
|
||||||
if (task.output_json or task.output_pydantic) and not task.response_model:
|
task_prompt = format_task_with_context(task_prompt, context, self.i18n)
|
||||||
if task.output_json:
|
|
||||||
schema_dict = generate_model_description(task.output_json)
|
|
||||||
schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2)
|
|
||||||
task_prompt += "\n" + self.i18n.slice(
|
|
||||||
"formatted_task_instructions"
|
|
||||||
).format(output_format=schema)
|
|
||||||
|
|
||||||
elif task.output_pydantic:
|
|
||||||
schema_dict = generate_model_description(task.output_pydantic)
|
|
||||||
schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2)
|
|
||||||
task_prompt += "\n" + self.i18n.slice(
|
|
||||||
"formatted_task_instructions"
|
|
||||||
).format(output_format=schema)
|
|
||||||
|
|
||||||
if context:
|
|
||||||
task_prompt = self.i18n.slice("task_with_context").format(
|
|
||||||
task=task_prompt, context=context
|
|
||||||
)
|
|
||||||
|
|
||||||
if self._is_any_available_memory():
|
if self._is_any_available_memory():
|
||||||
crewai_event_bus.emit(
|
crewai_event_bus.emit(
|
||||||
@@ -705,80 +560,14 @@ class Agent(BaseAgent):
|
|||||||
from_task=task,
|
from_task=task,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
knowledge_config = (
|
|
||||||
self.knowledge_config.model_dump() if self.knowledge_config else {}
|
knowledge_config = get_knowledge_config(self)
|
||||||
|
task_prompt = await ahandle_knowledge_retrieval(
|
||||||
|
self, task, task_prompt, knowledge_config
|
||||||
)
|
)
|
||||||
|
|
||||||
if self.knowledge or (self.crew and self.crew.knowledge):
|
prepare_tools(self, tools, task)
|
||||||
crewai_event_bus.emit(
|
task_prompt = apply_training_data(self, task_prompt)
|
||||||
self,
|
|
||||||
event=KnowledgeRetrievalStartedEvent(
|
|
||||||
from_task=task,
|
|
||||||
from_agent=self,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
self.knowledge_search_query = self._get_knowledge_search_query(
|
|
||||||
task_prompt, task
|
|
||||||
)
|
|
||||||
if self.knowledge_search_query:
|
|
||||||
if self.knowledge:
|
|
||||||
agent_knowledge_snippets = await self.knowledge.aquery(
|
|
||||||
[self.knowledge_search_query], **knowledge_config
|
|
||||||
)
|
|
||||||
if agent_knowledge_snippets:
|
|
||||||
self.agent_knowledge_context = extract_knowledge_context(
|
|
||||||
agent_knowledge_snippets
|
|
||||||
)
|
|
||||||
if self.agent_knowledge_context:
|
|
||||||
task_prompt += self.agent_knowledge_context
|
|
||||||
|
|
||||||
knowledge_snippets = await self.crew.aquery_knowledge(
|
|
||||||
[self.knowledge_search_query], **knowledge_config
|
|
||||||
)
|
|
||||||
if knowledge_snippets:
|
|
||||||
self.crew_knowledge_context = extract_knowledge_context(
|
|
||||||
knowledge_snippets
|
|
||||||
)
|
|
||||||
if self.crew_knowledge_context:
|
|
||||||
task_prompt += self.crew_knowledge_context
|
|
||||||
|
|
||||||
crewai_event_bus.emit(
|
|
||||||
self,
|
|
||||||
event=KnowledgeRetrievalCompletedEvent(
|
|
||||||
query=self.knowledge_search_query,
|
|
||||||
from_task=task,
|
|
||||||
from_agent=self,
|
|
||||||
retrieved_knowledge=(
|
|
||||||
(self.agent_knowledge_context or "")
|
|
||||||
+ (
|
|
||||||
"\n"
|
|
||||||
if self.agent_knowledge_context
|
|
||||||
and self.crew_knowledge_context
|
|
||||||
else ""
|
|
||||||
)
|
|
||||||
+ (self.crew_knowledge_context or "")
|
|
||||||
),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
crewai_event_bus.emit(
|
|
||||||
self,
|
|
||||||
event=KnowledgeSearchQueryFailedEvent(
|
|
||||||
query=self.knowledge_search_query or "",
|
|
||||||
error=str(e),
|
|
||||||
from_task=task,
|
|
||||||
from_agent=self,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
tools = tools or self.tools or []
|
|
||||||
self.create_agent_executor(tools=tools, task=task)
|
|
||||||
|
|
||||||
if self.crew and self.crew._train:
|
|
||||||
task_prompt = self._training_handler(task_prompt=task_prompt)
|
|
||||||
else:
|
|
||||||
task_prompt = self._use_trained_data(task_prompt=task_prompt)
|
|
||||||
|
|
||||||
from crewai.events.types.agent_events import (
|
from crewai.events.types.agent_events import (
|
||||||
AgentExecutionCompletedEvent,
|
AgentExecutionCompletedEvent,
|
||||||
@@ -797,14 +586,8 @@ class Agent(BaseAgent):
|
|||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
validate_max_execution_time(self.max_execution_time)
|
||||||
if self.max_execution_time is not None:
|
if self.max_execution_time is not None:
|
||||||
if (
|
|
||||||
not isinstance(self.max_execution_time, int)
|
|
||||||
or self.max_execution_time <= 0
|
|
||||||
):
|
|
||||||
raise ValueError(
|
|
||||||
"Max Execution time must be a positive integer greater than zero"
|
|
||||||
)
|
|
||||||
result = await self._aexecute_with_timeout(
|
result = await self._aexecute_with_timeout(
|
||||||
task_prompt, task, self.max_execution_time
|
task_prompt, task, self.max_execution_time
|
||||||
)
|
)
|
||||||
@@ -848,20 +631,13 @@ class Agent(BaseAgent):
|
|||||||
if self.max_rpm and self._rpm_controller:
|
if self.max_rpm and self._rpm_controller:
|
||||||
self._rpm_controller.stop_rpm_counter()
|
self._rpm_controller.stop_rpm_counter()
|
||||||
|
|
||||||
for tool_result in self.tools_results:
|
result = process_tool_results(self, result)
|
||||||
if tool_result.get("result_as_answer", False):
|
|
||||||
result = tool_result["result"]
|
|
||||||
crewai_event_bus.emit(
|
crewai_event_bus.emit(
|
||||||
self,
|
self,
|
||||||
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
|
event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
|
||||||
)
|
)
|
||||||
|
|
||||||
self._last_messages = (
|
save_last_messages(self)
|
||||||
self.agent_executor.messages.copy()
|
|
||||||
if self.agent_executor and hasattr(self.agent_executor, "messages")
|
|
||||||
else []
|
|
||||||
)
|
|
||||||
|
|
||||||
self._cleanup_mcp_clients()
|
self._cleanup_mcp_clients()
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
|||||||
355
lib/crewai/src/crewai/agent/utils.py
Normal file
355
lib/crewai/src/crewai/agent/utils.py
Normal file
@@ -0,0 +1,355 @@
|
|||||||
|
"""Utility functions for agent task execution.
|
||||||
|
|
||||||
|
This module contains shared logic extracted from the Agent's execute_task
|
||||||
|
and aexecute_task methods to reduce code duplication.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
|
from crewai.events.event_bus import crewai_event_bus
|
||||||
|
from crewai.events.types.knowledge_events import (
|
||||||
|
KnowledgeRetrievalCompletedEvent,
|
||||||
|
KnowledgeRetrievalStartedEvent,
|
||||||
|
KnowledgeSearchQueryFailedEvent,
|
||||||
|
)
|
||||||
|
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
|
||||||
|
from crewai.utilities.converter import generate_model_description
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from crewai.agent.core import Agent
|
||||||
|
from crewai.task import Task
|
||||||
|
from crewai.tools.base_tool import BaseTool
|
||||||
|
from crewai.utilities.i18n import I18N
|
||||||
|
|
||||||
|
|
||||||
|
def handle_reasoning(agent: Agent, task: Task) -> None:
|
||||||
|
"""Handle the reasoning process for an agent before task execution.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The agent performing the task.
|
||||||
|
task: The task to execute.
|
||||||
|
"""
|
||||||
|
if not agent.reasoning:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
from crewai.utilities.reasoning_handler import (
|
||||||
|
AgentReasoning,
|
||||||
|
AgentReasoningOutput,
|
||||||
|
)
|
||||||
|
|
||||||
|
reasoning_handler = AgentReasoning(task=task, agent=agent)
|
||||||
|
reasoning_output: AgentReasoningOutput = (
|
||||||
|
reasoning_handler.handle_agent_reasoning()
|
||||||
|
)
|
||||||
|
task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"
|
||||||
|
except Exception as e:
|
||||||
|
agent._logger.log("error", f"Error during reasoning process: {e!s}")
|
||||||
|
|
||||||
|
|
||||||
|
def build_task_prompt_with_schema(task: Task, task_prompt: str, i18n: I18N) -> str:
|
||||||
|
"""Build task prompt with JSON/Pydantic schema instructions if applicable.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task: The task being executed.
|
||||||
|
task_prompt: The initial task prompt.
|
||||||
|
i18n: Internationalization instance.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The task prompt potentially augmented with schema instructions.
|
||||||
|
"""
|
||||||
|
if (task.output_json or task.output_pydantic) and not task.response_model:
|
||||||
|
if task.output_json:
|
||||||
|
schema_dict = generate_model_description(task.output_json)
|
||||||
|
schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2)
|
||||||
|
task_prompt += "\n" + i18n.slice("formatted_task_instructions").format(
|
||||||
|
output_format=schema
|
||||||
|
)
|
||||||
|
elif task.output_pydantic:
|
||||||
|
schema_dict = generate_model_description(task.output_pydantic)
|
||||||
|
schema = json.dumps(schema_dict["json_schema"]["schema"], indent=2)
|
||||||
|
task_prompt += "\n" + i18n.slice("formatted_task_instructions").format(
|
||||||
|
output_format=schema
|
||||||
|
)
|
||||||
|
return task_prompt
|
||||||
|
|
||||||
|
|
||||||
|
def format_task_with_context(task_prompt: str, context: str | None, i18n: I18N) -> str:
|
||||||
|
"""Format task prompt with context if provided.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
task_prompt: The task prompt.
|
||||||
|
context: Optional context string.
|
||||||
|
i18n: Internationalization instance.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The task prompt formatted with context if provided.
|
||||||
|
"""
|
||||||
|
if context:
|
||||||
|
return i18n.slice("task_with_context").format(task=task_prompt, context=context)
|
||||||
|
return task_prompt
|
||||||
|
|
||||||
|
|
||||||
|
def get_knowledge_config(agent: Agent) -> dict[str, Any]:
|
||||||
|
"""Get knowledge configuration from agent.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The agent instance.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary of knowledge configuration.
|
||||||
|
"""
|
||||||
|
return agent.knowledge_config.model_dump() if agent.knowledge_config else {}
|
||||||
|
|
||||||
|
|
||||||
|
def handle_knowledge_retrieval(
|
||||||
|
agent: Agent,
|
||||||
|
task: Task,
|
||||||
|
task_prompt: str,
|
||||||
|
knowledge_config: dict[str, Any],
|
||||||
|
query_func: Any,
|
||||||
|
crew_query_func: Any,
|
||||||
|
) -> str:
|
||||||
|
"""Handle knowledge retrieval for task execution.
|
||||||
|
|
||||||
|
This function handles both agent-specific and crew-specific knowledge queries.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The agent performing the task.
|
||||||
|
task: The task being executed.
|
||||||
|
task_prompt: The current task prompt.
|
||||||
|
knowledge_config: Knowledge configuration dictionary.
|
||||||
|
query_func: Function to query agent knowledge (sync or async).
|
||||||
|
crew_query_func: Function to query crew knowledge (sync or async).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The task prompt potentially augmented with knowledge context.
|
||||||
|
"""
|
||||||
|
if not (agent.knowledge or (agent.crew and agent.crew.knowledge)):
|
||||||
|
return task_prompt
|
||||||
|
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
event=KnowledgeRetrievalStartedEvent(
|
||||||
|
from_task=task,
|
||||||
|
from_agent=agent,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
agent.knowledge_search_query = agent._get_knowledge_search_query(
|
||||||
|
task_prompt, task
|
||||||
|
)
|
||||||
|
if agent.knowledge_search_query:
|
||||||
|
if agent.knowledge:
|
||||||
|
agent_knowledge_snippets = query_func(
|
||||||
|
[agent.knowledge_search_query], **knowledge_config
|
||||||
|
)
|
||||||
|
if agent_knowledge_snippets:
|
||||||
|
agent.agent_knowledge_context = extract_knowledge_context(
|
||||||
|
agent_knowledge_snippets
|
||||||
|
)
|
||||||
|
if agent.agent_knowledge_context:
|
||||||
|
task_prompt += agent.agent_knowledge_context
|
||||||
|
|
||||||
|
knowledge_snippets = crew_query_func(
|
||||||
|
[agent.knowledge_search_query], **knowledge_config
|
||||||
|
)
|
||||||
|
if knowledge_snippets:
|
||||||
|
agent.crew_knowledge_context = extract_knowledge_context(
|
||||||
|
knowledge_snippets
|
||||||
|
)
|
||||||
|
if agent.crew_knowledge_context:
|
||||||
|
task_prompt += agent.crew_knowledge_context
|
||||||
|
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
event=KnowledgeRetrievalCompletedEvent(
|
||||||
|
query=agent.knowledge_search_query,
|
||||||
|
from_task=task,
|
||||||
|
from_agent=agent,
|
||||||
|
retrieved_knowledge=_combine_knowledge_context(agent),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
event=KnowledgeSearchQueryFailedEvent(
|
||||||
|
query=agent.knowledge_search_query or "",
|
||||||
|
error=str(e),
|
||||||
|
from_task=task,
|
||||||
|
from_agent=agent,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return task_prompt
|
||||||
|
|
||||||
|
|
||||||
|
def _combine_knowledge_context(agent: Agent) -> str:
|
||||||
|
"""Combine agent and crew knowledge contexts into a single string.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The agent with knowledge contexts.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Combined knowledge context string.
|
||||||
|
"""
|
||||||
|
agent_ctx = agent.agent_knowledge_context or ""
|
||||||
|
crew_ctx = agent.crew_knowledge_context or ""
|
||||||
|
separator = "\n" if agent_ctx and crew_ctx else ""
|
||||||
|
return agent_ctx + separator + crew_ctx
|
||||||
|
|
||||||
|
|
||||||
|
def apply_training_data(agent: Agent, task_prompt: str) -> str:
|
||||||
|
"""Apply training data to the task prompt.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The agent performing the task.
|
||||||
|
task_prompt: The task prompt.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The task prompt with training data applied.
|
||||||
|
"""
|
||||||
|
if agent.crew and agent.crew._train:
|
||||||
|
return agent._training_handler(task_prompt=task_prompt)
|
||||||
|
return agent._use_trained_data(task_prompt=task_prompt)
|
||||||
|
|
||||||
|
|
||||||
|
def process_tool_results(agent: Agent, result: Any) -> Any:
|
||||||
|
"""Process tool results, returning result_as_answer if applicable.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The agent with tool results.
|
||||||
|
result: The current result.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The final result, potentially overridden by tool result_as_answer.
|
||||||
|
"""
|
||||||
|
for tool_result in agent.tools_results:
|
||||||
|
if tool_result.get("result_as_answer", False):
|
||||||
|
result = tool_result["result"]
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def save_last_messages(agent: Agent) -> None:
|
||||||
|
"""Save the last messages from agent executor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The agent instance.
|
||||||
|
"""
|
||||||
|
agent._last_messages = (
|
||||||
|
agent.agent_executor.messages.copy()
|
||||||
|
if agent.agent_executor and hasattr(agent.agent_executor, "messages")
|
||||||
|
else []
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def prepare_tools(
|
||||||
|
agent: Agent, tools: list[BaseTool] | None, task: Task
|
||||||
|
) -> list[BaseTool]:
|
||||||
|
"""Prepare tools for task execution and create agent executor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The agent instance.
|
||||||
|
tools: Optional list of tools.
|
||||||
|
task: The task being executed.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The list of tools to use.
|
||||||
|
"""
|
||||||
|
final_tools = tools or agent.tools or []
|
||||||
|
agent.create_agent_executor(tools=final_tools, task=task)
|
||||||
|
return final_tools
|
||||||
|
|
||||||
|
|
||||||
|
def validate_max_execution_time(max_execution_time: int | None) -> None:
|
||||||
|
"""Validate max_execution_time parameter.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
max_execution_time: The maximum execution time to validate.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: If max_execution_time is not a positive integer.
|
||||||
|
"""
|
||||||
|
if max_execution_time is not None:
|
||||||
|
if not isinstance(max_execution_time, int) or max_execution_time <= 0:
|
||||||
|
raise ValueError(
|
||||||
|
"Max Execution time must be a positive integer greater than zero"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def ahandle_knowledge_retrieval(
|
||||||
|
agent: Agent,
|
||||||
|
task: Task,
|
||||||
|
task_prompt: str,
|
||||||
|
knowledge_config: dict[str, Any],
|
||||||
|
) -> str:
|
||||||
|
"""Handle async knowledge retrieval for task execution.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
agent: The agent performing the task.
|
||||||
|
task: The task being executed.
|
||||||
|
task_prompt: The current task prompt.
|
||||||
|
knowledge_config: Knowledge configuration dictionary.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The task prompt potentially augmented with knowledge context.
|
||||||
|
"""
|
||||||
|
if not (agent.knowledge or (agent.crew and agent.crew.knowledge)):
|
||||||
|
return task_prompt
|
||||||
|
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
event=KnowledgeRetrievalStartedEvent(
|
||||||
|
from_task=task,
|
||||||
|
from_agent=agent,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
agent.knowledge_search_query = agent._get_knowledge_search_query(
|
||||||
|
task_prompt, task
|
||||||
|
)
|
||||||
|
if agent.knowledge_search_query:
|
||||||
|
if agent.knowledge:
|
||||||
|
agent_knowledge_snippets = await agent.knowledge.aquery(
|
||||||
|
[agent.knowledge_search_query], **knowledge_config
|
||||||
|
)
|
||||||
|
if agent_knowledge_snippets:
|
||||||
|
agent.agent_knowledge_context = extract_knowledge_context(
|
||||||
|
agent_knowledge_snippets
|
||||||
|
)
|
||||||
|
if agent.agent_knowledge_context:
|
||||||
|
task_prompt += agent.agent_knowledge_context
|
||||||
|
|
||||||
|
knowledge_snippets = await agent.crew.aquery_knowledge(
|
||||||
|
[agent.knowledge_search_query], **knowledge_config
|
||||||
|
)
|
||||||
|
if knowledge_snippets:
|
||||||
|
agent.crew_knowledge_context = extract_knowledge_context(
|
||||||
|
knowledge_snippets
|
||||||
|
)
|
||||||
|
if agent.crew_knowledge_context:
|
||||||
|
task_prompt += agent.crew_knowledge_context
|
||||||
|
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
event=KnowledgeRetrievalCompletedEvent(
|
||||||
|
query=agent.knowledge_search_query,
|
||||||
|
from_task=task,
|
||||||
|
from_agent=agent,
|
||||||
|
retrieved_knowledge=_combine_knowledge_context(agent),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
agent,
|
||||||
|
event=KnowledgeSearchQueryFailedEvent(
|
||||||
|
query=agent.knowledge_search_query or "",
|
||||||
|
error=str(e),
|
||||||
|
from_task=task,
|
||||||
|
from_agent=agent,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return task_prompt
|
||||||
Reference in New Issue
Block a user