Merge branch 'main' of github.com:crewAIInc/crewAI into lorenze/fix/flaky-test-git

This commit is contained in:
lorenzejay
2026-04-16 12:10:06 -07:00
8 changed files with 241 additions and 67 deletions

View File

@@ -196,7 +196,7 @@ python3 --version
- يدعم أي مزود سحابي بما في ذلك النشر المحلي
- تكامل مع أنظمة الأمان الحالية
<Card title="استكشف خيارات المؤسسات" icon="building" href="https://crewai.com/enterprise">
<Card title="استكشف خيارات المؤسسات" icon="building" href="https://share.hsforms.com/1Ooo2UViKQ22UOzdr7i77iwr87kg">
تعرّف على عروض CrewAI للمؤسسات وجدول عرضًا توضيحيًا
</Card>
</Note>

View File

@@ -199,7 +199,7 @@ For teams and organizations, CrewAI offers enterprise deployment options that el
- Supports any hyperscaler including on prem deployments
- Integration with your existing security systems
<Card title="Explore Enterprise Options" icon="building" href="https://crewai.com/enterprise">
<Card title="Explore Enterprise Options" icon="building" href="https://share.hsforms.com/1Ooo2UViKQ22UOzdr7i77iwr87kg">
Learn about CrewAI's enterprise offerings and schedule a demo
</Card>
</Note>

View File

@@ -189,7 +189,7 @@ CrewAI는 의존성 관리와 패키지 처리를 위해 `uv`를 사용합니다
- 온프레미스 배포를 포함하여 모든 하이퍼스케일러 지원
- 기존 보안 시스템과의 통합
<Card title="엔터프라이즈 옵션 살펴보기" icon="building" href="https://crewai.com/enterprise">
<Card title="엔터프라이즈 옵션 살펴보기" icon="building" href="https://share.hsforms.com/1Ooo2UViKQ22UOzdr7i77iwr87kg">
CrewAI의 엔터프라이즈 서비스에 대해 알아보고 데모를 예약하세요
</Card>
</Note>

View File

