Compare commits

...

4 Commits

Author SHA1 Message Date
Devin AI
3e914133bd Fix lint issues: remove unused imports from test file
- Remove unused pytest import
- Remove unused patch import
- Remove unused MAX_FILE_NAME_LENGTH import
- All ruff checks now pass locally

Co-Authored-By: João <joao@crewai.com>
2025-07-29 23:03:55 +00:00
Devin AI
f0949f90dd Fix Windows path length issue in memory storage
- Replace agent.role with agent.id (UUID) in RAGStorage directory naming
- UUIDs are guaranteed short (36 chars) and filesystem-safe
- Remove unused _sanitize_role method since UUIDs don't need sanitization
- Add comprehensive tests for Windows path length scenarios
- Add test case for long agent roles in existing memory tests
- Maintains backward compatibility while fixing Windows 260-char limit

Fixes #3236

Co-Authored-By: João <joao@crewai.com>
2025-07-29 23:00:35 +00:00
Lorenze Jay
cb522cf500 Enhance Flow class to support custom flow names (#3234)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
- Added an optional `name` attribute to the Flow class for better identification.
- Updated event emissions to utilize the new `name` attribute, ensuring accurate flow naming in events.
- Added tests to verify the correct flow name is set and emitted during flow execution.
2025-07-29 15:41:30 -07:00
Vini Brasil
017acc74f5 Add timezone to event timestamps (#3231)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Events were lacking timezone information, making them naive datetimes,
which can be ambiguous.
2025-07-28 17:09:06 -03:00
7 changed files with 280 additions and 19 deletions

View File

@@ -436,6 +436,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
_routers: Set[str] = set()
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
name: Optional[str] = None
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore
@@ -473,7 +474,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowCreatedEvent(
type="flow_created",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
),
)
@@ -769,7 +770,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
),
)
@@ -792,7 +793,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
result=final_output,
),
)
@@ -834,7 +835,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
),
@@ -856,7 +857,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
state=self._copy_state(),
result=result,
),
@@ -869,7 +870,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
@@ -1076,7 +1077,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowPlotEvent(
type="flow_plot",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
),
)
plot_flow(self, filename)

View File

