Merge branch 'feature/procedure_v2' into brandon/cre-107-pipeline-conditional-routing

Brandon Hancock
2024-07-31 11:31:30 -04:00
21 changed files with 98 additions and 68 deletions

View File

@@ -3,7 +3,6 @@ from typing import TYPE_CHECKING, Optional
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities import I18N
@@ -39,18 +38,17 @@ class CrewAgentExecutorMixin:
and "Action: Delegate work to coworker" not in output.log
):
try:
memory = ShortTermMemoryItem(
data=output.log,
agent=self.crew_agent.role,
metadata={
"observation": self.task.description,
},
)
if (
hasattr(self.crew, "_short_term_memory")
and self.crew._short_term_memory
):
self.crew._short_term_memory.save(memory)
self.crew._short_term_memory.save(
value=output.log,
metadata={
"observation": self.task.description,
},
agent=self.crew_agent.role,
)
except Exception as e:
print(f"Failed to add to short term memory: {e}")
pass

View File

@@ -6,9 +6,9 @@ from crewai.memory.storage.kickoff_task_outputs_storage import (
)
from .create_crew import create_crew
from .evaluate_crew import evaluate_crew
from .replay_from_task import replay_task_command
from .reset_memories_command import reset_memories_command
from .test_crew import test_crew
from .train_crew import train_crew
@@ -144,7 +144,7 @@ def reset_memories(long, short, entities, kickoff_outputs, all):
def test(n_iterations: int, model: str):
"""Test the crew and evaluate the results."""
click.echo(f"Testing the crew for {n_iterations} iterations with model {model}")
test_crew(n_iterations, model)
evaluate_crew(n_iterations, model)
if __name__ == "__main__":

View File

@@ -1,13 +1,11 @@
import subprocess
import click
import pytest
pytest.skip(allow_module_level=True)
def test_crew(n_iterations: int, model: str) -> None:
def evaluate_crew(n_iterations: int, model: str) -> None:
"""
Test the crew by running a command in the Poetry environment.
Test and Evaluate the crew by running a command in the Poetry environment.
Args:
n_iterations (int): The number of iterations to test the crew.
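
For reference, a minimal sketch of driving the renamed helper directly from Python rather than through the `crewai test` CLI command; the `crewai.cli.evaluate_crew` import path and the model name are assumptions for illustration, not taken from this diff.

```python
# Hypothetical direct call; the module path is assumed (the CLI imports it
# as `from .evaluate_crew import evaluate_crew`), and the model name is
# only an example value.
from crewai.cli.evaluate_crew import evaluate_crew

# Runs the crew n_iterations times inside the project's Poetry environment
# and evaluates each run with the chosen model.
evaluate_crew(n_iterations=3, model="gpt-4o-mini")
```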

View File

@@ -6,7 +6,7 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.41.1" }
crewai = { extras = ["tools"], version = "^0.46.0" }
[tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:run"

View File

@@ -1,3 +1,4 @@
from typing import Any, Dict, Optional
from crewai.memory.memory import Memory
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.memory.storage.rag_storage import RAGStorage
@@ -18,7 +19,14 @@ class ShortTermMemory(Memory):
)
super().__init__(storage)
def save(self, item: ShortTermMemoryItem) -> None:
def save(
self,
value: Any,
metadata: Optional[Dict[str, Any]] = None,
agent: Optional[str] = None,
) -> None:
item = ShortTermMemoryItem(data=value, metadata=metadata, agent=agent)
super().save(value=item.data, metadata=item.metadata, agent=item.agent)
def search(self, query: str, score_threshold: float = 0.35):
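
`ShortTermMemory.save` now takes the raw pieces and builds the `ShortTermMemoryItem` itself, which is what lets the executor mixin above drop its own item construction. A hedged usage sketch of the new signature (module path inferred from the imports in this diff; the default `RAGStorage` wiring is assumed):

```python
from crewai.memory.short_term.short_term_memory import ShortTermMemory  # path assumed

memory = ShortTermMemory()  # assumes the default RAGStorage backend is available

# New calling convention: pass value/metadata/agent directly instead of a
# prebuilt ShortTermMemoryItem; the item is constructed inside save().
memory.save(
    value="Observed that the dataset has 3 missing columns.",
    metadata={"observation": "Analyze the quarterly sales dataset."},
    agent="Researcher",  # now optional, matching ShortTermMemoryItem below
)
```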

View File

@@ -3,7 +3,10 @@ from typing import Any, Dict, Optional
class ShortTermMemoryItem:
def __init__(
self, data: Any, agent: str, metadata: Optional[Dict[str, Any]] = None
self,
data: Any,
agent: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
):
self.data = data
self.agent = agent

View File

@@ -4,7 +4,7 @@ from typing import Any, Dict
class Storage:
"""Abstract base class defining the storage interface"""
def save(self, key: str, value: Any, metadata: Dict[str, Any]) -> None:
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
pass
def search(self, key: str) -> Dict[str, Any]: # type: ignore
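
With `key` removed from the abstract `save`, concrete backends receive only the value and its metadata. A minimal in-memory sketch of the new interface (illustrative only; the real `RAGStorage` does embedding-backed search, and the `Storage` import path is assumed):

```python
from typing import Any, Dict, List

from crewai.memory.storage.interface import Storage  # import path assumed


class InMemoryStorage(Storage):
    """Toy backend implementing the key-less save interface shown above."""

    def __init__(self) -> None:
        self._rows: List[Dict[str, Any]] = []

    def save(self, value: Any, metadata: Dict[str, Any]) -> None:
        # No key argument anymore: the backend decides how to index the value.
        self._rows.append({"value": value, "metadata": metadata})

    def search(self, key: str) -> Dict[str, Any]:
        # Naive substring match standing in for vector search.
        return next(
            (row for row in self._rows if key in str(row["value"])), {}
        )
```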

View File

@@ -54,7 +54,7 @@ Pipeline Terminology:
- Trace: The journey of an individual input through the entire pipeline.
Example pipeline structure:
crew1 >> crew2 >> crew3
crew1 >> crew2 >> crew3
This represents a pipeline with three sequential stages:
1. crew1 is the first stage, which processes the input and passes its output to crew2.

View File

@@ -1,3 +1,4 @@
import datetime
import json
import os
import threading
@@ -108,6 +109,7 @@ class Task(BaseModel):
_original_description: str | None = None
_original_expected_output: str | None = None
_thread: threading.Thread | None = None
_execution_time: float | None = None
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
@@ -121,6 +123,12 @@ class Task(BaseModel):
"may_not_set_field", "This field is not to be set by the user.", {}
)
def _set_start_execution_time(self) -> float:
return datetime.datetime.now().timestamp()
def _set_end_execution_time(self, start_time: float) -> None:
self._execution_time = datetime.datetime.now().timestamp() - start_time
@field_validator("output_file")
@classmethod
def output_file_validation(cls, value: str) -> str:
@@ -217,6 +225,7 @@ class Task(BaseModel):
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
start_time = self._set_start_execution_time()
self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self)
self.prompt_context = context
@@ -240,6 +249,7 @@ class Task(BaseModel):
)
self.output = task_output
self._set_end_execution_time(start_time)
if self.callback:
self.callback(self.output)
@@ -251,7 +261,9 @@ class Task(BaseModel):
content = (
json_output
if json_output
else pydantic_output.model_dump_json() if pydantic_output else result
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
self._save_file(content)
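
The task now measures wall-clock execution time by taking a timestamp before execution and storing the delta in `_execution_time` once the output is set; the evaluator below consumes that field. The same pattern in isolation (plain `datetime`, no crewai imports):

```python
import datetime
import time


def _set_start_execution_time() -> float:
    return datetime.datetime.now().timestamp()


def _set_end_execution_time(start_time: float) -> float:
    # Mirrors Task._set_end_execution_time, returning the elapsed seconds
    # instead of assigning them to self._execution_time.
    return datetime.datetime.now().timestamp() - start_time


start = _set_start_execution_time()
time.sleep(0.1)  # stand-in for the actual task execution
elapsed = _set_end_execution_time(start)
print(f"execution time: {elapsed:.2f}s")
```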

View File

@@ -40,7 +40,7 @@ class Telemetry:
- Roles of agents in a crew
- Tools names available
Users can opt-in to sharing more complete data suing the `share_crew`
Users can opt-in to sharing more complete data using the `share_crew`
attribute in the Crew class.
"""

View File

@@ -28,6 +28,7 @@ class CrewEvaluator:
"""
tasks_scores: defaultdict = defaultdict(list)
run_execution_times: defaultdict = defaultdict(list)
iteration: int = 0
def __init__(self, crew, openai_model_name: str):
@@ -40,9 +41,6 @@ class CrewEvaluator:
for task in self.crew.tasks:
task.callback = self.evaluate
def set_iteration(self, iteration: int) -> None:
self.iteration = iteration
def _evaluator_agent(self):
return Agent(
role="Task Execution Evaluator",
@@ -71,6 +69,9 @@ class CrewEvaluator:
output_pydantic=TaskEvaluationPydanticOutput,
)
def set_iteration(self, iteration: int) -> None:
self.iteration = iteration
def print_crew_evaluation_result(self) -> None:
"""
Prints the evaluation result of the crew in a table.
@@ -119,6 +120,16 @@ class CrewEvaluator:
]
table.add_row("Crew", *map(str, crew_scores), f"{crew_average:.1f}")
run_exec_times = [
int(sum(tasks_exec_times))
for _, tasks_exec_times in self.run_execution_times.items()
]
execution_time_avg = int(sum(run_exec_times) / len(run_exec_times))
table.add_row(
"Execution Time (s)",
*map(str, run_exec_times),
f"{execution_time_avg}",
)
# Display the table in the terminal
console = Console()
console.print(table)
@@ -145,5 +156,8 @@ class CrewEvaluator:
if isinstance(evaluation_result.pydantic, TaskEvaluationPydanticOutput):
self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
self.run_execution_times[self.iteration].append(
current_task._execution_time
)
else:
raise ValueError("Evaluation result is not in the expected format")
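
The new "Execution Time (s)" row sums each run's per-task times (collected in `run_execution_times`, keyed by iteration) and averages those sums across runs. The aggregation on hand-written numbers, matching the list comprehension above:

```python
from collections import defaultdict

# Illustrative data: iteration -> per-task execution times in seconds.
run_execution_times: defaultdict = defaultdict(list)
run_execution_times[1] = [12.3, 7.9]   # run 1: two tasks
run_execution_times[2] = [10.4, 8.8]   # run 2: two tasks

# One column per run: the summed task time for that run, truncated to int.
run_exec_times = [
    int(sum(tasks_exec_times))
    for _, tasks_exec_times in run_execution_times.items()
]
execution_time_avg = int(sum(run_exec_times) / len(run_exec_times))

print(run_exec_times, execution_time_avg)  # [20, 19] 19
```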