Compare commits

...

23 Commits

Author SHA1 Message Date
lorenzejay
601eda9095 Enhance Flow Execution Logic
- Introduced conditional execution for start methods in the Flow class.
- Unconditional start methods are prioritized during kickoff, while conditional starts are executed only if no unconditional starts are present.
- Improved handling of cyclic flows by allowing re-execution of conditional start methods triggered by routers.
- Added checks to continue execution chains for completed conditional starts.

These changes improve the flexibility and control of flow execution, ensuring that the correct methods are triggered based on the defined conditions.
2026-01-15 09:29:25 -08:00
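Based on this description, a minimal sketch of the new kickoff semantics (hedged: it assumes crewai's public Flow API, where `@start()` accepts an optional condition, and the method names here are illustrative):
```python
from crewai.flow import Flow, or_, router, start


class CyclicFlow(Flow):
    @start()  # unconditional start: always runs at kickoff
    def first_draft(self):
        self.state["attempts"] = 1
        return "draft 1"

    @start("retry")  # conditional start: skipped at kickoff, re-run when the router emits "retry"
    def redraft(self):
        self.state["attempts"] += 1
        return f"draft {self.state['attempts']}"

    @router(or_(first_draft, redraft))
    def review(self):
        # cycle back to the conditional start until the budget is spent
        return "retry" if self.state["attempts"] < 3 else "done"
```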
lorenzejay
83c62a65dd Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/enh-decouple-executor-from-crew 2026-01-15 09:12:38 -08:00
nicoferdi96
5645cbb22e CrewAI AMP Deployment Guidelines (#4205)
* doc changes for better deployment guidelines and checklist

* chore: remove .claude folder from version control

The .claude folder contains local Claude Code skills and configuration
that should not be tracked in the repository. Already in .gitignore.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Better project structure for flows

* docs.json updated structure

* Ko and Pt translations for the AMP deployment guidelines

* fix broken links

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-01-15 16:32:20 +01:00
Lorenze Jay
8f022be106 feat: bump versions to 1.8.1 (#4242)
* feat: bump versions to 1.8.1

* bump bump
2026-01-14 20:49:14 -08:00
Greyson LaLonde
6a19b0a279 feat: a2a task execution utilities 2026-01-14 22:56:17 -05:00
Greyson LaLonde
641c336b2c chore: a2a agent card docs, refine existing a2a docs 2026-01-14 22:46:53 -05:00
Greyson LaLonde
22f1812824 feat: add a2a server config; agent card generation 2026-01-14 22:09:11 -05:00
lorenzejay
3a1deb193a fixed cassette 2026-01-14 19:06:28 -08:00
lorenzejay
09185acc0d refactor: streamline agent execution and enhance flow compatibility
Refactored the Agent class to simplify the execution method by removing the event loop check and clarifying the behavior when called from synchronous and asynchronous contexts. The changes ensure that the method operates seamlessly within flow methods, improving clarity in the documentation. Additionally, updated the AgentExecutor to set the response model to None, enhancing flexibility. New test cassettes were added to validate the functionality of agents within flow contexts, ensuring robust testing for both synchronous and asynchronous operations.
2026-01-14 18:51:09 -08:00
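A short sketch of the usage this enables, assuming the `Agent.kickoff()` / `kickoff_async()` entry points described above (the flow itself is illustrative):
```python
from crewai import Agent
from crewai.flow import Flow, start


class ResearchFlow(Flow):
    @start()
    def summarize(self):
        agent = Agent(
            role="Researcher",
            goal="Summarize topics concisely",
            backstory="A concise analyst",
            llm="gpt-4o",
        )
        # Called synchronously from inside a flow method, even though the
        # flow itself is driven by an event loop.
        return agent.kickoff("Summarize recent agent-orchestration patterns")


result = ResearchFlow().kickoff()
```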
lorenzejay
6541f01b1b working cassette 2026-01-14 16:40:35 -08:00
lorenzejay
3a6702e9c8 working 2026-01-14 16:27:50 -08:00
lorenzejay
e4bd7889fd test fix cassette 2026-01-14 16:23:36 -08:00
lorenzejay
842a1db16f test fix cassette 2026-01-14 16:23:19 -08:00
lorenzejay
e9b86100c7 refactor: update test task guardrail process output for improved validation
Refactored the test for task guardrail process output to enhance the validation of the output against the OpenAPI schema. The changes include a more structured request body and updated response handling to ensure compliance with the guardrail requirements. This update aims to improve the clarity and reliability of the test cases, ensuring that task outputs are correctly validated and feedback is appropriately provided.
2026-01-14 16:05:38 -08:00
lorenzejay
341812d58e refactor: improve test for Agent kickoff parameters
Updated the test for the Agent class to ensure that the kickoff method correctly preserves parameters. The test now verifies the configuration of the agent after kickoff, enhancing clarity and maintainability. Additionally, the test for asynchronous kickoff within a flow context has been updated to reflect the Agent class instead of LiteAgent.
2026-01-14 15:56:53 -08:00
lorenzejay
38db734561 fix test 2026-01-14 15:39:34 -08:00
lorenzejay
5048d54981 Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/enh-decouple-executor-from-crew 2026-01-14 14:28:33 -08:00
lorenzejay
ae17178e86 linting and tests 2026-01-14 14:28:09 -08:00
lorenzejay
b7a13e15ff refactor: enhance agent kickoff preparation by separating common logic
Updated the Agent class to introduce a new private method that consolidates the common setup logic for both synchronous and asynchronous kickoff executions. This change improves code clarity and maintainability by reducing redundancy in the kickoff process, while ensuring that the agent can still execute effectively within both standalone and flow contexts.
2026-01-14 14:27:39 -08:00
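The commit elides the method's name, so the shape of the pattern below is hypothetical (`_prepare_kickoff` is invented for illustration):
```python
# Hypothetical sketch of the consolidation pattern; the actual method name
# is not shown in the commit, so `_prepare_kickoff` is invented here.
class Agent:
    def _prepare_kickoff(self, messages):
        # Shared setup for both entry points: normalize input,
        # configure the executor, emit start events.
        ...

    def kickoff(self, messages):
        self._prepare_kickoff(messages)
        ...  # synchronous execution path

    async def kickoff_async(self, messages):
        self._prepare_kickoff(messages)
        ...  # asynchronous execution path
```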
lorenzejay
13dc7e25e0 ensure executors work inside a flow due to flow in flow async structure 2026-01-14 14:23:10 -08:00
lorenzejay
5cef85c643 refactor: streamline AgentExecutor initialization by removing redundant parameters
Updated the Agent class to simplify the initialization of the AgentExecutor by removing unnecessary task and crew parameters in standalone mode. This change enhances code clarity and maintains backward compatibility by ensuring that the executor is correctly configured without redundant assignments.
2026-01-09 18:27:07 -08:00
lorenzejay
dc3ae9396d fix: handle None task in AgentExecutor to prevent errors
Added a check to ensure that if the task is None, the method returns early without attempting to access task properties. This change improves the robustness of the AgentExecutor by preventing potential errors when the task is not set.
2026-01-09 18:07:37 -08:00
lorenzejay
0029f8193c wip restructuring agent executor and liteagent 2026-01-09 14:42:50 -08:00
59 changed files with 6724 additions and 2378 deletions

.gitignore vendored
View File

@@ -26,3 +26,4 @@ plan.md
conceptual_plan.md
build_image
chromadb-*.lock
.claude

View File

@@ -429,7 +429,8 @@
"group": "How-To Guides",
"pages": [
"en/enterprise/guides/build-crew",
"en/enterprise/guides/deploy-crew",
"en/enterprise/guides/prepare-for-deployment",
"en/enterprise/guides/deploy-to-amp",
"en/enterprise/guides/kickoff-crew",
"en/enterprise/guides/update-crew",
"en/enterprise/guides/enable-crew-studio",
@@ -864,7 +865,8 @@
"group": "Guias",
"pages": [
"pt-BR/enterprise/guides/build-crew",
"pt-BR/enterprise/guides/deploy-crew",
"pt-BR/enterprise/guides/prepare-for-deployment",
"pt-BR/enterprise/guides/deploy-to-amp",
"pt-BR/enterprise/guides/kickoff-crew",
"pt-BR/enterprise/guides/update-crew",
"pt-BR/enterprise/guides/enable-crew-studio",
@@ -1326,7 +1328,8 @@
"group": "How-To Guides",
"pages": [
"ko/enterprise/guides/build-crew",
"ko/enterprise/guides/deploy-crew",
"ko/enterprise/guides/prepare-for-deployment",
"ko/enterprise/guides/deploy-to-amp",
"ko/enterprise/guides/kickoff-crew",
"ko/enterprise/guides/update-crew",
"ko/enterprise/guides/enable-crew-studio",
@@ -1514,6 +1517,18 @@
"source": "/enterprise/:path*",
"destination": "/en/enterprise/:path*"
},
{
"source": "/en/enterprise/guides/deploy-crew",
"destination": "/en/enterprise/guides/deploy-to-amp"
},
{
"source": "/ko/enterprise/guides/deploy-crew",
"destination": "/ko/enterprise/guides/deploy-to-amp"
},
{
"source": "/pt-BR/enterprise/guides/deploy-crew",
"destination": "/pt-BR/enterprise/guides/deploy-to-amp"
},
{
"source": "/api-reference/:path*",
"destination": "/en/api-reference/:path*"

View File

@@ -1,12 +1,12 @@
---
title: "Deploy Crew"
description: "Deploying a Crew on CrewAI AMP"
title: "Deploy to AMP"
description: "Deploy your Crew or Flow to CrewAI AMP"
icon: "rocket"
mode: "wide"
---
<Note>
After creating a crew locally or through Crew Studio, the next step is
After creating a Crew or Flow locally (or through Crew Studio), the next step is
deploying it to the CrewAI AMP platform. This guide covers multiple deployment
methods to help you choose the best approach for your workflow.
</Note>
@@ -14,19 +14,26 @@ mode: "wide"
## Prerequisites
<CardGroup cols={2}>
<Card title="Crew Ready for Deployment" icon="users">
You should have a working crew either built locally or created through Crew
Studio
<Card title="Project Ready for Deployment" icon="check-circle">
You should have a working Crew or Flow that runs successfully locally.
Follow our [preparation guide](/en/enterprise/guides/prepare-for-deployment) to verify your project structure.
</Card>
<Card title="GitHub Repository" icon="github">
Your crew code should be in a GitHub repository (for GitHub integration
Your code should be in a GitHub repository (for GitHub integration
method)
</Card>
</CardGroup>
<Info>
**Crews vs Flows**: Both project types can be deployed as "automations" on CrewAI AMP.
The deployment process is the same, but they have different project structures.
See [Prepare for Deployment](/en/enterprise/guides/prepare-for-deployment) for details.
</Info>
## Option 1: Deploy Using CrewAI CLI
The CLI provides the fastest way to deploy locally developed crews to the Enterprise platform.
The CLI provides the fastest way to deploy locally developed Crews or Flows to the AMP platform.
The CLI automatically detects your project type from `pyproject.toml` and builds accordingly.
<Steps>
<Step title="Install CrewAI CLI">
@@ -128,7 +135,7 @@ crewai deploy remove <deployment_id>
## Option 2: Deploy Directly via Web Interface
You can also deploy your crews directly through the CrewAI AMP web interface by connecting your GitHub account. This approach doesn't require using the CLI on your local machine.
You can also deploy your Crews or Flows directly through the CrewAI AMP web interface by connecting your GitHub account. This approach doesn't require using the CLI on your local machine. The platform automatically detects your project type and handles the build appropriately.
<Steps>
@@ -282,68 +289,7 @@ For automated deployments in CI/CD pipelines, you can use the CrewAI API to trig
</Steps>
## ⚠️ Environment Variable Security Requirements
<Warning>
**Important**: CrewAI AMP has security restrictions on environment variable
names that can cause deployment failures if not followed.
</Warning>
### Blocked Environment Variable Patterns
For security reasons, the following environment variable naming patterns are **automatically filtered** and will cause deployment issues:
**Blocked Patterns:**
- Variables ending with `_TOKEN` (e.g., `MY_API_TOKEN`)
- Variables ending with `_PASSWORD` (e.g., `DB_PASSWORD`)
- Variables ending with `_SECRET` (e.g., `API_SECRET`)
- Variables ending with `_KEY` in certain contexts
**Specific Blocked Variables:**
- `GITHUB_USER`, `GITHUB_TOKEN`
- `AWS_REGION`, `AWS_DEFAULT_REGION`
- Various internal CrewAI system variables
### Allowed Exceptions
Some variables are explicitly allowed despite matching blocked patterns:
- `AZURE_AD_TOKEN`
- `AZURE_OPENAI_AD_TOKEN`
- `ENTERPRISE_ACTION_TOKEN`
- `CREWAI_ENTEPRISE_TOOLS_TOKEN`
### How to Fix Naming Issues
If your deployment fails due to environment variable restrictions:
```bash
# ❌ These will cause deployment failures
OPENAI_TOKEN=sk-...
DATABASE_PASSWORD=mypassword
API_SECRET=secret123
# ✅ Use these naming patterns instead
OPENAI_API_KEY=sk-...
DATABASE_CREDENTIALS=mypassword
API_CONFIG=secret123
```
### Best Practices
1. **Use standard naming conventions**: `PROVIDER_API_KEY` instead of `PROVIDER_TOKEN`
2. **Test locally first**: Ensure your crew works with the renamed variables
3. **Update your code**: Change any references to the old variable names
4. **Document changes**: Keep track of renamed variables for your team
<Tip>
If you encounter deployment failures with cryptic environment variable errors,
check your variable names against these patterns first.
</Tip>
### Interact with Your Deployed Crew
## Interact with Your Deployed Automation
Once deployment is complete, you can access your crew through:
@@ -387,7 +333,108 @@ The Enterprise platform also offers:
- **Custom Tools Repository**: Create, share, and install tools
- **Crew Studio**: Build crews through a chat interface without writing code
## Troubleshooting Deployment Failures
If your deployment fails, check these common issues:
### Build Failures
#### Missing uv.lock File
**Symptom**: Build fails early with dependency resolution errors
**Solution**: Generate and commit the lock file:
```bash
uv lock
git add uv.lock
git commit -m "Add uv.lock for deployment"
git push
```
<Warning>
The `uv.lock` file is required for all deployments. Without it, the platform
cannot reliably install your dependencies.
</Warning>
#### Wrong Project Structure
**Symptom**: "Could not find entry point" or "Module not found" errors
**Solution**: Verify your project matches the expected structure:
- **Both Crews and Flows**: Must have entry point at `src/project_name/main.py`
- **Crews**: Use a `run()` function as entry point
- **Flows**: Use a `kickoff()` function as entry point
See [Prepare for Deployment](/en/enterprise/guides/prepare-for-deployment) for detailed structure diagrams.
#### Missing CrewBase Decorator
**Symptom**: "Crew not found", "Config not found", or agent/task configuration errors
**Solution**: Ensure **all** crew classes use the `@CrewBase` decorator:
```python
from crewai.project import CrewBase, agent, crew, task
@CrewBase  # This decorator is REQUIRED
class YourCrew():
    """Your crew description"""

    @agent
    def my_agent(self) -> Agent:
        return Agent(
            config=self.agents_config['my_agent'],  # type: ignore[index]
            verbose=True
        )
    # ... rest of crew definition
```
<Info>
This applies to standalone Crews AND crews embedded inside Flow projects.
Every crew class needs the decorator.
</Info>
#### Incorrect pyproject.toml Type
**Symptom**: Build succeeds but runtime fails, or unexpected behavior
**Solution**: Verify the `[tool.crewai]` section matches your project type:
```toml
# For Crew projects:
[tool.crewai]
type = "crew"
# For Flow projects:
[tool.crewai]
type = "flow"
```
### Runtime Failures
#### LLM Connection Failures
**Symptom**: API key errors, "model not found", or authentication failures
**Solution**:
1. Verify your LLM provider's API key is correctly set in environment variables
2. Ensure the environment variable names match what your code expects
3. Test locally with the exact same environment variables before deploying
#### Crew Execution Errors
**Symptom**: Crew starts but fails during execution
**Solution**:
1. Check the execution logs in the AMP dashboard (Traces tab)
2. Verify all tools have required API keys configured
3. Ensure agent configurations in `agents.yaml` are valid
4. Check task configurations in `tasks.yaml` for syntax errors
<Card title="Need Help?" icon="headset" href="mailto:support@crewai.com">
Contact our support team for assistance with deployment issues or questions
about the Enterprise platform.
about the AMP platform.
</Card>

View File

@@ -0,0 +1,305 @@
---
title: "Prepare for Deployment"
description: "Ensure your Crew or Flow is ready for deployment to CrewAI AMP"
icon: "clipboard-check"
mode: "wide"
---
<Note>
Before deploying to CrewAI AMP, it's crucial to verify your project is correctly structured.
Both Crews and Flows can be deployed as "automations," but they have different project structures
and requirements that must be met for successful deployment.
</Note>
## Understanding Automations
In CrewAI AMP, **automations** is the umbrella term for deployable Agentic AI projects. An automation can be either:
- **A Crew**: A standalone team of AI agents working together on tasks
- **A Flow**: An orchestrated workflow that can combine multiple crews, direct LLM calls, and procedural logic
Understanding which type you're deploying is essential because they have different project structures and entry points.
## Crews vs Flows: Key Differences
<CardGroup cols={2}>
<Card title="Crew Projects" icon="users">
Standalone AI agent teams with `crew.py` defining agents and tasks. Best for focused, collaborative tasks.
</Card>
<Card title="Flow Projects" icon="diagram-project">
Orchestrated workflows with embedded crews in a `crews/` folder. Best for complex, multi-stage processes.
</Card>
</CardGroup>
| Aspect | Crew | Flow |
|--------|------|------|
| **Project structure** | `src/project_name/` with `crew.py` | `src/project_name/` with `crews/` folder |
| **Main logic location** | `src/project_name/crew.py` | `src/project_name/main.py` (Flow class) |
| **Entry point function** | `run()` in `main.py` | `kickoff()` in `main.py` |
| **pyproject.toml type** | `type = "crew"` | `type = "flow"` |
| **CLI create command** | `crewai create crew name` | `crewai create flow name` |
| **Config location** | `src/project_name/config/` | `src/project_name/crews/crew_name/config/` |
| **Can contain other crews** | No | Yes (in `crews/` folder) |
## Project Structure Reference
### Crew Project Structure
When you run `crewai create crew my_crew`, you get this structure:
```
my_crew/
├── .gitignore
├── pyproject.toml # Must have type = "crew"
├── README.md
├── .env
├── uv.lock # REQUIRED for deployment
└── src/
└── my_crew/
├── __init__.py
├── main.py # Entry point with run() function
├── crew.py # Crew class with @CrewBase decorator
├── tools/
│ ├── custom_tool.py
│ └── __init__.py
└── config/
├── agents.yaml # Agent definitions
└── tasks.yaml # Task definitions
```
<Warning>
The nested `src/project_name/` structure is critical for Crews.
Placing files at the wrong level will cause deployment failures.
</Warning>
### Flow Project Structure
When you run `crewai create flow my_flow`, you get this structure:
```
my_flow/
├── .gitignore
├── pyproject.toml # Must have type = "flow"
├── README.md
├── .env
├── uv.lock # REQUIRED for deployment
└── src/
└── my_flow/
├── __init__.py
├── main.py # Entry point with kickoff() function + Flow class
├── crews/ # Embedded crews folder
│ └── poem_crew/
│ ├── __init__.py
│ ├── poem_crew.py # Crew with @CrewBase decorator
│ └── config/
│ ├── agents.yaml
│ └── tasks.yaml
└── tools/
├── __init__.py
└── custom_tool.py
```
<Info>
Both Crews and Flows use the `src/project_name/` structure.
The key difference is that Flows have a `crews/` folder for embedded crews,
while Crews have `crew.py` directly in the project folder.
</Info>
## Pre-Deployment Checklist
Use this checklist to verify your project is ready for deployment.
### 1. Verify pyproject.toml Configuration
Your `pyproject.toml` must include the correct `[tool.crewai]` section:
<Tabs>
<Tab title="For Crews">
```toml
[tool.crewai]
type = "crew"
```
</Tab>
<Tab title="For Flows">
```toml
[tool.crewai]
type = "flow"
```
</Tab>
</Tabs>
<Warning>
If the `type` doesn't match your project structure, the build will fail or
the automation won't run correctly.
</Warning>
### 2. Ensure uv.lock File Exists
CrewAI uses `uv` for dependency management. The `uv.lock` file ensures reproducible builds and is **required** for deployment.
```bash
# Generate or update the lock file
uv lock
# Verify it exists
ls -la uv.lock
```
If the file doesn't exist, run `uv lock` and commit it to your repository:
```bash
uv lock
git add uv.lock
git commit -m "Add uv.lock for deployment"
git push
```
### 3. Validate CrewBase Decorator Usage
**Every crew class must use the `@CrewBase` decorator.** This applies to:
- Standalone crew projects
- Crews embedded inside Flow projects
```python
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List
@CrewBase  # This decorator is REQUIRED
class MyCrew():
    """My crew description"""

    agents: List[BaseAgent]
    tasks: List[Task]

    @agent
    def my_agent(self) -> Agent:
        return Agent(
            config=self.agents_config['my_agent'],  # type: ignore[index]
            verbose=True
        )

    @task
    def my_task(self) -> Task:
        return Task(
            config=self.tasks_config['my_task']  # type: ignore[index]
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
        )
```
<Warning>
If you forget the `@CrewBase` decorator, your deployment will fail with
errors about missing agents or tasks configurations.
</Warning>
### 4. Check Project Entry Points
Both Crews and Flows have their entry point in `src/project_name/main.py`:
<Tabs>
<Tab title="For Crews">
The entry point uses a `run()` function:
```python
# src/my_crew/main.py
from my_crew.crew import MyCrew
def run():
    """Run the crew."""
    inputs = {'topic': 'AI in Healthcare'}
    result = MyCrew().crew().kickoff(inputs=inputs)
    return result

if __name__ == "__main__":
    run()
```
</Tab>
<Tab title="For Flows">
The entry point uses a `kickoff()` function with a Flow class:
```python
# src/my_flow/main.py
from crewai.flow import Flow, listen, start
from my_flow.crews.poem_crew.poem_crew import PoemCrew
class MyFlow(Flow):
    @start()
    def begin(self):
        # Flow logic here
        result = PoemCrew().crew().kickoff(inputs={...})
        return result

def kickoff():
    """Run the flow."""
    MyFlow().kickoff()

if __name__ == "__main__":
    kickoff()
```
</Tab>
</Tabs>
### 5. Prepare Environment Variables
Before deployment, ensure you have:
1. **LLM API keys** ready (OpenAI, Anthropic, Google, etc.)
2. **Tool API keys** if using external tools (Serper, etc.)
<Tip>
Test your project locally with the same environment variables before deploying
to catch configuration issues early.
</Tip>
## Quick Validation Commands
Run these commands from your project root to quickly verify your setup:
```bash
# 1. Check project type in pyproject.toml
grep -A2 "\[tool.crewai\]" pyproject.toml
# 2. Verify uv.lock exists
ls -la uv.lock || echo "ERROR: uv.lock missing! Run 'uv lock'"
# 3. Verify src/ structure exists
ls -la src/*/main.py 2>/dev/null || echo "No main.py found in src/"
# 4. For Crews - verify crew.py exists
ls -la src/*/crew.py 2>/dev/null || echo "No crew.py (expected for Crews)"
# 5. For Flows - verify crews/ folder exists
ls -la src/*/crews/ 2>/dev/null || echo "No crews/ folder (expected for Flows)"
# 6. Check for CrewBase usage
grep -r "@CrewBase" . --include="*.py"
```
## Common Setup Mistakes
| Mistake | Symptom | Fix |
|---------|---------|-----|
| Missing `uv.lock` | Build fails during dependency resolution | Run `uv lock` and commit |
| Wrong `type` in pyproject.toml | Build succeeds but runtime fails | Change to correct type |
| Missing `@CrewBase` decorator | "Config not found" errors | Add decorator to all crew classes |
| Files at root instead of `src/` | Entry point not found | Move to `src/project_name/` |
| Missing `run()` or `kickoff()` | Cannot start automation | Add correct entry function |
## Next Steps
Once your project passes all checklist items, you're ready to deploy:
<Card title="Deploy to AMP" icon="rocket" href="/en/enterprise/guides/deploy-to-amp">
Follow the deployment guide to deploy your Crew or Flow to CrewAI AMP using
the CLI, web interface, or CI/CD integration.
</Card>

View File

@@ -1,43 +1,48 @@
---
title: Agent-to-Agent (A2A) Protocol
description: Enable CrewAI agents to delegate tasks to remote A2A-compliant agents for specialized handling
description: Agents delegate tasks to remote A2A agents and/or operate as A2A-compliant server agents.
icon: network-wired
mode: "wide"
---
## A2A Agent Delegation
CrewAI supports the Agent-to-Agent (A2A) protocol, allowing agents to delegate tasks to remote specialized agents. The agent's LLM automatically decides whether to handle a task directly or delegate to an A2A agent based on the task requirements.
<Note>
A2A delegation requires the `a2a-sdk` package. Install with: `uv add 'crewai[a2a]'` or `pip install 'crewai[a2a]'`
</Note>
CrewAI treats [A2A protocol](https://a2a-protocol.org/latest/) as a first-class delegation primitive, enabling agents to delegate tasks, request information, and collaborate with remote agents, as well as act as A2A-compliant server agents.
In client mode, agents autonomously choose between local execution and remote delegation based on task requirements.
## How It Works
When an agent is configured with A2A capabilities:
1. The LLM analyzes each task
1. The Agent analyzes each task
2. It decides to either:
- Handle the task directly using its own capabilities
- Delegate to a remote A2A agent for specialized handling
3. If delegating, the agent communicates with the remote A2A agent through the protocol
4. Results are returned to the CrewAI workflow
<Note>
A2A delegation requires the `a2a-sdk` package. Install with: `uv add 'crewai[a2a]'` or `pip install 'crewai[a2a]'`
</Note>
## Basic Configuration
<Warning>
`crewai.a2a.config.A2AConfig` is deprecated and will be removed in v2.0.0. Use `A2AClientConfig` for connecting to remote agents and/or `A2AServerConfig` for exposing agents as servers.
</Warning>
Configure an agent for A2A delegation by setting the `a2a` parameter:
```python Code
from crewai import Agent, Crew, Task
from crewai.a2a import A2AConfig
from crewai.a2a import A2AClientConfig
agent = Agent(
role="Research Coordinator",
goal="Coordinate research tasks efficiently",
backstory="Expert at delegating to specialized research agents",
llm="gpt-4o",
a2a=A2AConfig(
a2a=A2AClientConfig(
endpoint="https://example.com/.well-known/agent-card.json",
timeout=120,
max_turns=10
@@ -54,9 +59,9 @@ crew = Crew(agents=[agent], tasks=[task], verbose=True)
result = crew.kickoff()
```
## Configuration Options
## Client Configuration Options
The `A2AConfig` class accepts the following parameters:
The `A2AClientConfig` class accepts the following parameters:
<ParamField path="endpoint" type="str" required>
The A2A agent endpoint URL (typically points to `.well-known/agent-card.json`)
@@ -95,14 +100,30 @@ The `A2AConfig` class accepts the following parameters:
Transport protocol for A2A communication. Options: `JSONRPC` (default), `GRPC`, or `HTTP+JSON`.
</ParamField>
<ParamField path="accepted_output_modes" type="list[str]" default='["application/json"]'>
Media types the client can accept in responses.
</ParamField>
<ParamField path="supported_transports" type="list[str]" default='["JSONRPC"]'>
Ordered list of transport protocols the client supports.
</ParamField>
<ParamField path="use_client_preference" type="bool" default="False">
Whether to prioritize client transport preferences over server.
</ParamField>
<ParamField path="extensions" type="list[str]" default="[]">
Extension URIs the client supports.
</ParamField>
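For instance, the transport-negotiation fields documented above might be combined like this (a sketch; the values are illustrative):
```python
from crewai.a2a import A2AClientConfig

config = A2AClientConfig(
    endpoint="https://example.com/.well-known/agent-card.json",
    accepted_output_modes=["application/json", "text/plain"],
    supported_transports=["JSONRPC", "HTTP+JSON"],  # ordered by preference
    use_client_preference=True,  # let the client's ordering win during negotiation
)
```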
## Authentication
For A2A agents that require authentication, use one of the provided auth schemes:
<Tabs>
<Tab title="Bearer Token">
```python Code
from crewai.a2a import A2AConfig
```python bearer_token_auth.py lines
from crewai.a2a import A2AClientConfig
from crewai.a2a.auth import BearerTokenAuth
agent = Agent(
@@ -110,18 +131,18 @@ agent = Agent(
goal="Coordinate tasks with secured agents",
backstory="Manages secure agent communications",
llm="gpt-4o",
a2a=A2AConfig(
a2a=A2AClientConfig(
endpoint="https://secure-agent.example.com/.well-known/agent-card.json",
auth=BearerTokenAuth(token="your-bearer-token"),
timeout=120
)
)
```
```
</Tab>
<Tab title="API Key">
```python Code
from crewai.a2a import A2AConfig
```python api_key_auth.py lines
from crewai.a2a import A2AClientConfig
from crewai.a2a.auth import APIKeyAuth
agent = Agent(
@@ -129,7 +150,7 @@ agent = Agent(
goal="Coordinate with API-based agents",
backstory="Manages API-authenticated communications",
llm="gpt-4o",
a2a=A2AConfig(
a2a=A2AClientConfig(
endpoint="https://api-agent.example.com/.well-known/agent-card.json",
auth=APIKeyAuth(
api_key="your-api-key",
@@ -139,12 +160,12 @@ agent = Agent(
timeout=120
)
)
```
```
</Tab>
<Tab title="OAuth2">
```python Code
from crewai.a2a import A2AConfig
```python oauth2_auth.py lines
from crewai.a2a import A2AClientConfig
from crewai.a2a.auth import OAuth2ClientCredentials
agent = Agent(
@@ -152,7 +173,7 @@ agent = Agent(
goal="Coordinate with OAuth-secured agents",
backstory="Manages OAuth-authenticated communications",
llm="gpt-4o",
a2a=A2AConfig(
a2a=A2AClientConfig(
endpoint="https://oauth-agent.example.com/.well-known/agent-card.json",
auth=OAuth2ClientCredentials(
token_url="https://auth.example.com/oauth/token",
@@ -163,12 +184,12 @@ agent = Agent(
timeout=120
)
)
```
```
</Tab>
<Tab title="HTTP Basic">
```python Code
from crewai.a2a import A2AConfig
```python http_basic_auth.py lines
from crewai.a2a import A2AClientConfig
from crewai.a2a.auth import HTTPBasicAuth
agent = Agent(
@@ -176,7 +197,7 @@ agent = Agent(
goal="Coordinate with basic auth agents",
backstory="Manages basic authentication communications",
llm="gpt-4o",
a2a=A2AConfig(
a2a=A2AClientConfig(
endpoint="https://basic-agent.example.com/.well-known/agent-card.json",
auth=HTTPBasicAuth(
username="your-username",
@@ -185,7 +206,7 @@ agent = Agent(
timeout=120
)
)
```
```
</Tab>
</Tabs>
@@ -194,7 +215,7 @@ agent = Agent(
Configure multiple A2A agents for delegation by passing a list:
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a import A2AClientConfig
from crewai.a2a.auth import BearerTokenAuth
agent = Agent(
@@ -203,11 +224,11 @@ agent = Agent(
backstory="Expert at delegating to the right specialist",
llm="gpt-4o",
a2a=[
A2AConfig(
A2AClientConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
timeout=120
),
A2AConfig(
A2AClientConfig(
endpoint="https://data.example.com/.well-known/agent-card.json",
auth=BearerTokenAuth(token="data-token"),
timeout=90
@@ -223,7 +244,7 @@ The LLM will automatically choose which A2A agent to delegate to based on the ta
Control how agent connection failures are handled using the `fail_fast` parameter:
```python Code
from crewai.a2a import A2AConfig
from crewai.a2a import A2AClientConfig
# Fail immediately on connection errors (default)
agent = Agent(
@@ -231,7 +252,7 @@ agent = Agent(
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AConfig(
a2a=A2AClientConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
fail_fast=True
)
@@ -244,11 +265,11 @@ agent = Agent(
backstory="Expert at working with available resources",
llm="gpt-4o",
a2a=[
A2AConfig(
A2AClientConfig(
endpoint="https://primary.example.com/.well-known/agent-card.json",
fail_fast=False
),
A2AConfig(
A2AClientConfig(
endpoint="https://backup.example.com/.well-known/agent-card.json",
fail_fast=False
)
@@ -267,8 +288,8 @@ Control how your agent receives task status updates from remote A2A agents:
<Tabs>
<Tab title="Streaming (Default)">
```python Code
from crewai.a2a import A2AConfig
```python streaming_config.py lines
from crewai.a2a import A2AClientConfig
from crewai.a2a.updates import StreamingConfig
agent = Agent(
@@ -276,17 +297,17 @@ agent = Agent(
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AConfig(
a2a=A2AClientConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
updates=StreamingConfig()
)
)
```
```
</Tab>
<Tab title="Polling">
```python Code
from crewai.a2a import A2AConfig
```python polling_config.py lines
from crewai.a2a import A2AClientConfig
from crewai.a2a.updates import PollingConfig
agent = Agent(
@@ -294,7 +315,7 @@ agent = Agent(
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AConfig(
a2a=A2AClientConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
updates=PollingConfig(
interval=2.0,
@@ -303,12 +324,12 @@ agent = Agent(
)
)
)
```
```
</Tab>
<Tab title="Push Notifications">
```python Code
from crewai.a2a import A2AConfig
```python push_notifications_config.py lines
from crewai.a2a import A2AClientConfig
from crewai.a2a.updates import PushNotificationConfig
agent = Agent(
@@ -316,19 +337,137 @@ agent = Agent(
goal="Coordinate research tasks",
backstory="Expert at delegation",
llm="gpt-4o",
a2a=A2AConfig(
a2a=A2AClientConfig(
endpoint="https://research.example.com/.well-known/agent-card.json",
updates=PushNotificationConfig(
url={base_url}/a2a/callback",
url="{base_url}/a2a/callback",
token="your-validation-token",
timeout=300.0
)
)
)
```
```
</Tab>
</Tabs>
## Exposing Agents as A2A Servers
You can expose your CrewAI agents as A2A-compliant servers, allowing other A2A clients to delegate tasks to them.
### Server Configuration
Add an `A2AServerConfig` to your agent to enable server capabilities:
```python a2a_server_agent.py lines
from crewai import Agent
from crewai.a2a import A2AServerConfig
agent = Agent(
    role="Data Analyst",
    goal="Analyze datasets and provide insights",
    backstory="Expert data scientist with statistical analysis skills",
    llm="gpt-4o",
    a2a=A2AServerConfig(url="https://your-server.com")
)
```
### Server Configuration Options
<ParamField path="name" type="str" default="None">
Human-readable name for the agent. Defaults to the agent's role if not provided.
</ParamField>
<ParamField path="description" type="str" default="None">
Human-readable description. Defaults to the agent's goal and backstory if not provided.
</ParamField>
<ParamField path="version" type="str" default="1.0.0">
Version string for the agent card.
</ParamField>
<ParamField path="skills" type="list[AgentSkill]" default="[]">
List of agent skills. Auto-generated from agent tools if not provided.
</ParamField>
<ParamField path="capabilities" type="AgentCapabilities" default="AgentCapabilities(streaming=True, push_notifications=False)">
Declaration of optional capabilities supported by the agent.
</ParamField>
<ParamField path="default_input_modes" type="list[str]" default='["text/plain", "application/json"]'>
Supported input MIME types.
</ParamField>
<ParamField path="default_output_modes" type="list[str]" default='["text/plain", "application/json"]'>
Supported output MIME types.
</ParamField>
<ParamField path="url" type="str" default="None">
Preferred endpoint URL. If set, overrides the URL passed to `to_agent_card()`.
</ParamField>
<ParamField path="preferred_transport" type="Literal['JSONRPC', 'GRPC', 'HTTP+JSON']" default="JSONRPC">
Transport protocol for the preferred endpoint.
</ParamField>
<ParamField path="protocol_version" type="str" default="0.3">
A2A protocol version this agent supports.
</ParamField>
<ParamField path="provider" type="AgentProvider" default="None">
Information about the agent's service provider.
</ParamField>
<ParamField path="documentation_url" type="str" default="None">
URL to the agent's documentation.
</ParamField>
<ParamField path="icon_url" type="str" default="None">
URL to an icon for the agent.
</ParamField>
<ParamField path="additional_interfaces" type="list[AgentInterface]" default="[]">
Additional supported interfaces (transport and URL combinations).
</ParamField>
<ParamField path="security" type="list[dict[str, list[str]]]" default="[]">
Security requirement objects for all agent interactions.
</ParamField>
<ParamField path="security_schemes" type="dict[str, SecurityScheme]" default="{}">
Security schemes available to authorize requests.
</ParamField>
<ParamField path="supports_authenticated_extended_card" type="bool" default="False">
Whether agent provides extended card to authenticated users.
</ParamField>
<ParamField path="signatures" type="list[AgentCardSignature]" default="[]">
JSON Web Signatures for the AgentCard.
</ParamField>
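Putting several of these options together (a sketch using only the fields documented above; the values are illustrative):
```python
from crewai import Agent
from crewai.a2a import A2AServerConfig

agent = Agent(
    role="Data Analyst",
    goal="Analyze datasets and provide insights",
    backstory="Expert data scientist with statistical analysis skills",
    llm="gpt-4o",
    a2a=A2AServerConfig(
        name="Data Analyst",  # would default to the agent's role anyway
        description="Statistical analysis over tabular datasets",
        version="1.1.0",
        url="https://your-server.com",  # overrides the URL passed to to_agent_card()
        preferred_transport="JSONRPC",
        documentation_url="https://your-server.com/docs",
    ),
)
```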
### Combined Client and Server
An agent can act as both client and server by providing both configurations:
```python Code
from crewai import Agent
from crewai.a2a import A2AClientConfig, A2AServerConfig
agent = Agent(
    role="Research Coordinator",
    goal="Coordinate research and serve analysis requests",
    backstory="Expert at delegation and analysis",
    llm="gpt-4o",
    a2a=[
        A2AClientConfig(
            endpoint="https://specialist.example.com/.well-known/agent-card.json",
            timeout=120
        ),
        A2AServerConfig(url="https://your-server.com")
    ]
)
```
## Best Practices
<CardGroup cols={2}>

View File

@@ -128,7 +128,7 @@ When deploying your Flow, consider the following:
### CrewAI Enterprise
The easiest way to deploy your Flow is using CrewAI Enterprise. It handles the infrastructure, authentication, and monitoring for you.
Check out the [Deployment Guide](/ko/enterprise/guides/deploy-crew) to get started.
Check out the [Deployment Guide](/ko/enterprise/guides/deploy-to-amp) to get started.
```bash
crewai deploy create

View File

@@ -91,7 +91,7 @@ Deploy quickly without Git — upload your project as a ZIP package
## Related Docs
<CardGroup cols={3}>
<Card title="Deploy a Crew" href="/ko/enterprise/guides/deploy-crew" icon="rocket">
<Card title="Deploy a Crew" href="/ko/enterprise/guides/deploy-to-amp" icon="rocket">
Deploy a crew via GitHub or a ZIP file
</Card>
<Card title="Automation Triggers" href="/ko/enterprise/guides/automation-triggers" icon="trigger">

View File

@@ -79,7 +79,7 @@ Crew Studio builds automations from scratch with natural language and a visual workflow editor
<Card title="Build a Crew" href="/ko/enterprise/guides/build-crew" icon="paintbrush">
Build your crew.
</Card>
<Card title="Deploy a Crew" href="/ko/enterprise/guides/deploy-crew" icon="rocket">
<Card title="Deploy a Crew" href="/ko/enterprise/guides/deploy-to-amp" icon="rocket">
Deploy a crew via GitHub or a ZIP file.
</Card>
<Card title="Export a React Component" href="/ko/enterprise/guides/react-component-export" icon="download">

View File

@@ -1,305 +0,0 @@
---
title: "Deploy Crew"
description: "Deploying a Crew on CrewAI Enterprise"
icon: "rocket"
mode: "wide"
---
<Note>
After creating a crew locally or through Crew Studio, the next step is
deploying it to the CrewAI AMP platform. This guide covers multiple
deployment methods to help you choose the best approach for your workflow.
</Note>
## Prerequisites
<CardGroup cols={2}>
<Card title="Crew Ready for Deployment" icon="users">
You should have a working crew, either built locally or created through
Crew Studio.
</Card>
<Card title="GitHub Repository" icon="github">
Your crew code should be in a GitHub repository (for the GitHub integration method).
</Card>
</CardGroup>
## Option 1: Deploy Using the CrewAI CLI
The CLI provides the fastest way to deploy locally developed crews to the Enterprise platform.
<Steps>
<Step title="Install the CrewAI CLI">
If you haven't already, install the CrewAI CLI:
```bash
pip install crewai[tools]
```
<Tip>
The CLI ships with the core CrewAI package, but the `[tools]` extra installs all deployment dependencies.
</Tip>
</Step>
<Step title="Authenticate to the Enterprise Platform">
First, authenticate the CLI against the CrewAI AMP platform:
```bash
# If you already have a CrewAI AMP account, or want to create one:
crewai login
```
When you run this command, the CLI will:
1. Display a URL and a unique device code
2. Open your browser to the authentication page
3. Ask you to confirm the device
4. Complete the authentication process
Once authentication succeeds, a confirmation message appears in your terminal!
</Step>
<Step title="Create a Deployment">
From your project directory, run:
```bash
crewai deploy create
```
This command will:
1. Detect your GitHub repository information
2. Identify environment variables in your local `.env` file
3. Securely transfer those variables to the Enterprise platform
4. Create a new deployment with a unique identifier
On success, you will see a message like:
```shell
Deployment created successfully!
Name: your_project_name
Deployment ID: 01234567-89ab-cdef-0123-456789abcdef
Current Status: Deploy Enqueued
```
</Step>
<Step title="Monitor Deployment Progress">
Track the deployment status with:
```bash
crewai deploy status
```
If you need detailed logs from the build process:
```bash
crewai deploy logs
```
<Tip>
The first deployment typically takes 10-15 minutes because it builds the container image. Subsequent deployments are much faster.
</Tip>
</Step>
</Steps>
## Additional CLI Commands
The CrewAI CLI provides several commands for managing your deployments:
```bash
# List all deployments
crewai deploy list
# Check deployment status
crewai deploy status
# View deployment logs
crewai deploy logs
# Push updates after code changes
crewai deploy push
# Remove a deployment
crewai deploy remove <deployment_id>
```
## Option 2: Deploy Directly via the Web Interface
You can also deploy your crews directly through the CrewAI AMP web interface by connecting your GitHub account. This approach doesn't require using the CLI on your local machine.
<Steps>
<Step title="Push to GitHub">
Your crew needs to be pushed to a GitHub repository. If you haven't created a crew yet, you can follow [this tutorial](/ko/quickstart).
</Step>
<Step title="Connect GitHub to CrewAI AMP">
1. Log in to [CrewAI AMP](https://app.crewai.com).
2. Click the "Connect GitHub" button.
<Frame>
![Connect GitHub Button](/images/enterprise/connect-github.png)
</Frame>
</Step>
<Step title="Select a Repository">
After connecting your GitHub account, choose the repository to deploy:
<Frame>
![Select Repository](/images/enterprise/select-repo.png)
</Frame>
</Step>
<Step title="Set Environment Variables">
Before deploying, set the environment variables needed to connect to your LLM provider or other services:
1. You can add variables individually or in bulk.
2. Enter environment variables in `KEY=VALUE` format (one per line).
<Frame>
![Set Environment Variables](/images/enterprise/set-env-variables.png)
</Frame>
</Step>
<Step title="Deploy the Crew">
1. Click the "Deploy" button to start the deployment process.
2. Monitor progress through the progress bar.
3. The first deployment typically takes about 10-15 minutes; subsequent deployments are faster.
<Frame>
![Deploy Progress](/images/enterprise/deploy-progress.png)
</Frame>
Once the deployment completes, you will see:
- Your crew's unique URL
- A Bearer token to secure your crew API
- A "Delete" button in case you need to remove the deployment
</Step>
</Steps>
## ⚠️ Environment Variable Security Requirements
<Warning>
**Important**: CrewAI AMP has security restrictions on environment variable
names that can cause deployment failures if not followed.
</Warning>
### Blocked Environment Variable Patterns
For security reasons, the following environment variable naming patterns are **automatically filtered** and can cause deployment issues:
**Blocked Patterns:**
- Variables ending with `_TOKEN` (e.g., `MY_API_TOKEN`)
- Variables ending with `_PASSWORD` (e.g., `DB_PASSWORD`)
- Variables ending with `_SECRET` (e.g., `API_SECRET`)
- Variables ending with `_KEY` in certain contexts
**Specific Blocked Variables:**
- `GITHUB_USER`, `GITHUB_TOKEN`
- `AWS_REGION`, `AWS_DEFAULT_REGION`
- Various internal CrewAI system variables
### Allowed Exceptions
Some variables are explicitly allowed despite matching blocked patterns:
- `AZURE_AD_TOKEN`
- `AZURE_OPENAI_AD_TOKEN`
- `ENTERPRISE_ACTION_TOKEN`
- `CREWAI_ENTEPRISE_TOOLS_TOKEN`
### How to Fix Naming Issues
If your deployment fails due to environment variable restrictions:
```bash
# ❌ These names will cause deployment failures
OPENAI_TOKEN=sk-...
DATABASE_PASSWORD=mypassword
API_SECRET=secret123
# ✅ Use these naming patterns instead
OPENAI_API_KEY=sk-...
DATABASE_CREDENTIALS=mypassword
API_CONFIG=secret123
```
### Best Practices
1. **Use standard naming conventions**: `PROVIDER_API_KEY` instead of `PROVIDER_TOKEN`
2. **Test locally first**: Verify that your crew works with the renamed variables
3. **Update your code**: Change any references to the old variable names
4. **Document changes**: Keep track of renamed variables for your team
<Tip>
If a deployment fails with cryptic environment variable errors, first check
whether your variable names match these patterns.
</Tip>
### Interact with Your Deployed Crew
Once deployment is complete, you can access your crew through:
1. **REST API**: The platform generates a unique HTTPS endpoint with these key routes:
- `/inputs`: Lists the required input parameters
- `/kickoff`: Starts an execution with the provided inputs
- `/status/{kickoff_id}`: Checks execution status
2. **Web Interface**: Visit [app.crewai.com](https://app.crewai.com) to see:
- **Status tab**: Deployment info, API endpoint details, and authentication token
- **Run tab**: A visual representation of your crew's structure
- **Executions tab**: History of all executions
- **Metrics tab**: Performance analytics
- **Traces tab**: Detailed execution insights
### Trigger an Execution
From the Enterprise dashboard you can:
1. Click your crew's name to open its details
2. Select "Trigger Crew" in the management interface
3. Enter the required inputs in the modal that appears
4. Monitor the execution's progress through the pipeline
### Monitoring and Analytics
The Enterprise platform provides comprehensive observability:
- **Execution Management**: Track active and completed executions
- **Traces**: Detailed breakdown of each execution
- **Metrics**: Token usage, execution time, and cost
- **Timeline View**: A visual representation of task sequences
### Advanced Features
The Enterprise platform also offers:
- **Environment Variable Management**: Securely store and manage API keys
- **LLM Connections**: Configure integrations with various LLM providers
- **Custom Tools Repository**: Create, share, and install tools
- **Crew Studio**: Build crews through a chat interface without writing code
<Card
title="Need Help?"
icon="headset"
href="mailto:support@crewai.com"
>
Contact our support team for assistance with deployment issues or questions
about the Enterprise platform.
</Card>

View File

@@ -0,0 +1,438 @@
---
title: "Deploy to AMP"
description: "Deploy your Crew or Flow to CrewAI AMP"
icon: "rocket"
mode: "wide"
---
<Note>
After creating a Crew or Flow locally or through Crew Studio, the next step is
deploying it to the CrewAI AMP platform. This guide covers multiple
deployment methods to help you choose the best approach for your workflow.
</Note>
## Prerequisites
<CardGroup cols={2}>
<Card title="Project Ready for Deployment" icon="check-circle">
You should have a working Crew or Flow that runs successfully locally.
Follow the [preparation guide](/ko/enterprise/guides/prepare-for-deployment) to verify your project structure.
</Card>
<Card title="GitHub Repository" icon="github">
Your code should be in a GitHub repository (for the GitHub integration method).
</Card>
</CardGroup>
<Info>
**Crews vs Flows**: Both project types can be deployed as "automations" on CrewAI AMP.
The deployment process is the same, but their project structures differ.
See [Prepare for Deployment](/ko/enterprise/guides/prepare-for-deployment) for details.
</Info>
## Option 1: Deploy Using the CrewAI CLI
The CLI provides the fastest way to deploy locally developed Crews or Flows to the AMP platform.
The CLI automatically detects your project type from `pyproject.toml` and builds accordingly.
<Steps>
<Step title="Install the CrewAI CLI">
If you haven't already, install the CrewAI CLI:
```bash
pip install crewai[tools]
```
<Tip>
The CLI ships with the core CrewAI package, but the `[tools]` extra installs all deployment dependencies.
</Tip>
</Step>
<Step title="Authenticate to the Enterprise Platform">
First, authenticate the CLI against the CrewAI AMP platform:
```bash
# If you already have a CrewAI AMP account, or want to create one:
crewai login
```
When you run this command, the CLI will:
1. Display a URL and a unique device code
2. Open your browser to the authentication page
3. Ask you to confirm the device
4. Complete the authentication process
Once authentication succeeds, a confirmation message appears in your terminal!
</Step>
<Step title="Create a Deployment">
From your project directory, run:
```bash
crewai deploy create
```
This command will:
1. Detect your GitHub repository information
2. Identify environment variables in your local `.env` file
3. Securely transfer those variables to the Enterprise platform
4. Create a new deployment with a unique identifier
On success, you will see a message like:
```shell
Deployment created successfully!
Name: your_project_name
Deployment ID: 01234567-89ab-cdef-0123-456789abcdef
Current Status: Deploy Enqueued
```
</Step>
<Step title="Monitor Deployment Progress">
Track the deployment status with:
```bash
crewai deploy status
```
If you need detailed logs from the build process:
```bash
crewai deploy logs
```
<Tip>
The first deployment typically takes 10-15 minutes because it builds the container image. Subsequent deployments are much faster.
</Tip>
</Step>
</Steps>
## Additional CLI Commands
The CrewAI CLI provides several commands for managing your deployments:
```bash
# List all deployments
crewai deploy list
# Check deployment status
crewai deploy status
# View deployment logs
crewai deploy logs
# Push updates after code changes
crewai deploy push
# Remove a deployment
crewai deploy remove <deployment_id>
```
## Option 2: Deploy Directly via the Web Interface
You can also deploy your Crews or Flows directly through the CrewAI AMP web interface by connecting your GitHub account. This approach doesn't require using the CLI on your local machine. The platform automatically detects your project type and handles the build appropriately.
<Steps>
<Step title="Push to GitHub">
Your Crew needs to be pushed to a GitHub repository. If you haven't created one yet, you can follow [this tutorial](/ko/quickstart).
</Step>
<Step title="Connect GitHub to CrewAI AMP">
1. Log in to [CrewAI AMP](https://app.crewai.com).
2. Click the "Connect GitHub" button.
<Frame>
![Connect GitHub Button](/images/enterprise/connect-github.png)
</Frame>
</Step>
<Step title="Select a Repository">
After connecting your GitHub account, choose the repository to deploy:
<Frame>
![Select Repository](/images/enterprise/select-repo.png)
</Frame>
</Step>
<Step title="Set Environment Variables">
Before deploying, set the environment variables needed to connect to your LLM provider or other services:
1. You can add variables individually or in bulk.
2. Enter environment variables in `KEY=VALUE` format (one per line).
<Frame>
![Set Environment Variables](/images/enterprise/set-env-variables.png)
</Frame>
</Step>
<Step title="Deploy the Crew">
1. Click the "Deploy" button to start the deployment process.
2. Monitor progress through the progress bar.
3. The first deployment typically takes about 10-15 minutes; subsequent deployments are faster.
<Frame>
![Deploy Progress](/images/enterprise/deploy-progress.png)
</Frame>
Once the deployment completes, you will see:
- Your Crew's unique URL
- A Bearer token to secure your Crew API
- A "Delete" button in case you need to remove the deployment
</Step>
</Steps>
## Option 3: Redeploy via the API (CI/CD Integration)
For automated deployments in CI/CD pipelines, you can use the CrewAI API to trigger redeployments of an existing crew. This is especially useful for GitHub Actions, Jenkins, or other automation workflows.
<Steps>
<Step title="Create a Personal Access Token">
Generate an API token in your CrewAI AMP account settings:
1. Go to [app.crewai.com](https://app.crewai.com)
2. Click **Settings** → **Account** → **Personal Access Token**
3. Generate a new token and copy it securely
4. Store this token as a secret in your CI/CD system
</Step>
<Step title="Find the Automation UUID">
Find the unique identifier of your deployed crew:
1. In the CrewAI AMP dashboard, go to **Automations**
2. Select the existing automation/crew
3. Click **Additional Details**
4. Copy the **UUID** - it identifies this specific crew deployment
</Step>
<Step title="Trigger a Redeployment via the API">
Use the Deploy API endpoint to trigger a redeployment:
```bash
curl -i -X POST \
-H "Authorization: Bearer YOUR_PERSONAL_ACCESS_TOKEN" \
https://app.crewai.com/crewai_plus/api/v1/crews/YOUR-AUTOMATION-UUID/deploy
# HTTP/2 200
# content-type: application/json
#
# {
# "uuid": "your-automation-uuid",
# "status": "Deploy Enqueued",
# "public_url": "https://your-crew-deployment.crewai.com",
# "token": "your-bearer-token"
# }
```
<Info>
For automations originally created from a Git connection, the API automatically pulls the latest changes from the repository before redeploying.
</Info>
</Step>
<Step title="GitHub Actions Integration Example">
Here is an example GitHub Actions workflow with more complex deployment triggers:
```yaml
name: Deploy CrewAI Automation
on:
  push:
    branches: [ main ]
  pull_request:
    types: [ labeled ]
  release:
    types: [ published ]

jobs:
  deploy:
    runs-on: ubuntu-latest
    if: |
      (github.event_name == 'push' && github.ref == 'refs/heads/main') ||
      (github.event_name == 'pull_request' && contains(github.event.pull_request.labels.*.name, 'deploy')) ||
      (github.event_name == 'release')
    steps:
      - name: Trigger CrewAI Redeployment
        run: |
          curl -X POST \
            -H "Authorization: Bearer ${{ secrets.CREWAI_PAT }}" \
            https://app.crewai.com/crewai_plus/api/v1/crews/${{ secrets.CREWAI_AUTOMATION_UUID }}/deploy
```
<Tip>
Add `CREWAI_PAT` and `CREWAI_AUTOMATION_UUID` as repository secrets. For PR deployments, add a "deploy" label to trigger the workflow.
</Tip>
</Step>
</Steps>
## Interact with Your Deployed Automation
Once deployment is complete, you can access your crew through:
1. **REST API**: The platform generates a unique HTTPS endpoint with these key routes (see the sketch after this list):
- `/inputs`: Lists the required input parameters
- `/kickoff`: Starts an execution with the provided inputs
- `/status/{kickoff_id}`: Checks execution status
2. **Web Interface**: Visit [app.crewai.com](https://app.crewai.com) to see:
- **Status tab**: Deployment info, API endpoint details, and authentication token
- **Run tab**: A visual representation of your Crew's structure
- **Executions tab**: History of all executions
- **Metrics tab**: Performance analytics
- **Traces tab**: Detailed execution insights
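A minimal sketch of calling these routes (hedged: the request and response payload shapes are assumptions, not documented here):
```python
import requests

BASE_URL = "https://your-crew-deployment.crewai.com"  # from the Status tab
HEADERS = {"Authorization": "Bearer your-bearer-token"}

# Discover the required inputs, then start a run and poll its status.
required = requests.get(f"{BASE_URL}/inputs", headers=HEADERS).json()
run = requests.post(
    f"{BASE_URL}/kickoff",
    headers=HEADERS,
    json={"inputs": {"topic": "AI in Healthcare"}},  # assumed payload shape
).json()
# The status route suggests the run id is returned as a kickoff id (assumed key).
status = requests.get(f"{BASE_URL}/status/{run['kickoff_id']}", headers=HEADERS).json()
print(status)
```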
### Trigger an Execution
From the Enterprise dashboard you can:
1. Click your Crew's name to open its details
2. Select "Trigger Crew" in the management interface
3. Enter the required inputs in the modal that appears
4. Monitor the execution's progress through the pipeline
### Monitoring and Analytics
The Enterprise platform provides comprehensive observability:
- **Execution Management**: Track active and completed executions
- **Traces**: Detailed breakdown of each execution
- **Metrics**: Token usage, execution time, and cost
- **Timeline View**: A visual representation of task sequences
### Advanced Features
The Enterprise platform also offers:
- **Environment Variable Management**: Securely store and manage API keys
- **LLM Connections**: Configure integrations with various LLM providers
- **Custom Tools Repository**: Create, share, and install tools
- **Crew Studio**: Build crews through a chat interface without writing code
## Troubleshooting Deployment Failures
If your deployment fails, check these common issues:
### Build Failures
#### Missing uv.lock File
**Symptom**: Build fails early with dependency resolution errors
**Solution**: Generate and commit the lock file:
```bash
uv lock
git add uv.lock
git commit -m "Add uv.lock for deployment"
git push
```
<Warning>
The `uv.lock` file is required for all deployments. Without it, the platform
cannot reliably install your dependencies.
</Warning>
#### Wrong Project Structure
**Symptom**: "Could not find entry point" or "Module not found" errors
**Solution**: Verify your project matches the expected structure:
- **Both Crews and Flows**: The entry point must be at `src/project_name/main.py`
- **Crews**: Use a `run()` function as the entry point
- **Flows**: Use a `kickoff()` function as the entry point
See [Prepare for Deployment](/ko/enterprise/guides/prepare-for-deployment) for detailed structure diagrams.
#### Missing CrewBase Decorator
**Symptom**: "Crew not found", "Config not found", or agent/task configuration errors
**Solution**: Ensure **all** crew classes use the `@CrewBase` decorator:
```python
from crewai.project import CrewBase, agent, crew, task
@CrewBase  # This decorator is REQUIRED
class YourCrew():
    """Your crew description"""

    @agent
    def my_agent(self) -> Agent:
        return Agent(
            config=self.agents_config['my_agent'],  # type: ignore[index]
            verbose=True
        )
    # ... rest of crew definition
```
<Info>
This applies both to standalone Crews and to crews embedded inside Flow projects.
Every crew class needs the decorator.
</Info>
#### Incorrect pyproject.toml Type
**Symptom**: Build succeeds but runtime fails, or unexpected behavior
**Solution**: Verify the `[tool.crewai]` section matches your project type:
```toml
# For Crew projects:
[tool.crewai]
type = "crew"
# For Flow projects:
[tool.crewai]
type = "flow"
```
### Runtime Failures
#### LLM Connection Failures
**Symptom**: API key errors, "model not found", or authentication failures
**Solution**:
1. Verify your LLM provider's API key is correctly set in the environment variables
2. Ensure the environment variable names match what your code expects
3. Test locally with the exact same environment variables before deploying
#### Crew Execution Errors
**Symptom**: The Crew starts but fails during execution
**Solution**:
1. Check the execution logs in the AMP dashboard (Traces tab)
2. Verify all tools have the required API keys configured
3. Ensure the agent configurations in `agents.yaml` are valid
4. Check the task configurations in `tasks.yaml` for syntax errors
<Card title="Need Help?" icon="headset" href="mailto:support@crewai.com">
Contact our support team for assistance with deployment issues or questions about the AMP platform.
</Card>

View File

@@ -0,0 +1,305 @@
---
title: "Prepare for Deployment"
description: "Ensure your Crew or Flow is ready for deployment to CrewAI AMP"
icon: "clipboard-check"
mode: "wide"
---
<Note>
Before deploying to CrewAI AMP, it's crucial to verify that your project is correctly structured.
Both Crews and Flows can be deployed as "automations," but they have different project structures
and requirements that must be met for a successful deployment.
</Note>
## Understanding Automations
In CrewAI AMP, **automations** is the umbrella term for deployable Agentic AI projects. An automation can be either:
- **A Crew**: A standalone team of AI agents working together on tasks
- **A Flow**: An orchestrated workflow that can combine multiple crews, direct LLM calls, and procedural logic
Understanding which type you're deploying is essential because they have different project structures and entry points.
## Crews vs Flows: Key Differences
<CardGroup cols={2}>
<Card title="Crew Projects" icon="users">
Standalone AI agent teams with `crew.py` defining agents and tasks. Best for focused, collaborative work.
</Card>
<Card title="Flow Projects" icon="diagram-project">
Orchestrated workflows with embedded crews in a `crews/` folder. Best for complex, multi-stage processes.
</Card>
</CardGroup>
| Aspect | Crew | Flow |
|--------|------|------|
| **Project structure** | `src/project_name/` with `crew.py` | `src/project_name/` with a `crews/` folder |
| **Main logic location** | `src/project_name/crew.py` | `src/project_name/main.py` (Flow class) |
| **Entry point function** | `run()` in `main.py` | `kickoff()` in `main.py` |
| **pyproject.toml type** | `type = "crew"` | `type = "flow"` |
| **CLI create command** | `crewai create crew name` | `crewai create flow name` |
| **Config location** | `src/project_name/config/` | `src/project_name/crews/crew_name/config/` |
| **Can contain other crews** | No | Yes (in the `crews/` folder) |
## Project Structure Reference
### Crew Project Structure
When you run `crewai create crew my_crew`, you get this structure:
```
my_crew/
├── .gitignore
├── pyproject.toml # Must have type = "crew"
├── README.md
├── .env
├── uv.lock # REQUIRED for deployment
└── src/
└── my_crew/
├── __init__.py
├── main.py # Entry point with run() function
├── crew.py # Crew class with @CrewBase decorator
├── tools/
│ ├── custom_tool.py
│ └── __init__.py
└── config/
├── agents.yaml # Agent definitions
└── tasks.yaml # Task definitions
```
<Warning>
The nested `src/project_name/` structure is critical for Crews.
Placing files at the wrong level will cause deployment failures.
</Warning>
### Flow Project Structure
When you run `crewai create flow my_flow`, you get this structure:
```
my_flow/
├── .gitignore
├── pyproject.toml # Must have type = "flow"
├── README.md
├── .env
├── uv.lock # REQUIRED for deployment
└── src/
└── my_flow/
├── __init__.py
├── main.py # Entry point with kickoff() function + Flow class
├── crews/ # Embedded crews folder
│ └── poem_crew/
│ ├── __init__.py
│ ├── poem_crew.py # Crew with @CrewBase decorator
│ └── config/
│ ├── agents.yaml
│ └── tasks.yaml
└── tools/
├── __init__.py
└── custom_tool.py
```
<Info>
Both Crews and Flows use the `src/project_name/` structure.
The key difference is that Flows have a `crews/` folder for embedded crews,
while Crews have `crew.py` directly in the project folder.
</Info>
## Pre-Deployment Checklist
Use this checklist to verify that your project is ready for deployment.
### 1. Verify the pyproject.toml Configuration
Your `pyproject.toml` must include the correct `[tool.crewai]` section:
<Tabs>
<Tab title="For Crews">
```toml
[tool.crewai]
type = "crew"
```
</Tab>
<Tab title="For Flows">
```toml
[tool.crewai]
type = "flow"
```
</Tab>
</Tabs>
<Warning>
If the `type` doesn't match your project structure, the build will fail or
the automation won't run correctly.
</Warning>
### 2. Ensure the uv.lock File Exists
CrewAI uses `uv` for dependency management. The `uv.lock` file ensures reproducible builds and is **required** for deployment.
```bash
# Generate or update the lock file
uv lock
# Verify it exists
ls -la uv.lock
```
If the file doesn't exist, run `uv lock` and commit it to your repository:
```bash
uv lock
git add uv.lock
git commit -m "Add uv.lock for deployment"
git push
```
### 3. Validate CrewBase Decorator Usage
**Every crew class must use the `@CrewBase` decorator.** This applies to:
- Standalone crew projects
- Crews embedded inside Flow projects
```python
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List
@CrewBase  # This decorator is REQUIRED
class MyCrew():
    """My crew description"""

    agents: List[BaseAgent]
    tasks: List[Task]

    @agent
    def my_agent(self) -> Agent:
        return Agent(
            config=self.agents_config['my_agent'],  # type: ignore[index]
            verbose=True
        )

    @task
    def my_task(self) -> Task:
        return Task(
            config=self.tasks_config['my_task']  # type: ignore[index]
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
        )
```
<Warning>
If you forget the `@CrewBase` decorator, your deployment will fail with
errors about missing agent or task configurations.
</Warning>
### 4. Check Project Entry Points
Both Crews and Flows have their entry point in `src/project_name/main.py`:
<Tabs>
<Tab title="For Crews">
The entry point uses a `run()` function:
```python
# src/my_crew/main.py
from my_crew.crew import MyCrew
def run():
    """Run the crew."""
    inputs = {'topic': 'AI in Healthcare'}
    result = MyCrew().crew().kickoff(inputs=inputs)
    return result

if __name__ == "__main__":
    run()
```
</Tab>
<Tab title="For Flows">
The entry point uses a `kickoff()` function together with a Flow class:
```python
# src/my_flow/main.py
from crewai.flow import Flow, listen, start
from my_flow.crews.poem_crew.poem_crew import PoemCrew
class MyFlow(Flow):
    @start()
    def begin(self):
        # Flow logic here
        result = PoemCrew().crew().kickoff(inputs={...})
        return result

def kickoff():
    """Run the flow."""
    MyFlow().kickoff()

if __name__ == "__main__":
    kickoff()
```
</Tab>
</Tabs>
### 5. Prepare Environment Variables
Before deployment, make sure you have:
1. **LLM API keys** ready (OpenAI, Anthropic, Google, etc.)
2. **Tool API keys** if you use external tools (Serper, etc.)
<Tip>
Test your project locally with the same environment variables before deploying
to catch configuration issues early.
</Tip>
## Quick Validation Commands
Run these commands from your project root to quickly verify your setup:
```bash
# 1. Check the project type in pyproject.toml
grep -A2 "\[tool.crewai\]" pyproject.toml
# 2. Verify uv.lock exists
ls -la uv.lock || echo "ERROR: uv.lock missing! Run 'uv lock'"
# 3. Verify the src/ structure exists
ls -la src/*/main.py 2>/dev/null || echo "No main.py found in src/"
# 4. For Crews - verify crew.py exists
ls -la src/*/crew.py 2>/dev/null || echo "No crew.py (expected for Crews)"
# 5. For Flows - verify the crews/ folder exists
ls -la src/*/crews/ 2>/dev/null || echo "No crews/ folder (expected for Flows)"
# 6. Check for CrewBase usage
grep -r "@CrewBase" . --include="*.py"
```
## Common Setup Mistakes
| Mistake | Symptom | Fix |
|---------|---------|-----|
| Missing `uv.lock` | Build fails during dependency resolution | Run `uv lock` and commit |
| Wrong `type` in pyproject.toml | Build succeeds but runtime fails | Change to the correct type |
| Missing `@CrewBase` decorator | "Config not found" errors | Add the decorator to all crew classes |
| Files at root instead of `src/` | Entry point not found | Move them to `src/project_name/` |
| Missing `run()` or `kickoff()` | Cannot start the automation | Add the correct entry function |
## Next Steps
Once your project passes all checklist items, you're ready to deploy:
<Card title="Deploy to AMP" icon="rocket" href="/ko/enterprise/guides/deploy-to-amp">
Follow the deployment guide to deploy your Crew or Flow to CrewAI AMP using
the CLI, web interface, or CI/CD integration.
</Card>

View File

@@ -79,7 +79,7 @@ CrewAI AOP combines the power of the open-source framework with production deployment,
<Card
title="Deploy a Crew"
icon="rocket"
href="/ko/enterprise/guides/deploy-crew"
href="/ko/enterprise/guides/deploy-to-amp"
>
Deploy a Crew
</Card>
@@ -96,4 +96,4 @@ CrewAI AOP combines the power of the open-source framework with production deployment,
</Step>
</Steps>
For detailed guidance, check the [deployment guide](/ko/enterprise/guides/deploy-crew) or click the button below to get started.
For detailed guidance, check the [deployment guide](/ko/enterprise/guides/deploy-to-amp) or click the button below to get started.

View File

@@ -128,7 +128,7 @@ When deploying your Flow, consider the following:
### CrewAI Enterprise
The easiest way to deploy your Flow is using CrewAI Enterprise. It handles the infrastructure, authentication, and monitoring for you.
Check out the [Deployment Guide](/pt-BR/enterprise/guides/deploy-crew) to get started.
Check out the [Deployment Guide](/pt-BR/enterprise/guides/deploy-to-amp) to get started.
```bash
crewai deploy create
View File

@@ -91,7 +91,7 @@ After deploying, you can view the automation's details and use the **Optio
## Related
<CardGroup cols={3}>
<Card title="Deploy a Crew" href="/pt-BR/enterprise/guides/deploy-crew" icon="rocket">
<Card title="Deploy a Crew" href="/pt-BR/enterprise/guides/deploy-to-amp" icon="rocket">
Deploy a Crew via GitHub or a ZIP file.
</Card>
<Card title="Automation Triggers" href="/pt-BR/enterprise/guides/automation-triggers" icon="trigger">

View File

@@ -79,7 +79,7 @@ After publishing, you can view the automation's details and use the *
<Card title="Build a Crew" href="/pt-BR/enterprise/guides/build-crew" icon="paintbrush">
Build a Crew.
</Card>
<Card title="Deploy a Crew" href="/pt-BR/enterprise/guides/deploy-crew" icon="rocket">
<Card title="Deploy a Crew" href="/pt-BR/enterprise/guides/deploy-to-amp" icon="rocket">
Deploy a Crew via GitHub or ZIP.
</Card>
<Card title="Export a React Component" href="/pt-BR/enterprise/guides/react-component-export" icon="download">

View File

@@ -1,304 +0,0 @@
---
title: "Deploy Crew"
description: "Deploying a Crew to CrewAI AMP"
icon: "rocket"
mode: "wide"
---
<Note>
Once you have created a crew locally or through Crew Studio, the next step is
to deploy it to the CrewAI AMP platform. This guide covers multiple deployment
methods to help you choose the best approach for your workflow.
</Note>
## Prerequisites
<CardGroup cols={2}>
<Card title="Deployment-Ready Crew" icon="users">
You need a working crew, created locally or through Crew Studio
</Card>
<Card title="GitHub Repository" icon="github">
Your crew's code must live in a GitHub repository (for the GitHub
integration method)
</Card>
</CardGroup>
## Option 1: Deploy Using the CrewAI CLI
The CLI provides the fastest way to deploy locally developed crews to the Enterprise platform.
<Steps>
<Step title="Install the CrewAI CLI">
If you don't have it yet, install the CrewAI CLI:
```bash
pip install crewai[tools]
```
<Tip>
The CLI ships with the core CrewAI package, but the `[tools]` extra ensures all deployment dependencies.
</Tip>
</Step>
<Step title="Authenticate with the Enterprise Platform">
First, you need to authenticate your CLI with the CrewAI AMP platform:
```bash
# If you already have a CrewAI AMP account, or want to create one:
crewai login
```
When you run either command, the CLI will:
1. Display a URL and a unique device code
2. Open your browser to the authentication page
3. Ask you to confirm the device
4. Complete the authentication process
After successful authentication, you will see a confirmation message in your terminal!
</Step>
<Step title="Create a Deployment">
From your project directory, run:
```bash
crewai deploy create
```
This command will:
1. Detect your GitHub repository information
2. Identify the environment variables in your local `.env` file
3. Securely transfer those variables to the Enterprise platform
4. Create a new deployment with a unique identifier
On successful creation, you will see a message like:
```shell
Deployment created successfully!
Name: your_project_name
Deployment ID: 01234567-89ab-cdef-0123-456789abcdef
Current Status: Deploy Enqueued
```
</Step>
<Step title="Track the Deployment Progress">
Track the deployment status with:
```bash
crewai deploy status
```
To see detailed logs of the build process:
```bash
crewai deploy logs
```
<Tip>
The first deployment typically takes 10 to 15 minutes while the container images are built. Subsequent deployments are much faster.
</Tip>
</Step>
</Steps>
## Additional CLI Commands
The CrewAI CLI offers several commands for managing your deployments:
```bash
# List all your deployments
crewai deploy list
# Check the status of a deployment
crewai deploy status
# View the deployment logs
crewai deploy logs
# Push updates after code changes
crewai deploy push
# Remove a deployment
crewai deploy remove <deployment_id>
```
## Option 2: Deploy Directly from the Web Interface
You can also deploy your crews directly from the CrewAI AMP web interface by connecting your GitHub account. This approach does not require using the CLI on your local machine.
<Steps>
<Step title="Push to GitHub">
You need to push your crew to a GitHub repository. If you haven't created a crew yet, you can [follow this tutorial](/pt-BR/quickstart).
</Step>
<Step title="Connect GitHub to CrewAI AMP">
1. Log in at [CrewAI AMP](https://app.crewai.com)
2. Click the "Connect GitHub" button
<Frame>
![Connect GitHub button](/images/enterprise/connect-github.png)
</Frame>
</Step>
<Step title="Select the Repository">
After connecting your GitHub account, you can choose which repository to deploy:
<Frame>
![Select Repository](/images/enterprise/select-repo.png)
</Frame>
</Step>
<Step title="Set the Environment Variables">
Before deploying, you need to configure the environment variables that connect to your LLM provider or other services:
1. You can add variables individually or in bulk
2. Enter your variables in `KEY=VALUE` format (one per line)
<Frame>
![Set Environment Variables](/images/enterprise/set-env-variables.png)
</Frame>
</Step>
<Step title="Deploy Your Crew">
1. Click the "Deploy" button to start the deployment process
2. You can monitor the progress through the progress bar
3. The first deployment usually takes 10 to 15 minutes; subsequent ones are faster
<Frame>
![Deployment Progress](/images/enterprise/deploy-progress.png)
</Frame>
Once it completes, you will see:
- Your crew's unique URL
- A Bearer token to secure your crew's API
- A "Delete" button in case you need to remove the deployment
</Step>
</Steps>
## ⚠️ Security Requirements for Environment Variables
<Warning>
**Important**: CrewAI AMP enforces security restrictions on environment
variable names that can cause deployments to fail if they are not followed.
</Warning>
### Blocked Environment Variable Patterns
For security reasons, the following environment variable name patterns are **automatically filtered** and will cause deployment problems:
**Blocked patterns:**
- Variables ending in `_TOKEN` (e.g., `MY_API_TOKEN`)
- Variables ending in `_PASSWORD` (e.g., `DB_PASSWORD`)
- Variables ending in `_SECRET` (e.g., `API_SECRET`)
- Variables ending in `_KEY` in certain contexts
**Specific blocked variables:**
- `GITHUB_USER`, `GITHUB_TOKEN`
- `AWS_REGION`, `AWS_DEFAULT_REGION`
- Several internal CrewAI system variables
### Allowed Exceptions
Some variables are explicitly allowed even though they match the blocked patterns:
- `AZURE_AD_TOKEN`
- `AZURE_OPENAI_AD_TOKEN`
- `ENTERPRISE_ACTION_TOKEN`
- `CREWAI_ENTEPRISE_TOOLS_TOKEN`
### How to Fix Naming Problems
If your deployment fails because of environment variable restrictions:
```bash
# ❌ These will cause deployment failures
OPENAI_TOKEN=sk-...
DATABASE_PASSWORD=mypassword
API_SECRET=secret123
# ✅ Use these naming patterns instead
OPENAI_API_KEY=sk-...
DATABASE_CREDENTIALS=mypassword
API_CONFIG=secret123
```
### Best Practices
1. **Use standard naming conventions**: `PROVIDER_API_KEY` instead of `PROVIDER_TOKEN`
2. **Test locally first**: Make sure your crew works with the renamed variables
3. **Update your code**: Change every reference to the old variable names
4. **Document the changes**: Keep a record of renamed variables for your team
<Tip>
If you run into deployment failures with cryptic environment variable
errors, check your variable names against these patterns first.
</Tip>
### Interact with Your Deployed Crew
After deployment, you can access your crew through:
1. **REST API**: The platform generates a unique HTTPS endpoint with these main routes:
- `/inputs`: Lists the required input parameters
- `/kickoff`: Starts an execution with the provided inputs
- `/status/{kickoff_id}`: Checks the execution status
2. **Web Interface**: Visit [app.crewai.com](https://app.crewai.com) to view:
- **Status tab**: Deployment information, API endpoint details, and the authentication token
- **Run tab**: A visualization of your crew's structure
- **Executions tab**: History of all executions
- **Metrics tab**: Performance analytics
- **Traces tab**: Detailed execution insights
### Trigger an Execution
From the Enterprise dashboard, you can:
1. Click your crew's name to open its details
2. Select "Trigger Crew" in the management interface
3. Enter the required inputs in the modal that appears
4. Monitor the progress as the execution moves through the pipeline
### Monitoring and Analytics
The Enterprise platform offers comprehensive observability features:
- **Execution Management**: Track active and completed executions
- **Traces**: Detailed breakdown of each execution
- **Metrics**: Token usage, execution times, and costs
- **Timeline View**: Visual representation of task sequences
### Advanced Features
The Enterprise platform also offers:
- **Environment Variable Management**: Securely store and manage API keys
- **LLM Connections**: Configure integrations with multiple LLM providers
- **Custom Tools Repository**: Create, share, and install tools
- **Crew Studio**: Assemble crews through a chat interface without writing code
<Card title="Need Help?" icon="headset" href="mailto:support@crewai.com">
Contact our support team for help with deployment issues or questions
about the Enterprise platform.
</Card>

View File

@@ -0,0 +1,439 @@
---
title: "Deploy to AMP"
description: "Deploy your Crew or Flow to CrewAI AMP"
icon: "rocket"
mode: "wide"
---
<Note>
Once you have created a Crew or Flow locally (or through Crew Studio), the next step is
to deploy it to the CrewAI AMP platform. This guide covers multiple deployment
methods to help you choose the best approach for your workflow.
</Note>
## Prerequisites
<CardGroup cols={2}>
<Card title="Deployment-Ready Project" icon="check-circle">
You need a Crew or Flow that runs successfully on your machine.
Follow our [preparation guide](/pt-BR/enterprise/guides/prepare-for-deployment) to verify your project structure.
</Card>
<Card title="GitHub Repository" icon="github">
Your code must live in a GitHub repository (for the GitHub integration method).
</Card>
</CardGroup>
<Info>
**Crews vs Flows**: Both project types can be deployed as "automations" on CrewAI AMP.
The deployment process is the same, but they have different project structures.
See [Prepare for Deployment](/pt-BR/enterprise/guides/prepare-for-deployment) for details.
</Info>
## Option 1: Deploy Using the CrewAI CLI
The CLI provides the fastest way to deploy locally developed Crews or Flows to the AMP platform.
The CLI automatically detects your project type from `pyproject.toml` and builds it accordingly.
<Steps>
<Step title="Install the CrewAI CLI">
If you don't have it yet, install the CrewAI CLI:
```bash
pip install crewai[tools]
```
<Tip>
The CLI ships with the core CrewAI package, but the `[tools]` extra ensures all deployment dependencies.
</Tip>
</Step>
<Step title="Authenticate with the Enterprise Platform">
First, you need to authenticate your CLI with the CrewAI AMP platform:
```bash
# If you already have a CrewAI AMP account, or want to create one:
crewai login
```
When you run either command, the CLI will:
1. Display a URL and a unique device code
2. Open your browser to the authentication page
3. Ask you to confirm the device
4. Complete the authentication process
After successful authentication, you will see a confirmation message in your terminal!
</Step>
<Step title="Create a Deployment">
From your project directory, run:
```bash
crewai deploy create
```
This command will:
1. Detect your GitHub repository information
2. Identify the environment variables in your local `.env` file
3. Securely transfer those variables to the Enterprise platform
4. Create a new deployment with a unique identifier
On successful creation, you will see a message like:
```shell
Deployment created successfully!
Name: your_project_name
Deployment ID: 01234567-89ab-cdef-0123-456789abcdef
Current Status: Deploy Enqueued
```
</Step>
<Step title="Track the Deployment Progress">
Track the deployment status with:
```bash
crewai deploy status
```
To see detailed logs of the build process:
```bash
crewai deploy logs
```
<Tip>
The first deployment typically takes 10 to 15 minutes while the container images are built. Subsequent deployments are much faster.
</Tip>
</Step>
</Steps>
## Additional CLI Commands
The CrewAI CLI offers several commands for managing your deployments:
```bash
# List all your deployments
crewai deploy list
# Check the status of a deployment
crewai deploy status
# View the deployment logs
crewai deploy logs
# Push updates after code changes
crewai deploy push
# Remove a deployment
crewai deploy remove <deployment_id>
```
## Option 2: Deploy Directly from the Web Interface
You can also deploy your Crews or Flows directly from the CrewAI AMP web interface by connecting your GitHub account. This approach does not require using the CLI on your local machine. The platform automatically detects your project type and handles the build accordingly.
<Steps>
<Step title="Push to GitHub">
You need to push your crew to a GitHub repository. If you haven't created a crew yet, you can [follow this tutorial](/pt-BR/quickstart).
</Step>
<Step title="Connect GitHub to CrewAI AMP">
1. Log in at [CrewAI AMP](https://app.crewai.com)
2. Click the "Connect GitHub" button
<Frame>
![Connect GitHub button](/images/enterprise/connect-github.png)
</Frame>
</Step>
<Step title="Select the Repository">
After connecting your GitHub account, you can choose which repository to deploy:
<Frame>
![Select Repository](/images/enterprise/select-repo.png)
</Frame>
</Step>
<Step title="Set the Environment Variables">
Before deploying, you need to configure the environment variables that connect to your LLM provider or other services:
1. You can add variables individually or in bulk
2. Enter your variables in `KEY=VALUE` format (one per line)
<Frame>
![Set Environment Variables](/images/enterprise/set-env-variables.png)
</Frame>
</Step>
<Step title="Deploy Your Crew">
1. Click the "Deploy" button to start the deployment process
2. You can monitor the progress through the progress bar
3. The first deployment usually takes 10 to 15 minutes; subsequent ones are faster
<Frame>
![Deployment Progress](/images/enterprise/deploy-progress.png)
</Frame>
Once it completes, you will see:
- Your crew's unique URL
- A Bearer token to secure your crew's API
- A "Delete" button in case you need to remove the deployment
</Step>
</Steps>
## Option 3: Redeploy Using the API (CI/CD Integration)
For automated deployments in CI/CD pipelines, you can use the CrewAI API to trigger redeployments of existing crews. This is particularly useful for GitHub Actions, Jenkins, or other automation workflows.
<Steps>
<Step title="Get Your Personal Access Token">
Navigate to your CrewAI AMP account settings to generate an API token:
1. Go to [app.crewai.com](https://app.crewai.com)
2. Click **Settings** → **Account** → **Personal Access Token**
3. Generate a new token and copy it somewhere safe
4. Store this token as a secret in your CI/CD system
</Step>
<Step title="Find Your Automation's UUID">
Locate the unique identifier of your deployed crew:
1. Go to **Automations** in your CrewAI AMP dashboard
2. Select your existing automation/crew
3. Click **Additional Details**
4. Copy the **UUID** - this identifies your specific crew deployment
</Step>
<Step title="Trigger the Redeployment via API">
Use the Deploy API endpoint to trigger a redeployment:
```bash
curl -i -X POST \
  -H "Authorization: Bearer YOUR_PERSONAL_ACCESS_TOKEN" \
  https://app.crewai.com/crewai_plus/api/v1/crews/YOUR-AUTOMATION-UUID/deploy
# HTTP/2 200
# content-type: application/json
#
# {
#   "uuid": "your-automation-uuid",
#   "status": "Deploy Enqueued",
#   "public_url": "https://your-crew-deployment.crewai.com",
#   "token": "your-bearer-token"
# }
```
<Info>
If your automation was originally created connected to Git, the API automatically pulls the latest changes from your repository before redeploying.
</Info>
</Step>
<Step title="GitHub Actions Integration Example">
Here is a GitHub Actions workflow with more complex deployment triggers:
```yaml
name: Deploy CrewAI Automation
on:
  push:
    branches: [ main ]
  pull_request:
    types: [ labeled ]
  release:
    types: [ published ]

jobs:
  deploy:
    runs-on: ubuntu-latest
    if: |
      (github.event_name == 'push' && github.ref == 'refs/heads/main') ||
      (github.event_name == 'pull_request' && contains(github.event.pull_request.labels.*.name, 'deploy')) ||
      (github.event_name == 'release')
    steps:
      - name: Trigger CrewAI Redeployment
        run: |
          curl -X POST \
            -H "Authorization: Bearer ${{ secrets.CREWAI_PAT }}" \
            https://app.crewai.com/crewai_plus/api/v1/crews/${{ secrets.CREWAI_AUTOMATION_UUID }}/deploy
```
<Tip>
Add `CREWAI_PAT` and `CREWAI_AUTOMATION_UUID` as repository secrets. For PR deployments, add a "deploy" label to trigger the workflow.
</Tip>
</Step>
</Steps>
## Interact with Your Deployed Automation
After deployment, you can access your crew through:
1. **REST API**: The platform generates a unique HTTPS endpoint with these main routes (see the sketch after this list):
- `/inputs`: Lists the required input parameters
- `/kickoff`: Starts an execution with the provided inputs
- `/status/{kickoff_id}`: Checks the execution status
2. **Web Interface**: Visit [app.crewai.com](https://app.crewai.com) to view:
- **Status tab**: Deployment information, API endpoint details, and the authentication token
- **Run tab**: A visualization of your crew's structure
- **Executions tab**: History of all executions
- **Metrics tab**: Performance analytics
- **Traces tab**: Detailed execution insights
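To make this concrete, here is a sketch of calling a deployed automation with `curl`. The URL, token, and input payload are placeholders; use the endpoint and Bearer token shown in your Status tab, and the input names reported by `/inputs`:
```bash
# Discover the required inputs (URL and token are placeholders)
curl -H "Authorization: Bearer YOUR_BEARER_TOKEN" \
  https://your-crew-deployment.crewai.com/inputs

# Start an execution; the response contains a kickoff_id
curl -X POST \
  -H "Authorization: Bearer YOUR_BEARER_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"inputs": {"topic": "AI in Healthcare"}}' \
  https://your-crew-deployment.crewai.com/kickoff

# Poll the execution status with the returned kickoff_id
curl -H "Authorization: Bearer YOUR_BEARER_TOKEN" \
  https://your-crew-deployment.crewai.com/status/YOUR_KICKOFF_ID
```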
### Trigger an Execution
From the Enterprise dashboard, you can:
1. Click your crew's name to open its details
2. Select "Trigger Crew" in the management interface
3. Enter the required inputs in the modal that appears
4. Monitor the progress as the execution moves through the pipeline
### Monitoring and Analytics
The Enterprise platform offers comprehensive observability features:
- **Execution Management**: Track active and completed executions
- **Traces**: Detailed breakdown of each execution
- **Metrics**: Token usage, execution times, and costs
- **Timeline View**: Visual representation of task sequences
### Advanced Features
The Enterprise platform also offers:
- **Environment Variable Management**: Securely store and manage API keys
- **LLM Connections**: Configure integrations with multiple LLM providers
- **Custom Tools Repository**: Create, share, and install tools
- **Crew Studio**: Assemble crews through a chat interface without writing code
## Troubleshooting Deployment Failures
If your deployment fails, check for these common problems:
### Build Failures
#### Missing uv.lock File
**Symptom**: The build fails early with dependency resolution errors
**Solution**: Generate and commit the lock file:
```bash
uv lock
git add uv.lock
git commit -m "Add uv.lock for deployment"
git push
```
<Warning>
The `uv.lock` file is required for every deployment. Without it, the platform
cannot install your dependencies reliably.
</Warning>
#### Incorrect Project Structure
**Symptom**: "Could not find entry point" or "Module not found" errors
**Solution**: Verify that your project matches the expected structure:
- **Both Crews and Flows**: Must have their entry point in `src/project_name/main.py`
- **Crews**: Use a `run()` function as the entry point
- **Flows**: Use a `kickoff()` function as the entry point
See [Prepare for Deployment](/pt-BR/enterprise/guides/prepare-for-deployment) for detailed structure diagrams.
#### Missing CrewBase Decorator
**Symptom**: "Crew not found", "Config not found", or agent/task configuration errors
**Solution**: Make sure **every** crew class uses the `@CrewBase` decorator:
```python
from crewai import Agent
from crewai.project import CrewBase, agent, crew, task


@CrewBase  # This decorator is REQUIRED
class YourCrew():
    """Description of your crew"""

    @agent
    def my_agent(self) -> Agent:
        return Agent(
            config=self.agents_config['my_agent'],  # type: ignore[index]
            verbose=True
        )
    # ... rest of the crew definition
```
<Info>
This applies to standalone Crews AND crews embedded inside Flow projects.
Every crew class needs the decorator.
</Info>
#### Wrong Type in pyproject.toml
**Symptom**: The build succeeds but fails at runtime, or behaves unexpectedly
**Solution**: Check that the `[tool.crewai]` section matches your project type:
```toml
# For Crew projects:
[tool.crewai]
type = "crew"
# For Flow projects:
[tool.crewai]
type = "flow"
```
### Runtime Failures
#### LLM Connection Failures
**Symptom**: API key errors, "model not found", or authentication failures
**Solution**:
1. Verify that your LLM provider's API key is correctly set in the environment variables
2. Make sure the environment variable names match what your code expects
3. Test locally with exactly the same environment variables before deploying (see the check below)
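A quick local sanity check before deploying; the variable name is illustrative, so substitute the one your code reads:
```bash
# Fails loudly if the key is missing from the current environment
python -c "import os; assert os.getenv('OPENAI_API_KEY'), 'OPENAI_API_KEY is not set'"
```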
#### Crew Execution Errors
**Symptom**: The crew starts but fails during execution
**Solution**:
1. Check the execution logs in the AMP dashboard (Traces tab)
2. Verify that every tool has its required API keys configured
3. Make sure the agent configurations in `agents.yaml` are valid
4. Check for syntax errors in the task configurations in `tasks.yaml` (a quick local check is sketched below)
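Here is a minimal sketch for catching YAML syntax errors locally before deploying. It assumes your configs live under `src/**/config/` and that PyYAML is available in the environment (CrewAI projects pull it in as a dependency):
```python
# check_yaml.py - run from the project root
from pathlib import Path

import yaml

for config in Path("src").glob("**/config/*.yaml"):
    try:
        yaml.safe_load(config.read_text())
        print(f"OK: {config}")
    except yaml.YAMLError as exc:
        print(f"Syntax error in {config}: {exc}")
```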
<Card title="Precisa de Ajuda?" icon="headset" href="mailto:support@crewai.com">
Entre em contato com nossa equipe de suporte para ajuda com questões de
implantação ou dúvidas sobre a plataforma AMP.
</Card>

View File

@@ -0,0 +1,305 @@
---
title: "Prepare for Deployment"
description: "Make sure your Crew or Flow is ready to deploy to CrewAI AMP"
icon: "clipboard-check"
mode: "wide"
---
<Note>
Before deploying to CrewAI AMP, it is crucial to verify that your project is structured correctly.
Both Crews and Flows can be deployed as "automations", but they have different project structures
and requirements that must be met for a successful deployment.
</Note>
## Understanding Automations
On CrewAI AMP, **automations** is the umbrella term for deployable agentic AI projects. An automation can be:
- **A Crew**: A standalone team of AI agents working together on tasks
- **A Flow**: An orchestrated workflow that can combine multiple crews, direct LLM calls, and procedural logic
Understanding which type you are deploying matters because they have different project structures and entry points.
## Crews vs Flows: Key Differences
<CardGroup cols={2}>
<Card title="Crew Projects" icon="users">
Standalone teams of AI agents with `crew.py` defining agents and tasks. Ideal for focused, collaborative tasks.
</Card>
<Card title="Flow Projects" icon="diagram-project">
Orchestrated workflows with crews embedded in a `crews/` folder. Ideal for complex, multi-step processes.
</Card>
</CardGroup>
| Aspect | Crew | Flow |
|--------|------|------|
| **Project structure** | `src/project_name/` with `crew.py` | `src/project_name/` with a `crews/` folder |
| **Main logic location** | `src/project_name/crew.py` | `src/project_name/main.py` (Flow class) |
| **Entry point function** | `run()` in `main.py` | `kickoff()` in `main.py` |
| **Type in pyproject.toml** | `type = "crew"` | `type = "flow"` |
| **CLI creation command** | `crewai create crew name` | `crewai create flow name` |
| **Configuration location** | `src/project_name/config/` | `src/project_name/crews/crew_name/config/` |
| **Can contain other crews** | No | Yes (in the `crews/` folder) |
## Project Structure Reference
### Crew Project Structure
When you run `crewai create crew my_crew`, you get this structure:
```
my_crew/
├── .gitignore
├── pyproject.toml          # Must have type = "crew"
├── README.md
├── .env
├── uv.lock                 # REQUIRED for deployment
└── src/
    └── my_crew/
        ├── __init__.py
        ├── main.py         # Entry point with a run() function
        ├── crew.py         # Crew class with the @CrewBase decorator
        ├── tools/
        │   ├── custom_tool.py
        │   └── __init__.py
        └── config/
            ├── agents.yaml # Agent definitions
            └── tasks.yaml  # Task definitions
```
<Warning>
The nested `src/project_name/` structure is critical for Crews.
Placing files at the wrong level will cause deployment failures.
</Warning>
### Flow Project Structure
When you run `crewai create flow my_flow`, you get this structure:
```
my_flow/
├── .gitignore
├── pyproject.toml          # Must have type = "flow"
├── README.md
├── .env
├── uv.lock                 # REQUIRED for deployment
└── src/
    └── my_flow/
        ├── __init__.py
        ├── main.py         # Entry point with kickoff() + Flow class
        ├── crews/          # Folder for embedded crews
        │   └── poem_crew/
        │       ├── __init__.py
        │       ├── poem_crew.py  # Crew with the @CrewBase decorator
        │       └── config/
        │           ├── agents.yaml
        │           └── tasks.yaml
        └── tools/
            ├── __init__.py
            └── custom_tool.py
```
<Info>
Both Crews and Flows use the `src/project_name/` structure.
The key difference is that Flows have a `crews/` folder for embedded crews,
while Crews have `crew.py` directly in the project folder.
</Info>
## Pre-Deployment Checklist
Use this checklist to confirm that your project is ready to deploy.
### 1. Verify the pyproject.toml Configuration
Your `pyproject.toml` must include the correct `[tool.crewai]` section:
<Tabs>
<Tab title="For Crews">
```toml
[tool.crewai]
type = "crew"
```
</Tab>
<Tab title="For Flows">
```toml
[tool.crewai]
type = "flow"
```
</Tab>
</Tabs>
<Warning>
If the `type` does not match your project structure, the build will fail or
the automation will not run correctly.
</Warning>
### 2. Ensure the uv.lock File Exists
CrewAI uses `uv` for dependency management. The `uv.lock` file guarantees reproducible builds and is **required** for deployment.
```bash
# Generate or update the lock file
uv lock
# Check that it exists
ls -la uv.lock
```
If the file does not exist, run `uv lock` and commit it to your repository:
```bash
uv lock
git add uv.lock
git commit -m "Add uv.lock for deployment"
git push
```
### 3. Validate CrewBase Decorator Usage
**Every crew class must use the `@CrewBase` decorator.** This applies to:
- Standalone crew projects
- Crews embedded inside Flow projects
```python
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List


@CrewBase  # This decorator is REQUIRED
class MyCrew():
    """Description of my crew"""

    agents: List[BaseAgent]
    tasks: List[Task]

    @agent
    def my_agent(self) -> Agent:
        return Agent(
            config=self.agents_config['my_agent'],  # type: ignore[index]
            verbose=True
        )

    @task
    def my_task(self) -> Task:
        return Task(
            config=self.tasks_config['my_task']  # type: ignore[index]
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,
            verbose=True,
        )
```
<Warning>
If you forget the `@CrewBase` decorator, your deployment will fail with
errors about missing agent or task configurations.
</Warning>
### 4. Verify the Project Entry Points
Both Crews and Flows have their entry point in `src/project_name/main.py`:
<Tabs>
<Tab title="For Crews">
The entry point uses a `run()` function:
```python
# src/my_crew/main.py
from my_crew.crew import MyCrew


def run():
    """Run the crew."""
    inputs = {'topic': 'AI in Healthcare'}
    result = MyCrew().crew().kickoff(inputs=inputs)
    return result


if __name__ == "__main__":
    run()
```
</Tab>
<Tab title="For Flows">
The entry point uses a `kickoff()` function together with a Flow class:
```python
# src/my_flow/main.py
from crewai.flow import Flow, listen, start
from my_flow.crews.poem_crew.poem_crew import PoemCrew


class MyFlow(Flow):
    @start()
    def begin(self):
        # Flow logic here
        result = PoemCrew().crew().kickoff(inputs={...})
        return result


def kickoff():
    """Run the flow."""
    MyFlow().kickoff()


if __name__ == "__main__":
    kickoff()
```
</Tab>
</Tabs>
### 5. Prepare Environment Variables
Before deploying, make sure you have:
1. **LLM API keys** ready (OpenAI, Anthropic, Google, etc.)
2. **Tool API keys** if you use external tools (Serper, etc.)
<Tip>
Test your project locally with the same environment variables before deploying
so you catch configuration problems early.
</Tip>
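For example, you can exercise the same entry point locally before pushing; `crewai run` invokes the `run()`/`kickoff()` function described above:
```bash
# From the project root, with your .env in place
crewai run
```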
## Quick Validation Commands
Run these commands from your project root to quickly check your setup:
```bash
# 1. Check the project type in pyproject.toml
grep -A2 "\[tool.crewai\]" pyproject.toml
# 2. Check that uv.lock exists
ls -la uv.lock || echo "ERROR: uv.lock is missing! Run 'uv lock'"
# 3. Check that the src/ structure exists
ls -la src/*/main.py 2>/dev/null || echo "No main.py found in src/"
# 4. For Crews - check that crew.py exists
ls -la src/*/crew.py 2>/dev/null || echo "No crew.py (expected for Crews)"
# 5. For Flows - check that the crews/ folder exists
ls -la src/*/crews/ 2>/dev/null || echo "No crews/ folder (expected for Flows)"
# 6. Check CrewBase usage
grep -r "@CrewBase" . --include="*.py"
```
## Common Configuration Mistakes
| Mistake | Symptom | Fix |
|---------|---------|-----|
| Missing `uv.lock` | Build fails during dependency resolution | Run `uv lock` and commit |
| Wrong `type` in pyproject.toml | Build succeeds but fails at runtime | Change to the correct type |
| Missing `@CrewBase` decorator | "Config not found" errors | Add the decorator to every crew class |
| Files at the root instead of `src/` | Entry point not found | Move them to `src/project_name/` |
| Missing `run()` or `kickoff()` | Automation cannot start | Add the correct entry function |
## Next Steps
Once your project passes every item on the checklist, you are ready to deploy:
<Card title="Deploy to AMP" icon="rocket" href="/pt-BR/enterprise/guides/deploy-to-amp">
Follow the deployment guide to deploy your Crew or Flow to CrewAI AMP using
the CLI, the web interface, or CI/CD integration.
</Card>

View File

@@ -82,7 +82,7 @@ CrewAI AMP extends the power of the open-source framework with features designed for
<Card
title="Deploy Crew"
icon="rocket"
href="/pt-BR/enterprise/guides/deploy-crew"
href="/pt-BR/enterprise/guides/deploy-to-amp"
>
Deploy Crew
</Card>
@@ -92,11 +92,11 @@ CrewAI AMP extends the power of the open-source framework with features designed for
<Card
title="API Access"
icon="code"
href="/pt-BR/enterprise/guides/deploy-crew"
href="/pt-BR/enterprise/guides/kickoff-crew"
>
Use the Crew API
</Card>
</Step>
</Steps>
For detailed instructions, see our [deployment guide](/pt-BR/enterprise/guides/deploy-crew) or click the button below to get started.
For detailed instructions, see our [deployment guide](/pt-BR/enterprise/guides/deploy-to-amp) or click the button below to get started.

View File

@@ -12,7 +12,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.8.0",
"crewai==1.8.1",
"lancedb~=0.5.4",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",

View File

@@ -291,4 +291,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.8.0"
__version__ = "1.8.1"

View File

@@ -49,7 +49,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.8.0",
"crewai-tools==1.8.1",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.8.0"
__version__ = "1.8.1"
_telemetry_submitted = False

View File

@@ -1,8 +1,10 @@
"""Agent-to-Agent (A2A) protocol communication module for CrewAI."""
from crewai.a2a.config import A2AConfig
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
__all__ = [
    "A2AClientConfig",
    "A2AConfig",
    "A2AServerConfig",
]

View File

@@ -5,45 +5,57 @@ This module is separate from experimental.a2a to avoid circular imports.
from __future__ import annotations
from typing import Annotated, Any, ClassVar, Literal
from importlib.metadata import version
from typing import Any, ClassVar, Literal
from pydantic import (
    BaseModel,
    BeforeValidator,
    ConfigDict,
    Field,
    HttpUrl,
    TypeAdapter,
)
from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import deprecated
from crewai.a2a.auth.schemas import AuthScheme
from crewai.a2a.types import TransportType, Url

try:
    from a2a.types import (
        AgentCapabilities,
        AgentCardSignature,
        AgentInterface,
        AgentProvider,
        AgentSkill,
        SecurityScheme,
    )
    from crewai.a2a.updates import UpdateConfig
except ImportError:
    UpdateConfig = Any
    AgentCapabilities = Any
    AgentCardSignature = Any
    AgentInterface = Any
    AgentProvider = Any
    SecurityScheme = Any
    AgentSkill = Any
    UpdateConfig = Any  # type: ignore[misc,assignment]

http_url_adapter = TypeAdapter(HttpUrl)
Url = Annotated[
    str,
    BeforeValidator(
        lambda value: str(http_url_adapter.validate_python(value, strict=True))
    ),
]


def _get_default_update_config() -> UpdateConfig:
    from crewai.a2a.updates import StreamingConfig

    return StreamingConfig()


@deprecated(
    """
`crewai.a2a.config.A2AConfig` is deprecated and will be removed in v2.0.0,
use `crewai.a2a.config.A2AClientConfig` or `crewai.a2a.config.A2AServerConfig` instead.
""",
    category=FutureWarning,
)
class A2AConfig(BaseModel):
    """Configuration for A2A protocol integration.

    Deprecated:
        Use A2AClientConfig instead. This class will be removed in a future version.

    Attributes:
        endpoint: A2A agent endpoint URL.
        auth: Authentication scheme.
@@ -87,3 +99,176 @@ class A2AConfig(BaseModel):
default="JSONRPC",
description="Specified mode of A2A transport protocol",
)
class A2AClientConfig(BaseModel):
    """Configuration for connecting to remote A2A agents.

    Attributes:
        endpoint: A2A agent endpoint URL.
        auth: Authentication scheme.
        timeout: Request timeout in seconds.
        max_turns: Maximum conversation turns with A2A agent.
        response_model: Optional Pydantic model for structured A2A agent responses.
        fail_fast: If True, raise error when agent unreachable; if False, skip and continue.
        trust_remote_completion_status: If True, return A2A agent's result directly when completed.
        updates: Update mechanism config.
        accepted_output_modes: Media types the client can accept in responses.
        supported_transports: Ordered list of transport protocols the client supports.
        use_client_preference: Whether to prioritize client transport preferences over server.
        extensions: Extension URIs the client supports.
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    endpoint: Url = Field(description="A2A agent endpoint URL")
    auth: AuthScheme | None = Field(
        default=None,
        description="Authentication scheme",
    )
    timeout: int = Field(default=120, description="Request timeout in seconds")
    max_turns: int = Field(
        default=10, description="Maximum conversation turns with A2A agent"
    )
    response_model: type[BaseModel] | None = Field(
        default=None,
        description="Optional Pydantic model for structured A2A agent responses",
    )
    fail_fast: bool = Field(
        default=True,
        description="If True, raise error when agent unreachable; if False, skip",
    )
    trust_remote_completion_status: bool = Field(
        default=False,
        description="If True, return A2A result directly when completed",
    )
    updates: UpdateConfig = Field(
        default_factory=_get_default_update_config,
        description="Update mechanism config",
    )
    accepted_output_modes: list[str] = Field(
        default_factory=lambda: ["application/json"],
        description="Media types the client can accept in responses",
    )
    supported_transports: list[str] = Field(
        default_factory=lambda: ["JSONRPC"],
        description="Ordered list of transport protocols the client supports",
    )
    use_client_preference: bool = Field(
        default=False,
        description="Whether to prioritize client transport preferences over server",
    )
    extensions: list[str] = Field(
        default_factory=list,
        description="Extension URIs the client supports",
    )
    transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"] = Field(
        default="JSONRPC",
        description="Specified mode of A2A transport protocol",
    )


class A2AServerConfig(BaseModel):
    """Configuration for exposing a Crew or Agent as an A2A server.

    All fields correspond to A2A AgentCard fields. Fields like name, description,
    and skills can be auto-derived from the Crew/Agent if not provided.

    Attributes:
        name: Human-readable name for the agent.
        description: Human-readable description of the agent.
        version: Version string for the agent card.
        skills: List of agent skills/capabilities.
        default_input_modes: Default supported input MIME types.
        default_output_modes: Default supported output MIME types.
        capabilities: Declaration of optional capabilities.
        preferred_transport: Transport protocol for the preferred endpoint.
        protocol_version: A2A protocol version this agent supports.
        provider: Information about the agent's service provider.
        documentation_url: URL to the agent's documentation.
        icon_url: URL to an icon for the agent.
        additional_interfaces: Additional supported interfaces.
        security: Security requirement objects for all interactions.
        security_schemes: Security schemes available to authorize requests.
        supports_authenticated_extended_card: Whether agent provides extended card to authenticated users.
        url: Preferred endpoint URL for the agent.
        signatures: JSON Web Signatures for the AgentCard.
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    name: str | None = Field(
        default=None,
        description="Human-readable name for the agent. Auto-derived from Crew/Agent if not provided.",
    )
    description: str | None = Field(
        default=None,
        description="Human-readable description of the agent. Auto-derived from Crew/Agent if not provided.",
    )
    version: str = Field(
        default="1.0.0",
        description="Version string for the agent card",
    )
    skills: list[AgentSkill] = Field(
        default_factory=list,
        description="List of agent skills. Auto-derived from tasks/tools if not provided.",
    )
    default_input_modes: list[str] = Field(
        default_factory=lambda: ["text/plain", "application/json"],
        description="Default supported input MIME types",
    )
    default_output_modes: list[str] = Field(
        default_factory=lambda: ["text/plain", "application/json"],
        description="Default supported output MIME types",
    )
    capabilities: AgentCapabilities = Field(
        default_factory=lambda: AgentCapabilities(
            streaming=True,
            push_notifications=False,
        ),
        description="Declaration of optional capabilities supported by the agent",
    )
    preferred_transport: TransportType = Field(
        default="JSONRPC",
        description="Transport protocol for the preferred endpoint",
    )
    protocol_version: str = Field(
        default_factory=lambda: version("a2a-sdk"),
        description="A2A protocol version this agent supports",
    )
    provider: AgentProvider | None = Field(
        default=None,
        description="Information about the agent's service provider",
    )
    documentation_url: Url | None = Field(
        default=None,
        description="URL to the agent's documentation",
    )
    icon_url: Url | None = Field(
        default=None,
        description="URL to an icon for the agent",
    )
    additional_interfaces: list[AgentInterface] = Field(
        default_factory=list,
        description="Additional supported interfaces (transport and URL combinations)",
    )
    security: list[dict[str, list[str]]] = Field(
        default_factory=list,
        description="Security requirement objects for all agent interactions",
    )
    security_schemes: dict[str, SecurityScheme] = Field(
        default_factory=dict,
        description="Security schemes available to authorize requests",
    )
    supports_authenticated_extended_card: bool = Field(
        default=False,
        description="Whether agent provides extended card to authenticated users",
    )
    url: Url | None = Field(
        default=None,
        description="Preferred endpoint URL for the agent. Set at runtime if not provided.",
    )
    signatures: list[AgentCardSignature] = Field(
        default_factory=list,
        description="JSON Web Signatures for the AgentCard",
    )
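
# A hypothetical usage sketch (illustrative, not part of this changeset):
# attaching the new configs to an agent. The `a2a=` wiring, including the
# list form, is inferred from the `_get_server_config` helper added elsewhere
# in this diff; endpoint and URL values below are placeholders.
#
# from crewai import Agent
# from crewai.a2a.config import A2AClientConfig, A2AServerConfig
#
# client_cfg = A2AClientConfig(
#     endpoint="https://remote-agent.example.com/.well-known/agent-card.json",
#     timeout=60,
# )
# server_cfg = A2AServerConfig(name="Research Agent", version="1.0.0")
# agent = Agent(
#     role="Researcher",
#     goal="Answer research questions",
#     backstory="An experienced research analyst.",
#     a2a=[client_cfg, server_cfg],
# )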

View File

@@ -1,7 +1,17 @@
"""Type definitions for A2A protocol message parts."""
from typing import Any, Literal, Protocol, TypedDict, runtime_checkable
from __future__ import annotations
from typing import (
    Annotated,
    Any,
    Literal,
    Protocol,
    TypedDict,
    runtime_checkable,
)
from pydantic import BeforeValidator, HttpUrl, TypeAdapter
from typing_extensions import NotRequired
from crewai.a2a.updates import (
from crewai.a2a.updates import (
@@ -15,6 +25,18 @@ from crewai.a2a.updates import (
)
TransportType = Literal["JSONRPC", "GRPC", "HTTP+JSON"]
http_url_adapter: TypeAdapter[HttpUrl] = TypeAdapter(HttpUrl)
Url = Annotated[
    str,
    BeforeValidator(
        lambda value: str(http_url_adapter.validate_python(value, strict=True))
    ),
]
@runtime_checkable
class AgentResponseProtocol(Protocol):
"""Protocol for the dynamically created AgentResponse model."""

View File

@@ -0,0 +1 @@
"""A2A utility modules for client operations."""

View File

@@ -0,0 +1,399 @@
"""AgentCard utilities for A2A client and server operations."""
from __future__ import annotations
import asyncio
from collections.abc import MutableMapping
from functools import lru_cache
import time
from types import MethodType
from typing import TYPE_CHECKING
from a2a.client.errors import A2AClientHTTPError
from a2a.types import AgentCapabilities, AgentCard, AgentSkill
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
import httpx
from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth
from crewai.a2a.auth.utils import (
_auth_store,
configure_auth_client,
retry_on_401,
)
from crewai.a2a.config import A2AServerConfig
from crewai.crew import Crew
if TYPE_CHECKING:
from crewai.a2a.auth.schemas import AuthScheme
from crewai.agent import Agent
from crewai.task import Task
def _get_server_config(agent: Agent) -> A2AServerConfig | None:
"""Get A2AServerConfig from an agent's a2a configuration.
Args:
agent: The Agent instance to check.
Returns:
A2AServerConfig if present, None otherwise.
"""
if agent.a2a is None:
return None
if isinstance(agent.a2a, A2AServerConfig):
return agent.a2a
if isinstance(agent.a2a, list):
for config in agent.a2a:
if isinstance(config, A2AServerConfig):
return config
return None
def fetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
cache_ttl: int = 300,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint with optional caching.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
use_cache: Whether to use caching (default True).
cache_ttl: Cache TTL in seconds (default 300 = 5 minutes).
Returns:
AgentCard object with agent capabilities and skills.
Raises:
httpx.HTTPStatusError: If the request fails.
A2AClientHTTPError: If authentication fails.
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
ttl_hash = int(time.time() // cache_ttl)
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
async def afetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint asynchronously.
Native async implementation. Use this when running in an async context.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
use_cache: Whether to use caching (default True).
Returns:
AgentCard object with agent capabilities and skills.
Raises:
httpx.HTTPStatusError: If the request fails.
A2AClientHTTPError: If authentication fails.
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
agent_card: AgentCard = await _afetch_agent_card_cached(
endpoint, auth_hash, timeout
)
return agent_card
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
@lru_cache()
def _fetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
_ttl_hash: int,
) -> AgentCard:
"""Cached sync version of fetch_agent_card."""
auth = _auth_store.get(auth_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
_afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator]
async def _afetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
) -> AgentCard:
"""Cached async implementation of AgentCard fetching."""
auth = _auth_store.get(auth_hash)
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
async def _afetch_agent_card_impl(
endpoint: str,
auth: AuthScheme | None,
timeout: int,
) -> AgentCard:
"""Internal async implementation of AgentCard fetching."""
if "/.well-known/agent-card.json" in endpoint:
base_url = endpoint.replace("/.well-known/agent-card.json", "")
agent_card_path = "/.well-known/agent-card.json"
else:
url_parts = endpoint.split("/", 3)
base_url = f"{url_parts[0]}//{url_parts[2]}"
agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/"
headers: MutableMapping[str, str] = {}
if auth:
async with httpx.AsyncClient(timeout=timeout) as temp_auth_client:
if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_auth_client)
headers = await auth.apply_auth(temp_auth_client, {})
async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client:
if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_client)
agent_card_url = f"{base_url}{agent_card_path}"
async def _fetch_agent_card_request() -> httpx.Response:
return await temp_client.get(agent_card_url)
try:
response = await retry_on_401(
request_func=_fetch_agent_card_request,
auth_scheme=auth,
client=temp_client,
headers=temp_client.headers,
max_retries=2,
)
response.raise_for_status()
return AgentCard.model_validate(response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
error_details = ["Authentication failed"]
www_auth = e.response.headers.get("WWW-Authenticate")
if www_auth:
error_details.append(f"WWW-Authenticate: {www_auth}")
if not auth:
error_details.append("No auth scheme provided")
msg = " | ".join(error_details)
raise A2AClientHTTPError(401, msg) from e
raise
def _task_to_skill(task: Task) -> AgentSkill:
"""Convert a CrewAI Task to an A2A AgentSkill.
Args:
task: The CrewAI Task to convert.
Returns:
AgentSkill representing the task's capability.
"""
task_name = task.name or task.description[:50]
task_id = task_name.lower().replace(" ", "_")
tags: list[str] = []
if task.agent:
tags.append(task.agent.role.lower().replace(" ", "-"))
return AgentSkill(
id=task_id,
name=task_name,
description=task.description,
tags=tags,
examples=[task.expected_output] if task.expected_output else None,
)
def _tool_to_skill(tool_name: str, tool_description: str) -> AgentSkill:
"""Convert an Agent's tool to an A2A AgentSkill.
Args:
tool_name: Name of the tool.
tool_description: Description of what the tool does.
Returns:
AgentSkill representing the tool's capability.
"""
tool_id = tool_name.lower().replace(" ", "_")
return AgentSkill(
id=tool_id,
name=tool_name,
description=tool_description,
tags=[tool_name.lower().replace(" ", "-")],
)
def _crew_to_agent_card(crew: Crew, url: str) -> AgentCard:
"""Generate an A2A AgentCard from a Crew instance.
Args:
crew: The Crew instance to generate a card for.
url: The base URL where this crew will be exposed.
Returns:
AgentCard describing the crew's capabilities.
"""
crew_name = getattr(crew, "name", None) or crew.__class__.__name__
description_parts: list[str] = []
crew_description = getattr(crew, "description", None)
if crew_description:
description_parts.append(crew_description)
else:
agent_roles = [agent.role for agent in crew.agents]
description_parts.append(
f"A crew of {len(crew.agents)} agents: {', '.join(agent_roles)}"
)
skills = [_task_to_skill(task) for task in crew.tasks]
return AgentCard(
name=crew_name,
description=" ".join(description_parts),
url=url,
version="1.0.0",
capabilities=AgentCapabilities(
streaming=True,
push_notifications=True,
),
default_input_modes=["text/plain", "application/json"],
default_output_modes=["text/plain", "application/json"],
skills=skills,
)
def _agent_to_agent_card(agent: Agent, url: str) -> AgentCard:
"""Generate an A2A AgentCard from an Agent instance.
Uses A2AServerConfig values when available, falling back to agent properties.
Args:
agent: The Agent instance to generate a card for.
url: The base URL where this agent will be exposed.
Returns:
AgentCard describing the agent's capabilities.
"""
server_config = _get_server_config(agent) or A2AServerConfig()
name = server_config.name or agent.role
description_parts = [agent.goal]
if agent.backstory:
description_parts.append(agent.backstory)
description = server_config.description or " ".join(description_parts)
skills: list[AgentSkill] = (
server_config.skills.copy() if server_config.skills else []
)
if not skills:
if agent.tools:
for tool in agent.tools:
tool_name = getattr(tool, "name", None) or tool.__class__.__name__
tool_desc = getattr(tool, "description", None) or f"Tool: {tool_name}"
skills.append(_tool_to_skill(tool_name, tool_desc))
if not skills:
skills.append(
AgentSkill(
id=agent.role.lower().replace(" ", "_"),
name=agent.role,
description=agent.goal,
tags=[agent.role.lower().replace(" ", "-")],
)
)
return AgentCard(
name=name,
description=description,
url=server_config.url or url,
version=server_config.version,
capabilities=server_config.capabilities,
default_input_modes=server_config.default_input_modes,
default_output_modes=server_config.default_output_modes,
skills=skills,
protocol_version=server_config.protocol_version,
provider=server_config.provider,
documentation_url=server_config.documentation_url,
icon_url=server_config.icon_url,
additional_interfaces=server_config.additional_interfaces,
security=server_config.security,
security_schemes=server_config.security_schemes,
supports_authenticated_extended_card=server_config.supports_authenticated_extended_card,
signatures=server_config.signatures,
)
def inject_a2a_server_methods(agent: Agent) -> None:
"""Inject A2A server methods onto an Agent instance.
Adds a `to_agent_card(url: str) -> AgentCard` method to the agent
that generates an A2A-compliant AgentCard.
Only injects if the agent has an A2AServerConfig.
Args:
agent: The Agent instance to inject methods onto.
"""
if _get_server_config(agent) is None:
return
def _to_agent_card(self: Agent, url: str) -> AgentCard:
return _agent_to_agent_card(self, url)
object.__setattr__(agent, "to_agent_card", MethodType(_to_agent_card, agent))
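
# A hypothetical usage sketch (illustrative, not part of this changeset).
# `to_agent_card` exists only after injection, and only when the agent
# carries an A2AServerConfig; names and URLs below are placeholders.
#
# from crewai import Agent
# from crewai.a2a.config import A2AServerConfig
# from crewai.a2a.utils.agent_card import fetch_agent_card, inject_a2a_server_methods
#
# agent = Agent(
#     role="Researcher",
#     goal="Answer research questions",
#     backstory="An experienced research analyst.",
#     a2a=A2AServerConfig(name="Research Agent"),
# )
# inject_a2a_server_methods(agent)
# card = agent.to_agent_card(url="https://agents.example.com/researcher")
#
# # Client side: fetch (and cache) a remote agent's card.
# remote_card = fetch_agent_card(
#     "https://remote-agent.example.com/.well-known/agent-card.json"
# )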

View File

@@ -1,16 +1,14 @@
"""Utility functions for A2A (Agent-to-Agent) protocol delegation."""
"""A2A delegation utilities for executing tasks on remote agents."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, MutableMapping
from contextlib import asynccontextmanager
from functools import lru_cache
import time
from typing import TYPE_CHECKING, Any, Literal
import uuid
from a2a.client import A2AClientHTTPError, Client, ClientConfig, ClientFactory
from a2a.client import Client, ClientConfig, ClientFactory
from a2a.types import (
    AgentCard,
    Message,
@@ -19,19 +17,15 @@ from a2a.types import (
    Role,
    TextPart,
)
from aiocache import cached # type: ignore[import-untyped]
from aiocache.serializers import PickleSerializer # type: ignore[import-untyped]
import httpx
from pydantic import BaseModel, Field, create_model
from pydantic import BaseModel
from crewai.a2a.auth.schemas import APIKeyAuth, HTTPDigestAuth
from crewai.a2a.auth.utils import (
_auth_store,
configure_auth_client,
retry_on_401,
validate_auth_against_agent_card,
)
from crewai.a2a.config import A2AConfig
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.types import (
HANDLER_REGISTRY,
@@ -45,6 +39,7 @@ from crewai.a2a.updates import (
    StreamingHandler,
    UpdateConfig,
)
from crewai.a2a.utils.agent_card import _afetch_agent_card_cached
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AConversationStartedEvent,
@@ -52,7 +47,6 @@ from crewai.events.types.a2a_events import (
    A2ADelegationStartedEvent,
    A2AMessageSentEvent,
)
from crewai.types.utils import create_literals_from_strings
if TYPE_CHECKING:
@@ -75,187 +69,6 @@ def get_handler(config: UpdateConfig | None) -> HandlerType:
    return HANDLER_REGISTRY.get(type(config), StreamingHandler)
@lru_cache()
def _fetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
_ttl_hash: int,
) -> AgentCard:
"""Cached sync version of fetch_agent_card."""
auth = _auth_store.get(auth_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
_afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
def fetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
cache_ttl: int = 300,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint with optional caching.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL)
auth: Optional AuthScheme for authentication
timeout: Request timeout in seconds
use_cache: Whether to use caching (default True)
cache_ttl: Cache TTL in seconds (default 300 = 5 minutes)
Returns:
AgentCard object with agent capabilities and skills
Raises:
httpx.HTTPStatusError: If the request fails
A2AClientHTTPError: If authentication fails
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
ttl_hash = int(time.time() // cache_ttl)
return _fetch_agent_card_cached(endpoint, auth_hash, timeout, ttl_hash)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
return loop.run_until_complete(
afetch_agent_card(endpoint=endpoint, auth=auth, timeout=timeout)
)
finally:
loop.close()
async def afetch_agent_card(
endpoint: str,
auth: AuthScheme | None = None,
timeout: int = 30,
use_cache: bool = True,
) -> AgentCard:
"""Fetch AgentCard from an A2A endpoint asynchronously.
Native async implementation. Use this when running in an async context.
Args:
endpoint: A2A agent endpoint URL (AgentCard URL).
auth: Optional AuthScheme for authentication.
timeout: Request timeout in seconds.
use_cache: Whether to use caching (default True).
Returns:
AgentCard object with agent capabilities and skills.
Raises:
httpx.HTTPStatusError: If the request fails.
A2AClientHTTPError: If authentication fails.
"""
if use_cache:
if auth:
auth_data = auth.model_dump_json(
exclude={
"_access_token",
"_token_expires_at",
"_refresh_token",
"_authorization_callback",
}
)
auth_hash = hash((type(auth).__name__, auth_data))
else:
auth_hash = 0
_auth_store[auth_hash] = auth
agent_card: AgentCard = await _afetch_agent_card_cached(
endpoint, auth_hash, timeout
)
return agent_card
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
@cached(ttl=300, serializer=PickleSerializer()) # type: ignore[untyped-decorator]
async def _afetch_agent_card_cached(
endpoint: str,
auth_hash: int,
timeout: int,
) -> AgentCard:
"""Cached async implementation of AgentCard fetching."""
auth = _auth_store.get(auth_hash)
return await _afetch_agent_card_impl(endpoint=endpoint, auth=auth, timeout=timeout)
async def _afetch_agent_card_impl(
endpoint: str,
auth: AuthScheme | None,
timeout: int,
) -> AgentCard:
"""Internal async implementation of AgentCard fetching."""
if "/.well-known/agent-card.json" in endpoint:
base_url = endpoint.replace("/.well-known/agent-card.json", "")
agent_card_path = "/.well-known/agent-card.json"
else:
url_parts = endpoint.split("/", 3)
base_url = f"{url_parts[0]}//{url_parts[2]}"
agent_card_path = f"/{url_parts[3]}" if len(url_parts) > 3 else "/"
headers: MutableMapping[str, str] = {}
if auth:
async with httpx.AsyncClient(timeout=timeout) as temp_auth_client:
if isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_auth_client)
headers = await auth.apply_auth(temp_auth_client, {})
async with httpx.AsyncClient(timeout=timeout, headers=headers) as temp_client:
if auth and isinstance(auth, (HTTPDigestAuth, APIKeyAuth)):
configure_auth_client(auth, temp_client)
agent_card_url = f"{base_url}{agent_card_path}"
async def _fetch_agent_card_request() -> httpx.Response:
return await temp_client.get(agent_card_url)
try:
response = await retry_on_401(
request_func=_fetch_agent_card_request,
auth_scheme=auth,
client=temp_client,
headers=temp_client.headers,
max_retries=2,
)
response.raise_for_status()
return AgentCard.model_validate(response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 401:
error_details = ["Authentication failed"]
www_auth = e.response.headers.get("WWW-Authenticate")
if www_auth:
error_details.append(f"WWW-Authenticate: {www_auth}")
if not auth:
error_details.append("No auth scheme provided")
msg = " | ".join(error_details)
raise A2AClientHTTPError(401, msg) from e
raise
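A quick worked trace of the endpoint handling above (URLs illustrative):
# endpoint = "https://agents.example.com/.well-known/agent-card.json"
#   -> base_url = "https://agents.example.com"
#   -> agent_card_path = "/.well-known/agent-card.json"
# endpoint = "https://agents.example.com/custom/card"
#   -> endpoint.split("/", 3) == ["https:", "", "agents.example.com", "custom/card"]
#   -> base_url = "https://agents.example.com", agent_card_path = "/custom/card"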
def execute_a2a_delegation(
endpoint: str,
transport_protocol: Literal["JSONRPC", "GRPC", "HTTP+JSON"],
@@ -644,19 +457,18 @@ async def _create_a2a_client(
"""Create and configure an A2A client.
Args:
agent_card: The A2A agent card
transport_protocol: Transport protocol to use
timeout: Request timeout in seconds
headers: HTTP headers (already with auth applied)
streaming: Enable streaming responses
auth: Optional AuthScheme for client configuration
use_polling: Enable polling mode
push_notification_config: Optional push notification config to include in requests
agent_card: The A2A agent card.
transport_protocol: Transport protocol to use.
timeout: Request timeout in seconds.
headers: HTTP headers (already with auth applied).
streaming: Enable streaming responses.
auth: Optional AuthScheme for client configuration.
use_polling: Enable polling mode.
push_notification_config: Optional push notification config.
Yields:
Configured A2A client instance
Configured A2A client instance.
"""
async with httpx.AsyncClient(
timeout=timeout,
headers=headers,
@@ -687,78 +499,3 @@ async def _create_a2a_client(
factory = ClientFactory(config)
client = factory.create(agent_card)
yield client
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel]:
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
Args:
agent_ids: List of available A2A agent IDs
Returns:
Dynamically created Pydantic model with Literal-constrained a2a_ids field
"""
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
return create_model(
"AgentResponse",
a2a_ids=(
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
Field(
default_factory=tuple,
max_length=len(agent_ids),
description="A2A agent IDs to delegate to.",
),
),
message=(
str,
Field(
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
),
),
is_a2a=(
bool,
Field(
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
),
),
__base__=BaseModel,
)
def extract_a2a_agent_ids_from_config(
a2a_config: list[A2AConfig] | A2AConfig | None,
) -> tuple[list[A2AConfig], tuple[str, ...]]:
"""Extract A2A agent IDs from A2A configuration.
Args:
a2a_config: A2A configuration
Returns:
List of A2A agent IDs
"""
if a2a_config is None:
return [], ()
if isinstance(a2a_config, A2AConfig):
a2a_agents = [a2a_config]
else:
a2a_agents = a2a_config
return a2a_agents, tuple(config.endpoint for config in a2a_agents)
def get_a2a_agents_and_response_model(
a2a_config: list[A2AConfig] | A2AConfig | None,
) -> tuple[list[A2AConfig], type[BaseModel]]:
"""Get A2A agent IDs and response model.
Args:
a2a_config: A2A configuration
Returns:
Tuple of A2A agent IDs and response model
"""
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
return a2a_agents, create_agent_response_model(agent_ids)

View File

@@ -0,0 +1,101 @@
"""Response model utilities for A2A agent interactions."""
from __future__ import annotations
from typing import TypeAlias
from pydantic import BaseModel, Field, create_model
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
from crewai.types.utils import create_literals_from_strings
A2AConfigTypes: TypeAlias = A2AConfig | A2AServerConfig | A2AClientConfig
A2AClientConfigTypes: TypeAlias = A2AConfig | A2AClientConfig
def create_agent_response_model(agent_ids: tuple[str, ...]) -> type[BaseModel] | None:
"""Create a dynamic AgentResponse model with Literal types for agent IDs.
Args:
agent_ids: Tuple of available A2A agent IDs.
Returns:
Dynamically created Pydantic model with Literal-constrained a2a_ids field,
or None if agent_ids is empty.
"""
if not agent_ids:
return None
DynamicLiteral = create_literals_from_strings(agent_ids) # noqa: N806
return create_model(
"AgentResponse",
a2a_ids=(
tuple[DynamicLiteral, ...], # type: ignore[valid-type]
Field(
default_factory=tuple,
max_length=len(agent_ids),
description="A2A agent IDs to delegate to.",
),
),
message=(
str,
Field(
description="The message content. If is_a2a=true, this is sent to the A2A agent. If is_a2a=false, this is your final answer ending the conversation."
),
),
is_a2a=(
bool,
Field(
description="Set to false when the remote agent has answered your question - extract their answer and return it as your final message. Set to true ONLY if you need to ask a NEW, DIFFERENT question. NEVER repeat the same request - if the conversation history shows the agent already answered, set is_a2a=false immediately."
),
),
__base__=BaseModel,
)
def extract_a2a_agent_ids_from_config(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> tuple[list[A2AClientConfigTypes], tuple[str, ...]]:
"""Extract A2A agent IDs from A2A configuration.
Filters out A2AServerConfig since it doesn't have an endpoint for delegation.
Args:
a2a_config: A2A configuration (any type).
Returns:
Tuple of (client A2A config list, agent endpoint IDs).
"""
if a2a_config is None:
return [], ()
configs: list[A2AConfigTypes]
if isinstance(a2a_config, (A2AConfig, A2AClientConfig, A2AServerConfig)):
configs = [a2a_config]
else:
configs = a2a_config
# Filter to only client configs (those with endpoint)
client_configs: list[A2AClientConfigTypes] = [
config for config in configs if isinstance(config, (A2AConfig, A2AClientConfig))
]
return client_configs, tuple(config.endpoint for config in client_configs)
def get_a2a_agents_and_response_model(
a2a_config: list[A2AConfigTypes] | A2AConfigTypes | None,
) -> tuple[list[A2AClientConfigTypes], type[BaseModel] | None]:
"""Get A2A agent configs and response model.
Args:
a2a_config: A2A configuration (any type).
Returns:
Tuple of client A2A configs and response model.
"""
a2a_agents, agent_ids = extract_a2a_agent_ids_from_config(a2a_config=a2a_config)
return a2a_agents, create_agent_response_model(agent_ids)
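A minimal usage sketch of the module above, assuming two client endpoints (values illustrative):
configs = [
    A2AClientConfig(endpoint="http://agent-a.example.com"),
    A2AClientConfig(endpoint="http://agent-b.example.com"),
]
agents, AgentResponse = get_a2a_agents_and_response_model(configs)
# a2a_ids is Literal-constrained to the two endpoints above; any other value
# fails Pydantic validation. With no client configs, the model is None.
reply = AgentResponse(
    a2a_ids=("http://agent-a.example.com",),
    message="What is the weather in Paris?",
    is_a2a=True,
)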

View File

@@ -0,0 +1,284 @@
"""A2A task utilities for server-side task management."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
from functools import wraps
import logging
import os
from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
from a2a.server.agent_execution import RequestContext
from a2a.server.events import EventQueue
from a2a.types import (
InternalError,
InvalidParamsError,
Message,
Task as A2ATask,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
)
from a2a.utils import new_agent_text_message, new_text_artifact
from a2a.utils.errors import ServerError
from aiocache import SimpleMemoryCache, caches # type: ignore[import-untyped]
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AServerTaskCanceledEvent,
A2AServerTaskCompletedEvent,
A2AServerTaskFailedEvent,
A2AServerTaskStartedEvent,
)
from crewai.task import Task
if TYPE_CHECKING:
from crewai.agent import Agent
logger = logging.getLogger(__name__)
P = ParamSpec("P")
T = TypeVar("T")
def _parse_redis_url(url: str) -> dict[str, Any]:
from urllib.parse import urlparse
parsed = urlparse(url)
config: dict[str, Any] = {
"cache": "aiocache.RedisCache",
"endpoint": parsed.hostname or "localhost",
"port": parsed.port or 6379,
}
if parsed.path and parsed.path != "/":
try:
config["db"] = int(parsed.path.lstrip("/"))
except ValueError:
pass
if parsed.password:
config["password"] = parsed.password
return config
_redis_url = os.environ.get("REDIS_URL")
caches.set_config(
{
"default": _parse_redis_url(_redis_url)
if _redis_url
else {
"cache": "aiocache.SimpleMemoryCache",
}
}
)
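For example (URL illustrative), a typical `REDIS_URL` maps to:
# _parse_redis_url("redis://:s3cret@cache.example.com:6380/2") ->
# {
#     "cache": "aiocache.RedisCache",
#     "endpoint": "cache.example.com",
#     "port": 6380,
#     "db": 2,
#     "password": "s3cret",
# }
# Without REDIS_URL set, the default backend is aiocache.SimpleMemoryCache.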
def cancellable(
fn: Callable[P, Coroutine[Any, Any, T]],
) -> Callable[P, Coroutine[Any, Any, T]]:
"""Decorator that enables cancellation for A2A task execution.
Runs a cancellation watcher concurrently with the wrapped function.
When a cancel event is published, the execution is cancelled.
Args:
fn: The async function to wrap.
Returns:
Wrapped function with cancellation support.
"""
@wraps(fn)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
"""Wrap function with cancellation monitoring."""
context: RequestContext | None = None
for arg in args:
if isinstance(arg, RequestContext):
context = arg
break
if context is None:
context = cast(RequestContext | None, kwargs.get("context"))
if context is None:
return await fn(*args, **kwargs)
task_id = context.task_id
cache = caches.get("default")
async def poll_for_cancel() -> bool:
"""Poll cache for cancellation flag."""
while True:
if await cache.get(f"cancel:{task_id}"):
return True
await asyncio.sleep(0.1)
async def watch_for_cancel() -> bool:
"""Watch for cancellation events via pub/sub or polling."""
if isinstance(cache, SimpleMemoryCache):
return await poll_for_cancel()
try:
client = cache.client
pubsub = client.pubsub()
await pubsub.subscribe(f"cancel:{task_id}")
async for message in pubsub.listen():
if message["type"] == "message":
return True
except Exception as e:
logger.warning("Cancel watcher error for task_id=%s: %s", task_id, e)
return await poll_for_cancel()
return False
execute_task = asyncio.create_task(fn(*args, **kwargs))
cancel_watch = asyncio.create_task(watch_for_cancel())
try:
done, _ = await asyncio.wait(
[execute_task, cancel_watch],
return_when=asyncio.FIRST_COMPLETED,
)
if cancel_watch in done:
execute_task.cancel()
try:
await execute_task
except asyncio.CancelledError:
pass
raise asyncio.CancelledError(f"Task {task_id} was cancelled")
cancel_watch.cancel()
return execute_task.result()
finally:
await cache.delete(f"cancel:{task_id}")
return wrapper
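A sketch of the cancellation contract (task id illustrative): a cancel request only needs to set the cache flag (and, on Redis, publish) for the watcher above to fire, which mirrors what `cancel()` below does.
# Inside an async function:
cache = caches.get("default")
await cache.set("cancel:task-123", True, ttl=3600)   # polling path (in-memory backend)
if not isinstance(cache, SimpleMemoryCache):
    await cache.client.publish("cancel:task-123", "cancel")  # pub/sub path (Redis)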
@cancellable
async def execute(
agent: Agent,
context: RequestContext,
event_queue: EventQueue,
) -> None:
"""Execute an A2A task using a CrewAI agent.
Args:
agent: The CrewAI agent to execute the task.
context: The A2A request context containing the user's message.
event_queue: The event queue for sending responses back.
TODOs:
* Implement structured outputs and file inputs; both depend on `file_inputs` support in
`crewai.task.Task`. Pass the two values below to Task; both helpers live in `a2a.utils.parts`:
* structured outputs ingestion: `structured_inputs = get_data_parts(parts=context.message.parts)`
* file inputs ingestion: `file_inputs = get_file_parts(parts=context.message.parts)`
"""
user_message = context.get_user_input()
task_id = context.task_id
context_id = context.context_id
if task_id is None or context_id is None:
msg = "task_id and context_id are required"
crewai_event_bus.emit(
agent,
A2AServerTaskFailedEvent(a2a_task_id="", a2a_context_id="", error=msg),
)
raise ServerError(InvalidParamsError(message=msg)) from None
task = Task(
description=user_message,
expected_output="Response to the user's request",
agent=agent,
)
crewai_event_bus.emit(
agent,
A2AServerTaskStartedEvent(a2a_task_id=task_id, a2a_context_id=context_id),
)
try:
result = await agent.aexecute_task(task=task, tools=agent.tools)
result_str = str(result)
history: list[Message] = [context.message] if context.message else []
history.append(new_agent_text_message(result_str, context_id, task_id))
await event_queue.enqueue_event(
A2ATask(
id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.input_required),
artifacts=[new_text_artifact(result_str, f"result_{task_id}")],
history=history,
)
)
crewai_event_bus.emit(
agent,
A2AServerTaskCompletedEvent(
a2a_task_id=task_id, a2a_context_id=context_id, result=str(result)
),
)
except asyncio.CancelledError:
crewai_event_bus.emit(
agent,
A2AServerTaskCanceledEvent(a2a_task_id=task_id, a2a_context_id=context_id),
)
raise
except Exception as e:
crewai_event_bus.emit(
agent,
A2AServerTaskFailedEvent(
a2a_task_id=task_id, a2a_context_id=context_id, error=str(e)
),
)
raise ServerError(
error=InternalError(message=f"Task execution failed: {e}")
) from e
async def cancel(
context: RequestContext,
event_queue: EventQueue,
) -> A2ATask | None:
"""Cancel an A2A task.
Publishes a cancel event that the cancellable decorator listens for.
Args:
context: The A2A request context containing task information.
event_queue: The event queue for sending the cancellation status.
Returns:
The canceled task with updated status.
"""
task_id = context.task_id
context_id = context.context_id
if task_id is None or context_id is None:
raise ServerError(InvalidParamsError(message="task_id and context_id required"))
if context.current_task and context.current_task.status.state in (
TaskState.completed,
TaskState.failed,
TaskState.canceled,
):
return context.current_task
cache = caches.get("default")
await cache.set(f"cancel:{task_id}", True, ttl=3600)
if not isinstance(cache, SimpleMemoryCache):
await cache.client.publish(f"cancel:{task_id}", "cancel")
await event_queue.enqueue_event(
TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(state=TaskState.canceled),
final=True,
)
)
if context.current_task:
context.current_task.status = TaskStatus(state=TaskState.canceled)
return context.current_task
return None

View File

@@ -15,7 +15,7 @@ from typing import TYPE_CHECKING, Any
from a2a.types import Role, TaskState
from pydantic import BaseModel, ValidationError
from crewai.a2a.config import A2AConfig
from crewai.a2a.config import A2AClientConfig, A2AConfig
from crewai.a2a.extensions.base import ExtensionRegistry
from crewai.a2a.task_helpers import TaskStateResult
from crewai.a2a.templates import (
@@ -26,13 +26,16 @@ from crewai.a2a.templates import (
UNAVAILABLE_AGENTS_NOTICE_TEMPLATE,
)
from crewai.a2a.types import AgentResponseProtocol
from crewai.a2a.utils import (
aexecute_a2a_delegation,
from crewai.a2a.utils.agent_card import (
afetch_agent_card,
execute_a2a_delegation,
fetch_agent_card,
get_a2a_agents_and_response_model,
inject_a2a_server_methods,
)
from crewai.a2a.utils.delegation import (
aexecute_a2a_delegation,
execute_a2a_delegation,
)
from crewai.a2a.utils.response_model import get_a2a_agents_and_response_model
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.a2a_events import (
A2AConversationCompletedEvent,
@@ -122,10 +125,12 @@ def wrap_agent_with_a2a_instance(
agent, "aexecute_task", MethodType(aexecute_task_with_a2a, agent)
)
inject_a2a_server_methods(agent)
def _fetch_card_from_config(
config: A2AConfig,
) -> tuple[A2AConfig, AgentCard | Exception]:
config: A2AConfig | A2AClientConfig,
) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]:
"""Fetch agent card from A2A config.
Args:
@@ -146,7 +151,7 @@ def _fetch_card_from_config(
def _fetch_agent_cards_concurrently(
a2a_agents: list[A2AConfig],
a2a_agents: list[A2AConfig | A2AClientConfig],
) -> tuple[dict[str, AgentCard], dict[str, str]]:
"""Fetch agent cards concurrently for multiple A2A agents.
@@ -181,7 +186,7 @@ def _fetch_agent_cards_concurrently(
def _execute_task_with_a2a(
self: Agent,
a2a_agents: list[A2AConfig],
a2a_agents: list[A2AConfig | A2AClientConfig],
original_fn: Callable[..., str],
task: Task,
agent_response_model: type[BaseModel],
@@ -270,7 +275,7 @@ def _execute_task_with_a2a(
def _augment_prompt_with_a2a(
a2a_agents: list[A2AConfig],
a2a_agents: list[A2AConfig | A2AClientConfig],
task_description: str,
agent_cards: dict[str, AgentCard],
conversation_history: list[Message] | None = None,
@@ -523,11 +528,11 @@ def _prepare_delegation_context(
task: Task,
original_task_description: str | None,
) -> tuple[
list[A2AConfig],
list[A2AConfig | A2AClientConfig],
type[BaseModel],
str,
str,
A2AConfig,
A2AConfig | A2AClientConfig,
str | None,
str | None,
dict[str, Any] | None,
@@ -591,7 +596,7 @@ def _handle_task_completion(
task: Task,
task_id_config: str | None,
reference_task_ids: list[str],
agent_config: A2AConfig,
agent_config: A2AConfig | A2AClientConfig,
turn_num: int,
) -> tuple[str | None, str | None, list[str]]:
"""Handle task completion state including reference task updates.
@@ -631,7 +636,7 @@ def _handle_agent_response_and_continue(
a2a_result: TaskStateResult,
agent_id: str,
agent_cards: dict[str, AgentCard] | None,
a2a_agents: list[A2AConfig],
a2a_agents: list[A2AConfig | A2AClientConfig],
original_task_description: str,
conversation_history: list[Message],
turn_num: int,
@@ -868,8 +873,8 @@ def _delegate_to_a2a(
async def _afetch_card_from_config(
config: A2AConfig,
) -> tuple[A2AConfig, AgentCard | Exception]:
config: A2AConfig | A2AClientConfig,
) -> tuple[A2AConfig | A2AClientConfig, AgentCard | Exception]:
"""Fetch agent card from A2A config asynchronously."""
try:
card = await afetch_agent_card(
@@ -883,7 +888,7 @@ async def _afetch_card_from_config(
async def _afetch_agent_cards_concurrently(
a2a_agents: list[A2AConfig],
a2a_agents: list[A2AConfig | A2AClientConfig],
) -> tuple[dict[str, AgentCard], dict[str, str]]:
"""Fetch agent cards concurrently for multiple A2A agents using asyncio."""
agent_cards: dict[str, AgentCard] = {}
@@ -908,7 +913,7 @@ async def _afetch_agent_cards_concurrently(
async def _aexecute_task_with_a2a(
self: Agent,
a2a_agents: list[A2AConfig],
a2a_agents: list[A2AConfig | A2AClientConfig],
original_fn: Callable[..., Coroutine[Any, Any, str]],
task: Task,
agent_response_model: type[BaseModel],
@@ -987,7 +992,7 @@ async def _ahandle_agent_response_and_continue(
a2a_result: TaskStateResult,
agent_id: str,
agent_cards: dict[str, AgentCard] | None,
a2a_agents: list[A2AConfig],
a2a_agents: list[A2AConfig | A2AClientConfig],
original_task_description: str,
conversation_history: list[Message],
turn_num: int,

View File

@@ -17,7 +17,6 @@ from urllib.parse import urlparse
from pydantic import BaseModel, Field, InstanceOf, PrivateAttr, model_validator
from typing_extensions import Self
from crewai.a2a.config import A2AConfig
from crewai.agent.utils import (
ahandle_knowledge_retrieval,
apply_training_data,
@@ -35,6 +34,11 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.events.types.knowledge_events import (
KnowledgeQueryCompletedEvent,
KnowledgeQueryFailedEvent,
@@ -44,10 +48,10 @@ from crewai.events.types.memory_events import (
MemoryRetrievalCompletedEvent,
MemoryRetrievalStartedEvent,
)
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.agent_executor import AgentExecutor
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.lite_agent import LiteAgent
from crewai.lite_agent_output import LiteAgentOutput
from crewai.llms.base_llm import BaseLLM
from crewai.mcp import (
MCPClient,
@@ -70,21 +74,31 @@ from crewai.utilities.agent_utils import (
render_text_description_and_args,
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import Converter
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.guardrail import process_guardrail
from crewai.utilities.guardrail_types import GuardrailType
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.prompts import Prompts
from crewai.utilities.prompts import Prompts, StandardPromptResult, SystemPromptResult
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
try:
from crewai.a2a.config import A2AClientConfig, A2AConfig, A2AServerConfig
except ImportError:
A2AClientConfig = Any
A2AConfig = Any
A2AServerConfig = Any
if TYPE_CHECKING:
from crewai_tools import CodeInterpreterTool
from crewai.agents.agent_builder.base_agent import PlatformAppOrAction
from crewai.lite_agent_output import LiteAgentOutput
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.types import LLMMessage
@@ -106,7 +120,7 @@ class Agent(BaseAgent):
The agent can also have memory, can operate in verbose mode, and can delegate tasks to other agents.
Attributes:
agent_executor: An instance of the CrewAgentExecutor or CrewAgentExecutorFlow class.
agent_executor: An instance of the CrewAgentExecutor or AgentExecutor class.
role: The role of the agent.
goal: The objective of the agent.
backstory: The backstory of the agent.
@@ -218,13 +232,22 @@ class Agent(BaseAgent):
guardrail_max_retries: int = Field(
default=3, description="Maximum number of retries when guardrail fails"
)
a2a: list[A2AConfig] | A2AConfig | None = Field(
a2a: (
list[A2AConfig | A2AServerConfig | A2AClientConfig]
| A2AConfig
| A2AServerConfig
| A2AClientConfig
| None
) = Field(
default=None,
description="A2A (Agent-to-Agent) configuration for delegating tasks to remote agents. Can be a single A2AConfig or a dict mapping agent IDs to configs.",
description="""
A2A (Agent-to-Agent) configuration for delegating tasks to remote agents.
Can be a single A2AConfig/A2AClientConfig/A2AServerConfig, or a list containing any number of A2AConfig/A2AClientConfig entries plus at most one A2AServerConfig.
""",
)
executor_class: type[CrewAgentExecutor] | type[CrewAgentExecutorFlow] = Field(
executor_class: type[CrewAgentExecutor] | type[AgentExecutor] = Field(
default=CrewAgentExecutor,
description="Class to use for the agent executor. Defaults to CrewAgentExecutor, can optionally use CrewAgentExecutorFlow.",
description="Class to use for the agent executor. Defaults to CrewAgentExecutor, can optionally use AgentExecutor.",
)
@model_validator(mode="before")
@@ -733,7 +756,7 @@ class Agent(BaseAgent):
if self.agent_executor is not None:
self._update_executor_parameters(
task=task,
tools=parsed_tools,
tools=parsed_tools, # type: ignore[arg-type]
raw_tools=raw_tools,
prompt=prompt,
stop_words=stop_words,
@@ -742,7 +765,7 @@ class Agent(BaseAgent):
else:
self.agent_executor = self.executor_class(
llm=cast(BaseLLM, self.llm),
task=task,
task=task, # type: ignore[arg-type]
i18n=self.i18n,
agent=self,
crew=self.crew,
@@ -765,11 +788,11 @@ class Agent(BaseAgent):
def _update_executor_parameters(
self,
task: Task | None,
tools: list,
tools: list[BaseTool],
raw_tools: list[BaseTool],
prompt: dict,
prompt: SystemPromptResult | StandardPromptResult,
stop_words: list[str],
rpm_limit_fn: Callable | None,
rpm_limit_fn: Callable | None, # type: ignore[type-arg]
) -> None:
"""Update executor parameters without recreating instance.
@@ -1567,26 +1590,25 @@ class Agent(BaseAgent):
)
return None
def kickoff(
def _prepare_kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""
Execute the agent with the given messages using a LiteAgent instance.
) -> tuple[AgentExecutor, dict[str, str], dict[str, Any], list[CrewStructuredTool]]:
"""Prepare common setup for kickoff execution.
This method is useful when you want to use the Agent configuration but
with the simpler and more direct execution flow of LiteAgent.
This method handles all the common preparation logic shared between
kickoff() and kickoff_async(), including tool processing, prompt building,
executor creation, and input formatting.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
response_format: Optional Pydantic model for structured output.
Returns:
LiteAgentOutput: The result of the agent execution.
Tuple of (executor, inputs, agent_info, parsed_tools) ready for execution.
"""
# Process platform apps and MCP tools
if self.apps:
platform_tools = self.get_platform_tools(self.apps)
if platform_tools and self.tools is not None:
@@ -1596,25 +1618,354 @@ class Agent(BaseAgent):
if mcps and self.tools is not None:
self.tools.extend(mcps)
lite_agent = LiteAgent(
id=self.id,
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,
verbose=self.verbose,
response_format=response_format,
# Prepare tools
raw_tools: list[BaseTool] = self.tools or []
parsed_tools = parse_tools(raw_tools)
# Build agent_info for backward-compatible event emission
agent_info = {
"id": self.id,
"role": self.role,
"goal": self.goal,
"backstory": self.backstory,
"tools": raw_tools,
"verbose": self.verbose,
}
# Build prompt for standalone execution
prompt = Prompts(
agent=self,
has_tools=len(raw_tools) > 0,
i18n=self.i18n,
original_agent=self,
guardrail=self.guardrail,
guardrail_max_retries=self.guardrail_max_retries,
use_system_prompt=self.use_system_prompt,
system_template=self.system_template,
prompt_template=self.prompt_template,
response_template=self.response_template,
).task_execution()
# Prepare stop words
stop_words = [self.i18n.slice("observation")]
if self.response_template:
stop_words.append(
self.response_template.split("{{ .Response }}")[1].strip()
)
# Get RPM limit function
rpm_limit_fn = (
self._rpm_controller.check_or_wait if self._rpm_controller else None
)
return lite_agent.kickoff(messages)
# Create the executor for standalone mode (no crew, no task)
executor = AgentExecutor(
task=None,
crew=None,
llm=cast(BaseLLM, self.llm),
agent=self,
prompt=prompt,
max_iter=self.max_iter,
tools=parsed_tools,
tools_names=get_tool_names(parsed_tools),
stop_words=stop_words,
tools_description=render_text_description_and_args(parsed_tools),
tools_handler=self.tools_handler,
original_tools=raw_tools,
step_callback=self.step_callback,
function_calling_llm=self.function_calling_llm,
respect_context_window=self.respect_context_window,
request_within_rpm_limit=rpm_limit_fn,
callbacks=[TokenCalcHandler(self._token_process)],
response_model=response_format,
i18n=self.i18n,
)
# Format messages
if isinstance(messages, str):
formatted_messages = messages
else:
formatted_messages = "\n".join(
str(msg.get("content", "")) for msg in messages if msg.get("content")
)
# Build the input dict for the executor
inputs = {
"input": formatted_messages,
"tool_names": get_tool_names(parsed_tools),
"tools": render_text_description_and_args(parsed_tools),
}
return executor, inputs, agent_info, parsed_tools
def kickoff(
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""
Execute the agent with the given messages using the AgentExecutor.
This method provides standalone agent execution without requiring a Crew.
It supports tools, response formatting, and guardrails.
When called from within a sync Flow method, the Flow framework automatically
runs the method in a thread pool, so this works seamlessly. For async Flow
methods, use kickoff_async() instead.
Args:
messages: Either a string query or a list of message dictionaries.
If a string is provided, it will be converted to a user message.
If a list is provided, each dict should have 'role' and 'content' keys.
response_format: Optional Pydantic model for structured output.
Returns:
LiteAgentOutput: The result of the agent execution.
Note:
If called from an async context (not through Flow), use kickoff_async().
"""
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format
)
try:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
),
)
output = self._execute_and_build_output(executor, inputs, response_format)
if self.guardrail is not None:
output = self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
)
crewai_event_bus.emit(
self,
event=LiteAgentExecutionCompletedEvent(
agent_info=agent_info,
output=output.raw,
),
)
return output
except Exception as e:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise
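A minimal standalone usage sketch (role and message illustrative):
agent = Agent(role="Assistant", goal="Help users", backstory="Concise and factual")
result = agent.kickoff("Summarize the A2A protocol in one sentence.")
print(result.raw)  # final text; result.pydantic is populated when response_format is given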
def _execute_and_build_output(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent and build the output object.
Args:
executor: The executor instance.
inputs: Input dictionary for execution.
response_format: Optional response format.
Returns:
LiteAgentOutput with raw output, formatted result, and metrics.
"""
import json
# Execute the agent (this is called from sync path, so invoke returns dict)
result = cast(dict[str, Any], executor.invoke(inputs))
raw_output = result.get("output", "")
# Handle response format conversion
formatted_result: BaseModel | None = None
if response_format:
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
# Get token usage metrics
if isinstance(self.llm, BaseLLM):
usage_metrics = self.llm.get_token_usage_summary()
else:
usage_metrics = self._token_process.get_summary()
return LiteAgentOutput(
raw=raw_output,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
)
async def _execute_and_build_output_async(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent asynchronously and build the output object.
This is the async version of _execute_and_build_output that uses
invoke_async() for native async execution within event loops.
Args:
executor: The executor instance.
inputs: Input dictionary for execution.
response_format: Optional response format.
Returns:
LiteAgentOutput with raw output, formatted result, and metrics.
"""
import json
# Execute the agent asynchronously
result = await executor.invoke_async(inputs)
raw_output = result.get("output", "")
# Handle response format conversion
formatted_result: BaseModel | None = None
if response_format:
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
# Get token usage metrics
if isinstance(self.llm, BaseLLM):
usage_metrics = self.llm.get_token_usage_summary()
else:
usage_metrics = self._token_process.get_summary()
return LiteAgentOutput(
raw=raw_output,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
)
def _process_kickoff_guardrail(
self,
output: LiteAgentOutput,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
retry_count: int = 0,
) -> LiteAgentOutput:
"""Process guardrail for kickoff execution with retry logic.
Args:
output: Current agent output.
executor: The executor instance.
inputs: Input dictionary for re-execution.
response_format: Optional response format.
retry_count: Current retry count.
Returns:
Validated/updated output.
"""
from crewai.utilities.guardrail_types import GuardrailCallable
# Ensure guardrail is callable
guardrail_callable: GuardrailCallable
if isinstance(self.guardrail, str):
from crewai.tasks.llm_guardrail import LLMGuardrail
guardrail_callable = cast(
GuardrailCallable,
LLMGuardrail(description=self.guardrail, llm=cast(BaseLLM, self.llm)),
)
elif callable(self.guardrail):
guardrail_callable = self.guardrail
else:
# Should not happen if called from kickoff with guardrail check
return output
guardrail_result = process_guardrail(
output=output,
guardrail=guardrail_callable,
retry_count=retry_count,
event_source=self,
from_agent=self,
)
if not guardrail_result.success:
if retry_count >= self.guardrail_max_retries:
raise ValueError(
f"Agent's guardrail failed validation after {self.guardrail_max_retries} retries. "
f"Last error: {guardrail_result.error}"
)
# Add feedback and re-execute
executor._append_message_to_state(
guardrail_result.error or "Guardrail validation failed",
role="user",
)
# Re-execute and build new output
output = self._execute_and_build_output(executor, inputs, response_format)
# Recursively retry guardrail
return self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
retry_count=retry_count + 1,
)
# Apply guardrail result if available
if guardrail_result.result is not None:
if isinstance(guardrail_result.result, str):
output.raw = guardrail_result.result
elif isinstance(guardrail_result.result, BaseModel):
output.pydantic = guardrail_result.result
return output
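A hedged guardrail sketch, assuming the (success, result-or-error) tuple convention used by crewAI guardrails (names illustrative):
def cites_a_source(output) -> tuple[bool, str]:
    # On success, pass the (possibly transformed) text through; on failure,
    # the error string is appended to the conversation and the agent re-executes.
    if "source:" in output.raw.lower():
        return True, output.raw
    return False, "Please cite a source using a 'Source:' line."

agent = Agent(
    role="Reporter",
    goal="Write sourced summaries",
    backstory="Meticulous fact-checker",
    guardrail=cites_a_source,
    guardrail_max_retries=2,
)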
async def kickoff_async(
self,
@@ -1622,9 +1973,11 @@ class Agent(BaseAgent):
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""
Execute the agent asynchronously with the given messages using a LiteAgent instance.
Execute the agent asynchronously with the given messages.
This is the async version of the kickoff method.
This is the async version of the kickoff method that uses native async
execution. It is designed for use within async contexts, such as when
called from within an async Flow method.
Args:
messages: Either a string query or a list of message dictionaries.
@@ -1635,21 +1988,48 @@ class Agent(BaseAgent):
Returns:
LiteAgentOutput: The result of the agent execution.
"""
lite_agent = LiteAgent(
role=self.role,
goal=self.goal,
backstory=self.backstory,
llm=self.llm,
tools=self.tools or [],
max_iterations=self.max_iter,
max_execution_time=self.max_execution_time,
respect_context_window=self.respect_context_window,
verbose=self.verbose,
response_format=response_format,
i18n=self.i18n,
original_agent=self,
guardrail=self.guardrail,
guardrail_max_retries=self.guardrail_max_retries,
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format
)
return await lite_agent.kickoff_async(messages)
try:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
),
)
output = await self._execute_and_build_output_async(
executor, inputs, response_format
)
if self.guardrail is not None:
output = self._process_kickoff_guardrail(
output=output,
executor=executor,
inputs=inputs,
response_format=response_format,
)
crewai_event_bus.emit(
self,
event=LiteAgentExecutionCompletedEvent(
agent_info=agent_info,
output=output.raw,
),
)
return output
except Exception as e:
crewai_event_bus.emit(
self,
event=LiteAgentExecutionErrorEvent(
agent_info=agent_info,
error=str(e),
),
)
raise
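And the async counterpart from inside an async Flow method, as a hedged sketch (flow shape illustrative; import path assumed from crewai.flow):
from crewai.flow.flow import Flow, start

class ResearchFlow(Flow):
    @start()
    async def ask(self):
        agent = Agent(role="Researcher", goal="Answer", backstory="Curious")
        output = await agent.kickoff_async("Name one use of the A2A protocol.")
        return output.raw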

View File

@@ -21,9 +21,9 @@ if TYPE_CHECKING:
class CrewAgentExecutorMixin:
crew: Crew
crew: Crew | None
agent: Agent
task: Task
task: Task | None
iterations: int
max_iter: int
messages: list[LLMMessage]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.8.0"
"crewai[tools]==1.8.1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.8.0"
"crewai[tools]==1.8.1"
]
[project.scripts]

View File

@@ -1,3 +1,20 @@
from crewai.events.types.a2a_events import (
A2AConversationCompletedEvent,
A2AConversationStartedEvent,
A2ADelegationCompletedEvent,
A2ADelegationStartedEvent,
A2AMessageSentEvent,
A2APollingStartedEvent,
A2APollingStatusEvent,
A2APushNotificationReceivedEvent,
A2APushNotificationRegisteredEvent,
A2APushNotificationTimeoutEvent,
A2AResponseReceivedEvent,
A2AServerTaskCanceledEvent,
A2AServerTaskCompletedEvent,
A2AServerTaskFailedEvent,
A2AServerTaskStartedEvent,
)
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
@@ -76,7 +93,22 @@ from crewai.events.types.tool_usage_events import (
EventTypes = (
CrewKickoffStartedEvent
A2AConversationCompletedEvent
| A2AConversationStartedEvent
| A2ADelegationCompletedEvent
| A2ADelegationStartedEvent
| A2AMessageSentEvent
| A2APollingStartedEvent
| A2APollingStatusEvent
| A2APushNotificationReceivedEvent
| A2APushNotificationRegisteredEvent
| A2APushNotificationTimeoutEvent
| A2AResponseReceivedEvent
| A2AServerTaskCanceledEvent
| A2AServerTaskCompletedEvent
| A2AServerTaskFailedEvent
| A2AServerTaskStartedEvent
| CrewKickoffStartedEvent
| CrewKickoffCompletedEvent
| CrewKickoffFailedEvent
| CrewTestStartedEvent

View File

@@ -210,3 +210,37 @@ class A2APushNotificationTimeoutEvent(A2AEventBase):
type: str = "a2a_push_notification_timeout"
task_id: str
timeout_seconds: float
class A2AServerTaskStartedEvent(A2AEventBase):
"""Event emitted when an A2A server task execution starts."""
type: str = "a2a_server_task_started"
a2a_task_id: str
a2a_context_id: str
class A2AServerTaskCompletedEvent(A2AEventBase):
"""Event emitted when an A2A server task execution completes."""
type: str = "a2a_server_task_completed"
a2a_task_id: str
a2a_context_id: str
result: str
class A2AServerTaskCanceledEvent(A2AEventBase):
"""Event emitted when an A2A server task execution is canceled."""
type: str = "a2a_server_task_canceled"
a2a_task_id: str
a2a_context_id: str
class A2AServerTaskFailedEvent(A2AEventBase):
"""Event emitted when an A2A server task execution fails."""
type: str = "a2a_server_task_failed"
a2a_task_id: str
a2a_context_id: str
error: str

View File

@@ -1,4 +1,4 @@
from crewai.experimental.crew_agent_executor_flow import CrewAgentExecutorFlow
from crewai.experimental.agent_executor import AgentExecutor, CrewAgentExecutorFlow
from crewai.experimental.evaluation import (
AgentEvaluationResult,
AgentEvaluator,
@@ -23,8 +23,9 @@ from crewai.experimental.evaluation import (
__all__ = [
"AgentEvaluationResult",
"AgentEvaluator",
"AgentExecutor",
"BaseEvaluator",
"CrewAgentExecutorFlow",
"CrewAgentExecutorFlow", # Deprecated alias for AgentExecutor
"EvaluationScore",
"EvaluationTraceCallback",
"ExperimentResult",

View File

@@ -1,6 +1,6 @@
from __future__ import annotations
from collections.abc import Callable
from collections.abc import Callable, Coroutine
import threading
from typing import TYPE_CHECKING, Any, Literal, cast
from uuid import uuid4
@@ -37,6 +37,7 @@ from crewai.utilities.agent_utils import (
handle_unknown_error,
has_reached_max_iterations,
is_context_length_exceeded,
is_inside_event_loop,
process_llm_response,
)
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -73,13 +74,17 @@ class AgentReActState(BaseModel):
ask_for_human_input: bool = Field(default=False)
class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
"""Flow-based executor matching CrewAgentExecutor interface.
class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"""Flow-based agent executor for both standalone and crew-bound execution.
Inherits from:
- Flow[AgentReActState]: Provides flow orchestration capabilities
- CrewAgentExecutorMixin: Provides memory methods (short/long/external term)
This executor can operate in two modes:
- Standalone mode: When crew and task are None (used by Agent.kickoff())
- Crew mode: When crew and task are provided (used by Agent.execute_task())
Note: Multiple instances may be created during agent initialization
(cache setup, RPM controller setup, etc.) but only the final instance
should execute tasks via invoke().
@@ -88,8 +93,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
def __init__(
self,
llm: BaseLLM,
task: Task,
crew: Crew,
agent: Agent,
prompt: SystemPromptResult | StandardPromptResult,
max_iter: int,
@@ -98,6 +101,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
stop_words: list[str],
tools_description: str,
tools_handler: ToolsHandler,
task: Task | None = None,
crew: Crew | None = None,
step_callback: Any = None,
original_tools: list[BaseTool] | None = None,
function_calling_llm: BaseLLM | Any | None = None,
@@ -111,8 +116,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
Args:
llm: Language model instance.
task: Task to execute.
crew: Crew instance.
agent: Agent to execute.
prompt: Prompt templates.
max_iter: Maximum iterations.
@@ -121,6 +124,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
stop_words: Stop word list.
tools_description: Tool descriptions.
tools_handler: Tool handler instance.
task: Optional task to execute (None for standalone agent execution).
crew: Optional crew instance (None for standalone agent execution).
step_callback: Optional step callback.
original_tools: Original tool list.
function_calling_llm: Optional function calling LLM.
@@ -131,9 +136,9 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
"""
self._i18n: I18N = i18n or get_i18n()
self.llm = llm
self.task = task
self.task: Task | None = task
self.agent = agent
self.crew = crew
self.crew: Crew | None = crew
self.prompt = prompt
self.tools = tools
self.tools_names = tools_names
@@ -178,7 +183,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
else self.stop
)
)
self._state = AgentReActState()
def _ensure_flow_initialized(self) -> None:
@@ -264,7 +268,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
printer=self._printer,
from_task=self.task,
from_agent=self.agent,
response_model=self.response_model,
response_model=None,
executor_context=self,
)
@@ -449,9 +453,99 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
return "initialized"
def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]:
def invoke(
self, inputs: dict[str, Any]
) -> dict[str, Any] | Coroutine[Any, Any, dict[str, Any]]:
"""Execute agent with given inputs.
When called from within an existing event loop (e.g., inside a Flow),
this method returns a coroutine that should be awaited. The Flow
framework handles this automatically.
Args:
inputs: Input dictionary containing prompt variables.
Returns:
Dictionary with agent output, or a coroutine if inside an event loop.
"""
# Magic auto-async: if inside event loop, return coroutine for Flow to await
if is_inside_event_loop():
return self.invoke_async(inputs)
self._ensure_flow_initialized()
with self._execution_lock:
if self._is_executing:
raise RuntimeError(
"Executor is already running. "
"Cannot invoke the same executor instance concurrently."
)
self._is_executing = True
self._has_been_invoked = True
try:
# Reset state for fresh execution
self.state.messages.clear()
self.state.iterations = 0
self.state.current_answer = None
self.state.is_finished = False
if "system" in self.prompt:
prompt = cast("SystemPromptResult", self.prompt)
system_prompt = self._format_prompt(prompt["system"], inputs)
user_prompt = self._format_prompt(prompt["user"], inputs)
self.state.messages.append(
format_message_for_llm(system_prompt, role="system")
)
self.state.messages.append(format_message_for_llm(user_prompt))
else:
user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
self.state.messages.append(format_message_for_llm(user_prompt))
self.state.ask_for_human_input = bool(
inputs.get("ask_for_human_input", False)
)
self.kickoff()
formatted_answer = self.state.current_answer
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
self._create_short_term_memory(formatted_answer)
self._create_long_term_memory(formatted_answer)
self._create_external_memory(formatted_answer)
return {"output": formatted_answer.output}
except AssertionError:
fail_text = Text()
fail_text.append("", style="red bold")
fail_text.append(
"Agent failed to reach a final answer. This is likely a bug - please report it.",
style="red",
)
self._console.print(fail_text)
raise
except Exception as e:
handle_unknown_error(self._printer, e)
raise
finally:
self._is_executing = False
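The caller-side contract, as a sketch (an `executor` built as above; inputs illustrative):
# Sync context: invoke() runs to completion and returns a dict.
out = executor.invoke({"input": "hi", "tool_names": "", "tools": ""})
# Async context (e.g. inside a Flow method): invoke() returns a coroutine,
# which the Flow framework awaits automatically.
# out = await executor.invoke({"input": "hi", "tool_names": "", "tools": ""})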
async def invoke_async(self, inputs: dict[str, Any]) -> dict[str, Any]:
"""Execute agent asynchronously with given inputs.
This method is designed for use within async contexts, such as when
the agent is called from within an async Flow method. It uses
kickoff_async() directly instead of running in a separate thread.
Args:
inputs: Input dictionary containing prompt variables.
@@ -492,7 +586,8 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
inputs.get("ask_for_human_input", False)
)
self.kickoff()
# Use async kickoff directly since we're already in an async context
await self.kickoff_async()
formatted_answer = self.state.current_answer
@@ -583,11 +678,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
if self.agent is None:
raise ValueError("Agent cannot be None")
if self.task is None:
return
crewai_event_bus.emit(
self.agent,
AgentLogsStartedEvent(
agent_role=self.agent.role,
task_description=(self.task.description if self.task else "Not Found"),
task_description=self.task.description,
verbose=self.agent.verbose
or (hasattr(self, "crew") and getattr(self.crew, "verbose", False)),
),
@@ -621,10 +719,12 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
result: Agent's final output.
human_feedback: Optional feedback from human.
"""
# Early return if no crew (standalone mode)
if self.crew is None:
return
agent_id = str(self.agent.id)
train_iteration = (
getattr(self.crew, "_train_iteration", None) if self.crew else None
)
train_iteration = getattr(self.crew, "_train_iteration", None)
if train_iteration is None or not isinstance(train_iteration, int):
train_error = Text()
@@ -806,3 +906,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin):
requiring arbitrary_types_allowed=True.
"""
return core_schema.any_schema()
# Backward compatibility alias (deprecated)
CrewAgentExecutorFlow = AgentExecutor

View File

@@ -73,6 +73,7 @@ from crewai.flow.utils import (
is_simple_flow_condition,
)
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.human_feedback import HumanFeedbackResult
@@ -570,7 +571,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
flow_id: str,
persistence: FlowPersistence | None = None,
**kwargs: Any,
) -> "Flow[Any]":
) -> Flow[Any]:
"""Create a Flow instance from a pending feedback state.
This classmethod is used to restore a flow that was paused waiting
@@ -631,7 +632,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return instance
@property
def pending_feedback(self) -> "PendingFeedbackContext | None":
def pending_feedback(self) -> PendingFeedbackContext | None:
"""Get the pending feedback context if this flow is waiting for feedback.
Returns:
@@ -716,9 +717,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
Raises:
ValueError: If no pending feedback context exists
"""
from crewai.flow.human_feedback import HumanFeedbackResult
from datetime import datetime
from crewai.flow.human_feedback import HumanFeedbackResult
if self._pending_feedback_context is None:
raise ValueError(
"No pending feedback context. Use from_pending() to restore a paused flow."
@@ -1346,9 +1348,26 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._initialize_state(inputs)
try:
# Determine which start methods to execute at kickoff
# Conditional start methods (with __trigger_methods__) are only triggered by their conditions
# UNLESS there are no unconditional starts (then all starts run as entry points)
unconditional_starts = [
start_method
for start_method in self._start_methods
if not getattr(
self._methods.get(start_method), "__trigger_methods__", None
)
]
# If there are unconditional starts, only run those at kickoff
# If there are NO unconditional starts, run all starts (including conditional ones)
starts_to_execute = (
unconditional_starts
if unconditional_starts
else self._start_methods
)
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
for start_method in starts_to_execute
]
await asyncio.gather(*tasks)
except Exception as e:
@@ -1573,11 +1592,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
if future:
self._event_futures.append(future)
result = (
await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
if asyncio.iscoroutinefunction(method):
result = await method(*args, **kwargs)
else:
import contextvars
ctx = contextvars.copy_context()
result = await asyncio.to_thread(ctx.run, method, *args, **kwargs)
# Auto-await coroutines from sync methods (still useful for explicit coroutine returns)
if asyncio.iscoroutine(result):
result = await result
self._method_outputs.append(result)
self._method_execution_counts[method_name] = (
@@ -1745,14 +1770,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
should_trigger = current_trigger in all_methods
if should_trigger:
# Only execute if this is a cycle (method was already completed)
# Execute conditional start method triggered by router result
if method_name in self._completed_methods:
# For router-triggered start methods in cycles, temporarily clear resumption flag
# to allow cyclic execution
# For cyclic re-execution, temporarily clear resumption flag
was_resuming = self._is_execution_resuming
self._is_execution_resuming = False
await self._execute_start_method(method_name)
self._is_execution_resuming = was_resuming
else:
# First-time execution of conditional start
await self._execute_start_method(method_name)
def _evaluate_condition(
self,
@@ -1896,6 +1923,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self._is_execution_resuming:
# During resumption, skip execution but continue listeners
await self._execute_listeners(listener_name, None)
# For routers, also check if any conditional starts they triggered are completed
# If so, continue their chains
if listener_name in self._routers:
for start_method_name in self._start_methods:
if (
start_method_name in self._listeners
and start_method_name in self._completed_methods
):
# This conditional start was executed, continue its chain
await self._execute_start_method(start_method_name)
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(listener_name)
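A minimal cyclic-flow sketch of these rules (names illustrative, assuming the `start`/`router`/`or_` helpers from crewai.flow): kickoff runs only the unconditional `begin`; the conditional `retry` runs solely when the router returns "again", including on later cycles.
from crewai.flow.flow import Flow, or_, router, start

class LoopFlow(Flow):
    @start()
    def begin(self):
        self.state["tries"] = 0

    @start("again")              # conditional start: skipped at kickoff
    def retry(self):
        self.state["tries"] += 1

    @router(or_(begin, retry))
    def decide(self):
        return "again" if self.state["tries"] < 2 else "done"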

View File

@@ -10,6 +10,7 @@ from typing import (
get_origin,
)
import uuid
import warnings
from pydantic import (
UUID4,
@@ -80,6 +81,11 @@ class LiteAgent(FlowTrackable, BaseModel):
"""
A lightweight agent that can process messages and use tools.
.. deprecated::
LiteAgent is deprecated and will be removed in a future version.
Use ``Agent().kickoff(messages)`` instead, which provides the same
functionality with additional features like memory and knowledge support.
This agent is simpler than the full Agent class, focusing on direct execution
rather than task delegation. It's designed to be used for simple interactions
where a full crew is not needed.
@@ -164,6 +170,18 @@ class LiteAgent(FlowTrackable, BaseModel):
default_factory=get_after_llm_call_hooks
)
@model_validator(mode="after")
def emit_deprecation_warning(self) -> Self:
"""Emit deprecation warning for LiteAgent usage."""
warnings.warn(
"LiteAgent is deprecated and will be removed in a future version. "
"Use Agent().kickoff(messages) instead, which provides the same "
"functionality with additional features like memory and knowledge support.",
DeprecationWarning,
stacklevel=2,
)
return self
@model_validator(mode="after")
def setup_llm(self) -> Self:
"""Set up the LLM and other components after initialization."""

View File

@@ -1,8 +1,6 @@
"""Utilities for creating and manipulating types."""
from typing import Annotated, Final, Literal
from typing_extensions import TypeAliasType
from typing import Annotated, Final, Literal, cast
_DYNAMIC_LITERAL_ALIAS: Final[Literal["DynamicLiteral"]] = "DynamicLiteral"
@@ -20,6 +18,11 @@ def create_literals_from_strings(
Returns:
Literal type for each A2A agent ID
Raises:
ValueError: If values is empty (Literal requires at least one value)
"""
unique_values: tuple[str, ...] = tuple(dict.fromkeys(values))
return Literal.__getitem__(unique_values)
if not unique_values:
raise ValueError("Cannot create Literal type from empty values")
return cast(type, Literal.__getitem__(unique_values))
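For example (values illustrative):
# create_literals_from_strings(("a", "b", "a")) deduplicates while preserving
# order and returns Literal["a", "b"]; an empty input now raises ValueError.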

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
import json
import re
@@ -54,6 +55,23 @@ console = Console()
_MULTIPLE_NEWLINES: Final[re.Pattern[str]] = re.compile(r"\n+")
def is_inside_event_loop() -> bool:
"""Check if code is currently running inside an asyncio event loop.
This is used to detect when code is being called from within an async context
(e.g., inside a Flow). In such cases, callers should return a coroutine
instead of executing synchronously to avoid nested event loop errors.
Returns:
True if inside a running event loop, False otherwise.
"""
try:
asyncio.get_running_loop()
return True
except RuntimeError:
return False
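The intended call-site pattern is the dual sync/async dispatch the docstring describes. A rough sketch of how a caller might use the helper (the method names here are illustrative, not the actual Agent internals):

import asyncio

def kickoff(self, messages):
    if is_inside_event_loop():
        # inside a Flow: return the coroutine so the framework can await it
        return self._kickoff_async(messages)
    # standalone: safe to spin up our own event loop
    return asyncio.run(self._kickoff_async(messages))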
def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]:
"""Parse tools to be used for the task.

View File

@@ -0,0 +1,325 @@
"""Tests for A2A agent card utilities."""
from __future__ import annotations
from a2a.types import AgentCard, AgentSkill
from crewai import Agent
from crewai.a2a.config import A2AClientConfig, A2AServerConfig
from crewai.a2a.utils.agent_card import inject_a2a_server_methods
class TestInjectA2AServerMethods:
"""Tests for inject_a2a_server_methods function."""
def test_agent_with_server_config_gets_to_agent_card_method(self) -> None:
"""Agent with A2AServerConfig should have to_agent_card method injected."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
assert hasattr(agent, "to_agent_card")
assert callable(agent.to_agent_card)
def test_agent_without_server_config_no_injection(self) -> None:
"""Agent without A2AServerConfig should not get to_agent_card method."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AClientConfig(endpoint="http://example.com"),
)
assert not hasattr(agent, "to_agent_card")
def test_agent_without_a2a_no_injection(self) -> None:
"""Agent without any a2a config should not get to_agent_card method."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
)
assert not hasattr(agent, "to_agent_card")
def test_agent_with_mixed_configs_gets_injection(self) -> None:
"""Agent with list containing A2AServerConfig should get to_agent_card."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=[
A2AClientConfig(endpoint="http://example.com"),
A2AServerConfig(name="My Agent"),
],
)
assert hasattr(agent, "to_agent_card")
assert callable(agent.to_agent_card)
def test_manual_injection_on_plain_agent(self) -> None:
"""inject_a2a_server_methods should work when called manually."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
)
# Manually set server config and inject
object.__setattr__(agent, "a2a", A2AServerConfig())
inject_a2a_server_methods(agent)
assert hasattr(agent, "to_agent_card")
assert callable(agent.to_agent_card)
class TestToAgentCard:
"""Tests for the injected to_agent_card method."""
def test_returns_agent_card(self) -> None:
"""to_agent_card should return an AgentCard instance."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert isinstance(card, AgentCard)
def test_uses_agent_role_as_name(self) -> None:
"""AgentCard name should default to agent role."""
agent = Agent(
role="Data Analyst",
goal="Analyze data",
backstory="Expert analyst",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.name == "Data Analyst"
def test_uses_server_config_name(self) -> None:
"""AgentCard name should prefer A2AServerConfig.name over role."""
agent = Agent(
role="Data Analyst",
goal="Analyze data",
backstory="Expert analyst",
a2a=A2AServerConfig(name="Custom Agent Name"),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.name == "Custom Agent Name"
def test_uses_goal_as_description(self) -> None:
"""AgentCard description should include agent goal."""
agent = Agent(
role="Test Agent",
goal="Accomplish important tasks",
backstory="Has extensive experience",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert "Accomplish important tasks" in card.description
def test_uses_server_config_description(self) -> None:
"""AgentCard description should prefer A2AServerConfig.description."""
agent = Agent(
role="Test Agent",
goal="Accomplish important tasks",
backstory="Has extensive experience",
a2a=A2AServerConfig(description="Custom description"),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.description == "Custom description"
def test_uses_provided_url(self) -> None:
"""AgentCard url should use the provided URL."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://my-server.com:9000")
assert card.url == "http://my-server.com:9000"
def test_uses_server_config_url(self) -> None:
"""AgentCard url should prefer A2AServerConfig.url over provided URL."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(url="http://configured-url.com"),
)
card = agent.to_agent_card("http://fallback-url.com")
assert card.url == "http://configured-url.com/"
def test_generates_default_skill(self) -> None:
"""AgentCard should have at least one skill based on agent role."""
agent = Agent(
role="Research Assistant",
goal="Help with research",
backstory="Skilled researcher",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert len(card.skills) >= 1
skill = card.skills[0]
assert skill.name == "Research Assistant"
assert skill.description == "Help with research"
def test_uses_server_config_skills(self) -> None:
"""AgentCard skills should prefer A2AServerConfig.skills."""
custom_skill = AgentSkill(
id="custom-skill",
name="Custom Skill",
description="A custom skill",
tags=["custom"],
)
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(skills=[custom_skill]),
)
card = agent.to_agent_card("http://localhost:8000")
assert len(card.skills) == 1
assert card.skills[0].id == "custom-skill"
assert card.skills[0].name == "Custom Skill"
def test_includes_custom_version(self) -> None:
"""AgentCard should include version from A2AServerConfig."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(version="2.0.0"),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.version == "2.0.0"
def test_default_version(self) -> None:
"""AgentCard should have default version 1.0.0."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
assert card.version == "1.0.0"
class TestAgentCardJsonStructure:
"""Tests for the JSON structure of AgentCard."""
def test_json_has_required_fields(self) -> None:
"""AgentCard JSON should contain all required A2A protocol fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump()
assert "name" in json_data
assert "description" in json_data
assert "url" in json_data
assert "version" in json_data
assert "skills" in json_data
assert "capabilities" in json_data
assert "defaultInputModes" in json_data
assert "defaultOutputModes" in json_data
def test_json_skills_structure(self) -> None:
"""Each skill in JSON should have required fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump()
assert len(json_data["skills"]) >= 1
skill = json_data["skills"][0]
assert "id" in skill
assert "name" in skill
assert "description" in skill
assert "tags" in skill
def test_json_capabilities_structure(self) -> None:
"""Capabilities in JSON should have expected fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump()
capabilities = json_data["capabilities"]
assert "streaming" in capabilities
assert "pushNotifications" in capabilities
def test_json_serializable(self) -> None:
"""AgentCard should be JSON serializable."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_str = card.model_dump_json()
assert isinstance(json_str, str)
assert "Test Agent" in json_str
assert "http://localhost:8000" in json_str
def test_json_excludes_none_values(self) -> None:
"""AgentCard JSON with exclude_none should omit None fields."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
a2a=A2AServerConfig(),
)
card = agent.to_agent_card("http://localhost:8000")
json_data = card.model_dump(exclude_none=True)
assert "provider" not in json_data
assert "documentationUrl" not in json_data
assert "iconUrl" not in json_data

View File

@@ -0,0 +1,370 @@
"""Tests for A2A task utilities."""
from __future__ import annotations
import asyncio
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
import pytest_asyncio
from a2a.server.agent_execution import RequestContext
from a2a.server.events import EventQueue
from a2a.types import Message, Task as A2ATask, TaskState, TaskStatus
from crewai.a2a.utils.task import cancel, cancellable, execute
@pytest.fixture
def mock_agent() -> MagicMock:
"""Create a mock CrewAI agent."""
agent = MagicMock()
agent.role = "Test Agent"
agent.tools = []
agent.aexecute_task = AsyncMock(return_value="Task completed successfully")
return agent
@pytest.fixture
def mock_task() -> MagicMock:
"""Create a mock Task."""
return MagicMock()
@pytest.fixture
def mock_context() -> MagicMock:
"""Create a mock RequestContext."""
context = MagicMock(spec=RequestContext)
context.task_id = "test-task-123"
context.context_id = "test-context-456"
context.get_user_input.return_value = "Test user message"
context.message = MagicMock(spec=Message)
context.current_task = None
return context
@pytest.fixture
def mock_event_queue() -> AsyncMock:
"""Create a mock EventQueue."""
queue = AsyncMock(spec=EventQueue)
queue.enqueue_event = AsyncMock()
return queue
@pytest_asyncio.fixture(autouse=True)
async def clear_cache(mock_context: MagicMock) -> None:
"""Clear cancel flag from cache before each test."""
from aiocache import caches
cache = caches.get("default")
await cache.delete(f"cancel:{mock_context.task_id}")
class TestCancellableDecorator:
"""Tests for the cancellable decorator."""
@pytest.mark.asyncio
async def test_executes_function_without_context(self) -> None:
"""Function executes normally when no RequestContext is provided."""
call_count = 0
@cancellable
async def my_func(value: int) -> int:
nonlocal call_count
call_count += 1
return value * 2
result = await my_func(5)
assert result == 10
assert call_count == 1
@pytest.mark.asyncio
async def test_executes_function_with_context(self, mock_context: MagicMock) -> None:
"""Function executes normally with RequestContext when not cancelled."""
@cancellable
async def my_func(context: RequestContext) -> str:
await asyncio.sleep(0.01)
return "completed"
result = await my_func(mock_context)
assert result == "completed"
@pytest.mark.asyncio
async def test_cancellation_raises_cancelled_error(
self, mock_context: MagicMock
) -> None:
"""Function raises CancelledError when cancel flag is set."""
from aiocache import caches
cache = caches.get("default")
@cancellable
async def slow_func(context: RequestContext) -> str:
await asyncio.sleep(1.0)
return "should not reach"
await cache.set(f"cancel:{mock_context.task_id}", True)
with pytest.raises(asyncio.CancelledError):
await slow_func(mock_context)
@pytest.mark.asyncio
async def test_cleanup_removes_cancel_flag(self, mock_context: MagicMock) -> None:
"""Cancel flag is cleaned up after execution."""
from aiocache import caches
cache = caches.get("default")
@cancellable
async def quick_func(context: RequestContext) -> str:
return "done"
await quick_func(mock_context)
flag = await cache.get(f"cancel:{mock_context.task_id}")
assert flag is None
@pytest.mark.asyncio
async def test_extracts_context_from_kwargs(self, mock_context: MagicMock) -> None:
"""Context can be passed as keyword argument."""
@cancellable
async def my_func(value: int, context: RequestContext | None = None) -> int:
return value + 1
result = await my_func(10, context=mock_context)
assert result == 11
class TestExecute:
"""Tests for the execute function."""
@pytest.mark.asyncio
async def test_successful_execution(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Execute completes successfully and enqueues completed task."""
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
):
await execute(mock_agent, mock_context, mock_event_queue)
mock_agent.aexecute_task.assert_called_once()
mock_event_queue.enqueue_event.assert_called_once()
assert mock_bus.emit.call_count == 2
@pytest.mark.asyncio
async def test_emits_started_event(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Execute emits A2AServerTaskStartedEvent."""
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
):
await execute(mock_agent, mock_context, mock_event_queue)
first_call = mock_bus.emit.call_args_list[0]
event = first_call[0][1]
assert event.type == "a2a_server_task_started"
assert event.a2a_task_id == mock_context.task_id
assert event.a2a_context_id == mock_context.context_id
@pytest.mark.asyncio
async def test_emits_completed_event(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Execute emits A2AServerTaskCompletedEvent on success."""
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
):
await execute(mock_agent, mock_context, mock_event_queue)
second_call = mock_bus.emit.call_args_list[1]
event = second_call[0][1]
assert event.type == "a2a_server_task_completed"
assert event.a2a_task_id == mock_context.task_id
assert event.result == "Task completed successfully"
@pytest.mark.asyncio
async def test_emits_failed_event_on_exception(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Execute emits A2AServerTaskFailedEvent on exception."""
mock_agent.aexecute_task = AsyncMock(side_effect=ValueError("Test error"))
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
):
with pytest.raises(Exception):
await execute(mock_agent, mock_context, mock_event_queue)
failed_call = mock_bus.emit.call_args_list[1]
event = failed_call[0][1]
assert event.type == "a2a_server_task_failed"
assert "Test error" in event.error
@pytest.mark.asyncio
async def test_emits_canceled_event_on_cancellation(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Execute emits A2AServerTaskCanceledEvent on CancelledError."""
mock_agent.aexecute_task = AsyncMock(side_effect=asyncio.CancelledError())
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus") as mock_bus,
):
with pytest.raises(asyncio.CancelledError):
await execute(mock_agent, mock_context, mock_event_queue)
canceled_call = mock_bus.emit.call_args_list[1]
event = canceled_call[0][1]
assert event.type == "a2a_server_task_canceled"
assert event.a2a_task_id == mock_context.task_id
class TestCancel:
"""Tests for the cancel function."""
@pytest.mark.asyncio
async def test_sets_cancel_flag_in_cache(
self,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
) -> None:
"""Cancel sets the cancel flag in cache."""
from aiocache import caches
cache = caches.get("default")
await cancel(mock_context, mock_event_queue)
flag = await cache.get(f"cancel:{mock_context.task_id}")
assert flag is True
@pytest.mark.asyncio
async def test_enqueues_task_status_update_event(
self,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
) -> None:
"""Cancel enqueues TaskStatusUpdateEvent with canceled state."""
await cancel(mock_context, mock_event_queue)
mock_event_queue.enqueue_event.assert_called_once()
event = mock_event_queue.enqueue_event.call_args[0][0]
assert event.task_id == mock_context.task_id
assert event.context_id == mock_context.context_id
assert event.status.state == TaskState.canceled
assert event.final is True
@pytest.mark.asyncio
async def test_returns_none_when_no_current_task(
self,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
) -> None:
"""Cancel returns None when context has no current_task."""
mock_context.current_task = None
result = await cancel(mock_context, mock_event_queue)
assert result is None
@pytest.mark.asyncio
async def test_returns_updated_task_when_current_task_exists(
self,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
) -> None:
"""Cancel returns updated task when context has current_task."""
current_task = MagicMock(spec=A2ATask)
current_task.status = TaskStatus(state=TaskState.working)
mock_context.current_task = current_task
result = await cancel(mock_context, mock_event_queue)
assert result is current_task
assert result.status.state == TaskState.canceled
@pytest.mark.asyncio
async def test_cleanup_after_cancel(
self,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
) -> None:
"""Cancel flag persists for cancellable decorator to detect."""
from aiocache import caches
cache = caches.get("default")
await cancel(mock_context, mock_event_queue)
flag = await cache.get(f"cancel:{mock_context.task_id}")
assert flag is True
await cache.delete(f"cancel:{mock_context.task_id}")
class TestExecuteAndCancelIntegration:
"""Integration tests for execute and cancel working together."""
@pytest.mark.asyncio
async def test_cancel_stops_running_execute(
self,
mock_agent: MagicMock,
mock_context: MagicMock,
mock_event_queue: AsyncMock,
mock_task: MagicMock,
) -> None:
"""Calling cancel stops a running execute."""
async def slow_task(**kwargs: Any) -> str:
await asyncio.sleep(2.0)
return "should not complete"
mock_agent.aexecute_task = slow_task
with (
patch("crewai.a2a.utils.task.Task", return_value=mock_task),
patch("crewai.a2a.utils.task.crewai_event_bus"),
):
execute_task = asyncio.create_task(
execute(mock_agent, mock_context, mock_event_queue)
)
await asyncio.sleep(0.1)
await cancel(mock_context, mock_event_queue)
with pytest.raises(asyncio.CancelledError):
await execute_task
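The cancellation contract these tests exercise is a shared cache flag: cancel() sets cancel:<task_id>, and the cancellable wrapper polls it and raises asyncio.CancelledError. A minimal sketch of setting the flag out-of-band (cache alias and key format taken from the tests; the function name is illustrative):

from aiocache import caches

async def request_cancel(task_id: str) -> None:
    # the cancellable decorator observes this flag and raises CancelledError
    await caches.get("default").set(f"cancel:{task_id}", True)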

View File

@@ -1,4 +1,4 @@
"""Unit tests for CrewAgentExecutorFlow.
"""Unit tests for AgentExecutor.
Tests the Flow-based agent executor implementation including state management,
flow methods, routing logic, and error handling.
@@ -8,9 +8,9 @@ from unittest.mock import Mock, patch
import pytest
from crewai.experimental.crew_agent_executor_flow import (
from crewai.experimental.agent_executor import (
AgentReActState,
CrewAgentExecutorFlow,
AgentExecutor,
)
from crewai.agents.parser import AgentAction, AgentFinish
@@ -43,8 +43,8 @@ class TestAgentReActState:
assert state.ask_for_human_input is True
class TestCrewAgentExecutorFlow:
"""Test CrewAgentExecutorFlow class."""
class TestAgentExecutor:
"""Test AgentExecutor class."""
@pytest.fixture
def mock_dependencies(self):
@@ -87,8 +87,8 @@ class TestCrewAgentExecutorFlow:
}
def test_executor_initialization(self, mock_dependencies):
"""Test CrewAgentExecutorFlow initialization."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
"""Test AgentExecutor initialization."""
executor = AgentExecutor(**mock_dependencies)
assert executor.llm == mock_dependencies["llm"]
assert executor.task == mock_dependencies["task"]
@@ -100,9 +100,9 @@ class TestCrewAgentExecutorFlow:
def test_initialize_reasoning(self, mock_dependencies):
"""Test flow entry point."""
with patch.object(
CrewAgentExecutorFlow, "_show_start_logs"
AgentExecutor, "_show_start_logs"
) as mock_show_start:
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
result = executor.initialize_reasoning()
assert result == "initialized"
@@ -110,7 +110,7 @@ class TestCrewAgentExecutorFlow:
def test_check_max_iterations_not_reached(self, mock_dependencies):
"""Test routing when iterations < max."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.iterations = 5
result = executor.check_max_iterations()
@@ -118,7 +118,7 @@ class TestCrewAgentExecutorFlow:
def test_check_max_iterations_reached(self, mock_dependencies):
"""Test routing when iterations >= max."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.iterations = 10
result = executor.check_max_iterations()
@@ -126,7 +126,7 @@ class TestCrewAgentExecutorFlow:
def test_route_by_answer_type_action(self, mock_dependencies):
"""Test routing for AgentAction."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="search", tool_input="query", text="action text"
)
@@ -136,7 +136,7 @@ class TestCrewAgentExecutorFlow:
def test_route_by_answer_type_finish(self, mock_dependencies):
"""Test routing for AgentFinish."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.current_answer = AgentFinish(
thought="final thoughts", output="Final answer", text="complete"
)
@@ -146,7 +146,7 @@ class TestCrewAgentExecutorFlow:
def test_continue_iteration(self, mock_dependencies):
"""Test iteration continuation."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
result = executor.continue_iteration()
@@ -154,8 +154,8 @@ class TestCrewAgentExecutorFlow:
def test_finalize_success(self, mock_dependencies):
"""Test finalize with valid AgentFinish."""
with patch.object(CrewAgentExecutorFlow, "_show_logs") as mock_show_logs:
executor = CrewAgentExecutorFlow(**mock_dependencies)
with patch.object(AgentExecutor, "_show_logs") as mock_show_logs:
executor = AgentExecutor(**mock_dependencies)
executor.state.current_answer = AgentFinish(
thought="final thinking", output="Done", text="complete"
)
@@ -168,7 +168,7 @@ class TestCrewAgentExecutorFlow:
def test_finalize_failure(self, mock_dependencies):
"""Test finalize skips when given AgentAction instead of AgentFinish."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="search", tool_input="query", text="action text"
)
@@ -181,7 +181,7 @@ class TestCrewAgentExecutorFlow:
def test_format_prompt(self, mock_dependencies):
"""Test prompt formatting."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
inputs = {"input": "test input", "tool_names": "tool1, tool2", "tools": "desc"}
result = executor._format_prompt("Prompt {input} {tool_names} {tools}", inputs)
@@ -192,18 +192,18 @@ class TestCrewAgentExecutorFlow:
def test_is_training_mode_false(self, mock_dependencies):
"""Test training mode detection when not in training."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
assert executor._is_training_mode() is False
def test_is_training_mode_true(self, mock_dependencies):
"""Test training mode detection when in training."""
mock_dependencies["crew"]._train = True
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
assert executor._is_training_mode() is True
def test_append_message_to_state(self, mock_dependencies):
"""Test message appending to state."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
initial_count = len(executor.state.messages)
executor._append_message_to_state("test message")
@@ -216,7 +216,7 @@ class TestCrewAgentExecutorFlow:
callback = Mock()
mock_dependencies["step_callback"] = callback
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
answer = AgentFinish(thought="thinking", output="test", text="final")
executor._invoke_step_callback(answer)
@@ -226,14 +226,14 @@ class TestCrewAgentExecutorFlow:
def test_invoke_step_callback_none(self, mock_dependencies):
"""Test step callback when none provided."""
mock_dependencies["step_callback"] = None
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
# Should not raise error
executor._invoke_step_callback(
AgentFinish(thought="thinking", output="test", text="final")
)
@patch("crewai.experimental.crew_agent_executor_flow.handle_output_parser_exception")
@patch("crewai.experimental.agent_executor.handle_output_parser_exception")
def test_recover_from_parser_error(
self, mock_handle_exception, mock_dependencies
):
@@ -242,7 +242,7 @@ class TestCrewAgentExecutorFlow:
mock_handle_exception.return_value = None
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor._last_parser_error = OutputParserError("test error")
initial_iterations = executor.state.iterations
@@ -252,12 +252,12 @@ class TestCrewAgentExecutorFlow:
assert executor.state.iterations == initial_iterations + 1
mock_handle_exception.assert_called_once()
@patch("crewai.experimental.crew_agent_executor_flow.handle_context_length")
@patch("crewai.experimental.agent_executor.handle_context_length")
def test_recover_from_context_length(
self, mock_handle_context, mock_dependencies
):
"""Test recovery from context length error."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor._last_context_error = Exception("context too long")
initial_iterations = executor.state.iterations
@@ -270,16 +270,16 @@ class TestCrewAgentExecutorFlow:
def test_use_stop_words_property(self, mock_dependencies):
"""Test use_stop_words property."""
mock_dependencies["llm"].supports_stop_words.return_value = True
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
assert executor.use_stop_words is True
mock_dependencies["llm"].supports_stop_words.return_value = False
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
assert executor.use_stop_words is False
def test_compatibility_properties(self, mock_dependencies):
"""Test compatibility properties for mixin."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.messages = [{"role": "user", "content": "test"}]
executor.state.iterations = 5
@@ -321,8 +321,8 @@ class TestFlowErrorHandling:
"tools_handler": Mock(),
}
@patch("crewai.experimental.crew_agent_executor_flow.get_llm_response")
@patch("crewai.experimental.crew_agent_executor_flow.enforce_rpm_limit")
@patch("crewai.experimental.agent_executor.get_llm_response")
@patch("crewai.experimental.agent_executor.enforce_rpm_limit")
def test_call_llm_parser_error(
self, mock_enforce_rpm, mock_get_llm, mock_dependencies
):
@@ -332,15 +332,15 @@ class TestFlowErrorHandling:
mock_enforce_rpm.return_value = None
mock_get_llm.side_effect = OutputParserError("parse failed")
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
result = executor.call_llm_and_parse()
assert result == "parser_error"
assert executor._last_parser_error is not None
@patch("crewai.experimental.crew_agent_executor_flow.get_llm_response")
@patch("crewai.experimental.crew_agent_executor_flow.enforce_rpm_limit")
@patch("crewai.experimental.crew_agent_executor_flow.is_context_length_exceeded")
@patch("crewai.experimental.agent_executor.get_llm_response")
@patch("crewai.experimental.agent_executor.enforce_rpm_limit")
@patch("crewai.experimental.agent_executor.is_context_length_exceeded")
def test_call_llm_context_error(
self,
mock_is_context_exceeded,
@@ -353,7 +353,7 @@ class TestFlowErrorHandling:
mock_get_llm.side_effect = Exception("context length")
mock_is_context_exceeded.return_value = True
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
result = executor.call_llm_and_parse()
assert result == "context_error"
@@ -397,10 +397,10 @@ class TestFlowInvoke:
"tools_handler": Mock(),
}
@patch.object(CrewAgentExecutorFlow, "kickoff")
@patch.object(CrewAgentExecutorFlow, "_create_short_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_long_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_external_memory")
@patch.object(AgentExecutor, "kickoff")
@patch.object(AgentExecutor, "_create_short_term_memory")
@patch.object(AgentExecutor, "_create_long_term_memory")
@patch.object(AgentExecutor, "_create_external_memory")
def test_invoke_success(
self,
mock_external_memory,
@@ -410,7 +410,7 @@ class TestFlowInvoke:
mock_dependencies,
):
"""Test successful invoke without human feedback."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
# Mock kickoff to set the final answer in state
def mock_kickoff_side_effect():
@@ -429,10 +429,10 @@ class TestFlowInvoke:
mock_long_term_memory.assert_called_once()
mock_external_memory.assert_called_once()
@patch.object(CrewAgentExecutorFlow, "kickoff")
@patch.object(AgentExecutor, "kickoff")
def test_invoke_failure_no_agent_finish(self, mock_kickoff, mock_dependencies):
"""Test invoke fails without AgentFinish."""
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
executor.state.current_answer = AgentAction(
thought="thinking", tool="test", tool_input="test", text="action text"
)
@@ -442,10 +442,10 @@ class TestFlowInvoke:
with pytest.raises(RuntimeError, match="without reaching a final answer"):
executor.invoke(inputs)
@patch.object(CrewAgentExecutorFlow, "kickoff")
@patch.object(CrewAgentExecutorFlow, "_create_short_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_long_term_memory")
@patch.object(CrewAgentExecutorFlow, "_create_external_memory")
@patch.object(AgentExecutor, "kickoff")
@patch.object(AgentExecutor, "_create_short_term_memory")
@patch.object(AgentExecutor, "_create_long_term_memory")
@patch.object(AgentExecutor, "_create_external_memory")
def test_invoke_with_system_prompt(
self,
mock_external_memory,
@@ -459,7 +459,7 @@ class TestFlowInvoke:
"system": "System: {input}",
"user": "User: {input} {tool_names} {tools}",
}
executor = CrewAgentExecutorFlow(**mock_dependencies)
executor = AgentExecutor(**mock_dependencies)
def mock_kickoff_side_effect():
executor.state.current_answer = AgentFinish(

View File

@@ -72,62 +72,53 @@ class ResearchResult(BaseModel):
@pytest.mark.vcr()
@pytest.mark.parametrize("verbose", [True, False])
def test_lite_agent_created_with_correct_parameters(monkeypatch, verbose):
"""Test that LiteAgent is created with the correct parameters when Agent.kickoff() is called."""
def test_agent_kickoff_preserves_parameters(verbose):
"""Test that Agent.kickoff() uses the correct parameters from the Agent."""
# Create a test agent with specific parameters
llm = LLM(model="gpt-4o-mini")
mock_llm = Mock(spec=LLM)
mock_llm.call.return_value = "Final Answer: Test response"
mock_llm.stop = []
from crewai.types.usage_metrics import UsageMetrics
mock_usage_metrics = UsageMetrics(
total_tokens=100,
prompt_tokens=50,
completion_tokens=50,
cached_prompt_tokens=0,
successful_requests=1,
)
mock_llm.get_token_usage_summary.return_value = mock_usage_metrics
custom_tools = [WebSearchTool(), CalculatorTool()]
max_iter = 10
max_execution_time = 300
agent = Agent(
role="Test Agent",
goal="Test Goal",
backstory="Test Backstory",
llm=llm,
llm=mock_llm,
tools=custom_tools,
max_iter=max_iter,
max_execution_time=max_execution_time,
verbose=verbose,
)
# Create a mock to capture the created LiteAgent
created_lite_agent = None
original_lite_agent = LiteAgent
# Call kickoff and verify it works
result = agent.kickoff("Test query")
# Define a mock LiteAgent class that captures its arguments
class MockLiteAgent(original_lite_agent):
def __init__(self, **kwargs):
nonlocal created_lite_agent
created_lite_agent = kwargs
super().__init__(**kwargs)
# Verify the agent was configured correctly
assert agent.role == "Test Agent"
assert agent.goal == "Test Goal"
assert agent.backstory == "Test Backstory"
assert len(agent.tools) == 2
assert isinstance(agent.tools[0], WebSearchTool)
assert isinstance(agent.tools[1], CalculatorTool)
assert agent.max_iter == max_iter
assert agent.verbose == verbose
# Patch the LiteAgent class
monkeypatch.setattr("crewai.agent.core.LiteAgent", MockLiteAgent)
# Call kickoff to create the LiteAgent
agent.kickoff("Test query")
# Verify all parameters were passed correctly
assert created_lite_agent is not None
assert created_lite_agent["role"] == "Test Agent"
assert created_lite_agent["goal"] == "Test Goal"
assert created_lite_agent["backstory"] == "Test Backstory"
assert created_lite_agent["llm"] == llm
assert len(created_lite_agent["tools"]) == 2
assert isinstance(created_lite_agent["tools"][0], WebSearchTool)
assert isinstance(created_lite_agent["tools"][1], CalculatorTool)
assert created_lite_agent["max_iterations"] == max_iter
assert created_lite_agent["max_execution_time"] == max_execution_time
assert created_lite_agent["verbose"] == verbose
assert created_lite_agent["response_format"] is None
# Test with a response_format
class TestResponse(BaseModel):
test_field: str
agent.kickoff("Test query", response_format=TestResponse)
assert created_lite_agent["response_format"] == TestResponse
# Verify kickoff returned a result
assert result is not None
assert result.raw is not None
@pytest.mark.vcr()
@@ -310,7 +301,8 @@ def verify_agent_parent_flow(result, agent, flow):
def test_sets_parent_flow_when_inside_flow():
captured_agent = None
"""Test that an Agent can be created and executed inside a Flow context."""
captured_event = None
mock_llm = Mock(spec=LLM)
mock_llm.call.return_value = "Test response"
@@ -343,15 +335,17 @@ def test_sets_parent_flow_when_inside_flow():
event_received = threading.Event()
@crewai_event_bus.on(LiteAgentExecutionStartedEvent)
def capture_agent(source, event):
nonlocal captured_agent
captured_agent = source
def capture_event(source, event):
nonlocal captured_event
captured_event = event
event_received.set()
flow.kickoff()
result = flow.kickoff()
assert event_received.wait(timeout=5), "Timeout waiting for agent execution event"
assert captured_agent.parent_flow is flow
assert captured_event is not None
assert captured_event.agent_info["role"] == "Test Agent"
assert result is not None
@pytest.mark.vcr()
@@ -373,16 +367,14 @@ def test_guardrail_is_called_using_string():
@crewai_event_bus.on(LLMGuardrailStartedEvent)
def capture_guardrail_started(source, event):
assert isinstance(source, LiteAgent)
assert source.original_agent == agent
assert isinstance(source, Agent)
with condition:
guardrail_events["started"].append(event)
condition.notify()
@crewai_event_bus.on(LLMGuardrailCompletedEvent)
def capture_guardrail_completed(source, event):
assert isinstance(source, LiteAgent)
assert source.original_agent == agent
assert isinstance(source, Agent)
with condition:
guardrail_events["completed"].append(event)
condition.notify()
@@ -683,3 +675,151 @@ def test_agent_kickoff_with_mcp_tools(mock_get_mcp_tools):
# Verify MCP tools were retrieved
mock_get_mcp_tools.assert_called_once_with("https://mcp.exa.ai/mcp?api_key=test_exa_key&profile=research")
# ============================================================================
# Tests for LiteAgent inside Flow (magic auto-async pattern)
# ============================================================================
from crewai.flow.flow import listen
@pytest.mark.vcr()
def test_lite_agent_inside_flow_sync():
"""Test that LiteAgent.kickoff() works magically inside a Flow.
This tests the "magic auto-async" pattern where calling agent.kickoff()
from within a Flow automatically detects the event loop and returns a
coroutine that the Flow framework awaits. Users don't need to use async/await.
"""
# Track execution
execution_log = []
class TestFlow(Flow):
@start()
def run_agent(self):
execution_log.append("flow_started")
agent = Agent(
role="Test Agent",
goal="Answer questions",
backstory="A helpful test assistant",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
# Magic: just call kickoff() normally - it auto-detects Flow context
result = agent.kickoff(messages="What is 2+2? Reply with just the number.")
execution_log.append("agent_completed")
return result
flow = TestFlow()
result = flow.kickoff()
# Verify the flow executed successfully
assert "flow_started" in execution_log
assert "agent_completed" in execution_log
assert result is not None
assert isinstance(result, LiteAgentOutput)
@pytest.mark.vcr()
def test_lite_agent_inside_flow_with_tools():
"""Test that LiteAgent with tools works correctly inside a Flow."""
class TestFlow(Flow):
@start()
def run_agent_with_tools(self):
agent = Agent(
role="Calculator Agent",
goal="Perform calculations",
backstory="A math expert",
llm=LLM(model="gpt-4o-mini"),
tools=[CalculatorTool()],
verbose=False,
)
result = agent.kickoff(messages="Calculate 10 * 5")
return result
flow = TestFlow()
result = flow.kickoff()
assert result is not None
assert isinstance(result, LiteAgentOutput)
assert result.raw is not None
@pytest.mark.vcr()
def test_multiple_agents_in_same_flow():
"""Test that multiple LiteAgents can run sequentially in the same Flow."""
class MultiAgentFlow(Flow):
@start()
def first_step(self):
agent1 = Agent(
role="First Agent",
goal="Greet users",
backstory="A friendly greeter",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
return agent1.kickoff(messages="Say hello")
@listen(first_step)
def second_step(self, first_result):
agent2 = Agent(
role="Second Agent",
goal="Say goodbye",
backstory="A polite farewell agent",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
return agent2.kickoff(messages="Say goodbye")
flow = MultiAgentFlow()
result = flow.kickoff()
assert result is not None
assert isinstance(result, LiteAgentOutput)
@pytest.mark.vcr()
def test_lite_agent_kickoff_async_inside_flow():
"""Test that Agent.kickoff_async() works correctly from async Flow methods."""
class AsyncAgentFlow(Flow):
@start()
async def async_agent_step(self):
agent = Agent(
role="Async Test Agent",
goal="Answer questions asynchronously",
backstory="An async helper",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
result = await agent.kickoff_async(messages="What is 3+3?")
return result
flow = AsyncAgentFlow()
result = flow.kickoff()
assert result is not None
assert isinstance(result, LiteAgentOutput)
@pytest.mark.vcr()
def test_lite_agent_standalone_still_works():
"""Test that LiteAgent.kickoff() still works normally outside of a Flow.
This verifies that the magic auto-async pattern doesn't break standalone usage
where there's no event loop running.
"""
agent = Agent(
role="Standalone Agent",
goal="Answer questions",
backstory="A helpful assistant",
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
# This should work normally - no Flow, no event loop
result = agent.kickoff(messages="What is 5+5? Reply with just the number.")
assert result is not None
assert isinstance(result, LiteAgentOutput)
assert result.raw is not None

View File

@@ -0,0 +1,119 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Test Agent. A helpful
test assistant\nYour personal goal is: Answer questions\nTo give my best complete
final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
What is 2+2? Reply with just the number.\n\nBegin! This is VERY important to
you, use the tools available and give your best Final Answer, your job depends
on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '673'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7b0HjL79y39EkUcMLrRhPFe3XGj\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444914,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: 4\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 136,\n \"completion_tokens\": 13,\n
\ \"total_tokens\": 149,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_8bbc38b4db\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:55 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '857'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '341'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '358'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,255 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator Agent. A math
expert\nYour personal goal is: Perform calculations\nYou ONLY have access to
the following tools, and should NEVER make up tools that are not listed here:\n\nTool
Name: calculate\nTool Arguments: {\n \"properties\": {\n \"expression\":
{\n \"title\": \"Expression\",\n \"type\": \"string\"\n }\n },\n \"required\":
[\n \"expression\"\n ],\n \"title\": \"CalculatorToolSchema\",\n \"type\":
\"object\",\n \"additionalProperties\": false\n}\nTool Description: Calculate
the result of a mathematical expression.\n\nIMPORTANT: Use the following format
in your response:\n\n```\nThought: you should always think about what to do\nAction:
the action to take, only one name of [calculate], just the name, exactly as
it''s written.\nAction Input: the input to the action, just a simple JSON object,
enclosed in curly braces, using \" to wrap keys and values.\nObservation: the
result of the action\n```\n\nOnce all necessary information is gathered, return
the following format:\n\n```\nThought: I now know the final answer\nFinal Answer:
the final answer to the original input question\n```"},{"role":"user","content":"\nCurrent
Task: Calculate 10 * 5\n\nBegin! This is VERY important to you, use the tools
available and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1403'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7avghVPSpszLmlbHpwDQlWDoD6O\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444909,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Thought: I need to calculate the expression
10 * 5.\\nAction: calculate\\nAction Input: {\\\"expression\\\":\\\"10 * 5\\\"}\\nObservation:
50\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 291,\n \"completion_tokens\": 33,\n
\ \"total_tokens\": 324,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_c4585b5b9c\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:49 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '939'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '579'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '598'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator Agent. A math
expert\nYour personal goal is: Perform calculations\nYou ONLY have access to
the following tools, and should NEVER make up tools that are not listed here:\n\nTool
Name: calculate\nTool Arguments: {\n \"properties\": {\n \"expression\":
{\n \"title\": \"Expression\",\n \"type\": \"string\"\n }\n },\n \"required\":
[\n \"expression\"\n ],\n \"title\": \"CalculatorToolSchema\",\n \"type\":
\"object\",\n \"additionalProperties\": false\n}\nTool Description: Calculate
the result of a mathematical expression.\n\nIMPORTANT: Use the following format
in your response:\n\n```\nThought: you should always think about what to do\nAction:
the action to take, only one name of [calculate], just the name, exactly as
it''s written.\nAction Input: the input to the action, just a simple JSON object,
enclosed in curly braces, using \" to wrap keys and values.\nObservation: the
result of the action\n```\n\nOnce all necessary information is gathered, return
the following format:\n\n```\nThought: I now know the final answer\nFinal Answer:
the final answer to the original input question\n```"},{"role":"user","content":"\nCurrent
Task: Calculate 10 * 5\n\nBegin! This is VERY important to you, use the tools
available and give your best Final Answer, your job depends on it!\n\nThought:"},{"role":"assistant","content":"Thought:
I need to calculate the expression 10 * 5.\nAction: calculate\nAction Input:
{\"expression\":\"10 * 5\"}\nObservation: The result of 10 * 5 is 50"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1591'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7avDhDZCLvv8v2dh8ZQRrLdci6A\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444909,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"Thought: I now know the final answer.\\nFinal
Answer: 50\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 337,\n \"completion_tokens\": 14,\n
\ \"total_tokens\": 351,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_c4585b5b9c\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:50 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '864'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '429'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '457'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,119 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Async Test Agent. An async
helper\nYour personal goal is: Answer questions asynchronously\nTo give my best
complete final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
What is 3+3?\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '657'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7atOGxtc4y3oYNI62WiQ0Vogsdv\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444907,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: The sum of 3 + 3 is 6. Therefore, the outcome is that if you add three
and three together, you will arrive at the total of six.\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
131,\n \"completion_tokens\": 46,\n \"total_tokens\": 177,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:48 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '983'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '944'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1192'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,119 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Standalone Agent. A helpful
assistant\nYour personal goal is: Answer questions\nTo give my best complete
final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
What is 5+5? Reply with just the number.\n\nBegin! This is VERY important to
you, use the tools available and give your best Final Answer, your job depends
on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '674'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7azhPwUHQ0p5tdhxSAmLPoE8UgC\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444913,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: 10\",\n \"refusal\": null,\n \"annotations\": []\n },\n
\ \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n
\ \"usage\": {\n \"prompt_tokens\": 136,\n \"completion_tokens\": 13,\n
\ \"total_tokens\": 149,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:54 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '858'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '455'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '583'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1
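
The cassettes above pin down the "Thought: ... / Final Answer: ..." format that the recorded prompts demand, so replay-side assertions only need the text after the "Final Answer:" marker. A minimal, illustrative parser (a sketch, not crewAI's actual implementation) might look like:

import re


def parse_final_answer(text: str) -> str | None:
    """Extract everything after the 'Final Answer:' marker, if present."""
    match = re.search(r"Final Answer:\s*(.*)", text, re.DOTALL)
    return match.group(1).strip() if match else None


# Matches the recorded completion for the "What is 5+5?" task above.
assert parse_final_answer("I now can give a great answer \nFinal Answer: 10") == "10"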


@@ -0,0 +1,240 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are First Agent. A friendly
greeter\nYour personal goal is: Greet users\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say
hello\n\nBegin! This is VERY important to you, use the tools available and give
your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '632'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7awLGYnYfpKGEeRhKlU90FltH7L\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444910,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: Hello and welcome! It's wonderful to see you here. I hope you're having
a fantastic day. If there's anything you need or if you have any questions,
feel free to ask. I'm here to help and make your experience enjoyable!\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
127,\n \"completion_tokens\": 57,\n \"total_tokens\": 184,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_c4585b5b9c\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:51 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '1074'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '1019'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1242'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Second Agent. A polite
farewell agent\nYour personal goal is: Say goodbye\nTo give my best complete
final answer to the task respond using the exact following format:\n\nThought:
I now can give a great answer\nFinal Answer: Your final answer must be the great
and the most complete as possible, it must be outcome described.\n\nI MUST use
these formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task:
Say goodbye\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4o-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '640'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7ayZre6crr19UyujJE9YbNxDndk\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444912,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: As we conclude our conversation, I just want to take a moment to express
my heartfelt gratitude for your time and engagement. It has been a pleasure
interacting with you. I wish you all the best in your future endeavors. May
your path ahead be filled with success and happiness. Farewell, and until
we meet again!\",\n \"refusal\": null,\n \"annotations\": []\n
\ },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n
\ ],\n \"usage\": {\n \"prompt_tokens\": 126,\n \"completion_tokens\":
75,\n \"total_tokens\": 201,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_29330a9688\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:53 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '1169'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '1298'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1550'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

File diff suppressed because one or more lines are too long


@@ -1,456 +1,528 @@
interactions:
- request:
body: '{"trace_id": "00000000-0000-0000-0000-000000000000", "execution_type": "crew", "user_identifier": null, "execution_context": {"crew_fingerprint": null, "crew_name": "Unknown Crew", "flow_name": null, "crewai_version": "1.3.0", "privacy_level": "standard"}, "execution_metadata": {"expected_duration_estimate": 300, "agent_count": 0, "task_count": 0, "flow_method_count": 0, "execution_started_at": "2025-11-05T22:19:56.074812+00:00"}}'
body: "{\"messages\":[{\"role\":\"system\",\"content\":\"You are Guardrail Agent.
You are a expert at validating the output of a task. By providing effective
feedback if the output is not valid.\\nYour personal goal is: Validate the output
of the task\\nTo give my best complete final answer to the task respond using
the exact following format:\\n\\nThought: I now can give a great answer\\nFinal
Answer: Your final answer must be the great and the most complete as possible,
it must be outcome described.\\n\\nI MUST use these formats, my job depends
on it!\"},{\"role\":\"user\",\"content\":\"\\nCurrent Task: \\n Ensure
the following task result complies with the given guardrail.\\n\\n Task
result:\\n \\n Lorem Ipsum is simply dummy text of the printing
and typesetting industry. Lorem Ipsum has been the industry's standard dummy
text ever\\n \\n\\n Guardrail:\\n Ensure the result has
less than 10 words\\n\\n Your task:\\n - Confirm if the Task result
complies with the guardrail.\\n - If not, provide clear feedback explaining
what is wrong (e.g., by how much it violates the rule, or what specific part
fails).\\n - Focus only on identifying issues \u2014 do not propose corrections.\\n
\ - If the Task result complies with the guardrail, saying that is valid\\n
\ \\n\\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\\n\\nThought:\"}],\"model\":\"gpt-4o\"}"
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate, zstd
Connection:
- keep-alive
Content-Length:
- '434'
Content-Type:
- application/json
User-Agent:
- CrewAI-CLI/1.3.0
X-Crewai-Version:
- 1.3.0
method: POST
uri: https://app.crewai.com/crewai_plus/api/v1/tracing/batches
response:
body:
string: '{"error":"bad_credentials","message":"Bad credentials"}'
headers:
Connection:
- keep-alive
Content-Length:
- '55'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 05 Nov 2025 22:19:56 GMT
cache-control:
- no-store
content-security-policy:
- 'default-src ''self'' *.app.crewai.com app.crewai.com; script-src ''self'' ''unsafe-inline'' *.app.crewai.com app.crewai.com https://cdn.jsdelivr.net/npm/apexcharts https://www.gstatic.com https://run.pstmn.io https://apis.google.com https://apis.google.com/js/api.js https://accounts.google.com https://accounts.google.com/gsi/client https://cdnjs.cloudflare.com/ajax/libs/normalize/8.0.1/normalize.min.css.map https://*.google.com https://docs.google.com https://slides.google.com https://js.hs-scripts.com https://js.sentry-cdn.com https://browser.sentry-cdn.com https://www.googletagmanager.com https://js-na1.hs-scripts.com https://js.hubspot.com http://js-na1.hs-scripts.com https://bat.bing.com https://cdn.amplitude.com https://cdn.segment.com https://d1d3n03t5zntha.cloudfront.net/ https://descriptusercontent.com https://edge.fullstory.com https://googleads.g.doubleclick.net https://js.hs-analytics.net https://js.hs-banner.com https://js.hsadspixel.net https://js.hscollectedforms.net
https://js.usemessages.com https://snap.licdn.com https://static.cloudflareinsights.com https://static.reo.dev https://www.google-analytics.com https://share.descript.com/; style-src ''self'' ''unsafe-inline'' *.app.crewai.com app.crewai.com https://cdn.jsdelivr.net/npm/apexcharts; img-src ''self'' data: *.app.crewai.com app.crewai.com https://zeus.tools.crewai.com https://dashboard.tools.crewai.com https://cdn.jsdelivr.net https://forms.hsforms.com https://track.hubspot.com https://px.ads.linkedin.com https://px4.ads.linkedin.com https://www.google.com https://www.google.com.br; font-src ''self'' data: *.app.crewai.com app.crewai.com; connect-src ''self'' *.app.crewai.com app.crewai.com https://zeus.tools.crewai.com https://connect.useparagon.com/ https://zeus.useparagon.com/* https://*.useparagon.com/* https://run.pstmn.io https://connect.tools.crewai.com/ https://*.sentry.io https://www.google-analytics.com https://edge.fullstory.com https://rs.fullstory.com https://api.hubspot.com
https://forms.hscollectedforms.net https://api.hubapi.com https://px.ads.linkedin.com https://px4.ads.linkedin.com https://google.com/pagead/form-data/16713662509 https://google.com/ccm/form-data/16713662509 https://www.google.com/ccm/collect https://worker-actionkit.tools.crewai.com https://api.reo.dev; frame-src ''self'' *.app.crewai.com app.crewai.com https://connect.useparagon.com/ https://zeus.tools.crewai.com https://zeus.useparagon.com/* https://connect.tools.crewai.com/ https://docs.google.com https://drive.google.com https://slides.google.com https://accounts.google.com https://*.google.com https://app.hubspot.com/ https://td.doubleclick.net https://www.googletagmanager.com/ https://www.youtube.com https://share.descript.com'
expires:
- '0'
permissions-policy:
- camera=(), microphone=(self), geolocation=()
pragma:
- no-cache
referrer-policy:
- strict-origin-when-cross-origin
strict-transport-security:
- max-age=63072000; includeSubDomains
vary:
- Accept
x-content-type-options:
- nosniff
x-frame-options:
- SAMEORIGIN
x-permitted-cross-domain-policies:
- none
x-request-id:
- 230c6cb5-92c7-448d-8c94-e5548a9f4259
x-runtime:
- '0.073220'
x-xss-protection:
- 1; mode=block
status:
code: 401
message: Unauthorized
- request:
body: '{"messages":[{"role":"system","content":"You are Guardrail Agent. You are a expert at validating the output of a task. By providing effective feedback if the output is not valid.\nYour personal goal is: Validate the output of the task\n\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\":
[\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\": false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"\n Ensure the following task result complies with the given guardrail.\n\n Task result:\n \n Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry''s standard dummy text ever\n \n\n Guardrail:\n Ensure
the result has less than 10 words\n\n Your task:\n - Confirm if the Task result complies with the guardrail.\n - If not, provide clear feedback explaining what is wrong (e.g., by how much it violates the rule, or what specific part fails).\n - Focus only on identifying issues — do not propose corrections.\n - If the Task result complies with the guardrail, saying that is valid\n "}],"model":"gpt-4o"}'
headers:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '2452'
- '1467'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- arm64
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.109.1
- 1.83.0
x-stainless-read-timeout:
- '600'
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CYg96Riy2RJRxnBHvoROukymP9wvs\",\n \"object\": \"chat.completion\",\n \"created\": 1762381196,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"Thought: I need to check if the task result meets the requirement of having less than 10 words.\\n\\nFinal Answer: {\\n \\\"valid\\\": false,\\n \\\"feedback\\\": \\\"The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words.\\\"\\n}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 489,\n \"completion_tokens\": 61,\n \"total_tokens\": 550,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\"\
: 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_cbf1785567\"\n}\n"
string: "{\n \"id\": \"chatcmpl-Cy7yHRYTZi8yzRbcODnKr92keLKCb\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446357,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"The task result provided has more than
10 words. I will count the words to verify this.\\n\\nThe task result is the
following text:\\n\\\"Lorem Ipsum is simply dummy text of the printing and
typesetting industry. Lorem Ipsum has been the industry's standard dummy text
ever\\\"\\n\\nCounting the words:\\n\\n1. Lorem \\n2. Ipsum \\n3. is \\n4.
simply \\n5. dummy \\n6. text \\n7. of \\n8. the \\n9. printing \\n10. and
\\n11. typesetting \\n12. industry. \\n13. Lorem \\n14. Ipsum \\n15. has \\n16.
been \\n17. the \\n18. industry's \\n19. standard \\n20. dummy \\n21. text
\\n22. ever\\n\\nThe total word count is 22.\\n\\nThought: I now can give
a great answer\\nFinal Answer: The task result does not comply with the guardrail.
It contains 22 words, which exceeds the limit of 10 words.\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
285,\n \"completion_tokens\": 195,\n \"total_tokens\": 480,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_deacdd5f6f\"\n}\n"
headers:
CF-RAY:
- REDACTED-RAY
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Wed, 05 Nov 2025 22:19:58 GMT
- Thu, 15 Jan 2026 03:05:59 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=REDACTED; path=/; expires=Wed, 05-Nov-25 22:49:58 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
- _cfuvid=REDACTED; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
- SET-COOKIE-XXX
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- X-Request-ID
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '1557'
openai-organization:
- user-hortuttj2f3qtmxyik2zxf4q
- OPENAI-ORG-XXX
openai-processing-ms:
- '2201'
- '2130'
openai-project:
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '2401'
- '2147'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- '500'
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- '30000'
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- '499'
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- '29439'
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- 120ms
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- 1.122s
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- req_REDACTED
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"{\n \"valid\": false,\n \"feedback\": \"The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words.\"\n}"}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly
adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\":
{\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\":
{\n \"properties\": {\n \"valid\": {\n \"description\":
\"Whether the task output complies with the guardrail\",\n \"title\":
\"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\":
{\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\":
\"null\"\n }\n ],\n \"default\": null,\n \"description\":
\"A feedback about the task output if it is not valid\",\n \"title\":
\"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\":
\"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output.
Ensure the final output does not include any code block markers like ```json
or ```python."},{"role":"user","content":"The task result does not comply with
the guardrail. It contains 22 words, which exceeds the limit of 10 words."}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether
the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A
feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1884'
- '1835'
content-type:
- application/json
cookie:
- __cf_bm=REDACTED; _cfuvid=REDACTED
- COOKIE-XXX
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- arm64
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-helper-method:
- chat.completions.parse
- beta.chat.completions.parse
x-stainless-lang:
- python
x-stainless-os:
- MacOS
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.109.1
- 1.83.0
x-stainless-read-timeout:
- '600'
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CYg98QlZ8NTrQ69676MpXXyCoZJT8\",\n \"object\": \"chat.completion\",\n \"created\": 1762381198,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"{\\\"valid\\\":false,\\\"feedback\\\":\\\"The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words.\\\"}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 374,\n \"completion_tokens\": 32,\n \"total_tokens\": 406,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n\
\ \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_cbf1785567\"\n}\n"
string: "{\n \"id\": \"chatcmpl-Cy7yJiPCk4fXuogyT5e8XeGRLCSf8\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446359,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"{\\\"valid\\\":false,\\\"feedback\\\":\\\"The
task output exceeds the word limit of 10 words by containing 22 words.\\\"}\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
363,\n \"completion_tokens\": 25,\n \"total_tokens\": 388,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_a0e9480a2f\"\n}\n"
headers:
CF-RAY:
- REDACTED-RAY
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Wed, 05 Nov 2025 22:19:59 GMT
- Thu, 15 Jan 2026 03:05:59 GMT
Server:
- cloudflare
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- X-Request-ID
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '913'
openai-organization:
- user-hortuttj2f3qtmxyik2zxf4q
- OPENAI-ORG-XXX
openai-processing-ms:
- '419'
- '488'
openai-project:
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '432'
- '507'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- '500'
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- '30000'
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- '499'
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- '29702'
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- 120ms
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- 596ms
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- req_REDACTED
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Guardrail Agent. You are a expert at validating the output of a task. By providing effective feedback if the output is not valid.\nYour personal goal is: Validate the output of the task\n\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\":
[\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\": false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"\n Ensure the following task result complies with the given guardrail.\n\n Task result:\n \n Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry''s standard dummy text ever\n \n\n Guardrail:\n Ensure
the result has less than 500 words\n\n Your task:\n - Confirm if the Task result complies with the guardrail.\n - If not, provide clear feedback explaining what is wrong (e.g., by how much it violates the rule, or what specific part fails).\n - Focus only on identifying issues — do not propose corrections.\n - If the Task result complies with the guardrail, saying that is valid\n "}],"model":"gpt-4o"}'
body: "{\"messages\":[{\"role\":\"system\",\"content\":\"You are Guardrail Agent.
You are a expert at validating the output of a task. By providing effective
feedback if the output is not valid.\\nYour personal goal is: Validate the output
of the task\\nTo give my best complete final answer to the task respond using
the exact following format:\\n\\nThought: I now can give a great answer\\nFinal
Answer: Your final answer must be the great and the most complete as possible,
it must be outcome described.\\n\\nI MUST use these formats, my job depends
on it!\"},{\"role\":\"user\",\"content\":\"\\nCurrent Task: \\n Ensure
the following task result complies with the given guardrail.\\n\\n Task
result:\\n \\n Lorem Ipsum is simply dummy text of the printing
and typesetting industry. Lorem Ipsum has been the industry's standard dummy
text ever\\n \\n\\n Guardrail:\\n Ensure the result has
less than 500 words\\n\\n Your task:\\n - Confirm if the Task
result complies with the guardrail.\\n - If not, provide clear feedback
explaining what is wrong (e.g., by how much it violates the rule, or what specific
part fails).\\n - Focus only on identifying issues \u2014 do not propose
corrections.\\n - If the Task result complies with the guardrail, saying
that is valid\\n \\n\\nBegin! This is VERY important to you, use the
tools available and give your best Final Answer, your job depends on it!\\n\\nThought:\"}],\"model\":\"gpt-4o\"}"
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '2453'
- '1468'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- arm64
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.109.1
- 1.83.0
x-stainless-read-timeout:
- '600'
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CYgBMV6fu7EvV2BqzMdJaKyLAg1WW\",\n \"object\": \"chat.completion\",\n \"created\": 1762381336,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"Thought: I now can give a great answer\\nFinal Answer: {\\\"valid\\\": true, \\\"feedback\\\": null}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 489,\n \"completion_tokens\": 23,\n \"total_tokens\": 512,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\"\
: \"fp_cbf1785567\"\n}\n"
string: "{\n \"id\": \"chatcmpl-Cy7yKa0rmi2YoTLpyXt9hjeLt2rTI\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446360,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"First, I'll count the number of words
in the Task result to ensure it complies with the guardrail. \\n\\nThe Task
result is: \\\"Lorem Ipsum is simply dummy text of the printing and typesetting
industry. Lorem Ipsum has been the industry's standard dummy text ever.\\\"\\n\\nBy
counting the words: \\n1. Lorem\\n2. Ipsum\\n3. is\\n4. simply\\n5. dummy\\n6.
text\\n7. of\\n8. the\\n9. printing\\n10. and\\n11. typesetting\\n12. industry\\n13.
Lorem\\n14. Ipsum\\n15. has\\n16. been\\n17. the\\n18. industry's\\n19. standard\\n20.
dummy\\n21. text\\n22. ever\\n\\nThere are 22 words total in the Task result.\\n\\nI
need to verify if the count of 22 words is less than the guardrail limit of
500 words.\\n\\nThought: I now can give a great answer\\nFinal Answer: The
Task result complies with the guardrail as it contains 22 words, which is
less than the 500-word limit. Therefore, the output is valid.\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
285,\n \"completion_tokens\": 227,\n \"total_tokens\": 512,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_deacdd5f6f\"\n}\n"
headers:
CF-RAY:
- REDACTED-RAY
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Wed, 05 Nov 2025 22:22:16 GMT
- Thu, 15 Jan 2026 03:06:02 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=REDACTED; path=/; expires=Wed, 05-Nov-25 22:52:16 GMT; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
- _cfuvid=REDACTED; path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
- SET-COOKIE-XXX
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- X-Request-ID
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '1668'
openai-organization:
- user-hortuttj2f3qtmxyik2zxf4q
- OPENAI-ORG-XXX
openai-processing-ms:
- '327'
- '2502'
openai-project:
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '372'
- '2522'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- '500'
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- '30000'
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- '499'
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- '29438'
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- 120ms
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- 1.124s
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- req_REDACTED
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\": {\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\": {\n \"properties\": {\n \"valid\": {\n \"description\": \"Whether the task output complies with the guardrail\",\n \"title\": \"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\": {\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\": \"null\"\n }\n ],\n \"default\": null,\n \"description\": \"A feedback about the task output if it is not valid\",\n \"title\": \"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\": \"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output. Ensure the final output does not include any code block markers like ```json or ```python."},{"role":"user","content":"{\"valid\": true, \"feedback\": null}"}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
body: '{"messages":[{"role":"system","content":"Ensure your final answer strictly
adheres to the following OpenAPI schema: {\n \"type\": \"json_schema\",\n \"json_schema\":
{\n \"name\": \"LLMGuardrailResult\",\n \"strict\": true,\n \"schema\":
{\n \"properties\": {\n \"valid\": {\n \"description\":
\"Whether the task output complies with the guardrail\",\n \"title\":
\"Valid\",\n \"type\": \"boolean\"\n },\n \"feedback\":
{\n \"anyOf\": [\n {\n \"type\": \"string\"\n },\n {\n \"type\":
\"null\"\n }\n ],\n \"default\": null,\n \"description\":
\"A feedback about the task output if it is not valid\",\n \"title\":
\"Feedback\"\n }\n },\n \"required\": [\n \"valid\",\n \"feedback\"\n ],\n \"title\":
\"LLMGuardrailResult\",\n \"type\": \"object\",\n \"additionalProperties\":
false\n }\n }\n}\n\nDo not include the OpenAPI schema in the final output.
Ensure the final output does not include any code block markers like ```json
or ```python."},{"role":"user","content":"The Task result complies with the
guardrail as it contains 22 words, which is less than the 500-word limit. Therefore,
the output is valid."}],"model":"gpt-4o","response_format":{"type":"json_schema","json_schema":{"schema":{"properties":{"valid":{"description":"Whether
the task output complies with the guardrail","title":"Valid","type":"boolean"},"feedback":{"anyOf":[{"type":"string"},{"type":"null"}],"description":"A
feedback about the task output if it is not valid","title":"Feedback"}},"required":["valid","feedback"],"title":"LLMGuardrailResult","type":"object","additionalProperties":false},"name":"LLMGuardrailResult","strict":true}},"stream":false}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- gzip, deflate, zstd
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1762'
- '1864'
content-type:
- application/json
cookie:
- __cf_bm=REDACTED; _cfuvid=REDACTED
- COOKIE-XXX
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.109.1
x-stainless-arch:
- arm64
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-helper-method:
- chat.completions.parse
- beta.chat.completions.parse
x-stainless-lang:
- python
x-stainless-os:
- MacOS
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.109.1
- 1.83.0
x-stainless-read-timeout:
- '600'
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.9
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-CYgBMU20R45qGGaLN6vNAmW1NR4R6\",\n \"object\": \"chat.completion\",\n \"created\": 1762381336,\n \"model\": \"gpt-4o-2024-08-06\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"{\\\"valid\\\":true,\\\"feedback\\\":null}\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 347,\n \"completion_tokens\": 9,\n \"total_tokens\": 356,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_cbf1785567\"\n}\n"
string: "{\n \"id\": \"chatcmpl-Cy7yMAjNYSCz2foZPEcSVCuapzF8y\",\n \"object\":
\"chat.completion\",\n \"created\": 1768446362,\n \"model\": \"gpt-4o-2024-08-06\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"{\\\"valid\\\":true,\\\"feedback\\\":null}\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
369,\n \"completion_tokens\": 9,\n \"total_tokens\": 378,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_a0e9480a2f\"\n}\n"
headers:
CF-RAY:
- REDACTED-RAY
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Wed, 05 Nov 2025 22:22:17 GMT
- Thu, 15 Jan 2026 03:06:03 GMT
Server:
- cloudflare
Strict-Transport-Security:
- max-age=31536000; includeSubDomains; preload
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- X-Request-ID
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
content-length:
- '837'
openai-organization:
- user-hortuttj2f3qtmxyik2zxf4q
- OPENAI-ORG-XXX
openai-processing-ms:
- '1081'
- '413'
openai-project:
- proj_fL4UBWR1CMpAAdgzaSKqsVvA
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1241'
- '650'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- '500'
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- '30000'
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- '499'
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- '29478'
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- 120ms
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- 1.042s
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- req_REDACTED
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
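
The rewritten cassette above replaces every volatile header (auth, cookies, rate limits, request IDs) with a fixed "-XXX" placeholder. That scrubbing is normally configured once in the recorder rather than edited by hand; a sketch using vcrpy-style hooks (the exact hook names and placeholder scheme used by this test suite are an assumption):

import vcr


def scrub_response(response):
    # Response headers such as rate limits and request IDs change on
    # every call; pin them to placeholders before the cassette is written.
    for name in ("Set-Cookie", "CF-RAY", "x-request-id"):
        if name in response["headers"]:
            response["headers"][name] = [f"{name.upper()}-XXX"]
    return response


# Request-side secrets are filtered with (header, replacement) pairs.
my_vcr = vcr.VCR(
    filter_headers=[
        ("authorization", "AUTHORIZATION-XXX"),
        ("cookie", "COOKIE-XXX"),
    ],
    before_record_response=scrub_response,
)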


@@ -185,8 +185,8 @@ def test_task_guardrail_process_output(task_output):
     result = guardrail(task_output)
     assert result[0] is False
-    assert result[1] == "The task result contains more than 10 words, violating the guardrail. The text provided contains about 21 words."
+    # Check that feedback is provided (wording varies by LLM)
+    assert result[1] and len(result[1]) > 0
     guardrail = LLMGuardrail(
         description="Ensure the result has less than 500 words", llm=LLM(model="gpt-4o")


@@ -348,11 +348,11 @@ def test_agent_emits_execution_error_event(base_agent, base_task):
     error_message = "Error happening while sending prompt to model."
     base_agent.max_retry_limit = 0
-    with patch.object(
-        CrewAgentExecutor, "invoke", wraps=base_agent.agent_executor.invoke
-    ) as invoke_mock:
-        invoke_mock.side_effect = Exception(error_message)
+    # Patch at the class level since agent_executor is created lazily
+    with patch.object(
+        CrewAgentExecutor, "invoke", side_effect=Exception(error_message)
+    ):
         with pytest.raises(Exception):  # noqa: B017
             base_agent.execute_task(
                 task=base_task,
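
The comment in the new code states the key constraint: because agent_executor is created lazily, an instance-level patch taken before first access would bind to a stale or nonexistent object. Patching the class makes the mock apply to whatever instance eventually gets created. A self-contained sketch of the pattern (the Agent/Executor names here are illustrative, not crewAI's API):

from unittest.mock import patch

import pytest


class Executor:
    def invoke(self):
        return "real result"


class Agent:
    @property
    def executor(self):
        # Created lazily on first access; an instance-level patch applied
        # before this point would have nothing to replace.
        if not hasattr(self, "_executor"):
            self._executor = Executor()
        return self._executor


def test_error_propagates():
    agent = Agent()
    # Patch the class, so the mock covers the instance created later.
    with patch.object(Executor, "invoke", side_effect=RuntimeError("boom")):
        with pytest.raises(RuntimeError):
            agent.executor.invoke()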


@@ -1,3 +1,3 @@
 """CrewAI development tools."""
 
-__version__ = "1.8.0"
+__version__ = "1.8.1"


@@ -117,7 +117,7 @@ show_error_codes = true
 warn_unused_ignores = true
 python_version = "3.12"
 exclude = "(?x)(^lib/crewai/src/crewai/cli/templates/ | ^lib/crewai/tests/ | ^lib/crewai-tools/tests/)"
-plugins = ["pydantic.mypy", "crewai.mypy"]
+plugins = ["pydantic.mypy"]
 
 [tool.bandit]