Compare commits

...

5 Commits

Author SHA1 Message Date
Greyson LaLonde
4b2d5633c1 chore: add commitizen to pre-commit hooks 2025-07-09 09:35:02 -04:00
Lucas Gomide
f071966951 docs: add docs about Agent.kickoff usage (#3121)
Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-07-08 16:15:40 -04:00
Lucas Gomide
318310bb7a docs: add docs about Agent repository (#3122) 2025-07-08 15:56:08 -04:00
Greyson LaLonde
34a03f882c feat: add crew context tracking for LLM guardrail events (#3111)
Add crew context tracking using OpenTelemetry baggage for thread-safe propagation. Context is set during kickoff and cleaned up in finally block. Added thread safety tests with mocked agent execution.
2025-07-07 16:33:07 -04:00
Greyson LaLonde
a0fcc0c8d1 Speed up GitHub Actions tests with parallelization (#3107)
- Add pytest-xdist and pytest-split to dev dependencies for parallel test execution
- Split tests into 8 parallel groups per Python version for better distribution
- Enable CPU-level parallelization with -n auto to maximize resource usage
- Add fail-fast strategy and maxfail=3 to stop early on failures
- Add job name to match branch protection rules
- Reduce test timeout from default to 30s for faster failure detection
- Remove redundant cache configuration
2025-07-03 21:08:00 -04:00
14 changed files with 651 additions and 13 deletions

View File

@@ -7,14 +7,18 @@ permissions:
env:
  OPENAI_API_KEY: fake-api-key
  PYTHONUNBUFFERED: 1

jobs:
  tests:
    name: tests (${{ matrix.python-version }})
    runs-on: ubuntu-latest
    timeout-minutes: 15
    strategy:
      fail-fast: true
      matrix:
        python-version: ['3.10', '3.11', '3.12', '3.13']
        group: [1, 2, 3, 4, 5, 6, 7, 8]
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
@@ -23,6 +27,9 @@ jobs:
        uses: astral-sh/setup-uv@v3
        with:
          enable-cache: true
          cache-dependency-glob: |
            **/pyproject.toml
            **/uv.lock
      - name: Set up Python ${{ matrix.python-version }}
        run: uv python install ${{ matrix.python-version }}
@@ -30,5 +37,14 @@ jobs:
      - name: Install the project
        run: uv sync --dev --all-extras
      - name: Run tests
        run: uv run pytest --block-network --timeout=60 -vv
      - name: Run tests (group ${{ matrix.group }} of 8)
        run: |
          uv run pytest \
            --block-network \
            --timeout=30 \
            -vv \
            --splits 8 \
            --group ${{ matrix.group }} \
            --durations=10 \
            -n auto \
            --maxfail=3
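
For reference, the new test step can be reproduced locally. A sketch, assuming the dev dependencies added in pyproject.toml below are installed (pytest-xdist provides `-n auto`, pytest-split provides `--splits`/`--group`); any group number from 1 to 8 works:

```bash
# Run one of the 8 test groups locally, mirroring a single CI matrix entry
uv run pytest --block-network --timeout=30 -vv --splits 8 --group 3 -n auto --maxfail=3
```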

View File

@@ -5,3 +5,7 @@ repos:
  - id: ruff
    args: ["--fix"]
  - id: ruff-format
- repo: https://github.com/commitizen-tools/commitizen
  rev: v3.13.0
  hooks:
  - id: commitizen

View File

@@ -94,7 +94,7 @@
"pages": [
"en/guides/advanced/customizing-prompts",
"en/guides/advanced/fingerprinting"
]
}
]
@@ -296,7 +296,8 @@
"en/enterprise/features/webhook-streaming",
"en/enterprise/features/traces",
"en/enterprise/features/hallucination-guardrail",
"en/enterprise/features/integrations"
"en/enterprise/features/integrations",
"en/enterprise/features/agent-repositories"
]
},
{
@@ -373,7 +374,7 @@
}
]
}
]
},
{
@@ -730,7 +731,7 @@
}
]
}
]
}
]
@@ -774,7 +775,7 @@
"destination": "/en/introduction"
},
{
"source": "/installation",
"source": "/installation",
"destination": "/en/installation"
},
{

View File

@@ -526,6 +526,103 @@ agent = Agent(
The context window management feature works automatically in the background. You don't need to call any special functions - just set `respect_context_window` to your preferred behavior and CrewAI handles the rest!
</Note>
## Direct Agent Interaction with `kickoff()`
Agents can be used directly, without going through a task or crew workflow, by calling the `kickoff()` method. This provides a simpler way to interact with an agent when you don't need full crew orchestration.
### How `kickoff()` Works
The `kickoff()` method allows you to send messages directly to an agent and get a response, similar to how you would interact with an LLM but with all the agent's capabilities (tools, reasoning, etc.).
```python Code
from crewai import Agent
from crewai_tools import SerperDevTool

# Create an agent
researcher = Agent(
    role="AI Technology Researcher",
    goal="Research the latest AI developments",
    tools=[SerperDevTool()],
    verbose=True
)

# Use kickoff() to interact directly with the agent
result = researcher.kickoff("What are the latest developments in language models?")

# Access the raw response
print(result.raw)
```
### Parameters and Return Values
| Parameter | Type | Description |
| :---------------- | :---------------------------------- | :------------------------------------------------------------------------ |
| `messages` | `Union[str, List[Dict[str, str]]]` | Either a string query or a list of message dictionaries with role/content |
| `response_format` | `Optional[Type[Any]]` | Optional Pydantic model for structured output |
The method returns a `LiteAgentOutput` object with the following properties:
- `raw`: String containing the raw output text
- `pydantic`: Parsed Pydantic model (if a `response_format` was provided)
- `agent_role`: Role of the agent that produced the output
- `usage_metrics`: Token usage metrics for the execution
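A quick sketch of inspecting these properties on the `result` from the example above (the exact shape of `usage_metrics` is not documented here and may vary):

```python Code
# Inspect the LiteAgentOutput returned by kickoff()
print(result.raw)            # raw output text
print(result.agent_role)     # e.g. "AI Technology Researcher"
print(result.usage_metrics)  # token usage metrics for the execution
print(result.pydantic)       # None unless a response_format was provided
```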
### Structured Output
You can get structured output by providing a Pydantic model as the `response_format`:
```python Code
from pydantic import BaseModel
from typing import List

class ResearchFindings(BaseModel):
    main_points: List[str]
    key_technologies: List[str]
    future_predictions: str

# Get structured output
result = researcher.kickoff(
    "Summarize the latest developments in AI for 2025",
    response_format=ResearchFindings
)

# Access structured data
print(result.pydantic.main_points)
print(result.pydantic.future_predictions)
```
### Multiple Messages
You can also provide a conversation history as a list of message dictionaries:
```python Code
messages = [
    {"role": "user", "content": "I need information about large language models"},
    {"role": "assistant", "content": "I'd be happy to help with that! What specifically would you like to know?"},
    {"role": "user", "content": "What are the latest developments in 2025?"}
]

result = researcher.kickoff(messages)
```
### Async Support
An asynchronous version is available via `kickoff_async()` with the same parameters:
```python Code
import asyncio

async def main():
    result = await researcher.kickoff_async("What are the latest developments in AI?")
    print(result.raw)

asyncio.run(main())
```
<Note>
The `kickoff()` method uses a `LiteAgent` internally, which provides a simpler execution flow while preserving all of the agent's configuration (role, goal, backstory, tools, etc.).
</Note>
## Important Considerations and Best Practices
### Security and Code Execution

View File

@@ -0,0 +1,155 @@
---
title: 'Agent Repositories'
description: 'Learn how to use Agent Repositories to share and reuse your agents across teams and projects'
icon: 'database'
---
Agent Repositories allow enterprise users to store, share, and reuse agent definitions across teams and projects. This feature enables organizations to maintain a centralized library of standardized agents, promoting consistency and reducing duplication of effort.
## Benefits of Agent Repositories
- **Standardization**: Maintain consistent agent definitions across your organization
- **Reusability**: Create an agent once and use it in multiple crews and projects
- **Governance**: Implement organization-wide policies for agent configurations
- **Collaboration**: Enable teams to share and build upon each other's work
## Using Agent Repositories
### Prerequisites
1. You must have a CrewAI account; you can start with the [free plan](https://app.crewai.com).
2. You need to be authenticated using the CrewAI CLI.
3. If you belong to more than one organization, make sure you have switched to the correct one using the CLI command:
```bash
crewai org switch <org_id>
```
### Creating and Managing Agents in Repositories
To create and manage agents in repositories, use the Enterprise Dashboard.
### Loading Agents from Repositories
You can load agents from repositories in your code using the `from_repository` parameter:
```python
from crewai import Agent

# Create an agent by loading it from a repository
# The agent is loaded with all its predefined configurations
researcher = Agent(
    from_repository="market-research-agent"
)
```
### Overriding Repository Settings
You can override specific settings from the repository by providing them in the configuration:
```python
researcher = Agent(
    from_repository="market-research-agent",
    goal="Research the latest trends in AI development",  # Override the repository goal
    verbose=True  # Add a setting not in the repository
)
```
### Example: Creating a Crew with Repository Agents
```python
from crewai import Crew, Agent, Task

# Load agents from repositories
researcher = Agent(
    from_repository="market-research-agent"
)

writer = Agent(
    from_repository="content-writer-agent"
)

# Create tasks
research_task = Task(
    description="Research the latest trends in AI",
    agent=researcher
)

writing_task = Task(
    description="Write a comprehensive report based on the research",
    agent=writer
)

# Create the crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    verbose=True
)

# Run the crew
result = crew.kickoff()
```
### Example: Using `kickoff()` with Repository Agents
You can also use repository agents directly with the `kickoff()` method for simpler interactions:
```python
from crewai import Agent
from pydantic import BaseModel
from typing import List

# Define a structured output format
class MarketAnalysis(BaseModel):
    key_trends: List[str]
    opportunities: List[str]
    recommendation: str

# Load an agent from repository
analyst = Agent(
    from_repository="market-analyst-agent",
    verbose=True
)

# Get a free-form response
result = analyst.kickoff("Analyze the AI market in 2025")
print(result.raw)  # Access the raw response

# Get structured output
structured_result = analyst.kickoff(
    "Provide a structured analysis of the AI market in 2025",
    response_format=MarketAnalysis
)

# Access structured data
print(f"Key Trends: {structured_result.pydantic.key_trends}")
print(f"Recommendation: {structured_result.pydantic.recommendation}")
```
## Best Practices
1. **Naming Convention**: Use clear, descriptive names for your repository agents
2. **Documentation**: Include comprehensive descriptions for each agent
3. **Tool Management**: Ensure that tools referenced by repository agents are available in your environment (see the sketch after this list)
4. **Access Control**: Manage permissions to ensure only authorized team members can modify repository agents
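For the tool-management point above, here is a minimal, hypothetical sketch. It assumes the repository agent references `SerperDevTool`, and that tools, like the other settings shown earlier, can be supplied when loading the agent:

```python
from crewai import Agent
from crewai_tools import SerperDevTool  # assumption: the tool the repository agent expects

# Hypothetical sketch: provide the tool implementation locally so the
# repository agent can use it in this environment.
researcher = Agent(
    from_repository="market-research-agent",
    tools=[SerperDevTool()],  # supplied as an override, like other settings
)
```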
## Organization Management
To switch between organizations or see your current organization, use the CrewAI CLI:
```bash
# View current organization
crewai org current
# Switch to a different organization
crewai org switch <org_id>
# List all available organizations
crewai org list
```
<Note>
When loading agents from repositories, you must be authenticated and switched to the correct organization. If you receive errors, check your authentication status and organization settings using the CLI commands above.
</Note>

View File

@@ -83,6 +83,8 @@ dev-dependencies = [
"pytest-recording>=0.13.2",
"pytest-randomly>=3.16.0",
"pytest-timeout>=2.3.1",
"pytest-xdist>=3.6.1",
"pytest-split>=0.9.0",
]
[project.scripts]

View File

@@ -18,6 +18,11 @@ from typing import (
    cast,
)

from opentelemetry import baggage
from opentelemetry.context import attach, detach

from crewai.utilities.crew.models import CrewContext
from pydantic import (
    UUID4,
    BaseModel,
@@ -616,6 +621,11 @@ class Crew(FlowTrackable, BaseModel):
        self,
        inputs: Optional[Dict[str, Any]] = None,
    ) -> CrewOutput:
        ctx = baggage.set_baggage(
            "crew_context", CrewContext(id=str(self.id), key=self.key)
        )
        token = attach(ctx)
        try:
            for before_callback in self.before_kickoff_callbacks:
                if inputs is None:
@@ -676,6 +686,8 @@ class Crew(FlowTrackable, BaseModel):
                CrewKickoffFailedEvent(error=str(e), crew_name=self.name or "crew"),
            )
            raise
        finally:
            detach(token)

    def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
        """Executes the Crew's workflow for each input in the list and aggregates results."""

View File

@@ -0,0 +1 @@
"""Crew-specific utilities."""

View File

@@ -0,0 +1,16 @@
"""Context management utilities for tracking crew and task execution context using OpenTelemetry baggage."""
from typing import Optional
from opentelemetry import baggage
from crewai.utilities.crew.models import CrewContext
def get_crew_context() -> Optional[CrewContext]:
"""Get the current crew context from OpenTelemetry baggage.
Returns:
CrewContext instance containing crew context information, or None if no context is set
"""
return baggage.get_baggage("crew_context")
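
A minimal usage sketch of this helper (a hypothetical call site; any code running while a crew kickoff is active on the same execution context, such as a task callback, can read it):

```python
from crewai.utilities.crew.crew_context import get_crew_context

# Identify the crew that is currently executing, if any
ctx = get_crew_context()
if ctx is not None:
    print(f"Running inside crew {ctx.key} ({ctx.id})")
else:
    print("No crew kickoff is active on this context")
```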

View File

@@ -0,0 +1,16 @@
"""Models for crew-related data structures."""
from typing import Optional
from pydantic import BaseModel, Field
class CrewContext(BaseModel):
"""Model representing crew context information."""
id: Optional[str] = Field(
default=None, description="Unique identifier for the crew"
)
key: Optional[str] = Field(
default=None, description="Optional crew key/name for identification"
)

View File

@@ -1,3 +1,4 @@
from inspect import getsource
from typing import Any, Callable, Optional, Union
from crewai.utilities.events.base_events import BaseEvent
@@ -16,23 +17,26 @@ class LLMGuardrailStartedEvent(BaseEvent):
    retry_count: int

    def __init__(self, **data):
        from inspect import getsource
        from crewai.tasks.llm_guardrail import LLMGuardrail
        from crewai.tasks.hallucination_guardrail import HallucinationGuardrail

        super().__init__(**data)

        if isinstance(self.guardrail, LLMGuardrail) or isinstance(
            self.guardrail, HallucinationGuardrail
        ):
        if isinstance(self.guardrail, (LLMGuardrail, HallucinationGuardrail)):
            self.guardrail = self.guardrail.description.strip()
        elif isinstance(self.guardrail, Callable):
            self.guardrail = getsource(self.guardrail).strip()


class LLMGuardrailCompletedEvent(BaseEvent):
    """Event emitted when a guardrail task completes"""
    """Event emitted when a guardrail task completes

    Attributes:
        success: Whether the guardrail validation passed
        result: The validation result
        error: Error message if validation failed
        retry_count: The number of times the guardrail has been retried
    """

    type: str = "llm_guardrail_completed"

    success: bool

View File

@@ -0,0 +1,226 @@
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Any, Callable
from unittest.mock import patch

import pytest

from crewai import Agent, Crew, Task
from crewai.utilities.crew.crew_context import get_crew_context


@pytest.fixture
def simple_agent_factory():
    def create_agent(name: str) -> Agent:
        return Agent(
            role=f"{name} Agent",
            goal=f"Complete {name} task",
            backstory=f"I am agent for {name}",
        )

    return create_agent


@pytest.fixture
def simple_task_factory():
    def create_task(name: str, callback: Callable = None) -> Task:
        return Task(
            description=f"Task for {name}", expected_output="Done", callback=callback
        )

    return create_task


@pytest.fixture
def crew_factory(simple_agent_factory, simple_task_factory):
    def create_crew(name: str, task_callback: Callable = None) -> Crew:
        agent = simple_agent_factory(name)
        task = simple_task_factory(name, callback=task_callback)
        task.agent = agent
        return Crew(agents=[agent], tasks=[task], verbose=False)

    return create_crew


class TestCrewThreadSafety:
    @patch("crewai.Agent.execute_task")
    def test_parallel_crews_thread_safety(self, mock_execute_task, crew_factory):
        mock_execute_task.return_value = "Task completed"
        num_crews = 5

        def run_crew_with_context_check(crew_id: str) -> Dict[str, Any]:
            results = {"crew_id": crew_id, "contexts": []}

            def check_context_task(output):
                context = get_crew_context()
                results["contexts"].append(
                    {
                        "stage": "task_callback",
                        "crew_id": context.id if context else None,
                        "crew_key": context.key if context else None,
                        "thread": threading.current_thread().name,
                    }
                )
                return output

            context_before = get_crew_context()
            results["contexts"].append(
                {
                    "stage": "before_kickoff",
                    "crew_id": context_before.id if context_before else None,
                    "thread": threading.current_thread().name,
                }
            )

            crew = crew_factory(crew_id, task_callback=check_context_task)
            output = crew.kickoff()

            context_after = get_crew_context()
            results["contexts"].append(
                {
                    "stage": "after_kickoff",
                    "crew_id": context_after.id if context_after else None,
                    "thread": threading.current_thread().name,
                }
            )

            results["crew_uuid"] = str(crew.id)
            results["output"] = output.raw
            return results

        with ThreadPoolExecutor(max_workers=num_crews) as executor:
            futures = []
            for i in range(num_crews):
                future = executor.submit(run_crew_with_context_check, f"crew_{i}")
                futures.append(future)
            results = [f.result() for f in futures]

        for result in results:
            crew_uuid = result["crew_uuid"]

            before_ctx = next(
                ctx for ctx in result["contexts"] if ctx["stage"] == "before_kickoff"
            )
            assert (
                before_ctx["crew_id"] is None
            ), f"Context should be None before kickoff for {result['crew_id']}"

            task_ctx = next(
                ctx for ctx in result["contexts"] if ctx["stage"] == "task_callback"
            )
            assert (
                task_ctx["crew_id"] == crew_uuid
            ), f"Context mismatch during task for {result['crew_id']}"

            after_ctx = next(
                ctx for ctx in result["contexts"] if ctx["stage"] == "after_kickoff"
            )
            assert (
                after_ctx["crew_id"] is None
            ), f"Context should be None after kickoff for {result['crew_id']}"

            thread_name = before_ctx["thread"]
            assert (
                "ThreadPoolExecutor" in thread_name
            ), f"Should run in thread pool for {result['crew_id']}"

    @pytest.mark.asyncio
    @patch("crewai.Agent.execute_task")
    async def test_async_crews_thread_safety(self, mock_execute_task, crew_factory):
        mock_execute_task.return_value = "Task completed"
        num_crews = 5

        async def run_crew_async(crew_id: str) -> Dict[str, Any]:
            task_context = {"crew_id": crew_id, "context": None}

            def capture_context(output):
                ctx = get_crew_context()
                task_context["context"] = {
                    "crew_id": ctx.id if ctx else None,
                    "crew_key": ctx.key if ctx else None,
                }
                return output

            crew = crew_factory(crew_id, task_callback=capture_context)
            output = await crew.kickoff_async()

            return {
                "crew_id": crew_id,
                "crew_uuid": str(crew.id),
                "output": output.raw,
                "task_context": task_context,
            }

        tasks = [run_crew_async(f"async_crew_{i}") for i in range(num_crews)]
        results = await asyncio.gather(*tasks)

        for result in results:
            crew_uuid = result["crew_uuid"]
            task_ctx = result["task_context"]["context"]
            assert (
                task_ctx is not None
            ), f"Context should exist during task for {result['crew_id']}"
            assert (
                task_ctx["crew_id"] == crew_uuid
            ), f"Context mismatch for {result['crew_id']}"

    @patch("crewai.Agent.execute_task")
    def test_concurrent_kickoff_for_each(self, mock_execute_task, crew_factory):
        mock_execute_task.return_value = "Task completed"
        contexts_captured = []

        def capture_context(output):
            ctx = get_crew_context()
            contexts_captured.append(
                {
                    "context_id": ctx.id if ctx else None,
                    "thread": threading.current_thread().name,
                }
            )
            return output

        crew = crew_factory("for_each_test", task_callback=capture_context)
        inputs = [{"item": f"input_{i}"} for i in range(3)]
        results = crew.kickoff_for_each(inputs=inputs)

        assert len(results) == len(inputs)
        assert len(contexts_captured) == len(inputs)

        context_ids = [ctx["context_id"] for ctx in contexts_captured]
        assert len(set(context_ids)) == len(
            inputs
        ), "Each execution should have unique context"

    @patch("crewai.Agent.execute_task")
    def test_no_context_leakage_between_crews(self, mock_execute_task, crew_factory):
        mock_execute_task.return_value = "Task completed"
        contexts = []

        def check_context(output):
            ctx = get_crew_context()
            contexts.append(
                {
                    "context_id": ctx.id if ctx else None,
                    "context_key": ctx.key if ctx else None,
                }
            )
            return output

        def run_crew(name: str):
            crew = crew_factory(name, task_callback=check_context)
            crew.kickoff()
            return str(crew.id)

        crew1_id = run_crew("First")
        crew2_id = run_crew("Second")

        assert len(contexts) == 2
        assert contexts[0]["context_id"] == crew1_id
        assert contexts[1]["context_id"] == crew2_id
        assert contexts[0]["context_id"] != contexts[1]["context_id"]

View File

View File

@@ -0,0 +1,88 @@
import uuid

import pytest
from opentelemetry import baggage
from opentelemetry.context import attach, detach

from crewai.utilities.crew.crew_context import get_crew_context
from crewai.utilities.crew.models import CrewContext


def test_crew_context_creation():
    crew_id = str(uuid.uuid4())
    context = CrewContext(id=crew_id, key="test-crew")
    assert context.id == crew_id
    assert context.key == "test-crew"


def test_get_crew_context_with_baggage():
    crew_id = str(uuid.uuid4())
    assert get_crew_context() is None

    crew_ctx = CrewContext(id=crew_id, key="test-key")
    ctx = baggage.set_baggage("crew_context", crew_ctx)
    token = attach(ctx)

    try:
        context = get_crew_context()
        assert context is not None
        assert context.id == crew_id
        assert context.key == "test-key"
    finally:
        detach(token)

    assert get_crew_context() is None


def test_get_crew_context_empty():
    assert get_crew_context() is None


def test_baggage_nested_contexts():
    crew_id1 = str(uuid.uuid4())
    crew_id2 = str(uuid.uuid4())

    crew_ctx1 = CrewContext(id=crew_id1, key="outer")
    ctx1 = baggage.set_baggage("crew_context", crew_ctx1)
    token1 = attach(ctx1)

    try:
        outer_context = get_crew_context()
        assert outer_context.id == crew_id1
        assert outer_context.key == "outer"

        crew_ctx2 = CrewContext(id=crew_id2, key="inner")
        ctx2 = baggage.set_baggage("crew_context", crew_ctx2)
        token2 = attach(ctx2)

        try:
            inner_context = get_crew_context()
            assert inner_context.id == crew_id2
            assert inner_context.key == "inner"
        finally:
            detach(token2)

        restored_context = get_crew_context()
        assert restored_context.id == crew_id1
        assert restored_context.key == "outer"
    finally:
        detach(token1)

    assert get_crew_context() is None


def test_baggage_exception_handling():
    crew_id = str(uuid.uuid4())
    crew_ctx = CrewContext(id=crew_id, key="test")
    ctx = baggage.set_baggage("crew_context", crew_ctx)
    token = attach(ctx)

    with pytest.raises(ValueError):
        try:
            assert get_crew_context() is not None
            raise ValueError("Test exception")
        finally:
            detach(token)

    assert get_crew_context() is None