Update pipeline to properly process input and ouput dictionary

This commit is contained in:
Brandon Hancock
2024-07-23 11:12:55 -04:00
parent 471c5b970c
commit 602ade4cc4

View File

@@ -1,6 +1,6 @@
import asyncio
from collections import deque
from typing import Any, Dict, List, Union
import copy
from typing import Any, Dict, List, Tuple, Union
from pydantic import BaseModel, Field, model_validator
@@ -8,37 +8,6 @@ from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline_run_result import PipelineRunResult
"""
Pipeline Terminology:
Pipeline: The overall structure that defines a sequence of operations.
Stage: A distinct part of the pipeline, which can be either sequential or parallel.
Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
Branch: Parallel executions within a stage (e.g., concurrent crew operations).
Trace: The journey of an individual input through the entire pipeline.
Example pipeline structure:
crew1 >> crew2 >> crew3
This represents a pipeline with three sequential stages:
1. crew1 is the first stage, which processes the input and passes its output to crew2.
2. crew2 is the second stage, which takes the output from crew1 as its input, processes it, and passes its output to crew3.
3. crew3 is the final stage, which takes the output from crew2 as its input and produces the final output of the pipeline.
Each input creates its own run, flowing through all stages of the pipeline.
Multiple runs can be processed concurrently, each following the defined pipeline structure.
Another example pipeline structure:
crew1 >> [crew2, crew3] >> crew4
This represents a pipeline with three stages:
1. A sequential stage (crew1)
2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
3. Another sequential stage (crew4)
Each input creates its own run, flowing through all stages of the pipeline.
Multiple runs can be processed concurrently, each following the defined pipeline structure.
"""
Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
@@ -75,157 +44,9 @@ class Pipeline(BaseModel):
"""
pipeline_results = []
def format_traces(
traces: List[List[Union[str, Dict[str, Any]]]],
) -> List[List[Trace]]:
print("INCOMING TRACES: ", traces)
formatted_traces: List[Trace] = []
# Process all traces except the last one
for trace in traces[:-1]:
if len(trace) == 1:
formatted_traces.append(trace[0])
else:
formatted_traces.append(trace)
print("FORMATTED TRACES PRE LAST TRACE: ", formatted_traces)
# Handle the final stage trace
traces_to_return: List[List[Trace]] = []
final_trace = traces[-1]
print("FINAL TRACE: ", final_trace)
if len(final_trace) == 1:
formatted_traces.append(final_trace[0])
traces_to_return.append(formatted_traces)
else:
for trace in final_trace:
copied_traces = formatted_traces.copy()
copied_traces.append(trace)
traces_to_return.append(copied_traces)
print("TRACES TO RETURN", traces_to_return)
return traces_to_return
def format_crew_outputs(
all_stage_outputs: List[List[CrewOutput]],
) -> List[List[CrewOutput]]:
formatted_crew_outputs: List[List[CrewOutput]] = []
# Handle all output stages except the final one
crew_outputs: List[CrewOutput] = []
for stage_outputs in all_stage_outputs[:-1]:
for output in stage_outputs:
crew_outputs.append(output)
final_stage = all_stage_outputs[-1]
for output in final_stage:
copied_crew_outputs = crew_outputs.copy()
copied_crew_outputs.append(output)
formatted_crew_outputs.append(copied_crew_outputs)
return formatted_crew_outputs
def build_pipeline_run_results(
all_stage_outputs: List[List[CrewOutput]],
traces: List[List[Union[str, Dict[str, Any]]]],
token_usage: Dict[str, Any],
) -> List[PipelineRunResult]:
"""
Build PipelineRunResult objects from the final stage outputs and traces.
"""
pipeline_run_results: List[PipelineRunResult] = []
formatted_traces = format_traces(traces)
formatted_crew_outputs = format_crew_outputs(all_stage_outputs)
for crews_outputs, formatted_trace in zip(
formatted_crew_outputs, formatted_traces
):
final_crew = crews_outputs[-1]
new_pipeline_run_result = PipelineRunResult(
token_usage=token_usage,
trace=formatted_trace,
raw=final_crew.raw,
pydantic=final_crew.pydantic,
json_dict=final_crew.json_dict,
crews_outputs=crews_outputs,
)
pipeline_run_results.append(new_pipeline_run_result)
return pipeline_run_results
async def process_single_run(
run_input: Dict[str, Any]
) -> List[PipelineRunResult]:
initial_input = run_input.copy() # Create a copy of the initial input
current_input = (
run_input.copy()
) # Create a working copy that will be updated
stages_queue = deque(self.stages)
usage_metrics = {}
stage_outputs: List[CrewOutput] = []
all_stage_outputs: List[List[CrewOutput]] = []
traces: List[List[Union[str, Dict[str, Any]]]] = [
[initial_input]
] # Use the initial input here
stage = None
while stages_queue:
stage = stages_queue.popleft()
if isinstance(stage, Crew):
# Process single crew
output = await stage.kickoff_async(inputs=current_input)
# Update usage metrics and setup inputs for next stage
usage_metrics[stage.name or stage.id] = output.token_usage
current_input.update(output.to_dict()) # Update the working copy
# Update traces for single crew stage
traces.append([stage.name or str(stage.id)])
# Store output for final results
stage_outputs = [output]
else:
# Process each crew in parallel
parallel_outputs = await asyncio.gather(
*[crew.kickoff_async(inputs=current_input) for crew in stage]
)
# Update usage metrics and setup inputs for next stage
for crew, output in zip(stage, parallel_outputs):
usage_metrics[crew.name] = output.token_usage
current_input.update(
output.to_dict()
) # Update the working copy
# Update traces for parallel stage
traces.append([crew.name or str(crew.id) for crew in stage])
# Store output for final results
stage_outputs = parallel_outputs
all_stage_outputs.append(stage_outputs)
# print("STAGE OUTPUTS: ", stage_outputs)
# print("TRACES: ", traces)
# print("TOKEN USAGE: ", usage_metrics)
# print("ALL STAGE OUTPUTS: ", all_stage_outputs)
# Build final pipeline run results
final_results = build_pipeline_run_results(
all_stage_outputs=all_stage_outputs,
traces=traces,
token_usage=usage_metrics,
)
print("FINAL RESULTS: ", final_results)
# prepare traces for final results
return final_results
# Process all runs in parallel
all_run_results = await asyncio.gather(
*(process_single_run(input_data) for input_data in run_inputs)
*(self.process_single_run(input_data) for input_data in run_inputs)
)
# Flatten the list of lists into a single list of results
@@ -235,6 +56,115 @@ class Pipeline(BaseModel):
return pipeline_results
async def process_single_run(
self, run_input: Dict[str, Any]
) -> List[PipelineRunResult]:
initial_input = copy.deepcopy(run_input)
current_input = copy.deepcopy(run_input)
usage_metrics = {}
all_stage_outputs: List[List[CrewOutput]] = []
traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]]
for stage in self.stages:
stage_input = copy.deepcopy(current_input)
stage_outputs, stage_trace = await self._process_stage(stage, stage_input)
self._update_metrics_and_input(
usage_metrics, current_input, stage, stage_outputs
)
traces.append(stage_trace)
all_stage_outputs.append(stage_outputs)
return self._build_pipeline_run_results(
all_stage_outputs, traces, usage_metrics
)
async def _process_stage(
self, stage: Union[Crew, List[Crew]], current_input: Dict[str, Any]
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
if isinstance(stage, Crew):
return await self._process_single_crew(stage, current_input)
else:
return await self._process_parallel_crews(stage, current_input)
async def _process_single_crew(
self, crew: Crew, current_input: Dict[str, Any]
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
output = await crew.kickoff_async(inputs=current_input)
return [output], [crew.name or str(crew.id)]
async def _process_parallel_crews(
self, crews: List[Crew], current_input: Dict[str, Any]
) -> Tuple[List[CrewOutput], List[Union[str, Dict[str, Any]]]]:
parallel_outputs = await asyncio.gather(
*[crew.kickoff_async(inputs=current_input) for crew in crews]
)
return parallel_outputs, [crew.name or str(crew.id) for crew in crews]
def _update_metrics_and_input(
self,
usage_metrics: Dict[str, Any],
current_input: Dict[str, Any],
stage: Union[Crew, List[Crew]],
outputs: List[CrewOutput],
) -> None:
for crew, output in zip([stage] if isinstance(stage, Crew) else stage, outputs):
usage_metrics[crew.name or str(crew.id)] = output.token_usage
current_input.update(output.to_dict())
def _build_pipeline_run_results(
self,
all_stage_outputs: List[List[CrewOutput]],
traces: List[List[Union[str, Dict[str, Any]]]],
token_usage: Dict[str, Any],
) -> List[PipelineRunResult]:
formatted_traces = self._format_traces(traces)
formatted_crew_outputs = self._format_crew_outputs(all_stage_outputs)
return [
PipelineRunResult(
token_usage=token_usage,
trace=formatted_trace,
raw=crews_outputs[-1].raw,
pydantic=crews_outputs[-1].pydantic,
json_dict=crews_outputs[-1].json_dict,
crews_outputs=crews_outputs,
)
for crews_outputs, formatted_trace in zip(
formatted_crew_outputs, formatted_traces
)
]
def _format_traces(
self, traces: List[List[Union[str, Dict[str, Any]]]]
) -> List[List[Trace]]:
formatted_traces: List[Trace] = []
for trace in traces[:-1]:
formatted_traces.append(trace[0] if len(trace) == 1 else trace)
traces_to_return: List[List[Trace]] = []
final_trace = traces[-1]
if len(final_trace) == 1:
formatted_traces.append(final_trace[0])
traces_to_return.append(formatted_traces)
else:
for trace in final_trace:
copied_traces = formatted_traces.copy()
copied_traces.append(trace)
traces_to_return.append(copied_traces)
return traces_to_return
def _format_crew_outputs(
self, all_stage_outputs: List[List[CrewOutput]]
) -> List[List[CrewOutput]]:
crew_outputs: List[CrewOutput] = [
output
for stage_outputs in all_stage_outputs[:-1]
for output in stage_outputs
]
return [crew_outputs + [output] for output in all_stage_outputs[-1]]
def __rshift__(self, other: Any) -> "Pipeline":
"""
Implements the >> operator to add another Stage (Crew or List[Crew]) to an existing Pipeline.