Compare commits

...

5 Commits

Author SHA1 Message Date
Greyson LaLonde
eda4430d6f Merge branch 'main' into gl/fix/hitl-flow-plot 2026-02-02 13:37:12 -05:00
Greyson LaLonde
3f264b4cc8 fix: add human_feedback metadata and visualization 2026-02-02 13:18:22 -05:00
Lucas Gomide
96bde4510b feat: auto update tools.specs (#4341) 2026-02-02 12:52:00 -05:00
Greyson LaLonde
9d7f45376a fix: use contextvars for flow execution context 2026-02-02 11:24:02 -05:00
Thiago Moretto
536447ab0e declare stagehand package as dep for StagehandTool (#4336) 2026-02-02 09:45:47 -05:00
21 changed files with 455 additions and 97 deletions

View File

@@ -0,0 +1,63 @@
# Workflow: regenerate tool.specs.json whenever crewai-tools sources change.
# Runs on PRs touching the tools package (and on manual dispatch), then commits
# any resulting spec changes back to the PR branch using a GitHub App token.
name: Generate Tool Specifications
on:
  pull_request:
    branches:
      - main
    paths:
      - 'lib/crewai-tools/src/crewai_tools/**'
  workflow_dispatch:
permissions:
  contents: write
  pull-requests: write
jobs:
  generate-specs:
    runs-on: ubuntu-latest
    env:
      PYTHONUNBUFFERED: 1
    steps:
      # App token (rather than GITHUB_TOKEN) so the pushed commit can
      # re-trigger downstream workflows on the PR branch.
      - name: Generate GitHub App token
        id: app-token
        uses: tibdex/github-app-token@v2
        with:
          app_id: ${{ secrets.CREWAI_TOOL_SPECS_APP_ID }}
          private_key: ${{ secrets.CREWAI_TOOL_SPECS_PRIVATE_KEY }}
      # Check out the PR head branch (not the merge ref) so `git push`
      # below lands on the contributor-visible branch.
      # NOTE(review): head_ref is only set for pull_request events; on
      # workflow_dispatch this falls back to the default checkout — confirm
      # that is the intended behavior for manual runs.
      - name: Checkout code
        uses: actions/checkout@v4
        with:
          ref: ${{ github.head_ref }}
          token: ${{ steps.app-token.outputs.token }}
      - name: Install uv
        uses: astral-sh/setup-uv@v6
        with:
          version: "0.8.4"
          python-version: "3.12"
          enable-cache: true
      - name: Install the project
        working-directory: lib/crewai-tools
        run: uv sync --dev --all-extras
      - name: Generate tool specifications
        working-directory: lib/crewai-tools
        run: uv run python src/crewai_tools/generate_tool_specs.py
      # Commit only when the generated specs actually differ.
      - name: Check for changes and commit
        run: |
          git config user.name "github-actions[bot]"
          git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
          git add lib/crewai-tools/tool.specs.json
          if git diff --quiet --staged; then
            echo "No changes detected in tool.specs.json"
          else
            echo "Changes detected in tool.specs.json, committing..."
            git commit -m "chore: update tool specifications"
            git push
          fi

View File

@@ -137,6 +137,7 @@ class StagehandTool(BaseTool):
- 'observe': For finding elements in a specific area - 'observe': For finding elements in a specific area
""" """
args_schema: type[BaseModel] = StagehandToolSchema args_schema: type[BaseModel] = StagehandToolSchema
package_dependencies: list[str] = Field(default_factory=lambda: ["stagehand"])
# Stagehand configuration # Stagehand configuration
api_key: str | None = None api_key: str | None = None

View File

@@ -8,11 +8,13 @@ Example:
from crewai.flow import Flow, start, human_feedback from crewai.flow import Flow, start, human_feedback
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
class SlackProvider(HumanFeedbackProvider): class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow): def request_feedback(self, context, flow):
self.send_slack_notification(context) self.send_slack_notification(context)
raise HumanFeedbackPending(context=context) raise HumanFeedbackPending(context=context)
class MyFlow(Flow): class MyFlow(Flow):
@start() @start()
@human_feedback( @human_feedback(
@@ -26,12 +28,13 @@ Example:
``` ```
""" """
from crewai.flow.async_feedback.providers import ConsoleProvider
from crewai.flow.async_feedback.types import ( from crewai.flow.async_feedback.types import (
HumanFeedbackPending, HumanFeedbackPending,
HumanFeedbackProvider, HumanFeedbackProvider,
PendingFeedbackContext, PendingFeedbackContext,
) )
from crewai.flow.async_feedback.providers import ConsoleProvider
__all__ = [ __all__ = [
"ConsoleProvider", "ConsoleProvider",

View File

@@ -6,10 +6,11 @@ provider that collects feedback via console input.
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Any
from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.flow.async_feedback.types import PendingFeedbackContext
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.flow import Flow from crewai.flow.flow import Flow
@@ -27,6 +28,7 @@ class ConsoleProvider:
```python ```python
from crewai.flow.async_feedback import ConsoleProvider from crewai.flow.async_feedback import ConsoleProvider
# Explicitly use console provider # Explicitly use console provider
@human_feedback( @human_feedback(
message="Review this:", message="Review this:",
@@ -49,7 +51,7 @@ class ConsoleProvider:
def request_feedback( def request_feedback(
self, self,
context: PendingFeedbackContext, context: PendingFeedbackContext,
flow: Flow, flow: Flow[Any],
) -> str: ) -> str:
"""Request feedback via console input (blocking). """Request feedback via console input (blocking).

View File

@@ -10,6 +10,7 @@ from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.flow import Flow from crewai.flow.flow import Flow
@@ -155,7 +156,7 @@ class HumanFeedbackPending(Exception): # noqa: N818 - Not an error, a control f
callback_info={ callback_info={
"slack_channel": "#reviews", "slack_channel": "#reviews",
"thread_id": ticket_id, "thread_id": ticket_id,
} },
) )
``` ```
""" """
@@ -232,7 +233,7 @@ class HumanFeedbackProvider(Protocol):
callback_info={ callback_info={
"channel": self.channel, "channel": self.channel,
"thread_id": thread_id, "thread_id": thread_id,
} },
) )
``` ```
""" """
@@ -240,7 +241,7 @@ class HumanFeedbackProvider(Protocol):
def request_feedback( def request_feedback(
self, self,
context: PendingFeedbackContext, context: PendingFeedbackContext,
flow: Flow, flow: Flow[Any],
) -> str: ) -> str:
"""Request feedback from a human. """Request feedback from a human.

View File

@@ -1,4 +1,5 @@
from typing import Final, Literal from typing import Final, Literal
AND_CONDITION: Final[Literal["AND"]] = "AND" AND_CONDITION: Final[Literal["AND"]] = "AND"
OR_CONDITION: Final[Literal["OR"]] = "OR" OR_CONDITION: Final[Literal["OR"]] = "OR"

View File

@@ -58,6 +58,7 @@ from crewai.events.types.flow_events import (
MethodExecutionStartedEvent, MethodExecutionStartedEvent,
) )
from crewai.flow.constants import AND_CONDITION, OR_CONDITION from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.flow_wrappers import ( from crewai.flow.flow_wrappers import (
FlowCondition, FlowCondition,
FlowConditions, FlowConditions,
@@ -512,11 +513,17 @@ class FlowMeta(type):
and attr_value.__is_router__ and attr_value.__is_router__
): ):
routers.add(attr_name) routers.add(attr_name)
possible_returns = get_possible_return_constants(attr_value) if (
if possible_returns: hasattr(attr_value, "__router_paths__")
router_paths[attr_name] = possible_returns and attr_value.__router_paths__
):
router_paths[attr_name] = attr_value.__router_paths__
else: else:
router_paths[attr_name] = [] possible_returns = get_possible_return_constants(attr_value)
if possible_returns:
router_paths[attr_name] = possible_returns
else:
router_paths[attr_name] = []
# Handle start methods that are also routers (e.g., @human_feedback with emit) # Handle start methods that are also routers (e.g., @human_feedback with emit)
if ( if (
@@ -1540,6 +1547,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx) ctx = baggage.set_baggage("flow_input_files", input_files or {}, context=ctx)
flow_token = attach(ctx) flow_token = attach(ctx)
flow_id_token = None
request_id_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.flow_id)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.flow_id)
try: try:
# Reset flow state for fresh execution unless restoring from persistence # Reset flow state for fresh execution unless restoring from persistence
is_restoring = inputs and "id" in inputs and self._persistence is not None is_restoring = inputs and "id" in inputs and self._persistence is not None
@@ -1717,6 +1731,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
return final_output return final_output
finally: finally:
if request_id_token is not None:
current_flow_request_id.reset(request_id_token)
if flow_id_token is not None:
current_flow_id.reset(flow_id_token)
detach(flow_token) detach(flow_token)
async def akickoff( async def akickoff(

View File

@@ -8,6 +8,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.async_feedback.types import HumanFeedbackProvider from crewai.flow.async_feedback.types import HumanFeedbackProvider

View File

@@ -0,0 +1,16 @@
"""Flow execution context management.
This module provides context variables for tracking flow execution state across
async boundaries and nested function calls.
"""
import contextvars
current_flow_request_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_request_id", default=None
)
current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_id", default=None
)

View File

@@ -1,46 +1,22 @@
import inspect from pydantic import BaseModel, model_validator
from typing import Any
from pydantic import BaseModel, Field, InstanceOf, model_validator
from typing_extensions import Self from typing_extensions import Self
from crewai.flow.flow import Flow from crewai.flow.flow_context import current_flow_id, current_flow_request_id
class FlowTrackable(BaseModel): class FlowTrackable(BaseModel):
"""Mixin that tracks the Flow instance that instantiated the object, e.g. a """Mixin that tracks flow execution context for objects created within flows.
Flow instance that created a Crew or Agent.
Automatically finds and stores a reference to the parent Flow instance by When a Crew or Agent is instantiated inside a flow execution, this mixin
inspecting the call stack. automatically captures the flow ID and request ID from context variables,
enabling proper tracking and association with the parent flow execution.
""" """
parent_flow: InstanceOf[Flow[Any]] | None = Field(
default=None,
description="The parent flow of the instance, if it was created inside a flow.",
)
@model_validator(mode="after") @model_validator(mode="after")
def _set_parent_flow(self) -> Self: def _set_flow_context(self) -> Self:
max_depth = 8 request_id = current_flow_request_id.get()
frame = inspect.currentframe() if request_id:
self._request_id = request_id
try: self._flow_id = current_flow_id.get()
if frame is None:
return self
frame = frame.f_back
for _ in range(max_depth):
if frame is None:
break
candidate = frame.f_locals.get("self")
if isinstance(candidate, Flow):
self.parent_flow = candidate
break
frame = frame.f_back
finally:
del frame
return self return self

View File

@@ -11,6 +11,7 @@ Example (synchronous, default):
```python ```python
from crewai.flow import Flow, start, listen, human_feedback from crewai.flow import Flow, start, listen, human_feedback
class ReviewFlow(Flow): class ReviewFlow(Flow):
@start() @start()
@human_feedback( @human_feedback(
@@ -32,11 +33,13 @@ Example (asynchronous with custom provider):
from crewai.flow import Flow, start, human_feedback from crewai.flow import Flow, start, human_feedback
from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending from crewai.flow.async_feedback import HumanFeedbackProvider, HumanFeedbackPending
class SlackProvider(HumanFeedbackProvider): class SlackProvider(HumanFeedbackProvider):
def request_feedback(self, context, flow): def request_feedback(self, context, flow):
self.send_notification(context) self.send_notification(context)
raise HumanFeedbackPending(context=context) raise HumanFeedbackPending(context=context)
class ReviewFlow(Flow): class ReviewFlow(Flow):
@start() @start()
@human_feedback( @human_feedback(
@@ -229,6 +232,7 @@ def human_feedback(
def review_document(self): def review_document(self):
return document_content return document_content
@listen("approved") @listen("approved")
def publish(self): def publish(self):
print(f"Publishing: {self.last_human_feedback.output}") print(f"Publishing: {self.last_human_feedback.output}")
@@ -265,7 +269,7 @@ def human_feedback(
def decorator(func: F) -> F: def decorator(func: F) -> F:
"""Inner decorator that wraps the function.""" """Inner decorator that wraps the function."""
def _request_feedback(flow_instance: Flow, method_output: Any) -> str: def _request_feedback(flow_instance: Flow[Any], method_output: Any) -> str:
"""Request feedback using provider or default console.""" """Request feedback using provider or default console."""
from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -291,19 +295,16 @@ def human_feedback(
effective_provider = flow_config.hitl_provider effective_provider = flow_config.hitl_provider
if effective_provider is not None: if effective_provider is not None:
# Use provider (may raise HumanFeedbackPending for async providers)
return effective_provider.request_feedback(context, flow_instance) return effective_provider.request_feedback(context, flow_instance)
else: return flow_instance._request_human_feedback(
# Use default console input (local development) message=message,
return flow_instance._request_human_feedback( output=method_output,
message=message, metadata=metadata,
output=method_output, emit=emit,
metadata=metadata, )
emit=emit,
)
def _process_feedback( def _process_feedback(
flow_instance: Flow, flow_instance: Flow[Any],
method_output: Any, method_output: Any,
raw_feedback: str, raw_feedback: str,
) -> HumanFeedbackResult | str: ) -> HumanFeedbackResult | str:
@@ -319,12 +320,14 @@ def human_feedback(
# No default and no feedback - use first outcome # No default and no feedback - use first outcome
collapsed_outcome = emit[0] collapsed_outcome = emit[0]
elif emit: elif emit:
# Collapse feedback to outcome using LLM if llm is not None:
collapsed_outcome = flow_instance._collapse_to_outcome( collapsed_outcome = flow_instance._collapse_to_outcome(
feedback=raw_feedback, feedback=raw_feedback,
outcomes=emit, outcomes=emit,
llm=llm, llm=llm,
) )
else:
collapsed_outcome = emit[0]
# Create result # Create result
result = HumanFeedbackResult( result = HumanFeedbackResult(
@@ -349,7 +352,7 @@ def human_feedback(
if asyncio.iscoroutinefunction(func): if asyncio.iscoroutinefunction(func):
# Async wrapper # Async wrapper
@wraps(func) @wraps(func)
async def async_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any: async def async_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
# Execute the original method # Execute the original method
method_output = await func(self, *args, **kwargs) method_output = await func(self, *args, **kwargs)
@@ -363,7 +366,7 @@ def human_feedback(
else: else:
# Sync wrapper # Sync wrapper
@wraps(func) @wraps(func)
def sync_wrapper(self: Flow, *args: Any, **kwargs: Any) -> Any: def sync_wrapper(self: Flow[Any], *args: Any, **kwargs: Any) -> Any:
# Execute the original method # Execute the original method
method_output = func(self, *args, **kwargs) method_output = func(self, *args, **kwargs)
@@ -397,11 +400,10 @@ def human_feedback(
) )
wrapper.__is_flow_method__ = True wrapper.__is_flow_method__ = True
# Make it a router if emit specified
if emit: if emit:
wrapper.__is_router__ = True wrapper.__is_router__ = True
wrapper.__router_paths__ = list(emit) wrapper.__router_paths__ = list(emit)
return wrapper # type: ignore[return-value] return wrapper # type: ignore[no-any-return]
return decorator return decorator

View File

@@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any
from pydantic import BaseModel from pydantic import BaseModel
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -103,4 +104,3 @@ class FlowPersistence(ABC):
Args: Args:
flow_uuid: Unique identifier for the flow instance flow_uuid: Unique identifier for the flow instance
""" """
pass

View File

@@ -15,6 +15,7 @@ from pydantic import BaseModel
from crewai.flow.persistence.base import FlowPersistence from crewai.flow.persistence.base import FlowPersistence
from crewai.utilities.paths import db_storage_path from crewai.utilities.paths import db_storage_path
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.flow.async_feedback.types import PendingFeedbackContext
@@ -176,7 +177,8 @@ class SQLiteFlowPersistence(FlowPersistence):
row = cursor.fetchone() row = cursor.fetchone()
if row: if row:
return json.loads(row[0]) result = json.loads(row[0])
return result if isinstance(result, dict) else None
return None return None
def save_pending_feedback( def save_pending_feedback(
@@ -196,7 +198,6 @@ class SQLiteFlowPersistence(FlowPersistence):
state_data: Current state data state_data: Current state data
""" """
# Import here to avoid circular imports # Import here to avoid circular imports
from crewai.flow.async_feedback.types import PendingFeedbackContext
# Convert state_data to dict # Convert state_data to dict
if isinstance(state_data, BaseModel): if isinstance(state_data, BaseModel):

View File

@@ -1025,7 +1025,7 @@ class TriggeredByHighlighter {
const isAndOrRouter = edge.dashes || edge.label === "AND"; const isAndOrRouter = edge.dashes || edge.label === "AND";
const highlightColor = isAndOrRouter const highlightColor = isAndOrRouter
? "{{ CREWAI_ORANGE }}" ? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim(); : getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
const updateData = { const updateData = {
@@ -1080,7 +1080,7 @@ class TriggeredByHighlighter {
// Keep the original edge color instead of turning gray // Keep the original edge color instead of turning gray
const isAndOrRouter = edge.dashes || edge.label === "AND"; const isAndOrRouter = edge.dashes || edge.label === "AND";
const baseColor = isAndOrRouter const baseColor = isAndOrRouter
? "{{ CREWAI_ORANGE }}" ? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim(); : getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
// Convert color to rgba with opacity for vis.js // Convert color to rgba with opacity for vis.js
@@ -1142,7 +1142,7 @@ class TriggeredByHighlighter {
const defaultColor = const defaultColor =
edge.dashes || edge.label === "AND" edge.dashes || edge.label === "AND"
? "{{ CREWAI_ORANGE }}" ? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim(); : getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
const currentOpacity = edge.opacity !== undefined ? edge.opacity : 1.0; const currentOpacity = edge.opacity !== undefined ? edge.opacity : 1.0;
const currentWidth = const currentWidth =
@@ -1253,7 +1253,7 @@ class TriggeredByHighlighter {
const defaultColor = const defaultColor =
edge.dashes || edge.label === "AND" edge.dashes || edge.label === "AND"
? "{{ CREWAI_ORANGE }}" ? (edge.color?.color || "{{ CREWAI_ORANGE }}")
: getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim(); : getComputedStyle(document.documentElement).getPropertyValue('--edge-or-color').trim();
const currentOpacity = edge.opacity !== undefined ? edge.opacity : 1.0; const currentOpacity = edge.opacity !== undefined ? edge.opacity : 1.0;
const currentWidth = const currentWidth =
@@ -2370,7 +2370,7 @@ class NetworkManager {
this.edges.forEach((edge) => { this.edges.forEach((edge) => {
let edgeColor; let edgeColor;
if (edge.dashes || edge.label === "AND") { if (edge.dashes || edge.label === "AND") {
edgeColor = "{{ CREWAI_ORANGE }}"; edgeColor = edge.color?.color || "{{ CREWAI_ORANGE }}";
} else { } else {
edgeColor = orEdgeColor; edgeColor = orEdgeColor;
} }

View File

@@ -129,7 +129,7 @@ def _create_edges_from_condition(
edges: list[StructureEdge] = [] edges: list[StructureEdge] = []
if isinstance(condition, str): if isinstance(condition, str):
if condition in nodes: if condition in nodes and condition != target:
edges.append( edges.append(
StructureEdge( StructureEdge(
source=condition, source=condition,
@@ -140,7 +140,7 @@ def _create_edges_from_condition(
) )
elif callable(condition) and hasattr(condition, "__name__"): elif callable(condition) and hasattr(condition, "__name__"):
method_name = condition.__name__ method_name = condition.__name__
if method_name in nodes: if method_name in nodes and method_name != target:
edges.append( edges.append(
StructureEdge( StructureEdge(
source=method_name, source=method_name,
@@ -163,7 +163,7 @@ def _create_edges_from_condition(
is_router_path=False, is_router_path=False,
) )
for trigger in triggers for trigger in triggers
if trigger in nodes if trigger in nodes and trigger != target
) )
else: else:
for sub_cond in conditions_list: for sub_cond in conditions_list:
@@ -196,9 +196,34 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
node_metadata["type"] = "start" node_metadata["type"] = "start"
start_methods.append(method_name) start_methods.append(method_name)
if (
hasattr(method, "__human_feedback_config__")
and method.__human_feedback_config__
):
config = method.__human_feedback_config__
node_metadata["is_human_feedback"] = True
node_metadata["human_feedback_message"] = config.message
if config.emit:
node_metadata["human_feedback_emit"] = list(config.emit)
if config.llm:
llm_str = (
config.llm
if isinstance(config.llm, str)
else str(type(config.llm).__name__)
)
node_metadata["human_feedback_llm"] = llm_str
if config.default_outcome:
node_metadata["human_feedback_default_outcome"] = config.default_outcome
if hasattr(method, "__is_router__") and method.__is_router__: if hasattr(method, "__is_router__") and method.__is_router__:
node_metadata["is_router"] = True node_metadata["is_router"] = True
node_metadata["type"] = "router" if "is_human_feedback" not in node_metadata:
node_metadata["type"] = "router"
else:
node_metadata["type"] = "human_feedback"
router_methods.append(method_name) router_methods.append(method_name)
if method_name in flow._router_paths: if method_name in flow._router_paths:
@@ -317,7 +342,7 @@ def build_flow_structure(flow: Flow[Any]) -> FlowStructure:
is_router_path=False, is_router_path=False,
) )
for trigger_method in methods for trigger_method in methods
if str(trigger_method) in nodes if str(trigger_method) in nodes and str(trigger_method) != listener_name
) )
elif is_flow_condition_dict(condition_data): elif is_flow_condition_dict(condition_data):
edges.extend( edges.extend(

View File

@@ -81,6 +81,7 @@ class JSExtension(Extension):
CREWAI_ORANGE = "#FF5A50" CREWAI_ORANGE = "#FF5A50"
HITL_BLUE = "#4A90E2"
DARK_GRAY = "#333333" DARK_GRAY = "#333333"
WHITE = "#FFFFFF" WHITE = "#FFFFFF"
GRAY = "#666666" GRAY = "#666666"
@@ -225,6 +226,7 @@ def render_interactive(
nodes_list: list[dict[str, Any]] = [] nodes_list: list[dict[str, Any]] = []
for name, metadata in dag["nodes"].items(): for name, metadata in dag["nodes"].items():
node_type: str = metadata.get("type", "listen") node_type: str = metadata.get("type", "listen")
is_human_feedback: bool = metadata.get("is_human_feedback", False)
color_config: dict[str, Any] color_config: dict[str, Any]
font_color: str font_color: str
@@ -241,6 +243,17 @@ def render_interactive(
} }
font_color = "var(--node-text-color)" font_color = "var(--node-text-color)"
border_width = 3 border_width = 3
elif node_type == "human_feedback":
color_config = {
"background": "var(--node-bg-router)",
"border": HITL_BLUE,
"highlight": {
"background": "var(--node-bg-router)",
"border": HITL_BLUE,
},
}
font_color = "var(--node-text-color)"
border_width = 3
elif node_type == "router": elif node_type == "router":
color_config = { color_config = {
"background": "var(--node-bg-router)", "background": "var(--node-bg-router)",
@@ -266,16 +279,57 @@ def render_interactive(
title_parts: list[str] = [] title_parts: list[str] = []
type_badge_bg: str = ( display_type = node_type
CREWAI_ORANGE if node_type in ["start", "router"] else DARK_GRAY type_badge_bg: str
) if node_type == "human_feedback":
type_badge_bg = HITL_BLUE
display_type = "HITL"
elif node_type in ["start", "router"]:
type_badge_bg = CREWAI_ORANGE
else:
type_badge_bg = DARK_GRAY
title_parts.append(f""" title_parts.append(f"""
<div style="border-bottom: 1px solid rgba(102,102,102,0.15); padding-bottom: 8px; margin-bottom: 10px;"> <div style="border-bottom: 1px solid rgba(102,102,102,0.15); padding-bottom: 8px; margin-bottom: 10px;">
<div style="font-size: 13px; font-weight: 700; color: {DARK_GRAY}; margin-bottom: 6px;">{name}</div> <div style="font-size: 13px; font-weight: 700; color: {DARK_GRAY}; margin-bottom: 6px;">{name}</div>
<span style="display: inline-block; background: {type_badge_bg}; color: white; padding: 2px 8px; border-radius: 4px; font-size: 10px; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px;">{node_type}</span> <span style="display: inline-block; background: {type_badge_bg}; color: white; padding: 2px 8px; border-radius: 4px; font-size: 10px; font-weight: 600; text-transform: uppercase; letter-spacing: 0.5px;">{display_type}</span>
</div> </div>
""") """)
if is_human_feedback:
feedback_msg = metadata.get("human_feedback_message", "")
if feedback_msg:
title_parts.append(f"""
<div style="margin-bottom: 8px;">
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 4px; font-weight: 600;">👤 Human Feedback</div>
<div style="background: rgba(74,144,226,0.08); padding: 6px 8px; border-radius: 4px; font-size: 11px; color: {DARK_GRAY}; border: 1px solid rgba(74,144,226,0.2); line-height: 1.4;">{feedback_msg}</div>
</div>
""")
if metadata.get("human_feedback_emit"):
emit_options = metadata["human_feedback_emit"]
emit_items = "".join(
[
f'<li style="margin: 3px 0;"><code style="background: rgba(74,144,226,0.08); padding: 2px 6px; border-radius: 3px; font-size: 10px; color: {HITL_BLUE}; border: 1px solid rgba(74,144,226,0.2); font-weight: 600;">{opt}</code></li>'
for opt in emit_options
]
)
title_parts.append(f"""
<div style="margin-bottom: 8px;">
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 4px; font-weight: 600;">Outcomes</div>
<ul style="list-style: none; padding: 0; margin: 0;">{emit_items}</ul>
</div>
""")
if metadata.get("human_feedback_llm"):
llm_model = metadata["human_feedback_llm"]
title_parts.append(f"""
<div style="margin-bottom: 8px;">
<div style="font-size: 10px; text-transform: uppercase; color: {GRAY}; letter-spacing: 0.5px; margin-bottom: 3px; font-weight: 600;">LLM</div>
<span style="display: inline-block; background: rgba(102,102,102,0.08); padding: 3px 8px; border-radius: 4px; font-size: 10px; color: {DARK_GRAY}; border: 1px solid rgba(102,102,102,0.12);">{llm_model}</span>
</div>
""")
if metadata.get("condition_type"): if metadata.get("condition_type"):
condition = metadata["condition_type"] condition = metadata["condition_type"]
if condition == "AND": if condition == "AND":
@@ -309,7 +363,7 @@ def render_interactive(
</div> </div>
""") """)
if metadata.get("router_paths"): if metadata.get("router_paths") and not is_human_feedback:
paths = metadata["router_paths"] paths = metadata["router_paths"]
paths_items = "".join( paths_items = "".join(
[ [
@@ -365,7 +419,11 @@ def render_interactive(
edge_dashes: bool | list[int] = False edge_dashes: bool | list[int] = False
if edge["is_router_path"]: if edge["is_router_path"]:
edge_color = CREWAI_ORANGE source_node = dag["nodes"].get(edge["source"], {})
if source_node.get("is_human_feedback", False):
edge_color = HITL_BLUE
else:
edge_color = CREWAI_ORANGE
edge_dashes = [15, 10] edge_dashes = [15, 10]
if "router_path_label" in edge: if "router_path_label" in edge:
edge_label = edge["router_path_label"] edge_label = edge["router_path_label"]
@@ -417,6 +475,7 @@ def render_interactive(
css_content = css_content.replace("'{{ DARK_GRAY }}'", DARK_GRAY) css_content = css_content.replace("'{{ DARK_GRAY }}'", DARK_GRAY)
css_content = css_content.replace("'{{ GRAY }}'", GRAY) css_content = css_content.replace("'{{ GRAY }}'", GRAY)
css_content = css_content.replace("'{{ CREWAI_ORANGE }}'", CREWAI_ORANGE) css_content = css_content.replace("'{{ CREWAI_ORANGE }}'", CREWAI_ORANGE)
css_content = css_content.replace("'{{ HITL_BLUE }}'", HITL_BLUE)
css_output_path.write_text(css_content, encoding="utf-8") css_output_path.write_text(css_content, encoding="utf-8")
@@ -430,6 +489,7 @@ def render_interactive(
js_content = js_content.replace("{{ DARK_GRAY }}", DARK_GRAY) js_content = js_content.replace("{{ DARK_GRAY }}", DARK_GRAY)
js_content = js_content.replace("{{ GRAY }}", GRAY) js_content = js_content.replace("{{ GRAY }}", GRAY)
js_content = js_content.replace("{{ CREWAI_ORANGE }}", CREWAI_ORANGE) js_content = js_content.replace("{{ CREWAI_ORANGE }}", CREWAI_ORANGE)
js_content = js_content.replace("{{ HITL_BLUE }}", HITL_BLUE)
js_content = js_content.replace("'{{ nodeData }}'", dag_nodes_json) js_content = js_content.replace("'{{ nodeData }}'", dag_nodes_json)
js_content = js_content.replace("'{{ dagData }}'", dag_full_json) js_content = js_content.replace("'{{ dagData }}'", dag_full_json)
js_content = js_content.replace("'{{ nodes_list_json }}'", json.dumps(nodes_list)) js_content = js_content.replace("'{{ nodes_list_json }}'", json.dumps(nodes_list))
@@ -441,6 +501,7 @@ def render_interactive(
html_content = template.render( html_content = template.render(
CREWAI_ORANGE=CREWAI_ORANGE, CREWAI_ORANGE=CREWAI_ORANGE,
HITL_BLUE=HITL_BLUE,
DARK_GRAY=DARK_GRAY, DARK_GRAY=DARK_GRAY,
WHITE=WHITE, WHITE=WHITE,
GRAY=GRAY, GRAY=GRAY,

View File

@@ -21,6 +21,11 @@ class NodeMetadata(TypedDict, total=False):
class_signature: str class_signature: str
class_name: str class_name: str
class_line_number: int class_line_number: int
is_human_feedback: bool
human_feedback_message: str
human_feedback_emit: list[str]
human_feedback_llm: str
human_feedback_default_outcome: str
class StructureEdge(TypedDict, total=False): class StructureEdge(TypedDict, total=False):

View File

@@ -299,14 +299,16 @@ class TestFlow(Flow):
return agent.kickoff("Test query") return agent.kickoff("Test query")
def verify_agent_parent_flow(result, agent, flow): def verify_agent_flow_context(result, agent, flow):
"""Verify that both the result and agent have the correct parent flow.""" """Verify that both the result and agent have the correct flow context."""
assert result.parent_flow is flow assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
assert agent is not None assert agent is not None
assert agent.parent_flow is flow assert agent._flow_id == flow.flow_id # type: ignore[attr-defined]
assert agent._request_id == flow.flow_id # type: ignore[attr-defined]
def test_sets_parent_flow_when_inside_flow(): def test_sets_flow_context_when_inside_flow():
"""Test that an Agent can be created and executed inside a Flow context.""" """Test that an Agent can be created and executed inside a Flow context."""
captured_event = None captured_event = None

View File

@@ -4520,7 +4520,7 @@ def test_crew_copy_with_memory():
pytest.fail(f"Copying crew raised an unexpected exception: {e}") pytest.fail(f"Copying crew raised an unexpected exception: {e}")
def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow(): def test_sets_flow_context_when_using_crewbase_pattern_inside_flow():
@CrewBase @CrewBase
class TestCrew: class TestCrew:
agents_config = None agents_config = None
@@ -4582,10 +4582,11 @@ def test_sets_parent_flow_when_using_crewbase_pattern_inside_flow():
flow.kickoff() flow.kickoff()
assert captured_crew is not None assert captured_crew is not None
assert captured_crew.parent_flow is flow assert captured_crew._flow_id == flow.flow_id # type: ignore[attr-defined]
assert captured_crew._request_id == flow.flow_id # type: ignore[attr-defined]
def test_sets_parent_flow_when_outside_flow(researcher, writer): def test_sets_flow_context_when_outside_flow(researcher, writer):
crew = Crew( crew = Crew(
agents=[researcher, writer], agents=[researcher, writer],
process=Process.sequential, process=Process.sequential,
@@ -4594,11 +4595,12 @@ def test_sets_parent_flow_when_outside_flow(researcher, writer):
Task(description="Task 2", expected_output="output", agent=writer), Task(description="Task 2", expected_output="output", agent=writer),
], ],
) )
assert crew.parent_flow is None assert not hasattr(crew, "_flow_id")
assert not hasattr(crew, "_request_id")
@pytest.mark.vcr() @pytest.mark.vcr()
def test_sets_parent_flow_when_inside_flow(researcher, writer): def test_sets_flow_context_when_inside_flow(researcher, writer):
class MyFlow(Flow): class MyFlow(Flow):
@start() @start()
def start(self): def start(self):
@@ -4615,7 +4617,8 @@ def test_sets_parent_flow_when_inside_flow(researcher, writer):
flow = MyFlow() flow = MyFlow()
result = flow.kickoff() result = flow.kickoff()
assert result.parent_flow is flow assert result._flow_id == flow.flow_id # type: ignore[attr-defined]
assert result._request_id == flow.flow_id # type: ignore[attr-defined]
def test_reset_knowledge_with_no_crew_knowledge(researcher, writer): def test_reset_knowledge_with_no_crew_knowledge(researcher, writer):

View File

@@ -8,6 +8,7 @@ from pathlib import Path
import pytest import pytest
from crewai.flow.flow import Flow, and_, listen, or_, router, start from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.human_feedback import human_feedback
from crewai.flow.visualization import ( from crewai.flow.visualization import (
build_flow_structure, build_flow_structure,
visualize_flow_structure, visualize_flow_structure,
@@ -667,4 +668,180 @@ def test_no_warning_for_properly_typed_router(caplog):
# No warnings should be logged # No warnings should be logged
warning_messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING] warning_messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING]
assert not any("Could not determine return paths" in msg for msg in warning_messages) assert not any("Could not determine return paths" in msg for msg in warning_messages)
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages) assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)
def test_human_feedback_node_metadata():
    """Verify the metadata recorded for a human-in-the-loop feedback node."""
    from typing import Literal

    class HITLFlow(Flow):
        """Flow with human-in-the-loop feedback."""

        @start()
        @human_feedback(
            message="Please review the output:",
            emit=["approved", "rejected"],
            llm="gpt-4o-mini",
        )
        def review_content(self) -> Literal["approved", "rejected"]:
            return "approved"

        @listen("approved")
        def on_approved(self):
            return "published"

        @listen("rejected")
        def on_rejected(self):
            return "discarded"

    node = build_flow_structure(HITLFlow())["nodes"]["review_content"]

    # The node is flagged as HITL and typed accordingly.
    assert node["is_human_feedback"] is True
    assert node["type"] == "human_feedback"
    # Decorator arguments are surfaced verbatim in the structure metadata.
    assert node["human_feedback_message"] == "Please review the output:"
    assert node["human_feedback_emit"] == ["approved", "rejected"]
    assert node["human_feedback_llm"] == "gpt-4o-mini"
def test_human_feedback_visualization_includes_hitl_data():
    """Test that visualization includes human feedback data in the output.

    Renders a flow containing a human-in-the-loop step and checks that the
    generated companion JS file embeds the HITL label, the feedback message,
    the emitted outcomes, and the HITL accent color. The generated files are
    removed afterwards so the test leaves no artifacts in the working
    directory.
    """
    from typing import Literal

    class HITLFlow(Flow):
        """Flow with human-in-the-loop feedback."""

        @start()
        @human_feedback(
            message="Please review the output:",
            emit=["approved", "rejected"],
            llm="gpt-4o-mini",
        )
        def review_content(self) -> Literal["approved", "rejected"]:
            return "approved"

        @listen("approved")
        def on_approved(self):
            return "published"

    flow = HITLFlow()
    structure = build_flow_structure(flow)
    html_file = visualize_flow_structure(structure, "test_hitl.html", show=False)
    html_path = Path(html_file)
    # The visualizer emits a sibling "<stem>_script.js" next to the HTML file.
    js_file = html_path.parent / f"{html_path.stem}_script.js"
    try:
        js_content = js_file.read_text(encoding="utf-8")

        assert "HITL" in js_content
        assert "Please review the output:" in js_content
        assert "approved" in js_content
        assert "rejected" in js_content
        # NOTE(review): presumably the HITL accent color constant — confirm
        # against the visualization palette if it ever changes.
        assert "#4A90E2" in js_content
    finally:
        # Clean up generated artifacts even if an assertion fails.
        html_path.unlink(missing_ok=True)
        js_file.unlink(missing_ok=True)
def test_human_feedback_without_emit_metadata():
    """Verify metadata for a human feedback step declared without emit."""

    class HITLSimpleFlow(Flow):
        """Flow with simple human feedback (no routing)."""

        @start()
        @human_feedback(message="Please provide feedback:")
        def review_step(self):
            return "content"

        @listen(review_step)
        def next_step(self):
            return "done"

    structure = build_flow_structure(HITLSimpleFlow())
    node = structure["nodes"]["review_step"]

    assert node["is_human_feedback"] is True
    # Without emit the node keeps its structural type and does not route.
    assert node["type"] == "start"
    assert node.get("is_router", False) is False
    assert node["human_feedback_message"] == "Please provide feedback:"
def test_human_feedback_with_default_outcome():
    """Verify that a configured default outcome appears in node metadata."""
    from typing import Literal

    class HITLDefaultFlow(Flow):
        """Flow with human feedback that has a default outcome."""

        @start()
        @human_feedback(
            message="Review this:",
            emit=["approved", "needs_work"],
            llm="gpt-4o-mini",
            default_outcome="needs_work",
        )
        def review(self) -> Literal["approved", "needs_work"]:
            return "approved"

        @listen("approved")
        def on_approved(self):
            return "published"

        @listen("needs_work")
        def on_needs_work(self):
            return "revised"

    metadata = build_flow_structure(HITLDefaultFlow())["nodes"]["review"]

    assert metadata["is_human_feedback"] is True
    assert metadata["human_feedback_default_outcome"] == "needs_work"
def test_mixed_router_and_human_feedback():
    """Verify a flow mixing a plain router with a human-feedback router."""
    from typing import Literal

    class MixedFlow(Flow):
        """Flow with both regular routers and HITL."""

        @start()
        def init(self):
            return "initialized"

        @router(init)
        def auto_decision(self) -> Literal["path_a", "path_b"]:
            return "path_a"

        @listen("path_a")
        @human_feedback(
            message="Review this step:",
            emit=["continue", "stop"],
            llm="gpt-4o-mini",
        )
        def human_review(self) -> Literal["continue", "stop"]:
            return "continue"

        @listen("continue")
        def proceed(self):
            return "done"

        @listen("stop")
        def halt(self):
            return "halted"

    nodes = build_flow_structure(MixedFlow())["nodes"]

    # The automatic router is a router but never a human-feedback node.
    automatic = nodes["auto_decision"]
    assert automatic["type"] == "router"
    assert automatic["is_router"] is True
    assert automatic.get("is_human_feedback", False) is False

    # The HITL step is flagged as both a router and a human-feedback node.
    hitl = nodes["human_review"]
    assert hitl["type"] == "human_feedback"
    assert hitl["is_router"] is True
    assert hitl["is_human_feedback"] is True
    assert hitl["human_feedback_message"] == "Review this step:"