Move to src/crewai/flow/flow_trackable.py

This commit is contained in:
Vinicius Brasil
2025-04-30 18:45:25 -03:00
parent 5aa0659a7c
commit 7f97ace752
4 changed files with 109 additions and 32 deletions

View File

@@ -1,5 +1,4 @@
import asyncio
import inspect
import json
import re
import uuid
@@ -35,7 +34,7 @@ from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache import CacheHandler
from crewai.crews.crew_output import CrewOutput
from crewai.flow import Flow
from crewai.flow.flow_trackable import FlowTrackable
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.llm import LLM, BaseLLM
@@ -81,7 +80,7 @@ from crewai.utilities.training_handler import CrewTrainingHandler
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
class Crew(BaseModel):
class Crew(FlowTrackable, BaseModel):
"""
Represents a group of agents, defining how they should collaborate and the tasks they should perform.
@@ -245,10 +244,6 @@ class Crew(BaseModel):
default_factory=SecurityConfig,
description="Security configuration for the crew, including fingerprinting.",
)
parent_flow: Optional[InstanceOf[Flow]] = Field(
default=None,
description="The parent flow of the crew, if the crew was created inside a flow.",
)
@field_validator("id", mode="before")
@classmethod
@@ -291,29 +286,6 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def set_parent_flow(self, max_depth: int = 5) -> Optional["Crew"]:
"""Find the nearest Flow instance in the call stack.
Args:
max_depth: Maximum frames to traverse up the call stack.
Returns:
The first Flow instance found in the call stack, or None.
"""
stack = inspect.stack(context=0)[1 : max_depth + 1]
try:
for frame_info in stack:
candidate = frame_info.frame.f_locals.get("self")
if isinstance(candidate, Flow):
self.parent_flow = candidate
break
else:
self.parent_flow = None
finally:
del stack
return self
def _initialize_user_memory(self):
if (
self.memory_config

View File

@@ -0,0 +1,44 @@
import inspect
from typing import Optional
from pydantic import BaseModel, Field, InstanceOf, model_validator
from crewai.flow import Flow
class FlowTrackable(BaseModel):
"""Mixin that tracks the Flow instance that instantiated the object, e.g. a
Flow instance that created a Crew or Agent.
Automatically finds and stores a reference to the parent Flow instance by
inspecting the call stack.
"""
parent_flow: Optional[InstanceOf[Flow]] = Field(
default=None,
description="The parent flow of the instance, if it was created inside a flow.",
)
@model_validator(mode="after")
def _set_parent_flow(self, max_depth: int = 5) -> "FlowTrackable":
frame = inspect.currentframe()
try:
if frame is None:
return self
frame = frame.f_back
for _ in range(max_depth):
if frame is None:
break
candidate = frame.f_locals.get("self")
if isinstance(candidate, Flow):
self.parent_flow = candidate
break
frame = frame.f_back
finally:
del frame
return self

View File

@@ -13,6 +13,7 @@ from crewai.agents.parser import (
AgentFinish,
OutputParserException,
)
from crewai.flow.flow_trackable import FlowTrackable
from crewai.llm import LLM
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
@@ -80,7 +81,7 @@ class LiteAgentOutput(BaseModel):
return self.raw
class LiteAgent(BaseModel):
class LiteAgent(FlowTrackable, BaseModel):
"""
A lightweight agent that can process messages and use tools.
@@ -162,7 +163,7 @@ class LiteAgent(BaseModel):
_messages: List[Dict[str, str]] = PrivateAttr(default_factory=list)
_iterations: int = PrivateAttr(default=0)
_printer: Printer = PrivateAttr(default_factory=Printer)
@model_validator(mode="after")
def setup_llm(self):
"""Set up the LLM and other components after initialization."""

View File

@@ -1,13 +1,16 @@
import asyncio
from typing import cast
from unittest.mock import Mock
import pytest
from pydantic import BaseModel, Field
from crewai import LLM, Agent
from crewai.flow import Flow, start
from crewai.lite_agent import LiteAgent, LiteAgentOutput
from crewai.tools import BaseTool
from crewai.utilities.events import crewai_event_bus
from crewai.utilities.events.agent_events import LiteAgentExecutionStartedEvent
from crewai.utilities.events.tool_usage_events import ToolUsageStartedEvent
@@ -255,3 +258,60 @@ async def test_lite_agent_returns_usage_metrics_async():
assert "21 million" in result.raw or "37 million" in result.raw
assert result.usage_metrics is not None
assert result.usage_metrics["total_tokens"] > 0
class TestFlow(Flow):
"""A test flow that creates and runs an agent."""
def __init__(self, llm, tools):
self.llm = llm
self.tools = tools
super().__init__()
@start()
def start(self):
agent = Agent(
role="Test Agent",
goal="Test Goal",
backstory="Test Backstory",
llm=self.llm,
tools=self.tools,
)
return agent.kickoff("Test query")
def verify_agent_parent_flow(result, agent, flow):
"""Verify that both the result and agent have the correct parent flow."""
assert result.parent_flow is flow
assert agent is not None
assert agent.parent_flow is flow
def test_sets_parent_flow_when_inside_flow():
captured_agent = None
@crewai_event_bus.on(LiteAgentExecutionStartedEvent)
def capture_agent(source, event):
nonlocal captured_agent
captured_agent = source
mock_llm = Mock(spec=LLM)
mock_llm.call.return_value = "Test response"
class MyFlow(Flow):
@start()
def start(self):
agent = Agent(
role="Test Agent",
goal="Test Goal",
backstory="Test Backstory",
llm=mock_llm,
tools=[WebSearchTool()],
)
return agent.kickoff("Test query")
flow = MyFlow()
with crewai_event_bus.scoped_handlers():
result = flow.kickoff()
assert result.parent_flow is flow
assert captured_agent.parent_flow is flow