Compare commits

...

12 Commits

Author SHA1 Message Date
Devin AI
32a013eb2f Fix trailing whitespace to satisfy ruff lint
Co-Authored-By: João <joao@crewai.com>
2025-10-20 19:30:30 +00:00
Devin AI
36673f89e7 Fix google-generativeai embedder validation error (issue #3741)
This commit fixes the validation error that occurred when using the
google-generativeai embedder provider with a flat configuration format.

Changes:
1. Made the 'config' field optional in GenerativeAiProviderSpec by adding
   'total=False' and marking 'provider' as Required, consistent with other
   provider specs like VertexAIProviderSpec.

2. Added normalization in the Crew class to automatically convert flat
   embedder configs to nested format before validation. This allows users
   to use either format:
   - Flat: {'provider': 'google-generativeai', 'api_key': '...', 'model_name': '...'}
   - Nested: {'provider': 'google-generativeai', 'config': {'api_key': '...', 'model_name': '...'}}

3. Updated the embedder factory to support both flat and nested config
   formats by checking for the presence of 'config' key and extracting
   config fields accordingly.

4. Added comprehensive tests to verify both formats work correctly:
   - Test for flat config format (the issue reported in #3741)
   - Test for nested config format (recommended format)
   - Test for TypedDict validation

Fixes #3741

Co-Authored-By: João <joao@crewai.com>
2025-10-20 19:26:08 +00:00
Greyson LaLonde
42f2b4d551 fix: preserve nested condition structure in Flow decorators
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Update Test Durations / update-durations (3.10) (push) Has been cancelled
Update Test Durations / update-durations (3.11) (push) Has been cancelled
Update Test Durations / update-durations (3.12) (push) Has been cancelled
Update Test Durations / update-durations (3.13) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Fixes nested boolean conditions being flattened in @listen, @start, and @router decorators. The or_() and and_() combinators now preserve their nested structure using a "conditions" key instead of flattening to a list. Added recursive evaluation logic to properly handle complex patterns like or_(and_(A, B), and_(C, D)).
2025-10-17 17:06:19 -04:00
Greyson LaLonde
0229390ad1 fix: add standard print parameters to Printer.print method
- Adds sep, end, file, and flush parameters to match Python's built-in print function signature.
2025-10-17 15:27:22 -04:00
Vidit Ostwal
f0fb349ddf Fixing copy and adding NOT_SPECIFIED check in task.py (#3690)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Update Test Durations / update-durations (3.10) (push) Has been cancelled
Update Test Durations / update-durations (3.11) (push) Has been cancelled
Update Test Durations / update-durations (3.12) (push) Has been cancelled
Update Test Durations / update-durations (3.13) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
* Fixing copy and adding NOT_SPECIFIED check:

* Fixed mypy issues

* Added test Cases

* added linting checks

* Removed the docs bot folder

* Fixed ruff checks

* Remove secret_folder from tracking

---------

Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-10-14 09:52:39 -07:00
João Moura
bf2e2a42da fix: don't error out if there it no input() available
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
- Specific to jupyter notebooks
2025-10-13 22:36:19 -04:00
Lorenze Jay
814c962196 chore: update crewAI version to 0.203.1 in multiple templates (#3699)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Update Test Durations / update-durations (3.10) (push) Has been cancelled
Update Test Durations / update-durations (3.11) (push) Has been cancelled
Update Test Durations / update-durations (3.12) (push) Has been cancelled
Update Test Durations / update-durations (3.13) (push) Has been cancelled
- Bumped the `crewai` version in `__init__.py` to 0.203.1.
- Updated the dependency versions in the crew, flow, and tool templates' `pyproject.toml` files to reflect the new `crewai` version.
2025-10-13 11:46:22 -07:00
Heitor Carvalho
2ebb2e845f fix: add a leeway of 10s when decoding jwt (#3698) 2025-10-13 12:42:03 -03:00
Greyson LaLonde
7b550ebfe8 fix: inject tool repository credentials in crewai run command
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2025-10-10 15:00:04 -04:00
Greyson LaLonde
29919c2d81 fix: revert bad cron sched
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
This reverts commit b71c88814f.
2025-10-09 13:52:25 -04:00
Greyson LaLonde
b71c88814f fix: correct cron schedule to run every 5 days at specific dates 2025-10-09 13:10:45 -04:00
Rip&Tear
cb8bcfe214 docs: update security policy for vulnerability reporting
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
- Revised the security policy to clarify the reporting process for vulnerabilities.
- Added detailed sections on scope, reporting requirements, and our commitment to addressing reported issues.
- Emphasized the importance of not disclosing vulnerabilities publicly and provided guidance on how to report them securely.
- Included a new section on coordinated disclosure and safe harbor provisions for ethical reporting.

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2025-10-09 00:57:57 -04:00
20 changed files with 4003 additions and 3579 deletions

63
.github/security.md vendored
View File

@@ -1,27 +1,50 @@
## CrewAI Security Vulnerability Reporting Policy
## CrewAI Security Policy
CrewAI prioritizes the security of our software products, services, and GitHub repositories. To promptly address vulnerabilities, follow these steps for reporting security issues:
We are committed to protecting the confidentiality, integrity, and availability of the CrewAI ecosystem. This policy explains how to report potential vulnerabilities and what you can expect from us when you do.
### Reporting Process
Do **not** report vulnerabilities via public GitHub issues.
### Scope
Email all vulnerability reports directly to:
**security@crewai.com**
We welcome reports for vulnerabilities that could impact:
### Required Information
To help us quickly validate and remediate the issue, your report must include:
- CrewAI-maintained source code and repositories
- CrewAI-operated infrastructure and services
- Official CrewAI releases, packages, and distributions
- **Vulnerability Type:** Clearly state the vulnerability type (e.g., SQL injection, XSS, privilege escalation).
- **Affected Source Code:** Provide full file paths and direct URLs (branch, tag, or commit).
- **Reproduction Steps:** Include detailed, step-by-step instructions. Screenshots are recommended.
- **Special Configuration:** Document any special settings or configurations required to reproduce.
- **Proof-of-Concept (PoC):** Provide exploit or PoC code (if available).
- **Impact Assessment:** Clearly explain the severity and potential exploitation scenarios.
Issues affecting clearly unaffiliated third-party services or user-generated content are out of scope, unless you can demonstrate a direct impact on CrewAI systems or customers.
### Our Response
- We will acknowledge receipt of your report promptly via your provided email.
- Confirmed vulnerabilities will receive priority remediation based on severity.
- Patches will be released as swiftly as possible following verification.
### How to Report
### Reward Notice
Currently, we do not offer a bug bounty program. Rewards, if issued, are discretionary.
- **Please do not** disclose vulnerabilities via public GitHub issues, pull requests, or social media.
- Email detailed reports to **security@crewai.com** with the subject line `Security Report`.
- If you need to share large files or sensitive artifacts, mention it in your email and we will coordinate a secure transfer method.
### What to Include
Providing comprehensive information enables us to validate the issue quickly:
- **Vulnerability overview** — a concise description and classification (e.g., RCE, privilege escalation)
- **Affected components** — repository, branch, tag, or deployed service along with relevant file paths or endpoints
- **Reproduction steps** — detailed, step-by-step instructions; include logs, screenshots, or screen recordings when helpful
- **Proof-of-concept** — exploit details or code that demonstrates the impact (if available)
- **Impact analysis** — severity assessment, potential exploitation scenarios, and any prerequisites or special configurations
### Our Commitment
- **Acknowledgement:** We aim to acknowledge your report within two business days.
- **Communication:** We will keep you informed about triage results, remediation progress, and planned release timelines.
- **Resolution:** Confirmed vulnerabilities will be prioritized based on severity and fixed as quickly as possible.
- **Recognition:** We currently do not run a bug bounty program; any rewards or recognition are issued at CrewAI's discretion.
### Coordinated Disclosure
We ask that you allow us a reasonable window to investigate and remediate confirmed issues before any public disclosure. We will coordinate publication timelines with you whenever possible.
### Safe Harbor
We will not pursue or support legal action against individuals who, in good faith:
- Follow this policy and refrain from violating any applicable laws
- Avoid privacy violations, data destruction, or service disruption
- Limit testing to systems in scope and respect rate limits and terms of service
If you are unsure whether your testing is covered, please contact us at **security@crewai.com** before proceeding.

View File

@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "0.203.0"
__version__ = "0.203.1"
_telemetry_submitted = False

View File

@@ -30,6 +30,7 @@ def validate_jwt_token(
algorithms=["RS256"],
audience=audience,
issuer=issuer,
leeway=10.0,
options={
"verify_signature": True,
"verify_exp": True,

View File

@@ -1,10 +1,11 @@
import os
import subprocess
from enum import Enum
import click
from packaging import version
from crewai.cli.utils import read_toml
from crewai.cli.utils import build_env_with_tool_repository_credentials, read_toml
from crewai.cli.version import get_crewai_version
@@ -55,8 +56,22 @@ def execute_command(crew_type: CrewType) -> None:
"""
command = ["uv", "run", "kickoff" if crew_type == CrewType.FLOW else "run_crew"]
env = os.environ.copy()
try:
subprocess.run(command, capture_output=False, text=True, check=True) # noqa: S603
pyproject_data = read_toml()
sources = pyproject_data.get("tool", {}).get("uv", {}).get("sources", {})
for source_config in sources.values():
if isinstance(source_config, dict):
index = source_config.get("index")
if index:
index_env = build_env_with_tool_repository_credentials(index)
env.update(index_env)
except Exception: # noqa: S110
pass
try:
subprocess.run(command, capture_output=False, text=True, check=True, env=env) # noqa: S603
except subprocess.CalledProcessError as e:
handle_error(e, crew_type)

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.203.0,<1.0.0"
"crewai[tools]>=0.203.1,<1.0.0"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.203.0,<1.0.0",
"crewai[tools]>=0.203.1,<1.0.0",
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]>=0.203.0"
"crewai[tools]>=0.203.1"
]
[tool.crewai]

View File

@@ -283,6 +283,30 @@ class Crew(FlowTrackable, BaseModel):
"may_not_set_field", "The 'id' field cannot be set by the user.", {}
)
@field_validator("embedder", mode="before")
@classmethod
def normalize_embedder_config(
cls, v: dict[str, Any] | None
) -> dict[str, Any] | None:
"""Normalize embedder config to support both flat and nested formats.
Args:
v: The embedder config to be normalized.
Returns:
The normalized embedder config with nested structure.
"""
if v is None or not isinstance(v, dict):
return v
if "provider" in v and "config" not in v:
provider = v["provider"]
config_fields = {k: val for k, val in v.items() if k != "provider"}
if config_fields:
return {"provider": provider, "config": config_fields}
return v
@field_validator("config", mode="before")
@classmethod
def check_config_type(cls, v: Json | dict[str, Any]) -> Json | dict[str, Any]:

View File

@@ -358,7 +358,8 @@ def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
try:
response = input().strip().lower()
result[0] = response in ["y", "yes"]
except (EOFError, KeyboardInterrupt):
except (EOFError, KeyboardInterrupt, OSError, LookupError):
# Handle all input-related errors silently
result[0] = False
input_thread = threading.Thread(target=get_input, daemon=True)
@@ -371,6 +372,7 @@ def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool:
return result[0]
except Exception:
# Suppress any warnings or errors and assume "no"
return False

View File

@@ -31,7 +31,7 @@ from crewai.flow.flow_visualizer import plot_flow
from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.types import FlowExecutionData
from crewai.flow.utils import get_possible_return_constants
from crewai.utilities.printer import Printer
from crewai.utilities.printer import Printer, PrinterColor
logger = logging.getLogger(__name__)
@@ -105,7 +105,7 @@ def start(condition: str | dict | Callable | None = None) -> Callable:
condition : Optional[Union[str, dict, Callable]], optional
Defines when the start method should execute. Can be:
- str: Name of a method that triggers this start
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
- dict: Result from or_() or and_(), including nested conditions
- Callable: A method reference that triggers this start
Default is None, meaning unconditional start.
@@ -140,13 +140,18 @@ def start(condition: str | dict | Callable | None = None) -> Callable:
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
elif (
isinstance(condition, dict)
and "type" in condition
and "methods" in condition
):
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
elif isinstance(condition, dict) and "type" in condition:
if "conditions" in condition:
func.__trigger_condition__ = condition
func.__trigger_methods__ = _extract_all_methods(condition)
func.__condition_type__ = condition["type"]
elif "methods" in condition:
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
else:
raise ValueError(
"Condition dict must contain 'conditions' or 'methods'"
)
elif callable(condition) and hasattr(condition, "__name__"):
func.__trigger_methods__ = [condition.__name__]
func.__condition_type__ = "OR"
@@ -172,7 +177,7 @@ def listen(condition: str | dict | Callable) -> Callable:
condition : Union[str, dict, Callable]
Specifies when the listener should execute. Can be:
- str: Name of a method that triggers this listener
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
- dict: Result from or_() or and_(), including nested conditions
- Callable: A method reference that triggers this listener
Returns
@@ -200,13 +205,18 @@ def listen(condition: str | dict | Callable) -> Callable:
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
elif (
isinstance(condition, dict)
and "type" in condition
and "methods" in condition
):
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
elif isinstance(condition, dict) and "type" in condition:
if "conditions" in condition:
func.__trigger_condition__ = condition
func.__trigger_methods__ = _extract_all_methods(condition)
func.__condition_type__ = condition["type"]
elif "methods" in condition:
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
else:
raise ValueError(
"Condition dict must contain 'conditions' or 'methods'"
)
elif callable(condition) and hasattr(condition, "__name__"):
func.__trigger_methods__ = [condition.__name__]
func.__condition_type__ = "OR"
@@ -233,7 +243,7 @@ def router(condition: str | dict | Callable) -> Callable:
condition : Union[str, dict, Callable]
Specifies when the router should execute. Can be:
- str: Name of a method that triggers this router
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
- dict: Result from or_() or and_(), including nested conditions
- Callable: A method reference that triggers this router
Returns
@@ -266,13 +276,18 @@ def router(condition: str | dict | Callable) -> Callable:
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
elif (
isinstance(condition, dict)
and "type" in condition
and "methods" in condition
):
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
elif isinstance(condition, dict) and "type" in condition:
if "conditions" in condition:
func.__trigger_condition__ = condition
func.__trigger_methods__ = _extract_all_methods(condition)
func.__condition_type__ = condition["type"]
elif "methods" in condition:
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
else:
raise ValueError(
"Condition dict must contain 'conditions' or 'methods'"
)
elif callable(condition) and hasattr(condition, "__name__"):
func.__trigger_methods__ = [condition.__name__]
func.__condition_type__ = "OR"
@@ -298,14 +313,15 @@ def or_(*conditions: str | dict | Callable) -> dict:
*conditions : Union[str, dict, Callable]
Variable number of conditions that can be:
- str: Method names
- dict: Existing condition dictionaries
- dict: Existing condition dictionaries (nested conditions)
- Callable: Method references
Returns
-------
dict
A condition dictionary with format:
{"type": "OR", "methods": list_of_method_names}
{"type": "OR", "conditions": list_of_conditions}
where each condition can be a string (method name) or a nested dict
Raises
------
@@ -317,18 +333,22 @@ def or_(*conditions: str | dict | Callable) -> dict:
>>> @listen(or_("success", "timeout"))
>>> def handle_completion(self):
... pass
>>> @listen(or_(and_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
methods = []
processed_conditions: list[str | dict[str, Any]] = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
methods.extend(condition["methods"])
if isinstance(condition, dict):
processed_conditions.append(condition)
elif isinstance(condition, str):
methods.append(condition)
processed_conditions.append(condition)
elif callable(condition):
methods.append(getattr(condition, "__name__", repr(condition)))
processed_conditions.append(getattr(condition, "__name__", repr(condition)))
else:
raise ValueError("Invalid condition in or_()")
return {"type": "OR", "methods": methods}
return {"type": "OR", "conditions": processed_conditions}
def and_(*conditions: str | dict | Callable) -> dict:
@@ -344,14 +364,15 @@ def and_(*conditions: str | dict | Callable) -> dict:
*conditions : Union[str, dict, Callable]
Variable number of conditions that can be:
- str: Method names
- dict: Existing condition dictionaries
- dict: Existing condition dictionaries (nested conditions)
- Callable: Method references
Returns
-------
dict
A condition dictionary with format:
{"type": "AND", "methods": list_of_method_names}
{"type": "AND", "conditions": list_of_conditions}
where each condition can be a string (method name) or a nested dict
Raises
------
@@ -363,18 +384,69 @@ def and_(*conditions: str | dict | Callable) -> dict:
>>> @listen(and_("validated", "processed"))
>>> def handle_complete_data(self):
... pass
>>> @listen(and_(or_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
methods = []
processed_conditions: list[str | dict[str, Any]] = []
for condition in conditions:
if isinstance(condition, dict) and "methods" in condition:
methods.extend(condition["methods"])
if isinstance(condition, dict):
processed_conditions.append(condition)
elif isinstance(condition, str):
methods.append(condition)
processed_conditions.append(condition)
elif callable(condition):
methods.append(getattr(condition, "__name__", repr(condition)))
processed_conditions.append(getattr(condition, "__name__", repr(condition)))
else:
raise ValueError("Invalid condition in and_()")
return {"type": "AND", "methods": methods}
return {"type": "AND", "conditions": processed_conditions}
def _normalize_condition(condition: str | dict | list) -> dict:
"""Normalize a condition to standard format with 'conditions' key.
Args:
condition: Can be a string (method name), dict (condition), or list
Returns:
Normalized dict with 'type' and 'conditions' keys
"""
if isinstance(condition, str):
return {"type": "OR", "conditions": [condition]}
if isinstance(condition, dict):
if "conditions" in condition:
return condition
if "methods" in condition:
return {"type": condition["type"], "conditions": condition["methods"]}
return condition
if isinstance(condition, list):
return {"type": "OR", "conditions": condition}
return {"type": "OR", "conditions": [condition]}
def _extract_all_methods(condition: str | dict | list) -> list[str]:
"""Extract all method names from a condition (including nested).
Args:
condition: Can be a string, dict, or list
Returns:
List of all method names in the condition tree
"""
if isinstance(condition, str):
return [condition]
if isinstance(condition, dict):
normalized = _normalize_condition(condition)
methods = []
for sub_cond in normalized.get("conditions", []):
methods.extend(_extract_all_methods(sub_cond))
return methods
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods(item))
return methods
return []
class FlowMeta(type):
@@ -402,7 +474,10 @@ class FlowMeta(type):
if hasattr(attr_value, "__trigger_methods__"):
methods = attr_value.__trigger_methods__
condition_type = getattr(attr_value, "__condition_type__", "OR")
listeners[attr_name] = (condition_type, methods)
if hasattr(attr_value, "__trigger_condition__"):
listeners[attr_name] = attr_value.__trigger_condition__
else:
listeners[attr_name] = (condition_type, methods)
if (
hasattr(attr_value, "__is_router__")
@@ -822,6 +897,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_and_listeners.clear()
else:
# We're restoring from persistence, set the flag
self._is_execution_resuming = True
@@ -1086,10 +1162,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
for method_name in self._start_methods:
# Check if this start method is triggered by the current trigger
if method_name in self._listeners:
condition_type, trigger_methods = self._listeners[
method_name
]
if current_trigger in trigger_methods:
condition_data = self._listeners[method_name]
should_trigger = False
if isinstance(condition_data, tuple):
_, trigger_methods = condition_data
should_trigger = current_trigger in trigger_methods
elif isinstance(condition_data, dict):
all_methods = _extract_all_methods(condition_data)
should_trigger = current_trigger in all_methods
if should_trigger:
# Only execute if this is a cycle (method was already completed)
if method_name in self._completed_methods:
# For router-triggered start methods in cycles, temporarily clear resumption flag
@@ -1099,6 +1181,51 @@ class Flow(Generic[T], metaclass=FlowMeta):
await self._execute_start_method(method_name)
self._is_execution_resuming = was_resuming
def _evaluate_condition(
self, condition: str | dict, trigger_method: str, listener_name: str
) -> bool:
"""Recursively evaluate a condition (simple or nested).
Args:
condition: Can be a string (method name) or dict (nested condition)
trigger_method: The method that just completed
listener_name: Name of the listener being evaluated
Returns:
True if the condition is satisfied, False otherwise
"""
if isinstance(condition, str):
return condition == trigger_method
if isinstance(condition, dict):
normalized = _normalize_condition(condition)
cond_type = normalized.get("type", "OR")
sub_conditions = normalized.get("conditions", [])
if cond_type == "OR":
return any(
self._evaluate_condition(sub_cond, trigger_method, listener_name)
for sub_cond in sub_conditions
)
if cond_type == "AND":
pending_key = f"{listener_name}:{id(condition)}"
if pending_key not in self._pending_and_listeners:
all_methods = set(_extract_all_methods(condition))
self._pending_and_listeners[pending_key] = all_methods
if trigger_method in self._pending_and_listeners[pending_key]:
self._pending_and_listeners[pending_key].discard(trigger_method)
if not self._pending_and_listeners[pending_key]:
self._pending_and_listeners.pop(pending_key, None)
return True
return False
return False
def _find_triggered_methods(
self, trigger_method: str, router_only: bool
) -> list[str]:
@@ -1106,7 +1233,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
Finds all methods that should be triggered based on conditions.
This internal method evaluates both OR and AND conditions to determine
which methods should be executed next in the flow.
which methods should be executed next in the flow. Supports nested conditions.
Parameters
----------
@@ -1123,14 +1250,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes
-----
- Handles both OR and AND conditions:
* OR: Triggers if any condition is met
* AND: Triggers only when all conditions are met
- Handles both OR and AND conditions, including nested combinations
- Maintains state for AND conditions using _pending_and_listeners
- Separates router and normal listener evaluation
"""
triggered = []
for listener_name, (condition_type, methods) in self._listeners.items():
for listener_name, condition_data in self._listeners.items():
is_router = listener_name in self._routers
if router_only != is_router:
@@ -1139,23 +1265,29 @@ class Flow(Generic[T], metaclass=FlowMeta):
if not router_only and listener_name in self._start_methods:
continue
if condition_type == "OR":
# If the trigger_method matches any in methods, run this
if trigger_method in methods:
triggered.append(listener_name)
elif condition_type == "AND":
# Initialize pending methods for this listener if not already done
if listener_name not in self._pending_and_listeners:
self._pending_and_listeners[listener_name] = set(methods)
# Remove the trigger method from pending methods
if trigger_method in self._pending_and_listeners[listener_name]:
self._pending_and_listeners[listener_name].discard(trigger_method)
if isinstance(condition_data, tuple):
condition_type, methods = condition_data
if not self._pending_and_listeners[listener_name]:
# All required methods have been executed
if condition_type == "OR":
if trigger_method in methods:
triggered.append(listener_name)
elif condition_type == "AND":
if listener_name not in self._pending_and_listeners:
self._pending_and_listeners[listener_name] = set(methods)
if trigger_method in self._pending_and_listeners[listener_name]:
self._pending_and_listeners[listener_name].discard(
trigger_method
)
if not self._pending_and_listeners[listener_name]:
triggered.append(listener_name)
self._pending_and_listeners.pop(listener_name, None)
elif isinstance(condition_data, dict):
if self._evaluate_condition(
condition_data, trigger_method, listener_name
):
triggered.append(listener_name)
# Reset pending methods for this listener
self._pending_and_listeners.pop(listener_name, None)
return triggered
@@ -1218,7 +1350,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
raise
def _log_flow_event(
self, message: str, color: str = "yellow", level: str = "info"
self, message: str, color: PrinterColor | None = "yellow", level: str = "info"
) -> None:
"""Centralized logging method for flow events.

View File

@@ -228,8 +228,11 @@ def build_embedder_from_dict(spec):
"""Build an embedding function instance from a dictionary specification.
Args:
spec: A dictionary with 'provider' and 'config' keys.
Example: {
spec: A dictionary with 'provider' and optionally 'config' keys.
Supports two formats:
Nested format (recommended):
{
"provider": "openai",
"config": {
"api_key": "sk-...",
@@ -237,6 +240,13 @@ def build_embedder_from_dict(spec):
}
}
Flat format (for backward compatibility):
{
"provider": "openai",
"api_key": "sk-...",
"model_name": "text-embedding-3-small"
}
Returns:
An instance of the appropriate embedding function.
@@ -266,7 +276,10 @@ def build_embedder_from_dict(spec):
except (ImportError, AttributeError, ValueError) as e:
raise ImportError(f"Failed to import provider {provider_name}: {e}") from e
provider_config = spec.get("config", {})
if "config" in spec:
provider_config = spec["config"]
else:
provider_config = {k: v for k, v in spec.items() if k != "provider"}
if provider_name == "custom" and "embedding_callable" not in provider_config:
raise ValueError("Custom provider requires 'embedding_callable' in config")

View File

@@ -13,10 +13,10 @@ class GenerativeAiProviderConfig(TypedDict, total=False):
task_type: Annotated[str, "RETRIEVAL_DOCUMENT"]
class GenerativeAiProviderSpec(TypedDict):
class GenerativeAiProviderSpec(TypedDict, total=False):
"""Google Generative AI provider specification."""
provider: Literal["google-generativeai"]
provider: Required[Literal["google-generativeai"]]
config: GenerativeAiProviderConfig

View File

@@ -7,7 +7,7 @@ import uuid
import warnings
from collections.abc import Callable
from concurrent.futures import Future
from copy import copy
from copy import copy as shallow_copy
from hashlib import md5
from pathlib import Path
from typing import (
@@ -672,7 +672,9 @@ Follow these guidelines:
copied_data = {k: v for k, v in copied_data.items() if v is not None}
cloned_context = (
[task_mapping[context_task.key] for context_task in self.context]
self.context
if self.context is NOT_SPECIFIED
else [task_mapping[context_task.key] for context_task in self.context]
if isinstance(self.context, list)
else None
)
@@ -681,7 +683,7 @@ Follow these guidelines:
return next((agent for agent in agents if agent.role == role), None)
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = copy(self.tools) if self.tools else []
cloned_tools = shallow_copy(self.tools) if self.tools else []
return self.__class__(
**copied_data,

View File

@@ -1,6 +1,11 @@
"""Utility for colored console output."""
from typing import Final, Literal, NamedTuple
from __future__ import annotations
from typing import TYPE_CHECKING, Final, Literal, NamedTuple
if TYPE_CHECKING:
from _typeshed import SupportsWrite
PrinterColor = Literal[
"purple",
@@ -54,13 +59,22 @@ class Printer:
@staticmethod
def print(
content: str | list[ColoredText], color: PrinterColor | None = None
content: str | list[ColoredText],
color: PrinterColor | None = None,
sep: str | None = " ",
end: str | None = "\n",
file: SupportsWrite[str] | None = None,
flush: Literal[False] = False,
) -> None:
"""Prints content to the console with optional color formatting.
Args:
content: Either a string or a list of ColoredText objects for multicolor output.
color: Optional color for the text when content is a string. Ignored when content is a list.
sep: Separator to use between the text and color.
end: String appended after the last value.
file: A file-like object (stream); defaults to the current sys.stdout.
flush: Whether to forcibly flush the stream.
"""
if isinstance(content, str):
content = [ColoredText(content, color)]
@@ -68,5 +82,9 @@ class Printer:
"".join(
f"{_COLOR_CODES[c.color] if c.color else ''}{c.text}{RESET}"
for c in content
)
),
sep=sep,
end=end,
file=file,
flush=flush,
)

View File

@@ -1,7 +1,7 @@
import jwt
import unittest
from unittest.mock import MagicMock, patch
import jwt
from crewai.cli.authentication.utils import validate_jwt_token
@@ -17,19 +17,22 @@ class TestUtils(unittest.TestCase):
key="mock_signing_key"
)
jwt_token = "aaaaa.bbbbbb.cccccc" # noqa: S105
decoded_token = validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc",
jwt_token=jwt_token,
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
mock_jwt.decode.assert_called_with(
"aaaaa.bbbbbb.cccccc",
jwt_token,
"mock_signing_key",
algorithms=["RS256"],
audience="app_id_xxxx",
issuer="https://mock_issuer",
leeway=10.0,
options={
"verify_signature": True,
"verify_exp": True,
@@ -43,9 +46,9 @@ class TestUtils(unittest.TestCase):
def test_validate_jwt_token_expired(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.ExpiredSignatureError
with self.assertRaises(Exception):
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc",
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
@@ -53,9 +56,9 @@ class TestUtils(unittest.TestCase):
def test_validate_jwt_token_invalid_audience(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.InvalidAudienceError
with self.assertRaises(Exception):
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc",
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
@@ -63,9 +66,9 @@ class TestUtils(unittest.TestCase):
def test_validate_jwt_token_invalid_issuer(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.InvalidIssuerError
with self.assertRaises(Exception):
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc",
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
@@ -75,9 +78,9 @@ class TestUtils(unittest.TestCase):
self, mock_jwt, mock_pyjwkclient
):
mock_jwt.decode.side_effect = jwt.MissingRequiredClaimError
with self.assertRaises(Exception):
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc",
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
@@ -85,9 +88,9 @@ class TestUtils(unittest.TestCase):
def test_validate_jwt_token_jwks_error(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.exceptions.PyJWKClientError
with self.assertRaises(Exception):
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc",
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
@@ -95,9 +98,9 @@ class TestUtils(unittest.TestCase):
def test_validate_jwt_token_invalid_token(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.InvalidTokenError
with self.assertRaises(Exception):
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc",
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",

View File

@@ -242,3 +242,61 @@ class TestEmbeddingFactory:
mock_build_from_provider.assert_called_once_with(mock_provider)
assert result == mock_embedding_function
mock_import.assert_not_called()
@patch("crewai.rag.embeddings.factory.import_and_validate_definition")
def test_build_embedder_google_generativeai_nested_config(self, mock_import):
"""Test building Google Generative AI embedder with nested config format."""
mock_provider_class = MagicMock()
mock_provider_instance = MagicMock()
mock_embedding_function = MagicMock()
mock_import.return_value = mock_provider_class
mock_provider_class.return_value = mock_provider_instance
mock_provider_instance.embedding_callable.return_value = mock_embedding_function
config = {
"provider": "google-generativeai",
"config": {
"api_key": "test-gemini-key",
"model_name": "models/text-embedding-004",
},
}
build_embedder(config)
mock_import.assert_called_once_with(
"crewai.rag.embeddings.providers.google.generative_ai.GenerativeAiProvider"
)
mock_provider_class.assert_called_once()
call_kwargs = mock_provider_class.call_args.kwargs
assert call_kwargs["api_key"] == "test-gemini-key"
assert call_kwargs["model_name"] == "models/text-embedding-004"
@patch("crewai.rag.embeddings.factory.import_and_validate_definition")
def test_build_embedder_google_generativeai_flat_config(self, mock_import):
"""Test building Google Generative AI embedder with flat config format (issue #3741)."""
mock_provider_class = MagicMock()
mock_provider_instance = MagicMock()
mock_embedding_function = MagicMock()
mock_import.return_value = mock_provider_class
mock_provider_class.return_value = mock_provider_instance
mock_provider_instance.embedding_callable.return_value = mock_embedding_function
config = {
"provider": "google-generativeai",
"api_key": "test-gemini-key",
"model_name": "models/text-embedding-004",
}
build_embedder(config)
mock_import.assert_called_once_with(
"crewai.rag.embeddings.providers.google.generative_ai.GenerativeAiProvider"
)
mock_provider_class.assert_called_once()
call_kwargs = mock_provider_class.call_args.kwargs
assert call_kwargs["api_key"] == "test-gemini-key"
assert call_kwargs["model_name"] == "models/text-embedding-004"

View File

@@ -0,0 +1,107 @@
"""Tests for Google Generative AI embedder configuration (issue #3741)."""
from unittest.mock import MagicMock, patch
import pytest
from crewai import Agent, Crew, Task
class TestGoogleGenerativeAIEmbedder:
"""Test Google Generative AI embedder configuration formats."""
@patch("crewai.crew.Knowledge")
@patch("crewai.crew.ShortTermMemory")
@patch("crewai.crew.LongTermMemory")
@patch("crewai.crew.EntityMemory")
def test_crew_with_google_generativeai_flat_config(
self, mock_entity_memory, mock_long_term_memory, mock_short_term_memory, mock_knowledge
):
"""Test that Crew accepts google-generativeai embedder with flat config format (issue #3741)."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
)
task = Task(
description="Test task",
expected_output="Test output",
agent=agent,
)
embedder_config = {
"provider": "google-generativeai",
"api_key": "test-gemini-key",
"model_name": "models/text-embedding-004",
}
crew = Crew(
agents=[agent],
tasks=[task],
embedder=embedder_config,
)
expected_normalized_config = {
"provider": "google-generativeai",
"config": {
"api_key": "test-gemini-key",
"model_name": "models/text-embedding-004",
},
}
assert crew.embedder == expected_normalized_config
@patch("crewai.crew.Knowledge")
@patch("crewai.crew.ShortTermMemory")
@patch("crewai.crew.LongTermMemory")
@patch("crewai.crew.EntityMemory")
def test_crew_with_google_generativeai_nested_config(
self, mock_entity_memory, mock_long_term_memory, mock_short_term_memory, mock_knowledge
):
"""Test that Crew accepts google-generativeai embedder with nested config format."""
agent = Agent(
role="Test Agent",
goal="Test goal",
backstory="Test backstory",
)
task = Task(
description="Test task",
expected_output="Test output",
agent=agent,
)
embedder_config = {
"provider": "google-generativeai",
"config": {
"api_key": "test-gemini-key",
"model_name": "models/text-embedding-004",
},
}
crew = Crew(
agents=[agent],
tasks=[task],
embedder=embedder_config,
)
assert crew.embedder == embedder_config
def test_generativeai_provider_spec_validation(self):
"""Test that GenerativeAiProviderSpec validates correctly with optional config."""
from crewai.rag.embeddings.types import GenerativeAiProviderSpec
flat_spec: GenerativeAiProviderSpec = {
"provider": "google-generativeai",
}
assert flat_spec["provider"] == "google-generativeai"
nested_spec: GenerativeAiProviderSpec = {
"provider": "google-generativeai",
"config": {
"api_key": "test-key",
"model_name": "models/text-embedding-004",
},
}
assert nested_spec["provider"] == "google-generativeai"
assert nested_spec["config"]["api_key"] == "test-key"

View File

@@ -6,15 +6,15 @@ from datetime import datetime
import pytest
from pydantic import BaseModel
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
FlowFinishedEvent,
FlowStartedEvent,
FlowPlotEvent,
FlowStartedEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow.flow import Flow, and_, listen, or_, router, start
def test_simple_sequential_flow():
@@ -679,11 +679,11 @@ def test_structured_flow_event_emission():
assert isinstance(received_events[3], MethodExecutionStartedEvent)
assert received_events[3].method_name == "send_welcome_message"
assert received_events[3].params == {}
assert getattr(received_events[3].state, "sent") is False
assert received_events[3].state.sent is False
assert isinstance(received_events[4], MethodExecutionFinishedEvent)
assert received_events[4].method_name == "send_welcome_message"
assert getattr(received_events[4].state, "sent") is True
assert received_events[4].state.sent is True
assert received_events[4].result == "Welcome, Anakin!"
assert isinstance(received_events[5], FlowFinishedEvent)
@@ -894,3 +894,75 @@ def test_flow_name():
flow = MyFlow()
assert flow.name == "MyFlow"
def test_nested_and_or_conditions():
"""Test nested conditions like or_(and_(A, B), and_(C, D)).
Reproduces bug from issue #3719 where nested conditions are flattened,
causing premature execution.
"""
execution_order = []
class NestedConditionFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
@listen(method_1)
def method_2(self):
execution_order.append("method_2")
@router(method_2)
def method_3(self):
execution_order.append("method_3")
# Choose b_condition path
return "b_condition"
@listen("b_condition")
def method_5(self):
execution_order.append("method_5")
@listen(method_5)
async def method_4(self):
execution_order.append("method_4")
@listen(or_("a_condition", "b_condition"))
async def method_6(self):
execution_order.append("method_6")
@listen(
or_(
and_("a_condition", method_6),
and_(method_6, method_4),
)
)
def method_7(self):
execution_order.append("method_7")
@listen(method_7)
async def method_8(self):
execution_order.append("method_8")
flow = NestedConditionFlow()
flow.kickoff()
# Verify execution happened
assert "method_1" in execution_order
assert "method_2" in execution_order
assert "method_3" in execution_order
assert "method_5" in execution_order
assert "method_4" in execution_order
assert "method_6" in execution_order
assert "method_7" in execution_order
assert "method_8" in execution_order
# Critical assertion: method_7 should only execute AFTER both method_6 AND method_4
# Since b_condition was returned, method_6 triggers on b_condition
# method_7 requires: (a_condition AND method_6) OR (method_6 AND method_4)
# The second condition (method_6 AND method_4) should be the one that triggers
assert execution_order.index("method_7") > execution_order.index("method_6")
assert execution_order.index("method_7") > execution_order.index("method_4")
# method_8 should execute after method_7
assert execution_order.index("method_8") > execution_order.index("method_7")

View File

@@ -1218,7 +1218,7 @@ def test_create_directory_false():
assert not resolved_dir.exists()
with pytest.raises(
RuntimeError, match="Directory .* does not exist and create_directory is False"
RuntimeError, match=r"Directory .* does not exist and create_directory is False"
):
task._save_file("test content")
@@ -1635,3 +1635,48 @@ def test_task_interpolation_with_hyphens():
assert "say hello world" in task.prompt()
assert result.raw == "Hello, World!"
def test_task_copy_with_none_context():
original_task = Task(
description="Test task",
expected_output="Test output",
context=None
)
new_task = original_task.copy(agents=[], task_mapping={})
assert original_task.context is None
assert new_task.context is None
def test_task_copy_with_not_specified_context():
from crewai.utilities.constants import NOT_SPECIFIED
original_task = Task(
description="Test task",
expected_output="Test output",
)
new_task = original_task.copy(agents=[], task_mapping={})
assert original_task.context is NOT_SPECIFIED
assert new_task.context is NOT_SPECIFIED
def test_task_copy_with_list_context():
"""Test that copying a task with list context works correctly."""
task1 = Task(
description="Task 1",
expected_output="Output 1"
)
task2 = Task(
description="Task 2",
expected_output="Output 2",
context=[task1]
)
task_mapping = {task1.key: task1}
copied_task2 = task2.copy(agents=[], task_mapping=task_mapping)
assert isinstance(copied_task2.context, list)
assert len(copied_task2.context) == 1
assert copied_task2.context[0] is task1

6817
uv.lock generated

File diff suppressed because it is too large Load Diff