Going to start refactoring for pipeline_output

This commit is contained in:
Brandon Hancock
2024-07-18 11:20:26 -04:00
parent c0c329b6e0
commit 834c62feca
8 changed files with 322 additions and 299 deletions

View File

@@ -48,7 +48,7 @@ except ImportError:
agentops = None
if TYPE_CHECKING:
from crewai.procedure.procedure import Procedure
from crewai.pipeline.pipeline import Pipeline
class Crew(BaseModel):
@@ -946,17 +946,17 @@ class Crew(BaseModel):
return total_usage_metrics
def __rshift__(self, other: "Crew") -> "Procedure":
def __rshift__(self, other: "Crew") -> "Pipeline":
"""
Implements the >> operator to add another Crew to an existing Procedure.
Implements the >> operator to add another Crew to an existing Pipeline.
"""
from crewai.procedure.procedure import Procedure
from crewai.pipeline.pipeline import Pipeline
if not isinstance(other, Crew):
raise TypeError(
f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
)
return Procedure(crews=[self, other])
return Pipeline(stages=[self, other])
def __repr__(self):
    """Return a one-line summary: id, process, and agent/task counts."""
    summary = (
        f"id={self.id}, process={self.process}, "
        f"number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)}"
    )
    return f"Crew({summary})"

View File

@@ -0,0 +1,3 @@
from crewai.pipeline.pipeline import Pipeline
__all__ = ["Pipeline"]

View File

@@ -0,0 +1,92 @@
import asyncio
from typing import Any, Dict, List, Union
from pydantic import BaseModel, Field
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
"""
Pipeline Terminology:
Pipeline: The overall structure that defines a sequence of operations.
Stage: A distinct part of the pipeline, which can be either sequential or parallel.
Branch: Parallel executions within a stage (e.g., concurrent crew operations).
Stream: The journey of an individual input through the entire pipeline.
Example pipeline structure:
crew1 >> [crew2, crew3] >> crew4
This represents a pipeline with three stages:
1. A sequential stage (crew1)
2. A parallel stage with two branches (crew2 and crew3 executing concurrently)
3. Another sequential stage (crew4)
Each input creates its own stream, flowing through all stages of the pipeline.
Multiple streams can be processed concurrently, each following the defined pipeline structure.
"""
class Pipeline(BaseModel):
    """
    An ordered list of stages that every input stream is run through.

    A stage is either a single Crew (sequential stage) or a list of Crews
    (parallel stage whose branches are awaited together).
    """

    stages: List[Union[Crew, List[Crew]]] = Field(
        ..., description="List of crews representing stages to be executed in sequence"
    )

    async def process_streams(
        self, stream_inputs: List[Dict[str, Any]]
    ) -> List[List[CrewOutput]]:
        """
        Process multiple streams in parallel, with each stream going through all stages.

        Args:
            stream_inputs: One input dictionary per stream. The dictionaries are
                left untouched; each stream works on its own shallow copy.

        Returns:
            One list per stream, in the order of ``stream_inputs``, holding the
            CrewOutputs of the last stage that ran (one element for a
            sequential stage, one per branch for a parallel stage). The list is
            empty for a pipeline without stages.
        """

        async def process_single_stream(
            stream_input: Dict[str, Any]
        ) -> List[CrewOutput]:
            # Copy once so the caller's dictionary is never mutated and two
            # streams can never share accumulated state, even if the caller
            # passes the same dict object twice.
            current_input = dict(stream_input)
            stage_outputs: List[CrewOutput] = []

            for stage in self.stages:
                if isinstance(stage, Crew):
                    # Sequential stage: a single crew.
                    stage_outputs = [await stage.kickoff_async(inputs=current_input)]
                else:
                    # Parallel stage: run every branch concurrently.
                    stage_outputs = await asyncio.gather(
                        *(crew.kickoff_async(inputs=current_input) for crew in stage)
                    )

                # Merge each output's dictionary form into the running input
                # (in place) so the next stage sees the accumulated values.
                for output in stage_outputs:
                    current_input.update(output.to_dict())

            # Only the outputs of the last executed stage are returned.
            return stage_outputs

        # Process all streams in parallel
        return await asyncio.gather(
            *(process_single_stream(input_data) for input_data in stream_inputs)
        )

    def __rshift__(self, other: Any) -> "Pipeline":
        """
        Implements the >> operator to add another Stage (Crew or List[Crew]) to an existing Pipeline.

        Returns a new Pipeline; the current one is not modified.

        Raises:
            TypeError: If ``other`` is neither a Crew nor a list made up only of Crews.
        """
        is_stage = isinstance(other, Crew) or (
            isinstance(other, list) and all(isinstance(crew, Crew) for crew in other)
        )
        if not is_stage:
            raise TypeError(
                f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
            )
        return type(self)(stages=self.stages + [other])
