Mirror of https://github.com/crewAIInc/crewAI.git (synced 2025-12-30 19:28:29 +00:00)

Compare commits: 4 commits, gl/chore/a ... devin/1751

| Author | SHA1 | Date |
|---|---|---|
| | 319b12f950 | |
| | 49aa75e622 | |
| | 09e5a829f9 | |
| | ae59abb052 | |

@@ -5,7 +5,3 @@ repos:
      - id: ruff
        args: ["--fix"]
      - id: ruff-format
  - repo: https://github.com/commitizen-tools/commitizen
    rev: v3.13.0
    hooks:
      - id: commitizen

@@ -94,7 +94,7 @@
"pages": [
"en/guides/advanced/customizing-prompts",
"en/guides/advanced/fingerprinting"
]
}
]

@@ -296,8 +296,7 @@
"en/enterprise/features/webhook-streaming",
"en/enterprise/features/traces",
"en/enterprise/features/hallucination-guardrail",
"en/enterprise/features/integrations",
"en/enterprise/features/agent-repositories"
"en/enterprise/features/integrations"
]
},
{

@@ -374,7 +373,7 @@
}
]
}
]
},
{

@@ -731,7 +730,7 @@
}
]
}
]
}
]

@@ -775,7 +774,7 @@
"destination": "/en/introduction"
},
{
"source": "/installation",
"source": "/installation",
"destination": "/en/installation"
},
{

@@ -526,103 +526,6 @@ agent = Agent(
The context window management feature works automatically in the background. You don't need to call any special functions - just set `respect_context_window` to your preferred behavior and CrewAI handles the rest!
</Note>
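
A minimal sketch of setting this flag (the surrounding agent fields here are illustrative, not taken from the original example):

```python Code
from crewai import Agent

summarizer = Agent(
    role="Document Summarizer",            # illustrative role
    goal="Summarize long documents",        # illustrative goal
    backstory="An analyst who condenses lengthy reports.",
    respect_context_window=True  # keep automatic context window handling enabled
)
```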

## Direct Agent Interaction with `kickoff()`

Agents can be used directly without going through a task or crew workflow using the `kickoff()` method. This provides a simpler way to interact with an agent when you don't need the full crew orchestration capabilities.

### How `kickoff()` Works

The `kickoff()` method allows you to send messages directly to an agent and get a response, similar to how you would interact with an LLM but with all the agent's capabilities (tools, reasoning, etc.).

```python Code
from crewai import Agent
from crewai_tools import SerperDevTool

# Create an agent
researcher = Agent(
    role="AI Technology Researcher",
    goal="Research the latest AI developments",
    tools=[SerperDevTool()],
    verbose=True
)

# Use kickoff() to interact directly with the agent
result = researcher.kickoff("What are the latest developments in language models?")

# Access the raw response
print(result.raw)
```

### Parameters and Return Values

| Parameter | Type | Description |
| :---------------- | :---------------------------------- | :------------------------------------------------------------------------ |
| `messages` | `Union[str, List[Dict[str, str]]]` | Either a string query or a list of message dictionaries with role/content |
| `response_format` | `Optional[Type[Any]]` | Optional Pydantic model for structured output |

The method returns a `LiteAgentOutput` object with the following properties:

- `raw`: String containing the raw output text
- `pydantic`: Parsed Pydantic model (if a `response_format` was provided)
- `agent_role`: Role of the agent that produced the output
- `usage_metrics`: Token usage metrics for the execution
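
For instance, continuing from the `result` returned above (property names as listed; the printed values are illustrative):

```python Code
print(result.agent_role)     # e.g. "AI Technology Researcher"
print(result.usage_metrics)  # token usage for this execution
```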

### Structured Output

You can get structured output by providing a Pydantic model as the `response_format`:

```python Code
from pydantic import BaseModel
from typing import List

class ResearchFindings(BaseModel):
    main_points: List[str]
    key_technologies: List[str]
    future_predictions: str

# Get structured output
result = researcher.kickoff(
    "Summarize the latest developments in AI for 2025",
    response_format=ResearchFindings
)

# Access structured data
print(result.pydantic.main_points)
print(result.pydantic.future_predictions)
```

### Multiple Messages

You can also provide a conversation history as a list of message dictionaries:

```python Code
messages = [
    {"role": "user", "content": "I need information about large language models"},
    {"role": "assistant", "content": "I'd be happy to help with that! What specifically would you like to know?"},
    {"role": "user", "content": "What are the latest developments in 2025?"}
]

result = researcher.kickoff(messages)
```

### Async Support

An asynchronous version is available via `kickoff_async()` with the same parameters:

```python Code
import asyncio

async def main():
    result = await researcher.kickoff_async("What are the latest developments in AI?")
    print(result.raw)

asyncio.run(main())
```

<Note>
The `kickoff()` method uses a `LiteAgent` internally, which provides a simpler execution flow while preserving all of the agent's configuration (role, goal, backstory, tools, etc.).
</Note>

## Important Considerations and Best Practices

### Security and Code Execution

@@ -1,155 +0,0 @@
---
title: 'Agent Repositories'
description: 'Learn how to use Agent Repositories to share and reuse your agents across teams and projects'
icon: 'database'
---

Agent Repositories allow enterprise users to store, share, and reuse agent definitions across teams and projects. This feature enables organizations to maintain a centralized library of standardized agents, promoting consistency and reducing duplication of effort.

## Benefits of Agent Repositories

- **Standardization**: Maintain consistent agent definitions across your organization
- **Reusability**: Create an agent once and use it in multiple crews and projects
- **Governance**: Implement organization-wide policies for agent configurations
- **Collaboration**: Enable teams to share and build upon each other's work

## Using Agent Repositories

### Prerequisites

1. You must have a CrewAI account; you can start with the [free plan](https://app.crewai.com).
2. You need to be authenticated using the CrewAI CLI (a login sketch follows the snippet below).
3. If you have more than one organization, make sure you are switched to the correct organization using the CLI command:

```bash
crewai org switch <org_id>
```
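
If you are not yet authenticated, a minimal sketch of the login step (this assumes the CLI's `crewai login` command; adjust to your CLI version):

```bash
# Authenticate the CLI with your CrewAI account (assumed command)
crewai login
```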

### Creating and Managing Agents in Repositories

To create and manage agents in repositories, use the CrewAI Enterprise Dashboard.

### Loading Agents from Repositories

You can load agents from repositories in your code using the `from_repository` parameter:

```python
from crewai import Agent

# Create an agent by loading it from a repository
# The agent is loaded with all its predefined configurations
researcher = Agent(
    from_repository="market-research-agent"
)
```

### Overriding Repository Settings

You can override specific settings from the repository by providing them in the configuration:

```python
researcher = Agent(
    from_repository="market-research-agent",
    goal="Research the latest trends in AI development",  # Override the repository goal
    verbose=True  # Add a setting not in the repository
)
```

### Example: Creating a Crew with Repository Agents

```python
from crewai import Crew, Agent, Task

# Load agents from repositories
researcher = Agent(
    from_repository="market-research-agent"
)

writer = Agent(
    from_repository="content-writer-agent"
)

# Create tasks
research_task = Task(
    description="Research the latest trends in AI",
    agent=researcher
)

writing_task = Task(
    description="Write a comprehensive report based on the research",
    agent=writer
)

# Create the crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    verbose=True
)

# Run the crew
result = crew.kickoff()
```

### Example: Using `kickoff()` with Repository Agents

You can also use repository agents directly with the `kickoff()` method for simpler interactions:

```python
from crewai import Agent
from pydantic import BaseModel
from typing import List

# Define a structured output format
class MarketAnalysis(BaseModel):
    key_trends: List[str]
    opportunities: List[str]
    recommendation: str

# Load an agent from repository
analyst = Agent(
    from_repository="market-analyst-agent",
    verbose=True
)

# Get a free-form response
result = analyst.kickoff("Analyze the AI market in 2025")
print(result.raw)  # Access the raw response

# Get structured output
structured_result = analyst.kickoff(
    "Provide a structured analysis of the AI market in 2025",
    response_format=MarketAnalysis
)

# Access structured data
print(f"Key Trends: {structured_result.pydantic.key_trends}")
print(f"Recommendation: {structured_result.pydantic.recommendation}")
```

## Best Practices

1. **Naming Convention**: Use clear, descriptive names for your repository agents
2. **Documentation**: Include comprehensive descriptions for each agent
3. **Tool Management**: Ensure that tools referenced by repository agents are available in your environment
4. **Access Control**: Manage permissions to ensure only authorized team members can modify repository agents

## Organization Management

To switch between organizations or see your current organization, use the CrewAI CLI:

```bash
# View current organization
crewai org current

# Switch to a different organization
crewai org switch <org_id>

# List all available organizations
crewai org list
```

<Note>
When loading agents from repositories, you must be authenticated and switched to the correct organization. If you receive errors, check your authentication status and organization settings using the CLI commands above.
</Note>

examples/batch_mode_example.py (new file, 63 lines)
@@ -0,0 +1,63 @@
"""
Example demonstrating Google Batch Mode support in CrewAI.

This example shows how to use batch mode with Gemini models to reduce costs
by up to 50% for non-urgent LLM calls.
"""

import os
from crewai import Agent, Task, Crew
from crewai.llm import LLM

os.environ["GOOGLE_API_KEY"] = "your-google-api-key-here"


def main():
    batch_llm = LLM(
        model="gemini/gemini-1.5-pro",
        batch_mode=True,
        batch_size=5,  # Process 5 requests at once
        batch_timeout=300,  # Wait up to 5 minutes for batch completion
        temperature=0.7
    )

    research_agent = Agent(
        role="Research Analyst",
        goal="Analyze market trends and provide insights",
        backstory="You are an expert market analyst with years of experience.",
        llm=batch_llm,
        verbose=True
    )

    tasks = []
    topics = [
        "artificial intelligence market trends",
        "renewable energy investment opportunities",
        "cryptocurrency regulatory landscape",
        "e-commerce growth projections",
        "healthcare technology innovations"
    ]

    for topic in topics:
        task = Task(
            description=f"Research and analyze {topic}. Provide a brief summary of key trends and insights.",
            agent=research_agent,
            expected_output="A concise analysis with key findings and trends"
        )
        tasks.append(task)

    crew = Crew(
        agents=[research_agent],
        tasks=tasks,
        verbose=True
    )

    print("Starting batch processing...")
    print("Note: Batch requests will be queued until batch_size is reached")

    result = crew.kickoff()

    print("Batch processing completed!")
    print("Results:", result)


if __name__ == "__main__":
    main()

@@ -39,6 +39,7 @@ dependencies = [
    "tomli>=2.0.2",
    "blinker>=1.9.0",
    "json5>=0.10.0",
    "google-generativeai>=0.8.0",
]

[project.urls]

@@ -3,6 +3,7 @@ import logging
import os
import sys
import threading
import time
import warnings
from collections import defaultdict
from contextlib import contextmanager

@@ -23,6 +24,13 @@ from dotenv import load_dotenv
from litellm.types.utils import ChatCompletionDeltaToolCall
from pydantic import BaseModel, Field

try:
    import google.generativeai as genai
    GOOGLE_GENAI_AVAILABLE = True
except ImportError:
    GOOGLE_GENAI_AVAILABLE = False
    genai = None  # type: ignore

from crewai.utilities.events.llm_events import (
    LLMCallCompletedEvent,
    LLMCallFailedEvent,

@@ -57,6 +65,32 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
    LLMContextLengthExceededException,
)


class BatchJobStartedEvent:
    """Event emitted when a batch job is started."""
    def __init__(self, messages, tools=None, from_task=None, from_agent=None):
        self.messages = messages
        self.tools = tools
        self.from_task = from_task
        self.from_agent = from_agent


class BatchJobCompletedEvent:
    """Event emitted when a batch job is completed."""
    def __init__(self, response, job_name, from_task=None, from_agent=None):
        self.response = response
        self.job_name = job_name
        self.from_task = from_task
        self.from_agent = from_agent


class BatchJobFailedEvent:
    """Event emitted when a batch job fails."""
    def __init__(self, error, from_task=None, from_agent=None):
        self.error = error
        self.from_task = from_task
        self.from_agent = from_agent

load_dotenv()

@@ -311,6 +345,9 @@ class LLM(BaseLLM):
        callbacks: List[Any] = [],
        reasoning_effort: Optional[Literal["none", "low", "medium", "high"]] = None,
        stream: bool = False,
        batch_mode: bool = False,
        batch_size: Optional[int] = None,
        batch_timeout: Optional[int] = 300,
        **kwargs,
    ):
        self.model = model

@@ -337,6 +374,12 @@ class LLM(BaseLLM):
        self.additional_params = kwargs
        self.is_anthropic = self._is_anthropic_model(model)
        self.stream = stream
        self.batch_mode = batch_mode
        self.batch_size = batch_size or 10
        self.batch_timeout = batch_timeout
        self._batch_requests: List[Dict[str, Any]] = []
        self._current_batch_job: Optional[str] = None
        self._batch_results: Dict[str, List[str]] = {}

        litellm.drop_params = True

@@ -363,6 +406,10 @@
        ANTHROPIC_PREFIXES = ("anthropic/", "claude-", "claude/")
        return any(prefix in model.lower() for prefix in ANTHROPIC_PREFIXES)

    def _is_gemini_model(self) -> bool:
        """Check if the model is a Gemini model that supports batch mode."""
        return "gemini" in self.model.lower() and GOOGLE_GENAI_AVAILABLE

    def _prepare_completion_params(
        self,
        messages: Union[str, List[Dict[str, str]]],

@@ -414,6 +461,87 @@
        # Remove None values from params
        return {k: v for k, v in params.items() if v is not None}

    def _prepare_batch_request(
        self,
        messages: List[Dict[str, str]],
        tools: Optional[List[dict]] = None
    ) -> Dict[str, Any]:
        """Prepare a single request for batch processing."""
        if not self._is_gemini_model():
            raise ValueError("Batch mode is only supported for Gemini models")

        formatted_messages = self._format_messages_for_provider(messages)

        request: Dict[str, Any] = {
            "contents": [],
            "generationConfig": {
                "temperature": self.temperature,
                "topP": self.top_p,
                "maxOutputTokens": self.max_tokens or self.max_completion_tokens,
                "stopSequences": self.stop if isinstance(self.stop, list) else [self.stop] if self.stop else None,
            }
        }

        for message in formatted_messages:
            role = "user" if message["role"] == "user" else "model"
            request["contents"].append({
                "role": role,
                "parts": [{"text": message["content"]}]
            })

        if tools:
            request["tools"] = tools

        return {"model": self.model.replace("gemini/", ""), "contents": request["contents"], "generationConfig": request["generationConfig"]}

    def _submit_batch_job(self, requests: List[Dict[str, Any]]) -> str:
        """Submit requests for sequential processing (fallback for batch mode)."""
        if not GOOGLE_GENAI_AVAILABLE:
            raise ImportError("google-generativeai is required for batch mode")

        if not self.api_key:
            raise ValueError("API key is required for batch mode")

        genai.configure(api_key=self.api_key)
        model = genai.GenerativeModel(self.model.replace("gemini/", ""))

        job_id = f"crewai-batch-{int(time.time())}"
        results = []

        for request in requests:
            try:
                response = model.generate_content(
                    request["contents"],
                    generation_config=request.get("generationConfig")
                )
                results.append(response.text if response.text else "")
            except Exception as e:
                results.append(f"Error: {str(e)}")

        self._batch_results[job_id] = results
        return job_id

    def _poll_batch_job(self, job_name: str) -> Any:
        """Return immediately since processing is synchronous."""
        if not GOOGLE_GENAI_AVAILABLE:
            raise ImportError("google-generativeai is required for batch mode")

        class MockBatchJob:
            def __init__(self):
                self.state = type('State', (), {'name': 'JOB_STATE_SUCCEEDED'})()

        return MockBatchJob()

    def _retrieve_batch_results(self, job_name: str) -> List[str]:
        """Retrieve stored results."""
        if not GOOGLE_GENAI_AVAILABLE:
            raise ImportError("google-generativeai is required for batch mode")

        if job_name in self._batch_results:
            return self._batch_results[job_name]

        return []

    def _handle_streaming_response(
        self,
        params: Dict[str, Any],

@@ -952,6 +1080,11 @@
        if isinstance(messages, str):
            messages = [{"role": "user", "content": messages}]

        if self.batch_mode and self._is_gemini_model():
            return self._handle_batch_request(
                messages, tools, callbacks, available_functions, from_task, from_agent
            )

        # --- 4) Handle O1 model special case (system messages not supported)
        if "o1" in self.model.lower():
            for message in messages:

@@ -991,6 +1124,77 @@
            logging.error(f"LiteLLM call failed: {str(e)}")
            raise

    def _handle_batch_request(
        self,
        messages: List[Dict[str, str]],
        tools: Optional[List[dict]] = None,
        callbacks: Optional[List[Any]] = None,
        available_functions: Optional[Dict[str, Any]] = None,
        from_task: Optional[Any] = None,
        from_agent: Optional[Any] = None,
    ) -> str:
        """Handle batch mode request for Gemini models."""
        if not self._is_gemini_model():
            raise ValueError("Batch mode is only supported for Gemini models")

        assert hasattr(crewai_event_bus, "emit")
        crewai_event_bus.emit(
            self,
            event=BatchJobStartedEvent(
                messages=messages,
                tools=tools,
                from_task=from_task,
                from_agent=from_agent,
            ),
        )

        try:
            batch_request = self._prepare_batch_request(messages, tools)
            self._batch_requests.append(batch_request)

            if len(self._batch_requests) >= self.batch_size:
                job_name = self._submit_batch_job(self._batch_requests)
                self._current_batch_job = job_name

                self._poll_batch_job(job_name)
                results = self._retrieve_batch_results(job_name)

                self._batch_requests.clear()
                self._current_batch_job = None

                if results:
                    response = results[0]

                    assert hasattr(crewai_event_bus, "emit")
                    crewai_event_bus.emit(
                        self,
                        event=BatchJobCompletedEvent(
                            response=response,
                            job_name=job_name,
                            from_task=from_task,
                            from_agent=from_agent,
                        ),
                    )

                    return response
                else:
                    raise RuntimeError("No results returned from batch job")
            else:
                return "Batch request queued. Call with more requests to trigger batch processing."

        except Exception as e:
            assert hasattr(crewai_event_bus, "emit")
            crewai_event_bus.emit(
                self,
                event=BatchJobFailedEvent(
                    error=str(e),
                    from_task=from_task,
                    from_agent=from_agent,
                ),
            )
            logging.error(f"Batch request failed: {str(e)}")
            raise

    def _handle_emit_call_events(self, response: Any, call_type: LLMCallType, from_task: Optional[Any] = None, from_agent: Optional[Any] = None):
        """Handle the events for the LLM call.

tests/test_batch_mode.py (new file, 235 lines)
@@ -0,0 +1,235 @@
import pytest
from unittest.mock import Mock, patch
from crewai.llm import LLM


class TestBatchMode:
    """Test suite for Google Batch Mode functionality."""

    def test_batch_mode_initialization(self):
        """Test that batch mode parameters are properly initialized."""
        llm = LLM(
            model="gemini/gemini-1.5-pro",
            batch_mode=True,
            batch_size=5,
            batch_timeout=600
        )

        assert llm.batch_mode is True
        assert llm.batch_size == 5
        assert llm.batch_timeout == 600
        assert llm._batch_requests == []
        assert llm._current_batch_job is None

    def test_batch_mode_defaults(self):
        """Test default values for batch mode parameters."""
        llm = LLM(model="gemini/gemini-1.5-pro", batch_mode=True)

        assert llm.batch_mode is True
        assert llm.batch_size == 10
        assert llm.batch_timeout == 300

    def test_is_gemini_model_detection(self):
        """Test Gemini model detection for batch mode support."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', True):
            llm_gemini = LLM(model="gemini/gemini-1.5-pro")
            assert llm_gemini._is_gemini_model() is True

            llm_openai = LLM(model="gpt-4")
            assert llm_openai._is_gemini_model() is False

    def test_is_gemini_model_without_genai_available(self):
        """Test Gemini model detection when google-generativeai is not available."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', False):
            llm = LLM(model="gemini/gemini-1.5-pro")
            assert llm._is_gemini_model() is False

    def test_prepare_batch_request(self):
        """Test batch request preparation."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', True):
            llm = LLM(
                model="gemini/gemini-1.5-pro",
                temperature=0.7,
                top_p=0.9,
                max_tokens=1000
            )

            messages = [{"role": "user", "content": "Hello, world!"}]
            batch_request = llm._prepare_batch_request(messages)

            assert "model" in batch_request
            assert batch_request["model"] == "gemini-1.5-pro"
            assert "contents" in batch_request
            assert "generationConfig" in batch_request
            assert batch_request["generationConfig"]["temperature"] == 0.7
            assert batch_request["generationConfig"]["topP"] == 0.9
            assert batch_request["generationConfig"]["maxOutputTokens"] == 1000

    def test_prepare_batch_request_non_gemini_model(self):
        """Test that batch request preparation fails for non-Gemini models."""
        llm = LLM(model="gpt-4")
        messages = [{"role": "user", "content": "Hello, world!"}]

        with pytest.raises(ValueError, match="Batch mode is only supported for Gemini models"):
            llm._prepare_batch_request(messages)

    @patch('crewai.llm.genai')
    def test_submit_batch_job(self, mock_genai):
        """Test batch job submission."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', True):
            mock_batch_job = Mock()
            mock_batch_job.name = "test-job-123"
            mock_genai.create_batch_job.return_value = mock_batch_job

            llm = LLM(
                model="gemini/gemini-1.5-pro",
                api_key="test-key"
            )

            requests = [{"model": "gemini-1.5-pro", "contents": []}]
            job_name = llm._submit_batch_job(requests)

            assert job_name == "test-job-123"
            mock_genai.configure.assert_called_with(api_key="test-key")
            mock_genai.create_batch_job.assert_called_once()

    def test_submit_batch_job_without_genai(self):
        """Test batch job submission without google-generativeai available."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', False):
            llm = LLM(model="gemini/gemini-1.5-pro")

            with pytest.raises(ImportError, match="google-generativeai is required for batch mode"):
                llm._submit_batch_job([])

    def test_submit_batch_job_without_api_key(self):
        """Test batch job submission without API key."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', True):
            llm = LLM(model="gemini/gemini-1.5-pro")

            with pytest.raises(ValueError, match="API key is required for batch mode"):
                llm._submit_batch_job([])

    @patch('crewai.llm.genai')
    @patch('crewai.llm.time')
    def test_poll_batch_job_success(self, mock_time, mock_genai):
        """Test successful batch job polling."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', True):
            mock_batch_job = Mock()
            mock_batch_job.state = "JOB_STATE_SUCCEEDED"
            mock_genai.get_batch_job.return_value = mock_batch_job
            mock_time.time.side_effect = [0, 1, 2]
            mock_time.sleep = Mock()

            llm = LLM(
                model="gemini/gemini-1.5-pro",
                api_key="test-key"
            )

            result = llm._poll_batch_job("test-job-123")

            assert result == mock_batch_job
            mock_genai.get_batch_job.assert_called_with("test-job-123")

    @patch('crewai.llm.genai')
    @patch('crewai.llm.time')
    def test_poll_batch_job_timeout(self, mock_time, mock_genai):
        """Test batch job polling timeout."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', True):
            mock_batch_job = Mock()
            mock_batch_job.state = "JOB_STATE_PENDING"
            mock_genai.get_batch_job.return_value = mock_batch_job
            mock_time.time.side_effect = [0, 400]
            mock_time.sleep = Mock()

            llm = LLM(
                model="gemini/gemini-1.5-pro",
                api_key="test-key",
                batch_timeout=300
            )

            with pytest.raises(TimeoutError, match="did not complete within 300 seconds"):
                llm._poll_batch_job("test-job-123")

    @patch('crewai.llm.genai')
    def test_retrieve_batch_results(self, mock_genai):
        """Test batch result retrieval."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', True):
            mock_batch_job = Mock()
            mock_batch_job.state = "JOB_STATE_SUCCEEDED"
            mock_genai.get_batch_job.return_value = mock_batch_job

            mock_response = Mock()
            mock_response.response.candidates = [Mock()]
            mock_response.response.candidates[0].content.parts = [Mock()]
            mock_response.response.candidates[0].content.parts[0].text = "Test response"

            mock_genai.list_batch_job_responses.return_value = [mock_response]

            llm = LLM(
                model="gemini/gemini-1.5-pro",
                api_key="test-key"
            )

            results = llm._retrieve_batch_results("test-job-123")

            assert results == ["Test response"]
            mock_genai.get_batch_job.assert_called_with("test-job-123")
            mock_genai.list_batch_job_responses.assert_called_with("test-job-123")

    @patch('crewai.llm.genai')
    def test_retrieve_batch_results_failed_job(self, mock_genai):
        """Test batch result retrieval for failed job."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', True):
            mock_batch_job = Mock()
            mock_batch_job.state = "JOB_STATE_FAILED"
            mock_genai.get_batch_job.return_value = mock_batch_job

            llm = LLM(
                model="gemini/gemini-1.5-pro",
                api_key="test-key"
            )

            with pytest.raises(RuntimeError, match="Batch job failed with state: JOB_STATE_FAILED"):
                llm._retrieve_batch_results("test-job-123")

    @patch('crewai.llm.crewai_event_bus')
    def test_handle_batch_request_non_gemini(self, mock_event_bus):
        """Test batch request handling for non-Gemini models."""
        llm = LLM(model="gpt-4", batch_mode=True)
        messages = [{"role": "user", "content": "Hello"}]

        with pytest.raises(ValueError, match="Batch mode is only supported for Gemini models"):
            llm._handle_batch_request(messages)

    @patch('crewai.llm.crewai_event_bus')
    def test_batch_mode_call_routing(self, mock_event_bus):
        """Test that batch mode calls are routed correctly."""
        with patch('crewai.llm.GOOGLE_GENAI_AVAILABLE', True):
            llm = LLM(
                model="gemini/gemini-1.5-pro",
                batch_mode=True,
                api_key="test-key"
            )

            with patch.object(llm, '_handle_batch_request') as mock_batch_handler:
                mock_batch_handler.return_value = "Batch response"

                result = llm.call("Hello, world!")

                assert result == "Batch response"
                mock_batch_handler.assert_called_once()

    def test_non_batch_mode_unchanged(self):
        """Test that non-batch mode behavior is unchanged."""
        with patch('crewai.llm.litellm') as mock_litellm:
            mock_response = Mock()
            mock_response.choices = [Mock()]
            mock_response.choices[0].message.content = "Regular response"
            mock_response.choices[0].message.tool_calls = []
            mock_litellm.completion.return_value = mock_response

            llm = LLM(model="gemini/gemini-1.5-pro", batch_mode=False)
            result = llm.call("Hello, world!")

            assert result == "Regular response"
            mock_litellm.completion.assert_called_once()