Compare commits

..

1 Commits

Author SHA1 Message Date
Devin AI
a4b22e92ad fix: save flow plot HTML to current working directory instead of temp dir
Fixes #4991

Previously,  saved generated HTML/CSS/JS files to a
hidden system temp directory (via ), making the output
inaccessible from the user's project folder. The CLI also printed a
misleading message suggesting the file was saved locally.

Changes:
-  now defaults to saving files in the current
  working directory () instead of a temp directory.
- Added an  parameter to both  and
   so users can optionally specify a custom output directory.
- Added 5 new tests covering CWD output, explicit output_dir, and
  absolute path guarantees.

Co-Authored-By: João <joao@crewai.com>
2026-03-20 19:55:22 +00:00
9 changed files with 129 additions and 371 deletions

View File

@@ -1315,25 +1315,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
context = self._pending_feedback_context
emit = context.emit
default_outcome = context.default_outcome
# Try to get the live LLM from the re-imported decorator instead of the
# serialized string. When a flow pauses for HITL and resumes (possibly in
# a different process), context.llm only contains a model string like
# 'gemini/gemini-3-flash-preview'. This loses credentials, project,
# location, safety_settings, and client_params. By looking up the method
# on the re-imported flow class, we can retrieve the fully-configured LLM
# that was passed to the @human_feedback decorator.
llm = context.llm # fallback to serialized string
method = self._methods.get(FlowMethodName(context.method_name))
if method is not None:
live_llm = getattr(method, "_hf_llm", None)
if live_llm is not None:
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
# Only use live LLM if it's a BaseLLM instance (not a string)
# String values offer no benefit over the serialized context.llm
if isinstance(live_llm, BaseLLMClass):
llm = live_llm
llm = context.llm
# Determine outcome
collapsed_outcome: str | None = None
@@ -3171,12 +3153,19 @@ class Flow(Generic[T], metaclass=FlowMeta):
else:
logger.warning(message)
def plot(self, filename: str = "crewai_flow.html", show: bool = True) -> str:
def plot(
self,
filename: str = "crewai_flow.html",
show: bool = True,
output_dir: str | None = None,
) -> str:
"""Create interactive HTML visualization of Flow structure.
Args:
filename: Output HTML filename (default: "crewai_flow.html").
show: Whether to open in browser (default: True).
output_dir: Directory to save generated files. Defaults to the
current working directory.
Returns:
Absolute path to generated HTML file.
@@ -3189,7 +3178,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
),
)
structure = build_flow_structure(self)
return render_interactive(structure, filename=filename, show=show)
return render_interactive(
structure, filename=filename, show=show, output_dir=output_dir
)
@staticmethod
def _show_tracing_disabled_message() -> None:

View File

@@ -75,7 +75,6 @@ class FlowMethod(Generic[P, R]):
"__is_router__",
"__router_paths__",
"__human_feedback_config__",
"_hf_llm", # Live LLM object for HITL resume
]:
if hasattr(meth, attr):
setattr(self, attr, getattr(meth, attr))

View File

@@ -572,14 +572,6 @@ def human_feedback(
wrapper.__is_router__ = True
wrapper.__router_paths__ = list(emit)
# Stash the live LLM object for HITL resume to retrieve.
# When a flow pauses for human feedback and later resumes (possibly in a
# different process), the serialized context only contains a model string.
# By storing the original LLM on the wrapper, resume_async can retrieve
# the fully-configured LLM (with credentials, project, safety_settings, etc.)
# instead of creating a bare LLM from just the model string.
wrapper._hf_llm = llm
return wrapper # type: ignore[no-any-return]
return decorator

View File

@@ -2,7 +2,6 @@
import json
from pathlib import Path
import tempfile
from typing import Any, ClassVar
import webbrowser
@@ -205,20 +204,24 @@ def render_interactive(
dag: FlowStructure,
filename: str = "flow_dag.html",
show: bool = True,
output_dir: str | None = None,
) -> str:
"""Create interactive HTML visualization of Flow structure.
Generates three output files in a temporary directory: HTML template,
CSS stylesheet, and JavaScript. Optionally opens the visualization in
default browser.
Generates three output files: HTML template, CSS stylesheet, and
JavaScript. Files are saved to the specified output directory, or the
current working directory when *output_dir* is ``None``. Optionally
opens the visualization in the default browser.
Args:
dag: FlowStructure to visualize.
filename: Output HTML filename (basename only, no path).
show: Whether to open in browser.
output_dir: Directory to save generated files. Defaults to the
current working directory (``os.getcwd()``).
Returns:
Absolute path to generated HTML file in temporary directory.
Absolute path to generated HTML file.
"""
node_positions = calculate_node_positions(dag)
@@ -403,12 +406,13 @@ def render_interactive(
extensions=[CSSExtension, JSExtension],
)
temp_dir = Path(tempfile.mkdtemp(prefix="crewai_flow_"))
output_path = temp_dir / Path(filename).name
dest_dir = Path(output_dir) if output_dir else Path.cwd()
dest_dir.mkdir(parents=True, exist_ok=True)
output_path = dest_dir / Path(filename).name
css_filename = output_path.stem + "_style.css"
css_output_path = temp_dir / css_filename
css_output_path = dest_dir / css_filename
js_filename = output_path.stem + "_script.js"
js_output_path = temp_dir / js_filename
js_output_path = dest_dir / js_filename
css_file = template_dir / "style.css"
css_content = css_file.read_text(encoding="utf-8")

View File

@@ -281,7 +281,6 @@ class BaseTool(BaseModel, ABC):
result_as_answer=self.result_as_answer,
max_usage_count=self.max_usage_count,
current_usage_count=self.current_usage_count,
cache_function=self.cache_function,
)
structured_tool._original_tool = self
return structured_tool

View File

@@ -58,7 +58,6 @@ class CrewStructuredTool:
result_as_answer: bool = False,
max_usage_count: int | None = None,
current_usage_count: int = 0,
cache_function: Callable[..., bool] | None = None,
) -> None:
"""Initialize the structured tool.
@@ -70,7 +69,6 @@ class CrewStructuredTool:
result_as_answer: Whether to return the output directly
max_usage_count: Maximum number of times this tool can be used. None means unlimited usage.
current_usage_count: Current number of times this tool has been used.
cache_function: Function to determine if the tool result should be cached.
"""
self.name = name
self.description = description
@@ -80,7 +78,6 @@ class CrewStructuredTool:
self.result_as_answer = result_as_answer
self.max_usage_count = max_usage_count
self.current_usage_count = current_usage_count
self.cache_function = cache_function
self._original_tool: BaseTool | None = None
# Validate the function signature matches the schema
@@ -89,7 +86,7 @@ class CrewStructuredTool:
@classmethod
def from_function(
cls,
func: Callable[..., Any],
func: Callable,
name: str | None = None,
description: str | None = None,
return_direct: bool = False,
@@ -150,7 +147,7 @@ class CrewStructuredTool:
@staticmethod
def _create_schema_from_function(
name: str,
func: Callable[..., Any],
func: Callable,
) -> type[BaseModel]:
"""Create a Pydantic schema from a function's signature.
@@ -185,7 +182,7 @@ class CrewStructuredTool:
# Create model
schema_name = f"{name.title()}Schema"
return create_model(schema_name, **fields) # type: ignore[call-overload, no-any-return]
return create_model(schema_name, **fields) # type: ignore[call-overload]
def _validate_function_signature(self) -> None:
"""Validate that the function signature matches the args schema."""
@@ -213,7 +210,7 @@ class CrewStructuredTool:
f"not found in args_schema"
)
def _parse_args(self, raw_args: str | dict[str, Any]) -> dict[str, Any]:
def _parse_args(self, raw_args: str | dict) -> dict:
"""Parse and validate the input arguments against the schema.
Args:
@@ -237,8 +234,8 @@ class CrewStructuredTool:
async def ainvoke(
self,
input: str | dict[str, Any],
config: dict[str, Any] | None = None,
input: str | dict,
config: dict | None = None,
**kwargs: Any,
) -> Any:
"""Asynchronously invoke the tool.
@@ -272,7 +269,7 @@ class CrewStructuredTool:
except Exception:
raise
def _run(self, *args: Any, **kwargs: Any) -> Any:
def _run(self, *args, **kwargs) -> Any:
"""Legacy method for compatibility."""
# Convert args/kwargs to our expected format
input_dict = dict(zip(self.args_schema.model_fields.keys(), args, strict=False))
@@ -280,10 +277,7 @@ class CrewStructuredTool:
return self.invoke(input_dict)
def invoke(
self,
input: str | dict[str, Any],
config: dict[str, Any] | None = None,
**kwargs: Any,
self, input: str | dict, config: dict | None = None, **kwargs: Any
) -> Any:
"""Main method for tool execution."""
parsed_args = self._parse_args(input)
@@ -319,10 +313,9 @@ class CrewStructuredTool:
self._original_tool.current_usage_count = self.current_usage_count
@property
def args(self) -> dict[str, Any]:
def args(self) -> dict:
"""Get the tool's input arguments schema."""
schema: dict[str, Any] = self.args_schema.model_json_schema()["properties"]
return schema
return self.args_schema.model_json_schema()["properties"]
def __repr__(self) -> str:
return f"CrewStructuredTool(name='{sanitize_tool_name(self.name)}', description='{self.description}')"

