Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
2266980274 Enhance BaseLLM documentation and add model parameter validation
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-19 14:53:18 +00:00
Devin AI
8c4f6e3db9 Fix documentation for BaseLLM to clarify model parameter requirement
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-05-19 14:34:13 +00:00
35 changed files with 168 additions and 1617 deletions

View File

@@ -21,7 +21,7 @@ Think of an agent as a specialized team member with specific skills, expertise,
<Note type="info" title="Enterprise Enhancement: Visual Agent Builder">
CrewAI Enterprise includes a Visual Agent Builder that simplifies agent creation and configuration without writing code. Design your agents visually and test them in real-time.
![Visual Agent Builder Screenshot](/images/enterprise/crew-studio-interface.png)
![Visual Agent Builder Screenshot](../images/enterprise/crew-studio-quickstart)
The Visual Agent Builder enables:
- Intuitive agent configuration with form-based interfaces

View File

@@ -4,7 +4,7 @@ description: Learn how to use the CrewAI CLI to interact with CrewAI.
icon: terminal
---
## Overview
# CrewAI CLI Documentation
The CrewAI CLI provides a set of commands to interact with CrewAI, allowing you to create, train, run, and manage crews & flows.

View File

@@ -4,7 +4,7 @@ description: Exploring the dynamics of agent collaboration within the CrewAI fra
icon: screen-users
---
## Overview
## Collaboration Fundamentals
Collaboration in CrewAI is fundamental, enabling agents to combine their skills, share information, and assist each other in task execution, embodying a truly cooperative ecosystem.

View File

@@ -4,7 +4,7 @@ description: Understanding and utilizing crews in the crewAI framework with comp
icon: people-group
---
## Overview
## What is a Crew?
A crew in crewAI represents a collaborative group of agents working together to achieve a set of tasks. Each crew defines the strategy for task execution, agent collaboration, and the overall workflow.

View File

