Merge branch 'main' into bug_fix

This commit is contained in:
Lucas Gomide
2025-04-02 10:00:53 -03:00
committed by GitHub
47 changed files with 2749 additions and 444 deletions

View File

@@ -142,15 +142,13 @@ class Agent(BaseAgent):
self.embedder = crew_embedder
if self.knowledge_sources:
full_pattern = re.compile(r"[^a-zA-Z0-9\-_\r\n]|(\.\.)")
knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}"
if isinstance(self.knowledge_sources, list) and all(
isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources
):
self.knowledge = Knowledge(
sources=self.knowledge_sources,
embedder=self.embedder,
collection_name=knowledge_agent_name,
collection_name=self.role,
storage=self.knowledge_storage or None,
)
except (TypeError, ValueError) as e:

View File

@@ -20,7 +20,6 @@ from crewai.utilities import I18N, Printer
from crewai.utilities.constants import MAX_LLM_RETRY, TRAINING_DATA_FILE
from crewai.utilities.events import (
ToolUsageErrorEvent,
ToolUsageStartedEvent,
crewai_event_bus,
)
from crewai.utilities.events.tool_usage_events import ToolUsageStartedEvent
@@ -153,8 +152,21 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
formatted_answer = self._process_llm_response(answer)
if isinstance(formatted_answer, AgentAction):
# Extract agent fingerprint if available
fingerprint_context = {}
if (
self.agent
and hasattr(self.agent, "security_config")
and hasattr(self.agent.security_config, "fingerprint")
):
fingerprint_context = {
"agent_fingerprint": str(
self.agent.security_config.fingerprint
)
}
tool_result = self._execute_tool_and_check_finality(
formatted_answer
formatted_answer, fingerprint_context=fingerprint_context
)
formatted_answer = self._handle_agent_action(
formatted_answer, tool_result
@@ -360,19 +372,35 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
content=f"\033[95m## Final Answer:\033[00m \033[92m\n{formatted_answer.output}\033[00m\n\n"
)
def _execute_tool_and_check_finality(self, agent_action: AgentAction) -> ToolResult:
def _execute_tool_and_check_finality(
self,
agent_action: AgentAction,
fingerprint_context: Optional[Dict[str, str]] = None,
) -> ToolResult:
try:
fingerprint_context = fingerprint_context or {}
if self.agent:
# Create tool usage event with fingerprint information
event_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"tool_name": agent_action.tool,
"tool_args": agent_action.tool_input,
"tool_class": agent_action.tool,
"agent": self.agent, # Pass the agent object for fingerprint extraction
}
# Include fingerprint context
if fingerprint_context:
event_data.update(fingerprint_context)
# Emit the tool usage started event with agent information
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
agent_key=self.agent.key,
agent_role=self.agent.role,
tool_name=agent_action.tool,
tool_args=agent_action.tool_input,
tool_class=agent_action.tool,
),
event=ToolUsageStartedEvent(**event_data),
)
tool_usage = ToolUsage(
tools_handler=self.tools_handler,
tools=self.tools,
@@ -383,6 +411,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
task=self.task, # type: ignore[arg-type]
agent=self.agent,
action=agent_action,
fingerprint_context=fingerprint_context, # Pass fingerprint context
)
tool_calling = tool_usage.parse_tool_calling(agent_action.text)
@@ -411,16 +440,23 @@ class CrewAgentExecutor(CrewAgentExecutorMixin):
except Exception as e:
# TODO: drop
if self.agent:
error_event_data = {
"agent_key": self.agent.key,
"agent_role": self.agent.role,
"tool_name": agent_action.tool,
"tool_args": agent_action.tool_input,
"tool_class": agent_action.tool,
"error": str(e),
"agent": self.agent, # Pass the agent object for fingerprint extraction
}
# Include fingerprint context
if fingerprint_context:
error_event_data.update(fingerprint_context)
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent( # validation error
agent_key=self.agent.key,
agent_role=self.agent.role,
tool_name=agent_action.tool,
tool_args=agent_action.tool_input,
tool_class=agent_action.tool,
error=str(e),
),
event=ToolUsageErrorEvent(**error_event_data),
)
raise e

View File

