From e6100debac0fd88dd54975519485419fdd136bff Mon Sep 17 00:00:00 2001 From: Nicolas Lorin Date: Thu, 6 Feb 2025 21:19:22 +0100 Subject: [PATCH 1/6] agent: improve knowledge naming (#2041) --- src/crewai/agent.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/crewai/agent.py b/src/crewai/agent.py index dec0effd7..a222995c6 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -1,3 +1,4 @@ +import re import shutil import subprocess from typing import Any, Dict, List, Literal, Optional, Union @@ -153,7 +154,8 @@ class Agent(BaseAgent): def _set_knowledge(self): try: if self.knowledge_sources: - knowledge_agent_name = f"{self.role.replace(' ', '_')}" + full_pattern = re.compile(r'[^a-zA-Z0-9\-_\r\n]|(\.\.)') + knowledge_agent_name = f"{re.sub(full_pattern, '_', self.role)}" if isinstance(self.knowledge_sources, list) and all( isinstance(k, BaseKnowledgeSource) for k in self.knowledge_sources ): From 5a8649a97f9ff0c92e791b8f28c95f06c040603e Mon Sep 17 00:00:00 2001 From: hyjbrave Date: Fri, 7 Feb 2025 23:38:15 +0800 Subject: [PATCH 2/6] fix unstructured example flow (#2052) --- docs/concepts/flows.mdx | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/concepts/flows.mdx b/docs/concepts/flows.mdx index 9820d3465..c22a619fe 100644 --- a/docs/concepts/flows.mdx +++ b/docs/concepts/flows.mdx @@ -232,18 +232,18 @@ class UnstructuredExampleFlow(Flow): def first_method(self): # The state automatically includes an 'id' field print(f"State ID: {self.state['id']}") - self.state.message = "Hello from structured flow" - self.state.counter = 0 + self.state['counter'] = 0 + self.state['message'] = "Hello from structured flow" @listen(first_method) def second_method(self): - self.state.counter += 1 - self.state.message += " - updated" + self.state['counter'] += 1 + self.state['message'] += " - updated" @listen(second_method) def third_method(self): - self.state.counter += 1 - self.state.message += " - updated again" + self.state['counter'] += 1 + self.state['message'] += " - updated again" print(f"State after third_method: {self.state}") From f6c29826194faa9ea7875681ec5549f9acd51cce Mon Sep 17 00:00:00 2001 From: "Brandon Hancock (bhancock_ai)" <109994880+bhancockio@users.noreply.github.com> Date: Fri, 7 Feb 2025 10:58:38 -0500 Subject: [PATCH 3/6] fix manager (#2056) --- src/crewai/crew.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 0d702b45a..6ec5520a0 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -681,12 +681,7 @@ class Crew(BaseModel): manager.tools = [] raise Exception("Manager agent should not have tools") else: - self.manager_llm = ( - getattr(self.manager_llm, "model_name", None) - or getattr(self.manager_llm, "model", None) - or getattr(self.manager_llm, "deployment_name", None) - or self.manager_llm - ) + self.manager_llm = create_llm(self.manager_llm) manager = Agent( role=i18n.retrieve("hierarchical_manager_agent", "role"), goal=i18n.retrieve("hierarchical_manager_agent", "goal"), From fa26f6ebae5f33fdcb905ec75c8d87d3f5f6f0ce Mon Sep 17 00:00:00 2001 From: Vidit Ostwal <110953813+Vidit-Ostwal@users.noreply.github.com> Date: Fri, 7 Feb 2025 23:19:25 +0530 Subject: [PATCH 4/6] Added reset memories function inside crew class (#2047) * Added reset memories function inside crew class * Fixed typos * Refractored the code * Refactor memory reset functionality in Crew class - Improved error handling and logging for memory reset operations - Added 
private methods to modularize memory reset logic - Enhanced type hints and docstrings - Updated CLI reset memories command to use new Crew method - Added utility function to get crew instance in CLI utils * fix linting issues * knowledge: Add null check in reset method for storage * cli: Update memory reset tests to use Crew's reset_memories method * cli: Enhance memory reset command with improved error handling and validation --------- Co-authored-by: Lorenze Jay Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com> --- src/crewai/cli/reset_memories_command.py | 48 +++++++------ src/crewai/cli/utils.py | 62 +++++++++++++++++ src/crewai/crew.py | 78 +++++++++++++++++++++ src/crewai/knowledge/knowledge.py | 6 ++ tests/cli/cli_test.py | 89 +++++++++++++----------- 5 files changed, 223 insertions(+), 60 deletions(-) diff --git a/src/crewai/cli/reset_memories_command.py b/src/crewai/cli/reset_memories_command.py index 554232f52..4f7f1beb6 100644 --- a/src/crewai/cli/reset_memories_command.py +++ b/src/crewai/cli/reset_memories_command.py @@ -2,6 +2,7 @@ import subprocess import click +from crewai.cli.utils import get_crew from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage from crewai.memory.entity.entity_memory import EntityMemory from crewai.memory.long_term.long_term_memory import LongTermMemory @@ -30,30 +31,35 @@ def reset_memories_command( """ try: + crew = get_crew() + if not crew: + raise ValueError("No crew found.") if all: - ShortTermMemory().reset() - EntityMemory().reset() - LongTermMemory().reset() - TaskOutputStorageHandler().reset() - KnowledgeStorage().reset() + crew.reset_memories(command_type="all") click.echo("All memories have been reset.") - else: - if long: - LongTermMemory().reset() - click.echo("Long term memory has been reset.") + return - if short: - ShortTermMemory().reset() - click.echo("Short term memory has been reset.") - if entity: - EntityMemory().reset() - click.echo("Entity memory has been reset.") - if kickoff_outputs: - TaskOutputStorageHandler().reset() - click.echo("Latest Kickoff outputs stored has been reset.") - if knowledge: - KnowledgeStorage().reset() - click.echo("Knowledge has been reset.") + if not any([long, short, entity, kickoff_outputs, knowledge]): + click.echo( + "No memory type specified. Please specify at least one type to reset." 
+ ) + return + + if long: + crew.reset_memories(command_type="long") + click.echo("Long term memory has been reset.") + if short: + crew.reset_memories(command_type="short") + click.echo("Short term memory has been reset.") + if entity: + crew.reset_memories(command_type="entity") + click.echo("Entity memory has been reset.") + if kickoff_outputs: + crew.reset_memories(command_type="kickoff_outputs") + click.echo("Latest Kickoff outputs stored has been reset.") + if knowledge: + crew.reset_memories(command_type="knowledge") + click.echo("Knowledge has been reset.") except subprocess.CalledProcessError as e: click.echo(f"An error occurred while resetting the memories: {e}", err=True) diff --git a/src/crewai/cli/utils.py b/src/crewai/cli/utils.py index a385e1f37..60eb2488a 100644 --- a/src/crewai/cli/utils.py +++ b/src/crewai/cli/utils.py @@ -9,6 +9,7 @@ import tomli from rich.console import Console from crewai.cli.constants import ENV_VARS +from crewai.crew import Crew if sys.version_info >= (3, 11): import tomllib @@ -247,3 +248,64 @@ def write_env_file(folder_path, env_vars): with open(env_file_path, "w") as file: for key, value in env_vars.items(): file.write(f"{key}={value}\n") + + +def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None: + """Get the crew instance from the crew.py file.""" + try: + import importlib.util + import os + + for root, _, files in os.walk("."): + if "crew.py" in files: + crew_path = os.path.join(root, "crew.py") + try: + spec = importlib.util.spec_from_file_location( + "crew_module", crew_path + ) + if not spec or not spec.loader: + continue + module = importlib.util.module_from_spec(spec) + try: + sys.modules[spec.name] = module + spec.loader.exec_module(module) + + for attr_name in dir(module): + attr = getattr(module, attr_name) + try: + if callable(attr) and hasattr(attr, "crew"): + crew_instance = attr().crew() + return crew_instance + + except Exception as e: + print(f"Error processing attribute {attr_name}: {e}") + continue + + except Exception as exec_error: + print(f"Error executing module: {exec_error}") + import traceback + + print(f"Traceback: {traceback.format_exc()}") + + except (ImportError, AttributeError) as e: + if require: + console.print( + f"Error importing crew from {crew_path}: {str(e)}", + style="bold red", + ) + continue + + break + + if require: + console.print("No valid Crew instance found in crew.py", style="bold red") + raise SystemExit + return None + + except Exception as e: + if require: + console.print( + f"Unexpected error while loading crew: {str(e)}", style="bold red" + ) + raise SystemExit + return None diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 6ec5520a0..6b500e097 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -1,6 +1,7 @@ import asyncio import json import re +import sys import uuid import warnings from concurrent.futures import Future @@ -1147,3 +1148,80 @@ class Crew(BaseModel): def __repr__(self): return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})" + + def reset_memories(self, command_type: str) -> None: + """Reset specific or all memories for the crew. + + Args: + command_type: Type of memory to reset. + Valid options: 'long', 'short', 'entity', 'knowledge', + 'kickoff_outputs', or 'all' + + Raises: + ValueError: If an invalid command type is provided. + RuntimeError: If memory reset operation fails. 
+ """ + VALID_TYPES = frozenset( + ["long", "short", "entity", "knowledge", "kickoff_outputs", "all"] + ) + + if command_type not in VALID_TYPES: + raise ValueError( + f"Invalid command type. Must be one of: {', '.join(sorted(VALID_TYPES))}" + ) + + try: + if command_type == "all": + self._reset_all_memories() + else: + self._reset_specific_memory(command_type) + + self._logger.log("info", f"{command_type} memory has been reset") + + except Exception as e: + error_msg = f"Failed to reset {command_type} memory: {str(e)}" + self._logger.log("error", error_msg) + raise RuntimeError(error_msg) from e + + def _reset_all_memories(self) -> None: + """Reset all available memory systems.""" + memory_systems = [ + ("short term", self._short_term_memory), + ("entity", self._entity_memory), + ("long term", self._long_term_memory), + ("task output", self._task_output_handler), + ("knowledge", self.knowledge), + ] + + for name, system in memory_systems: + if system is not None: + try: + system.reset() + except Exception as e: + raise RuntimeError(f"Failed to reset {name} memory") from e + + def _reset_specific_memory(self, memory_type: str) -> None: + """Reset a specific memory system. + + Args: + memory_type: Type of memory to reset + + Raises: + RuntimeError: If the specified memory system fails to reset + """ + reset_functions = { + "long": (self._long_term_memory, "long term"), + "short": (self._short_term_memory, "short term"), + "entity": (self._entity_memory, "entity"), + "knowledge": (self.knowledge, "knowledge"), + "kickoff_outputs": (self._task_output_handler, "task output"), + } + + memory_system, name = reset_functions[memory_type] + if memory_system is None: + raise RuntimeError(f"{name} memory system is not initialized") + + try: + memory_system.reset() + except Exception as e: + raise RuntimeError(f"Failed to reset {name} memory") from e diff --git a/src/crewai/knowledge/knowledge.py b/src/crewai/knowledge/knowledge.py index d1d4ede6c..da1db90a8 100644 --- a/src/crewai/knowledge/knowledge.py +++ b/src/crewai/knowledge/knowledge.py @@ -67,3 +67,9 @@ class Knowledge(BaseModel): source.add() except Exception as e: raise e + + def reset(self) -> None: + if self.storage: + self.storage.reset() + else: + raise ValueError("Storage is not initialized.") diff --git a/tests/cli/cli_test.py b/tests/cli/cli_test.py index 15ed81637..dc0c502b7 100644 --- a/tests/cli/cli_test.py +++ b/tests/cli/cli_test.py @@ -55,72 +55,83 @@ def test_train_invalid_string_iterations(train_crew, runner): ) -@mock.patch("crewai.cli.reset_memories_command.ShortTermMemory") -@mock.patch("crewai.cli.reset_memories_command.EntityMemory") -@mock.patch("crewai.cli.reset_memories_command.LongTermMemory") -@mock.patch("crewai.cli.reset_memories_command.TaskOutputStorageHandler") -def test_reset_all_memories( - MockTaskOutputStorageHandler, - MockLongTermMemory, - MockEntityMemory, - MockShortTermMemory, - runner, -): - result = runner.invoke(reset_memories, ["--all"]) - MockShortTermMemory().reset.assert_called_once() - MockEntityMemory().reset.assert_called_once() - MockLongTermMemory().reset.assert_called_once() - MockTaskOutputStorageHandler().reset.assert_called_once() +@mock.patch("crewai.cli.reset_memories_command.get_crew") +def test_reset_all_memories(mock_get_crew, runner): + mock_crew = mock.Mock() + mock_get_crew.return_value = mock_crew + result = runner.invoke(reset_memories, ["-a"]) + mock_crew.reset_memories.assert_called_once_with(command_type="all") assert result.output == "All memories have been reset.\n" 
-@mock.patch("crewai.cli.reset_memories_command.ShortTermMemory") -def test_reset_short_term_memories(MockShortTermMemory, runner): +@mock.patch("crewai.cli.reset_memories_command.get_crew") +def test_reset_short_term_memories(mock_get_crew, runner): + mock_crew = mock.Mock() + mock_get_crew.return_value = mock_crew result = runner.invoke(reset_memories, ["-s"]) - MockShortTermMemory().reset.assert_called_once() + + mock_crew.reset_memories.assert_called_once_with(command_type="short") assert result.output == "Short term memory has been reset.\n" -@mock.patch("crewai.cli.reset_memories_command.EntityMemory") -def test_reset_entity_memories(MockEntityMemory, runner): +@mock.patch("crewai.cli.reset_memories_command.get_crew") +def test_reset_entity_memories(mock_get_crew, runner): + mock_crew = mock.Mock() + mock_get_crew.return_value = mock_crew result = runner.invoke(reset_memories, ["-e"]) - MockEntityMemory().reset.assert_called_once() + + mock_crew.reset_memories.assert_called_once_with(command_type="entity") assert result.output == "Entity memory has been reset.\n" -@mock.patch("crewai.cli.reset_memories_command.LongTermMemory") -def test_reset_long_term_memories(MockLongTermMemory, runner): +@mock.patch("crewai.cli.reset_memories_command.get_crew") +def test_reset_long_term_memories(mock_get_crew, runner): + mock_crew = mock.Mock() + mock_get_crew.return_value = mock_crew result = runner.invoke(reset_memories, ["-l"]) - MockLongTermMemory().reset.assert_called_once() + + mock_crew.reset_memories.assert_called_once_with(command_type="long") assert result.output == "Long term memory has been reset.\n" -@mock.patch("crewai.cli.reset_memories_command.TaskOutputStorageHandler") -def test_reset_kickoff_outputs(MockTaskOutputStorageHandler, runner): +@mock.patch("crewai.cli.reset_memories_command.get_crew") +def test_reset_kickoff_outputs(mock_get_crew, runner): + mock_crew = mock.Mock() + mock_get_crew.return_value = mock_crew result = runner.invoke(reset_memories, ["-k"]) - MockTaskOutputStorageHandler().reset.assert_called_once() + + mock_crew.reset_memories.assert_called_once_with(command_type="kickoff_outputs") assert result.output == "Latest Kickoff outputs stored has been reset.\n" -@mock.patch("crewai.cli.reset_memories_command.ShortTermMemory") -@mock.patch("crewai.cli.reset_memories_command.LongTermMemory") -def test_reset_multiple_memory_flags(MockShortTermMemory, MockLongTermMemory, runner): - result = runner.invoke( - reset_memories, - [ - "-s", - "-l", - ], +@mock.patch("crewai.cli.reset_memories_command.get_crew") +def test_reset_multiple_memory_flags(mock_get_crew, runner): + mock_crew = mock.Mock() + mock_get_crew.return_value = mock_crew + result = runner.invoke(reset_memories, ["-s", "-l"]) + + # Check that reset_memories was called twice with the correct arguments + assert mock_crew.reset_memories.call_count == 2 + mock_crew.reset_memories.assert_has_calls( + [mock.call(command_type="long"), mock.call(command_type="short")] ) - MockShortTermMemory().reset.assert_called_once() - MockLongTermMemory().reset.assert_called_once() assert ( result.output == "Long term memory has been reset.\nShort term memory has been reset.\n" ) +@mock.patch("crewai.cli.reset_memories_command.get_crew") +def test_reset_knowledge(mock_get_crew, runner): + mock_crew = mock.Mock() + mock_get_crew.return_value = mock_crew + result = runner.invoke(reset_memories, ["--knowledge"]) + + mock_crew.reset_memories.assert_called_once_with(command_type="knowledge") + assert result.output == "Knowledge has 
been reset.\n" + + def test_reset_no_memory_flags(runner): result = runner.invoke( reset_memories, From 0cc02d9492c90b944ac2b7e3b1d80933a9e4f718 Mon Sep 17 00:00:00 2001 From: Vidit Ostwal <110953813+Vidit-Ostwal@users.noreply.github.com> Date: Fri, 7 Feb 2025 23:46:44 +0530 Subject: [PATCH 5/6] Added support for logging in JSON format as well. (#1985) * Added functionality to have json format as well for the logs * Added additional comments, refractored logging functionality * Fixed documentation to include the new paramter * Fixed typo * Added a Pydantic Error Check between output_log_file and save_as_json parameter * Removed the save_to_json parameter, incorporated the functionality directly with output_log_file * Fixed typo * Sorted the imports using isort --------- Co-authored-by: Vidit Ostwal Co-authored-by: Brandon Hancock (bhancock_ai) <109994880+bhancockio@users.noreply.github.com> --- docs/concepts/crews.mdx | 27 +++++++++--- src/crewai/crew.py | 5 ++- src/crewai/utilities/file_handler.py | 64 +++++++++++++++++++++------- 3 files changed, 74 insertions(+), 22 deletions(-) diff --git a/docs/concepts/crews.mdx b/docs/concepts/crews.mdx index 11ba3387e..3792e752d 100644 --- a/docs/concepts/crews.mdx +++ b/docs/concepts/crews.mdx @@ -23,14 +23,14 @@ A crew in crewAI represents a collaborative group of agents working together to | **Language** _(optional)_ | `language` | Language used for the crew, defaults to English. | | **Language File** _(optional)_ | `language_file` | Path to the language file to be used for the crew. | | **Memory** _(optional)_ | `memory` | Utilized for storing execution memories (short-term, long-term, entity memory). | -| **Memory Config** _(optional)_ | `memory_config` | Configuration for the memory provider to be used by the crew. | -| **Cache** _(optional)_ | `cache` | Specifies whether to use a cache for storing the results of tools' execution. Defaults to `True`. | -| **Embedder** _(optional)_ | `embedder` | Configuration for the embedder to be used by the crew. Mostly used by memory for now. Default is `{"provider": "openai"}`. | -| **Full Output** _(optional)_ | `full_output` | Whether the crew should return the full output with all tasks outputs or just the final output. Defaults to `False`. | +| **Memory Config** _(optional)_ | `memory_config` | Configuration for the memory provider to be used by the crew. | +| **Cache** _(optional)_ | `cache` | Specifies whether to use a cache for storing the results of tools' execution. Defaults to `True`. | +| **Embedder** _(optional)_ | `embedder` | Configuration for the embedder to be used by the crew. Mostly used by memory for now. Default is `{"provider": "openai"}`. | +| **Full Output** _(optional)_ | `full_output` | Whether the crew should return the full output with all tasks outputs or just the final output. Defaults to `False`. | | **Step Callback** _(optional)_ | `step_callback` | A function that is called after each step of every agent. This can be used to log the agent's actions or to perform other operations; it won't override the agent-specific `step_callback`. | | **Task Callback** _(optional)_ | `task_callback` | A function that is called after the completion of each task. Useful for monitoring or additional operations post-task execution. | | **Share Crew** _(optional)_ | `share_crew` | Whether you want to share the complete crew information and execution with the crewAI team to make the library better, and allow us to train models. 
| -| **Output Log File** _(optional)_ | `output_log_file` | Whether you want to have a file with the complete crew output and execution. You can set it using True and it will default to the folder you are currently in and it will be called logs.txt or passing a string with the full path and name of the file. | +| **Output Log File** _(optional)_ | `output_log_file` | Set to True to save logs as logs.txt in the current directory or provide a file path. Logs will be in JSON format if the filename ends in .json, otherwise .txt. Defautls to `None`. | | **Manager Agent** _(optional)_ | `manager_agent` | `manager` sets a custom agent that will be used as a manager. | | **Prompt File** _(optional)_ | `prompt_file` | Path to the prompt JSON file to be used for the crew. | | **Planning** *(optional)* | `planning` | Adds planning ability to the Crew. When activated before each Crew iteration, all Crew data is sent to an AgentPlanner that will plan the tasks and this plan will be added to each task description. | @@ -240,6 +240,23 @@ print(f"Tasks Output: {crew_output.tasks_output}") print(f"Token Usage: {crew_output.token_usage}") ``` +## Accessing Crew Logs + +You can see real time log of the crew execution, by setting `output_log_file` as a `True(Boolean)` or a `file_name(str)`. Supports logging of events as both `file_name.txt` and `file_name.json`. +In case of `True(Boolean)` will save as `logs.txt`. + +In case of `output_log_file` is set as `False(Booelan)` or `None`, the logs will not be populated. + +```python Code +# Save crew logs +crew = Crew(output_log_file = True) # Logs will be saved as logs.txt +crew = Crew(output_log_file = file_name) # Logs will be saved as file_name.txt +crew = Crew(output_log_file = file_name.txt) # Logs will be saved as file_name.txt +crew = Crew(output_log_file = file_name.json) # Logs will be saved as file_name.json +``` + + + ## Memory Utilization Crews can utilize memory (short-term, long-term, and entity memory) to enhance their execution and learning over time. This feature allows crews to store and recall execution memories, aiding in decision-making and task execution strategies. diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 6b500e097..85d7955fb 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -184,9 +184,9 @@ class Crew(BaseModel): default=None, description="Path to the prompt json file to be used for the crew.", ) - output_log_file: Optional[str] = Field( + output_log_file: Optional[Union[bool, str]] = Field( default=None, - description="output_log_file", + description="Path to the log file to be saved", ) planning: Optional[bool] = Field( default=False, @@ -440,6 +440,7 @@ class Crew(BaseModel): ) return self + @property def key(self) -> str: source = [agent.key for agent in self.agents] + [ diff --git a/src/crewai/utilities/file_handler.py b/src/crewai/utilities/file_handler.py index bb97b940f..85d9766c5 100644 --- a/src/crewai/utilities/file_handler.py +++ b/src/crewai/utilities/file_handler.py @@ -1,30 +1,64 @@ +import json import os import pickle from datetime import datetime +from typing import Union class FileHandler: - """take care of file operations, currently it only logs messages to a file""" + """Handler for file operations supporting both JSON and text-based logging. 
+ + Args: + file_path (Union[bool, str]): Path to the log file or boolean flag + """ - def __init__(self, file_path): - if isinstance(file_path, bool): + def __init__(self, file_path: Union[bool, str]): + self._initialize_path(file_path) + + def _initialize_path(self, file_path: Union[bool, str]): + if file_path is True: # File path is boolean True self._path = os.path.join(os.curdir, "logs.txt") - elif isinstance(file_path, str): - self._path = file_path + + elif isinstance(file_path, str): # File path is a string + if file_path.endswith((".json", ".txt")): + self._path = file_path # No modification if the file ends with .json or .txt + else: + self._path = file_path + ".txt" # Append .txt if the file doesn't end with .json or .txt + else: - raise ValueError("file_path must be either a boolean or a string.") - + raise ValueError("file_path must be a string or boolean.") # Handle the case where file_path isn't valid + def log(self, **kwargs): - now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - message = ( - f"{now}: " - + ", ".join([f'{key}="{value}"' for key, value in kwargs.items()]) - + "\n" - ) - with open(self._path, "a", encoding="utf-8") as file: - file.write(message + "\n") + try: + now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + log_entry = {"timestamp": now, **kwargs} + if self._path.endswith(".json"): + # Append log in JSON format + with open(self._path, "a", encoding="utf-8") as file: + # If the file is empty, start with a list; else, append to it + try: + # Try reading existing content to avoid overwriting + with open(self._path, "r", encoding="utf-8") as read_file: + existing_data = json.load(read_file) + existing_data.append(log_entry) + except (json.JSONDecodeError, FileNotFoundError): + # If no valid JSON or file doesn't exist, start with an empty list + existing_data = [log_entry] + + with open(self._path, "w", encoding="utf-8") as write_file: + json.dump(existing_data, write_file, indent=4) + write_file.write("\n") + + else: + # Append log in plain text format + message = f"{now}: " + ", ".join([f"{key}=\"{value}\"" for key, value in kwargs.items()]) + "\n" + with open(self._path, "a", encoding="utf-8") as file: + file.write(message) + except Exception as e: + raise ValueError(f"Failed to log message: {str(e)}") + class PickleHandler: def __init__(self, file_name: str) -> None: """ From a7f5d574dc6e521dec51453d9d842cf1cee908c9 Mon Sep 17 00:00:00 2001 From: "Brandon Hancock (bhancock_ai)" <109994880+bhancockio@users.noreply.github.com> Date: Fri, 7 Feb 2025 14:45:36 -0500 Subject: [PATCH 6/6] General Clean UP (#2042) * clean up. fix type safety. 
address memory config docs * improve manager * Include fix for o1 models not supporting system messages * more broad with o1 * address fix: Typo in expected_output string #2045 * drop prints * drop prints * wip * wip * fix failing memory tests * Fix memory provider issue * clean up short term memory * revert ltm * drop --- docs/concepts/memory.mdx | 38 ++++++++++++------- src/crewai/agent.py | 24 +++++------- src/crewai/agents/agent_builder/base_agent.py | 12 +++--- src/crewai/flow/flow.py | 33 ++++++++++------ src/crewai/llm.py | 7 ++++ src/crewai/memory/entity/entity_memory.py | 24 ++++++++---- .../memory/long_term/long_term_memory.py | 2 +- src/crewai/memory/memory.py | 12 ++++-- .../memory/short_term/short_term_memory.py | 21 ++++++---- .../tools/agent_tools/add_image_tool.py | 11 ++---- src/crewai/translations/en.json | 2 +- tests/agent_test.py | 4 +- 12 files changed, 113 insertions(+), 77 deletions(-) diff --git a/docs/concepts/memory.mdx b/docs/concepts/memory.mdx index 6eaf8553e..a725c41e7 100644 --- a/docs/concepts/memory.mdx +++ b/docs/concepts/memory.mdx @@ -185,7 +185,12 @@ my_crew = Crew( process=Process.sequential, memory=True, verbose=True, - embedder=OpenAIEmbeddingFunction(api_key=os.getenv("OPENAI_API_KEY"), model="text-embedding-3-small"), + embedder={ + "provider": "openai", + "config": { + "model": 'text-embedding-3-small' + } + } ) ``` @@ -242,13 +247,15 @@ my_crew = Crew( process=Process.sequential, memory=True, verbose=True, - embedder=OpenAIEmbeddingFunction( - api_key="YOUR_API_KEY", - api_base="YOUR_API_BASE_PATH", - api_type="azure", - api_version="YOUR_API_VERSION", - model="text-embedding-3-small" - ) + embedder={ + "provider": "openai", + "config": { + "api_key": "YOUR_API_KEY", + "api_base": "YOUR_API_BASE_PATH", + "api_version": "YOUR_API_VERSION", + "model_name": 'text-embedding-3-small' + } + } ) ``` @@ -264,12 +271,15 @@ my_crew = Crew( process=Process.sequential, memory=True, verbose=True, - embedder=GoogleVertexEmbeddingFunction( - project_id="YOUR_PROJECT_ID", - region="YOUR_REGION", - api_key="YOUR_API_KEY", - model="textembedding-gecko" - ) + embedder={ + "provider": "vertexai", + "config": { + "project_id"="YOUR_PROJECT_ID", + "region"="YOUR_REGION", + "api_key"="YOUR_API_KEY", + "model_name"="textembedding-gecko" + } + } ) ``` diff --git a/src/crewai/agent.py b/src/crewai/agent.py index a222995c6..2ab8228eb 100644 --- a/src/crewai/agent.py +++ b/src/crewai/agent.py @@ -1,7 +1,7 @@ import re import shutil import subprocess -from typing import Any, Dict, List, Literal, Optional, Union +from typing import Any, Dict, List, Literal, Optional, Sequence, Union from pydantic import Field, InstanceOf, PrivateAttr, model_validator @@ -55,7 +55,6 @@ class Agent(BaseAgent): llm: The language model that will run the agent. function_calling_llm: The language model that will handle the tool calling for this agent, it overrides the crew function_calling_llm. max_iter: Maximum number of iterations for an agent to execute a task. - memory: Whether the agent should have memory or not. max_rpm: Maximum number of requests per minute for the agent execution to be respected. verbose: Whether the agent execution should be in verbose mode. allow_delegation: Whether the agent is allowed to delegate tasks to other agents. 
@@ -72,9 +71,6 @@ class Agent(BaseAgent): ) agent_ops_agent_name: str = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str") agent_ops_agent_id: str = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str") - cache_handler: InstanceOf[CacheHandler] = Field( - default=None, description="An instance of the CacheHandler class." - ) step_callback: Optional[Any] = Field( default=None, description="Callback to be executed after each step of the agent execution.", @@ -108,10 +104,6 @@ class Agent(BaseAgent): default=True, description="Keep messages under the context window size by summarizing content.", ) - max_iter: int = Field( - default=20, - description="Maximum number of iterations for an agent to execute a task before giving it's best answer", - ) max_retry_limit: int = Field( default=2, description="Maximum number of retries for an agent to execute a task when an error occurs.", @@ -197,13 +189,15 @@ class Agent(BaseAgent): if task.output_json: # schema = json.dumps(task.output_json, indent=2) schema = generate_model_description(task.output_json) + task_prompt += "\n" + self.i18n.slice( + "formatted_task_instructions" + ).format(output_format=schema) elif task.output_pydantic: schema = generate_model_description(task.output_pydantic) - - task_prompt += "\n" + self.i18n.slice("formatted_task_instructions").format( - output_format=schema - ) + task_prompt += "\n" + self.i18n.slice( + "formatted_task_instructions" + ).format(output_format=schema) if context: task_prompt = self.i18n.slice("task_with_context").format( @@ -331,14 +325,14 @@ class Agent(BaseAgent): tools = agent_tools.tools() return tools - def get_multimodal_tools(self) -> List[Tool]: + def get_multimodal_tools(self) -> Sequence[BaseTool]: from crewai.tools.agent_tools.add_image_tool import AddImageTool return [AddImageTool()] def get_code_execution_tools(self): try: - from crewai_tools import CodeInterpreterTool + from crewai_tools import CodeInterpreterTool # type: ignore # Set the unsafe_mode based on the code_execution_mode attribute unsafe_mode = self.code_execution_mode == "unsafe" diff --git a/src/crewai/agents/agent_builder/base_agent.py b/src/crewai/agents/agent_builder/base_agent.py index a8c829a2a..e602e42a9 100644 --- a/src/crewai/agents/agent_builder/base_agent.py +++ b/src/crewai/agents/agent_builder/base_agent.py @@ -24,6 +24,7 @@ from crewai.tools import BaseTool from crewai.tools.base_tool import Tool from crewai.utilities import I18N, Logger, RPMController from crewai.utilities.config import process_config +from crewai.utilities.converter import Converter T = TypeVar("T", bound="BaseAgent") @@ -42,7 +43,7 @@ class BaseAgent(ABC, BaseModel): max_rpm (Optional[int]): Maximum number of requests per minute for the agent execution. allow_delegation (bool): Allow delegation of tasks to agents. tools (Optional[List[Any]]): Tools at the agent's disposal. - max_iter (Optional[int]): Maximum iterations for an agent to execute a task. + max_iter (int): Maximum iterations for an agent to execute a task. agent_executor (InstanceOf): An instance of the CrewAgentExecutor class. llm (Any): Language model that will run the agent. crew (Any): Crew to which the agent belongs. 
@@ -114,7 +115,7 @@ class BaseAgent(ABC, BaseModel): tools: Optional[List[Any]] = Field( default_factory=list, description="Tools at agents' disposal" ) - max_iter: Optional[int] = Field( + max_iter: int = Field( default=25, description="Maximum iterations for an agent to execute a task" ) agent_executor: InstanceOf = Field( @@ -125,11 +126,12 @@ class BaseAgent(ABC, BaseModel): ) crew: Any = Field(default=None, description="Crew to which the agent belongs.") i18n: I18N = Field(default=I18N(), description="Internationalization settings.") - cache_handler: InstanceOf[CacheHandler] = Field( + cache_handler: Optional[InstanceOf[CacheHandler]] = Field( default=None, description="An instance of the CacheHandler class." ) tools_handler: InstanceOf[ToolsHandler] = Field( - default=None, description="An instance of the ToolsHandler class." + default_factory=ToolsHandler, + description="An instance of the ToolsHandler class.", ) max_tokens: Optional[int] = Field( default=None, description="Maximum number of tokens for the agent's execution." @@ -254,7 +256,7 @@ class BaseAgent(ABC, BaseModel): @abstractmethod def get_output_converter( self, llm: Any, text: str, model: type[BaseModel] | None, instructions: str - ): + ) -> Converter: """Get the converter class for the agent to create json/pydantic outputs.""" pass diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index b744ba6ad..382a792e5 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -600,7 +600,7 @@ class Flow(Generic[T], metaclass=FlowMeta): ``` """ try: - if not hasattr(self, '_state'): + if not hasattr(self, "_state"): return "" if isinstance(self._state, dict): @@ -706,26 +706,31 @@ class Flow(Generic[T], metaclass=FlowMeta): inputs: Optional dictionary containing input values and potentially a state ID to restore """ # Handle state restoration if ID is provided in inputs - if inputs and 'id' in inputs and self._persistence is not None: - restore_uuid = inputs['id'] + if inputs and "id" in inputs and self._persistence is not None: + restore_uuid = inputs["id"] stored_state = self._persistence.load_state(restore_uuid) # Override the id in the state if it exists in inputs - if 'id' in inputs: + if "id" in inputs: if isinstance(self._state, dict): - self._state['id'] = inputs['id'] + self._state["id"] = inputs["id"] elif isinstance(self._state, BaseModel): - setattr(self._state, 'id', inputs['id']) + setattr(self._state, "id", inputs["id"]) if stored_state: - self._log_flow_event(f"Loading flow state from memory for UUID: {restore_uuid}", color="yellow") + self._log_flow_event( + f"Loading flow state from memory for UUID: {restore_uuid}", + color="yellow", + ) # Restore the state self._restore_state(stored_state) else: - self._log_flow_event(f"No flow state found for UUID: {restore_uuid}", color="red") + self._log_flow_event( + f"No flow state found for UUID: {restore_uuid}", color="red" + ) # Apply any additional inputs after restoration - filtered_inputs = {k: v for k, v in inputs.items() if k != 'id'} + filtered_inputs = {k: v for k, v in inputs.items() if k != "id"} if filtered_inputs: self._initialize_state(filtered_inputs) @@ -737,9 +742,11 @@ class Flow(Generic[T], metaclass=FlowMeta): flow_name=self.__class__.__name__, ), ) - self._log_flow_event(f"Flow started with ID: {self.flow_id}", color="bold_magenta") + self._log_flow_event( + f"Flow started with ID: {self.flow_id}", color="bold_magenta" + ) - if inputs is not None and 'id' not in inputs: + if inputs is not None and "id" not in inputs: 
self._initialize_state(inputs) return asyncio.run(self.kickoff_async()) @@ -984,7 +991,9 @@ class Flow(Generic[T], metaclass=FlowMeta): traceback.print_exc() - def _log_flow_event(self, message: str, color: str = "yellow", level: str = "info") -> None: + def _log_flow_event( + self, message: str, color: str = "yellow", level: str = "info" + ) -> None: """Centralized logging method for flow events. This method provides a consistent interface for logging flow-related events, diff --git a/src/crewai/llm.py b/src/crewai/llm.py index d8f2be230..d6be4b588 100644 --- a/src/crewai/llm.py +++ b/src/crewai/llm.py @@ -221,6 +221,13 @@ class LLM: if isinstance(messages, str): messages = [{"role": "user", "content": messages}] + # For O1 models, system messages are not supported. + # Convert any system messages into assistant messages. + if "o1" in self.model.lower(): + for message in messages: + if message.get("role") == "system": + message["role"] = "assistant" + with suppress_warnings(): if callbacks and len(callbacks) > 0: self.set_callbacks(callbacks) diff --git a/src/crewai/memory/entity/entity_memory.py b/src/crewai/memory/entity/entity_memory.py index 67c72e927..536da72e4 100644 --- a/src/crewai/memory/entity/entity_memory.py +++ b/src/crewai/memory/entity/entity_memory.py @@ -1,3 +1,7 @@ +from typing import Any, Optional + +from pydantic import PrivateAttr + from crewai.memory.entity.entity_memory_item import EntityMemoryItem from crewai.memory.memory import Memory from crewai.memory.storage.rag_storage import RAGStorage @@ -10,13 +14,15 @@ class EntityMemory(Memory): Inherits from the Memory class. """ - def __init__(self, crew=None, embedder_config=None, storage=None, path=None): - if hasattr(crew, "memory_config") and crew.memory_config is not None: - self.memory_provider = crew.memory_config.get("provider") - else: - self.memory_provider = None + _memory_provider: Optional[str] = PrivateAttr() - if self.memory_provider == "mem0": + def __init__(self, crew=None, embedder_config=None, storage=None, path=None): + if crew and hasattr(crew, "memory_config") and crew.memory_config is not None: + memory_provider = crew.memory_config.get("provider") + else: + memory_provider = None + + if memory_provider == "mem0": try: from crewai.memory.storage.mem0_storage import Mem0Storage except ImportError: @@ -36,11 +42,13 @@ class EntityMemory(Memory): path=path, ) ) - super().__init__(storage) + + super().__init__(storage=storage) + self._memory_provider = memory_provider def save(self, item: EntityMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory" """Saves an entity item into the SQLite storage.""" - if self.memory_provider == "mem0": + if self._memory_provider == "mem0": data = f""" Remember details about the following entity: Name: {item.name} diff --git a/src/crewai/memory/long_term/long_term_memory.py b/src/crewai/memory/long_term/long_term_memory.py index 656709ac9..94aac3a97 100644 --- a/src/crewai/memory/long_term/long_term_memory.py +++ b/src/crewai/memory/long_term/long_term_memory.py @@ -17,7 +17,7 @@ class LongTermMemory(Memory): def __init__(self, storage=None, path=None): if not storage: storage = LTMSQLiteStorage(db_path=path) if path else LTMSQLiteStorage() - super().__init__(storage) + super().__init__(storage=storage) def save(self, item: LongTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory" metadata = item.metadata diff --git a/src/crewai/memory/memory.py 
b/src/crewai/memory/memory.py index 46af2c04d..51a700323 100644 --- a/src/crewai/memory/memory.py +++ b/src/crewai/memory/memory.py @@ -1,15 +1,19 @@ -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union + +from pydantic import BaseModel from crewai.memory.storage.rag_storage import RAGStorage -class Memory: +class Memory(BaseModel): """ Base class for memory, now supporting agent tags and generic metadata. """ - def __init__(self, storage: RAGStorage): - self.storage = storage + storage: Any + + def __init__(self, storage: Any, **data: Any): + super().__init__(storage=storage, **data) def save( self, diff --git a/src/crewai/memory/short_term/short_term_memory.py b/src/crewai/memory/short_term/short_term_memory.py index 4e5fbbb77..b7581f400 100644 --- a/src/crewai/memory/short_term/short_term_memory.py +++ b/src/crewai/memory/short_term/short_term_memory.py @@ -1,5 +1,7 @@ from typing import Any, Dict, Optional +from pydantic import PrivateAttr + from crewai.memory.memory import Memory from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem from crewai.memory.storage.rag_storage import RAGStorage @@ -14,13 +16,15 @@ class ShortTermMemory(Memory): MemoryItem instances. """ - def __init__(self, crew=None, embedder_config=None, storage=None, path=None): - if hasattr(crew, "memory_config") and crew.memory_config is not None: - self.memory_provider = crew.memory_config.get("provider") - else: - self.memory_provider = None + _memory_provider: Optional[str] = PrivateAttr() - if self.memory_provider == "mem0": + def __init__(self, crew=None, embedder_config=None, storage=None, path=None): + if crew and hasattr(crew, "memory_config") and crew.memory_config is not None: + memory_provider = crew.memory_config.get("provider") + else: + memory_provider = None + + if memory_provider == "mem0": try: from crewai.memory.storage.mem0_storage import Mem0Storage except ImportError: @@ -39,7 +43,8 @@ class ShortTermMemory(Memory): path=path, ) ) - super().__init__(storage) + super().__init__(storage=storage) + self._memory_provider = memory_provider def save( self, @@ -48,7 +53,7 @@ class ShortTermMemory(Memory): agent: Optional[str] = None, ) -> None: item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent) - if self.memory_provider == "mem0": + if self._memory_provider == "mem0": item.data = f"Remember the following insights from Agent run: {item.data}" super().save(value=item.data, metadata=item.metadata, agent=item.agent) diff --git a/src/crewai/tools/agent_tools/add_image_tool.py b/src/crewai/tools/agent_tools/add_image_tool.py index 06bdfcf5b..939dff2df 100644 --- a/src/crewai/tools/agent_tools/add_image_tool.py +++ b/src/crewai/tools/agent_tools/add_image_tool.py @@ -7,11 +7,11 @@ from crewai.utilities import I18N i18n = I18N() + class AddImageToolSchema(BaseModel): image_url: str = Field(..., description="The URL or path of the image to add") action: Optional[str] = Field( - default=None, - description="Optional context or question about the image" + default=None, description="Optional context or question about the image" ) @@ -36,10 +36,7 @@ class AddImageTool(BaseTool): "image_url": { "url": image_url, }, - } + }, ] - return { - "role": "user", - "content": content - } + return {"role": "user", "content": content} diff --git a/src/crewai/translations/en.json b/src/crewai/translations/en.json index 0c45321ea..f09f1dba0 100644 --- a/src/crewai/translations/en.json +++ b/src/crewai/translations/en.json @@ -15,7 +15,7 @@ 
"final_answer_format": "If you don't need to use any more tools, you must give your best complete final answer, make sure it satisfies the expected criteria, use the EXACT format below:\n\n```\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task.\n\n```", "format_without_tools": "\nSorry, I didn't use the right format. I MUST either use a tool (among the available ones), OR give my best final answer.\nHere is the expected format I must follow:\n\n```\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n```\n This Thought/Action/Action Input/Result process can repeat N times. Once I know the final answer, I must return the following format:\n\n```\nThought: I now can give a great answer\nFinal Answer: Your final answer must be the great and the most complete as possible, it must be outcome described\n\n```", "task_with_context": "{task}\n\nThis is the context you're working with:\n{context}", - "expected_output": "\nThis is the expect criteria for your final answer: {expected_output}\nyou MUST return the actual complete content as the final answer, not a summary.", + "expected_output": "\nThis is the expected criteria for your final answer: {expected_output}\nyou MUST return the actual complete content as the final answer, not a summary.", "human_feedback": "You got human feedback on your work, re-evaluate it and give a new Final Answer when ready.\n {human_feedback}", "getting_input": "This is the agent's final answer: {final_answer}\n\n", "summarizer_system_message": "You are a helpful assistant that summarizes text.", diff --git a/tests/agent_test.py b/tests/agent_test.py index b0efef82b..e67a7454a 100644 --- a/tests/agent_test.py +++ b/tests/agent_test.py @@ -1183,7 +1183,7 @@ def test_agent_max_retry_limit(): [ mock.call( { - "input": "Say the word: Hi\n\nThis is the expect criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.", + "input": "Say the word: Hi\n\nThis is the expected criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.", "tool_names": "", "tools": "", "ask_for_human_input": True, @@ -1191,7 +1191,7 @@ def test_agent_max_retry_limit(): ), mock.call( { - "input": "Say the word: Hi\n\nThis is the expect criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.", + "input": "Say the word: Hi\n\nThis is the expected criteria for your final answer: The word: Hi\nyou MUST return the actual complete content as the final answer, not a summary.", "tool_names": "", "tools": "", "ask_for_human_input": True,