Compare commits

..

47 Commits

Author SHA1 Message Date
Lorenze Jay
a88e910a7a test: Improve flow persistence test cases and logging 2025-02-19 13:40:32 -08:00
Lorenze Jay
73ee7ce1c9 Merge branch 'better/event-emitter' of github.com:crewAIInc/crewAI into better/event-emitter 2025-02-19 13:24:28 -08:00
Lorenze Jay
74727592bc Merge branch 'main' of github.com:crewAIInc/crewAI into better/event-emitter 2025-02-19 13:24:26 -08:00
Lorenze Jay
5a022fe6c0 Merge branch 'main' into better/event-emitter 2025-02-19 13:15:01 -08:00
Lorenze Jay
34f5469490 refactor: clean up and organize imports in llm and flow modules 2025-02-19 08:18:48 -08:00
Lorenze Jay
f68a85a6f5 Merge branch 'main' of github.com:crewAIInc/crewAI into better/event-emitter 2025-02-19 08:13:28 -08:00
Lorenze Jay
390031026a Remove ToolUsageStartedEvent emission in tool usage process
- Remove unnecessary event emission for tool usage start
- Simplify tool usage event handling
- Eliminate redundant event data preparation step
2025-02-18 15:07:23 -08:00
Lorenze Jay
ae4c4cffc4 Improve test_validate_tool_input_invalid_input with mock objects
- Add explicit mock objects for agent and action in test case
- Ensure proper string values for mock agent and action attributes
- Simplify test setup for ToolUsage validation method
2025-02-18 14:50:52 -08:00
Lorenze Jay
b623960a94 Improve type hinting for TaskCompletedEvent handler
- Add explicit type annotation for TaskCompletedEvent in event_listener.py
- Enhance type safety for event handling in EventListener
2025-02-18 14:48:35 -08:00
Lorenze Jay
d368efdeda Update AgentExecutionStartedEvent to use task_prompt
- Modify test_events.py to use task_prompt instead of inputs
- Simplify event input validation in test case
- Align with recent event system refactoring
2025-02-18 14:35:12 -08:00
Lorenze Jay
4dc258a590 Refactor task events to use base CrewEvent
- Move CrewEvent import from crew_events to base_events
- Remove unnecessary blank lines in task_events.py
- Simplify event class structure for task-related events
2025-02-18 14:30:11 -08:00
Lorenze Jay
c64c0698c5 Refactor event system and improve crew testing
- Extract base CrewEvent class to a new base_events.py module
- Update event imports across multiple event-related files
- Modify CrewTestStartedEvent to use eval_llm instead of openai_model_name
- Add LLM creation validation in crew testing method
- Improve type handling and event consistency
2025-02-18 14:29:39 -08:00
Lorenze Jay
6fea26d223 Merge branch 'main' of github.com:crewAIInc/crewAI into better/event-emitter 2025-02-18 14:18:24 -08:00
Lorenze Jay
1b5cc08abe Enhance event handling for tool usage and agent execution
- Add new events for tool usage: ToolSelectionErrorEvent, ToolValidateInputErrorEvent
- Improve error tracking and event emission in ToolUsage and LLM classes
- Update AgentExecutionStartedEvent to use task_prompt instead of inputs
- Add comprehensive test coverage for new event types and error scenarios
2025-02-18 14:13:18 -08:00
Lorenze Jay
e9dc68723f Remove RunType enum and clean up crew events module
- Delete unused RunType enum from crew_events.py
- Simplify crew_events.py by removing unnecessary enum definition
- Improve code clarity by removing unneeded imports
2025-02-18 09:14:47 -08:00
Lorenze Jay
d0f9abaa85 Add FlowPlotEvent and update event bus to support flow plotting
- Introduce FlowPlotEvent to track flow plotting events
- Replace Telemetry method with event bus emission in Flow.plot()
- Update event bus to support new FlowPlotEvent type
- Add test case to validate flow plotting event emission
2025-02-18 09:10:27 -08:00
Lorenze Jay
935da884ed Enhance EventListener with singleton pattern and color configuration
- Implement singleton pattern for EventListener to ensure single instance
- Add default color configuration using EMITTER_COLOR from constants
- Modify log method calls to use default color and remove redundant color parameters
- Improve initialization logic to prevent multiple initializations
2025-02-18 08:53:41 -08:00
Lorenze Jay
64569ce130 Rename event_bus to crewai_event_bus for improved clarity and specificity
- Replace all references to `event_bus` with `crewai_event_bus`
- Update import statements across multiple files
- Remove the old `event_bus.py` file
- Maintain existing event handling functionality
2025-02-18 08:36:21 -08:00
Lorenze Jay
1603a1d9ac Update test_events to validate multiple tool usage events
- Modify test to assert 75 events instead of a single error event
- Remove pytest.raises() check, allowing crew kickoff to complete
- Adjust event validation to support broader event tracking
2025-02-14 16:07:15 -08:00
Lorenze Jay
6d1bcff6d1 Improve AgentOps listener type hints and formatting
- Add string type hints for AgentOps classes to resolve potential import issues
- Clean up unnecessary whitespace and improve code indentation
- Simplify initialization and event handling logic
2025-02-14 16:00:30 -08:00
Lorenze Jay
aa2e7c888e Update test_events to validate tool usage error event handling
- Modify test to assert single error event with correct attributes
- Use pytest.raises() to verify error event generation
- Simplify error event validation in test case
2025-02-14 15:57:38 -08:00
Lorenze Jay
ec048cf6fe Refactor AgentOps event listener for crew-level tracking
- Modify AgentOpsListener to handle crew-level events
- Initialize and end AgentOps session at crew kickoff and completion
- Create agents for each crew member during session initialization
- Improve session management and event recording
- Clean up and simplify event handling logic
2025-02-14 15:49:42 -08:00
Lorenze Jay
18791eadd3 dont forget crew level 2025-02-14 14:50:38 -08:00
Lorenze Jay
6eab0e3d3b moving to dedicated eventlistener 2025-02-14 14:49:34 -08:00
Lorenze Jay
fe7c8b2049 Reorder and clean up event imports in event_listener
- Reorganize imports for tool usage events and other event types
- Maintain consistent import ordering and remove unused imports
- Ensure clean and organized import structure in event_listener module
2025-02-14 09:26:41 -08:00
Lorenze Jay
1c2903abea Add event handling for tool usage events
- Introduce event listeners for ToolUsageFinishedEvent and ToolUsageErrorEvent
- Log tool usage events with descriptive emoji icons ( and )
- Update event_listener to track and log tool usage lifecycle
2025-02-14 09:26:18 -08:00
Lorenze Jay
f4547648b4 Enable test coverage for Flow method execution failure event
- Uncomment pytest.raises() in test_events to verify exception handling
- Ensure test validates MethodExecutionFailedEvent emission during flow kickoff
2025-02-14 09:14:37 -08:00
Lorenze Jay
a557275112 Propagate method execution failures in Flow class
- Modify Flow class to re-raise exceptions after emitting MethodExecutionFailedEvent
- Reorder MethodExecutionFailedEvent import to maintain consistent import style
2025-02-14 09:10:56 -08:00
Lorenze Jay
e17159f877 Merge branch 'main' of github.com:crewAIInc/crewAI into better/event-emitter 2025-02-14 09:06:32 -08:00
Lorenze Jay
7d168d6d61 Add MethodExecutionFailedEvent to handle flow method execution failures
- Introduce new MethodExecutionFailedEvent in flow_events module
- Update Flow class to catch and emit method execution failures
- Add event listener for method execution failure events
- Update event-related imports to include new event type
- Enhance test coverage for method execution failure handling
2025-02-14 09:00:16 -08:00
Lorenze Jay
766422dd5e Update crew test verbose output with improved emoji icons
- Replace task and agent completion icons from 👍 to 
- Enhance readability of test output logging
- Maintain consistent test coverage for crew verbose output
2025-02-14 08:36:29 -08:00
Lorenze Jay
3e3e68ed75 Update crew test to validate verbose output and kickoff_for_each method
- Enhance test_crew_verbose_output to check specific listener log messages
- Modify test_kickoff_for_each_invalid_input to use Pydantic validation error
- Improve test coverage for crew logging and input validation
2025-02-14 08:34:18 -08:00
Lorenze Jay
43064e2a0e Clean up unused imports and event-related code
- Remove unused imports from various event and flow-related files
- Reorder event imports to follow standard conventions
- Remove unnecessary event type references
- Simplify import statements in event and flow modules
2025-02-13 18:07:43 -08:00
Lorenze Jay
184d08e6e7 Remove telemetry and tracing dependencies from Task and Flow classes
- Remove telemetry-related imports and private attributes from Task class
- Remove `_telemetry` attribute from Flow class
- Update event handling to emit events without direct telemetry tracking
- Simplify task and flow execution by removing explicit telemetry spans
- Move telemetry-related event handling to EventListener
2025-02-13 17:54:45 -08:00
Lorenze Jay
00a98cd5c9 Enhance event handling for Crew, Task, and Event classes
- Add crew name to failed event types (CrewKickoffFailedEvent, CrewTrainFailedEvent, CrewTestFailedEvent)
- Update Task events to remove redundant task and context attributes
- Refactor EventListener to use Logger for consistent event logging
- Add new event types for Crew train and test events
- Improve event bus event tracking in test cases
2025-02-13 12:01:18 -08:00
Lorenze Jay
62a20426a5 Refactor Flow and Agent event handling to use event_bus
- Remove `event_emitter` from Flow class and replace with `event_bus.emit()`
- Update Flow and Agent tests to use event_bus event listeners
- Remove redundant event emissions in Flow methods
- Add debug print statements in Flow execution
- Simplify event tracking in test cases
2025-02-13 10:54:58 -08:00
Lorenze Jay
097ed1f0df Fix tool usage and event import handling
- Update tool usage to use `.get()` method when checking tool name
- Remove unnecessary `__all__` export list in events/__init__.py
2025-02-12 16:26:15 -08:00
Lorenze Jay
fa5d7a2e05 Add default model for CrewEvaluator and fix event import order
- Set default model to "gpt-4o-mini" in CrewEvaluator when no model is specified
- Reorder event-related imports in task.py to follow standard import conventions
- Update event bus initialization method return type hint
- Export event_bus in events/__init__.py
2025-02-12 16:23:05 -08:00
Lorenze Jay
779db3c3dd Refactor event classes to improve type safety and naming consistency
- Rename event classes to have explicit 'Event' suffix (e.g., TaskStartedEvent)
- Update import statements and references across multiple files
- Remove deprecated events.py module
- Enhance event type hints and configurations
- Clean up unnecessary event-related code
2025-02-12 16:17:52 -08:00
Lorenze Jay
9debd3a6da Merge branch 'main' of github.com:crewAIInc/crewAI into better/event-emitter 2025-02-12 15:47:50 -08:00
Lorenze Jay
1250388635 Enhance event system type safety and error handling
- Improve type annotations for event bus and event types
- Add null checks for agent and task in event emissions
- Update import paths for base tool and base agent
- Refactor event listener type hints
- Remove unnecessary print statements
- Update test configurations to match new event handling
2025-02-12 15:46:56 -08:00
Lorenze Jay
25453f7cb1 Merge branch 'main' of github.com:crewAIInc/crewAI into better/event-emitter 2025-02-12 10:41:48 -08:00
Lorenze Jay
f70162c064 Refactor event system and add third-party event listeners
- Move event_bus import to correct module paths
- Introduce BaseEventListener abstract base class
- Add AgentOpsListener for third-party event tracking
- Update event listener initialization and setup
- Clean up event-related imports and exports
2025-02-12 10:29:27 -08:00
Lorenze Jay
3a89b9feab Add event emission for agent execution lifecycle
- Emit AgentExecutionStarted and AgentExecutionError events
- Update CrewAgentExecutor to use event_bus for tracking agent execution
- Refactor error handling to include event emission
- Minor code formatting improvements in task.py and crew_agent_executor.py
- Fix a typo in test file
2025-02-11 14:35:55 -08:00
Lorenze Jay
9eb5b361dd Merge branch 'main' of github.com:crewAIInc/crewAI into better/event-emitter 2025-02-11 14:33:08 -08:00
Lorenze Jay
676cabfdd6 Refactor event handling and introduce new event types
- Migrate from global `emit` function to `event_bus.emit`
- Add new event types for task failures, tool usage, and agent execution
- Update event listeners and event bus to support more granular event tracking
- Remove deprecated event emission methods
- Improve event type consistency and add more detailed event information
2025-02-11 14:31:50 -08:00
Lorenze Jay
95bae8bba3 WIP crew events emitter 2025-02-06 11:06:43 -08:00
2 changed files with 18 additions and 129 deletions

