Compare commits

..

1 Commits

Author SHA1 Message Date
Devin AI
3efc5f67fb Fix pyright LSP errors in example code
Co-Authored-By: Joe Moura <joao@crewai.com>
2025-04-08 22:29:10 +00:00
7 changed files with 89 additions and 92 deletions

View File

@@ -23,6 +23,7 @@ from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_F
from crewai.utilities.converter import generate_model_description
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
from crewai.utilities.typing import AgentConfig
agentops = None
@@ -88,6 +89,7 @@ class Agent(BaseAgent):
function_calling_llm: Optional[Any] = Field(
description="Language model that will run the agent.", default=None
)
config: Optional[Union[Dict[str, Any], AgentConfig]] = Field(default=None)
system_template: Optional[str] = Field(
default=None, description="System format for the agent."
)

View File

@@ -16,6 +16,12 @@ def after_kickoff(func):
def task(func):
"""Decorator to mark a method as a task creator.
When applied to a method in a class decorated with @CrewBase,
this makes the method's return value accessible as an element
of the self.tasks list.
"""
func.is_task = True
@wraps(func)
@@ -29,6 +35,12 @@ def task(func):
def agent(func):
"""Decorator to mark a method as an agent creator.
When applied to a method in a class decorated with @CrewBase,
this makes the method's return value accessible as an element
of the self.agents list.
"""
func.is_agent = True
func = memoize(func)
return func

View File

@@ -1,6 +1,6 @@
import inspect
from pathlib import Path
from typing import Any, Callable, Dict, TypeVar, cast
from typing import Any, Callable, Dict, List, TypeVar, cast
import yaml
from dotenv import load_dotenv
@@ -66,6 +66,9 @@ def CrewBase(cls: T) -> T:
self._kickoff = self._filter_functions(
self._original_functions, "is_kickoff"
)
self.agents = [] # type: List[Any]
self.tasks = [] # type: List[Any]
@staticmethod
def load_yaml(config_path: Path):

View File

@@ -41,6 +41,7 @@ from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.i18n import I18N
from crewai.utilities.typing import TaskConfig
class Task(BaseModel):
@@ -74,7 +75,7 @@ class Task(BaseModel):
expected_output: str = Field(
description="Clear definition of expected output for the task."
)
config: Optional[Dict[str, Any]] = Field(
config: Optional[Union[Dict[str, Any], TaskConfig]] = Field(
description="Configuration for the agent",
default=None,
)

View File

@@ -0,0 +1,14 @@
from typing import Dict, List, Optional, Any, TypedDict, Union
class AgentConfig(TypedDict, total=False):
"""TypedDict for agent configuration loaded from YAML."""
role: str
goal: str
backstory: str
verbose: bool
class TaskConfig(TypedDict, total=False):
"""TypedDict for task configuration loaded from YAML."""
description: str
expected_output: str
agent: str # Role of the agent to execute this task

View File

@@ -1,10 +1,8 @@
"""Test Flow creation and execution basic functionality."""
import asyncio
import threading
import pytest
from pydantic import BaseModel
from crewai.flow.flow import Flow, and_, listen, or_, router, start
@@ -324,91 +322,3 @@ def test_router_with_multiple_conditions():
# final_step should run after router_and
assert execution_order.index("log_final_step") > execution_order.index("router_and")
def test_flow_with_rlock_in_state():
"""Test that Flow can handle unpickleable objects like RLock in state.
Regression test for issue #3828: Flow should not crash when state contains
objects that cannot be deep copied (like threading.RLock).
In version 1.3.0, Flow._copy_state() used copy.deepcopy() which would fail
with "TypeError: cannot pickle '_thread.RLock' object" when state contained
threading locks (e.g., from memory components or LLM instances).
The current implementation no longer deep copies state, so this test verifies
that flows with unpickleable objects in state work correctly.
"""
execution_order = []
class StateWithRLock(BaseModel):
class Config:
arbitrary_types_allowed = True
counter: int = 0
lock: threading.RLock = None
class FlowWithRLock(Flow[StateWithRLock]):
@start()
def step_1(self):
execution_order.append("step_1")
self.state.counter += 1
@listen(step_1)
def step_2(self):
execution_order.append("step_2")
self.state.counter += 1
flow = FlowWithRLock()
flow._state.lock = threading.RLock()
flow.kickoff()
assert execution_order == ["step_1", "step_2"]
assert flow.state.counter == 2
def test_flow_with_nested_unpickleable_objects():
"""Test that Flow can handle unpickleable objects nested in containers.
Regression test for issue #3828: Verifies that unpickleable objects
nested inside dicts/lists in state don't cause crashes.
This simulates real-world scenarios where memory components or other
resources with locks might be stored in nested data structures.
"""
execution_order = []
class NestedState(BaseModel):
class Config:
arbitrary_types_allowed = True
data: dict = {}
items: list = []
class FlowWithNestedUnpickleable(Flow[NestedState]):
@start()
def step_1(self):
execution_order.append("step_1")
self.state.data["lock"] = threading.RLock()
self.state.data["value"] = 42
@listen(step_1)
def step_2(self):
execution_order.append("step_2")
self.state.items.append(threading.Lock())
self.state.items.append("normal_value")
@listen(step_2)
def step_3(self):
execution_order.append("step_3")
assert self.state.data["value"] == 42
assert len(self.state.items) == 2
flow = FlowWithNestedUnpickleable()
flow.kickoff()
assert execution_order == ["step_1", "step_2", "step_3"]
assert flow.state.data["value"] == 42
assert len(flow.state.items) == 2

55
tests/typing_test.py Normal file
View File

@@ -0,0 +1,55 @@
from typing import Dict, Any
import pytest
from crewai.agent import Agent
from crewai.task import Task
from crewai.utilities.typing import AgentConfig, TaskConfig
def test_agent_with_config_dict():
config: AgentConfig = {
"role": "Test Agent",
"goal": "Test Goal",
"backstory": "Test Backstory",
"verbose": True
}
agent = Agent(config=config)
assert agent.role == "Test Agent"
assert agent.goal == "Test Goal"
assert agent.backstory == "Test Backstory"
assert agent.verbose is True
def test_agent_with_yaml_config():
config: Dict[str, Any] = {
"researcher": {
"role": "Researcher",
"goal": "Research Goal",
"backstory": "Researcher Backstory",
"verbose": True
}
}
agent = Agent(config=config["researcher"])
assert agent.role == "Researcher"
assert agent.goal == "Research Goal"
assert agent.backstory == "Researcher Backstory"
def test_task_with_config_dict():
config: TaskConfig = {
"description": "Test Task",
"expected_output": "Test Output",
"agent": "researcher"
}
agent = Agent(role="Researcher", goal="Goal", backstory="Backstory")
task = Task(config=config, agent=agent)
assert task.description == "Test Task"
assert task.expected_output == "Test Output"
assert task.agent == agent