WIP. It looks like usage metrics has always been broken for async

This commit is contained in:
Brandon Hancock
2024-07-01 11:28:50 -04:00
parent 764234c426
commit 5091712a2d
3 changed files with 34 additions and 12 deletions

View File

@@ -10,14 +10,14 @@ from langchain_core.agents import AgentAction
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
PrivateAttr,
field_validator,
model_validator,
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
PrivateAttr,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
@@ -169,6 +169,9 @@ class Agent(BaseModel):
@model_validator(mode="after")
def set_agent_executor(self) -> "Agent":
"""set agent executor is set."""
print(
f"CREW ID: {self.id} - SET AGENT EXECUTOR model name", self.llm.model_name
)
if hasattr(self.llm, "model_name"):
token_handler = TokenCalcHandler(self.llm.model_name, self._token_process)
@@ -177,6 +180,7 @@ class Agent(BaseModel):
self.llm.callbacks = []
# Check if an instance of TokenCalcHandler already exists in the list
print(f"CREW ID : {self.id} - self.llm.callbacks", self.llm.callbacks)
if not any(
isinstance(handler, TokenCalcHandler) for handler in self.llm.callbacks
):
@@ -382,11 +386,15 @@ class Agent(BaseModel):
"llm",
}
print("EXISTING LLM", self.llm)
existing_llm = copy(self.llm)
print("COPIED LLM", existing_llm)
# TODO: EXPAND ON WHY THIS IS NEEDED
# RESET LLM CALLBACKS
existing_llm.callbacks = []
existing_llm.callbacks = []
print("RESET LLM CALLBACKS", existing_llm)
copied_data = self.model_dump(exclude=exclude)
print("COPIED DATA FOR AGENT", copied_data)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
copied_agent = Agent(**copied_data, llm=existing_llm, tools=self.tools)

View File

@@ -64,7 +64,7 @@ class Crew(BaseModel):
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
cache: bool = Field(default=True)
cache: bool = Field(default=False)
model_config = ConfigDict(arbitrary_types_allowed=True)
tasks: List[Task] = Field(default_factory=list)
agents: List[Agent] = Field(default_factory=list)
@@ -247,6 +247,11 @@ class Crew(BaseModel):
inputs: Optional[Dict[str, Any]] = {},
) -> Union[str, Dict[str, Any]]:
"""Starts the crew to work on its assigned tasks."""
print(f"CREW ID {self.id} - KICKING OFF CREW")
print(
f"CREW ID {self.id} - callbacks",
[agent.llm.callbacks for agent in self.agents],
)
self._execution_span = self._telemetry.crew_execution_span(self)
# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
self._interpolate_inputs(inputs)
@@ -315,6 +320,10 @@ class Crew(BaseModel):
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[Any]:
crew_copies = [self.copy() for _ in inputs]
for crew in crew_copies:
for task in crew.tasks:
print("TASK DESCRIPTION", task.description)
async def run_crew(crew, input_data):
return await crew.kickoff_async(inputs=input_data)
@@ -344,6 +353,7 @@ class Crew(BaseModel):
}
for task in self.tasks:
print("TASK DESCRIPTION", task.description)
if task.agent.allow_delegation: # type: ignore # Item "None" of "Agent | None" has no attribute "allow_delegation"
agents_for_delegation = [
agent for agent in self.agents if agent != task.agent
@@ -374,7 +384,9 @@ class Crew(BaseModel):
self._file_handler.log(agent=role, task=task_output, status="completed")
for agent in self.agents:
print("INSPECTING AGENT", agent.role)
agent_token_usage = agent._token_process.get_summary()
print("AGENT TOKEN USAGE", agent_token_usage)
for key in total_token_usage:
total_token_usage[key] += agent_token_usage.get(key, 0)

View File

@@ -2,7 +2,7 @@ import os
import re
import threading
import uuid
from copy import deepcopy
from copy import copy
from typing import Any, Dict, List, Optional, Type
from langchain_openai import ChatOpenAI
@@ -268,7 +268,9 @@ class Task(BaseModel):
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
# cloned_agent = self.agent.copy() if self.agent else None
cloned_tools = deepcopy(self.tools) if self.tools else []
print("TOOLS BEFORE COPY", self.tools)
cloned_tools = copy(self.tools) if self.tools else []
print("TOOLS AFTER COPY", cloned_tools)
copied_task = Task(
**copied_data,