From 70cd98035fb31b59b6f843b3fd0c1bb89bcaaa1c Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Tue, 29 Apr 2025 14:48:32 -0300 Subject: [PATCH] Identify `parent_flow` of a crew This commit adds a new crew field called `parent_flow`, evaluated when the `Crew` instance is instantiated. The stacktrace is traversed to look up if the caller is an instance of `Flow`, and if so, it fills in the field. Other alternatives were considered, such as a global context or even a new field to be manually filled, however, this is the most **magical** solution that was thread-safe and did not require public API changes. --- src/crewai/crew.py | 46 +++++++++++++++++++++++++++++++++++++++++++++- tests/crew_test.py | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 75433554c..bac8c085a 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -1,4 +1,5 @@ import asyncio +import inspect import json import re import uuid @@ -6,7 +7,18 @@ import warnings from concurrent.futures import Future from copy import copy as shallow_copy from hashlib import md5 -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + List, + Optional, + Set, + Tuple, + Union, + cast, +) from pydantic import ( UUID4, @@ -66,6 +78,9 @@ from crewai.utilities.planning_handler import CrewPlanner from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler from crewai.utilities.training_handler import CrewTrainingHandler +if TYPE_CHECKING: + from crewai.flow import Flow + warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd") @@ -233,6 +248,10 @@ class Crew(BaseModel): default_factory=SecurityConfig, description="Security configuration for the crew, including fingerprinting.", ) + parent_flow: Optional["InstanceOf[Flow]"] = Field( + default=None, + description="The parent flow of the crew, if the crew was created inside a flow.", + ) @field_validator("id", mode="before") @classmethod @@ -275,6 +294,31 @@ class Crew(BaseModel): return self + @model_validator(mode="after") + def set_parent_flow(self, max_depth: int = 5) -> Optional["Crew"]: + """Find the nearest Flow instance in the call stack. + + Args: + max_depth: Maximum frames to traverse up the call stack. + + Returns: + The first Flow instance found in the call stack, or None. + """ + from crewai.flow import Flow + + stack = inspect.stack(context=0)[1 : max_depth + 1] + try: + for frame_info in stack: + candidate = frame_info.frame.f_locals.get("self") + if isinstance(candidate, Flow): + self.parent_flow = candidate + break + else: + self.parent_flow = None + finally: + del stack + return self + def _initialize_user_memory(self): if ( self.memory_config diff --git a/tests/crew_test.py b/tests/crew_test.py index aa23294fb..6ca67cfd9 100644 --- a/tests/crew_test.py +++ b/tests/crew_test.py @@ -17,6 +17,7 @@ from crewai.agents.cache import CacheHandler from crewai.agents.crew_agent_executor import CrewAgentExecutor from crewai.crew import Crew from crewai.crews.crew_output import CrewOutput +from crewai.flow import Flow, listen, start from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource from crewai.llm import LLM from crewai.memory.contextual.contextual_memory import ContextualMemory @@ -2164,7 +2165,6 @@ def test_tools_with_custom_caching(): with patch.object( CacheHandler, "add", wraps=crew._cache_handler.add ) as add_to_cache: - result = crew.kickoff() # Check that add_to_cache was called exactly twice @@ -4351,3 +4351,35 @@ def test_crew_copy_with_memory(): raise e # Re-raise other validation errors except Exception as e: pytest.fail(f"Copying crew raised an unexpected exception: {e}") + + +def test_sets_parent_flow_when_outside_flow(): + crew = Crew( + agents=[researcher, writer], + process=Process.sequential, + tasks=[ + Task(description="Task 1", expected_output="output", agent=researcher), + Task(description="Task 2", expected_output="output", agent=writer), + ], + ) + assert crew.parent_flow is None + + +def test_sets_parent_flow_when_inside_flow(): + class MyFlow(Flow): + @start() + def start(self): + return Crew( + agents=[researcher, writer], + process=Process.sequential, + tasks=[ + Task( + description="Task 1", expected_output="output", agent=researcher + ), + Task(description="Task 2", expected_output="output", agent=writer), + ], + ) + + flow = MyFlow() + result = flow.kickoff() + assert result.parent_flow is flow