import inspect from typing import Optional from pydantic import BaseModel, Field, InstanceOf, model_validator from crewai.flow import Flow class FlowTrackable(BaseModel): """Mixin that tracks the Flow instance that instantiated the object, e.g. a Flow instance that created a Crew or Agent. Automatically finds and stores a reference to the parent Flow instance by inspecting the call stack. """ parent_flow: Optional[InstanceOf[Flow]] = Field( default=None, description="The parent flow of the instance, if it was created inside a flow.", ) @model_validator(mode="after") def _set_parent_flow(self, max_depth: int = 5) -> "FlowTrackable": frame = inspect.currentframe() try: if frame is None: return self frame = frame.f_back for _ in range(max_depth): if frame is None: break candidate = frame.f_locals.get("self") if isinstance(candidate, Flow): self.parent_flow = candidate break frame = frame.f_back finally: del frame return self