Compare commits

..

21 Commits

Author SHA1 Message Date
lorenzejay
64052745b7 Enhance Flow Listener Logic and Agent Imports
- Updated the Flow class to track fired OR listeners, ensuring that multi-source OR listeners only trigger once during execution. This prevents redundant executions and improves flow efficiency.
- Cleared fired OR listeners during cyclic flow resets to allow re-execution in new cycles.
- Modified the Agent class imports to include Coroutine from collections.abc, enhancing type handling for asynchronous operations.

These changes improve the control and performance of flow execution in CrewAI, ensuring more predictable behavior in complex scenarios.
2026-01-15 16:12:13 -08:00
lorenzejay
7f7b5094cc Enhance Agent and Flow Execution Logic
- Updated the Agent class to automatically detect the event loop and return a coroutine when called within a Flow, simplifying async handling for users.
- Modified Flow class to execute listeners sequentially, preventing race conditions on shared state during listener execution.
- Improved handling of coroutine results from synchronous methods, ensuring proper execution flow and state management.

These changes enhance the overall execution logic and user experience when working with agents and flows in CrewAI.
2026-01-15 15:51:39 -08:00
lorenzejay
ad83e8a2bf Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/enh-decouple-executor-from-crew 2026-01-15 14:45:17 -08:00
lorenzejay
601eda9095 Enhance Flow Execution Logic
- Introduced conditional execution for start methods in the Flow class.
- Unconditional start methods are prioritized during kickoff, while conditional starts are executed only if no unconditional starts are present.
- Improved handling of cyclic flows by allowing re-execution of conditional start methods triggered by routers.
- Added checks to continue execution chains for completed conditional starts.

These changes improve the flexibility and control of flow execution, ensuring that the correct methods are triggered based on the defined conditions.
2026-01-15 09:29:25 -08:00
lorenzejay
83c62a65dd Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/enh-decouple-executor-from-crew 2026-01-15 09:12:38 -08:00
lorenzejay
3a1deb193a fixed cassette 2026-01-14 19:06:28 -08:00
lorenzejay
09185acc0d refactor: streamline agent execution and enhance flow compatibility
Refactored the Agent class to simplify the execution method by removing the event loop check and clarifying the behavior when called from synchronous and asynchronous contexts. The changes ensure that the method operates seamlessly within flow methods, improving clarity in the documentation. Additionally, updated the AgentExecutor to set the response model to None, enhancing flexibility. New test cassettes were added to validate the functionality of agents within flow contexts, ensuring robust testing for both synchronous and asynchronous operations.
2026-01-14 18:51:09 -08:00
lorenzejay
6541f01b1b working cassette 2026-01-14 16:40:35 -08:00
lorenzejay
3a6702e9c8 working 2026-01-14 16:27:50 -08:00
lorenzejay
e4bd7889fd test fix cassette 2026-01-14 16:23:36 -08:00
lorenzejay
842a1db16f test fix cassette 2026-01-14 16:23:19 -08:00
lorenzejay
e9b86100c7 refactor: update test task guardrail process output for improved validation
Refactored the test for task guardrail process output to enhance the validation of the output against the OpenAPI schema. The changes include a more structured request body and updated response handling to ensure compliance with the guardrail requirements. This update aims to improve the clarity and reliability of the test cases, ensuring that task outputs are correctly validated and feedback is appropriately provided.
2026-01-14 16:05:38 -08:00
lorenzejay
341812d58e refactor: improve test for Agent kickoff parameters
Updated the test for the Agent class to ensure that the kickoff method correctly preserves parameters. The test now verifies the configuration of the agent after kickoff, enhancing clarity and maintainability. Additionally, the test for asynchronous kickoff within a flow context has been updated to reflect the Agent class instead of LiteAgent.
2026-01-14 15:56:53 -08:00
lorenzejay
38db734561 fix test 2026-01-14 15:39:34 -08:00
lorenzejay
5048d54981 Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/enh-decouple-executor-from-crew 2026-01-14 14:28:33 -08:00
lorenzejay
ae17178e86 linting and tests 2026-01-14 14:28:09 -08:00
lorenzejay
b7a13e15ff refactor: enhance agent kickoff preparation by separating common logic
Updated the Agent class to introduce a new private method  that consolidates the common setup logic for both synchronous and asynchronous kickoff executions. This change improves code clarity and maintainability by reducing redundancy in the kickoff process, while ensuring that the agent can still execute effectively within both standalone and flow contexts.
2026-01-14 14:27:39 -08:00
lorenzejay
13dc7e25e0 ensure executors work inside a flow due to flow in flow async structure 2026-01-14 14:23:10 -08:00
lorenzejay
5cef85c643 refactor: streamline AgentExecutor initialization by removing redundant parameters
Updated the Agent class to simplify the initialization of the AgentExecutor by removing unnecessary task and crew parameters in standalone mode. This change enhances code clarity and maintains backward compatibility by ensuring that the executor is correctly configured without redundant assignments.
2026-01-09 18:27:07 -08:00
lorenzejay
dc3ae9396d fix: handle None task in AgentExecutor to prevent errors
Added a check to ensure that if the task is None, the method returns early without attempting to access task properties. This change improves the robustness of the AgentExecutor by preventing potential errors when the task is not set.
2026-01-09 18:07:37 -08:00
lorenzejay
0029f8193c wip restrcuturing agent executor and liteagent 2026-01-09 14:42:50 -08:00
33 changed files with 2859 additions and 3192 deletions

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
from collections.abc import Callable, Coroutine, Sequence
import shutil
import subprocess
import time
@@ -34,6 +34,11 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
@@ -43,10 +48,10 @@ from crewai.events.types.memory_events import (
MemoryRetrievalCompletedEvent,
MemoryRetrievalStartedEvent,
)
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.agent_executor import AgentExecutor
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.lite_agent import LiteAgent
from crewai.lite_agent_output import LiteAgentOutput
from crewai.llms.base_llm import BaseLLM
from crewai.mcp import (
MCPClient,
@@ -64,15 +69,18 @@ from crewai.security.fingerprint import Fingerprint
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities.agent_utils import (
get_tool_names,
is_inside_event_loop,
load_agent_from_repository,
parse_tools,
render_text_description_and_args,
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import Converter
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.guardrail import process_guardrail
from crewai.utilities.guardrail_types import GuardrailType
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -89,9 +97,9 @@ if TYPE_CHECKING:
from crewai_tools import CodeInterpreterTool
from crewai.agents.agent_builder.base_agent import PlatformAppOrAction
from crewai.lite_agent_output import LiteAgentOutput
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.types import LLMMessage
@@ -113,7 +121,7 @@ class Agent(BaseAgent):
The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents.
Attributes:
agent_executor: An instance of the CrewAgentExecutor or CrewAgentExecutorFlow class.
agent_executor: An instance of the CrewAgentExecutor or AgentExecutor class.
role: The role of the agent.
goal: The objective of the agent.
backstory: The backstory of the agent.
@@ -238,9 +246,9 @@ class Agent(BaseAgent):
Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list of any number of A2AConfig/A2AClientConfig with a single A2AServerConfig.
""",
)
executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field(
executor_class: type[CrewAgentExecutor] | type[AgentExecutor] = Field(
default=CrewAgentExecutor,
description="Class to use for the agent executor. Defaults to CrewAgentExecutor, can optionally use CrewAgentExecutorFlow.",
description="Class to use for the agent executor. Defaults to CrewAgentExecutor, can optionally use AgentExecutor.",
)
@model_validator(mode="before")
@@ -725,17 +733,9 @@ class Agent(BaseAgent):
raw_tools: list[BaseTool] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
use_native_tool_calling = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and len(raw_tools) > 0
)
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
use_native_tool_calling=use_native_tool_calling,
i18n=self.i18n,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
@@ -743,8 +743,6 @@ class Agent(BaseAgent):
response_template=self.response_template,
).task_execution()
print("prompt", prompt)
stop_words = [self.i18n.slice("observation")]
if self.response_template:
@@ -1593,26 +1591,25 @@ class Agent(BaseAgent):
)
return None
def kickoff(
def _prepare_kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""
Execute the agent with the given messages using a LiteAgent instance.
) -> tuple[AgentExecutor, dict[str, str], dict[str, Any], list[CrewStructuredTool]]:
"""Prepare common setup for kickoff execution.
This method is useful when you want to use the Agent configuration but
with the simpler and more direct execution flow of LiteAgent.
This method handles all the common preparation logic shared between
kickoff() and kickoff_async(), including tool processing, prompt building,
executor creation, and input formatting.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
response_format: Optional Pydantic model for structured output.
Returns:
LiteAgentOutput: The result of the agent execution.
Tuple of (executor, inputs, agent_info, parsed_tools) ready for execution.
"""
# Process platform apps and MCP tools
if self.apps:
platform_tools = self.get_platform_tools(self.apps)
if platform_tools and self.tools is not None:
@@ -1622,25 +1619,359 @@ class Agent(BaseAgent):
if mcps and self.tools is not None:
self.tools.extend(mcps)
lite_agent = LiteAgent(
id=self.id,
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,
verbose=self.verbose,
response_format=response_format,
# Prepare tools
raw_tools: list[BaseTool] = self.tools or []
parsed_tools = parse_tools(raw_tools)
# Build agent_info for backward-compatible event emission
agent_info = {
"id": self.id,
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
"tools": raw_tools,
"verbose": self.verbose,
}
# Build prompt for standalone execution
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
i18n=self.i18n,
original_agent=self,
guardrail=self.guardrail,
guardrail_max_retries=self.guardrail_max_retries,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
prompt_template=self.prompt_template,
response_template=self.response_template,
).task_execution()
# Prepare stop words
stop_words = [self.i18n.slice("observation")]
if self.response_template:
stop_words.append(
self.response_template.split("{{ .Response }}")[1].strip()
)
# Get RPM limit function
rpm_limit_fn = (
self._rpm_controller.check_or_wait if self._rpm_controller else None
)
return lite_agent.kickoff(messages)
# Create the executor for standalone mode (no crew, no task)
executor = AgentExecutor(
task=None,
crew=None,
llm=cast(BaseLLM, self.llm),
agent=self,
prompt=prompt,
max_iter=self.max_iter,
tools=parsed_tools,
tools_names=get_tool_names(parsed_tools),
stop_words=stop_words,
tools_description=render_text_description_and_args(parsed_tools),
tools_handler=self.tools_handler,
original_tools=raw_tools,
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=rpm_limit_fn,
callbacks=[TokenCalcHandler(self._token_process)],
response_model=response_format,
i18n=self.i18n,
)
# Format messages
if isinstance(messages, str):
formatted_messages = messages
else:
formatted_messages = "\n".join(
str(msg.get("content", "")) for msg in messages if msg.get("content")
)
# Build the input dict for the executor
inputs = {
"input": formatted_messages,
"tool_names": get_tool_names(parsed_tools),
"tools": render_text_description_and_args(parsed_tools),
}
return executor, inputs, agent_info, parsed_tools
def kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]:
"""
Execute the agent with the given messages using the AgentExecutor.
This method provides standalone agent execution without requiring a Crew.
It supports tools, response formatting, and guardrails.
When called from within a Flow (sync or async method), this automatically
detects the event loop and returns a coroutine that the Flow framework
awaits. Users don't need to handle async explicitly.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
response_format: Optional Pydantic model for structured output.
Returns:
LiteAgentOutput: The result of the agent execution.
When inside a Flow, returns a coroutine that resolves to LiteAgentOutput.
Note:
For explicit async usage outside of Flow, use kickoff_async() directly.
"""
# Magic auto-async: if inside event loop (e.g., inside a Flow),
# return coroutine for Flow to await
if is_inside_event_loop():
return self.kickoff_async(messages, response_format)
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format
)
try:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
),
)
output = self._execute_and_build_output(executor, inputs, response_format)
if self.guardrail is not None:
output = self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
)
crewai_event_bus.emit(
self,
event=LiteAgentExecutionCompletedEvent(
agent_info=agent_info,
output=output.raw,
),
)
return output
except Exception as e:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise
def _execute_and_build_output(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent and build the output object.
Args:
executor: The executor instance.
inputs: Input dictionary for execution.
response_format: Optional response format.
Returns:
LiteAgentOutput with raw output, formatted result, and metrics.
"""
import json
# Execute the agent (this is called from sync path, so invoke returns dict)
result = cast(dict[str, Any], executor.invoke(inputs))
raw_output = result.get("output", "")
# Handle response format conversion
formatted_result: BaseModel | None = None
if response_format:
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
# Get token usage metrics
if isinstance(self.llm, BaseLLM):
usage_metrics = self.llm.get_token_usage_summary()
else:
usage_metrics = self._token_process.get_summary()
return LiteAgentOutput(
raw=raw_output,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
)
async def _execute_and_build_output_async(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent asynchronously and build the output object.
This is the async version of _execute_and_build_output that uses
invoke_async() for native async execution within event loops.
Args:
executor: The executor instance.
inputs: Input dictionary for execution.
response_format: Optional response format.
Returns:
LiteAgentOutput with raw output, formatted result, and metrics.
"""
import json
# Execute the agent asynchronously
result = await executor.invoke_async(inputs)
raw_output = result.get("output", "")
# Handle response format conversion
formatted_result: BaseModel | None = None
if response_format:
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
# Get token usage metrics
if isinstance(self.llm, BaseLLM):
usage_metrics = self.llm.get_token_usage_summary()
else:
usage_metrics = self._token_process.get_summary()
return LiteAgentOutput(
raw=raw_output,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
)
def _process_kickoff_guardrail(
self,
output: LiteAgentOutput,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
retry_count: int = 0,
) -> LiteAgentOutput:
"""Process guardrail for kickoff execution with retry logic.
Args:
output: Current agent output.
executor: The executor instance.
inputs: Input dictionary for re-execution.
response_format: Optional response format.
retry_count: Current retry count.
Returns:
Validated/updated output.
"""
from crewai.utilities.guardrail_types import GuardrailCallable
# Ensure guardrail is callable
guardrail_callable: GuardrailCallable
if isinstance(self.guardrail, str):
from crewai.tasks.llm_guardrail import LLMGuardrail
guardrail_callable = cast(
GuardrailCallable,
LLMGuardrail(description=self.guardrail, llm=cast(BaseLLM, self.llm)),
)
elif callable(self.guardrail):
guardrail_callable = self.guardrail
else:
# Should not happen if called from kickoff with guardrail check
return output
guardrail_result = process_guardrail(
output=output,
guardrail=guardrail_callable,
retry_count=retry_count,
event_source=self,
from_agent=self,
)
if not guardrail_result.success:
if retry_count >= self.guardrail_max_retries:
raise ValueError(
f"Agent's guardrail failed validation after {self.guardrail_max_retries} retries. "
f"Last error: {guardrail_result.error}"
)
# Add feedback and re-execute
executor._append_message_to_state(
guardrail_result.error or "Guardrail validation failed",
role="user",
)
# Re-execute and build new output
output = self._execute_and_build_output(executor, inputs, response_format)
# Recursively retry guardrail
return self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
retry_count=retry_count + 1,
)
# Apply guardrail result if available
if guardrail_result.result is not None:
if isinstance(guardrail_result.result, str):
output.raw = guardrail_result.result
elif isinstance(guardrail_result.result, BaseModel):
output.pydantic = guardrail_result.result
return output
async def kickoff_async(
self,
@@ -1648,9 +1979,11 @@ class Agent(BaseAgent):
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""
Execute the agent asynchronously with the given messages using a LiteAgent instance.
Execute the agent asynchronously with the given messages.
This is the async version of the kickoff method.
This is the async version of the kickoff method that uses native async
execution. It is designed for use within async contexts, such as when
called from within an async Flow method.
Args:
messages: Either a string query or a list of message dictionaries.
@@ -1661,21 +1994,48 @@ class Agent(BaseAgent):
Returns:
LiteAgentOutput: The result of the agent execution.
"""
lite_agent = LiteAgent(
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,
verbose=self.verbose,
response_format=response_format,
i18n=self.i18n,
original_agent=self,
guardrail=self.guardrail,
guardrail_max_retries=self.guardrail_max_retries,
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format
)
return await lite_agent.kickoff_async(messages)
try:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
),
)
output = await self._execute_and_build_output_async(
executor, inputs, response_format
)
if self.guardrail is not None:
output = self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
)
crewai_event_bus.emit(
self,
event=LiteAgentExecutionCompletedEvent(
agent_info=agent_info,
output=output.raw,
),
)
return output
except Exception as e:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise

