Identify parent_flow of a crew

This commit adds a new crew field called `parent_flow`, evaluated when
the `Crew` instance is instantiated. The stacktrace is traversed to look
up if the caller is an instance of `Flow`, and if so, it fills in the
field.

Other alternatives were considered, such as a global context or even a
new field to be manually filled, however, this is the most **magical**
solution that was thread-safe and did not require public API changes.
This commit is contained in:
Vinicius Brasil
2025-04-29 14:48:32 -03:00
parent f89c2bfb7e
commit 70cd98035f
2 changed files with 78 additions and 2 deletions

View File

@@ -1,4 +1,5 @@
import asyncio
import inspect
import json
import re
import uuid
@@ -6,7 +7,18 @@ import warnings
from concurrent.futures import Future
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, cast
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Set,
Tuple,
Union,
cast,
)
from pydantic import (
UUID4,
@@ -66,6 +78,9 @@ from crewai.utilities.planning_handler import CrewPlanner
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler
if TYPE_CHECKING:
from crewai.flow import Flow
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
@@ -233,6 +248,10 @@ class Crew(BaseModel):
default_factory=SecurityConfig,
description="Security configuration for the crew, including fingerprinting.",
)
parent_flow: Optional["InstanceOf[Flow]"] = Field(
default=None,
description="The parent flow of the crew, if the crew was created inside a flow.",
)
@field_validator("id", mode="before")
@classmethod
@@ -275,6 +294,31 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def set_parent_flow(self, max_depth: int = 5) -> Optional["Crew"]:
"""Find the nearest Flow instance in the call stack.
Args:
max_depth: Maximum frames to traverse up the call stack.
Returns:
The first Flow instance found in the call stack, or None.
"""
from crewai.flow import Flow
stack = inspect.stack(context=0)[1 : max_depth + 1]
try:
for frame_info in stack:
candidate = frame_info.frame.f_locals.get("self")
if isinstance(candidate, Flow):
self.parent_flow = candidate
break
else:
self.parent_flow = None
finally:
del stack
return self
def _initialize_user_memory(self):
if (
self.memory_config

View File

@@ -17,6 +17,7 @@ from crewai.agents.cache import CacheHandler
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.flow import Flow, listen, start
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
@@ -2164,7 +2165,6 @@ def test_tools_with_custom_caching():
with patch.object(
CacheHandler, "add", wraps=crew._cache_handler.add
) as add_to_cache:
result = crew.kickoff()
# Check that add_to_cache was called exactly twice
@@ -4351,3 +4351,35 @@ def test_crew_copy_with_memory():
raise e # Re-raise other validation errors
except Exception as e:
pytest.fail(f"Copying crew raised an unexpected exception: {e}")
def test_sets_parent_flow_when_outside_flow():
crew = Crew(
agents=[researcher, writer],
process=Process.sequential,
tasks=[
Task(description="Task 1", expected_output="output", agent=researcher),
Task(description="Task 2", expected_output="output", agent=writer),
],
)
assert crew.parent_flow is None
def test_sets_parent_flow_when_inside_flow():
class MyFlow(Flow):
@start()
def start(self):
return Crew(
agents=[researcher, writer],
process=Process.sequential,
tasks=[
Task(
description="Task 1", expected_output="output", agent=researcher
),
Task(description="Task 2", expected_output="output", agent=writer),
],
)
flow = MyFlow()
result = flow.kickoff()
assert result.parent_flow is flow