Compare commits

..

1 Commits

Author SHA1 Message Date
Devin AI
bc5252826e Fix type validation error in hierarchical process delegation (Issue #2606)
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-04-15 08:54:52 +00:00
5 changed files with 52 additions and 18 deletions

View File

@@ -14,13 +14,13 @@ class Knowledge(BaseModel):
Knowledge is a collection of sources and setup for the vector store to save and query relevant context.
Args:
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
embedder_config: Optional[Dict[str, Any]] = None
"""
sources: List[BaseKnowledgeSource] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
embedder_config: Optional[Dict[str, Any]] = None
collection_name: Optional[str] = None

View File

@@ -22,7 +22,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
default_factory=list, description="The path to the file"
)
content: Dict[Path, str] = Field(init=False, default_factory=dict)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
@@ -62,10 +62,7 @@ class BaseFileKnowledgeSource(BaseKnowledgeSource, ABC):
def _save_documents(self):
"""Save the documents to the storage."""
if self.storage:
self.storage.save(self.chunks)
else:
raise ValueError("No storage found to save documents.")
self.storage.save(self.chunks)
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""

View File

@@ -16,7 +16,7 @@ class BaseKnowledgeSource(BaseModel, ABC):
chunk_embeddings: List[np.ndarray] = Field(default_factory=list)
model_config = ConfigDict(arbitrary_types_allowed=True)
storage: Optional[KnowledgeStorage] = Field(default=None)
storage: KnowledgeStorage = Field(default_factory=KnowledgeStorage)
metadata: Dict[str, Any] = Field(default_factory=dict) # Currently unused
collection_name: Optional[str] = Field(default=None)
@@ -46,7 +46,4 @@ class BaseKnowledgeSource(BaseModel, ABC):
Save the documents to the storage.
This method should be called after the chunks and embeddings are generated.
"""
if self.storage:
self.storage.save(self.chunks)
else:
raise ValueError("No storage found to save documents.")
self.storage.save(self.chunks)

View File

@@ -1,4 +1,5 @@
from typing import Optional
import json
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel, Field
@@ -6,8 +7,8 @@ from crewai.tools.agent_tools.base_agent_tools import BaseAgentTool
class DelegateWorkToolSchema(BaseModel):
task: str = Field(..., description="The task to delegate")
context: str = Field(..., description="The context for the task")
task: Union[str, Dict[str, Any]] = Field(..., description="The task to delegate")
context: Union[str, Dict[str, Any]] = Field(..., description="The context for the task")
coworker: str = Field(
..., description="The role/name of the coworker to delegate to"
)
@@ -21,10 +22,12 @@ class DelegateWorkTool(BaseAgentTool):
def _run(
self,
task: str,
context: str,
task: Union[str, Dict[str, Any]],
context: Union[str, Dict[str, Any]],
coworker: Optional[str] = None,
**kwargs,
) -> str:
coworker = self._get_coworker(coworker, **kwargs)
return self._execute(coworker, task, context)
task_str = json.dumps(task) if isinstance(task, dict) else task
context_str = json.dumps(context) if isinstance(context, dict) else context
return self._execute(coworker, task_str, context_str)

View File

@@ -0,0 +1,37 @@
"""Test delegate work tool with dictionary inputs."""
import pytest
from crewai.agent import Agent
from crewai.tools.agent_tools.agent_tools import AgentTools
researcher = Agent(
role="researcher",
goal="make the best research and analysis on content about AI and AI agents",
backstory="You're an expert researcher, specialized in technology",
allow_delegation=False,
)
tools = AgentTools(agents=[researcher]).tools()
delegate_tool = tools[0]
@pytest.mark.vcr(filter_headers=["authorization"])
def test_delegate_work_with_dict_input():
"""Test that the delegate work tool can handle dictionary inputs."""
task_dict = {
"description": "share your take on AI Agents",
"goal": "provide comprehensive analysis"
}
context_dict = {
"background": "I heard you hate them",
"additional_info": "We need this for a report"
}
result = delegate_tool.run(
coworker="researcher",
task=task_dict,
context=context_dict,
)
assert isinstance(result, str)
assert len(result) > 0