@@ -191,7 +191,7 @@ Para equipes e organizações, o CrewAI oferece opções de implantação corpor
- Compatível com qualquer hyperscaler, incluindo ambientes on-premises
- Integração com seus sistemas de segurança existentes
<Card title="Explore as Opções Enterprise" icon="building" href="https://crewai.com/enterprise">
<Card title="Explore as Opções Enterprise" icon="building" href="https://share.hsforms.com/1Ooo2UViKQ22UOzdr7i77iwr87kg">
Saiba mais sobre as soluções enterprise do CrewAI e agende uma demonstração
</Card>
</Note>

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from collections import defaultdict
from typing import Any, ClassVar
from typing import Any, ClassVar, Literal
from textual.app import App, ComposeResult
from textual.binding import Binding
@@ -78,15 +78,25 @@ def _build_entity_header(ent: dict[str, Any]) -> str:
return "\n".join(lines)
# Return type: (location, action, inputs, task_output_overrides)
_TuiResult = tuple[str, str, dict[str, Any] | None, dict[int, str] | None] | None
# Return type: (location, action, inputs, task_output_overrides, entity_type)
_TuiResult = (
tuple[
str,
str,
dict[str, Any] | None,
dict[int, str] | None,
Literal["crew", "flow"],
]
| None
)
class CheckpointTUI(App[_TuiResult]):
"""TUI to browse and inspect checkpoints.
Returns ``(location, action, inputs)`` where action is ``"resume"`` or
``"fork"`` and inputs is a parsed dict or ``None``,
Returns ``(location, action, inputs, task_overrides, entity_type)``
where action is ``"resume"`` or ``"fork"``, inputs is a parsed dict
or ``None``, and entity_type is ``"crew"`` or ``"flow"``;
or ``None`` if the user quit without selecting.
"""
@@ -506,6 +516,13 @@ class CheckpointTUI(App[_TuiResult]):
overrides[task_idx] = editor.text
return overrides or None
def _detect_entity_type(self, entry: dict[str, Any]) -> Literal["crew", "flow"]:
"""Infer the top-level entity type from checkpoint entities."""
for ent in entry.get("entities", []):
if ent.get("type") == "flow":
return "flow"
return "crew"
def _resolve_location(self, entry: dict[str, Any]) -> str:
"""Get the restore location string for a checkpoint entry."""
if "path" in entry:
@@ -526,15 +543,64 @@ class CheckpointTUI(App[_TuiResult]):
inputs = self._collect_inputs()
overrides = self._collect_task_overrides()
loc = self._resolve_location(self._selected_entry)
etype = self._detect_entity_type(self._selected_entry)
if event.button.id == "btn-resume":
self.exit((loc, "resume", inputs, overrides))
self.exit((loc, "resume", inputs, overrides, etype))
elif event.button.id == "btn-fork":
self.exit((loc, "fork", inputs, overrides))
self.exit((loc, "fork", inputs, overrides, etype))
def action_refresh(self) -> None:
self._refresh_tree()
def _apply_task_overrides(crew: Any, task_overrides: dict[int, str]) -> None:
    """Apply task output overrides to a restored Crew and print modifications.

    Args:
        crew: Restored Crew instance; its ``tasks`` list is mutated in place.
            NOTE(review): typed ``Any`` presumably to avoid importing Crew at
            module level — confirm.
        task_overrides: Mapping of 0-based task index to replacement output text.
    """
    import click

    click.echo("Modifications:")
    # Agents whose executor message history was rewritten with an override;
    # the invalidation pass below must NOT clear their messages.
    overridden_agents: set[int] = set()
    for task_idx, new_output in task_overrides.items():
        # Only override tasks that exist and have already produced output.
        if task_idx < len(crew.tasks) and crew.tasks[task_idx].output is not None:
            desc = crew.tasks[task_idx].description or f"Task {task_idx + 1}"
            if len(desc) > 60:
                # Truncate long descriptions for display (57 chars + "...").
                desc = desc[:57] + "..."
            crew.tasks[task_idx].output.raw = new_output
            # Single-line preview of the new output, capped at 80 chars.
            preview = new_output.replace("\n", " ")
            if len(preview) > 80:
                preview = preview[:77] + "..."
            click.echo(f" Task {task_idx + 1}: {desc}")
            click.echo(f" -> {preview}")
            agent = crew.tasks[task_idx].agent
            if agent and agent.agent_executor:
                # nth = which of this agent's task segments this override
                # targets (an agent may run several tasks in sequence).
                nth = sum(1 for t in crew.tasks[:task_idx] if t.agent is agent)
                messages = agent.agent_executor.messages
                # Each task segment in the executor history begins with a
                # system message, so system positions delimit segments.
                system_positions = [
                    i for i, m in enumerate(messages) if m.get("role") == "system"
                ]
                if nth < len(system_positions):
                    seg_start = system_positions[nth]
                    seg_end = (
                        system_positions[nth + 1]
                        if nth + 1 < len(system_positions)
                        else len(messages)
                    )
                    # Rewrite the LAST assistant message inside the segment so
                    # the conversation history matches the overridden output.
                    for j in range(seg_end - 1, seg_start, -1):
                        if messages[j].get("role") == "assistant":
                            messages[j]["content"] = new_output
                            break
                overridden_agents.add(id(agent))
    # Invalidate every completed task after the earliest override so it
    # re-runs with the modified upstream output.
    earliest = min(task_overrides)
    for offset, subsequent in enumerate(crew.tasks[earliest + 1 :], start=earliest + 1):
        if subsequent.output and offset not in task_overrides:
            subsequent.output = None
            if subsequent.agent and subsequent.agent.agent_executor:
                # Force a fresh run rather than a resume for this agent.
                subsequent.agent.agent_executor._resuming = False
                if id(subsequent.agent) not in overridden_agents:
                    subsequent.agent.agent_executor.messages = []
    click.echo()
