mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-17 04:48:30 +00:00
Compare commits
26 Commits
bugfix/sup
...
tm-add-tas
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
861560aa93 | ||
|
|
86825e1769 | ||
|
|
7afc531fbb | ||
|
|
ed0490112b | ||
|
|
66c66e3d84 | ||
|
|
b9b625a70d | ||
|
|
b58253cacc | ||
|
|
fbf8732784 | ||
|
|
8fedbe49cb | ||
|
|
1e8ee247ca | ||
|
|
34d2993456 | ||
|
|
e3c5c174ee | ||
|
|
b4e2db0306 | ||
|
|
9cc759ba32 | ||
|
|
ac9f8b9d5a | ||
|
|
3d4a1e4b18 | ||
|
|
123f302744 | ||
|
|
5bae78639e | ||
|
|
5235442a5b | ||
|
|
c62fb615b1 | ||
|
|
78797c64b0 | ||
|
|
8a7584798b | ||
|
|
b50772a38b | ||
|
|
96a7e8038f | ||
|
|
ec050e5d33 | ||
|
|
e2ce65fc5b |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -21,4 +21,5 @@ crew_tasks_output.json
|
||||
.mypy_cache
|
||||
.ruff_cache
|
||||
.venv
|
||||
agentops.log
|
||||
agentops.log
|
||||
test_flow.html
|
||||
@@ -136,17 +136,21 @@ crewai test -n 5 -m gpt-3.5-turbo
|
||||
|
||||
### 8. Run
|
||||
|
||||
Run the crew.
|
||||
Run the crew or flow.
|
||||
|
||||
```shell Terminal
|
||||
crewai run
|
||||
```
|
||||
|
||||
<Note>
|
||||
Starting from version 0.103.0, the `crewai run` command can be used to run both standard crews and flows. For flows, it automatically detects the type from pyproject.toml and runs the appropriate command. This is now the recommended way to run both crews and flows.
|
||||
</Note>
|
||||
|
||||
<Note>
|
||||
Make sure to run these commands from the directory where your CrewAI project is set up.
|
||||
Some commands may require additional configuration or setup within your project structure.
|
||||
</Note>
|
||||
|
||||
|
||||
### 9. Chat
|
||||
|
||||
Starting in version `0.98.0`, when you run the `crewai chat` command, you start an interactive session with your crew. The AI assistant will guide you by asking for necessary inputs to execute the crew. Once all inputs are provided, the crew will execute its tasks.
|
||||
@@ -175,7 +179,6 @@ def crew(self) -> Crew:
|
||||
```
|
||||
</Note>
|
||||
|
||||
|
||||
### 10. API Keys
|
||||
|
||||
When running ```crewai create crew``` command, the CLI will first show you the top 5 most common LLM providers and ask you to select one.
|
||||
|
||||
349
docs/concepts/event-listner.mdx
Normal file
349
docs/concepts/event-listner.mdx
Normal file
@@ -0,0 +1,349 @@
|
||||
---
|
||||
title: 'Event Listeners'
|
||||
description: 'Tap into CrewAI events to build custom integrations and monitoring'
|
||||
---
|
||||
|
||||
# Event Listeners
|
||||
|
||||
CrewAI provides a powerful event system that allows you to listen for and react to various events that occur during the execution of your Crew. This feature enables you to build custom integrations, monitoring solutions, logging systems, or any other functionality that needs to be triggered based on CrewAI's internal events.
|
||||
|
||||
## How It Works
|
||||
|
||||
CrewAI uses an event bus architecture to emit events throughout the execution lifecycle. The event system is built on the following components:
|
||||
|
||||
1. **CrewAIEventsBus**: A singleton event bus that manages event registration and emission
|
||||
2. **CrewEvent**: Base class for all events in the system
|
||||
3. **BaseEventListener**: Abstract base class for creating custom event listeners
|
||||
|
||||
When specific actions occur in CrewAI (like a Crew starting execution, an Agent completing a task, or a tool being used), the system emits corresponding events. You can register handlers for these events to execute custom code when they occur.
|
||||
|
||||
## Creating a Custom Event Listener
|
||||
|
||||
To create a custom event listener, you need to:
|
||||
|
||||
1. Create a class that inherits from `BaseEventListener`
|
||||
2. Implement the `setup_listeners` method
|
||||
3. Register handlers for the events you're interested in
|
||||
4. Create an instance of your listener in the appropriate file
|
||||
|
||||
Here's a simple example of a custom event listener class:
|
||||
|
||||
```python
|
||||
from crewai.utilities.events import (
|
||||
CrewKickoffStartedEvent,
|
||||
CrewKickoffCompletedEvent,
|
||||
AgentExecutionCompletedEvent,
|
||||
)
|
||||
from crewai.utilities.events.base_event_listener import BaseEventListener
|
||||
|
||||
class MyCustomListener(BaseEventListener):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def setup_listeners(self, crewai_event_bus):
|
||||
@crewai_event_bus.on(CrewKickoffStartedEvent)
|
||||
def on_crew_started(source, event):
|
||||
print(f"Crew '{event.crew_name}' has started execution!")
|
||||
|
||||
@crewai_event_bus.on(CrewKickoffCompletedEvent)
|
||||
def on_crew_completed(source, event):
|
||||
print(f"Crew '{event.crew_name}' has completed execution!")
|
||||
print(f"Output: {event.output}")
|
||||
|
||||
@crewai_event_bus.on(AgentExecutionCompletedEvent)
|
||||
def on_agent_execution_completed(source, event):
|
||||
print(f"Agent '{event.agent.role}' completed task")
|
||||
print(f"Output: {event.output}")
|
||||
```
|
||||
|
||||
## Properly Registering Your Listener
|
||||
|
||||
Simply defining your listener class isn't enough. You need to create an instance of it and ensure it's imported in your application. This ensures that:
|
||||
|
||||
1. The event handlers are registered with the event bus
|
||||
2. The listener instance remains in memory (not garbage collected)
|
||||
3. The listener is active when events are emitted
|
||||
|
||||
### Option 1: Import and Instantiate in Your Crew or Flow Implementation
|
||||
|
||||
The most important thing is to create an instance of your listener in the file where your Crew or Flow is defined and executed:
|
||||
|
||||
#### For Crew-based Applications
|
||||
|
||||
Create and import your listener at the top of your Crew implementation file:
|
||||
|
||||
```python
|
||||
# In your crew.py file
|
||||
from crewai import Agent, Crew, Task
|
||||
from my_listeners import MyCustomListener
|
||||
|
||||
# Create an instance of your listener
|
||||
my_listener = MyCustomListener()
|
||||
|
||||
class MyCustomCrew:
|
||||
# Your crew implementation...
|
||||
|
||||
def crew(self):
|
||||
return Crew(
|
||||
agents=[...],
|
||||
tasks=[...],
|
||||
# ...
|
||||
)
|
||||
```
|
||||
|
||||
#### For Flow-based Applications
|
||||
|
||||
Create and import your listener at the top of your Flow implementation file:
|
||||
|
||||
```python
|
||||
# In your main.py or flow.py file
|
||||
from crewai.flow import Flow, listen, start
|
||||
from my_listeners import MyCustomListener
|
||||
|
||||
# Create an instance of your listener
|
||||
my_listener = MyCustomListener()
|
||||
|
||||
class MyCustomFlow(Flow):
|
||||
# Your flow implementation...
|
||||
|
||||
@start()
|
||||
def first_step(self):
|
||||
# ...
|
||||
```
|
||||
|
||||
This ensures that your listener is loaded and active when your Crew or Flow is executed.
|
||||
|
||||
### Option 2: Create a Package for Your Listeners
|
||||
|
||||
For a more structured approach, especially if you have multiple listeners:
|
||||
|
||||
1. Create a package for your listeners:
|
||||
|
||||
```
|
||||
my_project/
|
||||
├── listeners/
|
||||
│ ├── __init__.py
|
||||
│ ├── my_custom_listener.py
|
||||
│ └── another_listener.py
|
||||
```
|
||||
|
||||
2. In `my_custom_listener.py`, define your listener class and create an instance:
|
||||
|
||||
```python
|
||||
# my_custom_listener.py
|
||||
from crewai.utilities.events.base_event_listener import BaseEventListener
|
||||
# ... import events ...
|
||||
|
||||
class MyCustomListener(BaseEventListener):
|
||||
# ... implementation ...
|
||||
|
||||
# Create an instance of your listener
|
||||
my_custom_listener = MyCustomListener()
|
||||
```
|
||||
|
||||
3. In `__init__.py`, import the listener instances to ensure they're loaded:
|
||||
|
||||
```python
|
||||
# __init__.py
|
||||
from .my_custom_listener import my_custom_listener
|
||||
from .another_listener import another_listener
|
||||
|
||||
# Optionally export them if you need to access them elsewhere
|
||||
__all__ = ['my_custom_listener', 'another_listener']
|
||||
```
|
||||
|
||||
4. Import your listeners package in your Crew or Flow file:
|
||||
|
||||
```python
|
||||
# In your crew.py or flow.py file
|
||||
import my_project.listeners # This loads all your listeners
|
||||
|
||||
class MyCustomCrew:
|
||||
# Your crew implementation...
|
||||
```
|
||||
|
||||
This is exactly how CrewAI's built-in `agentops_listener` is registered. In the CrewAI codebase, you'll find:
|
||||
|
||||
```python
|
||||
# src/crewai/utilities/events/third_party/__init__.py
|
||||
from .agentops_listener import agentops_listener
|
||||
```
|
||||
|
||||
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
|
||||
|
||||
## Available Event Types
|
||||
|
||||
CrewAI provides a wide range of events that you can listen for:
|
||||
|
||||
### Crew Events
|
||||
|
||||
- **CrewKickoffStartedEvent**: Emitted when a Crew starts execution
|
||||
- **CrewKickoffCompletedEvent**: Emitted when a Crew completes execution
|
||||
- **CrewKickoffFailedEvent**: Emitted when a Crew fails to complete execution
|
||||
- **CrewTestStartedEvent**: Emitted when a Crew starts testing
|
||||
- **CrewTestCompletedEvent**: Emitted when a Crew completes testing
|
||||
- **CrewTestFailedEvent**: Emitted when a Crew fails to complete testing
|
||||
- **CrewTrainStartedEvent**: Emitted when a Crew starts training
|
||||
- **CrewTrainCompletedEvent**: Emitted when a Crew completes training
|
||||
- **CrewTrainFailedEvent**: Emitted when a Crew fails to complete training
|
||||
|
||||
### Agent Events
|
||||
|
||||
- **AgentExecutionStartedEvent**: Emitted when an Agent starts executing a task
|
||||
- **AgentExecutionCompletedEvent**: Emitted when an Agent completes executing a task
|
||||
- **AgentExecutionErrorEvent**: Emitted when an Agent encounters an error during execution
|
||||
|
||||
### Task Events
|
||||
|
||||
- **TaskStartedEvent**: Emitted when a Task starts execution
|
||||
- **TaskCompletedEvent**: Emitted when a Task completes execution
|
||||
- **TaskFailedEvent**: Emitted when a Task fails to complete execution
|
||||
- **TaskEvaluationEvent**: Emitted when a Task is evaluated
|
||||
|
||||
### Tool Usage Events
|
||||
|
||||
- **ToolUsageStartedEvent**: Emitted when a tool execution is started
|
||||
- **ToolUsageFinishedEvent**: Emitted when a tool execution is completed
|
||||
- **ToolUsageErrorEvent**: Emitted when a tool execution encounters an error
|
||||
- **ToolValidateInputErrorEvent**: Emitted when a tool input validation encounters an error
|
||||
- **ToolExecutionErrorEvent**: Emitted when a tool execution encounters an error
|
||||
- **ToolSelectionErrorEvent**: Emitted when there's an error selecting a tool
|
||||
|
||||
### Flow Events
|
||||
|
||||
- **FlowCreatedEvent**: Emitted when a Flow is created
|
||||
- **FlowStartedEvent**: Emitted when a Flow starts execution
|
||||
- **FlowFinishedEvent**: Emitted when a Flow completes execution
|
||||
- **FlowPlotEvent**: Emitted when a Flow is plotted
|
||||
- **MethodExecutionStartedEvent**: Emitted when a Flow method starts execution
|
||||
- **MethodExecutionFinishedEvent**: Emitted when a Flow method completes execution
|
||||
- **MethodExecutionFailedEvent**: Emitted when a Flow method fails to complete execution
|
||||
|
||||
### LLM Events
|
||||
|
||||
- **LLMCallStartedEvent**: Emitted when an LLM call starts
|
||||
- **LLMCallCompletedEvent**: Emitted when an LLM call completes
|
||||
- **LLMCallFailedEvent**: Emitted when an LLM call fails
|
||||
|
||||
## Event Handler Structure
|
||||
|
||||
Each event handler receives two parameters:
|
||||
|
||||
1. **source**: The object that emitted the event
|
||||
2. **event**: The event instance, containing event-specific data
|
||||
|
||||
The structure of the event object depends on the event type, but all events inherit from `CrewEvent` and include:
|
||||
|
||||
- **timestamp**: The time when the event was emitted
|
||||
- **type**: A string identifier for the event type
|
||||
|
||||
Additional fields vary by event type. For example, `CrewKickoffCompletedEvent` includes `crew_name` and `output` fields.
|
||||
|
||||
## Real-World Example: Integration with AgentOps
|
||||
|
||||
CrewAI includes an example of a third-party integration with [AgentOps](https://github.com/AgentOps-AI/agentops), a monitoring and observability platform for AI agents. Here's how it's implemented:
|
||||
|
||||
```python
|
||||
from typing import Optional
|
||||
|
||||
from crewai.utilities.events import (
|
||||
CrewKickoffCompletedEvent,
|
||||
ToolUsageErrorEvent,
|
||||
ToolUsageStartedEvent,
|
||||
)
|
||||
from crewai.utilities.events.base_event_listener import BaseEventListener
|
||||
from crewai.utilities.events.crew_events import CrewKickoffStartedEvent
|
||||
from crewai.utilities.events.task_events import TaskEvaluationEvent
|
||||
|
||||
try:
|
||||
import agentops
|
||||
AGENTOPS_INSTALLED = True
|
||||
except ImportError:
|
||||
AGENTOPS_INSTALLED = False
|
||||
|
||||
class AgentOpsListener(BaseEventListener):
|
||||
tool_event: Optional["agentops.ToolEvent"] = None
|
||||
session: Optional["agentops.Session"] = None
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def setup_listeners(self, crewai_event_bus):
|
||||
if not AGENTOPS_INSTALLED:
|
||||
return
|
||||
|
||||
@crewai_event_bus.on(CrewKickoffStartedEvent)
|
||||
def on_crew_kickoff_started(source, event: CrewKickoffStartedEvent):
|
||||
self.session = agentops.init()
|
||||
for agent in source.agents:
|
||||
if self.session:
|
||||
self.session.create_agent(
|
||||
name=agent.role,
|
||||
agent_id=str(agent.id),
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(CrewKickoffCompletedEvent)
|
||||
def on_crew_kickoff_completed(source, event: CrewKickoffCompletedEvent):
|
||||
if self.session:
|
||||
self.session.end_session(
|
||||
end_state="Success",
|
||||
end_state_reason="Finished Execution",
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(ToolUsageStartedEvent)
|
||||
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
|
||||
self.tool_event = agentops.ToolEvent(name=event.tool_name)
|
||||
if self.session:
|
||||
self.session.record(self.tool_event)
|
||||
|
||||
@crewai_event_bus.on(ToolUsageErrorEvent)
|
||||
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
|
||||
agentops.ErrorEvent(exception=event.error, trigger_event=self.tool_event)
|
||||
```
|
||||
|
||||
This listener initializes an AgentOps session when a Crew starts, registers agents with AgentOps, tracks tool usage, and ends the session when the Crew completes.
|
||||
|
||||
The AgentOps listener is registered in CrewAI's event system through the import in `src/crewai/utilities/events/third_party/__init__.py`:
|
||||
|
||||
```python
|
||||
from .agentops_listener import agentops_listener
|
||||
```
|
||||
|
||||
This ensures the `agentops_listener` is loaded when the `crewai.utilities.events` package is imported.
|
||||
|
||||
## Advanced Usage: Scoped Handlers
|
||||
|
||||
For temporary event handling (useful for testing or specific operations), you can use the `scoped_handlers` context manager:
|
||||
|
||||
```python
|
||||
from crewai.utilities.events import crewai_event_bus, CrewKickoffStartedEvent
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(CrewKickoffStartedEvent)
|
||||
def temp_handler(source, event):
|
||||
print("This handler only exists within this context")
|
||||
|
||||
# Do something that emits events
|
||||
|
||||
# Outside the context, the temporary handler is removed
|
||||
```
|
||||
|
||||
## Use Cases
|
||||
|
||||
Event listeners can be used for a variety of purposes:
|
||||
|
||||
1. **Logging and Monitoring**: Track the execution of your Crew and log important events
|
||||
2. **Analytics**: Collect data about your Crew's performance and behavior
|
||||
3. **Debugging**: Set up temporary listeners to debug specific issues
|
||||
4. **Integration**: Connect CrewAI with external systems like monitoring platforms, databases, or notification services
|
||||
5. **Custom Behavior**: Trigger custom actions based on specific events
|
||||
|
||||
## Best Practices
|
||||
|
||||
1. **Keep Handlers Light**: Event handlers should be lightweight and avoid blocking operations
|
||||
2. **Error Handling**: Include proper error handling in your event handlers to prevent exceptions from affecting the main execution
|
||||
3. **Cleanup**: If your listener allocates resources, ensure they're properly cleaned up
|
||||
4. **Selective Listening**: Only listen for events you actually need to handle
|
||||
5. **Testing**: Test your event listeners in isolation to ensure they behave as expected
|
||||
|
||||
By leveraging CrewAI's event system, you can extend its functionality and integrate it seamlessly with your existing infrastructure.
|
||||
@@ -150,12 +150,12 @@ final_output = flow.kickoff()
|
||||
|
||||
print("---- Final Output ----")
|
||||
print(final_output)
|
||||
````
|
||||
```
|
||||
|
||||
```text Output
|
||||
---- Final Output ----
|
||||
Second method received: Output from first_method
|
||||
````
|
||||
```
|
||||
|
||||
</CodeGroup>
|
||||
|
||||
@@ -738,3 +738,34 @@ Also, check out our YouTube video on how to use flows in CrewAI below!
|
||||
referrerpolicy="strict-origin-when-cross-origin"
|
||||
allowfullscreen
|
||||
></iframe>
|
||||
|
||||
## Running Flows
|
||||
|
||||
There are two ways to run a flow:
|
||||
|
||||
### Using the Flow API
|
||||
|
||||
You can run a flow programmatically by creating an instance of your flow class and calling the `kickoff()` method:
|
||||
|
||||
```python
|
||||
flow = ExampleFlow()
|
||||
result = flow.kickoff()
|
||||
```
|
||||
|
||||
### Using the CLI
|
||||
|
||||
Starting from version 0.103.0, you can run flows using the `crewai run` command:
|
||||
|
||||
```shell
|
||||
crewai run
|
||||
```
|
||||
|
||||
This command automatically detects if your project is a flow (based on the `type = "flow"` setting in your pyproject.toml) and runs it accordingly. This is the recommended way to run flows from the command line.
|
||||
|
||||
For backward compatibility, you can also use:
|
||||
|
||||
```shell
|
||||
crewai flow kickoff
|
||||
```
|
||||
|
||||
However, the `crewai run` command is now the preferred method as it works for both crews and flows.
|
||||
|
||||
@@ -506,7 +506,7 @@ my_crew = Crew(
|
||||
)
|
||||
```
|
||||
|
||||
### Resetting Memory
|
||||
### Resetting Memory via cli
|
||||
|
||||
```shell
|
||||
crewai reset-memories [OPTIONS]
|
||||
@@ -520,8 +520,46 @@ crewai reset-memories [OPTIONS]
|
||||
| `-s`, `--short` | Reset SHORT TERM memory. | Flag (boolean) | False |
|
||||
| `-e`, `--entities` | Reset ENTITIES memory. | Flag (boolean) | False |
|
||||
| `-k`, `--kickoff-outputs` | Reset LATEST KICKOFF TASK OUTPUTS. | Flag (boolean) | False |
|
||||
| `-kn`, `--knowledge` | Reset KNOWLEDEGE storage | Flag (boolean) | False |
|
||||
| `-a`, `--all` | Reset ALL memories. | Flag (boolean) | False |
|
||||
|
||||
Note: To use the cli command you need to have your crew in a file called crew.py in the same directory.
|
||||
|
||||
|
||||
|
||||
|
||||
### Resetting Memory via crew object
|
||||
|
||||
```python
|
||||
|
||||
my_crew = Crew(
|
||||
agents=[...],
|
||||
tasks=[...],
|
||||
process=Process.sequential,
|
||||
memory=True,
|
||||
verbose=True,
|
||||
embedder={
|
||||
"provider": "custom",
|
||||
"config": {
|
||||
"embedder": CustomEmbedder()
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
my_crew.reset_memories(command_type = 'all') # Resets all the memory
|
||||
```
|
||||
|
||||
#### Resetting Memory Options
|
||||
|
||||
| Command Type | Description |
|
||||
| :----------------- | :------------------------------- |
|
||||
| `long` | Reset LONG TERM memory. |
|
||||
| `short` | Reset SHORT TERM memory. |
|
||||
| `entities` | Reset ENTITIES memory. |
|
||||
| `kickoff_outputs` | Reset LATEST KICKOFF TASK OUTPUTS. |
|
||||
| `knowledge` | Reset KNOWLEDGE memory. |
|
||||
| `all` | Reset ALL memories. |
|
||||
|
||||
|
||||
## Benefits of Using CrewAI's Memory System
|
||||
|
||||
|
||||
@@ -48,7 +48,6 @@ Define a crew with a designated manager and establish a clear chain of command.
|
||||
</Tip>
|
||||
|
||||
```python Code
|
||||
from langchain_openai import ChatOpenAI
|
||||
from crewai import Crew, Process, Agent
|
||||
|
||||
# Agents are defined with attributes for backstory, cache, and verbose mode
|
||||
@@ -56,38 +55,51 @@ researcher = Agent(
|
||||
role='Researcher',
|
||||
goal='Conduct in-depth analysis',
|
||||
backstory='Experienced data analyst with a knack for uncovering hidden trends.',
|
||||
cache=True,
|
||||
verbose=False,
|
||||
# tools=[] # This can be optionally specified; defaults to an empty list
|
||||
use_system_prompt=True, # Enable or disable system prompts for this agent
|
||||
max_rpm=30, # Limit on the number of requests per minute
|
||||
max_iter=5 # Maximum number of iterations for a final answer
|
||||
)
|
||||
writer = Agent(
|
||||
role='Writer',
|
||||
goal='Create engaging content',
|
||||
backstory='Creative writer passionate about storytelling in technical domains.',
|
||||
cache=True,
|
||||
verbose=False,
|
||||
# tools=[] # Optionally specify tools; defaults to an empty list
|
||||
use_system_prompt=True, # Enable or disable system prompts for this agent
|
||||
max_rpm=30, # Limit on the number of requests per minute
|
||||
max_iter=5 # Maximum number of iterations for a final answer
|
||||
)
|
||||
|
||||
# Establishing the crew with a hierarchical process and additional configurations
|
||||
project_crew = Crew(
|
||||
tasks=[...], # Tasks to be delegated and executed under the manager's supervision
|
||||
agents=[researcher, writer],
|
||||
manager_llm=ChatOpenAI(temperature=0, model="gpt-4"), # Mandatory if manager_agent is not set
|
||||
process=Process.hierarchical, # Specifies the hierarchical management approach
|
||||
respect_context_window=True, # Enable respect of the context window for tasks
|
||||
memory=True, # Enable memory usage for enhanced task execution
|
||||
manager_agent=None, # Optional: explicitly set a specific agent as manager instead of the manager_llm
|
||||
planning=True, # Enable planning feature for pre-execution strategy
|
||||
manager_llm="gpt-4o", # Specify which LLM the manager should use
|
||||
process=Process.hierarchical,
|
||||
planning=True,
|
||||
)
|
||||
```
|
||||
|
||||
### Using a Custom Manager Agent
|
||||
|
||||
Alternatively, you can create a custom manager agent with specific attributes tailored to your project's management needs. This gives you more control over the manager's behavior and capabilities.
|
||||
|
||||
```python
|
||||
# Define a custom manager agent
|
||||
manager = Agent(
|
||||
role="Project Manager",
|
||||
goal="Efficiently manage the crew and ensure high-quality task completion",
|
||||
backstory="You're an experienced project manager, skilled in overseeing complex projects and guiding teams to success.",
|
||||
allow_delegation=True,
|
||||
)
|
||||
|
||||
# Use the custom manager in your crew
|
||||
project_crew = Crew(
|
||||
tasks=[...],
|
||||
agents=[researcher, writer],
|
||||
manager_agent=manager, # Use your custom manager agent
|
||||
process=Process.hierarchical,
|
||||
planning=True,
|
||||
)
|
||||
```
|
||||
|
||||
<Tip>
|
||||
For more details on creating and customizing a manager agent, check out the [Custom Manager Agent documentation](https://docs.crewai.com/how-to/custom-manager-agent#custom-manager-agent).
|
||||
</Tip>
|
||||
|
||||
|
||||
### Workflow in Action
|
||||
|
||||
1. **Task Assignment**: The manager assigns tasks strategically, considering each agent's capabilities and available tools.
|
||||
@@ -97,4 +109,4 @@ project_crew = Crew(
|
||||
## Conclusion
|
||||
|
||||
Adopting the hierarchical process in CrewAI, with the correct configurations and understanding of the system's capabilities, facilitates an organized and efficient approach to project management.
|
||||
Utilize the advanced features and customizations to tailor the workflow to your specific needs, ensuring optimal task execution and project success.
|
||||
Utilize the advanced features and customizations to tailor the workflow to your specific needs, ensuring optimal task execution and project success.
|
||||
|
||||
@@ -54,7 +54,8 @@ coding_agent = Agent(
|
||||
# Create a task that requires code execution
|
||||
data_analysis_task = Task(
|
||||
description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
|
||||
agent=coding_agent
|
||||
agent=coding_agent,
|
||||
expected_output="The average age of the participants."
|
||||
)
|
||||
|
||||
# Create a crew and add the task
|
||||
@@ -116,4 +117,4 @@ async def async_multiple_crews():
|
||||
|
||||
# Run the async function
|
||||
asyncio.run(async_multiple_crews())
|
||||
```
|
||||
```
|
||||
|
||||
@@ -10,6 +10,8 @@ This notebook demonstrates how to integrate **Langfuse** with **CrewAI** using O
|
||||
|
||||
> **What is Langfuse?** [Langfuse](https://langfuse.com) is an open-source LLM engineering platform. It provides tracing and monitoring capabilities for LLM applications, helping developers debug, analyze, and optimize their AI systems. Langfuse integrates with various tools and frameworks via native integrations, OpenTelemetry, and APIs/SDKs.
|
||||
|
||||
[](https://langfuse.com/watch-demo)
|
||||
|
||||
## Get Started
|
||||
|
||||
We'll walk through a simple example of using CrewAI and integrating it with Langfuse via OpenTelemetry using OpenLit.
|
||||
|
||||
@@ -139,6 +139,7 @@
|
||||
"tools/nl2sqltool",
|
||||
"tools/pdfsearchtool",
|
||||
"tools/pgsearchtool",
|
||||
"tools/qdrantvectorsearchtool",
|
||||
"tools/scrapewebsitetool",
|
||||
"tools/seleniumscrapingtool",
|
||||
"tools/spidertool",
|
||||
|
||||
271
docs/tools/qdrantvectorsearchtool.mdx
Normal file
271
docs/tools/qdrantvectorsearchtool.mdx
Normal file
@@ -0,0 +1,271 @@
|
||||
---
|
||||
title: 'Qdrant Vector Search Tool'
|
||||
description: 'Semantic search capabilities for CrewAI agents using Qdrant vector database'
|
||||
icon: magnifying-glass-plus
|
||||
---
|
||||
|
||||
# `QdrantVectorSearchTool`
|
||||
|
||||
The Qdrant Vector Search Tool enables semantic search capabilities in your CrewAI agents by leveraging [Qdrant](https://qdrant.tech/), a vector similarity search engine. This tool allows your agents to search through documents stored in a Qdrant collection using semantic similarity.
|
||||
|
||||
## Installation
|
||||
|
||||
Install the required packages:
|
||||
|
||||
```bash
|
||||
uv pip install 'crewai[tools] qdrant-client'
|
||||
```
|
||||
|
||||
## Basic Usage
|
||||
|
||||
Here's a minimal example of how to use the tool:
|
||||
|
||||
```python
|
||||
from crewai import Agent
|
||||
from crewai_tools import QdrantVectorSearchTool
|
||||
|
||||
# Initialize the tool
|
||||
qdrant_tool = QdrantVectorSearchTool(
|
||||
qdrant_url="your_qdrant_url",
|
||||
qdrant_api_key="your_qdrant_api_key",
|
||||
collection_name="your_collection"
|
||||
)
|
||||
|
||||
# Create an agent that uses the tool
|
||||
agent = Agent(
|
||||
role="Research Assistant",
|
||||
goal="Find relevant information in documents",
|
||||
tools=[qdrant_tool]
|
||||
)
|
||||
|
||||
# The tool will automatically use OpenAI embeddings
|
||||
# and return the 3 most relevant results with scores > 0.35
|
||||
```
|
||||
|
||||
## Complete Working Example
|
||||
|
||||
Here's a complete example showing how to:
|
||||
1. Extract text from a PDF
|
||||
2. Generate embeddings using OpenAI
|
||||
3. Store in Qdrant
|
||||
4. Create a CrewAI agentic RAG workflow for semantic search
|
||||
|
||||
```python
|
||||
import os
|
||||
import uuid
|
||||
import pdfplumber
|
||||
from openai import OpenAI
|
||||
from dotenv import load_dotenv
|
||||
from crewai import Agent, Task, Crew, Process, LLM
|
||||
from crewai_tools import QdrantVectorSearchTool
|
||||
from qdrant_client import QdrantClient
|
||||
from qdrant_client.models import PointStruct, Distance, VectorParams
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
|
||||
# Initialize OpenAI client
|
||||
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
|
||||
|
||||
# Extract text from PDF
|
||||
def extract_text_from_pdf(pdf_path):
|
||||
text = []
|
||||
with pdfplumber.open(pdf_path) as pdf:
|
||||
for page in pdf.pages:
|
||||
page_text = page.extract_text()
|
||||
if page_text:
|
||||
text.append(page_text.strip())
|
||||
return text
|
||||
|
||||
# Generate OpenAI embeddings
|
||||
def get_openai_embedding(text):
|
||||
response = client.embeddings.create(
|
||||
input=text,
|
||||
model="text-embedding-3-small"
|
||||
)
|
||||
return response.data[0].embedding
|
||||
|
||||
# Store text and embeddings in Qdrant
|
||||
def load_pdf_to_qdrant(pdf_path, qdrant, collection_name):
|
||||
# Extract text from PDF
|
||||
text_chunks = extract_text_from_pdf(pdf_path)
|
||||
|
||||
# Create Qdrant collection
|
||||
if qdrant.collection_exists(collection_name):
|
||||
qdrant.delete_collection(collection_name)
|
||||
qdrant.create_collection(
|
||||
collection_name=collection_name,
|
||||
vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
|
||||
)
|
||||
|
||||
# Store embeddings
|
||||
points = []
|
||||
for chunk in text_chunks:
|
||||
embedding = get_openai_embedding(chunk)
|
||||
points.append(PointStruct(
|
||||
id=str(uuid.uuid4()),
|
||||
vector=embedding,
|
||||
payload={"text": chunk}
|
||||
))
|
||||
qdrant.upsert(collection_name=collection_name, points=points)
|
||||
|
||||
# Initialize Qdrant client and load data
|
||||
qdrant = QdrantClient(
|
||||
url=os.getenv("QDRANT_URL"),
|
||||
api_key=os.getenv("QDRANT_API_KEY")
|
||||
)
|
||||
collection_name = "example_collection"
|
||||
pdf_path = "path/to/your/document.pdf"
|
||||
load_pdf_to_qdrant(pdf_path, qdrant, collection_name)
|
||||
|
||||
# Initialize Qdrant search tool
|
||||
qdrant_tool = QdrantVectorSearchTool(
|
||||
qdrant_url=os.getenv("QDRANT_URL"),
|
||||
qdrant_api_key=os.getenv("QDRANT_API_KEY"),
|
||||
collection_name=collection_name,
|
||||
limit=3,
|
||||
score_threshold=0.35
|
||||
)
|
||||
|
||||
# Create CrewAI agents
|
||||
search_agent = Agent(
|
||||
role="Senior Semantic Search Agent",
|
||||
goal="Find and analyze documents based on semantic search",
|
||||
backstory="""You are an expert research assistant who can find relevant
|
||||
information using semantic search in a Qdrant database.""",
|
||||
tools=[qdrant_tool],
|
||||
verbose=True
|
||||
)
|
||||
|
||||
answer_agent = Agent(
|
||||
role="Senior Answer Assistant",
|
||||
goal="Generate answers to questions based on the context provided",
|
||||
backstory="""You are an expert answer assistant who can generate
|
||||
answers to questions based on the context provided.""",
|
||||
tools=[qdrant_tool],
|
||||
verbose=True
|
||||
)
|
||||
|
||||
# Define tasks
|
||||
search_task = Task(
|
||||
description="""Search for relevant documents about the {query}.
|
||||
Your final answer should include:
|
||||
- The relevant information found
|
||||
- The similarity scores of the results
|
||||
- The metadata of the relevant documents""",
|
||||
agent=search_agent
|
||||
)
|
||||
|
||||
answer_task = Task(
|
||||
description="""Given the context and metadata of relevant documents,
|
||||
generate a final answer based on the context.""",
|
||||
agent=answer_agent
|
||||
)
|
||||
|
||||
# Run CrewAI workflow
|
||||
crew = Crew(
|
||||
agents=[search_agent, answer_agent],
|
||||
tasks=[search_task, answer_task],
|
||||
process=Process.sequential,
|
||||
verbose=True
|
||||
)
|
||||
|
||||
result = crew.kickoff(
|
||||
inputs={"query": "What is the role of X in the document?"}
|
||||
)
|
||||
print(result)
|
||||
```
|
||||
|
||||
## Tool Parameters
|
||||
|
||||
### Required Parameters
|
||||
- `qdrant_url` (str): The URL of your Qdrant server
|
||||
- `qdrant_api_key` (str): API key for authentication with Qdrant
|
||||
- `collection_name` (str): Name of the Qdrant collection to search
|
||||
|
||||
### Optional Parameters
|
||||
- `limit` (int): Maximum number of results to return (default: 3)
|
||||
- `score_threshold` (float): Minimum similarity score threshold (default: 0.35)
|
||||
- `custom_embedding_fn` (Callable[[str], list[float]]): Custom function for text vectorization
|
||||
|
||||
## Search Parameters
|
||||
|
||||
The tool accepts these parameters in its schema:
|
||||
- `query` (str): The search query to find similar documents
|
||||
- `filter_by` (str, optional): Metadata field to filter on
|
||||
- `filter_value` (str, optional): Value to filter by
|
||||
|
||||
## Return Format
|
||||
|
||||
The tool returns results in JSON format:
|
||||
|
||||
```json
|
||||
[
|
||||
{
|
||||
"metadata": {
|
||||
// Any metadata stored with the document
|
||||
},
|
||||
"context": "The actual text content of the document",
|
||||
"distance": 0.95 // Similarity score
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## Default Embedding
|
||||
|
||||
By default, the tool uses OpenAI's `text-embedding-3-small` model for vectorization. This requires:
|
||||
- OpenAI API key set in environment: `OPENAI_API_KEY`
|
||||
|
||||
## Custom Embeddings
|
||||
|
||||
Instead of using the default embedding model, you might want to use your own embedding function in cases where you:
|
||||
|
||||
1. Want to use a different embedding model (e.g., Cohere, HuggingFace, Ollama models)
|
||||
2. Need to reduce costs by using open-source embedding models
|
||||
3. Have specific requirements for vector dimensions or embedding quality
|
||||
4. Want to use domain-specific embeddings (e.g., for medical or legal text)
|
||||
|
||||
Here's an example using a HuggingFace model:
|
||||
|
||||
```python
|
||||
from transformers import AutoTokenizer, AutoModel
|
||||
import torch
|
||||
|
||||
# Load model and tokenizer
|
||||
tokenizer = AutoTokenizer.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
|
||||
model = AutoModel.from_pretrained('sentence-transformers/all-MiniLM-L6-v2')
|
||||
|
||||
def custom_embeddings(text: str) -> list[float]:
|
||||
# Tokenize and get model outputs
|
||||
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
|
||||
outputs = model(**inputs)
|
||||
|
||||
# Use mean pooling to get text embedding
|
||||
embeddings = outputs.last_hidden_state.mean(dim=1)
|
||||
|
||||
# Convert to list of floats and return
|
||||
return embeddings[0].tolist()
|
||||
|
||||
# Use custom embeddings with the tool
|
||||
tool = QdrantVectorSearchTool(
|
||||
qdrant_url="your_url",
|
||||
qdrant_api_key="your_key",
|
||||
collection_name="your_collection",
|
||||
custom_embedding_fn=custom_embeddings # Pass your custom function
|
||||
)
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
The tool handles these specific errors:
|
||||
- Raises ImportError if `qdrant-client` is not installed (with option to auto-install)
|
||||
- Raises ValueError if `QDRANT_URL` is not set
|
||||
- Prompts to install `qdrant-client` if missing using `uv add qdrant-client`
|
||||
|
||||
## Environment Variables
|
||||
|
||||
Required environment variables:
|
||||
```bash
|
||||
export QDRANT_URL="your_qdrant_url" # If not provided in constructor
|
||||
export QDRANT_API_KEY="your_api_key" # If not provided in constructor
|
||||
export OPENAI_API_KEY="your_openai_key" # If using default embeddings
|
||||
@@ -114,7 +114,6 @@ class Agent(BaseAgent):
|
||||
|
||||
@model_validator(mode="after")
|
||||
def post_init_setup(self):
|
||||
self._set_knowledge()
|
||||
self.agent_ops_agent_name = self.role
|
||||
|
||||
self.llm = create_llm(self.llm)
|
||||
@@ -134,8 +133,11 @@ class Agent(BaseAgent):
|
||||
self.cache_handler = CacheHandler()
|
||||
self.set_cache_handler(self.cache_handler)
|
||||
|
||||
def _set_knowledge(self):
|
||||
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
|
||||
try:
|
||||
if self.embedder is None and crew_embedder:
|
||||
self.embedder = crew_embedder
|
||||
|
||||
if self.knowledge_sources:
|
||||
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
|
||||
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"
|
||||
|
||||
@@ -351,3 +351,6 @@ class BaseAgent(ABC, BaseModel):
|
||||
if not self._rpm_controller:
|
||||
self._rpm_controller = rpm_controller
|
||||
self.create_agent_executor()
|
||||
|
||||
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
|
||||
pass
|
||||
|
||||
@@ -31,11 +31,11 @@ class OutputConverter(BaseModel, ABC):
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
def to_pydantic(self, current_attempt=1):
|
||||
def to_pydantic(self, current_attempt=1) -> BaseModel:
|
||||
"""Convert text to pydantic."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def to_json(self, current_attempt=1):
|
||||
def to_json(self, current_attempt=1) -> dict:
|
||||
"""Convert text to json."""
|
||||
pass
|
||||
|
||||
@@ -124,14 +124,15 @@ class CrewAgentParser:
|
||||
)
|
||||
|
||||
def _extract_thought(self, text: str) -> str:
|
||||
regex = r"(.*?)(?:\n\nAction|\n\nFinal Answer)"
|
||||
thought_match = re.search(regex, text, re.DOTALL)
|
||||
if thought_match:
|
||||
thought = thought_match.group(1).strip()
|
||||
# Remove any triple backticks from the thought string
|
||||
thought = thought.replace("```", "").strip()
|
||||
return thought
|
||||
return ""
|
||||
thought_index = text.find("\n\nAction")
|
||||
if thought_index == -1:
|
||||
thought_index = text.find("\n\nFinal Answer")
|
||||
if thought_index == -1:
|
||||
return ""
|
||||
thought = text[:thought_index].strip()
|
||||
# Remove any triple backticks from the thought string
|
||||
thought = thought.replace("```", "").strip()
|
||||
return thought
|
||||
|
||||
def _clean_action(self, text: str) -> str:
|
||||
"""Clean action string by removing non-essential formatting characters."""
|
||||
|
||||
@@ -203,7 +203,6 @@ def install(context):
|
||||
@crewai.command()
|
||||
def run():
|
||||
"""Run the Crew."""
|
||||
click.echo("Running the Crew")
|
||||
run_crew()
|
||||
|
||||
|
||||
|
||||
@@ -216,10 +216,43 @@ MODELS = {
|
||||
"watsonx/ibm/granite-3-8b-instruct",
|
||||
],
|
||||
"bedrock": [
|
||||
"bedrock/us.amazon.nova-pro-v1:0",
|
||||
"bedrock/us.amazon.nova-micro-v1:0",
|
||||
"bedrock/us.amazon.nova-lite-v1:0",
|
||||
"bedrock/us.anthropic.claude-3-5-sonnet-20240620-v1:0",
|
||||
"bedrock/us.anthropic.claude-3-5-haiku-20241022-v1:0",
|
||||
"bedrock/us.anthropic.claude-3-5-sonnet-20241022-v2:0",
|
||||
"bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
|
||||
"bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0",
|
||||
"bedrock/us.anthropic.claude-3-opus-20240229-v1:0",
|
||||
"bedrock/us.anthropic.claude-3-haiku-20240307-v1:0",
|
||||
"bedrock/us.meta.llama3-2-11b-instruct-v1:0",
|
||||
"bedrock/us.meta.llama3-2-3b-instruct-v1:0",
|
||||
"bedrock/us.meta.llama3-2-90b-instruct-v1:0",
|
||||
"bedrock/us.meta.llama3-2-1b-instruct-v1:0",
|
||||
"bedrock/us.meta.llama3-1-8b-instruct-v1:0",
|
||||
"bedrock/us.meta.llama3-1-70b-instruct-v1:0",
|
||||
"bedrock/us.meta.llama3-3-70b-instruct-v1:0",
|
||||
"bedrock/us.meta.llama3-1-405b-instruct-v1:0",
|
||||
"bedrock/eu.anthropic.claude-3-5-sonnet-20240620-v1:0",
|
||||
"bedrock/eu.anthropic.claude-3-sonnet-20240229-v1:0",
|
||||
"bedrock/eu.anthropic.claude-3-haiku-20240307-v1:0",
|
||||
"bedrock/eu.meta.llama3-2-3b-instruct-v1:0",
|
||||
"bedrock/eu.meta.llama3-2-1b-instruct-v1:0",
|
||||
"bedrock/apac.anthropic.claude-3-5-sonnet-20240620-v1:0",
|
||||
"bedrock/apac.anthropic.claude-3-5-sonnet-20241022-v2:0",
|
||||
"bedrock/apac.anthropic.claude-3-sonnet-20240229-v1:0",
|
||||
"bedrock/apac.anthropic.claude-3-haiku-20240307-v1:0",
|
||||
"bedrock/amazon.nova-pro-v1:0",
|
||||
"bedrock/amazon.nova-micro-v1:0",
|
||||
"bedrock/amazon.nova-lite-v1:0",
|
||||
"bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0",
|
||||
"bedrock/anthropic.claude-3-5-haiku-20241022-v1:0",
|
||||
"bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0",
|
||||
"bedrock/anthropic.claude-3-7-sonnet-20250219-v1:0",
|
||||
"bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
|
||||
"bedrock/anthropic.claude-3-haiku-20240307-v1:0",
|
||||
"bedrock/anthropic.claude-3-opus-20240229-v1:0",
|
||||
"bedrock/anthropic.claude-3-haiku-20240307-v1:0",
|
||||
"bedrock/anthropic.claude-v2:1",
|
||||
"bedrock/anthropic.claude-v2",
|
||||
"bedrock/anthropic.claude-instant-v1",
|
||||
@@ -234,8 +267,6 @@ MODELS = {
|
||||
"bedrock/ai21.j2-mid-v1",
|
||||
"bedrock/ai21.j2-ultra-v1",
|
||||
"bedrock/ai21.jamba-instruct-v1:0",
|
||||
"bedrock/meta.llama2-13b-chat-v1",
|
||||
"bedrock/meta.llama2-70b-chat-v1",
|
||||
"bedrock/mistral.mistral-7b-instruct-v0:2",
|
||||
"bedrock/mistral.mixtral-8x7b-instruct-v0:1",
|
||||
],
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import subprocess
|
||||
from enum import Enum
|
||||
from typing import List, Optional
|
||||
|
||||
import click
|
||||
from packaging import version
|
||||
@@ -7,16 +9,24 @@ from crewai.cli.utils import read_toml
|
||||
from crewai.cli.version import get_crewai_version
|
||||
|
||||
|
||||
class CrewType(Enum):
|
||||
STANDARD = "standard"
|
||||
FLOW = "flow"
|
||||
|
||||
|
||||
def run_crew() -> None:
|
||||
"""
|
||||
Run the crew by running a command in the UV environment.
|
||||
Run the crew or flow by running a command in the UV environment.
|
||||
|
||||
Starting from version 0.103.0, this command can be used to run both
|
||||
standard crews and flows. For flows, it detects the type from pyproject.toml
|
||||
and automatically runs the appropriate command.
|
||||
"""
|
||||
command = ["uv", "run", "run_crew"]
|
||||
crewai_version = get_crewai_version()
|
||||
min_required_version = "0.71.0"
|
||||
|
||||
pyproject_data = read_toml()
|
||||
|
||||
# Check for legacy poetry configuration
|
||||
if pyproject_data.get("tool", {}).get("poetry") and (
|
||||
version.parse(crewai_version) < version.parse(min_required_version)
|
||||
):
|
||||
@@ -26,18 +36,54 @@ def run_crew() -> None:
|
||||
fg="red",
|
||||
)
|
||||
|
||||
# Determine crew type
|
||||
is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow"
|
||||
crew_type = CrewType.FLOW if is_flow else CrewType.STANDARD
|
||||
|
||||
# Display appropriate message
|
||||
click.echo(f"Running the {'Flow' if is_flow else 'Crew'}")
|
||||
|
||||
# Execute the appropriate command
|
||||
execute_command(crew_type)
|
||||
|
||||
|
||||
def execute_command(crew_type: CrewType) -> None:
|
||||
"""
|
||||
Execute the appropriate command based on crew type.
|
||||
|
||||
Args:
|
||||
crew_type: The type of crew to run
|
||||
"""
|
||||
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]
|
||||
|
||||
try:
|
||||
subprocess.run(command, capture_output=False, text=True, check=True)
|
||||
|
||||
except subprocess.CalledProcessError as e:
|
||||
click.echo(f"An error occurred while running the crew: {e}", err=True)
|
||||
click.echo(e.output, err=True, nl=True)
|
||||
|
||||
if pyproject_data.get("tool", {}).get("poetry"):
|
||||
click.secho(
|
||||
"It's possible that you are using an old version of crewAI that uses poetry, please run `crewai update` to update your pyproject.toml to use uv.",
|
||||
fg="yellow",
|
||||
)
|
||||
handle_error(e, crew_type)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"An unexpected error occurred: {e}", err=True)
|
||||
|
||||
|
||||
def handle_error(error: subprocess.CalledProcessError, crew_type: CrewType) -> None:
|
||||
"""
|
||||
Handle subprocess errors with appropriate messaging.
|
||||
|
||||
Args:
|
||||
error: The subprocess error that occurred
|
||||
crew_type: The type of crew that was being run
|
||||
"""
|
||||
entity_type = "flow" if crew_type == CrewType.FLOW else "crew"
|
||||
click.echo(f"An error occurred while running the {entity_type}: {error}", err=True)
|
||||
|
||||
if error.output:
|
||||
click.echo(error.output, err=True, nl=True)
|
||||
|
||||
pyproject_data = read_toml()
|
||||
if pyproject_data.get("tool", {}).get("poetry"):
|
||||
click.secho(
|
||||
"It's possible that you are using an old version of crewAI that uses poetry, "
|
||||
"please run `crewai update` to update your pyproject.toml to use uv.",
|
||||
fg="yellow",
|
||||
)
|
||||
|
||||
@@ -30,13 +30,13 @@ crewai install
|
||||
|
||||
## Running the Project
|
||||
|
||||
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
|
||||
To kickstart your flow and begin execution, run this from the root folder of your project:
|
||||
|
||||
```bash
|
||||
crewai run
|
||||
```
|
||||
|
||||
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration.
|
||||
This command initializes the {{name}} Flow as defined in your configuration.
|
||||
|
||||
This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder.
|
||||
|
||||
|
||||
@@ -257,11 +257,11 @@ def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:
|
||||
import os
|
||||
|
||||
for root, _, files in os.walk("."):
|
||||
if "crew.py" in files:
|
||||
crew_path = os.path.join(root, "crew.py")
|
||||
if crew_path in files:
|
||||
crew_os_path = os.path.join(root, crew_path)
|
||||
try:
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"crew_module", crew_path
|
||||
"crew_module", crew_os_path
|
||||
)
|
||||
if not spec or not spec.loader:
|
||||
continue
|
||||
@@ -273,9 +273,11 @@ def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:
|
||||
for attr_name in dir(module):
|
||||
attr = getattr(module, attr_name)
|
||||
try:
|
||||
if callable(attr) and hasattr(attr, "crew"):
|
||||
crew_instance = attr().crew()
|
||||
return crew_instance
|
||||
if isinstance(attr, Crew) and hasattr(attr, "kickoff"):
|
||||
print(
|
||||
f"Found valid crew object in attribute '{attr_name}' at {crew_os_path}."
|
||||
)
|
||||
return attr
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error processing attribute {attr_name}: {e}")
|
||||
|
||||
@@ -35,10 +35,8 @@ from crewai.process import Process
|
||||
from crewai.task import Task
|
||||
from crewai.tasks.conditional_task import ConditionalTask
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.telemetry import Telemetry
|
||||
from crewai.tools.agent_tools.agent_tools import AgentTools
|
||||
from crewai.tools.base_tool import Tool
|
||||
from crewai.traces.unified_trace_controller import init_crew_main_trace
|
||||
from crewai.types.usage_metrics import UsageMetrics
|
||||
from crewai.utilities import I18N, FileHandler, Logger, RPMController
|
||||
from crewai.utilities.constants import TRAINING_DATA_FILE
|
||||
@@ -258,8 +256,6 @@ class Crew(BaseModel):
|
||||
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
|
||||
self.function_calling_llm = create_llm(self.function_calling_llm)
|
||||
|
||||
self._telemetry = Telemetry()
|
||||
self._telemetry.set_tracer()
|
||||
return self
|
||||
|
||||
@model_validator(mode="after")
|
||||
@@ -574,7 +570,6 @@ class Crew(BaseModel):
|
||||
CrewTrainingHandler(filename).clear()
|
||||
raise
|
||||
|
||||
@init_crew_main_trace
|
||||
def kickoff(
|
||||
self,
|
||||
inputs: Optional[Dict[str, Any]] = None,
|
||||
@@ -605,6 +600,7 @@ class Crew(BaseModel):
|
||||
agent.i18n = i18n
|
||||
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
|
||||
agent.crew = self # type: ignore[attr-defined]
|
||||
agent.set_knowledge(crew_embedder=self.embedder)
|
||||
# TODO: Create an AgentFunctionCalling protocol for future refactoring
|
||||
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
|
||||
agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
|
||||
@@ -1115,7 +1111,6 @@ class Crew(BaseModel):
|
||||
"_short_term_memory",
|
||||
"_long_term_memory",
|
||||
"_entity_memory",
|
||||
"_telemetry",
|
||||
"agents",
|
||||
"tasks",
|
||||
"knowledge_sources",
|
||||
@@ -1278,11 +1273,11 @@ class Crew(BaseModel):
|
||||
def _reset_all_memories(self) -> None:
|
||||
"""Reset all available memory systems."""
|
||||
memory_systems = [
|
||||
("short term", self._short_term_memory),
|
||||
("entity", self._entity_memory),
|
||||
("long term", self._long_term_memory),
|
||||
("task output", self._task_output_handler),
|
||||
("knowledge", self.knowledge),
|
||||
("short term", getattr(self, "_short_term_memory", None)),
|
||||
("entity", getattr(self, "_entity_memory", None)),
|
||||
("long term", getattr(self, "_long_term_memory", None)),
|
||||
("task output", getattr(self, "_task_output_handler", None)),
|
||||
("knowledge", getattr(self, "knowledge", None)),
|
||||
]
|
||||
|
||||
for name, system in memory_systems:
|
||||
|
||||
@@ -22,10 +22,6 @@ from pydantic import BaseModel, Field, ValidationError
|
||||
from crewai.flow.flow_visualizer import plot_flow
|
||||
from crewai.flow.persistence.base import FlowPersistence
|
||||
from crewai.flow.utils import get_possible_return_constants
|
||||
from crewai.traces.unified_trace_controller import (
|
||||
init_flow_main_trace,
|
||||
trace_flow_step,
|
||||
)
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.flow_events import (
|
||||
FlowCreatedEvent,
|
||||
@@ -713,16 +709,34 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
raise TypeError(f"State must be dict or BaseModel, got {type(self._state)}")
|
||||
|
||||
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
|
||||
"""Start the flow execution.
|
||||
"""
|
||||
Start the flow execution in a synchronous context.
|
||||
|
||||
This method wraps kickoff_async so that all state initialization and event
|
||||
emission is handled in the asynchronous method.
|
||||
"""
|
||||
|
||||
async def run_flow():
|
||||
return await self.kickoff_async(inputs)
|
||||
|
||||
return asyncio.run(run_flow())
|
||||
|
||||
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
|
||||
"""
|
||||
Start the flow execution asynchronously.
|
||||
|
||||
This method performs state restoration (if an 'id' is provided and persistence is available)
|
||||
and updates the flow state with any additional inputs. It then emits the FlowStartedEvent,
|
||||
logs the flow startup, and executes all start methods. Once completed, it emits the
|
||||
FlowFinishedEvent and returns the final output.
|
||||
|
||||
Args:
|
||||
inputs: Optional dictionary containing input values and potentially a state ID to restore
|
||||
"""
|
||||
# Handle state restoration if ID is provided in inputs
|
||||
if inputs and "id" in inputs and self._persistence is not None:
|
||||
restore_uuid = inputs["id"]
|
||||
stored_state = self._persistence.load_state(restore_uuid)
|
||||
inputs: Optional dictionary containing input values and/or a state ID for restoration.
|
||||
|
||||
Returns:
|
||||
The final output from the flow, which is the result of the last executed method.
|
||||
"""
|
||||
if inputs:
|
||||
# Override the id in the state if it exists in inputs
|
||||
if "id" in inputs:
|
||||
if isinstance(self._state, dict):
|
||||
@@ -730,24 +744,27 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
elif isinstance(self._state, BaseModel):
|
||||
setattr(self._state, "id", inputs["id"])
|
||||
|
||||
if stored_state:
|
||||
self._log_flow_event(
|
||||
f"Loading flow state from memory for UUID: {restore_uuid}",
|
||||
color="yellow",
|
||||
)
|
||||
# Restore the state
|
||||
self._restore_state(stored_state)
|
||||
else:
|
||||
self._log_flow_event(
|
||||
f"No flow state found for UUID: {restore_uuid}", color="red"
|
||||
)
|
||||
# If persistence is enabled, attempt to restore the stored state using the provided id.
|
||||
if "id" in inputs and self._persistence is not None:
|
||||
restore_uuid = inputs["id"]
|
||||
stored_state = self._persistence.load_state(restore_uuid)
|
||||
if stored_state:
|
||||
self._log_flow_event(
|
||||
f"Loading flow state from memory for UUID: {restore_uuid}",
|
||||
color="yellow",
|
||||
)
|
||||
self._restore_state(stored_state)
|
||||
else:
|
||||
self._log_flow_event(
|
||||
f"No flow state found for UUID: {restore_uuid}", color="red"
|
||||
)
|
||||
|
||||
# Apply any additional inputs after restoration
|
||||
# Update state with any additional inputs (ignoring the 'id' key)
|
||||
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
|
||||
if filtered_inputs:
|
||||
self._initialize_state(filtered_inputs)
|
||||
|
||||
# Start flow execution
|
||||
# Emit FlowStartedEvent and log the start of the flow.
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
FlowStartedEvent(
|
||||
@@ -763,16 +780,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
if inputs is not None and "id" not in inputs:
|
||||
self._initialize_state(inputs)
|
||||
|
||||
async def run_flow():
|
||||
return await self.kickoff_async()
|
||||
|
||||
return asyncio.run(run_flow())
|
||||
|
||||
@init_flow_main_trace
|
||||
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
|
||||
if not self._start_methods:
|
||||
raise ValueError("No start method defined")
|
||||
|
||||
tasks = [
|
||||
self._execute_start_method(start_method)
|
||||
for start_method in self._start_methods
|
||||
@@ -789,6 +796,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
result=final_output,
|
||||
),
|
||||
)
|
||||
|
||||
return final_output
|
||||
|
||||
async def _execute_start_method(self, start_method_name: str) -> None:
|
||||
@@ -814,7 +822,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
)
|
||||
await self._execute_listeners(start_method_name, result)
|
||||
|
||||
@trace_flow_step
|
||||
async def _execute_method(
|
||||
self, method_name: str, method: Callable, *args: Any, **kwargs: Any
|
||||
) -> Any:
|
||||
@@ -887,35 +894,45 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
Notes
|
||||
-----
|
||||
- Routers are executed sequentially to maintain flow control
|
||||
- Each router's result becomes the new trigger_method
|
||||
- Each router's result becomes a new trigger_method
|
||||
- Normal listeners are executed in parallel for efficiency
|
||||
- Listeners can receive the trigger method's result as a parameter
|
||||
"""
|
||||
# First, handle routers repeatedly until no router triggers anymore
|
||||
router_results = []
|
||||
current_trigger = trigger_method
|
||||
|
||||
while True:
|
||||
routers_triggered = self._find_triggered_methods(
|
||||
trigger_method, router_only=True
|
||||
current_trigger, router_only=True
|
||||
)
|
||||
if not routers_triggered:
|
||||
break
|
||||
|
||||
for router_name in routers_triggered:
|
||||
await self._execute_single_listener(router_name, result)
|
||||
# After executing router, the router's result is the path
|
||||
# The last router executed sets the trigger_method
|
||||
# The router result is the last element in self._method_outputs
|
||||
trigger_method = self._method_outputs[-1]
|
||||
router_result = self._method_outputs[-1]
|
||||
if router_result: # Only add non-None results
|
||||
router_results.append(router_result)
|
||||
current_trigger = (
|
||||
router_result # Update for next iteration of router chain
|
||||
)
|
||||
|
||||
# Now that no more routers are triggered by current trigger_method,
|
||||
# execute normal listeners
|
||||
listeners_triggered = self._find_triggered_methods(
|
||||
trigger_method, router_only=False
|
||||
)
|
||||
if listeners_triggered:
|
||||
tasks = [
|
||||
self._execute_single_listener(listener_name, result)
|
||||
for listener_name in listeners_triggered
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
# Now execute normal listeners for all router results and the original trigger
|
||||
all_triggers = [trigger_method] + router_results
|
||||
|
||||
for current_trigger in all_triggers:
|
||||
if current_trigger: # Skip None results
|
||||
listeners_triggered = self._find_triggered_methods(
|
||||
current_trigger, router_only=False
|
||||
)
|
||||
if listeners_triggered:
|
||||
tasks = [
|
||||
self._execute_single_listener(listener_name, result)
|
||||
for listener_name in listeners_triggered
|
||||
]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
def _find_triggered_methods(
|
||||
self, trigger_method: str, router_only: bool
|
||||
|
||||
@@ -4,7 +4,7 @@ SQLite-based implementation of flow state persistence.
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional, Union
|
||||
|
||||
@@ -34,6 +34,7 @@ class SQLiteFlowPersistence(FlowPersistence):
|
||||
ValueError: If db_path is invalid
|
||||
"""
|
||||
from crewai.utilities.paths import db_storage_path
|
||||
|
||||
# Get path from argument or default location
|
||||
path = db_path or str(Path(db_storage_path()) / "flow_states.db")
|
||||
|
||||
@@ -46,7 +47,8 @@ class SQLiteFlowPersistence(FlowPersistence):
|
||||
def init_db(self) -> None:
|
||||
"""Create the necessary tables if they don't exist."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute("""
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS flow_states (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
flow_uuid TEXT NOT NULL,
|
||||
@@ -54,12 +56,15 @@ class SQLiteFlowPersistence(FlowPersistence):
|
||||
timestamp DATETIME NOT NULL,
|
||||
state_json TEXT NOT NULL
|
||||
)
|
||||
""")
|
||||
"""
|
||||
)
|
||||
# Add index for faster UUID lookups
|
||||
conn.execute("""
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid
|
||||
ON flow_states(flow_uuid)
|
||||
""")
|
||||
"""
|
||||
)
|
||||
|
||||
def save_state(
|
||||
self,
|
||||
@@ -85,19 +90,22 @@ class SQLiteFlowPersistence(FlowPersistence):
|
||||
)
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute("""
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO flow_states (
|
||||
flow_uuid,
|
||||
method_name,
|
||||
timestamp,
|
||||
state_json
|
||||
) VALUES (?, ?, ?, ?)
|
||||
""", (
|
||||
flow_uuid,
|
||||
method_name,
|
||||
datetime.utcnow().isoformat(),
|
||||
json.dumps(state_dict),
|
||||
))
|
||||
""",
|
||||
(
|
||||
flow_uuid,
|
||||
method_name,
|
||||
datetime.now(timezone.utc).isoformat(),
|
||||
json.dumps(state_dict),
|
||||
),
|
||||
)
|
||||
|
||||
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
|
||||
"""Load the most recent state for a given flow UUID.
|
||||
@@ -109,13 +117,16 @@ class SQLiteFlowPersistence(FlowPersistence):
|
||||
The most recent state as a dictionary, or None if no state exists
|
||||
"""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.execute("""
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT state_json
|
||||
FROM flow_states
|
||||
WHERE flow_uuid = ?
|
||||
ORDER BY id DESC
|
||||
LIMIT 1
|
||||
""", (flow_uuid,))
|
||||
""",
|
||||
(flow_uuid,),
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row:
|
||||
|
||||
@@ -16,7 +16,8 @@ Example
|
||||
import ast
|
||||
import inspect
|
||||
import textwrap
|
||||
from typing import Any, Dict, List, Optional, Set, Union
|
||||
from collections import defaultdict, deque
|
||||
from typing import Any, Deque, Dict, List, Optional, Set, Union
|
||||
|
||||
|
||||
def get_possible_return_constants(function: Any) -> Optional[List[str]]:
|
||||
@@ -118,7 +119,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
|
||||
- Processes router paths separately
|
||||
"""
|
||||
levels: Dict[str, int] = {}
|
||||
queue: List[str] = []
|
||||
queue: Deque[str] = deque()
|
||||
visited: Set[str] = set()
|
||||
pending_and_listeners: Dict[str, Set[str]] = {}
|
||||
|
||||
@@ -128,28 +129,35 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
|
||||
levels[method_name] = 0
|
||||
queue.append(method_name)
|
||||
|
||||
# Precompute listener dependencies
|
||||
or_listeners = defaultdict(list)
|
||||
and_listeners = defaultdict(set)
|
||||
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
|
||||
if condition_type == "OR":
|
||||
for method in trigger_methods:
|
||||
or_listeners[method].append(listener_name)
|
||||
elif condition_type == "AND":
|
||||
and_listeners[listener_name] = set(trigger_methods)
|
||||
|
||||
# Breadth-first traversal to assign levels
|
||||
while queue:
|
||||
current = queue.pop(0)
|
||||
current = queue.popleft()
|
||||
current_level = levels[current]
|
||||
visited.add(current)
|
||||
|
||||
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
|
||||
if condition_type == "OR":
|
||||
if current in trigger_methods:
|
||||
if (
|
||||
listener_name not in levels
|
||||
or levels[listener_name] > current_level + 1
|
||||
):
|
||||
levels[listener_name] = current_level + 1
|
||||
if listener_name not in visited:
|
||||
queue.append(listener_name)
|
||||
elif condition_type == "AND":
|
||||
for listener_name in or_listeners[current]:
|
||||
if listener_name not in levels or levels[listener_name] > current_level + 1:
|
||||
levels[listener_name] = current_level + 1
|
||||
if listener_name not in visited:
|
||||
queue.append(listener_name)
|
||||
|
||||
for listener_name, required_methods in and_listeners.items():
|
||||
if current in required_methods:
|
||||
if listener_name not in pending_and_listeners:
|
||||
pending_and_listeners[listener_name] = set()
|
||||
if current in trigger_methods:
|
||||
pending_and_listeners[listener_name].add(current)
|
||||
if set(trigger_methods) == pending_and_listeners[listener_name]:
|
||||
pending_and_listeners[listener_name].add(current)
|
||||
|
||||
if required_methods == pending_and_listeners[listener_name]:
|
||||
if (
|
||||
listener_name not in levels
|
||||
or levels[listener_name] > current_level + 1
|
||||
@@ -159,22 +167,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
|
||||
queue.append(listener_name)
|
||||
|
||||
# Handle router connections
|
||||
if current in flow._routers:
|
||||
router_method_name = current
|
||||
paths = flow._router_paths.get(router_method_name, [])
|
||||
for path in paths:
|
||||
for listener_name, (
|
||||
condition_type,
|
||||
trigger_methods,
|
||||
) in flow._listeners.items():
|
||||
if path in trigger_methods:
|
||||
if (
|
||||
listener_name not in levels
|
||||
or levels[listener_name] > current_level + 1
|
||||
):
|
||||
levels[listener_name] = current_level + 1
|
||||
if listener_name not in visited:
|
||||
queue.append(listener_name)
|
||||
process_router_paths(flow, current, current_level, levels, queue)
|
||||
|
||||
return levels
|
||||
|
||||
@@ -227,10 +220,7 @@ def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
|
||||
|
||||
|
||||
def dfs_ancestors(
|
||||
node: str,
|
||||
ancestors: Dict[str, Set[str]],
|
||||
visited: Set[str],
|
||||
flow: Any
|
||||
node: str, ancestors: Dict[str, Set[str]], visited: Set[str], flow: Any
|
||||
) -> None:
|
||||
"""
|
||||
Perform depth-first search to build ancestor relationships.
|
||||
@@ -274,7 +264,9 @@ def dfs_ancestors(
|
||||
dfs_ancestors(listener_name, ancestors, visited, flow)
|
||||
|
||||
|
||||
def is_ancestor(node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]) -> bool:
|
||||
def is_ancestor(
|
||||
node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]
|
||||
) -> bool:
|
||||
"""
|
||||
Check if one node is an ancestor of another.
|
||||
|
||||
@@ -339,7 +331,9 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
|
||||
return parent_children
|
||||
|
||||
|
||||
def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str]]) -> int:
|
||||
def get_child_index(
|
||||
parent: str, child: str, parent_children: Dict[str, List[str]]
|
||||
) -> int:
|
||||
"""
|
||||
Get the index of a child node in its parent's sorted children list.
|
||||
|
||||
@@ -360,3 +354,23 @@ def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str
|
||||
children = parent_children.get(parent, [])
|
||||
children.sort()
|
||||
return children.index(child)
|
||||
|
||||
|
||||
def process_router_paths(flow, current, current_level, levels, queue):
|
||||
"""
|
||||
Handle the router connections for the current node.
|
||||
"""
|
||||
if current in flow._routers:
|
||||
paths = flow._router_paths.get(current, [])
|
||||
for path in paths:
|
||||
for listener_name, (
|
||||
condition_type,
|
||||
trigger_methods,
|
||||
) in flow._listeners.items():
|
||||
if path in trigger_methods:
|
||||
if (
|
||||
listener_name not in levels
|
||||
or levels[listener_name] > current_level + 1
|
||||
):
|
||||
levels[listener_name] = current_level + 1
|
||||
queue.append(listener_name)
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import inspect
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -6,37 +5,31 @@ import sys
|
||||
import threading
|
||||
import warnings
|
||||
from contextlib import contextmanager
|
||||
from typing import (
|
||||
Any,
|
||||
Dict,
|
||||
List,
|
||||
Literal,
|
||||
Optional,
|
||||
Tuple,
|
||||
Type,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
from typing import Any, Dict, List, Literal, Optional, Type, Union, cast
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.utilities.events.llm_events import (
|
||||
LLMCallCompletedEvent,
|
||||
LLMCallFailedEvent,
|
||||
LLMCallStartedEvent,
|
||||
LLMCallType,
|
||||
)
|
||||
from crewai.utilities.events.tool_usage_events import ToolExecutionErrorEvent
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", UserWarning)
|
||||
import litellm
|
||||
from litellm import Choices, get_supported_openai_params
|
||||
from litellm import Choices
|
||||
from litellm.types.utils import ModelResponse
|
||||
from litellm.utils import supports_response_schema
|
||||
from litellm.utils import get_supported_openai_params, supports_response_schema
|
||||
|
||||
|
||||
from crewai.traces.unified_trace_controller import trace_llm_call
|
||||
from crewai.utilities.events import crewai_event_bus
|
||||
from crewai.utilities.exceptions.context_window_exceeding_exception import (
|
||||
LLMContextLengthExceededException,
|
||||
)
|
||||
from crewai.utilities.protocols import AgentExecutorProtocol
|
||||
|
||||
load_dotenv()
|
||||
|
||||
@@ -71,6 +64,7 @@ LLM_CONTEXT_WINDOW_SIZES = {
|
||||
"gpt-4-turbo": 128000,
|
||||
"o1-preview": 128000,
|
||||
"o1-mini": 128000,
|
||||
"o3-mini": 200000, # Based on official o3-mini specifications
|
||||
# gemini
|
||||
"gemini-2.0-flash": 1048576,
|
||||
"gemini-1.5-pro": 2097152,
|
||||
@@ -180,7 +174,6 @@ class LLM:
|
||||
self.context_window_size = 0
|
||||
self.reasoning_effort = reasoning_effort
|
||||
self.additional_params = kwargs
|
||||
self._message_history: List[Dict[str, str]] = []
|
||||
self.is_anthropic = self._is_anthropic_model(model)
|
||||
|
||||
litellm.drop_params = True
|
||||
@@ -196,12 +189,6 @@ class LLM:
|
||||
self.set_callbacks(callbacks)
|
||||
self.set_env_callbacks()
|
||||
|
||||
@trace_llm_call
|
||||
def _call_llm(self, params: Dict[str, Any]) -> Any:
|
||||
with suppress_warnings():
|
||||
response = litellm.completion(**params)
|
||||
return response
|
||||
|
||||
def _is_anthropic_model(self, model: str) -> bool:
|
||||
"""Determine if the model is from Anthropic provider.
|
||||
|
||||
@@ -259,6 +246,15 @@ class LLM:
|
||||
>>> print(response)
|
||||
"The capital of France is Paris."
|
||||
"""
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=LLMCallStartedEvent(
|
||||
messages=messages,
|
||||
tools=tools,
|
||||
callbacks=callbacks,
|
||||
available_functions=available_functions,
|
||||
),
|
||||
)
|
||||
# Validate parameters before proceeding with the call.
|
||||
self._validate_call_params()
|
||||
|
||||
@@ -311,7 +307,7 @@ class LLM:
|
||||
params = {k: v for k, v in params.items() if v is not None}
|
||||
|
||||
# --- 2) Make the completion call
|
||||
response = self._call_llm(params)
|
||||
response = litellm.completion(**params)
|
||||
response_message = cast(Choices, cast(ModelResponse, response).choices)[
|
||||
0
|
||||
].message
|
||||
@@ -333,12 +329,13 @@ class LLM:
|
||||
|
||||
# --- 4) If no tool calls, return the text response
|
||||
if not tool_calls or not available_functions:
|
||||
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL)
|
||||
return text_response
|
||||
|
||||
# --- 5) Handle the tool call
|
||||
tool_call = tool_calls[0]
|
||||
function_name = tool_call.function.name
|
||||
print("function_name", function_name)
|
||||
|
||||
if function_name in available_functions:
|
||||
try:
|
||||
function_args = json.loads(tool_call.function.arguments)
|
||||
@@ -350,6 +347,7 @@ class LLM:
|
||||
try:
|
||||
# Call the actual tool function
|
||||
result = fn(**function_args)
|
||||
self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
@@ -365,6 +363,12 @@ class LLM:
|
||||
error=str(e),
|
||||
),
|
||||
)
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=LLMCallFailedEvent(
|
||||
error=f"Tool execution error: {str(e)}"
|
||||
),
|
||||
)
|
||||
return text_response
|
||||
|
||||
else:
|
||||
@@ -374,12 +378,28 @@ class LLM:
|
||||
return text_response
|
||||
|
||||
except Exception as e:
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=LLMCallFailedEvent(error=str(e)),
|
||||
)
|
||||
if not LLMContextLengthExceededException(
|
||||
str(e)
|
||||
)._is_context_limit_error(str(e)):
|
||||
logging.error(f"LiteLLM call failed: {str(e)}")
|
||||
raise
|
||||
|
||||
def _handle_emit_call_events(self, response: Any, call_type: LLMCallType):
|
||||
"""Handle the events for the LLM call.
|
||||
|
||||
Args:
|
||||
response (str): The response from the LLM call.
|
||||
call_type (str): The type of call, either "tool_call" or "llm_call".
|
||||
"""
|
||||
crewai_event_bus.emit(
|
||||
self,
|
||||
event=LLMCallCompletedEvent(response=response, call_type=call_type),
|
||||
)
|
||||
|
||||
def _format_messages_for_provider(
|
||||
self, messages: List[Dict[str, str]]
|
||||
) -> List[Dict[str, str]]:
|
||||
@@ -449,7 +469,7 @@ class LLM:
|
||||
def supports_function_calling(self) -> bool:
|
||||
try:
|
||||
params = get_supported_openai_params(model=self.model)
|
||||
return "response_format" in params
|
||||
return params is not None and "tools" in params
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to get supported params: {str(e)}")
|
||||
return False
|
||||
@@ -457,7 +477,7 @@ class LLM:
|
||||
def supports_stop_words(self) -> bool:
|
||||
try:
|
||||
params = get_supported_openai_params(model=self.model)
|
||||
return "stop" in params
|
||||
return params is not None and "stop" in params
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to get supported params: {str(e)}")
|
||||
return False
|
||||
@@ -466,10 +486,23 @@ class LLM:
|
||||
"""
|
||||
Returns the context window size, using 75% of the maximum to avoid
|
||||
cutting off messages mid-thread.
|
||||
|
||||
Raises:
|
||||
ValueError: If a model's context window size is outside valid bounds (1024-2097152)
|
||||
"""
|
||||
if self.context_window_size != 0:
|
||||
return self.context_window_size
|
||||
|
||||
MIN_CONTEXT = 1024
|
||||
MAX_CONTEXT = 2097152 # Current max from gemini-1.5-pro
|
||||
|
||||
# Validate all context window sizes
|
||||
for key, value in LLM_CONTEXT_WINDOW_SIZES.items():
|
||||
if value < MIN_CONTEXT or value > MAX_CONTEXT:
|
||||
raise ValueError(
|
||||
f"Context window for {key} must be between {MIN_CONTEXT} and {MAX_CONTEXT}"
|
||||
)
|
||||
|
||||
self.context_window_size = int(
|
||||
DEFAULT_CONTEXT_WINDOW_SIZE * CONTEXT_WINDOW_USAGE_RATIO
|
||||
)
|
||||
@@ -531,95 +564,3 @@ class LLM:
|
||||
|
||||
litellm.success_callback = success_callbacks
|
||||
litellm.failure_callback = failure_callbacks
|
||||
|
||||
def _get_execution_context(self) -> Tuple[Optional[Any], Optional[Any]]:
|
||||
"""Get the agent and task from the execution context.
|
||||
|
||||
Returns:
|
||||
tuple: (agent, task) from any AgentExecutor context, or (None, None) if not found
|
||||
"""
|
||||
frame = inspect.currentframe()
|
||||
caller_frame = frame.f_back if frame else None
|
||||
agent = None
|
||||
task = None
|
||||
|
||||
# Add a maximum depth to prevent infinite loops
|
||||
max_depth = 100 # Reasonable limit for call stack depth
|
||||
current_depth = 0
|
||||
|
||||
while caller_frame and current_depth < max_depth:
|
||||
if "self" in caller_frame.f_locals:
|
||||
caller_self = caller_frame.f_locals["self"]
|
||||
if isinstance(caller_self, AgentExecutorProtocol):
|
||||
agent = caller_self.agent
|
||||
task = caller_self.task
|
||||
break
|
||||
caller_frame = caller_frame.f_back
|
||||
current_depth += 1
|
||||
|
||||
return agent, task
|
||||
|
||||
def _get_new_messages(self, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
|
||||
"""Get only the new messages that haven't been processed before."""
|
||||
if not hasattr(self, "_message_history"):
|
||||
self._message_history = []
|
||||
|
||||
new_messages = []
|
||||
for message in messages:
|
||||
message_key = (message["role"], message["content"])
|
||||
if message_key not in [
|
||||
(m["role"], m["content"]) for m in self._message_history
|
||||
]:
|
||||
new_messages.append(message)
|
||||
self._message_history.append(message)
|
||||
return new_messages
|
||||
|
||||
def _get_new_tool_results(self, agent) -> List[Dict]:
|
||||
"""Get only the new tool results that haven't been processed before."""
|
||||
if not agent or not agent.tools_results:
|
||||
return []
|
||||
|
||||
if not hasattr(self, "_tool_results_history"):
|
||||
self._tool_results_history: List[Dict] = []
|
||||
|
||||
new_tool_results = []
|
||||
|
||||
for result in agent.tools_results:
|
||||
# Process tool arguments to extract actual values
|
||||
processed_args = {}
|
||||
if isinstance(result["tool_args"], dict):
|
||||
for key, value in result["tool_args"].items():
|
||||
if isinstance(value, dict) and "type" in value:
|
||||
# Skip metadata and just store the actual value
|
||||
continue
|
||||
processed_args[key] = value
|
||||
|
||||
# Create a clean result with processed arguments
|
||||
clean_result = {
|
||||
"tool_name": result["tool_name"],
|
||||
"tool_args": processed_args,
|
||||
"result": result["result"],
|
||||
"content": result.get("content", ""),
|
||||
"start_time": result.get("start_time", ""),
|
||||
}
|
||||
|
||||
# Check if this exact tool execution exists in history
|
||||
is_duplicate = False
|
||||
for history_result in self._tool_results_history:
|
||||
if (
|
||||
clean_result["tool_name"] == history_result["tool_name"]
|
||||
and str(clean_result["tool_args"])
|
||||
== str(history_result["tool_args"])
|
||||
and str(clean_result["result"]) == str(history_result["result"])
|
||||
and clean_result["content"] == history_result.get("content", "")
|
||||
and clean_result["start_time"]
|
||||
== history_result.get("start_time", "")
|
||||
):
|
||||
is_duplicate = True
|
||||
break
|
||||
|
||||
if not is_duplicate:
|
||||
new_tool_results.append(clean_result)
|
||||
self._tool_results_history.append(clean_result)
|
||||
|
||||
return new_tool_results
|
||||
|
||||
@@ -373,6 +373,7 @@ class Task(BaseModel):
|
||||
|
||||
pydantic_output, json_output = self._export_output(result)
|
||||
task_output = TaskOutput(
|
||||
key=self.key,
|
||||
name=self.name,
|
||||
description=self.description,
|
||||
expected_output=self.expected_output,
|
||||
|
||||
@@ -9,6 +9,7 @@ from crewai.tasks.output_format import OutputFormat
|
||||
class TaskOutput(BaseModel):
|
||||
"""Class that represents the result of a task."""
|
||||
|
||||
key: Optional[str] = Field(description="Key of the task", default=None)
|
||||
description: str = Field(description="Description of the task")
|
||||
name: Optional[str] = Field(description="Name of the task", default=None)
|
||||
expected_output: Optional[str] = Field(
|
||||
|
||||
@@ -2,7 +2,6 @@ import ast
|
||||
import datetime
|
||||
import json
|
||||
import time
|
||||
from datetime import UTC
|
||||
from difflib import SequenceMatcher
|
||||
from json import JSONDecodeError
|
||||
from textwrap import dedent
|
||||
@@ -118,10 +117,7 @@ class ToolUsage:
|
||||
self._printer.print(content=f"\n\n{error}\n", color="red")
|
||||
return error
|
||||
|
||||
if (
|
||||
isinstance(tool, CrewStructuredTool)
|
||||
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
|
||||
):
|
||||
if isinstance(tool, CrewStructuredTool) and tool.name == self._i18n.tools("add_image")["name"]: # type: ignore
|
||||
try:
|
||||
result = self._use(tool_string=tool_string, tool=tool, calling=calling)
|
||||
return result
|
||||
@@ -158,7 +154,6 @@ class ToolUsage:
|
||||
self.task.increment_tools_errors()
|
||||
|
||||
started_at = time.time()
|
||||
started_at_trace = datetime.datetime.now(UTC)
|
||||
from_cache = False
|
||||
|
||||
result = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
|
||||
@@ -186,9 +181,7 @@ class ToolUsage:
|
||||
|
||||
if calling.arguments:
|
||||
try:
|
||||
acceptable_args = tool.args_schema.model_json_schema()[
|
||||
"properties"
|
||||
].keys() # type: ignore
|
||||
acceptable_args = tool.args_schema.model_json_schema()["properties"].keys() # type: ignore
|
||||
arguments = {
|
||||
k: v
|
||||
for k, v in calling.arguments.items()
|
||||
@@ -209,7 +202,7 @@ class ToolUsage:
|
||||
error=e, tool=tool.name, tool_inputs=tool.description
|
||||
)
|
||||
error = ToolUsageErrorException(
|
||||
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
|
||||
f'\n{error_message}.\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
|
||||
).message
|
||||
self.task.increment_tools_errors()
|
||||
if self.agent.verbose:
|
||||
@@ -244,7 +237,6 @@ class ToolUsage:
|
||||
"result": result,
|
||||
"tool_name": tool.name,
|
||||
"tool_args": calling.arguments,
|
||||
"start_time": started_at_trace,
|
||||
}
|
||||
|
||||
self.on_tool_use_finished(
|
||||
@@ -388,7 +380,7 @@ class ToolUsage:
|
||||
raise
|
||||
else:
|
||||
return ToolUsageErrorException(
|
||||
f"{self._i18n.errors('tool_arguments_error')}"
|
||||
f'{self._i18n.errors("tool_arguments_error")}'
|
||||
)
|
||||
|
||||
if not isinstance(arguments, dict):
|
||||
@@ -396,7 +388,7 @@ class ToolUsage:
|
||||
raise
|
||||
else:
|
||||
return ToolUsageErrorException(
|
||||
f"{self._i18n.errors('tool_arguments_error')}"
|
||||
f'{self._i18n.errors("tool_arguments_error")}'
|
||||
)
|
||||
|
||||
return ToolCalling(
|
||||
@@ -424,7 +416,7 @@ class ToolUsage:
|
||||
if self.agent.verbose:
|
||||
self._printer.print(content=f"\n\n{e}\n", color="red")
|
||||
return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
|
||||
f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
|
||||
f'{self._i18n.errors("tool_usage_error").format(error=e)}\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
|
||||
)
|
||||
return self._tool_calling(tool_string)
|
||||
|
||||
|
||||
@@ -1,39 +0,0 @@
|
||||
from contextlib import contextmanager
|
||||
from contextvars import ContextVar
|
||||
from typing import Generator
|
||||
|
||||
|
||||
class TraceContext:
|
||||
"""Maintains the current trace context throughout the execution stack.
|
||||
|
||||
This class provides a context manager for tracking trace execution across
|
||||
async and sync code paths using ContextVars.
|
||||
"""
|
||||
|
||||
_context: ContextVar = ContextVar("trace_context", default=None)
|
||||
|
||||
@classmethod
|
||||
def get_current(cls):
|
||||
"""Get the current trace context.
|
||||
|
||||
Returns:
|
||||
Optional[UnifiedTraceController]: The current trace controller or None if not set.
|
||||
"""
|
||||
return cls._context.get()
|
||||
|
||||
@classmethod
|
||||
@contextmanager
|
||||
def set_current(cls, trace):
|
||||
"""Set the current trace context within a context manager.
|
||||
|
||||
Args:
|
||||
trace: The trace controller to set as current.
|
||||
|
||||
Yields:
|
||||
UnifiedTraceController: The current trace controller.
|
||||
"""
|
||||
token = cls._context.set(trace)
|
||||
try:
|
||||
yield trace
|
||||
finally:
|
||||
cls._context.reset(token)
|
||||
@@ -1,19 +0,0 @@
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class TraceType(Enum):
|
||||
LLM_CALL = "llm_call"
|
||||
TOOL_CALL = "tool_call"
|
||||
FLOW_STEP = "flow_step"
|
||||
START_CALL = "start_call"
|
||||
|
||||
|
||||
class RunType(Enum):
|
||||
KICKOFF = "kickoff"
|
||||
TRAIN = "train"
|
||||
TEST = "test"
|
||||
|
||||
|
||||
class CrewType(Enum):
|
||||
CREW = "crew"
|
||||
FLOW = "flow"
|
||||
@@ -1,89 +0,0 @@
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class ToolCall(BaseModel):
|
||||
"""Model representing a tool call during execution"""
|
||||
|
||||
name: str
|
||||
arguments: Dict[str, Any]
|
||||
output: str
|
||||
start_time: datetime
|
||||
end_time: Optional[datetime] = None
|
||||
latency_ms: Optional[int] = None
|
||||
error: Optional[str] = None
|
||||
|
||||
|
||||
class LLMRequest(BaseModel):
|
||||
"""Model representing the LLM request details"""
|
||||
|
||||
model: str
|
||||
messages: List[Dict[str, str]]
|
||||
temperature: Optional[float] = None
|
||||
max_tokens: Optional[int] = None
|
||||
stop_sequences: Optional[List[str]] = None
|
||||
additional_params: Dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class LLMResponse(BaseModel):
|
||||
"""Model representing the LLM response details"""
|
||||
|
||||
content: str
|
||||
finish_reason: Optional[str] = None
|
||||
|
||||
|
||||
class FlowStepIO(BaseModel):
|
||||
"""Model representing flow step input/output details"""
|
||||
|
||||
function_name: str
|
||||
inputs: Dict[str, Any] = Field(default_factory=dict)
|
||||
outputs: Any
|
||||
metadata: Dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class CrewTrace(BaseModel):
|
||||
"""Model for tracking detailed information about LLM interactions and Flow steps"""
|
||||
|
||||
deployment_instance_id: Optional[str] = Field(
|
||||
description="ID of the deployment instance"
|
||||
)
|
||||
trace_id: str = Field(description="Unique identifier for this trace")
|
||||
run_id: str = Field(description="Identifier for the execution run")
|
||||
agent_role: Optional[str] = Field(description="Role of the agent")
|
||||
task_id: Optional[str] = Field(description="ID of the current task being executed")
|
||||
task_name: Optional[str] = Field(description="Name of the current task")
|
||||
task_description: Optional[str] = Field(
|
||||
description="Description of the current task"
|
||||
)
|
||||
trace_type: str = Field(description="Type of the trace")
|
||||
crew_type: str = Field(description="Type of the crew")
|
||||
run_type: str = Field(description="Type of the run")
|
||||
|
||||
# Timing information
|
||||
start_time: Optional[datetime] = None
|
||||
end_time: Optional[datetime] = None
|
||||
latency_ms: Optional[int] = None
|
||||
|
||||
# Request/Response for LLM calls
|
||||
request: Optional[LLMRequest] = None
|
||||
response: Optional[LLMResponse] = None
|
||||
|
||||
# Input/Output for Flow steps
|
||||
flow_step: Optional[FlowStepIO] = None
|
||||
|
||||
# Tool usage
|
||||
tool_calls: List[ToolCall] = Field(default_factory=list)
|
||||
|
||||
# Metrics
|
||||
tokens_used: Optional[int] = None
|
||||
prompt_tokens: Optional[int] = None
|
||||
completion_tokens: Optional[int] = None
|
||||
cost: Optional[float] = None
|
||||
|
||||
# Additional metadata
|
||||
status: str = "running" # running, completed, error
|
||||
error: Optional[str] = None
|
||||
metadata: Dict[str, Any] = Field(default_factory=dict)
|
||||
tags: List[str] = Field(default_factory=list)
|
||||
@@ -1,543 +0,0 @@
|
||||
import inspect
|
||||
import os
|
||||
from datetime import UTC, datetime
|
||||
from functools import wraps
|
||||
from typing import Any, Awaitable, Callable, Dict, List, Optional
|
||||
from uuid import uuid4
|
||||
|
||||
from crewai.traces.context import TraceContext
|
||||
from crewai.traces.enums import CrewType, RunType, TraceType
|
||||
from crewai.traces.models import (
|
||||
CrewTrace,
|
||||
FlowStepIO,
|
||||
LLMRequest,
|
||||
LLMResponse,
|
||||
ToolCall,
|
||||
)
|
||||
|
||||
|
||||
class UnifiedTraceController:
|
||||
"""Controls and manages trace execution and recording.
|
||||
|
||||
This class handles the lifecycle of traces including creation, execution tracking,
|
||||
and recording of results for various types of operations (LLM calls, tool calls, flow steps).
|
||||
"""
|
||||
|
||||
_task_traces: Dict[str, List["UnifiedTraceController"]] = {}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
trace_type: TraceType,
|
||||
run_type: RunType,
|
||||
crew_type: CrewType,
|
||||
run_id: str,
|
||||
deployment_instance_id: str = os.environ.get(
|
||||
"CREWAI_DEPLOYMENT_INSTANCE_ID", ""
|
||||
),
|
||||
parent_trace_id: Optional[str] = None,
|
||||
agent_role: Optional[str] = "unknown",
|
||||
task_name: Optional[str] = None,
|
||||
task_description: Optional[str] = None,
|
||||
task_id: Optional[str] = None,
|
||||
flow_step: Dict[str, Any] = {},
|
||||
tool_calls: List[ToolCall] = [],
|
||||
**context: Any,
|
||||
) -> None:
|
||||
"""Initialize a new trace controller.
|
||||
|
||||
Args:
|
||||
trace_type: Type of trace being recorded.
|
||||
run_type: Type of run being executed.
|
||||
crew_type: Type of crew executing the trace.
|
||||
run_id: Unique identifier for the run.
|
||||
deployment_instance_id: Optional deployment instance identifier.
|
||||
parent_trace_id: Optional parent trace identifier for nested traces.
|
||||
agent_role: Role of the agent executing the trace.
|
||||
task_name: Optional name of the task being executed.
|
||||
task_description: Optional description of the task.
|
||||
task_id: Optional unique identifier for the task.
|
||||
flow_step: Optional flow step information.
|
||||
tool_calls: Optional list of tool calls made during execution.
|
||||
**context: Additional context parameters.
|
||||
"""
|
||||
self.trace_id = str(uuid4())
|
||||
self.run_id = run_id
|
||||
self.parent_trace_id = parent_trace_id
|
||||
self.trace_type = trace_type
|
||||
self.run_type = run_type
|
||||
self.crew_type = crew_type
|
||||
self.context = context
|
||||
self.agent_role = agent_role
|
||||
self.task_name = task_name
|
||||
self.task_description = task_description
|
||||
self.task_id = task_id
|
||||
self.deployment_instance_id = deployment_instance_id
|
||||
self.children: List[Dict[str, Any]] = []
|
||||
self.start_time: Optional[datetime] = None
|
||||
self.end_time: Optional[datetime] = None
|
||||
self.error: Optional[str] = None
|
||||
self.tool_calls = tool_calls
|
||||
self.flow_step = flow_step
|
||||
self.status: str = "running"
|
||||
|
||||
# Add trace to task's trace collection if task_id is present
|
||||
if task_id:
|
||||
self._add_to_task_traces()
|
||||
|
||||
def _add_to_task_traces(self) -> None:
|
||||
"""Add this trace to the task's trace collection."""
|
||||
if not hasattr(UnifiedTraceController, "_task_traces"):
|
||||
UnifiedTraceController._task_traces = {}
|
||||
|
||||
if self.task_id is None:
|
||||
return
|
||||
|
||||
if self.task_id not in UnifiedTraceController._task_traces:
|
||||
UnifiedTraceController._task_traces[self.task_id] = []
|
||||
|
||||
UnifiedTraceController._task_traces[self.task_id].append(self)
|
||||
|
||||
@classmethod
|
||||
def get_task_traces(cls, task_id: str) -> List["UnifiedTraceController"]:
|
||||
"""Get all traces for a specific task.
|
||||
|
||||
Args:
|
||||
task_id: The ID of the task to get traces for
|
||||
|
||||
Returns:
|
||||
List of traces associated with the task
|
||||
"""
|
||||
return cls._task_traces.get(task_id, [])
|
||||
|
||||
@classmethod
|
||||
def clear_task_traces(cls, task_id: str) -> None:
|
||||
"""Clear traces for a specific task.
|
||||
|
||||
Args:
|
||||
task_id: The ID of the task to clear traces for
|
||||
"""
|
||||
if hasattr(cls, "_task_traces") and task_id in cls._task_traces:
|
||||
del cls._task_traces[task_id]
|
||||
|
||||
def _get_current_trace(self) -> "UnifiedTraceController":
|
||||
return TraceContext.get_current()
|
||||
|
||||
def start_trace(self) -> "UnifiedTraceController":
|
||||
"""Start the trace execution.
|
||||
|
||||
Returns:
|
||||
UnifiedTraceController: Self for method chaining.
|
||||
"""
|
||||
self.start_time = datetime.now(UTC)
|
||||
return self
|
||||
|
||||
def end_trace(self, result: Any = None, error: Optional[str] = None) -> None:
|
||||
"""End the trace execution and record results.
|
||||
|
||||
Args:
|
||||
result: Optional result from the trace execution.
|
||||
error: Optional error message if the trace failed.
|
||||
"""
|
||||
self.end_time = datetime.now(UTC)
|
||||
self.status = "error" if error else "completed"
|
||||
self.error = error
|
||||
self._record_trace(result)
|
||||
|
||||
def add_child_trace(self, child_trace: Dict[str, Any]) -> None:
|
||||
"""Add a child trace to this trace's execution history.
|
||||
|
||||
Args:
|
||||
child_trace: The child trace information to add.
|
||||
"""
|
||||
self.children.append(child_trace)
|
||||
|
||||
def to_crew_trace(self) -> CrewTrace:
|
||||
"""Convert to CrewTrace format for storage.
|
||||
|
||||
Returns:
|
||||
CrewTrace: The trace data in CrewTrace format.
|
||||
"""
|
||||
latency_ms = None
|
||||
|
||||
if self.tool_calls and hasattr(self.tool_calls[0], "start_time"):
|
||||
self.start_time = self.tool_calls[0].start_time
|
||||
|
||||
if self.start_time and self.end_time:
|
||||
latency_ms = int((self.end_time - self.start_time).total_seconds() * 1000)
|
||||
|
||||
request = None
|
||||
response = None
|
||||
flow_step_obj = None
|
||||
|
||||
if self.trace_type in [TraceType.LLM_CALL, TraceType.TOOL_CALL]:
|
||||
request = LLMRequest(
|
||||
model=self.context.get("model", "unknown"),
|
||||
messages=self.context.get("messages", []),
|
||||
temperature=self.context.get("temperature"),
|
||||
max_tokens=self.context.get("max_tokens"),
|
||||
stop_sequences=self.context.get("stop_sequences"),
|
||||
)
|
||||
if "response" in self.context:
|
||||
response = LLMResponse(
|
||||
content=self.context["response"].get("content", ""),
|
||||
finish_reason=self.context["response"].get("finish_reason"),
|
||||
)
|
||||
|
||||
elif self.trace_type == TraceType.FLOW_STEP:
|
||||
flow_step_obj = FlowStepIO(
|
||||
function_name=self.flow_step.get("function_name", "unknown"),
|
||||
inputs=self.flow_step.get("inputs", {}),
|
||||
outputs={"result": self.context.get("response")},
|
||||
metadata=self.flow_step.get("metadata", {}),
|
||||
)
|
||||
|
||||
return CrewTrace(
|
||||
deployment_instance_id=self.deployment_instance_id,
|
||||
trace_id=self.trace_id,
|
||||
task_id=self.task_id,
|
||||
run_id=self.run_id,
|
||||
agent_role=self.agent_role,
|
||||
task_name=self.task_name,
|
||||
task_description=self.task_description,
|
||||
trace_type=self.trace_type.value,
|
||||
crew_type=self.crew_type.value,
|
||||
run_type=self.run_type.value,
|
||||
start_time=self.start_time,
|
||||
end_time=self.end_time,
|
||||
latency_ms=latency_ms,
|
||||
request=request,
|
||||
response=response,
|
||||
flow_step=flow_step_obj,
|
||||
tool_calls=self.tool_calls,
|
||||
tokens_used=self.context.get("tokens_used"),
|
||||
prompt_tokens=self.context.get("prompt_tokens"),
|
||||
completion_tokens=self.context.get("completion_tokens"),
|
||||
status=self.status,
|
||||
error=self.error,
|
||||
)
|
||||
|
||||
def _record_trace(self, result: Any = None) -> None:
|
||||
"""Record the trace.
|
||||
|
||||
This method is called when a trace is completed. It ensures the trace
|
||||
is properly recorded and associated with its task if applicable.
|
||||
|
||||
Args:
|
||||
result: Optional result to include in the trace
|
||||
"""
|
||||
if result:
|
||||
self.context["response"] = result
|
||||
|
||||
# Add to task traces if this trace belongs to a task
|
||||
if self.task_id:
|
||||
self._add_to_task_traces()
|
||||
|
||||
|
||||
def should_trace() -> bool:
|
||||
"""Check if tracing is enabled via environment variable."""
|
||||
return os.getenv("CREWAI_ENABLE_TRACING", "false").lower() == "true"
|
||||
|
||||
|
||||
# Crew main trace
|
||||
def init_crew_main_trace(func: Callable[..., Any]) -> Callable[..., Any]:
|
||||
"""Decorator to initialize and track the main crew execution trace.
|
||||
|
||||
This decorator sets up the trace context for the main crew execution,
|
||||
handling both synchronous and asynchronous crew operations.
|
||||
|
||||
Args:
|
||||
func: The crew function to be traced.
|
||||
|
||||
Returns:
|
||||
Wrapped function that creates and manages the main crew trace context.
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
|
||||
if not should_trace():
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
trace = build_crew_main_trace(self)
|
||||
with TraceContext.set_current(trace):
|
||||
try:
|
||||
return func(self, *args, **kwargs)
|
||||
except Exception as e:
|
||||
trace.end_trace(error=str(e))
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def build_crew_main_trace(self: Any) -> "UnifiedTraceController":
|
||||
"""Build the main trace controller for a crew execution.
|
||||
|
||||
This function creates a trace controller configured for the main crew execution,
|
||||
handling different run types (kickoff, test, train) and maintaining context.
|
||||
|
||||
Args:
|
||||
self: The crew instance.
|
||||
|
||||
Returns:
|
||||
UnifiedTraceController: The configured trace controller for the crew.
|
||||
"""
|
||||
run_type = RunType.KICKOFF
|
||||
if hasattr(self, "_test") and self._test:
|
||||
run_type = RunType.TEST
|
||||
elif hasattr(self, "_train") and self._train:
|
||||
run_type = RunType.TRAIN
|
||||
|
||||
current_trace = TraceContext.get_current()
|
||||
|
||||
trace = UnifiedTraceController(
|
||||
trace_type=TraceType.LLM_CALL,
|
||||
run_type=run_type,
|
||||
crew_type=current_trace.crew_type if current_trace else CrewType.CREW,
|
||||
run_id=current_trace.run_id if current_trace else str(self.id),
|
||||
parent_trace_id=current_trace.trace_id if current_trace else None,
|
||||
)
|
||||
return trace
|
||||
|
||||
|
||||
# Flow main trace
|
||||
def init_flow_main_trace(
|
||||
func: Callable[..., Awaitable[Any]],
|
||||
) -> Callable[..., Awaitable[Any]]:
|
||||
"""Decorator to initialize and track the main flow execution trace.
|
||||
|
||||
Args:
|
||||
func: The async flow function to be traced.
|
||||
|
||||
Returns:
|
||||
Wrapped async function that creates and manages the main flow trace context.
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
|
||||
if not should_trace():
|
||||
return await func(self, *args, **kwargs)
|
||||
|
||||
trace = build_flow_main_trace(self, *args, **kwargs)
|
||||
with TraceContext.set_current(trace):
|
||||
try:
|
||||
return await func(self, *args, **kwargs)
|
||||
except Exception:
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def build_flow_main_trace(
|
||||
self: Any, *args: Any, **kwargs: Any
|
||||
) -> "UnifiedTraceController":
|
||||
"""Build the main trace controller for a flow execution.
|
||||
|
||||
Args:
|
||||
self: The flow instance.
|
||||
*args: Variable positional arguments.
|
||||
**kwargs: Variable keyword arguments.
|
||||
|
||||
Returns:
|
||||
UnifiedTraceController: The configured trace controller for the flow.
|
||||
"""
|
||||
current_trace = TraceContext.get_current()
|
||||
trace = UnifiedTraceController(
|
||||
trace_type=TraceType.FLOW_STEP,
|
||||
run_id=current_trace.run_id if current_trace else str(self.flow_id),
|
||||
parent_trace_id=current_trace.trace_id if current_trace else None,
|
||||
crew_type=CrewType.FLOW,
|
||||
run_type=RunType.KICKOFF,
|
||||
context={
|
||||
"crew_name": self.__class__.__name__,
|
||||
"inputs": kwargs.get("inputs", {}),
|
||||
"agents": [],
|
||||
"tasks": [],
|
||||
},
|
||||
)
|
||||
return trace
|
||||
|
||||
|
||||
# Flow step trace
|
||||
def trace_flow_step(
|
||||
func: Callable[..., Awaitable[Any]],
|
||||
) -> Callable[..., Awaitable[Any]]:
|
||||
"""Decorator to trace individual flow step executions.
|
||||
|
||||
Args:
|
||||
func: The async flow step function to be traced.
|
||||
|
||||
Returns:
|
||||
Wrapped async function that creates and manages the flow step trace context.
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
async def wrapper(
|
||||
self: Any,
|
||||
method_name: str,
|
||||
method: Callable[..., Any],
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
if not should_trace():
|
||||
return await func(self, method_name, method, *args, **kwargs)
|
||||
|
||||
trace = build_flow_step_trace(self, method_name, method, *args, **kwargs)
|
||||
with TraceContext.set_current(trace):
|
||||
trace.start_trace()
|
||||
try:
|
||||
result = await func(self, method_name, method, *args, **kwargs)
|
||||
trace.end_trace(result=result)
|
||||
return result
|
||||
except Exception as e:
|
||||
trace.end_trace(error=str(e))
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def build_flow_step_trace(
|
||||
self: Any, method_name: str, method: Callable[..., Any], *args: Any, **kwargs: Any
|
||||
) -> "UnifiedTraceController":
|
||||
"""Build a trace controller for an individual flow step.
|
||||
|
||||
Args:
|
||||
self: The flow instance.
|
||||
method_name: Name of the method being executed.
|
||||
method: The actual method being executed.
|
||||
*args: Variable positional arguments.
|
||||
**kwargs: Variable keyword arguments.
|
||||
|
||||
Returns:
|
||||
UnifiedTraceController: The configured trace controller for the flow step.
|
||||
"""
|
||||
current_trace = TraceContext.get_current()
|
||||
|
||||
# Get method signature
|
||||
sig = inspect.signature(method)
|
||||
params = list(sig.parameters.values())
|
||||
|
||||
# Create inputs dictionary mapping parameter names to values
|
||||
method_params = [p for p in params if p.name != "self"]
|
||||
inputs: Dict[str, Any] = {}
|
||||
|
||||
# Map positional args to their parameter names
|
||||
for i, param in enumerate(method_params):
|
||||
if i < len(args):
|
||||
inputs[param.name] = args[i]
|
||||
|
||||
# Add keyword arguments
|
||||
inputs.update(kwargs)
|
||||
|
||||
trace = UnifiedTraceController(
|
||||
trace_type=TraceType.FLOW_STEP,
|
||||
run_type=current_trace.run_type if current_trace else RunType.KICKOFF,
|
||||
crew_type=current_trace.crew_type if current_trace else CrewType.FLOW,
|
||||
run_id=current_trace.run_id if current_trace else str(self.flow_id),
|
||||
parent_trace_id=current_trace.trace_id if current_trace else None,
|
||||
flow_step={
|
||||
"function_name": method_name,
|
||||
"inputs": inputs,
|
||||
"metadata": {
|
||||
"crew_name": self.__class__.__name__,
|
||||
},
|
||||
},
|
||||
)
|
||||
return trace
|
||||
|
||||
|
||||
# LLM trace
|
||||
def trace_llm_call(func: Callable[..., Any]) -> Callable[..., Any]:
|
||||
"""Decorator to trace LLM calls.
|
||||
|
||||
Args:
|
||||
func: The function to trace.
|
||||
|
||||
Returns:
|
||||
Wrapped function that creates and manages the LLM call trace context.
|
||||
"""
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
|
||||
if not should_trace():
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
trace = build_llm_trace(self, *args, **kwargs)
|
||||
with TraceContext.set_current(trace):
|
||||
trace.start_trace()
|
||||
try:
|
||||
response = func(self, *args, **kwargs)
|
||||
# Extract relevant data from response
|
||||
trace_response = {
|
||||
"content": response["choices"][0]["message"]["content"],
|
||||
"finish_reason": response["choices"][0].get("finish_reason"),
|
||||
}
|
||||
|
||||
# Add usage metrics to context
|
||||
if "usage" in response:
|
||||
trace.context["tokens_used"] = response["usage"].get(
|
||||
"total_tokens", 0
|
||||
)
|
||||
trace.context["prompt_tokens"] = response["usage"].get(
|
||||
"prompt_tokens", 0
|
||||
)
|
||||
trace.context["completion_tokens"] = response["usage"].get(
|
||||
"completion_tokens", 0
|
||||
)
|
||||
|
||||
trace.end_trace(trace_response)
|
||||
return response
|
||||
except Exception as e:
|
||||
trace.end_trace(error=str(e))
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def build_llm_trace(
|
||||
self: Any, params: Dict[str, Any], *args: Any, **kwargs: Any
|
||||
) -> Any:
|
||||
"""Build a trace controller for an LLM call.
|
||||
|
||||
Args:
|
||||
self: The LLM instance.
|
||||
params: The parameters for the LLM call.
|
||||
*args: Variable positional arguments.
|
||||
**kwargs: Variable keyword arguments.
|
||||
|
||||
Returns:
|
||||
UnifiedTraceController: The configured trace controller for the LLM call.
|
||||
"""
|
||||
current_trace = TraceContext.get_current()
|
||||
agent, task = self._get_execution_context()
|
||||
|
||||
# Get new messages and tool results
|
||||
new_messages = self._get_new_messages(params.get("messages", []))
|
||||
new_tool_results = self._get_new_tool_results(agent)
|
||||
|
||||
# Create trace context
|
||||
trace = UnifiedTraceController(
|
||||
trace_type=TraceType.TOOL_CALL if new_tool_results else TraceType.LLM_CALL,
|
||||
crew_type=current_trace.crew_type if current_trace else CrewType.CREW,
|
||||
run_type=current_trace.run_type if current_trace else RunType.KICKOFF,
|
||||
run_id=current_trace.run_id if current_trace else str(uuid4()),
|
||||
parent_trace_id=current_trace.trace_id if current_trace else None,
|
||||
agent_role=agent.role if agent else "unknown",
|
||||
task_id=str(task.id) if task else None,
|
||||
task_name=task.name if task else None,
|
||||
task_description=task.description if task else None,
|
||||
model=self.model,
|
||||
messages=new_messages,
|
||||
temperature=self.temperature,
|
||||
max_tokens=self.max_tokens,
|
||||
stop_sequences=self.stop,
|
||||
tool_calls=[
|
||||
ToolCall(
|
||||
name=result["tool_name"],
|
||||
arguments=result["tool_args"],
|
||||
output=str(result["result"]),
|
||||
start_time=result.get("start_time", ""),
|
||||
end_time=datetime.now(UTC),
|
||||
)
|
||||
for result in new_tool_results
|
||||
],
|
||||
)
|
||||
return trace
|
||||
@@ -39,8 +39,8 @@
|
||||
"validation_error": "### Previous attempt failed validation: {guardrail_result_error}\n\n\n### Previous result:\n{task_output}\n\n\nTry again, making sure to address the validation error."
|
||||
},
|
||||
"tools": {
|
||||
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",
|
||||
"ask_question": "Ask a specific question to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the question you have for them, and ALL necessary context to ask the question properly, they know nothing about the question, so share absolute everything you know, don't reference things but instead explain them.",
|
||||
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolutely everything you know, don't reference things but instead explain them.",
|
||||
"ask_question": "Ask a specific question to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the question you have for them, and ALL necessary context to ask the question properly, they know nothing about the question, so share absolutely everything you know, don't reference things but instead explain them.",
|
||||
"add_image": {
|
||||
"name": "Add image to content",
|
||||
"description": "See image to understand its content, you can optionally ask a question about the image",
|
||||
|
||||
@@ -20,11 +20,11 @@ class ConverterError(Exception):
|
||||
class Converter(OutputConverter):
|
||||
"""Class that converts text into either pydantic or json."""
|
||||
|
||||
def to_pydantic(self, current_attempt=1):
|
||||
def to_pydantic(self, current_attempt=1) -> BaseModel:
|
||||
"""Convert text to pydantic."""
|
||||
try:
|
||||
if self.llm.supports_function_calling():
|
||||
return self._create_instructor().to_pydantic()
|
||||
result = self._create_instructor().to_pydantic()
|
||||
else:
|
||||
response = self.llm.call(
|
||||
[
|
||||
@@ -32,18 +32,40 @@ class Converter(OutputConverter):
|
||||
{"role": "user", "content": self.text},
|
||||
]
|
||||
)
|
||||
return self.model.model_validate_json(response)
|
||||
try:
|
||||
# Try to directly validate the response JSON
|
||||
result = self.model.model_validate_json(response)
|
||||
except ValidationError:
|
||||
# If direct validation fails, attempt to extract valid JSON
|
||||
result = handle_partial_json(response, self.model, False, None)
|
||||
# Ensure result is a BaseModel instance
|
||||
if not isinstance(result, BaseModel):
|
||||
if isinstance(result, dict):
|
||||
result = self.model.parse_obj(result)
|
||||
elif isinstance(result, str):
|
||||
try:
|
||||
parsed = json.loads(result)
|
||||
result = self.model.parse_obj(parsed)
|
||||
except Exception as parse_err:
|
||||
raise ConverterError(
|
||||
f"Failed to convert partial JSON result into Pydantic: {parse_err}"
|
||||
)
|
||||
else:
|
||||
raise ConverterError(
|
||||
"handle_partial_json returned an unexpected type."
|
||||
)
|
||||
return result
|
||||
except ValidationError as e:
|
||||
if current_attempt < self.max_attempts:
|
||||
return self.to_pydantic(current_attempt + 1)
|
||||
raise ConverterError(
|
||||
f"Failed to convert text into a Pydantic model due to the following validation error: {e}"
|
||||
f"Failed to convert text into a Pydantic model due to validation error: {e}"
|
||||
)
|
||||
except Exception as e:
|
||||
if current_attempt < self.max_attempts:
|
||||
return self.to_pydantic(current_attempt + 1)
|
||||
raise ConverterError(
|
||||
f"Failed to convert text into a Pydantic model due to the following error: {e}"
|
||||
f"Failed to convert text into a Pydantic model due to error: {e}"
|
||||
)
|
||||
|
||||
def to_json(self, current_attempt=1):
|
||||
@@ -197,11 +219,15 @@ def get_conversion_instructions(model: Type[BaseModel], llm: Any) -> str:
|
||||
if llm.supports_function_calling():
|
||||
model_schema = PydanticSchemaParser(model=model).get_schema()
|
||||
instructions += (
|
||||
f"\n\nThe JSON should follow this schema:\n```json\n{model_schema}\n```"
|
||||
f"\n\nOutput ONLY the valid JSON and nothing else.\n\n"
|
||||
f"The JSON must follow this schema exactly:\n```json\n{model_schema}\n```"
|
||||
)
|
||||
else:
|
||||
model_description = generate_model_description(model)
|
||||
instructions += f"\n\nThe JSON should follow this format:\n{model_description}"
|
||||
instructions += (
|
||||
f"\n\nOutput ONLY the valid JSON and nothing else.\n\n"
|
||||
f"The JSON must follow this format exactly:\n{model_description}"
|
||||
)
|
||||
return instructions
|
||||
|
||||
|
||||
|
||||
@@ -34,6 +34,7 @@ from .tool_usage_events import (
|
||||
ToolUsageEvent,
|
||||
ToolValidateInputErrorEvent,
|
||||
)
|
||||
from .llm_events import LLMCallCompletedEvent, LLMCallFailedEvent, LLMCallStartedEvent
|
||||
|
||||
# events
|
||||
from .event_listener import EventListener
|
||||
|
||||
@@ -1,9 +1,17 @@
|
||||
from pydantic import PrivateAttr
|
||||
from typing import Any, Dict
|
||||
|
||||
from pydantic import Field, PrivateAttr
|
||||
|
||||
from crewai.task import Task
|
||||
from crewai.telemetry.telemetry import Telemetry
|
||||
from crewai.utilities import Logger
|
||||
from crewai.utilities.constants import EMITTER_COLOR
|
||||
from crewai.utilities.events.base_event_listener import BaseEventListener
|
||||
from crewai.utilities.events.llm_events import (
|
||||
LLMCallCompletedEvent,
|
||||
LLMCallFailedEvent,
|
||||
LLMCallStartedEvent,
|
||||
)
|
||||
|
||||
from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent
|
||||
from .crew_events import (
|
||||
@@ -37,6 +45,7 @@ class EventListener(BaseEventListener):
|
||||
_instance = None
|
||||
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
|
||||
logger = Logger(verbose=True, default_color=EMITTER_COLOR)
|
||||
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
@@ -49,6 +58,7 @@ class EventListener(BaseEventListener):
|
||||
super().__init__()
|
||||
self._telemetry = Telemetry()
|
||||
self._telemetry.set_tracer()
|
||||
self.execution_spans = {}
|
||||
self._initialized = True
|
||||
|
||||
# ----------- CREW EVENTS -----------
|
||||
@@ -57,7 +67,7 @@ class EventListener(BaseEventListener):
|
||||
@crewai_event_bus.on(CrewKickoffStartedEvent)
|
||||
def on_crew_started(source, event: CrewKickoffStartedEvent):
|
||||
self.logger.log(
|
||||
f"🚀 Crew '{event.crew_name}' started",
|
||||
f"🚀 Crew '{event.crew_name}' started, {source.id}",
|
||||
event.timestamp,
|
||||
)
|
||||
self._telemetry.crew_execution_span(source, event.inputs)
|
||||
@@ -67,28 +77,28 @@ class EventListener(BaseEventListener):
|
||||
final_string_output = event.output.raw
|
||||
self._telemetry.end_crew(source, final_string_output)
|
||||
self.logger.log(
|
||||
f"✅ Crew '{event.crew_name}' completed",
|
||||
f"✅ Crew '{event.crew_name}' completed, {source.id}",
|
||||
event.timestamp,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(CrewKickoffFailedEvent)
|
||||
def on_crew_failed(source, event: CrewKickoffFailedEvent):
|
||||
self.logger.log(
|
||||
f"❌ Crew '{event.crew_name}' failed",
|
||||
f"❌ Crew '{event.crew_name}' failed, {source.id}",
|
||||
event.timestamp,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(CrewTestStartedEvent)
|
||||
def on_crew_test_started(source, event: CrewTestStartedEvent):
|
||||
cloned_crew = source.copy()
|
||||
cloned_crew._telemetry.test_execution_span(
|
||||
self._telemetry.test_execution_span(
|
||||
cloned_crew,
|
||||
event.n_iterations,
|
||||
event.inputs,
|
||||
event.eval_llm,
|
||||
event.eval_llm or "",
|
||||
)
|
||||
self.logger.log(
|
||||
f"🚀 Crew '{event.crew_name}' started test",
|
||||
f"🚀 Crew '{event.crew_name}' started test, {source.id}",
|
||||
event.timestamp,
|
||||
)
|
||||
|
||||
@@ -131,9 +141,9 @@ class EventListener(BaseEventListener):
|
||||
|
||||
@crewai_event_bus.on(TaskStartedEvent)
|
||||
def on_task_started(source, event: TaskStartedEvent):
|
||||
source._execution_span = self._telemetry.task_started(
|
||||
crew=source.agent.crew, task=source
|
||||
)
|
||||
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
|
||||
self.execution_spans[source] = span
|
||||
|
||||
self.logger.log(
|
||||
f"📋 Task started: {source.description}",
|
||||
event.timestamp,
|
||||
@@ -141,24 +151,22 @@ class EventListener(BaseEventListener):
|
||||
|
||||
@crewai_event_bus.on(TaskCompletedEvent)
|
||||
def on_task_completed(source, event: TaskCompletedEvent):
|
||||
if source._execution_span:
|
||||
self._telemetry.task_ended(
|
||||
source._execution_span, source, source.agent.crew
|
||||
)
|
||||
span = self.execution_spans.get(source)
|
||||
if span:
|
||||
self._telemetry.task_ended(span, source, source.agent.crew)
|
||||
self.logger.log(
|
||||
f"✅ Task completed: {source.description}",
|
||||
event.timestamp,
|
||||
)
|
||||
source._execution_span = None
|
||||
self.execution_spans[source] = None
|
||||
|
||||
@crewai_event_bus.on(TaskFailedEvent)
|
||||
def on_task_failed(source, event: TaskFailedEvent):
|
||||
if source._execution_span:
|
||||
span = self.execution_spans.get(source)
|
||||
if span:
|
||||
if source.agent and source.agent.crew:
|
||||
self._telemetry.task_ended(
|
||||
source._execution_span, source, source.agent.crew
|
||||
)
|
||||
source._execution_span = None
|
||||
self._telemetry.task_ended(span, source, source.agent.crew)
|
||||
self.execution_spans[source] = None
|
||||
self.logger.log(
|
||||
f"❌ Task failed: {source.description}",
|
||||
event.timestamp,
|
||||
@@ -184,7 +192,7 @@ class EventListener(BaseEventListener):
|
||||
|
||||
@crewai_event_bus.on(FlowCreatedEvent)
|
||||
def on_flow_created(source, event: FlowCreatedEvent):
|
||||
self._telemetry.flow_creation_span(self.__class__.__name__)
|
||||
self._telemetry.flow_creation_span(event.flow_name)
|
||||
self.logger.log(
|
||||
f"🌊 Flow Created: '{event.flow_name}'",
|
||||
event.timestamp,
|
||||
@@ -193,17 +201,17 @@ class EventListener(BaseEventListener):
|
||||
@crewai_event_bus.on(FlowStartedEvent)
|
||||
def on_flow_started(source, event: FlowStartedEvent):
|
||||
self._telemetry.flow_execution_span(
|
||||
source.__class__.__name__, list(source._methods.keys())
|
||||
event.flow_name, list(source._methods.keys())
|
||||
)
|
||||
self.logger.log(
|
||||
f"🤖 Flow Started: '{event.flow_name}'",
|
||||
f"🤖 Flow Started: '{event.flow_name}', {source.flow_id}",
|
||||
event.timestamp,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(FlowFinishedEvent)
|
||||
def on_flow_finished(source, event: FlowFinishedEvent):
|
||||
self.logger.log(
|
||||
f"👍 Flow Finished: '{event.flow_name}'",
|
||||
f"👍 Flow Finished: '{event.flow_name}', {source.flow_id}",
|
||||
event.timestamp,
|
||||
)
|
||||
|
||||
@@ -253,5 +261,28 @@ class EventListener(BaseEventListener):
|
||||
#
|
||||
)
|
||||
|
||||
# ----------- LLM EVENTS -----------
|
||||
|
||||
@crewai_event_bus.on(LLMCallStartedEvent)
|
||||
def on_llm_call_started(source, event: LLMCallStartedEvent):
|
||||
self.logger.log(
|
||||
f"🤖 LLM Call Started",
|
||||
event.timestamp,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(LLMCallCompletedEvent)
|
||||
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
|
||||
self.logger.log(
|
||||
f"✅ LLM Call Completed",
|
||||
event.timestamp,
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(LLMCallFailedEvent)
|
||||
def on_llm_call_failed(source, event: LLMCallFailedEvent):
|
||||
self.logger.log(
|
||||
f"❌ LLM Call Failed: '{event.error}'",
|
||||
event.timestamp,
|
||||
)
|
||||
|
||||
|
||||
event_listener = EventListener()
|
||||
|
||||
36
src/crewai/utilities/events/llm_events.py
Normal file
36
src/crewai/utilities/events/llm_events.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from crewai.utilities.events.base_events import CrewEvent
|
||||
|
||||
|
||||
class LLMCallType(Enum):
|
||||
"""Type of LLM call being made"""
|
||||
|
||||
TOOL_CALL = "tool_call"
|
||||
LLM_CALL = "llm_call"
|
||||
|
||||
|
||||
class LLMCallStartedEvent(CrewEvent):
|
||||
"""Event emitted when a LLM call starts"""
|
||||
|
||||
type: str = "llm_call_started"
|
||||
messages: Union[str, List[Dict[str, str]]]
|
||||
tools: Optional[List[dict]] = None
|
||||
callbacks: Optional[List[Any]] = None
|
||||
available_functions: Optional[Dict[str, Any]] = None
|
||||
|
||||
|
||||
class LLMCallCompletedEvent(CrewEvent):
|
||||
"""Event emitted when a LLM call completes"""
|
||||
|
||||
type: str = "llm_call_completed"
|
||||
response: Any
|
||||
call_type: LLMCallType
|
||||
|
||||
|
||||
class LLMCallFailedEvent(CrewEvent):
|
||||
"""Event emitted when a LLM call fails"""
|
||||
|
||||
error: str
|
||||
type: str = "llm_call_failed"
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import Any, Optional
|
||||
from typing import Optional
|
||||
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.utilities.events.base_events import CrewEvent
|
||||
|
||||
@@ -44,6 +44,7 @@ def create_llm(
|
||||
# Extract attributes with explicit types
|
||||
model = (
|
||||
getattr(llm_value, "model_name", None)
|
||||
or getattr(llm_value, "model", None)
|
||||
or getattr(llm_value, "deployment_name", None)
|
||||
or str(llm_value)
|
||||
)
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
from typing import Any, Protocol, runtime_checkable
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class AgentExecutorProtocol(Protocol):
|
||||
"""Protocol defining the expected interface for an agent executor."""
|
||||
|
||||
@property
|
||||
def agent(self) -> Any: ...
|
||||
|
||||
@property
|
||||
def task(self) -> Any: ...
|
||||
@@ -30,8 +30,14 @@ class TokenCalcHandler(CustomLogger):
|
||||
if hasattr(usage, "prompt_tokens"):
|
||||
self.token_cost_process.sum_prompt_tokens(usage.prompt_tokens)
|
||||
if hasattr(usage, "completion_tokens"):
|
||||
self.token_cost_process.sum_completion_tokens(usage.completion_tokens)
|
||||
if hasattr(usage, "prompt_tokens_details") and usage.prompt_tokens_details:
|
||||
self.token_cost_process.sum_completion_tokens(
|
||||
usage.completion_tokens
|
||||
)
|
||||
if (
|
||||
hasattr(usage, "prompt_tokens_details")
|
||||
and usage.prompt_tokens_details
|
||||
and usage.prompt_tokens_details.cached_tokens
|
||||
):
|
||||
self.token_cost_process.sum_cached_prompt_tokens(
|
||||
usage.prompt_tokens_details.cached_tokens
|
||||
)
|
||||
|
||||
@@ -915,8 +915,6 @@ def test_tool_result_as_answer_is_the_final_answer_for_the_agent():
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_tool_usage_information_is_appended_to_agent():
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from crewai.tools import BaseTool
|
||||
|
||||
class MyCustomTool(BaseTool):
|
||||
@@ -926,36 +924,30 @@ def test_tool_usage_information_is_appended_to_agent():
|
||||
def _run(self) -> str:
|
||||
return "Howdy!"
|
||||
|
||||
fixed_datetime = datetime(2025, 2, 10, 12, 0, 0, tzinfo=UTC)
|
||||
with patch("datetime.datetime") as mock_datetime:
|
||||
mock_datetime.now.return_value = fixed_datetime
|
||||
mock_datetime.side_effect = lambda *args, **kw: datetime(*args, **kw)
|
||||
agent1 = Agent(
|
||||
role="Friendly Neighbor",
|
||||
goal="Make everyone feel welcome",
|
||||
backstory="You are the friendly neighbor",
|
||||
tools=[MyCustomTool(result_as_answer=True)],
|
||||
)
|
||||
|
||||
agent1 = Agent(
|
||||
role="Friendly Neighbor",
|
||||
goal="Make everyone feel welcome",
|
||||
backstory="You are the friendly neighbor",
|
||||
tools=[MyCustomTool(result_as_answer=True)],
|
||||
)
|
||||
greeting = Task(
|
||||
description="Say an appropriate greeting.",
|
||||
expected_output="The greeting.",
|
||||
agent=agent1,
|
||||
)
|
||||
tasks = [greeting]
|
||||
crew = Crew(agents=[agent1], tasks=tasks)
|
||||
|
||||
greeting = Task(
|
||||
description="Say an appropriate greeting.",
|
||||
expected_output="The greeting.",
|
||||
agent=agent1,
|
||||
)
|
||||
tasks = [greeting]
|
||||
crew = Crew(agents=[agent1], tasks=tasks)
|
||||
|
||||
crew.kickoff()
|
||||
assert agent1.tools_results == [
|
||||
{
|
||||
"result": "Howdy!",
|
||||
"tool_name": "Decide Greetings",
|
||||
"tool_args": {},
|
||||
"result_as_answer": True,
|
||||
"start_time": fixed_datetime,
|
||||
}
|
||||
]
|
||||
crew.kickoff()
|
||||
assert agent1.tools_results == [
|
||||
{
|
||||
"result": "Howdy!",
|
||||
"tool_name": "Decide Greetings",
|
||||
"tool_args": {},
|
||||
"result_as_answer": True,
|
||||
}
|
||||
]
|
||||
|
||||
|
||||
def test_agent_definition_based_on_dict():
|
||||
|
||||
@@ -833,6 +833,12 @@ def test_crew_verbose_output(capsys):
|
||||
|
||||
crew.kickoff()
|
||||
captured = capsys.readouterr()
|
||||
|
||||
# Filter out event listener logs (lines starting with '[')
|
||||
filtered_output = "\n".join(
|
||||
line for line in captured.out.split("\n") if not line.startswith("[")
|
||||
)
|
||||
|
||||
expected_strings = [
|
||||
"\x1b[1m\x1b[95m# Agent:\x1b[00m \x1b[1m\x1b[92mResearcher",
|
||||
"\x1b[00m\n\x1b[95m## Task:\x1b[00m \x1b[92mResearch AI advancements.",
|
||||
@@ -845,27 +851,19 @@ def test_crew_verbose_output(capsys):
|
||||
]
|
||||
|
||||
for expected_string in expected_strings:
|
||||
assert expected_string in captured.out
|
||||
assert expected_string in filtered_output
|
||||
|
||||
# Now test with verbose set to False
|
||||
crew.verbose = False
|
||||
crew._logger = Logger(verbose=False)
|
||||
crew.kickoff()
|
||||
expected_listener_logs = [
|
||||
"[🚀 CREW 'CREW' STARTED]",
|
||||
"[📋 TASK STARTED: RESEARCH AI ADVANCEMENTS.]",
|
||||
"[🤖 AGENT 'RESEARCHER' STARTED TASK]",
|
||||
"[✅ AGENT 'RESEARCHER' COMPLETED TASK]",
|
||||
"[✅ TASK COMPLETED: RESEARCH AI ADVANCEMENTS.]",
|
||||
"[📋 TASK STARTED: WRITE ABOUT AI IN HEALTHCARE.]",
|
||||
"[🤖 AGENT 'SENIOR WRITER' STARTED TASK]",
|
||||
"[✅ AGENT 'SENIOR WRITER' COMPLETED TASK]",
|
||||
"[✅ TASK COMPLETED: WRITE ABOUT AI IN HEALTHCARE.]",
|
||||
"[✅ CREW 'CREW' COMPLETED]",
|
||||
]
|
||||
captured = capsys.readouterr()
|
||||
for log in expected_listener_logs:
|
||||
assert log in captured.out
|
||||
filtered_output = "\n".join(
|
||||
line
|
||||
for line in captured.out.split("\n")
|
||||
if not line.startswith("[") and line.strip() and not line.startswith("\x1b")
|
||||
)
|
||||
assert filtered_output == ""
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
|
||||
@@ -654,3 +654,104 @@ def test_flow_plotting():
|
||||
assert isinstance(received_events[0], FlowPlotEvent)
|
||||
assert received_events[0].flow_name == "StatelessFlow"
|
||||
assert isinstance(received_events[0].timestamp, datetime)
|
||||
|
||||
|
||||
def test_multiple_routers_from_same_trigger():
|
||||
"""Test that multiple routers triggered by the same method all activate their listeners."""
|
||||
execution_order = []
|
||||
|
||||
class MultiRouterFlow(Flow):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
# Set diagnosed conditions to trigger all routers
|
||||
self.state["diagnosed_conditions"] = "DHA" # Contains D, H, and A
|
||||
|
||||
@start()
|
||||
def scan_medical(self):
|
||||
execution_order.append("scan_medical")
|
||||
return "scan_complete"
|
||||
|
||||
@router(scan_medical)
|
||||
def diagnose_conditions(self):
|
||||
execution_order.append("diagnose_conditions")
|
||||
return "diagnosis_complete"
|
||||
|
||||
@router(diagnose_conditions)
|
||||
def diabetes_router(self):
|
||||
execution_order.append("diabetes_router")
|
||||
if "D" in self.state["diagnosed_conditions"]:
|
||||
return "diabetes"
|
||||
return None
|
||||
|
||||
@listen("diabetes")
|
||||
def diabetes_analysis(self):
|
||||
execution_order.append("diabetes_analysis")
|
||||
return "diabetes_analysis_complete"
|
||||
|
||||
@router(diagnose_conditions)
|
||||
def hypertension_router(self):
|
||||
execution_order.append("hypertension_router")
|
||||
if "H" in self.state["diagnosed_conditions"]:
|
||||
return "hypertension"
|
||||
return None
|
||||
|
||||
@listen("hypertension")
|
||||
def hypertension_analysis(self):
|
||||
execution_order.append("hypertension_analysis")
|
||||
return "hypertension_analysis_complete"
|
||||
|
||||
@router(diagnose_conditions)
|
||||
def anemia_router(self):
|
||||
execution_order.append("anemia_router")
|
||||
if "A" in self.state["diagnosed_conditions"]:
|
||||
return "anemia"
|
||||
return None
|
||||
|
||||
@listen("anemia")
|
||||
def anemia_analysis(self):
|
||||
execution_order.append("anemia_analysis")
|
||||
return "anemia_analysis_complete"
|
||||
|
||||
flow = MultiRouterFlow()
|
||||
flow.kickoff()
|
||||
|
||||
# Verify all methods were called
|
||||
assert "scan_medical" in execution_order
|
||||
assert "diagnose_conditions" in execution_order
|
||||
|
||||
# Verify all routers were called
|
||||
assert "diabetes_router" in execution_order
|
||||
assert "hypertension_router" in execution_order
|
||||
assert "anemia_router" in execution_order
|
||||
|
||||
# Verify all listeners were called - this is the key test for the fix
|
||||
assert "diabetes_analysis" in execution_order
|
||||
assert "hypertension_analysis" in execution_order
|
||||
assert "anemia_analysis" in execution_order
|
||||
|
||||
# Verify execution order constraints
|
||||
assert execution_order.index("diagnose_conditions") > execution_order.index(
|
||||
"scan_medical"
|
||||
)
|
||||
|
||||
# All routers should execute after diagnose_conditions
|
||||
assert execution_order.index("diabetes_router") > execution_order.index(
|
||||
"diagnose_conditions"
|
||||
)
|
||||
assert execution_order.index("hypertension_router") > execution_order.index(
|
||||
"diagnose_conditions"
|
||||
)
|
||||
assert execution_order.index("anemia_router") > execution_order.index(
|
||||
"diagnose_conditions"
|
||||
)
|
||||
|
||||
# All analyses should execute after their respective routers
|
||||
assert execution_order.index("diabetes_analysis") > execution_order.index(
|
||||
"diabetes_router"
|
||||
)
|
||||
assert execution_order.index("hypertension_analysis") > execution_order.index(
|
||||
"hypertension_router"
|
||||
)
|
||||
assert execution_order.index("anemia_analysis") > execution_order.index(
|
||||
"anemia_router"
|
||||
)
|
||||
|
||||
@@ -6,7 +6,7 @@ import pytest
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
|
||||
from crewai.llm import LLM
|
||||
from crewai.llm import CONTEXT_WINDOW_USAGE_RATIO, LLM
|
||||
from crewai.utilities.events import crewai_event_bus
|
||||
from crewai.utilities.events.tool_usage_events import ToolExecutionErrorEvent
|
||||
from crewai.utilities.token_counter_callback import TokenCalcHandler
|
||||
@@ -285,6 +285,23 @@ def test_o3_mini_reasoning_effort_medium():
|
||||
assert isinstance(result, str)
|
||||
assert "Paris" in result
|
||||
|
||||
def test_context_window_validation():
|
||||
"""Test that context window validation works correctly."""
|
||||
# Test valid window size
|
||||
llm = LLM(model="o3-mini")
|
||||
assert llm.get_context_window_size() == int(200000 * CONTEXT_WINDOW_USAGE_RATIO)
|
||||
|
||||
# Test invalid window size
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
with patch.dict(
|
||||
"crewai.llm.LLM_CONTEXT_WINDOW_SIZES",
|
||||
{"test-model": 500}, # Below minimum
|
||||
clear=True,
|
||||
):
|
||||
llm = LLM(model="test-model")
|
||||
llm.get_context_window_size()
|
||||
assert "must be between 1024 and 2097152" in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
@pytest.fixture
|
||||
|
||||
@@ -111,6 +111,7 @@ def test_task_callback():
|
||||
task.execute_sync(agent=researcher)
|
||||
task_completed.assert_called_once_with(task.output)
|
||||
|
||||
assert task.output.key == task.key
|
||||
assert task.output.description == task.description
|
||||
assert task.output.expected_output == task.expected_output
|
||||
assert task.output.name == task.name
|
||||
@@ -149,6 +150,7 @@ def test_task_callback_returns_task_output():
|
||||
assert isinstance(callback_data, str)
|
||||
output_dict = json.loads(callback_data)
|
||||
expected_output = {
|
||||
"key": task.key,
|
||||
"description": task.description,
|
||||
"raw": "exported_ok",
|
||||
"pydantic": None,
|
||||
|
||||
@@ -1,360 +0,0 @@
|
||||
import os
|
||||
from datetime import UTC, datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
from uuid import UUID
|
||||
|
||||
import pytest
|
||||
|
||||
from crewai.traces.context import TraceContext
|
||||
from crewai.traces.enums import CrewType, RunType, TraceType
|
||||
from crewai.traces.models import (
|
||||
CrewTrace,
|
||||
FlowStepIO,
|
||||
LLMRequest,
|
||||
LLMResponse,
|
||||
)
|
||||
from crewai.traces.unified_trace_controller import (
|
||||
UnifiedTraceController,
|
||||
init_crew_main_trace,
|
||||
init_flow_main_trace,
|
||||
should_trace,
|
||||
trace_flow_step,
|
||||
trace_llm_call,
|
||||
)
|
||||
|
||||
|
||||
class TestUnifiedTraceController:
|
||||
@pytest.fixture
|
||||
def basic_trace_controller(self):
|
||||
return UnifiedTraceController(
|
||||
trace_type=TraceType.LLM_CALL,
|
||||
run_type=RunType.KICKOFF,
|
||||
crew_type=CrewType.CREW,
|
||||
run_id="test-run-id",
|
||||
agent_role="test-agent",
|
||||
task_name="test-task",
|
||||
task_description="test description",
|
||||
task_id="test-task-id",
|
||||
)
|
||||
|
||||
def test_initialization(self, basic_trace_controller):
|
||||
"""Test basic initialization of UnifiedTraceController"""
|
||||
assert basic_trace_controller.trace_type == TraceType.LLM_CALL
|
||||
assert basic_trace_controller.run_type == RunType.KICKOFF
|
||||
assert basic_trace_controller.crew_type == CrewType.CREW
|
||||
assert basic_trace_controller.run_id == "test-run-id"
|
||||
assert basic_trace_controller.agent_role == "test-agent"
|
||||
assert basic_trace_controller.task_name == "test-task"
|
||||
assert basic_trace_controller.task_description == "test description"
|
||||
assert basic_trace_controller.task_id == "test-task-id"
|
||||
assert basic_trace_controller.status == "running"
|
||||
assert isinstance(UUID(basic_trace_controller.trace_id), UUID)
|
||||
|
||||
def test_start_trace(self, basic_trace_controller):
|
||||
"""Test starting a trace"""
|
||||
result = basic_trace_controller.start_trace()
|
||||
assert result == basic_trace_controller
|
||||
assert basic_trace_controller.start_time is not None
|
||||
assert isinstance(basic_trace_controller.start_time, datetime)
|
||||
|
||||
def test_end_trace_success(self, basic_trace_controller):
|
||||
"""Test ending a trace successfully"""
|
||||
basic_trace_controller.start_trace()
|
||||
basic_trace_controller.end_trace(result={"test": "result"})
|
||||
|
||||
assert basic_trace_controller.end_time is not None
|
||||
assert basic_trace_controller.status == "completed"
|
||||
assert basic_trace_controller.error is None
|
||||
assert basic_trace_controller.context.get("response") == {"test": "result"}
|
||||
|
||||
def test_end_trace_with_error(self, basic_trace_controller):
|
||||
"""Test ending a trace with an error"""
|
||||
basic_trace_controller.start_trace()
|
||||
basic_trace_controller.end_trace(error="Test error occurred")
|
||||
|
||||
assert basic_trace_controller.end_time is not None
|
||||
assert basic_trace_controller.status == "error"
|
||||
assert basic_trace_controller.error == "Test error occurred"
|
||||
|
||||
def test_add_child_trace(self, basic_trace_controller):
|
||||
"""Test adding a child trace"""
|
||||
child_trace = {"id": "child-1", "type": "test"}
|
||||
basic_trace_controller.add_child_trace(child_trace)
|
||||
assert len(basic_trace_controller.children) == 1
|
||||
assert basic_trace_controller.children[0] == child_trace
|
||||
|
||||
def test_to_crew_trace_llm_call(self):
|
||||
"""Test converting to CrewTrace for LLM call"""
|
||||
test_messages = [{"role": "user", "content": "test"}]
|
||||
test_response = {
|
||||
"content": "test response",
|
||||
"finish_reason": "stop",
|
||||
}
|
||||
|
||||
controller = UnifiedTraceController(
|
||||
trace_type=TraceType.LLM_CALL,
|
||||
run_type=RunType.KICKOFF,
|
||||
crew_type=CrewType.CREW,
|
||||
run_id="test-run-id",
|
||||
context={
|
||||
"messages": test_messages,
|
||||
"temperature": 0.7,
|
||||
"max_tokens": 100,
|
||||
},
|
||||
)
|
||||
|
||||
# Set model and messages in the context
|
||||
controller.context["model"] = "gpt-4"
|
||||
controller.context["messages"] = test_messages
|
||||
|
||||
controller.start_trace()
|
||||
controller.end_trace(result=test_response)
|
||||
|
||||
crew_trace = controller.to_crew_trace()
|
||||
assert isinstance(crew_trace, CrewTrace)
|
||||
assert isinstance(crew_trace.request, LLMRequest)
|
||||
assert isinstance(crew_trace.response, LLMResponse)
|
||||
assert crew_trace.request.model == "gpt-4"
|
||||
assert crew_trace.request.messages == test_messages
|
||||
assert crew_trace.response.content == test_response["content"]
|
||||
assert crew_trace.response.finish_reason == test_response["finish_reason"]
|
||||
|
||||
def test_to_crew_trace_flow_step(self):
|
||||
"""Test converting to CrewTrace for flow step"""
|
||||
flow_step_data = {
|
||||
"function_name": "test_function",
|
||||
"inputs": {"param1": "value1"},
|
||||
"metadata": {"meta": "data"},
|
||||
}
|
||||
|
||||
controller = UnifiedTraceController(
|
||||
trace_type=TraceType.FLOW_STEP,
|
||||
run_type=RunType.KICKOFF,
|
||||
crew_type=CrewType.FLOW,
|
||||
run_id="test-run-id",
|
||||
flow_step=flow_step_data,
|
||||
)
|
||||
|
||||
controller.start_trace()
|
||||
controller.end_trace(result="test result")
|
||||
|
||||
crew_trace = controller.to_crew_trace()
|
||||
assert isinstance(crew_trace, CrewTrace)
|
||||
assert isinstance(crew_trace.flow_step, FlowStepIO)
|
||||
assert crew_trace.flow_step.function_name == "test_function"
|
||||
assert crew_trace.flow_step.inputs == {"param1": "value1"}
|
||||
assert crew_trace.flow_step.outputs == {"result": "test result"}
|
||||
|
||||
def test_should_trace(self):
|
||||
"""Test should_trace function"""
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
|
||||
assert should_trace() is True
|
||||
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "false"}):
|
||||
assert should_trace() is False
|
||||
|
||||
with patch.dict(os.environ, clear=True):
|
||||
assert should_trace() is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_trace_flow_step_decorator(self):
|
||||
"""Test trace_flow_step decorator"""
|
||||
|
||||
class TestFlow:
|
||||
flow_id = "test-flow-id"
|
||||
|
||||
@trace_flow_step
|
||||
async def test_method(self, method_name, method, *args, **kwargs):
|
||||
return "test result"
|
||||
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
|
||||
flow = TestFlow()
|
||||
result = await flow.test_method("test_method", lambda x: x, arg1="value1")
|
||||
assert result == "test result"
|
||||
|
||||
def test_trace_llm_call_decorator(self):
|
||||
"""Test trace_llm_call decorator"""
|
||||
|
||||
class TestLLM:
|
||||
model = "gpt-4"
|
||||
temperature = 0.7
|
||||
max_tokens = 100
|
||||
stop = None
|
||||
|
||||
def _get_execution_context(self):
|
||||
return MagicMock(), MagicMock()
|
||||
|
||||
def _get_new_messages(self, messages):
|
||||
return messages
|
||||
|
||||
def _get_new_tool_results(self, agent):
|
||||
return []
|
||||
|
||||
@trace_llm_call
|
||||
def test_method(self, params):
|
||||
return {
|
||||
"choices": [
|
||||
{
|
||||
"message": {"content": "test response"},
|
||||
"finish_reason": "stop",
|
||||
}
|
||||
],
|
||||
"usage": {
|
||||
"total_tokens": 50,
|
||||
"prompt_tokens": 20,
|
||||
"completion_tokens": 30,
|
||||
},
|
||||
}
|
||||
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
|
||||
llm = TestLLM()
|
||||
result = llm.test_method({"messages": []})
|
||||
assert result["choices"][0]["message"]["content"] == "test response"
|
||||
|
||||
def test_init_crew_main_trace_kickoff(self):
|
||||
"""Test init_crew_main_trace in kickoff mode"""
|
||||
trace_context = None
|
||||
|
||||
class TestCrew:
|
||||
id = "test-crew-id"
|
||||
_test = False
|
||||
_train = False
|
||||
|
||||
@init_crew_main_trace
|
||||
def test_method(self):
|
||||
nonlocal trace_context
|
||||
trace_context = TraceContext.get_current()
|
||||
return "test result"
|
||||
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
|
||||
crew = TestCrew()
|
||||
result = test_method(crew)
|
||||
assert result == "test result"
|
||||
assert trace_context is not None
|
||||
assert trace_context.trace_type == TraceType.LLM_CALL
|
||||
assert trace_context.run_type == RunType.KICKOFF
|
||||
assert trace_context.crew_type == CrewType.CREW
|
||||
assert trace_context.run_id == str(crew.id)
|
||||
|
||||
def test_init_crew_main_trace_test_mode(self):
|
||||
"""Test init_crew_main_trace in test mode"""
|
||||
trace_context = None
|
||||
|
||||
class TestCrew:
|
||||
id = "test-crew-id"
|
||||
_test = True
|
||||
_train = False
|
||||
|
||||
@init_crew_main_trace
|
||||
def test_method(self):
|
||||
nonlocal trace_context
|
||||
trace_context = TraceContext.get_current()
|
||||
return "test result"
|
||||
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
|
||||
crew = TestCrew()
|
||||
result = test_method(crew)
|
||||
assert result == "test result"
|
||||
assert trace_context is not None
|
||||
assert trace_context.run_type == RunType.TEST
|
||||
|
||||
def test_init_crew_main_trace_train_mode(self):
|
||||
"""Test init_crew_main_trace in train mode"""
|
||||
trace_context = None
|
||||
|
||||
class TestCrew:
|
||||
id = "test-crew-id"
|
||||
_test = False
|
||||
_train = True
|
||||
|
||||
@init_crew_main_trace
|
||||
def test_method(self):
|
||||
nonlocal trace_context
|
||||
trace_context = TraceContext.get_current()
|
||||
return "test result"
|
||||
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
|
||||
crew = TestCrew()
|
||||
result = test_method(crew)
|
||||
assert result == "test result"
|
||||
assert trace_context is not None
|
||||
assert trace_context.run_type == RunType.TRAIN
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_init_flow_main_trace(self):
|
||||
"""Test init_flow_main_trace decorator"""
|
||||
trace_context = None
|
||||
test_inputs = {"test": "input"}
|
||||
|
||||
class TestFlow:
|
||||
flow_id = "test-flow-id"
|
||||
|
||||
@init_flow_main_trace
|
||||
async def test_method(self, **kwargs):
|
||||
nonlocal trace_context
|
||||
trace_context = TraceContext.get_current()
|
||||
# Verify the context is set during execution
|
||||
assert trace_context.context["context"]["inputs"] == test_inputs
|
||||
return "test result"
|
||||
|
||||
with patch.dict(os.environ, {"CREWAI_ENABLE_TRACING": "true"}):
|
||||
flow = TestFlow()
|
||||
result = await flow.test_method(inputs=test_inputs)
|
||||
assert result == "test result"
|
||||
assert trace_context is not None
|
||||
assert trace_context.trace_type == TraceType.FLOW_STEP
|
||||
assert trace_context.crew_type == CrewType.FLOW
|
||||
assert trace_context.run_type == RunType.KICKOFF
|
||||
assert trace_context.run_id == str(flow.flow_id)
|
||||
assert trace_context.context["context"]["inputs"] == test_inputs
|
||||
|
||||
def test_trace_context_management(self):
|
||||
"""Test TraceContext management"""
|
||||
trace1 = UnifiedTraceController(
|
||||
trace_type=TraceType.LLM_CALL,
|
||||
run_type=RunType.KICKOFF,
|
||||
crew_type=CrewType.CREW,
|
||||
run_id="test-run-1",
|
||||
)
|
||||
|
||||
trace2 = UnifiedTraceController(
|
||||
trace_type=TraceType.FLOW_STEP,
|
||||
run_type=RunType.TEST,
|
||||
crew_type=CrewType.FLOW,
|
||||
run_id="test-run-2",
|
||||
)
|
||||
|
||||
# Test that context is initially empty
|
||||
assert TraceContext.get_current() is None
|
||||
|
||||
# Test setting and getting context
|
||||
with TraceContext.set_current(trace1):
|
||||
assert TraceContext.get_current() == trace1
|
||||
|
||||
# Test nested context
|
||||
with TraceContext.set_current(trace2):
|
||||
assert TraceContext.get_current() == trace2
|
||||
|
||||
# Test context restoration after nested block
|
||||
assert TraceContext.get_current() == trace1
|
||||
|
||||
# Test context cleanup after with block
|
||||
assert TraceContext.get_current() is None
|
||||
|
||||
def test_trace_context_error_handling(self):
|
||||
"""Test TraceContext error handling"""
|
||||
trace = UnifiedTraceController(
|
||||
trace_type=TraceType.LLM_CALL,
|
||||
run_type=RunType.KICKOFF,
|
||||
crew_type=CrewType.CREW,
|
||||
run_id="test-run",
|
||||
)
|
||||
|
||||
# Test that context is properly cleaned up even if an error occurs
|
||||
try:
|
||||
with TraceContext.set_current(trace):
|
||||
raise ValueError("Test error")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
assert TraceContext.get_current() is None
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,14 +1,9 @@
|
||||
interactions:
|
||||
- request:
|
||||
body: '{"model": "llama3.2:3b", "prompt": "### User:\nName: Alice Llama, Age:
|
||||
30\n\n### System:\nProduce JSON OUTPUT ONLY! Adhere to this format {\"name\":
|
||||
\"function_name\", \"arguments\":{\"argument_name\": \"argument_value\"}} The
|
||||
following functions are available to you:\n{''type'': ''function'', ''function'':
|
||||
{''name'': ''SimpleModel'', ''description'': ''Correctly extracted `SimpleModel`
|
||||
with all the required parameters with correct types'', ''parameters'': {''properties'':
|
||||
{''name'': {''title'': ''Name'', ''type'': ''string''}, ''age'': {''title'':
|
||||
''Age'', ''type'': ''integer''}}, ''required'': [''age'', ''name''], ''type'':
|
||||
''object''}}}\n\n\n", "options": {}, "stream": false, "format": "json"}'
|
||||
body: '{"model": "llama3.2:3b", "prompt": "### System:\nPlease convert the following
|
||||
text into valid JSON.\n\nOutput ONLY the valid JSON and nothing else.\n\nThe
|
||||
JSON must follow this format exactly:\n{\n \"name\": str,\n \"age\": int\n}\n\n###
|
||||
User:\nName: Alice Llama, Age: 30\n\n", "options": {"stop": []}, "stream": false}'
|
||||
headers:
|
||||
accept:
|
||||
- '*/*'
|
||||
@@ -17,23 +12,23 @@ interactions:
|
||||
connection:
|
||||
- keep-alive
|
||||
content-length:
|
||||
- '657'
|
||||
- '321'
|
||||
host:
|
||||
- localhost:11434
|
||||
user-agent:
|
||||
- litellm/1.57.4
|
||||
- litellm/1.60.2
|
||||
method: POST
|
||||
uri: http://localhost:11434/api/generate
|
||||
response:
|
||||
content: '{"model":"llama3.2:3b","created_at":"2025-01-15T20:47:11.926411Z","response":"{\"name\":
|
||||
\"SimpleModel\", \"arguments\":{\"name\": \"Alice Llama\", \"age\": 30}}","done":true,"done_reason":"stop","context":[128006,9125,128007,271,38766,1303,33025,2696,25,6790,220,2366,18,271,128009,128006,882,128007,271,14711,2724,512,678,25,30505,445,81101,11,13381,25,220,966,271,14711,744,512,1360,13677,4823,32090,27785,0,2467,6881,311,420,3645,5324,609,794,330,1723,1292,498,330,16774,23118,14819,1292,794,330,14819,3220,32075,578,2768,5865,527,2561,311,499,512,13922,1337,1232,364,1723,518,364,1723,1232,5473,609,1232,364,16778,1747,518,364,4789,1232,364,34192,398,28532,1595,16778,1747,63,449,682,279,2631,5137,449,4495,4595,518,364,14105,1232,5473,13495,1232,5473,609,1232,5473,2150,1232,364,678,518,364,1337,1232,364,928,25762,364,425,1232,5473,2150,1232,364,17166,518,364,1337,1232,364,11924,8439,2186,364,6413,1232,2570,425,518,364,609,4181,364,1337,1232,364,1735,23742,3818,128009,128006,78191,128007,271,5018,609,794,330,16778,1747,498,330,16774,23118,609,794,330,62786,445,81101,498,330,425,794,220,966,3500],"total_duration":3374470708,"load_duration":1075750500,"prompt_eval_count":167,"prompt_eval_duration":1871000000,"eval_count":24,"eval_duration":426000000}'
|
||||
content: '{"model":"llama3.2:3b","created_at":"2025-02-21T02:57:55.059392Z","response":"{\"name\":
|
||||
\"Alice Llama\", \"age\": 30}","done":true,"done_reason":"stop","context":[128006,9125,128007,271,38766,1303,33025,2696,25,6790,220,2366,18,271,128009,128006,882,128007,271,14711,744,512,5618,5625,279,2768,1495,1139,2764,4823,382,5207,27785,279,2764,4823,323,4400,775,382,791,4823,2011,1833,420,3645,7041,512,517,220,330,609,794,610,345,220,330,425,794,528,198,633,14711,2724,512,678,25,30505,445,81101,11,13381,25,220,966,271,128009,128006,78191,128007,271,5018,609,794,330,62786,445,81101,498,330,425,794,220,966,92],"total_duration":4675906000,"load_duration":836091458,"prompt_eval_count":82,"prompt_eval_duration":3561000000,"eval_count":15,"eval_duration":275000000}'
|
||||
headers:
|
||||
Content-Length:
|
||||
- '1263'
|
||||
- '761'
|
||||
Content-Type:
|
||||
- application/json; charset=utf-8
|
||||
Date:
|
||||
- Wed, 15 Jan 2025 20:47:12 GMT
|
||||
- Fri, 21 Feb 2025 02:57:55 GMT
|
||||
http_version: HTTP/1.1
|
||||
status_code: 200
|
||||
- request:
|
||||
@@ -52,7 +47,7 @@ interactions:
|
||||
host:
|
||||
- localhost:11434
|
||||
user-agent:
|
||||
- litellm/1.57.4
|
||||
- litellm/1.60.2
|
||||
method: POST
|
||||
uri: http://localhost:11434/api/show
|
||||
response:
|
||||
@@ -228,7 +223,7 @@ interactions:
|
||||
Reporting violations of the Acceptable Use Policy or unlicensed uses of Llama
|
||||
3.2: LlamaUseReport@meta.com\",\"modelfile\":\"# Modelfile generated by \\\"ollama
|
||||
show\\\"\\n# To build a new Modelfile based on this, replace FROM with:\\n#
|
||||
FROM llama3.2:3b\\n\\nFROM /Users/brandonhancock/.ollama/models/blobs/sha256-dde5aa3fc5ffc17176b5e8bdc82f587b24b2678c6c66101bf7da77af9f7ccdff\\nTEMPLATE
|
||||
FROM llama3.2:3b\\n\\nFROM /Users/joaomoura/.ollama/models/blobs/sha256-dde5aa3fc5ffc17176b5e8bdc82f587b24b2678c6c66101bf7da77af9f7ccdff\\nTEMPLATE
|
||||
\\\"\\\"\\\"\\u003c|start_header_id|\\u003esystem\\u003c|end_header_id|\\u003e\\n\\nCutting
|
||||
Knowledge Date: December 2023\\n\\n{{ if .System }}{{ .System }}\\n{{- end }}\\n{{-
|
||||
if .Tools }}When you receive a tool call response, use the output to format
|
||||
@@ -441,12 +436,12 @@ interactions:
|
||||
.Content }}\\n{{- end }}{{ if not $last }}\\u003c|eot_id|\\u003e{{ end }}\\n{{-
|
||||
else if eq .Role \\\"tool\\\" }}\\u003c|start_header_id|\\u003eipython\\u003c|end_header_id|\\u003e\\n\\n{{
|
||||
.Content }}\\u003c|eot_id|\\u003e{{ if $last }}\\u003c|start_header_id|\\u003eassistant\\u003c|end_header_id|\\u003e\\n\\n{{
|
||||
end }}\\n{{- end }}\\n{{- end }}\",\"details\":{\"parent_model\":\"\",\"format\":\"gguf\",\"family\":\"llama\",\"families\":[\"llama\"],\"parameter_size\":\"3.2B\",\"quantization_level\":\"Q4_K_M\"},\"model_info\":{\"general.architecture\":\"llama\",\"general.basename\":\"Llama-3.2\",\"general.file_type\":15,\"general.finetune\":\"Instruct\",\"general.languages\":[\"en\",\"de\",\"fr\",\"it\",\"pt\",\"hi\",\"es\",\"th\"],\"general.parameter_count\":3212749888,\"general.quantization_version\":2,\"general.size_label\":\"3B\",\"general.tags\":[\"facebook\",\"meta\",\"pytorch\",\"llama\",\"llama-3\",\"text-generation\"],\"general.type\":\"model\",\"llama.attention.head_count\":24,\"llama.attention.head_count_kv\":8,\"llama.attention.key_length\":128,\"llama.attention.layer_norm_rms_epsilon\":0.00001,\"llama.attention.value_length\":128,\"llama.block_count\":28,\"llama.context_length\":131072,\"llama.embedding_length\":3072,\"llama.feed_forward_length\":8192,\"llama.rope.dimension_count\":128,\"llama.rope.freq_base\":500000,\"llama.vocab_size\":128256,\"tokenizer.ggml.bos_token_id\":128000,\"tokenizer.ggml.eos_token_id\":128009,\"tokenizer.ggml.merges\":null,\"tokenizer.ggml.model\":\"gpt2\",\"tokenizer.ggml.pre\":\"llama-bpe\",\"tokenizer.ggml.token_type\":null,\"tokenizer.ggml.tokens\":null},\"modified_at\":\"2024-12-31T11:53:14.529771974-05:00\"}"
|
||||
end }}\\n{{- end }}\\n{{- end }}\",\"details\":{\"parent_model\":\"\",\"format\":\"gguf\",\"family\":\"llama\",\"families\":[\"llama\"],\"parameter_size\":\"3.2B\",\"quantization_level\":\"Q4_K_M\"},\"model_info\":{\"general.architecture\":\"llama\",\"general.basename\":\"Llama-3.2\",\"general.file_type\":15,\"general.finetune\":\"Instruct\",\"general.languages\":[\"en\",\"de\",\"fr\",\"it\",\"pt\",\"hi\",\"es\",\"th\"],\"general.parameter_count\":3212749888,\"general.quantization_version\":2,\"general.size_label\":\"3B\",\"general.tags\":[\"facebook\",\"meta\",\"pytorch\",\"llama\",\"llama-3\",\"text-generation\"],\"general.type\":\"model\",\"llama.attention.head_count\":24,\"llama.attention.head_count_kv\":8,\"llama.attention.key_length\":128,\"llama.attention.layer_norm_rms_epsilon\":0.00001,\"llama.attention.value_length\":128,\"llama.block_count\":28,\"llama.context_length\":131072,\"llama.embedding_length\":3072,\"llama.feed_forward_length\":8192,\"llama.rope.dimension_count\":128,\"llama.rope.freq_base\":500000,\"llama.vocab_size\":128256,\"tokenizer.ggml.bos_token_id\":128000,\"tokenizer.ggml.eos_token_id\":128009,\"tokenizer.ggml.merges\":null,\"tokenizer.ggml.model\":\"gpt2\",\"tokenizer.ggml.pre\":\"llama-bpe\",\"tokenizer.ggml.token_type\":null,\"tokenizer.ggml.tokens\":null},\"modified_at\":\"2025-02-20T18:55:09.150577031-08:00\"}"
|
||||
headers:
|
||||
Content-Type:
|
||||
- application/json; charset=utf-8
|
||||
Date:
|
||||
- Wed, 15 Jan 2025 20:47:12 GMT
|
||||
- Fri, 21 Feb 2025 02:57:55 GMT
|
||||
Transfer-Encoding:
|
||||
- chunked
|
||||
http_version: HTTP/1.1
|
||||
@@ -467,7 +462,7 @@ interactions:
|
||||
host:
|
||||
- localhost:11434
|
||||
user-agent:
|
||||
- litellm/1.57.4
|
||||
- litellm/1.60.2
|
||||
method: POST
|
||||
uri: http://localhost:11434/api/show
|
||||
response:
|
||||
@@ -643,7 +638,7 @@ interactions:
|
||||
Reporting violations of the Acceptable Use Policy or unlicensed uses of Llama
|
||||
3.2: LlamaUseReport@meta.com\",\"modelfile\":\"# Modelfile generated by \\\"ollama
|
||||
show\\\"\\n# To build a new Modelfile based on this, replace FROM with:\\n#
|
||||
FROM llama3.2:3b\\n\\nFROM /Users/brandonhancock/.ollama/models/blobs/sha256-dde5aa3fc5ffc17176b5e8bdc82f587b24b2678c6c66101bf7da77af9f7ccdff\\nTEMPLATE
|
||||
FROM llama3.2:3b\\n\\nFROM /Users/joaomoura/.ollama/models/blobs/sha256-dde5aa3fc5ffc17176b5e8bdc82f587b24b2678c6c66101bf7da77af9f7ccdff\\nTEMPLATE
|
||||
\\\"\\\"\\\"\\u003c|start_header_id|\\u003esystem\\u003c|end_header_id|\\u003e\\n\\nCutting
|
||||
Knowledge Date: December 2023\\n\\n{{ if .System }}{{ .System }}\\n{{- end }}\\n{{-
|
||||
if .Tools }}When you receive a tool call response, use the output to format
|
||||
@@ -856,12 +851,12 @@ interactions:
|
||||
.Content }}\\n{{- end }}{{ if not $last }}\\u003c|eot_id|\\u003e{{ end }}\\n{{-
|
||||
else if eq .Role \\\"tool\\\" }}\\u003c|start_header_id|\\u003eipython\\u003c|end_header_id|\\u003e\\n\\n{{
|
||||
.Content }}\\u003c|eot_id|\\u003e{{ if $last }}\\u003c|start_header_id|\\u003eassistant\\u003c|end_header_id|\\u003e\\n\\n{{
|
||||
end }}\\n{{- end }}\\n{{- end }}\",\"details\":{\"parent_model\":\"\",\"format\":\"gguf\",\"family\":\"llama\",\"families\":[\"llama\"],\"parameter_size\":\"3.2B\",\"quantization_level\":\"Q4_K_M\"},\"model_info\":{\"general.architecture\":\"llama\",\"general.basename\":\"Llama-3.2\",\"general.file_type\":15,\"general.finetune\":\"Instruct\",\"general.languages\":[\"en\",\"de\",\"fr\",\"it\",\"pt\",\"hi\",\"es\",\"th\"],\"general.parameter_count\":3212749888,\"general.quantization_version\":2,\"general.size_label\":\"3B\",\"general.tags\":[\"facebook\",\"meta\",\"pytorch\",\"llama\",\"llama-3\",\"text-generation\"],\"general.type\":\"model\",\"llama.attention.head_count\":24,\"llama.attention.head_count_kv\":8,\"llama.attention.key_length\":128,\"llama.attention.layer_norm_rms_epsilon\":0.00001,\"llama.attention.value_length\":128,\"llama.block_count\":28,\"llama.context_length\":131072,\"llama.embedding_length\":3072,\"llama.feed_forward_length\":8192,\"llama.rope.dimension_count\":128,\"llama.rope.freq_base\":500000,\"llama.vocab_size\":128256,\"tokenizer.ggml.bos_token_id\":128000,\"tokenizer.ggml.eos_token_id\":128009,\"tokenizer.ggml.merges\":null,\"tokenizer.ggml.model\":\"gpt2\",\"tokenizer.ggml.pre\":\"llama-bpe\",\"tokenizer.ggml.token_type\":null,\"tokenizer.ggml.tokens\":null},\"modified_at\":\"2024-12-31T11:53:14.529771974-05:00\"}"
|
||||
end }}\\n{{- end }}\\n{{- end }}\",\"details\":{\"parent_model\":\"\",\"format\":\"gguf\",\"family\":\"llama\",\"families\":[\"llama\"],\"parameter_size\":\"3.2B\",\"quantization_level\":\"Q4_K_M\"},\"model_info\":{\"general.architecture\":\"llama\",\"general.basename\":\"Llama-3.2\",\"general.file_type\":15,\"general.finetune\":\"Instruct\",\"general.languages\":[\"en\",\"de\",\"fr\",\"it\",\"pt\",\"hi\",\"es\",\"th\"],\"general.parameter_count\":3212749888,\"general.quantization_version\":2,\"general.size_label\":\"3B\",\"general.tags\":[\"facebook\",\"meta\",\"pytorch\",\"llama\",\"llama-3\",\"text-generation\"],\"general.type\":\"model\",\"llama.attention.head_count\":24,\"llama.attention.head_count_kv\":8,\"llama.attention.key_length\":128,\"llama.attention.layer_norm_rms_epsilon\":0.00001,\"llama.attention.value_length\":128,\"llama.block_count\":28,\"llama.context_length\":131072,\"llama.embedding_length\":3072,\"llama.feed_forward_length\":8192,\"llama.rope.dimension_count\":128,\"llama.rope.freq_base\":500000,\"llama.vocab_size\":128256,\"tokenizer.ggml.bos_token_id\":128000,\"tokenizer.ggml.eos_token_id\":128009,\"tokenizer.ggml.merges\":null,\"tokenizer.ggml.model\":\"gpt2\",\"tokenizer.ggml.pre\":\"llama-bpe\",\"tokenizer.ggml.token_type\":null,\"tokenizer.ggml.tokens\":null},\"modified_at\":\"2025-02-20T18:55:09.150577031-08:00\"}"
|
||||
headers:
|
||||
Content-Type:
|
||||
- application/json; charset=utf-8
|
||||
Date:
|
||||
- Wed, 15 Jan 2025 20:47:12 GMT
|
||||
- Fri, 21 Feb 2025 02:57:55 GMT
|
||||
Transfer-Encoding:
|
||||
- chunked
|
||||
http_version: HTTP/1.1
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -0,0 +1,236 @@
|
||||
interactions:
|
||||
- request:
|
||||
body: '{"messages": [{"role": "system", "content": "You are base_agent. You are
|
||||
a helpful assistant that just says hi\nYour personal goal is: Just say hi\nTo
|
||||
give my best complete final answer to the task respond using the exact following
|
||||
format:\n\nThought: I now can give a great answer\nFinal Answer: Your final
|
||||
answer must be the great and the most complete as possible, it must be outcome
|
||||
described.\n\nI MUST use these formats, my job depends on it!"}, {"role": "user",
|
||||
"content": "\nCurrent Task: Just say hi\n\nThis is the expected criteria for
|
||||
your final answer: hi\nyou MUST return the actual complete content as the final
|
||||
answer, not a summary.\n\nBegin! This is VERY important to you, use the tools
|
||||
available and give your best Final Answer, your job depends on it!\n\nThought:"}],
|
||||
"model": "gpt-4o-mini", "stop": ["\nObservation:"]}'
|
||||
headers:
|
||||
accept:
|
||||
- application/json
|
||||
accept-encoding:
|
||||
- gzip, deflate
|
||||
connection:
|
||||
- keep-alive
|
||||
content-length:
|
||||
- '838'
|
||||
content-type:
|
||||
- application/json
|
||||
host:
|
||||
- api.openai.com
|
||||
user-agent:
|
||||
- OpenAI/Python 1.61.0
|
||||
x-stainless-arch:
|
||||
- arm64
|
||||
x-stainless-async:
|
||||
- 'false'
|
||||
x-stainless-lang:
|
||||
- python
|
||||
x-stainless-os:
|
||||
- MacOS
|
||||
x-stainless-package-version:
|
||||
- 1.61.0
|
||||
x-stainless-raw-response:
|
||||
- 'true'
|
||||
x-stainless-retry-count:
|
||||
- '0'
|
||||
x-stainless-runtime:
|
||||
- CPython
|
||||
x-stainless-runtime-version:
|
||||
- 3.12.8
|
||||
method: POST
|
||||
uri: https://api.openai.com/v1/chat/completions
|
||||
response:
|
||||
content: "{\n \"id\": \"chatcmpl-B4VsaBZ4ec4b0ab4pkqWgyxTFVVfc\",\n \"object\":
|
||||
\"chat.completion\",\n \"created\": 1740415556,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
|
||||
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
|
||||
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
|
||||
Answer: hi\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
|
||||
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
|
||||
161,\n \"completion_tokens\": 12,\n \"total_tokens\": 173,\n \"prompt_tokens_details\":
|
||||
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
|
||||
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
|
||||
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
|
||||
\"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
|
||||
headers:
|
||||
CF-RAY:
|
||||
- 9170edc5da6f230e-SJC
|
||||
Connection:
|
||||
- keep-alive
|
||||
Content-Encoding:
|
||||
- gzip
|
||||
Content-Type:
|
||||
- application/json
|
||||
Date:
|
||||
- Mon, 24 Feb 2025 16:45:57 GMT
|
||||
Server:
|
||||
- cloudflare
|
||||
Set-Cookie:
|
||||
- __cf_bm=lvRw4Nyef7N35to64fj2_kHDfbZp0KSFbwgF5chYMRI-1740415557-1.0.1.1-o5BaN1FpBwv5Wq6zIlv0rCB28lk5hVI9wZQWU3pig1jgyAKDkYzTwZ0MlSR6v6TPIX9RfepjrO3.Gk3FEmcVRw;
|
||||
path=/; expires=Mon, 24-Feb-25 17:15:57 GMT; domain=.api.openai.com; HttpOnly;
|
||||
Secure; SameSite=None
|
||||
- _cfuvid=ySaVoTQvAcQyH5QoJQJDj75e5j8HwGFPOlFMAWEvXJk-1740415557302-0.0.1.1-604800000;
|
||||
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
|
||||
Transfer-Encoding:
|
||||
- chunked
|
||||
X-Content-Type-Options:
|
||||
- nosniff
|
||||
access-control-expose-headers:
|
||||
- X-Request-ID
|
||||
alt-svc:
|
||||
- h3=":443"; ma=86400
|
||||
cf-cache-status:
|
||||
- DYNAMIC
|
||||
openai-organization:
|
||||
- crewai-iuxna1
|
||||
openai-processing-ms:
|
||||
- '721'
|
||||
openai-version:
|
||||
- '2020-10-01'
|
||||
strict-transport-security:
|
||||
- max-age=31536000; includeSubDomains; preload
|
||||
x-ratelimit-limit-requests:
|
||||
- '30000'
|
||||
x-ratelimit-limit-tokens:
|
||||
- '150000000'
|
||||
x-ratelimit-remaining-requests:
|
||||
- '29999'
|
||||
x-ratelimit-remaining-tokens:
|
||||
- '149999808'
|
||||
x-ratelimit-reset-requests:
|
||||
- 2ms
|
||||
x-ratelimit-reset-tokens:
|
||||
- 0s
|
||||
x-request-id:
|
||||
- req_fc3b3bcd4382cddaa3c04ce7003e4857
|
||||
http_version: HTTP/1.1
|
||||
status_code: 200
|
||||
- request:
|
||||
body: '{"messages": [{"role": "system", "content": "You are Task Execution Evaluator.
|
||||
Evaluator agent for crew evaluation with precise capabilities to evaluate the
|
||||
performance of the agents in the crew based on the tasks they have performed\nYour
|
||||
personal goal is: Your goal is to evaluate the performance of the agents in
|
||||
the crew based on the tasks they have performed using score from 1 to 10 evaluating
|
||||
on completion, quality, and overall performance.\nTo give my best complete final
|
||||
answer to the task respond using the exact following format:\n\nThought: I now
|
||||
can give a great answer\nFinal Answer: Your final answer must be the great and
|
||||
the most complete as possible, it must be outcome described.\n\nI MUST use these
|
||||
formats, my job depends on it!"}, {"role": "user", "content": "\nCurrent Task:
|
||||
Based on the task description and the expected output, compare and evaluate
|
||||
the performance of the agents in the crew based on the Task Output they have
|
||||
performed using score from 1 to 10 evaluating on completion, quality, and overall
|
||||
performance.task_description: Just say hi task_expected_output: hi agent: base_agent
|
||||
agent_goal: Just say hi Task Output: hi\n\nThis is the expected criteria for
|
||||
your final answer: Evaluation Score from 1 to 10 based on the performance of
|
||||
the agents on the tasks\nyou MUST return the actual complete content as the
|
||||
final answer, not a summary.\nEnsure your final answer contains only the content
|
||||
in the following format: {\n \"quality\": float\n}\n\nEnsure the final output
|
||||
does not include any code block markers like ```json or ```python.\n\nBegin!
|
||||
This is VERY important to you, use the tools available and give your best Final
|
||||
Answer, your job depends on it!\n\nThought:"}], "model": "gpt-4o-mini", "stop":
|
||||
["\nObservation:"]}'
|
||||
headers:
|
||||
accept:
|
||||
- application/json
|
||||
accept-encoding:
|
||||
- gzip, deflate
|
||||
connection:
|
||||
- keep-alive
|
||||
content-length:
|
||||
- '1765'
|
||||
content-type:
|
||||
- application/json
|
||||
cookie:
|
||||
- __cf_bm=lvRw4Nyef7N35to64fj2_kHDfbZp0KSFbwgF5chYMRI-1740415557-1.0.1.1-o5BaN1FpBwv5Wq6zIlv0rCB28lk5hVI9wZQWU3pig1jgyAKDkYzTwZ0MlSR6v6TPIX9RfepjrO3.Gk3FEmcVRw;
|
||||
_cfuvid=ySaVoTQvAcQyH5QoJQJDj75e5j8HwGFPOlFMAWEvXJk-1740415557302-0.0.1.1-604800000
|
||||
host:
|
||||
- api.openai.com
|
||||
user-agent:
|
||||
- OpenAI/Python 1.61.0
|
||||
x-stainless-arch:
|
||||
- arm64
|
||||
x-stainless-async:
|
||||
- 'false'
|
||||
x-stainless-lang:
|
||||
- python
|
||||
x-stainless-os:
|
||||
- MacOS
|
||||
x-stainless-package-version:
|
||||
- 1.61.0
|
||||
x-stainless-raw-response:
|
||||
- 'true'
|
||||
x-stainless-retry-count:
|
||||
- '0'
|
||||
x-stainless-runtime:
|
||||
- CPython
|
||||
x-stainless-runtime-version:
|
||||
- 3.12.8
|
||||
method: POST
|
||||
uri: https://api.openai.com/v1/chat/completions
|
||||
response:
|
||||
content: "{\n \"id\": \"chatcmpl-B4Vsbd9AsRaJ2exDtWnHAwC8rIjfi\",\n \"object\":
|
||||
\"chat.completion\",\n \"created\": 1740415557,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
|
||||
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
|
||||
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
|
||||
Answer: { \\n \\\"quality\\\": 10 \\n} \",\n \"refusal\": null\n
|
||||
\ },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n
|
||||
\ ],\n \"usage\": {\n \"prompt_tokens\": 338,\n \"completion_tokens\":
|
||||
22,\n \"total_tokens\": 360,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
|
||||
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n
|
||||
\ \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
|
||||
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
|
||||
\"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
|
||||
headers:
|
||||
CF-RAY:
|
||||
- 9170edd15bb5230e-SJC
|
||||
Connection:
|
||||
- keep-alive
|
||||
Content-Encoding:
|
||||
- gzip
|
||||
Content-Type:
|
||||
- application/json
|
||||
Date:
|
||||
- Mon, 24 Feb 2025 16:45:58 GMT
|
||||
Server:
|
||||
- cloudflare
|
||||
Transfer-Encoding:
|
||||
- chunked
|
||||
X-Content-Type-Options:
|
||||
- nosniff
|
||||
access-control-expose-headers:
|
||||
- X-Request-ID
|
||||
alt-svc:
|
||||
- h3=":443"; ma=86400
|
||||
cf-cache-status:
|
||||
- DYNAMIC
|
||||
openai-organization:
|
||||
- crewai-iuxna1
|
||||
openai-processing-ms:
|
||||
- '860'
|
||||
openai-version:
|
||||
- '2020-10-01'
|
||||
strict-transport-security:
|
||||
- max-age=31536000; includeSubDomains; preload
|
||||
x-ratelimit-limit-requests:
|
||||
- '30000'
|
||||
x-ratelimit-limit-tokens:
|
||||
- '150000000'
|
||||
x-ratelimit-remaining-requests:
|
||||
- '29999'
|
||||
x-ratelimit-remaining-tokens:
|
||||
- '149999578'
|
||||
x-ratelimit-reset-requests:
|
||||
- 2ms
|
||||
x-ratelimit-reset-tokens:
|
||||
- 0s
|
||||
x-request-id:
|
||||
- req_fad452c2d10b5fc95809130912b08837
|
||||
http_version: HTTP/1.1
|
||||
status_code: 200
|
||||
version: 1
|
||||
103
tests/utilities/cassettes/test_llm_emits_call_failed_event.yaml
Normal file
103
tests/utilities/cassettes/test_llm_emits_call_failed_event.yaml
Normal file
@@ -0,0 +1,103 @@
|
||||
interactions:
|
||||
- request:
|
||||
body: '{"messages": [{"role": "user", "content": "Hello, how are you?"}], "model":
|
||||
"gpt-4o-mini", "stop": []}'
|
||||
headers:
|
||||
accept:
|
||||
- application/json
|
||||
accept-encoding:
|
||||
- gzip, deflate
|
||||
connection:
|
||||
- keep-alive
|
||||
content-length:
|
||||
- '102'
|
||||
content-type:
|
||||
- application/json
|
||||
cookie:
|
||||
- _cfuvid=IY8ppO70AMHr2skDSUsGh71zqHHdCQCZ3OvkPi26NBc-1740424913267-0.0.1.1-604800000;
|
||||
__cf_bm=fU6K5KZoDmgcEuF8_yWAYKUO5fKHh6q5.wDPnna393g-1740424913-1.0.1.1-2iOaq3JVGWs439V0HxJee0IC9HdJm7dPkeJorD.AGw0YwkngRPM8rrTzn_7ht1BkbOauEezj.wPKcBz18gIYUg
|
||||
host:
|
||||
- api.openai.com
|
||||
user-agent:
|
||||
- OpenAI/Python 1.61.0
|
||||
x-stainless-arch:
|
||||
- arm64
|
||||
x-stainless-async:
|
||||
- 'false'
|
||||
x-stainless-lang:
|
||||
- python
|
||||
x-stainless-os:
|
||||
- MacOS
|
||||
x-stainless-package-version:
|
||||
- 1.61.0
|
||||
x-stainless-raw-response:
|
||||
- 'true'
|
||||
x-stainless-retry-count:
|
||||
- '0'
|
||||
x-stainless-runtime:
|
||||
- CPython
|
||||
x-stainless-runtime-version:
|
||||
- 3.12.8
|
||||
method: POST
|
||||
uri: https://api.openai.com/v1/chat/completions
|
||||
response:
|
||||
content: "{\n \"id\": \"chatcmpl-B4YLA2SrC2rwdVQ3U87G5a0P5lsLw\",\n \"object\":
|
||||
\"chat.completion\",\n \"created\": 1740425016,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
|
||||
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
|
||||
\"assistant\",\n \"content\": \"Hello! I'm just a computer program, so
|
||||
I don't have feelings, but I'm here and ready to help you. How can I assist
|
||||
you today?\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
|
||||
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
|
||||
13,\n \"completion_tokens\": 30,\n \"total_tokens\": 43,\n \"prompt_tokens_details\":
|
||||
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
|
||||
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
|
||||
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
|
||||
\"default\",\n \"system_fingerprint\": \"fp_709714d124\"\n}\n"
|
||||
headers:
|
||||
CF-RAY:
|
||||
- 9171d4c0ed44236e-SJC
|
||||
Connection:
|
||||
- keep-alive
|
||||
Content-Encoding:
|
||||
- gzip
|
||||
Content-Type:
|
||||
- application/json
|
||||
Date:
|
||||
- Mon, 24 Feb 2025 19:23:38 GMT
|
||||
Server:
|
||||
- cloudflare
|
||||
Transfer-Encoding:
|
||||
- chunked
|
||||
X-Content-Type-Options:
|
||||
- nosniff
|
||||
access-control-expose-headers:
|
||||
- X-Request-ID
|
||||
alt-svc:
|
||||
- h3=":443"; ma=86400
|
||||
cf-cache-status:
|
||||
- DYNAMIC
|
||||
openai-organization:
|
||||
- crewai-iuxna1
|
||||
openai-processing-ms:
|
||||
- '1954'
|
||||
openai-version:
|
||||
- '2020-10-01'
|
||||
strict-transport-security:
|
||||
- max-age=31536000; includeSubDomains; preload
|
||||
x-ratelimit-limit-requests:
|
||||
- '30000'
|
||||
x-ratelimit-limit-tokens:
|
||||
- '150000000'
|
||||
x-ratelimit-remaining-requests:
|
||||
- '29999'
|
||||
x-ratelimit-remaining-tokens:
|
||||
- '149999978'
|
||||
x-ratelimit-reset-requests:
|
||||
- 2ms
|
||||
x-ratelimit-reset-tokens:
|
||||
- 0s
|
||||
x-request-id:
|
||||
- req_ea2703502b8827e4297cd2a7bae9d9c8
|
||||
http_version: HTTP/1.1
|
||||
status_code: 200
|
||||
version: 1
|
||||
108
tests/utilities/cassettes/test_llm_emits_call_started_event.yaml
Normal file
108
tests/utilities/cassettes/test_llm_emits_call_started_event.yaml
Normal file
@@ -0,0 +1,108 @@
|
||||
interactions:
|
||||
- request:
|
||||
body: '{"messages": [{"role": "user", "content": "Hello, how are you?"}], "model":
|
||||
"gpt-4o-mini", "stop": []}'
|
||||
headers:
|
||||
accept:
|
||||
- application/json
|
||||
accept-encoding:
|
||||
- gzip, deflate
|
||||
connection:
|
||||
- keep-alive
|
||||
content-length:
|
||||
- '102'
|
||||
content-type:
|
||||
- application/json
|
||||
cookie:
|
||||
- _cfuvid=GefCcEtb_Gem93E4a9Hvt3Xyof1YQZVJAXBb9I6pEUs-1739398417375-0.0.1.1-604800000
|
||||
host:
|
||||
- api.openai.com
|
||||
user-agent:
|
||||
- OpenAI/Python 1.61.0
|
||||
x-stainless-arch:
|
||||
- arm64
|
||||
x-stainless-async:
|
||||
- 'false'
|
||||
x-stainless-lang:
|
||||
- python
|
||||
x-stainless-os:
|
||||
- MacOS
|
||||
x-stainless-package-version:
|
||||
- 1.61.0
|
||||
x-stainless-raw-response:
|
||||
- 'true'
|
||||
x-stainless-retry-count:
|
||||
- '0'
|
||||
x-stainless-runtime:
|
||||
- CPython
|
||||
x-stainless-runtime-version:
|
||||
- 3.12.8
|
||||
method: POST
|
||||
uri: https://api.openai.com/v1/chat/completions
|
||||
response:
|
||||
content: "{\n \"id\": \"chatcmpl-B4YJU8IWKGyBQtAyPDRd3SFI2flYR\",\n \"object\":
|
||||
\"chat.completion\",\n \"created\": 1740424912,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
|
||||
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
|
||||
\"assistant\",\n \"content\": \"Hello! I'm just a computer program, so
|
||||
I don't have feelings, but I'm here and ready to help you. How can I assist
|
||||
you today?\",\n \"refusal\": null\n },\n \"logprobs\": null,\n
|
||||
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
|
||||
13,\n \"completion_tokens\": 30,\n \"total_tokens\": 43,\n \"prompt_tokens_details\":
|
||||
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
|
||||
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
|
||||
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
|
||||
\"default\",\n \"system_fingerprint\": \"fp_7fcd609668\"\n}\n"
|
||||
headers:
|
||||
CF-RAY:
|
||||
- 9171d230d8ed7ae0-SJC
|
||||
Connection:
|
||||
- keep-alive
|
||||
Content-Encoding:
|
||||
- gzip
|
||||
Content-Type:
|
||||
- application/json
|
||||
Date:
|
||||
- Mon, 24 Feb 2025 19:21:53 GMT
|
||||
Server:
|
||||
- cloudflare
|
||||
Set-Cookie:
|
||||
- __cf_bm=fU6K5KZoDmgcEuF8_yWAYKUO5fKHh6q5.wDPnna393g-1740424913-1.0.1.1-2iOaq3JVGWs439V0HxJee0IC9HdJm7dPkeJorD.AGw0YwkngRPM8rrTzn_7ht1BkbOauEezj.wPKcBz18gIYUg;
|
||||
path=/; expires=Mon, 24-Feb-25 19:51:53 GMT; domain=.api.openai.com; HttpOnly;
|
||||
Secure; SameSite=None
|
||||
- _cfuvid=IY8ppO70AMHr2skDSUsGh71zqHHdCQCZ3OvkPi26NBc-1740424913267-0.0.1.1-604800000;
|
||||
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
|
||||
Transfer-Encoding:
|
||||
- chunked
|
||||
X-Content-Type-Options:
|
||||
- nosniff
|
||||
access-control-expose-headers:
|
||||
- X-Request-ID
|
||||
alt-svc:
|
||||
- h3=":443"; ma=86400
|
||||
cf-cache-status:
|
||||
- DYNAMIC
|
||||
openai-organization:
|
||||
- crewai-iuxna1
|
||||
openai-processing-ms:
|
||||
- '993'
|
||||
openai-version:
|
||||
- '2020-10-01'
|
||||
strict-transport-security:
|
||||
- max-age=31536000; includeSubDomains; preload
|
||||
x-ratelimit-limit-requests:
|
||||
- '30000'
|
||||
x-ratelimit-limit-tokens:
|
||||
- '150000000'
|
||||
x-ratelimit-remaining-requests:
|
||||
- '29999'
|
||||
x-ratelimit-remaining-tokens:
|
||||
- '149999978'
|
||||
x-ratelimit-reset-requests:
|
||||
- 2ms
|
||||
x-ratelimit-reset-tokens:
|
||||
- 0s
|
||||
x-request-id:
|
||||
- req_d9c4d49185e97b1797061efc1e55d811
|
||||
http_version: HTTP/1.1
|
||||
status_code: 200
|
||||
version: 1
|
||||
@@ -1,4 +1,5 @@
|
||||
import json
|
||||
import os
|
||||
from typing import Dict, List, Optional
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
@@ -220,10 +221,13 @@ def test_get_conversion_instructions_gpt():
|
||||
supports_function_calling.return_value = True
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
model_schema = PydanticSchemaParser(model=SimpleModel).get_schema()
|
||||
assert (
|
||||
instructions
|
||||
== f"Please convert the following text into valid JSON.\n\nThe JSON should follow this schema:\n```json\n{model_schema}\n```"
|
||||
expected_instructions = (
|
||||
"Please convert the following text into valid JSON.\n\n"
|
||||
"Output ONLY the valid JSON and nothing else.\n\n"
|
||||
"The JSON must follow this schema exactly:\n```json\n"
|
||||
f"{model_schema}\n```"
|
||||
)
|
||||
assert instructions == expected_instructions
|
||||
|
||||
|
||||
def test_get_conversion_instructions_non_gpt():
|
||||
@@ -346,12 +350,17 @@ def test_convert_with_instructions():
|
||||
assert output.age == 30
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
# Skip tests that call external APIs when running in CI/CD
|
||||
skip_external_api = pytest.mark.skipif(
|
||||
os.getenv("CI") is not None, reason="Skipping tests that call external API in CI/CD"
|
||||
)
|
||||
|
||||
|
||||
@skip_external_api
|
||||
@pytest.mark.vcr(filter_headers=["authorization"], record_mode="once")
|
||||
def test_converter_with_llama3_2_model():
|
||||
llm = LLM(model="ollama/llama3.2:3b", base_url="http://localhost:11434")
|
||||
|
||||
sample_text = "Name: Alice Llama, Age: 30"
|
||||
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
@@ -359,19 +368,17 @@ def test_converter_with_llama3_2_model():
|
||||
model=SimpleModel,
|
||||
instructions=instructions,
|
||||
)
|
||||
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "Alice Llama"
|
||||
assert output.age == 30
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
@skip_external_api
|
||||
@pytest.mark.vcr(filter_headers=["authorization"], record_mode="once")
|
||||
def test_converter_with_llama3_1_model():
|
||||
llm = LLM(model="ollama/llama3.1", base_url="http://localhost:11434")
|
||||
sample_text = "Name: Alice Llama, Age: 30"
|
||||
|
||||
instructions = get_conversion_instructions(SimpleModel, llm)
|
||||
converter = Converter(
|
||||
llm=llm,
|
||||
@@ -379,14 +386,19 @@ def test_converter_with_llama3_1_model():
|
||||
model=SimpleModel,
|
||||
instructions=instructions,
|
||||
)
|
||||
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert isinstance(output, SimpleModel)
|
||||
assert output.name == "Alice Llama"
|
||||
assert output.age == 30
|
||||
|
||||
|
||||
# Skip tests that call external APIs when running in CI/CD
|
||||
skip_external_api = pytest.mark.skipif(
|
||||
os.getenv("CI") is not None, reason="Skipping tests that call external API in CI/CD"
|
||||
)
|
||||
|
||||
|
||||
@skip_external_api
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_converter_with_nested_model():
|
||||
llm = LLM(model="gpt-4o-mini")
|
||||
@@ -563,7 +575,7 @@ def test_converter_with_ambiguous_input():
|
||||
with pytest.raises(ConverterError) as exc_info:
|
||||
output = converter.to_pydantic()
|
||||
|
||||
assert "validation error" in str(exc_info.value).lower()
|
||||
assert "failed to convert text into a pydantic model" in str(exc_info.value).lower()
|
||||
|
||||
|
||||
# Tests for function calling support
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import json
|
||||
from datetime import datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
import pytest
|
||||
from pydantic import Field
|
||||
@@ -9,9 +8,9 @@ from crewai.agent import Agent
|
||||
from crewai.agents.crew_agent_executor import CrewAgentExecutor
|
||||
from crewai.crew import Crew
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from crewai.llm import LLM
|
||||
from crewai.task import Task
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.tools.tool_usage import ToolUsage
|
||||
from crewai.utilities.events.agent_events import (
|
||||
AgentExecutionCompletedEvent,
|
||||
AgentExecutionErrorEvent,
|
||||
@@ -21,8 +20,11 @@ from crewai.utilities.events.crew_events import (
|
||||
CrewKickoffCompletedEvent,
|
||||
CrewKickoffFailedEvent,
|
||||
CrewKickoffStartedEvent,
|
||||
CrewTestCompletedEvent,
|
||||
CrewTestStartedEvent,
|
||||
)
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.event_listener import EventListener
|
||||
from crewai.utilities.events.event_types import ToolUsageFinishedEvent
|
||||
from crewai.utilities.events.flow_events import (
|
||||
FlowCreatedEvent,
|
||||
@@ -31,6 +33,12 @@ from crewai.utilities.events.flow_events import (
|
||||
MethodExecutionFailedEvent,
|
||||
MethodExecutionStartedEvent,
|
||||
)
|
||||
from crewai.utilities.events.llm_events import (
|
||||
LLMCallCompletedEvent,
|
||||
LLMCallFailedEvent,
|
||||
LLMCallStartedEvent,
|
||||
LLMCallType,
|
||||
)
|
||||
from crewai.utilities.events.task_events import (
|
||||
TaskCompletedEvent,
|
||||
TaskFailedEvent,
|
||||
@@ -52,26 +60,35 @@ base_task = Task(
|
||||
expected_output="hi",
|
||||
agent=base_agent,
|
||||
)
|
||||
event_listener = EventListener()
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_crew_emits_start_kickoff_event():
|
||||
received_events = []
|
||||
mock_span = Mock()
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(CrewKickoffStartedEvent)
|
||||
def handle_crew_start(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
|
||||
@crewai_event_bus.on(CrewKickoffStartedEvent)
|
||||
def handle_crew_start(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
|
||||
with (
|
||||
patch.object(
|
||||
event_listener._telemetry, "crew_execution_span", return_value=mock_span
|
||||
) as mock_crew_execution_span,
|
||||
patch.object(
|
||||
event_listener._telemetry, "end_crew", return_value=mock_span
|
||||
) as mock_crew_ended,
|
||||
):
|
||||
crew.kickoff()
|
||||
mock_crew_execution_span.assert_called_once_with(crew, None)
|
||||
mock_crew_ended.assert_called_once_with(crew, "hi")
|
||||
|
||||
assert len(received_events) == 1
|
||||
assert received_events[0].crew_name == "TestCrew"
|
||||
assert isinstance(received_events[0].timestamp, datetime)
|
||||
assert received_events[0].type == "crew_kickoff_started"
|
||||
assert len(received_events) == 1
|
||||
assert received_events[0].crew_name == "TestCrew"
|
||||
assert isinstance(received_events[0].timestamp, datetime)
|
||||
assert received_events[0].type == "crew_kickoff_started"
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
@@ -92,6 +109,45 @@ def test_crew_emits_end_kickoff_event():
|
||||
assert received_events[0].type == "crew_kickoff_completed"
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_crew_emits_test_kickoff_type_event():
|
||||
received_events = []
|
||||
mock_span = Mock()
|
||||
|
||||
@crewai_event_bus.on(CrewTestStartedEvent)
|
||||
def handle_crew_end(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
@crewai_event_bus.on(CrewTestCompletedEvent)
|
||||
def handle_crew_test_end(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
eval_llm = LLM(model="gpt-4o-mini")
|
||||
with (
|
||||
patch.object(
|
||||
event_listener._telemetry, "test_execution_span", return_value=mock_span
|
||||
) as mock_crew_execution_span,
|
||||
):
|
||||
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
|
||||
crew.test(n_iterations=1, eval_llm=eval_llm)
|
||||
|
||||
# Verify the call was made with correct argument types and values
|
||||
assert mock_crew_execution_span.call_count == 1
|
||||
args = mock_crew_execution_span.call_args[0]
|
||||
assert isinstance(args[0], Crew)
|
||||
assert args[1] == 1
|
||||
assert args[2] is None
|
||||
assert args[3] == eval_llm
|
||||
|
||||
assert len(received_events) == 2
|
||||
assert received_events[0].crew_name == "TestCrew"
|
||||
assert isinstance(received_events[0].timestamp, datetime)
|
||||
assert received_events[0].type == "crew_test_started"
|
||||
assert received_events[1].crew_name == "TestCrew"
|
||||
assert isinstance(received_events[1].timestamp, datetime)
|
||||
assert received_events[1].type == "crew_test_completed"
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_crew_emits_kickoff_failed_event():
|
||||
received_events = []
|
||||
@@ -142,9 +198,20 @@ def test_crew_emits_end_task_event():
|
||||
def handle_task_end(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
mock_span = Mock()
|
||||
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
|
||||
with (
|
||||
patch.object(
|
||||
event_listener._telemetry, "task_started", return_value=mock_span
|
||||
) as mock_task_started,
|
||||
patch.object(
|
||||
event_listener._telemetry, "task_ended", return_value=mock_span
|
||||
) as mock_task_ended,
|
||||
):
|
||||
crew.kickoff()
|
||||
|
||||
crew.kickoff()
|
||||
mock_task_started.assert_called_once_with(crew=crew, task=base_task)
|
||||
mock_task_ended.assert_called_once_with(mock_span, base_task, crew)
|
||||
|
||||
assert len(received_events) == 1
|
||||
assert isinstance(received_events[0].timestamp, datetime)
|
||||
@@ -334,24 +401,29 @@ def test_tools_emits_error_events():
|
||||
|
||||
def test_flow_emits_start_event():
|
||||
received_events = []
|
||||
mock_span = Mock()
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
@crewai_event_bus.on(FlowStartedEvent)
|
||||
def handle_flow_start(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
@crewai_event_bus.on(FlowStartedEvent)
|
||||
def handle_flow_start(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
class TestFlow(Flow[dict]):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
class TestFlow(Flow[dict]):
|
||||
@start()
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
event_listener._telemetry, "flow_execution_span", return_value=mock_span
|
||||
) as mock_flow_execution_span,
|
||||
):
|
||||
flow = TestFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert len(received_events) == 1
|
||||
assert received_events[0].flow_name == "TestFlow"
|
||||
assert received_events[0].type == "flow_started"
|
||||
mock_flow_execution_span.assert_called_once_with("TestFlow", ["begin"])
|
||||
assert len(received_events) == 1
|
||||
assert received_events[0].flow_name == "TestFlow"
|
||||
assert received_events[0].type == "flow_started"
|
||||
|
||||
|
||||
def test_flow_emits_finish_event():
|
||||
@@ -455,6 +527,7 @@ def test_multiple_handlers_for_same_event():
|
||||
|
||||
def test_flow_emits_created_event():
|
||||
received_events = []
|
||||
mock_span = Mock()
|
||||
|
||||
@crewai_event_bus.on(FlowCreatedEvent)
|
||||
def handle_flow_created(source, event):
|
||||
@@ -465,8 +538,15 @@ def test_flow_emits_created_event():
|
||||
def begin(self):
|
||||
return "started"
|
||||
|
||||
flow = TestFlow()
|
||||
flow.kickoff()
|
||||
with (
|
||||
patch.object(
|
||||
event_listener._telemetry, "flow_creation_span", return_value=mock_span
|
||||
) as mock_flow_creation_span,
|
||||
):
|
||||
flow = TestFlow()
|
||||
flow.kickoff()
|
||||
|
||||
mock_flow_creation_span.assert_called_once_with("TestFlow")
|
||||
|
||||
assert len(received_events) == 1
|
||||
assert received_events[0].flow_name == "TestFlow"
|
||||
@@ -495,3 +575,43 @@ def test_flow_emits_method_execution_failed_event():
|
||||
assert received_events[0].flow_name == "TestFlow"
|
||||
assert received_events[0].type == "method_execution_failed"
|
||||
assert received_events[0].error == error
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_llm_emits_call_started_event():
|
||||
received_events = []
|
||||
|
||||
@crewai_event_bus.on(LLMCallStartedEvent)
|
||||
def handle_llm_call_started(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
@crewai_event_bus.on(LLMCallCompletedEvent)
|
||||
def handle_llm_call_completed(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
llm = LLM(model="gpt-4o-mini")
|
||||
llm.call("Hello, how are you?")
|
||||
|
||||
assert len(received_events) == 2
|
||||
assert received_events[0].type == "llm_call_started"
|
||||
assert received_events[1].type == "llm_call_completed"
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_llm_emits_call_failed_event():
|
||||
received_events = []
|
||||
|
||||
@crewai_event_bus.on(LLMCallFailedEvent)
|
||||
def handle_llm_call_failed(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
error_message = "Simulated LLM call failure"
|
||||
with patch("crewai.llm.litellm.completion", side_effect=Exception(error_message)):
|
||||
llm = LLM(model="gpt-4o-mini")
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
llm.call("Hello, how are you?")
|
||||
|
||||
assert str(exc_info.value) == error_message
|
||||
assert len(received_events) == 1
|
||||
assert received_events[0].type == "llm_call_failed"
|
||||
assert received_events[0].error == error_message
|
||||
|
||||
Reference in New Issue
Block a user