@@ -6,9 +6,8 @@ import warnings
from concurrent.futures import Future
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar, Union, cast
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, cast
from langchain_core.tools import BaseTool as LangchainBaseTool
from pydantic import (
UUID4,
BaseModel,
@@ -291,23 +290,17 @@ class Crew(BaseModel):
else EntityMemory(crew=self, embedder_config=self.embedder)
)
if (
self.memory_config and "user_memory" in self.memory_config
self.memory_config
and "user_memory" in self.memory_config
and self.memory_config.get("provider") == "mem0"
): # Check for user_memory in config
user_memory_config = self.memory_config["user_memory"]
if isinstance(
user_memory_config, UserMemory
): # Check if it is already an instance
self._user_memory = user_memory_config
elif isinstance(
user_memory_config, dict
): # Check if it's a configuration dict
self._user_memory = UserMemory(
crew=self, **user_memory_config
) # Initialize with config
self._user_memory = UserMemory(crew=self)
else:
raise TypeError(
"user_memory must be a UserMemory instance or a configuration dictionary"
)
raise TypeError("user_memory must be a configuration dictionary")
else:
self._user_memory = None # No user memory if not in config
return self
@@ -490,7 +483,7 @@ class Crew(BaseModel):
task.key for task in self.tasks
]
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@property
def fingerprint(self) -> Fingerprint:
"""
@@ -1157,7 +1150,12 @@ class Crew(BaseModel):
return required_inputs
def copy(self):
"""Create a deep copy of the Crew."""
"""
Creates a deep copy of the Crew instance.
Returns:
Crew: A new instance with copied components
"""
exclude = {
"id",
@@ -1169,13 +1167,18 @@ class Crew(BaseModel):
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_telemetry",
"agents",
"tasks",
"knowledge_sources",
"knowledge",
"manager_agent",
"manager_llm",
}
cloned_agents = [agent.copy() for agent in self.agents]
manager_agent = self.manager_agent.copy() if self.manager_agent else None
manager_llm = shallow_copy(self.manager_llm) if self.manager_llm else None
task_mapping = {}
@@ -1208,6 +1211,8 @@ class Crew(BaseModel):
tasks=cloned_tasks,
knowledge_sources=existing_knowledge_sources,
knowledge=existing_knowledge,
manager_agent=manager_agent,
manager_llm=manager_llm,
)
return copied_crew

View File

@@ -14,6 +14,7 @@ from chromadb.config import Settings
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger
from crewai.utilities.paths import db_storage_path
@@ -99,7 +100,8 @@ class KnowledgeStorage(BaseKnowledgeStorage):
)
if self.app:
self.collection = self.app.get_or_create_collection(
name=collection_name, embedding_function=self.embedder
name=sanitize_collection_name(collection_name),
embedding_function=self.embedder,
)
else:
raise Exception("Vector Database Client not initialized")

View File

@@ -94,6 +94,10 @@ class ContextualMemory:
Returns:
str: Formatted user memories as bullet points, or an empty string if none found.
"""
if self.um is None:
return ""
user_memories = self.um.search(query)
if not user_memories:
return ""

View File

