Compare commits

..

7 Commits

Author SHA1 Message Date
Brandon Hancock
0e14b82124 Fix failing test 2025-02-24 16:02:09 -05:00
Brandon Hancock (bhancock_ai)
38d92d1aaf Merge branch 'main' into revert-90f1bee 2025-02-24 15:46:50 -05:00
Lorenze Jay
70b5e753a5 Decoupling telemetry and ensure tests (#2212)
* feat: Enhance event listener and telemetry tracking

- Update event listener to improve telemetry span handling
- Add execution_span field to Task for better tracing
- Modify event handling in EventListener to use new span tracking
- Remove debug print statements
- Improve test coverage for crew and flow events
- Update cassettes to reflect new event tracking behavior

* Remove telemetry references from Crew class

- Remove Telemetry import and initialization from Crew class
- Delete _telemetry attribute from class configuration
- Clean up unused telemetry-related code

* test: Improve crew verbose output test with event log filtering

- Filter out event listener logs in verbose output test
- Ensure no output when verbose is set to False
- Enhance test coverage for crew logging behavior

* dropped comment

* refactor: Improve telemetry span tracking in EventListener

- Remove `execution_span` from Task class
- Add `execution_spans` dictionary to EventListener to track spans
- Update task event handlers to use new span tracking mechanism
- Simplify span management across task lifecycle events

* lint
2025-02-24 15:44:41 -05:00
Brandon Hancock (bhancock_ai)
1f8b90e391 Merge branch 'main' into revert-90f1bee 2025-02-24 15:18:22 -05:00
Brandon Hancock
8ac6f6a536 Fix issues with flows post merge 2025-02-24 10:47:25 -05:00
Brandon Hancock (bhancock_ai)
a1cb222f3a Merge branch 'main' into revert-90f1bee 2025-02-24 10:34:52 -05:00
Brandon Hancock
6d846c0024 Revert "feat: add prompt observability code (#2027)"
This reverts commit 90f1bee602.
2025-02-24 10:31:52 -05:00
13 changed files with 70 additions and 205 deletions

View File

@@ -506,7 +506,7 @@ my_crew = Crew(
)
```
### Resetting Memory via cli
### Resetting Memory
```shell
crewai reset-memories [OPTIONS]
@@ -520,46 +520,8 @@ crewai reset-memories [OPTIONS]
| `-s`, `--short` | Reset SHORT TERM memory. | Flag (boolean) | False |
| `-e`, `--entities` | Reset ENTITIES memory. | Flag (boolean) | False |
| `-k`, `--kickoff-outputs` | Reset LATEST KICKOFF TASK OUTPUTS. | Flag (boolean) | False |
| `-kn`, `--knowledge` | Reset KNOWLEDEGE storage | Flag (boolean) | False |
| `-a`, `--all` | Reset ALL memories. | Flag (boolean) | False |
Note: To use the cli command you need to have your crew in a file called crew.py in the same directory.
### Resetting Memory via crew object
```python
my_crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential,
memory=True,
verbose=True,
embedder={
"provider": "custom",
"config": {
"embedder": CustomEmbedder()
}
}
)
my_crew.reset_memories(command_type = 'all') # Resets all the memory
```
#### Resetting Memory Options
| Command Type | Description |
| :----------------- | :------------------------------- |
| `long` | Reset LONG TERM memory. |
| `short` | Reset SHORT TERM memory. |
| `entities` | Reset ENTITIES memory. |
| `kickoff_outputs` | Reset LATEST KICKOFF TASK OUTPUTS. |
| `knowledge` | Reset KNOWLEDGE memory. |
| `all` | Reset ALL memories. |
## Benefits of Using CrewAI's Memory System

View File

@@ -54,8 +54,7 @@ coding_agent = Agent(
# Create a task that requires code execution
data_analysis_task = Task(
description="Analyze the given dataset and calculate the average age of participants. Ages: {ages}",
agent=coding_agent,
expected_output="The average age of the participants."
agent=coding_agent
)
# Create a crew and add the task
@@ -117,4 +116,4 @@ async def async_multiple_crews():
# Run the async function
asyncio.run(async_multiple_crews())
```
```

View File

@@ -114,6 +114,7 @@ class Agent(BaseAgent):
@model_validator(mode="after")
def post_init_setup(self):
self._set_knowledge()
self.agent_ops_agent_name = self.role
self.llm = create_llm(self.llm)
@@ -133,11 +134,8 @@ class Agent(BaseAgent):
self.cache_handler = CacheHandler()
self.set_cache_handler(self.cache_handler)
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
def _set_knowledge(self):
try:
if self.embedder is None and crew_embedder:
self.embedder = crew_embedder
if self.knowledge_sources:
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"

View File

@@ -351,6 +351,3 @@ class BaseAgent(ABC, BaseModel):
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()
def set_knowledge(self, crew_embedder: Optional[Dict[str, Any]] = None):
pass

View File

@@ -216,43 +216,10 @@ MODELS = {
"watsonx/ibm/granite-3-8b-instruct",
],
"bedrock": [
"bedrock/us.amazon.nova-pro-v1:0",
"bedrock/us.amazon.nova-micro-v1:0",
"bedrock/us.amazon.nova-lite-v1:0",
"bedrock/us.anthropic.claude-3-5-sonnet-20240620-v1:0",
"bedrock/us.anthropic.claude-3-5-haiku-20241022-v1:0",
"bedrock/us.anthropic.claude-3-5-sonnet-20241022-v2:0",
"bedrock/us.anthropic.claude-3-7-sonnet-20250219-v1:0",
"bedrock/us.anthropic.claude-3-sonnet-20240229-v1:0",
"bedrock/us.anthropic.claude-3-opus-20240229-v1:0",
"bedrock/us.anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/us.meta.llama3-2-11b-instruct-v1:0",
"bedrock/us.meta.llama3-2-3b-instruct-v1:0",
"bedrock/us.meta.llama3-2-90b-instruct-v1:0",
"bedrock/us.meta.llama3-2-1b-instruct-v1:0",
"bedrock/us.meta.llama3-1-8b-instruct-v1:0",
"bedrock/us.meta.llama3-1-70b-instruct-v1:0",
"bedrock/us.meta.llama3-3-70b-instruct-v1:0",
"bedrock/us.meta.llama3-1-405b-instruct-v1:0",
"bedrock/eu.anthropic.claude-3-5-sonnet-20240620-v1:0",
"bedrock/eu.anthropic.claude-3-sonnet-20240229-v1:0",
"bedrock/eu.anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/eu.meta.llama3-2-3b-instruct-v1:0",
"bedrock/eu.meta.llama3-2-1b-instruct-v1:0",
"bedrock/apac.anthropic.claude-3-5-sonnet-20240620-v1:0",
"bedrock/apac.anthropic.claude-3-5-sonnet-20241022-v2:0",
"bedrock/apac.anthropic.claude-3-sonnet-20240229-v1:0",
"bedrock/apac.anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/amazon.nova-pro-v1:0",
"bedrock/amazon.nova-micro-v1:0",
"bedrock/amazon.nova-lite-v1:0",
"bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0",
"bedrock/anthropic.claude-3-5-haiku-20241022-v1:0",
"bedrock/anthropic.claude-3-5-sonnet-20241022-v2:0",
"bedrock/anthropic.claude-3-7-sonnet-20250219-v1:0",
"bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
"bedrock/anthropic.claude-3-opus-20240229-v1:0",
"bedrock/anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/anthropic.claude-3-opus-20240229-v1:0",
"bedrock/anthropic.claude-v2:1",
"bedrock/anthropic.claude-v2",
"bedrock/anthropic.claude-instant-v1",
@@ -267,6 +234,8 @@ MODELS = {
"bedrock/ai21.j2-mid-v1",
"bedrock/ai21.j2-ultra-v1",
"bedrock/ai21.jamba-instruct-v1:0",
"bedrock/meta.llama2-13b-chat-v1",
"bedrock/meta.llama2-70b-chat-v1",
"bedrock/mistral.mistral-7b-instruct-v0:2",
"bedrock/mistral.mixtral-8x7b-instruct-v0:1",
],

View File

@@ -257,11 +257,11 @@ def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:
import os
for root, _, files in os.walk("."):
if crew_path in files:
crew_os_path = os.path.join(root, crew_path)
if "crew.py" in files:
crew_path = os.path.join(root, "crew.py")
try:
spec = importlib.util.spec_from_file_location(
"crew_module", crew_os_path
"crew_module", crew_path
)
if not spec or not spec.loader:
continue
@@ -273,11 +273,9 @@ def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:
for attr_name in dir(module):
attr = getattr(module, attr_name)
try:
if isinstance(attr, Crew) and hasattr(attr, "kickoff"):
print(
f"Found valid crew object in attribute '{attr_name}' at {crew_os_path}."
)
return attr
if callable(attr) and hasattr(attr, "crew"):
crew_instance = attr().crew()
return crew_instance
except Exception as e:
print(f"Error processing attribute {attr_name}: {e}")

View File

@@ -600,7 +600,6 @@ class Crew(BaseModel):
agent.i18n = i18n
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.crew = self # type: ignore[attr-defined]
agent.set_knowledge(crew_embedder=self.embedder)
# TODO: Create an AgentFunctionCalling protocol for future refactoring
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm"

View File

@@ -4,7 +4,7 @@ SQLite-based implementation of flow state persistence.
import json
import sqlite3
from datetime import datetime, timezone
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Union
@@ -34,7 +34,6 @@ class SQLiteFlowPersistence(FlowPersistence):
ValueError: If db_path is invalid
"""
from crewai.utilities.paths import db_storage_path
# Get path from argument or default location
path = db_path or str(Path(db_storage_path()) / "flow_states.db")
@@ -47,8 +46,7 @@ class SQLiteFlowPersistence(FlowPersistence):
def init_db(self) -> None:
"""Create the necessary tables if they don't exist."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
conn.execute("""
CREATE TABLE IF NOT EXISTS flow_states (
id INTEGER PRIMARY KEY AUTOINCREMENT,
flow_uuid TEXT NOT NULL,
@@ -56,15 +54,12 @@ class SQLiteFlowPersistence(FlowPersistence):
timestamp DATETIME NOT NULL,
state_json TEXT NOT NULL
)
"""
)
""")
# Add index for faster UUID lookups
conn.execute(
"""
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_flow_states_uuid
ON flow_states(flow_uuid)
"""
)
""")
def save_state(
self,
@@ -90,22 +85,19 @@ class SQLiteFlowPersistence(FlowPersistence):
)
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"""
conn.execute("""
INSERT INTO flow_states (
flow_uuid,
method_name,
timestamp,
state_json
) VALUES (?, ?, ?, ?)
""",
(
flow_uuid,
method_name,
datetime.now(timezone.utc).isoformat(),
json.dumps(state_dict),
),
)
""", (
flow_uuid,
method_name,
datetime.utcnow().isoformat(),
json.dumps(state_dict),
))
def load_state(self, flow_uuid: str) -> Optional[Dict[str, Any]]:
"""Load the most recent state for a given flow UUID.
@@ -117,16 +109,13 @@ class SQLiteFlowPersistence(FlowPersistence):
The most recent state as a dictionary, or None if no state exists
"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(
"""
cursor = conn.execute("""
SELECT state_json
FROM flow_states
WHERE flow_uuid = ?
ORDER BY id DESC
LIMIT 1
""",
(flow_uuid,),
)
""", (flow_uuid,))
row = cursor.fetchone()
if row:

View File

@@ -16,8 +16,7 @@ Example
import ast
import inspect
import textwrap
from collections import defaultdict, deque
from typing import Any, Deque, Dict, List, Optional, Set, Union
from typing import Any, Dict, List, Optional, Set, Union
def get_possible_return_constants(function: Any) -> Optional[List[str]]:
@@ -119,7 +118,7 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
- Processes router paths separately
"""
levels: Dict[str, int] = {}
queue: Deque[str] = deque()
queue: List[str] = []
visited: Set[str] = set()
pending_and_listeners: Dict[str, Set[str]] = {}
@@ -129,35 +128,28 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
levels[method_name] = 0
queue.append(method_name)
# Precompute listener dependencies
or_listeners = defaultdict(list)
and_listeners = defaultdict(set)
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
if condition_type == "OR":
for method in trigger_methods:
or_listeners[method].append(listener_name)
elif condition_type == "AND":
and_listeners[listener_name] = set(trigger_methods)
# Breadth-first traversal to assign levels
while queue:
current = queue.popleft()
current = queue.pop(0)
current_level = levels[current]
visited.add(current)
for listener_name in or_listeners[current]:
if listener_name not in levels or levels[listener_name] > current_level + 1:
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
for listener_name, required_methods in and_listeners.items():
if current in required_methods:
for listener_name, (condition_type, trigger_methods) in flow._listeners.items():
if condition_type == "OR":
if current in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
elif condition_type == "AND":
if listener_name not in pending_and_listeners:
pending_and_listeners[listener_name] = set()
pending_and_listeners[listener_name].add(current)
if required_methods == pending_and_listeners[listener_name]:
if current in trigger_methods:
pending_and_listeners[listener_name].add(current)
if set(trigger_methods) == pending_and_listeners[listener_name]:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
@@ -167,7 +159,22 @@ def calculate_node_levels(flow: Any) -> Dict[str, int]:
queue.append(listener_name)
# Handle router connections
process_router_paths(flow, current, current_level, levels, queue)
if current in flow._routers:
router_method_name = current
paths = flow._router_paths.get(router_method_name, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
return levels
@@ -220,7 +227,10 @@ def build_ancestor_dict(flow: Any) -> Dict[str, Set[str]]:
def dfs_ancestors(
node: str, ancestors: Dict[str, Set[str]], visited: Set[str], flow: Any
node: str,
ancestors: Dict[str, Set[str]],
visited: Set[str],
flow: Any
) -> None:
"""
Perform depth-first search to build ancestor relationships.
@@ -264,9 +274,7 @@ def dfs_ancestors(
dfs_ancestors(listener_name, ancestors, visited, flow)
def is_ancestor(
node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]
) -> bool:
def is_ancestor(node: str, ancestor_candidate: str, ancestors: Dict[str, Set[str]]) -> bool:
"""
Check if one node is an ancestor of another.
@@ -331,9 +339,7 @@ def build_parent_children_dict(flow: Any) -> Dict[str, List[str]]:
return parent_children
def get_child_index(
parent: str, child: str, parent_children: Dict[str, List[str]]
) -> int:
def get_child_index(parent: str, child: str, parent_children: Dict[str, List[str]]) -> int:
"""
Get the index of a child node in its parent's sorted children list.
@@ -354,23 +360,3 @@ def get_child_index(
children = parent_children.get(parent, [])
children.sort()
return children.index(child)
def process_router_paths(flow, current, current_level, levels, queue):
"""
Handle the router connections for the current node.
"""
if current in flow._routers:
paths = flow._router_paths.get(current, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
queue.append(listener_name)

View File

@@ -64,7 +64,6 @@ LLM_CONTEXT_WINDOW_SIZES = {
"gpt-4-turbo": 128000,
"o1-preview": 128000,
"o1-mini": 128000,
"o3-mini": 200000, # Based on official o3-mini specifications
# gemini
"gemini-2.0-flash": 1048576,
"gemini-1.5-pro": 2097152,
@@ -486,23 +485,10 @@ class LLM:
"""
Returns the context window size, using 75% of the maximum to avoid
cutting off messages mid-thread.
Raises:
ValueError: If a model's context window size is outside valid bounds (1024-2097152)
"""
if self.context_window_size != 0:
return self.context_window_size
MIN_CONTEXT = 1024
MAX_CONTEXT = 2097152 # Current max from gemini-1.5-pro
# Validate all context window sizes
for key, value in LLM_CONTEXT_WINDOW_SIZES.items():
if value < MIN_CONTEXT or value > MAX_CONTEXT:
raise ValueError(
f"Context window for {key} must be between {MIN_CONTEXT} and {MAX_CONTEXT}"
)
self.context_window_size = int(
DEFAULT_CONTEXT_WINDOW_SIZE * CONTEXT_WINDOW_USAGE_RATIO
)

View File

@@ -39,8 +39,8 @@
"validation_error": "### Previous attempt failed validation: {guardrail_result_error}\n\n\n### Previous result:\n{task_output}\n\n\nTry again, making sure to address the validation error."
},
"tools": {
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolutely everything you know, don't reference things but instead explain them.",
"ask_question": "Ask a specific question to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the question you have for them, and ALL necessary context to ask the question properly, they know nothing about the question, so share absolutely everything you know, don't reference things but instead explain them.",
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",
"ask_question": "Ask a specific question to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the question you have for them, and ALL necessary context to ask the question properly, they know nothing about the question, so share absolute everything you know, don't reference things but instead explain them.",
"add_image": {
"name": "Add image to content",
"description": "See image to understand its content, you can optionally ask a question about the image",

View File

@@ -44,7 +44,6 @@ def create_llm(
# Extract attributes with explicit types
model = (
getattr(llm_value, "model_name", None)
or getattr(llm_value, "model", None)
or getattr(llm_value, "deployment_name", None)
or str(llm_value)
)

View File

@@ -6,7 +6,7 @@ import pytest
from pydantic import BaseModel
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.llm import CONTEXT_WINDOW_USAGE_RATIO, LLM
from crewai.llm import LLM
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.tool_usage_events import ToolExecutionErrorEvent
from crewai.utilities.token_counter_callback import TokenCalcHandler
@@ -285,23 +285,6 @@ def test_o3_mini_reasoning_effort_medium():
assert isinstance(result, str)
assert "Paris" in result
def test_context_window_validation():
"""Test that context window validation works correctly."""
# Test valid window size
llm = LLM(model="o3-mini")
assert llm.get_context_window_size() == int(200000 * CONTEXT_WINDOW_USAGE_RATIO)
# Test invalid window size
with pytest.raises(ValueError) as excinfo:
with patch.dict(
"crewai.llm.LLM_CONTEXT_WINDOW_SIZES",
{"test-model": 500}, # Below minimum
clear=True,
):
llm = LLM(model="test-model")
llm.get_context_window_size()
assert "must be between 1024 and 2097152" in str(excinfo.value)
@pytest.mark.vcr(filter_headers=["authorization"])
@pytest.fixture