Fix pipelineoutput to look more like crewoutput and taskoutput

This commit is contained in:
Brandon Hancock
2024-07-19 15:18:19 -04:00
parent 392490c48b
commit afd6bff159
3 changed files with 86 additions and 14 deletions

View File

@@ -82,8 +82,27 @@ class Pipeline(BaseModel):
return traces_to_return
def format_crew_outputs(
all_stage_outputs: List[List[CrewOutput]],
) -> List[List[CrewOutput]]:
formatted_crew_outputs: List[List[CrewOutput]] = []
# Handle all output stages except the final one
crew_outputs: List[CrewOutput] = []
for stage_outputs in all_stage_outputs[:-1]:
for output in stage_outputs:
crew_outputs.append(output)
final_stage = all_stage_outputs[-1]
for output in final_stage:
copied_crew_outputs = crew_outputs.copy()
copied_crew_outputs.append(output)
formatted_crew_outputs.append(copied_crew_outputs)
return formatted_crew_outputs
def build_pipeline_run_results(
final_stage_outputs: List[CrewOutput],
all_stage_outputs: List[List[CrewOutput]],
traces: List[List[Union[str, Dict[str, Any]]]],
token_usage: Dict[str, Any],
) -> List[PipelineRunResult]:
@@ -93,16 +112,21 @@ class Pipeline(BaseModel):
pipeline_run_results: List[PipelineRunResult] = []
# Format traces
formatted_traces = format_traces(traces)
formatted_crew_outputs = format_crew_outputs(all_stage_outputs)
for output, formatted_trace in zip(final_stage_outputs, formatted_traces):
# FORMAT TRACE
for crews_outputs, formatted_trace in zip(
formatted_crew_outputs, formatted_traces
):
final_crew = crews_outputs[-1]
new_pipeline_run_result = PipelineRunResult(
final_output=output,
token_usage=token_usage,
trace=formatted_trace,
raw=final_crew.raw,
pydantic=final_crew.pydantic,
json_dict=final_crew.json_dict,
crews_outputs=crews_outputs,
)
pipeline_run_results.append(new_pipeline_run_result)
@@ -112,9 +136,10 @@ class Pipeline(BaseModel):
async def process_single_run(
run_input: Dict[str, Any]
) -> List[PipelineRunResult]:
stages_queue = deque(self.stages)
stages_queue = deque(self.stages) # TODO: Change over to forloop
usage_metrics = {}
stage_outputs: List[CrewOutput] = []
all_stage_outputs: List[List[CrewOutput]] = []
traces: List[List[Union[str, Dict[str, Any]]]] = [[run_input]]
stage = None
@@ -146,13 +171,16 @@ class Pipeline(BaseModel):
# Store output for final results
stage_outputs = parallel_outputs
all_stage_outputs.append(stage_outputs)
print("STAGE OUTPUTS: ", stage_outputs)
print("TRACES: ", traces)
print("TOKEN USAGE: ", usage_metrics)
print("ALL STAGE OUTPUTS: ", all_stage_outputs)
# Build final pipeline run results
final_results = build_pipeline_run_results(
final_stage_outputs=stage_outputs,
all_stage_outputs=all_stage_outputs,
traces=traces,
token_usage=usage_metrics,
)

View File

@@ -1,11 +1,17 @@
import uuid
from typing import List
from pydantic import BaseModel, Field
from pydantic import UUID4, BaseModel, Field
from crewai.pipeline.pipeline_run_result import PipelineRunResult
class PipelineOutput(BaseModel):
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
run_results: List[PipelineRunResult] = Field(
description="List of results for each run through the pipeline", default=[]
)

View File

@@ -1,22 +1,60 @@
from typing import Any, Dict, List
import json
import uuid
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field
from pydantic import UUID4, BaseModel, Field
from crewai.crews.crew_output import CrewOutput
class PipelineRunResult(BaseModel):
final_output: CrewOutput = Field(
description="Final output from the last crew in the run"
"""Class that represents the result of a pipeline run."""
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
raw: str = Field(description="Raw output of the pipeline run", default="")
pydantic: Any = Field(
description="Pydantic output of the pipeline run", default=None
)
json_dict: Union[Dict[str, Any], None] = Field(
description="JSON dict output of the pipeline run", default={}
)
token_usage: Dict[str, Any] = Field(
description="Token usage for each crew in the run"
)
trace: List[Any] = Field(
description="Trace of the journey of inputs through the run"
)
# TODO: Should we store all outputs from crews along the way?
crews_output: List[CrewOutput] = Field(
crews_outputs: List[CrewOutput] = Field(
description="Output from each crew in the run",
default=[],
)
@property
def json(self) -> Optional[str]:
if self.crews_outputs[-1].tasks_output[-1].output_format != "json":
raise ValueError(
"No JSON output found in the final task of the final crew. Please make sure to set the output_json property in the final task in your crew."
)
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""
output_dict = {}
if self.json_dict:
output_dict.update(self.json_dict)
elif self.pydantic:
output_dict.update(self.pydantic.model_dump())
return output_dict
def __str__(self):
if self.pydantic:
return str(self.pydantic)
if self.json_dict:
return str(self.json_dict)
return self.raw