Update terminology

Brandon Hancock
2024-07-18 14:59:38 -04:00
parent 834c62feca
commit d094e178f1
3 changed files with 41 additions and 30 deletions

View File

@@ -94,6 +94,7 @@ class Crew(BaseModel):
default_factory=TaskOutputStorageHandler
)
+ name: Optional[str] = Field(default="")
cache: bool = Field(default=True)
model_config = ConfigDict(arbitrary_types_allowed=True)
tasks: List[Task] = Field(default_factory=list)
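The hunk above adds an optional name identifier to the Crew model, alongside the existing cache flag. A minimal sketch of how the field might be used (the agent/task setup is illustrative only, not part of this commit):

from crewai import Agent, Crew, Task

# Illustrative agent and task; any valid configuration works here.
researcher = Agent(role="Researcher", goal="Summarize a topic", backstory="A focused analyst.")
summary_task = Task(description="Summarize the topic.", expected_output="A short summary.", agent=researcher)

crew = Crew(name="research_crew", agents=[researcher], tasks=[summary_task])
print(crew.name)   # "research_crew" (defaults to "" per the Field above)
print(crew.cache)  # True by default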

View File

@@ -5,15 +5,28 @@ from pydantic import BaseModel, Field
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline_output import PipelineOutput
"""
Pipeline Terminology:
Pipeline: The overall structure that defines a sequence of operations.
Stage: A distinct part of the pipeline, which can be either sequential or parallel.
+ Run: A specific execution of the pipeline for a given set of inputs, representing a single instance of processing through the pipeline.
Branch: Parallel executions within a stage (e.g., concurrent crew operations).
- Stream: The journey of an individual input through the entire pipeline.
+ Trace: The journey of an individual input through the entire pipeline.
+ Example pipeline structure:
+ crew1 >> crew2 >> crew3
+ This represents a pipeline with three sequential stages:
+ 1. crew1 is the first stage, which processes the input and passes its output to crew2.
+ 2. crew2 is the second stage, which takes the output from crew1 as its input, processes it, and passes its output to crew3.
+ 3. crew3 is the final stage, which takes the output from crew2 as its input and produces the final output of the pipeline.
+ Each input creates its own run, flowing through all stages of the pipeline.
+ Multiple runs can be processed concurrently, each following the defined pipeline structure.
Another example pipeline structure:
crew1 >> [crew2, crew3] >> crew4
This represents a pipeline with three stages:
@@ -21,8 +34,8 @@ This represents a pipeline with three stages:
2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
3. Another sequential stage (crew4)
- Each input creates its own stream, flowing through all stages of the pipeline.
- Multiple streams can be processed concurrently, each following the defined pipeline structure.
+ Each input creates its own run, flowing through all stages of the pipeline.
+ Multiple runs can be processed concurrently, each following the defined pipeline structure.
"""
@@ -31,44 +44,43 @@ class Pipeline(BaseModel):
..., description="List of crews representing stages to be executed in sequence"
)
- async def process_streams(
- self, stream_inputs: List[Dict[str, Any]]
+ async def process_runs(
+ self, run_inputs: List[Dict[str, Any]]
) -> List[List[CrewOutput]]:
"""
- Process multiple streams in parallel, with each stream going through all stages.
+ Process multiple runs in parallel, with each run going through all stages.
"""
pipeline_output = PipelineOutput()
- async def process_single_stream(
- stream_input: Dict[str, Any]
- ) -> List[CrewOutput]:
- print("current_input in stream", stream_input)
+ async def process_single_run(run_input: Dict[str, Any]) -> List[CrewOutput]:
+ print("current_input in run", run_input)
stage_outputs = []
for stage in self.stages:
if isinstance(stage, Crew):
# Process single crew
- stage_output = await stage.kickoff_async(inputs=stream_input)
+ stage_output = await stage.kickoff_async(inputs=run_input)
stage_outputs = [stage_output]
else:
# Process each crew in parallel
parallel_outputs = await asyncio.gather(
- *[crew.kickoff_async(inputs=stream_input) for crew in stage]
+ *[crew.kickoff_async(inputs=run_input) for crew in stage]
)
stage_outputs = parallel_outputs
# Convert all CrewOutputs from stage into a dictionary for next stage
- # and update original stream_input dictionary with new values
+ # and update original run_input dictionary with new values
stage_output_dicts = [output.to_dict() for output in stage_outputs]
for stage_dict in stage_output_dicts:
- stream_input.update(stage_dict)
- print("UPDATING stream_input - new values:", stream_input)
+ run_input.update(stage_dict)
+ print("UPDATING run_input - new values:", run_input)
- # Return all CrewOutputs from this stream
+ # Return all CrewOutputs from this run
return stage_outputs
- # Process all streams in parallel
+ # Process all runs in parallel
return await asyncio.gather(
- *(process_single_stream(input_data) for input_data in stream_inputs)
+ *(process_single_run(input_data) for input_data in run_inputs)
)
def __rshift__(self, other: Any) -> "Pipeline":
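The update loop in the hunk above is what carries state between stages: each stage's CrewOutput is converted to a dict and merged into the run's input before the next stage starts, so later crews see the original input plus everything earlier stages produced. A standalone sketch of that accumulation pattern (plain Python, hypothetical names; it mirrors the data-accumulation test further down):

from typing import Any, Dict, List

def merge_stage_outputs(run_input: Dict[str, Any], stage_output_dicts: List[Dict[str, Any]]) -> Dict[str, Any]:
    # On key collisions the most recent stage wins, matching dict.update.
    for stage_dict in stage_output_dicts:
        run_input.update(stage_dict)
    return run_input

state = {"initial": "data"}
state = merge_stage_outputs(state, [{"key1": "value1"}])  # after stage 1
state = merge_stage_outputs(state, [{"key2": "value2"}])  # after stage 2
assert state == {"initial": "data", "key1": "value1", "key2": "value2"}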
@@ -89,4 +101,4 @@ class Pipeline(BaseModel):
async def run_pipeline(
pipeline: Pipeline, inputs: List[Dict[str, Any]]
) -> List[List[CrewOutput]]:
- return await pipeline.process_streams(inputs)
+ return await pipeline.process_runs(inputs)
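run_pipeline is a thin module-level wrapper over Pipeline.process_runs; a hedged usage sketch, reusing the pipeline and inputs assumed in the earlier sketch:

import asyncio

# Each inner list holds the CrewOutputs produced by the final stage of one run.
all_runs = asyncio.run(run_pipeline(pipeline, inputs))
for run_outputs in all_runs:
    for output in run_outputs:
        print(output.to_dict())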

View File

@@ -80,7 +80,7 @@ async def test_pipeline_process_streams_single_input(mock_crew_factory):
mock_crew = mock_crew_factory()
pipeline = Pipeline(stages=[mock_crew])
input_data = [{"key": "value"}]
- pipeline_result = await pipeline.process_streams(input_data)
+ pipeline_result = await pipeline.process_runs(input_data)
mock_crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})
for stream_result in pipeline_result:
@@ -104,14 +104,12 @@ async def test_pipeline_process_streams_multiple_inputs(mock_crew_factory):
mock_crew = mock_crew_factory()
pipeline = Pipeline(stages=[mock_crew])
input_data = [{"key1": "value1"}, {"key2": "value2"}]
- pipeline_result = await pipeline.process_streams(input_data)
+ pipeline_result = await pipeline.process_runs(input_data)
assert mock_crew.kickoff_async.call_count == 2
assert len(pipeline_result) == 2
- for stream_result in pipeline_result:
- assert all(
- isinstance(stream_output, CrewOutput) for stream_output in stream_result
- )
+ for run_result in pipeline_result:
+ assert all(isinstance(run_output, CrewOutput) for run_output in run_result)
@pytest.mark.asyncio
@@ -126,7 +124,7 @@ async def test_pipeline_with_parallel_stages(mock_crew_factory):
pipeline = Pipeline(stages=[crew1, [crew2, crew3]])
input_data = [{"initial": "data"}]
- pipeline_result = await pipeline.process_streams(input_data)
+ pipeline_result = await pipeline.process_runs(input_data)
crew1.kickoff_async.assert_called_once_with(
inputs={"initial": "data", "key": "value"}
@@ -188,13 +186,13 @@ async def test_pipeline_data_accumulation(mock_crew_factory):
pipeline = Pipeline(stages=[crew1, crew2])
input_data = [{"initial": "data"}]
- pipeline_result = await pipeline.process_streams(input_data)
+ pipeline_result = await pipeline.process_runs(input_data)
assert len(pipeline_result) == 1
print("RESULT: ", pipeline_result)
- for stream_result in pipeline_result:
- print("STREAM RESULT: ", stream_result)
- assert stream_result[0].json_dict == {
+ for run_result in pipeline_result:
+ print("RUN RESULT: ", run_result)
+ assert run_result[0].json_dict == {
"initial": "data",
"key1": "value1",
"key2": "value2",