Compare commits

...

9 Commits

Author SHA1 Message Date
Brandon Hancock
faf33c9eee Remove contextual memory print statements 2024-11-07 10:42:51 -05:00
João Moura
9f2acfe91f making sure we don't check for agents that were not used in the crew 2024-11-06 23:07:23 -03:00
Brandon Hancock (bhancock_ai)
e856359e23 fix missing config (#1557) 2024-11-05 12:07:29 -05:00
Brandon Hancock (bhancock_ai)
faa231e278 Fix flows to support cycles and added in test (#1556) 2024-11-05 12:02:54 -05:00
Brandon Hancock (bhancock_ai)
3d44795476 Feat/watson in cli (#1535)
* getting cli and .env to work together for different models

* support new models

* clean up prints

* Add support for cerebras

* Fix watson keys
2024-11-05 12:01:57 -05:00
Tony Kipkemboi
f50e709985 docs update (#1558)
* add llm providers accordion group

* fix numbering

* Fix directory tree & add llms to accordion

* update crewai enterprise link in docs
2024-11-05 11:26:19 -05:00
Brandon Hancock (bhancock_ai)
d70c542547 Raise an error if an LLM doesnt return a response (#1548) 2024-11-04 11:42:38 -05:00
Gui Vieira
57201fb856 Increase providers fetching timeout 2024-11-01 18:54:40 -03:00
Brandon Hancock (bhancock_ai)
9b142e580b add inputs to flows (#1553)
* add inputs to flows

* fix flows lint
2024-11-01 14:37:02 -07:00
15 changed files with 720 additions and 187 deletions

View File

@@ -18,60 +18,63 @@ Flows allow you to create structured, event-driven workflows. They provide a sea
 4. **Flexible Control Flow**: Implement conditional logic, loops, and branching within your workflows.
+5. **Input Flexibility**: Flows can accept inputs to initialize or update their state, with different handling for structured and unstructured state management.

## Getting Started

Let's create a simple Flow where you will use OpenAI to generate a random city in one task and then use that city to generate a fun fact in another task.

-```python Code
-from crewai.flow.flow import Flow, listen, start
-from dotenv import load_dotenv
-from litellm import completion
-
-class ExampleFlow(Flow):
-    model = "gpt-4o-mini"
-
-    @start()
-    def generate_city(self):
-        print("Starting flow")
-
-        response = completion(
-            model=self.model,
-            messages=[
-                {
-                    "role": "user",
-                    "content": "Return the name of a random city in the world.",
-                },
-            ],
-        )
-
-        random_city = response["choices"][0]["message"]["content"]
-        print(f"Random City: {random_city}")
-
-        return random_city
-
-    @listen(generate_city)
-    def generate_fun_fact(self, random_city):
-        response = completion(
-            model=self.model,
-            messages=[
-                {
-                    "role": "user",
-                    "content": f"Tell me a fun fact about {random_city}",
-                },
-            ],
-        )
-
-        fun_fact = response["choices"][0]["message"]["content"]
-        return fun_fact
-
-flow = ExampleFlow()
-result = flow.kickoff()
-
-print(f"Generated fun fact: {result}")
-```
+### Passing Inputs to Flows
+
+Flows can accept inputs to initialize or update their state before execution. The way inputs are handled depends on whether the flow uses structured or unstructured state management.
+
+#### Structured State Management
+
+In structured state management, the flow's state is defined using a Pydantic `BaseModel`. Inputs must match the model's schema, and any updates will overwrite the default values.
+
+```python
+from crewai.flow.flow import Flow, listen, start
+from pydantic import BaseModel
+
+class ExampleState(BaseModel):
+    counter: int = 0
+    message: str = ""
+
+class StructuredExampleFlow(Flow[ExampleState]):
+    @start()
+    def first_method(self):
+        # Implementation
+
+flow = StructuredExampleFlow()
+flow.kickoff(inputs={"counter": 10})
+```
+
+In this example, the `counter` is initialized to `10`, while `message` retains its default value.
+
+#### Unstructured State Management
+
+In unstructured state management, the flow's state is a dictionary. You can pass any dictionary to update the state.
+
+```python
+from crewai.flow.flow import Flow, listen, start
+
+class UnstructuredExampleFlow(Flow):
+    @start()
+    def first_method(self):
+        # Implementation
+
+flow = UnstructuredExampleFlow()
+flow.kickoff(inputs={"counter": 5, "message": "Initial message"})
+```
+
+Here, both `counter` and `message` are updated based on the provided inputs.
+
+**Note:** Ensure that inputs for structured state management adhere to the defined schema to avoid validation errors.
+
+### Example Flow
+
+```python
+# Existing example code
+```

In the above example, we have created a simple Flow that generates a random city using OpenAI and then generates a fun fact about that city. The Flow consists of two tasks: `generate_city` and `generate_fun_fact`. The `generate_city` task is the starting point of the Flow, and the `generate_fun_fact` task listens for the output of the `generate_city` task.
@@ -94,14 +97,14 @@ The `@listen()` decorator can be used in several ways:
1. **Listening to a Method by Name**: You can pass the name of the method you want to listen to as a string. When that method completes, the listener method will be triggered.

-```python Code
+```python
@listen("generate_city")
def generate_fun_fact(self, random_city):
    # Implementation
```

2. **Listening to a Method Directly**: You can pass the method itself. When that method completes, the listener method will be triggered.

-```python Code
+```python
@listen(generate_city)
def generate_fun_fact(self, random_city):
    # Implementation
@@ -118,7 +121,7 @@ When you run a Flow, the final output is determined by the last method that comp
Here's how you can access the final output:

<CodeGroup>
-```python Code
+```python
from crewai.flow.flow import Flow, listen, start

class OutputExampleFlow(Flow):
@@ -130,18 +133,17 @@ class OutputExampleFlow(Flow):
    def second_method(self, first_output):
        return f"Second method received: {first_output}"

flow = OutputExampleFlow()
final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
-````
+```

-``` text Output
+```text
---- Final Output ----
Second method received: Output from first_method
-````
+```
</CodeGroup>
@@ -156,7 +158,7 @@ Here's an example of how to update and access the state:
<CodeGroup>
-```python Code
+```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel
@@ -184,7 +186,7 @@ print("Final State:")
print(flow.state)
```

-```text Output
+```text
Final Output: Hello from first_method - updated by second_method
Final State:
counter=2 message='Hello from first_method - updated by second_method'
@@ -208,10 +210,10 @@ allowing developers to choose the approach that best fits their application's ne
In unstructured state management, all state is stored in the `state` attribute of the `Flow` class.
This approach offers flexibility, enabling developers to add or modify state attributes on the fly without defining a strict schema.

-```python Code
+```python
from crewai.flow.flow import Flow, listen, start

-class UntructuredExampleFlow(Flow):
+class UnstructuredExampleFlow(Flow):

    @start()
    def first_method(self):
@@ -230,8 +232,7 @@ class UntructuredExampleFlow(Flow):
        print(f"State after third_method: {self.state}")

-flow = UntructuredExampleFlow()
+flow = UnstructuredExampleFlow()
flow.kickoff()
```
@@ -245,16 +246,14 @@ flow.kickoff()
Structured state management leverages predefined schemas to ensure consistency and type safety across the workflow.
By using models like Pydantic's `BaseModel`, developers can define the exact shape of the state, enabling better validation and auto-completion in development environments.

-```python Code
+```python
from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""

class StructuredExampleFlow(Flow[ExampleState]):

    @start()
@@ -273,7 +272,6 @@ class StructuredExampleFlow(Flow[ExampleState]):
        print(f"State after third_method: {self.state}")

flow = StructuredExampleFlow()
flow.kickoff()
```
@@ -307,7 +305,7 @@ The `or_` function in Flows allows you to listen to multiple methods and trigger
<CodeGroup>
-```python Code
+```python
from crewai.flow.flow import Flow, listen, or_, start

class OrExampleFlow(Flow):
@@ -324,13 +322,11 @@ class OrExampleFlow(Flow):
    def logger(self, result):
        print(f"Logger: {result}")

flow = OrExampleFlow()
flow.kickoff()
```

-```text Output
+```text
Logger: Hello from the start method
Logger: Hello from the second method
```
@@ -346,7 +342,7 @@ The `and_` function in Flows allows you to listen to multiple methods and trigge
<CodeGroup>
-```python Code
+```python
from crewai.flow.flow import Flow, and_, listen, start

class AndExampleFlow(Flow):
@@ -368,7 +364,7 @@ flow = AndExampleFlow()
flow.kickoff()
```

-```text Output
+```text
---- Logger ----
{'greeting': 'Hello from the start method', 'joke': 'What do computers eat? Microchips.'}
```
@@ -385,7 +381,7 @@ You can specify different routes based on the output of the method, allowing you
<CodeGroup>
-```python Code
+```python
import random
from crewai.flow.flow import Flow, listen, router, start
from pydantic import BaseModel
@@ -416,12 +412,11 @@ class RouterFlow(Flow[ExampleState]):
    def fourth_method(self):
        print("Fourth method running")

flow = RouterFlow()
flow.kickoff()
```

-```text Output
+```text
Starting the structured flow
Third method running
Fourth method running
@@ -484,7 +479,7 @@ The `main.py` file is where you create your flow and connect the crews together.
Here's an example of how you can connect the `poem_crew` in the `main.py` file:

-```python Code
+```python
#!/usr/bin/env python
from random import randint
@@ -612,7 +607,7 @@ CrewAI provides two convenient methods to generate plots of your flows:
If you are working directly with a flow instance, you can generate a plot by calling the `plot()` method on your flow object. This method will create an HTML file containing the interactive plot of your flow.

-```python Code
+```python
# Assuming you have a flow instance
flow.plot("my_flow_plot")
```
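
As a hedged usage note, the call above is assumed to write `my_flow_plot.html` next to your script (the exact file name is an assumption, not stated in this diff); the generated file can then be opened like any local HTML page:

```python
# Assumption: flow.plot("my_flow_plot") produced "my_flow_plot.html" in the working directory.
import webbrowser

webbrowser.open("my_flow_plot.html")
```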

View File

@@ -330,4 +330,4 @@ This will clear the crew's memory, allowing for a fresh start.
## Deploying Your Project

-The easiest way to deploy your crew is through [CrewAI Enterprise](https://www.crewai.com/crewaiplus), where you can deploy your crew in a few clicks.
+The easiest way to deploy your crew is through [CrewAI Enterprise](http://app.crewai.com/), where you can deploy your crew in a few clicks.

View File

@@ -8,6 +8,7 @@ from pydantic import Field, InstanceOf, PrivateAttr, model_validator
from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.cli.constants import ENV_VARS
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.tools.agent_tools.agent_tools import AgentTools
@@ -131,8 +132,12 @@ class Agent(BaseAgent):
# If it's already an LLM instance, keep it as is
pass
elif self.llm is None:
-# If it's None, use environment variables or default
-model_name = os.environ.get("OPENAI_MODEL_NAME", "gpt-4o-mini")
+# Determine the model name from environment variables or use default
+model_name = (
os.environ.get("OPENAI_MODEL_NAME")
or os.environ.get("MODEL")
or "gpt-4o-mini"
)
llm_params = {"model": model_name} llm_params = {"model": model_name}
api_base = os.environ.get("OPENAI_API_BASE") or os.environ.get( api_base = os.environ.get("OPENAI_API_BASE") or os.environ.get(
@@ -141,9 +146,39 @@ class Agent(BaseAgent):
if api_base:
llm_params["base_url"] = api_base

-api_key = os.environ.get("OPENAI_API_KEY")
-if api_key:
-llm_params["api_key"] = api_key
+# Iterate over all environment variables to find matching API keys or use defaults
+for provider, env_vars in ENV_VARS.items():
+for env_var in env_vars:
# Check if the environment variable is set
if "key_name" in env_var:
env_value = os.environ.get(env_var["key_name"])
if env_value:
# Map key names containing "API_KEY" to "api_key"
key_name = (
"api_key"
if "API_KEY" in env_var["key_name"]
else env_var["key_name"]
)
# Map key names containing "API_BASE" to "api_base"
key_name = (
"api_base"
if "API_BASE" in env_var["key_name"]
else key_name
)
# Map key names containing "API_VERSION" to "api_version"
key_name = (
"api_version"
if "API_VERSION" in env_var["key_name"]
else key_name
)
llm_params[key_name] = env_value
# Check for default values if the environment variable is not set
elif env_var.get("default", False):
for key, value in env_var.items():
if key not in ["prompt", "key_name", "default"]:
# Only add default if the key is already set in os.environ
if key in os.environ:
llm_params[key] = value
self.llm = LLM(**llm_params)
else:
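
A minimal sketch of the new fallback order, assuming crewai is installed; the role/goal/backstory strings are placeholders and the printed attribute is an assumption about the `LLM` wrapper:

```python
import os

from crewai import Agent

# With OPENAI_MODEL_NAME unset, the new logic falls back to MODEL, then "gpt-4o-mini".
os.environ["MODEL"] = "gpt-4o-mini"
os.environ["OPENAI_API_KEY"] = "sk-placeholder"  # picked up via ENV_VARS and mapped to api_key

agent = Agent(
    role="Researcher",
    goal="Summarize a topic",
    backstory="Placeholder agent used only to show LLM auto-configuration.",
)
print(agent.llm.model)  # expected: "gpt-4o-mini"
```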

View File

@@ -117,6 +117,15 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
callbacks=self.callbacks,
)
if answer is None or answer == "":
self._printer.print(
content="Received None or empty response from LLM call.",
color="red",
)
raise ValueError(
"Invalid response from LLM call - None or empty."
)
if not self.use_stop_words:
try:
self._format_answer(answer)
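
A standalone sketch of the guard added above, outside the executor (illustrative only, not the library's control flow):

```python
def ensure_llm_answer(answer):
    # Mirrors the new check: a None or empty completion is surfaced as an error.
    if answer is None or answer == "":
        raise ValueError("Invalid response from LLM call - None or empty.")
    return answer


print(ensure_llm_answer("ok"))  # "ok"
# ensure_llm_answer("") would raise ValueError instead of silently continuing.
```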

View File

@@ -54,7 +54,7 @@ def create_embedded_crew(crew_name: str, parent_folder: Path) -> None:
templates_dir = Path(__file__).parent / "templates" / "crew"
config_template_files = ["agents.yaml", "tasks.yaml"]
-crew_template_file = f"{folder_name}_crew.py"  # Updated file name
+crew_template_file = f"{folder_name}.py"  # Updated file name
for file_name in config_template_files:
src_file = templates_dir / "config" / file_name

View File

@@ -1,19 +1,168 @@
ENV_VARS = {
-'openai': ['OPENAI_API_KEY'],
-'anthropic': ['ANTHROPIC_API_KEY'],
-'gemini': ['GEMINI_API_KEY'],
-'groq': ['GROQ_API_KEY'],
-'ollama': ['FAKE_KEY'],
+"openai": [
+{
+"prompt": "Enter your OPENAI API key (press Enter to skip)",
+"key_name": "OPENAI_API_KEY",
+}
],
"anthropic": [
{
"prompt": "Enter your ANTHROPIC API key (press Enter to skip)",
"key_name": "ANTHROPIC_API_KEY",
}
],
"gemini": [
{
"prompt": "Enter your GEMINI API key (press Enter to skip)",
"key_name": "GEMINI_API_KEY",
}
],
"groq": [
{
"prompt": "Enter your GROQ API key (press Enter to skip)",
"key_name": "GROQ_API_KEY",
}
],
"watson": [
{
"prompt": "Enter your WATSONX URL (press Enter to skip)",
"key_name": "WATSONX_URL",
},
{
"prompt": "Enter your WATSONX API Key (press Enter to skip)",
"key_name": "WATSONX_APIKEY",
},
{
"prompt": "Enter your WATSONX Project Id (press Enter to skip)",
"key_name": "WATSONX_PROJECT_ID",
},
],
"ollama": [
{
"default": True,
"API_BASE": "http://localhost:11434",
}
],
"bedrock": [
{
"prompt": "Enter your AWS Access Key ID (press Enter to skip)",
"key_name": "AWS_ACCESS_KEY_ID",
},
{
"prompt": "Enter your AWS Secret Access Key (press Enter to skip)",
"key_name": "AWS_SECRET_ACCESS_KEY",
},
{
"prompt": "Enter your AWS Region Name (press Enter to skip)",
"key_name": "AWS_REGION_NAME",
},
],
"azure": [
{
"prompt": "Enter your Azure deployment name (must start with 'azure/')",
"key_name": "model",
},
{
"prompt": "Enter your AZURE API key (press Enter to skip)",
"key_name": "AZURE_API_KEY",
},
{
"prompt": "Enter your AZURE API base URL (press Enter to skip)",
"key_name": "AZURE_API_BASE",
},
{
"prompt": "Enter your AZURE API version (press Enter to skip)",
"key_name": "AZURE_API_VERSION",
},
],
"cerebras": [
{
"prompt": "Enter your Cerebras model name (must start with 'cerebras/')",
"key_name": "model",
},
{
"prompt": "Enter your Cerebras API version (press Enter to skip)",
"key_name": "CEREBRAS_API_KEY",
},
],
}

-PROVIDERS = ['openai', 'anthropic', 'gemini', 'groq', 'ollama']
PROVIDERS = [
"openai",
"anthropic",
"gemini",
"groq",
"ollama",
"watson",
"bedrock",
"azure",
"cerebras",
]
MODELS = {
-'openai': ['gpt-4', 'gpt-4o', 'gpt-4o-mini', 'o1-mini', 'o1-preview'],
-'anthropic': ['claude-3-5-sonnet-20240620', 'claude-3-sonnet-20240229', 'claude-3-opus-20240229', 'claude-3-haiku-20240307'],
-'gemini': ['gemini-1.5-flash', 'gemini-1.5-pro', 'gemini-gemma-2-9b-it', 'gemini-gemma-2-27b-it'],
-'groq': ['llama-3.1-8b-instant', 'llama-3.1-70b-versatile', 'llama-3.1-405b-reasoning', 'gemma2-9b-it', 'gemma-7b-it'],
-'ollama': ['llama3.1', 'mixtral'],
+"openai": ["gpt-4", "gpt-4o", "gpt-4o-mini", "o1-mini", "o1-preview"],
+"anthropic": [
+"claude-3-5-sonnet-20240620",
+"claude-3-sonnet-20240229",
+"claude-3-opus-20240229",
"claude-3-haiku-20240307",
],
"gemini": [
"gemini/gemini-1.5-flash",
"gemini/gemini-1.5-pro",
"gemini/gemini-gemma-2-9b-it",
"gemini/gemini-gemma-2-27b-it",
],
"groq": [
"groq/llama-3.1-8b-instant",
"groq/llama-3.1-70b-versatile",
"groq/llama-3.1-405b-reasoning",
"groq/gemma2-9b-it",
"groq/gemma-7b-it",
],
"ollama": ["ollama/llama3.1", "ollama/mixtral"],
"watson": [
"watsonx/google/flan-t5-xxl",
"watsonx/google/flan-ul2",
"watsonx/bigscience/mt0-xxl",
"watsonx/eleutherai/gpt-neox-20b",
"watsonx/ibm/mpt-7b-instruct2",
"watsonx/bigcode/starcoder",
"watsonx/meta-llama/llama-2-70b-chat",
"watsonx/meta-llama/llama-2-13b-chat",
"watsonx/ibm/granite-13b-instruct-v1",
"watsonx/ibm/granite-13b-chat-v1",
"watsonx/google/flan-t5-xl",
"watsonx/ibm/granite-13b-chat-v2",
"watsonx/ibm/granite-13b-instruct-v2",
"watsonx/elyza/elyza-japanese-llama-2-7b-instruct",
"watsonx/ibm-mistralai/mixtral-8x7b-instruct-v01-q",
],
"bedrock": [
"bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0",
"bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
"bedrock/anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/anthropic.claude-3-opus-20240229-v1:0",
"bedrock/anthropic.claude-v2:1",
"bedrock/anthropic.claude-v2",
"bedrock/anthropic.claude-instant-v1",
"bedrock/meta.llama3-1-405b-instruct-v1:0",
"bedrock/meta.llama3-1-70b-instruct-v1:0",
"bedrock/meta.llama3-1-8b-instruct-v1:0",
"bedrock/meta.llama3-70b-instruct-v1:0",
"bedrock/meta.llama3-8b-instruct-v1:0",
"bedrock/amazon.titan-text-lite-v1",
"bedrock/amazon.titan-text-express-v1",
"bedrock/cohere.command-text-v14",
"bedrock/ai21.j2-mid-v1",
"bedrock/ai21.j2-ultra-v1",
"bedrock/ai21.jamba-instruct-v1:0",
"bedrock/meta.llama2-13b-chat-v1",
"bedrock/meta.llama2-70b-chat-v1",
"bedrock/mistral.mistral-7b-instruct-v0:2",
"bedrock/mistral.mixtral-8x7b-instruct-v0:1",
],
}

JSON_URL = "https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json"
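
A hedged sketch of how the new entry format can be walked (mirroring the agent.py change earlier in this compare; `collect_params` is a hypothetical helper, not part of the CLI):

```python
import os

from crewai.cli.constants import ENV_VARS


def collect_params(provider: str) -> dict:
    """Gather LLM settings for one provider from the new ENV_VARS entry format."""
    params = {}
    for entry in ENV_VARS.get(provider, []):
        if "key_name" in entry:
            value = os.environ.get(entry["key_name"])
            if value:
                params[entry["key_name"]] = value
        elif entry.get("default", False):
            # "default" entries carry literal settings such as Ollama's API_BASE.
            params.update(
                {k: v for k, v in entry.items() if k not in ("prompt", "key_name", "default")}
            )
    return params


print(collect_params("ollama"))  # {'API_BASE': 'http://localhost:11434'}
```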

View File

@@ -1,11 +1,11 @@
import shutil
import sys
from pathlib import Path

import click

-from crewai.cli.constants import ENV_VARS
+from crewai.cli.constants import ENV_VARS, MODELS
from crewai.cli.provider import (
-PROVIDERS,
get_provider_data,
select_model,
select_provider,
@@ -29,14 +29,14 @@ def create_folder_structure(name, parent_folder=None):
click.secho("Operation cancelled.", fg="yellow") click.secho("Operation cancelled.", fg="yellow")
sys.exit(0) sys.exit(0)
click.secho(f"Overriding folder {folder_name}...", fg="green", bold=True) click.secho(f"Overriding folder {folder_name}...", fg="green", bold=True)
else: shutil.rmtree(folder_path) # Delete the existing folder and its contents
click.secho( click.secho(
f"Creating {'crew' if parent_folder else 'folder'} {folder_name}...", f"Creating {'crew' if parent_folder else 'folder'} {folder_name}...",
fg="green", fg="green",
bold=True, bold=True,
) )
if not folder_path.exists():
folder_path.mkdir(parents=True) folder_path.mkdir(parents=True)
(folder_path / "tests").mkdir(exist_ok=True) (folder_path / "tests").mkdir(exist_ok=True)
if not parent_folder: if not parent_folder:
@@ -92,7 +92,10 @@ def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
existing_provider = None
for provider, env_keys in ENV_VARS.items():
-if any(key in env_vars for key in env_keys):
+if any(
+"key_name" in details and details["key_name"] in env_vars
+for details in env_keys
+):
existing_provider = provider
break
@@ -118,6 +121,8 @@ def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
"No provider selected. Please try again or press 'q' to exit.", fg="red" "No provider selected. Please try again or press 'q' to exit.", fg="red"
) )
# Check if the selected provider has predefined models
if selected_provider in MODELS and MODELS[selected_provider]:
while True:
selected_model = select_model(selected_provider, provider_models)
if selected_model is None:  # User typed 'q'
@@ -126,39 +131,38 @@ def create_crew(name, provider=None, skip_provider=False, parent_folder=None):
if selected_model:  # Valid selection
break
click.secho(
-"No model selected. Please try again or press 'q' to exit.", fg="red"
+"No model selected. Please try again or press 'q' to exit.",
+fg="red",
)
+env_vars["MODEL"] = selected_model

-if selected_provider in PROVIDERS:
-api_key_var = ENV_VARS[selected_provider][0]
-else:
-api_key_var = click.prompt(
-f"Enter the environment variable name for your {selected_provider.capitalize()} API key",
-type=str,
-default="",
-)
-api_key_value = ""
-click.echo(
-f"Enter your {selected_provider.capitalize()} API key (press Enter to skip): ",
-nl=False,
-)
-try:
-api_key_value = input()
-except (KeyboardInterrupt, EOFError):
-api_key_value = ""
+# Check if the selected provider requires API keys
+if selected_provider in ENV_VARS:
+provider_env_vars = ENV_VARS[selected_provider]
+for details in provider_env_vars:
+if details.get("default", False):
+# Automatically add default key-value pairs
+for key, value in details.items():
+if key not in ["prompt", "key_name", "default"]:
+env_vars[key] = value
+elif "key_name" in details:
+# Prompt for non-default key-value pairs
+prompt = details["prompt"]
+key_name = details["key_name"]
+api_key_value = click.prompt(prompt, default="", show_default=False)
if api_key_value.strip():
-env_vars = {api_key_var: api_key_value}
+env_vars[key_name] = api_key_value
+if env_vars:
write_env_file(folder_path, env_vars)
-click.secho("API key saved to .env file", fg="green")
+click.secho("API keys and model saved to .env file", fg="green")
else:
click.secho(
-"No API key provided. Skipping .env file creation.", fg="yellow"
+"No API keys provided. Skipping .env file creation.", fg="yellow"
)
-env_vars["MODEL"] = selected_model
-click.secho(f"Selected model: {selected_model}", fg="green")
+click.secho(f"Selected model: {env_vars.get('MODEL', 'N/A')}", fg="green")

package_dir = Path(__file__).parent
templates_dir = package_dir / "templates" / "crew"

View File

@@ -164,7 +164,7 @@ def fetch_provider_data(cache_file):
- dict or None: The fetched provider data or None if the operation fails.
"""
try:
-response = requests.get(JSON_URL, stream=True, timeout=10)
+response = requests.get(JSON_URL, stream=True, timeout=60)
response.raise_for_status()
data = download_data(response)
with open(cache_file, "w") as f:

View File

@@ -8,9 +8,12 @@ from crewai.project import CrewBase, agent, crew, task
# from crewai_tools import SerperDevTool

@CrewBase
-class {{crew_name}}Crew():
+class {{crew_name}}():
"""{{crew_name}} crew"""

+agents_config = 'config/agents.yaml'
+tasks_config = 'config/tasks.yaml'

@agent
def researcher(self) -> Agent:
return Agent(

View File

@@ -1,6 +1,10 @@
#!/usr/bin/env python
import sys
-from {{folder_name}}.crew import {{crew_name}}Crew
+import warnings
+from {{folder_name}}.crew import {{crew_name}}
+warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")

# This main file is intended to be a way for you to run your
# crew locally, so refrain from adding unnecessary logic into this file.
@@ -14,7 +18,7 @@ def run():
inputs = {
'topic': 'AI LLMs'
}
-{{crew_name}}Crew().crew().kickoff(inputs=inputs)
+{{crew_name}}().crew().kickoff(inputs=inputs)

def train():
@@ -25,7 +29,7 @@ def train():
"topic": "AI LLMs" "topic": "AI LLMs"
} }
try: try:
{{crew_name}}Crew().crew().train(n_iterations=int(sys.argv[1]), filename=sys.argv[2], inputs=inputs) {{crew_name}}().crew().train(n_iterations=int(sys.argv[1]), filename=sys.argv[2], inputs=inputs)
except Exception as e: except Exception as e:
raise Exception(f"An error occurred while training the crew: {e}") raise Exception(f"An error occurred while training the crew: {e}")
@@ -35,7 +39,7 @@ def replay():
Replay the crew execution from a specific task.
"""
try:
-{{crew_name}}Crew().crew().replay(task_id=sys.argv[1])
+{{crew_name}}().crew().replay(task_id=sys.argv[1])
except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")
@@ -48,7 +52,7 @@ def test():
"topic": "AI LLMs" "topic": "AI LLMs"
} }
try: try:
{{crew_name}}Crew().crew().test(n_iterations=int(sys.argv[1]), openai_model_name=sys.argv[2], inputs=inputs) {{crew_name}}().crew().test(n_iterations=int(sys.argv[1]), openai_model_name=sys.argv[2], inputs=inputs)
except Exception as e: except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}") raise Exception(f"An error occurred while replaying the crew: {e}")

View File

@@ -445,6 +445,7 @@ class Crew(BaseModel):
training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()

for agent in train_crew.agents:
if training_data.get(str(agent.id)):
result = TaskEvaluator(agent).evaluate_training_data(
training_data=training_data, agent_id=str(agent.id)
)
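
A standalone sketch of what the added guard avoids (illustrative data only, not the library's structures): agents that never ran have no training-data entry, so they are skipped instead of evaluated.

```python
training_data = {"agent-1": {"0": {"improved_output": "..."}}}  # only agents that actually ran

for agent_id in ["agent-1", "agent-2"]:
    if training_data.get(agent_id):  # "agent-2" is skipped rather than evaluated
        print(f"evaluating {agent_id}")
```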

View File

@@ -1,8 +1,20 @@
import asyncio
import inspect
-from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
+from typing import (
Any,
Callable,
Dict,
Generic,
List,
Optional,
Set,
Type,
TypeVar,
Union,
cast,
)
-from pydantic import BaseModel
+from pydantic import BaseModel, ValidationError

from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.utils import get_possible_return_constants
@@ -119,7 +131,6 @@ class FlowMeta(type):
condition_type = getattr(attr_value, "__condition_type__", "OR")
listeners[attr_name] = (condition_type, methods)
-# TODO: should we add a check for __condition_type__ 'AND'?
elif hasattr(attr_value, "__is_router__"):
routers[attr_value.__router_for__] = attr_name
possible_returns = get_possible_return_constants(attr_value)
@@ -159,8 +170,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
def __init__(self) -> None:
self._methods: Dict[str, Callable] = {}
self._state: T = self._create_initial_state()
-self._executed_methods: Set[str] = set()
-self._scheduled_tasks: Set[str] = set()
+self._method_execution_counts: Dict[str, int] = {}
self._pending_and_listeners: Dict[str, Set[str]] = {}
self._method_outputs: List[Any] = []  # List to store all method outputs
@@ -191,10 +201,74 @@ class Flow(Generic[T], metaclass=FlowMeta):
"""Returns the list of all outputs from executed methods.""" """Returns the list of all outputs from executed methods."""
return self._method_outputs return self._method_outputs
def kickoff(self) -> Any: def _initialize_state(self, inputs: Dict[str, Any]) -> None:
"""
Initializes or updates the state with the provided inputs.
Args:
inputs: Dictionary of inputs to initialize or update the state.
Raises:
ValueError: If inputs do not match the structured state model.
TypeError: If state is neither a BaseModel instance nor a dictionary.
"""
if isinstance(self._state, BaseModel):
# Structured state management
try:
# Define a function to create the dynamic class
def create_model_with_extra_forbid(
base_model: Type[BaseModel],
) -> Type[BaseModel]:
class ModelWithExtraForbid(base_model): # type: ignore
model_config = base_model.model_config.copy()
model_config["extra"] = "forbid"
return ModelWithExtraForbid
# Create the dynamic class
ModelWithExtraForbid = create_model_with_extra_forbid(
self._state.__class__
)
# Create a new instance using the combined state and inputs
self._state = cast(
T, ModelWithExtraForbid(**{**self._state.model_dump(), **inputs})
)
except ValidationError as e:
raise ValueError(f"Invalid inputs for structured state: {e}") from e
elif isinstance(self._state, dict):
# Unstructured state management
self._state.update(inputs)
else:
raise TypeError("State must be a BaseModel instance or a dictionary.")
def kickoff(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Starts the execution of the flow synchronously.
Args:
inputs: Optional dictionary of inputs to initialize or update the state.
Returns:
The final output from the flow execution.
"""
if inputs is not None:
self._initialize_state(inputs)
return asyncio.run(self.kickoff_async())

-async def kickoff_async(self) -> Any:
+async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""
Starts the execution of the flow asynchronously.
Args:
inputs: Optional dictionary of inputs to initialize or update the state.
Returns:
The final output from the flow execution.
"""
if inputs is not None:
self._initialize_state(inputs)
if not self._start_methods:
raise ValueError("No start method defined")
@@ -233,7 +307,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
self._method_outputs.append(result)  # Store the output

-self._executed_methods.add(method_name)
+# Track method execution counts
self._method_execution_counts[method_name] = (
self._method_execution_counts.get(method_name, 0) + 1
)
return result
@@ -243,34 +320,33 @@ class Flow(Generic[T], metaclass=FlowMeta):
if trigger_method in self._routers:
router_method = self._methods[self._routers[trigger_method]]
path = await self._execute_method(
-trigger_method, router_method
-)  # TODO: Change or not?
+self._routers[trigger_method], router_method
+)
+# Use the path as the new trigger method
trigger_method = path

for listener_name, (condition_type, methods) in self._listeners.items():
if condition_type == "OR":
if trigger_method in methods:
-if (
-listener_name not in self._executed_methods
-and listener_name not in self._scheduled_tasks
-):
-self._scheduled_tasks.add(listener_name)
+# Schedule the listener without preventing re-execution
listener_tasks.append(
self._execute_single_listener(listener_name, result)
)
elif condition_type == "AND": elif condition_type == "AND":
if all(method in self._executed_methods for method in methods): # Initialize pending methods for this listener if not already done
if ( if listener_name not in self._pending_and_listeners:
listener_name not in self._executed_methods self._pending_and_listeners[listener_name] = set(methods)
and listener_name not in self._scheduled_tasks # Remove the trigger method from pending methods
): self._pending_and_listeners[listener_name].discard(trigger_method)
self._scheduled_tasks.add(listener_name) if not self._pending_and_listeners[listener_name]:
# All required methods have been executed
listener_tasks.append( listener_tasks.append(
self._execute_single_listener(listener_name, result) self._execute_single_listener(listener_name, result)
) )
# Reset pending methods for this listener
self._pending_and_listeners.pop(listener_name, None)
# Run all listener tasks concurrently and wait for them to complete # Run all listener tasks concurrently and wait for them to complete
if listener_tasks:
await asyncio.gather(*listener_tasks) await asyncio.gather(*listener_tasks)
async def _execute_single_listener(self, listener_name: str, result: Any) -> None: async def _execute_single_listener(self, listener_name: str, result: Any) -> None:
@@ -291,9 +367,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
# If listener does not expect parameters, call without arguments
listener_result = await self._execute_method(listener_name, method)

-# Remove from scheduled tasks after execution
-self._scheduled_tasks.discard(listener_name)

# Execute listeners of this listener
await self._execute_listeners(listener_name, listener_result)
except Exception as e:
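
A hedged sketch of the `_initialize_state` behaviour added above (the class names here are examples, not from the diff): matching inputs update structured state, while unknown keys are rejected because the temporary model forbids extras.

```python
from pydantic import BaseModel

from crewai.flow.flow import Flow, start


class CounterState(BaseModel):
    counter: int = 0


class CounterFlow(Flow[CounterState]):
    @start()
    def bump(self):
        return self.state.counter


print(CounterFlow().kickoff(inputs={"counter": 3}))  # 3

try:
    CounterFlow().kickoff(inputs={"unknown": 1})
except ValueError as err:  # wraps the pydantic ValidationError raised above
    print(f"rejected: {err}")
```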

View File

@@ -1,7 +1,10 @@
import io
import logging
import sys
import warnings
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Union

-import logging
-import warnings

import litellm
from litellm import get_supported_openai_params
@@ -9,9 +12,6 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)

-import sys
-import io

class FilteredStream(io.StringIO):
def write(self, s):

View File

@@ -34,7 +34,6 @@ class ContextualMemory:
formatted_results = "\n".join( formatted_results = "\n".join(
[f"- {result['context']}" for result in stm_results] [f"- {result['context']}" for result in stm_results]
) )
print("formatted_results stm", formatted_results)
return f"Recent Insights:\n{formatted_results}" if stm_results else "" return f"Recent Insights:\n{formatted_results}" if stm_results else ""
def _fetch_ltm_context(self, task) -> Optional[str]: def _fetch_ltm_context(self, task) -> Optional[str]:
@@ -54,8 +53,6 @@ class ContextualMemory:
formatted_results = list(dict.fromkeys(formatted_results))
formatted_results = "\n".join([f"- {result}" for result in formatted_results])  # type: ignore # Incompatible types in assignment (expression has type "str", variable has type "list[str]")
-print("formatted_results ltm", formatted_results)
return f"Historical Data:\n{formatted_results}" if ltm_results else ""

def _fetch_entity_context(self, query) -> str:
@@ -67,5 +64,4 @@ class ContextualMemory:
formatted_results = "\n".join( formatted_results = "\n".join(
[f"- {result['context']}" for result in em_results] # type: ignore # Invalid index type "str" for "str"; expected type "SupportsIndex | slice" [f"- {result['context']}" for result in em_results] # type: ignore # Invalid index type "str" for "str"; expected type "SupportsIndex | slice"
) )
print("formatted_results em", formatted_results)
return f"Entities:\n{formatted_results}" if em_results else "" return f"Entities:\n{formatted_results}" if em_results else ""

tests/flow_test.py (new file, 264 lines)
View File

@@ -0,0 +1,264 @@
"""Test Flow creation and execution basic functionality."""
import asyncio
import pytest
from crewai.flow.flow import Flow, and_, listen, or_, router, start
def test_simple_sequential_flow():
"""Test a simple flow with two steps called sequentially."""
execution_order = []
class SimpleFlow(Flow):
@start()
def step_1(self):
execution_order.append("step_1")
@listen(step_1)
def step_2(self):
execution_order.append("step_2")
flow = SimpleFlow()
flow.kickoff()
assert execution_order == ["step_1", "step_2"]
def test_flow_with_multiple_starts():
"""Test a flow with multiple start methods."""
execution_order = []
class MultiStartFlow(Flow):
@start()
def step_a(self):
execution_order.append("step_a")
@start()
def step_b(self):
execution_order.append("step_b")
@listen(step_a)
def step_c(self):
execution_order.append("step_c")
@listen(step_b)
def step_d(self):
execution_order.append("step_d")
flow = MultiStartFlow()
flow.kickoff()
assert "step_a" in execution_order
assert "step_b" in execution_order
assert "step_c" in execution_order
assert "step_d" in execution_order
assert execution_order.index("step_c") > execution_order.index("step_a")
assert execution_order.index("step_d") > execution_order.index("step_b")
def test_cyclic_flow():
"""Test a cyclic flow that runs a finite number of iterations."""
execution_order = []
class CyclicFlow(Flow):
iteration = 0
max_iterations = 3
@start("loop")
def step_1(self):
if self.iteration >= self.max_iterations:
return # Do not proceed further
execution_order.append(f"step_1_{self.iteration}")
@listen(step_1)
def step_2(self):
execution_order.append(f"step_2_{self.iteration}")
@router(step_2)
def step_3(self):
execution_order.append(f"step_3_{self.iteration}")
self.iteration += 1
if self.iteration < self.max_iterations:
return "loop"
return "exit"
flow = CyclicFlow()
flow.kickoff()
expected_order = []
for i in range(flow.max_iterations):
expected_order.extend([f"step_1_{i}", f"step_2_{i}", f"step_3_{i}"])
assert execution_order == expected_order
def test_flow_with_and_condition():
"""Test a flow where a step waits for multiple other steps to complete."""
execution_order = []
class AndConditionFlow(Flow):
@start()
def step_1(self):
execution_order.append("step_1")
@start()
def step_2(self):
execution_order.append("step_2")
@listen(and_(step_1, step_2))
def step_3(self):
execution_order.append("step_3")
flow = AndConditionFlow()
flow.kickoff()
assert "step_1" in execution_order
assert "step_2" in execution_order
assert execution_order[-1] == "step_3"
assert execution_order.index("step_3") > execution_order.index("step_1")
assert execution_order.index("step_3") > execution_order.index("step_2")
def test_flow_with_or_condition():
"""Test a flow where a step is triggered when any of multiple steps complete."""
execution_order = []
class OrConditionFlow(Flow):
@start()
def step_a(self):
execution_order.append("step_a")
@start()
def step_b(self):
execution_order.append("step_b")
@listen(or_(step_a, step_b))
def step_c(self):
execution_order.append("step_c")
flow = OrConditionFlow()
flow.kickoff()
assert "step_a" in execution_order or "step_b" in execution_order
assert "step_c" in execution_order
assert execution_order.index("step_c") > min(
execution_order.index("step_a"), execution_order.index("step_b")
)
def test_flow_with_router():
"""Test a flow that uses a router method to determine the next step."""
execution_order = []
class RouterFlow(Flow):
@start()
def start_method(self):
execution_order.append("start_method")
@router(start_method)
def router(self):
execution_order.append("router")
# Ensure the condition is set to True to follow the "step_if_true" path
condition = True
return "step_if_true" if condition else "step_if_false"
@listen("step_if_true")
def truthy(self):
execution_order.append("step_if_true")
@listen("step_if_false")
def falsy(self):
execution_order.append("step_if_false")
flow = RouterFlow()
flow.kickoff()
assert execution_order == ["start_method", "router", "step_if_true"]
def test_async_flow():
"""Test an asynchronous flow."""
execution_order = []
class AsyncFlow(Flow):
@start()
async def step_1(self):
execution_order.append("step_1")
await asyncio.sleep(0.1)
@listen(step_1)
async def step_2(self):
execution_order.append("step_2")
await asyncio.sleep(0.1)
flow = AsyncFlow()
asyncio.run(flow.kickoff_async())
assert execution_order == ["step_1", "step_2"]
def test_flow_with_exceptions():
"""Test flow behavior when exceptions occur in steps."""
execution_order = []
class ExceptionFlow(Flow):
@start()
def step_1(self):
execution_order.append("step_1")
raise ValueError("An error occurred in step_1")
@listen(step_1)
def step_2(self):
execution_order.append("step_2")
flow = ExceptionFlow()
with pytest.raises(ValueError):
flow.kickoff()
# Ensure step_2 did not execute
assert execution_order == ["step_1"]
def test_flow_restart():
"""Test restarting a flow after it has completed."""
execution_order = []
class RestartableFlow(Flow):
@start()
def step_1(self):
execution_order.append("step_1")
@listen(step_1)
def step_2(self):
execution_order.append("step_2")
flow = RestartableFlow()
flow.kickoff()
flow.kickoff() # Restart the flow
assert execution_order == ["step_1", "step_2", "step_1", "step_2"]
def test_flow_with_custom_state():
"""Test a flow that maintains and modifies internal state."""
class StateFlow(Flow):
def __init__(self):
super().__init__()
self.counter = 0
@start()
def step_1(self):
self.counter += 1
@listen(step_1)
def step_2(self):
self.counter *= 2
assert self.counter == 2
flow = StateFlow()
flow.kickoff()
assert flow.counter == 2