From 33192237a536660e65370a7d45fb7078476a9950 Mon Sep 17 00:00:00 2001 From: Brandon Hancock Date: Thu, 13 Mar 2025 09:16:18 -0400 Subject: [PATCH] WIP --- examples/lite_agent_example.py | 140 +++++++ src/crewai/lite_agent.py | 394 ++++++++++++++++++ src/crewai/utilities/events/agent_events.py | 28 ++ src/crewai/utilities/events/event_listener.py | 35 +- 4 files changed, 596 insertions(+), 1 deletion(-) create mode 100644 examples/lite_agent_example.py create mode 100644 src/crewai/lite_agent.py diff --git a/examples/lite_agent_example.py b/examples/lite_agent_example.py new file mode 100644 index 000000000..5fb4b5695 --- /dev/null +++ b/examples/lite_agent_example.py @@ -0,0 +1,140 @@ +""" +Example script demonstrating how to use the LiteAgent. + +This example shows how to create and use a LiteAgent for simple interactions +without the need for a full crew or task-based workflow. +""" + +import asyncio +from typing import Any, Dict, cast + +from pydantic import BaseModel, Field + +from crewai.lite_agent import LiteAgent +from crewai.tools.base_tool import BaseTool + + +# Define custom tools +class WebSearchTool(BaseTool): + """Tool for searching the web for information.""" + + name: str = "search_web" + description: str = "Search the web for information about a topic." + + def _run(self, query: str) -> str: + """Search the web for information about a topic.""" + # This is a mock implementation + if "tokyo" in query.lower(): + return "Tokyo's population in 2023 was approximately 14 million people in the city proper, and 37 million in the greater metropolitan area." + elif "climate change" in query.lower() and "coral" in query.lower(): + return "Climate change severely impacts coral reefs through: 1) Ocean warming causing coral bleaching, 2) Ocean acidification reducing calcification, 3) Sea level rise affecting light availability, 4) Increased storm frequency damaging reef structures. Sources: NOAA Coral Reef Conservation Program, Global Coral Reef Alliance." + else: + return f"Found information about {query}: This is a simulated search result for demonstration purposes." + + +class CalculatorTool(BaseTool): + """Tool for performing calculations.""" + + name: str = "calculate" + description: str = "Calculate the result of a mathematical expression." + + def _run(self, expression: str) -> str: + """Calculate the result of a mathematical expression.""" + try: + # CAUTION: eval can be dangerous in production code + # This is just for demonstration purposes + result = eval(expression, {"__builtins__": {}}) + return f"The result of {expression} is {result}" + except Exception as e: + return f"Error calculating {expression}: {str(e)}" + + +# Define a custom response format using Pydantic +class ResearchResult(BaseModel): + """Structure for research results.""" + + main_findings: str = Field(description="The main findings from the research") + key_points: list[str] = Field(description="List of key points") + sources: list[str] = Field(description="List of sources used") + + +async def main(): + # Create tools + web_search_tool = WebSearchTool() + calculator_tool = CalculatorTool() + + # Create a LiteAgent with a specific role, goal, and backstory + agent = LiteAgent( + role="Research Analyst", + goal="Provide accurate and concise information on requested topics", + backstory="You are an expert research analyst with years of experience in gathering and synthesizing information from various sources.", + llm="gpt-4", # You can use any supported LLM + tools=[web_search_tool, calculator_tool], + verbose=True, + response_format=ResearchResult, # Optional: Use a structured output format + ) + + # Example 1: Simple query with raw text response + print("\n=== Example 1: Simple Query ===") + result = await agent.kickoff_async("What is the population of Tokyo in 2023?") + print(f"Raw response: {result.raw}") + + # Example 2: Query with structured output + print("\n=== Example 2: Structured Output ===") + structured_query = """ + Research the impact of climate change on coral reefs. + + YOU MUST format your response as a valid JSON object with the following structure: + { + "main_findings": "A summary of the main findings", + "key_points": ["Point 1", "Point 2", "Point 3"], + "sources": ["Source 1", "Source 2"] + } + + Include at least 3 key points and 2 sources. Wrap your JSON in ```json and ``` tags. + """ + + result = await agent.kickoff_async(structured_query) + + if result.pydantic: + # Cast to the specific type for better IDE support + research_result = cast(ResearchResult, result.pydantic) + print(f"Main findings: {research_result.main_findings}") + print("\nKey points:") + for i, point in enumerate(research_result.key_points, 1): + print(f"{i}. {point}") + print("\nSources:") + for i, source in enumerate(research_result.sources, 1): + print(f"{i}. {source}") + else: + print(f"Raw response: {result.raw}") + print( + "\nNote: Structured output was not generated. The LLM may need more explicit instructions to format the response as JSON." + ) + + # Example 3: Multi-turn conversation + print("\n=== Example 3: Multi-turn Conversation ===") + messages = [ + {"role": "user", "content": "I'm planning a trip to Japan."}, + { + "role": "assistant", + "content": "That sounds exciting! Japan is a beautiful country with rich culture, delicious food, and stunning landscapes. What would you like to know about Japan to help with your trip planning?", + }, + { + "role": "user", + "content": "What are the best times to visit Tokyo and Kyoto?", + }, + ] + + result = await agent.kickoff_async(messages) + print(f"Response: {result.raw}") + + # Print usage metrics if available + if result.usage_metrics: + print("\nUsage metrics:") + for key, value in result.usage_metrics.items(): + print(f"{key}: {value}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/crewai/lite_agent.py b/src/crewai/lite_agent.py new file mode 100644 index 000000000..d620599f5 --- /dev/null +++ b/src/crewai/lite_agent.py @@ -0,0 +1,394 @@ +import asyncio +import json +import re +from typing import Any, Dict, List, Optional, Type, Union, cast + +from pydantic import BaseModel, Field, PrivateAttr, model_validator + +from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess +from crewai.agents.cache import CacheHandler +from crewai.llm import LLM +from crewai.tools.base_tool import BaseTool +from crewai.types.usage_metrics import UsageMetrics +from crewai.utilities.events.agent_events import ( + LiteAgentExecutionCompletedEvent, + LiteAgentExecutionErrorEvent, + LiteAgentExecutionStartedEvent, +) +from crewai.utilities.events.crewai_event_bus import crewai_event_bus +from crewai.utilities.llm_utils import create_llm +from crewai.utilities.token_counter_callback import TokenCalcHandler + + +class LiteAgentOutput(BaseModel): + """Class that represents the result of a LiteAgent execution.""" + + model_config = {"arbitrary_types_allowed": True} + + raw: str = Field(description="Raw output of the agent", default="") + pydantic: Optional[BaseModel] = Field( + description="Pydantic output of the agent", default=None + ) + agent_role: str = Field(description="Role of the agent that produced this output") + usage_metrics: Optional[Dict[str, Any]] = Field( + description="Token usage metrics for this execution", default=None + ) + + def to_dict(self) -> Dict[str, Any]: + """Convert pydantic_output to a dictionary.""" + if self.pydantic: + return self.pydantic.model_dump() + return {} + + def __str__(self) -> str: + """String representation of the output.""" + if self.pydantic: + return str(self.pydantic) + return self.raw + + +class LiteAgent(BaseModel): + """ + A lightweight agent that can process messages and use tools. + + This agent is simpler than the full Agent class, focusing on direct execution + rather than task delegation. It's designed to be used for simple interactions + where a full crew is not needed. + + Attributes: + role: The role of the agent. + goal: The objective of the agent. + backstory: The backstory of the agent. + llm: The language model that will run the agent. + tools: Tools at the agent's disposal. + verbose: Whether the agent execution should be in verbose mode. + max_iterations: Maximum number of iterations for tool usage. + max_execution_time: Maximum execution time in seconds. + response_format: Optional Pydantic model for structured output. + system_prompt: Custom system prompt to override the default. + """ + + model_config = {"arbitrary_types_allowed": True} + + role: str = Field(description="Role of the agent") + goal: str = Field(description="Goal of the agent") + backstory: str = Field(description="Backstory of the agent") + llm: Union[str, LLM, Any] = Field( + description="Language model that will run the agent", default=None + ) + tools: List[BaseTool] = Field( + default_factory=list, description="Tools at agent's disposal" + ) + verbose: bool = Field( + default=False, description="Whether to print execution details" + ) + max_iterations: int = Field( + default=15, description="Maximum number of iterations for tool usage" + ) + max_execution_time: Optional[int] = Field( + default=None, description="Maximum execution time in seconds" + ) + response_format: Optional[Type[BaseModel]] = Field( + default=None, description="Pydantic model for structured output" + ) + system_prompt: Optional[str] = Field( + default=None, description="Custom system prompt to override default" + ) + step_callback: Optional[Any] = Field( + default=None, + description="Callback to be executed after each step of the agent execution.", + ) + + _token_process: TokenProcess = PrivateAttr(default_factory=TokenProcess) + _cache_handler: CacheHandler = PrivateAttr(default_factory=CacheHandler) + _times_executed: int = PrivateAttr(default=0) + _max_retry_limit: int = PrivateAttr(default=2) + + @model_validator(mode="after") + def setup_llm(self): + """Set up the LLM after initialization.""" + if self.llm is None: + raise ValueError("LLM must be provided") + + if not isinstance(self.llm, LLM): + self.llm = create_llm(self.llm) + + return self + + def _get_default_system_prompt(self) -> str: + """Get the default system prompt for the agent.""" + prompt = f"""You are a helpful AI assistant acting as {self.role}. + +Your goal is: {self.goal} + +Your backstory: {self.backstory} + +When using tools, follow this format: +Thought: I need to use a tool to help with this task. +Action: tool_name +Action Input: {{ + "parameter1": "value1", + "parameter2": "value2" +}} +Observation: [Result of the tool execution] + +When you have the final answer, respond directly without the above format. +""" + return prompt + + def _format_tools_description(self) -> str: + """Format tools into a string for the prompt.""" + if not self.tools: + return "You don't have any tools available." + + tools_str = "You have access to the following tools:\n\n" + for tool in self.tools: + tools_str += f"Tool: {tool.name}\n" + tools_str += f"Description: {tool.description}\n" + if hasattr(tool, "args_schema"): + tools_str += f"Parameters: {tool.args_schema}\n" + tools_str += "\n" + + return tools_str + + def _parse_tools(self) -> List[Any]: + """Parse tools to be used by the agent.""" + tools_list = [] + try: + from crewai.tools import BaseTool as CrewAITool + + for tool in self.tools: + if isinstance(tool, CrewAITool): + tools_list.append(tool.to_structured_tool()) + else: + tools_list.append(tool) + except ModuleNotFoundError: + tools_list = self.tools + + return tools_list + + def _format_messages( + self, messages: Union[str, List[Dict[str, str]]] + ) -> List[Dict[str, str]]: + """Format messages for the LLM.""" + if isinstance(messages, str): + messages = [{"role": "user", "content": messages}] + + system_prompt = self.system_prompt or self._get_default_system_prompt() + tools_description = self._format_tools_description() + + # Add system message at the beginning + formatted_messages = [ + {"role": "system", "content": f"{system_prompt}\n\n{tools_description}"} + ] + + # Add the rest of the messages + formatted_messages.extend(messages) + + return formatted_messages + + def _extract_structured_output(self, text: str) -> Optional[BaseModel]: + """Extract structured output from text if response_format is set.""" + if not self.response_format: + return None + + try: + # Try to extract JSON from the text + json_match = re.search(r"```json\s*([\s\S]*?)\s*```", text) + if json_match: + json_str = json_match.group(1) + json_data = json.loads(json_str) + else: + # Try to parse the entire text as JSON + try: + json_data = json.loads(text) + except json.JSONDecodeError: + # If that fails, use a more lenient approach to find JSON-like content + potential_json = re.search(r"(\{[\s\S]*\})", text) + if potential_json: + json_data = json.loads(potential_json.group(1)) + else: + return None + + # Convert to Pydantic model + return self.response_format.model_validate(json_data) + except Exception as e: + if self.verbose: + print(f"Error extracting structured output: {e}") + return None + + def kickoff(self, messages: Union[str, List[Dict[str, str]]]) -> LiteAgentOutput: + """ + Execute the agent with the given messages. + + Args: + messages: Either a string query or a list of message dictionaries. + If a string is provided, it will be converted to a user message. + If a list is provided, each dict should have 'role' and 'content' keys. + + Returns: + LiteAgentOutput: The result of the agent execution. + """ + return asyncio.run(self.kickoff_async(messages)) + + async def kickoff_async( + self, messages: Union[str, List[Dict[str, str]]] + ) -> LiteAgentOutput: + """ + Execute the agent asynchronously with the given messages. + + Args: + messages: Either a string query or a list of message dictionaries. + If a string is provided, it will be converted to a user message. + If a list is provided, each dict should have 'role' and 'content' keys. + + Returns: + LiteAgentOutput: The result of the agent execution. + """ + # Format messages for the LLM + formatted_messages = self._format_messages(messages) + + # Prepare tools + parsed_tools = self._parse_tools() + + # Get the original query for event emission + query = messages if isinstance(messages, str) else messages[-1]["content"] + + # Create agent info for event emission + agent_info = { + "role": self.role, + "goal": self.goal, + "backstory": self.backstory, + "tools": self.tools, + "verbose": self.verbose, + } + + # Emit event for agent execution start + crewai_event_bus.emit( + self, + event=LiteAgentExecutionStartedEvent( + agent_info=agent_info, + tools=self.tools, + task_prompt=query, + ), + ) + + try: + # Execute the agent + result = await self._execute_agent(formatted_messages, parsed_tools) + + # Extract structured output if response_format is set + pydantic_output = None + + if self.response_format: + structured_output = self._extract_structured_output(result) + if isinstance(structured_output, BaseModel): + pydantic_output = structured_output + + # Create output object + usage_metrics = {} + if hasattr(self._token_process, "get_summary"): + usage_metrics_obj = self._token_process.get_summary() + if isinstance(usage_metrics_obj, UsageMetrics): + usage_metrics = usage_metrics_obj.model_dump() + + output = LiteAgentOutput( + raw=result, + pydantic=pydantic_output, + agent_role=self.role, + usage_metrics=usage_metrics, + ) + + # Emit event for agent execution completion + crewai_event_bus.emit( + self, + event=LiteAgentExecutionCompletedEvent( + agent_info=agent_info, + output=result, + ), + ) + + return output + + except Exception as e: + # Emit event for agent execution error + crewai_event_bus.emit( + self, + event=LiteAgentExecutionErrorEvent( + agent_info=agent_info, + error=str(e), + ), + ) + + # Retry if we haven't exceeded the retry limit + self._times_executed += 1 + if self._times_executed <= self._max_retry_limit: + if self.verbose: + print( + f"Retrying agent execution ({self._times_executed}/{self._max_retry_limit})..." + ) + return await self.kickoff_async(messages) + + raise e + + async def _execute_agent( + self, messages: List[Dict[str, str]], tools: List[Any] + ) -> str: + """ + Execute the agent with the given messages and tools. + + Args: + messages: List of message dictionaries. + tools: List of parsed tools. + + Returns: + str: The result of the agent execution. + """ + # Set up available functions for tool execution + available_functions = {} + for tool in self.tools: + available_functions[tool.name] = tool.run + + # Set up callbacks for token tracking + token_callback = TokenCalcHandler(token_cost_process=self._token_process) + callbacks = [token_callback] + + # Execute the LLM with the messages and tools + llm_instance = self.llm + if not isinstance(llm_instance, LLM): + llm_instance = create_llm(llm_instance) + + if llm_instance is None: + raise ValueError("LLM instance is None. Please provide a valid LLM.") + + # Set the response_format on the LLM instance if it's not already set + if self.response_format and not llm_instance.response_format: + llm_instance.response_format = self.response_format + + # Convert tools to dictionaries for LLM call + formatted_tools = None + if tools: + formatted_tools = [] + for tool in tools: + if hasattr(tool, "dict"): + formatted_tools.append(tool.dict()) + elif hasattr(tool, "to_dict"): + formatted_tools.append(tool.to_dict()) + elif hasattr(tool, "model_dump"): + formatted_tools.append(tool.model_dump()) + else: + # If we can't convert the tool, skip it + if self.verbose: + print( + f"Warning: Could not convert tool {tool} to dictionary format" + ) + + result = llm_instance.call( + messages=messages, + tools=formatted_tools, + callbacks=callbacks, + available_functions=available_functions, + ) + + return result diff --git a/src/crewai/utilities/events/agent_events.py b/src/crewai/utilities/events/agent_events.py index ed0480957..d25b55140 100644 --- a/src/crewai/utilities/events/agent_events.py +++ b/src/crewai/utilities/events/agent_events.py @@ -38,3 +38,31 @@ class AgentExecutionErrorEvent(CrewEvent): task: Any error: str type: str = "agent_execution_error" + + +# New event classes for LiteAgent +class LiteAgentExecutionStartedEvent(CrewEvent): + """Event emitted when a LiteAgent starts executing""" + + agent_info: Dict[str, Any] + tools: Optional[Sequence[Union[BaseTool, CrewStructuredTool]]] + task_prompt: str + type: str = "lite_agent_execution_started" + + model_config = {"arbitrary_types_allowed": True} + + +class LiteAgentExecutionCompletedEvent(CrewEvent): + """Event emitted when a LiteAgent completes execution""" + + agent_info: Dict[str, Any] + output: str + type: str = "lite_agent_execution_completed" + + +class LiteAgentExecutionErrorEvent(CrewEvent): + """Event emitted when a LiteAgent encounters an error during execution""" + + agent_info: Dict[str, Any] + error: str + type: str = "lite_agent_execution_error" diff --git a/src/crewai/utilities/events/event_listener.py b/src/crewai/utilities/events/event_listener.py index c5c049bc6..73e453673 100644 --- a/src/crewai/utilities/events/event_listener.py +++ b/src/crewai/utilities/events/event_listener.py @@ -15,7 +15,13 @@ from crewai.utilities.events.llm_events import ( LLMStreamChunkEvent, ) -from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent +from .agent_events import ( + AgentExecutionCompletedEvent, + AgentExecutionStartedEvent, + LiteAgentExecutionCompletedEvent, + LiteAgentExecutionErrorEvent, + LiteAgentExecutionStartedEvent, +) from .crew_events import ( CrewKickoffCompletedEvent, CrewKickoffFailedEvent, @@ -192,6 +198,33 @@ class EventListener(BaseEventListener): event.timestamp, ) + # ----------- LITE AGENT EVENTS ----------- + + @crewai_event_bus.on(LiteAgentExecutionStartedEvent) + def on_lite_agent_execution_started( + source, event: LiteAgentExecutionStartedEvent + ): + self.logger.log( + f"🤖 LiteAgent '{event.agent_info['role']}' started execution", + event.timestamp, + ) + + @crewai_event_bus.on(LiteAgentExecutionCompletedEvent) + def on_lite_agent_execution_completed( + source, event: LiteAgentExecutionCompletedEvent + ): + self.logger.log( + f"✅ LiteAgent '{event.agent_info['role']}' completed execution", + event.timestamp, + ) + + @crewai_event_bus.on(LiteAgentExecutionErrorEvent) + def on_lite_agent_execution_error(source, event: LiteAgentExecutionErrorEvent): + self.logger.log( + f"❌ LiteAgent '{event.agent_info['role']}' execution error: {event.error}", + event.timestamp, + ) + # ----------- FLOW EVENTS ----------- @crewai_event_bus.on(FlowCreatedEvent)