Compare commits

..

58 Commits

Author SHA1 Message Date
Brandon Hancock
25e7bb0adf Fix merge conflict 2024-10-01 14:17:18 -04:00
Brandon Hancock
c6cbd39b6d Merge branch 'main' into flow-visualizer 2024-10-01 14:07:57 -04:00
Brandon Hancock
156409196d Fix crewai version in flows 2024-10-01 11:11:17 -04:00
Brandon Hancock
2e7995eaef update CLI and templates 2024-10-01 11:10:32 -04:00
Brandon Hancock
b22568aa6d Refactor to make crews easier to understand 2024-10-01 09:56:14 -04:00
João Moura
f15d5cbb64 remove unnecessary line 2024-09-30 23:21:50 -07:00
João Moura
c8a5a3e32e preparing new version 2024-09-30 23:13:47 -07:00
Brandon Hancock (bhancock_ai)
32fdd11c93 Flow visualizer (#1375)
* Almost working!

* It fully works but not clean enought

* Working but not clean engouth

* Everything is workign

* WIP. Working on adding and & or to flows. In the middle of setting up template for flow as well

* template working

* Everything is working

* More changes and todos

* Add more support for @start

* Router working now

* minor tweak to

* minor tweak to conditions and event handling

* Update logs

* Too trigger happy with cleanup

* Added in Thiago fix

* Flow passing results again

* Working on docs.

* made more progress updates on docs

* Finished talking about controlling flows

* add flow output

* fixed flow output section

* add crews to flows section is looking good now

* more flow doc changes

* Update docs and add more examples

* drop visualizer

* save visualizer

* pyvis is beginning to work

* pyvis working

* it is working

* regular methods and triggers working. Need to work on router next.

* properly identifying router and router children nodes. Need to fix color

* children router working. Need to support loops

* curving cycles but need to add curve conditionals

* everythin is showing up properly need to fix curves

* all working. needs to be cleaned up

* adjust padding

* drop lib

* clean up prior to PR

* incorporate joao feedback

* final tweaks for joao
2024-09-30 20:52:56 -03:00
Brandon Hancock
09bc68078c final tweaks for joao 2024-09-30 19:46:47 -04:00
Brandon Hancock
aa8565149d Merge branch 'main' into flow-visualizer 2024-09-30 19:45:28 -04:00
Brandon Hancock
4e3f393c89 incorporate joao feedback 2024-09-30 19:33:06 -04:00
Brandon Hancock
edc33a1cec clean up prior to PR 2024-09-30 19:08:19 -04:00
Brandon Hancock
6c46326d93 drop lib 2024-09-30 19:02:30 -04:00
Brandon Hancock
40688451ad adjust padding 2024-09-30 18:57:03 -04:00
Brandon Hancock
1a0f96ae03 all working. needs to be cleaned up 2024-09-30 16:00:24 -04:00
João Moura
7f830b4f43 fixing test 2024-09-30 12:01:50 -07:00
Brandon Hancock
b927989c4d everythin is showing up properly need to fix curves 2024-09-30 13:28:32 -04:00
Brandon Hancock
e1c01ae907 curving cycles but need to add curve conditionals 2024-09-30 12:32:47 -04:00
Brandon Hancock
e07b245c83 children router working. Need to support loops 2024-09-30 11:34:57 -04:00
Brandon Hancock
66e7fc5ce3 properly identifying router and router children nodes. Need to fix color 2024-09-30 11:05:46 -04:00
Shreyan Sood
d6c57402cf Update Pipeline.md, fixed typo "=inputs" was repeated. (#1363) 2024-09-28 01:27:43 -03:00
Rip&Tear
42bea00184 flow template copy fix (#1364) 2024-09-28 01:27:17 -03:00
João Moura
5a6b0ff398 temporary dropping excamples 2024-09-27 21:15:52 -03:00
João Moura
1b57bc0c75 preparing for new verison 2024-09-27 20:28:25 -03:00
João Moura
96544009f5 preparing new verion 2024-09-27 20:24:37 -03:00
João Moura
44c8765add fixing tasks order 2024-09-27 20:21:46 -03:00
Brandon Hancock
5d645cd89f regular methods and triggers working. Need to work on router next. 2024-09-27 16:01:07 -04:00
Brandon Hancock (bhancock_ai)
bc31019b67 Brandon/cre 19 workflows (#1347)
Flows
2024-09-27 12:11:17 -03:00
Brandon Hancock
16fabdd4b5 it is working 2024-09-27 00:44:48 -04:00
Brandon Hancock
b0c9cffb88 pyvis working 2024-09-27 00:20:07 -04:00
Brandon Hancock
b7b2cce6c5 pyvis is beginning to work 2024-09-26 22:56:38 -04:00
Brandon Hancock
4dd13e75c9 save visualizer 2024-09-26 11:49:06 -04:00
Brandon Hancock
d13716d29c drop visualizer 2024-09-26 11:47:49 -04:00
Brandon Hancock
128872a482 Update docs and add more examples 2024-09-24 15:41:11 -04:00
Brandon Hancock
bfaba72da2 more flow doc changes 2024-09-23 15:21:54 -04:00
Brandon Hancock
6ba6ac7fcc add crews to flows section is looking good now 2024-09-23 14:58:16 -04:00
Brandon Hancock
50055a814c fixed flow output section 2024-09-23 14:47:11 -04:00
Brandon Hancock
3939d432aa add flow output 2024-09-20 15:27:55 -04:00
Brandon Hancock
734018254d Finished talking about controlling flows 2024-09-20 11:24:53 -04:00
Brandon Hancock
4e68015574 made more progress updates on docs 2024-09-20 11:13:30 -04:00
Brandon Hancock
d63750705c Working on docs. 2024-09-19 16:40:31 -04:00
Brandon Hancock
cbff4bb967 Flow passing results again 2024-09-19 15:56:16 -04:00
Brandon Hancock
a4fad7cafd Added in Thiago fix 2024-09-19 13:32:04 -04:00
Brandon Hancock
f16f7aebdf Too trigger happy with cleanup 2024-09-19 11:48:42 -04:00
Brandon Hancock
92dc95156b Update logs 2024-09-19 11:35:51 -04:00
Brandon Hancock
aa6fa13262 minor tweak to conditions and event handling 2024-09-18 15:57:05 -04:00
Brandon Hancock
abaf8c4d24 minor tweak to 2024-09-18 15:56:54 -04:00
Brandon Hancock
00f355bf88 Merge branch 'main' into brandon/cre-19-workflows 2024-09-16 16:26:34 -04:00
Brandon Hancock
3e48a402ee Router working now 2024-09-16 11:08:30 -04:00
Brandon Hancock
86c1f85edc Add more support for @start 2024-09-16 10:44:01 -04:00
Brandon Hancock
ba8fbed30a More changes and todos 2024-09-13 12:47:43 -04:00
Brandon Hancock
abfd121f99 Everything is working 2024-09-12 16:21:12 -04:00
Brandon Hancock
72f0b600b8 template working 2024-09-12 11:33:46 -04:00
Brandon Hancock
a028566bd6 WIP. Working on adding and & or to flows. In the middle of setting up template for flow as well 2024-09-11 16:26:02 -04:00
Brandon Hancock
3a266d6b40 Everything is workign 2024-09-11 13:01:36 -04:00
Brandon Hancock
a4a14df72e Working but not clean engouth 2024-09-11 11:59:12 -04:00
Brandon Hancock
8664f3912b It fully works but not clean enought 2024-09-11 10:45:24 -04:00
Brandon Hancock
d67c12a5a3 Almost working! 2024-09-11 10:29:54 -04:00
39 changed files with 1942 additions and 429 deletions

3
.gitignore vendored
View File

@@ -2,6 +2,7 @@
.pytest_cache
__pycache__
dist/
lib/
.env
assets/*
.idea
@@ -15,4 +16,4 @@ rc-tests/*
*.pkl
temp/*
.vscode/*
crew_tasks_output.json
crew_tasks_output.json

View File

@@ -248,7 +248,7 @@ main_pipeline = Pipeline(stages=[classification_crew, email_router])
inputs = [{"email": "..."}, {"email": "..."}] # List of email data
main_pipeline.kickoff(inputs=inputs=inputs)
main_pipeline.kickoff(inputs=inputs)
```
In this example, the router decides between an urgent pipeline and a normal pipeline based on the urgency score of the email. If the urgency score is greater than 7, it routes to the urgent pipeline; otherwise, it uses the normal pipeline. If the input doesn't include an urgency score, it defaults to just the classification crew.
@@ -265,4 +265,4 @@ In this example, the router decides between an urgent pipeline and a normal pipe
The `Pipeline` class includes validation mechanisms to ensure the robustness of the pipeline structure:
- Validates that stages contain only Crew instances or lists of Crew instances.
- Prevents double nesting of stages to maintain a clear structure.
- Prevents double nesting of stages to maintain a clear structure.

View File

@@ -58,6 +58,11 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
LLMs
</a>
</li>
<!-- <li>
<a href="./core-concepts/Flows">
Flows
</a>
</li> -->
<li>
<a href="./core-concepts/Pipeline">
Pipeline
@@ -85,7 +90,7 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
</li>
</ul>
</div>
<div style="width:30%">
<div style="width:25%">
<h2>How-To Guides</h2>
<ul>
<li>
@@ -160,7 +165,7 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
</li>
</ul>
</div>
<div style="width:30%">
<!-- <div style="width:25%">
<h2>Examples</h2>
<ul>
<li>
@@ -198,6 +203,26 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
Landing Page Generator
</a>
</li>
<li>
<a target='_blank' href="https://github.com/crewAIInc/crewAI-examples/tree/main/email_auto_responder_flow">
Email Auto Responder Flow
</a>
</li>
<li>
<a target='_blank' href="https://github.com/crewAIInc/crewAI-examples/tree/main/lead-score-flow">
Lead Score Flow
</a>
</li>
<li>
<a target='_blank' href="https://github.com/crewAIInc/crewAI-examples/tree/main/write_a_book_with_flows">
Write a Book Flow
</a>
</li>
<li>
<a target='_blank' href="https://github.com/crewAIInc/crewAI-examples/tree/main/meeting_assistant_flow">
Meeting Assistant Flow
</a>
</li>
</ul>
</div>
</div> -->
</div>

961
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "crewai"
version = "0.64.0"
version = "0.66.0"
description = "Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By fostering collaborative intelligence, CrewAI empowers agents to work together seamlessly, tackling complex tasks."
authors = ["Joao Moura <joao@crewai.com>"]
readme = "README.md"
@@ -32,6 +32,7 @@ json-repair = "^0.25.2"
auth0-python = "^4.7.1"
poetry = "^1.8.3"
litellm = "^1.44.22"
pyvis = "^0.3.2"
[tool.poetry.extras]
tools = ["crewai-tools"]

View File

@@ -1,12 +1,13 @@
import warnings
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.flow.flow import Flow
from crewai.llm import LLM
from crewai.pipeline import Pipeline
from crewai.process import Process
from crewai.routers import Router
from crewai.task import Task
from crewai.llm import LLM
warnings.filterwarnings(
"ignore",
@@ -15,4 +16,4 @@ warnings.filterwarnings(
module="pydantic.main",
)
__all__ = ["Agent", "Crew", "Process", "Task", "Pipeline", "Router", "LLM"]
__all__ = ["Agent", "Crew", "Process", "Task", "Pipeline", "Router", "LLM", "Flow"]

View File

@@ -70,7 +70,10 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
self.log_error_after = 3
self.have_forced_answer = False
self.name_to_tool_map = {tool.name: tool for tool in self.tools}
self.llm.stop = self.stop
if self.llm.stop:
self.llm.stop = list(set(self.llm.stop + self.stop))
else:
self.llm.stop = self.stop
def invoke(self, inputs: Dict[str, str]) -> Dict[str, Any]:
if "system" in self.prompt:

View File

@@ -4,6 +4,7 @@ import click
import pkg_resources
from crewai.cli.create_crew import create_crew
from crewai.cli.create_flow import create_flow
from crewai.cli.create_pipeline import create_pipeline
from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
@@ -11,12 +12,14 @@ from crewai.memory.storage.kickoff_task_outputs_storage import (
from .authentication.main import AuthenticationCommand
from .deploy.main import DeployCommand
from .tools.main import ToolCommand
from .evaluate_crew import evaluate_crew
from .install_crew import install_crew
from .plot_flow import plot_flow
from .replay_from_task import replay_task_command
from .reset_memories_command import reset_memories_command
from .run_crew import run_crew
from .run_flow import run_flow
from .tools.main import ToolCommand
from .train_crew import train_crew
@@ -26,19 +29,20 @@ def crewai():
@crewai.command()
@click.argument("type", type=click.Choice(["crew", "pipeline"]))
@click.argument("type", type=click.Choice(["crew", "pipeline", "flow"]))
@click.argument("name")
@click.option(
"--router", is_flag=True, help="Create a pipeline with router functionality"
)
def create(type, name, router):
"""Create a new crew or pipeline."""
def create(type, name):
"""Create a new crew, pipeline, or flow."""
if type == "crew":
create_crew(name)
elif type == "pipeline":
create_pipeline(name, router)
create_pipeline(name)
elif type == "flow":
create_flow(name)
else:
click.secho("Error: Invalid type. Must be 'crew' or 'pipeline'.", fg="red")
click.secho(
"Error: Invalid type. Must be 'crew', 'pipeline', or 'flow'.", fg="red"
)
@crewai.command()
@@ -271,5 +275,25 @@ def tool_publish(is_public: bool):
tool_cmd.publish(is_public)
@crewai.group()
def flow():
"""Flow related commands."""
pass
@flow.command(name="run")
def flow_run():
"""Run the Flow."""
click.echo("Running the Flow")
run_flow()
@flow.command(name="plot")
def flow_plot():
"""Plot the Flow."""
click.echo("Plotting the Flow")
plot_flow()
if __name__ == "__main__":
crewai()

View File

@@ -0,0 +1,93 @@
from pathlib import Path
import click
def create_flow(name):
"""Create a new flow."""
folder_name = name.replace(" ", "_").replace("-", "_").lower()
class_name = name.replace("_", " ").replace("-", " ").title().replace(" ", "")
click.secho(f"Creating flow {folder_name}...", fg="green", bold=True)
project_root = Path(folder_name)
if project_root.exists():
click.secho(f"Error: Folder {folder_name} already exists.", fg="red")
return
# Create directory structure
(project_root / "src" / folder_name).mkdir(parents=True)
(project_root / "src" / folder_name / "crews").mkdir(parents=True)
(project_root / "src" / folder_name / "tools").mkdir(parents=True)
(project_root / "tests").mkdir(exist_ok=True)
# Create .env file
with open(project_root / ".env", "w") as file:
file.write("OPENAI_API_KEY=YOUR_API_KEY")
package_dir = Path(__file__).parent
templates_dir = package_dir / "templates" / "flow"
# List of template files to copy
root_template_files = [".gitignore", "pyproject.toml", "README.md"]
src_template_files = ["__init__.py", "main.py"]
tools_template_files = ["tools/__init__.py", "tools/custom_tool.py"]
crew_folders = [
"poem_crew",
]
def process_file(src_file, dst_file):
if src_file.suffix in [".pyc", ".pyo", ".pyd"]:
return
try:
with open(src_file, "r", encoding="utf-8") as file:
content = file.read()
except Exception as e:
click.secho(f"Error processing file {src_file}: {e}", fg="red")
return
content = content.replace("{{name}}", name)
content = content.replace("{{flow_name}}", class_name)
content = content.replace("{{folder_name}}", folder_name)
with open(dst_file, "w") as file:
file.write(content)
# Copy and process root template files
for file_name in root_template_files:
src_file = templates_dir / file_name
dst_file = project_root / file_name
process_file(src_file, dst_file)
# Copy and process src template files
for file_name in src_template_files:
src_file = templates_dir / file_name
dst_file = project_root / "src" / folder_name / file_name
process_file(src_file, dst_file)
# Copy tools files
for file_name in tools_template_files:
src_file = templates_dir / file_name
dst_file = project_root / "src" / folder_name / file_name
process_file(src_file, dst_file)
# Copy crew folders
for crew_folder in crew_folders:
src_crew_folder = templates_dir / "crews" / crew_folder
dst_crew_folder = project_root / "src" / folder_name / "crews" / crew_folder
if src_crew_folder.exists():
for src_file in src_crew_folder.rglob("*"):
if src_file.is_file():
relative_path = src_file.relative_to(src_crew_folder)
dst_file = dst_crew_folder / relative_path
dst_file.parent.mkdir(parents=True, exist_ok=True)
process_file(src_file, dst_file)
else:
click.secho(
f"Warning: Crew folder {crew_folder} not found in template.",
fg="yellow",
)
click.secho(f"Flow {name} created successfully!", fg="green", bold=True)

View File

@@ -0,0 +1,23 @@
import subprocess
import click
def plot_flow() -> None:
"""
Plot the flow by running a command in the Poetry environment.
"""
command = ["poetry", "run", "plot_flow"]
try:
result = subprocess.run(command, capture_output=False, text=True, check=True)
if result.stderr:
click.echo(result.stderr, err=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while plotting the flow: {e}", err=True)
click.echo(e.output, err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)

View File

@@ -0,0 +1,23 @@
import subprocess
import click
def run_flow() -> None:
"""
Run the flow by running a command in the Poetry environment.
"""
command = ["poetry", "run", "run_flow"]
try:
result = subprocess.run(command, capture_output=False, text=True, check=True)
if result.stderr:
click.echo(result.stderr, err=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while running the flow: {e}", err=True)
click.echo(e.output, err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)

View File

@@ -6,7 +6,7 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = ">=0.64.0,<1.0.0" }
crewai = { extras = ["tools"], version = ">=0.66.0,<1.0.0" }
[tool.poetry.scripts]

View File

@@ -0,0 +1,3 @@
.env
__pycache__/
lib/

View File

@@ -0,0 +1,57 @@
# {{crew_name}} Crew
Welcome to the {{crew_name}} Crew project, powered by [crewAI](https://crewai.com). This template is designed to help you set up a multi-agent AI system with ease, leveraging the powerful and flexible framework provided by crewAI. Our goal is to enable your agents to collaborate effectively on complex tasks, maximizing their collective intelligence and capabilities.
## Installation
Ensure you have Python >=3.10 <=3.13 installed on your system. This project uses [Poetry](https://python-poetry.org/) for dependency management and package handling, offering a seamless setup and execution experience.
First, if you haven't already, install Poetry:
```bash
pip install poetry
```
Next, navigate to your project directory and install the dependencies:
1. First lock the dependencies and then install them:
```bash
crewai install
```
### Customizing
**Add your `OPENAI_API_KEY` into the `.env` file**
- Modify `src/{{folder_name}}/config/agents.yaml` to define your agents
- Modify `src/{{folder_name}}/config/tasks.yaml` to define your tasks
- Modify `src/{{folder_name}}/crew.py` to add your own logic, tools and specific args
- Modify `src/{{folder_name}}/main.py` to add custom inputs for your agents and tasks
## Running the Project
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
```bash
crewai run
```
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration.
This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder.
## Understanding Your Crew
The {{name}} Crew is composed of multiple AI agents, each with unique roles, goals, and tools. These agents collaborate on a series of tasks, defined in `config/tasks.yaml`, leveraging their collective skills to achieve complex objectives. The `config/agents.yaml` file outlines the capabilities and configurations of each agent in your crew.
## Support
For support, questions, or feedback regarding the {{crew_name}} Crew or crewAI.
- Visit our [documentation](https://docs.crewai.com)
- Reach out to us through our [GitHub repository](https://github.com/joaomdmoura/crewai)
- [Join our Discord](https://discord.com/invite/X4JWnZnxPb)
- [Chat with our docs](https://chatg.pt/DWjSBZn)
Let's create wonders together with the power and simplicity of crewAI.

View File

@@ -0,0 +1,11 @@
poem_writer:
role: >
CrewAI Poem Writer
goal: >
Generate a funny, light heartedpoem about how CrewAI
is awesome with a sentence count of {sentence_count}
backstory: >
You're a creative poet with a talent for capturing the essence of any topic
in a beautiful and engaging way. Known for your ability to craft poems that
resonate with readers, you bring a unique perspective and artistic flair to
every piece you write.

View File

@@ -0,0 +1,7 @@
write_poem:
description: >
Write a poem about how CrewAI is awesome.
Ensure the poem is engaging and adheres to the specified sentence count of {sentence_count}.
expected_output: >
A beautifully crafted poem about CrewAI, with exactly {sentence_count} sentences.
agent: poem_writer

View File

@@ -0,0 +1,31 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
@CrewBase
class PoemCrew():
"""Poem Crew"""
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
@agent
def poem_writer(self) -> Agent:
return Agent(
config=self.agents_config['poem_writer'],
)
@task
def write_poem(self) -> Task:
return Task(
config=self.tasks_config['write_poem'],
)
@crew
def crew(self) -> Crew:
"""Creates the Research Crew"""
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
)

View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python
import asyncio
from random import randint
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
from .crews.poem_crew.poem_crew import PoemCrew
class PoemState(BaseModel):
sentence_count: int = 1
poem: str = ""
class PoemFlow(Flow[PoemState]):
@start()
def generate_sentence_count(self):
print("Generating sentence count")
# Generate a number between 1 and 5
self.state.sentence_count = randint(1, 5)
@listen(generate_sentence_count)
def generate_poem(self):
print("Generating poem")
print(f"State before poem: {self.state}")
result = PoemCrew().crew().kickoff(inputs={"sentence_count": self.state.sentence_count})
print("Poem generated", result.raw)
self.state.poem = result.raw
print(f"State after generate_poem: {self.state}")
@listen(generate_poem)
def save_poem(self):
print("Saving poem")
print(f"State before save_poem: {self.state}")
with open("poem.txt", "w") as f:
f.write(self.state.poem)
print(f"State after save_poem: {self.state}")
async def run_flow():
"""
Run the flow.
"""
poem_flow = PoemFlow()
await poem_flow.kickoff()
async def plot_flow():
"""
Plot the flow.
"""
poem_flow = PoemFlow()
poem_flow.plot()
def main():
asyncio.run(run_flow())
def plot():
asyncio.run(plot_flow())
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,19 @@
[tool.poetry]
name = "{{folder_name}}"
version = "0.1.0"
description = "{{name}} using crewAI"
authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = ">=0.66.0,<1.0.0" }
asyncio = "*"
[tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:main"
run_flow = "{{folder_name}}.main:main"
plot_flow = "{{folder_name}}.main:plot"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

View File

@@ -0,0 +1,12 @@
from crewai_tools import BaseTool
class MyCustomTool(BaseTool):
name: str = "Name of my tool"
description: str = (
"Clear description for what this tool is useful for, you agent will need this information to use it."
)
def _run(self, argument: str) -> str:
# Implementation goes here
return "this is an example of a tool output, ignore it and move along."

View File

@@ -6,7 +6,7 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = ">=0.64.0,<1.0.0" }
crewai = { extras = ["tools"], version = ">=0.66.0,<1.0.0" }
asyncio = "*"
[tool.poetry.scripts]

View File

@@ -6,7 +6,7 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = ">=0.64.0,<1.0.0" }
crewai = { extras = ["tools"], version = ">=0.66.0,<1.0.0" }
[tool.poetry.scripts]

View File

@@ -41,6 +41,14 @@ class CrewOutput(BaseModel):
output_dict.update(self.pydantic.model_dump())
return output_dict
def __getitem__(self, key):
if self.pydantic and hasattr(self.pydantic, key):
return getattr(self.pydantic, key)
elif self.json_dict and key in self.json_dict:
return self.json_dict[key]
else:
raise KeyError(f"Key '{key}' not found in CrewOutput.")
def __str__(self):
if self.pydantic:
return str(self.pydantic)

View File

@@ -0,0 +1,3 @@
from crewai.flow.flow import Flow
__all__ = ["Flow"]

View File

@@ -0,0 +1,93 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>{{ title }}</title>
<script
src="https://cdnjs.cloudflare.com/ajax/libs/vis-network/9.1.2/dist/vis-network.min.js"
integrity="sha512-LnvoEWDFrqGHlHmDD2101OrLcbsfkrzoSpvtSQtxK3RMnRV0eOkhhBN2dXHKRrUU8p2DGRTk35n4O8nWSVe1mQ=="
crossorigin="anonymous"
referrerpolicy="no-referrer"
></script>
<link
rel="stylesheet"
href="https://cdnjs.cloudflare.com/ajax/libs/vis-network/9.1.2/dist/dist/vis-network.min.css"
integrity="sha512-WgxfT5LWjfszlPHXRmBWHkV2eceiWTOBvrKCNbdgDYTHrT2AeLCGbF4sZlZw3UMN3WtL0tGUoIAKsu8mllg/XA=="
crossorigin="anonymous"
referrerpolicy="no-referrer"
/>
<style type="text/css">
body {
font-family: verdana;
margin: 0;
padding: 0;
}
.container {
display: flex;
flex-direction: column;
height: 100vh;
}
#mynetwork {
flex-grow: 1;
width: 100%;
height: 750px;
background-color: #ffffff;
}
.card {
border: none;
}
.legend-container {
display: flex;
align-items: center;
justify-content: center;
padding: 10px;
background-color: #f8f9fa;
position: fixed; /* Make the legend fixed */
bottom: 0; /* Position it at the bottom */
width: 100%; /* Make it span the full width */
}
.legend-item {
display: flex;
align-items: center;
margin-right: 20px;
}
.legend-color-box {
width: 20px;
height: 20px;
margin-right: 5px;
}
.logo {
height: 50px;
margin-right: 20px;
}
.legend-dashed {
border-bottom: 2px dashed #666666;
width: 20px;
height: 0;
margin-right: 5px;
}
.legend-solid {
border-bottom: 2px solid #666666;
width: 20px;
height: 0;
margin-right: 5px;
}
</style>
</head>
<body>
<div class="container">
<div class="card" style="width: 100%">
<div id="mynetwork" class="card-body"></div>
</div>
<div class="legend-container">
<img
src="data:image/svg+xml;base64,{{ logo_svg_base64 }}"
alt="CrewAI logo"
class="logo"
/>
<!-- LEGEND_ITEMS_PLACEHOLDER -->
</div>
</div>
{{ network_content }}
</body>
</html>

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 27 KiB

46
src/crewai/flow/config.py Normal file
View File

@@ -0,0 +1,46 @@
DARK_GRAY = "#333333"
CREWAI_ORANGE = "#FF5A50"
GRAY = "#666666"
WHITE = "#FFFFFF"
COLORS = {
"bg": WHITE,
"start": CREWAI_ORANGE,
"method": DARK_GRAY,
"router": DARK_GRAY,
"router_border": CREWAI_ORANGE,
"edge": GRAY,
"router_edge": CREWAI_ORANGE,
"text": WHITE,
}
NODE_STYLES = {
"start": {
"color": COLORS["start"],
"shape": "box",
"font": {"color": COLORS["text"]},
"margin": {"top": 10, "bottom": 8, "left": 10, "right": 10},
},
"method": {
"color": COLORS["method"],
"shape": "box",
"font": {"color": COLORS["text"]},
"margin": {"top": 10, "bottom": 8, "left": 10, "right": 10},
},
"router": {
"color": {
"background": COLORS["router"],
"border": COLORS["router_border"],
"highlight": {
"border": COLORS["router_border"],
"background": COLORS["router"],
},
},
"shape": "box",
"font": {"color": COLORS["text"]},
"borderWidth": 3,
"borderWidthSelected": 4,
"shapeProperties": {"borderDashes": [5, 5]},
"margin": {"top": 10, "bottom": 8, "left": 10, "right": 10},
},
}

274
src/crewai/flow/flow.py Normal file
View File

@@ -0,0 +1,274 @@
# flow.py
# flow.py
import asyncio
import inspect
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
from pydantic import BaseModel
from crewai.flow.flow_visualizer import plot_flow
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
def start(condition=None):
def decorator(func):
func.__is_start_method__ = True
if condition is not None:
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
elif (
isinstance(condition, dict)
and "type" in condition
and "methods" in condition
):
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
elif callable(condition) and hasattr(condition, "__name__"):
func.__trigger_methods__ = [condition.__name__]
func.__condition_type__ = "OR"
else:
raise ValueError(
"Condition must be a method, string, or a result of or_() or and_()"
)
return func
return decorator
def listen(condition):
def decorator(func):
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
elif (
isinstance(condition, dict)
and "type" in condition
and "methods" in condition
):
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
elif callable(condition) and hasattr(condition, "__name__"):
func.__trigger_methods__ = [condition.__name__]
func.__condition_type__ = "OR"
else:
raise ValueError(
"Condition must be a method, string, or a result of or_() or and_()"
)
return func
return decorator
def router(method, paths=None):
def decorator(func):
func.__is_router__ = True
func.__router_for__ = method.__name__
if paths:
func.__router_paths__ = paths
return func
return decorator
def or_(*conditions):
methods = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
methods.extend(condition["methods"])
elif isinstance(condition, str):
methods.append(condition)
elif callable(condition):
methods.append(getattr(condition, "__name__", repr(condition)))
else:
raise ValueError("Invalid condition in or_()")
return {"type": "OR", "methods": methods}
def and_(*conditions):
methods = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
methods.extend(condition["methods"])
elif isinstance(condition, str):
methods.append(condition)
elif callable(condition):
methods.append(getattr(condition, "__name__", repr(condition)))
else:
raise ValueError("Invalid condition in and_()")
return {"type": "AND", "methods": methods}
class FlowMeta(type):
def __new__(mcs, name, bases, dct):
cls = super().__new__(mcs, name, bases, dct)
start_methods = []
listeners = {}
routers = {}
router_paths = {}
for attr_name, attr_value in dct.items():
if hasattr(attr_value, "__is_start_method__"):
start_methods.append(attr_name)
if hasattr(attr_value, "__trigger_methods__"):
methods = attr_value.__trigger_methods__
condition_type = getattr(attr_value, "__condition_type__", "OR")
listeners[attr_name] = (condition_type, methods)
elif hasattr(attr_value, "__trigger_methods__"):
methods = attr_value.__trigger_methods__
condition_type = getattr(attr_value, "__condition_type__", "OR")
listeners[attr_name] = (condition_type, methods)
elif hasattr(attr_value, "__is_router__"):
routers[attr_value.__router_for__] = attr_name
if hasattr(attr_value, "__router_paths__"):
router_paths[attr_name] = attr_value.__router_paths__
# **Register router as a listener to its triggering method**
trigger_method_name = attr_value.__router_for__
methods = [trigger_method_name]
condition_type = "OR"
listeners[attr_name] = (condition_type, methods)
setattr(cls, "_start_methods", start_methods)
setattr(cls, "_listeners", listeners)
setattr(cls, "_routers", routers)
setattr(cls, "_router_paths", router_paths)
return cls
class Flow(Generic[T], metaclass=FlowMeta):
_start_methods: List[str] = []
_listeners: Dict[str, tuple[str, List[str]]] = {}
_routers: Dict[str, str] = {}
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
def __class_getitem__(cls, item):
class _FlowGeneric(cls):
_initial_state_T = item
return _FlowGeneric
def __init__(self):
self._methods: Dict[str, Callable] = {}
self._state = self._create_initial_state()
self._completed_methods: Set[str] = set()
self._pending_and_listeners: Dict[str, Set[str]] = {}
self._method_outputs: List[Any] = [] # List to store all method outputs
for method_name in dir(self):
if callable(getattr(self, method_name)) and not method_name.startswith(
"__"
):
self._methods[method_name] = getattr(self, method_name)
def _create_initial_state(self) -> T:
if self.initial_state is None and hasattr(self, "_initial_state_T"):
return self._initial_state_T() # type: ignore
if self.initial_state is None:
return {} # type: ignore
elif isinstance(self.initial_state, type):
return self.initial_state()
else:
return self.initial_state
@property
def state(self) -> T:
return self._state
@property
def method_outputs(self) -> List[Any]:
"""Returns the list of all outputs from executed methods."""
return self._method_outputs
async def kickoff(self) -> Any:
if not self._start_methods:
raise ValueError("No start method defined")
# Create tasks for all start methods
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
]
# Run all start methods concurrently
await asyncio.gather(*tasks)
# Return the final output (from the last executed method)
if self._method_outputs:
return self._method_outputs[-1]
else:
return None # Or raise an exception if no methods were executed
async def _execute_start_method(self, start_method: str):
result = await self._execute_method(self._methods[start_method])
await self._execute_listeners(start_method, result)
async def _execute_method(self, method: Callable, *args, **kwargs):
result = (
await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
self._method_outputs.append(result) # Store the output
return result
async def _execute_listeners(self, trigger_method: str, result: Any):
listener_tasks = []
if trigger_method in self._routers:
router_method = self._methods[self._routers[trigger_method]]
path = await self._execute_method(router_method)
# Use the path as the new trigger method
trigger_method = path
for listener, (condition_type, methods) in self._listeners.items():
if condition_type == "OR":
if trigger_method in methods:
listener_tasks.append(
self._execute_single_listener(listener, result)
)
elif condition_type == "AND":
if listener not in self._pending_and_listeners:
self._pending_and_listeners[listener] = set()
self._pending_and_listeners[listener].add(trigger_method)
if set(methods) == self._pending_and_listeners[listener]:
listener_tasks.append(
self._execute_single_listener(listener, result)
)
del self._pending_and_listeners[listener]
# Run all listener tasks concurrently and wait for them to complete
await asyncio.gather(*listener_tasks)
async def _execute_single_listener(self, listener: str, result: Any):
try:
method = self._methods[listener]
sig = inspect.signature(method)
params = list(sig.parameters.values())
# Exclude 'self' parameter
method_params = [p for p in params if p.name != "self"]
if method_params:
# If listener expects parameters, pass the result
listener_result = await self._execute_method(method, result)
else:
# If listener does not expect parameters, call without arguments
listener_result = await self._execute_method(method)
# Execute listeners of this listener
await self._execute_listeners(listener, listener_result)
except Exception as e:
print(f"[Flow._execute_single_listener] Error in method {listener}: {e}")
import traceback
traceback.print_exc()
def plot(self, filename: str = "crewai_flow_graph"):
plot_flow(self, filename)

View File

@@ -0,0 +1,99 @@
# flow_visualizer.py
import os
from pyvis.network import Network
from crewai.flow.config import COLORS, NODE_STYLES
from crewai.flow.html_template_handler import HTMLTemplateHandler
from crewai.flow.legend_generator import generate_legend_items_html, get_legend_items
from crewai.flow.utils import calculate_node_levels
from crewai.flow.visualization_utils import (
add_edges,
add_nodes_to_network,
compute_positions,
)
class FlowPlot:
def __init__(self, flow):
self.flow = flow
self.colors = COLORS
self.node_styles = NODE_STYLES
def plot(self, filename):
net = Network(
directed=True,
height="750px",
width="100%",
bgcolor=self.colors["bg"],
layout=None,
)
# Calculate levels for nodes
node_levels = calculate_node_levels(self.flow)
# Compute positions
node_positions = compute_positions(self.flow, node_levels)
# Add nodes to the network
add_nodes_to_network(net, self.flow, node_positions, self.node_styles)
# Add edges to the network
add_edges(net, self.flow, node_positions, self.colors)
# Set options to disable physics
net.set_options(
"""
var options = {
"physics": {
"enabled": false
}
}
"""
)
network_html = net.generate_html()
final_html_content = self._generate_final_html(network_html)
# Save the final HTML content to the file
with open(f"{filename}.html", "w", encoding="utf-8") as f:
f.write(final_html_content)
print(f"Graph saved as {filename}.html")
self._cleanup_pyvis_lib()
def _generate_final_html(self, network_html):
# Extract just the body content from the generated HTML
current_dir = os.path.dirname(__file__)
template_path = os.path.join(
current_dir, "assets", "crewai_flow_visual_template.html"
)
logo_path = os.path.join(current_dir, "assets", "crewai_logo.svg")
html_handler = HTMLTemplateHandler(template_path, logo_path)
network_body = html_handler.extract_body_content(network_html)
# Generate the legend items HTML
legend_items = get_legend_items(self.colors)
legend_items_html = generate_legend_items_html(legend_items)
final_html_content = html_handler.generate_final_html(
network_body, legend_items_html
)
return final_html_content
def _cleanup_pyvis_lib(self):
# Clean up the generated lib folder
lib_folder = os.path.join(os.getcwd(), "lib")
try:
if os.path.exists(lib_folder) and os.path.isdir(lib_folder):
import shutil
shutil.rmtree(lib_folder)
except Exception as e:
print(f"Error cleaning up {lib_folder}: {e}")
def plot_flow(flow, filename="flow_graph"):
visualizer = FlowPlot(flow)
visualizer.plot(filename)

View File

@@ -0,0 +1,66 @@
import base64
import os
import re
class HTMLTemplateHandler:
def __init__(self, template_path, logo_path):
self.template_path = template_path
self.logo_path = logo_path
def read_template(self):
with open(self.template_path, "r", encoding="utf-8") as f:
return f.read()
def encode_logo(self):
with open(self.logo_path, "rb") as logo_file:
logo_svg_data = logo_file.read()
return base64.b64encode(logo_svg_data).decode("utf-8")
def extract_body_content(self, html):
match = re.search("<body.*?>(.*?)</body>", html, re.DOTALL)
return match.group(1) if match else ""
def generate_legend_items_html(self, legend_items):
legend_items_html = ""
for item in legend_items:
if "border" in item:
legend_items_html += f"""
<div class="legend-item">
<div class="legend-color-box" style="background-color: {item['color']}; border: 2px dashed {item['border']};"></div>
<div>{item['label']}</div>
</div>
"""
elif item.get("dashed") is not None:
style = "dashed" if item["dashed"] else "solid"
legend_items_html += f"""
<div class="legend-item">
<div class="legend-{style}" style="border-bottom: 2px {style} {item['color']};"></div>
<div>{item['label']}</div>
</div>
"""
else:
legend_items_html += f"""
<div class="legend-item">
<div class="legend-color-box" style="background-color: {item['color']};"></div>
<div>{item['label']}</div>
</div>
"""
return legend_items_html
def generate_final_html(self, network_body, legend_items_html, title="Flow Graph"):
html_template = self.read_template()
logo_svg_base64 = self.encode_logo()
final_html_content = html_template.replace("{{ title }}", title)
final_html_content = final_html_content.replace(
"{{ network_content }}", network_body
)
final_html_content = final_html_content.replace(
"{{ logo_svg_base64 }}", logo_svg_base64
)
final_html_content = final_html_content.replace(
"<!-- LEGEND_ITEMS_PLACEHOLDER -->", legend_items_html
)
return final_html_content

View File

@@ -0,0 +1,46 @@
def get_legend_items(colors):
return [
{"label": "Start Method", "color": colors["start"]},
{"label": "Method", "color": colors["method"]},
{
"label": "Router",
"color": colors["router"],
"border": colors["router_border"],
"dashed": True,
},
{"label": "Trigger", "color": colors["edge"], "dashed": False},
{"label": "AND Trigger", "color": colors["edge"], "dashed": True},
{
"label": "Router Trigger",
"color": colors["router_edge"],
"dashed": True,
},
]
def generate_legend_items_html(legend_items):
legend_items_html = ""
for item in legend_items:
if "border" in item:
legend_items_html += f"""
<div class="legend-item">
<div class="legend-color-box" style="background-color: {item['color']}; border: 2px dashed {item['border']};"></div>
<div>{item['label']}</div>
</div>
"""
elif item.get("dashed") is not None:
style = "dashed" if item["dashed"] else "solid"
legend_items_html += f"""
<div class="legend-item">
<div class="legend-{style}" style="border-bottom: 2px {style} {item['color']};"></div>
<div>{item['label']}</div>
</div>
"""
else:
legend_items_html += f"""
<div class="legend-item">
<div class="legend-color-box" style="background-color: {item['color']};"></div>
<div>{item['label']}</div>
</div>
"""
return legend_items_html

143
src/crewai/flow/utils.py Normal file
View File

@@ -0,0 +1,143 @@
def calculate_node_levels(flow):
levels = {}
queue = []
visited = set()
pending_and_listeners = {}
# Make all start methods at level 0
for method_name, method in flow._methods.items():
if hasattr(method, "__is_start_method__"):
levels[method_name] = 0
queue.append(method_name)
# Breadth-first traversal to assign levels
while queue:
current = queue.pop(0)
current_level = levels[current]
visited.add(current)
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if condition_type == "OR":
if current in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
elif condition_type == "AND":
if listener_name not in pending_and_listeners:
pending_and_listeners[listener_name] = set()
if current in trigger_methods:
pending_and_listeners[listener_name].add(current)
if set(trigger_methods) == pending_and_listeners[listener_name]:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
# Handle router connections
if current in flow._routers.values():
router_method_name = current
paths = flow._router_paths.get(router_method_name, [])
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
if (
listener_name not in levels
or levels[listener_name] > current_level + 1
):
levels[listener_name] = current_level + 1
if listener_name not in visited:
queue.append(listener_name)
return levels
def count_outgoing_edges(flow):
counts = {}
for method_name in flow._methods:
counts[method_name] = 0
for method_name in flow._listeners:
_, trigger_methods = flow._listeners[method_name]
for trigger in trigger_methods:
if trigger in flow._methods:
counts[trigger] += 1
return counts
def build_ancestor_dict(flow):
ancestors = {node: set() for node in flow._methods}
visited = set()
for node in flow._methods:
if node not in visited:
dfs_ancestors(node, ancestors, visited, flow)
return ancestors
def dfs_ancestors(node, ancestors, visited, flow):
if node in visited:
return
visited.add(node)
# Handle regular listeners
for listener_name, (_, trigger_methods) in flow._listeners.items():
if node in trigger_methods:
ancestors[listener_name].add(node)
ancestors[listener_name].update(ancestors[node])
dfs_ancestors(listener_name, ancestors, visited, flow)
# Handle router methods separately
if node in flow._routers.values():
router_method_name = node
paths = flow._router_paths.get(router_method_name, [])
for path in paths:
for listener_name, (_, trigger_methods) in flow._listeners.items():
if path in trigger_methods:
# Only propagate the ancestors of the router method, not the router method itself
ancestors[listener_name].update(ancestors[node])
dfs_ancestors(listener_name, ancestors, visited, flow)
def is_ancestor(node, ancestor_candidate, ancestors):
return ancestor_candidate in ancestors.get(node, set())
def build_parent_children_dict(flow):
parent_children = {}
# Map listeners to their trigger methods
for listener_name, (_, trigger_methods) in flow._listeners.items():
for trigger in trigger_methods:
if trigger not in parent_children:
parent_children[trigger] = []
if listener_name not in parent_children[trigger]:
parent_children[trigger].append(listener_name)
# Map router methods to their paths and to listeners
for router_method_name, paths in flow._router_paths.items():
for path in paths:
# Map router method to listeners of each path
for listener_name, (_, trigger_methods) in flow._listeners.items():
if path in trigger_methods:
if router_method_name not in parent_children:
parent_children[router_method_name] = []
if listener_name not in parent_children[router_method_name]:
parent_children[router_method_name].append(listener_name)
return parent_children
def get_child_index(parent, child, parent_children):
children = parent_children.get(parent, [])
children.sort()
return children.index(child)

View File

@@ -0,0 +1,132 @@
from .utils import (
build_ancestor_dict,
build_parent_children_dict,
get_child_index,
is_ancestor,
)
def compute_positions(flow, node_levels, y_spacing=150, x_spacing=150):
level_nodes = {}
node_positions = {}
for method_name, level in node_levels.items():
level_nodes.setdefault(level, []).append(method_name)
for level, nodes in level_nodes.items():
x_offset = -(len(nodes) - 1) * x_spacing / 2 # Center nodes horizontally
for i, method_name in enumerate(nodes):
x = x_offset + i * x_spacing
y = level * y_spacing
node_positions[method_name] = (x, y)
return node_positions
def add_edges(net, flow, node_positions, colors):
ancestors = build_ancestor_dict(flow)
parent_children = build_parent_children_dict(flow)
for method_name in flow._listeners:
condition_type, trigger_methods = flow._listeners[method_name]
is_and_condition = condition_type == "AND"
for trigger in trigger_methods:
if trigger in flow._methods or trigger in flow._routers.values():
is_router_edge = any(
trigger in paths for paths in flow._router_paths.values()
)
edge_color = colors["router_edge"] if is_router_edge else colors["edge"]
is_cycle_edge = is_ancestor(trigger, method_name, ancestors)
parent_has_multiple_children = len(parent_children.get(trigger, [])) > 1
needs_curvature = is_cycle_edge or parent_has_multiple_children
if needs_curvature:
source_pos = node_positions.get(trigger)
target_pos = node_positions.get(method_name)
if source_pos and target_pos:
dx = target_pos[0] - source_pos[0]
smooth_type = "curvedCCW" if dx <= 0 else "curvedCW"
index = get_child_index(trigger, method_name, parent_children)
edge_smooth = {
"type": smooth_type,
"roundness": 0.2 + (0.1 * index),
}
else:
edge_smooth = {"type": "cubicBezier"}
else:
edge_smooth = False
edge_style = {
"color": edge_color,
"width": 2,
"arrows": "to",
"dashes": True if is_router_edge or is_and_condition else False,
"smooth": edge_smooth,
}
net.add_edge(trigger, method_name, **edge_style)
for router_method_name, paths in flow._router_paths.items():
for path in paths:
for listener_name, (
condition_type,
trigger_methods,
) in flow._listeners.items():
if path in trigger_methods:
is_cycle_edge = is_ancestor(trigger, method_name, ancestors)
parent_has_multiple_children = (
len(parent_children.get(router_method_name, [])) > 1
)
needs_curvature = is_cycle_edge or parent_has_multiple_children
if needs_curvature:
source_pos = node_positions.get(router_method_name)
target_pos = node_positions.get(listener_name)
if source_pos and target_pos:
dx = target_pos[0] - source_pos[0]
smooth_type = "curvedCCW" if dx <= 0 else "curvedCW"
index = get_child_index(
router_method_name, listener_name, parent_children
)
edge_smooth = {
"type": smooth_type,
"roundness": 0.2 + (0.1 * index),
}
else:
edge_smooth = {"type": "cubicBezier"}
else:
edge_smooth = False
edge_style = {
"color": colors["router_edge"],
"width": 2,
"arrows": "to",
"dashes": True,
"smooth": edge_smooth,
}
net.add_edge(router_method_name, listener_name, **edge_style)
def add_nodes_to_network(net, flow, node_positions, node_styles):
for method_name, (x, y) in node_positions.items():
method = flow._methods.get(method_name)
if hasattr(method, "__is_start_method__"):
node_style = node_styles["start"]
elif hasattr(method, "__is_router__"):
node_style = node_styles["router"]
else:
node_style = node_styles["method"]
net.add_node(
method_name,
label=method_name,
x=x,
y=y,
fixed=True,
physics=False,
**node_style,
)

View File

@@ -1,6 +1,7 @@
from functools import wraps
from crewai.project.utils import memoize
from crewai import Crew
def task(func):
@@ -72,7 +73,7 @@ def pipeline(func):
return memoize(func)
def crew(func):
def crew(func) -> "Crew":
def wrapper(self, *args, **kwargs):
instantiated_tasks = []
instantiated_agents = []

View File

@@ -5,11 +5,8 @@ from typing import Any, Callable, Dict, Type, TypeVar
import yaml
from dotenv import load_dotenv
from crewai.crew import Crew
load_dotenv()
T = TypeVar("T", bound=Type[Any])
@@ -37,19 +34,6 @@ def CrewBase(cls: T) -> T:
self.map_all_agent_variables()
self.map_all_task_variables()
def crew(self) -> "Crew":
agents = [
getattr(self, name)()
for name, func in self._get_all_functions().items()
if hasattr(func, "is_agent")
]
tasks = [
getattr(self, name)()
for name, func in reversed(self._get_all_functions().items())
if hasattr(func, "is_task")
]
return Crew(agents=agents, tasks=tasks)
@staticmethod
def load_yaml(config_path: Path):
try:

View File

@@ -1093,7 +1093,7 @@ def test_agent_training_handler(crew_training_handler):
result = agent._training_handler(task_prompt=task_prompt)
assert result == "What is 1 + 1?You MUST follow these feedbacks: \n good"
assert result == "What is 1 + 1?\n\nYou MUST follow these instructions: \n good"
crew_training_handler.assert_has_calls(
[mock.call(), mock.call("training_data.pkl"), mock.call().load()]
@@ -1121,8 +1121,8 @@ def test_agent_use_trained_data(crew_training_handler):
result = agent._use_trained_data(task_prompt=task_prompt)
assert (
result == "What is 1 + 1?You MUST follow these feedbacks: \n "
"The result of the math operation must be right.\n - Result must be better than 1."
result == "What is 1 + 1?\n\nYou MUST follow these instructions: \n"
" - The result of the math operation must be right.\n - Result must be better than 1."
)
crew_training_handler.assert_has_calls(
[mock.call(), mock.call("trained_agents_data.pkl"), mock.call().load()]
@@ -1205,7 +1205,9 @@ def test_agent_with_custom_stop_words():
)
assert isinstance(agent.llm, LLM)
assert agent.llm.stop == stop_words
assert set(agent.llm.stop) == set(stop_words + ["\nObservation:"])
assert all(word in agent.llm.stop for word in stop_words)
assert "\nObservation:" in agent.llm.stop
def test_agent_with_callbacks():
@@ -1368,7 +1370,8 @@ def test_agent_with_all_llm_attributes():
assert agent.llm.temperature == 0.7
assert agent.llm.top_p == 0.9
assert agent.llm.n == 1
assert agent.llm.stop == ["STOP", "END"]
assert set(agent.llm.stop) == set(["STOP", "END", "\nObservation:"])
assert all(word in agent.llm.stop for word in ["STOP", "END", "\nObservation:"])
assert agent.llm.max_tokens == 100
assert agent.llm.presence_penalty == 0.1
assert agent.llm.frequency_penalty == 0.1

View File

@@ -50,11 +50,12 @@ def test_evaluate_training_data(converter_mock):
text="Assess the quality of the training data based on the llm output, human feedback , and llm "
"output improved result.\n\nInitial Output:\nInitial output 1\n\nHuman Feedback:\nHuman feedback "
"1\n\nImproved Output:\nImproved output 1\n\nInitial Output:\nInitial output 2\n\nHuman "
"Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\nPlease provide:\n- "
"Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs "
"provide action items based on human_feedback for future tasks\n- A score from 0 to 10 evaluating "
"on completion, quality, and overall performance from the improved output to the initial output "
"based on the human feedback\n",
"Feedback:\nHuman feedback 2\n\nImproved Output:\nImproved output 2\n\nPlease provide:\n- Provide "
"a list of clear, actionable instructions derived from the Human Feedbacks to enhance the Agent's "
"performance. Analyze the differences between Initial Outputs and Improved Outputs to generate specific "
"action items for future tasks. Ensure all key and specificpoints from the human feedback are "
"incorporated into these instructions.\n- A score from 0 to 10 evaluating on completion, quality, and "
"overall performance from the improved output to the initial output based on the human feedback\n",
model=TrainingTaskEvaluation,
instructions="I'm gonna convert this raw text into valid JSON.\n\nThe json should have the "
"following structure, with the following keys:\n{\n suggestions: List[str],\n quality: float,\n final_summary: str\n}",