Compare commits

..

22 Commits

Author SHA1 Message Date
Lorenze Jay
87fca32d5d Merge branch 'main' into feat/improve-yaml-extraction 2025-03-21 17:12:35 -07:00
Matisse
bb3829a9ed docs: Update model reference in LLM configuration (#2267)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-03-21 15:12:26 -04:00
Fernando Galves
0a116202f0 Update the context window size for Amazon Bedrock FM- llm.py (#2304)
Update the context window size for Amazon Bedrock Foundation Models.

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-03-21 14:48:25 -04:00
Stefano Baccianella
4daa88fa59 As explained in https://github.com/mangiucugna/json_repair?tab=readme-ov-file#performance-considerations we can skip a wasteful json.loads() here and save quite some time (#2397)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-03-21 14:25:19 -04:00
Parth Patel
53067f8b92 add Mem0 OSS support (#2429)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-03-21 13:57:24 -04:00
Saurabh Misra
d3a09c3180 ️ Speed up method CrewAgentParser._clean_action by 427,565% (#2382)
Here is the optimized version of the program.

Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-03-21 13:51:14 -04:00
Saurabh Misra
4d7aacb5f2 ️ Speed up method Repository.is_git_repo by 72,270% (#2381)
Here is the optimized version of the `Repository` class.

Co-authored-by: codeflash-ai[bot] <148906541+codeflash-ai[bot]@users.noreply.github.com>
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-03-21 13:43:48 -04:00
Julio Peixoto
6b1cf78e41 docs: add detailed docstrings to Telemetry class methods (#2377)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-03-21 13:34:16 -04:00
Patcher
80f1a88b63 Upgrade OTel SDK version to 1.30.0 (#2375)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-03-21 13:26:50 -04:00
Jorge Gonzalez
32da76a2ca Use task in the note about how methods names need to match task names (#2355)
The note is about the task but mentions the agent incorrectly.

Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-03-21 13:17:43 -04:00
Gustavo Satheler
3aa48dcd58 fix: move agent tools for a variable instead of use format (#2319)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-03-21 12:32:54 -04:00
Brandon Hancock
7a07972ebe Improve test 2025-03-21 11:27:16 -04:00
Brandon Hancock (bhancock_ai)
5a755fad48 Merge branch 'main' into feat/improve-yaml-extraction 2025-03-21 11:20:28 -04:00
Brandon Hancock
deeb0446e5 Fixing interpolation imports 2025-03-21 11:17:16 -04:00
Brandon Hancock
d16144bbaa fix circular deps 2025-03-21 11:17:16 -04:00
Brandon Hancock (bhancock_ai)
c852d772d4 Merge branch 'main' into feat/improve-yaml-extraction 2025-03-20 15:25:51 -04:00
Brandon Hancock
1589e7ce69 make tests 2025-03-20 11:03:39 -04:00
Brandon Hancock
0e4e24d7d1 update interpolation to work with example response types in yaml docs 2025-03-20 10:58:18 -04:00
Vini Brasil
c62e6def62 Merge branch 'main' into event-handler-wildcard 2025-03-20 10:50:12 -03:00
Vinicius Brasil
e17911867d Remove unused variable 2025-03-20 10:45:39 -03:00
Vinicius Brasil
fae1cfdc89 Fix failing test 2025-03-20 10:45:39 -03:00
Vinicius Brasil
032922a4b9 Support wildcard handling in emit()
Change `emit()` to call handlers registered for parent classes using
`isinstance()`. Ensures that base event handlers receive derived
events.
2025-03-20 10:45:39 -03:00
20 changed files with 694 additions and 610 deletions

View File

@@ -59,7 +59,7 @@ There are three ways to configure LLMs in CrewAI. Choose the method that best fi
goal: Conduct comprehensive research and analysis
backstory: A dedicated research professional with years of experience
verbose: true
llm: openai/gpt-4o-mini # your model here
llm: openai/gpt-4o-mini # your model here
# (see provider configuration examples below for more)
```
@@ -111,7 +111,7 @@ There are three ways to configure LLMs in CrewAI. Choose the method that best fi
## Provider Configuration Examples
CrewAI supports a multitude of LLM providers, each offering unique features, authentication methods, and model capabilities.
CrewAI supports a multitude of LLM providers, each offering unique features, authentication methods, and model capabilities.
In this section, you'll find detailed examples that help you select, configure, and optimize the LLM that best fits your project's needs.
<AccordionGroup>
@@ -121,7 +121,7 @@ In this section, you'll find detailed examples that help you select, configure,
```toml Code
# Required
OPENAI_API_KEY=sk-...
# Optional
OPENAI_API_BASE=<custom-base-url>
OPENAI_ORGANIZATION=<your-org-id>
@@ -226,7 +226,7 @@ In this section, you'll find detailed examples that help you select, configure,
AZURE_API_KEY=<your-api-key>
AZURE_API_BASE=<your-resource-url>
AZURE_API_VERSION=<api-version>
# Optional
AZURE_AD_TOKEN=<your-azure-ad-token>
AZURE_API_TYPE=<your-azure-api-type>
@@ -289,7 +289,7 @@ In this section, you'll find detailed examples that help you select, configure,
| Mistral 8x7B Instruct | Up to 32k tokens | An MOE LLM that follows instructions, completes requests, and generates creative text. |
</Accordion>
<Accordion title="Amazon SageMaker">
```toml Code
AWS_ACCESS_KEY_ID=<your-access-key>
@@ -474,7 +474,7 @@ In this section, you'll find detailed examples that help you select, configure,
WATSONX_URL=<your-url>
WATSONX_APIKEY=<your-apikey>
WATSONX_PROJECT_ID=<your-project-id>
# Optional
WATSONX_TOKEN=<your-token>
WATSONX_DEPLOYMENT_SPACE_ID=<your-space-id>
@@ -491,7 +491,7 @@ In this section, you'll find detailed examples that help you select, configure,
<Accordion title="Ollama (Local LLMs)">
1. Install Ollama: [ollama.ai](https://ollama.ai/)
2. Run a model: `ollama run llama2`
2. Run a model: `ollama run llama3`
3. Configure:
```python Code
@@ -600,7 +600,7 @@ In this section, you'll find detailed examples that help you select, configure,
```toml Code
OPENROUTER_API_KEY=<your-api-key>
```
Example usage in your CrewAI project:
```python Code
llm = LLM(
@@ -723,7 +723,7 @@ Learn how to get the most out of your LLM configuration:
- Small tasks (up to 4K tokens): Standard models
- Medium tasks (between 4K-32K): Enhanced models
- Large tasks (over 32K): Large context models
```python
# Configure model with appropriate settings
llm = LLM(
@@ -760,11 +760,11 @@ Learn how to get the most out of your LLM configuration:
<Warning>
Most authentication issues can be resolved by checking API key format and environment variable names.
</Warning>
```bash
# OpenAI
OPENAI_API_KEY=sk-...
# Anthropic
ANTHROPIC_API_KEY=sk-ant-...
```
@@ -773,11 +773,11 @@ Learn how to get the most out of your LLM configuration:
<Check>
Always include the provider prefix in model names
</Check>
```python
# Correct
llm = LLM(model="openai/gpt-4")
# Incorrect
llm = LLM(model="gpt-4")
```
@@ -786,5 +786,10 @@ Learn how to get the most out of your LLM configuration:
<Tip>
Use larger context models for extensive tasks
</Tip>
```python
# Large context model
llm = LLM(model="openai/gpt-4o") # 128K tokens
```
</Tab>
</Tabs>

View File

@@ -300,7 +300,7 @@ email_summarizer:
```
<Tip>
Note how we use the same name for the agent in the `tasks.yaml` (`email_summarizer_task`) file as the method name in the `crew.py` (`email_summarizer_task`) file.
Note how we use the same name for the task in the `tasks.yaml` (`email_summarizer_task`) file as the method name in the `crew.py` (`email_summarizer_task`) file.
</Tip>
```yaml tasks.yaml

View File

@@ -17,9 +17,9 @@ dependencies = [
"pdfplumber>=0.11.4",
"regex>=2024.9.11",
# Telemetry and Monitoring
"opentelemetry-api>=1.22.0",
"opentelemetry-sdk>=1.22.0",
"opentelemetry-exporter-otlp-proto-http>=1.22.0",
"opentelemetry-api>=1.30.0",
"opentelemetry-sdk>=1.30.0",
"opentelemetry-exporter-otlp-proto-http>=1.30.0",
# Data Handling
"chromadb>=0.5.23",
"openpyxl>=3.1.5",

View File

@@ -25,6 +25,7 @@ from crewai.tools.base_tool import BaseTool, Tool
from crewai.utilities import I18N, Logger, RPMController
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter
from crewai.utilities.string_utils import interpolate_only
T = TypeVar("T", bound="BaseAgent")
@@ -333,9 +334,15 @@ class BaseAgent(ABC, BaseModel):
self._original_backstory = self.backstory
if inputs:
self.role = self._original_role.format(**inputs)
self.goal = self._original_goal.format(**inputs)
self.backstory = self._original_backstory.format(**inputs)
self.role = interpolate_only(
input_string=self._original_role, inputs=inputs
)
self.goal = interpolate_only(
input_string=self._original_goal, inputs=inputs
)
self.backstory = interpolate_only(
input_string=self._original_backstory, inputs=inputs
)
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
"""Set the cache handler for the agent.

View File

@@ -136,7 +136,7 @@ class CrewAgentParser:
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""
return re.sub(r"^\s*\*+\s*|\s*\*+\s*$", "", text).strip()
return text.strip().strip("*").strip()
def _safe_repair_json(self, tool_input: str) -> str:
UNABLE_TO_REPAIR_JSON_RESULTS = ['""', "{}"]

View File

@@ -1,4 +1,5 @@
import subprocess
from functools import lru_cache
class Repository:
@@ -35,6 +36,7 @@ class Repository:
encoding="utf-8",
).strip()
@lru_cache(maxsize=None)
def is_git_repo(self) -> bool:
"""Check if the current directory is a git repository."""
try:

View File

@@ -4,34 +4,13 @@ import io
import logging
import os
import shutil
import warnings
from typing import Any, Dict, List, Optional, Union, cast
# Initialize module import status
CHROMADB_AVAILABLE = False
# Define placeholder types
class DummyClientAPI:
pass
class DummySettings:
pass
# Try to import chromadb-related modules with proper error handling
try:
import chromadb
import chromadb.errors
from chromadb.api import ClientAPI
from chromadb.api.types import OneOrMany
from chromadb.config import Settings
CHROMADB_AVAILABLE = True
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import chromadb: {str(e)}. Knowledge functionality will be limited.")
# Use dummy classes when imports fail
chromadb = None
ClientAPI = DummyClientAPI
OneOrMany = Any
Settings = DummySettings
import chromadb
import chromadb.errors
from chromadb.api import ClientAPI
from chromadb.api.types import OneOrMany
from chromadb.config import Settings
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator
@@ -63,9 +42,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
search efficiency.
"""
collection = None # Type annotation removed to handle case when chromadb is not available
collection: Optional[chromadb.Collection] = None
collection_name: Optional[str] = "knowledge"
app = None # Type annotation removed to handle case when chromadb is not available
app: Optional[ClientAPI] = None
def __init__(
self,
@@ -82,52 +61,37 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
if not CHROMADB_AVAILABLE:
logging.warning("Cannot search knowledge as chromadb is not available.")
return []
with suppress_logging():
if self.collection:
try:
fetched = self.collection.query(
query_texts=query,
n_results=limit,
where=filter,
)
results = []
for i in range(len(fetched["ids"][0])): # type: ignore
result = {
"id": fetched["ids"][0][i], # type: ignore
"metadata": fetched["metadatas"][0][i], # type: ignore
"context": fetched["documents"][0][i], # type: ignore
"score": fetched["distances"][0][i], # type: ignore
}
if result["score"] >= score_threshold:
results.append(result)
return results
except Exception as e:
logging.error(f"Error during knowledge search: {str(e)}")
return []
fetched = self.collection.query(
query_texts=query,
n_results=limit,
where=filter,
)
results = []
for i in range(len(fetched["ids"][0])): # type: ignore
result = {
"id": fetched["ids"][0][i], # type: ignore
"metadata": fetched["metadatas"][0][i], # type: ignore
"context": fetched["documents"][0][i], # type: ignore
"score": fetched["distances"][0][i], # type: ignore
}
if result["score"] >= score_threshold:
results.append(result)
return results
else:
logging.warning("Collection not initialized")
return []
raise Exception("Collection not initialized")
def initialize_knowledge_storage(self):
if not CHROMADB_AVAILABLE:
logging.warning("Cannot initialize knowledge storage as chromadb is not available.")
self.app = None
self.collection = None
return
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
try:
base_path = os.path.join(db_storage_path(), "knowledge")
chroma_client = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app = chroma_client
collection_name = (
f"knowledge_{self.collection_name}"
if self.collection_name
@@ -138,46 +102,30 @@ class KnowledgeStorage(BaseKnowledgeStorage):
name=collection_name, embedding_function=self.embedder
)
else:
logging.warning("Vector Database Client not initialized")
self.collection = None
except Exception as e:
logging.error(f"Failed to create or get collection: {str(e)}")
self.app = None
self.collection = None
raise Exception("Vector Database Client not initialized")
except Exception:
raise Exception("Failed to create or get collection")
def reset(self):
if not CHROMADB_AVAILABLE:
logging.warning("Cannot reset knowledge storage as chromadb is not available.")
return
try:
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
if not self.app:
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
base_path = os.path.join(db_storage_path(), KNOWLEDGE_DIRECTORY)
if not self.app:
self.app = chromadb.PersistentClient(
path=base_path,
settings=Settings(allow_reset=True),
)
self.app.reset()
shutil.rmtree(base_path)
except Exception as e:
logging.error(f"Error during knowledge reset: {str(e)}")
finally:
self.app = None
self.collection = None
self.app.reset()
shutil.rmtree(base_path)
self.app = None
self.collection = None
def save(
self,
documents: List[str],
metadata: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
):
if not CHROMADB_AVAILABLE:
logging.warning("Cannot save to knowledge storage as chromadb is not available.")
return
if not self.collection:
logging.warning("Collection not initialized")
return
raise Exception("Collection not initialized")
try:
# Create a dictionary to store unique documents
@@ -206,46 +154,38 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filtered_ids.append(doc_id)
# If we have no metadata at all, set it to None
final_metadata = None
if not all(m is None for m in filtered_metadata):
final_metadata = filtered_metadata
final_metadata: Optional[OneOrMany[chromadb.Metadata]] = (
None if all(m is None for m in filtered_metadata) else filtered_metadata
)
self.collection.upsert(
documents=filtered_docs,
metadatas=final_metadata,
ids=filtered_ids,
)
except chromadb.errors.InvalidDimensionException as e:
Logger(verbose=True).log(
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
raise ValueError(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
) from e
except Exception as e:
if hasattr(chromadb, 'errors') and isinstance(e, chromadb.errors.InvalidDimensionException):
Logger(verbose=True).log(
"error",
"Embedding dimension mismatch. This usually happens when mixing different embedding models. Try resetting the collection using `crewai reset-memories -a`",
"red",
)
logging.error(
"Embedding dimension mismatch. Make sure you're using the same embedding model "
"across all operations with this collection."
"Try resetting the collection using `crewai reset-memories -a`"
)
else:
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red")
logging.error(f"Failed to upsert documents: {e}")
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red")
raise
def _create_default_embedding_function(self):
if not CHROMADB_AVAILABLE:
return None
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except (ImportError, AttributeError) as e:
logging.warning(f"Failed to create default embedding function: {str(e)}")
return None
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
def _set_embedder_config(self, embedder: Optional[Dict[str, Any]] = None) -> None:
"""Set the embedding configuration for the knowledge storage.
@@ -254,12 +194,8 @@ class KnowledgeStorage(BaseKnowledgeStorage):
embedder_config (Optional[Dict[str, Any]]): Configuration dictionary for the embedder.
If None or empty, defaults to the default embedding function.
"""
try:
self.embedder = (
EmbeddingConfigurator().configure_embedder(embedder)
if embedder
else self._create_default_embedding_function()
)
except Exception as e:
logging.warning(f"Failed to configure embedder: {str(e)}")
self.embedder = None
self.embedder = (
EmbeddingConfigurator().configure_embedder(embedder)
if embedder
else self._create_default_embedding_function()
)

View File

@@ -114,6 +114,60 @@ LLM_CONTEXT_WINDOW_SIZES = {
"Llama-3.2-11B-Vision-Instruct": 16384,
"Meta-Llama-3.2-3B-Instruct": 4096,
"Meta-Llama-3.2-1B-Instruct": 16384,
# bedrock
"us.amazon.nova-pro-v1:0": 300000,
"us.amazon.nova-micro-v1:0": 128000,
"us.amazon.nova-lite-v1:0": 300000,
"us.anthropic.claude-3-5-sonnet-20240620-v1:0": 200000,
"us.anthropic.claude-3-5-haiku-20241022-v1:0": 200000,
"us.anthropic.claude-3-5-sonnet-20241022-v2:0": 200000,
"us.anthropic.claude-3-7-sonnet-20250219-v1:0": 200000,
"us.anthropic.claude-3-sonnet-20240229-v1:0": 200000,
"us.anthropic.claude-3-opus-20240229-v1:0": 200000,
"us.anthropic.claude-3-haiku-20240307-v1:0": 200000,
"us.meta.llama3-2-11b-instruct-v1:0": 128000,
"us.meta.llama3-2-3b-instruct-v1:0": 131000,
"us.meta.llama3-2-90b-instruct-v1:0": 128000,
"us.meta.llama3-2-1b-instruct-v1:0": 131000,
"us.meta.llama3-1-8b-instruct-v1:0": 128000,
"us.meta.llama3-1-70b-instruct-v1:0": 128000,
"us.meta.llama3-3-70b-instruct-v1:0": 128000,
"us.meta.llama3-1-405b-instruct-v1:0": 128000,
"eu.anthropic.claude-3-5-sonnet-20240620-v1:0": 200000,
"eu.anthropic.claude-3-sonnet-20240229-v1:0": 200000,
"eu.anthropic.claude-3-haiku-20240307-v1:0": 200000,
"eu.meta.llama3-2-3b-instruct-v1:0": 131000,
"eu.meta.llama3-2-1b-instruct-v1:0": 131000,
"apac.anthropic.claude-3-5-sonnet-20240620-v1:0": 200000,
"apac.anthropic.claude-3-5-sonnet-20241022-v2:0": 200000,
"apac.anthropic.claude-3-sonnet-20240229-v1:0": 200000,
"apac.anthropic.claude-3-haiku-20240307-v1:0": 200000,
"amazon.nova-pro-v1:0": 300000,
"amazon.nova-micro-v1:0": 128000,
"amazon.nova-lite-v1:0": 300000,
"anthropic.claude-3-5-sonnet-20240620-v1:0": 200000,
"anthropic.claude-3-5-haiku-20241022-v1:0": 200000,
"anthropic.claude-3-5-sonnet-20241022-v2:0": 200000,
"anthropic.claude-3-7-sonnet-20250219-v1:0": 200000,
"anthropic.claude-3-sonnet-20240229-v1:0": 200000,
"anthropic.claude-3-opus-20240229-v1:0": 200000,
"anthropic.claude-3-haiku-20240307-v1:0": 200000,
"anthropic.claude-v2:1": 200000,
"anthropic.claude-v2": 100000,
"anthropic.claude-instant-v1": 100000,
"meta.llama3-1-405b-instruct-v1:0": 128000,
"meta.llama3-1-70b-instruct-v1:0": 128000,
"meta.llama3-1-8b-instruct-v1:0": 128000,
"meta.llama3-70b-instruct-v1:0": 8000,
"meta.llama3-8b-instruct-v1:0": 8000,
"amazon.titan-text-lite-v1": 4000,
"amazon.titan-text-express-v1": 8000,
"cohere.command-text-v14": 4000,
"ai21.j2-mid-v1": 8191,
"ai21.j2-ultra-v1": 8191,
"ai21.jamba-instruct-v1:0": 256000,
"mistral.mistral-7b-instruct-v0:2": 32000,
"mistral.mixtral-8x7b-instruct-v0:1": 32000,
# mistral
"mistral-tiny": 32768,
"mistral-small-latest": 32768,

View File

@@ -1,7 +1,7 @@
import os
from typing import Any, Dict, List
from mem0 import MemoryClient
from mem0 import Memory, MemoryClient
from crewai.memory.storage.interface import Storage
@@ -32,13 +32,16 @@ class Mem0Storage(Storage):
mem0_org_id = config.get("org_id")
mem0_project_id = config.get("project_id")
# Initialize MemoryClient with available parameters
if mem0_org_id and mem0_project_id:
self.memory = MemoryClient(
api_key=mem0_api_key, org_id=mem0_org_id, project_id=mem0_project_id
)
# Initialize MemoryClient or Memory based on the presence of the mem0_api_key
if mem0_api_key:
if mem0_org_id and mem0_project_id:
self.memory = MemoryClient(
api_key=mem0_api_key, org_id=mem0_org_id, project_id=mem0_project_id
)
else:
self.memory = MemoryClient(api_key=mem0_api_key)
else:
self.memory = MemoryClient(api_key=mem0_api_key)
self.memory = Memory() # Fallback to Memory if no Mem0 API key is provided
def _sanitize_role(self, role: str) -> str:
"""

View File

@@ -60,31 +60,25 @@ class RAGStorage(BaseRAGStorage):
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
self.app = chroma_client
try:
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
)
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
)
except (ImportError, AttributeError) as e:
import logging
logging.warning(f"Failed to initialize chromadb: {str(e)}. Memory functionality will be limited.")
self.app = None
self.collection = None
def _sanitize_role(self, role: str) -> str:
"""
@@ -109,9 +103,6 @@ class RAGStorage(BaseRAGStorage):
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
if self.app is None or self.collection is None:
logging.warning("Cannot save to memory as chromadb is not available.")
return
try:
self._generate_embedding(value, metadata)
except Exception as e:
@@ -124,12 +115,8 @@ class RAGStorage(BaseRAGStorage):
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Any]:
if not hasattr(self, "app") or not hasattr(self, "collection"):
if not hasattr(self, "app"):
self._initialize_app()
if self.app is None or self.collection is None:
logging.warning("Cannot search memory as chromadb is not available.")
return []
try:
with suppress_logging():
@@ -154,10 +141,6 @@ class RAGStorage(BaseRAGStorage):
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None: # type: ignore
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
if self.app is None or self.collection is None:
logging.warning("Cannot generate embeddings as chromadb is not available.")
return
self.collection.add(
documents=[text],
@@ -177,7 +160,15 @@ class RAGStorage(BaseRAGStorage):
# Ignore this specific error
pass
else:
logging.error(f"An error occurred while resetting the {self.type} memory: {e}")
# Don't raise exception to prevent crashes
self.app = None
self.collection = None
raise Exception(
f"An error occurred while resetting the {self.type} memory: {e}"
)
def _create_default_embedding_function(self):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)

View File

@@ -2,6 +2,7 @@ import datetime
import inspect
import json
import logging
import re
import threading
import uuid
from concurrent.futures import Future
@@ -49,6 +50,7 @@ from crewai.utilities.events import (
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import interpolate_only
class Task(BaseModel):
@@ -507,7 +509,9 @@ class Task(BaseModel):
return
try:
self.description = self._original_description.format(**inputs)
self.description = interpolate_only(
input_string=self._original_description, inputs=inputs
)
except KeyError as e:
raise ValueError(
f"Missing required template variable '{e.args[0]}' in description"
@@ -516,7 +520,7 @@ class Task(BaseModel):
raise ValueError(f"Error interpolating description: {str(e)}") from e
try:
self.expected_output = self.interpolate_only(
self.expected_output = interpolate_only(
input_string=self._original_expected_output, inputs=inputs
)
except (KeyError, ValueError) as e:
@@ -524,7 +528,7 @@ class Task(BaseModel):
if self.output_file is not None:
try:
self.output_file = self.interpolate_only(
self.output_file = interpolate_only(
input_string=self._original_output_file, inputs=inputs
)
except (KeyError, ValueError) as e:
@@ -555,72 +559,6 @@ class Task(BaseModel):
f"\n\n{conversation_instruction}\n\n{conversation_history}"
)
def interpolate_only(
self,
input_string: Optional[str],
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]],
) -> str:
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
Args:
input_string: The string containing template variables to interpolate.
Can be None or empty, in which case an empty string is returned.
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, floats, and dicts/lists
containing only these types and other nested dicts/lists.
Returns:
The interpolated string with all template variables replaced with their values.
Empty string if input_string is None or empty.
Raises:
ValueError: If a value contains unsupported types
"""
# Validation function for recursive type checking
def validate_type(value: Any) -> None:
if value is None:
return
if isinstance(value, (str, int, float, bool)):
return
if isinstance(value, (dict, list)):
for item in value.values() if isinstance(value, dict) else value:
validate_type(item)
return
raise ValueError(
f"Unsupported type {type(value).__name__} in inputs. "
"Only str, int, float, bool, dict, and list are allowed."
)
# Validate all input values
for key, value in inputs.items():
try:
validate_type(value)
except ValueError as e:
raise ValueError(f"Invalid value for key '{key}': {str(e)}") from e
if input_string is None or not input_string:
return ""
if "{" not in input_string and "}" not in input_string:
return input_string
if not inputs:
raise ValueError(
"Inputs dictionary cannot be empty when interpolating variables"
)
try:
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
for key in inputs.keys():
escaped_string = escaped_string.replace(f"{{{{{key}}}}}", f"{{{key}}}")
return escaped_string.format(**inputs)
except KeyError as e:
raise KeyError(
f"Template variable '{e.args[0]}' not found in inputs dictionary"
) from e
except ValueError as e:
raise ValueError(f"Error during string interpolation: {str(e)}") from e
def increment_tools_errors(self) -> None:
"""Increment the tools errors counter."""
self.tools_errors += 1

View File

@@ -281,8 +281,16 @@ class Telemetry:
return self._safe_telemetry_operation(operation)
def task_ended(self, span: Span, task: Task, crew: Crew):
"""Records task execution in a crew."""
"""Records the completion of a task execution in a crew.
Args:
span (Span): The OpenTelemetry span tracking the task execution
task (Task): The task that was completed
crew (Crew): The crew context in which the task was executed
Note:
If share_crew is enabled, this will also record the task output
"""
def operation():
if crew.share_crew:
self._add_attribute(
@@ -297,8 +305,13 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int):
"""Records the repeated usage 'error' of a tool by an agent."""
"""Records when a tool is used repeatedly, which might indicate an issue.
Args:
llm (Any): The language model being used
tool_name (str): Name of the tool being repeatedly used
attempts (int): Number of attempts made with this tool
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Repeated Usage")
@@ -317,8 +330,13 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def tool_usage(self, llm: Any, tool_name: str, attempts: int):
"""Records the usage of a tool by an agent."""
"""Records the usage of a tool by an agent.
Args:
llm (Any): The language model being used
tool_name (str): Name of the tool being used
attempts (int): Number of attempts made with this tool
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage")
@@ -337,8 +355,11 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def tool_usage_error(self, llm: Any):
"""Records the usage of a tool by an agent."""
"""Records when a tool usage results in an error.
Args:
llm (Any): The language model being used when the error occurred
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage Error")
@@ -357,6 +378,14 @@ class Telemetry:
def individual_test_result_span(
self, crew: Crew, quality: float, exec_time: int, model_name: str
):
"""Records individual test results for a crew execution.
Args:
crew (Crew): The crew being tested
quality (float): Quality score of the execution
exec_time (int): Execution time in seconds
model_name (str): Name of the model used
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Individual Test Result")
@@ -383,6 +412,14 @@ class Telemetry:
inputs: dict[str, Any] | None,
model_name: str,
):
"""Records the execution of a test suite for a crew.
Args:
crew (Crew): The crew being tested
iterations (int): Number of test iterations
inputs (dict[str, Any] | None): Input parameters for the test
model_name (str): Name of the model used in testing
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Test Execution")
@@ -408,6 +445,7 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def deploy_signup_error_span(self):
"""Records when an error occurs during the deployment signup process."""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Deploy Signup Error")
@@ -417,6 +455,11 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def start_deployment_span(self, uuid: Optional[str] = None):
"""Records the start of a deployment process.
Args:
uuid (Optional[str]): Unique identifier for the deployment
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Start Deployment")
@@ -428,6 +471,7 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def create_crew_deployment_span(self):
"""Records the creation of a new crew deployment."""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Create Crew Deployment")
@@ -437,6 +481,12 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def get_crew_logs_span(self, uuid: Optional[str], log_type: str = "deployment"):
"""Records the retrieval of crew logs.
Args:
uuid (Optional[str]): Unique identifier for the crew
log_type (str, optional): Type of logs being retrieved. Defaults to "deployment".
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Get Crew Logs")
@@ -449,6 +499,11 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def remove_crew_span(self, uuid: Optional[str] = None):
"""Records the removal of a crew.
Args:
uuid (Optional[str]): Unique identifier for the crew being removed
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Remove Crew")
@@ -574,6 +629,11 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def flow_creation_span(self, flow_name: str):
"""Records the creation of a new flow.
Args:
flow_name (str): Name of the flow being created
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Creation")
@@ -584,6 +644,12 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def flow_plotting_span(self, flow_name: str, node_names: list[str]):
"""Records flow visualization/plotting activity.
Args:
flow_name (str): Name of the flow being plotted
node_names (list[str]): List of node names in the flow
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Plotting")
@@ -595,6 +661,12 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def flow_execution_span(self, flow_name: str, node_names: list[str]):
"""Records the execution of a flow.
Args:
flow_name (str): Name of the flow being executed
node_names (list[str]): List of nodes being executed in the flow
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Execution")

View File

@@ -455,7 +455,7 @@ class ToolUsage:
# Attempt 4: Repair JSON
try:
repaired_input = repair_json(tool_input)
repaired_input = repair_json(tool_input, skip_json_loads=True)
self._printer.print(
content=f"Repaired JSON: {repaired_input}", color="blue"
)

View File

@@ -1,40 +1,8 @@
import os
import warnings
from typing import Any, Callable, Dict, List, Optional, Union, cast
from typing import Any, Dict, Optional, cast
# Initialize with None to indicate module import status
CHROMADB_AVAILABLE = False
# Define placeholder types for when chromadb is not available
class EmbeddingFunction:
def __call__(self, texts):
raise NotImplementedError("Chromadb is not available")
Documents = List[str]
Embeddings = List[List[float]]
def validate_embedding_function(func):
return func
# Try to import chromadb-related modules with proper error handling
try:
from chromadb.api.types import Documents as ChromaDocuments
from chromadb.api.types import EmbeddingFunction as ChromaEmbeddingFunction
from chromadb.api.types import Embeddings as ChromaEmbeddings
from chromadb.utils import (
validate_embedding_function as chroma_validate_embedding_function,
)
# Override our placeholder types with the real ones
Documents = ChromaDocuments
EmbeddingFunction = ChromaEmbeddingFunction
Embeddings = ChromaEmbeddings
validate_embedding_function = chroma_validate_embedding_function
CHROMADB_AVAILABLE = True
except (ImportError, AttributeError) as e:
# This captures both ImportError and AttributeError (which can happen with NumPy 2.x)
warnings.warn(f"Failed to import chromadb: {str(e)}. Embedding functionality will be limited.")
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
class EmbeddingConfigurator:
@@ -58,9 +26,6 @@ class EmbeddingConfigurator:
embedder_config: Optional[Dict[str, Any]] = None,
) -> EmbeddingFunction:
"""Configures and returns an embedding function based on the provided config."""
if not CHROMADB_AVAILABLE:
return self._create_unavailable_embedding_function()
if embedder_config is None:
return self._create_default_embedding_function()
@@ -79,230 +44,143 @@ class EmbeddingConfigurator:
if provider == "custom"
else embedding_function(config, model_name)
)
@staticmethod
def _create_unavailable_embedding_function():
"""Creates a fallback embedding function when chromadb is not available."""
class UnavailableEmbeddingFunction(EmbeddingFunction):
def __call__(self, input):
raise ImportError(
"Chromadb is not available due to NumPy compatibility issues. "
"Either downgrade to NumPy<2 or upgrade chromadb and related dependencies."
)
return UnavailableEmbeddingFunction()
@staticmethod
def _create_default_embedding_function():
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
except (ImportError, AttributeError) as e:
import warnings
warnings.warn(f"Failed to import OpenAIEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
@staticmethod
def _configure_openai(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import OpenAIEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
@staticmethod
def _configure_azure(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key"),
api_base=config.get("api_base"),
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import OpenAIEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
return OpenAIEmbeddingFunction(
api_key=config.get("api_key"),
api_base=config.get("api_base"),
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
@staticmethod
def _configure_ollama(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import OllamaEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
@staticmethod
def _configure_vertexai(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleVertexEmbeddingFunction,
)
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleVertexEmbeddingFunction,
)
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import GoogleVertexEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
@staticmethod
def _configure_google(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
)
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
)
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import GoogleGenerativeAiEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
@staticmethod
def _configure_cohere(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import CohereEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
@staticmethod
def _configure_voyageai(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.voyageai_embedding_function import (
VoyageAIEmbeddingFunction,
)
from chromadb.utils.embedding_functions.voyageai_embedding_function import (
VoyageAIEmbeddingFunction,
)
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import VoyageAIEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
@staticmethod
def _configure_bedrock(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import AmazonBedrockEmbeddingFunction: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
@staticmethod
def _configure_huggingface(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
except (ImportError, AttributeError) as e:
warnings.warn(f"Failed to import HuggingFaceEmbeddingServer: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
@staticmethod
def _configure_watson(config, model_name):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
try:
import ibm_watsonx_ai.foundation_models as watson_models
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames as EmbedParams
except ImportError as e:
warnings.warn(
raise ImportError(
"IBM Watson dependencies are not installed. Please install them to use Watson embedding."
)
return EmbeddingConfigurator._create_unavailable_embedding_function()
) from e
class WatsonEmbeddingFunction(EmbeddingFunction):
def __call__(self, input: Documents) -> Embeddings:
@@ -334,30 +212,25 @@ class EmbeddingConfigurator:
@staticmethod
def _configure_custom(config):
if not CHROMADB_AVAILABLE:
return EmbeddingConfigurator._create_unavailable_embedding_function()
custom_embedder = config.get("embedder")
if isinstance(custom_embedder, EmbeddingFunction):
try:
validate_embedding_function(custom_embedder)
return custom_embedder
except Exception as e:
warnings.warn(f"Invalid custom embedding function: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
raise ValueError(f"Invalid custom embedding function: {str(e)}")
elif callable(custom_embedder):
try:
instance = custom_embedder()
if isinstance(instance, EmbeddingFunction):
validate_embedding_function(instance)
return instance
warnings.warn("Custom embedder does not create an EmbeddingFunction instance")
return EmbeddingConfigurator._create_unavailable_embedding_function()
raise ValueError(
"Custom embedder does not create an EmbeddingFunction instance"
)
except Exception as e:
warnings.warn(f"Error instantiating custom embedder: {str(e)}")
return EmbeddingConfigurator._create_unavailable_embedding_function()
raise ValueError(f"Error instantiating custom embedder: {str(e)}")
else:
warnings.warn(
raise ValueError(
"Custom embedder must be an instance of `EmbeddingFunction` or a callable that creates one"
)
return EmbeddingConfigurator._create_unavailable_embedding_function()

View File

@@ -1,10 +1,12 @@
from typing import List
import re
from typing import TYPE_CHECKING, List
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
if TYPE_CHECKING:
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
def aggregate_raw_outputs_from_task_outputs(task_outputs: List[TaskOutput]) -> str:
def aggregate_raw_outputs_from_task_outputs(task_outputs: List["TaskOutput"]) -> str:
"""Generate string context from the task outputs."""
dividers = "\n\n----------\n\n"
@@ -13,7 +15,7 @@ def aggregate_raw_outputs_from_task_outputs(task_outputs: List[TaskOutput]) -> s
return context
def aggregate_raw_outputs_from_tasks(tasks: List[Task]) -> str:
def aggregate_raw_outputs_from_tasks(tasks: List["Task"]) -> str:
"""Generate string context from the tasks."""
task_outputs = [task.output for task in tasks if task.output is not None]

View File

@@ -96,6 +96,10 @@ class CrewPlanner:
tasks_summary = []
for idx, task in enumerate(self.tasks):
knowledge_list = self._get_agent_knowledge(task)
agent_tools = (
f"[{', '.join(str(tool) for tool in task.agent.tools)}]" if task.agent and task.agent.tools else '"agent has no tools"',
f',\n "agent_knowledge": "[\\"{knowledge_list[0]}\\"]"' if knowledge_list and str(knowledge_list) != "None" else ""
)
task_summary = f"""
Task Number {idx + 1} - {task.description}
"task_description": {task.description}
@@ -103,10 +107,7 @@ class CrewPlanner:
"agent": {task.agent.role if task.agent else "None"}
"agent_goal": {task.agent.goal if task.agent else "None"}
"task_tools": {task.tools}
"agent_tools": %s%s""" % (
f"[{', '.join(str(tool) for tool in task.agent.tools)}]" if task.agent and task.agent.tools else '"agent has no tools"',
f',\n "agent_knowledge": "[\\"{knowledge_list[0]}\\"]"' if knowledge_list and str(knowledge_list) != "None" else ""
)
"agent_tools": {"".join(agent_tools)}"""
tasks_summary.append(task_summary)
return " ".join(tasks_summary)

View File

@@ -0,0 +1,82 @@
import re
from typing import Any, Dict, List, Optional, Union
def interpolate_only(
input_string: Optional[str],
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]],
) -> str:
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
Only interpolates placeholders that follow the pattern {variable_name} where
variable_name starts with a letter/underscore and contains only letters, numbers, and underscores.
Args:
input_string: The string containing template variables to interpolate.
Can be None or empty, in which case an empty string is returned.
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, floats, and dicts/lists
containing only these types and other nested dicts/lists.
Returns:
The interpolated string with all template variables replaced with their values.
Empty string if input_string is None or empty.
Raises:
ValueError: If a value contains unsupported types or a template variable is missing
"""
# Validation function for recursive type checking
def validate_type(value: Any) -> None:
if value is None:
return
if isinstance(value, (str, int, float, bool)):
return
if isinstance(value, (dict, list)):
for item in value.values() if isinstance(value, dict) else value:
validate_type(item)
return
raise ValueError(
f"Unsupported type {type(value).__name__} in inputs. "
"Only str, int, float, bool, dict, and list are allowed."
)
# Validate all input values
for key, value in inputs.items():
try:
validate_type(value)
except ValueError as e:
raise ValueError(f"Invalid value for key '{key}': {str(e)}") from e
if input_string is None or not input_string:
return ""
if "{" not in input_string and "}" not in input_string:
return input_string
if not inputs:
raise ValueError(
"Inputs dictionary cannot be empty when interpolating variables"
)
# The regex pattern to find valid variable placeholders
# Matches {variable_name} where variable_name starts with a letter/underscore
# and contains only letters, numbers, and underscores
pattern = r"\{([A-Za-z_][A-Za-z0-9_]*)\}"
# Find all matching variables in the input string
variables = re.findall(pattern, input_string)
result = input_string
# Check if all variables exist in inputs
missing_vars = [var for var in variables if var not in inputs]
if missing_vars:
raise KeyError(
f"Template variable '{missing_vars[0]}' not found in inputs dictionary"
)
# Replace each variable with its value
for var in variables:
if var in inputs:
placeholder = "{" + var + "}"
value = str(inputs[var])
result = result.replace(placeholder, value)
return result

View File

@@ -15,6 +15,7 @@ from crewai import Agent, Crew, Process, Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.converter import Converter
from crewai.utilities.string_utils import interpolate_only
def test_task_tool_reflect_agent_tools():
@@ -822,7 +823,7 @@ def test_interpolate_only():
# Test JSON structure preservation
json_string = '{"info": "Look at {placeholder}", "nested": {"val": "{nestedVal}"}}'
result = task.interpolate_only(
result = interpolate_only(
input_string=json_string,
inputs={"placeholder": "the data", "nestedVal": "something else"},
)
@@ -833,20 +834,18 @@ def test_interpolate_only():
# Test normal string interpolation
normal_string = "Hello {name}, welcome to {place}!"
result = task.interpolate_only(
result = interpolate_only(
input_string=normal_string, inputs={"name": "John", "place": "CrewAI"}
)
assert result == "Hello John, welcome to CrewAI!"
# Test empty string
result = task.interpolate_only(input_string="", inputs={"unused": "value"})
result = interpolate_only(input_string="", inputs={"unused": "value"})
assert result == ""
# Test string with no placeholders
no_placeholders = "Hello, this is a test"
result = task.interpolate_only(
input_string=no_placeholders, inputs={"unused": "value"}
)
result = interpolate_only(input_string=no_placeholders, inputs={"unused": "value"})
assert result == no_placeholders
@@ -858,7 +857,7 @@ def test_interpolate_only_with_dict_inside_expected_output():
)
json_string = '{"questions": {"main_question": "What is the user\'s name?", "secondary_question": "What is the user\'s age?"}}'
result = task.interpolate_only(
result = interpolate_only(
input_string=json_string,
inputs={
"questions": {
@@ -872,18 +871,16 @@ def test_interpolate_only_with_dict_inside_expected_output():
assert result == json_string
normal_string = "Hello {name}, welcome to {place}!"
result = task.interpolate_only(
result = interpolate_only(
input_string=normal_string, inputs={"name": "John", "place": "CrewAI"}
)
assert result == "Hello John, welcome to CrewAI!"
result = task.interpolate_only(input_string="", inputs={"unused": "value"})
result = interpolate_only(input_string="", inputs={"unused": "value"})
assert result == ""
no_placeholders = "Hello, this is a test"
result = task.interpolate_only(
input_string=no_placeholders, inputs={"unused": "value"}
)
result = interpolate_only(input_string=no_placeholders, inputs={"unused": "value"})
assert result == no_placeholders
@@ -1085,12 +1082,12 @@ def test_interpolate_with_list_of_strings():
# Test simple list of strings
input_str = "Available items: {items}"
inputs = {"items": ["apple", "banana", "cherry"]}
result = task.interpolate_only(input_str, inputs)
result = interpolate_only(input_str, inputs)
assert result == f"Available items: {inputs['items']}"
# Test empty list
empty_list_input = {"items": []}
result = task.interpolate_only(input_str, empty_list_input)
result = interpolate_only(input_str, empty_list_input)
assert result == "Available items: []"
@@ -1106,7 +1103,7 @@ def test_interpolate_with_list_of_dicts():
{"name": "Bob", "age": 25, "skills": ["Java", "Cloud"]},
]
}
result = task.interpolate_only("{people}", input_data)
result = interpolate_only("{people}", input_data)
parsed_result = eval(result)
assert isinstance(parsed_result, list)
@@ -1138,7 +1135,7 @@ def test_interpolate_with_nested_structures():
],
}
}
result = task.interpolate_only("{company}", input_data)
result = interpolate_only("{company}", input_data)
parsed = eval(result)
assert parsed["name"] == "TechCorp"
@@ -1161,7 +1158,7 @@ def test_interpolate_with_special_characters():
"empty": "",
}
}
result = task.interpolate_only("{special_data}", input_data)
result = interpolate_only("{special_data}", input_data)
parsed = eval(result)
assert parsed["quotes"] == """This has "double" and 'single' quotes"""
@@ -1188,7 +1185,7 @@ def test_interpolate_mixed_types():
},
}
}
result = task.interpolate_only("{data}", input_data)
result = interpolate_only("{data}", input_data)
parsed = eval(result)
assert parsed["name"] == "Test Dataset"
@@ -1216,7 +1213,7 @@ def test_interpolate_complex_combination():
},
]
}
result = task.interpolate_only("{report}", input_data)
result = interpolate_only("{report}", input_data)
parsed = eval(result)
assert len(parsed) == 2
@@ -1233,7 +1230,7 @@ def test_interpolate_invalid_type_validation():
# Test with invalid top-level type
with pytest.raises(ValueError) as excinfo:
task.interpolate_only("{data}", {"data": set()}) # type: ignore we are purposely testing this failure
interpolate_only("{data}", {"data": set()}) # type: ignore we are purposely testing this failure
assert "Unsupported type set" in str(excinfo.value)
@@ -1246,7 +1243,7 @@ def test_interpolate_invalid_type_validation():
}
}
with pytest.raises(ValueError) as excinfo:
task.interpolate_only("{data}", {"data": invalid_nested})
interpolate_only("{data}", {"data": invalid_nested})
assert "Unsupported type set" in str(excinfo.value)
@@ -1265,24 +1262,22 @@ def test_interpolate_custom_object_validation():
# Test with custom object at top level
with pytest.raises(ValueError) as excinfo:
task.interpolate_only("{obj}", {"obj": CustomObject(5)}) # type: ignore we are purposely testing this failure
interpolate_only("{obj}", {"obj": CustomObject(5)}) # type: ignore we are purposely testing this failure
assert "Unsupported type CustomObject" in str(excinfo.value)
# Test with nested custom object in dictionary
with pytest.raises(ValueError) as excinfo:
task.interpolate_only(
"{data}", {"data": {"valid": 1, "invalid": CustomObject(5)}}
)
interpolate_only("{data}", {"data": {"valid": 1, "invalid": CustomObject(5)}})
assert "Unsupported type CustomObject" in str(excinfo.value)
# Test with nested custom object in list
with pytest.raises(ValueError) as excinfo:
task.interpolate_only("{data}", {"data": [1, "valid", CustomObject(5)]})
interpolate_only("{data}", {"data": [1, "valid", CustomObject(5)]})
assert "Unsupported type CustomObject" in str(excinfo.value)
# Test with deeply nested custom object
with pytest.raises(ValueError) as excinfo:
task.interpolate_only(
interpolate_only(
"{data}", {"data": {"level1": {"level2": [{"level3": CustomObject(5)}]}}}
)
assert "Unsupported type CustomObject" in str(excinfo.value)
@@ -1306,7 +1301,7 @@ def test_interpolate_valid_complex_types():
}
# Should not raise any errors
result = task.interpolate_only("{data}", {"data": valid_data})
result = interpolate_only("{data}", {"data": valid_data})
parsed = eval(result)
assert parsed["name"] == "Valid Dataset"
assert parsed["stats"]["nested"]["deeper"]["b"] == 2.5
@@ -1319,16 +1314,16 @@ def test_interpolate_edge_cases():
)
# Test empty dict and list
assert task.interpolate_only("{}", {"data": {}}) == "{}"
assert task.interpolate_only("[]", {"data": []}) == "[]"
assert interpolate_only("{}", {"data": {}}) == "{}"
assert interpolate_only("[]", {"data": []}) == "[]"
# Test numeric types
assert task.interpolate_only("{num}", {"num": 42}) == "42"
assert task.interpolate_only("{num}", {"num": 3.14}) == "3.14"
assert interpolate_only("{num}", {"num": 42}) == "42"
assert interpolate_only("{num}", {"num": 3.14}) == "3.14"
# Test boolean values (valid JSON types)
assert task.interpolate_only("{flag}", {"flag": True}) == "True"
assert task.interpolate_only("{flag}", {"flag": False}) == "False"
assert interpolate_only("{flag}", {"flag": True}) == "True"
assert interpolate_only("{flag}", {"flag": False}) == "False"
def test_interpolate_valid_types():
@@ -1346,7 +1341,7 @@ def test_interpolate_valid_types():
"nested": {"flag": True, "empty": None},
}
result = task.interpolate_only("{data}", {"data": valid_data})
result = interpolate_only("{data}", {"data": valid_data})
parsed = eval(result)
assert parsed["active"] is True

View File

@@ -1,64 +0,0 @@
import importlib
import sys
import warnings
import pytest
def test_crew_import_with_numpy():
"""Test that crewai can be imported even with NumPy compatibility issues."""
try:
# Force reload to ensure we test our fix
if "crewai" in sys.modules:
importlib.reload(sys.modules["crewai"])
# This should not raise an exception
from crewai import Crew
assert Crew is not None
except Exception as e:
pytest.fail(f"Failed to import Crew: {e}")
def test_embedding_configurator_with_numpy():
"""Test that EmbeddingConfigurator can be imported with NumPy."""
try:
# Force reload
if "crewai.utilities.embedding_configurator" in sys.modules:
importlib.reload(sys.modules["crewai.utilities.embedding_configurator"])
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
# Test that we can create an embedder (might be unavailable but shouldn't crash)
embedder = configurator.configure_embedder()
assert embedder is not None
except Exception as e:
pytest.fail(f"Failed to use EmbeddingConfigurator: {e}")
def test_rag_storage_with_numpy():
"""Test that RAGStorage can be imported and used with NumPy."""
try:
# Force reload
if "crewai.memory.storage.rag_storage" in sys.modules:
importlib.reload(sys.modules["crewai.memory.storage.rag_storage"])
from crewai.memory.storage.rag_storage import RAGStorage
# Initialize with minimal config to avoid actual DB operations
storage = RAGStorage(type="test", crew=None)
# Just verify we can create the object without errors
assert storage is not None
except Exception as e:
pytest.fail(f"Failed to use RAGStorage: {e}")
def test_knowledge_storage_with_numpy():
"""Test that KnowledgeStorage can be imported and used with NumPy."""
try:
# Force reload
if "crewai.knowledge.storage.knowledge_storage" in sys.modules:
importlib.reload(sys.modules["crewai.knowledge.storage.knowledge_storage"])
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
# Initialize with minimal config
storage = KnowledgeStorage()
# Just verify we can create the object without errors
assert storage is not None
except Exception as e:
pytest.fail(f"Failed to use KnowledgeStorage: {e}")

View File

@@ -0,0 +1,187 @@
from typing import Any, Dict, List, Union
import pytest
from crewai.utilities.string_utils import interpolate_only
class TestInterpolateOnly:
"""Tests for the interpolate_only function in string_utils.py."""
def test_basic_variable_interpolation(self):
"""Test basic variable interpolation works correctly."""
template = "Hello, {name}! Welcome to {company}."
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"name": "Alice",
"company": "CrewAI",
}
result = interpolate_only(template, inputs)
assert result == "Hello, Alice! Welcome to CrewAI."
def test_multiple_occurrences_of_same_variable(self):
"""Test that multiple occurrences of the same variable are replaced."""
template = "{name} is using {name}'s account."
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"name": "Bob"
}
result = interpolate_only(template, inputs)
assert result == "Bob is using Bob's account."
def test_json_structure_preservation(self):
"""Test that JSON structures are preserved and not interpolated incorrectly."""
template = """
Instructions for {agent}:
Please return the following object:
{"name": "person's name", "age": 25, "skills": ["coding", "testing"]}
"""
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"agent": "DevAgent"
}
result = interpolate_only(template, inputs)
assert "Instructions for DevAgent:" in result
assert (
'{"name": "person\'s name", "age": 25, "skills": ["coding", "testing"]}'
in result
)
def test_complex_nested_json(self):
"""Test with complex JSON structures containing curly braces."""
template = """
{agent} needs to process:
{
"config": {
"nested": {
"value": 42
},
"arrays": [1, 2, {"inner": "value"}]
}
}
"""
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"agent": "DataProcessor"
}
result = interpolate_only(template, inputs)
assert "DataProcessor needs to process:" in result
assert '"nested": {' in result
assert '"value": 42' in result
assert '[1, 2, {"inner": "value"}]' in result
def test_missing_variable(self):
"""Test that an error is raised when a required variable is missing."""
template = "Hello, {name}!"
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"not_name": "Alice"
}
with pytest.raises(KeyError) as excinfo:
interpolate_only(template, inputs)
assert "template variable" in str(excinfo.value).lower()
assert "name" in str(excinfo.value)
def test_invalid_input_types(self):
"""Test that an error is raised with invalid input types."""
template = "Hello, {name}!"
# Using Any for this test since we're intentionally testing an invalid type
inputs: Dict[str, Any] = {"name": object()} # Object is not a valid input type
with pytest.raises(ValueError) as excinfo:
interpolate_only(template, inputs)
assert "unsupported type" in str(excinfo.value).lower()
def test_empty_input_string(self):
"""Test handling of empty or None input string."""
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"name": "Alice"
}
assert interpolate_only("", inputs) == ""
assert interpolate_only(None, inputs) == ""
def test_no_variables_in_template(self):
"""Test a template with no variables to replace."""
template = "This is a static string with no variables."
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"name": "Alice"
}
result = interpolate_only(template, inputs)
assert result == template
def test_variable_name_starting_with_underscore(self):
"""Test variables starting with underscore are replaced correctly."""
template = "Variable: {_special_var}"
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"_special_var": "Special Value"
}
result = interpolate_only(template, inputs)
assert result == "Variable: Special Value"
def test_preserves_non_matching_braces(self):
"""Test that non-matching braces patterns are preserved."""
template = (
"This {123} and {!var} should not be replaced but {valid_var} should."
)
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"valid_var": "works"
}
result = interpolate_only(template, inputs)
assert (
result == "This {123} and {!var} should not be replaced but works should."
)
def test_complex_mixed_scenario(self):
"""Test a complex scenario with both valid variables and JSON structures."""
template = """
{agent_name} is working on task {task_id}.
Instructions:
1. Process the data
2. Return results as:
{
"taskId": "{task_id}",
"results": {
"processed_by": "agent_name",
"status": "complete",
"values": [1, 2, 3]
}
}
"""
inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]] = {
"agent_name": "AnalyticsAgent",
"task_id": "T-12345",
}
result = interpolate_only(template, inputs)
assert "AnalyticsAgent is working on task T-12345" in result
assert '"taskId": "T-12345"' in result
assert '"processed_by": "agent_name"' in result # This shouldn't be replaced
assert '"values": [1, 2, 3]' in result
def test_empty_inputs_dictionary(self):
"""Test that an error is raised with empty inputs dictionary."""
template = "Hello, {name}!"
inputs: Dict[str, Any] = {}
with pytest.raises(ValueError) as excinfo:
interpolate_only(template, inputs)
assert "inputs dictionary cannot be empty" in str(excinfo.value).lower()