From 7e01c5a03048596dc6dd43b51b3ca2c41e3bc8fe Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Fri, 17 Apr 2026 01:34:06 +0800 Subject: [PATCH 1/4] fix: dispatch Flow checkpoints through Flow APIs in TUI --- lib/crewai/src/crewai/cli/checkpoint_tui.py | 170 ++++++++++++++------ lib/crewai/src/crewai/flow/flow.py | 4 +- 2 files changed, 120 insertions(+), 54 deletions(-) diff --git a/lib/crewai/src/crewai/cli/checkpoint_tui.py b/lib/crewai/src/crewai/cli/checkpoint_tui.py index e0d10f813..26791af23 100644 --- a/lib/crewai/src/crewai/cli/checkpoint_tui.py +++ b/lib/crewai/src/crewai/cli/checkpoint_tui.py @@ -3,7 +3,7 @@ from __future__ import annotations from collections import defaultdict -from typing import Any, ClassVar +from typing import Any, ClassVar, Literal from textual.app import App, ComposeResult from textual.binding import Binding @@ -78,15 +78,25 @@ def _build_entity_header(ent: dict[str, Any]) -> str: return "\n".join(lines) -# Return type: (location, action, inputs, task_output_overrides) -_TuiResult = tuple[str, str, dict[str, Any] | None, dict[int, str] | None] | None +# Return type: (location, action, inputs, task_output_overrides, entity_type) +_TuiResult = ( + tuple[ + str, + str, + dict[str, Any] | None, + dict[int, str] | None, + Literal["crew", "flow"], + ] + | None +) class CheckpointTUI(App[_TuiResult]): """TUI to browse and inspect checkpoints. - Returns ``(location, action, inputs)`` where action is ``"resume"`` or - ``"fork"`` and inputs is a parsed dict or ``None``, + Returns ``(location, action, inputs, task_overrides, entity_type)`` + where action is ``"resume"`` or ``"fork"``, inputs is a parsed dict + or ``None``, and entity_type is ``"crew"`` or ``"flow"``; or ``None`` if the user quit without selecting. """ @@ -506,6 +516,13 @@ class CheckpointTUI(App[_TuiResult]): overrides[task_idx] = editor.text return overrides or None + def _detect_entity_type(self, entry: dict[str, Any]) -> Literal["crew", "flow"]: + """Infer the top-level entity type from checkpoint entities.""" + for ent in entry.get("entities", []): + if ent.get("type") == "flow": + return "flow" + return "crew" + def _resolve_location(self, entry: dict[str, Any]) -> str: """Get the restore location string for a checkpoint entry.""" if "path" in entry: @@ -526,15 +543,64 @@ class CheckpointTUI(App[_TuiResult]): inputs = self._collect_inputs() overrides = self._collect_task_overrides() loc = self._resolve_location(self._selected_entry) + etype = self._detect_entity_type(self._selected_entry) if event.button.id == "btn-resume": - self.exit((loc, "resume", inputs, overrides)) + self.exit((loc, "resume", inputs, overrides, etype)) elif event.button.id == "btn-fork": - self.exit((loc, "fork", inputs, overrides)) + self.exit((loc, "fork", inputs, overrides, etype)) def action_refresh(self) -> None: self._refresh_tree() +def _apply_task_overrides(crew: Any, task_overrides: dict[int, str]) -> None: + """Apply task output overrides to a restored Crew and print modifications.""" + import click + + click.echo("Modifications:") + overridden_agents: set[int] = set() + for task_idx, new_output in task_overrides.items(): + if task_idx < len(crew.tasks) and crew.tasks[task_idx].output is not None: + desc = crew.tasks[task_idx].description or f"Task {task_idx + 1}" + if len(desc) > 60: + desc = desc[:57] + "..." + crew.tasks[task_idx].output.raw = new_output + preview = new_output.replace("\n", " ") + if len(preview) > 80: + preview = preview[:77] + "..." + click.echo(f" Task {task_idx + 1}: {desc}") + click.echo(f" -> {preview}") + agent = crew.tasks[task_idx].agent + if agent and agent.agent_executor: + nth = sum(1 for t in crew.tasks[:task_idx] if t.agent is agent) + messages = agent.agent_executor.messages + system_positions = [ + i for i, m in enumerate(messages) if m.get("role") == "system" + ] + if nth < len(system_positions): + seg_start = system_positions[nth] + seg_end = ( + system_positions[nth + 1] + if nth + 1 < len(system_positions) + else len(messages) + ) + for j in range(seg_end - 1, seg_start, -1): + if messages[j].get("role") == "assistant": + messages[j]["content"] = new_output + break + overridden_agents.add(id(agent)) + + earliest = min(task_overrides) + for offset, subsequent in enumerate(crew.tasks[earliest + 1 :], start=earliest + 1): + if subsequent.output and offset not in task_overrides: + subsequent.output = None + if subsequent.agent and subsequent.agent.agent_executor: + subsequent.agent.agent_executor._resuming = False + if id(subsequent.agent) not in overridden_agents: + subsequent.agent.agent_executor.messages = [] + click.echo() + + async def _run_checkpoint_tui_async(location: str) -> None: """Async implementation of the checkpoint TUI flow.""" import click @@ -545,13 +611,54 @@ async def _run_checkpoint_tui_async(location: str) -> None: if selection is None: return - selected, action, inputs, task_overrides = selection + selected, action, inputs, task_overrides, entity_type = selection - from crewai.crew import Crew from crewai.state.checkpoint_config import CheckpointConfig config = CheckpointConfig(restore_from=selected) + if entity_type == "flow": + from crewai.events.event_bus import crewai_event_bus + from crewai.flow.flow import Flow + + if action == "fork": + click.echo(f"\nForking flow from: {selected}\n") + flow = Flow.fork(config) + else: + click.echo(f"\nResuming flow from: {selected}\n") + flow = Flow.from_checkpoint(config) + + if task_overrides: + from crewai.crew import Crew as CrewCls + + state = crewai_event_bus._runtime_state + if state is not None: + flat_offset = 0 + for entity in state.root: + if not isinstance(entity, CrewCls) or not entity.tasks: + continue + n = len(entity.tasks) + local = { + idx - flat_offset: out + for idx, out in task_overrides.items() + if flat_offset <= idx < flat_offset + n + } + if local: + _apply_task_overrides(entity, local) + flat_offset += n + + if inputs: + click.echo("Inputs:") + for k, v in inputs.items(): + click.echo(f" {k}: {v}") + click.echo() + + result = await flow.kickoff_async(inputs=inputs) + click.echo(f"\nResult: {getattr(result, 'raw', result)}") + return + + from crewai.crew import Crew + if action == "fork": click.echo(f"\nForking from: {selected}\n") crew = Crew.fork(config) @@ -560,50 +667,7 @@ async def _run_checkpoint_tui_async(location: str) -> None: crew = Crew.from_checkpoint(config) if task_overrides: - click.echo("Modifications:") - overridden_agents: set[int] = set() - for task_idx, new_output in task_overrides.items(): - if task_idx < len(crew.tasks) and crew.tasks[task_idx].output is not None: - desc = crew.tasks[task_idx].description or f"Task {task_idx + 1}" - if len(desc) > 60: - desc = desc[:57] + "..." - crew.tasks[task_idx].output.raw = new_output # type: ignore[union-attr] - preview = new_output.replace("\n", " ") - if len(preview) > 80: - preview = preview[:77] + "..." - click.echo(f" Task {task_idx + 1}: {desc}") - click.echo(f" -> {preview}") - agent = crew.tasks[task_idx].agent - if agent and agent.agent_executor: - nth = sum(1 for t in crew.tasks[:task_idx] if t.agent is agent) - messages = agent.agent_executor.messages - system_positions = [ - i for i, m in enumerate(messages) if m.get("role") == "system" - ] - if nth < len(system_positions): - seg_start = system_positions[nth] - seg_end = ( - system_positions[nth + 1] - if nth + 1 < len(system_positions) - else len(messages) - ) - for j in range(seg_end - 1, seg_start, -1): - if messages[j].get("role") == "assistant": - messages[j]["content"] = new_output - break - overridden_agents.add(id(agent)) - - earliest = min(task_overrides) - for offset, subsequent in enumerate( - crew.tasks[earliest + 1 :], start=earliest + 1 - ): - if subsequent.output and offset not in task_overrides: - subsequent.output = None - if subsequent.agent and subsequent.agent.agent_executor: - subsequent.agent.agent_executor._resuming = False - if id(subsequent.agent) not in overridden_agents: - subsequent.agent.agent_executor.messages = [] - click.echo() + _apply_task_overrides(crew, task_overrides) if inputs: click.echo("Inputs:") diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 057f60ffb..88457f7aa 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -2138,7 +2138,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): try: # Reset flow state for fresh execution unless restoring from persistence - is_restoring = inputs and "id" in inputs and self.persistence is not None + is_restoring = ( + inputs and "id" in inputs and self.persistence is not None + ) or self.checkpoint_completed_methods is not None if not is_restoring: # Clear completed methods and outputs for a fresh start self._completed_methods.clear() From baf91d8f0a15103d39e3cb87aafbb3d31229b600 Mon Sep 17 00:00:00 2001 From: iris-clawd Date: Thu, 16 Apr 2026 11:01:59 -0700 Subject: [PATCH 2/4] fix: update broken enterprise link on installation page (OSS-36) (#5443) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: update broken enterprise link on installation page (OSS-36) The 'Explore Enterprise Options' card on the installation page linked to https://crewai.com/enterprise which returns a 404. Updated the href to https://crewai.com/amp across all locales (en, pt-BR, ko, ar). * fix: use HubSpot form link for enterprise options card Updated per team feedback — the enterprise card should link to the HubSpot demo form instead of crewai.com/amp. --------- Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> --- docs/ar/installation.mdx | 2 +- docs/en/installation.mdx | 2 +- docs/ko/installation.mdx | 2 +- docs/pt-BR/installation.mdx | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/ar/installation.mdx b/docs/ar/installation.mdx index 3a902fae0..6690e72ec 100644 --- a/docs/ar/installation.mdx +++ b/docs/ar/installation.mdx @@ -196,7 +196,7 @@ python3 --version - يدعم أي مزود سحابي بما في ذلك النشر المحلي - تكامل مع أنظمة الأمان الحالية - + تعرّف على عروض CrewAI للمؤسسات وجدول عرضًا توضيحيًا diff --git a/docs/en/installation.mdx b/docs/en/installation.mdx index 727f71220..50f43ff9d 100644 --- a/docs/en/installation.mdx +++ b/docs/en/installation.mdx @@ -199,7 +199,7 @@ For teams and organizations, CrewAI offers enterprise deployment options that el - Supports any hyperscaler including on prem deployments - Integration with your existing security systems - + Learn about CrewAI's enterprise offerings and schedule a demo diff --git a/docs/ko/installation.mdx b/docs/ko/installation.mdx index fc47d796b..6363f3271 100644 --- a/docs/ko/installation.mdx +++ b/docs/ko/installation.mdx @@ -189,7 +189,7 @@ CrewAI는 의존성 관리와 패키지 처리를 위해 `uv`를 사용합니다 - 온프레미스 배포를 포함하여 모든 하이퍼스케일러 지원 - 기존 보안 시스템과의 통합 - + CrewAI의 엔터프라이즈 서비스에 대해 알아보고 데모를 예약하세요 diff --git a/docs/pt-BR/installation.mdx b/docs/pt-BR/installation.mdx index 868778af8..74d0445d3 100644 --- a/docs/pt-BR/installation.mdx +++ b/docs/pt-BR/installation.mdx @@ -191,7 +191,7 @@ Para equipes e organizações, o CrewAI oferece opções de implantação corpor - Compatível com qualquer hyperscaler, incluindo ambientes on-premises - Integração com seus sistemas de segurança existentes - + Saiba mais sobre as soluções enterprise do CrewAI e agende uma demonstração From fbe2a040645cc3ef2f3938279c026b58a52616bf Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Fri, 17 Apr 2026 02:39:22 +0800 Subject: [PATCH 3/4] fix: mock Repository.__init__ in test_publish_when_not_in_sync --- lib/crewai/tests/cli/tools/test_main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/crewai/tests/cli/tools/test_main.py b/lib/crewai/tests/cli/tools/test_main.py index 31032a072..ed51db74a 100644 --- a/lib/crewai/tests/cli/tools/test_main.py +++ b/lib/crewai/tests/cli/tools/test_main.py @@ -161,7 +161,8 @@ def test_install_api_error(mock_get, capsys, tool_command): @patch("crewai.cli.tools.main.git.Repository.is_synced", return_value=False) -def test_publish_when_not_in_sync(mock_is_synced, capsys, tool_command): +@patch("crewai.cli.tools.main.git.Repository.__init__", return_value=None) +def test_publish_when_not_in_sync(mock_init, mock_is_synced, capsys, tool_command): with raises(SystemExit): tool_command.publish(is_public=True) From 6136228a66d94d77f4899548812ce866577f4117 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Fri, 17 Apr 2026 03:02:03 +0800 Subject: [PATCH 4/4] fix: scope streaming handlers to prevent cross-run chunk contamination Concurrent streaming runs registered handlers on the singleton event bus that received all LLMStreamChunkEvent emissions, causing chunks to fan out across unrelated queues. Introduces a ContextVar-based stream scope ID so each handler only accepts events from its own execution context. Closes #5376 --- lib/crewai/src/crewai/utilities/streaming.py | 38 +++++++-- lib/crewai/tests/test_streaming.py | 88 ++++++++++++++++++++ 2 files changed, 117 insertions(+), 9 deletions(-) diff --git a/lib/crewai/src/crewai/utilities/streaming.py b/lib/crewai/src/crewai/utilities/streaming.py index 008144bff..99bc9b199 100644 --- a/lib/crewai/src/crewai/utilities/streaming.py +++ b/lib/crewai/src/crewai/utilities/streaming.py @@ -7,6 +7,7 @@ import logging import queue import threading from typing import Any, NamedTuple +import uuid from typing_extensions import TypedDict @@ -25,6 +26,10 @@ from crewai.utilities.string_utils import sanitize_tool_name logger = logging.getLogger(__name__) +_current_stream_ids: contextvars.ContextVar[tuple[str, ...]] = contextvars.ContextVar( + "_current_stream_ids", default=() +) + class TaskInfo(TypedDict): """Task context information for streaming.""" @@ -45,6 +50,7 @@ class StreamingState(NamedTuple): async_queue: asyncio.Queue[StreamChunk | None | Exception] | None loop: asyncio.AbstractEventLoop | None handler: Callable[[Any, BaseEvent], None] + stream_id: str | None = None def _extract_tool_call_info( @@ -106,6 +112,7 @@ def _create_stream_handler( sync_queue: queue.Queue[StreamChunk | None | Exception], async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None, loop: asyncio.AbstractEventLoop | None = None, + stream_id: str | None = None, ) -> Callable[[Any, BaseEvent], None]: """Create a stream handler function. @@ -114,21 +121,19 @@ def _create_stream_handler( sync_queue: Synchronous queue for chunks. async_queue: Optional async queue for chunks. loop: Optional event loop for async operations. + stream_id: Stream scope ID for concurrent isolation. Returns: Handler function that can be registered with the event bus. """ def stream_handler(_: Any, event: BaseEvent) -> None: - """Handle LLM stream chunk events and enqueue them. - - Args: - _: Event source (unused). - event: The event to process. - """ if not isinstance(event, LLMStreamChunkEvent): return + if stream_id is not None and stream_id not in _current_stream_ids.get(): + return + chunk = _create_stream_chunk(event, current_task_info) if async_queue is not None and loop is not None: @@ -203,7 +208,11 @@ def create_streaming_state( async_queue = asyncio.Queue() loop = asyncio.get_event_loop() - handler = _create_stream_handler(current_task_info, sync_queue, async_queue, loop) + stream_id = str(uuid.uuid4()) + + handler = _create_stream_handler( + current_task_info, sync_queue, async_queue, loop, stream_id=stream_id + ) crewai_event_bus.register_handler(LLMStreamChunkEvent, handler) return StreamingState( @@ -213,6 +222,7 @@ def create_streaming_state( async_queue=async_queue, loop=loop, handler=handler, + stream_id=stream_id, ) @@ -260,7 +270,12 @@ def create_chunk_generator( Yields: StreamChunk objects as they arrive. """ - ctx = contextvars.copy_context() + if state.stream_id is not None: + token = _current_stream_ids.set((*_current_stream_ids.get(), state.stream_id)) + ctx = contextvars.copy_context() + _current_stream_ids.reset(token) + else: + ctx = contextvars.copy_context() thread = threading.Thread(target=ctx.run, args=(run_func,), daemon=True) thread.start() @@ -300,7 +315,12 @@ async def create_async_chunk_generator( "Async queue not initialized. Use create_streaming_state(use_async=True)." ) - task = asyncio.create_task(run_coro()) + if state.stream_id is not None: + token = _current_stream_ids.set((*_current_stream_ids.get(), state.stream_id)) + task = asyncio.create_task(run_coro()) + _current_stream_ids.reset(token) + else: + task = asyncio.create_task(run_coro()) try: while True: diff --git a/lib/crewai/tests/test_streaming.py b/lib/crewai/tests/test_streaming.py index 7b1c8e1ba..9079c393f 100644 --- a/lib/crewai/tests/test_streaming.py +++ b/lib/crewai/tests/test_streaming.py @@ -879,3 +879,91 @@ class TestStreamingImports: assert StreamChunk is not None assert StreamChunkType is not None assert ToolCallChunk is not None + + +class TestConcurrentStreamIsolation: + """Regression tests for concurrent streaming isolation (issue #5376).""" + + def test_concurrent_streams_do_not_cross_contaminate(self) -> None: + """Two concurrent streaming runs must each receive only their own chunks. + + Mirrors the real production path: create_streaming_state in the caller, + then temporarily push the stream_id into the ContextVar, copy_context, + and reset — exactly as create_chunk_generator does. + """ + import contextvars + import threading + + from crewai.utilities.streaming import ( + TaskInfo, + _current_stream_ids, + _unregister_handler, + create_streaming_state, + ) + + task_info_a: TaskInfo = { + "index": 0, + "name": "task_a", + "id": "a", + "agent_role": "A", + "agent_id": "a", + } + task_info_b: TaskInfo = { + "index": 1, + "name": "task_b", + "id": "b", + "agent_role": "B", + "agent_id": "b", + } + + state_a = create_streaming_state(task_info_a, []) + state_b = create_streaming_state(task_info_b, []) + + def make_emitter_ctx(state: Any) -> contextvars.Context: + token = _current_stream_ids.set( + (*_current_stream_ids.get(), state.stream_id) + ) + ctx = contextvars.copy_context() + _current_stream_ids.reset(token) + return ctx + + ctx_a = make_emitter_ctx(state_a) + ctx_b = make_emitter_ctx(state_b) + + def emit_chunks(prefix: str, call_id: str) -> None: + for text in [f"{prefix}1", f"{prefix}2", f"{prefix}3"]: + crewai_event_bus.emit( + None, + event=LLMStreamChunkEvent( + chunk=text, call_id=call_id, response_id="r" + ), + ) + + t_a = threading.Thread(target=ctx_a.run, args=(lambda: emit_chunks("A", "ca"),)) + t_b = threading.Thread(target=ctx_b.run, args=(lambda: emit_chunks("B", "cb"),)) + t_a.start() + t_b.start() + t_a.join() + t_b.join() + + chunks_a: list[str] = [] + while not state_a.sync_queue.empty(): + item = state_a.sync_queue.get_nowait() + if isinstance(item, StreamChunk): + chunks_a.append(item.content) + + chunks_b: list[str] = [] + while not state_b.sync_queue.empty(): + item = state_b.sync_queue.get_nowait() + if isinstance(item, StreamChunk): + chunks_b.append(item.content) + + assert set(chunks_a) == {"A1", "A2", "A3"}, ( + f"Stream A received unexpected chunks: {chunks_a}" + ) + assert set(chunks_b) == {"B1", "B2", "B3"}, ( + f"Stream B received unexpected chunks: {chunks_b}" + ) + + _unregister_handler(state_a.handler) + _unregister_handler(state_b.handler)