From 072044c537ea20ca567aae304e146752145a7523 Mon Sep 17 00:00:00 2001
From: Brandon Hancock
Date: Tue, 30 Jul 2024 09:54:11 -0400
Subject: [PATCH] Rename variables based on Joao's feedback

---
 src/crewai/pipeline/pipeline.py               | 60 +++++++++----------
 ...n_result.py => pipeline_kickoff_result.py} |  2 +-
 src/crewai/pipeline/pipeline_output.py        |  6 +-
 tests/pipeline/test_pipeline.py               |  6 +-
 4 files changed, 37 insertions(+), 37 deletions(-)
 rename src/crewai/pipeline/{pipeline_run_result.py => pipeline_kickoff_result.py} (98%)

diff --git a/src/crewai/pipeline/pipeline.py b/src/crewai/pipeline/pipeline.py
index e6c74a671..6b4bac450 100644
--- a/src/crewai/pipeline/pipeline.py
+++ b/src/crewai/pipeline/pipeline.py
@@ -6,7 +6,7 @@ from pydantic import BaseModel, Field, model_validator
 
 from crewai.crew import Crew
 from crewai.crews.crew_output import CrewOutput
-from crewai.pipeline.pipeline_run_result import PipelineRunResult
+from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult
 from crewai.types.usage_metrics import UsageMetrics
 
 Trace = Union[Union[str, Dict[str, Any]], List[Union[str, Dict[str, Any]]]]
@@ -17,12 +17,12 @@
 Developer Notes:
 This module defines a Pipeline class that represents a sequence of operations (stages)
 to process inputs. Each stage can be either sequential or parallel, and the pipeline
-can process multiple runs concurrently.
+can process multiple kickoffs concurrently.
 
 Core Loop Explanation:
-1. The `process_runs` method processes multiple runs in parallel, each going through
+1. The `process_kickoffs` method processes multiple kickoffs in parallel, each going through
    all pipeline stages.
-2. The `process_single_run` method handles the processing of a single run through
+2. The `process_single_kickoff` method handles the processing of a single kickoff through
    all stages, updating metrics and input data along the way.
 3. The `_process_stage` method determines whether a stage is sequential or parallel
    and processes it accordingly.
@@ -30,8 +30,8 @@ Core Loop Explanation:
    execution of single and parallel crew stages.
 5. The `_update_metrics_and_input` method updates usage metrics and the current input
    with the outputs from a stage.
-6. The `_build_pipeline_run_results` method constructs the final results of the
-   pipeline run, including traces and outputs.
+6. The `_build_pipeline_kickoff_results` method constructs the final results of the
+   pipeline kickoff, including traces and outputs.
 
 Handling Traces and Crew Outputs:
 - During the processing of stages, we handle the results (traces and crew outputs)
@@ -41,14 +41,14 @@ Handling Traces and Crew Outputs:
   dictionary and passing it to the next stage. This merged dictionary allows for
   smooth data flow between stages.
 - For the final stage, in addition to passing the input data, we also need to prepare
-  the final outputs and traces to be returned as the overall result of the pipeline run.
+  the final outputs and traces to be returned as the overall result of the pipeline kickoff.
   In this case, we do not merge the results, as each result needs to be included
-  separately in its own pipeline run result.
+  separately in its own pipeline kickoff result.
 
 Pipeline Terminology:
 - Pipeline: The overall structure that defines a sequence of operations.
 - Stage: A distinct part of the pipeline, which can be either sequential or parallel.
-- Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
+- Kickoff: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
 - Branch: Parallel executions within a stage (e.g., concurrent crew operations).
 - Trace: The journey of an individual input through the entire pipeline.
 
@@ -60,8 +60,8 @@ This represents a pipeline with three sequential stages:
 2. crew2 is the second stage, which takes the output from crew1 as its input, processes it, and passes its output to crew3.
 3. crew3 is the final stage, which takes the output from crew2 as its input and produces the final output of the pipeline.
 
-Each input creates its own run, flowing through all stages of the pipeline.
-Multiple runs can be processed concurrently, each following the defined pipeline structure.
+Each input creates its own kickoff, flowing through all stages of the pipeline.
+Multiple kickoffs can be processed concurrently, each following the defined pipeline structure.
 
 Another example pipeline structure:
 crew1 >> [crew2, crew3] >> crew4
@@ -71,8 +71,8 @@ This represents a pipeline with three stages:
 2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
 3. Another sequential stage (crew4)
 
-Each input creates its own run, flowing through all stages of the pipeline.
-Multiple runs can be processed concurrently, each following the defined pipeline structure.
+Each input creates its own kickoff, flowing through all stages of the pipeline.
+Multiple kickoffs can be processed concurrently, each following the defined pipeline structure.
 """
 
@@ -111,22 +111,22 @@ class Pipeline(BaseModel):
         return values
 
     async def kickoff(
-        self, run_inputs: List[Dict[str, Any]]
-    ) -> List[PipelineRunResult]:
+        self, inputs: List[Dict[str, Any]]
+    ) -> List[PipelineKickoffResult]:
         """
         Processes multiple runs in parallel, each going through all pipeline stages.
 
         Args:
-            run_inputs (List[Dict[str, Any]]): List of inputs for each run.
+            inputs (List[Dict[str, Any]]): List of inputs for each run.
 
         Returns:
-            List[PipelineRunResult]: List of results from each run.
+            List[PipelineKickoffResult]: List of results from each run.
         """
-        pipeline_results: List[PipelineRunResult] = []
+        pipeline_results: List[PipelineKickoffResult] = []
 
         # Process all runs in parallel
         all_run_results = await asyncio.gather(
-            *(self.process_single_kickoff(input_data) for input_data in run_inputs)
+            *(self.process_single_kickoff(input_data) for input_data in inputs)
         )
 
         # Flatten the list of lists into a single list of results
@@ -137,19 +137,19 @@ class Pipeline(BaseModel):
         return pipeline_results
 
     async def process_single_kickoff(
-        self, run_input: Dict[str, Any]
-    ) -> List[PipelineRunResult]:
+        self, kickoff_input: Dict[str, Any]
+    ) -> List[PipelineKickoffResult]:
         """
         Processes a single run through all pipeline stages.
 
         Args:
-            run_input (Dict[str, Any]): The input for the run.
+            kickoff_input (Dict[str, Any]): The input for the kickoff.
 
         Returns:
-            List[PipelineRunResult]: The results of processing the run.
+            List[PipelineKickoffResult]: The results of processing the run.
""" - initial_input = copy.deepcopy(run_input) - current_input = copy.deepcopy(run_input) + initial_input = copy.deepcopy(kickoff_input) + current_input = copy.deepcopy(kickoff_input) pipeline_usage_metrics: Dict[str, UsageMetrics] = {} all_stage_outputs: List[List[CrewOutput]] = [] traces: List[List[Union[str, Dict[str, Any]]]] = [[initial_input]] @@ -164,7 +164,7 @@ class Pipeline(BaseModel): traces.append(stage_trace) all_stage_outputs.append(stage_outputs) - return self._build_pipeline_run_results( + return self._build_pipeline_kickoff_results( all_stage_outputs, traces, pipeline_usage_metrics ) @@ -240,12 +240,12 @@ class Pipeline(BaseModel): usage_metrics[crew.name or str(crew.id)] = output.token_usage current_input.update(output.to_dict()) - def _build_pipeline_run_results( + def _build_pipeline_kickoff_results( self, all_stage_outputs: List[List[CrewOutput]], traces: List[List[Union[str, Dict[str, Any]]]], token_usage: Dict[str, UsageMetrics], - ) -> List[PipelineRunResult]: + ) -> List[PipelineKickoffResult]: """ Builds the results of a pipeline run. @@ -255,13 +255,13 @@ class Pipeline(BaseModel): token_usage (Dict[str, Any]): Token usage metrics. Returns: - List[PipelineRunResult]: The results of the pipeline run. + List[PipelineKickoffResult]: The results of the pipeline run. """ formatted_traces = self._format_traces(traces) formatted_crew_outputs = self._format_crew_outputs(all_stage_outputs) return [ - PipelineRunResult( + PipelineKickoffResult( token_usage=token_usage, trace=formatted_trace, raw=crews_outputs[-1].raw, diff --git a/src/crewai/pipeline/pipeline_run_result.py b/src/crewai/pipeline/pipeline_kickoff_result.py similarity index 98% rename from src/crewai/pipeline/pipeline_run_result.py rename to src/crewai/pipeline/pipeline_kickoff_result.py index 576cd9949..7bde238cd 100644 --- a/src/crewai/pipeline/pipeline_run_result.py +++ b/src/crewai/pipeline/pipeline_kickoff_result.py @@ -8,7 +8,7 @@ from crewai.crews.crew_output import CrewOutput from crewai.types.usage_metrics import UsageMetrics -class PipelineRunResult(BaseModel): +class PipelineKickoffResult(BaseModel): """Class that represents the result of a pipeline run.""" id: UUID4 = Field( diff --git a/src/crewai/pipeline/pipeline_output.py b/src/crewai/pipeline/pipeline_output.py index fc28c7982..d9875b64a 100644 --- a/src/crewai/pipeline/pipeline_output.py +++ b/src/crewai/pipeline/pipeline_output.py @@ -3,7 +3,7 @@ from typing import List from pydantic import UUID4, BaseModel, Field -from crewai.pipeline.pipeline_run_result import PipelineRunResult +from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult class PipelineOutput(BaseModel): @@ -12,9 +12,9 @@ class PipelineOutput(BaseModel): frozen=True, description="Unique identifier for the object, not set by user.", ) - run_results: List[PipelineRunResult] = Field( + run_results: List[PipelineKickoffResult] = Field( description="List of results for each run through the pipeline", default=[] ) - def add_run_result(self, result: PipelineRunResult): + def add_run_result(self, result: PipelineKickoffResult): self.run_results.append(result) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index fef6a87a6..dcdd49efe 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -6,7 +6,7 @@ from crewai.agent import Agent from crewai.crew import Crew from crewai.crews.crew_output import CrewOutput from crewai.pipeline.pipeline import Pipeline -from crewai.pipeline.pipeline_run_result import 
+from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult
 from crewai.process import Process
 from crewai.task import Task
 from crewai.tasks.task_output import TaskOutput
@@ -108,7 +108,7 @@ async def test_pipeline_process_streams_single_input(mock_crew_factory):
     mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})
 
     for pipeline_result in pipeline_results:
-        assert isinstance(pipeline_result, PipelineRunResult)
+        assert isinstance(pipeline_result, PipelineKickoffResult)
         assert pipeline_result.raw == "Test output"
         assert len(pipeline_result.crews_outputs) == 1
         print("pipeline_result.token_usage", pipeline_result.token_usage)
@@ -194,7 +194,7 @@ async def test_pipeline_process_streams_single_input_pydantic_output(mock_crew_f
 
         print("pipeline_result.trace", pipeline_result.trace)
 
-        assert isinstance(pipeline_result, PipelineRunResult)
+        assert isinstance(pipeline_result, PipelineKickoffResult)
         assert pipeline_result.raw == "Test output"
         assert len(pipeline_result.crews_outputs) == 1
         assert pipeline_result.token_usage == {crew_name: DEFAULT_TOKEN_USAGE}
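
For context, a minimal usage sketch of the renamed surface area. The awaitable
`kickoff(inputs)` signature and the `PipelineKickoffResult` fields (`raw`,
`trace`, `token_usage`, `crews_outputs`) are taken from this patch and its
tests; the `stages=` constructor argument, the helper name
`run_two_stage_pipeline`, and the pre-built `crew1`/`crew2` objects are
assumptions for illustration, not part of the patch.

    from typing import Any, Dict, List

    from crewai.crew import Crew
    from crewai.pipeline.pipeline import Pipeline
    from crewai.pipeline.pipeline_kickoff_result import PipelineKickoffResult


    async def run_two_stage_pipeline(
        crew1: Crew, crew2: Crew
    ) -> List[PipelineKickoffResult]:
        # Assumed constructor: two sequential stages, each a Crew built elsewhere.
        pipeline = Pipeline(stages=[crew1, crew2])

        # One kickoff per input dict; all kickoffs are processed concurrently.
        inputs: List[Dict[str, Any]] = [{"topic": "AI"}, {"topic": "crypto"}]
        results = await pipeline.kickoff(inputs)

        for result in results:
            # `raw` holds the final stage's raw output; `trace` records the
            # journey of the originating input through every pipeline stage.
            print(result.raw, result.trace, result.token_usage)

        return results

    # From synchronous code, with crews built elsewhere:
    # asyncio.run(run_two_stage_pipeline(crew1, crew2))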