From afd6bff1593280b93cd87a0d465a2c95768805c1 Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Fri, 19 Jul 2024 15:18:19 -0400 Subject: [PATCH] Fix pipelineoutput to look more like crewoutput and taskoutput --- src/crewai/pipeline/pipeline.py | 42 +++++++++++++++--- src/crewai/pipeline/pipeline_output.py | 8 +++- src/crewai/pipeline/pipeline_run_result.py | 50 +++++++++++++++++++--- 3 files changed, 86 insertions(+), 14 deletions(-) diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py index daca7a51d..fed3eb395 100644 --- a/src/crewai/pipeline/pipeline.py +++ b/src/crewai/pipeline/pipeline.py @@ -82,8 +82,27 @@ class Pipeline(BaseModel): return traces_to_return + def format_crew_outputs( + all_stage_outputs: List[List[CrewOutput]], + ) -> List[List[CrewOutput]]: + formatted_crew_outputs: List[List[CrewOutput]] = [] + + # Handle all output stages except the final one + crew_outputs: List[CrewOutput] = [] + for stage_outputs in all_stage_outputs[:-1]: + for output in stage_outputs: + crew_outputs.append(output) + + final_stage = all_stage_outputs[-1] + for output in final_stage: + copied_crew_outputs = crew_outputs.copy() + copied_crew_outputs.append(output) + formatted_crew_outputs.append(copied_crew_outputs) + + return formatted_crew_outputs + def build_pipeline_run_results( - final_stage_outputs: List[CrewOutput], + all_stage_outputs: List[List[CrewOutput]], traces: List[List[Union[str, Dict[str, Any]]]], token_usage: Dict[str, Any], ) -> List[PipelineRunResult]: @@ -93,16 +112,21 @@ class Pipeline(BaseModel): pipeline_run_results: List[PipelineRunResult] = [] - # Format traces formatted_traces = format_traces(traces) + formatted_crew_outputs = format_crew_outputs(all_stage_outputs) - for output, formatted_trace in zip(final_stage_outputs, formatted_traces): - # FORMAT TRACE + for crews_outputs, formatted_trace in zip( + formatted_crew_outputs, formatted_traces + ): + final_crew = crews_outputs[-1] new_pipeline_run_result = PipelineRunResult( - final_output=output, token_usage=token_usage, trace=formatted_trace, + raw=final_crew.raw, + pydantic=final_crew.pydantic, + json_dict=final_crew.json_dict, + crews_outputs=crews_outputs, ) pipeline_run_results.append(new_pipeline_run_result) @@ -112,9 +136,10 @@ class Pipeline(BaseModel): async def process_single_run( run_input: Dict[str, Any] ) -> List[PipelineRunResult]: - stages_queue = deque(self.stages) + stages_queue = deque(self.stages) # TODO: Change over to forloop usage_metrics = {} stage_outputs: List[CrewOutput] = [] + all_stage_outputs: List[List[CrewOutput]] = [] traces: List[List[Union[str, Dict[str, Any]]]] = [[run_input]] stage = None @@ -146,13 +171,16 @@ class Pipeline(BaseModel): # Store output for final results stage_outputs = parallel_outputs + all_stage_outputs.append(stage_outputs) + print("STAGE OUTPUTS: ", stage_outputs) print("TRACES: ", traces) print("TOKEN USAGE: ", usage_metrics) + print("ALL STAGE OUTPUTS: ", all_stage_outputs) # Build final pipeline run results final_results = build_pipeline_run_results( - final_stage_outputs=stage_outputs, + all_stage_outputs=all_stage_outputs, traces=traces, token_usage=usage_metrics, ) diff --git a/src/crewai/pipeline/pipeline_output.py b/src/crewai/pipeline/pipeline_output.py index 06947573b..fc28c7982 100644 --- a/src/crewai/pipeline/pipeline_output.py +++ b/src/crewai/pipeline/pipeline_output.py @@ -1,11 +1,17 @@ +import uuid from typing import List -from pydantic import BaseModel, Field +from pydantic import UUID4, BaseModel, Field from crewai.pipeline.pipeline_run_result import PipelineRunResult class PipelineOutput(BaseModel): + id: UUID4 = Field( + default_factory=uuid.uuid4, + frozen=True, + description="Unique identifier for the object, not set by user.", + ) run_results: List[PipelineRunResult] = Field( description="List of results for each run through the pipeline", default=[] ) diff --git a/src/crewai/pipeline/pipeline_run_result.py b/src/crewai/pipeline/pipeline_run_result.py index 38c64bbc8..172a8f545 100644 --- a/src/crewai/pipeline/pipeline_run_result.py +++ b/src/crewai/pipeline/pipeline_run_result.py @@ -1,22 +1,60 @@ -from typing import Any, Dict, List +import json +import uuid +from typing import Any, Dict, List, Optional, Union -from pydantic import BaseModel, Field +from pydantic import UUID4, BaseModel, Field from crewai.crews.crew_output import CrewOutput class PipelineRunResult(BaseModel): - final_output: CrewOutput = Field( - description="Final output from the last crew in the run" + """Class that represents the result of a pipeline run.""" + + id: UUID4 = Field( + default_factory=uuid.uuid4, + frozen=True, + description="Unique identifier for the object, not set by user.", ) + raw: str = Field(description="Raw output of the pipeline run", default="") + pydantic: Any = Field( + description="Pydantic output of the pipeline run", default=None + ) + json_dict: Union[Dict[str, Any], None] = Field( + description="JSON dict output of the pipeline run", default={} + ) + token_usage: Dict[str, Any] = Field( description="Token usage for each crew in the run" ) trace: List[Any] = Field( description="Trace of the journey of inputs through the run" ) - # TODO: Should we store all outputs from crews along the way? - crews_output: List[CrewOutput] = Field( + crews_outputs: List[CrewOutput] = Field( description="Output from each crew in the run", default=[], ) + + @property + def json(self) -> Optional[str]: + if self.crews_outputs[-1].tasks_output[-1].output_format != "json": + raise ValueError( + "No JSON output found in the final task of the final crew. Please make sure to set the output_json property in the final task in your crew." + ) + + return json.dumps(self.json_dict) + + def to_dict(self) -> Dict[str, Any]: + """Convert json_output and pydantic_output to a dictionary.""" + output_dict = {} + if self.json_dict: + output_dict.update(self.json_dict) + elif self.pydantic: + output_dict.update(self.pydantic.model_dump()) + return output_dict + + def __str__(self): + if self.pydantic: + return str(self.pydantic) + if self.json_dict: + return str(self.json_dict) + return self.raw