Compare commits

..

19 Commits

Author SHA1 Message Date
Devin AI
65ce1dbf31 fix: Fix type-checker error in crew_agent_executor.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 11:19:28 +00:00
Devin AI
7ada6daa39 test: Update VCR configuration and test cases
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 11:15:03 +00:00
Devin AI
16ab38f330 test: Add VCR cassettes for tool output formatting tests
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 11:13:54 +00:00
Devin AI
09417f9821 style: Fix import sorting in crew_test.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 11:10:49 +00:00
Devin AI
77fa4a548e test: Update test environment and VCR configuration
- Add mock_openai_key fixture for consistent test API key
- Update VCR configuration to record API interactions
- Update test cases to handle None and whitespace inputs

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 11:06:44 +00:00
Devin AI
e9e998e8b2 test: Update VCR cassette with proper test API key
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 11:02:18 +00:00
Devin AI
4c7fe88ca2 test: Add negative test cases for tool output formatting
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 11:01:37 +00:00
Devin AI
9c2d49d3de fix: Fix indentation in crew_agent_executor.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 11:00:42 +00:00
Devin AI
892e4f6154 style: Fix import sorting in crew_test.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 10:58:12 +00:00
Devin AI
c8bb9561ce refactor: Improve backtick stripping and test coverage
- Extract backtick stripping to _clean_tool_result method
- Add docstrings explaining the backtick cleaning logic
- Add parameterized tests for various backtick scenarios

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 10:56:42 +00:00
Devin AI
4073d9a103 style: Fix import sorting in crew_test.py
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 10:55:31 +00:00
Devin AI
4129a93993 test: Add VCR cassette for hierarchical tool output test
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 10:53:43 +00:00
Devin AI
c830ffab7a fix: Remove extra backticks from hierarchical tool outputs
Issue #2105 - Tool outputs in hierarchical mode were getting extra backticks
appended. This fix:
- Strips trailing backticks from tool results
- Adds test to verify tool output formatting in hierarchical mode

