Compare commits


1 Commit

Author | SHA1 | Message | Date
Jesse Miller | 4dae956725 | Add support for skills | 2026-02-04 07:43:52 -08:00
37 changed files with 339 additions and 676 deletions

View File

@@ -1,63 +0,0 @@
name: Generate Tool Specifications
on:
pull_request:
branches:
- main
paths:
- 'lib/crewai-tools/src/crewai_tools/**'
workflow_dispatch:
permissions:
contents: write
pull-requests: write
jobs:
generate-specs:
runs-on: ubuntu-latest
env:
PYTHONUNBUFFERED: 1
steps:
- name: Generate GitHub App token
id: app-token
uses: tibdex/github-app-token@v2
with:
app_id: ${{ secrets.CREWAI_TOOL_SPECS_APP_ID }}
private_key: ${{ secrets.CREWAI_TOOL_SPECS_PRIVATE_KEY }}
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.head_ref }}
token: ${{ steps.app-token.outputs.token }}
- name: Install uv
uses: astral-sh/setup-uv@v6
with:
version: "0.8.4"
python-version: "3.12"
enable-cache: true
- name: Install the project
working-directory: lib/crewai-tools
run: uv sync --dev --all-extras
- name: Generate tool specifications
working-directory: lib/crewai-tools
run: uv run python src/crewai_tools/generate_tool_specs.py
- name: Check for changes and commit
run: |
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
git add lib/crewai-tools/tool.specs.json
if git diff --quiet --staged; then
echo "No changes detected in tool.specs.json"
else
echo "Changes detected in tool.specs.json, committing..."
git commit -m "chore: update tool specifications"
git push
fi

View File

@@ -25,6 +25,35 @@ For file-based Knowledge Sources, make sure to place your files in a `knowledge`
Also, use relative paths from the `knowledge` directory when creating the source.
</Tip>
### Skills.md for Crews (CrewBase)
When using the `@CrewBase` decorator and project layout (e.g. from `crewai create crew`), you can add **Skills.md** files as crew-level knowledge sources. Place one markdown file per skill under:
```
src/<project>/.agents/<skill_name>/Skills.md
```
Each `Skills.md` is loaded as a knowledge source so the crew can query it via RAG at runtime. Use `get_skills_knowledge_sources()` when building your crew:
```python
from crewai import Crew, Process
from crewai.project import CrewBase, agent, crew, task
@CrewBase
class MyCrew():
# ...
@crew
def crew(self) -> Crew:
return Crew(
agents=self.agents,
tasks=self.tasks,
process=Process.sequential,
knowledge_sources=self.get_skills_knowledge_sources(),
)
```
You can combine skills with other knowledge sources: `knowledge_sources=[*self.get_skills_knowledge_sources(), other_source]`. If the `.agents` directory is missing or contains no `Skills.md` files, `get_skills_knowledge_sources()` returns an empty list. To use a directory other than `.agents`, set the class attribute `skills_directory = "my_skills"`. Skills.md support requires the `docling` package (`uv add docling`).
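For example, a crew that keeps its skills under a custom directory and mixes them with another source (a minimal sketch; the `my_skills` directory name and the string source are illustrative):
```python
from crewai import Crew, Process
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.project import CrewBase, crew

@CrewBase
class SupportCrew():
    # Loads src/<project>/my_skills/<skill_name>/Skills.md instead of
    # the default .agents directory.
    skills_directory = "my_skills"

    # @agent and @task definitions omitted for brevity

    @crew
    def crew(self) -> Crew:
        extra = StringKnowledgeSource(content="Support hours are 9am-5pm UTC.")
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            # Skills sources combined with any other knowledge source.
            knowledge_sources=[*self.get_skills_knowledge_sources(), extra],
        )
```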
### Vector store (RAG) client configuration
CrewAI exposes a provider-neutral RAG client abstraction for vector stores. The default provider is ChromaDB, and Qdrant is supported as well. You can switch providers using configuration utilities.
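A sketch of what switching providers could look like; the exact utility and config names (`set_rag_config`, `QdrantConfig`) are assumptions, not confirmed by this diff:
```python
# Names below are assumptions for illustration; check the crewai.rag
# package for the actual configuration utilities.
from crewai.rag.config.utils import set_rag_config
from crewai.rag.qdrant.config import QdrantConfig

# Route RAG lookups through Qdrant instead of the default ChromaDB.
set_rag_config(QdrantConfig())
```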

View File

@@ -1,17 +1,12 @@
from datetime import datetime
import json
import os
import time
from typing import Annotated, Any, ClassVar, Literal
from typing import Any, ClassVar
from crewai.tools import BaseTool, EnvVar
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from pydantic.types import StringConstraints
import requests
load_dotenv()
def _save_results_to_file(content: str) -> None:
"""Saves the search results to a file."""
@@ -20,72 +15,37 @@ def _save_results_to_file(content: str) -> None:
file.write(content)
FreshnessPreset = Literal["pd", "pw", "pm", "py"]
FreshnessRange = Annotated[
str, StringConstraints(pattern=r"^\d{4}-\d{2}-\d{2}to\d{4}-\d{2}-\d{2}$")
]
Freshness = FreshnessPreset | FreshnessRange
SafeSearch = Literal["off", "moderate", "strict"]
class BraveSearchToolSchema(BaseModel):
"""Input for BraveSearchTool"""
"""Input for BraveSearchTool."""
query: str = Field(..., description="Search query to perform")
country: str | None = Field(
default=None,
description="Country code for geo-targeting (e.g., 'US', 'BR').",
)
search_language: str | None = Field(
default=None,
description="Language code for the search results (e.g., 'en', 'es').",
)
count: int | None = Field(
default=None,
description="The maximum number of results to return. Actual number may be less.",
)
offset: int | None = Field(
default=None, description="Skip the first N result sets/pages. Max is 9."
)
safesearch: SafeSearch | None = Field(
default=None,
description="Filter out explicit content. Options: off/moderate/strict",
)
spellcheck: bool | None = Field(
default=None,
description="Attempt to correct spelling errors in the search query.",
)
freshness: Freshness | None = Field(
default=None,
description="Enforce freshness of results. Options: pd/pw/pm/py, or YYYY-MM-DDtoYYYY-MM-DD",
)
text_decorations: bool | None = Field(
default=None,
description="Include markup to highlight search terms in the results.",
)
extra_snippets: bool | None = Field(
default=None,
description="Include up to 5 text snippets for each page if possible.",
)
operators: bool | None = Field(
default=None,
description="Whether to apply search operators (e.g., site:example.com).",
search_query: str = Field(
..., description="Mandatory search query you want to use to search the internet"
)
# TODO: Extend support to additional endpoints (e.g., /images, /news, etc.)
class BraveSearchTool(BaseTool):
"""A tool that performs web searches using the Brave Search API."""
"""BraveSearchTool - A tool for performing web searches using the Brave Search API.
name: str = "Brave Search"
This module provides functionality to search the internet using Brave's Search API,
supporting customizable result counts and country-specific searches.
Dependencies:
- requests
- pydantic
- python-dotenv (for API key management)
"""
name: str = "Brave Web Search the internet"
description: str = (
"A tool that performs web searches using the Brave Search API. "
"Results are returned as structured JSON data."
"A tool that can be used to search the internet with a search_query."
)
args_schema: type[BaseModel] = BraveSearchToolSchema
search_url: str = "https://api.search.brave.com/res/v1/web/search"
country: str | None = ""
n_results: int = 10
save_file: bool = False
_last_request_time: ClassVar[float] = 0
_min_request_interval: ClassVar[float] = 1.0 # seconds
env_vars: list[EnvVar] = Field(
default_factory=lambda: [
EnvVar(
@@ -95,9 +55,6 @@ class BraveSearchTool(BaseTool):
),
]
)
# Rate limiting parameters
_last_request_time: ClassVar[float] = 0
_min_request_interval: ClassVar[float] = 1.0 # seconds
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -116,64 +73,19 @@ class BraveSearchTool(BaseTool):
self._min_request_interval - (current_time - self._last_request_time)
)
BraveSearchTool._last_request_time = time.time()
# Construct and send the request
try:
# Maintain both "search_query" and "query" for backwards compatibility
query = kwargs.get("search_query") or kwargs.get("query")
if not query:
raise ValueError("Query is required")
payload = {"q": query}
if country := kwargs.get("country"):
payload["country"] = country
if search_language := kwargs.get("search_language"):
payload["search_language"] = search_language
# Fallback to deprecated n_results parameter if no count is provided
count = kwargs.get("count")
if count is not None:
payload["count"] = count
else:
payload["count"] = self.n_results
# Offset may be 0, so avoid truthiness check
offset = kwargs.get("offset")
if offset is not None:
payload["offset"] = offset
if safesearch := kwargs.get("safesearch"):
payload["safesearch"] = safesearch
search_query = kwargs.get("search_query") or kwargs.get("query")
if not search_query:
raise ValueError("Search query is required")
save_file = kwargs.get("save_file", self.save_file)
if freshness := kwargs.get("freshness"):
payload["freshness"] = freshness
n_results = kwargs.get("n_results", self.n_results)
# Boolean parameters
spellcheck = kwargs.get("spellcheck")
if spellcheck is not None:
payload["spellcheck"] = spellcheck
payload = {"q": search_query, "count": n_results}
text_decorations = kwargs.get("text_decorations")
if text_decorations is not None:
payload["text_decorations"] = text_decorations
if self.country != "":
payload["country"] = self.country
extra_snippets = kwargs.get("extra_snippets")
if extra_snippets is not None:
payload["extra_snippets"] = extra_snippets
operators = kwargs.get("operators")
if operators is not None:
payload["operators"] = operators
# Limit the result types to "web" since there is presently no
# handling of other types like "discussions", "faq", "infobox",
# "news", "videos", or "locations".
payload["result_filter"] = "web"
# Setup Request Headers
headers = {
"X-Subscription-Token": os.environ["BRAVE_API_KEY"],
"Accept": "application/json",
@@ -185,32 +97,25 @@ class BraveSearchTool(BaseTool):
response.raise_for_status() # Handle non-200 responses
results = response.json()
# TODO: Handle other result types like "discussions", "faq", etc.
web_results_items = []
if "web" in results:
web_results = results["web"]["results"]
for result in web_results:
url = result.get("url")
title = result.get("title")
# If, for whatever reason, this entry does not have a title
# or url, skip it.
if not url or not title:
results = results["web"]["results"]
string = []
for result in results:
try:
string.append(
"\n".join(
[
f"Title: {result['title']}",
f"Link: {result['url']}",
f"Snippet: {result['description']}",
"---",
]
)
)
except KeyError: # noqa: PERF203
continue
item = {
"url": url,
"title": title,
}
description = result.get("description")
if description:
item["description"] = description
snippets = result.get("extra_snippets")
if snippets:
item["snippets"] = snippets
web_results_items.append(item)
content = json.dumps(web_results_items)
content = "\n".join(string)
except requests.RequestException as e:
return f"Error performing search: {e!s}"
except KeyError as e:
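After this revert, the tool takes a plain `search_query` (with `query` still accepted for backwards compatibility) and returns newline-joined text instead of JSON. A minimal usage sketch, assuming `BRAVE_API_KEY` is set in the environment:
```python
from crewai_tools import BraveSearchTool

# n_results feeds the API's "count" parameter.
tool = BraveSearchTool(n_results=2)

# "search_query" is the canonical argument; "query" is still accepted
# for backwards compatibility.
print(tool.run(search_query="CrewAI skills support"))
```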

View File

@@ -13,16 +13,10 @@ from crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder impor
from crewai_tools.tools.crewai_platform_tools.crewai_platform_tools import (
CrewaiPlatformTools,
)
from crewai_tools.tools.crewai_platform_tools.file_hook import (
process_file_markers,
register_file_processing_hook,
)
__all__ = [
"CrewAIPlatformActionTool",
"CrewaiPlatformToolBuilder",
"CrewaiPlatformTools",
"process_file_markers",
"register_file_processing_hook",
]

View File

@@ -2,8 +2,6 @@
import json
import os
import re
import tempfile
from typing import Any
from crewai.tools import BaseTool
@@ -16,26 +14,6 @@ from crewai_tools.tools.crewai_platform_tools.misc import (
get_platform_integration_token,
)
_FILE_MARKER_PREFIX = "__CREWAI_FILE__"
_MIME_TO_EXTENSION = {
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
"application/vnd.ms-excel": ".xls",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
"application/msword": ".doc",
"application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
"application/vnd.ms-powerpoint": ".ppt",
"application/pdf": ".pdf",
"image/png": ".png",
"image/jpeg": ".jpg",
"image/gif": ".gif",
"image/webp": ".webp",
"text/plain": ".txt",
"text/csv": ".csv",
"application/json": ".json",
"application/zip": ".zip",
}
class CrewAIPlatformActionTool(BaseTool):
action_name: str = Field(default="", description="The name of the action")
@@ -93,18 +71,10 @@ class CrewAIPlatformActionTool(BaseTool):
url=api_url,
headers=headers,
json=payload,
timeout=300,
stream=True,
timeout=60,
verify=os.environ.get("CREWAI_FACTORY", "false").lower() != "true",
)
content_type = response.headers.get("Content-Type", "")
# Check if response is binary (non-JSON)
if "application/json" not in content_type:
return self._handle_binary_response(response)
# Normal JSON response
data = response.json()
if not response.ok:
if isinstance(data, dict):
@@ -121,49 +91,3 @@ class CrewAIPlatformActionTool(BaseTool):
except Exception as e:
return f"Error executing action {self.action_name}: {e!s}"
def _handle_binary_response(self, response: requests.Response) -> str:
"""Handle binary streaming response from the API.
Streams the binary content to a temporary file and returns a marker
that can be processed by the file hook to inject the file into the
LLM context.
Args:
response: The streaming HTTP response with binary content.
Returns:
A file marker string in the format:
__CREWAI_FILE__:filename:content_type:file_path
"""
content_type = response.headers.get("Content-Type", "application/octet-stream")
filename = self._extract_filename_from_headers(response.headers)
extension = self._get_file_extension(content_type, filename)
with tempfile.NamedTemporaryFile(
delete=False, suffix=extension, prefix="crewai_"
) as tmp_file:
for chunk in response.iter_content(chunk_size=8192):
tmp_file.write(chunk)
tmp_path = tmp_file.name
return f"{_FILE_MARKER_PREFIX}:{filename}:{content_type}:{tmp_path}"
def _extract_filename_from_headers(
self, headers: requests.structures.CaseInsensitiveDict
) -> str:
content_disposition = headers.get("Content-Disposition", "")
if content_disposition:
match = re.search(r'filename="?([^";\s]+)"?', content_disposition)
if match:
return match.group(1)
return "downloaded_file"
def _get_file_extension(self, content_type: str, filename: str) -> str:
if "." in filename:
return "." + filename.rsplit(".", 1)[-1]
base_content_type = content_type.split(";")[0].strip()
return _MIME_TO_EXTENSION.get(base_content_type, "")

View File

@@ -6,9 +6,6 @@ from crewai_tools.adapters.tool_collection import ToolCollection
from crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder import (
CrewaiPlatformToolBuilder,
)
from crewai_tools.tools.crewai_platform_tools.file_hook import (
register_file_processing_hook,
)
logger = logging.getLogger(__name__)
@@ -25,8 +22,6 @@ def CrewaiPlatformTools( # noqa: N802
Returns:
A list of BaseTool instances for platform actions
"""
register_file_processing_hook()
builder = CrewaiPlatformToolBuilder(apps=apps)
return builder.tools() # type: ignore

View File

@@ -1,132 +0,0 @@
"""File processing hook for CrewAI Platform Tools.
This module provides a hook that processes file markers returned by platform tools
and injects the files into the LLM context for native file handling.
"""
from __future__ import annotations
import logging
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from crewai.hooks.tool_hooks import ToolCallHookContext
logger = logging.getLogger(__name__)
_FILE_MARKER_PREFIX = "__CREWAI_FILE__"
_hook_registered = False
def process_file_markers(context: ToolCallHookContext) -> str | None:
"""Process file markers in tool results and inject files into context.
This hook detects file markers returned by platform tools (e.g., download_file)
and converts them into FileInput objects that are attached to the hook context.
The agent executor will then inject these files into the tool message for
native LLM file handling.
The marker format is:
__CREWAI_FILE__:filename:content_type:file_path
Args:
context: The tool call hook context containing the tool result.
Returns:
A human-readable message if a file was processed, None otherwise.
"""
result = context.tool_result
if not result or not result.startswith(_FILE_MARKER_PREFIX):
return None
try:
parts = result.split(":", 3)
if len(parts) < 4:
logger.warning(f"Invalid file marker format: {result[:100]}")
return None
_, filename, content_type, file_path = parts
if not os.path.isfile(file_path):
logger.error(f"File not found: {file_path}")
return f"Error: Downloaded file not found at {file_path}"
try:
from crewai_files import File
except ImportError:
logger.warning(
"crewai_files not installed. File will not be attached to LLM context."
)
return (
f"Downloaded file: {filename} ({content_type}). "
f"File saved at: {file_path}. "
"Note: Install crewai_files for native LLM file handling."
)
file = File(source=file_path, content_type=content_type, filename=filename)
context.files = {filename: file}
file_size = os.path.getsize(file_path)
size_str = _format_file_size(file_size)
return f"Downloaded file: {filename} ({content_type}, {size_str}). File is attached for LLM analysis."
except Exception as e:
logger.exception(f"Error processing file marker: {e}")
return f"Error processing downloaded file: {e}"
def _format_file_size(size_bytes: int) -> str:
"""Format file size in human-readable format.
Args:
size_bytes: Size in bytes.
Returns:
Human-readable size string.
"""
if size_bytes < 1024:
return f"{size_bytes} bytes"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.1f} MB"
else:
return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"
def register_file_processing_hook() -> bool:
"""Register the file processing hook globally.
This function should be called once during application initialization
to enable automatic file injection for platform tools.
Returns:
True if the hook was registered, False if it was already registered
or if registration failed.
"""
global _hook_registered
if _hook_registered:
logger.debug("File processing hook already registered")
return False
try:
from crewai.hooks import register_after_tool_call_hook
register_after_tool_call_hook(process_file_markers)
_hook_registered = True
logger.info("File processing hook registered successfully")
return True
except ImportError:
logger.warning(
"crewai.hooks not available. File processing hook not registered."
)
return False
except Exception as e:
logger.exception(f"Failed to register file processing hook: {e}")
return False
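For context on the deleted hook: it parsed a colon-delimited marker into four fields. A standalone illustration of that parsing (the marker values are made up):
```python
# The removed hook split markers of the form
# __CREWAI_FILE__:filename:content_type:file_path
marker = "__CREWAI_FILE__:report.pdf:application/pdf:/tmp/crewai_ab12.pdf"

prefix, filename, content_type, file_path = marker.split(":", 3)
assert prefix == "__CREWAI_FILE__"
print(filename, content_type, file_path)
```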

View File

@@ -137,7 +137,6 @@ class StagehandTool(BaseTool):
- 'observe': For finding elements in a specific area
"""
args_schema: type[BaseModel] = StagehandToolSchema
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand"])
# Stagehand configuration
api_key: str | None = None

View File

@@ -1,9 +1,7 @@
import json
from unittest.mock import patch
import pytest
from crewai_tools.tools.brave_search_tool.brave_search_tool import BraveSearchTool
import pytest
@pytest.fixture
@@ -32,43 +30,16 @@ def test_brave_tool_search(mock_get, brave_tool):
}
mock_get.return_value.json.return_value = mock_response
result = brave_tool.run(query="test")
result = brave_tool.run(search_query="test")
assert "Test Title" in result
assert "http://test.com" in result
@patch("requests.get")
def test_brave_tool(mock_get):
mock_response = {
"web": {
"results": [
{
"title": "Brave Browser",
"url": "https://brave.com",
"description": "Brave Browser description",
}
]
}
}
mock_get.return_value.json.return_value = mock_response
tool = BraveSearchTool(n_results=2)
result = tool.run(query="Brave Browser")
assert result is not None
# Parse JSON so we can examine the structure
data = json.loads(result)
assert isinstance(data, list)
assert len(data) >= 1
# First item should have expected fields: title, url, and description
first = data[0]
assert "title" in first
assert first["title"] == "Brave Browser"
assert "url" in first
assert first["url"] == "https://brave.com"
assert "description" in first
assert first["description"] == "Brave Browser description"
def test_brave_tool():
tool = BraveSearchTool(
n_results=2,
)
tool.run(search_query="ChatGPT")
if __name__ == "__main__":

View File

@@ -2,7 +2,6 @@ import unittest
from unittest.mock import Mock, patch
from crewai_tools.tools.crewai_platform_tools import CrewaiPlatformTools
from crewai_tools.tools.crewai_platform_tools import file_hook
class TestCrewaiPlatformTools(unittest.TestCase):
@@ -114,64 +113,3 @@ class TestCrewaiPlatformTools(unittest.TestCase):
with self.assertRaises(ValueError) as context:
CrewaiPlatformTools(apps=["github"])
assert "No platform integration token found" in str(context.exception)
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token"})
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder.requests.get"
)
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tools.register_file_processing_hook"
)
def test_crewai_platform_tools_registers_file_hook(
self, mock_register_hook, mock_get
):
mock_response = Mock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"actions": {"github": []}}
mock_get.return_value = mock_response
CrewaiPlatformTools(apps=["github"])
mock_register_hook.assert_called_once()
class TestFileHook(unittest.TestCase):
def setUp(self):
file_hook._hook_registered = False
def tearDown(self):
file_hook._hook_registered = False
@patch("crewai.hooks.register_after_tool_call_hook")
def test_register_hook_is_idempotent(self, mock_register):
"""Test hook registration succeeds once and is idempotent."""
assert file_hook.register_file_processing_hook() is True
assert file_hook._hook_registered is True
mock_register.assert_called_once_with(file_hook.process_file_markers)
# Second call should return False and not register again
assert file_hook.register_file_processing_hook() is False
mock_register.assert_called_once()
def test_process_file_markers_ignores_non_file_results(self):
"""Test that non-file-marker results return None."""
test_cases = [
None, # Empty result
"Regular tool output", # Non-marker
"__CREWAI_FILE__:incomplete", # Invalid format (missing parts)
]
for tool_result in test_cases:
mock_context = Mock()
mock_context.tool_result = tool_result
assert file_hook.process_file_markers(mock_context) is None
def test_format_file_size(self):
"""Test file size formatting across units."""
cases = [
(500, "500 bytes"),
(1024, "1.0 KB"),
(1536, "1.5 KB"),
(1024 * 1024, "1.0 MB"),
(1024 * 1024 * 1024, "1.0 GB"),
]
for size_bytes, expected in cases:
assert file_hook._format_file_size(size_bytes) == expected

View File

@@ -16,7 +16,6 @@ from crewai.agents.agent_adapters.openai_agents.protocols import (
)
from crewai.tools import BaseTool
from crewai.utilities.import_utils import require
from crewai.utilities.pydantic_schema_utils import force_additional_properties_false
from crewai.utilities.string_utils import sanitize_tool_name
@@ -136,9 +135,7 @@ class OpenAIAgentToolAdapter(BaseToolAdapter):
for tool in tools:
schema: dict[str, Any] = tool.args_schema.model_json_schema()
schema = force_additional_properties_false(schema)
schema.update({"type": "object"})
schema.update({"additionalProperties": False, "type": "object"})
openai_tool: OpenAIFunctionTool = cast(
OpenAIFunctionTool,

View File

@@ -930,10 +930,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
"name": func_name,
"content": result,
}
if after_hook_context.files:
tool_message["files"] = after_hook_context.files
self.messages.append(tool_message)
# Log the tool execution

View File

@@ -110,6 +110,9 @@ def create_folder_structure(name, parent_folder=None):
(folder_path / "src" / folder_name).mkdir(parents=True)
(folder_path / "src" / folder_name / "tools").mkdir(parents=True)
(folder_path / "src" / folder_name / "config").mkdir(parents=True)
(folder_path / "src" / folder_name / ".agents" / "research").mkdir(
parents=True
)
return folder_path, folder_name, class_name
@@ -154,6 +157,13 @@ def copy_template_files(folder_path, name, class_name, parent_folder):
dst_file = src_folder / file_name
copy_template(src_file, dst_file, name, class_name, folder_path.name)
# Copy Skills.md from .agents/research template
skills_src = templates_dir / ".agents" / "research" / "Skills.md"
skills_dst = src_folder / ".agents" / "research" / "Skills.md"
if skills_src.exists():
skills_dst.parent.mkdir(parents=True, exist_ok=True)
copy_template(skills_src, skills_dst, name, class_name, folder_path.name)
def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
folder_path, folder_name, class_name = create_folder_structure(name, parent_folder)

View File

@@ -54,11 +54,18 @@ class {{crew_name}}():
"""Creates the {{crew_name}} crew"""
# To learn how to add knowledge sources to your crew, check out the documentation:
# https://docs.crewai.com/concepts/knowledge#what-is-knowledge
# Skills.md files under .agents/<skill_name>/ are loaded via get_skills_knowledge_sources()
skills_sources = (
self.get_skills_knowledge_sources()
if hasattr(self, "get_skills_knowledge_sources")
else []
)
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
knowledge_sources=skills_sources,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)

View File

@@ -814,10 +814,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"name": func_name,
"content": result,
}
if after_hook_context.files:
tool_message["files"] = after_hook_context.files
self.state.messages.append(tool_message)
# Log the tool execution

View File

@@ -8,13 +8,11 @@ Example:
from crewai.flow import Flow, start, human_feedback
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow):
self.send_slack_notification(context)
raise HumanFeedbackPending(context=context)
class MyFlow(Flow):
@start()
@human_feedback(
@@ -28,13 +26,12 @@ Example:
```
"""
from crewai.flow.async_feedback.providers import ConsoleProvider
from crewai.flow.async_feedback.types import (
HumanFeedbackPending,
HumanFeedbackProvider,
PendingFeedbackContext,
)
from crewai.flow.async_feedback.providers import ConsoleProvider
__all__ = [
"ConsoleProvider",

View File

@@ -6,11 +6,10 @@ provider that collects feedback via console input.
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
from crewai.flow.async_feedback.types import PendingFeedbackContext
if TYPE_CHECKING:
from crewai.flow.flow import Flow
@@ -28,7 +27,6 @@ class ConsoleProvider:
```python
from crewai.flow.async_feedback import ConsoleProvider
# Explicitly use console provider
@human_feedback(
message="Review this:",
@@ -51,7 +49,7 @@ class ConsoleProvider:
def request_feedback(
self,
context: PendingFeedbackContext,
flow: Flow[Any],
flow: Flow,
) -> str:
"""Request feedback via console input (blocking).

View File

@@ -10,7 +10,6 @@ from dataclasses import dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
if TYPE_CHECKING:
from crewai.flow.flow import Flow
@@ -156,7 +155,7 @@ class HumanFeedbackPending(Exception): # noqa: N818 - Not an error, a control f
callback_info={
"slack_channel": "#reviews",
"thread_id": ticket_id,
},
}
)
```
"""
@@ -233,7 +232,7 @@ class HumanFeedbackProvider(Protocol):
callback_info={
"channel": self.channel,
"thread_id": thread_id,
},
}
)
```
"""
@@ -241,7 +240,7 @@ class HumanFeedbackProvider(Protocol):
def request_feedback(
self,
context: PendingFeedbackContext,
flow: Flow[Any],
flow: Flow,
) -> str:
"""Request feedback from a human.

View File

@@ -1,5 +1,4 @@
from typing import Final, Literal
AND_CONDITION: Final[Literal["AND"]] = "AND"
OR_CONDITION: Final[Literal["OR"]] = "OR"

View File

@@ -58,7 +58,6 @@ from crewai.events.types.flow_events import (
MethodExecutionStartedEvent,
)
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
@@ -1541,13 +1540,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx)
flow_token = attach(ctx)
flow_id_token = None
request_id_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.flow_id)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.flow_id)
try:
# Reset flow state for fresh execution unless restoring from persistence
is_restoring = inputs and "id" in inputs and self._persistence is not None
@@ -1725,10 +1717,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
return final_output
finally:
if request_id_token is not None:
current_flow_request_id.reset(request_id_token)
if flow_id_token is not None:
current_flow_id.reset(flow_id_token)
detach(flow_token)
async def akickoff(

View File

@@ -8,7 +8,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import HumanFeedbackProvider

View File

@@ -1,16 +0,0 @@
"""Flow execution context management.
This module provides context variables for tracking flow execution state across
async boundaries and nested function calls.
"""
import contextvars
current_flow_request_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_request_id", default=None
)
current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_id", default=None
)

View File

@@ -1,22 +1,46 @@
from pydantic import BaseModel, model_validator
import inspect
from typing import Any
from pydantic import BaseModel, Field, InstanceOf, model_validator
from typing_extensions import Self
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow import Flow
class FlowTrackable(BaseModel):
"""Mixin that tracks flow execution context for objects created within flows.
"""Mixin that tracks the Flow instance that instantiated the object, e.g. a
Flow instance that created a Crew or Agent.
When a Crew or Agent is instantiated inside a flow execution, this mixin
automatically captures the flow ID and request ID from context variables,
enabling proper tracking and association with the parent flow execution.
Automatically finds and stores a reference to the parent Flow instance by
inspecting the call stack.
"""
parent_flow: InstanceOf[Flow[Any]] | None = Field(
default=None,
description="The parent flow of the instance, if it was created inside a flow.",
)
@model_validator(mode="after")
def _set_flow_context(self) -> Self:
request_id = current_flow_request_id.get()
if request_id:
self._request_id = request_id
self._flow_id = current_flow_id.get()
def _set_parent_flow(self) -> Self:
max_depth = 8
frame = inspect.currentframe()
try:
if frame is None:
return self
frame = frame.f_back
for _ in range(max_depth):
if frame is None:
break
candidate = frame.f_locals.get("self")
if isinstance(candidate, Flow):
self.parent_flow = candidate
break
frame = frame.f_back
finally:
del frame
return self
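The effect of this stack-walking validator, as the updated tests below exercise it: any trackable object constructed inside a flow method picks up a `parent_flow` reference automatically. A minimal sketch, assuming a flow whose final output is the agent it creates:
```python
from crewai import Agent
from crewai.flow import Flow, start

class MyFlow(Flow):
    @start()
    def begin(self):
        # Constructed inside a flow method, so _set_parent_flow finds
        # this Flow instance on the call stack.
        return Agent(role="r", goal="g", backstory="b")

flow = MyFlow()
agent = flow.kickoff()
assert agent.parent_flow is flow
```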

View File

@@ -11,7 +11,6 @@ Example (synchronous, default):
```python
from crewai.flow import Flow, start, listen, human_feedback
class ReviewFlow(Flow):
@start()
@human_feedback(
@@ -33,13 +32,11 @@ Example (asynchronous with custom provider):
from crewai.flow import Flow, start, human_feedback
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow):
self.send_notification(context)
raise HumanFeedbackPending(context=context)
class ReviewFlow(Flow):
@start()
@human_feedback(
@@ -232,7 +229,6 @@ def human_feedback(
def review_document(self):
return document_content
@listen("approved")
def publish(self):
print(f"Publishing: {self.last_human_feedback.output}")
@@ -269,7 +265,7 @@ def human_feedback(
def decorator(func: F) -> F:
"""Inner decorator that wraps the function."""
def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
def _request_feedback(flow_instance: Flow, method_output: Any) -> str:
"""Request feedback using provider or default console."""
from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -295,16 +291,19 @@ def human_feedback(
effective_provider = flow_config.hitl_provider
if effective_provider is not None:
# Use provider (may raise HumanFeedbackPending for async providers)
return effective_provider.request_feedback(context, flow_instance)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
else:
# Use default console input (local development)
return flow_instance._request_human_feedback(
message=message,
output=method_output,
metadata=metadata,
emit=emit,
)
def _process_feedback(
flow_instance: Flow[Any],
flow_instance: Flow,
method_output: Any,
raw_feedback: str,
) -> HumanFeedbackResult | str:
@@ -320,14 +319,12 @@ def human_feedback(
# No default and no feedback - use first outcome
collapsed_outcome = emit[0]
elif emit:
if llm is not None:
collapsed_outcome = flow_instance._collapse_to_outcome(
feedback=raw_feedback,
outcomes=emit,
llm=llm,
)
else:
collapsed_outcome = emit[0]
# Collapse feedback to outcome using LLM
collapsed_outcome = flow_instance._collapse_to_outcome(
feedback=raw_feedback,
outcomes=emit,
llm=llm,
)
# Create result
result = HumanFeedbackResult(
@@ -352,7 +349,7 @@ def human_feedback(
if asyncio.iscoroutinefunction(func):
# Async wrapper
@wraps(func)
async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
async def async_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any:
# Execute the original method
method_output = await func(self, *args, **kwargs)
@@ -366,7 +363,7 @@ def human_feedback(
else:
# Sync wrapper
@wraps(func)
def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
def sync_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any:
# Execute the original method
method_output = func(self, *args, **kwargs)
@@ -400,10 +397,11 @@ def human_feedback(
)
wrapper.__is_flow_method__ = True
# Make it a router if emit specified
if emit:
wrapper.__is_router__ = True
wrapper.__router_paths__ = list(emit)
return wrapper # type: ignore[no-any-return]
return wrapper # type: ignore[return-value]
return decorator
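Note the behavior change: with `emit` configured, feedback is now always collapsed to an outcome via the LLM; the previous fallback to `emit[0]` when no `llm` was given is removed. A usage sketch based on the docstring example (passing the model as a string is an assumption):
```python
from crewai.flow import Flow, start, listen, human_feedback

class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Approve this draft?",
        emit=["approved", "rejected"],
        llm="gpt-4o-mini",  # free-form feedback is collapsed to one outcome
    )
    def draft(self):
        return "Draft content..."

    @listen("approved")
    def publish(self):
        print(f"Publishing: {self.last_human_feedback.output}")
```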

View File

@@ -7,7 +7,6 @@ from typing import TYPE_CHECKING, Any
from pydantic import BaseModel
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -104,3 +103,4 @@ class FlowPersistence(ABC):
Args:
flow_uuid: Unique identifier for the flow instance
"""
pass

View File

@@ -15,7 +15,6 @@ from pydantic import BaseModel
from crewai.flow.persistence.base import FlowPersistence
from crewai.utilities.paths import db_storage_path
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -177,8 +176,7 @@ class SQLiteFlowPersistence(FlowPersistence):
row = cursor.fetchone()
if row:
result = json.loads(row[0])
return result if isinstance(result, dict) else None
return json.loads(row[0])
return None
def save_pending_feedback(
@@ -198,6 +196,7 @@ class SQLiteFlowPersistence(FlowPersistence):
state_data: Current state data
"""
# Import here to avoid circular imports
from crewai.flow.async_feedback.types import PendingFeedbackContext
# Convert state_data to dict
if isinstance(state_data, BaseModel):

View File

@@ -5,7 +5,6 @@ from typing import TYPE_CHECKING, Any
from crewai.events.event_listener import event_listener
from crewai.hooks.types import AfterToolCallHookType, BeforeToolCallHookType
from crewai.utilities.printer import Printer
from crewai.utilities.types import FileInput
if TYPE_CHECKING:
@@ -35,9 +34,6 @@ class ToolCallHookContext:
crew: Crew instance (may be None)
tool_result: Tool execution result (only set for after_tool_call hooks).
Can be modified by returning a new string from after_tool_call hook.
files: Optional dictionary of files to attach to the tool message.
Can be set by after_tool_call hooks to inject files into the LLM context.
These files will be formatted according to the LLM provider's requirements.
"""
def __init__(
@@ -68,7 +64,6 @@ class ToolCallHookContext:
self.task = task
self.crew = crew
self.tool_result = tool_result
self.files: dict[str, FileInput] | None = None
def request_human_input(
self,

View File

@@ -1521,16 +1521,13 @@ class OpenAICompletion(BaseLLM):
) -> list[dict[str, Any]]:
"""Convert CrewAI tool format to OpenAI function calling format."""
from crewai.llms.providers.utils.common import safe_tool_conversion
from crewai.utilities.pydantic_schema_utils import (
force_additional_properties_false,
)
openai_tools = []
for tool in tools:
name, description, parameters = safe_tool_conversion(tool, "OpenAI")
openai_tool: dict[str, Any] = {
openai_tool = {
"type": "function",
"function": {
"name": name,
@@ -1540,11 +1537,10 @@ class OpenAICompletion(BaseLLM):
}
if parameters:
params_dict = (
parameters if isinstance(parameters, dict) else dict(parameters)
)
params_dict = force_additional_properties_false(params_dict)
openai_tool["function"]["parameters"] = params_dict
if isinstance(parameters, dict):
openai_tool["function"]["parameters"] = parameters # type: ignore
else:
openai_tool["function"]["parameters"] = dict(parameters)
openai_tools.append(openai_tool)
return openai_tools
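For reference, the per-tool payload this now produces — the parameters schema is passed through as-is rather than being forced through `force_additional_properties_false` (values illustrative):
```python
# Shape produced for one tool:
openai_tool = {
    "type": "function",
    "function": {
        "name": "brave_search",
        "description": "Search the web",
        "parameters": {
            "type": "object",
            "properties": {"search_query": {"type": "string"}},
            "required": ["search_query"],
        },
    },
}
```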

View File

@@ -178,6 +178,57 @@ def _set_mcp_params(cls: type[CrewClass]) -> None:
cls.mcp_connect_timeout = getattr(cls, "mcp_connect_timeout", 30)
def _set_skills_params(cls: type[CrewClass]) -> None:
"""Set the skills directory path for the crew class.
Args:
cls: Crew class to configure.
"""
cls.skills_directory = getattr(cls, "skills_directory", ".agents")
def get_skills_knowledge_sources(self: CrewInstance) -> list[Any]:
"""Discover Skills.md files under .agents/<skill_name>/ and return them as knowledge sources.
Looks for src/<project>/.agents/<skill_name>/Skills.md (relative to the crew class
base_directory). Each found file is wrapped in a CrewDoclingSource so the crew can
query it via RAG. Requires the docling package; if not installed, returns an empty list.
Returns:
List of knowledge sources (CrewDoclingSource instances), or empty list if
.agents is missing, has no Skills.md files, or docling is not installed.
"""
skills_dir_name = getattr(self, "skills_directory", ".agents")
skills_dir = self.base_directory / skills_dir_name
if not skills_dir.exists() or not skills_dir.is_dir():
return []
try:
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
except ImportError:
logging.warning(
"Skills.md support requires the docling package. "
"Install it with: uv add docling"
)
return []
sources: list[Any] = []
for subdir in sorted(skills_dir.iterdir()):
if not subdir.is_dir():
continue
skills_md = subdir / "Skills.md"
if skills_md.exists():
try:
source = CrewDoclingSource(file_paths=[skills_md])
sources.append(source)
except Exception as e:
logging.warning(
f"Could not create knowledge source for {skills_md}: {e}"
)
return sources
def _is_string_list(value: list[str] | list[BaseTool]) -> TypeGuard[list[str]]:
"""Type guard to check if list contains strings rather than BaseTool instances.
@@ -731,6 +782,7 @@ _CLASS_SETUP_FUNCTIONS: tuple[Callable[[type[CrewClass]], None], ...] = (
_set_base_directory,
_set_config_paths,
_set_mcp_params,
_set_skills_params,
)
_METHODS_TO_INJECT = (
@@ -739,6 +791,7 @@ _METHODS_TO_INJECT = (
_load_config,
load_configurations,
staticmethod(load_yaml),
get_skills_knowledge_sources,
map_all_agent_variables,
_map_agent_variables,
map_all_task_variables,

View File

@@ -81,8 +81,10 @@ class CrewInstance(Protocol):
tasks_config: dict[str, Any]
mcp_server_params: Any
mcp_connect_timeout: int
skills_directory: str
def load_configurations(self) -> None: ...
def get_skills_knowledge_sources(self) -> list[Any]: ...
def map_all_agent_variables(self) -> None: ...
def map_all_task_variables(self) -> None: ...
def close_mcp_server(self, instance: Self, outputs: CrewOutput) -> CrewOutput: ...
@@ -122,8 +124,10 @@ class CrewClass(Protocol):
original_tasks_config_path: str
mcp_server_params: Any
mcp_connect_timeout: int
skills_directory: str
close_mcp_server: Callable[..., Any]
get_mcp_tools: Callable[..., list[BaseTool]]
get_skills_knowledge_sources: Callable[..., list[Any]]
_load_config: Callable[..., dict[str, Any]]
load_configurations: Callable[..., None]
load_yaml: Callable[..., dict[str, Any]]

View File

@@ -148,7 +148,10 @@ def _llm_via_environment_or_fallback() -> LLM | None:
"AWS_SECRET_ACCESS_KEY",
"AWS_REGION_NAME",
]
set_provider = model_name.partition("/")[0] if "/" in model_name else "openai"
if "/" in model_name:
set_provider = model_name.partition("/")[0]
else:
set_provider = LLM._infer_provider_from_model(model_name)
if set_provider in ENV_VARS:
env_vars_for_provider = ENV_VARS[set_provider]
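The change in fallback behavior, in miniature — explicit `provider/model` names keep their prefix, while bare names now go through provider inference instead of defaulting to `openai`:
```python
# Explicit provider prefixes are honored as before...
assert "anthropic/claude-3-5-sonnet".partition("/")[0] == "anthropic"

# ...while a bare name such as "claude-3-5-sonnet" is now passed to
# LLM._infer_provider_from_model rather than assumed to be "openai".
```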

View File

@@ -127,36 +127,6 @@ def add_key_in_dict_recursively(
return d
def force_additional_properties_false(d: Any) -> Any:
"""Force additionalProperties=false on all object-type dicts recursively.
OpenAI strict mode requires all objects to have additionalProperties=false.
This function overwrites any existing value to ensure compliance.
Also ensures objects have properties and required arrays, even if empty,
as OpenAI strict mode requires these for all object types.
Args:
d: The dictionary/list to modify.
Returns:
The modified dictionary/list.
"""
if isinstance(d, dict):
if d.get("type") == "object":
d["additionalProperties"] = False
if "properties" not in d:
d["properties"] = {}
if "required" not in d:
d["required"] = []
for v in d.values():
force_additional_properties_false(v)
elif isinstance(d, list):
for i in d:
force_additional_properties_false(i)
return d
def fix_discriminator_mappings(schema: dict[str, Any]) -> dict[str, Any]:
"""Replace '#/$defs/...' references in discriminator.mapping with just the model name.
@@ -308,7 +278,13 @@ def generate_model_description(model: type[BaseModel]) -> dict[str, Any]:
"""
json_schema = model.model_json_schema(ref_template="#/$defs/{model}")
json_schema = force_additional_properties_false(json_schema)
json_schema = add_key_in_dict_recursively(
json_schema,
key="additionalProperties",
value=False,
criteria=lambda d: d.get("type") == "object"
and "additionalProperties" not in d,
)
json_schema = resolve_refs(json_schema)
@@ -402,9 +378,6 @@ def create_model_from_schema( # type: ignore[no-any-unimported]
"""
effective_root = root_schema or json_schema
json_schema = force_additional_properties_false(json_schema)
effective_root = force_additional_properties_false(effective_root)
if "allOf" in json_schema:
json_schema = _merge_all_of_schemas(json_schema["allOf"], effective_root)
if "title" not in json_schema and "title" in (root_schema or {}):

View File

@@ -299,16 +299,14 @@ class TestFlow(Flow):
return agent.kickoff("Test query")
def verify_agent_flow_context(result, agent, flow):
"""Verify that both the result and agent have the correct flow context."""
assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
def verify_agent_parent_flow(result, agent, flow):
"""Verify that both the result and agent have the correct parent flow."""
assert result.parent_flow is flow
assert agent is not None
assert agent._flow_id == flow.flow_id # type: ignore[attr-defined]
assert agent._request_id == flow.flow_id # type: ignore[attr-defined]
assert agent.parent_flow is flow
def test_sets_flow_context_when_inside_flow():
def test_sets_parent_flow_when_inside_flow():
"""Test that an Agent can be created and executed inside a Flow context."""
captured_event = None

View File

@@ -4520,7 +4520,7 @@ def test_crew_copy_with_memory():
pytest.fail(f"Copying crew raised an unexpected exception: {e}")
def test_sets_flow_context_when_using_crewbase_pattern_inside_flow():
def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow():
@CrewBase
class TestCrew:
agents_config = None
@@ -4582,11 +4582,10 @@ def test_sets_flow_context_when_using_crewbase_pattern_inside_flow():
flow.kickoff()
assert captured_crew is not None
assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined]
assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined]
assert captured_crew.parent_flow is flow
def test_sets_flow_context_when_outside_flow(researcher, writer):
def test_sets_parent_flow_when_outside_flow(researcher, writer):
crew = Crew(
agents=[researcher, writer],
process=Process.sequential,
@@ -4595,12 +4594,11 @@ def test_sets_flow_context_when_outside_flow(researcher, writer):
Task(description="Task 2", expected_output="output", agent=writer),
],
)
assert not hasattr(crew, "_flow_id")
assert not hasattr(crew, "_request_id")
assert crew.parent_flow is None
@pytest.mark.vcr()
def test_sets_flow_context_when_inside_flow(researcher, writer):
def test_sets_parent_flow_when_inside_flow(researcher, writer):
class MyFlow(Flow):
@start()
def start(self):
@@ -4617,8 +4615,7 @@ def test_sets_flow_context_when_inside_flow(researcher, writer):
flow = MyFlow()
result = flow.kickoff()
assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
assert result.parent_flow is flow
def test_reset_knowledge_with_no_crew_knowledge(researcher, writer):

View File

@@ -1,3 +1,5 @@
import tempfile
from pathlib import Path
from typing import Any, ClassVar
from unittest.mock import Mock, patch
@@ -382,3 +384,93 @@ def test_internal_crew_with_mcp():
adapter_mock.assert_called_once_with(
{"host": "localhost", "port": 8000}, connect_timeout=120
)
def test_get_skills_knowledge_sources_discovery():
"""get_skills_knowledge_sources discovers .agents/<skill_name>/Skills.md and returns sources."""
@CrewBase
class SkillsCrew:
agents_config = "nonexistent/agents.yaml"
tasks_config = "nonexistent/tasks.yaml"
agents: list[BaseAgent]
tasks: list[Task]
@agent
def researcher(self):
return Agent(
role="Researcher",
goal="Research",
backstory="Expert researcher",
)
@task
def research_task(self):
return Task(
description="Research", expected_output="Report", agent=self.researcher()
)
@crew
def crew(self):
return Crew(agents=self.agents, tasks=self.tasks, verbose=True)
with tempfile.TemporaryDirectory() as tmp:
base = Path(tmp)
(base / ".agents" / "skill_a").mkdir(parents=True)
(base / ".agents" / "skill_b").mkdir(parents=True)
(base / ".agents" / "skill_a" / "Skills.md").write_text("# Skill A")
(base / ".agents" / "skill_b" / "Skills.md").write_text("# Skill B")
crew_instance = SkillsCrew()
crew_instance.base_directory = base
sources = crew_instance.get_skills_knowledge_sources()
# With docling installed we get 2 sources; without it we get []
if len(sources) == 2:
paths = []
for s in sources:
paths.extend(getattr(s, "file_paths", []) or getattr(s, "safe_file_paths", []))
path_strs = {str(Path(p).resolve()) for p in paths}
expected_a = str((base / ".agents" / "skill_a" / "Skills.md").resolve())
expected_b = str((base / ".agents" / "skill_b" / "Skills.md").resolve())
assert expected_a in path_strs
assert expected_b in path_strs
else:
assert len(sources) == 0
def test_get_skills_knowledge_sources_missing_dir_returns_empty():
"""get_skills_knowledge_sources returns [] when .agents does not exist."""
@CrewBase
class NoSkillsCrew:
agents_config = "nonexistent/agents.yaml"
tasks_config = "nonexistent/tasks.yaml"
agents: list[BaseAgent]
tasks: list[Task]
@agent
def researcher(self):
return Agent(
role="Researcher",
goal="Research",
backstory="Expert researcher",
)
@task
def research_task(self):
return Task(
description="Research", expected_output="Report", agent=self.researcher()
)
@crew
def crew(self):
return Crew(agents=self.agents, tasks=self.tasks, verbose=True)
with tempfile.TemporaryDirectory() as tmp:
base = Path(tmp)
crew_instance = NoSkillsCrew()
crew_instance.base_directory = base
sources = crew_instance.get_skills_knowledge_sources()
assert sources == []

View File

@@ -149,6 +149,7 @@ members = [
"lib/crewai-tools",
"lib/devtools",
"lib/crewai-files",
"testing_agents",
]