async def _run_checkpoint_tui_async(location: str) -> None:
"""Async implementation of the checkpoint TUI flow."""
import click
@@ -545,13 +611,54 @@ async def _run_checkpoint_tui_async(location: str) -> None:
if selection is None:
return
selected, action, inputs, task_overrides = selection
selected, action, inputs, task_overrides, entity_type = selection
from crewai.crew import Crew
from crewai.state.checkpoint_config import CheckpointConfig
config = CheckpointConfig(restore_from=selected)
if entity_type == "flow":
from crewai.events.event_bus import crewai_event_bus
from crewai.flow.flow import Flow
if action == "fork":
click.echo(f"\nForking flow from: {selected}\n")
flow = Flow.fork(config)
else:
click.echo(f"\nResuming flow from: {selected}\n")
flow = Flow.from_checkpoint(config)
if task_overrides:
from crewai.crew import Crew as CrewCls
state = crewai_event_bus._runtime_state
if state is not None:
flat_offset = 0
for entity in state.root:
if not isinstance(entity, CrewCls) or not entity.tasks:
continue
n = len(entity.tasks)
local = {
idx - flat_offset: out
for idx, out in task_overrides.items()
if flat_offset <= idx < flat_offset + n
}
if local:
_apply_task_overrides(entity, local)
flat_offset += n
if inputs:
click.echo("Inputs:")
for k, v in inputs.items():
click.echo(f" {k}: {v}")
click.echo()
result = await flow.kickoff_async(inputs=inputs)
click.echo(f"\nResult: {getattr(result, 'raw', result)}")
return
from crewai.crew import Crew
if action == "fork":
click.echo(f"\nForking from: {selected}\n")
crew = Crew.fork(config)
@@ -560,50 +667,7 @@ async def _run_checkpoint_tui_async(location: str) -> None:
crew = Crew.from_checkpoint(config)
if task_overrides:
click.echo("Modifications:")
overridden_agents: set[int] = set()
for task_idx, new_output in task_overrides.items():
if task_idx < len(crew.tasks) and crew.tasks[task_idx].output is not None:
desc = crew.tasks[task_idx].description or f"Task {task_idx + 1}"
if len(desc) > 60:
desc = desc[:57] + "..."
crew.tasks[task_idx].output.raw = new_output # type: ignore[union-attr]
preview = new_output.replace("\n", " ")
if len(preview) > 80:
preview = preview[:77] + "..."
click.echo(f" Task {task_idx + 1}: {desc}")
click.echo(f" -> {preview}")
agent = crew.tasks[task_idx].agent
if agent and agent.agent_executor:
nth = sum(1 for t in crew.tasks[:task_idx] if t.agent is agent)
messages = agent.agent_executor.messages
system_positions = [
i for i, m in enumerate(messages) if m.get("role") == "system"
]
if nth < len(system_positions):
seg_start = system_positions[nth]
seg_end = (
system_positions[nth + 1]
if nth + 1 < len(system_positions)
else len(messages)
)
for j in range(seg_end - 1, seg_start, -1):
if messages[j].get("role") == "assistant":
messages[j]["content"] = new_output
break
overridden_agents.add(id(agent))
earliest = min(task_overrides)
for offset, subsequent in enumerate(
crew.tasks[earliest + 1 :], start=earliest + 1
):
if subsequent.output and offset not in task_overrides:
subsequent.output = None
if subsequent.agent and subsequent.agent.agent_executor:
subsequent.agent.agent_executor._resuming = False
if id(subsequent.agent) not in overridden_agents:
subsequent.agent.agent_executor.messages = []
click.echo()
_apply_task_overrides(crew, task_overrides)
if inputs:
click.echo("Inputs:")

View File

@@ -2138,7 +2138,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
try:
# Reset flow state for fresh execution unless restoring from persistence
is_restoring = inputs and "id" in inputs and self.persistence is not None
is_restoring = (
inputs and "id" in inputs and self.persistence is not None
) or self.checkpoint_completed_methods is not None
if not is_restoring:
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()

View File

