Compare commits

..

13 Commits

Author SHA1 Message Date
Lorenze Jay
a567b54a8c added how to doc 2024-07-15 11:31:05 -07:00
Lorenze Jay
e802a2b866 fixed printing colors 2024-07-15 09:29:11 -07:00
Lorenze Jay
7178f8b765 separating ltm and replay db 2024-07-15 09:16:55 -07:00
Lorenze Jay
a0e59b7285 fixed docs and removed unneeded comments 2024-07-15 09:13:59 -07:00
Lorenze Jay
79912a87df fixed .db 2024-07-15 09:04:53 -07:00
Lorenze Jay
6bb909d77a added to docs and fixed tests 2024-07-15 08:24:34 -07:00
Lorenze Jay
d28ae857b7 tools fix 2024-07-15 08:06:28 -07:00
Lorenze Jay
f27c8e728d Merge branch 'main' of github.com:joaomdmoura/crewAI into replay-feat-using-db 2024-07-15 08:02:14 -07:00
Gui Vieira
b93632a53a [DO NOT MERGE] Provide inputs on crew creation (#898)
* Provide inputs on crew creation

* Better naming

* Add crew id and task index to tasks

* Fix type again
2024-07-15 09:00:02 -03:00
Eduardo Chiarotti
09938641cd feat: add max retry limit to agent execution (#899)
* feat: add max retry limit to agent execution

* feat: add test to max retry limit feature

* feat: add code execution docstring

---------

Co-authored-by: João Moura <joaomdmoura@gmail.com>
2024-07-15 08:58:50 -03:00
Brandon Hancock (bhancock_ai)
7acf0b2107 Feature/use converter instead of manually trimming (#894)
* Exploring output being passed to tool selector to see if we can better format data

* WIP. Adding JSON repair functionality

* Almost done implementing JSON repair. Testing fixes vs current base case.

* More action cleanup with additional tests

* WIP. Trying to figure out what is going on with tool descriptions

* Update tool description generation

* WIP. Trying to find out what is causing the tools to duplicate

* Replacing tools properly instead of duplicating them accidentally

* Fixing issues for MR

* Update dependencies for JSON_REPAIR

* More cleaning up pull request

* preppering for call

* Fix type-checking issues

---------

Co-authored-by: João Moura <joaomdmoura@gmail.com>
2024-07-15 08:53:41 -03:00
OP (oppenheimer)
4eb4073661 Add Groq - OpenAI Compatible API - details (#934) 2024-07-14 16:11:54 -03:00
Lorenze Jay
9eefa312ae using sqllite instead of .json file for logging previous task_outputs 2024-07-13 18:06:38 -07:00
28 changed files with 1001 additions and 264 deletions

View File

@@ -155,12 +155,15 @@ for async_result in async_results:
print(async_result)
```
These methods provide flexibility in how you manage and execute tasks within your crew, allowing for both synchronous and asynchronous workflows tailored to your needs
### Replaying from a specific task:
You can now replay from a specific task using our CLI command `replay`.
The replay_from_tasks feature in CrewAI allows you to replay from a specific task using the command-line interface (CLI). By running the command `crewai replay -t <task_id>`, you can specify the task name for the replay process.
The replay_from_tasks feature in CrewAI allows you to replay from a specific task using the command-line interface (CLI). By running the command `crewai replay -t <task_id>`, you can specify the `task_id` for the replay process.
Kickoffs will now create a `crew_tasks_ouput.json` file with the output of the tasks which you use to retrieve the task id to replay.
Kickoffs will now save the latest kickoff's returned task outputs locally so you can replay from them.
### Replaying from a Specific Task Using the CLI
@@ -170,8 +173,15 @@ To use the replay feature, follow these steps:
2. Navigate to the directory where your CrewAI project is located.
3. Run the following command:
To view the latest kickoff task_ids, use:
```shell
crewai log-tasks-outputs
```
```shell
crewai replay -t <task_id>
```
These methods provide flexibility in how you manage and execute tasks within your crew, allowing for both synchronous and asynchronous workflows tailored to your needs
These commands let you replay from your latest kickoff tasks, still retaining context from previously executed tasks.

View File

@@ -127,7 +127,7 @@ llm = HuggingFaceHub(
```
## OpenAI Compatible API Endpoints
Switch between APIs and models seamlessly using environment variables, supporting platforms like FastChat, LM Studio, and Mistral AI.
Switch between APIs and models seamlessly using environment variables, supporting platforms like FastChat, LM Studio, Groq, and Mistral AI.
### Configuration Examples
#### FastChat
@@ -144,6 +144,13 @@ OPENAI_API_BASE="http://localhost:1234/v1"
OPENAI_API_KEY="lm-studio"
```
#### Groq API
```sh
OPENAI_API_KEY=your-groq-api-key
OPENAI_MODEL_NAME='llama3-8b-8192'
OPENAI_API_BASE=https://api.groq.com/openai/v1
```
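For illustration, a minimal sketch (the agent fields below are made up) of applying the same variables from Python before constructing an agent:
```python
import os
from crewai import Agent

# Point the OpenAI-compatible client at Groq using the variables documented above.
os.environ["OPENAI_API_BASE"] = "https://api.groq.com/openai/v1"
os.environ["OPENAI_MODEL_NAME"] = "llama3-8b-8192"
os.environ["OPENAI_API_KEY"] = "your-groq-api-key"

# Agents created afterwards default to the configured endpoint and model.
researcher = Agent(
    role="Researcher",
    goal="Summarize recent open-model releases",
    backstory="An analyst who tracks LLM announcements.",
)
```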
#### Mistral API
```sh
OPENAI_API_KEY=your-mistral-api-key
@@ -211,4 +218,4 @@ azure_agent = Agent(
```
## Conclusion
Integrating CrewAI with different LLMs expands the framework's versatility, allowing for customized, efficient AI solutions across various domains and platforms.
Integrating CrewAI with different LLMs expands the framework's versatility, allowing for customized, efficient AI solutions across various domains and platforms.

View File

@@ -0,0 +1,49 @@
---
title: Replay Tasks from Latest Crew Kickoff
description: Replay tasks from the latest crew.kickoff(...)
---
## Introduction
CrewAI provides the ability to replay from a specific task in the latest crew kickoff. This feature is particularly useful when you've finished a kickoff and want to retry certain tasks without refetching data: your agents already have the context saved from that kickoff execution, so you only need to replay the tasks you care about.
## Note:
You must run `crew.kickoff()` before you can replay a task. Currently, only the latest kickoff is supported, so if you use `kickoff_for_each`, it will only allow you to replay from the most recent crew run.
Here's an example of how to replay from a task:
### Replaying from a Specific Task Using the CLI
To use the replay feature, follow these steps:
1. Open your terminal or command prompt.
2. Navigate to the directory where your CrewAI project is located.
3. Run the following command:
To view the latest kickoff task_ids, use:
```shell
crewai log-tasks-outputs
```
Once you have the task_id you want to replay from, use:
```shell
crewai replay -t <task_id>
```
### Replaying from a Task Programmatically
To replay from a task programmatically, use the following steps:
1. Specify the task_id and input parameters for the replay process.
2. Execute the replay command within a try-except block to handle potential errors.
```python
def replay_from_task():
    """
    Replay the crew execution from a specific task.
    """
    task_id = '<task_id>'
    inputs = {"topic": "CrewAI Training"}  # optional; pass the inputs to replay with, otherwise the previous kickoff's inputs are used
    try:
        YourCrewName_Crew().crew().replay_from_task(task_id=task_id, inputs=inputs)
    except Exception as e:
        raise Exception(f"An error occurred while replaying the crew: {e}")
```
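A hedged sketch of the same call on a crew object you already hold (the agent and task values are placeholders); after a kickoff, `replay_from_task` can be invoked directly:
```python
from crewai import Agent, Crew, Task

writer = Agent(role="Writer", goal="Draft a short post", backstory="A concise technical writer.")
task = Task(description="Draft a post about CrewAI", expected_output="A short post", agent=writer)
crew = Crew(agents=[writer], tasks=[task])

crew.kickoff()  # must run at least once so the task outputs get stored

# Replay from the stored task; inputs are optional and default to the previous kickoff's inputs.
crew.replay_from_task(task_id=str(task.id))
```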

View File

@@ -113,6 +113,11 @@ Cutting-edge framework for orchestrating role-playing, autonomous AI agents. By
Kickoff a Crew for a List
</a>
</li>
<li>
<a href="./how-to/Replay-tasks-from-latest-Crew-Kickoff">
Replay from a Task
</a>
</li>
<li>
<a href="./how-to/AgentOps-Observability">
Agent Monitoring with AgentOps

View File

@@ -145,6 +145,7 @@ nav:
- Human Input on Execution: 'how-to/Human-Input-on-Execution.md'
- Kickoff a Crew Asynchronously: 'how-to/Kickoff-async.md'
- Kickoff a Crew for a List: 'how-to/Kickoff-for-each.md'
- Replay from a specific task from a kickoff: 'how-to/Replay-tasks-from-latest-Crew-Kickoff.md'
- Agent Monitoring with AgentOps: 'how-to/AgentOps-Observability.md'
- Agent Monitoring with LangTrace: 'how-to/Langtrace-Observability.md'
- Tools Docs:

23
poetry.lock generated
View File

@@ -2282,6 +2282,17 @@ files = [
{file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"},
]
[[package]]
name = "json-repair"
version = "0.25.2"
description = "A package to repair broken json strings"
optional = false
python-versions = ">=3.7"
files = [
{file = "json_repair-0.25.2-py3-none-any.whl", hash = "sha256:51d67295c3184b6c41a3572689661c6128cef6cfc9fb04db63130709adfc5bf0"},
{file = "json_repair-0.25.2.tar.gz", hash = "sha256:161a56d7e6bbfd4cad3a614087e3e0dbd0e10d402dd20dc7db418432428cb32b"},
]
[[package]]
name = "jsonpatch"
version = "1.33"
@@ -2395,8 +2406,8 @@ langchain-core = ">=0.2.10,<0.3.0"
langchain-text-splitters = ">=0.2.0,<0.3.0"
langsmith = ">=0.1.17,<0.2.0"
numpy = [
{version = ">=1.26.0,<2.0.0", markers = "python_version >= \"3.12\""},
{version = ">=1,<2", markers = "python_version < \"3.12\""},
{version = ">=1.26.0,<2.0.0", markers = "python_version >= \"3.12\""},
]
pydantic = ">=1,<3"
PyYAML = ">=5.3"
@@ -2437,8 +2448,8 @@ langchain = ">=0.2.6,<0.3.0"
langchain-core = ">=0.2.10,<0.3.0"
langsmith = ">=0.1.0,<0.2.0"
numpy = [
{version = ">=1.26.0,<2.0.0", markers = "python_version >= \"3.12\""},
{version = ">=1,<2", markers = "python_version < \"3.12\""},
{version = ">=1.26.0,<2.0.0", markers = "python_version >= \"3.12\""},
]
PyYAML = ">=5.3"
requests = ">=2,<3"
@@ -2461,8 +2472,8 @@ jsonpatch = ">=1.33,<2.0"
langsmith = ">=0.1.75,<0.2.0"
packaging = ">=23.2,<25"
pydantic = [
{version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""},
{version = ">=1,<3", markers = "python_full_version < \"3.12.4\""},
{version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""},
]
PyYAML = ">=5.3"
tenacity = ">=8.1.0,<8.4.0 || >8.4.0,<9.0.0"
@@ -2511,8 +2522,8 @@ files = [
[package.dependencies]
orjson = ">=3.9.14,<4.0.0"
pydantic = [
{version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""},
{version = ">=1,<3", markers = "python_full_version < \"3.12.4\""},
{version = ">=2.7.4,<3.0.0", markers = "python_full_version >= \"3.12.4\""},
]
requests = ">=2,<3"
@@ -3989,8 +4000,8 @@ files = [
annotated-types = ">=0.4.0"
pydantic-core = "2.20.1"
typing-extensions = [
{version = ">=4.12.2", markers = "python_version >= \"3.13\""},
{version = ">=4.6.1", markers = "python_version < \"3.13\""},
{version = ">=4.12.2", markers = "python_version >= \"3.13\""},
]
[package.extras]
@@ -6090,4 +6101,4 @@ tools = ["crewai-tools"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<=3.13"
content-hash = "0dbf6f6e2e841fb3eec4ff87ea5d6b430f29702118fee91307983c6b2581e59e"
content-hash = "2cf5a3904e7cbcfebb85e198b6035252d47213a9b0dd3dd51837516e03b38d3e"

View File

@@ -28,6 +28,7 @@ appdirs = "^1.4.4"
jsonref = "^1.1.0"
agentops = { version = "^0.1.9", optional = true }
embedchain = "^0.1.114"
json-repair = "^0.25.2"
[tool.poetry.extras]
tools = ["crewai-tools"]

View File

@@ -1,13 +1,14 @@
import os
from inspect import signature
from typing import Any, List, Optional, Tuple
from langchain.agents.agent import RunnableAgent
from langchain.agents.tools import BaseTool
from langchain.agents.tools import tool as LangChainTool
from langchain.tools.render import render_text_description
from langchain_core.agents import AgentAction
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI
from pydantic import Field, InstanceOf, model_validator
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
from crewai.agents import CacheHandler, CrewAgentExecutor, CrewAgentParser
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -54,8 +55,11 @@ class Agent(BaseAgent):
tools: Tools at agents disposal
step_callback: Callback to be executed after each step of the agent execution.
callbacks: A list of callback functions from the langchain library that are triggered during the agent's execution process
allow_code_execution: Enable code execution for the agent.
max_retry_limit: Maximum number of retries for an agent to execute a task when an error occurs.
"""
_times_executed: int = PrivateAttr(default=0)
max_execution_time: Optional[int] = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
@@ -96,6 +100,10 @@ class Agent(BaseAgent):
allow_code_execution: Optional[bool] = Field(
default=False, description="Enable code execution for the agent."
)
max_retry_limit: int = Field(
default=2,
description="Maximum number of retries for an agent to execute a task when an error occurs.",
)
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
@@ -167,14 +175,16 @@ class Agent(BaseAgent):
if memory.strip() != "":
task_prompt += self.i18n.slice("memory").format(memory=memory)
tools = tools or self.tools
parsed_tools = self._parse_tools(tools or []) # type: ignore # Argument 1 to "_parse_tools" of "Agent" has incompatible type "list[Any] | None"; expected "list[Any]"
tools = tools or self.tools or []
parsed_tools = self._parse_tools(tools)
self.create_agent_executor(tools=tools)
self.agent_executor.tools = parsed_tools
self.agent_executor.task = task
self.agent_executor.tools_description = render_text_description(parsed_tools)
# TODO: COMPARE WITH ARGS AND WITHOUT ARGS
self.agent_executor.tools_description = self._render_text_description_and_args(
parsed_tools
)
self.agent_executor.tools_names = self.__tools_names(parsed_tools)
if self.crew and self.crew._train:
@@ -182,13 +192,20 @@ class Agent(BaseAgent):
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
result = self.agent_executor.invoke(
{
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
}
)["output"]
try:
result = self.agent_executor.invoke(
{
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
}
)["output"]
except Exception as e:
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
raise e
self.execute_task(task, context, tools)
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
@@ -220,7 +237,7 @@ class Agent(BaseAgent):
Returns:
An instance of the CrewAgentExecutor class.
"""
tools = tools or self.tools
tools = tools or self.tools or []
agent_args = {
"input": lambda x: x["input"],
@@ -315,6 +332,7 @@ class Agent(BaseAgent):
tools_list = []
for tool in tools:
tools_list.append(tool)
return tools_list
def _training_handler(self, task_prompt: str) -> str:
@@ -341,6 +359,52 @@ class Agent(BaseAgent):
)
return task_prompt
def _render_text_description(self, tools: List[BaseTool]) -> str:
"""Render the tool name and description in plain text.
Output will be in the format of:
.. code-block:: markdown
search: This tool is used for search
calculator: This tool is used for math
"""
description = "\n".join(
[
f"Tool name: {tool.name}\nTool description:\n{tool.description}"
for tool in tools
]
)
return description
def _render_text_description_and_args(self, tools: List[BaseTool]) -> str:
"""Render the tool name, description, and args in plain text.
Output will be in the format of:
.. code-block:: markdown
search: This tool is used for search, args: {"query": {"type": "string"}}
calculator: This tool is used for math, \
args: {"expression": {"type": "string"}}
"""
tool_strings = []
for tool in tools:
args_schema = str(tool.args)
if hasattr(tool, "func") and tool.func:
sig = signature(tool.func)
description = (
f"Tool Name: {tool.name}{sig}\nTool Description: {tool.description}"
)
else:
description = (
f"Tool Name: {tool.name}\nTool Description: {tool.description}"
)
tool_strings.append(f"{description}\nTool Arguments: {args_schema}")
return "\n".join(tool_strings)
@staticmethod
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])

View File

@@ -180,7 +180,7 @@ class BaseAgent(ABC, BaseModel):
pass
@abstractmethod
def get_delegation_tools(self, agents: List["BaseAgent"]):
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[Any]:
"""Set the task tools that init BaseAgenTools class."""
pass

View File

@@ -24,6 +24,7 @@ class BaseAgentTools(BaseModel, ABC):
is_list = coworker.startswith("[") and coworker.endswith("]")
if is_list:
coworker = coworker[1:-1].split(",")[0]
return coworker
def delegate_work(
@@ -40,11 +41,13 @@ class BaseAgentTools(BaseModel, ABC):
coworker = self._get_coworker(coworker, **kwargs)
return self._execute(coworker, question, context)
def _execute(self, agent: Union[str, None], task: str, context: Union[str, None]):
def _execute(
self, agent_name: Union[str, None], task: str, context: Union[str, None]
):
"""Execute the command."""
try:
if agent is None:
agent = ""
if agent_name is None:
agent_name = ""
# It is important to remove the quotes from the agent name.
# The reason we have to do this is because less-powerful LLM's
@@ -53,7 +56,7 @@ class BaseAgentTools(BaseModel, ABC):
# {"task": "....", "coworker": "....
# when it should look like this:
# {"task": "....", "coworker": "...."}
agent_name = agent.casefold().replace('"', "").replace("\n", "")
agent_name = agent_name.casefold().replace('"', "").replace("\n", "")
agent = [ # type: ignore # Incompatible types in assignment (expression has type "list[BaseAgent]", variable has type "str | None")
available_agent
@@ -75,9 +78,9 @@ class BaseAgentTools(BaseModel, ABC):
)
agent = agent[0]
task = Task( # type: ignore # Incompatible types in assignment (expression has type "Task", variable has type "str")
task_with_assigned_agent = Task( # type: ignore # Incompatible types in assignment (expression has type "Task", variable has type "str")
description=task,
agent=agent,
expected_output="Your best answer to your coworker asking you this, accounting for the context shared.",
)
return agent.execute_task(task, context) # type: ignore # "str" has no attribute "execute_task"
return agent.execute_task(task_with_assigned_agent, context)

View File

@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Any, Optional
from pydantic import BaseModel, Field, PrivateAttr
from pydantic import BaseModel, Field
class OutputConverter(BaseModel, ABC):
@@ -21,7 +21,6 @@ class OutputConverter(BaseModel, ABC):
max_attempts (int): Maximum number of conversion attempts (default: 3).
"""
_is_gpt: bool = PrivateAttr(default=True)
text: str = Field(description="Text to be converted.")
llm: Any = Field(description="The language model to be used to convert the text.")
model: Any = Field(description="The model to be used to convert the text.")
@@ -41,7 +40,8 @@ class OutputConverter(BaseModel, ABC):
"""Convert text to json."""
pass
@abstractmethod # type: ignore # Name "_is_gpt" already defined on line 25
def _is_gpt(self, llm): # type: ignore # Name "_is_gpt" already defined on line 25
@property
@abstractmethod
def is_gpt(self) -> bool:
"""Return if llm provided is of gpt from openai."""
pass

View File

@@ -1,14 +1,6 @@
import threading
import time
from typing import (
Any,
Dict,
Iterator,
List,
Optional,
Tuple,
Union,
)
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
from langchain.agents import AgentExecutor
from langchain.agents.agent import ExceptionTool
@@ -19,9 +11,7 @@ from langchain_core.tools import BaseTool
from langchain_core.utils.input import get_color_mapping
from pydantic import InstanceOf
from crewai.agents.agent_builder.base_agent_executor_mixin import (
CrewAgentExecutorMixin,
)
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.tools_handler import ToolsHandler
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N

View File

@@ -1,6 +1,7 @@
import re
from typing import Any, Union
from json_repair import repair_json
from langchain.agents.output_parsers import ReActSingleInputOutputParser
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.exceptions import OutputParserException
@@ -48,11 +49,15 @@ class CrewAgentParser(ReActSingleInputOutputParser):
raise OutputParserException(
f"{FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE}: {text}"
)
action = action_match.group(1).strip()
action_input = action_match.group(2)
tool_input = action_input.strip(" ")
tool_input = tool_input.strip('"')
return AgentAction(action, tool_input, text)
action = action_match.group(1)
clean_action = self._clean_action(action)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = self._safe_repair_json(tool_input)
return AgentAction(clean_action, safe_tool_input, text)
elif includes_answer:
return AgentFinish(
@@ -87,3 +92,30 @@ class CrewAgentParser(ReActSingleInputOutputParser):
llm_output=text,
send_to_llm=True,
)
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""
return re.sub(r"^\s*\*+\s*|\s*\*+\s*$", "", text).strip()
def _safe_repair_json(self, tool_input: str) -> str:
UNABLE_TO_REPAIR_JSON_RESULTS = ['""', "{}"]
# Skip repair if the input starts and ends with square brackets
# Explanation: The JSON parser has issues handling inputs that are enclosed in square brackets ('[]').
# These are typically valid JSON arrays or strings that do not require repair. Attempting to repair such inputs
# might lead to unintended alterations, such as wrapping the entire input in additional layers or modifying
# the structure in a way that changes its meaning. By skipping the repair for inputs that start and end with
# square brackets, we preserve the integrity of these valid JSON structures and avoid unnecessary modifications.
if tool_input.startswith("[") and tool_input.endswith("]"):
return tool_input
# Before repair, handle common LLM issues:
# 1. Replace """ with " to avoid JSON parser errors
tool_input = tool_input.replace('"""', '"')
result = repair_json(tool_input)
if result in UNABLE_TO_REPAIR_JSON_RESULTS:
return tool_input
return str(result)
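For context on the helper above, this PR adds the `json_repair` dependency, whose `repair_json` function `_safe_repair_json` wraps; a small illustrative call (the input mirrors the parser tests later in this diff):
```python
from json_repair import repair_json

# Missing quotes around a value and an unclosed brace, as an LLM might emit.
broken = '{"task": "Research XAI", "context": "Explainable AI", "coworker": Senior Researcher'
fixed = repair_json(broken)
print(fixed)  # valid JSON with the bare value quoted and the object closed
```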

View File

@@ -1,11 +1,14 @@
import click
import pkg_resources
from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
)
from .create_crew import create_crew
from .train_crew import train_crew
from .replay_from_task import replay_task_command
from .list_task_outputs import show_task_outputs
@click.group()
@@ -73,9 +76,27 @@ def replay(task_id: str) -> None:
@crewai.command()
def list_completed_tasks_ids():
"""List all task outputs saved from crew_tasks_output.json."""
show_task_outputs()
def log_tasks_outputs() -> None:
"""
Retrieve your latest crew.kickoff() task outputs.
"""
try:
storage = KickoffTaskOutputsSQLiteStorage()
tasks = storage.load()
if not tasks:
click.echo(
"No task outputs found. Only crew kickoff task outputs are logged."
)
return
for index, task in enumerate(tasks, 1):
click.echo(f"Task {index}: {task['task_id']}")
click.echo(f"Description: {task['expected_output']}")
click.echo("------")
except Exception as e:
click.echo(f"An error occurred while logging task outputs: {e}", err=True)
if __name__ == "__main__":
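Based on the `click.echo` calls in `log_tasks_outputs` above, the command prints each stored task id plus its expected output under a `Description:` label; a hypothetical run (ids and text are placeholders) might look like:
```shell
$ crewai log-tasks-outputs
Task 1: 0a1b2c3d-0000-0000-0000-000000000000
Description: A step-by-step guide on setting up a Python project.
------
Task 2: 4e5f6a7b-0000-0000-0000-000000000000
Description: A short summary of AI advancements.
------
```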

View File

@@ -1,34 +0,0 @@
import subprocess
import click
from pathlib import Path
import json
def show_task_outputs() -> None:
"""
Replay the crew execution from a specific task.
Args:
task_id (str): The ID of the task to replay from.
"""
try:
file_path = Path("crew_tasks_output.json")
if not file_path.exists():
click.echo("crew_tasks_output.json not found.")
return
with open(file_path, "r") as f:
tasks = json.load(f)
for index, task in enumerate(tasks):
click.echo(f"Task {index + 1}: {task['task_id']}")
click.echo(f"Description: {task['output']['description']}")
click.echo("---")
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while replaying the task: {e}", err=True)
click.echo(e.output, err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)

View File

@@ -9,7 +9,7 @@ def replay_task_command(task_id: str) -> None:
Args:
task_id (str): The ID of the task to replay from.
"""
command = ["poetry", "run", "replay_from_task", task_id]
command = ["poetry", "run", "replay", task_id]
try:
result = subprocess.run(command, capture_output=False, text=True, check=True)

View File

@@ -32,15 +32,12 @@ from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import (
CREW_TASKS_OUTPUT_FILE,
TRAINED_AGENTS_DATA_FILE,
TRAINING_DATA_FILE,
)
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.task_output_handler import (
ExecutionLog,
TaskOutputJsonHandler,
)
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -92,6 +89,9 @@ class Crew(BaseModel):
_logging_color: str = PrivateAttr(
default="bold_purple",
)
_task_output_handler: TaskOutputStorageHandler = PrivateAttr(
default_factory=TaskOutputStorageHandler
)
cache: bool = Field(default=True)
model_config = ConfigDict(arbitrary_types_allowed=True)
@@ -151,7 +151,7 @@ class Crew(BaseModel):
default=None,
description="List of file paths for task execution JSON files.",
)
execution_logs: List[ExecutionLog] = Field(
execution_logs: List[Dict[str, Any]] = Field(
default=[],
description="List of execution logs for tasks",
)
@@ -190,7 +190,6 @@ class Crew(BaseModel):
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self._telemetry.crew_creation(self)
return self
@model_validator(mode="after")
@@ -397,8 +396,7 @@ class Crew(BaseModel):
) -> CrewOutput:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
TaskOutputJsonHandler(CREW_TASKS_OUTPUT_FILE).initialize_file()
TaskOutputJsonHandler(CREW_TASKS_OUTPUT_FILE).reset()
self._task_output_handler.reset()
self._logging_color = "bold_purple"
if inputs is not None:
@@ -466,6 +464,7 @@ class Crew(BaseModel):
results.append(output)
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
return results
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> CrewOutput:
@@ -514,7 +513,7 @@ class Crew(BaseModel):
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
return results
def _store_execution_log(
@@ -529,10 +528,9 @@ class Crew(BaseModel):
else:
inputs = {}
log = ExecutionLog(
task_id=str(task.id),
expected_output=task.expected_output,
output={
log = {
"task": task,
"output": {
"description": output.description,
"summary": output.summary,
"raw": output.raw,
@@ -541,16 +539,11 @@ class Crew(BaseModel):
"output_format": output.output_format,
"agent": output.agent,
},
task_index=task_index,
inputs=inputs,
was_replayed=was_replayed,
)
if task_index < len(self.execution_logs):
self.execution_logs[task_index] = log
else:
self.execution_logs.append(log)
TaskOutputJsonHandler(CREW_TASKS_OUTPUT_FILE).update(task_index, log)
"task_index": task_index,
"inputs": inputs,
"was_replayed": was_replayed,
}
self._task_output_handler.update(task_index, log)
def _run_sequential_process(self) -> CrewOutput:
"""Executes tasks sequentially and returns the final output."""
@@ -662,8 +655,29 @@ class Crew(BaseModel):
def _add_delegation_tools(self, task: Task):
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and agents_for_delegation:
task.tools += task.agent.get_delegation_tools(agents_for_delegation) # type: ignore
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
delegation_tools = task.agent.get_delegation_tools(agents_for_delegation)
# Add tools if they are not already in task.tools
for new_tool in delegation_tools:
# Find the index of the tool with the same name
existing_tool_index = next(
(
index
for index, tool in enumerate(task.tools or [])
if tool.name == new_tool.name
),
None,
)
if not task.tools:
task.tools = []
if existing_tool_index is not None:
# Replace the existing tool
task.tools[existing_tool_index] = new_tool
else:
# Add the new tool
task.tools.append(new_tool)
def _log_task_start(self, task: Task, agent: Optional[BaseAgent]):
color = self._logging_color
@@ -741,7 +755,10 @@ class Crew(BaseModel):
def replay_from_task(
self, task_id: str, inputs: Optional[Dict[str, Any]] = None
) -> CrewOutput:
stored_outputs = TaskOutputJsonHandler(CREW_TASKS_OUTPUT_FILE).load()
stored_outputs = self._task_output_handler.load()
if not stored_outputs:
raise ValueError(f"Task with id {task_id} not found in the crew's tasks.")
start_index = self._find_task_index(task_id, stored_outputs)
if start_index is None:
@@ -759,7 +776,9 @@ class Crew(BaseModel):
self._create_manager_agent()
for i in range(start_index):
stored_output = stored_outputs[i]["output"]
stored_output = stored_outputs[i][
"output"
] # for adding context to the task
task_output = TaskOutput(
description=stored_output["description"],
agent=stored_output["agent"],

View File

@@ -0,0 +1,166 @@
import json
import sqlite3
from typing import Any, Dict, List, Optional
from crewai.task import Task
from crewai.utilities import Printer
from crewai.utilities.crew_json_encoder import CrewJSONEncoder
from crewai.utilities.paths import db_storage_path
class KickoffTaskOutputsSQLiteStorage:
"""
SQLite storage for the latest kickoff task outputs.
"""
def __init__(
self, db_path: str = f"{db_storage_path()}/latest_kickoff_task_outputs.db"
) -> None:
self.db_path = db_path
self._printer: Printer = Printer()
self._initialize_db()
def _initialize_db(self):
"""
Initializes the SQLite database and creates the latest_kickoff_task_outputs table
"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS latest_kickoff_task_outputs (
task_id TEXT PRIMARY KEY,
expected_output TEXT,
output JSON,
task_index INTEGER,
inputs JSON,
was_replayed BOOLEAN,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
except sqlite3.Error as e:
self._printer.print(
content=f"SAVING KICKOFF TASK OUTPUTS ERROR: An error occurred during database initialization: {e}",
color="red",
)
def add(
self,
task: Task,
output: Dict[str, Any],
task_index: int,
was_replayed: bool = False,
inputs: Dict[str, Any] = {},
):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"""
INSERT OR REPLACE INTO latest_kickoff_task_outputs
(task_id, expected_output, output, task_index, inputs, was_replayed)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
str(task.id),
task.expected_output,
json.dumps(output, cls=CrewJSONEncoder),
task_index,
json.dumps(inputs),
was_replayed,
),
)
conn.commit()
except sqlite3.Error as e:
self._printer.print(
content=f"SAVING KICKOFF TASK OUTPUTS ERROR: An error occurred during database initialization: {e}",
color="red",
)
def update(
self,
task_index: int,
**kwargs,
):
"""
Updates an existing row in the latest_kickoff_task_outputs table based on task_index.
"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
fields = []
values = []
for key, value in kwargs.items():
fields.append(f"{key} = ?")
values.append(
json.dumps(value, cls=CrewJSONEncoder)
if isinstance(value, dict)
else value
)
query = f"UPDATE latest_kickoff_task_outputs SET {', '.join(fields)} WHERE task_index = ?"
values.append(task_index)
cursor.execute(query, tuple(values))
conn.commit()
if cursor.rowcount == 0:
self._printer.print(
f"No row found with task_index {task_index}. No update performed.",
color="red",
)
except sqlite3.Error as e:
self._printer.print(f"UPDATE KICKOFF TASK OUTPUTS ERROR: {e}", color="red")
def load(self) -> Optional[List[Dict[str, Any]]]:
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT *
FROM latest_kickoff_task_outputs
ORDER BY task_index
""")
rows = cursor.fetchall()
results = []
for row in rows:
result = {
"task_id": row[0],
"expected_output": row[1],
"output": json.loads(row[2]),
"task_index": row[3],
"inputs": json.loads(row[4]),
"was_replayed": row[5],
"timestamp": row[6],
}
results.append(result)
return results
except sqlite3.Error as e:
self._printer.print(
content=f"LOADING KICKOFF TASK OUTPUTS ERROR: An error occurred while querying kickoff task outputs: {e}",
color="red",
)
return None
def delete_all(self):
"""
Deletes all rows from the latest_kickoff_task_outputs table.
"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM latest_kickoff_task_outputs")
conn.commit()
except sqlite3.Error as e:
self._printer.print(
content=f"ERROR: Failed to delete all kickoff task outputs: {e}",
color="red",
)
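A minimal sketch (paths and printed fields assumed) of reading the stored rows back with the class above:
```python
from crewai.memory.storage.kickoff_task_outputs_storage import (
    KickoffTaskOutputsSQLiteStorage,
)

# Defaults to <db_storage_path()>/latest_kickoff_task_outputs.db
storage = KickoffTaskOutputsSQLiteStorage()
rows = storage.load() or []
for row in rows:
    # Each row mirrors the table columns: task_id, expected_output, output,
    # task_index, inputs, was_replayed, timestamp.
    print(row["task_index"], row["task_id"], row["was_replayed"])
```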

View File

@@ -80,7 +80,7 @@ class Telemetry:
self.ready = False
self.trace_set = False
def crew_creation(self, crew):
def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
"""Records the creation of a crew."""
if self.ready:
try:
@@ -93,6 +93,12 @@ class Telemetry:
)
self._add_attribute(span, "python_version", platform.python_version())
self._add_attribute(span, "crew_id", str(crew.id))
if crew.share_crew:
self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None
)
self._add_attribute(span, "crew_process", crew.process)
self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
@@ -114,7 +120,7 @@ class Telemetry:
"llm": json.dumps(self._safe_llm_attributes(agent.llm)),
"delegation_enabled?": agent.allow_delegation,
"tools_names": [
tool.name.casefold() for tool in agent.tools
tool.name.casefold() for tool in agent.tools or []
],
}
for agent in crew.agents
@@ -139,7 +145,7 @@ class Telemetry:
else None
),
"tools_names": [
tool.name.casefold() for tool in task.tools
tool.name.casefold() for tool in task.tools or []
],
}
for task in crew.tasks
@@ -161,10 +167,11 @@ class Telemetry:
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Task Execution")
created_span = tracer.start_span("Task Created")
self._add_attribute(created_span, "crew_id", str(crew.id))
self._add_attribute(created_span, "task_index", crew.tasks.index(task))
self._add_attribute(created_span, "task_id", str(task.id))
if crew.share_crew:
@@ -178,6 +185,10 @@ class Telemetry:
created_span.set_status(Status(StatusCode.OK))
created_span.end()
span = tracer.start_span("Task Execution")
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "task_index", crew.tasks.index(task))
self._add_attribute(span, "task_id", str(task.id))
if crew.share_crew:
@@ -275,6 +286,8 @@ class Telemetry:
"""
if (self.ready) and (crew.share_crew):
try:
self.crew_creation(crew, inputs)
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Execution")
self._add_attribute(
@@ -283,7 +296,9 @@ class Telemetry:
pkg_resources.get_distribution("crewai").version,
)
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "inputs", json.dumps(inputs))
self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None
)
self._add_attribute(
span,
"crew_agents",

View File

@@ -7,7 +7,7 @@ class AgentTools(BaseAgentTools):
"""Default tools around agent delegation"""
def tools(self):
coworkers = f"[{', '.join([f'{agent.role}' for agent in self.agents])}]"
coworkers = ", ".join([f"{agent.role}" for agent in self.agents])
tools = [
StructuredTool.from_function(
func=self.delegate_work,

View File

@@ -1,3 +1,2 @@
TRAINING_DATA_FILE = "training_data.pkl"
TRAINED_AGENTS_DATA_FILE = "trained_agents_data.pkl"
CREW_TASKS_OUTPUT_FILE = "crew_tasks_output.json"

View File

@@ -2,10 +2,8 @@ import json
from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from pydantic import model_validator
from crewai.agents.agent_builder.utilities.base_output_converter_base import (
OutputConverter,
)
from crewai.agents.agent_builder.utilities.base_output_converter import OutputConverter
class ConverterError(Exception):
@@ -19,15 +17,10 @@ class ConverterError(Exception):
class Converter(OutputConverter):
"""Class that converts text into either pydantic or json."""
@model_validator(mode="after")
def check_llm_provider(self):
if not self._is_gpt(self.llm):
self._is_gpt = False
def to_pydantic(self, current_attempt=1):
"""Convert text to pydantic."""
try:
if self._is_gpt:
if self.is_gpt:
return self._create_instructor().to_pydantic()
else:
return self._create_chain().invoke({})
@@ -41,7 +34,7 @@ class Converter(OutputConverter):
def to_json(self, current_attempt=1):
"""Convert text to json."""
try:
if self._is_gpt:
if self.is_gpt:
return self._create_instructor().to_json()
else:
return json.dumps(self._create_chain().invoke({}).model_dump())
@@ -75,5 +68,7 @@ class Converter(OutputConverter):
)
return new_prompt | self.llm | parser
def _is_gpt(self, llm) -> bool: # type: ignore # BUG? Name "_is_gpt" defined on line 20 hides name from outer scope
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
@property
def is_gpt(self) -> bool:
"""Return if llm provided is of gpt from openai."""
return isinstance(self.llm, ChatOpenAI) and self.llm.openai_api_base is None

View File

@@ -1,69 +0,0 @@
import json
import os
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Dict, Any, Optional, List
from crewai.utilities.crew_json_encoder import CrewJSONEncoder
class ExecutionLog(BaseModel):
task_id: str
expected_output: Optional[str] = None
output: Dict[str, Any]
timestamp: datetime = Field(default_factory=datetime.now)
task_index: int
inputs: Dict[str, Any] = Field(default_factory=dict)
was_replayed: bool = False
def __getitem__(self, key: str) -> Any:
return getattr(self, key)
class TaskOutputJsonHandler:
def __init__(self, file_name: str) -> None:
self.file_path = os.path.join(os.getcwd(), file_name)
def initialize_file(self) -> None:
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
with open(self.file_path, "w") as file:
json.dump([], file)
def update(self, task_index: int, log: ExecutionLog):
logs = self.load()
if task_index < len(logs):
logs[task_index] = log
else:
logs.append(log)
self.save(logs)
def save(self, logs: List[ExecutionLog]):
with open(self.file_path, "w") as file:
json.dump(logs, file, indent=2, cls=CrewJSONEncoder)
def reset(self):
"""Reset the JSON file by creating an empty file."""
with open(self.file_path, "w") as f:
json.dump([], f)
def load(self) -> List[ExecutionLog]:
try:
if (
not os.path.exists(self.file_path)
or os.path.getsize(self.file_path) == 0
):
return []
with open(self.file_path, "r") as file:
return json.load(file)
except FileNotFoundError:
print(f"File {self.file_path} not found. Returning empty list.")
return []
except json.JSONDecodeError:
print(
f"Error decoding JSON from file {self.file_path}. Returning empty list."
)
return []
except Exception as e:
print(f"An unexpected error occurred: {e}")
return []

View File

@@ -0,0 +1,61 @@
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Dict, Any, Optional, List
from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
)
from crewai.task import Task
class ExecutionLog(BaseModel):
task_id: str
expected_output: Optional[str] = None
output: Dict[str, Any]
timestamp: datetime = Field(default_factory=datetime.now)
task_index: int
inputs: Dict[str, Any] = Field(default_factory=dict)
was_replayed: bool = False
def __getitem__(self, key: str) -> Any:
return getattr(self, key)
class TaskOutputStorageHandler:
def __init__(self) -> None:
self.storage = KickoffTaskOutputsSQLiteStorage()
def update(self, task_index: int, log: Dict[str, Any]):
saved_outputs = self.load()
if saved_outputs is None:
raise ValueError("Logs cannot be None")
if log.get("was_replayed", False):
replayed = {
"task_id": str(log["task"].id),
"expected_output": log["task"].expected_output,
"output": log["output"],
"was_replayed": log["was_replayed"],
"inputs": log["inputs"],
}
self.storage.update(
task_index,
**replayed,
)
else:
self.storage.add(**log)
def add(
self,
task: Task,
output: Dict[str, Any],
task_index: int,
inputs: Dict[str, Any] = {},
was_replayed: bool = False,
):
self.storage.add(task, output, task_index, was_replayed, inputs)
def reset(self):
self.storage.delete_all()
def load(self) -> Optional[List[Dict[str, Any]]]:
return self.storage.load()
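The higher-level handler that `Crew` holds can also be exercised directly; a hedged sketch mirroring the tests further below:
```python
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler

handler = TaskOutputStorageHandler()
outputs = handler.load()   # stored kickoff task outputs, or None if the storage query failed
if not outputs:
    print("No kickoff task outputs stored yet.")
handler.reset()            # clears the latest_kickoff_task_outputs table
```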

View File

@@ -963,3 +963,54 @@ def test_agent_use_trained_data(crew_training_handler):
crew_training_handler.assert_has_calls(
[mock.call(), mock.call("trained_agents_data.pkl"), mock.call().load()]
)
def test_agent_max_retry_limit():
agent = Agent(
role="test role",
goal="test goal",
backstory="test backstory",
max_retry_limit=1,
)
task = Task(
agent=agent,
description="Say the word: Hi",
expected_output="The word: Hi",
human_input=True,
)
error_message = "Error happening while sending prompt to model."
with patch.object(
CrewAgentExecutor, "invoke", wraps=agent.agent_executor.invoke
) as invoke_mock:
invoke_mock.side_effect = Exception(error_message)
assert agent._times_executed == 0
assert agent.max_retry_limit == 1
with pytest.raises(Exception) as e:
agent.execute_task(
task=task,
)
assert e.value.args[0] == error_message
assert agent._times_executed == 2
invoke_mock.assert_has_calls(
[
mock.call(
{
"input": "Say the word: Hi\n\nThis is the expect criteria for your final answer: The word: Hi \n you MUST return the actual complete content as the final answer, not a summary.",
"tool_names": "",
"tools": "",
}
),
mock.call(
{
"input": "Say the word: Hi\n\nThis is the expect criteria for your final answer: The word: Hi \n you MUST return the actual complete content as the final answer, not a summary.",
"tool_names": "",
"tools": "",
}
),
]
)

0
tests/agents/__init__.py Normal file
View File

View File

@@ -0,0 +1,378 @@
import pytest
from crewai.agents.parser import CrewAgentParser
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.exceptions import OutputParserException
@pytest.fixture
def parser():
p = CrewAgentParser()
p.agent = MockAgent()
return p
def test_valid_action_parsing_special_characters(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: what's the temperature in SF?"
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "what's the temperature in SF?"
def test_valid_action_parsing_with_json_tool_input(parser):
text = """
Thought: Let's find the information
Action: query
Action Input: ** {"task": "What are some common challenges or barriers that you have observed or experienced when implementing AI-powered solutions in healthcare settings?", "context": "As we've discussed recent advancements in AI applications in healthcare, it's crucial to acknowledge the potential hurdles. Some possible obstacles include...", "coworker": "Senior Researcher"}
"""
result = parser.parse(text)
assert isinstance(result, AgentAction)
expected_tool_input = '{"task": "What are some common challenges or barriers that you have observed or experienced when implementing AI-powered solutions in healthcare settings?", "context": "As we\'ve discussed recent advancements in AI applications in healthcare, it\'s crucial to acknowledge the potential hurdles. Some possible obstacles include...", "coworker": "Senior Researcher"}'
assert result.tool == "query"
assert result.tool_input == expected_tool_input
def test_valid_action_parsing_with_quotes(parser):
text = 'Thought: Let\'s find the temperature\nAction: search\nAction Input: "temperature in SF"'
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "temperature in SF"
def test_valid_action_parsing_with_curly_braces(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: {temperature in SF}"
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "{temperature in SF}"
def test_valid_action_parsing_with_angle_brackets(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: <temperature in SF>"
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "<temperature in SF>"
def test_valid_action_parsing_with_parentheses(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: (temperature in SF)"
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "(temperature in SF)"
def test_valid_action_parsing_with_mixed_brackets(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: [temperature in {SF}]"
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "[temperature in {SF}]"
def test_valid_action_parsing_with_nested_quotes(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: \"what's the temperature in 'SF'?\""
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "what's the temperature in 'SF'?"
def test_valid_action_parsing_with_incomplete_json(parser):
text = 'Thought: Let\'s find the temperature\nAction: search\nAction Input: {"query": "temperature in SF"'
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == '{"query": "temperature in SF"}'
def test_valid_action_parsing_with_special_characters(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: what is the temperature in SF? @$%^&*"
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "what is the temperature in SF? @$%^&*"
def test_valid_action_parsing_with_combination(parser):
text = 'Thought: Let\'s find the temperature\nAction: search\nAction Input: "[what is the temperature in SF?]"'
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "[what is the temperature in SF?]"
def test_valid_action_parsing_with_mixed_quotes(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: \"what's the temperature in SF?\""
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "what's the temperature in SF?"
def test_valid_action_parsing_with_newlines(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: what is\nthe temperature in SF?"
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "what is\nthe temperature in SF?"
def test_valid_action_parsing_with_escaped_characters(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: what is the temperature in SF? \\n"
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "what is the temperature in SF? \\n"
def test_valid_action_parsing_with_json_string(parser):
text = 'Thought: Let\'s find the temperature\nAction: search\nAction Input: {"query": "temperature in SF"}'
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == '{"query": "temperature in SF"}'
def test_valid_action_parsing_with_unbalanced_quotes(parser):
text = "Thought: Let's find the temperature\nAction: search\nAction Input: \"what is the temperature in SF?"
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "what is the temperature in SF?"
def test_clean_action_no_formatting(parser):
action = "Ask question to senior researcher"
cleaned_action = parser._clean_action(action)
assert cleaned_action == "Ask question to senior researcher"
def test_clean_action_with_leading_asterisks(parser):
action = "** Ask question to senior researcher"
cleaned_action = parser._clean_action(action)
assert cleaned_action == "Ask question to senior researcher"
def test_clean_action_with_trailing_asterisks(parser):
action = "Ask question to senior researcher **"
cleaned_action = parser._clean_action(action)
assert cleaned_action == "Ask question to senior researcher"
def test_clean_action_with_leading_and_trailing_asterisks(parser):
action = "** Ask question to senior researcher **"
cleaned_action = parser._clean_action(action)
assert cleaned_action == "Ask question to senior researcher"
def test_clean_action_with_multiple_leading_asterisks(parser):
action = "**** Ask question to senior researcher"
cleaned_action = parser._clean_action(action)
assert cleaned_action == "Ask question to senior researcher"
def test_clean_action_with_multiple_trailing_asterisks(parser):
action = "Ask question to senior researcher ****"
cleaned_action = parser._clean_action(action)
assert cleaned_action == "Ask question to senior researcher"
def test_clean_action_with_spaces_and_asterisks(parser):
action = " ** Ask question to senior researcher ** "
cleaned_action = parser._clean_action(action)
print(f"Original action: '{action}'")
print(f"Cleaned action: '{cleaned_action}'")
assert cleaned_action == "Ask question to senior researcher"
def test_clean_action_with_only_asterisks(parser):
action = "****"
cleaned_action = parser._clean_action(action)
assert cleaned_action == ""
def test_clean_action_with_empty_string(parser):
action = ""
cleaned_action = parser._clean_action(action)
assert cleaned_action == ""
def test_valid_final_answer_parsing(parser):
text = (
"Thought: I found the information\nFinal Answer: The temperature is 100 degrees"
)
result = parser.parse(text)
assert isinstance(result, AgentFinish)
assert result.return_values["output"] == "The temperature is 100 degrees"
def test_missing_action_error(parser):
text = "Thought: Let's find the temperature\nAction Input: what is the temperature in SF?"
with pytest.raises(OutputParserException) as exc_info:
parser.parse(text)
assert "Could not parse LLM output" in str(exc_info.value)
def test_missing_action_input_error(parser):
text = "Thought: Let's find the temperature\nAction: search"
with pytest.raises(OutputParserException) as exc_info:
parser.parse(text)
assert "Could not parse LLM output" in str(exc_info.value)
def test_action_and_final_answer_error(parser):
text = "Thought: I found the information\nAction: search\nAction Input: what is the temperature in SF?\nFinal Answer: The temperature is 100 degrees"
with pytest.raises(OutputParserException) as exc_info:
parser.parse(text)
assert "both perform Action and give a Final Answer" in str(exc_info.value)
def test_safe_repair_json(parser):
invalid_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": Senior Researcher'
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_safe_repair_json_unrepairable(parser):
invalid_json = "{invalid_json"
result = parser._safe_repair_json(invalid_json)
print("result:", invalid_json)
assert result == invalid_json # Should return the original if unrepairable
def test_safe_repair_json_missing_quotes(parser):
invalid_json = (
'{task: "Research XAI", context: "Explainable AI", coworker: Senior Researcher}'
)
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_safe_repair_json_unclosed_brackets(parser):
invalid_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"'
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_safe_repair_json_extra_commas(parser):
invalid_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher",}'
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_safe_repair_json_trailing_commas(parser):
invalid_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher",}'
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_safe_repair_json_single_quotes(parser):
invalid_json = "{'task': 'Research XAI', 'context': 'Explainable AI', 'coworker': 'Senior Researcher'}"
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_safe_repair_json_mixed_quotes(parser):
invalid_json = "{'task': \"Research XAI\", 'context': \"Explainable AI\", 'coworker': 'Senior Researcher'}"
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_safe_repair_json_unescaped_characters(parser):
invalid_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher\n"}'
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
print("result:", result)
assert result == expected_repaired_json
def test_safe_repair_json_missing_colon(parser):
invalid_json = '{"task" "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_safe_repair_json_missing_comma(parser):
invalid_json = '{"task": "Research XAI" "context": "Explainable AI", "coworker": "Senior Researcher"}'
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_safe_repair_json_unexpected_trailing_characters(parser):
invalid_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"} random text'
expected_repaired_json = '{"task": "Research XAI", "context": "Explainable AI", "coworker": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_safe_repair_json_special_characters_key(parser):
invalid_json = '{"task!@#": "Research XAI", "context$%^": "Explainable AI", "coworker&*()": "Senior Researcher"}'
expected_repaired_json = '{"task!@#": "Research XAI", "context$%^": "Explainable AI", "coworker&*()": "Senior Researcher"}'
result = parser._safe_repair_json(invalid_json)
assert result == expected_repaired_json
def test_parsing_with_whitespace(parser):
text = " Thought: Let's find the temperature \n Action: search \n Action Input: what is the temperature in SF? "
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "what is the temperature in SF?"
def test_parsing_with_special_characters(parser):
text = 'Thought: Let\'s find the temperature\nAction: search\nAction Input: "what is the temperature in SF?"'
result = parser.parse(text)
assert isinstance(result, AgentAction)
assert result.tool == "search"
assert result.tool_input == "what is the temperature in SF?"
def test_integration_valid_and_invalid(parser):
text = """
Thought: Let's find the temperature
Action: search
Action Input: what is the temperature in SF?
Thought: I found the information
Final Answer: The temperature is 100 degrees
Thought: Missing action
Action Input: invalid
Thought: Missing action input
Action: invalid
"""
parts = text.strip().split("\n\n")
results = []
for part in parts:
try:
result = parser.parse(part.strip())
results.append(result)
except OutputParserException as e:
results.append(e)
assert isinstance(results[0], AgentAction)
assert isinstance(results[1], AgentFinish)
assert isinstance(results[2], OutputParserException)
assert isinstance(results[3], OutputParserException)
class MockAgent:
def increment_formatting_errors(self):
pass
# TODO: ADD TEST TO MAKE SURE ** REMOVAL DOESN'T MESS UP ANYTHING

View File

@@ -1,6 +1,5 @@
"""Test Agent creation and execution basic functionality."""
import os
import json
from concurrent.futures import Future
from unittest import mock
@@ -19,7 +18,7 @@ from crewai.task import Task
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.utilities import Logger, RPMController
from crewai.utilities.constants import CREW_TASKS_OUTPUT_FILE
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
ceo = Agent(
role="CEO",
@@ -1270,7 +1269,7 @@ def test_hierarchical_crew_creation_tasks_with_agents():
assert crew.manager_agent.tools is not None
print("TOOL DESCRIPTION", crew.manager_agent.tools[0].description)
assert crew.manager_agent.tools[0].description.startswith(
"Delegate a specific task to one of the following coworkers: [Senior Writer, Researcher]"
"Delegate a specific task to one of the following coworkers: Senior Writer"
)
@@ -1861,7 +1860,7 @@ def test_crew_replay_from_task_error():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_task_output_file_creation():
def test_crew_task_db_init():
agent = Agent(
role="Content Writer",
goal="Write engaging content on various topics.",
@@ -1889,46 +1888,13 @@ def test_crew_task_output_file_creation():
crew.kickoff()
# Check if the crew_tasks_output.json file is created
assert os.path.exists(CREW_TASKS_OUTPUT_FILE)
# Clean up the file after test
if os.path.exists(CREW_TASKS_OUTPUT_FILE):
os.remove(CREW_TASKS_OUTPUT_FILE)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_replay_without_output_tasks_json():
agent = Agent(
role="Technical Writer",
goal="Write detailed technical documentation.",
backstory="You have a background in software engineering and technical writing.",
)
task = Task(
description="Document the process of setting up a Python project.",
expected_output="A step-by-step guide on setting up a Python project.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
with patch.object(Task, "execute_sync") as mock_execute_task:
mock_execute_task.return_value = TaskOutput(
description="Document the process of setting up a Python project.",
raw="To set up a Python project, first create a virtual environment...",
agent="Technical Writer",
json_dict=None,
output_format=OutputFormat.RAW,
pydantic=None,
summary="Document the process of setting up a Python project...",
)
if os.path.exists(CREW_TASKS_OUTPUT_FILE):
os.remove(CREW_TASKS_OUTPUT_FILE)
with pytest.raises(ValueError):
crew.replay_from_task(str(task.id))
# Check if this runs without raising an exception
try:
db_handler = TaskOutputStorageHandler()
db_handler.load()
assert True # If we reach this point, no exception was raised
except Exception as e:
pytest.fail(f"An exception was raised: {str(e)}")
@pytest.mark.vcr(filter_headers=["authorization"])
@@ -2018,20 +1984,16 @@ def test_replay_task_with_context():
]
crew.kickoff()
db_handler = TaskOutputStorageHandler()
assert db_handler.load() != []
# Check if the crew_tasks_output.json file is created
assert os.path.exists(CREW_TASKS_OUTPUT_FILE)
# Replay task4 and ensure it uses task1's context properly
with patch.object(Task, "execute_sync") as mock_replay_task:
mock_replay_task.return_value = mock_task_output4
replayed_output = crew.replay_from_task(str(task4.id))
assert replayed_output.raw == "Presentation on AI advancements..."
# Clean up the file after test
if os.path.exists(CREW_TASKS_OUTPUT_FILE):
os.remove(CREW_TASKS_OUTPUT_FILE)
db_handler.reset()
def test_replay_from_task_with_context():
@@ -2056,7 +2018,7 @@ def test_replay_from_task_with_context():
crew = Crew(agents=[agent], tasks=[task1, task2], process=Process.sequential)
with patch(
"crewai.utilities.task_output_handler.TaskOutputJsonHandler.load",
"crewai.utilities.task_output_storage_handler.TaskOutputStorageHandler.load",
return_value=[
{
"task_id": str(task1.id),
@@ -2114,7 +2076,7 @@ def test_replay_with_invalid_task_id():
crew = Crew(agents=[agent], tasks=[task1, task2], process=Process.sequential)
with patch(
"crewai.utilities.task_output_handler.TaskOutputJsonHandler.load",
"crewai.utilities.task_output_storage_handler.TaskOutputStorageHandler.load",
return_value=[
{
"task_id": str(task1.id),
@@ -2176,7 +2138,7 @@ def test_replay_interpolates_inputs_properly(mock_interpolate_inputs):
crew.kickoff(inputs={"name": "John"})
with patch(
"crewai.utilities.task_output_handler.TaskOutputJsonHandler.load",
"crewai.utilities.task_output_storage_handler.TaskOutputStorageHandler.load",
return_value=[
{
"task_id": str(task1.id),
@@ -2231,7 +2193,7 @@ def test_replay_from_task_setup_context():
task1.output = context_output
crew = Crew(agents=[agent], tasks=[task1, task2], process=Process.sequential)
with patch(
"crewai.utilities.task_output_handler.TaskOutputJsonHandler.load",
"crewai.utilities.task_output_storage_handler.TaskOutputStorageHandler.load",
return_value=[
{
"task_id": str(task1.id),