mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-01 13:18:10 +00:00
Address review: async task logging, fix typo, use _get_agent_to_use for role
- Add completion/handoff logs for async tasks in _process_async_tasks - Fix pre-existing filter_headeruvs typo back to filter_headers - Use _get_agent_to_use() instead of task.agent.role in skip path to match the effective agent used during execution Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -707,7 +707,7 @@ class Crew(BaseModel):
|
||||
"""
|
||||
|
||||
task_outputs: List[TaskOutput] = []
|
||||
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
|
||||
futures: List[Tuple[Task, Future[TaskOutput], int, str, Optional[str]]] = []
|
||||
last_sync_output: Optional[TaskOutput] = None
|
||||
previous_agent_role: Optional[str] = None
|
||||
|
||||
@@ -719,8 +719,9 @@ class Crew(BaseModel):
|
||||
else:
|
||||
task_outputs = [task.output]
|
||||
last_sync_output = task.output
|
||||
if task.agent is not None:
|
||||
previous_agent_role = task.agent.role
|
||||
skipped_agent = self._get_agent_to_use(task)
|
||||
if skipped_agent is not None:
|
||||
previous_agent_role = skipped_agent.role
|
||||
continue
|
||||
|
||||
agent_to_use = self._get_agent_to_use(task)
|
||||
@@ -762,7 +763,9 @@ class Crew(BaseModel):
|
||||
context=context,
|
||||
tools=tools_for_task,
|
||||
)
|
||||
futures.append((task, future, task_index))
|
||||
futures.append(
|
||||
(task, future, task_index, agent_to_use.role, next_agent_role)
|
||||
)
|
||||
else:
|
||||
if futures:
|
||||
task_outputs = self._process_async_tasks(futures, was_replayed)
|
||||
@@ -792,7 +795,7 @@ class Crew(BaseModel):
|
||||
self,
|
||||
task: ConditionalTask,
|
||||
task_outputs: List[TaskOutput],
|
||||
futures: List[Tuple[Task, Future[TaskOutput], int]],
|
||||
futures: List[Tuple[Task, Future[TaskOutput], int, str, Optional[str]]],
|
||||
task_index: int,
|
||||
was_replayed: bool,
|
||||
) -> Optional[TaskOutput]:
|
||||
@@ -984,17 +987,20 @@ class Crew(BaseModel):
|
||||
|
||||
def _process_async_tasks(
|
||||
self,
|
||||
futures: List[Tuple[Task, Future[TaskOutput], int]],
|
||||
futures: List[Tuple[Task, Future[TaskOutput], int, str, Optional[str]]],
|
||||
was_replayed: bool = False,
|
||||
) -> List[TaskOutput]:
|
||||
task_outputs: List[TaskOutput] = []
|
||||
for future_task, future, task_index in futures:
|
||||
for future_task, future, task_index, agent_role, next_agent_role in futures:
|
||||
task_output = future.result()
|
||||
task_outputs.append(task_output)
|
||||
self._process_task_result(future_task, task_output)
|
||||
self._store_execution_log(
|
||||
future_task, task_output, task_index, was_replayed
|
||||
)
|
||||
self._log_task_completion(
|
||||
agent_role, future_task, next_agent_role
|
||||
)
|
||||
return task_outputs
|
||||
|
||||
def _find_task_index(
|
||||
|
||||
@@ -1123,7 +1123,7 @@ def test_kickoff_for_each_empty_input():
|
||||
assert results == []
|
||||
|
||||
|
||||
@pytest.mark.vcr(filter_headeruvs=["authorization"])
|
||||
@pytest.mark.vcr(filter_headers=["authorization"])
|
||||
def test_kickoff_for_each_invalid_input():
|
||||
"""Tests if kickoff_for_each raises TypeError for invalid input types."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user