@@ -31,6 +31,7 @@ class Mem0Storage(Storage):
mem0_api_key = config.get("api_key") or os.getenv("MEM0_API_KEY")
mem0_org_id = config.get("org_id")
mem0_project_id = config.get("project_id")
mem0_local_config = config.get("local_mem0_config")
# Initialize MemoryClient or Memory based on the presence of the mem0_api_key
if mem0_api_key:
@@ -41,7 +42,10 @@ class Mem0Storage(Storage):
else:
self.memory = MemoryClient(api_key=mem0_api_key)
else:
self.memory = Memory() # Fallback to Memory if no Mem0 API key is provided
if mem0_local_config and len(mem0_local_config):
self.memory = Memory.from_config(config)
else:
self.memory = Memory()
def _sanitize_role(self, role: str) -> str:
"""
@@ -114,3 +118,7 @@ class Mem0Storage(Storage):
agents = [self._sanitize_role(agent.role) for agent in agents]
agents = "_".join(agents)
return agents
def reset(self):
if self.memory:
self.memory.reset()

View File

@@ -43,3 +43,11 @@ class UserMemory(Memory):
score_threshold=score_threshold,
)
return results
def reset(self) -> None:
try:
self.storage.reset()
except Exception as e:
raise Exception(
f"An error occurred while resetting the user memory: {e}"
)

View File

@@ -388,7 +388,7 @@ class Task(BaseModel):
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
crewai_event_bus.emit(self, TaskStartedEvent(context=context))
crewai_event_bus.emit(self, TaskStartedEvent(context=context, task=self))
result = agent.execute_task(
task=self,
context=context,
@@ -464,11 +464,11 @@ class Task(BaseModel):
)
)
self._save_file(content)
crewai_event_bus.emit(self, TaskCompletedEvent(output=task_output))
crewai_event_bus.emit(self, TaskCompletedEvent(output=task_output, task=self))
return task_output
except Exception as e:
self.end_time = datetime.datetime.now()
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e)))
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self))
raise e # Re-raise the exception after emitting the event
def prompt(self) -> str:
@@ -572,7 +572,15 @@ class Task(BaseModel):
def copy(
self, agents: List["BaseAgent"], task_mapping: Dict[str, "Task"]
) -> "Task":
"""Create a deep copy of the Task."""
"""Creates a deep copy of the Task while preserving its original class type.
Args:
agents: List of agents available for the task.
task_mapping: Dictionary mapping task IDs to Task instances.
Returns:
A copy of the task with the same class type as the original.
"""
exclude = {
"id",
"agent",
@@ -595,7 +603,7 @@ class Task(BaseModel):
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = copy(self.tools) if self.tools else []
copied_task = Task(
copied_task = self.__class__(
**copied_data,
context=cloned_context,
agent=cloned_agent,

View File

@@ -112,6 +112,23 @@ class Telemetry:
self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
self._add_attribute(span, "crew_number_of_agents", len(crew.agents))
# Add fingerprint data
if hasattr(crew, "fingerprint") and crew.fingerprint:
self._add_attribute(span, "crew_fingerprint", crew.fingerprint.uuid_str)
self._add_attribute(
span,
"crew_fingerprint_created_at",
crew.fingerprint.created_at.isoformat(),
)
# Add fingerprint metadata if it exists
if hasattr(crew.fingerprint, "metadata") and crew.fingerprint.metadata:
self._add_attribute(
span,
"crew_fingerprint_metadata",
json.dumps(crew.fingerprint.metadata),
)
if crew.share_crew:
self._add_attribute(
span,
@@ -129,17 +146,43 @@ class Telemetry:
"max_rpm": agent.max_rpm,
"i18n": agent.i18n.prompt_file,
"function_calling_llm": (
agent.function_calling_llm.model
if agent.function_calling_llm
getattr(
getattr(agent, "function_calling_llm", None),
"model",
"",
)
if getattr(agent, "function_calling_llm", None)
else ""
),
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"allow_code_execution?": agent.allow_code_execution,
"max_retry_limit": agent.max_retry_limit,
"allow_code_execution?": getattr(
agent, "allow_code_execution", False
),
"max_retry_limit": getattr(agent, "max_retry_limit", 3),
"tools_names": [
tool.name.casefold() for tool in agent.tools or []
],
# Add agent fingerprint data if sharing crew details
"fingerprint": (
getattr(
getattr(agent, "fingerprint", None),
"uuid_str",
None,
)
),
"fingerprint_created_at": (
created_at.isoformat()
if (
created_at := getattr(
getattr(agent, "fingerprint", None),
"created_at",
None,
)
)
is not None
else None
),
}
for agent in crew.agents
]
@@ -169,6 +212,17 @@ class Telemetry:
"tools_names": [
tool.name.casefold() for tool in task.tools or []
],
# Add task fingerprint data if sharing crew details
"fingerprint": (
task.fingerprint.uuid_str
if hasattr(task, "fingerprint") and task.fingerprint
else None
),
"fingerprint_created_at": (
task.fingerprint.created_at.isoformat()
if hasattr(task, "fingerprint") and task.fingerprint
else None
),
}
for task in crew.tasks
]
@@ -196,14 +250,20 @@ class Telemetry:
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
"function_calling_llm": (
agent.function_calling_llm.model
if agent.function_calling_llm
getattr(
getattr(agent, "function_calling_llm", None),
"model",
"",
)
if getattr(agent, "function_calling_llm", None)
else ""
),
"llm": agent.llm.model,
"delegation_enabled?": agent.allow_delegation,
"allow_code_execution?": agent.allow_code_execution,
"max_retry_limit": agent.max_retry_limit,
"allow_code_execution?": getattr(
agent, "allow_code_execution", False
),
"max_retry_limit": getattr(agent, "max_retry_limit", 3),
"tools_names": [
tool.name.casefold() for tool in agent.tools or []
],
@@ -252,6 +312,39 @@ class Telemetry:
self._add_attribute(created_span, "task_key", task.key)
self._add_attribute(created_span, "task_id", str(task.id))
# Add fingerprint data
if hasattr(crew, "fingerprint") and crew.fingerprint:
self._add_attribute(
created_span, "crew_fingerprint", crew.fingerprint.uuid_str
)
if hasattr(task, "fingerprint") and task.fingerprint:
self._add_attribute(
created_span, "task_fingerprint", task.fingerprint.uuid_str
)
self._add_attribute(
created_span,
"task_fingerprint_created_at",
task.fingerprint.created_at.isoformat(),
)
# Add fingerprint metadata if it exists
if hasattr(task.fingerprint, "metadata") and task.fingerprint.metadata:
self._add_attribute(
created_span,
"task_fingerprint_metadata",
json.dumps(task.fingerprint.metadata),
)
# Add agent fingerprint if task has an assigned agent
if hasattr(task, "agent") and task.agent:
agent_fingerprint = getattr(
getattr(task.agent, "fingerprint", None), "uuid_str", None
)
if agent_fingerprint:
self._add_attribute(
created_span, "agent_fingerprint", agent_fingerprint
)
if crew.share_crew:
self._add_attribute(
created_span, "formatted_description", task.description
@@ -270,6 +363,21 @@ class Telemetry:
self._add_attribute(span, "task_key", task.key)
self._add_attribute(span, "task_id", str(task.id))
# Add fingerprint data to execution span
if hasattr(crew, "fingerprint") and crew.fingerprint:
self._add_attribute(span, "crew_fingerprint", crew.fingerprint.uuid_str)
if hasattr(task, "fingerprint") and task.fingerprint:
self._add_attribute(span, "task_fingerprint", task.fingerprint.uuid_str)
# Add agent fingerprint if task has an assigned agent
if hasattr(task, "agent") and task.agent:
agent_fingerprint = getattr(
getattr(task.agent, "fingerprint", None), "uuid_str", None
)
if agent_fingerprint:
self._add_attribute(span, "agent_fingerprint", agent_fingerprint)
if crew.share_crew:
self._add_attribute(span, "formatted_description", task.description)
self._add_attribute(
@@ -291,7 +399,12 @@ class Telemetry:
Note:
If share_crew is enabled, this will also record the task output
"""
def operation():
# Ensure fingerprint data is present on completion span
if hasattr(task, "fingerprint") and task.fingerprint:
self._add_attribute(span, "task_fingerprint", task.fingerprint.uuid_str)
if crew.share_crew:
self._add_attribute(
span,
@@ -312,6 +425,7 @@ class Telemetry:
tool_name (str): Name of the tool being repeatedly used
attempts (int): Number of attempts made with this tool
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Repeated Usage")
@@ -329,14 +443,16 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def tool_usage(self, llm: Any, tool_name: str, attempts: int):
def tool_usage(self, llm: Any, tool_name: str, attempts: int, agent: Any = None):
"""Records the usage of a tool by an agent.
Args:
llm (Any): The language model being used
tool_name (str): Name of the tool being used
attempts (int): Number of attempts made with this tool
agent (Any, optional): The agent using the tool
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage")
@@ -349,17 +465,31 @@ class Telemetry:
self._add_attribute(span, "attempts", attempts)
if llm:
self._add_attribute(span, "llm", llm.model)
# Add agent fingerprint data if available
if agent and hasattr(agent, "fingerprint") and agent.fingerprint:
self._add_attribute(
span, "agent_fingerprint", agent.fingerprint.uuid_str
)
if hasattr(agent, "role"):
self._add_attribute(span, "agent_role", agent.role)
span.set_status(Status(StatusCode.OK))
span.end()
self._safe_telemetry_operation(operation)
def tool_usage_error(self, llm: Any):
def tool_usage_error(
self, llm: Any, agent: Any = None, tool_name: Optional[str] = None
):
"""Records when a tool usage results in an error.
Args:
llm (Any): The language model being used when the error occurred
agent (Any, optional): The agent using the tool
tool_name (str, optional): Name of the tool that caused the error
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage Error")
@@ -370,6 +500,18 @@ class Telemetry:
)
if llm:
self._add_attribute(span, "llm", llm.model)
if tool_name:
self._add_attribute(span, "tool_name", tool_name)
# Add agent fingerprint data if available
if agent and hasattr(agent, "fingerprint") and agent.fingerprint:
self._add_attribute(
span, "agent_fingerprint", agent.fingerprint.uuid_str
)
if hasattr(agent, "role"):
self._add_attribute(span, "agent_role", agent.role)
span.set_status(Status(StatusCode.OK))
span.end()
@@ -386,6 +528,7 @@ class Telemetry:
exec_time (int): Execution time in seconds
model_name (str): Name of the model used
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Individual Test Result")
@@ -420,6 +563,7 @@ class Telemetry:
inputs (dict[str, Any] | None): Input parameters for the test
model_name (str): Name of the model used in testing
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Test Execution")
@@ -446,6 +590,7 @@ class Telemetry:
def deploy_signup_error_span(self):
"""Records when an error occurs during the deployment signup process."""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Deploy Signup Error")
@@ -460,6 +605,7 @@ class Telemetry:
Args:
uuid (Optional[str]): Unique identifier for the deployment
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Start Deployment")
@@ -472,6 +618,7 @@ class Telemetry:
def create_crew_deployment_span(self):
"""Records the creation of a new crew deployment."""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Create Crew Deployment")
@@ -487,6 +634,7 @@ class Telemetry:
uuid (Optional[str]): Unique identifier for the crew
log_type (str, optional): Type of logs being retrieved. Defaults to "deployment".
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Get Crew Logs")
@@ -504,6 +652,7 @@ class Telemetry:
Args:
uuid (Optional[str]): Unique identifier for the crew being removed
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Remove Crew")
@@ -634,6 +783,7 @@ class Telemetry:
Args:
flow_name (str): Name of the flow being created
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Creation")
@@ -650,6 +800,7 @@ class Telemetry:
flow_name (str): Name of the flow being plotted
node_names (list[str]): List of node names in the flow
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Plotting")
@@ -667,6 +818,7 @@ class Telemetry:
flow_name (str): Name of the flow being executed
node_names (list[str]): List of nodes being executed in the flow
"""
def operation():
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Execution")

View File

@@ -7,29 +7,27 @@ from pydantic import (
BaseModel,
ConfigDict,
Field,
PydanticDeprecatedSince20,
create_model,
validator,
field_validator,
)
from pydantic import BaseModel as PydanticBaseModel
from crewai.tools.structured_tool import CrewStructuredTool
# Ignore all "PydanticDeprecatedSince20" warnings globally
warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20)
class BaseTool(BaseModel, ABC):
class _ArgsSchemaPlaceholder(PydanticBaseModel):
pass
model_config = ConfigDict()
model_config = ConfigDict(arbitrary_types_allowed=True)
name: str
"""The unique name of the tool that clearly communicates its purpose."""
description: str
"""Used to tell the model how/when/why to use the tool."""
args_schema: Type[PydanticBaseModel] = Field(default_factory=_ArgsSchemaPlaceholder)
args_schema: Type[PydanticBaseModel] = Field(
default_factory=_ArgsSchemaPlaceholder, validate_default=True
)
"""The schema for the arguments that the tool accepts."""
description_updated: bool = False
"""Flag to check if the description has been updated."""
@@ -38,7 +36,8 @@ class BaseTool(BaseModel, ABC):
result_as_answer: bool = False
"""Flag to check if the tool should be the final agent answer."""
@validator("args_schema", always=True, pre=True)
@field_validator("args_schema", mode="before")
@classmethod
def _default_args_schema(
cls, v: Type[PydanticBaseModel]
) -> Type[PydanticBaseModel]:

View File

@@ -22,6 +22,7 @@ from crewai.utilities.events.tool_usage_events import (
ToolSelectionErrorEvent,
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
ToolValidateInputErrorEvent,
)
@@ -69,6 +70,7 @@ class ToolUsage:
function_calling_llm: Any,
agent: Any,
action: Any,
fingerprint_context: Optional[Dict[str, str]] = None,
) -> None:
self._i18n: I18N = agent.i18n
self._printer: Printer = Printer()
@@ -85,6 +87,7 @@ class ToolUsage:
self.task = task
self.action = action
self.function_calling_llm = function_calling_llm
self.fingerprint_context = fingerprint_context or {}
# Set the maximum parsing attempts for bigger models
if (
@@ -117,7 +120,10 @@ class ToolUsage:
self._printer.print(content=f"\n\n{error}\n", color="red")
return error
if isinstance(tool, CrewStructuredTool) and tool.name == self._i18n.tools("add_image")["name"]: # type: ignore
if (
isinstance(tool, CrewStructuredTool)
and tool.name == self._i18n.tools("add_image")["name"] # type: ignore
):
try:
result = self._use(tool_string=tool_string, tool=tool, calling=calling)
return result
@@ -181,18 +187,26 @@ class ToolUsage:
if calling.arguments:
try:
acceptable_args = tool.args_schema.model_json_schema()["properties"].keys() # type: ignore
acceptable_args = tool.args_schema.model_json_schema()[
"properties"
].keys() # type: ignore
arguments = {
k: v
for k, v in calling.arguments.items()
if k in acceptable_args
}
# Add fingerprint metadata if available
arguments = self._add_fingerprint_metadata(arguments)
result = tool.invoke(input=arguments)
except Exception:
arguments = calling.arguments
# Add fingerprint metadata if available
arguments = self._add_fingerprint_metadata(arguments)
result = tool.invoke(input=arguments)
else:
result = tool.invoke(input={})
# Add fingerprint metadata even to empty arguments
arguments = self._add_fingerprint_metadata({})
result = tool.invoke(input=arguments)
except Exception as e:
self.on_tool_error(tool=tool, tool_calling=calling, e=e)
self._run_attempts += 1
@@ -202,7 +216,7 @@ class ToolUsage:
error=e, tool=tool.name, tool_inputs=tool.description
)
error = ToolUsageErrorException(
f'\n{error_message}.\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
f"\n{error_message}.\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
).message
self.task.increment_tools_errors()
if self.agent.verbose:
@@ -244,6 +258,7 @@ class ToolUsage:
tool_calling=calling,
from_cache=from_cache,
started_at=started_at,
result=result,
)
if (
@@ -380,7 +395,7 @@ class ToolUsage:
raise
else:
return ToolUsageErrorException(
f'{self._i18n.errors("tool_arguments_error")}'
f"{self._i18n.errors('tool_arguments_error')}"
)
if not isinstance(arguments, dict):
@@ -388,7 +403,7 @@ class ToolUsage:
raise
else:
return ToolUsageErrorException(
f'{self._i18n.errors("tool_arguments_error")}'
f"{self._i18n.errors('tool_arguments_error')}"
)
return ToolCalling(
@@ -416,7 +431,7 @@ class ToolUsage:
if self.agent.verbose:
self._printer.print(content=f"\n\n{e}\n", color="red")
return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
f'{self._i18n.errors("tool_usage_error").format(error=e)}\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
f"{self._i18n.errors('tool_usage_error').format(error=e)}\nMoving on then. {self._i18n.slice('format').format(tool_names=self.tools_names)}"
)
return self._tool_calling(tool_string)
@@ -480,8 +495,13 @@ class ToolUsage:
"tool_name": self.action.tool,
"tool_args": str(self.action.tool_input),
"tool_class": self.__class__.__name__,
"agent": self.agent, # Adding agent for fingerprint extraction
}
# Include fingerprint context if available
if self.fingerprint_context:
tool_selection_data.update(self.fingerprint_context)
crewai_event_bus.emit(
self,
ToolValidateInputErrorEvent(**tool_selection_data, error=final_error),
@@ -492,7 +512,12 @@ class ToolUsage:
crewai_event_bus.emit(self, ToolUsageErrorEvent(**{**event_data, "error": e}))
def on_tool_use_finished(
self, tool: Any, tool_calling: ToolCalling, from_cache: bool, started_at: float
self,
tool: Any,
tool_calling: ToolCalling,
from_cache: bool,
started_at: float,
result: Any,
) -> None:
finished_at = time.time()
event_data = self._prepare_event_data(tool, tool_calling)
@@ -501,12 +526,13 @@ class ToolUsage:
"started_at": datetime.datetime.fromtimestamp(started_at),
"finished_at": datetime.datetime.fromtimestamp(finished_at),
"from_cache": from_cache,
"output": result,
}
)
crewai_event_bus.emit(self, ToolUsageFinishedEvent(**event_data))
def _prepare_event_data(self, tool: Any, tool_calling: ToolCalling) -> dict:
return {
event_data = {
"agent_key": self.agent.key,
"agent_role": (self.agent._original_role or self.agent.role),
"run_attempts": self._run_attempts,
@@ -514,4 +540,43 @@ class ToolUsage:
"tool_name": tool.name,
"tool_args": tool_calling.arguments,
"tool_class": tool.__class__.__name__,
"agent": self.agent, # Adding agent for fingerprint extraction
}
# Include fingerprint context if available
if self.fingerprint_context:
event_data.update(self.fingerprint_context)
return event_data
def _add_fingerprint_metadata(self, arguments: dict) -> dict:
"""Add fingerprint metadata to tool arguments if available.
Args:
arguments: The original tool arguments
Returns:
Updated arguments dictionary with fingerprint metadata
"""
# Create a shallow copy to avoid modifying the original
arguments = arguments.copy()
# Add security metadata under a designated key
if not "security_context" in arguments:
arguments["security_context"] = {}
security_context = arguments["security_context"]
# Add agent fingerprint if available
if hasattr(self, "agent") and hasattr(self.agent, "security_config"):
security_context["agent_fingerprint"] = self.agent.security_config.fingerprint.to_dict()
# Add task fingerprint if available
if hasattr(self, "task") and hasattr(self.task, "security_config"):
security_context["task_fingerprint"] = self.task.security_config.fingerprint.to_dict()
# Add crew fingerprint if available
if hasattr(self, "crew") and hasattr(self.crew, "security_config"):
security_context["crew_fingerprint"] = self.crew.security_config.fingerprint.to_dict()
return arguments

View File

@@ -0,0 +1,62 @@
import re
from typing import Optional
MIN_COLLECTION_LENGTH = 3
MAX_COLLECTION_LENGTH = 63
DEFAULT_COLLECTION = "default_collection"
# Compiled regex patterns for better performance
INVALID_CHARS_PATTERN = re.compile(r"[^a-zA-Z0-9_-]")
IPV4_PATTERN = re.compile(r"^(\d{1,3}\.){3}\d{1,3}$")
def is_ipv4_pattern(name: str) -> bool:
"""
Check if a string matches an IPv4 address pattern.
Args:
name: The string to check
Returns:
True if the string matches an IPv4 pattern, False otherwise
"""
return bool(IPV4_PATTERN.match(name))
def sanitize_collection_name(name: Optional[str]) -> str:
"""
Sanitize a collection name to meet ChromaDB requirements:
1. 3-63 characters long
2. Starts and ends with alphanumeric character
3. Contains only alphanumeric characters, underscores, or hyphens
4. No consecutive periods
5. Not a valid IPv4 address
Args:
name: The original collection name to sanitize
Returns:
A sanitized collection name that meets ChromaDB requirements
"""
if not name:
return DEFAULT_COLLECTION
if is_ipv4_pattern(name):
name = f"ip_{name}"
sanitized = INVALID_CHARS_PATTERN.sub("_", name)
if not sanitized[0].isalnum():
sanitized = "a" + sanitized
if not sanitized[-1].isalnum():
sanitized = sanitized[:-1] + "z"
if len(sanitized) < MIN_COLLECTION_LENGTH:
sanitized = sanitized + "x" * (MIN_COLLECTION_LENGTH - len(sanitized))
if len(sanitized) > MAX_COLLECTION_LENGTH:
sanitized = sanitized[:MAX_COLLECTION_LENGTH]
if not sanitized[-1].isalnum():
sanitized = sanitized[:-1] + "z"
return sanitized

View File

@@ -287,8 +287,9 @@ def generate_model_description(model: Type[BaseModel]) -> str:
else:
return str(field_type)
fields = model.__annotations__
fields = model.model_fields
field_descriptions = [
f'"{name}": {describe_field(type_)}' for name, type_ in fields.items()
f'"{name}": {describe_field(field.annotation)}'
for name, field in fields.items()
]
return "{\n " + ",\n ".join(field_descriptions) + "\n}"

View File

@@ -45,7 +45,7 @@ class TaskEvaluator:
def evaluate(self, task, output) -> TaskEvaluation:
crewai_event_bus.emit(
self, TaskEvaluationEvent(evaluation_type="task_evaluation")
self, TaskEvaluationEvent(evaluation_type="task_evaluation", task=task)
)
evaluation_query = (
f"Assess the quality of the task completed based on the description, expected output, and actual results.\n\n"

View File

@@ -4,13 +4,13 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from .base_events import CrewEvent
from .base_events import BaseEvent
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
class AgentExecutionStartedEvent(CrewEvent):
class AgentExecutionStartedEvent(BaseEvent):
"""Event emitted when an agent starts executing a task"""
agent: BaseAgent
@@ -21,8 +21,20 @@ class AgentExecutionStartedEvent(CrewEvent):
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
class AgentExecutionCompletedEvent(CrewEvent):
class AgentExecutionCompletedEvent(BaseEvent):
"""Event emitted when an agent completes executing a task"""
agent: BaseAgent
@@ -30,11 +42,35 @@ class AgentExecutionCompletedEvent(CrewEvent):
output: str
type: str = "agent_execution_completed"
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
class AgentExecutionErrorEvent(CrewEvent):
class AgentExecutionErrorEvent(BaseEvent):
"""Event emitted when an agent encounters an error during execution"""
agent: BaseAgent
task: Any
error: str
type: str = "agent_execution_error"
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata

View File

@@ -1,10 +1,28 @@
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
class CrewEvent(BaseModel):
"""Base class for all crew events"""
class BaseEvent(BaseModel):
"""Base class for all events"""
timestamp: datetime = Field(default_factory=datetime.now)
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew"
fingerprint_metadata: Optional[Dict[str, Any]] = None # Any relevant metadata
def to_json(self, exclude: set[str] | None = None):
"""
Converts the event to a JSON-serializable dictionary.
Args:
exclude (set[str], optional): Set of keys to exclude from the result. Defaults to None.
Returns:
dict: A JSON-serializable dictionary.
"""
return to_serializable(self, exclude=exclude)

View File

@@ -1,81 +1,102 @@
from typing import Any, Dict, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from pydantic import InstanceOf
from crewai.utilities.events.base_events import BaseEvent
from crewai.utilities.events.base_events import CrewEvent
if TYPE_CHECKING:
from crewai.crew import Crew
else:
Crew = Any
class CrewKickoffStartedEvent(CrewEvent):
"""Event emitted when a crew starts execution"""
class CrewBaseEvent(BaseEvent):
"""Base class for crew events with fingerprint handling"""
crew_name: Optional[str]
crew: Optional[Crew] = None
def __init__(self, **data):
super().__init__(**data)
self.set_crew_fingerprint()
def set_crew_fingerprint(self) -> None:
if self.crew and hasattr(self.crew, "fingerprint") and self.crew.fingerprint:
self.source_fingerprint = self.crew.fingerprint.uuid_str
self.source_type = "crew"
if (
hasattr(self.crew.fingerprint, "metadata")
and self.crew.fingerprint.metadata
):
self.fingerprint_metadata = self.crew.fingerprint.metadata
def to_json(self, exclude: set[str] | None = None):
if exclude is None:
exclude = set()
exclude.add("crew")
return super().to_json(exclude=exclude)
class CrewKickoffStartedEvent(CrewBaseEvent):
"""Event emitted when a crew starts execution"""
inputs: Optional[Dict[str, Any]]
type: str = "crew_kickoff_started"
class CrewKickoffCompletedEvent(CrewEvent):
class CrewKickoffCompletedEvent(CrewBaseEvent):
"""Event emitted when a crew completes execution"""
crew_name: Optional[str]
output: Any
type: str = "crew_kickoff_completed"
class CrewKickoffFailedEvent(CrewEvent):
class CrewKickoffFailedEvent(CrewBaseEvent):
"""Event emitted when a crew fails to complete execution"""
error: str
crew_name: Optional[str]
type: str = "crew_kickoff_failed"
class CrewTrainStartedEvent(CrewEvent):
class CrewTrainStartedEvent(CrewBaseEvent):
"""Event emitted when a crew starts training"""
crew_name: Optional[str]
n_iterations: int
filename: str
inputs: Optional[Dict[str, Any]]
type: str = "crew_train_started"
class CrewTrainCompletedEvent(CrewEvent):
class CrewTrainCompletedEvent(CrewBaseEvent):
"""Event emitted when a crew completes training"""
crew_name: Optional[str]
n_iterations: int
filename: str
type: str = "crew_train_completed"
class CrewTrainFailedEvent(CrewEvent):
class CrewTrainFailedEvent(CrewBaseEvent):
"""Event emitted when a crew fails to complete training"""
error: str
crew_name: Optional[str]
type: str = "crew_train_failed"
class CrewTestStartedEvent(CrewEvent):
class CrewTestStartedEvent(CrewBaseEvent):
"""Event emitted when a crew starts testing"""
crew_name: Optional[str]
n_iterations: int
eval_llm: Optional[Union[str, Any]]
inputs: Optional[Dict[str, Any]]
type: str = "crew_test_started"
class CrewTestCompletedEvent(CrewEvent):
class CrewTestCompletedEvent(CrewBaseEvent):
"""Event emitted when a crew completes testing"""
crew_name: Optional[str]
type: str = "crew_test_completed"
class CrewTestFailedEvent(CrewEvent):
class CrewTestFailedEvent(CrewBaseEvent):
"""Event emitted when a crew fails to complete testing"""
error: str
crew_name: Optional[str]
type: str = "crew_test_failed"

View File

@@ -4,10 +4,10 @@ from typing import Any, Callable, Dict, List, Type, TypeVar, cast
from blinker import Signal
from crewai.utilities.events.base_events import CrewEvent
from crewai.utilities.events.base_events import BaseEvent
from crewai.utilities.events.event_types import EventTypes
EventT = TypeVar("EventT", bound=CrewEvent)
EventT = TypeVar("EventT", bound=BaseEvent)
class CrewAIEventsBus:
@@ -30,7 +30,7 @@ class CrewAIEventsBus:
def _initialize(self) -> None:
"""Initialize the event bus internal state"""
self._signal = Signal("crewai_event_bus")
self._handlers: Dict[Type[CrewEvent], List[Callable]] = {}
self._handlers: Dict[Type[BaseEvent], List[Callable]] = {}
def on(
self, event_type: Type[EventT]
@@ -59,7 +59,7 @@ class CrewAIEventsBus:
return decorator
def emit(self, source: Any, event: CrewEvent) -> None:
def emit(self, source: Any, event: BaseEvent) -> None:
"""
Emit an event to all registered handlers