Co-Authored-By: Joe Moura <joao@crewai.com>
2025-02-12 10:53:11 +00:00
Brandon Hancock (bhancock_ai)
47818f4f41 updating bedrock docs (#2088)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-10 12:48:12 -05:00
Brandon Hancock (bhancock_ai)
9b10fd47b0 incorporate Small update in memory.mdx, fixing Google AI parameters #2008 (#2087) 2025-02-10 12:17:41 -05:00
Brandon Hancock (bhancock_ai)
c408368267 fix linting issues in new tests (#2089)
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-10 12:10:53 -05:00
Kevin King
90b3145e92 Updated excel_knowledge_source.py to account for excel files with multiple tabs. (#1921)
* Updated excel_knowledge_source.py to account for excel sheets that have multiple tabs. The old implementation contained a single df=pd.read_excel(excel_file_path), which only reads the first or most recently used excel sheet. The updated functionality reads all sheets in the excel workbook.

* updated load_content() function in excel_knowledge_source.py to reduce memory usage and provide better documentation

* accidentally didn't delete the old load_content() function in last commit - corrected this

* Added an override for the content field from the inheritted BaseFileKnowledgeSource to account for the change in the load_content method to support excel files with multiple tabs/sheets. This change should ensure it passes the type check test, as it failed before since content was assigned a different type in BaseFileKnowledgeSource

* Now removed the commented out imports in _import_dependencies, as requested

* Updated excel_knowledge_source to fix linter errors and type errors. Changed inheritence from basefileknowledgesource to baseknowledgesource because basefileknowledgesource's types conflicted (in particular the load_content function and the content class variable.

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-02-10 08:56:32 -08:00
Nicolas Lorin
fbd0e015d5 doc: use the corresponding source depending on filetype (#2038)
Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com>
2025-02-09 20:25:33 -03:00
Bradley Goodyear
17e25fb842 Fix a typo in the Task Guardrails section (#2043)
Co-authored-by: João Moura <joaomdmoura@gmail.com>
2025-02-09 20:23:52 -03:00
32 changed files with 1649 additions and 724 deletions

View File

@@ -91,7 +91,7 @@ result = crew.kickoff(inputs={"question": "What city does John live in and how o
```
Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including TXT, PDF, DOCX, HTML, and more.
Here's another example with the `CrewDoclingSource`. The CrewDoclingSource is actually quite versatile and can handle multiple file formats including MD, PDF, DOCX, HTML, and more.
<Note>
You need to install `docling` for the following example to work: `uv add docling`
@@ -152,10 +152,10 @@ Here are examples of how to use different types of knowledge sources:
### Text File Knowledge Source
```python
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
# Create a text file knowledge source
text_source = CrewDoclingSource(
text_source = TextFileKnowledgeSource(
file_paths=["document.txt", "another.txt"]
)

View File

@@ -282,6 +282,19 @@ my_crew = Crew(
### Using Google AI embeddings
#### Prerequisites
Before using Google AI embeddings, ensure you have:
- Access to the Gemini API
- The necessary API keys and permissions
You will need to update your *pyproject.toml* dependencies:
```YAML
dependencies = [
"google-generativeai>=0.8.4", #main version in January/2025 - crewai v.0.100.0 and crewai-tools 0.33.0
"crewai[tools]>=0.100.0,<1.0.0"
]
```
```python Code
from crewai import Crew, Agent, Task, Process
@@ -434,6 +447,38 @@ my_crew = Crew(
)
```
### Using Amazon Bedrock embeddings
```python Code
# Note: Ensure you have installed `boto3` for Bedrock embeddings to work.
import os
import boto3
from crewai import Crew, Agent, Task, Process
boto3_session = boto3.Session(
region_name=os.environ.get("AWS_REGION_NAME"),
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY")
)
my_crew = Crew(
agents=[...],
tasks=[...],
process=Process.sequential,
memory=True,
embedder={
"provider": "bedrock",
"config":{
"session": boto3_session,
"model": "amazon.titan-embed-text-v2:0",
"vector_dimension": 1024
}
}
verbose=True
)
```
### Adding Custom Embedding Function
```python Code

View File

@@ -268,7 +268,7 @@ analysis_task = Task(
Task guardrails provide a way to validate and transform task outputs before they
are passed to the next task. This feature helps ensure data quality and provides
efeedback to agents when their output doesn't meet specific criteria.
feedback to agents when their output doesn't meet specific criteria.
### Using Task Guardrails

View File

@@ -1,45 +0,0 @@
# Memory in CrewAI
CrewAI provides a robust memory system that allows agents to retain and recall information from previous interactions.
## Configuring Embedding Providers
CrewAI supports multiple embedding providers for memory functionality:
- OpenAI (default) - Requires `OPENAI_API_KEY`
- Ollama - Requires `CREWAI_OLLAMA_URL` (defaults to "http://localhost:11434/api/embeddings")
### Environment Variables
Configure the embedding provider using these environment variables:
- `CREWAI_EMBEDDING_PROVIDER`: Provider name (default: "openai")
- `CREWAI_EMBEDDING_MODEL`: Model name (default: "text-embedding-3-small")
- `CREWAI_OLLAMA_URL`: URL for Ollama API (when using Ollama provider)
### Example Configuration
```python
# Using OpenAI (default)
os.environ["OPENAI_API_KEY"] = "your-api-key"
# Using Ollama
os.environ["CREWAI_EMBEDDING_PROVIDER"] = "ollama"
os.environ["CREWAI_EMBEDDING_MODEL"] = "llama2" # or any other model supported by your Ollama instance
os.environ["CREWAI_OLLAMA_URL"] = "http://localhost:11434/api/embeddings" # optional, this is the default
```
## Memory Usage
When an agent has memory enabled, it can access and store information from previous interactions:
```python
agent = Agent(
role="Researcher",
goal="Research AI topics",
backstory="You're an AI researcher",
memory=True # Enable memory for this agent
)
```
The memory system uses embeddings to store and retrieve relevant information, allowing agents to maintain context across multiple interactions and tasks.

View File

@@ -1,14 +1,13 @@
import os
import re
import shutil
import subprocess
from typing import Any, Dict, List, Literal, Optional, Union
from typing import Any, Dict, List, Literal, Optional, Sequence, Union
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
from crewai.agents import CacheHandler
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.cli.constants import ENV_VARS, LITELLM_PARAMS
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
@@ -17,10 +16,10 @@ from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -55,13 +54,13 @@ class Agent(BaseAgent):
llm: The language model that will run the agent.
function_calling_llm: The language model that will handle the tool calling for this agent, it overrides the crew function_calling_llm.
max_iter: Maximum number of iterations for an agent to execute a task.
memory: Whether the agent should have memory or not.
max_rpm: Maximum number of requests per minute for the agent execution to be respected.
verbose: Whether the agent execution should be in verbose mode.
allow_delegation: Whether the agent is allowed to delegate tasks to other agents.
tools: Tools at agents disposal
step_callback: Callback to be executed after each step of the agent execution.
knowledge_sources: Knowledge sources for the agent.
embedder: Embedder configuration for the agent.
"""
_times_executed: int = PrivateAttr(default=0)
@@ -71,9 +70,6 @@ class Agent(BaseAgent):
)
agent_ops_agent_name: str = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
agent_ops_agent_id: str = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
cache_handler: InstanceOf[CacheHandler] = Field(
default=None, description="An instance of the CacheHandler class."
)
step_callback: Optional[Any] = Field(
default=None,
description="Callback to be executed after each step of the agent execution.",
@@ -85,7 +81,7 @@ class Agent(BaseAgent):
llm: Union[str, InstanceOf[LLM], Any] = Field(
description="Language model that will run the agent.", default=None
)
function_calling_llm: Optional[Any] = Field(
function_calling_llm: Optional[Union[str, InstanceOf[LLM], Any]] = Field(
description="Language model that will run the agent.", default=None
)
system_template: Optional[str] = Field(
@@ -107,10 +103,6 @@ class Agent(BaseAgent):
default=True,
description="Keep messages under the context window size by summarizing content.",
)
max_iter: int = Field(
default=20,
description="Maximum number of iterations for an agent to execute a task before giving it's best answer",
)
max_retry_limit: int = Field(
default=2,
description="Maximum number of retries for an agent to execute a task when an error occurs.",
@@ -123,105 +115,19 @@ class Agent(BaseAgent):
default="safe",
description="Mode for code execution: 'safe' (using Docker) or 'unsafe' (direct execution).",
)
embedder_config: Optional[Dict[str, Any]] = Field(
embedder: Optional[Dict[str, Any]] = Field(
default=None,
description="Embedder configuration for the agent.",
)
knowledge_sources: Optional[List[BaseKnowledgeSource]] = Field(
default=None,
description="Knowledge sources for the agent.",
)
_knowledge: Optional[Knowledge] = PrivateAttr(
default=None,
)
@model_validator(mode="after")
def post_init_setup(self):
self._set_knowledge()
self.agent_ops_agent_name = self.role
unaccepted_attributes = [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_REGION_NAME",
]
# Handle different cases for self.llm
if isinstance(self.llm, str):
# If it's a string, create an LLM instance
self.llm = LLM(model=self.llm)
elif isinstance(self.llm, LLM):
# If it's already an LLM instance, keep it as is
pass
elif self.llm is None:
# Determine the model name from environment variables or use default
model_name = (
os.environ.get("OPENAI_MODEL_NAME")
or os.environ.get("MODEL")
or "gpt-4o-mini"
)
llm_params = {"model": model_name}
api_base = os.environ.get("OPENAI_API_BASE") or os.environ.get(
"OPENAI_BASE_URL"
)
if api_base:
llm_params["base_url"] = api_base
set_provider = model_name.split("/")[0] if "/" in model_name else "openai"
# Iterate over all environment variables to find matching API keys or use defaults
for provider, env_vars in ENV_VARS.items():
if provider == set_provider:
for env_var in env_vars:
# Check if the environment variable is set
key_name = env_var.get("key_name")
if key_name and key_name not in unaccepted_attributes:
env_value = os.environ.get(key_name)
if env_value:
key_name = key_name.lower()
for pattern in LITELLM_PARAMS:
if pattern in key_name:
key_name = pattern
break
llm_params[key_name] = env_value
# Check for default values if the environment variable is not set
elif env_var.get("default", False):
for key, value in env_var.items():
if key not in ["prompt", "key_name", "default"]:
# Only add default if the key is already set in os.environ
if key in os.environ:
llm_params[key] = value
self.llm = LLM(**llm_params)
else:
# For any other type, attempt to extract relevant attributes
llm_params = {
"model": getattr(self.llm, "model_name", None)
or getattr(self.llm, "deployment_name", None)
or str(self.llm),
"temperature": getattr(self.llm, "temperature", None),
"max_tokens": getattr(self.llm, "max_tokens", None),
"logprobs": getattr(self.llm, "logprobs", None),
"timeout": getattr(self.llm, "timeout", None),
"max_retries": getattr(self.llm, "max_retries", None),
"api_key": getattr(self.llm, "api_key", None),
"base_url": getattr(self.llm, "base_url", None),
"organization": getattr(self.llm, "organization", None),
}
# Remove None values to avoid passing unnecessary parameters
llm_params = {k: v for k, v in llm_params.items() if v is not None}
self.llm = LLM(**llm_params)
# Similar handling for function_calling_llm
if self.function_calling_llm:
if isinstance(self.function_calling_llm, str):
self.function_calling_llm = LLM(model=self.function_calling_llm)
elif not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = LLM(
model=getattr(self.function_calling_llm, "model_name", None)
or getattr(self.function_calling_llm, "deployment_name", None)
or str(self.function_calling_llm)
)
self.llm = create_llm(self.llm)
if self.function_calling_llm and not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = create_llm(self.function_calling_llm)
if not self.agent_executor:
self._setup_agent_executor()
@@ -239,23 +145,16 @@ class Agent(BaseAgent):
def _set_knowledge(self):
try:
if self.knowledge_sources:
knowledge_agent_name = f"{self.role.replace(' ', '_')}"
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"
if isinstance(self.knowledge_sources, list) and all(
isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources
):
# Validate embedding configuration based on provider
from crewai.utilities.constants import DEFAULT_EMBEDDING_PROVIDER
provider = os.getenv("CREWAI_EMBEDDING_PROVIDER", DEFAULT_EMBEDDING_PROVIDER)
if provider == "openai" and not os.getenv("OPENAI_API_KEY"):
raise ValueError("Please provide an OpenAI API key via OPENAI_API_KEY environment variable")
elif provider == "ollama" and not os.getenv("CREWAI_OLLAMA_URL", "http://localhost:11434/api/embeddings"):
raise ValueError("Please provide Ollama URL via CREWAI_OLLAMA_URL environment variable")
self._knowledge = Knowledge(
self.knowledge = Knowledge(
sources=self.knowledge_sources,
embedder_config=self.embedder_config,
embedder=self.embedder,
collection_name=knowledge_agent_name,
storage=self.knowledge_storage or None,
)
except (TypeError, ValueError) as e:
raise ValueError(f"Invalid Knowledge Configuration: {str(e)}")
@@ -289,13 +188,15 @@ class Agent(BaseAgent):
if task.output_json:
# schema = json.dumps(task.output_json, indent=2)
schema = generate_model_description(task.output_json)
task_prompt += "\n" + self.i18n.slice(
"formatted_task_instructions"
).format(output_format=schema)
elif task.output_pydantic:
schema = generate_model_description(task.output_pydantic)
task_prompt += "\n" + self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
task_prompt += "\n" + self.i18n.slice(
"formatted_task_instructions"
).format(output_format=schema)
if context:
task_prompt = self.i18n.slice("task_with_context").format(
@@ -314,8 +215,8 @@ class Agent(BaseAgent):
if memory.strip() != "":
task_prompt += self.i18n.slice("memory").format(memory=memory)
if self._knowledge:
agent_knowledge_snippets = self._knowledge.query([task.prompt()])
if self.knowledge:
agent_knowledge_snippets = self.knowledge.query([task.prompt()])
if agent_knowledge_snippets:
agent_knowledge_context = extract_knowledge_context(
agent_knowledge_snippets
@@ -348,6 +249,9 @@ class Agent(BaseAgent):
}
)["output"]
except Exception as e:
if e.__class__.__module__.startswith("litellm"):
# Do not retry on litellm errors
raise e
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
raise e
@@ -420,13 +324,14 @@ class Agent(BaseAgent):
tools = agent_tools.tools()
return tools
def get_multimodal_tools(self) -> List[Tool]:
def get_multimodal_tools(self) -> Sequence[BaseTool]:
from crewai.tools.agent_tools.add_image_tool import AddImageTool
return [AddImageTool()]
def get_code_execution_tools(self):
try:
from crewai_tools import CodeInterpreterTool
from crewai_tools import CodeInterpreterTool # type: ignore
# Set the unsafe_mode based on the code_execution_mode attribute
unsafe_mode = self.code_execution_mode == "unsafe"

View File

@@ -113,7 +113,7 @@ class BaseAgent(ABC, BaseModel):
description="Enable agent to delegate and ask questions among each other.",
)
tools: Optional[List[Any]] = Field(
default_factory=lambda: [], description="Tools at agents' disposal"
default_factory=list, description="Tools at agents' disposal"
)
max_iter: int = Field(
default=25, description="Maximum iterations for an agent to execute a task"

View File

@@ -366,23 +366,41 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
tool_result = tool_calling.message
return ToolResult(result=tool_result, result_as_answer=False)
else:
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in self.tool_name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in self.tool_name_to_tool_map
]:
valid_names = [name.casefold().strip() for name in self.tool_name_to_tool_map]
tool_name = tool_calling.tool_name.casefold().strip()
tool_name_alt = tool_calling.tool_name.casefold().replace("_", " ")
if tool_name in valid_names or tool_name_alt in valid_names:
tool_result = tool_usage.use(tool_calling, agent_action.text)
tool_result = self._clean_tool_result(tool_result)
tool = self.tool_name_to_tool_map.get(tool_calling.tool_name)
if tool:
return ToolResult(
result=tool_result, result_as_answer=tool.result_as_answer
)
else:
tool_result = self._i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in self.tools]),
return ToolResult(
result=tool_result,
result_as_answer=tool.result_as_answer if tool else False
)
return ToolResult(result=tool_result, result_as_answer=False)
tool_result = self._i18n.errors("wrong_tool_name").format(
tool=tool_calling.tool_name,
tools=", ".join([tool.name.casefold() for tool in self.tools]),
)
return ToolResult(result=tool_result, result_as_answer=False)
def _clean_tool_result(self, tool_result: Any) -> Any:
"""Clean tool result by removing trailing backticks.
This is particularly important in hierarchical mode where tool outputs
might contain markdown formatting that needs to be cleaned up.
Args:
tool_result: The result from a tool execution, can be any type
Returns:
The cleaned result with any trailing backticks removed if it's a string,
otherwise returns the original result unchanged
"""
if isinstance(tool_result, str):
return tool_result.rstrip('`').rstrip('```')
return tool_result
def _summarize_messages(self) -> None:
messages_groups = []

View File

@@ -1,10 +1,12 @@
import asyncio
import json
import re
import uuid
import warnings
from concurrent.futures import Future
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from pydantic import (
UUID4,
@@ -16,6 +18,7 @@ from pydantic import (
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent

View File

@@ -1,9 +1,7 @@
import json
from typing import Any, Callable, Dict, Optional
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from pydantic.main import IncEx
from typing_extensions import Literal
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
@@ -23,45 +21,16 @@ class CrewOutput(BaseModel):
tasks_output: list[TaskOutput] = Field(
description="Output of each task", default=[]
)
token_usage: UsageMetrics = Field(description="Processed token summary", default_factory=UsageMetrics)
token_usage: UsageMetrics = Field(description="Processed token summary", default={})
def model_json(self) -> str:
"""Get the JSON representation of the output."""
if self.tasks_output and self.tasks_output[-1].output_format != OutputFormat.JSON:
@property
def json(self) -> Optional[str]:
if self.tasks_output[-1].output_format != OutputFormat.JSON:
raise ValueError(
"No JSON output found in the final task. Please make sure to set the output_json property in the final task in your crew."
)
return json.dumps(self.json_dict) if self.json_dict else "{}"
def model_dump_json(
self,
*,
indent: Optional[int] = None,
include: Optional[IncEx] = None,
exclude: Optional[IncEx] = None,
context: Optional[Any] = None,
by_alias: bool = False,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
round_trip: bool = False,
warnings: bool | Literal["none", "warn", "error"] = False,
serialize_as_any: bool = False,
) -> str:
"""Override model_dump_json to handle custom JSON output."""
return super().model_dump_json(
indent=indent,
include=include,
exclude=exclude,
context=context,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
round_trip=round_trip,
warnings=warnings,
serialize_as_any=serialize_as_any,
)
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""

View File

@@ -47,7 +47,7 @@ class FastEmbed(BaseEmbedder):
cache_dir=str(cache_dir) if cache_dir else None,
)
def embed_chunks(self, chunks: List[str]) -> np.ndarray:
def embed_chunks(self, chunks: List[str]) -> List[np.ndarray]:
"""
Generate embeddings for a list of text chunks
@@ -55,12 +55,12 @@ class FastEmbed(BaseEmbedder):
chunks: List of text chunks to embed
Returns:
Array of embeddings
List of embeddings
"""
embeddings = list(self.model.embed(chunks))
return np.stack(embeddings)
return embeddings
def embed_texts(self, texts: List[str]) -> np.ndarray:
def embed_texts(self, texts: List[str]) -> List[np.ndarray]:
"""
Generate embeddings for a list of texts
@@ -68,10 +68,10 @@ class FastEmbed(BaseEmbedder):
texts: List of texts to embed
Returns:
Array of embeddings
List of embeddings
"""
embeddings = list(self.model.embed(texts))
return np.stack(embeddings)
return embeddings
def embed_text(self, text: str) -> np.ndarray:
"""

View File

@@ -1,28 +1,138 @@
from pathlib import Path
from typing import Dict, List
from typing import Dict, Iterator, List, Optional, Union
from urllib.parse import urlparse
from crewai.knowledge.source.base_file_knowledge_source import BaseFileKnowledgeSource
from pydantic import Field, field_validator
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
class ExcelKnowledgeSource(BaseFileKnowledgeSource):
class ExcelKnowledgeSource(BaseKnowledgeSource):
"""A knowledge source that stores and queries Excel file content using embeddings."""
def load_content(self) -> Dict[Path, str]:
"""Load and preprocess Excel file content."""
pd = self._import_dependencies()
# override content to be a dict of file paths to sheet names to csv content
_logger: Logger = Logger(verbose=True)
file_path: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default=None,
description="[Deprecated] The path to the file. Use file_paths instead.",
)
file_paths: Optional[Union[Path, List[Path], str, List[str]]] = Field(
default_factory=list, description="The path to the file"
)
chunks: List[str] = Field(default_factory=list)
content: Dict[Path, Dict[str, str]] = Field(default_factory=dict)
safe_file_paths: List[Path] = Field(default_factory=list)
@field_validator("file_path", "file_paths", mode="before")
def validate_file_path(cls, v, info):
"""Validate that at least one of file_path or file_paths is provided."""
# Single check if both are None, O(1) instead of nested conditions
if (
v is None
and info.data.get(
"file_path" if info.field_name == "file_paths" else "file_paths"
)
is None
):
raise ValueError("Either file_path or file_paths must be provided")
return v
def _process_file_paths(self) -> List[Path]:
"""Convert file_path to a list of Path objects."""
if hasattr(self, "file_path") and self.file_path is not None:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
if self.file_paths is None:
raise ValueError("Your source must be provided with a file_paths: []")
# Convert single path to list
path_list: List[Union[Path, str]] = (
[self.file_paths]
if isinstance(self.file_paths, (str, Path))
else list(self.file_paths)
if isinstance(self.file_paths, list)
else []
)
if not path_list:
raise ValueError(
"file_path/file_paths must be a Path, str, or a list of these types"
)
return [self.convert_to_path(path) for path in path_list]
def validate_content(self):
"""Validate the paths."""
for path in self.safe_file_paths:
if not path.exists():
self._logger.log(
"error",
f"File not found: {path}. Try adding sources to the knowledge directory. If it's inside the knowledge directory, use the relative path.",
color="red",
)
raise FileNotFoundError(f"File not found: {path}")
if not path.is_file():
self._logger.log(
"error",
f"Path is not a file: {path}",
color="red",
)
def model_post_init(self, _) -> None:
if self.file_path:
self._logger.log(
"warning",
"The 'file_path' attribute is deprecated and will be removed in a future version. Please use 'file_paths' instead.",
color="yellow",
)
self.file_paths = self.file_path
self.safe_file_paths = self._process_file_paths()
self.validate_content()
self.content = self._load_content()
def _load_content(self) -> Dict[Path, Dict[str, str]]:
"""Load and preprocess Excel file content from multiple sheets.
Each sheet's content is converted to CSV format and stored.
Returns:
Dict[Path, Dict[str, str]]: A mapping of file paths to their respective sheet contents.
Raises:
ImportError: If required dependencies are missing.
FileNotFoundError: If the specified Excel file cannot be opened.
"""
pd = self._import_dependencies()
content_dict = {}
for file_path in self.safe_file_paths:
file_path = self.convert_to_path(file_path)
df = pd.read_excel(file_path)
content = df.to_csv(index=False)
content_dict[file_path] = content
with pd.ExcelFile(file_path) as xl:
sheet_dict = {
str(sheet_name): str(
pd.read_excel(xl, sheet_name).to_csv(index=False)
)
for sheet_name in xl.sheet_names
}
content_dict[file_path] = sheet_dict
return content_dict
def convert_to_path(self, path: Union[Path, str]) -> Path:
"""Convert a path to a Path object."""
return Path(KNOWLEDGE_DIRECTORY + "/" + path) if isinstance(path, str) else path
def _import_dependencies(self):
"""Dynamically import dependencies."""
try:
import openpyxl # noqa
import pandas as pd
return pd
@@ -38,10 +148,14 @@ class ExcelKnowledgeSource(BaseFileKnowledgeSource):
and save the embeddings.
"""
# Convert dictionary values to a single string if content is a dictionary
if isinstance(self.content, dict):
content_str = "\n".join(str(value) for value in self.content.values())
else:
content_str = str(self.content)
# Updated to account for .xlsx workbooks with multiple tabs/sheets
content_str = ""
for value in self.content.values():
if isinstance(value, dict):
for sheet_value in value.values():
content_str += str(sheet_value) + "\n"
else:
content_str += str(value) + "\n"
new_chunks = self._chunk_text(content_str)
self.chunks.extend(new_chunks)

View File

@@ -154,15 +154,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
filtered_ids.append(doc_id)
# If we have no metadata at all, set it to None
final_metadata: Optional[List[Dict[str, Union[str, int, float, bool]]]] = None
if not all(m is None for m in filtered_metadata):
final_metadata = []
for m in filtered_metadata:
if m is not None:
filtered_m = {k: v for k, v in m.items() if isinstance(v, (str, int, float, bool))}
final_metadata.append(filtered_m)
else:
final_metadata.append({"empty": True})
final_metadata: Optional[OneOrMany[chromadb.Metadata]] = (
None if all(m is None for m in filtered_metadata) else filtered_metadata
)
self.collection.upsert(
documents=filtered_docs,

View File

@@ -6,17 +6,12 @@ import shutil
import uuid
from typing import Any, Dict, List, Optional
from chromadb.api import ClientAPI, Collection
from chromadb.api.types import Documents, Embeddings, Metadatas
from chromadb.api import ClientAPI
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path
from crewai.utilities.exceptions.embedding_exceptions import (
EmbeddingConfigurationError,
EmbeddingInitializationError
)
@contextlib.contextmanager
@@ -37,24 +32,15 @@ def suppress_logging(
class RAGStorage(BaseRAGStorage):
"""RAG-based Storage implementation using ChromaDB for vector storage and retrieval.
This class extends BaseRAGStorage to handle embeddings for memory entries,
improving search efficiency through vector similarity.
Attributes:
app: ChromaDB client instance
collection: ChromaDB collection for storing embeddings
type: Type of memory storage
allow_reset: Whether memory reset is allowed
path: Custom storage path for the database
"""
Extends Storage to handle embeddings for memory entries, improving
search efficiency.
"""
app: ClientAPI | None = None
collection: Any = None
def __init__(
self, type: str, allow_reset: bool = True, embedder_config: Dict[str, Any] | None = None, crew: Any = None, path: str | None = None
self, type, allow_reset=True, embedder_config=None, crew=None, path=None
):
super().__init__(type, allow_reset, embedder_config, crew)
agents = crew.agents if crew else []
@@ -64,6 +50,7 @@ class RAGStorage(BaseRAGStorage):
self.storage_file_name = self._build_storage_file_name(type, agents)
self.type = type
self.allow_reset = allow_reset
self.path = path
self._initialize_app()
@@ -72,36 +59,26 @@ class RAGStorage(BaseRAGStorage):
configurator = EmbeddingConfigurator()
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self) -> None:
"""Initialize the ChromaDB client and collection.
Raises:
RuntimeError: If ChromaDB client initialization fails
EmbeddingConfigurationError: If embedding configuration is invalid
EmbeddingInitializationError: If embedding function fails to initialize
"""
def _initialize_app(self):
import chromadb
from chromadb.config import Settings
self._set_embedder_config()
try:
self.app = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
if not self.app:
raise RuntimeError("Failed to initialize ChromaDB client")
chroma_client = chromadb.PersistentClient(
path=self.path if self.path else self.storage_file_name,
settings=Settings(allow_reset=self.allow_reset),
)
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception as e:
raise RuntimeError(f"Failed to initialize ChromaDB: {str(e)}")
self.app = chroma_client
try:
self.collection = self.app.get_collection(
name=self.type, embedding_function=self.embedder_config
)
except Exception:
self.collection = self.app.create_collection(
name=self.type, embedding_function=self.embedder_config
)
def _sanitize_role(self, role: str) -> str:
"""
@@ -124,21 +101,12 @@ class RAGStorage(BaseRAGStorage):
return f"{base_path}/{file_name}"
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
"""Save a value with metadata to the memory storage.
Args:
value: The text content to store
metadata: Additional metadata for the stored content
Raises:
EmbeddingInitializationError: If embedding generation fails
"""
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
try:
self._generate_embedding(value, metadata)
except Exception as e:
raise EmbeddingInitializationError(self.type, str(e))
logging.error(f"Error during {self.type} save: {str(e)}")
def search(
self,
@@ -146,18 +114,7 @@ class RAGStorage(BaseRAGStorage):
limit: int = 3,
filter: Optional[dict] = None,
score_threshold: float = 0.35,
) -> List[Dict[str, Any]]:
"""Search for similar content in memory.
Args:
query: The search query text
limit: Maximum number of results to return
filter: Optional filter criteria
score_threshold: Minimum similarity score threshold
Returns:
List of matching results with metadata and scores
"""
) -> List[Any]:
if not hasattr(self, "app"):
self._initialize_app()
@@ -181,50 +138,37 @@ class RAGStorage(BaseRAGStorage):
logging.error(f"Error during {self.type} search: {str(e)}")
return []
def _generate_embedding(self, text: str, metadata: Optional[Dict[str, Any]] = None) -> Any:
"""Generate and store embeddings for the given text.
Args:
text: The text to generate embeddings for
metadata: Optional additional metadata to store with the embeddings
Returns:
Any: The generated embedding or None if only storing
"""
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> None: # type: ignore
if not hasattr(self, "app") or not hasattr(self, "collection"):
self._initialize_app()
try:
self.collection.add(
documents=[text],
metadatas=[metadata or {}],
ids=[str(uuid.uuid4())],
)
return None
except Exception as e:
raise EmbeddingInitializationError(self.type, f"Failed to generate embedding: {str(e)}")
self.collection.add(
documents=[text],
metadatas=[metadata or {}],
ids=[str(uuid.uuid4())],
)
def reset(self) -> None:
"""Reset the memory storage by clearing the database and removing files.
Raises:
RuntimeError: If memory reset fails and allow_reset is False
EmbeddingConfigurationError: If embedding configuration is invalid during reinitialization
"""
try:
if self.app:
self.app.reset()
storage_path = self.path if self.path else db_storage_path()
db_dir = os.path.join(storage_path, self.type)
if os.path.exists(db_dir):
shutil.rmtree(db_dir)
shutil.rmtree(f"{db_storage_path()}/{self.type}")
self.app = None
self.collection = None
except Exception as e:
if "attempt to write a readonly database" in str(e):
# Ignore this specific error as it's expected in some environments
# Ignore this specific error
pass
else:
if not self.allow_reset:
raise RuntimeError(f"Failed to reset {self.type} memory: {str(e)}")
logging.error(f"Error during {self.type} memory reset: {str(e)}")
raise Exception(
f"An error occurred while resetting the {self.type} memory: {e}"
)
def _create_default_embedding_function(self):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)

View File

@@ -9,13 +9,11 @@ from copy import copy
from hashlib import md5
from pathlib import Path
from typing import (
AbstractSet,
Any,
Callable,
ClassVar,
Dict,
List,
Mapping,
Optional,
Set,
Tuple,
@@ -111,7 +109,7 @@ class Task(BaseModel):
description="Task output, it's final result after being executed", default=None
)
tools: Optional[List[BaseTool]] = Field(
default_factory=lambda: [],
default_factory=list,
description="Tools the agent is limited to use for this task.",
)
id: UUID4 = Field(
@@ -127,7 +125,7 @@ class Task(BaseModel):
description="A converter class used to export structured output",
default=None,
)
processed_by_agents: Set[str] = Field(default_factory=lambda: set())
processed_by_agents: Set[str] = Field(default_factory=set)
guardrail: Optional[Callable[[TaskOutput], Tuple[bool, Any]]] = Field(
default=None,
description="Function to validate task output before proceeding to next task",
@@ -608,56 +606,37 @@ class Task(BaseModel):
self.delegations += 1
def copy(
self,
*,
include: Optional[AbstractSet[int] | AbstractSet[str] | Mapping[int, Any] | Mapping[str, Any]] = None,
exclude: Optional[AbstractSet[int] | AbstractSet[str] | Mapping[int, Any] | Mapping[str, Any]] = None,
update: Optional[Dict[str, Any]] = None,
deep: bool = False,
) -> "Task":
"""Create a copy of the Task."""
exclude_set = {"id", "agent", "context", "tools"}
if exclude:
if isinstance(exclude, (AbstractSet, set)):
exclude_set.update(str(x) for x in exclude)
elif isinstance(exclude, Mapping):
exclude_set.update(str(x) for x in exclude.keys())
copied_task = super().copy(
include=include,
exclude=exclude_set,
update=update,
deep=deep,
)
copied_task.id = uuid.uuid4()
copied_task.agent = None
copied_task.context = None
copied_task.tools = []
return copied_task
def copy_with_agents(
self, agents: List["BaseAgent"], task_mapping: Dict[str, "Task"]
) -> "Task":
"""Create a copy of the Task with agent references."""
copied_task = self.copy()
"""Create a deep copy of the Task."""
exclude = {
"id",
"agent",
"context",
"tools",
}
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
cloned_context = (
[task_mapping[context_task.key] for context_task in self.context]
if self.context
else None
)
def get_agent_by_role(role: str) -> Union["BaseAgent", None]:
return next((agent for agent in agents if agent.role == role), None)
if self.agent:
copied_task.agent = get_agent_by_role(self.agent.role)
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = copy(self.tools) if self.tools else []
if self.context:
copied_task.context = [
task_mapping[context_task.key]
for context_task in self.context
if context_task.key in task_mapping
]
if self.tools:
copied_task.tools = copy(self.tools)
copied_task = Task(
**copied_data,
context=cloned_context,
agent=cloned_agent,
tools=cloned_tools,
)
return copied_task

View File

@@ -1,9 +1,7 @@
import json
from typing import Any, Callable, Dict, Optional
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field, model_validator
from pydantic.main import IncEx
from typing_extensions import Literal
from crewai.tasks.output_format import OutputFormat
@@ -36,8 +34,8 @@ class TaskOutput(BaseModel):
self.summary = f"{excerpt}..."
return self
def model_json(self) -> str:
"""Get the JSON representation of the output."""
@property
def json(self) -> Optional[str]:
if self.output_format != OutputFormat.JSON:
raise ValueError(
"""
@@ -46,37 +44,8 @@ class TaskOutput(BaseModel):
please make sure to set the output_json property for the task
"""
)
return json.dumps(self.json_dict) if self.json_dict else "{}"
def model_dump_json(
self,
*,
indent: Optional[int] = None,
include: Optional[IncEx] = None,
exclude: Optional[IncEx] = None,
context: Optional[Any] = None,
by_alias: bool = False,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
round_trip: bool = False,
warnings: bool | Literal["none", "warn", "error"] = False,
serialize_as_any: bool = False,
) -> str:
"""Override model_dump_json to handle custom JSON output."""
return super().model_dump_json(
indent=indent,
include=include,
exclude=exclude,
context=context,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
round_trip=round_trip,
warnings=warnings,
serialize_as_any=serialize_as_any,
)
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""

View File

@@ -82,12 +82,12 @@ class BaseAgentTool(BaseTool):
available_agents = [agent.role for agent in self.agents]
logger.debug(f"Available agents: {available_agents}")
matching_agents = [
agent = [ # type: ignore # Incompatible types in assignment (expression has type "list[BaseAgent]", variable has type "str | None")
available_agent
for available_agent in self.agents
if self.sanitize_agent_name(available_agent.role) == sanitized_name
]
logger.debug(f"Found {len(matching_agents)} matching agents for role '{sanitized_name}'")
logger.debug(f"Found {len(agent)} matching agents for role '{sanitized_name}'")
except (AttributeError, ValueError) as e:
# Handle specific exceptions that might occur during role name processing
return self.i18n.errors("agent_tool_unexisting_coworker").format(
@@ -97,7 +97,7 @@ class BaseAgentTool(BaseTool):
error=str(e)
)
if not matching_agents:
if not agent:
# No matching agent found after sanitization
return self.i18n.errors("agent_tool_unexisting_coworker").format(
coworkers="\n".join(
@@ -106,19 +106,19 @@ class BaseAgentTool(BaseTool):
error=f"No agent found with role '{sanitized_name}'"
)
selected_agent = matching_agents[0]
agent = agent[0]
try:
task_with_assigned_agent = Task(
description=task,
agent=selected_agent,
expected_output=selected_agent.i18n.slice("manager_request"),
i18n=selected_agent.i18n,
agent=agent,
expected_output=agent.i18n.slice("manager_request"),
i18n=agent.i18n,
)
logger.debug(f"Created task for agent '{self.sanitize_agent_name(selected_agent.role)}': {task}")
return selected_agent.execute_task(task_with_assigned_agent, context)
logger.debug(f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}")
return agent.execute_task(task_with_assigned_agent, context)
except Exception as e:
# Handle task creation or execution errors
return self.i18n.errors("agent_tool_execution_error").format(
agent_role=self.sanitize_agent_name(selected_agent.role),
agent_role=self.sanitize_agent_name(agent.role),
error=str(e)
)

View File

@@ -1,36 +1,40 @@
import warnings
from abc import ABC, abstractmethod
from inspect import signature
from typing import Any, Callable, Dict, Optional, Type, Tuple, get_args, get_origin
from typing import Any, Callable, Type, get_args, get_origin
from pydantic import BaseModel, ConfigDict, Field, create_model, validator
from pydantic.fields import FieldInfo
from pydantic import (
BaseModel,
ConfigDict,
Field,
PydanticDeprecatedSince20,
create_model,
validator,
)
from pydantic import BaseModel as PydanticBaseModel
from crewai.tools.structured_tool import CrewStructuredTool
def _create_model_fields(fields: Dict[str, Tuple[Any, FieldInfo]]) -> Dict[str, Any]:
"""Helper function to create model fields with proper type hints."""
return {name: (annotation, field) for name, (annotation, field) in fields.items()}
# Ignore all "PydanticDeprecatedSince20" warnings globally
warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
class BaseTool(BaseModel, ABC):
"""Base class for all tools."""
class _ArgsSchemaPlaceholder(PydanticBaseModel):
pass
model_config = ConfigDict(arbitrary_types_allowed=True)
func: Optional[Callable] = None
model_config = ConfigDict()
name: str
"""The unique name of the tool that clearly communicates its purpose."""
description: str
"""Used to tell the model how/when/why to use the tool."""
args_schema: Type[PydanticBaseModel] = Field(default=_ArgsSchemaPlaceholder)
args_schema: Type[PydanticBaseModel] = Field(default_factory=_ArgsSchemaPlaceholder)
"""The schema for the arguments that the tool accepts."""
description_updated: bool = False
"""Flag to check if the description has been updated."""
cache_function: Callable = lambda _args=None, _result=None: True
"""Function that will be used to determine if the tool should be cached."""
"""Function that will be used to determine if the tool should be cached, should return a boolean. If None, the tool will be cached."""
result_as_answer: bool = False
"""Flag to check if the tool should be the final agent answer."""
@@ -53,6 +57,7 @@ class BaseTool(BaseModel, ABC):
def model_post_init(self, __context: Any) -> None:
self._generate_description()
super().model_post_init(__context)
def run(
@@ -82,7 +87,50 @@ class BaseTool(BaseModel, ABC):
result_as_answer=self.result_as_answer,
)
def _set_args_schema(self) -> None:
@classmethod
def from_langchain(cls, tool: Any) -> "BaseTool":
"""Create a Tool instance from a CrewStructuredTool.
This method takes a CrewStructuredTool object and converts it into a
Tool instance. It ensures that the provided tool has a callable 'func'
attribute and infers the argument schema if not explicitly provided.
"""
if not hasattr(tool, "func") or not callable(tool.func):
raise ValueError("The provided tool must have a callable 'func' attribute.")
args_schema = getattr(tool, "args_schema", None)
if args_schema is None:
# Infer args_schema from the function signature if not provided
func_signature = signature(tool.func)
annotations = func_signature.parameters
args_fields = {}
for name, param in annotations.items():
if name != "self":
param_annotation = (
param.annotation if param.annotation != param.empty else Any
)
field_info = Field(
default=...,
description="",
)
args_fields[name] = (param_annotation, field_info)
if args_fields:
args_schema = create_model(f"{tool.name}Input", **args_fields)
else:
# Create a default schema with no fields if no parameters are found
args_schema = create_model(
f"{tool.name}Input", __base__=PydanticBaseModel
)
return cls(
name=getattr(tool, "name", "Unnamed Tool"),
description=getattr(tool, "description", ""),
func=tool.func,
args_schema=args_schema,
)
def _set_args_schema(self):
if self.args_schema is None:
class_name = f"{self.__class__.__name__}Schema"
self.args_schema = type(
@@ -97,7 +145,7 @@ class BaseTool(BaseModel, ABC):
},
)
def _generate_description(self) -> None:
def _generate_description(self):
args_schema = {
name: {
"description": field.description,
@@ -131,25 +179,79 @@ class BaseTool(BaseModel, ABC):
class Tool(BaseTool):
"""Tool class that wraps a function."""
"""The function that will be executed when the tool is called."""
func: Callable
model_config = ConfigDict(arbitrary_types_allowed=True)
def __init__(self, **kwargs):
if "func" not in kwargs:
raise ValueError("Tool requires a 'func' argument")
super().__init__(**kwargs)
def _run(self, *args: Any, **kwargs: Any) -> Any:
return self.func(*args, **kwargs)
@classmethod
def from_langchain(cls, tool: Any) -> "Tool":
"""Create a Tool instance from a CrewStructuredTool.
def tool(*args: Any) -> Any:
"""Decorator to create a tool from a function."""
This method takes a CrewStructuredTool object and converts it into a
Tool instance. It ensures that the provided tool has a callable 'func'
attribute and infers the argument schema if not explicitly provided.
Args:
tool (Any): The CrewStructuredTool object to be converted.
Returns:
Tool: A new Tool instance created from the provided CrewStructuredTool.
Raises:
ValueError: If the provided tool does not have a callable 'func' attribute.
"""
if not hasattr(tool, "func") or not callable(tool.func):
raise ValueError("The provided tool must have a callable 'func' attribute.")
args_schema = getattr(tool, "args_schema", None)
if args_schema is None:
# Infer args_schema from the function signature if not provided
func_signature = signature(tool.func)
annotations = func_signature.parameters
args_fields = {}
for name, param in annotations.items():
if name != "self":
param_annotation = (
param.annotation if param.annotation != param.empty else Any
)
field_info = Field(
default=...,
description="",
)
args_fields[name] = (param_annotation, field_info)
if args_fields:
args_schema = create_model(f"{tool.name}Input", **args_fields)
else:
# Create a default schema with no fields if no parameters are found
args_schema = create_model(
f"{tool.name}Input", __base__=PydanticBaseModel
)
return cls(
name=getattr(tool, "name", "Unnamed Tool"),
description=getattr(tool, "description", ""),
func=tool.func,
args_schema=args_schema,
)
def to_langchain(
tools: list[BaseTool | CrewStructuredTool],
) -> list[CrewStructuredTool]:
return [t.to_structured_tool() if isinstance(t, BaseTool) else t for t in tools]
def tool(*args):
"""
Decorator to create a tool from a function.
"""
def _make_with_name(tool_name: str) -> Callable:
def _make_tool(f: Callable) -> Tool:
def _make_tool(f: Callable) -> BaseTool:
if f.__doc__ is None:
raise ValueError("Function must have a docstring")
if f.__annotations__ is None:

View File

@@ -2,14 +2,9 @@ from __future__ import annotations
import inspect
import textwrap
from typing import Any, Callable, Dict, Optional, Tuple, Union, get_type_hints
from typing import Any, Callable, Optional, Union, get_type_hints
from pydantic import BaseModel, ConfigDict, Field, create_model
from pydantic.fields import FieldInfo
def _create_model_fields(fields: Dict[str, Tuple[Any, FieldInfo]]) -> Dict[str, Any]:
"""Helper function to create model fields with proper type hints."""
return {name: (annotation, field) for name, (annotation, field) in fields.items()}
from pydantic import BaseModel, Field, create_model
from crewai.utilities.logger import Logger
@@ -147,8 +142,7 @@ class CrewStructuredTool:
# Create model
schema_name = f"{name.title()}Schema"
model_fields = _create_model_fields(fields)
return create_model(schema_name, __base__=BaseModel, **model_fields)
return create_model(schema_name, **fields)
def _validate_function_signature(self) -> None:
"""Validate that the function signature matches the args schema."""

View File

@@ -4,7 +4,3 @@ DEFAULT_SCORE_THRESHOLD = 0.35
KNOWLEDGE_DIRECTORY = "knowledge"
MAX_LLM_RETRY = 3
MAX_FILE_NAME_LENGTH = 255
# Default embedding configuration
DEFAULT_EMBEDDING_PROVIDER = "openai"
DEFAULT_EMBEDDING_MODEL = "text-embedding-3-small"

View File

@@ -1,15 +1,9 @@
import os
from typing import Any, Dict, List, Optional, cast
from typing import Any, Dict, Optional, cast
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.api.types import validate_embedding_function
from crewai.utilities.exceptions.embedding_exceptions import (
EmbeddingConfigurationError,
EmbeddingProviderError,
EmbeddingInitializationError
)
class EmbeddingConfigurator:
def __init__(self):
@@ -20,9 +14,11 @@ class EmbeddingConfigurator:
"vertexai": self._configure_vertexai,
"google": self._configure_google,
"cohere": self._configure_cohere,
"voyageai": self._configure_voyageai,
"bedrock": self._configure_bedrock,
"huggingface": self._configure_huggingface,
"watson": self._configure_watson,
"custom": self._configure_custom,
}
def configure_embedder(
@@ -35,119 +31,156 @@ class EmbeddingConfigurator:
provider = embedder_config.get("provider")
config = embedder_config.get("config", {})
model_name = config.get("model")
model_name = config.get("model") if provider != "custom" else None
if isinstance(provider, EmbeddingFunction):
try:
validate_embedding_function(provider)
return provider
except Exception as e:
raise EmbeddingConfigurationError(f"Invalid custom embedding function: {str(e)}")
if provider not in self.embedding_functions:
raise Exception(
f"Unsupported embedding provider: {provider}, supported providers: {list(self.embedding_functions.keys())}"
)
if not provider or provider not in self.embedding_functions:
raise EmbeddingProviderError(str(provider), list(self.embedding_functions.keys()))
try:
return self.embedding_functions[str(provider)](config, model_name)
except Exception as e:
raise EmbeddingInitializationError(str(provider), str(e))
@staticmethod
def _create_default_embedding_function() -> EmbeddingFunction:
from crewai.utilities.constants import DEFAULT_EMBEDDING_PROVIDER, DEFAULT_EMBEDDING_MODEL
provider = os.getenv("CREWAI_EMBEDDING_PROVIDER", DEFAULT_EMBEDDING_PROVIDER)
model = os.getenv("CREWAI_EMBEDDING_MODEL", DEFAULT_EMBEDDING_MODEL)
if provider == "openai":
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise EmbeddingConfigurationError("OpenAI API key is required but not provided")
from chromadb.utils.embedding_functions.openai_embedding_function import OpenAIEmbeddingFunction
return OpenAIEmbeddingFunction(api_key=api_key, model_name=model)
elif provider == "ollama":
from chromadb.utils.embedding_functions.ollama_embedding_function import OllamaEmbeddingFunction
url = os.getenv("CREWAI_OLLAMA_URL", "http://localhost:11434/api/embeddings")
return OllamaEmbeddingFunction(url=url, model_name=model)
else:
raise EmbeddingProviderError(provider, ["openai", "ollama"])
@staticmethod
def _configure_openai(config: Dict[str, Any], model_name: str) -> EmbeddingFunction:
from chromadb.utils.embedding_functions.openai_embedding_function import OpenAIEmbeddingFunction
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
embedding_function = self.embedding_functions[provider]
return (
embedding_function(config)
if provider == "custom"
else embedding_function(config, model_name)
)
@staticmethod
def _configure_azure(config: Dict[str, Any], model_name: str) -> EmbeddingFunction:
from chromadb.utils.embedding_functions.openai_embedding_function import OpenAIEmbeddingFunction
def _create_default_embedding_function():
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
@staticmethod
def _configure_openai(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key") or os.getenv("OPENAI_API_KEY"),
model_name=model_name,
api_base=config.get("api_base", None),
api_type=config.get("api_type", None),
api_version=config.get("api_version", None),
default_headers=config.get("default_headers", None),
dimensions=config.get("dimensions", None),
deployment_id=config.get("deployment_id", None),
organization_id=config.get("organization_id", None),
)
@staticmethod
def _configure_azure(config, model_name):
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)
return OpenAIEmbeddingFunction(
api_key=config.get("api_key"),
api_base=config.get("api_base"),
api_type=config.get("api_type", "azure"),
api_version=config.get("api_version"),
model_name=model_name,
default_headers=config.get("default_headers"),
dimensions=config.get("dimensions"),
deployment_id=config.get("deployment_id"),
organization_id=config.get("organization_id"),
)
@staticmethod
def _configure_ollama(config: Dict[str, Any], model_name: str) -> EmbeddingFunction:
from chromadb.utils.embedding_functions.ollama_embedding_function import OllamaEmbeddingFunction
def _configure_ollama(config, model_name):
from chromadb.utils.embedding_functions.ollama_embedding_function import (
OllamaEmbeddingFunction,
)
return OllamaEmbeddingFunction(
url=config.get("url", "http://localhost:11434/api/embeddings"),
model_name=model_name,
)
@staticmethod
def _configure_vertexai(config: Dict[str, Any], model_name: str) -> EmbeddingFunction:
from chromadb.utils.embedding_functions.google_embedding_function import GoogleVertexEmbeddingFunction
def _configure_vertexai(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleVertexEmbeddingFunction,
)
return GoogleVertexEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
project_id=config.get("project_id"),
region=config.get("region"),
)
@staticmethod
def _configure_google(config: Dict[str, Any], model_name: str) -> EmbeddingFunction:
from chromadb.utils.embedding_functions.google_embedding_function import GoogleGenerativeAiEmbeddingFunction
def _configure_google(config, model_name):
from chromadb.utils.embedding_functions.google_embedding_function import (
GoogleGenerativeAiEmbeddingFunction,
)
return GoogleGenerativeAiEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
task_type=config.get("task_type"),
)
@staticmethod
def _configure_cohere(config: Dict[str, Any], model_name: str) -> EmbeddingFunction:
from chromadb.utils.embedding_functions.cohere_embedding_function import CohereEmbeddingFunction
def _configure_cohere(config, model_name):
from chromadb.utils.embedding_functions.cohere_embedding_function import (
CohereEmbeddingFunction,
)
return CohereEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
@staticmethod
def _configure_bedrock(config: Dict[str, Any], model_name: str) -> EmbeddingFunction:
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import AmazonBedrockEmbeddingFunction
return AmazonBedrockEmbeddingFunction(
session=config.get("session"),
def _configure_voyageai(config, model_name):
from chromadb.utils.embedding_functions.voyageai_embedding_function import (
VoyageAIEmbeddingFunction,
)
return VoyageAIEmbeddingFunction(
model_name=model_name,
api_key=config.get("api_key"),
)
@staticmethod
def _configure_huggingface(config: Dict[str, Any], model_name: str) -> EmbeddingFunction:
from chromadb.utils.embedding_functions.huggingface_embedding_function import HuggingFaceEmbeddingServer
def _configure_bedrock(config, model_name):
from chromadb.utils.embedding_functions.amazon_bedrock_embedding_function import (
AmazonBedrockEmbeddingFunction,
)
# Allow custom model_name override with backwards compatibility
kwargs = {"session": config.get("session")}
if model_name is not None:
kwargs["model_name"] = model_name
return AmazonBedrockEmbeddingFunction(**kwargs)
@staticmethod
def _configure_huggingface(config, model_name):
from chromadb.utils.embedding_functions.huggingface_embedding_function import (
HuggingFaceEmbeddingServer,
)
return HuggingFaceEmbeddingServer(
url=config.get("api_url"),
)
@staticmethod
def _configure_watson(config: Dict[str, Any], model_name: str) -> EmbeddingFunction:
def _configure_watson(config, model_name):
try:
import ibm_watsonx_ai.foundation_models as watson_models
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames as EmbedParams
except ImportError as e:
raise EmbeddingConfigurationError(
"IBM Watson dependencies are not installed. Please install them to use Watson embedding.",
provider="watson"
)
raise ImportError(
"IBM Watson dependencies are not installed. Please install them to use Watson embedding."
) from e
class WatsonEmbeddingFunction(EmbeddingFunction):
def __call__(self, input: Documents) -> Embeddings:
@@ -172,6 +205,32 @@ class EmbeddingConfigurator:
embeddings = embedding.embed_documents(input)
return cast(Embeddings, embeddings)
except Exception as e:
raise EmbeddingInitializationError("watson", str(e))
print("Error during Watson embedding:", e)
raise e
return WatsonEmbeddingFunction()
@staticmethod
def _configure_custom(config):
custom_embedder = config.get("embedder")
if isinstance(custom_embedder, EmbeddingFunction):
try:
validate_embedding_function(custom_embedder)
return custom_embedder
except Exception as e:
raise ValueError(f"Invalid custom embedding function: {str(e)}")
elif callable(custom_embedder):
try:
instance = custom_embedder()
if isinstance(instance, EmbeddingFunction):
validate_embedding_function(instance)
return instance
raise ValueError(
"Custom embedder does not create an EmbeddingFunction instance"
)
except Exception as e:
raise ValueError(f"Error instantiating custom embedder: {str(e)}")
else:
raise ValueError(
"Custom embedder must be an instance of `EmbeddingFunction` or a callable that creates one"
)

View File

@@ -1,20 +0,0 @@
from typing import List, Optional
class EmbeddingConfigurationError(Exception):
def __init__(self, message: str, provider: Optional[str] = None):
self.message = message
self.provider = provider
super().__init__(self.message)
class EmbeddingProviderError(EmbeddingConfigurationError):
def __init__(self, provider: str, supported_providers: List[str]):
message = f"Unsupported embedding provider: {provider}, supported providers: {supported_providers}"
super().__init__(message, provider)
class EmbeddingInitializationError(EmbeddingConfigurationError):
def __init__(self, provider: str, error: str):
message = f"Failed to initialize embedding function for provider {provider}: {error}"
super().__init__(message, provider)

View File

@@ -0,0 +1,111 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Crew Manager. You
are a seasoned manager with a knack for getting the best out of your team.\nYou
are also known for your ability to delegate work to the right people, and to
ask the right questions to get the best out of your team.\nEven though you don''t
perform tasks by yourself, you have a lot of experience in the field, which
allows you to properly evaluate the work of your team members.\nYour personal
goal is: Manage the team to complete the task in the best way possible.\nYou
ONLY have access to the following tools, and should NEVER make up tools that
are not listed here:\n\nTool Name: Delegate work to coworker\nTool Arguments:
{''task'': {''description'': ''The task to delegate'', ''type'': ''str''}, ''context'':
{''description'': ''The context for the task'', ''type'': ''str''}, ''coworker'':
{''description'': ''The role/name of the coworker to delegate to'', ''type'':
''str''}}\nTool Description: Delegate a specific task to one of the following
coworkers: Researcher\nThe input to this tool should be the coworker, the task
you want them to do, and ALL necessary context to execute the task, they know
nothing about the task, so share absolute everything you know, don''t reference
things but instead explain them.\nTool Name: Ask question to coworker\nTool
Arguments: {''question'': {''description'': ''The question to ask'', ''type'':
''str''}, ''context'': {''description'': ''The context for the question'', ''type'':
''str''}, ''coworker'': {''description'': ''The role/name of the coworker to
ask'', ''type'': ''str''}}\nTool Description: Ask a specific question to one
of the following coworkers: Researcher\nThe input to this tool should be the
coworker, the question you have for them, and ALL necessary context to ask the
question properly, they know nothing about the question, so share absolute everything
you know, don''t reference things but instead explain them.\n\nIMPORTANT: Use
the following format in your response:\n\n```\nThought: you should always think
about what to do\nAction: the action to take, only one name of [Delegate work
to coworker, Ask question to coworker], just the name, exactly as it''s written.\nAction
Input: the input to the action, just a simple JSON object, enclosed in curly
braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce
all necessary information is gathered, return the following format:\n\n```\nThought:
I now know the final answer\nFinal Answer: the final answer to the original
input question\n```"}, {"role": "user", "content": "\nCurrent Task: Test task
using test_tool\n\nThis is the expected criteria for your final answer: Test
output\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '2915'
content-type:
- application/json
cookie:
- __cf_bm=0nI_3cs9LKHlz0_cVifl9lVXZWfNQrzhLRZzoaDCVJs-1739358791-1.0.1.1-GJZzVGs1fHQe9gucVHUCoDlI3mwg3JyXCBIIChx_OsB0jgqbzt1s2et96vUgpdjONt9C2tB8OQ6fk70k3vhh0w;
_cfuvid=TvEPcNq35qaIgBFFzOP7g7NQf632eeMrvRDJyr7UTro-1739358791985-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-fake-**********2345. You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 910c25fe3c8ca373-SEA
Connection:
- keep-alive
Content-Length:
- '272'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 12 Feb 2025 11:13:16 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_a5651cefb4c7ae9483499a8e538f579d
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -0,0 +1,111 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Crew Manager. You
are a seasoned manager with a knack for getting the best out of your team.\nYou
are also known for your ability to delegate work to the right people, and to
ask the right questions to get the best out of your team.\nEven though you don''t
perform tasks by yourself, you have a lot of experience in the field, which
allows you to properly evaluate the work of your team members.\nYour personal
goal is: Manage the team to complete the task in the best way possible.\nYou
ONLY have access to the following tools, and should NEVER make up tools that
are not listed here:\n\nTool Name: Delegate work to coworker\nTool Arguments:
{''task'': {''description'': ''The task to delegate'', ''type'': ''str''}, ''context'':
{''description'': ''The context for the task'', ''type'': ''str''}, ''coworker'':
{''description'': ''The role/name of the coworker to delegate to'', ''type'':
''str''}}\nTool Description: Delegate a specific task to one of the following
coworkers: Researcher\nThe input to this tool should be the coworker, the task
you want them to do, and ALL necessary context to execute the task, they know
nothing about the task, so share absolute everything you know, don''t reference
things but instead explain them.\nTool Name: Ask question to coworker\nTool
Arguments: {''question'': {''description'': ''The question to ask'', ''type'':
''str''}, ''context'': {''description'': ''The context for the question'', ''type'':
''str''}, ''coworker'': {''description'': ''The role/name of the coworker to
ask'', ''type'': ''str''}}\nTool Description: Ask a specific question to one
of the following coworkers: Researcher\nThe input to this tool should be the
coworker, the question you have for them, and ALL necessary context to ask the
question properly, they know nothing about the question, so share absolute everything
you know, don''t reference things but instead explain them.\n\nIMPORTANT: Use
the following format in your response:\n\n```\nThought: you should always think
about what to do\nAction: the action to take, only one name of [Delegate work
to coworker, Ask question to coworker], just the name, exactly as it''s written.\nAction
Input: the input to the action, just a simple JSON object, enclosed in curly
braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce
all necessary information is gathered, return the following format:\n\n```\nThought:
I now know the final answer\nFinal Answer: the final answer to the original
input question\n```"}, {"role": "user", "content": "\nCurrent Task: Test task
using test_tool\n\nThis is the expected criteria for your final answer: Test
output\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '2915'
content-type:
- application/json
cookie:
- __cf_bm=0nI_3cs9LKHlz0_cVifl9lVXZWfNQrzhLRZzoaDCVJs-1739358791-1.0.1.1-GJZzVGs1fHQe9gucVHUCoDlI3mwg3JyXCBIIChx_OsB0jgqbzt1s2et96vUgpdjONt9C2tB8OQ6fk70k3vhh0w;
_cfuvid=TvEPcNq35qaIgBFFzOP7g7NQf632eeMrvRDJyr7UTro-1739358791985-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-fake-**********2345. You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 910c25fa2a76a373-SEA
Connection:
- keep-alive
Content-Length:
- '272'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 12 Feb 2025 11:13:15 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_3ad40f35d4ad6d42f9bbb7fd8ace9a68
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -0,0 +1,111 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Crew Manager. You
are a seasoned manager with a knack for getting the best out of your team.\nYou
are also known for your ability to delegate work to the right people, and to
ask the right questions to get the best out of your team.\nEven though you don''t
perform tasks by yourself, you have a lot of experience in the field, which
allows you to properly evaluate the work of your team members.\nYour personal
goal is: Manage the team to complete the task in the best way possible.\nYou
ONLY have access to the following tools, and should NEVER make up tools that
are not listed here:\n\nTool Name: Delegate work to coworker\nTool Arguments:
{''task'': {''description'': ''The task to delegate'', ''type'': ''str''}, ''context'':
{''description'': ''The context for the task'', ''type'': ''str''}, ''coworker'':
{''description'': ''The role/name of the coworker to delegate to'', ''type'':
''str''}}\nTool Description: Delegate a specific task to one of the following
coworkers: Researcher\nThe input to this tool should be the coworker, the task
you want them to do, and ALL necessary context to execute the task, they know
nothing about the task, so share absolute everything you know, don''t reference
things but instead explain them.\nTool Name: Ask question to coworker\nTool
Arguments: {''question'': {''description'': ''The question to ask'', ''type'':
''str''}, ''context'': {''description'': ''The context for the question'', ''type'':
''str''}, ''coworker'': {''description'': ''The role/name of the coworker to
ask'', ''type'': ''str''}}\nTool Description: Ask a specific question to one
of the following coworkers: Researcher\nThe input to this tool should be the
coworker, the question you have for them, and ALL necessary context to ask the
question properly, they know nothing about the question, so share absolute everything
you know, don''t reference things but instead explain them.\n\nIMPORTANT: Use
the following format in your response:\n\n```\nThought: you should always think
about what to do\nAction: the action to take, only one name of [Delegate work
to coworker, Ask question to coworker], just the name, exactly as it''s written.\nAction
Input: the input to the action, just a simple JSON object, enclosed in curly
braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce
all necessary information is gathered, return the following format:\n\n```\nThought:
I now know the final answer\nFinal Answer: the final answer to the original
input question\n```"}, {"role": "user", "content": "\nCurrent Task: Test task
using test_tool\n\nThis is the expected criteria for your final answer: Test
output\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '2915'
content-type:
- application/json
cookie:
- __cf_bm=0nI_3cs9LKHlz0_cVifl9lVXZWfNQrzhLRZzoaDCVJs-1739358791-1.0.1.1-GJZzVGs1fHQe9gucVHUCoDlI3mwg3JyXCBIIChx_OsB0jgqbzt1s2et96vUgpdjONt9C2tB8OQ6fk70k3vhh0w;
_cfuvid=TvEPcNq35qaIgBFFzOP7g7NQf632eeMrvRDJyr7UTro-1739358791985-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-fake-**********2345. You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 910c26028e7ba373-SEA
Connection:
- keep-alive
Content-Length:
- '272'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 12 Feb 2025 11:13:17 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_6f0f7d9e36bbb3ffea4d672b30b700b9
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -0,0 +1,111 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Crew Manager. You
are a seasoned manager with a knack for getting the best out of your team.\nYou
are also known for your ability to delegate work to the right people, and to
ask the right questions to get the best out of your team.\nEven though you don''t
perform tasks by yourself, you have a lot of experience in the field, which
allows you to properly evaluate the work of your team members.\nYour personal
goal is: Manage the team to complete the task in the best way possible.\nYou
ONLY have access to the following tools, and should NEVER make up tools that
are not listed here:\n\nTool Name: Delegate work to coworker\nTool Arguments:
{''task'': {''description'': ''The task to delegate'', ''type'': ''str''}, ''context'':
{''description'': ''The context for the task'', ''type'': ''str''}, ''coworker'':
{''description'': ''The role/name of the coworker to delegate to'', ''type'':
''str''}}\nTool Description: Delegate a specific task to one of the following
coworkers: Researcher\nThe input to this tool should be the coworker, the task
you want them to do, and ALL necessary context to execute the task, they know
nothing about the task, so share absolute everything you know, don''t reference
things but instead explain them.\nTool Name: Ask question to coworker\nTool
Arguments: {''question'': {''description'': ''The question to ask'', ''type'':
''str''}, ''context'': {''description'': ''The context for the question'', ''type'':
''str''}, ''coworker'': {''description'': ''The role/name of the coworker to
ask'', ''type'': ''str''}}\nTool Description: Ask a specific question to one
of the following coworkers: Researcher\nThe input to this tool should be the
coworker, the question you have for them, and ALL necessary context to ask the
question properly, they know nothing about the question, so share absolute everything
you know, don''t reference things but instead explain them.\n\nIMPORTANT: Use
the following format in your response:\n\n```\nThought: you should always think
about what to do\nAction: the action to take, only one name of [Delegate work
to coworker, Ask question to coworker], just the name, exactly as it''s written.\nAction
Input: the input to the action, just a simple JSON object, enclosed in curly
braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce
all necessary information is gathered, return the following format:\n\n```\nThought:
I now know the final answer\nFinal Answer: the final answer to the original
input question\n```"}, {"role": "user", "content": "\nCurrent Task: Test task
using test_tool\n\nThis is the expected criteria for your final answer: Test
output\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '2915'
content-type:
- application/json
cookie:
- __cf_bm=0nI_3cs9LKHlz0_cVifl9lVXZWfNQrzhLRZzoaDCVJs-1739358791-1.0.1.1-GJZzVGs1fHQe9gucVHUCoDlI3mwg3JyXCBIIChx_OsB0jgqbzt1s2et96vUgpdjONt9C2tB8OQ6fk70k3vhh0w;
_cfuvid=TvEPcNq35qaIgBFFzOP7g7NQf632eeMrvRDJyr7UTro-1739358791985-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-fake-**********2345. You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 910c25f5e85ba373-SEA
Connection:
- keep-alive
Content-Length:
- '272'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 12 Feb 2025 11:13:15 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_49a0c35d5999eb9f6d71fcea9fa3c68f
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -0,0 +1,111 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Crew Manager. You
are a seasoned manager with a knack for getting the best out of your team.\nYou
are also known for your ability to delegate work to the right people, and to
ask the right questions to get the best out of your team.\nEven though you don''t
perform tasks by yourself, you have a lot of experience in the field, which
allows you to properly evaluate the work of your team members.\nYour personal
goal is: Manage the team to complete the task in the best way possible.\nYou
ONLY have access to the following tools, and should NEVER make up tools that
are not listed here:\n\nTool Name: Delegate work to coworker\nTool Arguments:
{''task'': {''description'': ''The task to delegate'', ''type'': ''str''}, ''context'':
{''description'': ''The context for the task'', ''type'': ''str''}, ''coworker'':
{''description'': ''The role/name of the coworker to delegate to'', ''type'':
''str''}}\nTool Description: Delegate a specific task to one of the following
coworkers: Researcher\nThe input to this tool should be the coworker, the task
you want them to do, and ALL necessary context to execute the task, they know
nothing about the task, so share absolute everything you know, don''t reference
things but instead explain them.\nTool Name: Ask question to coworker\nTool
Arguments: {''question'': {''description'': ''The question to ask'', ''type'':
''str''}, ''context'': {''description'': ''The context for the question'', ''type'':
''str''}, ''coworker'': {''description'': ''The role/name of the coworker to
ask'', ''type'': ''str''}}\nTool Description: Ask a specific question to one
of the following coworkers: Researcher\nThe input to this tool should be the
coworker, the question you have for them, and ALL necessary context to ask the
question properly, they know nothing about the question, so share absolute everything
you know, don''t reference things but instead explain them.\n\nIMPORTANT: Use
the following format in your response:\n\n```\nThought: you should always think
about what to do\nAction: the action to take, only one name of [Delegate work
to coworker, Ask question to coworker], just the name, exactly as it''s written.\nAction
Input: the input to the action, just a simple JSON object, enclosed in curly
braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce
all necessary information is gathered, return the following format:\n\n```\nThought:
I now know the final answer\nFinal Answer: the final answer to the original
input question\n```"}, {"role": "user", "content": "\nCurrent Task: Test task
using test_tool\n\nThis is the expected criteria for your final answer: Test
output\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '2915'
content-type:
- application/json
cookie:
- __cf_bm=0nI_3cs9LKHlz0_cVifl9lVXZWfNQrzhLRZzoaDCVJs-1739358791-1.0.1.1-GJZzVGs1fHQe9gucVHUCoDlI3mwg3JyXCBIIChx_OsB0jgqbzt1s2et96vUgpdjONt9C2tB8OQ6fk70k3vhh0w;
_cfuvid=TvEPcNq35qaIgBFFzOP7g7NQf632eeMrvRDJyr7UTro-1739358791985-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-fake-**********2345. You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 910c25f1be28a373-SEA
Connection:
- keep-alive
Content-Length:
- '272'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 12 Feb 2025 11:13:14 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_573ccd417b499cf38012dec20261964d
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -0,0 +1,111 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Crew Manager. You
are a seasoned manager with a knack for getting the best out of your team.\nYou
are also known for your ability to delegate work to the right people, and to
ask the right questions to get the best out of your team.\nEven though you don''t
perform tasks by yourself, you have a lot of experience in the field, which
allows you to properly evaluate the work of your team members.\nYour personal
goal is: Manage the team to complete the task in the best way possible.\nYou
ONLY have access to the following tools, and should NEVER make up tools that
are not listed here:\n\nTool Name: Delegate work to coworker\nTool Arguments:
{''task'': {''description'': ''The task to delegate'', ''type'': ''str''}, ''context'':
{''description'': ''The context for the task'', ''type'': ''str''}, ''coworker'':
{''description'': ''The role/name of the coworker to delegate to'', ''type'':
''str''}}\nTool Description: Delegate a specific task to one of the following
coworkers: Researcher\nThe input to this tool should be the coworker, the task
you want them to do, and ALL necessary context to execute the task, they know
nothing about the task, so share absolute everything you know, don''t reference
things but instead explain them.\nTool Name: Ask question to coworker\nTool
Arguments: {''question'': {''description'': ''The question to ask'', ''type'':
''str''}, ''context'': {''description'': ''The context for the question'', ''type'':
''str''}, ''coworker'': {''description'': ''The role/name of the coworker to
ask'', ''type'': ''str''}}\nTool Description: Ask a specific question to one
of the following coworkers: Researcher\nThe input to this tool should be the
coworker, the question you have for them, and ALL necessary context to ask the
question properly, they know nothing about the question, so share absolute everything
you know, don''t reference things but instead explain them.\n\nIMPORTANT: Use
the following format in your response:\n\n```\nThought: you should always think
about what to do\nAction: the action to take, only one name of [Delegate work
to coworker, Ask question to coworker], just the name, exactly as it''s written.\nAction
Input: the input to the action, just a simple JSON object, enclosed in curly
braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce
all necessary information is gathered, return the following format:\n\n```\nThought:
I now know the final answer\nFinal Answer: the final answer to the original
input question\n```"}, {"role": "user", "content": "\nCurrent Task: Test task
using test_tool\n\nThis is the expected criteria for your final answer: Test
output\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '2915'
content-type:
- application/json
cookie:
- __cf_bm=0nI_3cs9LKHlz0_cVifl9lVXZWfNQrzhLRZzoaDCVJs-1739358791-1.0.1.1-GJZzVGs1fHQe9gucVHUCoDlI3mwg3JyXCBIIChx_OsB0jgqbzt1s2et96vUgpdjONt9C2tB8OQ6fk70k3vhh0w;
_cfuvid=TvEPcNq35qaIgBFFzOP7g7NQf632eeMrvRDJyr7UTro-1739358791985-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-fake-**********2345. You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 910c25e84886a373-SEA
Connection:
- keep-alive
Content-Length:
- '272'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 12 Feb 2025 11:13:13 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_e48016407be31edd1f9ee2e1bb4d1b30
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -0,0 +1,114 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Crew Manager. You
are a seasoned manager with a knack for getting the best out of your team.\nYou
are also known for your ability to delegate work to the right people, and to
ask the right questions to get the best out of your team.\nEven though you don''t
perform tasks by yourself, you have a lot of experience in the field, which
allows you to properly evaluate the work of your team members.\nYour personal
goal is: Manage the team to complete the task in the best way possible.\nYou
ONLY have access to the following tools, and should NEVER make up tools that
are not listed here:\n\nTool Name: Delegate work to coworker\nTool Arguments:
{''task'': {''description'': ''The task to delegate'', ''type'': ''str''}, ''context'':
{''description'': ''The context for the task'', ''type'': ''str''}, ''coworker'':
{''description'': ''The role/name of the coworker to delegate to'', ''type'':
''str''}}\nTool Description: Delegate a specific task to one of the following
coworkers: Researcher\nThe input to this tool should be the coworker, the task
you want them to do, and ALL necessary context to execute the task, they know
nothing about the task, so share absolute everything you know, don''t reference
things but instead explain them.\nTool Name: Ask question to coworker\nTool
Arguments: {''question'': {''description'': ''The question to ask'', ''type'':
''str''}, ''context'': {''description'': ''The context for the question'', ''type'':
''str''}, ''coworker'': {''description'': ''The role/name of the coworker to
ask'', ''type'': ''str''}}\nTool Description: Ask a specific question to one
of the following coworkers: Researcher\nThe input to this tool should be the
coworker, the question you have for them, and ALL necessary context to ask the
question properly, they know nothing about the question, so share absolute everything
you know, don''t reference things but instead explain them.\n\nIMPORTANT: Use
the following format in your response:\n\n```\nThought: you should always think
about what to do\nAction: the action to take, only one name of [Delegate work
to coworker, Ask question to coworker], just the name, exactly as it''s written.\nAction
Input: the input to the action, just a simple JSON object, enclosed in curly
braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce
all necessary information is gathered, return the following format:\n\n```\nThought:
I now know the final answer\nFinal Answer: the final answer to the original
input question\n```"}, {"role": "user", "content": "\nCurrent Task: Test task
using test_tool\n\nThis is the expected criteria for your final answer: Test
output\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '2915'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-fake-**********2345. You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-RAY:
- 910c25e11d1da373-SEA
Connection:
- keep-alive
Content-Length:
- '272'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 12 Feb 2025 11:13:11 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=0nI_3cs9LKHlz0_cVifl9lVXZWfNQrzhLRZzoaDCVJs-1739358791-1.0.1.1-GJZzVGs1fHQe9gucVHUCoDlI3mwg3JyXCBIIChx_OsB0jgqbzt1s2et96vUgpdjONt9C2tB8OQ6fk70k3vhh0w;
path=/; expires=Wed, 12-Feb-25 11:43:11 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=TvEPcNq35qaIgBFFzOP7g7NQf632eeMrvRDJyr7UTro-1739358791985-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
cf-cache-status:
- DYNAMIC
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_e07e0bddd718b7d5978b6bd9f138fa40
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -0,0 +1,111 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Crew Manager. You
are a seasoned manager with a knack for getting the best out of your team.\nYou
are also known for your ability to delegate work to the right people, and to
ask the right questions to get the best out of your team.\nEven though you don''t
perform tasks by yourself, you have a lot of experience in the field, which
allows you to properly evaluate the work of your team members.\nYour personal
goal is: Manage the team to complete the task in the best way possible.\nYou
ONLY have access to the following tools, and should NEVER make up tools that
are not listed here:\n\nTool Name: Delegate work to coworker\nTool Arguments:
{''task'': {''description'': ''The task to delegate'', ''type'': ''str''}, ''context'':
{''description'': ''The context for the task'', ''type'': ''str''}, ''coworker'':
{''description'': ''The role/name of the coworker to delegate to'', ''type'':
''str''}}\nTool Description: Delegate a specific task to one of the following
coworkers: Researcher\nThe input to this tool should be the coworker, the task
you want them to do, and ALL necessary context to execute the task, they know
nothing about the task, so share absolute everything you know, don''t reference
things but instead explain them.\nTool Name: Ask question to coworker\nTool
Arguments: {''question'': {''description'': ''The question to ask'', ''type'':
''str''}, ''context'': {''description'': ''The context for the question'', ''type'':
''str''}, ''coworker'': {''description'': ''The role/name of the coworker to
ask'', ''type'': ''str''}}\nTool Description: Ask a specific question to one
of the following coworkers: Researcher\nThe input to this tool should be the
coworker, the question you have for them, and ALL necessary context to ask the
question properly, they know nothing about the question, so share absolute everything
you know, don''t reference things but instead explain them.\n\nIMPORTANT: Use
the following format in your response:\n\n```\nThought: you should always think
about what to do\nAction: the action to take, only one name of [Delegate work
to coworker, Ask question to coworker], just the name, exactly as it''s written.\nAction
Input: the input to the action, just a simple JSON object, enclosed in curly
braces, using \" to wrap keys and values.\nObservation: the result of the action\n```\n\nOnce
all necessary information is gathered, return the following format:\n\n```\nThought:
I now know the final answer\nFinal Answer: the final answer to the original
input question\n```"}, {"role": "user", "content": "\nCurrent Task: Test task
using test_tool\n\nThis is the expected criteria for your final answer: Test
output\nyou MUST return the actual complete content as the final answer, not
a summary.\n\nBegin! This is VERY important to you, use the tools available
and give your best Final Answer, your job depends on it!\n\nThought:"}], "model":
"gpt-4o", "stop": ["\nObservation:"]}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '2915'
content-type:
- application/json
cookie:
- __cf_bm=0nI_3cs9LKHlz0_cVifl9lVXZWfNQrzhLRZzoaDCVJs-1739358791-1.0.1.1-GJZzVGs1fHQe9gucVHUCoDlI3mwg3JyXCBIIChx_OsB0jgqbzt1s2et96vUgpdjONt9C2tB8OQ6fk70k3vhh0w;
_cfuvid=TvEPcNq35qaIgBFFzOP7g7NQf632eeMrvRDJyr7UTro-1739358791985-0.0.1.1-604800000
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.61.0
x-stainless-arch:
- x64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- Linux
x-stainless-package-version:
- 1.61.0
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"error\": {\n \"message\": \"Incorrect API key provided:
sk-fake-**********2345. You can find your API key at https://platform.openai.com/account/api-keys.\",\n
\ \"type\": \"invalid_request_error\",\n \"param\": null,\n \"code\":
\"invalid_api_key\"\n }\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 910c25ec6a9fa373-SEA
Connection:
- keep-alive
Content-Length:
- '272'
Content-Type:
- application/json; charset=utf-8
Date:
- Wed, 12 Feb 2025 11:13:13 GMT
Server:
- cloudflare
X-Content-Type-Options:
- nosniff
alt-svc:
- h3=":443"; ma=86400
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
vary:
- Origin
x-request-id:
- req_f9215c0efc514e3dfea73d6c5f49eb4e
http_version: HTTP/1.1
status_code: 401
version: 1

View File

@@ -1,30 +1,42 @@
# conftest.py
import os
import tempfile
from pathlib import Path
import pytest
from dotenv import load_dotenv
load_result = load_dotenv(override=True)
@pytest.fixture(autouse=True)
def setup_test_env():
"""Configure test environment to use Ollama as the default embedding provider."""
# Store original environment variables
original_env = {
"CREWAI_EMBEDDING_PROVIDER": os.environ.get("CREWAI_EMBEDDING_PROVIDER"),
"CREWAI_EMBEDDING_MODEL": os.environ.get("CREWAI_EMBEDDING_MODEL"),
"CREWAI_OLLAMA_URL": os.environ.get("CREWAI_OLLAMA_URL"),
}
# Set test environment
os.environ["CREWAI_EMBEDDING_PROVIDER"] = "ollama"
os.environ["CREWAI_EMBEDDING_MODEL"] = "llama2"
os.environ["CREWAI_OLLAMA_URL"] = "http://localhost:11434/api/embeddings"
yield
# Restore original environment
for key, value in original_env.items():
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value
def mock_openai_key(monkeypatch):
"""Mock OpenAI API key for VCR cassettes."""
monkeypatch.setenv("OPENAI_API_KEY", "sk-fake-test-key-12345")
@pytest.fixture(autouse=True)
def setup_test_environment():
"""Set up test environment with a temporary directory for SQLite storage."""
with tempfile.TemporaryDirectory() as temp_dir:
# Create the directory with proper permissions
storage_dir = Path(temp_dir) / "crewai_test_storage"
storage_dir.mkdir(parents=True, exist_ok=True)
# Validate that the directory was created successfully
if not storage_dir.exists() or not storage_dir.is_dir():
raise RuntimeError(f"Failed to create test storage directory: {storage_dir}")
# Verify directory permissions
try:
# Try to create a test file to verify write permissions
test_file = storage_dir / ".permissions_test"
test_file.touch()
test_file.unlink()
except (OSError, IOError) as e:
raise RuntimeError(f"Test storage directory {storage_dir} is not writable: {e}")
# Set environment variable to point to the test storage directory
os.environ["CREWAI_STORAGE_DIR"] = str(storage_dir)
yield
# Cleanup is handled automatically when tempfile context exits

View File

@@ -3,6 +3,7 @@
import hashlib
import json
from concurrent.futures import Future
from typing import Any
from unittest import mock
from unittest.mock import MagicMock, patch
@@ -14,7 +15,9 @@ from crewai.agent import Agent
from crewai.agents.cache import CacheHandler
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.knowledge.source.string_knowledge_source import (
StringKnowledgeSource,
)
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.process import Process
from crewai.project import crew
@@ -22,10 +25,13 @@ from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import Logger
from crewai.utilities.rpm_controller import RPMController
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.task_output_storage_handler import (
TaskOutputStorageHandler,
)
ceo = Agent(
role="CEO",
@@ -51,6 +57,7 @@ writer = Agent(
def test_crew_with_only_conditional_tasks_raises_error():
"""Test that creating a crew with only conditional tasks raises an error."""
def condition_func(task_output: TaskOutput) -> bool:
return True
@@ -82,6 +89,7 @@ def test_crew_with_only_conditional_tasks_raises_error():
tasks=[conditional1, conditional2, conditional3],
)
def test_crew_config_conditional_requirement():
with pytest.raises(ValueError):
Crew(process=Process.sequential)
@@ -313,6 +321,69 @@ def test_sync_task_execution():
assert mock_execute_sync.call_count == len(tasks)
@pytest.mark.vcr(
filter_headers=["authorization"],
record_mode="once",
decode_compressed_response=True,
ignore_localhost=True,
match_on=["method", "scheme", "host", "port", "path"]
)
@pytest.mark.parametrize(
"tool_output,expected",
[
("test result```", "test result"),
("test result`", "test result"),
("test result``````", "test result"),
("test result", "test result"),
("test ```result```", "test ```result"), # Only strip trailing backticks
(None, "None"), # Test non-string input gets converted to string
(" ", " "), # Test whitespace string
(
"malformed`result```test",
"malformed`result```test",
), # Test non-trailing backticks
],
)
def test_hierarchical_tool_output_formatting(tool_output, expected):
"""Test that tool outputs in hierarchical mode don't have extra backticks.
This test verifies that the tool output cleaning functionality correctly handles
various scenarios of backtick formatting, ensuring only trailing backticks are
removed while preserving any inline markdown formatting.
"""
class TestTool(BaseTool):
name: str = "test_tool"
description: str = "A test tool"
def _run(self, *args: Any, **kwargs: Any) -> str:
return tool_output
task = Task(
description="Test task using test_tool",
expected_output="Test output",
)
crew = Crew(
agents=[researcher],
process=Process.hierarchical,
manager_llm="gpt-4o",
tasks=[task],
tools=[TestTool()],
)
with patch.object(
Task,
"execute_sync",
return_value=TaskOutput(
description="Test task", raw=expected, agent="researcher"
),
) as mock_execute_sync:
result = crew.kickoff()
assert mock_execute_sync.called
assert result.raw == expected
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_process():
task = Task(
@@ -1952,6 +2023,7 @@ def test_task_callback_on_crew():
def test_task_callback_both_on_task_and_crew():
from unittest.mock import MagicMock, patch
mock_callback_on_task = MagicMock()
mock_callback_on_crew = MagicMock()
@@ -2101,21 +2173,22 @@ def test_conditional_task_uses_last_output():
expected_output="First output",
agent=researcher,
)
def condition_fails(task_output: TaskOutput) -> bool:
# This condition will never be met
return "never matches" in task_output.raw.lower()
def condition_succeeds(task_output: TaskOutput) -> bool:
# This condition will match first task's output
return "first success" in task_output.raw.lower()
conditional_task1 = ConditionalTask(
description="Second task - conditional that fails condition",
expected_output="Second output",
agent=researcher,
condition=condition_fails,
)
conditional_task2 = ConditionalTask(
description="Third task - conditional that succeeds using first task output",
expected_output="Third output",
@@ -2134,35 +2207,37 @@ def test_conditional_task_uses_last_output():
raw="First success output", # Will be used by third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Second task output",
raw="", # Empty output since condition fails
agent=researcher.role,
)
mock_third = TaskOutput(
description="Third task output",
raw="Third task executed", # Output when condition succeeds using first task output
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_first, mock_third]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify outputs collection
# Verify outputs collection:
# First executed task output, followed by an automatically generated (skipped) output, then the conditional execution
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "First success output" # First task succeeded
assert result.tasks_output[1].raw == "" # Second task skipped (condition failed)
assert result.tasks_output[2].raw == "Third task executed" # Third task used first task's output
assert (
result.tasks_output[0].raw == "First success output"
) # First task succeeded
assert (
result.tasks_output[1].raw == ""
) # Second task skipped (condition failed)
assert (
result.tasks_output[2].raw == "Third task executed"
) # Third task used first task's output
@pytest.mark.vcr(filter_headers=["authorization"])
def test_conditional_tasks_result_collection():
@@ -2172,20 +2247,20 @@ def test_conditional_tasks_result_collection():
expected_output="First output",
agent=researcher,
)
def condition_never_met(task_output: TaskOutput) -> bool:
return "never matches" in task_output.raw.lower()
def condition_always_met(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
task2 = ConditionalTask(
description="Conditional task that never executes",
expected_output="Second output",
agent=researcher,
condition=condition_never_met,
)
task3 = ConditionalTask(
description="Conditional task that always executes",
expected_output="Third output",
@@ -2204,35 +2279,46 @@ def test_conditional_tasks_result_collection():
raw="Success output", # Triggers third task's condition
agent=researcher.role,
)
mock_skipped = TaskOutput(
description="Skipped output",
raw="", # Empty output for skipped task
agent=researcher.role,
)
mock_conditional = TaskOutput(
description="Conditional output",
raw="Conditional task executed",
agent=writer.role,
)
# Set up mocks for task execution and conditional logic
with patch.object(ConditionalTask, "should_execute") as mock_should_execute:
# First conditional fails, second succeeds
mock_should_execute.side_effect = [False, True]
with patch.object(Task, "execute_sync") as mock_execute:
mock_execute.side_effect = [mock_success, mock_conditional]
result = crew.kickoff()
# Verify execution behavior
assert mock_execute.call_count == 2 # Only first and third tasks execute
assert mock_should_execute.call_count == 2 # Both conditionals checked
# Verify task output collection:
# There should be three outputs: normal task, skipped conditional task (empty output),
# and the conditional task that executed.
assert len(result.tasks_output) == 3
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
# Verify task output collection
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "Success output" # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert result.tasks_output[2].raw == "Conditional task executed" # Third task executed
assert (
result.tasks_output[0].raw == "Success output"
) # Normal task executed
assert result.tasks_output[1].raw == "" # Second task skipped
assert (
result.tasks_output[2].raw == "Conditional task executed"
) # Third task executed
@pytest.mark.vcr(filter_headers=["authorization"])
def test_multiple_conditional_tasks():
@@ -2242,20 +2328,20 @@ def test_multiple_conditional_tasks():
expected_output="Research output",
agent=researcher,
)
def condition1(task_output: TaskOutput) -> bool:
return "success" in task_output.raw.lower()
def condition2(task_output: TaskOutput) -> bool:
return "proceed" in task_output.raw.lower()
task2 = ConditionalTask(
description="First conditional task",
expected_output="Conditional output 1",
agent=writer,
condition=condition1,
)
task3 = ConditionalTask(
description="Second conditional task",
expected_output="Conditional output 2",
@@ -2274,7 +2360,7 @@ def test_multiple_conditional_tasks():
raw="Success and proceed output",
agent=researcher.role,
)
# Set up mocks for task execution
with patch.object(Task, "execute_sync", return_value=mock_success) as mock_execute:
result = crew.kickoff()
@@ -2282,6 +2368,7 @@ def test_multiple_conditional_tasks():
assert mock_execute.call_count == 3
assert len(result.tasks_output) == 3
@pytest.mark.vcr(filter_headers=["authorization"])
def test_using_contextual_memory():
from unittest.mock import patch

View File

@@ -1,91 +0,0 @@
import os
import tempfile
import pytest
from crewai.memory import ShortTermMemory, LongTermMemory, EntityMemory
from crewai.utilities.exceptions.embedding_exceptions import (
EmbeddingConfigurationError,
EmbeddingProviderError
)
from crewai.utilities import EmbeddingConfigurator
@pytest.fixture
def temp_db_dir():
with tempfile.TemporaryDirectory() as tmpdir:
yield tmpdir
def test_memory_reset_with_ollama(temp_db_dir):
os.environ["CREWAI_EMBEDDING_PROVIDER"] = "ollama"
os.environ["CREWAI_EMBEDDING_MODEL"] = "llama2"
memories = [
ShortTermMemory(path=temp_db_dir),
LongTermMemory(path=temp_db_dir),
EntityMemory(path=temp_db_dir)
]
for memory in memories:
memory.reset()
def test_memory_reset_with_openai(temp_db_dir):
os.environ["CREWAI_EMBEDDING_PROVIDER"] = "openai"
os.environ["CREWAI_EMBEDDING_MODEL"] = "text-embedding-3-small"
memories = [
ShortTermMemory(path=temp_db_dir),
LongTermMemory(path=temp_db_dir),
EntityMemory(path=temp_db_dir)
]
for memory in memories:
memory.reset()
def test_memory_reset_with_invalid_provider(temp_db_dir):
os.environ["CREWAI_EMBEDDING_PROVIDER"] = "invalid_provider"
with pytest.raises(EmbeddingProviderError):
memories = [
ShortTermMemory(path=temp_db_dir),
LongTermMemory(path=temp_db_dir),
EntityMemory(path=temp_db_dir)
]
for memory in memories:
memory.reset()
def test_memory_reset_with_invalid_configuration(temp_db_dir):
os.environ["CREWAI_EMBEDDING_PROVIDER"] = "openai"
os.environ.pop("OPENAI_API_KEY", None)
with pytest.raises(EmbeddingConfigurationError):
memories = [
ShortTermMemory(path=temp_db_dir),
LongTermMemory(path=temp_db_dir),
EntityMemory(path=temp_db_dir)
]
for memory in memories:
memory.reset()
def test_memory_reset_with_missing_ollama_url(temp_db_dir):
os.environ["CREWAI_EMBEDDING_PROVIDER"] = "ollama"
os.environ.pop("CREWAI_OLLAMA_URL", None)
# Should use default URL when CREWAI_OLLAMA_URL is not set
memories = [
ShortTermMemory(path=temp_db_dir),
LongTermMemory(path=temp_db_dir),
EntityMemory(path=temp_db_dir)
]
for memory in memories:
memory.reset()
def test_memory_reset_with_custom_path(temp_db_dir):
os.environ["CREWAI_EMBEDDING_PROVIDER"] = "ollama"
custom_path = os.path.join(temp_db_dir, "custom")
os.makedirs(custom_path, exist_ok=True)
memories = [
ShortTermMemory(path=custom_path),
LongTermMemory(path=custom_path),
EntityMemory(path=custom_path)
]
for memory in memories:
memory.reset()
assert not os.path.exists(os.path.join(custom_path, "short_term"))
assert not os.path.exists(os.path.join(custom_path, "long_term"))
assert not os.path.exists(os.path.join(custom_path, "entity"))