diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index 4a6361cce..c80ec2a2a 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -219,7 +219,11 @@ class Flow(Generic[T], metaclass=FlowMeta): """Returns the list of all outputs from executed methods.""" return self._method_outputs - def _initialize_state(self, inputs: Dict[str, Any]) -> None: + def _initialize_state(self, inputs: Optional[Dict[str, Any]] = None) -> None: + """Initialize the state of the flow.""" + if inputs is None: + return + if isinstance(self._state, BaseModel): # Structured state try: @@ -245,6 +249,8 @@ class Flow(Generic[T], metaclass=FlowMeta): self._state.update(inputs) else: raise TypeError("State must be a BaseModel instance or a dictionary.") + + self._interpolate_inputs_in_crew(inputs) def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any: self.event_emitter.send( @@ -406,6 +412,11 @@ class Flow(Generic[T], metaclass=FlowMeta): traceback.print_exc() + def _interpolate_inputs_in_crew(self, inputs: Dict[str, Any]) -> None: + """Interpolate inputs in the crew's tasks and agents if a crew is present.""" + if hasattr(self, 'crew') and self.crew: + self.crew._interpolate_inputs(inputs) + def plot(self, filename: str = "crewai_flow") -> None: self._telemetry.flow_plotting_span( self.__class__.__name__, list(self._methods.keys()) diff --git a/tests/flow_test.py b/tests/flow_test.py index d52c459ce..b87d23e42 100644 --- a/tests/flow_test.py +++ b/tests/flow_test.py @@ -322,3 +322,43 @@ def test_router_with_multiple_conditions(): # final_step should run after router_and assert execution_order.index("log_final_step") > execution_order.index("router_and") + + +def test_flow_inputs_passed_to_tasks(): + """Test that inputs passed to Flow's kickoff method are correctly interpolated in task descriptions.""" + from crewai import Agent, Crew, Task + from crewai.llm import LLM + + agent = Agent( + role="Test Agent", + goal="Test Goal", + backstory="Test Backstory", + llm=LLM(model="gpt-4o-mini") + ) + + task = Task( + description="Process data about {topic}", + expected_output="Information about {topic}", + agent=agent + ) + + crew = Crew( + agents=[agent], + tasks=[task] + ) + + class TestFlow(Flow): + def __init__(self): + super().__init__() + self.crew = crew + + @start() + def start_process(self): + pass + + flow = TestFlow() + inputs = {"topic": "artificial intelligence"} + flow.kickoff(inputs=inputs) + + assert task.description == "Process data about artificial intelligence" + assert task.expected_output == "Information about artificial intelligence"