mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-30 14:52:36 +00:00
fix: checkpoint resume bugs and handler signature caching
- Return len(tasks) from _get_execution_start_index when all tasks complete, preventing full re-execution of finished checkpoints - Add _get_execution_start_index call to _aexecute_tasks so async resume skips completed tasks like the sync path does - Cache inspect.signature results per handler to avoid repeated introspection on every event emission
This commit is contained in:
@@ -1113,6 +1113,10 @@ class Crew(FlowTrackable, BaseModel):
|
||||
Returns:
|
||||
CrewOutput: Final output of the crew
|
||||
"""
|
||||
custom_start = self._get_execution_start_index(tasks)
|
||||
if custom_start is not None:
|
||||
start_index = custom_start
|
||||
|
||||
task_outputs: list[TaskOutput] = []
|
||||
pending_tasks: list[tuple[Task, asyncio.Task[TaskOutput], int]] = []
|
||||
last_sync_output: TaskOutput | None = None
|
||||
@@ -1297,7 +1301,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
for i, task in enumerate(tasks):
|
||||
if task.output is None:
|
||||
return i if i > 0 else None
|
||||
return None
|
||||
return len(tasks) if tasks else None
|
||||
|
||||
def _execute_tasks(
|
||||
self,
|
||||
|
||||
@@ -13,7 +13,6 @@ from collections.abc import Callable, Generator
|
||||
from concurrent.futures import Future, ThreadPoolExecutor
|
||||
from contextlib import contextmanager
|
||||
import contextvars
|
||||
import inspect
|
||||
import logging
|
||||
import threading
|
||||
from typing import TYPE_CHECKING, Any, Final, ParamSpec, TypeVar
|
||||
@@ -51,7 +50,11 @@ from crewai.events.types.event_bus_types import (
|
||||
)
|
||||
from crewai.events.types.llm_events import LLMStreamChunkEvent
|
||||
from crewai.events.utils.console_formatter import ConsoleFormatter
|
||||
from crewai.events.utils.handlers import is_async_handler, is_call_handler_safe
|
||||
from crewai.events.utils.handlers import (
|
||||
_get_param_count,
|
||||
is_async_handler,
|
||||
is_call_handler_safe,
|
||||
)
|
||||
from crewai.utilities.rw_lock import RWLock
|
||||
|
||||
|
||||
@@ -356,8 +359,7 @@ class CrewAIEventsBus:
|
||||
state = self._runtime_state
|
||||
|
||||
async def _call(handler: AsyncHandler) -> Any:
|
||||
sig = inspect.signature(handler)
|
||||
if len(sig.parameters) >= 3:
|
||||
if _get_param_count(handler) >= 3:
|
||||
return await handler(source, event, state) # type: ignore[call-arg]
|
||||
return await handler(source, event) # type: ignore[call-arg]
|
||||
|
||||
|
||||
@@ -10,6 +10,19 @@ from crewai.events.base_events import BaseEvent
|
||||
from crewai.events.types.event_bus_types import AsyncHandler, SyncHandler
|
||||
|
||||
|
||||
_handler_param_count: dict[int, int] = {}
|
||||
|
||||
|
||||
def _get_param_count(handler: Any) -> int:
|
||||
"""Return the number of parameters a handler accepts, with caching."""
|
||||
key = id(handler)
|
||||
count = _handler_param_count.get(key)
|
||||
if count is None:
|
||||
count = len(inspect.signature(handler).parameters)
|
||||
_handler_param_count[key] = count
|
||||
return count
|
||||
|
||||
|
||||
def is_async_handler(
|
||||
handler: Any,
|
||||
) -> TypeIs[AsyncHandler]:
|
||||
@@ -55,8 +68,7 @@ def is_call_handler_safe(
|
||||
Exception if handler raised one, None otherwise
|
||||
"""
|
||||
try:
|
||||
sig = inspect.signature(handler)
|
||||
if len(sig.parameters) >= 3:
|
||||
if _get_param_count(handler) >= 3:
|
||||
handler(source, event, state) # type: ignore[call-arg]
|
||||
else:
|
||||
handler(source, event) # type: ignore[call-arg]
|
||||
|
||||
Reference in New Issue
Block a user