Merge branch 'main' into Branch_2260

This commit is contained in:
Vidit-Ostwal
2025-03-20 22:46:30 +05:30
86 changed files with 14175 additions and 1100 deletions

View File

@@ -14,7 +14,7 @@ warnings.filterwarnings(
category=UserWarning,
module="pydantic.main",
)
__version__ = "0.102.0"
__version__ = "0.108.0"
__all__ = [
"Agent",
"Crew",

View File

@@ -13,6 +13,7 @@ from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.knowledge.utils.knowledge_utils import extract_knowledge_context
from crewai.llm import LLM
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.security import Fingerprint
from crewai.task import Task
from crewai.tools import BaseTool
from crewai.tools.agent_tools.agent_tools import AgentTools
@@ -472,3 +473,13 @@ class Agent(BaseAgent):
def __repr__(self):
return f"Agent(role={self.role}, goal={self.goal}, backstory={self.backstory})"
@property
def fingerprint(self) -> Fingerprint:
"""
Get the agent's fingerprint.
Returns:
Fingerprint: The agent's fingerprint
"""
return self.security_config.fingerprint

View File

@@ -20,6 +20,7 @@ from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.tools_handler import ToolsHandler
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource
from crewai.security.security_config import SecurityConfig
from crewai.tools.base_tool import BaseTool, Tool
from crewai.utilities import I18N, Logger, RPMController
from crewai.utilities.config import process_config
@@ -52,6 +53,7 @@ class BaseAgent(ABC, BaseModel):
max_tokens: Maximum number of tokens for the agent to generate in a response.
knowledge_sources: Knowledge sources for the agent.
knowledge_storage: Custom knowledge storage for the agent.
security_config: Security configuration for the agent, including fingerprinting.
Methods:
@@ -146,6 +148,10 @@ class BaseAgent(ABC, BaseModel):
default=None,
description="Custom knowledge storage for the agent.",
)
security_config: SecurityConfig = Field(
default_factory=SecurityConfig,
description="Security configuration for the agent, including fingerprinting.",
)
@model_validator(mode="before")
@classmethod
@@ -199,6 +205,10 @@ class BaseAgent(ABC, BaseModel):
if not self._token_process:
self._token_process = TokenProcess()
# Initialize security_config if not provided
if self.security_config is None:
self.security_config = SecurityConfig()
return self
@field_validator("id", mode="before")

View File

@@ -1,62 +1,62 @@
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
# If you want to run a snippet of code before or after the crew starts,
# If you want to run a snippet of code before or after the crew starts,
# you can use the @before_kickoff and @after_kickoff decorators
# https://docs.crewai.com/concepts/crews#example-crew-class-with-decorators
@CrewBase
class {{crew_name}}():
"""{{crew_name}} crew"""
"""{{crew_name}} crew"""
# Learn more about YAML configuration files here:
# Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
# Tasks: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
# Learn more about YAML configuration files here:
# Agents: https://docs.crewai.com/concepts/agents#yaml-configuration-recommended
# Tasks: https://docs.crewai.com/concepts/tasks#yaml-configuration-recommended
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
# If you would like to add tools to your agents, you can learn more about it here:
# https://docs.crewai.com/concepts/agents#agent-tools
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
verbose=True
)
# If you would like to add tools to your agents, you can learn more about it here:
# https://docs.crewai.com/concepts/agents#agent-tools
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
verbose=True
)
@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)
@agent
def reporting_analyst(self) -> Agent:
return Agent(
config=self.agents_config['reporting_analyst'],
verbose=True
)
# To learn more about structured task outputs,
# task dependencies, and task callbacks, check out the documentation:
# https://docs.crewai.com/concepts/tasks#overview-of-a-task
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
)
# To learn more about structured task outputs,
# task dependencies, and task callbacks, check out the documentation:
# https://docs.crewai.com/concepts/tasks#overview-of-a-task
@task
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
)
@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
output_file='report.md'
)
@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
output_file='report.md'
)
@crew
def crew(self) -> Crew:
"""Creates the {{crew_name}} crew"""
# To learn how to add knowledge sources to your crew, check out the documentation:
# https://docs.crewai.com/concepts/knowledge#what-is-knowledge
@crew
def crew(self) -> Crew:
"""Creates the {{crew_name}} crew"""
# To learn how to add knowledge sources to your crew, check out the documentation:
# https://docs.crewai.com/concepts/knowledge#what-is-knowledge
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)
return Crew(
agents=self.agents, # Automatically created by the @agent decorator
tasks=self.tasks, # Automatically created by the @task decorator
process=Process.sequential,
verbose=True,
# process=Process.hierarchical, # In case you wanna use that instead https://docs.crewai.com/how-to/Hierarchical/
)

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
"crewai[tools]>=0.102.0,<1.0.0"
"crewai[tools]>=0.108.0,<1.0.0"
]
[project.scripts]

View File

@@ -5,11 +5,12 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
"crewai[tools]>=0.102.0,<1.0.0",
"crewai[tools]>=0.108.0,<1.0.0",
]
[project.scripts]
kickoff = "{{folder_name}}.main:kickoff"
run_crew = "{{folder_name}}.main:kickoff"
plot = "{{folder_name}}.main:plot"
[build-system]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.13"
dependencies = [
"crewai[tools]>=0.102.0"
"crewai[tools]>=0.108.0"
]
[tool.crewai]

View File