View File

@@ -2,10 +2,10 @@ from typing import Any, Dict, Optional, Union
from pydantic import BaseModel, ConfigDict
from .base_events import CrewEvent
from .base_events import BaseEvent
class FlowEvent(CrewEvent):
class FlowEvent(BaseEvent):
"""Base class for all flow events"""
type: str

View File

@@ -1,7 +1,7 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from crewai.utilities.events.base_events import CrewEvent
from crewai.utilities.events.base_events import BaseEvent
class LLMCallType(Enum):
@@ -11,17 +11,22 @@ class LLMCallType(Enum):
LLM_CALL = "llm_call"
class LLMCallStartedEvent(CrewEvent):
"""Event emitted when a LLM call starts"""
class LLMCallStartedEvent(BaseEvent):
"""Event emitted when a LLM call starts
Attributes:
messages: Content can be either a string or a list of dictionaries that support
multimodal content (text, images, etc.)
"""
type: str = "llm_call_started"
messages: Union[str, List[Dict[str, str]]]
messages: Union[str, List[Dict[str, Any]]]
tools: Optional[List[dict]] = None
callbacks: Optional[List[Any]] = None
available_functions: Optional[Dict[str, Any]] = None
class LLMCallCompletedEvent(CrewEvent):
class LLMCallCompletedEvent(BaseEvent):
"""Event emitted when a LLM call completes"""
type: str = "llm_call_completed"
@@ -29,14 +34,14 @@ class LLMCallCompletedEvent(CrewEvent):
call_type: LLMCallType
class LLMCallFailedEvent(CrewEvent):
class LLMCallFailedEvent(BaseEvent):
"""Event emitted when a LLM call fails"""
error: str
type: str = "llm_call_failed"
class LLMStreamChunkEvent(CrewEvent):
class LLMStreamChunkEvent(BaseEvent):
"""Event emitted when a streaming chunk is received"""
type: str = "llm_stream_chunk"