@@ -7,6 +7,7 @@ import logging
import queue
import threading
from typing import Any, NamedTuple
import uuid
from typing_extensions import TypedDict
@@ -25,6 +26,10 @@ from crewai.utilities.string_utils import sanitize_tool_name
logger = logging.getLogger(__name__)
_current_stream_ids: contextvars.ContextVar[tuple[str, ...]] = contextvars.ContextVar(
"_current_stream_ids", default=()
)
class TaskInfo(TypedDict):
"""Task context information for streaming."""
@@ -45,6 +50,7 @@ class StreamingState(NamedTuple):
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None
loop: asyncio.AbstractEventLoop | None
handler: Callable[[Any, BaseEvent], None]
stream_id: str | None = None
def _extract_tool_call_info(
@@ -106,6 +112,7 @@ def _create_stream_handler(
sync_queue: queue.Queue[StreamChunk | None | Exception],
async_queue: asyncio.Queue[StreamChunk | None | Exception] | None = None,
loop: asyncio.AbstractEventLoop | None = None,
stream_id: str | None = None,
) -> Callable[[Any, BaseEvent], None]:
"""Create a stream handler function.
@@ -114,21 +121,19 @@ def _create_stream_handler(
sync_queue: Synchronous queue for chunks.
async_queue: Optional async queue for chunks.
loop: Optional event loop for async operations.
stream_id: Stream scope ID for concurrent isolation.
Returns:
Handler function that can be registered with the event bus.
"""
def stream_handler(_: Any, event: BaseEvent) -> None:
"""Handle LLM stream chunk events and enqueue them.
Args:
_: Event source (unused).
event: The event to process.
"""
if not isinstance(event, LLMStreamChunkEvent):
return
if stream_id is not None and stream_id not in _current_stream_ids.get():
return
chunk = _create_stream_chunk(event, current_task_info)
if async_queue is not None and loop is not None:
@@ -203,7 +208,11 @@ def create_streaming_state(
async_queue = asyncio.Queue()
loop = asyncio.get_event_loop()
handler = _create_stream_handler(current_task_info, sync_queue, async_queue, loop)
stream_id = str(uuid.uuid4())
handler = _create_stream_handler(
current_task_info, sync_queue, async_queue, loop, stream_id=stream_id
)
crewai_event_bus.register_handler(LLMStreamChunkEvent, handler)
return StreamingState(
@@ -213,6 +222,7 @@ def create_streaming_state(
async_queue=async_queue,
loop=loop,
handler=handler,
stream_id=stream_id,
)
@@ -260,7 +270,12 @@ def create_chunk_generator(
Yields:
StreamChunk objects as they arrive.
"""
ctx = contextvars.copy_context()
if state.stream_id is not None:
token = _current_stream_ids.set((*_current_stream_ids.get(), state.stream_id))
ctx = contextvars.copy_context()
_current_stream_ids.reset(token)
else:
ctx = contextvars.copy_context()
thread = threading.Thread(target=ctx.run, args=(run_func,), daemon=True)
thread.start()
@@ -300,7 +315,12 @@ async def create_async_chunk_generator(
"Async queue not initialized. Use create_streaming_state(use_async=True)."
)
task = asyncio.create_task(run_coro())
if state.stream_id is not None:
token = _current_stream_ids.set((*_current_stream_ids.get(), state.stream_id))
task = asyncio.create_task(run_coro())
_current_stream_ids.reset(token)
else:
task = asyncio.create_task(run_coro())
try:
while True:

View File

@@ -879,3 +879,91 @@ class TestStreamingImports:
assert StreamChunk is not None
assert StreamChunkType is not None
assert ToolCallChunk is not None
class TestConcurrentStreamIsolation:
    """Regression tests for concurrent streaming isolation (issue #5376)."""

    def test_concurrent_streams_do_not_cross_contaminate(self) -> None:
        """Two concurrent streaming runs must each receive only their own chunks.

        Mirrors the real production path: create_streaming_state in the caller,
        then temporarily push the stream_id into the ContextVar, copy_context,
        and reset — exactly as create_chunk_generator does.
        """
        import contextvars
        import threading

        from crewai.utilities.streaming import (
            TaskInfo,
            _current_stream_ids,
            _unregister_handler,
            create_streaming_state,
        )

        # Two independent streaming states, each with its own stream_id.
        task_info_a: TaskInfo = {
            "index": 0,
            "name": "task_a",
            "id": "a",
            "agent_role": "A",
            "agent_id": "a",
        }
        task_info_b: TaskInfo = {
            "index": 1,
            "name": "task_b",
            "id": "b",
            "agent_role": "B",
            "agent_id": "b",
        }
        state_a = create_streaming_state(task_info_a, [])
        state_b = create_streaming_state(task_info_b, [])

        def make_emitter_ctx(state: Any) -> contextvars.Context:
            # Push the stream id, snapshot the context, then restore — so the
            # id is only visible inside the copied context (as production does).
            token = _current_stream_ids.set(
                (*_current_stream_ids.get(), state.stream_id)
            )
            ctx = contextvars.copy_context()
            _current_stream_ids.reset(token)
            return ctx

        ctx_a = make_emitter_ctx(state_a)
        ctx_b = make_emitter_ctx(state_b)

        def emit_chunks(prefix: str, call_id: str) -> None:
            for text in [f"{prefix}1", f"{prefix}2", f"{prefix}3"]:
                crewai_event_bus.emit(
                    None,
                    event=LLMStreamChunkEvent(
                        chunk=text, call_id=call_id, response_id="r"
                    ),
                )

        # Emit concurrently, each thread inside its own copied context.
        t_a = threading.Thread(target=ctx_a.run, args=(lambda: emit_chunks("A", "ca"),))
        t_b = threading.Thread(target=ctx_b.run, args=(lambda: emit_chunks("B", "cb"),))
        t_a.start()
        t_b.start()
        t_a.join()
        t_b.join()

        # Drain each state's sync queue and collect only StreamChunk contents.
        chunks_a: list[str] = []
        while not state_a.sync_queue.empty():
            item = state_a.sync_queue.get_nowait()
            if isinstance(item, StreamChunk):
                chunks_a.append(item.content)
        chunks_b: list[str] = []
        while not state_b.sync_queue.empty():
            item = state_b.sync_queue.get_nowait()
            if isinstance(item, StreamChunk):
                chunks_b.append(item.content)

        # Each stream must see exactly its own three chunks, no cross-talk.
        assert set(chunks_a) == {"A1", "A2", "A3"}, (
            f"Stream A received unexpected chunks: {chunks_a}"
        )
        assert set(chunks_b) == {"B1", "B2", "B3"}, (
            f"Stream B received unexpected chunks: {chunks_b}"
        )

        _unregister_handler(state_a.handler)
        _unregister_handler(state_b.handler)