@@ -32,6 +32,7 @@ from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.user.user_memory import UserMemory
from crewai.process import Process
from crewai.security import Fingerprint, SecurityConfig
from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
@@ -54,6 +55,7 @@ from crewai.utilities.events.crew_events import (
CrewTrainStartedEvent,
)
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
from crewai.utilities.events.event_listener import EventListener
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
@@ -90,6 +92,7 @@ class Crew(BaseModel):
share_crew: Whether you want to share the complete crew information and execution with crewAI to make the library better, and allow us to train models.
planning: Plan the crew execution and add the plan to the crew.
chat_llm: The language model used for orchestrating chat interactions with the crew.
security_config: Security configuration for the crew, including fingerprinting.
"""
__hash__ = object.__hash__ # type: ignore
@@ -220,6 +223,10 @@ class Crew(BaseModel):
default=None,
description="Knowledge for the crew.",
)
security_config: SecurityConfig = Field(
default_factory=SecurityConfig,
description="Security configuration for the crew, including fingerprinting.",
)
@field_validator("id", mode="before")
@classmethod
@@ -248,7 +255,11 @@ class Crew(BaseModel):
@model_validator(mode="after")
def set_private_attrs(self) -> "Crew":
"""Set private attributes."""
self._cache_handler = CacheHandler()
event_listener = EventListener()
event_listener.verbose = self.verbose
event_listener.formatter.verbose = self.verbose
self._logger = Logger(verbose=self.verbose)
if self.output_log_file:
self._file_handler = FileHandler(self.output_log_file)
@@ -474,10 +485,20 @@ class Crew(BaseModel):
@property
def key(self) -> str:
source = [agent.key for agent in self.agents] + [
source: List[str] = [agent.key for agent in self.agents] + [
task.key for task in self.tasks
]
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@property
def fingerprint(self) -> Fingerprint:
"""
Get the crew's fingerprint.
Returns:
Fingerprint: The crew's fingerprint
"""
return self.security_config.fingerprint
def _setup_from_config(self):
assert self.config is not None, "Config should not be None."

View File

@@ -5,7 +5,17 @@ import sys
import threading
import warnings
from contextlib import contextmanager
from typing import Any, Dict, List, Literal, Optional, Type, Union, cast
from typing import (
Any,
Dict,
List,
Literal,
Optional,
Type,
TypedDict,
Union,
cast,
)
from dotenv import load_dotenv
from pydantic import BaseModel
@@ -15,6 +25,7 @@ from crewai.utilities.events.llm_events import (
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMCallType,
LLMStreamChunkEvent,
)
from crewai.utilities.events.tool_usage_events import ToolExecutionErrorEvent
@@ -22,8 +33,11 @@ with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
import litellm
from litellm import Choices
from litellm.litellm_core_utils.get_supported_openai_params import (
get_supported_openai_params,
)
from litellm.types.utils import ModelResponse
from litellm.utils import get_supported_openai_params, supports_response_schema
from litellm.utils import supports_response_schema
from crewai.utilities.events import crewai_event_bus
@@ -100,6 +114,19 @@ LLM_CONTEXT_WINDOW_SIZES = {
"Llama-3.2-11B-Vision-Instruct": 16384,
"Meta-Llama-3.2-3B-Instruct": 4096,
"Meta-Llama-3.2-1B-Instruct": 16384,
# mistral
"mistral-tiny": 32768,
"mistral-small-latest": 32768,
"mistral-medium-latest": 32768,
"mistral-large-latest": 32768,
"mistral-large-2407": 32768,
"mistral-large-2402": 32768,
"mistral/mistral-tiny": 32768,
"mistral/mistral-small-latest": 32768,
"mistral/mistral-medium-latest": 32768,
"mistral/mistral-large-latest": 32768,
"mistral/mistral-large-2407": 32768,
"mistral/mistral-large-2402": 32768,
}
DEFAULT_CONTEXT_WINDOW_SIZE = 8192
@@ -126,6 +153,17 @@ def suppress_warnings():
sys.stderr = old_stderr
class Delta(TypedDict):
content: Optional[str]
role: Optional[str]
class StreamingChoices(TypedDict):
delta: Delta
index: int
finish_reason: Optional[str]
class LLM:
def __init__(
self,
@@ -150,6 +188,7 @@ class LLM:
api_key: Optional[str] = None,
callbacks: List[Any] = [],
reasoning_effort: Optional[Literal["none", "low", "medium", "high"]] = None,
stream: bool = False,
**kwargs,
):
self.model = model
@@ -175,6 +214,7 @@ class LLM:
self.reasoning_effort = reasoning_effort
self.additional_params = kwargs
self.is_anthropic = self._is_anthropic_model(model)
self.stream = stream
litellm.drop_params = True
@@ -201,6 +241,432 @@ class LLM:
ANTHROPIC_PREFIXES = ("anthropic/", "claude-", "claude/")
return any(prefix in model.lower() for prefix in ANTHROPIC_PREFIXES)
def _prepare_completion_params(
self,
messages: Union[str, List[Dict[str, str]]],
tools: Optional[List[dict]] = None,
) -> Dict[str, Any]:
"""Prepare parameters for the completion call.
Args:
messages: Input messages for the LLM
tools: Optional list of tool schemas
callbacks: Optional list of callback functions
available_functions: Optional dict of available functions
Returns:
Dict[str, Any]: Parameters for the completion call
"""
# --- 1) Format messages according to provider requirements
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
formatted_messages = self._format_messages_for_provider(messages)
# --- 2) Prepare the parameters for the completion call
params = {
"model": self.model,
"messages": formatted_messages,
"timeout": self.timeout,
"temperature": self.temperature,
"top_p": self.top_p,
"n": self.n,
"stop": self.stop,
"max_tokens": self.max_tokens or self.max_completion_tokens,
"presence_penalty": self.presence_penalty,
"frequency_penalty": self.frequency_penalty,
"logit_bias": self.logit_bias,
"response_format": self.response_format,
"seed": self.seed,
"logprobs": self.logprobs,
"top_logprobs": self.top_logprobs,
"api_base": self.api_base,
"base_url": self.base_url,
"api_version": self.api_version,
"api_key": self.api_key,
"stream": self.stream,
"tools": tools,
"reasoning_effort": self.reasoning_effort,
**self.additional_params,
}
# Remove None values from params
return {k: v for k, v in params.items() if v is not None}
def _handle_streaming_response(
self,
params: Dict[str, Any],
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> str:
"""Handle a streaming response from the LLM.
Args:
params: Parameters for the completion call
callbacks: Optional list of callback functions
available_functions: Dict of available functions
Returns:
str: The complete response text
Raises:
Exception: If no content is received from the streaming response
"""
# --- 1) Initialize response tracking
full_response = ""
last_chunk = None
chunk_count = 0
usage_info = None
# --- 2) Make sure stream is set to True and include usage metrics
params["stream"] = True
params["stream_options"] = {"include_usage": True}
try:
# --- 3) Process each chunk in the stream
for chunk in litellm.completion(**params):
chunk_count += 1
last_chunk = chunk
# Extract content from the chunk
chunk_content = None
# Safely extract content from various chunk formats
try:
# Try to access choices safely
choices = None
if isinstance(chunk, dict) and "choices" in chunk:
choices = chunk["choices"]
elif hasattr(chunk, "choices"):
# Check if choices is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "choices"), type):
choices = getattr(chunk, "choices")
# Try to extract usage information if available
if isinstance(chunk, dict) and "usage" in chunk:
usage_info = chunk["usage"]
elif hasattr(chunk, "usage"):
# Check if usage is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "usage"), type):
usage_info = getattr(chunk, "usage")
if choices and len(choices) > 0:
choice = choices[0]
# Handle different delta formats
delta = None
if isinstance(choice, dict) and "delta" in choice:
delta = choice["delta"]
elif hasattr(choice, "delta"):
delta = getattr(choice, "delta")
# Extract content from delta
if delta:
# Handle dict format
if isinstance(delta, dict):
if "content" in delta and delta["content"] is not None:
chunk_content = delta["content"]
# Handle object format
elif hasattr(delta, "content"):
chunk_content = getattr(delta, "content")
# Handle case where content might be None or empty
if chunk_content is None and isinstance(delta, dict):
# Some models might send empty content chunks
chunk_content = ""
except Exception as e:
logging.debug(f"Error extracting content from chunk: {e}")
logging.debug(f"Chunk format: {type(chunk)}, content: {chunk}")
# Only add non-None content to the response
if chunk_content is not None:
# Add the chunk content to the full response
full_response += chunk_content
# Emit the chunk event
crewai_event_bus.emit(
self,
event=LLMStreamChunkEvent(chunk=chunk_content),
)
# --- 4) Fallback to non-streaming if no content received
if not full_response.strip() and chunk_count == 0:
logging.warning(
"No chunks received in streaming response, falling back to non-streaming"
)
non_streaming_params = params.copy()
non_streaming_params["stream"] = False
non_streaming_params.pop(
"stream_options", None
) # Remove stream_options for non-streaming call
return self._handle_non_streaming_response(
non_streaming_params, callbacks, available_functions
)
# --- 5) Handle empty response with chunks
if not full_response.strip() and chunk_count > 0:
logging.warning(
f"Received {chunk_count} chunks but no content was extracted"
)
if last_chunk is not None:
try:
# Try to extract content from the last chunk's message
choices = None
if isinstance(last_chunk, dict) and "choices" in last_chunk:
choices = last_chunk["choices"]
elif hasattr(last_chunk, "choices"):
if not isinstance(getattr(last_chunk, "choices"), type):
choices = getattr(last_chunk, "choices")
if choices and len(choices) > 0:
choice = choices[0]
# Try to get content from message
message = None
if isinstance(choice, dict) and "message" in choice:
message = choice["message"]
elif hasattr(choice, "message"):
message = getattr(choice, "message")
if message:
content = None
if isinstance(message, dict) and "content" in message:
content = message["content"]
elif hasattr(message, "content"):
content = getattr(message, "content")
if content:
full_response = content
logging.info(
f"Extracted content from last chunk message: {full_response}"
)
except Exception as e:
logging.debug(f"Error extracting content from last chunk: {e}")
logging.debug(
f"Last chunk format: {type(last_chunk)}, content: {last_chunk}"
)
# --- 6) If still empty, raise an error instead of using a default response
if not full_response.strip():
raise Exception(
"No content received from streaming response. Received empty chunks or failed to extract content."
)
# --- 7) Check for tool calls in the final response
tool_calls = None
try:
if last_chunk:
choices = None
if isinstance(last_chunk, dict) and "choices" in last_chunk:
choices = last_chunk["choices"]
elif hasattr(last_chunk, "choices"):
if not isinstance(getattr(last_chunk, "choices"), type):
choices = getattr(last_chunk, "choices")
if choices and len(choices) > 0:
choice = choices[0]
message = None
if isinstance(choice, dict) and "message" in choice:
message = choice["message"]
elif hasattr(choice, "message"):
message = getattr(choice, "message")
if message:
if isinstance(message, dict) and "tool_calls" in message:
tool_calls = message["tool_calls"]
elif hasattr(message, "tool_calls"):
tool_calls = getattr(message, "tool_calls")
except Exception as e:
logging.debug(f"Error checking for tool calls: {e}")
# --- 8) If no tool calls or no available functions, return the text response directly
if not tool_calls or not available_functions:
# Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
return full_response
# --- 9) Handle tool calls if present
tool_result = self._handle_tool_call(tool_calls, available_functions)
if tool_result is not None:
return tool_result
# --- 10) Log token usage if available in streaming mode
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
# --- 11) Emit completion event and return response
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
return full_response
except Exception as e:
logging.error(f"Error in streaming response: {str(e)}")
if full_response.strip():
logging.warning(f"Returning partial response despite error: {str(e)}")
self._handle_emit_call_events(full_response, LLMCallType.LLM_CALL)
return full_response
# Emit failed event and re-raise the exception
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=str(e)),
)
raise Exception(f"Failed to get streaming response: {str(e)}")
def _handle_streaming_callbacks(
self,
callbacks: Optional[List[Any]],
usage_info: Optional[Dict[str, Any]],
last_chunk: Optional[Any],
) -> None:
"""Handle callbacks with usage info for streaming responses.
Args:
callbacks: Optional list of callback functions
usage_info: Usage information collected during streaming
last_chunk: The last chunk received from the streaming response
"""
if callbacks and len(callbacks) > 0:
for callback in callbacks:
if hasattr(callback, "log_success_event"):
# Use the usage_info we've been tracking
if not usage_info:
# Try to get usage from the last chunk if we haven't already
try:
if last_chunk:
if (
isinstance(last_chunk, dict)
and "usage" in last_chunk
):
usage_info = last_chunk["usage"]
elif hasattr(last_chunk, "usage"):
if not isinstance(
getattr(last_chunk, "usage"), type
):
usage_info = getattr(last_chunk, "usage")
except Exception as e:
logging.debug(f"Error extracting usage info: {e}")
if usage_info:
callback.log_success_event(
kwargs={}, # We don't have the original params here
response_obj={"usage": usage_info},
start_time=0,
end_time=0,
)
def _handle_non_streaming_response(
self,
params: Dict[str, Any],
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> str:
"""Handle a non-streaming response from the LLM.
Args:
params: Parameters for the completion call
callbacks: Optional list of callback functions
available_functions: Dict of available functions
Returns:
str: The response text
"""
# --- 1) Make the completion call
response = litellm.completion(**params)
# --- 2) Extract response message and content
response_message = cast(Choices, cast(ModelResponse, response).choices)[
0
].message
text_response = response_message.content or ""
# --- 3) Handle callbacks with usage info
if callbacks and len(callbacks) > 0:
for callback in callbacks:
if hasattr(callback, "log_success_event"):
usage_info = getattr(response, "usage", None)
if usage_info:
callback.log_success_event(
kwargs=params,
response_obj={"usage": usage_info},
start_time=0,
end_time=0,
)
# --- 4) Check for tool calls
tool_calls = getattr(response_message, "tool_calls", [])
# --- 5) If no tool calls or no available functions, return the text response directly
if not tool_calls or not available_functions:
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL)
return text_response
# --- 6) Handle tool calls if present
tool_result = self._handle_tool_call(tool_calls, available_functions)
if tool_result is not None:
return tool_result
# --- 7) If tool call handling didn't return a result, emit completion event and return text response
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL)
return text_response
def _handle_tool_call(
self,
tool_calls: List[Any],
available_functions: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
"""Handle a tool call from the LLM.
Args:
tool_calls: List of tool calls from the LLM
available_functions: Dict of available functions
Returns:
Optional[str]: The result of the tool call, or None if no tool call was made
"""
# --- 1) Validate tool calls and available functions
if not tool_calls or not available_functions:
return None
# --- 2) Extract function name from first tool call
tool_call = tool_calls[0]
function_name = tool_call.function.name
function_args = {} # Initialize to empty dict to avoid unbound variable
# --- 3) Check if function is available
if function_name in available_functions:
try:
# --- 3.1) Parse function arguments
function_args = json.loads(tool_call.function.arguments)
fn = available_functions[function_name]
# --- 3.2) Execute function
result = fn(**function_args)
# --- 3.3) Emit success event
self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
return result
except Exception as e:
# --- 3.4) Handle execution errors
fn = available_functions.get(
function_name, lambda: None
) # Ensure fn is always a callable
logging.error(f"Error executing function '{function_name}': {e}")
crewai_event_bus.emit(
self,
event=ToolExecutionErrorEvent(
tool_name=function_name,
tool_args=function_args,
tool_class=fn,
error=str(e),
),
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(error=f"Tool execution error: {str(e)}"),
)
return None
def call(
self,
messages: Union[str, List[Dict[str, str]]],
@@ -230,22 +696,8 @@ class LLM:
TypeError: If messages format is invalid
ValueError: If response format is not supported
LLMContextLengthExceededException: If input exceeds model's context limit
Examples:
# Example 1: Simple string input
>>> response = llm.call("Return the name of a random city.")
>>> print(response)
"Paris"
# Example 2: Message list with system and user messages
>>> messages = [
... {"role": "system", "content": "You are a geography expert"},
... {"role": "user", "content": "What is France's capital?"}
... ]
>>> response = llm.call(messages)
>>> print(response)
"The capital of France is Paris."
"""
# --- 1) Emit call started event
crewai_event_bus.emit(
self,
event=LLMCallStartedEvent(
@@ -255,127 +707,38 @@ class LLM:
available_functions=available_functions,
),
)
# Validate parameters before proceeding with the call.
# --- 2) Validate parameters before proceeding with the call
self._validate_call_params()
# --- 3) Convert string messages to proper format if needed
if isinstance(messages, str):
messages = [{"role": "user", "content": messages}]
# For O1 models, system messages are not supported.
# Convert any system messages into assistant messages.
# --- 4) Handle O1 model special case (system messages not supported)
if "o1" in self.model.lower():
for message in messages:
if message.get("role") == "system":
message["role"] = "assistant"
# --- 5) Set up callbacks if provided
with suppress_warnings():
if callbacks and len(callbacks) > 0:
self.set_callbacks(callbacks)
try:
# --- 1) Format messages according to provider requirements
formatted_messages = self._format_messages_for_provider(messages)
# --- 6) Prepare parameters for the completion call
params = self._prepare_completion_params(messages, tools)
# --- 2) Prepare the parameters for the completion call
params = {
"model": self.model,
"messages": formatted_messages,
"timeout": self.timeout,
"temperature": self.temperature,
"top_p": self.top_p,
"n": self.n,
"stop": self.stop,
"max_tokens": self.max_tokens or self.max_completion_tokens,
"presence_penalty": self.presence_penalty,
"frequency_penalty": self.frequency_penalty,
"logit_bias": self.logit_bias,
"response_format": self.response_format,
"seed": self.seed,
"logprobs": self.logprobs,
"top_logprobs": self.top_logprobs,
"api_base": self.api_base,
"base_url": self.base_url,
"api_version": self.api_version,
"api_key": self.api_key,
"stream": False,
"tools": tools,
"reasoning_effort": self.reasoning_effort,
**self.additional_params,
}
# Remove None values from params
params = {k: v for k, v in params.items() if v is not None}
# --- 2) Make the completion call
response = litellm.completion(**params)
response_message = cast(Choices, cast(ModelResponse, response).choices)[
0
].message
text_response = response_message.content or ""
tool_calls = getattr(response_message, "tool_calls", [])
# --- 3) Handle callbacks with usage info
if callbacks and len(callbacks) > 0:
for callback in callbacks:
if hasattr(callback, "log_success_event"):
usage_info = getattr(response, "usage", None)
if usage_info:
callback.log_success_event(
kwargs=params,
response_obj={"usage": usage_info},
start_time=0,
end_time=0,
)
# --- 4) If no tool calls, return the text response
if not tool_calls or not available_functions:
self._handle_emit_call_events(text_response, LLMCallType.LLM_CALL)
return text_response
# --- 5) Handle the tool call
tool_call = tool_calls[0]
function_name = tool_call.function.name
if function_name in available_functions:
try:
function_args = json.loads(tool_call.function.arguments)
except json.JSONDecodeError as e:
logging.warning(f"Failed to parse function arguments: {e}")
return text_response
fn = available_functions[function_name]
try:
# Call the actual tool function
result = fn(**function_args)
self._handle_emit_call_events(result, LLMCallType.TOOL_CALL)
return result
except Exception as e:
logging.error(
f"Error executing function '{function_name}': {e}"
)
crewai_event_bus.emit(
self,
event=ToolExecutionErrorEvent(
tool_name=function_name,
tool_args=function_args,
tool_class=fn,
error=str(e),
),
)
crewai_event_bus.emit(
self,
event=LLMCallFailedEvent(
error=f"Tool execution error: {str(e)}"
),
)
return text_response
else:
logging.warning(
f"Tool call requested unknown function '{function_name}'"
# --- 7) Make the completion call and handle response
if self.stream:
return self._handle_streaming_response(
params, callbacks, available_functions
)
else:
return self._handle_non_streaming_response(
params, callbacks, available_functions
)
return text_response
except Exception as e:
crewai_event_bus.emit(
@@ -426,6 +789,31 @@ class LLM:
"Invalid message format. Each message must be a dict with 'role' and 'content' keys"
)
# Handle O1 models specially
if "o1" in self.model.lower():
formatted_messages = []
for msg in messages:
# Convert system messages to assistant messages
if msg["role"] == "system":
formatted_messages.append(
{"role": "assistant", "content": msg["content"]}
)
else:
formatted_messages.append(msg)
return formatted_messages
# Handle Mistral models - they require the last message to have a role of 'user' or 'tool'
if "mistral" in self.model.lower():
# Check if the last message has a role of 'assistant'
if messages and messages[-1]["role"] == "assistant":
# Add a dummy user message to ensure the last message has a role of 'user'
messages = (
messages.copy()
) # Create a copy to avoid modifying the original
messages.append({"role": "user", "content": "Please continue."})
return messages
# Handle Anthropic models
if not self.is_anthropic:
return messages
@@ -436,7 +824,7 @@ class LLM:
return messages
def _get_custom_llm_provider(self) -> str:
def _get_custom_llm_provider(self) -> Optional[str]:
"""
Derives the custom_llm_provider from the model string.
- For example, if the model is "openrouter/deepseek/deepseek-chat", returns "openrouter".
@@ -445,7 +833,7 @@ class LLM:
"""
if "/" in self.model:
return self.model.split("/")[0]
return "openai"
return None
def _validate_call_params(self) -> None:
"""
@@ -468,10 +856,12 @@ class LLM:
def supports_function_calling(self) -> bool:
try:
params = get_supported_openai_params(model=self.model)
return params is not None and "tools" in params
provider = self._get_custom_llm_provider()
return litellm.utils.supports_function_calling(
self.model, custom_llm_provider=provider
)
except Exception as e:
logging.error(f"Failed to get supported params: {str(e)}")
logging.error(f"Failed to check function calling support: {str(e)}")
return False
def supports_stop_words(self) -> bool:

View File

@@ -0,0 +1,13 @@
"""
CrewAI security module.
This module provides security-related functionality for CrewAI, including:
- Fingerprinting for component identity and tracking
- Security configuration for controlling access and permissions
- Future: authentication, scoping, and delegation mechanisms
"""
from crewai.security.fingerprint import Fingerprint
from crewai.security.security_config import SecurityConfig
__all__ = ["Fingerprint", "SecurityConfig"]

View File

@@ -0,0 +1,170 @@
"""
Fingerprint Module
This module provides functionality for generating and validating unique identifiers
for CrewAI agents. These identifiers are used for tracking, auditing, and security.
"""
import uuid
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import BaseModel, ConfigDict, Field, field_validator
class Fingerprint(BaseModel):
"""
A class for generating and managing unique identifiers for agents.
Each agent has dual identifiers:
- Human-readable ID: For debugging and reference (derived from role if not specified)
- Fingerprint UUID: Unique runtime identifier for tracking and auditing
Attributes:
uuid_str (str): String representation of the UUID for this fingerprint, auto-generated
created_at (datetime): When this fingerprint was created, auto-generated
metadata (Dict[str, Any]): Additional metadata associated with this fingerprint
"""
uuid_str: str = Field(default_factory=lambda: str(uuid.uuid4()), description="String representation of the UUID")
created_at: datetime = Field(default_factory=datetime.now, description="When this fingerprint was created")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata for this fingerprint")
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_validator('metadata')
@classmethod
def validate_metadata(cls, v):
"""Validate that metadata is a dictionary with string keys and valid values."""
if not isinstance(v, dict):
raise ValueError("Metadata must be a dictionary")
# Validate that all keys are strings
for key, value in v.items():
if not isinstance(key, str):
raise ValueError(f"Metadata keys must be strings, got {type(key)}")
# Validate nested dictionaries (prevent deeply nested structures)
if isinstance(value, dict):
# Check for nested dictionaries (limit depth to 1)
for nested_key, nested_value in value.items():
if not isinstance(nested_key, str):
raise ValueError(f"Nested metadata keys must be strings, got {type(nested_key)}")
if isinstance(nested_value, dict):
raise ValueError("Metadata can only be nested one level deep")
# Check for maximum metadata size (prevent DoS)
if len(str(v)) > 10000: # Limit metadata size to 10KB
raise ValueError("Metadata size exceeds maximum allowed (10KB)")
return v
def __init__(self, **data):
"""Initialize a Fingerprint with auto-generated uuid_str and created_at."""
# Remove uuid_str and created_at from data to ensure they're auto-generated
if 'uuid_str' in data:
data.pop('uuid_str')
if 'created_at' in data:
data.pop('created_at')
# Call the parent constructor with the modified data
super().__init__(**data)
@property
def uuid(self) -> uuid.UUID:
"""Get the UUID object for this fingerprint."""
return uuid.UUID(self.uuid_str)
@classmethod
def _generate_uuid(cls, seed: str) -> str:
"""
Generate a deterministic UUID based on a seed string.
Args:
seed (str): The seed string to use for UUID generation
Returns:
str: A string representation of the UUID consistently generated from the seed
"""
if not isinstance(seed, str):
raise ValueError("Seed must be a string")
if not seed.strip():
raise ValueError("Seed cannot be empty or whitespace")
# Create a deterministic UUID using v5 (SHA-1)
# Custom namespace for CrewAI to enhance security
# Using a unique namespace specific to CrewAI to reduce collision risks
CREW_AI_NAMESPACE = uuid.UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479')
return str(uuid.uuid5(CREW_AI_NAMESPACE, seed))
@classmethod
def generate(cls, seed: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> 'Fingerprint':
"""
Static factory method to create a new Fingerprint.
Args:
seed (Optional[str]): A string to use as seed for the UUID generation.
If None, a random UUID is generated.
metadata (Optional[Dict[str, Any]]): Additional metadata to store with the fingerprint.
Returns:
Fingerprint: A new Fingerprint instance
"""
fingerprint = cls(metadata=metadata or {})
if seed:
# For seed-based generation, we need to manually set the uuid_str after creation
object.__setattr__(fingerprint, 'uuid_str', cls._generate_uuid(seed))
return fingerprint
def __str__(self) -> str:
"""String representation of the fingerprint (the UUID)."""
return self.uuid_str
def __eq__(self, other) -> bool:
"""Compare fingerprints by their UUID."""
if isinstance(other, Fingerprint):
return self.uuid_str == other.uuid_str
return False
def __hash__(self) -> int:
"""Hash of the fingerprint (based on UUID)."""
return hash(self.uuid_str)
def to_dict(self) -> Dict[str, Any]:
"""
Convert the fingerprint to a dictionary representation.
Returns:
Dict[str, Any]: Dictionary representation of the fingerprint
"""
return {
"uuid_str": self.uuid_str,
"created_at": self.created_at.isoformat(),
"metadata": self.metadata
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Fingerprint':
"""
Create a Fingerprint from a dictionary representation.
Args:
data (Dict[str, Any]): Dictionary representation of a fingerprint
Returns:
Fingerprint: A new Fingerprint instance
"""
if not data:
return cls()
fingerprint = cls(metadata=data.get("metadata", {}))
# For consistency with existing stored fingerprints, we need to manually set these
if "uuid_str" in data:
object.__setattr__(fingerprint, 'uuid_str', data["uuid_str"])
if "created_at" in data and isinstance(data["created_at"], str):
object.__setattr__(fingerprint, 'created_at', datetime.fromisoformat(data["created_at"]))
return fingerprint

View File

@@ -0,0 +1,116 @@
"""
Security Configuration Module
This module provides configuration for CrewAI security features, including:
- Authentication settings
- Scoping rules
- Fingerprinting
The SecurityConfig class is the primary interface for managing security settings
in CrewAI applications.
"""
from typing import Any, Dict, Optional
from pydantic import BaseModel, ConfigDict, Field, model_validator
from crewai.security.fingerprint import Fingerprint
class SecurityConfig(BaseModel):
"""
Configuration for CrewAI security features.
This class manages security settings for CrewAI agents, including:
- Authentication credentials *TODO*
- Identity information (agent fingerprints)
- Scoping rules *TODO*
- Impersonation/delegation tokens *TODO*
Attributes:
version (str): Version of the security configuration
fingerprint (Fingerprint): The unique fingerprint automatically generated for the component
"""
model_config = ConfigDict(
arbitrary_types_allowed=True
# Note: Cannot use frozen=True as existing tests modify the fingerprint property
)
version: str = Field(
default="1.0.0",
description="Version of the security configuration"
)
fingerprint: Fingerprint = Field(
default_factory=Fingerprint,
description="Unique identifier for the component"
)
def is_compatible(self, min_version: str) -> bool:
"""
Check if this security configuration is compatible with the minimum required version.
Args:
min_version (str): Minimum required version in semver format (e.g., "1.0.0")
Returns:
bool: True if this configuration is compatible, False otherwise
"""
# Simple version comparison (can be enhanced with packaging.version if needed)
current = [int(x) for x in self.version.split(".")]
minimum = [int(x) for x in min_version.split(".")]
# Compare major, minor, patch versions
for c, m in zip(current, minimum):
if c > m:
return True
if c < m:
return False
return True
@model_validator(mode='before')
@classmethod
def validate_fingerprint(cls, values):
"""Ensure fingerprint is properly initialized."""
if isinstance(values, dict):
# Handle case where fingerprint is not provided or is None
if 'fingerprint' not in values or values['fingerprint'] is None:
values['fingerprint'] = Fingerprint()
# Handle case where fingerprint is a string (seed)
elif isinstance(values['fingerprint'], str):
if not values['fingerprint'].strip():
raise ValueError("Fingerprint seed cannot be empty")
values['fingerprint'] = Fingerprint.generate(seed=values['fingerprint'])
return values
def to_dict(self) -> Dict[str, Any]:
"""
Convert the security config to a dictionary.
Returns:
Dict[str, Any]: Dictionary representation of the security config
"""
result = {
"fingerprint": self.fingerprint.to_dict()
}
return result
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'SecurityConfig':
"""
Create a SecurityConfig from a dictionary.
Args:
data (Dict[str, Any]): Dictionary representation of a security config
Returns:
SecurityConfig: A new SecurityConfig instance
"""
# Make a copy to avoid modifying the original
data_copy = data.copy()
fingerprint_data = data_copy.pop("fingerprint", None)
fingerprint = Fingerprint.from_dict(fingerprint_data) if fingerprint_data else Fingerprint()
return cls(fingerprint=fingerprint)

View File

@@ -19,6 +19,8 @@ from typing import (
Tuple,
Type,
Union,
get_args,
get_origin,
)
from pydantic import (
@@ -32,6 +34,7 @@ from pydantic import (
from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.security import Fingerprint, SecurityConfig
from crewai.tasks.guardrail_result import GuardrailResult
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
@@ -64,6 +67,7 @@ class Task(BaseModel):
output_file: File path for storing task output.
output_json: Pydantic model for structuring JSON output.
output_pydantic: Pydantic model for task output.
security_config: Security configuration including fingerprinting.
tools: List of tools/resources limited for task execution.
"""
@@ -116,6 +120,10 @@ class Task(BaseModel):
default_factory=list,
description="Tools the agent is limited to use for this task.",
)
security_config: SecurityConfig = Field(
default_factory=SecurityConfig,
description="Security configuration for the task.",
)
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
@@ -172,15 +180,29 @@ class Task(BaseModel):
"""
if v is not None:
sig = inspect.signature(v)
if len(sig.parameters) != 1:
positional_args = [
param
for param in sig.parameters.values()
if param.default is inspect.Parameter.empty
]
if len(positional_args) != 1:
raise ValueError("Guardrail function must accept exactly one parameter")
# Check return annotation if present, but don't require it
return_annotation = sig.return_annotation
if return_annotation != inspect.Signature.empty:
return_annotation_args = get_args(return_annotation)
if not (
return_annotation == Tuple[bool, Any]
or str(return_annotation) == "Tuple[bool, Any]"
get_origin(return_annotation) is tuple
and len(return_annotation_args) == 2
and return_annotation_args[0] is bool
and (
return_annotation_args[1] is Any
or return_annotation_args[1] is str
or return_annotation_args[1] is TaskOutput
or return_annotation_args[1] == Union[str, TaskOutput]
)
):
raise ValueError(
"If return type is annotated, it must be Tuple[bool, Any]"
@@ -435,9 +457,9 @@ class Task(BaseModel):
content = (
json_output
if json_output
else pydantic_output.model_dump_json()
if pydantic_output
else result
else (
pydantic_output.model_dump_json() if pydantic_output else result
)
)
self._save_file(content)
crewai_event_bus.emit(self, TaskCompletedEvent(output=task_output))
@@ -728,3 +750,12 @@ class Task(BaseModel):
def __repr__(self):
return f"Task(description={self.description}, expected_output={self.expected_output})"
@property
def fingerprint(self) -> Fingerprint:
"""Get the fingerprint of the task.
Returns:
Fingerprint: The fingerprint of the task
"""
return self.security_config.fingerprint

View File

@@ -14,7 +14,12 @@ from .agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
)
from .task_events import TaskStartedEvent, TaskCompletedEvent, TaskFailedEvent, TaskEvaluationEvent
from .task_events import (
TaskStartedEvent,
TaskCompletedEvent,
TaskFailedEvent,
TaskEvaluationEvent,
)
from .flow_events import (
FlowCreatedEvent,
FlowStartedEvent,
@@ -34,7 +39,13 @@ from .tool_usage_events import (
ToolUsageEvent,
ToolValidateInputErrorEvent,
)
from .llm_events import LLMCallCompletedEvent, LLMCallFailedEvent, LLMCallStartedEvent
from .llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMCallType,
LLMStreamChunkEvent,
)
# events
from .event_listener import EventListener

View File

@@ -5,6 +5,8 @@ from crewai.utilities.events.crewai_event_bus import CrewAIEventsBus, crewai_eve
class BaseEventListener(ABC):
verbose: bool = False
def __init__(self):
super().__init__()
self.setup_listeners(crewai_event_bus)

View File

@@ -67,15 +67,12 @@ class CrewAIEventsBus:
source: The object emitting the event
event: The event instance to emit
"""
event_type = type(event)
if event_type in self._handlers:
for handler in self._handlers[event_type]:
handler(source, event)
self._signal.send(source, event=event)
for event_type, handlers in self._handlers.items():
if isinstance(event, event_type):
for handler in handlers:
handler(source, event)
def clear_handlers(self) -> None:
"""Clear all registered event handlers - useful for testing"""
self._handlers.clear()
self._signal.send(source, event=event)
def register_handler(
self, event_type: Type[EventTypes], handler: Callable[[Any, EventTypes], None]

View File

@@ -1,3 +1,4 @@
from io import StringIO
from typing import Any, Dict
from pydantic import Field, PrivateAttr
@@ -11,7 +12,9 @@ from crewai.utilities.events.llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMStreamChunkEvent,
)
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
from .agent_events import AgentExecutionCompletedEvent, AgentExecutionStartedEvent
from .crew_events import (
@@ -46,6 +49,8 @@ class EventListener(BaseEventListener):
_telemetry: Telemetry = PrivateAttr(default_factory=lambda: Telemetry())
logger = Logger(verbose=True, default_color=EMITTER_COLOR)
execution_spans: Dict[Task, Any] = Field(default_factory=dict)
next_chunk = 0
text_stream = StringIO()
def __new__(cls):
if cls._instance is None:
@@ -60,34 +65,222 @@ class EventListener(BaseEventListener):
self._telemetry.set_tracer()
self.execution_spans = {}
self._initialized = True
self.formatter = ConsoleFormatter()
# ----------- CREW EVENTS -----------
def setup_listeners(self, crewai_event_bus):
@crewai_event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event: CrewKickoffStartedEvent):
self.logger.log(
f"🚀 Crew '{event.crew_name}' started, {source.id}",
event.timestamp,
)
self.formatter.create_crew_tree(event.crew_name or "Crew", source.id)
self._telemetry.crew_execution_span(source, event.inputs)
@crewai_event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event: CrewKickoffCompletedEvent):
# Handle telemetry
final_string_output = event.output.raw
self._telemetry.end_crew(source, final_string_output)
self.logger.log(
f"✅ Crew '{event.crew_name}' completed, {source.id}",
event.timestamp,
self.formatter.update_crew_tree(
self.formatter.current_crew_tree,
event.crew_name or "Crew",
source.id,
"completed",
)
@crewai_event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event: CrewKickoffFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed, {source.id}",
event.timestamp,
self.formatter.update_crew_tree(
self.formatter.current_crew_tree,
event.crew_name or "Crew",
source.id,
"failed",
)
@crewai_event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent):
self.formatter.handle_crew_train_started(
event.crew_name or "Crew", str(event.timestamp)
)
@crewai_event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent):
self.formatter.handle_crew_train_completed(
event.crew_name or "Crew", str(event.timestamp)
)
@crewai_event_bus.on(CrewTrainFailedEvent)
def on_crew_train_failed(source, event: CrewTrainFailedEvent):
self.formatter.handle_crew_train_failed(event.crew_name or "Crew")
# ----------- TASK EVENTS -----------
@crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent):
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span
self.formatter.create_task_branch(
self.formatter.current_crew_tree, source.id
)
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
# Handle telemetry
span = self.execution_spans.get(source)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
self.formatter.update_task_status(
self.formatter.current_crew_tree,
source.id,
source.agent.role,
"completed",
)
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
span = self.execution_spans.get(source)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
self.formatter.update_task_status(
self.formatter.current_crew_tree,
source.id,
source.agent.role,
"failed",
)
# ----------- AGENT EVENTS -----------
@crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
self.formatter.create_agent_branch(
self.formatter.current_task_branch,
event.agent.role,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
self.formatter.update_agent_status(
self.formatter.current_agent_branch,
event.agent.role,
self.formatter.current_crew_tree,
)
# ----------- FLOW EVENTS -----------
@crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(event.flow_name)
self.formatter.create_flow_tree(event.flow_name, str(source.flow_id))
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys())
)
self.formatter.start_flow(event.flow_name, str(source.flow_id))
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
self.formatter.update_flow_status(
self.formatter.current_flow_tree, event.flow_name, source.flow_id
)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent):
self.formatter.update_method_status(
self.formatter.current_method_branch,
self.formatter.current_flow_tree,
event.method_name,
"running",
)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
self.formatter.update_method_status(
self.formatter.current_method_branch,
self.formatter.current_flow_tree,
event.method_name,
"completed",
)
@crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent):
self.formatter.update_method_status(
self.formatter.current_method_branch,
self.formatter.current_flow_tree,
event.method_name,
"failed",
)
# ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.formatter.handle_tool_usage_started(
self.formatter.current_agent_branch,
event.tool_name,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
self.formatter.handle_tool_usage_finished(
self.formatter.current_tool_branch,
event.tool_name,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.formatter.handle_tool_usage_error(
self.formatter.current_tool_branch,
event.tool_name,
event.error,
self.formatter.current_crew_tree,
)
# ----------- LLM EVENTS -----------
@crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
self.formatter.handle_llm_call_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.formatter.handle_llm_call_completed(
self.formatter.current_tool_branch,
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event: LLMCallFailedEvent):
self.formatter.handle_llm_call_failed(
self.formatter.current_tool_branch,
event.error,
self.formatter.current_crew_tree,
)
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_llm_stream_chunk(source, event: LLMStreamChunkEvent):
self.text_stream.write(event.chunk)
self.text_stream.seek(self.next_chunk)
# Read from the in-memory stream
content = self.text_stream.read()
print(content, end="", flush=True)
self.next_chunk = self.text_stream.tell()
@crewai_event_bus.on(CrewTestStartedEvent)
def on_crew_test_started(source, event: CrewTestStartedEvent):
cloned_crew = source.copy()
@@ -97,192 +290,21 @@ class EventListener(BaseEventListener):
event.inputs,
event.eval_llm or "",
)
self.logger.log(
f"🚀 Crew '{event.crew_name}' started test, {source.id}",
event.timestamp,
self.formatter.handle_crew_test_started(
event.crew_name or "Crew", source.id, event.n_iterations
)
@crewai_event_bus.on(CrewTestCompletedEvent)
def on_crew_test_completed(source, event: CrewTestCompletedEvent):
self.logger.log(
f"✅ Crew '{event.crew_name}' completed test",
event.timestamp,
self.formatter.handle_crew_test_completed(
self.formatter.current_flow_tree,
event.crew_name or "Crew",
)
@crewai_event_bus.on(CrewTestFailedEvent)
def on_crew_test_failed(source, event: CrewTestFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed test",
event.timestamp,
)
@crewai_event_bus.on(CrewTrainStartedEvent)
def on_crew_train_started(source, event: CrewTrainStartedEvent):
self.logger.log(
f"📋 Crew '{event.crew_name}' started train",
event.timestamp,
)
@crewai_event_bus.on(CrewTrainCompletedEvent)
def on_crew_train_completed(source, event: CrewTrainCompletedEvent):
self.logger.log(
f"✅ Crew '{event.crew_name}' completed train",
event.timestamp,
)
@crewai_event_bus.on(CrewTrainFailedEvent)
def on_crew_train_failed(source, event: CrewTrainFailedEvent):
self.logger.log(
f"❌ Crew '{event.crew_name}' failed train",
event.timestamp,
)
# ----------- TASK EVENTS -----------
@crewai_event_bus.on(TaskStartedEvent)
def on_task_started(source, event: TaskStartedEvent):
span = self._telemetry.task_started(crew=source.agent.crew, task=source)
self.execution_spans[source] = span
self.logger.log(
f"📋 Task started: {source.description}",
event.timestamp,
)
@crewai_event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event: TaskCompletedEvent):
span = self.execution_spans.get(source)
if span:
self._telemetry.task_ended(span, source, source.agent.crew)
self.logger.log(
f"✅ Task completed: {source.description}",
event.timestamp,
)
self.execution_spans[source] = None
@crewai_event_bus.on(TaskFailedEvent)
def on_task_failed(source, event: TaskFailedEvent):
span = self.execution_spans.get(source)
if span:
if source.agent and source.agent.crew:
self._telemetry.task_ended(span, source, source.agent.crew)
self.execution_spans[source] = None
self.logger.log(
f"❌ Task failed: {source.description}",
event.timestamp,
)
# ----------- AGENT EVENTS -----------
@crewai_event_bus.on(AgentExecutionStartedEvent)
def on_agent_execution_started(source, event: AgentExecutionStartedEvent):
self.logger.log(
f"🤖 Agent '{event.agent.role}' started task",
event.timestamp,
)
@crewai_event_bus.on(AgentExecutionCompletedEvent)
def on_agent_execution_completed(source, event: AgentExecutionCompletedEvent):
self.logger.log(
f"✅ Agent '{event.agent.role}' completed task",
event.timestamp,
)
# ----------- FLOW EVENTS -----------
@crewai_event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event: FlowCreatedEvent):
self._telemetry.flow_creation_span(event.flow_name)
self.logger.log(
f"🌊 Flow Created: '{event.flow_name}'",
event.timestamp,
)
@crewai_event_bus.on(FlowStartedEvent)
def on_flow_started(source, event: FlowStartedEvent):
self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys())
)
self.logger.log(
f"🤖 Flow Started: '{event.flow_name}', {source.flow_id}",
event.timestamp,
)
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event: FlowFinishedEvent):
self.logger.log(
f"👍 Flow Finished: '{event.flow_name}', {source.flow_id}",
event.timestamp,
)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(source, event: MethodExecutionStartedEvent):
self.logger.log(
f"🤖 Flow Method Started: '{event.method_name}'",
event.timestamp,
)
@crewai_event_bus.on(MethodExecutionFailedEvent)
def on_method_execution_failed(source, event: MethodExecutionFailedEvent):
self.logger.log(
f"❌ Flow Method Failed: '{event.method_name}'",
event.timestamp,
)
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(source, event: MethodExecutionFinishedEvent):
self.logger.log(
f"👍 Flow Method Finished: '{event.method_name}'",
event.timestamp,
)
# ----------- TOOL USAGE EVENTS -----------
@crewai_event_bus.on(ToolUsageStartedEvent)
def on_tool_usage_started(source, event: ToolUsageStartedEvent):
self.logger.log(
f"🤖 Tool Usage Started: '{event.tool_name}'",
event.timestamp,
)
@crewai_event_bus.on(ToolUsageFinishedEvent)
def on_tool_usage_finished(source, event: ToolUsageFinishedEvent):
self.logger.log(
f"✅ Tool Usage Finished: '{event.tool_name}'",
event.timestamp,
#
)
@crewai_event_bus.on(ToolUsageErrorEvent)
def on_tool_usage_error(source, event: ToolUsageErrorEvent):
self.logger.log(
f"❌ Tool Usage Error: '{event.tool_name}'",
event.timestamp,
#
)
# ----------- LLM EVENTS -----------
@crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
self.logger.log(
f"🤖 LLM Call Started",
event.timestamp,
)
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):
self.logger.log(
f"✅ LLM Call Completed",
event.timestamp,
)
@crewai_event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event: LLMCallFailedEvent):
self.logger.log(
f"❌ LLM Call Failed: '{event.error}'",
event.timestamp,
)
self.formatter.handle_crew_test_failed(event.crew_name or "Crew")
event_listener = EventListener()