View File

@@ -1,32 +1,84 @@
from typing import Optional
from typing import Any, Optional
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.events.base_events import CrewEvent
from crewai.utilities.events.base_events import BaseEvent
class TaskStartedEvent(CrewEvent):
class TaskStartedEvent(BaseEvent):
"""Event emitted when a task starts"""
type: str = "task_started"
context: Optional[str]
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
class TaskCompletedEvent(CrewEvent):
class TaskCompletedEvent(BaseEvent):
"""Event emitted when a task completes"""
output: TaskOutput
type: str = "task_completed"
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
class TaskFailedEvent(CrewEvent):
class TaskFailedEvent(BaseEvent):
"""Event emitted when a task fails"""
error: str
type: str = "task_failed"
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata
class TaskEvaluationEvent(CrewEvent):
class TaskEvaluationEvent(BaseEvent):
"""Event emitted when a task evaluation is completed"""
type: str = "task_evaluation"
evaluation_type: str
task: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the task
if hasattr(self.task, "fingerprint") and self.task.fingerprint:
self.source_fingerprint = self.task.fingerprint.uuid_str
self.source_type = "task"
if (
hasattr(self.task.fingerprint, "metadata")
and self.task.fingerprint.metadata
):
self.fingerprint_metadata = self.task.fingerprint.metadata

