diff --git a/docs/ar/installation.mdx b/docs/ar/installation.mdx index 3a902fae0..6690e72ec 100644 --- a/docs/ar/installation.mdx +++ b/docs/ar/installation.mdx @@ -196,7 +196,7 @@ python3 --version - يدعم أي مزود سحابي بما في ذلك النشر المحلي - تكامل مع أنظمة الأمان الحالية - + تعرّف على عروض CrewAI للمؤسسات وجدول عرضًا توضيحيًا diff --git a/docs/en/installation.mdx b/docs/en/installation.mdx index 727f71220..50f43ff9d 100644 --- a/docs/en/installation.mdx +++ b/docs/en/installation.mdx @@ -199,7 +199,7 @@ For teams and organizations, CrewAI offers enterprise deployment options that el - Supports any hyperscaler including on prem deployments - Integration with your existing security systems - + Learn about CrewAI's enterprise offerings and schedule a demo diff --git a/docs/ko/installation.mdx b/docs/ko/installation.mdx index fc47d796b..6363f3271 100644 --- a/docs/ko/installation.mdx +++ b/docs/ko/installation.mdx @@ -189,7 +189,7 @@ CrewAI는 의존성 관리와 패키지 처리를 위해 `uv`를 사용합니다 - 온프레미스 배포를 포함하여 모든 하이퍼스케일러 지원 - 기존 보안 시스템과의 통합 - + CrewAI의 엔터프라이즈 서비스에 대해 알아보고 데모를 예약하세요 diff --git a/docs/pt-BR/installation.mdx b/docs/pt-BR/installation.mdx index 868778af8..74d0445d3 100644 --- a/docs/pt-BR/installation.mdx +++ b/docs/pt-BR/installation.mdx @@ -191,7 +191,7 @@ Para equipes e organizações, o CrewAI oferece opções de implantação corpor - Compatível com qualquer hyperscaler, incluindo ambientes on-premises - Integração com seus sistemas de segurança existentes - + Saiba mais sobre as soluções enterprise do CrewAI e agende uma demonstração diff --git a/lib/crewai/src/crewai/cli/checkpoint_tui.py b/lib/crewai/src/crewai/cli/checkpoint_tui.py index e0d10f813..26791af23 100644 --- a/lib/crewai/src/crewai/cli/checkpoint_tui.py +++ b/lib/crewai/src/crewai/cli/checkpoint_tui.py @@ -3,7 +3,7 @@ from __future__ import annotations from collections import defaultdict -from typing import Any, ClassVar +from typing import Any, ClassVar, Literal from textual.app import App, ComposeResult from textual.binding import Binding @@ -78,15 +78,25 @@ def _build_entity_header(ent: dict[str, Any]) -> str: return "\n".join(lines) -# Return type: (location, action, inputs, task_output_overrides) -_TuiResult = tuple[str, str, dict[str, Any] | None, dict[int, str] | None] | None +# Return type: (location, action, inputs, task_output_overrides, entity_type) +_TuiResult = ( + tuple[ + str, + str, + dict[str, Any] | None, + dict[int, str] | None, + Literal["crew", "flow"], + ] + | None +) class CheckpointTUI(App[_TuiResult]): """TUI to browse and inspect checkpoints. - Returns ``(location, action, inputs)`` where action is ``"resume"`` or - ``"fork"`` and inputs is a parsed dict or ``None``, + Returns ``(location, action, inputs, task_overrides, entity_type)`` + where action is ``"resume"`` or ``"fork"``, inputs is a parsed dict + or ``None``, and entity_type is ``"crew"`` or ``"flow"``; or ``None`` if the user quit without selecting. """ @@ -506,6 +516,13 @@ class CheckpointTUI(App[_TuiResult]): overrides[task_idx] = editor.text return overrides or None + def _detect_entity_type(self, entry: dict[str, Any]) -> Literal["crew", "flow"]: + """Infer the top-level entity type from checkpoint entities.""" + for ent in entry.get("entities", []): + if ent.get("type") == "flow": + return "flow" + return "crew" + def _resolve_location(self, entry: dict[str, Any]) -> str: """Get the restore location string for a checkpoint entry.""" if "path" in entry: @@ -526,15 +543,64 @@ class CheckpointTUI(App[_TuiResult]): inputs = self._collect_inputs() overrides = self._collect_task_overrides() loc = self._resolve_location(self._selected_entry) + etype = self._detect_entity_type(self._selected_entry) if event.button.id == "btn-resume": - self.exit((loc, "resume", inputs, overrides)) + self.exit((loc, "resume", inputs, overrides, etype)) elif event.button.id == "btn-fork": - self.exit((loc, "fork", inputs, overrides)) + self.exit((loc, "fork", inputs, overrides, etype)) def action_refresh(self) -> None: self._refresh_tree() +def _apply_task_overrides(crew: Any, task_overrides: dict[int, str]) -> None: + """Apply task output overrides to a restored Crew and print modifications.""" + import click + + click.echo("Modifications:") + overridden_agents: set[int] = set() + for task_idx, new_output in task_overrides.items(): + if task_idx < len(crew.tasks) and crew.tasks[task_idx].output is not None: + desc = crew.tasks[task_idx].description or f"Task {task_idx + 1}" + if len(desc) > 60: + desc = desc[:57] + "..." + crew.tasks[task_idx].output.raw = new_output + preview = new_output.replace("\n", " ") + if len(preview) > 80: + preview = preview[:77] + "..." + click.echo(f" Task {task_idx + 1}: {desc}") + click.echo(f" -> {preview}") + agent = crew.tasks[task_idx].agent + if agent and agent.agent_executor: + nth = sum(1 for t in crew.tasks[:task_idx] if t.agent is agent) + messages = agent.agent_executor.messages + system_positions = [ + i for i, m in enumerate(messages) if m.get("role") == "system" + ] + if nth < len(system_positions): + seg_start = system_positions[nth] + seg_end = ( + system_positions[nth + 1] + if nth + 1 < len(system_positions) + else len(messages) + ) + for j in range(seg_end - 1, seg_start, -1): + if messages[j].get("role") == "assistant": + messages[j]["content"] = new_output + break + overridden_agents.add(id(agent)) + + earliest = min(task_overrides) + for offset, subsequent in enumerate(crew.tasks[earliest + 1 :], start=earliest + 1): + if subsequent.output and offset not in task_overrides: + subsequent.output = None + if subsequent.agent and subsequent.agent.agent_executor: + subsequent.agent.agent_executor._resuming = False + if id(subsequent.agent) not in overridden_agents: + subsequent.agent.agent_executor.messages = [] + click.echo() + + async def _run_checkpoint_tui_async(location: str) -> None: """Async implementation of the checkpoint TUI flow.""" import click @@ -545,13 +611,54 @@ async def _run_checkpoint_tui_async(location: str) -> None: if selection is None: return - selected, action, inputs, task_overrides = selection + selected, action, inputs, task_overrides, entity_type = selection - from crewai.crew import Crew from crewai.state.checkpoint_config import CheckpointConfig config = CheckpointConfig(restore_from=selected) + if entity_type == "flow": + from crewai.events.event_bus import crewai_event_bus + from crewai.flow.flow import Flow + + if action == "fork": + click.echo(f"\nForking flow from: {selected}\n") + flow = Flow.fork(config) + else: + click.echo(f"\nResuming flow from: {selected}\n") + flow = Flow.from_checkpoint(config) + + if task_overrides: + from crewai.crew import Crew as CrewCls + + state = crewai_event_bus._runtime_state + if state is not None: + flat_offset = 0 + for entity in state.root: + if not isinstance(entity, CrewCls) or not entity.tasks: + continue + n = len(entity.tasks) + local = { + idx - flat_offset: out + for idx, out in task_overrides.items() + if flat_offset <= idx < flat_offset + n + } + if local: + _apply_task_overrides(entity, local) + flat_offset += n + + if inputs: + click.echo("Inputs:") + for k, v in inputs.items(): + click.echo(f" {k}: {v}") + click.echo() + + result = await flow.kickoff_async(inputs=inputs) + click.echo(f"\nResult: {getattr(result, 'raw', result)}") + return + + from crewai.crew import Crew + if action == "fork": click.echo(f"\nForking from: {selected}\n") crew = Crew.fork(config) @@ -560,50 +667,7 @@ async def _run_checkpoint_tui_async(location: str) -> None: crew = Crew.from_checkpoint(config) if task_overrides: - click.echo("Modifications:") - overridden_agents: set[int] = set() - for task_idx, new_output in task_overrides.items(): - if task_idx < len(crew.tasks) and crew.tasks[task_idx].output is not None: - desc = crew.tasks[task_idx].description or f"Task {task_idx + 1}" - if len(desc) > 60: - desc = desc[:57] + "..." - crew.tasks[task_idx].output.raw = new_output # type: ignore[union-attr] - preview = new_output.replace("\n", " ") - if len(preview) > 80: - preview = preview[:77] + "..." - click.echo(f" Task {task_idx + 1}: {desc}") - click.echo(f" -> {preview}") - agent = crew.tasks[task_idx].agent - if agent and agent.agent_executor: - nth = sum(1 for t in crew.tasks[:task_idx] if t.agent is agent) - messages = agent.agent_executor.messages - system_positions = [ - i for i, m in enumerate(messages) if m.get("role") == "system" - ] - if nth < len(system_positions): - seg_start = system_positions[nth] - seg_end = ( - system_positions[nth + 1] - if nth + 1 < len(system_positions) - else len(messages) - ) - for j in range(seg_end - 1, seg_start, -1): - if messages[j].get("role") == "assistant": - messages[j]["content"] = new_output - break - overridden_agents.add(id(agent)) - - earliest = min(task_overrides) - for offset, subsequent in enumerate( - crew.tasks[earliest + 1 :], start=earliest + 1 - ): - if subsequent.output and offset not in task_overrides: - subsequent.output = None - if subsequent.agent and subsequent.agent.agent_executor: - subsequent.agent.agent_executor._resuming = False - if id(subsequent.agent) not in overridden_agents: - subsequent.agent.agent_executor.messages = [] - click.echo() + _apply_task_overrides(crew, task_overrides) if inputs: click.echo("Inputs:") diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 057f60ffb..88457f7aa 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -2138,7 +2138,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): try: # Reset flow state for fresh execution unless restoring from persistence - is_restoring = inputs and "id" in inputs and self.persistence is not None + is_restoring = ( + inputs and "id" in inputs and self.persistence is not None + ) or self.checkpoint_completed_methods is not None if not is_restoring: # Clear completed methods and outputs for a fresh start self._completed_methods.clear() diff --git a/lib/crewai/src/crewai/utilities/streaming.py b/lib/crewai/src/crewai/utilities/streaming.py index 008144bff..99bc9b199 100644 --- a/lib/crewai/src/crewai/utilities/streaming.py +++ b/lib/crewai/src/crewai/utilities/streaming.py @@ -7,6 +7,7 @@ import logging import queue import threading from typing import Any, NamedTuple +import uuid from typing_extensions import TypedDict @@ -25,6 +26,10 @@ from crewai.utilities.string_utils import sanitize_tool_name logger = logging.getLogger(__name__) +_current_stream_ids: contextvars.ContextVar[tuple[str, ...]] = contextvars.ContextVar( + "_current_stream_ids", default=() +) + class TaskInfo(TypedDict): """Task context information for streaming.""" @@ -45,6 +50,7 @@ class StreamingState(NamedTuple): async_queue: asyncio.Queue[StreamChunk | None | Exception] | None loop: asyncio.AbstractEventLoop | None handler: Callable[[Any, BaseEvent], None] + stream_id: str | None = None def _extract_tool_call_info( @@ -106,6 +112,7 @@ def _create_stream_handler( sync_queue: queue.Queue[StreamChunk | None | Exception], async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None, loop: asyncio.AbstractEventLoop | None = None, + stream_id: str | None = None, ) -> Callable[[Any, BaseEvent], None]: """Create a stream handler function. @@ -114,21 +121,19 @@ def _create_stream_handler( sync_queue: Synchronous queue for chunks. async_queue: Optional async queue for chunks. loop: Optional event loop for async operations. + stream_id: Stream scope ID for concurrent isolation. Returns: Handler function that can be registered with the event bus. """ def stream_handler(_: Any, event: BaseEvent) -> None: - """Handle LLM stream chunk events and enqueue them. - - Args: - _: Event source (unused). - event: The event to process. - """ if not isinstance(event, LLMStreamChunkEvent): return + if stream_id is not None and stream_id not in _current_stream_ids.get(): + return + chunk = _create_stream_chunk(event, current_task_info) if async_queue is not None and loop is not None: @@ -203,7 +208,11 @@ def create_streaming_state( async_queue = asyncio.Queue() loop = asyncio.get_event_loop() - handler = _create_stream_handler(current_task_info, sync_queue, async_queue, loop) + stream_id = str(uuid.uuid4()) + + handler = _create_stream_handler( + current_task_info, sync_queue, async_queue, loop, stream_id=stream_id + ) crewai_event_bus.register_handler(LLMStreamChunkEvent, handler) return StreamingState( @@ -213,6 +222,7 @@ def create_streaming_state( async_queue=async_queue, loop=loop, handler=handler, + stream_id=stream_id, ) @@ -260,7 +270,12 @@ def create_chunk_generator( Yields: StreamChunk objects as they arrive. """ - ctx = contextvars.copy_context() + if state.stream_id is not None: + token = _current_stream_ids.set((*_current_stream_ids.get(), state.stream_id)) + ctx = contextvars.copy_context() + _current_stream_ids.reset(token) + else: + ctx = contextvars.copy_context() thread = threading.Thread(target=ctx.run, args=(run_func,), daemon=True) thread.start() @@ -300,7 +315,12 @@ async def create_async_chunk_generator( "Async queue not initialized. Use create_streaming_state(use_async=True)." ) - task = asyncio.create_task(run_coro()) + if state.stream_id is not None: + token = _current_stream_ids.set((*_current_stream_ids.get(), state.stream_id)) + task = asyncio.create_task(run_coro()) + _current_stream_ids.reset(token) + else: + task = asyncio.create_task(run_coro()) try: while True: diff --git a/lib/crewai/tests/test_streaming.py b/lib/crewai/tests/test_streaming.py index 7b1c8e1ba..9079c393f 100644 --- a/lib/crewai/tests/test_streaming.py +++ b/lib/crewai/tests/test_streaming.py @@ -879,3 +879,91 @@ class TestStreamingImports: assert StreamChunk is not None assert StreamChunkType is not None assert ToolCallChunk is not None + + +class TestConcurrentStreamIsolation: + """Regression tests for concurrent streaming isolation (issue #5376).""" + + def test_concurrent_streams_do_not_cross_contaminate(self) -> None: + """Two concurrent streaming runs must each receive only their own chunks. + + Mirrors the real production path: create_streaming_state in the caller, + then temporarily push the stream_id into the ContextVar, copy_context, + and reset — exactly as create_chunk_generator does. + """ + import contextvars + import threading + + from crewai.utilities.streaming import ( + TaskInfo, + _current_stream_ids, + _unregister_handler, + create_streaming_state, + ) + + task_info_a: TaskInfo = { + "index": 0, + "name": "task_a", + "id": "a", + "agent_role": "A", + "agent_id": "a", + } + task_info_b: TaskInfo = { + "index": 1, + "name": "task_b", + "id": "b", + "agent_role": "B", + "agent_id": "b", + } + + state_a = create_streaming_state(task_info_a, []) + state_b = create_streaming_state(task_info_b, []) + + def make_emitter_ctx(state: Any) -> contextvars.Context: + token = _current_stream_ids.set( + (*_current_stream_ids.get(), state.stream_id) + ) + ctx = contextvars.copy_context() + _current_stream_ids.reset(token) + return ctx + + ctx_a = make_emitter_ctx(state_a) + ctx_b = make_emitter_ctx(state_b) + + def emit_chunks(prefix: str, call_id: str) -> None: + for text in [f"{prefix}1", f"{prefix}2", f"{prefix}3"]: + crewai_event_bus.emit( + None, + event=LLMStreamChunkEvent( + chunk=text, call_id=call_id, response_id="r" + ), + ) + + t_a = threading.Thread(target=ctx_a.run, args=(lambda: emit_chunks("A", "ca"),)) + t_b = threading.Thread(target=ctx_b.run, args=(lambda: emit_chunks("B", "cb"),)) + t_a.start() + t_b.start() + t_a.join() + t_b.join() + + chunks_a: list[str] = [] + while not state_a.sync_queue.empty(): + item = state_a.sync_queue.get_nowait() + if isinstance(item, StreamChunk): + chunks_a.append(item.content) + + chunks_b: list[str] = [] + while not state_b.sync_queue.empty(): + item = state_b.sync_queue.get_nowait() + if isinstance(item, StreamChunk): + chunks_b.append(item.content) + + assert set(chunks_a) == {"A1", "A2", "A3"}, ( + f"Stream A received unexpected chunks: {chunks_a}" + ) + assert set(chunks_b) == {"B1", "B2", "B3"}, ( + f"Stream B received unexpected chunks: {chunks_b}" + ) + + _unregister_handler(state_a.handler) + _unregister_handler(state_b.handler)