From 4af40c64f2e24700c3a0f85ec8be49aa71aca9ff Mon Sep 17 00:00:00 2001 From: lorenzejay Date: Tue, 5 May 2026 14:42:25 -0700 Subject: [PATCH] self improve with skills --- lib/crewai/src/crewai/agent/core.py | 48 ++- lib/crewai/src/crewai/cli/cli.py | 4 + .../src/crewai/cli/skill_proposals_tui.py | 147 +++++++++ lib/crewai/src/crewai/cli/skills_proposals.py | 249 +++++++++++++++ .../crewai/skills/self_improve/__init__.py | 38 +++ .../crewai/skills/self_improve/acceptance.py | 91 ++++++ .../crewai/skills/self_improve/auto_grade.py | 76 +++++ .../crewai/skills/self_improve/collector.py | 286 ++++++++++++++++++ .../src/crewai/skills/self_improve/models.py | 116 +++++++ .../crewai/skills/self_improve/reviewer.py | 208 +++++++++++++ .../src/crewai/skills/self_improve/storage.py | 178 +++++++++++ .../tests/skills/self_improve/__init__.py | 0 .../skills/self_improve/test_acceptance.py | 165 ++++++++++ .../skills/self_improve/test_auto_grade.py | 85 ++++++ .../skills/self_improve/test_cli_skills.py | 103 +++++++ .../skills/self_improve/test_collector.py | 153 ++++++++++ .../tests/skills/self_improve/test_models.py | 82 +++++ .../skills/self_improve/test_reviewer.py | 192 ++++++++++++ .../tests/skills/self_improve/test_storage.py | 69 +++++ uv.lock | 2 +- 20 files changed, 2289 insertions(+), 3 deletions(-) create mode 100644 lib/crewai/src/crewai/cli/skill_proposals_tui.py create mode 100644 lib/crewai/src/crewai/cli/skills_proposals.py create mode 100644 lib/crewai/src/crewai/skills/self_improve/__init__.py create mode 100644 lib/crewai/src/crewai/skills/self_improve/acceptance.py create mode 100644 lib/crewai/src/crewai/skills/self_improve/auto_grade.py create mode 100644 lib/crewai/src/crewai/skills/self_improve/collector.py create mode 100644 lib/crewai/src/crewai/skills/self_improve/models.py create mode 100644 lib/crewai/src/crewai/skills/self_improve/reviewer.py create mode 100644 lib/crewai/src/crewai/skills/self_improve/storage.py create mode 100644 lib/crewai/tests/skills/self_improve/__init__.py create mode 100644 lib/crewai/tests/skills/self_improve/test_acceptance.py create mode 100644 lib/crewai/tests/skills/self_improve/test_auto_grade.py create mode 100644 lib/crewai/tests/skills/self_improve/test_cli_skills.py create mode 100644 lib/crewai/tests/skills/self_improve/test_collector.py create mode 100644 lib/crewai/tests/skills/self_improve/test_models.py create mode 100644 lib/crewai/tests/skills/self_improve/test_reviewer.py create mode 100644 lib/crewai/tests/skills/self_improve/test_storage.py diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 561307680..d3555ff3a 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -84,6 +84,7 @@ from crewai.rag.embeddings.types import EmbedderConfig from crewai.security.fingerprint import Fingerprint from crewai.skills.loader import activate_skill, discover_skills from crewai.skills.models import INSTRUCTIONS, Skill as SkillModel +from crewai.skills.self_improve.models import SelfImprovementConfig from crewai.state.checkpoint_config import CheckpointConfig, apply_checkpoint from crewai.tools.agent_tools.agent_tools import AgentTools from crewai.types.callback import SerializableCallable @@ -190,6 +191,7 @@ class Agent(BaseAgent): _times_executed: int = PrivateAttr(default=0) _mcp_resolver: MCPToolResolver | None = PrivateAttr(default=None) _last_messages: list[LLMMessage] = PrivateAttr(default_factory=list) + _self_improve_collector: Any = 
PrivateAttr(default=None) max_execution_time: int | None = Field( default=None, description="Maximum execution time for an agent to execute a task", @@ -320,6 +322,15 @@ class Agent(BaseAgent): agent_executor: CrewAgentExecutor | AgentExecutor | None = Field( default=None, description="An instance of the CrewAgentExecutor class." ) + self_improve: bool | SelfImprovementConfig = Field( + default=False, + description=( + "Enable the self-improvement loop. ``True`` uses defaults; pass a " + "``SelfImprovementConfig`` to override (e.g. point ``skills_dir`` at " + "a project-relative path so accepted skills get committed alongside " + "the code). See ``crewai.skills.self_improve``." + ), + ) executor_class: Annotated[ type[CrewAgentExecutor] | type[AgentExecutor], BeforeValidator(_validate_executor_class), @@ -360,6 +371,21 @@ class Agent(BaseAgent): self.set_skills() + if self.self_improve and self._self_improve_collector is None: + from crewai.skills.self_improve.collector import TraceCollector + + collector = TraceCollector(self) + collector.attach(crewai_event_bus) + self._self_improve_collector = collector + + def _self_improve_config(self) -> SelfImprovementConfig | None: + """Return the active SelfImprovementConfig, or None when disabled.""" + if not self.self_improve: + return None + if isinstance(self.self_improve, SelfImprovementConfig): + return self.self_improve + return SelfImprovementConfig() + if self.reasoning and self.planning_config is None: warnings.warn( "The 'reasoning' parameter is deprecated. Use 'planning_config=PlanningConfig()' instead.", @@ -429,7 +455,22 @@ class Agent(BaseAgent): else: crew_skills = list(resolved_crew_skills) - if not self.skills and not crew_skills: + self_improve_dir: Path | None = None + if (config := self._self_improve_config()) is not None: + from crewai.skills.self_improve.storage import SkillStore, _slug + + if config.skills_dir is not None: + candidate = config.skills_dir / _slug(self.role) + else: + candidate = SkillStore().role_dir(self.role) + if candidate.is_dir() and any( + (c / "SKILL.md").is_file() + for c in candidate.iterdir() + if c.is_dir() + ): + self_improve_dir = candidate + + if not self.skills and not crew_skills and self_improve_dir is None: return needs_work = self.skills and any( @@ -437,7 +478,7 @@ class Agent(BaseAgent): or (isinstance(s, SkillModel) and s.disclosure_level < INSTRUCTIONS) for s in self.skills ) - if not needs_work and not crew_skills: + if not needs_work and not crew_skills and self_improve_dir is None: return seen: set[str] = set() @@ -447,6 +488,9 @@ class Agent(BaseAgent): if crew_skills: items.extend(crew_skills) + if self_improve_dir is not None: + items.append(self_improve_dir) + for item in items: if isinstance(item, Path): discovered = discover_skills(item, source=self) diff --git a/lib/crewai/src/crewai/cli/cli.py b/lib/crewai/src/crewai/cli/cli.py index a25fb41d8..184f6d458 100644 --- a/lib/crewai/src/crewai/cli/cli.py +++ b/lib/crewai/src/crewai/cli/cli.py @@ -24,6 +24,7 @@ from crewai.cli.reset_memories_command import reset_memories_command from crewai.cli.run_crew import run_crew from crewai.cli.settings.main import SettingsCommand from crewai.cli.shared.token_manager import TokenManager +from crewai.cli.skills_proposals import skills as skills_group from crewai.cli.tools.main import ToolCommand from crewai.cli.train_crew import train_crew from crewai.cli.triggers.main import TriggersCommand @@ -955,5 +956,8 @@ def checkpoint_prune( prune_checkpoints(ctx.obj["location"], keep, older_than, 
dry_run) +crewai.add_command(skills_group) + + if __name__ == "__main__": crewai() diff --git a/lib/crewai/src/crewai/cli/skill_proposals_tui.py b/lib/crewai/src/crewai/cli/skill_proposals_tui.py new file mode 100644 index 000000000..76545f9be --- /dev/null +++ b/lib/crewai/src/crewai/cli/skill_proposals_tui.py @@ -0,0 +1,147 @@ +"""Minimal Textual TUI for triaging skill proposals. + +Two panes: the proposals list on the left, the highlighted proposal's +``SKILL.md`` body on the right. Keystrokes accept/reject in place. No +search, no scopes, no async workers — the underlying actions are the +same `accept_proposal` / `reject_proposal` calls the CLI uses. +""" + +from __future__ import annotations + +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Horizontal, VerticalScroll +from textual.widgets import Footer, Header, OptionList, Static + +from crewai.skills.self_improve import ( + ProposalStore, + accept_proposal, + reject_proposal, +) +from crewai.skills.self_improve.models import SkillProposal + + +_PRIMARY = "#eb6658" +_SECONDARY = "#1F7982" +_TERTIARY = "#ffffff" + + +def _format_proposal_detail(p: SkillProposal) -> str: + kind = ( + f"[bold]{p.proposal_kind}[/] → {p.target_skill}" + if p.proposal_kind == "patch_existing" + else "[bold]new[/]" + ) + runs = ", ".join(p.derived_from_runs) or "-" + return ( + f"[bold {_PRIMARY}]{p.name}[/]\n" + f"[dim]role:[/] {p.agent_role}\n" + f"[dim]kind:[/] {kind}\n" + f"[dim]confidence:[/] [bold]{p.confidence:.2f}[/]\n" + f"[dim]from runs:[/] {runs}\n\n" + f"[bold]Rationale[/]\n{p.rationale}\n\n" + f"[bold {_PRIMARY}]{'─' * 44}[/]\n" + f"{p.body}" + ) + + +class SkillProposalsTUI(App[None]): + """Triage UI: navigate the queue, accept or reject in place.""" + + TITLE = "CrewAI Skill Proposals" + SUB_TITLE = "↑↓ list · tab focus pane · PgUp/PgDn or mouse to scroll body · a/r/q" + + BINDINGS = [ + Binding("a", "accept", "Accept"), + Binding("r", "reject", "Reject"), + Binding("q", "quit", "Quit"), + Binding("tab", "focus_next", "Switch pane", show=False), + ] + + CSS = f""" + Header {{ background: {_PRIMARY}; color: {_TERTIARY}; }} + Footer {{ background: {_SECONDARY}; color: {_TERTIARY}; }} + Footer > .footer-key--key {{ background: {_PRIMARY}; color: {_TERTIARY}; }} + Horizontal {{ height: 1fr; }} + #list {{ + width: 40%; + border-right: solid {_SECONDARY}; + scrollbar-color: {_PRIMARY}; + }} + #list > .option-list--option-highlighted {{ + background: {_SECONDARY}; color: {_TERTIARY}; + }} + #detail-scroll {{ + width: 60%; + padding: 1 2; + scrollbar-color: {_PRIMARY}; + }} + #detail-scroll:focus {{ + background: {_SECONDARY} 5%; + }} + """ + + def __init__(self, store: ProposalStore | None = None) -> None: + super().__init__() + self._store = store or ProposalStore() + self._proposals: list[SkillProposal] = [] + + def compose(self) -> ComposeResult: + yield Header(show_clock=False) + with Horizontal(): + yield OptionList(id="list") + with VerticalScroll(id="detail-scroll"): + yield Static("Select a proposal to view its body.", id="detail") + yield Footer() + + def on_mount(self) -> None: + self.query_one("#list", OptionList).border_title = "Pending" + self.query_one("#detail-scroll", VerticalScroll).border_title = "Detail" + self._reload() + + def _reload(self) -> None: + self._proposals = self._store.list_all() + option_list = self.query_one("#list", OptionList) + option_list.clear_options() + for p in self._proposals: + kind_tag = "P" if p.proposal_kind == "patch_existing" else "N" + 
label = f"[{kind_tag}] {p.confidence:.2f} {p.name}" + option_list.add_option(label) + option_list.border_title = f"Pending ({len(self._proposals)})" + if not self._proposals: + self.query_one("#detail", Static).update("[dim](queue is empty)[/]") + + def _selected(self) -> SkillProposal | None: + idx = self.query_one("#list", OptionList).highlighted + if idx is None or idx >= len(self._proposals): + return None + return self._proposals[idx] + + def on_option_list_option_highlighted( + self, event: OptionList.OptionHighlighted + ) -> None: + idx = event.option_index + if idx < len(self._proposals): + self.query_one("#detail", Static).update( + _format_proposal_detail(self._proposals[idx]) + ) + + def action_accept(self) -> None: + prop = self._selected() + if prop is None: + return + try: + accept_proposal(prop) + self.notify(f"Accepted: {prop.name}", severity="information") + except FileExistsError as e: + self.notify(str(e), severity="warning", timeout=8) + return + self._reload() + + def action_reject(self) -> None: + prop = self._selected() + if prop is None: + return + reject_proposal(prop) + self.notify(f"Rejected: {prop.name}", severity="information") + self._reload() diff --git a/lib/crewai/src/crewai/cli/skills_proposals.py b/lib/crewai/src/crewai/cli/skills_proposals.py new file mode 100644 index 000000000..7784f8d49 --- /dev/null +++ b/lib/crewai/src/crewai/cli/skills_proposals.py @@ -0,0 +1,249 @@ +"""``crewai skills`` subcommands for the self-improvement loop.""" + +from __future__ import annotations + +from collections import defaultdict +from pathlib import Path + +import click + +from crewai.skills.self_improve import ( + ProposalStore, + SkillReviewer, + TraceStore, + accept_proposal, + reject_proposal, +) + + +_DEFAULT_REVIEW_MODEL = "anthropic/claude-haiku-4-5" + + +def _print_proposal_summary(p) -> None: + kind = ( + f"PATCH→{p.target_skill}" + if p.proposal_kind == "patch_existing" + else "NEW" + ) + click.echo( + f" {p.id} {kind:<28} conf={p.confidence:.2f} role={p.agent_role}" + ) + click.echo(f" {p.name}: {p.description}") + + +@click.group(name="skills") +def skills() -> None: + """Manage agent skills.""" + + +@skills.command(name="review") +@click.option( + "--role", + default=None, + help="Limit review to one role (slug or full name). Default: all roles with traces.", +) +@click.option( + "--model", + default=_DEFAULT_REVIEW_MODEL, + help=f"LiteLLM model id for the reviewer LLM. Default: {_DEFAULT_REVIEW_MODEL}.", +) +@click.option( + "--min-traces", + default=2, + type=int, + help="Minimum traces per role before review fires. Default: 2.", +) +@click.option( + "--floor", + default=0.6, + type=float, + help="Drop proposals below this confidence. Default: 0.6.", +) +def skills_review(role: str | None, model: str, min_traces: int, floor: float) -> None: + """Mine accumulated traces for skill proposals. + + Reads role + goal from the trace metadata, calls the reviewer LLM, and + persists each proposal that scores above ``--floor`` to the queue. Use + ``crewai skills proposals list`` to see what came out. + """ + from crewai import LLM + + trace_store = TraceStore() + proposal_store = ProposalStore() + + if not trace_store.root.exists(): + click.echo(f"No traces yet at {trace_store.root}", err=True) + raise SystemExit(1) + + # Group traces by role from disk; the role-slug dirs come from the store. 
+ by_role: dict[str, list] = defaultdict(list) + role_dirs = ( + [trace_store.role_dir(role)] if role else sorted(trace_store.root.iterdir()) + ) + for d in role_dirs: + if not d.is_dir(): + continue + for path in sorted(d.glob("*.json")): + t = trace_store.load(path) + by_role[t.agent_role].append(t) + + if not by_role: + click.echo("No traces found." if role is None else f"No traces for role={role!r}.") + return + + reviewer_llm = LLM(model=model) + total_emitted = 0 + + for agent_role, traces in by_role.items(): + if len(traces) < min_traces: + click.echo( + f"Skipping {agent_role!r}: {len(traces)} trace(s), " + f"need {min_traces}." + ) + continue + + agent_goal = next((t.agent_goal for t in traces if t.agent_goal), "") + loaded_skills_seen = sorted({s for t in traces for s in t.loaded_skills}) + pending = [ + proposal_store.load(p) for p in proposal_store.list_for_role(agent_role) + ] + + reviewer = SkillReviewer( + agent_role=agent_role, + agent_goal=agent_goal, + llm=reviewer_llm, + min_traces=min_traces, + confidence_floor=floor, + ) + click.echo( + f"🧠 Reviewing {len(traces)} trace(s) for {agent_role!r} " + f"(model={model}, pending={len(pending)})…" + ) + proposals_out = reviewer.review( + traces, + loaded_skill_names=loaded_skills_seen, + pending_proposals=pending, + ) + + for p in proposals_out: + path = proposal_store.save(p) + click.echo(f" + {p.id} conf={p.confidence:.2f} {p.name}") + click.echo(f" → {path}") + total_emitted += len(proposals_out) + + click.echo( + f"\n✅ Done. {total_emitted} proposal(s) added to the queue. " + f"Run `crewai skills proposals list` to view." + ) + + +@skills.group(name="proposals") +def proposals() -> None: + """Manage skill proposals from the self-improvement reviewer.""" + + +@proposals.command(name="list") +@click.option("--role", default=None, help="Filter by agent role (slug or full name).") +def proposals_list(role: str | None) -> None: + """List pending proposals across all roles.""" + store = ProposalStore() + items = store.list_for_role(role) if role else None + + if role: + records = [store.load(p) for p in store.list_for_role(role)] + else: + records = store.list_all() + + if not records: + click.echo("(no pending proposals)") + return + + click.echo(f"{len(records)} proposal(s):") + for p in records: + _print_proposal_summary(p) + + +@proposals.command(name="show") +@click.argument("proposal_id") +def proposals_show(proposal_id: str) -> None: + """Print the full body of a proposal.""" + store = ProposalStore() + prop = store.find(proposal_id) + if prop is None: + click.echo(f"No proposal with id {proposal_id!r}", err=True) + raise SystemExit(1) + + click.echo(f"id: {prop.id}") + click.echo(f"role: {prop.agent_role}") + click.echo(f"name: {prop.name}") + click.echo(f"description: {prop.description}") + click.echo(f"confidence: {prop.confidence:.2f}") + click.echo(f"kind: {prop.proposal_kind}") + if prop.target_skill: + click.echo(f"target: {prop.target_skill}") + click.echo(f"derived from: {', '.join(prop.derived_from_runs)}") + click.echo("\nrationale:") + click.echo(prop.rationale) + click.echo("\n--- SKILL.md body ---") + click.echo(prop.body) + + +@proposals.command(name="accept") +@click.argument("proposal_id") +@click.option("--force", is_flag=True, help="Overwrite an existing skill of the same name.") +@click.option( + "--skills-dir", + type=click.Path(file_okay=False, path_type=Path), + default=None, + envvar="CREWAI_SELF_IMPROVE_SKILLS_DIR", + help=( + "Directory to write the SKILL.md to. 
Defaults to the env var " + "CREWAI_SELF_IMPROVE_SKILLS_DIR, then to the platform data dir. " + "Use a project-relative path (e.g. ./skills/learned) to keep " + "accepted skills under version control — and pass the same path " + "to Agent(self_improve=SelfImprovementConfig(skills_dir=...)) so " + "the agent loads it on the next kickoff." + ), +) +def proposals_accept(proposal_id: str, force: bool, skills_dir: Path | None) -> None: + """Materialize a proposal as a live SKILL.md.""" + from crewai.skills.self_improve import SkillStore + + store = ProposalStore() + prop = store.find(proposal_id) + if prop is None: + click.echo(f"No proposal with id {proposal_id!r}", err=True) + raise SystemExit(1) + skill_store = SkillStore(skills_root=skills_dir) if skills_dir else None + try: + path = accept_proposal(prop, force=force, skill_store=skill_store) + except FileExistsError as e: + click.echo(f"{e}", err=True) + raise SystemExit(2) from None + click.echo(f"✅ Accepted: {path}") + click.echo( + " This skill will load on the next kickoff for any agent with " + f"role={prop.agent_role!r} and self_improve enabled " + "(make sure SelfImprovementConfig.skills_dir matches if you used --skills-dir)." + ) + + +@proposals.command(name="reject") +@click.argument("proposal_id") +def proposals_reject(proposal_id: str) -> None: + """Drop a proposal from the queue without accepting.""" + store = ProposalStore() + prop = store.find(proposal_id) + if prop is None: + click.echo(f"No proposal with id {proposal_id!r}", err=True) + raise SystemExit(1) + reject_proposal(prop) + click.echo(f"🗑 Rejected: {prop.id}") + + +@proposals.command(name="tui") +def proposals_tui() -> None: + """Open an interactive triage TUI for the proposals queue.""" + from crewai.cli.skill_proposals_tui import SkillProposalsTUI + + SkillProposalsTUI().run() diff --git a/lib/crewai/src/crewai/skills/self_improve/__init__.py b/lib/crewai/src/crewai/skills/self_improve/__init__.py new file mode 100644 index 000000000..5de0e4eb8 --- /dev/null +++ b/lib/crewai/src/crewai/skills/self_improve/__init__.py @@ -0,0 +1,38 @@ +"""Self-improving skills for crewAI agents. + +When an Agent is configured with ``self_improve=True``, a TraceCollector +subscribes to the event bus during kickoff, captures tool calls + outcome +signals into a RunTrace, auto-grades the run, and persists the trace to +disk. Across many runs, a SkillReviewer mines those traces for recurring +approaches and emits SkillProposals for human review. 
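+
+A minimal sketch of opting in (illustrative only — the role, goal, and
+``skills_dir`` below are placeholder values, not defaults enforced here)::
+
+    from pathlib import Path
+
+    from crewai import Agent
+    from crewai.skills.self_improve import SelfImprovementConfig
+
+    agent = Agent(
+        role="Senior Researcher",
+        goal="Summarize new papers",
+        backstory="...",
+        self_improve=SelfImprovementConfig(skills_dir=Path("./skills/learned")),
+    )
+    # Traces accumulate on disk with each kickoff; mine them later with
+    # `crewai skills review`, then triage the queue with
+    # `crewai skills proposals list` / `accept` / `tui`.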
+""" + +from crewai.skills.self_improve.acceptance import accept_proposal, reject_proposal +from crewai.skills.self_improve.auto_grade import grade_trace +from crewai.skills.self_improve.collector import TraceCollector +from crewai.skills.self_improve.models import ( + Outcome, + RunTrace, + SelfImprovementConfig, + SkillProposal, + ToolCallRecord, +) +from crewai.skills.self_improve.reviewer import SkillReviewer +from crewai.skills.self_improve.storage import ProposalStore, SkillStore, TraceStore + + +__all__ = [ + "Outcome", + "ProposalStore", + "RunTrace", + "SelfImprovementConfig", + "SkillProposal", + "SkillReviewer", + "SkillStore", + "ToolCallRecord", + "TraceCollector", + "TraceStore", + "accept_proposal", + "grade_trace", + "reject_proposal", +] diff --git a/lib/crewai/src/crewai/skills/self_improve/acceptance.py b/lib/crewai/src/crewai/skills/self_improve/acceptance.py new file mode 100644 index 000000000..a3433488a --- /dev/null +++ b/lib/crewai/src/crewai/skills/self_improve/acceptance.py @@ -0,0 +1,91 @@ +"""Materialize an accepted ``SkillProposal`` as a live ``SKILL.md`` file. + +Acceptance is the human checkpoint in the self-improvement loop: until a +proposal is accepted, it lives in the proposals queue and never affects +the agent. After acceptance, the SKILL.md lands at +``~/.crewai/skills///SKILL.md`` where the existing +skill loader discovers it on the next kickoff. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +from crewai.skills.self_improve.models import SkillProposal +from crewai.skills.self_improve.storage import ProposalStore, SkillStore + + +def _format_skill_md(proposal: SkillProposal) -> str: + """Render the proposal as a SKILL.md document. + + The body is written verbatim — proposals already include their own + markdown structure (title, sections). We only prepend the YAML + frontmatter the loader requires. Both fields are JSON-quoted because + JSON strings are valid YAML scalars and handle every special-char case + safely. + """ + body = proposal.body.lstrip() + # If the LLM already emitted frontmatter (defensive), don't double it. + if body.startswith("---"): + return body if body.endswith("\n") else body + "\n" + frontmatter = ( + f"---\n" + f"name: {json.dumps(proposal.name)}\n" + f"description: {json.dumps(proposal.description)}\n" + f"---\n\n" + ) + return frontmatter + body if body.endswith("\n") else frontmatter + body + "\n" + + +def accept_proposal( + proposal: SkillProposal, + *, + force: bool = False, + skill_store: SkillStore | None = None, + proposal_store: ProposalStore | None = None, +) -> Path: + """Write the proposal as a SKILL.md and remove it from the queue. + + Args: + proposal: The proposal to materialize. + force: When True, overwrite an existing SKILL.md at the target path. + skill_store: Override for the live-skills store (test injection). + proposal_store: Override for the proposals store (test injection). + + Returns: + Path to the written ``SKILL.md``. + + Raises: + FileExistsError: When a SKILL.md already exists at the target and + ``force=False``. + """ + skill_store = skill_store or SkillStore() + proposal_store = proposal_store or ProposalStore() + + target_dir = skill_store.skill_dir(proposal.agent_role, proposal.name) + skill_md = target_dir / "SKILL.md" + + if skill_md.exists() and not force: + raise FileExistsError( + f"{skill_md} already exists. Pass force=True to overwrite." 
+ ) + + target_dir.mkdir(parents=True, exist_ok=True) + skill_md.write_text(_format_skill_md(proposal), encoding="utf-8") + + # Once accepted, the proposal record is no longer the source of truth — + # the SKILL.md is. Drop the queue entry so it doesn't show up in `list`. + proposal_store.delete(proposal.id, proposal.agent_role) + + return skill_md + + +def reject_proposal( + proposal: SkillProposal, + *, + proposal_store: ProposalStore | None = None, +) -> bool: + """Delete a proposal from the queue. Returns True if it existed.""" + proposal_store = proposal_store or ProposalStore() + return proposal_store.delete(proposal.id, proposal.agent_role) diff --git a/lib/crewai/src/crewai/skills/self_improve/auto_grade.py b/lib/crewai/src/crewai/skills/self_improve/auto_grade.py new file mode 100644 index 000000000..9c8acf9b3 --- /dev/null +++ b/lib/crewai/src/crewai/skills/self_improve/auto_grade.py @@ -0,0 +1,76 @@ +"""Derive a run outcome from observable signals. + +No human grading required. The reviewer treats outcomes as +confidence-weighted hints, not ground truth — clustering across runs +absorbs label noise. +""" + +from __future__ import annotations + +from collections import Counter + +from crewai.skills.self_improve.models import Outcome, RunTrace + + +_RETRY_THRASH_THRESHOLD = 3 + + +def _has_thrashing(trace: RunTrace) -> bool: + """Detect a tool being called repeatedly with the same args summary. + + A tool fired more than ``_RETRY_THRASH_THRESHOLD`` times with identical + args is likely stuck in a retry loop rather than making progress. + """ + if len(trace.tool_calls) <= _RETRY_THRASH_THRESHOLD: + return False + keys = [(t.name, t.args_summary) for t in trace.tool_calls] + most_common_count = Counter(keys).most_common(1)[0][1] + return most_common_count > _RETRY_THRASH_THRESHOLD + + +def _output_looks_empty(trace: RunTrace) -> bool: + if trace.output_summary is None: + return False + stripped = trace.output_summary.strip() + if not stripped: + return True + lowered = stripped.lower() + return lowered.startswith("error") or lowered.startswith("traceback") + + +def grade_trace(trace: RunTrace) -> Outcome: + """Compute outcome from signals already present on the trace. + + Signal hierarchy (strongest first): + 1. explicit error → failure + 2. guardrail decided → trust it + 3. max_iter exhaustion → failure + 4. tool error rate / thrashing / empty output → failure or partial + 5. otherwise → success when we saw output, else unknown + """ + if trace.error: + return "failure" + + if trace.guardrail_passed is True: + return "success" + if trace.guardrail_passed is False: + return "failure" + + if trace.max_iter_exhausted: + return "failure" + + if _has_thrashing(trace): + return "failure" + + if _output_looks_empty(trace): + return "failure" + + if trace.tool_call_count > 0: + error_rate = trace.tool_error_count / trace.tool_call_count + if error_rate >= 0.5: + return "failure" + + if trace.output_summary: + return "success" + + return "unknown" diff --git a/lib/crewai/src/crewai/skills/self_improve/collector.py b/lib/crewai/src/crewai/skills/self_improve/collector.py new file mode 100644 index 000000000..e3a41d407 --- /dev/null +++ b/lib/crewai/src/crewai/skills/self_improve/collector.py @@ -0,0 +1,286 @@ +"""Trace collector that subscribes to the event bus. + +One ``TraceCollector`` per Agent instance. It keeps a per-(agent, task) +in-flight trace, identifies which events belong to its agent, and +finalizes the trace on completion — auto-grading and persisting it. 
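+
+A minimal sketch of manual wiring (normally the Agent does this itself when
+``self_improve`` is enabled; ``agent`` and ``tmp_path`` are assumed to exist)::
+
+    from crewai.events.event_bus import crewai_event_bus
+    from crewai.skills.self_improve.collector import TraceCollector
+    from crewai.skills.self_improve.storage import TraceStore
+
+    collector = TraceCollector(agent, store=TraceStore(root=tmp_path))
+    collector.attach(crewai_event_bus)
+    # agent.kickoff() as usual; on completion the trace is auto-graded and
+    # written to <root>/traces/<role-slug>/<run-id>.json.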
+""" + +from __future__ import annotations + +import atexit +from datetime import UTC, datetime +import logging +import threading +from typing import TYPE_CHECKING, Any + +from crewai.events.event_bus import CrewAIEventsBus, crewai_event_bus +from crewai.events.types.agent_events import ( + AgentExecutionCompletedEvent, + AgentExecutionErrorEvent, + AgentExecutionStartedEvent, + LiteAgentExecutionCompletedEvent, + LiteAgentExecutionErrorEvent, + LiteAgentExecutionStartedEvent, +) +from crewai.events.types.tool_usage_events import ( + ToolUsageErrorEvent, + ToolUsageFinishedEvent, + ToolUsageStartedEvent, +) +from crewai.skills.self_improve.auto_grade import grade_trace +from crewai.skills.self_improve.models import RunTrace, ToolCallRecord +from crewai.skills.self_improve.storage import TraceStore + + +if TYPE_CHECKING: + from crewai.agents.agent_builder.base_agent import BaseAgent + + +_OUTPUT_TRUNCATE = 4000 +_ARGS_TRUNCATE = 200 +_FLUSH_TIMEOUT = 10.0 + +logger = logging.getLogger(__name__) + +# ``Agent.kickoff()`` can return before the bus's thread-pool handlers drain; +# without this hook a script that exits immediately would lose the just- +# finalized trace. ``flush()`` is a no-op when no events are pending, so it's +# safe to register at module import time even if no agent ever opts in. +atexit.register(lambda: crewai_event_bus.flush(timeout=_FLUSH_TIMEOUT)) + + +def _truncate(value: Any, limit: int) -> str: + if value is None: + return "" + s = value if isinstance(value, str) else repr(value) + if len(s) <= limit: + return s + return s[: limit - 1] + "…" + + +class TraceCollector: + """Captures one ``RunTrace`` per agent execution by subscribing to events. + + Lifecycle: + - ``attach(bus)`` registers handlers on the global event bus. + - On ``AgentExecutionStartedEvent`` for our agent, a new trace begins. + - ``ToolUsage*`` events for our agent append ``ToolCallRecord``s. + - On ``AgentExecutionCompletedEvent`` / ``AgentExecutionErrorEvent`` + for our agent, the trace is auto-graded and persisted. + + The collector is intentionally tolerant: a missing Started event + (e.g. because the agent was already executing when ``attach`` ran) just + skips that trace. Tool events without a current trace are ignored. + """ + + def __init__( + self, + agent: BaseAgent, + store: TraceStore | None = None, + ) -> None: + self._agent = agent + self._store = store or TraceStore() + self._current: RunTrace | None = None + self._tool_started_at: dict[str, datetime] = {} + self._attached = False + # Bus dispatches handlers on a thread pool, and Agent.kickoff() + + # LiteAgent each emit Started/Completed for the same logical run, so + # the lock + ``self._current is not None`` check serializes the + # "create or skip" decision and dedupes the second half of the pair. 
+ self._lock = threading.RLock() + + @property + def current_trace(self) -> RunTrace | None: + """The in-flight trace, if an agent execution is active.""" + return self._current + + def _is_my_agent(self, event_agent: Any) -> bool: + if event_agent is None: + return False + if event_agent is self._agent: + return True + agent_id = getattr(event_agent, "id", None) + my_id = getattr(self._agent, "id", None) + return bool(agent_id is not None and my_id is not None and agent_id == my_id) + + def _is_my_id(self, event_agent_id: str | None) -> bool: + if not event_agent_id: + return False + my_id = getattr(self._agent, "id", None) + return bool(my_id is not None and str(my_id) == str(event_agent_id)) + + def attach(self, bus: CrewAIEventsBus) -> None: + """Register event handlers. Idempotent.""" + if self._attached: + return + self._attached = True + + @bus.on(AgentExecutionStartedEvent) + def _on_started(_source: Any, event: AgentExecutionStartedEvent) -> None: + if not self._is_my_agent(event.agent): + return + with self._lock: + if self._current is not None: + return # duplicate Started for an in-flight trace + task = event.task + self._current = RunTrace( + agent_id=str(getattr(self._agent, "id", "") or ""), + agent_role=self._agent.role, + agent_goal=getattr(self._agent, "goal", "") or "", + task_id=str(getattr(task, "id", "") or "") or None, + task_description=getattr(task, "description", None), + loaded_skills=self._collect_loaded_skills(), + ) + + @bus.on(ToolUsageStartedEvent) + def _on_tool_started(_source: Any, event: ToolUsageStartedEvent) -> None: + if not self._is_my_id(event.agent_id): + return + with self._lock: + if self._current is None: + return + self._tool_started_at[event.tool_name] = datetime.now(UTC) + + @bus.on(ToolUsageFinishedEvent) + def _on_tool_finished(_source: Any, event: ToolUsageFinishedEvent) -> None: + if not self._is_my_id(event.agent_id): + return + with self._lock: + if self._current is None: + return + self._current.tool_calls.append( + ToolCallRecord( + name=event.tool_name, + args_summary=_truncate(event.tool_args, _ARGS_TRUNCATE), + ok=True, + duration_ms=self._duration_ms( + event.tool_name, event.finished_at + ), + ) + ) + self._tool_started_at.pop(event.tool_name, None) + + @bus.on(ToolUsageErrorEvent) + def _on_tool_error(_source: Any, event: ToolUsageErrorEvent) -> None: + if not self._is_my_id(event.agent_id): + return + with self._lock: + if self._current is None: + return + self._current.tool_calls.append( + ToolCallRecord( + name=event.tool_name, + args_summary=_truncate(event.tool_args, _ARGS_TRUNCATE), + ok=False, + error=_truncate(event.error, _ARGS_TRUNCATE), + duration_ms=self._duration_ms( + event.tool_name, datetime.now(UTC) + ), + ) + ) + self._tool_started_at.pop(event.tool_name, None) + + @bus.on(AgentExecutionCompletedEvent) + def _on_completed(_source: Any, event: AgentExecutionCompletedEvent) -> None: + if not self._is_my_agent(event.agent): + return + with self._lock: + if self._current is None: + return + self._current.output_summary = _truncate( + event.output, _OUTPUT_TRUNCATE + ) + self._finalize_locked() + + @bus.on(AgentExecutionErrorEvent) + def _on_error(_source: Any, event: AgentExecutionErrorEvent) -> None: + if not self._is_my_agent(event.agent): + return + with self._lock: + if self._current is None: + return + self._current.error = _truncate(event.error, _OUTPUT_TRUNCATE) + self._finalize_locked() + + @bus.on(LiteAgentExecutionStartedEvent) + def _on_lite_started( + _source: Any, event: LiteAgentExecutionStartedEvent 
+ ) -> None: + if not self._is_my_id(event.agent_info.get("id")): + return + with self._lock: + if self._current is not None: + return # duplicate Started for an in-flight trace + messages = event.messages + if isinstance(messages, list): + task_desc = " ".join( + str(m.get("content", "")) + for m in messages + if isinstance(m, dict) + ) + else: + task_desc = str(messages) + self._current = RunTrace( + agent_id=str(getattr(self._agent, "id", "") or ""), + agent_role=self._agent.role, + agent_goal=getattr(self._agent, "goal", "") or "", + task_description=_truncate(task_desc, _OUTPUT_TRUNCATE), + loaded_skills=self._collect_loaded_skills(), + ) + + @bus.on(LiteAgentExecutionCompletedEvent) + def _on_lite_completed( + _source: Any, event: LiteAgentExecutionCompletedEvent + ) -> None: + if not self._is_my_id(event.agent_info.get("id")): + return + with self._lock: + if self._current is None: + return + self._current.output_summary = _truncate( + event.output, _OUTPUT_TRUNCATE + ) + self._finalize_locked() + + @bus.on(LiteAgentExecutionErrorEvent) + def _on_lite_error( + _source: Any, event: LiteAgentExecutionErrorEvent + ) -> None: + if not self._is_my_id(event.agent_info.get("id")): + return + with self._lock: + if self._current is None: + return + self._current.error = _truncate(event.error, _OUTPUT_TRUNCATE) + self._finalize_locked() + + def _duration_ms(self, tool_name: str, finished_at: datetime) -> int | None: + started = self._tool_started_at.get(tool_name) + if started is None: + return None + return max(0, int((finished_at - started).total_seconds() * 1000)) + + def _collect_loaded_skills(self) -> list[str]: + skills = getattr(self._agent, "skills", None) or [] + names: list[str] = [] + for s in skills: + name = getattr(s, "name", None) + if isinstance(name, str): + names.append(name) + return names + + def _finalize_locked(self) -> None: + """Caller holds ``self._lock``.""" + trace = self._current + if trace is None: + return + self._current = None + self._tool_started_at.clear() + trace.ended_at = datetime.now(UTC) + trace.outcome = grade_trace(trace) + try: + self._store.save(trace) + except OSError: + logger.exception( + "Failed to persist run trace for role %s", trace.agent_role + ) diff --git a/lib/crewai/src/crewai/skills/self_improve/models.py b/lib/crewai/src/crewai/skills/self_improve/models.py new file mode 100644 index 000000000..e367ad7bb --- /dev/null +++ b/lib/crewai/src/crewai/skills/self_improve/models.py @@ -0,0 +1,116 @@ +"""Data models for self-improving skills.""" + +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path +from typing import Literal +import uuid + +from pydantic import BaseModel, ConfigDict, Field + + +def _now() -> datetime: + return datetime.now(UTC) + + +class SelfImprovementConfig(BaseModel): + """Per-agent configuration for the self-improvement loop. + + All fields are optional with sensible defaults. Pass to ``Agent`` as + ``self_improve=SelfImprovementConfig(skills_dir=Path("./skills/learned"))`` + to override; ``Agent(self_improve=True)`` uses defaults. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + skills_dir: Path | None = Field( + default=None, + description=( + "Where accepted SKILL.md files are written and auto-loaded from. " + "When None, falls back to /self_improve/skills/. " + "Set to a project-relative path (e.g. Path('./skills/learned')) " + "to keep accepted skills under version control." 
+ ), + ) + + +Outcome = Literal["success", "failure", "unknown"] +ProposalKind = Literal["new", "patch_existing"] + + +def _new_id(prefix: str) -> str: + """Generate a short id with a stable prefix.""" + return f"{prefix}_{uuid.uuid4().hex[:12]}" + + +class ToolCallRecord(BaseModel): + """One tool invocation within a run.""" + + model_config = ConfigDict(frozen=True) + + name: str + args_summary: str = Field( + default="", + description="Truncated string repr of args, suitable for clustering.", + ) + ok: bool = True + error: str | None = None + duration_ms: int | None = None + + +class RunTrace(BaseModel): + """One agent + task execution. + + Built incrementally by ``TraceCollector`` from event-bus events, + finalized at agent completion, then persisted to disk. + """ + + model_config = ConfigDict(arbitrary_types_allowed=True) + + id: str = Field(default_factory=lambda: _new_id("run")) + agent_id: str | None = None + agent_role: str + agent_goal: str = "" + task_id: str | None = None + task_description: str | None = None + started_at: datetime = Field(default_factory=_now) + ended_at: datetime | None = None + tool_calls: list[ToolCallRecord] = Field(default_factory=list) + loaded_skills: list[str] = Field(default_factory=list) + outcome: Outcome = "unknown" + output_summary: str | None = None + error: str | None = None + max_iter_exhausted: bool = False + guardrail_passed: bool | None = None + + @property + def tool_error_count(self) -> int: + return sum(1 for t in self.tool_calls if not t.ok) + + @property + def tool_call_count(self) -> int: + return len(self.tool_calls) + + +class SkillProposal(BaseModel): + """A proposed new or updated skill, awaiting human review. + + The reviewer LLM emits these directly via a thin envelope; the server + stamps ``agent_role`` and ``derived_from_runs`` after the call, which is + why those two have permissive defaults rather than being required. + """ + + model_config = ConfigDict(frozen=True) + + id: str = Field(default_factory=lambda: _new_id("prop")) + agent_role: str = "" + name: str + description: str + body: str + rationale: str + confidence: float = Field(ge=0.0, le=1.0) + proposal_kind: ProposalKind = "new" + target_skill: str | None = None + derived_from_runs: list[str] = Field(default_factory=list) + created_at: datetime = Field(default_factory=_now) diff --git a/lib/crewai/src/crewai/skills/self_improve/reviewer.py b/lib/crewai/src/crewai/skills/self_improve/reviewer.py new file mode 100644 index 000000000..2be92f64a --- /dev/null +++ b/lib/crewai/src/crewai/skills/self_improve/reviewer.py @@ -0,0 +1,208 @@ +"""LLM-driven skill reviewer. + +Reads N ``RunTrace`` records, identifies recurring *approaches* (not +facts — those go to memory), and emits ``SkillProposal``s for human +review. + +The reviewer never writes to the active skills directory itself; it only +populates the proposals queue. Acceptance is a separate, explicit step. +""" + +from __future__ import annotations + +from pydantic import BaseModel, ConfigDict, Field + +from crewai.llms.base_llm import BaseLLM +from crewai.skills.self_improve.models import RunTrace, SkillProposal + + +_TRACE_OUTPUT_TRUNCATE = 1500 +_TRACE_TASK_TRUNCATE = 800 + + +class _ReviewerOutput(BaseModel): + """Envelope around the LLM's structured proposals. + + ``SkillProposal.agent_role`` and ``derived_from_runs`` are intentionally + server-filled post-call; the LLM doesn't need to emit them. 
+ """ + + model_config = ConfigDict(extra="ignore") + + proposals: list[SkillProposal] = Field(default_factory=list) + + +_SYSTEM_PROMPT = """\ +You are reviewing execution traces from a CrewAI agent to decide what +*reusable approaches* are worth saving as agent skills. + +The agent's role: {role} +The agent's goal: {goal} + +You see {n} traces from past runs. Your job: + +1. Identify recurring NON-DETERMINISTIC APPROACHES — patterns of how this + agent thinks about a class of problems. Examples of approaches: + - "When asked to scaffold a project, always start by listing required + config files before writing code." + - "If a tool returns empty results, rephrase the query before retrying." + +2. DROP facts. Facts go to memory, not skills. Examples of facts: + - "the user prefers dark mode" + - "the API key for service X is in vault Y" + - "this project uses pytest" + +3. Drop one-off observations. A skill needs ≥2 traces showing the same + approach. If only one trace shows it, omit. + +4. If a proposal would patch an EXISTING loaded skill (listed below), set + proposal_kind="patch_existing" and target_skill=. + Otherwise leave proposal_kind="new". + +5. Score confidence honestly. Below 0.6 will be auto-rejected, so don't + pad. A pattern seen in 2/{n} traces with no contradictions ≈ 0.65; in + all {n} traces with consistent outcome ≈ 0.85. + +6. SKILL.md body should be markdown: + - First line: short description in italics or as a sentence + - "When to use this skill" section + - "How to apply it" section with concrete steps + - No invented facts or fabricated tool names + +Loaded skills already available to this agent: +{loaded_skills} + +Proposals already QUEUED for human review (do not re-propose these — a +prior reviewer round already surfaced them, they're awaiting accept/reject): +{pending_proposals} + +If a recurring pattern matches a queued proposal semantically, simply +omit it from your output. Don't restate the same approach under a new +name. The human curator will resolve the queue. + +Return a JSON object matching the schema. Empty proposals list is a valid +answer when no recurring approach is worth saving. +""" + + +_USER_PROMPT = """\ +TRACES ({n}): + +{traces} + +Review the traces above and emit your proposals. +""" + + +def _format_trace(trace: RunTrace) -> str: + """One block per trace, compact but enough signal.""" + task = (trace.task_description or "").strip() + if len(task) > _TRACE_TASK_TRUNCATE: + task = task[: _TRACE_TASK_TRUNCATE - 1] + "…" + + output = (trace.output_summary or "").strip() + if len(output) > _TRACE_OUTPUT_TRUNCATE: + output = output[: _TRACE_OUTPUT_TRUNCATE - 1] + "…" + + tool_lines: list[str] = [] + for t in trace.tool_calls: + tag = "ok" if t.ok else "ERR" + tool_lines.append(f" [{tag}] {t.name}({t.args_summary})") + tools_block = "\n".join(tool_lines) if tool_lines else " (no tool calls)" + + return ( + f"--- {trace.id} outcome={trace.outcome} " + f"max_iter_exhausted={trace.max_iter_exhausted} " + f"guardrail_passed={trace.guardrail_passed}\n" + f"task:\n{task}\n\n" + f"tool_calls ({len(trace.tool_calls)}):\n{tools_block}\n\n" + f"output_summary:\n{output}\n" + ) + + +class SkillReviewer: + """Synthesize ``SkillProposal``s from a batch of ``RunTrace``s. + + Stateless; the only state lives in the disk store. Pass an LLM (any + ``BaseLLM`` instance — typically a small/cheap model like Haiku is + enough since the reviewer just summarizes). 
+ """ + + def __init__( + self, + *, + agent_role: str, + agent_goal: str, + llm: BaseLLM, + min_traces: int = 3, + confidence_floor: float = 0.6, + ) -> None: + self.agent_role = agent_role + self.agent_goal = agent_goal + self.llm = llm + self.min_traces = min_traces + self.confidence_floor = confidence_floor + + def review( + self, + traces: list[RunTrace], + *, + loaded_skill_names: list[str] | None = None, + pending_proposals: list[SkillProposal] | None = None, + ) -> list[SkillProposal]: + """Run the LLM review and return filtered proposals. + + Returns an empty list when ``len(traces) < self.min_traces`` so + the reviewer is safe to call early — it just no-ops. + + Pass ``pending_proposals`` (typically the current contents of the + ProposalStore for this role) so the reviewer doesn't re-emit + semantic duplicates of items already awaiting human review. + """ + if len(traces) < self.min_traces: + return [] + + loaded_skills_str = ( + "\n".join(f" - {name}" for name in loaded_skill_names) + if loaded_skill_names + else " (none)" + ) + + if pending_proposals: + pending_str = "\n".join( + f" - {p.name}: {p.description}" for p in pending_proposals + ) + else: + pending_str = " (none)" + + system = _SYSTEM_PROMPT.format( + role=self.agent_role, + goal=self.agent_goal, + n=len(traces), + loaded_skills=loaded_skills_str, + pending_proposals=pending_str, + ) + user = _USER_PROMPT.format( + n=len(traces), + traces="\n".join(_format_trace(t) for t in traces), + ) + + result = self.llm.call( + messages=[ + {"role": "system", "content": system}, + {"role": "user", "content": user}, + ], + response_model=_ReviewerOutput, + ) + + if not isinstance(result, _ReviewerOutput): + return [] + + run_ids = [t.id for t in traces] + return [ + p.model_copy( + update={"agent_role": self.agent_role, "derived_from_runs": run_ids} + ) + for p in result.proposals + if p.confidence >= self.confidence_floor + ] diff --git a/lib/crewai/src/crewai/skills/self_improve/storage.py b/lib/crewai/src/crewai/skills/self_improve/storage.py new file mode 100644 index 000000000..595388cbd --- /dev/null +++ b/lib/crewai/src/crewai/skills/self_improve/storage.py @@ -0,0 +1,178 @@ +"""On-disk storage for traces and skill proposals. + +Layout:: + + / + traces//.json + skill_proposals//.json + skills///SKILL.md + +The default root is ``db_storage_path() / "self_improve"`` — the same +project-scoped, platform-correct data dir that memory DBs use (set by +``appdirs.user_data_dir`` and overridable via ``CREWAI_STORAGE_DIR``). +``CREWAI_SELF_IMPROVE_DIR`` overrides specifically this feature's root, +useful for tests or migrations. 
+""" + +from __future__ import annotations + +import os +from pathlib import Path +import re + +from crewai.skills.self_improve.models import RunTrace, SkillProposal +from crewai.utilities.paths import db_storage_path + + +_ENV_VAR = "CREWAI_SELF_IMPROVE_DIR" +_SLUG_RE = re.compile(r"[^a-z0-9_-]+") + + +def _slug(role: str) -> str: + """Slugify an agent role for use as a directory name.""" + s = role.strip().lower().replace(" ", "-") + s = _SLUG_RE.sub("", s) + return s or "agent" + + +def _resolve_root(root: Path | None) -> Path: + if root is not None: + return root + env = os.environ.get(_ENV_VAR) + if env: + return Path(env) + return Path(db_storage_path()) / "self_improve" + + +def _write_json(path: Path, payload: str) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(payload, encoding="utf-8") + + +class TraceStore: + """Filesystem store for ``RunTrace`` records.""" + + def __init__(self, root: Path | None = None) -> None: + self.root = _resolve_root(root) / "traces" + + def role_dir(self, role: str) -> Path: + return self.root / _slug(role) + + def path_for(self, trace: RunTrace) -> Path: + return self.role_dir(trace.agent_role) / f"{trace.id}.json" + + def save(self, trace: RunTrace) -> Path: + path = self.path_for(trace) + _write_json(path, trace.model_dump_json(indent=2)) + return path + + def list_for_role(self, role: str) -> list[Path]: + d = self.role_dir(role) + if not d.exists(): + return [] + return sorted(d.glob("*.json")) + + def load(self, path: Path) -> RunTrace: + return RunTrace.model_validate_json(path.read_text(encoding="utf-8")) + + def count_for_role(self, role: str) -> int: + return len(self.list_for_role(role)) + + +class ProposalStore: + """Filesystem store for ``SkillProposal`` records pending human review.""" + + def __init__(self, root: Path | None = None) -> None: + self.root = _resolve_root(root) / "skill_proposals" + + def role_dir(self, role: str) -> Path: + return self.root / _slug(role) + + def path_for(self, proposal: SkillProposal) -> Path: + return self.role_dir(proposal.agent_role) / f"{proposal.id}.json" + + def save(self, proposal: SkillProposal) -> Path: + path = self.path_for(proposal) + _write_json(path, proposal.model_dump_json(indent=2)) + return path + + def list_for_role(self, role: str) -> list[Path]: + d = self.role_dir(role) + if not d.exists(): + return [] + return sorted(d.glob("*.json")) + + def load(self, path: Path) -> SkillProposal: + return SkillProposal.model_validate_json(path.read_text(encoding="utf-8")) + + def delete(self, proposal_id: str, role: str) -> bool: + path = self.role_dir(role) / f"{proposal_id}.json" + if path.exists(): + path.unlink() + return True + return False + + def find(self, proposal_id: str) -> SkillProposal | None: + """Locate a proposal by id across all role dirs. Returns None if missing.""" + if not self.root.exists(): + return None + for role_dir in self.root.iterdir(): + if not role_dir.is_dir(): + continue + path = role_dir / f"{proposal_id}.json" + if path.exists(): + return self.load(path) + return None + + def list_all(self) -> list[SkillProposal]: + """All proposals across roles, oldest first by file id.""" + if not self.root.exists(): + return [] + out: list[SkillProposal] = [] + for role_dir in sorted(self.root.iterdir()): + if not role_dir.is_dir(): + continue + for path in sorted(role_dir.glob("*.json")): + out.append(self.load(path)) + return out + + +class SkillStore: + """Filesystem store for accepted (live) skills. 
+ + Each accepted ``SkillProposal`` becomes a directory under + ``role_dir(role) / skill_name`` with a ``SKILL.md`` inside, matching + the layout the existing skill loader discovers. + + Two ways to construct: + + - ``SkillStore()`` — root is ``/self_improve/skills/`` + (the platform default colocated with traces + proposals). + - ``SkillStore(skills_root=Path("./skills/learned"))`` — root is the + given path verbatim. Use this when the agent is configured with + ``SelfImprovementConfig(skills_dir=...)`` so accepted skills land in + the project tree. + """ + + def __init__( + self, + root: Path | None = None, + skills_root: Path | None = None, + ) -> None: + self.root = ( + skills_root + if skills_root is not None + else _resolve_root(root) / "skills" + ) + + def role_dir(self, role: str) -> Path: + return self.root / _slug(role) + + def skill_dir(self, role: str, skill_name: str) -> Path: + return self.role_dir(role) / skill_name + + def has_any(self, role: str) -> bool: + d = self.role_dir(role) + if not d.exists(): + return False + return any((child / "SKILL.md").is_file() for child in d.iterdir() if child.is_dir()) diff --git a/lib/crewai/tests/skills/self_improve/__init__.py b/lib/crewai/tests/skills/self_improve/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/lib/crewai/tests/skills/self_improve/test_acceptance.py b/lib/crewai/tests/skills/self_improve/test_acceptance.py new file mode 100644 index 000000000..b183f4689 --- /dev/null +++ b/lib/crewai/tests/skills/self_improve/test_acceptance.py @@ -0,0 +1,165 @@ +"""Tests for self_improve/acceptance.py.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from crewai.skills.self_improve.acceptance import ( + _format_skill_md, + accept_proposal, + reject_proposal, +) +from crewai.skills.self_improve.models import SkillProposal +from crewai.skills.self_improve.storage import ProposalStore, SkillStore + + +@pytest.fixture +def proposal() -> SkillProposal: + return SkillProposal( + agent_role="Senior Researcher", + name="cite-sources", + description="Always cite sources in research outputs", + body="# Cite Sources\n\n*Always cite sources.*\n\n## When to use\n\nWhenever you write a research summary.\n", + rationale="Seen in 3 of 4 traces", + confidence=0.8, + derived_from_runs=["run_a", "run_b", "run_c"], + ) + + +@pytest.fixture +def stores(tmp_path: Path): + return SkillStore(root=tmp_path), ProposalStore(root=tmp_path) + + +class TestFormatSkillMd: + def test_includes_yaml_frontmatter(self, proposal: SkillProposal) -> None: + out = _format_skill_md(proposal) + assert out.startswith("---\n") + # Values are JSON-quoted (valid YAML scalars). + assert 'name: "cite-sources"' in out + assert '"Always cite sources in research outputs"' in out + assert "# Cite Sources" in out + + def test_quotes_descriptions_with_special_chars(self) -> None: + prop = SkillProposal( + agent_role="r", + name="n", + description='Has a "quote" and: colon', + body="b", + rationale="r", + confidence=0.7, + ) + out = _format_skill_md(prop) + # JSON-quoted: backslash-escapes the inner quote, retains the colon literally. 
+        assert 'description: "Has a \\"quote\\" and: colon"' in out
+
+    def test_passes_through_body_with_existing_frontmatter(self) -> None:
+        prop = SkillProposal(
+            agent_role="r",
+            name="n",
+            description="d",
+            body="---\nname: n\n---\n# Body\n",
+            rationale="r",
+            confidence=0.7,
+        )
+        out = _format_skill_md(prop)
+        # No double frontmatter
+        assert out.count("---") == 2  # one open, one close
+
+
+class TestAcceptProposal:
+    def test_writes_skill_md_at_expected_path(
+        self, stores, proposal: SkillProposal
+    ) -> None:
+        skill_store, proposal_store = stores
+        proposal_store.save(proposal)
+
+        path = accept_proposal(
+            proposal,
+            skill_store=skill_store,
+            proposal_store=proposal_store,
+        )
+
+        assert path.name == "SKILL.md"
+        # role gets slugified
+        assert "senior-researcher" in str(path)
+        assert "cite-sources" in str(path)
+
+        body = path.read_text()
+        assert body.startswith("---\n")
+        assert "# Cite Sources" in body
+
+    def test_removes_proposal_from_queue_on_accept(
+        self, stores, proposal: SkillProposal
+    ) -> None:
+        skill_store, proposal_store = stores
+        proposal_store.save(proposal)
+        assert proposal_store.find(proposal.id) is not None
+
+        accept_proposal(
+            proposal, skill_store=skill_store, proposal_store=proposal_store
+        )
+
+        assert proposal_store.find(proposal.id) is None
+
+    def test_refuses_to_overwrite_existing(
+        self, stores, proposal: SkillProposal
+    ) -> None:
+        skill_store, proposal_store = stores
+        proposal_store.save(proposal)
+        accept_proposal(
+            proposal, skill_store=skill_store, proposal_store=proposal_store
+        )
+        # Re-save and re-accept the same proposal id (or a fresh one) → conflict
+        proposal_store.save(proposal)
+        with pytest.raises(FileExistsError):
+            accept_proposal(
+                proposal,
+                skill_store=skill_store,
+                proposal_store=proposal_store,
+            )
+
+    def test_force_overwrites(self, stores, proposal: SkillProposal) -> None:
+        skill_store, proposal_store = stores
+        proposal_store.save(proposal)
+        accept_proposal(
+            proposal, skill_store=skill_store, proposal_store=proposal_store
+        )
+
+        proposal_store.save(proposal)
+        path = accept_proposal(
+            proposal,
+            skill_store=skill_store,
+            proposal_store=proposal_store,
+            force=True,
+        )
+        assert path.exists()
+
+
+class TestRejectProposal:
+    def test_removes_from_queue(self, stores, proposal: SkillProposal) -> None:
+        _, proposal_store = stores
+        proposal_store.save(proposal)
+        assert reject_proposal(proposal, proposal_store=proposal_store) is True
+        assert proposal_store.find(proposal.id) is None
+
+    def test_returns_false_when_missing(
+        self, stores, proposal: SkillProposal
+    ) -> None:
+        _, proposal_store = stores
+        assert reject_proposal(proposal, proposal_store=proposal_store) is False
+
+
+class TestSkillStore:
+    def test_has_any_detects_at_least_one_skill(
+        self, stores, proposal: SkillProposal
+    ) -> None:
+        skill_store, proposal_store = stores
+        assert skill_store.has_any("Senior Researcher") is False
+        proposal_store.save(proposal)
+        accept_proposal(
+            proposal, skill_store=skill_store, proposal_store=proposal_store
+        )
+        assert skill_store.has_any("Senior Researcher") is True
diff --git a/lib/crewai/tests/skills/self_improve/test_auto_grade.py b/lib/crewai/tests/skills/self_improve/test_auto_grade.py
new file mode 100644
index 000000000..ddaeacbed
--- /dev/null
+++ b/lib/crewai/tests/skills/self_improve/test_auto_grade.py
@@ -0,0 +1,85 @@
+"""Tests for self_improve/auto_grade.py."""
+
+from __future__ import annotations
+
+from crewai.skills.self_improve.auto_grade import grade_trace
+from crewai.skills.self_improve.models import RunTrace, ToolCallRecord
+
+
+def _trace(**kw):
+    return RunTrace(agent_role="r", **kw)
+
+
+def test_explicit_error_is_failure() -> None:
+    assert grade_trace(_trace(error="kaboom", output_summary="ok")) == "failure"
+
+
+def test_guardrail_pass_overrides_other_signals() -> None:
+    trace = _trace(
+        guardrail_passed=True,
+        max_iter_exhausted=True,  # would normally fail, but guardrail wins
+        output_summary="ok",
+    )
+    assert grade_trace(trace) == "success"
+
+
+def test_guardrail_fail_is_failure() -> None:
+    assert grade_trace(_trace(guardrail_passed=False, output_summary="x")) == "failure"
+
+
+def test_max_iter_is_failure() -> None:
+    assert grade_trace(_trace(max_iter_exhausted=True, output_summary="x")) == "failure"
+
+
+def test_thrashing_is_failure() -> None:
+    trace = _trace(
+        tool_calls=[
+            ToolCallRecord(name="search", args_summary="q=x") for _ in range(5)
+        ],
+        output_summary="ok",
+    )
+    assert grade_trace(trace) == "failure"
+
+
+def test_empty_output_is_failure() -> None:
+    assert grade_trace(_trace(output_summary=" ")) == "failure"
+
+
+def test_error_string_output_is_failure() -> None:
+    assert grade_trace(_trace(output_summary="Error: boom")) == "failure"
+
+
+def test_minority_tool_errors_still_count_as_success() -> None:
+    trace = _trace(
+        tool_calls=[
+            ToolCallRecord(name="a", ok=True),
+            ToolCallRecord(name="b", ok=True),
+            ToolCallRecord(name="c", ok=False, error="x"),
+        ],
+        output_summary="answer",
+    )
+    assert grade_trace(trace) == "success"
+
+
+def test_failure_when_majority_tool_errors() -> None:
+    trace = _trace(
+        tool_calls=[
+            ToolCallRecord(name="a", ok=False, error="x"),
+            ToolCallRecord(name="b", ok=False, error="x"),
+            ToolCallRecord(name="c", ok=True),
+        ],
+        output_summary="answer",
+    )
+    assert grade_trace(trace) == "failure"
+
+
+def test_clean_run_is_success() -> None:
+    trace = _trace(
+        tool_calls=[ToolCallRecord(name="a", ok=True)],
+        output_summary="answer",
+    )
+    assert grade_trace(trace) == "success"
+
+
+def test_no_signal_is_unknown() -> None:
+    assert grade_trace(_trace()) == "unknown"
diff --git a/lib/crewai/tests/skills/self_improve/test_cli_skills.py b/lib/crewai/tests/skills/self_improve/test_cli_skills.py
new file mode 100644
index 000000000..01d600340
--- /dev/null
+++ b/lib/crewai/tests/skills/self_improve/test_cli_skills.py
@@ -0,0 +1,103 @@
+"""Tests for ``crewai skills proposals`` CLI commands."""
+
+from __future__ import annotations
+
+from pathlib import Path
+from unittest.mock import patch
+
+from click.testing import CliRunner
+import pytest
+
+from crewai.cli.skills_proposals import skills as skills_group
+from crewai.skills.self_improve.models import SkillProposal
+from crewai.skills.self_improve.storage import ProposalStore, SkillStore
+
+
+@pytest.fixture
+def proposal() -> SkillProposal:
+    return SkillProposal(
+        agent_role="researcher",
+        name="cite-sources",
+        description="Always cite sources",
+        body="# Cite Sources\n\nbody.",
+        rationale="seen in 3 traces",
+        confidence=0.75,
+    )
+
+
+@pytest.fixture
+def runner_with_root(tmp_path: Path):
+    """CliRunner with the self-improve root patched at the storage layer."""
+    runner = CliRunner()
+    proposal_store = ProposalStore(root=tmp_path)
+    skill_store = SkillStore(root=tmp_path)
+
+    with patch(
+        "crewai.cli.skills_proposals.ProposalStore", return_value=proposal_store
+    ), patch(
+        "crewai.skills.self_improve.acceptance.ProposalStore",
+        return_value=proposal_store,
+    ), patch(
+        "crewai.skills.self_improve.acceptance.SkillStore", return_value=skill_store
+    ):
+        yield runner, proposal_store, skill_store
+
+
+class TestList:
+    def test_empty(self, runner_with_root) -> None:
+        runner, _, _ = runner_with_root
+        result = runner.invoke(skills_group, ["proposals", "list"])
+        assert result.exit_code == 0
+        assert "no pending proposals" in result.output
+
+    def test_one_pending(self, runner_with_root, proposal: SkillProposal) -> None:
+        runner, ps, _ = runner_with_root
+        ps.save(proposal)
+        result = runner.invoke(skills_group, ["proposals", "list"])
+        assert result.exit_code == 0
+        assert proposal.id in result.output
+        assert "cite-sources" in result.output
+
+
+class TestShow:
+    def test_unknown_id_exits_nonzero(self, runner_with_root) -> None:
+        runner, _, _ = runner_with_root
+        result = runner.invoke(skills_group, ["proposals", "show", "prop_does_not_exist"])
+        assert result.exit_code == 1
+        assert "No proposal" in result.output
+
+    def test_prints_body(self, runner_with_root, proposal: SkillProposal) -> None:
+        runner, ps, _ = runner_with_root
+        ps.save(proposal)
+        result = runner.invoke(skills_group, ["proposals", "show", proposal.id])
+        assert result.exit_code == 0
+        assert "# Cite Sources" in result.output
+        assert "rationale" in result.output
+
+
+class TestAccept:
+    def test_writes_skill_md_and_clears_queue(
+        self, runner_with_root, proposal: SkillProposal
+    ) -> None:
+        runner, ps, ss = runner_with_root
+        ps.save(proposal)
+
+        result = runner.invoke(skills_group, ["proposals", "accept", proposal.id])
+        assert result.exit_code == 0, result.output
+        assert ps.find(proposal.id) is None
+        assert (ss.skill_dir("researcher", "cite-sources") / "SKILL.md").is_file()
+
+    def test_unknown_id_exits_nonzero(self, runner_with_root) -> None:
+        runner, _, _ = runner_with_root
+        result = runner.invoke(skills_group, ["proposals", "accept", "prop_nope"])
+        assert result.exit_code == 1
+
+
+class TestReject:
+    def test_removes_from_queue(self, runner_with_root, proposal: SkillProposal) -> None:
+        runner, ps, _ = runner_with_root
+        ps.save(proposal)
+
+        result = runner.invoke(skills_group, ["proposals", "reject", proposal.id])
+        assert result.exit_code == 0
+        assert ps.find(proposal.id) is None
diff --git a/lib/crewai/tests/skills/self_improve/test_collector.py b/lib/crewai/tests/skills/self_improve/test_collector.py
new file mode 100644
index 000000000..3ac2303dc
--- /dev/null
+++ b/lib/crewai/tests/skills/self_improve/test_collector.py
@@ -0,0 +1,153 @@
+"""Tests for self_improve/collector.py.
+
+The collector is exercised through the real event bus inside a
+``scoped_handlers()`` block. Events are constructed with
+``model_construct`` so we don't need to spin up a real Agent + LLM.
+""" + +from __future__ import annotations + +from datetime import UTC, datetime +from pathlib import Path +from types import SimpleNamespace + +import pytest + +from crewai.events.event_bus import crewai_event_bus +from crewai.events.types.agent_events import ( + AgentExecutionCompletedEvent, + AgentExecutionErrorEvent, + AgentExecutionStartedEvent, +) +from crewai.events.types.tool_usage_events import ( + ToolUsageErrorEvent, + ToolUsageFinishedEvent, + ToolUsageStartedEvent, +) +from crewai.skills.self_improve.collector import TraceCollector +from crewai.skills.self_improve.storage import TraceStore + + +def _fake_agent(*, agent_id: str = "agent-1", role: str = "researcher"): + return SimpleNamespace(id=agent_id, role=role, skills=None) + + +def _fake_task(task_id: str = "task-1", description: str = "do the thing"): + return SimpleNamespace(id=task_id, description=description) + + +def _started(agent, task) -> AgentExecutionStartedEvent: + return AgentExecutionStartedEvent.model_construct( + agent=agent, task=task, tools=[], task_prompt=task.description + ) + + +def _completed(agent, task, output: str) -> AgentExecutionCompletedEvent: + return AgentExecutionCompletedEvent.model_construct( + agent=agent, task=task, output=output + ) + + +def _error(agent, task, msg: str) -> AgentExecutionErrorEvent: + return AgentExecutionErrorEvent.model_construct(agent=agent, task=task, error=msg) + + +def _tool_started(agent_id: str, name: str, args="q=x") -> ToolUsageStartedEvent: + return ToolUsageStartedEvent.model_construct( + agent_id=agent_id, tool_name=name, tool_args=args + ) + + +def _tool_finished(agent_id: str, name: str, args="q=x") -> ToolUsageFinishedEvent: + now = datetime.now(UTC) + return ToolUsageFinishedEvent.model_construct( + agent_id=agent_id, + tool_name=name, + tool_args=args, + started_at=now, + finished_at=now, + output="result", + ) + + +def _tool_error(agent_id: str, name: str, args="q=x") -> ToolUsageErrorEvent: + return ToolUsageErrorEvent.model_construct( + agent_id=agent_id, tool_name=name, tool_args=args, error="boom" + ) + + +@pytest.fixture +def store(tmp_path: Path) -> TraceStore: + return TraceStore(root=tmp_path) + + +def test_collects_full_run_and_persists(store: TraceStore) -> None: + agent = _fake_agent() + task = _fake_task() + collector = TraceCollector(agent, store=store) + + with crewai_event_bus.scoped_handlers(): + collector.attach(crewai_event_bus) + # Flush between emits so the bus thread pool can't reorder + # tool events past the completion event in this fast test path. 
+        crewai_event_bus.emit(None, _started(agent, task))
+        crewai_event_bus.flush(timeout=5.0)
+        crewai_event_bus.emit(None, _tool_started(agent.id, "search"))
+        crewai_event_bus.emit(None, _tool_finished(agent.id, "search"))
+        crewai_event_bus.flush(timeout=5.0)
+        crewai_event_bus.emit(None, _completed(agent, task, "final answer"))
+        crewai_event_bus.flush(timeout=5.0)
+
+    saved = store.list_for_role("researcher")
+    assert len(saved) == 1
+
+    trace = store.load(saved[0])
+    assert trace.agent_role == "researcher"
+    assert trace.task_id == "task-1"
+    assert trace.output_summary == "final answer"
+    assert trace.outcome == "success"
+    assert trace.tool_call_count == 1
+    assert trace.tool_error_count == 0
+
+
+def test_error_path_is_graded_as_failure(store: TraceStore) -> None:
+    agent = _fake_agent()
+    task = _fake_task()
+    collector = TraceCollector(agent, store=store)
+
+    with crewai_event_bus.scoped_handlers():
+        collector.attach(crewai_event_bus)
+        crewai_event_bus.emit(None, _started(agent, task))
+        crewai_event_bus.flush(timeout=5.0)
+        crewai_event_bus.emit(None, _tool_error(agent.id, "search"))
+        crewai_event_bus.flush(timeout=5.0)
+        crewai_event_bus.emit(None, _error(agent, task, "agent crashed"))
+        crewai_event_bus.flush(timeout=5.0)
+
+    [path] = store.list_for_role("researcher")
+    trace = store.load(path)
+    assert trace.outcome == "failure"
+    assert trace.error == "agent crashed"
+    assert trace.tool_error_count == 1
+
+
+def test_ignores_events_for_other_agents(store: TraceStore) -> None:
+    mine = _fake_agent(agent_id="mine", role="researcher")
+    other = _fake_agent(agent_id="other", role="editor")
+    task = _fake_task()
+    collector = TraceCollector(mine, store=store)
+
+    with crewai_event_bus.scoped_handlers():
+        collector.attach(crewai_event_bus)
+        crewai_event_bus.emit(None, _started(mine, task))
+        crewai_event_bus.flush(timeout=5.0)
+        # tool events for some other agent must not pollute our trace
+        crewai_event_bus.emit(None, _tool_finished(other.id, "leaked-tool"))
+        crewai_event_bus.emit(None, _tool_finished(mine.id, "real-tool"))
+        crewai_event_bus.flush(timeout=5.0)
+        crewai_event_bus.emit(None, _completed(mine, task, "ok"))
+        crewai_event_bus.flush(timeout=5.0)
+
+    [path] = store.list_for_role("researcher")
+    trace = store.load(path)
+    assert [t.name for t in trace.tool_calls] == ["real-tool"]
diff --git a/lib/crewai/tests/skills/self_improve/test_models.py b/lib/crewai/tests/skills/self_improve/test_models.py
new file mode 100644
index 000000000..feb5f8aec
--- /dev/null
+++ b/lib/crewai/tests/skills/self_improve/test_models.py
@@ -0,0 +1,82 @@
+"""Tests for self_improve/models.py."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import pytest
+from pydantic import ValidationError
+
+from crewai.skills.self_improve.models import (
+    RunTrace,
+    SelfImprovementConfig,
+    SkillProposal,
+    ToolCallRecord,
+)
+
+
+class TestSelfImprovementConfig:
+    def test_defaults(self) -> None:
+        cfg = SelfImprovementConfig()
+        assert cfg.skills_dir is None
+
+    def test_skills_dir_round_trip(self, tmp_path: Path) -> None:
+        cfg = SelfImprovementConfig(skills_dir=tmp_path / "learned")
+        assert cfg.skills_dir == tmp_path / "learned"
+
+
+class TestRunTrace:
+    def test_minimal_trace_has_defaults(self) -> None:
+        trace = RunTrace(agent_role="researcher")
+        assert trace.agent_role == "researcher"
+        assert trace.outcome == "unknown"
+        assert trace.tool_calls == []
+        assert trace.id.startswith("run_")
+        assert trace.tool_call_count == 0
+        assert trace.tool_error_count == 0
+
+    def test_tool_counters(self) -> None:
+        trace = RunTrace(
+            agent_role="researcher",
+            tool_calls=[
+                ToolCallRecord(name="search", ok=True),
+                ToolCallRecord(name="search", ok=False, error="boom"),
+            ],
+        )
+        assert trace.tool_call_count == 2
+        assert trace.tool_error_count == 1
+
+    def test_serializes_roundtrip(self) -> None:
+        trace = RunTrace(
+            agent_role="researcher",
+            tool_calls=[ToolCallRecord(name="search", args_summary="q=hi")],
+        )
+        payload = trace.model_dump_json()
+        roundtrip = RunTrace.model_validate_json(payload)
+        assert roundtrip.id == trace.id
+        assert roundtrip.tool_calls[0].name == "search"
+
+
+class TestSkillProposal:
+    def test_minimal_proposal(self) -> None:
+        prop = SkillProposal(
+            agent_role="researcher",
+            name="my-skill",
+            description="A skill",
+            body="# body",
+            rationale="seen 3 times",
+            confidence=0.8,
+        )
+        assert prop.proposal_kind == "new"
+        assert prop.id.startswith("prop_")
+
+    def test_confidence_must_be_in_range(self) -> None:
+        with pytest.raises(ValidationError):
+            SkillProposal(
+                agent_role="r",
+                name="n",
+                description="d",
+                body="b",
+                rationale="r",
+                confidence=1.5,
+            )
diff --git a/lib/crewai/tests/skills/self_improve/test_reviewer.py b/lib/crewai/tests/skills/self_improve/test_reviewer.py
new file mode 100644
index 000000000..9f63c421a
--- /dev/null
+++ b/lib/crewai/tests/skills/self_improve/test_reviewer.py
@@ -0,0 +1,192 @@
+"""Tests for self_improve/reviewer.py."""
+
+from __future__ import annotations
+
+from typing import Any
+from unittest.mock import MagicMock
+
+import pytest
+
+from crewai.skills.self_improve.models import RunTrace, SkillProposal, ToolCallRecord
+from crewai.skills.self_improve.reviewer import (
+    SkillReviewer,
+    _ReviewerOutput,
+    _format_trace,
+)
+
+
+def _llm_proposal(**kw):
+    """Build a SkillProposal as the LLM would emit it (no server-filled fields)."""
+    base = dict(
+        name="x",
+        description="d",
+        body="b",
+        rationale="r",
+        confidence=0.7,
+    )
+    base.update(kw)
+    return SkillProposal(**base)
+
+
+def _trace(**kw: Any) -> RunTrace:
+    base = {"agent_role": "researcher", "outcome": "success"}
+    base.update(kw)
+    return RunTrace(**base)
+
+
+def _stub_llm(output: _ReviewerOutput) -> MagicMock:
+    """Return a BaseLLM stand-in whose ``call`` returns the given output."""
+    llm = MagicMock()
+    llm.call = MagicMock(return_value=output)
+    return llm
+
+
+@pytest.fixture
+def reviewer_factory():
+    def make(llm, **kw):
+        return SkillReviewer(
+            agent_role="researcher",
+            agent_goal="answer questions",
+            llm=llm,
+            **kw,
+        )
+
+    return make
+
+
+class TestFormatTrace:
+    def test_includes_outcome_task_and_tool_calls(self) -> None:
+        trace = _trace(
+            task_description="Find papers on X",
+            output_summary="Found 3 papers.",
+            tool_calls=[
+                ToolCallRecord(name="search", args_summary="q=X", ok=True),
+                ToolCallRecord(name="fetch", args_summary="id=42", ok=False, error="404"),
+            ],
+        )
+        block = _format_trace(trace)
+        assert "outcome=success" in block
+        assert "Find papers on X" in block
+        assert "Found 3 papers." in block
+        assert "[ok] search(q=X)" in block
+        assert "[ERR] fetch(id=42)" in block
+
+    def test_truncates_long_output(self) -> None:
+        trace = _trace(output_summary="x" * 5000)
+        block = _format_trace(trace)
+        assert "…" in block
+        assert len(block) < 5000
+
+
+class TestSkillReviewer:
+    def test_returns_empty_when_below_min_traces(self, reviewer_factory) -> None:
+        llm = _stub_llm(_ReviewerOutput(proposals=[]))
+        reviewer = reviewer_factory(llm, min_traces=3)
+        result = reviewer.review([_trace(), _trace()])
+        assert result == []
+        llm.call.assert_not_called()
+
+    def test_filters_by_confidence_floor(self, reviewer_factory) -> None:
+        llm_output = _ReviewerOutput(
+            proposals=[
+                _llm_proposal(name="keep", confidence=0.8),
+                _llm_proposal(name="drop", confidence=0.3),
+            ]
+        )
+        llm = _stub_llm(llm_output)
+        reviewer = reviewer_factory(llm, min_traces=2, confidence_floor=0.6)
+        out = reviewer.review([_trace(), _trace(), _trace()])
+        assert [p.name for p in out] == ["keep"]
+
+    def test_sets_agent_role_and_run_ids(self, reviewer_factory) -> None:
+        llm = _stub_llm(
+            _ReviewerOutput(proposals=[_llm_proposal(name="cite-sources")])
+        )
+        reviewer = reviewer_factory(llm, min_traces=2)
+        traces = [_trace(), _trace(), _trace()]
+        out = reviewer.review(traces)
+        assert len(out) == 1
+        prop = out[0]
+        assert prop.agent_role == "researcher"
+        assert prop.derived_from_runs == [t.id for t in traces]
+        assert prop.proposal_kind == "new"
+
+    def test_passes_loaded_skills_into_prompt(self, reviewer_factory) -> None:
+        llm = _stub_llm(_ReviewerOutput(proposals=[]))
+        reviewer = reviewer_factory(llm, min_traces=2)
+        reviewer.review(
+            [_trace(), _trace(), _trace()],
+            loaded_skill_names=["citing", "search-tactics"],
+        )
+        call_kwargs = llm.call.call_args.kwargs
+        messages = call_kwargs["messages"]
+        system_msg = next(m["content"] for m in messages if m["role"] == "system")
+        assert "citing" in system_msg
+        assert "search-tactics" in system_msg
+
+    def test_handles_non_model_response_gracefully(self, reviewer_factory) -> None:
+        # An LLM that returned a plain string instead of the structured model.
+        llm = MagicMock()
+        llm.call = MagicMock(return_value="totally not a model")
+        reviewer = reviewer_factory(llm, min_traces=2)
+        out = reviewer.review([_trace(), _trace(), _trace()])
+        assert out == []
+
+    def test_pending_proposals_appear_in_prompt(self, reviewer_factory) -> None:
+        llm = _stub_llm(_ReviewerOutput(proposals=[]))
+        reviewer = reviewer_factory(llm, min_traces=2)
+        pending = [
+            SkillProposal(
+                agent_role="researcher",
+                name="cite-sources",
+                description="Always cite sources in research output.",
+                body="b",
+                rationale="r",
+                confidence=0.8,
+            )
+        ]
+        reviewer.review(
+            [_trace(), _trace(), _trace()],
+            pending_proposals=pending,
+        )
+        messages = llm.call.call_args.kwargs["messages"]
+        system_msg = next(m["content"] for m in messages if m["role"] == "system")
+        assert "cite-sources" in system_msg
+        assert "Always cite sources" in system_msg
+        # And it should be in the queued-proposals section, not the loaded
+        # skills section.
+        assert "QUEUED" in system_msg
+
+    def test_pending_proposals_default_none_renders_none(
+        self, reviewer_factory
+    ) -> None:
+        llm = _stub_llm(_ReviewerOutput(proposals=[]))
+        reviewer = reviewer_factory(llm, min_traces=2)
+        reviewer.review([_trace(), _trace(), _trace()])
+        system_msg = next(
+            m["content"]
+            for m in llm.call.call_args.kwargs["messages"]
+            if m["role"] == "system"
+        )
+        # The "(none)" sentinel renders under both sections when nothing was passed.
+ assert system_msg.count("(none)") >= 2 + + def test_patch_existing_kind_passes_through(self, reviewer_factory) -> None: + llm = _stub_llm( + _ReviewerOutput( + proposals=[ + _llm_proposal( + name="citing-v2", + confidence=0.9, + proposal_kind="patch_existing", + target_skill="citing", + ) + ] + ) + ) + reviewer = reviewer_factory(llm, min_traces=2) + [prop] = reviewer.review( + [_trace(), _trace(), _trace()], loaded_skill_names=["citing"] + ) + assert prop.proposal_kind == "patch_existing" + assert prop.target_skill == "citing" diff --git a/lib/crewai/tests/skills/self_improve/test_storage.py b/lib/crewai/tests/skills/self_improve/test_storage.py new file mode 100644 index 000000000..af169cff6 --- /dev/null +++ b/lib/crewai/tests/skills/self_improve/test_storage.py @@ -0,0 +1,69 @@ +"""Tests for self_improve/storage.py.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from crewai.skills.self_improve.models import RunTrace, SkillProposal +from crewai.skills.self_improve.storage import ProposalStore, TraceStore + + +@pytest.fixture +def trace() -> RunTrace: + return RunTrace(agent_role="Senior Researcher", output_summary="hello") + + +@pytest.fixture +def proposal() -> SkillProposal: + return SkillProposal( + agent_role="Senior Researcher", + name="cite-sources", + description="Always cite sources", + body="# body", + rationale="3 of 5 runs cited", + confidence=0.7, + ) + + +class TestTraceStore: + def test_save_and_load_roundtrip(self, tmp_path: Path, trace: RunTrace) -> None: + store = TraceStore(root=tmp_path) + path = store.save(trace) + assert path.exists() + # role gets slugified into the dir name + assert "senior-researcher" in str(path) + + loaded = store.load(path) + assert loaded.id == trace.id + assert loaded.output_summary == "hello" + + def test_list_for_role(self, tmp_path: Path) -> None: + store = TraceStore(root=tmp_path) + for _ in range(3): + store.save(RunTrace(agent_role="researcher")) + assert store.count_for_role("researcher") == 3 + + def test_role_slug_is_filesystem_safe(self, tmp_path: Path) -> None: + store = TraceStore(root=tmp_path) + store.save(RunTrace(agent_role="Weird/Role:Name!")) + # only safe chars survive after slugify + assert any(p.is_dir() for p in store.root.iterdir()) + + +class TestProposalStore: + def test_save_and_load_roundtrip( + self, tmp_path: Path, proposal: SkillProposal + ) -> None: + store = ProposalStore(root=tmp_path) + path = store.save(proposal) + loaded = store.load(path) + assert loaded.id == proposal.id + assert loaded.name == "cite-sources" + + def test_delete(self, tmp_path: Path, proposal: SkillProposal) -> None: + store = ProposalStore(root=tmp_path) + store.save(proposal) + assert store.delete(proposal.id, "Senior Researcher") is True + assert store.delete(proposal.id, "Senior Researcher") is False diff --git a/uv.lock b/uv.lock index 5101cea49..fe366de86 100644 --- a/uv.lock +++ b/uv.lock @@ -13,7 +13,7 @@ resolution-markers = [ ] [options] -exclude-newer = "2026-04-27T16:00:00Z" +exclude-newer = "2026-04-28T07:00:00Z" [manifest] members = [