# Helper function to run the pipeline
async def run_pipeline(
    pipeline: Pipeline, inputs: List[Dict[str, Any]]
) -> List[List[CrewOutput]]:
    """
    Run every input through ``pipeline`` and return the per-stream results.

    Thin convenience wrapper that awaits ``pipeline.process_streams(inputs)``.
    """
    stream_results = await pipeline.process_streams(inputs)
    return stream_results

View File

@@ -0,0 +1,21 @@
from typing import Any, Dict, List
from pydantic import BaseModel, Field
from crewai.crews.crew_output import CrewOutput
class PipelineOutput(BaseModel):
    """Collects crew outputs and token-usage records for a pipeline run."""

    # Grows through add_final_output().
    final_outputs: List[CrewOutput] = Field(
        default=[],
        description="List of final outputs from the last crew in the pipeline",
    )
    # Grows by one usage list per add_token_usage() call.
    token_usage: List[List[Dict[str, Any]]] = Field(
        default=[],
        description="Token usage for each crew in each stream",
    )

    def add_final_output(self, output: CrewOutput) -> None:
        """Append ``output`` to ``final_outputs``."""
        collected = self.final_outputs
        collected.append(output)

    def add_token_usage(self, usage: List[Dict[str, Any]]) -> None:
        """Append one list of usage dictionaries to ``token_usage``."""
        records = self.token_usage
        records.append(usage)

View File

@@ -1,3 +0,0 @@
from crewai.procedure.procedure import Procedure
__all__ = ["Procedure"]

View File

