mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-28 05:42:46 +00:00
Compare commits
1 Commits
devin/1739
...
devin/1739
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a34cc7293c |
@@ -14,6 +14,7 @@ icon: bars-staggered
|
||||
|
||||
- **Sequential**: Executes tasks sequentially, ensuring tasks are completed in an orderly progression.
|
||||
- **Hierarchical**: Organizes tasks in a managerial hierarchy, where tasks are delegated and executed based on a structured chain of command. A manager language model (`manager_llm`) or a custom manager agent (`manager_agent`) must be specified in the crew to enable the hierarchical process, facilitating the creation and management of tasks by the manager.
|
||||
- **Parallel**: Enables concurrent execution of multiple flows, allowing transitions from one flow to multiple parallel flows for improved task parallelization. Parallel execution is automatically handled using asyncio for optimal performance.
|
||||
- **Consensual Process (Planned)**: Aiming for collaborative decision-making among agents on task execution, this process type introduces a democratic approach to task management within CrewAI. It is planned for future development and is not currently implemented in the codebase.
|
||||
|
||||
## The Role of Processes in Teamwork
|
||||
@@ -57,9 +58,30 @@ Emulates a corporate hierarchy, CrewAI allows specifying a custom manager agent
|
||||
|
||||
## Process Class: Detailed Overview
|
||||
|
||||
The `Process` class is implemented as an enumeration (`Enum`), ensuring type safety and restricting process values to the defined types (`sequential`, `hierarchical`). The consensual process is planned for future inclusion, emphasizing our commitment to continuous development and innovation.
|
||||
The `Process` class is implemented as an enumeration (`Enum`), ensuring type safety and restricting process values to the defined types (`sequential`, `hierarchical`, `parallel`). The consensual process is planned for future inclusion, emphasizing our commitment to continuous development and innovation.
|
||||
|
||||
## Parallel Process
|
||||
|
||||
The parallel process type enables concurrent execution of multiple flows, leveraging Python's asyncio for efficient task parallelization. When using parallel execution:
|
||||
|
||||
- Multiple start methods are executed concurrently
|
||||
- Listeners can run in parallel when triggered by the same method
|
||||
- State consistency is maintained through thread-safe operations
|
||||
- Execution timing and order are preserved where necessary
|
||||
|
||||
Example of parallel flow execution:
|
||||
```python
|
||||
from crewai import Crew, Process
|
||||
|
||||
# Create a crew with parallel process
|
||||
crew = Crew(
|
||||
agents=my_agents,
|
||||
tasks=my_tasks,
|
||||
process=Process.parallel
|
||||
)
|
||||
```
|
||||
|
||||
## Conclusion
|
||||
|
||||
The structured collaboration facilitated by processes within CrewAI is crucial for enabling systematic teamwork among agents.
|
||||
This documentation has been updated to reflect the latest features, enhancements, and the planned integration of the Consensual Process, ensuring users have access to the most current and comprehensive information.
|
||||
This documentation has been updated to reflect the latest features, enhancements, and the planned integration of the Consensual Process, ensuring users have access to the most current and comprehensive information.
|
||||
|
||||
@@ -314,46 +314,6 @@ class BaseAgent(ABC, BaseModel):
|
||||
|
||||
return copied_agent
|
||||
|
||||
def _interpolate_only(self, input_string: str, inputs: Dict[str, Any]) -> str:
|
||||
"""Interpolate placeholders in a string while preserving JSON-like structures.
|
||||
|
||||
Args:
|
||||
input_string (str): The string containing placeholders to interpolate.
|
||||
inputs (Dict[str, Any]): Dictionary of values for interpolation.
|
||||
|
||||
Returns:
|
||||
str: The interpolated string with JSON structures preserved.
|
||||
|
||||
Example:
|
||||
>>> _interpolate_only("Name: {name}, Config: {'key': 'value'}", {"name": "John"})
|
||||
"Name: John, Config: {'key': 'value'}"
|
||||
|
||||
Raises:
|
||||
ValueError: If input_string is None or empty, or if inputs is empty
|
||||
KeyError: If a required template variable is missing from inputs
|
||||
"""
|
||||
if not input_string:
|
||||
raise ValueError("Input string cannot be None or empty")
|
||||
if not inputs:
|
||||
raise ValueError("Inputs dictionary cannot be empty")
|
||||
|
||||
try:
|
||||
# First check if all required variables are present
|
||||
required_vars = [
|
||||
var.split("}")[0] for var in input_string.split("{")[1:]
|
||||
if "}" in var
|
||||
]
|
||||
for var in required_vars:
|
||||
if var not in inputs:
|
||||
raise KeyError(f"Missing required template variable: {var}")
|
||||
|
||||
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
|
||||
for key in inputs.keys():
|
||||
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
|
||||
return escaped_string.format(**inputs)
|
||||
except ValueError as e:
|
||||
raise ValueError(f"Error during string interpolation: {str(e)}") from e
|
||||
|
||||
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
|
||||
"""Interpolate inputs into the agent description and backstory."""
|
||||
if self._original_role is None:
|
||||
@@ -364,9 +324,9 @@ class BaseAgent(ABC, BaseModel):
|
||||
self._original_backstory = self.backstory
|
||||
|
||||
if inputs:
|
||||
self.role = self._interpolate_only(self._original_role, inputs)
|
||||
self.goal = self._interpolate_only(self._original_goal, inputs)
|
||||
self.backstory = self._interpolate_only(self._original_backstory, inputs)
|
||||
self.role = self._original_role.format(**inputs)
|
||||
self.goal = self._original_goal.format(**inputs)
|
||||
self.backstory = self._original_backstory.format(**inputs)
|
||||
|
||||
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
|
||||
"""Set the cache handler for the agent.
|
||||
|
||||
@@ -8,4 +8,5 @@ class Process(str, Enum):
|
||||
|
||||
sequential = "sequential"
|
||||
hierarchical = "hierarchical"
|
||||
parallel = "parallel"
|
||||
# TODO: consensual = 'consensual'
|
||||
|
||||
@@ -1357,51 +1357,6 @@ def test_handle_context_length_exceeds_limit_cli_no():
|
||||
mock_handle_context.assert_not_called()
|
||||
|
||||
|
||||
def test_interpolate_inputs_with_tool_description():
|
||||
from crewai.tools import BaseTool
|
||||
|
||||
class DummyTool(BaseTool):
|
||||
name: str = "dummy_tool"
|
||||
description: str = "Tool Arguments: {'arg': {'description': 'test arg', 'type': 'str'}}"
|
||||
|
||||
def _run(self, arg: str) -> str:
|
||||
"""Run the tool."""
|
||||
return f"Dummy result for: {arg}"
|
||||
|
||||
tool = DummyTool()
|
||||
agent = Agent(
|
||||
role="{topic} specialist",
|
||||
goal="Figure {goal} out",
|
||||
backstory="I am the master of {role}\nTools: {tool_desc}",
|
||||
)
|
||||
|
||||
agent.interpolate_inputs({
|
||||
"topic": "AI",
|
||||
"goal": "life",
|
||||
"role": "all things",
|
||||
"tool_desc": tool.description
|
||||
})
|
||||
assert "Tool Arguments: {'arg': {'description': 'test arg', 'type': 'str'}}" in agent.backstory
|
||||
|
||||
def test_interpolate_only_error_handling():
|
||||
agent = Agent(
|
||||
role="{topic} specialist",
|
||||
goal="Figure {goal} out",
|
||||
backstory="I am the master of {role}",
|
||||
)
|
||||
|
||||
# Test empty input string
|
||||
with pytest.raises(ValueError, match="Input string cannot be None or empty"):
|
||||
agent._interpolate_only("", {"topic": "AI"})
|
||||
|
||||
# Test empty inputs dictionary
|
||||
with pytest.raises(ValueError, match="Inputs dictionary cannot be empty"):
|
||||
agent._interpolate_only("test {topic}", {})
|
||||
|
||||
# Test missing template variable
|
||||
with pytest.raises(KeyError, match="Missing required template variable"):
|
||||
agent._interpolate_only("test {missing}", {"topic": "AI"})
|
||||
|
||||
def test_agent_with_all_llm_attributes():
|
||||
agent = Agent(
|
||||
role="test role",
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Test Flow creation and execution basic functionality."""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
import pytest
|
||||
@@ -620,3 +621,35 @@ def test_stateless_flow_event_emission():
|
||||
== "Deeds will not be less valiant because they are unpraised."
|
||||
)
|
||||
assert isinstance(event_log[5].timestamp, datetime)
|
||||
|
||||
|
||||
def test_parallel_flow():
|
||||
"""Test a flow where multiple listeners execute in parallel."""
|
||||
execution_order = []
|
||||
execution_times = {}
|
||||
|
||||
class ParallelFlow(Flow):
|
||||
@start()
|
||||
def start_method(self):
|
||||
execution_order.append("start")
|
||||
return "start"
|
||||
|
||||
@listen(start_method)
|
||||
async def parallel_1(self):
|
||||
await asyncio.sleep(0.1)
|
||||
execution_times["parallel_1"] = time.time()
|
||||
execution_order.append("parallel_1")
|
||||
|
||||
@listen(start_method)
|
||||
async def parallel_2(self):
|
||||
await asyncio.sleep(0.1)
|
||||
execution_times["parallel_2"] = time.time()
|
||||
execution_order.append("parallel_2")
|
||||
|
||||
flow = ParallelFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert "start" in execution_order
|
||||
assert "parallel_1" in execution_order
|
||||
assert "parallel_2" in execution_order
|
||||
assert abs(execution_times["parallel_1"] - execution_times["parallel_2"]) < 0.05
|
||||
|
||||
Reference in New Issue
Block a user