Compare commits

..

16 Commits

Author SHA1 Message Date
Devin AI
ce13217f0c fix: Add final type casting for remaining type-checker issues
- Add cast import and type casting in tool_utils.py for ToolUsage constructor
- Add cast import and type casting in lite_agent.py for execute_tool_and_check_finality call

These changes complete the type-checker fixes for hosted tools implementation.

Co-Authored-By: João <joao@crewai.com>
2025-08-18 14:19:38 +00:00
Devin AI
e5e6b11909 fix: Resolve final type-checker errors for hosted tools implementation
- Add cast imports and proper type casting in agent.py
- Fix Liskov substitution principle violations in langgraph_adapter.py
- Add type ignore for lite_agent.py tool parameter mismatch
- Filter tools properly in tool_utils.py for ToolUsage constructor

All type-checker issues verified locally before pushing.

Co-Authored-By: João <joao@crewai.com>
2025-08-18 14:17:24 +00:00
Devin AI
5495cccf2f fix: Resolve remaining type-checker and lint issues
- Fix ToolUsage constructor to only accept executable tools
- Remove unused imports from langgraph adapter
- Filter tools correctly for mixed type handling

Co-Authored-By: João <joao@crewai.com>
2025-08-18 14:07:07 +00:00
Devin AI
d8eaadca47 fix: Complete hosted tools implementation with remaining changes
- Finalize base_agent.py and agent_utils.py updates
- Include comprehensive test suite for hosted tools
- Update lock file with dependencies

Co-Authored-By: João <joao@crewai.com>
2025-08-18 14:03:52 +00:00
Devin AI
be09b519c4 fix: Resolve type-checker errors for mixed tool types
- Add Union types to handle both BaseTool and dict in tool parameters
- Update tool utility functions to handle mixed types safely
- Filter raw tool definitions in agent adapters
- Fix experimental evaluation metrics to handle mixed tool types
- Maintain backward compatibility while adding hosted tools support

Co-Authored-By: João <joao@crewai.com>
2025-08-18 14:03:40 +00:00
Devin AI
95d3b5dbc3 feat: Add support for hosted/server-side tools in agents
- Allow agents to accept both CrewAI tools and raw tool definitions (dicts)
- Raw tool definitions are passed through unchanged to LLM providers
- Maintain backward compatibility with existing CrewAI tools
- Update BaseAgent.validate_tools() to handle dict tool definitions
- Update parse_tools() to preserve raw tool definitions
- Update utility functions to handle mixed tool types
- Add comprehensive tests for new functionality

Fixes #3338

