Compare commits

...

2 Commits

Author SHA1 Message Date
lorenzejay
3855076670 self improve with skills 2026-05-05 14:45:52 -07:00
lorenzejay
4af40c64f2 self improve with skills 2026-05-05 14:42:25 -07:00
20 changed files with 2481 additions and 3 deletions

View File

@@ -84,6 +84,7 @@ from crewai.rag.embeddings.types import EmbedderConfig
from crewai.security.fingerprint import Fingerprint
from crewai.skills.loader import activate_skill, discover_skills
from crewai.skills.models import INSTRUCTIONS, Skill as SkillModel
from crewai.skills.self_improve.models import SelfImprovementConfig
from crewai.state.checkpoint_config import CheckpointConfig, apply_checkpoint
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.types.callback import SerializableCallable
@@ -190,6 +191,7 @@ class Agent(BaseAgent):
_times_executed: int = PrivateAttr(default=0)
_mcp_resolver: MCPToolResolver | None = PrivateAttr(default=None)
_last_messages: list[LLMMessage] = PrivateAttr(default_factory=list)
_self_improve_collector: Any = PrivateAttr(default=None)
max_execution_time: int | None = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
@@ -320,6 +322,15 @@ class Agent(BaseAgent):
agent_executor: CrewAgentExecutor | AgentExecutor | None = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
self_improve: bool | SelfImprovementConfig = Field(
default=False,
description=(
"Enable the self-improvement loop. ``True`` uses defaults; pass a "
"``SelfImprovementConfig`` to override (e.g. point ``skills_dir`` at "
"a project-relative path so accepted skills get committed alongside "
"the code). See ``crewai.skills.self_improve``."
),
)
executor_class: Annotated[
type[CrewAgentExecutor] | type[AgentExecutor],
BeforeValidator(_validate_executor_class),
@@ -360,6 +371,13 @@ class Agent(BaseAgent):
self.set_skills()
if self.self_improve and self._self_improve_collector is None:
from crewai.skills.self_improve.collector import TraceCollector
collector = TraceCollector(self)
collector.attach(crewai_event_bus)
self._self_improve_collector = collector
if self.reasoning and self.planning_config is None:
warnings.warn(
"The 'reasoning' parameter is deprecated. Use 'planning_config=PlanningConfig()' instead.",
@@ -372,6 +390,14 @@ class Agent(BaseAgent):
return self
def _self_improve_config(self) -> SelfImprovementConfig | None:
"""Return the active SelfImprovementConfig, or None when disabled."""
if not self.self_improve:
return None
if isinstance(self.self_improve, SelfImprovementConfig):
return self.self_improve
return SelfImprovementConfig()
@property
def planning_enabled(self) -> bool:
"""Check if planning is enabled for this agent."""
@@ -429,7 +455,20 @@ class Agent(BaseAgent):
else:
crew_skills = list(resolved_crew_skills)
if not self.skills and not crew_skills:
self_improve_dir: Path | None = None
if (config := self._self_improve_config()) is not None:
from crewai.skills.self_improve.storage import SkillStore, _slug
if config.skills_dir is not None:
candidate = config.skills_dir / _slug(self.role)
else:
candidate = SkillStore().role_dir(self.role)
if candidate.is_dir() and any(
(c / "SKILL.md").is_file() for c in candidate.iterdir() if c.is_dir()
):
self_improve_dir = candidate
if not self.skills and not crew_skills and self_improve_dir is None:
return
needs_work = self.skills and any(
@@ -437,7 +476,7 @@ class Agent(BaseAgent):
or (isinstance(s, SkillModel) and s.disclosure_level < INSTRUCTIONS)
for s in self.skills
)
if not needs_work and not crew_skills:
if not needs_work and not crew_skills and self_improve_dir is None:
return
seen: set[str] = set()
@@ -447,6 +486,9 @@ class Agent(BaseAgent):
if crew_skills:
items.extend(crew_skills)
if self_improve_dir is not None:
items.append(self_improve_dir)
for item in items:
if isinstance(item, Path):
discovered = discover_skills(item, source=self)

View File

@@ -24,6 +24,7 @@ from crewai.cli.reset_memories_command import reset_memories_command
from crewai.cli.run_crew import run_crew
from crewai.cli.settings.main import SettingsCommand
from crewai.cli.shared.token_manager import TokenManager
from crewai.cli.skills_proposals import skills as skills_group
from crewai.cli.tools.main import ToolCommand
from crewai.cli.train_crew import train_crew
from crewai.cli.triggers.main import TriggersCommand
@@ -955,5 +956,8 @@ def checkpoint_prune(
prune_checkpoints(ctx.obj["location"], keep, older_than, dry_run)
crewai.add_command(skills_group)
if __name__ == "__main__":
crewai()

View File

@@ -0,0 +1,147 @@
"""Minimal Textual TUI for triaging skill proposals.
Two panes: the proposals list on the left, the highlighted proposal's
``SKILL.md`` body on the right. Keystrokes accept/reject in place. No
search, no scopes, no async workers — the underlying actions are the
same `accept_proposal` / `reject_proposal` calls the CLI uses.
"""
from __future__ import annotations
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, VerticalScroll
from textual.widgets import Footer, Header, OptionList, Static
from crewai.skills.self_improve import (
ProposalStore,
accept_proposal,
reject_proposal,
)
from crewai.skills.self_improve.models import SkillProposal
_PRIMARY = "#eb6658"
_SECONDARY = "#1F7982"
_TERTIARY = "#ffffff"
def _format_proposal_detail(p: SkillProposal) -> str:
kind = (
f"[bold]{p.proposal_kind}[/] → {p.target_skill}"
if p.proposal_kind == "patch_existing"
else "[bold]new[/]"
)
runs = ", ".join(p.derived_from_runs) or "-"
return (
f"[bold {_PRIMARY}]{p.name}[/]\n"
f"[dim]role:[/] {p.agent_role}\n"
f"[dim]kind:[/] {kind}\n"
f"[dim]confidence:[/] [bold]{p.confidence:.2f}[/]\n"
f"[dim]from runs:[/] {runs}\n\n"
f"[bold]Rationale[/]\n{p.rationale}\n\n"
f"[bold {_PRIMARY}]{'' * 44}[/]\n"
f"{p.body}"
)
class SkillProposalsTUI(App[None]):
    """Triage UI: navigate the queue, accept or reject in place.

    Reads proposals from a ``ProposalStore`` and acts on them via the same
    ``accept_proposal`` / ``reject_proposal`` functions the CLI uses, so
    behavior is identical between the two front-ends.
    """

    TITLE = "CrewAI Skill Proposals"
    SUB_TITLE = "↑↓ list · tab focus pane · PgUp/PgDn or mouse to scroll body · a/r/q"
    BINDINGS = [
        Binding("a", "accept", "Accept"),
        Binding("r", "reject", "Reject"),
        Binding("q", "quit", "Quit"),
        # Textual dispatches name-based actions; focus_next is built in.
        Binding("tab", "focus_next", "Switch pane", show=False),
    ]
    # Stylesheet is an f-string so the brand colors interpolate; literal CSS
    # braces are therefore doubled.
    CSS = f"""
    Header {{ background: {_PRIMARY}; color: {_TERTIARY}; }}
    Footer {{ background: {_SECONDARY}; color: {_TERTIARY}; }}
    Footer > .footer-key--key {{ background: {_PRIMARY}; color: {_TERTIARY}; }}
    Horizontal {{ height: 1fr; }}
    #list {{
        width: 40%;
        border-right: solid {_SECONDARY};
        scrollbar-color: {_PRIMARY};
    }}
    #list > .option-list--option-highlighted {{
        background: {_SECONDARY}; color: {_TERTIARY};
    }}
    #detail-scroll {{
        width: 60%;
        padding: 1 2;
        scrollbar-color: {_PRIMARY};
    }}
    #detail-scroll:focus {{
        background: {_SECONDARY} 5%;
    }}
    """

    def __init__(self, store: ProposalStore | None = None) -> None:
        super().__init__()
        # Store is injectable so tests can point the TUI at a temp queue.
        self._store = store or ProposalStore()
        self._proposals: list[SkillProposal] = []

    def compose(self) -> ComposeResult:
        # Static two-pane layout: queue list on the left, SKILL.md body right.
        yield Header(show_clock=False)
        with Horizontal():
            yield OptionList(id="list")
            with VerticalScroll(id="detail-scroll"):
                yield Static("Select a proposal to view its body.", id="detail")
        yield Footer()

    def on_mount(self) -> None:
        self.query_one("#list", OptionList).border_title = "Pending"
        self.query_one("#detail-scroll", VerticalScroll).border_title = "Detail"
        self._reload()

    def _reload(self) -> None:
        # Re-read the queue and rebuild the list; called after every
        # accept/reject so the UI always mirrors the store on disk.
        self._proposals = self._store.list_all()
        option_list = self.query_one("#list", OptionList)
        option_list.clear_options()
        for p in self._proposals:
            kind_tag = "P" if p.proposal_kind == "patch_existing" else "N"
            label = f"[{kind_tag}] {p.confidence:.2f} {p.name}"
            option_list.add_option(label)
        option_list.border_title = f"Pending ({len(self._proposals)})"
        if not self._proposals:
            self.query_one("#detail", Static).update("[dim](queue is empty)[/]")

    def _selected(self) -> SkillProposal | None:
        # Options are appended in self._proposals order, so the highlighted
        # index maps 1:1 onto the proposals list.
        idx = self.query_one("#list", OptionList).highlighted
        if idx is None or idx >= len(self._proposals):
            return None
        return self._proposals[idx]

    def on_option_list_option_highlighted(
        self, event: OptionList.OptionHighlighted
    ) -> None:
        # Keep the detail pane in sync with the highlighted row.
        idx = event.option_index
        if idx < len(self._proposals):
            self.query_one("#detail", Static).update(
                _format_proposal_detail(self._proposals[idx])
            )

    def action_accept(self) -> None:
        prop = self._selected()
        if prop is None:
            return
        try:
            accept_proposal(prop)
            self.notify(f"Accepted: {prop.name}", severity="information")
        except FileExistsError as e:
            # A skill of the same name already exists: surface the error and
            # keep the proposal queued (no reload) so it can be resolved later.
            self.notify(str(e), severity="warning", timeout=8)
            return
        self._reload()

    def action_reject(self) -> None:
        prop = self._selected()
        if prop is None:
            return
        reject_proposal(prop)
        self.notify(f"Rejected: {prop.name}", severity="information")
        self._reload()

View File

@@ -0,0 +1,249 @@
"""``crewai skills`` subcommands for the self-improvement loop."""
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING
import click
from crewai.skills.self_improve import (
ProposalStore,
SkillReviewer,
TraceStore,
accept_proposal,
reject_proposal,
)
if TYPE_CHECKING:
from crewai.skills.self_improve.models import RunTrace, SkillProposal
_DEFAULT_REVIEW_MODEL = "anthropic/claude-haiku-4-5"
def _print_proposal_summary(p: SkillProposal) -> None:
    """Echo a compact two-line summary of one proposal."""
    if p.proposal_kind == "patch_existing":
        kind = f"PATCH→{p.target_skill}"
    else:
        kind = "NEW"
    click.echo(f" {p.id} {kind:<28} conf={p.confidence:.2f} role={p.agent_role}")
    click.echo(f" {p.name}: {p.description}")
@click.group(name="skills")
def skills() -> None:
    """Manage agent skills."""
    # Container group only; the work happens in the review / proposals
    # subcommands registered below.
@skills.command(name="review")
@click.option(
    "--role",
    default=None,
    help="Limit review to one role (slug or full name). Default: all roles with traces.",
)
@click.option(
    "--model",
    default=_DEFAULT_REVIEW_MODEL,
    help=f"LiteLLM model id for the reviewer LLM. Default: {_DEFAULT_REVIEW_MODEL}.",
)
@click.option(
    "--min-traces",
    default=2,
    type=int,
    help="Minimum traces per role before review fires. Default: 2.",
)
@click.option(
    "--floor",
    default=0.6,
    type=float,
    help="Drop proposals below this confidence. Default: 0.6.",
)
def skills_review(role: str | None, model: str, min_traces: int, floor: float) -> None:
    """Mine accumulated traces for skill proposals.

    Reads role + goal from the trace metadata, calls the reviewer LLM, and
    persists each proposal that scores above ``--floor`` to the queue. Use
    ``crewai skills proposals list`` to see what came out.
    """
    # Imported lazily so plain CLI startup doesn't pay the LLM import cost.
    from crewai import LLM

    trace_store = TraceStore()
    proposal_store = ProposalStore()
    if not trace_store.root.exists():
        click.echo(f"No traces yet at {trace_store.root}", err=True)
        raise SystemExit(1)
    # Group traces by role from disk; the role-slug dirs come from the store.
    by_role: dict[str, list[RunTrace]] = defaultdict(list)
    role_dirs = (
        [trace_store.role_dir(role)] if role else sorted(trace_store.root.iterdir())
    )
    for d in role_dirs:
        if not d.is_dir():
            continue
        for path in sorted(d.glob("*.json")):
            t = trace_store.load(path)
            by_role[t.agent_role].append(t)
    if not by_role:
        click.echo(
            "No traces found." if role is None else f"No traces for role={role!r}."
        )
        return
    reviewer_llm = LLM(model=model)
    total_emitted = 0
    for agent_role, traces in by_role.items():
        if len(traces) < min_traces:
            click.echo(
                f"Skipping {agent_role!r}: {len(traces)} trace(s), need {min_traces}."
            )
            continue
        # First non-empty goal stands in for the role's goal across traces.
        agent_goal = next((t.agent_goal for t in traces if t.agent_goal), "")
        loaded_skills_seen = sorted({s for t in traces for s in t.loaded_skills})
        # Pending proposals are fed back so the reviewer can dedupe or patch
        # instead of re-proposing the same skill.
        pending = [
            proposal_store.load(p) for p in proposal_store.list_for_role(agent_role)
        ]
        reviewer = SkillReviewer(
            agent_role=agent_role,
            agent_goal=agent_goal,
            llm=reviewer_llm,
            min_traces=min_traces,
            confidence_floor=floor,
        )
        click.echo(
            f"🧠 Reviewing {len(traces)} trace(s) for {agent_role!r} "
            f"(model={model}, pending={len(pending)})…"
        )
        proposals_out = reviewer.review(
            traces,
            loaded_skill_names=loaded_skills_seen,
            pending_proposals=pending,
        )
        for p in proposals_out:
            path = proposal_store.save(p)
            click.echo(f" + {p.id} conf={p.confidence:.2f} {p.name}")
            click.echo(f"{path}")
        total_emitted += len(proposals_out)
    click.echo(
        f"\n✅ Done. {total_emitted} proposal(s) added to the queue. "
        f"Run `crewai skills proposals list` to view."
    )
@skills.group(name="proposals")
def proposals() -> None:
    """Manage skill proposals from the self-improvement reviewer."""
    # Container group; see list / show / accept / reject / tui below.
@proposals.command(name="list")
@click.option("--role", default=None, help="Filter by agent role (slug or full name).")
def proposals_list(role: str | None) -> None:
    """List pending proposals across all roles."""
    store = ProposalStore()
    records = (
        [store.load(path) for path in store.list_for_role(role)]
        if role
        else store.list_all()
    )
    if not records:
        click.echo("(no pending proposals)")
        return
    click.echo(f"{len(records)} proposal(s):")
    for record in records:
        _print_proposal_summary(record)
@proposals.command(name="show")
@click.argument("proposal_id")
def proposals_show(proposal_id: str) -> None:
    """Print the full body of a proposal."""
    prop = ProposalStore().find(proposal_id)
    if prop is None:
        click.echo(f"No proposal with id {proposal_id!r}", err=True)
        raise SystemExit(1)
    # Header fields first, one per line, in a fixed order.
    header = [
        f"id: {prop.id}",
        f"role: {prop.agent_role}",
        f"name: {prop.name}",
        f"description: {prop.description}",
        f"confidence: {prop.confidence:.2f}",
        f"kind: {prop.proposal_kind}",
    ]
    if prop.target_skill:
        header.append(f"target: {prop.target_skill}")
    header.append(f"derived from: {', '.join(prop.derived_from_runs)}")
    for line in header:
        click.echo(line)
    click.echo("\nrationale:")
    click.echo(prop.rationale)
    click.echo("\n--- SKILL.md body ---")
    click.echo(prop.body)
@proposals.command(name="accept")
@click.argument("proposal_id")
@click.option(
    "--force", is_flag=True, help="Overwrite an existing skill of the same name."
)
@click.option(
    "--skills-dir",
    type=click.Path(file_okay=False, path_type=Path),
    default=None,
    envvar="CREWAI_SELF_IMPROVE_SKILLS_DIR",
    help=(
        "Directory to write the SKILL.md to. Defaults to the env var "
        "CREWAI_SELF_IMPROVE_SKILLS_DIR, then to the platform data dir. "
        "Use a project-relative path (e.g. ./skills/learned) to keep "
        "accepted skills under version control — and pass the same path "
        "to Agent(self_improve=SelfImprovementConfig(skills_dir=...)) so "
        "the agent loads it on the next kickoff."
    ),
)
def proposals_accept(proposal_id: str, force: bool, skills_dir: Path | None) -> None:
    """Materialize a proposal as a live SKILL.md."""
    from crewai.skills.self_improve import SkillStore

    prop = ProposalStore().find(proposal_id)
    if prop is None:
        click.echo(f"No proposal with id {proposal_id!r}", err=True)
        raise SystemExit(1)
    # Only build an explicit store when the user overrode the destination;
    # otherwise accept_proposal falls back to the proposal's own hint.
    skill_store = None
    if skills_dir:
        skill_store = SkillStore(skills_root=skills_dir)
    try:
        path = accept_proposal(prop, force=force, skill_store=skill_store)
    except FileExistsError as e:
        click.echo(f"{e}", err=True)
        raise SystemExit(2) from None
    click.echo(f"✅ Accepted: {path}")
    click.echo(
        " This skill will load on the next kickoff for any agent with "
        f"role={prop.agent_role!r} and self_improve enabled "
        "(make sure SelfImprovementConfig.skills_dir matches if you used --skills-dir)."
    )
@proposals.command(name="reject")
@click.argument("proposal_id")
def proposals_reject(proposal_id: str) -> None:
    """Drop a proposal from the queue without accepting."""
    prop = ProposalStore().find(proposal_id)
    if prop is None:
        click.echo(f"No proposal with id {proposal_id!r}", err=True)
        raise SystemExit(1)
    reject_proposal(prop)
    click.echo(f"🗑 Rejected: {prop.id}")
@proposals.command(name="tui")
def proposals_tui() -> None:
    """Open an interactive triage TUI for the proposals queue."""
    # Imported lazily so the CLI doesn't require textual unless this runs.
    from crewai.cli.skill_proposals_tui import SkillProposalsTUI
    SkillProposalsTUI().run()

View File

@@ -0,0 +1,38 @@
"""Self-improving skills for crewAI agents.
When an Agent is configured with ``self_improve=True``, a TraceCollector
subscribes to the event bus during kickoff, captures tool calls + outcome
signals into a RunTrace, auto-grades the run, and persists the trace to
disk. Across many runs, a SkillReviewer mines those traces for recurring
approaches and emits SkillProposals for human review.
"""
from crewai.skills.self_improve.acceptance import accept_proposal, reject_proposal
from crewai.skills.self_improve.auto_grade import grade_trace
from crewai.skills.self_improve.collector import TraceCollector
from crewai.skills.self_improve.models import (
Outcome,
RunTrace,
SelfImprovementConfig,
SkillProposal,
ToolCallRecord,
)
from crewai.skills.self_improve.reviewer import SkillReviewer
from crewai.skills.self_improve.storage import ProposalStore, SkillStore, TraceStore
# Public surface of the self_improve package, kept in ASCII sort order
# (classes/types before functions).
__all__ = [
    "Outcome",
    "ProposalStore",
    "RunTrace",
    "SelfImprovementConfig",
    "SkillProposal",
    "SkillReviewer",
    "SkillStore",
    "ToolCallRecord",
    "TraceCollector",
    "TraceStore",
    "accept_proposal",
    "grade_trace",
    "reject_proposal",
]

View File

@@ -0,0 +1,104 @@
"""Materialize an accepted ``SkillProposal`` as a live ``SKILL.md`` file.
Acceptance is the human checkpoint in the self-improvement loop: until a
proposal is accepted, it lives in the proposals queue and never affects
the agent. After acceptance, the SKILL.md lands at
``~/.crewai/skills/<role-slug>/<skill-name>/SKILL.md`` where the existing
skill loader discovers it on the next kickoff.
"""
from __future__ import annotations
import json
from pathlib import Path
from crewai.skills.self_improve.models import SkillProposal
from crewai.skills.self_improve.storage import ProposalStore, SkillStore
def _format_skill_md(proposal: SkillProposal) -> str:
"""Render the proposal as a SKILL.md document.
The body is written verbatim — proposals already include their own
markdown structure (title, sections). We only prepend the YAML
frontmatter the loader requires. Both fields are JSON-quoted because
JSON strings are valid YAML scalars and handle every special-char case
safely.
"""
body = proposal.body.lstrip()
# If the LLM already emitted frontmatter (defensive), don't double it.
if body.startswith("---"):
return body if body.endswith("\n") else body + "\n"
frontmatter = (
f"---\n"
f"name: {json.dumps(proposal.name)}\n"
f"description: {json.dumps(proposal.description)}\n"
f"---\n\n"
)
return frontmatter + body if body.endswith("\n") else frontmatter + body + "\n"
def accept_proposal(
    proposal: SkillProposal,
    *,
    force: bool = False,
    skill_store: SkillStore | None = None,
    proposal_store: ProposalStore | None = None,
) -> Path:
    """Write the proposal as a SKILL.md and remove it from the queue.

    Destination, when ``skill_store`` is not given:

    1. ``proposal.skills_dir`` — set by the reviewer from the agent's
       ``SelfImprovementConfig.skills_dir``; keeps accept aligned with
       where the agent reads from.
    2. Platform default — ``<db_storage_path>/self_improve/skills/``.

    Args:
        proposal: The proposal to materialize.
        force: When True, overwrite an existing SKILL.md at the target path.
        skill_store: Explicit override; bypasses the proposal hint.
        proposal_store: Override for the proposals store (test injection).

    Returns:
        Path to the written ``SKILL.md``.

    Raises:
        FileExistsError: When a SKILL.md already exists at the target and
            ``force=False``.
    """
    if skill_store is None:
        if proposal.skills_dir is not None:
            skill_store = SkillStore(skills_root=proposal.skills_dir)
        else:
            skill_store = SkillStore()
    proposal_store = proposal_store or ProposalStore()
    target_dir = skill_store.skill_dir(proposal.agent_role, proposal.name)
    skill_md = target_dir / "SKILL.md"
    if skill_md.exists() and not force:
        raise FileExistsError(
            f"{skill_md} already exists. Pass force=True to overwrite."
        )
    target_dir.mkdir(parents=True, exist_ok=True)
    skill_md.write_text(_format_skill_md(proposal), encoding="utf-8")
    # Once accepted, the SKILL.md — not the queue record — is the source of
    # truth, so drop the queue entry to keep `list` honest.
    proposal_store.delete(proposal.id, proposal.agent_role)
    return skill_md
def reject_proposal(
    proposal: SkillProposal,
    *,
    proposal_store: ProposalStore | None = None,
) -> bool:
    """Remove *proposal* from the queue; True when an entry was deleted."""
    store = proposal_store or ProposalStore()
    return store.delete(proposal.id, proposal.agent_role)

View File

@@ -0,0 +1,76 @@
"""Derive a run outcome from observable signals.
No human grading required. The reviewer treats outcomes as
confidence-weighted hints, not ground truth — clustering across runs
absorbs label noise.
"""
from __future__ import annotations
from collections import Counter
from crewai.skills.self_improve.models import Outcome, RunTrace
_RETRY_THRASH_THRESHOLD = 3
def _has_thrashing(trace: RunTrace) -> bool:
"""Detect a tool being called repeatedly with the same args summary.
A tool fired more than ``_RETRY_THRASH_THRESHOLD`` times with identical
args is likely stuck in a retry loop rather than making progress.
"""
if len(trace.tool_calls) <= _RETRY_THRASH_THRESHOLD:
return False
keys = [(t.name, t.args_summary) for t in trace.tool_calls]
most_common_count = Counter(keys).most_common(1)[0][1]
return most_common_count > _RETRY_THRASH_THRESHOLD
def _output_looks_empty(trace: RunTrace) -> bool:
if trace.output_summary is None:
return False
stripped = trace.output_summary.strip()
if not stripped:
return True
lowered = stripped.lower()
return lowered.startswith("error") or lowered.startswith("traceback")
def grade_trace(trace: RunTrace) -> Outcome:
    """Compute outcome from signals already present on the trace.

    Signal hierarchy (strongest first):
      1. explicit error → failure
      2. guardrail decided → trust it
      3. max_iter exhaustion → failure
      4. tool error rate / thrashing / empty output → failure
      5. otherwise → success when we saw output, else unknown
    """
    if trace.error:
        return "failure"
    # A guardrail verdict, either way, overrides the weaker heuristics.
    if trace.guardrail_passed is not None:
        return "success" if trace.guardrail_passed else "failure"
    if trace.max_iter_exhausted or _has_thrashing(trace) or _output_looks_empty(trace):
        return "failure"
    if trace.tool_call_count > 0:
        if trace.tool_error_count / trace.tool_call_count >= 0.5:
            return "failure"
    return "success" if trace.output_summary else "unknown"

View File

@@ -0,0 +1,298 @@
"""Trace collector that subscribes to the event bus.
One ``TraceCollector`` per Agent instance. It keeps a per-(agent, task)
in-flight trace, identifies which events belong to its agent, and
finalizes the trace on completion — auto-grading and persisting it.
"""
from __future__ import annotations
import atexit
from datetime import UTC, datetime
import logging
from pathlib import Path
import threading
from typing import TYPE_CHECKING, Any
from crewai.events.event_bus import CrewAIEventsBus, crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
LiteAgentExecutionCompletedEvent,
LiteAgentExecutionErrorEvent,
LiteAgentExecutionStartedEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.skills.self_improve.auto_grade import grade_trace
from crewai.skills.self_improve.models import RunTrace, ToolCallRecord
from crewai.skills.self_improve.storage import TraceStore
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
_OUTPUT_TRUNCATE = 4000  # max chars kept of the agent's final output / error
_ARGS_TRUNCATE = 200  # max chars kept per tool-args / tool-error summary
_FLUSH_TIMEOUT = 10.0  # seconds to wait for the event bus to drain at exit
logger = logging.getLogger(__name__)
# ``Agent.kickoff()`` can return before the bus's thread-pool handlers drain;
# without this hook a script that exits immediately would lose the just-
# finalized trace. ``flush()`` is a no-op when no events are pending, so it's
# safe to register at module import time even if no agent ever opts in.
atexit.register(lambda: crewai_event_bus.flush(timeout=_FLUSH_TIMEOUT))
def _truncate(value: Any, limit: int) -> str:
if value is None:
return ""
s = value if isinstance(value, str) else repr(value)
if len(s) <= limit:
return s
return s[: limit - 1] + ""
class TraceCollector:
    """Captures one ``RunTrace`` per agent execution by subscribing to events.

    Lifecycle:
    - ``attach(bus)`` registers handlers on the global event bus.
    - On ``AgentExecutionStartedEvent`` for our agent, a new trace begins.
    - ``ToolUsage*`` events for our agent append ``ToolCallRecord``s.
    - On ``AgentExecutionCompletedEvent`` / ``AgentExecutionErrorEvent``
      for our agent, the trace is auto-graded and persisted.

    The collector is intentionally tolerant: a missing Started event
    (e.g. because the agent was already executing when ``attach`` ran) just
    skips that trace. Tool events without a current trace are ignored.
    """

    def __init__(
        self,
        agent: BaseAgent,
        store: TraceStore | None = None,
    ) -> None:
        self._agent = agent
        self._store = store or TraceStore()
        # In-flight trace for the current execution; None between runs.
        self._current: RunTrace | None = None
        # Start timestamps per tool name, consumed by _duration_ms.
        self._tool_started_at: dict[str, datetime] = {}
        self._attached = False
        # Bus dispatches handlers on a thread pool, and Agent.kickoff() +
        # LiteAgent each emit Started/Completed for the same logical run, so
        # the lock + ``self._current is not None`` check serializes the
        # "create or skip" decision and dedupes the second half of the pair.
        self._lock = threading.RLock()

    @property
    def current_trace(self) -> RunTrace | None:
        """The in-flight trace, if an agent execution is active."""
        return self._current

    def _is_my_agent(self, event_agent: Any) -> bool:
        # Identity check first (cheap), then id equality for agent objects
        # that were reconstructed elsewhere.
        if event_agent is None:
            return False
        if event_agent is self._agent:
            return True
        agent_id = getattr(event_agent, "id", None)
        my_id = getattr(self._agent, "id", None)
        return bool(agent_id is not None and my_id is not None and agent_id == my_id)

    def _is_my_id(self, event_agent_id: str | None) -> bool:
        # Tool and Lite events carry only the agent id string, not the object.
        if not event_agent_id:
            return False
        my_id = getattr(self._agent, "id", None)
        return bool(my_id is not None and str(my_id) == str(event_agent_id))

    def attach(self, bus: CrewAIEventsBus) -> None:
        """Register event handlers. Idempotent."""
        if self._attached:
            return
        self._attached = True

        @bus.on(AgentExecutionStartedEvent)
        def _on_started(_source: Any, event: AgentExecutionStartedEvent) -> None:
            if not self._is_my_agent(event.agent):
                return
            with self._lock:
                if self._current is not None:
                    return  # duplicate Started for an in-flight trace
                task = event.task
                self._current = RunTrace(
                    agent_id=str(getattr(self._agent, "id", "") or ""),
                    agent_role=self._agent.role,
                    agent_goal=getattr(self._agent, "goal", "") or "",
                    agent_skills_dir=self._agent_skills_dir(),
                    # "" collapses to None so absent task ids stay null.
                    task_id=str(getattr(task, "id", "") or "") or None,
                    task_description=getattr(task, "description", None),
                    loaded_skills=self._collect_loaded_skills(),
                )

        @bus.on(ToolUsageStartedEvent)
        def _on_tool_started(_source: Any, event: ToolUsageStartedEvent) -> None:
            if not self._is_my_id(event.agent_id):
                return
            with self._lock:
                if self._current is None:
                    return
                self._tool_started_at[event.tool_name] = datetime.now(UTC)

        @bus.on(ToolUsageFinishedEvent)
        def _on_tool_finished(_source: Any, event: ToolUsageFinishedEvent) -> None:
            if not self._is_my_id(event.agent_id):
                return
            with self._lock:
                if self._current is None:
                    return
                self._current.tool_calls.append(
                    ToolCallRecord(
                        name=event.tool_name,
                        args_summary=_truncate(event.tool_args, _ARGS_TRUNCATE),
                        ok=True,
                        duration_ms=self._duration_ms(
                            event.tool_name, event.finished_at
                        ),
                    )
                )
                self._tool_started_at.pop(event.tool_name, None)

        @bus.on(ToolUsageErrorEvent)
        def _on_tool_error(_source: Any, event: ToolUsageErrorEvent) -> None:
            if not self._is_my_id(event.agent_id):
                return
            with self._lock:
                if self._current is None:
                    return
                self._current.tool_calls.append(
                    ToolCallRecord(
                        name=event.tool_name,
                        args_summary=_truncate(event.tool_args, _ARGS_TRUNCATE),
                        ok=False,
                        error=_truncate(event.error, _ARGS_TRUNCATE),
                        # Error events carry no finished_at; approximate now.
                        duration_ms=self._duration_ms(
                            event.tool_name, datetime.now(UTC)
                        ),
                    )
                )
                self._tool_started_at.pop(event.tool_name, None)

        @bus.on(AgentExecutionCompletedEvent)
        def _on_completed(_source: Any, event: AgentExecutionCompletedEvent) -> None:
            if not self._is_my_agent(event.agent):
                return
            with self._lock:
                if self._current is None:
                    return
                self._current.output_summary = _truncate(event.output, _OUTPUT_TRUNCATE)
                self._finalize_locked()

        @bus.on(AgentExecutionErrorEvent)
        def _on_error(_source: Any, event: AgentExecutionErrorEvent) -> None:
            if not self._is_my_agent(event.agent):
                return
            with self._lock:
                if self._current is None:
                    return
                self._current.error = _truncate(event.error, _OUTPUT_TRUNCATE)
                self._finalize_locked()

        @bus.on(LiteAgentExecutionStartedEvent)
        def _on_lite_started(
            _source: Any, event: LiteAgentExecutionStartedEvent
        ) -> None:
            if not self._is_my_id(event.agent_info.get("id")):
                return
            with self._lock:
                if self._current is not None:
                    return  # duplicate Started for an in-flight trace
                messages = event.messages
                # Lite runs have no Task object; reconstruct a description
                # from the prompt messages instead.
                if isinstance(messages, list):
                    task_desc = " ".join(
                        str(m.get("content", ""))
                        for m in messages
                        if isinstance(m, dict)
                    )
                else:
                    task_desc = str(messages)
                self._current = RunTrace(
                    agent_id=str(getattr(self._agent, "id", "") or ""),
                    agent_role=self._agent.role,
                    agent_goal=getattr(self._agent, "goal", "") or "",
                    agent_skills_dir=self._agent_skills_dir(),
                    task_description=_truncate(task_desc, _OUTPUT_TRUNCATE),
                    loaded_skills=self._collect_loaded_skills(),
                )

        @bus.on(LiteAgentExecutionCompletedEvent)
        def _on_lite_completed(
            _source: Any, event: LiteAgentExecutionCompletedEvent
        ) -> None:
            if not self._is_my_id(event.agent_info.get("id")):
                return
            with self._lock:
                if self._current is None:
                    return
                self._current.output_summary = _truncate(event.output, _OUTPUT_TRUNCATE)
                self._finalize_locked()

        @bus.on(LiteAgentExecutionErrorEvent)
        def _on_lite_error(_source: Any, event: LiteAgentExecutionErrorEvent) -> None:
            if not self._is_my_id(event.agent_info.get("id")):
                return
            with self._lock:
                if self._current is None:
                    return
                self._current.error = _truncate(event.error, _OUTPUT_TRUNCATE)
                self._finalize_locked()

    def _duration_ms(self, tool_name: str, finished_at: datetime) -> int | None:
        # None when the Started event was missed; clock skew clamps to 0.
        started = self._tool_started_at.get(tool_name)
        if started is None:
            return None
        return max(0, int((finished_at - started).total_seconds() * 1000))

    def _agent_skills_dir(self) -> Path | None:
        """Read skills_dir from the agent's SelfImprovementConfig if set.

        We use ``getattr`` + duck typing instead of importing Agent so the
        collector stays usable in tests with stub agents.
        """
        config_getter = getattr(self._agent, "_self_improve_config", None)
        if not callable(config_getter):
            return None
        try:
            config = config_getter()
        except Exception:
            # Best-effort metadata: never let a misbehaving stub kill a trace.
            return None
        return getattr(config, "skills_dir", None) if config is not None else None

    def _collect_loaded_skills(self) -> list[str]:
        # Snapshot of skill names visible on the agent at trace start.
        skills = getattr(self._agent, "skills", None) or []
        names: list[str] = []
        for s in skills:
            name = getattr(s, "name", None)
            if isinstance(name, str):
                names.append(name)
        return names

    def _finalize_locked(self) -> None:
        """Caller holds ``self._lock``."""
        trace = self._current
        if trace is None:
            return
        # Clear state before grading/saving so a re-entrant event can't
        # double-finalize the same trace.
        self._current = None
        self._tool_started_at.clear()
        trace.ended_at = datetime.now(UTC)
        trace.outcome = grade_trace(trace)
        try:
            self._store.save(trace)
        except OSError:
            logger.exception(
                "Failed to persist run trace for role %s", trace.agent_role
            )

View File

@@ -0,0 +1,156 @@
"""Data models for self-improving skills."""
from __future__ import annotations
from datetime import UTC, datetime
from pathlib import Path
import re
from typing import Literal
import uuid
from pydantic import BaseModel, ConfigDict, Field, field_validator
_SLUG_NON_ALNUM = re.compile(r"[^a-z0-9-]+")
_SLUG_DASHES = re.compile(r"-+")
def _slugify(value: str) -> str:
"""Force a string into the kebab-case shape the skill loader requires.
The reviewer LLM is told to emit kebab-case names, but doesn't always
comply (e.g. it emits Title Case with spaces). Without this normalization
the accepted SKILL.md fails the loader's name pattern and is silently
skipped, breaking the closed loop.
"""
s = value.strip().lower().replace("_", "-").replace(" ", "-")
s = _SLUG_NON_ALNUM.sub("", s)
s = _SLUG_DASHES.sub("-", s).strip("-")
return s[:64] or "skill"
def _now() -> datetime:
return datetime.now(UTC)
class SelfImprovementConfig(BaseModel):
    """Per-agent configuration for the self-improvement loop.

    All fields are optional with sensible defaults. Pass to ``Agent`` as
    ``self_improve=SelfImprovementConfig(skills_dir=Path("./skills/learned"))``
    to override; ``Agent(self_improve=True)`` uses defaults.
    """

    # ``Path`` is not a pydantic-native type, hence arbitrary_types_allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # None → fall back to the platform default under db_storage_path().
    skills_dir: Path | None = Field(
        default=None,
        description=(
            "Where accepted SKILL.md files are written and auto-loaded from. "
            "When None, falls back to <db_storage_path>/self_improve/skills/. "
            "Set to a project-relative path (e.g. Path('./skills/learned')) "
            "to keep accepted skills under version control."
        ),
    )
# Closed vocabularies shared by traces and proposals.
Outcome = Literal["success", "failure", "unknown"]
ProposalKind = Literal["new", "patch_existing"]


def _new_id(prefix: str) -> str:
    """Generate a short id of the form ``<prefix>_<12 hex chars>``."""
    suffix = uuid.uuid4().hex[:12]
    return f"{prefix}_{suffix}"
class ToolCallRecord(BaseModel):
    """One tool invocation within a run."""

    # Immutable once recorded by the collector.
    model_config = ConfigDict(frozen=True)

    # Tool name as reported by the tool-usage event.
    name: str
    args_summary: str = Field(
        default="",
        description="Truncated string repr of args, suitable for clustering.",
    )
    # False when the tool raised / errored.
    ok: bool = True
    # Error text when ok is False; None on success.
    error: str | None = None
    # Wall-clock duration, when the start event was observed.
    duration_ms: int | None = None
class RunTrace(BaseModel):
    """One agent + task execution.

    Built incrementally by ``TraceCollector`` from event-bus events,
    finalized at agent completion, then persisted to disk.
    """

    # ``Path`` fields require arbitrary_types_allowed.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: str = Field(default_factory=lambda: _new_id("run"))
    agent_id: str | None = None
    agent_role: str
    agent_goal: str = ""
    agent_skills_dir: Path | None = Field(
        default=None,
        description=(
            "Resolved skills_dir from the agent's SelfImprovementConfig at "
            "trace time. Carried into proposals so accept writes back to "
            "the same place the agent reads from."
        ),
    )
    task_id: str | None = None
    task_description: str | None = None
    started_at: datetime = Field(default_factory=_now)
    ended_at: datetime | None = None
    tool_calls: list[ToolCallRecord] = Field(default_factory=list)
    loaded_skills: list[str] = Field(default_factory=list)
    # Set by the auto-grader at finalize time; "unknown" until then.
    outcome: Outcome = "unknown"
    output_summary: str | None = None
    error: str | None = None
    # True when the agent hit its iteration cap — a failure signal.
    max_iter_exhausted: bool = False
    # None when no guardrail ran; True/False when one did.
    guardrail_passed: bool | None = None

    @property
    def tool_error_count(self) -> int:
        """Number of tool calls that ended in an error."""
        return sum(1 for t in self.tool_calls if not t.ok)

    @property
    def tool_call_count(self) -> int:
        """Total number of tool calls in this run."""
        return len(self.tool_calls)
class SkillProposal(BaseModel):
    """A proposed new or updated skill, awaiting human review.

    The reviewer LLM emits these directly via a thin envelope; the server
    stamps ``agent_role`` and ``derived_from_runs`` after the call, which is
    why those two have permissive defaults rather than being required.
    """

    # Immutable; updates go through model_copy(update=...).
    model_config = ConfigDict(frozen=True)

    id: str = Field(default_factory=lambda: _new_id("prop"))
    agent_role: str = ""
    # Kebab-case skill name (normalized by the validator below).
    name: str
    description: str
    # Markdown body of the eventual SKILL.md.
    body: str
    rationale: str
    # 0..1; proposals under the reviewer's floor are filtered out.
    confidence: float = Field(ge=0.0, le=1.0)
    proposal_kind: ProposalKind = "new"
    # Name of the existing skill when proposal_kind == "patch_existing".
    target_skill: str | None = None
    derived_from_runs: list[str] = Field(default_factory=list)
    skills_dir: Path | None = Field(
        default=None,
        description=(
            "Where to write the SKILL.md when this proposal is accepted, "
            "carried over from the agent's SelfImprovementConfig at trace "
            "time. Falls back to platform default when None."
        ),
    )
    created_at: datetime = Field(default_factory=_now)

    @field_validator("name", mode="before")
    @classmethod
    def _force_slug(cls, v: str | None) -> str | None:
        # Normalize whatever the LLM emitted into loader-safe kebab-case;
        # non-strings pass through so pydantic reports the type error.
        return _slugify(v) if isinstance(v, str) else v

View File

@@ -0,0 +1,220 @@
"""LLM-driven skill reviewer.
Reads N ``RunTrace`` records, identifies recurring *approaches* (not
facts — those go to memory), and emits ``SkillProposal``s for human
review.
The reviewer never writes to the active skills directory itself; it only
populates the proposals queue. Acceptance is a separate, explicit step.
"""
from __future__ import annotations
from pydantic import BaseModel, ConfigDict, Field
from crewai.llms.base_llm import BaseLLM
from crewai.skills.self_improve.models import RunTrace, SkillProposal
# Per-trace character budgets for the reviewer prompt, keeping a large
# batch of traces within the model's context window.
_TRACE_OUTPUT_TRUNCATE = 1500
_TRACE_TASK_TRUNCATE = 800
class _ReviewerOutput(BaseModel):
    """Envelope around the LLM's structured proposals.

    ``SkillProposal.agent_role`` and ``derived_from_runs`` are intentionally
    server-filled post-call; the LLM doesn't need to emit them.
    """

    # Tolerate extra keys the model may emit alongside the schema.
    model_config = ConfigDict(extra="ignore")

    # Empty list is a valid "nothing worth saving" answer.
    proposals: list[SkillProposal] = Field(default_factory=list)
# Prompt templates for the reviewer LLM. ``_SYSTEM_PROMPT`` takes
# role/goal/n/loaded_skills/pending_proposals via str.format; the
# ``{n}`` placeholder appears multiple times (trace count). Do not edit
# placeholder names without updating SkillReviewer.review().
_SYSTEM_PROMPT = """\
You are reviewing execution traces from a CrewAI agent to decide what
*reusable approaches* are worth saving as agent skills.
The agent's role: {role}
The agent's goal: {goal}
You see {n} traces from past runs. Your job:
1. Identify recurring NON-DETERMINISTIC APPROACHES — patterns of how this
agent thinks about a class of problems. Examples of approaches:
- "When asked to scaffold a project, always start by listing required
config files before writing code."
- "If a tool returns empty results, rephrase the query before retrying."
2. DROP facts. Facts go to memory, not skills. Examples of facts:
- "the user prefers dark mode"
- "the API key for service X is in vault Y"
- "this project uses pytest"
3. Drop one-off observations. A skill needs ≥2 traces showing the same
approach. If only one trace shows it, omit.
4. If a proposal would patch an EXISTING loaded skill (listed below), set
proposal_kind="patch_existing" and target_skill=<that skill name>.
Otherwise leave proposal_kind="new".
5. Score confidence honestly. Below 0.6 will be auto-rejected, so don't
pad. A pattern seen in 2/{n} traces with no contradictions ≈ 0.65; in
all {n} traces with consistent outcome ≈ 0.85.
6. SKILL.md body should be markdown:
- First line: short description in italics or as a sentence
- "When to use this skill" section
- "How to apply it" section with concrete steps
- No invented facts or fabricated tool names
Loaded skills already available to this agent:
{loaded_skills}
Proposals already QUEUED for human review (do not re-propose these — a
prior reviewer round already surfaced them, they're awaiting accept/reject):
{pending_proposals}
If a recurring pattern matches a queued proposal semantically, simply
omit it from your output. Don't restate the same approach under a new
name. The human curator will resolve the queue.
Return a JSON object matching the schema. Empty proposals list is a valid
answer when no recurring approach is worth saving.
"""

# User-turn template: the formatted trace blocks plus the instruction.
_USER_PROMPT = """\
TRACES ({n}):
{traces}
Review the traces above and emit your proposals.
"""
def _format_trace(trace: RunTrace) -> str:
    """Render one trace as a compact text block for the reviewer prompt.

    Task description and output summary are truncated to their character
    budgets; the truncation marker is a single "…" so the result is
    exactly the budget length (truncate to N-1, then append one char).
    """
    task = (trace.task_description or "").strip()
    if len(task) > _TRACE_TASK_TRUNCATE:
        task = task[: _TRACE_TASK_TRUNCATE - 1] + "…"
    output = (trace.output_summary or "").strip()
    if len(output) > _TRACE_OUTPUT_TRUNCATE:
        output = output[: _TRACE_OUTPUT_TRUNCATE - 1] + "…"
    tool_lines: list[str] = []
    for t in trace.tool_calls:
        tag = "ok" if t.ok else "ERR"
        tool_lines.append(f" [{tag}] {t.name}({t.args_summary})")
    tools_block = "\n".join(tool_lines) if tool_lines else " (no tool calls)"
    return (
        f"--- {trace.id} outcome={trace.outcome} "
        f"max_iter_exhausted={trace.max_iter_exhausted} "
        f"guardrail_passed={trace.guardrail_passed}\n"
        f"task:\n{task}\n\n"
        f"tool_calls ({len(trace.tool_calls)}):\n{tools_block}\n\n"
        f"output_summary:\n{output}\n"
    )
class SkillReviewer:
    """Synthesize ``SkillProposal``s from a batch of ``RunTrace``s.

    Stateless; all persistent state lives in the disk store. Any
    ``BaseLLM`` instance works — a small/cheap model (e.g. Haiku) is
    usually enough since the reviewer only summarizes.
    """

    def __init__(
        self,
        *,
        agent_role: str,
        agent_goal: str,
        llm: BaseLLM,
        min_traces: int = 3,
        confidence_floor: float = 0.6,
    ) -> None:
        self.agent_role = agent_role
        self.agent_goal = agent_goal
        self.llm = llm
        self.min_traces = min_traces
        self.confidence_floor = confidence_floor

    def review(
        self,
        traces: list[RunTrace],
        *,
        loaded_skill_names: list[str] | None = None,
        pending_proposals: list[SkillProposal] | None = None,
    ) -> list[SkillProposal]:
        """Run the LLM review and return filtered proposals.

        Returns ``[]`` when fewer than ``min_traces`` traces are given,
        so it is always safe to call early — it simply no-ops.

        Pass ``pending_proposals`` (typically the current ProposalStore
        contents for this role) so the reviewer doesn't re-emit semantic
        duplicates of items already awaiting human review.
        """
        if len(traces) < self.min_traces:
            return []

        if loaded_skill_names:
            loaded_block = "\n".join(f" - {name}" for name in loaded_skill_names)
        else:
            loaded_block = " (none)"
        pending_block = (
            "\n".join(f" - {p.name}: {p.description}" for p in pending_proposals)
            if pending_proposals
            else " (none)"
        )

        system_prompt = _SYSTEM_PROMPT.format(
            role=self.agent_role,
            goal=self.agent_goal,
            n=len(traces),
            loaded_skills=loaded_block,
            pending_proposals=pending_block,
        )
        user_prompt = _USER_PROMPT.format(
            n=len(traces),
            traces="\n".join(_format_trace(t) for t in traces),
        )
        raw = self.llm.call(
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            response_model=_ReviewerOutput,
        )
        if not isinstance(raw, _ReviewerOutput):
            return []

        run_ids = [t.id for t in traces]
        # Carry the agent's skills_dir forward from the most recent trace
        # that has one, so the accept step writes to the same place the
        # agent reads from. The *last* value wins deliberately: if a user
        # changed skills_dir between runs, the newest config should apply.
        skills_dir = None
        for t in reversed(traces):
            if t.agent_skills_dir:
                skills_dir = t.agent_skills_dir
                break

        accepted: list[SkillProposal] = []
        for prop in raw.proposals:
            if prop.confidence < self.confidence_floor:
                continue
            accepted.append(
                prop.model_copy(
                    update={
                        "agent_role": self.agent_role,
                        "derived_from_runs": run_ids,
                        "skills_dir": skills_dir,
                    }
                )
            )
        return accepted

View File

@@ -0,0 +1,177 @@
"""On-disk storage for traces and skill proposals.
Layout::
<root>/
traces/<role>/<run_id>.json
skill_proposals/<role>/<proposal_id>.json
skills/<role>/<skill-name>/SKILL.md
The default root is ``db_storage_path() / "self_improve"`` — the same
project-scoped, platform-correct data dir that memory DBs use (set by
``appdirs.user_data_dir`` and overridable via ``CREWAI_STORAGE_DIR``).
``CREWAI_SELF_IMPROVE_DIR`` overrides specifically this feature's root,
useful for tests or migrations.
"""
from __future__ import annotations
import os
from pathlib import Path
import re
from crewai.skills.self_improve.models import RunTrace, SkillProposal
from crewai.utilities.paths import db_storage_path
# Feature-specific root override (useful for tests and migrations).
_ENV_VAR = "CREWAI_SELF_IMPROVE_DIR"
# Characters outside this set are dropped when slugifying a role name.
_SLUG_RE = re.compile(r"[^a-z0-9_-]+")


def _slug(role: str) -> str:
    """Slugify an agent role for use as a directory name."""
    cleaned = _SLUG_RE.sub("", role.strip().lower().replace(" ", "-"))
    return cleaned or "agent"
def _resolve_root(root: Path | None) -> Path:
    """Resolve the storage root: explicit arg > env override > platform default."""
    if root is not None:
        return root
    override = os.environ.get(_ENV_VAR)
    return Path(override) if override else Path(db_storage_path()) / "self_improve"
def _write_json(path: Path, payload: str) -> None:
    """Write ``payload`` to ``path`` as UTF-8, creating parent dirs as needed."""
    parent = path.parent
    parent.mkdir(parents=True, exist_ok=True)
    path.write_text(payload, encoding="utf-8")
class TraceStore:
    """Filesystem store for ``RunTrace`` records — one JSON file per run."""

    def __init__(self, root: Path | None = None) -> None:
        self.root = _resolve_root(root) / "traces"

    def role_dir(self, role: str) -> Path:
        """Directory holding every trace for ``role``."""
        return self.root / _slug(role)

    def path_for(self, trace: RunTrace) -> Path:
        """Target file for ``trace``."""
        return self.role_dir(trace.agent_role) / f"{trace.id}.json"

    def save(self, trace: RunTrace) -> Path:
        """Persist ``trace`` and return the path it was written to."""
        target = self.path_for(trace)
        _write_json(target, trace.model_dump_json(indent=2))
        return target

    def list_for_role(self, role: str) -> list[Path]:
        """All trace files for ``role``, sorted by filename."""
        d = self.role_dir(role)
        return sorted(d.glob("*.json")) if d.exists() else []

    def load(self, path: Path) -> RunTrace:
        """Deserialize one trace file."""
        return RunTrace.model_validate_json(path.read_text(encoding="utf-8"))

    def count_for_role(self, role: str) -> int:
        """How many traces are stored for ``role``."""
        return len(self.list_for_role(role))
class ProposalStore:
    """Filesystem store for ``SkillProposal`` records pending human review."""

    def __init__(self, root: Path | None = None) -> None:
        self.root = _resolve_root(root) / "skill_proposals"

    def role_dir(self, role: str) -> Path:
        """Directory holding the pending proposals for ``role``."""
        return self.root / _slug(role)

    def path_for(self, proposal: SkillProposal) -> Path:
        """Target file for ``proposal``."""
        return self.role_dir(proposal.agent_role) / f"{proposal.id}.json"

    def save(self, proposal: SkillProposal) -> Path:
        """Persist ``proposal`` and return the path it was written to."""
        path = self.path_for(proposal)
        _write_json(path, proposal.model_dump_json(indent=2))
        return path

    def list_for_role(self, role: str) -> list[Path]:
        """All proposal files for ``role``, sorted by filename."""
        d = self.role_dir(role)
        if not d.exists():
            return []
        return sorted(d.glob("*.json"))

    def load(self, path: Path) -> SkillProposal:
        """Deserialize one proposal file."""
        return SkillProposal.model_validate_json(path.read_text(encoding="utf-8"))

    def delete(self, proposal_id: str, role: str) -> bool:
        """Remove a proposal file; returns False when it doesn't exist."""
        path = self.role_dir(role) / f"{proposal_id}.json"
        if path.exists():
            path.unlink()
            return True
        return False

    def find(self, proposal_id: str) -> SkillProposal | None:
        """Locate a proposal by id across all role dirs. Returns None if missing."""
        if not self.root.exists():
            return None
        for role_dir in self.root.iterdir():
            if not role_dir.is_dir():
                continue
            path = role_dir / f"{proposal_id}.json"
            if path.exists():
                return self.load(path)
        return None

    def list_all(self) -> list[SkillProposal]:
        """All proposals across roles, oldest first.

        Proposal ids are random hex, so sorting by filename does NOT give
        chronological order; sort on the ``created_at`` stamp instead so
        queue listings are stable and genuinely oldest-first.
        """
        if not self.root.exists():
            return []
        out: list[SkillProposal] = []
        for role_dir in sorted(self.root.iterdir()):
            if not role_dir.is_dir():
                continue
            out.extend(self.load(path) for path in sorted(role_dir.glob("*.json")))
        out.sort(key=lambda p: p.created_at)
        return out
class SkillStore:
    """Filesystem store for accepted (live) skills.

    Each accepted ``SkillProposal`` becomes a directory under
    ``role_dir(role) / skill_name`` with a ``SKILL.md`` inside, matching
    the layout the existing skill loader discovers.

    Two ways to construct:
    - ``SkillStore()`` — root is ``<db_storage_path>/self_improve/skills/``
      (the platform default colocated with traces + proposals).
    - ``SkillStore(skills_root=Path("./skills/learned"))`` — root is the
      given path verbatim. Use this when the agent is configured with
      ``SelfImprovementConfig(skills_dir=...)`` so accepted skills land in
      the project tree.
    """

    def __init__(
        self,
        root: Path | None = None,
        skills_root: Path | None = None,
    ) -> None:
        if skills_root is not None:
            # Explicit project-tree root wins; used verbatim.
            self.root = skills_root
        else:
            self.root = _resolve_root(root) / "skills"

    def role_dir(self, role: str) -> Path:
        """Directory holding all skills for ``role``."""
        return self.root / _slug(role)

    def skill_dir(self, role: str, skill_name: str) -> Path:
        """Directory that holds (or will hold) one skill's SKILL.md."""
        return self.role_dir(role) / skill_name

    def has_any(self, role: str) -> bool:
        """True when at least one subdirectory of the role dir holds a SKILL.md."""
        d = self.role_dir(role)
        if not d.exists():
            return False
        for child in d.iterdir():
            if child.is_dir() and (child / "SKILL.md").is_file():
                return True
        return False

View File

@@ -0,0 +1,243 @@
"""Tests for self_improve/acceptance.py."""
from __future__ import annotations
from pathlib import Path
import pytest
from crewai.skills.self_improve.acceptance import (
_format_skill_md,
accept_proposal,
reject_proposal,
)
from crewai.skills.self_improve.models import SkillProposal
from crewai.skills.self_improve.storage import ProposalStore, SkillStore
@pytest.fixture
def proposal() -> SkillProposal:
    """A realistic accept-quality proposal reused across the tests below."""
    return SkillProposal(
        agent_role="Senior Researcher",
        name="cite-sources",
        description="Always cite sources in research outputs",
        body="# Cite Sources\n\n*Always cite sources.*\n\n## When to use\n\nWhenever you write a research summary.\n",
        rationale="Seen in 3 of 4 traces",
        confidence=0.8,
        derived_from_runs=["run_a", "run_b", "run_c"],
    )
@pytest.fixture
def stores(tmp_path: Path):
    """(SkillStore, ProposalStore) pair rooted in an isolated tmp dir."""
    return SkillStore(root=tmp_path), ProposalStore(root=tmp_path)
class TestFormatSkillMd:
    """Unit tests for the SKILL.md frontmatter/body formatter."""

    def test_includes_yaml_frontmatter(self, proposal: SkillProposal) -> None:
        """Output opens with a YAML frontmatter block carrying name/description."""
        out = _format_skill_md(proposal)
        assert out.startswith("---\n")
        # Values are JSON-quoted (valid YAML scalars).
        assert 'name: "cite-sources"' in out
        assert '"Always cite sources in research outputs"' in out
        assert "# Cite Sources" in out

    def test_quotes_descriptions_with_special_chars(self) -> None:
        """Quotes and colons in the description must survive YAML quoting."""
        prop = SkillProposal(
            agent_role="r",
            name="n",
            description='Has a "quote" and: colon',
            body="b",
            rationale="r",
            confidence=0.7,
        )
        out = _format_skill_md(prop)
        # JSON-quoted: backslash-escapes the inner quote, retains the colon literally.
        assert 'description: "Has a \\"quote\\" and: colon"' in out

    def test_passes_through_body_with_existing_frontmatter(self) -> None:
        """A body that already carries frontmatter is not wrapped a second time."""
        prop = SkillProposal(
            agent_role="r",
            name="n",
            description="d",
            body="---\nname: n\n---\n# Body\n",
            rationale="r",
            confidence=0.7,
        )
        out = _format_skill_md(prop)
        # No double frontmatter
        assert out.count("---") == 2  # one open, one close
class TestAcceptProposal:
    """Acceptance writes SKILL.md, dequeues the proposal, and guards overwrites."""

    def test_writes_skill_md_at_expected_path(
        self, stores, proposal: SkillProposal
    ) -> None:
        """SKILL.md lands under <slugified role>/<skill name>/."""
        skill_store, proposal_store = stores
        proposal_store.save(proposal)
        path = accept_proposal(
            proposal,
            skill_store=skill_store,
            proposal_store=proposal_store,
        )
        assert path.name == "SKILL.md"
        # role gets slugified
        assert "senior-researcher" in str(path)
        assert "cite-sources" in str(path)
        body = path.read_text()
        assert body.startswith("---\n")
        assert "# Cite Sources" in body

    def test_removes_proposal_from_queue_on_accept(
        self, stores, proposal: SkillProposal
    ) -> None:
        """Accepting clears the proposal from the pending queue."""
        skill_store, proposal_store = stores
        proposal_store.save(proposal)
        assert proposal_store.find(proposal.id) is not None
        accept_proposal(
            proposal, skill_store=skill_store, proposal_store=proposal_store
        )
        assert proposal_store.find(proposal.id) is None

    def test_refuses_to_overwrite_existing(
        self, stores, proposal: SkillProposal
    ) -> None:
        """Re-accepting into an existing skill dir raises without force=True."""
        skill_store, proposal_store = stores
        proposal_store.save(proposal)
        accept_proposal(
            proposal, skill_store=skill_store, proposal_store=proposal_store
        )
        # Re-save and re-accept the same proposal id (or a fresh one) → conflict
        proposal_store.save(proposal)
        with pytest.raises(FileExistsError):
            accept_proposal(
                proposal,
                skill_store=skill_store,
                proposal_store=proposal_store,
            )

    def test_force_overwrites(self, stores, proposal: SkillProposal) -> None:
        """force=True allows re-accepting over an existing SKILL.md."""
        skill_store, proposal_store = stores
        proposal_store.save(proposal)
        accept_proposal(
            proposal, skill_store=skill_store, proposal_store=proposal_store
        )
        proposal_store.save(proposal)
        path = accept_proposal(
            proposal,
            skill_store=skill_store,
            proposal_store=proposal_store,
            force=True,
        )
        assert path.exists()
class TestRejectProposal:
    """Rejection removes the queued proposal and reports whether it existed."""

    def test_removes_from_queue(self, stores, proposal: SkillProposal) -> None:
        """Rejecting a saved proposal deletes it and returns True."""
        _, proposal_store = stores
        proposal_store.save(proposal)
        assert reject_proposal(proposal, proposal_store=proposal_store) is True
        assert proposal_store.find(proposal.id) is None

    def test_returns_false_when_missing(
        self, stores, proposal: SkillProposal
    ) -> None:
        """Rejecting a never-saved proposal is a no-op returning False."""
        _, proposal_store = stores
        assert reject_proposal(proposal, proposal_store=proposal_store) is False
class TestSkillsDirPlumbing:
    """The agent's ``SelfImprovementConfig.skills_dir`` should flow through
    trace → proposal → accept so the SKILL.md lands at the same place the
    agent reads from, regardless of which CLI/TUI path triggered the accept.
    """

    def test_proposal_skills_dir_overrides_default(self, tmp_path: Path) -> None:
        """A skills_dir hint on the proposal redirects the accept target."""
        # Simulate the reviewer setting skills_dir on a proposal (carried
        # over from the trace, which captured it from the agent config).
        project_skills = tmp_path / "project" / "skills" / "learned"
        proposal = SkillProposal(
            agent_role="researcher",
            name="cite-sources",
            description="Always cite sources",
            body="# Cite Sources\n",
            rationale="r",
            confidence=0.8,
            skills_dir=project_skills,
        )
        # No skill_store passed — accept should honor proposal.skills_dir.
        proposal_store = ProposalStore(root=tmp_path / "queue")
        proposal_store.save(proposal)
        path = accept_proposal(proposal, proposal_store=proposal_store)
        # SKILL.md is at <project_skills>/<role>/<name>/SKILL.md, NOT at
        # the default platform path.
        assert project_skills in path.parents
        assert path.name == "SKILL.md"
        assert "researcher" in str(path)
        assert "cite-sources" in str(path)

    def test_explicit_skill_store_overrides_proposal_hint(
        self, tmp_path: Path
    ) -> None:
        """An explicit skill_store argument beats the proposal's stored hint."""
        # If a caller passes skill_store explicitly (e.g. CLI --skills-dir
        # flag), it wins over the proposal's stored hint.
        proposal_skills = tmp_path / "from-proposal"
        cli_skills = tmp_path / "from-cli"
        proposal = SkillProposal(
            agent_role="researcher",
            name="cite-sources",
            description="d",
            body="b",
            rationale="r",
            confidence=0.8,
            skills_dir=proposal_skills,
        )
        proposal_store = ProposalStore(root=tmp_path / "queue")
        proposal_store.save(proposal)
        skill_store = SkillStore(skills_root=cli_skills)
        path = accept_proposal(
            proposal, proposal_store=proposal_store, skill_store=skill_store
        )
        assert cli_skills in path.parents
        assert proposal_skills not in path.parents

    def test_proposal_without_skills_dir_uses_platform_default(
        self, tmp_path: Path, monkeypatch
    ) -> None:
        """With no hint anywhere, the env-configured platform root is used."""
        monkeypatch.setenv("CREWAI_SELF_IMPROVE_DIR", str(tmp_path / "platform"))
        proposal = SkillProposal(
            agent_role="researcher",
            name="cite-sources",
            description="d",
            body="b",
            rationale="r",
            confidence=0.8,
        )
        proposal_store = ProposalStore(root=tmp_path / "queue")
        proposal_store.save(proposal)
        path = accept_proposal(proposal, proposal_store=proposal_store)
        assert tmp_path / "platform" in path.parents
class TestSkillStore:
    """SkillStore directory queries."""

    def test_has_any_detects_at_least_one_skill(
        self, stores, proposal: SkillProposal
    ) -> None:
        """has_any flips to True once an accepted SKILL.md exists for the role."""
        skill_store, proposal_store = stores
        assert skill_store.has_any("Senior Researcher") is False
        proposal_store.save(proposal)
        accept_proposal(
            proposal, skill_store=skill_store, proposal_store=proposal_store
        )
        assert skill_store.has_any("Senior Researcher") is True

View File

@@ -0,0 +1,85 @@
"""Tests for self_improve/auto_grade.py."""
from __future__ import annotations
from crewai.skills.self_improve.auto_grade import grade_trace
from crewai.skills.self_improve.models import RunTrace, ToolCallRecord
def _trace(**kw):
    """Build a minimal RunTrace with a fixed role; kwargs override fields."""
    return RunTrace(agent_role="r", **kw)
def test_explicit_error_is_failure() -> None:
    """An explicit error string grades as failure even with output present."""
    assert grade_trace(_trace(error="kaboom", output_summary="ok")) == "failure"
def test_guardrail_pass_overrides_other_signals() -> None:
    """A passing guardrail is the strongest success signal."""
    trace = _trace(
        guardrail_passed=True,
        max_iter_exhausted=True,  # would normally fail, but guardrail wins
        output_summary="ok",
    )
    assert grade_trace(trace) == "success"
def test_guardrail_fail_is_failure() -> None:
    """A failed guardrail grades as failure."""
    assert grade_trace(_trace(guardrail_passed=False, output_summary="x")) == "failure"
def test_max_iter_is_failure() -> None:
    """Exhausting the iteration cap grades as failure."""
    assert grade_trace(_trace(max_iter_exhausted=True, output_summary="x")) == "failure"
def test_thrashing_is_failure() -> None:
    """Repeating the identical tool call many times is graded as thrashing."""
    trace = _trace(
        tool_calls=[
            ToolCallRecord(name="search", args_summary="q=x") for _ in range(5)
        ],
        output_summary="ok",
    )
    assert grade_trace(trace) == "failure"
def test_empty_output_is_failure() -> None:
    """Whitespace-only output grades as failure."""
    assert grade_trace(_trace(output_summary=" ")) == "failure"
def test_error_string_output_is_failure() -> None:
    """Output that reads like an error message grades as failure."""
    assert grade_trace(_trace(output_summary="Error: boom")) == "failure"
def test_minority_tool_errors_still_count_as_success() -> None:
    """A minority of tool errors does not sink an otherwise clean run."""
    trace = _trace(
        tool_calls=[
            ToolCallRecord(name="a", ok=True),
            ToolCallRecord(name="b", ok=True),
            ToolCallRecord(name="c", ok=False, error="x"),
        ],
        output_summary="answer",
    )
    assert grade_trace(trace) == "success"
def test_failure_when_majority_tool_errors() -> None:
    """A majority of tool errors grades the run as failure."""
    trace = _trace(
        tool_calls=[
            ToolCallRecord(name="a", ok=False, error="x"),
            ToolCallRecord(name="b", ok=False, error="x"),
            ToolCallRecord(name="c", ok=True),
        ],
        output_summary="answer",
    )
    assert grade_trace(trace) == "failure"
def test_clean_run_is_success() -> None:
    """Successful tool calls plus real output grade as success."""
    trace = _trace(
        tool_calls=[ToolCallRecord(name="a", ok=True)],
        output_summary="answer",
    )
    assert grade_trace(trace) == "success"
def test_no_signal_is_unknown() -> None:
    """With no signals at all the grade stays unknown."""
    assert grade_trace(_trace()) == "unknown"

View File

@@ -0,0 +1,103 @@
"""Tests for ``crewai skills proposals`` CLI commands."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import patch
from click.testing import CliRunner
import pytest
from crewai.cli.skills_proposals import skills as skills_group
from crewai.skills.self_improve.models import SkillProposal
from crewai.skills.self_improve.storage import ProposalStore, SkillStore
@pytest.fixture
def proposal() -> SkillProposal:
    """A single pending proposal used across the CLI tests."""
    return SkillProposal(
        agent_role="researcher",
        name="cite-sources",
        description="Always cite sources",
        body="# Cite Sources\n\nbody.",
        rationale="seen in 3 traces",
        confidence=0.75,
    )
@pytest.fixture
def runner_with_root(tmp_path: Path):
    """CliRunner with the self-improve root patched at the storage layer."""
    runner = CliRunner()
    proposal_store = ProposalStore(root=tmp_path)
    skill_store = SkillStore(root=tmp_path)
    # Patch the store constructors in BOTH modules that build them (the
    # CLI module and the acceptance module) so every code path under test
    # sees the same tmp-rooted stores.
    with patch(
        "crewai.cli.skills_proposals.ProposalStore", return_value=proposal_store
    ), patch(
        "crewai.skills.self_improve.acceptance.ProposalStore",
        return_value=proposal_store,
    ), patch(
        "crewai.skills.self_improve.acceptance.SkillStore", return_value=skill_store
    ):
        yield runner, proposal_store, skill_store
class TestList:
    """``crewai skills proposals list``."""

    def test_empty(self, runner_with_root) -> None:
        """An empty queue reports 'no pending proposals' and exits 0."""
        runner, _, _ = runner_with_root
        result = runner.invoke(skills_group, ["proposals", "list"])
        assert result.exit_code == 0
        assert "no pending proposals" in result.output

    def test_one_pending(self, runner_with_root, proposal: SkillProposal) -> None:
        """A saved proposal shows up with its id and name."""
        runner, ps, _ = runner_with_root
        ps.save(proposal)
        result = runner.invoke(skills_group, ["proposals", "list"])
        assert result.exit_code == 0
        assert proposal.id in result.output
        assert "cite-sources" in result.output
class TestShow:
    """``crewai skills proposals show <id>``."""

    def test_unknown_id_exits_nonzero(self, runner_with_root) -> None:
        """Asking for a missing proposal id fails with exit code 1."""
        runner, _, _ = runner_with_root
        result = runner.invoke(skills_group, ["proposals", "show", "prop_does_not_exist"])
        assert result.exit_code == 1
        assert "No proposal" in result.output

    def test_prints_body(self, runner_with_root, proposal: SkillProposal) -> None:
        """Show prints the markdown body and the rationale."""
        runner, ps, _ = runner_with_root
        ps.save(proposal)
        result = runner.invoke(skills_group, ["proposals", "show", proposal.id])
        assert result.exit_code == 0
        assert "# Cite Sources" in result.output
        assert "rationale" in result.output
class TestAccept:
    """``crewai skills proposals accept <id>``."""

    def test_writes_skill_md_and_clears_queue(
        self, runner_with_root, proposal: SkillProposal
    ) -> None:
        """Accept writes the SKILL.md and removes the queued proposal."""
        runner, ps, ss = runner_with_root
        ps.save(proposal)
        result = runner.invoke(skills_group, ["proposals", "accept", proposal.id])
        assert result.exit_code == 0, result.output
        assert ps.find(proposal.id) is None
        assert (ss.skill_dir("researcher", "cite-sources") / "SKILL.md").is_file()

    def test_unknown_id_exits_nonzero(self, runner_with_root) -> None:
        """Accepting a missing id fails with exit code 1."""
        runner, _, _ = runner_with_root
        result = runner.invoke(skills_group, ["proposals", "accept", "prop_nope"])
        assert result.exit_code == 1
class TestReject:
    """``crewai skills proposals reject <id>``."""

    def test_removes_from_queue(self, runner_with_root, proposal: SkillProposal) -> None:
        """Reject deletes the queued proposal and exits 0."""
        runner, ps, _ = runner_with_root
        ps.save(proposal)
        result = runner.invoke(skills_group, ["proposals", "reject", proposal.id])
        assert result.exit_code == 0
        assert ps.find(proposal.id) is None

View File

@@ -0,0 +1,153 @@
"""Tests for self_improve/collector.py.
The collector is exercised through the real event bus inside a
``scoped_handlers()`` block. Events are constructed with
``model_construct`` so we don't need to spin up a real Agent + LLM.
"""
from __future__ import annotations
from datetime import UTC, datetime
from pathlib import Path
from types import SimpleNamespace
import pytest
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.agent_events import (
AgentExecutionCompletedEvent,
AgentExecutionErrorEvent,
AgentExecutionStartedEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.skills.self_improve.collector import TraceCollector
from crewai.skills.self_improve.storage import TraceStore
def _fake_agent(*, agent_id: str = "agent-1", role: str = "researcher"):
    """Duck-typed Agent stand-in: just the id/role/skills attrs the collector reads."""
    return SimpleNamespace(id=agent_id, role=role, skills=None)
def _fake_task(task_id: str = "task-1", description: str = "do the thing"):
    """Duck-typed Task stand-in with id + description."""
    return SimpleNamespace(id=task_id, description=description)
def _started(agent, task) -> AgentExecutionStartedEvent:
    """Agent-start event built via model_construct (skips validation)."""
    return AgentExecutionStartedEvent.model_construct(
        agent=agent, task=task, tools=[], task_prompt=task.description
    )
def _completed(agent, task, output: str) -> AgentExecutionCompletedEvent:
    """Agent-completion event carrying the final output string."""
    return AgentExecutionCompletedEvent.model_construct(
        agent=agent, task=task, output=output
    )
def _error(agent, task, msg: str) -> AgentExecutionErrorEvent:
    """Agent-error event with the given error message."""
    return AgentExecutionErrorEvent.model_construct(agent=agent, task=task, error=msg)
def _tool_started(agent_id: str, name: str, args="q=x") -> ToolUsageStartedEvent:
    """Tool-start event for the given agent id."""
    return ToolUsageStartedEvent.model_construct(
        agent_id=agent_id, tool_name=name, tool_args=args
    )
def _tool_finished(agent_id: str, name: str, args="q=x") -> ToolUsageFinishedEvent:
    """Tool-success event; started_at == finished_at gives a zero duration."""
    now = datetime.now(UTC)
    return ToolUsageFinishedEvent.model_construct(
        agent_id=agent_id,
        tool_name=name,
        tool_args=args,
        started_at=now,
        finished_at=now,
        output="result",
    )
def _tool_error(agent_id: str, name: str, args="q=x") -> ToolUsageErrorEvent:
    """Tool-failure event with a fixed error message."""
    return ToolUsageErrorEvent.model_construct(
        agent_id=agent_id, tool_name=name, tool_args=args, error="boom"
    )
@pytest.fixture
def store(tmp_path: Path) -> TraceStore:
    """TraceStore rooted in an isolated tmp dir."""
    return TraceStore(root=tmp_path)
def test_collects_full_run_and_persists(store: TraceStore) -> None:
    """Start → tool → complete produces one persisted, success-graded trace."""
    agent = _fake_agent()
    task = _fake_task()
    collector = TraceCollector(agent, store=store)
    with crewai_event_bus.scoped_handlers():
        collector.attach(crewai_event_bus)
        # Flush between emits so the bus thread pool can't reorder
        # tool events past the completion event in this fast test path.
        crewai_event_bus.emit(None, _started(agent, task))
        crewai_event_bus.flush(timeout=5.0)
        crewai_event_bus.emit(None, _tool_started(agent.id, "search"))
        crewai_event_bus.emit(None, _tool_finished(agent.id, "search"))
        crewai_event_bus.flush(timeout=5.0)
        crewai_event_bus.emit(None, _completed(agent, task, "final answer"))
        crewai_event_bus.flush(timeout=5.0)
    saved = store.list_for_role("researcher")
    assert len(saved) == 1
    trace = store.load(saved[0])
    assert trace.agent_role == "researcher"
    assert trace.task_id == "task-1"
    assert trace.output_summary == "final answer"
    assert trace.outcome == "success"
    assert trace.tool_call_count == 1
    assert trace.tool_error_count == 0
def test_error_path_is_graded_as_failure(store: TraceStore) -> None:
    """A tool error followed by an agent error yields a failure trace."""
    agent = _fake_agent()
    task = _fake_task()
    collector = TraceCollector(agent, store=store)
    with crewai_event_bus.scoped_handlers():
        collector.attach(crewai_event_bus)
        crewai_event_bus.emit(None, _started(agent, task))
        crewai_event_bus.flush(timeout=5.0)
        crewai_event_bus.emit(None, _tool_error(agent.id, "search"))
        crewai_event_bus.flush(timeout=5.0)
        crewai_event_bus.emit(None, _error(agent, task, "agent crashed"))
        crewai_event_bus.flush(timeout=5.0)
    [path] = store.list_for_role("researcher")
    trace = store.load(path)
    assert trace.outcome == "failure"
    assert trace.error == "agent crashed"
    assert trace.tool_error_count == 1
def test_ignores_events_for_other_agents(store: TraceStore) -> None:
    """Events carrying another agent's id must not leak into this trace."""
    mine = _fake_agent(agent_id="mine", role="researcher")
    other = _fake_agent(agent_id="other", role="editor")
    task = _fake_task()
    collector = TraceCollector(mine, store=store)
    with crewai_event_bus.scoped_handlers():
        collector.attach(crewai_event_bus)
        crewai_event_bus.emit(None, _started(mine, task))
        crewai_event_bus.flush(timeout=5.0)
        # tool events for some other agent must not pollute our trace
        crewai_event_bus.emit(None, _tool_finished(other.id, "leaked-tool"))
        crewai_event_bus.emit(None, _tool_finished(mine.id, "real-tool"))
        crewai_event_bus.flush(timeout=5.0)
        crewai_event_bus.emit(None, _completed(mine, task, "ok"))
        crewai_event_bus.flush(timeout=5.0)
    [path] = store.list_for_role("researcher")
    trace = store.load(path)
    assert [t.name for t in trace.tool_calls] == ["real-tool"]

View File

@@ -0,0 +1,106 @@
"""Tests for self_improve/models.py."""
from __future__ import annotations
import pytest
from pydantic import ValidationError
from pathlib import Path
from crewai.skills.self_improve.models import (
RunTrace,
SelfImprovementConfig,
SkillProposal,
ToolCallRecord,
)
class TestSelfImprovementConfig:
    """SelfImprovementConfig defaults and round-tripping."""

    def test_defaults(self) -> None:
        """skills_dir defaults to None (platform storage path)."""
        cfg = SelfImprovementConfig()
        assert cfg.skills_dir is None

    def test_skills_dir_round_trip(self, tmp_path: Path) -> None:
        """A Path passed in comes back unchanged."""
        cfg = SelfImprovementConfig(skills_dir=tmp_path / "learned")
        assert cfg.skills_dir == tmp_path / "learned"
class TestRunTrace:
    """Defaults, derived counters, and JSON round-trips for ``RunTrace``."""

    def test_minimal_trace_has_defaults(self) -> None:
        minimal = RunTrace(agent_role="researcher")
        assert minimal.agent_role == "researcher"
        assert minimal.outcome == "unknown"
        assert minimal.tool_calls == []
        assert minimal.id.startswith("run_")
        assert minimal.tool_call_count == 0
        assert minimal.tool_error_count == 0

    def test_tool_counters(self) -> None:
        # One ok call plus one failed call: counted as 2 total, 1 error.
        calls = [
            ToolCallRecord(name="search", ok=True),
            ToolCallRecord(name="search", ok=False, error="boom"),
        ]
        counted = RunTrace(agent_role="researcher", tool_calls=calls)
        assert counted.tool_call_count == 2
        assert counted.tool_error_count == 1

    def test_serializes_roundtrip(self) -> None:
        # JSON dump -> validate must preserve identity and nested records.
        original = RunTrace(
            agent_role="researcher",
            tool_calls=[ToolCallRecord(name="search", args_summary="q=hi")],
        )
        restored = RunTrace.model_validate_json(original.model_dump_json())
        assert restored.id == original.id
        assert restored.tool_calls[0].name == "search"
class TestSkillProposal:
    """Validation and name normalisation for ``SkillProposal``."""

    def test_minimal_proposal(self) -> None:
        proposal = SkillProposal(
            agent_role="researcher",
            name="my-skill",
            description="A skill",
            body="# body",
            rationale="seen 3 times",
            confidence=0.8,
        )
        assert proposal.proposal_kind == "new"
        assert proposal.id.startswith("prop_")

    def test_name_is_slugified(self) -> None:
        # The reviewer LLM sometimes emits Title Case with spaces; we force
        # kebab-case so the existing skill loader's name validator passes.
        slugged = SkillProposal(
            agent_role="r",
            name="Provide Direct Answer with Supporting Historical Context",
            description="d",
            body="b",
            rationale="r",
            confidence=0.7,
        )
        assert slugged.name == "provide-direct-answer-with-supporting-historical-context"

    def test_name_strips_punctuation_and_underscores(self) -> None:
        # Colons, parens, bangs and underscores all normalise to hyphens.
        cleaned = SkillProposal(
            agent_role="r",
            name="My Skill: V2 (the_good_one!)",
            description="d",
            body="b",
            rationale="r",
            confidence=0.7,
        )
        assert cleaned.name == "my-skill-v2-the-good-one"

    def test_confidence_must_be_in_range(self) -> None:
        # Values outside the valid confidence range are rejected outright.
        with pytest.raises(ValidationError):
            SkillProposal(
                agent_role="r",
                name="n",
                description="d",
                body="b",
                rationale="r",
                confidence=1.5,
            )

View File

@@ -0,0 +1,208 @@
"""Tests for self_improve/reviewer.py."""
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock
import pytest
from crewai.skills.self_improve.models import RunTrace, SkillProposal, ToolCallRecord
from crewai.skills.self_improve.reviewer import (
SkillReviewer,
_ReviewerOutput,
_format_trace,
)
def _llm_proposal(**kw):
    """Build a SkillProposal as the LLM would emit it (no server-filled fields)."""
    defaults: dict[str, Any] = {
        "name": "x",
        "description": "d",
        "body": "b",
        "rationale": "r",
        "confidence": 0.7,
    }
    # Caller overrides win over the defaults.
    return SkillProposal(**{**defaults, **kw})
def _trace(**kw: Any) -> RunTrace:
    """A successful researcher ``RunTrace``; fields overridable via kwargs."""
    return RunTrace(**{"agent_role": "researcher", "outcome": "success", **kw})
def _stub_llm(output: _ReviewerOutput) -> MagicMock:
    """Return a BaseLLM stand-in whose ``call`` returns the given output."""
    fake = MagicMock()
    # Replace ``call`` explicitly so tests can inspect call_args later.
    fake.call = MagicMock(return_value=output)
    return fake
@pytest.fixture
def reviewer_factory():
    """Factory fixture producing reviewers bound to the researcher persona."""

    def _build(llm, **overrides):
        # Fixed role/goal; per-test knobs (min_traces, confidence_floor, ...)
        # come in through **overrides.
        return SkillReviewer(
            agent_role="researcher",
            agent_goal="answer questions",
            llm=llm,
            **overrides,
        )

    return _build
class TestFormatTrace:
    """Rendering of a ``RunTrace`` into the reviewer's prompt block."""

    def test_includes_outcome_task_and_tool_calls(self) -> None:
        # Outcome, task text, output summary and per-call status markers
        # must all appear in the rendered block.
        trace = _trace(
            task_description="Find papers on X",
            output_summary="Found 3 papers.",
            tool_calls=[
                ToolCallRecord(name="search", args_summary="q=X", ok=True),
                ToolCallRecord(name="fetch", args_summary="id=42", ok=False, error="404"),
            ],
        )
        block = _format_trace(trace)
        assert "outcome=success" in block
        assert "Find papers on X" in block
        assert "Found 3 papers." in block
        assert "[ok] search(q=X)" in block
        assert "[ERR] fetch(id=42)" in block

    def test_truncates_long_output(self) -> None:
        trace = _trace(output_summary="x" * 5000)
        block = _format_trace(trace)
        # BUG FIX: the previous first assertion was `assert "" in block`,
        # which is vacuously true for any string (the truncation marker was
        # evidently lost). Assert truncation directly instead, without
        # depending on the exact marker character: the verbatim 5000-char
        # summary must not survive formatting.
        assert "x" * 5000 not in block
        assert len(block) < 5000
class TestSkillReviewer:
    """Behavioural tests for ``SkillReviewer.review``.

    Each test stubs the LLM with a MagicMock so it can either control the
    structured output the reviewer receives, or inspect the prompt messages
    the reviewer builds (via ``llm.call.call_args``).
    """

    def test_returns_empty_when_below_min_traces(self, reviewer_factory) -> None:
        # Fewer traces than min_traces: review() short-circuits and the LLM
        # is never invoked.
        llm = _stub_llm(_ReviewerOutput(proposals=[]))
        reviewer = reviewer_factory(llm, min_traces=3)
        result = reviewer.review([_trace(), _trace()])
        assert result == []
        llm.call.assert_not_called()

    def test_filters_by_confidence_floor(self, reviewer_factory) -> None:
        # Proposals below confidence_floor are dropped from the result.
        llm_output = _ReviewerOutput(
            proposals=[
                _llm_proposal(name="keep", confidence=0.8),
                _llm_proposal(name="drop", confidence=0.3),
            ]
        )
        llm = _stub_llm(llm_output)
        reviewer = reviewer_factory(llm, min_traces=2, confidence_floor=0.6)
        out = reviewer.review([_trace(), _trace(), _trace()])
        assert [p.name for p in out] == ["keep"]

    def test_sets_agent_role_and_run_ids(self, reviewer_factory) -> None:
        # The reviewer stamps server-side fields (agent_role, source run ids)
        # onto each surviving proposal.
        llm = _stub_llm(
            _ReviewerOutput(proposals=[_llm_proposal(name="cite-sources")])
        )
        reviewer = reviewer_factory(llm, min_traces=2)
        traces = [_trace(), _trace(), _trace()]
        out = reviewer.review(traces)
        assert len(out) == 1
        prop = out[0]
        assert prop.agent_role == "researcher"
        assert prop.derived_from_runs == [t.id for t in traces]
        assert prop.proposal_kind == "new"

    def test_passes_loaded_skills_into_prompt(self, reviewer_factory) -> None:
        # Names of already-loaded skills must be surfaced in the system prompt.
        llm = _stub_llm(_ReviewerOutput(proposals=[]))
        reviewer = reviewer_factory(llm, min_traces=2)
        reviewer.review(
            [_trace(), _trace(), _trace()],
            loaded_skill_names=["citing", "search-tactics"],
        )
        call_kwargs = llm.call.call_args.kwargs
        messages = call_kwargs["messages"]
        system_msg = next(m["content"] for m in messages if m["role"] == "system")
        assert "citing" in system_msg
        assert "search-tactics" in system_msg

    def test_handles_non_model_response_gracefully(self, reviewer_factory) -> None:
        # An LLM that returned a plain string instead of the structured model.
        llm = MagicMock()
        llm.call = MagicMock(return_value="totally not a model")
        reviewer = reviewer_factory(llm, min_traces=2)
        out = reviewer.review([_trace(), _trace(), _trace()])
        assert out == []

    def test_skills_dir_propagates_from_trace_to_proposal(
        self, reviewer_factory, tmp_path
    ) -> None:
        # When the traces carry an agent_skills_dir, the resulting proposal
        # inherits it as its skills_dir.
        from pathlib import Path
        skills_dir = Path(tmp_path) / "project" / "skills" / "learned"
        llm = _stub_llm(_ReviewerOutput(proposals=[_llm_proposal(name="cite")]))
        reviewer = reviewer_factory(llm, min_traces=2)
        traces = [
            _trace(agent_skills_dir=skills_dir),
            _trace(agent_skills_dir=skills_dir),
            _trace(agent_skills_dir=skills_dir),
        ]
        [prop] = reviewer.review(traces)
        assert prop.skills_dir == skills_dir

    def test_pending_proposals_appear_in_prompt(self, reviewer_factory) -> None:
        # Previously-queued proposals are described to the LLM so it does not
        # re-propose them.
        llm = _stub_llm(_ReviewerOutput(proposals=[]))
        reviewer = reviewer_factory(llm, min_traces=2)
        pending = [
            SkillProposal(
                agent_role="researcher",
                name="cite-sources",
                description="Always cite sources in research output.",
                body="b",
                rationale="r",
                confidence=0.8,
            )
        ]
        reviewer.review(
            [_trace(), _trace(), _trace()],
            pending_proposals=pending,
        )
        messages = llm.call.call_args.kwargs["messages"]
        system_msg = next(m["content"] for m in messages if m["role"] == "system")
        assert "cite-sources" in system_msg
        assert "Always cite sources" in system_msg
        # And it should be in the queued-proposals section, not the loaded
        # skills section.
        assert "QUEUED" in system_msg

    def test_pending_proposals_default_none_renders_none(
        self, reviewer_factory
    ) -> None:
        # With no loaded skills and no pending proposals, both prompt
        # sections fall back to a "(none)" placeholder.
        llm = _stub_llm(_ReviewerOutput(proposals=[]))
        reviewer = reviewer_factory(llm, min_traces=2)
        reviewer.review([_trace(), _trace(), _trace()])
        system_msg = next(
            m["content"]
            for m in llm.call.call_args.kwargs["messages"]
            if m["role"] == "system"
        )
        # The "(none)" sentinel renders under both sections when nothing was passed.
        assert system_msg.count("(none)") >= 2

    def test_patch_existing_kind_passes_through(self, reviewer_factory) -> None:
        # proposal_kind/target_skill emitted by the LLM survive review untouched.
        llm = _stub_llm(
            _ReviewerOutput(
                proposals=[
                    _llm_proposal(
                        name="citing-v2",
                        confidence=0.9,
                        proposal_kind="patch_existing",
                        target_skill="citing",
                    )
                ]
            )
        )
        reviewer = reviewer_factory(llm, min_traces=2)
        [prop] = reviewer.review(
            [_trace(), _trace(), _trace()], loaded_skill_names=["citing"]
        )
        assert prop.proposal_kind == "patch_existing"
        assert prop.target_skill == "citing"
View File

@@ -0,0 +1,69 @@
"""Tests for self_improve/storage.py."""
from __future__ import annotations
from pathlib import Path
import pytest
from crewai.skills.self_improve.models import RunTrace, SkillProposal
from crewai.skills.self_improve.storage import ProposalStore, TraceStore
@pytest.fixture
def trace() -> RunTrace:
    """A minimal RunTrace; the multi-word role exercises slug handling."""
    sample = RunTrace(agent_role="Senior Researcher", output_summary="hello")
    return sample
@pytest.fixture
def proposal() -> SkillProposal:
    """A representative proposal used for ProposalStore round-trips."""
    fields = {
        "agent_role": "Senior Researcher",
        "name": "cite-sources",
        "description": "Always cite sources",
        "body": "# body",
        "rationale": "3 of 5 runs cited",
        "confidence": 0.7,
    }
    return SkillProposal(**fields)
class TestTraceStore:
    """Persistence behaviour of ``TraceStore``."""

    def test_save_and_load_roundtrip(self, tmp_path: Path, trace: RunTrace) -> None:
        traces = TraceStore(root=tmp_path)
        saved_path = traces.save(trace)
        assert saved_path.exists()
        # role gets slugified into the dir name
        assert "senior-researcher" in str(saved_path)
        restored = traces.load(saved_path)
        assert restored.id == trace.id
        assert restored.output_summary == "hello"

    def test_list_for_role(self, tmp_path: Path) -> None:
        # Three saved traces for one role are all counted.
        traces = TraceStore(root=tmp_path)
        for _ in range(3):
            traces.save(RunTrace(agent_role="researcher"))
        assert traces.count_for_role("researcher") == 3

    def test_role_slug_is_filesystem_safe(self, tmp_path: Path) -> None:
        traces = TraceStore(root=tmp_path)
        traces.save(RunTrace(agent_role="Weird/Role:Name!"))
        # only safe chars survive after slugify
        assert any(entry.is_dir() for entry in traces.root.iterdir())
class TestProposalStore:
    """Persistence and deletion behaviour of ``ProposalStore``."""

    def test_save_and_load_roundtrip(
        self, tmp_path: Path, proposal: SkillProposal
    ) -> None:
        proposals = ProposalStore(root=tmp_path)
        restored = proposals.load(proposals.save(proposal))
        assert restored.id == proposal.id
        assert restored.name == "cite-sources"

    def test_delete(self, tmp_path: Path, proposal: SkillProposal) -> None:
        proposals = ProposalStore(root=tmp_path)
        proposals.save(proposal)
        # First delete removes the record; repeating it is a False no-op.
        assert proposals.delete(proposal.id, "Senior Researcher") is True
        assert proposals.delete(proposal.id, "Senior Researcher") is False

2
uv.lock generated
View File

@@ -13,7 +13,7 @@ resolution-markers = [
]
[options]
exclude-newer = "2026-04-27T16:00:00Z"
exclude-newer = "2026-04-28T07:00:00Z"
[manifest]
members = [