mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 08:08:32 +00:00
fix: resolve additional mypy type errors
- Fix tool_usage.py: proper type annotations for result and fingerprint metadata - Fix lite_agent.py: proper Union type for guardrail callable accepting both LiteAgentOutput and TaskOutput - Add missing return type annotations to task_output_storage_handler.py methods - Fix crew.py: replace Json generic check with str, remove unused type:ignore and redundant cast
This commit is contained in:
@@ -273,7 +273,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
"""
|
||||
|
||||
# TODO: Improve typing
|
||||
return json.loads(v) if isinstance(v, Json) else v
|
||||
return json.loads(v) if isinstance(v, str) else v
|
||||
|
||||
@model_validator(mode="after")
|
||||
def set_private_attrs(self) -> Self:
|
||||
@@ -641,7 +641,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
|
||||
for agent in self.agents:
|
||||
agent.i18n = i18n
|
||||
agent.crew = self # type: ignore[attr-defined]
|
||||
agent.crew = self
|
||||
agent.set_knowledge(crew_embedder=self.embedder)
|
||||
# TODO: Create an AgentFunctionCalling protocol for future refactoring
|
||||
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
|
||||
@@ -983,7 +983,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
if hasattr(task_agent, "get_delegation_tools"):
|
||||
delegation_tools = task_agent.get_delegation_tools(agents)
|
||||
# Cast delegation_tools to the expected type for _merge_tools
|
||||
return self._merge_tools(tools, cast(list[BaseTool], delegation_tools))
|
||||
return self._merge_tools(tools, delegation_tools)
|
||||
return cast(list[BaseTool], tools)
|
||||
|
||||
def _add_multimodal_tools(
|
||||
|
||||
@@ -5,6 +5,7 @@ from collections.abc import Callable
|
||||
from typing import (
|
||||
Any,
|
||||
Optional,
|
||||
Union,
|
||||
cast,
|
||||
get_args,
|
||||
get_origin,
|
||||
@@ -39,6 +40,7 @@ from crewai.events.types.logging_events import AgentLogsExecutionEvent
|
||||
from crewai.flow.flow_trackable import FlowTrackable
|
||||
from crewai.llm import LLM
|
||||
from crewai.llms.base_llm import BaseLLM
|
||||
from crewai.task import TaskOutput
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.tools.structured_tool import CrewStructuredTool
|
||||
from crewai.utilities import I18N
|
||||
@@ -185,8 +187,8 @@ class LiteAgent(FlowTrackable, BaseModel):
|
||||
_messages: list[dict[str, str]] = PrivateAttr(default_factory=list)
|
||||
_iterations: int = PrivateAttr(default=0)
|
||||
_printer: Printer = PrivateAttr(default_factory=Printer)
|
||||
_guardrail: Optional[Callable[[LiteAgentOutput], tuple[bool, Any]]] = PrivateAttr(
|
||||
default=None
|
||||
_guardrail: Optional[Callable[[LiteAgentOutput | TaskOutput], tuple[bool, Any]]] = (
|
||||
PrivateAttr(default=None)
|
||||
)
|
||||
_guardrail_retry_count: int = PrivateAttr(default=0)
|
||||
|
||||
|
||||
@@ -199,7 +199,7 @@ class ToolUsage:
|
||||
|
||||
started_at = time.time()
|
||||
from_cache = False
|
||||
result = None # type: ignore
|
||||
result: str | None = None
|
||||
|
||||
if self.tools_handler and self.tools_handler.cache:
|
||||
result = self.tools_handler.cache.read(
|
||||
@@ -243,7 +243,7 @@ class ToolUsage:
|
||||
try:
|
||||
acceptable_args = tool.args_schema.model_json_schema()[
|
||||
"properties"
|
||||
].keys() # type: ignore
|
||||
].keys()
|
||||
arguments = {
|
||||
k: v
|
||||
for k, v in calling.arguments.items()
|
||||
@@ -668,7 +668,7 @@ class ToolUsage:
|
||||
|
||||
return event_data
|
||||
|
||||
def _add_fingerprint_metadata(self, arguments: dict) -> dict:
|
||||
def _add_fingerprint_metadata(self, arguments: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Add fingerprint metadata to tool arguments if available.
|
||||
|
||||
Args:
|
||||
|
||||
@@ -33,7 +33,7 @@ class TaskOutputStorageHandler:
|
||||
def __init__(self) -> None:
|
||||
self.storage = KickoffTaskOutputsSQLiteStorage()
|
||||
|
||||
def update(self, task_index: int, log: dict[str, Any]):
|
||||
def update(self, task_index: int, log: dict[str, Any]) -> None:
|
||||
saved_outputs = self.load()
|
||||
if saved_outputs is None:
|
||||
raise ValueError("Logs cannot be None")
|
||||
@@ -60,7 +60,7 @@ class TaskOutputStorageHandler:
|
||||
task_index: int,
|
||||
inputs: dict[str, Any] | None = None,
|
||||
was_replayed: bool = False,
|
||||
):
|
||||
) -> None:
|
||||
inputs = inputs or {}
|
||||
self.storage.add(task, output, task_index, was_replayed, inputs)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user