Merge branch 'main' into feature/procedure_v2

This commit is contained in:
Brandon Hancock
2024-07-22 09:55:03 -04:00
30 changed files with 1275 additions and 405 deletions

View File

@@ -9,6 +9,7 @@ from crewai.memory.storage.kickoff_task_outputs_storage import (
from .create_crew import create_crew
from .train_crew import train_crew
from .replay_from_task import replay_task_command
from .reset_memories_command import reset_memories_command
@click.group()
@@ -99,5 +100,31 @@ def log_tasks_outputs() -> None:
click.echo(f"An error occurred while logging task outputs: {e}", err=True)
@crewai.command()
@click.option("-l", "--long", is_flag=True, help="Reset LONG TERM memory")
@click.option("-s", "--short", is_flag=True, help="Reset SHORT TERM memory")
@click.option("-e", "--entities", is_flag=True, help="Reset ENTITIES memory")
@click.option(
"-k",
"--kickoff-outputs",
is_flag=True,
help="Reset LATEST KICKOFF TASK OUTPUTS",
)
@click.option("-a", "--all", is_flag=True, help="Reset ALL memories")
def reset_memories(long, short, entities, kickoff_outputs, all):
"""
Reset the crew memories (long, short, entity, latest_crew_kickoff_ouputs). This will delete all the data saved.
"""
try:
if not all and not (long or short or entities or kickoff_outputs):
click.echo(
"Please specify at least one memory type to reset using the appropriate flags."
)
return
reset_memories_command(long, short, entities, kickoff_outputs, all)
except Exception as e:
click.echo(f"An error occurred while resetting memories: {e}", err=True)
if __name__ == "__main__":
crewai()

View File

@@ -0,0 +1,45 @@
import subprocess
import click
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None:
"""
Replay the crew execution from a specific task.
Args:
task_id (str): The ID of the task to replay from.
"""
try:
if all:
ShortTermMemory().reset()
EntityMemory().reset()
LongTermMemory().reset()
TaskOutputStorageHandler().reset()
click.echo("All memories have been reset.")
else:
if long:
LongTermMemory().reset()
click.echo("Long term memory has been reset.")
if short:
ShortTermMemory().reset()
click.echo("Short term memory has been reset.")
if entity:
EntityMemory().reset()
click.echo("Entity memory has been reset.")
if kickoff_outputs:
TaskOutputStorageHandler().reset()
click.echo("Latest Kickoff outputs stored has been reset.")
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while resetting the memories: {e}", err=True)
click.echo(e.output, err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)

View File

@@ -2,9 +2,15 @@
import sys
from {{folder_name}}.crew import {{crew_name}}Crew
# This main file is intended to be a way for your to run your
# crew locally, so refrain from adding necessary logic into this file.
# Replace with inputs you want to test with, it will automatically
# interpolate any tasks and agents information
def run():
# Replace with your inputs, it will automatically interpolate any tasks and agents information
"""
Run the crew.
"""
inputs = {
'topic': 'AI LLMs'
}
@@ -15,19 +21,21 @@ def train():
"""
Train the crew for a given number of iterations.
"""
inputs = {"topic": "AI LLMs"}
inputs = {
"topic": "AI LLMs"
}
try:
{{crew_name}}Crew().crew().train(n_iterations=int(sys.argv[1]), inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while training the crew: {e}")
def replay_from_task():
def replay():
"""
Replay the crew execution from a specific task.
"""
try:
{{crew_name}}Crew().crew().replay_from_task(task_id=sys.argv[1])
{{crew_name}}Crew().crew().replay(task_id=sys.argv[1])
except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")

View File

@@ -6,12 +6,12 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.35.8" }
crewai = { extras = ["tools"], version = "^0.41.1" }
[tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:run"
train = "{{folder_name}}.main:train"
replay = "{{folder_name}}.main:replay_from_task"
replay = "{{folder_name}}.main:replay"
[build-system]
requires = ["poetry-core"]

View File

@@ -39,6 +39,7 @@ from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
)
from crewai.utilities.planning_handler import CrewPlanner
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -73,6 +74,7 @@ class Crew(BaseModel):
task_callback: Callback to be executed after each task for every agents execution.
step_callback: Callback to be executed after each step for every agents execution.
share_crew: Whether you want to share the complete crew information and execution with crewAI to make the library better, and allow us to train models.
planning: Plan the crew execution and add the plan to the crew.
"""
__hash__ = object.__hash__ # type: ignore
@@ -149,6 +151,10 @@ class Crew(BaseModel):
default="",
description="output_log_file",
)
planning: Optional[bool] = Field(
default=False,
description="Plan the crew execution and add the plan to the crew.",
)
task_execution_output_json_files: Optional[List[str]] = Field(
default=None,
description="List of file paths for task execution JSON files.",
@@ -454,6 +460,9 @@ class Crew(BaseModel):
agent.create_agent_executor()
if self.planning:
self._handle_crew_planning()
metrics = []
if self.process == Process.sequential:
@@ -548,6 +557,19 @@ class Crew(BaseModel):
self._task_output_handler.reset()
return results
def _handle_crew_planning(self):
"""Handles the Crew planning."""
self._logger.log("info", "Planning the crew execution")
result = CrewPlanner(self.tasks)._handle_crew_planning()
if result is not None and hasattr(result, "list_of_plans_per_task"):
for task, step_plan in zip(self.tasks, result.list_of_plans_per_task):
task.description += step_plan
else:
self._logger.log(
"info", "Something went wrong with the planning process of the Crew"
)
def _store_execution_log(
self,
task: Task,
@@ -656,7 +678,6 @@ class Crew(BaseModel):
context = self._get_context(
task, [last_sync_output] if last_sync_output else []
)
self._log_task_start(task, agent_to_use.role)
future = task.execute_async(
agent=agent_to_use,
context=context,
@@ -669,7 +690,6 @@ class Crew(BaseModel):
futures.clear()
context = self._get_context(task, task_outputs)
self._log_task_start(task, agent_to_use.role)
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
@@ -824,7 +844,7 @@ class Crew(BaseModel):
None,
)
def replay_from_task(
def replay(
self, task_id: str, inputs: Optional[Dict[str, Any]] = None
) -> CrewOutput:
stored_outputs = self._task_output_handler.load()

View File

@@ -23,3 +23,9 @@ class EntityMemory(Memory):
"""Saves an entity item into the SQLite storage."""
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
def reset(self) -> None:
try:
self.storage.reset()
except Exception as e:
raise Exception(f"An error occurred while resetting the entity memory: {e}")

View File

@@ -30,3 +30,6 @@ class LongTermMemory(Memory):
def search(self, task: str, latest_n: int = 3) -> Dict[str, Any]:
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
def reset(self) -> None:
self.storage.reset()

View File

@@ -18,8 +18,16 @@ class ShortTermMemory(Memory):
)
super().__init__(storage)
def save(self, item: ShortTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
def save(self, item: ShortTermMemoryItem) -> None:
super().save(item.data, item.metadata, item.agent)
def search(self, query: str, score_threshold: float = 0.35):
return self.storage.search(query=query, score_threshold=score_threshold) # type: ignore # BUG? The reference is to the parent class, but the parent class does not have this parameters
def reset(self) -> None:
try:
self.storage.reset()
except Exception as e:
raise Exception(
f"An error occurred while resetting the short-term memory: {e}"
)

View File

@@ -9,3 +9,6 @@ class Storage:
def search(self, key: str) -> Dict[str, Any]: # type: ignore
pass
def reset(self) -> None:
pass

View File

@@ -103,3 +103,20 @@ class LTMSQLiteStorage:
color="red",
)
return None
def reset(
self,
) -> None:
"""Resets the LTM table with error handling."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM long_term_memories")
conn.commit()
except sqlite3.Error as e:
self._printer.print(
content=f"MEMORY ERROR: An error occurred while deleting all rows in LTM: {e}",
color="red",
)
return None

View File

@@ -2,6 +2,7 @@ import contextlib
import io
import logging
import os
import shutil
from typing import Any, Dict, List, Optional
from embedchain import App
@@ -71,13 +72,13 @@ class RAGStorage(Storage):
if embedder_config:
config["embedder"] = embedder_config
self.type = type
self.app = App.from_config(config=config)
self.app.llm = FakeLLM()
if allow_reset:
self.app.reset()
def save(self, value: Any, metadata: Dict[str, Any]) -> None: # type: ignore # BUG?: Should be save(key, value, metadata) Signature of "save" incompatible with supertype "Storage"
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
self._generate_embedding(value, metadata)
def search( # type: ignore # BUG?: Signature of "search" incompatible with supertype "Storage"
@@ -102,3 +103,11 @@ class RAGStorage(Storage):
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> Any:
with suppress_logging():
self.app.add(text, data_type="text", metadata=metadata)
def reset(self) -> None:
try:
shutil.rmtree(f"{db_storage_path()}/{self.type}")
except Exception as e:
raise Exception(
f"An error occurred while resetting the {self.type} memory: {e}"
)

View File

@@ -213,8 +213,8 @@ class Task(BaseModel):
tools: Optional[List[Any]],
) -> TaskOutput:
"""Run the core execution logic of the task."""
self.agent = agent
agent = agent or self.agent
self.agent = agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
@@ -254,7 +254,9 @@ class Task(BaseModel):
content = (
json_output
if json_output
else pydantic_output.model_dump_json() if pydantic_output else result
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
self._save_file(content)
@@ -326,9 +328,14 @@ class Task(BaseModel):
def _create_converter(self, *args, **kwargs) -> Converter:
"""Create a converter instance."""
converter = self.agent.get_output_converter(*args, **kwargs)
if self.converter_cls:
if self.agent and not self.converter_cls:
converter = self.agent.get_output_converter(*args, **kwargs)
elif self.converter_cls:
converter = self.converter_cls(*args, **kwargs)
if not converter:
raise Exception("No output converter found or set.")
return converter
def _export_output(

View File

@@ -0,0 +1,64 @@
from typing import List, Optional
from pydantic import BaseModel
from crewai.agent import Agent
from crewai.task import Task
class PlannerTaskPydanticOutput(BaseModel):
list_of_plans_per_task: List[str]
class CrewPlanner:
def __init__(self, tasks: List[Task]):
self.tasks = tasks
def _handle_crew_planning(self) -> Optional[BaseModel]:
"""Handles the Crew planning by creating detailed step-by-step plans for each task."""
planning_agent = self._create_planning_agent()
tasks_summary = self._create_tasks_summary()
planner_task = self._create_planner_task(planning_agent, tasks_summary)
return planner_task.execute_sync().pydantic
def _create_planning_agent(self) -> Agent:
"""Creates the planning agent for the crew planning."""
return Agent(
role="Task Execution Planner",
goal=(
"Your goal is to create an extremely detailed, step-by-step plan based on the tasks and tools "
"available to each agent so that they can perform the tasks in an exemplary manner"
),
backstory="Planner agent for crew planning",
)
def _create_planner_task(self, planning_agent: Agent, tasks_summary: str) -> Task:
"""Creates the planner task using the given agent and tasks summary."""
return Task(
description=(
f"Based on these tasks summary: {tasks_summary} \n Create the most descriptive plan based on the tasks "
"descriptions, tools available, and agents' goals for them to execute their goals with perfection."
),
expected_output="Step by step plan on how the agents can execute their tasks using the available tools with mastery",
agent=planning_agent,
output_pydantic=PlannerTaskPydanticOutput,
)
def _create_tasks_summary(self) -> str:
"""Creates a summary of all tasks."""
tasks_summary = []
for idx, task in enumerate(self.tasks):
tasks_summary.append(
f"""
Task Number {idx + 1} - {task.description}
"task_description": {task.description}
"task_expected_output": {task.expected_output}
"agent": {task.agent.role if task.agent else "None"}
"agent_goal": {task.agent.goal if task.agent else "None"}
"task_tools": {task.tools}
"agent_tools": {task.agent.tools if task.agent else "None"}
"""
)
return " ".join(tasks_summary)