feat: add flush() to event bus for deterministic event handling

This commit is contained in:
Greyson LaLonde
2026-01-20 02:43:25 -05:00
parent c8f9547816
commit ff9108dac4
9 changed files with 1146 additions and 7 deletions

View File

@@ -732,6 +732,7 @@ class Crew(FlowTrackable, BaseModel):
)
raise
finally:
crewai_event_bus.flush()
detach(token)
def kickoff_for_each(
@@ -880,6 +881,7 @@ class Crew(FlowTrackable, BaseModel):
)
raise
finally:
await asyncio.to_thread(crewai_event_bus.flush)
detach(token)
async def akickoff_for_each(

View File

@@ -80,6 +80,8 @@ class CrewAIEventsBus:
_execution_plan_cache: dict[type[BaseEvent], ExecutionPlan]
_console: ConsoleFormatter
_shutting_down: bool
_pending_futures: set[Future[Any]]
_futures_lock: threading.Lock
def __new__(cls) -> Self:
"""Create or return the singleton instance.
@@ -102,6 +104,8 @@ class CrewAIEventsBus:
"""
self._shutting_down = False
self._rwlock = RWLock()
self._pending_futures: set[Future[Any]] = set()
self._futures_lock = threading.Lock()
self._sync_handlers: dict[type[BaseEvent], SyncHandlerSet] = {}
self._async_handlers: dict[type[BaseEvent], AsyncHandlerSet] = {}
self._handler_dependencies: dict[
@@ -122,6 +126,25 @@ class CrewAIEventsBus:
)
self._loop_thread.start()
def _track_future(self, future: Future[Any]) -> Future[Any]:
"""Track a future and set up automatic cleanup when it completes.
Args:
future: The future to track
Returns:
The same future for chaining
"""
with self._futures_lock:
self._pending_futures.add(future)
def _cleanup(f: Future[Any]) -> None:
with self._futures_lock:
self._pending_futures.discard(f)
future.add_done_callback(_cleanup)
return future
def _run_loop(self) -> None:
"""Run the background async event loop."""
asyncio.set_event_loop(self._loop)
@@ -369,9 +392,11 @@ class CrewAIEventsBus:
async_handlers = self._async_handlers.get(event_type, frozenset())
if has_dependencies:
return asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies(source, event),
self._loop,
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies(source, event),
self._loop,
)
)
if sync_handlers:
@@ -383,16 +408,53 @@ class CrewAIEventsBus:
ctx.run, self._call_handlers, source, event, sync_handlers
)
if not async_handlers:
return sync_future
return self._track_future(sync_future)
if async_handlers:
return asyncio.run_coroutine_threadsafe(
self._acall_handlers(source, event, async_handlers),
self._loop,
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers(source, event, async_handlers),
self._loop,
)
)
return None
def flush(self, timeout: float | None = None) -> bool:
"""Block until all pending event handlers complete.
This method waits for all futures from previously emitted events to
finish executing. Useful at the end of operations (like kickoff) to
ensure all event handlers have completed before returning.
Args:
timeout: Maximum time in seconds to wait for handlers to complete.
If None, waits indefinitely.
Returns:
True if all handlers completed, False if timeout occurred.
"""
with self._futures_lock:
futures_to_wait = list(self._pending_futures)
if not futures_to_wait:
return True
from concurrent.futures import wait as wait_futures
done, not_done = wait_futures(futures_to_wait, timeout=timeout)
# Check for exceptions in completed futures
errors = [
future.exception() for future in done if future.exception() is not None
]
for error in errors:
self._console.print(
f"[CrewAIEventsBus] Handler exception during flush: {error}"
)
return len(not_done) == 0
async def aemit(self, source: Any, event: BaseEvent) -> None:
"""Asynchronously emit an event to registered async handlers.

View File

@@ -1448,6 +1448,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return final_output
finally:
await asyncio.to_thread(crewai_event_bus.flush)
detach(flow_token)
async def akickoff(

View File

@@ -0,0 +1,115 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Helper. You help.\nYour
personal goal is: Help with tasks\nTo give my best complete final answer to
the task respond using the exact following format:\n\nThought: I now can give
a great answer\nFinal Answer: Your final answer must be the great and the most
complete as possible, it must be outcome described.\n\nI MUST use these formats,
my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say ''done''
and nothing else.\n\nThis is the expected criteria for your final answer: The
word done.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '794'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.10
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFJdT9wwEHzPr1j5+YISyBHIGxSBSh+Q+qGq16LI2JvErWMbewOt0P33
yslxCS2V+hIpOzvjmd19SgCYkqwCJjpOonc6vciyL809bdafPpy/e3N9XV5Jer85uzkPny82bBUZ
9u47CnpmHQjbO42krJlg4ZETRtW8PD45OS3WeTkCvZWoI611lBYHedoro9LD7HCdZkWaFzt6Z5XA
wCr4mgAAPI3faNRI/MkqyFbPlR5D4C2yat8EwLzVscJ4CCoQN8RWMyisITSj94+dHdqOKngLxj6C
4AZa9YDAoY0BgJvwiP6buVSGazgb/yqQ1uBS0GMzBB5TmUHrBcCNscTjVMYotztkuzevbeu8vQt/
UFmjjApd7ZEHa6LRQNaxEd0mALfjkIYXuZnztndUk/2B43P5+nTSY/NyFmixA8kS14t6ebR6Ra+W
SFzpsBgzE1x0KGfqvBM+SGUXQLJI/beb17Sn5Mq0/yM/A0KgI5S18yiVeJl4bvMYb/dfbfspj4ZZ
QP+gBNak0MdNSGz4oKeDYuFXIOzrRpkWvfNquqrG1UflMS+FwCZnyTb5DQAA//8DAJn9dBVkAwAA
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 20 Jan 2026 07:35:17 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '520'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '538'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,115 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Helper. You help.\nYour
personal goal is: Help with tasks\nTo give my best complete final answer to
the task respond using the exact following format:\n\nThought: I now can give
a great answer\nFinal Answer: Your final answer must be the great and the most
complete as possible, it must be outcome described.\n\nI MUST use these formats,
my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say ''hi''
and nothing else.\n\nThis is the expected criteria for your final answer: The
word hi.\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '790'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.10
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFJda9wwEHz3r1j0fA524vvyWyAkFEKh0D6kbTCKtLaVyishyXdpw/33
IvtydtoE8mLwzs5oZnefEwCmJCuBiZYH0VmdXmXZXXOLq9vHP09y9/kb3a1uvn/ptq3dbzu2iAzz
8IgivLDOhOmsxqAMjbBwyANG1Xy92my2xTLfDEBnJOpIa2xIi7M87RSp9Dw7X6ZZkebFkd4aJdCz
En4kAADPwzcaJYlPrIRs8VLp0HveICtPTQDMGR0rjHuvfOAU2GIChaGANHj/2pq+aUMJn4DMHgQn
aNQOgUMTAwAnv0f3k64VcQ2Xw18JrZrLOax7z2Mm6rWeAZzIBB5nMgS5PyKHk3VtGuvMg/+HympF
yreVQ+4NRZs+GMsG9JAA3A8j6l+lZtaZzoYqmF84PJcvt6Mem1YzQ4sjGEzgelZfXyze0KskBq60
nw2ZCS5alBN12gjvpTIzIJml/t/NW9pjckXNR+QnQAi0AWVlHUolXiee2hzGy32v7TTlwTDz6HZK
YBUUurgJiTXv9XhOzP/2AbuqVtSgs06NN1Xb6mK94mshsM5Zckj+AgAA//8DAJ/ajdRiAwAA
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 20 Jan 2026 07:35:18 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '420'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '448'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,115 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Responder. You give short
answers.\nYour personal goal is: Respond briefly\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say
''yes'' and nothing else.\n\nThis is the expected criteria for your final answer:
The word yes.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '809'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.10
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFJNa9wwEL37Vww6r4O98X7Ut0La0h5KIb0sbTCKNJbVypLQyNkuYf97
kXezdpoEejF43ryn92bmMQNgWrIamOh4FL03+U1R7NTtF72TH3b0jfo9qtV1rw63X2+Wn9giMdz9
LxTxiXUlXO8NRu3sCRYBecSkWm7W2+27alVuR6B3Ek2iKR/z6qrMe211viyWq7yo8rI60zunBRKr
4UcGAPA4fpNRK/EPq6FYPFV6JOIKWX1pAmDBmVRhnEhT5DayxQQKZyPa0fv3zg2qizV8Buv2ILgF
pR8QOKgUALilPYaf9qO23MD78a+GA9JcL2A7EE+h7GDMDODWusjTUMYkd2fkePFunPLB3dM/VNZq
q6lrAnJyNvmk6Dwb0WMGcDfOaHgWm/ngeh+b6H7j+Fy5Lk96bNrNDK3OYHSRm1l9s1q8otdIjFwb
mk2ZCS46lBN1WgkfpHYzIJulfunmNe1Tcm3V/8hPgBDoI8rGB5RaPE88tQVMp/tW22XKo2FGGB60
wCZqDGkTEls+mNM9MTpQxL5ptVUYfNCno2p9c71Z840Q2JYsO2Z/AQAA//8DALjcfKRjAwAA
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 20 Jan 2026 07:35:19 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '586'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '606'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,115 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Responder. You give short
answers.\nYour personal goal is: Respond briefly\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say
''hello'' and nothing else.\n\nThis is the expected criteria for your final
answer: The word hello.\nyou MUST return the actual complete content as the
final answer, not a summary.\n\nBegin! This is VERY important to you, use the
tools available and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '813'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.10
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAAwAAAP//jFLLbtswELzrKxY8W4HkynKim4GgaA/toXAPfQQCTa0kNhSXIKm4ReB/
L0g5lpKmQC4CtLMznNndxwSAyYZVwETPvRiMSm+z7Fu/3zm3/76mtt+VX+4/F/bTh/LrkQ5sFRh0
+IXCP7GuBA1GoZekJ1hY5B6Dar4tr69vik1+E4GBGlSB1hmfFld5Okgt03W23qRZkebFmd6TFOhY
BT8SAIDH+A1GdYO/WQXZ6qkyoHO8Q1ZdmgCYJRUqjDsnnefas9UMCtIedfS+72nsel/BR9B0BME1
dPIBgUMXAgDX7oj2p34vNVewi38V9KgULRUttqPjIZYelVoAXGvyPIwlZrk7I6eLe0WdsXRwL6is
lVq6vrbIHeng1HkyLKKnBOAuTml8FpwZS4Pxtad7jM/lZT7psXk7C7Q4g548V4v6drN6Ra9u0HOp
3GLOTHDRYzNT56XwsZG0AJJF6n/dvKY9JZe6e4v8DAiBxmNTG4uNFM8Tz20Ww/H+r+0y5WiYObQP
UmDtJdqwiQZbPqrpopj74zwOdSt1h9ZYOZ1Va+p325JvhcA2Z8kp+QsAAP//AwCEhgo7ZQMAAA==
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 20 Jan 2026 07:35:20 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '473'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '618'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,115 @@
interactions:
- request:
body: '{"messages":[{"role":"system","content":"You are Responder. You give short
answers.\nYour personal goal is: Respond briefly\nTo give my best complete final
answer to the task respond using the exact following format:\n\nThought: I now
can give a great answer\nFinal Answer: Your final answer must be the great and
the most complete as possible, it must be outcome described.\n\nI MUST use these
formats, my job depends on it!"},{"role":"user","content":"\nCurrent Task: Say
''ok'' and nothing else.\n\nThis is the expected criteria for your final answer:
The word ok.\nyou MUST return the actual complete content as the final answer,
not a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}],"model":"gpt-4.1-mini"}'
headers:
User-Agent:
- X-USER-AGENT-XXX
accept:
- application/json
accept-encoding:
- ACCEPT-ENCODING-XXX
authorization:
- AUTHORIZATION-XXX
connection:
- keep-alive
content-length:
- '807'
content-type:
- application/json
host:
- api.openai.com
x-stainless-arch:
- X-STAINLESS-ARCH-XXX
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- X-STAINLESS-OS-XXX
x-stainless-package-version:
- 1.83.0
x-stainless-read-timeout:
- X-STAINLESS-READ-TIMEOUT-XXX
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.10
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: !!binary |
H4sIAAAAAAAAA4xSXWvcMBB8969Y9HwO9vU+Ur+1JAeBQqH0pbTB6OS1rUReCWl9aQj334t017PT
ptAXg3d2RjO7+5IBCN2ICoTqJavBmfymKL5pPtx9/rikzU3x5YF22/3umXa3n4pbsYgMu39Axb9Z
V8oOziBrSydYeZSMUbXcbq6v36/WyyIBg23QRFrnOF9dlfmgSefLYrnOi1Vers703mqFQVTwPQMA
eEnfaJQa/CkqSGKpMmAIskNRXZoAhLcmVoQMQQeWxGIxgcoSIyXvX3s7dj1XcAdkn0BJgk4fECR0
MQBICk/of9BOkzTwIf1VYB/nch7bMciYiUZjZoAksizjTFKQ+zNyvFg3tnPe7sMfVNFq0qGvPcpg
KdoMbJ1I6DEDuE8jGl+lFs7bwXHN9hHTc+WmPOmJaTUzdHUG2bI0s/p2vXhDr26QpTZhNmShpOqx
majTRuTYaDsDslnqv928pX1Krqn7H/kJUAodY1M7j41WrxNPbR7j5f6r7TLlZFgE9AetsGaNPm6i
wVaO5nROIjwHxqFuNXXondenm2pd/W67kVulsC1Fdsx+AQAA//8DAEJgbhBiAwAA
headers:
CF-RAY:
- CF-RAY-XXX
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Tue, 20 Jan 2026 07:35:20 GMT
Server:
- cloudflare
Set-Cookie:
- SET-COOKIE-XXX
Strict-Transport-Security:
- STS-XXX
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- X-CONTENT-TYPE-XXX
access-control-expose-headers:
- ACCESS-CONTROL-XXX
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '474'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '498'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
- X-RATELIMIT-LIMIT-REQUESTS-XXX
x-ratelimit-limit-tokens:
- X-RATELIMIT-LIMIT-TOKENS-XXX
x-ratelimit-remaining-requests:
- X-RATELIMIT-REMAINING-REQUESTS-XXX
x-ratelimit-remaining-tokens:
- X-RATELIMIT-REMAINING-TOKENS-XXX
x-ratelimit-reset-requests:
- X-RATELIMIT-RESET-REQUESTS-XXX
x-ratelimit-reset-tokens:
- X-RATELIMIT-RESET-TOKENS-XXX
x-request-id:
- X-REQUEST-ID-XXX
status:
code: 200
message: OK
version: 1

View File

@@ -0,0 +1,499 @@
"""Tests for event ordering and parent-child relationships."""
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionStartedEvent,
)
from crewai.events.types.crew_events import (
CrewKickoffCompletedEvent,
CrewKickoffStartedEvent,
)
from crewai.events.types.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import (
LLMCallCompletedEvent,
LLMCallStartedEvent,
)
from crewai.events.types.task_events import (
TaskCompletedEvent,
TaskStartedEvent,
)
from crewai.flow.flow import Flow, listen, start
from crewai.task import Task
class EventCollector:
"""Collects events and provides helpers to find related events."""
def __init__(self) -> None:
self.events: list[BaseEvent] = []
def add(self, event: BaseEvent) -> None:
self.events.append(event)
def first(self, event_type: type[BaseEvent]) -> BaseEvent | None:
for e in self.events:
if isinstance(e, event_type):
return e
return None
def all_of(self, event_type: type[BaseEvent]) -> list[BaseEvent]:
return [e for e in self.events if isinstance(e, event_type)]
def with_parent(self, parent_id: str) -> list[BaseEvent]:
return [e for e in self.events if e.parent_event_id == parent_id]
@pytest.fixture
def collector() -> EventCollector:
"""Fixture that collects events during test execution."""
c = EventCollector()
@crewai_event_bus.on(CrewKickoffStartedEvent)
def h1(source, event):
c.add(event)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def h2(source, event):
c.add(event)
@crewai_event_bus.on(TaskStartedEvent)
def h3(source, event):
c.add(event)
@crewai_event_bus.on(TaskCompletedEvent)
def h4(source, event):
c.add(event)
@crewai_event_bus.on(AgentExecutionStartedEvent)
def h5(source, event):
c.add(event)
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def h6(source, event):
c.add(event)
@crewai_event_bus.on(LLMCallStartedEvent)
def h7(source, event):
c.add(event)
@crewai_event_bus.on(LLMCallCompletedEvent)
def h8(source, event):
c.add(event)
@crewai_event_bus.on(FlowStartedEvent)
def h9(source, event):
c.add(event)
@crewai_event_bus.on(FlowFinishedEvent)
def h10(source, event):
c.add(event)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def h11(source, event):
c.add(event)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def h12(source, event):
c.add(event)
return c
class TestCrewEventOrdering:
"""Tests for event ordering in crew execution."""
@pytest.mark.vcr()
def test_crew_events_have_event_ids(self, collector: EventCollector) -> None:
"""Every crew event should have a unique event_id."""
agent = Agent(
role="Responder",
goal="Respond briefly",
backstory="You give short answers.",
verbose=False,
)
task = Task(
description="Say 'hello' and nothing else.",
expected_output="The word hello.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
crew.kickoff()
started = collector.first(CrewKickoffStartedEvent)
completed = collector.first(CrewKickoffCompletedEvent)
assert started is not None
assert started.event_id is not None
assert len(started.event_id) > 0
assert completed is not None
assert completed.event_id is not None
assert completed.event_id != started.event_id
@pytest.mark.vcr()
def test_crew_completed_after_started(self, collector: EventCollector) -> None:
"""Crew completed event should have higher sequence than started."""
agent = Agent(
role="Responder",
goal="Respond briefly",
backstory="You give short answers.",
verbose=False,
)
task = Task(
description="Say 'yes' and nothing else.",
expected_output="The word yes.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
crew.kickoff()
started = collector.first(CrewKickoffStartedEvent)
completed = collector.first(CrewKickoffCompletedEvent)
assert started is not None
assert completed is not None
assert started.emission_sequence is not None
assert completed.emission_sequence is not None
assert completed.emission_sequence > started.emission_sequence
@pytest.mark.vcr()
def test_task_parent_is_crew(self, collector: EventCollector) -> None:
"""Task events should have crew event as parent."""
agent = Agent(
role="Responder",
goal="Respond briefly",
backstory="You give short answers.",
verbose=False,
)
task = Task(
description="Say 'ok' and nothing else.",
expected_output="The word ok.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
crew.kickoff()
crew_started = collector.first(CrewKickoffStartedEvent)
task_started = collector.first(TaskStartedEvent)
assert crew_started is not None
assert task_started is not None
assert task_started.parent_event_id == crew_started.event_id
class TestAgentEventOrdering:
"""Tests for event ordering in agent execution."""
@pytest.mark.vcr()
def test_agent_events_have_event_ids(self, collector: EventCollector) -> None:
"""Agent execution events should have event_ids."""
agent = Agent(
role="Helper",
goal="Help with tasks",
backstory="You help.",
verbose=False,
)
task = Task(
description="Say 'done' and nothing else.",
expected_output="The word done.",
agent=agent,
)
agent.execute_task(task)
crewai_event_bus.flush()
started = collector.first(AgentExecutionStartedEvent)
completed = collector.first(AgentExecutionCompletedEvent)
if started:
assert started.event_id is not None
if completed:
assert completed.event_id is not None
@pytest.mark.vcr()
def test_llm_events_have_parent(self, collector: EventCollector) -> None:
"""LLM call events should have a parent event."""
agent = Agent(
role="Helper",
goal="Help with tasks",
backstory="You help.",
verbose=False,
)
task = Task(
description="Say 'hi' and nothing else.",
expected_output="The word hi.",
agent=agent,
)
agent.execute_task(task)
crewai_event_bus.flush()
llm_started = collector.first(LLMCallStartedEvent)
if llm_started:
assert llm_started.event_id is not None
# LLM events should have some parent in the hierarchy
assert llm_started.parent_event_id is not None
class TestFlowWithCrewEventOrdering:
"""Tests for event ordering in flows containing crews."""
@pytest.mark.vcr()
def test_flow_events_have_ids(self, collector: EventCollector) -> None:
"""Flow events should have event_ids."""
agent = Agent(
role="Worker",
goal="Do work",
backstory="You work.",
verbose=False,
)
task = Task(
description="Say 'complete' and nothing else.",
expected_output="The word complete.",
agent=agent,
)
class SimpleFlow(Flow):
@start()
def run_crew(self):
c = Crew(agents=[agent], tasks=[task], verbose=False)
return c.kickoff()
flow = SimpleFlow()
flow.kickoff()
flow_started = collector.first(FlowStartedEvent)
flow_finished = collector.first(FlowFinishedEvent)
assert flow_started is not None
assert flow_started.event_id is not None
assert flow_finished is not None
assert flow_finished.event_id is not None
@pytest.mark.vcr()
def test_method_parent_is_flow(self, collector: EventCollector) -> None:
"""Method execution events should have flow as parent."""
agent = Agent(
role="Worker",
goal="Do work",
backstory="You work.",
verbose=False,
)
task = Task(
description="Say 'ready' and nothing else.",
expected_output="The word ready.",
agent=agent,
)
class FlowWithMethod(Flow):
@start()
def my_method(self):
c = Crew(agents=[agent], tasks=[task], verbose=False)
return c.kickoff()
flow = FlowWithMethod()
flow.kickoff()
flow_started = collector.first(FlowStartedEvent)
method_started = collector.first(MethodExecutionStartedEvent)
assert flow_started is not None
assert method_started is not None
assert method_started.parent_event_id == flow_started.event_id
@pytest.mark.vcr()
def test_crew_parent_is_method(self, collector: EventCollector) -> None:
"""Crew inside flow method should have method as parent."""
agent = Agent(
role="Worker",
goal="Do work",
backstory="You work.",
verbose=False,
)
task = Task(
description="Say 'go' and nothing else.",
expected_output="The word go.",
agent=agent,
)
class FlowWithCrew(Flow):
@start()
def run_it(self):
c = Crew(agents=[agent], tasks=[task], verbose=False)
return c.kickoff()
flow = FlowWithCrew()
flow.kickoff()
method_started = collector.first(MethodExecutionStartedEvent)
crew_started = collector.first(CrewKickoffStartedEvent)
assert method_started is not None
assert crew_started is not None
assert crew_started.parent_event_id == method_started.event_id
class TestFlowWithMultipleCrewsEventOrdering:
"""Tests for event ordering in flows with multiple crews."""
@pytest.mark.vcr()
def test_two_crews_have_different_ids(self, collector: EventCollector) -> None:
"""Two crews in a flow should have different event_ids."""
agent1 = Agent(
role="First",
goal="Be first",
backstory="You go first.",
verbose=False,
)
agent2 = Agent(
role="Second",
goal="Be second",
backstory="You go second.",
verbose=False,
)
task1 = Task(
description="Say '1' and nothing else.",
expected_output="The number 1.",
agent=agent1,
)
task2 = Task(
description="Say '2' and nothing else.",
expected_output="The number 2.",
agent=agent2,
)
class TwoCrewFlow(Flow):
@start()
def first(self):
c = Crew(agents=[agent1], tasks=[task1], verbose=False)
return c.kickoff()
@listen(first)
def second(self, _):
c = Crew(agents=[agent2], tasks=[task2], verbose=False)
return c.kickoff()
flow = TwoCrewFlow()
flow.kickoff()
crew_started_events = collector.all_of(CrewKickoffStartedEvent)
assert len(crew_started_events) >= 2
assert crew_started_events[0].event_id != crew_started_events[1].event_id
@pytest.mark.vcr()
def test_second_crew_after_first(self, collector: EventCollector) -> None:
"""Second crew should have higher sequence than first."""
agent1 = Agent(
role="First",
goal="Be first",
backstory="You go first.",
verbose=False,
)
agent2 = Agent(
role="Second",
goal="Be second",
backstory="You go second.",
verbose=False,
)
task1 = Task(
description="Say 'a' and nothing else.",
expected_output="The letter a.",
agent=agent1,
)
task2 = Task(
description="Say 'b' and nothing else.",
expected_output="The letter b.",
agent=agent2,
)
class SequentialCrewFlow(Flow):
@start()
def crew_a(self):
c = Crew(agents=[agent1], tasks=[task1], verbose=False)
return c.kickoff()
@listen(crew_a)
def crew_b(self, _):
c = Crew(agents=[agent2], tasks=[task2], verbose=False)
return c.kickoff()
flow = SequentialCrewFlow()
flow.kickoff()
crew_started_events = collector.all_of(CrewKickoffStartedEvent)
assert len(crew_started_events) >= 2
first = crew_started_events[0]
second = crew_started_events[1]
assert first.emission_sequence is not None
assert second.emission_sequence is not None
assert second.emission_sequence > first.emission_sequence
@pytest.mark.vcr()
def test_tasks_have_correct_crew_parents(self, collector: EventCollector) -> None:
"""Tasks in different crews should have their own crew as parent."""
agent1 = Agent(
role="Alpha",
goal="Do alpha work",
backstory="You are alpha.",
verbose=False,
)
agent2 = Agent(
role="Beta",
goal="Do beta work",
backstory="You are beta.",
verbose=False,
)
task1 = Task(
description="Say 'alpha' and nothing else.",
expected_output="The word alpha.",
agent=agent1,
)
task2 = Task(
description="Say 'beta' and nothing else.",
expected_output="The word beta.",
agent=agent2,
)
class ParentTestFlow(Flow):
@start()
def alpha_crew(self):
c = Crew(agents=[agent1], tasks=[task1], verbose=False)
return c.kickoff()
@listen(alpha_crew)
def beta_crew(self, _):
c = Crew(agents=[agent2], tasks=[task2], verbose=False)
return c.kickoff()
flow = ParentTestFlow()
flow.kickoff()
crew_started_events = collector.all_of(CrewKickoffStartedEvent)
task_started_events = collector.all_of(TaskStartedEvent)
assert len(crew_started_events) >= 2
assert len(task_started_events) >= 2
crew1_id = crew_started_events[0].event_id
crew2_id = crew_started_events[1].event_id
task1_parent = task_started_events[0].parent_event_id
task2_parent = task_started_events[1].parent_event_id
assert task1_parent == crew1_id
assert task2_parent == crew2_id