Compare commits

...

19 Commits

Author SHA1 Message Date
Devin AI
0c862fd5e1 Trigger CI re-run
Co-Authored-By: João <joao@crewai.com>
2025-11-13 12:53:07 +00:00
Devin AI
6442a88856 Fix Azure structured outputs to use json_object instead of json_schema
- Azure AI Inference SDK doesn't support json_schema response_format
- Changed to use json_object format with client-side Pydantic validation
- Added comprehensive tests for structured outputs with Azure models
- Fixes issue #3906

Co-Authored-By: João <joao@crewai.com>
2025-11-13 12:45:55 +00:00
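A minimal sketch of the client-side validation pattern this commit describes, assuming a hypothetical `ReportModel` and a raw `json_object` response string (not the actual implementation):

```python
from pydantic import BaseModel, ValidationError

class ReportModel(BaseModel):  # hypothetical structured-output model
    title: str
    summary: str

def parse_structured_output(raw_response: str) -> ReportModel:
    """Validate a json_object response on the client side, since the
    Azure AI Inference SDK does not accept json_schema response_format."""
    try:
        return ReportModel.model_validate_json(raw_response)
    except ValidationError as exc:
        # Surface a clear error so the caller can retry or repair the output.
        raise ValueError(f"Structured output failed validation: {exc}") from exc
```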
Greyson LaLonde
ffd717c51a fix: custom tool docs links, add mintlify broken links action (#3903)
* fix: update docs links to point to correct endpoints

* fix: update all broken doc links
2025-11-12 22:55:10 -08:00
Heitor Carvalho
fbe4aa4bd1 feat: fetch and store more data about okta authorization server (#3894)
2025-11-12 15:28:00 -03:00
Lorenze Jay
c205d2e8de feat: implement before and after LLM call hooks in CrewAgentExecutor (#3893)
- Added support for before and after LLM call hooks to allow modification of messages and responses during LLM interactions.
- Introduced LLMCallHookContext to provide hooks with access to the executor state, enabling in-place modifications of messages.
- Updated get_llm_response function to utilize the new hooks, ensuring that modifications persist across iterations.
- Enhanced tests to verify the functionality of the hooks and their error handling capabilities, ensuring robust execution flow.
2025-11-12 08:38:13 -08:00
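A speculative sketch of the before/after hook shapes this commit introduces; only `LLMCallHookContext` and the "modify messages in place" semantics come from the commit message, the hook names and context fields below are illustrative:

```python
def scrub_before_llm_call(context) -> None:  # context: LLMCallHookContext (assumed shape)
    """Before-hook: edit messages in place so the change persists across iterations."""
    for message in context.messages:
        message["content"] = message["content"].replace("INTERNAL_KEY", "[redacted]")

def audit_after_llm_call(context) -> None:
    """After-hook: inspect (or rewrite) the model response before the agent uses it."""
    print(f"LLM response length: {len(context.response or '')}")  # assumed attribute
```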
Daniel Barreto
fcb5b19b2e Enhance schema description of QdrantVectorSearchTool (#3891)
2025-11-11 14:33:33 -08:00
Rip&Tear
01f0111d52 dependabot.yml creation (#3868)
* dependabot.yml creation

* Configure dependabot for pip package updates

Co-authored-by: matt <matt@crewai.com>

* Fix Dependabot package ecosystem

* Refactor: Use uv package-ecosystem in dependabot

Co-authored-by: matt <matt@crewai.com>

* fix: ensure dependabot uses uv ecosystem

---------

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: matt <matt@crewai.com>
2025-11-11 12:14:16 +08:00
Lorenze Jay
6b52587c67 feat: expose messages to TaskOutput and LiteAgentOutputs (#3880)
* feat: add messages to task and agent outputs

- Introduced a new `messages` field in `TaskOutput` and `LiteAgentOutput` to capture messages from the last task execution.
- Updated the `Agent` class to store the last messages and provide a property for easy access.
- Enhanced the task and agent execution classes to include messages in their outputs.
- Added tests to ensure that messages are correctly included in task outputs and agent outputs during execution.

* using typing_extensions for 3.10 compatibility

* feat: add last_messages attribute to agent for improved task tracking

- Introduced a new `last_messages` attribute in the agent class to store messages from the last task execution.
- Updated the `Crew` class to handle the new messages attribute in task outputs.
- Enhanced existing tests to ensure that the `last_messages` attribute is correctly initialized and utilized across various guardrail scenarios.

* fix: add messages field to TaskOutput in tests for consistency

- Updated multiple test cases to include the new `messages` field in the `TaskOutput` instances.
- Ensured that all relevant tests reflect the latest changes in the TaskOutput structure, maintaining consistency across the test suite.
- This change aligns with the recent addition of the `last_messages` attribute in the agent class for improved task tracking.

* feat: preserve messages in task outputs during replay

- Added functionality to the Crew class to store and retrieve messages in task outputs.
- Enhanced the replay mechanism to ensure that messages from stored task outputs are preserved and accessible.
- Introduced a new test case to verify that messages are correctly stored and replayed, ensuring consistency in task execution and output handling.
- This change improves the overall tracking and context retention of task interactions within the CrewAI framework.

* fix original test, prev was debugging
2025-11-10 17:38:30 -08:00
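A short sketch of how the new fields might be read after a run; `crew` and `research_agent` are assumed to be already configured, and the message shape follows the `list[LLMMessage]` type shown in the docs diff below:

```python
result = crew.kickoff()

for task_output in result.tasks_output:
    # `messages` is the new TaskOutput field carrying the last execution's transcript.
    for message in task_output.messages:
        print(message["role"], "->", message["content"][:80])

# Agents also keep a copy via the new read-only `last_messages` property.
print(f"Agent retained {len(research_agent.last_messages)} messages")
```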
Lorenze Jay
629f7f34ce docs: enhance task guardrail documentation with LLM-based validation support (#3879)
- Added section on LLM-based guardrails, explaining their usage and requirements.
- Updated examples to demonstrate the implementation of multiple guardrails, including both function-based and LLM-based approaches.
- Clarified the distinction between single and multiple guardrails in task configurations.
- Improved explanations of guardrail functionality to ensure better understanding of validation processes.
2025-11-10 15:35:42 -08:00
Lorenze Jay
0f1c173d02 feat: bump versions to 1.4.1 (#3862)
* feat: bump versions to 1.4.1

* chore: update crewAI tools dependency to version 1.4.1 in project templates
2025-11-07 11:19:07 -08:00
Greyson LaLonde
19c5b9a35e fix: properly handle agent max iterations
fixes #3847
2025-11-07 13:54:11 -05:00
Greyson LaLonde
1ed307b58c fix: route llm model syntax to litellm
* fix: route llm model syntax to litellm

* wip: add list of supported models
2025-11-07 13:34:15 -05:00
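For context, a hedged example of the provider-prefixed model syntax this commit routes to litellm (the model name is chosen for illustration):

```python
from crewai import LLM

# "provider/model" strings are handed off to litellm for resolution.
llm = LLM(model="openai/gpt-4o", temperature=0.2)
```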
Lorenze Jay
d29867bbb6 chore: update version numbers to 1.4.0
2025-11-06 23:04:44 -05:00
Lorenze Jay
b2c278ed22 refactor: improve MCP tool execution handling with concurrent futures (#3854)
- Enhanced the MCP tool execution in both synchronous and asynchronous contexts by utilizing `concurrent.futures` for better event loop management.
- Updated error handling to provide clearer messages for connection issues and task cancellations.
- Added tests to validate MCP tool execution in both sync and async scenarios, ensuring robust functionality across different contexts.
2025-11-06 19:28:08 -08:00
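The pattern behind this refactor, sketched with a stand-in coroutine (the real code wraps MCP connect/list/execute calls, as the agent diff further down shows):

```python
import asyncio
import concurrent.futures

async def call_mcp_tool() -> str:
    """Stand-in for an async MCP tool invocation."""
    await asyncio.sleep(0.1)
    return "tool result"

def run_tool_from_sync_context() -> str:
    """Run the coroutine whether or not an event loop is already running."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running loop: a plain asyncio.run is safe.
        return asyncio.run(call_mcp_tool())
    # A loop is already running: execute on a separate thread with its own loop.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        return executor.submit(asyncio.run, call_mcp_tool()).result()
```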
Greyson LaLonde
f6aed9798b feat: allow non-ast plot routes 2025-11-06 21:17:29 -05:00
Greyson LaLonde
40a2d387a1 fix: keep stopwords updated 2025-11-06 21:10:25 -05:00
Lorenze Jay
6f36d7003b Lorenze/feat mcp first class support (#3850)
* WIP transport support mcp

* refactor: streamline MCP tool loading and error handling

* linted

* replaced the Self type import from typing with typing_extensions in MCP transport modules

* added tests for mcp setup

* added tests for mcp setup

* docs: enhance MCP overview with detailed integration examples and structured configurations

* feat: implement MCP event handling and logging in event listener and client

- Added MCP event types and handlers for connection and tool execution events.
- Enhanced MCPClient to emit events on connection status and tool execution.
- Updated ConsoleFormatter to handle MCP event logging.
- Introduced new MCP event types for better integration and monitoring.
2025-11-06 17:45:16 -08:00
Greyson LaLonde
9e5906c52f feat: add pydantic validation dunder to BaseInterceptor
2025-11-06 15:27:07 -05:00
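The commit title does not name the dunder; in Pydantic v2 the usual hook for letting an arbitrary class participate in model validation is `__get_pydantic_core_schema__`, so a speculative sketch under that assumption:

```python
from typing import Any
from pydantic import GetCoreSchemaHandler
from pydantic_core import core_schema

class BaseInterceptor:
    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        # Accept any instance of this class (or a subclass) as-is.
        return core_schema.is_instance_schema(cls)
```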
Lorenze Jay
fc521839e4 Lorenze/fix duplicating doc ids for knowledge (#3840)
* fix: update document ID handling in ChromaDB utility functions to use SHA-256 hashing and include index for uniqueness

* test: add tests for hash-based ID generation in ChromaDB utility functions

* drop idx for preventing dups, upsert should handle dups

* fix: update document ID extraction logic in ChromaDB utility functions to check for doc_id at the top level of the document

* fix: enhance document ID generation in ChromaDB utility functions to deduplicate documents and ensure unique hash-based IDs without suffixes

* fix: improve error handling and document ID generation in ChromaDB utility functions to ensure robust processing and uniqueness
2025-11-06 10:59:52 -08:00
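The dedup-by-hash idea in plain form (a sketch, not the utility's actual code): hashing the document content makes the ID deterministic, so a ChromaDB upsert replaces an existing entry instead of inserting a duplicate:

```python
import hashlib

def content_id(document: str) -> str:
    """Deterministic SHA-256 ID: identical content always maps to the same ID."""
    return hashlib.sha256(document.encode("utf-8")).hexdigest()

docs = ["alpha doc", "beta doc", "alpha doc"]  # duplicate on purpose
deduped = {content_id(d): d for d in docs}     # 2 entries, not 3
```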
129 changed files with 11472 additions and 2917 deletions

.github/dependabot.yml (new file)

@@ -0,0 +1,11 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file
version: 2
updates:
  - package-ecosystem: uv # See documentation for possible values
    directory: "/" # Location of package manifests
    schedule:
      interval: "weekly"

.github/workflows/docs-broken-links.yml (new file)

@@ -0,0 +1,35 @@
name: Check Documentation Broken Links
on:
  pull_request:
    paths:
      - "docs/**"
      - "docs.json"
  push:
    branches:
      - main
    paths:
      - "docs/**"
      - "docs.json"
  workflow_dispatch:
jobs:
  check-links:
    name: Check broken links
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Node
        uses: actions/setup-node@v4
        with:
          node-version: "latest"
      - name: Install Mintlify CLI
        run: npm i -g mintlify
      - name: Run broken link checker
        run: |
          # Auto-answer the prompt with yes command
          yes "" | mintlify broken-links || test $? -eq 141
        working-directory: ./docs


@@ -739,7 +739,7 @@ class KnowledgeMonitorListener(BaseEventListener):
knowledge_monitor = KnowledgeMonitorListener()
```
-For more information on using events, see the [Event Listeners](https://docs.crewai.com/concepts/event-listener) documentation.
+For more information on using events, see the [Event Listeners](/en/concepts/event-listener) documentation.
### Custom Knowledge Sources


@@ -1035,7 +1035,7 @@ CrewAI supports streaming responses from LLMs, allowing your application to rece
```
<Tip>
-[Click here](https://docs.crewai.com/concepts/event-listener#event-listeners) for more details
+[Click here](/en/concepts/event-listener#event-listeners) for more details
</Tip>
</Tab>


@@ -60,6 +60,7 @@ crew = Crew(
| **Output Pydantic** _(optional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | A Pydantic model for task output. |
| **Callback** _(optional)_ | `callback` | `Optional[Any]` | Function/object to be executed after task completion. |
| **Guardrail** _(optional)_ | `guardrail` | `Optional[Callable]` | Function to validate task output before proceeding to next task. |
| **Guardrails** _(optional)_ | `guardrails` | `Optional[List[Callable] | List[str]]` | List of guardrails to validate task output before proceeding to next task. |
| **Guardrail Max Retries** _(optional)_ | `guardrail_max_retries` | `Optional[int]` | Maximum number of retries when guardrail validation fails. Defaults to 3. |
<Note type="warning" title="Deprecated: max_retries">
@@ -223,6 +224,7 @@ By default, the `TaskOutput` will only include the `raw` output. A `TaskOutput`
| **JSON Dict** | `json_dict` | `Optional[Dict[str, Any]]` | A dictionary representing the JSON output of the task. |
| **Agent** | `agent` | `str` | The agent that executed the task. |
| **Output Format** | `output_format` | `OutputFormat` | The format of the task output, with options including RAW, JSON, and Pydantic. The default is RAW. |
| **Messages** | `messages` | `list[LLMMessage]` | The messages from the last task execution. |
### Task Methods and Properties
@@ -341,7 +343,11 @@ Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
feedback to agents when their output doesn't meet specific criteria.
-Guardrails are implemented as Python functions that contain custom validation logic, giving you complete control over the validation process and ensuring reliable, deterministic results.
+CrewAI supports two types of guardrails:
+1. **Function-based guardrails**: Python functions with custom validation logic, giving you complete control over the validation process and ensuring reliable, deterministic results.
+2. **LLM-based guardrails**: String descriptions that use the agent's LLM to validate outputs based on natural language criteria. These are ideal for complex or subjective validation requirements.
### Function-Based Guardrails
@@ -355,12 +361,12 @@ def validate_blog_content(result: TaskOutput) -> Tuple[bool, Any]:
"""Validate blog content meets requirements."""
try:
# Check word count
word_count = len(result.split())
word_count = len(result.raw.split())
if word_count > 200:
return (False, "Blog content exceeds 200 words")
# Additional validation logic here
return (True, result.strip())
return (True, result.raw.strip())
except Exception as e:
return (False, "Unexpected error during validation")
@@ -372,6 +378,147 @@ blog_task = Task(
)
```
### LLM-Based Guardrails (String Descriptions)
Instead of writing custom validation functions, you can use string descriptions that leverage LLM-based validation. When you provide a string to the `guardrail` or `guardrails` parameter, CrewAI automatically creates an `LLMGuardrail` that uses the agent's LLM to validate the output based on your description.
**Requirements**:
- The task must have an `agent` assigned (the guardrail uses the agent's LLM)
- Provide a clear, descriptive string explaining the validation criteria
```python Code
from crewai import Task
# Single LLM-based guardrail
blog_task = Task(
    description="Write a blog post about AI",
    expected_output="A blog post under 200 words",
    agent=blog_agent,
    guardrail="The blog post must be under 200 words and contain no technical jargon"
)
```
LLM-based guardrails are particularly useful for:
- **Complex validation logic** that's difficult to express programmatically
- **Subjective criteria** like tone, style, or quality assessments
- **Natural language requirements** that are easier to describe than code
The LLM guardrail will:
1. Analyze the task output against your description
2. Return `(True, output)` if the output complies with the criteria
3. Return `(False, feedback)` with specific feedback if validation fails
**Example with detailed validation criteria**:
```python Code
research_task = Task(
    description="Research the latest developments in quantum computing",
    expected_output="A comprehensive research report",
    agent=researcher_agent,
    guardrail="""
    The research report must:
    - Be at least 1000 words long
    - Include at least 5 credible sources
    - Cover both technical and practical applications
    - Be written in a professional, academic tone
    - Avoid speculation or unverified claims
    """
)
```
### Multiple Guardrails
You can apply multiple guardrails to a task using the `guardrails` parameter. Multiple guardrails are executed sequentially, with each guardrail receiving the output from the previous one. This allows you to chain validation and transformation steps.
The `guardrails` parameter accepts:
- A list of guardrail functions or string descriptions
- A single guardrail function or string (same as `guardrail`)
**Note**: If `guardrails` is provided, it takes precedence over `guardrail`. The `guardrail` parameter will be ignored when `guardrails` is set.
```python Code
from typing import Tuple, Any
from crewai import TaskOutput, Task
def validate_word_count(result: TaskOutput) -> Tuple[bool, Any]:
    """Validate word count is within limits."""
    word_count = len(result.raw.split())
    if word_count < 100:
        return (False, f"Content too short: {word_count} words. Need at least 100 words.")
    if word_count > 500:
        return (False, f"Content too long: {word_count} words. Maximum is 500 words.")
    return (True, result.raw)

def validate_no_profanity(result: TaskOutput) -> Tuple[bool, Any]:
    """Check for inappropriate language."""
    profanity_words = ["badword1", "badword2"]  # Example list
    content_lower = result.raw.lower()
    for word in profanity_words:
        if word in content_lower:
            return (False, f"Inappropriate language detected: {word}")
    return (True, result.raw)

def format_output(result: TaskOutput) -> Tuple[bool, Any]:
    """Format and clean the output."""
    formatted = result.raw.strip()
    # Capitalize first letter
    formatted = formatted[0].upper() + formatted[1:] if formatted else formatted
    return (True, formatted)

# Apply multiple guardrails sequentially
blog_task = Task(
    description="Write a blog post about AI",
    expected_output="A well-formatted blog post between 100-500 words",
    agent=blog_agent,
    guardrails=[
        validate_word_count,    # First: validate length
        validate_no_profanity,  # Second: check content
        format_output           # Third: format the result
    ],
    guardrail_max_retries=3
)
```
In this example, the guardrails execute in order:
1. `validate_word_count` checks the word count
2. `validate_no_profanity` checks for inappropriate language (using the output from step 1)
3. `format_output` formats the final result (using the output from step 2)
If any guardrail fails, the error is sent back to the agent, and the task is retried up to `guardrail_max_retries` times.
**Mixing function-based and LLM-based guardrails**:
You can combine both function-based and string-based guardrails in the same list:
```python Code
from typing import Tuple, Any
from crewai import TaskOutput, Task
def validate_word_count(result: TaskOutput) -> Tuple[bool, Any]:
    """Validate word count is within limits."""
    word_count = len(result.raw.split())
    if word_count < 100:
        return (False, f"Content too short: {word_count} words. Need at least 100 words.")
    if word_count > 500:
        return (False, f"Content too long: {word_count} words. Maximum is 500 words.")
    return (True, result.raw)

# Mix function-based and LLM-based guardrails
blog_task = Task(
    description="Write a blog post about AI",
    expected_output="A well-formatted blog post between 100-500 words",
    agent=blog_agent,
    guardrails=[
        validate_word_count,  # Function-based: precise word count check
        "The content must be engaging and suitable for a general audience",  # LLM-based: subjective quality check
        "The writing style should be clear, concise, and free of technical jargon"  # LLM-based: style validation
    ],
    guardrail_max_retries=3
)
```
This approach combines the precision of programmatic validation with the flexibility of LLM-based assessment for subjective criteria.
### Guardrail Function Requirements
1. **Function Signature**:


@@ -37,7 +37,7 @@ you can use them locally or refine them to your needs.
<Card title="Tools & Integrations" href="/en/enterprise/features/tools-and-integrations" icon="wrench">
Connect external apps and manage internal tools your agents can use.
</Card>
<Card title="Tool Repository" href="/en/enterprise/features/tool-repository" icon="toolbox">
<Card title="Tool Repository" href="/en/enterprise/guides/tool-repository#tool-repository" icon="toolbox">
Publish and install tools to enhance your crews' capabilities.
</Card>
<Card title="Agents Repository" href="/en/enterprise/features/agent-repositories" icon="people-group">


@@ -241,7 +241,7 @@ Tools & Integrations is the central hub for connecting third-party apps and ma
## Related
<CardGroup cols={2}>
<Card title="Tool Repository" href="/en/enterprise/features/tool-repository" icon="toolbox">
<Card title="Tool Repository" href="/en/enterprise/guides/tool-repository#tool-repository" icon="toolbox">
Create, publish, and version custom tools for your organization.
</Card>
<Card title="Webhook Automation" href="/en/enterprise/guides/webhook-automation" icon="bolt">


@@ -21,7 +21,7 @@ The repository is not a version control system. Use Git to track code changes an
Before using the Tool Repository, ensure you have:
- A [CrewAI AMP](https://app.crewai.com) account
-- [CrewAI CLI](https://docs.crewai.com/concepts/cli#cli) installed
+- [CrewAI CLI](/en/concepts/cli#cli) installed
- uv>=0.5.0 installed. Check out [how to upgrade](https://docs.astral.sh/uv/getting-started/installation/#upgrading-uv)
- [Git](https://git-scm.com) installed and configured
- Access permissions to publish or install tools in your CrewAI AMP organization
@@ -112,7 +112,7 @@ By default, tools are published as private. To make a tool public:
crewai tool publish --public
```
-For more details on how to build tools, see [Creating your own tools](https://docs.crewai.com/concepts/tools#creating-your-own-tools).
+For more details on how to build tools, see [Creating your own tools](/en/concepts/tools#creating-your-own-tools).
## Updating Tools


@@ -49,7 +49,7 @@ mode: "wide"
To integrate human input into agent execution, set the `human_input` flag in the task definition. When enabled, the agent prompts the user for input before delivering its final answer. This input can provide extra context, clarify ambiguities, or validate the agent's output.
-For detailed implementation guidance, see our [Human-in-the-Loop guide](/en/how-to/human-in-the-loop).
+For detailed implementation guidance, see our [Human-in-the-Loop guide](/en/enterprise/guides/human-in-the-loop).
</Accordion>
<Accordion title="What advanced customization options are available for tailoring and enhancing agent behavior and capabilities in CrewAI?">
@@ -142,7 +142,7 @@ mode: "wide"
<Accordion title="How can I create custom tools for my CrewAI agents?">
You can create custom tools by subclassing the `BaseTool` class provided by CrewAI or by using the tool decorator. Subclassing involves defining a new class that inherits from `BaseTool`, specifying the name, description, and the `_run` method for operational logic. The tool decorator allows you to create a `Tool` object directly with the required attributes and a functional logic.
<Card href="https://docs.crewai.com/how-to/create-custom-tools" icon="code">CrewAI Tools Guide</Card>
<Card href="/en/learn/create-custom-tools" icon="code">CrewAI Tools Guide</Card>
</Accordion>
<Accordion title="How can you control the maximum number of requests per minute that the entire crew can perform?">


@@ -97,7 +97,7 @@ project_crew = Crew(
```
<Tip>
-For more details on creating and customizing a manager agent, check out the [Custom Manager Agent documentation](https://docs.crewai.com/how-to/custom-manager-agent#custom-manager-agent).
+For more details on creating and customizing a manager agent, check out the [Custom Manager Agent documentation](/en/learn/custom-manager-agent).
</Tip>


@@ -11,9 +11,13 @@ The [Model Context Protocol](https://modelcontextprotocol.io/introduction) (MCP)
CrewAI offers **two approaches** for MCP integration:
-### Simple DSL Integration** (Recommended)
+### 🚀 **Simple DSL Integration** (Recommended)
-Use the `mcps` field directly on agents for seamless MCP tool integration:
+Use the `mcps` field directly on agents for seamless MCP tool integration. The DSL supports both **string references** (for quick setup) and **structured configurations** (for full control).
#### String-Based References (Quick Setup)
Perfect for remote HTTPS servers and CrewAI AMP marketplace:
```python
from crewai import Agent
@@ -32,6 +36,46 @@ agent = Agent(
# MCP tools are now automatically available to your agent!
```
#### Structured Configurations (Full Control)
For complete control over connection settings, tool filtering, and all transport types:
```python
from crewai import Agent
from crewai.mcp import MCPServerStdio, MCPServerHTTP, MCPServerSSE
from crewai.mcp.filters import create_static_tool_filter
agent = Agent(
    role="Advanced Research Analyst",
    goal="Research with full control over MCP connections",
    backstory="Expert researcher with advanced tool access",
    mcps=[
        # Stdio transport for local servers
        MCPServerStdio(
            command="npx",
            args=["-y", "@modelcontextprotocol/server-filesystem"],
            env={"API_KEY": "your_key"},
            tool_filter=create_static_tool_filter(
                allowed_tool_names=["read_file", "list_directory"]
            ),
            cache_tools_list=True,
        ),
        # HTTP/Streamable HTTP transport for remote servers
        MCPServerHTTP(
            url="https://api.example.com/mcp",
            headers={"Authorization": "Bearer your_token"},
            streamable=True,
            cache_tools_list=True,
        ),
        # SSE transport for real-time streaming
        MCPServerSSE(
            url="https://stream.example.com/mcp/sse",
            headers={"Authorization": "Bearer your_token"},
        ),
    ]
)
```
### 🔧 **Advanced: MCPServerAdapter** (For Complex Scenarios)
For advanced use cases requiring manual connection management, the `crewai-tools` library provides the `MCPServerAdapter` class.
@@ -68,12 +112,14 @@ uv pip install 'crewai-tools[mcp]'
## Quick Start: Simple DSL Integration
-The easiest way to integrate MCP servers is using the `mcps` field on your agents:
+The easiest way to integrate MCP servers is using the `mcps` field on your agents. You can use either string references or structured configurations.
### Quick Start with String References
```python
from crewai import Agent, Task, Crew
-# Create agent with MCP tools
+# Create agent with MCP tools using string references
research_agent = Agent(
    role="Research Analyst",
    goal="Find and analyze information using advanced search tools",
@@ -96,13 +142,53 @@ crew = Crew(agents=[research_agent], tasks=[research_task])
result = crew.kickoff()
```
### Quick Start with Structured Configurations
```python
from crewai import Agent, Task, Crew
from crewai.mcp import MCPServerStdio, MCPServerHTTP, MCPServerSSE
# Create agent with structured MCP configurations
research_agent = Agent(
    role="Research Analyst",
    goal="Find and analyze information using advanced search tools",
    backstory="Expert researcher with access to multiple data sources",
    mcps=[
        # Local stdio server
        MCPServerStdio(
            command="python",
            args=["local_server.py"],
            env={"API_KEY": "your_key"},
        ),
        # Remote HTTP server
        MCPServerHTTP(
            url="https://api.research.com/mcp",
            headers={"Authorization": "Bearer your_token"},
        ),
    ]
)
# Create task
research_task = Task(
    description="Research the latest developments in AI agent frameworks",
    expected_output="Comprehensive research report with citations",
    agent=research_agent
)
# Create and run crew
crew = Crew(agents=[research_agent], tasks=[research_task])
result = crew.kickoff()
```
That's it! The MCP tools are automatically discovered and available to your agent.
## MCP Reference Formats
-The `mcps` field supports various reference formats for maximum flexibility:
+The `mcps` field supports both **string references** (for quick setup) and **structured configurations** (for full control). You can mix both formats in the same list.
-### External MCP Servers
+### String-Based References
+#### External MCP Servers
```python
mcps=[
@@ -117,7 +203,7 @@ mcps=[
]
```
-### CrewAI AMP Marketplace
+#### CrewAI AMP Marketplace
```python
mcps=[
@@ -133,17 +219,166 @@ mcps=[
]
```
-### Mixed References
+### Structured Configurations
+#### Stdio Transport (Local Servers)
+Perfect for local MCP servers that run as processes:
```python
+from crewai.mcp import MCPServerStdio
+from crewai.mcp.filters import create_static_tool_filter
mcps=[
-    "https://external-api.com/mcp", # External server
-    "https://weather.service.com/mcp#forecast", # Specific external tool
-    "crewai-amp:financial-insights", # AMP service
-    "crewai-amp:data-analysis#sentiment_tool" # Specific AMP tool
+    MCPServerStdio(
+        command="npx",
+        args=["-y", "@modelcontextprotocol/server-filesystem"],
+        env={"API_KEY": "your_key"},
+        tool_filter=create_static_tool_filter(
+            allowed_tool_names=["read_file", "write_file"]
+        ),
+        cache_tools_list=True,
+    ),
+    # Python-based server
+    MCPServerStdio(
+        command="python",
+        args=["path/to/server.py"],
+        env={"UV_PYTHON": "3.12", "API_KEY": "your_key"},
+    ),
]
```
#### HTTP/Streamable HTTP Transport (Remote Servers)
For remote MCP servers over HTTP/HTTPS:
```python
from crewai.mcp import MCPServerHTTP
mcps=[
    # Streamable HTTP (default)
    MCPServerHTTP(
        url="https://api.example.com/mcp",
        headers={"Authorization": "Bearer your_token"},
        streamable=True,
        cache_tools_list=True,
    ),
    # Standard HTTP
    MCPServerHTTP(
        url="https://api.example.com/mcp",
        headers={"Authorization": "Bearer your_token"},
        streamable=False,
    ),
]
```
#### SSE Transport (Real-Time Streaming)
For remote servers using Server-Sent Events:
```python
from crewai.mcp import MCPServerSSE
mcps=[
    MCPServerSSE(
        url="https://stream.example.com/mcp/sse",
        headers={"Authorization": "Bearer your_token"},
        cache_tools_list=True,
    ),
]
```
### Mixed References
You can combine string references and structured configurations:
```python
from crewai.mcp import MCPServerStdio, MCPServerHTTP
mcps=[
    # String references
    "https://external-api.com/mcp",   # External server
    "crewai-amp:financial-insights",  # AMP service
    # Structured configurations
    MCPServerStdio(
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem"],
    ),
    MCPServerHTTP(
        url="https://api.example.com/mcp",
        headers={"Authorization": "Bearer token"},
    ),
]
```
### Tool Filtering
Structured configurations support advanced tool filtering:
```python
from crewai.mcp import MCPServerStdio
from crewai.mcp.filters import create_static_tool_filter, create_dynamic_tool_filter, ToolFilterContext
# Static filtering (allow/block lists)
static_filter = create_static_tool_filter(
    allowed_tool_names=["read_file", "write_file"],
    blocked_tool_names=["delete_file"],
)

# Dynamic filtering (context-aware)
def dynamic_filter(context: ToolFilterContext, tool: dict) -> bool:
    # Block dangerous tools for certain agent roles
    if context.agent.role == "Code Reviewer":
        if "delete" in tool.get("name", "").lower():
            return False
    return True

mcps=[
    MCPServerStdio(
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem"],
        tool_filter=static_filter,  # or dynamic_filter
    ),
]
```
## Configuration Parameters
Each transport type supports specific configuration options:
### MCPServerStdio Parameters
- **`command`** (required): Command to execute (e.g., `"python"`, `"node"`, `"npx"`, `"uvx"`)
- **`args`** (optional): List of command arguments (e.g., `["server.py"]` or `["-y", "@mcp/server"]`)
- **`env`** (optional): Dictionary of environment variables to pass to the process
- **`tool_filter`** (optional): Tool filter function for filtering available tools
- **`cache_tools_list`** (optional): Whether to cache the tool list for faster subsequent access (default: `False`)
### MCPServerHTTP Parameters
- **`url`** (required): Server URL (e.g., `"https://api.example.com/mcp"`)
- **`headers`** (optional): Dictionary of HTTP headers for authentication or other purposes
- **`streamable`** (optional): Whether to use streamable HTTP transport (default: `True`)
- **`tool_filter`** (optional): Tool filter function for filtering available tools
- **`cache_tools_list`** (optional): Whether to cache the tool list for faster subsequent access (default: `False`)
### MCPServerSSE Parameters
- **`url`** (required): Server URL (e.g., `"https://api.example.com/mcp/sse"`)
- **`headers`** (optional): Dictionary of HTTP headers for authentication or other purposes
- **`tool_filter`** (optional): Tool filter function for filtering available tools
- **`cache_tools_list`** (optional): Whether to cache the tool list for faster subsequent access (default: `False`)
### Common Parameters
All transport types support:
- **`tool_filter`**: Filter function to control which tools are available. Can be:
- `None` (default): All tools are available
- Static filter: Created with `create_static_tool_filter()` for allow/block lists
- Dynamic filter: Created with `create_dynamic_tool_filter()` for context-aware filtering
- **`cache_tools_list`**: When `True`, caches the tool list after first discovery to improve performance on subsequent connections
## Key Features
- 🔄 **Automatic Tool Discovery**: Tools are automatically discovered and integrated
@@ -152,26 +387,47 @@ mcps=[
- 🛡️ **Error Resilience**: Graceful handling of unavailable servers
- ⏱️ **Timeout Protection**: Built-in timeouts prevent hanging connections
- 📊 **Transparent Integration**: Works seamlessly with existing CrewAI features
- 🔧 **Full Transport Support**: Stdio, HTTP/Streamable HTTP, and SSE transports
- 🎯 **Advanced Filtering**: Static and dynamic tool filtering capabilities
- 🔐 **Flexible Authentication**: Support for headers, environment variables, and query parameters
## Error Handling
-The MCP DSL integration is designed to be resilient:
+The MCP DSL integration is designed to be resilient and handles failures gracefully:
```python
from crewai import Agent
from crewai.mcp import MCPServerStdio, MCPServerHTTP
agent = Agent(
    role="Resilient Agent",
    goal="Continue working despite server issues",
    backstory="Agent that handles failures gracefully",
    mcps=[
        # String references
        "https://reliable-server.com/mcp",     # Will work
        "https://unreachable-server.com/mcp",  # Will be skipped gracefully
        "https://slow-server.com/mcp",         # Will timeout gracefully
-       "crewai-amp:working-service"           # Will work
+       "crewai-amp:working-service",          # Will work
        # Structured configs
        MCPServerStdio(
            command="python",
            args=["reliable_server.py"],       # Will work
        ),
        MCPServerHTTP(
            url="https://slow-server.com/mcp", # Will timeout gracefully
        ),
    ]
)
# Agent will use tools from working servers and log warnings for failing ones
```
All connection errors are handled gracefully:
- **Connection failures**: Logged as warnings, agent continues with available tools
- **Timeout errors**: Connections timeout after 30 seconds (configurable)
- **Authentication errors**: Logged clearly for debugging
- **Invalid configurations**: Validation errors are raised at agent creation time
## Advanced: MCPServerAdapter
For complex scenarios requiring manual connection management, use the `MCPServerAdapter` class from `crewai-tools`. Using a Python context manager (`with` statement) is the recommended approach as it automatically handles starting and stopping the connection to the MCP server.


@@ -733,9 +733,7 @@ Here's a basic configuration to route requests to OpenAI, specifically using GPT
- Collect relevant metadata to filter logs
- Enforce access permissions
-Create API keys through:
-- [Portkey App](https://app.portkey.ai/)
-- [API Key Management API](/en/api-reference/admin-api/control-plane/api-keys/create-api-key)
+Create API keys through the [Portkey App](https://app.portkey.ai/)
Example using Python SDK:
```python
@@ -758,7 +756,7 @@ Here's a basic configuration to route requests to OpenAI, specifically using GPT
)
```
-For detailed key management instructions, see our [API Keys documentation](/en/api-reference/admin-api/control-plane/api-keys/create-api-key).
+For detailed key management instructions, see the [Portkey documentation](https://portkey.ai/docs).
</Accordion>
<Accordion title="Step 4: Deploy & Monitor">


@@ -18,7 +18,7 @@ These tools enable your agents to interact with cloud services, access cloud sto
Write and upload files to Amazon S3 storage.
</Card>
<Card title="Bedrock Invoke Agent" icon="aws" href="/en/tools/cloud-storage/bedrockinvokeagenttool">
<Card title="Bedrock Invoke Agent" icon="aws" href="/en/tools/integration/bedrockinvokeagenttool">
Invoke Amazon Bedrock agents for AI-powered tasks.
</Card>


@@ -632,11 +632,11 @@ mode: "wide"
## 기여
-기여를 원하시면, [기여 가이드](CONTRIBUTING.md)를 참조하세요.
+기여를 원하시면, [기여 가이드](https://github.com/crewAIInc/crewAI/blob/main/CONTRIBUTING.md)를 참조하세요.
## 라이센스
-이 프로젝트는 MIT 라이센스 하에 배포됩니다. 자세한 내용은 [LICENSE](LICENSE) 파일을 확인하세요.
+이 프로젝트는 MIT 라이센스 하에 배포됩니다. 자세한 내용은 [LICENSE](https://github.com/crewAIInc/crewAI/blob/main/LICENSE) 파일을 확인하세요.
</Update>
<Update label="2025년 5월 22일">


@@ -706,7 +706,7 @@ class KnowledgeMonitorListener(BaseEventListener):
knowledge_monitor = KnowledgeMonitorListener()
```
-이벤트 사용에 대한 자세한 내용은 [이벤트 리스너](https://docs.crewai.com/concepts/event-listener) 문서를 참고하세요.
+이벤트 사용에 대한 자세한 내용은 [이벤트 리스너](/ko/concepts/event-listener) 문서를 참고하세요.
### 맞춤형 지식 소스


@@ -748,7 +748,7 @@ CrewAI는 LLM의 스트리밍 응답을 지원하여, 애플리케이션이 출
```
<Tip>
-[자세한 내용은 여기를 클릭하세요](https://docs.crewai.com/concepts/event-listener#event-listeners)
+[자세한 내용은 여기를 클릭하세요](/ko/concepts/event-listener#event-listeners)
</Tip>
</Tab>


@@ -36,7 +36,7 @@ mode: "wide"
<Card title="도구 & 통합" href="/ko/enterprise/features/tools-and-integrations" icon="wrench">
에이전트가 사용할 외부 앱 연결 및 내부 도구 관리.
</Card>
<Card title="도구 저장소" href="/ko/enterprise/features/tool-repository" icon="toolbox">
<Card title="도구 저장소" href="/ko/enterprise/guides/tool-repository" icon="toolbox">
크루 기능을 확장할 수 있도록 도구를 게시하고 설치.
</Card>
<Card title="에이전트 저장소" href="/ko/enterprise/features/agent-repositories" icon="people-group">


@@ -231,7 +231,7 @@ mode: "wide"
## 관련 문서
<CardGroup cols={2}>
<Card title="도구 저장소" href="/ko/enterprise/features/tool-repository" icon="toolbox">
<Card title="도구 저장소" href="/ko/enterprise/guides/tool-repository" icon="toolbox">
크루 기능을 확장할 수 있도록 도구를 게시하고 설치하세요.
</Card>
<Card title="Webhook 자동화" href="/ko/enterprise/guides/webhook-automation" icon="bolt">


@@ -21,7 +21,7 @@ Tool Repository는 CrewAI 도구를 위한 패키지 관리자입니다. 사용
Tool Repository를 사용하기 전에 다음이 준비되어 있어야 합니다:
- [CrewAI AMP](https://app.crewai.com) 계정
-- [CrewAI CLI](https://docs.crewai.com/concepts/cli#cli) 설치됨
+- [CrewAI CLI](/ko/concepts/cli#cli) 설치됨
- uv>=0.5.0 이 설치되어 있어야 합니다. [업그레이드 방법](https://docs.astral.sh/uv/getting-started/installation/#upgrading-uv)을 참고하세요.
- [Git](https://git-scm.com) 설치 및 구성 완료
- CrewAI AMP 조직에서 도구를 게시하거나 설치할 수 있는 액세스 권한
@@ -66,7 +66,7 @@ crewai tool publish
crewai tool publish --public
```
-도구 빌드에 대한 자세한 내용은 [나만의 도구 만들기](https://docs.crewai.com/concepts/tools#creating-your-own-tools)를 참고하세요.
+도구 빌드에 대한 자세한 내용은 [나만의 도구 만들기](/ko/concepts/tools#creating-your-own-tools)를 참고하세요.
## 도구 업데이트


@@ -49,7 +49,7 @@ mode: "wide"
에이전트 실행에 인간 입력을 통합하려면 작업 정의에서 `human_input` 플래그를 설정하세요. 활성화하면, 에이전트가 최종 답변을 제공하기 전에 사용자에게 입력을 요청합니다. 이 입력은 추가 맥락을 제공하거나, 애매함을 해소하거나, 에이전트의 출력을 검증해야 할 때 활용될 수 있습니다.
-자세한 구현 방법은 [Human-in-the-Loop 가이드](/ko/how-to/human-in-the-loop)를 참고해 주세요.
+자세한 구현 방법은 [Human-in-the-Loop 가이드](/ko/enterprise/guides/human-in-the-loop)를 참고해 주세요.
</Accordion>
<Accordion title="CrewAI에서 에이전트의 행동과 역량을 맞춤화하고 향상시키기 위한 고급 커스터마이징 옵션에는 어떤 것이 있나요?">
@@ -142,7 +142,7 @@ mode: "wide"
<Accordion title="CrewAI 에이전트를 위한 커스텀 도구는 어떻게 만들 수 있습니까?">
CrewAI에서 제공하는 `BaseTool` 클래스를 상속받아 커스텀 도구를 직접 만들거나, tool 데코레이터를 활용할 수 있습니다. 상속 방식은 `BaseTool`을 상속하는 새로운 클래스를 정의해 이름, 설명, 그리고 실제 논리를 처리하는 `_run` 메서드를 작성합니다. tool 데코레이터를 사용하면 필수 속성과 운영 로직만 정의해 바로 `Tool` 객체를 만들 수 있습니다.
<Card href="https://docs.crewai.com/how-to/create-custom-tools" icon="code">CrewAI 도구 가이드</Card>
<Card href="/ko/learn/create-custom-tools" icon="code">CrewAI 도구 가이드</Card>
</Accordion>
<Accordion title="전체 crew가 수행할 수 있는 분당 최대 요청 수는 어떻게 제한할 수 있나요?">


@@ -95,7 +95,7 @@ project_crew = Crew(
```
<Tip>
-매니저 에이전트 생성 및 맞춤화에 대한 자세한 내용은 [커스텀 매니저 에이전트 문서](https://docs.crewai.com/how-to/custom-manager-agent#custom-manager-agent)를 참고하세요.
+매니저 에이전트 생성 및 맞춤화에 대한 자세한 내용은 [커스텀 매니저 에이전트 문서](/ko/learn/custom-manager-agent)를 참고하세요.
</Tip>
### 워크플로우 실행


@@ -730,9 +730,7 @@ Portkey 대시보드에서 [구성 페이지](https://app.portkey.ai/configs)에
- 로그를 필터링하기 위한 관련 메타데이터 수집
- 액세스 권한 적용
-API 키 생성 방법:
-- [Portkey App](https://app.portkey.ai/)
-- [API Key Management API](/ko/api-reference/admin-api/control-plane/api-keys/create-api-key)
+[Portkey App](https://app.portkey.ai/)를 통해 API 키 생성하세요
Python SDK를 사용한 예시:
```python
@@ -755,7 +753,7 @@ api_key = portkey.api_keys.create(
)
```
-자세한 키 관리 방법은 [API 키 문서](/ko/api-reference/admin-api/control-plane/api-keys/create-api-key)를 참조하세요.
+자세한 키 관리 방법은 [Portkey 문서](https://portkey.ai/docs)를 참조하세요.
</Accordion>
<Accordion title="4단계: 배포 및 모니터링">


@@ -18,7 +18,7 @@ mode: "wide"
파일을 Amazon S3 스토리지에 작성하고 업로드합니다.
</Card>
<Card title="Bedrock Invoke Agent" icon="aws" href="/ko/tools/cloud-storage/bedrockinvokeagenttool">
<Card title="Bedrock Invoke Agent" icon="aws" href="/ko/tools/integration/bedrockinvokeagenttool">
AI 기반 작업을 위해 Amazon Bedrock 에이전트를 호출합니다.
</Card>


@@ -11,7 +11,7 @@ mode: "wide"
<Card
title="Bedrock Invoke Agent Tool"
icon="cloud"
href="/en/tools/tool-integrations/bedrockinvokeagenttool"
href="/ko/tools/integration/bedrockinvokeagenttool"
color="#0891B2"
>
Invoke Amazon Bedrock Agents from CrewAI to orchestrate actions across AWS services.
@@ -20,7 +20,7 @@ mode: "wide"
<Card
title="CrewAI Automation Tool"
icon="bolt"
href="/en/tools/tool-integrations/crewaiautomationtool"
href="/ko/tools/integration/crewaiautomationtool"
color="#7C3AED"
>
Automate deployment and operations by integrating CrewAI with external platforms and workflows.


@@ -704,7 +704,7 @@ class KnowledgeMonitorListener(BaseEventListener):
knowledge_monitor = KnowledgeMonitorListener()
```
-Para mais informações sobre como usar eventos, consulte a documentação [Event Listeners](https://docs.crewai.com/concepts/event-listener).
+Para mais informações sobre como usar eventos, consulte a documentação [Event Listeners](/pt-BR/concepts/event-listener).
### Fontes de Knowledge Personalizadas


@@ -725,7 +725,7 @@ O CrewAI suporta respostas em streaming de LLMs, permitindo que sua aplicação
```
<Tip>
-[Clique aqui](https://docs.crewai.com/concepts/event-listener#event-listeners) para mais detalhes
+[Clique aqui](/pt-BR/concepts/event-listener#event-listeners) para mais detalhes
</Tip>
</Tab>
</Tabs>


@@ -36,7 +36,7 @@ Você também pode baixar templates diretamente do marketplace clicando em `Down
<Card title="Ferramentas & Integrações" href="/pt-BR/enterprise/features/tools-and-integrations" icon="wrench">
Conecte apps externos e gerencie ferramentas internas que seus agentes podem usar.
</Card>
<Card title="Repositório de Ferramentas" href="/pt-BR/enterprise/features/tool-repository" icon="toolbox">
<Card title="Repositório de Ferramentas" href="/pt-BR/enterprise/guides/tool-repository" icon="toolbox">
Publique e instale ferramentas para ampliar as capacidades dos seus crews.
</Card>
<Card title="Repositório de Agentes" href="/pt-BR/enterprise/features/agent-repositories" icon="people-group">


@@ -231,7 +231,7 @@ Ferramentas & Integrações é o hub central para conectar aplicações de terce
## Relacionados
<CardGroup cols={2}>
<Card title="Repositório de Ferramentas" href="/pt-BR/enterprise/features/tool-repository" icon="toolbox">
<Card title="Repositório de Ferramentas" href="/pt-BR/enterprise/guides/tool-repository" icon="toolbox">
Publique e instale ferramentas para ampliar as capacidades dos seus crews.
</Card>
<Card title="Automação com Webhook" href="/pt-BR/enterprise/guides/webhook-automation" icon="bolt">


@@ -21,7 +21,7 @@ O repositório não é um sistema de controle de versões. Use Git para rastrear
Antes de usar o Repositório de Ferramentas, certifique-se de que você possui:
- Uma conta [CrewAI AMP](https://app.crewai.com)
-- [CrewAI CLI](https://docs.crewai.com/concepts/cli#cli) instalada
+- [CrewAI CLI](/pt-BR/concepts/cli#cli) instalada
- uv>=0.5.0 instalado. Veja [como atualizar](https://docs.astral.sh/uv/getting-started/installation/#upgrading-uv)
- [Git](https://git-scm.com) instalado e configurado
- Permissões de acesso para publicar ou instalar ferramentas em sua organização CrewAI AMP
@@ -66,7 +66,7 @@ Por padrão, as ferramentas são publicadas como privadas. Para tornar uma ferra
crewai tool publish --public
```
-Para mais detalhes sobre como construir ferramentas, acesse [Criando suas próprias ferramentas](https://docs.crewai.com/concepts/tools#creating-your-own-tools).
+Para mais detalhes sobre como construir ferramentas, acesse [Criando suas próprias ferramentas](/pt-BR/concepts/tools#creating-your-own-tools).
## Atualizando ferramentas


@@ -49,7 +49,7 @@ mode: "wide"
Para integrar a entrada humana na execução do agente, defina a flag `human_input` na definição da tarefa. Quando habilitada, o agente solicitará a entrada do usuário antes de entregar sua resposta final. Essa entrada pode fornecer contexto extra, esclarecer ambiguidades ou validar a saída do agente.
-Para orientações detalhadas de implementação, veja nosso [guia Human-in-the-Loop](/pt-BR/how-to/human-in-the-loop).
+Para orientações detalhadas de implementação, veja nosso [guia Human-in-the-Loop](/pt-BR/enterprise/guides/human-in-the-loop).
</Accordion>
<Accordion title="Quais opções avançadas de customização estão disponíveis para aprimorar e personalizar o comportamento e as capacidades dos agentes na CrewAI?">
@@ -142,7 +142,7 @@ mode: "wide"
<Accordion title="Como posso criar ferramentas personalizadas para meus agentes CrewAI?">
Você pode criar ferramentas personalizadas herdando da classe `BaseTool` fornecida pela CrewAI ou usando o decorador de ferramenta. Herdar envolve definir uma nova classe que herda de `BaseTool`, especificando o nome, a descrição e o método `_run` para a lógica operacional. O decorador de ferramenta permite criar um objeto `Tool` diretamente com os atributos necessários e uma lógica funcional.
<Card href="https://docs.crewai.com/how-to/create-custom-tools" icon="code">CrewAI Tools Guide</Card>
<Card href="/pt-BR/learn/create-custom-tools" icon="code">CrewAI Tools Guide</Card>
</Accordion>
<Accordion title="Como controlar o número máximo de solicitações por minuto que toda a crew pode realizar?">


@@ -96,7 +96,7 @@ project_crew = Crew(
```
<Tip>
-Para mais detalhes sobre a criação e personalização de um agente gerente, confira a [documentação do Custom Manager Agent](https://docs.crewai.com/how-to/custom-manager-agent#custom-manager-agent).
+Para mais detalhes sobre a criação e personalização de um agente gerente, confira a [documentação do Custom Manager Agent](/pt-BR/learn/custom-manager-agent).
</Tip>


@@ -733,9 +733,7 @@ Aqui está um exemplo básico para rotear requisições ao OpenAI, usando especi
- Coletam metadados relevantes para filtragem de logs
- Impõem permissões de acesso
-Crie chaves de API através de:
-- [Portkey App](https://app.portkey.ai/)
-- [API Key Management API](/pt-BR/api-reference/admin-api/control-plane/api-keys/create-api-key)
+Crie chaves de API através do [Portkey App](https://app.portkey.ai/)
Exemplo usando Python SDK:
```python
@@ -758,7 +756,7 @@ Aqui está um exemplo básico para rotear requisições ao OpenAI, usando especi
)
```
-Para instruções detalhadas de gerenciamento de chaves, veja nossa [documentação de API Keys](/pt-BR/api-reference/admin-api/control-plane/api-keys/create-api-key).
+Para instruções detalhadas de gerenciamento de chaves, veja a [documentação Portkey](https://portkey.ai/docs).
</Accordion>
<Accordion title="Etapa 4: Implante & Monitore">


@@ -18,7 +18,7 @@ Essas ferramentas permitem que seus agentes interajam com serviços em nuvem, ac
Escreva e faça upload de arquivos para o armazenamento Amazon S3.
</Card>
<Card title="Bedrock Invoke Agent" icon="aws" href="/pt-BR/tools/cloud-storage/bedrockinvokeagenttool">
<Card title="Bedrock Invoke Agent" icon="aws" href="/pt-BR/tools/integration/bedrockinvokeagenttool">
Acione agentes Amazon Bedrock para tarefas orientadas por IA.
</Card>


@@ -11,7 +11,7 @@ mode: "wide"
<Card
title="Bedrock Invoke Agent Tool"
icon="cloud"
href="/en/tools/tool-integrations/bedrockinvokeagenttool"
href="/pt-BR/tools/integration/bedrockinvokeagenttool"
color="#0891B2"
>
Invoke Amazon Bedrock Agents from CrewAI to orchestrate actions across AWS services.
@@ -20,7 +20,7 @@ mode: "wide"
<Card
title="CrewAI Automation Tool"
icon="bolt"
href="/en/tools/tool-integrations/crewaiautomationtool"
href="/pt-BR/tools/integration/crewaiautomationtool"
color="#7C3AED"
>
Automate deployment and operations by integrating CrewAI with external platforms and workflows.


@@ -12,7 +12,7 @@ dependencies = [
"pytube>=15.0.0",
"requests>=2.32.5",
"docker>=7.1.0",
"crewai==1.3.0",
"crewai==1.4.1",
"lancedb>=0.5.4",
"tiktoken>=0.8.0",
"beautifulsoup4>=4.13.4",


@@ -287,4 +287,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.3.0"
__version__ = "1.4.1"


@@ -12,12 +12,16 @@ from pydantic.types import ImportString
class QdrantToolSchema(BaseModel):
-    query: str = Field(..., description="Query to search in Qdrant DB")
+    query: str = Field(
+        ..., description="Query to search in Qdrant DB - always required."
+    )
    filter_by: str | None = Field(
-        default=None, description="Parameter to filter the search by."
+        default=None,
+        description="Parameter to filter the search by. When filtering, needs to be used in conjunction with filter_value.",
    )
    filter_value: Any | None = Field(
-        default=None, description="Value to filter the search by."
+        default=None,
+        description="Value to filter the search by. When filtering, needs to be used in conjunction with filter_by.",
    )

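A small usage sketch of the contract those new descriptions spell out: `query` is always required, while `filter_by` and `filter_value` are only meaningful as a pair (values chosen for illustration):

```python
args = QdrantToolSchema(
    query="vector databases",  # always required
    filter_by="category",      # must be paired with filter_value
    filter_value="databases",  # must be paired with filter_by
)
```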

@@ -48,7 +48,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.3.0",
"crewai-tools==1.4.1",
]
embeddings = [
"tiktoken~=0.8.0"


@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.3.0"
__version__ = "1.4.1"
_telemetry_submitted = False


@@ -40,6 +40,16 @@ from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
from crewai.lite_agent import LiteAgent
from crewai.llms.base_llm import BaseLLM
from crewai.mcp import (
    MCPClient,
    MCPServerConfig,
    MCPServerHTTP,
    MCPServerSSE,
    MCPServerStdio,
)
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.rag.embeddings.types import EmbedderConfig
from crewai.security.fingerprint import Fingerprint
@@ -108,6 +118,8 @@ class Agent(BaseAgent):
"""
_times_executed: int = PrivateAttr(default=0)
_mcp_clients: list[Any] = PrivateAttr(default_factory=list)
_last_messages: list[LLMMessage] = PrivateAttr(default_factory=list)
max_execution_time: int | None = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
@@ -526,6 +538,15 @@ class Agent(BaseAgent):
            self,
            event=AgentExecutionCompletedEvent(agent=self, task=task, output=result),
        )
        self._last_messages = (
            self.agent_executor.messages.copy()
            if self.agent_executor and hasattr(self.agent_executor, "messages")
            else []
        )
        self._cleanup_mcp_clients()
        return result

    def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> Any:
@@ -649,30 +670,70 @@ class Agent(BaseAgent):
self._logger.log("error", f"Error getting platform tools: {e!s}")
return []
def get_mcp_tools(self, mcps: list[str]) -> list[BaseTool]:
"""Convert MCP server references to CrewAI tools."""
def get_mcp_tools(self, mcps: list[str | MCPServerConfig]) -> list[BaseTool]:
"""Convert MCP server references/configs to CrewAI tools.
Supports both string references (backwards compatible) and structured
configuration objects (MCPServerStdio, MCPServerHTTP, MCPServerSSE).
Args:
mcps: List of MCP server references (strings) or configurations.
Returns:
List of BaseTool instances from MCP servers.
"""
all_tools = []
clients = []
for mcp_ref in mcps:
try:
if mcp_ref.startswith("crewai-amp:"):
tools = self._get_amp_mcp_tools(mcp_ref)
elif mcp_ref.startswith("https://"):
tools = self._get_external_mcp_tools(mcp_ref)
else:
continue
for mcp_config in mcps:
if isinstance(mcp_config, str):
tools = self._get_mcp_tools_from_string(mcp_config)
else:
tools, client = self._get_native_mcp_tools(mcp_config)
if client:
clients.append(client)
all_tools.extend(tools)
self._logger.log(
"info", f"Successfully loaded {len(tools)} tools from {mcp_ref}"
)
except Exception as e:
self._logger.log("warning", f"Skipping MCP {mcp_ref} due to error: {e}")
continue
all_tools.extend(tools)
# Store clients for cleanup
self._mcp_clients.extend(clients)
return all_tools
    def _cleanup_mcp_clients(self) -> None:
        """Cleanup MCP client connections after task execution."""
        if not self._mcp_clients:
            return

        async def _disconnect_all() -> None:
            for client in self._mcp_clients:
                if client and hasattr(client, "connected") and client.connected:
                    await client.disconnect()

        try:
            asyncio.run(_disconnect_all())
        except Exception as e:
            self._logger.log("error", f"Error during MCP client cleanup: {e}")
        finally:
            self._mcp_clients.clear()
    def _get_mcp_tools_from_string(self, mcp_ref: str) -> list[BaseTool]:
        """Get tools from legacy string-based MCP references.

        This method maintains backwards compatibility with string-based
        MCP references (https://... and crewai-amp:...).

        Args:
            mcp_ref: String reference to MCP server.

        Returns:
            List of BaseTool instances.
        """
        if mcp_ref.startswith("crewai-amp:"):
            return self._get_amp_mcp_tools(mcp_ref)
        if mcp_ref.startswith("https://"):
            return self._get_external_mcp_tools(mcp_ref)
        return []
    def _get_external_mcp_tools(self, mcp_ref: str) -> list[BaseTool]:
        """Get tools from external HTTPS MCP server with graceful error handling."""
        from crewai.tools.mcp_tool_wrapper import MCPToolWrapper
@@ -731,6 +792,164 @@ class Agent(BaseAgent):
)
return []
def _get_native_mcp_tools(
self, mcp_config: MCPServerConfig
) -> tuple[list[BaseTool], Any | None]:
"""Get tools from MCP server using structured configuration.
This method creates an MCP client based on the configuration type,
connects to the server, discovers tools, applies filtering, and
returns wrapped tools along with the client instance for cleanup.
Args:
mcp_config: MCP server configuration (MCPServerStdio, MCPServerHTTP, or MCPServerSSE).
Returns:
Tuple of (list of BaseTool instances, MCPClient instance for cleanup).
"""
from crewai.tools.base_tool import BaseTool
from crewai.tools.mcp_native_tool import MCPNativeTool
if isinstance(mcp_config, MCPServerStdio):
transport = StdioTransport(
command=mcp_config.command,
args=mcp_config.args,
env=mcp_config.env,
)
server_name = f"{mcp_config.command}_{'_'.join(mcp_config.args)}"
elif isinstance(mcp_config, MCPServerHTTP):
transport = HTTPTransport(
url=mcp_config.url,
headers=mcp_config.headers,
streamable=mcp_config.streamable,
)
server_name = self._extract_server_name(mcp_config.url)
elif isinstance(mcp_config, MCPServerSSE):
transport = SSETransport(
url=mcp_config.url,
headers=mcp_config.headers,
)
server_name = self._extract_server_name(mcp_config.url)
else:
raise ValueError(f"Unsupported MCP server config type: {type(mcp_config)}")
client = MCPClient(
transport=transport,
cache_tools_list=mcp_config.cache_tools_list,
)
async def _setup_client_and_list_tools() -> list[dict[str, Any]]:
"""Async helper to connect and list tools in same event loop."""
try:
if not client.connected:
await client.connect()
tools_list = await client.list_tools()
try:
await client.disconnect()
# Small delay to allow background tasks to finish cleanup
# This helps prevent "cancel scope in different task" errors
# when asyncio.run() closes the event loop
await asyncio.sleep(0.1)
except Exception as e:
self._logger.log("error", f"Error during disconnect: {e}")
return tools_list
except Exception as e:
if client.connected:
await client.disconnect()
await asyncio.sleep(0.1)
raise RuntimeError(
f"Error during setup client and list tools: {e}"
) from e
try:
try:
asyncio.get_running_loop()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
asyncio.run, _setup_client_and_list_tools()
)
tools_list = future.result()
except RuntimeError:
try:
tools_list = asyncio.run(_setup_client_and_list_tools())
except RuntimeError as e:
error_msg = str(e).lower()
if "cancel scope" in error_msg or "task" in error_msg:
raise ConnectionError(
"MCP connection failed due to event loop cleanup issues. "
"This may be due to authentication errors or server unavailability."
) from e
except asyncio.CancelledError as e:
raise ConnectionError(
"MCP connection was cancelled. This may indicate an authentication "
"error or server unavailability."
) from e
if mcp_config.tool_filter:
filtered_tools = []
for tool in tools_list:
if callable(mcp_config.tool_filter):
try:
from crewai.mcp.filters import ToolFilterContext
context = ToolFilterContext(
agent=self,
server_name=server_name,
run_context=None,
)
if mcp_config.tool_filter(context, tool):
filtered_tools.append(tool)
except (TypeError, AttributeError):
if mcp_config.tool_filter(tool):
filtered_tools.append(tool)
else:
# Not callable - include tool
filtered_tools.append(tool)
tools_list = filtered_tools
tools = []
for tool_def in tools_list:
tool_name = tool_def.get("name", "")
if not tool_name:
continue
# Convert inputSchema to Pydantic model if present
args_schema = None
if tool_def.get("inputSchema"):
args_schema = self._json_schema_to_pydantic(
tool_name, tool_def["inputSchema"]
)
tool_schema = {
"description": tool_def.get("description", ""),
"args_schema": args_schema,
}
try:
native_tool = MCPNativeTool(
mcp_client=client,
tool_name=tool_name,
tool_schema=tool_schema,
server_name=server_name,
)
tools.append(native_tool)
except Exception as e:
self._logger.log("error", f"Failed to create native MCP tool: {e}")
continue
return cast(list[BaseTool], tools), client
except Exception as e:
if client.connected:
asyncio.run(client.disconnect())
raise RuntimeError(f"Failed to get native MCP tools: {e}") from e
def _get_amp_mcp_tools(self, amp_ref: str) -> list[BaseTool]:
"""Get tools from CrewAI AMP MCP marketplace."""
# Parse: "crewai-amp:mcp-name" or "crewai-amp:mcp-name#tool_name"
@@ -1129,6 +1348,15 @@ class Agent(BaseAgent):
def set_fingerprint(self, fingerprint: Fingerprint) -> None:
self.security_config.fingerprint = fingerprint
@property
def last_messages(self) -> list[LLMMessage]:
"""Get messages from the last task execution.
Returns:
List of LLM messages from the most recent task execution.
"""
return self._last_messages
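A short usage sketch, assuming LLMMessage is a role/content mapping (the exact shape is not shown in this diff):

    crew.kickoff()
    # Inspect exactly what the agent last sent to the LLM.
    for message in agent.last_messages:
        print(message["role"], str(message["content"])[:80])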
def _get_knowledge_search_query(self, task_prompt: str, task: Task) -> str | None:
"""Generate a search query for the knowledge base based on the task description."""
crewai_event_bus.emit(

View File

@@ -25,6 +25,7 @@ from crewai.agents.tools_handler import ToolsHandler
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.knowledge_config import KnowledgeConfig
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.mcp.config import MCPServerConfig
from crewai.rag.embeddings.types import EmbedderConfig
from crewai.security.security_config import SecurityConfig
from crewai.tools.base_tool import BaseTool, Tool
@@ -194,7 +195,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
default=None,
description="List of applications or application/action combinations that the agent can access through CrewAI Platform. Can contain app names (e.g., 'gmail') or specific actions (e.g., 'gmail/send_email')",
)
mcps: list[str] | None = Field(
mcps: list[str | MCPServerConfig] | None = Field(
default=None,
description="List of MCP server references. Supports 'https://server.com/path' for external servers and 'crewai-amp:mcp-name' for AMP marketplace. Use '#tool_name' suffix for specific tools.",
)
@@ -253,20 +254,36 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
@field_validator("mcps")
@classmethod
def validate_mcps(cls, mcps: list[str] | None) -> list[str] | None:
def validate_mcps(
cls, mcps: list[str | MCPServerConfig] | None
) -> list[str | MCPServerConfig] | None:
"""Validate MCP server references and configurations.
Supports both string references (for backwards compatibility) and
structured configuration objects (MCPServerStdio, MCPServerHTTP, MCPServerSSE).
"""
if not mcps:
return mcps
validated_mcps = []
for mcp in mcps:
if mcp.startswith(("https://", "crewai-amp:")):
if isinstance(mcp, str):
if mcp.startswith(("https://", "crewai-amp:")):
validated_mcps.append(mcp)
else:
raise ValueError(
f"Invalid MCP reference: {mcp}. "
"String references must start with 'https://' or 'crewai-amp:'"
)
elif isinstance(mcp, (MCPServerConfig)):
validated_mcps.append(mcp)
else:
raise ValueError(
f"Invalid MCP reference: {mcp}. Must start with 'https://' or 'crewai-amp:'"
f"Invalid MCP configuration: {type(mcp)}. "
"Must be a string reference or MCPServerConfig instance."
)
return list(set(validated_mcps))
return validated_mcps
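Under the updated validator, string references and the structured configs sketched earlier can share one list, while malformed strings fail fast (values illustrative):

    mixed = [
        "crewai-amp:web-search",                            # legacy string reference
        MCPServerHTTP(url="https://mcp.example.com/mcp"),   # structured config
    ]
    # A string like "ftp://bad.example.com" would raise:
    #   ValueError: Invalid MCP reference: ... must start with 'https://' or 'crewai-amp:'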
@model_validator(mode="after")
def validate_and_set_attributes(self) -> Self:
@@ -343,7 +360,7 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
"""Get platform tools for the specified list of applications and/or application/action combinations."""
@abstractmethod
def get_mcp_tools(self, mcps: list[str]) -> list[BaseTool]:
def get_mcp_tools(self, mcps: list[str | MCPServerConfig]) -> list[BaseTool]:
"""Get MCP tools for the specified list of MCP server references."""
def copy(self) -> Self: # type: ignore # Signature of "copy" incompatible with supertype "BaseModel"

View File

@@ -38,6 +38,10 @@ from crewai.utilities.agent_utils import (
)
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.llm_call_hooks import (
get_after_llm_call_hooks,
get_before_llm_call_hooks,
)
from crewai.utilities.printer import Printer
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -130,6 +134,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.messages: list[LLMMessage] = []
self.iterations = 0
self.log_error_after = 3
self.before_llm_call_hooks: list[Callable] = []
self.after_llm_call_hooks: list[Callable] = []
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
if self.llm:
# This may be mutating the shared llm object and needs further evaluation
existing_stop = getattr(self.llm, "stop", [])
@@ -214,6 +222,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
llm=self.llm,
callbacks=self.callbacks,
)
break
enforce_rpm_limit(self.request_within_rpm_limit)
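Since the hook lists initialized above are plain lists of callables, a before-hook can be appended directly to an executor instance. The call signature below is an assumption (the diff passes executor_context=self into get_llm_response but does not show how hooks are invoked):

    def trim_history(context):  # hypothetical signature; assumes context wraps the executor
        # Keep only the most recent 20 messages before each LLM call.
        if len(context.executor.messages) > 20:
            del context.executor.messages[:-20]

    executor.before_llm_call_hooks.append(trim_history)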
@@ -225,8 +234,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
)
formatted_answer = process_llm_response(answer, self.use_stop_words)
formatted_answer = process_llm_response(answer, self.use_stop_words) # type: ignore[assignment]
if isinstance(formatted_answer, AgentAction):
# Extract agent fingerprint if available
@@ -258,11 +268,11 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer, tool_result
)
self._invoke_step_callback(formatted_answer)
self._append_message(formatted_answer.text)
self._invoke_step_callback(formatted_answer) # type: ignore[arg-type]
self._append_message(formatted_answer.text) # type: ignore[union-attr,attr-defined]
except OutputParserError as e: # noqa: PERF203
formatted_answer = handle_output_parser_exception(
except OutputParserError as e:
formatted_answer = handle_output_parser_exception( # type: ignore[assignment]
e=e,
messages=self.messages,
iterations=self.iterations,

View File

@@ -1,5 +1,5 @@
import time
from typing import Any
from typing import TYPE_CHECKING, Any, TypeVar, cast
import webbrowser
from pydantic import BaseModel, Field
@@ -13,6 +13,8 @@ from crewai.cli.shared.token_manager import TokenManager
console = Console()
TOauth2Settings = TypeVar("TOauth2Settings", bound="Oauth2Settings")
class Oauth2Settings(BaseModel):
provider: str = Field(
@@ -28,9 +30,15 @@ class Oauth2Settings(BaseModel):
description="OAuth2 audience value, typically used to identify the target API or resource.",
default=None,
)
extra: dict[str, Any] = Field(
description="Extra configuration for the OAuth2 provider.",
default={},
)
@classmethod
def from_settings(cls):
def from_settings(cls: type[TOauth2Settings]) -> TOauth2Settings:
"""Create an Oauth2Settings instance from the CLI settings."""
settings = Settings()
return cls(
@@ -38,12 +46,20 @@ class Oauth2Settings(BaseModel):
domain=settings.oauth2_domain,
client_id=settings.oauth2_client_id,
audience=settings.oauth2_audience,
extra=settings.oauth2_extra,
)
if TYPE_CHECKING:
from crewai.cli.authentication.providers.base_provider import BaseProvider
class ProviderFactory:
@classmethod
def from_settings(cls, settings: Oauth2Settings | None = None):
def from_settings(
cls: type["ProviderFactory"], # noqa: UP037
settings: Oauth2Settings | None = None,
) -> "BaseProvider": # noqa: UP037
settings = settings or Oauth2Settings.from_settings()
import importlib
@@ -53,11 +69,11 @@ class ProviderFactory:
)
provider = getattr(module, f"{settings.provider.capitalize()}Provider")
return provider(settings)
return cast("BaseProvider", provider(settings))
class AuthenticationCommand:
def __init__(self):
def __init__(self) -> None:
self.token_manager = TokenManager()
self.oauth2_provider = ProviderFactory.from_settings()
@@ -84,7 +100,7 @@ class AuthenticationCommand:
timeout=20,
)
response.raise_for_status()
return response.json()
return cast(dict[str, Any], response.json())
def _display_auth_instructions(self, device_code_data: dict[str, str]) -> None:
"""Display the authentication instructions to the user."""

View File

@@ -24,3 +24,7 @@ class BaseProvider(ABC):
@abstractmethod
def get_client_id(self) -> str: ...
def get_required_fields(self) -> list[str]:
"""Returns which provider-specific fields inside the "extra" dict will be required"""
return []

View File

@@ -3,16 +3,16 @@ from crewai.cli.authentication.providers.base_provider import BaseProvider
class OktaProvider(BaseProvider):
def get_authorize_url(self) -> str:
return f"https://{self.settings.domain}/oauth2/default/v1/device/authorize"
return f"{self._oauth2_base_url()}/v1/device/authorize"
def get_token_url(self) -> str:
return f"https://{self.settings.domain}/oauth2/default/v1/token"
return f"{self._oauth2_base_url()}/v1/token"
def get_jwks_url(self) -> str:
return f"https://{self.settings.domain}/oauth2/default/v1/keys"
return f"{self._oauth2_base_url()}/v1/keys"
def get_issuer(self) -> str:
return f"https://{self.settings.domain}/oauth2/default"
return self._oauth2_base_url().removesuffix("/oauth2")
def get_audience(self) -> str:
if self.settings.audience is None:
@@ -27,3 +27,16 @@ class OktaProvider(BaseProvider):
"Client ID is required. Please set it in the configuration."
)
return self.settings.client_id
def get_required_fields(self) -> list[str]:
return ["authorization_server_name", "using_org_auth_server"]
def _oauth2_base_url(self) -> str:
using_org_auth_server = self.settings.extra.get("using_org_auth_server", False)
if using_org_auth_server:
base_url = f"https://{self.settings.domain}/oauth2"
else:
base_url = f"https://{self.settings.domain}/oauth2/{self.settings.extra.get('authorization_server_name', 'default')}"
return f"{base_url}"

View File

@@ -11,18 +11,18 @@ console = Console()
class BaseCommand:
def __init__(self):
def __init__(self) -> None:
self._telemetry = Telemetry()
self._telemetry.set_tracer()
class PlusAPIMixin:
def __init__(self, telemetry):
def __init__(self, telemetry: Telemetry) -> None:
try:
telemetry.set_tracer()
self.plus_api_client = PlusAPI(api_key=get_auth_token())
except Exception:
self._deploy_signup_error_span = telemetry.deploy_signup_error_span()
telemetry.deploy_signup_error_span()
console.print(
"Please sign up/login to CrewAI+ before using the CLI.",
style="bold red",

View File

@@ -2,6 +2,7 @@ import json
from logging import getLogger
from pathlib import Path
import tempfile
from typing import Any
from pydantic import BaseModel, Field
@@ -136,7 +137,12 @@ class Settings(BaseModel):
default=DEFAULT_CLI_SETTINGS["oauth2_domain"],
)
def __init__(self, config_path: Path | None = None, **data):
oauth2_extra: dict[str, Any] = Field(
description="Extra configuration for the OAuth2 provider.",
default={},
)
def __init__(self, config_path: Path | None = None, **data: Any) -> None:
"""Load Settings from config path with fallback support"""
if config_path is None:
config_path = get_writable_config_path()

View File

@@ -1,9 +1,10 @@
from typing import Any
from typing import Any, cast
import requests
from requests.exceptions import JSONDecodeError, RequestException
from rich.console import Console
from crewai.cli.authentication.main import Oauth2Settings, ProviderFactory
from crewai.cli.command import BaseCommand
from crewai.cli.settings.main import SettingsCommand
from crewai.cli.version import get_crewai_version
@@ -13,7 +14,7 @@ console = Console()
class EnterpriseConfigureCommand(BaseCommand):
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.settings_command = SettingsCommand()
@@ -54,25 +55,12 @@ class EnterpriseConfigureCommand(BaseCommand):
except JSONDecodeError as e:
raise ValueError(f"Invalid JSON response from {oauth_endpoint}") from e
required_fields = [
"audience",
"domain",
"device_authorization_client_id",
"provider",
]
missing_fields = [
field for field in required_fields if field not in oauth_config
]
if missing_fields:
raise ValueError(
f"Missing required fields in OAuth2 configuration: {', '.join(missing_fields)}"
)
self._validate_oauth_config(oauth_config)
console.print(
"✅ Successfully retrieved OAuth2 configuration", style="green"
)
return oauth_config
return cast(dict[str, Any], oauth_config)
except RequestException as e:
raise ValueError(f"Failed to connect to enterprise URL: {e!s}") from e
@@ -89,6 +77,7 @@ class EnterpriseConfigureCommand(BaseCommand):
"oauth2_audience": oauth_config["audience"],
"oauth2_client_id": oauth_config["device_authorization_client_id"],
"oauth2_domain": oauth_config["domain"],
"oauth2_extra": oauth_config["extra"],
}
console.print("🔄 Updating local OAuth2 configuration...")
@@ -99,3 +88,38 @@ class EnterpriseConfigureCommand(BaseCommand):
except Exception as e:
raise ValueError(f"Failed to update OAuth2 settings: {e!s}") from e
def _validate_oauth_config(self, oauth_config: dict[str, Any]) -> None:
required_fields = [
"audience",
"domain",
"device_authorization_client_id",
"provider",
"extra",
]
missing_basic_fields = [
field for field in required_fields if field not in oauth_config
]
missing_provider_specific_fields = [
field
for field in self._get_provider_specific_fields(oauth_config["provider"])
if field not in oauth_config.get("extra", {})
]
if missing_basic_fields:
raise ValueError(
f"Missing required fields in OAuth2 configuration: [{', '.join(missing_basic_fields)}]"
)
if missing_provider_specific_fields:
raise ValueError(
f"Missing authentication provider required fields in OAuth2 configuration: [{', '.join(missing_provider_specific_fields)}] (Configured provider: '{oauth_config['provider']}')"
)
def _get_provider_specific_fields(self, provider_name: str) -> list[str]:
provider = ProviderFactory.from_settings(
Oauth2Settings(provider=provider_name, client_id="dummy", domain="dummy")
)
return provider.get_required_fields()
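A payload that satisfies this validation for the Okta provider would look roughly like this (values illustrative):

    oauth_config = {
        "provider": "okta",
        "audience": "api://crewai",
        "domain": "acme.okta.com",
        "device_authorization_client_id": "abc123",
        "extra": {
            "authorization_server_name": "default",
            "using_org_auth_server": False,
        },
    }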

View File

@@ -3,7 +3,7 @@ import subprocess
class Repository:
def __init__(self, path="."):
def __init__(self, path: str = ".") -> None:
self.path = path
if not self.is_git_installed():

View File

@@ -1,3 +1,4 @@
from typing import Any
from urllib.parse import urljoin
import requests
@@ -36,19 +37,21 @@ class PlusAPI:
str(settings.enterprise_base_url) or DEFAULT_CREWAI_ENTERPRISE_URL
)
def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
def _make_request(
self, method: str, endpoint: str, **kwargs: Any
) -> requests.Response:
url = urljoin(self.base_url, endpoint)
session = requests.Session()
session.trust_env = False
return session.request(method, url, headers=self.headers, **kwargs)
def login_to_tool_repository(self):
def login_to_tool_repository(self) -> requests.Response:
return self._make_request("POST", f"{self.TOOLS_RESOURCE}/login")
def get_tool(self, handle: str):
def get_tool(self, handle: str) -> requests.Response:
return self._make_request("GET", f"{self.TOOLS_RESOURCE}/{handle}")
def get_agent(self, handle: str):
def get_agent(self, handle: str) -> requests.Response:
return self._make_request("GET", f"{self.AGENTS_RESOURCE}/{handle}")
def publish_tool(
@@ -58,8 +61,8 @@ class PlusAPI:
version: str,
description: str | None,
encoded_file: str,
available_exports: list[str] | None = None,
):
available_exports: list[dict[str, Any]] | None = None,
) -> requests.Response:
params = {
"handle": handle,
"public": is_public,
@@ -111,13 +114,13 @@ class PlusAPI:
def list_crews(self) -> requests.Response:
return self._make_request("GET", self.CREWS_RESOURCE)
def create_crew(self, payload) -> requests.Response:
def create_crew(self, payload: dict[str, Any]) -> requests.Response:
return self._make_request("POST", self.CREWS_RESOURCE, json=payload)
def get_organizations(self) -> requests.Response:
return self._make_request("GET", self.ORGANIZATIONS_RESOURCE)
def initialize_trace_batch(self, payload) -> requests.Response:
def initialize_trace_batch(self, payload: dict[str, Any]) -> requests.Response:
return self._make_request(
"POST",
f"{self.TRACING_RESOURCE}/batches",
@@ -125,14 +128,18 @@ class PlusAPI:
timeout=30,
)
def initialize_ephemeral_trace_batch(self, payload) -> requests.Response:
def initialize_ephemeral_trace_batch(
self, payload: dict[str, Any]
) -> requests.Response:
return self._make_request(
"POST",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches",
json=payload,
)
def send_trace_events(self, trace_batch_id: str, payload) -> requests.Response:
def send_trace_events(
self, trace_batch_id: str, payload: dict[str, Any]
) -> requests.Response:
return self._make_request(
"POST",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/events",
@@ -141,7 +148,7 @@ class PlusAPI:
)
def send_ephemeral_trace_events(
self, trace_batch_id: str, payload
self, trace_batch_id: str, payload: dict[str, Any]
) -> requests.Response:
return self._make_request(
"POST",
@@ -150,7 +157,9 @@ class PlusAPI:
timeout=30,
)
def finalize_trace_batch(self, trace_batch_id: str, payload) -> requests.Response:
def finalize_trace_batch(
self, trace_batch_id: str, payload: dict[str, Any]
) -> requests.Response:
return self._make_request(
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
@@ -159,7 +168,7 @@ class PlusAPI:
)
def finalize_ephemeral_trace_batch(
self, trace_batch_id: str, payload
self, trace_batch_id: str, payload: dict[str, Any]
) -> requests.Response:
return self._make_request(
"PATCH",

View File

@@ -34,7 +34,7 @@ class SettingsCommand(BaseCommand):
current_value = getattr(self.settings, field_name)
description = field_info.description or "No description available"
display_value = (
str(current_value) if current_value is not None else "Not set"
str(current_value) if current_value not in [None, {}] else "Not set"
)
table.add_row(field_name, display_value, description)

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.3.0"
"crewai[tools]==1.4.1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.3.0"
"crewai[tools]==1.4.1"
]
[project.scripts]

View File

@@ -30,11 +30,11 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
A class to handle tool repository related operations for CrewAI projects.
"""
def __init__(self):
def __init__(self) -> None:
BaseCommand.__init__(self)
PlusAPIMixin.__init__(self, telemetry=self._telemetry)
def create(self, handle: str):
def create(self, handle: str) -> None:
self._ensure_not_in_project()
folder_name = handle.replace(" ", "_").replace("-", "_").lower()
@@ -64,7 +64,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
finally:
os.chdir(old_directory)
def publish(self, is_public: bool, force: bool = False):
def publish(self, is_public: bool, force: bool = False) -> None:
if not git.Repository().is_synced() and not force:
console.print(
"[bold red]Failed to publish tool.[/bold red]\n"
@@ -137,7 +137,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
style="bold green",
)
def install(self, handle: str):
def install(self, handle: str) -> None:
self._print_current_organization()
get_response = self.plus_api_client.get_tool(handle)
@@ -180,7 +180,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
settings.org_name = login_response_json["current_organization"]["name"]
settings.dump()
def _add_package(self, tool_details: dict[str, Any]):
def _add_package(self, tool_details: dict[str, Any]) -> None:
is_from_pypi = tool_details.get("source", None) == "pypi"
tool_handle = tool_details["handle"]
repository_handle = tool_details["repository"]["handle"]
@@ -209,7 +209,7 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
click.echo(add_package_result.stderr, err=True)
raise SystemExit
def _ensure_not_in_project(self):
def _ensure_not_in_project(self) -> None:
if os.path.isfile("./pyproject.toml"):
console.print(
"[bold red]Oops! It looks like you're inside a project.[/bold red]"

View File

@@ -5,7 +5,7 @@ import os
from pathlib import Path
import shutil
import sys
from typing import Any, get_type_hints
from typing import Any, cast, get_type_hints
import click
from rich.console import Console
@@ -23,7 +23,9 @@ if sys.version_info >= (3, 11):
console = Console()
def copy_template(src, dst, name, class_name, folder_name):
def copy_template(
src: Path, dst: Path, name: str, class_name: str, folder_name: str
) -> None:
"""Copy a file from src to dst."""
with open(src, "r") as file:
content = file.read()
@@ -40,13 +42,13 @@ def copy_template(src, dst, name, class_name, folder_name):
click.secho(f" - Created {dst}", fg="green")
def read_toml(file_path: str = "pyproject.toml"):
def read_toml(file_path: str = "pyproject.toml") -> dict[str, Any]:
"""Read the content of a TOML file and return it as a dictionary."""
with open(file_path, "rb") as f:
return tomli.load(f)
def parse_toml(content):
def parse_toml(content: str) -> dict[str, Any]:
if sys.version_info >= (3, 11):
return tomllib.loads(content)
return tomli.loads(content)
@@ -103,7 +105,7 @@ def _get_project_attribute(
)
except Exception as e:
# Handle TOML decode errors for Python 3.11+
if sys.version_info >= (3, 11) and isinstance(e, tomllib.TOMLDecodeError): # type: ignore
if sys.version_info >= (3, 11) and isinstance(e, tomllib.TOMLDecodeError):
console.print(
f"Error: {pyproject_path} is not a valid TOML file.", style="bold red"
)
@@ -126,7 +128,7 @@ def _get_nested_value(data: dict[str, Any], keys: list[str]) -> Any:
return reduce(dict.__getitem__, keys, data)
def fetch_and_json_env_file(env_file_path: str = ".env") -> dict:
def fetch_and_json_env_file(env_file_path: str = ".env") -> dict[str, Any]:
"""Fetch the environment variables from a .env file and return them as a dictionary."""
try:
# Read the .env file
@@ -150,7 +152,7 @@ def fetch_and_json_env_file(env_file_path: str = ".env") -> dict:
return {}
def tree_copy(source, destination):
def tree_copy(source: Path, destination: Path) -> None:
"""Copies the entire directory structure from the source to the destination."""
for item in os.listdir(source):
source_item = os.path.join(source, item)
@@ -161,7 +163,7 @@ def tree_copy(source, destination):
shutil.copy2(source_item, destination_item)
def tree_find_and_replace(directory, find, replace):
def tree_find_and_replace(directory: Path, find: str, replace: str) -> None:
"""Recursively searches through a directory, replacing a target string in
both file contents and filenames with a specified replacement string.
"""
@@ -187,7 +189,7 @@ def tree_find_and_replace(directory, find, replace):
os.rename(old_dirpath, new_dirpath)
def load_env_vars(folder_path):
def load_env_vars(folder_path: Path) -> dict[str, Any]:
"""
Loads environment variables from a .env file in the specified folder path.
@@ -208,7 +210,9 @@ def load_env_vars(folder_path):
return env_vars
def update_env_vars(env_vars, provider, model):
def update_env_vars(
env_vars: dict[str, Any], provider: str, model: str
) -> dict[str, Any] | None:
"""
Updates environment variables with the API key for the selected provider and model.
@@ -220,15 +224,20 @@ def update_env_vars(env_vars, provider, model):
Returns:
- None
"""
api_key_var = ENV_VARS.get(
provider,
[
click.prompt(
f"Enter the environment variable name for your {provider.capitalize()} API key",
type=str,
)
],
)[0]
provider_config = cast(
list[str],
ENV_VARS.get(
provider,
[
click.prompt(
f"Enter the environment variable name for your {provider.capitalize()} API key",
type=str,
)
],
),
)
api_key_var = provider_config[0]
if api_key_var not in env_vars:
try:
@@ -246,7 +255,7 @@ def update_env_vars(env_vars, provider, model):
return env_vars
def write_env_file(folder_path, env_vars):
def write_env_file(folder_path: Path, env_vars: dict[str, Any]) -> None:
"""
Writes environment variables to a .env file in the specified folder.
@@ -342,18 +351,18 @@ def get_crews(crew_path: str = "crew.py", require: bool = False) -> list[Crew]:
return crew_instances
def get_crew_instance(module_attr) -> Crew | None:
def get_crew_instance(module_attr: Any) -> Crew | None:
if (
callable(module_attr)
and hasattr(module_attr, "is_crew_class")
and module_attr.is_crew_class
):
return module_attr().crew()
return cast(Crew, module_attr().crew())
try:
if (ismethod(module_attr) or isfunction(module_attr)) and get_type_hints(
module_attr
).get("return") is Crew:
return module_attr()
return cast(Crew, module_attr())
except Exception:
return None
@@ -362,7 +371,7 @@ def get_crew_instance(module_attr) -> Crew | None:
return None
def fetch_crews(module_attr) -> list[Crew]:
def fetch_crews(module_attr: Any) -> list[Crew]:
crew_instances: list[Crew] = []
if crew_instance := get_crew_instance(module_attr):
@@ -377,7 +386,7 @@ def fetch_crews(module_attr) -> list[Crew]:
return crew_instances
def is_valid_tool(obj):
def is_valid_tool(obj: Any) -> bool:
from crewai.tools.base_tool import Tool
if isclass(obj):
@@ -389,7 +398,7 @@ def is_valid_tool(obj):
return isinstance(obj, Tool)
def extract_available_exports(dir_path: str = "src"):
def extract_available_exports(dir_path: str = "src") -> list[dict[str, Any]]:
"""
Extract available tool classes from the project's __init__.py files.
Only includes classes that inherit from BaseTool or functions decorated with @tool.
@@ -419,7 +428,9 @@ def extract_available_exports(dir_path: str = "src"):
raise SystemExit(1) from e
def build_env_with_tool_repository_credentials(repository_handle: str):
def build_env_with_tool_repository_credentials(
repository_handle: str,
) -> dict[str, Any]:
repository_handle = repository_handle.upper().replace("-", "_")
settings = Settings()
@@ -472,7 +483,7 @@ def _load_tools_from_init(init_file: Path) -> list[dict[str, Any]]:
sys.modules.pop("temp_module", None)
def _print_no_tools_warning():
def _print_no_tools_warning() -> None:
"""
Display warning and usage instructions if no tools were found.
"""

View File

@@ -809,6 +809,7 @@ class Crew(FlowTrackable, BaseModel):
"json_dict": output.json_dict,
"output_format": output.output_format,
"agent": output.agent,
"messages": output.messages,
},
"task_index": task_index,
"inputs": inputs,
@@ -1236,6 +1237,7 @@ class Crew(FlowTrackable, BaseModel):
pydantic=stored_output["pydantic"],
json_dict=stored_output["json_dict"],
output_format=stored_output["output_format"],
messages=stored_output.get("messages", []),
)
self.tasks[i].output = task_output
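With messages persisted alongside each task output, a replayed or completed run exposes the conversation that produced it; a usage sketch:

    result = crew.kickoff()
    for task in crew.tasks:
        if task.output and task.output.messages:
            print(task.output.agent, len(task.output.messages), "messages")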

View File

@@ -16,7 +16,6 @@ from crewai.events.base_event_listener import BaseEventListener
from crewai.events.depends import Depends
from crewai.events.event_bus import crewai_event_bus
from crewai.events.handler_graph import CircularDependencyError
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
@@ -61,6 +60,14 @@ from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
AgentLogsStartedEvent,
)
from crewai.events.types.mcp_events import (
MCPConnectionCompletedEvent,
MCPConnectionFailedEvent,
MCPConnectionStartedEvent,
MCPToolExecutionCompletedEvent,
MCPToolExecutionFailedEvent,
MCPToolExecutionStartedEvent,
)
from crewai.events.types.memory_events import (
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
@@ -153,6 +160,12 @@ __all__ = [
"LiteAgentExecutionCompletedEvent",
"LiteAgentExecutionErrorEvent",
"LiteAgentExecutionStartedEvent",
"MCPConnectionCompletedEvent",
"MCPConnectionFailedEvent",
"MCPConnectionStartedEvent",
"MCPToolExecutionCompletedEvent",
"MCPToolExecutionFailedEvent",
"MCPToolExecutionStartedEvent",
"MemoryQueryCompletedEvent",
"MemoryQueryFailedEvent",
"MemoryQueryStartedEvent",

View File

@@ -65,6 +65,14 @@ from crewai.events.types.logging_events import (
AgentLogsExecutionEvent,
AgentLogsStartedEvent,
)
from crewai.events.types.mcp_events import (
MCPConnectionCompletedEvent,
MCPConnectionFailedEvent,
MCPConnectionStartedEvent,
MCPToolExecutionCompletedEvent,
MCPToolExecutionFailedEvent,
MCPToolExecutionStartedEvent,
)
from crewai.events.types.reasoning_events import (
AgentReasoningCompletedEvent,
AgentReasoningFailedEvent,
@@ -615,5 +623,67 @@ class EventListener(BaseEventListener):
event.total_turns,
)
# ----------- MCP EVENTS -----------
@crewai_event_bus.on(MCPConnectionStartedEvent)
def on_mcp_connection_started(source, event: MCPConnectionStartedEvent):
self.formatter.handle_mcp_connection_started(
event.server_name,
event.server_url,
event.transport_type,
event.is_reconnect,
event.connect_timeout,
)
@crewai_event_bus.on(MCPConnectionCompletedEvent)
def on_mcp_connection_completed(source, event: MCPConnectionCompletedEvent):
self.formatter.handle_mcp_connection_completed(
event.server_name,
event.server_url,
event.transport_type,
event.connection_duration_ms,
event.is_reconnect,
)
@crewai_event_bus.on(MCPConnectionFailedEvent)
def on_mcp_connection_failed(source, event: MCPConnectionFailedEvent):
self.formatter.handle_mcp_connection_failed(
event.server_name,
event.server_url,
event.transport_type,
event.error,
event.error_type,
)
@crewai_event_bus.on(MCPToolExecutionStartedEvent)
def on_mcp_tool_execution_started(source, event: MCPToolExecutionStartedEvent):
self.formatter.handle_mcp_tool_execution_started(
event.server_name,
event.tool_name,
event.tool_args,
)
@crewai_event_bus.on(MCPToolExecutionCompletedEvent)
def on_mcp_tool_execution_completed(
source, event: MCPToolExecutionCompletedEvent
):
self.formatter.handle_mcp_tool_execution_completed(
event.server_name,
event.tool_name,
event.tool_args,
event.result,
event.execution_duration_ms,
)
@crewai_event_bus.on(MCPToolExecutionFailedEvent)
def on_mcp_tool_execution_failed(source, event: MCPToolExecutionFailedEvent):
self.formatter.handle_mcp_tool_execution_failed(
event.server_name,
event.tool_name,
event.tool_args,
event.error,
event.error_type,
)
event_listener = EventListener()
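User code can subscribe to the new MCP events with the same crewai_event_bus.on pattern used above:

    from crewai.events.event_bus import crewai_event_bus
    from crewai.events.types.mcp_events import MCPToolExecutionCompletedEvent

    @crewai_event_bus.on(MCPToolExecutionCompletedEvent)
    def log_mcp_tool(source, event: MCPToolExecutionCompletedEvent):
        # Lightweight latency log for every MCP tool call.
        print(f"{event.server_name}.{event.tool_name}: {event.execution_duration_ms}ms")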

View File

@@ -40,6 +40,14 @@ from crewai.events.types.llm_guardrail_events import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from crewai.events.types.mcp_events import (
MCPConnectionCompletedEvent,
MCPConnectionFailedEvent,
MCPConnectionStartedEvent,
MCPToolExecutionCompletedEvent,
MCPToolExecutionFailedEvent,
MCPToolExecutionStartedEvent,
)
from crewai.events.types.memory_events import (
MemoryQueryCompletedEvent,
MemoryQueryFailedEvent,
@@ -115,4 +123,10 @@ EventTypes = (
| MemoryQueryFailedEvent
| MemoryRetrievalStartedEvent
| MemoryRetrievalCompletedEvent
| MCPConnectionStartedEvent
| MCPConnectionCompletedEvent
| MCPConnectionFailedEvent
| MCPToolExecutionStartedEvent
| MCPToolExecutionCompletedEvent
| MCPToolExecutionFailedEvent
)

View File

@@ -0,0 +1,85 @@
from datetime import datetime
from typing import Any
from crewai.events.base_events import BaseEvent
class MCPEvent(BaseEvent):
"""Base event for MCP operations."""
server_name: str
server_url: str | None = None
transport_type: str | None = None # "stdio", "http", "sse"
agent_id: str | None = None
agent_role: str | None = None
from_agent: Any | None = None
from_task: Any | None = None
def __init__(self, **data: Any) -> None:
super().__init__(**data)
self._set_agent_params(data)
self._set_task_params(data)
class MCPConnectionStartedEvent(MCPEvent):
"""Event emitted when starting to connect to an MCP server."""
type: str = "mcp_connection_started"
connect_timeout: int | None = None
is_reconnect: bool = (
False # True if this is a reconnection, False for first connection
)
class MCPConnectionCompletedEvent(MCPEvent):
"""Event emitted when successfully connected to an MCP server."""
type: str = "mcp_connection_completed"
started_at: datetime | None = None
completed_at: datetime | None = None
connection_duration_ms: float | None = None
is_reconnect: bool = (
False # True if this was a reconnection, False for first connection
)
class MCPConnectionFailedEvent(MCPEvent):
"""Event emitted when connection to an MCP server fails."""
type: str = "mcp_connection_failed"
error: str
error_type: str | None = None # "timeout", "authentication", "network", etc.
started_at: datetime | None = None
failed_at: datetime | None = None
class MCPToolExecutionStartedEvent(MCPEvent):
"""Event emitted when starting to execute an MCP tool."""
type: str = "mcp_tool_execution_started"
tool_name: str
tool_args: dict[str, Any] | None = None
class MCPToolExecutionCompletedEvent(MCPEvent):
"""Event emitted when MCP tool execution completes."""
type: str = "mcp_tool_execution_completed"
tool_name: str
tool_args: dict[str, Any] | None = None
result: Any | None = None
started_at: datetime | None = None
completed_at: datetime | None = None
execution_duration_ms: float | None = None
class MCPToolExecutionFailedEvent(MCPEvent):
"""Event emitted when MCP tool execution fails."""
type: str = "mcp_tool_execution_failed"
tool_name: str
tool_args: dict[str, Any] | None = None
error: str
error_type: str | None = None # "timeout", "validation", "server_error", etc.
started_at: datetime | None = None
failed_at: datetime | None = None

View File

@@ -2248,3 +2248,203 @@ class ConsoleFormatter:
self.current_a2a_conversation_branch = None
self.current_a2a_turn_count = 0
# ----------- MCP EVENTS -----------
def handle_mcp_connection_started(
self,
server_name: str,
server_url: str | None = None,
transport_type: str | None = None,
is_reconnect: bool = False,
connect_timeout: int | None = None,
) -> None:
"""Handle MCP connection started event."""
if not self.verbose:
return
content = Text()
reconnect_text = " (Reconnecting)" if is_reconnect else ""
content.append(f"MCP Connection Started{reconnect_text}\n\n", style="cyan bold")
content.append("Server: ", style="white")
content.append(f"{server_name}\n", style="cyan")
if server_url:
content.append("URL: ", style="white")
content.append(f"{server_url}\n", style="cyan dim")
if transport_type:
content.append("Transport: ", style="white")
content.append(f"{transport_type}\n", style="cyan")
if connect_timeout:
content.append("Timeout: ", style="white")
content.append(f"{connect_timeout}s\n", style="cyan")
panel = self.create_panel(content, "🔌 MCP Connection", "cyan")
self.print(panel)
self.print()
def handle_mcp_connection_completed(
self,
server_name: str,
server_url: str | None = None,
transport_type: str | None = None,
connection_duration_ms: float | None = None,
is_reconnect: bool = False,
) -> None:
"""Handle MCP connection completed event."""
if not self.verbose:
return
content = Text()
reconnect_text = " (Reconnected)" if is_reconnect else ""
content.append(
f"MCP Connection Completed{reconnect_text}\n\n", style="green bold"
)
content.append("Server: ", style="white")
content.append(f"{server_name}\n", style="green")
if server_url:
content.append("URL: ", style="white")
content.append(f"{server_url}\n", style="green dim")
if transport_type:
content.append("Transport: ", style="white")
content.append(f"{transport_type}\n", style="green")
if connection_duration_ms is not None:
content.append("Duration: ", style="white")
content.append(f"{connection_duration_ms:.2f}ms\n", style="green")
panel = self.create_panel(content, "✅ MCP Connected", "green")
self.print(panel)
self.print()
def handle_mcp_connection_failed(
self,
server_name: str,
server_url: str | None = None,
transport_type: str | None = None,
error: str = "",
error_type: str | None = None,
) -> None:
"""Handle MCP connection failed event."""
if not self.verbose:
return
content = Text()
content.append("MCP Connection Failed\n\n", style="red bold")
content.append("Server: ", style="white")
content.append(f"{server_name}\n", style="red")
if server_url:
content.append("URL: ", style="white")
content.append(f"{server_url}\n", style="red dim")
if transport_type:
content.append("Transport: ", style="white")
content.append(f"{transport_type}\n", style="red")
if error_type:
content.append("Error Type: ", style="white")
content.append(f"{error_type}\n", style="red")
if error:
content.append("\nError: ", style="white bold")
error_preview = error[:500] + "..." if len(error) > 500 else error
content.append(f"{error_preview}\n", style="red")
panel = self.create_panel(content, "❌ MCP Connection Failed", "red")
self.print(panel)
self.print()
def handle_mcp_tool_execution_started(
self,
server_name: str,
tool_name: str,
tool_args: dict[str, Any] | None = None,
) -> None:
"""Handle MCP tool execution started event."""
if not self.verbose:
return
content = self.create_status_content(
"MCP Tool Execution Started",
tool_name,
"yellow",
tool_args=tool_args or {},
Server=server_name,
)
panel = self.create_panel(content, "🔧 MCP Tool", "yellow")
self.print(panel)
self.print()
def handle_mcp_tool_execution_completed(
self,
server_name: str,
tool_name: str,
tool_args: dict[str, Any] | None = None,
result: Any | None = None,
execution_duration_ms: float | None = None,
) -> None:
"""Handle MCP tool execution completed event."""
if not self.verbose:
return
content = self.create_status_content(
"MCP Tool Execution Completed",
tool_name,
"green",
tool_args=tool_args or {},
Server=server_name,
)
if execution_duration_ms is not None:
content.append("Duration: ", style="white")
content.append(f"{execution_duration_ms:.2f}ms\n", style="green")
if result is not None:
result_str = str(result)
if len(result_str) > 500:
result_str = result_str[:497] + "..."
content.append("\nResult: ", style="white bold")
content.append(f"{result_str}\n", style="green")
panel = self.create_panel(content, "✅ MCP Tool Completed", "green")
self.print(panel)
self.print()
def handle_mcp_tool_execution_failed(
self,
server_name: str,
tool_name: str,
tool_args: dict[str, Any] | None = None,
error: str = "",
error_type: str | None = None,
) -> None:
"""Handle MCP tool execution failed event."""
if not self.verbose:
return
content = self.create_status_content(
"MCP Tool Execution Failed",
tool_name,
"red",
tool_args=tool_args or {},
Server=server_name,
)
if error_type:
content.append("Error Type: ", style="white")
content.append(f"{error_type}\n", style="red")
if error:
content.append("\nError: ", style="white bold")
error_preview = error[:500] + "..." if len(error) > 500 else error
content.append(f"{error_preview}\n", style="red")
panel = self.create_panel(content, "❌ MCP Tool Failed", "red")
self.print(panel)
self.print()

View File

@@ -428,6 +428,8 @@ class FlowMeta(type):
possible_returns = get_possible_return_constants(attr_value)
if possible_returns:
router_paths[attr_name] = possible_returns
else:
router_paths[attr_name] = []
cls._start_methods = start_methods # type: ignore[attr-defined]
cls._listeners = listeners # type: ignore[attr-defined]

View File

@@ -21,6 +21,7 @@ P = ParamSpec("P")
R = TypeVar("R", covariant=True)
FlowMethodName = NewType("FlowMethodName", str)
FlowRouteName = NewType("FlowRouteName", str)
PendingListenerKey = NewType(
"PendingListenerKey",
Annotated[str, "nested flow conditions use 'listener_name:object_id'"],

View File

@@ -19,11 +19,11 @@ import ast
from collections import defaultdict, deque
import inspect
import textwrap
from typing import Any, TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from typing_extensions import TypeIs
from crewai.flow.constants import OR_CONDITION, AND_CONDITION
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
@@ -33,6 +33,7 @@ from crewai.flow.flow_wrappers import (
from crewai.flow.types import FlowMethodCallable, FlowMethodName
from crewai.utilities.printer import Printer
if TYPE_CHECKING:
from crewai.flow.flow import Flow
@@ -40,6 +41,22 @@ _printer = Printer()
def get_possible_return_constants(function: Any) -> list[str] | None:
"""Extract possible string return values from a function using AST parsing.
This function analyzes the source code of a router method to identify
all possible string values it might return. It handles:
- Direct string literals: return "value"
- Variable assignments: x = "value"; return x
- Dictionary lookups: d = {"k": "v"}; return d[key]
- Conditional returns: return "a" if cond else "b"
- State attributes: return self.state.attr (infers from class context)
Args:
function: The function to analyze.
Returns:
List of possible string return values, or None if analysis fails.
"""
try:
source = inspect.getsource(function)
except OSError:
@@ -82,6 +99,7 @@ def get_possible_return_constants(function: Any) -> list[str] | None:
return_values: set[str] = set()
dict_definitions: dict[str, list[str]] = {}
variable_values: dict[str, list[str]] = {}
state_attribute_values: dict[str, list[str]] = {}
def extract_string_constants(node: ast.expr) -> list[str]:
"""Recursively extract all string constants from an AST node."""
@@ -91,6 +109,17 @@ def get_possible_return_constants(function: Any) -> list[str] | None:
elif isinstance(node, ast.IfExp):
strings.extend(extract_string_constants(node.body))
strings.extend(extract_string_constants(node.orelse))
elif isinstance(node, ast.Call):
if (
isinstance(node.func, ast.Attribute)
and node.func.attr == "get"
and len(node.args) >= 2
):
default_arg = node.args[1]
if isinstance(default_arg, ast.Constant) and isinstance(
default_arg.value, str
):
strings.append(default_arg.value)
return strings
class VariableAssignmentVisitor(ast.NodeVisitor):
@@ -124,6 +153,22 @@ def get_possible_return_constants(function: Any) -> list[str] | None:
self.generic_visit(node)
def get_attribute_chain(node: ast.expr) -> str | None:
"""Extract the full attribute chain from an AST node.
Examples:
self.state.run_type -> "self.state.run_type"
x.y.z -> "x.y.z"
simple_var -> "simple_var"
"""
if isinstance(node, ast.Name):
return node.id
if isinstance(node, ast.Attribute):
base = get_attribute_chain(node.value)
if base:
return f"{base}.{node.attr}"
return None
class ReturnVisitor(ast.NodeVisitor):
def visit_Return(self, node: ast.Return) -> None:
if (
@@ -139,21 +184,94 @@ def get_possible_return_constants(function: Any) -> list[str] | None:
for v in dict_definitions[var_name_dict]:
return_values.add(v)
elif node.value:
var_name_ret: str | None = None
if isinstance(node.value, ast.Name):
var_name_ret = node.value.id
elif isinstance(node.value, ast.Attribute):
var_name_ret = f"{node.value.value.id if isinstance(node.value.value, ast.Name) else '_'}.{node.value.attr}"
var_name_ret = get_attribute_chain(node.value)
if var_name_ret and var_name_ret in variable_values:
for v in variable_values[var_name_ret]:
return_values.add(v)
elif var_name_ret and var_name_ret in state_attribute_values:
for v in state_attribute_values[var_name_ret]:
return_values.add(v)
self.generic_visit(node)
def visit_If(self, node: ast.If) -> None:
self.generic_visit(node)
# Try to get the class context to infer state attribute values
try:
if hasattr(function, "__self__"):
# Method is bound, get the class
class_obj = function.__self__.__class__
elif hasattr(function, "__qualname__") and "." in function.__qualname__:
# Method is unbound but we can try to get class from module
class_name = function.__qualname__.rsplit(".", 1)[0]
if hasattr(function, "__globals__"):
class_obj = function.__globals__.get(class_name)
else:
class_obj = None
else:
class_obj = None
if class_obj is not None:
try:
class_source = inspect.getsource(class_obj)
class_source = textwrap.dedent(class_source)
class_ast = ast.parse(class_source)
# Look for comparisons and assignments involving state attributes
class StateAttributeVisitor(ast.NodeVisitor):
def visit_Compare(self, node: ast.Compare) -> None:
"""Find comparisons like: self.state.attr == "value" """
left_attr = get_attribute_chain(node.left)
if left_attr:
for comparator in node.comparators:
if isinstance(comparator, ast.Constant) and isinstance(
comparator.value, str
):
if left_attr not in state_attribute_values:
state_attribute_values[left_attr] = []
if (
comparator.value
not in state_attribute_values[left_attr]
):
state_attribute_values[left_attr].append(
comparator.value
)
# Also check right side
for comparator in node.comparators:
right_attr = get_attribute_chain(comparator)
if (
right_attr
and isinstance(node.left, ast.Constant)
and isinstance(node.left.value, str)
):
if right_attr not in state_attribute_values:
state_attribute_values[right_attr] = []
if (
node.left.value
not in state_attribute_values[right_attr]
):
state_attribute_values[right_attr].append(
node.left.value
)
self.generic_visit(node)
StateAttributeVisitor().visit(class_ast)
except Exception as e:
_printer.print(
f"Could not analyze class context for {function.__name__}: {e}",
color="yellow",
)
except Exception as e:
_printer.print(
f"Could not introspect class for {function.__name__}: {e}",
color="yellow",
)
VariableAssignmentVisitor().visit(code_ast)
ReturnVisitor().visit(code_ast)
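To make the state-attribute inference concrete: in a flow like the sketch below, the analyzer collects the string literals compared against self.state.run_type anywhere in the class, so the router's possible paths resolve to ["deploy", "test"] even though it returns an attribute rather than a literal (class and field names are illustrative):

    from crewai.flow.flow import Flow, router, start

    class ReleaseFlow(Flow):
        @start()
        def begin(self):
            ...

        @router(begin)
        def choose(self):
            if self.state.run_type == "deploy":    # literal recorded for self.state.run_type
                ...
            elif self.state.run_type == "test":
                ...
            return self.state.run_type             # resolves to {"deploy", "test"}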

File diff suppressed because it is too large.

View File

@@ -6,6 +6,7 @@
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap" rel="stylesheet">
<link rel="stylesheet" href="'{{ css_path }}'" />
<script src="https://unpkg.com/lucide@latest"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/prism.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/prism/1.29.0/components/prism-python.min.js"></script>
<script src="'{{ js_path }}'"></script>
@@ -23,93 +24,129 @@
<div class="drawer-title" id="drawer-node-name">Node Details</div>
<div style="display: flex; align-items: center;">
<button class="drawer-open-ide" id="drawer-open-ide" style="display: none;">
<svg viewBox="0 0 16 16" fill="none" stroke="currentColor" stroke-width="2">
<path d="M4 2 L12 2 L12 14 L4 14 Z" stroke-linecap="round" stroke-linejoin="round"/>
<path d="M6 5 L10 5 M6 8 L10 8 M6 11 L10 11" stroke-linecap="round"/>
</svg>
<i data-lucide="file-code" style="width: 16px; height: 16px;"></i>
Open in IDE
</button>
<button class="drawer-close" id="drawer-close">×</button>
<button class="drawer-close" id="drawer-close">
<i data-lucide="x" style="width: 20px; height: 20px;"></i>
</button>
</div>
</div>
<div class="drawer-content" id="drawer-content"></div>
</div>
<div id="info">
<div style="text-align: center; margin-bottom: 20px;">
<div style="text-align: center;">
<img src="https://cdn.prod.website-files.com/68de1ee6d7c127849807d7a6/68de1ee6d7c127849807d7ef_Logo.svg"
alt="CrewAI Logo"
style="width: 120px; height: auto;">
</div>
<h3>Flow Execution</h3>
<div class="stats">
<p><strong>Nodes:</strong> '{{ dag_nodes_count }}'</p>
<p><strong>Edges:</strong> '{{ dag_edges_count }}'</p>
<p><strong>Topological Paths:</strong> '{{ execution_paths }}'</p>
</div>
<div class="legend">
<div class="legend-title">Node Types</div>
<div class="legend-item">
<div class="legend-color" style="background: '{{ CREWAI_ORANGE }}';"></div>
<span>Start Methods</span>
</div>
<div class="legend-item">
<div class="legend-color" style="background: '{{ DARK_GRAY }}'; border: 3px solid '{{ CREWAI_ORANGE }}';"></div>
<span>Router Methods</span>
</div>
<div class="legend-item">
<div class="legend-color" style="background: '{{ DARK_GRAY }}';"></div>
<span>Listen Methods</span>
</div>
</div>
<div class="legend">
<div class="legend-title">Edge Types</div>
<div class="legend-item">
<svg width="24" height="12" style="margin-right: 12px;">
<line x1="0" y1="6" x2="24" y2="6" stroke="'{{ CREWAI_ORANGE }}'" stroke-width="2" stroke-dasharray="5,5"/>
</svg>
<span>Router Paths</span>
</div>
<div class="legend-item">
<svg width="24" height="12" style="margin-right: 12px;" class="legend-or-line">
<line x1="0" y1="6" x2="24" y2="6" stroke="var(--edge-or-color)" stroke-width="2"/>
</svg>
<span>OR Conditions</span>
</div>
<div class="legend-item">
<svg width="24" height="12" style="margin-right: 12px;">
<line x1="0" y1="6" x2="24" y2="6" stroke="'{{ CREWAI_ORANGE }}'" stroke-width="2"/>
</svg>
<span>AND Conditions</span>
</div>
</div>
<div class="instructions">
<strong>Interactions:</strong><br>
• Drag to pan<br>
• Scroll to zoom<br><br>
<strong>IDE:</strong>
<select id="ide-selector" style="width: 100%; padding: 4px; margin-top: 4px; border-radius: 3px; border: 1px solid #e0e0e0; background: white; font-size: 12px; cursor: pointer; pointer-events: auto; position: relative; z-index: 10;">
<option value="auto">Auto-detect</option>
<option value="pycharm">PyCharm</option>
<option value="vscode">VS Code</option>
<option value="jetbrains">JetBrains (Toolbox)</option>
</select>
style="width: 144px; height: auto;">
</div>
</div>
<!-- Custom navigation controls -->
<div class="nav-controls">
<div class="nav-button" id="theme-toggle" title="Toggle Dark Mode">🌙</div>
<div class="nav-button" id="zoom-in" title="Zoom In">+</div>
<div class="nav-button" id="zoom-out" title="Zoom Out"></div>
<div class="nav-button" id="fit" title="Fit to Screen">⊡</div>
<div class="nav-button" id="export-png" title="Export to PNG">🖼</div>
<div class="nav-button" id="export-pdf" title="Export to PDF">📄</div>
<div class="nav-button" id="export-json" title="Export to JSON">{}</div>
<div class="nav-button" id="theme-toggle" title="Toggle Dark Mode">
<i data-lucide="moon" style="width: 18px; height: 18px;"></i>
</div>
<div class="nav-button" id="zoom-in" title="Zoom In">
<i data-lucide="zoom-in" style="width: 18px; height: 18px;"></i>
</div>
<div class="nav-button" id="zoom-out" title="Zoom Out">
<i data-lucide="zoom-out" style="width: 18px; height: 18px;"></i>
</div>
<div class="nav-button" id="fit" title="Fit to Screen">
<i data-lucide="maximize-2" style="width: 18px; height: 18px;"></i>
</div>
<div class="nav-button" id="export-png" title="Export to PNG">
<i data-lucide="image" style="width: 18px; height: 18px;"></i>
</div>
<div class="nav-button" id="export-pdf" title="Export to PDF">
<i data-lucide="file-text" style="width: 18px; height: 18px;"></i>
</div>
<!-- <div class="nav-button" id="export-json" title="Export to JSON">
<i data-lucide="braces" style="width: 18px; height: 18px;"></i>
</div> -->
</div>
<div id="network-container">
<div id="network"></div>
</div>
<!-- Info panel at bottom -->
<div id="legend-panel">
<!-- Stats Section -->
<div class="legend-section">
<div class="legend-stats-row">
<div class="legend-stat-item">
<span class="stat-value">'{{ dag_nodes_count }}'</span>
<span class="stat-label">Nodes</span>
</div>
<div class="legend-stat-item">
<span class="stat-value">'{{ dag_edges_count }}'</span>
<span class="stat-label">Edges</span>
</div>
<div class="legend-stat-item">
<span class="stat-value">'{{ execution_paths }}'</span>
<span class="stat-label">Paths</span>
</div>
</div>
</div>
<!-- Node Types Section -->
<div class="legend-section">
<div class="legend-group">
<div class="legend-item-compact">
<div class="legend-color-small" style="background: var(--node-bg-start);"></div>
<span>Start</span>
</div>
<div class="legend-item-compact">
<div class="legend-color-small" style="background: var(--node-bg-router); border: 2px solid var(--node-border-start);"></div>
<span>Router</span>
</div>
<div class="legend-item-compact">
<div class="legend-color-small" style="background: var(--node-bg-listen); border: 2px solid var(--node-border-listen);"></div>
<span>Listen</span>
</div>
</div>
</div>
<!-- Edge Types Section -->
<div class="legend-section">
<div class="legend-group">
<div class="legend-item-compact">
<svg>
<line x1="0" y1="7" x2="29" y2="7" stroke="var(--edge-router-color)" stroke-width="2" stroke-dasharray="4,4"/>
</svg>
<span>Router</span>
</div>
<div class="legend-item-compact">
<svg class="legend-or-line">
<line x1="0" y1="7" x2="29" y2="7" stroke="var(--edge-or-color)" stroke-width="2"/>
</svg>
<span>OR</span>
</div>
<div class="legend-item-compact">
<svg>
<line x1="0" y1="7" x2="29" y2="7" stroke="var(--edge-router-color)" stroke-width="2"/>
</svg>
<span>AND</span>
</div>
</div>
</div>
<!-- IDE Selector Section -->
<div class="legend-section">
<div class="legend-ide-column">
<label class="legend-ide-label">IDE</label>
<select id="ide-selector" class="legend-ide-select">
<option value="auto">Auto-detect</option>
<option value="pycharm">PyCharm</option>
<option value="vscode">VS Code</option>
<option value="jetbrains">JetBrains</option>
</select>
</div>
</div>
</div>
</body>
</html>

View File

@@ -13,6 +13,14 @@
--edge-label-text: '{{ GRAY }}';
--edge-label-bg: rgba(255, 255, 255, 0.8);
--edge-or-color: #000000;
--edge-router-color: '{{ CREWAI_ORANGE }}';
--node-border-start: #C94238;
--node-border-listen: #3D3D3D;
--node-bg-start: #FF7066;
--node-bg-router: #FFFFFF;
--node-bg-listen: #FFFFFF;
--node-text-color: #FFFFFF;
--nav-button-hover: #f5f5f5;
}
[data-theme="dark"] {
@@ -30,6 +38,14 @@
--edge-label-text: #c9d1d9;
--edge-label-bg: rgba(22, 27, 34, 0.9);
--edge-or-color: #ffffff;
--edge-router-color: '{{ CREWAI_ORANGE }}';
--node-border-start: #FF5A50;
--node-border-listen: #666666;
--node-bg-start: #B33830;
--node-bg-router: #3D3D3D;
--node-bg-listen: #3D3D3D;
--node-text-color: #FFFFFF;
--nav-button-hover: #30363d;
}
@keyframes dash {
@@ -72,12 +88,10 @@ body {
position: absolute;
top: 20px;
left: 20px;
background: var(--bg-secondary);
background: transparent;
padding: 20px;
border-radius: 8px;
box-shadow: 0 4px 12px var(--shadow-strong);
max-width: 320px;
border: 1px solid var(--border-color);
z-index: 10000;
pointer-events: auto;
transition: background 0.3s ease, border-color 0.3s ease, box-shadow 0.3s ease;
@@ -125,12 +139,16 @@ h3 {
margin-right: 12px;
border-radius: 3px;
box-sizing: border-box;
transition: background 0.3s ease, border-color 0.3s ease;
}
.legend-item span {
color: var(--text-secondary);
font-size: 13px;
transition: color 0.3s ease;
}
.legend-item svg line {
transition: stroke 0.3s ease;
}
.instructions {
margin-top: 15px;
padding-top: 15px;
@@ -155,7 +173,7 @@ h3 {
bottom: 20px;
right: auto;
display: grid;
grid-template-columns: repeat(4, 40px);
grid-template-columns: repeat(3, 40px);
gap: 8px;
z-index: 10002;
pointer-events: auto;
@@ -165,10 +183,187 @@ h3 {
.nav-controls.drawer-open {
}
#legend-panel {
position: fixed;
left: 164px;
bottom: 20px;
right: 20px;
height: 92px;
background: var(--bg-secondary);
backdrop-filter: blur(12px) saturate(180%);
-webkit-backdrop-filter: blur(12px) saturate(180%);
border: 1px solid var(--border-subtle);
border-radius: 6px;
box-shadow: 0 2px 8px var(--shadow-color);
display: grid;
grid-template-columns: repeat(4, 1fr);
align-items: center;
gap: 0;
padding: 0 24px;
box-sizing: border-box;
z-index: 10001;
pointer-events: auto;
transition: background 0.3s ease, border-color 0.3s ease, box-shadow 0.3s ease, right 0.3s cubic-bezier(0.4, 0, 0.2, 1);
}
#legend-panel.drawer-open {
right: 405px;
}
.legend-section {
display: flex;
align-items: center;
justify-content: center;
min-width: 0;
width: -webkit-fill-available;
width: -moz-available;
width: stretch;
position: relative;
}
.legend-section:not(:last-child)::after {
content: '';
position: absolute;
right: 0;
top: 50%;
transform: translateY(-50%);
width: 1px;
height: 48px;
background: var(--border-color);
transition: background 0.3s ease;
}
.legend-stats-row {
display: flex;
gap: 32px;
justify-content: center;
align-items: center;
min-width: 0;
}
.legend-stat-item {
display: flex;
flex-direction: column;
align-items: center;
gap: 4px;
}
.stat-value {
font-size: 19px;
font-weight: 700;
color: var(--text-primary);
line-height: 1;
transition: color 0.3s ease;
}
.stat-label {
font-size: 8px;
font-weight: 500;
text-transform: uppercase;
color: var(--text-secondary);
letter-spacing: 0.5px;
transition: color 0.3s ease;
}
.legend-items-row {
display: flex;
gap: 16px;
align-items: center;
justify-content: center;
min-width: 0;
}
.legend-group {
display: flex;
gap: 16px;
align-items: center;
}
.legend-item-compact {
display: flex;
align-items: center;
gap: 6px;
}
.legend-item-compact span {
font-size: 12px;
font-weight: 500;
text-transform: uppercase;
color: var(--text-secondary);
letter-spacing: 0.5px;
white-space: nowrap;
font-family: inherit;
line-height: 1;
transition: color 0.3s ease;
}
.legend-color-small {
width: 17px;
height: 17px;
border-radius: 2px;
box-sizing: border-box;
flex-shrink: 0;
transition: background 0.3s ease, border-color 0.3s ease;
}
.legend-item-compact svg {
display: block;
flex-shrink: 0;
width: 29px;
height: 14px;
}
.legend-item-compact svg line {
transition: stroke 0.3s ease;
}
.legend-ide-column {
display: flex;
flex-direction: row;
gap: 8px;
align-items: center;
justify-content: center;
min-width: 0;
width: 100%;
}
.legend-ide-label {
font-size: 12px;
font-weight: 500;
text-transform: uppercase;
color: var(--text-secondary);
letter-spacing: 0.5px;
transition: color 0.3s ease;
white-space: nowrap;
}
.legend-ide-select {
width: 120px;
padding: 6px 10px;
border-radius: 4px;
border: 1px solid var(--border-subtle);
background: var(--bg-primary);
color: var(--text-primary);
font-size: 11px;
cursor: pointer;
transition: all 0.3s ease;
}
.legend-ide-select:hover {
border-color: var(--text-secondary);
}
.legend-ide-select:focus {
outline: none;
border-color: '{{ CREWAI_ORANGE }}';
}
.nav-button {
width: 40px;
height: 40px;
background: var(--bg-secondary);
backdrop-filter: blur(12px) saturate(180%);
-webkit-backdrop-filter: blur(12px) saturate(180%);
border: 1px solid var(--border-subtle);
border-radius: 6px;
display: flex;
@@ -181,12 +376,12 @@ h3 {
user-select: none;
pointer-events: auto;
position: relative;
z-index: 10001;
z-index: 10002;
transition: background 0.3s ease, border-color 0.3s ease, color 0.3s ease, box-shadow 0.3s ease;
}
.nav-button:hover {
background: var(--border-subtle);
background: var(--nav-button-hover);
}
#drawer {
@@ -198,9 +393,10 @@ h3 {
background: var(--bg-drawer);
box-shadow: -4px 0 12px var(--shadow-strong);
transition: right 0.3s cubic-bezier(0.4, 0, 0.2, 1), background 0.3s ease, box-shadow 0.3s ease;
z-index: 2000;
overflow-y: auto;
padding: 24px;
z-index: 10003;
overflow: hidden;
transform: translateZ(0);
isolation: isolate;
}
#drawer.open {
@@ -247,17 +443,22 @@ h3 {
justify-content: space-between;
align-items: center;
margin-bottom: 20px;
padding-bottom: 16px;
padding: 24px 24px 16px 24px;
border-bottom: 2px solid '{{ CREWAI_ORANGE }}';
position: relative;
z-index: 2001;
}
.drawer-title {
font-size: 20px;
font-size: 15px;
font-weight: 700;
color: var(--text-primary);
transition: color 0.3s ease;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
flex: 1;
min-width: 0;
}
.drawer-close {
@@ -269,12 +470,19 @@ h3 {
padding: 4px 8px;
line-height: 1;
transition: color 0.3s ease;
display: flex;
align-items: center;
justify-content: center;
}
.drawer-close:hover {
color: '{{ CREWAI_ORANGE }}';
}
.drawer-close i {
display: block;
}
.drawer-open-ide {
background: '{{ CREWAI_ORANGE }}';
border: none;
@@ -292,6 +500,9 @@ h3 {
position: relative;
z-index: 9999;
pointer-events: auto;
white-space: nowrap;
flex-shrink: 0;
min-width: fit-content;
}
.drawer-open-ide:hover {
@@ -305,14 +516,19 @@ h3 {
box-shadow: 0 1px 4px rgba(255, 90, 80, 0.2);
}
.drawer-open-ide svg {
.drawer-open-ide svg,
.drawer-open-ide i {
width: 14px;
height: 14px;
display: block;
}
.drawer-content {
color: '{{ DARK_GRAY }}';
line-height: 1.6;
padding: 0 24px 24px 24px;
overflow-y: auto;
height: calc(100vh - 95px);
}
.drawer-section {
@@ -328,6 +544,10 @@ h3 {
position: relative;
}
.drawer-metadata-grid:has(.drawer-section:nth-child(3):nth-last-child(1)) {
grid-template-columns: 1fr 2fr;
}
.drawer-metadata-grid::before {
content: '';
position: absolute;
@@ -419,20 +639,35 @@ h3 {
grid-column: 2;
display: flex;
flex-direction: column;
justify-content: center;
justify-content: flex-start;
align-items: flex-start;
}
.drawer-metadata-grid:has(.drawer-section:nth-child(3):nth-last-child(1))::after {
right: 50%;
right: 66.666%;
}
.drawer-metadata-grid:has(.drawer-section:nth-child(3):nth-last-child(1))::before {
left: 33.333%;
}
.drawer-metadata-grid .drawer-section:nth-child(3):nth-last-child(1) .drawer-section-title {
align-self: flex-start;
}
.drawer-metadata-grid .drawer-section:nth-child(3):nth-last-child(1) > *:not(.drawer-section-title) {
width: 100%;
align-self: stretch;
}
.drawer-section-title {
font-size: 12px;
text-transform: uppercase;
color: '{{ GRAY }}';
color: var(--text-secondary);
letter-spacing: 0.5px;
margin-bottom: 8px;
font-weight: 600;
transition: color 0.3s ease;
}
.drawer-badge {
@@ -465,9 +700,44 @@ h3 {
padding: 3px 0;
}
.drawer-metadata-grid .drawer-section .drawer-list {
display: flex;
flex-direction: column;
gap: 6px;
}
.drawer-metadata-grid .drawer-section .drawer-list li {
border-bottom: none;
padding: 0;
}
.drawer-metadata-grid .drawer-section:nth-child(3) .drawer-list li {
border-bottom: none;
padding: 3px 0;
padding: 0;
}
.drawer-metadata-grid .drawer-section {
overflow: visible;
}
.drawer-metadata-grid .drawer-section .condition-group,
.drawer-metadata-grid .drawer-section .trigger-group {
width: 100%;
box-sizing: border-box;
}
.drawer-metadata-grid .drawer-section .condition-children {
width: 100%;
}
.drawer-metadata-grid .drawer-section .trigger-group-items {
width: 100%;
}
.drawer-metadata-grid .drawer-section .drawer-code-link {
word-break: break-word;
overflow-wrap: break-word;
max-width: 100%;
}
.drawer-code {
@@ -491,6 +761,7 @@ h3 {
cursor: pointer;
transition: all 0.2s;
display: inline-block;
margin: 3px 2px;
}
.drawer-code-link:hover {

View File

@@ -3,12 +3,13 @@
from __future__ import annotations
from collections import defaultdict
from collections.abc import Iterable
import inspect
from typing import TYPE_CHECKING, Any
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_wrappers import FlowCondition
from crewai.flow.types import FlowMethodName
from crewai.flow.types import FlowMethodName, FlowRouteName
from crewai.flow.utils import (
is_flow_condition_dict,
is_simple_flow_condition,
@@ -197,8 +198,6 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
node_metadata["type"] = "router"
router_methods.append(method_name)
node_metadata["condition_type"] = "IF"
if method_name in flow._router_paths:
node_metadata["router_paths"] = [
str(p) for p in flow._router_paths[method_name]
@@ -210,9 +209,13 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
]
if hasattr(method, "__condition_type__") and method.__condition_type__:
node_metadata["trigger_condition_type"] = method.__condition_type__
if "condition_type" not in node_metadata:
node_metadata["condition_type"] = method.__condition_type__
if node_metadata.get("is_router") and "condition_type" not in node_metadata:
node_metadata["condition_type"] = "IF"
if (
hasattr(method, "__trigger_condition__")
and method.__trigger_condition__ is not None
@@ -298,6 +301,9 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
nodes[method_name] = node_metadata
for listener_name, condition_data in flow._listeners.items():
if listener_name in router_methods:
continue
if is_simple_flow_condition(condition_data):
cond_type, methods = condition_data
edges.extend(
@@ -315,6 +321,60 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
_create_edges_from_condition(condition_data, str(listener_name), nodes)
)
for method_name, node_metadata in nodes.items(): # type: ignore[assignment]
if node_metadata.get("is_router") and "trigger_methods" in node_metadata:
trigger_methods = node_metadata["trigger_methods"]
condition_type = node_metadata.get("trigger_condition_type", OR_CONDITION)
if "trigger_condition" in node_metadata:
edges.extend(
_create_edges_from_condition(
node_metadata["trigger_condition"], # type: ignore[arg-type]
method_name,
nodes,
)
)
else:
edges.extend(
StructureEdge(
source=trigger_method,
target=method_name,
condition_type=condition_type,
is_router_path=False,
)
for trigger_method in trigger_methods
if trigger_method in nodes
)
for router_method_name in router_methods:
if router_method_name not in flow._router_paths:
flow._router_paths[FlowMethodName(router_method_name)] = []
inferred_paths: Iterable[FlowMethodName | FlowRouteName] = set(
flow._router_paths.get(FlowMethodName(router_method_name), [])
)
for condition_data in flow._listeners.values():
trigger_strings: list[str] = []
if is_simple_flow_condition(condition_data):
_, methods = condition_data
trigger_strings = [str(m) for m in methods]
elif is_flow_condition_dict(condition_data):
trigger_strings = _extract_direct_or_triggers(condition_data)
for trigger_str in trigger_strings:
if trigger_str not in nodes:
# This is likely a router path output
inferred_paths.add(trigger_str) # type: ignore[attr-defined]
if inferred_paths:
flow._router_paths[FlowMethodName(router_method_name)] = list(
inferred_paths # type: ignore[arg-type]
)
if router_method_name in nodes:
nodes[router_method_name]["router_paths"] = list(inferred_paths)
for router_method_name in router_methods:
if router_method_name not in flow._router_paths:
continue
@@ -340,6 +400,7 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
target=str(listener_name),
condition_type=None,
is_router_path=True,
router_path_label=str(path),
)
)
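An editorial sketch to motivate the inference step above: in a flow like the one below (assuming the public `@start`/`@router`/`@listen` decorators from `crewai.flow.flow`), the router's return values `"approved"` and `"rejected"` appear only as listener triggers, never as methods, so `build_flow_structure` has to infer them as router paths.

```python
from crewai.flow.flow import Flow, listen, router, start


class ApprovalFlow(Flow):
    @start()
    def submit(self):
        self.state["amount"] = 100

    @router(submit)
    def triage(self):
        # The return value is a route label, not a method name; labels like
        # "approved" never appear as nodes, which is why they must be
        # inferred from the listener triggers.
        return "approved" if self.state["amount"] < 1000 else "rejected"

    @listen("approved")
    def process(self):
        ...

    @listen("rejected")
    def escalate(self):
        ...
```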

View File

@@ -20,7 +20,7 @@ class CSSExtension(Extension):
Provides {% css 'path/to/file.css' %} tag syntax.
"""
tags: ClassVar[set[str]] = {"css"} # type: ignore[assignment]
tags: ClassVar[set[str]] = {"css"} # type: ignore[misc]
def parse(self, parser: Parser) -> nodes.Node:
"""Parse {% css 'styles.css' %} tag.
@@ -53,7 +53,7 @@ class JSExtension(Extension):
Provides {% js 'path/to/file.js' %} tag syntax.
"""
tags: ClassVar[set[str]] = {"js"} # type: ignore[assignment]
tags: ClassVar[set[str]] = {"js"} # type: ignore[misc]
def parse(self, parser: Parser) -> nodes.Node:
"""Parse {% js 'script.js' %} tag.
@@ -91,6 +91,116 @@ TEXT_PRIMARY = "#e6edf3"
TEXT_SECONDARY = "#7d8590"
def calculate_node_positions(
dag: FlowStructure,
) -> dict[str, dict[str, int | float]]:
"""Calculate hierarchical positions (level, x, y) for each node.
Args:
dag: FlowStructure containing nodes and edges.
Returns:
Dictionary mapping node names to their position data (level, x, y).
"""
children: dict[str, list[str]] = {name: [] for name in dag["nodes"]}
parents: dict[str, list[str]] = {name: [] for name in dag["nodes"]}
for edge in dag["edges"]:
source = edge["source"]
target = edge["target"]
if source in children and target in children:
children[source].append(target)
parents[target].append(source)
levels: dict[str, int] = {}
queue: list[tuple[str, int]] = []
for start_method in dag["start_methods"]:
if start_method in dag["nodes"]:
levels[start_method] = 0
queue.append((start_method, 0))
visited: set[str] = set()
while queue:
node, level = queue.pop(0)
if node in visited:
continue
visited.add(node)
if node not in levels or levels[node] < level:
levels[node] = level
for child in children.get(node, []):
if child not in visited:
child_level = level + 1
if child not in levels or levels[child] < child_level:
levels[child] = child_level
queue.append((child, child_level))
for name in dag["nodes"]:
if name not in levels:
levels[name] = 0
nodes_by_level: dict[int, list[str]] = {}
for node, level in levels.items():
if level not in nodes_by_level:
nodes_by_level[level] = []
nodes_by_level[level].append(node)
positions: dict[str, dict[str, int | float]] = {}
level_separation = 300 # Vertical spacing between levels
node_spacing = 400 # Horizontal spacing between nodes
parent_count: dict[str, int] = {}
for node, parent_list in parents.items():
parent_count[node] = len(parent_list)
for level, nodes_at_level in sorted(nodes_by_level.items()):
y = level * level_separation
if level == 0:
num_nodes = len(nodes_at_level)
for i, node in enumerate(nodes_at_level):
x = (i - (num_nodes - 1) / 2) * node_spacing
positions[node] = {"level": level, "x": x, "y": y}
else:
for i, node in enumerate(nodes_at_level):
parent_list = parents.get(node, [])
parent_positions: list[float] = [
positions[parent]["x"]
for parent in parent_list
if parent in positions
]
if parent_positions:
if len(parent_positions) > 1 and len(set(parent_positions)) == 1:
base_x = parent_positions[0]
avg_x = base_x + node_spacing * 0.4
else:
avg_x = sum(parent_positions) / len(parent_positions)
else:
avg_x = i * node_spacing * 0.5
positions[node] = {"level": level, "x": avg_x, "y": y}
nodes_at_level_sorted = sorted(
nodes_at_level, key=lambda n: positions[n]["x"]
)
min_spacing = node_spacing * 0.6 # Minimum horizontal distance
for i in range(len(nodes_at_level_sorted) - 1):
current_node = nodes_at_level_sorted[i]
next_node = nodes_at_level_sorted[i + 1]
current_x = positions[current_node]["x"]
next_x = positions[next_node]["x"]
if next_x - current_x < min_spacing:
positions[next_node]["x"] = current_x + min_spacing
return positions
def render_interactive(
dag: FlowStructure,
filename: str = "flow_dag.html",
@@ -110,6 +220,8 @@ def render_interactive(
Returns:
Absolute path to generated HTML file in temporary directory.
"""
node_positions = calculate_node_positions(dag)
nodes_list: list[dict[str, Any]] = []
for name, metadata in dag["nodes"].items():
node_type: str = metadata.get("type", "listen")
@@ -120,37 +232,37 @@ def render_interactive(
if node_type == "start":
color_config = {
"background": CREWAI_ORANGE,
"border": CREWAI_ORANGE,
"background": "var(--node-bg-start)",
"border": "var(--node-border-start)",
"highlight": {
"background": CREWAI_ORANGE,
"border": CREWAI_ORANGE,
"background": "var(--node-bg-start)",
"border": "var(--node-border-start)",
},
}
font_color = WHITE
border_width = 2
font_color = "var(--node-text-color)"
border_width = 3
elif node_type == "router":
color_config = {
"background": DARK_GRAY,
"background": "var(--node-bg-router)",
"border": CREWAI_ORANGE,
"highlight": {
"background": DARK_GRAY,
"background": "var(--node-bg-router)",
"border": CREWAI_ORANGE,
},
}
font_color = WHITE
font_color = "var(--node-text-color)"
border_width = 3
else:
color_config = {
"background": DARK_GRAY,
"border": DARK_GRAY,
"background": "var(--node-bg-listen)",
"border": "var(--node-border-listen)",
"highlight": {
"background": DARK_GRAY,
"border": DARK_GRAY,
"background": "var(--node-bg-listen)",
"border": "var(--node-border-listen)",
},
}
font_color = WHITE
border_width = 2
font_color = "var(--node-text-color)"
border_width = 3
title_parts: list[str] = []
@@ -215,25 +327,34 @@ def render_interactive(
bg_color = color_config["background"]
border_color = color_config["border"]
nodes_list.append(
{
"id": name,
"label": name,
"title": "".join(title_parts),
"shape": "custom",
"size": 30,
"nodeStyle": {
"name": name,
"bgColor": bg_color,
"borderColor": border_color,
"borderWidth": border_width,
"fontColor": font_color,
},
"opacity": 1.0,
"glowSize": 0,
"glowColor": None,
}
)
position_data = node_positions.get(name, {"level": 0, "x": 0, "y": 0})
node_data: dict[str, Any] = {
"id": name,
"label": name,
"title": "".join(title_parts),
"shape": "custom",
"size": 30,
"level": position_data["level"],
"nodeStyle": {
"name": name,
"bgColor": bg_color,
"borderColor": border_color,
"borderWidth": border_width,
"fontColor": font_color,
},
"opacity": 1.0,
"glowSize": 0,
"glowColor": None,
}
# Add x,y only for graphs with 3-4 nodes
total_nodes = len(dag["nodes"])
if 3 <= total_nodes <= 4:
node_data["x"] = position_data["x"]
node_data["y"] = position_data["y"]
nodes_list.append(node_data)
execution_paths: int = calculate_execution_paths(dag)
@@ -246,6 +367,8 @@ def render_interactive(
if edge["is_router_path"]:
edge_color = CREWAI_ORANGE
edge_dashes = [15, 10]
if "router_path_label" in edge:
edge_label = edge["router_path_label"]
elif edge["condition_type"] == "AND":
edge_label = "AND"
edge_color = CREWAI_ORANGE
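To see the layout rules concretely, here is a hand-built input for `calculate_node_positions` (a sketch covering only the keys the function reads, not output from a real flow):

```python
dag = {
    "nodes": {"begin": {}, "left": {}, "right": {}, "join": {}},
    "edges": [
        {"source": "begin", "target": "left"},
        {"source": "begin", "target": "right"},
        {"source": "left", "target": "join"},
        {"source": "right", "target": "join"},
    ],
    "start_methods": ["begin"],
}

positions = calculate_node_positions(dag)
# "begin" anchors level 0 at x=0, y=0. "left" and "right" both inherit their
# parent's x, so the overlap pass pushes them at least 240px apart
# (node_spacing * 0.6). "join" lands at level 2 with x averaged over its parents.
```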

View File

@@ -10,6 +10,7 @@ class NodeMetadata(TypedDict, total=False):
is_router: bool
router_paths: list[str]
condition_type: str | None
trigger_condition_type: str | None
trigger_methods: list[str]
trigger_condition: dict[str, Any] | None
method_signature: dict[str, Any]
@@ -22,13 +23,14 @@ class NodeMetadata(TypedDict, total=False):
class_line_number: int
class StructureEdge(TypedDict):
class StructureEdge(TypedDict, total=False):
"""Represents a connection in the flow structure."""
source: str
target: str
condition_type: str | None
is_router_path: bool
router_path_label: str
class FlowStructure(TypedDict):

View File

@@ -358,6 +358,7 @@ class LiteAgent(FlowTrackable, BaseModel):
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=self._messages,
)
# Process guardrail if set

View File

@@ -6,6 +6,8 @@ from typing import Any
from pydantic import BaseModel, Field
from crewai.utilities.types import LLMMessage
class LiteAgentOutput(BaseModel):
"""Class that represents the result of a LiteAgent execution."""
@@ -20,6 +22,7 @@ class LiteAgentOutput(BaseModel):
usage_metrics: dict[str, Any] | None = Field(
description="Token usage metrics for this execution", default=None
)
messages: list[LLMMessage] = Field(description="Messages of the agent", default=[])
def to_dict(self) -> dict[str, Any]:
"""Convert pydantic_output to a dictionary."""

View File

@@ -38,6 +38,13 @@ from crewai.events.types.tool_usage_events import (
ToolUsageStartedEvent,
)
from crewai.llms.base_llm import BaseLLM
from crewai.llms.constants import (
ANTHROPIC_MODELS,
AZURE_MODELS,
BEDROCK_MODELS,
GEMINI_MODELS,
OPENAI_MODELS,
)
from crewai.utilities import InternalInstructor
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -323,18 +330,64 @@ class LLM(BaseLLM):
completion_cost: float | None = None
def __new__(cls, model: str, is_litellm: bool = False, **kwargs: Any) -> LLM:
"""Factory method that routes to native SDK or falls back to LiteLLM."""
"""Factory method that routes to native SDK or falls back to LiteLLM.
Routing priority:
1. If 'provider' kwarg is present, use that provider with constants
2. If only 'model' kwarg, use constants to infer provider
3. If "/" in model name:
- Check if prefix is a native provider (openai/anthropic/azure/bedrock/gemini)
- If yes, validate model against constants
- If valid, route to native SDK; otherwise route to LiteLLM
"""
if not model or not isinstance(model, str):
raise ValueError("Model must be a non-empty string")
provider = model.partition("/")[0] if "/" in model else "openai"
explicit_provider = kwargs.get("provider")
native_class = cls._get_native_provider(provider)
if explicit_provider:
provider = explicit_provider
use_native = True
model_string = model
elif "/" in model:
prefix, _, model_part = model.partition("/")
provider_mapping = {
"openai": "openai",
"anthropic": "anthropic",
"claude": "anthropic",
"azure": "azure",
"azure_openai": "azure",
"google": "gemini",
"gemini": "gemini",
"bedrock": "bedrock",
"aws": "bedrock",
}
canonical_provider = provider_mapping.get(prefix.lower())
if canonical_provider and cls._validate_model_in_constants(
model_part, canonical_provider
):
provider = canonical_provider
use_native = True
model_string = model_part
else:
provider = prefix
use_native = False
model_string = model_part
else:
provider = cls._infer_provider_from_model(model)
use_native = True
model_string = model
native_class = cls._get_native_provider(provider) if use_native else None
if native_class and not is_litellm and provider in SUPPORTED_NATIVE_PROVIDERS:
try:
model_string = model.partition("/")[2] if "/" in model else model
# Remove 'provider' from kwargs if it exists to avoid duplicate keyword argument
kwargs_copy = {k: v for k, v in kwargs.items() if k != 'provider'}
return cast(
Self, native_class(model=model_string, provider=provider, **kwargs)
Self, native_class(model=model_string, provider=provider, **kwargs_copy)
)
except NotImplementedError:
raise
@@ -351,6 +404,63 @@ class LLM(BaseLLM):
instance.is_litellm = True
return instance
@classmethod
def _validate_model_in_constants(cls, model: str, provider: str) -> bool:
"""Validate if a model name exists in the provider's constants.
Args:
model: The model name to validate
provider: The provider to check against (canonical name)
Returns:
True if the model exists in the provider's constants, False otherwise
"""
if provider == "openai":
return model in OPENAI_MODELS
if provider == "anthropic" or provider == "claude":
return model in ANTHROPIC_MODELS
if provider == "gemini":
return model in GEMINI_MODELS
if provider == "bedrock":
return model in BEDROCK_MODELS
if provider == "azure":
# Azure does not publish a fixed model list; accept any model name for now
return True
return False
@classmethod
def _infer_provider_from_model(cls, model: str) -> str:
"""Infer the provider from the model name.
Args:
model: The model name without provider prefix
Returns:
The inferred provider name, defaults to "openai"
"""
if model in OPENAI_MODELS:
return "openai"
if model in ANTHROPIC_MODELS:
return "anthropic"
if model in GEMINI_MODELS:
return "gemini"
if model in BEDROCK_MODELS:
return "bedrock"
if model in AZURE_MODELS:
return "azure"
return "openai"
@classmethod
def _get_native_provider(cls, provider: str) -> type | None:
"""Get native provider class if available."""

View File

@@ -0,0 +1,558 @@
from typing import Literal, TypeAlias
OpenAIModels: TypeAlias = Literal[
"gpt-3.5-turbo",
"gpt-3.5-turbo-0125",
"gpt-3.5-turbo-0301",
"gpt-3.5-turbo-0613",
"gpt-3.5-turbo-1106",
"gpt-3.5-turbo-16k",
"gpt-3.5-turbo-16k-0613",
"gpt-3.5-turbo-instruct",
"gpt-3.5-turbo-instruct-0914",
"gpt-4",
"gpt-4-0125-preview",
"gpt-4-0314",
"gpt-4-0613",
"gpt-4-1106-preview",
"gpt-4-32k",
"gpt-4-32k-0314",
"gpt-4-32k-0613",
"gpt-4-turbo",
"gpt-4-turbo-2024-04-09",
"gpt-4-turbo-preview",
"gpt-4-vision-preview",
"gpt-4.1",
"gpt-4.1-2025-04-14",
"gpt-4.1-mini",
"gpt-4.1-mini-2025-04-14",
"gpt-4.1-nano",
"gpt-4.1-nano-2025-04-14",
"gpt-4o",
"gpt-4o-2024-05-13",
"gpt-4o-2024-08-06",
"gpt-4o-2024-11-20",
"gpt-4o-audio-preview",
"gpt-4o-audio-preview-2024-10-01",
"gpt-4o-audio-preview-2024-12-17",
"gpt-4o-audio-preview-2025-06-03",
"gpt-4o-mini",
"gpt-4o-mini-2024-07-18",
"gpt-4o-mini-audio-preview",
"gpt-4o-mini-audio-preview-2024-12-17",
"gpt-4o-mini-realtime-preview",
"gpt-4o-mini-realtime-preview-2024-12-17",
"gpt-4o-mini-search-preview",
"gpt-4o-mini-search-preview-2025-03-11",
"gpt-4o-mini-transcribe",
"gpt-4o-mini-tts",
"gpt-4o-realtime-preview",
"gpt-4o-realtime-preview-2024-10-01",
"gpt-4o-realtime-preview-2024-12-17",
"gpt-4o-realtime-preview-2025-06-03",
"gpt-4o-search-preview",
"gpt-4o-search-preview-2025-03-11",
"gpt-4o-transcribe",
"gpt-4o-transcribe-diarize",
"gpt-5",
"gpt-5-2025-08-07",
"gpt-5-chat",
"gpt-5-chat-latest",
"gpt-5-codex",
"gpt-5-mini",
"gpt-5-mini-2025-08-07",
"gpt-5-nano",
"gpt-5-nano-2025-08-07",
"gpt-5-pro",
"gpt-5-pro-2025-10-06",
"gpt-5-search-api",
"gpt-5-search-api-2025-10-14",
"gpt-audio",
"gpt-audio-2025-08-28",
"gpt-audio-mini",
"gpt-audio-mini-2025-10-06",
"gpt-image-1",
"gpt-image-1-mini",
"gpt-realtime",
"gpt-realtime-2025-08-28",
"gpt-realtime-mini",
"gpt-realtime-mini-2025-10-06",
"o1",
"o1-preview",
"o1-2024-12-17",
"o1-mini",
"o1-mini-2024-09-12",
"o1-pro",
"o1-pro-2025-03-19",
"o3-mini",
"o3",
"o4-mini",
"whisper-1",
]
OPENAI_MODELS: list[OpenAIModels] = [
"gpt-3.5-turbo",
"gpt-3.5-turbo-0125",
"gpt-3.5-turbo-0301",
"gpt-3.5-turbo-0613",
"gpt-3.5-turbo-1106",
"gpt-3.5-turbo-16k",
"gpt-3.5-turbo-16k-0613",
"gpt-3.5-turbo-instruct",
"gpt-3.5-turbo-instruct-0914",
"gpt-4",
"gpt-4-0125-preview",
"gpt-4-0314",
"gpt-4-0613",
"gpt-4-1106-preview",
"gpt-4-32k",
"gpt-4-32k-0314",
"gpt-4-32k-0613",
"gpt-4-turbo",
"gpt-4-turbo-2024-04-09",
"gpt-4-turbo-preview",
"gpt-4-vision-preview",
"gpt-4.1",
"gpt-4.1-2025-04-14",
"gpt-4.1-mini",
"gpt-4.1-mini-2025-04-14",
"gpt-4.1-nano",
"gpt-4.1-nano-2025-04-14",
"gpt-4o",
"gpt-4o-2024-05-13",
"gpt-4o-2024-08-06",
"gpt-4o-2024-11-20",
"gpt-4o-audio-preview",
"gpt-4o-audio-preview-2024-10-01",
"gpt-4o-audio-preview-2024-12-17",
"gpt-4o-audio-preview-2025-06-03",
"gpt-4o-mini",
"gpt-4o-mini-2024-07-18",
"gpt-4o-mini-audio-preview",
"gpt-4o-mini-audio-preview-2024-12-17",
"gpt-4o-mini-realtime-preview",
"gpt-4o-mini-realtime-preview-2024-12-17",
"gpt-4o-mini-search-preview",
"gpt-4o-mini-search-preview-2025-03-11",
"gpt-4o-mini-transcribe",
"gpt-4o-mini-tts",
"gpt-4o-realtime-preview",
"gpt-4o-realtime-preview-2024-10-01",
"gpt-4o-realtime-preview-2024-12-17",
"gpt-4o-realtime-preview-2025-06-03",
"gpt-4o-search-preview",
"gpt-4o-search-preview-2025-03-11",
"gpt-4o-transcribe",
"gpt-4o-transcribe-diarize",
"gpt-5",
"gpt-5-2025-08-07",
"gpt-5-chat",
"gpt-5-chat-latest",
"gpt-5-codex",
"gpt-5-mini",
"gpt-5-mini-2025-08-07",
"gpt-5-nano",
"gpt-5-nano-2025-08-07",
"gpt-5-pro",
"gpt-5-pro-2025-10-06",
"gpt-5-search-api",
"gpt-5-search-api-2025-10-14",
"gpt-audio",
"gpt-audio-2025-08-28",
"gpt-audio-mini",
"gpt-audio-mini-2025-10-06",
"gpt-image-1",
"gpt-image-1-mini",
"gpt-realtime",
"gpt-realtime-2025-08-28",
"gpt-realtime-mini",
"gpt-realtime-mini-2025-10-06",
"o1",
"o1-preview",
"o1-2024-12-17",
"o1-mini",
"o1-mini-2024-09-12",
"o1-pro",
"o1-pro-2025-03-19",
"o3-mini",
"o3",
"o4-mini",
"whisper-1",
]
AnthropicModels: TypeAlias = Literal[
"claude-3-7-sonnet-latest",
"claude-3-7-sonnet-20250219",
"claude-3-5-haiku-latest",
"claude-3-5-haiku-20241022",
"claude-haiku-4-5",
"claude-haiku-4-5-20251001",
"claude-sonnet-4-20250514",
"claude-sonnet-4-0",
"claude-4-sonnet-20250514",
"claude-sonnet-4-5",
"claude-sonnet-4-5-20250929",
"claude-3-5-sonnet-latest",
"claude-3-5-sonnet-20241022",
"claude-3-5-sonnet-20240620",
"claude-opus-4-0",
"claude-opus-4-20250514",
"claude-4-opus-20250514",
"claude-opus-4-1",
"claude-opus-4-1-20250805",
"claude-3-opus-latest",
"claude-3-opus-20240229",
"claude-3-sonnet-20240229",
"claude-3-haiku-latest",
"claude-3-haiku-20240307",
]
ANTHROPIC_MODELS: list[AnthropicModels] = [
"claude-3-7-sonnet-latest",
"claude-3-7-sonnet-20250219",
"claude-3-5-haiku-latest",
"claude-3-5-haiku-20241022",
"claude-haiku-4-5",
"claude-haiku-4-5-20251001",
"claude-sonnet-4-20250514",
"claude-sonnet-4-0",
"claude-4-sonnet-20250514",
"claude-sonnet-4-5",
"claude-sonnet-4-5-20250929",
"claude-3-5-sonnet-latest",
"claude-3-5-sonnet-20241022",
"claude-3-5-sonnet-20240620",
"claude-opus-4-0",
"claude-opus-4-20250514",
"claude-4-opus-20250514",
"claude-opus-4-1",
"claude-opus-4-1-20250805",
"claude-3-opus-latest",
"claude-3-opus-20240229",
"claude-3-sonnet-20240229",
"claude-3-haiku-latest",
"claude-3-haiku-20240307",
]
GeminiModels: TypeAlias = Literal[
"gemini-2.5-pro",
"gemini-2.5-pro-preview-03-25",
"gemini-2.5-pro-preview-05-06",
"gemini-2.5-pro-preview-06-05",
"gemini-2.5-flash",
"gemini-2.5-flash-preview-05-20",
"gemini-2.5-flash-preview-04-17",
"gemini-2.5-flash-image",
"gemini-2.5-flash-image-preview",
"gemini-2.5-flash-lite",
"gemini-2.5-flash-lite-preview-06-17",
"gemini-2.5-flash-preview-09-2025",
"gemini-2.5-flash-lite-preview-09-2025",
"gemini-2.5-flash-preview-tts",
"gemini-2.5-pro-preview-tts",
"gemini-2.5-computer-use-preview-10-2025",
"gemini-2.0-flash",
"gemini-2.0-flash-001",
"gemini-2.0-flash-exp",
"gemini-2.0-flash-exp-image-generation",
"gemini-2.0-flash-lite",
"gemini-2.0-flash-lite-001",
"gemini-2.0-flash-lite-preview",
"gemini-2.0-flash-lite-preview-02-05",
"gemini-2.0-flash-preview-image-generation",
"gemini-2.0-flash-thinking-exp",
"gemini-2.0-flash-thinking-exp-01-21",
"gemini-2.0-flash-thinking-exp-1219",
"gemini-2.0-pro-exp",
"gemini-2.0-pro-exp-02-05",
"gemini-exp-1206",
"gemini-1.5-pro",
"gemini-1.5-flash",
"gemini-1.5-flash-8b",
"gemini-flash-latest",
"gemini-flash-lite-latest",
"gemini-pro-latest",
"gemini-2.0-flash-live-001",
"gemini-live-2.5-flash-preview",
"gemini-2.5-flash-live-preview",
"gemini-robotics-er-1.5-preview",
"gemini-gemma-2-27b-it",
"gemini-gemma-2-9b-it",
"gemma-3-1b-it",
"gemma-3-4b-it",
"gemma-3-12b-it",
"gemma-3-27b-it",
"gemma-3n-e2b-it",
"gemma-3n-e4b-it",
"learnlm-2.0-flash-experimental",
]
GEMINI_MODELS: list[GeminiModels] = [
"gemini-2.5-pro",
"gemini-2.5-pro-preview-03-25",
"gemini-2.5-pro-preview-05-06",
"gemini-2.5-pro-preview-06-05",
"gemini-2.5-flash",
"gemini-2.5-flash-preview-05-20",
"gemini-2.5-flash-preview-04-17",
"gemini-2.5-flash-image",
"gemini-2.5-flash-image-preview",
"gemini-2.5-flash-lite",
"gemini-2.5-flash-lite-preview-06-17",
"gemini-2.5-flash-preview-09-2025",
"gemini-2.5-flash-lite-preview-09-2025",
"gemini-2.5-flash-preview-tts",
"gemini-2.5-pro-preview-tts",
"gemini-2.5-computer-use-preview-10-2025",
"gemini-2.0-flash",
"gemini-2.0-flash-001",
"gemini-2.0-flash-exp",
"gemini-2.0-flash-exp-image-generation",
"gemini-2.0-flash-lite",
"gemini-2.0-flash-lite-001",
"gemini-2.0-flash-lite-preview",
"gemini-2.0-flash-lite-preview-02-05",
"gemini-2.0-flash-preview-image-generation",
"gemini-2.0-flash-thinking-exp",
"gemini-2.0-flash-thinking-exp-01-21",
"gemini-2.0-flash-thinking-exp-1219",
"gemini-2.0-pro-exp",
"gemini-2.0-pro-exp-02-05",
"gemini-exp-1206",
"gemini-1.5-pro",
"gemini-1.5-flash",
"gemini-1.5-flash-8b",
"gemini-flash-latest",
"gemini-flash-lite-latest",
"gemini-pro-latest",
"gemini-2.0-flash-live-001",
"gemini-live-2.5-flash-preview",
"gemini-2.5-flash-live-preview",
"gemini-robotics-er-1.5-preview",
"gemini-gemma-2-27b-it",
"gemini-gemma-2-9b-it",
"gemma-3-1b-it",
"gemma-3-4b-it",
"gemma-3-12b-it",
"gemma-3-27b-it",
"gemma-3n-e2b-it",
"gemma-3n-e4b-it",
"learnlm-2.0-flash-experimental",
]
AzureModels: TypeAlias = Literal[
"gpt-3.5-turbo",
"gpt-3.5-turbo-0301",
"gpt-3.5-turbo-0613",
"gpt-3.5-turbo-16k",
"gpt-3.5-turbo-16k-0613",
"gpt-35-turbo",
"gpt-35-turbo-0125",
"gpt-35-turbo-1106",
"gpt-35-turbo-16k-0613",
"gpt-35-turbo-instruct-0914",
"gpt-4",
"gpt-4-0314",
"gpt-4-0613",
"gpt-4-1106-preview",
"gpt-4-0125-preview",
"gpt-4-32k",
"gpt-4-32k-0314",
"gpt-4-32k-0613",
"gpt-4-turbo",
"gpt-4-turbo-2024-04-09",
"gpt-4-vision",
"gpt-4o",
"gpt-4o-2024-05-13",
"gpt-4o-2024-08-06",
"gpt-4o-2024-11-20",
"gpt-4o-mini",
"gpt-5",
"o1",
"o1-mini",
"o1-preview",
"o3-mini",
"o3",
"o4-mini",
]
AZURE_MODELS: list[AzureModels] = [
"gpt-3.5-turbo",
"gpt-3.5-turbo-0301",
"gpt-3.5-turbo-0613",
"gpt-3.5-turbo-16k",
"gpt-3.5-turbo-16k-0613",
"gpt-35-turbo",
"gpt-35-turbo-0125",
"gpt-35-turbo-1106",
"gpt-35-turbo-16k-0613",
"gpt-35-turbo-instruct-0914",
"gpt-4",
"gpt-4-0314",
"gpt-4-0613",
"gpt-4-1106-preview",
"gpt-4-0125-preview",
"gpt-4-32k",
"gpt-4-32k-0314",
"gpt-4-32k-0613",
"gpt-4-turbo",
"gpt-4-turbo-2024-04-09",
"gpt-4-vision",
"gpt-4o",
"gpt-4o-2024-05-13",
"gpt-4o-2024-08-06",
"gpt-4o-2024-11-20",
"gpt-4o-mini",
"gpt-5",
"o1",
"o1-mini",
"o1-preview",
"o3-mini",
"o3",
"o4-mini",
]
BedrockModels: TypeAlias = Literal[
"ai21.jamba-1-5-large-v1:0",
"ai21.jamba-1-5-mini-v1:0",
"amazon.nova-lite-v1:0",
"amazon.nova-lite-v1:0:24k",
"amazon.nova-lite-v1:0:300k",
"amazon.nova-micro-v1:0",
"amazon.nova-micro-v1:0:128k",
"amazon.nova-micro-v1:0:24k",
"amazon.nova-premier-v1:0",
"amazon.nova-premier-v1:0:1000k",
"amazon.nova-premier-v1:0:20k",
"amazon.nova-premier-v1:0:8k",
"amazon.nova-premier-v1:0:mm",
"amazon.nova-pro-v1:0",
"amazon.nova-pro-v1:0:24k",
"amazon.nova-pro-v1:0:300k",
"amazon.titan-text-express-v1",
"amazon.titan-text-express-v1:0:8k",
"amazon.titan-text-lite-v1",
"amazon.titan-text-lite-v1:0:4k",
"amazon.titan-tg1-large",
"anthropic.claude-3-5-haiku-20241022-v1:0",
"anthropic.claude-3-5-sonnet-20240620-v1:0",
"anthropic.claude-3-5-sonnet-20241022-v2:0",
"anthropic.claude-3-7-sonnet-20250219-v1:0",
"anthropic.claude-3-haiku-20240307-v1:0",
"anthropic.claude-3-haiku-20240307-v1:0:200k",
"anthropic.claude-3-haiku-20240307-v1:0:48k",
"anthropic.claude-3-opus-20240229-v1:0",
"anthropic.claude-3-opus-20240229-v1:0:12k",
"anthropic.claude-3-opus-20240229-v1:0:200k",
"anthropic.claude-3-opus-20240229-v1:0:28k",
"anthropic.claude-3-sonnet-20240229-v1:0",
"anthropic.claude-3-sonnet-20240229-v1:0:200k",
"anthropic.claude-3-sonnet-20240229-v1:0:28k",
"anthropic.claude-haiku-4-5-20251001-v1:0",
"anthropic.claude-instant-v1:2:100k",
"anthropic.claude-opus-4-1-20250805-v1:0",
"anthropic.claude-opus-4-20250514-v1:0",
"anthropic.claude-sonnet-4-20250514-v1:0",
"anthropic.claude-sonnet-4-5-20250929-v1:0",
"anthropic.claude-v2:0:100k",
"anthropic.claude-v2:0:18k",
"anthropic.claude-v2:1:18k",
"anthropic.claude-v2:1:200k",
"cohere.command-r-plus-v1:0",
"cohere.command-r-v1:0",
"cohere.rerank-v3-5:0",
"deepseek.r1-v1:0",
"meta.llama3-1-70b-instruct-v1:0",
"meta.llama3-1-8b-instruct-v1:0",
"meta.llama3-2-11b-instruct-v1:0",
"meta.llama3-2-1b-instruct-v1:0",
"meta.llama3-2-3b-instruct-v1:0",
"meta.llama3-2-90b-instruct-v1:0",
"meta.llama3-3-70b-instruct-v1:0",
"meta.llama3-70b-instruct-v1:0",
"meta.llama3-8b-instruct-v1:0",
"meta.llama4-maverick-17b-instruct-v1:0",
"meta.llama4-scout-17b-instruct-v1:0",
"mistral.mistral-7b-instruct-v0:2",
"mistral.mistral-large-2402-v1:0",
"mistral.mistral-small-2402-v1:0",
"mistral.mixtral-8x7b-instruct-v0:1",
"mistral.pixtral-large-2502-v1:0",
"openai.gpt-oss-120b-1:0",
"openai.gpt-oss-20b-1:0",
"qwen.qwen3-32b-v1:0",
"qwen.qwen3-coder-30b-a3b-v1:0",
"twelvelabs.pegasus-1-2-v1:0",
]
BEDROCK_MODELS: list[BedrockModels] = [
"ai21.jamba-1-5-large-v1:0",
"ai21.jamba-1-5-mini-v1:0",
"amazon.nova-lite-v1:0",
"amazon.nova-lite-v1:0:24k",
"amazon.nova-lite-v1:0:300k",
"amazon.nova-micro-v1:0",
"amazon.nova-micro-v1:0:128k",
"amazon.nova-micro-v1:0:24k",
"amazon.nova-premier-v1:0",
"amazon.nova-premier-v1:0:1000k",
"amazon.nova-premier-v1:0:20k",
"amazon.nova-premier-v1:0:8k",
"amazon.nova-premier-v1:0:mm",
"amazon.nova-pro-v1:0",
"amazon.nova-pro-v1:0:24k",
"amazon.nova-pro-v1:0:300k",
"amazon.titan-text-express-v1",
"amazon.titan-text-express-v1:0:8k",
"amazon.titan-text-lite-v1",
"amazon.titan-text-lite-v1:0:4k",
"amazon.titan-tg1-large",
"anthropic.claude-3-5-haiku-20241022-v1:0",
"anthropic.claude-3-5-sonnet-20240620-v1:0",
"anthropic.claude-3-5-sonnet-20241022-v2:0",
"anthropic.claude-3-7-sonnet-20250219-v1:0",
"anthropic.claude-3-haiku-20240307-v1:0",
"anthropic.claude-3-haiku-20240307-v1:0:200k",
"anthropic.claude-3-haiku-20240307-v1:0:48k",
"anthropic.claude-3-opus-20240229-v1:0",
"anthropic.claude-3-opus-20240229-v1:0:12k",
"anthropic.claude-3-opus-20240229-v1:0:200k",
"anthropic.claude-3-opus-20240229-v1:0:28k",
"anthropic.claude-3-sonnet-20240229-v1:0",
"anthropic.claude-3-sonnet-20240229-v1:0:200k",
"anthropic.claude-3-sonnet-20240229-v1:0:28k",
"anthropic.claude-haiku-4-5-20251001-v1:0",
"anthropic.claude-instant-v1:2:100k",
"anthropic.claude-opus-4-1-20250805-v1:0",
"anthropic.claude-opus-4-20250514-v1:0",
"anthropic.claude-sonnet-4-20250514-v1:0",
"anthropic.claude-sonnet-4-5-20250929-v1:0",
"anthropic.claude-v2:0:100k",
"anthropic.claude-v2:0:18k",
"anthropic.claude-v2:1:18k",
"anthropic.claude-v2:1:200k",
"cohere.command-r-plus-v1:0",
"cohere.command-r-v1:0",
"cohere.rerank-v3-5:0",
"deepseek.r1-v1:0",
"meta.llama3-1-70b-instruct-v1:0",
"meta.llama3-1-8b-instruct-v1:0",
"meta.llama3-2-11b-instruct-v1:0",
"meta.llama3-2-1b-instruct-v1:0",
"meta.llama3-2-3b-instruct-v1:0",
"meta.llama3-2-90b-instruct-v1:0",
"meta.llama3-3-70b-instruct-v1:0",
"meta.llama3-70b-instruct-v1:0",
"meta.llama3-8b-instruct-v1:0",
"meta.llama4-maverick-17b-instruct-v1:0",
"meta.llama4-scout-17b-instruct-v1:0",
"mistral.mistral-7b-instruct-v0:2",
"mistral.mistral-large-2402-v1:0",
"mistral.mistral-small-2402-v1:0",
"mistral.mixtral-8x7b-instruct-v0:1",
"mistral.pixtral-large-2502-v1:0",
"openai.gpt-oss-120b-1:0",
"openai.gpt-oss-20b-1:0",
"qwen.qwen3-32b-v1:0",
"qwen.qwen3-coder-30b-a3b-v1:0",
"twelvelabs.pegasus-1-2-v1:0",
]
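Editor's note in sketch form: each provider's `Literal` alias and runtime list must currently be kept in sync by hand. If drift ever becomes a concern, `typing.get_args` can derive one from the other (an alternative pattern, not part of this change):

```python
from typing import Literal, TypeAlias, get_args

OpenAIModels: TypeAlias = Literal["gpt-4o", "gpt-4o-mini", "o3"]  # trimmed for brevity

# get_args() returns the Literal's members in declaration order,
# so the runtime list can never drift from the type alias.
OPENAI_MODELS: list[OpenAIModels] = list(get_args(OpenAIModels))

assert "gpt-4o" in OPENAI_MODELS
```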

View File

@@ -7,7 +7,14 @@ outbound and inbound messages at the transport level.
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Generic, TypeVar
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from pydantic_core import core_schema
if TYPE_CHECKING:
from pydantic import GetCoreSchemaHandler
from pydantic_core import CoreSchema
T = TypeVar("T")
@@ -25,6 +32,7 @@ class BaseInterceptor(ABC, Generic[T, U]):
U: Inbound message type (e.g., httpx.Response)
Example:
>>> import httpx
>>> class CustomInterceptor(BaseInterceptor[httpx.Request, httpx.Response]):
... def on_outbound(self, message: httpx.Request) -> httpx.Request:
... message.headers["X-Custom-Header"] = "value"
@@ -80,3 +88,46 @@ class BaseInterceptor(ABC, Generic[T, U]):
Modified message object.
"""
raise NotImplementedError
@classmethod
def __get_pydantic_core_schema__(
cls, _source_type: Any, _handler: GetCoreSchemaHandler
) -> CoreSchema:
"""Generate Pydantic core schema for BaseInterceptor.
This allows the generic BaseInterceptor to be used in Pydantic models
without requiring arbitrary_types_allowed=True. The schema validates
that the value is an instance of BaseInterceptor.
Args:
_source_type: The source type being validated (unused).
_handler: Handler for generating schemas (unused).
Returns:
A Pydantic core schema that validates BaseInterceptor instances.
"""
return core_schema.no_info_plain_validator_function(
_validate_interceptor,
serialization=core_schema.plain_serializer_function_ser_schema(
lambda x: x, return_schema=core_schema.any_schema()
),
)
def _validate_interceptor(value: Any) -> BaseInterceptor[T, U]:
"""Validate that the value is a BaseInterceptor instance.
Args:
value: The value to validate.
Returns:
The validated BaseInterceptor instance.
Raises:
ValueError: If the value is not a BaseInterceptor instance.
"""
if not isinstance(value, BaseInterceptor):
raise ValueError(
f"Expected BaseInterceptor instance, got {type(value).__name__}"
)
return value
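With this hook in place, an interceptor can be declared as an ordinary model field. A minimal sketch (the `LoggingInterceptor` and `ClientConfig` names are illustrative):

```python
import httpx
from pydantic import BaseModel

from crewai.llms.hooks.base import BaseInterceptor


class LoggingInterceptor(BaseInterceptor[httpx.Request, httpx.Response]):
    def on_outbound(self, message: httpx.Request) -> httpx.Request:
        print("->", message.method, message.url)
        return message

    def on_inbound(self, message: httpx.Response) -> httpx.Response:
        print("<-", message.status_code)
        return message


class ClientConfig(BaseModel):
    # No arbitrary_types_allowed needed: __get_pydantic_core_schema__
    # supplies an isinstance-based schema for this field.
    interceptor: BaseInterceptor[httpx.Request, httpx.Response]


config = ClientConfig(interceptor=LoggingInterceptor())
```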

View File

@@ -6,16 +6,52 @@ to enable request/response modification at the transport level.
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from collections.abc import Iterable
from typing import TYPE_CHECKING, TypedDict
import httpx
from httpx import (
AsyncHTTPTransport as _AsyncHTTPTransport,
HTTPTransport as _HTTPTransport,
)
from typing_extensions import NotRequired, Unpack
if TYPE_CHECKING:
from ssl import SSLContext
from httpx import Limits, Request, Response
from httpx._types import CertTypes, ProxyTypes
from crewai.llms.hooks.base import BaseInterceptor
class HTTPTransport(httpx.HTTPTransport):
class HTTPTransportKwargs(TypedDict, total=False):
"""Typed dictionary for httpx.HTTPTransport initialization parameters.
These parameters configure the underlying HTTP transport behavior including
SSL verification, proxies, connection limits, and low-level socket options.
"""
verify: bool | str | SSLContext
cert: NotRequired[CertTypes]
trust_env: bool
http1: bool
http2: bool
limits: Limits
proxy: NotRequired[ProxyTypes]
uds: NotRequired[str]
local_address: NotRequired[str]
retries: int
socket_options: NotRequired[
Iterable[
tuple[int, int, int]
| tuple[int, int, bytes | bytearray]
| tuple[int, int, None, int]
]
]
class HTTPTransport(_HTTPTransport):
"""HTTP transport that uses an interceptor for request/response modification.
This transport is used internally when a user provides a BaseInterceptor.
@@ -25,19 +61,19 @@ class HTTPTransport(httpx.HTTPTransport):
def __init__(
self,
interceptor: BaseInterceptor[httpx.Request, httpx.Response],
**kwargs: Any,
interceptor: BaseInterceptor[Request, Response],
**kwargs: Unpack[HTTPTransportKwargs],
) -> None:
"""Initialize transport with interceptor.
Args:
interceptor: HTTP interceptor for modifying raw request/response objects.
**kwargs: Additional arguments passed to httpx.HTTPTransport.
**kwargs: HTTPTransport configuration parameters (verify, cert, proxy, etc.).
"""
super().__init__(**kwargs)
self.interceptor = interceptor
def handle_request(self, request: httpx.Request) -> httpx.Response:
def handle_request(self, request: Request) -> Response:
"""Handle request with interception.
Args:
@@ -51,7 +87,7 @@ class HTTPTransport(httpx.HTTPTransport):
return self.interceptor.on_inbound(response)
class AsyncHTTPransport(httpx.AsyncHTTPTransport):
class AsyncHTTPTransport(_AsyncHTTPTransport):
"""Async HTTP transport that uses an interceptor for request/response modification.
This transport is used internally when a user provides a BaseInterceptor.
@@ -61,19 +97,19 @@ class AsyncHTTPransport(httpx.AsyncHTTPTransport):
def __init__(
self,
interceptor: BaseInterceptor[httpx.Request, httpx.Response],
**kwargs: Any,
interceptor: BaseInterceptor[Request, Response],
**kwargs: Unpack[HTTPTransportKwargs],
) -> None:
"""Initialize async transport with interceptor.
Args:
interceptor: HTTP interceptor for modifying raw request/response objects.
**kwargs: Additional arguments passed to httpx.AsyncHTTPTransport.
**kwargs: HTTPTransport configuration parameters (verify, cert, proxy, etc.).
"""
super().__init__(**kwargs)
self.interceptor = interceptor
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
async def handle_async_request(self, request: Request) -> Response:
"""Handle async request with interception.
Args:
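A hedged sketch of mounting the transport on an `httpx` client, reusing the `LoggingInterceptor` from the earlier sketch; the `crewai.llms.hooks.transport` module path is an assumption, since the diff does not show the file name:

```python
import httpx

from crewai.llms.hooks.transport import HTTPTransport  # assumed module path

# Only typed kwargs from HTTPTransportKwargs are passed through.
transport = HTTPTransport(interceptor=LoggingInterceptor(), retries=2)
client = httpx.Client(transport=transport)

# Every request/response now flows through on_outbound/on_inbound.
response = client.get("https://example.com")
```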

View File

@@ -94,6 +94,30 @@ class AnthropicCompletion(BaseLLM):
self.is_claude_3 = "claude-3" in model.lower()
self.supports_tools = self.is_claude_3 # Claude 3+ supports tool use
@property
def stop(self) -> list[str]:
"""Get stop sequences sent to the API."""
return self.stop_sequences
@stop.setter
def stop(self, value: list[str] | str | None) -> None:
"""Set stop sequences.
Synchronizes stop_sequences to ensure values set by CrewAgentExecutor
are properly sent to the Anthropic API.
Args:
value: Stop sequences as a list, single string, or None
"""
if value is None:
self.stop_sequences = []
elif isinstance(value, str):
self.stop_sequences = [value]
elif isinstance(value, list):
self.stop_sequences = value
else:
self.stop_sequences = []
def _get_client_params(self) -> dict[str, Any]:
"""Get client parameters."""

View File

@@ -277,14 +277,10 @@ class AzureCompletion(BaseLLM):
"stream": self.stream,
}
# Azure AI Inference SDK doesn't support json_schema response_format
# Use json_object instead and rely on client-side Pydantic validation
if response_model and self.is_openai_model:
params["response_format"] = {
"type": "json_schema",
"json_schema": {
"name": response_model.__name__,
"schema": response_model.model_json_schema(),
},
}
params["response_format"] = {"type": "json_object"}
# Only include model parameter for non-Azure OpenAI endpoints
# Azure OpenAI endpoints have the deployment name in the URL
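Because `json_object` only guarantees syntactically valid JSON, the client-side validation this change leans on looks roughly like this sketch (`Person` and `raw_content` are illustrative):

```python
from pydantic import BaseModel, ValidationError


class Person(BaseModel):
    name: str
    age: int


# raw_content: the JSON string returned by the model under
# response_format={"type": "json_object"}.
raw_content = '{"name": "Ada", "age": 36}'

try:
    person = Person.model_validate_json(raw_content)
except ValidationError:
    # Schema enforcement happens here, client-side, not on the server.
    ...
```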

View File

@@ -243,6 +243,30 @@ class BedrockCompletion(BaseLLM):
# Handle inference profiles for newer models
self.model_id = model
@property
def stop(self) -> list[str]:
"""Get stop sequences sent to the API."""
return list(self.stop_sequences)
@stop.setter
def stop(self, value: Sequence[str] | str | None) -> None:
"""Set stop sequences.
Synchronizes stop_sequences to ensure values set by CrewAgentExecutor
are properly sent to the Bedrock API.
Args:
value: Stop sequences as a Sequence, single string, or None
"""
if value is None:
self.stop_sequences = []
elif isinstance(value, str):
self.stop_sequences = [value]
elif isinstance(value, Sequence):
self.stop_sequences = list(value)
else:
self.stop_sequences = []
def call(
self,
messages: str | list[LLMMessage],

View File

@@ -104,6 +104,30 @@ class GeminiCompletion(BaseLLM):
self.is_gemini_1_5 = "gemini-1.5" in model.lower()
self.supports_tools = self.is_gemini_1_5 or self.is_gemini_2
@property
def stop(self) -> list[str]:
"""Get stop sequences sent to the API."""
return self.stop_sequences
@stop.setter
def stop(self, value: list[str] | str | None) -> None:
"""Set stop sequences.
Synchronizes stop_sequences to ensure values set by CrewAgentExecutor
are properly sent to the Gemini API.
Args:
value: Stop sequences as a list, single string, or None
"""
if value is None:
self.stop_sequences = []
elif isinstance(value, str):
self.stop_sequences = [value]
elif isinstance(value, list):
self.stop_sequences = value
else:
self.stop_sequences = []
def _initialize_client(self, use_vertexai: bool = False) -> genai.Client: # type: ignore[no-any-unimported]
"""Initialize the Google Gen AI client with proper parameter handling.

View File

@@ -0,0 +1,37 @@
"""MCP (Model Context Protocol) client support for CrewAI agents.
This module provides native MCP client functionality, allowing CrewAI agents
to connect to any MCP-compliant server using various transport types.
"""
from crewai.mcp.client import MCPClient
from crewai.mcp.config import (
MCPServerConfig,
MCPServerHTTP,
MCPServerSSE,
MCPServerStdio,
)
from crewai.mcp.filters import (
StaticToolFilter,
ToolFilter,
ToolFilterContext,
create_dynamic_tool_filter,
create_static_tool_filter,
)
from crewai.mcp.transports.base import BaseTransport, TransportType
__all__ = [
"BaseTransport",
"MCPClient",
"MCPServerConfig",
"MCPServerHTTP",
"MCPServerSSE",
"MCPServerStdio",
"StaticToolFilter",
"ToolFilter",
"ToolFilterContext",
"TransportType",
"create_dynamic_tool_filter",
"create_static_tool_filter",
]

View File

@@ -0,0 +1,742 @@
"""MCP client with session management for CrewAI agents."""
import asyncio
from collections.abc import Callable
from contextlib import AsyncExitStack
from datetime import datetime
import logging
import time
from typing import Any
from typing_extensions import Self
# BaseExceptionGroup is available in Python 3.11+
try:
from builtins import BaseExceptionGroup
except ImportError:
# Fallback for Python < 3.11 (shouldn't happen in practice)
BaseExceptionGroup = Exception
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.mcp_events import (
MCPConnectionCompletedEvent,
MCPConnectionFailedEvent,
MCPConnectionStartedEvent,
MCPToolExecutionCompletedEvent,
MCPToolExecutionFailedEvent,
MCPToolExecutionStartedEvent,
)
from crewai.mcp.transports.base import BaseTransport
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport
# MCP Connection timeout constants (in seconds)
MCP_CONNECTION_TIMEOUT = 30 # Increased for slow servers
MCP_TOOL_EXECUTION_TIMEOUT = 30
MCP_DISCOVERY_TIMEOUT = 30 # Increased for slow servers
MCP_MAX_RETRIES = 3
# Simple in-memory cache for MCP tool schemas (duration: 5 minutes)
_mcp_schema_cache: dict[str, tuple[dict[str, Any], float]] = {}
_cache_ttl = 300 # 5 minutes
class MCPClient:
"""MCP client with session management.
This client manages connections to MCP servers and provides a high-level
interface for interacting with MCP tools, prompts, and resources.
Example:
```python
transport = StdioTransport(command="python", args=["server.py"])
client = MCPClient(transport)
async with client:
tools = await client.list_tools()
result = await client.call_tool("tool_name", {"arg": "value"})
```
"""
def __init__(
self,
transport: BaseTransport,
connect_timeout: int = MCP_CONNECTION_TIMEOUT,
execution_timeout: int = MCP_TOOL_EXECUTION_TIMEOUT,
discovery_timeout: int = MCP_DISCOVERY_TIMEOUT,
max_retries: int = MCP_MAX_RETRIES,
cache_tools_list: bool = False,
logger: logging.Logger | None = None,
) -> None:
"""Initialize MCP client.
Args:
transport: Transport instance for MCP server connection.
connect_timeout: Connection timeout in seconds.
execution_timeout: Tool execution timeout in seconds.
discovery_timeout: Tool discovery timeout in seconds.
max_retries: Maximum retry attempts for operations.
cache_tools_list: Whether to cache tool list results.
logger: Optional logger instance.
"""
self.transport = transport
self.connect_timeout = connect_timeout
self.execution_timeout = execution_timeout
self.discovery_timeout = discovery_timeout
self.max_retries = max_retries
self.cache_tools_list = cache_tools_list
self._logger = logger or logging.getLogger(__name__)
self._session: Any = None
self._initialized = False
self._exit_stack = AsyncExitStack()
self._was_connected = False
@property
def connected(self) -> bool:
"""Check if client is connected to server."""
return self.transport.connected and self._initialized
@property
def session(self) -> Any:
"""Get the MCP session."""
if self._session is None:
raise RuntimeError("Client not connected. Call connect() first.")
return self._session
def _get_server_info(self) -> tuple[str, str | None, str | None]:
"""Get server information for events.
Returns:
Tuple of (server_name, server_url, transport_type).
"""
if isinstance(self.transport, StdioTransport):
server_name = f"{self.transport.command} {' '.join(self.transport.args)}"
server_url = None
transport_type = self.transport.transport_type.value
elif isinstance(self.transport, HTTPTransport):
server_name = self.transport.url
server_url = self.transport.url
transport_type = self.transport.transport_type.value
elif isinstance(self.transport, SSETransport):
server_name = self.transport.url
server_url = self.transport.url
transport_type = self.transport.transport_type.value
else:
server_name = "Unknown MCP Server"
server_url = None
transport_type = (
self.transport.transport_type.value
if hasattr(self.transport, "transport_type")
else None
)
return server_name, server_url, transport_type
async def connect(self) -> Self:
"""Connect to MCP server and initialize session.
Returns:
Self for method chaining.
Raises:
ConnectionError: If connection fails.
ImportError: If MCP SDK not available.
"""
if self.connected:
return self
# Get server info for events
server_name, server_url, transport_type = self._get_server_info()
is_reconnect = self._was_connected
# Emit connection started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
MCPConnectionStartedEvent(
server_name=server_name,
server_url=server_url,
transport_type=transport_type,
is_reconnect=is_reconnect,
connect_timeout=self.connect_timeout,
),
)
try:
from mcp import ClientSession
# Use AsyncExitStack to manage transport and session contexts together
# This ensures they're in the same async scope and prevents cancel scope errors
# Always enter transport context via exit stack (it handles already-connected state)
await self._exit_stack.enter_async_context(self.transport)
# Create ClientSession with transport streams
self._session = ClientSession(
self.transport.read_stream,
self.transport.write_stream,
)
# Enter the session's async context manager via exit stack
await self._exit_stack.enter_async_context(self._session)
# Initialize the session (required by MCP protocol)
try:
await asyncio.wait_for(
self._session.initialize(),
timeout=self.connect_timeout,
)
except asyncio.CancelledError:
# If initialization was cancelled (e.g., event loop closing),
# cleanup and re-raise - don't suppress cancellation
await self._cleanup_on_error()
raise
except BaseExceptionGroup as eg:
# Handle exception groups from anyio task groups
# Extract the actual meaningful error (not GeneratorExit)
actual_error = None
for exc in eg.exceptions:
if isinstance(exc, Exception) and not isinstance(
exc, GeneratorExit
):
# Check if it's an HTTP error (like 401)
error_msg = str(exc).lower()
if "401" in error_msg or "unauthorized" in error_msg:
actual_error = exc
break
if "cancel scope" not in error_msg and "task" not in error_msg:
actual_error = exc
break
await self._cleanup_on_error()
if actual_error:
raise ConnectionError(
f"Failed to connect to MCP server: {actual_error}"
) from actual_error
raise ConnectionError(f"Failed to connect to MCP server: {eg}") from eg
self._initialized = True
self._was_connected = True
completed_at = datetime.now()
connection_duration_ms = (completed_at - started_at).total_seconds() * 1000
crewai_event_bus.emit(
self,
MCPConnectionCompletedEvent(
server_name=server_name,
server_url=server_url,
transport_type=transport_type,
started_at=started_at,
completed_at=completed_at,
connection_duration_ms=connection_duration_ms,
is_reconnect=is_reconnect,
),
)
return self
except ImportError as e:
await self._cleanup_on_error()
error_msg = (
"MCP library not available. Please install with: pip install mcp"
)
self._emit_connection_failed(
server_name,
server_url,
transport_type,
error_msg,
"import_error",
started_at,
)
raise ImportError(error_msg) from e
except asyncio.TimeoutError as e:
await self._cleanup_on_error()
error_msg = f"MCP connection timed out after {self.connect_timeout} seconds. The server may be slow or unreachable."
self._emit_connection_failed(
server_name,
server_url,
transport_type,
error_msg,
"timeout",
started_at,
)
raise ConnectionError(error_msg) from e
except asyncio.CancelledError:
# Re-raise cancellation - don't suppress it
await self._cleanup_on_error()
self._emit_connection_failed(
server_name,
server_url,
transport_type,
"Connection cancelled",
"cancelled",
started_at,
)
raise
except BaseExceptionGroup as eg:
# Handle exception groups from anyio task groups at outer level
actual_error = None
for exc in eg.exceptions:
if isinstance(exc, Exception) and not isinstance(exc, GeneratorExit):
error_msg = str(exc).lower()
if "401" in error_msg or "unauthorized" in error_msg:
actual_error = exc
break
if "cancel scope" not in error_msg and "task" not in error_msg:
actual_error = exc
break
await self._cleanup_on_error()
error_type = (
"authentication"
if actual_error
and (
"401" in str(actual_error).lower()
or "unauthorized" in str(actual_error).lower()
)
else "network"
)
error_msg = str(actual_error) if actual_error else str(eg)
self._emit_connection_failed(
server_name,
server_url,
transport_type,
error_msg,
error_type,
started_at,
)
if actual_error:
raise ConnectionError(
f"Failed to connect to MCP server: {actual_error}"
) from actual_error
raise ConnectionError(f"Failed to connect to MCP server: {eg}") from eg
except Exception as e:
await self._cleanup_on_error()
error_type = (
"authentication"
if "401" in str(e).lower() or "unauthorized" in str(e).lower()
else "network"
)
self._emit_connection_failed(
server_name, server_url, transport_type, str(e), error_type, started_at
)
raise ConnectionError(f"Failed to connect to MCP server: {e}") from e
def _emit_connection_failed(
self,
server_name: str,
server_url: str | None,
transport_type: str | None,
error: str,
error_type: str,
started_at: datetime,
) -> None:
"""Emit connection failed event."""
failed_at = datetime.now()
crewai_event_bus.emit(
self,
MCPConnectionFailedEvent(
server_name=server_name,
server_url=server_url,
transport_type=transport_type,
error=error,
error_type=error_type,
started_at=started_at,
failed_at=failed_at,
),
)
async def _cleanup_on_error(self) -> None:
"""Cleanup resources when an error occurs during connection."""
try:
await self._exit_stack.aclose()
except Exception:
# Best-effort cleanup: swallow secondary errors here so the original
# connection failure is the exception that propagates to the caller.
pass
finally:
self._session = None
self._initialized = False
self._exit_stack = AsyncExitStack()
async def disconnect(self) -> None:
"""Disconnect from MCP server and cleanup resources."""
if not self.connected:
return
try:
await self._exit_stack.aclose()
except Exception as e:
raise RuntimeError(f"Error during MCP client disconnect: {e}") from e
finally:
self._session = None
self._initialized = False
self._exit_stack = AsyncExitStack()
async def list_tools(self, use_cache: bool | None = None) -> list[dict[str, Any]]:
"""List available tools from MCP server.
Args:
use_cache: Whether to use cached results. If None, uses
client's cache_tools_list setting.
Returns:
List of tool definitions with name, description, and inputSchema.
"""
if not self.connected:
await self.connect()
# Check cache if enabled
use_cache = use_cache if use_cache is not None else self.cache_tools_list
if use_cache:
cache_key = self._get_cache_key("tools")
if cache_key in _mcp_schema_cache:
cached_data, cache_time = _mcp_schema_cache[cache_key]
if time.time() - cache_time < _cache_ttl:
# Cache hit within the TTL - return cached data
return cached_data
# List tools with timeout and retries
tools = await self._retry_operation(
self._list_tools_impl,
timeout=self.discovery_timeout,
)
# Cache results if enabled
if use_cache:
cache_key = self._get_cache_key("tools")
_mcp_schema_cache[cache_key] = (tools, time.time())
return tools
async def _list_tools_impl(self) -> list[dict[str, Any]]:
"""Internal implementation of list_tools."""
tools_result = await asyncio.wait_for(
self.session.list_tools(),
timeout=self.discovery_timeout,
)
return [
{
"name": tool.name,
"description": getattr(tool, "description", ""),
"inputSchema": getattr(tool, "inputSchema", {}),
}
for tool in tools_result.tools
]
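Usage sketch for the cache path, using the import surface shown in this PR; the second call is served from `_mcp_schema_cache` while the TTL holds:

```python
from crewai.mcp import MCPClient
from crewai.mcp.transports.stdio import StdioTransport

transport = StdioTransport(command="python", args=["server.py"])
client = MCPClient(transport, cache_tools_list=True)

# Run inside an event loop, e.g. via asyncio.run(...).
async with client:
    tools = await client.list_tools()        # network round-trip
    tools_again = await client.list_tools()  # cache hit while the 5-minute TTL holds
```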
async def call_tool(
self, tool_name: str, arguments: dict[str, Any] | None = None
) -> Any:
"""Call a tool on the MCP server.
Args:
tool_name: Name of the tool to call.
arguments: Tool arguments.
Returns:
Tool execution result.
"""
if not self.connected:
await self.connect()
arguments = arguments or {}
cleaned_arguments = self._clean_tool_arguments(arguments)
# Get server info for events
server_name, server_url, transport_type = self._get_server_info()
# Emit tool execution started event
started_at = datetime.now()
crewai_event_bus.emit(
self,
MCPToolExecutionStartedEvent(
server_name=server_name,
server_url=server_url,
transport_type=transport_type,
tool_name=tool_name,
tool_args=cleaned_arguments,
),
)
try:
result = await self._retry_operation(
lambda: self._call_tool_impl(tool_name, cleaned_arguments),
timeout=self.execution_timeout,
)
completed_at = datetime.now()
execution_duration_ms = (completed_at - started_at).total_seconds() * 1000
crewai_event_bus.emit(
self,
MCPToolExecutionCompletedEvent(
server_name=server_name,
server_url=server_url,
transport_type=transport_type,
tool_name=tool_name,
tool_args=cleaned_arguments,
result=result,
started_at=started_at,
completed_at=completed_at,
execution_duration_ms=execution_duration_ms,
),
)
return result
except Exception as e:
failed_at = datetime.now()
error_type = (
"timeout"
if isinstance(e, (asyncio.TimeoutError, ConnectionError))
and "timeout" in str(e).lower()
else "server_error"
)
crewai_event_bus.emit(
self,
MCPToolExecutionFailedEvent(
server_name=server_name,
server_url=server_url,
transport_type=transport_type,
tool_name=tool_name,
tool_args=cleaned_arguments,
error=str(e),
error_type=error_type,
started_at=started_at,
failed_at=failed_at,
),
)
raise
def _clean_tool_arguments(self, arguments: dict[str, Any]) -> dict[str, Any]:
"""Clean tool arguments by removing None values and fixing formats.
Args:
arguments: Raw tool arguments.
Returns:
Cleaned arguments ready for MCP server.
"""
cleaned = {}
for key, value in arguments.items():
# Skip None values
if value is None:
continue
# Fix sources array format: convert ["web"] to [{"type": "web"}]
if key == "sources" and isinstance(value, list):
fixed_sources = []
for item in value:
if isinstance(item, str):
# Convert string to object format
fixed_sources.append({"type": item})
elif isinstance(item, dict):
# Already in correct format
fixed_sources.append(item)
else:
# Keep as is if unknown format
fixed_sources.append(item)
if fixed_sources:
cleaned[key] = fixed_sources
continue
# Recursively clean nested dictionaries
if isinstance(value, dict):
nested_cleaned = self._clean_tool_arguments(value)
if nested_cleaned: # Only add if not empty
cleaned[key] = nested_cleaned
elif isinstance(value, list):
# Clean list items
cleaned_list = []
for item in value:
if isinstance(item, dict):
cleaned_item = self._clean_tool_arguments(item)
if cleaned_item:
cleaned_list.append(cleaned_item)
elif item is not None:
cleaned_list.append(item)
if cleaned_list:
cleaned[key] = cleaned_list
else:
# Keep primitive values
cleaned[key] = value
return cleaned
async def _call_tool_impl(self, tool_name: str, arguments: dict[str, Any]) -> Any:
"""Internal implementation of call_tool."""
result = await asyncio.wait_for(
self.session.call_tool(tool_name, arguments),
timeout=self.execution_timeout,
)
# Extract result content
if hasattr(result, "content") and result.content:
if isinstance(result.content, list) and len(result.content) > 0:
content_item = result.content[0]
if hasattr(content_item, "text"):
return str(content_item.text)
return str(content_item)
return str(result.content)
return str(result)
async def list_prompts(self) -> list[dict[str, Any]]:
"""List available prompts from MCP server.
Returns:
List of prompt definitions.
"""
if not self.connected:
await self.connect()
return await self._retry_operation(
self._list_prompts_impl,
timeout=self.discovery_timeout,
)
async def _list_prompts_impl(self) -> list[dict[str, Any]]:
"""Internal implementation of list_prompts."""
prompts_result = await asyncio.wait_for(
self.session.list_prompts(),
timeout=self.discovery_timeout,
)
return [
{
"name": prompt.name,
"description": getattr(prompt, "description", ""),
"arguments": getattr(prompt, "arguments", []),
}
for prompt in prompts_result.prompts
]
async def get_prompt(
self, prompt_name: str, arguments: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Get a prompt from the MCP server.
Args:
prompt_name: Name of the prompt to get.
arguments: Optional prompt arguments.
Returns:
Prompt content and metadata.
"""
if not self.connected:
await self.connect()
arguments = arguments or {}
return await self._retry_operation(
lambda: self._get_prompt_impl(prompt_name, arguments),
timeout=self.execution_timeout,
)
async def _get_prompt_impl(
self, prompt_name: str, arguments: dict[str, Any]
) -> dict[str, Any]:
"""Internal implementation of get_prompt."""
result = await asyncio.wait_for(
self.session.get_prompt(prompt_name, arguments),
timeout=self.execution_timeout,
)
return {
"name": prompt_name,
"messages": [
{
"role": msg.role,
"content": msg.content,
}
for msg in result.messages
],
"arguments": arguments,
}
async def _retry_operation(
self,
operation: Callable[[], Any],
timeout: int | None = None,
) -> Any:
"""Retry an operation with exponential backoff.
Args:
operation: Async operation to retry.
timeout: Operation timeout in seconds.
Returns:
Operation result.
"""
last_error = None
timeout = timeout or self.execution_timeout
for attempt in range(self.max_retries):
try:
if timeout:
return await asyncio.wait_for(operation(), timeout=timeout)
return await operation()
except asyncio.TimeoutError as e: # noqa: PERF203
last_error = f"Operation timed out after {timeout} seconds"
if attempt < self.max_retries - 1:
wait_time = 2**attempt
await asyncio.sleep(wait_time)
else:
raise ConnectionError(last_error) from e
except Exception as e:
error_str = str(e).lower()
# Classify errors as retryable or non-retryable
if "authentication" in error_str or "unauthorized" in error_str:
raise ConnectionError(f"Authentication failed: {e}") from e
if "not found" in error_str:
raise ValueError(f"Resource not found: {e}") from e
# Retryable errors
last_error = str(e)
if attempt < self.max_retries - 1:
wait_time = 2**attempt
await asyncio.sleep(wait_time)
else:
raise ConnectionError(
f"Operation failed after {self.max_retries} attempts: {last_error}"
) from e
raise ConnectionError(f"Operation failed: {last_error}")
def _get_cache_key(self, resource_type: str) -> str:
"""Generate cache key for resource.
Args:
resource_type: Type of resource (e.g., "tools", "prompts").
Returns:
Cache key string.
"""
# Use transport type and URL/command as cache key
if isinstance(self.transport, StdioTransport):
key = f"stdio:{self.transport.command}:{':'.join(self.transport.args)}"
elif isinstance(self.transport, HTTPTransport):
key = f"http:{self.transport.url}"
elif isinstance(self.transport, SSETransport):
key = f"sse:{self.transport.url}"
else:
key = f"{self.transport.transport_type}:unknown"
return f"mcp:{key}:{resource_type}"
async def __aenter__(self) -> Self:
"""Async context manager entry."""
return await self.connect()
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> None:
"""Async context manager exit."""
await self.disconnect()
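Putting the pieces together, the connect/retry/caching paths above support a compact context-manager usage pattern. A minimal sketch, assuming this class is exported as `MCPClient` from `crewai.mcp.client` and accepts a transport instance (both assumptions; the diff does not show the module header):

```python
# Hedged usage sketch for the client above; MCPClient's import path and
# constructor signature are assumed, not confirmed by this diff.
import asyncio

from crewai.mcp.client import MCPClient  # assumed module path
from crewai.mcp.transports.http import HTTPTransport


async def main() -> None:
    transport = HTTPTransport(
        url="https://api.example.com/mcp",  # hypothetical endpoint
        headers={"Authorization": "Bearer ..."},
    )
    # __aenter__ calls connect(); __aexit__ calls disconnect().
    async with MCPClient(transport=transport, cache_tools_list=True) as client:
        tools = await client.list_tools()  # second call is served from cache
        print([t["name"] for t in tools])
        result = await client.call_tool("search", {"query": "crewai"})
        print(result)


asyncio.run(main())
```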

View File

@@ -0,0 +1,124 @@
"""MCP server configuration models for CrewAI agents.
This module provides Pydantic models for configuring MCP servers with
various transport types, similar to OpenAI's Agents SDK.
"""
from pydantic import BaseModel, Field
from crewai.mcp.filters import ToolFilter
class MCPServerStdio(BaseModel):
"""Stdio MCP server configuration.
This configuration is used for connecting to local MCP servers
that run as processes and communicate via standard input/output.
Example:
```python
mcp_server = MCPServerStdio(
command="python",
args=["path/to/server.py"],
env={"API_KEY": "..."},
tool_filter=create_static_tool_filter(
allowed_tool_names=["read_file", "write_file"]
),
)
```
"""
command: str = Field(
...,
description="Command to execute (e.g., 'python', 'node', 'npx', 'uvx').",
)
args: list[str] = Field(
default_factory=list,
description="Command arguments (e.g., ['server.py'] or ['-y', '@mcp/server']).",
)
env: dict[str, str] | None = Field(
default=None,
description="Environment variables to pass to the process.",
)
tool_filter: ToolFilter | None = Field(
default=None,
description="Optional tool filter for filtering available tools.",
)
cache_tools_list: bool = Field(
default=False,
description="Whether to cache the tool list for faster subsequent access.",
)
class MCPServerHTTP(BaseModel):
"""HTTP/Streamable HTTP MCP server configuration.
This configuration is used for connecting to remote MCP servers
over HTTP/HTTPS using streamable HTTP transport.
Example:
```python
mcp_server = MCPServerHTTP(
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer ..."},
cache_tools_list=True,
)
```
"""
url: str = Field(
..., description="Server URL (e.g., 'https://api.example.com/mcp')."
)
headers: dict[str, str] | None = Field(
default=None,
description="Optional HTTP headers for authentication or other purposes.",
)
streamable: bool = Field(
default=True,
description="Whether to use streamable HTTP transport (default: True).",
)
tool_filter: ToolFilter | None = Field(
default=None,
description="Optional tool filter for filtering available tools.",
)
cache_tools_list: bool = Field(
default=False,
description="Whether to cache the tool list for faster subsequent access.",
)
class MCPServerSSE(BaseModel):
"""Server-Sent Events (SSE) MCP server configuration.
This configuration is used for connecting to remote MCP servers
using Server-Sent Events for real-time streaming communication.
Example:
```python
mcp_server = MCPServerSSE(
url="https://api.example.com/mcp/sse",
headers={"Authorization": "Bearer ..."},
)
```
"""
url: str = Field(
...,
description="Server URL (e.g., 'https://api.example.com/mcp/sse').",
)
headers: dict[str, str] | None = Field(
default=None,
description="Optional HTTP headers for authentication or other purposes.",
)
tool_filter: ToolFilter | None = Field(
default=None,
description="Optional tool filter for filtering available tools.",
)
cache_tools_list: bool = Field(
default=False,
description="Whether to cache the tool list for faster subsequent access.",
)
# Type alias for all MCP server configurations
MCPServerConfig = MCPServerStdio | MCPServerHTTP | MCPServerSSE
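Because `MCPServerConfig` is a plain union, downstream code can dispatch on the concrete model with `isinstance`. A hedged sketch; the `build_transport` helper is illustrative glue, not part of this diff, though the transport constructors match the classes defined later in it:

```python
# Hedged sketch: map each config model to its transport. build_transport is
# a hypothetical helper, not part of this diff.
from crewai.mcp.transports.base import BaseTransport
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport


def build_transport(config: MCPServerConfig) -> BaseTransport:
    if isinstance(config, MCPServerStdio):
        return StdioTransport(command=config.command, args=config.args, env=config.env)
    if isinstance(config, MCPServerSSE):
        return SSETransport(url=config.url, headers=config.headers)
    return HTTPTransport(
        url=config.url, headers=config.headers, streamable=config.streamable
    )


transport = build_transport(MCPServerStdio(command="python", args=["server.py"]))
```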

View File

@@ -0,0 +1,166 @@
"""Tool filtering support for MCP servers.
This module provides utilities for filtering tools from MCP servers,
including static allow/block lists and dynamic context-aware filtering.
"""
from collections.abc import Callable
from typing import Any
from pydantic import BaseModel, Field
class ToolFilterContext(BaseModel):
"""Context for dynamic tool filtering.
This context is passed to dynamic tool filters to provide
information about the agent, run context, and server.
"""
agent: Any = Field(..., description="The agent requesting tools.")
server_name: str = Field(..., description="Name of the MCP server.")
run_context: dict[str, Any] | None = Field(
default=None,
description="Optional run context for additional filtering logic.",
)
# Type alias for tool filter functions
ToolFilter = (
Callable[[ToolFilterContext, dict[str, Any]], bool]
| Callable[[dict[str, Any]], bool]
)
class StaticToolFilter:
"""Static tool filter with allow/block lists.
This filter provides simple allow/block list filtering based on
tool names. Useful for restricting which tools are available
from an MCP server.
Example:
```python
filter = StaticToolFilter(
allowed_tool_names=["read_file", "write_file"],
blocked_tool_names=["delete_file"],
)
```
"""
def __init__(
self,
allowed_tool_names: list[str] | None = None,
blocked_tool_names: list[str] | None = None,
) -> None:
"""Initialize static tool filter.
Args:
allowed_tool_names: List of tool names to allow. If None,
all tools are allowed (unless blocked).
blocked_tool_names: List of tool names to block. Blocked tools
take precedence over allowed tools.
"""
self.allowed_tool_names = set(allowed_tool_names or [])
self.blocked_tool_names = set(blocked_tool_names or [])
def __call__(self, tool: dict[str, Any]) -> bool:
"""Filter tool based on allow/block lists.
Args:
tool: Tool definition dictionary with at least 'name' key.
Returns:
True if tool should be included, False otherwise.
"""
tool_name = tool.get("name", "")
# Blocked tools take precedence
if self.blocked_tool_names and tool_name in self.blocked_tool_names:
return False
# If allow list exists, tool must be in it
if self.allowed_tool_names:
return tool_name in self.allowed_tool_names
# No restrictions - allow all
return True
def create_static_tool_filter(
allowed_tool_names: list[str] | None = None,
blocked_tool_names: list[str] | None = None,
) -> Callable[[dict[str, Any]], bool]:
"""Create a static tool filter function.
This is a convenience function for creating static tool filters
with allow/block lists.
Args:
allowed_tool_names: List of tool names to allow. If None,
all tools are allowed (unless blocked).
blocked_tool_names: List of tool names to block. Blocked tools
take precedence over allowed tools.
Returns:
Tool filter function that returns True for allowed tools.
Example:
```python
filter_fn = create_static_tool_filter(
allowed_tool_names=["read_file", "write_file"],
blocked_tool_names=["delete_file"],
)
# Use in MCPServerStdio
mcp_server = MCPServerStdio(
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem"],
tool_filter=filter_fn,
)
```
"""
return StaticToolFilter(
allowed_tool_names=allowed_tool_names,
blocked_tool_names=blocked_tool_names,
)
def create_dynamic_tool_filter(
filter_func: Callable[[ToolFilterContext, dict[str, Any]], bool],
) -> Callable[[ToolFilterContext, dict[str, Any]], bool]:
"""Create a dynamic tool filter function.
This function wraps a dynamic filter function that has access
to the tool filter context (agent, server, run context).
Args:
filter_func: Function that takes (context, tool) and returns bool.
Returns:
Tool filter function that can be used with MCP server configs.
Example:
```python
async def context_aware_filter(
context: ToolFilterContext, tool: dict[str, Any]
) -> bool:
# Block dangerous tools for code reviewers
if context.agent.role == "Code Reviewer":
if tool["name"].startswith("danger_"):
return False
return True
filter_fn = create_dynamic_tool_filter(context_aware_filter)
mcp_server = MCPServerStdio(
command="python", args=["server.py"], tool_filter=filter_fn
)
```
"""
return filter_func
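Filters receive plain tool dictionaries, so they compose naturally with the list returned by `list_tools()`. A small worked example of the allow/block precedence, using only names defined in this file:

```python
# Worked example: block lists win over allow lists, and an allow list
# excludes anything not explicitly named.
filter_fn = create_static_tool_filter(
    allowed_tool_names=["read_file", "write_file"],
    blocked_tool_names=["write_file"],
)

tools = [
    {"name": "read_file", "description": "", "inputSchema": {}},
    {"name": "write_file", "description": "", "inputSchema": {}},
    {"name": "delete_file", "description": "", "inputSchema": {}},
]

allowed = [t for t in tools if filter_fn(t)]
assert [t["name"] for t in allowed] == ["read_file"]
```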

View File

@@ -0,0 +1,15 @@
"""MCP transport implementations for various connection types."""
from crewai.mcp.transports.base import BaseTransport, TransportType
from crewai.mcp.transports.http import HTTPTransport
from crewai.mcp.transports.sse import SSETransport
from crewai.mcp.transports.stdio import StdioTransport
__all__ = [
"BaseTransport",
"HTTPTransport",
"SSETransport",
"StdioTransport",
"TransportType",
]

View File

@@ -0,0 +1,125 @@
"""Base transport interface for MCP connections."""
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Protocol
from typing_extensions import Self
class TransportType(str, Enum):
"""MCP transport types."""
STDIO = "stdio"
HTTP = "http"
STREAMABLE_HTTP = "streamable-http"
SSE = "sse"
class ReadStream(Protocol):
"""Protocol for read streams."""
async def read(self, n: int = -1) -> bytes:
"""Read bytes from stream."""
...
class WriteStream(Protocol):
"""Protocol for write streams."""
async def write(self, data: bytes) -> None:
"""Write bytes to stream."""
...
class BaseTransport(ABC):
"""Base class for MCP transport implementations.
This abstract base class defines the interface that all transport
implementations must follow. Transports handle the low-level communication
with MCP servers.
"""
def __init__(self, **kwargs: Any) -> None:
"""Initialize the transport.
Args:
**kwargs: Transport-specific configuration options.
"""
self._read_stream: ReadStream | None = None
self._write_stream: WriteStream | None = None
self._connected = False
@property
@abstractmethod
def transport_type(self) -> TransportType:
"""Return the transport type."""
...
@property
def connected(self) -> bool:
"""Check if transport is connected."""
return self._connected
@property
def read_stream(self) -> ReadStream:
"""Get the read stream."""
if self._read_stream is None:
raise RuntimeError("Transport not connected. Call connect() first.")
return self._read_stream
@property
def write_stream(self) -> WriteStream:
"""Get the write stream."""
if self._write_stream is None:
raise RuntimeError("Transport not connected. Call connect() first.")
return self._write_stream
@abstractmethod
async def connect(self) -> Self:
"""Establish connection to MCP server.
Returns:
Self for method chaining.
Raises:
ConnectionError: If connection fails.
"""
...
@abstractmethod
async def disconnect(self) -> None:
"""Close connection to MCP server."""
...
@abstractmethod
async def __aenter__(self) -> Self:
"""Async context manager entry."""
...
@abstractmethod
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> None:
"""Async context manager exit."""
...
def _set_streams(self, read: ReadStream, write: WriteStream) -> None:
"""Set the read and write streams.
Args:
read: Read stream.
write: Write stream.
"""
self._read_stream = read
self._write_stream = write
self._connected = True
def _clear_streams(self) -> None:
"""Clear the read and write streams."""
self._read_stream = None
self._write_stream = None
self._connected = False
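Concrete transports only need to implement `transport_type`, `connect()`, `disconnect()`, and the context-manager methods; `_set_streams`/`_clear_streams` manage the shared state. A hedged sketch of a custom subclass (the loopback transport and its stream are illustrative, not part of this diff):

```python
# Hedged sketch: the minimal surface a custom transport must implement.
from typing_extensions import Self

from crewai.mcp.transports.base import BaseTransport, TransportType


class _LoopbackStream:
    """Tiny in-memory buffer satisfying the ReadStream/WriteStream protocols."""

    def __init__(self) -> None:
        self._buffer = bytearray()

    async def write(self, data: bytes) -> None:
        self._buffer.extend(data)

    async def read(self, n: int = -1) -> bytes:
        data = bytes(self._buffer if n < 0 else self._buffer[:n])
        del self._buffer[: len(data)]
        return data


class LoopbackTransport(BaseTransport):
    """In-memory transport, useful for unit tests."""

    @property
    def transport_type(self) -> TransportType:
        return TransportType.STDIO  # reusing an existing member for the sketch

    async def connect(self) -> Self:
        if not self._connected:
            stream = _LoopbackStream()
            self._set_streams(read=stream, write=stream)
        return self

    async def disconnect(self) -> None:
        self._clear_streams()

    async def __aenter__(self) -> Self:
        return await self.connect()

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.disconnect()
```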

View File

@@ -0,0 +1,174 @@
"""HTTP and Streamable HTTP transport for MCP servers."""
import asyncio
from typing import Any
from typing_extensions import Self
# BaseExceptionGroup is available in Python 3.11+
try:
from builtins import BaseExceptionGroup
except ImportError:
# Fallback for Python < 3.11 (shouldn't happen in practice)
BaseExceptionGroup = Exception
from crewai.mcp.transports.base import BaseTransport, TransportType
class HTTPTransport(BaseTransport):
"""HTTP/Streamable HTTP transport for connecting to remote MCP servers.
This transport connects to MCP servers over HTTP/HTTPS using the
streamable HTTP client from the MCP SDK.
Example:
```python
transport = HTTPTransport(
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer ..."}
)
async with transport:
# Use transport...
```
"""
def __init__(
self,
url: str,
headers: dict[str, str] | None = None,
streamable: bool = True,
**kwargs: Any,
) -> None:
"""Initialize HTTP transport.
Args:
url: Server URL (e.g., "https://api.example.com/mcp").
headers: Optional HTTP headers.
streamable: Whether to use streamable HTTP (default: True).
**kwargs: Additional transport options.
"""
super().__init__(**kwargs)
self.url = url
self.headers = headers or {}
self.streamable = streamable
self._transport_context: Any = None
@property
def transport_type(self) -> TransportType:
"""Return the transport type."""
return TransportType.STREAMABLE_HTTP if self.streamable else TransportType.HTTP
async def connect(self) -> Self:
"""Establish HTTP connection to MCP server.
Returns:
Self for method chaining.
Raises:
ConnectionError: If connection fails.
ImportError: If MCP SDK not available.
"""
if self._connected:
return self
try:
from mcp.client.streamable_http import streamablehttp_client
self._transport_context = streamablehttp_client(
self.url,
headers=self.headers if self.headers else None,
terminate_on_close=True,
)
try:
read, write, _ = await asyncio.wait_for(
self._transport_context.__aenter__(), timeout=30.0
)
except asyncio.TimeoutError as e:
self._transport_context = None
raise ConnectionError(
"Transport context entry timed out after 30 seconds. "
"Server may be slow or unreachable."
) from e
except Exception as e:
self._transport_context = None
raise ConnectionError(f"Failed to enter transport context: {e}") from e
self._set_streams(read=read, write=write)
return self
except ImportError as e:
raise ImportError(
"MCP library not available. Please install with: pip install mcp"
) from e
except Exception as e:
self._clear_streams()
if self._transport_context is not None:
self._transport_context = None
raise ConnectionError(f"Failed to connect to MCP server: {e}") from e
async def disconnect(self) -> None:
"""Close HTTP connection."""
if not self._connected:
return
try:
# Clear streams first
self._clear_streams()
# Exit transport context - this will clean up background tasks
# Give a small delay to allow background tasks to complete
if self._transport_context is not None:
try:
# Wait a tiny bit for any pending operations
await asyncio.sleep(0.1)
await self._transport_context.__aexit__(None, None, None)
except (RuntimeError, asyncio.CancelledError) as e:
# Ignore "exit cancel scope in different task" errors and cancellation
# These happen when asyncio.run() closes the event loop
# while background tasks are still running
error_msg = str(e).lower()
if "cancel scope" not in error_msg and "task" not in error_msg:
# Only suppress cancel scope/task errors, re-raise others
if isinstance(e, RuntimeError):
raise
# For CancelledError, just suppress it
except BaseExceptionGroup as eg:
# Handle exception groups from anyio task groups
# Suppress if they contain cancel scope errors
should_suppress = False
for exc in eg.exceptions:
error_msg = str(exc).lower()
if "cancel scope" in error_msg or "task" in error_msg:
should_suppress = True
break
if not should_suppress:
raise
except Exception as e:
raise RuntimeError(
f"Error during HTTP transport disconnect: {e}"
) from e
self._connected = False
except Exception as e:
# Log but don't raise - cleanup should be best effort
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Error during HTTP transport disconnect: {e}")
async def __aenter__(self) -> Self:
"""Async context manager entry."""
return await self.connect()
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> None:
"""Async context manager exit."""
await self.disconnect()

View File

@@ -0,0 +1,113 @@
"""Server-Sent Events (SSE) transport for MCP servers."""
from typing import Any
from typing_extensions import Self
from crewai.mcp.transports.base import BaseTransport, TransportType
class SSETransport(BaseTransport):
"""SSE transport for connecting to remote MCP servers.
This transport connects to MCP servers using Server-Sent Events (SSE)
for real-time streaming communication.
Example:
```python
transport = SSETransport(
url="https://api.example.com/mcp/sse",
headers={"Authorization": "Bearer ..."}
)
async with transport:
# Use transport...
```
"""
def __init__(
self,
url: str,
headers: dict[str, str] | None = None,
**kwargs: Any,
) -> None:
"""Initialize SSE transport.
Args:
url: Server URL (e.g., "https://api.example.com/mcp/sse").
headers: Optional HTTP headers.
**kwargs: Additional transport options.
"""
super().__init__(**kwargs)
self.url = url
self.headers = headers or {}
self._transport_context: Any = None
@property
def transport_type(self) -> TransportType:
"""Return the transport type."""
return TransportType.SSE
async def connect(self) -> Self:
"""Establish SSE connection to MCP server.
Returns:
Self for method chaining.
Raises:
ConnectionError: If connection fails.
ImportError: If MCP SDK not available.
"""
if self._connected:
return self
try:
from mcp.client.sse import sse_client
self._transport_context = sse_client(
self.url,
headers=self.headers if self.headers else None,
terminate_on_close=True,
)
read, write = await self._transport_context.__aenter__()
self._set_streams(read=read, write=write)
return self
except ImportError as e:
raise ImportError(
"MCP library not available. Please install with: pip install mcp"
) from e
except Exception as e:
self._clear_streams()
raise ConnectionError(f"Failed to connect to SSE MCP server: {e}") from e
async def disconnect(self) -> None:
"""Close SSE connection."""
if not self._connected:
return
try:
self._clear_streams()
if self._transport_context is not None:
await self._transport_context.__aexit__(None, None, None)
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Error during SSE transport disconnect: {e}")
async def __aenter__(self) -> Self:
"""Async context manager entry."""
return await self.connect()
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> None:
"""Async context manager exit."""
await self.disconnect()

View File

@@ -0,0 +1,153 @@
"""Stdio transport for MCP servers running as local processes."""
import asyncio
import os
from typing import Any
from typing_extensions import Self
from crewai.mcp.transports.base import BaseTransport, TransportType
class StdioTransport(BaseTransport):
"""Stdio transport for connecting to local MCP servers.
This transport connects to MCP servers running as local processes,
communicating via standard input/output streams. Supports Python,
Node.js, and other command-line servers.
Example:
```python
transport = StdioTransport(
command="python",
args=["path/to/server.py"],
env={"API_KEY": "..."}
)
async with transport:
# Use transport...
```
"""
def __init__(
self,
command: str,
args: list[str] | None = None,
env: dict[str, str] | None = None,
**kwargs: Any,
) -> None:
"""Initialize stdio transport.
Args:
command: Command to execute (e.g., "python", "node", "npx").
args: Command arguments (e.g., ["server.py"] or ["-y", "@mcp/server"]).
env: Environment variables to pass to the process.
**kwargs: Additional transport options.
"""
super().__init__(**kwargs)
self.command = command
self.args = args or []
self.env = env or {}
self._process: asyncio.subprocess.Process | None = None
self._transport_context: Any = None
@property
def transport_type(self) -> TransportType:
"""Return the transport type."""
return TransportType.STDIO
async def connect(self) -> Self:
"""Start the MCP server process and establish connection.
Returns:
Self for method chaining.
Raises:
ConnectionError: If process fails to start.
ImportError: If MCP SDK not available.
"""
if self._connected:
return self
try:
from mcp import StdioServerParameters
from mcp.client.stdio import stdio_client
process_env = os.environ.copy()
process_env.update(self.env)
server_params = StdioServerParameters(
command=self.command,
args=self.args,
env=process_env if process_env else None,
)
self._transport_context = stdio_client(server_params)
try:
read, write = await self._transport_context.__aenter__()
except Exception as e:
self._transport_context = None
raise ConnectionError(
f"Failed to enter stdio transport context: {e}"
) from e
self._set_streams(read=read, write=write)
return self
except ImportError as e:
raise ImportError(
"MCP library not available. Please install with: pip install mcp"
) from e
except Exception as e:
self._clear_streams()
if self._transport_context is not None:
self._transport_context = None
raise ConnectionError(f"Failed to start MCP server process: {e}") from e
async def disconnect(self) -> None:
"""Terminate the MCP server process and close connection."""
if not self._connected:
return
try:
self._clear_streams()
if self._transport_context is not None:
await self._transport_context.__aexit__(None, None, None)
if self._process is not None:
try:
self._process.terminate()
try:
await asyncio.wait_for(self._process.wait(), timeout=5.0)
except asyncio.TimeoutError:
self._process.kill()
await self._process.wait()
finally:
self._process = None
except Exception as e:
# Log but don't raise - cleanup should be best effort
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Error during stdio transport disconnect: {e}")
async def __aenter__(self) -> Self:
"""Async context manager entry."""
return await self.connect()
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: Any,
) -> None:
"""Async context manager exit."""
await self.disconnect()
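Note that `connect()` merges `self.env` on top of a copy of `os.environ` before spawning the server, so only overrides need to be passed. A short usage sketch under that assumption:

```python
# Hedged sketch: env vars passed here are layered over the parent process
# environment (see the process_env handling in connect()).
import asyncio

from crewai.mcp.transports.stdio import StdioTransport


async def main() -> None:
    transport = StdioTransport(
        command="npx",
        args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
        env={"LOG_LEVEL": "debug"},  # added on top of os.environ
    )
    async with transport:
        assert transport.connected
        # transport.read_stream / transport.write_stream are now available
        # for an MCP ClientSession.


asyncio.run(main())
```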

View File

@@ -67,31 +67,44 @@ def _prepare_documents_for_chromadb(
ids: list[str] = []
texts: list[str] = []
metadatas: list[Mapping[str, str | int | float | bool]] = []
seen_ids: dict[str, int] = {}
try:
for doc in documents:
if "doc_id" in doc:
doc_id = str(doc["doc_id"])
else:
metadata = doc.get("metadata")
if metadata and isinstance(metadata, dict) and "doc_id" in metadata:
doc_id = str(metadata["doc_id"])
else:
content_for_hash = doc["content"]
if metadata:
metadata_str = json.dumps(metadata, sort_keys=True)
content_for_hash = f"{content_for_hash}|{metadata_str}"
doc_id = hashlib.sha256(content_for_hash.encode()).hexdigest()
for doc in documents:
if "doc_id" in doc:
ids.append(doc["doc_id"])
else:
content_for_hash = doc["content"]
metadata = doc.get("metadata")
if metadata:
metadata_str = json.dumps(metadata, sort_keys=True)
content_for_hash = f"{content_for_hash}|{metadata_str}"
content_hash = hashlib.blake2b(
content_for_hash.encode(), digest_size=32
).hexdigest()
ids.append(content_hash)
texts.append(doc["content"])
metadata = doc.get("metadata")
if metadata:
if isinstance(metadata, list):
metadatas.append(metadata[0] if metadata and metadata[0] else {})
if isinstance(metadata, list):
processed_metadata = metadata[0] if metadata and metadata[0] else {}
else:
processed_metadata = metadata
else:
metadatas.append(metadata)
else:
metadatas.append({})
processed_metadata = {}
if doc_id in seen_ids:
idx = seen_ids[doc_id]
texts[idx] = doc["content"]
metadatas[idx] = processed_metadata
else:
idx = len(ids)
ids.append(doc_id)
texts.append(doc["content"])
metadatas.append(processed_metadata)
seen_ids[doc_id] = idx
except Exception as e:
raise ValueError(f"Error preparing documents for ChromaDB: {e}") from e
return PreparedDocuments(ids, texts, metadatas)
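The `seen_ids` map makes the last occurrence of a duplicate `doc_id` win, rather than emitting duplicate ChromaDB ids. A hedged sketch of that behavior (assuming `PreparedDocuments` exposes `ids`/`texts` attributes, which this hunk does not show):

```python
# Hedged sketch: a repeated doc_id overwrites the earlier text/metadata
# in place instead of producing a duplicate id.
docs = [
    {"doc_id": "a", "content": "first version"},
    {"doc_id": "a", "content": "second version"},
    {"doc_id": "b", "content": "other doc"},
]

prepared = _prepare_documents_for_chromadb(docs)
assert prepared.ids == ["a", "b"]
assert prepared.texts == ["second version", "other doc"]
```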

View File

@@ -539,6 +539,7 @@ class Task(BaseModel):
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages,
)
if self._guardrails:
@@ -949,6 +950,7 @@ Follow these guidelines:
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages,
)
return task_output

View File

@@ -6,6 +6,7 @@ from typing import Any
from pydantic import BaseModel, Field, model_validator
from crewai.tasks.output_format import OutputFormat
from crewai.utilities.types import LLMMessage
class TaskOutput(BaseModel):
@@ -40,6 +41,7 @@ class TaskOutput(BaseModel):
output_format: OutputFormat = Field(
description="Output format of the task", default=OutputFormat.RAW
)
messages: list[LLMMessage] = Field(description="Messages of the task", default=[])
@model_validator(mode="after")
def set_summary(self):
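With the new field populated from `agent.last_messages`, callers can inspect the exact conversation behind each task result. A hedged sketch, assuming the usual `Crew.kickoff()` flow:

```python
# Hedged sketch: reading the message history now carried on TaskOutput.
result = crew.kickoff()  # crew/agents/tasks defined elsewhere
task_output = result.tasks_output[0]
for message in task_output.messages:  # list[LLMMessage] TypedDicts
    print(message["role"], str(message["content"])[:80])
```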

View File

@@ -0,0 +1,162 @@
"""Native MCP tool wrapper for CrewAI agents.
This module provides a tool wrapper that reuses existing MCP client sessions
for better performance and connection management.
"""
import asyncio
from typing import Any
from crewai.tools import BaseTool
class MCPNativeTool(BaseTool):
"""Native MCP tool that reuses client sessions.
This tool wrapper is used when agents connect to MCP servers using
structured configurations. It reuses existing client sessions for
better performance and proper connection lifecycle management.
Unlike MCPToolWrapper which connects on-demand, this tool uses
a shared MCP client instance that maintains a persistent connection.
"""
def __init__(
self,
mcp_client: Any,
tool_name: str,
tool_schema: dict[str, Any],
server_name: str,
) -> None:
"""Initialize native MCP tool.
Args:
mcp_client: MCPClient instance with active session.
tool_name: Original name of the tool on the MCP server.
tool_schema: Schema information for the tool.
server_name: Name of the MCP server for prefixing.
"""
# Create tool name with server prefix to avoid conflicts
prefixed_name = f"{server_name}_{tool_name}"
# Handle args_schema properly - BaseTool expects a BaseModel subclass
args_schema = tool_schema.get("args_schema")
# Only pass args_schema if it's provided
kwargs = {
"name": prefixed_name,
"description": tool_schema.get(
"description", f"Tool {tool_name} from {server_name}"
),
}
if args_schema is not None:
kwargs["args_schema"] = args_schema
super().__init__(**kwargs)
# Set instance attributes after super().__init__
self._mcp_client = mcp_client
self._original_tool_name = tool_name
self._server_name = server_name
@property
def mcp_client(self) -> Any:
"""Get the MCP client instance."""
return self._mcp_client
@property
def original_tool_name(self) -> str:
"""Get the original tool name."""
return self._original_tool_name
@property
def server_name(self) -> str:
"""Get the server name."""
return self._server_name
def _run(self, **kwargs) -> str:
"""Execute tool using the MCP client session.
Args:
**kwargs: Arguments to pass to the MCP tool.
Returns:
Result from the MCP tool execution.
"""
try:
try:
asyncio.get_running_loop()
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
coro = self._run_async(**kwargs)
future = executor.submit(asyncio.run, coro)
return future.result()
except RuntimeError:
return asyncio.run(self._run_async(**kwargs))
except Exception as e:
raise RuntimeError(
f"Error executing MCP tool {self.original_tool_name}: {e!s}"
) from e
async def _run_async(self, **kwargs) -> str:
"""Async implementation of tool execution.
Args:
**kwargs: Arguments to pass to the MCP tool.
Returns:
Result from the MCP tool execution.
"""
# Reconnect on demand: asyncio.run() creates a new event loop per call, and
# all MCP transport context managers (stdio, streamablehttp_client, sse_client)
# use anyio.create_task_group(), which cannot span different event loops.
if self._mcp_client.connected:
await self._mcp_client.disconnect()
await self._mcp_client.connect()
try:
result = await self._mcp_client.call_tool(self.original_tool_name, kwargs)
except Exception as e:
error_str = str(e).lower()
if (
"not connected" in error_str
or "connection" in error_str
or "send" in error_str
):
await self._mcp_client.disconnect()
await self._mcp_client.connect()
# Retry the call
result = await self._mcp_client.call_tool(
self.original_tool_name, kwargs
)
else:
raise
finally:
# Always disconnect after tool call to ensure clean context manager lifecycle
# This prevents "exit cancel scope in different task" errors
# All transport context managers must be exited in the same event loop they were entered
await self._mcp_client.disconnect()
# Extract result content
if isinstance(result, str):
return result
# Handle various result formats
if hasattr(result, "content") and result.content:
if isinstance(result.content, list) and len(result.content) > 0:
content_item = result.content[0]
if hasattr(content_item, "text"):
return str(content_item.text)
return str(content_item)
return str(result.content)
return str(result)
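A hedged sketch of how these wrappers get built from discovery results; the client construction and `list_tools()` result shape follow the MCP client shown earlier in this diff, and the server name is hypothetical:

```python
# Hedged sketch: wrap each discovered MCP tool for agent use.
import asyncio

tools = asyncio.run(mcp_client.list_tools())  # mcp_client built elsewhere
native_tools = [
    MCPNativeTool(
        mcp_client=mcp_client,
        tool_name=t["name"],
        tool_schema={"description": t["description"]},
        server_name="filesystem",  # hypothetical server name
    )
    for t in tools
]
# Each wrapper is exposed to the agent as "filesystem_<tool_name>".
```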

View File

@@ -33,6 +33,7 @@ from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.lite_agent import LiteAgent
from crewai.llm import LLM
from crewai.task import Task
@@ -127,7 +128,7 @@ def handle_max_iterations_exceeded(
messages: list[LLMMessage],
llm: LLM | BaseLLM,
callbacks: list[TokenCalcHandler],
) -> AgentAction | AgentFinish:
) -> AgentFinish:
"""Handles the case when the maximum number of iterations is exceeded. Performs one more LLM call to get the final answer.
Args:
@@ -139,7 +140,7 @@ def handle_max_iterations_exceeded(
callbacks: List of callbacks for the LLM call.
Returns:
The final formatted answer after exceeding max iterations.
AgentFinish with the final answer after exceeding max iterations.
"""
printer.print(
content="Maximum iterations reached. Requesting final answer.",
@@ -157,7 +158,7 @@ def handle_max_iterations_exceeded(
# Perform one more LLM call to get the final answer
answer = llm.call(
messages, # type: ignore[arg-type]
messages,
callbacks=callbacks,
)
@@ -168,8 +169,16 @@ def handle_max_iterations_exceeded(
)
raise ValueError("Invalid response from LLM call - None or empty.")
# Return the formatted answer, regardless of its type
return format_answer(answer=answer)
formatted = format_answer(answer=answer)
# If format_answer returned an AgentAction, convert it to AgentFinish
if isinstance(formatted, AgentFinish):
return formatted
return AgentFinish(
thought=formatted.thought,
output=formatted.text,
text=formatted.text,
)
def format_message_for_llm(
@@ -228,6 +237,7 @@ def get_llm_response(
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | None = None,
) -> str:
"""Call the LLM and return the response, handling any invalid responses.
@@ -239,6 +249,7 @@ def get_llm_response(
from_task: Optional task context for the LLM call
from_agent: Optional agent context for the LLM call
response_model: Optional Pydantic model for structured outputs
executor_context: Optional executor context for hook invocation
Returns:
The response from the LLM as a string
@@ -247,12 +258,17 @@ def get_llm_response(
Exception: If an error occurs.
ValueError: If the response is None or empty.
"""
if executor_context is not None:
_setup_before_llm_call_hooks(executor_context, printer)
messages = executor_context.messages
try:
answer = llm.call(
messages, # type: ignore[arg-type]
messages,
callbacks=callbacks,
from_task=from_task,
from_agent=from_agent,
from_agent=from_agent, # type: ignore[arg-type]
response_model=response_model,
)
except Exception as e:
@@ -264,7 +280,7 @@ def get_llm_response(
)
raise ValueError("Invalid response from LLM call - None or empty.")
return answer
return _setup_after_llm_call_hooks(executor_context, answer, printer)
def process_llm_response(
@@ -294,8 +310,8 @@ def handle_agent_action_core(
formatted_answer: AgentAction,
tool_result: ToolResult,
messages: list[LLMMessage] | None = None,
step_callback: Callable | None = None,
show_logs: Callable | None = None,
step_callback: Callable | None = None, # type: ignore[type-arg]
show_logs: Callable | None = None, # type: ignore[type-arg]
) -> AgentAction | AgentFinish:
"""Core logic for handling agent actions and tool results.
@@ -481,7 +497,7 @@ def summarize_messages(
),
]
summary = llm.call(
messages, # type: ignore[arg-type]
messages,
callbacks=callbacks,
)
summarized_contents.append({"content": str(summary)})
@@ -653,3 +669,92 @@ def load_agent_from_repository(from_repository: str) -> dict[str, Any]:
else:
attributes[key] = value
return attributes
def _setup_before_llm_call_hooks(
executor_context: CrewAgentExecutor | None, printer: Printer
) -> None:
"""Setup and invoke before_llm_call hooks for the executor context.
Args:
executor_context: The executor context to setup the hooks for.
printer: Printer instance for error logging.
"""
if executor_context and executor_context.before_llm_call_hooks:
from crewai.utilities.llm_call_hooks import LLMCallHookContext
original_messages = executor_context.messages
hook_context = LLMCallHookContext(executor_context)
try:
for hook in executor_context.before_llm_call_hooks:
hook(hook_context)
except Exception as e:
printer.print(
content=f"Error in before_llm_call hook: {e}",
color="yellow",
)
if not isinstance(executor_context.messages, list):
printer.print(
content=(
"Warning: before_llm_call hook replaced messages with non-list. "
"Restoring original messages list. Hooks should modify messages in-place, "
"not replace the list (e.g., use context.messages.append() not context.messages = [])."
),
color="yellow",
)
if isinstance(original_messages, list):
executor_context.messages = original_messages
else:
executor_context.messages = []
def _setup_after_llm_call_hooks(
executor_context: CrewAgentExecutor | None,
answer: str,
printer: Printer,
) -> str:
"""Setup and invoke after_llm_call hooks for the executor context.
Args:
executor_context: The executor context to setup the hooks for.
answer: The LLM response string.
printer: Printer instance for error logging.
Returns:
The potentially modified response string.
"""
if executor_context and executor_context.after_llm_call_hooks:
from crewai.utilities.llm_call_hooks import LLMCallHookContext
original_messages = executor_context.messages
hook_context = LLMCallHookContext(executor_context, response=answer)
try:
for hook in executor_context.after_llm_call_hooks:
modified_response = hook(hook_context)
if modified_response is not None and isinstance(modified_response, str):
answer = modified_response
except Exception as e:
printer.print(
content=f"Error in after_llm_call hook: {e}",
color="yellow",
)
if not isinstance(executor_context.messages, list):
printer.print(
content=(
"Warning: after_llm_call hook replaced messages with non-list. "
"Restoring original messages list. Hooks should modify messages in-place, "
"not replace the list (e.g., use context.messages.append() not context.messages = [])."
),
color="yellow",
)
if isinstance(original_messages, list):
executor_context.messages = original_messages
else:
executor_context.messages = []
return answer

View File

@@ -0,0 +1,115 @@
from __future__ import annotations
from collections.abc import Callable
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from crewai.agents.crew_agent_executor import CrewAgentExecutor
class LLMCallHookContext:
"""Context object passed to LLM call hooks with full executor access.
Provides hooks with complete access to the executor state, allowing
modification of messages, responses, and executor attributes.
Attributes:
executor: Full reference to the CrewAgentExecutor instance
messages: Direct reference to executor.messages (mutable list).
Can be modified in both before_llm_call and after_llm_call hooks.
Modifications in after_llm_call hooks persist to the next iteration,
allowing hooks to modify conversation history for subsequent LLM calls.
IMPORTANT: Modify messages in-place (e.g., append, extend, remove items).
Do NOT replace the list (e.g., context.messages = []), as this will break
the executor. Use context.messages.append() or context.messages.extend()
instead of assignment.
agent: Reference to the agent executing the task
task: Reference to the task being executed
crew: Reference to the crew instance
llm: Reference to the LLM instance
iterations: Current iteration count
response: LLM response string (only set for after_llm_call hooks).
Can be modified by returning a new string from after_llm_call hook.
"""
def __init__(
self,
executor: CrewAgentExecutor,
response: str | None = None,
) -> None:
"""Initialize hook context with executor reference.
Args:
executor: The CrewAgentExecutor instance
response: Optional response string (for after_llm_call hooks)
"""
self.executor = executor
self.messages = executor.messages
self.agent = executor.agent
self.task = executor.task
self.crew = executor.crew
self.llm = executor.llm
self.iterations = executor.iterations
self.response = response
# Global hook registries (optional convenience feature)
_before_llm_call_hooks: list[Callable[[LLMCallHookContext], None]] = []
_after_llm_call_hooks: list[Callable[[LLMCallHookContext], str | None]] = []
def register_before_llm_call_hook(
hook: Callable[[LLMCallHookContext], None],
) -> None:
"""Register a global before_llm_call hook.
Global hooks are added to all executors automatically.
This is a convenience function for registering hooks that should
apply to all LLM calls across all executors.
Args:
hook: Function that receives LLMCallHookContext and can modify
context.messages directly. Should return None.
IMPORTANT: Modify messages in-place (append, extend, remove items).
Do NOT replace the list (context.messages = []), as this will break execution.
"""
_before_llm_call_hooks.append(hook)
def register_after_llm_call_hook(
hook: Callable[[LLMCallHookContext], str | None],
) -> None:
"""Register a global after_llm_call hook.
Global hooks are added to all executors automatically.
This is a convenience function for registering hooks that should
apply to all LLM calls across all executors.
Args:
hook: Function that receives LLMCallHookContext and can modify:
- The response: Return modified response string or None to keep original
- The messages: Modify context.messages directly (mutable reference)
Both modifications are supported and can be used together.
IMPORTANT: Modify messages in-place (append, extend, remove items).
Do NOT replace the list (context.messages = []), as this will break execution.
"""
_after_llm_call_hooks.append(hook)
def get_before_llm_call_hooks() -> list[Callable[[LLMCallHookContext], None]]:
"""Get all registered global before_llm_call hooks.
Returns:
List of registered before hooks
"""
return _before_llm_call_hooks.copy()
def get_after_llm_call_hooks() -> list[Callable[[LLMCallHookContext], str | None]]:
"""Get all registered global after_llm_call hooks.
Returns:
List of registered after hooks
"""
return _after_llm_call_hooks.copy()
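The registry functions above make global hooks a one-liner; executors pick them up automatically. A short sketch using only APIs defined in this file:

```python
# Sketch: one global before-hook and one global after-hook.
from crewai.utilities.llm_call_hooks import (
    LLMCallHookContext,
    register_after_llm_call_hook,
    register_before_llm_call_hook,
)


def add_guardrail_note(context: LLMCallHookContext) -> None:
    # Mutate in place; never reassign context.messages.
    context.messages.append(
        {"role": "system", "content": "Keep answers under 200 words."}
    )


def redact_response(context: LLMCallHookContext) -> str | None:
    # Returning a string replaces the LLM response; None keeps it unchanged.
    if context.response and "SECRET" in context.response:
        return context.response.replace("SECRET", "[redacted]")
    return None


register_before_llm_call_hook(add_guardrail_note)
register_after_llm_call_hook(redact_response)
```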

View File

@@ -1,6 +1,8 @@
"""Types for CrewAI utilities."""
from typing import Any, Literal, TypedDict
from typing import Any, Literal
from typing_extensions import TypedDict
class LLMMessage(TypedDict):

View File

@@ -508,7 +508,47 @@ def test_agent_custom_max_iterations():
assert isinstance(result, str)
assert len(result) > 0
assert call_count > 0
assert call_count == 3
# With max_iter=1, expect 2 calls:
# - Call 1: iteration 0
# - Call 2: iteration 1 (max reached, handle_max_iterations_exceeded called, then loop breaks)
assert call_count == 2
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.timeout(30)
def test_agent_max_iterations_stops_loop():
"""Test that agent execution terminates when max_iter is reached."""
@tool
def get_data(step: str) -> str:
"""Get data for a step. Always returns data requiring more steps."""
return f"Data for {step}: incomplete, need to query more steps."
agent = Agent(
role="data collector",
goal="collect data using the get_data tool",
backstory="You must use the get_data tool extensively",
max_iter=2,
allow_delegation=False,
)
task = Task(
description="Use get_data tool for step1, step2, step3, step4, step5, step6, step7, step8, step9, and step10. Do NOT stop until you've called it for ALL steps.",
expected_output="A summary of all data collected",
)
result = agent.execute_task(
task=task,
tools=[get_data],
)
assert result is not None
assert isinstance(result, str)
assert agent.agent_executor.iterations <= agent.max_iter + 2, (
f"Agent ran {agent.agent_executor.iterations} iterations "
f"but should stop around {agent.max_iter + 1}. "
)
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -2117,15 +2157,14 @@ def test_agent_with_only_crewai_knowledge():
goal="Provide information based on knowledge sources",
backstory="You have access to specific knowledge sources.",
llm=LLM(
model="openrouter/openai/gpt-4o-mini",
api_key=os.getenv("OPENROUTER_API_KEY"),
model="gpt-4o-mini",
),
)
# Create a task that requires the agent to use the knowledge
task = Task(
description="What is Vidit's favorite color?",
expected_output="Vidit's favorclearite color.",
expected_output="Vidit's favorite color.",
agent=agent,
)
@@ -2675,3 +2714,293 @@ def test_agent_without_apps_no_platform_tools():
tools = crew._prepare_tools(agent, task, [])
assert tools == []
@pytest.mark.vcr(filter_headers=["authorization"])
def test_before_llm_call_hook_modifies_messages():
"""Test that before_llm_call hooks can modify messages."""
from crewai.utilities.llm_call_hooks import LLMCallHookContext, register_before_llm_call_hook
hook_called = False
original_message_count = 0
def before_hook(context: LLMCallHookContext) -> None:
nonlocal hook_called, original_message_count
hook_called = True
original_message_count = len(context.messages)
context.messages.append({
"role": "user",
"content": "Additional context: This is a test modification."
})
register_before_llm_call_hook(before_hook)
try:
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
allow_delegation=False,
)
task = Task(
description="Say hello",
expected_output="A greeting",
agent=agent,
)
result = agent.execute_task(task)
assert hook_called, "before_llm_call hook should have been called"
assert len(agent.agent_executor.messages) > original_message_count
assert result is not None
finally:
pass
@pytest.mark.vcr(filter_headers=["authorization"])
def test_after_llm_call_hook_modifies_messages_for_next_iteration():
"""Test that after_llm_call hooks can modify messages for the next iteration."""
from crewai.utilities.llm_call_hooks import LLMCallHookContext, register_after_llm_call_hook
hook_call_count = 0
hook_iterations = []
messages_added_in_iteration_0 = False
test_message_content = "HOOK_ADDED_MESSAGE_FOR_NEXT_ITERATION"
def after_hook(context: LLMCallHookContext) -> str | None:
nonlocal hook_call_count, hook_iterations, messages_added_in_iteration_0
hook_call_count += 1
current_iteration = context.iterations
hook_iterations.append(current_iteration)
if current_iteration == 0:
messages_before = len(context.messages)
context.messages.append({
"role": "user",
"content": test_message_content
})
messages_added_in_iteration_0 = True
assert len(context.messages) == messages_before + 1
return None
register_after_llm_call_hook(after_hook)
try:
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
allow_delegation=False,
max_iter=3,
)
task = Task(
description="Count to 3, taking your time",
expected_output="A count",
agent=agent,
)
result = agent.execute_task(task)
assert hook_call_count > 0, "after_llm_call hook should have been called"
assert messages_added_in_iteration_0, "Message should have been added in iteration 0"
executor_messages = agent.agent_executor.messages
message_contents = [msg.get("content", "") for msg in executor_messages if isinstance(msg, dict)]
assert any(test_message_content in content for content in message_contents), (
f"Message added by hook in iteration 0 should be present in executor messages. "
f"Messages: {message_contents}"
)
assert len(executor_messages) > 2, "Executor should have more than initial messages"
assert result is not None
finally:
pass
@pytest.mark.vcr(filter_headers=["authorization"])
def test_after_llm_call_hook_modifies_messages():
"""Test that after_llm_call hooks can modify messages for next iteration."""
from crewai.utilities.llm_call_hooks import LLMCallHookContext, register_after_llm_call_hook
hook_called = False
messages_before_hook = 0
def after_hook(context: LLMCallHookContext) -> str | None:
nonlocal hook_called, messages_before_hook
hook_called = True
messages_before_hook = len(context.messages)
context.messages.append({
"role": "user",
"content": "Remember: This is iteration 2 context."
})
return None # Don't modify response
register_after_llm_call_hook(after_hook)
try:
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
allow_delegation=False,
max_iter=2,
)
task = Task(
description="Count to 2",
expected_output="A count",
agent=agent,
)
result = agent.execute_task(task)
assert hook_called, "after_llm_call hook should have been called"
assert len(agent.agent_executor.messages) > messages_before_hook
assert result is not None
finally:
pass
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_call_hooks_with_crew():
"""Test that LLM call hooks work with crew execution."""
from crewai.utilities.llm_call_hooks import (
LLMCallHookContext,
register_after_llm_call_hook,
register_before_llm_call_hook,
)
before_hook_called = False
after_hook_called = False
def before_hook(context: LLMCallHookContext) -> None:
nonlocal before_hook_called
before_hook_called = True
assert context.executor is not None
assert context.agent is not None
assert context.task is not None
context.messages.append({
"role": "system",
"content": "Additional system context from hook."
})
def after_hook(context: LLMCallHookContext) -> str | None:
nonlocal after_hook_called
after_hook_called = True
assert context.response is not None
assert len(context.messages) > 0
return None
register_before_llm_call_hook(before_hook)
register_after_llm_call_hook(after_hook)
try:
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="You are a researcher",
allow_delegation=False,
)
task = Task(
description="Research AI frameworks",
expected_output="A research summary",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert before_hook_called, "before_llm_call hook should have been called"
assert after_hook_called, "after_llm_call hook should have been called"
assert result is not None
assert result.raw is not None
finally:
pass
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_call_hooks_can_modify_executor_attributes():
"""Test that hooks can access and modify executor attributes like tools."""
from crewai.utilities.llm_call_hooks import LLMCallHookContext, register_before_llm_call_hook
from crewai.tools import tool
@tool
def test_tool() -> str:
"""A test tool."""
return "test result"
hook_called = False
original_tools_count = 0
def before_hook(context: LLMCallHookContext) -> None:
nonlocal hook_called, original_tools_count
hook_called = True
original_tools_count = len(context.executor.tools)
assert context.executor.max_iter > 0
assert context.executor.iterations >= 0
assert context.executor.tools is not None
register_before_llm_call_hook(before_hook)
try:
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[test_tool],
allow_delegation=False,
)
task = Task(
description="Use the test tool",
expected_output="Tool result",
agent=agent,
)
result = agent.execute_task(task)
assert hook_called, "before_llm_call hook should have been called"
assert original_tools_count >= 0
assert result is not None
finally:
pass
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_call_hooks_error_handling():
"""Test that hook errors don't break execution."""
from crewai.utilities.llm_call_hooks import LLMCallHookContext, register_before_llm_call_hook
hook_called = False
def error_hook(context: LLMCallHookContext) -> None:
nonlocal hook_called
hook_called = True
raise ValueError("Test hook error")
register_before_llm_call_hook(error_hook)
try:
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
allow_delegation=False,
)
task = Task(
description="Say hello",
expected_output="A greeting",
agent=agent,
)
result = agent.execute_task(task)
assert hook_called, "before_llm_call hook should have been called"
assert result is not None
finally:
pass

View File

@@ -238,6 +238,27 @@ def test_lite_agent_returns_usage_metrics():
assert result.usage_metrics["total_tokens"] > 0
@pytest.mark.vcr(filter_headers=["authorization"])
def test_lite_agent_output_includes_messages():
"""Test that LiteAgentOutput includes messages from agent execution."""
llm = LLM(model="gpt-4o-mini")
agent = Agent(
role="Research Assistant",
goal="Find information about the population of Tokyo",
backstory="You are a helpful research assistant who can search for information about the population of Tokyo.",
llm=llm,
tools=[WebSearchTool()],
verbose=True,
)
result = agent.kickoff("What is the population of Tokyo?")
assert isinstance(result, LiteAgentOutput)
assert hasattr(result, "messages")
assert isinstance(result.messages, list)
assert len(result.messages) > 0
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.mark.asyncio
async def test_lite_agent_returns_usage_metrics_async():

View File

@@ -0,0 +1,126 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Test Agent. Test backstory\nYour
personal goal is: Test goal\nTo give my best complete final answer to the task
respond using the exact following format:\n\nThought: I now can give a great
answer\nFinal Answer: Your final answer must be the great and the most complete
as possible, it must be outcome described.\n\nI MUST use these formats, my job
depends on it!"},{"role":"user","content":"\nCurrent Task: Count to 2\n\nThis
is the expected criteria for your final answer: A count\nyou MUST return the
actual complete content as the final answer, not a summary.\n\nBegin! This is
VERY important to you, use the tools available and give your best Final Answer,
your job depends on it!\n\nThought:"},{"role":"user","content":"Additional context:
This is a test modification."}],"model":"gpt-4.1-mini"}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '849'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.109.1
x-stainless-read-timeout:
- '600'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFJNb5wwEL3zK0Y+QwSI7LLcokqVcujHoR9S2wg5ZsBujceyTdIo2v9e
GTYLaROpFyTmzXt+b2YeEwCmOtYAE5IHMVqdveH09UHKLx+/2eFzkAdZXL8XJPr9h3dFydLIoNuf
KMIT60LQaDUGRWaBhUMeMKoW+11ZH/K8rmZgpA51pA02ZNVFkY3KqKzMy8ssr7KiOtElKYGeNfA9
AQB4nL/RqOnwN2sgT58qI3rPB2TNuQmAOdKxwrj3ygduAktXUJAJaGbvnyRNgwwNXIOhexDcwKDu
EDgMMQBw4+/R/TBvleEarua/BooUyq2gw37yPKYyk9YbgBtDgcepzFFuTsjxbF7TYB3d+r+orFdG
edk65J5MNOoDWTajxwTgZh7S9Cw3s45GG9pAv3B+rtgdFj22LmeD1icwUOB6W9+nL+i1HQautN+M
mQkuJHYrdd0JnzpFGyDZpP7XzUvaS3Jlhv+RXwEh0AbsWuuwU+J54rXNYbzd19rOU54NM4/uTgls
g0IXN9Fhzye9HBTzDz7g2PbKDOisU8tV9batRFlfFn29K1lyTP4AAAD//wMApumqgWQDAAA=
headers:
CF-RAY:
- 99d044543db94e48-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 11 Nov 2025 19:41:25 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=KLlCOQ_zxXquDvj96O28ObVFEoAbFE8R7zlmuiuXH1M-1762890085-1.0.1.1-UChItG1GnLDHrErY60dUpkbD3lEkSvfkTQpOmEtzd0fjjm_y1pJQiB.VDXVi2pPIMSelir0ZgiVXSh5.hGPb3RjQqbH3pv0Rr_2dQ59OIQ8;
path=/; expires=Tue, 11-Nov-25 20:11:25 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=u.Z6xV9tQd3ucK35BinKtlCkewcI6q_uQicyeEeeR18-1762890085355-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '559'
openai-project:
- proj_xitITlrFeen7zjNSzML82h9x
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '735'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-project-tokens:
- '150000000'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-project-tokens:
- '149999817'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999817'
x-ratelimit-reset-project-tokens:
- 0s
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_bcaa0f8500714ed09f967488b238ce2e
status:
code: 200
message: OK
version: 1
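
This cassette pairs with the @pytest.mark.vcr(filter_headers=["authorization"]) markers above: the Authorization header is scrubbed before the interaction is written to disk. A minimal module-level configuration for pytest-recording (assuming that plugin is the recorder in use) that applies the same scrubbing:

import pytest


@pytest.fixture(scope="module")
def vcr_config():
    # Drop the Authorization header so recorded cassettes never contain API keys.
    return {"filter_headers": ["authorization"]}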


@@ -0,0 +1,222 @@
interactions:
- request:
body: '{"trace_id": "aeb82647-004a-4a30-9481-d55f476d5659", "execution_type":
"crew", "user_identifier": null, "execution_context": {"crew_fingerprint": null,
"crew_name": "Unknown Crew", "flow_name": null, "crewai_version": "1.4.1", "privacy_level":
"standard"}, "execution_metadata": {"expected_duration_estimate": 300, "agent_count":
0, "task_count": 0, "flow_method_count": 0, "execution_started_at": "2025-11-11T19:45:17.648657+00:00"}}'
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate, zstd
Connection:
- keep-alive
Content-Length:
- '434'
Content-Type:
- application/json
User-Agent:
- CrewAI-CLI/1.4.1
X-Crewai-Version:
- 1.4.1
method: POST
uri: https://app.crewai.com/crewai_plus/api/v1/tracing/batches
response:
body:
string: '{"error":"bad_credentials","message":"Bad credentials"}'
headers:
Connection:
- keep-alive
Content-Length:
- '55'
Content-Type:
- application/json; charset=utf-8
Date:
- Tue, 11 Nov 2025 19:45:17 GMT
cache-control:
- no-store
content-security-policy:
- 'default-src ''self'' *.app.crewai.com app.crewai.com; script-src ''self''
''unsafe-inline'' *.app.crewai.com app.crewai.com https://cdn.jsdelivr.net/npm/apexcharts
https://www.gstatic.com https://run.pstmn.io https://apis.google.com https://apis.google.com/js/api.js
https://accounts.google.com https://accounts.google.com/gsi/client https://cdnjs.cloudflare.com/ajax/libs/normalize/8.0.1/normalize.min.css.map
https://*.google.com https://docs.google.com https://slides.google.com https://js.hs-scripts.com
https://js.sentry-cdn.com https://browser.sentry-cdn.com https://www.googletagmanager.com
https://js-na1.hs-scripts.com https://js.hubspot.com http://js-na1.hs-scripts.com
https://bat.bing.com https://cdn.amplitude.com https://cdn.segment.com https://d1d3n03t5zntha.cloudfront.net/
https://descriptusercontent.com https://edge.fullstory.com https://googleads.g.doubleclick.net
https://js.hs-analytics.net https://js.hs-banner.com https://js.hsadspixel.net
https://js.hscollectedforms.net https://js.usemessages.com https://snap.licdn.com
https://static.cloudflareinsights.com https://static.reo.dev https://www.google-analytics.com
https://share.descript.com/; style-src ''self'' ''unsafe-inline'' *.app.crewai.com
app.crewai.com https://cdn.jsdelivr.net/npm/apexcharts; img-src ''self'' data:
*.app.crewai.com app.crewai.com https://zeus.tools.crewai.com https://dashboard.tools.crewai.com
https://cdn.jsdelivr.net https://forms.hsforms.com https://track.hubspot.com
https://px.ads.linkedin.com https://px4.ads.linkedin.com https://www.google.com
https://www.google.com.br; font-src ''self'' data: *.app.crewai.com app.crewai.com;
connect-src ''self'' *.app.crewai.com app.crewai.com https://zeus.tools.crewai.com
https://connect.useparagon.com/ https://zeus.useparagon.com/* https://*.useparagon.com/*
https://run.pstmn.io https://connect.tools.crewai.com/ https://*.sentry.io
https://www.google-analytics.com https://edge.fullstory.com https://rs.fullstory.com
https://api.hubspot.com https://forms.hscollectedforms.net https://api.hubapi.com
https://px.ads.linkedin.com https://px4.ads.linkedin.com https://google.com/pagead/form-data/16713662509
https://google.com/ccm/form-data/16713662509 https://www.google.com/ccm/collect
https://worker-actionkit.tools.crewai.com https://api.reo.dev; frame-src ''self''
*.app.crewai.com app.crewai.com https://connect.useparagon.com/ https://zeus.tools.crewai.com
https://zeus.useparagon.com/* https://connect.tools.crewai.com/ https://docs.google.com
https://drive.google.com https://slides.google.com https://accounts.google.com
https://*.google.com https://app.hubspot.com/ https://td.doubleclick.net https://www.googletagmanager.com/
https://www.youtube.com https://share.descript.com'
expires:
- '0'
permissions-policy:
- camera=(), microphone=(self), geolocation=()
pragma:
- no-cache
referrer-policy:
- strict-origin-when-cross-origin
strict-transport-security:
- max-age=63072000; includeSubDomains
vary:
- Accept
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
x-permitted-cross-domain-policies:
- none
x-request-id:
- 48a89b0d-206b-4c1b-aa0d-ecc3b4ab525c
x-runtime:
- '0.088251'
x-xss-protection:
- 1; mode=block
status:
code: 401
message: Unauthorized
- request:
body: '{"messages":[{"role":"system","content":"You are Test Agent. Test backstory\nYour
personal goal is: Test goal\nTo give my best complete final answer to the task
respond using the exact following format:\n\nThought: I now can give a great
answer\nFinal Answer: Your final answer must be the great and the most complete
as possible, it must be outcome described.\n\nI MUST use these formats, my job
depends on it!"},{"role":"user","content":"\nCurrent Task: Count to 3, taking
your time\n\nThis is the expected criteria for your final answer: A count\nyou
MUST return the actual complete content as the final answer, not a summary.\n\nBegin!
This is VERY important to you, use the tools available and give your best Final
Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
connection:
- keep-alive
content-length:
- '790'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.109.1
x-stainless-read-timeout:
- '600'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFJNa9wwEL37Vww6r43tOpuNb2nKQgslOSy0NA1mIo9tdWVJSHK2Jex/
L/J+2Ns20IuE5s0bzXszrxEAEzUrgfEOPe+NjO9Q41atP3/79GG7vX8QD0Xq15svX9/fUd+yRWDo
5x/E/YmVcN0bSV5odYC5JfQUqmbXy3x1k77LViPQ65pkoLXGx0WSxb1QIs7T/CpOizgrjvROC06O
lfAYAQC8jmdoVNX0k5WQLk6RnpzDllh5TgJgVssQYeiccB6VZ4sJ5Fp5UmPvm04PbedL+AhK74Cj
gla8ECC0QQCgcjuy39VaKJRwO75KuFeUJAlsdnq8OkuUzD+w1AwOg0o1SDkDUCntMbg0Sns6Ivuz
GKlbY/Wz+4PKGqGE6ypL6LQKjTuvDRvRfQTwNJo2XPjAjNW98ZXXWxq/y5ZH09g0rBl6cwS99ihn
8esTcFGvqsmjkG5mO+PIO6on6jQjHGqhZ0A0U/13N/+qfVAuVPs/5SeAczKe6spYqgW/VDylWQq7
/Fba2eWxYebIvghOlRdkwyRqanCQhwVj7pfz1FeNUC1ZY8VhyxpTFTxfXWXNapmzaB/9BgAA//8D
AL0LXHV0AwAA
headers:
CF-RAY:
- 99d04a06dc4d1949-SJC
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 11 Nov 2025 19:45:18 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=KnsnYxgmlpoHf.5TWnNgU30xb2tc0gK7SC2BbUkud2M-1762890318-1.0.1.1-3KeaQY59x5mY6n8DINELLaH9_b68w7W4ZZ0KeOknBHmQyDwx5qbtDonfYxOjsO_KykjtJLHpB0bsINSNEa9TrjNQHqUWTlRhldfTLenUG44;
path=/; expires=Tue, 11-Nov-25 20:15:18 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=ekC35NRP79GCMP.eTi_odl5.6DIsAeFEXKlanWUZOH4-1762890318589-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '598'
openai-project:
- proj_xitITlrFeen7zjNSzML82h9x
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '632'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-project-tokens:
- '150000000'
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-project-tokens:
- '149999827'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999827'
x-ratelimit-reset-project-tokens:
- 0s
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_cb36cbe6c33b42a28675e8c6d9a36fe9
status:
code: 200
message: OK
version: 1

Some files were not shown because too many files have changed in this diff.