Fix issues with flows post merge

This commit is contained in:
Brandon Hancock
2025-02-24 10:47:25 -05:00
parent a1cb222f3a
commit 8ac6f6a536

View File

@@ -721,7 +721,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
return asyncio.run(run_flow())
@init_flow_main_trace
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Start the flow execution asynchronously.
@@ -781,21 +780,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
return asyncio.run(self.kickoff_async())
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
if not self._start_methods:
raise ValueError("No start method defined")
# Execute all start methods concurrently.
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
]
await asyncio.gather(*tasks)
final_output = self._method_outputs[-1] if self._method_outputs else None
# Emit FlowFinishedEvent after all processing is complete.
crewai_event_bus.emit(
self,
FlowFinishedEvent(
@@ -804,6 +796,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
result=final_output,
),
)
return final_output
async def _execute_start_method(self, start_method_name: str) -> None: