mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 04:18:35 +00:00
Compare commits
9 Commits
1.6.1
...
brandon/cr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ef5cc37a9 | ||
|
|
ba8fbed30a | ||
|
|
abfd121f99 | ||
|
|
72f0b600b8 | ||
|
|
a028566bd6 | ||
|
|
3a266d6b40 | ||
|
|
a4a14df72e | ||
|
|
8664f3912b | ||
|
|
d67c12a5a3 |
@@ -4,6 +4,7 @@ import click
|
||||
import pkg_resources
|
||||
|
||||
from crewai.cli.create_crew import create_crew
|
||||
from crewai.cli.create_flow import create_flow
|
||||
from crewai.cli.create_pipeline import create_pipeline
|
||||
from crewai.memory.storage.kickoff_task_outputs_storage import (
|
||||
KickoffTaskOutputsSQLiteStorage,
|
||||
@@ -25,19 +26,20 @@ def crewai():
|
||||
|
||||
|
||||
@crewai.command()
|
||||
@click.argument("type", type=click.Choice(["crew", "pipeline"]))
|
||||
@click.argument("type", type=click.Choice(["crew", "pipeline", "flow"]))
|
||||
@click.argument("name")
|
||||
@click.option(
|
||||
"--router", is_flag=True, help="Create a pipeline with router functionality"
|
||||
)
|
||||
def create(type, name, router):
|
||||
"""Create a new crew or pipeline."""
|
||||
def create(type, name):
|
||||
"""Create a new crew, pipeline, or flow."""
|
||||
if type == "crew":
|
||||
create_crew(name)
|
||||
elif type == "pipeline":
|
||||
create_pipeline(name, router)
|
||||
create_pipeline(name)
|
||||
elif type == "flow":
|
||||
create_flow(name)
|
||||
else:
|
||||
click.secho("Error: Invalid type. Must be 'crew' or 'pipeline'.", fg="red")
|
||||
click.secho(
|
||||
"Error: Invalid type. Must be 'crew', 'pipeline', or 'flow'.", fg="red"
|
||||
)
|
||||
|
||||
|
||||
@crewai.command()
|
||||
|
||||
86
src/crewai/cli/create_flow.py
Normal file
86
src/crewai/cli/create_flow.py
Normal file
@@ -0,0 +1,86 @@
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
|
||||
|
||||
def create_flow(name):
|
||||
"""Create a new flow."""
|
||||
folder_name = name.replace(" ", "_").replace("-", "_").lower()
|
||||
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
|
||||
|
||||
click.secho(f"Creating flow {folder_name}...", fg="green", bold=True)
|
||||
|
||||
project_root = Path(folder_name)
|
||||
if project_root.exists():
|
||||
click.secho(f"Error: Folder {folder_name} already exists.", fg="red")
|
||||
return
|
||||
|
||||
# Create directory structure
|
||||
(project_root / "src" / folder_name).mkdir(parents=True)
|
||||
(project_root / "src" / folder_name / "crews").mkdir(parents=True)
|
||||
(project_root / "src" / folder_name / "tools").mkdir(parents=True)
|
||||
(project_root / "tests").mkdir(exist_ok=True)
|
||||
|
||||
# Create .env file
|
||||
with open(project_root / ".env", "w") as file:
|
||||
file.write("OPENAI_API_KEY=YOUR_API_KEY")
|
||||
|
||||
package_dir = Path(__file__).parent
|
||||
templates_dir = package_dir / "templates" / "flow"
|
||||
|
||||
# List of template files to copy
|
||||
root_template_files = [".gitignore", "pyproject.toml", "README.md"]
|
||||
src_template_files = ["__init__.py", "main.py"]
|
||||
tools_template_files = ["tools/__init__.py", "tools/custom_tool.py"]
|
||||
|
||||
crew_folders = [
|
||||
"poem_crew",
|
||||
]
|
||||
|
||||
def process_file(src_file, dst_file):
|
||||
with open(src_file, "r") as file:
|
||||
content = file.read()
|
||||
|
||||
content = content.replace("{{name}}", name)
|
||||
content = content.replace("{{flow_name}}", class_name)
|
||||
content = content.replace("{{folder_name}}", folder_name)
|
||||
|
||||
with open(dst_file, "w") as file:
|
||||
file.write(content)
|
||||
|
||||
# Copy and process root template files
|
||||
for file_name in root_template_files:
|
||||
src_file = templates_dir / file_name
|
||||
dst_file = project_root / file_name
|
||||
process_file(src_file, dst_file)
|
||||
|
||||
# Copy and process src template files
|
||||
for file_name in src_template_files:
|
||||
src_file = templates_dir / file_name
|
||||
dst_file = project_root / "src" / folder_name / file_name
|
||||
process_file(src_file, dst_file)
|
||||
|
||||
# Copy tools files
|
||||
for file_name in tools_template_files:
|
||||
src_file = templates_dir / file_name
|
||||
dst_file = project_root / "src" / folder_name / file_name
|
||||
process_file(src_file, dst_file)
|
||||
|
||||
# Copy crew folders
|
||||
for crew_folder in crew_folders:
|
||||
src_crew_folder = templates_dir / "crews" / crew_folder
|
||||
dst_crew_folder = project_root / "src" / folder_name / "crews" / crew_folder
|
||||
if src_crew_folder.exists():
|
||||
for src_file in src_crew_folder.rglob("*"):
|
||||
if src_file.is_file():
|
||||
relative_path = src_file.relative_to(src_crew_folder)
|
||||
dst_file = dst_crew_folder / relative_path
|
||||
dst_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
process_file(src_file, dst_file)
|
||||
else:
|
||||
click.secho(
|
||||
f"Warning: Crew folder {crew_folder} not found in template.",
|
||||
fg="yellow",
|
||||
)
|
||||
|
||||
click.secho(f"Flow {name} created successfully!", fg="green", bold=True)
|
||||
2
src/crewai/cli/templates/flow/.gitignore
vendored
Normal file
2
src/crewai/cli/templates/flow/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
.env
|
||||
__pycache__/
|
||||
57
src/crewai/cli/templates/flow/README.md
Normal file
57
src/crewai/cli/templates/flow/README.md
Normal file
@@ -0,0 +1,57 @@
|
||||
# {{crew_name}} Crew
|
||||
|
||||
Welcome to the {{crew_name}} Crew project, powered by [crewAI](https://crewai.com). This template is designed to help you set up a multi-agent AI system with ease, leveraging the powerful and flexible framework provided by crewAI. Our goal is to enable your agents to collaborate effectively on complex tasks, maximizing their collective intelligence and capabilities.
|
||||
|
||||
## Installation
|
||||
|
||||
Ensure you have Python >=3.10 <=3.13 installed on your system. This project uses [Poetry](https://python-poetry.org/) for dependency management and package handling, offering a seamless setup and execution experience.
|
||||
|
||||
First, if you haven't already, install Poetry:
|
||||
|
||||
```bash
|
||||
pip install poetry
|
||||
```
|
||||
|
||||
Next, navigate to your project directory and install the dependencies:
|
||||
|
||||
1. First lock the dependencies and then install them:
|
||||
|
||||
```bash
|
||||
crewai install
|
||||
```
|
||||
|
||||
### Customizing
|
||||
|
||||
**Add your `OPENAI_API_KEY` into the `.env` file**
|
||||
|
||||
- Modify `src/{{folder_name}}/config/agents.yaml` to define your agents
|
||||
- Modify `src/{{folder_name}}/config/tasks.yaml` to define your tasks
|
||||
- Modify `src/{{folder_name}}/crew.py` to add your own logic, tools and specific args
|
||||
- Modify `src/{{folder_name}}/main.py` to add custom inputs for your agents and tasks
|
||||
|
||||
## Running the Project
|
||||
|
||||
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
|
||||
|
||||
```bash
|
||||
crewai run
|
||||
```
|
||||
|
||||
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration.
|
||||
|
||||
This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder.
|
||||
|
||||
## Understanding Your Crew
|
||||
|
||||
The {{name}} Crew is composed of multiple AI agents, each with unique roles, goals, and tools. These agents collaborate on a series of tasks, defined in `config/tasks.yaml`, leveraging their collective skills to achieve complex objectives. The `config/agents.yaml` file outlines the capabilities and configurations of each agent in your crew.
|
||||
|
||||
## Support
|
||||
|
||||
For support, questions, or feedback regarding the {{crew_name}} Crew or crewAI.
|
||||
|
||||
- Visit our [documentation](https://docs.crewai.com)
|
||||
- Reach out to us through our [GitHub repository](https://github.com/joaomdmoura/crewai)
|
||||
- [Join our Discord](https://discord.com/invite/X4JWnZnxPb)
|
||||
- [Chat with our docs](https://chatg.pt/DWjSBZn)
|
||||
|
||||
Let's create wonders together with the power and simplicity of crewAI.
|
||||
0
src/crewai/cli/templates/flow/__init__.py
Normal file
0
src/crewai/cli/templates/flow/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
poem_writer:
|
||||
role: >
|
||||
CrewAI Poem Writer
|
||||
goal: >
|
||||
Generate a funny, light heartedpoem about how CrewAI
|
||||
is awesome with a sentence count of {sentence_count}
|
||||
backstory: >
|
||||
You're a creative poet with a talent for capturing the essence of any topic
|
||||
in a beautiful and engaging way. Known for your ability to craft poems that
|
||||
resonate with readers, you bring a unique perspective and artistic flair to
|
||||
every piece you write.
|
||||
@@ -0,0 +1,7 @@
|
||||
write_poem:
|
||||
description: >
|
||||
Write a poem about how CrewAI is awesome.
|
||||
Ensure the poem is engaging and adheres to the specified sentence count of {sentence_count}.
|
||||
expected_output: >
|
||||
A beautifully crafted poem about CrewAI, with exactly {sentence_count} sentences.
|
||||
agent: poem_writer
|
||||
31
src/crewai/cli/templates/flow/crews/poem_crew/poem_crew.py
Normal file
31
src/crewai/cli/templates/flow/crews/poem_crew/poem_crew.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from crewai import Agent, Crew, Process, Task
|
||||
from crewai.project import CrewBase, agent, crew, task
|
||||
|
||||
@CrewBase
|
||||
class PoemCrew():
|
||||
"""Poem Crew"""
|
||||
|
||||
agents_config = 'config/agents.yaml'
|
||||
tasks_config = 'config/tasks.yaml'
|
||||
|
||||
@agent
|
||||
def poem_writer(self) -> Agent:
|
||||
return Agent(
|
||||
config=self.agents_config['poem_writer'],
|
||||
)
|
||||
|
||||
@task
|
||||
def write_poem(self) -> Task:
|
||||
return Task(
|
||||
config=self.tasks_config['write_poem'],
|
||||
)
|
||||
|
||||
@crew
|
||||
def crew(self) -> Crew:
|
||||
"""Creates the Research Crew"""
|
||||
return Crew(
|
||||
agents=self.agents, # Automatically created by the @agent decorator
|
||||
tasks=self.tasks, # Automatically created by the @task decorator
|
||||
process=Process.sequential,
|
||||
verbose=True,
|
||||
)
|
||||
55
src/crewai/cli/templates/flow/main.py
Normal file
55
src/crewai/cli/templates/flow/main.py
Normal file
@@ -0,0 +1,55 @@
|
||||
#!/usr/bin/env python
|
||||
import asyncio
|
||||
from random import randint
|
||||
|
||||
from pydantic import BaseModel
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from .crews.poem_crew.poem_crew import PoemCrew
|
||||
|
||||
class PoemState(BaseModel):
|
||||
sentence_count: int = 1
|
||||
poem: str = ""
|
||||
|
||||
class PoemFlow(Flow[PoemState]):
|
||||
initial_state = PoemState
|
||||
|
||||
@start()
|
||||
def generate_sentence_count(self):
|
||||
print("Generating sentence count")
|
||||
# Generate a number between 1 and 5
|
||||
self.state.sentence_count = randint(1, 5)
|
||||
|
||||
@listen(generate_sentence_count)
|
||||
def generate_poem(self):
|
||||
print("Generating poem")
|
||||
print(f"State before poem: {self.state}")
|
||||
poem_crew = PoemCrew().crew()
|
||||
result = poem_crew.kickoff(inputs={"sentence_count": self.state.sentence_count})
|
||||
|
||||
print("Poem generated", result.raw)
|
||||
self.state.poem = result.raw
|
||||
|
||||
print(f"State after generate_poem: {self.state}")
|
||||
|
||||
@listen(generate_poem)
|
||||
def save_poem(self):
|
||||
print("Saving poem")
|
||||
print(f"State before save_poem: {self.state}")
|
||||
with open("poem.txt", "w") as f:
|
||||
f.write(self.state.poem)
|
||||
print(f"State after save_poem: {self.state}")
|
||||
|
||||
async def run():
|
||||
"""
|
||||
Run the flow.
|
||||
"""
|
||||
poem_flow = PoemFlow()
|
||||
await poem_flow.kickoff()
|
||||
|
||||
|
||||
def main():
|
||||
asyncio.run(run())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
18
src/crewai/cli/templates/flow/pyproject.toml
Normal file
18
src/crewai/cli/templates/flow/pyproject.toml
Normal file
@@ -0,0 +1,18 @@
|
||||
[tool.poetry]
|
||||
name = "{{folder_name}}"
|
||||
version = "0.1.0"
|
||||
description = "{{name}} using crewAI"
|
||||
authors = ["Your Name <you@example.com>"]
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = ">=3.10,<=3.13"
|
||||
crewai = { extras = ["tools"], version = ">=0.55.2,<1.0.0" }
|
||||
asyncio = "*"
|
||||
|
||||
[tool.poetry.scripts]
|
||||
{{folder_name}} = "{{folder_name}}.main:main"
|
||||
run_crew = "{{folder_name}}.main:main"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
0
src/crewai/cli/templates/flow/tools/__init__.py
Normal file
0
src/crewai/cli/templates/flow/tools/__init__.py
Normal file
12
src/crewai/cli/templates/flow/tools/custom_tool.py
Normal file
12
src/crewai/cli/templates/flow/tools/custom_tool.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from crewai_tools import BaseTool
|
||||
|
||||
|
||||
class MyCustomTool(BaseTool):
|
||||
name: str = "Name of my tool"
|
||||
description: str = (
|
||||
"Clear description for what this tool is useful for, you agent will need this information to use it."
|
||||
)
|
||||
|
||||
def _run(self, argument: str) -> str:
|
||||
# Implementation goes here
|
||||
return "this is an example of a tool output, ignore it and move along."
|
||||
@@ -41,6 +41,14 @@ class CrewOutput(BaseModel):
|
||||
output_dict.update(self.pydantic.model_dump())
|
||||
return output_dict
|
||||
|
||||
def __getitem__(self, key):
|
||||
if self.pydantic and hasattr(self.pydantic, key):
|
||||
return getattr(self.pydantic, key)
|
||||
elif self.json_dict and key in self.json_dict:
|
||||
return self.json_dict[key]
|
||||
else:
|
||||
raise KeyError(f"Key '{key}' not found in CrewOutput.")
|
||||
|
||||
def __str__(self):
|
||||
if self.pydantic:
|
||||
return str(self.pydantic)
|
||||
|
||||
1
src/crewai/flow/decorators.py
Normal file
1
src/crewai/flow/decorators.py
Normal file
@@ -0,0 +1 @@
|
||||
# TODO:
|
||||
13
src/crewai/flow/examples/context.py
Normal file
13
src/crewai/flow/examples/context.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from crewai.flows import Flow, end_job, start_job # type: ignore
|
||||
|
||||
|
||||
class SimpleFlow(Flow):
|
||||
|
||||
@start_job()
|
||||
async def research(self, topic: str) -> str:
|
||||
print(f"Researching {topic}...")
|
||||
return f"Full report on {topic}..."
|
||||
|
||||
@end_job("research")
|
||||
async def write_post(self, report: str) -> str:
|
||||
return f"Post written: {report}"
|
||||
17
src/crewai/flow/examples/longer.py
Normal file
17
src/crewai/flow/examples/longer.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from crewai.flows import Flow, end_job, job, start_job # type: ignore
|
||||
|
||||
|
||||
class LongerFlow(Flow):
|
||||
|
||||
@start_job()
|
||||
async def research(self, topic: str) -> str:
|
||||
print(f"Researching {topic}...")
|
||||
return f"Full report on {topic}..."
|
||||
|
||||
@job("research")
|
||||
async def edit_report(self, report: str) -> str:
|
||||
return f"Edited report: {report}"
|
||||
|
||||
@end_job("edit_report")
|
||||
async def write_post(self, report: str) -> str:
|
||||
return f"Post written: {report}"
|
||||
22
src/crewai/flow/examples/router.py
Normal file
22
src/crewai/flow/examples/router.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from typing import Tuple
|
||||
|
||||
from crewai.flows import Flow, end_job, router, start_job # type: ignore
|
||||
|
||||
|
||||
class RouterFlow(Flow):
|
||||
|
||||
@start_job()
|
||||
@router()
|
||||
async def classify_email(self, report: str) -> Tuple[str, str]:
|
||||
if "urgent" in report:
|
||||
return "urgent", report
|
||||
|
||||
return "normal", report
|
||||
|
||||
@end_job("urgent")
|
||||
async def write_urgent_email(self, report: str) -> str:
|
||||
return f"Urgent Email Response: {report}"
|
||||
|
||||
@end_job("normal")
|
||||
async def write_normal_email(self, report: str) -> str:
|
||||
return f"Normal Email Response: {report}"
|
||||
13
src/crewai/flow/examples/simple.py
Normal file
13
src/crewai/flow/examples/simple.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from crewai.flows import Flow, end_job, start_job # type: ignore
|
||||
|
||||
|
||||
class SimpleFlow(Flow):
|
||||
|
||||
@start_job()
|
||||
async def research(self, topic: str) -> str:
|
||||
print(f"Researching {topic}...")
|
||||
return f"Full report on {topic}..."
|
||||
|
||||
@end_job("research")
|
||||
async def write_post(self, report: str) -> str:
|
||||
return f"Post written: {report}"
|
||||
19
src/crewai/flow/examples/simple_with_crew.py
Normal file
19
src/crewai/flow/examples/simple_with_crew.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from crewai.flows import Flow, end_job, job, start_job # type: ignore
|
||||
|
||||
|
||||
class SimpleFlow(Flow):
|
||||
|
||||
@start_job()
|
||||
async def research_crew(self, topic: str) -> str:
|
||||
result = research_crew.kickoff(inputs={topic: topic})
|
||||
return result.raw
|
||||
|
||||
@job("research_crew")
|
||||
async def create_x_post(self, research: str) -> str:
|
||||
result = x_post_crew.kickoff(inputs={research: research})
|
||||
return result.raw
|
||||
|
||||
@end_job("research")
|
||||
async def post_to_x(self, post: str) -> None:
|
||||
# TODO: Post to X
|
||||
return None
|
||||
234
src/crewai/flow/flow.py
Normal file
234
src/crewai/flow/flow.py
Normal file
@@ -0,0 +1,234 @@
|
||||
import asyncio
|
||||
import inspect
|
||||
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
# TODO: Allow people to pass results from one method to another and not just state
|
||||
|
||||
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
|
||||
|
||||
|
||||
def start():
|
||||
def decorator(func):
|
||||
print(f"[start decorator] Decorating start method: {func.__name__}")
|
||||
func.__is_start_method__ = True
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def listen(condition):
|
||||
def decorator(func):
|
||||
print(
|
||||
f"[listen decorator] Decorating listener: {func.__name__} with condition: {condition}"
|
||||
)
|
||||
if isinstance(condition, str):
|
||||
func.__trigger_methods__ = [condition]
|
||||
func.__condition_type__ = "OR"
|
||||
print(
|
||||
f"[listen decorator] Set __trigger_methods__ for {func.__name__}: [{condition}] with mode: OR"
|
||||
)
|
||||
elif (
|
||||
isinstance(condition, dict)
|
||||
and "type" in condition
|
||||
and "methods" in condition
|
||||
):
|
||||
func.__trigger_methods__ = condition["methods"]
|
||||
func.__condition_type__ = condition["type"]
|
||||
print(
|
||||
f"[listen decorator] Set __trigger_methods__ for {func.__name__}: {func.__trigger_methods__} with mode: {func.__condition_type__}"
|
||||
)
|
||||
elif callable(condition) and hasattr(condition, "__name__"):
|
||||
func.__trigger_methods__ = [condition.__name__]
|
||||
func.__condition_type__ = "OR"
|
||||
print(
|
||||
f"[listen decorator] Set __trigger_methods__ for {func.__name__}: [{condition.__name__}] with mode: OR"
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Condition must be a method, string, or a result of or_() or and_()"
|
||||
)
|
||||
return func
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def or_(*conditions):
|
||||
methods = []
|
||||
for condition in conditions:
|
||||
if isinstance(condition, dict) and "methods" in condition:
|
||||
methods.extend(condition["methods"])
|
||||
elif callable(condition) and hasattr(condition, "__name__"):
|
||||
methods.append(condition.__name__)
|
||||
elif isinstance(condition, str):
|
||||
methods.append(condition)
|
||||
else:
|
||||
raise ValueError("Invalid condition in or_()")
|
||||
return {"type": "OR", "methods": methods}
|
||||
|
||||
|
||||
def and_(*conditions):
|
||||
methods = []
|
||||
for condition in conditions:
|
||||
if isinstance(condition, dict) and "methods" in condition:
|
||||
methods.extend(condition["methods"])
|
||||
elif callable(condition) and hasattr(condition, "__name__"):
|
||||
methods.append(condition.__name__)
|
||||
elif isinstance(condition, str):
|
||||
methods.append(condition)
|
||||
else:
|
||||
raise ValueError("Invalid condition in and_()")
|
||||
return {"type": "AND", "methods": methods}
|
||||
|
||||
|
||||
class FlowMeta(type):
|
||||
def __new__(mcs, name, bases, dct):
|
||||
cls = super().__new__(mcs, name, bases, dct)
|
||||
|
||||
start_methods = []
|
||||
listeners = {}
|
||||
|
||||
print(f"[FlowMeta] Processing class: {name}")
|
||||
for attr_name, attr_value in dct.items():
|
||||
print(f"[FlowMeta] Checking attribute: {attr_name}")
|
||||
if hasattr(attr_value, "__is_start_method__"):
|
||||
print(f"[FlowMeta] Found start method: {attr_name}")
|
||||
start_methods.append(attr_name)
|
||||
if hasattr(attr_value, "__trigger_methods__"):
|
||||
methods = attr_value.__trigger_methods__
|
||||
condition_type = getattr(attr_value, "__condition_type__", "OR")
|
||||
print(f"[FlowMeta] Conditions for {attr_name}:", methods)
|
||||
listeners[attr_name] = (condition_type, methods)
|
||||
|
||||
setattr(cls, "_start_methods", start_methods)
|
||||
setattr(cls, "_listeners", listeners)
|
||||
|
||||
print("[FlowMeta] ALL LISTENERS:", listeners)
|
||||
print("[FlowMeta] START METHODS:", start_methods)
|
||||
|
||||
return cls
|
||||
|
||||
|
||||
class Flow(Generic[T], metaclass=FlowMeta):
|
||||
_start_methods: List[str] = []
|
||||
_listeners: Dict[str, tuple[str, List[str]]] = {}
|
||||
initial_state: Union[Type[T], T, None] = None
|
||||
|
||||
def __class_getitem__(cls, item):
|
||||
print(f"[Flow.__class_getitem__] Getting initial state type: {item}")
|
||||
class _FlowGeneric(cls):
|
||||
_initial_state_T = item
|
||||
return _FlowGeneric
|
||||
|
||||
def __init__(self):
|
||||
print("[Flow.__init__] Initializing Flow")
|
||||
self._methods: Dict[str, Callable] = {}
|
||||
self._state = self._create_initial_state()
|
||||
self._completed_methods: Set[str] = set()
|
||||
self._pending_and_listeners: Dict[str, Set[str]] = {}
|
||||
|
||||
for method_name in dir(self):
|
||||
if callable(getattr(self, method_name)) and not method_name.startswith(
|
||||
"__"
|
||||
):
|
||||
print(f"[Flow.__init__] Adding method: {method_name}")
|
||||
self._methods[method_name] = getattr(self, method_name)
|
||||
|
||||
print("[Flow.__init__] All methods:", self._methods.keys())
|
||||
print("[Flow.__init__] Listeners:", self._listeners)
|
||||
|
||||
def _create_initial_state(self) -> T:
|
||||
print("[Flow._create_initial_state] Creating initial state")
|
||||
if self.initial_state is None and hasattr(self, "_initial_state_T"):
|
||||
return self._initial_state_T()
|
||||
elif self.initial_state is None:
|
||||
return {} # type: ignore
|
||||
elif isinstance(self.initial_state, type):
|
||||
return self.initial_state()
|
||||
else:
|
||||
return self.initial_state
|
||||
|
||||
@property
|
||||
def state(self) -> T:
|
||||
return self._state
|
||||
|
||||
async def kickoff(self):
|
||||
print("[Flow.kickoff] Starting kickoff")
|
||||
if not self._start_methods:
|
||||
raise ValueError("No start method defined")
|
||||
|
||||
for start_method in self._start_methods:
|
||||
print(f"[Flow.kickoff] Executing start method: {start_method}")
|
||||
result = await self._execute_method(self._methods[start_method])
|
||||
print(
|
||||
f"[Flow.kickoff] Start method {start_method} completed. Executing listeners."
|
||||
)
|
||||
await self._execute_listeners(start_method, result)
|
||||
|
||||
async def _execute_method(self, method: Callable, *args, **kwargs):
|
||||
print(f"[Flow._execute_method] Executing method: {method.__name__}")
|
||||
if inspect.iscoroutinefunction(method):
|
||||
return await method(*args, **kwargs)
|
||||
else:
|
||||
return method(*args, **kwargs)
|
||||
|
||||
async def _execute_listeners(self, trigger_method: str, result: Any):
|
||||
print(
|
||||
f"[Flow._execute_listeners] Executing listeners for trigger method: {trigger_method}"
|
||||
)
|
||||
listener_tasks = []
|
||||
for listener, (condition_type, methods) in self._listeners.items():
|
||||
print(
|
||||
f"[Flow._execute_listeners] Checking listener: {listener}, condition: {condition_type}, methods: {methods}"
|
||||
)
|
||||
if condition_type == "OR":
|
||||
if trigger_method in methods:
|
||||
print(
|
||||
f"[Flow._execute_listeners] TRIGGERING METHOD: {listener} due to trigger: {trigger_method}"
|
||||
)
|
||||
listener_tasks.append(
|
||||
self._execute_single_listener(listener, result)
|
||||
)
|
||||
elif condition_type == "AND":
|
||||
if listener not in self._pending_and_listeners:
|
||||
self._pending_and_listeners[listener] = set()
|
||||
self._pending_and_listeners[listener].add(trigger_method)
|
||||
if set(methods) == self._pending_and_listeners[listener]:
|
||||
print(
|
||||
f"[Flow._execute_listeners] All conditions met for listener: {listener}. Executing."
|
||||
)
|
||||
listener_tasks.append(
|
||||
self._execute_single_listener(listener, result)
|
||||
)
|
||||
del self._pending_and_listeners[listener]
|
||||
|
||||
# Run all listener tasks concurrently and wait for them to complete
|
||||
print(
|
||||
f"[Flow._execute_listeners] Executing {len(listener_tasks)} listener tasks"
|
||||
)
|
||||
await asyncio.gather(*listener_tasks)
|
||||
|
||||
async def _execute_single_listener(self, listener: str, result: Any):
|
||||
print(f"[Flow._execute_single_listener] Executing listener: {listener}")
|
||||
try:
|
||||
method = self._methods[listener]
|
||||
sig = inspect.signature(method)
|
||||
if len(sig.parameters) > 1: # More than just 'self'
|
||||
print(
|
||||
f"[Flow._execute_single_listener] Executing {listener} with result"
|
||||
)
|
||||
listener_result = await self._execute_method(method, result)
|
||||
else:
|
||||
print(
|
||||
f"[Flow._execute_single_listener] Executing {listener} without result"
|
||||
)
|
||||
listener_result = await self._execute_method(method)
|
||||
print(
|
||||
f"[Flow._execute_single_listener] {listener} completed, executing its listeners"
|
||||
)
|
||||
await self._execute_listeners(listener, listener_result)
|
||||
except Exception as e:
|
||||
print(
|
||||
f"[Flow._execute_single_listener] Error in method {listener}: {str(e)}"
|
||||
)
|
||||
46
src/crewai/flow/structured_test_flow.py
Normal file
46
src/crewai/flow/structured_test_flow.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import asyncio
|
||||
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ExampleState(BaseModel):
|
||||
counter: int = 0
|
||||
message: str = ""
|
||||
|
||||
|
||||
class StructuredExampleFlow(Flow[ExampleState]):
|
||||
|
||||
@start()
|
||||
async def start_method(self):
|
||||
print("Starting the structured flow")
|
||||
print(f"State in start_method: {self.state}")
|
||||
self.state.message = "Hello from structured flow"
|
||||
print(f"State after start_method: {self.state}")
|
||||
return "Start result"
|
||||
|
||||
@listen(start_method)
|
||||
async def second_method(self, result):
|
||||
print(f"Second method, received: {result}")
|
||||
print(f"State before increment: {self.state}")
|
||||
self.state.counter += 1
|
||||
self.state.message += " - updated"
|
||||
print(f"State after second_method: {self.state}")
|
||||
return "Second result"
|
||||
|
||||
@listen(start_method)
|
||||
async def third_method(self, result):
|
||||
print(f"Third method, received: {result}")
|
||||
print(f"State before increment: {self.state}")
|
||||
self.state.counter += 1
|
||||
self.state.message += " - updated"
|
||||
print(f"State after third_method: {self.state}")
|
||||
return "Third result"
|
||||
|
||||
|
||||
async def main():
|
||||
flow = StructuredExampleFlow()
|
||||
await flow.kickoff()
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
42
src/crewai/flow/structured_test_flow_and.py
Normal file
42
src/crewai/flow/structured_test_flow_and.py
Normal file
@@ -0,0 +1,42 @@
|
||||
import asyncio
|
||||
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ExampleState(BaseModel):
|
||||
counter: int = 0
|
||||
message: str = ""
|
||||
|
||||
|
||||
class StructuredExampleFlow(Flow):
|
||||
initial_state = ExampleState
|
||||
|
||||
@start()
|
||||
async def start_method(self):
|
||||
print("Starting the structured flow")
|
||||
print(f"State in start_method: {self.state}")
|
||||
self.state.message = "Hello from structured flow"
|
||||
print(f"State after start_method: {self.state}")
|
||||
return "Start result"
|
||||
|
||||
@listen(start_method)
|
||||
async def second_method(self):
|
||||
print(f"State before increment: {self.state}")
|
||||
self.state.counter += 1
|
||||
self.state.message += " - updated"
|
||||
print(f"State after second_method: {self.state}")
|
||||
return "Second result"
|
||||
|
||||
@listen(start_method and second_method)
|
||||
async def logger(self):
|
||||
print("AND METHOD RUNNING")
|
||||
print("CURRENT STATE FROM OR: ", self.state)
|
||||
|
||||
|
||||
async def main():
|
||||
flow = StructuredExampleFlow()
|
||||
await flow.kickoff()
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
47
src/crewai/flow/structured_test_flow_or.py
Normal file
47
src/crewai/flow/structured_test_flow_or.py
Normal file
@@ -0,0 +1,47 @@
|
||||
import asyncio
|
||||
|
||||
from crewai.flow.flow import Flow, and_, listen, or_, start
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ExampleState(BaseModel):
|
||||
counter: int = 0
|
||||
message: str = ""
|
||||
|
||||
|
||||
class StructuredExampleFlow(Flow[ExampleState]):
|
||||
initial_state = ExampleState
|
||||
|
||||
@start()
|
||||
async def start_method(self):
|
||||
print("Starting the structured flow")
|
||||
print(f"State in start_method: {self.state}")
|
||||
self.state.message = "Hello from structured flow"
|
||||
print(f"State after start_method: {self.state}")
|
||||
return "Start result"
|
||||
|
||||
@listen(start_method)
|
||||
async def second_method(self):
|
||||
print(f"State before increment: {self.state}")
|
||||
self.state.counter += 1
|
||||
self.state.message += " - updated"
|
||||
print(f"State after second_method: {self.state}")
|
||||
return "Second result"
|
||||
|
||||
@listen(or_(start_method, second_method))
|
||||
async def logger(self):
|
||||
print("LOGGER METHOD RUNNING")
|
||||
print("CURRENT STATE FROM LOGGER: ", self.state)
|
||||
|
||||
@listen(and_(start_method, second_method))
|
||||
async def and_logger(self):
|
||||
print("AND LOGGER METHOD RUNNING")
|
||||
print("CURRENT STATE FROM AND LOGGER: ", self.state)
|
||||
|
||||
|
||||
async def main():
|
||||
flow = StructuredExampleFlow()
|
||||
await flow.kickoff()
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
33
src/crewai/flow/unstructured_test_flow.py
Normal file
33
src/crewai/flow/unstructured_test_flow.py
Normal file
@@ -0,0 +1,33 @@
|
||||
import asyncio
|
||||
|
||||
from crewai.flow.flow import Flow, listen, start
|
||||
|
||||
|
||||
class FlexibleExampleFlow(Flow):
|
||||
@start()
|
||||
def start_method(self):
|
||||
print("Starting the flexible flow")
|
||||
self.state["counter"] = 1
|
||||
return "Start result"
|
||||
|
||||
@listen(start_method)
|
||||
def second_method(self, result):
|
||||
print(f"Second method, received: {result}")
|
||||
self.state["counter"] += 1
|
||||
self.state["message"] = "Hello from flexible flow"
|
||||
return "Second result"
|
||||
|
||||
@listen(second_method)
|
||||
def third_method(self, result):
|
||||
print(f"Third method, received: {result}")
|
||||
print(f"Final counter value: {self.state["counter"]}")
|
||||
print(f"Final message: {self.state["message"]}")
|
||||
return "Third result"
|
||||
|
||||
|
||||
async def main():
|
||||
flow = FlexibleExampleFlow()
|
||||
await flow.kickoff()
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user