KISS: Refactor LiteAgent integration in flows to use Agents instead. Update documentation and examples to reflect changes in class usage, including async support and structured output handling. Enhance tests for Agent functionality and ensure compatibility with new features.

This commit is contained in:
lorenzejay
2025-04-09 11:26:08 -07:00
parent da42ec7eb9
commit 1d93842223
8 changed files with 3209 additions and 99 deletions

View File

@@ -545,16 +545,20 @@ The `third_method` and `fourth_method` listen to the output of the `second_metho
When you run this Flow, the output will change based on the random boolean value generated by the `start_method`.
## Adding LiteAgent to Flows
## Adding Agents to Flows
LiteAgents can be seamlessly integrated into your flows, providing a lightweight alternative to full Crews when you need simpler, focused task execution. Here's an example of how to use a LiteAgent within a flow to perform market research:
Agents can be seamlessly integrated into your flows, providing a lightweight alternative to full Crews when you need simpler, focused task execution. Here's an example of how to use an Agent within a flow to perform market research:
```python
from typing import List, cast
from crewai_tools.tools.website_search.website_search_tool import WebsiteSearchTool
import asyncio
from typing import Any, Dict, List
from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start
from crewai.lite_agent import LiteAgent
# Define a structured output format
class MarketAnalysis(BaseModel):
@@ -562,28 +566,30 @@ class MarketAnalysis(BaseModel):
market_size: str = Field(description="Estimated market size")
competitors: List[str] = Field(description="Major competitors in the space")
# Define flow state
class MarketResearchState(BaseModel):
product: str = ""
analysis: MarketAnalysis | None = None
# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):
@start()
def initialize_research(self):
def initialize_research(self) -> Dict[str, Any]:
print(f"Starting market research for {self.state.product}")
return {"product": self.state.product}
@listen(initialize_research)
def analyze_market(self):
# Create a LiteAgent for market research
analyst = LiteAgent(
async def analyze_market(self) -> Dict[str, Any]:
# Create an Agent for market research
analyst = Agent(
role="Market Research Analyst",
goal=f"Analyze the market for {self.state.product}",
backstory="You are an experienced market analyst with expertise in "
"identifying market trends and opportunities.",
llm="gpt-4o",
tools=[WebsiteSearchTool()],
tools=[SerperDevTool()],
verbose=True,
response_format=MarketAnalysis,
)
# Define the research query
@@ -592,49 +598,65 @@ class MarketResearchFlow(Flow[MarketResearchState]):
1. Key market trends
2. Market size
3. Major competitors
Format your response according to the specified structure.
"""
# Execute the analysis
result = analyst.kickoff(query)
self.state.analysis = cast(MarketAnalysis, result.pydantic)
return result.pydantic
# Execute the analysis with structured output format
result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
if result.pydantic:
print("result", result.pydantic)
else:
print("result", result)
# Return the analysis to update the state
return {"analysis": result.pydantic}
@listen(analyze_market)
def present_results(self):
analysis = self.state.analysis
if analysis is None:
print("No analysis results available")
return
def present_results(self, analysis) -> None:
print("\nMarket Analysis Results")
print("=====================")
print("\nKey Market Trends:")
for trend in analysis.key_trends:
print(f"- {trend}")
if isinstance(analysis, dict):
# If we got a dict with 'analysis' key, extract the actual analysis object
market_analysis = analysis.get("analysis")
else:
market_analysis = analysis
print(f"\nMarket Size: {analysis.market_size}")
if market_analysis and isinstance(market_analysis, MarketAnalysis):
print("\nKey Market Trends:")
for trend in market_analysis.key_trends:
print(f"- {trend}")
print(f"\nMarket Size: {market_analysis.market_size}")
print("\nMajor Competitors:")
for competitor in market_analysis.competitors:
print(f"- {competitor}")
else:
print("No structured analysis data available.")
print("Raw analysis:", analysis)
print("\nMajor Competitors:")
for competitor in analysis.competitors:
print(f"- {competitor}")
# Usage example
flow = MarketResearchFlow()
result = flow.kickoff(inputs={"product": "AI-powered chatbots"})
async def run_flow():
flow = MarketResearchFlow()
result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
return result
# Run the flow
if __name__ == "__main__":
asyncio.run(run_flow())
```
This example demonstrates several key features of using LiteAgents in flows:
This example demonstrates several key features of using Agents in flows:
1. **Structured Output**: Using Pydantic models to define the expected output format (`MarketAnalysis`) ensures type safety and structured data throughout the flow.
2. **State Management**: The flow state (`MarketResearchState`) maintains context between steps and stores both inputs and outputs.
3. **Tool Integration**: LiteAgents can use tools (like `WebsiteSearchTool`) to enhance their capabilities.
If you want to learn more about LiteAgents, check out the [LiteAgent](/concepts/lite-agent) page.
3. **Tool Integration**: Agents can use tools (like `WebsiteSearchTool`) to enhance their capabilities.
## Adding Crews to Flows

View File

@@ -93,11 +93,15 @@ This example demonstrates the core features of a LiteAgent:
For more complex scenarios, you can integrate LiteAgents into a Flow. Here's an example of a market research flow:
````python
from typing import List
import asyncio
from typing import Any, Dict, List
from crewai_tools import SerperDevTool
from pydantic import BaseModel, Field
from crewai.flow.flow import Flow, start, listen
from crewai.lite_agent import LiteAgent
from crewai.tools import WebSearchTool
from crewai.agent import Agent
from crewai.flow.flow import Flow, listen, start
# Define a structured output format
class MarketAnalysis(BaseModel):
@@ -105,29 +109,30 @@ class MarketAnalysis(BaseModel):
market_size: str = Field(description="Estimated market size")
competitors: List[str] = Field(description="Major competitors in the space")
# Define flow state
class MarketResearchState(BaseModel):
product: str = ""
analysis: MarketAnalysis = None
analysis: MarketAnalysis | None = None
# Create a flow class
class MarketResearchFlow(Flow[MarketResearchState]):
@start()
def initialize_research(self, product: str):
print(f"Starting market research for {product}")
self.state.product = product
def initialize_research(self) -> Dict[str, Any]:
print(f"Starting market research for {self.state.product}")
return {"product": self.state.product}
@listen(initialize_research)
async def analyze_market(self):
# Create a LiteAgent for market research
analyst = LiteAgent(
async def analyze_market(self) -> Dict[str, Any]:
# Create an Agent for market research
analyst = Agent(
role="Market Research Analyst",
goal=f"Analyze the market for {self.state.product}",
backstory="You are an experienced market analyst with expertise in "
"identifying market trends and opportunities.",
tools=[WebSearchTool()],
"identifying market trends and opportunities.",
tools=[SerperDevTool()],
verbose=True,
response_format=MarketAnalysis
)
# Define the research query
@@ -140,58 +145,53 @@ class MarketResearchFlow(Flow[MarketResearchState]):
Format your response according to the specified structure.
"""
# Execute the analysis
result = await analyst.kickoff_async(query)
self.state.analysis = result.pydantic
return result.pydantic
# Execute the analysis with structured output format
result = await analyst.kickoff_async(query, response_format=MarketAnalysis)
if result.pydantic:
print("result", result.pydantic)
else:
print("result", result)
# Return the analysis to update the state
return {"analysis": result.pydantic}
@listen(analyze_market)
def present_results(self):
analysis = self.state.analysis
def present_results(self, analysis) -> None:
print("\nMarket Analysis Results")
print("=====================")
print("\nKey Market Trends:")
for trend in analysis.key_trends:
print(f"- {trend}")
if isinstance(analysis, dict):
# If we got a dict with 'analysis' key, extract the actual analysis object
market_analysis = analysis.get("analysis")
else:
market_analysis = analysis
print(f"\nMarket Size: {analysis.market_size}")
if market_analysis and isinstance(market_analysis, MarketAnalysis):
print("\nKey Market Trends:")
for trend in market_analysis.key_trends:
print(f"- {trend}")
print(f"\nMarket Size: {market_analysis.market_size}")
print("\nMajor Competitors:")
for competitor in market_analysis.competitors:
print(f"- {competitor}")
else:
print("No structured analysis data available.")
print("Raw analysis:", analysis)
print("\nMajor Competitors:")
for competitor in analysis.competitors:
print(f"- {competitor}")
# Usage example
import asyncio
async def run_flow():
flow = MarketResearchFlow()
result = await flow.kickoff(inputs={"product": "AI-powered chatbots"})
result = await flow.kickoff_async(inputs={"product": "AI-powered chatbots"})
return result
# Run the flow
if __name__ == "__main__":
# Use asyncio.run at the top level only
asyncio.run(run_flow())
## Key Features
### 1. Simplified Setup
Unlike regular Agents, LiteAgents are designed for quick setup and standalone operation. They don't require crew configuration or task management.
### 2. Structured Output
LiteAgents support Pydantic models for response formatting, making it easy to get structured, type-safe data from your agent's operations.
### 3. Tool Integration
Just like regular Agents, LiteAgents can use tools to enhance their capabilities:
```python
from crewai.tools import SerperDevTool, CalculatorTool
agent = LiteAgent(
role="Research Assistant",
goal="Find and analyze information",
tools=[SerperDevTool(), CalculatorTool()],
verbose=True
)
````
### 4. Async Support