mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 16:48:30 +00:00
Update parent crew who is managing for_each loop
This commit is contained in:
@@ -5,15 +5,15 @@ from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from langchain_core.callbacks import BaseCallbackHandler
|
||||
from pydantic import (
|
||||
UUID4,
|
||||
BaseModel,
|
||||
ConfigDict,
|
||||
Field,
|
||||
InstanceOf,
|
||||
Json,
|
||||
PrivateAttr,
|
||||
field_validator,
|
||||
model_validator,
|
||||
UUID4,
|
||||
BaseModel,
|
||||
ConfigDict,
|
||||
Field,
|
||||
InstanceOf,
|
||||
Json,
|
||||
PrivateAttr,
|
||||
field_validator,
|
||||
model_validator,
|
||||
)
|
||||
from pydantic_core import PydanticCustomError
|
||||
|
||||
@@ -284,7 +284,6 @@ class Crew(BaseModel):
|
||||
f"The process '{self.process}' is not implemented yet."
|
||||
)
|
||||
|
||||
# TODO: THIS IS A BUG. ONLY THE LAST AGENT'S TOKEN USAGE IS BEING RETURNED
|
||||
metrics = metrics + [
|
||||
agent._token_process.get_summary() for agent in self.agents
|
||||
]
|
||||
@@ -300,14 +299,26 @@ class Crew(BaseModel):
|
||||
"""Executes the Crew's workflow for each input in the list and aggregates results."""
|
||||
results = []
|
||||
|
||||
# Initialize the parent crew's usage metrics
|
||||
total_usage_metrics = {
|
||||
"total_tokens": 0,
|
||||
"prompt_tokens": 0,
|
||||
"completion_tokens": 0,
|
||||
"successful_requests": 0,
|
||||
}
|
||||
|
||||
for input_data in inputs:
|
||||
crew = self.copy()
|
||||
|
||||
output = crew.kickoff(inputs=input_data)
|
||||
# TODO: FIGURE OUT HOW TO MERGE THE USAGE METRICS
|
||||
# TODO: I would expect we would want to merge the usage metrics from each crew execution
|
||||
|
||||
if crew.usage_metrics:
|
||||
for key in total_usage_metrics:
|
||||
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
|
||||
|
||||
results.append(output)
|
||||
|
||||
self.usage_metrics = total_usage_metrics
|
||||
return results
|
||||
|
||||
async def kickoff_async(
|
||||
@@ -320,10 +331,6 @@ class Crew(BaseModel):
|
||||
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[Any]:
|
||||
crew_copies = [self.copy() for _ in inputs]
|
||||
|
||||
for crew in crew_copies:
|
||||
for task in crew.tasks:
|
||||
print("TASK DESCRIPTION", task.description)
|
||||
|
||||
async def run_crew(crew, input_data):
|
||||
return await crew.kickoff_async(inputs=input_data)
|
||||
|
||||
@@ -334,8 +341,19 @@ class Crew(BaseModel):
|
||||
|
||||
results = await asyncio.gather(*tasks)
|
||||
|
||||
# TODO: FIGURE OUT HOW TO MERGE THE USAGE METRICS
|
||||
# TODO: I would expect we would want to merge the usage metrics from each crew execution
|
||||
total_usage_metrics = {
|
||||
"total_tokens": 0,
|
||||
"prompt_tokens": 0,
|
||||
"completion_tokens": 0,
|
||||
"successful_requests": 0,
|
||||
}
|
||||
for crew in crew_copies:
|
||||
if crew.usage_metrics:
|
||||
for key in total_usage_metrics:
|
||||
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
|
||||
|
||||
self.usage_metrics = total_usage_metrics
|
||||
|
||||
return results
|
||||
|
||||
def train(self, n_iterations: int) -> None:
|
||||
|
||||
Reference in New Issue
Block a user