View File

@@ -1216,275 +1216,3 @@ class TestAsyncHumanFeedbackEdgeCases:
assert flow.last_human_feedback.outcome == "approved"
assert flow.last_human_feedback.feedback == ""
# =============================================================================
# Tests for _hf_llm attribute and live LLM resolution on resume
# =============================================================================
class TestLiveLLMPreservationOnResume:
"""Tests for preserving the full LLM config across HITL resume."""
def test_hf_llm_attribute_set_on_wrapper_with_basellm(self) -> None:
"""Test that _hf_llm is set on the wrapper when llm is a BaseLLM instance."""
from crewai.llms.base_llm import BaseLLM
# Create a mock BaseLLM object
mock_llm = MagicMock(spec=BaseLLM)
mock_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=mock_llm,
)
def review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("review")
assert method is not None
assert hasattr(method, "_hf_llm")
assert method._hf_llm is mock_llm
def test_hf_llm_attribute_set_on_wrapper_with_string(self) -> None:
"""Test that _hf_llm is set on the wrapper even when llm is a string."""
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("review")
assert method is not None
assert hasattr(method, "_hf_llm")
assert method._hf_llm == "gpt-4o-mini"
@patch("crewai.flow.flow.crewai_event_bus.emit")
def test_resume_async_uses_live_basellm_over_serialized_string(
self, mock_emit: MagicMock
) -> None:
"""Test that resume_async uses the live BaseLLM from decorator instead of serialized string.
This is the main bug fix: when a flow resumes, it should use the fully-configured
LLM from the re-imported decorator (with credentials, project, etc.) instead of
creating a new LLM from just the model string.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
from crewai.llms.base_llm import BaseLLM
# Create a mock BaseLLM with full config (simulating Gemini with service account)
live_llm = MagicMock(spec=BaseLLM)
live_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
result_path: str = ""
@start()
@human_feedback(
message="Approve?",
emit=["approved", "rejected"],
llm=live_llm, # Full LLM object with credentials
)
def review(self):
return "content"
@listen("approved")
def handle_approved(self):
self.result_path = "approved"
return "Approved!"
# Save pending feedback with just a model STRING (simulating serialization)
context = PendingFeedbackContext(
flow_id="live-llm-test",
flow_class="TestFlow",
method_name="review",
method_output="content",
message="Approve?",
emit=["approved", "rejected"],
llm="gemini/gemini-3-flash", # Serialized string, NOT the live object
)
persistence.save_pending_feedback(
flow_uuid="live-llm-test",
context=context,
state_data={"id": "live-llm-test"},
)
# Restore flow - this re-imports the class with the live LLM
flow = TestFlow.from_pending("live-llm-test", persistence)
# Mock _collapse_to_outcome to capture what LLM it receives
captured_llm = []
def capture_llm(feedback, outcomes, llm):
captured_llm.append(llm)
return "approved"
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
flow.resume("looks good!")
# The key assertion: _collapse_to_outcome received the LIVE BaseLLM object,
# NOT the serialized string. The live_llm was captured at class definition
# time and stored on the method wrapper as _hf_llm.
assert len(captured_llm) == 1
# Verify it's the same object that was passed to the decorator
# (which is stored on the method's _hf_llm attribute)
method = flow._methods.get("review")
assert method is not None
assert captured_llm[0] is method._hf_llm
# And verify it's a BaseLLM instance, not a string
assert isinstance(captured_llm[0], BaseLLM)
@patch("crewai.flow.flow.crewai_event_bus.emit")
def test_resume_async_falls_back_to_serialized_string_when_no_hf_llm(
self, mock_emit: MagicMock
) -> None:
"""Test that resume_async falls back to context.llm when _hf_llm is not available.
This ensures backward compatibility with flows that were paused before this fix.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class TestFlow(Flow):
@start()
@human_feedback(
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "content"
# Save pending feedback
context = PendingFeedbackContext(
flow_id="fallback-test",
flow_class="TestFlow",
method_name="review",
method_output="content",
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
persistence.save_pending_feedback(
flow_uuid="fallback-test",
context=context,
state_data={"id": "fallback-test"},
)
flow = TestFlow.from_pending("fallback-test", persistence)
# Remove _hf_llm to simulate old decorator without this attribute
method = flow._methods.get("review")
if hasattr(method, "_hf_llm"):
delattr(method, "_hf_llm")
# Mock _collapse_to_outcome to capture what LLM it receives
captured_llm = []
def capture_llm(feedback, outcomes, llm):
captured_llm.append(llm)
return "approved"
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
flow.resume("looks good!")
# Should fall back to the serialized string
assert len(captured_llm) == 1
assert captured_llm[0] == "gpt-4o-mini"
@patch("crewai.flow.flow.crewai_event_bus.emit")
def test_resume_async_uses_string_from_context_when_hf_llm_is_string(
self, mock_emit: MagicMock
) -> None:
"""Test that when _hf_llm is a string (not BaseLLM), we still use context.llm.
String LLM values offer no benefit over the serialized context.llm,
so we don't prefer them.
"""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test_flows.db")
persistence = SQLiteFlowPersistence(db_path)
class TestFlow(Flow):
@start()
@human_feedback(
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini", # String LLM
)
def review(self):
return "content"
# Save pending feedback
context = PendingFeedbackContext(
flow_id="string-llm-test",
flow_class="TestFlow",
method_name="review",
method_output="content",
message="Approve?",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
persistence.save_pending_feedback(
flow_uuid="string-llm-test",
context=context,
state_data={"id": "string-llm-test"},
)
flow = TestFlow.from_pending("string-llm-test", persistence)
# Verify _hf_llm is a string
method = flow._methods.get("review")
assert method._hf_llm == "gpt-4o-mini"
# Mock _collapse_to_outcome to capture what LLM it receives
captured_llm = []
def capture_llm(feedback, outcomes, llm):
captured_llm.append(llm)
return "approved"
with patch.object(flow, "_collapse_to_outcome", side_effect=capture_llm):
flow.resume("looks good!")
# Should use context.llm since _hf_llm is a string (not BaseLLM)
assert len(captured_llm) == 1
assert captured_llm[0] == "gpt-4o-mini"
def test_hf_llm_set_for_async_wrapper(self) -> None:
"""Test that _hf_llm is set on async wrapper functions."""
import asyncio
from crewai.llms.base_llm import BaseLLM
mock_llm = MagicMock(spec=BaseLLM)
mock_llm.model = "gemini/gemini-3-flash"
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm=mock_llm,
)
async def async_review(self):
return "content"
flow = TestFlow()
method = flow._methods.get("async_review")
assert method is not None
assert hasattr(method, "_hf_llm")
assert method._hf_llm is mock_llm

View File

@@ -333,9 +333,9 @@ def test_visualization_plot_method():
"""Test that flow.plot() method works."""
flow = SimpleFlow()
html_file = flow.plot("test_plot.html", show=False)
assert os.path.exists(html_file)
with tempfile.TemporaryDirectory() as tmp_dir:
html_file = flow.plot("test_plot.html", show=False, output_dir=tmp_dir)
assert os.path.exists(html_file)
def test_router_paths_to_string_conditions():
@@ -667,4 +667,94 @@ def test_no_warning_for_properly_typed_router(caplog):
# No warnings should be logged
warning_messages = [r.message for r in caplog.records if r.levelno >= logging.WARNING]
assert not any("Could not determine return paths" in msg for msg in warning_messages)
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)
assert not any("Found listeners waiting for triggers" in msg for msg in warning_messages)
def test_plot_saves_to_current_working_directory():
"""Test that plot() saves the HTML file to the current working directory by default.
Regression test for https://github.com/crewAIInc/crewAI/issues/4991
"""
flow = SimpleFlow()
with tempfile.TemporaryDirectory() as tmp_dir:
original_cwd = os.getcwd()
try:
os.chdir(tmp_dir)
html_file = flow.plot("test_cwd_plot.html", show=False)
# The returned path must live inside the CWD, not a hidden temp dir
assert Path(html_file).parent == Path(tmp_dir)
assert os.path.exists(html_file)
assert html_file == str(Path(tmp_dir) / "test_cwd_plot.html")
finally:
os.chdir(original_cwd)
def test_plot_saves_to_explicit_output_dir():
"""Test that plot() saves files to a user-specified output directory."""
flow = SimpleFlow()
with tempfile.TemporaryDirectory() as output_dir:
html_file = flow.plot(
"custom_output.html", show=False, output_dir=output_dir
)
assert Path(html_file).parent == Path(output_dir)
assert os.path.exists(html_file)
# CSS and JS companion files should also be in the same directory
html_path = Path(html_file)
css_file = html_path.parent / f"{html_path.stem}_style.css"
js_file = html_path.parent / f"{html_path.stem}_script.js"
assert css_file.exists()
assert js_file.exists()
def test_render_interactive_saves_to_cwd_by_default():
"""Test that render_interactive() writes to CWD when output_dir is None.
Regression test for https://github.com/crewAIInc/crewAI/issues/4991
"""
flow = SimpleFlow()
structure = build_flow_structure(flow)
with tempfile.TemporaryDirectory() as tmp_dir:
original_cwd = os.getcwd()
try:
os.chdir(tmp_dir)
html_file = visualize_flow_structure(
structure, "cwd_test.html", show=False
)
assert Path(html_file).parent == Path(tmp_dir)
assert os.path.exists(html_file)
finally:
os.chdir(original_cwd)
def test_render_interactive_saves_to_specified_output_dir():
"""Test that render_interactive() writes to the specified output_dir."""
flow = SimpleFlow()
structure = build_flow_structure(flow)
with tempfile.TemporaryDirectory() as output_dir:
html_file = visualize_flow_structure(
structure, "output_dir_test.html", show=False, output_dir=output_dir
)
assert Path(html_file).parent == Path(output_dir)
assert os.path.exists(html_file)
with open(html_file, "r", encoding="utf-8") as f:
html_content = f.read()
assert "<!DOCTYPE html>" in html_content
def test_plot_returned_path_is_absolute():
"""Test that the path returned by plot() is always absolute."""
flow = SimpleFlow()
with tempfile.TemporaryDirectory() as tmp_dir:
html_file = flow.plot("abs_path_test.html", show=False, output_dir=tmp_dir)
assert os.path.isabs(html_file)

View File

@@ -38,44 +38,6 @@ def test_initialization(basic_function, schema_class):
assert tool.args_schema == schema_class
def test_cache_function_passed_through(basic_function, schema_class):
"""Test that cache_function is stored on CrewStructuredTool."""
def no_cache(_args: dict, _result: str) -> bool:
return False
tool = CrewStructuredTool(
name="test_tool",
description="Test tool description",
func=basic_function,
args_schema=schema_class,
cache_function=no_cache,
)
assert tool.cache_function is no_cache
def test_base_tool_passes_cache_function_to_structured_tool():
"""Test that BaseTool.to_structured_tool propagates cache_function."""
from crewai.tools import BaseTool
def no_cache(_args: dict, _result: str) -> bool:
return False
class MyCacheTool(BaseTool):
name: str = "cache_test"
description: str = "tool for testing cache passthrough"
def _run(self, query: str = "") -> str:
return "result"
my_tool = MyCacheTool()
my_tool.cache_function = no_cache # type: ignore[assignment]
structured = my_tool.to_structured_tool()
assert structured.cache_function is no_cache
def test_from_function(basic_function):
"""Test creating tool from function"""
tool = CrewStructuredTool.from_function(