mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-28 09:38:17 +00:00
Compare commits
3 Commits
devin/1768
...
gl/feat/co
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
68c9990eef | ||
|
|
8d8772d607 | ||
|
|
d916bc8695 |
388
lib/crewai/src/crewai/events/lifecycle_decorator.py
Normal file
388
lib/crewai/src/crewai/events/lifecycle_decorator.py
Normal file
@@ -0,0 +1,388 @@
|
|||||||
|
"""Decorators for automatic event lifecycle management.
|
||||||
|
|
||||||
|
This module provides decorators that automatically emit started/completed/failed
|
||||||
|
events for methods, reducing boilerplate code across the codebase.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from collections.abc import Callable
|
||||||
|
from functools import wraps
|
||||||
|
import time
|
||||||
|
from typing import Any, Concatenate, Literal, ParamSpec, TypeVar, TypedDict, cast
|
||||||
|
|
||||||
|
from crewai.events.base_events import BaseEvent
|
||||||
|
from crewai.events.event_bus import crewai_event_bus
|
||||||
|
from crewai.events.types.crew_events import (
|
||||||
|
CrewKickoffCompletedEvent,
|
||||||
|
CrewKickoffFailedEvent,
|
||||||
|
CrewKickoffStartedEvent,
|
||||||
|
CrewTestCompletedEvent,
|
||||||
|
CrewTestFailedEvent,
|
||||||
|
CrewTestStartedEvent,
|
||||||
|
CrewTrainCompletedEvent,
|
||||||
|
CrewTrainFailedEvent,
|
||||||
|
CrewTrainStartedEvent,
|
||||||
|
)
|
||||||
|
from crewai.events.types.memory_events import (
|
||||||
|
MemoryQueryCompletedEvent,
|
||||||
|
MemoryQueryFailedEvent,
|
||||||
|
MemoryQueryStartedEvent,
|
||||||
|
MemorySaveCompletedEvent,
|
||||||
|
MemorySaveFailedEvent,
|
||||||
|
MemorySaveStartedEvent,
|
||||||
|
)
|
||||||
|
from crewai.events.types.task_events import (
|
||||||
|
TaskCompletedEvent,
|
||||||
|
TaskFailedEvent,
|
||||||
|
TaskStartedEvent,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
P = ParamSpec("P")
|
||||||
|
R = TypeVar("R")
|
||||||
|
|
||||||
|
EventPrefix = Literal[
|
||||||
|
"task",
|
||||||
|
"memory_save",
|
||||||
|
"memory_query",
|
||||||
|
"crew_kickoff",
|
||||||
|
"crew_train",
|
||||||
|
"crew_test",
|
||||||
|
]
|
||||||
|
|
||||||
|
EventParams = dict[str, Any]
|
||||||
|
|
||||||
|
StartedParamsFn = Callable[[Any, tuple[Any, ...], dict[str, Any]], EventParams]
|
||||||
|
CompletedParamsFn = Callable[
|
||||||
|
[Any, tuple[Any, ...], dict[str, Any], Any, float], EventParams
|
||||||
|
]
|
||||||
|
FailedParamsFn = Callable[
|
||||||
|
[Any, tuple[Any, ...], dict[str, Any], Exception], EventParams
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class LifecycleEventClasses(TypedDict):
|
||||||
|
"""Mapping of lifecycle event types to their corresponding event classes."""
|
||||||
|
|
||||||
|
started: type[BaseEvent]
|
||||||
|
completed: type[BaseEvent]
|
||||||
|
failed: type[BaseEvent]
|
||||||
|
|
||||||
|
|
||||||
|
class EventClassMap(TypedDict):
|
||||||
|
"""Mapping of event prefixes to their lifecycle event classes."""
|
||||||
|
|
||||||
|
task: LifecycleEventClasses
|
||||||
|
memory_save: LifecycleEventClasses
|
||||||
|
memory_query: LifecycleEventClasses
|
||||||
|
crew_kickoff: LifecycleEventClasses
|
||||||
|
crew_train: LifecycleEventClasses
|
||||||
|
crew_test: LifecycleEventClasses
|
||||||
|
|
||||||
|
|
||||||
|
class LifecycleParamExtractors(TypedDict):
|
||||||
|
"""Parameter extractors for lifecycle events."""
|
||||||
|
|
||||||
|
started_params: StartedParamsFn
|
||||||
|
completed_params: CompletedParamsFn
|
||||||
|
failed_params: FailedParamsFn
|
||||||
|
|
||||||
|
|
||||||
|
EVENT_CLASS_MAP: EventClassMap = {
|
||||||
|
"task": {
|
||||||
|
"started": TaskStartedEvent,
|
||||||
|
"completed": TaskCompletedEvent,
|
||||||
|
"failed": TaskFailedEvent,
|
||||||
|
},
|
||||||
|
"memory_save": {
|
||||||
|
"started": MemorySaveStartedEvent,
|
||||||
|
"completed": MemorySaveCompletedEvent,
|
||||||
|
"failed": MemorySaveFailedEvent,
|
||||||
|
},
|
||||||
|
"memory_query": {
|
||||||
|
"started": MemoryQueryStartedEvent,
|
||||||
|
"completed": MemoryQueryCompletedEvent,
|
||||||
|
"failed": MemoryQueryFailedEvent,
|
||||||
|
},
|
||||||
|
"crew_kickoff": {
|
||||||
|
"started": CrewKickoffStartedEvent,
|
||||||
|
"completed": CrewKickoffCompletedEvent,
|
||||||
|
"failed": CrewKickoffFailedEvent,
|
||||||
|
},
|
||||||
|
"crew_train": {
|
||||||
|
"started": CrewTrainStartedEvent,
|
||||||
|
"completed": CrewTrainCompletedEvent,
|
||||||
|
"failed": CrewTrainFailedEvent,
|
||||||
|
},
|
||||||
|
"crew_test": {
|
||||||
|
"started": CrewTestStartedEvent,
|
||||||
|
"completed": CrewTestCompletedEvent,
|
||||||
|
"failed": CrewTestFailedEvent,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_arg(
|
||||||
|
position: str | int, args: tuple[Any, ...], kwargs: dict[str, Any]
|
||||||
|
) -> Any:
|
||||||
|
"""Extract argument by name from kwargs or by position from args.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
position: Argument name (str) or positional index (int).
|
||||||
|
args: Positional arguments tuple.
|
||||||
|
kwargs: Keyword arguments dict.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Extracted argument value or None if not found.
|
||||||
|
"""
|
||||||
|
if isinstance(position, str):
|
||||||
|
return kwargs.get(position)
|
||||||
|
try:
|
||||||
|
return args[position]
|
||||||
|
except IndexError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def lifecycle_params(
|
||||||
|
*,
|
||||||
|
args_map: dict[str, str | int] | None = None,
|
||||||
|
context: dict[str, Any | Callable[[Any], Any]] | None = None,
|
||||||
|
result_name: str | None = None,
|
||||||
|
elapsed_name: str = "elapsed_ms",
|
||||||
|
) -> LifecycleParamExtractors:
|
||||||
|
"""Helper to create lifecycle event parameter extractors with reduced boilerplate.
|
||||||
|
|
||||||
|
This function generates the three parameter extractors (started_params, completed_params,
|
||||||
|
failed_params) needed by @with_lifecycle_events, following common patterns and reducing
|
||||||
|
code duplication.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
args_map: Maps event parameter names to function argument names (str) or positions (int).
|
||||||
|
Example: {"query": "query", "value": 0} extracts kwargs["query"] and args[0]
|
||||||
|
context: Static or dynamic context fields included in all events.
|
||||||
|
Values can be static (Any) or callables that receive self and return a value.
|
||||||
|
Example: {"source_type": "external_memory", "from_agent": lambda self: self.agent}
|
||||||
|
result_name: Name for the result in completed_params (e.g., "results", "output").
|
||||||
|
If None, result is not included in the event.
|
||||||
|
elapsed_name: Name for elapsed time in completed_params (default: "elapsed_ms").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with keys "started_params", "completed_params", "failed_params"
|
||||||
|
containing the appropriate lambda functions for @with_lifecycle_events.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> param_extractors = lifecycle_params(
|
||||||
|
... args_map={"value": "value", "metadata": "metadata"},
|
||||||
|
... context={
|
||||||
|
... "source_type": "external_memory",
|
||||||
|
... "from_agent": lambda self: self.agent,
|
||||||
|
... "from_task": lambda self: self.task,
|
||||||
|
... },
|
||||||
|
... elapsed_name="save_time_ms",
|
||||||
|
... )
|
||||||
|
>>> param_extractors["started_params"] # doctest: +ELLIPSIS
|
||||||
|
<function lifecycle_params.<locals>.started_params_fn at 0x...>
|
||||||
|
"""
|
||||||
|
args_map = args_map or {}
|
||||||
|
context = context or {}
|
||||||
|
|
||||||
|
static_context: EventParams = {}
|
||||||
|
dynamic_context: dict[str, Callable[[Any], Any]] = {}
|
||||||
|
for ctx_key, ctx_value in context.items():
|
||||||
|
if callable(ctx_value):
|
||||||
|
dynamic_context[ctx_key] = ctx_value
|
||||||
|
else:
|
||||||
|
static_context[ctx_key] = ctx_value
|
||||||
|
|
||||||
|
def started_params_fn(
|
||||||
|
self: Any, args: tuple[Any, ...], kwargs: dict[str, Any]
|
||||||
|
) -> EventParams:
|
||||||
|
"""Extract parameters for started event.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
self: Instance emitting the event.
|
||||||
|
args: Positional arguments from decorated method.
|
||||||
|
kwargs: Keyword arguments from decorated method.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Parameters for started event.
|
||||||
|
"""
|
||||||
|
params: EventParams = {**static_context}
|
||||||
|
for param_name, arg_spec in args_map.items():
|
||||||
|
params[param_name] = _extract_arg(arg_spec, args, kwargs)
|
||||||
|
for key, func in dynamic_context.items():
|
||||||
|
params[key] = func(self)
|
||||||
|
return params
|
||||||
|
|
||||||
|
def completed_params_fn(
|
||||||
|
self: Any,
|
||||||
|
args: tuple[Any, ...],
|
||||||
|
kwargs: dict[str, Any],
|
||||||
|
result: Any,
|
||||||
|
elapsed_ms: float,
|
||||||
|
) -> EventParams:
|
||||||
|
"""Extract parameters for completed event.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
self: Instance emitting the event.
|
||||||
|
args: Positional arguments from decorated method.
|
||||||
|
kwargs: Keyword arguments from decorated method.
|
||||||
|
result: Return value from decorated method.
|
||||||
|
elapsed_ms: Elapsed execution time in milliseconds.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Parameters for completed event.
|
||||||
|
"""
|
||||||
|
params: EventParams = {**static_context}
|
||||||
|
for param_name, arg_spec in args_map.items():
|
||||||
|
params[param_name] = _extract_arg(arg_spec, args, kwargs)
|
||||||
|
if result_name is not None:
|
||||||
|
params[result_name] = result
|
||||||
|
params[elapsed_name] = elapsed_ms
|
||||||
|
for key, func in dynamic_context.items():
|
||||||
|
params[key] = func(self)
|
||||||
|
return params
|
||||||
|
|
||||||
|
def failed_params_fn(
|
||||||
|
self: Any, args: tuple[Any, ...], kwargs: dict[str, Any], exc: Exception
|
||||||
|
) -> EventParams:
|
||||||
|
"""Extract parameters for failed event.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
self: Instance emitting the event.
|
||||||
|
args: Positional arguments from decorated method.
|
||||||
|
kwargs: Keyword arguments from decorated method.
|
||||||
|
exc: Exception raised during execution.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Parameters for failed event.
|
||||||
|
"""
|
||||||
|
params: EventParams = {**static_context}
|
||||||
|
for param_name, arg_spec in args_map.items():
|
||||||
|
params[param_name] = _extract_arg(arg_spec, args, kwargs)
|
||||||
|
params["error"] = str(exc)
|
||||||
|
for key, func in dynamic_context.items():
|
||||||
|
params[key] = func(self)
|
||||||
|
return params
|
||||||
|
|
||||||
|
return {
|
||||||
|
"started_params": started_params_fn,
|
||||||
|
"completed_params": completed_params_fn,
|
||||||
|
"failed_params": failed_params_fn,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def with_lifecycle_events(
|
||||||
|
prefix: EventPrefix,
|
||||||
|
*,
|
||||||
|
args_map: dict[str, str | int] | None = None,
|
||||||
|
context: dict[str, Any | Callable[[Any], Any]] | None = None,
|
||||||
|
result_name: str | None = None,
|
||||||
|
elapsed_name: str = "elapsed_ms",
|
||||||
|
) -> Callable[[Callable[Concatenate[Any, P], R]], Callable[Concatenate[Any, P], R]]:
|
||||||
|
"""Decorator to automatically emit lifecycle events (started/completed/failed).
|
||||||
|
|
||||||
|
This decorator wraps a method to emit events at different stages of execution:
|
||||||
|
- StartedEvent: Emitted before method execution
|
||||||
|
- CompletedEvent: Emitted after successful execution (includes timing via monotonic_ns)
|
||||||
|
- FailedEvent: Emitted if an exception occurs (re-raises the exception)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
prefix: Event prefix from the EventPrefix Literal type. Determines which
|
||||||
|
event classes to use (e.g., "task" -> TaskStartedEvent, etc.)
|
||||||
|
args_map: Maps event parameter names to function argument names (str) or positions (int).
|
||||||
|
Example: {"query": "query", "value": 0} extracts kwargs["query"] and args[0]
|
||||||
|
context: Static or dynamic context fields included in all events.
|
||||||
|
Values can be static (Any) or callables that receive self and return a value.
|
||||||
|
Example: {"source_type": "external_memory", "from_agent": lambda self: self.agent}
|
||||||
|
result_name: Name for the result in completed_params (e.g., "results", "output").
|
||||||
|
If None, result is not included in the event.
|
||||||
|
elapsed_name: Name for elapsed time in completed_params (default: "elapsed_ms").
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Decorated function that emits lifecycle events.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> @with_lifecycle_events(
|
||||||
|
... "memory_save",
|
||||||
|
... args_map={"value": "value", "metadata": "metadata"},
|
||||||
|
... context={
|
||||||
|
... "source_type": "external_memory",
|
||||||
|
... "from_agent": lambda self: self.agent,
|
||||||
|
... },
|
||||||
|
... elapsed_name="save_time_ms",
|
||||||
|
... )
|
||||||
|
... def save(self, value: Any, metadata: dict[str, Any] | None = None) -> None:
|
||||||
|
... pass
|
||||||
|
"""
|
||||||
|
param_extractors = lifecycle_params(
|
||||||
|
args_map=args_map,
|
||||||
|
context=context,
|
||||||
|
result_name=result_name,
|
||||||
|
elapsed_name=elapsed_name,
|
||||||
|
)
|
||||||
|
started_params: StartedParamsFn = param_extractors["started_params"]
|
||||||
|
completed_params: CompletedParamsFn = param_extractors["completed_params"]
|
||||||
|
failed_params: FailedParamsFn = param_extractors["failed_params"]
|
||||||
|
|
||||||
|
event_classes = EVENT_CLASS_MAP[prefix]
|
||||||
|
|
||||||
|
def decorator(
|
||||||
|
func: Callable[Concatenate[Any, P], R],
|
||||||
|
) -> Callable[Concatenate[Any, P], R]:
|
||||||
|
"""Apply lifecycle event emission to the decorated function.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
func: Function to decorate.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Decorated function with lifecycle event emission.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@wraps(func)
|
||||||
|
def wrapper(self: Any, *args: P.args, **kwargs: P.kwargs) -> R:
|
||||||
|
"""Execute function with lifecycle event emission.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
self: Instance calling the method.
|
||||||
|
*args: Positional arguments.
|
||||||
|
**kwargs: Keyword arguments.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Result from the decorated function.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: Re-raises any exception after emitting failed event.
|
||||||
|
"""
|
||||||
|
started_event_params = started_params(self, args, kwargs)
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
self,
|
||||||
|
event_classes["started"](**started_event_params),
|
||||||
|
)
|
||||||
|
|
||||||
|
start_time = time.monotonic_ns()
|
||||||
|
try:
|
||||||
|
result = func(self, *args, **kwargs)
|
||||||
|
completed_event_params = completed_params(
|
||||||
|
self,
|
||||||
|
args,
|
||||||
|
kwargs,
|
||||||
|
result,
|
||||||
|
(time.monotonic_ns() - start_time) / 1_000_000,
|
||||||
|
)
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
self,
|
||||||
|
event_classes["completed"](**completed_event_params),
|
||||||
|
)
|
||||||
|
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
failed_event_params = failed_params(self, args, kwargs, e)
|
||||||
|
crewai_event_bus.emit(
|
||||||
|
self,
|
||||||
|
event_classes["failed"](**failed_event_params),
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
return cast(Callable[Concatenate[Any, P], R], wrapper)
|
||||||
|
|
||||||
|
return decorator
|
||||||
@@ -1,17 +1,9 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import time
|
from collections.abc import Callable
|
||||||
from typing import TYPE_CHECKING, Any
|
from typing import TYPE_CHECKING, Any, cast
|
||||||
|
|
||||||
from crewai.events.event_bus import crewai_event_bus
|
from crewai.events.lifecycle_decorator import with_lifecycle_events
|
||||||
from crewai.events.types.memory_events import (
|
|
||||||
MemoryQueryCompletedEvent,
|
|
||||||
MemoryQueryFailedEvent,
|
|
||||||
MemoryQueryStartedEvent,
|
|
||||||
MemorySaveCompletedEvent,
|
|
||||||
MemorySaveFailedEvent,
|
|
||||||
MemorySaveStartedEvent,
|
|
||||||
)
|
|
||||||
from crewai.memory.external.external_memory_item import ExternalMemoryItem
|
from crewai.memory.external.external_memory_item import ExternalMemoryItem
|
||||||
from crewai.memory.memory import Memory
|
from crewai.memory.memory import Memory
|
||||||
from crewai.memory.storage.interface import Storage
|
from crewai.memory.storage.interface import Storage
|
||||||
@@ -19,29 +11,31 @@ from crewai.rag.embeddings.types import ProviderSpec
|
|||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from crewai.memory.storage.mem0_storage import Mem0Storage
|
from crewai.crew import Crew
|
||||||
|
|
||||||
|
|
||||||
class ExternalMemory(Memory):
|
class ExternalMemory(Memory):
|
||||||
def __init__(self, storage: Storage | None = None, **data: Any):
|
def __init__(self, storage: Storage | None = None, **data: Any) -> None:
|
||||||
super().__init__(storage=storage, **data)
|
super().__init__(storage=storage, **data)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _configure_mem0(crew: Any, config: dict[str, Any]) -> Mem0Storage:
|
def _configure_mem0(crew: Crew, config: dict[str, Any]) -> Storage:
|
||||||
from crewai.memory.storage.mem0_storage import Mem0Storage
|
from crewai.memory.storage.mem0_storage import Mem0Config, Mem0Storage
|
||||||
|
|
||||||
return Mem0Storage(type="external", crew=crew, config=config)
|
return Mem0Storage(
|
||||||
|
type="external", crew=crew, config=cast(Mem0Config, cast(object, config))
|
||||||
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def external_supported_storages() -> dict[str, Any]:
|
def external_supported_storages() -> dict[
|
||||||
|
str, Callable[[Crew, dict[str, Any]], Storage]
|
||||||
|
]:
|
||||||
return {
|
return {
|
||||||
"mem0": ExternalMemory._configure_mem0,
|
"mem0": ExternalMemory._configure_mem0,
|
||||||
}
|
}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_storage(
|
def create_storage(crew: Crew, embedder_config: ProviderSpec | None) -> Storage:
|
||||||
crew: Any, embedder_config: dict[str, Any] | ProviderSpec | None
|
|
||||||
) -> Storage:
|
|
||||||
if not embedder_config:
|
if not embedder_config:
|
||||||
raise ValueError("embedder_config is required")
|
raise ValueError("embedder_config is required")
|
||||||
|
|
||||||
@@ -53,115 +47,59 @@ class ExternalMemory(Memory):
|
|||||||
if provider not in supported_storages:
|
if provider not in supported_storages:
|
||||||
raise ValueError(f"Provider {provider} not supported")
|
raise ValueError(f"Provider {provider} not supported")
|
||||||
|
|
||||||
return supported_storages[provider](crew, embedder_config.get("config", {}))
|
config = embedder_config.get("config", {})
|
||||||
|
return supported_storages[provider](crew, cast(dict[str, Any], config))
|
||||||
|
|
||||||
|
@with_lifecycle_events(
|
||||||
|
"memory_save",
|
||||||
|
args_map={"value": "value", "metadata": "metadata"},
|
||||||
|
context={
|
||||||
|
"source_type": "external_memory",
|
||||||
|
"from_agent": lambda self: self.agent,
|
||||||
|
"from_task": lambda self: self.task,
|
||||||
|
},
|
||||||
|
elapsed_name="save_time_ms",
|
||||||
|
)
|
||||||
def save(
|
def save(
|
||||||
self,
|
self,
|
||||||
value: Any,
|
value: Any,
|
||||||
metadata: dict[str, Any] | None = None,
|
metadata: dict[str, Any] | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Saves a value into the external storage."""
|
"""Saves a value into the external storage."""
|
||||||
crewai_event_bus.emit(
|
item = ExternalMemoryItem(
|
||||||
self,
|
value=value,
|
||||||
event=MemorySaveStartedEvent(
|
metadata=metadata,
|
||||||
value=value,
|
agent=self.agent.role if self.agent else None,
|
||||||
metadata=metadata,
|
|
||||||
source_type="external_memory",
|
|
||||||
from_agent=self.agent,
|
|
||||||
from_task=self.task,
|
|
||||||
),
|
|
||||||
)
|
)
|
||||||
|
super().save(value=item.value, metadata=item.metadata)
|
||||||
|
|
||||||
start_time = time.time()
|
@with_lifecycle_events(
|
||||||
try:
|
"memory_query",
|
||||||
item = ExternalMemoryItem(
|
args_map={
|
||||||
value=value,
|
"query": "query",
|
||||||
metadata=metadata,
|
"limit": "limit",
|
||||||
agent=self.agent.role if self.agent else None,
|
"score_threshold": "score_threshold",
|
||||||
)
|
},
|
||||||
super().save(value=item.value, metadata=item.metadata)
|
context={
|
||||||
|
"source_type": "external_memory",
|
||||||
crewai_event_bus.emit(
|
"from_agent": lambda self: self.agent,
|
||||||
self,
|
"from_task": lambda self: self.task,
|
||||||
event=MemorySaveCompletedEvent(
|
},
|
||||||
value=value,
|
result_name="results",
|
||||||
metadata=metadata,
|
elapsed_name="query_time_ms",
|
||||||
save_time_ms=(time.time() - start_time) * 1000,
|
)
|
||||||
source_type="external_memory",
|
|
||||||
from_agent=self.agent,
|
|
||||||
from_task=self.task,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
crewai_event_bus.emit(
|
|
||||||
self,
|
|
||||||
event=MemorySaveFailedEvent(
|
|
||||||
value=value,
|
|
||||||
metadata=metadata,
|
|
||||||
error=str(e),
|
|
||||||
source_type="external_memory",
|
|
||||||
from_agent=self.agent,
|
|
||||||
from_task=self.task,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
|
|
||||||
def search(
|
def search(
|
||||||
self,
|
self,
|
||||||
query: str,
|
query: str,
|
||||||
limit: int = 5,
|
limit: int = 5,
|
||||||
score_threshold: float = 0.6,
|
score_threshold: float = 0.6,
|
||||||
):
|
) -> Any:
|
||||||
crewai_event_bus.emit(
|
return super().search(query=query, limit=limit, score_threshold=score_threshold)
|
||||||
self,
|
|
||||||
event=MemoryQueryStartedEvent(
|
|
||||||
query=query,
|
|
||||||
limit=limit,
|
|
||||||
score_threshold=score_threshold,
|
|
||||||
source_type="external_memory",
|
|
||||||
from_agent=self.agent,
|
|
||||||
from_task=self.task,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
start_time = time.time()
|
|
||||||
try:
|
|
||||||
results = super().search(
|
|
||||||
query=query, limit=limit, score_threshold=score_threshold
|
|
||||||
)
|
|
||||||
|
|
||||||
crewai_event_bus.emit(
|
|
||||||
self,
|
|
||||||
event=MemoryQueryCompletedEvent(
|
|
||||||
query=query,
|
|
||||||
results=results,
|
|
||||||
limit=limit,
|
|
||||||
score_threshold=score_threshold,
|
|
||||||
query_time_ms=(time.time() - start_time) * 1000,
|
|
||||||
source_type="external_memory",
|
|
||||||
from_agent=self.agent,
|
|
||||||
from_task=self.task,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
return results
|
|
||||||
except Exception as e:
|
|
||||||
crewai_event_bus.emit(
|
|
||||||
self,
|
|
||||||
event=MemoryQueryFailedEvent(
|
|
||||||
query=query,
|
|
||||||
limit=limit,
|
|
||||||
score_threshold=score_threshold,
|
|
||||||
error=str(e),
|
|
||||||
source_type="external_memory",
|
|
||||||
),
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
|
|
||||||
def reset(self) -> None:
|
def reset(self) -> None:
|
||||||
self.storage.reset()
|
self.storage.reset()
|
||||||
|
|
||||||
def set_crew(self, crew: Any) -> ExternalMemory:
|
def set_crew(self, crew: Crew) -> ExternalMemory:
|
||||||
super().set_crew(crew)
|
super().set_crew(crew)
|
||||||
|
|
||||||
if not self.storage:
|
if not self.storage:
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import TYPE_CHECKING, Any
|
from typing import TYPE_CHECKING, Any, cast
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
@@ -24,9 +24,6 @@ class Memory(BaseModel):
|
|||||||
_agent: Agent | None = None
|
_agent: Agent | None = None
|
||||||
_task: Task | None = None
|
_task: Task | None = None
|
||||||
|
|
||||||
def __init__(self, storage: Any, **data: Any):
|
|
||||||
super().__init__(storage=storage, **data)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def task(self) -> Task | None:
|
def task(self) -> Task | None:
|
||||||
"""Get the current task associated with this memory."""
|
"""Get the current task associated with this memory."""
|
||||||
@@ -62,8 +59,11 @@ class Memory(BaseModel):
|
|||||||
limit: int = 5,
|
limit: int = 5,
|
||||||
score_threshold: float = 0.6,
|
score_threshold: float = 0.6,
|
||||||
) -> list[Any]:
|
) -> list[Any]:
|
||||||
return self.storage.search(
|
return cast(
|
||||||
query=query, limit=limit, score_threshold=score_threshold
|
list[Any],
|
||||||
|
self.storage.search(
|
||||||
|
query=query, limit=limit, score_threshold=score_threshold
|
||||||
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
def set_crew(self, crew: Any) -> Memory:
|
def set_crew(self, crew: Any) -> Memory:
|
||||||
|
|||||||
@@ -1,16 +1,83 @@
|
|||||||
from collections import defaultdict
|
from __future__ import annotations
|
||||||
|
|
||||||
from collections.abc import Iterable
|
from collections.abc import Iterable
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from typing import Any
|
from typing import TYPE_CHECKING, Any, Final, Literal, TypedDict
|
||||||
|
|
||||||
from mem0 import Memory, MemoryClient # type: ignore[import-untyped,import-not-found]
|
from mem0 import Memory, MemoryClient # type: ignore[import-untyped]
|
||||||
|
|
||||||
from crewai.memory.storage.interface import Storage
|
from crewai.memory.storage.interface import Storage
|
||||||
from crewai.rag.chromadb.utils import _sanitize_collection_name
|
from crewai.rag.chromadb.utils import _sanitize_collection_name
|
||||||
|
|
||||||
|
|
||||||
MAX_AGENT_ID_LENGTH_MEM0 = 255
|
if TYPE_CHECKING:
|
||||||
|
from crewai.crew import Crew
|
||||||
|
from crewai.utilities.types import LLMMessage, MessageRole
|
||||||
|
|
||||||
|
|
||||||
|
MAX_AGENT_ID_LENGTH_MEM0: Final[int] = 255
|
||||||
|
_ASSISTANT_MESSAGE_MARKER: Final[str] = "Final Answer:"
|
||||||
|
_USER_MESSAGE_PATTERN: Final[re.Pattern[str]] = re.compile(r"User message:\s*(.*)")
|
||||||
|
|
||||||
|
|
||||||
|
class BaseMetadata(TypedDict):
|
||||||
|
short_term: Literal["short_term"]
|
||||||
|
long_term: Literal["long_term"]
|
||||||
|
entities: Literal["entity"]
|
||||||
|
external: Literal["external"]
|
||||||
|
|
||||||
|
|
||||||
|
BASE_METADATA: Final[BaseMetadata] = {
|
||||||
|
"short_term": "short_term",
|
||||||
|
"long_term": "long_term",
|
||||||
|
"entities": "entity",
|
||||||
|
"external": "external",
|
||||||
|
}
|
||||||
|
|
||||||
|
MEMORY_TYPE_MAP: Final[dict[str, dict[str, str]]] = {
|
||||||
|
"short_term": {"type": "short_term"},
|
||||||
|
"long_term": {"type": "long_term"},
|
||||||
|
"entities": {"type": "entity"},
|
||||||
|
"external": {"type": "external"},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class BaseParams(TypedDict, total=False):
|
||||||
|
"""Parameters for Mem0 memory operations."""
|
||||||
|
|
||||||
|
metadata: dict[str, Any]
|
||||||
|
infer: bool
|
||||||
|
includes: Any
|
||||||
|
excludes: Any
|
||||||
|
output_format: str
|
||||||
|
version: str
|
||||||
|
run_id: str
|
||||||
|
user_id: str
|
||||||
|
agent_id: str
|
||||||
|
|
||||||
|
|
||||||
|
class Mem0Config(TypedDict, total=False):
|
||||||
|
"""Configuration for Mem0Storage."""
|
||||||
|
|
||||||
|
run_id: str
|
||||||
|
includes: Any
|
||||||
|
excludes: Any
|
||||||
|
custom_categories: Any
|
||||||
|
infer: bool
|
||||||
|
api_key: str
|
||||||
|
org_id: str
|
||||||
|
project_id: str
|
||||||
|
local_mem0_config: Any
|
||||||
|
user_id: str
|
||||||
|
agent_id: str
|
||||||
|
|
||||||
|
|
||||||
|
class Mem0Filter(TypedDict, total=False):
|
||||||
|
"""Filter dictionary for Mem0 search operations."""
|
||||||
|
|
||||||
|
AND: list[dict[str, Any]]
|
||||||
|
OR: list[dict[str, Any]]
|
||||||
|
|
||||||
|
|
||||||
class Mem0Storage(Storage):
|
class Mem0Storage(Storage):
|
||||||
@@ -18,33 +85,22 @@ class Mem0Storage(Storage):
|
|||||||
Extends Storage to handle embedding and searching across entities using Mem0.
|
Extends Storage to handle embedding and searching across entities using Mem0.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, type, crew=None, config=None):
|
def __init__(
|
||||||
super().__init__()
|
self,
|
||||||
|
type: Literal["short_term", "long_term", "entities", "external"],
|
||||||
self._validate_type(type)
|
crew: Crew | None = None,
|
||||||
|
config: Mem0Config | None = None,
|
||||||
|
) -> None:
|
||||||
self.memory_type = type
|
self.memory_type = type
|
||||||
self.crew = crew
|
self.crew = crew
|
||||||
self.config = config or {}
|
if config is None:
|
||||||
|
config = {}
|
||||||
self._extract_config_values()
|
self.config: Mem0Config = config
|
||||||
self._initialize_memory()
|
self.mem0_run_id = config.get("run_id")
|
||||||
|
self.includes = config.get("includes")
|
||||||
def _validate_type(self, type):
|
self.excludes = config.get("excludes")
|
||||||
supported_types = {"short_term", "long_term", "entities", "external"}
|
self.custom_categories = config.get("custom_categories")
|
||||||
if type not in supported_types:
|
self.infer = config.get("infer", True)
|
||||||
raise ValueError(
|
|
||||||
f"Invalid type '{type}' for Mem0Storage. "
|
|
||||||
f"Must be one of: {', '.join(supported_types)}"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _extract_config_values(self):
|
|
||||||
self.mem0_run_id = self.config.get("run_id")
|
|
||||||
self.includes = self.config.get("includes")
|
|
||||||
self.excludes = self.config.get("excludes")
|
|
||||||
self.custom_categories = self.config.get("custom_categories")
|
|
||||||
self.infer = self.config.get("infer", True)
|
|
||||||
|
|
||||||
def _initialize_memory(self):
|
|
||||||
api_key = self.config.get("api_key") or os.getenv("MEM0_API_KEY")
|
api_key = self.config.get("api_key") or os.getenv("MEM0_API_KEY")
|
||||||
org_id = self.config.get("org_id")
|
org_id = self.config.get("org_id")
|
||||||
project_id = self.config.get("project_id")
|
project_id = self.config.get("project_id")
|
||||||
@@ -65,47 +121,39 @@ class Mem0Storage(Storage):
|
|||||||
else Memory()
|
else Memory()
|
||||||
)
|
)
|
||||||
|
|
||||||
def _create_filter_for_search(self):
|
def _create_filter_for_search(self) -> Mem0Filter:
|
||||||
"""
|
"""Create filter dictionary for search operations.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: A filter dictionary containing AND conditions for querying data.
|
Filter dictionary containing AND/OR conditions for querying data.
|
||||||
- Includes user_id and agent_id if both are present.
|
|
||||||
- Includes user_id if only user_id is present.
|
|
||||||
- Includes agent_id if only agent_id is present.
|
|
||||||
- Includes run_id if memory_type is 'short_term' and
|
|
||||||
mem0_run_id is present.
|
|
||||||
"""
|
"""
|
||||||
filter = defaultdict(list)
|
|
||||||
|
|
||||||
if self.memory_type == "short_term" and self.mem0_run_id:
|
if self.memory_type == "short_term" and self.mem0_run_id:
|
||||||
filter["AND"].append({"run_id": self.mem0_run_id})
|
return {"AND": [{"run_id": self.mem0_run_id}]}
|
||||||
else:
|
|
||||||
user_id = self.config.get("user_id", "")
|
|
||||||
agent_id = self.config.get("agent_id", "")
|
|
||||||
|
|
||||||
if user_id and agent_id:
|
user_id = self.config.get("user_id")
|
||||||
filter["OR"].append({"user_id": user_id})
|
agent_id = self.config.get("agent_id")
|
||||||
filter["OR"].append({"agent_id": agent_id})
|
if user_id and agent_id:
|
||||||
elif user_id:
|
return {"OR": [{"user_id": user_id}, {"agent_id": agent_id}]}
|
||||||
filter["AND"].append({"user_id": user_id})
|
if user_id:
|
||||||
elif agent_id:
|
return {"AND": [{"user_id": user_id}]}
|
||||||
filter["AND"].append({"agent_id": agent_id})
|
if agent_id:
|
||||||
|
return {"AND": [{"agent_id": agent_id}]}
|
||||||
return filter
|
return {}
|
||||||
|
|
||||||
def save(self, value: Any, metadata: dict[str, Any]) -> None:
|
def save(self, value: Any, metadata: dict[str, Any]) -> None:
|
||||||
def _last_content(messages: Iterable[dict[str, Any]], role: str) -> str:
|
def _last_content(messages_: Iterable[LLMMessage], role: MessageRole) -> str:
|
||||||
return next(
|
content = next(
|
||||||
(
|
(
|
||||||
m.get("content", "")
|
m.get("content", "")
|
||||||
for m in reversed(list(messages))
|
for m in reversed(list(messages_))
|
||||||
if m.get("role") == role
|
if m.get("role") == role
|
||||||
),
|
),
|
||||||
"",
|
"",
|
||||||
)
|
)
|
||||||
|
return str(content) if content else ""
|
||||||
|
|
||||||
conversations = []
|
conversations = []
|
||||||
messages = metadata.pop("messages", None)
|
messages: Iterable[LLMMessage] = metadata.pop("messages", [])
|
||||||
if messages:
|
if messages:
|
||||||
last_user = _last_content(messages, "user")
|
last_user = _last_content(messages, "user")
|
||||||
last_assistant = _last_content(messages, "assistant")
|
last_assistant = _last_content(messages, "assistant")
|
||||||
@@ -120,20 +168,11 @@ class Mem0Storage(Storage):
|
|||||||
|
|
||||||
user_id = self.config.get("user_id", "")
|
user_id = self.config.get("user_id", "")
|
||||||
|
|
||||||
base_metadata = {
|
params: BaseParams = {
|
||||||
"short_term": "short_term",
|
"metadata": {"type": BASE_METADATA[self.memory_type], **metadata},
|
||||||
"long_term": "long_term",
|
|
||||||
"entities": "entity",
|
|
||||||
"external": "external",
|
|
||||||
}
|
|
||||||
|
|
||||||
# Shared base params
|
|
||||||
params: dict[str, Any] = {
|
|
||||||
"metadata": {"type": base_metadata[self.memory_type], **metadata},
|
|
||||||
"infer": self.infer,
|
"infer": self.infer,
|
||||||
}
|
}
|
||||||
|
|
||||||
# MemoryClient-specific overrides
|
|
||||||
if isinstance(self.memory, MemoryClient):
|
if isinstance(self.memory, MemoryClient):
|
||||||
params["includes"] = self.includes
|
params["includes"] = self.includes
|
||||||
params["excludes"] = self.excludes
|
params["excludes"] = self.excludes
|
||||||
@@ -154,7 +193,7 @@ class Mem0Storage(Storage):
|
|||||||
def search(
|
def search(
|
||||||
self, query: str, limit: int = 5, score_threshold: float = 0.6
|
self, query: str, limit: int = 5, score_threshold: float = 0.6
|
||||||
) -> list[Any]:
|
) -> list[Any]:
|
||||||
params = {
|
params: dict[str, Any] = {
|
||||||
"query": query,
|
"query": query,
|
||||||
"limit": limit,
|
"limit": limit,
|
||||||
"version": "v2",
|
"version": "v2",
|
||||||
@@ -164,15 +203,8 @@ class Mem0Storage(Storage):
|
|||||||
if user_id := self.config.get("user_id", ""):
|
if user_id := self.config.get("user_id", ""):
|
||||||
params["user_id"] = user_id
|
params["user_id"] = user_id
|
||||||
|
|
||||||
memory_type_map = {
|
if self.memory_type in MEMORY_TYPE_MAP:
|
||||||
"short_term": {"type": "short_term"},
|
params["metadata"] = MEMORY_TYPE_MAP[self.memory_type]
|
||||||
"long_term": {"type": "long_term"},
|
|
||||||
"entities": {"type": "entity"},
|
|
||||||
"external": {"type": "external"},
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.memory_type in memory_type_map:
|
|
||||||
params["metadata"] = memory_type_map[self.memory_type]
|
|
||||||
if self.memory_type == "short_term":
|
if self.memory_type == "short_term":
|
||||||
params["run_id"] = self.mem0_run_id
|
params["run_id"] = self.mem0_run_id
|
||||||
|
|
||||||
@@ -195,11 +227,12 @@ class Mem0Storage(Storage):
|
|||||||
|
|
||||||
return [r for r in results["results"]]
|
return [r for r in results["results"]]
|
||||||
|
|
||||||
def reset(self):
|
def reset(self) -> None:
|
||||||
if self.memory:
|
if self.memory:
|
||||||
self.memory.reset()
|
self.memory.reset()
|
||||||
|
|
||||||
def _sanitize_role(self, role: str) -> str:
|
@staticmethod
|
||||||
|
def _sanitize_role(role: str) -> str:
|
||||||
"""
|
"""
|
||||||
Sanitizes agent roles to ensure valid directory names.
|
Sanitizes agent roles to ensure valid directory names.
|
||||||
"""
|
"""
|
||||||
@@ -210,21 +243,20 @@ class Mem0Storage(Storage):
|
|||||||
return ""
|
return ""
|
||||||
|
|
||||||
agents = self.crew.agents
|
agents = self.crew.agents
|
||||||
agents = [self._sanitize_role(agent.role) for agent in agents]
|
agents_roles = "".join([self._sanitize_role(agent.role) for agent in agents])
|
||||||
agents = "_".join(agents)
|
|
||||||
return _sanitize_collection_name(
|
return _sanitize_collection_name(
|
||||||
name=agents, max_collection_length=MAX_AGENT_ID_LENGTH_MEM0
|
name=agents_roles, max_collection_length=MAX_AGENT_ID_LENGTH_MEM0
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_assistant_message(self, text: str) -> str:
|
@staticmethod
|
||||||
marker = "Final Answer:"
|
def _get_assistant_message(text: str) -> str:
|
||||||
if marker in text:
|
if _ASSISTANT_MESSAGE_MARKER in text:
|
||||||
return text.split(marker, 1)[1].strip()
|
return text.split(_ASSISTANT_MESSAGE_MARKER, 1)[1].strip()
|
||||||
return text
|
return text
|
||||||
|
|
||||||
def _get_user_message(self, text: str) -> str:
|
@staticmethod
|
||||||
pattern = r"User message:\s*(.*)"
|
def _get_user_message(text: str) -> str:
|
||||||
match = re.search(pattern, text)
|
match = _USER_MESSAGE_PATTERN.search(text)
|
||||||
if match:
|
if match:
|
||||||
return match.group(1).strip()
|
return match.group(1).strip()
|
||||||
return text
|
return text
|
||||||
|
|||||||
@@ -3,6 +3,9 @@
|
|||||||
from typing import Any, Literal, TypedDict
|
from typing import Any, Literal, TypedDict
|
||||||
|
|
||||||
|
|
||||||
|
MessageRole = Literal["user", "assistant", "system"]
|
||||||
|
|
||||||
|
|
||||||
class LLMMessage(TypedDict):
|
class LLMMessage(TypedDict):
|
||||||
"""Type for formatted LLM messages.
|
"""Type for formatted LLM messages.
|
||||||
|
|
||||||
@@ -11,5 +14,5 @@ class LLMMessage(TypedDict):
|
|||||||
instead of str | list[dict[str, str]]
|
instead of str | list[dict[str, str]]
|
||||||
"""
|
"""
|
||||||
|
|
||||||
role: Literal["user", "assistant", "system"]
|
role: MessageRole
|
||||||
content: str | list[dict[str, Any]]
|
content: str | list[dict[str, Any]]
|
||||||
|
|||||||
Reference in New Issue
Block a user