Compare commits

..

1 Commits

Author SHA1 Message Date
Devin AI
59044b6512 Fix issue #2545: Pass inputs from Flow to Tasks for template interpolation
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-04-09 07:57:09 +00:00
5 changed files with 52 additions and 41 deletions

View File

@@ -440,16 +440,5 @@ A: CrewAI uses anonymous telemetry to collect usage data for improvement purpose
### Q: Where can I find examples of CrewAI in action?
A: You can find various real-life examples in the [CrewAI-examples repository](https://github.com/crewAIInc/crewAI-examples), including trip planners, stock analysis tools, and more.
### Q: Can I package my CrewAI application as an executable?
A: Yes, CrewAI is compatible with PyInstaller, which allows you to package your application into a standalone executable. To create an executable:
```shell
# Install PyInstaller
pip install pyinstaller
# Create the executable
pyinstaller --onefile your_script.py
```
The generated executable will be in the `dist` directory.
### Q: How can I contribute to CrewAI?
A: Contributions are welcome! You can fork the repository, create a new branch for your feature, add your improvement, and send a pull request. Check the Contribution section in the README for more details.

View File

@@ -219,7 +219,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
def _initialize_state(self, inputs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the state of the flow."""
if inputs is None:
return
if isinstance(self._state, BaseModel):
# Structured state
try:
@@ -245,6 +249,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._state.update(inputs)
else:
raise TypeError("State must be a BaseModel instance or a dictionary.")
self._interpolate_inputs_in_crew(inputs)
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
self.event_emitter.send(
@@ -406,6 +412,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
traceback.print_exc()
def _interpolate_inputs_in_crew(self, inputs: Dict[str, Any]) -> None:
"""Interpolate inputs in the crew's tasks and agents if a crew is present."""
if hasattr(self, 'crew') and self.crew:
self.crew._interpolate_inputs(inputs)
def plot(self, filename: str = "crewai_flow") -> None:
self._telemetry.flow_plotting_span(
self.__class__.__name__, list(self._methods.keys())

View File

@@ -1,13 +0,0 @@
import os
import sys
def is_bundled():
"""Check if the application is running from a PyInstaller bundle."""
return getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS')
def get_bundle_dir():
"""Get the PyInstaller bundle directory if the application is bundled."""
if is_bundled():
return sys._MEIPASS
return None

View File

@@ -322,3 +322,43 @@ def test_router_with_multiple_conditions():
# final_step should run after router_and
assert execution_order.index("log_final_step") > execution_order.index("router_and")
def test_flow_inputs_passed_to_tasks():
"""Test that inputs passed to Flow's kickoff method are correctly interpolated in task descriptions."""
from crewai import Agent, Crew, Task
from crewai.llm import LLM
agent = Agent(
role="Test Agent",
goal="Test Goal",
backstory="Test Backstory",
llm=LLM(model="gpt-4o-mini")
)
task = Task(
description="Process data about {topic}",
expected_output="Information about {topic}",
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task]
)
class TestFlow(Flow):
def __init__(self):
super().__init__()
self.crew = crew
@start()
def start_process(self):
pass
flow = TestFlow()
inputs = {"topic": "artificial intelligence"}
flow.kickoff(inputs=inputs)
assert task.description == "Process data about artificial intelligence"
assert task.expected_output == "Information about artificial intelligence"

View File

@@ -1,16 +0,0 @@
import sys
import unittest
from unittest.mock import patch
from crewai.utilities.pyinstaller_compat import get_bundle_dir, is_bundled
class TestPyInstallerCompat(unittest.TestCase):
def test_is_bundled_normal(self):
self.assertFalse(is_bundled())
@patch.object(sys, 'frozen', True, create=True)
@patch.object(sys, '_MEIPASS', '/path/to/bundle', create=True)
def test_is_bundled_pyinstaller(self):
self.assertTrue(is_bundled())
self.assertEqual(get_bundle_dir(), '/path/to/bundle')