View File

@@ -23,6 +23,12 @@ from .flow_events import (
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from .llm_events import (
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMCallStartedEvent,
LLMStreamChunkEvent,
)
from .task_events import (
TaskCompletedEvent,
TaskFailedEvent,
@@ -58,4 +64,8 @@ EventTypes = Union[
ToolUsageFinishedEvent,
ToolUsageErrorEvent,
ToolUsageStartedEvent,
LLMCallStartedEvent,
LLMCallCompletedEvent,
LLMCallFailedEvent,
LLMStreamChunkEvent,
]

View File

@@ -1,6 +1,6 @@
from typing import Any, Dict, Optional, Union
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict
from .base_events import CrewEvent
@@ -52,9 +52,11 @@ class MethodExecutionFailedEvent(FlowEvent):
flow_name: str
method_name: str
error: Any
error: Exception
type: str = "method_execution_failed"
model_config = ConfigDict(arbitrary_types_allowed=True)
class FlowFinishedEvent(FlowEvent):
"""Event emitted when a flow completes execution"""

View File

@@ -34,3 +34,10 @@ class LLMCallFailedEvent(CrewEvent):
error: str
type: str = "llm_call_failed"
class LLMStreamChunkEvent(CrewEvent):
"""Event emitted when a streaming chunk is received"""
type: str = "llm_stream_chunk"
chunk: str

View File

@@ -0,0 +1,658 @@
from typing import Dict, Optional
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.tree import Tree
class ConsoleFormatter:
current_crew_tree: Optional[Tree] = None
current_task_branch: Optional[Tree] = None
current_agent_branch: Optional[Tree] = None
current_tool_branch: Optional[Tree] = None
current_flow_tree: Optional[Tree] = None
current_method_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
def __init__(self, verbose: bool = False):
self.console = Console(width=None)
self.verbose = verbose
def create_panel(self, content: Text, title: str, style: str = "blue") -> Panel:
"""Create a standardized panel with consistent styling."""
return Panel(
content,
title=title,
border_style=style,
padding=(1, 2),
)
def create_status_content(
self, title: str, name: str, status_style: str = "blue", **fields
) -> Text:
"""Create standardized status content with consistent formatting."""
content = Text()
content.append(f"{title}\n", style=f"{status_style} bold")
content.append("Name: ", style="white")
content.append(f"{name}\n", style=status_style)
for label, value in fields.items():
content.append(f"{label}: ", style="white")
content.append(
f"{value}\n", style=fields.get(f"{label}_style", status_style)
)
return content
def update_tree_label(
self,
tree: Tree,
prefix: str,
name: str,
style: str = "blue",
status: Optional[str] = None,
) -> None:
"""Update tree label with consistent formatting."""
label = Text()
label.append(f"{prefix} ", style=f"{style} bold")
label.append(name, style=style)
if status:
label.append("\n Status: ", style="white")
label.append(status, style=f"{style} bold")
tree.label = label
def add_tree_node(self, parent: Tree, text: str, style: str = "yellow") -> Tree:
"""Add a node to the tree with consistent styling."""
return parent.add(Text(text, style=style))
def print(self, *args, **kwargs) -> None:
"""Print to console with consistent formatting if verbose is enabled."""
self.console.print(*args, **kwargs)
def print_panel(
self, content: Text, title: str, style: str = "blue", is_flow: bool = False
) -> None:
"""Print a panel with consistent formatting if verbose is enabled."""
panel = self.create_panel(content, title, style)
if is_flow:
self.print(panel)
self.print()
else:
if self.verbose:
self.print(panel)
self.print()
def update_crew_tree(
self,
tree: Optional[Tree],
crew_name: str,
source_id: str,
status: str = "completed",
) -> None:
"""Handle crew tree updates with consistent formatting."""
if not self.verbose or tree is None:
return
if status == "completed":
prefix, style = "✅ Crew:", "green"
title = "Crew Completion"
content_title = "Crew Execution Completed"
elif status == "failed":
prefix, style = "❌ Crew:", "red"
title = "Crew Failure"
content_title = "Crew Execution Failed"
else:
prefix, style = "🚀 Crew:", "cyan"
title = "Crew Execution"
content_title = "Crew Execution Started"
self.update_tree_label(
tree,
prefix,
crew_name or "Crew",
style,
)
content = self.create_status_content(
content_title,
crew_name or "Crew",
style,
ID=source_id,
)
self.print_panel(content, title, style)
def create_crew_tree(self, crew_name: str, source_id: str) -> Optional[Tree]:
"""Create and initialize a new crew tree with initial status."""
if not self.verbose:
return None
tree = Tree(
Text("🚀 Crew: ", style="cyan bold") + Text(crew_name, style="cyan")
)
content = self.create_status_content(
"Crew Execution Started",
crew_name,
"cyan",
ID=source_id,
)
self.print_panel(content, "Crew Execution Started", "cyan")
# Set the current_crew_tree attribute directly
self.current_crew_tree = tree
return tree
def create_task_branch(
self, crew_tree: Optional[Tree], task_id: str
) -> Optional[Tree]:
"""Create and initialize a task branch."""
if not self.verbose:
return None
task_content = Text()
task_content.append(f"📋 Task: {task_id}", style="yellow bold")
task_content.append("\n Status: ", style="white")
task_content.append("Executing Task...", style="yellow dim")
task_branch = None
if crew_tree:
task_branch = crew_tree.add(task_content)
self.print(crew_tree)
else:
self.print_panel(task_content, "Task Started", "yellow")
self.print()
# Set the current_task_branch attribute directly
self.current_task_branch = task_branch
return task_branch
def update_task_status(
self,
crew_tree: Optional[Tree],
task_id: str,
agent_role: str,
status: str = "completed",
) -> None:
"""Update task status in the tree."""
if not self.verbose or crew_tree is None:
return
if status == "completed":
style = "green"
status_text = "✅ Completed"
panel_title = "Task Completion"
else:
style = "red"
status_text = "❌ Failed"
panel_title = "Task Failure"
# Update tree label
for branch in crew_tree.children:
if str(task_id) in str(branch.label):
task_content = Text()
task_content.append(f"📋 Task: {task_id}", style=f"{style} bold")
task_content.append("\n Assigned to: ", style="white")
task_content.append(agent_role, style=style)
task_content.append("\n Status: ", style="white")
task_content.append(status_text, style=f"{style} bold")
branch.label = task_content
self.print(crew_tree)
break
# Show status panel
content = self.create_status_content(
f"Task {status.title()}", str(task_id), style, Agent=agent_role
)
self.print_panel(content, panel_title, style)
def create_agent_branch(
self, task_branch: Optional[Tree], agent_role: str, crew_tree: Optional[Tree]
) -> Optional[Tree]:
"""Create and initialize an agent branch."""
if not self.verbose or not task_branch or not crew_tree:
return None
agent_branch = task_branch.add("")
self.update_tree_label(
agent_branch, "🤖 Agent:", agent_role, "green", "In Progress"
)
self.print(crew_tree)
self.print()
# Set the current_agent_branch attribute directly
self.current_agent_branch = agent_branch
return agent_branch
def update_agent_status(
self,
agent_branch: Optional[Tree],
agent_role: str,
crew_tree: Optional[Tree],
status: str = "completed",
) -> None:
"""Update agent status in the tree."""
if not self.verbose or agent_branch is None or crew_tree is None:
return
self.update_tree_label(
agent_branch,
"🤖 Agent:",
agent_role,
"green",
"✅ Completed" if status == "completed" else "❌ Failed",
)
self.print(crew_tree)
self.print()
def create_flow_tree(self, flow_name: str, flow_id: str) -> Optional[Tree]:
"""Create and initialize a flow tree."""
content = self.create_status_content(
"Starting Flow Execution", flow_name, "blue", ID=flow_id
)
self.print_panel(content, "Flow Execution", "blue", is_flow=True)
# Create initial tree with flow ID
flow_label = Text()
flow_label.append("🌊 Flow: ", style="blue bold")
flow_label.append(flow_name, style="blue")
flow_label.append("\n ID: ", style="white")
flow_label.append(flow_id, style="blue")
flow_tree = Tree(flow_label)
self.add_tree_node(flow_tree, "✨ Created", "blue")
self.add_tree_node(flow_tree, "✅ Initialization Complete", "green")
return flow_tree
def start_flow(self, flow_name: str, flow_id: str) -> Optional[Tree]:
"""Initialize a flow execution tree."""
flow_tree = Tree("")
flow_label = Text()
flow_label.append("🌊 Flow: ", style="blue bold")
flow_label.append(flow_name, style="blue")
flow_label.append("\n ID: ", style="white")
flow_label.append(flow_id, style="blue")
flow_tree.label = flow_label
self.add_tree_node(flow_tree, "🧠 Starting Flow...", "yellow")
self.print(flow_tree)
self.print()
self.current_flow_tree = flow_tree
return flow_tree
def update_flow_status(
self,
flow_tree: Optional[Tree],
flow_name: str,
flow_id: str,
status: str = "completed",
) -> None:
"""Update flow status in the tree."""
if flow_tree is None:
return
# Update main flow label
self.update_tree_label(
flow_tree,
"✅ Flow Finished:" if status == "completed" else "❌ Flow Failed:",
flow_name,
"green" if status == "completed" else "red",
)
# Update initialization node status
for child in flow_tree.children:
if "Starting Flow" in str(child.label):
child.label = Text(
(
"✅ Flow Completed"
if status == "completed"
else "❌ Flow Failed"
),
style="green" if status == "completed" else "red",
)
break
content = self.create_status_content(
(
"Flow Execution Completed"
if status == "completed"
else "Flow Execution Failed"
),
flow_name,
"green" if status == "completed" else "red",
ID=flow_id,
)
self.print(flow_tree)
self.print_panel(
content, "Flow Completion", "green" if status == "completed" else "red"
)
def update_method_status(
self,
method_branch: Optional[Tree],
flow_tree: Optional[Tree],
method_name: str,
status: str = "running",
) -> Optional[Tree]:
"""Update method status in the flow tree."""
if not flow_tree:
return None
if status == "running":
prefix, style = "🔄 Running:", "yellow"
elif status == "completed":
prefix, style = "✅ Completed:", "green"
# Update initialization node when a method completes successfully
for child in flow_tree.children:
if "Starting Flow" in str(child.label):
child.label = Text("Flow Method Step", style="white")
break
else:
prefix, style = "❌ Failed:", "red"
# Update initialization node on failure
for child in flow_tree.children:
if "Starting Flow" in str(child.label):
child.label = Text("❌ Flow Step Failed", style="red")
break
if not method_branch:
# Find or create method branch
for branch in flow_tree.children:
if method_name in str(branch.label):
method_branch = branch
break
if not method_branch:
method_branch = flow_tree.add("")
method_branch.label = Text(prefix, style=f"{style} bold") + Text(
f" {method_name}", style=style
)
self.print(flow_tree)
self.print()
return method_branch
def handle_tool_usage_started(
self,
agent_branch: Optional[Tree],
tool_name: str,
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle tool usage started event."""
if not self.verbose or agent_branch is None or crew_tree is None:
return None
# Update tool usage count
self.tool_usage_counts[tool_name] = self.tool_usage_counts.get(tool_name, 0) + 1
# Find existing tool node or create new one
tool_branch = None
for child in agent_branch.children:
if tool_name in str(child.label):
tool_branch = child
break
if not tool_branch:
tool_branch = agent_branch.add("")
# Update label with current count
self.update_tree_label(
tool_branch,
"🔧",
f"Using {tool_name} ({self.tool_usage_counts[tool_name]})",
"yellow",
)
self.print(crew_tree)
self.print()
# Set the current_tool_branch attribute directly
self.current_tool_branch = tool_branch
return tool_branch
def handle_tool_usage_finished(
self,
tool_branch: Optional[Tree],
tool_name: str,
crew_tree: Optional[Tree],
) -> None:
"""Handle tool usage finished event."""
if not self.verbose or tool_branch is None or crew_tree is None:
return
self.update_tree_label(
tool_branch,
"🔧",
f"Used {tool_name} ({self.tool_usage_counts[tool_name]})",
"green",
)
self.print(crew_tree)
self.print()
def handle_tool_usage_error(
self,
tool_branch: Optional[Tree],
tool_name: str,
error: str,
crew_tree: Optional[Tree],
) -> None:
"""Handle tool usage error event."""
if not self.verbose:
return
if tool_branch:
self.update_tree_label(
tool_branch,
"🔧 Failed",
f"{tool_name} ({self.tool_usage_counts[tool_name]})",
"red",
)
self.print(crew_tree)
self.print()
# Show error panel
error_content = self.create_status_content(
"Tool Usage Failed", tool_name, "red", Error=error
)
self.print_panel(error_content, "Tool Error", "red")
def handle_llm_call_started(
self,
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
) -> Optional[Tree]:
"""Handle LLM call started event."""
if not self.verbose or agent_branch is None or crew_tree is None:
return None
# Only add thinking status if it doesn't exist
if not any("Thinking" in str(child.label) for child in agent_branch.children):
tool_branch = agent_branch.add("")
self.update_tree_label(tool_branch, "🧠", "Thinking...", "blue")
self.print(crew_tree)
self.print()
# Set the current_tool_branch attribute directly
self.current_tool_branch = tool_branch
return tool_branch
return None
def handle_llm_call_completed(
self,
tool_branch: Optional[Tree],
agent_branch: Optional[Tree],
crew_tree: Optional[Tree],
) -> None:
"""Handle LLM call completed event."""
if (
not self.verbose
or tool_branch is None
or agent_branch is None
or crew_tree is None
):
return
# Remove the thinking status node when complete
if "Thinking" in str(tool_branch.label):
agent_branch.children.remove(tool_branch)
self.print(crew_tree)
self.print()
def handle_llm_call_failed(
self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree]
) -> None:
"""Handle LLM call failed event."""
if not self.verbose:
return
# Update tool branch if it exists
if tool_branch:
tool_branch.label = Text("❌ LLM Failed", style="red bold")
self.print(crew_tree)
self.print()
# Show error panel
error_content = Text()
error_content.append("❌ LLM Call Failed\n", style="red bold")
error_content.append("Error: ", style="white")
error_content.append(str(error), style="red")
self.print_panel(error_content, "LLM Error", "red")
def handle_crew_test_started(
self, crew_name: str, source_id: str, n_iterations: int
) -> Optional[Tree]:
"""Handle crew test started event."""
if not self.verbose:
return None
# Create initial panel
content = Text()
content.append("🧪 Starting Crew Test\n\n", style="blue bold")
content.append("Crew: ", style="white")
content.append(f"{crew_name}\n", style="blue")
content.append("ID: ", style="white")
content.append(str(source_id), style="blue")
content.append("\nIterations: ", style="white")
content.append(str(n_iterations), style="yellow")
self.print()
self.print_panel(content, "Test Execution", "blue")
self.print()
# Create and display the test tree
test_label = Text()
test_label.append("🧪 Test: ", style="blue bold")
test_label.append(crew_name or "Crew", style="blue")
test_label.append("\n Status: ", style="white")
test_label.append("In Progress", style="yellow")
test_tree = Tree(test_label)
self.add_tree_node(test_tree, "🔄 Running tests...", "yellow")
self.print(test_tree)
self.print()
return test_tree
def handle_crew_test_completed(
self, flow_tree: Optional[Tree], crew_name: str
) -> None:
"""Handle crew test completed event."""
if not self.verbose:
return
if flow_tree:
# Update test tree label to show completion
test_label = Text()
test_label.append("✅ Test: ", style="green bold")
test_label.append(crew_name or "Crew", style="green")
test_label.append("\n Status: ", style="white")
test_label.append("Completed", style="green bold")
flow_tree.label = test_label
# Update the running tests node
for child in flow_tree.children:
if "Running tests" in str(child.label):
child.label = Text("✅ Tests completed successfully", style="green")
self.print(flow_tree)
self.print()
# Create completion panel
completion_content = Text()
completion_content.append("Test Execution Completed\n", style="green bold")
completion_content.append("Crew: ", style="white")
completion_content.append(f"{crew_name}\n", style="green")
completion_content.append("Status: ", style="white")
completion_content.append("Completed", style="green")
self.print_panel(completion_content, "Test Completion", "green")
def handle_crew_train_started(self, crew_name: str, timestamp: str) -> None:
"""Handle crew train started event."""
if not self.verbose:
return
content = Text()
content.append("📋 Crew Training Started\n", style="blue bold")
content.append("Crew: ", style="white")
content.append(f"{crew_name}\n", style="blue")
content.append("Time: ", style="white")
content.append(timestamp, style="blue")
self.print_panel(content, "Training Started", "blue")
self.print()
def handle_crew_train_completed(self, crew_name: str, timestamp: str) -> None:
"""Handle crew train completed event."""
if not self.verbose:
return
content = Text()
content.append("✅ Crew Training Completed\n", style="green bold")
content.append("Crew: ", style="white")
content.append(f"{crew_name}\n", style="green")
content.append("Time: ", style="white")
content.append(timestamp, style="green")
self.print_panel(content, "Training Completed", "green")
self.print()
def handle_crew_train_failed(self, crew_name: str) -> None:
"""Handle crew train failed event."""
if not self.verbose:
return
failure_content = Text()
failure_content.append("❌ Crew Training Failed\n", style="red bold")
failure_content.append("Crew: ", style="white")
failure_content.append(crew_name or "Crew", style="red")
self.print_panel(failure_content, "Training Failure", "red")
self.print()
def handle_crew_test_failed(self, crew_name: str) -> None:
"""Handle crew test failed event."""
if not self.verbose:
return
failure_content = Text()
failure_content.append("❌ Crew Test Failed\n", style="red bold")
failure_content.append("Crew: ", style="white")
failure_content.append(crew_name or "Crew", style="red")
self.print_panel(failure_content, "Test Failure", "red")
self.print()

View File

@@ -43,8 +43,8 @@ def create_llm(
try:
# Extract attributes with explicit types
model = (
getattr(llm_value, "model_name", None)
or getattr(llm_value, "model", None)
getattr(llm_value, "model", None)
or getattr(llm_value, "model_name", None)
or getattr(llm_value, "deployment_name", None)
or str(llm_value)
)
@@ -77,8 +77,9 @@ def _llm_via_environment_or_fallback() -> Optional[LLM]:
Helper function: if llm_value is None, we load environment variables or fallback default model.
"""
model_name = (
os.environ.get("OPENAI_MODEL_NAME")
or os.environ.get("MODEL")
os.environ.get("MODEL")
or os.environ.get("MODEL_NAME")
or os.environ.get("OPENAI_MODEL_NAME")
or DEFAULT_LLM_MODEL
)