View File

@@ -1,10 +1,10 @@
from datetime import datetime
from typing import Any, Callable, Dict
from typing import Any, Callable, Dict, Optional
from .base_events import CrewEvent
from .base_events import BaseEvent
class ToolUsageEvent(CrewEvent):
class ToolUsageEvent(BaseEvent):
"""Base event for tool usage tracking"""
agent_key: str
@@ -14,9 +14,22 @@ class ToolUsageEvent(CrewEvent):
tool_class: str
run_attempts: int | None = None
delegations: int | None = None
agent: Optional[Any] = None
model_config = {"arbitrary_types_allowed": True}
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata
class ToolUsageStartedEvent(ToolUsageEvent):
"""Event emitted when a tool execution is started"""
@@ -30,6 +43,7 @@ class ToolUsageFinishedEvent(ToolUsageEvent):
started_at: datetime
finished_at: datetime
from_cache: bool = False
output: Any
type: str = "tool_usage_finished"
@@ -54,7 +68,7 @@ class ToolSelectionErrorEvent(ToolUsageEvent):
type: str = "tool_selection_error"
class ToolExecutionErrorEvent(CrewEvent):
class ToolExecutionErrorEvent(BaseEvent):
"""Event emitted when a tool execution encounters an error"""
error: Any
@@ -62,3 +76,16 @@ class ToolExecutionErrorEvent(CrewEvent):
tool_name: str
tool_args: Dict[str, Any]
tool_class: Callable
agent: Optional[Any] = None
def __init__(self, **data):
super().__init__(**data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
self.source_fingerprint = self.agent.fingerprint.uuid_str
self.source_type = "agent"
if (
hasattr(self.agent.fingerprint, "metadata")
and self.agent.fingerprint.metadata
):
self.fingerprint_metadata = self.agent.fingerprint.metadata

View File

@@ -507,9 +507,10 @@ class ConsoleFormatter:
# Remove the thinking status node when complete
if "Thinking" in str(tool_branch.label):
agent_branch.children.remove(tool_branch)
self.print(crew_tree)
self.print()
if tool_branch in agent_branch.children:
agent_branch.children.remove(tool_branch)
self.print(crew_tree)
self.print()
def handle_llm_call_failed(
self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree]
@@ -587,6 +588,7 @@ class ConsoleFormatter:
for child in flow_tree.children:
if "Running tests" in str(child.label):
child.label = Text("✅ Tests completed successfully", style="green")
break
self.print(flow_tree)
self.print()

View File

@@ -1,38 +1,21 @@
import json
import uuid
from datetime import date, datetime
from typing import Any, Dict, List, Union
from pydantic import BaseModel
from crewai.flow import Flow
SerializablePrimitive = Union[str, int, float, bool, None]
Serializable = Union[
SerializablePrimitive, List["Serializable"], Dict[str, "Serializable"]
]
def export_state(flow: Flow) -> dict[str, Serializable]:
"""Exports the Flow's internal state as JSON-compatible data structures.
Performs a one-way transformation of a Flow's state into basic Python types
that can be safely serialized to JSON. To prevent infinite recursion with
circular references, the conversion is limited to a depth of 5 levels.
Args:
flow: The Flow object whose state needs to be exported
Returns:
dict[str, Any]: The transformed state using JSON-compatible Python
types.
"""
result = to_serializable(flow._state)
assert isinstance(result, dict)
return result
def to_serializable(
obj: Any, max_depth: int = 5, _current_depth: int = 0
obj: Any,
exclude: set[str] | None = None,
max_depth: int = 5,
_current_depth: int = 0,
) -> Serializable:
"""Converts a Python object into a JSON-compatible representation.
@@ -42,6 +25,7 @@ def to_serializable(
Args:
obj (Any): Object to transform.
exclude (set[str], optional): Set of keys to exclude from the result.
max_depth (int, optional): Maximum recursion depth. Defaults to 5.
Returns:
@@ -50,21 +34,39 @@ def to_serializable(
if _current_depth >= max_depth:
return repr(obj)
if exclude is None:
exclude = set()
if isinstance(obj, (str, int, float, bool, type(None))):
return obj
elif isinstance(obj, uuid.UUID):
return str(obj)
elif isinstance(obj, (date, datetime)):
return obj.isoformat()
elif isinstance(obj, (list, tuple, set)):
return [to_serializable(item, max_depth, _current_depth + 1) for item in obj]
return [
to_serializable(
item, max_depth=max_depth, _current_depth=_current_depth + 1
)
for item in obj
]
elif isinstance(obj, dict):
return {
_to_serializable_key(key): to_serializable(
value, max_depth, _current_depth + 1
obj=value,
exclude=exclude,
max_depth=max_depth,
_current_depth=_current_depth + 1,
)
for key, value in obj.items()
if key not in exclude
}
elif isinstance(obj, BaseModel):
return to_serializable(obj.model_dump(), max_depth, _current_depth + 1)
return to_serializable(
obj=obj.model_dump(exclude=exclude),
max_depth=max_depth,
_current_depth=_current_depth + 1,
)
else:
return repr(obj)