Compare commits

..

3 Commits

Author SHA1 Message Date
Brandon Hancock (bhancock_ai)
3d148090d9 Merge branch 'main' into bugfix/fix-type-error-in-token-counter 2025-02-26 13:25:02 -05:00
Lorenze Jay
82cfd353b3 Merge branch 'main' into bugfix/fix-type-error-in-token-counter 2025-02-26 10:01:19 -08:00
Brandon Hancock
0903bbeca2 Fix type issue 2025-02-25 12:18:52 -05:00
12 changed files with 45 additions and 432 deletions

View File

@@ -136,21 +136,17 @@ crewai test -n 5 -m gpt-3.5-turbo
### 8. Run
Run the crew or flow.
Run the crew.
```shell Terminal
crewai run
```
<Note>
Starting from version 0.103.0, the `crewai run` command can be used to run both standard crews and flows. For flows, it automatically detects the type from pyproject.toml and runs the appropriate command. This is now the recommended way to run both crews and flows.
</Note>
<Note>
Make sure to run these commands from the directory where your CrewAI project is set up.
Some commands may require additional configuration or setup within your project structure.
</Note>
### 9. Chat
Starting in version `0.98.0`, when you run the `crewai chat` command, you start an interactive session with your crew. The AI assistant will guide you by asking for necessary inputs to execute the crew. Once all inputs are provided, the crew will execute its tasks.
@@ -179,6 +175,7 @@ def crew(self) -> Crew:
```
</Note>
### 10. API Keys
When running ```crewai create crew``` command, the CLI will first show you the top 5 most common LLM providers and ask you to select one.

View File

@@ -150,12 +150,12 @@ final_output = flow.kickoff()
print("---- Final Output ----")
print(final_output)
```
````
```text Output
---- Final Output ----
Second method received: Output from first_method
```
````
</CodeGroup>
@@ -738,34 +738,3 @@ Also, check out our YouTube video on how to use flows in CrewAI below!
referrerpolicy="strict-origin-when-cross-origin"
allowfullscreen
></iframe>
## Running Flows
There are two ways to run a flow:
### Using the Flow API
You can run a flow programmatically by creating an instance of your flow class and calling the `kickoff()` method:
```python
flow = ExampleFlow()
result = flow.kickoff()
```
### Using the CLI
Starting from version 0.103.0, you can run flows using the `crewai run` command:
```shell
crewai run
```
This command automatically detects if your project is a flow (based on the `type = "flow"` setting in your pyproject.toml) and runs it accordingly. This is the recommended way to run flows from the command line.
For backward compatibility, you can also use:
```shell
crewai flow kickoff
```
However, the `crewai run` command is now the preferred method as it works for both crews and flows.

View File

@@ -124,15 +124,14 @@ class CrewAgentParser:
)
def _extract_thought(self, text: str) -> str:
thought_index = text.find("\n\nAction")
if thought_index == -1:
thought_index = text.find("\n\nFinal Answer")
if thought_index == -1:
return ""
thought = text[:thought_index].strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
regex = r"(.*?)(?:\n\nAction|\n\nFinal Answer)"
thought_match = re.search(regex, text, re.DOTALL)
if thought_match:
thought = thought_match.group(1).strip()
# Remove any triple backticks from the thought string
thought = thought.replace("```", "").strip()
return thought
return ""
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""

View File

@@ -203,6 +203,7 @@ def install(context):
@crewai.command()
def run():
"""Run the Crew."""
click.echo("Running the Crew")
run_crew()

View File

