Compare commits

7 Commits

Author SHA1 Message Date
theCyberTech
9fbc602b3e Revert "Update CodeQL workflow to include custom config file"
This reverts commit 9c54bfce1b.
2025-09-26 15:15:43 +08:00
Greyson LaLonde
aa15b38d41 ci: add canary workflow trigger for branch testing 2025-09-23 23:57:04 -04:00
theCyberTech
9c54bfce1b Update CodeQL workflow to include custom config file
This commit adds a reference to a custom CodeQL configuration file (.github/codeql-config.yml) in the GitHub Actions workflow for CodeQL analysis. This enhancement allows for more tailored queries and analysis settings during the code scanning process.
2025-09-24 00:21:31 +08:00
theCyberTech
2c80ac6283 Add Canary Crew for Github Action
Initial commit for the Canary Crew project using crewAI. Includes a GitHub Actions workflow, project configuration, agent and task YAML files, main execution and utility scripts, a custom tool example, a user knowledge file, and documentation. Enables multi-agent AI research and reporting with markdown output.
2025-09-22 15:23:26 +08:00
Vini Brasil
aa8dc9d77f Add source to LLM Guardrail events (#3572)
This commit adds the source attribute to LLM Guardrail event calls to
identify the Lite Agent or Task that executed the guardrail.
2025-09-22 11:58:00 +09:00
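For subscribers this means guardrail events now carry a real emitter instead of `None`. A short sketch of a listener, using the event bus and event types exercised by the test changes later in this compare:

```python
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_types import (
    LLMGuardrailCompletedEvent,
    LLMGuardrailStartedEvent,
)

# With this change, `source` is the Task or LiteAgent that ran the guardrail
# (it was previously emitted as None).
@crewai_event_bus.on(LLMGuardrailStartedEvent)
def on_started(source, event):
    print(f"guardrail started by {type(source).__name__}, retry {event.retry_count}")

@crewai_event_bus.on(LLMGuardrailCompletedEvent)
def on_completed(source, event):
    print(f"guardrail for {type(source).__name__} succeeded: {event.success}")
```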
Jonathan Hill
9c1096dbdc fix: Make 'ready' parameter optional in _create_reasoning_plan function (#3561)
* fix: Make 'ready' parameter optional in _create_reasoning_plan function

This PR fixes Issue #3466 where the _create_reasoning_plan function was missing
the 'ready' parameter when called by the LLM. The fix makes the 'ready' parameter
optional with a default value of False, which allows the function to be called
with only the 'plan' argument.

Fixes #3466

* Change default value of 'ready' parameter to True

---------

Co-authored-by: João Moura <joaomdmoura@gmail.com>
2025-09-20 22:57:18 -03:00
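The failure mode was an LLM tool call that supplied only `plan`; with a default for `ready` the call succeeds. A runnable sketch using the helper exactly as it appears in the diff at the bottom of this compare:

```python
import json

def _create_reasoning_plan(plan: str, ready: bool = True):  # noqa: N802
    """Return the reasoning plan result in JSON string form."""
    return json.dumps({"plan": plan, "ready": ready})

# Before the fix, `ready` was a required positional argument, so this call raised:
#   TypeError: _create_reasoning_plan() missing 1 required positional argument: 'ready'
print(_create_reasoning_plan(plan="Draft the report from collected sources."))
# {"plan": "Draft the report from collected sources.", "ready": true}
```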
João Moura
47044450c0 Adding fallback to crew settings (#3562)
* Adding fallback to crew settings

* fix: resolve ruff and mypy issues in cli/config.py

---------

Co-authored-by: Greyson Lalonde <greyson.r.lalonde@gmail.com>
2025-09-20 22:54:36 -03:00
20 changed files with 4025 additions and 165 deletions

50
.github/workflows/canary.yml vendored Normal file

@@ -0,0 +1,50 @@
name: Canary Crew Check

on:
  push:
    branches:
      - main
      - Canary-Crew-Github-Action
  pull_request:
    branches:
      - main

permissions:
  contents: read

env:
  OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

jobs:
  canary-run:
    name: Run Canary Crew
    runs-on: ubuntu-latest
    timeout-minutes: 30
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Set up uv
        uses: astral-sh/setup-uv@v6
        with:
          version: "0.8.4"
          python-version: "3.11"

      - name: Install canary dependencies
        working-directory: canary
        run: uv sync

      - name: Run canary crew
        working-directory: canary
        run: uv run crewai run

      - name: Upload canary report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: canary-report
          path: canary/report.md
          if-no-files-found: ignore

5
canary/.gitignore vendored Normal file

@@ -0,0 +1,5 @@
.env
__pycache__/
.DS_Store
report.md

54
canary/README.md Normal file

@@ -0,0 +1,54 @@
# Canary Crew
Welcome to the Canary Crew project, powered by [crewAI](https://crewai.com). This template is designed to help you set up a multi-agent AI system with ease, leveraging the powerful and flexible framework provided by crewAI. Our goal is to enable your agents to collaborate effectively on complex tasks, maximizing their collective intelligence and capabilities.
## Installation
Ensure you have Python >=3.10 <3.13 installed on your system. This project uses [UV](https://docs.astral.sh/uv/) for dependency management and package handling, offering a seamless setup and execution experience.
First, if you haven't already, install uv:
```bash
pip install uv
```
Next, navigate to your project directory and install the dependencies.
(Optional) Lock the dependencies and install them using the CLI command:
```bash
crewai install
```
### Customizing
**Add your `OPENAI_API_KEY` into the `.env` file**
- Modify `src/canary/config/agents.yaml` to define your agents
- Modify `src/canary/config/tasks.yaml` to define your tasks
- Modify `src/canary/crew.py` to add your own logic, tools and specific args
- Modify `src/canary/main.py` to add custom inputs for your agents and tasks
## Running the Project
To kickstart your crew of AI agents and begin task execution, run this from the root folder of your project:
```bash
$ crewai run
```
This command initializes the canary Crew, assembling the agents and assigning them tasks as defined in your configuration.
This example, unmodified, will create a `report.md` file in the root folder containing the output of research on LLMs.
## Understanding Your Crew
The canary Crew is composed of multiple AI agents, each with unique roles, goals, and tools. These agents collaborate on a series of tasks, defined in `config/tasks.yaml`, leveraging their collective skills to achieve complex objectives. The `config/agents.yaml` file outlines the capabilities and configurations of each agent in your crew.
## Support
For support, questions, or feedback regarding the Canary Crew or crewAI:
- Visit our [documentation](https://docs.crewai.com)
- Reach out to us through our [GitHub repository](https://github.com/joaomdmoura/crewai)
- [Join our Discord](https://discord.com/invite/X4JWnZnxPb)
- [Chat with our docs](https://chatg.pt/DWjSBZn)
Let's create wonders together with the power and simplicity of crewAI.


@@ -0,0 +1,4 @@
User name is John Doe.
User is an AI Engineer.
User is interested in AI Agents.
User is based in San Francisco, California.

23
canary/pyproject.toml Normal file

@@ -0,0 +1,23 @@
[project]
name = "canary"
version = "0.1.0"
description = "canary using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
    "crewai[tools]>=0.120.1,<1.0.0"
]

[project.scripts]
canary = "canary.main:run"
run_crew = "canary.main:run"
train = "canary.main:train"
replay = "canary.main:replay"
test = "canary.main:test"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.crewai]
type = "crew"


@@ -0,0 +1,19 @@
researcher:
  role: >
    {topic} Senior Data Researcher
  goal: >
    Uncover cutting-edge developments in {topic}
  backstory: >
    You're a seasoned researcher with a knack for uncovering the latest
    developments in {topic}. Known for your ability to find the most relevant
    information and present it in a clear and concise manner.

reporting_analyst:
  role: >
    {topic} Reporting Analyst
  goal: >
    Create detailed reports based on {topic} data analysis and research findings
  backstory: >
    You're a meticulous analyst with a keen eye for detail. You're known for
    your ability to turn complex data into clear and concise reports, making
    it easy for others to understand and act on the information you provide.


@@ -0,0 +1,17 @@
research_task:
  description: >
    Conduct a thorough research about {topic}
    Make sure you find any interesting and relevant information given
    the current year is {current_year}.
  expected_output: >
    A list with 10 bullet points of the most relevant information about {topic}
  agent: researcher

reporting_task:
  description: >
    Review the context you got and expand each topic into a full section for a report.
    Make sure the report is detailed and contains any and all relevant information.
  expected_output: >
    A fully fledged report with the main topics, each with a full section of information.
    Formatted as markdown without '```'
  agent: reporting_analyst

64
canary/src/canary/crew.py Normal file

@@ -0,0 +1,64 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
from crewai.agents.agent_builder.base_agent import BaseAgent
from typing import List

# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
# https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators


@CrewBase
class Canary():
    """Canary crew"""

    agents: List[BaseAgent]
    tasks: List[Task]

    # Learn more about YAML configuration files here:
    # Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
    # Tasks: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended

    # If you would like to add tools to your agents, you can learn more about it here:
    # https://docs.crewai.com/concepts/agents#agent-tools
    @agent
    def researcher(self) -> Agent:
        return Agent(
            config=self.agents_config['researcher'],  # type: ignore[index]
            verbose=True
        )

    @agent
    def reporting_analyst(self) -> Agent:
        return Agent(
            config=self.agents_config['reporting_analyst'],  # type: ignore[index]
            verbose=True
        )

    # To learn more about structured task outputs,
    # task dependencies, and task callbacks, check out the documentation:
    # https://docs.crewai.com/concepts/tasks#overview-of-a-task
    @task
    def research_task(self) -> Task:
        return Task(
            config=self.tasks_config['research_task'],  # type: ignore[index]
        )

    @task
    def reporting_task(self) -> Task:
        return Task(
            config=self.tasks_config['reporting_task'],  # type: ignore[index]
            output_file='report.md'
        )

    @crew
    def crew(self) -> Crew:
        """Creates the Canary crew"""
        # To learn how to add knowledge sources to your crew, check out the documentation:
        # https://docs.crewai.com/concepts/knowledge#what-is-knowledge
        return Crew(
            agents=self.agents,  # Automatically created by the @agent decorator
            tasks=self.tasks,  # Automatically created by the @task decorator
            process=Process.sequential,
            verbose=True,
            # process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
        )

68
canary/src/canary/main.py Normal file

@@ -0,0 +1,68 @@
#!/usr/bin/env python
import sys
import warnings

from datetime import datetime

from canary.crew import Canary

warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")

# This main file is intended to be a way for you to run your
# crew locally, so refrain from adding unnecessary logic into this file.

# Replace with inputs you want to test with, it will automatically
# interpolate any tasks and agents information


def run():
    """
    Run the crew.
    """
    inputs = {
        'topic': 'AI LLMs',
        'current_year': str(datetime.now().year)
    }

    try:
        Canary().crew().kickoff(inputs=inputs)
    except Exception as e:
        raise Exception(f"An error occurred while running the crew: {e}")


def train():
    """
    Train the crew for a given number of iterations.
    """
    inputs = {
        "topic": "AI LLMs",
        'current_year': str(datetime.now().year)
    }
    try:
        Canary().crew().train(n_iterations=int(sys.argv[1]), filename=sys.argv[2], inputs=inputs)
    except Exception as e:
        raise Exception(f"An error occurred while training the crew: {e}")


def replay():
    """
    Replay the crew execution from a specific task.
    """
    try:
        Canary().crew().replay(task_id=sys.argv[1])
    except Exception as e:
        raise Exception(f"An error occurred while replaying the crew: {e}")


def test():
    """
    Test the crew execution and returns the results.
    """
    inputs = {
        "topic": "AI LLMs",
        "current_year": str(datetime.now().year)
    }

    try:
        Canary().crew().test(n_iterations=int(sys.argv[1]), eval_llm=sys.argv[2], inputs=inputs)
    except Exception as e:
        raise Exception(f"An error occurred while testing the crew: {e}")


@@ -0,0 +1,19 @@
from crewai.tools import BaseTool
from typing import Type
from pydantic import BaseModel, Field


class MyCustomToolInput(BaseModel):
    """Input schema for MyCustomTool."""

    argument: str = Field(..., description="Description of the argument.")


class MyCustomTool(BaseTool):
    name: str = "Name of my tool"
    description: str = (
        "Clear description for what this tool is useful for, your agent will need this information to use it."
    )
    args_schema: Type[BaseModel] = MyCustomToolInput

    def _run(self, argument: str) -> str:
        # Implementation goes here
        return "this is an example of a tool output, ignore it and move along."

3513
canary/uv.lock generated Normal file

File diff suppressed because it is too large.


@@ -1,4 +1,6 @@
import json
import tempfile
from logging import getLogger
from pathlib import Path
from pydantic import BaseModel, Field
@@ -12,8 +14,48 @@ from crewai.cli.constants import (
)
from crewai.cli.shared.token_manager import TokenManager
logger = getLogger(__name__)
DEFAULT_CONFIG_PATH = Path.home() / ".config" / "crewai" / "settings.json"
def get_writable_config_path() -> Path | None:
"""
Find a writable location for the config file with fallback options.
Tries in order:
1. Default: ~/.config/crewai/settings.json
2. Temp directory: /tmp/crewai_settings.json (or OS equivalent)
3. Current directory: ./crewai_settings.json
4. In-memory only (returns None)
Returns:
Path object for writable config location, or None if no writable location found
"""
fallback_paths = [
DEFAULT_CONFIG_PATH, # Default location
Path(tempfile.gettempdir()) / "crewai_settings.json", # Temporary directory
Path.cwd() / "crewai_settings.json", # Current working directory
]
for config_path in fallback_paths:
try:
config_path.parent.mkdir(parents=True, exist_ok=True)
test_file = config_path.parent / ".crewai_write_test"
try:
test_file.write_text("test")
test_file.unlink() # Clean up test file
logger.info(f"Using config path: {config_path}")
return config_path
except Exception: # noqa: S112
continue
except Exception: # noqa: S112
continue
return None
# Settings that are related to the user's account
USER_SETTINGS_KEYS = [
"tool_repository_username",
@@ -93,16 +135,32 @@ class Settings(BaseModel):
default=DEFAULT_CLI_SETTINGS["oauth2_domain"],
)
def __init__(self, config_path: Path = DEFAULT_CONFIG_PATH, **data):
"""Load Settings from config path"""
config_path.parent.mkdir(parents=True, exist_ok=True)
def __init__(self, config_path: Path | None = None, **data):
"""Load Settings from config path with fallback support"""
if config_path is None:
config_path = get_writable_config_path()
# If config_path is None, we're in memory-only mode
if config_path is None:
merged_data = {**data}
# Dummy path for memory-only mode
super().__init__(config_path=Path("/dev/null"), **merged_data)
return
try:
config_path.parent.mkdir(parents=True, exist_ok=True)
except Exception:
merged_data = {**data}
# Dummy path for memory-only mode
super().__init__(config_path=Path("/dev/null"), **merged_data)
return
file_data = {}
if config_path.is_file():
try:
with config_path.open("r") as f:
file_data = json.load(f)
except json.JSONDecodeError:
except Exception:
file_data = {}
merged_data = {**file_data, **data}
@@ -122,15 +180,22 @@ class Settings(BaseModel):
def dump(self) -> None:
"""Save current settings to settings.json"""
if self.config_path.is_file():
with self.config_path.open("r") as f:
existing_data = json.load(f)
else:
existing_data = {}
if str(self.config_path) == "/dev/null":
return
updated_data = {**existing_data, **self.model_dump(exclude_unset=True)}
with self.config_path.open("w") as f:
json.dump(updated_data, f, indent=4)
try:
if self.config_path.is_file():
with self.config_path.open("r") as f:
existing_data = json.load(f)
else:
existing_data = {}
updated_data = {**existing_data, **self.model_dump(exclude_unset=True)}
with self.config_path.open("w") as f:
json.dump(updated_data, f, indent=4)
except Exception: # noqa: S110
pass
def _reset_user_settings(self) -> None:
"""Reset all user settings to default values"""


@@ -367,6 +367,7 @@ class LiteAgent(FlowTrackable, BaseModel):
output=output,
guardrail=self._guardrail,
retry_count=self._guardrail_retry_count,
event_source=self,
)
if not guardrail_result.success:


@@ -5,20 +5,14 @@ import logging
import threading
import uuid
import warnings
from collections.abc import Callable
from concurrent.futures import Future
from copy import copy
from hashlib import md5
from pathlib import Path
from typing import (
Any,
Callable,
ClassVar,
Dict,
List,
Optional,
Set,
Tuple,
Type,
Union,
get_args,
get_origin,
@@ -35,20 +29,20 @@ from pydantic import (
from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_types import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.security import Fingerprint, SecurityConfig
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.guardrail import process_guardrail, GuardrailResult
from crewai.utilities.converter import Converter, convert_to_model
from crewai.events.event_types import (
TaskCompletedEvent,
TaskFailedEvent,
TaskStartedEvent,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.utilities.guardrail import process_guardrail
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.string_utils import interpolate_only
@@ -85,50 +79,50 @@ class Task(BaseModel):
tools_errors: int = 0
delegations: int = 0
i18n: I18N = I18N()
name: Optional[str] = Field(default=None)
prompt_context: Optional[str] = None
name: str | None = Field(default=None)
prompt_context: str | None = None
description: str = Field(description="Description of the actual task.")
expected_output: str = Field(
description="Clear definition of expected output for the task."
)
config: Optional[Dict[str, Any]] = Field(
config: dict[str, Any] | None = Field(
description="Configuration for the agent",
default=None,
)
callback: Optional[Any] = Field(
callback: Any | None = Field(
description="Callback to be executed after the task is completed.", default=None
)
agent: Optional[BaseAgent] = Field(
agent: BaseAgent | None = Field(
description="Agent responsible for execution the task.", default=None
)
context: Union[List["Task"], None, _NotSpecified] = Field(
context: list["Task"] | None | _NotSpecified = Field(
description="Other tasks that will have their output used as context for this task.",
default=NOT_SPECIFIED,
)
async_execution: Optional[bool] = Field(
async_execution: bool | None = Field(
description="Whether the task should be executed asynchronously or not.",
default=False,
)
output_json: Optional[Type[BaseModel]] = Field(
output_json: type[BaseModel] | None = Field(
description="A Pydantic model to be used to create a JSON output.",
default=None,
)
output_pydantic: Optional[Type[BaseModel]] = Field(
output_pydantic: type[BaseModel] | None = Field(
description="A Pydantic model to be used to create a Pydantic output.",
default=None,
)
output_file: Optional[str] = Field(
output_file: str | None = Field(
description="A file path to be used to create a file output.",
default=None,
)
create_directory: Optional[bool] = Field(
create_directory: bool | None = Field(
description="Whether to create the directory for output_file if it doesn't exist.",
default=True,
)
output: Optional[TaskOutput] = Field(
output: TaskOutput | None = Field(
description="Task output, it's final result after being executed", default=None
)
tools: Optional[List[BaseTool]] = Field(
tools: list[BaseTool] | None = Field(
default_factory=list,
description="Tools the agent is limited to use for this task.",
)
@@ -141,24 +135,24 @@ class Task(BaseModel):
frozen=True,
description="Unique identifier for the object, not set by user.",
)
human_input: Optional[bool] = Field(
human_input: bool | None = Field(
description="Whether the task should have a human review the final answer of the agent",
default=False,
)
markdown: Optional[bool] = Field(
markdown: bool | None = Field(
description="Whether the task should instruct the agent to return the final answer formatted in Markdown",
default=False,
)
converter_cls: Optional[Type[Converter]] = Field(
converter_cls: type[Converter] | None = Field(
description="A converter class used to export structured output",
default=None,
)
processed_by_agents: Set[str] = Field(default_factory=set)
guardrail: Optional[Union[Callable[[TaskOutput], Tuple[bool, Any]], str]] = Field(
processed_by_agents: set[str] = Field(default_factory=set)
guardrail: Callable[[TaskOutput], tuple[bool, Any]] | str | None = Field(
default=None,
description="Function or string description of a guardrail to validate task output before proceeding to next task",
)
max_retries: Optional[int] = Field(
max_retries: int | None = Field(
default=None,
description="[DEPRECATED] Maximum number of retries when guardrail fails. Use guardrail_max_retries instead. Will be removed in v1.0.0",
)
@@ -166,13 +160,13 @@ class Task(BaseModel):
default=3, description="Maximum number of retries when guardrail fails"
)
retry_count: int = Field(default=0, description="Current number of retries")
start_time: Optional[datetime.datetime] = Field(
start_time: datetime.datetime | None = Field(
default=None, description="Start time of the task execution"
)
end_time: Optional[datetime.datetime] = Field(
end_time: datetime.datetime | None = Field(
default=None, description="End time of the task execution"
)
allow_crewai_trigger_context: Optional[bool] = Field(
allow_crewai_trigger_context: bool | None = Field(
default=None,
description="Whether this task should append 'Trigger Payload: {crewai_trigger_payload}' to the task description when crewai_trigger_payload exists in crew inputs.",
)
@@ -181,8 +175,8 @@ class Task(BaseModel):
@field_validator("guardrail")
@classmethod
def validate_guardrail_function(
cls, v: Optional[str | Callable]
) -> Optional[str | Callable]:
cls, v: str | Callable | None
) -> str | Callable | None:
"""
If v is a callable, validate that the guardrail function has the correct signature and behavior.
If v is a string, return it as is.
@@ -229,7 +223,7 @@ class Task(BaseModel):
return_annotation_args[1] is Any
or return_annotation_args[1] is str
or return_annotation_args[1] is TaskOutput
or return_annotation_args[1] == Union[str, TaskOutput]
or return_annotation_args[1] == str | TaskOutput
)
):
raise ValueError(
@@ -237,11 +231,11 @@ class Task(BaseModel):
)
return v
_guardrail: Optional[Callable] = PrivateAttr(default=None)
_original_description: Optional[str] = PrivateAttr(default=None)
_original_expected_output: Optional[str] = PrivateAttr(default=None)
_original_output_file: Optional[str] = PrivateAttr(default=None)
_thread: Optional[threading.Thread] = PrivateAttr(default=None)
_guardrail: Callable | None = PrivateAttr(default=None)
_original_description: str | None = PrivateAttr(default=None)
_original_expected_output: str | None = PrivateAttr(default=None)
_original_output_file: str | None = PrivateAttr(default=None)
_thread: threading.Thread | None = PrivateAttr(default=None)
@model_validator(mode="before")
@classmethod
@@ -265,7 +259,9 @@ class Task(BaseModel):
elif isinstance(self.guardrail, str):
from crewai.tasks.llm_guardrail import LLMGuardrail
assert self.agent is not None
if self.agent is None:
raise ValueError("Agent is required to use LLMGuardrail")
self._guardrail = LLMGuardrail(
description=self.guardrail, llm=self.agent.llm
)
@@ -274,7 +270,7 @@ class Task(BaseModel):
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
def _deny_user_set_id(cls, v: UUID4 | None) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
@@ -282,7 +278,7 @@ class Task(BaseModel):
@field_validator("output_file")
@classmethod
def output_file_validation(cls, value: Optional[str]) -> Optional[str]:
def output_file_validation(cls, value: str | None) -> str | None:
"""Validate the output file path.
Args:
@@ -307,7 +303,7 @@ class Task(BaseModel):
)
# Check for shell expansion first
if value.startswith("~") or value.startswith("$"):
if value.startswith(("~", "$")):
raise ValueError(
"Shell expansion characters are not allowed in output_file paths"
)
@@ -373,9 +369,9 @@ class Task(BaseModel):
def execute_sync(
self,
agent: Optional[BaseAgent] = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
agent: BaseAgent | None = None,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> TaskOutput:
"""Execute the task synchronously."""
return self._execute_core(agent, context, tools)
@@ -397,8 +393,8 @@ class Task(BaseModel):
def execute_async(
self,
agent: BaseAgent | None = None,
context: Optional[str] = None,
tools: Optional[List[BaseTool]] = None,
context: str | None = None,
tools: list[BaseTool] | None = None,
) -> Future[TaskOutput]:
"""Execute the task asynchronously."""
future: Future[TaskOutput] = Future()
@@ -411,9 +407,9 @@ class Task(BaseModel):
def _execute_task_async(
self,
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
agent: BaseAgent | None,
context: str | None,
tools: list[Any] | None,
future: Future[TaskOutput],
) -> None:
"""Execute the task asynchronously with context handling."""
@@ -422,9 +418,9 @@ class Task(BaseModel):
def _execute_core(
self,
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
agent: BaseAgent | None,
context: str | None,
tools: list[Any] | None,
) -> TaskOutput:
"""Run the core execution logic of the task."""
try:
@@ -465,6 +461,7 @@ class Task(BaseModel):
output=task_output,
guardrail=self._guardrail,
retry_count=self.retry_count,
event_source=self,
)
if not guardrail_result.success:
if self.retry_count >= self.guardrail_max_retries:
@@ -528,41 +525,6 @@ class Task(BaseModel):
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self))
raise e # Re-raise the exception after emitting the event
def _process_guardrail(self, task_output: TaskOutput) -> GuardrailResult:
assert self._guardrail is not None
from crewai.events.event_types import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from crewai.events.event_bus import crewai_event_bus
crewai_event_bus.emit(
self,
LLMGuardrailStartedEvent(
guardrail=self._guardrail, retry_count=self.retry_count
),
)
try:
result = self._guardrail(task_output)
guardrail_result = GuardrailResult.from_tuple(result)
except Exception as e:
guardrail_result = GuardrailResult(
success=False, result=None, error=f"Guardrail execution error: {str(e)}"
)
crewai_event_bus.emit(
self,
LLMGuardrailCompletedEvent(
success=guardrail_result.success,
result=guardrail_result.result,
error=guardrail_result.error,
retry_count=self.retry_count,
),
)
return guardrail_result
def prompt(self) -> str:
"""Generates the task prompt with optional markdown formatting.
@@ -604,7 +566,7 @@ Follow these guidelines:
return "\n".join(tasks_slices)
def interpolate_inputs_and_add_conversation_history(
self, inputs: Dict[str, Union[str, int, float, Dict[str, Any], List[Any]]]
self, inputs: dict[str, str | int | float | dict[str, Any] | list[Any]]
) -> None:
"""Interpolate inputs into the task description, expected output, and output file path.
Add conversation history if present.
@@ -635,14 +597,14 @@ Follow these guidelines:
f"Missing required template variable '{e.args[0]}' in description"
) from e
except ValueError as e:
raise ValueError(f"Error interpolating description: {str(e)}") from e
raise ValueError(f"Error interpolating description: {e!s}") from e
try:
self.expected_output = interpolate_only(
input_string=self._original_expected_output, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(f"Error interpolating expected_output: {str(e)}") from e
raise ValueError(f"Error interpolating expected_output: {e!s}") from e
if self.output_file is not None:
try:
@@ -650,11 +612,9 @@ Follow these guidelines:
input_string=self._original_output_file, inputs=inputs
)
except (KeyError, ValueError) as e:
raise ValueError(
f"Error interpolating output_file path: {str(e)}"
) from e
raise ValueError(f"Error interpolating output_file path: {e!s}") from e
if "crew_chat_messages" in inputs and inputs["crew_chat_messages"]:
if inputs.get("crew_chat_messages"):
conversation_instruction = self.i18n.slice(
"conversation_history_instruction"
)
@@ -681,14 +641,14 @@ Follow these guidelines:
"""Increment the tools errors counter."""
self.tools_errors += 1
def increment_delegations(self, agent_name: Optional[str]) -> None:
def increment_delegations(self, agent_name: str | None) -> None:
"""Increment the delegations counter."""
if agent_name:
self.processed_by_agents.add(agent_name)
self.delegations += 1
def copy(
self, agents: List["BaseAgent"], task_mapping: Dict[str, "Task"]
def copy( # type: ignore
self, agents: list["BaseAgent"], task_mapping: dict[str, "Task"]
) -> "Task":
"""Creates a deep copy of the Task while preserving its original class type.
@@ -721,20 +681,18 @@ Follow these guidelines:
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = copy(self.tools) if self.tools else []
copied_task = self.__class__(
return self.__class__(
**copied_data,
context=cloned_context,
agent=cloned_agent,
tools=cloned_tools,
)
return copied_task
def _export_output(
self, result: str
) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]:
pydantic_output: Optional[BaseModel] = None
json_output: Optional[Dict[str, Any]] = None
) -> tuple[BaseModel | None, dict[str, Any] | None]:
pydantic_output: BaseModel | None = None
json_output: dict[str, Any] | None = None
if self.output_pydantic or self.output_json:
model_output = convert_to_model(
@@ -764,7 +722,7 @@ Follow these guidelines:
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _save_file(self, result: Union[Dict, str, Any]) -> None:
def _save_file(self, result: dict | str | Any) -> None:
"""Save task output to a file.
Note:
@@ -785,7 +743,7 @@ Follow these guidelines:
if self.output_file is None:
raise ValueError("output_file is not set.")
FILEWRITER_RECOMMENDATION = (
filewriter_recommendation = (
"For cross-platform file writing, especially on Windows, "
"use FileWriterTool from crewai_tools package."
)
@@ -811,10 +769,10 @@ Follow these guidelines:
except (OSError, IOError) as e:
raise RuntimeError(
"\n".join(
[f"Failed to save output file: {e}", FILEWRITER_RECOMMENDATION]
[f"Failed to save output file: {e}", filewriter_recommendation]
)
)
return None
) from e
return
def __repr__(self):
return f"Task(description={self.description}, expected_output={self.expected_output})"


@@ -1,4 +1,5 @@
from typing import Any, Callable, Optional, Tuple, Union
from collections.abc import Callable
from typing import Any
from pydantic import BaseModel, field_validator
@@ -17,8 +18,8 @@ class GuardrailResult(BaseModel):
"""
success: bool
result: Optional[Any] = None
error: Optional[str] = None
result: Any | None = None
error: str | None = None
@field_validator("result", "error")
@classmethod
@@ -36,7 +37,7 @@ class GuardrailResult(BaseModel):
return v
@classmethod
def from_tuple(cls, result: Tuple[bool, Union[Any, str]]) -> "GuardrailResult":
def from_tuple(cls, result: tuple[bool, Any | str]) -> "GuardrailResult":
"""Create a GuardrailResult from a validation tuple.
Args:
@@ -55,33 +56,27 @@ class GuardrailResult(BaseModel):
def process_guardrail(
output: Any, guardrail: Callable, retry_count: int
output: Any, guardrail: Callable, retry_count: int, event_source: Any | None = None
) -> GuardrailResult:
"""Process the guardrail for the agent output.
Args:
output: The output to validate with the guardrail
guardrail: The guardrail to validate the output with
retry_count: The number of times the guardrail has been retried
event_source: The source of the guardrail to be sent in events
Returns:
GuardrailResult: The result of the guardrail validation
"""
from crewai.task import TaskOutput
from crewai.lite_agent import LiteAgentOutput
assert isinstance(output, TaskOutput) or isinstance(
output, LiteAgentOutput
), "Output must be a TaskOutput or LiteAgentOutput"
assert guardrail is not None
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.llm_guardrail_events import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from crewai.events.event_bus import crewai_event_bus
crewai_event_bus.emit(
None,
event_source,
LLMGuardrailStartedEvent(guardrail=guardrail, retry_count=retry_count),
)
@@ -89,7 +84,7 @@ def process_guardrail(
guardrail_result = GuardrailResult.from_tuple(result)
crewai_event_bus.emit(
None,
event_source,
LLMGuardrailCompletedEvent(
success=guardrail_result.success,
result=guardrail_result.result,


@@ -259,7 +259,7 @@ class AgentReasoning:
)
# Prepare a simple callable that just returns the tool arguments as JSON
def _create_reasoning_plan(plan: str, ready: bool): # noqa: N802
def _create_reasoning_plan(plan: str, ready: bool = True): # noqa: N802
"""Return the reasoning plan result in JSON string form."""
return json.dumps({"plan": plan, "ready": ready})


@@ -1,4 +1,3 @@
# ruff: noqa: S101
# mypy: ignore-errors
from collections import defaultdict
from typing import cast
@@ -329,23 +328,27 @@ def test_guardrail_is_called_using_string():
LLMGuardrailStartedEvent,
)
agent = Agent(
role="Sports Analyst",
goal="Gather information about the best soccer players",
backstory="""You are an expert at gathering and organizing information. You carefully collect details and present them in a structured way.""",
guardrail="""Only include Brazilian players, both women and men""",
)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMGuardrailStartedEvent)
def capture_guardrail_started(source, event):
assert isinstance(source, LiteAgent)
assert source.original_agent == agent
guardrail_events["started"].append(event)
@crewai_event_bus.on(LLMGuardrailCompletedEvent)
def capture_guardrail_completed(source, event):
assert isinstance(source, LiteAgent)
assert source.original_agent == agent
guardrail_events["completed"].append(event)
agent = Agent(
role="Sports Analyst",
goal="Gather information about the best soccer players",
backstory="""You are an expert at gathering and organizing information. You carefully collect details and present them in a structured way.""",
guardrail="""Only include Brazilian players, both women and men""",
)
result = agent.kickoff(messages="Top 10 best players in the world?")
assert len(guardrail_events["started"]) == 2


@@ -3,15 +3,15 @@ from unittest.mock import Mock, patch
import pytest
from crewai import Agent, Task
from crewai.llm import LLM
from crewai.tasks.hallucination_guardrail import HallucinationGuardrail
from crewai.tasks.llm_guardrail import LLMGuardrail
from crewai.tasks.task_output import TaskOutput
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_types import (
LLMGuardrailCompletedEvent,
LLMGuardrailStartedEvent,
)
from crewai.events.event_bus import crewai_event_bus
from crewai.llm import LLM
from crewai.tasks.hallucination_guardrail import HallucinationGuardrail
from crewai.tasks.llm_guardrail import LLMGuardrail
from crewai.tasks.task_output import TaskOutput
def test_task_without_guardrail():
@@ -177,16 +177,25 @@ def test_guardrail_emits_events(sample_agent):
started_guardrail = []
completed_guardrail = []
task = Task(
description="Gather information about available books on the First World War",
agent=sample_agent,
expected_output="A list of available books on the First World War",
guardrail="Ensure the authors are from Italy",
)
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMGuardrailStartedEvent)
def handle_guardrail_started(source, event):
assert source == task
started_guardrail.append(
{"guardrail": event.guardrail, "retry_count": event.retry_count}
)
@crewai_event_bus.on(LLMGuardrailCompletedEvent)
def handle_guardrail_completed(source, event):
assert source == task
completed_guardrail.append(
{
"success": event.success,
@@ -196,13 +205,6 @@ def test_guardrail_emits_events(sample_agent):
}
)
task = Task(
description="Gather information about available books on the First World War",
agent=sample_agent,
expected_output="A list of available books on the First World War",
guardrail="Ensure the authors are from Italy",
)
result = task.execute_sync(agent=sample_agent)
def custom_guardrail(result: TaskOutput):