From 563e2eccbd7b77d529c7d35883cece637b011297 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Sat, 3 Jan 2026 17:31:49 +0000 Subject: [PATCH] Fix lint errors (B023, W293, B007, PERF102) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: João --- lib/crewai/src/crewai/crew.py | 105 ++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 48 deletions(-) diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index c1fd91ffb..a3bb33ca9 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -951,23 +951,32 @@ class Crew(FlowTrackable, BaseModel): if task.async_execution: # Capture token usage before async task execution tokens_before = self._get_agent_token_usage(exec_data.agent) - + context = self._get_context( task, [last_sync_output] if last_sync_output else [] ) - + # Wrap task execution to capture tokens immediately after completion - async def _wrapped_task_execution(): - result = await task.aexecute_sync( - agent=exec_data.agent, - context=context, - tools=exec_data.tools, + # Use default arguments to bind loop variables at definition time (fixes B023) + agent = exec_data.agent + tools = exec_data.tools + + async def _wrapped_task_execution( + _task=task, + _agent=agent, + _tools=tools, + _context=context, + ): + result = await _task.aexecute_sync( + agent=_agent, + context=_context, + tools=_tools, ) # Capture tokens immediately after task completes # This reduces (but doesn't eliminate) race conditions - tokens_after = self._get_agent_token_usage(exec_data.agent) + tokens_after = self._get_agent_token_usage(_agent) return result, tokens_after - + async_task = asyncio.create_task(_wrapped_task_execution()) pending_tasks.append((task, async_task, task_index, exec_data.agent, tokens_before)) else: @@ -979,20 +988,20 @@ class Crew(FlowTrackable, BaseModel): # Capture token usage before task execution tokens_before = self._get_agent_token_usage(exec_data.agent) - + context = self._get_context(task, task_outputs) task_output = await task.aexecute_sync( agent=exec_data.agent, context=context, tools=exec_data.tools, ) - + # Capture token usage after task execution and attach to task output tokens_after = self._get_agent_token_usage(exec_data.agent) task_output = self._attach_task_token_metrics( task_output, task, exec_data.agent, tokens_before, tokens_after ) - + task_outputs.append(task_output) self._process_task_result(task, task_output) self._store_execution_log(task, task_output, task_index, was_replayed) @@ -1029,12 +1038,12 @@ class Crew(FlowTrackable, BaseModel): for future_task, async_task, task_index, agent, tokens_before in pending_tasks: # Unwrap the result which includes both output and tokens_after task_output, tokens_after = await async_task - + # Attach token metrics using the captured tokens_after task_output = self._attach_task_token_metrics( task_output, future_task, agent, tokens_before, tokens_after ) - + task_outputs.append(task_output) self._process_task_result(future_task, task_output) self._store_execution_log( @@ -1156,7 +1165,7 @@ class Crew(FlowTrackable, BaseModel): task_outputs: list[TaskOutput] = [] futures: list[tuple[Task, Future[TaskOutput | tuple[TaskOutput, Any]], int, Any, Any]] = [] last_sync_output: TaskOutput | None = None - + # Per-agent locks to serialize async task execution for accurate token tracking # This ensures that when multiple async tasks from the same agent run, # they execute one at a time so token deltas can be accurately attributed @@ -1181,19 +1190,19 @@ class Crew(FlowTrackable, BaseModel): context = self._get_context( task, [last_sync_output] if last_sync_output else [] ) - + # Get or create a lock for this agent to serialize async task execution # This ensures accurate per-task token tracking agent_id = str(getattr(exec_data.agent, 'id', id(exec_data.agent))) if agent_id not in agent_locks: agent_locks[agent_id] = threading.Lock() agent_lock = agent_locks[agent_id] - + # Create a token capture callback that will be called inside the thread # after task completion (while still holding the lock) def create_token_callback(agent: Any = exec_data.agent) -> Any: return self._get_agent_token_usage(agent) - + future = task.execute_async( agent=exec_data.agent, context=context, @@ -1211,20 +1220,20 @@ class Crew(FlowTrackable, BaseModel): # Capture token usage before task execution tokens_before = self._get_agent_token_usage(exec_data.agent) - + context = self._get_context(task, task_outputs) task_output = task.execute_sync( agent=exec_data.agent, context=context, tools=exec_data.tools, ) - + # Capture token usage after task execution and attach to task output tokens_after = self._get_agent_token_usage(exec_data.agent) task_output = self._attach_task_token_metrics( task_output, task, exec_data.agent, tokens_before, tokens_after ) - + task_outputs.append(task_output) self._process_task_result(task, task_output) self._store_execution_log(task, task_output, task_index, was_replayed) @@ -1474,11 +1483,11 @@ class Crew(FlowTrackable, BaseModel): was_replayed: bool = False, ) -> list[TaskOutput]: """Process completed async tasks and attach token metrics. - + The futures contain either: - TaskOutput (if no token tracking was enabled) - tuple of (TaskOutput, tokens_before, tokens_after) (if token tracking was enabled) - + Token tracking is enabled when the task was executed with a token_capture_callback and agent_execution_lock, which ensures accurate per-task token attribution even when multiple async tasks from the same agent run concurrently. @@ -1486,7 +1495,7 @@ class Crew(FlowTrackable, BaseModel): task_outputs: list[TaskOutput] = [] for future_task, future, task_index, agent, _ in futures: result = future.result() - + # Check if result is a tuple (token tracking enabled) or just TaskOutput if isinstance(result, tuple) and len(result) == 3: task_output, tokens_before, tokens_after = result @@ -1496,7 +1505,7 @@ class Crew(FlowTrackable, BaseModel): else: # No token tracking - result is just TaskOutput task_output = result - + task_outputs.append(task_output) self._process_task_result(future_task, task_output) self._store_execution_log( @@ -1706,9 +1715,9 @@ class Crew(FlowTrackable, BaseModel): AgentTokenMetrics, WorkflowTokenMetrics, ) - + total_usage_metrics = UsageMetrics() - + # Preserve existing workflow_token_metrics if it exists (has per_task data) if hasattr(self, 'workflow_token_metrics') and self.workflow_token_metrics: workflow_metrics = self.workflow_token_metrics @@ -1720,23 +1729,23 @@ class Crew(FlowTrackable, BaseModel): # Key by agent_id to handle multiple agents with the same role agent_token_sums = {} agent_info_map = {} # Map agent_id to (agent_name, agent_id) - + # First, build a map of all agents by their ID for agent in self.agents: agent_role = getattr(agent, 'role', 'Unknown Agent') agent_id = str(getattr(agent, 'id', '')) agent_info_map[agent_id] = (agent_role, agent_id) - + if workflow_metrics.per_task: # Sum up tokens for each agent from their tasks # We need to find which agent_id corresponds to each task's agent_name - for task_name, task_metrics in workflow_metrics.per_task.items(): + for task_metrics in workflow_metrics.per_task.values(): agent_name = task_metrics.agent_name # Find the agent_id for this agent_name from agent_info_map # For now, we'll use the agent_name as a temporary key but this needs improvement # TODO: Store agent_id in TaskTokenMetrics to avoid this lookup matching_agent_ids = [aid for aid, (name, _) in agent_info_map.items() if name == agent_name] - + # Use the first matching agent_id (limitation: can't distinguish between same-role agents) # This is better than nothing but ideally we'd store agent_id in TaskTokenMetrics for agent_id in matching_agent_ids: @@ -1755,12 +1764,12 @@ class Crew(FlowTrackable, BaseModel): agent_token_sums[agent_id]['completion_tokens'] += task_metrics.completion_tokens agent_token_sums[agent_id]['successful_requests'] += task_metrics.successful_requests break # Only add to first matching agent - + # Create per-agent metrics from the summed task data, keyed by agent_id for agent in self.agents: agent_role = getattr(agent, 'role', 'Unknown Agent') agent_id = str(getattr(agent, 'id', '')) - + if agent_id in agent_token_sums: # Use accurate per-task summed data sums = agent_token_sums[agent_id] @@ -1775,7 +1784,7 @@ class Crew(FlowTrackable, BaseModel): ) # Key by agent_id to avoid collision for agents with same role workflow_metrics.per_agent[agent_id] = agent_metrics - + # Still get total usage for overall metrics if isinstance(agent.llm, BaseLLM): llm_usage = agent.llm.get_token_usage_summary() @@ -1789,11 +1798,11 @@ class Crew(FlowTrackable, BaseModel): if self.manager_agent: manager_role = getattr(self.manager_agent, 'role', 'Manager Agent') manager_id = str(getattr(self.manager_agent, 'id', '')) - + if hasattr(self.manager_agent, "_token_process"): token_sum = self.manager_agent._token_process.get_summary() total_usage_metrics.add_usage_metrics(token_sum) - + # Create per-agent metrics for manager manager_metrics = AgentTokenMetrics( agent_name=manager_role, @@ -1816,7 +1825,7 @@ class Crew(FlowTrackable, BaseModel): llm_usage = self.manager_agent.llm._token_process.get_summary() total_usage_metrics.add_usage_metrics(llm_usage) - + # Update or create manager metrics if manager_role in workflow_metrics.per_agent: workflow_metrics.per_agent[manager_role].total_tokens += llm_usage.total_tokens @@ -1842,7 +1851,7 @@ class Crew(FlowTrackable, BaseModel): workflow_metrics.cached_prompt_tokens = total_usage_metrics.cached_prompt_tokens workflow_metrics.completion_tokens = total_usage_metrics.completion_tokens workflow_metrics.successful_requests = total_usage_metrics.successful_requests - + # Store workflow metrics (preserving per_task data) self.workflow_token_metrics = workflow_metrics self.usage_metrics = total_usage_metrics @@ -2123,14 +2132,14 @@ To enable tracing, do any one of these: """Get current token usage for an agent.""" if not agent: return UsageMetrics() - + if isinstance(agent.llm, BaseLLM): return agent.llm.get_token_usage_summary() - elif hasattr(agent, "_token_process"): + if hasattr(agent, "_token_process"): return agent._token_process.get_summary() - + return UsageMetrics() - + def _attach_task_token_metrics( self, task_output: TaskOutput, @@ -2141,10 +2150,10 @@ To enable tracing, do any one of these: ) -> TaskOutput: """Attach per-task token metrics to the task output.""" from crewai.types.usage_metrics import TaskTokenMetrics - + if not agent: return task_output - + # Calculate the delta (tokens used by this specific task) task_tokens = TaskTokenMetrics( task_name=getattr(task, 'name', None) or task.description[:50], @@ -2156,18 +2165,18 @@ To enable tracing, do any one of these: completion_tokens=tokens_after.completion_tokens - tokens_before.completion_tokens, successful_requests=tokens_after.successful_requests - tokens_before.successful_requests ) - + # Attach to task output task_output.usage_metrics = task_tokens - + # Store in workflow metrics if not hasattr(self, 'workflow_token_metrics') or self.workflow_token_metrics is None: from crewai.types.usage_metrics import WorkflowTokenMetrics self.workflow_token_metrics = WorkflowTokenMetrics() - + # Use task_id in the key to prevent collision when multiple tasks have the same name task_key = f"{task_tokens.task_id}_{task_tokens.task_name}_{task_tokens.agent_name}" self.workflow_token_metrics.per_task[task_key] = task_tokens - + return task_output