Add selective task execution feature for issue #2941

- Add tags field to Task class for categorization
- Add task_selector parameter to Crew class
- Implement task filtering in _execute_tasks method
- Add Process.selective type with validation
- Add helper method for tag-based selection
- Add comprehensive tests covering all scenarios
- Maintain backward compatibility with existing crews

Fixes #2941: Users can now run only specific agents/tasks based on input parameters like 'action', rather than executing the entire crew process.

Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
Devin AI
2025-06-03 17:07:36 +00:00
parent 2bd6b72aae
commit 0e963b6de1
5 changed files with 295 additions and 1 deletions

View File

@@ -200,6 +200,10 @@ class Crew(FlowTrackable, BaseModel):
default_factory=list,
description="List of callbacks to be executed after crew kickoff. It may be used to adjust the output of the crew.",
)
task_selector: Optional[Callable[[Dict[str, Any], Task], bool]] = Field(
default=None,
description="Function to determine which tasks should execute based on inputs and task properties.",
)
max_rpm: Optional[int] = Field(
default=None,
description="Maximum number of requests per minute for the crew execution to be respected.",
@@ -504,6 +508,17 @@ class Crew(FlowTrackable, BaseModel):
)
return self
@model_validator(mode="after")
def validate_selective_process_requirements(self) -> "Crew":
"""Ensure selective process has required task_selector."""
if self.process == Process.selective and not self.task_selector:
raise PydanticCustomError(
"missing_task_selector",
"Selective process requires a task_selector to be defined.",
{},
)
return self
@property
def key(self) -> str:
source: List[str] = [agent.key for agent in self.agents] + [
@@ -661,6 +676,8 @@ class Crew(FlowTrackable, BaseModel):
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result = self._run_hierarchical_process()
elif self.process == Process.selective:
result = self._run_selective_process()
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
@@ -777,6 +794,12 @@ class Crew(FlowTrackable, BaseModel):
self._create_manager_agent()
return self._execute_tasks(self.tasks)
def _run_selective_process(self) -> CrewOutput:
"""Executes tasks selectively based on task_selector and returns the final output."""
if not self.task_selector:
raise ValueError("Selective process requires a task_selector to be defined.")
return self._execute_tasks(self.tasks)
def _create_manager_agent(self):
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
@@ -812,12 +835,22 @@ class Crew(FlowTrackable, BaseModel):
Args:
tasks (List[Task]): List of tasks to execute
manager (Optional[BaseAgent], optional): Manager agent to use for delegation. Defaults to None.
start_index (Optional[int], optional): Starting index for task execution. Defaults to 0.
was_replayed (bool, optional): Whether this is a replayed execution. Defaults to False.
Returns:
CrewOutput: Final output of the crew
"""
if self.task_selector and self._inputs:
filtered_tasks = [
task for task in tasks
if self.task_selector(self._inputs, task)
]
if not filtered_tasks:
raise ValueError("No tasks match the selection criteria. At least one task must be selected for execution.")
tasks = filtered_tasks
task_outputs: List[TaskOutput] = []
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
last_sync_output: Optional[TaskOutput] = None
@@ -1506,3 +1539,27 @@ class Crew(FlowTrackable, BaseModel):
"""Reset crew and agent knowledge storage."""
for ks in knowledges:
ks.reset()
@staticmethod
def create_tag_selector(action_key: str = "action", tag_mapping: Optional[Dict[str, List[str]]] = None) -> Callable[[Dict[str, Any], Task], bool]:
"""Create a task selector function based on tags and input action.
Args:
action_key: Key in inputs dict that specifies the action (default: "action")
tag_mapping: Optional mapping of action values to required tags
Returns:
Function that selects tasks based on tags matching the action
"""
def selector(inputs: Dict[str, Any], task: Task) -> bool:
action = inputs.get(action_key)
if not action or not task.tags:
return True
if tag_mapping and action in tag_mapping:
required_tags = tag_mapping[action]
return any(tag in task.tags for tag in required_tags)
else:
return action in task.tags
return selector

View File

@@ -8,4 +8,5 @@ class Process(str, Enum):
sequential = "sequential"
hierarchical = "hierarchical"
selective = "selective"
# TODO: consensual = 'consensual'

View File

@@ -139,6 +139,10 @@ class Task(BaseModel):
description="Whether the task should instruct the agent to return the final answer formatted in Markdown",
default=False,
)
tags: Optional[List[str]] = Field(
default=None,
description="Tags to categorize this task for selective execution.",
)
converter_cls: Optional[Type[Converter]] = Field(
description="A converter class used to export structured output",
default=None,