Compare commits

..

4 Commits

5 changed files with 13 additions and 58 deletions

View File

@@ -219,11 +219,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
def _initialize_state(self, inputs: Optional[Dict[str, Any]] = None) -> None:
"""Initialize the state of the flow."""
if inputs is None:
return
def _initialize_state(self, inputs: Dict[str, Any]) -> None:
if isinstance(self._state, BaseModel):
# Structured state
try:
@@ -249,8 +245,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._state.update(inputs)
else:
raise TypeError("State must be a BaseModel instance or a dictionary.")
self._interpolate_inputs_in_crew(inputs)
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
self.event_emitter.send(
@@ -412,11 +406,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
traceback.print_exc()
def _interpolate_inputs_in_crew(self, inputs: Dict[str, Any]) -> None:
"""Interpolate inputs in the crew's tasks and agents if a crew is present."""
if hasattr(self, 'crew') and self.crew:
self.crew._interpolate_inputs(inputs)
def plot(self, filename: str = "crewai_flow") -> None:
self._telemetry.flow_plotting_span(
self.__class__.__name__, list(self._methods.keys())

View File

@@ -14,13 +14,13 @@ class Knowledge(BaseModel):
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
Args:
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
storage: Optional[KnowledgeStorage] = Field(default=None)
embedder_config: Optional[Dict[str, Any]] = None
"""
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
storage: Optional[KnowledgeStorage] = Field(default=None)
embedder_config: Optional[Dict[str, Any]] = None
collection_name: Optional[str] = None

View File

@@ -22,7 +22,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
default_factory=list, description="The path to the file"
)
content: Dict[Path, str] = Field(init=False, default_factory=dict)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
storage: Optional[KnowledgeStorage] = Field(default=None)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
@@ -62,7 +62,10 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
def _save_documents(self):
"""Save the documents to the storage."""
self.storage.save(self.chunks)
if self.storage:
self.storage.save(self.chunks)
else:
raise ValueError("No storage found to save documents.")
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""

View File

@@ -16,7 +16,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
storage: Optional[KnowledgeStorage] = Field(default=None)
metadata: Dict[str, Any] = Field(default_factory=dict) # Currently unused
collection_name: Optional[str] = Field(default=None)
@@ -46,4 +46,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
Save the documents to the storage.
This method should be called after the chunks and embeddings are generated.
"""
self.storage.save(self.chunks)
if self.storage:
self.storage.save(self.chunks)
else:
raise ValueError("No storage found to save documents.")

View File

@@ -322,43 +322,3 @@ def test_router_with_multiple_conditions():
# final_step should run after router_and
assert execution_order.index("log_final_step") > execution_order.index("router_and")
def test_flow_inputs_passed_to_tasks():
"""Test that inputs passed to Flow's kickoff method are correctly interpolated in task descriptions."""
from crewai import Agent, Crew, Task
from crewai.llm import LLM
agent = Agent(
role="Test Agent",
goal="Test Goal",
backstory="Test Backstory",
llm=LLM(model="gpt-4o-mini")
)
task = Task(
description="Process data about {topic}",
expected_output="Information about {topic}",
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task]
)
class TestFlow(Flow):
def __init__(self):
super().__init__()
self.crew = crew
@start()
def start_process(self):
pass
flow = TestFlow()
inputs = {"topic": "artificial intelligence"}
flow.kickoff(inputs=inputs)
assert task.description == "Process data about artificial intelligence"
assert task.expected_output == "Information about artificial intelligence"