View File

@@ -236,30 +236,14 @@ def process_tool_results(agent: Agent, result: Any) -> Any:
def save_last_messages(agent: Agent) -> None:
"""Save the last messages from agent executor.
Sanitizes messages to be compatible with TaskOutput's LLMMessage type,
which only accepts 'user', 'assistant', 'system' roles and requires
content to be a string or list (not None).
Args:
agent: The agent instance.
"""
if not agent.agent_executor or not hasattr(agent.agent_executor, "messages"):
agent._last_messages = []
return
sanitized_messages = []
for msg in agent.agent_executor.messages:
role = msg.get("role", "")
# Only include messages with valid LLMMessage roles
if role not in ("user", "assistant", "system"):
continue
# Ensure content is not None (can happen with tool call assistant messages)
content = msg.get("content")
if content is None:
content = ""
sanitized_messages.append({"role": role, "content": content})
agent._last_messages = sanitized_messages
agent._last_messages = (
agent.agent_executor.messages.copy()
if agent.agent_executor and hasattr(agent.agent_executor, "messages")
else []
)
def prepare_tools(

View File

@@ -21,9 +21,9 @@ if TYPE_CHECKING:
class CrewAgentExecutorMixin:
crew: Crew
crew: Crew | None
agent: Agent
task: Task
task: Task | None
iterations: int
max_iter: int
messages: list[LLMMessage]

View File

@@ -30,7 +30,6 @@ from crewai.hooks.llm_hooks import (
)
from crewai.utilities.agent_utils import (
aget_llm_response,
convert_tools_to_openai_schema,
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
@@ -216,33 +215,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
def _invoke_loop(self) -> AgentFinish:
"""Execute agent loop until completion.
Checks if the LLM supports native function calling and uses that
approach if available, otherwise falls back to the ReAct text pattern.
Returns:
Final answer from the agent.
"""
# Check if model supports native function calling
use_native_tools = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.original_tools
)
if use_native_tools:
return self._invoke_loop_native_tools()
# Fall back to ReAct text-based pattern
return self._invoke_loop_react()
def _invoke_loop_react(self) -> AgentFinish:
"""Execute agent loop using ReAct text-based pattern.
This is the traditional approach where tool definitions are embedded
in the prompt and the LLM outputs Action/Action Input text that is
parsed to execute tools.
Returns:
Final answer from the agent.
"""
@@ -272,10 +244,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
response_model=self.response_model,
executor_context=self,
)
print("--------------------------------")
print("get_llm_response answer", answer)
print("--------------------------------")
# breakpoint()
if self.response_model is not None:
try:
self.response_model.model_validate_json(answer)
@@ -365,338 +333,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
def _invoke_loop_native_tools(self) -> AgentFinish:
"""Execute agent loop using native function calling.
This method uses the LLM's native tool/function calling capability
instead of the text-based ReAct pattern. The LLM directly returns
structured tool calls which are executed and results fed back.
Returns:
Final answer from the agent.
"""
print("--------------------------------")
print("invoke_loop_native_tools")
print("--------------------------------")
# Convert tools to OpenAI schema format
if not self.original_tools:
# No tools available, fall back to simple LLM call
return self._invoke_loop_native_no_tools()
openai_tools, available_functions = convert_tools_to_openai_schema(
self.original_tools
)
while True:
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
self._show_logs(formatted_answer)
return formatted_answer
enforce_rpm_limit(self.request_within_rpm_limit)
# Debug: Show messages being sent to LLM
print("--------------------------------")
print(f"Messages count: {len(self.messages)}")
for i, msg in enumerate(self.messages):
role = msg.get("role", "unknown")
content = msg.get("content", "")
if content:
preview = (
content[:200] + "..." if len(content) > 200 else content
)
else:
preview = "(no content)"
print(f" [{i}] {role}: {preview}")
print("--------------------------------")
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution
# via _handle_native_tool_calls to properly manage message history.
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
tools=openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
print("--------------------------------")
print("invoke_loop_native_tools answer", answer)
print("--------------------------------")
# print("get_llm_response answer", answer[:500] + "...")
# Check if the response is a list of tool calls
if (
isinstance(answer, list)
and answer
and self._is_tool_call_list(answer)
):
# Handle tool calls - execute tools and add results to messages
self._handle_native_tool_calls(answer, available_functions)
# Continue loop to let LLM analyze results and decide next steps
continue
# Text or other response - handle as potential final answer
if isinstance(answer, str):
# Text response - this is the final answer
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
def _invoke_loop_native_no_tools(self) -> AgentFinish:
"""Execute a simple LLM call when no tools are available.
Returns:
Final answer from the agent.
"""
enforce_rpm_limit(self.request_within_rpm_limit)
answer = get_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# OpenAI-style
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Anthropic-style
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Gemini-style
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
def _handle_native_tool_calls(
self,
tool_calls: list[Any],
available_functions: dict[str, Callable[..., Any]],
) -> None:
"""Handle a single native tool call from the LLM.
Executes only the FIRST tool call and appends the result to message history.
This enables sequential tool execution with reflection after each tool,
allowing the LLM to reason about results before deciding on next steps.
Args:
tool_calls: List of tool calls from the LLM (only first is processed).
available_functions: Dict mapping function names to callables.
"""
from datetime import datetime
import json
from crewai.events import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
if not tool_calls:
return
# Only process the FIRST tool call for sequential execution with reflection
tool_call = tool_calls[0]
# Extract tool call info - handle OpenAI-style, Anthropic-style, and Gemini-style
if hasattr(tool_call, "function"):
# OpenAI-style: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.function.name
func_args = tool_call.function.arguments
elif hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini-style: has .function_call.name and .function_call.args
call_id = f"call_{id(tool_call)}"
func_name = tool_call.function_call.name
func_args = (
dict(tool_call.function_call.args)
if tool_call.function_call.args
else {}
)
elif hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.name
func_args = tool_call.input # Already a dict in Anthropic
elif isinstance(tool_call, dict):
call_id = tool_call.get("id", f"call_{id(tool_call)}")
func_info = tool_call.get("function", {})
func_name = func_info.get("name", "") or tool_call.get("name", "")
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
else:
return
# Append assistant message with single tool call
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
],
}
self.messages.append(assistant_message)
# Parse arguments for the single tool call
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
),
)
# Execute the tool
print(f"Using Tool: {func_name}")
result = "Tool not found"
if func_name in available_functions:
try:
tool_func = available_functions[func_name]
result = tool_func(**args_dict)
if not isinstance(result, str):
result = str(result)
except Exception as e:
result = f"Error executing tool: {e}"
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"content": result,
}
self.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Tool {func_name} executed with result: {result[:200]}...",
color="green",
)
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.messages.append(reasoning_message)
async def ainvoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute the agent asynchronously with given inputs.
@@ -746,29 +382,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
async def _ainvoke_loop(self) -> AgentFinish:
"""Execute agent loop asynchronously until completion.
Checks if the LLM supports native function calling and uses that
approach if available, otherwise falls back to the ReAct text pattern.
Returns:
Final answer from the agent.
"""
# Check if model supports native function calling
use_native_tools = (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and self.original_tools
)
if use_native_tools:
return await self._ainvoke_loop_native_tools()
# Fall back to ReAct text-based pattern
return await self._ainvoke_loop_react()
async def _ainvoke_loop_react(self) -> AgentFinish:
"""Execute agent loop asynchronously using ReAct text-based pattern.
Returns:
Final answer from the agent.
"""
@@ -882,139 +495,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
async def _ainvoke_loop_native_tools(self) -> AgentFinish:
"""Execute agent loop asynchronously using native function calling.
This method uses the LLM's native tool/function calling capability
instead of the text-based ReAct pattern.
Returns:
Final answer from the agent.
"""
# Convert tools to OpenAI schema format
if not self.original_tools:
return await self._ainvoke_loop_native_no_tools()
openai_tools, available_functions = convert_tools_to_openai_schema(
self.original_tools
)
while True:
try:
if has_reached_max_iterations(self.iterations, self.max_iter):
formatted_answer = handle_max_iterations_exceeded(
None,
printer=self._printer,
i18n=self._i18n,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
)
self._show_logs(formatted_answer)
return formatted_answer
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution
# via _handle_native_tool_calls to properly manage message history.
answer = await aget_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
tools=openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
print("--------------------------------")
print("native llm completion answer", answer)
print("--------------------------------")
# Check if the response is a list of tool calls
if (
isinstance(answer, list)
and answer
and self._is_tool_call_list(answer)
):
# Handle tool calls - execute tools and add results to messages
self._handle_native_tool_calls(answer, available_functions)
# Continue loop to let LLM analyze results and decide next steps
continue
# Text or other response - handle as potential final answer
if isinstance(answer, str):
# Text response - this is the final answer
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(formatted_answer)
self._append_message(answer) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(formatted_answer)
self._append_message(str(answer)) # Save final answer to messages
self._show_logs(formatted_answer)
return formatted_answer
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
raise e
if is_context_length_exceeded(e):
handle_context_length(
respect_context_window=self.respect_context_window,
printer=self._printer,
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
)
continue
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
async def _ainvoke_loop_native_no_tools(self) -> AgentFinish:
"""Execute a simple async LLM call when no tools are available.
Returns:
Final answer from the agent.
"""
enforce_rpm_limit(self.request_within_rpm_limit)
answer = await aget_llm_response(
llm=self.llm,
messages=self.messages,
callbacks=self.callbacks,
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
def _handle_agent_action(
self, formatted_answer: AgentAction, tool_result: ToolResult
) -> AgentAction | AgentFinish:

View File

@@ -378,12 +378,6 @@ class EventListener(BaseEventListener):
self.formatter.handle_llm_tool_usage_finished(
event.tool_name,
)
else:
self.formatter.handle_tool_usage_finished(
event.tool_name,
event.output,
getattr(event, "run_attempts", None),
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source: Any, event: ToolUsageErrorEvent) -> None:

View File

@@ -366,32 +366,6 @@ To enable tracing, do any one of these:
self.print_panel(content, f"🔧 Tool Execution Started (#{iteration})", "yellow")
def handle_tool_usage_finished(
self,
tool_name: str,
output: str,
run_attempts: int | None = None,
) -> None:
"""Handle tool usage finished event with panel display."""
if not self.verbose:
return
iteration = self.tool_usage_counts.get(tool_name, 1)
content = Text()
content.append("Tool Completed\n", style="green bold")
content.append("Tool: ", style="white")
content.append(f"{tool_name}\n", style="green bold")
if output:
content.append("Output: ", style="white")
content.append(f"{output}\n", style="green")
self.print_panel(
content, f"✅ Tool Execution Completed (#{iteration})", "green"
)
def handle_tool_usage_error(
self,
tool_name: str,

View File

@@ -1,4 +1,4 @@
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.agent_executor import AgentExecutor, CrewAgentExecutorFlow
from crewai.experimental.evaluation import (
AgentEvaluationResult,
AgentEvaluator,
@@ -23,8 +23,9 @@ from crewai.experimental.evaluation import (
__all__ = [
"AgentEvaluationResult",
"AgentEvaluator",
"AgentExecutor",
"BaseEvaluator",
"CrewAgentExecutorFlow",
"CrewAgentExecutorFlow", # Deprecated alias for AgentExecutor
"EvaluationScore",
"EvaluationTraceCallback",
"ExperimentResult",

View File

@@ -1,8 +1,6 @@
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
import json
from collections.abc import Callable, Coroutine
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
from uuid import uuid4
@@ -19,24 +17,16 @@ from crewai.agents.parser import (
OutputParserError,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled_in_context,
)
from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
AgentLogsStartedEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.flow.flow import Flow, listen, or_, router, start
from crewai.hooks.llm_hooks import (
get_after_llm_call_hooks,
get_before_llm_call_hooks,
)
from crewai.utilities.agent_utils import (
convert_tools_to_openai_schema,
enforce_rpm_limit,
format_message_for_llm,
get_llm_response,
@@ -47,6 +37,7 @@ from crewai.utilities.agent_utils import (
handle_unknown_error,
has_reached_max_iterations,
is_context_length_exceeded,
is_inside_event_loop,
process_llm_response,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -81,17 +72,19 @@ class AgentReActState(BaseModel):
current_answer: AgentAction | AgentFinish | None = Field(default=None)
is_finished: bool = Field(default=False)
ask_for_human_input: bool = Field(default=False)
use_native_tools: bool = Field(default=False)
pending_tool_calls: list[Any] = Field(default_factory=list)
class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
"""Flow-based executor matching CrewAgentExecutor interface.
class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"""Flow-based agent executor for both standalone and crew-bound execution.
Inherits from:
- Flow[AgentReActState]: Provides flow orchestration capabilities
- CrewAgentExecutorMixin: Provides memory methods (short/long/external term)
This executor can operate in two modes:
- Standalone mode: When crew and task are None (used by Agent.kickoff())
- Crew mode: When crew and task are provided (used by Agent.execute_task())
Note: Multiple instances may be created during agent initialization
(cache setup, RPM controller setup, etc.) but only the final instance
should execute tasks via invoke().
@@ -100,8 +93,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def __init__(
self,
llm: BaseLLM,
task: Task,
crew: Crew,
agent: Agent,
prompt: SystemPromptResult | StandardPromptResult,
max_iter: int,
@@ -110,6 +101,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
stop_words: list[str],
tools_description: str,
tools_handler: ToolsHandler,
task: Task | None = None,
crew: Crew | None = None,
step_callback: Any = None,
original_tools: list[BaseTool] | None = None,
function_calling_llm: BaseLLM | Any | None = None,
@@ -123,8 +116,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
Args:
llm: Language model instance.
task: Task to execute.
crew: Crew instance.
agent: Agent to execute.
prompt: Prompt templates.
max_iter: Maximum iterations.
@@ -133,6 +124,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
stop_words: Stop word list.
tools_description: Tool descriptions.
tools_handler: Tool handler instance.
task: Optional task to execute (None for standalone agent execution).
crew: Optional crew instance (None for standalone agent execution).
step_callback: Optional step callback.
original_tools: Original tool list.
function_calling_llm: Optional function calling LLM.
@@ -143,9 +136,9 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
"""
self._i18n: I18N = i18n or get_i18n()
self.llm = llm
self.task = task
self.task: Task | None = task
self.agent = agent
self.crew = crew
self.crew: Crew | None = crew
self.prompt = prompt
self.tools = tools
self.tools_names = tools_names
@@ -190,11 +183,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
else self.stop
)
)
# Native tool calling support
self._openai_tools: list[dict[str, Any]] = []
self._available_functions: dict[str, Callable[..., Any]] = {}
self._state = AgentReActState()
def _ensure_flow_initialized(self) -> None:
@@ -205,66 +193,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
Only the instance that actually executes via invoke() will emit events.
"""
if not self._flow_initialized:
current_tracing = is_tracing_enabled_in_context()
# Now call Flow's __init__ which will replace self._state
# with Flow's managed state. Suppress flow events since this is
# an agent executor, not a user-facing flow.
super().__init__(
suppress_flow_events=True,
tracing=current_tracing if current_tracing else None,
)
self._flow_initialized = True
def _check_native_tool_support(self) -> bool:
"""Check if LLM supports native function calling.
Returns:
True if the LLM supports native function calling and tools are available.
"""
return (
hasattr(self.llm, "supports_function_calling")
and callable(getattr(self.llm, "supports_function_calling", None))
and self.llm.supports_function_calling()
and bool(self.original_tools)
)
def _setup_native_tools(self) -> None:
"""Convert tools to OpenAI schema format for native function calling."""
if self.original_tools:
self._openai_tools, self._available_functions = (
convert_tools_to_openai_schema(self.original_tools)
)
def _is_tool_call_list(self, response: list[Any]) -> bool:
"""Check if a response is a list of tool calls.
Args:
response: The response to check.
Returns:
True if the response appears to be a list of tool calls.
"""
if not response:
return False
first_item = response[0]
# Check for OpenAI-style tool call structure
if hasattr(first_item, "function") or (
isinstance(first_item, dict) and "function" in first_item
):
return True
# Check for Anthropic-style tool call structure (ToolUseBlock)
if (
hasattr(first_item, "type")
and getattr(first_item, "type", None) == "tool_use"
):
return True
if hasattr(first_item, "name") and hasattr(first_item, "input"):
return True
# Check for Gemini-style function call (Part with function_call)
if hasattr(first_item, "function_call") and first_item.function_call:
return True
return False
@property
def use_stop_words(self) -> bool:
"""Check to determine if stop words are being used.
@@ -297,11 +233,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def initialize_reasoning(self) -> Literal["initialized"]:
"""Initialize the reasoning flow and emit agent start logs."""
self._show_start_logs()
# Check for native tool support on first iteration
if self.state.iterations == 0:
self.state.use_native_tools = self._check_native_tool_support()
if self.state.use_native_tools:
self._setup_native_tools()
return "initialized"
@listen("force_final_answer")
@@ -337,7 +268,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
response_model=None,
executor_context=self,
)
@@ -376,69 +307,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
handle_unknown_error(self._printer, e)
raise
@listen("continue_reasoning_native")
def call_llm_native_tools(
self,
) -> Literal["native_tool_calls", "native_finished", "context_error"]:
"""Execute LLM call with native function calling.
Returns routing decision based on whether tool calls or final answer.
"""
try:
enforce_rpm_limit(self.request_within_rpm_limit)
# Call LLM with native tools
# Pass available_functions=None so the LLM returns tool_calls
# without executing them. The executor handles tool execution.
answer = get_llm_response(
llm=self.llm,
messages=list(self.state.messages),
callbacks=self.callbacks,
printer=self._printer,
tools=self._openai_tools,
available_functions=None,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
# Check if the response is a list of tool calls
if isinstance(answer, list) and answer and self._is_tool_call_list(answer):
# Store tool calls for sequential processing
self.state.pending_tool_calls = list(answer)
return "native_tool_calls"
# Text response - this is the final answer
if isinstance(answer, str):
self.state.current_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(answer)
return "native_finished"
# Unexpected response type, treat as final answer
self.state.current_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._invoke_step_callback(self.state.current_answer)
self._append_message_to_state(str(answer))
return "native_finished"
except Exception as e:
if is_context_length_exceeded(e):
self._last_context_error = e
return "context_error"
if e.__class__.__module__.startswith("litellm"):
raise e
handle_unknown_error(self._printer, e)
raise
@router(call_llm_and_parse)
def route_by_answer_type(self) -> Literal["execute_tool", "agent_finished"]:
"""Route based on whether answer is AgentAction or AgentFinish."""
@@ -494,14 +362,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.is_finished = True
return "tool_result_is_final"
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
return "tool_completed"
except Exception as e:
@@ -511,143 +371,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self._console.print(error_text)
raise
@listen("native_tool_calls")
def execute_native_tool(self) -> Literal["native_tool_completed"]:
"""Execute a single native tool call and inject reasoning prompt.
Processes only the FIRST tool call from pending_tool_calls for
sequential execution with reflection after each tool.
"""
if not self.state.pending_tool_calls:
return "native_tool_completed"
tool_call = self.state.pending_tool_calls[0]
self.state.pending_tool_calls = [] # Clear pending calls
# Extract tool call info - handle OpenAI, Anthropic, and Gemini formats
if hasattr(tool_call, "function"):
# OpenAI format: has .function.name and .function.arguments
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.function.name
func_args = tool_call.function.arguments
elif hasattr(tool_call, "function_call") and tool_call.function_call:
# Gemini format: has .function_call.name and .function_call.args
call_id = f"call_{id(tool_call)}"
func_name = tool_call.function_call.name
func_args = (
dict(tool_call.function_call.args)
if tool_call.function_call.args
else {}
)
elif hasattr(tool_call, "name") and hasattr(tool_call, "input"):
# Anthropic format: has .name and .input (ToolUseBlock)
call_id = getattr(tool_call, "id", f"call_{id(tool_call)}")
func_name = tool_call.name
func_args = tool_call.input # Already a dict in Anthropic
elif isinstance(tool_call, dict):
call_id = tool_call.get("id", f"call_{id(tool_call)}")
func_info = tool_call.get("function", {})
func_name = func_info.get("name", "") or tool_call.get("name", "")
func_args = func_info.get("arguments", "{}") or tool_call.get("input", {})
else:
return "native_tool_completed"
# Append assistant message with single tool call
assistant_message: LLMMessage = {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": call_id,
"type": "function",
"function": {
"name": func_name,
"arguments": func_args
if isinstance(func_args, str)
else json.dumps(func_args),
},
}
],
}
self.state.messages.append(assistant_message)
# Parse arguments for the single tool call
if isinstance(func_args, str):
try:
args_dict = json.loads(func_args)
except json.JSONDecodeError:
args_dict = {}
else:
args_dict = func_args
# Emit tool usage started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
),
)
# Execute the tool
result = "Tool not found"
if func_name in self._available_functions:
try:
tool_func = self._available_functions[func_name]
result = tool_func(**args_dict)
if not isinstance(result, str):
result = str(result)
except Exception as e:
result = f"Error executing tool: {e}"
# Emit tool usage finished event
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=result,
tool_name=func_name,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
started_at=started_at,
finished_at=datetime.now(),
),
)
# Append tool result message
tool_message: LLMMessage = {
"role": "tool",
"tool_call_id": call_id,
"content": result,
}
self.state.messages.append(tool_message)
# Log the tool execution
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Tool {func_name} executed with result: {result[:200]}...",
color="green",
)
# Inject post-tool reasoning prompt to enforce analysis
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
return "native_tool_completed"
@router(execute_native_tool)
def increment_native_and_continue(self) -> Literal["initialized"]:
"""Increment iteration counter after native tool execution."""
self.state.iterations += 1
return "initialized"
@listen("initialized")
def continue_iteration(self) -> Literal["check_iteration"]:
"""Bridge listener that connects iteration loop back to iteration check."""
@@ -656,14 +379,10 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
@router(or_(initialize_reasoning, continue_iteration))
def check_max_iterations(
self,
) -> Literal[
"force_final_answer", "continue_reasoning", "continue_reasoning_native"
]:
) -> Literal["force_final_answer", "continue_reasoning"]:
"""Check if max iterations reached before proceeding with reasoning."""
if has_reached_max_iterations(self.state.iterations, self.max_iter):
return "force_final_answer"
if self.state.use_native_tools:
return "continue_reasoning_native"
return "continue_reasoning"
@router(execute_tool_action)
@@ -672,7 +391,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.iterations += 1
return "initialized"
@listen(or_("agent_finished", "tool_result_is_final", "native_finished"))
@listen(or_("agent_finished", "tool_result_is_final"))
def finalize(self) -> Literal["completed", "skipped"]:
"""Finalize execution and emit completion logs."""
if self.state.current_answer is None:
@@ -734,9 +453,99 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
return "initialized"
def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
def invoke(
self, inputs: dict[str, Any]
) -> dict[str, Any] | Coroutine[Any, Any, dict[str, Any]]:
"""Execute agent with given inputs.
When called from within an existing event loop (e.g., inside a Flow),
this method returns a coroutine that should be awaited. The Flow
framework handles this automatically.
Args:
inputs: Input dictionary containing prompt variables.
Returns:
Dictionary with agent output, or a coroutine if inside an event loop.
"""
# Magic auto-async: if inside event loop, return coroutine for Flow to await
if is_inside_event_loop():
return self.invoke_async(inputs)
self._ensure_flow_initialized()
with self._execution_lock:
if self._is_executing:
raise RuntimeError(
"Executor is already running. "
"Cannot invoke the same executor instance concurrently."
)
self._is_executing = True
self._has_been_invoked = True
try:
# Reset state for fresh execution
self.state.messages.clear()
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
self.state.ask_for_human_input = bool(
inputs.get("ask_for_human_input", False)
)
self.kickoff()
formatted_answer = self.state.current_answer
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
self._create_external_memory(formatted_answer)
return {"output": formatted_answer.output}
except AssertionError:
fail_text = Text()
fail_text.append("", style="red bold")
fail_text.append(
"Agent failed to reach a final answer. This is likely a bug - please report it.",
style="red",
)
self._console.print(fail_text)
raise
except Exception as e:
handle_unknown_error(self._printer, e)
raise
finally:
self._is_executing = False
async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute agent asynchronously with given inputs.
This method is designed for use within async contexts, such as when
the agent is called from within an async Flow method. It uses
kickoff_async() directly instead of running in a separate thread.
Args:
inputs: Input dictionary containing prompt variables.
@@ -760,8 +569,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
self.state.use_native_tools = False
self.state.pending_tool_calls = []
if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)
@@ -779,7 +586,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
inputs.get("ask_for_human_input", False)
)
self.kickoff()
# Use async kickoff directly since we're already in an async context
await self.kickoff_async()
formatted_answer = self.state.current_answer
@@ -870,11 +678,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
if self.agent is None:
raise ValueError("Agent cannot be None")
if self.task is None:
return
crewai_event_bus.emit(
self.agent,
AgentLogsStartedEvent(
agent_role=self.agent.role,
task_description=(self.task.description if self.task else "Not Found"),
task_description=self.task.description,
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
),
@@ -908,10 +719,12 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
result: Agent's final output.
human_feedback: Optional feedback from human.
"""
# Early return if no crew (standalone mode)
if self.crew is None:
return
agent_id = str(self.agent.id)
train_iteration = (
getattr(self.crew, "_train_iteration", None) if self.crew else None
)
train_iteration = getattr(self.crew, "_train_iteration", None)
if train_iteration is None or not isinstance(train_iteration, int):
train_error = Text()
@@ -1093,3 +906,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
requiring arbitrary_types_allowed=True.
"""
return core_schema.any_schema()
# Backward compatibility alias (deprecated)
CrewAgentExecutorFlow = AgentExecutor

View File

@@ -73,6 +73,7 @@ from crewai.flow.utils import (
is_simple_flow_condition,
)
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.human_feedback import HumanFeedbackResult
@@ -519,6 +520,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._methods: dict[FlowMethodName, FlowMethod[Any, Any]] = {}
self._method_execution_counts: dict[FlowMethodName, int] = {}
self._pending_and_listeners: dict[PendingListenerKey, set[FlowMethodName]] = {}
self._fired_or_listeners: set[FlowMethodName] = (
set()
) # Track OR listeners that already fired
self._method_outputs: list[Any] = [] # list to store all method outputs
self._completed_methods: set[FlowMethodName] = (
set()
@@ -570,7 +574,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
flow_id: str,
persistence: FlowPersistence | None = None,
**kwargs: Any,
) -> "Flow[Any]":
) -> Flow[Any]:
"""Create a Flow instance from a pending feedback state.
This classmethod is used to restore a flow that was paused waiting
@@ -631,7 +635,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return instance
@property
def pending_feedback(self) -> "PendingFeedbackContext | None":
def pending_feedback(self) -> PendingFeedbackContext | None:
"""Get the pending feedback context if this flow is waiting for feedback.
Returns:
@@ -716,9 +720,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
Raises:
ValueError: If no pending feedback context exists
"""
from crewai.flow.human_feedback import HumanFeedbackResult
from datetime import datetime
from crewai.flow.human_feedback import HumanFeedbackResult
if self._pending_feedback_context is None:
raise ValueError(
"No pending feedback context. Use from_pending() to restore a paused flow."
@@ -1295,6 +1300,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._fired_or_listeners.clear()
else:
# We're restoring from persistence, set the flag
self._is_execution_resuming = True
@@ -1346,9 +1352,26 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._initialize_state(inputs)
try:
# Determine which start methods to execute at kickoff
# Conditional start methods (with __trigger_methods__) are only triggered by their conditions
# UNLESS there are no unconditional starts (then all starts run as entry points)
unconditional_starts = [
start_method
for start_method in self._start_methods
if not getattr(
self._methods.get(start_method), "__trigger_methods__", None
)
]
# If there are unconditional starts, only run those at kickoff
# If there are NO unconditional starts, run all starts (including conditional ones)
starts_to_execute = (
unconditional_starts
if unconditional_starts
else self._start_methods
)
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
for start_method in starts_to_execute
]
await asyncio.gather(*tasks)
except Exception as e:
@@ -1481,6 +1504,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(start_method_name)
# Also clear fired OR listeners to allow them to fire again in new cycle
self._fired_or_listeners.clear()
method = self._methods[start_method_name]
enhanced_method = self._inject_trigger_payload_for_start_method(method)
@@ -1503,11 +1528,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self.last_human_feedback is not None
else result
)
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_for_result
]
await asyncio.gather(*tasks)
# Execute listeners sequentially to prevent race conditions on shared state
for listener_name in listeners_for_result:
await self._execute_single_listener(listener_name, listener_result)
else:
await self._execute_listeners(start_method_name, result)
@@ -1573,11 +1596,19 @@ class Flow(Generic[T], metaclass=FlowMeta):
if future:
self._event_futures.append(future)
result = (
await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
if asyncio.iscoroutinefunction(method):
result = await method(*args, **kwargs)
else:
# Run sync methods in thread pool for isolation
# This allows Agent.kickoff() to work synchronously inside Flow methods
import contextvars
ctx = contextvars.copy_context()
result = await asyncio.to_thread(ctx.run, method, *args, **kwargs)
# Auto-await coroutines returned from sync methods (enables AgentExecutor pattern)
if asyncio.iscoroutine(result):
result = await result
self._method_outputs.append(result)
self._method_execution_counts[method_name] = (
@@ -1724,11 +1755,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
listener_result = router_result_to_feedback.get(
str(current_trigger), result
)
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
# Execute listeners sequentially to prevent race conditions on shared state
for listener_name in listeners_triggered:
await self._execute_single_listener(
listener_name, listener_result
)
if current_trigger in router_results:
# Find start methods triggered by this router result
@@ -1745,14 +1776,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
should_trigger = current_trigger in all_methods
if should_trigger:
# Only execute if this is a cycle (method was already completed)
# Execute conditional start method triggered by router result
if method_name in self._completed_methods:
# For router-triggered start methods in cycles, temporarily clear resumption flag
# to allow cyclic execution
# For cyclic re-execution, temporarily clear resumption flag
was_resuming = self._is_execution_resuming
self._is_execution_resuming = False
await self._execute_start_method(method_name)
self._is_execution_resuming = was_resuming
else:
# First-time execution of conditional start
await self._execute_start_method(method_name)
def _evaluate_condition(
self,
@@ -1850,8 +1883,21 @@ class Flow(Generic[T], metaclass=FlowMeta):
condition_type, methods = condition_data
if condition_type == OR_CONDITION:
if trigger_method in methods:
triggered.append(listener_name)
# Only trigger multi-source OR listeners (or_(A, B, C)) once - skip if already fired
# Simple single-method listeners fire every time their trigger occurs
# Routers also fire every time - they're decision points
has_multiple_triggers = len(methods) > 1
should_check_fired = has_multiple_triggers and not is_router
if (
not should_check_fired
or listener_name not in self._fired_or_listeners
):
if trigger_method in methods:
triggered.append(listener_name)
# Only track multi-source OR listeners (not single-method or routers)
if should_check_fired:
self._fired_or_listeners.add(listener_name)
elif condition_type == AND_CONDITION:
pending_key = PendingListenerKey(listener_name)
if pending_key not in self._pending_and_listeners:
@@ -1864,10 +1910,26 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._pending_and_listeners.pop(pending_key, None)
elif is_flow_condition_dict(condition_data):
# For complex conditions, check if top-level is OR and track accordingly
top_level_type = condition_data.get("type", OR_CONDITION)
is_or_based = top_level_type == OR_CONDITION
# Only track multi-source OR conditions (multiple sub-conditions), not routers
sub_conditions = condition_data.get("conditions", [])
has_multiple_triggers = is_or_based and len(sub_conditions) > 1
should_check_fired = has_multiple_triggers and not is_router
# Skip compound OR-based listeners that have already fired
if should_check_fired and listener_name in self._fired_or_listeners:
continue
if self._evaluate_condition(
condition_data, trigger_method, listener_name
):
triggered.append(listener_name)
# Track compound OR-based listeners so they only fire once
if should_check_fired:
self._fired_or_listeners.add(listener_name)
return triggered
@@ -1896,9 +1958,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self._is_execution_resuming:
# During resumption, skip execution but continue listeners
await self._execute_listeners(listener_name, None)
# For routers, also check if any conditional starts they triggered are completed
# If so, continue their chains
if listener_name in self._routers:
for start_method_name in self._start_methods:
if (
start_method_name in self._listeners
and start_method_name in self._completed_methods
):
# This conditional start was executed, continue its chain
await self._execute_start_method(start_method_name)
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(listener_name)
# Also clear from fired OR listeners for cyclic flows
self._fired_or_listeners.discard(listener_name)
try:
method = self._methods[listener_name]
@@ -1931,11 +2006,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self.last_human_feedback is not None
else listener_result
)
tasks = [
self._execute_single_listener(name, feedback_result)
for name in listeners_for_result
]
await asyncio.gather(*tasks)
# Execute listeners sequentially to prevent race conditions on shared state
for name in listeners_for_result:
await self._execute_single_listener(name, feedback_result)
except Exception as e:
# Don't log HumanFeedbackPending as an error - it's expected control flow

View File

@@ -10,6 +10,7 @@ from typing import (
get_origin,
)
import uuid
import warnings
from pydantic import (
UUID4,
@@ -80,6 +81,11 @@ class LiteAgent(FlowTrackable, BaseModel):
"""
A lightweight agent that can process messages and use tools.
.. deprecated::
LiteAgent is deprecated and will be removed in a future version.
Use ``Agent().kickoff(messages)`` instead, which provides the same
functionality with additional features like memory and knowledge support.
This agent is simpler than the full Agent class, focusing on direct execution
rather than task delegation. It's designed to be used for simple interactions
where a full crew is not needed.
@@ -164,6 +170,18 @@ class LiteAgent(FlowTrackable, BaseModel):
default_factory=get_after_llm_call_hooks
)
@model_validator(mode="after")
def emit_deprecation_warning(self) -> Self:
"""Emit deprecation warning for LiteAgent usage."""
warnings.warn(
"LiteAgent is deprecated and will be removed in a future version. "
"Use Agent().kickoff(messages) instead, which provides the same "
"functionality with additional features like memory and knowledge support.",
DeprecationWarning,
stacklevel=2,
)
return self
@model_validator(mode="after")
def setup_llm(self) -> Self:
"""Set up the LLM and other components after initialization."""

View File

@@ -931,6 +931,7 @@ class LLM(BaseLLM):
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
if not tool_calls or not available_functions:
if response_model and self.is_litellm:
instructor_instance = InternalInstructor(
content=full_response,
@@ -1143,12 +1144,8 @@ class LLM(BaseLLM):
if response_model:
params["response_model"] = response_model
response = litellm.completion(**params)
if (
hasattr(response, "usage")
and not isinstance(response.usage, type)
and response.usage
):
if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage:
usage_info = response.usage
self._track_token_usage_internal(usage_info)
@@ -1202,19 +1199,16 @@ class LLM(BaseLLM):
)
return text_response
# --- 6) If there are tool calls but no available functions, return the tool calls
# This allows the caller (e.g., executor) to handle tool execution
if tool_calls and not available_functions:
# --- 6) If there is no text response, no available functions, but there are tool calls, return the tool calls
if tool_calls and not available_functions and not text_response:
return tool_calls
# --- 7) Handle tool calls if present (execute when available_functions provided)
if tool_calls and available_functions:
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# --- 7) Handle tool calls if present
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
# --- 8) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(
response=text_response,
@@ -1279,11 +1273,7 @@ class LLM(BaseLLM):
params["response_model"] = response_model
response = await litellm.acompletion(**params)
if (
hasattr(response, "usage")
and not isinstance(response.usage, type)
and response.usage
):
if hasattr(response,"usage") and not isinstance(response.usage, type) and response.usage:
usage_info = response.usage
self._track_token_usage_internal(usage_info)
@@ -1331,18 +1321,14 @@ class LLM(BaseLLM):
)
return text_response
# If there are tool calls but no available functions, return the tool calls
# This allows the caller (e.g., executor) to handle tool execution
if tool_calls and not available_functions:
if tool_calls and not available_functions and not text_response:
return tool_calls
# Handle tool calls if present (execute when available_functions provided)
if tool_calls and available_functions:
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
tool_result = self._handle_tool_call(
tool_calls, available_functions, from_task, from_agent
)
if tool_result is not None:
return tool_result
self._handle_emit_call_events(
response=text_response,
@@ -1377,7 +1363,7 @@ class LLM(BaseLLM):
"""
full_response = ""
chunk_count = 0
usage_info = None
accumulated_tool_args: defaultdict[int, AccumulatedToolArgs] = defaultdict(

View File

@@ -445,7 +445,7 @@ class BaseLLM(ABC):
from_agent=from_agent,
)
return result
return str(result)
except Exception as e:
error_msg = f"Error executing function '{function_name}': {e!s}"

View File

@@ -418,7 +418,6 @@ class AnthropicCompletion(BaseLLM):
- System messages are separate from conversation messages
- Messages must alternate between user and assistant
- First message must be from user
- Tool results must be in user messages with tool_result content blocks
- When thinking is enabled, assistant messages must start with thinking blocks
Args:
@@ -432,7 +431,6 @@ class AnthropicCompletion(BaseLLM):
formatted_messages: list[LLMMessage] = []
system_message: str | None = None
pending_tool_results: list[dict[str, Any]] = []
for message in base_formatted:
role = message.get("role")
@@ -443,47 +441,16 @@ class AnthropicCompletion(BaseLLM):
system_message += f"\n\n{content}"
else:
system_message = cast(str, content)
elif role == "tool":
# Convert OpenAI-style tool message to Anthropic tool_result format
# These will be collected and added as a user message
tool_call_id = message.get("tool_call_id", "")
tool_result = {
"type": "tool_result",
"tool_use_id": tool_call_id,
"content": content if content else "",
}
pending_tool_results.append(tool_result)
elif role == "assistant":
# First, flush any pending tool results as a user message
if pending_tool_results:
formatted_messages.append(
{"role": "user", "content": pending_tool_results}
)
pending_tool_results = []
else:
role_str = role if role is not None else "user"
# Handle assistant message with tool_calls (convert to Anthropic format)
tool_calls = message.get("tool_calls", [])
if tool_calls:
assistant_content: list[dict[str, Any]] = []
for tc in tool_calls:
if isinstance(tc, dict):
func = tc.get("function", {})
tool_use = {
"type": "tool_use",
"id": tc.get("id", ""),
"name": func.get("name", ""),
"input": json.loads(func.get("arguments", "{}"))
if isinstance(func.get("arguments"), str)
else func.get("arguments", {}),
}
assistant_content.append(tool_use)
if assistant_content:
formatted_messages.append(
{"role": "assistant", "content": assistant_content}
)
elif isinstance(content, list):
formatted_messages.append({"role": "assistant", "content": content})
elif self.thinking and self.previous_thinking_blocks:
if isinstance(content, list):
formatted_messages.append({"role": role_str, "content": content})
elif (
role_str == "assistant"
and self.thinking
and self.previous_thinking_blocks
):
structured_content = cast(
list[dict[str, Any]],
[
@@ -492,34 +459,14 @@ class AnthropicCompletion(BaseLLM):
],
)
formatted_messages.append(
LLMMessage(role="assistant", content=structured_content)
LLMMessage(role=role_str, content=structured_content)
)
else:
content_str = content if content is not None else ""
formatted_messages.append(
LLMMessage(role="assistant", content=content_str)
)
else:
# User message - first flush any pending tool results
if pending_tool_results:
formatted_messages.append(
{"role": "user", "content": pending_tool_results}
)
pending_tool_results = []
role_str = role if role is not None else "user"
if isinstance(content, list):
formatted_messages.append({"role": role_str, "content": content})
else:
content_str = content if content is not None else ""
formatted_messages.append(
LLMMessage(role=role_str, content=content_str)
)
# Flush any remaining pending tool results
if pending_tool_results:
formatted_messages.append({"role": "user", "content": pending_tool_results})
# Ensure first message is from user (Anthropic requirement)
if not formatted_messages:
# If no messages, add a default user message
@@ -579,19 +526,13 @@ class AnthropicCompletion(BaseLLM):
return structured_json
# Check if Claude wants to use tools
if response.content:
if response.content and available_functions:
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
# This allows the executor to manage tool execution with proper
# message history and post-tool reasoning prompts
if not available_functions:
return list(tool_uses)
# Handle tool use conversation flow internally
# Handle tool use conversation flow
return self._handle_tool_use_conversation(
response,
tool_uses,
@@ -755,7 +696,7 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if final_message.content:
if final_message.content and available_functions:
tool_uses = [
block
for block in final_message.content
@@ -763,11 +704,7 @@ class AnthropicCompletion(BaseLLM):
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
# Handle tool use conversation flow internally
# Handle tool use conversation flow
return self._handle_tool_use_conversation(
final_message,
tool_uses,
@@ -996,16 +933,12 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if response.content:
if response.content and available_functions:
tool_uses = [
block for block in response.content if isinstance(block, ToolUseBlock)
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
return await self._ahandle_tool_use_conversation(
response,
tool_uses,
@@ -1146,7 +1079,7 @@ class AnthropicCompletion(BaseLLM):
return structured_json
if final_message.content:
if final_message.content and available_functions:
tool_uses = [
block
for block in final_message.content
@@ -1154,10 +1087,6 @@ class AnthropicCompletion(BaseLLM):
]
if tool_uses:
# If no available_functions, return tool calls for executor to handle
if not available_functions:
return list(tool_uses)
return await self._ahandle_tool_use_conversation(
final_message,
tool_uses,

View File

@@ -514,31 +514,10 @@ class AzureCompletion(BaseLLM):
for message in base_formatted:
role = message.get("role", "user") # Default to user if no role
# Handle None content - Azure requires string content
content = message.get("content") or ""
content = message.get("content", "")
# Handle tool role messages - keep as tool role for Azure OpenAI
if role == "tool":
tool_call_id = message.get("tool_call_id", "unknown")
azure_messages.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"content": content,
}
)
# Handle assistant messages with tool_calls
elif role == "assistant" and message.get("tool_calls"):
tool_calls = message.get("tool_calls", [])
azure_msg: LLMMessage = {
"role": "assistant",
"content": content, # Already defaulted to "" above
"tool_calls": tool_calls,
}
azure_messages.append(azure_msg)
else:
# Azure AI Inference requires both 'role' and 'content'
azure_messages.append({"role": role, "content": content})
# Azure AI Inference requires both 'role' and 'content'
azure_messages.append({"role": role, "content": content})
return azure_messages
@@ -625,11 +604,6 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
)
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
return list(message.tool_calls)
# Handle tool calls
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0] # Handle first tool call
@@ -801,21 +775,6 @@ class AzureCompletion(BaseLLM):
from_agent=from_agent,
)
# If there are tool_calls but no available_functions, return them
# in OpenAI-compatible format for executor to handle
if tool_calls and not available_functions:
return [
{
"id": call_data.get("id", f"call_{idx}"),
"type": "function",
"function": {
"name": call_data["name"],
"arguments": call_data["arguments"],
},
}
for idx, call_data in tool_calls.items()
]
# Handle completed tool calls
if tool_calls and available_functions:
for call_data in tool_calls.values():

View File

@@ -606,17 +606,6 @@ class GeminiCompletion(BaseLLM):
if response.candidates and (self.tools or available_functions):
candidate = response.candidates[0]
if candidate.content and candidate.content.parts:
# Collect function call parts
function_call_parts = [
part for part in candidate.content.parts if part.function_call
]
# If there are function calls but no available_functions,
# return them for the executor to handle (like OpenAI/Anthropic)
if function_call_parts and not available_functions:
return function_call_parts
# Otherwise execute the tools internally
for part in candidate.content.parts:
if part.function_call:
function_name = part.function_call.name
@@ -731,7 +720,7 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str | list[dict[str, Any]]:
) -> str:
"""Finalize streaming response with usage tracking, function execution, and events.
Args:
@@ -749,21 +738,6 @@ class GeminiCompletion(BaseLLM):
"""
self._track_token_usage_internal(usage_data)
# If there are function calls but no available_functions,
# return them for the executor to handle
if function_calls and not available_functions:
return [
{
"id": call_data["id"],
"function": {
"name": call_data["name"],
"arguments": json.dumps(call_data["args"]),
},
"type": "function",
}
for call_data in function_calls.values()
]
# Handle completed function calls
if function_calls and available_functions:
for call_data in function_calls.values():

View File

@@ -428,12 +428,6 @@ class OpenAICompletion(BaseLLM):
choice: Choice = response.choices[0]
message = choice.message
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
return list(message.tool_calls)
# If there are tool_calls and available_functions, execute the tools
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0]
function_name = tool_call.function.name
@@ -731,15 +725,6 @@ class OpenAICompletion(BaseLLM):
choice: Choice = response.choices[0]
message = choice.message
# If there are tool_calls but no available_functions, return the tool_calls
# This allows the caller (e.g., executor) to handle tool execution
if message.tool_calls and not available_functions:
print("--------------------------------")
print("lorenze tool_calls", list(message.tool_calls))
print("--------------------------------")
return list(message.tool_calls)
# If there are tool_calls and available_functions, execute the tools
if message.tool_calls and available_functions:
tool_call = message.tool_calls[0]
function_name = tool_call.function.name

View File

@@ -11,9 +11,6 @@
"role_playing": "You are {role}. {backstory}\nYour personal goal is: {goal}",
"tools": "\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\n{tools}\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [{tool_names}], just the name, exactly as it's written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final answer to the original input question\n```",
"no_tools": "\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!",
"native_tools": "\nUse available tools to gather information and complete your task.",
"native_task": "\nCurrent Task: {input}\n\nThis is VERY important to you, your job depends on it!",
"post_tool_reasoning": "PAUSE and THINK before responding.\n\nInternally consider (DO NOT output these steps):\n- What key insights did the tool provide?\n- Have I fulfilled ALL requirements from my original instructions (e.g., minimum tool calls, specific sources)?\n- Do I have enough information to fully answer the task?\n\nIF you have NOT met all requirements or need more information: Call another tool now.\n\nIF you have met all requirements and have sufficient information: Provide ONLY your final answer in the format specified by the task's expected output. Do NOT include reasoning steps, analysis sections, or meta-commentary. Just deliver the answer.",
"format": "I MUST either use a tool (use one at time) OR give my best final answer not both at the same time. When responding, I must use the following format:\n\n```\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action, dictionary enclosed in curly braces\nObservation: the result of the action\n```\nThis Thought/Action/Action Input/Result can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",
"final_answer_format": "If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfies the expected criteria, use the EXACT format below:\n\n```\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task.\n\n```",
"format_without_tools": "\nSorry, I didn't use the right format. I MUST either use a tool (among the available ones), OR give my best final answer.\nHere is the expected format I must follow:\n\n```\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n```\n This Thought/Action/Action Input/Result process can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```",

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
import json
import re
@@ -54,6 +55,23 @@ console = Console()
_MULTIPLE_NEWLINES: Final[re.Pattern[str]] = re.compile(r"\n+")
def is_inside_event_loop() -> bool:
"""Check if code is currently running inside an asyncio event loop.
This is used to detect when code is being called from within an async context
(e.g., inside a Flow). In such cases, callers should return a coroutine
instead of executing synchronously to avoid nested event loop errors.
Returns:
True if inside a running event loop, False otherwise.
"""
try:
asyncio.get_running_loop()
return True
except RuntimeError:
return False
def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]:
"""Parse tools to be used for the task.
@@ -108,65 +126,6 @@ def render_text_description_and_args(
return "\n".join(tool_strings)
def convert_tools_to_openai_schema(
tools: Sequence[BaseTool | CrewStructuredTool],
) -> tuple[list[dict[str, Any]], dict[str, Callable[..., Any]]]:
"""Convert CrewAI tools to OpenAI function calling format.
This function converts CrewAI BaseTool and CrewStructuredTool objects
into the OpenAI-compatible tool schema format that can be passed to
LLM providers for native function calling.
Args:
tools: List of CrewAI tool objects to convert.
Returns:
Tuple containing:
- List of OpenAI-format tool schema dictionaries
- Dict mapping tool names to their callable run() methods
Example:
>>> tools = [CalculatorTool(), SearchTool()]
>>> schemas, functions = convert_tools_to_openai_schema(tools)
>>> # schemas can be passed to llm.call(tools=schemas)
>>> # functions can be passed to llm.call(available_functions=functions)
"""
openai_tools: list[dict[str, Any]] = []
available_functions: dict[str, Callable[..., Any]] = {}
for tool in tools:
# Get the JSON schema for tool parameters
parameters: dict[str, Any] = {}
if hasattr(tool, "args_schema") and tool.args_schema is not None:
try:
parameters = tool.args_schema.model_json_schema()
# Remove title and description from schema root as they're redundant
parameters.pop("title", None)
parameters.pop("description", None)
except Exception:
parameters = {}
# Extract original description from formatted description
# BaseTool formats description as "Tool Name: ...\nTool Arguments: ...\nTool Description: {original}"
description = tool.description
if "Tool Description:" in description:
# Extract the original description after "Tool Description:"
description = description.split("Tool Description:")[-1].strip()
schema: dict[str, Any] = {
"type": "function",
"function": {
"name": tool.name,
"description": description,
"parameters": parameters,
},
}
openai_tools.append(schema)
available_functions[tool.name] = tool.run
return openai_tools, available_functions
def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
"""Check if the maximum number of iterations has been reached.
@@ -293,13 +252,11 @@ def get_llm_response(
messages: list[LLMMessage],
callbacks: list[TokenCalcHandler],
printer: Printer,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Callable[..., Any]] | None = None,
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | LiteAgent | None = None,
) -> str | Any:
) -> str:
"""Call the LLM and return the response, handling any invalid responses.
Args:
@@ -307,16 +264,13 @@ def get_llm_response(
messages: The messages to send to the LLM.
callbacks: List of callbacks for the LLM call.
printer: Printer instance for output.
tools: Optional list of tool schemas for native function calling.
available_functions: Optional dict mapping function names to callables.
from_task: Optional task context for the LLM call.
from_agent: Optional agent context for the LLM call.
response_model: Optional Pydantic model for structured outputs.
executor_context: Optional executor context for hook invocation.
Returns:
The response from the LLM as a string, or tool call results if
native function calling is used.
The response from the LLM as a string.
Raises:
Exception: If an error occurs.
@@ -331,9 +285,7 @@ def get_llm_response(
try:
answer = llm.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent, # type: ignore[arg-type]
response_model=response_model,
@@ -355,13 +307,11 @@ async def aget_llm_response(
messages: list[LLMMessage],
callbacks: list[TokenCalcHandler],
printer: Printer,
tools: list[dict[str, Any]] | None = None,
available_functions: dict[str, Callable[..., Any]] | None = None,
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | None = None,
) -> str | Any:
) -> str:
"""Call the LLM asynchronously and return the response.
Args:
@@ -369,16 +319,13 @@ async def aget_llm_response(
messages: The messages to send to the LLM.
callbacks: List of callbacks for the LLM call.
printer: Printer instance for output.
tools: Optional list of tool schemas for native function calling.
available_functions: Optional dict mapping function names to callables.
from_task: Optional task context for the LLM call.
from_agent: Optional agent context for the LLM call.
response_model: Optional Pydantic model for structured outputs.
executor_context: Optional executor context for hook invocation.
Returns:
The response from the LLM as a string, or tool call results if
native function calling is used.
The response from the LLM as a string.
Raises:
Exception: If an error occurs.
@@ -392,9 +339,7 @@ async def aget_llm_response(
try:
answer = await llm.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent, # type: ignore[arg-type]
response_model=response_model,

View File

@@ -22,9 +22,7 @@ class SystemPromptResult(StandardPromptResult):
user: Annotated[str, "The user prompt component"]
COMPONENTS = Literal[
"role_playing", "tools", "no_tools", "native_tools", "task", "native_task"
]
COMPONENTS = Literal["role_playing", "tools", "no_tools", "task"]
class Prompts(BaseModel):
@@ -38,10 +36,6 @@ class Prompts(BaseModel):
has_tools: bool = Field(
default=False, description="Indicates if the agent has access to tools"
)
use_native_tool_calling: bool = Field(
default=False,
description="Whether to use native function calling instead of ReAct format",
)
system_template: str | None = Field(
default=None, description="Custom system prompt template"
)
@@ -64,24 +58,12 @@ class Prompts(BaseModel):
A dictionary containing the constructed prompt(s).
"""
slices: list[COMPONENTS] = ["role_playing"]
# When using native tool calling with tools, use native_tools instructions
# When using ReAct pattern with tools, use tools instructions
# When no tools are available, use no_tools instructions
if self.has_tools:
if self.use_native_tool_calling:
slices.append("native_tools")
else:
slices.append("tools")
slices.append("tools")
else:
slices.append("no_tools")
system: str = self._build_prompt(slices)
# Use native_task for native tool calling (no "Thought:" prompt)
# Use task for ReAct pattern (includes "Thought:" prompt)
task_slice: COMPONENTS = (
"native_task" if self.use_native_tool_calling else "task"
)
slices.append(task_slice)
slices.append("task")
if (
not self.system_template
@@ -90,7 +72,7 @@ class Prompts(BaseModel):
):
return SystemPromptResult(
system=system,
user=self._build_prompt([task_slice]),
user=self._build_prompt(["task"]),
prompt=self._build_prompt(slices),
)
return StandardPromptResult(

View File

@@ -1,4 +1,4 @@
"""Unit tests for CrewAgentExecutorFlow.
"""Unit tests for AgentExecutor.
Tests the Flow-based agent executor implementation including state management,
flow methods, routing logic, and error handling.
@@ -8,9 +8,9 @@ from unittest.mock import Mock, patch
import pytest
from crewai.experimental.crew_agent_executor_flow import (
from crewai.experimental.agent_executor import (
AgentReActState,
CrewAgentExecutorFlow,
AgentExecutor,
)
from crewai.agents.parser import AgentAction, AgentFinish
@@ -43,8 +43,8 @@ class TestAgentReActState:
assert state.ask_for_human_input is True
class TestCrewAgentExecutorFlow:
"""Test CrewAgentExecutorFlow class."""
class TestAgentExecutor:
"""Test AgentExecutor class."""
@pytest.fixture
def mock_dependencies(self):
@@ -87,8 +87,8 @@ class TestCrewAgentExecutorFlow:
}
def test_executor_initialization(self, mock_dependencies):
"""Test CrewAgentExecutorFlow initialization."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
"""Test AgentExecutor initialization."""
executor = AgentExecutor(**mock_dependencies)
assert executor.llm == mock_dependencies["llm"]
assert executor.task == mock_dependencies["task"]
@@ -100,9 +100,9 @@ class TestCrewAgentExecutorFlow:
def test_initialize_reasoning(self, mock_dependencies):
"""Test flow entry point."""
with patch.object(
CrewAgentExecutorFlow, "_show_start_logs"
AgentExecutor, "_show_start_logs"
) as mock_show_start:
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
result = executor.initialize_reasoning()
assert result == "initialized"
@@ -110,7 +110,7 @@ class TestCrewAgentExecutorFlow:
def test_check_max_iterations_not_reached(self, mock_dependencies):
"""Test routing when iterations < max."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.iterations = 5
result = executor.check_max_iterations()
@@ -118,7 +118,7 @@ class TestCrewAgentExecutorFlow:
def test_check_max_iterations_reached(self, mock_dependencies):
"""Test routing when iterations >= max."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.iterations = 10
result = executor.check_max_iterations()
@@ -126,7 +126,7 @@ class TestCrewAgentExecutorFlow:
def test_route_by_answer_type_action(self, mock_dependencies):
"""Test routing for AgentAction."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="search", tool_input="query", text="action text"
)
@@ -136,7 +136,7 @@ class TestCrewAgentExecutorFlow:
def test_route_by_answer_type_finish(self, mock_dependencies):
"""Test routing for AgentFinish."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.current_answer = AgentFinish(
thought="final thoughts", output="Final answer", text="complete"
)
@@ -146,7 +146,7 @@ class TestCrewAgentExecutorFlow:
def test_continue_iteration(self, mock_dependencies):
"""Test iteration continuation."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
result = executor.continue_iteration()
@@ -154,8 +154,8 @@ class TestCrewAgentExecutorFlow:
def test_finalize_success(self, mock_dependencies):
"""Test finalize with valid AgentFinish."""
with patch.object(CrewAgentExecutorFlow, "_show_logs") as mock_show_logs:
executor = CrewAgentExecutorFlow(**mock_dependencies)
with patch.object(AgentExecutor, "_show_logs") as mock_show_logs:
executor = AgentExecutor(**mock_dependencies)
executor.state.current_answer = AgentFinish(
thought="final thinking", output="Done", text="complete"
)
@@ -168,7 +168,7 @@ class TestCrewAgentExecutorFlow:
def test_finalize_failure(self, mock_dependencies):
"""Test finalize skips when given AgentAction instead of AgentFinish."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="search", tool_input="query", text="action text"
)
@@ -181,7 +181,7 @@ class TestCrewAgentExecutorFlow:
def test_format_prompt(self, mock_dependencies):
"""Test prompt formatting."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
inputs = {"input": "test input", "tool_names": "tool1, tool2", "tools": "desc"}
result = executor._format_prompt("Prompt {input} {tool_names} {tools}", inputs)
@@ -192,18 +192,18 @@ class TestCrewAgentExecutorFlow:
def test_is_training_mode_false(self, mock_dependencies):
"""Test training mode detection when not in training."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
assert executor._is_training_mode() is False
def test_is_training_mode_true(self, mock_dependencies):
"""Test training mode detection when in training."""
mock_dependencies["crew"]._train = True
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
assert executor._is_training_mode() is True
def test_append_message_to_state(self, mock_dependencies):
"""Test message appending to state."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
initial_count = len(executor.state.messages)
executor._append_message_to_state("test message")
@@ -216,7 +216,7 @@ class TestCrewAgentExecutorFlow:
callback = Mock()
mock_dependencies["step_callback"] = callback
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
answer = AgentFinish(thought="thinking", output="test", text="final")
executor._invoke_step_callback(answer)
@@ -226,14 +226,14 @@ class TestCrewAgentExecutorFlow:
def test_invoke_step_callback_none(self, mock_dependencies):
"""Test step callback when none provided."""
mock_dependencies["step_callback"] = None
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
# Should not raise error
executor._invoke_step_callback(
AgentFinish(thought="thinking", output="test", text="final")
)
@patch("crewai.experimental.crew_agent_executor_flow.handle_output_parser_exception")
@patch("crewai.experimental.agent_executor.handle_output_parser_exception")
def test_recover_from_parser_error(
self, mock_handle_exception, mock_dependencies
):
@@ -242,7 +242,7 @@ class TestCrewAgentExecutorFlow:
mock_handle_exception.return_value = None
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor._last_parser_error = OutputParserError("test error")
initial_iterations = executor.state.iterations
@@ -252,12 +252,12 @@ class TestCrewAgentExecutorFlow:
assert executor.state.iterations == initial_iterations + 1
mock_handle_exception.assert_called_once()
@patch("crewai.experimental.crew_agent_executor_flow.handle_context_length")
@patch("crewai.experimental.agent_executor.handle_context_length")
def test_recover_from_context_length(
self, mock_handle_context, mock_dependencies
):
"""Test recovery from context length error."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor._last_context_error = Exception("context too long")
initial_iterations = executor.state.iterations
@@ -270,16 +270,16 @@ class TestCrewAgentExecutorFlow:
def test_use_stop_words_property(self, mock_dependencies):
"""Test use_stop_words property."""
mock_dependencies["llm"].supports_stop_words.return_value = True
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
assert executor.use_stop_words is True
mock_dependencies["llm"].supports_stop_words.return_value = False
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
assert executor.use_stop_words is False
def test_compatibility_properties(self, mock_dependencies):
"""Test compatibility properties for mixin."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.messages = [{"role": "user", "content": "test"}]
executor.state.iterations = 5
@@ -321,8 +321,8 @@ class TestFlowErrorHandling:
"tools_handler": Mock(),
}
@patch("crewai.experimental.crew_agent_executor_flow.get_llm_response")
@patch("crewai.experimental.crew_agent_executor_flow.enforce_rpm_limit")
@patch("crewai.experimental.agent_executor.get_llm_response")
@patch("crewai.experimental.agent_executor.enforce_rpm_limit")
def test_call_llm_parser_error(
self, mock_enforce_rpm, mock_get_llm, mock_dependencies
):
@@ -332,15 +332,15 @@ class TestFlowErrorHandling:
mock_enforce_rpm.return_value = None
mock_get_llm.side_effect = OutputParserError("parse failed")
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
result = executor.call_llm_and_parse()
assert result == "parser_error"
assert executor._last_parser_error is not None
@patch("crewai.experimental.crew_agent_executor_flow.get_llm_response")
@patch("crewai.experimental.crew_agent_executor_flow.enforce_rpm_limit")
@patch("crewai.experimental.crew_agent_executor_flow.is_context_length_exceeded")
@patch("crewai.experimental.agent_executor.get_llm_response")
@patch("crewai.experimental.agent_executor.enforce_rpm_limit")
@patch("crewai.experimental.agent_executor.is_context_length_exceeded")
def test_call_llm_context_error(
self,
mock_is_context_exceeded,
@@ -353,7 +353,7 @@ class TestFlowErrorHandling:
mock_get_llm.side_effect = Exception("context length")
mock_is_context_exceeded.return_value = True
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
result = executor.call_llm_and_parse()
assert result == "context_error"
@@ -397,10 +397,10 @@ class TestFlowInvoke:
"tools_handler": Mock(),
}
@patch.object(CrewAgentExecutorFlow, "kickoff")
@patch.object(CrewAgentExecutorFlow, "_create_short_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_long_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_external_memory")
@patch.object(AgentExecutor, "kickoff")
@patch.object(AgentExecutor, "_create_short_term_memory")
@patch.object(AgentExecutor, "_create_long_term_memory")
@patch.object(AgentExecutor, "_create_external_memory")
def test_invoke_success(
self,
mock_external_memory,
@@ -410,7 +410,7 @@ class TestFlowInvoke:
mock_dependencies,
):
"""Test successful invoke without human feedback."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
# Mock kickoff to set the final answer in state
def mock_kickoff_side_effect():
@@ -429,10 +429,10 @@ class TestFlowInvoke:
mock_long_term_memory.assert_called_once()
mock_external_memory.assert_called_once()
@patch.object(CrewAgentExecutorFlow, "kickoff")
@patch.object(AgentExecutor, "kickoff")
def test_invoke_failure_no_agent_finish(self, mock_kickoff, mock_dependencies):
"""Test invoke fails without AgentFinish."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="test", tool_input="test", text="action text"
)
@@ -442,10 +442,10 @@ class TestFlowInvoke:
with pytest.raises(RuntimeError, match="without reaching a final answer"):
executor.invoke(inputs)
@patch.object(CrewAgentExecutorFlow, "kickoff")
@patch.object(CrewAgentExecutorFlow, "_create_short_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_long_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_external_memory")
@patch.object(AgentExecutor, "kickoff")
@patch.object(AgentExecutor, "_create_short_term_memory")
@patch.object(AgentExecutor, "_create_long_term_memory")
@patch.object(AgentExecutor, "_create_external_memory")
def test_invoke_with_system_prompt(
self,
mock_external_memory,
@@ -459,7 +459,7 @@ class TestFlowInvoke:
"system": "System: {input}",
"user": "User: {input} {tool_names} {tools}",
}
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
def mock_kickoff_side_effect():
executor.state.current_answer = AgentFinish(

View File

@@ -72,62 +72,53 @@ class ResearchResult(BaseModel):
@pytest.mark.vcr()
@pytest.mark.parametrize("verbose", [True, False])
def test_lite_agent_created_with_correct_parameters(monkeypatch, verbose):
"""Test that LiteAgent is created with the correct parameters when Agent.kickoff() is called."""
def test_agent_kickoff_preserves_parameters(verbose):
"""Test that Agent.kickoff() uses the correct parameters from the Agent."""
# Create a test agent with specific parameters
llm = LLM(model="gpt-4o-mini")
mock_llm = Mock(spec=LLM)
mock_llm.call.return_value = "Final Answer: Test response"
mock_llm.stop = []
from crewai.types.usage_metrics import UsageMetrics
mock_usage_metrics = UsageMetrics(
total_tokens=100,
prompt_tokens=50,
completion_tokens=50,
cached_prompt_tokens=0,
successful_requests=1,
)
mock_llm.get_token_usage_summary.return_value = mock_usage_metrics
custom_tools = [WebSearchTool(), CalculatorTool()]
max_iter = 10
max_execution_time = 300
agent = Agent(
role="Test Agent",
goal="Test Goal",
backstory="Test Backstory",
llm=llm,
llm=mock_llm,
tools=custom_tools,
max_iter=max_iter,
max_execution_time=max_execution_time,
verbose=verbose,
)
# Create a mock to capture the created LiteAgent
created_lite_agent = None
original_lite_agent = LiteAgent
# Call kickoff and verify it works
result = agent.kickoff("Test query")
# Define a mock LiteAgent class that captures its arguments
class MockLiteAgent(original_lite_agent):
def __init__(self, **kwargs):
nonlocal created_lite_agent
created_lite_agent = kwargs
super().__init__(**kwargs)
# Verify the agent was configured correctly
assert agent.role == "Test Agent"
assert agent.goal == "Test Goal"
assert agent.backstory == "Test Backstory"
assert len(agent.tools) == 2
assert isinstance(agent.tools[0], WebSearchTool)
assert isinstance(agent.tools[1], CalculatorTool)
assert agent.max_iter == max_iter
assert agent.verbose == verbose
# Patch the LiteAgent class
monkeypatch.setattr("crewai.agent.core.LiteAgent", MockLiteAgent)
# Call kickoff to create the LiteAgent
agent.kickoff("Test query")
# Verify all parameters were passed correctly
assert created_lite_agent is not None
assert created_lite_agent["role"] == "Test Agent"
assert created_lite_agent["goal"] == "Test Goal"
assert created_lite_agent["backstory"] == "Test Backstory"
assert created_lite_agent["llm"] == llm
assert len(created_lite_agent["tools"]) == 2
assert isinstance(created_lite_agent["tools"][0], WebSearchTool)
assert isinstance(created_lite_agent["tools"][1], CalculatorTool)
assert created_lite_agent["max_iterations"] == max_iter
assert created_lite_agent["max_execution_time"] == max_execution_time
assert created_lite_agent["verbose"] == verbose
assert created_lite_agent["response_format"] is None
# Test with a response_format
class TestResponse(BaseModel):
test_field: str
agent.kickoff("Test query", response_format=TestResponse)
assert created_lite_agent["response_format"] == TestResponse
# Verify kickoff returned a result
assert result is not None
assert result.raw is not None
@pytest.mark.vcr()
@@ -310,7 +301,8 @@ def verify_agent_parent_flow(result, agent, flow):
def test_sets_parent_flow_when_inside_flow():
captured_agent = None
"""Test that an Agent can be created and executed inside a Flow context."""
captured_event = None
mock_llm = Mock(spec=LLM)
mock_llm.call.return_value = "Test response"
@@ -343,15 +335,17 @@ def test_sets_parent_flow_when_inside_flow():
event_received = threading.Event()
@crewai_event_bus.on(LiteAgentExecutionStartedEvent)
def capture_agent(source, event):
nonlocal captured_agent
captured_agent = source
def capture_event(source, event):
nonlocal captured_event
captured_event = event
event_received.set()
flow.kickoff()
result = flow.kickoff()
assert event_received.wait(timeout=5), "Timeout waiting for agent execution event"
assert captured_agent.parent_flow is flow
assert captured_event is not None
assert captured_event.agent_info["role"] == "Test Agent"
assert result is not None
@pytest.mark.vcr()
@@ -373,16 +367,14 @@ def test_guardrail_is_called_using_string():
@crewai_event_bus.on(LLMGuardrailStartedEvent)
def capture_guardrail_started(source, event):
assert isinstance(source, LiteAgent)
assert source.original_agent == agent
assert isinstance(source, Agent)
with condition:
guardrail_events["started"].append(event)
condition.notify()
@crewai_event_bus.on(LLMGuardrailCompletedEvent)
def capture_guardrail_completed(source, event):
assert isinstance(source, LiteAgent)
assert source.original_agent == agent
assert isinstance(source, Agent)
with condition:
guardrail_events["completed"].append(event)
condition.notify()
@@ -683,3 +675,151 @@ def test_agent_kickoff_with_mcp_tools(mock_get_mcp_tools):
# Verify MCP tools were retrieved
mock_get_mcp_tools.assert_called_once_with("https://mcp.exa.ai/mcp?api_key=test_exa_key&profile=research")
# ============================================================================
# Tests for LiteAgent inside Flow (magic auto-async pattern)
# ============================================================================
from crewai.flow.flow import listen
@pytest.mark.vcr()
def test_lite_agent_inside_flow_sync():
"""Test that LiteAgent.kickoff() works magically inside a Flow.
This tests the "magic auto-async" pattern where calling agent.kickoff()
from within a Flow automatically detects the event loop and returns a
coroutine that the Flow framework awaits. Users don't need to use async/await.
"""
# Track execution
execution_log = []
class TestFlow(Flow):
@start()
def run_agent(self):
execution_log.append("flow_started")
agent = Agent(
role="Test Agent",
goal="Answer questions",
backstory="A helpful test assistant",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
# Magic: just call kickoff() normally - it auto-detects Flow context
result = agent.kickoff(messages="What is 2+2? Reply with just the number.")
execution_log.append("agent_completed")
return result
flow = TestFlow()
result = flow.kickoff()
# Verify the flow executed successfully
assert "flow_started" in execution_log
assert "agent_completed" in execution_log
assert result is not None
assert isinstance(result, LiteAgentOutput)
@pytest.mark.vcr()
def test_lite_agent_inside_flow_with_tools():
"""Test that LiteAgent with tools works correctly inside a Flow."""
class TestFlow(Flow):
@start()
def run_agent_with_tools(self):
agent = Agent(
role="Calculator Agent",
goal="Perform calculations",
backstory="A math expert",
llm=LLM(model="gpt-4o-mini"),
tools=[CalculatorTool()],
verbose=False,
)
result = agent.kickoff(messages="Calculate 10 * 5")
return result
flow = TestFlow()
result = flow.kickoff()
assert result is not None
assert isinstance(result, LiteAgentOutput)
assert result.raw is not None
@pytest.mark.vcr()
def test_multiple_agents_in_same_flow():
"""Test that multiple LiteAgents can run sequentially in the same Flow."""
class MultiAgentFlow(Flow):
@start()
def first_step(self):
agent1 = Agent(
role="First Agent",
goal="Greet users",
backstory="A friendly greeter",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
return agent1.kickoff(messages="Say hello")
@listen(first_step)
def second_step(self, first_result):
agent2 = Agent(
role="Second Agent",
goal="Say goodbye",
backstory="A polite farewell agent",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
return agent2.kickoff(messages="Say goodbye")
flow = MultiAgentFlow()
result = flow.kickoff()
assert result is not None
assert isinstance(result, LiteAgentOutput)
@pytest.mark.vcr()
def test_lite_agent_kickoff_async_inside_flow():
"""Test that Agent.kickoff_async() works correctly from async Flow methods."""
class AsyncAgentFlow(Flow):
@start()
async def async_agent_step(self):
agent = Agent(
role="Async Test Agent",
goal="Answer questions asynchronously",
backstory="An async helper",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
result = await agent.kickoff_async(messages="What is 3+3?")
return result
flow = AsyncAgentFlow()
result = flow.kickoff()
assert result is not None
assert isinstance(result, LiteAgentOutput)
@pytest.mark.vcr()
def test_lite_agent_standalone_still_works():
"""Test that LiteAgent.kickoff() still works normally outside of a Flow.
This verifies that the magic auto-async pattern doesn't break standalone usage
where there's no event loop running.
"""
agent = Agent(
role="Standalone Agent",
goal="Answer questions",
backstory="A helpful assistant",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
# This should work normally - no Flow, no event loop
result = agent.kickoff(messages="What is 5+5? Reply with just the number.")
assert result is not None
assert isinstance(result, LiteAgentOutput)
assert result.raw is not None

View File

@@ -1,479 +0,0 @@
"""Integration tests for native tool calling functionality.
These tests verify that agents can use native function calling
when the LLM supports it, across multiple providers.
"""
from __future__ import annotations
import os
from typing import Any
from unittest.mock import patch, MagicMock
import pytest
from pydantic import BaseModel, Field
from crewai import Agent, Crew, Task
from crewai.llm import LLM
from crewai.tools.base_tool import BaseTool
# Check for optional provider availability
try:
import anthropic
HAS_ANTHROPIC = True
except ImportError:
HAS_ANTHROPIC = False
try:
import google.genai
HAS_GOOGLE_GENAI = True
except ImportError:
HAS_GOOGLE_GENAI = False
try:
import boto3
HAS_BOTO3 = True
except ImportError:
HAS_BOTO3 = False
class CalculatorInput(BaseModel):
"""Input schema for calculator tool."""
expression: str = Field(description="Mathematical expression to evaluate")
class CalculatorTool(BaseTool):
"""A calculator tool that performs mathematical calculations."""
name: str = "calculator"
description: str = "Perform mathematical calculations. Use this for any math operations."
args_schema: type[BaseModel] = CalculatorInput
def _run(self, expression: str) -> str:
"""Execute the calculation."""
try:
# Safe evaluation for basic math
result = eval(expression) # noqa: S307
return f"The result of {expression} is {result}"
except Exception as e:
return f"Error calculating {expression}: {e}"
class WeatherInput(BaseModel):
"""Input schema for weather tool."""
location: str = Field(description="City name to get weather for")
class WeatherTool(BaseTool):
"""A mock weather tool for testing."""
name: str = "get_weather"
description: str = "Get the current weather for a location"
args_schema: type[BaseModel] = WeatherInput
def _run(self, location: str) -> str:
"""Get weather (mock implementation)."""
return f"The weather in {location} is sunny with a temperature of 72°F"
@pytest.fixture
def calculator_tool() -> CalculatorTool:
"""Create a calculator tool for testing."""
return CalculatorTool()
@pytest.fixture
def weather_tool() -> WeatherTool:
"""Create a weather tool for testing."""
return WeatherTool()
# =============================================================================
# OpenAI Provider Tests
# =============================================================================
class TestOpenAINativeToolCalling:
"""Tests for native tool calling with OpenAI models."""
@pytest.mark.vcr()
def test_openai_agent_with_native_tool_calling(
self, calculator_tool: CalculatorTool
) -> None:
"""Test OpenAI agent can use native tool calling."""
agent = Agent(
role="Math Assistant",
goal="Help users with mathematical calculations",
backstory="You are a helpful math assistant.",
tools=[calculator_tool],
llm=LLM(model="gpt-4o-mini"),
verbose=False,
max_iter=3,
)
task = Task(
description="Calculate what is 15 * 8",
expected_output="The result of the calculation",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.raw is not None
assert "120" in str(result.raw)
def test_openai_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test OpenAI agent kickoff with mocked LLM call."""
llm = LLM(model="gpt-4o-mini")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Anthropic Provider Tests
# =============================================================================
@pytest.mark.skipif(not HAS_ANTHROPIC, reason="anthropic package not installed")
class TestAnthropicNativeToolCalling:
"""Tests for native tool calling with Anthropic models."""
@pytest.fixture(autouse=True)
def mock_anthropic_api_key(self):
"""Mock ANTHROPIC_API_KEY for tests."""
if "ANTHROPIC_API_KEY" not in os.environ:
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "test-key"}):
yield
else:
yield
@pytest.mark.vcr()
def test_anthropic_agent_with_native_tool_calling(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Anthropic agent can use native tool calling."""
agent = Agent(
role="Math Assistant",
goal="Help users with mathematical calculations",
backstory="You are a helpful math assistant.",
tools=[calculator_tool],
llm=LLM(model="anthropic/claude-3-5-haiku-20241022"),
verbose=False,
max_iter=3,
)
task = Task(
description="Calculate what is 15 * 8",
expected_output="The result of the calculation",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.raw is not None
def test_anthropic_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Anthropic agent kickoff with mocked LLM call."""
llm = LLM(model="anthropic/claude-3-5-haiku-20241022")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Google/Gemini Provider Tests
# =============================================================================
@pytest.mark.skipif(not HAS_GOOGLE_GENAI, reason="google-genai package not installed")
class TestGeminiNativeToolCalling:
"""Tests for native tool calling with Gemini models."""
@pytest.fixture(autouse=True)
def mock_google_api_key(self):
"""Mock GOOGLE_API_KEY for tests."""
with patch.dict(os.environ, {"GOOGLE_API_KEY": "test-key"}):
yield
@pytest.mark.vcr()
def test_gemini_agent_with_native_tool_calling(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Gemini agent can use native tool calling."""
agent = Agent(
role="Math Assistant",
goal="Help users with mathematical calculations",
backstory="You are a helpful math assistant.",
tools=[calculator_tool],
llm=LLM(model="gemini/gemini-2.0-flash-001"),
verbose=False,
max_iter=3,
)
task = Task(
description="Calculate what is 15 * 8",
expected_output="The result of the calculation",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.raw is not None
def test_gemini_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Gemini agent kickoff with mocked LLM call."""
llm = LLM(model="gemini/gemini-2.0-flash-001")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Azure Provider Tests
# =============================================================================
class TestAzureNativeToolCalling:
"""Tests for native tool calling with Azure OpenAI models."""
@pytest.fixture(autouse=True)
def mock_azure_env(self):
"""Mock Azure environment variables for tests."""
env_vars = {
"AZURE_API_KEY": "test-key",
"AZURE_API_BASE": "https://test.openai.azure.com",
"AZURE_API_VERSION": "2024-02-15-preview",
}
with patch.dict(os.environ, env_vars):
yield
def test_azure_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Azure agent kickoff with mocked LLM call."""
llm = LLM(
model="azure/gpt-4o-mini",
api_key="test-key",
base_url="https://test.openai.azure.com",
)
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Bedrock Provider Tests
# =============================================================================
@pytest.mark.skipif(not HAS_BOTO3, reason="boto3 package not installed")
class TestBedrockNativeToolCalling:
"""Tests for native tool calling with AWS Bedrock models."""
@pytest.fixture(autouse=True)
def mock_aws_env(self):
"""Mock AWS environment variables for tests."""
env_vars = {
"AWS_ACCESS_KEY_ID": "test-key",
"AWS_SECRET_ACCESS_KEY": "test-secret",
"AWS_REGION": "us-east-1",
}
with patch.dict(os.environ, env_vars):
yield
def test_bedrock_agent_kickoff_with_tools_mocked(
self, calculator_tool: CalculatorTool
) -> None:
"""Test Bedrock agent kickoff with mocked LLM call."""
llm = LLM(model="bedrock/anthropic.claude-3-haiku-20240307-v1:0")
with patch.object(llm, "call", return_value="The answer is 120.") as mock_call:
agent = Agent(
role="Math Assistant",
goal="Calculate math",
backstory="You calculate.",
tools=[calculator_tool],
llm=llm,
verbose=False,
)
task = Task(
description="Calculate 15 * 8",
expected_output="Result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert mock_call.called
assert result is not None
# =============================================================================
# Cross-Provider Native Tool Calling Behavior Tests
# =============================================================================
class TestNativeToolCallingBehavior:
"""Tests for native tool calling behavior across providers."""
def test_supports_function_calling_check(self) -> None:
"""Test that supports_function_calling() is properly checked."""
# OpenAI should support function calling
openai_llm = LLM(model="gpt-4o-mini")
assert hasattr(openai_llm, "supports_function_calling")
assert openai_llm.supports_function_calling() is True
@pytest.mark.skipif(not HAS_ANTHROPIC, reason="anthropic package not installed")
def test_anthropic_supports_function_calling(self) -> None:
"""Test that Anthropic models support function calling."""
with patch.dict(os.environ, {"ANTHROPIC_API_KEY": "test-key"}):
llm = LLM(model="anthropic/claude-3-5-haiku-20241022")
assert hasattr(llm, "supports_function_calling")
assert llm.supports_function_calling() is True
@pytest.mark.skipif(not HAS_GOOGLE_GENAI, reason="google-genai package not installed")
def test_gemini_supports_function_calling(self) -> None:
"""Test that Gemini models support function calling."""
# with patch.dict(os.environ, {"GOOGLE_API_KEY": "test-key"}):
print("GOOGLE_API_KEY", os.getenv("GOOGLE_API_KEY"))
llm = LLM(model="gemini/gemini-2.5-flash")
assert hasattr(llm, "supports_function_calling")
# Gemini uses supports_tools property
assert llm.supports_function_calling() is True
# =============================================================================
# Token Usage Tests
# =============================================================================
class TestNativeToolCallingTokenUsage:
"""Tests for token usage with native tool calling."""
@pytest.mark.vcr()
def test_openai_native_tool_calling_token_usage(
self, calculator_tool: CalculatorTool
) -> None:
"""Test token usage tracking with OpenAI native tool calling."""
agent = Agent(
role="Calculator",
goal="Perform calculations efficiently",
backstory="You calculate things.",
tools=[calculator_tool],
llm=LLM(model="gpt-4o-mini"),
verbose=False,
max_iter=3,
)
task = Task(
description="What is 100 / 4?",
expected_output="The result",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.token_usage is not None
assert result.token_usage.total_tokens > 0
assert result.token_usage.successful_requests >= 1
print(f"\n[OPENAI NATIVE TOOL CALLING TOKEN USAGE]")
print(f" Prompt tokens: {result.token_usage.prompt_tokens}")
print(f" Completion tokens: {result.token_usage.completion_tokens}")
print(f" Total tokens: {result.token_usage.total_tokens}")

View File

@@ -0,0 +1,119 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Test Agent. A helpful
test assistant\nYour personal goal is: Answer questions\nTo give my best complete
final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
What is 2+2? Reply with just the number.\n\nBegin! This is VERY important to
you, use the tools available and give your best Final Answer, your job depends
on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '673'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7b0HjL79y39EkUcMLrRhPFe3XGj\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444914,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: 4\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 136,\n \"completion_tokens\": 13,\n
\ \"total_tokens\": 149,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_8bbc38b4db\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:55 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '857'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '341'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '358'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,255 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator Agent. A math
expert\nYour personal goal is: Perform calculations\nYou ONLY have access to
the following tools, and should NEVER make up tools that are not listed here:\n\nTool
Name: calculate\nTool Arguments: {\n \"properties\": {\n \"expression\":
{\n \"title\": \"Expression\",\n \"type\": \"string\"\n }\n },\n \"required\":
[\n \"expression\"\n ],\n \"title\": \"CalculatorToolSchema\",\n \"type\":
\"object\",\n \"additionalProperties\": false\n}\nTool Description: Calculate
the result of a mathematical expression.\n\nIMPORTANT: Use the following format
in your response:\n\n```\nThought: you should always think about what to do\nAction:
the action to take, only one name of [calculate], just the name, exactly as
it''s written.\nAction Input: the input to the action, just a simple JSON object,
enclosed in curly braces, using \" to wrap keys and values.\nObservation: the
result of the action\n```\n\nOnce all necessary information is gathered, return
the following format:\n\n```\nThought: I now know the final answer\nFinal Answer:
the final answer to the original input question\n```"},{"role":"user","content":"\nCurrent
Task: Calculate 10 * 5\n\nBegin! This is VERY important to you, use the tools
available and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1403'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7avghVPSpszLmlbHpwDQlWDoD6O\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444909,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Thought: I need to calculate the expression
10 * 5.\\nAction: calculate\\nAction Input: {\\\"expression\\\":\\\"10 * 5\\\"}\\nObservation:
50\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 291,\n \"completion_tokens\": 33,\n
\ \"total_tokens\": 324,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_c4585b5b9c\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:49 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '939'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '579'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '598'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator Agent. A math
expert\nYour personal goal is: Perform calculations\nYou ONLY have access to
the following tools, and should NEVER make up tools that are not listed here:\n\nTool
Name: calculate\nTool Arguments: {\n \"properties\": {\n \"expression\":
{\n \"title\": \"Expression\",\n \"type\": \"string\"\n }\n },\n \"required\":
[\n \"expression\"\n ],\n \"title\": \"CalculatorToolSchema\",\n \"type\":
\"object\",\n \"additionalProperties\": false\n}\nTool Description: Calculate
the result of a mathematical expression.\n\nIMPORTANT: Use the following format
in your response:\n\n```\nThought: you should always think about what to do\nAction:
the action to take, only one name of [calculate], just the name, exactly as
it''s written.\nAction Input: the input to the action, just a simple JSON object,
enclosed in curly braces, using \" to wrap keys and values.\nObservation: the
result of the action\n```\n\nOnce all necessary information is gathered, return
the following format:\n\n```\nThought: I now know the final answer\nFinal Answer:
the final answer to the original input question\n```"},{"role":"user","content":"\nCurrent
Task: Calculate 10 * 5\n\nBegin! This is VERY important to you, use the tools
available and give your best Final Answer, your job depends on it!\n\nThought:"},{"role":"assistant","content":"Thought:
I need to calculate the expression 10 * 5.\nAction: calculate\nAction Input:
{\"expression\":\"10 * 5\"}\nObservation: The result of 10 * 5 is 50"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1591'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7avDhDZCLvv8v2dh8ZQRrLdci6A\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444909,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Thought: I now know the final answer.\\nFinal
Answer: 50\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 337,\n \"completion_tokens\": 14,\n
\ \"total_tokens\": 351,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_c4585b5b9c\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:50 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '864'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '429'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '457'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,119 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Async Test Agent. An async
helper\nYour personal goal is: Answer questions asynchronously\nTo give my best
complete final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
What is 3+3?\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '657'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7atOGxtc4y3oYNI62WiQ0Vogsdv\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444907,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: The sum of 3 + 3 is 6. Therefore, the outcome is that if you add three
and three together, you will arrive at the total of six.\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
131,\n \"completion_tokens\": 46,\n \"total_tokens\": 177,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:48 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '983'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '944'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1192'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,119 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Standalone Agent. A helpful
assistant\nYour personal goal is: Answer questions\nTo give my best complete
final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
What is 5+5? Reply with just the number.\n\nBegin! This is VERY important to
you, use the tools available and give your best Final Answer, your job depends
on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '674'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7azhPwUHQ0p5tdhxSAmLPoE8UgC\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444913,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: 10\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 136,\n \"completion_tokens\": 13,\n
\ \"total_tokens\": 149,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:54 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '858'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '455'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '583'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,239 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are First Agent. A friendly
greeter\nYour personal goal is: Greet users\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say
hello\n\nBegin! This is VERY important to you, use the tools available and give
your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '632'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CyRKzgODZ9yn3F9OkaXsscLk2Ln3N\",\n \"object\":
\"chat.completion\",\n \"created\": 1768520801,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: Hello! Welcome! I'm so glad to see you here. If you need any assistance
or have any questions, feel free to ask. Have a wonderful day!\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
127,\n \"completion_tokens\": 43,\n \"total_tokens\": 170,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_c4585b5b9c\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 23:46:42 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '990'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '880'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1160'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Second Agent. A polite
farewell agent\nYour personal goal is: Say goodbye\nTo give my best complete
final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
Say goodbye\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '640'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CyRL1Ua2PkK5xXPp3KeF0AnGAk3JP\",\n \"object\":
\"chat.completion\",\n \"created\": 1768520803,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: As we reach the end of our conversation, I want to express my gratitude
for the time we've shared. It's been a pleasure assisting you, and I hope
you found our interaction helpful and enjoyable. Remember, whenever you need
assistance, I'm just a message away. Wishing you all the best in your future
endeavors. Goodbye and take care!\",\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 126,\n \"completion_tokens\":
79,\n \"total_tokens\": 205,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 23:46:44 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '1189'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '1363'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1605'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

File diff suppressed because one or more lines are too long

View File

@@ -1,456 +1,528 @@
interactions:
- request:
body: '{"trace_id": "00000000-0000-0000-0000-000000000000", "execution_type": "crew", "user_identifier": null, "execution_context": {"crew_fingerprint": null, "crew_name": "Unknown Crew", "flow_name": null, "crewai_version": "1.3.0", "privacy_level": "standard"}, "execution_metadata": {"expected_duration_estimate": 300, "agent_count": 0, "task_count": 0, "flow_method_count": 0, "execution_started_at": "2025-11-05T22:19:56.074812+00:00"}}'
body: "{\"messages\":[{\"role\":\"system\",\"content\":\"You are Guardrail Agent.
You are a expert at validating the output of a task. By providing effective
feedback if the output is not valid.\\nYour personal goal is: Validate the output
of the task\\nTo give my best complete final answer to the task respond using
the exact following format:\\n\\nThought: I now can give a great answer\\nFinal
Answer: Your final answer must be the great and the most complete as possible,
it must be outcome described.\\n\\nI MUST use these formats, my job depends
on it!\"},{\"role\":\"user\",\"content\":\"\\nCurrent Task: \\n Ensure
the following task result complies with the given guardrail.\\n\\n Task
result:\\n \\n Lorem Ipsum is simply dummy text of the printing
and typesetting industry. Lorem Ipsum has been the industry's standard dummy
text ever\\n \\n\\n Guardrail:\\n Ensure the result has
less than 10 words\\n\\n Your task:\\n - Confirm if the Task result
complies with the guardrail.\\n - If not, provide clear feedback explaining
what is wrong (e.g., by how much it violates the rule, or what specific part
fails).\\n - Focus only on identifying issues \u2014 do not propose corrections.\\n
\ - If the Task result complies with the guardrail, saying that is valid\\n
\ \\n\\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\\n\\nThought:\"}],\"model\":\"gpt-4o\"}"
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate, zstd
Connection:
- keep-alive
Content-Length:
- '434'
Content-Type:
- application/json
User-Agent:
- CrewAI-CLI/1.3.0
X-Crewai-Version:
- 1.3.0
method: POST
uri: https://app.crewai.com/crewai_plus/api/v1/tracing/batches
response:
body:
string: '{"error":"bad_credentials","message":"Bad credentials"}'
headers:
Connection:
- keep-alive
Content-Length:
- '55'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 05 Nov 2025 22:19:56 GMT
cache-control:
- no-store
content-security-policy:
- 'default-src ''self'' *.app.crewai.com app.crewai.com; script-src ''self'' ''unsafe-inline'' *.app.crewai.com app.crewai.com https://cdn.jsdelivr.net/npm/apexcharts https://www.gstatic.com https://run.pstmn.io https://apis.google.com https://apis.google.com/js/api.js https://accounts.google.com https://accounts.google.com/gsi/client https://cdnjs.cloudflare.com/ajax/libs/normalize/8.0.1/normalize.min.css.map https://*.google.com https://docs.google.com https://slides.google.com https://js.hs-scripts.com https://js.sentry-cdn.com https://browser.sentry-cdn.com https://www.googletagmanager.com https://js-na1.hs-scripts.com https://js.hubspot.com http://js-na1.hs-scripts.com https://bat.bing.com https://cdn.amplitude.com https://cdn.segment.com https://d1d3n03t5zntha.cloudfront.net/ https://descriptusercontent.com https://edge.fullstory.com https://googleads.g.doubleclick.net https://js.hs-analytics.net https://js.hs-banner.com https://js.hsadspixel.net https://js.hscollectedforms.net
https://js.usemessages.com https://snap.licdn.com https://static.cloudflareinsights.com https://static.reo.dev https://www.google-analytics.com https://share.descript.com/; style-src ''self'' ''unsafe-inline'' *.app.crewai.com app.crewai.com https://cdn.jsdelivr.net/npm/apexcharts; img-src ''self'' data: *.app.crewai.com app.crewai.com https://zeus.tools.crewai.com https://dashboard.tools.crewai.com https://cdn.jsdelivr.net https://forms.hsforms.com https://track.hubspot.com https://px.ads.linkedin.com https://px4.ads.linkedin.com https://www.google.com https://www.google.com.br; font-src ''self'' data: *.app.crewai.com app.crewai.com; connect-src ''self'' *.app.crewai.com app.crewai.com https://zeus.tools.crewai.com https://connect.useparagon.com/ https://zeus.useparagon.com/* https://*.useparagon.com/* https://run.pstmn.io https://connect.tools.crewai.com/ https://*.sentry.io https://www.google-analytics.com https://edge.fullstory.com https://rs.fullstory.com https://api.hubspot.com
https://forms.hscollectedforms.net https://api.hubapi.com https://px.ads.linkedin.com https://px4.ads.linkedin.com https://google.com/pagead/form-data/16713662509 https://google.com/ccm/form-data/16713662509 https://www.google.com/ccm/collect https://worker-actionkit.tools.crewai.com https://api.reo.dev; frame-src ''self'' *.app.crewai.com app.crewai.com https://connect.useparagon.com/ https://zeus.tools.crewai.com https://zeus.useparagon.com/* https://connect.tools.crewai.com/ https://docs.google.com https://drive.google.com https://slides.google.com https://accounts.google.com https://*.google.com https://app.hubspot.com/ https://td.doubleclick.net https://www.googletagmanager.com/ https://www.youtube.com https://share.descript.com'
expires:
- '0'
permissions-policy:
- camera=(), microphone=(self), geolocation=()
pragma:
- no-cache
referrer-policy:
- strict-origin-when-cross-origin
strict-transport-security:
- max-age=63072000; includeSubDomains
vary:
- Accept
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
x-permitted-cross-domain-policies:
- none
x-request-id:
- 230c6cb5-92c7-448d-8c94-e5548a9f4259
x-runtime:
- '0.073220'
x-xss-protection:
- 1; mode=block
status:
code: 401
message: Unauthorized
- request:
body: '{"messages":[{"role":"system","content":"You are Guardrail Agent. You are a expert at validating the output of a task. By providing effective feedback if the output is not valid.\nYour personal goal is: Validate the output of the task\n\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\":
[\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\": false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"\n Ensure the following task result complies with the given guardrail.\n\n Task result:\n \n Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry''s standard dummy text ever\n \n\n Guardrail:\n Ensure
the result has less than 10 words\n\n Your task:\n - Confirm if the Task result complies with the guardrail.\n - If not, provide clear feedback explaining what is wrong (e.g., by how much it violates the rule, or what specific part fails).\n - Focus only on identifying issues — do not propose corrections.\n - If the Task result complies with the guardrail, saying that is valid\n "}],"model":"gpt-4o"}'
headers:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '2452'
- '1467'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- arm64
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.109.1
- 1.83.0
x-stainless-read-timeout:
- '600'
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CYg96Riy2RJRxnBHvoROukymP9wvs\",\n \"object\": \"chat.completion\",\n \"created\": 1762381196,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"Thought: I need to check if the task result meets the requirement of having less than 10 words.\\n\\nFinal Answer: {\\n \\\"valid\\\": false,\\n \\\"feedback\\\": \\\"The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words.\\\"\\n}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 489,\n \"completion_tokens\": 61,\n \"total_tokens\": 550,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\"\
: 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_cbf1785567\"\n}\n"
string: "{\n \"id\": \"chatcmpl-Cy7yHRYTZi8yzRbcODnKr92keLKCb\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446357,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"The task result provided has more than
10 words. I will count the words to verify this.\\n\\nThe task result is the
following text:\\n\\\"Lorem Ipsum is simply dummy text of the printing and
typesetting industry. Lorem Ipsum has been the industry's standard dummy text
ever\\\"\\n\\nCounting the words:\\n\\n1. Lorem \\n2. Ipsum \\n3. is \\n4.
simply \\n5. dummy \\n6. text \\n7. of \\n8. the \\n9. printing \\n10. and
\\n11. typesetting \\n12. industry. \\n13. Lorem \\n14. Ipsum \\n15. has \\n16.
been \\n17. the \\n18. industry's \\n19. standard \\n20. dummy \\n21. text
\\n22. ever\\n\\nThe total word count is 22.\\n\\nThought: I now can give
a great answer\\nFinal Answer: The task result does not comply with the guardrail.
It contains 22 words, which exceeds the limit of 10 words.\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
285,\n \"completion_tokens\": 195,\n \"total_tokens\": 480,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_deacdd5f6f\"\n}\n"
headers:
CF-RAY:
- REDACTED-RAY
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Wed, 05 Nov 2025 22:19:58 GMT
- Thu, 15 Jan 2026 03:05:59 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=REDACTED; path=/; expires=Wed, 05-Nov-25 22:49:58 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
- _cfuvid=REDACTED; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
- SET-COOKIE-XXX
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- X-Request-ID
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '1557'
openai-organization:
- user-hortuttj2f3qtmxyik2zxf4q
- OPENAI-ORG-XXX
openai-processing-ms:
- '2201'
- '2130'
openai-project:
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '2401'
- '2147'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- '500'
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- '30000'
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- '499'
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- '29439'
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- 120ms
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- 1.122s
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- req_REDACTED
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"{\n \"valid\": false,\n \"feedback\": \"The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words.\"\n}"}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly
adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\":
{\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\":
{\n \"properties\": {\n \"valid\": {\n \"description\":
\"Whether the task output complies with the guardrail\",\n \"title\":
\"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\":
{\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\":
\"null\"\n }\n ],\n \"default\": null,\n \"description\":
\"A feedback about the task output if it is not valid\",\n \"title\":
\"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\":
\"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output.
Ensure the final output does not include any code block markers like ```json
or ```python."},{"role":"user","content":"The task result does not comply with
the guardrail. It contains 22 words, which exceeds the limit of 10 words."}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether
the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A
feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1884'
- '1835'
content-type:
- application/json
cookie:
- __cf_bm=REDACTED; _cfuvid=REDACTED
- COOKIE-XXX
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- arm64
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-helper-method:
- chat.completions.parse
- beta.chat.completions.parse
x-stainless-lang:
- python
x-stainless-os:
- MacOS
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.109.1
- 1.83.0
x-stainless-read-timeout:
- '600'
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CYg98QlZ8NTrQ69676MpXXyCoZJT8\",\n \"object\": \"chat.completion\",\n \"created\": 1762381198,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"{\\\"valid\\\":false,\\\"feedback\\\":\\\"The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words.\\\"}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 374,\n \"completion_tokens\": 32,\n \"total_tokens\": 406,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n\
\ \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_cbf1785567\"\n}\n"
string: "{\n \"id\": \"chatcmpl-Cy7yJiPCk4fXuogyT5e8XeGRLCSf8\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446359,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"{\\\"valid\\\":false,\\\"feedback\\\":\\\"The
task output exceeds the word limit of 10 words by containing 22 words.\\\"}\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
363,\n \"completion_tokens\": 25,\n \"total_tokens\": 388,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_a0e9480a2f\"\n}\n"
headers:
CF-RAY:
- REDACTED-RAY
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Wed, 05 Nov 2025 22:19:59 GMT
- Thu, 15 Jan 2026 03:05:59 GMT
Server:
- cloudflare
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- X-Request-ID
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '913'
openai-organization:
- user-hortuttj2f3qtmxyik2zxf4q
- OPENAI-ORG-XXX
openai-processing-ms:
- '419'
- '488'
openai-project:
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '432'
- '507'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- '500'
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- '30000'
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- '499'
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- '29702'
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- 120ms
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- 596ms
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- req_REDACTED
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Guardrail Agent. You are a expert at validating the output of a task. By providing effective feedback if the output is not valid.\nYour personal goal is: Validate the output of the task\n\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\":
[\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\": false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"\n Ensure the following task result complies with the given guardrail.\n\n Task result:\n \n Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry''s standard dummy text ever\n \n\n Guardrail:\n Ensure
the result has less than 500 words\n\n Your task:\n - Confirm if the Task result complies with the guardrail.\n - If not, provide clear feedback explaining what is wrong (e.g., by how much it violates the rule, or what specific part fails).\n - Focus only on identifying issues — do not propose corrections.\n - If the Task result complies with the guardrail, saying that is valid\n "}],"model":"gpt-4o"}'
body: "{\"messages\":[{\"role\":\"system\",\"content\":\"You are Guardrail Agent.
You are a expert at validating the output of a task. By providing effective
feedback if the output is not valid.\\nYour personal goal is: Validate the output
of the task\\nTo give my best complete final answer to the task respond using
the exact following format:\\n\\nThought: I now can give a great answer\\nFinal
Answer: Your final answer must be the great and the most complete as possible,
it must be outcome described.\\n\\nI MUST use these formats, my job depends
on it!\"},{\"role\":\"user\",\"content\":\"\\nCurrent Task: \\n Ensure
the following task result complies with the given guardrail.\\n\\n Task
result:\\n \\n Lorem Ipsum is simply dummy text of the printing
and typesetting industry. Lorem Ipsum has been the industry's standard dummy
text ever\\n \\n\\n Guardrail:\\n Ensure the result has
less than 500 words\\n\\n Your task:\\n - Confirm if the Task
result complies with the guardrail.\\n - If not, provide clear feedback
explaining what is wrong (e.g., by how much it violates the rule, or what specific
part fails).\\n - Focus only on identifying issues \u2014 do not propose
corrections.\\n - If the Task result complies with the guardrail, saying
that is valid\\n \\n\\nBegin! This is VERY important to you, use the
tools available and give your best Final Answer, your job depends on it!\\n\\nThought:\"}],\"model\":\"gpt-4o\"}"
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '2453'
- '1468'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- arm64
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.109.1
- 1.83.0
x-stainless-read-timeout:
- '600'
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CYgBMV6fu7EvV2BqzMdJaKyLAg1WW\",\n \"object\": \"chat.completion\",\n \"created\": 1762381336,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"Thought: I now can give a great answer\\nFinal Answer: {\\\"valid\\\": true, \\\"feedback\\\": null}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 489,\n \"completion_tokens\": 23,\n \"total_tokens\": 512,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\"\
: \"fp_cbf1785567\"\n}\n"
string: "{\n \"id\": \"chatcmpl-Cy7yKa0rmi2YoTLpyXt9hjeLt2rTI\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446360,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"First, I'll count the number of words
in the Task result to ensure it complies with the guardrail. \\n\\nThe Task
result is: \\\"Lorem Ipsum is simply dummy text of the printing and typesetting
industry. Lorem Ipsum has been the industry's standard dummy text ever.\\\"\\n\\nBy
counting the words: \\n1. Lorem\\n2. Ipsum\\n3. is\\n4. simply\\n5. dummy\\n6.
text\\n7. of\\n8. the\\n9. printing\\n10. and\\n11. typesetting\\n12. industry\\n13.
Lorem\\n14. Ipsum\\n15. has\\n16. been\\n17. the\\n18. industry's\\n19. standard\\n20.
dummy\\n21. text\\n22. ever\\n\\nThere are 22 words total in the Task result.\\n\\nI
need to verify if the count of 22 words is less than the guardrail limit of
500 words.\\n\\nThought: I now can give a great answer\\nFinal Answer: The
Task result complies with the guardrail as it contains 22 words, which is
less than the 500-word limit. Therefore, the output is valid.\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
285,\n \"completion_tokens\": 227,\n \"total_tokens\": 512,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_deacdd5f6f\"\n}\n"
headers:
CF-RAY:
- REDACTED-RAY
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Wed, 05 Nov 2025 22:22:16 GMT
- Thu, 15 Jan 2026 03:06:02 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=REDACTED; path=/; expires=Wed, 05-Nov-25 22:52:16 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
- _cfuvid=REDACTED; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
- SET-COOKIE-XXX
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- X-Request-ID
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '1668'
openai-organization:
- user-hortuttj2f3qtmxyik2zxf4q
- OPENAI-ORG-XXX
openai-processing-ms:
- '327'
- '2502'
openai-project:
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '372'
- '2522'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- '500'
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- '30000'
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- '499'
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- '29438'
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- 120ms
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- 1.124s
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- req_REDACTED
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"{\"valid\": true, \"feedback\": null}"}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly
adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\":
{\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\":
{\n \"properties\": {\n \"valid\": {\n \"description\":
\"Whether the task output complies with the guardrail\",\n \"title\":
\"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\":
{\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\":
\"null\"\n }\n ],\n \"default\": null,\n \"description\":
\"A feedback about the task output if it is not valid\",\n \"title\":
\"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\":
\"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output.
Ensure the final output does not include any code block markers like ```json
or ```python."},{"role":"user","content":"The Task result complies with the
guardrail as it contains 22 words, which is less than the 500-word limit. Therefore,
the output is valid."}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether
the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A
feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1762'
- '1864'
content-type:
- application/json
cookie:
- __cf_bm=REDACTED; _cfuvid=REDACTED
- COOKIE-XXX
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- arm64
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-helper-method:
- chat.completions.parse
- beta.chat.completions.parse
x-stainless-lang:
- python
x-stainless-os:
- MacOS
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.109.1
- 1.83.0
x-stainless-read-timeout:
- '600'
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CYgBMU20R45qGGaLN6vNAmW1NR4R6\",\n \"object\": \"chat.completion\",\n \"created\": 1762381336,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"{\\\"valid\\\":true,\\\"feedback\\\":null}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 347,\n \"completion_tokens\": 9,\n \"total_tokens\": 356,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_cbf1785567\"\n}\n"
string: "{\n \"id\": \"chatcmpl-Cy7yMAjNYSCz2foZPEcSVCuapzF8y\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446362,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"{\\\"valid\\\":true,\\\"feedback\\\":null}\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
369,\n \"completion_tokens\": 9,\n \"total_tokens\": 378,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_a0e9480a2f\"\n}\n"
headers:
CF-RAY:
- REDACTED-RAY
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Wed, 05 Nov 2025 22:22:17 GMT
- Thu, 15 Jan 2026 03:06:03 GMT
Server:
- cloudflare
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- X-Request-ID
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '837'
openai-organization:
- user-hortuttj2f3qtmxyik2zxf4q
- OPENAI-ORG-XXX
openai-processing-ms:
- '1081'
- '413'
openai-project:
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1241'
- '650'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- '500'
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- '30000'
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- '499'
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- '29478'
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- 120ms
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- 1.042s
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- req_REDACTED
- X-REQUEST-ID-XXX
status:
code: 200
message: OK

View File

@@ -1202,8 +1202,9 @@ def test_complex_and_or_branching():
)
assert execution_order.index("branch_2b") > min_branch_1_index
# Final should be last and after both 2a and 2b
assert execution_order[-1] == "final"
# Final should be after both 2a and 2b
# Note: final may not be absolutely last due to independent branches (like branch_1c)
# that don't contribute to the final result path with sequential listener execution
assert execution_order.index("final") > execution_order.index("branch_2a")
assert execution_order.index("final") > execution_order.index("branch_2b")

View File

@@ -185,8 +185,8 @@ def test_task_guardrail_process_output(task_output):
result = guardrail(task_output)
assert result[0] is False
assert result[1] == "The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words."
# Check that feedback is provided (wording varies by LLM)
assert result[1] and len(result[1]) > 0
guardrail = LLMGuardrail(
description="Ensure the result has less than 500 words", llm=LLM(model="gpt-4o")

View File

@@ -1,214 +0,0 @@
"""Tests for agent utility functions."""
from __future__ import annotations
from typing import Any
import pytest
from pydantic import BaseModel, Field
from crewai.tools.base_tool import BaseTool
from crewai.utilities.agent_utils import convert_tools_to_openai_schema
class CalculatorInput(BaseModel):
"""Input schema for calculator tool."""
expression: str = Field(description="Mathematical expression to evaluate")
class CalculatorTool(BaseTool):
"""A simple calculator tool for testing."""
name: str = "calculator"
description: str = "Perform mathematical calculations"
args_schema: type[BaseModel] = CalculatorInput
def _run(self, expression: str) -> str:
"""Execute the calculation."""
try:
result = eval(expression) # noqa: S307
return str(result)
except Exception as e:
return f"Error: {e}"
class SearchInput(BaseModel):
"""Input schema for search tool."""
query: str = Field(description="Search query")
max_results: int = Field(default=10, description="Maximum number of results")
class SearchTool(BaseTool):
"""A search tool for testing."""
name: str = "web_search"
description: str = "Search the web for information"
args_schema: type[BaseModel] = SearchInput
def _run(self, query: str, max_results: int = 10) -> str:
"""Execute the search."""
return f"Search results for '{query}' (max {max_results})"
class NoSchemaTool(BaseTool):
"""A tool without an args schema for testing edge cases."""
name: str = "simple_tool"
description: str = "A simple tool with no schema"
def _run(self, **kwargs: Any) -> str:
"""Execute the tool."""
return "Simple tool executed"
class TestConvertToolsToOpenaiSchema:
"""Tests for convert_tools_to_openai_schema function."""
def test_converts_single_tool(self) -> None:
"""Test converting a single tool to OpenAI schema."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert len(schemas) == 1
assert len(functions) == 1
schema = schemas[0]
assert schema["type"] == "function"
assert schema["function"]["name"] == "calculator"
assert schema["function"]["description"] == "Perform mathematical calculations"
assert "properties" in schema["function"]["parameters"]
assert "expression" in schema["function"]["parameters"]["properties"]
def test_converts_multiple_tools(self) -> None:
"""Test converting multiple tools to OpenAI schema."""
tools = [CalculatorTool(), SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert len(schemas) == 2
assert len(functions) == 2
# Check calculator
calc_schema = next(s for s in schemas if s["function"]["name"] == "calculator")
assert calc_schema["function"]["description"] == "Perform mathematical calculations"
# Check search
search_schema = next(s for s in schemas if s["function"]["name"] == "web_search")
assert search_schema["function"]["description"] == "Search the web for information"
assert "query" in search_schema["function"]["parameters"]["properties"]
assert "max_results" in search_schema["function"]["parameters"]["properties"]
def test_functions_dict_contains_callables(self) -> None:
"""Test that the functions dict maps names to callable run methods."""
tools = [CalculatorTool(), SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert "calculator" in functions
assert "web_search" in functions
assert callable(functions["calculator"])
assert callable(functions["web_search"])
def test_function_can_be_called(self) -> None:
"""Test that the returned function can be called."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
result = functions["calculator"](expression="2 + 2")
assert result == "4"
def test_empty_tools_list(self) -> None:
"""Test with an empty tools list."""
schemas, functions = convert_tools_to_openai_schema([])
assert schemas == []
assert functions == {}
def test_schema_has_required_fields(self) -> None:
"""Test that the schema includes required fields information."""
tools = [SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
schema = schemas[0]
params = schema["function"]["parameters"]
# Should have required array
assert "required" in params
assert "query" in params["required"]
def test_tool_without_args_schema(self) -> None:
"""Test converting a tool that doesn't have an args_schema."""
# Create a minimal tool without args_schema
class MinimalTool(BaseTool):
name: str = "minimal"
description: str = "A minimal tool"
def _run(self) -> str:
return "done"
tools = [MinimalTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
assert len(schemas) == 1
schema = schemas[0]
assert schema["function"]["name"] == "minimal"
# Parameters should be empty dict or have minimal schema
assert isinstance(schema["function"]["parameters"], dict)
def test_schema_structure_matches_openai_format(self) -> None:
"""Test that the schema structure matches OpenAI's expected format."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
schema = schemas[0]
# Top level must have "type": "function"
assert schema["type"] == "function"
# Must have "function" key with nested structure
assert "function" in schema
func = schema["function"]
# Function must have name and description
assert "name" in func
assert "description" in func
assert isinstance(func["name"], str)
assert isinstance(func["description"], str)
# Parameters should be a valid JSON schema
assert "parameters" in func
params = func["parameters"]
assert isinstance(params, dict)
def test_removes_redundant_schema_fields(self) -> None:
"""Test that redundant title and description are removed from parameters."""
tools = [CalculatorTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
params = schemas[0]["function"]["parameters"]
# Title should be removed as it's redundant with function name
assert "title" not in params
def test_preserves_field_descriptions(self) -> None:
"""Test that field descriptions are preserved in the schema."""
tools = [SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
params = schemas[0]["function"]["parameters"]
query_prop = params["properties"]["query"]
# Field description should be preserved
assert "description" in query_prop
assert query_prop["description"] == "Search query"
def test_preserves_default_values(self) -> None:
"""Test that default values are preserved in the schema."""
tools = [SearchTool()]
schemas, functions = convert_tools_to_openai_schema(tools)
params = schemas[0]["function"]["parameters"]
max_results_prop = params["properties"]["max_results"]
# Default value should be preserved
assert "default" in max_results_prop
assert max_results_prop["default"] == 10

View File

@@ -348,11 +348,11 @@ def test_agent_emits_execution_error_event(base_agent, base_task):
error_message = "Error happening while sending prompt to model."
base_agent.max_retry_limit = 0
with patch.object(
CrewAgentExecutor, "invoke", wraps=base_agent.agent_executor.invoke
) as invoke_mock:
invoke_mock.side_effect = Exception(error_message)
# Patch at the class level since agent_executor is created lazily
with patch.object(
CrewAgentExecutor, "invoke", side_effect=Exception(error_message)
):
with pytest.raises(Exception): # noqa: B017
base_agent.execute_task(
task=base_task,