Compare commits

..

23 Commits

Author SHA1 Message Date
Devin AI
56f8ad5297 fix: remove unused import in examples file
- Remove unused event_stream_manager import from examples/human_input_event_streaming.py
- Resolves final lint issue preventing CI from passing

Co-Authored-By: João <joao@crewai.com>
2025-08-02 16:13:15 +00:00
Devin AI
f133d13a00 fix: resolve lint issues in human input event streaming implementation
- Remove unused imports from server modules and test files
- Fix undefined variable references in test assertions
- All ruff checks now pass locally

Co-Authored-By: João <joao@crewai.com>
2025-08-02 16:10:35 +00:00
Devin AI
ea560d0af1 feat: implement client-initiated real-time human input event streams
- Add HumanInputRequiredEvent and HumanInputCompletedEvent to task_events.py
- Implement HTTP server with WebSocket, SSE, and long-polling endpoints
- Add EventStreamManager for connection and event management
- Integrate event emission in agent executor human input flow
- Add comprehensive tests for server endpoints and event integration
- Add optional FastAPI dependencies for server functionality
- Include documentation and example usage
- Maintain backward compatibility with existing human input flow

Addresses issue #3259 for WebSocket/SSE/long-polling human input events

Co-Authored-By: João <joao@crewai.com>
2025-08-02 16:05:43 +00:00
Heitor Carvalho
88ed91561f feat: add crewai config command group and tests (#3206)
2025-07-31 10:38:51 -04:00
Lorenze Jay
9a347ad458 chore: update crewai-tools dependency to version 0.59.0 and bump CrewAI version to 0.152.0 (#3244)
- Updated `crewai-tools` dependency from `0.58.0` to `0.59.0` in `pyproject.toml` and `uv.lock`.
- Bumped the version of the CrewAI library from `0.150.0` to `0.152.0` in `__init__.py`.
- Updated dependency versions in CLI templates for crew, flow, and tool projects to reflect the new CrewAI version.
2025-07-30 14:38:24 -07:00
Lucas Gomide
34c3075fdb fix: support to add memories to Mem0 with agent_id (#3217)
* fix: support to add memories to Mem0 with agent_id

* feat: removing memory_type checkings from Mem0Storage

* feat: ensure agent_id is always present while saving memory into Mem0

* fix: use OR operator when querying Mem0 memories with both user_id and agent_id
2025-07-30 11:56:46 -04:00
Vidit Ostwal
498e8dc6e8 Changed the import error to show missing module files (#2423)
* Fix issue #2421: Handle missing google.genai dependency gracefully

Co-Authored-By: Joe Moura <joao@crewai.com>

* Fix import sorting in test file

Co-Authored-By: Joe Moura <joao@crewai.com>

* Fix import sorting with ruff

Co-Authored-By: Joe Moura <joao@crewai.com>

* Removed unwanted test case

* Added dynamic catching for all the embedder function

* Dropped the comment

* Added test case

* Fixed Linting Issue

* Flaky test case in 3.13

* Test Case fixed

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: Joe Moura <joao@crewai.com>
Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
2025-07-30 10:01:17 -04:00
Lorenze Jay
cb522cf500 Enhance Flow class to support custom flow names (#3234)
- Added an optional `name` attribute to the Flow class for better identification.
- Updated event emissions to utilize the new `name` attribute, ensuring accurate flow naming in events.
- Added tests to verify the correct flow name is set and emitted during flow execution.
2025-07-29 15:41:30 -07:00
Vini Brasil
017acc74f5 Add timezone to event timestamps (#3231)
Events lacked timezone information, making their timestamps naive datetimes, which can be ambiguous.
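
For context, the difference in plain Python (an illustrative snippet, not the project's actual code):

```python
from datetime import datetime, timezone

naive = datetime.now()               # no tzinfo: ambiguous across regions
aware = datetime.now(timezone.utc)   # timezone-aware: unambiguous
print(naive.tzinfo, aware.tzinfo)    # prints: None UTC
```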
2025-07-28 17:09:06 -03:00
Greyson LaLonde
fab86d197a Refactor: Move RAG components to dedicated top-level module (#3222)
* Move RAG components to top-level module

- Create src/crewai/rag directory structure
- Move embeddings configurator from utilities to rag module
- Update imports across codebase and documentation
- Remove deprecated embedding files

* Remove empty knowledge/embedder directory
2025-07-25 10:55:31 -04:00
Vidit Ostwal
864e9bfb76 Changed the default value in Mem0 config (#3216)
* Changed the default value in Mem0 config

* Added regression test for this

* Fixed Linting issues
2025-07-24 13:20:18 -04:00
Lucas Gomide
d3b45d197c fix: remove crewai signup references, replaced by crewai login (#3213) 2025-07-24 07:47:35 -04:00
Manuka Yasas
579153b070 docs: fix incorrect model naming in Google Vertex AI documentation (#3189)
- Change model format from "gemini/gemini-1.5-pro-latest" to "gemini-1.5-pro-latest"
  in Vertex AI section examples
- Update both English and Portuguese documentation files
- Fixes incorrect provider prefix usage for Vertex AI models
- Ensures consistency with Vertex AI provider requirements

Files changed:
- docs/en/concepts/llms.mdx (line 272)
- docs/pt-BR/concepts/llms.mdx (line 270)

Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-07-23 16:58:57 -04:00
Lorenze Jay
b1fdcdfa6e chore: update dependencies and version in project files (#3212)
- Updated `crewai-tools` dependency from `0.55.0` to `0.58.0` in `pyproject.toml` and `uv.lock`.
- Added new packages `anthropic`, `browserbase`, `playwright`, `pyee`, and `stagehand` with their respective versions in `uv.lock`.
- Bumped the version of the CrewAI library from `0.148.0` to `0.150.0` in `__init__.py`.
- Updated dependency versions in CLI templates for crew, flow, and tool projects to reflect the new CrewAI version.
2025-07-23 11:03:50 -07:00
Mike Plachta
18d76a270c docs: add SerperScrapeWebsiteTool documentation and reorganize SerperDevTool setup instructions (#3211) 2025-07-23 12:12:59 -04:00
Vidit Ostwal
30541239ad Changed Mem0 Storage v1.1 -> v2 (#2893)
* Changed v1.1 -> v2

* Fixed Test Cases:

* Fixed linting issues

* Changed docs

* Refactored the storage

* Fixed test cases

* Fixing run-time checks

* Fixed Test Case

* Updated docs and added test case for custom categories

* Add the TODO back

* Minor Changes

* Added output_format in search

* Minor changes

* Added output_format and version in both search and save

* Small change

* Minor bugs

* Fixed test cases

* Changed docs

---------

Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
2025-07-23 08:30:52 -04:00
Tony Kipkemboi
9a65573955 Feature/update docs (#3205)
* docs: add create_directory parameter

* docs: remove string guardrails to focus on function guardrails

* docs: remove get help from docs.json

* docs: update pt-BR docs.json changes
2025-07-22 13:55:27 -04:00
Lucas Gomide
27623a1d01 feat: remove duplicate print on LLM call error (#3183)
Done by improving the litellm handler's error handling and outputs.

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-07-21 22:08:07 -04:00
João Moura
2593242234 Adding Support to adhoc tool calling using the internal LLM class (#3195)
* Adding Support to adhoc tool calling using the internal LLM class

* fix type
2025-07-21 19:36:48 -03:00
Greyson LaLonde
2ab6c31544 chore: add deprecation notices to UserMemory (#3201)
- Mark UserMemory and UserMemoryItem for removal in v0.156.0 or 2025-08-04
- Update all references with deprecation warnings
- Users should migrate to ExternalMemory (see the sketch below)
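
A hedged sketch of the suggested migration (the import path and the `embedder_config`/`external_memory` parameter names are assumptions drawn from the ExternalMemory docs, not from this diff):

```python
from crewai import Crew
from crewai.memory.external.external_memory import ExternalMemory  # assumed import path

# Assumed parameter name; configures Mem0 as the external provider
external_memory = ExternalMemory(
    embedder_config={"provider": "mem0", "config": {"user_id": "john"}}
)

crew = Crew(
    agents=[...],
    tasks=[...],
    external_memory=external_memory,  # replaces the deprecated user_memory configuration
)
```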
2025-07-21 15:26:34 -04:00
Lucas Gomide
3c55c8a22a fix: append user message when last message is from assistant when using Ollama models (#3200)
Ollama doesn't support the last message being from the 'assistant' role.
We can drop this commit after merging https://github.com/BerriAI/litellm/pull/10917
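
A rough sketch of the workaround described above (the function name and placeholder content are assumptions; the actual change lives in the message-preparation path for Ollama models):

```python
def ensure_last_message_is_user(messages: list[dict]) -> list[dict]:
    """If the final message is from the assistant, append a user turn so Ollama accepts the request."""
    if messages and messages[-1].get("role") == "assistant":
        return [*messages, {"role": "user", "content": "Please continue."}]
    return messages
```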
2025-07-21 13:30:40 -04:00
Ranuga Disansa
424433ff58 docs: Add Tavily Search & Extractor tools to Search-Research suite (#3146)
* docs: Add Tavily Search and Extractor tools documentation

* docs: Add Tavily Search and Extractor tools to the documentation

---------

Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-07-21 12:01:29 -04:00
Lucas Gomide
2fd99503ed build: upgrade LiteLLM to 1.74.3 (#3199) 2025-07-21 09:58:47 -04:00
71 changed files with 5962 additions and 3898 deletions

View File

@@ -32,11 +32,6 @@
"href": "https://chatgpt.com/g/g-qqTuUWsBY-crewai-assistant",
"icon": "robot"
},
{
"anchor": "Get Help",
"href": "mailto:support@crewai.com",
"icon": "headset"
},
{
"anchor": "Releases",
"href": "https://github.com/crewAIInc/crewAI/releases",
@@ -166,7 +161,9 @@
"en/tools/search-research/websitesearchtool",
"en/tools/search-research/codedocssearchtool",
"en/tools/search-research/youtubechannelsearchtool",
"en/tools/search-research/youtubevideosearchtool"
"en/tools/search-research/youtubevideosearchtool",
"en/tools/search-research/tavilysearchtool",
"en/tools/search-research/tavilyextractortool"
]
},
{
@@ -370,11 +367,6 @@
"href": "https://chatgpt.com/g/g-qqTuUWsBY-crewai-assistant",
"icon": "robot"
},
{
"anchor": "Obter Ajuda",
"href": "mailto:support@crewai.com",
"icon": "headset"
},
{
"anchor": "Lançamentos",
"href": "https://github.com/crewAIInc/crewAI/releases",

View File

@@ -270,7 +270,7 @@ In this section, you'll find detailed examples that help you select, configure,
from crewai import LLM
llm = LLM(
model="gemini/gemini-1.5-pro-latest",
model="gemini-1.5-pro-latest", # or vertex_ai/gemini-1.5-pro-latest
temperature=0.7,
vertex_credentials=vertex_credentials_json
)

View File

@@ -623,7 +623,7 @@ for provider in providers_to_test:
**Model not found errors:**
```python
# Verify model availability
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
try:
@@ -712,7 +712,7 @@ crew = Crew(
memory_config={
"provider": "mem0",
"config": {"user_id": "john"},
"user_memory": {} # Required - triggers user memory initialization
"user_memory": {} # DEPRECATED: Will be removed in version 0.156.0 or on 2025-08-04, use external_memory instead
},
process=Process.sequential,
verbose=True
@@ -720,7 +720,16 @@ crew = Crew(
```
### Advanced Mem0 Configuration
When using the Mem0 Client, you can further customize the memory configuration with parameters such as 'includes', 'excludes', 'custom_categories', 'infer', and 'run_id' (the last applies only to short-term memory).
You can find more details in the [Mem0 documentation](https://docs.mem0.ai/).
```python
new_categories = [
{"lifestyle_management_concerns": "Tracks daily routines, habits, hobbies and interests including cooking, time management and work-life balance"},
{"seeking_structure": "Documents goals around creating routines, schedules, and organized systems in various life areas"},
{"personal_information": "Basic information about the user including name, preferences, and personality traits"}
]
crew = Crew(
agents=[...],
tasks=[...],
@@ -732,6 +741,11 @@ crew = Crew(
"org_id": "my_org_id", # Optional
"project_id": "my_project_id", # Optional
"api_key": "custom-api-key" # Optional - overrides env var
"run_id": "my_run_id", # Optional - for short-term memory
"includes": "include1", # Optional
"excludes": "exclude1", # Optional
"infer": True # Optional defaults to True
"custom_categories": new_categories # Optional - custom categories for user memory
},
"user_memory": {}
}
@@ -761,7 +775,8 @@ crew = Crew(
"provider": "openai",
"config": {"api_key": "your-api-key", "model": "text-embedding-3-small"}
}
}
},
"infer": True # Optional defaults to True
},
"user_memory": {}
}

View File

@@ -54,10 +54,11 @@ crew = Crew(
| **Markdown** _(optional)_ | `markdown` | `Optional[bool]` | Whether the task should instruct the agent to return the final answer formatted in Markdown. Defaults to False. |
| **Config** _(optional)_ | `config` | `Optional[Dict[str, Any]]` | Task-specific configuration parameters. |
| **Output File** _(optional)_ | `output_file` | `Optional[str]` | File path for storing the task output. |
| **Create Directory** _(optional)_ | `create_directory` | `Optional[bool]` | Whether to create the directory for output_file if it doesn't exist. Defaults to True. |
| **Output JSON** _(optional)_ | `output_json` | `Optional[Type[BaseModel]]` | A Pydantic model to structure the JSON output. |
| **Output Pydantic** _(optional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | A Pydantic model for task output. |
| **Callback** _(optional)_ | `callback` | `Optional[Any]` | Function/object to be executed after task completion. |
| **Guardrail** _(optional)_ | `guardrail` | `Optional[Union[Callable, str]]` | Function or string description to validate task output before proceeding to next task. |
| **Guardrail** _(optional)_ | `guardrail` | `Optional[Callable]` | Function to validate task output before proceeding to next task. |
## Creating Tasks
@@ -87,7 +88,6 @@ research_task:
expected_output: >
A list with 10 bullet points of the most relevant information about {topic}
agent: researcher
guardrail: ensure each bullet contains a minimum of 100 words
reporting_task:
description: >
@@ -334,9 +334,7 @@ Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
feedback to agents when their output doesn't meet specific criteria.
**Guardrails can be defined in two ways:**
1. **Function-based guardrails**: Python functions that implement custom validation logic
2. **String-based guardrails**: Natural language descriptions that are automatically converted to LLM-powered validation
Guardrails are implemented as Python functions that contain custom validation logic, giving you complete control over the validation process and ensuring reliable, deterministic results.
### Function-Based Guardrails
@@ -378,82 +376,7 @@ blog_task = Task(
- On success: it returns a tuple of `(bool, Any)`. For example: `(True, validated_result)`
- On failure: it returns a tuple of `(bool, str)`. For example: `(False, "Error message explaining the failure")`, as sketched below.
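A minimal sketch of such a function-based guardrail following the tuple contract above (the word-count rule and the coercion of the output to `str` are illustrative assumptions, not part of the library):

```python Code
from typing import Any, Tuple

def minimum_word_count(result: Any) -> Tuple[bool, Any]:
    """Illustrative guardrail: require at least 100 words in the task output."""
    text = str(result)  # assumption: coerce whatever output object is passed in
    if len(text.split()) >= 100:
        return (True, text)  # success: forward the validated result
    return (False, "Output contains fewer than 100 words")  # failure: explain why

# Attach it to a task (other fields omitted for brevity)
# blog_task = Task(..., guardrail=minimum_word_count)
```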
### String-Based Guardrails
String-based guardrails allow you to describe validation criteria in natural language. When you provide a string instead of a function, CrewAI automatically converts it to an `LLMGuardrail` that uses an AI agent to validate the task output.
#### Using String Guardrails in Python
```python Code
from crewai import Task
# Simple string-based guardrail
blog_task = Task(
description="Write a blog post about AI",
expected_output="A blog post under 200 words",
agent=blog_agent,
guardrail="Ensure the blog post is under 200 words and includes practical examples"
)
# More complex validation criteria
research_task = Task(
description="Research AI trends for 2025",
expected_output="A comprehensive research report",
agent=research_agent,
guardrail="Ensure each finding includes a credible source and is backed by recent data from 2024-2025"
)
```
#### Using String Guardrails in YAML
```yaml
research_task:
description: Research the latest AI developments
expected_output: A list of 10 bullet points about AI
agent: researcher
guardrail: ensure each bullet contains a minimum of 100 words
validation_task:
description: Validate the research findings
expected_output: A validation report
agent: validator
guardrail: confirm all sources are from reputable publications and published within the last 2 years
```
#### How String Guardrails Work
When you provide a string guardrail, CrewAI automatically:
1. Creates an `LLMGuardrail` instance using the string as validation criteria
2. Uses the task's agent LLM to power the validation
3. Creates a temporary validation agent that checks the output against your criteria
4. Returns detailed feedback if validation fails
This approach is ideal when you want to use natural language to describe validation rules without writing custom validation functions.
### LLMGuardrail Class
The `LLMGuardrail` class is the underlying mechanism that powers string-based guardrails. You can also use it directly for more advanced control:
```python Code
from crewai import Task
from crewai.tasks.llm_guardrail import LLMGuardrail
from crewai.llm import LLM
# Create a custom LLMGuardrail with specific LLM
custom_guardrail = LLMGuardrail(
description="Ensure the response contains exactly 5 bullet points with proper citations",
llm=LLM(model="gpt-4o-mini")
)
task = Task(
description="Research AI safety measures",
expected_output="A detailed analysis with bullet points",
agent=research_agent,
guardrail=custom_guardrail
)
```
**Note**: When you use a string guardrail, CrewAI automatically creates an `LLMGuardrail` instance using your task's agent LLM. Using `LLMGuardrail` directly gives you more control over the validation process and LLM selection.
### Error Handling Best Practices
@@ -881,21 +804,87 @@ These validations help in maintaining the consistency and reliability of task ex
## Creating Directories when Saving Files
You can now specify if a task should create directories when saving its output to a file. This is particularly useful for organizing outputs and ensuring that file paths are correctly structured.
The `create_directory` parameter controls whether CrewAI should automatically create directories when saving task outputs to files. This feature is particularly useful for organizing outputs and ensuring that file paths are correctly structured, especially when working with complex project hierarchies.
### Default Behavior
By default, `create_directory=True`, which means CrewAI will automatically create any missing directories in the output file path:
```python Code
# ...
save_output_task = Task(
description='Save the summarized AI news to a file',
expected_output='File saved successfully',
agent=research_agent,
tools=[file_save_tool],
output_file='outputs/ai_news_summary.txt',
create_directory=True
# Default behavior - directories are created automatically
report_task = Task(
description='Generate a comprehensive market analysis report',
expected_output='A detailed market analysis with charts and insights',
agent=analyst_agent,
output_file='reports/2025/market_analysis.md', # Creates 'reports/2025/' if it doesn't exist
markdown=True
)
```
#...
### Disabling Directory Creation
If you want to prevent automatic directory creation and ensure that the directory already exists, set `create_directory=False`:
```python Code
# Strict mode - directory must already exist
strict_output_task = Task(
description='Save critical data that requires existing infrastructure',
expected_output='Data saved to pre-configured location',
agent=data_agent,
output_file='secure/vault/critical_data.json',
create_directory=False # Will raise RuntimeError if 'secure/vault/' doesn't exist
)
```
### YAML Configuration
You can also configure this behavior in your YAML task definitions:
```yaml tasks.yaml
analysis_task:
description: >
Generate quarterly financial analysis
expected_output: >
A comprehensive financial report with quarterly insights
agent: financial_analyst
output_file: reports/quarterly/q4_2024_analysis.pdf
create_directory: true # Automatically create 'reports/quarterly/' directory
audit_task:
description: >
Perform compliance audit and save to existing audit directory
expected_output: >
A compliance audit report
agent: auditor
output_file: audit/compliance_report.md
create_directory: false # Directory must already exist
```
### Use Cases
**Automatic Directory Creation (`create_directory=True`):**
- Development and prototyping environments
- Dynamic report generation with date-based folders
- Automated workflows where directory structure may vary
- Multi-tenant applications with user-specific folders
**Manual Directory Management (`create_directory=False`):**
- Production environments with strict file system controls
- Security-sensitive applications where directories must be pre-configured
- Systems with specific permission requirements
- Compliance environments where directory creation is audited
### Error Handling
When `create_directory=False` and the directory doesn't exist, CrewAI will raise a `RuntimeError`:
```python Code
try:
result = crew.kickoff()
except RuntimeError as e:
# Handle missing directory error
print(f"Directory creation failed: {e}")
# Create directory manually or use fallback location
```
Check out the video below to see how to use structured outputs in CrewAI:

View File

@@ -44,6 +44,14 @@ These tools enable your agents to search the web, research topics, and find info
<Card title="YouTube Video Search" icon="play" href="/en/tools/search-research/youtubevideosearchtool">
Find and analyze YouTube videos by topic, keyword, or criteria.
</Card>
<Card title="Tavily Search Tool" icon="magnifying-glass" href="/en/tools/search-research/tavilysearchtool">
Comprehensive web search using Tavily's AI-powered search API.
</Card>
<Card title="Tavily Extractor Tool" icon="file-text" href="/en/tools/search-research/tavilyextractortool">
Extract structured content from web pages using the Tavily API.
</Card>
</CardGroup>
## **Common Use Cases**
@@ -55,17 +63,19 @@ These tools enable your agents to search the web, research topics, and find info
- **Academic Research**: Find scholarly articles and technical papers
```python
from crewai_tools import SerperDevTool, GitHubSearchTool, YoutubeVideoSearchTool
from crewai_tools import SerperDevTool, GitHubSearchTool, YoutubeVideoSearchTool, TavilySearchTool, TavilyExtractorTool
# Create research tools
web_search = SerperDevTool()
code_search = GitHubSearchTool()
video_research = YoutubeVideoSearchTool()
tavily_search = TavilySearchTool()
content_extractor = TavilyExtractorTool()
# Add to your agent
agent = Agent(
role="Research Analyst",
tools=[web_search, code_search, video_research],
tools=[web_search, code_search, video_research, tavily_search, content_extractor],
goal="Gather comprehensive information on any topic"
)
```

View File

@@ -6,10 +6,6 @@ icon: google
# `SerperDevTool`
<Note>
We are still working on improving tools, so there might be unexpected behavior or changes in the future.
</Note>
## Description
This tool is designed to perform a semantic search for a specified query from a text's content across the internet. It utilizes the [serper.dev](https://serper.dev) API
@@ -17,6 +13,12 @@ to fetch and display the most relevant search results based on the query provide
## Installation
To effectively use the `SerperDevTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a `serper.dev` API key by registering for a free account at `serper.dev`.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `SERPER_API_KEY` to facilitate its use by the tool.
To incorporate this tool into your project, follow the installation instructions below:
```shell
@@ -34,14 +36,6 @@ from crewai_tools import SerperDevTool
tool = SerperDevTool()
```
## Steps to Get Started
To effectively use the `SerperDevTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a `serper.dev` API key by registering for a free account at `serper.dev`.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `SERPER_API_KEY` to facilitate its use by the tool.
## Parameters
The `SerperDevTool` comes with several parameters that will be passed to the API:

View File

@@ -0,0 +1,139 @@
---
title: "Tavily Extractor Tool"
description: "Extract structured content from web pages using the Tavily API"
icon: "file-text"
---
The `TavilyExtractorTool` allows CrewAI agents to extract structured content from web pages using the Tavily API. It can process single URLs or lists of URLs and provides options for controlling the extraction depth and including images.
## Installation
To use the `TavilyExtractorTool`, you need to install the `tavily-python` library:
```shell
pip install 'crewai[tools]' tavily-python
```
You also need to set your Tavily API key as an environment variable:
```bash
export TAVILY_API_KEY='your-tavily-api-key'
```
## Example Usage
Here's how to initialize and use the `TavilyExtractorTool` within a CrewAI agent:
```python
import os
from crewai import Agent, Task, Crew
from crewai_tools import TavilyExtractorTool
# Ensure TAVILY_API_KEY is set in your environment
# os.environ["TAVILY_API_KEY"] = "YOUR_API_KEY"
# Initialize the tool
tavily_tool = TavilyExtractorTool()
# Create an agent that uses the tool
extractor_agent = Agent(
role='Web Content Extractor',
goal='Extract key information from specified web pages',
backstory='You are an expert at extracting relevant content from websites using the Tavily API.',
tools=[tavily_tool],
verbose=True
)
# Define a task for the agent
extract_task = Task(
description='Extract the main content from the URL https://example.com using basic extraction depth.',
expected_output='A JSON string containing the extracted content from the URL.',
agent=extractor_agent
)
# Create and run the crew
crew = Crew(
agents=[extractor_agent],
tasks=[extract_task],
verbose=True
)
result = crew.kickoff()
print(result)
```
## Configuration Options
The `TavilyExtractorTool` accepts the following arguments:
- `urls` (Union[List[str], str]): **Required**. A single URL string or a list of URL strings to extract data from.
- `include_images` (Optional[bool]): Whether to include images in the extraction results. Defaults to `False`.
- `extract_depth` (Literal["basic", "advanced"]): The depth of extraction. Use `"basic"` for faster, surface-level extraction or `"advanced"` for more comprehensive extraction. Defaults to `"basic"`.
- `timeout` (int): The maximum time in seconds to wait for the extraction request to complete. Defaults to `60`.
## Advanced Usage
### Multiple URLs with Advanced Extraction
```python
# Example with multiple URLs and advanced extraction
multi_extract_task = Task(
description='Extract content from https://example.com and https://anotherexample.org using advanced extraction.',
expected_output='A JSON string containing the extracted content from both URLs.',
agent=extractor_agent
)
# Configure the tool with custom parameters
custom_extractor = TavilyExtractorTool(
extract_depth='advanced',
include_images=True,
timeout=120
)
agent_with_custom_tool = Agent(
role="Advanced Content Extractor",
goal="Extract comprehensive content with images",
tools=[custom_extractor]
)
```
### Tool Parameters
You can customize the tool's behavior by setting parameters during initialization:
```python
# Initialize with custom configuration
extractor_tool = TavilyExtractorTool(
extract_depth='advanced', # More comprehensive extraction
include_images=True, # Include image results
timeout=90 # Custom timeout
)
```
## Features
- **Single or Multiple URLs**: Extract content from one URL or process multiple URLs in a single request
- **Configurable Depth**: Choose between basic (fast) and advanced (comprehensive) extraction modes
- **Image Support**: Optionally include images in the extraction results
- **Structured Output**: Returns well-formatted JSON containing the extracted content
- **Error Handling**: Robust handling of network timeouts and extraction errors
## Response Format
The tool returns a JSON string representing the structured data extracted from the provided URL(s). The exact structure depends on the content of the pages and the `extract_depth` used.
Common response elements include:
- **Title**: The page title
- **Content**: Main text content of the page
- **Images**: Image URLs and metadata (when `include_images=True`)
- **Metadata**: Additional page information like author, description, etc.
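A minimal sketch of consuming that output (this assumes the `urls` argument listed above is accepted by `run`; the preview logic is illustrative):

```python
import json

from crewai_tools import TavilyExtractorTool

tool = TavilyExtractorTool(extract_depth="basic")
raw = tool.run(urls="https://example.com")  # returns a JSON string
data = json.loads(raw)
print(type(data))        # exact structure varies by page and extract_depth
print(str(data)[:200])   # preview the extracted content
```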
## Use Cases
- **Content Analysis**: Extract and analyze content from competitor websites
- **Research**: Gather structured data from multiple sources for analysis
- **Content Migration**: Extract content from existing websites for migration
- **Monitoring**: Regular extraction of content for change detection
- **Data Collection**: Systematic extraction of information from web sources
Refer to the [Tavily API documentation](https://docs.tavily.com/docs/tavily-api/python-sdk#extract) for detailed information about the response structure and available options.

View File

@@ -0,0 +1,122 @@
---
title: "Tavily Search Tool"
description: "Perform comprehensive web searches using the Tavily Search API"
icon: "magnifying-glass"
---
The `TavilySearchTool` provides an interface to the Tavily Search API, enabling CrewAI agents to perform comprehensive web searches. It allows for specifying search depth, topics, time ranges, included/excluded domains, and whether to include direct answers, raw content, or images in the results.
## Installation
To use the `TavilySearchTool`, you need to install the `tavily-python` library:
```shell
pip install 'crewai[tools]' tavily-python
```
## Environment Variables
Ensure your Tavily API key is set as an environment variable:
```bash
export TAVILY_API_KEY='your_tavily_api_key'
```
## Example Usage
Here's how to initialize and use the `TavilySearchTool` within a CrewAI agent:
```python
import os
from crewai import Agent, Task, Crew
from crewai_tools import TavilySearchTool
# Ensure the TAVILY_API_KEY environment variable is set
# os.environ["TAVILY_API_KEY"] = "YOUR_TAVILY_API_KEY"
# Initialize the tool
tavily_tool = TavilySearchTool()
# Create an agent that uses the tool
researcher = Agent(
role='Market Researcher',
goal='Find information about the latest AI trends',
backstory='An expert market researcher specializing in technology.',
tools=[tavily_tool],
verbose=True
)
# Create a task for the agent
research_task = Task(
description='Search for the top 3 AI trends in 2024.',
expected_output='A JSON report summarizing the top 3 AI trends found.',
agent=researcher
)
# Form the crew and kick it off
crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=True
)
result = crew.kickoff()
print(result)
```
## Configuration Options
The `TavilySearchTool` accepts the following arguments during initialization or when calling the `run` method:
- `query` (str): **Required**. The search query string.
- `search_depth` (Literal["basic", "advanced"], optional): The depth of the search. Defaults to `"basic"`.
- `topic` (Literal["general", "news", "finance"], optional): The topic to focus the search on. Defaults to `"general"`.
- `time_range` (Literal["day", "week", "month", "year"], optional): The time range for the search. Defaults to `None`.
- `days` (int, optional): The number of days to search back. Relevant if `time_range` is not set. Defaults to `7`.
- `max_results` (int, optional): The maximum number of search results to return. Defaults to `5`.
- `include_domains` (Sequence[str], optional): A list of domains to prioritize in the search. Defaults to `None`.
- `exclude_domains` (Sequence[str], optional): A list of domains to exclude from the search. Defaults to `None`.
- `include_answer` (Union[bool, Literal["basic", "advanced"]], optional): Whether to include a direct answer synthesized from the search results. Defaults to `False`.
- `include_raw_content` (bool, optional): Whether to include the raw HTML content of the searched pages. Defaults to `False`.
- `include_images` (bool, optional): Whether to include image results. Defaults to `False`.
- `timeout` (int, optional): The request timeout in seconds. Defaults to `60`.
## Advanced Usage
You can configure the tool with custom parameters:
```python
# Example: Initialize with specific parameters
custom_tavily_tool = TavilySearchTool(
search_depth='advanced',
max_results=10,
include_answer=True
)
# The agent will use these defaults
agent_with_custom_tool = Agent(
role="Advanced Researcher",
goal="Conduct detailed research with comprehensive results",
tools=[custom_tavily_tool]
)
```
## Features
- **Comprehensive Search**: Access to Tavily's powerful search index
- **Configurable Depth**: Choose between basic and advanced search modes
- **Topic Filtering**: Focus searches on general, news, or finance topics
- **Time Range Control**: Limit results to specific time periods
- **Domain Control**: Include or exclude specific domains
- **Direct Answers**: Get synthesized answers from search results
- **Content Filtering**: Prevent context window issues with automatic content truncation
## Response Format
The tool returns search results as a JSON string containing:
- Search results with titles, URLs, and content snippets
- Optional direct answers to queries
- Optional image results
- Optional raw HTML content (when enabled)
Content for each result is automatically truncated to prevent context window issues while maintaining the most relevant information.
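Putting the options above together, a small usage sketch (the query text is arbitrary; argument names follow the list above):

```python
from crewai_tools import TavilySearchTool

search_tool = TavilySearchTool(max_results=3, include_answer=True)
raw = search_tool.run(query="top AI trends in 2024")  # returns a JSON string of results
print(raw[:300])  # preview; the full structure depends on the options used
```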

View File

@@ -0,0 +1,100 @@
---
title: Serper Scrape Website
description: The `SerperScrapeWebsiteTool` is designed to scrape websites and extract clean, readable content using Serper's scraping API.
icon: globe
---
# `SerperScrapeWebsiteTool`
## Description
This tool is designed to scrape website content and extract clean, readable text from any website URL. It utilizes the [serper.dev](https://serper.dev) scraping API to fetch and process web pages, optionally including markdown formatting for better structure and readability.
## Installation
To effectively use the `SerperScrapeWebsiteTool`, follow these steps:
1. **Package Installation**: Confirm that the `crewai[tools]` package is installed in your Python environment.
2. **API Key Acquisition**: Acquire a `serper.dev` API key by registering for an account at `serper.dev`.
3. **Environment Configuration**: Store your obtained API key in an environment variable named `SERPER_API_KEY` to facilitate its use by the tool.
To incorporate this tool into your project, follow the installation instructions below:
```shell
pip install 'crewai[tools]'
```
## Example
The following example demonstrates how to initialize the tool and scrape a website:
```python Code
from crewai_tools import SerperScrapeWebsiteTool
# Initialize the tool for website scraping capabilities
tool = SerperScrapeWebsiteTool()
# Scrape a website with markdown formatting
result = tool.run(url="https://example.com", include_markdown=True)
```
## Arguments
The `SerperScrapeWebsiteTool` accepts the following arguments:
- **url**: Required. The URL of the website to scrape.
- **include_markdown**: Optional. Whether to include markdown formatting in the scraped content. Defaults to `True`.
## Example with Parameters
Here is an example demonstrating how to use the tool with different parameters:
```python Code
from crewai_tools import SerperScrapeWebsiteTool
tool = SerperScrapeWebsiteTool()
# Scrape with markdown formatting (default)
markdown_result = tool.run(
url="https://docs.crewai.com",
include_markdown=True
)
# Scrape without markdown formatting for plain text
plain_result = tool.run(
url="https://docs.crewai.com",
include_markdown=False
)
print("Markdown formatted content:")
print(markdown_result)
print("\nPlain text content:")
print(plain_result)
```
## Use Cases
The `SerperScrapeWebsiteTool` is particularly useful for:
- **Content Analysis**: Extract and analyze website content for research purposes
- **Data Collection**: Gather structured information from web pages
- **Documentation Processing**: Convert web-based documentation into readable formats
- **Competitive Analysis**: Scrape competitor websites for market research
- **Content Migration**: Extract content from existing websites for migration purposes
## Error Handling
The tool includes comprehensive error handling for:
- **Network Issues**: Handles connection timeouts and network errors gracefully
- **API Errors**: Provides detailed error messages for API-related issues
- **Invalid URLs**: Validates and reports issues with malformed URLs
- **Authentication**: Clear error messages for missing or invalid API keys
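A defensive usage sketch consistent with the points above (the exact exception types raised by the tool are not documented here, so a broad `except` is used):

```python Code
from crewai_tools import SerperScrapeWebsiteTool

tool = SerperScrapeWebsiteTool()
try:
    content = tool.run(url="https://example.com", include_markdown=True)
    print(content[:200])
except Exception as exc:  # network, API, URL, or authentication errors
    print(f"Scrape failed: {exc}")
```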
## Security Considerations
- Always store your `SERPER_API_KEY` in environment variables, never hardcode it in your source code
- Be mindful of rate limits imposed by the Serper API
- Respect robots.txt and website terms of service when scraping content
- Consider implementing delays between requests for large-scale scraping operations

View File

@@ -0,0 +1,264 @@
# Human Input Event Streaming
CrewAI supports real-time event streaming for human input events, allowing clients to receive notifications when human input is required during crew execution. This feature provides an alternative to webhook-only approaches and supports multiple streaming protocols.
## Overview
When a task requires human input (`task.human_input=True`), CrewAI emits events that can be consumed via:
- **WebSocket**: Real-time bidirectional communication
- **Server-Sent Events (SSE)**: Unidirectional server-to-client streaming
- **Long Polling**: HTTP-based polling for events
## Event Types
### HumanInputRequiredEvent
Emitted when human input is required during task execution.
```json
{
"type": "human_input_required",
"execution_id": "uuid",
"crew_id": "uuid",
"task_id": "uuid",
"agent_id": "uuid",
"prompt": "string",
"context": "string",
"timestamp": "ISO8601",
"event_id": "uuid",
"reason_flags": {
"ambiguity": true,
"missing_field": false
}
}
```
### HumanInputCompletedEvent
Emitted when human input is completed.
```json
{
"type": "human_input_completed",
"execution_id": "uuid",
"crew_id": "uuid",
"task_id": "uuid",
"agent_id": "uuid",
"event_id": "uuid",
"human_feedback": "string",
"timestamp": "ISO8601"
}
```
## Server Setup
### Installation
Install the server dependencies:
```bash
pip install crewai[server]
```
### Starting the Server
```python
from crewai.server.human_input_server import HumanInputServer
# Start server with authentication
server = HumanInputServer(
host="localhost",
port=8000,
api_key="your-api-key"
)
# Start synchronously
server.start()
# Or start asynchronously
await server.start_async()
```
### Configuration Options
- `host`: Server host (default: "localhost")
- `port`: Server port (default: 8000)
- `api_key`: Optional API key for authentication
## Client Integration
### WebSocket Client
```python
import asyncio
import json
import websockets
async def websocket_client(execution_id: str, api_key: str = None):
uri = f"ws://localhost:8000/ws/human-input/{execution_id}"
if api_key:
uri += f"?token={api_key}"
async with websockets.connect(uri) as websocket:
async for message in websocket:
event = json.loads(message)
if event['type'] == 'human_input_required':
print(f"Human input needed: {event['prompt']}")
print(f"Context: {event['context']}")
elif event['type'] == 'human_input_completed':
print(f"Input completed: {event['human_feedback']}")
# Usage
asyncio.run(websocket_client("execution-id", "api-key"))
```
### Server-Sent Events (SSE) Client
```python
import httpx
import json
async def sse_client(execution_id: str, api_key: str = None):
url = f"http://localhost:8000/events/human-input/{execution_id}"
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
async with httpx.AsyncClient() as client:
async with client.stream("GET", url, headers=headers) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
event = json.loads(line[6:])
if event.get('type') != 'heartbeat':
print(f"Received: {event}")
# Usage
asyncio.run(sse_client("execution-id", "api-key"))
```
### Long Polling Client
```python
import httpx
import asyncio
async def polling_client(execution_id: str, api_key: str = None):
url = f"http://localhost:8000/poll/human-input/{execution_id}"
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
last_event_id = None
async with httpx.AsyncClient() as client:
while True:
params = {}
if last_event_id:
params["last_event_id"] = last_event_id
response = await client.get(url, headers=headers, params=params)
data = response.json()
for event in data.get("events", []):
print(f"Received: {event}")
last_event_id = event.get('event_id')
await asyncio.sleep(2) # Poll every 2 seconds
# Usage
asyncio.run(polling_client("execution-id", "api-key"))
```
## API Endpoints
### WebSocket Endpoint
- **URL**: `/ws/human-input/{execution_id}`
- **Protocol**: WebSocket
- **Authentication**: Query parameter `token` (if API key configured)
### SSE Endpoint
- **URL**: `/events/human-input/{execution_id}`
- **Method**: GET
- **Headers**: `Authorization: Bearer <api_key>` (if configured)
- **Response**: `text/event-stream`
### Polling Endpoint
- **URL**: `/poll/human-input/{execution_id}`
- **Method**: GET
- **Headers**: `Authorization: Bearer <api_key>` (if configured)
- **Query Parameters**:
- `last_event_id`: Get events after this ID
- **Response**: JSON with `events` array
### Health Check
- **URL**: `/health`
- **Method**: GET
- **Response**: `{"status": "healthy", "timestamp": "..."}`
## Authentication
When an API key is configured, clients must authenticate:
- **WebSocket**: Include `token` query parameter
- **SSE/Polling**: Include `Authorization: Bearer <api_key>` header
## Integration with Crew Execution
The event streaming works automatically with existing crew execution:
```python
import threading

from crewai import Agent, Task, Crew
from crewai.server.human_input_server import HumanInputServer
# Create crew with human input task
agent = Agent(...)
task = Task(
description="...",
human_input=True, # This enables human input
agent=agent
)
crew = Crew(agents=[agent], tasks=[task])
# Start event server (optional)
server = HumanInputServer(port=8000)
server_thread = threading.Thread(target=server.start, daemon=True)
server_thread.start()
# Execute crew - events will be emitted automatically
result = crew.kickoff()
```
## Error Handling
- **Connection Errors**: Clients should implement reconnection logic (see the sketch after this list)
- **Authentication Errors**: Server returns 401 for invalid credentials
- **Rate Limiting**: Consider implementing client-side rate limiting for polling
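A minimal reconnection sketch for the WebSocket client shown earlier (retry counts and backoff values are arbitrary):

```python
import asyncio

async def websocket_client_with_retry(execution_id: str, api_key: str = None, max_retries: int = 5):
    delay = 1
    for _ in range(max_retries):
        try:
            await websocket_client(execution_id, api_key)  # defined in the WebSocket client example above
            return
        except Exception as exc:
            print(f"Connection lost ({exc}); retrying in {delay}s")
            await asyncio.sleep(delay)
            delay = min(delay * 2, 30)  # simple exponential backoff
```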
## Best Practices
1. **Use WebSocket** for real-time applications requiring immediate notifications
2. **Use SSE** for one-way streaming with automatic reconnection support
3. **Use Polling** for simple implementations or when WebSocket/SSE aren't available
4. **Implement Authentication** in production environments
5. **Handle Connection Failures** gracefully with retry logic
6. **Filter Events** by execution_id to avoid processing irrelevant events
## Backward Compatibility
This feature is fully backward compatible:
- Existing webhook functionality remains unchanged
- Console-based human input continues to work
- No breaking changes to existing APIs
## Example Applications
- **Web Dashboards**: Real-time crew execution monitoring
- **Mobile Apps**: Push notifications for human input requests
- **Integration Platforms**: Event-driven workflow automation
- **Monitoring Systems**: Real-time alerting and logging

View File

@@ -84,8 +84,8 @@ filename = "seu_modelo.pkl"
try:
SuaCrew().crew().train(
n_iterations=n_iterations,
inputs=inputs,
n_iterations=n_iterations,
inputs=inputs,
filename=filename
)
except Exception as e:
@@ -103,7 +103,7 @@ crewai replay [OPTIONS]
- `-t, --task_id TEXT`: Reexecuta o crew a partir deste task ID, incluindo todas as tarefas subsequentes
Exemplo:
```shell Terminal
```shell Terminal
crewai replay -t task_123456
```
@@ -149,7 +149,7 @@ crewai test [OPTIONS]
- `-m, --model TEXT`: Modelo LLM para executar os testes no Crew (padrão: "gpt-4o-mini")
Exemplo:
```shell Terminal
```shell Terminal
crewai test -n 5 -m gpt-3.5-turbo
```
@@ -203,10 +203,7 @@ def crew(self) -> Crew:
Implemente o crew ou flow no [CrewAI Enterprise](https://app.crewai.com).
- **Autenticação**: Você precisa estar autenticado para implementar no CrewAI Enterprise.
```shell Terminal
crewai signup
```
Caso já tenha uma conta, você pode fazer login com:
Você pode fazer login ou criar uma conta com:
```shell Terminal
crewai login
```
@@ -253,7 +250,7 @@ Você deve estar autenticado no CrewAI Enterprise para usar estes comandos de ge
- **Implantar o Crew**: Depois de autenticado, você pode implantar seu crew ou flow no CrewAI Enterprise.
```shell Terminal
crewai deploy push
```
```
- Inicia o processo de deployment na plataforma CrewAI Enterprise.
- Após a iniciação bem-sucedida, será exibida a mensagem Deployment created successfully! juntamente com o Nome do Deployment e um Deployment ID (UUID) único.
@@ -326,4 +323,4 @@ Ao escolher um provedor, o CLI solicitará que você informe o nome da chave e a
Veja o seguinte link para o nome de chave de cada provedor:
* [LiteLLM Providers](https://docs.litellm.ai/docs/providers)
* [LiteLLM Providers](https://docs.litellm.ai/docs/providers)

View File

@@ -268,7 +268,7 @@ Nesta seção, você encontrará exemplos detalhados que ajudam a selecionar, co
from crewai import LLM
llm = LLM(
model="gemini/gemini-1.5-pro-latest",
model="gemini-1.5-pro-latest", # or vertex_ai/gemini-1.5-pro-latest
temperature=0.7,
vertex_credentials=vertex_credentials_json
)

View File

@@ -623,7 +623,7 @@ for provider in providers_to_test:
**Erros de modelo não encontrado:**
```python
# Verifique disponibilidade do modelo
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
try:

View File

@@ -54,10 +54,11 @@ crew = Crew(
| **Markdown** _(opcional)_ | `markdown` | `Optional[bool]` | Se a tarefa deve instruir o agente a retornar a resposta final formatada em Markdown. O padrão é False. |
| **Config** _(opcional)_ | `config` | `Optional[Dict[str, Any]]` | Parâmetros de configuração específicos da tarefa. |
| **Arquivo de Saída** _(opcional)_| `output_file` | `Optional[str]` | Caminho do arquivo para armazenar a saída da tarefa. |
| **Criar Diretório** _(opcional)_ | `create_directory` | `Optional[bool]` | Se deve criar o diretório para output_file caso não exista. O padrão é True. |
| **Saída JSON** _(opcional)_ | `output_json` | `Optional[Type[BaseModel]]` | Um modelo Pydantic para estruturar a saída em JSON. |
| **Output Pydantic** _(opcional)_ | `output_pydantic` | `Optional[Type[BaseModel]]` | Um modelo Pydantic para a saída da tarefa. |
| **Callback** _(opcional)_ | `callback` | `Optional[Any]` | Função/objeto a ser executado após a conclusão da tarefa. |
| **Guardrail** _(opcional)_ | `guardrail` | `Optional[Union[Callable, str]]` | Função ou descrição em string para validar a saída da tarefa antes de prosseguir para a próxima tarefa. |
| **Guardrail** _(opcional)_ | `guardrail` | `Optional[Callable]` | Função para validar a saída da tarefa antes de prosseguir para a próxima tarefa. |
## Criando Tarefas
@@ -87,7 +88,6 @@ research_task:
expected_output: >
Uma lista com 10 tópicos em bullet points das informações mais relevantes sobre {topic}
agent: researcher
guardrail: garanta que cada bullet point contenha no mínimo 100 palavras
reporting_task:
description: >
@@ -332,9 +332,7 @@ analysis_task = Task(
Guardrails (trilhas de proteção) de tarefas fornecem uma maneira de validar e transformar as saídas das tarefas antes que elas sejam passadas para a próxima tarefa. Esse recurso assegura a qualidade dos dados e oferece feedback aos agentes quando sua saída não atende a critérios específicos.
**Guardrails podem ser definidos de duas maneiras:**
1. **Guardrails baseados em função**: Funções Python que implementam lógica de validação customizada
2. **Guardrails baseados em string**: Descrições em linguagem natural que são automaticamente convertidas em validação baseada em LLM
Guardrails são implementados como funções Python que contêm lógica de validação customizada, proporcionando controle total sobre o processo de validação e garantindo resultados confiáveis e determinísticos.
### Guardrails Baseados em Função
@@ -376,82 +374,7 @@ blog_task = Task(
- Em caso de sucesso: retorna uma tupla `(True, resultado_validado)`
- Em caso de falha: retorna uma tupla `(False, "mensagem de erro explicando a falha")`
### Guardrails Baseados em String
Guardrails baseados em string permitem que você descreva critérios de validação em linguagem natural. Quando você fornece uma string em vez de uma função, o CrewAI automaticamente a converte em um `LLMGuardrail` que usa um agente de IA para validar a saída da tarefa.
#### Usando Guardrails de String em Python
```python Code
from crewai import Task
# Guardrail simples baseado em string
blog_task = Task(
description="Escreva um post de blog sobre IA",
expected_output="Um post de blog com menos de 200 palavras",
agent=blog_agent,
guardrail="Garanta que o post do blog tenha menos de 200 palavras e inclua exemplos práticos"
)
# Critérios de validação mais complexos
research_task = Task(
description="Pesquise tendências de IA para 2025",
expected_output="Um relatório abrangente de pesquisa",
agent=research_agent,
guardrail="Garanta que cada descoberta inclua uma fonte confiável e seja respaldada por dados recentes de 2024-2025"
)
```
#### Usando Guardrails de String em YAML
```yaml
research_task:
description: Pesquise os últimos desenvolvimentos em IA
expected_output: Uma lista de 10 bullet points sobre IA
agent: researcher
guardrail: garanta que cada bullet point contenha no mínimo 100 palavras
validation_task:
description: Valide os achados da pesquisa
expected_output: Um relatório de validação
agent: validator
guardrail: confirme que todas as fontes são de publicações respeitáveis e publicadas nos últimos 2 anos
```
#### Como Funcionam os Guardrails de String
Quando você fornece um guardrail de string, o CrewAI automaticamente:
1. Cria uma instância `LLMGuardrail` usando a string como critério de validação
2. Usa o LLM do agente da tarefa para alimentar a validação
3. Cria um agente temporário de validação que verifica a saída contra seus critérios
4. Retorna feedback detalhado se a validação falhar
Esta abordagem é ideal quando você quer usar linguagem natural para descrever regras de validação sem escrever funções de validação customizadas.
### Classe LLMGuardrail
A classe `LLMGuardrail` é o mecanismo subjacente que alimenta os guardrails baseados em string. Você também pode usá-la diretamente para maior controle avançado:
```python Code
from crewai import Task
from crewai.tasks.llm_guardrail import LLMGuardrail
from crewai.llm import LLM
# Crie um LLMGuardrail customizado com LLM específico
custom_guardrail = LLMGuardrail(
description="Garanta que a resposta contenha exatamente 5 bullet points com citações adequadas",
llm=LLM(model="gpt-4o-mini")
)
task = Task(
description="Pesquise medidas de segurança em IA",
expected_output="Uma análise detalhada com bullet points",
agent=research_agent,
guardrail=custom_guardrail
)
```
**Nota**: Quando você usa um guardrail de string, o CrewAI automaticamente cria uma instância `LLMGuardrail` usando o LLM do agente da sua tarefa. Usar `LLMGuardrail` diretamente lhe dá mais controle sobre o processo de validação e seleção de LLM.
### Melhores Práticas de Tratamento de Erros
@@ -902,26 +825,7 @@ task = Task(
)
```
#### Use uma abordagem no-code para validação
```python Code
from crewai import Task
task = Task(
description="Gerar dados em JSON",
expected_output="Objeto JSON válido",
guardrail="Garanta que a resposta é um objeto JSON válido"
)
```
#### Usando YAML
```yaml
research_task:
...
guardrail: garanta que cada bullet tenha no mínimo 100 palavras
...
```
```python Code
@CrewBase
@@ -1037,21 +941,87 @@ task = Task(
## Criando Diretórios ao Salvar Arquivos
Agora é possível especificar se uma tarefa deve criar diretórios ao salvar sua saída em arquivo. Isso é útil para organizar outputs e garantir que os caminhos estejam corretos.
O parâmetro `create_directory` controla se o CrewAI deve criar automaticamente diretórios ao salvar saídas de tarefas em arquivos. Este recurso é particularmente útil para organizar outputs e garantir que os caminhos de arquivos estejam estruturados corretamente, especialmente ao trabalhar com hierarquias de projetos complexas.
### Comportamento Padrão
Por padrão, `create_directory=True`, o que significa que o CrewAI criará automaticamente qualquer diretório ausente no caminho do arquivo de saída:
```python Code
# ...
save_output_task = Task(
description='Salve o resumo das notícias de IA em um arquivo',
expected_output='Arquivo salvo com sucesso',
agent=research_agent,
tools=[file_save_tool],
output_file='outputs/ai_news_summary.txt',
create_directory=True
# Comportamento padrão - diretórios são criados automaticamente
report_task = Task(
description='Gerar um relatório abrangente de análise de mercado',
expected_output='Uma análise detalhada de mercado com gráficos e insights',
agent=analyst_agent,
output_file='reports/2025/market_analysis.md', # Cria 'reports/2025/' se não existir
markdown=True
)
```
#...
### Desabilitando a Criação de Diretórios
Se você quiser evitar a criação automática de diretórios e garantir que o diretório já exista, defina `create_directory=False`:
```python Code
# Modo estrito - o diretório já deve existir
strict_output_task = Task(
description='Salvar dados críticos que requerem infraestrutura existente',
expected_output='Dados salvos em localização pré-configurada',
agent=data_agent,
output_file='secure/vault/critical_data.json',
create_directory=False # Gerará RuntimeError se 'secure/vault/' não existir
)
```
### Configuração YAML
Você também pode configurar este comportamento em suas definições de tarefas YAML:
```yaml tasks.yaml
analysis_task:
description: >
Gerar análise financeira trimestral
expected_output: >
Um relatório financeiro abrangente com insights trimestrais
agent: financial_analyst
output_file: reports/quarterly/q4_2024_analysis.pdf
create_directory: true # Criar automaticamente o diretório 'reports/quarterly/'
audit_task:
description: >
Realizar auditoria de conformidade e salvar no diretório de auditoria existente
expected_output: >
Um relatório de auditoria de conformidade
agent: auditor
output_file: audit/compliance_report.md
create_directory: false # O diretório já deve existir
```
### Casos de Uso
**Criação Automática de Diretórios (`create_directory=True`):**
- Ambientes de desenvolvimento e prototipagem
- Geração dinâmica de relatórios com pastas baseadas em datas
- Fluxos de trabalho automatizados onde a estrutura de diretórios pode variar
- Aplicações multi-tenant com pastas específicas do usuário
**Gerenciamento Manual de Diretórios (`create_directory=False`):**
- Ambientes de produção com controles rígidos do sistema de arquivos
- Aplicações sensíveis à segurança onde diretórios devem ser pré-configurados
- Sistemas com requisitos específicos de permissão
- Ambientes de conformidade onde a criação de diretórios é auditada
### Tratamento de Erros
Quando `create_directory=False` e o diretório não existe, o CrewAI gerará um `RuntimeError`:
```python Code
try:
result = crew.kickoff()
except RuntimeError as e:
# Tratar erro de diretório ausente
print(f"Falha na criação do diretório: {e}")
# Criar diretório manualmente ou usar local alternativo
```
Veja o vídeo abaixo para aprender como utilizar saídas estruturadas no CrewAI:

View File

@@ -0,0 +1,241 @@
"""
Example demonstrating how to use the human input event streaming feature.
This example shows how to:
1. Start the human input event server
2. Connect to WebSocket/SSE/polling endpoints
3. Handle human input events in real-time
4. Integrate with crew execution
"""
import asyncio
import json
import threading
import time
from typing import Optional
try:
import websockets
import httpx
from crewai.server.human_input_server import HumanInputServer
DEPENDENCIES_AVAILABLE = True
except ImportError:
DEPENDENCIES_AVAILABLE = False
from crewai import Agent, Task, Crew
from crewai.llm import LLM
def start_event_server(port: int = 8000, api_key: Optional[str] = None):
"""Start the human input event server in a separate thread"""
if not DEPENDENCIES_AVAILABLE:
print("Server dependencies not available. Install with: pip install crewai[server]")
return None
server = HumanInputServer(host="localhost", port=port, api_key=api_key)
def run_server():
server.start()
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
time.sleep(2)
print(f"Human input event server started on http://localhost:{port}")
return server
async def websocket_client_example(execution_id: str, api_key: Optional[str] = None):
"""Example WebSocket client for receiving human input events"""
if not DEPENDENCIES_AVAILABLE:
print("WebSocket dependencies not available")
return
uri = f"ws://localhost:8000/ws/human-input/{execution_id}"
if api_key:
uri += f"?token={api_key}"
try:
async with websockets.connect(uri) as websocket:
print(f"Connected to WebSocket for execution {execution_id}")
async for message in websocket:
event_data = json.loads(message)
print(f"Received WebSocket event: {event_data['type']}")
if event_data['type'] == 'human_input_required':
print(f"Human input required for task: {event_data.get('task_id')}")
print(f"Prompt: {event_data.get('prompt')}")
print(f"Context: {event_data.get('context')}")
elif event_data['type'] == 'human_input_completed':
print(f"Human input completed: {event_data.get('human_feedback')}")
except Exception as e:
print(f"WebSocket error: {e}")
async def sse_client_example(execution_id: str, api_key: Optional[str] = None):
"""Example SSE client for receiving human input events"""
if not DEPENDENCIES_AVAILABLE:
print("SSE dependencies not available")
return
url = f"http://localhost:8000/events/human-input/{execution_id}"
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
try:
async with httpx.AsyncClient() as client:
async with client.stream("GET", url, headers=headers) as response:
print(f"Connected to SSE for execution {execution_id}")
async for line in response.aiter_lines():
if line.startswith("data: "):
event_data = json.loads(line[6:])
if event_data.get('type') != 'heartbeat':
print(f"Received SSE event: {event_data['type']}")
if event_data['type'] == 'human_input_required':
print(f"Human input required for task: {event_data.get('task_id')}")
print(f"Prompt: {event_data.get('prompt')}")
elif event_data['type'] == 'human_input_completed':
print(f"Human input completed: {event_data.get('human_feedback')}")
except Exception as e:
print(f"SSE error: {e}")
async def polling_client_example(execution_id: str, api_key: Optional[str] = None):
"""Example polling client for receiving human input events"""
if not DEPENDENCIES_AVAILABLE:
print("Polling dependencies not available")
return
url = f"http://localhost:8000/poll/human-input/{execution_id}"
headers = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
last_event_id = None
try:
async with httpx.AsyncClient() as client:
print(f"Starting polling for execution {execution_id}")
while True:
params = {}
if last_event_id:
params["last_event_id"] = last_event_id
response = await client.get(url, headers=headers, params=params)
data = response.json()
for event in data.get("events", []):
print(f"Received polling event: {event['type']}")
if event['type'] == 'human_input_required':
print(f"Human input required for task: {event.get('task_id')}")
print(f"Prompt: {event.get('prompt')}")
elif event['type'] == 'human_input_completed':
print(f"Human input completed: {event.get('human_feedback')}")
last_event_id = event.get('event_id')
await asyncio.sleep(2)
except Exception as e:
print(f"Polling error: {e}")
def create_sample_crew():
"""Create a sample crew that requires human input"""
llm = LLM(model="gpt-4o-mini")
agent = Agent(
role="Research Assistant",
goal="Help with research tasks and get human feedback",
backstory="You are a helpful research assistant that works with humans to complete tasks.",
llm=llm,
verbose=True
)
task = Task(
description="Research the latest trends in AI and provide a summary. Ask for human feedback on the findings.",
expected_output="A comprehensive summary of AI trends with human feedback incorporated.",
agent=agent,
human_input=True
)
crew = Crew(
agents=[agent],
tasks=[task],
verbose=True
)
return crew
async def main():
"""Main example function"""
print("CrewAI Human Input Event Streaming Example")
print("=" * 50)
api_key = "demo-api-key"
server = start_event_server(port=8000, api_key=api_key)
if not server:
return
crew = create_sample_crew()
execution_id = str(crew.id)
print(f"Crew execution ID: {execution_id}")
print("\nStarting event stream clients...")
websocket_task = asyncio.create_task(
websocket_client_example(execution_id, api_key)
)
sse_task = asyncio.create_task(
sse_client_example(execution_id, api_key)
)
polling_task = asyncio.create_task(
polling_client_example(execution_id, api_key)
)
await asyncio.sleep(1)
print("\nStarting crew execution...")
print("Note: This will prompt for human input in the console.")
print("The event streams above will also receive the events in real-time.")
def run_crew():
try:
result = crew.kickoff()
print(f"\nCrew execution completed: {result}")
except Exception as e:
print(f"Crew execution error: {e}")
crew_thread = threading.Thread(target=run_crew)
crew_thread.start()
await asyncio.sleep(30)
websocket_task.cancel()
sse_task.cancel()
polling_task.cancel()
crew_thread.join(timeout=5)
print("\nExample completed!")
if __name__ == "__main__":
if DEPENDENCIES_AVAILABLE:
asyncio.run(main())
else:
print("Dependencies not available. Install with: pip install crewai[server]")
print("This example requires FastAPI, uvicorn, websockets, and httpx.")

View File

@@ -11,7 +11,7 @@ dependencies = [
# Core Dependencies
"pydantic>=2.4.2",
"openai>=1.13.3",
"litellm==1.72.6",
"litellm==1.74.3",
"instructor>=1.3.3",
# Text Processing
"pdfplumber>=0.11.4",
@@ -48,7 +48,7 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools~=0.55.0"]
tools = ["crewai-tools~=0.59.0"]
embeddings = [
"tiktoken~=0.8.0"
]
@@ -69,6 +69,11 @@ docling = [
aisuite = [
"aisuite>=0.1.10",
]
server = [
"fastapi>=0.104.0",
"uvicorn>=0.24.0",
"websockets>=12.0",
]
[tool.uv]
dev-dependencies = [
@@ -86,6 +91,10 @@ dev-dependencies = [
"pytest-timeout>=2.3.1",
"pytest-xdist>=3.6.1",
"pytest-split>=0.9.0",
"fastapi>=0.104.0",
"uvicorn>=0.24.0",
"websockets>=12.0",
"httpx>=0.25.0",
]
[project.scripts]

View File

@@ -54,7 +54,7 @@ def _track_install_async():
_track_install_async()
__version__ = "0.148.0"
__version__ = "0.152.0"
__all__ = [
"Agent",
"Crew",

View File

@@ -1,4 +1,5 @@
import time
import uuid
from typing import TYPE_CHECKING
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
@@ -8,6 +9,8 @@ from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.printer import Printer
from crewai.utilities.events.event_listener import event_listener
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.task_events import HumanInputRequiredEvent, HumanInputCompletedEvent
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -126,6 +129,42 @@ class CrewAgentExecutorMixin:
def _ask_human_input(self, final_answer: str) -> str:
"""Prompt human input with mode-appropriate messaging."""
event_id = str(uuid.uuid4())
execution_id = getattr(self.crew, 'id', None) if self.crew else None
crew_id = str(execution_id) if execution_id else None
task_id = str(getattr(self.task, 'id', None)) if self.task else None
agent_id = str(getattr(self.agent, 'id', None)) if self.agent else None
if self.crew and getattr(self.crew, "_train", False):
prompt_text = (
"## TRAINING MODE: Provide feedback to improve the agent's performance.\n"
"This will be used to train better versions of the agent.\n"
"Please provide detailed feedback about the result quality and reasoning process."
)
else:
prompt_text = (
"## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n"
"Please follow these guidelines:\n"
" - If you are happy with the result, simply hit Enter without typing anything.\n"
" - Otherwise, provide specific improvement requests.\n"
" - You can provide multiple rounds of feedback until satisfied."
)
crewai_event_bus.emit(
self,
HumanInputRequiredEvent(
execution_id=execution_id,
crew_id=crew_id,
task_id=task_id,
agent_id=agent_id,
prompt=prompt_text,
context=final_answer,
event_id=event_id,
reason_flags={"ambiguity": True, "missing_field": False}
)
)
event_listener.formatter.pause_live_updates()
try:
@@ -133,7 +172,6 @@ class CrewAgentExecutorMixin:
content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
)
# Training mode prompt (single iteration)
if self.crew and getattr(self.crew, "_train", False):
prompt = (
"\n\n=====\n"
@@ -142,7 +180,6 @@ class CrewAgentExecutorMixin:
"Please provide detailed feedback about the result quality and reasoning process.\n"
"=====\n"
)
# Regular human-in-the-loop prompt (multiple iterations)
else:
prompt = (
"\n\n=====\n"
@@ -158,6 +195,19 @@ class CrewAgentExecutorMixin:
response = input()
if response.strip() != "":
self._printer.print(content="\nProcessing your feedback...", color="cyan")
crewai_event_bus.emit(
self,
HumanInputCompletedEvent(
execution_id=execution_id,
crew_id=crew_id,
task_id=task_id,
agent_id=agent_id,
event_id=event_id,
human_feedback=response
)
)
return response
finally:
event_listener.formatter.resume_live_updates()

View File

@@ -120,11 +120,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
raise
except Exception as e:
handle_unknown_error(self._printer, e)
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
else:
raise e
raise
if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)

View File

@@ -3,6 +3,7 @@ from typing import Optional
import click
from crewai.cli.config import Settings
from crewai.cli.settings.main import SettingsCommand
from crewai.cli.add_crew_to_flow import add_crew_to_flow
from crewai.cli.create_crew import create_crew
from crewai.cli.create_flow import create_flow
@@ -227,7 +228,7 @@ def update():
@crewai.command()
def login():
"""Sign Up/Login to CrewAI Enterprise."""
Settings().clear()
Settings().clear_user_settings()
AuthenticationCommand().login()
@@ -369,8 +370,8 @@ def org():
pass
@org.command()
def list():
@org.command("list")
def org_list():
"""List available organizations."""
org_command = OrganizationCommand()
org_command.list()
@@ -391,5 +392,34 @@ def current():
org_command.current()
@crewai.group()
def config():
"""CLI Configuration commands."""
pass
@config.command("list")
def config_list():
"""List all CLI configuration parameters."""
config_command = SettingsCommand()
config_command.list()
@config.command("set")
@click.argument("key")
@click.argument("value")
def config_set(key: str, value: str):
"""Set a CLI configuration parameter."""
config_command = SettingsCommand()
config_command.set(key, value)
@config.command("reset")
def config_reset():
"""Reset all CLI configuration parameters to default values."""
config_command = SettingsCommand()
config_command.reset_all_settings()
if __name__ == "__main__":
crewai()
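For reference, a rough sketch of driving the new `config` command group programmatically with Click's test runner (using `CliRunner` here is an assumption for illustration; only `config_list` and `config_set` come from the code above):
```python
from click.testing import CliRunner

from crewai.cli.cli import config_list, config_set

runner = CliRunner()
# Note: config_set persists the value to the CrewAI CLI settings file
runner.invoke(config_set, ["enterprise_base_url", "https://enterprise.example.com"])
result = runner.invoke(config_list)
print(result.output)  # table of settings, current values, and descriptions
```
From a shell, the equivalent commands would be `crewai config set enterprise_base_url <url>` and `crewai config list`.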

View File

@@ -26,7 +26,7 @@ class PlusAPIMixin:
"Please sign up/login to CrewAI+ before using the CLI.",
style="bold red",
)
console.print("Run 'crewai signup' to sign up/login.", style="bold green")
console.print("Run 'crewai login' to sign up/login.", style="bold green")
raise SystemExit
def _validate_response(self, response: requests.Response) -> None:

View File

@@ -4,10 +4,47 @@ from typing import Optional
from pydantic import BaseModel, Field
from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
DEFAULT_CONFIG_PATH = Path.home() / ".config" / "crewai" / "settings.json"
# Settings that are related to the user's account
USER_SETTINGS_KEYS = [
"tool_repository_username",
"tool_repository_password",
"org_name",
"org_uuid",
]
# Settings that are related to the CLI
CLI_SETTINGS_KEYS = [
"enterprise_base_url",
]
# Default values for CLI settings
DEFAULT_CLI_SETTINGS = {
"enterprise_base_url": DEFAULT_CREWAI_ENTERPRISE_URL,
}
# Readonly settings - cannot be set by the user
READONLY_SETTINGS_KEYS = [
"org_name",
"org_uuid",
]
# Hidden settings - not displayed by the 'list' command and cannot be set by the user
HIDDEN_SETTINGS_KEYS = [
"config_path",
"tool_repository_username",
"tool_repository_password",
]
class Settings(BaseModel):
enterprise_base_url: Optional[str] = Field(
default=DEFAULT_CREWAI_ENTERPRISE_URL,
description="Base URL of the CrewAI Enterprise instance",
)
tool_repository_username: Optional[str] = Field(
None, description="Username for interacting with the Tool Repository"
)
@@ -20,7 +57,7 @@ class Settings(BaseModel):
org_uuid: Optional[str] = Field(
None, description="UUID of the currently active organization"
)
config_path: Path = Field(default=DEFAULT_CONFIG_PATH, exclude=True)
config_path: Path = Field(default=DEFAULT_CONFIG_PATH, frozen=True, exclude=True)
def __init__(self, config_path: Path = DEFAULT_CONFIG_PATH, **data):
"""Load Settings from config path"""
@@ -37,9 +74,16 @@ class Settings(BaseModel):
merged_data = {**file_data, **data}
super().__init__(config_path=config_path, **merged_data)
def clear(self) -> None:
"""Clear all settings"""
self.config_path.unlink(missing_ok=True)
def clear_user_settings(self) -> None:
"""Clear all user settings"""
self._reset_user_settings()
self.dump()
def reset(self) -> None:
"""Reset all settings to default values"""
self._reset_user_settings()
self._reset_cli_settings()
self.dump()
def dump(self) -> None:
"""Save current settings to settings.json"""
@@ -52,3 +96,13 @@ class Settings(BaseModel):
updated_data = {**existing_data, **self.model_dump(exclude_unset=True)}
with self.config_path.open("w") as f:
json.dump(updated_data, f, indent=4)
def _reset_user_settings(self) -> None:
"""Reset all user settings to default values"""
for key in USER_SETTINGS_KEYS:
setattr(self, key, None)
def _reset_cli_settings(self) -> None:
"""Reset all CLI settings to default values"""
for key in CLI_SETTINGS_KEYS:
setattr(self, key, DEFAULT_CLI_SETTINGS[key])
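A short sketch of how the user/CLI settings split behaves (a temporary config path keeps the example from touching the real `~/.config/crewai/settings.json`; the values are illustrative):
```python
import tempfile
from pathlib import Path

from crewai.cli.config import DEFAULT_CLI_SETTINGS, Settings

config_path = Path(tempfile.mkdtemp()) / "settings.json"
settings = Settings(
    config_path=config_path,
    org_name="Acme",
    enterprise_base_url="https://enterprise.example.com",
)

settings.clear_user_settings()  # user-scoped keys (org_name, org_uuid, ...) become None
settings.reset()                # CLI-scoped keys are restored to their defaults as well
assert settings.enterprise_base_url == DEFAULT_CLI_SETTINGS["enterprise_base_url"]
```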

View File

@@ -1,3 +1,5 @@
DEFAULT_CREWAI_ENTERPRISE_URL = "https://app.crewai.com"
ENV_VARS = {
"openai": [
{
@@ -320,5 +322,4 @@ DEFAULT_LLM_MODEL = "gpt-4o-mini"
JSON_URL = "https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json"
LITELLM_PARAMS = ["api_key", "api_base", "api_version"]

View File

@@ -1,4 +1,3 @@
from os import getenv
from typing import List, Optional
from urllib.parse import urljoin
@@ -6,6 +5,7 @@ import requests
from crewai.cli.config import Settings
from crewai.cli.version import get_crewai_version
from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
class PlusAPI:
@@ -29,7 +29,10 @@ class PlusAPI:
settings = Settings()
if settings.org_uuid:
self.headers["X-Crewai-Organization-Id"] = settings.org_uuid
self.base_url = getenv("CREWAI_BASE_URL", "https://app.crewai.com")
self.base_url = (
str(settings.enterprise_base_url) or DEFAULT_CREWAI_ENTERPRISE_URL
)
def _make_request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
url = urljoin(self.base_url, endpoint)
@@ -108,7 +111,6 @@ class PlusAPI:
def create_crew(self, payload) -> requests.Response:
return self._make_request("POST", self.CREWS_RESOURCE, json=payload)
def get_organizations(self) -> requests.Response:
return self._make_request("GET", self.ORGANIZATIONS_RESOURCE)

View File

@@ -0,0 +1,67 @@
from rich.console import Console
from rich.table import Table
from crewai.cli.command import BaseCommand
from crewai.cli.config import Settings, READONLY_SETTINGS_KEYS, HIDDEN_SETTINGS_KEYS
from typing import Any
console = Console()
class SettingsCommand(BaseCommand):
"""A class to handle CLI configuration commands."""
def __init__(self, settings_kwargs: dict[str, Any] = {}):
super().__init__()
self.settings = Settings(**settings_kwargs)
def list(self) -> None:
"""List all CLI configuration parameters."""
table = Table(title="CrewAI CLI Configuration")
table.add_column("Setting", style="cyan", no_wrap=True)
table.add_column("Value", style="green")
table.add_column("Description", style="yellow")
# Add all settings to the table
for field_name, field_info in Settings.model_fields.items():
if field_name in HIDDEN_SETTINGS_KEYS:
# Do not display hidden settings
continue
current_value = getattr(self.settings, field_name)
description = field_info.description or "No description available"
display_value = (
str(current_value) if current_value is not None else "Not set"
)
table.add_row(field_name, display_value, description)
console.print(table)
def set(self, key: str, value: str) -> None:
"""Set a CLI configuration parameter."""
readonly_settings = READONLY_SETTINGS_KEYS + HIDDEN_SETTINGS_KEYS
if not hasattr(self.settings, key) or key in readonly_settings:
console.print(
f"Error: Unknown or readonly configuration key '{key}'",
style="bold red",
)
console.print("Available keys:", style="yellow")
for field_name in Settings.model_fields.keys():
if field_name not in readonly_settings:
console.print(f" - {field_name}", style="yellow")
raise SystemExit(1)
setattr(self.settings, key, value)
self.settings.dump()
console.print(f"Successfully set '{key}' to '{value}'", style="bold green")
def reset_all_settings(self) -> None:
"""Reset all CLI configuration parameters to default values."""
self.settings.reset()
console.print(
"Successfully reset all configuration parameters to default values. It is recommended to run [bold yellow]'crewai login'[/bold yellow] to re-authenticate.",
style="bold green",
)

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.148.0,<1.0.0"
"crewai[tools]>=0.152.0,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.148.0,<1.0.0",
"crewai[tools]>=0.152.0,<1.0.0",
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.148.0"
"crewai[tools]>=0.152.0"
]
[tool.crewai]

View File

@@ -161,7 +161,7 @@ class Crew(FlowTrackable, BaseModel):
)
user_memory: Optional[InstanceOf[UserMemory]] = Field(
default=None,
description="An instance of the UserMemory to be used by the Crew to store/fetch memories of a specific user.",
description="DEPRECATED: Will be removed in version 0.156.0 or on 2025-08-04, whichever comes first. Use external_memory instead.",
)
external_memory: Optional[InstanceOf[ExternalMemory]] = Field(
default=None,
@@ -327,7 +327,7 @@ class Crew(FlowTrackable, BaseModel):
self._short_term_memory = self.short_term_memory
self._entity_memory = self.entity_memory
# UserMemory is gonna to be deprecated in the future, but we have to initialize a default value for now
# UserMemory will be removed in version 0.156.0 or on 2025-08-04, whichever comes first
self._user_memory = None
if self.memory:
@@ -1255,6 +1255,7 @@ class Crew(FlowTrackable, BaseModel):
if self.external_memory:
copied_data["external_memory"] = self.external_memory.model_copy(deep=True)
if self.user_memory:
# DEPRECATED: UserMemory will be removed in version 0.156.0 or on 2025-08-04
copied_data["user_memory"] = self.user_memory.model_copy(deep=True)
copied_data.pop("agents", None)

View File

@@ -436,6 +436,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
_routers: Set[str] = set()
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
name: Optional[str] = None
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore
@@ -473,7 +474,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowCreatedEvent(
type="flow_created",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
),
)
@@ -769,7 +770,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
),
)
@@ -792,7 +793,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
result=final_output,
),
)
@@ -834,7 +835,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
),
@@ -856,7 +857,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
state=self._copy_state(),
result=result,
),
@@ -869,7 +870,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
@@ -1076,7 +1077,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowPlotEvent(
type="flow_plot",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
),
)
plot_flow(self, filename)
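A minimal sketch of the new custom flow name in use (the `Flow`/`start` imports and the `kickoff()` call follow the usual Flow API; the class and name below are made up):
```python
from crewai.flow.flow import Flow, start

class ReportFlow(Flow):
    name = "quarterly_report_flow"  # emitted as flow_name instead of "ReportFlow"

    @start()
    def begin(self):
        return "started"

ReportFlow().kickoff()
```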

View File

@@ -1,55 +0,0 @@
from abc import ABC, abstractmethod
from typing import List
import numpy as np
class BaseEmbedder(ABC):
"""
Abstract base class for text embedding models
"""
@abstractmethod
def embed_chunks(self, chunks: List[str]) -> np.ndarray:
"""
Generate embeddings for a list of text chunks
Args:
chunks: List of text chunks to embed
Returns:
Array of embeddings
"""
pass
@abstractmethod
def embed_texts(self, texts: List[str]) -> np.ndarray:
"""
Generate embeddings for a list of texts
Args:
texts: List of texts to embed
Returns:
Array of embeddings
"""
pass
@abstractmethod
def embed_text(self, text: str) -> np.ndarray:
"""
Generate embedding for a single text
Args:
text: Text to embed
Returns:
Embedding array
"""
pass
@property
@abstractmethod
def dimension(self) -> int:
"""Get the dimension of the embeddings"""
pass

View File

@@ -13,7 +13,7 @@ from chromadb.api.types import OneOrMany
from chromadb.config import Settings
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger

View File

@@ -62,7 +62,6 @@ from crewai.utilities.agent_utils import (
render_text_description_and_args,
)
from crewai.utilities.converter import generate_model_description
from crewai.utilities.crew_pydantic_output_parser import clean_json_from_text
from crewai.utilities.events.agent_events import (
AgentLogsExecutionEvent,
LiteAgentExecutionCompletedEvent,
@@ -356,8 +355,8 @@ class LiteAgent(FlowTrackable, BaseModel):
formatted_result: Optional[BaseModel] = None
if self.response_format:
try:
cleaned_output = clean_json_from_text(agent_finish.output)
result = self.response_format.model_validate_json(cleaned_output)
# Cast to BaseModel to ensure type safety
result = self.response_format.model_validate_json(agent_finish.output)
if isinstance(result, BaseModel):
formatted_result = result
except Exception as e:
@@ -623,4 +622,4 @@ class LiteAgent(FlowTrackable, BaseModel):
def _append_message(self, text: str, role: str = "assistant") -> None:
"""Append a message to the message list with the given role."""
self._messages.append(format_message_for_llm(text, role=role))
self._messages.append(format_message_for_llm(text, role=role))

View File

@@ -59,6 +59,7 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
load_dotenv()
litellm.suppress_debug_info = True
class FilteredStream(io.TextIOBase):
_lock = None
@@ -76,9 +77,7 @@ class FilteredStream(io.TextIOBase):
# Skip common noisy LiteLLM banners and any other lines that contain "litellm"
if (
"give feedback / get help" in lower_s
or "litellm.info:" in lower_s
or "litellm" in lower_s
"litellm.info:" in lower_s
or "Consider using a smaller input or implementing a text splitting strategy" in lower_s
):
return 0
@@ -760,7 +759,7 @@ class LLM(BaseLLM):
available_functions: Optional[Dict[str, Any]] = None,
from_task: Optional[Any] = None,
from_agent: Optional[Any] = None,
) -> str:
) -> str | Any:
"""Handle a non-streaming response from the LLM.
Args:
@@ -784,13 +783,11 @@ class LLM(BaseLLM):
# Convert litellm's context window error to our own exception type
# for consistent handling in the rest of the codebase
raise LLMContextLengthExceededException(str(e))
# --- 2) Extract response message and content
response_message = cast(Choices, cast(ModelResponse, response).choices)[
0
].message
text_response = response_message.content or ""
# --- 3) Handle callbacks with usage info
if callbacks and len(callbacks) > 0:
for callback in callbacks:
@@ -803,21 +800,22 @@ class LLM(BaseLLM):
start_time=0,
end_time=0,
)
# --- 4) Check for tool calls
tool_calls = getattr(response_message, "tool_calls", [])
# --- 5) If no tool calls or no available functions, return the text response directly
if not tool_calls or not available_functions:
# --- 5) If no tool calls or no available functions, return the text response directly as long as there is a text response
if (not tool_calls or not available_functions) and text_response:
self._handle_emit_call_events(response=text_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
return text_response
# --- 6) If there is no text response, no available functions, but there are tool calls, return the tool calls
elif tool_calls and not available_functions and not text_response:
return tool_calls
# --- 6) Handle tool calls if present
# --- 7) Handle tool calls if present
tool_result = self._handle_tool_call(tool_calls, available_functions)
if tool_result is not None:
return tool_result
# --- 7) If tool call handling didn't return a result, emit completion event and return text response
# --- 8) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(response=text_response, call_type=LLMCallType.LLM_CALL, from_task=from_task, from_agent=from_agent, messages=params["messages"])
return text_response
@@ -952,22 +950,18 @@ class LLM(BaseLLM):
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
message["role"] = "assistant"
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 7) Make the completion call and handle response
if self.stream:
return self._handle_streaming_response(
@@ -1010,7 +1004,6 @@ class LLM(BaseLLM):
self,
event=LLMCallFailedEvent(error=str(e), from_task=from_task, from_agent=from_agent),
)
logging.error(f"LiteLLM call failed: {str(e)}")
raise
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None, messages: str | list[dict[str, Any]] | None = None):
@@ -1079,6 +1072,15 @@ class LLM(BaseLLM):
messages.append({"role": "user", "content": "Please continue."})
return messages
# TODO: Remove this code after merging PR https://github.com/BerriAI/litellm/pull/10917
# Ollama doesn't support the last message being from the 'assistant' role
if "ollama" in self.model.lower() and messages and messages[-1]["role"] == "assistant":
messages = messages.copy()
messages.append(
{"role": "user", "content": ""}
)
return messages
# Handle Anthropic models
if not self.is_anthropic:
return messages

View File

@@ -108,6 +108,7 @@ class ContextualMemory:
def _fetch_user_context(self, query: str) -> str:
"""
DEPRECATED: Will be removed in version 0.156.0 or on 2025-08-04, whichever comes first.
Fetches and formats relevant user information from User Memory.
Args:
query (str): The search query to find relevant user memories.

View File

@@ -1,10 +1,10 @@
import os
from typing import Any, Dict, List
from collections import defaultdict
from mem0 import Memory, MemoryClient
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.memory.storage.interface import Storage
from crewai.utilities.chromadb import sanitize_collection_name
MAX_AGENT_ID_LENGTH_MEM0 = 255
@@ -13,47 +13,162 @@ class Mem0Storage(Storage):
"""
Extends Storage to handle embedding and searching across entities using Mem0.
"""
def __init__(self, type, crew=None, config=None):
super().__init__()
supported_types = ["user", "short_term", "long_term", "entities", "external"]
if type not in supported_types:
raise ValueError(
f"Invalid type '{type}' for Mem0Storage. Must be one of: "
+ ", ".join(supported_types)
)
self._validate_type(type)
self.memory_type = type
self.crew = crew
self.config = config or {}
# TODO: Memory config will be removed in the future the config will be passed as a parameter
self.memory_config = self.config or getattr(crew, "memory_config", {}) or {}
# User ID is required for user memory type "user" since it's used as a unique identifier for the user.
user_id = self._get_user_id()
if type == "user" and not user_id:
# TODO: Memory config will be removed in the future the config will be passed as a parameter
self.config = config or getattr(crew, "memory_config", {}).get("config", {}) or {}
self._validate_user_id()
self._extract_config_values()
self._initialize_memory()
def _validate_type(self, type):
supported_types = {"user", "short_term", "long_term", "entities", "external"}
if type not in supported_types:
raise ValueError(
f"Invalid type '{type}' for Mem0Storage. Must be one of: {', '.join(supported_types)}"
)
def _validate_user_id(self):
if self.memory_type == "user" and not self.config.get("user_id", ""):
raise ValueError("User ID is required for user memory type")
# API key in memory config overrides the environment variable
config = self._get_config()
mem0_api_key = config.get("api_key") or os.getenv("MEM0_API_KEY")
mem0_org_id = config.get("org_id")
mem0_project_id = config.get("project_id")
mem0_local_config = config.get("local_mem0_config")
def _extract_config_values(self):
cfg = self.config
self.mem0_run_id = cfg.get("run_id")
self.includes = cfg.get("includes")
self.excludes = cfg.get("excludes")
self.custom_categories = cfg.get("custom_categories")
self.infer = cfg.get("infer", True)
# Initialize MemoryClient or Memory based on the presence of the mem0_api_key
if mem0_api_key:
if mem0_org_id and mem0_project_id:
self.memory = MemoryClient(
api_key=mem0_api_key, org_id=mem0_org_id, project_id=mem0_project_id
)
else:
self.memory = MemoryClient(api_key=mem0_api_key)
def _initialize_memory(self):
api_key = self.config.get("api_key") or os.getenv("MEM0_API_KEY")
org_id = self.config.get("org_id")
project_id = self.config.get("project_id")
local_config = self.config.get("local_mem0_config")
if api_key:
self.memory = (
MemoryClient(api_key=api_key, org_id=org_id, project_id=project_id)
if org_id and project_id
else MemoryClient(api_key=api_key)
)
if self.custom_categories:
self.memory.update_project(custom_categories=self.custom_categories)
else:
if mem0_local_config and len(mem0_local_config):
self.memory = Memory.from_config(mem0_local_config)
else:
self.memory = Memory()
self.memory = (
Memory.from_config(local_config)
if local_config and len(local_config)
else Memory()
)
def _create_filter_for_search(self):
"""
Returns:
dict: A filter dictionary of AND/OR conditions for querying data.
- Combines user_id and agent_id in an OR condition if both are present.
- Includes user_id if only user_id is present.
- Includes agent_id if only agent_id is present.
- Includes run_id if memory_type is 'short_term' and mem0_run_id is present.
"""
filter = defaultdict(list)
if self.memory_type == "short_term" and self.mem0_run_id:
filter["AND"].append({"run_id": self.mem0_run_id})
else:
user_id = self.config.get("user_id", "")
agent_id = self.config.get("agent_id", "")
if user_id and agent_id:
filter["OR"].append({"user_id": user_id})
filter["OR"].append({"agent_id": agent_id})
elif user_id:
filter["AND"].append({"user_id": user_id})
elif agent_id:
filter["AND"].append({"agent_id": agent_id})
return filter
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
user_id = self.config.get("user_id", "")
assistant_message = [{"role" : "assistant","content" : value}]
base_metadata = {
"short_term": "short_term",
"long_term": "long_term",
"entities": "entity",
"external": "external"
}
# Shared base params
params: dict[str, Any] = {
"metadata": {"type": base_metadata[self.memory_type], **metadata},
"infer": self.infer
}
# MemoryClient-specific overrides
if isinstance(self.memory, MemoryClient):
params["includes"] = self.includes
params["excludes"] = self.excludes
params["output_format"] = "v1.1"
params["version"] = "v2"
if self.memory_type == "short_term" and self.mem0_run_id:
params["run_id"] = self.mem0_run_id
if user_id:
params["user_id"] = user_id
if agent_id := self.config.get("agent_id", self._get_agent_name()):
params["agent_id"] = agent_id
self.memory.add(assistant_message, **params)
def search(self,query: str,limit: int = 3,score_threshold: float = 0.35) -> List[Any]:
params = {
"query": query,
"limit": limit,
"version": "v2",
"output_format": "v1.1"
}
if user_id := self.config.get("user_id", ""):
params["user_id"] = user_id
memory_type_map = {
"short_term": {"type": "short_term"},
"long_term": {"type": "long_term"},
"entities": {"type": "entity"},
"external": {"type": "external"},
}
if self.memory_type in memory_type_map:
params["metadata"] = memory_type_map[self.memory_type]
if self.memory_type == "short_term":
params["run_id"] = self.mem0_run_id
# Discard the filters for now since we create the filters
# automatically when the crew is created.
params["filters"] = self._create_filter_for_search()
params['threshold'] = score_threshold
if isinstance(self.memory, Memory):
del params["metadata"], params["version"], params['output_format']
if params.get("run_id"):
del params["run_id"]
results = self.memory.search(**params)
return [r for r in results["results"]]
def reset(self):
if self.memory:
self.memory.reset()
def _sanitize_role(self, role: str) -> str:
"""
@@ -61,77 +176,6 @@ class Mem0Storage(Storage):
"""
return role.replace("\n", "").replace(" ", "_").replace("/", "_")
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
user_id = self._get_user_id()
agent_name = self._get_agent_name()
assistant_message = [{"role" : "assistant","content" : value}]
params = None
if self.memory_type == "short_term":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "short_term", **metadata},
}
elif self.memory_type == "long_term":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "long_term", **metadata},
}
elif self.memory_type == "entities":
params = {
"agent_id": agent_name,
"infer": False,
"metadata": {"type": "entity", **metadata},
}
elif self.memory_type == "external":
params = {
"user_id": user_id,
"agent_id": agent_name,
"metadata": {"type": "external", **metadata},
}
if params:
if isinstance(self.memory, MemoryClient):
params["output_format"] = "v1.1"
self.memory.add(assistant_message, **params)
def search(
self,
query: str,
limit: int = 3,
score_threshold: float = 0.35,
) -> List[Any]:
params = {"query": query, "limit": limit, "output_format": "v1.1"}
if user_id := self._get_user_id():
params["user_id"] = user_id
agent_name = self._get_agent_name()
if self.memory_type == "short_term":
params["agent_id"] = agent_name
params["metadata"] = {"type": "short_term"}
elif self.memory_type == "long_term":
params["agent_id"] = agent_name
params["metadata"] = {"type": "long_term"}
elif self.memory_type == "entities":
params["agent_id"] = agent_name
params["metadata"] = {"type": "entity"}
elif self.memory_type == "external":
params["agent_id"] = agent_name
params["metadata"] = {"type": "external"}
# Discard the filters for now since we create the filters
# automatically when the crew is created.
if isinstance(self.memory, Memory):
del params["metadata"], params["output_format"]
results = self.memory.search(**params)
return [r for r in results["results"] if r["score"] >= score_threshold]
def _get_user_id(self) -> str:
return self._get_config().get("user_id", "")
def _get_agent_name(self) -> str:
if not self.crew:
return ""
@@ -139,11 +183,4 @@ class Mem0Storage(Storage):
agents = self.crew.agents
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return sanitize_collection_name(name=agents,max_collection_length=MAX_AGENT_ID_LENGTH_MEM0)
def _get_config(self) -> Dict[str, Any]:
return self.config or getattr(self, "memory_config", {}).get("config", {}) or {}
def reset(self):
if self.memory:
self.memory.reset()
return sanitize_collection_name(name=agents, max_collection_length=MAX_AGENT_ID_LENGTH_MEM0)
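For reference, an illustrative sketch of the filter shapes `_create_filter_for_search` produces (the ids are made up and no Mem0 call happens here):
```python
# Both ids configured -> OR filter, matching memories saved under either id
config = {"user_id": "john", "agent_id": "research_agent"}
or_filter = {"OR": [{"user_id": "john"}, {"agent_id": "research_agent"}]}

# Only one id configured -> AND filter on that single id
user_only_filter = {"AND": [{"user_id": "john"}]}

# Short-term memory with a run_id -> AND filter on the run
short_term_filter = {"AND": [{"run_id": "run-42"}]}
```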

View File

@@ -7,8 +7,8 @@ import uuid
from typing import Any, Dict, List, Optional
from chromadb.api import ClientAPI
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.rag.storage.base_rag_storage import BaseRAGStorage
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.chromadb import create_persistent_client
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path

View File

@@ -14,7 +14,8 @@ class UserMemory(Memory):
def __init__(self, crew=None):
warnings.warn(
"UserMemory is deprecated and will be removed in a future version. "
"UserMemory is deprecated and will be removed in version 0.156.0 "
"or on 2025-08-04, whichever comes first. "
"Please use ExternalMemory instead.",
DeprecationWarning,
stacklevel=2,

View File

@@ -1,8 +1,16 @@
import warnings
from typing import Any, Dict, Optional
class UserMemoryItem:
def __init__(self, data: Any, user: str, metadata: Optional[Dict[str, Any]] = None):
warnings.warn(
"UserMemoryItem is deprecated and will be removed in version 0.156.0 "
"or on 2025-08-04, whichever comes first. "
"Please use ExternalMemory instead.",
DeprecationWarning,
stacklevel=2,
)
self.data = data
self.user = user
self.metadata = metadata if metadata is not None else {}

View File

@@ -0,0 +1 @@
"""RAG (Retrieval-Augmented Generation) infrastructure for CrewAI."""

View File

@@ -0,0 +1 @@
"""Embedding components for RAG infrastructure."""

View File

@@ -38,7 +38,14 @@ class EmbeddingConfigurator:
f"Unsupported embedding provider: {provider}, supported providers: {list(self.embedding_functions.keys())}"
)
embedding_function = self.embedding_functions[provider]
try:
embedding_function = self.embedding_functions[provider]
except ImportError as e:
missing_package = str(e).split()[-1]
raise ImportError(
f"{missing_package} is not installed. Please install it with: pip install {missing_package}"
)
return (
embedding_function(config)
if provider == "custom"

View File

@@ -0,0 +1 @@
"""Storage components for RAG infrastructure."""

View File

@@ -0,0 +1,4 @@
from .human_input_server import HumanInputServer
from .event_stream_manager import EventStreamManager
__all__ = ["HumanInputServer", "EventStreamManager"]

View File

@@ -0,0 +1,147 @@
import asyncio
import json
import uuid
from typing import Dict, List, Optional, Set
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.task_events import HumanInputRequiredEvent, HumanInputCompletedEvent
class EventStreamManager:
"""Manages event streams for human input events"""
def __init__(self):
self._websocket_connections: Dict[str, Set] = {}
self._sse_connections: Dict[str, Set] = {}
self._polling_events: Dict[str, List] = {}
self._event_listeners_registered = False
def register_event_listeners(self):
"""Register event listeners for human input events"""
if self._event_listeners_registered:
return
@crewai_event_bus.on(HumanInputRequiredEvent)
def handle_human_input_required(event: HumanInputRequiredEvent):
self._broadcast_event(event)
@crewai_event_bus.on(HumanInputCompletedEvent)
def handle_human_input_completed(event: HumanInputCompletedEvent):
self._broadcast_event(event)
self._event_listeners_registered = True
def add_websocket_connection(self, execution_id: str, websocket):
"""Add a WebSocket connection for an execution"""
if execution_id not in self._websocket_connections:
self._websocket_connections[execution_id] = set()
self._websocket_connections[execution_id].add(websocket)
def remove_websocket_connection(self, execution_id: str, websocket):
"""Remove a WebSocket connection"""
if execution_id in self._websocket_connections:
self._websocket_connections[execution_id].discard(websocket)
if not self._websocket_connections[execution_id]:
del self._websocket_connections[execution_id]
def add_sse_connection(self, execution_id: str, queue):
"""Add an SSE connection for an execution"""
if execution_id not in self._sse_connections:
self._sse_connections[execution_id] = set()
self._sse_connections[execution_id].add(queue)
def remove_sse_connection(self, execution_id: str, queue):
"""Remove an SSE connection"""
if execution_id in self._sse_connections:
self._sse_connections[execution_id].discard(queue)
if not self._sse_connections[execution_id]:
del self._sse_connections[execution_id]
def get_polling_events(self, execution_id: str, last_event_id: Optional[str] = None) -> List[Dict]:
"""Get events for polling clients"""
if execution_id not in self._polling_events:
return []
events = self._polling_events[execution_id]
if last_event_id:
try:
last_index = next(
i for i, event in enumerate(events)
if event.get("event_id") == last_event_id
)
return events[last_index + 1:]
except StopIteration:
pass
return events
def _broadcast_event(self, event):
"""Broadcast event to all relevant connections"""
execution_id = getattr(event, 'execution_id', None)
if not execution_id:
return
event_data = self._serialize_event(event)
self._broadcast_websocket(execution_id, event_data)
self._broadcast_sse(execution_id, event_data)
self._store_polling_event(execution_id, event_data)
def _serialize_event(self, event) -> Dict:
"""Serialize event to dictionary format"""
event_dict = event.to_json()
if not event_dict.get("event_id"):
event_dict["event_id"] = str(uuid.uuid4())
return event_dict
def _broadcast_websocket(self, execution_id: str, event_data: Dict):
"""Broadcast event to WebSocket connections"""
if execution_id not in self._websocket_connections:
return
connections_to_remove = set()
for websocket in self._websocket_connections[execution_id]:
try:
asyncio.create_task(websocket.send_text(json.dumps(event_data)))
except Exception:
connections_to_remove.add(websocket)
for websocket in connections_to_remove:
self.remove_websocket_connection(execution_id, websocket)
def _broadcast_sse(self, execution_id: str, event_data: Dict):
"""Broadcast event to SSE connections"""
if execution_id not in self._sse_connections:
return
connections_to_remove = set()
for queue in self._sse_connections[execution_id]:
try:
queue.put_nowait(event_data)
except Exception:
connections_to_remove.add(queue)
for queue in connections_to_remove:
self.remove_sse_connection(execution_id, queue)
def _store_polling_event(self, execution_id: str, event_data: Dict):
"""Store event for polling clients"""
if execution_id not in self._polling_events:
self._polling_events[execution_id] = []
self._polling_events[execution_id].append(event_data)
if len(self._polling_events[execution_id]) > 100:
self._polling_events[execution_id] = self._polling_events[execution_id][-100:]
def cleanup_execution(self, execution_id: str):
"""Clean up all connections and events for an execution"""
self._websocket_connections.pop(execution_id, None)
self._sse_connections.pop(execution_id, None)
self._polling_events.pop(execution_id, None)
event_stream_manager = EventStreamManager()

View File

@@ -0,0 +1,122 @@
import asyncio
import json
from typing import Optional
from datetime import datetime, timezone
try:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends, Query
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import uvicorn
FASTAPI_AVAILABLE = True
except ImportError:
FASTAPI_AVAILABLE = False
from .event_stream_manager import event_stream_manager
class HumanInputServer:
"""HTTP server for human input event streaming"""
def __init__(self, host: str = "localhost", port: int = 8000, api_key: Optional[str] = None):
if not FASTAPI_AVAILABLE:
raise ImportError(
"FastAPI dependencies not available. Install with: pip install fastapi uvicorn websockets"
)
self.host = host
self.port = port
self.api_key = api_key
self.app = FastAPI(title="CrewAI Human Input Event Stream API")
self.security = HTTPBearer() if api_key else None
self._setup_routes()
event_stream_manager.register_event_listeners()
def _setup_routes(self):
"""Setup FastAPI routes"""
@self.app.websocket("/ws/human-input/{execution_id}")
async def websocket_endpoint(websocket: WebSocket, execution_id: str):
if self.api_key:
token = websocket.query_params.get("token")
if not token or token != self.api_key:
await websocket.close(code=4001, reason="Unauthorized")
return
await websocket.accept()
event_stream_manager.add_websocket_connection(execution_id, websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
pass
finally:
event_stream_manager.remove_websocket_connection(execution_id, websocket)
@self.app.get("/events/human-input/{execution_id}")
async def sse_endpoint(execution_id: str, credentials: Optional[HTTPAuthorizationCredentials] = Depends(self.security) if self.security else None):
if self.api_key and (not credentials or credentials.credentials != self.api_key):
raise HTTPException(status_code=401, detail="Unauthorized")
async def event_stream():
event_queue = asyncio.Queue()
event_stream_manager.add_sse_connection(execution_id, event_queue)
try:
while True:
try:
event_data = await asyncio.wait_for(event_queue.get(), timeout=30.0)
yield f"data: {json.dumps(event_data)}\n\n"
except asyncio.TimeoutError:
yield "data: {\"type\": \"heartbeat\"}\n\n"
except asyncio.CancelledError:
pass
finally:
event_stream_manager.remove_sse_connection(execution_id, event_queue)
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
}
)
@self.app.get("/poll/human-input/{execution_id}")
async def polling_endpoint(
execution_id: str,
last_event_id: Optional[str] = Query(None),
credentials: Optional[HTTPAuthorizationCredentials] = Depends(self.security) if self.security else None
):
if self.api_key and (not credentials or credentials.credentials != self.api_key):
raise HTTPException(status_code=401, detail="Unauthorized")
events = event_stream_manager.get_polling_events(execution_id, last_event_id)
return {"events": events}
@self.app.get("/health")
async def health_check():
return {"status": "healthy", "timestamp": datetime.now(timezone.utc).isoformat()}
async def start_async(self):
"""Start the server asynchronously"""
config = uvicorn.Config(
self.app,
host=self.host,
port=self.port,
log_level="info"
)
server = uvicorn.Server(config)
await server.serve()
def start(self):
"""Start the server synchronously"""
uvicorn.run(
self.app,
host=self.host,
port=self.port,
log_level="info"
)

View File

@@ -10,7 +10,6 @@ from .rpm_controller import RPMController
from .exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from .embedding_configurator import EmbeddingConfigurator
__all__ = [
"Converter",
@@ -24,5 +23,4 @@ __all__ = [
"RPMController",
"YamlParser",
"LLMContextLengthExceededException",
"EmbeddingConfigurator",
]

View File

@@ -157,10 +157,6 @@ def get_llm_response(
from_agent=from_agent,
)
except Exception as e:
printer.print(
content=f"Error during LLM call: {e}",
color="red",
)
raise e
if not answer:
printer.print(
@@ -232,12 +228,17 @@ def handle_unknown_error(printer: Any, exception: Exception) -> None:
printer: Printer instance for output
exception: The exception that occurred
"""
error_message = str(exception)
if "litellm" in error_message:
return
printer.print(
content="An unknown error occurred. Please check the details below.",
color="red",
)
printer.print(
content=f"Error details: {exception}",
content=f"Error details: {error_message}",
color="red",
)

View File

@@ -8,22 +8,6 @@ from crewai.agents.parser import OutputParserException
"""Parser for converting text outputs into Pydantic models."""
def clean_json_from_text(text: str) -> str:
"""Extract and clean JSON from text that may contain markdown or trailing characters."""
text = text.replace("```", "").replace("json", "")
json_pattern = r"\{(?:[^{}]|(?R))*\}"
matches = regex.finditer(json_pattern, text)
for match in matches:
try:
json_obj = json.loads(match.group())
json_obj = json.dumps(json_obj)
return str(json_obj)
except json.JSONDecodeError:
continue
return text
class CrewPydanticOutputParser:
"""Parses text outputs into specified Pydantic models."""
@@ -46,4 +30,18 @@ class CrewPydanticOutputParser:
raise OutputParserException(error=msg)
def _transform_in_valid_json(self, text) -> str:
return clean_json_from_text(text)
text = text.replace("```", "").replace("json", "")
json_pattern = r"\{(?:[^{}]|(?R))*\}"
matches = regex.finditer(json_pattern, text)
for match in matches:
try:
# Attempt to parse the matched string as JSON
json_obj = json.loads(match.group())
# Return the first successfully parsed JSON object
json_obj = json.dumps(json_obj)
return str(json_obj)
except json.JSONDecodeError:
# If parsing fails, skip to the next match
continue
return text

View File

@@ -1,6 +1,5 @@
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
@@ -9,7 +8,7 @@ from crewai.utilities.serialization import to_serializable
class BaseEvent(BaseModel):
"""Base class for all events"""
timestamp: datetime = Field(default_factory=datetime.now)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"

View File

@@ -35,6 +35,8 @@ from .llm_guardrail_events import (
LLMGuardrailStartedEvent,
)
from .task_events import (
HumanInputCompletedEvent,
HumanInputRequiredEvent,
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
@@ -85,6 +87,8 @@ EventTypes = Union[
TaskStartedEvent,
TaskCompletedEvent,
TaskFailedEvent,
HumanInputRequiredEvent,
HumanInputCompletedEvent,
FlowStartedEvent,
FlowFinishedEvent,
MethodExecutionStartedEvent,

View File

@@ -82,3 +82,35 @@ class TaskEvaluationEvent(BaseEvent):
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
class HumanInputRequiredEvent(BaseEvent):
"""Event emitted when human input is required during task execution"""
type: str = "human_input_required"
execution_id: Optional[str] = None
crew_id: Optional[str] = None
task_id: Optional[str] = None
agent_id: Optional[str] = None
prompt: Optional[str] = None
context: Optional[str] = None
reason_flags: Optional[dict] = None
event_id: Optional[str] = None
def __init__(self, **data):
super().__init__(**data)
class HumanInputCompletedEvent(BaseEvent):
"""Event emitted when human input is completed"""
type: str = "human_input_completed"
execution_id: Optional[str] = None
crew_id: Optional[str] = None
task_id: Optional[str] = None
agent_id: Optional[str] = None
event_id: Optional[str] = None
human_feedback: Optional[str] = None
def __init__(self, **data):
super().__init__(**data)

View File

@@ -2010,7 +2010,6 @@ def test_crew_agent_executor_litellm_auth_error():
from litellm.exceptions import AuthenticationError
from crewai.agents.tools_handler import ToolsHandler
from crewai.utilities import Printer
# Create an agent and executor
agent = Agent(
@@ -2043,7 +2042,6 @@ def test_crew_agent_executor_litellm_auth_error():
# Mock the LLM call to raise AuthenticationError
with (
patch.object(LLM, "call") as mock_llm_call,
patch.object(Printer, "print") as mock_printer,
pytest.raises(AuthenticationError) as exc_info,
):
mock_llm_call.side_effect = AuthenticationError(
@@ -2057,13 +2055,6 @@ def test_crew_agent_executor_litellm_auth_error():
}
)
# Verify error handling messages
error_message = f"Error during LLM call: {str(mock_llm_call.side_effect)}"
mock_printer.assert_any_call(
content=error_message,
color="red",
)
# Verify the call was only made once (no retries)
mock_llm_call.assert_called_once()

View File

@@ -4,7 +4,12 @@ import tempfile
import unittest
from pathlib import Path
from crewai.cli.config import Settings
from crewai.cli.config import (
Settings,
USER_SETTINGS_KEYS,
CLI_SETTINGS_KEYS,
DEFAULT_CLI_SETTINGS,
)
class TestSettings(unittest.TestCase):
@@ -52,6 +57,30 @@ class TestSettings(unittest.TestCase):
self.assertEqual(settings.tool_repository_username, "new_user")
self.assertEqual(settings.tool_repository_password, "file_pass")
def test_clear_user_settings(self):
user_settings = {key: f"value_for_{key}" for key in USER_SETTINGS_KEYS}
settings = Settings(config_path=self.config_path, **user_settings)
settings.clear_user_settings()
for key in user_settings.keys():
self.assertEqual(getattr(settings, key), None)
def test_reset_settings(self):
user_settings = {key: f"value_for_{key}" for key in USER_SETTINGS_KEYS}
cli_settings = {key: f"value_for_{key}" for key in CLI_SETTINGS_KEYS}
settings = Settings(
config_path=self.config_path, **user_settings, **cli_settings
)
settings.reset()
for key in user_settings.keys():
self.assertEqual(getattr(settings, key), None)
for key in cli_settings.keys():
self.assertEqual(getattr(settings, key), DEFAULT_CLI_SETTINGS[key])
def test_dump_new_settings(self):
settings = Settings(
config_path=self.config_path, tool_repository_username="user1"

View File

@@ -6,7 +6,7 @@ from click.testing import CliRunner
import requests
from crewai.cli.organization.main import OrganizationCommand
from crewai.cli.cli import list, switch, current
from crewai.cli.cli import org_list, switch, current
@pytest.fixture
@@ -16,44 +16,44 @@ def runner():
@pytest.fixture
def org_command():
with patch.object(OrganizationCommand, '__init__', return_value=None):
with patch.object(OrganizationCommand, "__init__", return_value=None):
command = OrganizationCommand()
yield command
@pytest.fixture
def mock_settings():
with patch('crewai.cli.organization.main.Settings') as mock_settings_class:
with patch("crewai.cli.organization.main.Settings") as mock_settings_class:
mock_settings_instance = MagicMock()
mock_settings_class.return_value = mock_settings_instance
yield mock_settings_instance
@patch('crewai.cli.cli.OrganizationCommand')
@patch("crewai.cli.cli.OrganizationCommand")
def test_org_list_command(mock_org_command_class, runner):
mock_org_instance = MagicMock()
mock_org_command_class.return_value = mock_org_instance
result = runner.invoke(list)
result = runner.invoke(org_list)
assert result.exit_code == 0
mock_org_command_class.assert_called_once()
mock_org_instance.list.assert_called_once()
@patch('crewai.cli.cli.OrganizationCommand')
@patch("crewai.cli.cli.OrganizationCommand")
def test_org_switch_command(mock_org_command_class, runner):
mock_org_instance = MagicMock()
mock_org_command_class.return_value = mock_org_instance
result = runner.invoke(switch, ['test-id'])
result = runner.invoke(switch, ["test-id"])
assert result.exit_code == 0
mock_org_command_class.assert_called_once()
mock_org_instance.switch.assert_called_once_with('test-id')
mock_org_instance.switch.assert_called_once_with("test-id")
@patch('crewai.cli.cli.OrganizationCommand')
@patch("crewai.cli.cli.OrganizationCommand")
def test_org_current_command(mock_org_command_class, runner):
mock_org_instance = MagicMock()
mock_org_command_class.return_value = mock_org_instance
@@ -67,18 +67,18 @@ def test_org_current_command(mock_org_command_class, runner):
class TestOrganizationCommand(unittest.TestCase):
def setUp(self):
with patch.object(OrganizationCommand, '__init__', return_value=None):
with patch.object(OrganizationCommand, "__init__", return_value=None):
self.org_command = OrganizationCommand()
self.org_command.plus_api_client = MagicMock()
@patch('crewai.cli.organization.main.console')
@patch('crewai.cli.organization.main.Table')
@patch("crewai.cli.organization.main.console")
@patch("crewai.cli.organization.main.Table")
def test_list_organizations_success(self, mock_table, mock_console):
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = [
{"name": "Org 1", "uuid": "org-123"},
{"name": "Org 2", "uuid": "org-456"}
{"name": "Org 2", "uuid": "org-456"},
]
self.org_command.plus_api_client = MagicMock()
self.org_command.plus_api_client.get_organizations.return_value = mock_response
@@ -89,16 +89,14 @@ class TestOrganizationCommand(unittest.TestCase):
self.org_command.plus_api_client.get_organizations.assert_called_once()
mock_table.assert_called_once_with(title="Your Organizations")
mock_table.return_value.add_column.assert_has_calls([
call("Name", style="cyan"),
call("ID", style="green")
])
mock_table.return_value.add_row.assert_has_calls([
call("Org 1", "org-123"),
call("Org 2", "org-456")
])
mock_table.return_value.add_column.assert_has_calls(
[call("Name", style="cyan"), call("ID", style="green")]
)
mock_table.return_value.add_row.assert_has_calls(
[call("Org 1", "org-123"), call("Org 2", "org-456")]
)
@patch('crewai.cli.organization.main.console')
@patch("crewai.cli.organization.main.console")
def test_list_organizations_empty(self, mock_console):
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
@@ -110,33 +108,32 @@ class TestOrganizationCommand(unittest.TestCase):
self.org_command.plus_api_client.get_organizations.assert_called_once()
mock_console.print.assert_called_once_with(
"You don't belong to any organizations yet.",
style="yellow"
"You don't belong to any organizations yet.", style="yellow"
)
@patch('crewai.cli.organization.main.console')
@patch("crewai.cli.organization.main.console")
def test_list_organizations_api_error(self, mock_console):
self.org_command.plus_api_client = MagicMock()
self.org_command.plus_api_client.get_organizations.side_effect = requests.exceptions.RequestException("API Error")
self.org_command.plus_api_client.get_organizations.side_effect = (
requests.exceptions.RequestException("API Error")
)
with pytest.raises(SystemExit):
self.org_command.list()
self.org_command.plus_api_client.get_organizations.assert_called_once()
mock_console.print.assert_called_once_with(
"Failed to retrieve organization list: API Error",
style="bold red"
"Failed to retrieve organization list: API Error", style="bold red"
)
@patch('crewai.cli.organization.main.console')
@patch('crewai.cli.organization.main.Settings')
@patch("crewai.cli.organization.main.console")
@patch("crewai.cli.organization.main.Settings")
def test_switch_organization_success(self, mock_settings_class, mock_console):
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = [
{"name": "Org 1", "uuid": "org-123"},
{"name": "Test Org", "uuid": "test-id"}
{"name": "Test Org", "uuid": "test-id"},
]
self.org_command.plus_api_client = MagicMock()
self.org_command.plus_api_client.get_organizations.return_value = mock_response
@@ -151,17 +148,16 @@ class TestOrganizationCommand(unittest.TestCase):
assert mock_settings_instance.org_name == "Test Org"
assert mock_settings_instance.org_uuid == "test-id"
mock_console.print.assert_called_once_with(
"Successfully switched to Test Org (test-id)",
style="bold green"
"Successfully switched to Test Org (test-id)", style="bold green"
)
@patch('crewai.cli.organization.main.console')
@patch("crewai.cli.organization.main.console")
def test_switch_organization_not_found(self, mock_console):
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_response.json.return_value = [
{"name": "Org 1", "uuid": "org-123"},
{"name": "Org 2", "uuid": "org-456"}
{"name": "Org 2", "uuid": "org-456"},
]
self.org_command.plus_api_client = MagicMock()
self.org_command.plus_api_client.get_organizations.return_value = mock_response
@@ -170,12 +166,11 @@ class TestOrganizationCommand(unittest.TestCase):
self.org_command.plus_api_client.get_organizations.assert_called_once()
mock_console.print.assert_called_once_with(
"Organization with id 'non-existent-id' not found.",
style="bold red"
"Organization with id 'non-existent-id' not found.", style="bold red"
)
@patch('crewai.cli.organization.main.console')
@patch('crewai.cli.organization.main.Settings')
@patch("crewai.cli.organization.main.console")
@patch("crewai.cli.organization.main.Settings")
def test_current_organization_with_org(self, mock_settings_class, mock_console):
mock_settings_instance = MagicMock()
mock_settings_instance.org_name = "Test Org"
@@ -186,12 +181,11 @@ class TestOrganizationCommand(unittest.TestCase):
self.org_command.plus_api_client.get_organizations.assert_not_called()
mock_console.print.assert_called_once_with(
"Currently logged in to organization Test Org (test-id)",
style="bold green"
"Currently logged in to organization Test Org (test-id)", style="bold green"
)
@patch('crewai.cli.organization.main.console')
@patch('crewai.cli.organization.main.Settings')
@patch("crewai.cli.organization.main.console")
@patch("crewai.cli.organization.main.Settings")
def test_current_organization_without_org(self, mock_settings_class, mock_console):
mock_settings_instance = MagicMock()
mock_settings_instance.org_uuid = None
@@ -201,16 +195,14 @@ class TestOrganizationCommand(unittest.TestCase):
assert mock_console.print.call_count == 3
mock_console.print.assert_any_call(
"You're not currently logged in to any organization.",
style="yellow"
"You're not currently logged in to any organization.", style="yellow"
)
@patch('crewai.cli.organization.main.console')
@patch("crewai.cli.organization.main.console")
def test_list_organizations_unauthorized(self, mock_console):
mock_response = MagicMock()
mock_http_error = requests.exceptions.HTTPError(
"401 Client Error: Unauthorized",
response=MagicMock(status_code=401)
"401 Client Error: Unauthorized", response=MagicMock(status_code=401)
)
mock_response.raise_for_status.side_effect = mock_http_error
@@ -221,15 +213,14 @@ class TestOrganizationCommand(unittest.TestCase):
self.org_command.plus_api_client.get_organizations.assert_called_once()
mock_console.print.assert_called_once_with(
"You are not logged in to any organization. Use 'crewai login' to login.",
style="bold red"
style="bold red",
)
@patch('crewai.cli.organization.main.console')
@patch("crewai.cli.organization.main.console")
def test_switch_organization_unauthorized(self, mock_console):
mock_response = MagicMock()
mock_http_error = requests.exceptions.HTTPError(
"401 Client Error: Unauthorized",
response=MagicMock(status_code=401)
"401 Client Error: Unauthorized", response=MagicMock(status_code=401)
)
mock_response.raise_for_status.side_effect = mock_http_error
@@ -240,5 +231,5 @@ class TestOrganizationCommand(unittest.TestCase):
self.org_command.plus_api_client.get_organizations.assert_called_once()
mock_console.print.assert_called_once_with(
"You are not logged in to any organization. Use 'crewai login' to login.",
style="bold red"
style="bold red",
)

View File

@@ -1,8 +1,8 @@
import os
import unittest
from unittest.mock import MagicMock, patch, ANY
from crewai.cli.plus_api import PlusAPI
from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
class TestPlusAPI(unittest.TestCase):
@@ -30,29 +30,41 @@ class TestPlusAPI(unittest.TestCase):
)
self.assertEqual(response, mock_response)
def assert_request_with_org_id(self, mock_make_request, method: str, endpoint: str, **kwargs):
def assert_request_with_org_id(
self, mock_make_request, method: str, endpoint: str, **kwargs
):
mock_make_request.assert_called_once_with(
method, f"https://app.crewai.com{endpoint}", headers={'Authorization': ANY, 'Content-Type': ANY, 'User-Agent': ANY, 'X-Crewai-Version': ANY, 'X-Crewai-Organization-Id': self.org_uuid}, **kwargs
method,
f"{DEFAULT_CREWAI_ENTERPRISE_URL}{endpoint}",
headers={
"Authorization": ANY,
"Content-Type": ANY,
"User-Agent": ANY,
"X-Crewai-Version": ANY,
"X-Crewai-Organization-Id": self.org_uuid,
},
**kwargs,
)
@patch("crewai.cli.plus_api.Settings")
@patch("requests.Session.request")
def test_login_to_tool_repository_with_org_uuid(self, mock_make_request, mock_settings_class):
def test_login_to_tool_repository_with_org_uuid(
self, mock_make_request, mock_settings_class
):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = DEFAULT_CREWAI_ENTERPRISE_URL
mock_settings_class.return_value = mock_settings
# re-initialize Client
self.api = PlusAPI(self.api_key)
mock_response = MagicMock()
mock_make_request.return_value = mock_response
response = self.api.login_to_tool_repository()
self.assert_request_with_org_id(
mock_make_request,
'POST',
'/crewai_plus/api/v1/tools/login'
mock_make_request, "POST", "/crewai_plus/api/v1/tools/login"
)
self.assertEqual(response, mock_response)
@@ -66,28 +78,27 @@ class TestPlusAPI(unittest.TestCase):
"GET", "/crewai_plus/api/v1/agents/test_agent_handle"
)
self.assertEqual(response, mock_response)
@patch("crewai.cli.plus_api.Settings")
@patch("requests.Session.request")
def test_get_agent_with_org_uuid(self, mock_make_request, mock_settings_class):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = DEFAULT_CREWAI_ENTERPRISE_URL
mock_settings_class.return_value = mock_settings
# re-initialize Client
self.api = PlusAPI(self.api_key)
mock_response = MagicMock()
mock_make_request.return_value = mock_response
response = self.api.get_agent("test_agent_handle")
self.assert_request_with_org_id(
mock_make_request,
"GET",
"/crewai_plus/api/v1/agents/test_agent_handle"
mock_make_request, "GET", "/crewai_plus/api/v1/agents/test_agent_handle"
)
self.assertEqual(response, mock_response)
@patch("crewai.cli.plus_api.PlusAPI._make_request")
def test_get_tool(self, mock_make_request):
mock_response = MagicMock()
@@ -98,12 +109,13 @@ class TestPlusAPI(unittest.TestCase):
"GET", "/crewai_plus/api/v1/tools/test_tool_handle"
)
self.assertEqual(response, mock_response)
@patch("crewai.cli.plus_api.Settings")
@patch("requests.Session.request")
def test_get_tool_with_org_uuid(self, mock_make_request, mock_settings_class):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = DEFAULT_CREWAI_ENTERPRISE_URL
mock_settings_class.return_value = mock_settings
# re-initialize Client
self.api = PlusAPI(self.api_key)
@@ -115,9 +127,7 @@ class TestPlusAPI(unittest.TestCase):
response = self.api.get_tool("test_tool_handle")
self.assert_request_with_org_id(
mock_make_request,
"GET",
"/crewai_plus/api/v1/tools/test_tool_handle"
mock_make_request, "GET", "/crewai_plus/api/v1/tools/test_tool_handle"
)
self.assertEqual(response, mock_response)
@@ -147,12 +157,13 @@ class TestPlusAPI(unittest.TestCase):
"POST", "/crewai_plus/api/v1/tools", json=params
)
self.assertEqual(response, mock_response)
@patch("crewai.cli.plus_api.Settings")
@patch("requests.Session.request")
def test_publish_tool_with_org_uuid(self, mock_make_request, mock_settings_class):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = DEFAULT_CREWAI_ENTERPRISE_URL
mock_settings_class.return_value = mock_settings
# re-initialize Client
self.api = PlusAPI(self.api_key)
@@ -160,7 +171,7 @@ class TestPlusAPI(unittest.TestCase):
# Set up mock response
mock_response = MagicMock()
mock_make_request.return_value = mock_response
handle = "test_tool_handle"
public = True
version = "1.0.0"
@@ -180,12 +191,9 @@ class TestPlusAPI(unittest.TestCase):
"description": description,
"available_exports": None,
}
self.assert_request_with_org_id(
mock_make_request,
"POST",
"/crewai_plus/api/v1/tools",
json=expected_params
mock_make_request, "POST", "/crewai_plus/api/v1/tools", json=expected_params
)
self.assertEqual(response, mock_response)
@@ -311,8 +319,11 @@ class TestPlusAPI(unittest.TestCase):
"POST", "/crewai_plus/api/v1/crews", json=payload
)
@patch.dict(os.environ, {"CREWAI_BASE_URL": "https://custom-url.com/api"})
def test_custom_base_url(self):
@patch("crewai.cli.plus_api.Settings")
def test_custom_base_url(self, mock_settings_class):
mock_settings = MagicMock()
mock_settings.enterprise_base_url = "https://custom-url.com/api"
mock_settings_class.return_value = mock_settings
custom_api = PlusAPI("test_key")
self.assertEqual(
custom_api.base_url,

View File

@@ -0,0 +1,91 @@
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch, MagicMock, call
from crewai.cli.settings.main import SettingsCommand
from crewai.cli.config import (
Settings,
USER_SETTINGS_KEYS,
CLI_SETTINGS_KEYS,
DEFAULT_CLI_SETTINGS,
HIDDEN_SETTINGS_KEYS,
READONLY_SETTINGS_KEYS,
)
import shutil
class TestSettingsCommand(unittest.TestCase):
def setUp(self):
self.test_dir = Path(tempfile.mkdtemp())
self.config_path = self.test_dir / "settings.json"
self.settings = Settings(config_path=self.config_path)
self.settings_command = SettingsCommand(
settings_kwargs={"config_path": self.config_path}
)
def tearDown(self):
shutil.rmtree(self.test_dir)
@patch("crewai.cli.settings.main.console")
@patch("crewai.cli.settings.main.Table")
def test_list_settings(self, mock_table_class, mock_console):
mock_table_instance = MagicMock()
mock_table_class.return_value = mock_table_instance
self.settings_command.list()
# Tests that the table is created skipping hidden settings
mock_table_instance.add_row.assert_has_calls(
[
call(
field_name,
getattr(self.settings, field_name) or "Not set",
field_info.description,
)
for field_name, field_info in Settings.model_fields.items()
if field_name not in HIDDEN_SETTINGS_KEYS
]
)
# Tests that the table is printed
mock_console.print.assert_called_once_with(mock_table_instance)
def test_set_valid_keys(self):
valid_keys = Settings.model_fields.keys() - (
READONLY_SETTINGS_KEYS + HIDDEN_SETTINGS_KEYS
)
for key in valid_keys:
test_value = f"some_value_for_{key}"
self.settings_command.set(key, test_value)
self.assertEqual(getattr(self.settings_command.settings, key), test_value)
def test_set_invalid_key(self):
with self.assertRaises(SystemExit):
self.settings_command.set("invalid_key", "value")
def test_set_readonly_keys(self):
for key in READONLY_SETTINGS_KEYS:
with self.assertRaises(SystemExit):
self.settings_command.set(key, "some_readonly_key_value")
def test_set_hidden_keys(self):
for key in HIDDEN_SETTINGS_KEYS:
with self.assertRaises(SystemExit):
self.settings_command.set(key, "some_hidden_key_value")
def test_reset_all_settings(self):
for key in USER_SETTINGS_KEYS + CLI_SETTINGS_KEYS:
setattr(self.settings_command.settings, key, f"custom_value_for_{key}")
self.settings_command.settings.dump()
self.settings_command.reset_all_settings()
print(USER_SETTINGS_KEYS)
for key in USER_SETTINGS_KEYS:
self.assertEqual(getattr(self.settings_command.settings, key), None)
for key in CLI_SETTINGS_KEYS:
self.assertEqual(
getattr(self.settings_command.settings, key), DEFAULT_CLI_SETTINGS[key]
)
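Similarly, a minimal sketch of the SettingsCommand surface these tests drive (`set`, `list`, `reset_all_settings`); per the tests, invalid, read-only, or hidden keys exit with an error. Paths and values are illustrative.

```python
# Sketch only: drives SettingsCommand the way the tests above do.
from pathlib import Path
from tempfile import mkdtemp

from crewai.cli.config import (
    HIDDEN_SETTINGS_KEYS,
    READONLY_SETTINGS_KEYS,
    Settings,
)
from crewai.cli.settings.main import SettingsCommand

command = SettingsCommand(
    settings_kwargs={"config_path": Path(mkdtemp()) / "settings.json"}
)

# Only keys that are neither read-only nor hidden can be set; the rest
# (and unknown keys) raise SystemExit.
writable = Settings.model_fields.keys() - (READONLY_SETTINGS_KEYS + HIDDEN_SETTINGS_KEYS)
for key in writable:
    command.set(key, f"value_for_{key}")

command.list()                # renders the settings table, skipping hidden keys
command.reset_all_settings()  # user keys -> None, CLI keys -> their defaults
```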

View File

@@ -755,3 +755,15 @@ def test_multiple_routers_from_same_trigger():
assert execution_order.index("anemia_analysis") > execution_order.index(
"anemia_router"
)
def test_flow_name():
class MyFlow(Flow):
name = "MyFlow"
@start()
def start(self):
return "Hello, world!"
flow = MyFlow()
assert flow.name == "MyFlow"
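A minimal sketch of the custom flow name feature this test covers; the import path is an assumption based on CrewAI's existing Flow API, and the class is illustrative.

```python
# Sketch only: a Flow with a custom name, as the test above asserts.
from crewai.flow.flow import Flow, start


class ReportFlow(Flow):
    name = "ReportFlow"  # surfaced later as FlowStartedEvent.flow_name

    @start()
    def begin(self):
        return "Hello, world!"


flow = ReportFlow()
flow.kickoff()
assert flow.name == "ReportFlow"
```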

View File

@@ -684,3 +684,30 @@ def test_llm_call_when_stop_is_unsupported_when_additional_drop_params_is_provid
assert "Retrying LLM call without the unsupported 'stop'" in caplog.text
assert isinstance(result, str)
assert "Paris" in result
@pytest.fixture
def ollama_llm():
return LLM(model="ollama/llama3.2:3b")
def test_ollama_appends_dummy_user_message_when_last_is_assistant(ollama_llm):
original_messages = [
{"role": "user", "content": "Hi there"},
{"role": "assistant", "content": "Hello!"},
]
formatted = ollama_llm._format_messages_for_provider(original_messages)
assert len(formatted) == len(original_messages) + 1
assert formatted[-1]["role"] == "user"
assert formatted[-1]["content"] == ""
def test_ollama_does_not_modify_when_last_is_user(ollama_llm):
original_messages = [
{"role": "user", "content": "Tell me a joke."},
]
formatted = ollama_llm._format_messages_for_provider(original_messages)
assert formatted == original_messages
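These tests pin down the Ollama-specific formatting: when a conversation ends with an assistant turn, an empty user message is appended so the history always ends with a user turn, while user-terminated histories pass through unchanged. A minimal sketch using the private helper the tests call, assuming this branch is installed:

```python
# Sketch only: mirrors the Ollama message formatting asserted above.
from crewai import LLM

llm = LLM(model="ollama/llama3.2:3b")

messages = [
    {"role": "user", "content": "Hi there"},
    {"role": "assistant", "content": "Hello!"},
]

# A trailing assistant turn gets an empty user message appended so the
# conversation always ends with a user turn.
formatted = llm._format_messages_for_provider(messages)
assert formatted[-1]["role"] == "user"
assert formatted[-1]["content"] == ""
```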

View File

@@ -0,0 +1,210 @@
from unittest.mock import MagicMock, patch
from crewai.server.event_stream_manager import EventStreamManager
from crewai.utilities.events.task_events import HumanInputRequiredEvent
class TestEventStreamManager:
"""Test the event stream manager"""
def setup_method(self):
"""Setup test environment"""
self.manager = EventStreamManager()
self.manager._websocket_connections.clear()
self.manager._sse_connections.clear()
self.manager._polling_events.clear()
def test_websocket_connection_management(self):
"""Test WebSocket connection management"""
execution_id = "test-execution"
websocket1 = MagicMock()
websocket2 = MagicMock()
self.manager.add_websocket_connection(execution_id, websocket1)
assert execution_id in self.manager._websocket_connections
assert websocket1 in self.manager._websocket_connections[execution_id]
self.manager.add_websocket_connection(execution_id, websocket2)
assert len(self.manager._websocket_connections[execution_id]) == 2
self.manager.remove_websocket_connection(execution_id, websocket1)
assert websocket1 not in self.manager._websocket_connections[execution_id]
assert websocket2 in self.manager._websocket_connections[execution_id]
self.manager.remove_websocket_connection(execution_id, websocket2)
assert execution_id not in self.manager._websocket_connections
def test_sse_connection_management(self):
"""Test SSE connection management"""
execution_id = "test-execution"
queue1 = MagicMock()
queue2 = MagicMock()
self.manager.add_sse_connection(execution_id, queue1)
assert execution_id in self.manager._sse_connections
assert queue1 in self.manager._sse_connections[execution_id]
self.manager.add_sse_connection(execution_id, queue2)
assert len(self.manager._sse_connections[execution_id]) == 2
self.manager.remove_sse_connection(execution_id, queue1)
assert queue1 not in self.manager._sse_connections[execution_id]
assert queue2 in self.manager._sse_connections[execution_id]
self.manager.remove_sse_connection(execution_id, queue2)
assert execution_id not in self.manager._sse_connections
def test_polling_events_storage(self):
"""Test polling events storage and retrieval"""
execution_id = "test-execution"
event1 = {"event_id": "event-1", "type": "test", "data": "test1"}
event2 = {"event_id": "event-2", "type": "test", "data": "test2"}
self.manager._store_polling_event(execution_id, event1)
self.manager._store_polling_event(execution_id, event2)
events = self.manager.get_polling_events(execution_id)
assert len(events) == 2
assert events[0] == event1
assert events[1] == event2
def test_polling_events_with_last_event_id(self):
"""Test polling events retrieval with last_event_id"""
execution_id = "test-execution"
event1 = {"event_id": "event-1", "type": "test", "data": "test1"}
event2 = {"event_id": "event-2", "type": "test", "data": "test2"}
event3 = {"event_id": "event-3", "type": "test", "data": "test3"}
self.manager._store_polling_event(execution_id, event1)
self.manager._store_polling_event(execution_id, event2)
self.manager._store_polling_event(execution_id, event3)
events = self.manager.get_polling_events(execution_id, "event-1")
assert len(events) == 2
assert events[0] == event2
assert events[1] == event3
def test_polling_events_limit(self):
"""Test polling events storage limit"""
execution_id = "test-execution"
for i in range(105):
event = {"event_id": f"event-{i}", "type": "test", "data": f"test{i}"}
self.manager._store_polling_event(execution_id, event)
events = self.manager.get_polling_events(execution_id)
assert len(events) == 100
assert events[0]["event_id"] == "event-5"
assert events[-1]["event_id"] == "event-104"
def test_event_serialization(self):
"""Test event serialization"""
event = HumanInputRequiredEvent(
execution_id="test-execution",
crew_id="test-crew",
task_id="test-task",
prompt="Test prompt"
)
serialized = self.manager._serialize_event(event)
assert isinstance(serialized, dict)
assert serialized["type"] == "human_input_required"
assert serialized["execution_id"] == "test-execution"
assert "event_id" in serialized
def test_broadcast_websocket(self):
"""Test WebSocket broadcasting"""
execution_id = "test-execution"
websocket = MagicMock()
self.manager.add_websocket_connection(execution_id, websocket)
event_data = {"type": "test", "data": "test"}
with patch('asyncio.create_task') as mock_create_task:
self.manager._broadcast_websocket(execution_id, event_data)
mock_create_task.assert_called_once()
def test_broadcast_sse(self):
"""Test SSE broadcasting"""
execution_id = "test-execution"
queue = MagicMock()
self.manager.add_sse_connection(execution_id, queue)
event_data = {"type": "test", "data": "test"}
self.manager._broadcast_sse(execution_id, event_data)
queue.put_nowait.assert_called_once_with(event_data)
def test_broadcast_event(self):
"""Test complete event broadcasting"""
execution_id = "test-execution"
event = HumanInputRequiredEvent(
execution_id=execution_id,
crew_id="test-crew",
task_id="test-task",
prompt="Test prompt"
)
with patch.object(self.manager, '_broadcast_websocket') as mock_ws, \
patch.object(self.manager, '_broadcast_sse') as mock_sse, \
patch.object(self.manager, '_store_polling_event') as mock_poll:
self.manager._broadcast_event(event)
mock_ws.assert_called_once()
mock_sse.assert_called_once()
mock_poll.assert_called_once()
def test_cleanup_execution(self):
"""Test execution cleanup"""
execution_id = "test-execution"
websocket = MagicMock()
queue = MagicMock()
event = {"event_id": "event-1", "type": "test"}
self.manager.add_websocket_connection(execution_id, websocket)
self.manager.add_sse_connection(execution_id, queue)
self.manager._store_polling_event(execution_id, event)
assert execution_id in self.manager._websocket_connections
assert execution_id in self.manager._sse_connections
assert execution_id in self.manager._polling_events
self.manager.cleanup_execution(execution_id)
assert execution_id not in self.manager._websocket_connections
assert execution_id not in self.manager._sse_connections
assert execution_id not in self.manager._polling_events
def test_register_event_listeners(self):
"""Test event listener registration"""
with patch('crewai.utilities.events.crewai_event_bus.crewai_event_bus.on') as mock_on:
self.manager.register_event_listeners()
assert mock_on.call_count == 2
self.manager.register_event_listeners()
assert mock_on.call_count == 2
def test_broadcast_event_without_execution_id(self):
"""Test broadcasting event without execution_id"""
event = HumanInputRequiredEvent(
crew_id="test-crew",
task_id="test-task",
prompt="Test prompt"
)
with patch.object(self.manager, '_broadcast_websocket') as mock_ws, \
patch.object(self.manager, '_broadcast_sse') as mock_sse, \
patch.object(self.manager, '_store_polling_event') as mock_poll:
self.manager._broadcast_event(event)
mock_ws.assert_not_called()
mock_sse.assert_not_called()
mock_poll.assert_not_called()
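For reference, a minimal sketch of the EventStreamManager surface these tests exercise; the execution id is illustrative, and `register_event_listeners()` is idempotent per the tests.

```python
# Sketch only: wiring up the EventStreamManager from this diff.
from crewai.server.event_stream_manager import EventStreamManager

manager = EventStreamManager()

# Subscribe the manager to the crewAI event bus.
manager.register_event_listeners()

# A client can later poll for buffered events (capped at 100 per execution).
for event in manager.get_polling_events("my-execution-id"):
    print(event.get("type"), event.get("event_id"))

# When the run is done, drop its connections and buffered events.
manager.cleanup_execution("my-execution-id")
```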

View File

@@ -0,0 +1,136 @@
import pytest
try:
from fastapi.testclient import TestClient
from crewai.server.human_input_server import HumanInputServer
from crewai.server.event_stream_manager import event_stream_manager
from crewai.utilities.events.task_events import HumanInputRequiredEvent
FASTAPI_AVAILABLE = True
except ImportError:
FASTAPI_AVAILABLE = False
@pytest.mark.skipif(not FASTAPI_AVAILABLE, reason="FastAPI dependencies not available")
class TestHumanInputServer:
"""Test the human input server endpoints"""
def setup_method(self):
"""Setup test environment"""
self.server = HumanInputServer(host="localhost", port=8001, api_key="test-key")
self.client = TestClient(self.server.app)
event_stream_manager._websocket_connections.clear()
event_stream_manager._sse_connections.clear()
event_stream_manager._polling_events.clear()
def test_health_endpoint(self):
"""Test health check endpoint"""
response = self.client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
assert "timestamp" in data
def test_polling_endpoint_unauthorized(self):
"""Test polling endpoint without authentication"""
response = self.client.get("/poll/human-input/test-execution-id")
assert response.status_code == 401
def test_polling_endpoint_authorized(self):
"""Test polling endpoint with authentication"""
headers = {"Authorization": "Bearer test-key"}
response = self.client.get("/poll/human-input/test-execution-id", headers=headers)
assert response.status_code == 200
data = response.json()
assert "events" in data
assert isinstance(data["events"], list)
def test_polling_endpoint_with_events(self):
"""Test polling endpoint returns stored events"""
execution_id = "test-execution-id"
event = HumanInputRequiredEvent(
execution_id=execution_id,
crew_id="test-crew",
task_id="test-task",
agent_id="test-agent",
prompt="Test prompt",
context="Test context",
event_id="test-event-1"
)
event_stream_manager._store_polling_event(execution_id, event.to_json())
headers = {"Authorization": "Bearer test-key"}
response = self.client.get(f"/poll/human-input/{execution_id}", headers=headers)
assert response.status_code == 200
data = response.json()
assert len(data["events"]) == 1
assert data["events"][0]["type"] == "human_input_required"
assert data["events"][0]["execution_id"] == execution_id
def test_polling_endpoint_with_last_event_id(self):
"""Test polling endpoint with last_event_id parameter"""
execution_id = "test-execution-id"
event1 = HumanInputRequiredEvent(
execution_id=execution_id,
event_id="event-1"
)
event2 = HumanInputRequiredEvent(
execution_id=execution_id,
event_id="event-2"
)
event_stream_manager._store_polling_event(execution_id, event1.to_json())
event_stream_manager._store_polling_event(execution_id, event2.to_json())
headers = {"Authorization": "Bearer test-key"}
response = self.client.get(
f"/poll/human-input/{execution_id}?last_event_id=event-1",
headers=headers
)
assert response.status_code == 200
data = response.json()
assert len(data["events"]) == 1
assert data["events"][0]["event_id"] == "event-2"
def test_sse_endpoint_unauthorized(self):
"""Test SSE endpoint without authentication"""
response = self.client.get("/events/human-input/test-execution-id")
assert response.status_code == 401
def test_sse_endpoint_authorized(self):
"""Test SSE endpoint with authentication"""
headers = {"Authorization": "Bearer test-key"}
with self.client.stream("GET", "/events/human-input/test-execution-id", headers=headers) as response:
assert response.status_code == 200
assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
def test_websocket_endpoint_unauthorized(self):
"""Test WebSocket endpoint without authentication"""
with pytest.raises(Exception):
with self.client.websocket_connect("/ws/human-input/test-execution-id"):
pass
def test_websocket_endpoint_authorized(self):
"""Test WebSocket endpoint with authentication"""
with self.client.websocket_connect("/ws/human-input/test-execution-id?token=test-key") as websocket:
assert websocket is not None
def test_server_without_api_key(self):
"""Test server initialization without API key"""
server = HumanInputServer(host="localhost", port=8002)
client = TestClient(server.app)
response = client.get("/poll/human-input/test-execution-id")
assert response.status_code == 200
response = client.get("/events/human-input/test-execution-id")
assert response.status_code == 200
@pytest.mark.skipif(FASTAPI_AVAILABLE, reason="Testing import error handling")
def test_server_without_fastapi():
"""Test server initialization without FastAPI dependencies"""
with pytest.raises(ImportError, match="FastAPI dependencies not available"):
HumanInputServer()
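And a hedged sketch of the server plus a long-polling client, based only on the endpoints and auth scheme these tests hit; serving `server.app` through an ASGI runner such as uvicorn is an assumption (the tests use FastAPI's TestClient), and the execution id, port, and API key are illustrative.

```python
# Sketch only: long-polling the human input server from a client.
import requests

from crewai.server.human_input_server import HumanInputServer

# The FastAPI app lives on `server.app`; run it with any ASGI server,
# e.g. uvicorn.run(server.app, host="localhost", port=8001) (assumption).
server = HumanInputServer(host="localhost", port=8001, api_key="test-key")

# Client side: poll for pending human-input events, resuming from the last
# event id this client has already seen.
resp = requests.get(
    "http://localhost:8001/poll/human-input/my-execution-id",
    params={"last_event_id": "event-1"},
    headers={"Authorization": "Bearer test-key"},
    timeout=30,
)
for event in resp.json()["events"]:
    print(event["type"], event.get("prompt", ""))
```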

View File

@@ -55,10 +55,11 @@ def mem0_storage_with_mocked_config(mock_mem0_memory):
}
# Instantiate the class with memory_config
# Parameters like run_id, includes, and excludes don't matter in Memory OSS
crew = MockCrew(
memory_config={
"provider": "mem0",
"config": {"user_id": "test_user", "local_mem0_config": config},
"config": {"user_id": "test_user", "local_mem0_config": config, "run_id": "my_run_id", "includes": "include1","excludes": "exclude1", "infer" : True},
}
)
@@ -95,6 +96,10 @@ def mem0_storage_with_memory_client_using_config_from_crew(mock_mem0_memory_clie
"api_key": "ABCDEFGH",
"org_id": "my_org_id",
"project_id": "my_project_id",
"run_id": "my_run_id",
"includes": "include1",
"excludes": "exclude1",
"infer": True
},
}
)
@@ -150,28 +155,75 @@ def test_mem0_storage_with_explict_config(
assert (
mem0_storage_with_memory_client_using_explictly_config.config == expected_config
)
assert (
mem0_storage_with_memory_client_using_explictly_config.memory_config
== expected_config
)
def test_mem0_storage_updates_project_with_custom_categories(mock_mem0_memory_client):
mock_mem0_memory_client.update_project = MagicMock()
new_categories = [
{"lifestyle_management_concerns": "Tracks daily routines, habits, hobbies and interests including cooking, time management and work-life balance"},
]
crew = MockCrew(
memory_config={
"provider": "mem0",
"config": {
"user_id": "test_user",
"api_key": "ABCDEFGH",
"org_id": "my_org_id",
"project_id": "my_project_id",
"custom_categories": new_categories,
},
}
)
with patch.object(MemoryClient, "__new__", return_value=mock_mem0_memory_client):
_ = Mem0Storage(type="short_term", crew=crew)
mock_mem0_memory_client.update_project.assert_called_once_with(
custom_categories=new_categories
)
def test_save_method_with_memory_oss(mem0_storage_with_mocked_config):
"""Test save method for different memory types"""
mem0_storage, _, _ = mem0_storage_with_mocked_config
mem0_storage.memory.add = MagicMock()
# Test short_term memory type (already set in fixture)
test_value = "This is a test memory"
test_metadata = {"key": "value"}
mem0_storage.save(test_value, test_metadata)
mem0_storage.memory.add.assert_called_once_with(
[{'role': 'assistant' , 'content': test_value}],
agent_id="Test_Agent",
infer=False,
[{"role": "assistant" , "content": test_value}],
infer=True,
metadata={"type": "short_term", "key": "value"},
run_id="my_run_id",
user_id="test_user",
agent_id='Test_Agent'
)
def test_save_method_with_multiple_agents(mem0_storage_with_mocked_config):
mem0_storage, _, _ = mem0_storage_with_mocked_config
mem0_storage.crew.agents = [MagicMock(role="Test Agent"), MagicMock(role="Test Agent 2"), MagicMock(role="Test Agent 3")]
mem0_storage.memory.add = MagicMock()
test_value = "This is a test memory"
test_metadata = {"key": "value"}
mem0_storage.save(test_value, test_metadata)
mem0_storage.memory.add.assert_called_once_with(
[{"role": "assistant" , "content": test_value}],
infer=True,
metadata={"type": "short_term", "key": "value"},
run_id="my_run_id",
user_id="test_user",
agent_id='Test_Agent_Test_Agent_2_Test_Agent_3'
)
@@ -179,19 +231,24 @@ def test_save_method_with_memory_client(mem0_storage_with_memory_client_using_co
"""Test save method for different memory types"""
mem0_storage = mem0_storage_with_memory_client_using_config_from_crew
mem0_storage.memory.add = MagicMock()
# Test short_term memory type (already set in fixture)
test_value = "This is a test memory"
test_metadata = {"key": "value"}
mem0_storage.save(test_value, test_metadata)
mem0_storage.memory.add.assert_called_once_with(
[{'role': 'assistant' , 'content': test_value}],
agent_id="Test_Agent",
infer=False,
infer=True,
metadata={"type": "short_term", "key": "value"},
output_format="v1.1"
version="v2",
run_id="my_run_id",
includes="include1",
excludes="exclude1",
output_format='v1.1',
user_id='test_user',
agent_id='Test_Agent'
)
@@ -204,13 +261,14 @@ def test_search_method_with_memory_oss(mem0_storage_with_mocked_config):
results = mem0_storage.search("test query", limit=5, score_threshold=0.5)
mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
agent_id="Test_Agent",
user_id="test_user"
query="test query",
limit=5,
user_id="test_user",
filters={'AND': [{'run_id': 'my_run_id'}]},
threshold=0.5
)
assert len(results) == 1
assert len(results) == 2
assert results[0]["content"] == "Result 1"
@@ -223,13 +281,85 @@ def test_search_method_with_memory_client(mem0_storage_with_memory_client_using_
results = mem0_storage.search("test query", limit=5, score_threshold=0.5)
mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
agent_id="Test_Agent",
query="test query",
limit=5,
metadata={"type": "short_term"},
user_id="test_user",
output_format='v1.1'
version='v2',
run_id="my_run_id",
output_format='v1.1',
filters={'AND': [{'run_id': 'my_run_id'}]},
threshold=0.5
)
assert len(results) == 1
assert len(results) == 2
assert results[0]["content"] == "Result 1"
def test_mem0_storage_default_infer_value(mock_mem0_memory_client):
"""Test that Mem0Storage sets infer=True by default for short_term memory."""
with patch.object(MemoryClient, "__new__", return_value=mock_mem0_memory_client):
crew = MockCrew(
memory_config={
"provider": "mem0",
"config": {
"user_id": "test_user",
"api_key": "ABCDEFGH"
},
}
)
mem0_storage = Mem0Storage(type="short_term", crew=crew)
assert mem0_storage.infer is True
def test_save_memory_using_agent_entity(mock_mem0_memory_client):
config = {
"agent_id": "agent-123",
}
mock_memory = MagicMock(spec=Memory)
with patch.object(Memory, "__new__", return_value=mock_memory):
mem0_storage = Mem0Storage(type="external", config=config)
mem0_storage.save("test memory", {"key": "value"})
mem0_storage.memory.add.assert_called_once_with(
[{'role': 'assistant' , 'content': 'test memory'}],
infer=True,
metadata={"type": "external", "key": "value"},
agent_id="agent-123",
)
def test_search_method_with_agent_entity():
mem0_storage = Mem0Storage(type="external", config={"agent_id": "agent-123"})
mock_results = {"results": [{"score": 0.9, "content": "Result 1"}, {"score": 0.4, "content": "Result 2"}]}
mem0_storage.memory.search = MagicMock(return_value=mock_results)
results = mem0_storage.search("test query", limit=5, score_threshold=0.5)
mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
filters={"AND": [{"agent_id": "agent-123"}]},
threshold=0.5,
)
assert len(results) == 2
assert results[0]["content"] == "Result 1"
def test_search_method_with_agent_id_and_user_id():
mem0_storage = Mem0Storage(type="external", config={"agent_id": "agent-123", "user_id": "user-123"})
mock_results = {"results": [{"score": 0.9, "content": "Result 1"}, {"score": 0.4, "content": "Result 2"}]}
mem0_storage.memory.search = MagicMock(return_value=mock_results)
results = mem0_storage.search("test query", limit=5, score_threshold=0.5)
mem0_storage.memory.search.assert_called_once_with(
query="test query",
limit=5,
user_id='user-123',
filters={"OR": [{"user_id": "user-123"}, {"agent_id": "agent-123"}]},
threshold=0.5,
)
assert len(results) == 2
assert results[0]["content"] == "Result 1"
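A minimal sketch of the new agent-scoped external memory path these tests assert; the import path is assumed (the diff does not show it), the ids are illustrative, and a configured mem0 backend is required.

```python
# Sketch only: agent-scoped external memory, as the tests above assert.
from crewai.memory.storage.mem0_storage import Mem0Storage  # assumed path

storage = Mem0Storage(type="external", config={"agent_id": "agent-123"})

# Saves are tagged with the agent entity and the memory-type metadata.
storage.save("test memory", {"key": "value"})

# Searches filter on the same entity and pass the score threshold through.
results = storage.search("test query", limit=5, score_threshold=0.5)
```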

View File

@@ -0,0 +1,209 @@
import pytest
import uuid
from unittest.mock import patch, MagicMock
from crewai.utilities.events.task_events import HumanInputRequiredEvent, HumanInputCompletedEvent
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
class TestHumanInputEventIntegration:
"""Test integration between human input flow and event system"""
def setup_method(self):
"""Setup test environment"""
self.executor = CrewAgentExecutorMixin()
self.executor.crew = MagicMock()
self.executor.crew.id = str(uuid.uuid4())
self.executor.crew._train = False
self.executor.task = MagicMock()
self.executor.task.id = str(uuid.uuid4())
self.executor.agent = MagicMock()
self.executor.agent.id = str(uuid.uuid4())
self.executor._printer = MagicMock()
@patch('builtins.input', return_value='test feedback')
def test_human_input_emits_required_event(self, mock_input):
"""Test that human input emits HumanInputRequiredEvent"""
events_captured = []
def capture_event(event):
events_captured.append(event)
with patch.object(crewai_event_bus, 'emit', side_effect=capture_event):
result = self.executor._ask_human_input("Test result")
assert result == 'test feedback'
assert len(events_captured) == 2
required_event = events_captured[0][1]
completed_event = events_captured[1][1]
assert isinstance(required_event, HumanInputRequiredEvent)
assert isinstance(completed_event, HumanInputCompletedEvent)
assert required_event.execution_id == str(self.executor.crew.id)
assert required_event.crew_id == str(self.executor.crew.id)
assert required_event.task_id == str(self.executor.task.id)
assert required_event.agent_id == str(self.executor.agent.id)
assert "HUMAN FEEDBACK" in required_event.prompt
assert required_event.context == "Test result"
assert required_event.event_id is not None
assert completed_event.execution_id == str(self.executor.crew.id)
assert completed_event.human_feedback == 'test feedback'
assert completed_event.event_id == required_event.event_id
@patch('builtins.input', return_value='training feedback')
def test_training_mode_human_input_events(self, mock_input):
"""Test human input events in training mode"""
self.executor.crew._train = True
events_captured = []
def capture_event(event):
events_captured.append(event)
with patch.object(crewai_event_bus, 'emit', side_effect=capture_event):
result = self.executor._ask_human_input("Test result")
assert result == 'training feedback'
assert len(events_captured) == 2
required_event = events_captured[0][1]
assert isinstance(required_event, HumanInputRequiredEvent)
assert "TRAINING MODE" in required_event.prompt
@patch('builtins.input', return_value='')
def test_empty_feedback_events(self, mock_input):
"""Test events with empty feedback"""
events_captured = []
def capture_event(event):
events_captured.append(event)
with patch.object(crewai_event_bus, 'emit', side_effect=capture_event):
result = self.executor._ask_human_input("Test result")
assert result == ''
assert len(events_captured) == 2
completed_event = events_captured[1][1]
assert isinstance(completed_event, HumanInputCompletedEvent)
assert completed_event.human_feedback == ''
def test_event_payload_structure(self):
"""Test that event payload matches GitHub issue specification"""
event = HumanInputRequiredEvent(
execution_id="test-execution-id",
crew_id="test-crew-id",
task_id="test-task-id",
agent_id="test-agent-id",
prompt="Test prompt",
context="Test context",
reason_flags={"ambiguity": True, "missing_field": False},
event_id="test-event-id"
)
payload = event.to_json()
assert payload["type"] == "human_input_required"
assert payload["execution_id"] == "test-execution-id"
assert payload["crew_id"] == "test-crew-id"
assert payload["task_id"] == "test-task-id"
assert payload["agent_id"] == "test-agent-id"
assert payload["prompt"] == "Test prompt"
assert payload["context"] == "Test context"
assert payload["reason_flags"]["ambiguity"] is True
assert payload["reason_flags"]["missing_field"] is False
assert payload["event_id"] == "test-event-id"
assert "timestamp" in payload
def test_completed_event_payload_structure(self):
"""Test that completed event payload is correct"""
event = HumanInputCompletedEvent(
execution_id="test-execution-id",
crew_id="test-crew-id",
task_id="test-task-id",
agent_id="test-agent-id",
event_id="test-event-id",
human_feedback="User feedback"
)
payload = event.to_json()
assert payload["type"] == "human_input_completed"
assert payload["execution_id"] == "test-execution-id"
assert payload["crew_id"] == "test-crew-id"
assert payload["task_id"] == "test-task-id"
assert payload["agent_id"] == "test-agent-id"
assert payload["event_id"] == "test-event-id"
assert payload["human_feedback"] == "User feedback"
assert "timestamp" in payload
@patch('builtins.input', side_effect=KeyboardInterrupt("Test interrupt"))
def test_human_input_exception_handling(self, mock_input):
"""Test that events are still emitted even if input is interrupted"""
events_captured = []
def capture_event(event):
events_captured.append(event)
with patch.object(crewai_event_bus, 'emit', side_effect=capture_event):
with pytest.raises(KeyboardInterrupt):
self.executor._ask_human_input("Test result")
assert len(events_captured) == 1
required_event = events_captured[0][1]
assert isinstance(required_event, HumanInputRequiredEvent)
def test_human_input_without_crew(self):
"""Test human input events when crew is None"""
self.executor.crew = None
events_captured = []
def capture_event(event):
events_captured.append(event)
with patch.object(crewai_event_bus, 'emit', side_effect=capture_event), \
patch('builtins.input', return_value='test'):
self.executor._ask_human_input("Test result")
assert len(events_captured) == 2
required_event = events_captured[0][1]
assert required_event.execution_id is None
assert required_event.crew_id is None
def test_human_input_without_task(self):
"""Test human input events when task is None"""
self.executor.task = None
events_captured = []
def capture_event(event):
events_captured.append(event)
with patch.object(crewai_event_bus, 'emit', side_effect=capture_event), \
patch('builtins.input', return_value='test'):
self.executor._ask_human_input("Test result")
assert len(events_captured) == 2
required_event = events_captured[0][1]
assert required_event.task_id is None
def test_human_input_without_agent(self):
"""Test human input events when agent is None"""
self.executor.agent = None
events_captured = []
def capture_event(event):
events_captured.append(event)
with patch.object(crewai_event_bus, 'emit', side_effect=capture_event), \
patch('builtins.input', return_value='test'):
self.executor._ask_human_input("Test result")
assert len(events_captured) == 2
required_event = events_captured[0][1]
assert required_event.agent_id is None
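Finally, a sketch of consuming the new events straight from the event bus, using the same `(source, event)` handler signature seen elsewhere in this diff; handler names are illustrative.

```python
# Sketch only: subscribing to the new human-input events on the event bus.
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.task_events import (
    HumanInputCompletedEvent,
    HumanInputRequiredEvent,
)


@crewai_event_bus.on(HumanInputRequiredEvent)
def on_input_required(source, event):
    # to_json() carries execution_id, crew_id, task_id, agent_id, prompt,
    # context, reason_flags, event_id, and a timestamp.
    print("human input needed:", event.prompt)


@crewai_event_bus.on(HumanInputCompletedEvent)
def on_input_completed(source, event):
    print("feedback received:", event.human_feedback)
```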

View File

@@ -1,96 +0,0 @@
from pydantic import BaseModel, Field
from crewai.utilities.crew_pydantic_output_parser import clean_json_from_text
class TestOutput(BaseModel):
summary: str = Field(description="A brief summary")
confidence: int = Field(description="Confidence level from 1-100")
def test_clean_json_from_text_with_trailing_characters():
"""Test that clean_json_from_text handles trailing characters correctly."""
text_with_trailing = '''{"summary": "Test summary", "confidence": 85}
Additional text after JSON that should be ignored.
Final Answer: This text should also be ignored.'''
cleaned = clean_json_from_text(text_with_trailing)
expected = '{"summary": "Test summary", "confidence": 85}'
assert cleaned == expected
def test_clean_json_from_text_with_markdown():
"""Test that clean_json_from_text handles markdown formatting correctly."""
text_with_markdown = '''```json
{"summary": "Test summary with markdown", "confidence": 90}
```'''
cleaned = clean_json_from_text(text_with_markdown)
expected = '{"summary": "Test summary with markdown", "confidence": 90}'
assert cleaned == expected
def test_clean_json_from_text_with_prefix():
"""Test that clean_json_from_text handles text prefix correctly."""
text_with_prefix = '''Final Answer: {"summary": "Test summary with prefix", "confidence": 95}'''
cleaned = clean_json_from_text(text_with_prefix)
expected = '{"summary": "Test summary with prefix", "confidence": 95}'
assert cleaned == expected
def test_clean_json_from_text_pure_json():
"""Test that clean_json_from_text handles pure JSON correctly."""
pure_json = '{"summary": "Pure JSON", "confidence": 100}'
cleaned = clean_json_from_text(pure_json)
assert cleaned == pure_json
def test_clean_json_from_text_no_json():
"""Test that clean_json_from_text returns original text when no JSON found."""
no_json_text = "This is just plain text with no JSON"
cleaned = clean_json_from_text(no_json_text)
assert cleaned == no_json_text
def test_clean_json_from_text_invalid_json():
"""Test that clean_json_from_text handles invalid JSON gracefully."""
invalid_json = '{"summary": "Invalid JSON", "confidence":}'
cleaned = clean_json_from_text(invalid_json)
assert cleaned == invalid_json
def test_clean_json_from_text_multiple_json_objects():
"""Test that clean_json_from_text returns the first valid JSON object."""
multiple_json = '''{"summary": "First JSON", "confidence": 80}
Some text in between.
{"summary": "Second JSON", "confidence": 90}'''
cleaned = clean_json_from_text(multiple_json)
expected = '{"summary": "First JSON", "confidence": 80}'
assert cleaned == expected
def test_clean_json_from_text_nested_json():
"""Test that clean_json_from_text handles nested JSON correctly."""
nested_json = '''{"summary": "Nested test", "details": {"score": 95, "category": "A"}, "confidence": 85}'''
cleaned = clean_json_from_text(nested_json)
assert cleaned == nested_json
def test_clean_json_from_text_with_complex_trailing():
"""Test the exact scenario from GitHub issue #3191."""
github_issue_text = '''{"valid": true, "feedback": null}
Agent failed to reach a final answer. This is likely a bug - please report it.
Error details: maximum recursion depth exceeded in comparison'''
cleaned = clean_json_from_text(github_issue_text)
expected = '{"valid": true, "feedback": null}'
assert cleaned == expected

View File

@@ -492,102 +492,4 @@ def test_lite_agent_with_invalid_llm():
backstory="Test backstory",
llm="invalid-model"
)
assert "Expected LLM instance of type BaseLLM" in str(exc_info.value)
def test_lite_agent_structured_output_with_trailing_characters():
"""Test that LiteAgent can handle JSON responses with trailing characters."""
from unittest.mock import patch
class SimpleOutput(BaseModel):
summary: str = Field(description="A brief summary")
confidence: int = Field(description="Confidence level from 1-100")
mock_response_with_trailing = '''{"summary": "Test summary", "confidence": 85}
Additional text after JSON that should be ignored.
Final Answer: This text should also be ignored.'''
with patch('crewai.lite_agent.get_llm_response') as mock_llm:
mock_llm.return_value = mock_response_with_trailing
agent = LiteAgent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
llm=LLM(model="gpt-4o-mini"),
)
result = agent.kickoff(
"Test message",
response_format=SimpleOutput
)
assert result.pydantic is not None
assert isinstance(result.pydantic, SimpleOutput)
assert result.pydantic.summary == "Test summary"
assert result.pydantic.confidence == 85
def test_lite_agent_structured_output_with_markdown():
"""Test that LiteAgent can handle JSON responses wrapped in markdown."""
from unittest.mock import patch
class SimpleOutput(BaseModel):
summary: str = Field(description="A brief summary")
confidence: int = Field(description="Confidence level from 1-100")
mock_response_with_markdown = '''```json
{"summary": "Test summary with markdown", "confidence": 90}
```'''
with patch('crewai.lite_agent.get_llm_response') as mock_llm:
mock_llm.return_value = mock_response_with_markdown
agent = LiteAgent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
llm=LLM(model="gpt-4o-mini"),
)
result = agent.kickoff(
"Test message",
response_format=SimpleOutput
)
assert result.pydantic is not None
assert isinstance(result.pydantic, SimpleOutput)
assert result.pydantic.summary == "Test summary with markdown"
assert result.pydantic.confidence == 90
def test_lite_agent_structured_output_with_final_answer_prefix():
"""Test that LiteAgent can handle JSON responses with Final Answer prefix."""
from unittest.mock import patch
class SimpleOutput(BaseModel):
summary: str = Field(description="A brief summary")
confidence: int = Field(description="Confidence level from 1-100")
mock_response_with_prefix = '''Final Answer: {"summary": "Test summary with prefix", "confidence": 95}'''
with patch('crewai.lite_agent.get_llm_response') as mock_llm:
mock_llm.return_value = mock_response_with_prefix
agent = LiteAgent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
llm=LLM(model="gpt-4o-mini"),
)
result = agent.kickoff(
"Test message",
response_format=SimpleOutput
)
assert result.pydantic is not None
assert isinstance(result.pydantic, SimpleOutput)
assert result.pydantic.summary == "Test summary with prefix"
assert result.pydantic.confidence == 95
assert "Expected LLM instance of type BaseLLM" in str(exc_info.value)

View File

@@ -302,78 +302,3 @@ def test_hallucination_guardrail_description_in_events():
event = LLMGuardrailStartedEvent(guardrail=guardrail, retry_count=0)
assert event.guardrail == "HallucinationGuardrail (no-op)"
def test_llm_guardrail_with_trailing_characters():
"""Test that LLMGuardrail can handle responses with trailing characters."""
from unittest.mock import patch
mock_response_with_trailing = '''{"valid": true, "feedback": null}
Some additional text that should be ignored.
More trailing content.'''
with patch('crewai.Agent.kickoff') as mock_kickoff:
from crewai.agent import LiteAgentOutput
from crewai.tasks.llm_guardrail import LLMGuardrailResult
mock_output = LiteAgentOutput(
raw=mock_response_with_trailing,
pydantic=LLMGuardrailResult(valid=True, feedback=None),
agent_role="Guardrail Agent",
usage_metrics=None
)
mock_kickoff.return_value = mock_output
guardrail = LLMGuardrail(
description="Test guardrail",
llm=LLM(model="gpt-4o-mini")
)
task_output = TaskOutput(
raw="Test task output",
description="Test task",
expected_output="Output",
agent="Test Agent",
)
result = guardrail(task_output)
assert result[0] is True
assert result[1] == "Test task output"
def test_llm_guardrail_with_markdown_formatting():
"""Test that LLMGuardrail can handle responses with markdown formatting."""
from unittest.mock import patch
mock_response_with_markdown = '''```json
{"valid": false, "feedback": "The output does not meet the requirements"}
```'''
with patch('crewai.Agent.kickoff') as mock_kickoff:
from crewai.agent import LiteAgentOutput
from crewai.tasks.llm_guardrail import LLMGuardrailResult
mock_output = LiteAgentOutput(
raw=mock_response_with_markdown,
pydantic=LLMGuardrailResult(valid=False, feedback="The output does not meet the requirements"),
agent_role="Guardrail Agent",
usage_metrics=None
)
mock_kickoff.return_value = mock_output
guardrail = LLMGuardrail(
description="Test guardrail",
llm=LLM(model="gpt-4o-mini")
)
task_output = TaskOutput(
raw="Test task output",
description="Test task",
expected_output="Output",
agent="Test Agent",
)
result = guardrail(task_output)
assert result[0] is False
assert result[1] == "The output does not meet the requirements"

View File

@@ -0,0 +1,25 @@
from unittest.mock import patch
import pytest
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
def test_configure_embedder_importerror():
configurator = EmbeddingConfigurator()
embedder_config = {
'provider': 'openai',
'config': {
'model': 'text-embedding-ada-002',
}
}
with patch('chromadb.utils.embedding_functions.openai_embedding_function.OpenAIEmbeddingFunction') as mock_openai:
mock_openai.side_effect = ImportError("Module not found.")
with pytest.raises(ImportError) as exc_info:
configurator.configure_embedder(embedder_config)
assert str(exc_info.value) == "Module not found."
mock_openai.assert_called_once()

View File

@@ -64,7 +64,8 @@ def base_agent():
llm="gpt-4o-mini",
goal="Just say hi",
backstory="You are a helpful assistant that just says hi",
)
)
@pytest.fixture(scope="module")
def base_task(base_agent):
@@ -74,6 +75,7 @@ def base_task(base_agent):
agent=base_agent,
)
event_listener = EventListener()
@@ -448,6 +450,27 @@ def test_flow_emits_start_event():
assert received_events[0].type == "flow_started"
def test_flow_name_emitted_to_event_bus():
received_events = []
class MyFlowClass(Flow):
name = "PRODUCTION_FLOW"
@start()
def start(self):
return "Hello, world!"
@crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event):
received_events.append(event)
flow = MyFlowClass()
flow.kickoff()
assert len(received_events) == 1
assert received_events[0].flow_name == "PRODUCTION_FLOW"
def test_flow_emits_finish_event():
received_events = []
@@ -756,6 +779,7 @@ def test_streaming_empty_response_handling():
received_chunks = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
received_chunks.append(event.chunk)
@@ -793,6 +817,7 @@ def test_streaming_empty_response_handling():
# Restore the original method
llm.call = original_call
@pytest.mark.vcr(filter_headers=["authorization"])
def test_stream_llm_emits_event_with_task_and_agent_info():
completed_event = []
@@ -801,6 +826,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
stream_event = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
failed_event.append(event)
@@ -827,7 +853,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
description="Just say hi",
expected_output="hi",
llm=LLM(model="gpt-4o-mini", stream=True),
agent=agent
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
@@ -855,6 +881,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
assert set(all_task_id) == {task.id}
assert set(all_task_name) == {task.name}
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
completed_event = []
@@ -863,6 +890,7 @@ def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
stream_event = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
failed_event.append(event)
@@ -904,6 +932,7 @@ def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
assert set(all_task_id) == {base_task.id}
assert set(all_task_name) == {base_task.name}
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_event_with_lite_agent():
completed_event = []
@@ -912,6 +941,7 @@ def test_llm_emits_event_with_lite_agent():
stream_event = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
failed_event.append(event)
@@ -936,7 +966,6 @@ def test_llm_emits_event_with_lite_agent():
)
agent.kickoff(messages=[{"role": "user", "content": "say hi!"}])
assert len(completed_event) == 2
assert len(failed_event) == 0
assert len(started_event) == 2

uv.lock (generated, 6111 changes): file diff suppressed because it is too large.