Mirror of https://github.com/crewAIInc/crewAI.git (synced 2026-05-14 05:28:12 +00:00)

Compare commits: 1.14.5a5...docs/custo (3 commits)

| SHA1 |
|---|
| f3843432cd |
| 264da8245a |
| f2960ccaaf |
docs/docs.json | 2457 changes (diff suppressed because it is too large)

docs/en/guides/tools/platform-tools-cli.mdx | 139 lines (new file)
@@ -0,0 +1,139 @@
---
title: Platform Tools CLI
description: Create, publish, and install custom tools on the CrewAI platform using the CLI.
icon: terminal
mode: "wide"
---

## Overview

The CrewAI CLI provides commands to manage custom tools on the **CrewAI platform** — a hosted tool registry that lets you share tools within your organization and across the community without publishing to PyPI.

| Command | Purpose |
|---------|---------|
| `crewai tool create <handle>` | Scaffold a new tool project |
| `crewai tool publish` | Publish the tool to the CrewAI platform |
| `crewai tool install <handle>` | Install a platform tool into your crew project |

<Note type="info" title="Platform vs PyPI">
These commands manage tools on the **CrewAI platform registry**. If you want to publish a standalone Python package to PyPI instead, see the [Publish Custom Tools to PyPI](/en/guides/tools/publish-custom-tools) guide.
</Note>

## Prerequisites

- **CrewAI CLI** installed (`pip install crewai`)
- **Authenticated** with the platform — run `crewai login` first

---

## Step 1: Create a Tool Project

Scaffold a new tool project:

```bash
crewai tool create my_custom_tool
```

This generates a project structure with the boilerplate you need to start building your tool.

<Tip>
The `handle` is the unique identifier for your tool on the platform. Choose something descriptive and specific to what the tool does.
</Tip>

### Implement Your Tool

Edit the generated tool file to add your logic. The tool follows the standard CrewAI tools contract — you can subclass `BaseTool` or use the `@tool` decorator:

```python
from crewai.tools import BaseTool

class MyCustomTool(BaseTool):
    name: str = "My Custom Tool"
    description: str = "Description of what this tool does — be specific so agents know when to use it."

    def _run(self, argument: str) -> str:
        # Your tool logic here
        return "result"
```
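
The same tool can also be written with the `@tool` decorator; a minimal sketch of the equivalent decorator form, where the function docstring serves as the tool description:

```python
from crewai.tools import tool


@tool("My Custom Tool")
def my_custom_tool(argument: str) -> str:
    """Description of what this tool does; be specific so agents know when to use it."""
    # Your tool logic here
    return "result"
```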
For the full tools API reference (input schemas, caching, async support, error handling), see the [Create Custom Tools](/en/learn/create-custom-tools) guide.

---

## Step 2: Publish to the Platform

From your tool project directory, publish it to the CrewAI platform:

```bash
crewai tool publish
```

### Visibility Options

| Flag | Description |
|------|-------------|
| `--public` | Make the tool available to all platform users |
| `--private` | Restrict visibility to your organization |
| `--force` | Bypass Git remote validations |

```bash
# Publish as a public tool
crewai tool publish --public

# Publish privately (organization only)
crewai tool publish --private
```

---

## Step 3: Install a Platform Tool

To install a tool that's been published to the platform:

```bash
crewai tool install my_custom_tool
```

Once installed, you can use the tool in your crew like any other tool — assign it to an agent via the `tools` parameter.
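
For example, a minimal sketch of wiring the installed tool into an agent (the import path here is hypothetical; the actual module and class names come from the project scaffolded for your handle):

```python
from crewai import Agent

# Hypothetical import path; adjust it to the package generated for your handle.
from my_custom_tool.tool import MyCustomTool

agent = Agent(
    role="Research Analyst",
    goal="Answer questions using the custom tool",
    backstory="An analyst equipped with organization-specific tooling.",
    tools=[MyCustomTool()],
)
```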

---

## Full Lifecycle Example

```bash
# 1. Authenticate with the platform
crewai login

# 2. Scaffold a new tool
crewai tool create weather_lookup

# 3. Implement your logic in the generated project
cd weather_lookup
# ... edit the tool file ...

# 4. Publish to the platform
crewai tool publish --public

# 5. In another project, install and use it
crewai tool install weather_lookup
```

---

## Platform Tools vs PyPI Packages

| | Platform Tools | PyPI Packages |
|---|---|---|
| **Publish** | `crewai tool publish` | `uv build` + `uv publish` |
| **Registry** | CrewAI platform | PyPI |
| **Install** | `crewai tool install <handle>` | `pip install <package>` |
| **Auth** | `crewai login` | PyPI account + token |
| **Visibility** | `--public` / `--private` flags | Always public |
| **Guide** | This page | [Publish Custom Tools](/en/guides/tools/publish-custom-tools) |

---

## Related

- [Create Custom Tools](/en/learn/create-custom-tools) — Python API reference for building tools (BaseTool, @tool decorator)
- [Publish Custom Tools to PyPI](/en/guides/tools/publish-custom-tools) — package and distribute tools as standalone Python libraries
@@ -12,7 +12,9 @@ incorporating the latest functionalities such as tool delegation, error handling
 enabling agents to perform a wide range of actions.

 <Tip>
-**Want to publish your tool for the community?** If you're building a tool that others could benefit from, check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to learn how to package and distribute your tool on PyPI.
+**Want to publish your tool to the CrewAI platform?** Use the CLI to scaffold, publish, and share tools directly on the platform — see the [Platform Tools CLI](/en/guides/tools/platform-tools-cli) guide.
+
+**Prefer publishing to PyPI?** Check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to package and distribute your tool as a standalone Python library.
 </Tip>

 ### Subclassing `BaseTool`
@@ -54,6 +54,14 @@ These tools enable your agents to search the web, research topics, and find info
     Extract structured content from web pages using the Tavily API.
   </Card>

+  <Card title="Tavily Research Tool" icon="flask" href="/en/tools/search-research/tavilyresearchtool">
+    Run multi-step research tasks and get cited reports using the Tavily Research API.
+  </Card>
+
+  <Card title="Tavily Get Research Tool" icon="clipboard-list" href="/en/tools/search-research/tavilygetresearchtool">
+    Retrieve the status and results of an existing Tavily research task.
+  </Card>
+
   <Card title="Arxiv Paper Tool" icon="box-archive" href="/en/tools/search-research/arxivpapertool">
     Search arXiv and optionally download PDFs.
   </Card>
@@ -76,7 +84,15 @@ These tools enable your agents to search the web, research topics, and find info
 - **Academic Research**: Find scholarly articles and technical papers

 ```python
-from crewai_tools import SerperDevTool, GitHubSearchTool, YoutubeVideoSearchTool, TavilySearchTool, TavilyExtractorTool
+from crewai_tools import (
+    GitHubSearchTool,
+    SerperDevTool,
+    TavilyExtractorTool,
+    TavilyGetResearchTool,
+    TavilyResearchTool,
+    TavilySearchTool,
+    YoutubeVideoSearchTool,
+)

 # Create research tools
 web_search = SerperDevTool()
@@ -84,11 +100,21 @@ code_search = GitHubSearchTool()
 video_research = YoutubeVideoSearchTool()
 tavily_search = TavilySearchTool()
 content_extractor = TavilyExtractorTool()
+tavily_research = TavilyResearchTool()
+tavily_get_research = TavilyGetResearchTool()

 # Add to your agent
 agent = Agent(
     role="Research Analyst",
-    tools=[web_search, code_search, video_research, tavily_search, content_extractor],
+    tools=[
+        web_search,
+        code_search,
+        video_research,
+        tavily_search,
+        content_extractor,
+        tavily_research,
+        tavily_get_research,
+    ],
     goal="Gather comprehensive information on any topic"
 )
 ```
docs/en/tools/search-research/tavilygetresearchtool.mdx | 85 lines (new file)
@@ -0,0 +1,85 @@
---
title: "Tavily Get Research Tool"
description: "Retrieve the status and results of an existing Tavily research task"
icon: "clipboard-list"
mode: "wide"
---

The `TavilyGetResearchTool` lets CrewAI agents check an existing Tavily research task by `request_id`. Use it when a research task was started earlier and you need to retrieve its current status or final results.

If you need to start a new research job, use the [Tavily Research Tool](/en/tools/search-research/tavilyresearchtool). This tool is specifically for looking up an existing Tavily research request after you already have its `request_id`.

## Installation

To use the `TavilyGetResearchTool`, install the `tavily-python` library alongside `crewai-tools`:

```shell
uv add 'crewai[tools]' tavily-python
```

## Environment Variables

Set your Tavily API key:

```bash
export TAVILY_API_KEY='your_tavily_api_key'
```

Get an API key at [https://app.tavily.com/](https://app.tavily.com/) (sign up, then create a key).

## Example Usage

```python
from crewai_tools import TavilyGetResearchTool

tavily_get_research_tool = TavilyGetResearchTool()

status_result = tavily_get_research_tool.run(
    request_id="your-research-request-id"
)

print(status_result)
```

## Common Workflow

Use `TavilyGetResearchTool` when your application or another service has already created a Tavily research task and saved its `request_id`.

Typical cases include:

- Polling for completion after kicking off research in a background job (see the sketch below).
- Looking up the latest status of a long-running research task.
- Fetching final research output from a previously created Tavily request.
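
A minimal polling sketch for the first case (the `request_id` value and the `"status"`/`"completed"` fields are illustrative assumptions; check the Tavily API docs for the actual response schema):

```python
import json
import time

from crewai_tools import TavilyGetResearchTool

tool = TavilyGetResearchTool()
request_id = "your-research-request-id"  # saved when the research task was created

# Poll until the task reports completion. The "status"/"completed" names
# are assumptions about the Tavily response schema, not confirmed fields.
while True:
    result = json.loads(tool.run(request_id=request_id))
    if result.get("status") == "completed":
        break
    time.sleep(10)

print(result)
```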

## Configuration Options

The `TavilyGetResearchTool` accepts the following argument when calling the `run` method:

- `request_id` (str): **Required.** The existing Tavily research request ID to retrieve.

## Async Usage

Use `_arun` when your application is already running inside an async event loop:

```python
from crewai_tools import TavilyGetResearchTool

tavily_get_research_tool = TavilyGetResearchTool()

status_result = await tavily_get_research_tool._arun(
    request_id="your-research-request-id"
)
```

## Features

- **Research status retrieval**: Fetch the current status of an existing Tavily research task.
- **Result retrieval**: Return available research output once Tavily has completed the task.
- **Sync and async**: Use either `_run`/`run` or `_arun` depending on your application's runtime.
- **JSON output**: Returns Tavily responses as formatted JSON strings.

## Response Format

The tool returns a JSON string containing the current research task status and any available results from Tavily. The exact response shape depends on the task state returned by Tavily, so incomplete tasks may return status information before the final research output is available.
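
Because the return value is a string, parse it before inspecting it; a short sketch (assumes the payload parses to a JSON object, and the available fields vary by task state):

```python
import json

payload = json.loads(status_result)
print(sorted(payload.keys()))  # inspect which fields Tavily actually returned
```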

Refer to the [Tavily API documentation](https://docs.tavily.com/) for full details on the Research API.
@@ -36,7 +36,6 @@ from typing_extensions import Self, TypeIs
 from crewai.agent.planning_config import PlanningConfig
 from crewai.agent.utils import (
     ahandle_knowledge_retrieval,
-    append_skill_context,
     apply_training_data,
     build_task_prompt_with_schema,
     format_task_with_context,
@@ -549,7 +548,6 @@ class Agent(BaseAgent):
         Returns:
             The fully prepared task prompt.
         """
-        task_prompt = append_skill_context(self, task_prompt)
         prepare_tools(self, tools, task)

         return apply_training_data(self, task_prompt)
@@ -1486,8 +1484,6 @@ class Agent(BaseAgent):
             ),
         )

-        formatted_messages = append_skill_context(self, formatted_messages)
-
         inputs: dict[str, Any] = {
             "input": formatted_messages,
             "tool_names": get_tool_names(parsed_tools),
@@ -213,30 +213,6 @@ def _combine_knowledge_context(agent: Agent) -> str:
     return agent_ctx + separator + crew_ctx


-def append_skill_context(agent: Agent, task_prompt: str) -> str:
-    """Append activated skill context sections to the task prompt.
-
-    Args:
-        agent: The agent with optional skills.
-        task_prompt: The current task prompt.
-
-    Returns:
-        The task prompt with skill context appended.
-    """
-    if not agent.skills:
-        return task_prompt
-
-    from crewai.skills.loader import format_skill_context
-    from crewai.skills.models import Skill
-
-    skill_sections = [
-        format_skill_context(s) for s in agent.skills if isinstance(s, Skill)
-    ]
-    if skill_sections:
-        task_prompt += "\n\n" + "\n\n".join(skill_sections)
-    return task_prompt
-
-
 def apply_training_data(agent: Agent, task_prompt: str) -> str:
     """Apply training data to the task prompt.
@@ -174,6 +174,8 @@ class CrewAgentExecutor(BaseAgentExecutor):
         if provider.setup_messages(cast(ExecutorContext, cast(object, self))):
             return

+        from crewai.llms.cache import mark_cache_breakpoint
+
         if self.prompt is not None and "system" in self.prompt:
             system_prompt = self._format_prompt(
                 cast(str, self.prompt.get("system", "")), inputs
@@ -181,11 +183,22 @@ class CrewAgentExecutor(BaseAgentExecutor):
             user_prompt = self._format_prompt(
                 cast(str, self.prompt.get("user", "")), inputs
             )
-            self.messages.append(format_message_for_llm(system_prompt, role="system"))
-            self.messages.append(format_message_for_llm(user_prompt))
+            # Cache breakpoints: end-of-system caches the per-agent stable
+            # prefix; end-of-user caches the per-task stable prefix across
+            # ReAct-loop iterations.
+            self.messages.append(
+                mark_cache_breakpoint(
+                    format_message_for_llm(system_prompt, role="system")
+                )
+            )
+            self.messages.append(
+                mark_cache_breakpoint(format_message_for_llm(user_prompt))
+            )
         elif self.prompt is not None:
             user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs)
-            self.messages.append(format_message_for_llm(user_prompt))
+            self.messages.append(
+                mark_cache_breakpoint(format_message_for_llm(user_prompt))
+            )

         provider.post_setup_messages(cast(ExecutorContext, cast(object, self)))
@@ -2586,16 +2586,26 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
         self._kickoff_input = inputs.get("input", "")

         if "system" in self.prompt:
+            from crewai.llms.cache import mark_cache_breakpoint
+
             prompt = cast("SystemPromptResult", self.prompt)
             system_prompt = self._format_prompt(prompt["system"], inputs)
             user_prompt = self._format_prompt(prompt["user"], inputs)
             self.state.messages.append(
-                format_message_for_llm(system_prompt, role="system")
+                mark_cache_breakpoint(
+                    format_message_for_llm(system_prompt, role="system")
+                )
+            )
+            self.state.messages.append(
+                mark_cache_breakpoint(format_message_for_llm(user_prompt))
             )
-            self.state.messages.append(format_message_for_llm(user_prompt))
         else:
+            from crewai.llms.cache import mark_cache_breakpoint
+
             user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
-            self.state.messages.append(format_message_for_llm(user_prompt))
+            self.state.messages.append(
+                mark_cache_breakpoint(format_message_for_llm(user_prompt))
+            )

         self._inject_files_from_inputs(inputs)
@@ -2677,16 +2687,26 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
         self._kickoff_input = inputs.get("input", "")

         if "system" in self.prompt:
+            from crewai.llms.cache import mark_cache_breakpoint
+
             prompt = cast("SystemPromptResult", self.prompt)
             system_prompt = self._format_prompt(prompt["system"], inputs)
             user_prompt = self._format_prompt(prompt["user"], inputs)
             self.state.messages.append(
-                format_message_for_llm(system_prompt, role="system")
+                mark_cache_breakpoint(
+                    format_message_for_llm(system_prompt, role="system")
+                )
+            )
+            self.state.messages.append(
+                mark_cache_breakpoint(format_message_for_llm(user_prompt))
             )
-            self.state.messages.append(format_message_for_llm(user_prompt))
         else:
+            from crewai.llms.cache import mark_cache_breakpoint
+
             user_prompt = self._format_prompt(self.prompt["prompt"], inputs)
-            self.state.messages.append(format_message_for_llm(user_prompt))
+            self.state.messages.append(
+                mark_cache_breakpoint(format_message_for_llm(user_prompt))
+            )

         self._inject_files_from_inputs(inputs)
@@ -14,7 +14,7 @@ from datetime import datetime
 import json
 import logging
 import re
-from typing import TYPE_CHECKING, Any, Final, Literal
+from typing import TYPE_CHECKING, Any, Final, Literal, cast
 import uuid

 from pydantic import (
@@ -703,10 +703,19 @@ class BaseLLM(BaseModel, ABC):
         Raises:
             ValueError: If message format is invalid
         """
+        from crewai.llms.cache import CACHE_BREAKPOINT_KEY
+        from crewai.utilities.types import LLMMessage as _LLMMessage
+
         if isinstance(messages, str):
             return [{"role": "user", "content": messages}]

-        # Validate message format
+        # Validate then copy each message, dropping the cache-breakpoint
+        # flag in the copy only. The caller (e.g. CrewAgentExecutor,
+        # experimental.AgentExecutor) reuses its messages buffer across
+        # many LLM calls in the tool-use loop; mutating their dicts
+        # in place would erase the markers after the first call and
+        # break prompt caching for every subsequent iteration.
+        cleaned: list[LLMMessage] = []
         for i, msg in enumerate(messages):
             if not isinstance(msg, dict):
                 raise ValueError(f"Message at index {i} must be a dictionary")
@@ -714,8 +723,12 @@ class BaseLLM(BaseModel, ABC):
                 raise ValueError(
                     f"Message at index {i} must have 'role' and 'content' keys"
                 )
+            copy: dict[str, Any] = {
+                k: v for k, v in msg.items() if k != CACHE_BREAKPOINT_KEY
+            }
+            cleaned.append(cast(_LLMMessage, copy))

-        return self._process_message_files(messages)
+        return self._process_message_files(cleaned)

     def _process_message_files(self, messages: list[LLMMessage]) -> list[LLMMessage]:
         """Process files attached to messages and format for the provider.
lib/crewai/src/crewai/llms/cache.py | 37 lines (new file)
@@ -0,0 +1,37 @@
"""Provider-agnostic prompt-cache breakpoint marker.

Application code (prompt builders, agent executors) marks messages where a
stable prefix ends. Provider adapters then translate the marker into the
cache directive their API expects, or strip it for providers that cache
implicitly (OpenAI, Gemini) or do not cache at all.

Usage:

    from crewai.llms.cache import mark_cache_breakpoint

    messages = [
        mark_cache_breakpoint({"role": "system", "content": stable_system}),
        mark_cache_breakpoint({"role": "user", "content": stable_user_prefix}),
        {"role": "user", "content": volatile_query},
    ]
"""

from __future__ import annotations

from typing import Any


CACHE_BREAKPOINT_KEY = "cache_breakpoint"


def mark_cache_breakpoint(message: dict[str, Any]) -> dict[str, Any]:
    """Return ``message`` with the cache-breakpoint flag set.

    Returns a new dict so callers can safely pass literal dicts.
    """
    return {**message, CACHE_BREAKPOINT_KEY: True}


def strip_cache_breakpoint(message: dict[str, Any]) -> None:
    """Remove the breakpoint flag from a message in place."""
    message.pop(CACHE_BREAKPOINT_KEY, None)
@@ -425,7 +425,7 @@ class AnthropicCompletion(BaseLLM):
     def _prepare_completion_params(
         self,
         messages: list[LLMMessage],
-        system_message: str | None = None,
+        system_message: str | list[dict[str, Any]] | None = None,
         tools: list[dict[str, Any]] | None = None,
         available_functions: dict[str, Any] | None = None,
     ) -> dict[str, Any]:
@@ -665,7 +665,7 @@ class AnthropicCompletion(BaseLLM):

     def _format_messages_for_anthropic(
         self, messages: str | list[LLMMessage]
-    ) -> tuple[list[LLMMessage], str | None]:
+    ) -> tuple[list[LLMMessage], str | list[dict[str, Any]] | None]:
         """Format messages for Anthropic API.

         Anthropic has specific requirements:
@@ -679,8 +679,51 @@ class AnthropicCompletion(BaseLLM):
             messages: Input messages

         Returns:
-            Tuple of (formatted_messages, system_message)
+            Tuple of (formatted_messages, system_message). `system_message` is
+            a list of content blocks (with cache_control stamped) when any
+            system message in the input carried a cache_breakpoint flag;
+            otherwise a plain string for backwards compatibility.
         """
+        from crewai.llms.cache import CACHE_BREAKPOINT_KEY
+
+        # Read cache_breakpoint flags from raw input BEFORE super strips them.
+        # We track the CONTENT of marked user/assistant messages so we can
+        # locate the corresponding block in formatted_messages — Anthropic
+        # rewrites tool results into user messages, so positional indices
+        # do not survive the conversion. We must stamp the original stable
+        # message (typically the initial task prompt), not whatever happens
+        # to be the trailing user-role block after tool_result expansion.
+        cache_system = False
+        cache_match_contents: list[str] = []
+        if not isinstance(messages, str):
+            for m in messages:
+                if not (isinstance(m, dict) and m.get(CACHE_BREAKPOINT_KEY)):
+                    continue
+                role = m.get("role")
+                if role == "system":
+                    cache_system = True
+                    continue
+                if role != "user":
+                    # Only user messages survive Anthropic's role-coalescing
+                    # in a stable, addressable position. Markers on assistant
+                    # or tool messages have no reliable stamp target after
+                    # tool_result expansion, so we ignore them.
+                    continue
+                raw_content = m.get("content")
+                if isinstance(raw_content, str) and raw_content:
+                    cache_match_contents.append(raw_content)
+                    continue
+                if isinstance(raw_content, list):
+                    # Pull text from a single-text-block list so callers that
+                    # pre-format content blocks still match cleanly.
+                    text_blocks = [
+                        b.get("text")
+                        for b in raw_content
+                        if isinstance(b, dict) and b.get("type") == "text"
+                    ]
+                    if len(text_blocks) == 1 and isinstance(text_blocks[0], str):
+                        cache_match_contents.append(text_blocks[0])

         # Use base class formatting first
         base_formatted = super()._format_messages(messages)
@@ -788,7 +831,62 @@ class AnthropicCompletion(BaseLLM):
             # If first message is not from user, insert a user message at the beginning
             formatted_messages.insert(0, {"role": "user", "content": "Hello"})

-        return formatted_messages, system_message
+        # Stamp cache_control on the message(s) whose original content was
+        # marked. We scan formatted_messages in order and stamp the first
+        # match per marked content — Anthropic permits up to 4 cache
+        # breakpoints per request, which is more than enough for our usage.
+        # Matching by content (rather than position) handles the ReAct
+        # case where tool_result blocks get expanded into trailing user
+        # messages: the stable initial-task prompt still maps cleanly.
+        for needle in cache_match_contents:
+            for fm in formatted_messages:
+                if fm.get("role") != "user":
+                    continue
+                content = fm.get("content")
+                if isinstance(content, str) and content == needle:
+                    self._stamp_cache_control_on_message(fm)
+                    break
+                if isinstance(content, list):
+                    fm_texts: list[str] = [
+                        b.get("text", "")
+                        for b in content
+                        if isinstance(b, dict) and b.get("type") == "text"
+                    ]
+                    if len(fm_texts) == 1 and fm_texts[0] == needle:
+                        self._stamp_cache_control_on_message(fm)
+                        break
+
+        # Convert system to content-block form when caching is requested.
+        system_payload: str | list[dict[str, Any]] | None = system_message
+        if system_message and cache_system:
+            system_payload = [
+                {
+                    "type": "text",
+                    "text": system_message,
+                    "cache_control": {"type": "ephemeral"},
+                }
+            ]
+
+        return formatted_messages, system_payload
+
+    @staticmethod
+    def _stamp_cache_control_on_message(message: LLMMessage) -> None:
+        """Stamp cache_control on the last content block of an Anthropic message."""
+        msg = cast(dict[str, Any], message)
+        content = msg.get("content")
+        if isinstance(content, str):
+            msg["content"] = [
+                {
+                    "type": "text",
+                    "text": content,
+                    "cache_control": {"type": "ephemeral"},
+                }
+            ]
+            return
+        if isinstance(content, list) and content:
+            last = content[-1]
+            if isinstance(last, dict):
+                last["cache_control"] = {"type": "ephemeral"}

     def _handle_completion(
         self,
@@ -161,6 +161,9 @@ def format_skill_context(skill: Skill) -> str:
     At METADATA level: returns name and description only.
     At INSTRUCTIONS level or above: returns full SKILL.md body.

+    Output is wrapped in <skill name="..."> XML tags so the block can serve
+    as a stable cache anchor when injected into the system prompt.
+
     Args:
         skill: The skill to format.
@@ -169,7 +172,7 @@ def format_skill_context(skill: Skill) -> str:
     """
     if skill.disclosure_level >= INSTRUCTIONS and skill.instructions:
         parts = [
-            f"## Skill: {skill.name}",
+            f'<skill name="{skill.name}">',
             skill.description,
             "",
             skill.instructions,
@@ -180,5 +183,6 @@ def format_skill_context(skill: Skill) -> str:
         for dir_name, files in sorted(skill.resource_files.items()):
             if files:
                 parts.append(f"- **{dir_name}/**: {', '.join(files)}")
+        parts.append("</skill>")
         return "\n".join(parts)
-    return f"## Skill: {skill.name}\n{skill.description}"
+    return f'<skill name="{skill.name}">\n{skill.description}\n</skill>'
@@ -86,7 +86,7 @@ class Prompts(BaseModel):
             slices.append("tools")
         else:
             slices.append("no_tools")
-        system: str = self._build_prompt(slices)
+        system: str = self._build_prompt(slices) + self._build_skill_block()

         # Determine which task slice to use:
         task_slice: COMPONENTS
@@ -106,7 +106,7 @@ class Prompts(BaseModel):
             return SystemPromptResult(
                 system=system,
                 user=self._build_prompt([task_slice]),
-                prompt=self._build_prompt(slices),
+                prompt=self._build_prompt(slices) + self._build_skill_block(),
             )
         return StandardPromptResult(
             prompt=self._build_prompt(
@@ -115,8 +115,27 @@ class Prompts(BaseModel):
                 self.prompt_template,
                 self.response_template,
             )
+            + self._build_skill_block()
         )

+    def _build_skill_block(self) -> str:
+        """Render the agent's activated skills as a stable XML block.
+
+        Skills are agent-scoped (do not change per task), so they live in the
+        system prompt where prompt-cache prefixes can survive across calls.
+        """
+        skills = getattr(self.agent, "skills", None)
+        if not skills:
+            return ""
+
+        from crewai.skills.loader import format_skill_context
+        from crewai.skills.models import Skill
+
+        sections = [format_skill_context(s) for s in skills if isinstance(s, Skill)]
+        if not sections:
+            return ""
+        return "\n\n<skills>\n" + "\n\n".join(sections) + "\n</skills>"
+
     def _build_prompt(
         self,
         components: list[COMPONENTS],
lib/crewai/tests/llms/test_prompt_cache.py | 196 lines (new file)
@@ -0,0 +1,196 @@
"""Regression tests for the provider-agnostic prompt-cache breakpoint flag."""

from __future__ import annotations

from crewai.llms.cache import (
    CACHE_BREAKPOINT_KEY,
    mark_cache_breakpoint,
    strip_cache_breakpoint,
)
from crewai.llms.providers.anthropic.completion import AnthropicCompletion
from crewai.llms.providers.openai.completion import OpenAICompletion


class TestCacheMarkerHelpers:
    def test_mark_returns_new_dict(self) -> None:
        original = {"role": "user", "content": "hi"}
        marked = mark_cache_breakpoint(original)
        assert marked[CACHE_BREAKPOINT_KEY] is True
        # Marker must NOT bleed back into the caller's dict — callers may
        # pass literal dicts and reuse them across calls.
        assert CACHE_BREAKPOINT_KEY not in original

    def test_strip_is_idempotent(self) -> None:
        msg = {"role": "user", "content": "hi", CACHE_BREAKPOINT_KEY: True}
        strip_cache_breakpoint(msg)
        assert CACHE_BREAKPOINT_KEY not in msg
        strip_cache_breakpoint(msg)
        assert CACHE_BREAKPOINT_KEY not in msg


class TestBaseFormatDoesNotMutate:
    """The strip-on-format pass must not erase markers from the caller's
    messages list — executors reuse a single list across many LLM calls,
    and mutating it would defeat caching on every iteration after the first.
    """

    def test_repeated_format_preserves_markers(self) -> None:
        llm = OpenAICompletion(model="gpt-4o-mini")
        messages = [
            mark_cache_breakpoint({"role": "system", "content": "stable system"}),
            mark_cache_breakpoint({"role": "user", "content": "stable user"}),
        ]
        # First call: provider strips markers from the returned (copied) list
        first = llm._format_messages(messages)
        assert all(CACHE_BREAKPOINT_KEY not in m for m in first)
        # Original list must STILL carry the markers
        assert messages[0][CACHE_BREAKPOINT_KEY] is True
        assert messages[1][CACHE_BREAKPOINT_KEY] is True
        # Second call from the same list still sees the markers
        second = llm._format_messages(messages)
        assert all(CACHE_BREAKPOINT_KEY not in m for m in second)
        assert messages[0][CACHE_BREAKPOINT_KEY] is True
        assert messages[1][CACHE_BREAKPOINT_KEY] is True


class TestAnthropicCacheStamping:
    def test_stamps_system_with_cache_control(self) -> None:
        llm = AnthropicCompletion(model="claude-sonnet-4-5")
        messages = [
            mark_cache_breakpoint({"role": "system", "content": "you are helpful"}),
            mark_cache_breakpoint({"role": "user", "content": "ping"}),
        ]
        formatted, system = llm._format_messages_for_anthropic(messages)
        assert isinstance(system, list)
        assert system[0]["cache_control"] == {"type": "ephemeral"}
        assert system[0]["text"] == "you are helpful"
        # First user block carries cache_control too
        last_block = formatted[0]["content"][-1]
        assert last_block["cache_control"] == {"type": "ephemeral"}

    def test_stamps_stable_user_not_tool_result(self) -> None:
        """Within a ReAct loop, tool results are flattened into a trailing
        user message. We must NOT stamp that volatile trailing block — we
        must stamp the original stable user prompt instead.
        """
        llm = AnthropicCompletion(model="claude-sonnet-4-5")
        messages = [
            mark_cache_breakpoint({"role": "system", "content": "you are helpful"}),
            mark_cache_breakpoint({"role": "user", "content": "stable task prompt"}),
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "tc_1",
                        "function": {"name": "ping", "arguments": "{}"},
                    }
                ],
            },
            {"role": "tool", "tool_call_id": "tc_1", "content": "volatile tool result"},
        ]
        formatted, _system = llm._format_messages_for_anthropic(messages)
        # Find the message that holds the stable prompt
        stable = next(
            fm
            for fm in formatted
            if fm["role"] == "user"
            and isinstance(fm["content"], list)
            and any(
                isinstance(b, dict)
                and b.get("type") == "text"
                and b.get("text") == "stable task prompt"
                for b in fm["content"]
            )
        )
        text_block = next(
            b for b in stable["content"] if isinstance(b, dict) and b.get("type") == "text"
        )
        assert text_block.get("cache_control") == {"type": "ephemeral"}
        # The tool_result-bearing user message must NOT be stamped
        tool_carrier = next(
            fm
            for fm in formatted
            if fm["role"] == "user"
            and isinstance(fm["content"], list)
            and any(
                isinstance(b, dict) and b.get("type") == "tool_result"
                for b in fm["content"]
            )
        )
        for block in tool_carrier["content"]:
            assert "cache_control" not in block

    def test_assistant_marker_is_ignored(self) -> None:
        """Markers on assistant messages have no stable stamp target after
        Anthropic's role coalescing, so they should be silently ignored
        rather than collected and then dropped on a mismatch.
        """
        llm = AnthropicCompletion(model="claude-sonnet-4-5")
        messages = [
            mark_cache_breakpoint({"role": "system", "content": "you are helpful"}),
            mark_cache_breakpoint(
                {"role": "assistant", "content": "I will help you out."}
            ),
            {"role": "user", "content": "ping"},
        ]
        formatted, system = llm._format_messages_for_anthropic(messages)
        # System still cached
        assert isinstance(system, list)
        # No user message was marked → no user message should carry cache_control
        for fm in formatted:
            if fm.get("role") != "user":
                continue
            content = fm.get("content")
            if isinstance(content, list):
                for block in content:
                    if isinstance(block, dict):
                        assert "cache_control" not in block

    def test_list_content_user_marker_matches(self) -> None:
        """A pre-formatted user message with a single text block should still
        match against the post-format user message.
        """
        llm = AnthropicCompletion(model="claude-sonnet-4-5")
        messages = [
            mark_cache_breakpoint(
                {
                    "role": "user",
                    "content": [{"type": "text", "text": "stable list prompt"}],
                }
            ),
        ]
        formatted, _system = llm._format_messages_for_anthropic(messages)
        user_msg = next(fm for fm in formatted if fm["role"] == "user")
        content = user_msg["content"]
        assert isinstance(content, list)
        text_block = next(b for b in content if isinstance(b, dict) and b.get("type") == "text")
        assert text_block.get("cache_control") == {"type": "ephemeral"}

    def test_unmarked_messages_get_no_cache_control(self) -> None:
        llm = AnthropicCompletion(model="claude-sonnet-4-5")
        messages = [
            {"role": "system", "content": "no caching here"},
            {"role": "user", "content": "no caching here either"},
        ]
        formatted, system = llm._format_messages_for_anthropic(messages)
        # No marker → system stays a plain string (no content-block conversion)
        assert isinstance(system, str)
        # No marker → no cache_control anywhere in formatted messages
        for fm in formatted:
            content = fm.get("content")
            if isinstance(content, list):
                for block in content:
                    assert "cache_control" not in block


class TestNonAnthropicStripsMarker:
    def test_openai_format_strips_marker_from_wire_payload(self) -> None:
        llm = OpenAICompletion(model="gpt-4o-mini")
        messages = [
            mark_cache_breakpoint({"role": "system", "content": "stable"}),
            mark_cache_breakpoint({"role": "user", "content": "hi"}),
        ]
        formatted = llm._format_messages(messages)
        for m in formatted:
            assert CACHE_BREAKPOINT_KEY not in m
@@ -5,9 +5,9 @@ from pathlib import Path
 import pytest

 from crewai import Agent
-from crewai.agent.utils import append_skill_context
 from crewai.skills.loader import activate_skill, discover_skills, format_skill_context
 from crewai.skills.models import INSTRUCTIONS, METADATA
+from crewai.utilities.prompts import Prompts


 def _create_skill_dir(parent: Path, name: str, body: str = "Body.") -> Path:
@@ -34,7 +34,7 @@ class TestSkillDiscoveryAndActivation:
         assert activated.instructions == "Use this skill."

         context = format_skill_context(activated)
-        assert "## Skill: my-skill" in context
+        assert '<skill name="my-skill">' in context
         assert "Use this skill." in context

     def test_filter_by_skill_names(self, tmp_path: Path) -> None:
@@ -94,7 +94,9 @@ class TestSkillDiscoveryAndActivation:
         assert agent.skills[0].disclosure_level == METADATA
         assert agent.skills[0].instructions is None

-        prompt = append_skill_context(agent, "Plan a 10-day Japan itinerary.")
-        assert "## Skill: travel" in prompt
-        assert "Skill travel" in prompt
-        assert "Use this skill for travel planning." not in prompt
+        result = Prompts(agent=agent, has_tools=False, use_system_prompt=True).task_execution()
+        system = getattr(result, "system", "") or result.prompt
+        assert '<skill name="travel">' in system
+        assert "Skill travel" in system
+        # METADATA-level skills must not leak full instructions into the prompt
+        assert "Use this skill for travel planning." not in system
@@ -105,7 +105,7 @@ class TestFormatSkillContext:
             frontmatter=fm, path=tmp_path, disclosure_level=METADATA
         )
         ctx = format_skill_context(skill)
-        assert "## Skill: test-skill" in ctx
+        assert '<skill name="test-skill">' in ctx
         assert "A skill" in ctx

     def test_instructions_level(self, tmp_path: Path) -> None:
@@ -117,7 +117,7 @@ class TestFormatSkillContext:
             instructions="Do these things.",
         )
         ctx = format_skill_context(skill)
-        assert "## Skill: test-skill" in ctx
+        assert '<skill name="test-skill">' in ctx
         assert "Do these things." in ctx

     def test_no_instructions_at_instructions_level(self, tmp_path: Path) -> None:
@@ -129,7 +129,7 @@ class TestFormatSkillContext:
             instructions=None,
        )
         ctx = format_skill_context(skill)
-        assert ctx == "## Skill: test-skill\nA skill"
+        assert ctx == '<skill name="test-skill">\nA skill\n</skill>'

     def test_resources_level(self, tmp_path: Path) -> None:
         fm = SkillFrontmatter(name="test-skill", description="A skill")