@@ -44,7 +44,7 @@ class RAGStorage(BaseRAGStorage):
):
super().__init__(type, allow_reset, embedder_config, crew)
agents = crew.agents if crew else []
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = [str(agent.id) for agent in agents]
agents = "_".join(agents)
self.agents = agents
self.storage_file_name = self._build_storage_file_name(type, agents)
@@ -74,11 +74,6 @@ class RAGStorage(BaseRAGStorage):
)
logging.info(f"Collection found or created: {self.collection}")
def _sanitize_role(self, role: str) -> str:
"""
Sanitizes agent roles to ensure valid directory names.
"""
return role.replace("\n", "").replace(" ", "_").replace("/", "_")
def _build_storage_file_name(self, type: str, file_name: str) -> str:
"""

View File

@@ -1,6 +1,5 @@
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
@@ -9,7 +8,7 @@ from crewai.utilities.serialization import to_serializable
class BaseEvent(BaseModel):
"""Base class for all events"""
timestamp: datetime = Field(default_factory=datetime.now)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"

View File

@@ -755,3 +755,15 @@ def test_multiple_routers_from_same_trigger():
assert execution_order.index("anemia_analysis") > execution_order.index(
"anemia_router"
)
def test_flow_name():
class MyFlow(Flow):
name = "MyFlow"
@start()
def start(self):
return "Hello, world!"
flow = MyFlow()
assert flow.name == "MyFlow"

View File

@@ -160,3 +160,41 @@ def test_save_and_search(short_term_memory):
find = short_term_memory.search("test value", score_threshold=0.01)[0]
assert find["context"] == memory.data, "Data value mismatch."
assert find["metadata"]["agent"] == "test_agent", "Agent value mismatch."
def test_memory_with_long_agent_role():
"""Test that memory works correctly with very long agent roles."""
very_long_role = (
"Senior Equity Research Analyst specializing in corporate fundamentals and industry dynamics "
"with expertise in financial modeling, valuation techniques, and market analysis for "
"technology, healthcare, and consumer discretionary sectors, responsible for generating "
"comprehensive investment recommendations and detailed research reports"
)
agent = Agent(
role=very_long_role,
goal="Search relevant data and provide results",
backstory="You are a researcher at a leading tech think tank.",
tools=[],
verbose=True,
)
task = Task(
description="Perform a search on specific topics.",
expected_output="A list of relevant URLs based on the search query.",
agent=agent,
)
memory = ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
test_data = "Test memory data for long role agent"
test_metadata = {"task": "test_task"}
memory.save(
value=test_data,
metadata=test_metadata,
agent=very_long_role,
)
results = memory.search("Test memory", score_threshold=0.01)
assert isinstance(results, list), "Search should return a list even with long agent roles"

View File

@@ -0,0 +1,187 @@
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.task import Task
def test_long_agent_role_memory_storage():
"""Test that agents with very long roles don't exceed Windows path limits."""
very_long_role = (
"Senior Equity Research Analyst specializing in corporate fundamentals and industry dynamics "
"with expertise in financial modeling, valuation techniques, and market analysis for "
"technology, healthcare, and consumer discretionary sectors, responsible for generating "
"comprehensive investment recommendations and detailed research reports"
)
agent = Agent(
role=very_long_role,
goal="Analyze market trends and provide investment insights",
backstory="You are an experienced financial analyst with deep market knowledge.",
verbose=True,
)
task = Task(
description="Analyze the current market conditions.",
expected_output="A comprehensive market analysis report.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
rag_storage = RAGStorage(type="short_term", crew=crew)
assert len(rag_storage.storage_file_name) <= 260, (
f"Storage path too long: {len(rag_storage.storage_file_name)} characters"
)
assert str(agent.id) in rag_storage.agents, (
"Agent UUID should be used in storage path"
)
def test_multiple_agents_with_long_roles():
"""Test that multiple agents with long roles create valid storage paths."""
long_roles = [
"Senior Investment Advisor specializing in equity portfolio strategy and client tailored recommendations",
"Financial Communications Specialist skilled in distilling complex analysis into concise client-facing investment reports",
"Registered Investment Advisor (RIA) specializing in equity portfolio strategy and client-tailored recommendations"
]
agents = []
for i, role in enumerate(long_roles):
agent = Agent(
role=role,
goal=f"Goal for agent {i+1}",
backstory=f"Backstory for agent {i+1}",
verbose=True,
)
agents.append(agent)
tasks = [
Task(
description=f"Task {i+1}",
expected_output=f"Output {i+1}",
agent=agent,
)
for i, agent in enumerate(agents)
]
crew = Crew(agents=agents, tasks=tasks)
rag_storage = RAGStorage(type="short_term", crew=crew)
assert len(rag_storage.storage_file_name) <= 260, (
f"Storage path too long: {len(rag_storage.storage_file_name)} characters"
)
for agent in agents:
assert str(agent.id) in rag_storage.agents, (
f"Agent UUID {agent.id} should be in storage path"
)
def test_memory_functionality_with_long_roles():
"""Test that memory save/search functionality works with long agent roles."""
long_role = (
"Senior Equity Research Analyst specializing in corporate fundamentals and industry dynamics "
"with expertise in financial modeling, valuation techniques, and market analysis"
)
agent = Agent(
role=long_role,
goal="Analyze market trends",
backstory="You are an experienced analyst.",
verbose=True,
)
task = Task(
description="Analyze market conditions.",
expected_output="Market analysis report.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
rag_storage = RAGStorage(type="short_term", crew=crew)
test_value = "Test memory content for long role agent"
test_metadata = {"task": "test_task", "agent": long_role}
rag_storage.save(value=test_value, metadata=test_metadata)
results = rag_storage.search(query="Test memory", limit=1, score_threshold=0.1)
assert isinstance(results, list), "Search should return a list"
def test_backward_compatibility_short_roles():
"""Test that short agent roles still work correctly."""
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="You are a researcher.",
verbose=True,
)
task = Task(
description="Research a topic.",
expected_output="Research results.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
rag_storage = RAGStorage(type="short_term", crew=crew)
assert str(agent.id) in rag_storage.agents, (
"Agent UUID should be used even for short roles"
)
assert len(rag_storage.storage_file_name) <= 260, (
f"Storage path should be valid: {len(rag_storage.storage_file_name)} characters"
)
def test_uuid_based_path_uniqueness():
"""Test that different agents with same role create different storage paths."""
role = "Data Analyst"
agent1 = Agent(
role=role,
goal="Analyze data",
backstory="You are an analyst.",
verbose=True,
)
agent2 = Agent(
role=role,
goal="Analyze data",
backstory="You are an analyst.",
verbose=True,
)
task1 = Task(
description="Analyze dataset 1.",
expected_output="Analysis 1.",
agent=agent1,
)
task2 = Task(
description="Analyze dataset 2.",
expected_output="Analysis 2.",
agent=agent2,
)
crew1 = Crew(agents=[agent1], tasks=[task1])
crew2 = Crew(agents=[agent2], tasks=[task2])
storage1 = RAGStorage(type="short_term", crew=crew1)
storage2 = RAGStorage(type="short_term", crew=crew2)
assert storage1.agents != storage2.agents, (
"Different agents should create different storage paths even with same role"
)
assert str(agent1.id) in storage1.agents
assert str(agent2.id) in storage2.agents
assert str(agent1.id) != str(agent2.id)

View File

@@ -64,7 +64,8 @@ def base_agent():
llm="gpt-4o-mini",
goal="Just say hi",
backstory="You are a helpful assistant that just says hi",
)
)
@pytest.fixture(scope="module")
def base_task(base_agent):
@@ -74,6 +75,7 @@ def base_task(base_agent):
agent=base_agent,
)
event_listener = EventListener()
@@ -448,6 +450,27 @@ def test_flow_emits_start_event():
assert received_events[0].type == "flow_started"
def test_flow_name_emitted_to_event_bus():
received_events = []
class MyFlowClass(Flow):
name = "PRODUCTION_FLOW"
@start()
def start(self):
return "Hello, world!"
@crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source, event):
received_events.append(event)
flow = MyFlowClass()
flow.kickoff()
assert len(received_events) == 1
assert received_events[0].flow_name == "PRODUCTION_FLOW"
def test_flow_emits_finish_event():
received_events = []
@@ -756,6 +779,7 @@ def test_streaming_empty_response_handling():
received_chunks = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
received_chunks.append(event.chunk)
@@ -793,6 +817,7 @@ def test_streaming_empty_response_handling():
# Restore the original method
llm.call = original_call
@pytest.mark.vcr(filter_headers=["authorization"])
def test_stream_llm_emits_event_with_task_and_agent_info():
completed_event = []
@@ -801,6 +826,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
stream_event = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
failed_event.append(event)
@@ -827,7 +853,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
description="Just say hi",
expected_output="hi",
llm=LLM(model="gpt-4o-mini", stream=True),
agent=agent
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
@@ -855,6 +881,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
assert set(all_task_id) == {task.id}
assert set(all_task_name) == {task.name}
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
completed_event = []
@@ -863,6 +890,7 @@ def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
stream_event = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
failed_event.append(event)
@@ -904,6 +932,7 @@ def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
assert set(all_task_id) == {base_task.id}
assert set(all_task_name) == {base_task.name}
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_event_with_lite_agent():
completed_event = []
@@ -912,6 +941,7 @@ def test_llm_emits_event_with_lite_agent():
stream_event = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
failed_event.append(event)
@@ -936,7 +966,6 @@ def test_llm_emits_event_with_lite_agent():
)
agent.kickoff(messages=[{"role": "user", "content": "say hi!"}])
assert len(completed_event) == 2
assert len(failed_event) == 0
assert len(started_event) == 2