new pipeline flow with traces and usage metrics working. need to add more tests and make sure PipelineOutput behaves like CrewOutput

Brandon Hancock
2024-07-19 14:39:31 -04:00
parent d094e178f1
commit 392490c48b
4 changed files with 152 additions and 42 deletions
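
For orientation, a rough usage sketch of the flow this commit is building toward; it is not part of the commit. Pipeline, run_pipeline, and the PipelineRunResult fields come from the diffs below, while make_crew and its Agent/Task wiring are placeholder assumptions (an LLM configured via environment variables is also assumed).

import asyncio

from crewai import Agent, Crew, Task
from crewai.pipeline.pipeline import Pipeline, run_pipeline


def make_crew(role: str) -> Crew:
    # Minimal one-agent crew; assumes a default LLM is configured via env vars.
    agent = Agent(role=role, goal=f"Handle the {role} step", backstory=f"A {role} specialist.")
    task = Task(
        description=f"Do the {role} step for the given topic.",
        expected_output="A short write-up.",
        agent=agent,
    )
    return Crew(agents=[agent], tasks=[task])


# Stage 1 is a single crew; stage 2 runs two crews in parallel on the same input.
pipeline = Pipeline(
    stages=[make_crew("research"), [make_crew("summary"), make_crew("formatting")]]
)


async def main() -> None:
    # Each input dict becomes its own run through every stage of the pipeline.
    results = await run_pipeline(pipeline, inputs=[{"topic": "AI agents"}])
    for result in results:
        print(result.final_output.raw)  # CrewOutput from the last stage of the run
        print(result.token_usage)       # usage keyed by crew name
        print(result.trace)             # journey of the input through the stages


asyncio.run(main())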

View File

@@ -1,11 +1,12 @@
import asyncio
+from collections import deque
from typing import Any, Dict, List, Union
from pydantic import BaseModel, Field
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
-from crewai.pipeline.pipeline_output import PipelineOutput
+from crewai.pipeline.pipeline_run_result import PipelineRunResult

"""
Pipeline Terminology:
@@ -38,6 +39,8 @@ Each input creates its own run, flowing through all stages of the pipeline.
Multiple runs can be processed concurrently, each following the defined pipeline structure.
"""

+Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
+
class Pipeline(BaseModel):
    stages: List[Union[Crew, List[Crew]]] = Field(
@@ -46,43 +49,130 @@ class Pipeline(BaseModel):
    async def process_runs(
        self, run_inputs: List[Dict[str, Any]]
-    ) -> List[List[CrewOutput]]:
+    ) -> List[PipelineRunResult]:
        """
        Process multiple runs in parallel, with each run going through all stages.
        """
-        pipeline_output = PipelineOutput()
+        pipeline_results = []

-        async def process_single_run(run_input: Dict[str, Any]) -> List[CrewOutput]:
-            print("current_input in run", run_input)
-            stage_outputs = []
-            for stage in self.stages:
+        def format_traces(
+            traces: List[List[Union[str, Dict[str, Any]]]],
+        ) -> List[List[Trace]]:
+            formatted_traces: List[Trace] = []
+
+            # Process all traces except the last one
+            for trace in traces[:-1]:
+                if len(trace) == 1:
+                    formatted_traces.append(trace[0])
+                else:
+                    formatted_traces.append(trace)
+
+            # Handle the final stage trace
+            traces_to_return: List[List[Trace]] = []
+            final_trace = traces[-1]
+            if len(final_trace) == 1:
+                formatted_traces.append(final_trace)
+                traces_to_return.append(formatted_traces)
+            else:
+                for trace in final_trace:
+                    copied_traces = formatted_traces.copy()
+                    copied_traces.append(trace)
+                    traces_to_return.append(copied_traces)
+
+            return traces_to_return
+
+        def build_pipeline_run_results(
+            final_stage_outputs: List[CrewOutput],
+            traces: List[List[Union[str, Dict[str, Any]]]],
+            token_usage: Dict[str, Any],
+        ) -> List[PipelineRunResult]:
+            """
+            Build PipelineRunResult objects from the final stage outputs and traces.
+            """
+            pipeline_run_results: List[PipelineRunResult] = []
+
+            # Format traces
+            formatted_traces = format_traces(traces)
+
+            for output, formatted_trace in zip(final_stage_outputs, formatted_traces):
+                new_pipeline_run_result = PipelineRunResult(
+                    final_output=output,
+                    token_usage=token_usage,
+                    trace=formatted_trace,
+                )
+                pipeline_run_results.append(new_pipeline_run_result)
+
+            return pipeline_run_results
+
+        async def process_single_run(
+            run_input: Dict[str, Any]
+        ) -> List[PipelineRunResult]:
+            stages_queue = deque(self.stages)
+            usage_metrics = {}
+            stage_outputs: List[CrewOutput] = []
+            traces: List[List[Union[str, Dict[str, Any]]]] = [[run_input]]
+            stage = None
+
+            while stages_queue:
+                stage = stages_queue.popleft()
                if isinstance(stage, Crew):
                    # Process single crew
-                    stage_output = await stage.kickoff_async(inputs=run_input)
-                    stage_outputs = [stage_output]
+                    output = await stage.kickoff_async(inputs=run_input)
+                    # Update usage metrics and setup inputs for next stage
+                    usage_metrics[stage.name] = output.token_usage
+                    run_input.update(output.to_dict())
+                    # Update traces for single crew stage
+                    traces.append([stage.name or "No name"])
+                    # Store output for final results
+                    stage_outputs = [output]
                else:
                    # Process each crew in parallel
                    parallel_outputs = await asyncio.gather(
                        *[crew.kickoff_async(inputs=run_input) for crew in stage]
                    )
+                    # Update usage metrics and setup inputs for next stage
+                    for crew, output in zip(stage, parallel_outputs):
+                        usage_metrics[crew.name] = output.token_usage
+                        run_input.update(output.to_dict())
+                    # Update traces for parallel stage
+                    traces.append([crew.name or "No name" for crew in stage])
+                    # Store output for final results
                    stage_outputs = parallel_outputs

-                # Convert all CrewOutputs from stage into a dictionary for next stage
-                # and update original run_input dictionary with new values
-                stage_output_dicts = [output.to_dict() for output in stage_outputs]
-                for stage_dict in stage_output_dicts:
-                    run_input.update(stage_dict)
-                print("UPDATING run_input - new values:", run_input)
-
-            # Return all CrewOutputs from this run
-            return stage_outputs
+            print("STAGE OUTPUTS: ", stage_outputs)
+            print("TRACES: ", traces)
+            print("TOKEN USAGE: ", usage_metrics)
+
+            # Build final pipeline run results
+            final_results = build_pipeline_run_results(
+                final_stage_outputs=stage_outputs,
+                traces=traces,
+                token_usage=usage_metrics,
+            )
+            print("FINAL RESULTS: ", final_results)
+
+            return final_results

        # Process all runs in parallel
-        return await asyncio.gather(
+        all_run_results = await asyncio.gather(
            *(process_single_run(input_data) for input_data in run_inputs)
        )
+
+        # Flatten the list of lists into a single list of results
+        pipeline_results.extend(
+            result for run_result in all_run_results for result in run_result
+        )
+        return pipeline_results

    def __rshift__(self, other: Any) -> "Pipeline":
        """
        Implements the >> operator to add another Stage (Crew or List[Crew]) to an existing Pipeline.
@@ -100,5 +190,5 @@ class Pipeline(BaseModel):
# Helper function to run the pipeline
async def run_pipeline(
    pipeline: Pipeline, inputs: List[Dict[str, Any]]
-) -> List[List[CrewOutput]]:
+) -> List[PipelineRunResult]:
    return await pipeline.process_runs(inputs)
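
To make the trace handling above concrete, here is a hypothetical run with a single-crew first stage and a two-crew final stage; the crew names are invented, and the shapes mirror what process_single_run and format_traces build.

# Hypothetical trace accumulation for one run (names invented):
traces = [
    [{"topic": "AI agents"}],             # initial run_input
    ["Research Crew"],                    # single-crew stage -> one name
    ["Summary Crew", "Formatting Crew"],  # parallel final stage -> one name per crew
]

# format_traces(traces) collapses single-element stages and fans the final
# parallel stage out into one trace per final-stage crew, roughly:
#
#   [
#       [{"topic": "AI agents"}, "Research Crew", "Summary Crew"],
#       [{"topic": "AI agents"}, "Research Crew", "Formatting Crew"],
#   ]
#
# so build_pipeline_run_results can zip one trace onto each final CrewOutput.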

View File

@@ -1,21 +1,14 @@
-from typing import Any, Dict, List
+from typing import List
from pydantic import BaseModel, Field
-from crewai.crews.crew_output import CrewOutput
+from crewai.pipeline.pipeline_run_result import PipelineRunResult

class PipelineOutput(BaseModel):
-    final_outputs: List[CrewOutput] = Field(
-        description="List of final outputs from the last crew in the pipeline",
-        default=[],
-    )
-    token_usage: List[List[Dict[str, Any]]] = Field(
-        description="Token usage for each crew in each stream", default=[]
+    run_results: List[PipelineRunResult] = Field(
+        description="List of results for each run through the pipeline", default=[]
    )

-    def add_final_output(self, output: CrewOutput):
-        self.final_outputs.append(output)
-
-    def add_token_usage(self, usage: List[Dict[str, Any]]):
-        self.token_usage.append(usage)
+    def add_run_result(self, result: PipelineRunResult):
+        self.run_results.append(result)
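
The pipeline currently returns a plain list of PipelineRunResult objects and nothing wires them into PipelineOutput yet (the commit message flags this as open work). A minimal sketch of how that aggregation could look, using only the add_run_result API above:

from typing import List

from crewai.pipeline.pipeline_output import PipelineOutput
from crewai.pipeline.pipeline_run_result import PipelineRunResult


def collect_runs(run_results: List[PipelineRunResult]) -> PipelineOutput:
    # Fold the flat list returned by Pipeline.process_runs() into a single
    # PipelineOutput via the add_run_result() helper defined above.
    output = PipelineOutput()
    for run_result in run_results:
        output.add_run_result(run_result)
    return output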

View File

@@ -0,0 +1,22 @@
+from typing import Any, Dict, List
+
+from pydantic import BaseModel, Field
+
+from crewai.crews.crew_output import CrewOutput
+
+
+class PipelineRunResult(BaseModel):
+    final_output: CrewOutput = Field(
+        description="Final output from the last crew in the run"
+    )
+    token_usage: Dict[str, Any] = Field(
+        description="Token usage for each crew in the run"
+    )
+    trace: List[Any] = Field(
+        description="Trace of the journey of inputs through the run"
+    )
+
+    # TODO: Should we store all outputs from crews along the way?
+    crews_output: List[CrewOutput] = Field(
+        description="Output from each crew in the run",
+        default=[],
+    )
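
For reference, token_usage on a run result ends up keyed by crew name (see process_single_run above), with each value being that crew's own usage dict. A small hedged helper, assuming each usage dict carries a prompt_tokens key as in the test fixtures below:

from crewai.pipeline.pipeline_run_result import PipelineRunResult


def total_prompt_tokens(result: PipelineRunResult) -> int:
    # result.token_usage looks like {"Research Crew": {...}, "Summary Crew": {...}};
    # each value is assumed to include a "prompt_tokens" count.
    return sum(usage.get("prompt_tokens", 0) for usage in result.token_usage.values())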

View File

@@ -5,6 +5,7 @@ from crewai.agent import Agent
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline import Pipeline
+from crewai.pipeline.pipeline_run_result import PipelineRunResult
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
@@ -83,16 +84,20 @@ async def test_pipeline_process_streams_single_input(mock_crew_factory):
    pipeline_result = await pipeline.process_runs(input_data)

    mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})

-    for stream_result in pipeline_result:
-        assert isinstance(stream_result[0], CrewOutput)
-        assert stream_result[0].raw == "Test output"
-        assert len(stream_result[0].tasks_output) == 1
-        assert stream_result[0].tasks_output[0].raw == "Task output"
-        assert stream_result[0].token_usage == {
-            "total_tokens": 100,
-            "prompt_tokens": 50,
-            "completion_tokens": 50,
-        }
+    for pipeline_line_result in pipeline_result:
+        assert isinstance(pipeline_line_result, PipelineRunResult)
+
+    # for stream_result in pipeline_result:
+    #     assert isinstance(stream_result[0], CrewOutput)
+    #     assert stream_result[0].raw == "Test output"
+    #     assert len(stream_result[0].tasks_output) == 1
+    #     assert stream_result[0].tasks_output[0].raw == "Task output"
+    #     assert stream_result[0].token_usage == {
+    #         "total_tokens": 100,
+    #         "prompt_tokens": 50,
+    #         "completion_tokens": 50,
+    #     }


@pytest.mark.asyncio
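
Since the commit message notes more tests are needed, the loosened loop above could later be tightened roughly as follows, slotted into the same test. The expected values mirror the old, now-commented assertions; checking token_usage by values rather than keys is an assumption, since the key depends on what mock_crew_factory sets for Crew.name.

    for pipeline_run_result in pipeline_result:
        assert isinstance(pipeline_run_result, PipelineRunResult)
        assert pipeline_run_result.final_output.raw == "Test output"
        assert len(pipeline_run_result.final_output.tasks_output) == 1
        assert pipeline_run_result.final_output.tasks_output[0].raw == "Task output"
        # token_usage is keyed by crew name; check the values rather than the keys.
        assert all(
            usage == {"total_tokens": 100, "prompt_tokens": 50, "completion_tokens": 50}
            for usage in pipeline_run_result.token_usage.values()
        )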