mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-29 22:32:36 +00:00
Compare commits
4 Commits
devin/1753
...
devin/1753
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3e914133bd | ||
|
|
f0949f90dd | ||
|
|
cb522cf500 | ||
|
|
017acc74f5 |
@@ -436,6 +436,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
_routers: Set[str] = set()
|
||||
_router_paths: Dict[str, List[str]] = {}
|
||||
initial_state: Union[Type[T], T, None] = None
|
||||
name: Optional[str] = None
|
||||
|
||||
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
|
||||
class _FlowGeneric(cls): # type: ignore
|
||||
@@ -473,7 +474,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowCreatedEvent(
|
||||
type="flow_created",
|
||||
flow_name=self.__class__.__name__,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -769,7 +770,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowStartedEvent(
|
||||
type="flow_started",
|
||||
flow_name=self.__class__.__name__,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
inputs=inputs,
|
||||
),
|
||||
)
|
||||
@@ -792,7 +793,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowFinishedEvent(
|
||||
type="flow_finished",
|
||||
flow_name=self.__class__.__name__,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
result=final_output,
|
||||
),
|
||||
)
|
||||
@@ -834,7 +835,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionStartedEvent(
|
||||
type="method_execution_started",
|
||||
method_name=method_name,
|
||||
flow_name=self.__class__.__name__,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
params=dumped_params,
|
||||
state=self._copy_state(),
|
||||
),
|
||||
@@ -856,7 +857,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionFinishedEvent(
|
||||
type="method_execution_finished",
|
||||
method_name=method_name,
|
||||
flow_name=self.__class__.__name__,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
state=self._copy_state(),
|
||||
result=result,
|
||||
),
|
||||
@@ -869,7 +870,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
MethodExecutionFailedEvent(
|
||||
type="method_execution_failed",
|
||||
method_name=method_name,
|
||||
flow_name=self.__class__.__name__,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
error=e,
|
||||
),
|
||||
)
|
||||
@@ -1076,7 +1077,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self,
|
||||
FlowPlotEvent(
|
||||
type="flow_plot",
|
||||
flow_name=self.__class__.__name__,
|
||||
flow_name=self.name or self.__class__.__name__,
|
||||
),
|
||||
)
|
||||
plot_flow(self, filename)
|
||||
|
||||
@@ -44,7 +44,7 @@ class RAGStorage(BaseRAGStorage):
|
||||
):
|
||||
super().__init__(type, allow_reset, embedder_config, crew)
|
||||
agents = crew.agents if crew else []
|
||||
agents = [self._sanitize_role(agent.role) for agent in agents]
|
||||
agents = [str(agent.id) for agent in agents]
|
||||
agents = "_".join(agents)
|
||||
self.agents = agents
|
||||
self.storage_file_name = self._build_storage_file_name(type, agents)
|
||||
@@ -74,11 +74,6 @@ class RAGStorage(BaseRAGStorage):
|
||||
)
|
||||
logging.info(f"Collection found or created: {self.collection}")
|
||||
|
||||
def _sanitize_role(self, role: str) -> str:
|
||||
"""
|
||||
Sanitizes agent roles to ensure valid directory names.
|
||||
"""
|
||||
return role.replace("\n", "").replace(" ", "_").replace("/", "_")
|
||||
|
||||
def _build_storage_file_name(self, type: str, file_name: str) -> str:
|
||||
"""
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.utilities.serialization import to_serializable
|
||||
@@ -9,7 +8,7 @@ from crewai.utilities.serialization import to_serializable
|
||||
class BaseEvent(BaseModel):
|
||||
"""Base class for all events"""
|
||||
|
||||
timestamp: datetime = Field(default_factory=datetime.now)
|
||||
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
||||
type: str
|
||||
source_fingerprint: Optional[str] = None # UUID string of the source entity
|
||||
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"
|
||||
|
||||
@@ -755,3 +755,15 @@ def test_multiple_routers_from_same_trigger():
|
||||
assert execution_order.index("anemia_analysis") > execution_order.index(
|
||||
"anemia_router"
|
||||
)
|
||||
|
||||
|
||||
def test_flow_name():
|
||||
class MyFlow(Flow):
|
||||
name = "MyFlow"
|
||||
|
||||
@start()
|
||||
def start(self):
|
||||
return "Hello, world!"
|
||||
|
||||
flow = MyFlow()
|
||||
assert flow.name == "MyFlow"
|
||||
|
||||
@@ -160,3 +160,41 @@ def test_save_and_search(short_term_memory):
|
||||
find = short_term_memory.search("test value", score_threshold=0.01)[0]
|
||||
assert find["context"] == memory.data, "Data value mismatch."
|
||||
assert find["metadata"]["agent"] == "test_agent", "Agent value mismatch."
|
||||
|
||||
|
||||
def test_memory_with_long_agent_role():
|
||||
"""Test that memory works correctly with very long agent roles."""
|
||||
very_long_role = (
|
||||
"Senior Equity Research Analyst specializing in corporate fundamentals and industry dynamics "
|
||||
"with expertise in financial modeling, valuation techniques, and market analysis for "
|
||||
"technology, healthcare, and consumer discretionary sectors, responsible for generating "
|
||||
"comprehensive investment recommendations and detailed research reports"
|
||||
)
|
||||
|
||||
agent = Agent(
|
||||
role=very_long_role,
|
||||
goal="Search relevant data and provide results",
|
||||
backstory="You are a researcher at a leading tech think tank.",
|
||||
tools=[],
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Perform a search on specific topics.",
|
||||
expected_output="A list of relevant URLs based on the search query.",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
memory = ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
|
||||
|
||||
test_data = "Test memory data for long role agent"
|
||||
test_metadata = {"task": "test_task"}
|
||||
|
||||
memory.save(
|
||||
value=test_data,
|
||||
metadata=test_metadata,
|
||||
agent=very_long_role,
|
||||
)
|
||||
|
||||
results = memory.search("Test memory", score_threshold=0.01)
|
||||
assert isinstance(results, list), "Search should return a list even with long agent roles"
|
||||
|
||||
187
tests/memory/test_windows_path_length.py
Normal file
187
tests/memory/test_windows_path_length.py
Normal file
@@ -0,0 +1,187 @@
|
||||
from crewai.agent import Agent
|
||||
from crewai.crew import Crew
|
||||
from crewai.memory.storage.rag_storage import RAGStorage
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
def test_long_agent_role_memory_storage():
|
||||
"""Test that agents with very long roles don't exceed Windows path limits."""
|
||||
very_long_role = (
|
||||
"Senior Equity Research Analyst specializing in corporate fundamentals and industry dynamics "
|
||||
"with expertise in financial modeling, valuation techniques, and market analysis for "
|
||||
"technology, healthcare, and consumer discretionary sectors, responsible for generating "
|
||||
"comprehensive investment recommendations and detailed research reports"
|
||||
)
|
||||
|
||||
agent = Agent(
|
||||
role=very_long_role,
|
||||
goal="Analyze market trends and provide investment insights",
|
||||
backstory="You are an experienced financial analyst with deep market knowledge.",
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Analyze the current market conditions.",
|
||||
expected_output="A comprehensive market analysis report.",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
|
||||
rag_storage = RAGStorage(type="short_term", crew=crew)
|
||||
|
||||
assert len(rag_storage.storage_file_name) <= 260, (
|
||||
f"Storage path too long: {len(rag_storage.storage_file_name)} characters"
|
||||
)
|
||||
|
||||
assert str(agent.id) in rag_storage.agents, (
|
||||
"Agent UUID should be used in storage path"
|
||||
)
|
||||
|
||||
|
||||
def test_multiple_agents_with_long_roles():
|
||||
"""Test that multiple agents with long roles create valid storage paths."""
|
||||
long_roles = [
|
||||
"Senior Investment Advisor specializing in equity portfolio strategy and client tailored recommendations",
|
||||
"Financial Communications Specialist skilled in distilling complex analysis into concise client-facing investment reports",
|
||||
"Registered Investment Advisor (RIA) specializing in equity portfolio strategy and client-tailored recommendations"
|
||||
]
|
||||
|
||||
agents = []
|
||||
for i, role in enumerate(long_roles):
|
||||
agent = Agent(
|
||||
role=role,
|
||||
goal=f"Goal for agent {i+1}",
|
||||
backstory=f"Backstory for agent {i+1}",
|
||||
verbose=True,
|
||||
)
|
||||
agents.append(agent)
|
||||
|
||||
tasks = [
|
||||
Task(
|
||||
description=f"Task {i+1}",
|
||||
expected_output=f"Output {i+1}",
|
||||
agent=agent,
|
||||
)
|
||||
for i, agent in enumerate(agents)
|
||||
]
|
||||
|
||||
crew = Crew(agents=agents, tasks=tasks)
|
||||
|
||||
rag_storage = RAGStorage(type="short_term", crew=crew)
|
||||
|
||||
assert len(rag_storage.storage_file_name) <= 260, (
|
||||
f"Storage path too long: {len(rag_storage.storage_file_name)} characters"
|
||||
)
|
||||
|
||||
for agent in agents:
|
||||
assert str(agent.id) in rag_storage.agents, (
|
||||
f"Agent UUID {agent.id} should be in storage path"
|
||||
)
|
||||
|
||||
|
||||
def test_memory_functionality_with_long_roles():
|
||||
"""Test that memory save/search functionality works with long agent roles."""
|
||||
long_role = (
|
||||
"Senior Equity Research Analyst specializing in corporate fundamentals and industry dynamics "
|
||||
"with expertise in financial modeling, valuation techniques, and market analysis"
|
||||
)
|
||||
|
||||
agent = Agent(
|
||||
role=long_role,
|
||||
goal="Analyze market trends",
|
||||
backstory="You are an experienced analyst.",
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Analyze market conditions.",
|
||||
expected_output="Market analysis report.",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
|
||||
rag_storage = RAGStorage(type="short_term", crew=crew)
|
||||
|
||||
test_value = "Test memory content for long role agent"
|
||||
test_metadata = {"task": "test_task", "agent": long_role}
|
||||
|
||||
rag_storage.save(value=test_value, metadata=test_metadata)
|
||||
|
||||
results = rag_storage.search(query="Test memory", limit=1, score_threshold=0.1)
|
||||
|
||||
assert isinstance(results, list), "Search should return a list"
|
||||
|
||||
|
||||
def test_backward_compatibility_short_roles():
|
||||
"""Test that short agent roles still work correctly."""
|
||||
agent = Agent(
|
||||
role="Researcher",
|
||||
goal="Research topics",
|
||||
backstory="You are a researcher.",
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
task = Task(
|
||||
description="Research a topic.",
|
||||
expected_output="Research results.",
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
|
||||
rag_storage = RAGStorage(type="short_term", crew=crew)
|
||||
|
||||
assert str(agent.id) in rag_storage.agents, (
|
||||
"Agent UUID should be used even for short roles"
|
||||
)
|
||||
|
||||
assert len(rag_storage.storage_file_name) <= 260, (
|
||||
f"Storage path should be valid: {len(rag_storage.storage_file_name)} characters"
|
||||
)
|
||||
|
||||
|
||||
def test_uuid_based_path_uniqueness():
|
||||
"""Test that different agents with same role create different storage paths."""
|
||||
role = "Data Analyst"
|
||||
|
||||
agent1 = Agent(
|
||||
role=role,
|
||||
goal="Analyze data",
|
||||
backstory="You are an analyst.",
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
agent2 = Agent(
|
||||
role=role,
|
||||
goal="Analyze data",
|
||||
backstory="You are an analyst.",
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
task1 = Task(
|
||||
description="Analyze dataset 1.",
|
||||
expected_output="Analysis 1.",
|
||||
agent=agent1,
|
||||
)
|
||||
|
||||
task2 = Task(
|
||||
description="Analyze dataset 2.",
|
||||
expected_output="Analysis 2.",
|
||||
agent=agent2,
|
||||
)
|
||||
|
||||
crew1 = Crew(agents=[agent1], tasks=[task1])
|
||||
crew2 = Crew(agents=[agent2], tasks=[task2])
|
||||
|
||||
storage1 = RAGStorage(type="short_term", crew=crew1)
|
||||
storage2 = RAGStorage(type="short_term", crew=crew2)
|
||||
|
||||
assert storage1.agents != storage2.agents, (
|
||||
"Different agents should create different storage paths even with same role"
|
||||
)
|
||||
|
||||
assert str(agent1.id) in storage1.agents
|
||||
assert str(agent2.id) in storage2.agents
|
||||
assert str(agent1.id) != str(agent2.id)
|
||||
@@ -64,7 +64,8 @@ def base_agent():
|
||||
llm="gpt-4o-mini",
|
||||
goal="Just say hi",
|
||||
backstory="You are a helpful assistant that just says hi",
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def base_task(base_agent):
|
||||
@@ -74,6 +75,7 @@ def base_task(base_agent):
|
||||
agent=base_agent,
|
||||
)
|
||||
|
||||
|
||||
event_listener = EventListener()
|
||||
|
||||
|
||||
@@ -448,6 +450,27 @@ def test_flow_emits_start_event():
|
||||
assert received_events[0].type == "flow_started"
|
||||
|
||||
|
||||
def test_flow_name_emitted_to_event_bus():
|
||||
received_events = []
|
||||
|
||||
class MyFlowClass(Flow):
|
||||
name = "PRODUCTION_FLOW"
|
||||
|
||||
@start()
|
||||
def start(self):
|
||||
return "Hello, world!"
|
||||
|
||||
@crewai_event_bus.on(FlowStartedEvent)
|
||||
def handle_flow_start(source, event):
|
||||
received_events.append(event)
|
||||
|
||||
flow = MyFlowClass()
|
||||
flow.kickoff()
|
||||
|
||||
assert len(received_events) == 1
|
||||
assert received_events[0].flow_name == "PRODUCTION_FLOW"
|
||||
|
||||
|
||||
def test_flow_emits_finish_event():
|
||||
received_events = []
|
||||
|
||||
@@ -756,6 +779,7 @@ def test_streaming_empty_response_handling():
|
||||
received_chunks = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(LLMStreamChunkEvent)
|
||||
def handle_stream_chunk(source, event):
|
||||
received_chunks.append(event.chunk)
|
||||
@@ -793,6 +817,7 @@ def test_streaming_empty_response_handling():
|
||||
# Restore the original method
|
||||
llm.call = original_call
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_stream_llm_emits_event_with_task_and_agent_info():
|
||||
completed_event = []
|
||||
@@ -801,6 +826,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
|
||||
stream_event = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(LLMCallFailedEvent)
|
||||
def handle_llm_failed(source, event):
|
||||
failed_event.append(event)
|
||||
@@ -827,7 +853,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
|
||||
description="Just say hi",
|
||||
expected_output="hi",
|
||||
llm=LLM(model="gpt-4o-mini", stream=True),
|
||||
agent=agent
|
||||
agent=agent,
|
||||
)
|
||||
|
||||
crew = Crew(agents=[agent], tasks=[task])
|
||||
@@ -855,6 +881,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
|
||||
assert set(all_task_id) == {task.id}
|
||||
assert set(all_task_name) == {task.name}
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
|
||||
completed_event = []
|
||||
@@ -863,6 +890,7 @@ def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
|
||||
stream_event = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(LLMCallFailedEvent)
|
||||
def handle_llm_failed(source, event):
|
||||
failed_event.append(event)
|
||||
@@ -904,6 +932,7 @@ def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
|
||||
assert set(all_task_id) == {base_task.id}
|
||||
assert set(all_task_name) == {base_task.name}
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_llm_emits_event_with_lite_agent():
|
||||
completed_event = []
|
||||
@@ -912,6 +941,7 @@ def test_llm_emits_event_with_lite_agent():
|
||||
stream_event = []
|
||||
|
||||
with crewai_event_bus.scoped_handlers():
|
||||
|
||||
@crewai_event_bus.on(LLMCallFailedEvent)
|
||||
def handle_llm_failed(source, event):
|
||||
failed_event.append(event)
|
||||
@@ -936,7 +966,6 @@ def test_llm_emits_event_with_lite_agent():
|
||||
)
|
||||
agent.kickoff(messages=[{"role": "user", "content": "say hi!"}])
|
||||
|
||||
|
||||
assert len(completed_event) == 2
|
||||
assert len(failed_event) == 0
|
||||
assert len(started_event) == 2
|
||||
|
||||
Reference in New Issue
Block a user