diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index 2c7f583b9..ebd0461e2 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -732,6 +732,7 @@ class Crew(FlowTrackable, BaseModel): ) raise finally: + crewai_event_bus.flush() detach(token) def kickoff_for_each( @@ -880,6 +881,7 @@ class Crew(FlowTrackable, BaseModel): ) raise finally: + await asyncio.to_thread(crewai_event_bus.flush) detach(token) async def akickoff_for_each( diff --git a/lib/crewai/src/crewai/events/event_bus.py b/lib/crewai/src/crewai/events/event_bus.py index e4993a5d4..e537c41f3 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -80,6 +80,8 @@ class CrewAIEventsBus: _execution_plan_cache: dict[type[BaseEvent], ExecutionPlan] _console: ConsoleFormatter _shutting_down: bool + _pending_futures: set[Future[Any]] + _futures_lock: threading.Lock def __new__(cls) -> Self: """Create or return the singleton instance. @@ -102,6 +104,8 @@ class CrewAIEventsBus: """ self._shutting_down = False self._rwlock = RWLock() + self._pending_futures: set[Future[Any]] = set() + self._futures_lock = threading.Lock() self._sync_handlers: dict[type[BaseEvent], SyncHandlerSet] = {} self._async_handlers: dict[type[BaseEvent], AsyncHandlerSet] = {} self._handler_dependencies: dict[ @@ -122,6 +126,25 @@ class CrewAIEventsBus: ) self._loop_thread.start() + def _track_future(self, future: Future[Any]) -> Future[Any]: + """Track a future and set up automatic cleanup when it completes. + + Args: + future: The future to track + + Returns: + The same future for chaining + """ + with self._futures_lock: + self._pending_futures.add(future) + + def _cleanup(f: Future[Any]) -> None: + with self._futures_lock: + self._pending_futures.discard(f) + + future.add_done_callback(_cleanup) + return future + def _run_loop(self) -> None: """Run the background async event loop.""" asyncio.set_event_loop(self._loop) @@ -369,9 +392,11 @@ class CrewAIEventsBus: async_handlers = self._async_handlers.get(event_type, frozenset()) if has_dependencies: - return asyncio.run_coroutine_threadsafe( - self._emit_with_dependencies(source, event), - self._loop, + return self._track_future( + asyncio.run_coroutine_threadsafe( + self._emit_with_dependencies(source, event), + self._loop, + ) ) if sync_handlers: @@ -383,16 +408,53 @@ class CrewAIEventsBus: ctx.run, self._call_handlers, source, event, sync_handlers ) if not async_handlers: - return sync_future + return self._track_future(sync_future) if async_handlers: - return asyncio.run_coroutine_threadsafe( - self._acall_handlers(source, event, async_handlers), - self._loop, + return self._track_future( + asyncio.run_coroutine_threadsafe( + self._acall_handlers(source, event, async_handlers), + self._loop, + ) ) return None + def flush(self, timeout: float | None = None) -> bool: + """Block until all pending event handlers complete. + + This method waits for all futures from previously emitted events to + finish executing. Useful at the end of operations (like kickoff) to + ensure all event handlers have completed before returning. + + Args: + timeout: Maximum time in seconds to wait for handlers to complete. + If None, waits indefinitely. + + Returns: + True if all handlers completed, False if timeout occurred. + """ + with self._futures_lock: + futures_to_wait = list(self._pending_futures) + + if not futures_to_wait: + return True + + from concurrent.futures import wait as wait_futures + + done, not_done = wait_futures(futures_to_wait, timeout=timeout) + + # Check for exceptions in completed futures + errors = [ + future.exception() for future in done if future.exception() is not None + ] + for error in errors: + self._console.print( + f"[CrewAIEventsBus] Handler exception during flush: {error}" + ) + + return len(not_done) == 0 + async def aemit(self, source: Any, event: BaseEvent) -> None: """Asynchronously emit an event to registered async handlers. diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 715fd44fd..b84b444ef 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1448,6 +1448,7 @@ class Flow(Generic[T], metaclass=FlowMeta): return final_output finally: + await asyncio.to_thread(crewai_event_bus.flush) detach(flow_token) async def akickoff( diff --git a/lib/crewai/tests/cassettes/events/TestAgentEventOrdering.test_agent_events_have_event_ids.yaml b/lib/crewai/tests/cassettes/events/TestAgentEventOrdering.test_agent_events_have_event_ids.yaml new file mode 100644 index 000000000..79769ab99 --- /dev/null +++ b/lib/crewai/tests/cassettes/events/TestAgentEventOrdering.test_agent_events_have_event_ids.yaml @@ -0,0 +1,115 @@ +interactions: +- request: + body: '{"messages":[{"role":"system","content":"You are Helper. You help.\nYour + personal goal is: Help with tasks\nTo give my best complete final answer to + the task respond using the exact following format:\n\nThought: I now can give + a great answer\nFinal Answer: Your final answer must be the great and the most + complete as possible, it must be outcome described.\n\nI MUST use these formats, + my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say ''done'' + and nothing else.\n\nThis is the expected criteria for your final answer: The + word done.\nyou MUST return the actual complete content as the final answer, + not a summary.\n\nBegin! This is VERY important to you, use the tools available + and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '794' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.10 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFJdT9wwEHzPr1j5+YISyBHIGxSBSh+Q+qGq16LI2JvErWMbewOt0P33 + yslxCS2V+hIpOzvjmd19SgCYkqwCJjpOonc6vciyL809bdafPpy/e3N9XV5Jer85uzkPny82bBUZ + 9u47CnpmHQjbO42krJlg4ZETRtW8PD45OS3WeTkCvZWoI611lBYHedoro9LD7HCdZkWaFzt6Z5XA + wCr4mgAAPI3faNRI/MkqyFbPlR5D4C2yat8EwLzVscJ4CCoQN8RWMyisITSj94+dHdqOKngLxj6C + 4AZa9YDAoY0BgJvwiP6buVSGazgb/yqQ1uBS0GMzBB5TmUHrBcCNscTjVMYotztkuzevbeu8vQt/ + UFmjjApd7ZEHa6LRQNaxEd0mALfjkIYXuZnztndUk/2B43P5+nTSY/NyFmixA8kS14t6ebR6Ra+W + SFzpsBgzE1x0KGfqvBM+SGUXQLJI/beb17Sn5Mq0/yM/A0KgI5S18yiVeJl4bvMYb/dfbfspj4ZZ + QP+gBNak0MdNSGz4oKeDYuFXIOzrRpkWvfNquqrG1UflMS+FwCZnyTb5DQAA//8DAJn9dBVkAwAA + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Tue, 20 Jan 2026 07:35:17 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '520' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '538' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/events/TestAgentEventOrdering.test_llm_events_have_parent.yaml b/lib/crewai/tests/cassettes/events/TestAgentEventOrdering.test_llm_events_have_parent.yaml new file mode 100644 index 000000000..3a82a4b47 --- /dev/null +++ b/lib/crewai/tests/cassettes/events/TestAgentEventOrdering.test_llm_events_have_parent.yaml @@ -0,0 +1,115 @@ +interactions: +- request: + body: '{"messages":[{"role":"system","content":"You are Helper. You help.\nYour + personal goal is: Help with tasks\nTo give my best complete final answer to + the task respond using the exact following format:\n\nThought: I now can give + a great answer\nFinal Answer: Your final answer must be the great and the most + complete as possible, it must be outcome described.\n\nI MUST use these formats, + my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say ''hi'' + and nothing else.\n\nThis is the expected criteria for your final answer: The + word hi.\nyou MUST return the actual complete content as the final answer, not + a summary.\n\nBegin! This is VERY important to you, use the tools available + and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '790' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.10 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFJda9wwEHz3r1j0fA524vvyWyAkFEKh0D6kbTCKtLaVyishyXdpw/33 + IvtydtoE8mLwzs5oZnefEwCmJCuBiZYH0VmdXmXZXXOLq9vHP09y9/kb3a1uvn/ptq3dbzu2iAzz + 8IgivLDOhOmsxqAMjbBwyANG1Xy92my2xTLfDEBnJOpIa2xIi7M87RSp9Dw7X6ZZkebFkd4aJdCz + En4kAADPwzcaJYlPrIRs8VLp0HveICtPTQDMGR0rjHuvfOAU2GIChaGANHj/2pq+aUMJn4DMHgQn + aNQOgUMTAwAnv0f3k64VcQ2Xw18JrZrLOax7z2Mm6rWeAZzIBB5nMgS5PyKHk3VtGuvMg/+HympF + yreVQ+4NRZs+GMsG9JAA3A8j6l+lZtaZzoYqmF84PJcvt6Mem1YzQ4sjGEzgelZfXyze0KskBq60 + nw2ZCS5alBN12gjvpTIzIJml/t/NW9pjckXNR+QnQAi0AWVlHUolXiee2hzGy32v7TTlwTDz6HZK + YBUUurgJiTXv9XhOzP/2AbuqVtSgs06NN1Xb6mK94mshsM5Zckj+AgAA//8DAJ/ajdRiAwAA + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Tue, 20 Jan 2026 07:35:18 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '420' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '448' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/events/TestCrewEventOrdering.test_crew_completed_after_started.yaml b/lib/crewai/tests/cassettes/events/TestCrewEventOrdering.test_crew_completed_after_started.yaml new file mode 100644 index 000000000..347277ef5 --- /dev/null +++ b/lib/crewai/tests/cassettes/events/TestCrewEventOrdering.test_crew_completed_after_started.yaml @@ -0,0 +1,115 @@ +interactions: +- request: + body: '{"messages":[{"role":"system","content":"You are Responder. You give short + answers.\nYour personal goal is: Respond briefly\nTo give my best complete final + answer to the task respond using the exact following format:\n\nThought: I now + can give a great answer\nFinal Answer: Your final answer must be the great and + the most complete as possible, it must be outcome described.\n\nI MUST use these + formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say + ''yes'' and nothing else.\n\nThis is the expected criteria for your final answer: + The word yes.\nyou MUST return the actual complete content as the final answer, + not a summary.\n\nBegin! This is VERY important to you, use the tools available + and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '809' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.10 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFJNa9wwEL37Vww6r4O98X7Ut0La0h5KIb0sbTCKNJbVypLQyNkuYf97 + kXezdpoEejF43ryn92bmMQNgWrIamOh4FL03+U1R7NTtF72TH3b0jfo9qtV1rw63X2+Wn9giMdz9 + LxTxiXUlXO8NRu3sCRYBecSkWm7W2+27alVuR6B3Ek2iKR/z6qrMe211viyWq7yo8rI60zunBRKr + 4UcGAPA4fpNRK/EPq6FYPFV6JOIKWX1pAmDBmVRhnEhT5DayxQQKZyPa0fv3zg2qizV8Buv2ILgF + pR8QOKgUALilPYaf9qO23MD78a+GA9JcL2A7EE+h7GDMDODWusjTUMYkd2fkePFunPLB3dM/VNZq + q6lrAnJyNvmk6Dwb0WMGcDfOaHgWm/ngeh+b6H7j+Fy5Lk96bNrNDK3OYHSRm1l9s1q8otdIjFwb + mk2ZCS46lBN1WgkfpHYzIJulfunmNe1Tcm3V/8hPgBDoI8rGB5RaPE88tQVMp/tW22XKo2FGGB60 + wCZqDGkTEls+mNM9MTpQxL5ptVUYfNCno2p9c71Z840Q2JYsO2Z/AQAA//8DALjcfKRjAwAA + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Tue, 20 Jan 2026 07:35:19 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '586' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '606' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/events/TestCrewEventOrdering.test_crew_events_have_event_ids.yaml b/lib/crewai/tests/cassettes/events/TestCrewEventOrdering.test_crew_events_have_event_ids.yaml new file mode 100644 index 000000000..28cb3f534 --- /dev/null +++ b/lib/crewai/tests/cassettes/events/TestCrewEventOrdering.test_crew_events_have_event_ids.yaml @@ -0,0 +1,115 @@ +interactions: +- request: + body: '{"messages":[{"role":"system","content":"You are Responder. You give short + answers.\nYour personal goal is: Respond briefly\nTo give my best complete final + answer to the task respond using the exact following format:\n\nThought: I now + can give a great answer\nFinal Answer: Your final answer must be the great and + the most complete as possible, it must be outcome described.\n\nI MUST use these + formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say + ''hello'' and nothing else.\n\nThis is the expected criteria for your final + answer: The word hello.\nyou MUST return the actual complete content as the + final answer, not a summary.\n\nBegin! This is VERY important to you, use the + tools available and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '813' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.10 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAAwAAAP//jFLLbtswELzrKxY8W4HkynKim4GgaA/toXAPfQQCTa0kNhSXIKm4ReB/ + L0g5lpKmQC4CtLMznNndxwSAyYZVwETPvRiMSm+z7Fu/3zm3/76mtt+VX+4/F/bTh/LrkQ5sFRh0 + +IXCP7GuBA1GoZekJ1hY5B6Dar4tr69vik1+E4GBGlSB1hmfFld5Okgt03W23qRZkebFmd6TFOhY + BT8SAIDH+A1GdYO/WQXZ6qkyoHO8Q1ZdmgCYJRUqjDsnnefas9UMCtIedfS+72nsel/BR9B0BME1 + dPIBgUMXAgDX7oj2p34vNVewi38V9KgULRUttqPjIZYelVoAXGvyPIwlZrk7I6eLe0WdsXRwL6is + lVq6vrbIHeng1HkyLKKnBOAuTml8FpwZS4Pxtad7jM/lZT7psXk7C7Q4g548V4v6drN6Ra9u0HOp + 3GLOTHDRYzNT56XwsZG0AJJF6n/dvKY9JZe6e4v8DAiBxmNTG4uNFM8Tz20Ww/H+r+0y5WiYObQP + UmDtJdqwiQZbPqrpopj74zwOdSt1h9ZYOZ1Va+p325JvhcA2Z8kp+QsAAP//AwCEhgo7ZQMAAA== + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Tue, 20 Jan 2026 07:35:20 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '473' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '618' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/cassettes/events/TestCrewEventOrdering.test_task_parent_is_crew.yaml b/lib/crewai/tests/cassettes/events/TestCrewEventOrdering.test_task_parent_is_crew.yaml new file mode 100644 index 000000000..74b684fbb --- /dev/null +++ b/lib/crewai/tests/cassettes/events/TestCrewEventOrdering.test_task_parent_is_crew.yaml @@ -0,0 +1,115 @@ +interactions: +- request: + body: '{"messages":[{"role":"system","content":"You are Responder. You give short + answers.\nYour personal goal is: Respond briefly\nTo give my best complete final + answer to the task respond using the exact following format:\n\nThought: I now + can give a great answer\nFinal Answer: Your final answer must be the great and + the most complete as possible, it must be outcome described.\n\nI MUST use these + formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say + ''ok'' and nothing else.\n\nThis is the expected criteria for your final answer: + The word ok.\nyou MUST return the actual complete content as the final answer, + not a summary.\n\nBegin! This is VERY important to you, use the tools available + and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}' + headers: + User-Agent: + - X-USER-AGENT-XXX + accept: + - application/json + accept-encoding: + - ACCEPT-ENCODING-XXX + authorization: + - AUTHORIZATION-XXX + connection: + - keep-alive + content-length: + - '807' + content-type: + - application/json + host: + - api.openai.com + x-stainless-arch: + - X-STAINLESS-ARCH-XXX + x-stainless-async: + - 'false' + x-stainless-lang: + - python + x-stainless-os: + - X-STAINLESS-OS-XXX + x-stainless-package-version: + - 1.83.0 + x-stainless-read-timeout: + - X-STAINLESS-READ-TIMEOUT-XXX + x-stainless-retry-count: + - '0' + x-stainless-runtime: + - CPython + x-stainless-runtime-version: + - 3.12.10 + method: POST + uri: https://api.openai.com/v1/chat/completions + response: + body: + string: !!binary | + H4sIAAAAAAAAA4xSXWvcMBB8969Y9HwO9vU+Ur+1JAeBQqH0pbTB6OS1rUReCWl9aQj334t017PT + ptAXg3d2RjO7+5IBCN2ICoTqJavBmfymKL5pPtx9/rikzU3x5YF22/3umXa3n4pbsYgMu39Axb9Z + V8oOziBrSydYeZSMUbXcbq6v36/WyyIBg23QRFrnOF9dlfmgSefLYrnOi1Vers703mqFQVTwPQMA + eEnfaJQa/CkqSGKpMmAIskNRXZoAhLcmVoQMQQeWxGIxgcoSIyXvX3s7dj1XcAdkn0BJgk4fECR0 + MQBICk/of9BOkzTwIf1VYB/nch7bMciYiUZjZoAksizjTFKQ+zNyvFg3tnPe7sMfVNFq0qGvPcpg + KdoMbJ1I6DEDuE8jGl+lFs7bwXHN9hHTc+WmPOmJaTUzdHUG2bI0s/p2vXhDr26QpTZhNmShpOqx + majTRuTYaDsDslnqv928pX1Krqn7H/kJUAodY1M7j41WrxNPbR7j5f6r7TLlZFgE9AetsGaNPm6i + wVaO5nROIjwHxqFuNXXondenm2pd/W67kVulsC1Fdsx+AQAA//8DAEJgbhBiAwAA + headers: + CF-RAY: + - CF-RAY-XXX + Connection: + - keep-alive + Content-Type: + - application/json + Date: + - Tue, 20 Jan 2026 07:35:20 GMT + Server: + - cloudflare + Set-Cookie: + - SET-COOKIE-XXX + Strict-Transport-Security: + - STS-XXX + Transfer-Encoding: + - chunked + X-Content-Type-Options: + - X-CONTENT-TYPE-XXX + access-control-expose-headers: + - ACCESS-CONTROL-XXX + alt-svc: + - h3=":443"; ma=86400 + cf-cache-status: + - DYNAMIC + openai-organization: + - OPENAI-ORG-XXX + openai-processing-ms: + - '474' + openai-project: + - OPENAI-PROJECT-XXX + openai-version: + - '2020-10-01' + x-envoy-upstream-service-time: + - '498' + x-openai-proxy-wasm: + - v0.1 + x-ratelimit-limit-requests: + - X-RATELIMIT-LIMIT-REQUESTS-XXX + x-ratelimit-limit-tokens: + - X-RATELIMIT-LIMIT-TOKENS-XXX + x-ratelimit-remaining-requests: + - X-RATELIMIT-REMAINING-REQUESTS-XXX + x-ratelimit-remaining-tokens: + - X-RATELIMIT-REMAINING-TOKENS-XXX + x-ratelimit-reset-requests: + - X-RATELIMIT-RESET-REQUESTS-XXX + x-ratelimit-reset-tokens: + - X-RATELIMIT-RESET-TOKENS-XXX + x-request-id: + - X-REQUEST-ID-XXX + status: + code: 200 + message: OK +version: 1 diff --git a/lib/crewai/tests/events/test_event_ordering.py b/lib/crewai/tests/events/test_event_ordering.py new file mode 100644 index 000000000..2f8fafa36 --- /dev/null +++ b/lib/crewai/tests/events/test_event_ordering.py @@ -0,0 +1,499 @@ +"""Tests for event ordering and parent-child relationships.""" + +import pytest + +from crewai.agent import Agent +from crewai.crew import Crew +from crewai.events.base_events import BaseEvent +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.agent_events import ( + AgentExecutionCompletedEvent, + AgentExecutionStartedEvent, +) +from crewai.events.types.crew_events import ( + CrewKickoffCompletedEvent, + CrewKickoffStartedEvent, +) +from crewai.events.types.flow_events import ( + FlowFinishedEvent, + FlowStartedEvent, + MethodExecutionFinishedEvent, + MethodExecutionStartedEvent, +) +from crewai.events.types.llm_events import ( + LLMCallCompletedEvent, + LLMCallStartedEvent, +) +from crewai.events.types.task_events import ( + TaskCompletedEvent, + TaskStartedEvent, +) +from crewai.flow.flow import Flow, listen, start +from crewai.task import Task + + +class EventCollector: + """Collects events and provides helpers to find related events.""" + + def __init__(self) -> None: + self.events: list[BaseEvent] = [] + + def add(self, event: BaseEvent) -> None: + self.events.append(event) + + def first(self, event_type: type[BaseEvent]) -> BaseEvent | None: + for e in self.events: + if isinstance(e, event_type): + return e + return None + + def all_of(self, event_type: type[BaseEvent]) -> list[BaseEvent]: + return [e for e in self.events if isinstance(e, event_type)] + + def with_parent(self, parent_id: str) -> list[BaseEvent]: + return [e for e in self.events if e.parent_event_id == parent_id] + + +@pytest.fixture +def collector() -> EventCollector: + """Fixture that collects events during test execution.""" + c = EventCollector() + + @crewai_event_bus.on(CrewKickoffStartedEvent) + def h1(source, event): + c.add(event) + + @crewai_event_bus.on(CrewKickoffCompletedEvent) + def h2(source, event): + c.add(event) + + @crewai_event_bus.on(TaskStartedEvent) + def h3(source, event): + c.add(event) + + @crewai_event_bus.on(TaskCompletedEvent) + def h4(source, event): + c.add(event) + + @crewai_event_bus.on(AgentExecutionStartedEvent) + def h5(source, event): + c.add(event) + + @crewai_event_bus.on(AgentExecutionCompletedEvent) + def h6(source, event): + c.add(event) + + @crewai_event_bus.on(LLMCallStartedEvent) + def h7(source, event): + c.add(event) + + @crewai_event_bus.on(LLMCallCompletedEvent) + def h8(source, event): + c.add(event) + + @crewai_event_bus.on(FlowStartedEvent) + def h9(source, event): + c.add(event) + + @crewai_event_bus.on(FlowFinishedEvent) + def h10(source, event): + c.add(event) + + @crewai_event_bus.on(MethodExecutionStartedEvent) + def h11(source, event): + c.add(event) + + @crewai_event_bus.on(MethodExecutionFinishedEvent) + def h12(source, event): + c.add(event) + + return c + + +class TestCrewEventOrdering: + """Tests for event ordering in crew execution.""" + + @pytest.mark.vcr() + def test_crew_events_have_event_ids(self, collector: EventCollector) -> None: + """Every crew event should have a unique event_id.""" + agent = Agent( + role="Responder", + goal="Respond briefly", + backstory="You give short answers.", + verbose=False, + ) + task = Task( + description="Say 'hello' and nothing else.", + expected_output="The word hello.", + agent=agent, + ) + crew = Crew(agents=[agent], tasks=[task], verbose=False) + crew.kickoff() + + started = collector.first(CrewKickoffStartedEvent) + completed = collector.first(CrewKickoffCompletedEvent) + + assert started is not None + assert started.event_id is not None + assert len(started.event_id) > 0 + + assert completed is not None + assert completed.event_id is not None + assert completed.event_id != started.event_id + + @pytest.mark.vcr() + def test_crew_completed_after_started(self, collector: EventCollector) -> None: + """Crew completed event should have higher sequence than started.""" + agent = Agent( + role="Responder", + goal="Respond briefly", + backstory="You give short answers.", + verbose=False, + ) + task = Task( + description="Say 'yes' and nothing else.", + expected_output="The word yes.", + agent=agent, + ) + crew = Crew(agents=[agent], tasks=[task], verbose=False) + crew.kickoff() + + started = collector.first(CrewKickoffStartedEvent) + completed = collector.first(CrewKickoffCompletedEvent) + + assert started is not None + assert completed is not None + assert started.emission_sequence is not None + assert completed.emission_sequence is not None + assert completed.emission_sequence > started.emission_sequence + + @pytest.mark.vcr() + def test_task_parent_is_crew(self, collector: EventCollector) -> None: + """Task events should have crew event as parent.""" + agent = Agent( + role="Responder", + goal="Respond briefly", + backstory="You give short answers.", + verbose=False, + ) + task = Task( + description="Say 'ok' and nothing else.", + expected_output="The word ok.", + agent=agent, + ) + crew = Crew(agents=[agent], tasks=[task], verbose=False) + crew.kickoff() + + crew_started = collector.first(CrewKickoffStartedEvent) + task_started = collector.first(TaskStartedEvent) + + assert crew_started is not None + assert task_started is not None + assert task_started.parent_event_id == crew_started.event_id + + +class TestAgentEventOrdering: + """Tests for event ordering in agent execution.""" + + @pytest.mark.vcr() + def test_agent_events_have_event_ids(self, collector: EventCollector) -> None: + """Agent execution events should have event_ids.""" + agent = Agent( + role="Helper", + goal="Help with tasks", + backstory="You help.", + verbose=False, + ) + task = Task( + description="Say 'done' and nothing else.", + expected_output="The word done.", + agent=agent, + ) + agent.execute_task(task) + crewai_event_bus.flush() + + started = collector.first(AgentExecutionStartedEvent) + completed = collector.first(AgentExecutionCompletedEvent) + + if started: + assert started.event_id is not None + + if completed: + assert completed.event_id is not None + + @pytest.mark.vcr() + def test_llm_events_have_parent(self, collector: EventCollector) -> None: + """LLM call events should have a parent event.""" + agent = Agent( + role="Helper", + goal="Help with tasks", + backstory="You help.", + verbose=False, + ) + task = Task( + description="Say 'hi' and nothing else.", + expected_output="The word hi.", + agent=agent, + ) + agent.execute_task(task) + crewai_event_bus.flush() + + llm_started = collector.first(LLMCallStartedEvent) + + if llm_started: + assert llm_started.event_id is not None + # LLM events should have some parent in the hierarchy + assert llm_started.parent_event_id is not None + + +class TestFlowWithCrewEventOrdering: + """Tests for event ordering in flows containing crews.""" + + @pytest.mark.vcr() + def test_flow_events_have_ids(self, collector: EventCollector) -> None: + """Flow events should have event_ids.""" + agent = Agent( + role="Worker", + goal="Do work", + backstory="You work.", + verbose=False, + ) + task = Task( + description="Say 'complete' and nothing else.", + expected_output="The word complete.", + agent=agent, + ) + + class SimpleFlow(Flow): + @start() + def run_crew(self): + c = Crew(agents=[agent], tasks=[task], verbose=False) + return c.kickoff() + + flow = SimpleFlow() + flow.kickoff() + + flow_started = collector.first(FlowStartedEvent) + flow_finished = collector.first(FlowFinishedEvent) + + assert flow_started is not None + assert flow_started.event_id is not None + + assert flow_finished is not None + assert flow_finished.event_id is not None + + @pytest.mark.vcr() + def test_method_parent_is_flow(self, collector: EventCollector) -> None: + """Method execution events should have flow as parent.""" + agent = Agent( + role="Worker", + goal="Do work", + backstory="You work.", + verbose=False, + ) + task = Task( + description="Say 'ready' and nothing else.", + expected_output="The word ready.", + agent=agent, + ) + + class FlowWithMethod(Flow): + @start() + def my_method(self): + c = Crew(agents=[agent], tasks=[task], verbose=False) + return c.kickoff() + + flow = FlowWithMethod() + flow.kickoff() + + flow_started = collector.first(FlowStartedEvent) + method_started = collector.first(MethodExecutionStartedEvent) + + assert flow_started is not None + assert method_started is not None + assert method_started.parent_event_id == flow_started.event_id + + @pytest.mark.vcr() + def test_crew_parent_is_method(self, collector: EventCollector) -> None: + """Crew inside flow method should have method as parent.""" + agent = Agent( + role="Worker", + goal="Do work", + backstory="You work.", + verbose=False, + ) + task = Task( + description="Say 'go' and nothing else.", + expected_output="The word go.", + agent=agent, + ) + + class FlowWithCrew(Flow): + @start() + def run_it(self): + c = Crew(agents=[agent], tasks=[task], verbose=False) + return c.kickoff() + + flow = FlowWithCrew() + flow.kickoff() + + method_started = collector.first(MethodExecutionStartedEvent) + crew_started = collector.first(CrewKickoffStartedEvent) + + assert method_started is not None + assert crew_started is not None + assert crew_started.parent_event_id == method_started.event_id + + +class TestFlowWithMultipleCrewsEventOrdering: + """Tests for event ordering in flows with multiple crews.""" + + @pytest.mark.vcr() + def test_two_crews_have_different_ids(self, collector: EventCollector) -> None: + """Two crews in a flow should have different event_ids.""" + agent1 = Agent( + role="First", + goal="Be first", + backstory="You go first.", + verbose=False, + ) + agent2 = Agent( + role="Second", + goal="Be second", + backstory="You go second.", + verbose=False, + ) + task1 = Task( + description="Say '1' and nothing else.", + expected_output="The number 1.", + agent=agent1, + ) + task2 = Task( + description="Say '2' and nothing else.", + expected_output="The number 2.", + agent=agent2, + ) + + class TwoCrewFlow(Flow): + @start() + def first(self): + c = Crew(agents=[agent1], tasks=[task1], verbose=False) + return c.kickoff() + + @listen(first) + def second(self, _): + c = Crew(agents=[agent2], tasks=[task2], verbose=False) + return c.kickoff() + + flow = TwoCrewFlow() + flow.kickoff() + + crew_started_events = collector.all_of(CrewKickoffStartedEvent) + + assert len(crew_started_events) >= 2 + assert crew_started_events[0].event_id != crew_started_events[1].event_id + + @pytest.mark.vcr() + def test_second_crew_after_first(self, collector: EventCollector) -> None: + """Second crew should have higher sequence than first.""" + agent1 = Agent( + role="First", + goal="Be first", + backstory="You go first.", + verbose=False, + ) + agent2 = Agent( + role="Second", + goal="Be second", + backstory="You go second.", + verbose=False, + ) + task1 = Task( + description="Say 'a' and nothing else.", + expected_output="The letter a.", + agent=agent1, + ) + task2 = Task( + description="Say 'b' and nothing else.", + expected_output="The letter b.", + agent=agent2, + ) + + class SequentialCrewFlow(Flow): + @start() + def crew_a(self): + c = Crew(agents=[agent1], tasks=[task1], verbose=False) + return c.kickoff() + + @listen(crew_a) + def crew_b(self, _): + c = Crew(agents=[agent2], tasks=[task2], verbose=False) + return c.kickoff() + + flow = SequentialCrewFlow() + flow.kickoff() + + crew_started_events = collector.all_of(CrewKickoffStartedEvent) + + assert len(crew_started_events) >= 2 + first = crew_started_events[0] + second = crew_started_events[1] + + assert first.emission_sequence is not None + assert second.emission_sequence is not None + assert second.emission_sequence > first.emission_sequence + + @pytest.mark.vcr() + def test_tasks_have_correct_crew_parents(self, collector: EventCollector) -> None: + """Tasks in different crews should have their own crew as parent.""" + agent1 = Agent( + role="Alpha", + goal="Do alpha work", + backstory="You are alpha.", + verbose=False, + ) + agent2 = Agent( + role="Beta", + goal="Do beta work", + backstory="You are beta.", + verbose=False, + ) + task1 = Task( + description="Say 'alpha' and nothing else.", + expected_output="The word alpha.", + agent=agent1, + ) + task2 = Task( + description="Say 'beta' and nothing else.", + expected_output="The word beta.", + agent=agent2, + ) + + class ParentTestFlow(Flow): + @start() + def alpha_crew(self): + c = Crew(agents=[agent1], tasks=[task1], verbose=False) + return c.kickoff() + + @listen(alpha_crew) + def beta_crew(self, _): + c = Crew(agents=[agent2], tasks=[task2], verbose=False) + return c.kickoff() + + flow = ParentTestFlow() + flow.kickoff() + + crew_started_events = collector.all_of(CrewKickoffStartedEvent) + task_started_events = collector.all_of(TaskStartedEvent) + + assert len(crew_started_events) >= 2 + assert len(task_started_events) >= 2 + + crew1_id = crew_started_events[0].event_id + crew2_id = crew_started_events[1].event_id + + task1_parent = task_started_events[0].parent_event_id + task2_parent = task_started_events[1].parent_event_id + + assert task1_parent == crew1_id + assert task2_parent == crew2_id \ No newline at end of file