View File

@@ -873,9 +873,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
Executes all listeners and routers triggered by a method completion.
This internal method manages the execution flow by:
1. First executing all triggered routers sequentially and collecting their paths
2. Then executing listeners for each router path in parallel
3. Finally executing listeners for the original trigger method
1. First executing all triggered routers sequentially
2. Then executing all triggered listeners in parallel
Parameters
----------
@@ -888,45 +887,26 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes
-----
- Routers are executed sequentially to maintain flow control
- Each router's result is collected and processed independently
- Each router's result becomes the new trigger_method
- Normal listeners are executed in parallel for efficiency
- Listeners can receive the trigger method's result as a parameter
"""
# First, execute all routers for the trigger method
router_paths = [] # Store all router paths
routers_triggered = self._find_triggered_methods(
trigger_method, router_only=True
)
print(f"Found routers for {trigger_method}: {routers_triggered}")
# Execute all routers and collect their results
for router_name in routers_triggered:
print(f"Executing router: {router_name}")
await self._execute_single_listener(router_name, result)
# After executing router, add its result to paths
router_result = self._method_outputs[-1]
print(f"Router {router_name} result: {router_result}")
if router_result: # Only add non-None results
router_paths.append(router_result)
# Process all router paths
print(f"Processing router paths: {router_paths}")
for path in router_paths:
# Execute normal listeners for each router path
listeners_triggered = self._find_triggered_methods(
path, router_only=False
# First, handle routers repeatedly until no router triggers anymore
while True:
routers_triggered = self._find_triggered_methods(
trigger_method, router_only=True
)
print(f"Found listeners for path {path}: {listeners_triggered}")
if listeners_triggered:
print(f"Executing listeners for path {path}")
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
print(f"Finished executing listeners for path {path}")
if not routers_triggered:
break
for router_name in routers_triggered:
await self._execute_single_listener(router_name, result)
# After executing router, the router's result is the path
# The last router executed sets the trigger_method
# The router result is the last element in self._method_outputs
trigger_method = self._method_outputs[-1]
# Now execute normal listeners for the original trigger method
# Now that no more routers are triggered by current trigger_method,
# execute normal listeners
listeners_triggered = self._find_triggered_methods(
trigger_method, router_only=False
)

View File

@@ -2,7 +2,6 @@
import asyncio
from datetime import datetime
from typing import Any
import pytest
from pydantic import BaseModel
@@ -10,12 +9,12 @@ from pydantic import BaseModel
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.utilities.events import (
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
crewai_event_bus,
)
from crewai.utilities.events.flow_events import FlowPlotEvent
def test_simple_sequential_flow():
@@ -410,96 +409,6 @@ def test_router_with_multiple_conditions():
assert execution_order.index("log_final_step") > execution_order.index("router_and")
@pytest.fixture(autouse=True)
def cleanup_event_bus():
"""Clean up event bus after each test."""
crewai_event_bus._handlers = {}
yield
crewai_event_bus._handlers = {}
@pytest.mark.asyncio
async def test_multiple_concurrent_routers():
"""Test that multiple routers triggered by the same method all execute their events."""
execution_order = []
# Setup event handlers
@crewai_event_bus.on(FlowStartedEvent)
def handle_flow_start(source: Any, event: FlowStartedEvent):
pass
@crewai_event_bus.on(MethodExecutionStartedEvent)
def handle_method_start(source: Any, event: MethodExecutionStartedEvent):
pass
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def handle_method_end(source: Any, event: MethodExecutionFinishedEvent):
pass
@crewai_event_bus.on(FlowFinishedEvent)
def handle_flow_end(source: Any, event: FlowFinishedEvent):
pass
class MultiRouterFlow(Flow):
def __init__(self):
print("Initializing MultiRouterFlow")
super().__init__(diagnosed_conditions="ABCDH")
print(f"State after init: {self._state}")
@start()
async def diagnose_conditions(self):
print("Running diagnose_conditions")
execution_order.append("diagnose_conditions")
print(f"State in diagnose_conditions: {self._state}")
return self._state.get("diagnosed_conditions", "")
@router(diagnose_conditions)
async def diabetes_router(self):
execution_order.append("diabetes_router")
conditions = self._state.get("diagnosed_conditions", "")
print(f"Checking diabetes condition in: {conditions}")
if "D" in conditions:
return "diabetes"
return None
@router(diagnose_conditions)
async def hypertension_router(self):
execution_order.append("hypertension_router")
conditions = self._state.get("diagnosed_conditions", "")
print(f"Checking hypertension condition in: {conditions}")
if "H" in conditions:
return "hypertension"
return None
@listen("diabetes")
async def diabetes_analysis(self):
execution_order.append("diabetes_analysis")
@listen("hypertension")
async def hypertension_analysis(self):
execution_order.append("hypertension_analysis")
flow = MultiRouterFlow()
try:
await asyncio.wait_for(flow.kickoff_async(), timeout=10.0)
except asyncio.TimeoutError:
print("Flow execution timed out")
pytest.fail("Flow execution timed out")
print("Execution order:", execution_order)
# Verify both routers and their listeners executed
assert "diabetes_router" in execution_order
assert "hypertension_router" in execution_order
assert "diabetes_analysis" in execution_order
assert "hypertension_analysis" in execution_order
# Verify execution order is correct
assert execution_order.index("diagnose_conditions") < execution_order.index("diabetes_router")
assert execution_order.index("diagnose_conditions") < execution_order.index("hypertension_router")
assert execution_order.index("diabetes_router") < execution_order.index("diabetes_analysis")
assert execution_order.index("hypertension_router") < execution_order.index("hypertension_analysis")
def test_unstructured_flow_event_emission():
"""Test that the correct events are emitted during unstructured flow
execution with all fields validated."""