Compare commits

...

4 Commits

Author SHA1 Message Date
Greyson LaLonde
c7b7492876 chore: fix ruff linting issues in project module 2025-09-19 14:23:01 -04:00
João Moura
9491fe8334 Adding Ability for user to get deeper observability (#3541)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Update Test Durations / update-durations (3.10) (push) Has been cancelled
Update Test Durations / update-durations (3.11) (push) Has been cancelled
Update Test Durations / update-durations (3.12) (push) Has been cancelled
Update Test Durations / update-durations (3.13) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* feat(tracing): enhance first-time trace display and auto-open browser

* avoinding line breaking

* set tracing if user enables it

* linted

---------

Co-authored-by: lorenzejay <lorenzejaytech@gmail.com>
2025-09-18 21:47:09 -03:00
Greyson LaLonde
6f2ea013a7 docs: update RagTool references from EmbedChain to CrewAI native RAG (#3537)
* docs: update RagTool references from EmbedChain to CrewAI native RAG

* change ref to qdrant

* docs: update RAGTool to use Qdrant and add embedding_model example
2025-09-18 16:06:44 -07:00
Greyson LaLonde
39e8792ae5 fix: add l2 distance metric support for backward compatibility (#3540) 2025-09-18 18:36:33 -04:00
8 changed files with 113 additions and 78 deletions

View File

@@ -9,7 +9,7 @@ mode: "wide"
## Description
The `RagTool` is designed to answer questions by leveraging the power of Retrieval-Augmented Generation (RAG) through EmbedChain.
The `RagTool` is designed to answer questions by leveraging the power of Retrieval-Augmented Generation (RAG) through CrewAI's native RAG system.
It provides a dynamic knowledge base that can be queried to retrieve relevant information from various data sources.
This tool is particularly useful for applications that require access to a vast array of information and need to provide contextually relevant answers.
@@ -76,8 +76,8 @@ The `RagTool` can be used with a wide variety of data sources, including:
The `RagTool` accepts the following parameters:
- **summarize**: Optional. Whether to summarize the retrieved content. Default is `False`.
- **adapter**: Optional. A custom adapter for the knowledge base. If not provided, an EmbedchainAdapter will be used.
- **config**: Optional. Configuration for the underlying EmbedChain App.
- **adapter**: Optional. A custom adapter for the knowledge base. If not provided, a CrewAIRagAdapter will be used.
- **config**: Optional. Configuration for the underlying CrewAI RAG system.
## Adding Content
@@ -130,44 +130,23 @@ from crewai_tools import RagTool
# Create a RAG tool with custom configuration
config = {
"app": {
"name": "custom_app",
},
"llm": {
"provider": "openai",
"vectordb": {
"provider": "qdrant",
"config": {
"model": "gpt-4",
"collection_name": "my-collection"
}
},
"embedding_model": {
"provider": "openai",
"config": {
"model": "text-embedding-ada-002"
"model": "text-embedding-3-small"
}
},
"vectordb": {
"provider": "elasticsearch",
"config": {
"collection_name": "my-collection",
"cloud_id": "deployment-name:xxxx",
"api_key": "your-key",
"verify_certs": False
}
},
"chunker": {
"chunk_size": 400,
"chunk_overlap": 100,
"length_function": "len",
"min_chunk_size": 0
}
}
rag_tool = RagTool(config=config, summarize=True)
```
The internal RAG tool utilizes the Embedchain adapter, allowing you to pass any configuration options that are supported by Embedchain.
You can refer to the [Embedchain documentation](https://docs.embedchain.ai/components/introduction) for details.
Make sure to review the configuration options available in the .yaml file.
## Conclusion
The `RagTool` provides a powerful way to create and query knowledge bases from various data sources. By leveraging Retrieval-Augmented Generation, it enables agents to access and retrieve relevant information efficiently, enhancing their ability to provide accurate and contextually appropriate responses.

View File

@@ -1,5 +1,7 @@
import logging
import uuid
import webbrowser
from pathlib import Path
from rich.console import Console
from rich.panel import Panel
@@ -14,6 +16,47 @@ from crewai.events.listeners.tracing.utils import (
logger = logging.getLogger(__name__)
def _update_or_create_env_file():
"""Update or create .env file with CREWAI_TRACING_ENABLED=true."""
env_path = Path(".env")
env_content = ""
variable_name = "CREWAI_TRACING_ENABLED"
variable_value = "true"
# Read existing content if file exists
if env_path.exists():
with open(env_path, "r") as f:
env_content = f.read()
# Check if CREWAI_TRACING_ENABLED is already set
lines = env_content.splitlines()
variable_exists = False
updated_lines = []
for line in lines:
if line.strip().startswith(f"{variable_name}="):
# Update existing variable
updated_lines.append(f"{variable_name}={variable_value}")
variable_exists = True
else:
updated_lines.append(line)
# Add variable if it doesn't exist
if not variable_exists:
if updated_lines and not updated_lines[-1].strip():
# If last line is empty, replace it
updated_lines[-1] = f"{variable_name}={variable_value}"
else:
# Add new line and then the variable
updated_lines.append(f"{variable_name}={variable_value}")
# Write updated content
with open(env_path, "w") as f:
f.write("\n".join(updated_lines))
if updated_lines: # Add final newline if there's content
f.write("\n")
class FirstTimeTraceHandler:
"""Handles the first-time user trace collection and display flow."""
@@ -48,6 +91,12 @@ class FirstTimeTraceHandler:
if user_wants_traces:
self._initialize_backend_and_send_events()
# Enable tracing for future runs by updating .env file
try:
_update_or_create_env_file()
except Exception: # noqa: S110
pass
if self.ephemeral_url:
self._display_ephemeral_trace_link()
@@ -108,9 +157,14 @@ class FirstTimeTraceHandler:
self._gracefully_fail(f"Backend initialization failed: {e}")
def _display_ephemeral_trace_link(self):
"""Display the ephemeral trace link to the user."""
"""Display the ephemeral trace link to the user and automatically open browser."""
console = Console()
try:
webbrowser.open(self.ephemeral_url)
except Exception: # noqa: S110
pass
panel_content = f"""
🎉 Your First CrewAI Execution Trace is Ready!
@@ -123,7 +177,8 @@ This trace shows:
• Tool usage and results
• LLM calls and responses
To use traces add tracing=True to your Crew(tracing=True) / Flow(tracing=True)
✅ Tracing has been enabled for future runs! (CREWAI_TRACING_ENABLED=true added to .env)
You can also add tracing=True to your Crew(tracing=True) / Flow(tracing=True) for more control.
📝 Note: This link will expire in 24 hours.
""".strip()
@@ -158,8 +213,8 @@ Unfortunately, we couldn't upload them to the server right now, but here's what
• Execution duration: {self.batch_manager.calculate_duration("execution")}ms
• Batch ID: {self.batch_manager.trace_batch_id}
Tracing has been enabled for future runs! (CREWAI_TRACING_ENABLED=true added to .env)
The traces include agent decisions, task execution, and tool usage.
Try running with CREWAI_TRACING_ENABLED=true next time for persistent traces.
""".strip()
panel = Panel(

View File

@@ -138,13 +138,6 @@ class TraceBatchManager:
if not use_ephemeral
else response_data["ephemeral_trace_id"]
)
console = Console()
panel = Panel(
f"✅ Trace batch initialized with session ID: {self.trace_batch_id}",
title="Trace Batch Initialization",
border_style="green",
)
console.print(panel)
else:
logger.warning(
f"Trace batch initialization returned status {response.status_code}. Continuing without tracing."
@@ -258,12 +251,23 @@ class TraceBatchManager:
if self.is_current_batch_ephemeral:
self.ephemeral_trace_url = return_link
# Create a properly formatted message with URL on its own line
message_parts = [
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}",
"",
f"🔗 View here: {return_link}",
]
if access_code:
message_parts.append(f"🔑 Access Code: {access_code}")
panel = Panel(
f"✅ Trace batch finalized with session ID: {self.trace_batch_id}. View here: {return_link} {f', Access Code: {access_code}' if access_code else ''}",
"\n".join(message_parts),
title="Trace Batch Finalization",
border_style="green",
)
console.print(panel)
if not should_auto_collect_first_time_traces():
console.print(panel)
else:
logger.error(

View File

@@ -2,7 +2,7 @@ import json
import logging
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any
from crewai.task import Task
from crewai.utilities import Printer
@@ -18,7 +18,7 @@ class KickoffTaskOutputsSQLiteStorage:
An updated SQLite storage class for kickoff task outputs storage.
"""
def __init__(self, db_path: Optional[str] = None) -> None:
def __init__(self, db_path: str | None = None) -> None:
if db_path is None:
# Get the parent directory of the default db path and create our db file there
db_path = str(Path(db_storage_path()) / "latest_kickoff_task_outputs.db")
@@ -57,15 +57,15 @@ class KickoffTaskOutputsSQLiteStorage:
except sqlite3.Error as e:
error_msg = DatabaseError.format_error(DatabaseError.INIT_ERROR, e)
logger.error(error_msg)
raise DatabaseOperationError(error_msg, e)
raise DatabaseOperationError(error_msg, e) from e
def add(
self,
task: Task,
output: Dict[str, Any],
output: dict[str, Any],
task_index: int,
was_replayed: bool = False,
inputs: Dict[str, Any] | None = None,
inputs: dict[str, Any] | None = None,
) -> None:
"""Add a new task output record to the database.
@@ -103,7 +103,7 @@ class KickoffTaskOutputsSQLiteStorage:
except sqlite3.Error as e:
error_msg = DatabaseError.format_error(DatabaseError.SAVE_ERROR, e)
logger.error(error_msg)
raise DatabaseOperationError(error_msg, e)
raise DatabaseOperationError(error_msg, e) from e
def update(
self,
@@ -138,7 +138,7 @@ class KickoffTaskOutputsSQLiteStorage:
else value
)
query = f"UPDATE latest_kickoff_task_outputs SET {', '.join(fields)} WHERE task_index = ?" # nosec
query = f"UPDATE latest_kickoff_task_outputs SET {', '.join(fields)} WHERE task_index = ?" # nosec # noqa: S608
values.append(task_index)
cursor.execute(query, tuple(values))
@@ -151,9 +151,9 @@ class KickoffTaskOutputsSQLiteStorage:
except sqlite3.Error as e:
error_msg = DatabaseError.format_error(DatabaseError.UPDATE_ERROR, e)
logger.error(error_msg)
raise DatabaseOperationError(error_msg, e)
raise DatabaseOperationError(error_msg, e) from e
def load(self) -> List[Dict[str, Any]]:
def load(self) -> list[dict[str, Any]]:
"""Load all task output records from the database.
Returns:
@@ -192,7 +192,7 @@ class KickoffTaskOutputsSQLiteStorage:
except sqlite3.Error as e:
error_msg = DatabaseError.format_error(DatabaseError.LOAD_ERROR, e)
logger.error(error_msg)
raise DatabaseOperationError(error_msg, e)
raise DatabaseOperationError(error_msg, e) from e
def delete_all(self) -> None:
"""Delete all task output records from the database.
@@ -212,4 +212,4 @@ class KickoffTaskOutputsSQLiteStorage:
except sqlite3.Error as e:
error_msg = DatabaseError.format_error(DatabaseError.DELETE_ERROR, e)
logger.error(error_msg)
raise DatabaseOperationError(error_msg, e)
raise DatabaseOperationError(error_msg, e) from e

View File

@@ -1,7 +1,7 @@
import json
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from typing import Any
from crewai.utilities import Printer
from crewai.utilities.paths import db_storage_path
@@ -12,9 +12,7 @@ class LTMSQLiteStorage:
An updated SQLite storage class for LTM data storage.
"""
def __init__(
self, db_path: Optional[str] = None
) -> None:
def __init__(self, db_path: str | None = None) -> None:
if db_path is None:
# Get the parent directory of the default db path and create our db file there
db_path = str(Path(db_storage_path()) / "long_term_memory_storage.db")
@@ -53,9 +51,9 @@ class LTMSQLiteStorage:
def save(
self,
task_description: str,
metadata: Dict[str, Any],
metadata: dict[str, Any],
datetime: str,
score: Union[int, float],
score: int | float,
) -> None:
"""Saves data to the LTM table with error handling."""
try:
@@ -75,9 +73,7 @@ class LTMSQLiteStorage:
color="red",
)
def load(
self, task_description: str, latest_n: int
) -> Optional[List[Dict[str, Any]]]:
def load(self, task_description: str, latest_n: int) -> list[dict[str, Any]] | None:
"""Queries the LTM table by task description with error handling."""
try:
with sqlite3.connect(self.db_path) as conn:
@@ -89,7 +85,7 @@ class LTMSQLiteStorage:
WHERE task_description = ?
ORDER BY datetime DESC, score ASC
LIMIT {latest_n}
""", # nosec
""", # nosec # noqa: S608
(task_description,),
)
rows = cursor.fetchall()
@@ -125,4 +121,4 @@ class LTMSQLiteStorage:
content=f"MEMORY ERROR: An error occurred while deleting all rows in LTM: {e}",
color="red",
)
return None
return

View File

@@ -14,16 +14,16 @@ from .annotations import (
from .crew_base import CrewBase
__all__ = [
"CrewBase",
"after_kickoff",
"agent",
"before_kickoff",
"cache_handler",
"callback",
"crew",
"task",
"llm",
"output_json",
"output_pydantic",
"task",
"tool",
"callback",
"CrewBase",
"llm",
"cache_handler",
"before_kickoff",
"after_kickoff",
]

View File

@@ -1,5 +1,5 @@
from collections.abc import Callable
from functools import wraps
from typing import Callable
from crewai import Crew
from crewai.project.utils import memoize
@@ -36,15 +36,13 @@ def task(func):
def agent(func):
"""Marks a method as a crew agent."""
func.is_agent = True
func = memoize(func)
return func
return memoize(func)
def llm(func):
"""Marks a method as an LLM provider."""
func.is_llm = True
func = memoize(func)
return func
return memoize(func)
def output_json(cls):
@@ -91,7 +89,7 @@ def crew(func) -> Callable[..., Crew]:
agents = self._original_agents.items()
# Instantiate tasks in order
for task_name, task_method in tasks:
for _task_name, task_method in tasks:
task_instance = task_method(self)
instantiated_tasks.append(task_instance)
agent_instance = getattr(task_instance, "agent", None)
@@ -100,7 +98,7 @@ def crew(func) -> Callable[..., Crew]:
agent_roles.add(agent_instance.role)
# Instantiate agents not included by tasks
for agent_name, agent_method in agents:
for _agent_name, agent_method in agents:
agent_instance = agent_method(self)
if agent_instance.role not in agent_roles:
instantiated_agents.append(agent_instance)
@@ -117,9 +115,9 @@ def crew(func) -> Callable[..., Crew]:
return wrapper
for _, callback in self._before_kickoff.items():
for callback in self._before_kickoff.values():
crew.before_kickoff_callbacks.append(callback_wrapper(callback, self))
for _, callback in self._after_kickoff.items():
for callback in self._after_kickoff.values():
crew.after_kickoff_callbacks.append(callback_wrapper(callback, self))
return crew

View File

@@ -133,6 +133,9 @@ def _convert_distance_to_score(
if distance_metric == "cosine":
score = 1.0 - 0.5 * distance
return max(0.0, min(1.0, score))
if distance_metric == "l2":
score = 1.0 / (1.0 + distance)
return max(0.0, min(1.0, score))
raise ValueError(f"Unsupported distance metric: {distance_metric}")