Co-Authored-By: João <joao@crewai.com>
2025-08-18 13:56:38 +00:00
Greyson LaLonde
947c9552f0 chore: remove AgentOps integration (#3334)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2025-08-17 23:07:41 -04:00
Lorenze Jay
04a03d332f Lorenze/emphemeral tracing (#3323)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* for ephemeral traces

* default false

* simpler and consolidated

* keep raising exception but catch it and continue if its for trace batches

* cleanup

* more cleanup

* not using logger

* refactor: rename TEMP_TRACING_RESOURCE to EPHEMERAL_TRACING_RESOURCE for clarity and consistency in PlusAPI; update related method calls accordingly

* default true

* drop print
2025-08-15 13:37:16 -07:00
Vidit Ostwal
992e093610 Update Docs: Added Mem0 integration with Short Term and Entity Memory (#3293)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* Added Mem0 integration with Short Term and Entity Memory

* Flaky test case of telemetry
2025-08-14 22:50:24 -04:00
Lucas Gomide
07f8e73958 feat: include exchanged agent messages into ExternalMemory metadata (#3290)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
2025-08-14 09:41:09 -04:00
Lorenze Jay
66c2fa1623 chore: update crewAI and tools dependencies to version 0.159.0 and 0.62.0 respectively (#3318)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
- Bump crewAI version from 0.157.0 to 0.159.0
- Update tools dependency from 0.60.0 to 0.62.0 in pyproject.toml and uv.lock
- Ensure compatibility with the latest features and improvements in the tools package
2025-08-13 16:52:58 -07:00
Greyson LaLonde
7a52cc9667 fix: comment out listener resumability check (#3316) 2025-08-13 19:04:16 -04:00
Greyson LaLonde
8b686fb0c6 feat: add flow resumability support (#3312)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
- Add reload() method to restore flow state from execution data
- Add FlowExecutionData type definitions
- Track completed methods for proper flow resumption
- Support OpenTelemetry baggage context for flow inputs
2025-08-13 13:45:08 -04:00
Tony Kipkemboi
dc6771ae95 docs: Fix API Reference, add RBAC, revamp Examples/Cookbooks (EN/PT-BR/KO) (#3314)
* docs: add RBAC docs and other chores

* docs: fix API Reference rendering; per-locale OpenAPI; add Enterprise RBAC; restructure Examples (EN/PT-BR/KO) + Cookbooks; update nav and links

* docs(i18n): add RBAC docs for pt-BR and ko; update Enterprise Features nav
2025-08-13 13:13:24 -04:00
Tony Kipkemboi
e9b1e5a8f6 docs: add RBAC docs and other chores (#3313) 2025-08-13 12:08:42 -04:00
rishiraj
57c787f919 Docs update/add truefoundry (#3245)
* Add TrueFoundry observability integration documentation

- Added comprehensive TrueFoundry integration guide for CrewAI
- Included AI Gateway overview with key features
- Added technical architecture details for Traceloop SDK integration
- Provided step-by-step setup instructions
- Added advanced configuration examples
- Included tracing dashboard screenshot
- Added support contact and documentation links

* Update TrueFoundry integration documentation

Major improvements and fixes:
- Fixed integration pattern to follow LLM provider approach (base_url + api_key)
- Added technical architecture details showing LLM provider and observability flows
- Updated model names to use correct TrueFoundry format (openai-main/gpt-4o, anthropic/claude-3.5-sonnet)
- Added unified-code-tfy.png image for visual code example
- Reorganized document structure with better section placement
- Moved Additional Tracing section to better position
- Added link to TrueFoundry quick start guide
- Added comprehensive observability details and dashboard explanation
- Removed complex tracing setup in favor of simpler LLM provider integration

* Finalize TrueFoundry integration documentation

Key improvements:
- Updated base_url references to use placeholder from code snippet
- Added gateway-metrics.png image for observability dashboard
- Formatted metrics description with proper bullet points and bold headers
- Added link to TrueFoundry tracing overview documentation
- Improved readability and consistency throughout the documentation
- Updated Portuguese translation (pt-BR) version

* added truefoundry.mdx

* updated tfy mdx

* Update docs/en/observability/truefoundry.mdx

Co-authored-by: Nikhil Popli <97437109+nikp1172@users.noreply.github.com>

* Update truefoundry.mdx

* Update truefoundry.mdx

-minor updates

* Update truefoundry.mdx

* updated truefoundry.mdx PT-BR

---------

Co-authored-by: Nikhil Popli <97437109+nikp1172@users.noreply.github.com>
2025-08-13 10:08:15 -04:00
80 changed files with 6341 additions and 6152 deletions

1
.gitignore vendored
View File

@@ -21,7 +21,6 @@ crew_tasks_output.json
.mypy_cache
.ruff_cache
.venv
agentops.log
test_flow.html
crewairules.mdc
plan.md

View File

@@ -226,7 +226,6 @@
"group": "Observability",
"pages": [
"en/observability/overview",
"en/observability/agentops",
"en/observability/arize-phoenix",
"en/observability/langdb",
"en/observability/langfuse",
@@ -238,7 +237,8 @@
"en/observability/opik",
"en/observability/patronus-evaluation",
"en/observability/portkey",
"en/observability/weave"
"en/observability/weave",
"en/observability/truefoundry"
]
},
{
@@ -281,6 +281,7 @@
{
"group": "Features",
"pages": [
"en/enterprise/features/rbac",
"en/enterprise/features/tool-repository",
"en/enterprise/features/webhook-streaming",
"en/enterprise/features/traces",
@@ -344,7 +345,7 @@
},
{
"group": "Endpoints",
"openapi": "enterprise-api.yaml"
"openapi": "https://raw.githubusercontent.com/crewAIInc/crewAI/main/docs/enterprise-api.en.yaml"
}
]
},
@@ -353,7 +354,7 @@
"groups": [
{
"group": "Examples",
"pages": ["en/examples/example"]
"pages": ["en/examples/example", "en/examples/cookbooks"]
}
]
}
@@ -564,7 +565,6 @@
"group": "Observabilidade",
"pages": [
"pt-BR/observability/overview",
"pt-BR/observability/agentops",
"pt-BR/observability/arize-phoenix",
"pt-BR/observability/langdb",
"pt-BR/observability/langfuse",
@@ -575,7 +575,8 @@
"pt-BR/observability/opik",
"pt-BR/observability/patronus-evaluation",
"pt-BR/observability/portkey",
"pt-BR/observability/weave"
"pt-BR/observability/weave",
"pt-BR/observability/truefoundry"
]
},
{
@@ -618,6 +619,7 @@
{
"group": "Funcionalidades",
"pages": [
"pt-BR/enterprise/features/rbac",
"pt-BR/enterprise/features/tool-repository",
"pt-BR/enterprise/features/webhook-streaming",
"pt-BR/enterprise/features/traces",
@@ -682,7 +684,7 @@
},
{
"group": "Endpoints",
"openapi": "enterprise-api.yaml"
"openapi": "https://raw.githubusercontent.com/crewAIInc/crewAI/main/docs/enterprise-api.pt-BR.yaml"
}
]
},
@@ -691,7 +693,7 @@
"groups": [
{
"group": "Exemplos",
"pages": ["pt-BR/examples/example"]
"pages": ["pt-BR/examples/example", "pt-BR/examples/cookbooks"]
}
]
}
@@ -910,7 +912,6 @@
"group": "오브저버빌리티",
"pages": [
"ko/observability/overview",
"ko/observability/agentops",
"ko/observability/arize-phoenix",
"ko/observability/langdb",
"ko/observability/langfuse",
@@ -965,6 +966,7 @@
{
"group": "특징",
"pages": [
"ko/enterprise/features/rbac",
"ko/enterprise/features/tool-repository",
"ko/enterprise/features/webhook-streaming",
"ko/enterprise/features/traces",
@@ -1028,7 +1030,7 @@
},
{
"group": "Endpoints",
"openapi": "enterprise-api.yaml"
"openapi": "https://raw.githubusercontent.com/crewAIInc/crewAI/main/docs/enterprise-api.ko.yaml"
}
]
},
@@ -1037,7 +1039,7 @@
"groups": [
{
"group": "예시",
"pages": ["ko/examples/example"]
"pages": ["ko/examples/example", "ko/examples/cookbooks"]
}
]
}

View File

@@ -177,14 +177,7 @@ class MyCustomCrew:
# Your crew implementation...
```
This is exactly how CrewAI's built-in `agentops_listener` is registered. In the CrewAI codebase, you'll find:
```python
# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
This is how third-party event listeners are registered in the CrewAI codebase.
## Available Event Types
@@ -280,77 +273,6 @@ The structure of the event object depends on the event type, but all events inhe
Additional fields vary by event type. For example, `CrewKickoffCompletedEvent` includes `crew_name` and `output` fields.
## Real-World Example: Integration with AgentOps
CrewAI includes an example of a third-party integration with [AgentOps](https://github.com/AgentOps-AI/agentops), a monitoring and observability platform for AI agents. Here's how it's implemented:
```python
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
```
This listener initializes an AgentOps session when a Crew starts, registers agents with AgentOps, tracks tool usage, and ends the session when the Crew completes.
The AgentOps listener is registered in CrewAI's event system through the import in `src/crewai/utilities/events/third_party/__init__.py`:
```python
from .agentops_listener import agentops_listener
```
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
## Advanced Usage: Scoped Handlers

View File

@@ -539,16 +539,71 @@ crew = Crew(
)
```
### Mem0 Provider
Short-Term Memory and Entity Memory both supports a tight integration with both Mem0 OSS and Mem0 Client as a provider. Here is how you can use Mem0 as a provider.
```python
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.entity_entity_memory import EntityMemory
mem0_oss_embedder_config = {
"provider": "mem0",
"config": {
"user_id": "john",
"local_mem0_config": {
"vector_store": {"provider": "qdrant","config": {"host": "localhost", "port": 6333}},
"llm": {"provider": "openai","config": {"api_key": "your-api-key", "model": "gpt-4"}},
"embedder": {"provider": "openai","config": {"api_key": "your-api-key", "model": "text-embedding-3-small"}}
},
"infer": True # Optional defaults to True
},
}
mem0_client_embedder_config = {
"provider": "mem0",
"config": {
"user_id": "john",
"org_id": "my_org_id", # Optional
"project_id": "my_project_id", # Optional
"api_key": "custom-api-key" # Optional - overrides env var
"run_id": "my_run_id", # Optional - for short-term memory
"includes": "include1", # Optional
"excludes": "exclude1", # Optional
"infer": True # Optional defaults to True
"custom_categories": new_categories # Optional - custom categories for user memory
},
}
short_term_memory_mem0_oss = ShortTermMemory(embedder_config=mem0_oss_embedder_config) # Short Term Memory with Mem0 OSS
short_term_memory_mem0_client = ShortTermMemory(embedder_config=mem0_client_embedder_config) # Short Term Memory with Mem0 Client
entity_memory_mem0_oss = EntityMemory(embedder_config=mem0_oss_embedder_config) # Entity Memory with Mem0 OSS
entity_memory_mem0_client = EntityMemory(embedder_config=mem0_client_embedder_config) # Short Term Memory with Mem0 Client
crew = Crew(
memory=True,
short_term_memory=short_term_memory_mem0_oss, # or short_term_memory_mem0_client
entity_memory=entity_memory_mem0_oss # or entity_memory_mem0_client
)
```
### Choosing the Right Embedding Provider
| Provider | Best For | Pros | Cons |
|:---------|:----------|:------|:------|
| **OpenAI** | General use, reliability | High quality, well-tested | Cost, requires API key |
| **Ollama** | Privacy, cost savings | Free, local, private | Requires local setup |
| **Google AI** | Google ecosystem | Good performance | Requires Google account |
| **Azure OpenAI** | Enterprise, compliance | Enterprise features | Complex setup |
| **Cohere** | Multilingual content | Great language support | Specialized use case |
| **VoyageAI** | Retrieval tasks | Optimized for search | Newer provider |
When selecting an embedding provider, consider factors like performance, privacy, cost, and integration needs.
Below is a comparison to help you decide:
| Provider | Best For | Pros | Cons |
| -------------- | ------------------------------ | --------------------------------- | ------------------------- |
| **OpenAI** | General use, high reliability | High quality, widely tested | Paid service, API key required |
| **Ollama** | Privacy-focused, cost savings | Free, runs locally, fully private | Requires local installation/setup |
| **Google AI** | Integration in Google ecosystem| Strong performance, good support | Google account required |
| **Azure OpenAI** | Enterprise & compliance needs| Enterprise-grade features, security | More complex setup process |
| **Cohere** | Multilingual content handling | Excellent language support | More niche use cases |
| **VoyageAI** | Information retrieval & search | Optimized for retrieval tasks | Relatively new provider |
| **Mem0** | Per-user personalization | Search-optimized embeddings | Paid service, API key required |
### Environment Variable Configuration

View File

@@ -0,0 +1,103 @@
---
title: "Role-Based Access Control (RBAC)"
description: "Control access to crews, tools, and data with roles, scopes, and granular permissions."
icon: "shield"
---
## Overview
RBAC in CrewAI Enterprise enables secure, scalable access management through a combination of organizationlevel roles and automationlevel visibility controls.
<Frame>
<img src="/images/enterprise/users_and_roles.png" alt="RBAC overview in CrewAI Enterprise" />
</Frame>
## Users and Roles
Each member in your CrewAI workspace is assigned a role, which determines their access across various features.
You can:
- Use predefined roles (Owner, Member)
- Create custom roles tailored to specific permissions
- Assign roles at any time through the settings panel
You can configure users and roles in Settings → Roles.
<Steps>
<Step title="Open Roles settings">
Go to <b>Settings → Roles</b> in CrewAI Enterprise.
</Step>
<Step title="Choose a role type">
Use a predefined role (<b>Owner</b>, <b>Member</b>) or click <b>Create role</b> to define a custom one.
</Step>
<Step title="Assign to members">
Select users and assign the role. You can change this anytime.
</Step>
</Steps>
### Configuration summary
| Area | Where to configure | Options |
|:---|:---|:---|
| Users & Roles | Settings → Roles | Predefined: Owner, Member; Custom roles |
| Automation visibility | Automation → Settings → Visibility | Private; Whitelist users/roles |
## Automationlevel Access Control
In addition to organizationwide roles, CrewAI Automations support finegrained visibility settings that let you restrict access to specific automations by user or role.
This is useful for:
- Keeping sensitive or experimental automations private
- Managing visibility across large teams or external collaborators
- Testing automations in isolated contexts
Deployments can be configured as private, meaning only whitelisted users and roles will be able to:
- View the deployment
- Run it or interact with its API
- Access its logs, metrics, and settings
The organization owner always has access, regardless of visibility settings.
You can configure automationlevel access control in Automation → Settings → Visibility tab.
<Steps>
<Step title="Open Visibility tab">
Navigate to <b>Automation → Settings → Visibility</b>.
</Step>
<Step title="Set visibility">
Choose <b>Private</b> to restrict access. The organization owner always retains access.
</Step>
<Step title="Whitelist access">
Add specific users and roles allowed to view, run, and access logs/metrics/settings.
</Step>
<Step title="Save and verify">
Save changes, then confirm that nonwhitelisted users cannot view or run the automation.
</Step>
</Steps>
### Private visibility: access outcomes
| Action | Owner | Whitelisted user/role | Not whitelisted |
|:---|:---|:---|:---|
| View automation | ✓ | ✓ | ✗ |
| Run automation/API | ✓ | ✓ | ✗ |
| Access logs/metrics/settings | ✓ | ✓ | ✗ |
<Tip>
The organization owner always has access. In private mode, only whitelisted users and roles can view, run, and access logs/metrics/settings.
</Tip>
<Frame>
<img src="/images/enterprise/visibility.png" alt="Automation Visibility settings in CrewAI Enterprise" />
</Frame>
<Card title="Need Help?" icon="headset" href="mailto:support@crewai.com">
Contact our support team for assistance with RBAC questions.
</Card>

View File

@@ -0,0 +1,22 @@
---
title: CrewAI Cookbooks
description: Feature-focused quickstarts and notebooks for learning patterns fast.
icon: book
---
## Quickstarts & Demos
<CardGroup cols={2}>
<Card title="Task Guardrails" icon="shield-check" href="https://github.com/crewAIInc/crewAI-quickstarts/tree/main/Task%20Guardrails">
Interactive notebooks for hands-on exploration.
</Card>
<Card title="Browse Quickstarts" icon="bolt" href="https://github.com/crewAIInc/crewAI-quickstarts">
Feature demos and small projects showcasing specific CrewAI capabilities.
</Card>
</CardGroup>
<Tip>
Use Cookbooks to learn a pattern quickly, then jump to Full Examples for productiongrade implementations.
</Tip>

View File

@@ -1,62 +1,85 @@
---
title: CrewAI Examples
description: A collection of examples that show how to use CrewAI framework to automate workflows.
description: Explore curated examples organized by Crews, Flows, Integrations, and Notebooks.
icon: rocket-launch
---
## Crews
<CardGroup cols={3}>
<Card
title="Marketing Strategy"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/marketing_strategy"
icon="bullhorn"
iconType="solid"
>
Automate marketing strategy creation with CrewAI.
<Card title="Marketing Strategy" icon="bullhorn" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/marketing_strategy">
Multiagent marketing campaign planning.
</Card>
<Card title="Surprise Trip" icon="plane" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/surprise_trip">
Personalized surprise travel planning.
</Card>
<Card title="Match Profile to Positions" icon="id-card" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/match_profile_to_positions">
CVtojob matching with vector search.
</Card>
<Card title="Job Posting" icon="newspaper" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/job-posting">
Automated job description creation.
</Card>
<Card title="Game Builder Crew" icon="gamepad" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/game-builder-crew">
Multiagent team that designs and builds Python games.
</Card>
<Card title="Recruitment" icon="user-group" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/recruitment">
Candidate sourcing and evaluation.
</Card>
<Card title="Browse all Crews" icon="users" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews">
See the full list of crew examples.
</Card>
</CardGroup>
## Flows
<CardGroup cols={3}>
<Card title="Content Creator Flow" icon="pen" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/content_creator_flow">
Multicrew content generation with routing.
</Card>
<Card title="Email Auto Responder" icon="envelope" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/email_auto_responder_flow">
Automated email monitoring and replies.
</Card>
<Card title="Lead Score Flow" icon="chart-line" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/lead_score_flow">
Lead qualification with humanintheloop.
</Card>
<Card title="Meeting Assistant Flow" icon="calendar" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/meeting_assistant_flow">
Notes processing with integrations.
</Card>
<Card title="Self Evaluation Loop" icon="rotate" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/self_evaluation_loop_flow">
Iterative selfimprovement workflows.
</Card>
<Card title="Write a Book (Flows)" icon="book" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/write_a_book_with_flows">
Parallel chapter generation.
</Card>
<Card title="Browse all Flows" icon="diagram-project" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows">
See the full list of flow examples.
</Card>
</CardGroup>
## Integrations
<CardGroup cols={3}>
<Card title="CrewAI ↔ LangGraph" icon="link" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations/crewai-langgraph">
Integration with LangGraph framework.
</Card>
<Card title="Azure OpenAI" icon="cloud" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations/azure_model">
Using CrewAI with Azure OpenAI.
</Card>
<Card title="NVIDIA Models" icon="microchip" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations/nvidia_models">
NVIDIA ecosystem integrations.
</Card>
<Card title="Browse Integrations" icon="puzzle-piece" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations">
See all integration examples.
</Card>
</CardGroup>
## Notebooks
<CardGroup cols={2}>
<Card title="Simple QA Crew + Flow" icon="book" href="https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks/Simple%20QA%20Crew%20%2B%20Flow">
Simple QA Crew + Flow.
</Card>
<Card title="All Notebooks" icon="book" href="https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks">
Interactive examples for learning and experimentation.
</Card>
<Card
title="Surprise Trip"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/surprise_trip"
icon="plane"
iconType="duotone"
>
Create a surprise trip itinerary with CrewAI.
</Card>
<Card
title="Match Profile to Positions"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/match_profile_to_positions"
icon="linkedin"
iconType="duotone"
>
Match a profile to jobpositions with CrewAI.
</Card>
<Card
title="Create Job Posting"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/job-posting"
icon="newspaper"
iconType="duotone"
>
Create a job posting with CrewAI.
</Card>
<Card
title="Game Generator"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/game-builder-crew"
icon="gamepad"
iconType="duotone"
>
Create a game with CrewAI.
</Card>
<Card
title="Find Job Candidates"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/recruitment"
icon="user-group"
iconType="duotone"
>
Find job candidates with CrewAI.
</Card>
</CardGroup>

View File

@@ -1,126 +0,0 @@
---
title: AgentOps Integration
description: Understanding and logging your agent performance with AgentOps.
icon: paperclip
---
# Introduction
Observability is a key aspect of developing and deploying conversational AI agents. It allows developers to understand how their agents are performing,
how their agents are interacting with users, and how their agents use external tools and APIs.
AgentOps is a product independent of CrewAI that provides a comprehensive observability solution for agents.
## AgentOps
[AgentOps](https://agentops.ai/?=crew) provides session replays, metrics, and monitoring for agents.
At a high level, AgentOps gives you the ability to monitor cost, token usage, latency, agent failures, session-wide statistics, and more.
For more info, check out the [AgentOps Repo](https://github.com/AgentOps-AI/agentops).
### Overview
AgentOps provides monitoring for agents in development and production.
It provides a dashboard for tracking agent performance, session replays, and custom reporting.
Additionally, AgentOps provides session drilldowns for viewing Crew agent interactions, LLM calls, and tool usage in real-time.
This feature is useful for debugging and understanding how agents interact with users as well as other agents.
![Overview of a select series of agent session runs](/images/agentops-overview.png)
![Overview of session drilldowns for examining agent runs](/images/agentops-session.png)
![Viewing a step-by-step agent replay execution graph](/images/agentops-replay.png)
### Features
- **LLM Cost Management and Tracking**: Track spend with foundation model providers.
- **Replay Analytics**: Watch step-by-step agent execution graphs.
- **Recursive Thought Detection**: Identify when agents fall into infinite loops.
- **Custom Reporting**: Create custom analytics on agent performance.
- **Analytics Dashboard**: Monitor high-level statistics about agents in development and production.
- **Public Model Testing**: Test your agents against benchmarks and leaderboards.
- **Custom Tests**: Run your agents against domain-specific tests.
- **Time Travel Debugging**: Restart your sessions from checkpoints.
- **Compliance and Security**: Create audit logs and detect potential threats such as profanity and PII leaks.
- **Prompt Injection Detection**: Identify potential code injection and secret leaks.
### Using AgentOps
<Steps>
<Step title="Create an API Key">
Create a user API key here: [Create API Key](https://app.agentops.ai/account)
</Step>
<Step title="Configure Your Environment">
Add your API key to your environment variables:
```bash
AGENTOPS_API_KEY=<YOUR_AGENTOPS_API_KEY>
```
</Step>
<Step title="Install AgentOps">
Install AgentOps with:
```bash
pip install 'crewai[agentops]'
```
or
```bash
pip install agentops
```
</Step>
<Step title="Initialize AgentOps">
Before using `Crew` in your script, include these lines:
```python
import agentops
agentops.init()
```
This will initiate an AgentOps session as well as automatically track Crew agents. For further info on how to outfit more complex agentic systems,
check out the [AgentOps documentation](https://docs.agentops.ai) or join the [Discord](https://discord.gg/j4f3KbeH).
</Step>
</Steps>
### Crew + AgentOps Examples
<CardGroup cols={3}>
<Card
title="Job Posting"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/job-posting"
icon="briefcase"
iconType="solid"
>
Example of a Crew agent that generates job posts.
</Card>
<Card
title="Markdown Validator"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/markdown_validator"
icon="markdown"
iconType="solid"
>
Example of a Crew agent that validates Markdown files.
</Card>
<Card
title="Instagram Post"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/instagram_post"
icon="square-instagram"
iconType="brands"
>
Example of a Crew agent that generates Instagram posts.
</Card>
</CardGroup>
### Further Information
To get started, create an [AgentOps account](https://agentops.ai/?=crew).
For feature requests or bug reports, please reach out to the AgentOps team on the [AgentOps Repo](https://github.com/AgentOps-AI/agentops).
#### Extra links
<a href="https://twitter.com/agentopsai/">🐦 Twitter</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://discord.gg/JHPt4C7r">📢 Discord</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://app.agentops.ai/?=crew">🖇️ AgentOps Dashboard</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://docs.agentops.ai/introduction">📙 Documentation</a>

View File

@@ -21,9 +21,6 @@ Observability is crucial for understanding how your CrewAI agents perform, ident
### Monitoring & Tracing Platforms
<CardGroup cols={2}>
<Card title="AgentOps" icon="paperclip" href="/en/observability/agentops">
Session replays, metrics, and monitoring for agent development and production.
</Card>
<Card title="LangDB" icon="database" href="/en/observability/langdb">
End-to-end tracing for CrewAI workflows with automatic agent interaction capture.

View File

@@ -0,0 +1,146 @@
---
title: TrueFoundry Integration
icon: chart-line
---
TrueFoundry provides an enterprise-ready [AI Gateway](https://www.truefoundry.com/ai-gateway) which can integrate with agentic frameworks like CrewAI and provides governance and observability for your AI Applications. TrueFoundry AI Gateway serves as a unified interface for LLM access, providing:
- **Unified API Access**: Connect to 250+ LLMs (OpenAI, Claude, Gemini, Groq, Mistral) through one API
- **Low Latency**: Sub-3ms internal latency with intelligent routing and load balancing
- **Enterprise Security**: SOC 2, HIPAA, GDPR compliance with RBAC and audit logging
- **Quota and cost management**: Token-based quotas, rate limiting, and comprehensive usage tracking
- **Observability**: Full request/response logging, metrics, and traces with customizable retention
## How TrueFoundry Integrates with CrewAI
### Installation & Setup
<Steps>
<Step title="Install CrewAI">
```bash
pip install crewai
```
</Step>
<Step title="Get TrueFoundry Access Token">
1. Sign up for a [TrueFoundry account](https://www.truefoundry.com/register)
2. Follow the steps here in [Quick start](https://docs.truefoundry.com/gateway/quick-start)
</Step>
<Step title="Configure CrewAI with TrueFoundry">
![TrueFoundry Code Configuration](/images/new-code-snippet.png)
```python
from crewai import LLM
# Create an LLM instance with TrueFoundry AI Gateway
truefoundry_llm = LLM(
model="openai-main/gpt-4o", # Similarly, you can call any model from any provider
base_url="your_truefoundry_gateway_base_url",
api_key="your_truefoundry_api_key"
)
# Use in your CrewAI agents
from crewai import Agent
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
llm=truefoundry_llm,
verbose=True
)
```
</Step>
</Steps>
### Complete CrewAI Example
```python
from crewai import Agent, Task, Crew, LLM
# Configure LLM with TrueFoundry
llm = LLM(
model="openai-main/gpt-4o",
base_url="your_truefoundry_gateway_base_url",
api_key="your_truefoundry_api_key"
)
# Create agents
researcher = Agent(
role='Research Analyst',
goal='Conduct detailed market research',
backstory='Expert market analyst with attention to detail',
llm=llm,
verbose=True
)
writer = Agent(
role='Content Writer',
goal='Create comprehensive reports',
backstory='Experienced technical writer',
llm=llm,
verbose=True
)
# Create tasks
research_task = Task(
description='Research AI market trends for 2024',
agent=researcher,
expected_output='Comprehensive research summary'
)
writing_task = Task(
description='Create a market research report',
agent=writer,
expected_output='Well-structured report with insights',
context=[research_task]
)
# Create and execute crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
verbose=True
)
result = crew.kickoff()
```
### Observability and Governance
Monitor your CrewAI agents through TrueFoundry's metrics tab:
![TrueFoundry metrics](/images/gateway-metrics.png)
With Truefoundry's AI gateway, you can monitor and analyze:
- **Performance Metrics**: Track key latency metrics like Request Latency, Time to First Token (TTFS), and Inter-Token Latency (ITL) with P99, P90, and P50 percentiles
- **Cost and Token Usage**: Gain visibility into your application's costs with detailed breakdowns of input/output tokens and the associated expenses for each model
- **Usage Patterns**: Understand how your application is being used with detailed analytics on user activity, model distribution, and team-based usage
- **Rate limit and Load balancing**: You can set up rate limiting, load balancing and fallback for your models
## Tracing
For a more detailed understanding on tracing, please see [getting-started-tracing](https://docs.truefoundry.com/docs/tracing/tracing-getting-started).For tracing, you can add the Traceloop SDK:
For tracing, you can add the Traceloop SDK:
```bash
pip install traceloop-sdk
```
```python
from traceloop.sdk import Traceloop
# Initialize enhanced tracing
Traceloop.init(
api_endpoint="https://your-truefoundry-endpoint/api/tracing",
headers={
"Authorization": f"Bearer {your_truefoundry_pat_token}",
"TFY-Tracing-Project": "your_project_name",
},
)
```
This provides additional trace correlation across your entire CrewAI workflow.
![TrueFoundry CrewAI Tracing](/images/tracing_crewai.png)

View File

@@ -1,249 +0,0 @@
# Encrypted Agent Communication
CrewAI now supports encrypted agent-to-agent communication to ensure secure information exchange between agents in multi-agent workflows.
## Features
- **Fernet Encryption**: Uses industry-standard Fernet symmetric encryption
- **Fingerprint-based Key Derivation**: Unique keys derived from agent fingerprints
- **Secure Message Routing**: Messages can only be decrypted by intended recipients
- **Backward Compatible**: Non-encrypted agents continue to work normally
- **Optional**: Encryption can be enabled per-agent or per-crew
## Quick Start
### 1. Enable Encryption for Agents
```python
from crewai import Agent
from crewai.security import SecurityConfig
# Create agents with encryption enabled
researcher = Agent(
role="Research Analyst",
goal="Conduct research and analysis",
backstory="Expert researcher with deep analytical skills",
security_config=SecurityConfig(encrypted_communication=True)
)
writer = Agent(
role="Content Writer",
goal="Create engaging content",
backstory="Skilled writer who creates compelling narratives",
security_config=SecurityConfig(encrypted_communication=True)
)
```
### 2. Use Encrypted Communication in Tasks
```python
from crewai import Crew, Task
from crewai.tools.agent_tools import AskQuestionTool
# Create tools that support encrypted communication
agent_tools = [
AskQuestionTool(
agents=[researcher, writer],
description="Ask questions to team members with encrypted communication"
)
]
# Create tasks that will use encrypted communication
research_task = Task(
description="Research the latest AI trends",
expected_output="Comprehensive research report",
agent=researcher
)
writing_task = Task(
description="Ask the researcher about their findings and create a blog post",
expected_output="Engaging blog post based on research",
agent=writer,
tools=agent_tools # These will use encrypted communication automatically
)
# Run the crew - communications will be automatically encrypted
crew = Crew(agents=[researcher, writer], tasks=[research_task, writing_task])
result = crew.kickoff()
```
## How It Works
### Security Architecture
1. **Agent Fingerprints**: Each agent gets a unique cryptographic fingerprint
2. **Key Derivation**: Communication keys are derived from sender/recipient fingerprint pairs
3. **Message Encryption**: Payloads are encrypted using Fernet with derived keys
4. **Secure Routing**: Only intended recipients can decrypt messages
### Message Flow
```
Sender Agent → Encrypt Message → Encrypted Payload → Recipient Agent → Decrypt Message
↓ ↓ ↓ ↓ ↓
Fingerprint Derive Key EncryptedMessage Derive Key Original Message
```
### Encryption Details
- **Algorithm**: Fernet (AES 128 in CBC mode with HMAC-SHA256)
- **Key Length**: 256-bit encryption keys
- **Key Derivation**: SHA-256 based on sorted agent fingerprints
- **Message Format**: JSON with encrypted payload and metadata
## Configuration Options
### SecurityConfig Parameters
```python
from crewai.security import SecurityConfig
# Enable encryption
security_config = SecurityConfig(encrypted_communication=True)
# Disable encryption (default)
security_config = SecurityConfig(encrypted_communication=False)
```
### Mixed Encryption Scenarios
```python
# Some agents with encryption, others without
encrypted_agent = Agent(
role="Secure Agent",
security_config=SecurityConfig(encrypted_communication=True),
# ... other params
)
plain_agent = Agent(
role="Regular Agent",
security_config=SecurityConfig(encrypted_communication=False),
# ... other params
)
# Agent tools automatically handle mixed scenarios:
# - Encrypted agents → Encrypted communication
# - Non-encrypted agents → Plain communication
# - Mixed → Falls back to plain communication with warning
```
## Security Considerations
### What Is Protected
**Task descriptions and context** passed between agents
**Questions and responses** in agent-to-agent communication
**Delegation payloads** including sensitive instructions
**Agent-to-agent metadata** like sender/recipient information
### What Is NOT Protected
**Agent configurations** (roles, goals, backstories)
**Task outputs** stored in crew results
**LLM API calls** to external services
**Tool executions** outside of agent communication
### Best Practices
1. **Enable encryption** for sensitive workflows
2. **Use unique fingerprints** per deployment/environment
3. **Monitor logs** for encryption failures or downgrades
4. **Test mixed scenarios** with both encrypted and non-encrypted agents
5. **Keep fingerprints secure** - they are used for key derivation
## Advanced Usage
### Direct Encryption API
For advanced use cases, you can use the encryption API directly:
```python
from crewai.security import AgentCommunicationEncryption, Fingerprint
# Create encryption handlers
sender_fp = Fingerprint()
recipient_fp = Fingerprint()
sender_encryption = AgentCommunicationEncryption(sender_fp)
recipient_encryption = AgentCommunicationEncryption(recipient_fp)
# Encrypt a message
message = {"task": "Analyze data", "context": "Q4 results"}
encrypted_msg = sender_encryption.encrypt_message(
message,
recipient_fp,
message_type="analysis_request"
)
# Decrypt the message
decrypted_msg = recipient_encryption.decrypt_message(encrypted_msg)
```
### Custom Fingerprints
```python
# Generate deterministic fingerprints from seeds
fp = Fingerprint.generate(seed="agent-role-environment")
# Use custom metadata
fp = Fingerprint.generate(
seed="unique-seed",
metadata={"environment": "production", "version": "1.0"}
)
```
## Troubleshooting
### Common Issues
**Q: "Message not intended for this agent" error**
A: This happens when an agent tries to decrypt a message meant for another agent. Check that the correct recipient agent is being used.
**Q: "Encryption failed, falling back to plain communication" warning**
A: This indicates the encryption process failed and the system fell back to unencrypted communication. Check agent security configurations.
**Q: Mixed encrypted/non-encrypted agents not working**
A: Ensure at least one agent has encryption enabled for the encryption to activate. If no agents have encryption, all communication will be plain text.
### Debug Logging
Enable debug logging to see encryption activities:
```python
import logging
logging.basicConfig(level=logging.DEBUG)
# Look for log messages like:
# DEBUG:crewai.security.encrypted_communication:Encrypted message from abc12345... to def67890...
# DEBUG:crewai.tools.agent_tools.base_agent_tools:Executing encrypted communication task...
```
## Performance Impact
- **Encryption/Decryption**: Minimal overhead (~1-2ms per message)
- **Key Derivation**: Cached after first use per agent pair
- **Memory**: Small increase for encryption handlers and cached keys
- **Network**: No additional network calls (all local encryption)
## Migration Guide
### From Non-Encrypted to Encrypted
1. Add `security_config` to your agent definitions
2. No code changes required for agent tools
3. Test with mixed encrypted/non-encrypted agents first
4. Enable encryption for all agents in production
```python
# Before
agent = Agent(role="Analyst", goal="...", backstory="...")
# After
agent = Agent(
role="Analyst",
goal="...",
backstory="...",
security_config=SecurityConfig(encrypted_communication=True)
)
```
That's it! Your agents will automatically use encrypted communication when both sender and recipient support it.

435
docs/enterprise-api.en.yaml Normal file
View File

@@ -0,0 +1,435 @@
openapi: 3.0.3
info:
title: CrewAI Enterprise API
description: |
REST API for interacting with your deployed CrewAI crews on CrewAI Enterprise.
## Getting Started
1. **Find your crew URL**: Get your unique crew URL from the CrewAI Enterprise dashboard
2. **Copy examples**: Use the code examples from each endpoint page as templates
3. **Replace placeholders**: Update URLs and tokens with your actual values
4. **Test with your tools**: Use cURL, Postman, or your preferred API client
## Authentication
All API requests require a bearer token for authentication. There are two types of tokens:
- **Bearer Token**: Organization-level token for full crew operations
- **User Bearer Token**: User-scoped token for individual access with limited permissions
You can find your bearer tokens in the Status tab of your crew's detail page in the CrewAI Enterprise dashboard.
## Reference Documentation
This documentation provides comprehensive examples for each endpoint:
- **Request formats** with all required and optional parameters
- **Response examples** for success and error scenarios
- **Code samples** in multiple programming languages
- **Authentication patterns** with proper Bearer token usage
Copy the examples and customize them with your actual crew URL and authentication tokens.
## Workflow
1. **Discover inputs** using `GET /inputs`
2. **Start execution** using `POST /kickoff`
3. **Monitor progress** using `GET /status/{kickoff_id}`
version: 1.0.0
contact:
name: CrewAI Support
email: support@crewai.com
url: https://crewai.com
servers:
- url: https://your-actual-crew-name.crewai.com
description: Replace with your actual deployed crew URL from the CrewAI Enterprise dashboard
- url: https://my-travel-crew.crewai.com
description: Example travel planning crew (replace with your URL)
- url: https://content-creation-crew.crewai.com
description: Example content creation crew (replace with your URL)
- url: https://research-assistant-crew.crewai.com
description: Example research assistant crew (replace with your URL)
security:
- BearerAuth: []
paths:
/inputs:
get:
summary: Get Required Inputs
description: |
**📋 Reference Example Only** - *This shows the request format. To test with your actual crew, copy the cURL example and replace the URL + token with your real values.*
Retrieves the list of all required input parameters that your crew expects for execution.
Use this endpoint to discover what inputs you need to provide when starting a crew execution.
operationId: getRequiredInputs
responses:
'200':
description: Successfully retrieved required inputs
content:
application/json:
schema:
type: object
properties:
inputs:
type: array
items:
type: string
description: Array of required input parameter names
example: ["budget", "interests", "duration", "age"]
examples:
travel_crew:
summary: Travel planning crew inputs
value:
inputs: ["budget", "interests", "duration", "age"]
outreach_crew:
summary: Outreach crew inputs
value:
inputs: ["name", "title", "company", "industry", "our_product", "linkedin_url"]
'401':
$ref: '#/components/responses/UnauthorizedError'
'404':
$ref: '#/components/responses/NotFoundError'
'500':
$ref: '#/components/responses/ServerError'
/kickoff:
post:
summary: Start Crew Execution
description: |
**📋 Reference Example Only** - *This shows the request format. To test with your actual crew, copy the cURL example and replace the URL + token with your real values.*
Initiates a new crew execution with the provided inputs. Returns a kickoff ID that can be used
to track the execution progress and retrieve results.
Crew executions can take anywhere from seconds to minutes depending on their complexity.
Consider using webhooks for real-time notifications or implement polling with the status endpoint.
operationId: startCrewExecution
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- inputs
properties:
inputs:
type: object
description: Key-value pairs of all required inputs for your crew
additionalProperties:
type: string
example:
budget: "1000 USD"
interests: "games, tech, ai, relaxing hikes, amazing food"
duration: "7 days"
age: "35"
meta:
type: object
description: Additional metadata to pass to the crew
additionalProperties: true
example:
requestId: "user-request-12345"
source: "mobile-app"
taskWebhookUrl:
type: string
format: uri
description: Callback URL executed after each task completion
example: "https://your-server.com/webhooks/task"
stepWebhookUrl:
type: string
format: uri
description: Callback URL executed after each agent thought/action
example: "https://your-server.com/webhooks/step"
crewWebhookUrl:
type: string
format: uri
description: Callback URL executed when the crew execution completes
example: "https://your-server.com/webhooks/crew"
examples:
travel_planning:
summary: Travel planning crew
value:
inputs:
budget: "1000 USD"
interests: "games, tech, ai, relaxing hikes, amazing food"
duration: "7 days"
age: "35"
meta:
requestId: "travel-req-123"
source: "web-app"
outreach_campaign:
summary: Outreach crew with webhooks
value:
inputs:
name: "John Smith"
title: "CTO"
company: "TechCorp"
industry: "Software"
our_product: "AI Development Platform"
linkedin_url: "https://linkedin.com/in/johnsmith"
taskWebhookUrl: "https://api.example.com/webhooks/task"
crewWebhookUrl: "https://api.example.com/webhooks/crew"
responses:
'200':
description: Crew execution started successfully
content:
application/json:
schema:
type: object
properties:
kickoff_id:
type: string
format: uuid
description: Unique identifier for tracking this execution
example: "abcd1234-5678-90ef-ghij-klmnopqrstuv"
'400':
description: Invalid request body or missing required inputs
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'401':
$ref: '#/components/responses/UnauthorizedError'
'422':
description: Validation error - ensure all required inputs are provided
content:
application/json:
schema:
$ref: '#/components/schemas/ValidationError'
'500':
$ref: '#/components/responses/ServerError'
/status/{kickoff_id}:
get:
summary: Get Execution Status
description: |
**📋 Reference Example Only** - *This shows the request format. To test with your actual crew, copy the cURL example and replace the URL + token with your real values.*
Retrieves the current status and results of a crew execution using its kickoff ID.
The response structure varies depending on the execution state:
- **running**: Execution in progress with current task info
- **completed**: Execution finished with full results
- **error**: Execution failed with error details
operationId: getExecutionStatus
parameters:
- name: kickoff_id
in: path
required: true
description: The kickoff ID returned from the /kickoff endpoint
schema:
type: string
format: uuid
example: "abcd1234-5678-90ef-ghij-klmnopqrstuv"
responses:
'200':
description: Successfully retrieved execution status
content:
application/json:
schema:
oneOf:
- $ref: '#/components/schemas/ExecutionRunning'
- $ref: '#/components/schemas/ExecutionCompleted'
- $ref: '#/components/schemas/ExecutionError'
examples:
running:
summary: Execution in progress
value:
status: "running"
current_task: "research_task"
progress:
completed_tasks: 1
total_tasks: 3
completed:
summary: Execution completed successfully
value:
status: "completed"
result:
output: "Comprehensive travel itinerary for 7 days in Japan focusing on tech culture..."
tasks:
- task_id: "research_task"
output: "Research findings on tech destinations in Japan..."
agent: "Travel Researcher"
execution_time: 45.2
- task_id: "planning_task"
output: "7-day detailed itinerary with activities and recommendations..."
agent: "Trip Planner"
execution_time: 62.8
execution_time: 108.5
error:
summary: Execution failed
value:
status: "error"
error: "Task execution failed: Invalid API key for external service"
execution_time: 23.1
'401':
$ref: '#/components/responses/UnauthorizedError'
'404':
description: Kickoff ID not found
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error: "Execution not found"
message: "No execution found with ID: abcd1234-5678-90ef-ghij-klmnopqrstuv"
'500':
$ref: '#/components/responses/ServerError'
components:
securitySchemes:
BearerAuth:
type: http
scheme: bearer
description: |
**📋 Reference Documentation** - *The tokens shown in examples are placeholders for reference only.*
Use your actual Bearer Token or User Bearer Token from the CrewAI Enterprise dashboard for real API calls.
**Bearer Token**: Organization-level access for full crew operations
**User Bearer Token**: User-scoped access with limited permissions
schemas:
ExecutionRunning:
type: object
properties:
status:
type: string
enum: ["running"]
example: "running"
current_task:
type: string
description: Name of the currently executing task
example: "research_task"
progress:
type: object
properties:
completed_tasks:
type: integer
description: Number of completed tasks
example: 1
total_tasks:
type: integer
description: Total number of tasks in the crew
example: 3
ExecutionCompleted:
type: object
properties:
status:
type: string
enum: ["completed"]
example: "completed"
result:
type: object
properties:
output:
type: string
description: Final output from the crew execution
example: "Comprehensive travel itinerary..."
tasks:
type: array
items:
$ref: '#/components/schemas/TaskResult'
execution_time:
type: number
description: Total execution time in seconds
example: 108.5
ExecutionError:
type: object
properties:
status:
type: string
enum: ["error"]
example: "error"
error:
type: string
description: Error message describing what went wrong
example: "Task execution failed: Invalid API key"
execution_time:
type: number
description: Time until error occurred in seconds
example: 23.1
TaskResult:
type: object
properties:
task_id:
type: string
description: Unique identifier for the task
example: "research_task"
output:
type: string
description: Output generated by this task
example: "Research findings..."
agent:
type: string
description: Name of the agent that executed this task
example: "Travel Researcher"
execution_time:
type: number
description: Time taken to execute this task in seconds
example: 45.2
Error:
type: object
properties:
error:
type: string
description: Error type or title
example: "Authentication Error"
message:
type: string
description: Detailed error message
example: "Invalid bearer token provided"
ValidationError:
type: object
properties:
error:
type: string
example: "Validation Error"
message:
type: string
example: "Missing required inputs"
details:
type: object
properties:
missing_inputs:
type: array
items:
type: string
example: ["budget", "interests"]
responses:
UnauthorizedError:
description: Authentication failed - check your bearer token
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error: "Unauthorized"
message: "Invalid or missing bearer token"
NotFoundError:
description: Resource not found
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error: "Not Found"
message: "The requested resource was not found"
ServerError:
description: Internal server error
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error: "Internal Server Error"
message: "An unexpected error occurred"

231
docs/enterprise-api.ko.yaml Normal file
View File

@@ -0,0 +1,231 @@
openapi: 3.0.3
info:
title: CrewAI 엔터프라이즈 API
description: |
CrewAI Enterprise에 배포된 crew와 상호작용하기 위한 REST API입니다.
## 시작하기
1. **Crew URL 확인**: 대시보드에서 고유한 crew URL을 확인하세요
2. **예제 복사**: 각 엔드포인트의 예제를 템플릿으로 사용하세요
3. **플레이스홀더 교체**: 실제 URL과 토큰으로 바꾸세요
4. **도구로 테스트**: cURL, Postman 등 선호하는 도구로 테스트하세요
version: 1.0.0
contact:
name: CrewAI 지원
email: support@crewai.com
url: https://crewai.com
servers:
- url: https://your-actual-crew-name.crewai.com
description: 대시보드의 실제 crew URL로 교체하세요
security:
- BearerAuth: []
paths:
/inputs:
get:
summary: 필요 입력값 조회
description: |
**📋 참조 예제만 제공** - *요청 형식을 보여줍니다. 실제 호출은 cURL 예제를 복사해 URL과 토큰을 교체하세요.*
실행에 필요한 입력 파라미터 목록을 반환합니다.
operationId: getRequiredInputs
responses:
'200':
description: 입력값을 성공적으로 조회
content:
application/json:
schema:
type: object
properties:
inputs:
type: array
items:
type: string
'401':
$ref: '#/components/responses/UnauthorizedError'
'404':
$ref: '#/components/responses/NotFoundError'
'500':
$ref: '#/components/responses/ServerError'
/kickoff:
post:
summary: Crew 실행 시작
description: |
**📋 참조 예제만 제공** - *요청 형식을 보여줍니다. 실제 호출은 cURL 예제를 복사해 URL과 토큰을 교체하세요.*
제공된 입력으로 새로운 실행을 시작하고 kickoff ID를 반환합니다.
operationId: startCrewExecution
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- inputs
properties:
inputs:
type: object
additionalProperties:
type: string
responses:
'200':
description: 실행이 성공적으로 시작됨
content:
application/json:
schema:
type: object
properties:
kickoff_id:
type: string
format: uuid
'401':
$ref: '#/components/responses/UnauthorizedError'
'500':
$ref: '#/components/responses/ServerError'
/status/{kickoff_id}:
get:
summary: 실행 상태 조회
description: |
**📋 참조 예제만 제공** - *요청 형식을 보여줍니다. 실제 호출은 cURL 예제를 복사해 URL과 토큰을 교체하세요.*
kickoff ID로 실행 상태와 결과를 조회합니다.
operationId: getExecutionStatus
parameters:
- name: kickoff_id
in: path
required: true
schema:
type: string
format: uuid
responses:
'200':
description: 상태를 성공적으로 조회
content:
application/json:
schema:
oneOf:
- $ref: '#/components/schemas/ExecutionRunning'
- $ref: '#/components/schemas/ExecutionCompleted'
- $ref: '#/components/schemas/ExecutionError'
'401':
$ref: '#/components/responses/UnauthorizedError'
'404':
description: Kickoff ID를 찾을 수 없음
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'500':
$ref: '#/components/responses/ServerError'
components:
securitySchemes:
BearerAuth:
type: http
scheme: bearer
description: |
**📋 참고** - *예시의 토큰은 자리 표시자입니다.* 실제 토큰을 사용하세요.
schemas:
ExecutionRunning:
type: object
properties:
status:
type: string
enum: ["running"]
current_task:
type: string
progress:
type: object
properties:
completed_tasks:
type: integer
total_tasks:
type: integer
ExecutionCompleted:
type: object
properties:
status:
type: string
enum: ["completed"]
result:
type: object
properties:
output:
type: string
tasks:
type: array
items:
$ref: '#/components/schemas/TaskResult'
execution_time:
type: number
ExecutionError:
type: object
properties:
status:
type: string
enum: ["error"]
error:
type: string
execution_time:
type: number
TaskResult:
type: object
properties:
task_id:
type: string
output:
type: string
agent:
type: string
execution_time:
type: number
Error:
type: object
properties:
error:
type: string
message:
type: string
ValidationError:
type: object
properties:
error:
type: string
message:
type: string
details:
type: object
properties:
missing_inputs:
type: array
items:
type: string
responses:
UnauthorizedError:
description: 인증 실패
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
NotFoundError:
description: 리소스를 찾을 수 없음
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
ServerError:
description: 서버 내부 오류
content:
application/json:
schema:
$ref: '#/components/schemas/Error'

View File

@@ -0,0 +1,268 @@
openapi: 3.0.3
info:
title: CrewAI Enterprise API
description: |
REST API para interagir com suas crews implantadas no CrewAI Enterprise.
## Introdução
1. **Encontre a URL da sua crew**: Obtenha sua URL única no painel do CrewAI Enterprise
2. **Copie os exemplos**: Use os exemplos de cada endpoint como modelo
3. **Substitua os placeholders**: Atualize URLs e tokens com seus valores reais
4. **Teste com suas ferramentas**: Use cURL, Postman ou seu cliente preferido
## Autenticação
Todas as requisições exigem um token bearer. Existem dois tipos:
- **Bearer Token**: Token em nível de organização para operações completas
- **User Bearer Token**: Token com escopo de usuário com permissões limitadas
Você encontra os tokens na aba Status da sua crew no painel do CrewAI Enterprise.
## Documentação de Referência
Este documento fornece exemplos completos para cada endpoint:
- **Formatos de requisição** com parâmetros obrigatórios e opcionais
- **Exemplos de resposta** para sucesso e erro
- **Amostras de código** em várias linguagens
- **Padrões de autenticação** com uso correto de Bearer token
Copie os exemplos e personalize com sua URL e tokens reais.
## Fluxo
1. **Descubra os inputs** usando `GET /inputs`
2. **Inicie a execução** usando `POST /kickoff`
3. **Monitore o progresso** usando `GET /status/{kickoff_id}`
version: 1.0.0
contact:
name: CrewAI Suporte
email: support@crewai.com
url: https://crewai.com
servers:
- url: https://your-actual-crew-name.crewai.com
description: Substitua pela URL real da sua crew no painel do CrewAI Enterprise
security:
- BearerAuth: []
paths:
/inputs:
get:
summary: Obter Inputs Requeridos
description: |
**📋 Exemplo de Referência** - *Mostra o formato da requisição. Para testar com sua crew real, copie o cURL e substitua URL + token.*
Retorna a lista de parâmetros de entrada que sua crew espera.
operationId: getRequiredInputs
responses:
'200':
description: Inputs requeridos obtidos com sucesso
content:
application/json:
schema:
type: object
properties:
inputs:
type: array
items:
type: string
description: Nomes dos parâmetros de entrada
example: ["budget", "interests", "duration", "age"]
'401':
$ref: '#/components/responses/UnauthorizedError'
'404':
$ref: '#/components/responses/NotFoundError'
'500':
$ref: '#/components/responses/ServerError'
/kickoff:
post:
summary: Iniciar Execução da Crew
description: |
**📋 Exemplo de Referência** - *Mostra o formato da requisição. Para testar com sua crew real, copie o cURL e substitua URL + token.*
Inicia uma nova execução da crew com os inputs fornecidos e retorna um kickoff ID.
operationId: startCrewExecution
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- inputs
properties:
inputs:
type: object
additionalProperties:
type: string
example:
budget: "1000 USD"
interests: "games, tech, ai, relaxing hikes, amazing food"
duration: "7 days"
age: "35"
responses:
'200':
description: Execução iniciada com sucesso
content:
application/json:
schema:
type: object
properties:
kickoff_id:
type: string
format: uuid
example: "abcd1234-5678-90ef-ghij-klmnopqrstuv"
'401':
$ref: '#/components/responses/UnauthorizedError'
'500':
$ref: '#/components/responses/ServerError'
/status/{kickoff_id}:
get:
summary: Obter Status da Execução
description: |
**📋 Exemplo de Referência** - *Mostra o formato da requisição. Para testar com sua crew real, copie o cURL e substitua URL + token.*
Retorna o status atual e os resultados de uma execução usando o kickoff ID.
operationId: getExecutionStatus
parameters:
- name: kickoff_id
in: path
required: true
schema:
type: string
format: uuid
responses:
'200':
description: Status recuperado com sucesso
content:
application/json:
schema:
oneOf:
- $ref: '#/components/schemas/ExecutionRunning'
- $ref: '#/components/schemas/ExecutionCompleted'
- $ref: '#/components/schemas/ExecutionError'
'401':
$ref: '#/components/responses/UnauthorizedError'
'404':
description: Kickoff ID não encontrado
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
'500':
$ref: '#/components/responses/ServerError'
components:
securitySchemes:
BearerAuth:
type: http
scheme: bearer
description: |
**📋 Referência** - *Os tokens mostrados são apenas exemplos.*
Use seus tokens reais do painel do CrewAI Enterprise.
schemas:
ExecutionRunning:
type: object
properties:
status:
type: string
enum: ["running"]
current_task:
type: string
progress:
type: object
properties:
completed_tasks:
type: integer
total_tasks:
type: integer
ExecutionCompleted:
type: object
properties:
status:
type: string
enum: ["completed"]
result:
type: object
properties:
output:
type: string
tasks:
type: array
items:
$ref: '#/components/schemas/TaskResult'
execution_time:
type: number
ExecutionError:
type: object
properties:
status:
type: string
enum: ["error"]
error:
type: string
execution_time:
type: number
TaskResult:
type: object
properties:
task_id:
type: string
output:
type: string
agent:
type: string
execution_time:
type: number
Error:
type: object
properties:
error:
type: string
message:
type: string
ValidationError:
type: object
properties:
error:
type: string
message:
type: string
details:
type: object
properties:
missing_inputs:
type: array
items:
type: string
responses:
UnauthorizedError:
description: Autenticação falhou
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
NotFoundError:
description: Recurso não encontrado
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
ServerError:
description: Erro interno do servidor
content:
application/json:
schema:
$ref: '#/components/schemas/Error'

Binary file not shown.

Before

Width:  |  Height:  |  Size: 288 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 419 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 263 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 248 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 472 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 530 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 554 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 128 KiB

View File

@@ -177,14 +177,7 @@ class MyCustomCrew:
# Your crew implementation...
```
이것이 바로 CrewAI의 내장 `agentops_listener`가 등록되는 방식과 동일합니다. CrewAI 코드베이스에서는 다음과 같이 되어 있습니다:
```python
# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener
```
이렇게 하면 `crewai.utilities.events` 패키지가 임포트될 때 `agentops_listener`가 자동으로 로드됩니다.
이것이 CrewAI 코드베이스에서 서드파티 이벤트 리스너가 등록되는 방식입니다.
## 사용 가능한 이벤트 유형
@@ -280,77 +273,6 @@ CrewAI는 여러분이 청취할 수 있는 다양한 이벤트를 제공합니
추가 필드는 이벤트 타입에 따라 다릅니다. 예를 들어, `CrewKickoffCompletedEvent`에는 `crew_name`과 `output` 필드가 포함됩니다.
## 실제 예시: AgentOps와의 통합
CrewAI는 AI 에이전트를 위한 모니터링 및 관찰 플랫폼인 [AgentOps](https://github.com/AgentOps-AI/agentops)와의 서드파티 통합 예시를 포함하고 있습니다. 구현 방식은 다음과 같습니다:
```python
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
```
이 listener는 crew가 시작될 때 AgentOps 세션을 초기화하고, agent를 AgentOps에 등록하며, 도구 사용을 추적하고, crew가 완료되면 세션을 종료합니다.
AgentOps listener는 `src/crewai/utilities/events/third_party/__init__.py` 파일의 import를 통해 CrewAI 이벤트 시스템에 등록됩니다:
```python
from .agentops_listener import agentops_listener
```
이렇게 하면 `crewai.utilities.events` 패키지가 import될 때 `agentops_listener`가 로드되는 것이 보장됩니다.
## 고급 사용법: Scoped Handlers

View File

@@ -0,0 +1,103 @@
---
title: "역할 기반 접근 제어 (RBAC)"
description: "역할과 자동화별 가시성으로 crews, 도구, 데이터 접근을 제어합니다."
icon: "shield"
---
## 개요
CrewAI Enterprise의 RBAC는 **조직 수준 역할**과 **자동화(Automation) 수준 가시성**을 결합하여 안전하고 확장 가능한 접근 제어를 제공합니다.
<Frame>
<img src="/images/enterprise/users_and_roles.png" alt="CrewAI Enterprise RBAC 개요" />
</Frame>
## 사용자와 역할
워크스페이스의 각 구성원은 역할이 있으며, 이는 기능 접근 범위를 결정합니다.
가능한 작업:
- 사전 정의된 역할 사용 (Owner, Member)
- 권한을 세분화한 커스텀 역할 생성
- 설정 화면에서 언제든 역할 할당/변경
설정 위치: Settings → Roles
<Steps>
<Step title="Roles 열기">
<b>Settings → Roles</b>로 이동합니다.
</Step>
<Step title="역할 선택">
<b>Owner</b> 또는 <b>Member</b>를 사용하거나 <b>Create role</b>로 커스텀 역할을 만듭니다.
</Step>
<Step title="멤버에 할당">
사용자들을 선택하여 역할을 지정합니다. 언제든 변경할 수 있습니다.
</Step>
</Steps>
### 구성 요약
| 영역 | 위치 | 옵션 |
|:---|:---|:---|
| 사용자 & 역할 | Settings → Roles | Owner, Member; 커스텀 역할 |
| 자동화 가시성 | Automation → Settings → Visibility | Private; 사용자/역할 화이트리스트 |
## 자동화 수준 접근 제어
조직 역할과 별개로, **Automations**는 사용자/역할별로 특정 자동화 접근을 제한하는 가시성 설정을 제공합니다.
유용한 경우:
- 민감/실험 자동화를 비공개로 유지
- 대규모 팀/외부 협업에서 가시성 관리
- 격리된 컨텍스트에서 자동화 테스트
Private 모드에서는 화이트리스트에 포함된 사용자/역할만 다음 작업이 가능합니다:
- 자동화 보기
- 실행/API 사용
- 로그, 메트릭, 설정 접근
조직 Owner는 항상 접근 가능하며, 가시성 설정에 영향을 받지 않습니다.
설정 위치: Automation → Settings → Visibility
<Steps>
<Step title="Visibility 탭 열기">
<b>Automation → Settings → Visibility</b>로 이동합니다.
</Step>
<Step title="가시성 설정">
<b>Private</b>를 선택합니다. Owner는 항상 접근 가능합니다.
</Step>
<Step title="허용 대상 추가">
보기/실행/로그·메트릭·설정 접근이 가능한 사용자/역할을 추가합니다.
</Step>
<Step title="저장 및 확인">
저장 후, 목록에 없는 사용자가 보거나 실행할 수 없는지 확인합니다.
</Step>
</Steps>
### Private 모드 접근 결과
| 동작 | Owner | 화이트리스트 사용자/역할 | 비포함 |
|:---|:---|:---|:---|
| 자동화 보기 | ✓ | ✓ | ✗ |
| 실행/API | ✓ | ✓ | ✗ |
| 로그/메트릭/설정 | ✓ | ✓ | ✗ |
<Tip>
Owner는 항상 접근 가능하며, Private 모드에서는 화이트리스트에 포함된 사용자/역할만 권한이 부여됩니다.
</Tip>
<Frame>
<img src="/images/enterprise/visibility.png" alt="CrewAI Enterprise 가시성 설정" />
</Frame>
<Card title="도움이 필요하신가요?" icon="headset" href="mailto:support@crewai.com">
RBAC 구성과 점검에 대한 지원이 필요하면 연락해 주세요.
</Card>

View File

@@ -0,0 +1,22 @@
---
title: CrewAI Cookbooks
description: 패턴을 빠르게 익히기 위한 기능 중심 Quickstarts와 노트북.
icon: book
---
## Quickstarts & Demos
<CardGroup cols={2}>
<Card title="Quickstarts 저장소" icon="bolt" href="https://github.com/crewAIInc/crewAI-quickstarts">
특정 CrewAI 기능을 보여주는 데모와 소규모 프로젝트.
</Card>
<Card title="예시의 노트북" icon="book-open" href="https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks">
실습을 위한 인터랙티브 노트북.
</Card>
</CardGroup>
<Tip>
Cookbooks로 패턴을 빠르게 익힌 뒤, 프로덕션급 구현은 Full Examples에서 확인하세요.
</Tip>

View File

@@ -1,62 +1,85 @@
---
title: CrewAI 예시
description: CrewAI 프레임워크를 사용하여 워크플로우를 자동화하는 방법을 보여주는 예시 모음입니다.
description: Crews, Flows, 통합, Notebooks로 구성된 예시 모음입니다.
icon: rocket-launch
---
## Crews
<CardGroup cols={3}>
<Card
title="마케팅 전략"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/marketing_strategy"
icon="bullhorn"
iconType="solid"
>
CrewAI로 마케팅 전략 생성을 자동화하세요.
<Card title="마케팅 전략" icon="bullhorn" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/marketing_strategy">
다중 에이전트 마케팅 캠페인 기획.
</Card>
<Card title="깜짝 여행" icon="plane" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/surprise_trip">
개인화된 여행 계획.
</Card>
<Card title="프로필-포지션 매칭" icon="id-card" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/match_profile_to_positions">
벡터 검색 기반 이력서 매칭.
</Card>
<Card title="채용 공고" icon="newspaper" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/job-posting">
채용 공고 자동 생성.
</Card>
<Card title="게임 빌더 Crew" icon="gamepad" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/game-builder-crew">
파이썬 게임을 설계·구축하는 멀티 에이전트 팀.
</Card>
<Card title="리크루팅" icon="user-group" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/recruitment">
후보자 소싱 및 평가.
</Card>
<Card title="모든 Crews 보기" icon="users" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews">
전체 crew 예시 목록.
</Card>
</CardGroup>
## Flows
<CardGroup cols={3}>
<Card title="Content Creator Flow" icon="pen" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/content_creator_flow">
라우팅 기반 콘텐츠 생성.
</Card>
<Card title="이메일 자동 응답" icon="envelope" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/email_auto_responder_flow">
이메일 모니터링과 자동 응답.
</Card>
<Card title="리드 점수 Flow" icon="chart-line" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/lead_score_flow">
휴먼‑인‑더‑루프 리드 평가.
</Card>
<Card title="미팅 어시스턴트 Flow" icon="calendar" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/meeting_assistant_flow">
노트 처리 및 연동.
</Card>
<Card title="Self Evaluation Loop" icon="rotate" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/self_evaluation_loop_flow">
반복적 자가 개선 워크플로우.
</Card>
<Card title="책 쓰기 (Flows)" icon="book" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/write_a_book_with_flows">
병렬 챕터 생성.
</Card>
<Card title="모든 Flows 보기" icon="diagram-project" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows">
전체 flow 예시 목록.
</Card>
</CardGroup>
## 통합 (Integrations)
<CardGroup cols={3}>
<Card title="CrewAI ↔ LangGraph" icon="link" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations/crewai-langgraph">
LangGraph 프레임워크 연동.
</Card>
<Card title="Azure OpenAI" icon="cloud" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations/azure_model">
Azure OpenAI와 함께 사용.
</Card>
<Card title="NVIDIA 모델" icon="microchip" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations/nvidia_models">
NVIDIA 생태계 연동.
</Card>
<Card title="모든 통합 보기" icon="puzzle-piece" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations">
전체 통합 예시.
</Card>
</CardGroup>
## 노트북 (Notebooks)
<CardGroup cols={2}>
<Card title="Simple QA Crew + Flow" icon="book" href="https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks/Simple%20QA%20Crew%20%2B%20Flow">
Simple QA Crew + Flow.
</Card>
<Card title="모든 노트북" icon="book" href="https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks">
학습과 실험을 위한 인터랙티브 예시 모음.
</Card>
<Card
title="깜짝 여행"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/surprise_trip"
icon="plane"
iconType="duotone"
>
CrewAI로 깜짝 여행 일정표를 만들어보세요.
</Card>
<Card
title="프로필과 포지션 매칭"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/match_profile_to_positions"
icon="linkedin"
iconType="duotone"
>
CrewAI로 프로필을 채용 포지션에 매칭하세요.
</Card>
<Card
title="채용 공고 생성"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/job-posting"
icon="newspaper"
iconType="duotone"
>
CrewAI로 채용 공고를 만드세요.
</Card>
<Card
title="게임 생성기"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/game-builder-crew"
icon="gamepad"
iconType="duotone"
>
CrewAI로 게임을 만들어보세요.
</Card>
<Card
title="채용 후보자 찾기"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/recruitment"
icon="user-group"
iconType="duotone"
>
CrewAI로 채용 후보자를 찾으세요.
</Card>
</CardGroup>

View File

@@ -1,124 +0,0 @@
---
title: AgentOps 통합
description: AgentOps를 사용하여 에이전트 성능을 이해하고 로깅하기
icon: paperclip
---
# 소개
Observability는 대화형 AI 에이전트를 개발하고 배포하는 데 있어 핵심적인 요소입니다. 이는 개발자가 에이전트의 성능을 이해하고, 에이전트가 사용자와 어떻게 상호작용하는지, 그리고 에이전트가 외부 도구와 API를 어떻게 사용하는지를 파악할 수 있게 해줍니다.
AgentOps는 CrewAI와 독립적인 제품으로, 에이전트를 위한 종합적인 observability 솔루션을 제공합니다.
## AgentOps
[AgentOps](https://agentops.ai/?=crew)은 에이전트에 대한 세션 리플레이, 메트릭, 모니터링을 제공합니다.
AgentOps는 높은 수준에서 비용, 토큰 사용량, 대기 시간, 에이전트 실패, 세션 전체 통계 등 다양한 항목을 모니터링할 수 있는 기능을 제공합니다.
더 자세한 내용은 [AgentOps Repo](https://github.com/AgentOps-AI/agentops)를 확인하세요.
### 개요
AgentOps는 개발 및 프로덕션 환경에서 에이전트에 대한 모니터링을 제공합니다.
에이전트 성능, 세션 리플레이, 맞춤형 리포팅을 추적할 수 있는 대시보드를 제공합니다.
또한, AgentOps는 Crew 에이전트 상호작용, LLM 호출, 툴 사용을 실시간으로 볼 수 있는 세션 드릴다운 기능을 제공합니다.
이 기능은 에이전트가 사용자 및 다른 에이전트와 어떻게 상호작용하는지 디버깅하고 이해하는 데 유용합니다.
![선택된 에이전트 세션 실행 시리즈의 개요](/images/agentops-overview.png)
![에이전트 실행을 조사하기 위한 세션 드릴다운 개요](/images/agentops-session.png)
![단계별 에이전트 리플레이 실행 그래프 보기](/images/agentops-replay.png)
### 특징
- **LLM 비용 관리 및 추적**: 기반 모델 공급자와의 지출을 추적합니다.
- **재생 분석**: 단계별 에이전트 실행 그래프를 시청할 수 있습니다.
- **재귀적 사고 감지**: 에이전트가 무한 루프에 빠졌는지 식별합니다.
- **맞춤형 보고서**: 에이전트 성능에 대한 맞춤형 분석을 생성합니다.
- **분석 대시보드**: 개발 및 운영 중인 에이전트에 대한 상위 수준 통계를 모니터링합니다.
- **공개 모델 테스트**: 벤치마크 및 리더보드를 통해 에이전트를 테스트할 수 있습니다.
- **맞춤형 테스트**: 도메인별 테스트로 에이전트를 실행합니다.
- **타임 트래블 디버깅**: 체크포인트에서 세션을 재시작합니다.
- **컴플라이언스 및 보안**: 감사 로그를 생성하고 욕설 및 PII 유출과 같은 잠재적 위협을 감지합니다.
- **프롬프트 인젝션 감지**: 잠재적 코드 인젝션 및 시크릿 유출을 식별합니다.
### AgentOps 사용하기
<Steps>
<Step title="API 키 생성">
사용자 API 키를 여기서 생성하세요: [API 키 생성](https://app.agentops.ai/account)
</Step>
<Step title="환경 설정">
API 키를 환경 변수에 추가하세요:
```bash
AGENTOPS_API_KEY=<YOUR_AGENTOPS_API_KEY>
```
</Step>
<Step title="AgentOps 설치">
다음 명령어로 AgentOps를 설치하세요:
```bash
pip install 'crewai[agentops]'
```
또는
```bash
pip install agentops
```
</Step>
<Step title="AgentOps 초기화">
스크립트에서 `Crew`를 사용하기 전에 다음 코드를 포함하세요:
```python
import agentops
agentops.init()
```
이렇게 하면 AgentOps 세션이 시작되고 Crew 에이전트가 자동으로 추적됩니다. 더 복잡한 agentic 시스템을 구성하는 방법에 대한 자세한 정보는 [AgentOps 문서](https://docs.agentops.ai) 또는 [Discord](https://discord.gg/j4f3KbeH)를 참조하세요.
</Step>
</Steps>
### Crew + AgentOps 예시
<CardGroup cols={3}>
<Card
title="Job Posting"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/job-posting"
icon="briefcase"
iconType="solid"
>
채용 공고를 생성하는 Crew agent의 예시입니다.
</Card>
<Card
title="Markdown Validator"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/markdown_validator"
icon="markdown"
iconType="solid"
>
Markdown 파일을 검증하는 Crew agent의 예시입니다.
</Card>
<Card
title="Instagram Post"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/instagram_post"
icon="square-instagram"
iconType="brands"
>
Instagram 게시물을 생성하는 Crew agent의 예시입니다.
</Card>
</CardGroup>
### 추가 정보
시작하려면 [AgentOps 계정](https://agentops.ai/?=crew)을 생성하세요.
기능 요청이나 버그 보고가 필요하시면 [AgentOps Repo](https://github.com/AgentOps-AI/agentops)에서 AgentOps 팀에 문의해 주세요.
#### 추가 링크
<a href="https://twitter.com/agentopsai/">🐦 트위터</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://discord.gg/JHPt4C7r">📢 디스코드</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://app.agentops.ai/?=crew">🖇️ AgentOps 대시보드</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://docs.agentops.ai/introduction">📙 문서화</a>

View File

@@ -21,9 +21,6 @@ icon: "face-smile"
### 모니터링 & 트레이싱 플랫폼
<CardGroup cols={2}>
<Card title="AgentOps" icon="paperclip" href="/ko/observability/agentops">
에이전트 개발 및 운영을 위한 세션 리플레이, 메트릭, 모니터링 제공.
</Card>
<Card title="LangDB" icon="database" href="/ko/observability/langdb">
자동 에이전트 상호작용 캡처를 포함한 CrewAI 워크플로의 엔드-투-엔드 트레이싱.

View File

@@ -177,14 +177,7 @@ class MyCustomCrew:
# Sua implementação do crew...
```
É exatamente assim que o `agentops_listener` integrado do CrewAI é registrado. No código-fonte do CrewAI, você encontrará:
```python
# src/crewai/utilities/events/third_party/__init__.py
from .agentops_listener import agentops_listener
```
Isso garante que o `agentops_listener` seja carregado quando o pacote `crewai.utilities.events` for importado.
É assim que listeners de eventos de terceiros são registrados no código do CrewAI.
## Tipos de Eventos Disponíveis
@@ -269,77 +262,6 @@ A estrutura do objeto de evento depende do tipo do evento, mas todos herdam de `
Campos adicionais variam pelo tipo de evento. Por exemplo, `CrewKickoffCompletedEvent` inclui os campos `crew_name` e `output`.
## Exemplo Real: Integração com AgentOps
O CrewAI inclui um exemplo de integração com [AgentOps](https://github.com/AgentOps-AI/agentops), uma plataforma de monitoramento e observabilidade para agentes de IA. Veja como é implementado:
```python
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
```
Esse listener inicializa uma sessão do AgentOps quando um Crew inicia, cadastra agentes no AgentOps, rastreia o uso de ferramentas e finaliza a sessão quando o Crew é concluído.
O listener AgentOps é registrado no sistema de eventos do CrewAI via importação em `src/crewai/utilities/events/third_party/__init__.py`:
```python
from .agentops_listener import agentops_listener
```
Isso garante que o `agentops_listener` seja carregado quando o pacote `crewai.utilities.events` for importado.
## Uso Avançado: Handlers Escopados

View File

@@ -0,0 +1,103 @@
---
title: "Controle de Acesso Baseado em Funções (RBAC)"
description: "Controle o acesso a crews, ferramentas e dados com funções e visibilidade por automação."
icon: "shield"
---
## Visão Geral
O RBAC no CrewAI Enterprise permite gerenciar acesso de forma segura e escalável combinando **funções em nível de organização** com **controles de visibilidade em nível de automação**.
<Frame>
<img src="/images/enterprise/users_and_roles.png" alt="Visão geral de RBAC no CrewAI Enterprise" />
</Frame>
## Usuários e Funções
Cada membro da sua workspace possui uma função, que determina o acesso aos recursos.
Você pode:
- Usar funções pré-definidas (Owner, Member)
- Criar funções personalizadas com permissões específicas
- Atribuir funções a qualquer momento no painel de configurações
A configuração de usuários e funções é feita em Settings → Roles.
<Steps>
<Step title="Abrir Roles">
Vá em <b>Settings → Roles</b> no CrewAI Enterprise.
</Step>
<Step title="Escolher a função">
Use <b>Owner</b> ou <b>Member</b>, ou clique em <b>Create role</b> para criar uma função personalizada.
</Step>
<Step title="Atribuir aos membros">
Selecione os usuários e atribua a função. Você pode alterar depois.
</Step>
</Steps>
### Resumo de configuração
| Área | Onde configurar | Opções |
|:---|:---|:---|
| Usuários & Funções | Settings → Roles | Pré-definidas: Owner, Member; Funções personalizadas |
| Visibilidade da automação | Automation → Settings → Visibility | Private; Lista de usuários/funções |
## Controle de Acesso em Nível de Automação
Além das funções na organização, as **Automations** suportam visibilidade refinada para restringir acesso por usuário ou função.
Útil para:
- Manter automações sensíveis/experimentais privadas
- Gerenciar visibilidade em equipes grandes ou colaboradores externos
- Testar automações em contexto isolado
Em modo privado, somente usuários/funções na whitelist poderão:
- Ver a automação
- Executar/usar a API
- Acessar logs, métricas e configurações
O owner da organização sempre tem acesso, independente da visibilidade.
Configure em Automation → Settings → Visibility.
<Steps>
<Step title="Abrir a aba Visibility">
Acesse <b>Automation → Settings → Visibility</b>.
</Step>
<Step title="Definir visibilidade">
Selecione <b>Private</b> para restringir o acesso. O owner mantém acesso.
</Step>
<Step title="Permitir acesso">
Adicione usuários e funções que poderão ver/executar e acessar logs/métricas/configurações.
</Step>
<Step title="Salvar e verificar">
Salve e confirme que não listados não conseguem ver ou executar a automação.
</Step>
</Steps>
### Resultado de acesso no modo Private
| Ação | Owner | Usuário/função na whitelist | Não listado |
|:---|:---|:---|:---|
| Ver automação | ✓ | ✓ | ✗ |
| Executar/API | ✓ | ✓ | ✗ |
| Logs/métricas/configurações | ✓ | ✓ | ✗ |
<Tip>
O owner sempre possui acesso. Em modo privado, somente usuários/funções na whitelist têm permissão.
</Tip>
<Frame>
<img src="/images/enterprise/visibility.png" alt="Configuração de visibilidade no CrewAI Enterprise" />
</Frame>
<Card title="Precisa de Ajuda?" icon="headset" href="mailto:support@crewai.com">
Fale com o nosso time para suporte em configuração e auditoria de RBAC.
</Card>

View File

@@ -0,0 +1,22 @@
---
title: CrewAI Cookbooks
description: Quickstarts e notebooks focados em recursos para aprender padrões rapidamente.
icon: book
---
## Quickstarts & Demos
<CardGroup cols={2}>
<Card title="Repositório de Quickstarts" icon="bolt" href="https://github.com/crewAIInc/crewAI-quickstarts">
Demos e projetos pequenos que mostram capacidades específicas do CrewAI.
</Card>
<Card title="Notebooks nos Exemplos" icon="book-open" href="https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks">
Notebooks interativos para aprendizado prático.
</Card>
</CardGroup>
<Tip>
Use Cookbooks para aprender um padrão rapidamente e, em seguida, avance para os Exemplos completos para implementações de produção.
</Tip>

View File

@@ -1,62 +1,85 @@
---
title: Exemplos CrewAI
description: Uma coleção de exemplos que mostram como usar o framework CrewAI para automatizar fluxos de trabalho.
description: Explore exemplos organizados por Crews, Flows, Integrações e Notebooks.
icon: rocket-launch
---
## Crews
<CardGroup cols={3}>
<Card
title="Estratégia de Marketing"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/marketing_strategy"
icon="bullhorn"
iconType="solid"
>
Automatize a criação de estratégias de marketing com CrewAI.
<Card title="Estratégia de Marketing" icon="bullhorn" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/marketing_strategy">
Planejamento de campanhas com múltiplos agentes.
</Card>
<Card title="Viagem Surpresa" icon="plane" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/surprise_trip">
Planejamento de viagens personalizadas.
</Card>
<Card title="Relacionar Perfil a Posições" icon="id-card" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/match_profile_to_positions">
Correspondência de CV para vagas com busca vetorial.
</Card>
<Card title="Criar Vaga" icon="newspaper" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/job-posting">
Criação automatizada de descrições de vagas.
</Card>
<Card title="Crew Construtor de Jogos" icon="gamepad" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/game-builder-crew">
Equipe multiagente que projeta e constrói jogos em Python.
</Card>
<Card title="Recrutamento" icon="user-group" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews/recruitment">
Prospecção e avaliação de candidatos.
</Card>
<Card title="Ver todos os Crews" icon="users" href="https://github.com/crewAIInc/crewAI-examples/tree/main/crews">
Lista completa de exemplos de crews.
</Card>
</CardGroup>
## Flows
<CardGroup cols={3}>
<Card title="Content Creator Flow" icon="pen" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/content_creator_flow">
Geração de conteúdo com roteamento.
</Card>
<Card title="Email Auto Responder" icon="envelope" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/email_auto_responder_flow">
Monitoramento e respostas de email.
</Card>
<Card title="Lead Score Flow" icon="chart-line" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/lead_score_flow">
Qualificação de leads com revisão humana.
</Card>
<Card title="Meeting Assistant Flow" icon="calendar" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/meeting_assistant_flow">
Processamento de notas com integrações.
</Card>
<Card title="Self Evaluation Loop" icon="rotate" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/self_evaluation_loop_flow">
Fluxos de autoaperfeiçoamento iterativo.
</Card>
<Card title="Escrever um Livro (Flows)" icon="book" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows/write_a_book_with_flows">
Geração paralela de capítulos.
</Card>
<Card title="Ver todos os Flows" icon="diagram-project" href="https://github.com/crewAIInc/crewAI-examples/tree/main/flows">
Lista completa de exemplos de flows.
</Card>
</CardGroup>
## Integrações
<CardGroup cols={3}>
<Card title="CrewAI ↔ LangGraph" icon="link" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations/crewai-langgraph">
Integração com o framework LangGraph.
</Card>
<Card title="Azure OpenAI" icon="cloud" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations/azure_model">
Usando CrewAI com Azure OpenAI.
</Card>
<Card title="Modelos NVIDIA" icon="microchip" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations/nvidia_models">
Integrações com o ecossistema NVIDIA.
</Card>
<Card title="Ver todas as Integrações" icon="puzzle-piece" href="https://github.com/crewAIInc/crewAI-examples/tree/main/integrations">
Todos os exemplos de integrações.
</Card>
</CardGroup>
## Notebooks
<CardGroup cols={2}>
<Card title="Simple QA Crew + Flow" icon="book" href="https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks/Simple%20QA%20Crew%20%2B%20Flow">
Simple QA Crew + Flow.
</Card>
<Card title="Todos os Notebooks" icon="book" href="https://github.com/crewAIInc/crewAI-examples/tree/main/Notebooks">
Exemplos interativos para aprendizado e experimentação.
</Card>
<Card
title="Viagem Surpresa"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/surprise_trip"
icon="plane"
iconType="duotone"
>
Crie um roteiro de viagem surpresa com CrewAI.
</Card>
<Card
title="Relacionar Perfil a Posições"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/match_profile_to_positions"
icon="linkedin"
iconType="duotone"
>
Relacione um perfil a vagas de emprego com CrewAI.
</Card>
<Card
title="Criar Vaga"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/job-posting"
icon="newspaper"
iconType="duotone"
>
Crie uma vaga de emprego com CrewAI.
</Card>
<Card
title="Gerador de Jogos"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/game-builder-crew"
icon="gamepad"
iconType="duotone"
>
Crie um jogo com CrewAI.
</Card>
<Card
title="Encontrar Candidatos"
color="#F3A78B"
href="https://github.com/crewAIInc/crewAI-examples/tree/main/recruitment"
icon="user-group"
iconType="duotone"
>
Encontre candidatos a vagas com CrewAI.
</Card>
</CardGroup>

View File

@@ -1,126 +0,0 @@
---
title: Integração com AgentOps
description: Entendendo e registrando a performance do seu agente com AgentOps.
icon: paperclip
---
# Introdução
Observabilidade é um aspecto fundamental no desenvolvimento e implantação de agentes de IA conversacional. Ela permite que desenvolvedores compreendam como seus agentes estão performando,
como eles estão interagindo com os usuários e como utilizam ferramentas externas e APIs.
AgentOps é um produto independente do CrewAI que fornece uma solução completa de observabilidade para agentes.
## AgentOps
[AgentOps](https://agentops.ai/?=crew) oferece replay de sessões, métricas e monitoramento para agentes.
Em um alto nível, o AgentOps oferece a capacidade de monitorar custos, uso de tokens, latência, falhas do agente, estatísticas de sessão e muito mais.
Para mais informações, confira o [Repositório do AgentOps](https://github.com/AgentOps-AI/agentops).
### Visão Geral
AgentOps fornece monitoramento para agentes em desenvolvimento e produção.
Disponibiliza um dashboard para acompanhamento de performance dos agentes, replay de sessões e relatórios personalizados.
Além disso, o AgentOps traz análises detalhadas das sessões para visualizar interações do agente Crew, chamadas LLM e uso de ferramentas em tempo real.
Esse recurso é útil para depuração e entendimento de como os agentes interagem com usuários e entre si.
![Visão geral de uma série selecionada de execuções de sessões do agente](/images/agentops-overview.png)
![Visão geral das análises detalhadas de sessões para examinar execuções de agentes](/images/agentops-session.png)
![Visualizando um gráfico de execução passo a passo do replay do agente](/images/agentops-replay.png)
### Funcionalidades
- **Gerenciamento e Rastreamento de Custos de LLM**: Acompanhe gastos com provedores de modelos fundamentais.
- **Análises de Replay**: Assista gráficos de execução do agente, passo a passo.
- **Detecção de Pensamento Recursivo**: Identifique quando agentes entram em loops infinitos.
- **Relatórios Personalizados**: Crie análises customizadas sobre a performance dos agentes.
- **Dashboard Analítico**: Monitore estatísticas gerais de agentes em desenvolvimento e produção.
- **Teste de Modelos Públicos**: Teste seus agentes em benchmarks e rankings.
- **Testes Personalizados**: Execute seus agentes em testes específicos de domínio.
- **Depuração com Viagem no Tempo**: Reinicie suas sessões a partir de checkpoints.
- **Conformidade e Segurança**: Crie registros de auditoria e detecte possíveis ameaças como uso de palavrões e vazamento de dados pessoais.
- **Detecção de Prompt Injection**: Identifique possíveis injeções de código e vazamentos de segredos.
### Utilizando o AgentOps
<Steps>
<Step title="Crie uma Chave de API">
Crie uma chave de API de usuário aqui: [Create API Key](https://app.agentops.ai/account)
</Step>
<Step title="Configure seu Ambiente">
Adicione sua chave API nas variáveis de ambiente:
```bash
AGENTOPS_API_KEY=<YOUR_AGENTOPS_API_KEY>
```
</Step>
<Step title="Instale o AgentOps">
Instale o AgentOps com:
```bash
pip install 'crewai[agentops]'
```
ou
```bash
pip install agentops
```
</Step>
<Step title="Inicialize o AgentOps">
Antes de utilizar o `Crew` no seu script, inclua estas linhas:
```python
import agentops
agentops.init()
```
Isso irá iniciar uma sessão do AgentOps e também rastrear automaticamente os agentes Crew. Para mais detalhes sobre como adaptar sistemas de agentes mais complexos,
confira a [documentação do AgentOps](https://docs.agentops.ai) ou participe do [Discord](https://discord.gg/j4f3KbeH).
</Step>
</Steps>
### Exemplos de Crew + AgentOps
<CardGroup cols={3}>
<Card
title="Vaga de Emprego"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/job-posting"
icon="briefcase"
iconType="solid"
>
Exemplo de um agente Crew que gera vagas de emprego.
</Card>
<Card
title="Validador de Markdown"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/markdown_validator"
icon="markdown"
iconType="solid"
>
Exemplo de um agente Crew que valida arquivos Markdown.
</Card>
<Card
title="Post no Instagram"
color="#F3A78B"
href="https://github.com/joaomdmoura/crewAI-examples/tree/main/instagram_post"
icon="square-instagram"
iconType="brands"
>
Exemplo de um agente Crew que gera posts para Instagram.
</Card>
</CardGroup>
### Mais Informações
Para começar, crie uma [conta AgentOps](https://agentops.ai/?=crew).
Para sugestões de funcionalidades ou relatos de bugs, entre em contato com o time do AgentOps pelo [Repositório do AgentOps](https://github.com/AgentOps-AI/agentops).
#### Links Extras
<a href="https://twitter.com/agentopsai/">🐦 Twitter</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://discord.gg/JHPt4C7r">📢 Discord</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://app.agentops.ai/?=crew">🖇️ Dashboard AgentOps</a>
<span>&nbsp;&nbsp;•&nbsp;&nbsp;</span>
<a href="https://docs.agentops.ai/introduction">📙 Documentação</a>

View File

@@ -21,9 +21,6 @@ A observabilidade é fundamental para entender como seus agentes CrewAI estão d
### Plataformas de Monitoramento e Rastreamento
<CardGroup cols={2}>
<Card title="AgentOps" icon="paperclip" href="/pt-BR/observability/agentops">
Replays de sessões, métricas e monitoramento para desenvolvimento e produção de agentes.
</Card>
<Card title="LangDB" icon="database" href="/pt-BR/observability/langdb">
Rastreamento ponta a ponta para fluxos de trabalho CrewAI com captura automática de interações de agentes.

View File

@@ -0,0 +1,145 @@
---
title: Integração com a TrueFoundry
icon: chart-line
---
A TrueFoundry fornece um [AI Gateway](https://www.truefoundry.com/ai-gateway) pronto para uso empresarial, que pode ser usado para governança e observabilidade em frameworks agentivos como o CrewAI. O AI Gateway da TrueFoundry funciona como uma interface unificada para acesso a LLMs, oferecendo:
- **Acesso unificado à API**: Conecte-se a 250+ LLMs (OpenAI, Claude, Gemini, Groq, Mistral) por meio de uma única API
- **Baixa latência**: Latência interna abaixo de 3 ms com roteamento inteligente e balanceamento de carga
- **Segurança corporativa**: Conformidade com SOC 2, HIPAA e GDPR, com RBAC e auditoria de logs
- **Gestão de cotas e custos**: Cotas baseadas em tokens, rate limiting e rastreamento abrangente de uso
- **Observabilidade**: Registro completo de requisições/respostas, métricas e traces com retenção personalizável
## Como a TrueFoundry se integra ao CrewAI
### Instalação e configuração
<Steps>
<Step title="Instalar o CrewAI">
```bash
pip install crewai
```
</Step>
<Step title="Obter o token de acesso da TrueFoundry">
1. Crie uma conta na [TrueFoundry](https://www.truefoundry.com/register)
2. Siga os passos do [Início rápido](https://docs.truefoundry.com/gateway/quick-start)
</Step>
<Step title="Configurar o CrewAI com a TrueFoundry">
![Configuração de código da TrueFoundry](/images/new-code-snippet.png)
```python
from crewai import LLM
# Criar uma instância de LLM com o AI Gateway da TrueFoundry
truefoundry_llm = LLM(
model="openai-main/gpt-4o", # Da mesma forma, você pode chamar qualquer modelo de qualquer provedor
base_url="your_truefoundry_gateway_base_url",
api_key="your_truefoundry_api_key"
)
# Usar nos seus agentes do CrewAI
from crewai import Agent
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
llm=truefoundry_llm,
verbose=True
)
```
</Step>
</Steps>
### Exemplo completo do CrewAI
```python
from crewai import Agent, Task, Crew, LLM
# Configurar o LLM com a TrueFoundry
llm = LLM(
model="openai-main/gpt-4o",
base_url="your_truefoundry_gateway_base_url",
api_key="your_truefoundry_api_key"
)
# Criar agentes
researcher = Agent(
role='Analista de Pesquisa',
goal='Conduzir pesquisa de mercado detalhada',
backstory='Analista de mercado especialista com atenção aos detalhes',
llm=llm,
verbose=True
)
writer = Agent(
role='Redator de Conteúdo',
goal='Criar relatórios abrangentes',
backstory='Redator técnico experiente',
llm=llm,
verbose=True
)
# Criar tarefas
research_task = Task(
description='Pesquisar tendências do mercado de IA para 2024',
agent=researcher,
expected_output='Resumo de pesquisa abrangente'
)
writing_task = Task(
description='Criar um relatório de pesquisa de mercado',
agent=writer,
expected_output='Relatório bem estruturado com insights',
context=[research_task]
)
# Criar e executar a crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
verbose=True
)
result = crew.kickoff()
```
### Observabilidade e governança
Monitore seus agentes do CrewAI pela aba de métricas da TrueFoundry:
![Métricas da TrueFoundry](/images/gateway-metrics.png)
Com o AI Gateway da TrueFoundry, você pode monitorar e analisar:
- **Métricas de desempenho**: Acompanhe métricas-chave de latência como Latência da Requisição, Tempo até o Primeiro Token (TTFS) e Latência entre Tokens (ITL), com percentis P99, P90 e P50
- **Custos e uso de tokens**: Tenha visibilidade dos custos da sua aplicação com detalhamento de tokens de entrada/saída e das despesas associadas a cada modelo
- **Padrões de uso**: Entenda como sua aplicação está sendo utilizada com análises detalhadas sobre atividade de usuários, distribuição de modelos e uso por equipe
- **Limite de taxa e balanceamento de carga**: Você pode configurar rate limiting, balanceamento de carga e fallback para seus modelos
## Rastreamento
Para uma compreensão mais detalhada sobre rastreamento, consulte [getting-started-tracing](https://docs.truefoundry.com/docs/tracing/tracing-getting-started). Para rastreamento, você pode adicionar o SDK do Traceloop:
```bash
pip install traceloop-sdk
```
```python
from traceloop.sdk import Traceloop
# Inicializar rastreamento avançado
Traceloop.init(
api_endpoint="https://your-truefoundry-endpoint/api/tracing",
headers={
"Authorization": f"Bearer {your_truefoundry_pat_token}",
"TFY-Tracing-Project": "your_project_name",
},
)
```
Isso oferece correlação adicional de rastreamentos em todo o seu fluxo de trabalho com o CrewAI.
![Rastreamento do CrewAI na TrueFoundry](/images/tracing_crewai.png)

View File

@@ -1,103 +0,0 @@
#!/usr/bin/env python3
"""
Example demonstrating encrypted agent-to-agent communication in CrewAI.
This example shows how to:
1. Enable encrypted communication for agents
2. Use existing agent tools with encryption
3. Verify that communication is encrypted between agents
"""
from crewai import Agent, Crew, Task
from crewai.security import SecurityConfig
from crewai.tools.agent_tools.ask_question_tool import AskQuestionTool
def main():
"""Demonstrate encrypted agent communication."""
print("🔒 CrewAI Encrypted Agent Communication Example")
print("=" * 50)
# Create agents with encrypted communication enabled
print("Creating agents with encrypted communication...")
# Researcher agent with encryption enabled
researcher = Agent(
role="Senior Research Analyst",
goal="Conduct thorough research and analysis",
backstory="You are an expert researcher with years of experience in data analysis.",
security_config=SecurityConfig(encrypted_communication=True),
verbose=True
)
# Writer agent with encryption enabled
writer = Agent(
role="Content Writer",
goal="Create compelling content based on research",
backstory="You are a skilled writer who transforms complex research into engaging content.",
security_config=SecurityConfig(encrypted_communication=True),
verbose=True
)
print(f"✓ Researcher agent created with encryption: {researcher.security_config.encrypted_communication}")
print(f"✓ Writer agent created with encryption: {writer.security_config.encrypted_communication}")
print(f"✓ Researcher fingerprint: {researcher.security_config.fingerprint.uuid_str[:8]}...")
print(f"✓ Writer fingerprint: {writer.security_config.fingerprint.uuid_str[:8]}...")
# Create agent tools - these will automatically use encryption when available
agent_tools = [
AskQuestionTool(
agents=[researcher, writer],
description="Tool for asking questions to coworkers with encrypted communication support"
)
]
print(f"✓ Agent tools created with encryption capability")
# Create tasks that will involve encrypted communication
research_task = Task(
description="Research the latest trends in artificial intelligence and machine learning",
expected_output="A comprehensive research report on AI/ML trends",
agent=researcher
)
writing_task = Task(
description="Ask the researcher about their findings and write a blog post",
expected_output="An engaging blog post about AI trends",
agent=writer,
tools=agent_tools
)
# Create crew with both agents
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, writing_task],
verbose=True
)
print("\n🚀 Starting crew execution with encrypted communication...")
print("Note: Agent communications will be automatically encrypted!")
# Execute the crew - agent tools will use encryption automatically
try:
result = crew.kickoff()
print("\n✅ Crew execution completed successfully!")
print("=" * 50)
print("RESULT:")
print(result)
except Exception as e:
print(f"\n❌ Execution failed: {e}")
print("This is expected in a demo environment without proper LLM configuration")
print("\n🔍 Key Features Demonstrated:")
print("- Agents created with SecurityConfig(encrypted_communication=True)")
print("- Unique fingerprints generated for each agent")
print("- Agent tools automatically detect encryption capability")
print("- Communication payloads encrypted using Fernet symmetric encryption")
print("- Keys derived from agent fingerprints for secure communication")
print("- Backward compatibility maintained - non-encrypted agents still work")
if __name__ == "__main__":
main()

View File

@@ -48,11 +48,10 @@ Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = ["crewai-tools~=0.60.0"]
tools = ["crewai-tools~=0.62.0"]
embeddings = [
"tiktoken~=0.8.0"
]
agentops = ["agentops==0.3.18"]
pdfplumber = [
"pdfplumber>=0.11.4",
]

View File

@@ -54,7 +54,7 @@ def _track_install_async():
_track_install_async()
__version__ = "0.157.0"
__version__ = "0.159.0"
__all__ = [
"Agent",
"Crew",

View File

@@ -1,7 +1,7 @@
import shutil
import subprocess
import time
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Type, Union
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple, Type, Union, cast
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
@@ -399,8 +399,13 @@ class Agent(BaseAgent):
),
)
tools = tools or self.tools or []
self.create_agent_executor(tools=tools, task=task)
if tools is not None:
agent_tools: List[Union[BaseTool, dict]] = cast(List[Union[BaseTool, dict]], tools)
elif self.tools is not None:
agent_tools = cast(List[Union[BaseTool, dict]], self.tools)
else:
agent_tools = []
self.create_agent_executor(tools=agent_tools, task=task)
if self.crew and self.crew._train:
task_prompt = self._training_handler(task_prompt=task_prompt)
@@ -537,14 +542,14 @@ class Agent(BaseAgent):
)["output"]
def create_agent_executor(
self, tools: Optional[List[BaseTool]] = None, task=None
self, tools: Optional[List[Union[BaseTool, dict]]] = None, task=None
) -> None:
"""Create an agent executor for the agent.
Returns:
An instance of the CrewAgentExecutor class.
"""
raw_tools: List[BaseTool] = tools or self.tools or []
raw_tools: List[Union[BaseTool, dict]] = tools or self.tools or []
parsed_tools = parse_tools(raw_tools)
prompt = Prompts(
@@ -803,7 +808,7 @@ class Agent(BaseAgent):
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=self.tools or [],
tools=[tool for tool in (self.tools or []) if isinstance(tool, BaseTool)],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,
@@ -841,7 +846,7 @@ class Agent(BaseAgent):
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=self.tools or [],
tools=[tool for tool in (self.tools or []) if isinstance(tool, BaseTool)],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,

View File

@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from pydantic import PrivateAttr
@@ -25,11 +25,11 @@ class BaseAgentAdapter(BaseAgent, ABC):
self._agent_config = agent_config
@abstractmethod
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
def configure_tools(self, tools: Optional[List[Union[BaseTool, dict]]] = None) -> None:
"""Configure and adapt tools for the specific agent implementation.
Args:
tools: Optional list of BaseTool instances to be configured
tools: Optional list of BaseTool instances and raw tool definitions to be configured
"""
pass

View File

@@ -1,4 +1,4 @@
from typing import Any, AsyncIterable, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from pydantic import Field, PrivateAttr
@@ -22,7 +22,6 @@ from crewai.utilities.events.agent_events import (
)
try:
from langchain_core.messages import ToolMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
@@ -128,7 +127,8 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
tools: Optional[List[BaseTool]] = None,
) -> str:
"""Execute a task using the LangGraph workflow."""
self.create_agent_executor(tools)
mixed_tools: Optional[List[Union[BaseTool, dict]]] = tools # type: ignore[assignment]
self.create_agent_executor(mixed_tools)
self.configure_structured_output(task)
@@ -198,17 +198,20 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
)
raise
def create_agent_executor(self, tools: Optional[List[BaseTool]] = None) -> None:
def create_agent_executor(self, tools: Optional[List[Union[BaseTool, dict]]] = None) -> None:
"""Configure the LangGraph agent for execution."""
self.configure_tools(tools)
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
def configure_tools(self, tools: Optional[List[Union[BaseTool, dict]]] = None) -> None:
"""Configure tools for the LangGraph agent."""
if tools:
all_tools = list(self.tools or []) + list(tools or [])
self._tool_adapter.configure_tools(all_tools)
available_tools = self._tool_adapter.tools()
self._graph.tools = available_tools
base_tools = [tool for tool in tools if isinstance(tool, BaseTool)]
existing_base_tools = [tool for tool in (self.tools or []) if isinstance(tool, BaseTool)]
all_tools = existing_base_tools + base_tools
if all_tools:
self._tool_adapter.configure_tools(all_tools)
available_tools = self._tool_adapter.tools()
self._graph.tools = available_tools
def get_delegation_tools(self, agents: List[BaseAgent]) -> List[BaseTool]:
"""Implement delegation tools support for LangGraph."""

View File

@@ -1,4 +1,4 @@
from typing import Any, List, Optional
from typing import Any, List, Optional, Union
from pydantic import Field, PrivateAttr
@@ -152,10 +152,12 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
self.agent_executor = Runner
def configure_tools(self, tools: Optional[List[BaseTool]] = None) -> None:
def configure_tools(self, tools: Optional[List[Union[BaseTool, dict]]] = None) -> None:
"""Configure tools for the OpenAI Assistant"""
if tools:
self._tool_adapter.configure_tools(tools)
base_tools = [tool for tool in tools if isinstance(tool, BaseTool)]
if base_tools:
self._tool_adapter.configure_tools(base_tools)
if self._tool_adapter.converted_tools:
self._openai_agent.tools = self._tool_adapter.converted_tools

View File

@@ -1,9 +1,8 @@
import logging
import uuid
from abc import ABC, abstractmethod
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, TypeVar
from typing import Any, Callable, Dict, List, Optional, TypeVar, Union
from pydantic import (
UUID4,
@@ -26,13 +25,10 @@ from crewai.security.security_config import SecurityConfig
from crewai.tools.base_tool import BaseTool, Tool
from crewai.utilities import I18N, Logger, RPMController
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter
from crewai.utilities.string_utils import interpolate_only
T = TypeVar("T", bound="BaseAgent")
logger = logging.getLogger(__name__)
class BaseAgent(ABC, BaseModel):
"""Abstract Base Class for all third party agents compatible with CrewAI.
@@ -111,7 +107,7 @@ class BaseAgent(ABC, BaseModel):
default=False,
description="Enable agent to delegate and ask questions among each other.",
)
tools: Optional[List[BaseTool]] = Field(
tools: Optional[List[Union[BaseTool, dict]]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
max_iter: int = Field(
@@ -171,29 +167,34 @@ class BaseAgent(ABC, BaseModel):
@field_validator("tools")
@classmethod
def validate_tools(cls, tools: List[Any]) -> List[BaseTool]:
def validate_tools(cls, tools: List[Any]) -> List[Union[BaseTool, dict]]:
"""Validate and process the tools provided to the agent.
This method ensures that each tool is either an instance of BaseTool
or an object with 'name', 'func', and 'description' attributes. If the
tool meets these criteria, it is processed and added to the list of
tools. Otherwise, a ValueError is raised.
This method ensures that each tool is either an instance of BaseTool,
an object with 'name', 'func', and 'description' attributes, or a
raw tool definition (dict) for hosted/server-side tools.
"""
if not tools:
return []
processed_tools = []
processed_tools: List[Union[BaseTool, dict]] = []
required_attrs = ["name", "func", "description"]
for tool in tools:
if isinstance(tool, BaseTool):
processed_tools.append(tool)
elif isinstance(tool, dict):
if "name" not in tool:
raise ValueError(
f"Raw tool definition must have a 'name' field: {tool}"
)
processed_tools.append(tool)
elif all(hasattr(tool, attr) for attr in required_attrs):
# Tool has the required attributes, create a Tool instance
processed_tools.append(Tool.from_langchain(tool))
else:
raise ValueError(
f"Invalid tool type: {type(tool)}. "
"Tool must be an instance of BaseTool or "
"Tool must be an instance of BaseTool, a dict for hosted tools, or "
"an object with 'name', 'func', and 'description' attributes."
)
return processed_tools
@@ -220,12 +221,6 @@ class BaseAgent(ABC, BaseModel):
if self.security_config is None:
self.security_config = SecurityConfig()
# Log encryption status for agent initialization
if hasattr(self.security_config, 'encrypted_communication') and self.security_config.encrypted_communication:
logger.info(f"Agent '{self.role}' initialized with encrypted communication enabled (fingerprint: {self.security_config.fingerprint.uuid_str[:8]}...)")
else:
logger.debug(f"Agent '{self.role}' initialized with encrypted communication disabled")
return self
@field_validator("id", mode="before")

View File

@@ -1,5 +1,5 @@
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Dict, List
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
@@ -21,6 +21,7 @@ class CrewAgentExecutorMixin:
task: "Task"
iterations: int
max_iter: int
messages: List[Dict[str, str]]
_i18n: I18N
_printer: Printer = Printer()
@@ -62,6 +63,7 @@ class CrewAgentExecutorMixin:
value=output.text,
metadata={
"description": self.task.description,
"messages": self.messages,
},
agent=self.agent.role,
)
@@ -127,7 +129,6 @@ class CrewAgentExecutorMixin:
def _ask_human_input(self, final_answer: str) -> str:
"""Prompt human input with mode-appropriate messaging."""
event_listener.formatter.pause_live_updates()
try:
self._printer.print(
content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"

View File

@@ -48,7 +48,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
agent: BaseAgent,
prompt: dict[str, str],
max_iter: int,
tools: List[CrewStructuredTool],
tools: List[Union[CrewStructuredTool, dict]],
tools_names: str,
stop_words: List[str],
tools_description: str,
@@ -84,8 +84,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.messages: List[Dict[str, str]] = []
self.iterations = 0
self.log_error_after = 3
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool]] = {
tool.name: tool for tool in self.tools
self.tool_name_to_tool_map: Dict[str, Union[CrewStructuredTool, BaseTool, dict]] = {
tool.name if hasattr(tool, 'name') else tool.get('name', 'unknown'): tool for tool in self.tools
}
existing_stop = self.llm.stop or []
self.llm.stop = list(

View File

@@ -1,9 +1,13 @@
from .utils import TokenManager
class AuthError(Exception):
pass
def get_auth_token() -> str:
"""Get the authentication token."""
access_token = TokenManager().get_token()
if not access_token:
raise Exception("No token found, make sure you are logged in")
raise AuthError("No token found, make sure you are logged in")
return access_token

View File

@@ -18,6 +18,7 @@ class PlusAPI:
CREWS_RESOURCE = "/crewai_plus/api/v1/crews"
AGENTS_RESOURCE = "/crewai_plus/api/v1/agents"
TRACING_RESOURCE = "/crewai_plus/api/v1/tracing"
EPHEMERAL_TRACING_RESOURCE = "/crewai_plus/api/v1/tracing/ephemeral"
def __init__(self, api_key: str) -> None:
self.api_key = api_key
@@ -124,6 +125,11 @@ class PlusAPI:
"POST", f"{self.TRACING_RESOURCE}/batches", json=payload
)
def initialize_ephemeral_trace_batch(self, payload) -> requests.Response:
return self._make_request(
"POST", f"{self.EPHEMERAL_TRACING_RESOURCE}/batches", json=payload
)
def send_trace_events(self, trace_batch_id: str, payload) -> requests.Response:
return self._make_request(
"POST",
@@ -131,9 +137,27 @@ class PlusAPI:
json=payload,
)
def send_ephemeral_trace_events(
self, trace_batch_id: str, payload
) -> requests.Response:
return self._make_request(
"POST",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
)
def finalize_trace_batch(self, trace_batch_id: str, payload) -> requests.Response:
return self._make_request(
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
)
def finalize_ephemeral_trace_batch(
self, trace_batch_id: str, payload
) -> requests.Response:
return self._make_request(
"PATCH",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
)

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.157.0,<1.0.0"
"crewai[tools]>=0.159.0,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.157.0,<1.0.0",
"crewai[tools]>=0.159.0,<1.0.0",
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.157.0"
"crewai[tools]>=0.159.0"
]
[tool.crewai]

View File

@@ -1,6 +1,5 @@
import asyncio
import json
import logging
import re
import uuid
import warnings
@@ -78,7 +77,10 @@ from crewai.utilities.events.listeners.tracing.trace_listener import (
)
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.events.listeners.tracing.utils import (
is_tracing_enabled,
on_first_execution_tracing_confirmation,
)
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -90,8 +92,6 @@ from crewai.utilities.training_handler import CrewTrainingHandler
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
logger = logging.getLogger(__name__)
class Crew(FlowTrackable, BaseModel):
"""
@@ -286,8 +286,11 @@ class Crew(FlowTrackable, BaseModel):
self._cache_handler = CacheHandler()
event_listener = EventListener()
if on_first_execution_tracing_confirmation():
self.tracing = True
if is_tracing_enabled() or self.tracing:
trace_listener = TraceCollectionListener(tracing=self.tracing)
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)
event_listener.verbose = self.verbose
event_listener.formatter.verbose = self.verbose
@@ -384,20 +387,6 @@ class Crew(FlowTrackable, BaseModel):
self._setup_from_config()
if self.agents:
# Count agents with encryption enabled
encryption_enabled_agents = [
agent for agent in self.agents
if hasattr(agent, 'security_config')
and agent.security_config
and getattr(agent.security_config, 'encrypted_communication', False)
]
if encryption_enabled_agents:
logger.info(f"Crew initialized with {len(encryption_enabled_agents)} agent(s) having encrypted communication enabled: {[agent.role for agent in encryption_enabled_agents]}")
logger.info("Agent-to-agent communication will be automatically encrypted when using delegation tools")
else:
logger.debug(f"Crew initialized with {len(self.agents)} agent(s) - encrypted communication disabled for all agents")
for agent in self.agents:
if self.cache:
agent.set_cache_handler(self._cache_handler)

View File

@@ -43,7 +43,9 @@ class ToolSelectionEvaluator(BaseEvaluator):
available_tools_info = ""
if agent.tools:
for tool in agent.tools:
available_tools_info += f"- {tool.name}: {tool.description}\n"
tool_name = tool.name if hasattr(tool, 'name') else tool.get('name', 'unknown')
tool_desc = tool.description if hasattr(tool, 'description') else tool.get('description', 'No description')
available_tools_info += f"- {tool_name}: {tool_desc}\n"
else:
available_tools_info = "No tools available"

View File

@@ -17,10 +17,13 @@ from typing import (
)
from uuid import uuid4
from opentelemetry import baggage
from opentelemetry.context import attach, detach
from pydantic import BaseModel, Field, ValidationError
from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.types import FlowExecutionData
from crewai.flow.utils import get_possible_return_constants
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.flow_events import (
@@ -35,7 +38,10 @@ from crewai.utilities.events.flow_events import (
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.events.listeners.tracing.utils import (
is_tracing_enabled,
on_first_execution_tracing_confirmation,
)
from crewai.utilities.printer import Printer
logger = logging.getLogger(__name__)
@@ -467,13 +473,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_execution_counts: Dict[str, int] = {}
self._pending_and_listeners: Dict[str, Set[str]] = {}
self._method_outputs: List[Any] = [] # List to store all method outputs
self._completed_methods: Set[str] = set() # Track completed methods for reload
self._persistence: Optional[FlowPersistence] = persistence
# Initialize state with initial values
self._state = self._create_initial_state()
self.tracing = tracing
if is_tracing_enabled() or tracing:
trace_listener = TraceCollectionListener(tracing=tracing)
if (
on_first_execution_tracing_confirmation()
or is_tracing_enabled()
or self.tracing
):
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)
# Apply any additional kwargs
if kwargs:
@@ -718,6 +729,73 @@ class Flow(Generic[T], metaclass=FlowMeta):
else:
raise TypeError(f"State must be dict or BaseModel, got {type(self._state)}")
def reload(self, execution_data: FlowExecutionData) -> None:
"""Reloads the flow from an execution data dict.
This method restores the flow's execution ID, completed methods, and state,
allowing it to resume from where it left off.
Args:
execution_data: Flow execution data containing:
- id: Flow execution ID
- flow: Flow structure
- completed_methods: List of successfully completed methods
- execution_methods: All execution methods with their status
"""
flow_id = execution_data.get("id")
if flow_id:
self._update_state_field("id", flow_id)
self._completed_methods = {
name
for method_data in execution_data.get("completed_methods", [])
if (name := method_data.get("flow_method", {}).get("name")) is not None
}
execution_methods = execution_data.get("execution_methods", [])
if not execution_methods:
return
sorted_methods = sorted(
execution_methods,
key=lambda m: m.get("started_at", ""),
)
state_to_apply = None
for method in reversed(sorted_methods):
if method.get("final_state"):
state_to_apply = method["final_state"]
break
if not state_to_apply and sorted_methods:
last_method = sorted_methods[-1]
if last_method.get("initial_state"):
state_to_apply = last_method["initial_state"]
if state_to_apply:
self._apply_state_updates(state_to_apply)
for i, method in enumerate(sorted_methods[:-1]):
method_name = method.get("flow_method", {}).get("name")
if method_name:
self._completed_methods.add(method_name)
def _update_state_field(self, field_name: str, value: Any) -> None:
"""Update a single field in the state."""
if isinstance(self._state, dict):
self._state[field_name] = value
elif hasattr(self._state, field_name):
object.__setattr__(self._state, field_name, value)
def _apply_state_updates(self, updates: Dict[str, Any]) -> None:
"""Apply multiple state updates efficiently."""
if isinstance(self._state, dict):
self._state.update(updates)
elif hasattr(self._state, "__dict__"):
for key, value in updates.items():
if hasattr(self._state, key):
object.__setattr__(self._state, key, value)
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Start the flow execution in a synchronous context.
@@ -746,68 +824,81 @@ class Flow(Generic[T], metaclass=FlowMeta):
Returns:
The final output from the flow, which is the result of the last executed method.
"""
if inputs:
# Override the id in the state if it exists in inputs
if "id" in inputs:
if isinstance(self._state, dict):
self._state["id"] = inputs["id"]
elif isinstance(self._state, BaseModel):
setattr(self._state, "id", inputs["id"])
ctx = baggage.set_baggage("flow_inputs", inputs or {})
flow_token = attach(ctx)
# If persistence is enabled, attempt to restore the stored state using the provided id.
if "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
if stored_state:
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
self._restore_state(stored_state)
else:
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)
try:
# Reset flow state for fresh execution unless restoring from persistence
is_restoring = inputs and "id" in inputs and self._persistence is not None
if not is_restoring:
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()
self._method_outputs.clear()
# Update state with any additional inputs (ignoring the 'id' key)
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
if filtered_inputs:
self._initialize_state(filtered_inputs)
if inputs:
# Override the id in the state if it exists in inputs
if "id" in inputs:
if isinstance(self._state, dict):
self._state["id"] = inputs["id"]
elif isinstance(self._state, BaseModel):
setattr(self._state, "id", inputs["id"])
# Emit FlowStartedEvent and log the start of the flow.
crewai_event_bus.emit(
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
),
)
self._log_flow_event(
f"Flow started with ID: {self.flow_id}", color="bold_magenta"
)
# If persistence is enabled, attempt to restore the stored state using the provided id.
if "id" in inputs and self._persistence is not None:
restore_uuid = inputs["id"]
stored_state = self._persistence.load_state(restore_uuid)
if stored_state:
self._log_flow_event(
f"Loading flow state from memory for UUID: {restore_uuid}",
color="yellow",
)
self._restore_state(stored_state)
else:
self._log_flow_event(
f"No flow state found for UUID: {restore_uuid}", color="red"
)
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
# Update state with any additional inputs (ignoring the 'id' key)
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
if filtered_inputs:
self._initialize_state(filtered_inputs)
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
]
await asyncio.gather(*tasks)
# Emit FlowStartedEvent and log the start of the flow.
crewai_event_bus.emit(
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
),
)
self._log_flow_event(
f"Flow started with ID: {self.flow_id}", color="bold_magenta"
)
final_output = self._method_outputs[-1] if self._method_outputs else None
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_output,
),
)
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
]
await asyncio.gather(*tasks)
return final_output
final_output = self._method_outputs[-1] if self._method_outputs else None
crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_output,
),
)
return final_output
finally:
detach(flow_token)
async def _execute_start_method(self, start_method_name: str) -> None:
"""
@@ -826,7 +917,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
- Executes the start method and captures its result
- Triggers execution of any listeners waiting on this start method
- Part of the flow's initialization sequence
- Skips execution if method was already completed (e.g., after reload)
"""
if start_method_name in self._completed_methods:
last_output = self._method_outputs[-1] if self._method_outputs else None
await self._execute_listeners(start_method_name, last_output)
return
result = await self._execute_method(
start_method_name, self._methods[start_method_name]
)
@@ -861,6 +958,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_execution_counts.get(method_name, 0) + 1
)
self._completed_methods.add(method_name)
crewai_event_bus.emit(
self,
MethodExecutionFinishedEvent(
@@ -1023,12 +1121,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
- Handles errors gracefully with detailed logging
- Recursively triggers listeners of this listener
- Supports both parameterized and parameter-less listeners
- Skips execution if method was already completed (e.g., after reload)
Error Handling
-------------
Catches and logs any exceptions during execution, preventing
individual listener failures from breaking the entire flow.
"""
# TODO: greyson fix
# if listener_name in self._completed_methods:
# await self._execute_listeners(listener_name, None)
# return
try:
method = self._methods[listener_name]
@@ -1047,12 +1151,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
await self._execute_listeners(listener_name, listener_result)
except Exception as e:
print(
f"[Flow._execute_single_listener] Error in method {listener_name}: {e}"
)
import traceback
traceback.print_exc()
logger.error(f"Error executing listener {listener_name}: {e}")
raise
def _log_flow_event(

95
src/crewai/flow/types.py Normal file
View File

@@ -0,0 +1,95 @@
"""Type definitions for CrewAI Flow module.
This module contains TypedDict definitions and type aliases used throughout
the Flow system.
"""
from typing import Any, TypedDict
from typing_extensions import NotRequired, Required
class FlowMethodData(TypedDict):
"""Flow method information.
Attributes:
name: The name of the flow method.
starting_point: Whether this method is a starting point for the flow.
"""
name: str
starting_point: NotRequired[bool]
class CompletedMethodData(TypedDict):
"""Completed method information.
Represents a flow method that has been successfully executed.
Attributes:
flow_method: The flow method information.
status: The completion status of the method.
"""
flow_method: FlowMethodData
status: str
class ExecutionMethodData(TypedDict, total=False):
"""Execution method information.
Contains detailed information about a method's execution, including
timing, state, and any error details.
Attributes:
flow_method: The flow method information.
started_at: ISO timestamp when the method started execution.
finished_at: ISO timestamp when the method finished execution, if completed.
status: Current status of the method execution.
initial_state: The state before method execution.
final_state: The state after method execution.
error_details: Details about any error that occurred during execution.
"""
flow_method: Required[FlowMethodData]
started_at: Required[str]
status: Required[str]
finished_at: str
initial_state: dict[str, Any]
final_state: dict[str, Any]
error_details: dict[str, Any]
class FlowData(TypedDict):
"""Flow structure information.
Contains metadata about the flow structure and its methods.
Attributes:
name: The name of the flow.
flow_methods_attributes: List of all flow methods and their attributes.
"""
name: str
flow_methods_attributes: list[FlowMethodData]
class FlowExecutionData(TypedDict):
"""Flow execution data.
Complete execution data for a flow, including its current state,
completed methods, and execution history. Used for resuming flows
from a previous state.
Attributes:
id: Unique identifier for the flow execution.
flow: Flow structure and metadata.
inputs: Input data provided to the flow.
completed_methods: List of methods that have been successfully completed.
execution_methods: Detailed execution history for all methods.
"""
id: str
flow: FlowData
inputs: dict[str, Any]
completed_methods: list[CompletedMethodData]
execution_methods: list[ExecutionMethodData]

View File

@@ -564,9 +564,10 @@ class LiteAgent(FlowTrackable, BaseModel):
if isinstance(formatted_answer, AgentAction):
try:
from typing import cast
tool_result = execute_tool_and_check_finality(
agent_action=formatted_answer,
tools=self._parsed_tools,
tools=cast(List[Union[CrewStructuredTool, dict]], self._parsed_tools),
i18n=self.i18n,
agent_key=self.key,
agent_role=self.role,

View File

@@ -4,15 +4,10 @@ CrewAI security module.
This module provides security-related functionality for CrewAI, including:
- Fingerprinting for component identity and tracking
- Security configuration for controlling access and permissions
- Encrypted agent-to-agent communication
- Future: authentication, scoping, and delegation mechanisms
"""
from crewai.security.fingerprint import Fingerprint
from crewai.security.security_config import SecurityConfig
from crewai.security.encrypted_communication import (
AgentCommunicationEncryption,
EncryptedMessage
)
__all__ = ["Fingerprint", "SecurityConfig", "AgentCommunicationEncryption", "EncryptedMessage"]
__all__ = ["Fingerprint", "SecurityConfig"]

View File

@@ -1,266 +0,0 @@
"""
Encrypted Communication Module
This module provides functionality for encrypting and decrypting agent-to-agent
communication in CrewAI. It leverages existing security infrastructure including
agent fingerprints and Fernet encryption.
"""
import json
import logging
from typing import Any, Dict, Optional, Union
from cryptography.fernet import Fernet
from pydantic import BaseModel, Field
from crewai.security.fingerprint import Fingerprint
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.encryption_events import (
EncryptionStartedEvent,
EncryptionCompletedEvent,
DecryptionStartedEvent,
DecryptionCompletedEvent,
)
logger = logging.getLogger(__name__)
class EncryptedMessage(BaseModel):
"""
Represents an encrypted message between agents.
Attributes:
encrypted_payload (str): The encrypted message content
sender_fingerprint (str): The fingerprint of the sending agent
recipient_fingerprint (str): The fingerprint of the intended recipient
message_type (str): The type of message (task, question, response, etc.)
"""
encrypted_payload: str = Field(..., description="The encrypted message content")
sender_fingerprint: str = Field(..., description="Sender agent's fingerprint")
recipient_fingerprint: str = Field(..., description="Recipient agent's fingerprint")
message_type: str = Field(default="communication", description="Type of message")
class AgentCommunicationEncryption:
"""
Handles encryption and decryption of agent-to-agent communication.
Uses Fernet symmetric encryption with keys derived from agent fingerprints.
Provides methods to encrypt and decrypt communication payloads.
"""
def __init__(self, agent_fingerprint: Fingerprint, agent=None):
"""
Initialize encryption handler for an agent.
Args:
agent_fingerprint (Fingerprint): The agent's unique fingerprint
agent: The agent instance (optional, needed for events)
"""
self.agent_fingerprint = agent_fingerprint
self.agent = agent
self._encryption_keys: Dict[str, Fernet] = {}
def _derive_communication_key(self, sender_fp: str, recipient_fp: str) -> bytes:
"""
Derive a communication key from sender and recipient fingerprints.
Creates a deterministic key based on both agent fingerprints to ensure
both agents can derive the same key for encrypted communication.
Args:
sender_fp (str): Sender agent's fingerprint
recipient_fp (str): Recipient agent's fingerprint
Returns:
bytes: 32-byte encryption key for Fernet
"""
# Sort fingerprints to ensure consistent key derivation regardless of role
fp_pair = tuple(sorted([sender_fp, recipient_fp]))
key_material = f"crewai_comm_{fp_pair[0]}_{fp_pair[1]}".encode('utf-8')
# Use SHA-256 to derive a 32-byte key from the fingerprint pair
import hashlib
key_hash = hashlib.sha256(key_material).digest()
# Fernet requires base64-encoded 32-byte key
import base64
return base64.urlsafe_b64encode(key_hash)
def _get_fernet(self, sender_fp: str, recipient_fp: str) -> Fernet:
"""
Get or create Fernet instance for communication between two agents.
Args:
sender_fp (str): Sender agent's fingerprint
recipient_fp (str): Recipient agent's fingerprint
Returns:
Fernet: Encryption instance for this agent pair
"""
# Create cache key from sorted fingerprints
cache_key = "_".join(sorted([sender_fp, recipient_fp]))
if cache_key not in self._encryption_keys:
key = self._derive_communication_key(sender_fp, recipient_fp)
self._encryption_keys[cache_key] = Fernet(key)
return self._encryption_keys[cache_key]
def encrypt_message(
self,
message: Union[str, Dict[str, Any]],
recipient_fingerprint: Fingerprint,
message_type: str = "communication",
recipient_agent=None
) -> EncryptedMessage:
"""
Encrypt a message for a specific recipient agent.
Args:
message (Union[str, Dict[str, Any]]): The message to encrypt
recipient_fingerprint (Fingerprint): The recipient agent's fingerprint
message_type (str): Type of message being sent
recipient_agent: The recipient agent instance (optional, needed for events)
Returns:
EncryptedMessage: Encrypted message container
Raises:
ValueError: If encryption fails
"""
try:
# Emit encryption started event if both agents are available
if self.agent and recipient_agent:
crewai_event_bus.emit(
self.agent,
EncryptionStartedEvent(
sender_agent=self.agent,
recipient_agent=recipient_agent,
message_type=message_type
)
)
logger.info(f"Starting encryption for {message_type} message to recipient {recipient_fingerprint.uuid_str[:8]}...")
# Convert message to JSON string if it's a dict
if isinstance(message, dict):
message_str = json.dumps(message)
else:
message_str = str(message)
# Get Fernet instance for this communication pair
fernet = self._get_fernet(
self.agent_fingerprint.uuid_str,
recipient_fingerprint.uuid_str
)
# Encrypt the message
encrypted_bytes = fernet.encrypt(message_str.encode('utf-8'))
encrypted_payload = encrypted_bytes.decode('utf-8')
# Emit encryption completed event if both agents are available
if self.agent and recipient_agent:
crewai_event_bus.emit(
self.agent,
EncryptionCompletedEvent(
sender_agent=self.agent,
recipient_agent=recipient_agent,
message_type=message_type
)
)
logger.info(f"Successfully encrypted {message_type} message from {self.agent_fingerprint.uuid_str[:8]}... to {recipient_fingerprint.uuid_str[:8]}...")
logger.debug(f"Encrypted message from {self.agent_fingerprint.uuid_str[:8]}... to {recipient_fingerprint.uuid_str[:8]}...")
return EncryptedMessage(
encrypted_payload=encrypted_payload,
sender_fingerprint=self.agent_fingerprint.uuid_str,
recipient_fingerprint=recipient_fingerprint.uuid_str,
message_type=message_type
)
except Exception as e:
logger.error(f"Failed to encrypt message: {e}")
raise ValueError(f"Message encryption failed: {e}")
def decrypt_message(self, encrypted_message: EncryptedMessage) -> Union[str, Dict[str, Any]]:
"""
Decrypt a message intended for this agent.
Args:
encrypted_message (EncryptedMessage): The encrypted message to decrypt
Returns:
Union[str, Dict[str, Any]]: The decrypted message content
Raises:
ValueError: If decryption fails or message is not for this agent
"""
try:
# Emit decryption started event if agent is available
if self.agent:
crewai_event_bus.emit(
self.agent,
DecryptionStartedEvent(
recipient_agent=self.agent,
sender_fingerprint=encrypted_message.sender_fingerprint,
message_type=encrypted_message.message_type
)
)
logger.info(f"Starting decryption of {encrypted_message.message_type} message from sender {encrypted_message.sender_fingerprint[:8]}...")
# Verify this message is intended for this agent
if encrypted_message.recipient_fingerprint != self.agent_fingerprint.uuid_str:
raise ValueError(f"Message not intended for this agent. Expected {self.agent_fingerprint.uuid_str[:8]}..., got {encrypted_message.recipient_fingerprint[:8]}...")
# Get Fernet instance for this communication pair
fernet = self._get_fernet(
encrypted_message.sender_fingerprint,
encrypted_message.recipient_fingerprint
)
# Decrypt the message
decrypted_bytes = fernet.decrypt(encrypted_message.encrypted_payload.encode('utf-8'))
decrypted_str = decrypted_bytes.decode('utf-8')
# Try to parse as JSON, fallback to string
try:
decrypted_content = json.loads(decrypted_str)
except json.JSONDecodeError:
decrypted_content = decrypted_str
# Emit decryption completed event if agent is available
if self.agent:
crewai_event_bus.emit(
self.agent,
DecryptionCompletedEvent(
recipient_agent=self.agent,
sender_fingerprint=encrypted_message.sender_fingerprint,
message_type=encrypted_message.message_type
)
)
logger.info(f"Successfully decrypted {encrypted_message.message_type} message from {encrypted_message.sender_fingerprint[:8]}... to {encrypted_message.recipient_fingerprint[:8]}...")
return decrypted_content
except Exception as e:
logger.error(f"Failed to decrypt message: {e}")
raise ValueError(f"Message decryption failed: {e}")
def is_encrypted_communication(self, message: Any) -> bool:
"""
Check if a message is an encrypted communication.
Args:
message (Any): Message to check
Returns:
bool: True if message is encrypted, False otherwise
"""
return isinstance(message, (EncryptedMessage, dict)) and (
hasattr(message, 'encrypted_payload') or
(isinstance(message, dict) and 'encrypted_payload' in message)
)

View File

@@ -24,14 +24,12 @@ class SecurityConfig(BaseModel):
This class manages security settings for CrewAI agents, including:
- Authentication credentials *TODO*
- Identity information (agent fingerprints)
- Encrypted agent-to-agent communication
- Scoping rules *TODO*
- Impersonation/delegation tokens *TODO*
Attributes:
version (str): Version of the security configuration
fingerprint (Fingerprint): The unique fingerprint automatically generated for the component
encrypted_communication (bool): Enable encrypted agent-to-agent communication
"""
model_config = ConfigDict(
@@ -48,11 +46,6 @@ class SecurityConfig(BaseModel):
default_factory=Fingerprint,
description="Unique identifier for the component"
)
encrypted_communication: bool = Field(
default=False,
description="Enable encrypted communication between agents"
)
def is_compatible(self, min_version: str) -> bool:
"""

View File

@@ -1,13 +1,10 @@
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.utilities import I18N
import logging
from .ask_question_tool import AskQuestionTool
from .delegate_work_tool import DelegateWorkTool
logger = logging.getLogger(__name__)
class AgentTools:
"""Manager class for agent-related tools"""
@@ -19,19 +16,6 @@ class AgentTools:
def tools(self) -> list[BaseTool]:
"""Get all available agent tools"""
coworkers = ", ".join([f"{agent.role}" for agent in self.agents])
# Check encryption capabilities of agents
encryption_enabled_agents = [
agent for agent in self.agents
if hasattr(agent, 'security_config')
and agent.security_config
and getattr(agent.security_config, 'encrypted_communication', False)
]
if encryption_enabled_agents:
logger.info(f"Creating agent communication tools with encryption support for {len(encryption_enabled_agents)} agent(s): {[agent.role for agent in encryption_enabled_agents]}")
else:
logger.debug(f"Creating agent communication tools without encryption (no agents have encrypted_communication enabled)")
delegate_tool = DelegateWorkTool(
agents=self.agents,

View File

@@ -1,5 +1,5 @@
import logging
from typing import Optional, Union, Dict, Any
from typing import Optional
from pydantic import Field
@@ -7,173 +7,18 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.utilities import I18N
from crewai.security import AgentCommunicationEncryption, EncryptedMessage
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.encryption_events import (
EncryptedCommunicationStartedEvent,
EncryptedCommunicationEstablishedEvent,
EncryptedTaskExecutionEvent,
)
logger = logging.getLogger(__name__)
class BaseAgentTool(BaseTool):
"""Base class for agent-related tools with optional encrypted communication support"""
"""Base class for agent-related tools"""
agents: list[BaseAgent] = Field(description="List of available agents")
i18n: I18N = Field(
default_factory=I18N, description="Internationalization settings"
)
def __init__(self, **data):
"""Initialize BaseAgentTool with optional encryption support."""
super().__init__(**data)
self._encryption_handler: Optional[AgentCommunicationEncryption] = None
@property
def encryption_enabled(self) -> bool:
"""Check if encryption is enabled for agent communication."""
# Check if any agent has encryption enabled
return any(
hasattr(agent, 'security_config') and
agent.security_config and
getattr(agent.security_config, 'encrypted_communication', False)
for agent in self.agents
)
def _get_encryption_handler(self, sender_agent: BaseAgent) -> Optional[AgentCommunicationEncryption]:
"""Get encryption handler for a specific agent."""
if not hasattr(sender_agent, 'security_config') or not sender_agent.security_config:
return None
if not getattr(sender_agent.security_config, 'encrypted_communication', False):
return None
# Create encryption handler if it doesn't exist, passing the agent instance
if self._encryption_handler is None:
self._encryption_handler = AgentCommunicationEncryption(
sender_agent.security_config.fingerprint,
agent=sender_agent
)
return self._encryption_handler
def _prepare_communication_payload(
self,
sender_agent: BaseAgent,
recipient_agent: BaseAgent,
task: str,
context: Optional[str] = None
) -> Union[Dict[str, Any], EncryptedMessage]:
"""
Prepare communication payload, with optional encryption.
Args:
sender_agent: The agent sending the communication
recipient_agent: The agent receiving the communication
task: The task or question to communicate
context: Optional context for the communication
Returns:
Union[Dict[str, Any], EncryptedMessage]: Plain or encrypted message
"""
# Prepare the base message
message_payload = {
"task": task,
"context": context or "",
"sender_role": getattr(sender_agent, 'role', 'unknown'),
"message_type": "agent_communication"
}
# Check if encryption should be used
encryption_handler = self._get_encryption_handler(sender_agent)
if encryption_handler and hasattr(recipient_agent, 'security_config') and recipient_agent.security_config:
try:
# Emit communication started event
crewai_event_bus.emit(
sender_agent,
EncryptedCommunicationStartedEvent(
sender_agent=sender_agent,
recipient_agent=recipient_agent
)
)
logger.info(f"Starting encrypted communication from '{sender_agent.role}' to '{recipient_agent.role}'")
# Encrypt the message for the recipient
encrypted_msg = encryption_handler.encrypt_message(
message_payload,
recipient_agent.security_config.fingerprint,
message_type="agent_communication",
recipient_agent=recipient_agent
)
# Emit communication established event
crewai_event_bus.emit(
sender_agent,
EncryptedCommunicationEstablishedEvent(
sender_agent=sender_agent,
recipient_agent=recipient_agent
)
)
logger.info(f"Encrypted communication established between '{sender_agent.role}' and '{recipient_agent.role}'")
logger.debug(f"Encrypted communication from {sender_agent.role} to {recipient_agent.role}")
return encrypted_msg
except Exception as e:
logger.warning(f"Encryption failed, falling back to plain communication: {e}")
return message_payload
def _process_received_communication(
self,
recipient_agent: BaseAgent,
message: Union[str, Dict[str, Any], EncryptedMessage]
) -> Union[str, Dict[str, Any]]:
"""
Process received communication, with optional decryption.
Args:
recipient_agent: The agent receiving the communication
message: The message to process (may be encrypted)
Returns:
Union[str, Dict[str, Any]]: Processed message content
"""
# Handle encrypted messages
if isinstance(message, EncryptedMessage) or (
isinstance(message, dict) and 'encrypted_payload' in message
):
# We need an encryption handler for the recipient agent
recipient_encryption_handler = None
if hasattr(recipient_agent, 'security_config') and recipient_agent.security_config:
if getattr(recipient_agent.security_config, 'encrypted_communication', False):
recipient_encryption_handler = AgentCommunicationEncryption(
recipient_agent.security_config.fingerprint,
agent=recipient_agent
)
if recipient_encryption_handler:
try:
logger.info(f"Starting decryption of received communication for '{recipient_agent.role}'")
# Convert dict to EncryptedMessage if needed
if isinstance(message, dict):
message = EncryptedMessage(**message)
decrypted = recipient_encryption_handler.decrypt_message(message)
logger.info(f"Successfully decrypted communication for '{recipient_agent.role}'")
logger.debug(f"Decrypted communication for {recipient_agent.role}")
return decrypted
except Exception as e:
logger.error(f"Decryption failed for {recipient_agent.role}: {e}")
raise ValueError(f"Failed to decrypt communication: {e}")
else:
logger.warning(f"Received encrypted message but {recipient_agent.role} has no decryption capability")
raise ValueError("Received encrypted message but agent cannot decrypt it")
# Return message as-is for plain communication
return message
def sanitize_agent_name(self, name: str) -> str:
"""
Sanitize agent role name by normalizing whitespace and setting to lowercase.
@@ -209,7 +54,6 @@ class BaseAgentTool(BaseTool):
) -> str:
"""
Execute delegation to an agent with case-insensitive and whitespace-tolerant matching.
Supports both encrypted and non-encrypted communication based on agent configuration.
Args:
agent_name: Name/role of the agent to delegate to (case-insensitive)
@@ -262,63 +106,19 @@ class BaseAgentTool(BaseTool):
error=f"No agent found with role '{sanitized_name}'"
)
target_agent = agent[0]
# Determine sender agent (first agent with security config, or first agent as fallback)
sender_agent = None
for a in self.agents:
if hasattr(a, 'security_config') and a.security_config:
sender_agent = a
break
if not sender_agent:
sender_agent = self.agents[0] if self.agents else target_agent
agent = agent[0]
try:
# Prepare communication with optional encryption
communication_payload = self._prepare_communication_payload(
sender_agent=sender_agent,
recipient_agent=target_agent,
task=task,
context=context
)
# Create task for execution
task_with_assigned_agent = Task(
description=task,
agent=target_agent,
expected_output=target_agent.i18n.slice("manager_request"),
i18n=target_agent.i18n,
agent=agent,
expected_output=agent.i18n.slice("manager_request"),
i18n=agent.i18n,
)
# Execute with processed communication context
if isinstance(communication_payload, EncryptedMessage):
# Emit encrypted task execution event
crewai_event_bus.emit(
target_agent,
EncryptedTaskExecutionEvent(
agent=target_agent
)
)
logger.info(f"Executing encrypted communication task for agent '{self.sanitize_agent_name(target_agent.role)}'")
logger.debug(f"Executing encrypted communication task for agent '{self.sanitize_agent_name(target_agent.role)}'")
# For encrypted messages, pass the encrypted payload as additional context
# The target agent will need to handle decryption during execution
enhanced_context = f"ENCRYPTED_COMMUNICATION: {communication_payload.model_dump_json()}"
if context:
enhanced_context += f"\nADDITIONAL_CONTEXT: {context}"
result = target_agent.execute_task(task_with_assigned_agent, enhanced_context)
else:
logger.info(f"Executing plain communication task for agent '{self.sanitize_agent_name(target_agent.role)}'")
logger.debug(f"Executing plain communication task for agent '{self.sanitize_agent_name(target_agent.role)}'")
result = target_agent.execute_task(task_with_assigned_agent, context)
return result
logger.debug(f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}")
return agent.execute_task(task_with_assigned_agent, context)
except Exception as e:
# Handle task creation or execution errors
logger.error(f"Task execution failed for agent '{self.sanitize_agent_name(target_agent.role)}': {e}")
return self.i18n.errors("agent_tool_execution_error").format(
agent_role=self.sanitize_agent_name(target_agent.role),
agent_role=self.sanitize_agent_name(agent.role),
error=str(e)
)

18
src/crewai/types/hitl.py Normal file
View File

@@ -0,0 +1,18 @@
from typing import List, Dict, TypedDict
class HITLResumeInfo(TypedDict, total=False):
"""HITL resume information passed from flow to crew."""
task_id: str
crew_execution_id: str
task_key: str
task_output: str
human_feedback: str
previous_messages: List[Dict[str, str]]
class CrewInputsWithHITL(TypedDict, total=False):
"""Crew inputs that may contain HITL resume information."""
_hitl_resume: HITLResumeInfo

View File

@@ -11,7 +11,6 @@ from crewai.agents.parser import (
)
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.tools import BaseTool as CrewAITool
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.tools.tool_types import ToolResult
@@ -25,26 +24,28 @@ from crewai.cli.config import Settings
console = Console()
def parse_tools(tools: List[BaseTool]) -> List[CrewStructuredTool]:
def parse_tools(tools: List[Union[BaseTool, dict]]) -> List[Union[CrewStructuredTool, dict]]:
"""Parse tools to be used for the task."""
tools_list = []
tools_list: List[Union[CrewStructuredTool, dict]] = []
for tool in tools:
if isinstance(tool, CrewAITool):
if isinstance(tool, dict):
tools_list.append(tool)
elif isinstance(tool, BaseTool):
tools_list.append(tool.to_structured_tool())
else:
raise ValueError("Tool is not a CrewStructuredTool or BaseTool")
raise ValueError(f"Tool is not a CrewStructuredTool, BaseTool, or raw tool definition (dict): {type(tool)}")
return tools_list
def get_tool_names(tools: Sequence[Union[CrewStructuredTool, BaseTool]]) -> str:
def get_tool_names(tools: Sequence[Union[CrewStructuredTool, BaseTool, dict]]) -> str:
"""Get the names of the tools."""
return ", ".join([t.name for t in tools])
return ", ".join([t.name if hasattr(t, 'name') else t.get('name', 'unknown') for t in tools])
def render_text_description_and_args(
tools: Sequence[Union[CrewStructuredTool, BaseTool]],
tools: Sequence[Union[CrewStructuredTool, BaseTool, dict]],
) -> str:
"""Render the tool name, description, and args in plain text.
@@ -54,7 +55,12 @@ def render_text_description_and_args(
"""
tool_strings = []
for tool in tools:
tool_strings.append(tool.description)
if hasattr(tool, 'description'):
tool_strings.append(tool.description)
elif isinstance(tool, dict) and 'description' in tool:
tool_strings.append(f"Tool name: {tool.get('name', 'unknown')}\nTool description:\n{tool['description']}")
else:
tool_strings.append(f"Tool name: {tool.get('name', 'unknown') if isinstance(tool, dict) else 'unknown'}")
return "\n".join(tool_strings)

View File

@@ -67,11 +67,9 @@ from .memory_events import (
# events
from .event_listener import EventListener
from .third_party.agentops_listener import agentops_listener
__all__ = [
"EventListener",
"agentops_listener",
"CrewAIEventsBus",
"crewai_event_bus",
"AgentExecutionStartedEvent",
@@ -105,7 +103,6 @@ __all__ = [
"MemoryRetrievalStartedEvent",
"MemoryRetrievalCompletedEvent",
"EventListener",
"agentops_listener",
"CrewKickoffStartedEvent",
"CrewKickoffCompletedEvent",
"CrewKickoffFailedEvent",

View File

@@ -1,165 +0,0 @@
"""
Encryption events for agent-to-agent communication
"""
from typing import Optional
from crewai.agents.agent_builder.base_agent import BaseAgent
from .base_events import BaseEvent
class EncryptionStartedEvent(BaseEvent):
"""Event emitted when agent-to-agent encryption starts"""
sender_agent: BaseAgent
recipient_agent: BaseAgent
message_type: str = "agent_communication"
type: str = "encryption_started"
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the sender agent
if hasattr(self.sender_agent, "fingerprint") and self.sender_agent.fingerprint:
self.source_fingerprint = self.sender_agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.sender_agent.fingerprint, "metadata")
and self.sender_agent.fingerprint.metadata
):
self.fingerprint_metadata = self.sender_agent.fingerprint.metadata
class EncryptionCompletedEvent(BaseEvent):
"""Event emitted when agent-to-agent encryption completes successfully"""
sender_agent: BaseAgent
recipient_agent: BaseAgent
message_type: str = "agent_communication"
type: str = "encryption_completed"
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the sender agent
if hasattr(self.sender_agent, "fingerprint") and self.sender_agent.fingerprint:
self.source_fingerprint = self.sender_agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.sender_agent.fingerprint, "metadata")
and self.sender_agent.fingerprint.metadata
):
self.fingerprint_metadata = self.sender_agent.fingerprint.metadata
class DecryptionStartedEvent(BaseEvent):
"""Event emitted when agent-to-agent decryption starts"""
recipient_agent: BaseAgent
sender_fingerprint: str
message_type: str = "agent_communication"
type: str = "decryption_started"
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the recipient agent
if hasattr(self.recipient_agent, "fingerprint") and self.recipient_agent.fingerprint:
self.source_fingerprint = self.recipient_agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.recipient_agent.fingerprint, "metadata")
and self.recipient_agent.fingerprint.metadata
):
self.fingerprint_metadata = self.recipient_agent.fingerprint.metadata
class DecryptionCompletedEvent(BaseEvent):
"""Event emitted when agent-to-agent decryption completes successfully"""
recipient_agent: BaseAgent
sender_fingerprint: str
message_type: str = "agent_communication"
type: str = "decryption_completed"
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the recipient agent
if hasattr(self.recipient_agent, "fingerprint") and self.recipient_agent.fingerprint:
self.source_fingerprint = self.recipient_agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.recipient_agent.fingerprint, "metadata")
and self.recipient_agent.fingerprint.metadata
):
self.fingerprint_metadata = self.recipient_agent.fingerprint.metadata
class EncryptedCommunicationStartedEvent(BaseEvent):
"""Event emitted when encrypted communication between agents begins"""
sender_agent: BaseAgent
recipient_agent: BaseAgent
type: str = "encrypted_communication_started"
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the sender agent
if hasattr(self.sender_agent, "fingerprint") and self.sender_agent.fingerprint:
self.source_fingerprint = self.sender_agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.sender_agent.fingerprint, "metadata")
and self.sender_agent.fingerprint.metadata
):
self.fingerprint_metadata = self.sender_agent.fingerprint.metadata
class EncryptedCommunicationEstablishedEvent(BaseEvent):
"""Event emitted when encrypted communication is successfully established"""
sender_agent: BaseAgent
recipient_agent: BaseAgent
type: str = "encrypted_communication_established"
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the sender agent
if hasattr(self.sender_agent, "fingerprint") and self.sender_agent.fingerprint:
self.source_fingerprint = self.sender_agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.sender_agent.fingerprint, "metadata")
and self.sender_agent.fingerprint.metadata
):
self.fingerprint_metadata = self.sender_agent.fingerprint.metadata
class EncryptedTaskExecutionEvent(BaseEvent):
"""Event emitted when an encrypted communication task is being executed"""
agent: BaseAgent
task_type: str = "encrypted_communication"
type: str = "encrypted_task_execution"
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata

View File

@@ -37,15 +37,6 @@ from .agent_events import (
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from .encryption_events import (
EncryptionStartedEvent,
EncryptionCompletedEvent,
DecryptionStartedEvent,
DecryptionCompletedEvent,
EncryptedCommunicationStartedEvent,
EncryptedCommunicationEstablishedEvent,
EncryptedTaskExecutionEvent,
)
from .crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffFailedEvent,
@@ -522,70 +513,5 @@ class EventListener(BaseEventListener):
event.verbose,
)
# Encryption event handlers
@crewai_event_bus.on(EncryptedCommunicationStartedEvent)
def on_encrypted_communication_started(source, event: EncryptedCommunicationStartedEvent):
self.formatter.handle_encryption_communication_started(
event.sender_agent.role,
event.recipient_agent.role
)
@crewai_event_bus.on(EncryptedCommunicationEstablishedEvent)
def on_encrypted_communication_established(source, event: EncryptedCommunicationEstablishedEvent):
self.formatter.handle_encryption_communication_established(
event.sender_agent.role,
event.recipient_agent.role
)
@crewai_event_bus.on(EncryptionStartedEvent)
def on_encryption_started(source, event: EncryptionStartedEvent):
recipient_fingerprint = ""
if hasattr(event.recipient_agent, "fingerprint") and event.recipient_agent.fingerprint:
recipient_fingerprint = event.recipient_agent.fingerprint.uuid_str[:8] + "..."
self.formatter.handle_encryption_started(
event.message_type,
recipient_fingerprint
)
@crewai_event_bus.on(EncryptionCompletedEvent)
def on_encryption_completed(source, event: EncryptionCompletedEvent):
sender_fingerprint = ""
recipient_fingerprint = ""
if hasattr(event.sender_agent, "fingerprint") and event.sender_agent.fingerprint:
sender_fingerprint = event.sender_agent.fingerprint.uuid_str[:8] + "..."
if hasattr(event.recipient_agent, "fingerprint") and event.recipient_agent.fingerprint:
recipient_fingerprint = event.recipient_agent.fingerprint.uuid_str[:8] + "..."
self.formatter.handle_encryption_completed(
event.message_type,
sender_fingerprint,
recipient_fingerprint
)
@crewai_event_bus.on(DecryptionStartedEvent)
def on_decryption_started(source, event: DecryptionStartedEvent):
sender_fingerprint = event.sender_fingerprint[:8] + "..." if event.sender_fingerprint else ""
self.formatter.handle_decryption_started(
event.message_type,
sender_fingerprint
)
@crewai_event_bus.on(DecryptionCompletedEvent)
def on_decryption_completed(source, event: DecryptionCompletedEvent):
sender_fingerprint = event.sender_fingerprint[:8] + "..." if event.sender_fingerprint else ""
recipient_fingerprint = ""
if hasattr(event.recipient_agent, "fingerprint") and event.recipient_agent.fingerprint:
recipient_fingerprint = event.recipient_agent.fingerprint.uuid_str[:8] + "..."
self.formatter.handle_decryption_completed(
event.message_type,
sender_fingerprint,
recipient_fingerprint
)
@crewai_event_bus.on(EncryptedTaskExecutionEvent)
def on_encrypted_task_execution(source, event: EncryptedTaskExecutionEvent):
self.formatter.handle_encrypted_task_execution(
event.agent.role
)
event_listener = EventListener()

View File

@@ -4,7 +4,7 @@ from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from crewai.utilities.constants import CREWAI_BASE_URL
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.cli.plus_api import PlusAPI
@@ -41,14 +41,21 @@ class TraceBatchManager:
"""Single responsibility: Manage batches and event buffering"""
def __init__(self):
self.plus_api = PlusAPI(api_key=get_auth_token())
try:
self.plus_api = PlusAPI(api_key=get_auth_token())
except AuthError:
self.plus_api = PlusAPI(api_key="")
self.trace_batch_id: Optional[str] = None # Backend ID
self.current_batch: Optional[TraceBatch] = None
self.event_buffer: List[TraceEvent] = []
self.execution_start_times: Dict[str, datetime] = {}
def initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
use_ephemeral: bool = False,
) -> TraceBatch:
"""Initialize a new trace batch"""
self.current_batch = TraceBatch(
@@ -57,13 +64,15 @@ class TraceBatchManager:
self.event_buffer.clear()
self.record_start_time("execution")
self._initialize_backend_batch(user_context, execution_metadata)
self._initialize_backend_batch(user_context, execution_metadata, use_ephemeral)
return self.current_batch
def _initialize_backend_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
self,
user_context: Dict[str, str],
execution_metadata: Dict[str, Any],
use_ephemeral: bool = False,
):
"""Send batch initialization to backend"""
@@ -74,6 +83,7 @@ class TraceBatchManager:
payload = {
"trace_id": self.current_batch.batch_id,
"execution_type": execution_metadata.get("execution_type", "crew"),
"user_identifier": execution_metadata.get("user_context", None),
"execution_context": {
"crew_fingerprint": execution_metadata.get("crew_fingerprint"),
"crew_name": execution_metadata.get("crew_name", None),
@@ -91,12 +101,22 @@ class TraceBatchManager:
"execution_started_at": datetime.now(timezone.utc).isoformat(),
},
}
if use_ephemeral:
payload["ephemeral_trace_id"] = self.current_batch.batch_id
response = self.plus_api.initialize_trace_batch(payload)
response = (
self.plus_api.initialize_ephemeral_trace_batch(payload)
if use_ephemeral
else self.plus_api.initialize_trace_batch(payload)
)
if response.status_code == 201 or response.status_code == 200:
response_data = response.json()
self.trace_batch_id = response_data["trace_id"]
self.trace_batch_id = (
response_data["trace_id"]
if not use_ephemeral
else response_data["ephemeral_trace_id"]
)
console = Console()
panel = Panel(
f"✅ Trace batch initialized with session ID: {self.trace_batch_id}",
@@ -116,7 +136,7 @@ class TraceBatchManager:
"""Add event to buffer"""
self.event_buffer.append(trace_event)
def _send_events_to_backend(self):
def _send_events_to_backend(self, ephemeral: bool = True):
"""Send buffered events to backend"""
if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
return
@@ -134,7 +154,11 @@ class TraceBatchManager:
if not self.trace_batch_id:
raise Exception("❌ Trace batch ID not found")
response = self.plus_api.send_trace_events(self.trace_batch_id, payload)
response = (
self.plus_api.send_ephemeral_trace_events(self.trace_batch_id, payload)
if ephemeral
else self.plus_api.send_trace_events(self.trace_batch_id, payload)
)
if response.status_code == 200 or response.status_code == 201:
self.event_buffer.clear()
@@ -146,15 +170,15 @@ class TraceBatchManager:
except Exception as e:
logger.error(f"❌ Error sending events to backend: {str(e)}")
def finalize_batch(self) -> Optional[TraceBatch]:
def finalize_batch(self, ephemeral: bool = True) -> Optional[TraceBatch]:
"""Finalize batch and return it for sending"""
if not self.current_batch:
return None
if self.event_buffer:
self._send_events_to_backend()
self._send_events_to_backend(ephemeral)
self._finalize_backend_batch()
self._finalize_backend_batch(ephemeral)
self.current_batch.events = self.event_buffer.copy()
@@ -168,7 +192,7 @@ class TraceBatchManager:
return finalized_batch
def _finalize_backend_batch(self):
def _finalize_backend_batch(self, ephemeral: bool = True):
"""Send batch finalization to backend"""
if not self.plus_api or not self.trace_batch_id:
return
@@ -182,12 +206,24 @@ class TraceBatchManager:
"final_event_count": total_events,
}
response = self.plus_api.finalize_trace_batch(self.trace_batch_id, payload)
response = (
self.plus_api.finalize_ephemeral_trace_batch(
self.trace_batch_id, payload
)
if ephemeral
else self.plus_api.finalize_trace_batch(self.trace_batch_id, payload)
)
if response.status_code == 200:
access_code = response.json().get("access_code", None)
console = Console()
return_link = (
f"{CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}"
if not ephemeral and access_code
else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}"
)
panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}",
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
title="Trace Batch Finalization",
border_style="green",
)

View File

@@ -13,7 +13,6 @@ from crewai.utilities.events.agent_events import (
AgentExecutionErrorEvent,
)
from crewai.utilities.events.listeners.tracing.types import TraceEvent
from crewai.utilities.events.listeners.tracing.utils import is_tracing_enabled
from crewai.utilities.events.reasoning_events import (
AgentReasoningStartedEvent,
AgentReasoningCompletedEvent,
@@ -67,7 +66,7 @@ from crewai.utilities.events.memory_events import (
MemorySaveFailedEvent,
)
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
@@ -76,13 +75,12 @@ class TraceCollectionListener(BaseEventListener):
Trace collection listener that orchestrates trace collection
"""
trace_enabled: Optional[bool] = False
complex_events = ["task_started", "llm_call_started", "llm_call_completed"]
_instance = None
_initialized = False
def __new__(cls, batch_manager=None, tracing: Optional[bool] = False):
def __new__(cls, batch_manager=None):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@@ -90,25 +88,22 @@ class TraceCollectionListener(BaseEventListener):
def __init__(
self,
batch_manager: Optional[TraceBatchManager] = None,
tracing: Optional[bool] = False,
):
if self._initialized:
return
super().__init__()
self.batch_manager = batch_manager or TraceBatchManager()
self.tracing = tracing or False
self.trace_enabled = self._check_trace_enabled()
self._initialized = True
def _check_trace_enabled(self) -> bool:
def _check_authenticated(self) -> bool:
"""Check if tracing should be enabled"""
auth_token = get_auth_token()
if not auth_token:
try:
res = bool(get_auth_token())
return res
except AuthError:
return False
return is_tracing_enabled() or self.tracing
def _get_user_context(self) -> Dict[str, str]:
"""Extract user context for tracing"""
return {
@@ -120,8 +115,6 @@ class TraceCollectionListener(BaseEventListener):
def setup_listeners(self, crewai_event_bus):
"""Setup event listeners - delegates to specific handlers"""
if not self.trace_enabled:
return
self._register_flow_event_handlers(crewai_event_bus)
self._register_context_event_handlers(crewai_event_bus)
@@ -167,13 +160,13 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event):
if not self.batch_manager.is_batch_initialized():
self._initialize_batch(source, event)
self._initialize_crew_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
self._handle_trace_event("crew_kickoff_completed", source, event)
self.batch_manager.finalize_batch()
self.batch_manager.finalize_batch(ephemeral=True)
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event):
@@ -287,7 +280,7 @@ class TraceCollectionListener(BaseEventListener):
def on_agent_reasoning_failed(source, event):
self._handle_action_event("agent_reasoning_failed", source, event)
def _initialize_batch(self, source: Any, event: Any):
def _initialize_crew_batch(self, source: Any, event: Any):
"""Initialize trace batch"""
user_context = self._get_user_context()
execution_metadata = {
@@ -296,7 +289,7 @@ class TraceCollectionListener(BaseEventListener):
"crewai_version": get_crewai_version(),
}
self.batch_manager.initialize_batch(user_context, execution_metadata)
self._initialize_batch(user_context, execution_metadata)
def _initialize_flow_batch(self, source: Any, event: Any):
"""Initialize trace batch for Flow execution"""
@@ -308,7 +301,20 @@ class TraceCollectionListener(BaseEventListener):
"execution_type": "flow",
}
self.batch_manager.initialize_batch(user_context, execution_metadata)
self._initialize_batch(user_context, execution_metadata)
def _initialize_batch(
self, user_context: Dict[str, str], execution_metadata: Dict[str, Any]
):
"""Initialize trace batch if ephemeral"""
if not self._check_authenticated():
self.batch_manager.initialize_batch(
user_context, execution_metadata, use_ephemeral=True
)
else:
self.batch_manager.initialize_batch(
user_context, execution_metadata, use_ephemeral=False
)
def _handle_trace_event(self, event_type: str, source: Any, event: Any):
"""Generic handler for context end events"""

View File

@@ -1,5 +1,153 @@
import os
import platform
import uuid
import hashlib
import subprocess
import getpass
from pathlib import Path
from datetime import datetime
import re
import json
import click
from crewai.utilities.paths import db_storage_path
def is_tracing_enabled() -> bool:
return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true"
def on_first_execution_tracing_confirmation() -> bool:
if _is_test_environment():
return False
if is_first_execution():
mark_first_execution_done()
return click.confirm(
"This is the first execution of CrewAI. Do you want to enable tracing?",
default=True,
show_default=True,
)
return False
def _is_test_environment() -> bool:
"""Detect if we're running in a test environment."""
return os.environ.get("CREWAI_TESTING", "").lower() == "true"
def _get_machine_id() -> str:
"""Stable, privacy-preserving machine fingerprint (cross-platform)."""
parts = []
try:
mac = ":".join(
["{:02x}".format((uuid.getnode() >> b) & 0xFF) for b in range(0, 12, 2)][
::-1
]
)
parts.append(mac)
except Exception:
pass
sysname = platform.system()
parts.append(sysname)
try:
if sysname == "Darwin":
res = subprocess.run(
["system_profiler", "SPHardwareDataType"],
capture_output=True,
text=True,
timeout=2,
)
m = re.search(r"Hardware UUID:\s*([A-Fa-f0-9\-]+)", res.stdout)
if m:
parts.append(m.group(1))
elif sysname == "Linux":
try:
parts.append(Path("/etc/machine-id").read_text().strip())
except Exception:
parts.append(Path("/sys/class/dmi/id/product_uuid").read_text().strip())
elif sysname == "Windows":
res = subprocess.run(
["wmic", "csproduct", "get", "UUID"],
capture_output=True,
text=True,
timeout=2,
)
lines = [line.strip() for line in res.stdout.splitlines() if line.strip()]
if len(lines) >= 2:
parts.append(lines[1])
except Exception:
pass
return hashlib.sha256("".join(parts).encode()).hexdigest()
def _user_data_file() -> Path:
base = Path(db_storage_path())
base.mkdir(parents=True, exist_ok=True)
return base / ".crewai_user.json"
def _load_user_data() -> dict:
p = _user_data_file()
if p.exists():
try:
return json.loads(p.read_text())
except Exception:
pass
return {}
def _save_user_data(data: dict) -> None:
try:
p = _user_data_file()
p.write_text(json.dumps(data, indent=2))
except Exception:
pass
def get_user_id() -> str:
"""Stable, anonymized user identifier with caching."""
data = _load_user_data()
if "user_id" in data:
return data["user_id"]
try:
username = getpass.getuser()
except Exception:
username = "unknown"
seed = f"{username}|{_get_machine_id()}"
uid = hashlib.sha256(seed.encode()).hexdigest()
data["user_id"] = uid
_save_user_data(data)
return uid
def is_first_execution() -> bool:
"""True if this is the first execution for this user."""
data = _load_user_data()
return not data.get("first_execution_done", False)
def mark_first_execution_done() -> None:
"""Mark that the first execution has been completed."""
data = _load_user_data()
if data.get("first_execution_done", False):
return
data.update(
{
"first_execution_done": True,
"first_execution_at": datetime.now().timestamp(),
"user_id": get_user_id(),
"machine_id": _get_machine_id(),
}
)
_save_user_data(data)

View File

@@ -1 +0,0 @@
from .agentops_listener import agentops_listener

View File

@@ -1,67 +0,0 @@
from typing import Optional
from crewai.utilities.events import (
CrewKickoffCompletedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.events.base_event_listener import BaseEventListener
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
from crewai.utilities.events.task_events import TaskEvaluationEvent
try:
import agentops
AGENTOPS_INSTALLED = True
except ImportError:
AGENTOPS_INSTALLED = False
class AgentOpsListener(BaseEventListener):
tool_event: Optional["agentops.ToolEvent"] = None
session: Optional["agentops.Session"] = None
def __init__(self):
super().__init__()
def setup_listeners(self, crewai_event_bus):
if not AGENTOPS_INSTALLED:
return
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
self.session = agentops.init()
for agent in source.agents:
if self.session:
self.session.create_agent(
name=agent.role,
agent_id=str(agent.id),
)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
if self.session:
self.session.end_session(
end_state="Success",
end_state_reason="Finished Execution",
)
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.tool_event = agentops.ToolEvent(name=event.tool_name)
if self.session:
self.session.record(self.tool_event)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
@crewai_event_bus.on(TaskEvaluationEvent)
def on_task_evaluation(source, event: TaskEvaluationEvent):
if self.session:
self.session.create_agent(
name="Task Evaluator", agent_id=str(source.original_agent.id)
)
agentops_listener = AgentOpsListener()

View File

@@ -1754,151 +1754,3 @@ class ConsoleFormatter:
Attempts=f"{retry_count + 1}",
)
self.print_panel(content, "🛡️ Guardrail Failed", "red")
# Encryption event handlers
def handle_encryption_communication_started(
self, sender_role: str, recipient_role: str
) -> None:
"""Handle encrypted communication started event."""
if not self.verbose:
return
content = Text()
content.append("Starting encrypted communication from '", style="white")
content.append(f"{sender_role}", style="bright_cyan bold")
content.append("' to '", style="white")
content.append(f"{recipient_role}", style="bright_cyan bold")
content.append("'", style="white")
panel = Panel(
content,
title="🔐 Encrypted Communication",
border_style="cyan",
padding=(0, 1),
)
self.print(panel)
def handle_encryption_communication_established(
self, sender_role: str, recipient_role: str
) -> None:
"""Handle encrypted communication established event."""
if not self.verbose:
return
content = Text()
content.append("Encrypted communication established between '", style="white")
content.append(f"{sender_role}", style="bright_green bold")
content.append("' and '", style="white")
content.append(f"{recipient_role}", style="bright_green bold")
content.append("'", style="white")
panel = Panel(
content,
title="✅ Communication Secured",
border_style="green",
padding=(0, 1),
)
self.print(panel)
def handle_encryption_started(
self, message_type: str, recipient_fingerprint: str
) -> None:
"""Handle encryption started event."""
if not self.verbose:
return
content = Text()
content.append(f"Starting encryption for {message_type} message to recipient ", style="white")
content.append(f"{recipient_fingerprint}", style="bright_yellow")
content.append("...", style="white")
panel = Panel(
content,
title="🔒 Encrypting Message",
border_style="yellow",
padding=(0, 1),
)
self.print(panel)
def handle_encryption_completed(
self, message_type: str, sender_fingerprint: str, recipient_fingerprint: str
) -> None:
"""Handle encryption completed event."""
if not self.verbose:
return
content = Text()
content.append(f"Successfully encrypted {message_type} message from ", style="white")
content.append(f"{sender_fingerprint}", style="bright_green")
content.append(" to ", style="white")
content.append(f"{recipient_fingerprint}", style="bright_green")
content.append("...", style="white")
panel = Panel(
content,
title="✅ Message Encrypted",
border_style="green",
padding=(0, 1),
)
self.print(panel)
def handle_decryption_started(
self, message_type: str, sender_fingerprint: str
) -> None:
"""Handle decryption started event."""
if not self.verbose:
return
content = Text()
content.append(f"Starting decryption of {message_type} message from sender ", style="white")
content.append(f"{sender_fingerprint}", style="bright_yellow")
content.append("...", style="white")
panel = Panel(
content,
title="🔓 Decrypting Message",
border_style="yellow",
padding=(0, 1),
)
self.print(panel)
def handle_decryption_completed(
self, message_type: str, sender_fingerprint: str, recipient_fingerprint: str
) -> None:
"""Handle decryption completed event."""
if not self.verbose:
return
content = Text()
content.append(f"Successfully decrypted {message_type} message from ", style="white")
content.append(f"{sender_fingerprint}", style="bright_green")
content.append(" to ", style="white")
content.append(f"{recipient_fingerprint}", style="bright_green")
content.append("...", style="white")
panel = Panel(
content,
title="✅ Message Decrypted",
border_style="green",
padding=(0, 1),
)
self.print(panel)
def handle_encrypted_task_execution(self, agent_role: str) -> None:
"""Handle encrypted task execution event."""
if not self.verbose:
return
content = Text()
content.append("Executing encrypted communication task for agent '", style="white")
content.append(f"{agent_role}", style="bright_blue bold")
content.append("'", style="white")
panel = Panel(
content,
title="🔐 Executing Encrypted Task",
border_style="blue",
padding=(0, 1),
)
self.print(panel)

View File

@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from crewai.agents.parser import AgentAction
from crewai.security import Fingerprint
@@ -10,7 +10,7 @@ from crewai.utilities.i18n import I18N
def execute_tool_and_check_finality(
agent_action: AgentAction,
tools: List[CrewStructuredTool],
tools: List[Union[CrewStructuredTool, dict]],
i18n: I18N,
agent_key: Optional[str] = None,
agent_role: Optional[str] = None,
@@ -37,7 +37,8 @@ def execute_tool_and_check_finality(
ToolResult containing the execution result and whether it should be treated as a final answer
"""
try:
tool_name_to_tool_map = {tool.name: tool for tool in tools}
executable_tools = [tool for tool in tools if hasattr(tool, 'name') and hasattr(tool, 'result_as_answer')]
tool_name_to_tool_map = {tool.name: tool for tool in executable_tools}
if agent_key and agent_role and agent:
fingerprint_context = fingerprint_context or {}
@@ -52,10 +53,12 @@ def execute_tool_and_check_finality(
except Exception as e:
raise ValueError(f"Failed to set fingerprint: {e}")
# Create tool usage instance
# Create tool usage instance - filter to only CrewStructuredTool instances for ToolUsage
from typing import cast
crew_structured_tools = [tool for tool in tools if hasattr(tool, 'name') and hasattr(tool, 'result_as_answer') and not isinstance(tool, dict)]
tool_usage = ToolUsage(
tools_handler=tools_handler,
tools=tools,
tools=cast(List[CrewStructuredTool], crew_structured_tools),
function_calling_llm=function_calling_llm,
task=task,
agent=agent,
@@ -82,7 +85,7 @@ def execute_tool_and_check_finality(
# Handle invalid tool name
tool_result = i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in tools]),
tools=", ".join([tool.name.casefold() for tool in executable_tools]),
)
return ToolResult(tool_result, False)

File diff suppressed because one or more lines are too long

View File

@@ -34,11 +34,11 @@ def setup_test_environment():
f"Test storage directory {storage_dir} is not writable: {e}"
)
# Set environment variable to point to the test storage directory
os.environ["CREWAI_STORAGE_DIR"] = str(storage_dir)
os.environ["CREWAI_TESTING"] = "true"
yield
os.environ.pop("CREWAI_TESTING", None)
# Cleanup is handled automatically when tempfile context exits

View File

@@ -1,5 +1,4 @@
"""Test Agent creation and execution basic functionality."""
import hashlib
import json
from concurrent.futures import Future
@@ -51,7 +50,7 @@ from crewai.utilities.events.memory_events import (
MemoryRetrievalStartedEvent,
MemoryRetrievalCompletedEvent,
)
from crewai.memory.external.external_memory import ExternalMemory
@pytest.fixture
def ceo():
@@ -312,7 +311,6 @@ def test_crew_creation(researcher, writer):
@pytest.mark.vcr(filter_headers=["authorization"])
def test_sync_task_execution(researcher, writer):
from unittest.mock import patch
tasks = [
Task(
@@ -961,7 +959,6 @@ def test_cache_hitting_between_agents(researcher, writer, ceo):
@pytest.mark.vcr(filter_headers=["authorization"])
def test_api_calls_throttling(capsys):
from unittest.mock import patch
from crewai.tools import tool
@@ -1396,7 +1393,6 @@ def test_kickoff_for_each_invalid_input():
def test_kickoff_for_each_error_handling():
"""Tests error handling in kickoff_for_each when kickoff raises an error."""
from unittest.mock import patch
inputs = [
{"topic": "dog"},
@@ -1433,7 +1429,6 @@ def test_kickoff_for_each_error_handling():
@pytest.mark.asyncio
async def test_kickoff_async_basic_functionality_and_output():
"""Tests the basic functionality and output of kickoff_async."""
from unittest.mock import patch
inputs = {"topic": "dog"}
@@ -1540,7 +1535,6 @@ async def test_async_kickoff_for_each_async_empty_input():
def test_set_agents_step_callback():
from unittest.mock import patch
researcher_agent = Agent(
role="Researcher",
@@ -1570,7 +1564,6 @@ def test_set_agents_step_callback():
def test_dont_set_agents_step_callback_if_already_set():
from unittest.mock import patch
def agent_callback(_):
pass
@@ -2035,7 +2028,6 @@ def test_crew_inputs_interpolate_both_agents_and_tasks():
def test_crew_inputs_interpolate_both_agents_and_tasks_diff():
from unittest.mock import patch
agent = Agent(
role="{topic} Researcher",
@@ -2068,7 +2060,6 @@ def test_crew_inputs_interpolate_both_agents_and_tasks_diff():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_does_not_interpolate_without_inputs():
from unittest.mock import patch
agent = Agent(
role="{topic} Researcher",
@@ -2203,7 +2194,6 @@ def test_task_same_callback_both_on_task_and_crew():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_tools_with_custom_caching():
from unittest.mock import patch
from crewai.tools import tool
@@ -2484,7 +2474,6 @@ def test_multiple_conditional_tasks(researcher, writer):
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch
math_researcher = Agent(
role="Researcher",
@@ -2583,7 +2572,6 @@ def test_memory_events_are_emitted():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory_with_long_term_memory():
from unittest.mock import patch
math_researcher = Agent(
role="Researcher",
@@ -2614,7 +2602,6 @@ def test_using_contextual_memory_with_long_term_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_warning_long_term_memory_without_entity_memory():
from unittest.mock import patch
math_researcher = Agent(
role="Researcher",
@@ -2651,7 +2638,6 @@ def test_warning_long_term_memory_without_entity_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_long_term_memory_with_memory_flag():
from unittest.mock import patch
math_researcher = Agent(
role="Researcher",
@@ -2686,7 +2672,6 @@ def test_long_term_memory_with_memory_flag():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory_with_short_term_memory():
from unittest.mock import patch
math_researcher = Agent(
role="Researcher",
@@ -2717,7 +2702,6 @@ def test_using_contextual_memory_with_short_term_memory():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_disabled_memory_using_contextual_memory():
from unittest.mock import patch
math_researcher = Agent(
role="Researcher",
@@ -2845,7 +2829,6 @@ def test_crew_output_file_validation_failures():
def test_manager_agent(researcher, writer):
from unittest.mock import patch
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
@@ -4752,3 +4735,43 @@ def test_default_crew_name(researcher, writer):
],
)
assert crew.name == "crew"
@pytest.mark.vcr(filter_headers=["authorization"])
def test_ensure_exchanged_messages_are_propagated_to_external_memory():
external_memory = ExternalMemory(storage=MagicMock())
math_researcher = Agent(
role="Researcher",
goal="You research about math.",
backstory="You're an expert in research and you love to learn new things.",
allow_delegation=False,
)
task1 = Task(
description="Research a topic to teach a kid aged 6 about math.",
expected_output="A topic, explanation, angle, and examples.",
agent=math_researcher,
)
crew = Crew(
agents=[math_researcher],
tasks=[task1],
external_memory=external_memory,
)
with patch.object(
ExternalMemory, "save", return_value=None
) as external_memory_save:
crew.kickoff()
expected_messages = [
{'role': 'system', 'content': "You are Researcher. You're an expert in research and you love to learn new things.\nYour personal goal is: You research about math.\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!"},
{'role': 'user', 'content': '\nCurrent Task: Research a topic to teach a kid aged 6 about math.\n\nThis is the expected criteria for your final answer: A topic, explanation, angle, and examples.\nyou MUST return the actual complete content as the final answer, not a summary.\n\n# Useful context: \nExternal memories:\n\n\nBegin! This is VERY important to you, use the tools available and give your best Final Answer, your job depends on it!\n\nThought:'},
{'role': 'assistant', 'content': 'I now can give a great answer \nFinal Answer: \n\n**Topic: Understanding Shapes (Geometry)**\n\n**Explanation:** \nShapes are everywhere around us! They are the special forms that we can see in everyday objects. Teaching a 6-year-old about shapes is not only fun but also a way to help them think about the world around them and develop their spatial awareness. We will focus on basic shapes: circle, square, triangle, and rectangle. Understanding these shapes helps kids recognize and describe their environment.\n\n**Angle:** \nLets make learning about shapes an adventure! We can turn it into a treasure hunt where the child has to find objects around the house or outside that match the shapes we learn. This hands-on approach helps make the learning stick!\n\n**Examples:** \n1. **Circle:** \n - Explanation: A circle is round and has no corners. It looks like a wheel or a cookie! \n - Activity: Find objects that are circles, such as a clock, a dinner plate, or a ball. Draw a big circle on a paper and then try to draw smaller circles inside it.\n\n2. **Square:** \n - Explanation: A square has four equal sides and four corners. It looks like a box! \n - Activity: Look for squares in books, in windows, or in building blocks. Try to build a tall tower using square blocks!\n\n3. **Triangle:** \n - Explanation: A triangle has three sides and three corners. It looks like a slice of pizza or a roof! \n - Activity: Use crayons to draw a big triangle and then find things that are shaped like a triangle, like a slice of cheese or a traffic sign.\n\n4. **Rectangle:** \n - Explanation: A rectangle has four sides but only opposite sides are equal. Its like a stretched square! \n - Activity: Search for rectangles, such as a book cover or a door. You can cut out rectangles from colored paper and create a collage!\n\nBy relating the shapes to fun activities and using real-world examples, we not only make learning more enjoyable but also help the child better remember and understand the concept of shapes in math. This foundation forms the basis of their future learning in geometry!'}
]
external_memory_save.assert_called_once_with(
value=ANY,
metadata={"description": ANY, "messages": expected_messages},
agent=ANY,
)

View File

@@ -1,314 +0,0 @@
"""Test encrypted agent communication functionality."""
import json
import pytest
from unittest.mock import Mock, patch
from crewai.security import (
AgentCommunicationEncryption,
EncryptedMessage,
Fingerprint,
SecurityConfig
)
from crewai.agent import Agent
from crewai.tools.agent_tools.ask_question_tool import AskQuestionTool
from crewai.tools.agent_tools.delegate_work_tool import DelegateWorkTool
class TestAgentCommunicationEncryption:
"""Test the encryption/decryption functionality."""
def test_encryption_initialization(self):
"""Test initialization of encryption handler."""
fp = Fingerprint()
encryption = AgentCommunicationEncryption(fp)
assert encryption.agent_fingerprint == fp
assert encryption._encryption_keys == {}
def test_key_derivation_consistency(self):
"""Test that key derivation is consistent for same agent pair."""
fp1 = Fingerprint()
fp2 = Fingerprint()
encryption1 = AgentCommunicationEncryption(fp1)
encryption2 = AgentCommunicationEncryption(fp2)
# Keys should be the same regardless of which agent derives them
key1_from_1 = encryption1._derive_communication_key(fp1.uuid_str, fp2.uuid_str)
key1_from_2 = encryption2._derive_communication_key(fp1.uuid_str, fp2.uuid_str)
key2_from_1 = encryption1._derive_communication_key(fp2.uuid_str, fp1.uuid_str)
key2_from_2 = encryption2._derive_communication_key(fp2.uuid_str, fp1.uuid_str)
assert key1_from_1 == key1_from_2
assert key1_from_1 == key2_from_1
assert key1_from_1 == key2_from_2
def test_message_encryption_decryption(self):
"""Test basic message encryption and decryption."""
sender_fp = Fingerprint()
recipient_fp = Fingerprint()
sender_encryption = AgentCommunicationEncryption(sender_fp)
recipient_encryption = AgentCommunicationEncryption(recipient_fp)
original_message = "Hello, this is a test message"
# Encrypt message
encrypted_msg = sender_encryption.encrypt_message(
original_message,
recipient_fp,
"test_message"
)
assert isinstance(encrypted_msg, EncryptedMessage)
assert encrypted_msg.sender_fingerprint == sender_fp.uuid_str
assert encrypted_msg.recipient_fingerprint == recipient_fp.uuid_str
assert encrypted_msg.message_type == "test_message"
assert encrypted_msg.encrypted_payload != original_message
# Decrypt message
decrypted_message = recipient_encryption.decrypt_message(encrypted_msg)
assert decrypted_message == original_message
def test_dict_message_encryption_decryption(self):
"""Test encryption and decryption of dictionary messages."""
sender_fp = Fingerprint()
recipient_fp = Fingerprint()
sender_encryption = AgentCommunicationEncryption(sender_fp)
recipient_encryption = AgentCommunicationEncryption(recipient_fp)
original_message = {
"task": "Analyze this data",
"context": "This is important context",
"priority": "high"
}
# Encrypt message
encrypted_msg = sender_encryption.encrypt_message(
original_message,
recipient_fp
)
# Decrypt message
decrypted_message = recipient_encryption.decrypt_message(encrypted_msg)
assert decrypted_message == original_message
def test_wrong_recipient_decryption_fails(self):
"""Test that decryption fails for wrong recipient."""
sender_fp = Fingerprint()
recipient_fp = Fingerprint()
wrong_recipient_fp = Fingerprint()
sender_encryption = AgentCommunicationEncryption(sender_fp)
wrong_recipient_encryption = AgentCommunicationEncryption(wrong_recipient_fp)
original_message = "Secret message"
# Encrypt for correct recipient
encrypted_msg = sender_encryption.encrypt_message(
original_message,
recipient_fp
)
# Try to decrypt with wrong recipient
with pytest.raises(ValueError, match="Message not intended for this agent"):
wrong_recipient_encryption.decrypt_message(encrypted_msg)
def test_is_encrypted_communication(self):
"""Test detection of encrypted communication."""
fp = Fingerprint()
encryption = AgentCommunicationEncryption(fp)
# Test with EncryptedMessage
encrypted_msg = EncryptedMessage(
encrypted_payload="test",
sender_fingerprint="sender",
recipient_fingerprint="recipient"
)
assert encryption.is_encrypted_communication(encrypted_msg) is True
# Test with dict containing encrypted_payload
encrypted_dict = {
"encrypted_payload": "test",
"sender_fingerprint": "sender"
}
assert encryption.is_encrypted_communication(encrypted_dict) is True
# Test with regular message
regular_msg = "Plain text message"
assert encryption.is_encrypted_communication(regular_msg) is False
# Test with regular dict
regular_dict = {"task": "Do something"}
assert encryption.is_encrypted_communication(regular_dict) is False
class TestSecurityConfigEncryption:
"""Test SecurityConfig encryption settings."""
def test_security_config_encryption_default(self):
"""Test default encryption setting."""
config = SecurityConfig()
assert config.encrypted_communication is False
def test_security_config_encryption_enabled(self):
"""Test enabling encryption."""
config = SecurityConfig(encrypted_communication=True)
assert config.encrypted_communication is True
class TestAgentToolsEncryption:
"""Test encrypted communication in agent tools."""
@pytest.fixture
def agents_with_encryption(self):
"""Create test agents with encryption enabled."""
# Create agents with security configs
sender_agent = Mock()
sender_agent.role = "sender"
sender_agent.security_config = SecurityConfig(encrypted_communication=True)
sender_agent.i18n = Mock()
recipient_agent = Mock()
recipient_agent.role = "recipient"
recipient_agent.security_config = SecurityConfig(encrypted_communication=True)
recipient_agent.i18n = Mock()
recipient_agent.i18n.slice.return_value = "Expected output"
recipient_agent.execute_task = Mock(return_value="Task completed")
return [sender_agent, recipient_agent]
@pytest.fixture
def agents_without_encryption(self):
"""Create test agents without encryption."""
sender_agent = Mock()
sender_agent.role = "sender"
sender_agent.security_config = SecurityConfig(encrypted_communication=False)
sender_agent.i18n = Mock()
recipient_agent = Mock()
recipient_agent.role = "recipient"
recipient_agent.security_config = SecurityConfig(encrypted_communication=False)
recipient_agent.i18n = Mock()
recipient_agent.i18n.slice.return_value = "Expected output"
recipient_agent.execute_task = Mock(return_value="Task completed")
return [sender_agent, recipient_agent]
def test_encryption_enabled_detection(self, agents_with_encryption):
"""Test detection of encryption capability."""
tool = AskQuestionTool(agents=agents_with_encryption, description="Test tool")
assert tool.encryption_enabled is True
def test_encryption_disabled_detection(self, agents_without_encryption):
"""Test detection when encryption is disabled."""
tool = AskQuestionTool(agents=agents_without_encryption, description="Test tool")
assert tool.encryption_enabled is False
def test_prepare_encrypted_communication_payload(self, agents_with_encryption):
"""Test preparation of encrypted communication payload."""
sender, recipient = agents_with_encryption
tool = AskQuestionTool(agents=[sender, recipient], description="Test tool")
payload = tool._prepare_communication_payload(
sender_agent=sender,
recipient_agent=recipient,
task="Test task",
context="Test context"
)
assert isinstance(payload, EncryptedMessage)
assert payload.sender_fingerprint == sender.security_config.fingerprint.uuid_str
assert payload.recipient_fingerprint == recipient.security_config.fingerprint.uuid_str
def test_prepare_plain_communication_payload(self, agents_without_encryption):
"""Test preparation of plain communication payload."""
sender, recipient = agents_without_encryption
tool = AskQuestionTool(agents=[sender, recipient], description="Test tool")
payload = tool._prepare_communication_payload(
sender_agent=sender,
recipient_agent=recipient,
task="Test task",
context="Test context"
)
assert isinstance(payload, dict)
assert payload["task"] == "Test task"
assert payload["context"] == "Test context"
assert payload["sender_role"] == "sender"
def test_execute_with_encryption_enabled(self, agents_with_encryption):
"""Test task execution with encryption enabled."""
sender, recipient = agents_with_encryption
tool = AskQuestionTool(agents=[sender, recipient], description="Test tool")
result = tool._run(
question="What is AI?",
context="Test context",
coworker="recipient"
)
# Verify task was executed
assert result == "Task completed"
# Verify execute_task was called with encrypted context
recipient.execute_task.assert_called_once()
call_args = recipient.execute_task.call_args
context_arg = call_args[0][1] # Second argument is context
assert "ENCRYPTED_COMMUNICATION:" in context_arg
def test_execute_with_encryption_disabled(self, agents_without_encryption):
"""Test task execution with encryption disabled."""
sender, recipient = agents_without_encryption
tool = AskQuestionTool(agents=[sender, recipient], description="Test tool")
result = tool._run(
question="What is AI?",
context="Test context",
coworker="recipient"
)
# Verify task was executed
assert result == "Task completed"
# Verify execute_task was called with plain context
recipient.execute_task.assert_called_once()
call_args = recipient.execute_task.call_args
context_arg = call_args[0][1] # Second argument is context
assert context_arg == "Test context"
class TestBackwardCompatibility:
"""Test that existing functionality still works."""
def test_agents_without_security_config_work(self):
"""Test that agents without security config still function."""
# Create agents without security config
agent1 = Mock()
agent1.role = "agent1"
agent1.i18n = Mock()
agent1.i18n.slice.return_value = "Expected output"
agent1.execute_task = Mock(return_value="Task completed")
# No security_config attribute
agent2 = Mock()
agent2.role = "agent2"
agent2.i18n = Mock()
tool = AskQuestionTool(agents=[agent1, agent2], description="Test tool")
result = tool._run(
question="Test question",
context="Test context",
coworker="agent1"
)
assert result == "Task completed"
agent1.execute_task.assert_called_once()

237
tests/test_hosted_tools.py Normal file
View File

@@ -0,0 +1,237 @@
import pytest
from crewai import Agent
from crewai.tools import BaseTool
class MockTool(BaseTool):
name: str = "mock_tool"
description: str = "A mock tool for testing"
def _run(self, query: str) -> str:
return f"Mock result for: {query}"
def test_agent_with_crewai_tools_only():
"""Test backward compatibility with CrewAI tools only."""
mock_tool = MockTool()
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[mock_tool]
)
assert len(agent.tools) == 1
assert isinstance(agent.tools[0], BaseTool)
def test_agent_with_raw_tools_only():
"""Test agent with raw tool definitions only."""
raw_tool = {
"name": "hosted_search",
"description": "Search the web using hosted search",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"]
}
}
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[raw_tool]
)
assert len(agent.tools) == 1
assert isinstance(agent.tools[0], dict)
assert agent.tools[0]["name"] == "hosted_search"
def test_agent_with_mixed_tools():
"""Test agent with both CrewAI tools and raw tool definitions."""
mock_tool = MockTool()
raw_tool = {
"name": "hosted_calculator",
"description": "Hosted calculator tool",
"parameters": {
"type": "object",
"properties": {
"expression": {"type": "string"}
}
}
}
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[mock_tool, raw_tool]
)
assert len(agent.tools) == 2
assert isinstance(agent.tools[0], BaseTool)
assert isinstance(agent.tools[1], dict)
def test_invalid_raw_tool_definition():
"""Test error handling for invalid raw tool definitions."""
invalid_tool = {"description": "Missing name field"}
with pytest.raises(ValueError, match="Raw tool definition must have a 'name' field"):
Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[invalid_tool]
)
def test_parse_tools_with_mixed_types():
"""Test parse_tools function with mixed tool types."""
from crewai.utilities.agent_utils import parse_tools
mock_tool = MockTool()
raw_tool = {
"name": "hosted_tool",
"description": "A hosted tool",
"parameters": {"type": "object"}
}
parsed = parse_tools([mock_tool, raw_tool])
assert len(parsed) == 2
assert hasattr(parsed[0], 'name')
assert parsed[0].name == "mock_tool"
assert isinstance(parsed[1], dict)
assert parsed[1]["name"] == "hosted_tool"
def test_get_tool_names_with_mixed_types():
"""Test get_tool_names function with mixed tool types."""
from crewai.utilities.agent_utils import get_tool_names
mock_tool = MockTool()
raw_tool = {"name": "hosted_tool", "description": "A hosted tool"}
names = get_tool_names([mock_tool, raw_tool])
assert "mock_tool" in names
assert "hosted_tool" in names
def test_render_text_description_with_mixed_types():
"""Test render_text_description_and_args function with mixed tool types."""
from crewai.utilities.agent_utils import render_text_description_and_args
mock_tool = MockTool()
raw_tool = {"name": "hosted_tool", "description": "A hosted tool"}
description = render_text_description_and_args([mock_tool, raw_tool])
assert "A mock tool for testing" in description
assert "A hosted tool" in description
def test_agent_executor_with_mixed_tools():
"""Test CrewAgentExecutor initialization with mixed tool types."""
mock_tool = MockTool()
raw_tool = {"name": "hosted_tool", "description": "A hosted tool"}
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[mock_tool, raw_tool]
)
agent.create_agent_executor()
assert len(agent.agent_executor.tool_name_to_tool_map) == 2
assert "mock_tool" in agent.agent_executor.tool_name_to_tool_map
assert "hosted_tool" in agent.agent_executor.tool_name_to_tool_map
def test_empty_tools_list():
"""Test agent with empty tools list."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[]
)
assert len(agent.tools) == 0
def test_none_tools():
"""Test agent with None tools."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=None
)
assert agent.tools == []
def test_raw_tool_without_description():
"""Test raw tool definition without description field."""
raw_tool = {"name": "minimal_tool"}
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[raw_tool]
)
assert len(agent.tools) == 1
assert isinstance(agent.tools[0], dict)
assert agent.tools[0]["name"] == "minimal_tool"
def test_complex_raw_tool_definition():
"""Test complex raw tool definition with nested parameters."""
raw_tool = {
"name": "complex_search",
"description": "Advanced search with multiple parameters",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"filters": {
"type": "object",
"properties": {
"date_range": {"type": "string"},
"category": {"type": "string"}
}
},
"limit": {
"type": "integer",
"minimum": 1,
"maximum": 100,
"default": 10
}
},
"required": ["query"]
}
}
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
tools=[raw_tool]
)
assert len(agent.tools) == 1
assert isinstance(agent.tools[0], dict)
assert agent.tools[0]["name"] == "complex_search"
assert "parameters" in agent.tools[0]
assert agent.tools[0]["parameters"]["type"] == "object"

View File

@@ -2,8 +2,8 @@ import os
import pytest
from unittest.mock import patch, MagicMock
# Remove the module-level patch
from crewai import Agent, Task, Crew
from crewai.flow.flow import Flow, start
from crewai.utilities.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
@@ -284,29 +284,42 @@ class TestTraceListenerSetup:
f"Found {len(trace_handlers)} trace handlers when tracing should be disabled"
)
def test_trace_listener_setup_correctly(self):
def test_trace_listener_setup_correctly_for_crew(self):
"""Test that trace listener is set up correctly when enabled"""
with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}):
trace_listener = TraceCollectionListener()
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
llm="gpt-4o-mini",
)
task = Task(
description="Say hello to the world",
expected_output="hello world",
agent=agent,
)
with patch.object(
TraceCollectionListener, "setup_listeners"
) as mock_listener_setup:
Crew(agents=[agent], tasks=[task], verbose=True)
assert mock_listener_setup.call_count >= 1
assert trace_listener.trace_enabled is True
assert trace_listener.batch_manager is not None
@pytest.mark.vcr(filter_headers=["authorization"])
def test_trace_listener_setup_correctly_with_tracing_flag(self):
def test_trace_listener_setup_correctly_for_flow(self):
"""Test that trace listener is set up correctly when enabled"""
agent = Agent(role="Test Agent", goal="Test goal", backstory="Test backstory")
task = Task(
description="Say hello to the world",
expected_output="hello world",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=True, tracing=True)
crew.kickoff()
trace_listener = TraceCollectionListener(tracing=True)
assert trace_listener.trace_enabled is True
assert trace_listener.batch_manager is not None
with patch.dict(os.environ, {"CREWAI_TRACING_ENABLED": "true"}):
class FlowExample(Flow):
@start()
def start(self):
pass
with patch.object(
TraceCollectionListener, "setup_listeners"
) as mock_listener_setup:
FlowExample()
assert mock_listener_setup.call_count >= 1
# Helper method to ensure cleanup
def teardown_method(self):

6656
uv.lock generated

File diff suppressed because it is too large Load Diff