chore: refactor crew to provider
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled

Enable dynamic extension exports and small behavior fixes across events and flow modules:

- events/__init__.py: Added _extension_exports and extended __getattr__ to lazily resolve registered extension values or import paths.
- events/event_bus.py: Implemented off() to unregister sync/async handlers, clean handler dependencies, and invalidate execution plan cache.
- events/listeners/tracing/utils.py: Added Callable import and _first_time_trace_hook to allow overriding first-time trace auto-collection behavior.
- events/types/tool_usage_events.py: Changed ToolUsageEvent.run_attempts default from None to 0 to avoid nullable handling.
- events/utils/console_formatter.py: Respect CREWAI_DISABLE_VERSION_CHECK env var to skip version checks in CI-like flows.
- flow/async_feedback/__init__.py: Added typing.Any import, _extension_exports and __getattr__ to support extensions via attribute lookup.

These changes add extension points and safer defaults, and provide a way to unregister event handlers.
This commit is contained in:
Greyson LaLonde
2026-02-04 16:05:21 -05:00
committed by GitHub
parent 6bfc98e960
commit d86d43d3e0
10 changed files with 176 additions and 12 deletions

View File

@@ -751,6 +751,8 @@ class Crew(FlowTrackable, BaseModel):
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
result = self._post_kickoff(result)
self.usage_metrics = self.calculate_usage_metrics()
return result
@@ -764,6 +766,9 @@ class Crew(FlowTrackable, BaseModel):
clear_files(self.id)
detach(token)
def _post_kickoff(self, result: CrewOutput) -> CrewOutput:
return result
def kickoff_for_each(
self,
inputs: list[dict[str, Any]],
@@ -936,6 +941,8 @@ class Crew(FlowTrackable, BaseModel):
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
result = self._post_kickoff(result)
self.usage_metrics = self.calculate_usage_metrics()
return result
@@ -1181,6 +1188,9 @@ class Crew(FlowTrackable, BaseModel):
self.manager_agent = manager
manager.crew = self
def _get_execution_start_index(self, tasks: list[Task]) -> int | None:
return None
def _execute_tasks(
self,
tasks: list[Task],
@@ -1197,6 +1207,9 @@ class Crew(FlowTrackable, BaseModel):
Returns:
CrewOutput: Final output of the crew
"""
custom_start = self._get_execution_start_index(tasks)
if custom_start is not None:
start_index = custom_start
task_outputs: list[TaskOutput] = []
futures: list[tuple[Task, Future[TaskOutput], int]] = []
@@ -1305,8 +1318,10 @@ class Crew(FlowTrackable, BaseModel):
if files:
supported_types: list[str] = []
if agent and agent.llm and agent.llm.supports_multimodal():
provider = getattr(agent.llm, "provider", None) or getattr(
agent.llm, "model", "openai"
provider = (
getattr(agent.llm, "provider", None)
or getattr(agent.llm, "model", None)
or "openai"
)
api = getattr(agent.llm, "api", None)
supported_types = get_supported_content_types(provider, api)

View File

@@ -195,6 +195,7 @@ __all__ = [
"ToolUsageFinishedEvent",
"ToolUsageStartedEvent",
"ToolValidateInputErrorEvent",
"_extension_exports",
"crewai_event_bus",
]
@@ -210,14 +211,29 @@ _AGENT_EVENT_MAPPING = {
"LiteAgentExecutionStartedEvent": "crewai.events.types.agent_events",
}
_extension_exports: dict[str, Any] = {}
def __getattr__(name: str) -> Any:
"""Lazy import for agent events to avoid circular imports."""
"""Lazy import for agent events and registered extensions."""
if name in _AGENT_EVENT_MAPPING:
import importlib
module_path = _AGENT_EVENT_MAPPING[name]
module = importlib.import_module(module_path)
return getattr(module, name)
if name in _extension_exports:
import importlib
value = _extension_exports[name]
if isinstance(value, str):
module_path, _, attr_name = value.rpartition(".")
if module_path:
module = importlib.import_module(module_path)
return getattr(module, attr_name)
return importlib.import_module(value)
return value
msg = f"module {__name__!r} has no attribute {name!r}"
raise AttributeError(msg)

View File

@@ -227,6 +227,39 @@ class CrewAIEventsBus:
return decorator
def off(
self,
event_type: type[BaseEvent],
handler: Callable[..., Any],
) -> None:
"""Unregister an event handler for a specific event type.
Args:
event_type: The event class to stop listening for
handler: The handler function to unregister
"""
with self._rwlock.w_locked():
if event_type in self._sync_handlers:
existing_sync = self._sync_handlers[event_type]
if handler in existing_sync:
self._sync_handlers[event_type] = existing_sync - {handler}
if not self._sync_handlers[event_type]:
del self._sync_handlers[event_type]
if event_type in self._async_handlers:
existing_async = self._async_handlers[event_type]
if handler in existing_async:
self._async_handlers[event_type] = existing_async - {handler}
if not self._async_handlers[event_type]:
del self._async_handlers[event_type]
if event_type in self._handler_dependencies:
self._handler_dependencies[event_type].pop(handler, None)
if not self._handler_dependencies[event_type]:
del self._handler_dependencies[event_type]
self._execution_plan_cache.pop(event_type, None)
def _call_handlers(
self,
source: Any,

View File

@@ -1,3 +1,4 @@
from collections.abc import Callable
from contextvars import ContextVar, Token
from datetime import datetime
import getpass
@@ -26,6 +27,8 @@ logger = logging.getLogger(__name__)
_tracing_enabled: ContextVar[bool | None] = ContextVar("_tracing_enabled", default=None)
_first_time_trace_hook: Callable[[], bool] | None = None
def should_enable_tracing(*, override: bool | None = None) -> bool:
"""Determine if tracing should be enabled.
@@ -407,10 +410,12 @@ def truncate_messages(
def should_auto_collect_first_time_traces() -> bool:
"""True if we should auto-collect traces for first-time user.
Returns:
True if first-time user AND telemetry not disabled AND tracing not explicitly enabled, False otherwise.
"""
if _first_time_trace_hook is not None:
return _first_time_trace_hook()
if _is_test_environment():
return False

View File

@@ -16,7 +16,7 @@ class ToolUsageEvent(BaseEvent):
tool_name: str
tool_args: dict[str, Any] | str
tool_class: str | None = None
run_attempts: int | None = None
run_attempts: int = 0
delegations: int | None = None
agent: Any | None = None
task_name: str | None = None
@@ -26,7 +26,7 @@ class ToolUsageEvent(BaseEvent):
model_config = ConfigDict(arbitrary_types_allowed=True)
def __init__(self, **data):
def __init__(self, **data: Any) -> None:
if data.get("from_task"):
task = data["from_task"]
data["task_id"] = str(task.id)
@@ -96,10 +96,10 @@ class ToolExecutionErrorEvent(BaseEvent):
type: str = "tool_execution_error"
tool_name: str
tool_args: dict[str, Any]
tool_class: Callable
tool_class: Callable[..., Any]
agent: Any | None = None
def __init__(self, **data):
def __init__(self, **data: Any) -> None:
super().__init__(**data)
# Set fingerprint data from the agent
if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint:

View File

@@ -49,6 +49,9 @@ class ConsoleFormatter:
if os.getenv("CI", "").lower() in ("true", "1"):
return
if os.getenv("CREWAI_DISABLE_VERSION_CHECK", "").lower() in ("true", "1"):
return
try:
is_newer, current, latest = is_newer_version_available()
if is_newer and latest:

View File

@@ -28,6 +28,8 @@ Example:
```
"""
from typing import Any
from crewai.flow.async_feedback.providers import ConsoleProvider
from crewai.flow.async_feedback.types import (
HumanFeedbackPending,
@@ -41,4 +43,15 @@ __all__ = [
"HumanFeedbackPending",
"HumanFeedbackProvider",
"PendingFeedbackContext",
"_extension_exports",
]
_extension_exports: dict[str, Any] = {}
def __getattr__(name: str) -> Any:
"""Support extensions via dynamic attribute lookup."""
if name in _extension_exports:
return _extension_exports[name]
msg = f"module {__name__!r} has no attribute {name!r}"
raise AttributeError(msg)

View File

@@ -0,0 +1 @@
"""Knowledge source utilities."""

View File

@@ -0,0 +1,70 @@
"""Helper utilities for knowledge sources."""
from typing import Any, ClassVar
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource
from crewai.knowledge.source.excel_knowledge_source import ExcelKnowledgeSource
from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource
from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource
class SourceHelper:
"""Helper class for creating and managing knowledge sources."""
SUPPORTED_FILE_TYPES: ClassVar[list[str]] = [
".csv",
".pdf",
".json",
".txt",
".xlsx",
".xls",
]
_FILE_TYPE_MAP: ClassVar[dict[str, type[BaseKnowledgeSource]]] = {
".csv": CSVKnowledgeSource,
".pdf": PDFKnowledgeSource,
".json": JSONKnowledgeSource,
".txt": TextFileKnowledgeSource,
".xlsx": ExcelKnowledgeSource,
".xls": ExcelKnowledgeSource,
}
@classmethod
def is_supported_file(cls, file_path: str) -> bool:
"""Check if a file type is supported.
Args:
file_path: Path to the file.
Returns:
True if the file type is supported.
"""
return file_path.lower().endswith(tuple(cls.SUPPORTED_FILE_TYPES))
@classmethod
def get_source(
cls, file_path: str, metadata: dict[str, Any] | None = None
) -> BaseKnowledgeSource:
"""Create appropriate KnowledgeSource based on file extension.
Args:
file_path: Path to the file.
metadata: Optional metadata to attach to the source.
Returns:
The appropriate KnowledgeSource instance.
Raises:
ValueError: If the file type is not supported.
"""
if not cls.is_supported_file(file_path):
raise ValueError(f"Unsupported file type: {file_path}")
lower_path = file_path.lower()
for ext, source_cls in cls._FILE_TYPE_MAP.items():
if lower_path.endswith(ext):
return source_cls(file_path=[file_path], metadata=metadata)
raise ValueError(f"Unsupported file type: {file_path}")

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
from collections import defaultdict
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel, Field, InstanceOf
from rich.box import HEAVY_EDGE
@@ -36,7 +36,13 @@ class CrewEvaluator:
iteration: The current iteration of the evaluation.
"""
def __init__(self, crew: Crew, eval_llm: InstanceOf[BaseLLM]) -> None:
def __init__(
self,
crew: Crew,
eval_llm: InstanceOf[BaseLLM] | str | None = None,
openai_model_name: str | None = None,
llm: InstanceOf[BaseLLM] | str | None = None,
) -> None:
self.crew = crew
self.llm = eval_llm
self.tasks_scores: defaultdict[int, list[float]] = defaultdict(list)
@@ -86,7 +92,9 @@ class CrewEvaluator:
"""
self.iteration = iteration
def print_crew_evaluation_result(self) -> None:
def print_crew_evaluation_result(
self, token_usage: list[dict[str, Any]] | None = None
) -> None:
"""
Prints the evaluation result of the crew in a table.
A Crew with 2 tasks using the command crewai test -n 3
@@ -204,7 +212,7 @@ class CrewEvaluator:
CrewTestResultEvent(
quality=quality_score,
execution_duration=current_task.execution_duration,
model=self.llm.model,
model=getattr(self.llm, "model", str(self.llm)),
crew_name=self.crew.name,
crew=self.crew,
),