Compare commits


1 Commit

Author: Devin AI · SHA1: 9af03058fe · Date: 2026-01-27 19:45:52 +00:00

fix: skip signal handler registration in non-main thread
When CrewAI is initialized from a non-main thread (e.g., in Streamlit,
Flask, Django, Jupyter), the telemetry module was printing multiple
ValueError tracebacks for each signal handler registration attempt.

This fix adds a proactive main thread check in _register_shutdown_handlers()
before attempting signal registration. If not in the main thread, a debug
message is logged and signal handler registration is skipped.
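
A minimal sketch of the described check (the handler body and any names other than `_register_shutdown_handlers` are illustrative, not CrewAI's actual code):

```python
import logging
import signal
import threading

logger = logging.getLogger(__name__)

def _handle_shutdown(signum, frame):
    ...  # illustrative: flush telemetry, release resources, etc.

def _register_shutdown_handlers() -> None:
    # signal.signal() may only be called from the main thread of the main
    # interpreter; in Streamlit/Flask/Django/Jupyter worker threads it raises
    # ValueError, so skip registration entirely instead of catching tracebacks.
    if threading.current_thread() is not threading.main_thread():
        logger.debug("Not in main thread; skipping signal handler registration.")
        return
    signal.signal(signal.SIGINT, _handle_shutdown)
    signal.signal(signal.SIGTERM, _handle_shutdown)
```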

Fixes #4289

Co-Authored-By: João <joao@crewai.com>
73 changed files with 1485 additions and 5264 deletions

View File

@@ -370,8 +370,7 @@
"pages": [
"en/enterprise/features/traces",
"en/enterprise/features/webhook-streaming",
"en/enterprise/features/hallucination-guardrail",
"en/enterprise/features/flow-hitl-management"
"en/enterprise/features/hallucination-guardrail"
]
},
{
@@ -824,8 +823,7 @@
"pages": [
"pt-BR/enterprise/features/traces",
"pt-BR/enterprise/features/webhook-streaming",
"pt-BR/enterprise/features/hallucination-guardrail",
"pt-BR/enterprise/features/flow-hitl-management"
"pt-BR/enterprise/features/hallucination-guardrail"
]
},
{
@@ -1289,8 +1287,7 @@
"pages": [
"ko/enterprise/features/traces",
"ko/enterprise/features/webhook-streaming",
"ko/enterprise/features/hallucination-guardrail",
"ko/enterprise/features/flow-hitl-management"
"ko/enterprise/features/hallucination-guardrail"
]
},
{

View File

@@ -1,563 +0,0 @@
---
title: "Flow HITL Management"
description: "Enterprise-grade human review for Flows with email-first notifications, routing rules, and auto-response capabilities"
icon: "users-gear"
mode: "wide"
---
<Note>
Flow HITL Management features require the `@human_feedback` decorator, available in **CrewAI version 1.8.0 or higher**. These features apply specifically to **Flows**, not Crews.
</Note>
CrewAI Enterprise provides a comprehensive Human-in-the-Loop (HITL) management system for Flows that transforms AI workflows into collaborative human-AI processes. The platform uses an **email-first architecture** that enables anyone with an email address to respond to review requests—no platform account required.
## Overview
<CardGroup cols={3}>
<Card title="Email-First Design" icon="envelope">
Responders can reply directly to notification emails to provide feedback
</Card>
<Card title="Flexible Routing" icon="route">
Route requests to specific emails based on method patterns or flow state
</Card>
<Card title="Auto-Response" icon="clock">
Configure automatic fallback responses when no human replies in time
</Card>
</CardGroup>
### Key Benefits
- **Simple mental model**: Email addresses are universal; no need to manage platform users or roles
- **External responders**: Anyone with an email can respond, even non-platform users
- **Dynamic assignment**: Pull assignee email directly from flow state (e.g., `sales_rep_email`)
- **Reduced configuration**: Fewer settings to configure, faster time to value
- **Email as primary channel**: Most users prefer responding via email over logging into a dashboard
## Setting Up Human Review Points in Flows
Configure human review checkpoints within your Flows using the `@human_feedback` decorator. When execution reaches a review point, the system pauses, notifies the assignee via email, and waits for a response.
```python
from crewai.flow.flow import Flow, start, listen
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult
class ContentApprovalFlow(Flow):
    @start()
    def generate_content(self):
        # AI generates content
        return "Generated marketing copy for Q1 campaign..."

    @listen(generate_content)
    @human_feedback(
        message="Please review this content for brand compliance:",
        emit=["approved", "rejected", "needs_revision"],
    )
    def review_content(self, content):
        return content

    @listen("approved")
    def publish_content(self, result: HumanFeedbackResult):
        print(f"Publishing approved content. Reviewer notes: {result.feedback}")

    @listen("rejected")
    def archive_content(self, result: HumanFeedbackResult):
        print(f"Content rejected. Reason: {result.feedback}")

    @listen("needs_revision")
    def revise_content(self, result: HumanFeedbackResult):
        print(f"Revision requested: {result.feedback}")
```
For complete implementation details, see the [Human Feedback in Flows](/en/learn/human-feedback-in-flows) guide.
### Decorator Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `message` | `str` | The message displayed to the human reviewer |
| `emit` | `list[str]` | Valid response options (displayed as buttons in UI) |
## Platform Configuration
Access HITL configuration from: **Deployment → Settings → Human in the Loop Configuration**
<Frame>
<img src="/images/enterprise/hitl-settings-overview.png" alt="HITL Configuration Settings" />
</Frame>
### Email Notifications
Toggle to enable or disable email notifications for HITL requests.
| Setting | Default | Description |
|---------|---------|-------------|
| Email Notifications | Enabled | Send emails when feedback is requested |
<Note>
When disabled, responders must use the dashboard UI or you must configure webhooks for custom notification systems.
</Note>
### SLA Target
Set a target response time for tracking and metrics purposes.
| Setting | Description |
|---------|-------------|
| SLA Target (minutes) | Target response time. Used for dashboard metrics and SLA tracking |
Leave empty to disable SLA tracking.
## Email Notifications & Responses
The HITL system uses an email-first architecture where responders can reply directly to notification emails.
### How Email Responses Work
<Steps>
<Step title="Notification Sent">
When a HITL request is created, an email is sent to the assigned responder with the review content and context.
</Step>
<Step title="Reply-To Address">
The email includes a special reply-to address with a signed token for authentication.
</Step>
<Step title="User Replies">
The responder simply replies to the email with their feedback—no login required.
</Step>
<Step title="Token Validation">
The platform receives the reply, verifies the signed token, and matches the sender email.
</Step>
<Step title="Flow Resumes">
The feedback is recorded and the flow continues with the human's input.
</Step>
</Steps>
### Response Format
Responders can reply with:
- **Emit option**: If the reply matches an `emit` option (e.g., "approved"), it's used directly
- **Free-form text**: Any text response is passed to the flow as feedback
- **Plain text**: The first line of the reply body is used as feedback
### Confirmation Emails
After processing a reply, the responder receives a confirmation email indicating whether the feedback was successfully submitted or if an error occurred.
### Email Token Security
- Tokens are cryptographically signed for security
- Tokens expire after 7 days
- Sender email must match the token's authorized email
- Confirmation/error emails are sent after processing
## Routing Rules
Route HITL requests to specific email addresses based on method patterns.
<Frame>
<img src="/images/enterprise/hitl-settings-routing-rules.png" alt="HITL Routing Rules Configuration" />
</Frame>
### Rule Structure
```json
{
  "name": "Approvals to Finance",
  "match": {
    "method_name": "approve_*"
  },
  "assign_to_email": "finance@company.com",
  "assign_from_input": "manager_email"
}
```
### Matching Patterns
| Pattern | Description | Example Match |
|---------|-------------|---------------|
| `approve_*` | Wildcard (any chars) | `approve_payment`, `approve_vendor` |
| `review_?` | Single char | `review_a`, `review_1` |
| `validate_payment` | Exact match | `validate_payment` only |
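These patterns follow shell-style globbing. As a rough illustration (a sketch with hypothetical rules, not the platform's actual matcher), Python's `fnmatch` has the same semantics:

```python
from fnmatch import fnmatch

# Hypothetical rules reusing the patterns from the table above.
rules = [
    {"name": "Approvals to Finance", "match": {"method_name": "approve_*"}},
    {"name": "Single-character reviews", "match": {"method_name": "review_?"}},
    {"name": "Payment validation", "match": {"method_name": "validate_payment"}},
]

def first_matching_rule(method_name: str) -> dict | None:
    for rule in rules:
        if fnmatch(method_name, rule["match"]["method_name"]):
            return rule
    return None

print(first_matching_rule("approve_payment")["name"])   # Approvals to Finance
print(first_matching_rule("review_a")["name"])          # Single-character reviews
print(first_matching_rule("validate_payment")["name"])  # Payment validation
print(first_matching_rule("something_else"))            # None
```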
### Assignment Priority
1. **Dynamic assignment** (`assign_from_input`): If configured, pulls email from flow state
2. **Static email** (`assign_to_email`): Falls back to configured email
3. **Deployment creator**: If no rule matches, the deployment creator's email is used
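A minimal sketch of this resolution order (illustrative only; the field names follow the rule structure shown above):

```python
def resolve_assignee(rule: dict | None, flow_state: dict, creator_email: str) -> str:
    """Pick the assignee email using the priority described above."""
    if rule is not None:
        state_key = rule.get("assign_from_input")
        if state_key and flow_state.get(state_key):
            return flow_state[state_key]      # 1. dynamic assignment from flow state
        if rule.get("assign_to_email"):
            return rule["assign_to_email"]    # 2. static configured email
    return creator_email                      # 3. deployment creator fallback

# e.g. resolve_assignee(rule, {"sales_rep_email": "alice@company.com"}, "owner@company.com")
```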
### Dynamic Assignment Example
If your flow state contains `{"sales_rep_email": "alice@company.com"}`, configure:
```json
{
  "name": "Route to Sales Rep",
  "match": {
    "method_name": "review_*"
  },
  "assign_from_input": "sales_rep_email"
}
```
The request will be assigned to `alice@company.com` automatically.
<Tip>
**Use Case**: Pull the assignee from your CRM, database, or previous flow step to dynamically route reviews to the right person.
</Tip>
## Auto-Response
Automatically respond to HITL requests if no human responds within a timeout. This ensures flows don't hang indefinitely.
### Configuration
| Setting | Description |
|---------|-------------|
| Enabled | Toggle to enable auto-response |
| Timeout (minutes) | Time to wait before auto-responding |
| Default Outcome | The response value (must match an `emit` option) |
<Frame>
<img src="/images/enterprise/hitl-settings-auto-respond.png" alt="HITL Auto-Response Configuration" />
</Frame>
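Conceptually, the fallback behaves like a timer that submits the default outcome if the request is still pending when the timeout elapses. The sketch below is illustrative only; `is_pending` and `submit_feedback` are hypothetical stand-ins for platform internals, not a public API:

```python
import threading

def schedule_auto_response(request_id: str, timeout_minutes: int, default_outcome: str) -> None:
    """Illustrative sketch: answer with `default_outcome` if no human replies in time."""
    def respond_if_still_pending() -> None:
        if is_pending(request_id):  # hypothetical status check
            submit_feedback(request_id, feedback=default_outcome)  # hypothetical submit
    timer = threading.Timer(timeout_minutes * 60, respond_if_still_pending)
    timer.daemon = True
    timer.start()
```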
### Use Cases
- **SLA compliance**: Ensure flows don't hang indefinitely
- **Default approval**: Auto-approve low-risk requests after timeout
- **Graceful degradation**: Continue with a safe default when reviewers are unavailable
<Warning>
Use auto-response carefully. Only enable it for non-critical reviews where a default response is acceptable.
</Warning>
## Review Process
### Dashboard Interface
The HITL review interface provides a clean, focused experience for reviewers:
- **Markdown Rendering**: Rich formatting for review content with syntax highlighting
- **Context Panel**: View flow state, execution history, and related information
- **Feedback Input**: Provide detailed feedback and comments with your decision
- **Quick Actions**: One-click emit option buttons with optional comments
<Frame>
<img src="/images/enterprise/hitl-list-pending-feedbacks.png" alt="HITL Pending Requests List" />
</Frame>
### Response Methods
Reviewers can respond via three channels:
| Method | Description |
|--------|-------------|
| **Email Reply** | Reply directly to the notification email |
| **Dashboard** | Use the Enterprise dashboard UI |
| **API/Webhook** | Programmatic response via API |
### History & Audit Trail
Every HITL interaction is tracked with a complete timeline:
- Decision history (approve/reject/revise)
- Reviewer identity and timestamp
- Feedback and comments provided
- Response method (email/dashboard/API)
- Response time metrics
## Analytics & Monitoring
Track HITL performance with comprehensive analytics.
### Performance Dashboard
<Frame>
<img src="/images/enterprise/hitl-metrics.png" alt="HITL Metrics Dashboard" />
</Frame>
<CardGroup cols={2}>
<Card title="Response Times" icon="stopwatch">
Monitor average and median response times by reviewer or flow.
</Card>
<Card title="Volume Trends" icon="chart-bar">
Analyze review volume patterns to optimize team capacity.
</Card>
<Card title="Decision Distribution" icon="chart-pie">
View approval/rejection rates across different review types.
</Card>
<Card title="SLA Tracking" icon="chart-line">
Track percentage of reviews completed within SLA targets.
</Card>
</CardGroup>
### Audit & Compliance
Enterprise-ready audit capabilities for regulatory requirements:
- Complete decision history with timestamps
- Reviewer identity verification
- Immutable audit logs
- Export capabilities for compliance reporting
## Common Use Cases
<AccordionGroup>
<Accordion title="Security Reviews" icon="shield-halved">
**Use Case**: Internal security questionnaire automation with human validation
- AI generates responses to security questionnaires
- Security team reviews and validates accuracy via email
- Approved responses are compiled into final submission
- Full audit trail for compliance
</Accordion>
<Accordion title="Content Approval" icon="file-lines">
**Use Case**: Marketing content requiring legal/brand review
- AI generates marketing copy or social media content
- Route to brand team email for voice/tone review
- Automatic publishing upon approval
</Accordion>
<Accordion title="Financial Approvals" icon="money-bill">
**Use Case**: Expense reports, contract terms, budget allocations
- AI pre-processes and categorizes financial requests
- Route based on amount thresholds using dynamic assignment
- Maintain complete audit trail for financial compliance
</Accordion>
<Accordion title="Dynamic Assignment from CRM" icon="database">
**Use Case**: Route reviews to account owners from your CRM
- Flow fetches account owner email from CRM
- Store email in flow state (e.g., `account_owner_email`)
- Use `assign_from_input` to route to the right person automatically
</Accordion>
<Accordion title="Quality Assurance" icon="magnifying-glass">
**Use Case**: AI output validation before customer delivery
- AI generates customer-facing content or responses
- QA team reviews via email notification
- Feedback loops improve AI performance over time
</Accordion>
</AccordionGroup>
## Webhooks API
When your Flows pause for human feedback, you can configure webhooks to send request data to your own application. This enables:
- Building custom approval UIs
- Integrating with internal tools (Jira, ServiceNow, custom dashboards)
- Routing approvals to third-party systems
- Mobile app notifications
- Automated decision systems
<Frame>
<img src="/images/enterprise/hitl-settings-webhook.png" alt="HITL Webhook Configuration" />
</Frame>
### Configuring Webhooks
<Steps>
<Step title="Navigate to Settings">
Go to your **Deployment** → **Settings** → **Human in the Loop**
</Step>
<Step title="Expand Webhooks Section">
Click to expand the **Webhooks** configuration
</Step>
<Step title="Add Your Webhook URL">
Enter your webhook URL (must be HTTPS in production)
</Step>
<Step title="Save Configuration">
Click **Save Configuration** to activate
</Step>
</Steps>
You can configure multiple webhooks. Each active webhook receives all HITL events.
### Webhook Events
Your endpoint will receive HTTP POST requests for these events:
| Event Type | When Triggered |
|------------|----------------|
| `new_request` | A flow pauses and requests human feedback |
### Webhook Payload
All webhooks receive a JSON payload with this structure:
```json
{
  "event": "new_request",
  "request": {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "flow_id": "flow_abc123",
    "method_name": "review_article",
    "message": "Please review this article for publication.",
    "emit_options": ["approved", "rejected", "request_changes"],
    "state": {
      "article_id": 12345,
      "author": "john@example.com",
      "category": "technology"
    },
    "metadata": {},
    "created_at": "2026-01-14T12:00:00Z"
  },
  "deployment": {
    "id": 456,
    "name": "Content Review Flow",
    "organization_id": 789
  },
  "callback_url": "https://api.crewai.com/...",
  "assigned_to_email": "reviewer@company.com"
}
```
### Responding to Requests
To submit feedback, **POST to the `callback_url`** included in the webhook payload.
```http
POST {callback_url}
Content-Type: application/json

{
  "feedback": "Approved. Great article!",
  "source": "my_custom_app"
}
```
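For example, from a Python service that received the webhook (a sketch using the `requests` library; `event` is the parsed webhook payload shown above):

```python
import requests

def submit_feedback(event: dict) -> None:
    """POST a decision back to the callback_url from the webhook payload."""
    response = requests.post(
        event["callback_url"],
        json={"feedback": "Approved. Great article!", "source": "my_custom_app"},
        timeout=10,
    )
    response.raise_for_status()
```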
### Security
<Info>
All webhook requests are cryptographically signed using HMAC-SHA256 to ensure authenticity and prevent tampering.
</Info>
#### Webhook Security
- **HMAC-SHA256 signatures**: Every webhook includes a cryptographic signature
- **Per-webhook secrets**: Each webhook has its own unique signing secret
- **Encrypted at rest**: Signing secrets are encrypted in our database
- **Timestamp verification**: Prevents replay attacks
#### Signature Headers
Each webhook request includes these headers:
| Header | Description |
|--------|-------------|
| `X-Signature` | HMAC-SHA256 signature: `sha256=<hex_digest>` |
| `X-Timestamp` | Unix timestamp when the request was signed |
#### Verification
Verify by computing:
```python
import hmac
import hashlib
# signing_secret is this webhook's secret; timestamp comes from the
# X-Timestamp header; payload is the raw request body; signature is the
# hex digest from the X-Signature header (after the "sha256=" prefix).
expected = hmac.new(
    signing_secret.encode(),
    f"{timestamp}.{payload}".encode(),
    hashlib.sha256,
).hexdigest()

if hmac.compare_digest(expected, signature):
    ...  # Valid signature: safe to process the payload
```
### Error Handling
Your webhook endpoint should return a 2xx status code to acknowledge receipt:
| Your Response | Our Behavior |
|---------------|--------------|
| 2xx | Webhook delivered successfully |
| 4xx/5xx | Logged as failed, no retry |
| Timeout (30s) | Logged as failed, no retry |
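Putting the pieces together, a minimal receiving endpoint might look like the following sketch (Flask is an arbitrary choice here; the route name and secret handling are illustrative):

```python
import hashlib
import hmac

from flask import Flask, abort, request

app = Flask(__name__)
SIGNING_SECRET = "your-webhook-signing-secret"  # placeholder; load from config

@app.post("/hitl-webhook")
def hitl_webhook():
    timestamp = request.headers.get("X-Timestamp", "")
    signature = request.headers.get("X-Signature", "").removeprefix("sha256=")
    payload = request.get_data(as_text=True)
    expected = hmac.new(
        SIGNING_SECRET.encode(),
        f"{timestamp}.{payload}".encode(),
        hashlib.sha256,
    ).hexdigest()
    if not hmac.compare_digest(expected, signature):
        abort(401)
    # Acknowledge quickly with a 2xx; do any slow work asynchronously,
    # then POST the decision to the payload's callback_url.
    return "", 204
```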
## Security & RBAC
### Dashboard Access
HITL access is controlled at the deployment level:
| Permission | Capability |
|------------|------------|
| `manage_human_feedback` | Configure HITL settings, view all requests |
| `respond_to_human_feedback` | Respond to requests, view assigned requests |
### Email Response Authorization
For email replies:
1. The reply-to token encodes the authorized email
2. Sender email must match the token's email
3. Token must not be expired (7-day default)
4. Request must still be pending
### Audit Trail
All HITL actions are logged:
- Request creation
- Assignment changes
- Response submission (with source: dashboard/email/API)
- Flow resume status
## Troubleshooting
### Emails Not Sending
1. Check "Email Notifications" is enabled in configuration
2. Verify routing rules match the method name
3. Verify assignee email is valid
4. Check deployment creator fallback if no routing rules match
### Email Replies Not Processing
1. Check token hasn't expired (7-day default)
2. Verify sender email matches assigned email
3. Ensure request is still pending (not already responded)
### Flow Not Resuming
1. Check request status in dashboard
2. Verify callback URL is accessible
3. Ensure deployment is still running
## Best Practices
<Tip>
**Start Simple**: Begin with email notifications to deployment creator, then add routing rules as your workflows mature.
</Tip>
1. **Use Dynamic Assignment**: Pull assignee emails from your flow state for flexible routing.
2. **Configure Auto-Response**: Set up a fallback for non-critical reviews to prevent flows from hanging.
3. **Monitor Response Times**: Use analytics to identify bottlenecks and optimize your review process.
4. **Keep Review Messages Clear**: Write clear, actionable messages in the `@human_feedback` decorator.
5. **Test Email Flow**: Send test requests to verify email delivery before going to production.
## Related Resources
<CardGroup cols={2}>
<Card title="Human Feedback in Flows" icon="code" href="/en/learn/human-feedback-in-flows">
Implementation guide for the `@human_feedback` decorator
</Card>
<Card title="Flow HITL Workflow Guide" icon="route" href="/en/enterprise/guides/human-in-the-loop">
Step-by-step guide for setting up HITL workflows
</Card>
<Card title="RBAC Configuration" icon="shield-check" href="/en/enterprise/features/rbac">
Configure role-based access control for your organization
</Card>
<Card title="Webhook Streaming" icon="bolt" href="/en/enterprise/features/webhook-streaming">
Set up real-time event notifications
</Card>
</CardGroup>

View File

@@ -5,54 +5,9 @@ icon: "user-check"
mode: "wide"
---
-Human-In-The-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. This guide shows you how to implement HITL within CrewAI Enterprise.
+Human-In-The-Loop (HITL) is a powerful approach that combines artificial intelligence with human expertise to enhance decision-making and improve task outcomes. This guide shows you how to implement HITL within CrewAI.
-## HITL Approaches in CrewAI
-CrewAI offers two approaches for implementing human-in-the-loop workflows:
-| Approach | Best For | Version |
-|----------|----------|---------|
-| **Flow-based** (`@human_feedback` decorator) | Production with Enterprise UI, email-first workflows, full platform features | **1.8.0+** |
-| **Webhook-based** | Custom integrations, external systems (Slack, Teams, etc.), legacy setups | All versions |
-## Flow-Based HITL with Enterprise Platform
-<Note>
-The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
-</Note>
-When using the `@human_feedback` decorator in your Flows, CrewAI Enterprise provides an **email-first HITL system** that enables anyone with an email address to respond to review requests:
-<CardGroup cols={2}>
-<Card title="Email-First Design" icon="envelope">
-Responders receive email notifications and can reply directly—no login required.
-</Card>
-<Card title="Dashboard Review" icon="desktop">
-Review and respond to HITL requests in the Enterprise dashboard when preferred.
-</Card>
-<Card title="Flexible Routing" icon="route">
-Route requests to specific emails based on method patterns or pull from flow state.
-</Card>
-<Card title="Auto-Response" icon="clock">
-Configure automatic fallback responses when no human replies within the timeout.
-</Card>
-</CardGroup>
-### Key Benefits
-- **External responders**: Anyone with an email can respond, even non-platform users
-- **Dynamic assignment**: Pull assignee email from flow state (e.g., `account_owner_email`)
-- **Simple configuration**: Email-based routing is easier to set up than user/role management
-- **Deployment creator fallback**: If no routing rule matches, the deployment creator is notified
-<Tip>
-For implementation details on the `@human_feedback` decorator, see the [Human Feedback in Flows](/en/learn/human-feedback-in-flows) guide.
-</Tip>
-## Setting Up Webhook-Based HITL Workflows
-For custom integrations with external systems like Slack, Microsoft Teams, or your own applications, you can use the webhook-based approach:
+## Setting Up HITL Workflows
<Steps>
<Step title="Configure Your Task">
@@ -144,14 +99,3 @@ HITL workflows are particularly valuable for:
- Sensitive or high-stakes operations
- Creative tasks requiring human judgment
- Compliance and regulatory reviews
-## Learn More
-<CardGroup cols={2}>
-<Card title="Flow HITL Management" icon="users-gear" href="/en/enterprise/features/flow-hitl-management">
-Explore the full Enterprise Flow HITL platform capabilities including email notifications, routing rules, auto-response, and analytics.
-</Card>
-<Card title="Human Feedback in Flows" icon="code" href="/en/learn/human-feedback-in-flows">
-Implementation guide for the `@human_feedback` decorator in your Flows.
-</Card>
-</CardGroup>

View File

@@ -151,9 +151,3 @@ HITL workflows are particularly valuable for:
- Sensitive or high-stakes operations
- Creative tasks requiring human judgment
- Compliance and regulatory reviews
-## Enterprise Features
-<Card title="Flow HITL Management Platform" icon="users-gear" href="/en/enterprise/features/flow-hitl-management">
-CrewAI Enterprise provides a comprehensive HITL management system for Flows with in-platform review, responder assignment, permissions, escalation policies, SLA management, dynamic routing, and full analytics. [Learn more →](/en/enterprise/features/flow-hitl-management)
-</Card>

Binary file not shown. (before: 251 KiB)

Binary file not shown. (before: 263 KiB)

Binary file not shown. (before: 55 KiB)

Binary file not shown. (before: 405 KiB)

Binary file not shown. (before: 156 KiB)

Binary file not shown. (before: 83 KiB)

View File

@@ -1,563 +0,0 @@
(Deleted file: the Korean translation of the Flow HITL Management page. Its content mirrors the English page shown above, with links under `/ko/...`.)

View File

@@ -5,54 +5,9 @@ icon: "user-check"
(Korean translation of the Human-in-the-Loop guide, receiving the same edits as the English guide above: the "CrewAI Enterprise" wording becomes "CrewAI", and the HITL-approaches table, the Flow-based HITL section, the webhook-specific heading, and the Learn More cards are removed.)

View File

(Korean translation of the same page edit shown above: the Enterprise Features card linking to Flow HITL Management is removed.)

View File

@@ -1,563 +0,0 @@
(Deleted file: the Brazilian Portuguese translation of the Flow HITL Management page. Its content mirrors the English page shown above, with links under `/pt-BR/...`.)

View File

@@ -5,54 +5,9 @@ icon: "user-check"
mode: "wide"
---
Human-In-The-Loop (HITL) é uma abordagem poderosa que combina inteligência artificial com expertise humana para aprimorar a tomada de decisão e melhorar os resultados das tarefas. Este guia mostra como implementar HITL dentro do CrewAI Enterprise.
Human-In-The-Loop (HITL) é uma abordagem poderosa que combina inteligência artificial com expertise humana para aprimorar a tomada de decisão e melhorar os resultados das tarefas. Este guia mostra como implementar HITL dentro do CrewAI.
## HITL Approaches in CrewAI
CrewAI offers two approaches to implementing human-in-the-loop workflows:
| Approach | Best For | Version |
|----------|----------|---------|
| **Flow-based** (`@human_feedback` decorator) | Production with the Enterprise UI, email-first workflows, full platform features | **1.8.0+** |
| **Webhook-based** | Custom integrations, external systems (Slack, Teams, etc.), legacy setups | All versions |
## Flow-Based HITL with the Enterprise Platform
<Note>
The `@human_feedback` decorator requires **CrewAI version 1.8.0 or higher**.
</Note>
When you use the `@human_feedback` decorator in your Flows, CrewAI Enterprise provides an **email-first HITL system** that lets anyone with an email address respond to review requests:
<CardGroup cols={2}>
<Card title="Email-First Design" icon="envelope">
Responders receive email notifications and can reply directly, with no login required.
</Card>
<Card title="Dashboard Review" icon="desktop">
Review and respond to HITL requests in the Enterprise dashboard when you prefer.
</Card>
<Card title="Flexible Routing" icon="route">
Route requests to specific emails based on method patterns, or pull them from flow state.
</Card>
<Card title="Auto-Response" icon="clock">
Configure automatic fallback responses when no human replies within the timeout.
</Card>
</CardGroup>
### Key Benefits
- **External responders**: Anyone with an email address can respond, even non-platform users
- **Dynamic assignment**: Pull the assignee's email from flow state (e.g., `account_owner_email`), as in the sketch below
- **Simple configuration**: Email-based routing is easier to set up than user/role management
- **Deployment creator fallback**: If no routing rule matches, the deployment creator is notified
<Tip>
For implementation details of the `@human_feedback` decorator, see the [Human Feedback in Flows](/pt-BR/learn/human-feedback-in-flows) guide.
</Tip>
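For a taste of the flow-side code, here is a minimal, illustrative review point. The `human_feedback` and `HumanFeedbackResult` names are the real 1.8.0+ imports, but the decorator keyword and the result attributes (`approved`, `feedback`) are assumptions for this sketch; the guide above documents the actual API:
```python
from crewai.flow.flow import Flow, listen, start
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult

class ReportReviewFlow(Flow):
    @start()
    def write_report(self):
        return "Q1 revenue summary..."

    @human_feedback(message="Approve this report for publication?")  # kwarg assumed
    def review(self, report):
        return report  # flow pauses here until a human (or auto-response) replies

    @listen(review)
    def publish(self, result: HumanFeedbackResult):
        # Branch on the reviewer's decision (attribute names assumed)
        if result.approved:
            print("Publishing report")
        else:
            print(f"Revision requested: {result.feedback}")
```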
## Setting Up Webhook-Based HITL Workflows
For custom integrations with external systems such as Slack, Microsoft Teams, or your own applications, you can use the webhook-based approach:
## Setting Up HITL Workflows
<Steps>
<Step title="Configure Your Task">
@@ -144,14 +99,3 @@ HITL workflows are particularly valuable for:
- Sensitive or high-risk operations
- Creative tasks that require human judgment
- Compliance and regulatory reviews
## Learn More
<CardGroup cols={2}>
<Card title="Flow HITL Management" icon="users-gear" href="/pt-BR/enterprise/features/flow-hitl-management">
Explore the full Flow HITL platform features, including email notifications, routing rules, auto-response, and analytics.
</Card>
<Card title="Human Feedback in Flows" icon="code" href="/pt-BR/learn/human-feedback-in-flows">
Implementation guide for the `@human_feedback` decorator in your Flows.
</Card>
</CardGroup>

View File

@@ -112,9 +112,3 @@ HITL workflows are particularly valuable for:
- Sensitive or high-risk operations
- Creative tasks that require human judgment
- Compliance and regulatory reviews
## Enterprise Features
<Card title="Flow HITL Management Platform" icon="users-gear" href="/pt-BR/enterprise/features/flow-hitl-management">
CrewAI Enterprise offers a comprehensive Flow HITL management system with in-platform review, responder assignment, permissions, escalation policies, SLA management, dynamic routing, and full analytics. [Learn more →](/pt-BR/enterprise/features/flow-hitl-management)
</Card>

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.9.2"
__version__ = "1.9.0"

View File

@@ -12,7 +12,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.9.2",
"crewai==1.9.0",
"lancedb~=0.5.4",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",

View File

@@ -291,4 +291,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.9.2"
__version__ = "1.9.0"

View File

@@ -1,11 +1,10 @@
"""Crewai Enterprise Tools."""
import json
import os
from typing import Any
import json
import re
from typing import Any, Optional, Union, cast, get_origin
from crewai.tools import BaseTool
from crewai.utilities.pydantic_schema_utils import create_model_from_schema
from pydantic import Field, create_model
import requests
@@ -15,6 +14,77 @@ from crewai_tools.tools.crewai_platform_tools.misc import (
)
class AllOfSchemaAnalyzer:
"""Helper class to analyze and merge allOf schemas."""
def __init__(self, schemas: list[dict[str, Any]]):
self.schemas = schemas
self._explicit_types: list[str] = []
self._merged_properties: dict[str, Any] = {}
self._merged_required: list[str] = []
self._analyze_schemas()
def _analyze_schemas(self) -> None:
"""Analyze all schemas and extract relevant information."""
for schema in self.schemas:
if "type" in schema:
self._explicit_types.append(schema["type"])
# Merge object properties
if schema.get("type") == "object" and "properties" in schema:
self._merged_properties.update(schema["properties"])
if "required" in schema:
self._merged_required.extend(schema["required"])
def has_consistent_type(self) -> bool:
"""Check if all schemas have the same explicit type."""
return len(set(self._explicit_types)) == 1 if self._explicit_types else False
def get_consistent_type(self) -> type[Any]:
"""Get the consistent type if all schemas agree."""
if not self.has_consistent_type():
raise ValueError("No consistent type found")
type_mapping = {
"string": str,
"integer": int,
"number": float,
"boolean": bool,
"array": list,
"object": dict,
"null": type(None),
}
return type_mapping.get(self._explicit_types[0], str)
def has_object_schemas(self) -> bool:
"""Check if any schemas are object types with properties."""
return bool(self._merged_properties)
def get_merged_properties(self) -> dict[str, Any]:
"""Get merged properties from all object schemas."""
return self._merged_properties
def get_merged_required_fields(self) -> list[str]:
"""Get merged required fields from all object schemas."""
return list(set(self._merged_required)) # Remove duplicates
def get_fallback_type(self) -> type[Any]:
"""Get a fallback type when merging fails."""
if self._explicit_types:
# Use the first explicit type
type_mapping = {
"string": str,
"integer": int,
"number": float,
"boolean": bool,
"array": list,
"object": dict,
"null": type(None),
}
return type_mapping.get(self._explicit_types[0], str)
return str
class CrewAIPlatformActionTool(BaseTool):
action_name: str = Field(default="", description="The name of the action")
action_schema: dict[str, Any] = Field(
@@ -27,19 +97,42 @@ class CrewAIPlatformActionTool(BaseTool):
action_name: str,
action_schema: dict[str, Any],
):
parameters = action_schema.get("function", {}).get("parameters", {})
self._model_registry: dict[str, type[Any]] = {}
self._base_name = self._sanitize_name(action_name)
schema_props, required = self._extract_schema_info(action_schema)
field_definitions: dict[str, Any] = {}
for param_name, param_details in schema_props.items():
param_desc = param_details.get("description", "")
is_required = param_name in required
if parameters and parameters.get("properties"):
try:
if "title" not in parameters:
parameters = {**parameters, "title": f"{action_name}Schema"}
if "type" not in parameters:
parameters = {**parameters, "type": "object"}
args_schema = create_model_from_schema(parameters)
field_type = self._process_schema_type(
param_details, self._sanitize_name(param_name).title()
)
except Exception:
args_schema = create_model(f"{action_name}Schema")
field_type = str
field_definitions[param_name] = self._create_field_definition(
field_type, is_required, param_desc
)
if field_definitions:
try:
args_schema = create_model(
f"{self._base_name}Schema", **field_definitions
)
except Exception:
args_schema = create_model(
f"{self._base_name}Schema",
input_text=(str, Field(description="Input for the action")),
)
else:
args_schema = create_model(f"{action_name}Schema")
args_schema = create_model(
f"{self._base_name}Schema",
input_text=(str, Field(description="Input for the action")),
)
super().__init__(
name=action_name.lower().replace(" ", "_"),
@@ -49,12 +142,285 @@ class CrewAIPlatformActionTool(BaseTool):
self.action_name = action_name
self.action_schema = action_schema
def _run(self, **kwargs: Any) -> str:
@staticmethod
def _sanitize_name(name: str) -> str:
name = name.lower().replace(" ", "_")
sanitized = re.sub(r"[^a-zA-Z0-9_]", "", name)
parts = sanitized.split("_")
return "".join(word.capitalize() for word in parts if word)
@staticmethod
def _extract_schema_info(
action_schema: dict[str, Any],
) -> tuple[dict[str, Any], list[str]]:
schema_props = (
action_schema.get("function", {})
.get("parameters", {})
.get("properties", {})
)
required = (
action_schema.get("function", {}).get("parameters", {}).get("required", [])
)
return schema_props, required
def _process_schema_type(self, schema: dict[str, Any], type_name: str) -> type[Any]:
"""
Process a JSON Schema type definition into a Python type.
Handles complex schema constructs like anyOf, oneOf, allOf, enums, arrays, and objects.
"""
# Handle composite schema types (anyOf, oneOf, allOf)
if composite_type := self._process_composite_schema(schema, type_name):
return composite_type
# Handle primitive types and simple constructs
return self._process_primitive_schema(schema, type_name)
def _process_composite_schema(
self, schema: dict[str, Any], type_name: str
) -> type[Any] | None:
"""Process composite schema types: anyOf, oneOf, allOf."""
if "anyOf" in schema:
return self._process_any_of_schema(schema["anyOf"], type_name)
if "oneOf" in schema:
return self._process_one_of_schema(schema["oneOf"], type_name)
if "allOf" in schema:
return self._process_all_of_schema(schema["allOf"], type_name)
return None
def _process_any_of_schema(
self, any_of_types: list[dict[str, Any]], type_name: str
) -> type[Any]:
"""Process anyOf schema - creates Union of possible types."""
is_nullable = any(t.get("type") == "null" for t in any_of_types)
non_null_types = [t for t in any_of_types if t.get("type") != "null"]
if not non_null_types:
return cast(
type[Any], cast(object, str | None)
) # fallback for only-null case
base_type = (
self._process_schema_type(non_null_types[0], type_name)
if len(non_null_types) == 1
else self._create_union_type(non_null_types, type_name, "AnyOf")
)
return base_type | None if is_nullable else base_type # type: ignore[return-value]
def _process_one_of_schema(
self, one_of_types: list[dict[str, Any]], type_name: str
) -> type[Any]:
"""Process oneOf schema - creates Union of mutually exclusive types."""
return (
self._process_schema_type(one_of_types[0], type_name)
if len(one_of_types) == 1
else self._create_union_type(one_of_types, type_name, "OneOf")
)
def _process_all_of_schema(
self, all_of_schemas: list[dict[str, Any]], type_name: str
) -> type[Any]:
"""Process allOf schema - merges schemas that must all be satisfied."""
if len(all_of_schemas) == 1:
return self._process_schema_type(all_of_schemas[0], type_name)
return self._merge_all_of_schemas(all_of_schemas, type_name)
def _create_union_type(
self, schemas: list[dict[str, Any]], type_name: str, prefix: str
) -> type[Any]:
"""Create a Union type from multiple schemas."""
return Union[ # type: ignore # noqa: UP007
tuple(
self._process_schema_type(schema, f"{type_name}{prefix}{i}")
for i, schema in enumerate(schemas)
)
]
def _process_primitive_schema(
self, schema: dict[str, Any], type_name: str
) -> type[Any]:
"""Process primitive schema types: string, number, array, object, etc."""
json_type = schema.get("type", "string")
if "enum" in schema:
return self._process_enum_schema(schema, json_type)
if json_type == "array":
return self._process_array_schema(schema, type_name)
if json_type == "object":
return self._create_nested_model(schema, type_name)
return self._map_json_type_to_python(json_type)
def _process_enum_schema(self, schema: dict[str, Any], json_type: str) -> type[Any]:
"""Process enum schema - currently falls back to base type."""
enum_values = schema["enum"]
if not enum_values:
return self._map_json_type_to_python(json_type)
# For Literal types, we need to pass the values directly, not as a tuple
# This is a workaround since we can't dynamically create Literal types easily
# Fall back to the base JSON type for now
return self._map_json_type_to_python(json_type)
def _process_array_schema(
self, schema: dict[str, Any], type_name: str
) -> type[Any]:
items_schema = schema.get("items", {"type": "string"})
item_type = self._process_schema_type(items_schema, f"{type_name}Item")
return list[item_type] # type: ignore
def _merge_all_of_schemas(
self, schemas: list[dict[str, Any]], type_name: str
) -> type[Any]:
schema_analyzer = AllOfSchemaAnalyzer(schemas)
if schema_analyzer.has_consistent_type():
return schema_analyzer.get_consistent_type()
if schema_analyzer.has_object_schemas():
return self._create_merged_object_model(
schema_analyzer.get_merged_properties(),
schema_analyzer.get_merged_required_fields(),
type_name,
)
return schema_analyzer.get_fallback_type()
def _create_merged_object_model(
self, properties: dict[str, Any], required: list[str], model_name: str
) -> type[Any]:
full_model_name = f"{self._base_name}{model_name}AllOf"
if full_model_name in self._model_registry:
return self._model_registry[full_model_name]
if not properties:
return dict
field_definitions = self._build_field_definitions(
properties, required, model_name
)
try:
merged_model = create_model(full_model_name, **field_definitions)
self._model_registry[full_model_name] = merged_model
return merged_model
except Exception:
return dict
def _build_field_definitions(
self, properties: dict[str, Any], required: list[str], model_name: str
) -> dict[str, Any]:
field_definitions = {}
for prop_name, prop_schema in properties.items():
prop_desc = prop_schema.get("description", "")
is_required = prop_name in required
try:
prop_type = self._process_schema_type(
prop_schema, f"{model_name}{self._sanitize_name(prop_name).title()}"
)
except Exception:
prop_type = str
field_definitions[prop_name] = self._create_field_definition(
prop_type, is_required, prop_desc
)
return field_definitions
def _create_nested_model(
self, schema: dict[str, Any], model_name: str
) -> type[Any]:
full_model_name = f"{self._base_name}{model_name}"
if full_model_name in self._model_registry:
return self._model_registry[full_model_name]
properties = schema.get("properties", {})
required_fields = schema.get("required", [])
if not properties:
return dict
field_definitions = {}
for prop_name, prop_schema in properties.items():
prop_desc = prop_schema.get("description", "")
is_required = prop_name in required_fields
try:
prop_type = self._process_schema_type(
prop_schema, f"{model_name}{self._sanitize_name(prop_name).title()}"
)
except Exception:
prop_type = str
field_definitions[prop_name] = self._create_field_definition(
prop_type, is_required, prop_desc
)
try:
nested_model = create_model(full_model_name, **field_definitions) # type: ignore
self._model_registry[full_model_name] = nested_model
return nested_model
except Exception:
return dict
def _create_field_definition(
self, field_type: type[Any], is_required: bool, description: str
) -> tuple:
if is_required:
return (field_type, Field(description=description))
if get_origin(field_type) is Union:
return (field_type, Field(default=None, description=description))
return (
Optional[field_type], # noqa: UP045
Field(default=None, description=description),
)
def _map_json_type_to_python(self, json_type: str) -> type[Any]:
type_mapping = {
"string": str,
"integer": int,
"number": float,
"boolean": bool,
"array": list,
"object": dict,
"null": type(None),
}
return type_mapping.get(json_type, str)
def _get_required_nullable_fields(self) -> list[str]:
schema_props, required = self._extract_schema_info(self.action_schema)
required_nullable_fields = []
for param_name in required:
param_details = schema_props.get(param_name, {})
if self._is_nullable_type(param_details):
required_nullable_fields.append(param_name)
return required_nullable_fields
def _is_nullable_type(self, schema: dict[str, Any]) -> bool:
if "anyOf" in schema:
return any(t.get("type") == "null" for t in schema["anyOf"])
return schema.get("type") == "null"
def _run(self, **kwargs) -> str:
try:
cleaned_kwargs = {
key: value for key, value in kwargs.items() if value is not None
}
required_nullable_fields = self._get_required_nullable_fields()
for field_name in required_nullable_fields:
if field_name not in cleaned_kwargs:
cleaned_kwargs[field_name] = None
api_url = (
f"{get_platform_api_base_url()}/actions/{self.action_name}/execute"
)
@@ -63,9 +429,7 @@ class CrewAIPlatformActionTool(BaseTool):
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
payload = {
"integration": cleaned_kwargs if cleaned_kwargs else {"_noop": True}
}
payload = cleaned_kwargs
response = requests.post(
url=api_url,
@@ -77,14 +441,7 @@ class CrewAIPlatformActionTool(BaseTool):
data = response.json()
if not response.ok:
if isinstance(data, dict):
error_info = data.get("error", {})
if isinstance(error_info, dict):
error_message = error_info.get("message", json.dumps(data))
else:
error_message = str(error_info)
else:
error_message = str(data)
error_message = data.get("error", {}).get("message", json.dumps(data))
return f"API request failed: {error_message}"
return json.dumps(data, indent=2)

View File

@@ -1,10 +1,5 @@
"""CrewAI platform tool builder for fetching and creating action tools."""
import logging
import os
from types import TracebackType
from typing import Any
import os
from crewai.tools import BaseTool
import requests
@@ -17,29 +12,22 @@ from crewai_tools.tools.crewai_platform_tools.misc import (
)
logger = logging.getLogger(__name__)
class CrewaiPlatformToolBuilder:
"""Builds platform tools from remote action schemas."""
def __init__(
self,
apps: list[str],
) -> None:
):
self._apps = apps
self._actions_schema: dict[str, dict[str, Any]] = {}
self._tools: list[BaseTool] | None = None
self._actions_schema = {} # type: ignore[var-annotated]
self._tools = None
def tools(self) -> list[BaseTool]:
"""Fetch actions and return built tools."""
if self._tools is None:
self._fetch_actions()
self._create_tools()
return self._tools if self._tools is not None else []
def _fetch_actions(self) -> None:
"""Fetch action schemas from the platform API."""
def _fetch_actions(self):
actions_url = f"{get_platform_api_base_url()}/actions"
headers = {"Authorization": f"Bearer {get_platform_integration_token()}"}
@@ -52,8 +40,7 @@ class CrewaiPlatformToolBuilder:
verify=os.environ.get("CREWAI_FACTORY", "false").lower() != "true",
)
response.raise_for_status()
except Exception as e:
logger.error(f"Failed to fetch platform tools for apps {self._apps}: {e}")
except Exception:
return
raw_data = response.json()
@@ -64,8 +51,6 @@ class CrewaiPlatformToolBuilder:
for app, action_list in action_categories.items():
if isinstance(action_list, list):
for action in action_list:
if not isinstance(action, dict):
continue
if action_name := action.get("name"):
action_schema = {
"function": {
@@ -79,16 +64,72 @@ class CrewaiPlatformToolBuilder:
}
self._actions_schema[action_name] = action_schema
def _create_tools(self) -> None:
"""Create tool instances from fetched action schemas."""
tools: list[BaseTool] = []
def _generate_detailed_description(
self, schema: dict[str, Any], indent: int = 0
) -> list[str]:
descriptions = []
indent_str = " " * indent
schema_type = schema.get("type", "string")
if schema_type == "object":
properties = schema.get("properties", {})
required_fields = schema.get("required", [])
if properties:
descriptions.append(f"{indent_str}Object with properties:")
for prop_name, prop_schema in properties.items():
prop_desc = prop_schema.get("description", "")
is_required = prop_name in required_fields
req_str = " (required)" if is_required else " (optional)"
descriptions.append(
f"{indent_str} - {prop_name}: {prop_desc}{req_str}"
)
if prop_schema.get("type") == "object":
descriptions.extend(
self._generate_detailed_description(prop_schema, indent + 2)
)
elif prop_schema.get("type") == "array":
items_schema = prop_schema.get("items", {})
if items_schema.get("type") == "object":
descriptions.append(f"{indent_str} Array of objects:")
descriptions.extend(
self._generate_detailed_description(
items_schema, indent + 3
)
)
elif "enum" in items_schema:
descriptions.append(
f"{indent_str} Array of enum values: {items_schema['enum']}"
)
elif "enum" in prop_schema:
descriptions.append(
f"{indent_str} Enum values: {prop_schema['enum']}"
)
return descriptions
def _create_tools(self):
tools = []
for action_name, action_schema in self._actions_schema.items():
function_details = action_schema.get("function", {})
description = function_details.get("description", f"Execute {action_name}")
parameters = function_details.get("parameters", {})
param_descriptions = []
if parameters.get("properties"):
param_descriptions.append("\nDetailed Parameter Structure:")
param_descriptions.extend(
self._generate_detailed_description(parameters)
)
full_description = description + "\n".join(param_descriptions)
tool = CrewAIPlatformActionTool(
description=description,
description=full_description,
action_name=action_name,
action_schema=action_schema,
)
@@ -97,14 +138,8 @@ class CrewaiPlatformToolBuilder:
self._tools = tools
def __enter__(self) -> list[BaseTool]:
"""Enter context manager and return tools."""
def __enter__(self):
return self.tools()
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
"""Exit context manager."""
def __exit__(self, exc_type, exc_val, exc_tb):
pass

View File

@@ -1,3 +1,4 @@
from typing import Union, get_args, get_origin
from unittest.mock import patch, Mock
import os
@@ -6,6 +7,251 @@ from crewai_tools.tools.crewai_platform_tools.crewai_platform_action_tool import
)
class TestSchemaProcessing:
def setup_method(self):
self.base_action_schema = {
"function": {
"parameters": {
"properties": {},
"required": []
}
}
}
def create_test_tool(self, action_name="test_action"):
return CrewAIPlatformActionTool(
description="Test tool",
action_name=action_name,
action_schema=self.base_action_schema
)
def test_anyof_multiple_types(self):
tool = self.create_test_tool()
test_schema = {
"anyOf": [
{"type": "string"},
{"type": "number"},
{"type": "integer"}
]
}
result_type = tool._process_schema_type(test_schema, "TestField")
assert get_origin(result_type) is Union
args = get_args(result_type)
expected_types = (str, float, int)
for expected_type in expected_types:
assert expected_type in args
def test_anyof_with_null(self):
tool = self.create_test_tool()
test_schema = {
"anyOf": [
{"type": "string"},
{"type": "number"},
{"type": "null"}
]
}
result_type = tool._process_schema_type(test_schema, "TestFieldNullable")
assert get_origin(result_type) is Union
args = get_args(result_type)
assert type(None) in args
assert str in args
assert float in args
def test_anyof_single_type(self):
tool = self.create_test_tool()
test_schema = {
"anyOf": [
{"type": "string"}
]
}
result_type = tool._process_schema_type(test_schema, "TestFieldSingle")
assert result_type is str
def test_oneof_multiple_types(self):
tool = self.create_test_tool()
test_schema = {
"oneOf": [
{"type": "string"},
{"type": "boolean"}
]
}
result_type = tool._process_schema_type(test_schema, "TestFieldOneOf")
assert get_origin(result_type) is Union
args = get_args(result_type)
expected_types = (str, bool)
for expected_type in expected_types:
assert expected_type in args
def test_oneof_single_type(self):
tool = self.create_test_tool()
test_schema = {
"oneOf": [
{"type": "integer"}
]
}
result_type = tool._process_schema_type(test_schema, "TestFieldOneOfSingle")
assert result_type is int
def test_basic_types(self):
tool = self.create_test_tool()
test_cases = [
({"type": "string"}, str),
({"type": "integer"}, int),
({"type": "number"}, float),
({"type": "boolean"}, bool),
({"type": "array", "items": {"type": "string"}}, list),
]
for schema, expected_type in test_cases:
result_type = tool._process_schema_type(schema, "TestField")
if schema["type"] == "array":
assert get_origin(result_type) is list
else:
assert result_type is expected_type
def test_enum_handling(self):
tool = self.create_test_tool()
test_schema = {
"type": "string",
"enum": ["option1", "option2", "option3"]
}
result_type = tool._process_schema_type(test_schema, "TestFieldEnum")
assert result_type is str
def test_nested_anyof(self):
tool = self.create_test_tool()
test_schema = {
"anyOf": [
{"type": "string"},
{
"anyOf": [
{"type": "integer"},
{"type": "boolean"}
]
}
]
}
result_type = tool._process_schema_type(test_schema, "TestFieldNested")
assert get_origin(result_type) is Union
args = get_args(result_type)
assert str in args
if len(args) == 3:
assert int in args
assert bool in args
else:
nested_union = next(arg for arg in args if get_origin(arg) is Union)
nested_args = get_args(nested_union)
assert int in nested_args
assert bool in nested_args
def test_allof_same_types(self):
tool = self.create_test_tool()
test_schema = {
"allOf": [
{"type": "string"},
{"type": "string", "maxLength": 100}
]
}
result_type = tool._process_schema_type(test_schema, "TestFieldAllOfSame")
assert result_type is str
def test_allof_object_merge(self):
tool = self.create_test_tool()
test_schema = {
"allOf": [
{
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "integer"}
},
"required": ["name"]
},
{
"type": "object",
"properties": {
"email": {"type": "string"},
"age": {"type": "integer"}
},
"required": ["email"]
}
]
}
result_type = tool._process_schema_type(test_schema, "TestFieldAllOfMerged")
# Should create a merged model with all properties
# The implementation might fall back to dict if model creation fails
# Let's just verify it's not a basic scalar type
assert result_type is not str
assert result_type is not int
assert result_type is not bool
# It could be dict (fallback) or a proper model class
assert result_type in (dict, type) or hasattr(result_type, '__name__')
def test_allof_single_schema(self):
"""Test that allOf with single schema works correctly."""
tool = self.create_test_tool()
test_schema = {
"allOf": [
{"type": "boolean"}
]
}
result_type = tool._process_schema_type(test_schema, "TestFieldAllOfSingle")
# Should be just bool
assert result_type is bool
def test_allof_mixed_types(self):
tool = self.create_test_tool()
test_schema = {
"allOf": [
{"type": "string"},
{"type": "integer"}
]
}
result_type = tool._process_schema_type(test_schema, "TestFieldAllOfMixed")
assert result_type is str
class TestCrewAIPlatformActionToolVerify:
"""Test suite for SSL verification behavior based on CREWAI_FACTORY environment variable"""

View File

@@ -224,6 +224,43 @@ class TestCrewaiPlatformToolBuilder(unittest.TestCase):
_, kwargs = mock_get.call_args
assert kwargs["params"]["apps"] == ""
def test_detailed_description_generation(self):
builder = CrewaiPlatformToolBuilder(apps=["test"])
complex_schema = {
"type": "object",
"properties": {
"simple_string": {"type": "string", "description": "A simple string"},
"nested_object": {
"type": "object",
"properties": {
"inner_prop": {
"type": "integer",
"description": "Inner property",
}
},
"description": "Nested object",
},
"array_prop": {
"type": "array",
"items": {"type": "string"},
"description": "Array of strings",
},
},
}
descriptions = builder._generate_detailed_description(complex_schema)
assert isinstance(descriptions, list)
assert len(descriptions) > 0
description_text = "\n".join(descriptions)
assert "simple_string" in description_text
assert "nested_object" in description_text
assert "array_prop" in description_text
class TestCrewaiPlatformToolBuilderVerify(unittest.TestCase):
"""Test suite for SSL verification behavior in CrewaiPlatformToolBuilder"""

View File

@@ -49,7 +49,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.9.2",
"crewai-tools==1.9.0",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.9.2"
__version__ = "1.9.0"
_telemetry_submitted = False

View File

@@ -37,8 +37,7 @@ class CrewAgentExecutorMixin:
self.crew
and self.agent
and self.task
and f"Action: {sanitize_tool_name('Delegate work to coworker')}"
not in output.text
and f"Action: {sanitize_tool_name('Delegate work to coworker')}" not in output.text
):
try:
if (
@@ -133,11 +132,10 @@ class CrewAgentExecutorMixin:
and self.crew._long_term_memory
and self.crew._entity_memory is None
):
if self.agent and self.agent.verbose:
self._printer.print(
content="Long term memory is enabled, but entity memory is not enabled. Please configure entity memory or set memory=True to automatically enable it.",
color="bold_yellow",
)
self._printer.print(
content="Long term memory is enabled, but entity memory is not enabled. Please configure entity memory or set memory=True to automatically enable it.",
color="bold_yellow",
)
def _ask_human_input(self, final_answer: str) -> str:
"""Prompt human input with mode-appropriate messaging.

View File

@@ -28,11 +28,6 @@ from crewai.hooks.llm_hooks import (
get_after_llm_call_hooks,
get_before_llm_call_hooks,
)
from crewai.hooks.tool_hooks import (
ToolCallHookContext,
get_after_tool_call_hooks,
get_before_tool_call_hooks,
)
from crewai.utilities.agent_utils import (
aget_llm_response,
convert_tools_to_openai_schema,
@@ -206,14 +201,13 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
try:
formatted_answer = self._invoke_loop()
except AssertionError:
if self.agent.verbose:
self._printer.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
self._printer.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
raise
except Exception as e:
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
handle_unknown_error(self._printer, e)
raise
if self.ask_for_human_input:
@@ -328,7 +322,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
verbose=self.agent.verbose,
)
break
@@ -343,41 +336,22 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
verbose=self.agent.verbose,
)
# breakpoint()
if self.response_model is not None:
try:
if isinstance(answer, BaseModel):
output_json = answer.model_dump_json()
formatted_answer = AgentFinish(
thought="",
output=answer,
text=output_json,
)
else:
self.response_model.model_validate_json(answer)
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
except ValidationError:
# If validation fails, convert BaseModel to JSON string for parsing
answer_str = (
answer.model_dump_json()
if isinstance(answer, BaseModel)
else str(answer)
self.response_model.model_validate_json(answer)
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
except ValidationError:
formatted_answer = process_llm_response(
answer_str, self.use_stop_words
answer, self.use_stop_words
) # type: ignore[assignment]
else:
# When no response_model, answer should be a string
answer_str = str(answer) if not isinstance(answer, str) else answer
formatted_answer = process_llm_response(
answer_str, self.use_stop_words
) # type: ignore[assignment]
formatted_answer = process_llm_response(answer, self.use_stop_words) # type: ignore[assignment]
if isinstance(formatted_answer, AgentAction):
# Extract agent fingerprint if available
@@ -420,7 +394,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
iterations=self.iterations,
log_error_after=self.log_error_after,
printer=self._printer,
verbose=self.agent.verbose,
)
except Exception as e:
@@ -435,10 +408,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
verbose=self.agent.verbose,
)
continue
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
@@ -484,7 +456,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
verbose=self.agent.verbose,
)
self._show_logs(formatted_answer)
return formatted_answer
@@ -506,7 +477,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
verbose=self.agent.verbose,
)
# Check if the response is a list of tool calls
@@ -538,18 +508,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
if isinstance(answer, BaseModel):
output_json = answer.model_dump_json()
formatted_answer = AgentFinish(
thought="",
output=answer,
text=output_json,
)
self._invoke_step_callback(formatted_answer)
self._append_message(output_json)
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
@@ -572,10 +530,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
verbose=self.agent.verbose,
)
continue
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
@@ -597,23 +554,13 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
verbose=self.agent.verbose,
)
if isinstance(answer, BaseModel):
output_json = answer.model_dump_json()
formatted_answer = AgentFinish(
thought="",
output=answer,
text=output_json,
)
else:
answer_str = answer if isinstance(answer, str) else str(answer)
formatted_answer = AgentFinish(
thought="",
output=answer_str,
text=answer_str,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
@@ -802,42 +749,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
track_delegation_if_needed(func_name, args_dict, self.task)
# Find the structured tool for hook context
structured_tool: CrewStructuredTool | None = None
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
# Execute before_tool_call hooks
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=self.agent,
task=self.task,
crew=self.crew,
)
before_hooks = get_before_tool_call_hooks()
try:
for hook in before_hooks:
hook_result = hook(before_hook_context)
if hook_result is False:
hook_blocked = True
break
except Exception as hook_error:
if self.agent.verbose:
self._printer.print(
content=f"Error in before_tool_call hook: {hook_error}",
color="red",
)
# If hook blocked execution, set result and skip tool execution
if hook_blocked:
result = f"Tool execution blocked by hook. Tool: {func_name}"
# Execute the tool (only if not cached, not at max usage, and not blocked by hook)
elif not from_cache and not max_usage_reached:
# Execute the tool (only if not cached and not at max usage)
if not from_cache and not max_usage_reached:
result = "Tool not found"
if func_name in available_functions:
try:
@@ -885,29 +798,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
# Return error message when max usage limit is reached
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
after_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=self.agent,
task=self.task,
crew=self.crew,
tool_result=result,
)
after_hooks = get_after_tool_call_hooks()
try:
for after_hook in after_hooks:
after_hook_result = after_hook(after_hook_context)
if after_hook_result is not None:
result = after_hook_result
after_hook_context.tool_result = result
except Exception as hook_error:
if self.agent.verbose:
self._printer.print(
content=f"Error in after_tool_call hook: {hook_error}",
color="red",
)
# Emit tool usage finished event
crewai_event_bus.emit(
self,
@@ -992,14 +882,13 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
try:
formatted_answer = await self._ainvoke_loop()
except AssertionError:
if self.agent.verbose:
self._printer.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
self._printer.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
raise
except Exception as e:
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
handle_unknown_error(self._printer, e)
raise
if self.ask_for_human_input:
@@ -1050,7 +939,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
verbose=self.agent.verbose,
)
break
@@ -1065,41 +953,22 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
verbose=self.agent.verbose,
)
if self.response_model is not None:
try:
if isinstance(answer, BaseModel):
output_json = answer.model_dump_json()
formatted_answer = AgentFinish(
thought="",
output=answer,
text=output_json,
)
else:
self.response_model.model_validate_json(answer)
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
except ValidationError:
# If validation fails, convert BaseModel to JSON string for parsing
answer_str = (
answer.model_dump_json()
if isinstance(answer, BaseModel)
else str(answer)
self.response_model.model_validate_json(answer)
formatted_answer = AgentFinish(
thought="",
output=answer,
text=answer,
)
except ValidationError:
formatted_answer = process_llm_response(
answer_str, self.use_stop_words
answer, self.use_stop_words
) # type: ignore[assignment]
else:
# When no response_model, answer should be a string
answer_str = str(answer) if not isinstance(answer, str) else answer
formatted_answer = process_llm_response(
answer_str, self.use_stop_words
) # type: ignore[assignment]
formatted_answer = process_llm_response(answer, self.use_stop_words) # type: ignore[assignment]
if isinstance(formatted_answer, AgentAction):
fingerprint_context = {}
@@ -1141,7 +1010,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
iterations=self.iterations,
log_error_after=self.log_error_after,
printer=self._printer,
verbose=self.agent.verbose,
)
except Exception as e:
@@ -1155,10 +1023,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
verbose=self.agent.verbose,
)
continue
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
@@ -1198,7 +1065,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
messages=self.messages,
llm=self.llm,
callbacks=self.callbacks,
verbose=self.agent.verbose,
)
self._show_logs(formatted_answer)
return formatted_answer
@@ -1220,7 +1086,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
verbose=self.agent.verbose,
)
# Check if the response is a list of tool calls
if (
@@ -1251,18 +1116,6 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self._show_logs(formatted_answer)
return formatted_answer
if isinstance(answer, BaseModel):
output_json = answer.model_dump_json()
formatted_answer = AgentFinish(
thought="",
output=answer,
text=output_json,
)
self._invoke_step_callback(formatted_answer)
self._append_message(output_json)
self._show_logs(formatted_answer)
return formatted_answer
# Unexpected response type, treat as final answer
formatted_answer = AgentFinish(
thought="",
@@ -1285,10 +1138,9 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
verbose=self.agent.verbose,
)
continue
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
handle_unknown_error(self._printer, e)
raise e
finally:
self.iterations += 1
@@ -1310,23 +1162,13 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
from_agent=self.agent,
response_model=self.response_model,
executor_context=self,
verbose=self.agent.verbose,
)
if isinstance(answer, BaseModel):
output_json = answer.model_dump_json()
formatted_answer = AgentFinish(
thought="",
output=answer,
text=output_json,
)
else:
answer_str = answer if isinstance(answer, str) else str(answer)
formatted_answer = AgentFinish(
thought="",
output=answer_str,
text=answer_str,
)
formatted_answer = AgentFinish(
thought="",
output=str(answer),
text=str(answer),
)
self._show_logs(formatted_answer)
return formatted_answer
@@ -1437,11 +1279,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
)
if train_iteration is None or not isinstance(train_iteration, int):
if self.agent.verbose:
self._printer.print(
content="Invalid or missing train iteration. Cannot save training data.",
color="red",
)
self._printer.print(
content="Invalid or missing train iteration. Cannot save training data.",
color="red",
)
return
training_handler = CrewTrainingHandler(TRAINING_DATA_FILE)
@@ -1461,14 +1302,13 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
if train_iteration in agent_training_data:
agent_training_data[train_iteration]["improved_output"] = result.output
else:
if self.agent.verbose:
self._printer.print(
content=(
f"No existing training data for agent {agent_id} and iteration "
f"{train_iteration}. Cannot save improved output."
),
color="red",
)
self._printer.print(
content=(
f"No existing training data for agent {agent_id} and iteration "
f"{train_iteration}. Cannot save improved output."
),
color="red",
)
return
# Update the training data and save
@@ -1499,12 +1339,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
Returns:
Final answer after feedback.
"""
output_str = (
formatted_answer.output
if isinstance(formatted_answer.output, str)
else formatted_answer.output.model_dump_json()
)
human_feedback = self._ask_human_input(output_str)
human_feedback = self._ask_human_input(formatted_answer.output)
if self._is_training_mode():
return self._handle_training_feedback(formatted_answer, human_feedback)
@@ -1563,12 +1398,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.ask_for_human_input = False
else:
answer = self._process_feedback_iteration(feedback)
output_str = (
answer.output
if isinstance(answer.output, str)
else answer.output.model_dump_json()
)
feedback = self._ask_human_input(output_str)
feedback = self._ask_human_input(answer.output)
return answer

View File

@@ -8,7 +8,6 @@ AgentAction or AgentFinish objects.
from dataclasses import dataclass
from json_repair import repair_json # type: ignore[import-untyped]
from pydantic import BaseModel
from crewai.agents.constants import (
ACTION_INPUT_ONLY_REGEX,
@@ -41,7 +40,7 @@ class AgentFinish:
"""Represents the final answer from an agent."""
thought: str
output: str | BaseModel
output: str
text: str

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.9.2"
"crewai[tools]==1.9.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.9.2"
"crewai[tools]==1.9.0"
]
[project.scripts]

View File

@@ -193,13 +193,7 @@ class Crew(FlowTrackable, BaseModel):
tasks: list[Task] = Field(default_factory=list)
agents: list[BaseAgent] = Field(default_factory=list)
process: Process = Field(default=Process.sequential)
verbose: bool | None = Field(
default=None,
description=(
"Whether to enable verbose logging output. True=always enable, "
"False=always disable, None=check CREWAI_VERBOSE env var (defaults to True if not set)."
),
)
verbose: bool = Field(default=False)
memory: bool = Field(
default=False,
description="If crew should use memory to store memories of it's execution",
@@ -356,8 +350,6 @@ class Crew(FlowTrackable, BaseModel):
@model_validator(mode="after")
def set_private_attrs(self) -> Crew:
"""set private attributes."""
from crewai.utilities.logger_utils import should_enable_verbose
self._cache_handler = CacheHandler()
event_listener = EventListener()
@@ -365,11 +357,6 @@ class Crew(FlowTrackable, BaseModel):
tracing_enabled = should_enable_tracing(override=self.tracing)
set_tracing_enabled(tracing_enabled)
# Determine verbose setting (respects CREWAI_VERBOSE env var)
# Update self.verbose to the resolved boolean value so it can be used
# consistently throughout the class (e.g., in _create_manager_agent)
self.verbose = should_enable_verbose(override=self.verbose)
# Always setup trace listener - actual execution control is via contextvar
trace_listener = TraceCollectionListener()
trace_listener.setup_listeners(crewai_event_bus)

View File

@@ -119,11 +119,14 @@ To enable tracing, do any one of these:
self, content: Text, title: str, style: str = "blue", is_flow: bool = False
) -> None:
"""Print a panel with consistent formatting if verbose is enabled."""
if not self.verbose:
return
panel = self.create_panel(content, title, style)
self.print(panel)
self.print()
if is_flow:
self.print(panel)
self.print()
else:
if self.verbose:
self.print(panel)
self.print()
def handle_crew_status(
self,

View File

@@ -36,12 +36,6 @@ from crewai.hooks.llm_hooks import (
get_after_llm_call_hooks,
get_before_llm_call_hooks,
)
from crewai.hooks.tool_hooks import (
ToolCallHookContext,
get_after_tool_call_hooks,
get_before_tool_call_hooks,
)
from crewai.hooks.types import AfterLLMCallHookType, BeforeLLMCallHookType
from crewai.utilities.agent_utils import (
convert_tools_to_openai_schema,
enforce_rpm_limit,
@@ -191,8 +185,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._instance_id = str(uuid4())[:8]
self.before_llm_call_hooks: list[BeforeLLMCallHookType] = []
self.after_llm_call_hooks: list[AfterLLMCallHookType] = []
self.before_llm_call_hooks: list[Callable] = []
self.after_llm_call_hooks: list[Callable] = []
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
@@ -305,21 +299,11 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"""Compatibility property for mixin - returns state messages."""
return self._state.messages
@messages.setter
def messages(self, value: list[LLMMessage]) -> None:
"""Set state messages."""
self._state.messages = value
@property
def iterations(self) -> int:
"""Compatibility property for mixin - returns state iterations."""
return self._state.iterations
@iterations.setter
def iterations(self, value: int) -> None:
"""Set state iterations."""
self._state.iterations = value
@start()
def initialize_reasoning(self) -> Literal["initialized"]:
"""Initialize the reasoning flow and emit agent start logs."""
@@ -341,7 +325,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
messages=list(self.state.messages),
llm=self.llm,
callbacks=self.callbacks,
verbose=self.agent.verbose,
)
self.state.current_answer = formatted_answer
@@ -367,7 +350,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
from_agent=self.agent,
response_model=None,
executor_context=self,
verbose=self.agent.verbose,
)
# Parse the LLM response
@@ -403,7 +385,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "context_error"
if e.__class__.__module__.startswith("litellm"):
raise e
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
handle_unknown_error(self._printer, e)
raise
@listen("continue_reasoning_native")
@@ -438,7 +420,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
from_agent=self.agent,
response_model=None,
executor_context=self,
verbose=self.agent.verbose,
)
# Check if the response is a list of tool calls
@@ -477,7 +458,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
return "context_error"
if e.__class__.__module__.startswith("litellm"):
raise e
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
handle_unknown_error(self._printer, e)
raise
@router(call_llm_and_parse)
@@ -596,12 +577,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"content": None,
"tool_calls": tool_calls_to_report,
}
if all(
type(tc).__qualname__ == "Part" for tc in self.state.pending_tool_calls
):
assistant_message["raw_tool_call_parts"] = list(
self.state.pending_tool_calls
)
self.state.messages.append(assistant_message)
# Now execute each tool
@@ -636,12 +611,14 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
# Check if tool has reached max usage count
max_usage_reached = False
if (
original_tool
and original_tool.max_usage_count is not None
and original_tool.current_usage_count >= original_tool.max_usage_count
):
max_usage_reached = True
if original_tool:
if (
hasattr(original_tool, "max_usage_count")
and original_tool.max_usage_count is not None
and original_tool.current_usage_count
>= original_tool.max_usage_count
):
max_usage_reached = True
# Check cache before executing
from_cache = False
@@ -673,38 +650,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
track_delegation_if_needed(func_name, args_dict, self.task)
structured_tool: CrewStructuredTool | None = None
for structured in self.tools or []:
if sanitize_tool_name(structured.name) == func_name:
structured_tool = structured
break
hook_blocked = False
before_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=self.agent,
task=self.task,
crew=self.crew,
)
before_hooks = get_before_tool_call_hooks()
try:
for hook in before_hooks:
hook_result = hook(before_hook_context)
if hook_result is False:
hook_blocked = True
break
except Exception as hook_error:
if self.agent.verbose:
self._printer.print(
content=f"Error in before_tool_call hook: {hook_error}",
color="red",
)
if hook_blocked:
result = f"Tool execution blocked by hook. Tool: {func_name}"
elif not from_cache and not max_usage_reached:
# Execute the tool (only if not cached and not at max usage)
if not from_cache and not max_usage_reached:
result = "Tool not found"
if func_name in self._available_functions:
try:
@@ -714,7 +661,11 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
# Add to cache after successful execution (before string conversion)
if self.tools_handler and self.tools_handler.cache:
should_cache = True
if original_tool:
if (
original_tool
and hasattr(original_tool, "cache_function")
and original_tool.cache_function
):
should_cache = original_tool.cache_function(
args_dict, raw_result
)
@@ -745,34 +696,10 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
error=e,
),
)
elif max_usage_reached and original_tool:
elif max_usage_reached:
# Return error message when max usage limit is reached
result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore."
# Execute after_tool_call hooks (even if blocked, to allow logging/monitoring)
after_hook_context = ToolCallHookContext(
tool_name=func_name,
tool_input=args_dict,
tool=structured_tool, # type: ignore[arg-type]
agent=self.agent,
task=self.task,
crew=self.crew,
tool_result=result,
)
after_hooks = get_after_tool_call_hooks()
try:
for after_hook in after_hooks:
after_hook_result = after_hook(after_hook_context)
if after_hook_result is not None:
result = after_hook_result
after_hook_context.tool_result = result
except Exception as hook_error:
if self.agent.verbose:
self._printer.print(
content=f"Error in after_tool_call hook: {hook_error}",
color="red",
)
# Emit tool usage finished event
crewai_event_bus.emit(
self,
@@ -819,6 +746,15 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self.state.is_finished = True
return "tool_result_is_final"
# Add reflection prompt once after all tools in the batch
reasoning_prompt = self._i18n.slice("post_tool_reasoning")
reasoning_message: LLMMessage = {
"role": "user",
"content": reasoning_prompt,
}
self.state.messages.append(reasoning_message)
return "native_tool_completed"
def _extract_tool_name(self, tool_call: Any) -> str:
@@ -897,17 +833,12 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
@listen("parser_error")
def recover_from_parser_error(self) -> Literal["initialized"]:
"""Recover from output parser errors and retry."""
if not self._last_parser_error:
self.state.iterations += 1
return "initialized"
formatted_answer = handle_output_parser_exception(
e=self._last_parser_error,
messages=list(self.state.messages),
iterations=self.state.iterations,
log_error_after=self.log_error_after,
printer=self._printer,
verbose=self.agent.verbose,
)
if formatted_answer:
@@ -927,7 +858,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
llm=self.llm,
callbacks=self.callbacks,
i18n=self._i18n,
verbose=self.agent.verbose,
)
self.state.iterations += 1
@@ -1019,7 +949,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._console.print(fail_text)
raise
except Exception as e:
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
handle_unknown_error(self._printer, e)
raise
finally:
self._is_executing = False
@@ -1104,7 +1034,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._console.print(fail_text)
raise
except Exception as e:
handle_unknown_error(self._printer, e, verbose=self.agent.verbose)
handle_unknown_error(self._printer, e)
raise
finally:
self._is_executing = False

View File

@@ -559,7 +559,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
name: str | None = None
tracing: bool | None = None
stream: bool = False
verbose: bool = True
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]:
class _FlowGeneric(cls): # type: ignore
@@ -573,7 +572,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
persistence: FlowPersistence | None = None,
tracing: bool | None = None,
suppress_flow_events: bool = False,
verbose: bool | None = None,
**kwargs: Any,
) -> None:
"""Initialize a new Flow instance.
@@ -582,12 +580,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
persistence: Optional persistence backend for storing flow states
tracing: Whether to enable tracing. True=always enable, False=always disable, None=check environment/user settings
suppress_flow_events: Whether to suppress flow event emissions (internal use)
verbose: Whether to enable verbose logging output. True=always enable, False=always disable,
None=check CREWAI_VERBOSE environment variable (defaults to True if not set).
**kwargs: Additional state values to initialize or override
"""
from crewai.events.event_listener import EventListener
# Initialize basic instance attributes
self._methods: dict[FlowMethodName, FlowMethod[Any, Any]] = {}
self._method_execution_counts: dict[FlowMethodName, int] = {}
@@ -611,14 +605,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._pending_feedback_context: PendingFeedbackContext | None = None
self.suppress_flow_events: bool = suppress_flow_events
# Set verbose and configure event listener
from crewai.utilities.logger_utils import should_enable_verbose
self.verbose = should_enable_verbose(override=verbose)
event_listener = EventListener()
event_listener.verbose = self.verbose
event_listener.formatter.verbose = self.verbose
# Initialize state with initial values
self._state = self._create_initial_state()
self.tracing = tracing

View File

@@ -118,20 +118,17 @@ class PersistenceDecorator:
)
except Exception as e:
error_msg = LOG_MESSAGES["save_error"].format(method_name, str(e))
if verbose:
cls._printer.print(error_msg, color="red")
cls._printer.print(error_msg, color="red")
logger.error(error_msg)
raise RuntimeError(f"State persistence failed: {e!s}") from e
except AttributeError as e:
error_msg = LOG_MESSAGES["state_missing"]
if verbose:
cls._printer.print(error_msg, color="red")
cls._printer.print(error_msg, color="red")
logger.error(error_msg)
raise ValueError(error_msg) from e
except (TypeError, ValueError) as e:
error_msg = LOG_MESSAGES["id_missing"]
if verbose:
cls._printer.print(error_msg, color="red")
cls._printer.print(error_msg, color="red")
logger.error(error_msg)
raise ValueError(error_msg) from e

View File

@@ -151,9 +151,7 @@ def _unwrap_function(function: Any) -> Any:
return function
def get_possible_return_constants(
function: Any, verbose: bool = True
) -> list[str] | None:
def get_possible_return_constants(function: Any) -> list[str] | None:
"""Extract possible string return values from a function using AST parsing.
This function analyzes the source code of a router method to identify
@@ -180,11 +178,10 @@ def get_possible_return_constants(
# Can't get source code
return None
except Exception as e:
if verbose:
_printer.print(
f"Error retrieving source code for function {function.__name__}: {e}",
color="red",
)
_printer.print(
f"Error retrieving source code for function {function.__name__}: {e}",
color="red",
)
return None
try:
@@ -193,28 +190,25 @@ def get_possible_return_constants(
# Parse the source code into an AST
code_ast = ast.parse(source)
except IndentationError as e:
if verbose:
_printer.print(
f"IndentationError while parsing source code of {function.__name__}: {e}",
color="red",
)
_printer.print(f"Source code:\n{source}", color="yellow")
_printer.print(
f"IndentationError while parsing source code of {function.__name__}: {e}",
color="red",
)
_printer.print(f"Source code:\n{source}", color="yellow")
return None
except SyntaxError as e:
if verbose:
_printer.print(
f"SyntaxError while parsing source code of {function.__name__}: {e}",
color="red",
)
_printer.print(f"Source code:\n{source}", color="yellow")
_printer.print(
f"SyntaxError while parsing source code of {function.__name__}: {e}",
color="red",
)
_printer.print(f"Source code:\n{source}", color="yellow")
return None
except Exception as e:
if verbose:
_printer.print(
f"Unexpected error while parsing source code of {function.__name__}: {e}",
color="red",
)
_printer.print(f"Source code:\n{source}", color="yellow")
_printer.print(
f"Unexpected error while parsing source code of {function.__name__}: {e}",
color="red",
)
_printer.print(f"Source code:\n{source}", color="yellow")
return None
return_values: set[str] = set()
@@ -394,17 +388,15 @@ def get_possible_return_constants(
StateAttributeVisitor().visit(class_ast)
except Exception as e:
if verbose:
_printer.print(
f"Could not analyze class context for {function.__name__}: {e}",
color="yellow",
)
_printer.print(
f"Could not analyze class context for {function.__name__}: {e}",
color="yellow",
)
except Exception as e:
if verbose:
_printer.print(
f"Could not introspect class for {function.__name__}: {e}",
color="yellow",
)
_printer.print(
f"Could not introspect class for {function.__name__}: {e}",
color="yellow",
)
VariableAssignmentVisitor().visit(code_ast)
ReturnVisitor().visit(code_ast)


@@ -9,7 +9,6 @@ from crewai.utilities.printer import Printer
if TYPE_CHECKING:
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.experimental.agent_executor import AgentExecutor
from crewai.lite_agent import LiteAgent
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.types import LLMMessage
@@ -42,7 +41,7 @@ class LLMCallHookContext:
Can be modified by returning a new string from after_llm_call hook.
"""
executor: CrewAgentExecutor | AgentExecutor | LiteAgent | None
executor: CrewAgentExecutor | LiteAgent | None
messages: list[LLMMessage]
agent: Any
task: Any
@@ -53,7 +52,7 @@ class LLMCallHookContext:
def __init__(
self,
executor: CrewAgentExecutor | AgentExecutor | LiteAgent | None = None,
executor: CrewAgentExecutor | LiteAgent | None = None,
response: str | None = None,
messages: list[LLMMessage] | None = None,
llm: BaseLLM | str | Any | None = None, # TODO: look into


@@ -72,13 +72,13 @@ from crewai.utilities.agent_utils import (
from crewai.utilities.converter import (
Converter,
ConverterError,
generate_model_description,
)
from crewai.utilities.guardrail import process_guardrail
from crewai.utilities.guardrail_types import GuardrailCallable, GuardrailType
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.types import LLMMessage
@@ -344,12 +344,11 @@ class LiteAgent(FlowTrackable, BaseModel):
)
except Exception as e:
if self.verbose:
self._printer.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
handle_unknown_error(self._printer, e, verbose=self.verbose)
self._printer.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
handle_unknown_error(self._printer, e)
# Emit error event
crewai_event_bus.emit(
self,
@@ -397,11 +396,10 @@ class LiteAgent(FlowTrackable, BaseModel):
if isinstance(result, BaseModel):
formatted_result = result
except ConverterError as e:
if self.verbose:
self._printer.print(
content=f"Failed to parse output into response format after retries: {e.message}",
color="yellow",
)
self._printer.print(
content=f"Failed to parse output into response format after retries: {e.message}",
color="yellow",
)
# Calculate token usage metrics
if isinstance(self.llm, BaseLLM):
@@ -607,7 +605,6 @@ class LiteAgent(FlowTrackable, BaseModel):
messages=self._messages,
llm=cast(LLM, self.llm),
callbacks=self._callbacks,
verbose=self.verbose,
)
enforce_rpm_limit(self.request_within_rpm_limit)
@@ -620,7 +617,6 @@ class LiteAgent(FlowTrackable, BaseModel):
printer=self._printer,
from_agent=self,
executor_context=self,
verbose=self.verbose,
)
except Exception as e:
@@ -650,18 +646,16 @@ class LiteAgent(FlowTrackable, BaseModel):
self._append_message(formatted_answer.text, role="assistant")
except OutputParserError as e: # noqa: PERF203
if self.verbose:
self._printer.print(
content="Failed to parse LLM output. Retrying...",
color="yellow",
)
self._printer.print(
content="Failed to parse LLM output. Retrying...",
color="yellow",
)
formatted_answer = handle_output_parser_exception(
e=e,
messages=self._messages,
iterations=self._iterations,
log_error_after=3,
printer=self._printer,
verbose=self.verbose,
)
except Exception as e:
@@ -676,10 +670,9 @@ class LiteAgent(FlowTrackable, BaseModel):
llm=cast(LLM, self.llm),
callbacks=self._callbacks,
i18n=self.i18n,
verbose=self.verbose,
)
continue
handle_unknown_error(self._printer, e, verbose=self.verbose)
handle_unknown_error(self._printer, e)
raise e
finally:


@@ -404,7 +404,7 @@ class BaseLLM(ABC):
from_agent: Agent | None = None,
tool_call: dict[str, Any] | None = None,
call_type: LLMCallType | None = None,
response_id: str | None = None,
response_id: str | None = None
) -> None:
"""Emit stream chunk event.
@@ -427,7 +427,7 @@ class BaseLLM(ABC):
from_task=from_task,
from_agent=from_agent,
call_type=call_type,
response_id=response_id,
response_id=response_id
),
)
@@ -497,7 +497,7 @@ class BaseLLM(ABC):
from_agent=from_agent,
)
return str(result) if not isinstance(result, str) else result
return result
except Exception as e:
error_msg = f"Error executing function '{function_name}': {e!s}"
@@ -737,25 +737,22 @@ class BaseLLM(ABC):
task=None,
crew=None,
)
verbose = getattr(from_agent, "verbose", True) if from_agent else True
printer = Printer()
try:
for hook in before_hooks:
result = hook(hook_context)
if result is False:
if verbose:
printer.print(
content="LLM call blocked by before_llm_call hook",
color="yellow",
)
printer.print(
content="LLM call blocked by before_llm_call hook",
color="yellow",
)
return False
except Exception as e:
if verbose:
printer.print(
content=f"Error in before_llm_call hook: {e}",
color="yellow",
)
printer.print(
content=f"Error in before_llm_call hook: {e}",
color="yellow",
)
return True
@@ -808,7 +805,6 @@ class BaseLLM(ABC):
crew=None,
response=response,
)
verbose = getattr(from_agent, "verbose", True) if from_agent else True
printer = Printer()
modified_response = response
@@ -819,10 +815,9 @@ class BaseLLM(ABC):
modified_response = result
hook_context.response = modified_response
except Exception as e:
if verbose:
printer.print(
content=f"Error in after_llm_call hook: {e}",
color="yellow",
)
printer.print(
content=f"Error in after_llm_call hook: {e}",
color="yellow",
)
return modified_response


@@ -23,7 +23,7 @@ if TYPE_CHECKING:
try:
from anthropic import Anthropic, AsyncAnthropic, transform_schema
from anthropic.types import Message, TextBlock, ThinkingBlock, ToolUseBlock
from anthropic.types.beta import BetaMessage, BetaTextBlock
from anthropic.types.beta import BetaMessage
import httpx
except ImportError:
raise ImportError(
@@ -337,7 +337,6 @@ class AnthropicCompletion(BaseLLM):
available_functions: Available functions for tool calling
from_task: Task that initiated the call
from_agent: Agent that initiated the call
response_model: Optional response model.
Returns:
Chat completion response or tool call result
@@ -678,31 +677,31 @@ class AnthropicCompletion(BaseLLM):
if _is_pydantic_model_class(response_model) and response.content:
if use_native_structured_output:
for block in response.content:
if isinstance(block, (TextBlock, BetaTextBlock)):
structured_data = response_model.model_validate_json(block.text)
if isinstance(block, TextBlock):
structured_json = block.text
self._emit_call_completed_event(
response=structured_data.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_data
return structured_json
else:
for block in response.content:
if (
isinstance(block, ToolUseBlock)
and block.name == "structured_output"
):
structured_data = response_model.model_validate(block.input)
structured_json = json.dumps(block.input)
self._emit_call_completed_event(
response=structured_data.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_data
return structured_json
# Check if Claude wants to use tools
if response.content:
@@ -898,29 +897,28 @@ class AnthropicCompletion(BaseLLM):
if _is_pydantic_model_class(response_model):
if use_native_structured_output:
structured_data = response_model.model_validate_json(full_response)
self._emit_call_completed_event(
response=structured_data.model_dump_json(),
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_data
return full_response
for block in final_message.content:
if (
isinstance(block, ToolUseBlock)
and block.name == "structured_output"
):
structured_data = response_model.model_validate(block.input)
structured_json = json.dumps(block.input)
self._emit_call_completed_event(
response=structured_data.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_data
return structured_json
if final_message.content:
tool_uses = [
@@ -1168,31 +1166,31 @@ class AnthropicCompletion(BaseLLM):
if _is_pydantic_model_class(response_model) and response.content:
if use_native_structured_output:
for block in response.content:
if isinstance(block, (TextBlock, BetaTextBlock)):
structured_data = response_model.model_validate_json(block.text)
if isinstance(block, TextBlock):
structured_json = block.text
self._emit_call_completed_event(
response=structured_data.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_data
return structured_json
else:
for block in response.content:
if (
isinstance(block, ToolUseBlock)
and block.name == "structured_output"
):
structured_data = response_model.model_validate(block.input)
structured_json = json.dumps(block.input)
self._emit_call_completed_event(
response=structured_data.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_data
return structured_json
if response.content:
tool_uses = [
@@ -1364,29 +1362,28 @@ class AnthropicCompletion(BaseLLM):
if _is_pydantic_model_class(response_model):
if use_native_structured_output:
structured_data = response_model.model_validate_json(full_response)
self._emit_call_completed_event(
response=structured_data.model_dump_json(),
response=full_response,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_data
return full_response
for block in final_message.content:
if (
isinstance(block, ToolUseBlock)
and block.name == "structured_output"
):
structured_data = response_model.model_validate(block.input)
structured_json = json.dumps(block.input)
self._emit_call_completed_event(
response=structured_data.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_data
return structured_json
if final_message.content:
tool_uses = [
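To make the behavior change in these Anthropic hunks concrete, here is a minimal sketch of the two return conventions being swapped; the `Person` model and JSON payload are invented for illustration:

```python
from pydantic import BaseModel

class Person(BaseModel):
    name: str
    age: int

raw = '{"name": "Ada", "age": 36}'           # e.g. TextBlock.text from the response

validated = Person.model_validate_json(raw)  # old side: parse into a model instance
event_payload = validated.model_dump_json()  # old side: emit the re-serialized JSON

# New side in this diff: skip validation and return `raw` directly (or
# json.dumps(block.input) on the tool-use path) as the call result.
```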


@@ -557,7 +557,7 @@ class AzureCompletion(BaseLLM):
params: AzureCompletionParams,
from_task: Any | None = None,
from_agent: Any | None = None,
) -> BaseModel:
) -> str:
"""Validate content against response model and emit completion event.
Args:
@@ -568,23 +568,24 @@ class AzureCompletion(BaseLLM):
from_agent: Agent that initiated the call
Returns:
Validated Pydantic model instance
Validated and serialized JSON string
Raises:
ValueError: If validation fails
"""
try:
structured_data = response_model.model_validate_json(content)
structured_json = structured_data.model_dump_json()
self._emit_call_completed_event(
response=structured_data.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return structured_data
return structured_json
except Exception as e:
error_msg = f"Failed to validate structured output with model {response_model.__name__}: {e}"
logging.error(error_msg)


@@ -16,7 +16,6 @@ from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
)
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.types import LLMMessage
@@ -549,11 +548,7 @@ class BedrockCompletion(BaseLLM):
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
"inputSchema": {"json": response_model.model_json_schema()},
}
}
body["toolConfig"] = cast(
@@ -784,11 +779,7 @@ class BedrockCompletion(BaseLLM):
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
"inputSchema": {"json": response_model.model_json_schema()},
}
}
body["toolConfig"] = cast(
@@ -1020,11 +1011,7 @@ class BedrockCompletion(BaseLLM):
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
"inputSchema": {"json": response_model.model_json_schema()},
}
}
body["toolConfig"] = cast(
@@ -1236,11 +1223,7 @@ class BedrockCompletion(BaseLLM):
"toolSpec": {
"name": "structured_output",
"description": "Returns structured data according to the schema",
"inputSchema": {
"json": generate_model_description(response_model)
.get("json_schema", {})
.get("schema", {})
},
"inputSchema": {"json": response_model.model_json_schema()},
}
}
body["toolConfig"] = cast(


@@ -15,7 +15,6 @@ from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
)
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.types import LLMMessage
@@ -132,9 +131,6 @@ class GeminiCompletion(BaseLLM):
self.supports_tools = bool(
version_match and float(version_match.group(1)) >= 1.5
)
self.is_gemini_2_0 = bool(
version_match and float(version_match.group(1)) >= 2.0
)
@property
def stop(self) -> list[str]:
@@ -442,11 +438,6 @@ class GeminiCompletion(BaseLLM):
Returns:
GenerateContentConfig object for Gemini API
Note:
Structured output support varies by model version:
- Gemini 1.5 and earlier: Uses response_schema (Pydantic model)
- Gemini 2.0+: Uses response_json_schema (JSON Schema) with propertyOrdering
"""
self.tools = tools
config_params: dict[str, Any] = {}
@@ -473,14 +464,7 @@ class GeminiCompletion(BaseLLM):
if response_model:
config_params["response_mime_type"] = "application/json"
schema_output = generate_model_description(response_model)
schema = schema_output.get("json_schema", {}).get("schema", {})
if self.is_gemini_2_0:
schema = self._add_property_ordering(schema)
config_params["response_json_schema"] = schema
else:
config_params["response_schema"] = response_model
config_params["response_schema"] = response_model.model_json_schema()
# Handle tools for supported models
if tools and self.supports_tools:
@@ -505,7 +489,7 @@ class GeminiCompletion(BaseLLM):
function_declaration = types.FunctionDeclaration(
name=name,
description=description,
parameters_json_schema=parameters if parameters else None,
parameters=parameters if parameters else None,
)
gemini_tool = types.Tool(function_declarations=[function_declaration])
@@ -559,10 +543,11 @@ class GeminiCompletion(BaseLLM):
else:
parts.append(types.Part.from_text(text=str(content) if content else ""))
text_content: str = " ".join(p.text for p in parts if p.text is not None)
if role == "system":
# Extract system instruction - Gemini handles it separately
text_content = " ".join(
p.text for p in parts if hasattr(p, "text") and p.text
)
if system_instruction:
system_instruction += f"\n\n{text_content}"
else:
@@ -591,40 +576,31 @@ class GeminiCompletion(BaseLLM):
types.Content(role="user", parts=[function_response_part])
)
elif role == "assistant" and message.get("tool_calls"):
raw_parts: list[Any] | None = message.get("raw_tool_call_parts")
if raw_parts and all(isinstance(p, types.Part) for p in raw_parts):
tool_parts: list[types.Part] = list(raw_parts)
if text_content:
tool_parts.insert(0, types.Part.from_text(text=text_content))
else:
tool_parts = []
if text_content:
tool_parts.append(types.Part.from_text(text=text_content))
tool_parts: list[types.Part] = []
tool_calls: list[dict[str, Any]] = message.get("tool_calls") or []
for tool_call in tool_calls:
func: dict[str, Any] = tool_call.get("function") or {}
func_name: str = str(func.get("name") or "")
func_args_raw: str | dict[str, Any] = (
func.get("arguments") or {}
)
if text_content:
tool_parts.append(types.Part.from_text(text=text_content))
func_args: dict[str, Any]
if isinstance(func_args_raw, str):
try:
func_args = (
json.loads(func_args_raw) if func_args_raw else {}
)
except (json.JSONDecodeError, TypeError):
func_args = {}
else:
func_args = func_args_raw
tool_calls: list[dict[str, Any]] = message.get("tool_calls") or []
for tool_call in tool_calls:
func: dict[str, Any] = tool_call.get("function") or {}
func_name: str = str(func.get("name") or "")
func_args_raw: str | dict[str, Any] = func.get("arguments") or {}
tool_parts.append(
types.Part.from_function_call(
name=func_name, args=func_args
func_args: dict[str, Any]
if isinstance(func_args_raw, str):
try:
func_args = (
json.loads(func_args_raw) if func_args_raw else {}
)
)
except (json.JSONDecodeError, TypeError):
func_args = {}
else:
func_args = func_args_raw
tool_parts.append(
types.Part.from_function_call(name=func_name, args=func_args)
)
contents.append(types.Content(role="model", parts=tool_parts))
else:
@@ -644,7 +620,7 @@ class GeminiCompletion(BaseLLM):
messages_for_event: list[LLMMessage],
from_task: Any | None = None,
from_agent: Any | None = None,
) -> BaseModel:
) -> str:
"""Validate content against response model and emit completion event.
Args:
@@ -655,23 +631,24 @@ class GeminiCompletion(BaseLLM):
from_agent: Agent that initiated the call
Returns:
Validated Pydantic model instance
Validated and serialized JSON string
Raises:
ValueError: If validation fails
"""
try:
structured_data = response_model.model_validate_json(content)
structured_json = structured_data.model_dump_json()
self._emit_call_completed_event(
response=structured_data.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=messages_for_event,
)
return structured_data
return structured_json
except Exception as e:
error_msg = f"Failed to validate structured output with model {response_model.__name__}: {e}"
logging.error(error_msg)
@@ -684,7 +661,7 @@ class GeminiCompletion(BaseLLM):
response_model: type[BaseModel] | None = None,
from_task: Any | None = None,
from_agent: Any | None = None,
) -> str | BaseModel:
) -> str:
"""Finalize completion response with validation and event emission.
Args:
@@ -695,7 +672,7 @@ class GeminiCompletion(BaseLLM):
from_agent: Agent that initiated the call
Returns:
Final response content after processing (str or Pydantic model if response_model provided)
Final response content after processing
"""
messages_for_event = self._convert_contents_to_dict(contents)
@@ -881,7 +858,7 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str | BaseModel | list[dict[str, Any]]:
) -> str | list[dict[str, Any]]:
"""Finalize streaming response with usage tracking, function execution, and events.
Args:
@@ -1001,7 +978,7 @@ class GeminiCompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str | BaseModel | list[dict[str, Any]] | Any:
) -> str | Any:
"""Handle streaming content generation."""
full_response = ""
function_calls: dict[int, dict[str, Any]] = {}
@@ -1201,36 +1178,6 @@ class GeminiCompletion(BaseLLM):
return "".join(text_parts)
@staticmethod
def _add_property_ordering(schema: dict[str, Any]) -> dict[str, Any]:
"""Add propertyOrdering to JSON schema for Gemini 2.0 compatibility.
Gemini 2.0 models require an explicit propertyOrdering list to define
the preferred structure of JSON objects. This recursively adds
propertyOrdering to all objects in the schema.
Args:
schema: JSON schema dictionary.
Returns:
Modified schema with propertyOrdering added to all objects.
"""
if isinstance(schema, dict):
if schema.get("type") == "object" and "properties" in schema:
properties = schema["properties"]
if properties and "propertyOrdering" not in schema:
schema["propertyOrdering"] = list(properties.keys())
for value in schema.values():
if isinstance(value, dict):
GeminiCompletion._add_property_ordering(value)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
GeminiCompletion._add_property_ordering(item)
return schema
@staticmethod
def _convert_contents_to_dict(
contents: list[types.Content],


@@ -693,14 +693,14 @@ class OpenAICompletion(BaseLLM):
if response_model or self.response_format:
format_model = response_model or self.response_format
if isinstance(format_model, type) and issubclass(format_model, BaseModel):
schema_output = generate_model_description(format_model)
json_schema = schema_output.get("json_schema", {})
schema = format_model.model_json_schema()
schema["additionalProperties"] = False
params["text"] = {
"format": {
"type": "json_schema",
"name": json_schema.get("name", format_model.__name__),
"strict": json_schema.get("strict", True),
"schema": json_schema.get("schema", {}),
"name": format_model.__name__,
"strict": True,
"schema": schema,
}
}
elif isinstance(format_model, dict):
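The new-side branch above can be exercised in isolation; `Answer` is an invented response model:

```python
from pydantic import BaseModel

class Answer(BaseModel):
    verdict: str

schema = Answer.model_json_schema()
schema["additionalProperties"] = False

params_text = {
    "format": {
        "type": "json_schema",
        "name": Answer.__name__,  # "Answer"
        "strict": True,
        "schema": schema,
    }
}
```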
@@ -1060,7 +1060,7 @@ class OpenAICompletion(BaseLLM):
chunk=delta_text,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
response_id=response_id_stream
)
elif event.type == "response.function_call_arguments.delta":
@@ -1570,14 +1570,15 @@ class OpenAICompletion(BaseLLM):
parsed_object = parsed_response.choices[0].message.parsed
if parsed_object:
structured_json = parsed_object.model_dump_json()
self._emit_call_completed_event(
response=parsed_object.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return parsed_object
return structured_json
response: ChatCompletion = self.client.chat.completions.create(**params)
@@ -1691,7 +1692,7 @@ class OpenAICompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str | BaseModel:
) -> str:
"""Handle streaming chat completion."""
full_response = ""
tool_calls: dict[int, dict[str, Any]] = {}
@@ -1708,7 +1709,7 @@ class OpenAICompletion(BaseLLM):
**parse_params, response_format=response_model
) as stream:
for chunk in stream:
response_id_stream = chunk.id if hasattr(chunk, "id") else None
response_id_stream=chunk.id if hasattr(chunk,"id") else None
if chunk.type == "content.delta":
delta_content = chunk.delta
@@ -1717,7 +1718,7 @@ class OpenAICompletion(BaseLLM):
chunk=delta_content,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
response_id=response_id_stream
)
final_completion = stream.get_final_completion()
@@ -1727,14 +1728,15 @@ class OpenAICompletion(BaseLLM):
if final_completion.choices:
parsed_result = final_completion.choices[0].message.parsed
if parsed_result:
structured_json = parsed_result.model_dump_json()
self._emit_call_completed_event(
response=parsed_result.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return parsed_result
return structured_json
logging.error("Failed to get parsed result from stream")
return ""
@@ -1746,9 +1748,7 @@ class OpenAICompletion(BaseLLM):
usage_data = {"total_tokens": 0}
for completion_chunk in completion_stream:
response_id_stream = (
completion_chunk.id if hasattr(completion_chunk, "id") else None
)
response_id_stream=completion_chunk.id if hasattr(completion_chunk,"id") else None
if hasattr(completion_chunk, "usage") and completion_chunk.usage:
usage_data = self._extract_openai_token_usage(completion_chunk)
@@ -1766,7 +1766,7 @@ class OpenAICompletion(BaseLLM):
chunk=chunk_delta.content,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
response_id=response_id_stream
)
if chunk_delta.tool_calls:
@@ -1805,7 +1805,7 @@ class OpenAICompletion(BaseLLM):
"index": tool_calls[tool_index]["index"],
},
call_type=LLMCallType.TOOL_CALL,
response_id=response_id_stream,
response_id=response_id_stream
)
self._track_token_usage_internal(usage_data)
@@ -1885,14 +1885,15 @@ class OpenAICompletion(BaseLLM):
parsed_object = parsed_response.choices[0].message.parsed
if parsed_object:
structured_json = parsed_object.model_dump_json()
self._emit_call_completed_event(
response=parsed_object.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return parsed_object
return structured_json
response: ChatCompletion = await self.async_client.chat.completions.create(
**params
@@ -2003,7 +2004,7 @@ class OpenAICompletion(BaseLLM):
from_task: Any | None = None,
from_agent: Any | None = None,
response_model: type[BaseModel] | None = None,
) -> str | BaseModel:
) -> str:
"""Handle async streaming chat completion."""
full_response = ""
tool_calls: dict[int, dict[str, Any]] = {}
@@ -2016,7 +2017,7 @@ class OpenAICompletion(BaseLLM):
accumulated_content = ""
usage_data = {"total_tokens": 0}
async for chunk in completion_stream:
response_id_stream = chunk.id if hasattr(chunk, "id") else None
response_id_stream=chunk.id if hasattr(chunk,"id") else None
if hasattr(chunk, "usage") and chunk.usage:
usage_data = self._extract_openai_token_usage(chunk)
@@ -2034,23 +2035,24 @@ class OpenAICompletion(BaseLLM):
chunk=delta.content,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
response_id=response_id_stream
)
self._track_token_usage_internal(usage_data)
try:
parsed_object = response_model.model_validate_json(accumulated_content)
structured_json = parsed_object.model_dump_json()
self._emit_call_completed_event(
response=parsed_object.model_dump_json(),
response=structured_json,
call_type=LLMCallType.LLM_CALL,
from_task=from_task,
from_agent=from_agent,
messages=params["messages"],
)
return parsed_object
return structured_json
except Exception as e:
logging.error(f"Failed to parse structured output from stream: {e}")
self._emit_call_completed_event(
@@ -2069,7 +2071,7 @@ class OpenAICompletion(BaseLLM):
usage_data = {"total_tokens": 0}
async for chunk in stream:
response_id_stream = chunk.id if hasattr(chunk, "id") else None
response_id_stream=chunk.id if hasattr(chunk,"id") else None
if hasattr(chunk, "usage") and chunk.usage:
usage_data = self._extract_openai_token_usage(chunk)
@@ -2087,7 +2089,7 @@ class OpenAICompletion(BaseLLM):
chunk=chunk_delta.content,
from_task=from_task,
from_agent=from_agent,
response_id=response_id_stream,
response_id=response_id_stream
)
if chunk_delta.tool_calls:
@@ -2126,7 +2128,7 @@ class OpenAICompletion(BaseLLM):
"index": tool_calls[tool_index]["index"],
},
call_type=LLMCallType.TOOL_CALL,
response_id=response_id_stream,
response_id=response_id_stream
)
self._track_token_usage_internal(usage_data)


@@ -2,7 +2,6 @@ import logging
import re
from typing import Any
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.string_utils import sanitize_tool_name
@@ -78,8 +77,7 @@ def extract_tool_info(tool: dict[str, Any]) -> tuple[str, str, dict[str, Any]]:
# Also check for args_schema (Pydantic format)
if not parameters and "args_schema" in tool:
if hasattr(tool["args_schema"], "model_json_schema"):
schema_output = generate_model_description(tool["args_schema"])
parameters = schema_output.get("json_schema", {}).get("schema", {})
parameters = tool["args_schema"].model_json_schema()
return name, description, parameters


@@ -12,17 +12,15 @@ from crewai.utilities.paths import db_storage_path
class LTMSQLiteStorage:
"""SQLite storage class for long-term memory data."""
def __init__(self, db_path: str | None = None, verbose: bool = True) -> None:
def __init__(self, db_path: str | None = None) -> None:
"""Initialize the SQLite storage.
Args:
db_path: Optional path to the database file.
verbose: Whether to print error messages.
"""
if db_path is None:
db_path = str(Path(db_storage_path()) / "long_term_memory_storage.db")
self.db_path = db_path
self._verbose = verbose
self._printer: Printer = Printer()
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
self._initialize_db()
@@ -46,11 +44,10 @@ class LTMSQLiteStorage:
conn.commit()
except sqlite3.Error as e:
if self._verbose:
self._printer.print(
content=f"MEMORY ERROR: An error occurred during database initialization: {e}",
color="red",
)
self._printer.print(
content=f"MEMORY ERROR: An error occurred during database initialization: {e}",
color="red",
)
def save(
self,
@@ -72,11 +69,10 @@ class LTMSQLiteStorage:
)
conn.commit()
except sqlite3.Error as e:
if self._verbose:
self._printer.print(
content=f"MEMORY ERROR: An error occurred while saving to LTM: {e}",
color="red",
)
self._printer.print(
content=f"MEMORY ERROR: An error occurred while saving to LTM: {e}",
color="red",
)
def load(self, task_description: str, latest_n: int) -> list[dict[str, Any]] | None:
"""Queries the LTM table by task description with error handling."""
@@ -105,11 +101,10 @@ class LTMSQLiteStorage:
]
except sqlite3.Error as e:
if self._verbose:
self._printer.print(
content=f"MEMORY ERROR: An error occurred while querying LTM: {e}",
color="red",
)
self._printer.print(
content=f"MEMORY ERROR: An error occurred while querying LTM: {e}",
color="red",
)
return None
def reset(self) -> None:
@@ -121,11 +116,10 @@ class LTMSQLiteStorage:
conn.commit()
except sqlite3.Error as e:
if self._verbose:
self._printer.print(
content=f"MEMORY ERROR: An error occurred while deleting all rows in LTM: {e}",
color="red",
)
self._printer.print(
content=f"MEMORY ERROR: An error occurred while deleting all rows in LTM: {e}",
color="red",
)
async def asave(
self,
@@ -153,11 +147,10 @@ class LTMSQLiteStorage:
)
await conn.commit()
except aiosqlite.Error as e:
if self._verbose:
self._printer.print(
content=f"MEMORY ERROR: An error occurred while saving to LTM: {e}",
color="red",
)
self._printer.print(
content=f"MEMORY ERROR: An error occurred while saving to LTM: {e}",
color="red",
)
async def aload(
self, task_description: str, latest_n: int
@@ -194,11 +187,10 @@ class LTMSQLiteStorage:
for row in rows
]
except aiosqlite.Error as e:
if self._verbose:
self._printer.print(
content=f"MEMORY ERROR: An error occurred while querying LTM: {e}",
color="red",
)
self._printer.print(
content=f"MEMORY ERROR: An error occurred while querying LTM: {e}",
color="red",
)
return None
async def areset(self) -> None:
@@ -208,8 +200,7 @@ class LTMSQLiteStorage:
await conn.execute("DELETE FROM long_term_memories")
await conn.commit()
except aiosqlite.Error as e:
if self._verbose:
self._printer.print(
content=f"MEMORY ERROR: An error occurred while deleting all rows in LTM: {e}",
color="red",
)
self._printer.print(
content=f"MEMORY ERROR: An error occurred while deleting all rows in LTM: {e}",
color="red",
)


@@ -1,6 +1,6 @@
"""IBM WatsonX embedding function implementation."""
from typing import Any, cast
from typing import cast
from chromadb.api.types import Documents, EmbeddingFunction, Embeddings
from typing_extensions import Unpack
@@ -15,18 +15,14 @@ _printer = Printer()
class WatsonXEmbeddingFunction(EmbeddingFunction[Documents]):
"""Embedding function for IBM WatsonX models."""
def __init__(
self, *, verbose: bool = True, **kwargs: Unpack[WatsonXProviderConfig]
) -> None:
def __init__(self, **kwargs: Unpack[WatsonXProviderConfig]) -> None:
"""Initialize WatsonX embedding function.
Args:
verbose: Whether to print error messages.
**kwargs: Configuration parameters for WatsonX Embeddings and Credentials.
"""
super().__init__(**kwargs)
self._config = kwargs
self._verbose = verbose
@staticmethod
def name() -> str:
@@ -60,7 +56,7 @@ class WatsonXEmbeddingFunction(EmbeddingFunction[Documents]):
if isinstance(input, str):
input = [input]
embeddings_config: dict[str, Any] = {
embeddings_config: dict = {
"model_id": self._config["model_id"],
}
if "params" in self._config and self._config["params"] is not None:
@@ -94,7 +90,7 @@ class WatsonXEmbeddingFunction(EmbeddingFunction[Documents]):
if "credentials" in self._config and self._config["credentials"] is not None:
embeddings_config["credentials"] = self._config["credentials"]
else:
cred_config: dict[str, Any] = {}
cred_config: dict = {}
if "url" in self._config and self._config["url"] is not None:
cred_config["url"] = self._config["url"]
if "api_key" in self._config and self._config["api_key"] is not None:
@@ -163,6 +159,5 @@ class WatsonXEmbeddingFunction(EmbeddingFunction[Documents]):
embeddings = embedding.embed_documents(input)
return cast(Embeddings, embeddings)
except Exception as e:
if self._verbose:
_printer.print(f"Error during WatsonX embedding: {e}", color="red")
_printer.print(f"Error during WatsonX embedding: {e}", color="red")
raise


@@ -767,11 +767,10 @@ class Task(BaseModel):
if files:
supported_types: list[str] = []
if self.agent.llm and self.agent.llm.supports_multimodal():
provider: str = str(
getattr(self.agent.llm, "provider", None)
or getattr(self.agent.llm, "model", "openai")
provider = getattr(self.agent.llm, "provider", None) or getattr(
self.agent.llm, "model", "openai"
)
api: str | None = getattr(self.agent.llm, "api", None)
api = getattr(self.agent.llm, "api", None)
supported_types = get_supported_content_types(provider, api)
def is_auto_injected(content_type: str) -> bool:
@@ -888,11 +887,10 @@ Follow these guidelines:
try:
crew_chat_messages = json.loads(crew_chat_messages_json)
except json.JSONDecodeError as e:
if self.agent and self.agent.verbose:
_printer.print(
f"An error occurred while parsing crew chat messages: {e}",
color="red",
)
_printer.print(
f"An error occurred while parsing crew chat messages: {e}",
color="red",
)
raise
conversation_history = "\n".join(
@@ -1134,12 +1132,11 @@ Follow these guidelines:
guardrail_result_error=guardrail_result.error,
task_output=task_output.raw,
)
if agent and agent.verbose:
printer = Printer()
printer.print(
content=f"Guardrail {guardrail_index if guardrail_index is not None else ''} blocked (attempt {attempt + 1}/{max_attempts}), retrying due to: {guardrail_result.error}\n",
color="yellow",
)
printer = Printer()
printer.print(
content=f"Guardrail {guardrail_index if guardrail_index is not None else ''} blocked (attempt {attempt + 1}/{max_attempts}), retrying due to: {guardrail_result.error}\n",
color="yellow",
)
# Regenerate output from agent
result = agent.execute_task(
@@ -1232,12 +1229,11 @@ Follow these guidelines:
guardrail_result_error=guardrail_result.error,
task_output=task_output.raw,
)
if agent and agent.verbose:
printer = Printer()
printer.print(
content=f"Guardrail {guardrail_index if guardrail_index is not None else ''} blocked (attempt {attempt + 1}/{max_attempts}), retrying due to: {guardrail_result.error}\n",
color="yellow",
)
printer = Printer()
printer.print(
content=f"Guardrail {guardrail_index if guardrail_index is not None else ''} blocked (attempt {attempt + 1}/{max_attempts}), retrying due to: {guardrail_result.error}\n",
color="yellow",
)
result = await agent.aexecute_task(
task=self,


@@ -173,6 +173,13 @@ class Telemetry:
self._original_handlers: dict[int, Any] = {}
if threading.current_thread() is not threading.main_thread():
logger.debug(
"CrewAI telemetry: Skipping signal handler registration "
"(not running in main thread)."
)
return
self._register_signal_handler(signal.SIGTERM, SigTermEvent, shutdown=True)
self._register_signal_handler(signal.SIGINT, SigIntEvent, shutdown=True)
if hasattr(signal, "SIGHUP"):


@@ -384,8 +384,6 @@ class ToolUsage:
if (
hasattr(available_tool, "max_usage_count")
and available_tool.max_usage_count is not None
and self.agent
and self.agent.verbose
):
self._printer.print(
content=f"Tool '{sanitize_tool_name(available_tool.name)}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
@@ -398,8 +396,6 @@ class ToolUsage:
if (
hasattr(available_tool, "max_usage_count")
and available_tool.max_usage_count is not None
and self.agent
and self.agent.verbose
):
self._printer.print(
content=f"Tool '{sanitize_tool_name(available_tool.name)}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
@@ -614,8 +610,6 @@ class ToolUsage:
if (
hasattr(available_tool, "max_usage_count")
and available_tool.max_usage_count is not None
and self.agent
and self.agent.verbose
):
self._printer.print(
content=f"Tool '{sanitize_tool_name(available_tool.name)}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
@@ -628,8 +622,6 @@ class ToolUsage:
if (
hasattr(available_tool, "max_usage_count")
and available_tool.max_usage_count is not None
and self.agent
and self.agent.verbose
):
self._printer.print(
content=f"Tool '{sanitize_tool_name(available_tool.name)}' usage: {available_tool.current_usage_count}/{available_tool.max_usage_count}",
@@ -892,17 +884,15 @@ class ToolUsage:
# Attempt 4: Repair JSON
try:
repaired_input = str(repair_json(tool_input, skip_json_loads=True))
if self.agent and self.agent.verbose:
self._printer.print(
content=f"Repaired JSON: {repaired_input}", color="blue"
)
self._printer.print(
content=f"Repaired JSON: {repaired_input}", color="blue"
)
arguments = json.loads(repaired_input)
if isinstance(arguments, dict):
return arguments
except Exception as e:
error = f"Failed to repair JSON: {e}"
if self.agent and self.agent.verbose:
self._printer.print(content=error, color="red")
self._printer.print(content=error, color="red")
error_message = (
"Tool input must be a valid dictionary in JSON or Python literal format"


@@ -10,10 +10,9 @@
"memory": "\n\n# Useful context: \n{memory}",
"role_playing": "You are {role}. {backstory}\nYour personal goal is: {goal}",
"tools": "\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\n{tools}\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [{tool_names}], just the name, exactly as it's written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final answer to the original input question\n```",
"no_tools": "",
"task_no_tools": "\nCurrent Task: {input}\n\nProvide your complete response:",
"native_tools": "",
"native_task": "\nCurrent Task: {input}",
"no_tools": "\nTo give my best complete final answer to the task respond using the exact following format:\n\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described.\n\nI MUST use these formats, my job depends on it!",
"native_tools": "\nUse available tools to gather information and complete your task.",
"native_task": "\nCurrent Task: {input}\n\nThis is VERY important to you, your job depends on it!",
"post_tool_reasoning": "Analyze the tool result. If requirements are met, provide the Final Answer. Otherwise, call the next tool. Deliver only the answer without meta-commentary.",
"format": "Decide if you need a tool or can provide the final answer. Use one at a time.\nTo use a tool, use:\nThought: [reasoning]\nAction: [name from {tool_names}]\nAction Input: [JSON object]\n\nTo provide the final answer, use:\nThought: [reasoning]\nFinal Answer: [complete response]",
"final_answer_format": "If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfies the expected criteria, use the EXACT format below:\n\n```\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task.\n\n```",


@@ -28,7 +28,6 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
)
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import ColoredText, Printer
from crewai.utilities.pydantic_schema_utils import generate_model_description
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.types import LLMMessage
@@ -37,7 +36,6 @@ from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.experimental.agent_executor import AgentExecutor
from crewai.lite_agent import LiteAgent
from crewai.llm import LLM
from crewai.task import Task
@@ -160,8 +158,7 @@ def convert_tools_to_openai_schema(
parameters: dict[str, Any] = {}
if hasattr(tool, "args_schema") and tool.args_schema is not None:
try:
schema_output = generate_model_description(tool.args_schema)
parameters = schema_output.get("json_schema", {}).get("schema", {})
parameters = tool.args_schema.model_json_schema()
# Remove title and description from schema root as they're redundant
parameters.pop("title", None)
parameters.pop("description", None)
@@ -210,7 +207,6 @@ def handle_max_iterations_exceeded(
messages: list[LLMMessage],
llm: LLM | BaseLLM,
callbacks: list[TokenCalcHandler],
verbose: bool = True,
) -> AgentFinish:
"""Handles the case when the maximum number of iterations is exceeded. Performs one more LLM call to get the final answer.
@@ -221,16 +217,14 @@ def handle_max_iterations_exceeded(
messages: List of messages to send to the LLM.
llm: The LLM instance to call.
callbacks: List of callbacks for the LLM call.
verbose: Whether to print output.
Returns:
AgentFinish with the final answer after exceeding max iterations.
"""
if verbose:
printer.print(
content="Maximum iterations reached. Requesting final answer.",
color="yellow",
)
printer.print(
content="Maximum iterations reached. Requesting final answer.",
color="yellow",
)
if formatted_answer and hasattr(formatted_answer, "text"):
assistant_message = (
@@ -248,11 +242,10 @@ def handle_max_iterations_exceeded(
)
if answer is None or answer == "":
if verbose:
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
formatted = format_answer(answer=answer)
@@ -325,9 +318,8 @@ def get_llm_response(
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None = None,
verbose: bool = True,
) -> str | BaseModel | Any:
executor_context: CrewAgentExecutor | LiteAgent | None = None,
) -> str | Any:
"""Call the LLM and return the response, handling any invalid responses.
Args:
@@ -341,11 +333,10 @@ def get_llm_response(
from_agent: Optional agent context for the LLM call.
response_model: Optional Pydantic model for structured outputs.
executor_context: Optional executor context for hook invocation.
verbose: Whether to print output.
Returns:
The response from the LLM as a string, Pydantic model (when response_model is provided),
or tool call results if native function calling is used.
The response from the LLM as a string, or tool call results if
native function calling is used.
Raises:
Exception: If an error occurs.
@@ -353,7 +344,7 @@ def get_llm_response(
"""
if executor_context is not None:
if not _setup_before_llm_call_hooks(executor_context, printer, verbose=verbose):
if not _setup_before_llm_call_hooks(executor_context, printer):
raise ValueError("LLM call blocked by before_llm_call hook")
messages = executor_context.messages
@@ -370,16 +361,13 @@ def get_llm_response(
except Exception as e:
raise e
if not answer:
if verbose:
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
return _setup_after_llm_call_hooks(
executor_context, answer, printer, verbose=verbose
)
return _setup_after_llm_call_hooks(executor_context, answer, printer)
async def aget_llm_response(
@@ -392,9 +380,8 @@ async def aget_llm_response(
from_task: Task | None = None,
from_agent: Agent | LiteAgent | None = None,
response_model: type[BaseModel] | None = None,
executor_context: CrewAgentExecutor | AgentExecutor | None = None,
verbose: bool = True,
) -> str | BaseModel | Any:
executor_context: CrewAgentExecutor | None = None,
) -> str | Any:
"""Call the LLM asynchronously and return the response.
Args:
@@ -410,15 +397,15 @@ async def aget_llm_response(
executor_context: Optional executor context for hook invocation.
Returns:
The response from the LLM as a string, Pydantic model (when response_model is provided),
or tool call results if native function calling is used.
The response from the LLM as a string, or tool call results if
native function calling is used.
Raises:
Exception: If an error occurs.
ValueError: If the response is None or empty.
"""
if executor_context is not None:
if not _setup_before_llm_call_hooks(executor_context, printer, verbose=verbose):
if not _setup_before_llm_call_hooks(executor_context, printer):
raise ValueError("LLM call blocked by before_llm_call hook")
messages = executor_context.messages
@@ -435,16 +422,13 @@ async def aget_llm_response(
except Exception as e:
raise e
if not answer:
if verbose:
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError("Invalid response from LLM call - None or empty.")
return _setup_after_llm_call_hooks(
executor_context, answer, printer, verbose=verbose
)
return _setup_after_llm_call_hooks(executor_context, answer, printer)
def process_llm_response(
@@ -511,19 +495,13 @@ def handle_agent_action_core(
return formatted_answer
def handle_unknown_error(
printer: Printer, exception: Exception, verbose: bool = True
) -> None:
def handle_unknown_error(printer: Printer, exception: Exception) -> None:
"""Handle unknown errors by informing the user.
Args:
printer: Printer instance for output
exception: The exception that occurred
verbose: Whether to print output.
"""
if not verbose:
return
error_message = str(exception)
if "litellm" in error_message:
@@ -545,7 +523,6 @@ def handle_output_parser_exception(
iterations: int,
log_error_after: int = 3,
printer: Printer | None = None,
verbose: bool = True,
) -> AgentAction:
"""Handle OutputParserError by updating messages and formatted_answer.
@@ -568,7 +545,7 @@ def handle_output_parser_exception(
thought="",
)
if verbose and iterations > log_error_after and printer:
if iterations > log_error_after and printer:
printer.print(
content=f"Error parsing LLM output, agent will retry: {e.error}",
color="red",
@@ -598,7 +575,6 @@ def handle_context_length(
llm: LLM | BaseLLM,
callbacks: list[TokenCalcHandler],
i18n: I18N,
verbose: bool = True,
) -> None:
"""Handle context length exceeded by either summarizing or raising an error.
@@ -614,20 +590,16 @@ def handle_context_length(
SystemExit: If context length is exceeded and user opts not to summarize
"""
if respect_context_window:
if verbose:
printer.print(
content="Context length exceeded. Summarizing content to fit the model context window. Might take a while...",
color="yellow",
)
summarize_messages(
messages=messages, llm=llm, callbacks=callbacks, i18n=i18n, verbose=verbose
printer.print(
content="Context length exceeded. Summarizing content to fit the model context window. Might take a while...",
color="yellow",
)
summarize_messages(messages=messages, llm=llm, callbacks=callbacks, i18n=i18n)
else:
if verbose:
printer.print(
content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
color="red",
)
printer.print(
content="Context length exceeded. Consider using smaller text or RAG tools from crewai_tools.",
color="red",
)
raise SystemExit(
"Context length exceeded and user opted not to summarize. Consider using smaller text or RAG tools from crewai_tools."
)
@@ -638,7 +610,6 @@ def summarize_messages(
llm: LLM | BaseLLM,
callbacks: list[TokenCalcHandler],
i18n: I18N,
verbose: bool = True,
) -> None:
"""Summarize messages to fit within context window.
@@ -670,11 +641,10 @@ def summarize_messages(
total_groups = len(messages_groups)
for idx, group in enumerate(messages_groups, 1):
if verbose:
Printer().print(
content=f"Summarizing {idx}/{total_groups}...",
color="yellow",
)
Printer().print(
content=f"Summarizing {idx}/{total_groups}...",
color="yellow",
)
summarization_messages = [
format_message_for_llm(
@@ -930,16 +900,13 @@ def extract_tool_call_info(
def _setup_before_llm_call_hooks(
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
printer: Printer,
verbose: bool = True,
executor_context: CrewAgentExecutor | LiteAgent | None, printer: Printer
) -> bool:
"""Setup and invoke before_llm_call hooks for the executor context.
Args:
executor_context: The executor context to setup the hooks for.
printer: Printer instance for error logging.
verbose: Whether to print output.
Returns:
True if LLM execution should proceed, False if blocked by a hook.
@@ -954,29 +921,26 @@ def _setup_before_llm_call_hooks(
for hook in executor_context.before_llm_call_hooks:
result = hook(hook_context)
if result is False:
if verbose:
printer.print(
content="LLM call blocked by before_llm_call hook",
color="yellow",
)
printer.print(
content="LLM call blocked by before_llm_call hook",
color="yellow",
)
return False
except Exception as e:
if verbose:
printer.print(
content=f"Error in before_llm_call hook: {e}",
color="yellow",
)
printer.print(
content=f"Error in before_llm_call hook: {e}",
color="yellow",
)
if not isinstance(executor_context.messages, list):
if verbose:
printer.print(
content=(
"Warning: before_llm_call hook replaced messages with non-list. "
"Restoring original messages list. Hooks should modify messages in-place, "
"not replace the list (e.g., use context.messages.append() not context.messages = [])."
),
color="yellow",
)
printer.print(
content=(
"Warning: before_llm_call hook replaced messages with non-list. "
"Restoring original messages list. Hooks should modify messages in-place, "
"not replace the list (e.g., use context.messages.append() not context.messages = [])."
),
color="yellow",
)
if isinstance(original_messages, list):
executor_context.messages = original_messages
else:
@@ -986,80 +950,50 @@ def _setup_before_llm_call_hooks(
def _setup_after_llm_call_hooks(
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
answer: str | BaseModel,
executor_context: CrewAgentExecutor | LiteAgent | None,
answer: str,
printer: Printer,
verbose: bool = True,
) -> str | BaseModel:
) -> str:
"""Setup and invoke after_llm_call hooks for the executor context.
Args:
executor_context: The executor context to setup the hooks for.
answer: The LLM response (string or Pydantic model).
answer: The LLM response string.
printer: Printer instance for error logging.
verbose: Whether to print output.
Returns:
The potentially modified response (string or Pydantic model).
The potentially modified response string.
"""
if executor_context and executor_context.after_llm_call_hooks:
from crewai.hooks.llm_hooks import LLMCallHookContext
original_messages = executor_context.messages
# For Pydantic models, serialize to JSON for hooks
if isinstance(answer, BaseModel):
pydantic_answer = answer
hook_response: str = pydantic_answer.model_dump_json()
original_json: str = hook_response
else:
pydantic_answer = None
hook_response = str(answer)
hook_context = LLMCallHookContext(executor_context, response=hook_response)
hook_context = LLMCallHookContext(executor_context, response=answer)
try:
for hook in executor_context.after_llm_call_hooks:
modified_response = hook(hook_context)
if modified_response is not None and isinstance(modified_response, str):
hook_response = modified_response
answer = modified_response
except Exception as e:
if verbose:
printer.print(
content=f"Error in after_llm_call hook: {e}",
color="yellow",
)
printer.print(
content=f"Error in after_llm_call hook: {e}",
color="yellow",
)
if not isinstance(executor_context.messages, list):
if verbose:
printer.print(
content=(
"Warning: after_llm_call hook replaced messages with non-list. "
"Restoring original messages list. Hooks should modify messages in-place, "
"not replace the list (e.g., use context.messages.append() not context.messages = [])."
),
color="yellow",
)
printer.print(
content=(
"Warning: after_llm_call hook replaced messages with non-list. "
"Restoring original messages list. Hooks should modify messages in-place, "
"not replace the list (e.g., use context.messages.append() not context.messages = [])."
),
color="yellow",
)
if isinstance(original_messages, list):
executor_context.messages = original_messages
else:
executor_context.messages = []
# If hooks modified the response, update answer accordingly
if pydantic_answer is not None:
# For Pydantic models, reparse the JSON if it was modified
if hook_response != original_json:
try:
model_class: type[BaseModel] = type(pydantic_answer)
answer = model_class.model_validate_json(hook_response)
except Exception as e:
if verbose:
printer.print(
content=f"Warning: Hook modified response but failed to reparse as {type(pydantic_answer).__name__}: {e}. Using original model.",
color="yellow",
)
else:
# For string responses, use the hook-modified response
answer = hook_response
return answer
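Given the contract visible in these hunks — before-hooks may return False to block the call, after-hooks may return a replacement string or None to keep the response — a minimal pair of hook sketches (names and redaction logic are invented):

```python
def block_empty_prompts(context) -> bool:
    # Before-hook: returning False blocks the LLM call.
    return bool(context.messages)

def redact_after(context):
    # After-hook: return a replacement string, or None to leave the response as-is.
    if "SECRET" in context.response:
        return context.response.replace("SECRET", "[redacted]")
    return None
```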


@@ -62,10 +62,7 @@ class Converter(OutputConverter):
                 ],
                 response_model=self.model,
             )
-            if isinstance(response, BaseModel):
-                result = response
-            else:
-                result = self.model.model_validate_json(response)
+            result = self.model.model_validate_json(response)
         else:
             response = self.llm.call(
                 [
@@ -208,11 +205,10 @@ def convert_to_model(
         )
     except Exception as e:
-        if agent and getattr(agent, "verbose", True):
-            Printer().print(
-                content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.",
-                color="red",
-            )
+        Printer().print(
+            content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.",
+            color="red",
+        )
         return result
@@ -266,11 +262,10 @@ def handle_partial_json(
     except ValidationError:
         raise
     except Exception as e:
-        if agent and getattr(agent, "verbose", True):
-            Printer().print(
-                content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.",
-                color="red",
-            )
+        Printer().print(
+            content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.",
+            color="red",
+        )
     return convert_with_instructions(
         result=result,
@@ -328,11 +323,10 @@ def convert_with_instructions(
     )
     if isinstance(exported_result, ConverterError):
-        if agent and getattr(agent, "verbose", True):
-            Printer().print(
-                content=f"Failed to convert result to model: {exported_result}",
-                color="red",
-            )
+        Printer().print(
+            content=f"Failed to convert result to model: {exported_result}",
+            color="red",
+        )
         return result
     return exported_result
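# For orientation, a hedged sketch of the fallback chain these three hunks sit
# in: convert_to_model attempts strict validation, handle_partial_json repairs
# near-JSON output, and convert_with_instructions is the last resort before the
# original result is returned unchanged. Real signatures differ; this only
# illustrates the ordering.
import re

from pydantic import BaseModel, ValidationError


def convert(raw: str, model: type[BaseModel]) -> BaseModel | str:
    try:
        return model.model_validate_json(raw)  # convert_to_model's direct path
    except Exception:
        match = re.search(r"\{.*\}", raw, re.DOTALL)  # handle_partial_json-style repair
        if match:
            try:
                return model.model_validate_json(match.group(0))
            except ValidationError:
                pass
        return raw  # all conversions failed: fall back to the raw result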

View File

@@ -4,7 +4,6 @@ from collections.abc import Generator
 import contextlib
 import io
 import logging
-import os
 import warnings
@@ -57,39 +56,3 @@ def suppress_warnings() -> Generator[None, None, None]:
         "ignore", message="open_text is deprecated*", category=DeprecationWarning
     )
     yield
-
-
-def should_enable_verbose(*, override: bool | None = None) -> bool:
-    """Determine if verbose logging should be enabled.
-
-    This is the single source of truth for verbose logging enablement.
-
-    Priority order:
-    1. Explicit override (e.g., Crew.verbose=True/False or Flow.verbose=True/False)
-    2. Environment variable CREWAI_VERBOSE
-
-    Args:
-        override: Explicit override for verbose (True=always enable, False=always disable,
-            None=check environment variable, defaults to True if not set)
-
-    Returns:
-        True if verbose logging should be enabled, False otherwise.
-
-    Example:
-        # Disable verbose logging globally via environment variable
-        export CREWAI_VERBOSE=false
-
-        # Or in code
-        flow = Flow(verbose=False)
-        crew = Crew(verbose=False)
-    """
-    if override is not None:
-        return override
-
-    env_value = os.getenv("CREWAI_VERBOSE", "").lower()
-    if env_value in ("false", "0"):
-        return False
-    if env_value in ("true", "1"):
-        return True
-
-    # Default to True if not set
-    return True
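# Usage sketch exercising the precedence of the helper removed above
# (override beats CREWAI_VERBOSE, which beats the default of True):
import os

os.environ["CREWAI_VERBOSE"] = "false"
assert should_enable_verbose() is False                # env var applies
assert should_enable_verbose(override=True) is True    # explicit override wins
os.environ.pop("CREWAI_VERBOSE")
assert should_enable_verbose() is True                 # default is True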

View File

@@ -23,13 +23,7 @@ class SystemPromptResult(StandardPromptResult):
 COMPONENTS = Literal[
-    "role_playing",
-    "tools",
-    "no_tools",
-    "native_tools",
-    "task",
-    "native_task",
-    "task_no_tools",
+    "role_playing", "tools", "no_tools", "native_tools", "task", "native_task"
 ]
@@ -80,14 +74,11 @@ class Prompts(BaseModel):
             slices.append("no_tools")
         system: str = self._build_prompt(slices)
-        # Determine which task slice to use:
-        task_slice: COMPONENTS
-        if self.use_native_tool_calling:
-            task_slice = "native_task"
-        elif self.has_tools:
-            task_slice = "task"
-        else:
-            task_slice = "task_no_tools"
+        # Use native_task for native tool calling (no "Thought:" prompt)
+        # Use task for ReAct pattern (includes "Thought:" prompt)
+        task_slice: COMPONENTS = (
+            "native_task" if self.use_native_tool_calling else "task"
+        )
         slices.append(task_slice)
         if (
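# Behavioral note on this hunk as a sketch: with tools absent and native tool
# calling off, the removed branch chose "task_no_tools" while the new expression
# chooses "task". The two helpers below are hypothetical restatements.
def old_task_slice(use_native_tool_calling: bool, has_tools: bool) -> str:
    if use_native_tool_calling:
        return "native_task"
    return "task" if has_tools else "task_no_tools"


def new_task_slice(use_native_tool_calling: bool, has_tools: bool) -> str:
    return "native_task" if use_native_tool_calling else "task"


assert old_task_slice(False, False) == "task_no_tools"
assert new_task_slice(False, False) == "task"  # the behavior change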

View File

@@ -1,72 +1,14 @@
-"""Dynamic Pydantic model creation from JSON schemas.
-
-This module provides utilities for converting JSON schemas to Pydantic models at runtime.
-The main function is `create_model_from_schema`, which takes a JSON schema and returns
-a dynamically created Pydantic model class.
-
-This is used by the A2A server to honor response schemas sent by clients, allowing
-structured output from agent tasks.
-
-Based on dydantic (https://github.com/zenbase-ai/dydantic).
+"""Utilities for generating JSON schemas from Pydantic models.
+
+This module provides functions for converting Pydantic models to JSON schemas
+suitable for use with LLMs and tool definitions.
 """

 from __future__ import annotations

-from collections.abc import Callable
-from copy import deepcopy
-import datetime
-import logging
-from typing import TYPE_CHECKING, Annotated, Any, Literal, Union
-import uuid
+from typing import Any

-from pydantic import (
-    UUID1,
-    UUID3,
-    UUID4,
-    UUID5,
-    AnyUrl,
-    BaseModel,
-    ConfigDict,
-    DirectoryPath,
-    Field,
-    FilePath,
-    FileUrl,
-    HttpUrl,
-    Json,
-    MongoDsn,
-    NewPath,
-    PostgresDsn,
-    SecretBytes,
-    SecretStr,
-    StrictBytes,
-    create_model as create_model_base,
-)
-from pydantic.networks import (  # type: ignore[attr-defined]
-    IPv4Address,
-    IPv6Address,
-    IPvAnyAddress,
-    IPvAnyInterface,
-    IPvAnyNetwork,
-)
-
-logger = logging.getLogger(__name__)
-
-if TYPE_CHECKING:
-    from pydantic import EmailStr
-    from pydantic.main import AnyClassMethod
-else:
-    try:
-        from pydantic import EmailStr
-    except ImportError:
-        logger.warning(
-            "EmailStr unavailable, using str fallback",
-            extra={"missing_package": "email_validator"},
-        )
-        EmailStr = str
+from pydantic import BaseModel
def resolve_refs(schema: dict[str, Any]) -> dict[str, Any]:
@@ -301,319 +243,3 @@ def generate_model_description(model: type[BaseModel]) -> dict[str, Any]:
"schema": json_schema,
},
}
FORMAT_TYPE_MAP: dict[str, type[Any]] = {
"base64": Annotated[bytes, Field(json_schema_extra={"format": "base64"})], # type: ignore[dict-item]
"binary": StrictBytes,
"date": datetime.date,
"time": datetime.time,
"date-time": datetime.datetime,
"duration": datetime.timedelta,
"directory-path": DirectoryPath,
"email": EmailStr,
"file-path": FilePath,
"ipv4": IPv4Address,
"ipv6": IPv6Address,
"ipvanyaddress": IPvAnyAddress, # type: ignore[dict-item]
"ipvanyinterface": IPvAnyInterface, # type: ignore[dict-item]
"ipvanynetwork": IPvAnyNetwork, # type: ignore[dict-item]
"json-string": Json,
"multi-host-uri": PostgresDsn | MongoDsn, # type: ignore[dict-item]
"password": SecretStr,
"path": NewPath,
"uri": AnyUrl,
"uuid": uuid.UUID,
"uuid1": UUID1,
"uuid3": UUID3,
"uuid4": UUID4,
"uuid5": UUID5,
}
def create_model_from_schema( # type: ignore[no-any-unimported]
json_schema: dict[str, Any],
*,
root_schema: dict[str, Any] | None = None,
__config__: ConfigDict | None = None,
__base__: type[BaseModel] | None = None,
__module__: str = __name__,
__validators__: dict[str, AnyClassMethod] | None = None,
__cls_kwargs__: dict[str, Any] | None = None,
) -> type[BaseModel]:
"""Create a Pydantic model from a JSON schema.
This function takes a JSON schema as input and dynamically creates a Pydantic
model class based on the schema. It supports various JSON schema features such
as nested objects, referenced definitions ($ref), arrays with typed items,
union types (anyOf/oneOf), and string formats.
Args:
json_schema: A dictionary representing the JSON schema.
root_schema: The root schema containing $defs. If not provided, the
current schema is treated as the root schema.
__config__: Pydantic configuration for the generated model.
__base__: Base class for the generated model. Defaults to BaseModel.
__module__: Module name for the generated model class.
__validators__: A dictionary of custom validators for the generated model.
__cls_kwargs__: Additional keyword arguments for the generated model class.
Returns:
A dynamically created Pydantic model class based on the provided JSON schema.
Example:
>>> schema = {
... "title": "Person",
... "type": "object",
... "properties": {
... "name": {"type": "string"},
... "age": {"type": "integer"},
... },
... "required": ["name"],
... }
>>> Person = create_model_from_schema(schema)
>>> person = Person(name="John", age=30)
>>> person.name
'John'
"""
effective_root = root_schema or json_schema
if "allOf" in json_schema:
json_schema = _merge_all_of_schemas(json_schema["allOf"], effective_root)
if "title" not in json_schema and "title" in (root_schema or {}):
json_schema["title"] = (root_schema or {}).get("title")
model_name = json_schema.get("title", "DynamicModel")
field_definitions = {
name: _json_schema_to_pydantic_field(
name, prop, json_schema.get("required", []), effective_root
)
for name, prop in (json_schema.get("properties", {}) or {}).items()
}
return create_model_base(
model_name,
__config__=__config__,
__base__=__base__,
__module__=__module__,
__validators__=__validators__,
__cls_kwargs__=__cls_kwargs__,
**field_definitions,
)
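# A usage sketch for the removed create_model_from_schema, exercising the
# $defs/$ref resolution documented above (the schema is hypothetical):
order_schema = {
    "title": "Order",
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "customer": {"$ref": "#/$defs/Customer"},
    },
    "required": ["id"],
    "$defs": {
        "Customer": {
            "title": "Customer",
            "type": "object",
            "properties": {"name": {"type": "string"}},
            "required": ["name"],
        }
    },
}
Order = create_model_from_schema(order_schema)
order = Order(id=1, customer={"name": "Ada"})
assert order.customer.name == "Ada"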
def _json_schema_to_pydantic_field(
name: str,
json_schema: dict[str, Any],
required: list[str],
root_schema: dict[str, Any],
) -> Any:
"""Convert a JSON schema property to a Pydantic field definition.
Args:
name: The field name.
json_schema: The JSON schema for this field.
required: List of required field names.
root_schema: The root schema for resolving $ref.
Returns:
A tuple of (type, Field) for use with create_model.
"""
type_ = _json_schema_to_pydantic_type(json_schema, root_schema, name_=name.title())
description = json_schema.get("description")
examples = json_schema.get("examples")
is_required = name in required
field_params: dict[str, Any] = {}
schema_extra: dict[str, Any] = {}
if description:
field_params["description"] = description
if examples:
schema_extra["examples"] = examples
default = ... if is_required else None
if isinstance(type_, type) and issubclass(type_, (int, float)):
if "minimum" in json_schema:
field_params["ge"] = json_schema["minimum"]
if "exclusiveMinimum" in json_schema:
field_params["gt"] = json_schema["exclusiveMinimum"]
if "maximum" in json_schema:
field_params["le"] = json_schema["maximum"]
if "exclusiveMaximum" in json_schema:
field_params["lt"] = json_schema["exclusiveMaximum"]
if "multipleOf" in json_schema:
field_params["multiple_of"] = json_schema["multipleOf"]
format_ = json_schema.get("format")
if format_ in FORMAT_TYPE_MAP:
pydantic_type = FORMAT_TYPE_MAP[format_]
if format_ == "password":
if json_schema.get("writeOnly"):
pydantic_type = SecretBytes
elif format_ == "uri":
allowed_schemes = json_schema.get("scheme")
if allowed_schemes:
if len(allowed_schemes) == 1 and allowed_schemes[0] == "http":
pydantic_type = HttpUrl
elif len(allowed_schemes) == 1 and allowed_schemes[0] == "file":
pydantic_type = FileUrl
type_ = pydantic_type
if isinstance(type_, type) and issubclass(type_, str):
if "minLength" in json_schema:
field_params["min_length"] = json_schema["minLength"]
if "maxLength" in json_schema:
field_params["max_length"] = json_schema["maxLength"]
if "pattern" in json_schema:
field_params["pattern"] = json_schema["pattern"]
if not is_required:
type_ = type_ | None
if schema_extra:
field_params["json_schema_extra"] = schema_extra
return type_, Field(default, **field_params)
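# Sketch of the constraint mapping above: JSON Schema keywords become pydantic
# Field parameters, and optional fields are widened with `| None` (inputs are
# hypothetical):
age_type, age_field = _json_schema_to_pydantic_field(
    "age", {"type": "integer", "minimum": 0, "exclusiveMaximum": 150}, ["age"], {}
)
assert age_type is int  # required, so no widening; the Field carries ge=0 and lt=150

nick_type, _ = _json_schema_to_pydantic_field("nickname", {"type": "string"}, [], {})
assert nick_type == (str | None)  # optional fields are widened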
def _resolve_ref(ref: str, root_schema: dict[str, Any]) -> dict[str, Any]:
"""Resolve a $ref to its actual schema.
Args:
ref: The $ref string (e.g., "#/$defs/MyType").
root_schema: The root schema containing $defs.
Returns:
The resolved schema dict.
"""
from typing import cast
ref_path = ref.split("/")
if ref.startswith("#/$defs/"):
ref_schema: dict[str, Any] = root_schema["$defs"]
start_idx = 2
else:
ref_schema = root_schema
start_idx = 1
for path in ref_path[start_idx:]:
ref_schema = cast(dict[str, Any], ref_schema[path])
return ref_schema
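# Tiny sketch of _resolve_ref following a "#/$defs/..." pointer (hypothetical root):
root = {"$defs": {"Pet": {"type": "object", "properties": {"name": {"type": "string"}}}}}
assert _resolve_ref("#/$defs/Pet", root)["type"] == "object"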
def _merge_all_of_schemas(
schemas: list[dict[str, Any]],
root_schema: dict[str, Any],
) -> dict[str, Any]:
"""Merge multiple allOf schemas into a single schema.
Combines properties and required fields from all schemas.
Args:
schemas: List of schemas to merge.
root_schema: The root schema for resolving $ref.
Returns:
Merged schema with combined properties and required fields.
"""
merged: dict[str, Any] = {"type": "object", "properties": {}, "required": []}
for schema in schemas:
if "$ref" in schema:
schema = _resolve_ref(schema["$ref"], root_schema)
if "properties" in schema:
merged["properties"].update(schema["properties"])
if "required" in schema:
for field in schema["required"]:
if field not in merged["required"]:
merged["required"].append(field)
if "title" in schema and "title" not in merged:
merged["title"] = schema["title"]
return merged
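# Sketch of _merge_all_of_schemas combining a $ref fragment with an inline one
# (hypothetical input): properties are unioned and required lists deduplicated.
base_root = {
    "$defs": {
        "Base": {
            "type": "object",
            "properties": {"id": {"type": "integer"}},
            "required": ["id"],
        }
    }
}
merged = _merge_all_of_schemas(
    [{"$ref": "#/$defs/Base"}, {"properties": {"note": {"type": "string"}}}],
    base_root,
)
assert set(merged["properties"]) == {"id", "note"}
assert merged["required"] == ["id"]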
def _json_schema_to_pydantic_type(
json_schema: dict[str, Any],
root_schema: dict[str, Any],
*,
name_: str | None = None,
) -> Any:
"""Convert a JSON schema to a Python/Pydantic type.
Args:
json_schema: The JSON schema to convert.
root_schema: The root schema for resolving $ref.
name_: Optional name for nested models.
Returns:
A Python type corresponding to the JSON schema.
"""
ref = json_schema.get("$ref")
if ref:
ref_schema = _resolve_ref(ref, root_schema)
return _json_schema_to_pydantic_type(ref_schema, root_schema, name_=name_)
enum_values = json_schema.get("enum")
if enum_values:
return Literal[tuple(enum_values)]
if "const" in json_schema:
return Literal[json_schema["const"]]
any_of_schemas = []
if "anyOf" in json_schema or "oneOf" in json_schema:
any_of_schemas = json_schema.get("anyOf", []) + json_schema.get("oneOf", [])
if any_of_schemas:
any_of_types = [
_json_schema_to_pydantic_type(schema, root_schema)
for schema in any_of_schemas
]
return Union[tuple(any_of_types)] # noqa: UP007
all_of_schemas = json_schema.get("allOf")
if all_of_schemas:
if len(all_of_schemas) == 1:
return _json_schema_to_pydantic_type(
all_of_schemas[0], root_schema, name_=name_
)
merged = _merge_all_of_schemas(all_of_schemas, root_schema)
return _json_schema_to_pydantic_type(merged, root_schema, name_=name_)
type_ = json_schema.get("type")
if type_ == "string":
return str
if type_ == "integer":
return int
if type_ == "number":
return float
if type_ == "boolean":
return bool
if type_ == "array":
items_schema = json_schema.get("items")
if items_schema:
item_type = _json_schema_to_pydantic_type(
items_schema, root_schema, name_=name_
)
return list[item_type] # type: ignore[valid-type]
return list
if type_ == "object":
properties = json_schema.get("properties")
if properties:
json_schema_ = json_schema.copy()
if json_schema_.get("title") is None:
json_schema_["title"] = name_
return create_model_from_schema(json_schema_, root_schema=root_schema)
return dict
if type_ == "null":
return None
if type_ is None:
return Any
raise ValueError(f"Unsupported JSON schema type: {type_} from {json_schema}")

View File

@@ -26,5 +26,4 @@ class LLMMessage(TypedDict):
     tool_call_id: NotRequired[str]
     name: NotRequired[str]
     tool_calls: NotRequired[list[dict[str, Any]]]
-    raw_tool_call_parts: NotRequired[list[Any]]
     files: NotRequired[dict[str, FileInput]]
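# Sketch of a tool-call round trip expressed as LLMMessage dicts; the "role"
# and "content" keys are assumed from the rest of the TypedDict (not shown in
# this hunk), and the values are illustrative:
assistant_turn: LLMMessage = {
    "role": "assistant",
    "content": "",
    "tool_calls": [
        {
            "id": "call_1",
            "type": "function",
            "function": {"name": "add", "arguments": '{"a": 1, "b": 2}'},
        }
    ],
}
tool_turn: LLMMessage = {
    "role": "tool",
    "content": "3",
    "tool_call_id": "call_1",
    "name": "add",
}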

View File

@@ -1004,53 +1004,3 @@ def test_prepare_kickoff_param_files_override_message_files():
assert "files" in inputs
assert inputs["files"]["same.png"] is param_file # param takes precedence
def test_lite_agent_verbose_false_suppresses_printer_output():
"""Test that setting verbose=False suppresses all printer output."""
from crewai.agents.parser import AgentFinish
from crewai.types.usage_metrics import UsageMetrics
mock_llm = Mock(spec=LLM)
mock_llm.call.return_value = "Final Answer: Hello!"
mock_llm.stop = []
mock_llm.supports_stop_words.return_value = False
mock_llm.get_token_usage_summary.return_value = UsageMetrics(
total_tokens=100,
prompt_tokens=50,
completion_tokens=50,
cached_prompt_tokens=0,
successful_requests=1,
)
with pytest.warns(DeprecationWarning):
agent = LiteAgent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
llm=mock_llm,
verbose=False,
)
result = agent.kickoff("Say hello")
assert result is not None
assert isinstance(result, LiteAgentOutput)
# Verify the printer was never called
agent._printer.print = Mock()
# For a clean verification, patch printer before execution
with pytest.warns(DeprecationWarning):
agent2 = LiteAgent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
llm=mock_llm,
verbose=False,
)
mock_printer = Mock()
agent2._printer = mock_printer
agent2.kickoff("Say hello")
mock_printer.print.assert_not_called()

View File

@@ -1,224 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator. You are a
calculator assistant\nYour personal goal is: Perform calculations"},{"role":"user","content":"\nCurrent
Task: What is 7 times 6? Use the multiply_numbers tool.\n\nThis is VERY important
to you, your job depends on it!"}],"model":"gpt-4.1-mini","tool_choice":"auto","tools":[{"type":"function","function":{"name":"multiply_numbers","description":"Multiply
two numbers together.","parameters":{"properties":{"a":{"title":"A","type":"integer"},"b":{"title":"B","type":"integer"}},"required":["a","b"],"type":"object"}}}]}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '589'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2gblVDQeSH6tTrJiUtxgjoVoPuAR\",\n \"object\":
\"chat.completion\",\n \"created\": 1769532813,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": null,\n \"tool_calls\": [\n {\n
\ \"id\": \"call_gO6PtjoOIDVeDWs7Wf680BHh\",\n \"type\":
\"function\",\n \"function\": {\n \"name\": \"multiply_numbers\",\n
\ \"arguments\": \"{\\\"a\\\":7,\\\"b\\\":6}\"\n }\n
\ }\n ],\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"tool_calls\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 100,\n \"completion_tokens\":
18,\n \"total_tokens\": 118,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_376a7ccef1\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 27 Jan 2026 16:53:34 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '593'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator. You are a
calculator assistant\nYour personal goal is: Perform calculations"},{"role":"user","content":"\nCurrent
Task: What is 7 times 6? Use the multiply_numbers tool.\n\nThis is VERY important
to you, your job depends on it!"},{"role":"assistant","content":null,"tool_calls":[{"id":"call_gO6PtjoOIDVeDWs7Wf680BHh","type":"function","function":{"name":"multiply_numbers","arguments":"{\"a\":7,\"b\":6}"}}]},{"role":"tool","tool_call_id":"call_gO6PtjoOIDVeDWs7Wf680BHh","name":"multiply_numbers","content":"42"},{"role":"user","content":"Analyze
the tool result. If requirements are met, provide the Final Answer. Otherwise,
call the next tool. Deliver only the answer without meta-commentary."}],"model":"gpt-4.1-mini","tool_choice":"auto","tools":[{"type":"function","function":{"name":"multiply_numbers","description":"Multiply
two numbers together.","parameters":{"properties":{"a":{"title":"A","type":"integer"},"b":{"title":"B","type":"integer"}},"required":["a","b"],"type":"object"}}}]}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1056'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2gbm9NaGCXkI3QwW3eOTFSP4L4lh\",\n \"object\":
\"chat.completion\",\n \"created\": 1769532814,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"42\",\n \"refusal\": null,\n
\ \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\":
\"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 162,\n \"completion_tokens\":
2,\n \"total_tokens\": 164,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_376a7ccef1\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 27 Jan 2026 16:53:34 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '259'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,351 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Test Agent. You are a
test agent\nYour personal goal is: Try to use the dangerous operation tool"},{"role":"user","content":"\nCurrent
Task: Use the dangerous_operation tool with action ''delete_all''.\n\nThis is
the expected criteria for your final answer: The result of the operation\nyou
MUST return the actual complete content as the final answer, not a summary.\n\nThis
is VERY important to you, your job depends on it!"}],"model":"gpt-4.1-mini","tool_choice":"auto","tools":[{"type":"function","function":{"name":"dangerous_operation","description":"Perform
a dangerous operation that should be blocked.","parameters":{"properties":{"action":{"title":"Action","type":"string"}},"required":["action"],"type":"object"}}}]}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '773'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2giKEOxBDVqJVqVECwcFjbzdQKSA\",\n \"object\":
\"chat.completion\",\n \"created\": 1769533220,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": null,\n \"tool_calls\": [\n {\n
\ \"id\": \"call_3OM1qS0QaWqhiJaHyJbNz1ME\",\n \"type\":
\"function\",\n \"function\": {\n \"name\": \"dangerous_operation\",\n
\ \"arguments\": \"{\\\"action\\\":\\\"delete_all\\\"}\"\n }\n
\ }\n ],\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"tool_calls\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 133,\n \"completion_tokens\":
17,\n \"total_tokens\": 150,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_376a7ccef1\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 27 Jan 2026 17:00:20 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '484'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Test Agent. You are a
test agent\nYour personal goal is: Try to use the dangerous operation tool"},{"role":"user","content":"\nCurrent
Task: Use the dangerous_operation tool with action ''delete_all''.\n\nThis is
the expected criteria for your final answer: The result of the operation\nyou
MUST return the actual complete content as the final answer, not a summary.\n\nThis
is VERY important to you, your job depends on it!"},{"role":"assistant","content":null,"tool_calls":[{"id":"call_3OM1qS0QaWqhiJaHyJbNz1ME","type":"function","function":{"name":"dangerous_operation","arguments":"{\"action\":\"delete_all\"}"}}]},{"role":"tool","tool_call_id":"call_3OM1qS0QaWqhiJaHyJbNz1ME","name":"dangerous_operation","content":"Tool
execution blocked by hook. Tool: dangerous_operation"},{"role":"user","content":"Analyze
the tool result. If requirements are met, provide the Final Answer. Otherwise,
call the next tool. Deliver only the answer without meta-commentary."}],"model":"gpt-4.1-mini","tool_choice":"auto","tools":[{"type":"function","function":{"name":"dangerous_operation","description":"Perform
a dangerous operation that should be blocked.","parameters":{"properties":{"action":{"title":"Action","type":"string"}},"required":["action"],"type":"object"}}}]}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1311'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2giLnD91JxhK0yXninQ7oHYttNDY\",\n \"object\":
\"chat.completion\",\n \"created\": 1769533221,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": null,\n \"tool_calls\": [\n {\n
\ \"id\": \"call_qF1c2e31GgjoSNJx0HBxI3zX\",\n \"type\":
\"function\",\n \"function\": {\n \"name\": \"dangerous_operation\",\n
\ \"arguments\": \"{\\\"action\\\":\\\"delete_all\\\"}\"\n }\n
\ }\n ],\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"tool_calls\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 204,\n \"completion_tokens\":
17,\n \"total_tokens\": 221,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_376a7ccef1\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 27 Jan 2026 17:00:21 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '447'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Test Agent. You are a
test agent\nYour personal goal is: Try to use the dangerous operation tool"},{"role":"user","content":"\nCurrent
Task: Use the dangerous_operation tool with action ''delete_all''.\n\nThis is
the expected criteria for your final answer: The result of the operation\nyou
MUST return the actual complete content as the final answer, not a summary.\n\nThis
is VERY important to you, your job depends on it!"},{"role":"assistant","content":null,"tool_calls":[{"id":"call_3OM1qS0QaWqhiJaHyJbNz1ME","type":"function","function":{"name":"dangerous_operation","arguments":"{\"action\":\"delete_all\"}"}}]},{"role":"tool","tool_call_id":"call_3OM1qS0QaWqhiJaHyJbNz1ME","name":"dangerous_operation","content":"Tool
execution blocked by hook. Tool: dangerous_operation"},{"role":"user","content":"Analyze
the tool result. If requirements are met, provide the Final Answer. Otherwise,
call the next tool. Deliver only the answer without meta-commentary."},{"role":"assistant","content":null,"tool_calls":[{"id":"call_qF1c2e31GgjoSNJx0HBxI3zX","type":"function","function":{"name":"dangerous_operation","arguments":"{\"action\":\"delete_all\"}"}}]},{"role":"tool","tool_call_id":"call_qF1c2e31GgjoSNJx0HBxI3zX","name":"dangerous_operation","content":"Tool
execution blocked by hook. Tool: dangerous_operation"},{"role":"user","content":"Analyze
the tool result. If requirements are met, provide the Final Answer. Otherwise,
call the next tool. Deliver only the answer without meta-commentary."}],"model":"gpt-4.1-mini","tool_choice":"auto","tools":[{"type":"function","function":{"name":"dangerous_operation","description":"Perform
a dangerous operation that should be blocked.","parameters":{"properties":{"action":{"title":"Action","type":"string"}},"required":["action"],"type":"object"}}}]}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1849'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2giM1tAvEOCNwDw1qNmNUN5PIg2Y\",\n \"object\":
\"chat.completion\",\n \"created\": 1769533222,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"The dangerous_operation tool with action
'delete_all' was blocked and did not execute. There is no result from the
operation to provide.\",\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 275,\n \"completion_tokens\":
28,\n \"total_tokens\": 303,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_376a7ccef1\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 27 Jan 2026 17:00:22 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '636'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,230 +0,0 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Math Assistant. You are
a math assistant that helps with division\nYour personal goal is: Perform division
calculations accurately"},{"role":"user","content":"\nCurrent Task: Calculate
100 divided by 4 using the divide_numbers tool.\n\nThis is the expected criteria
for your final answer: The result of the division\nyou MUST return the actual
complete content as the final answer, not a summary.\n\nThis is VERY important
to you, your job depends on it!"}],"model":"gpt-4.1-mini","tool_choice":"auto","tools":[{"type":"function","function":{"name":"divide_numbers","description":"Divide
first number by second number.","parameters":{"properties":{"a":{"title":"A","type":"integer"},"b":{"title":"B","type":"integer"}},"required":["a","b"],"type":"object"}}}]}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '809'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2gbkWUn8InDLeD1Cf8w0LxiUQOIS\",\n \"object\":
\"chat.completion\",\n \"created\": 1769532812,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": null,\n \"tool_calls\": [\n {\n
\ \"id\": \"call_gwIV3i71RNqfpr7KguEciCuV\",\n \"type\":
\"function\",\n \"function\": {\n \"name\": \"divide_numbers\",\n
\ \"arguments\": \"{\\\"a\\\":100,\\\"b\\\":4}\"\n }\n
\ }\n ],\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"tool_calls\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 140,\n \"completion_tokens\":
18,\n \"total_tokens\": 158,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_376a7ccef1\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 27 Jan 2026 16:53:32 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '435'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Math Assistant. You are
a math assistant that helps with division\nYour personal goal is: Perform division
calculations accurately"},{"role":"user","content":"\nCurrent Task: Calculate
100 divided by 4 using the divide_numbers tool.\n\nThis is the expected criteria
for your final answer: The result of the division\nyou MUST return the actual
complete content as the final answer, not a summary.\n\nThis is VERY important
to you, your job depends on it!"},{"role":"assistant","content":null,"tool_calls":[{"id":"call_gwIV3i71RNqfpr7KguEciCuV","type":"function","function":{"name":"divide_numbers","arguments":"{\"a\":100,\"b\":4}"}}]},{"role":"tool","tool_call_id":"call_gwIV3i71RNqfpr7KguEciCuV","name":"divide_numbers","content":"25.0"},{"role":"user","content":"Analyze
the tool result. If requirements are met, provide the Final Answer. Otherwise,
call the next tool. Deliver only the answer without meta-commentary."}],"model":"gpt-4.1-mini","tool_choice":"auto","tools":[{"type":"function","function":{"name":"divide_numbers","description":"Divide
first number by second number.","parameters":{"properties":{"a":{"title":"A","type":"integer"},"b":{"title":"B","type":"integer"}},"required":["a","b"],"type":"object"}}}]}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '1276'
content-type:
- application/json
cookie:
- COOKIE-XXX
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.13.3
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2gbkHw19D5oEBOhpZP5FR5MvRFgb\",\n \"object\":
\"chat.completion\",\n \"created\": 1769532812,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"25.0\",\n \"refusal\": null,\n
\ \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\":
\"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 204,\n \"completion_tokens\":
4,\n \"total_tokens\": 208,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_376a7ccef1\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 27 Jan 2026 16:53:33 GMT
Server:
- cloudflare
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '523'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -1,22 +1,7 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator Assistant.
You are a helpful calculator assistant\nYour personal goal is: Help with math
calculations\n\nYou ONLY have access to the following tools, and should NEVER
make up tools that are not listed here:\n\nTool Name: calculate_sum\nTool Arguments:
{\n \"properties\": {\n \"a\": {\n \"title\": \"A\",\n \"type\":
\"integer\"\n },\n \"b\": {\n \"title\": \"B\",\n \"type\":
\"integer\"\n }\n },\n \"required\": [\n \"a\",\n \"b\"\n ],\n \"title\":
\"Calculate_Sum\",\n \"type\": \"object\",\n \"additionalProperties\": false\n}\nTool
Description: Add two numbers together.\n\nIMPORTANT: Use the following format
in your response:\n\n```\nThought: you should always think about what to do\nAction:
the action to take, only one name of [calculate_sum], just the name, exactly
as it''s written.\nAction Input: the input to the action, just a simple JSON
object, enclosed in curly braces, using \" to wrap keys and values.\nObservation:
the result of the action\n```\n\nOnce all necessary information is gathered,
return the following format:\n\n```\nThought: I now know the final answer\nFinal
Answer: the final answer to the original input question\n```"},{"role":"user","content":"What
is 5 + 3? Use the calculate_sum tool."}],"model":"gpt-4.1-mini"}'
body: '{"messages":[{"role":"system","content":"You are Calculator Assistant. You are a helpful calculator assistant\nYour personal goal is: Help with math calculations\n\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\nTool Name: calculate_sum\nTool Arguments: {''a'': {''description'': None, ''type'': ''int''}, ''b'': {''description'': None, ''type'': ''int''}}\nTool Description: Add two numbers together.\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [calculate_sum], just the name, exactly as it''s written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final
answer to the original input question\n```"},{"role":"user","content":"What is 5 + 3? Use the calculate_sum tool."}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
@@ -29,7 +14,7 @@ interactions:
connection:
- keep-alive
content-length:
-      - '1356'
+      - '1119'
content-type:
- application/json
host:
@@ -56,18 +41,8 @@ interactions:
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2gSz7JfTi4NQ2QRTANg8Z2afJI8b\",\n \"object\":
\"chat.completion\",\n \"created\": 1769532269,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"```\\nThought: I need to use the calculate_sum
tool to find the sum of 5 and 3\\nAction: calculate_sum\\nAction Input: {\\\"a\\\":5,\\\"b\\\":3}\\n```\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
295,\n \"completion_tokens\": 41,\n \"total_tokens\": 336,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_376a7ccef1\"\n}\n"
string: "{\n \"id\": \"chatcmpl-CiksV15hVLWURKZH4BxQEGjiCFWpz\",\n \"object\": \"chat.completion\",\n \"created\": 1764782667,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"```\\nThought: I should use the calculate_sum tool to add 5 and 3.\\nAction: calculate_sum\\nAction Input: {\\\"a\\\": 5, \\\"b\\\": 3}\\n```\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 234,\n \"completion_tokens\": 40,\n \"total_tokens\": 274,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\"\
: \"default\",\n \"system_fingerprint\": \"fp_9766e549b2\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
@@ -76,7 +51,7 @@ interactions:
Content-Type:
- application/json
Date:
-      - Tue, 27 Jan 2026 16:44:30 GMT
+      - Wed, 03 Dec 2025 17:24:28 GMT
Server:
- cloudflare
Set-Cookie:
@@ -96,11 +71,13 @@ interactions:
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
-      - '827'
+      - '681'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
+      x-envoy-upstream-service-time:
+      - '871'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
@@ -121,25 +98,8 @@ interactions:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator Assistant.
You are a helpful calculator assistant\nYour personal goal is: Help with math
calculations\n\nYou ONLY have access to the following tools, and should NEVER
make up tools that are not listed here:\n\nTool Name: calculate_sum\nTool Arguments:
{\n \"properties\": {\n \"a\": {\n \"title\": \"A\",\n \"type\":
\"integer\"\n },\n \"b\": {\n \"title\": \"B\",\n \"type\":
\"integer\"\n }\n },\n \"required\": [\n \"a\",\n \"b\"\n ],\n \"title\":
\"Calculate_Sum\",\n \"type\": \"object\",\n \"additionalProperties\": false\n}\nTool
Description: Add two numbers together.\n\nIMPORTANT: Use the following format
in your response:\n\n```\nThought: you should always think about what to do\nAction:
the action to take, only one name of [calculate_sum], just the name, exactly
as it''s written.\nAction Input: the input to the action, just a simple JSON
object, enclosed in curly braces, using \" to wrap keys and values.\nObservation:
the result of the action\n```\n\nOnce all necessary information is gathered,
return the following format:\n\n```\nThought: I now know the final answer\nFinal
Answer: the final answer to the original input question\n```"},{"role":"user","content":"What
is 5 + 3? Use the calculate_sum tool."},{"role":"assistant","content":"```\nThought:
I need to use the calculate_sum tool to find the sum of 5 and 3\nAction: calculate_sum\nAction
Input: {\"a\":5,\"b\":3}\n```\nObservation: 8"}],"model":"gpt-4.1-mini"}'
body: '{"messages":[{"role":"system","content":"You are Calculator Assistant. You are a helpful calculator assistant\nYour personal goal is: Help with math calculations\n\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\nTool Name: calculate_sum\nTool Arguments: {''a'': {''description'': None, ''type'': ''int''}, ''b'': {''description'': None, ''type'': ''int''}}\nTool Description: Add two numbers together.\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [calculate_sum], just the name, exactly as it''s written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final
answer to the original input question\n```"},{"role":"user","content":"What is 5 + 3? Use the calculate_sum tool."},{"role":"assistant","content":"```\nThought: I should use the calculate_sum tool to add 5 and 3.\nAction: calculate_sum\nAction Input: {\"a\": 5, \"b\": 3}\n```\nObservation: 8"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
@@ -152,7 +112,7 @@ interactions:
connection:
- keep-alive
content-length:
-      - '1544'
+      - '1298'
content-type:
- application/json
cookie:
@@ -181,18 +141,7 @@ interactions:
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-D2gT0RU66XqjAUOXnGmokD1Q8Fman\",\n \"object\":
\"chat.completion\",\n \"created\": 1769532270,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"```\\nThought: I now know the final
answer\\nFinal Answer: 8\\n```\",\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 345,\n \"completion_tokens\":
18,\n \"total_tokens\": 363,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
\"default\",\n \"system_fingerprint\": \"fp_376a7ccef1\"\n}\n"
string: "{\n \"id\": \"chatcmpl-CiksWrVbyJFurKCm7XPRU1b1pT7qF\",\n \"object\": \"chat.completion\",\n \"created\": 1764782668,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"```\\nThought: I now know the final answer\\nFinal Answer: 8\\n```\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 283,\n \"completion_tokens\": 18,\n \"total_tokens\": 301,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_9766e549b2\"\n}\n"
headers:
CF-RAY:
- CF-RAY-XXX
@@ -201,7 +150,7 @@ interactions:
Content-Type:
- application/json
Date:
-      - Tue, 27 Jan 2026 16:44:31 GMT
+      - Wed, 03 Dec 2025 17:24:29 GMT
Server:
- cloudflare
Strict-Transport-Security:
@@ -219,11 +168,208 @@ interactions:
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
-      - '606'
+      - '427'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
+      x-envoy-upstream-service-time:
+      - '442'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
- request:
body: '{"messages":[{"role":"system","content":"You are Calculator Assistant. You are a helpful calculator assistant\nYour personal goal is: Help with math calculations\n\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\nTool Name: calculate_sum\nTool Arguments: {''a'': {''description'': None, ''type'': ''int''}, ''b'': {''description'': None, ''type'': ''int''}}\nTool Description: Add two numbers together.\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [calculate_sum], just the name, exactly as it''s written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final
answer to the original input question\n```"},{"role":"user","content":"What is 5 + 3? Use the calculate_sum tool."}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
      - ACCEPT-ENCODING-XXX
      authorization:
      - AUTHORIZATION-XXX
      connection:
      - keep-alive
      content-length:
      - '1119'
      content-type:
      - application/json
      host:
      - api.openai.com
      x-stainless-arch:
      - X-STAINLESS-ARCH-XXX
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - X-STAINLESS-OS-XXX
      x-stainless-package-version:
      - 1.83.0
      x-stainless-read-timeout:
      - X-STAINLESS-READ-TIMEOUT-XXX
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.13.3
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    body:
      string: "{\n \"id\": \"chatcmpl-CimX8hwYiUUZijApUDk1yBMzTpBj9\",\n \"object\": \"chat.completion\",\n \"created\": 1764789030,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"```\\nThought: I need to add 5 and 3 using the calculate_sum tool.\\nAction: calculate_sum\\nAction Input: {\\\"a\\\":5,\\\"b\\\":3}\\n```\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 234,\n \"completion_tokens\": 37,\n \"total_tokens\": 271,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\"\
        : \"default\",\n \"system_fingerprint\": \"fp_9766e549b2\"\n}\n"
    headers:
      CF-RAY:
      - CF-RAY-XXX
      Connection:
      - keep-alive
      Content-Type:
      - application/json
      Date:
      - Wed, 03 Dec 2025 19:10:33 GMT
      Server:
      - cloudflare
      Set-Cookie:
      - SET-COOKIE-XXX
      Strict-Transport-Security:
      - STS-XXX
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - X-CONTENT-TYPE-XXX
      access-control-expose-headers:
      - ACCESS-CONTROL-XXX
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization:
      - OPENAI-ORG-XXX
      openai-processing-ms:
      - '2329'
      openai-project:
      - OPENAI-PROJECT-XXX
      openai-version:
      - '2020-10-01'
      x-envoy-upstream-service-time:
      - '2349'
      x-openai-proxy-wasm:
      - v0.1
      x-ratelimit-limit-requests:
      - X-RATELIMIT-LIMIT-REQUESTS-XXX
      x-ratelimit-limit-tokens:
      - X-RATELIMIT-LIMIT-TOKENS-XXX
      x-ratelimit-remaining-requests:
      - X-RATELIMIT-REMAINING-REQUESTS-XXX
      x-ratelimit-remaining-tokens:
      - X-RATELIMIT-REMAINING-TOKENS-XXX
      x-ratelimit-reset-requests:
      - X-RATELIMIT-RESET-REQUESTS-XXX
      x-ratelimit-reset-tokens:
      - X-RATELIMIT-RESET-TOKENS-XXX
      x-request-id:
      - X-REQUEST-ID-XXX
    status:
      code: 200
      message: OK
- request:
    body: '{"messages":[{"role":"system","content":"You are Calculator Assistant. You are a helpful calculator assistant\nYour personal goal is: Help with math calculations\n\nYou ONLY have access to the following tools, and should NEVER make up tools that are not listed here:\n\nTool Name: calculate_sum\nTool Arguments: {''a'': {''description'': None, ''type'': ''int''}, ''b'': {''description'': None, ''type'': ''int''}}\nTool Description: Add two numbers together.\n\nIMPORTANT: Use the following format in your response:\n\n```\nThought: you should always think about what to do\nAction: the action to take, only one name of [calculate_sum], just the name, exactly as it''s written.\nAction Input: the input to the action, just a simple JSON object, enclosed in curly braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce all necessary information is gathered, return the following format:\n\n```\nThought: I now know the final answer\nFinal Answer: the final
      answer to the original input question\n```"},{"role":"user","content":"What is 5 + 3? Use the calculate_sum tool."},{"role":"assistant","content":"```\nThought: I need to add 5 and 3 using the calculate_sum tool.\nAction: calculate_sum\nAction Input: {\"a\":5,\"b\":3}\n```\nObservation: 8"}],"model":"gpt-4.1-mini"}'
    headers:
      User-Agent:
      - X-USER-AGENT-XXX
      accept:
      - application/json
      accept-encoding:
      - ACCEPT-ENCODING-XXX
      authorization:
      - AUTHORIZATION-XXX
      connection:
      - keep-alive
      content-length:
      - '1295'
      content-type:
      - application/json
      cookie:
      - COOKIE-XXX
      host:
      - api.openai.com
      x-stainless-arch:
      - X-STAINLESS-ARCH-XXX
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - X-STAINLESS-OS-XXX
      x-stainless-package-version:
      - 1.83.0
      x-stainless-read-timeout:
      - X-STAINLESS-READ-TIMEOUT-XXX
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.13.3
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    body:
      string: "{\n \"id\": \"chatcmpl-CimXBrY5sdbr2pJnqGlazPTra4dor\",\n \"object\": \"chat.completion\",\n \"created\": 1764789033,\n \"model\": \"gpt-4.1-mini-2025-04-14\",\n \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"```\\nThought: I now know the final answer\\nFinal Answer: 8\\n```\",\n \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 280,\n \"completion_tokens\": 18,\n \"total_tokens\": 298,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": \"default\",\n \"system_fingerprint\": \"fp_9766e549b2\"\n}\n"
    headers:
      CF-RAY:
      - CF-RAY-XXX
      Connection:
      - keep-alive
      Content-Type:
      - application/json
      Date:
      - Wed, 03 Dec 2025 19:10:35 GMT
      Server:
      - cloudflare
      Strict-Transport-Security:
      - STS-XXX
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - X-CONTENT-TYPE-XXX
      access-control-expose-headers:
      - ACCESS-CONTROL-XXX
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization:
      - OPENAI-ORG-XXX
      openai-processing-ms:
      - '1647'
      openai-project:
      - OPENAI-PROJECT-XXX
      openai-version:
      - '2020-10-01'
      x-envoy-upstream-service-time:
      - '1694'
      x-openai-proxy-wasm:
      - v0.1
      x-ratelimit-limit-requests:
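The two recorded turns above show the ReAct loop end to end: the model replies with Thought/Action/Action Input, the framework runs the tool and appends `Observation: 8`, and a second completion returns the final answer. For orientation, a minimal sketch of the `calculate_sum` tool this conversation assumes (reconstructed from the cassette's system prompt, not taken from the fixture's source tree):

```python
from crewai.tools import tool


@tool("calculate_sum")
def calculate_sum(a: int, b: int) -> int:
    """Add two numbers together."""
    return a + b  # the recorded run yields Observation: 8 for a=5, b=3
```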


@@ -1,112 +0,0 @@
interactions:
- request:
    body: '{"messages":[{"role":"system","content":"You are Language Detector. You
      are an expert linguist who can identify languages.\nYour personal goal is: Detect
      the language of text"},{"role":"user","content":"\nCurrent Task: What language
      is this text written in: ''Hello, how are you?''\n\nThis is the expected criteria
      for your final answer: The detected language (e.g., English, Spanish, etc.)\nyou
      MUST return the actual complete content as the final answer, not a summary.\n\nProvide
      your complete response:"}],"model":"gpt-4o-mini"}'
    headers:
      User-Agent:
      - X-USER-AGENT-XXX
      accept:
      - application/json
      accept-encoding:
      - ACCEPT-ENCODING-XXX
      authorization:
      - AUTHORIZATION-XXX
      connection:
      - keep-alive
      content-length:
      - '530'
      content-type:
      - application/json
      host:
      - api.openai.com
      x-stainless-arch:
      - X-STAINLESS-ARCH-XXX
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - X-STAINLESS-OS-XXX
      x-stainless-package-version:
      - 1.83.0
      x-stainless-read-timeout:
      - X-STAINLESS-READ-TIMEOUT-XXX
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.13.3
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    body:
      string: "{\n \"id\": \"chatcmpl-D39bkotgEapBcz1sSIXvhPhK9G7FD\",\n \"object\":
        \"chat.completion\",\n \"created\": 1769644288,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
        \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
        \"assistant\",\n \"content\": \"English\",\n \"refusal\": null,\n
        \ \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\":
        \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 101,\n \"completion_tokens\":
        1,\n \"total_tokens\": 102,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
        0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
        {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
        0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
        \"default\",\n \"system_fingerprint\": \"fp_3683ee3deb\"\n}\n"
    headers:
      CF-RAY:
      - CF-RAY-XXX
      Connection:
      - keep-alive
      Content-Type:
      - application/json
      Date:
      - Wed, 28 Jan 2026 23:51:28 GMT
      Server:
      - cloudflare
      Set-Cookie:
      - SET-COOKIE-XXX
      Strict-Transport-Security:
      - STS-XXX
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - X-CONTENT-TYPE-XXX
      access-control-expose-headers:
      - ACCESS-CONTROL-XXX
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization:
      - OPENAI-ORG-XXX
      openai-processing-ms:
      - '279'
      openai-project:
      - OPENAI-PROJECT-XXX
      openai-version:
      - '2020-10-01'
      x-openai-proxy-wasm:
      - v0.1
      x-ratelimit-limit-requests:
      - X-RATELIMIT-LIMIT-REQUESTS-XXX
      x-ratelimit-limit-tokens:
      - X-RATELIMIT-LIMIT-TOKENS-XXX
      x-ratelimit-remaining-requests:
      - X-RATELIMIT-REMAINING-REQUESTS-XXX
      x-ratelimit-remaining-tokens:
      - X-RATELIMIT-REMAINING-TOKENS-XXX
      x-ratelimit-reset-requests:
      - X-RATELIMIT-RESET-REQUESTS-XXX
      x-ratelimit-reset-tokens:
      - X-RATELIMIT-RESET-TOKENS-XXX
      x-request-id:
      - X-REQUEST-ID-XXX
    status:
      code: 200
      message: OK
version: 1


@@ -1,111 +0,0 @@
interactions:
- request:
    body: '{"messages":[{"role":"system","content":"You are Classifier. You classify
      text sentiment accurately.\nYour personal goal is: Classify text sentiment"},{"role":"user","content":"\nCurrent
      Task: Classify the sentiment of: ''I love this product!''\n\nThis is the expected
      criteria for your final answer: One word: positive, negative, or neutral\nyou
      MUST return the actual complete content as the final answer, not a summary.\n\nProvide
      your complete response:"}],"model":"gpt-4o-mini"}'
    headers:
      User-Agent:
      - X-USER-AGENT-XXX
      accept:
      - application/json
      accept-encoding:
      - ACCEPT-ENCODING-XXX
      authorization:
      - AUTHORIZATION-XXX
      connection:
      - keep-alive
      content-length:
      - '481'
      content-type:
      - application/json
      host:
      - api.openai.com
      x-stainless-arch:
      - X-STAINLESS-ARCH-XXX
      x-stainless-async:
      - 'false'
      x-stainless-lang:
      - python
      x-stainless-os:
      - X-STAINLESS-OS-XXX
      x-stainless-package-version:
      - 1.83.0
      x-stainless-read-timeout:
      - X-STAINLESS-READ-TIMEOUT-XXX
      x-stainless-retry-count:
      - '0'
      x-stainless-runtime:
      - CPython
      x-stainless-runtime-version:
      - 3.13.3
    method: POST
    uri: https://api.openai.com/v1/chat/completions
  response:
    body:
      string: "{\n \"id\": \"chatcmpl-D39bkVPelOZanWIMBoIyzsuj072sM\",\n \"object\":
        \"chat.completion\",\n \"created\": 1769644288,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
        \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
        \"assistant\",\n \"content\": \"positive\",\n \"refusal\": null,\n
        \ \"annotations\": []\n },\n \"logprobs\": null,\n \"finish_reason\":
        \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 89,\n \"completion_tokens\":
        1,\n \"total_tokens\": 90,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
        0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
        {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
        0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
        \"default\",\n \"system_fingerprint\": \"fp_3683ee3deb\"\n}\n"
    headers:
      CF-RAY:
      - CF-RAY-XXX
      Connection:
      - keep-alive
      Content-Type:
      - application/json
      Date:
      - Wed, 28 Jan 2026 23:51:29 GMT
      Server:
      - cloudflare
      Set-Cookie:
      - SET-COOKIE-XXX
      Strict-Transport-Security:
      - STS-XXX
      Transfer-Encoding:
      - chunked
      X-Content-Type-Options:
      - X-CONTENT-TYPE-XXX
      access-control-expose-headers:
      - ACCESS-CONTROL-XXX
      alt-svc:
      - h3=":443"; ma=86400
      cf-cache-status:
      - DYNAMIC
      openai-organization:
      - OPENAI-ORG-XXX
      openai-processing-ms:
      - '323'
      openai-project:
      - OPENAI-PROJECT-XXX
      openai-version:
      - '2020-10-01'
      x-openai-proxy-wasm:
      - v0.1
      x-ratelimit-limit-requests:
      - X-RATELIMIT-LIMIT-REQUESTS-XXX
      x-ratelimit-limit-tokens:
      - X-RATELIMIT-LIMIT-TOKENS-XXX
      x-ratelimit-remaining-requests:
      - X-RATELIMIT-REMAINING-REQUESTS-XXX
      x-ratelimit-remaining-tokens:
      - X-RATELIMIT-REMAINING-TOKENS-XXX
      x-ratelimit-reset-requests:
      - X-RATELIMIT-RESET-REQUESTS-XXX
      x-ratelimit-reset-tokens:
      - X-RATELIMIT-RESET-TOKENS-XXX
      x-request-id:
      - X-REQUEST-ID-XXX
    status:
      code: 200
      message: OK
version: 1
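Both fixtures above follow the vcrpy cassette layout: each recorded interaction pairs a request (body, headers, method, uri) with the stored response, and `version: 1` closes the file. Tests opt in with the `@pytest.mark.vcr()` marker seen throughout this diff; a sketch of the pattern (hypothetical test name, assuming the suite's existing pytest/VCR wiring):

```python
import pytest


@pytest.mark.vcr()  # replay the matching cassette instead of calling api.openai.com
def test_detects_english() -> None:
    # A request whose body matches the recording receives the stored
    # "English" completion, so the test runs deterministically and offline.
    ...
```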


@@ -590,233 +590,3 @@ class TestToolHooksIntegration:
        # Clean up hooks
        unregister_before_tool_call_hook(before_tool_call_hook)
        unregister_after_tool_call_hook(after_tool_call_hook)


class TestNativeToolCallingHooksIntegration:
    """Integration tests for hooks with native function calling (Agent and Crew)."""

    @pytest.mark.vcr()
    def test_agent_native_tool_hooks_before_and_after(self):
        """Test that Agent with native tool calling executes before/after hooks."""
        import os

        from crewai import Agent
        from crewai.tools import tool

        hook_calls = {"before": [], "after": []}

        @tool("multiply_numbers")
        def multiply_numbers(a: int, b: int) -> int:
            """Multiply two numbers together."""
            return a * b

        def before_hook(context: ToolCallHookContext) -> bool | None:
            hook_calls["before"].append({
                "tool_name": context.tool_name,
                "tool_input": dict(context.tool_input),
                "has_agent": context.agent is not None,
            })
            return None

        def after_hook(context: ToolCallHookContext) -> str | None:
            hook_calls["after"].append({
                "tool_name": context.tool_name,
                "tool_result": context.tool_result,
                "has_agent": context.agent is not None,
            })
            return None

        register_before_tool_call_hook(before_hook)
        register_after_tool_call_hook(after_hook)

        try:
            agent = Agent(
                role="Calculator",
                goal="Perform calculations",
                backstory="You are a calculator assistant",
                tools=[multiply_numbers],
                verbose=True,
            )
            agent.kickoff(
                messages="What is 7 times 6? Use the multiply_numbers tool."
            )

            # Verify before hook was called
            assert len(hook_calls["before"]) > 0, "Before hook was never called"
            before_call = hook_calls["before"][0]
            assert before_call["tool_name"] == "multiply_numbers"
            assert "a" in before_call["tool_input"]
            assert "b" in before_call["tool_input"]
            assert before_call["has_agent"] is True

            # Verify after hook was called
            assert len(hook_calls["after"]) > 0, "After hook was never called"
            after_call = hook_calls["after"][0]
            assert after_call["tool_name"] == "multiply_numbers"
            assert "42" in str(after_call["tool_result"])
            assert after_call["has_agent"] is True
        finally:
            unregister_before_tool_call_hook(before_hook)
            unregister_after_tool_call_hook(after_hook)

    @pytest.mark.vcr()
    def test_crew_native_tool_hooks_before_and_after(self):
        """Test that Crew with Agent executes before/after hooks with full context."""
        import os

        from crewai import Agent, Crew, Task
        from crewai.tools import tool

        hook_calls = {"before": [], "after": []}

        @tool("divide_numbers")
        def divide_numbers(a: int, b: int) -> float:
            """Divide first number by second number."""
            return a / b

        def before_hook(context: ToolCallHookContext) -> bool | None:
            hook_calls["before"].append({
                "tool_name": context.tool_name,
                "tool_input": dict(context.tool_input),
                "has_agent": context.agent is not None,
                "has_task": context.task is not None,
                "has_crew": context.crew is not None,
                "agent_role": context.agent.role if context.agent else None,
            })
            return None

        def after_hook(context: ToolCallHookContext) -> str | None:
            hook_calls["after"].append({
                "tool_name": context.tool_name,
                "tool_result": context.tool_result,
                "has_agent": context.agent is not None,
                "has_task": context.task is not None,
                "has_crew": context.crew is not None,
            })
            return None

        register_before_tool_call_hook(before_hook)
        register_after_tool_call_hook(after_hook)

        try:
            agent = Agent(
                role="Math Assistant",
                goal="Perform division calculations accurately",
                backstory="You are a math assistant that helps with division",
                tools=[divide_numbers],
                verbose=True,
            )
            task = Task(
                description="Calculate 100 divided by 4 using the divide_numbers tool.",
                expected_output="The result of the division",
                agent=agent,
            )
            crew = Crew(
                agents=[agent],
                tasks=[task],
                verbose=True,
            )
            crew.kickoff()

            # Verify before hook was called with full context
            assert len(hook_calls["before"]) > 0, "Before hook was never called"
            before_call = hook_calls["before"][0]
            assert before_call["tool_name"] == "divide_numbers"
            assert "a" in before_call["tool_input"]
            assert "b" in before_call["tool_input"]
            assert before_call["has_agent"] is True
            assert before_call["has_task"] is True
            assert before_call["has_crew"] is True
            assert before_call["agent_role"] == "Math Assistant"

            # Verify after hook was called with full context
            assert len(hook_calls["after"]) > 0, "After hook was never called"
            after_call = hook_calls["after"][0]
            assert after_call["tool_name"] == "divide_numbers"
            assert "25" in str(after_call["tool_result"])
            assert after_call["has_agent"] is True
            assert after_call["has_task"] is True
            assert after_call["has_crew"] is True
        finally:
            unregister_before_tool_call_hook(before_hook)
            unregister_after_tool_call_hook(after_hook)

    @pytest.mark.vcr()
    def test_before_hook_blocks_tool_execution_in_crew(self):
        """Test that returning False from before hook blocks tool execution."""
        import os

        from crewai import Agent, Crew, Task
        from crewai.tools import tool

        hook_calls = {"before": [], "after": [], "tool_executed": False}

        @tool("dangerous_operation")
        def dangerous_operation(action: str) -> str:
            """Perform a dangerous operation that should be blocked."""
            hook_calls["tool_executed"] = True
            return f"Executed: {action}"

        def blocking_before_hook(context: ToolCallHookContext) -> bool | None:
            hook_calls["before"].append({
                "tool_name": context.tool_name,
                "tool_input": dict(context.tool_input),
            })
            # Block all calls to dangerous_operation
            if context.tool_name == "dangerous_operation":
                return False
            return None

        def after_hook(context: ToolCallHookContext) -> str | None:
            hook_calls["after"].append({
                "tool_name": context.tool_name,
                "tool_result": context.tool_result,
            })
            return None

        register_before_tool_call_hook(blocking_before_hook)
        register_after_tool_call_hook(after_hook)

        try:
            agent = Agent(
                role="Test Agent",
                goal="Try to use the dangerous operation tool",
                backstory="You are a test agent",
                tools=[dangerous_operation],
                verbose=True,
            )
            task = Task(
                description="Use the dangerous_operation tool with action 'delete_all'.",
                expected_output="The result of the operation",
                agent=agent,
            )
            crew = Crew(
                agents=[agent],
                tasks=[task],
                verbose=True,
            )
            crew.kickoff()

            # Verify before hook was called
            assert len(hook_calls["before"]) > 0, "Before hook was never called"
            before_call = hook_calls["before"][0]
            assert before_call["tool_name"] == "dangerous_operation"

            # Verify the actual tool function was NOT executed
            assert hook_calls["tool_executed"] is False, "Tool should have been blocked"

            # Verify after hook was still called (with blocked message)
            assert len(hook_calls["after"]) > 0, "After hook was never called"
            after_call = hook_calls["after"][0]
            assert "blocked" in after_call["tool_result"].lower()
        finally:
            unregister_before_tool_call_hook(blocking_before_hook)
            unregister_after_tool_call_hook(after_hook)
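Read together, the three tests above pin down the hook contract: before hooks see the tool name and input and may veto the call, after hooks see the result, and a vetoed call surfaces as a "blocked" tool_result. A condensed sketch of that contract (imports elided, since the test module's import block sits outside this hunk):

```python
# Hypothetical illustration of the before-hook contract exercised above;
# register_before_tool_call_hook and ToolCallHookContext come from the
# same (unshown) imports the deleted tests relied on.
def block_dangerous(context: ToolCallHookContext) -> bool | None:
    if context.tool_name == "dangerous_operation":
        return False  # veto: the tool body never runs; after hooks see "blocked"
    return None  # None lets the call proceed unchanged


register_before_tool_call_hook(block_dangerous)
```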


@@ -157,10 +157,10 @@ async def test_anthropic_async_with_response_model():
"Say hello in French",
response_model=GreetingResponse
)
# When response_model is provided, the result is already a parsed Pydantic model instance
assert isinstance(result, GreetingResponse)
assert isinstance(result.greeting, str)
assert isinstance(result.language, str)
model = GreetingResponse.model_validate_json(result)
assert isinstance(model, GreetingResponse)
assert isinstance(model.greeting, str)
assert isinstance(model.language, str)
@pytest.mark.vcr()
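The inline comment in this hunk documents one of the two conventions shown: on one side of the change, `call()` with `response_model` hands back an already-parsed Pydantic instance; on the other, the test validates a JSON string via `model_validate_json`. A sketch of the parsed-instance convention the comment describes (model shape inferred from the assertions, not necessarily the test file's exact definition):

```python
from pydantic import BaseModel


class GreetingResponse(BaseModel):
    greeting: str
    language: str


# llm: an LLM instance configured earlier in the test (assumed in scope)
result = llm.call("Say hello in French", response_model=GreetingResponse)
assert isinstance(result, GreetingResponse)  # no manual JSON validation step
```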


@@ -799,131 +799,3 @@ def test_google_express_mode_works() -> None:
    assert result.token_usage.prompt_tokens > 0
    assert result.token_usage.completion_tokens > 0
    assert result.token_usage.successful_requests >= 1


def test_gemini_2_0_model_detection():
    """Test that Gemini 2.0 models are properly detected."""
    # Test Gemini 2.0 models
    llm_2_0 = LLM(model="google/gemini-2.0-flash-001")

    from crewai.llms.providers.gemini.completion import GeminiCompletion

    assert isinstance(llm_2_0, GeminiCompletion)
    assert llm_2_0.is_gemini_2_0 is True

    llm_2_5 = LLM(model="google/gemini-2.5-flash")
    assert isinstance(llm_2_5, GeminiCompletion)
    assert llm_2_5.is_gemini_2_0 is True

    # Test non-2.0 models
    llm_1_5 = LLM(model="google/gemini-1.5-pro")
    assert isinstance(llm_1_5, GeminiCompletion)
    assert llm_1_5.is_gemini_2_0 is False


def test_add_property_ordering_to_schema():
    """Test that _add_property_ordering correctly adds propertyOrdering to schemas."""
    from crewai.llms.providers.gemini.completion import GeminiCompletion

    # Test simple object schema
    simple_schema = {
        "type": "object",
        "properties": {
            "name": {"type": "string"},
            "age": {"type": "integer"},
            "email": {"type": "string"}
        }
    }
    result = GeminiCompletion._add_property_ordering(simple_schema)
    assert "propertyOrdering" in result
    assert result["propertyOrdering"] == ["name", "age", "email"]

    # Test nested object schema
    nested_schema = {
        "type": "object",
        "properties": {
            "user": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "contact": {
                        "type": "object",
                        "properties": {
                            "email": {"type": "string"},
                            "phone": {"type": "string"}
                        }
                    }
                }
            },
            "id": {"type": "integer"}
        }
    }
    result = GeminiCompletion._add_property_ordering(nested_schema)
    assert "propertyOrdering" in result
    assert result["propertyOrdering"] == ["user", "id"]
    assert "propertyOrdering" in result["properties"]["user"]
    assert result["properties"]["user"]["propertyOrdering"] == ["name", "contact"]
    assert "propertyOrdering" in result["properties"]["user"]["properties"]["contact"]
    assert result["properties"]["user"]["properties"]["contact"]["propertyOrdering"] == ["email", "phone"]


def test_gemini_2_0_response_model_with_property_ordering():
    """Test that Gemini 2.0 models include propertyOrdering in response schemas."""
    from pydantic import BaseModel, Field

    class TestResponse(BaseModel):
        """Test response model."""

        name: str = Field(..., description="The name")
        age: int = Field(..., description="The age")
        email: str = Field(..., description="The email")

    llm = LLM(model="google/gemini-2.0-flash-001")

    # Prepare generation config with response model
    config = llm._prepare_generation_config(response_model=TestResponse)

    # Verify that the config has response_json_schema
    assert hasattr(config, 'response_json_schema') or 'response_json_schema' in config.__dict__

    # Get the schema
    if hasattr(config, 'response_json_schema'):
        schema = config.response_json_schema
    else:
        schema = config.__dict__.get('response_json_schema', {})

    # Verify propertyOrdering is present for Gemini 2.0
    assert "propertyOrdering" in schema
    assert "name" in schema["propertyOrdering"]
    assert "age" in schema["propertyOrdering"]
    assert "email" in schema["propertyOrdering"]


def test_gemini_1_5_response_model_uses_response_schema():
    """Test that Gemini 1.5 models use response_schema parameter (not response_json_schema)."""
    from pydantic import BaseModel, Field

    class TestResponse(BaseModel):
        """Test response model."""

        name: str = Field(..., description="The name")
        age: int = Field(..., description="The age")

    llm = LLM(model="google/gemini-1.5-pro")

    # Prepare generation config with response model
    config = llm._prepare_generation_config(response_model=TestResponse)

    # Verify that the config uses response_schema (not response_json_schema)
    assert hasattr(config, 'response_schema') or 'response_schema' in config.__dict__
    assert not (hasattr(config, 'response_json_schema') and config.response_json_schema is not None)

    # Get the schema
    if hasattr(config, 'response_schema'):
        schema = config.response_schema
    else:
        schema = config.__dict__.get('response_schema')

    # For Gemini 1.5, response_schema should be the Pydantic model itself
    # The SDK handles conversion internally
    assert schema is TestResponse or isinstance(schema, type)


@@ -540,9 +540,7 @@ def test_openai_streaming_with_response_model():
    result = llm.call("Test question", response_model=TestResponse)

    assert result is not None
    assert isinstance(result, TestResponse)
    assert result.answer == "test"
    assert result.confidence == 0.95
    assert isinstance(result, str)
    assert mock_stream.called
    call_kwargs = mock_stream.call_args[1]


@@ -1,6 +1,6 @@
import os
import threading
from unittest.mock import patch
from unittest.mock import MagicMock, patch

import pytest

from crewai import Agent, Crew, Task
@@ -121,3 +121,90 @@ def test_telemetry_singleton_pattern():
        thread.join()

    assert all(instance is telemetry1 for instance in instances)


def test_signal_handler_registration_skipped_in_non_main_thread():
    """Test that signal handler registration is skipped when running from a non-main thread.

    This test verifies that when Telemetry is initialized from a non-main thread,
    the signal handler registration is skipped without raising noisy ValueError tracebacks.

    See: https://github.com/crewAIInc/crewAI/issues/4289
    """
    Telemetry._instance = None
    result = {"register_signal_handler_called": False, "error": None}

    def init_telemetry_in_thread():
        try:
            with patch("crewai.telemetry.telemetry.TracerProvider"):
                with patch.object(
                    Telemetry,
                    "_register_signal_handler",
                    wraps=lambda *args, **kwargs: None,
                ) as mock_register:
                    telemetry = Telemetry()
                    result["register_signal_handler_called"] = mock_register.called
                    result["telemetry"] = telemetry
        except Exception as e:
            result["error"] = e

    thread = threading.Thread(target=init_telemetry_in_thread)
    thread.start()
    thread.join()

    assert result["error"] is None, f"Unexpected error: {result['error']}"
    assert (
        result["register_signal_handler_called"] is False
    ), "Signal handler should not be registered in non-main thread"


def test_signal_handler_registration_skipped_logs_debug_message():
    """Test that a debug message is logged when signal handler registration is skipped.

    This test verifies that when Telemetry is initialized from a non-main thread,
    a debug message is logged indicating that signal handler registration was skipped.
    """
    Telemetry._instance = None
    result = {"telemetry": None, "error": None, "debug_calls": []}
    mock_logger_debug = MagicMock()

    def init_telemetry_in_thread():
        try:
            with patch("crewai.telemetry.telemetry.TracerProvider"):
                with patch(
                    "crewai.telemetry.telemetry.logger.debug", mock_logger_debug
                ):
                    result["telemetry"] = Telemetry()
                    result["debug_calls"] = [
                        str(call) for call in mock_logger_debug.call_args_list
                    ]
        except Exception as e:
            result["error"] = e

    thread = threading.Thread(target=init_telemetry_in_thread)
    thread.start()
    thread.join()

    assert result["error"] is None, f"Unexpected error: {result['error']}"
    assert result["telemetry"] is not None

    debug_calls = result["debug_calls"]
    assert any(
        "Skipping signal handler registration" in call for call in debug_calls
    ), f"Expected debug message about skipping signal handler registration, got: {debug_calls}"


def test_signal_handlers_registered_in_main_thread():
    """Test that signal handlers are registered when running from the main thread."""
    Telemetry._instance = None

    with patch("crewai.telemetry.telemetry.TracerProvider"):
        with patch(
            "crewai.telemetry.telemetry.Telemetry._register_signal_handler"
        ) as mock_register:
            telemetry = Telemetry()
            assert telemetry.ready is True
            assert mock_register.call_count >= 2
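These three tests imply a simple guard in the telemetry setup: `signal.signal()` may only be called from the main thread (elsewhere it raises ValueError), so registration is attempted only there and a debug line is emitted otherwise. A sketch of that guard under those assumptions (the enclosing method's exact name and signal list are not shown in this hunk):

```python
import logging
import signal
import threading

logger = logging.getLogger(__name__)


def register_shutdown_handlers(register) -> None:  # hypothetical wrapper
    # signal.signal() raises ValueError when called outside the main thread.
    if threading.current_thread() is not threading.main_thread():
        logger.debug("Skipping signal handler registration: not in main thread")
        return
    for sig in (signal.SIGINT, signal.SIGTERM):  # two registrations, matching call_count >= 2
        register(sig)
```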


@@ -2585,7 +2585,6 @@ def test_warning_long_term_memory_without_entity_memory():
goal="You research about math.",
backstory="You're an expert in research and you love to learn new things.",
allow_delegation=False,
verbose=True,
)
task1 = Task(


@@ -1,258 +0,0 @@
"""Test verbose control for Flow and Crew."""
import os
from unittest.mock import patch
from crewai.events.event_listener import EventListener
from crewai.flow.flow import Flow, start, listen
from crewai.utilities.logger_utils import should_enable_verbose
class TestShouldEnableVerbose:
"""Test the should_enable_verbose utility function."""
def test_override_true_returns_true(self):
"""Test that explicit override=True always returns True."""
assert should_enable_verbose(override=True) is True
def test_override_false_returns_false(self):
"""Test that explicit override=False always returns False."""
assert should_enable_verbose(override=False) is False
def test_env_var_false_disables_verbose(self):
"""Test that CREWAI_VERBOSE=false disables verbose."""
with patch.dict(os.environ, {"CREWAI_VERBOSE": "false"}):
assert should_enable_verbose() is False
def test_env_var_0_disables_verbose(self):
"""Test that CREWAI_VERBOSE=0 disables verbose."""
with patch.dict(os.environ, {"CREWAI_VERBOSE": "0"}):
assert should_enable_verbose() is False
def test_env_var_true_enables_verbose(self):
"""Test that CREWAI_VERBOSE=true enables verbose."""
with patch.dict(os.environ, {"CREWAI_VERBOSE": "true"}):
assert should_enable_verbose() is True
def test_env_var_1_enables_verbose(self):
"""Test that CREWAI_VERBOSE=1 enables verbose."""
with patch.dict(os.environ, {"CREWAI_VERBOSE": "1"}):
assert should_enable_verbose() is True
def test_no_env_var_defaults_to_true(self):
"""Test that no CREWAI_VERBOSE env var defaults to True."""
with patch.dict(os.environ, {}, clear=True):
# Remove CREWAI_VERBOSE if it exists
os.environ.pop("CREWAI_VERBOSE", None)
assert should_enable_verbose() is True
def test_override_takes_precedence_over_env_var(self):
"""Test that explicit override takes precedence over env var."""
with patch.dict(os.environ, {"CREWAI_VERBOSE": "false"}):
assert should_enable_verbose(override=True) is True
with patch.dict(os.environ, {"CREWAI_VERBOSE": "true"}):
assert should_enable_verbose(override=False) is False
class TestFlowVerboseControl:
"""Test verbose control in Flow class."""
def test_flow_verbose_default_is_true(self):
"""Test that Flow verbose defaults to True when no env var is set."""
# Remove CREWAI_VERBOSE if it exists
os.environ.pop("CREWAI_VERBOSE", None)
class SimpleFlow(Flow):
@start()
def step_1(self):
return "done"
flow = SimpleFlow()
assert flow.verbose is True
def test_flow_verbose_false_disables_logging(self):
"""Test that Flow with verbose=False disables logging."""
class SimpleFlow(Flow):
@start()
def step_1(self):
return "done"
flow = SimpleFlow(verbose=False)
assert flow.verbose is False
# Verify EventListener is also set to verbose=False
event_listener = EventListener()
assert event_listener.verbose is False
assert event_listener.formatter.verbose is False
def test_flow_verbose_true_enables_logging(self):
"""Test that Flow with verbose=True enables logging."""
class SimpleFlow(Flow):
@start()
def step_1(self):
return "done"
flow = SimpleFlow(verbose=True)
assert flow.verbose is True
# Verify EventListener is also set to verbose=True
event_listener = EventListener()
assert event_listener.verbose is True
assert event_listener.formatter.verbose is True
def test_flow_respects_env_var_false(self):
"""Test that Flow respects CREWAI_VERBOSE=false env var."""
class SimpleFlow(Flow):
@start()
def step_1(self):
return "done"
with patch.dict(os.environ, {"CREWAI_VERBOSE": "false"}, clear=False):
flow = SimpleFlow()
assert flow.verbose is False
def test_flow_respects_env_var_true(self):
"""Test that Flow respects CREWAI_VERBOSE=true env var."""
class SimpleFlow(Flow):
@start()
def step_1(self):
return "done"
with patch.dict(os.environ, {"CREWAI_VERBOSE": "true"}, clear=False):
flow = SimpleFlow()
assert flow.verbose is True
def test_flow_explicit_verbose_overrides_env_var(self):
"""Test that explicit verbose parameter overrides env var."""
class SimpleFlow(Flow):
@start()
def step_1(self):
return "done"
# Explicit verbose=True overrides CREWAI_VERBOSE=false
with patch.dict(os.environ, {"CREWAI_VERBOSE": "false"}, clear=False):
flow = SimpleFlow(verbose=True)
assert flow.verbose is True
# Explicit verbose=False overrides CREWAI_VERBOSE=true
with patch.dict(os.environ, {"CREWAI_VERBOSE": "true"}, clear=False):
flow = SimpleFlow(verbose=False)
assert flow.verbose is False
class TestFlowVerboseExecution:
"""Test that verbose setting actually suppresses output during Flow execution."""
def test_flow_verbose_false_suppresses_console_output(self):
"""Test that Flow with verbose=False suppresses console output."""
execution_order = []
class SimpleFlow(Flow):
@start()
def step_1(self):
execution_order.append("step_1")
return "step_1_done"
@listen(step_1)
def step_2(self):
execution_order.append("step_2")
return "step_2_done"
# Create flow with verbose=False
flow = SimpleFlow(verbose=False)
# Verify the formatter's verbose is False
event_listener = EventListener()
assert event_listener.formatter.verbose is False
# Execute the flow
result = flow.kickoff()
# Flow should still execute correctly
assert execution_order == ["step_1", "step_2"]
assert result == "step_2_done"
def test_flow_verbose_true_allows_console_output(self):
"""Test that Flow with verbose=True allows console output."""
execution_order = []
class SimpleFlow(Flow):
@start()
def step_1(self):
execution_order.append("step_1")
return "step_1_done"
@listen(step_1)
def step_2(self):
execution_order.append("step_2")
return "step_2_done"
# Create flow with verbose=True
flow = SimpleFlow(verbose=True)
# Verify the formatter's verbose is True
event_listener = EventListener()
assert event_listener.formatter.verbose is True
# Execute the flow
result = flow.kickoff()
# Flow should execute correctly
assert execution_order == ["step_1", "step_2"]
assert result == "step_2_done"
class TestConsoleFormatterVerbose:
"""Test that ConsoleFormatter respects verbose setting."""
def test_console_formatter_print_panel_respects_verbose_false(self):
"""Test that print_panel does not print when verbose=False."""
from rich.text import Text
from crewai.events.utils.console_formatter import ConsoleFormatter
formatter = ConsoleFormatter(verbose=False)
# Create a mock to capture print calls
with patch.object(formatter, "print") as mock_print:
content = Text("Test content")
formatter.print_panel(content, "Test Title", "blue", is_flow=True)
# print should not be called when verbose=False
mock_print.assert_not_called()
def test_console_formatter_print_panel_respects_verbose_true(self):
"""Test that print_panel prints when verbose=True."""
from rich.text import Text
from crewai.events.utils.console_formatter import ConsoleFormatter
formatter = ConsoleFormatter(verbose=True)
# Create a mock to capture print calls
with patch.object(formatter, "print") as mock_print:
content = Text("Test content")
formatter.print_panel(content, "Test Title", "blue", is_flow=True)
# print should be called when verbose=True
assert mock_print.call_count >= 1
def test_console_formatter_flow_events_respect_verbose_false(self):
"""Test that flow events are suppressed when verbose=False."""
from rich.text import Text
from crewai.events.utils.console_formatter import ConsoleFormatter
formatter = ConsoleFormatter(verbose=False)
# Create a mock to capture print calls
with patch.object(formatter, "print") as mock_print:
content = Text("Flow event content")
# is_flow=True should still respect verbose=False
formatter.print_panel(content, "Flow Event", "blue", is_flow=True)
# print should not be called even for flow events when verbose=False
mock_print.assert_not_called()
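The first test class fixes the full decision table for `should_enable_verbose`: an explicit override always wins, otherwise CREWAI_VERBOSE decides ("true"/"1" on, "false"/"0" off), and the default is on. A reconstruction consistent with those tests (a sketch, not the shipped implementation):

```python
import os


def should_enable_verbose(override: bool | None = None) -> bool:
    if override is not None:  # explicit parameter beats the environment
        return override
    value = os.environ.get("CREWAI_VERBOSE")
    if value is None:  # unset: verbose by default
        return True
    return value.strip().lower() not in ("false", "0")
```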


@@ -1,234 +0,0 @@
"""Tests for prompt generation to prevent thought leakage.
These tests verify that:
1. Agents without tools don't get ReAct format instructions
2. The generated prompts don't encourage "Thought:" prefixes that leak into output
3. Real LLM calls produce clean output without internal reasoning
"""
from __future__ import annotations
from unittest.mock import MagicMock
import pytest
from crewai import Agent, Crew, Task
from crewai.llm import LLM
from crewai.utilities.prompts import Prompts
class TestNoToolsPromptGeneration:
"""Tests for prompt generation when agent has no tools."""
def test_no_tools_uses_task_no_tools_slice(self) -> None:
"""Test that agents without tools use task_no_tools slice instead of task."""
mock_agent = MagicMock()
mock_agent.role = "Test Agent"
mock_agent.goal = "Test goal"
mock_agent.backstory = "Test backstory"
prompts = Prompts(
has_tools=False,
use_native_tool_calling=False,
use_system_prompt=True,
agent=mock_agent,
)
result = prompts.task_execution()
# Verify it's a SystemPromptResult with system and user keys
assert "system" in result
assert "user" in result
assert "prompt" in result
# The user prompt should NOT contain "Thought:" (ReAct format)
assert "Thought:" not in result["user"]
# The user prompt should NOT mention tools
assert "use the tools available" not in result["user"]
assert "tools available" not in result["user"].lower()
# The system prompt should NOT contain ReAct format instructions
assert "Thought:" not in result["system"]
assert "Final Answer:" not in result["system"]
def test_no_tools_prompt_is_simple(self) -> None:
"""Test that no-tools prompt is simple and direct."""
mock_agent = MagicMock()
mock_agent.role = "Language Detector"
mock_agent.goal = "Detect language"
mock_agent.backstory = "Expert linguist"
prompts = Prompts(
has_tools=False,
use_native_tool_calling=False,
use_system_prompt=True,
agent=mock_agent,
)
result = prompts.task_execution()
# Should contain the role playing info
assert "Language Detector" in result["system"]
# User prompt should be simple with just the task
assert "Current Task:" in result["user"]
assert "Provide your complete response:" in result["user"]
def test_with_tools_uses_task_slice_with_react(self) -> None:
"""Test that agents WITH tools use the task slice (ReAct format)."""
mock_agent = MagicMock()
mock_agent.role = "Test Agent"
mock_agent.goal = "Test goal"
mock_agent.backstory = "Test backstory"
prompts = Prompts(
has_tools=True,
use_native_tool_calling=False,
use_system_prompt=True,
agent=mock_agent,
)
result = prompts.task_execution()
# With tools and ReAct, the prompt SHOULD contain Thought:
assert "Thought:" in result["user"]
def test_native_tools_uses_native_task_slice(self) -> None:
"""Test that native tool calling uses native_task slice."""
mock_agent = MagicMock()
mock_agent.role = "Test Agent"
mock_agent.goal = "Test goal"
mock_agent.backstory = "Test backstory"
prompts = Prompts(
has_tools=True,
use_native_tool_calling=True,
use_system_prompt=True,
agent=mock_agent,
)
result = prompts.task_execution()
# Native tool calling should NOT have Thought: in user prompt
assert "Thought:" not in result["user"]
# Should NOT have emotional manipulation
assert "your job depends on it" not in result["user"]
class TestNoThoughtLeakagePatterns:
"""Tests to verify prompts don't encourage thought leakage."""
def test_no_job_depends_on_it_in_no_tools(self) -> None:
"""Test that 'your job depends on it' is not in no-tools prompts."""
mock_agent = MagicMock()
mock_agent.role = "Test"
mock_agent.goal = "Test"
mock_agent.backstory = "Test"
prompts = Prompts(
has_tools=False,
use_native_tool_calling=False,
use_system_prompt=True,
agent=mock_agent,
)
result = prompts.task_execution()
full_prompt = result["prompt"]
assert "your job depends on it" not in full_prompt.lower()
assert "i must use these formats" not in full_prompt.lower()
def test_no_job_depends_on_it_in_native_task(self) -> None:
"""Test that 'your job depends on it' is not in native task prompts."""
mock_agent = MagicMock()
mock_agent.role = "Test"
mock_agent.goal = "Test"
mock_agent.backstory = "Test"
prompts = Prompts(
has_tools=True,
use_native_tool_calling=True,
use_system_prompt=True,
agent=mock_agent,
)
result = prompts.task_execution()
full_prompt = result["prompt"]
assert "your job depends on it" not in full_prompt.lower()
class TestRealLLMNoThoughtLeakage:
"""Integration tests with real LLM calls to verify no thought leakage."""
@pytest.mark.vcr()
def test_agent_without_tools_no_thought_in_output(self) -> None:
"""Test that agent without tools produces clean output without 'Thought:' prefix."""
agent = Agent(
role="Language Detector",
goal="Detect the language of text",
backstory="You are an expert linguist who can identify languages.",
tools=[], # No tools
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
task = Task(
description="What language is this text written in: 'Hello, how are you?'",
expected_output="The detected language (e.g., English, Spanish, etc.)",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert result.raw is not None
# The output should NOT start with "Thought:" or contain ReAct artifacts
output = str(result.raw)
assert not output.strip().startswith("Thought:")
assert "Final Answer:" not in output
assert "I now can give a great answer" not in output
# Should contain an actual answer about the language
assert any(
lang in output.lower()
for lang in ["english", "en", "language"]
)
@pytest.mark.vcr()
def test_simple_task_clean_output(self) -> None:
"""Test that a simple task produces clean output without internal reasoning."""
agent = Agent(
role="Classifier",
goal="Classify text sentiment",
backstory="You classify text sentiment accurately.",
tools=[],
llm=LLM(model="gpt-4o-mini"),
verbose=False,
)
task = Task(
description="Classify the sentiment of: 'I love this product!'",
expected_output="One word: positive, negative, or neutral",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
output = str(result.raw).strip().lower()
# Output should be clean - just the classification
assert not output.startswith("thought:")
assert "final answer:" not in output
# Should contain the actual classification
assert any(
sentiment in output
for sentiment in ["positive", "negative", "neutral"]
)
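The deleted suite encodes a three-way prompt selection: no tools gets the plain `task_no_tools` slice, native function calling gets `native_task`, and only the classic tool path keeps the ReAct `task` slice with its "Thought:" scaffolding. The decision table as a sketch (slice names taken from the docstrings above; the real `Prompts` class resolves them internally):

```python
def pick_task_slice(has_tools: bool, use_native_tool_calling: bool) -> str:
    if not has_tools:
        return "task_no_tools"  # plain prompt, no Thought:/Final Answer: scaffolding
    if use_native_tool_calling:
        return "native_task"  # tools go through function calling, still no ReAct
    return "task"  # classic ReAct loop: Thought / Action / Observation
```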


@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.9.2"
__version__ = "1.9.0"