Brandon/cre 130 pipeline project structure (#1066)

* WIP. Procedure appears to be working well. Working on mocking properly for tests

* All tests are passing now

* rshift working

* Add back in Gui's tool_usage fix

* WIP

* Going to start refactoring for pipeline_output

* Update terminology

* new pipeline flow with traces and usage metrics working. need to add more tests and make sure PipelineOutput behaves likew CrewOutput

* Fix pipelineoutput to look more like crewoutput and taskoutput

* Implemented additional tests for pipeline. One test is failing. Need team support

* Update docs for pipeline

* Update pipeline to properly process input and ouput dictionary

* Update Pipeline docs

* Add back in commentary at top of pipeline file

* Starting to work on router

* Drop router for now. will add in separately

* In the middle of fixing router. A ton of circular dependencies. Moving over to a new design.

* WIP.

* Fix circular dependencies and updated PipelineRouter

* Add in Eduardo feedback. Still need to add in more commentary describing the design decisions for pipeline

* Add developer notes to explain what is going on in pipelines.

* Add doc strings

* Fix missing rag datatype

* WIP. Converting usage metrics from a dict to an object

* Fix tests that were checking usage metrics

* Drop todo

* Fix 1 type error in pipeline

* Update pipeline to use UsageMetric

* Add missing doc string

* WIP.

* Change names

* Rename variables based on joaos feedback

* Fix critical circular dependency issues. Now needing to fix trace issue.

* Tests working now!

* Add more tests which showed underlying issue with traces

* Fix tests

* Remove overly complicated test

* Add router example to docs

* Clean up end of docs

* Clean up docs

* Working on creating Crew templates and pipeline templates

* WIP.

* WIP

* Fix poetry install from templates

* WIP

* Restructure

* changes for lorenze

* more todos

* WIP: create pipelines cli working

* wrapped up router

* ignore mypy src on templates

* ignored signature of copy

* fix all verbose

* rm print statements

* brought back correct folders

* fixes missing folders and then rm print statements

* fixed tests

* fixed broken test

* fixed type checker

* fixed type ignore

* ignore types for templates

* needed

* revert

* exclude only required

* rm type errors on templates

* rm excluding type checks for template files on github action

* fixed missing quotes

---------

Co-authored-by: Brandon Hancock <brandon@brandonhancock.io>
This commit is contained in:
Lorenze Jay
2024-08-09 14:13:29 -07:00
committed by GitHub
parent 6583f31459
commit 62f5b2fb2e
87 changed files with 5435 additions and 571 deletions

View File

@@ -3,7 +3,7 @@ import json
import uuid
from concurrent.futures import Future
from hashlib import md5
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from langchain_core.callbacks import BaseCallbackHandler
from pydantic import (
@@ -32,11 +32,9 @@ from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import (
TRAINED_AGENTS_DATA_FILE,
TRAINING_DATA_FILE,
)
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.formatter import (
@@ -52,6 +50,9 @@ try:
except ImportError:
agentops = None
if TYPE_CHECKING:
from crewai.pipeline.pipeline import Pipeline
class Crew(BaseModel):
"""
@@ -97,6 +98,7 @@ class Crew(BaseModel):
default_factory=TaskOutputStorageHandler
)
name: Optional[str] = Field(default=None)
cache: bool = Field(default=True)
model_config = ConfigDict(arbitrary_types_allowed=True)
tasks: List[Task] = Field(default_factory=list)
@@ -111,7 +113,7 @@ class Crew(BaseModel):
default={"provider": "openai"},
description="Configuration for the embedder to be used for the crew.",
)
usage_metrics: Optional[dict] = Field(
usage_metrics: Optional[UsageMetrics] = Field(
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
@@ -147,8 +149,8 @@ class Crew(BaseModel):
default=None,
description="Path to the prompt json file to be used for the crew.",
)
output_log_file: Optional[Union[bool, str]] = Field(
default=False,
output_log_file: Optional[str] = Field(
default=None,
description="output_log_file",
)
planning: Optional[bool] = Field(
@@ -453,7 +455,7 @@ class Crew(BaseModel):
if self.planning:
self._handle_crew_planning()
metrics = []
metrics: List[UsageMetrics] = []
if self.process == Process.sequential:
result = self._run_sequential_process()
@@ -463,11 +465,12 @@ class Crew(BaseModel):
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
metrics += [agent._token_process.get_summary() for agent in self.agents]
self.usage_metrics = {
key: sum([m[key] for m in metrics if m is not None]) for key in metrics[0]
}
self.usage_metrics = UsageMetrics()
for metric in metrics:
self.usage_metrics.add_usage_metrics(metric)
return result
@@ -476,12 +479,7 @@ class Crew(BaseModel):
results: List[CrewOutput] = []
# Initialize the parent crew's usage metrics
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
total_usage_metrics = UsageMetrics()
for input_data in inputs:
crew = self.copy()
@@ -489,8 +487,7 @@ class Crew(BaseModel):
output = crew.kickoff(inputs=input_data)
if crew.usage_metrics:
for key in total_usage_metrics:
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
total_usage_metrics.add_usage_metrics(crew.usage_metrics)
results.append(output)
@@ -519,29 +516,10 @@ class Crew(BaseModel):
results = await asyncio.gather(*tasks)
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
total_usage_metrics = UsageMetrics()
for crew in crew_copies:
if crew.usage_metrics:
for key in total_usage_metrics:
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
self.usage_metrics = total_usage_metrics
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
for crew in crew_copies:
if crew.usage_metrics:
for key in total_usage_metrics:
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
total_usage_metrics.add_usage_metrics(crew.usage_metrics)
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
@@ -932,25 +910,18 @@ class Crew(BaseModel):
)
self._telemetry.end_crew(self, final_string_output)
def calculate_usage_metrics(self) -> Dict[str, int]:
def calculate_usage_metrics(self) -> UsageMetrics:
"""Calculates and returns the usage metrics."""
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
total_usage_metrics = UsageMetrics()
for agent in self.agents:
if hasattr(agent, "_token_process"):
token_sum = agent._token_process.get_summary()
for key in total_usage_metrics:
total_usage_metrics[key] += token_sum.get(key, 0)
total_usage_metrics.add_usage_metrics(token_sum)
if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
token_sum = self.manager_agent._token_process.get_summary()
for key in total_usage_metrics:
total_usage_metrics[key] += token_sum.get(key, 0)
total_usage_metrics.add_usage_metrics(token_sum)
return total_usage_metrics
@@ -969,5 +940,17 @@ class Crew(BaseModel):
evaluator.print_crew_evaluation_result()
def __rshift__(self, other: "Crew") -> "Pipeline":
"""
Implements the >> operator to add another Crew to an existing Pipeline.
"""
from crewai.pipeline.pipeline import Pipeline
if not isinstance(other, Crew):
raise TypeError(
f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
)
return Pipeline(stages=[self, other])
def __repr__(self):
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"