mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-01 21:28:10 +00:00
Add current_flow_method_name context variable for flow method tracking
- Introduced a new context variable, `current_flow_method_name`, to store the name of the currently executing flow method, defaulting to "unknown". - Updated the Flow class to set and reset this context variable during method execution, enhancing the ability to track method calls without stack inspection. - Removed the obsolete `_resolve_calling_method_name` method, streamlining the code and improving clarity.
This commit is contained in:
@@ -2123,15 +2123,24 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
if future:
|
||||
self._event_futures.append(future)
|
||||
|
||||
if asyncio.iscoroutinefunction(method):
|
||||
result = await method(*args, **kwargs)
|
||||
else:
|
||||
# Run sync methods in thread pool for isolation
|
||||
# This allows Agent.kickoff() to work synchronously inside Flow methods
|
||||
import contextvars
|
||||
# Set method name in context so ask() can read it without
|
||||
# stack inspection. Must happen before copy_context() so the
|
||||
# value propagates into the thread pool for sync methods.
|
||||
from crewai.flow.flow_context import current_flow_method_name
|
||||
|
||||
ctx = contextvars.copy_context()
|
||||
result = await asyncio.to_thread(ctx.run, method, *args, **kwargs)
|
||||
method_name_token = current_flow_method_name.set(method_name)
|
||||
try:
|
||||
if asyncio.iscoroutinefunction(method):
|
||||
result = await method(*args, **kwargs)
|
||||
else:
|
||||
# Run sync methods in thread pool for isolation
|
||||
# This allows Agent.kickoff() to work synchronously inside Flow methods
|
||||
import contextvars
|
||||
|
||||
ctx = contextvars.copy_context()
|
||||
result = await asyncio.to_thread(ctx.run, method, *args, **kwargs)
|
||||
finally:
|
||||
current_flow_method_name.reset(method_name_token)
|
||||
|
||||
# Auto-await coroutines returned from sync methods (enables AgentExecutor pattern)
|
||||
if asyncio.iscoroutine(result):
|
||||
@@ -2608,23 +2617,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
return flow_config.input_provider
|
||||
return ConsoleProvider()
|
||||
|
||||
def _resolve_calling_method_name(self) -> str:
|
||||
"""Resolve the name of the flow method that called ask().
|
||||
|
||||
Walks the call stack to find the first frame whose function name
|
||||
matches a registered flow method on this instance.
|
||||
|
||||
Returns:
|
||||
The flow method name, or ``"unknown"`` if not found.
|
||||
"""
|
||||
import inspect
|
||||
|
||||
for frame_info in inspect.stack():
|
||||
fname = frame_info.function
|
||||
if fname in self._methods:
|
||||
return fname
|
||||
return "unknown"
|
||||
|
||||
def _checkpoint_state_for_ask(self) -> None:
|
||||
"""Auto-checkpoint flow state before waiting for user input.
|
||||
|
||||
@@ -2706,9 +2698,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
FlowInputReceivedEvent,
|
||||
FlowInputRequestedEvent,
|
||||
)
|
||||
from crewai.flow.flow_context import current_flow_method_name
|
||||
from crewai.flow.input_provider import InputResponse
|
||||
|
||||
method_name = self._resolve_calling_method_name()
|
||||
method_name = current_flow_method_name.get("unknown")
|
||||
|
||||
# Emit input requested event
|
||||
crewai_event_bus.emit(
|
||||
|
||||
@@ -14,3 +14,7 @@ current_flow_request_id: contextvars.ContextVar[str | None] = contextvars.Contex
|
||||
current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
|
||||
"flow_id", default=None
|
||||
)
|
||||
|
||||
current_flow_method_name: contextvars.ContextVar[str] = contextvars.ContextVar(
|
||||
"flow_method_name", default="unknown"
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user