diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 34bda0ead..699f63930 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -2123,15 +2123,24 @@ class Flow(Generic[T], metaclass=FlowMeta): if future: self._event_futures.append(future) - if asyncio.iscoroutinefunction(method): - result = await method(*args, **kwargs) - else: - # Run sync methods in thread pool for isolation - # This allows Agent.kickoff() to work synchronously inside Flow methods - import contextvars + # Set method name in context so ask() can read it without + # stack inspection. Must happen before copy_context() so the + # value propagates into the thread pool for sync methods. + from crewai.flow.flow_context import current_flow_method_name - ctx = contextvars.copy_context() - result = await asyncio.to_thread(ctx.run, method, *args, **kwargs) + method_name_token = current_flow_method_name.set(method_name) + try: + if asyncio.iscoroutinefunction(method): + result = await method(*args, **kwargs) + else: + # Run sync methods in thread pool for isolation + # This allows Agent.kickoff() to work synchronously inside Flow methods + import contextvars + + ctx = contextvars.copy_context() + result = await asyncio.to_thread(ctx.run, method, *args, **kwargs) + finally: + current_flow_method_name.reset(method_name_token) # Auto-await coroutines returned from sync methods (enables AgentExecutor pattern) if asyncio.iscoroutine(result): @@ -2608,23 +2617,6 @@ class Flow(Generic[T], metaclass=FlowMeta): return flow_config.input_provider return ConsoleProvider() - def _resolve_calling_method_name(self) -> str: - """Resolve the name of the flow method that called ask(). - - Walks the call stack to find the first frame whose function name - matches a registered flow method on this instance. - - Returns: - The flow method name, or ``"unknown"`` if not found. - """ - import inspect - - for frame_info in inspect.stack(): - fname = frame_info.function - if fname in self._methods: - return fname - return "unknown" - def _checkpoint_state_for_ask(self) -> None: """Auto-checkpoint flow state before waiting for user input. @@ -2706,9 +2698,10 @@ class Flow(Generic[T], metaclass=FlowMeta): FlowInputReceivedEvent, FlowInputRequestedEvent, ) + from crewai.flow.flow_context import current_flow_method_name from crewai.flow.input_provider import InputResponse - method_name = self._resolve_calling_method_name() + method_name = current_flow_method_name.get("unknown") # Emit input requested event crewai_event_bus.emit( diff --git a/lib/crewai/src/crewai/flow/flow_context.py b/lib/crewai/src/crewai/flow/flow_context.py index ae9bd69f9..0ff6cf973 100644 --- a/lib/crewai/src/crewai/flow/flow_context.py +++ b/lib/crewai/src/crewai/flow/flow_context.py @@ -14,3 +14,7 @@ current_flow_request_id: contextvars.ContextVar[str | None] = contextvars.Contex current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar( "flow_id", default=None ) + +current_flow_method_name: contextvars.ContextVar[str] = contextvars.ContextVar( + "flow_method_name", default="unknown" +)