@@ -1,6 +1,4 @@
import subprocess
from enum import Enum
from typing import List, Optional
import click
from packaging import version
@@ -9,24 +7,16 @@ from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
class CrewType(Enum):
STANDARD = "standard"
FLOW = "flow"
def run_crew() -> None:
"""
Run the crew or flow by running a command in the UV environment.
Starting from version 0.103.0, this command can be used to run both
standard crews and flows. For flows, it detects the type from pyproject.toml
and automatically runs the appropriate command.
Run the crew by running a command in the UV environment.
"""
command = ["uv", "run", "run_crew"]
crewai_version = get_crewai_version()
min_required_version = "0.71.0"
pyproject_data = read_toml()
# Check for legacy poetry configuration
if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version)
):
@@ -36,54 +26,18 @@ def run_crew() -> None:
fg="red",
)
# Determine crew type
is_flow = pyproject_data.get("tool", {}).get("crewai", {}).get("type") == "flow"
crew_type = CrewType.FLOW if is_flow else CrewType.STANDARD
# Display appropriate message
click.echo(f"Running the {'Flow' if is_flow else 'Crew'}")
# Execute the appropriate command
execute_command(crew_type)
def execute_command(crew_type: CrewType) -> None:
"""
Execute the appropriate command based on crew type.
Args:
crew_type: The type of crew to run
"""
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]
try:
subprocess.run(command, capture_output=False, text=True, check=True)
except subprocess.CalledProcessError as e:
handle_error(e, crew_type)
click.echo(f"An error occurred while running the crew: {e}", err=True)
click.echo(e.output, err=True, nl=True)
if pyproject_data.get("tool", {}).get("poetry"):
click.secho(
"It's possible that you are using an old version of crewAI that uses poetry, please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
def handle_error(error: subprocess.CalledProcessError, crew_type: CrewType) -> None:
"""
Handle subprocess errors with appropriate messaging.
Args:
error: The subprocess error that occurred
crew_type: The type of crew that was being run
"""
entity_type = "flow" if crew_type == CrewType.FLOW else "crew"
click.echo(f"An error occurred while running the {entity_type}: {error}", err=True)
if error.output:
click.echo(error.output, err=True, nl=True)
pyproject_data = read_toml()
if pyproject_data.get("tool", {}).get("poetry"):
click.secho(
"It's possible that you are using an old version of crewAI that uses poetry, "
"please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)

View File

@@ -30,13 +30,13 @@ crewai install
## Running the Project
To kickstart your flow and begin execution, run this from the root folder of your project:
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
```bash
crewai run
```
This command initializes the {{name}} Flow as defined in your configuration.
This command initializes the {{name}} Crew, assembling the agents and assigning them tasks as defined in your configuration.
This example, unmodified, will run the create a `report.md` file with the output of a research on LLMs in the root folder.

View File

@@ -894,45 +894,35 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes
-----
- Routers are executed sequentially to maintain flow control
- Each router's result becomes a new trigger_method
- Each router's result becomes the new trigger_method
- Normal listeners are executed in parallel for efficiency
- Listeners can receive the trigger method's result as a parameter
"""
# First, handle routers repeatedly until no router triggers anymore
router_results = []
current_trigger = trigger_method
while True:
routers_triggered = self._find_triggered_methods(
current_trigger, router_only=True
trigger_method, router_only=True
)
if not routers_triggered:
break
for router_name in routers_triggered:
await self._execute_single_listener(router_name, result)
# After executing router, the router's result is the path
router_result = self._method_outputs[-1]
if router_result: # Only add non-None results
router_results.append(router_result)
current_trigger = (
router_result # Update for next iteration of router chain
)
# The last router executed sets the trigger_method
# The router result is the last element in self._method_outputs
trigger_method = self._method_outputs[-1]
# Now execute normal listeners for all router results and the original trigger
all_triggers = [trigger_method] + router_results
for current_trigger in all_triggers:
if current_trigger: # Skip None results
listeners_triggered = self._find_triggered_methods(
current_trigger, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
# Now that no more routers are triggered by current trigger_method,
# execute normal listeners
listeners_triggered = self._find_triggered_methods(
trigger_method, router_only=False
)
if listeners_triggered:
tasks = [
self._execute_single_listener(listener_name, result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
def _find_triggered_methods(
self, trigger_method: str, router_only: bool

View File

@@ -1,4 +1,3 @@
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import PrivateAttr
@@ -53,34 +52,10 @@ class ShortTermMemory(Memory):
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
"""
Save a memory item to the storage.
Args:
value: The data to save.
metadata: Optional metadata to associate with the memory.
agent: Optional agent identifier.
Raises:
ValueError: If the item's timestamp is in the future.
"""
import logging
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
if item.timestamp > datetime.now():
raise ValueError("Cannot save memory item with future timestamp")
logging.debug(f"Saving memory item with timestamp: {item.timestamp}")
if self._memory_provider == "mem0":
item.data = f"Remember the following insights from Agent run: {item.data}"
# Include timestamp in metadata
if item.metadata is None:
item.metadata = {}
item.metadata["timestamp"] = item.timestamp.isoformat()
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
def search(

View File

@@ -1,4 +1,3 @@
from datetime import datetime
from typing import Any, Dict, Optional
@@ -8,11 +7,7 @@ class ShortTermMemoryItem:
data: Any,
agent: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
timestamp: Optional[datetime] = None,
):
if timestamp is not None and timestamp > datetime.now():
raise ValueError("Timestamp cannot be in the future")
self.data = data
self.agent = agent
self.metadata = metadata if metadata is not None else {}
self.timestamp = timestamp if timestamp is not None else datetime.now()

View File

@@ -114,32 +114,13 @@ class RAGStorage(BaseRAGStorage):
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
recency_weight: float = 0.3,
time_decay_days: float = 1.0,
) -> List[Any]:
"""
Search for entries in the storage based on semantic similarity and recency.
Args:
query: The search query string.
limit: Maximum number of results to return.
filter: Optional filter to apply to the search.
score_threshold: Minimum score threshold for results.
recency_weight: Weight given to recency vs. semantic similarity (0.0-1.0).
Higher values prioritize recent memories more strongly.
time_decay_days: Number of days over which recency factor decays to zero.
Smaller values make older memories lose relevance faster.
Returns:
List of search results, each containing id, metadata, context, and score.
Results are sorted by combined semantic similarity and recency score.
"""
if not hasattr(self, "app"):
self._initialize_app()
try:
with suppress_logging():
response = self.collection.query(query_texts=query, n_results=limit * 2) # Get more results to allow for recency filtering
response = self.collection.query(query_texts=query, n_results=limit)
results = []
for i in range(len(response["ids"][0])):
@@ -149,27 +130,10 @@ class RAGStorage(BaseRAGStorage):
"context": response["documents"][0][i],
"score": response["distances"][0][i],
}
# Apply recency boost if timestamp exists in metadata
if "timestamp" in result["metadata"]:
try:
from datetime import datetime
timestamp = datetime.fromisoformat(result["metadata"]["timestamp"])
now = datetime.now()
# Calculate recency factor (newer = higher score)
time_diff_seconds = (now - timestamp).total_seconds()
recency_factor = max(0, 1 - (time_diff_seconds / (time_decay_days * 24 * 60 * 60)))
# Adjust score with recency factor
result["score"] = result["score"] * (1 - recency_weight) + recency_factor * recency_weight
except (ValueError, TypeError):
pass # If timestamp parsing fails, use original score
if result["score"] >= score_threshold:
results.append(result)
# Sort by adjusted score (higher is better)
results.sort(key=lambda x: x["score"], reverse=True)
return results[:limit] # Return only the requested number of results
return results
except Exception as e:
logging.error(f"Error during {self.type} search: {str(e)}")
return []

View File

@@ -654,104 +654,3 @@ def test_flow_plotting():
assert isinstance(received_events[0], FlowPlotEvent)
assert received_events[0].flow_name == "StatelessFlow"
assert isinstance(received_events[0].timestamp, datetime)
def test_multiple_routers_from_same_trigger():
"""Test that multiple routers triggered by the same method all activate their listeners."""
execution_order = []
class MultiRouterFlow(Flow):
def __init__(self):
super().__init__()
# Set diagnosed conditions to trigger all routers
self.state["diagnosed_conditions"] = "DHA" # Contains D, H, and A
@start()
def scan_medical(self):
execution_order.append("scan_medical")
return "scan_complete"
@router(scan_medical)
def diagnose_conditions(self):
execution_order.append("diagnose_conditions")
return "diagnosis_complete"
@router(diagnose_conditions)
def diabetes_router(self):
execution_order.append("diabetes_router")
if "D" in self.state["diagnosed_conditions"]:
return "diabetes"
return None
@listen("diabetes")
def diabetes_analysis(self):
execution_order.append("diabetes_analysis")
return "diabetes_analysis_complete"
@router(diagnose_conditions)
def hypertension_router(self):
execution_order.append("hypertension_router")
if "H" in self.state["diagnosed_conditions"]:
return "hypertension"
return None
@listen("hypertension")
def hypertension_analysis(self):
execution_order.append("hypertension_analysis")
return "hypertension_analysis_complete"
@router(diagnose_conditions)
def anemia_router(self):
execution_order.append("anemia_router")
if "A" in self.state["diagnosed_conditions"]:
return "anemia"
return None
@listen("anemia")
def anemia_analysis(self):
execution_order.append("anemia_analysis")
return "anemia_analysis_complete"
flow = MultiRouterFlow()
flow.kickoff()
# Verify all methods were called
assert "scan_medical" in execution_order
assert "diagnose_conditions" in execution_order
# Verify all routers were called
assert "diabetes_router" in execution_order
assert "hypertension_router" in execution_order
assert "anemia_router" in execution_order
# Verify all listeners were called - this is the key test for the fix
assert "diabetes_analysis" in execution_order
assert "hypertension_analysis" in execution_order
assert "anemia_analysis" in execution_order
# Verify execution order constraints
assert execution_order.index("diagnose_conditions") > execution_order.index(
"scan_medical"
)
# All routers should execute after diagnose_conditions
assert execution_order.index("diabetes_router") > execution_order.index(
"diagnose_conditions"
)
assert execution_order.index("hypertension_router") > execution_order.index(
"diagnose_conditions"
)
assert execution_order.index("anemia_router") > execution_order.index(
"diagnose_conditions"
)
# All analyses should execute after their respective routers
assert execution_order.index("diabetes_analysis") > execution_order.index(
"diabetes_router"
)
assert execution_order.index("hypertension_analysis") > execution_order.index(
"hypertension_router"
)
assert execution_order.index("anemia_analysis") > execution_order.index(
"anemia_router"
)

View File

@@ -1,130 +0,0 @@
import time
from datetime import datetime, timedelta
from unittest.mock import patch
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.task import Task
@pytest.fixture
def short_term_memory():
"""Fixture to create a ShortTermMemory instance"""
agent = Agent(
role="Tutor",
goal="Teach programming concepts",
backstory="You are a programming tutor helping students learn.",
tools=[],
verbose=True,
)
task = Task(
description="Explain programming concepts to students.",
expected_output="Clear explanations of programming concepts.",
agent=agent,
)
return ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
def test_memory_prioritizes_recent_topic(short_term_memory):
"""Test that memory retrieval prioritizes the most recent topic in a conversation."""
# First topic: Python variables
topic1_data = "Variables in Python are dynamically typed. You can assign any value to a variable without declaring its type."
topic1_timestamp = datetime.now() - timedelta(minutes=10) # Older memory
# Second topic: Python abstract classes
topic2_data = "Abstract classes in Python are created using the ABC module. They cannot be instantiated and are used as a blueprint for other classes."
topic2_timestamp = datetime.now() # More recent memory
# Mock search results to simulate what would be returned by RAGStorage
mock_results = [
{
"id": "2",
"metadata": {
"agent": "Tutor",
"topic": "python_abstract_classes",
"timestamp": topic2_timestamp.isoformat()
},
"context": topic2_data,
"score": 0.85, # Higher score due to recency boost
},
{
"id": "1",
"metadata": {
"agent": "Tutor",
"topic": "python_variables",
"timestamp": topic1_timestamp.isoformat()
},
"context": topic1_data,
"score": 0.75, # Lower score due to being older
}
]
# Mock the search method to return our predefined results
with patch.object(RAGStorage, 'search', return_value=mock_results):
# Query that could match both topics but should prioritize the more recent one
query = "Can you give me another example of that?"
# Search with recency consideration
results = short_term_memory.search(query)
# Verify that the most recent topic (abstract classes) is prioritized
assert len(results) > 0, "No search results returned"
# The first result should be about abstract classes (the more recent topic)
assert "abstract classes" in results[0]["context"].lower(), "Recent topic (abstract classes) not prioritized"
# If there are multiple results, check if the older topic is also returned but with lower priority
if len(results) > 1:
assert "variables" in results[1]["context"].lower(), "Older topic should be second"
# Verify that the scores reflect the recency prioritization
assert results[0]["score"] > results[1]["score"], "Recent topic should have higher score"
def test_future_timestamp_validation():
"""Test that ShortTermMemoryItem raises ValueError for future timestamps."""
# Setup agent and task for memory
agent = Agent(
role="Tutor",
goal="Teach programming concepts",
backstory="You are a programming tutor helping students learn.",
tools=[],
verbose=True,
)
task = Task(
description="Explain programming concepts to students.",
expected_output="Clear explanations of programming concepts.",
agent=agent,
)
# Create a future timestamp
future_timestamp = datetime.now() + timedelta(days=1)
# Test constructor validation
with pytest.raises(ValueError, match="Timestamp cannot be in the future"):
ShortTermMemoryItem(data="Test data", timestamp=future_timestamp)
# Test save method validation
memory = ShortTermMemory(crew=Crew(agents=[agent], tasks=[task]))
# Create a memory item with a future timestamp
future_data = "Test data with future timestamp"
# We need to pass the data directly to the save method
# The save method will create a ShortTermMemoryItem internally
# and then we'll modify its timestamp before it's saved
# Mock datetime.now to return a fixed time
with patch('crewai.memory.short_term.short_term_memory_item.datetime') as mock_datetime:
# Set up the mock to return our future timestamp when now() is called
mock_datetime.now.return_value = future_timestamp
with pytest.raises(ValueError, match="Cannot save memory item with future timestamp"):
memory.save(value=future_data)