Fix agent kn reset (#2765)

* CLI command added

* Added reset agent knowledge function

* Reduced verbose

* Added test cases

* Added docs

* Llama test case failing

* Changed _reset_agent_knowledge function

* Fixed new line error

* Added docs

* fixed the new line error

* Refractored

* Uncommmented some test cases

* ruff check fixed

* fixed run type checks

* fixed run type checks

* fixed run type checks

* Made reset_fn callable by casting to silence run type checks

* Changed the reset_knowledge as it expects only list of knowledge

* Fixed typo in docs

* Refractored the memory_system

* Minor Changes

* fixed test case

* Fixed linting issues

* Network test cases failing

---------

Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
This commit is contained in:
Vidit Ostwal
2025-05-15 00:43:39 +05:30
committed by GitHub
parent e1541b2619
commit 30ef8ed70b
8 changed files with 279 additions and 40 deletions

View File

@@ -1356,7 +1356,7 @@ class Crew(FlowTrackable, BaseModel):
Args:
command_type: Type of memory to reset.
Valid options: 'long', 'short', 'entity', 'knowledge',
Valid options: 'long', 'short', 'entity', 'knowledge', 'agent_knowledge'
'kickoff_outputs', or 'all'
Raises:
@@ -1369,6 +1369,7 @@ class Crew(FlowTrackable, BaseModel):
"short",
"entity",
"knowledge",
"agent_knowledge",
"kickoff_outputs",
"all",
"external",
@@ -1393,19 +1394,14 @@ class Crew(FlowTrackable, BaseModel):
def _reset_all_memories(self) -> None:
"""Reset all available memory systems."""
memory_systems = [
("short term", getattr(self, "_short_term_memory", None)),
("entity", getattr(self, "_entity_memory", None)),
("external", getattr(self, "_external_memory", None)),
("long term", getattr(self, "_long_term_memory", None)),
("task output", getattr(self, "_task_output_handler", None)),
("knowledge", getattr(self, "knowledge", None)),
]
memory_systems = self._get_memory_systems()
for name, system in memory_systems:
if system is not None:
for memory_type, config in memory_systems.items():
if (system := config.get('system')) is not None:
name = config.get('name')
try:
system.reset()
reset_fn: Callable = cast(Callable, config.get('reset'))
reset_fn(system)
self._logger.log(
"info",
f"[Crew ({self.name if self.name else self.id})] {name} memory has been reset",
@@ -1424,24 +1420,17 @@ class Crew(FlowTrackable, BaseModel):
Raises:
RuntimeError: If the specified memory system fails to reset
"""
reset_functions = {
"long": (getattr(self, "_long_term_memory", None), "long term"),
"short": (getattr(self, "_short_term_memory", None), "short term"),
"entity": (getattr(self, "_entity_memory", None), "entity"),
"knowledge": (getattr(self, "knowledge", None), "knowledge"),
"kickoff_outputs": (
getattr(self, "_task_output_handler", None),
"task output",
),
"external": (getattr(self, "_external_memory", None), "external"),
}
memory_systems = self._get_memory_systems()
config = memory_systems[memory_type]
system = config.get('system')
name = config.get('name')
memory_system, name = reset_functions[memory_type]
if memory_system is None:
if system is None:
raise RuntimeError(f"{name} memory system is not initialized")
try:
memory_system.reset()
reset_fn: Callable = cast(Callable, config.get('reset'))
reset_fn(system)
self._logger.log(
"info",
f"[Crew ({self.name if self.name else self.id})] {name} memory has been reset",
@@ -1450,3 +1439,64 @@ class Crew(FlowTrackable, BaseModel):
raise RuntimeError(
f"[Crew ({self.name if self.name else self.id})] Failed to reset {name} memory: {str(e)}"
) from e
def _get_memory_systems(self):
"""Get all available memory systems with their configuration.
Returns:
Dict containing all memory systems with their reset functions and display names.
"""
def default_reset(memory):
return memory.reset()
def knowledge_reset(memory):
return self.reset_knowledge(memory)
# Get knowledge for agents
agent_knowledges = [getattr(agent, "knowledge", None) for agent in self.agents
if getattr(agent, "knowledge", None) is not None]
# Get knowledge for crew and agents
crew_knowledge = getattr(self, "knowledge", None)
crew_and_agent_knowledges = ([crew_knowledge] if crew_knowledge is not None else []) + agent_knowledges
return {
'short': {
'system': getattr(self, "_short_term_memory", None),
'reset': default_reset,
'name': 'Short Term'
},
'entity': {
'system': getattr(self, "_entity_memory", None),
'reset': default_reset,
'name': 'Entity'
},
'external': {
'system': getattr(self, "_external_memory", None),
'reset': default_reset,
'name': 'External'
},
'long': {
'system': getattr(self, "_long_term_memory", None),
'reset': default_reset,
'name': 'Long Term'
},
'kickoff_outputs': {
'system': getattr(self, "_task_output_handler", None),
'reset': default_reset,
'name': 'Task Output'
},
'knowledge': {
'system': crew_and_agent_knowledges if crew_and_agent_knowledges else None,
'reset': knowledge_reset,
'name': 'Crew Knowledge and Agent Knowledge'
},
'agent_knowledge': {
'system': agent_knowledges if agent_knowledges else None,
'reset': knowledge_reset,
'name': 'Agent Knowledge'
}
}
def reset_knowledge(self, knowledges: List[Knowledge]) -> None:
"""Reset crew and agent knowledge storage."""
for ks in knowledges:
ks.reset()