@@ -4,7 +4,7 @@ description: 'Tap into CrewAI events to build custom integrations and monitoring
icon: spinner
---
## Overview
# Event Listeners
CrewAI provides a powerful event system that allows you to listen for and react to various events that occur during the execution of your Crew. This feature enables you to build custom integrations, monitoring solutions, logging systems, or any other functionality that needs to be triggered based on CrewAI's internal events.
@@ -21,7 +21,7 @@ When specific actions occur in CrewAI (like a Crew starting execution, an Agent
<Note type="info" title="Enterprise Enhancement: Prompt Tracing">
CrewAI Enterprise provides a built-in Prompt Tracing feature that leverages the event system to track, store, and visualize all prompts, completions, and associated metadata. This provides powerful debugging capabilities and transparency into your agent operations.
![Prompt Tracing Dashboard](/images/enterprise/traces-overview.png)
![Prompt Tracing Dashboard](../images/enterprise/prompt-tracing.png)
With Prompt Tracing you can:
- View the complete history of all prompts sent to your LLM
@@ -224,15 +224,6 @@ CrewAI provides a wide range of events that you can listen for:
- **ToolExecutionErrorEvent**: Emitted when a tool execution encounters an error
- **ToolSelectionErrorEvent**: Emitted when there's an error selecting a tool
### Knowledge Events
- **KnowledgeRetrievalStartedEvent**: Emitted when a knowledge retrieval is started
- **KnowledgeRetrievalCompletedEvent**: Emitted when a knowledge retrieval is completed
- **KnowledgeQueryStartedEvent**: Emitted when a knowledge query is started
- **KnowledgeQueryCompletedEvent**: Emitted when a knowledge query is completed
- **KnowledgeQueryFailedEvent**: Emitted when a knowledge query fails
- **KnowledgeSearchQueryFailedEvent**: Emitted when a knowledge search query fails
### Flow Events
- **FlowCreatedEvent**: Emitted when a Flow is created

View File

@@ -4,7 +4,7 @@ description: Learn how to create and manage AI workflows using CrewAI Flows.
icon: arrow-progress
---
## Overview
## Introduction
CrewAI Flows is a powerful feature designed to streamline the creation and management of AI workflows. Flows allow developers to combine and coordinate coding tasks and Crews efficiently, providing a robust framework for building sophisticated AI automations.

View File

@@ -4,7 +4,7 @@ description: What is knowledge in CrewAI and how to use it.
icon: book
---
## Overview
## What is Knowledge?
Knowledge in CrewAI is a powerful system that allows AI agents to access and utilize external information sources during their tasks.
Think of it as giving your agents a reference library they can consult while working.
@@ -36,15 +36,12 @@ CrewAI supports various types of knowledge sources out of the box:
## Supported Knowledge Parameters
<ParamField body="sources" type="List[BaseKnowledgeSource]" required="Yes">
List of knowledge sources that provide content to be stored and queried. Can include PDF, CSV, Excel, JSON, text files, or string content.
</ParamField>
<ParamField body="collection_name" type="str">
Name of the collection where the knowledge will be stored. Used to identify different sets of knowledge. Defaults to \"knowledge\" if not provided.
</ParamField>
<ParamField body="storage" type="Optional[KnowledgeStorage]">
Custom storage configuration for managing how the knowledge is stored and retrieved. If not provided, a default storage will be created.
</ParamField>
| Parameter | Type | Required | Description |
| :--------------------------- | :---------------------------------- | :------- | :---------------------------------------------------------------------------------------------------------------------------------------------------- |
| `sources` | **List[BaseKnowledgeSource]** | Yes | List of knowledge sources that provide content to be stored and queried. Can include PDF, CSV, Excel, JSON, text files, or string content. |
| `collection_name` | **str** | No | Name of the collection where the knowledge will be stored. Used to identify different sets of knowledge. Defaults to "knowledge" if not provided. |
| `storage` | **Optional[KnowledgeStorage]** | No | Custom storage configuration for managing how the knowledge is stored and retrieved. If not provided, a default storage will be created. |
<Tip>
Unlike retrieval from a vector database using a tool, agents preloaded with knowledge will not need a retrieval persona or task.
@@ -435,46 +432,6 @@ Query rewriting happens transparently using a system prompt that instructs the L
This mechanism is fully automatic and requires no configuration from users. The agent's LLM is used to perform the query rewriting, so using a more capable LLM can improve the quality of rewritten queries.
</Tip>
## Knowledge Events
CrewAI emits events during the knowledge retrieval process that you can listen for using the event system. These events allow you to monitor, debug, and analyze how knowledge is being retrieved and used by your agents.
### Available Knowledge Events
- **KnowledgeRetrievalStartedEvent**: Emitted when an agent starts retrieving knowledge from sources
- **KnowledgeRetrievalCompletedEvent**: Emitted when knowledge retrieval is completed, including the query used and the retrieved content
- **KnowledgeQueryStartedEvent**: Emitted when a query to knowledge sources begins
- **KnowledgeQueryCompletedEvent**: Emitted when a query completes successfully
- **KnowledgeQueryFailedEvent**: Emitted when a query to knowledge sources fails
- **KnowledgeSearchQueryFailedEvent**: Emitted when a search query fails
### Example: Monitoring Knowledge Retrieval
```python
from crewai.utilities.events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
class KnowledgeMonitorListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(KnowledgeRetrievalStartedEvent)
def on_knowledge_retrieval_started(source, event):
print(f"Agent '{event.agent.role}' started retrieving knowledge")
@crewai_event_bus.on(KnowledgeRetrievalCompletedEvent)
def on_knowledge_retrieval_completed(source, event):
print(f"Agent '{event.agent.role}' completed knowledge retrieval")
print(f"Query: {event.query}")
print(f"Retrieved {len(event.retrieved_knowledge)} knowledge chunks")
# Create an instance of your listener
knowledge_monitor = KnowledgeMonitorListener()
```
For more information on using events, see the [Event Listeners](https://docs.crewai.com/concepts/event-listener) documentation.
### Example
```python

View File

@@ -4,10 +4,9 @@ description: 'A comprehensive guide to configuring and using Large Language Mode
icon: 'microchip-ai'
---
## Overview
CrewAI integrates with multiple LLM providers through LiteLLM, giving you the flexibility to choose the right model for your specific use case. This guide will help you understand how to configure and use different LLM providers in your CrewAI projects.
<Note>
CrewAI integrates with multiple LLM providers through LiteLLM, giving you the flexibility to choose the right model for your specific use case. This guide will help you understand how to configure and use different LLM providers in your CrewAI projects.
</Note>
## What are LLMs?

View File

@@ -4,7 +4,7 @@ description: Leveraging memory systems in the CrewAI framework to enhance agent
icon: database
---
## Overview
## Introduction to Memory Systems in CrewAI
The crewAI framework introduces a sophisticated memory system designed to significantly enhance the capabilities of AI agents.
This system comprises `short-term memory`, `long-term memory`, `entity memory`, and `contextual memory`, each serving a unique purpose in aiding agents to remember,

View File

@@ -1,10 +1,10 @@
---
title: Planning
description: Learn how to add planning to your CrewAI Crew and improve their performance.
icon: ruler-combined
icon: brain
---
## Overview
## Introduction
The planning feature in CrewAI allows you to add planning capability to your crew. When enabled, before each Crew iteration,
all Crew information is sent to an AgentPlanner that will plan the tasks step by step, and this plan will be added to each task description.

View File

@@ -4,8 +4,7 @@ description: Detailed guide on workflow management through processes in CrewAI,
icon: bars-staggered
---
## Overview
## Understanding Processes
<Tip>
Processes orchestrate the execution of tasks by agents, akin to project management in human teams.
These processes ensure tasks are distributed and executed efficiently, in alignment with a predefined strategy.

View File

@@ -1,147 +0,0 @@
---
title: Reasoning
description: "Learn how to enable and use agent reasoning to improve task execution."
icon: brain
---
## Overview
Agent reasoning is a feature that allows agents to reflect on a task and create a plan before execution. This helps agents approach tasks more methodically and ensures they're ready to perform the assigned work.
## Usage
To enable reasoning for an agent, simply set `reasoning=True` when creating the agent:
```python
from crewai import Agent
agent = Agent(
role="Data Analyst",
goal="Analyze complex datasets and provide insights",
backstory="You are an experienced data analyst with expertise in finding patterns in complex data.",
reasoning=True, # Enable reasoning
max_reasoning_attempts=3 # Optional: Set a maximum number of reasoning attempts
)
```
## How It Works
When reasoning is enabled, before executing a task, the agent will:
1. Reflect on the task and create a detailed plan
2. Evaluate whether it's ready to execute the task
3. Refine the plan as necessary until it's ready or max_reasoning_attempts is reached
4. Inject the reasoning plan into the task description before execution
This process helps the agent break down complex tasks into manageable steps and identify potential challenges before starting.
## Configuration Options
<ParamField body="reasoning" type="bool" default="False">
Enable or disable reasoning
</ParamField>
<ParamField body="max_reasoning_attempts" type="int" default="None">
Maximum number of attempts to refine the plan before proceeding with execution. If None (default), the agent will continue refining until it's ready.
</ParamField>
## Example
Here's a complete example:
```python
from crewai import Agent, Task, Crew
# Create an agent with reasoning enabled
analyst = Agent(
role="Data Analyst",
goal="Analyze data and provide insights",
backstory="You are an expert data analyst.",
reasoning=True,
max_reasoning_attempts=3 # Optional: Set a limit on reasoning attempts
)
# Create a task
analysis_task = Task(
description="Analyze the provided sales data and identify key trends.",
expected_output="A report highlighting the top 3 sales trends.",
agent=analyst
)
# Create a crew and run the task
crew = Crew(agents=[analyst], tasks=[analysis_task])
result = crew.kickoff()
print(result)
```
## Error Handling
The reasoning process is designed to be robust, with error handling built in. If an error occurs during reasoning, the agent will proceed with executing the task without the reasoning plan. This ensures that tasks can still be executed even if the reasoning process fails.
Here's how to handle potential errors in your code:
```python
from crewai import Agent, Task
import logging
# Set up logging to capture any reasoning errors
logging.basicConfig(level=logging.INFO)
# Create an agent with reasoning enabled
agent = Agent(
role="Data Analyst",
goal="Analyze data and provide insights",
reasoning=True,
max_reasoning_attempts=3
)
# Create a task
task = Task(
description="Analyze the provided sales data and identify key trends.",
expected_output="A report highlighting the top 3 sales trends.",
agent=agent
)
# Execute the task
# If an error occurs during reasoning, it will be logged and execution will continue
result = agent.execute_task(task)
```
## Example Reasoning Output
Here's an example of what a reasoning plan might look like for a data analysis task:
```
Task: Analyze the provided sales data and identify key trends.
Reasoning Plan:
I'll analyze the sales data to identify the top 3 trends.
1. Understanding of the task:
I need to analyze sales data to identify key trends that would be valuable for business decision-making.
2. Key steps I'll take:
- First, I'll examine the data structure to understand what fields are available
- Then I'll perform exploratory data analysis to identify patterns
- Next, I'll analyze sales by time periods to identify temporal trends
- I'll also analyze sales by product categories and customer segments
- Finally, I'll identify the top 3 most significant trends
3. Approach to challenges:
- If the data has missing values, I'll decide whether to fill or filter them
- If the data has outliers, I'll investigate whether they're valid data points or errors
- If trends aren't immediately obvious, I'll apply statistical methods to uncover patterns
4. Use of available tools:
- I'll use data analysis tools to explore and visualize the data
- I'll use statistical tools to identify significant patterns
- I'll use knowledge retrieval to access relevant information about sales analysis
5. Expected outcome:
A concise report highlighting the top 3 sales trends with supporting evidence from the data.
READY: I am ready to execute the task.
```
This reasoning plan helps the agent organize its approach to the task, consider potential challenges, and ensure it delivers the expected output.

View File

@@ -4,7 +4,7 @@ description: Detailed guide on managing and creating tasks within the CrewAI fra
icon: list-check
---
## Overview
## Overview of a Task
In the CrewAI framework, a `Task` is a specific assignment completed by an `Agent`.
@@ -15,7 +15,7 @@ Tasks within CrewAI can be collaborative, requiring multiple agents to work toge
<Note type="info" title="Enterprise Enhancement: Visual Task Builder">
CrewAI Enterprise includes a Visual Task Builder in Crew Studio that simplifies complex task creation and chaining. Design your task flows visually and test them in real-time without writing code.
![Task Builder Screenshot](/images/enterprise/crew-studio-interface.png)
![Task Builder Screenshot](../images/enterprise/crew-studio-quickstart.png)
The Visual Task Builder enables:
- Drag-and-drop task creation

View File

@@ -4,7 +4,7 @@ description: Learn how to test your CrewAI Crew and evaluate their performance.
icon: vial
---
## Overview
## Introduction
Testing is a crucial part of the development process, and it is essential to ensure that your crew is performing as expected. With crewAI, you can easily test your crew and evaluate its performance using the built-in testing capabilities.

View File

@@ -4,7 +4,7 @@ description: Understanding and leveraging tools within the CrewAI framework for
icon: screwdriver-wrench
---
## Overview
## Introduction
CrewAI tools empower agents with capabilities ranging from web searching and data analysis to collaboration and delegating tasks among coworkers.
This documentation outlines how to create, integrate, and leverage these tools within the CrewAI framework, including a new focus on collaboration tools.
@@ -18,6 +18,8 @@ enabling everything from simple searches to complex interactions and effective t
<Note type="info" title="Enterprise Enhancement: Tools Repository">
CrewAI Enterprise provides a comprehensive Tools Repository with pre-built integrations for common business systems and APIs. Deploy agents with enterprise tools in minutes instead of days.
![Tools Repository Screenshot](../images/enterprise/tools-repository.png)
The Enterprise Tools Repository includes:
- Pre-built connectors for popular enterprise systems
- Custom tool creation interface

View File

@@ -4,7 +4,7 @@ description: Learn how to train your CrewAI agents by giving them feedback early
icon: dumbbell
---
## Overview
## Introduction
The training feature in CrewAI allows you to train your AI agents using the command-line interface (CLI).
By running the command `crewai train -n <n_iterations>`, you can specify the number of iterations for the training process.

View File

@@ -74,7 +74,6 @@
"concepts/collaboration",
"concepts/training",
"concepts/memory",
"concepts/reasoning",
"concepts/planning",
"concepts/testing",
"concepts/cli",
@@ -140,12 +139,6 @@
"tools/youtubevideosearchtool"
]
},
{
"group": "MCP Integration",
"pages": [
"mcp/crewai-mcp-integration"
]
},
{
"group": "Agent Monitoring & Observability",
"pages": [

View File

@@ -64,14 +64,14 @@ CrewAI Enterprise extends the power of the open-source framework with features d
Sign Up
</Card>
</Step>
<Step title="Build your first crew">
Use code or Crew Studio to build your crew
<Step title="Create your first crew">
Use code or Crew Studio to create your crew
<Card
title="Build Crew"
title="Create Crew"
icon="paintbrush"
href="/enterprise/guides/build-crew"
href="/enterprise/guides/create-crew"
>
Build Crew
Create Crew
</Card>
</Step>
<Step title="Deploy your crew">

View File

@@ -16,6 +16,16 @@ To create a custom LLM implementation, you need to:
- `supports_function_calling()`: Whether the LLM supports function calling
- `supports_stop_words()`: Whether the LLM supports stop words
- `get_context_window_size()`: The context window size of the LLM
3. Ensure you pass a model identifier string to the BaseLLM constructor using `super().__init__(model="your-model-name")`
## Required Parameters
When creating custom LLM implementations, the following parameters are essential:
- `model`: String identifier for your model implementation.
- Required in the BaseLLM constructor
- Example values: `"gpt-4-custom"`, `"anthropic-claude-custom"`, `"custom-llm-v1.0"`
- Used to identify the model in logs, metrics, and other components
## Example: Basic Custom LLM
@@ -24,8 +34,14 @@ from crewai import BaseLLM
from typing import Any, Dict, List, Optional, Union
class CustomLLM(BaseLLM):
"""A custom LLM implementation with basic API key authentication.
Args:
api_key (str): API key for the LLM service.
endpoint (str): Endpoint URL for the LLM service.
"""
def __init__(self, api_key: str, endpoint: str):
super().__init__() # Initialize the base class to set default attributes
super().__init__(model="custom-llm-v1.0") # Initialize with required model parameter
if not api_key or not isinstance(api_key, str):
raise ValueError("Invalid API key: must be a non-empty string")
if not endpoint or not isinstance(endpoint, str):
@@ -195,7 +211,7 @@ Always validate input parameters to prevent runtime errors:
```python
def __init__(self, api_key: str, endpoint: str):
super().__init__()
super().__init__(model="custom-api-llm-v1.0") # Initialize with required model parameter
if not api_key or not isinstance(api_key, str):
raise ValueError("Invalid API key: must be a non-empty string")
if not endpoint or not isinstance(endpoint, str):
@@ -238,8 +254,14 @@ from crewai import BaseLLM, Agent, Task
from typing import Any, Dict, List, Optional, Union
class JWTAuthLLM(BaseLLM):
"""A custom LLM implementation with JWT-based authentication.
Args:
jwt_token (str): JWT token for authentication.
endpoint (str): Endpoint URL for the LLM service.
"""
def __init__(self, jwt_token: str, endpoint: str):
super().__init__() # Initialize the base class to set default attributes
super().__init__(model="jwt-auth-llm-v1.0") # Initialize with required model parameter
if not jwt_token or not isinstance(jwt_token, str):
raise ValueError("Invalid JWT token: must be a non-empty string")
if not endpoint or not isinstance(endpoint, str):
@@ -386,8 +408,14 @@ import logging
from typing import Any, Dict, List, Optional, Union
class LoggingLLM(BaseLLM):
"""A custom LLM implementation with logging capabilities.
Args:
api_key (str): API key for the LLM service.
endpoint (str): Endpoint URL for the LLM service.
"""
def __init__(self, api_key: str, endpoint: str):
super().__init__()
super().__init__(model="logging-llm-v1.0") # Initialize with required model parameter
self.api_key = api_key
self.endpoint = endpoint
self.logger = logging.getLogger("crewai.llm.custom")
@@ -419,13 +447,21 @@ import time
from typing import Any, Dict, List, Optional, Union
class RateLimitedLLM(BaseLLM):
"""A custom LLM implementation with rate limiting capabilities.
Args:
api_key (str): API key for the LLM service.
endpoint (str): Endpoint URL for the LLM service.
requests_per_minute (int, optional): Maximum number of requests allowed per minute.
Defaults to 60.
"""
def __init__(
self,
api_key: str,
endpoint: str,
requests_per_minute: int = 60
):
super().__init__()
super().__init__(model="rate-limited-llm-v1.0") # Initialize with required model parameter
self.api_key = api_key
self.endpoint = endpoint
self.requests_per_minute = requests_per_minute
@@ -467,8 +503,14 @@ import time
from typing import Any, Dict, List, Optional, Union
class MetricsCollectingLLM(BaseLLM):
"""A custom LLM implementation with metrics collection capabilities.
Args:
api_key (str): API key for the LLM service.
endpoint (str): Endpoint URL for the LLM service.
"""
def __init__(self, api_key: str, endpoint: str):
super().__init__()
super().__init__(model="metrics-llm-v1.0") # Initialize with required model parameter
self.api_key = api_key
self.endpoint = endpoint
self.metrics: Dict[str, Any] = {

Binary file not shown.

Before

Width:  |  Height:  |  Size: 362 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 241 KiB

View File

@@ -1,229 +0,0 @@
---
title: 'MCP Servers as Tools in CrewAI'
description: 'Learn how to integrate MCP servers as tools in your CrewAI agents using the `crewai-tools` library.'
icon: 'plug'
---
## Overview
The [Model Context Protocol](https://modelcontextprotocol.io/introduction) (MCP) provides a standardized way for AI agents to provide context to LLMs by communicating with external services, known as MCP Servers.
The `crewai-tools` library extends CrewAI's capabilities by allowing you to seamlessly integrate tools from these MCP servers into your agents.
This gives your crews access to a vast ecosystem of functionalities. For now, we support **Standard Input/Output** (Stdio) and **Server-Sent Events** (SSE) transport mechanisms.
<Info>
We will also be integrating **Streamable HTTP** transport in the near future.
Streamable HTTP is designed for efficient, bi-directional communication over a single HTTP connection.
</Info>
## Installation
Before you start using MCP with `crewai-tools`, you need to install the `mcp` extra `crewai-tools` dependency with the following command:
```shell
uv pip install 'crewai-tools[mcp]'
```
### Integrating MCP Tools with `MCPServerAdapter`
The `MCPServerAdapter` class from `crewai-tools` is the primary way to connect to an MCP server and make its tools available to your CrewAI agents.
It supports different transport mechanisms, primarily **Stdio** (for local servers) and **SSE** (Server-Sent Events).You have two main options for managing the connection lifecycle:
### Option 1: Fully Managed Connection (Recommended)
Using a Python context manager (`with` statement) is the recommended approach. It automatically handles starting and stopping the connection to the MCP server.
**For a local Stdio-based MCP server:**
```python
from crewai import Agent, Task, Crew
from crewai_tools import MCPServerAdapter
from mcp import StdioServerParameters
import os
server_params=StdioServerParameters(
command="uxv", # Or your python3 executable i.e. "python3"
args=["mock_server.py"],
env={"UV_PYTHON": "3.12", **os.environ},
)
with MCPServerAdapter(server_params) as tools:
print(f"Available tools from Stdio MCP server: {[tool.name for tool in tools]}")
# Example: Using the tools from the Stdio MCP server in a CrewAI Agent
agent = Agent(
role="Web Information Retriever",
goal="Scrape content from a specified URL.",
backstory="An AI that can fetch and process web page data via an MCP tool.",
tools=tools,
verbose=True,
)
task = Task(
description="Scrape content from a specified URL.",
expected_output="Scraped content from the specified URL.",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
)
result = crew.kickoff()
print(result)
```
**For a remote SSE-based MCP server:**
```python
from crewai_tools import MCPServerAdapter
from crewai import Agent, Task, Crew
server_params = {"url": "http://localhost:8000/sse"}
with MCPServerAdapter(server_params) as tools:
print(f"Available tools from SSE MCP server: {[tool.name for tool in tools]}")
# Example: Using the tools from the SSE MCP server in a CrewAI Agent
agent = Agent(
role="Web Information Retriever",
goal="Scrape content from a specified URL.",
backstory="An AI that can fetch and process web page data via an MCP tool.",
tools=tools,
verbose=True,
)
task = Task(
description="Scrape content from a specified URL.",
expected_output="Scraped content from the specified URL.",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
)
result = crew.kickoff()
print(result)
```
### Option 2: More control over the MCP server connection lifecycle
If you need finer-grained control over the MCP server connection lifecycle, you can instantiate `MCPServerAdapter` directly and manage its `start()` and `stop()` methods.
<Info>
You **MUST** call `mcp_server_adapter.stop()` to ensure the connection is closed and resources are released. Using a `try...finally` block is highly recommended.
</Info>
#### Stdio Transport Example (Manual)
```python
from mcp import StdioServerParameters
from crewai_tools import MCPServerAdapter
from crewai import Agent, Task, Crew
import os
stdio_params = StdioServerParameters(
command="uvx", # Or your python3 executable i.e. "python3"
args=["--quiet", "your-mcp-server@0.1.3"],
env={"UV_PYTHON": "3.12", **os.environ},
)
mcp_server_adapter = MCPServerAdapter(server_params=stdio_params)
try:
mcp_server_adapter.start() # Manually start the connection
tools = mcp_server_adapter.tools
print(f"Available tools (manual Stdio): {[tool.name for tool in tools]}")
# Use 'tools' with your Agent, Task, Crew setup as in Option 1
agent = Agent(
role="Medical Researcher",
goal="Find recent studies on a given topic using PubMed.",
backstory="An AI assistant specialized in biomedical literature research.",
tools=tools,
verbose=True
)
task = Task(
description="Search for recent articles on 'crispr gene editing'.",
expected_output="A summary of the top 3 recent articles.",
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
process=Process.sequential
)
result = crew.kickoff()
print(result)
finally:
print("Stopping Stdio MCP server connection (manual)...")
mcp_server_adapter.stop() # **Crucial: Ensure stop is called**
```
#### SSE Transport Example (Manual)
```python
from crewai_tools import MCPServerAdapter
from crewai import Agent, Task, Crew, Process
from mcp import StdioServerParameters
server_params = {"url": "http://localhost:8000/sse"}
try:
mcp_server_adapter = MCPServerAdapter(server_params)
mcp_server_adapter.start()
tools = mcp_server_adapter.tools
print(f"Available tools (manual SSE): {[tool.name for tool in tools]}")
agent = Agent(
role="Medical Researcher",
goal="Find recent studies on a given topic using PubMed.",
backstory="An AI assistant specialized in biomedical literature research.",
tools=tools,
verbose=True
)
task = Task(
description="Search for recent articles on 'crispr gene editing'.",
expected_output="A summary of the top 3 recent articles.",
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True,
process=Process.sequential
)
result = crew.kickoff()
print(result)
finally:
print("Stopping SSE MCP server connection (manual)...")
mcp_server_adapter.stop() # **Crucial: Ensure stop is called**
```
## Staying Safe with MCP
<Warning>
Always ensure that you trust an MCP Server before using it.
</Warning>
#### Security Warning: DNS Rebinding Attacks
SSE transports can be vulnerable to DNS rebinding attacks if not properly secured.
To prevent this:
1. **Always validate Origin headers** on incoming SSE connections to ensure they come from expected sources
2. **Avoid binding servers to all network interfaces** (0.0.0.0) when running locally - bind only to localhost (127.0.0.1) instead
3. **Implement proper authentication** for all SSE connections
Without these protections, attackers could use DNS rebinding to interact with local MCP servers from remote websites.
For more details, see the [MCP Transport Security](https://modelcontextprotocol.io/docs/concepts/transports#security-considerations) documentation.
### Limitations
* **Supported Primitives**: Currently, `MCPServerAdapter` primarily supports adapting MCP `tools`.
Other MCP primitives like `prompts` or `resources` are not directly integrated as CrewAI components through this adapter at this time.
* **Output Handling**: The adapter typically processes the primary text output from an MCP tool (e.g., `.content[0].text`). Complex or multi-modal outputs might require custom handling if not fitting this pattern.

View File

@@ -119,14 +119,6 @@ class Agent(BaseAgent):
default="safe",
description="Mode for code execution: 'safe' (using Docker) or 'unsafe' (direct execution).",
)
reasoning: bool = Field(
default=False,
description="Whether the agent should reflect and create a plan before executing a task.",
)
max_reasoning_attempts: Optional[int] = Field(
default=None,
description="Maximum number of reasoning attempts before executing the task. If None, will try until ready.",
)
embedder: Optional[Dict[str, Any]] = Field(
default=None,
description="Embedder configuration for the agent.",
@@ -233,21 +225,6 @@ class Agent(BaseAgent):
ValueError: If the max execution time is not a positive integer.
RuntimeError: If the agent execution fails for other reasons.
"""
if self.reasoning:
try:
from crewai.utilities.reasoning_handler import AgentReasoning, AgentReasoningOutput
reasoning_handler = AgentReasoning(task=task, agent=self)
reasoning_output: AgentReasoningOutput = reasoning_handler.handle_agent_reasoning()
# Add the reasoning plan to the task description
task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"
except Exception as e:
if hasattr(self, '_logger'):
self._logger.log("error", f"Error during reasoning process: {str(e)}")
else:
print(f"Error during reasoning process: {str(e)}")
if self.tools_handler:
self.tools_handler.last_used_tool = {} # type: ignore # Incompatible types in assignment (expression has type "dict[Never, Never]", variable has type "ToolCalling")

View File

@@ -161,7 +161,7 @@ def tree_find_and_replace(directory, find, replace):
for filename in files:
filepath = os.path.join(path, filename)
with open(filepath, "r", encoding="utf-8", errors="ignore") as file:
with open(filepath, "r") as file:
contents = file.read()
with open(filepath, "w") as file:
file.write(contents.replace(find, replace))

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union
class BaseLLM(ABC):
@@ -27,15 +27,25 @@ class BaseLLM(ABC):
self,
model: str,
temperature: Optional[float] = None,
):
) -> None:
"""Initialize the BaseLLM with default attributes.
This constructor sets default values for attributes that are expected
by the CrewAgentExecutor and other components.
All custom LLM implementations should call super().__init__() to ensure
that these default attributes are properly initialized.
All custom LLM implementations should call super().__init__(model="model_name"),
where "model_name" is a string identifier for your model. This parameter
is required and cannot be omitted.
Args:
model (str): Required. A string identifier for the model.
temperature (Optional[float]): The sampling temperature to use.
Raises:
ValueError: If the model parameter is not provided or empty.
"""
if not model:
raise ValueError("model parameter is required and must be a non-empty string")
self.model = model
self.temperature = temperature
self.stop = []

View File

@@ -135,10 +135,6 @@ class Task(BaseModel):
description="Whether the task should have a human review the final answer of the agent",
default=False,
)
markdown: Optional[bool] = Field(
description="Whether the task should instruct the agent to return the final answer formatted in Markdown",
default=False,
)
converter_cls: Optional[Type[Converter]] = Field(
description="A converter class used to export structured output",
default=None,
@@ -526,14 +522,10 @@ class Task(BaseModel):
return guardrail_result
def prompt(self) -> str:
"""Generates the task prompt with optional markdown formatting.
When the markdown attribute is True, instructions for formatting the
response in Markdown syntax will be added to the prompt.
"""Prompt the task.
Returns:
str: The formatted prompt string containing the task description,
expected output, and optional markdown formatting instructions.
Prompt of the task.
"""
tasks_slices = [self.description]
@@ -541,17 +533,6 @@ class Task(BaseModel):
expected_output=self.expected_output
)
tasks_slices = [self.description, output]
if self.markdown:
markdown_instruction = """Your final answer MUST be formatted in Markdown syntax.
Follow these guidelines:
- Use # for headers
- Use ** for bold text
- Use * for italic text
- Use - or * for bullet points
- Use `code` for inline code
- Use ```language for code blocks"""
tasks_slices.append(markdown_instruction)
return "\n".join(tasks_slices)
def interpolate_inputs_and_add_conversation_history(

View File

@@ -64,7 +64,7 @@ class Telemetry:
attribute in the Crew class.
"""
def __init__(self) -> None:
def __init__(self):
self.ready: bool = False
self.trace_set: bool = False

View File

@@ -51,11 +51,5 @@
"description": "See image to understand its content, you can optionally ask a question about the image",
"default_action": "Please provide a detailed description of this image, including all visual elements, context, and any notable details you can observe."
}
},
"reasoning": {
"initial_plan": "You are {role}, a professional with the following background: {backstory}\n\nYour primary goal is: {goal}\n\nAs {role}, you are creating a strategic plan for a task that requires your expertise and unique perspective.",
"refine_plan": "You are {role}, a professional with the following background: {backstory}\n\nYour primary goal is: {goal}\n\nAs {role}, you are refining a strategic plan for a task that requires your expertise and unique perspective.",
"create_plan_prompt": "You are {role} with this background: {backstory}\n\nYour primary goal is: {goal}\n\nYou have been assigned the following task:\n{description}\n\nExpected output:\n{expected_output}\n\nAvailable tools: {tools}\n\nBefore executing this task, create a detailed plan that leverages your expertise as {role} and outlines:\n1. Your understanding of the task from your professional perspective\n2. The key steps you'll take to complete it, drawing on your background and skills\n3. How you'll approach any challenges that might arise, considering your expertise\n4. How you'll strategically use the available tools based on your experience\n5. The expected outcome and how it aligns with your goal\n\nAfter creating your plan, assess whether you feel ready to execute the task.\nConclude with one of these statements:\n- \"READY: I am ready to execute the task.\"\n- \"NOT READY: I need to refine my plan because [specific reason].\"",
"refine_plan_prompt": "You are {role} with this background: {backstory}\n\nYour primary goal is: {goal}\n\nYou created the following plan for this task:\n{current_plan}\n\nHowever, you indicated that you're not ready to execute the task yet.\n\nPlease refine your plan further, drawing on your expertise as {role} to address any gaps or uncertainties.\n\nAfter refining your plan, assess whether you feel ready to execute the task.\nConclude with one of these statements:\n- \"READY: I am ready to execute the task.\"\n- \"NOT READY: I need to refine my plan further because [specific reason].\""
}
}

View File

@@ -56,11 +56,6 @@ from .tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from .reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
class EventListener(BaseEventListener):
@@ -411,30 +406,5 @@ class EventListener(BaseEventListener):
self.formatter.current_crew_tree,
)
# ----------- REASONING EVENTS -----------
@crewai_event_bus.on(AgentReasoningStartedEvent)
def on_agent_reasoning_started(source, event: AgentReasoningStartedEvent):
self.formatter.handle_reasoning_started(
self.formatter.current_agent_branch,
event.attempt,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(AgentReasoningCompletedEvent)
def on_agent_reasoning_completed(source, event: AgentReasoningCompletedEvent):
self.formatter.handle_reasoning_completed(
event.plan,
event.ready,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(AgentReasoningFailedEvent)
def on_agent_reasoning_failed(source, event: AgentReasoningFailedEvent):
self.formatter.handle_reasoning_failed(
event.error,
self.formatter.current_crew_tree,
)
event_listener = EventListener()

View File

@@ -43,19 +43,6 @@ from .tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from .reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
from .knowledge_events import (
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeQueryStartedEvent,
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeSearchQueryFailedEvent,
)
EventTypes = Union[
CrewKickoffStartedEvent,
@@ -87,13 +74,4 @@ EventTypes = Union[
LLMStreamChunkEvent,
LLMGuardrailStartedEvent,
LLMGuardrailCompletedEvent,
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
KnowledgeRetrievalStartedEvent,
KnowledgeRetrievalCompletedEvent,
KnowledgeQueryStartedEvent,
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
KnowledgeSearchQueryFailedEvent,
]

View File

@@ -1,31 +0,0 @@
from crewai.utilities.events.base_events import BaseEvent
class AgentReasoningStartedEvent(BaseEvent):
"""Event emitted when an agent starts reasoning about a task."""
type: str = "agent_reasoning_started"
agent_role: str
task_id: str
attempt: int = 1 # The current reasoning/refinement attempt
class AgentReasoningCompletedEvent(BaseEvent):
"""Event emitted when an agent finishes its reasoning process."""
type: str = "agent_reasoning_completed"
agent_role: str
task_id: str
plan: str
ready: bool
attempt: int = 1
class AgentReasoningFailedEvent(BaseEvent):
"""Event emitted when the reasoning process fails."""
type: str = "agent_reasoning_failed"
agent_role: str
task_id: str
error: str
attempt: int = 1

View File

@@ -4,7 +4,6 @@ from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.tree import Tree
from rich.live import Live
class ConsoleFormatter:
@@ -16,17 +15,10 @@ class ConsoleFormatter:
current_method_branch: Optional[Tree] = None
current_lite_agent_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
current_reasoning_branch: Optional[Tree] = None # Track reasoning status
def __init__(self, verbose: bool = False):
self.console = Console(width=None)
self.verbose = verbose
# Live instance to dynamically update a Tree renderable (e.g. the Crew tree)
# When multiple Tree objects are printed sequentially we reuse this Live
# instance so the previous render is replaced instead of writing a new one.
# Once any non-Tree renderable is printed we stop the Live session so the
# final Tree persists on the terminal.
self._live: Optional[Live] = None
def create_panel(self, content: Text, title: str, style: str = "blue") -> Panel:
"""Create a standardized panel with consistent styling."""
@@ -67,7 +59,7 @@ class ConsoleFormatter:
label.append(f"{prefix} ", style=f"{style} bold")
label.append(name, style=style)
if status:
label.append("\nStatus: ", style="white")
label.append("\n Status: ", style="white")
label.append(status, style=f"{style} bold")
tree.label = label
@@ -76,46 +68,7 @@ class ConsoleFormatter:
return parent.add(Text(text, style=style))
def print(self, *args, **kwargs) -> None:
"""Custom print that replaces consecutive Tree renders.
* If the argument is a single ``Tree`` instance, we either start a
``Live`` session (first tree) or update the existing one (subsequent
trees). This results in the tree being rendered in-place instead of
being appended repeatedly to the log.
* A blank call (no positional arguments) is ignored while a Live
session is active so it does not prematurely terminate the tree
rendering.
* Any other renderable will terminate the Live session (if one is
active) so the last tree stays on screen and the new content is
printed normally.
"""
# Case 1: updating / starting live Tree rendering
if len(args) == 1 and isinstance(args[0], Tree):
tree = args[0]
if not self._live:
# Start a new Live session for the first tree
self._live = Live(tree, console=self.console, refresh_per_second=4)
self._live.start()
else:
# Update existing Live session
self._live.update(tree, refresh=True)
return # Nothing else to do
# Case 2: blank line while a live session is running ignore so we
# don't break the in-place rendering behaviour
if len(args) == 0 and self._live:
return
# Case 3: printing something other than a Tree → terminate live session
if self._live:
self._live.stop()
self._live = None
# Finally, pass through to the regular Console.print implementation
"""Print to console with consistent formatting if verbose is enabled."""
self.console.print(*args, **kwargs)
def print_panel(
@@ -203,7 +156,7 @@ class ConsoleFormatter:
task_content = Text()
task_content.append(f"📋 Task: {task_id}", style="yellow bold")
task_content.append("\nStatus: ", style="white")
task_content.append("\n Status: ", style="white")
task_content.append("Executing Task...", style="yellow dim")
task_branch = None
@@ -243,17 +196,11 @@ class ConsoleFormatter:
# Update tree label
for branch in crew_tree.children:
if str(task_id) in str(branch.label):
# Build label without introducing stray blank lines
task_content = Text()
# First line: Task ID
task_content.append(f"📋 Task: {task_id}", style=f"{style} bold")
# Second line: Assigned to
task_content.append("\nAssigned to: ", style="white")
task_content.append("\n Assigned to: ", style="white")
task_content.append(agent_role, style=style)
# Third line: Status
task_content.append("Status: ", style="white")
task_content.append("\n Status: ", style="white")
task_content.append(status_text, style=f"{style} bold")
branch.label = task_content
self.print(crew_tree)
@@ -272,16 +219,18 @@ class ConsoleFormatter:
if not self.verbose or not task_branch or not crew_tree:
return None
# Instead of creating a separate Agent node, we treat the task branch
# itself as the logical agent branch so that Reasoning/Tool nodes are
# nested under the task without an extra visual level.
agent_branch = task_branch.add("")
self.update_tree_label(
agent_branch, "🤖 Agent:", agent_role, "green", "In Progress"
)
# Store the task branch as the current_agent_branch for future nesting.
self.current_agent_branch = task_branch
self.print(crew_tree)
self.print()
# No additional tree modification needed; return the task branch so
# caller logic remains unchanged.
return task_branch
# Set the current_agent_branch attribute directly
self.current_agent_branch = agent_branch
return agent_branch
def update_agent_status(
self,
@@ -291,10 +240,19 @@ class ConsoleFormatter:
status: str = "completed",
) -> None:
"""Update agent status in the tree."""
# We no longer render a separate agent branch, so this method simply
# updates the stored branch reference (already the task branch) without
# altering the tree. Keeping it a no-op avoids duplicate status lines.
return
if not self.verbose or agent_branch is None or crew_tree is None:
return
self.update_tree_label(
agent_branch,
"🤖 Agent:",
agent_role,
"green",
"✅ Completed" if status == "completed" else "❌ Failed",
)
self.print(crew_tree)
self.print()
def create_flow_tree(self, flow_name: str, flow_id: str) -> Optional[Tree]:
"""Create and initialize a flow tree."""
@@ -307,7 +265,7 @@ class ConsoleFormatter:
flow_label = Text()
flow_label.append("🌊 Flow: ", style="blue bold")
flow_label.append(flow_name, style="blue")
flow_label.append("\nID: ", style="white")
flow_label.append("\n ID: ", style="white")
flow_label.append(flow_id, style="blue")
flow_tree = Tree(flow_label)
@@ -322,7 +280,7 @@ class ConsoleFormatter:
flow_label = Text()
flow_label.append("🌊 Flow: ", style="blue bold")
flow_label.append(flow_name, style="blue")
flow_label.append("\nID: ", style="white")
flow_label.append("\n ID: ", style="white")
flow_label.append(flow_id, style="blue")
flow_tree.label = flow_label
@@ -436,22 +394,12 @@ class ConsoleFormatter:
if not self.verbose:
return None
# Parent for tool usage: LiteAgent > Agent > Task
branch_to_use = (
self.current_lite_agent_branch
or agent_branch
or self.current_task_branch
)
# Render full crew tree when available for consistent live updates
tree_to_use = self.current_crew_tree or crew_tree or branch_to_use
# Use LiteAgent branch if available, otherwise use regular agent branch
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
# If we don't have a valid branch, default to crew_tree if provided
if crew_tree is not None:
branch_to_use = tree_to_use = crew_tree
else:
return None
return None
# Update tool usage count
self.tool_usage_counts[tool_name] = self.tool_usage_counts.get(tool_name, 0) + 1
@@ -470,9 +418,10 @@ class ConsoleFormatter:
"yellow",
)
# Print updated tree immediately
self.print(tree_to_use)
self.print()
# Only print if this is a new tool usage
if tool_branch not in branch_to_use.children:
self.print(tree_to_use)
self.print()
return tool_branch
@@ -486,8 +435,8 @@ class ConsoleFormatter:
if not self.verbose or tool_branch is None:
return
# Decide which tree to render: prefer full crew tree, else parent branch
tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch
# Use LiteAgent branch if available, otherwise use crew tree
tree_to_use = self.current_lite_agent_branch or crew_tree
if tree_to_use is None:
return
@@ -518,8 +467,8 @@ class ConsoleFormatter:
if not self.verbose:
return
# Decide which tree to render: prefer full crew tree, else parent branch
tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch
# Use LiteAgent branch if available, otherwise use crew tree
tree_to_use = self.current_lite_agent_branch or crew_tree
if tool_branch:
self.update_tree_label(
@@ -547,22 +496,12 @@ class ConsoleFormatter:
if not self.verbose:
return None
# Parent for tool usage: LiteAgent > Agent > Task
branch_to_use = (
self.current_lite_agent_branch
or agent_branch
or self.current_task_branch
)
# Render full crew tree when available for consistent live updates
tree_to_use = self.current_crew_tree or crew_tree or branch_to_use
# Use LiteAgent branch if available, otherwise use regular agent branch
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
# If we don't have a valid branch, default to crew_tree if provided
if crew_tree is not None:
branch_to_use = tree_to_use = crew_tree
else:
return None
return None
# Only add thinking status if we don't have a current tool branch
if self.current_tool_branch is None:
@@ -584,33 +523,24 @@ class ConsoleFormatter:
if not self.verbose or tool_branch is None:
return
# Decide which tree to render: prefer full crew tree, else parent branch
tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch
if tree_to_use is None:
# Use LiteAgent branch if available, otherwise use regular agent branch
branch_to_use = self.current_lite_agent_branch or agent_branch
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
return
# Remove the thinking status node when complete
# Remove the thinking status node when complete, but only if it exists
if "Thinking" in str(tool_branch.label):
parents = [
self.current_lite_agent_branch,
self.current_agent_branch,
self.current_task_branch,
tree_to_use,
]
removed = False
for parent in parents:
if isinstance(parent, Tree) and tool_branch in parent.children:
parent.children.remove(tool_branch)
removed = True
break
# Clear pointer if we just removed the current_tool_branch
if self.current_tool_branch is tool_branch:
self.current_tool_branch = None
if removed:
self.print(tree_to_use)
self.print()
try:
# Check if the node is actually in the children list
if tool_branch in branch_to_use.children:
branch_to_use.children.remove(tool_branch)
self.print(tree_to_use)
self.print()
except Exception:
# If any error occurs during removal, just continue without removing
pass
def handle_llm_call_failed(
self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree]
@@ -619,8 +549,8 @@ class ConsoleFormatter:
if not self.verbose:
return
# Decide which tree to render: prefer full crew tree, else parent branch
tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch
# Use LiteAgent branch if available, otherwise use crew tree
tree_to_use = self.current_lite_agent_branch or crew_tree
# Update tool branch if it exists
if tool_branch:
@@ -662,7 +592,7 @@ class ConsoleFormatter:
test_label = Text()
test_label.append("🧪 Test: ", style="blue bold")
test_label.append(crew_name or "Crew", style="blue")
test_label.append("\nStatus: ", style="white")
test_label.append("\n Status: ", style="white")
test_label.append("In Progress", style="yellow")
test_tree = Tree(test_label)
@@ -684,7 +614,7 @@ class ConsoleFormatter:
test_label = Text()
test_label.append("✅ Test: ", style="green bold")
test_label.append(crew_name or "Crew", style="green")
test_label.append("\nStatus: ", style="white")
test_label.append("\n Status: ", style="white")
test_label.append("Completed", style="green bold")
flow_tree.label = test_label
@@ -773,7 +703,7 @@ class ConsoleFormatter:
lite_agent_label = Text()
lite_agent_label.append("🤖 LiteAgent: ", style="cyan bold")
lite_agent_label.append(lite_agent_role, style="cyan")
lite_agent_label.append("\nStatus: ", style="white")
lite_agent_label.append("\n Status: ", style="white")
lite_agent_label.append("In Progress", style="yellow")
lite_agent_tree = Tree(lite_agent_label)
@@ -812,7 +742,7 @@ class ConsoleFormatter:
lite_agent_label = Text()
lite_agent_label.append(f"{prefix} ", style=f"{style} bold")
lite_agent_label.append(lite_agent_role, style=style)
lite_agent_label.append("Status: ", style="white")
lite_agent_label.append("\n Status: ", style="white")
lite_agent_label.append(status_text, style=f"{style} bold")
lite_agent_branch.label = lite_agent_label
@@ -867,7 +797,7 @@ class ConsoleFormatter:
tree_to_use = branch_to_use or crew_tree
if branch_to_use is None or tree_to_use is None:
# If we don't have a valid branch, default to crew_tree if provided
# If we don't have a valid branch, use crew_tree as the branch if available
if crew_tree is not None:
branch_to_use = tree_to_use = crew_tree
else:
@@ -1052,124 +982,3 @@ class ConsoleFormatter:
"Knowledge Search Failed", "Search Error", "red", Error=error
)
self.print_panel(error_content, "Search Error", "red")
# ----------- AGENT REASONING EVENTS -----------
def handle_reasoning_started(
self,
agent_branch: Optional[Tree],
attempt: int,
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle agent reasoning started (or refinement) event."""
if not self.verbose:
return None
# Prefer LiteAgent > Agent > Task branch as the parent for reasoning
branch_to_use = (
self.current_lite_agent_branch
or agent_branch
or self.current_task_branch
)
# We always want to render the full crew tree when possible so the
# Live view updates coherently. Fallbacks: crew tree → branch itself.
tree_to_use = self.current_crew_tree or crew_tree or branch_to_use
if branch_to_use is None:
# Nothing to attach to, abort
return None
# Reuse existing reasoning branch if present
reasoning_branch = self.current_reasoning_branch
if reasoning_branch is None:
reasoning_branch = branch_to_use.add("")
self.current_reasoning_branch = reasoning_branch
# Build label text depending on attempt
status_text = (
f"Reasoning (Attempt {attempt})" if attempt > 1 else "Reasoning..."
)
self.update_tree_label(reasoning_branch, "🧠", status_text, "blue")
self.print(tree_to_use)
self.print()
return reasoning_branch
def handle_reasoning_completed(
self,
plan: str,
ready: bool,
crew_tree: Optional[Tree],
) -> None:
"""Handle agent reasoning completed event."""
if not self.verbose:
return
reasoning_branch = self.current_reasoning_branch
tree_to_use = (
self.current_crew_tree
or self.current_lite_agent_branch
or self.current_task_branch
or crew_tree
)
style = "green" if ready else "yellow"
status_text = "Reasoning Completed" if ready else "Reasoning Completed (Not Ready)"
if reasoning_branch is not None:
self.update_tree_label(reasoning_branch, "", status_text, style)
if tree_to_use is not None:
self.print(tree_to_use)
# Show plan in a panel (trim very long plans)
if plan:
plan_panel = Panel(
Text(plan, style="white"),
title="🧠 Reasoning Plan",
border_style=style,
padding=(1, 2),
)
self.print(plan_panel)
self.print()
# Clear stored branch after completion
self.current_reasoning_branch = None
def handle_reasoning_failed(
self,
error: str,
crew_tree: Optional[Tree],
) -> None:
"""Handle agent reasoning failure event."""
if not self.verbose:
return
reasoning_branch = self.current_reasoning_branch
tree_to_use = (
self.current_crew_tree
or self.current_lite_agent_branch
or self.current_task_branch
or crew_tree
)
if reasoning_branch is not None:
self.update_tree_label(reasoning_branch, "", "Reasoning Failed", "red")
if tree_to_use is not None:
self.print(tree_to_use)
# Error panel
error_content = self.create_status_content(
"Reasoning Failed",
"Error",
"red",
Error=error,
)
self.print_panel(error_content, "Reasoning Error", "red")
# Clear stored branch after failure
self.current_reasoning_branch = None

View File

@@ -1,387 +0,0 @@
import logging
import json
from typing import Tuple, cast
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.task import Task
from crewai.utilities import I18N
from crewai.llm import LLM
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
)
class ReasoningPlan(BaseModel):
"""Model representing a reasoning plan for a task."""
plan: str = Field(description="The detailed reasoning plan for the task.")
ready: bool = Field(description="Whether the agent is ready to execute the task.")
class AgentReasoningOutput(BaseModel):
"""Model representing the output of the agent reasoning process."""
plan: ReasoningPlan = Field(description="The reasoning plan for the task.")
class ReasoningFunction(BaseModel):
"""Model for function calling with reasoning."""
plan: str = Field(description="The detailed reasoning plan for the task.")
ready: bool = Field(description="Whether the agent is ready to execute the task.")
class AgentReasoning:
"""
Handles the agent reasoning process, enabling an agent to reflect and create a plan
before executing a task.
"""
def __init__(self, task: Task, agent: Agent):
if not task or not agent:
raise ValueError("Both task and agent must be provided.")
self.task = task
self.agent = agent
self.llm = cast(LLM, agent.llm)
self.logger = logging.getLogger(__name__)
self.i18n = I18N()
def handle_agent_reasoning(self) -> AgentReasoningOutput:
"""
Public method for the reasoning process that creates and refines a plan
for the task until the agent is ready to execute it.
Returns:
AgentReasoningOutput: The output of the agent reasoning process.
"""
# Emit a reasoning started event (attempt 1)
try:
crewai_event_bus.emit(
self.agent,
AgentReasoningStartedEvent(
agent_role=self.agent.role,
task_id=str(self.task.id),
attempt=1,
),
)
except Exception:
# Ignore event bus errors to avoid breaking execution
pass
try:
output = self.__handle_agent_reasoning()
# Emit reasoning completed event
try:
crewai_event_bus.emit(
self.agent,
AgentReasoningCompletedEvent(
agent_role=self.agent.role,
task_id=str(self.task.id),
plan=output.plan.plan,
ready=output.plan.ready,
attempt=1,
),
)
except Exception:
pass
return output
except Exception as e:
# Emit reasoning failed event
try:
crewai_event_bus.emit(
self.agent,
AgentReasoningFailedEvent(
agent_role=self.agent.role,
task_id=str(self.task.id),
error=str(e),
attempt=1,
),
)
except Exception:
pass
raise
def __handle_agent_reasoning(self) -> AgentReasoningOutput:
"""
Private method that handles the agent reasoning process.
Returns:
AgentReasoningOutput: The output of the agent reasoning process.
"""
plan, ready = self.__create_initial_plan()
plan, ready = self.__refine_plan_if_needed(plan, ready)
reasoning_plan = ReasoningPlan(plan=plan, ready=ready)
return AgentReasoningOutput(plan=reasoning_plan)
def __create_initial_plan(self) -> Tuple[str, bool]:
"""
Creates the initial reasoning plan for the task.
Returns:
Tuple[str, bool]: The initial plan and whether the agent is ready to execute the task.
"""
reasoning_prompt = self.__create_reasoning_prompt()
if self.llm.supports_function_calling():
plan, ready = self.__call_with_function(reasoning_prompt, "initial_plan")
return plan, ready
else:
system_prompt = self.i18n.retrieve("reasoning", "initial_plan").format(
role=self.agent.role,
goal=self.agent.goal,
backstory=self.__get_agent_backstory()
)
response = self.llm.call(
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": reasoning_prompt}
]
)
return self.__parse_reasoning_response(str(response))
def __refine_plan_if_needed(self, plan: str, ready: bool) -> Tuple[str, bool]:
"""
Refines the reasoning plan if the agent is not ready to execute the task.
Args:
plan: The current reasoning plan.
ready: Whether the agent is ready to execute the task.
Returns:
Tuple[str, bool]: The refined plan and whether the agent is ready to execute the task.
"""
attempt = 1
max_attempts = self.agent.max_reasoning_attempts
while not ready and (max_attempts is None or attempt < max_attempts):
# Emit event for each refinement attempt
try:
crewai_event_bus.emit(
self.agent,
AgentReasoningStartedEvent(
agent_role=self.agent.role,
task_id=str(self.task.id),
attempt=attempt + 1,
),
)
except Exception:
pass
refine_prompt = self.__create_refine_prompt(plan)
if self.llm.supports_function_calling():
plan, ready = self.__call_with_function(refine_prompt, "refine_plan")
else:
system_prompt = self.i18n.retrieve("reasoning", "refine_plan").format(
role=self.agent.role,
goal=self.agent.goal,
backstory=self.__get_agent_backstory()
)
response = self.llm.call(
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": refine_prompt}
]
)
plan, ready = self.__parse_reasoning_response(str(response))
attempt += 1
if max_attempts is not None and attempt >= max_attempts:
self.logger.warning(
f"Agent reasoning reached maximum attempts ({max_attempts}) without being ready. Proceeding with current plan."
)
break
return plan, ready
def __call_with_function(self, prompt: str, prompt_type: str) -> Tuple[str, bool]:
"""
Calls the LLM with function calling to get a reasoning plan.
Args:
prompt: The prompt to send to the LLM.
prompt_type: The type of prompt (initial_plan or refine_plan).
Returns:
Tuple[str, bool]: A tuple containing the plan and whether the agent is ready.
"""
self.logger.debug(f"Using function calling for {prompt_type} reasoning")
function_schema = {
"type": "function",
"function": {
"name": "create_reasoning_plan",
"description": "Create or refine a reasoning plan for a task",
"parameters": {
"type": "object",
"properties": {
"plan": {
"type": "string",
"description": "The detailed reasoning plan for the task."
},
"ready": {
"type": "boolean",
"description": "Whether the agent is ready to execute the task."
}
},
"required": ["plan", "ready"]
}
}
}
try:
system_prompt = self.i18n.retrieve("reasoning", prompt_type).format(
role=self.agent.role,
goal=self.agent.goal,
backstory=self.__get_agent_backstory()
)
# Prepare a simple callable that just returns the tool arguments as JSON
def _create_reasoning_plan(plan: str, ready: bool): # noqa: N802
"""Return the reasoning plan result in JSON string form."""
return json.dumps({"plan": plan, "ready": ready})
response = self.llm.call(
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
],
tools=[function_schema],
available_functions={"create_reasoning_plan": _create_reasoning_plan},
)
self.logger.debug(f"Function calling response: {response[:100]}...")
try:
result = json.loads(response)
if "plan" in result and "ready" in result:
return result["plan"], result["ready"]
except (json.JSONDecodeError, KeyError):
pass
response_str = str(response)
return response_str, "READY: I am ready to execute the task." in response_str
except Exception as e:
self.logger.warning(f"Error during function calling: {str(e)}. Falling back to text parsing.")
try:
system_prompt = self.i18n.retrieve("reasoning", prompt_type).format(
role=self.agent.role,
goal=self.agent.goal,
backstory=self.__get_agent_backstory()
)
fallback_response = self.llm.call(
[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
]
)
fallback_str = str(fallback_response)
return fallback_str, "READY: I am ready to execute the task." in fallback_str
except Exception as inner_e:
self.logger.error(f"Error during fallback text parsing: {str(inner_e)}")
return "Failed to generate a plan due to an error.", True # Default to ready to avoid getting stuck
def __get_agent_backstory(self) -> str:
"""
Safely gets the agent's backstory, providing a default if not available.
Returns:
str: The agent's backstory or a default value.
"""
return getattr(self.agent, "backstory", "No backstory provided")
def __create_reasoning_prompt(self) -> str:
"""
Creates a prompt for the agent to reason about the task.
Returns:
str: The reasoning prompt.
"""
available_tools = self.__format_available_tools()
return self.i18n.retrieve("reasoning", "create_plan_prompt").format(
role=self.agent.role,
goal=self.agent.goal,
backstory=self.__get_agent_backstory(),
description=self.task.description,
expected_output=self.task.expected_output,
tools=available_tools
)
def __format_available_tools(self) -> str:
"""
Formats the available tools for inclusion in the prompt.
Returns:
str: Comma-separated list of tool names.
"""
try:
return ', '.join([tool.name for tool in (self.task.tools or [])])
except (AttributeError, TypeError):
return "No tools available"
def __create_refine_prompt(self, current_plan: str) -> str:
"""
Creates a prompt for the agent to refine its reasoning plan.
Args:
current_plan: The current reasoning plan.
Returns:
str: The refine prompt.
"""
return self.i18n.retrieve("reasoning", "refine_plan_prompt").format(
role=self.agent.role,
goal=self.agent.goal,
backstory=self.__get_agent_backstory(),
current_plan=current_plan
)
def __parse_reasoning_response(self, response: str) -> Tuple[str, bool]:
"""
Parses the reasoning response to extract the plan and whether
the agent is ready to execute the task.
Args:
response: The LLM response.
Returns:
Tuple[str, bool]: The plan and whether the agent is ready to execute the task.
"""
if not response:
return "No plan was generated.", False
plan = response
ready = False
if "READY: I am ready to execute the task." in response:
ready = True
return plan, ready
def _handle_agent_reasoning(self) -> AgentReasoningOutput:
"""
Deprecated method for backward compatibility.
Use handle_agent_reasoning() instead.
Returns:
AgentReasoningOutput: The output of the agent reasoning process.
"""
self.logger.warning(
"The _handle_agent_reasoning method is deprecated. Use handle_agent_reasoning instead."
)
return self.handle_agent_reasoning()

View File

@@ -1,261 +0,0 @@
"""Tests for reasoning in agents."""
import json
import pytest
from crewai import Agent, Task
from crewai.llm import LLM
from crewai.utilities.reasoning_handler import AgentReasoning
@pytest.fixture
def mock_llm_responses():
"""Fixture for mock LLM responses."""
return {
"ready": "I'll solve this simple math problem.\n\nREADY: I am ready to execute the task.\n\n",
"not_ready": "I need to think about derivatives.\n\nNOT READY: I need to refine my plan because I'm not sure about the derivative rules.",
"ready_after_refine": "I'll use the power rule for derivatives where d/dx(x^n) = n*x^(n-1).\n\nREADY: I am ready to execute the task.",
"execution": "4"
}
def test_agent_with_reasoning(mock_llm_responses):
"""Test agent with reasoning."""
llm = LLM("gpt-3.5-turbo")
agent = Agent(
role="Test Agent",
goal="To test the reasoning feature",
backstory="I am a test agent created to verify the reasoning feature works correctly.",
llm=llm,
reasoning=True,
verbose=True
)
task = Task(
description="Simple math task: What's 2+2?",
expected_output="The answer should be a number.",
agent=agent
)
agent.llm.call = lambda messages, *args, **kwargs: (
mock_llm_responses["ready"]
if any("create a detailed plan" in msg.get("content", "") for msg in messages)
else mock_llm_responses["execution"]
)
result = agent.execute_task(task)
assert result == mock_llm_responses["execution"]
assert "Reasoning Plan:" in task.description
def test_agent_with_reasoning_not_ready_initially(mock_llm_responses):
"""Test agent with reasoning that requires refinement."""
llm = LLM("gpt-3.5-turbo")
agent = Agent(
role="Test Agent",
goal="To test the reasoning feature",
backstory="I am a test agent created to verify the reasoning feature works correctly.",
llm=llm,
reasoning=True,
max_reasoning_attempts=2,
verbose=True
)
task = Task(
description="Complex math task: What's the derivative of x²?",
expected_output="The answer should be a mathematical expression.",
agent=agent
)
call_count = [0]
def mock_llm_call(messages, *args, **kwargs):
if any("create a detailed plan" in msg.get("content", "") for msg in messages) or any("refine your plan" in msg.get("content", "") for msg in messages):
call_count[0] += 1
if call_count[0] == 1:
return mock_llm_responses["not_ready"]
else:
return mock_llm_responses["ready_after_refine"]
else:
return "2x"
agent.llm.call = mock_llm_call
result = agent.execute_task(task)
assert result == "2x"
assert call_count[0] == 2 # Should have made 2 reasoning calls
assert "Reasoning Plan:" in task.description
def test_agent_with_reasoning_max_attempts_reached():
"""Test agent with reasoning that reaches max attempts without being ready."""
llm = LLM("gpt-3.5-turbo")
agent = Agent(
role="Test Agent",
goal="To test the reasoning feature",
backstory="I am a test agent created to verify the reasoning feature works correctly.",
llm=llm,
reasoning=True,
max_reasoning_attempts=2,
verbose=True
)
task = Task(
description="Complex math task: Solve the Riemann hypothesis.",
expected_output="A proof or disproof of the hypothesis.",
agent=agent
)
call_count = [0]
def mock_llm_call(messages, *args, **kwargs):
if any("create a detailed plan" in msg.get("content", "") for msg in messages) or any("refine your plan" in msg.get("content", "") for msg in messages):
call_count[0] += 1
return f"Attempt {call_count[0]}: I need more time to think.\n\nNOT READY: I need to refine my plan further."
else:
return "This is an unsolved problem in mathematics."
agent.llm.call = mock_llm_call
result = agent.execute_task(task)
assert result == "This is an unsolved problem in mathematics."
assert call_count[0] == 2 # Should have made exactly 2 reasoning calls (max_attempts)
assert "Reasoning Plan:" in task.description
def test_agent_reasoning_input_validation():
"""Test input validation in AgentReasoning."""
llm = LLM("gpt-3.5-turbo")
agent = Agent(
role="Test Agent",
goal="To test the reasoning feature",
backstory="I am a test agent created to verify the reasoning feature works correctly.",
llm=llm,
reasoning=True
)
with pytest.raises(ValueError, match="Both task and agent must be provided"):
AgentReasoning(task=None, agent=agent)
task = Task(
description="Simple task",
expected_output="Simple output"
)
with pytest.raises(ValueError, match="Both task and agent must be provided"):
AgentReasoning(task=task, agent=None)
def test_agent_reasoning_error_handling():
"""Test error handling during the reasoning process."""
llm = LLM("gpt-3.5-turbo")
agent = Agent(
role="Test Agent",
goal="To test the reasoning feature",
backstory="I am a test agent created to verify the reasoning feature works correctly.",
llm=llm,
reasoning=True
)
task = Task(
description="Task that will cause an error",
expected_output="Output that will never be generated",
agent=agent
)
call_count = [0]
def mock_llm_call_error(*args, **kwargs):
call_count[0] += 1
if call_count[0] <= 2: # First calls are for reasoning
raise Exception("LLM error during reasoning")
return "Fallback execution result" # Return a value for task execution
agent.llm.call = mock_llm_call_error
result = agent.execute_task(task)
assert result == "Fallback execution result"
assert call_count[0] > 2 # Ensure we called the mock multiple times
def test_agent_with_function_calling():
"""Test agent with reasoning using function calling."""
llm = LLM("gpt-3.5-turbo")
agent = Agent(
role="Test Agent",
goal="To test the reasoning feature",
backstory="I am a test agent created to verify the reasoning feature works correctly.",
llm=llm,
reasoning=True,
verbose=True
)
task = Task(
description="Simple math task: What's 2+2?",
expected_output="The answer should be a number.",
agent=agent
)
agent.llm.supports_function_calling = lambda: True
def mock_function_call(messages, *args, **kwargs):
if "tools" in kwargs:
return json.dumps({
"plan": "I'll solve this simple math problem: 2+2=4.",
"ready": True
})
else:
return "4"
agent.llm.call = mock_function_call
result = agent.execute_task(task)
assert result == "4"
assert "Reasoning Plan:" in task.description
assert "I'll solve this simple math problem: 2+2=4." in task.description
def test_agent_with_function_calling_fallback():
"""Test agent with reasoning using function calling that falls back to text parsing."""
llm = LLM("gpt-3.5-turbo")
agent = Agent(
role="Test Agent",
goal="To test the reasoning feature",
backstory="I am a test agent created to verify the reasoning feature works correctly.",
llm=llm,
reasoning=True,
verbose=True
)
task = Task(
description="Simple math task: What's 2+2?",
expected_output="The answer should be a number.",
agent=agent
)
agent.llm.supports_function_calling = lambda: True
def mock_function_call(messages, *args, **kwargs):
if "tools" in kwargs:
return "Invalid JSON that will trigger fallback. READY: I am ready to execute the task."
else:
return "4"
agent.llm.call = mock_function_call
result = agent.execute_task(task)
assert result == "4"
assert "Reasoning Plan:" in task.description
assert "Invalid JSON that will trigger fallback" in task.description

View File

@@ -1,96 +0,0 @@
"""Test the markdown attribute in Task class."""
import pytest
from pydantic import BaseModel
from crewai import Agent, Task
@pytest.mark.parametrize(
"markdown_enabled,should_contain_instructions",
[
(True, True),
(False, False),
],
)
def test_markdown_option_in_task_prompt(markdown_enabled, should_contain_instructions):
"""Test that markdown flag correctly controls the inclusion of markdown formatting instructions."""
researcher = Agent(
role="Researcher",
goal="Research a topic",
backstory="You're a researcher specialized in providing well-formatted content.",
allow_delegation=False,
)
task = Task(
description="Research advances in AI in 2023",
expected_output="A summary of key AI advances in 2023",
markdown=markdown_enabled,
agent=researcher,
)
prompt = task.prompt()
assert "Research advances in AI in 2023" in prompt
assert "A summary of key AI advances in 2023" in prompt
if should_contain_instructions:
assert "Your final answer MUST be formatted in Markdown syntax." in prompt
assert "Use # for headers" in prompt
assert "Use ** for bold text" in prompt
else:
assert "Your final answer MUST be formatted in Markdown syntax." not in prompt
def test_markdown_with_empty_description():
"""Test markdown formatting with empty description."""
researcher = Agent(
role="Researcher",
goal="Research a topic",
backstory="You're a researcher.",
allow_delegation=False,
)
task = Task(
description="",
expected_output="A summary",
markdown=True,
agent=researcher,
)
prompt = task.prompt()
assert prompt.strip() != ""
assert "A summary" in prompt
assert "Your final answer MUST be formatted in Markdown syntax." in prompt
def test_markdown_with_complex_output_format():
"""Test markdown with JSON output format to ensure compatibility."""
class ResearchOutput(BaseModel):
title: str
findings: list[str]
researcher = Agent(
role="Researcher",
goal="Research a topic",
backstory="You're a researcher.",
allow_delegation=False,
)
task = Task(
description="Research topic",
expected_output="Research results",
markdown=True,
output_json=ResearchOutput,
agent=researcher,
)
prompt = task.prompt()
assert "Your final answer MUST be formatted in Markdown syntax." in prompt
assert "Research topic" in prompt
assert "Research results" in prompt