core loop should be working and ready for testing.

This commit is contained in:
Brandon Hancock
2024-12-26 14:18:42 -05:00
parent 1c45f730c6
commit 2bf5b15f1e
7 changed files with 402 additions and 169 deletions

View File

@@ -10,6 +10,7 @@ from crewai import (
from crewai.cli.add_crew_to_flow import add_crew_to_flow
from crewai.cli.create_crew import create_crew
from crewai.cli.create_flow import create_flow
from crewai.cli.crew_chat import run_chat
from crewai.cli.fetch_chat_llm import fetch_chat_llm
from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
@@ -18,7 +19,6 @@ from crewai.memory.storage.kickoff_task_outputs_storage import (
from .authentication.main import AuthenticationCommand
from .deploy.main import DeployCommand
from .evaluate_crew import evaluate_crew
from .fetch_crew_inputs import fetch_crew_inputs
from .install_crew import install_crew
from .kickoff_flow import kickoff_flow
from .plot_flow import plot_flow
@@ -351,123 +351,10 @@ def flow_add_crew(crew_name):
@crewai.command()
def chat():
"""
Start a conversation with the Crew, collecting user-supplied inputs
only if needed. This is a demo of a 'chat' flow.
Start a conversation with the Crew, collecting user-supplied inputs,
and using the Chat LLM to generate responses.
"""
click.secho("Welcome to CrewAI Chat!", fg="green")
# --------------------------------------------------------------------------
# 1) Attempt to fetch Crew inputs
# --------------------------------------------------------------------------
click.secho("Gathering crew inputs via `fetch_crew_inputs()`...", fg="cyan")
try:
crew_inputs = fetch_crew_inputs()
except Exception as e:
# If an error occurs, we print it and halt.
click.secho(f"Error fetching crew inputs: {e}", fg="red")
return
# If crew_inputs is empty, that's fine. We'll proceed anyway.
click.secho(
f"Found placeholders (possibly empty): {sorted(list(crew_inputs))}", fg="yellow"
)
# --------------------------------------------------------------------------
# 2) Retrieve the Chat LLM
# --------------------------------------------------------------------------
click.secho("Fetching the Chat LLM...", fg="cyan")
try:
chat_llm = fetch_chat_llm()
except Exception as e:
click.secho(f"Failed to retrieve Chat LLM: {e}", fg="red")
return
if not chat_llm:
click.secho("No valid Chat LLM returned. Exiting.", fg="red")
return
# --------------------------------------------------------------------------
# 3) Simple chat loop (demo)
# --------------------------------------------------------------------------
click.secho(
"\nEntering interactive chat loop. Type 'exit' or Ctrl+C to quit.\n", fg="cyan"
)
while True:
try:
user_input = click.prompt("You", type=str)
if user_input.strip().lower() in ["exit", "quit"]:
click.echo("Exiting chat. Goodbye!")
break
# For demonstration, we'll call the LLM directly on the user input:
response = chat_llm.generate(user_input)
click.secho(f"\nAI: {response}\n", fg="green")
except (KeyboardInterrupt, EOFError):
click.echo("\nExiting chat. Goodbye!")
break
except Exception as e:
click.secho(f"Error occurred while generating chat response: {e}", fg="red")
break
def load_crew_and_find_inputs(file_path: str) -> Tuple[Optional[Crew], set]:
"""
Attempt to load a Crew from the provided file path or default location.
Then gather placeholders from tasks. Returns (crew, set_of_placeholders).
"""
crew = None
placeholders_found = set()
# 1) If file_path is not provided, attempt to detect the default crew config.
if not file_path:
# This is naive detection logic.
# A real implementation might search typical locations like ./
# or src/<project_name>/config/ for a crew configuration.
default_candidate = "crew.yaml"
if os.path.exists(default_candidate):
file_path = default_candidate
# 2) Try to load the crew from file if file_path exists
if file_path and os.path.isfile(file_path):
# Pseudocode for loading a crew from file—may vary depending on how the users config is stored
try:
# For demonstration, we do something like:
# with open(file_path, "r") as f:
# content = f.read()
# crew_data = parse_yaml_crew(content)
# crew = Crew(**crew_data)
# Placeholder logic below:
crew = Crew(name="ExampleCrew")
except Exception as e:
click.secho(f"Error loading Crew from {file_path}: {e}", fg="red")
raise e
if crew:
# 3) Inspect crew tasks for placeholders
# For each Task, we gather placeholders used in description/expected_output
for task in crew.tasks:
placeholders_in_desc = extract_placeholders(task.description)
placeholders_in_out = extract_placeholders(task.expected_output)
placeholders_found.update(placeholders_in_desc)
placeholders_found.update(placeholders_in_out)
return crew, placeholders_found
def extract_placeholders(text: str) -> set:
"""
Given a string, find all placeholders of the form {something} that might be used for input interpolation.
This is a naive example—actual logic might do advanced parsing to avoid curly braces used in JSON.
"""
import re
if not text:
return set()
pattern = r"\{([a-zA-Z0-9_]+)\}"
matches = re.findall(pattern, text)
return set(matches)
run_chat()
if __name__ == "__main__":

172
src/crewai/cli/crew_chat.py Normal file
View File

@@ -0,0 +1,172 @@
import json
import subprocess
from typing import cast
import click
from crewai.cli.fetch_chat_llm import fetch_chat_llm
from crewai.cli.fetch_crew_inputs import fetch_crew_inputs
from crewai.types.crew_chat import ChatInputs
def run_chat():
"""
Runs an interactive chat loop using the Crew's chat LLM with function calling.
Incorporates crew_name, crew_description, and input fields to build a tool schema.
Exits if crew_name or crew_description are missing.
"""
click.secho("Welcome to CrewAI Chat with Function-Calling!", fg="green")
# 1) Fetch CrewInputs
click.secho("Gathering crew inputs via `fetch_crew_inputs()`...", fg="cyan")
try:
crew_inputs: ChatInputs = fetch_crew_inputs()
except Exception as e:
click.secho(f"Error fetching crew inputs: {e}", fg="red")
return
# Check for mandatory fields
if not crew_inputs.crew_name:
click.secho("Error: Crew name is missing. Exiting.", fg="red")
return
if not crew_inputs.crew_description:
click.secho("Error: Crew description is missing. Exiting.", fg="red")
return
# 2) Generate a tool schema from the crew inputs
crew_tool_schema = generate_crew_tool_schema(crew_inputs)
# 3) Build initial system message
required_fields_str = (
", ".join(
f"{field.name} (desc: {field.description or 'n/a'})"
for field in crew_inputs.inputs
)
or "(No required fields detected)"
)
system_message = (
"You are a helpful AI assistant for the CrewAI platform. "
"You have a function (tool) you can call by name if you have all required inputs. "
f"Those required inputs are: {required_fields_str}. "
"Once you have them, call the function. "
"Please keep your responses concise and friendly."
f"\nCrew Name: {crew_inputs.crew_name}"
f"\nCrew Description: {crew_inputs.crew_description}"
)
messages = [
{"role": "system", "content": system_message},
]
# 4) Retrieve ChatLLM
click.secho("\nFetching the Chat LLM...", fg="cyan")
try:
chat_llm = fetch_chat_llm()
except Exception as e:
click.secho(f"Failed to retrieve Chat LLM: {e}", fg="red")
return
if not chat_llm:
click.secho("No valid Chat LLM returned. Exiting.", fg="red")
return
# 5) Prepare available_functions for the callback dictionary
available_functions = {
crew_inputs.crew_name: run_crew_tool, # The LLM can call run_crew_tool using the crew's name
}
click.secho(
"\nEntering an interactive chat loop with function-calling.\n"
"Type 'exit' or Ctrl+C to quit.\n",
fg="cyan",
)
# 6) Main chat loop
while True:
try:
user_input = click.prompt("You: ", type=str)
if user_input.strip().lower() in ["exit", "quit"]:
click.echo("Exiting chat. Goodbye!")
break
# Append user message
messages.append({"role": "user", "content": user_input})
# Invoke the LLM, passing tools and available_functions
final_response = chat_llm.call(
messages=messages,
tools=[crew_tool_schema],
available_functions=available_functions,
)
# Append the final assistant response and print
messages.append({"role": "assistant", "content": final_response})
click.secho(f"\nAI: {final_response}\n", fg="green")
except (KeyboardInterrupt, EOFError):
click.echo("\nExiting chat. Goodbye!")
break
except Exception as e:
click.secho(f"Error occurred: {e}", fg="red")
break
def generate_crew_tool_schema(crew_inputs: ChatInputs) -> dict:
"""
Dynamically build a Littellm 'function' schema for the given crew.
crew_name: The name of the crew (used for the function 'name').
crew_inputs: A ChatInputs object containing crew_description
and a list of input fields (each with a name & description).
"""
properties = {}
for field in crew_inputs.inputs:
properties[field.name] = {
"type": "string",
"description": field.description or "No description provided",
}
required_fields = [field.name for field in crew_inputs.inputs]
return {
"type": "function",
"function": {
"name": crew_inputs.crew_name,
"description": crew_inputs.crew_description or "No crew description",
"parameters": {
"type": "object",
"properties": properties,
"required": required_fields,
},
},
}
def run_crew_tool(**kwargs) -> str:
"""
Subprocess-based function that:
1) Calls 'uv run run_crew' (which in turn calls your crew's 'run()' in main.py)
2) Passes the LLM-provided kwargs as CLI overrides (e.g. --key=value).
"""
import subprocess
command = ["uv", "run", "run_crew"]
# Convert LLM arguments to --key=value CLI params
for key, value in kwargs.items():
val_str = str(value)
command.append(f"--{key}={val_str}")
try:
# Capture stdout so we can return it to the LLM
result = subprocess.run(command, capture_output=True, text=True, check=True)
stdout_str = result.stdout.strip()
return stdout_str if stdout_str else "No output from run_crew command."
except subprocess.CalledProcessError as e:
return (
f"Error: Command failed with exit code {e.returncode}\n"
f"STDERR:\n{e.stderr}\nSTDOUT:\n{e.stdout}"
)
except Exception as e:
return f"Unexpected error running crew: {e}"

View File

@@ -6,23 +6,26 @@ from packaging import version
from crewai.cli.utils import read_toml
from crewai.cli.version import get_crewai_version
from crewai.types.crew_chat import ChatInputs
def fetch_crew_inputs() -> set[str]:
def fetch_crew_inputs() -> ChatInputs:
"""
Fetch placeholders/inputs for the crew by running 'uv run fetch_inputs'.
This captures stdout (which is now expected to be JSON),
parses it into a Python list/set, and returns it.
"""
command = ["uv", "run", "fetch_inputs"]
placeholders = set()
Fetch the crew's ChatInputs (a structure containing crew_description and input fields)
by running "uv run fetch_chat_inputs", which prints JSON representing a ChatInputs object.
This function will parse that JSON and return a ChatInputs instance.
If the output is empty or invalid, an empty ChatInputs object is returned.
"""
command = ["uv", "run", "fetch_chat_inputs"]
crewai_version = get_crewai_version()
min_required_version = "0.87.0" # TODO: Update to latest version when cut
min_required_version = "0.87.0"
pyproject_data = read_toml()
crew_name = pyproject_data.get("project", {}).get("name", None)
# Check for old poetry-based setups
# If you're on an older poetry-based setup and version < min_required_version
if pyproject_data.get("tool", {}).get("poetry") and (
version.parse(crewai_version) < version.parse(min_required_version)
):
@@ -34,17 +37,25 @@ def fetch_crew_inputs() -> set[str]:
try:
result = subprocess.run(command, capture_output=True, text=True, check=True)
# The entire stdout should now be a JSON array of placeholders (e.g. ["topic","username",...])
stdout_str = result.stdout.strip()
if stdout_str:
try:
placeholders_list = json.loads(stdout_str)
if isinstance(placeholders_list, list):
placeholders = set(placeholders_list)
except json.JSONDecodeError:
click.echo("Unable to parse JSON from `fetch_inputs` output.", err=True)
if not stdout_str:
return ChatInputs(crew_name=crew_name)
try:
raw_data = json.loads(stdout_str)
chat_inputs = ChatInputs(**raw_data)
if crew_name:
chat_inputs.crew_name = crew_name
return chat_inputs
except json.JSONDecodeError as e:
click.echo(
f"Unable to parse JSON from `fetch_chat_inputs` output: {e}", err=True
)
return ChatInputs(crew_name=crew_name)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while fetching inputs: {e}", err=True)
click.echo(f"An error occurred while fetching chat inputs: {e}", err=True)
click.echo(e.output, err=True, nl=True)
if pyproject_data.get("tool", {}).get("poetry"):
@@ -53,8 +64,7 @@ def fetch_crew_inputs() -> set[str]:
"Please run `crewai update` to update your pyproject.toml to use uv.",
fg="yellow",
)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)
return placeholders
return ChatInputs(crew_name=crew_name)

View File

@@ -15,12 +15,30 @@ warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
def run():
"""
Run the crew.
Run the crew, allowing CLI overrides for required inputs.
Usage example:
uv run run_crew -- --topic="New Topic" --some_other_field="Value"
"""
# Default inputs
inputs = {
'topic': 'AI LLMs'
# Add any other default fields here
}
{{crew_name}}().crew().kickoff(inputs=inputs)
# 1) Gather overrides from sys.argv
# sys.argv might look like: ['run_crew', '--topic=NewTopic']
# But be aware that if you're calling "uv run run_crew", sys.argv might have
# additional items. So we typically skip the first 1 or 2 items to get only overrides.
overrides = parse_cli_overrides(sys.argv[1:])
# 2) Merge the overrides into defaults
inputs.update(overrides)
# 3) Kick off the crew with final inputs
try:
{{crew_name}}().crew().kickoff(inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while running the crew: {e}")
def train():
@@ -107,3 +125,44 @@ def fetch_chat_llm():
print(json.dumps({}))
except Exception as e:
raise Exception(f"An error occurred while fetching chat LLM: {e}")
# TODO: Talk to Joao about making using LLM calls to analyze the crew
# and generate all of this information automatically
def fetch_chat_inputs():
"""
Command that fetches the 'chat_inputs' property from the Crew,
and prints it as JSON to stdout.
"""
try:
crew = {{crew_name}}().crew()
raw_chat_inputs = getattr(crew, "chat_inputs", None)
if raw_chat_inputs:
# Convert to dictionary to print JSON
print(json.dumps(raw_chat_inputs.model_dump()))
else:
# If crew.chat_inputs is None or empty, print an empty JSON
print(json.dumps({}))
except Exception as e:
raise Exception(f"An error occurred while fetching chat inputs: {e}")
def parse_cli_overrides(args_list) -> dict:
"""
Parse arguments in the form of --key=value from a list of CLI arguments.
Return them as a dict. For example:
['--topic=AI LLMs', '--username=John'] => {'topic': 'AI LLMs', 'username': 'John'}
"""
overrides = {}
for arg in args_list:
if arg.startswith("--"):
# remove the leading --
trimmed = arg[2:]
if "=" in trimmed:
key, val = trimmed.split("=", 1)
overrides[key] = val
else:
# If someone passed something like --topic (no =),
# either handle differently or ignore
pass
return overrides

View File

@@ -36,6 +36,7 @@ from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.types.crew_chat import ChatInputs
from crewai.types.usage_metrics import UsageMetrics
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINING_DATA_FILE
@@ -207,6 +208,10 @@ class Crew(BaseModel):
default=None,
description="LLM used to handle chatting with the crew.",
)
chat_inputs: Optional[ChatInputs] = Field(
default=None,
description="Holds descriptions of the crew as well as named inputs for chat usage.",
)
_knowledge: Optional[Knowledge] = PrivateAttr(
default=None,
)

View File

@@ -1,3 +1,4 @@
import json
import logging
import os
import sys
@@ -7,7 +8,7 @@ from contextlib import contextmanager
from typing import Any, Dict, List, Optional, Union
import litellm
from litellm import get_supported_openai_params
from litellm import ModelResponse, get_supported_openai_params
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
@@ -21,6 +22,7 @@ class FilteredStream:
def write(self, s) -> int:
with self._lock:
# Filter out extraneous messages from LiteLLM
if (
"Give Feedback / Get Help: https://github.com/BerriAI/litellm/issues/new"
in s
@@ -80,11 +82,9 @@ def suppress_warnings():
old_stderr = sys.stderr
sys.stdout = FilteredStream(old_stdout)
sys.stderr = FilteredStream(old_stderr)
try:
yield
finally:
# Restore stdout and stderr
sys.stdout = old_stdout
sys.stderr = old_stderr
@@ -135,8 +135,10 @@ class LLM:
self.context_window_size = 0
self.kwargs = kwargs
# For safety, we disable passing init params to next calls
litellm.drop_params = True
litellm.set_verbose = False
self.set_callbacks(callbacks)
self.set_env_callbacks()
@@ -173,8 +175,6 @@ class LLM:
Create an LLM instance from a dict.
We assume the dict has all relevant keys that match what's in the constructor.
"""
# We can pop off fields we know, then pass the rest into **kwargs
# so that any leftover keys still get passed into the LLM constructor.
known_fields = {}
known_fields["model"] = data.pop("model", None)
known_fields["timeout"] = data.pop("timeout", None)
@@ -196,15 +196,37 @@ class LLM:
known_fields["api_key"] = data.pop("api_key", None)
known_fields["callbacks"] = data.pop("callbacks", None)
# leftover keys go into kwargs:
return cls(**known_fields, **data)
def call(self, messages: List[Dict[str, str]], callbacks: List[Any] = []) -> str:
def call(
self,
messages: List[Dict[str, str]],
tools: Optional[List[dict]] = None,
callbacks: Optional[List[Any]] = None,
available_functions: Optional[Dict[str, Any]] = None,
) -> str:
"""
High-level call method that:
1) Calls litellm.completion
2) Checks for function/tool calls
3) If tool calls found:
a) executes each function
b) appends their output as tool messages
c) calls litellm.completion again with the updated messages
4) Returns the final text response
:param messages: The conversation messages
:param tools: Optional list of function schemas for function calling
:param callbacks: Optional list of callbacks
:param available_functions: A dictionary mapping function_name -> actual Python function
:return: Final text response from the LLM
"""
with suppress_warnings():
if callbacks and len(callbacks) > 0:
if callbacks:
self.set_callbacks(callbacks)
try:
# --- 1) Make first completion call
params = {
"model": self.model,
"messages": messages,
@@ -225,21 +247,71 @@ class LLM:
"api_version": self.api_version,
"api_key": self.api_key,
"stream": False,
"tools": tools, # pass the tool schema
**self.kwargs,
}
# Remove None values to avoid passing unnecessary parameters
# remove None values
params = {k: v for k, v in params.items() if v is not None}
response = litellm.completion(**params)
return response["choices"][0]["message"]["content"]
response_message = response.choices[0].message
text_response = response_message.content or ""
tool_calls = getattr(response_message, "tool_calls", [])
# --- 2) If no tool calls, we can just return
if not tool_calls or not available_functions:
return text_response
# --- 3) We have tool calls and a dictionary of available functions
# run them, append output to messages
for tool_call in tool_calls:
function_name = tool_call.function.name
if function_name in available_functions:
# parse arguments
function_args = {}
try:
function_args = json.loads(tool_call.function.arguments)
except Exception as e:
logging.warning(f"Failed to parse function arguments: {e}")
fn = available_functions[function_name]
# call the actual tool function
result = fn(**function_args)
# append the "tool" response to messages
messages.append(
{
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": str(result),
}
)
else:
logging.warning(
f"Tool call requested unknown function '{function_name}'"
)
# --- 4) Make a second call so the LLM can incorporate the tool results
second_params = dict(params) # copy the same params
second_params["messages"] = messages
# We'll remove "tools" from second call, or keep it if you want more calls
# but typically you keep it in case it wants additional calls
second_response = litellm.completion(**second_params)
second_msg = second_response.choices[0].message
final_response = second_msg.content or ""
return final_response
except Exception as e:
# check if context length was exceeded, otherwise log
if not LLMContextLengthExceededException(
str(e)
)._is_context_limit_error(str(e)):
logging.error(f"LiteLLM call failed: {str(e)}")
raise # Re-raise the exception after logging
# re-raise
raise
def supports_function_calling(self) -> bool:
try:
@@ -258,7 +330,10 @@ class LLM:
return False
def get_context_window_size(self) -> int:
# Only using 75% of the context window size to avoid cutting the message in the middle
"""
Returns the context window size, using 75% of the maximum to avoid
cutting off messages mid-thread.
"""
if self.context_window_size != 0:
return self.context_window_size
@@ -271,6 +346,10 @@ class LLM:
return self.context_window_size
def set_callbacks(self, callbacks: List[Any]):
"""
Attempt to keep a single set of callbacks in litellm by removing old
duplicates and adding new ones.
"""
callback_types = [type(callback) for callback in callbacks]
for callback in litellm.success_callback[:]:
if type(callback) in callback_types:
@@ -285,34 +364,19 @@ class LLM:
def set_env_callbacks(self):
"""
Sets the success and failure callbacks for the LiteLLM library from environment variables.
This method reads the `LITELLM_SUCCESS_CALLBACKS` and `LITELLM_FAILURE_CALLBACKS`
environment variables, which should contain comma-separated lists of callback names.
It then assigns these lists to `litellm.success_callback` and `litellm.failure_callback`,
respectively.
If the environment variables are not set or are empty, the corresponding callback lists
will be set to empty lists.
Example:
LITELLM_SUCCESS_CALLBACKS="langfuse,langsmith"
LITELLM_FAILURE_CALLBACKS="langfuse"
This will set `litellm.success_callback` to ["langfuse", "langsmith"] and
`litellm.failure_callback` to ["langfuse"].
"""
success_callbacks_str = os.environ.get("LITELLM_SUCCESS_CALLBACKS", "")
success_callbacks = []
if success_callbacks_str:
success_callbacks = [
callback.strip() for callback in success_callbacks_str.split(",")
cb.strip() for cb in success_callbacks_str.split(",") if cb.strip()
]
failure_callbacks_str = os.environ.get("LITELLM_FAILURE_CALLBACKS", "")
failure_callbacks = []
if failure_callbacks_str:
failure_callbacks = [
callback.strip() for callback in failure_callbacks_str.split(",")
cb.strip() for cb in failure_callbacks_str.split(",") if cb.strip()
]
litellm.success_callback = success_callbacks

View File

@@ -0,0 +1,36 @@
from typing import List, Optional
from pydantic import BaseModel, Field
class ChatInputField(BaseModel):
"""
Represents a single required input for the crew, with a name and short description.
Example:
{
"name": "topic",
"description": "The topic to focus on for the conversation"
}
"""
name: str
description: Optional[str] = None
class ChatInputs(BaseModel):
"""
Holds a high-level crew_description plus a list of ChatInputFields.
Example:
{
"crew_name": "topic-based-qa",
"crew_description": "Use this crew for topic-based Q&A",
"inputs": [
{"name": "topic", "description": "The topic to focus on"},
{"name": "username", "description": "Name of the user"},
]
}
"""
crew_name: Optional[str] = Field(default="Crew")
crew_description: Optional[str] = None
inputs: List[ChatInputField] = Field(default_factory=list)