Compare commits

...

14 Commits

Author SHA1 Message Date
Brandon Hancock
0aae59dc1d fix more type errors 2025-01-03 16:42:15 -05:00
Brandon Hancock
b97c4cf2c6 fix llm_utils.py and other type errors 2025-01-03 16:25:59 -05:00
Brandon Hancock
4a794622c7 Fix linting 2025-01-03 15:25:35 -05:00
Brandon Hancock
104e8bc167 Merge branch 'main' into brandon/eng-266-conversation-crew-v1 2025-01-03 15:24:19 -05:00
Brandon Hancock
9e5d4972b9 everything is working for conversation now 2025-01-03 15:18:33 -05:00
Gui Vieira
30bd79390a [ENG-227] Record task execution timestamps (#1844) 2025-01-03 13:12:13 -05:00
Brandon Hancock (bhancock_ai)
3ba15e8bc9 Merge branch 'main' into brandon/eng-266-conversation-crew-v1 2025-01-03 10:41:16 -05:00
Brandon Hancock
0284095ff8 accessing crew directly instead of through uv commands 2024-12-30 14:43:18 -05:00
Brandon Hancock
bcd838a2ff properly return tool call result 2024-12-30 13:32:53 -05:00
Brandon Hancock
5da6d36dd9 Added in Joaos feedback to steer crew chats back towards the purpose of the crew 2024-12-30 11:19:35 -05:00
Brandon Hancock
0e7aa192c0 its alive!! 2024-12-27 13:57:09 -05:00
Brandon Hancock
2f882d68ad high level chat working 2024-12-27 11:21:40 -05:00
Brandon Hancock
2bf5b15f1e core loop should be working and ready for testing. 2024-12-26 14:18:42 -05:00
Brandon Hancock
1c45f730c6 worked on foundation for new conversational crews. Now going to work on chatting. 2024-12-24 14:10:00 -05:00
18 changed files with 1443 additions and 172 deletions

View File

@@ -21,6 +21,7 @@ from crewai.tools.base_tool import Tool
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.converter import generate_model_description
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -139,89 +140,9 @@ class Agent(BaseAgent):
def post_init_setup(self):
self._set_knowledge()
self.agent_ops_agent_name = self.role
unaccepted_attributes = [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_REGION_NAME",
]
# Handle different cases for self.llm
if isinstance(self.llm, str):
# If it's a string, create an LLM instance
self.llm = LLM(model=self.llm)
elif isinstance(self.llm, LLM):
# If it's already an LLM instance, keep it as is
pass
elif self.llm is None:
# Determine the model name from environment variables or use default
model_name = (
os.environ.get("OPENAI_MODEL_NAME")
or os.environ.get("MODEL")
or "gpt-4o-mini"
)
llm_params = {"model": model_name}
api_base = os.environ.get("OPENAI_API_BASE") or os.environ.get(
"OPENAI_BASE_URL"
)
if api_base:
llm_params["base_url"] = api_base
set_provider = model_name.split("/")[0] if "/" in model_name else "openai"
# Iterate over all environment variables to find matching API keys or use defaults
for provider, env_vars in ENV_VARS.items():
if provider == set_provider:
for env_var in env_vars:
# Check if the environment variable is set
key_name = env_var.get("key_name")
if key_name and key_name not in unaccepted_attributes:
env_value = os.environ.get(key_name)
if env_value:
key_name = key_name.lower()
for pattern in LITELLM_PARAMS:
if pattern in key_name:
key_name = pattern
break
llm_params[key_name] = env_value
# Check for default values if the environment variable is not set
elif env_var.get("default", False):
for key, value in env_var.items():
if key not in ["prompt", "key_name", "default"]:
# Only add default if the key is already set in os.environ
if key in os.environ:
llm_params[key] = value
self.llm = LLM(**llm_params)
else:
# For any other type, attempt to extract relevant attributes
llm_params = {
"model": getattr(self.llm, "model_name", None)
or getattr(self.llm, "deployment_name", None)
or str(self.llm),
"temperature": getattr(self.llm, "temperature", None),
"max_tokens": getattr(self.llm, "max_tokens", None),
"logprobs": getattr(self.llm, "logprobs", None),
"timeout": getattr(self.llm, "timeout", None),
"max_retries": getattr(self.llm, "max_retries", None),
"api_key": getattr(self.llm, "api_key", None),
"base_url": getattr(self.llm, "base_url", None),
"organization": getattr(self.llm, "organization", None),
}
# Remove None values to avoid passing unnecessary parameters
llm_params = {k: v for k, v in llm_params.items() if v is not None}
self.llm = LLM(**llm_params)
# Similar handling for function_calling_llm
if self.function_calling_llm:
if isinstance(self.function_calling_llm, str):
self.function_calling_llm = LLM(model=self.function_calling_llm)
elif not isinstance(self.function_calling_llm, LLM):
self.function_calling_llm = LLM(
model=getattr(self.function_calling_llm, "model_name", None)
or getattr(self.function_calling_llm, "deployment_name", None)
or str(self.function_calling_llm)
)
self.llm = create_llm(self.llm)
self.function_calling_llm = create_llm(self.function_calling_llm)
if not self.agent_executor:
self._setup_agent_executor()
@@ -272,6 +193,8 @@ class Agent(BaseAgent):
task_prompt = task.prompt()
print("task_prompt:", task_prompt)
# If the task requires output in JSON or Pydantic format,
# append specific instructions to the task prompt to ensure
# that the final answer does not include any code block markers

View File

@@ -1,11 +1,14 @@
import os
from importlib.metadata import version as get_version
from typing import Optional
from typing import Optional, Tuple
import click
from crewai.cli.add_crew_to_flow import add_crew_to_flow
from crewai.cli.create_crew import create_crew
from crewai.cli.create_flow import create_flow
from crewai.cli.crew_chat import run_chat
from crewai.cli.fetch_chat_llm import fetch_chat_llm
from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
)
@@ -342,5 +345,15 @@ def flow_add_crew(crew_name):
add_crew_to_flow(crew_name)
@crewai.command()
def chat():
"""
Start a conversation with the Crew, collecting user-supplied inputs,
and using the Chat LLM to generate responses.
"""
click.echo("Starting a conversation with the Crew")
run_chat()
if __name__ == "__main__":
crewai()

View File

@@ -158,6 +158,8 @@ MODELS = {
],
}
DEFAULT_LLM_MODEL = "gpt-4o-mini"
JSON_URL = "https://raw.githubusercontent.com/BerriAI/litellm/main/model_prices_and_context_window.json"

386 src/crewai/cli/crew_chat.py (new file)
View File

@@ -0,0 +1,386 @@
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Set, Tuple
import click
import tomli
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.cli.fetch_chat_llm import fetch_chat_llm
from crewai.cli.fetch_crew_inputs import fetch_crew_inputs
from crewai.crew import Crew
from crewai.task import Task
from crewai.types.crew_chat import ChatInputField, ChatInputs
from crewai.utilities.llm_utils import create_llm
def run_chat():
"""
Runs an interactive chat loop using the Crew's chat LLM with function calling.
Incorporates crew_name, crew_description, and input fields to build a tool schema.
Exits if crew_name or crew_description are missing.
"""
crew, crew_name = load_crew_and_name()
click.secho("\nFetching the Chat LLM...", fg="cyan")
try:
chat_llm = create_llm(crew.chat_llm)
except Exception as e:
click.secho(f"Failed to retrieve Chat LLM: {e}", fg="red")
return
if not chat_llm:
click.secho("No valid Chat LLM returned. Exiting.", fg="red")
return
# Generate crew chat inputs automatically
crew_chat_inputs = generate_crew_chat_inputs(crew, crew_name, chat_llm)
print("crew_inputs:", crew_chat_inputs)
# Generate a tool schema from the crew inputs
crew_tool_schema = generate_crew_tool_schema(crew_chat_inputs)
print("crew_tool_schema:", crew_tool_schema)
# Build initial system message
required_fields_str = (
", ".join(
f"{field.name} (desc: {field.description or 'n/a'})"
for field in crew_chat_inputs.inputs
)
or "(No required fields detected)"
)
system_message = (
"You are a helpful AI assistant for the CrewAI platform. "
"Your primary purpose is to assist users with the crew's specific tasks. "
"You can answer general questions, but should guide users back to the crew's purpose afterward. "
"For example, after answering a general question, remind the user of your main purpose, such as generating a research report, and prompt them to specify a topic or task related to the crew's purpose. "
"You have a function (tool) you can call by name if you have all required inputs. "
f"Those required inputs are: {required_fields_str}. "
"Once you have them, call the function. "
"Please keep your responses concise and friendly. "
"If a user asks a question outside the crew's scope, provide a brief answer and remind them of the crew's purpose. "
"After calling the tool, be prepared to take user feedback and make adjustments as needed. "
"If you are ever unsure about a user's request or need clarification, ask the user for more information."
f"\nCrew Name: {crew_chat_inputs.crew_name}"
f"\nCrew Description: {crew_chat_inputs.crew_description}"
)
messages = [
{"role": "system", "content": system_message},
]
# Create a wrapper function that captures 'crew' and 'messages' from the enclosing scope
def run_crew_tool_with_messages(**kwargs):
return run_crew_tool(crew, messages, **kwargs)
# Prepare available_functions with the wrapper function
available_functions = {
crew_chat_inputs.crew_name: run_crew_tool_with_messages,
}
click.secho(
"\nEntering an interactive chat loop with function-calling.\n"
"Type 'exit' or Ctrl+C to quit.\n",
fg="cyan",
)
# Main chat loop
while True:
try:
user_input = click.prompt("You", type=str)
if user_input.strip().lower() in ["exit", "quit"]:
click.echo("Exiting chat. Goodbye!")
break
# Append user message
messages.append({"role": "user", "content": user_input})
# Invoke the LLM, passing tools and available_functions
final_response = chat_llm.call(
messages=messages,
tools=[crew_tool_schema],
available_functions=available_functions,
)
# Append assistant's reply
messages.append({"role": "assistant", "content": final_response})
# Display assistant's reply
click.secho(f"\nAssistant: {final_response}\n", fg="green")
except KeyboardInterrupt:
click.echo("\nExiting chat. Goodbye!")
break
except Exception as e:
click.secho(f"An error occurred: {e}", fg="red")
break
def generate_crew_tool_schema(crew_inputs: ChatInputs) -> dict:
"""
Dynamically build a LiteLLM 'function' schema for the given crew.
crew_inputs: A ChatInputs object containing the crew_name (used as the function 'name'),
the crew_description, and a list of input fields
(each with a name and description).
"""
properties = {}
for field in crew_inputs.inputs:
properties[field.name] = {
"type": "string",
"description": field.description or "No description provided",
}
required_fields = [field.name for field in crew_inputs.inputs]
return {
"type": "function",
"function": {
"name": crew_inputs.crew_name,
"description": crew_inputs.crew_description or "No crew description",
"parameters": {
"type": "object",
"properties": properties,
"required": required_fields,
},
},
}
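For illustration, this is the kind of schema the function above would produce for a hypothetical crew named "research_crew" with a single "topic" input (the names and descriptions are invented for the example):
example_schema = {
    "type": "function",
    "function": {
        "name": "research_crew",  # crew_inputs.crew_name
        "description": "Researches a topic and writes a short report",
        "parameters": {
            "type": "object",
            "properties": {
                "topic": {
                    "type": "string",
                    "description": "The topic to research",
                },
            },
            "required": ["topic"],
        },
    },
}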
def run_crew_tool(crew: Crew, messages: List[Dict[str, str]], **kwargs):
"""
Runs the crew using crew.kickoff(inputs=kwargs) and returns the output.
Args:
crew (Crew): The crew instance to run.
messages (List[Dict[str, str]]): The chat messages up to this point.
**kwargs: The inputs collected from the user.
Returns:
str: The output from the crew's execution.
Raises:
SystemExit: Exits the chat if an error occurs during crew execution.
"""
try:
# Serialize 'messages' to JSON string before adding to kwargs
kwargs['crew_chat_messages'] = json.dumps(messages)
# Run the crew with the provided inputs
crew_output = crew.kickoff(inputs=kwargs)
# Convert CrewOutput to a string to send back to the user
result = str(crew_output)
return result
except Exception as e:
# Exit the chat and show the error message
click.secho("An error occurred while running the crew:", fg="red")
click.secho(str(e), fg="red")
sys.exit(1)
def load_crew_and_name() -> Tuple[Crew, str]:
"""
Loads the crew by importing the crew class from the user's project.
Returns:
Tuple[Crew, str]: A tuple containing the Crew instance and the name of the crew.
"""
# Get the current working directory
cwd = Path.cwd()
# Path to the pyproject.toml file
pyproject_path = cwd / "pyproject.toml"
if not pyproject_path.exists():
raise FileNotFoundError("pyproject.toml not found in the current directory.")
# Load the pyproject.toml file using 'tomli'
with pyproject_path.open("rb") as f:
pyproject_data = tomli.load(f)
# Get the project name from the 'project' section
project_name = pyproject_data["project"]["name"]
folder_name = project_name
# Derive the crew class name from the project name
# E.g., if project_name is 'my_project', crew_class_name is 'MyProject'
crew_class_name = project_name.replace("_", " ").title().replace(" ", "")
# Add the 'src' directory to sys.path
src_path = cwd / "src"
if str(src_path) not in sys.path:
sys.path.insert(0, str(src_path))
# Import the crew module
crew_module_name = f"{folder_name}.crew"
try:
crew_module = __import__(crew_module_name, fromlist=[crew_class_name])
except ImportError as e:
raise ImportError(f"Failed to import crew module {crew_module_name}: {e}")
# Get the crew class from the module
try:
crew_class = getattr(crew_module, crew_class_name)
except AttributeError:
raise AttributeError(
f"Crew class {crew_class_name} not found in module {crew_module_name}"
)
# Instantiate the crew
crew_instance = crew_class().crew()
return crew_instance, crew_class_name
def generate_crew_chat_inputs(crew: Crew, crew_name: str, chat_llm) -> ChatInputs:
"""
Generates the ChatInputs required for the crew by analyzing the tasks and agents.
Args:
crew (Crew): The crew object containing tasks and agents.
crew_name (str): The name of the crew.
chat_llm: The chat language model to use for AI calls.
Returns:
ChatInputs: An object containing the crew's name, description, and input fields.
"""
# Extract placeholders from tasks and agents
required_inputs = fetch_required_inputs(crew)
# Generate descriptions for each input using AI
input_fields = []
for input_name in required_inputs:
description = generate_input_description_with_ai(input_name, crew, chat_llm)
input_fields.append(ChatInputField(name=input_name, description=description))
# Generate crew description using AI
crew_description = generate_crew_description_with_ai(crew, chat_llm)
return ChatInputs(
crew_name=crew_name,
crew_description=crew_description,
inputs=input_fields
)
def fetch_required_inputs(crew: Crew) -> Set[str]:
"""
Extracts placeholders from the crew's tasks and agents.
Args:
crew (Crew): The crew object.
Returns:
Set[str]: A set of placeholder names.
"""
placeholder_pattern = re.compile(r"\{(.+?)\}")
required_inputs: Set[str] = set()
# Scan tasks
for task in crew.tasks:
text = f"{task.description or ''} {task.expected_output or ''}"
required_inputs.update(placeholder_pattern.findall(text))
# Scan agents
for agent in crew.agents:
text = f"{agent.role or ''} {agent.goal or ''} {agent.backstory or ''}"
required_inputs.update(placeholder_pattern.findall(text))
return required_inputs
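A quick sketch of the placeholder extraction used above, with an invented task string:
import re
placeholder_pattern = re.compile(r"\{(.+?)\}")
sample_text = "Research {topic} and summarize it for {username}"
print(placeholder_pattern.findall(sample_text))  # ['topic', 'username']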
def generate_input_description_with_ai(input_name: str, crew: Crew, chat_llm) -> str:
"""
Generates an input description using AI based on the context of the crew.
Args:
input_name (str): The name of the input placeholder.
crew (Crew): The crew object.
chat_llm: The chat language model to use for AI calls.
Returns:
str: A concise description of the input.
"""
# Gather context from tasks and agents where the input is used
context_texts = []
placeholder_pattern = re.compile(r"\{(.+?)\}")
for task in crew.tasks:
if f"{{{input_name}}}" in task.description or f"{{{input_name}}}" in task.expected_output:
# Replace placeholders with input names
task_description = placeholder_pattern.sub(lambda m: m.group(1), task.description)
expected_output = placeholder_pattern.sub(lambda m: m.group(1), task.expected_output)
context_texts.append(f"Task Description: {task_description}")
context_texts.append(f"Expected Output: {expected_output}")
for agent in crew.agents:
if f"{{{input_name}}}" in agent.role or f"{{{input_name}}}" in agent.goal or f"{{{input_name}}}" in agent.backstory:
# Replace placeholders with input names
agent_role = placeholder_pattern.sub(lambda m: m.group(1), agent.role)
agent_goal = placeholder_pattern.sub(lambda m: m.group(1), agent.goal)
agent_backstory = placeholder_pattern.sub(lambda m: m.group(1), agent.backstory)
context_texts.append(f"Agent Role: {agent_role}")
context_texts.append(f"Agent Goal: {agent_goal}")
context_texts.append(f"Agent Backstory: {agent_backstory}")
context = "\n".join(context_texts)
if not context:
# If no context is found for the input, raise an exception
raise ValueError(f"No context found for input '{input_name}'.")
prompt = (
f"Based on the following context, write a concise description (15 words or less) of the input '{input_name}'.\n"
"Provide only the description, without any extra text or labels. Do not include placeholders like '{topic}' in the description.\n"
"Context:\n"
f"{context}"
)
response = chat_llm.call(messages=[{"role": "user", "content": prompt}])
description = response.strip()
return description
def generate_crew_description_with_ai(crew: Crew, chat_llm) -> str:
"""
Generates a brief description of the crew using AI.
Args:
crew (Crew): The crew object.
chat_llm: The chat language model to use for AI calls.
Returns:
str: A concise description of the crew's purpose (15 words or less).
"""
# Gather context from tasks and agents
context_texts = []
placeholder_pattern = re.compile(r"\{(.+?)\}")
for task in crew.tasks:
# Replace placeholders with input names
task_description = placeholder_pattern.sub(lambda m: m.group(1), task.description)
expected_output = placeholder_pattern.sub(lambda m: m.group(1), task.expected_output)
context_texts.append(f"Task Description: {task_description}")
context_texts.append(f"Expected Output: {expected_output}")
for agent in crew.agents:
# Replace placeholders with input names
agent_role = placeholder_pattern.sub(lambda m: m.group(1), agent.role)
agent_goal = placeholder_pattern.sub(lambda m: m.group(1), agent.goal)
agent_backstory = placeholder_pattern.sub(lambda m: m.group(1), agent.backstory)
context_texts.append(f"Agent Role: {agent_role}")
context_texts.append(f"Agent Goal: {agent_goal}")
context_texts.append(f"Agent Backstory: {agent_backstory}")
context = "\n".join(context_texts)
if not context:
raise ValueError("No context found for generating crew description.")
prompt = (
"Based on the following context, write a concise, action-oriented description (15 words or less) of the crew's purpose.\n"
"Provide only the description, without any extra text or labels. Do not include placeholders like '{topic}' in the description.\n"
"Context:\n"
f"{context}"
)
response = chat_llm.call(messages=[{"role": "user", "content": prompt}])
crew_description = response.strip()
return crew_description

View File

@@ -0,0 +1,81 @@
import json
import subprocess
import click
from packaging import version
from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
from crewai.llm import LLM
def fetch_chat_llm() -> LLM:
"""
Fetch the chat LLM by running "uv run fetch_chat_llm" (or your chosen script name),
parsing its JSON stdout, and returning an LLM instance.
This expects the script "fetch_chat_llm" to print out JSON that represents the
LLM parameters (e.g., by calling something like: print(json.dumps(llm.to_dict()))).
Any error, whether from the subprocess or JSON parsing, will raise a RuntimeError.
"""
# You may change this command to match whatever's in your pyproject.toml [project.scripts].
command = ["uv", "run", "fetch_chat_llm"]
crewai_version = get_crewai_version()
min_required_version = "0.87.0" # Adjust as needed
pyproject_data = read_toml()
# If old poetry-based setup is detected and version is below min_required_version
if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version)
):
click.secho(
f"You are running an older version of crewAI ({crewai_version}) that uses poetry pyproject.toml.\n"
f"Please run `crewai update` to transition your pyproject.toml to use uv.",
fg="red",
)
# Initialize a reference to your LLM
llm_instance = None
try:
result = subprocess.run(command, capture_output=True, text=True, check=True)
stdout_lines = result.stdout.strip().splitlines()
# Find the line that contains the JSON data
json_line = next(
(
line
for line in stdout_lines
if line.startswith("{") and line.endswith("}")
),
None,
)
if not json_line:
raise RuntimeError(
"No valid JSON output received from `fetch_chat_llm` command."
)
try:
llm_data = json.loads(json_line)
llm_instance = LLM.from_dict(llm_data)
except json.JSONDecodeError as e:
raise RuntimeError(
f"Unable to parse JSON from `fetch_chat_llm` output: {e}\nOutput: {repr(json_line)}"
) from e
except subprocess.CalledProcessError as e:
raise RuntimeError(f"An error occurred while fetching chat LLM: {e}") from e
except Exception as e:
raise RuntimeError(
f"An unexpected error occurred while fetching chat LLM: {e}"
) from e
if not llm_instance:
raise RuntimeError("Failed to create a valid LLM from `fetch_chat_llm` output.")
return llm_instance
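A hedged sketch of what this helper expects on stdout, assuming the project script prints LLM.to_dict() as a single JSON line (the values are illustrative):
import json
example_stdout_line = '{"model": "gpt-4o-mini", "temperature": 0.7, "callbacks": []}'
llm_data = json.loads(example_stdout_line)
# LLM.from_dict(llm_data) would then rebuild the instance, as in the code above.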

View File

@@ -0,0 +1,86 @@
import json
import subprocess
from typing import Optional
import click
from packaging import version
from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
from crewai.types.crew_chat import ChatInputs
def fetch_crew_inputs() -> Optional[ChatInputs]:
"""
Fetch the crew's ChatInputs (a structure containing crew_description and input fields)
by running "uv run fetch_chat_inputs", which prints JSON representing a ChatInputs object.
This function will parse that JSON and return a ChatInputs instance.
If the output is empty or invalid, None is returned.
"""
command = ["uv", "run", "fetch_chat_inputs"]
crewai_version = get_crewai_version()
min_required_version = "0.87.0"
pyproject_data = read_toml()
crew_name = pyproject_data.get("project", {}).get("name", None)
# If you're on an older poetry-based setup and version < min_required_version
if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version)
):
click.secho(
f"You are running an older version of crewAI ({crewai_version}) that uses poetry pyproject.toml.\n"
f"Please run `crewai update` to update your pyproject.toml to use uv.",
fg="red",
)
try:
result = subprocess.run(command, capture_output=True, text=True, check=True)
stdout_lines = result.stdout.strip().splitlines()
# Find the line that contains the JSON data
json_line = next(
(
line
for line in stdout_lines
if line.startswith("{") and line.endswith("}")
),
None,
)
if not json_line:
click.echo(
"No valid JSON output received from `fetch_chat_inputs` command.",
err=True,
)
return None
try:
raw_data = json.loads(json_line)
chat_inputs = ChatInputs(**raw_data)
if crew_name:
chat_inputs.crew_name = crew_name
return chat_inputs
except json.JSONDecodeError as e:
click.echo(
f"Unable to parse JSON from `fetch_chat_inputs` output: {e}\nOutput: {repr(json_line)}",
err=True,
)
return None
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while fetching chat inputs: {e}", err=True)
click.echo(e.output, err=True, nl=True)
if pyproject_data.get("tool", {}).get("poetry"):
click.secho(
"It's possible that you are using an old version of crewAI that uses poetry.\n"
"Please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
return None

View File

@@ -1,8 +1,10 @@
#!/usr/bin/env python
import sys
import json
import warnings
from {{folder_name}}.crew import {{crew_name}}
from crewai.utilities.llm_utils import create_llm
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
@@ -13,12 +15,30 @@ warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
def run():
"""
Run the crew.
Run the crew, allowing CLI overrides for required inputs.
Usage example:
uv run run_crew -- --topic="New Topic" --some_other_field="Value"
"""
# Default inputs
inputs = {
'topic': 'AI LLMs'
# Add any other default fields here
}
{{crew_name}}().crew().kickoff(inputs=inputs)
# 1) Gather overrides from sys.argv
# sys.argv might look like: ['run_crew', '--topic=NewTopic']
# But be aware that if you're calling "uv run run_crew", sys.argv might have
# additional items. So we typically skip the first 1 or 2 items to get only overrides.
overrides = parse_cli_overrides(sys.argv[1:])
# 2) Merge the overrides into defaults
inputs.update(overrides)
# 3) Kick off the crew with final inputs
try:
{{crew_name}}().crew().kickoff(inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while running the crew: {e}")
def train():
@@ -55,4 +75,94 @@ def test():
{{crew_name}}().crew().test(n_iterations=int(sys.argv[1]), openai_model_name=sys.argv[2], inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")
raise Exception(f"An error occurred while testing the crew: {e}")
def fetch_inputs():
"""
Command that gathers required placeholders/inputs from the Crew, then
prints them as JSON to stdout so external scripts can parse them easily.
"""
try:
crew = {{crew_name}}().crew()
crew_inputs = crew.fetch_inputs()
json_string = json.dumps(list(crew_inputs))
print(json_string)
except Exception as e:
raise Exception(f"An error occurred while fetching inputs: {e}")
def fetch_chat_llm():
"""
Command that fetches the 'chat_llm' property from the Crew,
instantiates it via create_llm(),
and prints the resulting LLM as JSON (using LLM.to_dict()) to stdout.
"""
try:
crew = {{crew_name}}().crew()
raw_chat_llm = getattr(crew, "chat_llm", None)
if not raw_chat_llm:
# If the crew doesn't have chat_llm, fall back to create_llm(None)
final_llm = create_llm(None)
else:
# raw_chat_llm might be a dict, or an LLM, or something else
final_llm = create_llm(raw_chat_llm)
if final_llm:
# Print the final LLM as JSON, so fetch_chat_llm.py can parse it
from crewai.llm import LLM # Import here to avoid circular references
# Make sure it's an instance of the LLM class:
if isinstance(final_llm, LLM):
print(json.dumps(final_llm.to_dict()))
else:
# If somehow it's not an LLM, try to interpret as a dict
# or revert to an empty fallback
if isinstance(final_llm, dict):
print(json.dumps(final_llm))
else:
print(json.dumps({}))
else:
print(json.dumps({}))
except Exception as e:
raise Exception(f"An error occurred while fetching chat LLM: {e}")
# TODO: Talk to Joao about using LLM calls to analyze the crew
# and generate all of this information automatically
def fetch_chat_inputs():
"""
Command that fetches the 'chat_inputs' property from the Crew,
and prints it as JSON to stdout.
"""
try:
crew = {{crew_name}}().crew()
raw_chat_inputs = getattr(crew, "chat_inputs", None)
if raw_chat_inputs:
# Convert to dictionary to print JSON
print(json.dumps(raw_chat_inputs.model_dump()))
else:
# If crew.chat_inputs is None or empty, print an empty JSON
print(json.dumps({}))
except Exception as e:
raise Exception(f"An error occurred while fetching chat inputs: {e}")
def parse_cli_overrides(args_list) -> dict:
"""
Parse arguments in the form of --key=value from a list of CLI arguments.
Return them as a dict. For example:
['--topic=AI LLMs', '--username=John'] => {'topic': 'AI LLMs', 'username': 'John'}
"""
overrides = {}
for arg in args_list:
if arg.startswith("--"):
# remove the leading --
trimmed = arg[2:]
if "=" in trimmed:
key, val = trimmed.split("=", 1)
overrides[key] = val
else:
# If someone passed something like --topic (no =),
# either handle differently or ignore
pass
return overrides
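A minimal sketch of how these overrides merge into the defaults in run(), using parse_cli_overrides from above and invented values:
inputs = {"topic": "AI LLMs"}  # defaults
overrides = parse_cli_overrides(["--topic=Quantum Computing", "--username=John"])
inputs.update(overrides)
print(inputs)  # {'topic': 'Quantum Computing', 'username': 'John'}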

View File

@@ -1,10 +1,11 @@
import asyncio
import json
import re
import uuid
import warnings
from concurrent.futures import Future
from hashlib import md5
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from pydantic import (
UUID4,
@@ -36,6 +37,7 @@ from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.base_tool import Tool
from crewai.types.crew_chat import ChatInputs
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -203,6 +205,14 @@ class Crew(BaseModel):
default=None,
description="Knowledge sources for the crew. Add knowledge sources to the knowledge object.",
)
chat_llm: Optional[Any] = Field(
default=None,
description="LLM used to handle chatting with the crew.",
)
chat_inputs: Optional[ChatInputs] = Field(
default=None,
description="Holds descriptions of the crew as well as named inputs for chat usage.",
)
_knowledge: Optional[Knowledge] = PrivateAttr(
default=None,
)
@@ -991,6 +1001,31 @@ class Crew(BaseModel):
return self._knowledge.query(query)
return None
def fetch_inputs(self) -> Set[str]:
"""
Gathers placeholders (e.g., {something}) referenced in tasks or agents.
Scans each task's 'description' + 'expected_output', and each agent's
'role', 'goal', and 'backstory'.
Returns a set of all discovered placeholder names.
"""
placeholder_pattern = re.compile(r"\{(.+?)\}")
required_inputs: Set[str] = set()
# Scan tasks for inputs
for task in self.tasks:
# description and expected_output might contain e.g. {topic}, {user_name}, etc.
text = f"{task.description or ''} {task.expected_output or ''}"
required_inputs.update(placeholder_pattern.findall(text))
# Scan agents for inputs
for agent in self.agents:
# role, goal, backstory might have placeholders like {role_detail}, etc.
text = f"{agent.role or ''} {agent.goal or ''} {agent.backstory or ''}"
required_inputs.update(placeholder_pattern.findall(text))
return required_inputs
def copy(self):
"""Create a deep copy of the Crew."""
@@ -1046,7 +1081,7 @@ class Crew(BaseModel):
def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolates the inputs in the tasks and agents."""
[
task.interpolate_inputs(
task.interpolate_inputs_and_add_conversation_history(
# type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None)
inputs
)

View File

@@ -1,20 +1,27 @@
import json
import logging
import os
import sys
import threading
import warnings
from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union, cast
from dotenv import load_dotenv
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
import litellm
from litellm import get_supported_openai_params
from litellm import Choices, get_supported_openai_params
from litellm.types.utils import ModelResponse
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
load_dotenv()
class FilteredStream:
def __init__(self, original_stream):
@@ -23,6 +30,7 @@ class FilteredStream:
def write(self, s) -> int:
with self._lock:
# Filter out extraneous messages from LiteLLM
if (
"Give Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new"
in s
@@ -84,11 +92,9 @@ def suppress_warnings():
old_stderr = sys.stderr
sys.stdout = FilteredStream(old_stdout)
sys.stderr = FilteredStream(old_stderr)
try:
yield
finally:
# Restore stdout and stderr
sys.stdout = old_stdout
sys.stderr = old_stderr
@@ -109,13 +115,12 @@ class LLM:
logit_bias: Optional[Dict[int, float]] = None,
response_format: Optional[Dict[str, Any]] = None,
seed: Optional[int] = None,
logprobs: Optional[bool] = None,
logprobs: Optional[int] = None,
top_logprobs: Optional[int] = None,
base_url: Optional[str] = None,
api_version: Optional[str] = None,
api_key: Optional[str] = None,
callbacks: List[Any] = [],
**kwargs,
):
self.model = model
self.timeout = timeout
@@ -137,19 +142,96 @@ class LLM:
self.api_key = api_key
self.callbacks = callbacks
self.context_window_size = 0
self.kwargs = kwargs
# For safety, we disable passing init params to next calls
litellm.drop_params = True
self.set_callbacks(callbacks)
self.set_env_callbacks()
def call(self, messages: List[Dict[str, str]], callbacks: List[Any] = []) -> str:
def to_dict(self) -> dict:
"""
Return a dict of all relevant parameters for serialization.
"""
return {
"model": self.model,
"timeout": self.timeout,
"temperature": self.temperature,
"top_p": self.top_p,
"n": self.n,
"stop": self.stop,
"max_completion_tokens": self.max_completion_tokens,
"max_tokens": self.max_tokens,
"presence_penalty": self.presence_penalty,
"frequency_penalty": self.frequency_penalty,
"logit_bias": self.logit_bias,
"response_format": self.response_format,
"seed": self.seed,
"logprobs": self.logprobs,
"top_logprobs": self.top_logprobs,
"base_url": self.base_url,
"api_version": self.api_version,
"api_key": self.api_key,
"callbacks": self.callbacks,
}
@classmethod
def from_dict(cls, data: dict) -> "LLM":
"""
Create an LLM instance from a dict.
We assume the dict has all relevant keys that match what's in the constructor.
"""
known_fields = {}
known_fields["model"] = data.pop("model", None)
known_fields["timeout"] = data.pop("timeout", None)
known_fields["temperature"] = data.pop("temperature", None)
known_fields["top_p"] = data.pop("top_p", None)
known_fields["n"] = data.pop("n", None)
known_fields["stop"] = data.pop("stop", None)
known_fields["max_completion_tokens"] = data.pop("max_completion_tokens", None)
known_fields["max_tokens"] = data.pop("max_tokens", None)
known_fields["presence_penalty"] = data.pop("presence_penalty", None)
known_fields["frequency_penalty"] = data.pop("frequency_penalty", None)
known_fields["logit_bias"] = data.pop("logit_bias", None)
known_fields["response_format"] = data.pop("response_format", None)
known_fields["seed"] = data.pop("seed", None)
known_fields["logprobs"] = data.pop("logprobs", None)
known_fields["top_logprobs"] = data.pop("top_logprobs", None)
known_fields["base_url"] = data.pop("base_url", None)
known_fields["api_version"] = data.pop("api_version", None)
known_fields["api_key"] = data.pop("api_key", None)
known_fields["callbacks"] = data.pop("callbacks", None)
return cls(**known_fields, **data)
def call(
self,
messages: List[Dict[str, str]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> str:
"""
High-level call method that:
1) Calls litellm.completion
2) Checks for function/tool calls
3) If a tool call is found:
a) executes the function
b) returns the result
4) If no tool call, returns the text response
:param messages: The conversation messages
:param tools: Optional list of function schemas for function calling
:param callbacks: Optional list of callbacks
:param available_functions: A dictionary mapping function_name -> actual Python function
:return: Final text response from the LLM or the tool result
"""
with suppress_warnings():
if callbacks and len(callbacks) > 0:
if callbacks:
self.set_callbacks(callbacks)
try:
# --- 1) Make the completion call
params = {
"model": self.model,
"messages": messages,
@@ -170,21 +252,65 @@ class LLM:
"api_version": self.api_version,
"api_key": self.api_key,
"stream": False,
**self.kwargs,
"tools": tools, # pass the tool schema
}
# Remove None values to avoid passing unnecessary parameters
# Remove None values
params = {k: v for k, v in params.items() if v is not None}
response = litellm.completion(**params)
return response["choices"][0]["message"]["content"]
response_message = cast(Choices, cast(ModelResponse, response).choices)[
0
].message
text_response = response_message.content or ""
tool_calls = getattr(response_message, "tool_calls", [])
# --- 2) If no tool calls, return the text response
if not tool_calls or not available_functions:
return text_response
# --- 3) Handle the tool call
tool_call = tool_calls[0]
function_name = tool_call.function.name
if function_name in available_functions:
# Parse arguments
try:
function_args = json.loads(tool_call.function.arguments)
except json.JSONDecodeError as e:
logging.warning(f"Failed to parse function arguments: {e}")
return text_response # Fallback to text response
fn = available_functions[function_name]
try:
# Call the actual tool function
result = fn(**function_args)
print(f"Result from function '{function_name}': {result}")
# Return the result directly
return result
except Exception as e:
logging.error(
f"Error executing function '{function_name}': {e}"
)
return text_response # Fallback to text response
else:
logging.warning(
f"Tool call requested unknown function '{function_name}'"
)
return text_response # Fallback to text response
except Exception as e:
# Check if context length was exceeded, otherwise log
if not LLMContextLengthExceededException(
str(e)
)._is_context_limit_error(str(e)):
logging.error(f"LiteLLM call failed: {str(e)}")
raise # Re-raise the exception after logging
# Re-raise the exception
raise
def supports_function_calling(self) -> bool:
try:
@@ -203,7 +329,10 @@ class LLM:
return False
def get_context_window_size(self) -> int:
# Only using 75% of the context window size to avoid cutting the message in the middle
"""
Returns the context window size, using 75% of the maximum to avoid
cutting off messages mid-thread.
"""
if self.context_window_size != 0:
return self.context_window_size
@@ -216,6 +345,10 @@ class LLM:
return self.context_window_size
def set_callbacks(self, callbacks: List[Any]):
"""
Attempt to keep a single set of callbacks in litellm by removing old
duplicates and adding new ones.
"""
callback_types = [type(callback) for callback in callbacks]
for callback in litellm.success_callback[:]:
if type(callback) in callback_types:
@@ -230,34 +363,19 @@ class LLM:
def set_env_callbacks(self):
"""
Sets the success and failure callbacks for the LiteLLM library from environment variables.
This method reads the `LITELLM_SUCCESS_CALLBACKS` and `LITELLM_FAILURE_CALLBACKS`
environment variables, which should contain comma-separated lists of callback names.
It then assigns these lists to `litellm.success_callback` and `litellm.failure_callback`,
respectively.
If the environment variables are not set or are empty, the corresponding callback lists
will be set to empty lists.
Example:
LITELLM_SUCCESS_CALLBACKS="langfuse,langsmith"
LITELLM_FAILURE_CALLBACKS="langfuse"
This will set `litellm.success_callback` to ["langfuse", "langsmith"] and
`litellm.failure_callback` to ["langfuse"].
"""
success_callbacks_str = os.environ.get("LITELLM_SUCCESS_CALLBACKS", "")
success_callbacks = []
if success_callbacks_str:
success_callbacks = [
callback.strip() for callback in success_callbacks_str.split(",")
cb.strip() for cb in success_callbacks_str.split(",") if cb.strip()
]
failure_callbacks_str = os.environ.get("LITELLM_FAILURE_CALLBACKS", "")
failure_callbacks = []
if failure_callbacks_str:
failure_callbacks = [
callback.strip() for callback in failure_callbacks_str.split(",")
cb.strip() for cb in failure_callbacks_str.split(",") if cb.strip()
]
litellm.success_callback = success_callbacks
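A hedged usage sketch of the new serialization and function-calling paths in LLM, assuming an OpenAI-style model that supports tools (the tool name and function are made up for the example):
from crewai.llm import LLM

llm = LLM(model="gpt-4o-mini", temperature=0.7)
restored = LLM.from_dict(llm.to_dict())  # round-trip through plain dicts

def get_time(timezone: str) -> str:  # hypothetical tool implementation
    return f"12:00 in {timezone}"

tool_schema = {
    "type": "function",
    "function": {
        "name": "get_time",
        "description": "Returns the current time in a timezone",
        "parameters": {
            "type": "object",
            "properties": {"timezone": {"type": "string"}},
            "required": ["timezone"],
        },
    },
}

reply = restored.call(
    messages=[{"role": "user", "content": "What time is it in UTC?"}],
    tools=[tool_schema],
    available_functions={"get_time": get_time},
)
print(reply)  # either the model's text or the tool result, per the logic above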

View File

@@ -127,38 +127,41 @@ class Task(BaseModel):
processed_by_agents: Set[str] = Field(default_factory=set)
guardrail: Optional[Callable[[TaskOutput], Tuple[bool, Any]]] = Field(
default=None,
description="Function to validate task output before proceeding to next task"
description="Function to validate task output before proceeding to next task",
)
max_retries: int = Field(
default=3,
description="Maximum number of retries when guardrail fails"
default=3, description="Maximum number of retries when guardrail fails"
)
retry_count: int = Field(
default=0,
description="Current number of retries"
retry_count: int = Field(default=0, description="Current number of retries")
start_time: Optional[datetime.datetime] = Field(
default=None, description="Start time of the task execution"
)
end_time: Optional[datetime.datetime] = Field(
default=None, description="End time of the task execution"
)
@field_validator("guardrail")
@classmethod
def validate_guardrail_function(cls, v: Optional[Callable]) -> Optional[Callable]:
"""Validate that the guardrail function has the correct signature and behavior.
While type hints provide static checking, this validator ensures runtime safety by:
1. Verifying the function accepts exactly one parameter (the TaskOutput)
2. Checking return type annotations match Tuple[bool, Any] if present
3. Providing clear, immediate error messages for debugging
This runtime validation is crucial because:
- Type hints are optional and can be ignored at runtime
- Function signatures need immediate validation before task execution
- Clear error messages help users debug guardrail implementation issues
Args:
v: The guardrail function to validate
Returns:
The validated guardrail function
Raises:
ValueError: If the function signature is invalid or return annotation
doesn't match Tuple[bool, Any]
@@ -171,8 +174,13 @@ class Task(BaseModel):
# Check return annotation if present, but don't require it
return_annotation = sig.return_annotation
if return_annotation != inspect.Signature.empty:
if not (return_annotation == Tuple[bool, Any] or str(return_annotation) == 'Tuple[bool, Any]'):
raise ValueError("If return type is annotated, it must be Tuple[bool, Any]")
if not (
return_annotation == Tuple[bool, Any]
or str(return_annotation) == "Tuple[bool, Any]"
):
raise ValueError(
"If return type is annotated, it must be Tuple[bool, Any]"
)
return v
_telemetry: Telemetry = PrivateAttr(default_factory=Telemetry)
@@ -181,7 +189,6 @@ class Task(BaseModel):
_original_expected_output: Optional[str] = PrivateAttr(default=None)
_original_output_file: Optional[str] = PrivateAttr(default=None)
_thread: Optional[threading.Thread] = PrivateAttr(default=None)
_execution_time: Optional[float] = PrivateAttr(default=None)
@model_validator(mode="before")
@classmethod
@@ -206,25 +213,19 @@ class Task(BaseModel):
"may_not_set_field", "This field is not to be set by the user.", {}
)
def _set_start_execution_time(self) -> float:
return datetime.datetime.now().timestamp()
def _set_end_execution_time(self, start_time: float) -> None:
self._execution_time = datetime.datetime.now().timestamp() - start_time
@field_validator("output_file")
@classmethod
def output_file_validation(cls, value: Optional[str]) -> Optional[str]:
"""Validate the output file path.
Args:
value: The output file path to validate. Can be None or a string.
If the path contains template variables (e.g. {var}), leading slashes are preserved.
For regular paths, leading slashes are stripped.
Returns:
The validated and potentially modified path, or None if no path was provided.
Raises:
ValueError: If the path contains invalid characters, path traversal attempts,
or other security concerns.
@@ -234,18 +235,24 @@ class Task(BaseModel):
# Basic security checks
if ".." in value:
raise ValueError("Path traversal attempts are not allowed in output_file paths")
raise ValueError(
"Path traversal attempts are not allowed in output_file paths"
)
# Check for shell expansion first
if value.startswith('~') or value.startswith('$'):
raise ValueError("Shell expansion characters are not allowed in output_file paths")
if value.startswith("~") or value.startswith("$"):
raise ValueError(
"Shell expansion characters are not allowed in output_file paths"
)
# Then check other shell special characters
if any(char in value for char in ['|', '>', '<', '&', ';']):
raise ValueError("Shell special characters are not allowed in output_file paths")
if any(char in value for char in ["|", ">", "<", "&", ";"]):
raise ValueError(
"Shell special characters are not allowed in output_file paths"
)
# Don't strip leading slash if it's a template path with variables
if "{" in value or "}" in value:
if "{" in value or "}" in value:
# Validate template variable format
template_vars = [part.split("}")[0] for part in value.split("{")[1:]]
for var in template_vars:
@@ -302,6 +309,12 @@ class Task(BaseModel):
return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@property
def execution_duration(self) -> float | None:
if not self.start_time or not self.end_time:
return None
return (self.end_time - self.start_time).total_seconds()
def execute_async(
self,
agent: BaseAgent | None = None,
@@ -342,7 +355,7 @@ class Task(BaseModel):
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
start_time = self._set_start_execution_time()
self.start_time = datetime.datetime.now()
self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self)
self.prompt_context = context
@@ -392,15 +405,17 @@ class Task(BaseModel):
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(guardrail_result.result)
pydantic_output, json_output = self._export_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
task_output.json_dict = json_output
elif isinstance(guardrail_result.result, TaskOutput):
task_output = guardrail_result.result
self.output = task_output
self.end_time = datetime.datetime.now()
self._set_end_execution_time(start_time)
if self.callback:
self.callback(self.output)
@@ -412,7 +427,9 @@ class Task(BaseModel):
content = (
json_output
if json_output
else pydantic_output.model_dump_json() if pydantic_output else result
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
self._save_file(content)
@@ -432,13 +449,15 @@ class Task(BaseModel):
tasks_slices = [self.description, output]
return "\n".join(tasks_slices)
def interpolate_inputs(self, inputs: Dict[str, Union[str, int, float]]) -> None:
def interpolate_inputs_and_add_conversation_history(self, inputs: Dict[str, Union[str, int, float]]) -> None:
"""Interpolate inputs into the task description, expected output, and output file path.
Add conversation history if present.
Args:
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, and floats.
Raises:
ValueError: If a required template variable is missing from inputs.
"""
@@ -455,7 +474,9 @@ class Task(BaseModel):
try:
self.description = self._original_description.format(**inputs)
except KeyError as e:
raise ValueError(f"Missing required template variable '{e.args[0]}' in description") from e
raise ValueError(
f"Missing required template variable '{e.args[0]}' in description"
) from e
except ValueError as e:
raise ValueError(f"Error interpolating description: {str(e)}") from e
@@ -473,21 +494,49 @@ class Task(BaseModel):
)
except (KeyError, ValueError) as e:
raise ValueError(f"Error interpolating output_file path: {str(e)}") from e
if "crew_chat_messages" in inputs and inputs["crew_chat_messages"]:
# Fetch the conversation history instruction using self.i18n.slice
conversation_instruction = self.i18n.slice(
"conversation_history_instruction"
)
print("crew_chat_messages:", inputs["crew_chat_messages"])
def interpolate_only(self, input_string: Optional[str], inputs: Dict[str, Union[str, int, float]]) -> str:
# Ensure that inputs["crew_chat_messages"] is a string
crew_chat_messages_json = str(inputs["crew_chat_messages"])
try:
crew_chat_messages = json.loads(crew_chat_messages_json)
except json.JSONDecodeError as e:
print("An error occurred while parsing crew chat messages:", e)
raise
# Process the messages to build conversation history
conversation_history = "\n".join(
f"{msg['role'].capitalize()}: {msg['content']}"
for msg in crew_chat_messages
if isinstance(msg, dict) and "role" in msg and "content" in msg
)
# Add the instruction and conversation history to the description
self.description += f"\n\n{conversation_instruction}\n\n{conversation_history}"
def interpolate_only(
self, input_string: Optional[str], inputs: Dict[str, Union[str, int, float]]
) -> str:
"""Interpolate placeholders (e.g., {key}) in a string while leaving JSON untouched.
Args:
input_string: The string containing template variables to interpolate.
Can be None or empty, in which case an empty string is returned.
inputs: Dictionary mapping template variables to their values.
Supported value types are strings, integers, and floats.
If input_string is empty or has no placeholders, inputs can be empty.
Returns:
The interpolated string with all template variables replaced with their values.
Empty string if input_string is None or empty.
Raises:
ValueError: If a required template variable is missing from inputs.
KeyError: If a template variable is not found in the inputs dictionary.
@@ -497,13 +546,17 @@ class Task(BaseModel):
if "{" not in input_string and "}" not in input_string:
return input_string
if not inputs:
raise ValueError("Inputs dictionary cannot be empty when interpolating variables")
raise ValueError(
"Inputs dictionary cannot be empty when interpolating variables"
)
try:
# Validate input types
for key, value in inputs.items():
if not isinstance(value, (str, int, float)):
raise ValueError(f"Value for key '{key}' must be a string, integer, or float, got {type(value).__name__}")
raise ValueError(
f"Value for key '{key}' must be a string, integer, or float, got {type(value).__name__}"
)
escaped_string = input_string.replace("{", "{{").replace("}", "}}")
@@ -512,7 +565,9 @@ class Task(BaseModel):
return escaped_string.format(**inputs)
except KeyError as e:
raise KeyError(f"Template variable '{e.args[0]}' not found in inputs dictionary") from e
raise KeyError(
f"Template variable '{e.args[0]}' not found in inputs dictionary"
) from e
except ValueError as e:
raise ValueError(f"Error during string interpolation: {str(e)}") from e
@@ -597,10 +652,10 @@ class Task(BaseModel):
def _save_file(self, result: Any) -> None:
"""Save task output to a file.
Args:
result: The result to save to the file. Can be a dict or any stringifiable object.
Raises:
ValueError: If output_file is not set
RuntimeError: If there is an error writing to the file
@@ -618,6 +673,7 @@ class Task(BaseModel):
with resolved_path.open("w", encoding="utf-8") as file:
if isinstance(result, dict):
import json
json.dump(result, file, ensure_ascii=False, indent=2)
else:
file.write(str(result))

View File

@@ -23,7 +23,8 @@
"summary": "This is a summary of our conversation so far:\n{merged_summary}",
"manager_request": "Your best answer to your coworker asking you this, accounting for the context shared.",
"formatted_task_instructions": "Ensure your final answer contains only the content in the following format: {output_format}\n\nEnsure the final output does not include any code block markers like ```json or ```python.",
"human_feedback_classification": "Determine if the following feedback indicates that the user is satisfied or if further changes are needed. Respond with 'True' if further changes are needed, or 'False' if the user is satisfied. **Important** Do not include any additional commentary outside of your 'True' or 'False' response.\n\nFeedback: \"{feedback}\""
"human_feedback_classification": "Determine if the following feedback indicates that the user is satisfied or if further changes are needed. Respond with 'True' if further changes are needed, or 'False' if the user is satisfied. **Important** Do not include any additional commentary outside of your 'True' or 'False' response.\n\nFeedback: \"{feedback}\"",
"conversation_history_instruction": "You are a member of a crew collaborating to achieve a common goal. Your task is a specific action that contributes to this larger objective. For additional context, please review the conversation history between you and the user that led to the initiation of this crew. Use any relevant information or feedback from the conversation to inform your task execution and ensure your response aligns with both the immediate task and the crew's overall goals."
},
"errors": {
"force_final_answer_error": "You can't keep going, this was the best you could do.\n {formatted_answer.text}",

View File

@@ -0,0 +1,44 @@
from typing import List, Optional
from pydantic import BaseModel, Field
class ChatInputField(BaseModel):
"""
Represents a single required input for the crew, with a name and short description.
Example:
{
"name": "topic",
"description": "The topic to focus on for the conversation"
}
"""
name: str = Field(..., description="The name of the input field")
description: str = Field(
...,
description="A short description of the input field",
)
class ChatInputs(BaseModel):
"""
Holds a high-level crew_description plus a list of ChatInputFields.
Example:
{
"crew_name": "topic-based-qa",
"crew_description": "Use this crew for topic-based Q&A",
"inputs": [
{"name": "topic", "description": "The topic to focus on"},
{"name": "username", "description": "Name of the user"},
]
}
"""
crew_name: str = Field(..., description="The name of the crew")
crew_description: str = Field(
...,
description="A description of the crew's purpose",
)
inputs: List[ChatInputField] = Field(
default_factory=list, description="A list of input fields for the crew"
)
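A minimal construction sketch matching the docstring examples above:
example_inputs = ChatInputs(
    crew_name="topic-based-qa",
    crew_description="Use this crew for topic-based Q&A",
    inputs=[
        ChatInputField(name="topic", description="The topic to focus on"),
        ChatInputField(name="username", description="Name of the user"),
    ],
)
print(example_inputs.model_dump())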

View File

@@ -180,12 +180,12 @@ class CrewEvaluator:
self._test_result_span = self._telemetry.individual_test_result_span(
self.crew,
evaluation_result.pydantic.quality,
current_task._execution_time,
current_task.execution_duration,
self.openai_model_name,
)
self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
self.run_execution_times[self.iteration].append(
current_task._execution_time
current_task.execution_duration
)
else:
raise ValueError("Evaluation result is not in the expected format")

View File

@@ -0,0 +1,215 @@
import os
from typing import Any, Dict, List, Optional, Union
from packaging import version
from crewai.cli.constants import DEFAULT_LLM_MODEL, ENV_VARS, LITELLM_PARAMS
from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
from crewai.llm import LLM
def create_llm(
llm_value: Union[str, LLM, Any, None] = None,
) -> Optional[LLM]:
"""
Creates or returns an LLM instance based on the given llm_value.
Args:
llm_value (str | LLM | Any | None):
- str: The model name (e.g., "gpt-4").
- LLM: Already instantiated LLM, returned as-is.
- Any: Attempt to extract known attributes like model_name, temperature, etc.
- None: Use environment-based or fallback default model.
Returns:
An LLM instance if successful, or None if something fails.
"""
# 1) If llm_value is already an LLM object, return it directly
if isinstance(llm_value, LLM):
return llm_value
# 2) If llm_value is a string (model name)
if isinstance(llm_value, str):
try:
created_llm = LLM(model=llm_value)
print(f"LLM created with model='{llm_value}'")
return created_llm
except Exception as e:
print(f"Failed to instantiate LLM with model='{llm_value}': {e}")
return None
# 3) If llm_value is None, parse environment variables or use default
if llm_value is None:
return _llm_via_environment_or_fallback()
# 4) Otherwise, attempt to extract relevant attributes from an unknown object
try:
# Extract attributes with explicit types
model = (
getattr(llm_value, "model_name", None)
or getattr(llm_value, "deployment_name", None)
or str(llm_value)
)
temperature: Optional[float] = getattr(llm_value, "temperature", None)
max_tokens: Optional[int] = getattr(llm_value, "max_tokens", None)
logprobs: Optional[int] = getattr(llm_value, "logprobs", None)
timeout: Optional[float] = getattr(llm_value, "timeout", None)
api_key: Optional[str] = getattr(llm_value, "api_key", None)
base_url: Optional[str] = getattr(llm_value, "base_url", None)
created_llm = LLM(
model=model,
temperature=temperature,
max_tokens=max_tokens,
logprobs=logprobs,
timeout=timeout,
api_key=api_key,
base_url=base_url,
)
print(
"LLM created with extracted parameters; "
f"model='{model}'"
)
return created_llm
except Exception as e:
print(f"Error instantiating LLM from unknown object type: {e}")
return None
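A quick sketch of the three common paths through create_llm (the model names are only examples):
from crewai.llm import LLM
from crewai.utilities.llm_utils import create_llm

llm_from_str = create_llm("gpt-4o-mini")           # string -> new LLM instance
llm_passthrough = create_llm(LLM(model="gpt-4o"))  # LLM instance returned as-is
llm_from_env = create_llm(None)                    # environment variables or default model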
def create_chat_llm() -> Optional[LLM]:
"""
Creates a Chat LLM with additional checks, such as verifying crewAI version
or reading from pyproject.toml. Then calls `create_llm(None)`, which falls back
to environment variables or the default model.
Returns:
An instance of LLM or None if instantiation fails.
"""
print("[create_chat_llm] Checking environment and version info...")
crewai_version = get_crewai_version()
min_required_version = "0.87.0" # Update to latest if needed
pyproject_data = read_toml()
if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version)
):
print(
f"You are running an older version of crewAI ({crewai_version}) that uses poetry.\n"
"Please run `crewai update` to switch to uv-based builds."
)
# After checks, simply call create_llm with None (meaning "use env or fallback"):
return create_llm(None)
def _llm_via_environment_or_fallback() -> Optional[LLM]:
"""
    Helper function: when no LLM value is provided, build the configuration from environment variables or fall back to the default model.
"""
model_name = (
os.environ.get("OPENAI_MODEL_NAME")
or os.environ.get("MODEL")
or DEFAULT_LLM_MODEL
)
# Initialize parameters with correct types
model: str = model_name
temperature: Optional[float] = None
max_tokens: Optional[int] = None
max_completion_tokens: Optional[int] = None
logprobs: Optional[int] = None
timeout: Optional[float] = None
api_key: Optional[str] = None
base_url: Optional[str] = None
api_version: Optional[str] = None
presence_penalty: Optional[float] = None
frequency_penalty: Optional[float] = None
top_p: Optional[float] = None
n: Optional[int] = None
stop: Optional[Union[str, List[str]]] = None
logit_bias: Optional[Dict[int, float]] = None
response_format: Optional[Dict[str, Any]] = None
seed: Optional[int] = None
top_logprobs: Optional[int] = None
callbacks: List[Any] = []
# Optional base URL from env
api_base = os.environ.get("OPENAI_API_BASE") or os.environ.get("OPENAI_BASE_URL")
if api_base:
base_url = api_base
# Initialize llm_params dictionary
llm_params: Dict[str, Any] = {
"model": model,
"temperature": temperature,
"max_tokens": max_tokens,
"max_completion_tokens": max_completion_tokens,
"logprobs": logprobs,
"timeout": timeout,
"api_key": api_key,
"base_url": base_url,
"api_version": api_version,
"presence_penalty": presence_penalty,
"frequency_penalty": frequency_penalty,
"top_p": top_p,
"n": n,
"stop": stop,
"logit_bias": logit_bias,
"response_format": response_format,
"seed": seed,
"top_logprobs": top_logprobs,
"callbacks": callbacks,
}
UNACCEPTED_ATTRIBUTES = [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_REGION_NAME",
]
set_provider = model_name.split("/")[0] if "/" in model_name else "openai"
if set_provider in ENV_VARS:
for env_var in ENV_VARS[set_provider]:
key_name = env_var.get("key_name")
if key_name and key_name not in UNACCEPTED_ATTRIBUTES:
env_value = os.environ.get(key_name)
if env_value:
# Map environment variable names to recognized parameters
param_key = _normalize_key_name(key_name.lower())
llm_params[param_key] = env_value
elif isinstance(env_var, dict):
if env_var.get("default", False):
for key, value in env_var.items():
if key not in ["prompt", "key_name", "default"]:
if key in os.environ:
llm_params[key] = os.environ[key]
else:
print(f"Expected env_var to be a dictionary, but got {type(env_var)}")
# Remove None values
llm_params = {k: v for k, v in llm_params.items() if v is not None}
# Try creating the LLM
try:
new_llm = LLM(**llm_params)
print(f"LLM created with model='{model_name}'")
return new_llm
except Exception as e:
print(f"Error instantiating LLM from environment/fallback: {type(e).__name__}: {e}")
return None
def _normalize_key_name(key_name: str) -> str:
"""
Maps environment variable names to recognized litellm parameter keys,
using patterns from LITELLM_PARAMS.
"""
for pattern in LITELLM_PARAMS:
if pattern in key_name:
return pattern
return key_name
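
A short usage sketch of the new helper (illustrative only; which model actually gets created depends on your environment variables, and the SimpleNamespace object merely exercises the attribute-extraction path):

from types import SimpleNamespace

from crewai.utilities.llm_utils import create_llm

# 1) From a plain model name string
llm_from_name = create_llm("gpt-4o-mini")

# 2) From None: resolved via OPENAI_MODEL_NAME / MODEL or DEFAULT_LLM_MODEL
llm_from_env = create_llm(None)

# 3) From an arbitrary object: known attributes such as model_name,
#    temperature, and max_tokens are extracted when present
config = SimpleNamespace(model_name="gpt-4o-mini", temperature=0.2, max_tokens=512)
llm_from_object = create_llm(config)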

View File

@@ -1,4 +1,5 @@
import warnings
from typing import Any, Dict, Optional
from litellm.integrations.custom_logger import CustomLogger
from litellm.types.utils import Usage
@@ -7,10 +8,16 @@ from crewai.agents.agent_builder.utilities.base_token_process import TokenProces
class TokenCalcHandler(CustomLogger):
def __init__(self, token_cost_process: TokenProcess):
def __init__(self, token_cost_process: Optional[TokenProcess]):
self.token_cost_process = token_cost_process
def log_success_event(self, kwargs, response_obj, start_time, end_time):
def log_success_event(
self,
kwargs: Dict[str, Any],
response_obj: Dict[str, Any],
start_time: float,
end_time: float,
) -> None:
if self.token_cost_process is None:
return
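
With this change the callback tolerates a missing token counter: the constructor accepts Optional[TokenProcess] and log_success_event simply returns when none is attached. A hedged sketch of what that permits (the arguments below are placeholders, not a realistic litellm payload):

from crewai.utilities.token_counter_callback import TokenCalcHandler

# Constructing the handler without a TokenProcess is now type-correct.
handler = TokenCalcHandler(None)

# The success callback is a no-op when no token counter is attached,
# so this call returns immediately.
handler.log_success_event({}, {}, 0.0, 0.0)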

View File

@@ -0,0 +1,146 @@
interactions:
- request:
body: '{"messages": [{"role": "system", "content": "You are Researcher. You''re
an expert researcher, specialized in technology, software engineering, AI and
startups. You work as a freelancer and is now working on doing research and
analysis for a new customer.\nYour personal goal is: Make the best research
and analysis on content about AI and AI agents\nTo give my best complete final
answer to the task use the exact following format:\n\nThought: I now can give
a great answer\nFinal Answer: Your final answer must be the great and the most
complete as possible, it must be outcome described.\n\nI MUST use these formats,
my job depends on it!"}, {"role": "user", "content": "\nCurrent Task: Give me
a list of 5 interesting ideas to explore for na article, what makes them unique
and interesting.\n\nThis is the expect criteria for your final answer: Bullet
point list of 5 interesting ideas.\nyou MUST return the actual complete content
as the final answer, not a summary.\n\nBegin! This is VERY important to you,
use the tools available and give your best Final Answer, your job depends on
it!\n\nThought:"}], "model": "gpt-4o-mini", "stop": ["\nObservation:"], "stream":
false}'
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '1177'
content-type:
- application/json
host:
- api.openai.com
user-agent:
- OpenAI/Python 1.52.1
x-stainless-arch:
- arm64
x-stainless-async:
- 'false'
x-stainless-lang:
- python
x-stainless-os:
- MacOS
x-stainless-package-version:
- 1.52.1
x-stainless-raw-response:
- 'true'
x-stainless-retry-count:
- '0'
x-stainless-runtime:
- CPython
x-stainless-runtime-version:
- 3.12.7
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
content: "{\n \"id\": \"chatcmpl-AlfwrGToOoVtDhb3ryZMpA07aZy4m\",\n \"object\":
\"chat.completion\",\n \"created\": 1735926029,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: \\n- **The Role of Emotional Intelligence in AI Agents**: Explore how
developing emotional intelligence in AI can change user interactions. Investigate
algorithms that enable AI agents to recognize and respond to human emotions,
enhancing user experience in sectors such as therapy, customer service, and
education. This idea is unique as it blends psychology with artificial intelligence,
presenting a new frontier for AI applications.\\n\\n- **AI Agents in Problem-Solving
for Climate Change**: Analyze how AI agents can contribute to developing innovative
solutions for climate change challenges. Focus on their role in predicting climate
patterns, optimizing energy consumption, and managing resources more efficiently.
This topic is unique because it highlights the practical impact of AI on one
of the most pressing global issues.\\n\\n- **The Ethics of Autonomous Decision-Making
AI**: Delve into the ethical implications surrounding AI agents that make autonomous
decisions, especially in critical areas like healthcare, transportation, and
law enforcement. This idea raises questions about accountability and bias, making
it a vital discussion point as AI continues to advance. The unique aspect lies
in the intersection of technology and moral philosophy.\\n\\n- **AI Agents Shaping
the Future of Remote Work**: Investigate how AI agents are transforming remote
work environments through automation, communication facilitation, and performance
monitoring. Discuss unique applications such as virtual assistants, project
management tools, and AI-driven team collaboration platforms. This topic is
particularly relevant as the workforce becomes increasingly remote, making it
an appealing area of exploration.\\n\\n- **Cultural Impacts of AI Agents in
Media and Entertainment**: Examine how AI-driven characters and narratives are
changing the media landscape, from video games to films and animations. Analyze
audience reception and the role of AI in personalizing content. This concept
is unique due to its intersection with digital culture and artistic expression,
offering insights into how technology influences social norms and preferences.\",\n
\ \"refusal\": null\n },\n \"logprobs\": null,\n \"finish_reason\":
\"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": 220,\n \"completion_tokens\":
376,\n \"total_tokens\": 596,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n
\ \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"system_fingerprint\":
\"fp_0aa8d3e20b\"\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 8fc4c6324d42ad5a-POA
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Fri, 03 Jan 2025 17:40:34 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=zdRUS9YIvR7oCmJGeB7BOAnmxI7FOE5Jae5yRZDCnPE-1735926034-1.0.1.1-gvIEXrMfT69wL2mv4ApivWX67OOpDegjf1LE6g9u3GEDuQdLQok.vlLZD.SdGzK0bMug86JZhBeDZMleJlI2EQ;
path=/; expires=Fri, 03-Jan-25 18:10:34 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=CW_cKQGYWY3cL.S6Xo5z0cmkmWHy5Q50OA_KjPEijNk-1735926034530-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
access-control-expose-headers:
- X-Request-ID
alt-svc:
- h3=":443"; ma=86400
openai-organization:
- crewai-iuxna1
openai-processing-ms:
- '5124'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=31536000; includeSubDomains; preload
x-ratelimit-limit-requests:
- '30000'
x-ratelimit-limit-tokens:
- '150000000'
x-ratelimit-remaining-requests:
- '29999'
x-ratelimit-remaining-tokens:
- '149999729'
x-ratelimit-reset-requests:
- 2ms
x-ratelimit-reset-tokens:
- 0s
x-request-id:
- req_95ae59da1099e02c0d95bf25ba179fed
http_version: HTTP/1.1
status_code: 200
version: 1

View File

@@ -3087,6 +3087,28 @@ def test_hierarchical_verbose_false_manager_agent():
assert not crew.manager_agent.verbose
def test_fetch_inputs():
agent = Agent(
role="{role_detail} Researcher",
goal="Research on {topic}.",
backstory="Expert in {field}.",
)
task = Task(
description="Analyze the data on {topic}.",
expected_output="Summary of {topic} analysis.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
expected_placeholders = {"role_detail", "topic", "field"}
actual_placeholders = crew.fetch_inputs()
assert (
actual_placeholders == expected_placeholders
), f"Expected {expected_placeholders}, but got {actual_placeholders}"
def test_task_tools_preserve_code_execution_tools():
"""
Test that task tools don't override code execution tools when allow_code_execution=True
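
The new test_fetch_inputs test above asserts that Crew.fetch_inputs() returns the set of placeholder names used in the agent and task templates. Below is a hedged sketch of that kind of placeholder extraction; string.Formatter is one way to pull {field} names out of templates, and this is not necessarily how fetch_inputs is implemented internally:

# Illustrative only: collect {placeholder} names from template strings,
# which is the behavior test_fetch_inputs asserts on.
from string import Formatter


def extract_placeholders(*templates: str) -> set:
    found = set()
    for template in templates:
        for _, field_name, _, _ in Formatter().parse(template):
            if field_name:
                found.add(field_name)
    return found


assert extract_placeholders(
    "{role_detail} Researcher", "Research on {topic}.", "Expert in {field}."
) == {"role_detail", "topic", "field"}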

View File

@@ -936,3 +936,29 @@ def test_output_file_validation():
expected_output="Test output",
output_file="{invalid-name}/output.txt",
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_task_execution_times():
researcher = Agent(
role="Researcher",
goal="Make the best research and analysis on content about AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
allow_delegation=False,
)
task = Task(
description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
expected_output="Bullet point list of 5 interesting ideas.",
agent=researcher,
)
assert task.start_time is None
assert task.end_time is None
assert task.execution_duration is None
task.execute_sync(agent=researcher)
assert task.start_time is not None
assert task.end_time is not None
assert task.execution_duration == (task.end_time - task.start_time).total_seconds()
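
With this change, task timing can be inspected directly on the task after a run. A hedged usage sketch based only on the assertions above (start_time and end_time behave as datetimes given the .total_seconds() arithmetic, and execution_duration is reported in seconds):

# Hedged usage sketch: inspect the timing fields after a synchronous run.
task.execute_sync(agent=researcher)

print(f"started:  {task.start_time}")
print(f"finished: {task.end_time}")
print(f"duration: {task.execution_duration:.2f}s")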