@@ -1,76 +0,0 @@
import asyncio
from typing import Any, Dict, List
from pydantic import BaseModel, Field
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
class Procedure(BaseModel):
    """
    Runs a list of crews one after another over a batch of inputs.

    Every crew is kicked off once per input; the dictionary form of each
    output becomes the corresponding input of the next crew.
    """

    crews: List[Crew] = Field(
        ..., description="List of crews to be executed in sequence"
    )

    def kickoff(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
        """
        Synchronously run all crews in order.

        Args:
            inputs: One input dictionary per run of the first crew.

        Returns:
            The outputs of the last crew, or an empty list when there are no crews.
        """
        current_inputs = inputs
        crew_outputs: List[CrewOutput] = []
        last_index = len(self.crews) - 1

        for index, crew in enumerate(self.crews):
            # Process all inputs for the current crew
            crew_outputs = self._process_crew(crew, current_inputs)

            # Only convert outputs to inputs when another crew follows
            if index < last_index:
                current_inputs = [output.to_dict() for output in crew_outputs]

        return crew_outputs

    async def kickoff_async(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
        """
        Asynchronously run all crews in order.

        Crews still run one after another, but each crew handles all of its
        inputs concurrently.

        Args:
            inputs: One input dictionary per run of the first crew.

        Returns:
            The outputs of the last crew, or an empty list when there are no crews.
        """
        current_inputs = inputs
        crew_outputs: List[CrewOutput] = []
        last_index = len(self.crews) - 1

        for index, crew in enumerate(self.crews):
            # Must await the async helper: the synchronous _process_crew
            # returns a plain list, which cannot be awaited.
            crew_outputs = await self._process_crew_async(crew, current_inputs)

            # Only convert outputs to inputs when another crew follows
            if index < last_index:
                current_inputs = [output.to_dict() for output in crew_outputs]

        return crew_outputs

    def _process_crew(
        self, crew: Crew, inputs: List[Dict[str, Any]]
    ) -> List[CrewOutput]:
        """Kick off ``crew`` once per input, sequentially, and collect the outputs."""
        return [crew.kickoff(inputs=input_data) for input_data in inputs]

    async def _process_crew_async(
        self, crew: Crew, inputs: List[Dict[str, Any]]
    ) -> List[CrewOutput]:
        """Kick off ``crew`` once per input concurrently and collect the outputs in input order."""
        # Kickoff crew asynchronously for each input
        crew_kickoffs = [crew.kickoff_async(inputs=input_data) for input_data in inputs]

        # Wait for all kickoffs to complete
        return await asyncio.gather(*crew_kickoffs)

    def __rshift__(self, other: Crew) -> "Procedure":
        """
        Implements the >> operator to add another Crew to an existing Procedure.

        Returns a new Procedure; the current one is not modified.

        Raises:
            TypeError: If ``other`` is not a Crew.
        """
        if not isinstance(other, Crew):
            raise TypeError(
                f"Unsupported operand type for >>: '{type(self).__name__}' and '{type(other).__name__}'"
            )
        return type(self)(crews=self.crews + [other])

View File

@@ -0,0 +1,201 @@
from unittest.mock import MagicMock
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.pipeline.pipeline import Pipeline
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
@pytest.fixture
def mock_crew_factory():
    """
    Provide a factory that builds MagicMock crews for the pipeline tests.

    Each mock's ``kickoff_async`` resolves to a canned CrewOutput whose
    ``json_dict`` is ``output_json_dict`` when that is given and truthy,
    otherwise ``{"key": "value"}``.
    """

    def _create_mock_crew(output_json_dict=None):
        crew = MagicMock(spec=Crew)

        canned_output = CrewOutput(
            raw="Test output",
            tasks_output=[
                TaskOutput(
                    description="Test task", raw="Task output", agent="Test Agent"
                )
            ],
            token_usage={
                "total_tokens": 100,
                "prompt_tokens": 50,
                "completion_tokens": 50,
            },
            json_dict=output_json_dict or {"key": "value"},
        )

        async def async_kickoff(inputs=None):
            print("inputs in async_kickoff", inputs)
            return canned_output

        crew.kickoff_async.side_effect = async_kickoff

        # Plain attribute values that Pipeline might be expecting on a crew
        crew_attributes = {
            "verbose": False,
            "output_log_file": None,
            "max_rpm": None,
            "memory": False,
            "process": Process.sequential,
            "config": None,
            "cache": True,
        }
        for attribute, value in crew_attributes.items():
            setattr(crew, attribute, value)

        # Give the crew one agent and one task so neither list is empty
        agent = MagicMock(spec=Agent)
        task = MagicMock(spec=Task)
        task.agent = agent
        task.async_execution = False
        task.context = None
        crew.agents = [agent]
        crew.tasks = [task]

        return crew

    return _create_mock_crew
def test_pipeline_initialization(mock_crew_factory):
    """
    A Pipeline built from two crews keeps both, in the order given.
    """
    first_crew, second_crew = mock_crew_factory(), mock_crew_factory()

    pipeline = Pipeline(stages=[first_crew, second_crew])

    stages = pipeline.stages
    assert len(stages) == 2
    assert stages[0] == first_crew
    assert stages[1] == second_crew
@pytest.mark.asyncio
async def test_pipeline_process_streams_single_input(mock_crew_factory):
    """
    Pipeline.process_streams() with one input kicks the crew off once and
    hands back that crew's CrewOutput for the stream.
    """
    crew = mock_crew_factory()
    streams = [{"key": "value"}]

    results = await Pipeline(stages=[crew]).process_streams(streams)

    crew.kickoff_async.assert_called_once_with(inputs={"key": "value"})

    expected_usage = {
        "total_tokens": 100,
        "prompt_tokens": 50,
        "completion_tokens": 50,
    }
    for stream_result in results:
        first_output = stream_result[0]
        assert isinstance(first_output, CrewOutput)
        assert first_output.raw == "Test output"
        assert len(first_output.tasks_output) == 1
        assert first_output.tasks_output[0].raw == "Task output"
        assert first_output.token_usage == expected_usage
@pytest.mark.asyncio
async def test_pipeline_process_streams_multiple_inputs(mock_crew_factory):
    """
    Pipeline.process_streams() with two inputs kicks the crew off once per
    input and returns one list of CrewOutputs per stream.
    """
    crew = mock_crew_factory()
    streams = [{"key1": "value1"}, {"key2": "value2"}]

    results = await Pipeline(stages=[crew]).process_streams(streams)

    assert crew.kickoff_async.call_count == 2
    assert len(results) == 2
    for stream_result in results:
        assert all(isinstance(output, CrewOutput) for output in stream_result)
@pytest.mark.asyncio
async def test_pipeline_with_parallel_stages(mock_crew_factory):
    """
    A sequential stage followed by a two-branch parallel stage kicks off
    every crew exactly once for a single input stream.
    """
    crew1, crew2, crew3 = (mock_crew_factory() for _ in range(3))
    pipeline = Pipeline(stages=[crew1, [crew2, crew3]])

    results = await pipeline.process_streams([{"initial": "data"}])

    # NOTE(review): crew1 is expected to show the merged "key" entry as well.
    # Presumably this is because the mock keeps a reference to the inputs dict
    # and the pipeline updates that dict in place after each stage - confirm.
    expected_inputs = {"initial": "data", "key": "value"}
    for crew in (crew1, crew2, crew3):
        crew.kickoff_async.assert_called_once_with(inputs=expected_inputs)

    assert len(results) == 1
    for stage_result in results:
        assert isinstance(stage_result[0], CrewOutput)
def test_pipeline_rshift_operator(mock_crew_factory):
    """
    The >> operator appends a Crew or a list of Crews as a new stage and
    rejects any other operand with TypeError.
    """
    crew1, crew2, crew3 = (mock_crew_factory() for _ in range(3))

    # Appending a single crew to an empty pipeline
    single_stage = Pipeline(stages=[]) >> crew1
    assert len(single_stage.stages) == 1
    assert single_stage.stages[0] == crew1

    # Appending a list of crews as one parallel stage
    pipeline = Pipeline(stages=[crew1]) >> [crew2, crew3]
    print("pipeline.stages:", pipeline.stages)
    assert len(pipeline.stages) == 2
    assert pipeline.stages[1] == [crew2, crew3]

    # A non-Crew operand must be rejected
    with pytest.raises(TypeError):
        pipeline >> "not a crew"
"""
TODO: Figure out what the proper output is for a pipeline with multiple stages
Options:
- Should the final output only include the last stage's output?
- Should the final output include the accumulation of previous stages' outputs?
"""
@pytest.mark.asyncio
async def test_pipeline_data_accumulation(mock_crew_factory):
    """
    Data from the initial input and from every stage is expected to be
    accumulated in the output of the pipeline.
    """
    crew1 = mock_crew_factory(output_json_dict={"key1": "value1"})
    crew2 = mock_crew_factory(output_json_dict={"key2": "value2"})
    streams = [{"initial": "data"}]

    pipeline_result = await Pipeline(stages=[crew1, crew2]).process_streams(streams)

    assert len(pipeline_result) == 1
    print("RESULT: ", pipeline_result)

    # NOTE(review): process_streams appears to return the last stage's
    # CrewOutput unchanged (json_dict {"key2": "value2"} here), so this
    # accumulated expectation looks unmet until the pipeline output design
    # is settled - confirm.
    accumulated = {
        "initial": "data",
        "key1": "value1",
        "key2": "value2",
    }
    for stream_result in pipeline_result:
        print("STREAM RESULT: ", stream_result)
        assert stream_result[0].json_dict == accumulated

View File

@@ -1,215 +0,0 @@
from unittest.mock import MagicMock
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.procedure.procedure import Procedure
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
@pytest.fixture
def mock_crew_factory():
    """
    Provide a factory that builds MagicMock crews for the procedure tests.

    Both ``kickoff`` and ``kickoff_async`` of each mock yield the same canned
    CrewOutput.
    """

    def _create_mock_crew():
        crew = MagicMock(spec=Crew)

        canned_output = CrewOutput(
            raw="Test output",
            tasks_output=[
                TaskOutput(
                    description="Test task", raw="Task output", agent="Test Agent"
                )
            ],
            token_usage={
                "total_tokens": 100,
                "prompt_tokens": 50,
                "completion_tokens": 50,
            },
            json_dict={"key": "value"},
        )

        async def async_kickoff(inputs=None):
            return canned_output

        crew.kickoff.return_value = canned_output
        crew.kickoff_async.side_effect = async_kickoff

        # Plain attribute values that Procedure might be expecting on a crew
        crew_attributes = {
            "verbose": 0,
            "output_log_file": None,
            "max_rpm": None,
            "memory": False,
            "process": Process.sequential,
            "config": None,
            "cache": True,
        }
        for attribute, value in crew_attributes.items():
            setattr(crew, attribute, value)

        # Give the crew one agent and one task so neither list is empty
        agent = MagicMock(spec=Agent)
        task = MagicMock(spec=Task)
        task.agent = agent
        task.async_execution = False
        task.context = None
        crew.agents = [agent]
        crew.tasks = [task]

        return crew

    return _create_mock_crew
def test_procedure_initialization(mock_crew_factory):
    """
    A Procedure built from two crews keeps both, in the order given.
    """
    first_crew, second_crew = mock_crew_factory(), mock_crew_factory()

    procedure = Procedure(crews=[first_crew, second_crew])

    crews = procedure.crews
    assert len(crews) == 2
    assert crews[0] == first_crew
    assert crews[1] == second_crew
@pytest.mark.asyncio
async def test_procedure_kickoff_single_input(mock_crew_factory):
    """
    Test that Procedure.kickoff_async() correctly processes a single input
    and returns the expected CrewOutput.
    """
    mock_crew_1 = mock_crew_factory()
    procedure = Procedure(crews=[mock_crew_1])
    input_data = {"key": "value"}

    # Procedure.kickoff() is synchronous and returns a list, so it cannot be
    # awaited; kickoff_async() is the coroutine entry point, and it matches
    # the kickoff_async assertion below.
    result = await procedure.kickoff_async([input_data])

    mock_crew_1.kickoff_async.assert_called_once_with(inputs=input_data)
    assert len(result) == 1
    assert isinstance(result[0], CrewOutput)
    assert result[0].raw == "Test output"
    assert len(result[0].tasks_output) == 1
    assert result[0].tasks_output[0].raw == "Task output"
    assert result[0].token_usage == {
        "total_tokens": 100,
        "prompt_tokens": 50,
        "completion_tokens": 50,
    }
@pytest.mark.asyncio
async def test_procedure_kickoff_multiple_inputs(mock_crew_factory):
    """
    Test that Procedure.kickoff_async() correctly processes multiple inputs
    and returns the expected CrewOutputs.
    """
    mock_crew_1, mock_crew_2 = mock_crew_factory(), mock_crew_factory()
    procedure = Procedure(crews=[mock_crew_1, mock_crew_2])
    input_data = [{"key1": "value1"}, {"key2": "value2"}]

    # Procedure.kickoff() is synchronous and returns a list, so it cannot be
    # awaited; kickoff_async() matches the kickoff_async call counts below.
    result = await procedure.kickoff_async(input_data)

    # Each crew runs once per input
    expected_call_count_per_crew = 2
    assert mock_crew_1.kickoff_async.call_count == expected_call_count_per_crew
    assert mock_crew_2.kickoff_async.call_count == expected_call_count_per_crew

    assert len(result) == 2
    assert all(isinstance(r, CrewOutput) for r in result)
    assert all(len(r.tasks_output) == 1 for r in result)
    assert all(
        r.token_usage
        == {"total_tokens": 100, "prompt_tokens": 50, "completion_tokens": 50}
        for r in result
    )
@pytest.mark.asyncio
async def test_procedure_chaining(mock_crew_factory):
    """
    Test that Procedure correctly chains multiple crews, passing the output
    of one crew as input to the next crew in the sequence.

    This test verifies:
    1. The first crew receives the initial input.
    2. The second crew receives the output from the first crew as its input.
    3. The final output contains the result from the last crew in the chain.
    4. Task outputs and token usage are correctly propagated through the chain.
    """
    crew1, crew2 = mock_crew_factory(), mock_crew_factory()
    task_output1 = TaskOutput(description="Task 1", raw="Output 1", agent="Agent 1")
    task_output2 = TaskOutput(description="Task 2", raw="Final output", agent="Agent 2")
    crew_output1 = CrewOutput(
        raw="Output 1",
        tasks_output=[task_output1],
        token_usage={"total_tokens": 100, "prompt_tokens": 50, "completion_tokens": 50},
        json_dict={"key1": "value1"},
    )
    crew_output2 = CrewOutput(
        raw="Final output",
        tasks_output=[task_output2],
        token_usage={"total_tokens": 150, "prompt_tokens": 75, "completion_tokens": 75},
        json_dict={"key2": "value2"},
    )

    async def async_kickoff1(inputs=None):
        return crew_output1

    async def async_kickoff2(inputs=None):
        return crew_output2

    # Override the factory's canned output with crew-specific results
    crew1.kickoff_async.side_effect = async_kickoff1
    crew2.kickoff_async.side_effect = async_kickoff2

    procedure = Procedure(crews=[crew1, crew2])
    input_data = [{"initial": "data"}]

    # Procedure.kickoff() is synchronous and returns a list, so it cannot be
    # awaited; kickoff_async() matches the kickoff_async assertions below.
    result = await procedure.kickoff_async(input_data)

    # Check that the first crew received the initial input
    crew1.kickoff_async.assert_called_once_with(inputs={"initial": "data"})

    # Check that the second crew received the output from the first crew as its input
    crew2.kickoff_async.assert_called_once_with(inputs=crew_output1.to_dict())

    # Check the final output
    assert len(result) == 1
    assert isinstance(result[0], CrewOutput)
    assert result[0].raw == "Final output"
    assert len(result[0].tasks_output) == 1
    assert result[0].tasks_output[0].raw == "Final output"
    assert result[0].token_usage == {
        "total_tokens": 150,
        "prompt_tokens": 75,
        "completion_tokens": 75,
    }
    assert result[0].json_dict == {"key2": "value2"}
def test_crew_rshift_operator():
    """
    The >> operator on Crews builds a Procedure, supports chaining, and
    rejects a non-Crew operand with TypeError.
    """
    # Minimal real Crew instances sharing one agent and one task
    agent = Agent(role="Test Agent", goal="Test Goal", backstory="Test Backstory")
    task = Task(agent=agent, description="Test Task", expected_output="Test Output")
    crew1 = Crew(agents=[agent], tasks=[task])
    crew2 = Crew(agents=[agent], tasks=[task])
    crew3 = Crew(agents=[agent], tasks=[task])

    # Two crews joined with >>
    pair = crew1 >> crew2
    assert isinstance(pair, Procedure)
    assert len(pair.crews) == 2
    assert pair.crews[0] == crew1
    assert pair.crews[1] == crew2

    # Three crews chained with >>
    chain = crew1 >> crew2 >> crew3
    assert isinstance(chain, Procedure)
    assert len(chain.crews) == 3
    assert chain.crews[0] == crew1
    assert chain.crews[1] == crew2
    assert chain.crews[2] == crew3

    # A non-Crew operand must be rejected
    with pytest.raises(TypeError):
        crew1 >> "not a crew"