Compare commits

..

21 Commits

Author SHA1 Message Date
lorenzejay
eca18b03ab linted 2026-05-20 16:04:59 -07:00
lorenzejay
9e8b47f2db feat: add conversational flows documentation and chat session support
- Introduced a new guide for building multi-turn chat applications using , detailing session management and message handling.
- Added  class to facilitate chat interactions, including streaming support and event handling.
- Implemented  for class-level defaults and improved input normalization for conversational turns.
- Enhanced event listeners to manage flow events and tracing more effectively, including support for nested crew executions.
- Added tests for conversational flow helpers and kickoff parameters to ensure functionality and reliability.
2026-05-20 16:04:17 -07:00
alex-clawd
418afd29e7 feat: Skills Repository — registry, cache, CLI, and SDK integration (#5867)
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Vulnerability Scan / pip-audit (push) Waiting to run
* feat: add Skills Repository — registry, cache, CLI, and SDK integration

Adds a Skills Repository feature allowing users to publish, install,
and use skills from the CrewAI registry with @org/skill-name refs.

## What's New

### SDK (lib/crewai/)
- SkillFrontmatter: added optional 'version' field (backward compatible)
- SkillCacheManager: manages ~/.crewai/skills/{org}/{name}/ with
  .crewai_meta.json tracking, path-traversal-safe tar extraction
- SkillRegistry: parse @org/skill-name refs, local-first resolution
  (./skills/ > cache > download), interactive prompt on first use,
  CI-mode guard (CREWAI_NONINTERACTIVE/CI env vars)
- Agent.skills and Crew.skills widened to accept str refs (@org/name)
- set_skills() resolves registry refs with org-prefixed dedup keys
- New events: SkillDownloadStartedEvent, SkillDownloadCompletedEvent

### CLI (lib/cli/)
- crewai skill create <name> — context-aware (project vs standalone)
- crewai skill install @org/name — downloads to ./skills/ or cache
- crewai skill publish — ZIP + upload to org registry
- crewai skill list — show installed skills

### PlusAPI (lib/crewai-core/)
- Added SKILLS_RESOURCE, get_skill(), publish_skill(), list_skills()

### Scaffolding
- crew and flow templates now include skills/ directory

### Tests
- 91 SDK skill tests + 15 CLI skill tests, all passing

* fix: address all CI failures and CodeRabbit review comments

Lint:
- Remove unused imports (click, pytest, json)
- Replace try-except-pass with logging (S110)
- Fix unprotected zipfile.extractall (S202)

Security:
- Path traversal: startswith → is_relative_to for tar extraction
- Add path traversal protection to ZIP extraction via _safe_extract_zip
- Both cache.py and CLI main.py hardened

Type checker:
- Fix import path: crewai.events.event_bus (not crewai_event_bus)
- Remove unused type: ignore comments
- Fix type mismatches in set_skills() variable types

Code quality:
- Fix f-string interpolation in SkillNotCachedError
- Use ValidationError instead of Exception in test

* style: ruff format + autofix remaining lint errors

* refactor: reuse SDK parser and SkillCacheManager in CLI

- _parse_frontmatter() now delegates to crewai.skills.parser.parse_frontmatter
  when available, with a minimal fallback for CLI-only installs
- install() global cache path now reuses SkillCacheManager.store() instead
  of duplicating metadata writing logic

* refactor: add _print_current_organization to SkillCommand (matches ToolCommand pattern)

* fix: write .crewai_meta.json in fallback install path

CodeRabbit caught that the ImportError fallback in install() didn't write
cache metadata, making skills invisible to 'crewai skill list'.

* fix: tighten @org/name ref validation to prevent path traversal

Reject refs with multiple slashes (@org/a/b), dot segments (@../skill),
or leading dots in org/name. Applied to both CLI install() and SDK
parse_registry_ref() so the contract is enforced consistently.

* fix: update test assertions to match tightened error messages

* fix: align OSS client with AMP API contract

- download_skill(): fetch download_url (presigned URL) instead of
  expecting inline base64. Falls back to 'file' field for compat.
- Read 'latest_version' field, fall back to 'version'
- Same fixes applied to CLI install() command

* fix: publish as tar.gz (matches AMP content_type validation) + add zip fallback to SDK cache

CLI publish:
- _build_skill_zip → _build_skill_tarball (tar.gz format)
- Content type: application/x-gzip (matches SkillVersion validation)

SDK cache:
- store() now tries tar.gz first, falls back to zip extraction
- Added _safe_extract_zip for path-traversal-safe zip handling
- Both formats work for download/install regardless of server format

---------

Co-authored-by: João Moura <joaomdmoura@gmail.com>
2026-05-20 14:38:25 -03:00
Greyson LaLonde
7cc1a7bb41 fix(deps): bump pip and paramiko to drop pip-audit ignores
Some checks failed
Build uv cache / build-cache (3.10) (push) Waiting to run
Build uv cache / build-cache (3.11) (push) Waiting to run
Build uv cache / build-cache (3.12) (push) Waiting to run
Build uv cache / build-cache (3.13) (push) Waiting to run
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
OSV no longer flags pip 26.1.1 (GHSA-58qw-9mgm-455v) or paramiko
5.0.0 (GHSA-r374-rxx8-8654), so override both to those minimums
and remove the corresponding --ignore-vuln entries. paramiko is
pulled in transitively via composio-core.
2026-05-20 22:33:43 +08:00
Greyson LaLonde
09ffe87fbb ci: ignore pip-audit findings without published fixes
Adds joblib, markdown, nltk, onnx, pyjwt, torch and transformers
advisories that have no fixed version available (or are disputed)
to the pip-audit ignore list. Rationale recorded next to each ID.
2026-05-20 21:40:30 +08:00
Greyson LaLonde
14af56b74d ci: pin third-party actions to commit SHAs
Replaces version tags (e.g. astral-sh/setup-uv@v6, slackapi/slack-github-action@v2.1.0)
with full commit SHAs across every workflow. Mitigates supply-chain risk from
mutable tags.
2026-05-20 19:01:53 +08:00
Greyson LaLonde
35f693cf68 chore: tighten typing across plus_api client
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Adds typed containers for wire payloads, literal aliases for HTTP method
and log type, and Ffnal markers on resource constants. Updates
upstream returns in project_utils.py and deploy/main.py to match
the new contracts.
2026-05-20 01:43:48 +08:00
Greyson LaLonde
da15554d81 feat: generate categorized release notes for enterprise 2026-05-20 00:24:26 +08:00
Greyson LaLonde
284533464f fix: bump idna to 3.15 to address GHSA-65pc-fj4g-8rjx
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
2026-05-19 23:38:34 +08:00
Tiago Freire
024e230b2c docs: remove {" "} JSX expressions breaking <Steps> render (#5857)
## Overview

Prettier-inserted bare `{" "}` lines between sibling `<Step>` elements caused Mintlify's `<Steps>` to crash with "Cannot read properties of undefined (reading 'stepNumber')", leaving the page body blank.

### Affected pages (en/ar/ko/pt-BR):
- enterprise/guides/enable-crew-studio
- learn/llm-selection-guide
2026-05-19 10:44:53 -04:00
Greyson LaLonde
a4c90b6912 docs: update changelog and version for v1.14.5
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2026-05-19 03:19:40 +08:00
Greyson LaLonde
c50da7a6f2 feat: bump versions to 1.14.5 2026-05-19 03:11:26 +08:00
Irfaan Mansoori
e8aa870f90 fix: memory leak in git.py by using cached_property
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-05-18 21:55:57 +08:00
Greyson LaLonde
14cd81eec6 docs: update changelog and version for v1.14.5a7 2026-05-18 21:13:34 +08:00
Greyson LaLonde
a6225da326 feat: bump versions to 1.14.5a7 2026-05-18 21:08:46 +08:00
Greyson LaLonde
259d334e38 chore(devtools): skip pinning crewai-files in file-processing extra 2026-05-18 21:00:37 +08:00
Greyson LaLonde
42aa8a777c chore: deprecate function_calling_llm field 2026-05-18 20:49:11 +08:00
Heitor Carvalho
a95d26763f docs: update changelog and version for v1.14.5a6 (#5828)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
2026-05-15 17:05:04 -03:00
Heitor Carvalho
65ec783aae feat: bump versions to 1.14.5a6 (#5827) 2026-05-15 16:51:59 -03:00
Greyson LaLonde
eefe0e42ac fix: surface streamed tool calls when available_functions is absent 2026-05-16 02:46:35 +08:00
Greyson LaLonde
75bb882911 fix(deps): bump langsmith to >=0.8.0 for GHSA-3644-q5cj-c5c7
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Vulnerability Scan / pip-audit (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
2026-05-15 21:32:52 +08:00
79 changed files with 7270 additions and 1678 deletions

View File

@@ -26,7 +26,7 @@ jobs:
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
with:
version: "0.11.3"
python-version: ${{ matrix.python-version }}

View File

@@ -22,10 +22,10 @@ jobs:
steps:
- name: Generate GitHub App token
id: app-token
uses: tibdex/github-app-token@v2
uses: actions/create-github-app-token@bcd2ba49218906704ab6c1aa796996da409d3eb1 # v3.2.0
with:
app_id: ${{ secrets.CREWAI_TOOL_SPECS_APP_ID }}
private_key: ${{ secrets.CREWAI_TOOL_SPECS_PRIVATE_KEY }}
app-id: ${{ secrets.CREWAI_TOOL_SPECS_APP_ID }}
private-key: ${{ secrets.CREWAI_TOOL_SPECS_PRIVATE_KEY }}
- name: Checkout code
uses: actions/checkout@v4
@@ -34,7 +34,7 @@ jobs:
token: ${{ steps.app-token.outputs.token }}
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
with:
version: "0.11.3"
python-version: "3.12"

View File

@@ -13,7 +13,7 @@ jobs:
code: ${{ steps.filter.outputs.code }}
steps:
- uses: actions/checkout@v4
- uses: dorny/paths-filter@v3
- uses: dorny/paths-filter@d1c1ffe0248fe513906c8e24db8ea791d46f8590 # v3
id: filter
with:
filters: |
@@ -41,7 +41,7 @@ jobs:
uv-main-py3.11-
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
with:
version: "0.11.3"
python-version: "3.11"

View File

@@ -44,7 +44,7 @@ jobs:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
with:
version: "0.11.3"
python-version: "3.12"
@@ -103,7 +103,7 @@ jobs:
contents: read
steps:
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
with:
version: "0.11.3"
python-version: "3.12"

View File

@@ -10,7 +10,7 @@ jobs:
permissions:
pull-requests: write
steps:
- uses: codelytv/pr-size-labeler@v1
- uses: codelytv/pr-size-labeler@095a41fca88b8764fd9e008ad269bcdb82bb38b9 # v1
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
xs_label: "size/XS"

View File

@@ -12,7 +12,7 @@ jobs:
pr-title:
runs-on: ubuntu-latest
steps:
- uses: amannn/action-semantic-pull-request@v5
- uses: amannn/action-semantic-pull-request@e32d7e603df1aa1ba07e981f2a23455dee596825 # v5
continue-on-error: true
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View File

@@ -34,7 +34,7 @@ jobs:
python-version: "3.12"
- name: Install uv
uses: astral-sh/setup-uv@v4
uses: astral-sh/setup-uv@38f3f104447c67c051c4a08e39b64a148898af3a # v4
- name: Build packages
run: |
@@ -63,7 +63,7 @@ jobs:
ref: ${{ inputs.release_tag || github.ref }}
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
with:
version: "0.11.3"
python-version: "3.12"
@@ -159,7 +159,7 @@ jobs:
- name: Notify Slack
if: success()
uses: slackapi/slack-github-action@v2.1.0
uses: slackapi/slack-github-action@b0fa283ad8fea605de13dc3f449259339835fc52 # v2.1.0
with:
webhook: ${{ secrets.SLACK_WEBHOOK_URL }}
webhook-type: incoming-webhook

View File

@@ -13,7 +13,7 @@ jobs:
code: ${{ steps.filter.outputs.code }}
steps:
- uses: actions/checkout@v4
- uses: dorny/paths-filter@v3
- uses: dorny/paths-filter@d1c1ffe0248fe513906c8e24db8ea791d46f8590 # v3
id: filter
with:
filters: |
@@ -51,7 +51,7 @@ jobs:
uv-main-py${{ matrix.python-version }}-
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
with:
version: "0.11.3"
python-version: ${{ matrix.python-version }}

View File

@@ -13,7 +13,7 @@ jobs:
code: ${{ steps.filter.outputs.code }}
steps:
- uses: actions/checkout@v4
- uses: dorny/paths-filter@v3
- uses: dorny/paths-filter@d1c1ffe0248fe513906c8e24db8ea791d46f8590 # v3
id: filter
with:
filters: |
@@ -48,7 +48,7 @@ jobs:
uv-main-py${{ matrix.python-version }}-
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
with:
version: "0.11.3"
python-version: ${{ matrix.python-version }}

View File

@@ -38,7 +38,7 @@ jobs:
uv-main-py${{ matrix.python-version }}-
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
with:
version: "0.11.3"
python-version: ${{ matrix.python-version }}

View File

@@ -31,7 +31,7 @@ jobs:
uv-main-py3.11-
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@d0cc045d04ccac9d8b7881df0226f9e82c39688e # v6
with:
version: "0.11.3"
python-version: "3.11"
@@ -46,11 +46,39 @@ jobs:
- name: Run pip-audit
run: |
uv run pip-audit --desc --aliases --skip-editable --format json --output pip-audit-report.json \
--ignore-vuln CVE-2026-3219 \
--ignore-vuln GHSA-r374-rxx8-8654
--ignore-vuln PYSEC-2024-277 \
--ignore-vuln PYSEC-2026-89 \
--ignore-vuln PYSEC-2026-97 \
--ignore-vuln PYSEC-2025-148 \
--ignore-vuln PYSEC-2025-183 \
--ignore-vuln PYSEC-2025-189 \
--ignore-vuln PYSEC-2025-190 \
--ignore-vuln PYSEC-2025-191 \
--ignore-vuln PYSEC-2025-192 \
--ignore-vuln PYSEC-2025-193 \
--ignore-vuln PYSEC-2025-194 \
--ignore-vuln PYSEC-2025-195 \
--ignore-vuln PYSEC-2025-196 \
--ignore-vuln PYSEC-2025-197 \
--ignore-vuln PYSEC-2025-210 \
--ignore-vuln PYSEC-2026-139 \
--ignore-vuln PYSEC-2025-211 \
--ignore-vuln PYSEC-2025-212 \
--ignore-vuln PYSEC-2025-213 \
--ignore-vuln PYSEC-2025-214 \
--ignore-vuln PYSEC-2025-215 \
--ignore-vuln PYSEC-2025-216 \
--ignore-vuln PYSEC-2025-217 \
--ignore-vuln PYSEC-2025-218
# Ignored CVEs:
# CVE-2026-3219 - pip 26.0.1 (GHSA-58qw-9mgm-455v): no fix available, archive handling issue
# GHSA-r374-rxx8-8654 - paramiko 4.0.0 (SHA-1 in rsakey.py): no fix available; transitive via composio-core
# PYSEC-2024-277 - joblib 1.5.3: disputed; NumpyArrayWrapper only used with trusted caches
# PYSEC-2026-89 - markdown 3.10.2: DoS via malformed HTML; fix 3.8.1 — already past, advisory range is stale
# PYSEC-2026-97 - nltk 3.9.4: arbitrary file read in filestring(); no fix available
# PYSEC-2025-148 - onnx 1.21.0: path traversal in save_external_data; no fix available
# PYSEC-2025-183 - pyjwt 2.12.1: disputed weak-encryption claim; key length is application-chosen
# PYSEC-2025-189..197 - torch 2.11.0: memory-corruption/DoS in functions only reachable via untrusted models; no fix available
# PYSEC-2025-210, PYSEC-2026-139 - torch 2.11.0: profiler/deserialization issues; no fix available
# PYSEC-2025-211..218 - transformers 5.5.4: deserialization/code injection via malicious model checkpoints; no fix available
continue-on-error: true
- name: Display results

View File

@@ -4,6 +4,86 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="19 مايو 2026">
## v1.14.5
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5)
## ما الذي تغير
### الميزات
- إلغاء استخدام `CrewAgentExecutor`، وتعيين وكلاء الطاقم الافتراضيين إلى `AgentExecutor`
- تحسين أدوات صندوق الرمل Daytona
- إضافة معلمة بدء `restore_from_state_id`
- إضافة تسليط الضوء على `ExaSearchTool`، وإعادة تسميته من `EXASearchTool`
### إصلاحات الأخطاء
- إصلاح تسرب الذاكرة في `git.py` باستخدام `cached_property`
- عرض استدعاءات الأدوات المتدفقة عندما تكون `available_functions` غائبة
- ضمان تحميل أحداث `skills` للتتبع
- تصحيح مسار نقطة النهاية للحالة من `/{kickoff_id}/status` إلى `/status/{kickoff_id}`
- استعادة كتلة الشيفرة المفقودة في دليل التدفق الأول للغة البرتغالية (pt-BR)
- منع `result_as_answer` من إرجاع رسائل الخطأ أو الكتل المرتبطة كإجابة نهائية
- الحفاظ على مخرجات المهام عبر تفريغ الدفعات غير المتزامنة
- دائمًا استعادة `task.output_pydantic` في كتلة finally
- التعامل مع إدخال `BaseModel` في `convert_to_model`
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.5
- إضافة دليل ترقية OSS و انتقال الطاقم إلى التدفق
- توثيق متغيرات البيئة الإضافية لأدوات المطور
- إضافة وثائق لـ `TavilyGetResearch`
### إعادة الهيكلة
- استخراج واجهة سطر الأوامر إلى حزمة مستقلة `crewai-cli`
## المساهمون
@NIK-TIGER-BILL, @akaKuruma, @cgoeppinger, @github-actions[bot], @greysonlalonde, @heitorado, @irfaan101, @iris-clawd, @lorenzejay, @manisrinivasan2k1, @minasami-pr, @mislavivanda, @theCyberTech, @theishangoswami, @wishhyt
</Update>
<Update label="18 مايو 2026">
## v1.14.5a7
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a7)
## ما الذي تغير
### الوثائق
- تحديث سجل التغييرات والإصدار لـ v1.14.5a6
### تغييرات كسرية
- إلغاء حقل function_calling_llm
## المساهمون
@greysonlalonde, @heitorado
</Update>
<Update label="15 مايو 2026">
## v1.14.5a6
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a6)
## ما الذي تغير
### إصلاحات الأخطاء
- إصلاح استدعاءات الأدوات المتدفقة عندما تكون available_functions غائبة
- رفع اعتماد langsmith إلى الإصدار >=0.8.0 لمعالجة GHSA-3644-q5cj-c5c7
- حل مشاكل الأماكن الشاغرة لكتل التعليمات البرمجية غير المترجمة في وثائق البرتغالية البرازيلية
### الوثائق
- إضافة وثائق لـ TavilyGetResearch
- تحديث سجل التغييرات والإصدار لـ v1.14.5a5
## المساهمون
@greysonlalonde, @heitorado, @iris-clawd, @lorenzejay, @manisrinivasan2k1
</Update>
<Update label="13 مايو 2026">
## v1.14.5a5

View File

@@ -146,7 +146,6 @@ Crew Studio هو طريقة مبتكرة لإنشاء طواقم وكلاء ال
</Step>
{" "}
<Step title="الإجابة على الأسئلة">
أجب على أسئلة التوضيح من مساعد الطاقم لتنقيح
متطلباتك.
@@ -161,12 +160,10 @@ Crew Studio هو طريقة مبتكرة لإنشاء طواقم وكلاء ال
</Step>
{" "}
<Step title="الموافقة أو التعديل">
وافق على الخطة أو اطلب تغييرات إذا لزم الأمر.
</Step>
{" "}
<Step title="التنزيل أو النشر">
نزّل الكود للتخصيص أو انشر مباشرة على المنصة.
</Step>

View File

@@ -802,7 +802,6 @@ The tables below show a representative sample of current top-performing models a
Begin with well-established models like **GPT-4.1**, **Claude 3.7 Sonnet**, or **Gemini 2.0 Flash** that offer good performance across multiple dimensions and have extensive real-world validation.
</Step>
{" "}
<Step title="Identify Specialized Needs">
Determine if your crew has specific requirements (coding, reasoning, speed)
that would benefit from specialized models like **Claude 4 Sonnet** for
@@ -810,7 +809,6 @@ The tables below show a representative sample of current top-performing models a
consider fast inference providers like **Groq** alongside model selection.
</Step>
{" "}
<Step title="Implement Multi-Model Strategy">
Use different models for different agents based on their roles.
High-capability models for managers and complex tasks, efficient models for

File diff suppressed because it is too large Load Diff

View File

@@ -4,6 +4,86 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="May 19, 2026">
## v1.14.5
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5)
## What's Changed
### Features
- Deprecate `CrewAgentExecutor`, default Crew agents to `AgentExecutor`
- Improve Daytona sandbox tools
- Add `restore_from_state_id` kickoff parameter
- Add highlights to `ExaSearchTool`, rename from `EXASearchTool`
### Bug Fixes
- Fix memory leak in `git.py` by using `cached_property`
- Surface streamed tool calls when `available_functions` is absent
- Ensure `skills` loading events for traces
- Correct status endpoint path from `/{kickoff_id}/status` to `/status/{kickoff_id}`
- Restore missing code block in pt-BR first-flow guide
- Prevent `result_as_answer` from returning hook-block or error messages as final answer
- Preserve task outputs across async batch flush
- Always restore `task.output_pydantic` in finally block
- Handle `BaseModel` input in `convert_to_model`
### Documentation
- Update changelog and version for v1.14.5
- Add OSS upgrade & crew-to-flow migration guide
- Document additional env vars for devtools
- Add docs for `TavilyGetResearch`
### Refactoring
- Extract CLI into standalone `crewai-cli` package
## Contributors
@NIK-TIGER-BILL, @akaKuruma, @cgoeppinger, @github-actions[bot], @greysonlalonde, @heitorado, @irfaan101, @iris-clawd, @lorenzejay, @manisrinivasan2k1, @minasami-pr, @mislavivanda, @theCyberTech, @theishangoswami, @wishhyt
</Update>
<Update label="May 18, 2026">
## v1.14.5a7
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a7)
## What's Changed
### Documentation
- Update changelog and version for v1.14.5a6
### Breaking Changes
- Deprecate function_calling_llm field
## Contributors
@greysonlalonde, @heitorado
</Update>
<Update label="May 15, 2026">
## v1.14.5a6
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a6)
## What's Changed
### Bug Fixes
- Fix streamed tool calls when available_functions is absent
- Bump langsmith dependency to version >=0.8.0 to address GHSA-3644-q5cj-c5c7
- Resolve untranslated code block placeholders in Brazilian Portuguese documentation
### Documentation
- Add documentation for TavilyGetResearch
- Update changelog and version for v1.14.5a5
## Contributors
@greysonlalonde, @heitorado, @iris-clawd, @lorenzejay, @manisrinivasan2k1
</Update>
<Update label="May 13, 2026">
## v1.14.5a5

View File

@@ -146,7 +146,6 @@ Here's a typical workflow for creating a crew with Crew Studio:
</Step>
{" "}
<Step title="Answer Questions">
Respond to clarifying questions from the Crew Assistant to refine your
requirements.
@@ -161,12 +160,10 @@ Here's a typical workflow for creating a crew with Crew Studio:
</Step>
{" "}
<Step title="Approve or Modify">
Approve the plan or request changes if necessary.
</Step>
{" "}
<Step title="Download or Deploy">
Download the code for customization or deploy directly to the platform.
</Step>

View File

@@ -0,0 +1,150 @@
---
title: Conversational Flows
description: Build multi-turn chat apps with kickoff per turn, message history, and intent routing.
icon: comments
mode: "wide"
---
## One entry point: `kickoff`
Chat apps should use **`flow.kickoff(user_message=..., session_id=...)`** for each user line. Do not add a separate `chat()` method — session identity is `state.id` (same as `inputs["id"]` with `@persist`).
| API | Use for |
|-----|---------|
| `kickoff(user_message=..., session_id=...)` | Each new user message (API, WebSocket turn, CLI loop) |
| `ask()` | Blocking prompt **inside** one step (wizard, clarification) |
| `@human_feedback` | Approve/reject **a step output** before continuing — not the next chat line |
## Quick start
```python
from uuid import uuid4
from pydantic import Field
from crewai.flow import ChatState, Flow, listen, persist, router, start
from crewai.flow.persistence import SQLiteFlowPersistence
@persist(SQLiteFlowPersistence())
class SupportFlow(Flow[ChatState]):
@start()
def bootstrap(self):
if self.state.session_ready:
return "ready"
# load permissions once per session
self.state.session_ready = True
return "ready"
@router(bootstrap)
def route(self):
if self.state.last_intent:
return self.state.last_intent
return self.classify_intent(
self.state.last_user_message,
outcomes=["order", "help", "goodbye"],
llm="gpt-4o-mini",
context=self.conversation_messages,
)
@listen("order")
def handle_order(self):
reply = "Your order is on the way."
self.append_message("assistant", reply)
return reply
@listen("help")
def handle_help(self):
reply = "How can I help?"
self.append_message("assistant", reply)
return reply
session_id = str(uuid4())
flow = SupportFlow()
# Turn 1
flow.kickoff(user_message="Where is my order?", session_id=session_id)
# Turn 2 — same session, flow finished after turn 1 is normal
flow.kickoff(user_message="What about returns?", session_id=session_id)
```
## When the flow finishes but the user keeps chatting
`FlowFinished` means **this graph run** completed. The conversation continues with **another `kickoff`** and the same `session_id`. `@persist` restores `messages`, permissions flags, and context.
For multi-turn chat, prefer **`@persist` on a single terminal step** (for example `finalize`) rather than on the whole `Flow` class. Class-level persist saves after every method; `load_state` uses the latest row, which is often a mid-run snapshot (for example right after `bootstrap`) and can omit handler updates from the same turn.
Do **not** use `@human_feedback` for follow-up questions unless a human must approve a specific payload before it is shown.
## Tracing across turns
By default, `ConversationalConfig(defer_trace_finalization=True)` keeps **one trace batch** for the whole chat session instead of finalizing after every `kickoff()`. Call `flow.finalize_session_traces()` when the user leaves (or use `ChatSession.close()`, which does this automatically).
```python
flow.kickoff(user_message="Hello", session_id=session_id)
flow.kickoff(user_message="Track my order", session_id=session_id)
flow.finalize_session_traces() # one link for the full conversation
```
Crews kicked off **inside** a flow method (for example a research step) append their events to the **parent flow batch**. `CrewKickoffCompleted` does not finalize that batch (even when crew worker threads lose `current_flow_id` context); finalization happens in `finalize_session_traces()`.
Per-turn `flow_finished` is also deferred: only `flow_started` opens the session scope on the first turn, and `flow_finished` runs once in `finalize_session_traces()` so the event bus does not warn about a missing `flow_started`.
## Kickoff parameters
| Parameter | Purpose |
|-----------|---------|
| `user_message` | This turn's user text (also `inputs["user_message"]`) |
| `session_id` | Conversation UUID → `inputs["id"]` / `state.id` |
| `intents` | Optional labels for pre-kickoff `classify_intent` |
| `intent_llm` | LLM for classification (required with `intents`) |
| `interactive=True` | CLI demo loop via `ask()` (not for production APIs) |
Class-level defaults:
```python
from crewai.flow import ConversationalConfig, Flow
class MyFlow(Flow[ChatState]):
conversational_config = ConversationalConfig(
default_intents=["order", "help"],
intent_llm="gpt-4o-mini",
)
```
## Helpers on `Flow`
- `append_message(role, content)` — update `state.messages`
- `conversation_messages` — history for LLM calls
- `classify_intent(text, outcomes, llm=..., context=...)` — route labels (same logic as `@human_feedback` collapse)
- `receive_user_message(text, ...)` — append user line + optional classify
- `input_history` — audit trail from `ask()`
Recommended state shape: `ChatState` (`id`, `messages`, `last_user_message`, `last_intent`, `session_ready`).
## ChatSession (WebSocket / SSE bridge)
For UIs, use `ChatSession` to wrap kickoff and map events to `ChatMessage`:
```python
from crewai.flow import ChatSession
def on_event(msg):
print(msg.type, msg.payload)
session = ChatSession(flow, session_id="channel-1", on_event=on_event)
turn = session.handle_turn("Hello")
print(turn.output, turn.intent)
session.close()
```
`QueueInputProvider` supports blocking `ask()` fed by a WebSocket handler (`provider.push(session_id, text)`).
## Streaming
Enable `stream = True` on the Flow class and use `kickoff` / `ChatSession.handle_turn(..., stream=True)` to emit `assistant_delta` events through `ConversationEventBridge`.

View File

@@ -1,131 +0,0 @@
---
title: Platform Tools CLI
description: Create, publish, and install custom tools on the CrewAI platform using the CLI.
icon: terminal
mode: "wide"
---
## Overview
The CrewAI CLI provides commands to manage custom tools on the **CrewAI platform** — a hosted tool registry that lets you share tools within your organization without publishing to PyPI.
| Command | Purpose |
|---------|---------|
| `crewai tool create <handle>` | Scaffold a new tool project |
| `crewai tool publish` | Publish the tool to the CrewAI platform |
| `crewai tool install <handle>` | Install a platform tool into your crew project |
<Note type="info" title="Platform vs PyPI">
These commands manage tools on the **CrewAI platform registry**. If you want to publish a standalone Python package to PyPI instead, see the [Publish Custom Tools to PyPI](/en/guides/tools/publish-custom-tools) guide.
</Note>
## Prerequisites
- **CrewAI CLI** installed (`pip install crewai`)
- **Authenticated** with the platform — run `crewai login` first
---
## Step 1: Create a Tool Project
Scaffold a new tool project:
```bash
crewai tool create my_custom_tool
```
This generates a project structure with the boilerplate you need to start building your tool.
<Tip>
The `handle` is the unique identifier for your tool on the platform. Choose something descriptive and specific to what the tool does.
</Tip>
### Implement Your Tool
Edit the generated tool file to add your logic. The tool follows the standard CrewAI tools contract — you can subclass `BaseTool` or use the `@tool` decorator:
```python
from crewai.tools import BaseTool
class MyCustomTool(BaseTool):
name: str = "My Custom Tool"
description: str = "Description of what this tool does — be specific so agents know when to use it."
def _run(self, argument: str) -> str:
# Your tool logic here
return "result"
```
For the full tools API reference (input schemas, caching, async support, error handling), see the [Create Custom Tools](/en/learn/create-custom-tools) guide.
---
## Step 2: Publish to the Platform
From your tool project directory, publish it to the CrewAI platform:
```bash
crewai tool publish
```
### Options
| Flag | Description |
|------|-------------|
| `--force` | Bypass Git remote validations |
Tools are published privately to your organization by default.
---
## Step 3: Install a Platform Tool
To install a tool that's been published to the platform:
```bash
crewai tool install my_custom_tool
```
Once installed, you can use the tool in your crew like any other tool — assign it to an agent via the `tools` parameter.
---
## Full Lifecycle Example
```bash
# 1. Authenticate with the platform
crewai login
# 2. Scaffold a new tool
crewai tool create weather_lookup
# 3. Implement your logic in the generated project
cd weather_lookup
# ... edit the tool file ...
# 4. Publish to the platform
crewai tool publish
# 5. In another project, install and use it
crewai tool install weather_lookup
```
---
## Platform Tools vs PyPI Packages
| | Platform Tools | PyPI Packages |
|---|---|---|
| **Publish** | `crewai tool publish` | `uv build` + `uv publish` |
| **Registry** | CrewAI platform | PyPI |
| **Install** | `crewai tool install <handle>` | `pip install <package>` |
| **Auth** | `crewai login` | PyPI account + token |
| **Visibility** | Organization-scoped (private) | Always public |
| **Guide** | This page | [Publish Custom Tools](/en/guides/tools/publish-custom-tools) |
---
## Related
- [Create Custom Tools](/en/learn/create-custom-tools) — Python API reference for building tools (BaseTool, @tool decorator)
- [Publish Custom Tools to PyPI](/en/guides/tools/publish-custom-tools) — package and distribute tools as standalone Python libraries

View File

@@ -12,9 +12,7 @@ incorporating the latest functionalities such as tool delegation, error handling
enabling agents to perform a wide range of actions.
<Tip>
**Want to publish your tool to the CrewAI platform?** Use the CLI to scaffold, publish, and share tools directly on the platform — see the [Platform Tools CLI](/en/guides/tools/platform-tools-cli) guide.
**Prefer publishing to PyPI?** Check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to package and distribute your tool as a standalone Python library.
**Want to publish your tool for the community?** If you're building a tool that others could benefit from, check out the [Publish Custom Tools](/en/guides/tools/publish-custom-tools) guide to learn how to package and distribute your tool on PyPI.
</Tip>
### Subclassing `BaseTool`

View File

@@ -805,7 +805,6 @@ The tables below show a representative sample of current top-performing models a
Begin with well-established models like **GPT-4.1**, **Claude 3.7 Sonnet**, or **Gemini 2.0 Flash** that offer good performance across multiple dimensions and have extensive real-world validation.
</Step>
{" "}
<Step title="Identify Specialized Needs">
Determine if your crew has specific requirements (coding, reasoning, speed)
that would benefit from specialized models like **Claude 4 Sonnet** for
@@ -813,7 +812,6 @@ The tables below show a representative sample of current top-performing models a
consider fast inference providers like **Groq** alongside model selection.
</Step>
{" "}
<Step title="Implement Multi-Model Strategy">
Use different models for different agents based on their roles.
High-capability models for managers and complex tasks, efficient models for

View File

@@ -4,6 +4,86 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 5월 19일">
## v1.14.5
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5)
## 변경 사항
### 기능
- `CrewAgentExecutor` 사용 중단, 기본 Crew 에이전트를 `AgentExecutor`로 설정
- Daytona 샌드박스 도구 개선
- `restore_from_state_id` 시작 매개변수 추가
- `ExaSearchTool`에 하이라이트 추가, 이름을 `EXASearchTool`에서 변경
### 버그 수정
- `git.py`에서 `cached_property`를 사용하여 메모리 누수 수정
- `available_functions`가 없을 때 스트리밍 도구 호출 표시
- 추적을 위한 `skills` 로딩 이벤트 보장
- 상태 엔드포인트 경로를 `/{kickoff_id}/status`에서 `/status/{kickoff_id}`로 수정
- pt-BR 첫 흐름 가이드에서 누락된 코드 블록 복원
- `result_as_answer`가 후크 블록이나 오류 메시지를 최종 답변으로 반환하지 않도록 방지
- 비동기 배치 플러시 간 작업 출력 보존
- 항상 finally 블록에서 `task.output_pydantic` 복원
- `convert_to_model`에서 `BaseModel` 입력 처리
### 문서화
- v1.14.5에 대한 변경 로그 및 버전 업데이트
- OSS 업그레이드 및 Crew-투-흐름 마이그레이션 가이드 추가
- 개발 도구를 위한 추가 환경 변수 문서화
- `TavilyGetResearch`에 대한 문서 추가
### 리팩토링
- CLI를 독립형 `crewai-cli` 패키지로 추출
## 기여자
@NIK-TIGER-BILL, @akaKuruma, @cgoeppinger, @github-actions[bot], @greysonlalonde, @heitorado, @irfaan101, @iris-clawd, @lorenzejay, @manisrinivasan2k1, @minasami-pr, @mislavivanda, @theCyberTech, @theishangoswami, @wishhyt
</Update>
<Update label="2026년 5월 18일">
## v1.14.5a7
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a7)
## 변경 사항
### 문서
- v1.14.5a6의 변경 로그 및 버전 업데이트
### 주요 변경 사항
- function_calling_llm 필드 사용 중단
## 기여자
@greysonlalonde, @heitorado
</Update>
<Update label="2026년 5월 15일">
## v1.14.5a6
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a6)
## 변경 사항
### 버그 수정
- available_functions가 없을 때 스트리밍 도구 호출 수정
- GHSA-3644-q5cj-c5c7 문제를 해결하기 위해 langsmith 의존성을 버전 >=0.8.0으로 업데이트
- 브라질 포르투갈어 문서에서 번역되지 않은 코드 블록 자리 표시자 해결
### 문서
- TavilyGetResearch에 대한 문서 추가
- v1.14.5a5에 대한 변경 로그 및 버전 업데이트
## 기여자
@greysonlalonde, @heitorado, @iris-clawd, @lorenzejay, @manisrinivasan2k1
</Update>
<Update label="2026년 5월 13일">
## v1.14.5a5

View File

@@ -145,7 +145,6 @@ LLM 연결과 기본 설정을 구성했다면 이제 Crew Studio 사용을 시
</Step>
{" "}
<Step title="질문에 답하기">
crew assistant가 요구 사항을 구체화할 수 있도록 하는 추가 질문에 답변하세요.
</Step>
@@ -159,12 +158,10 @@ LLM 연결과 기본 설정을 구성했다면 이제 Crew Studio 사용을 시
</Step>
{" "}
<Step title="승인 또는 수정">
계획을 승인하거나 필요하다면 변경을 요청하세요.
</Step>
{" "}
<Step title="다운로드 또는 배포">
사용자화를 위해 코드를 다운로드하거나 플랫폼에 직접 배포하세요.
</Step>

View File

@@ -797,7 +797,6 @@ LLM 선택을 최적화하고자 하는 팀을 위해 **CrewAI AMP 플랫폼**
여러 차원에서 우수한 성능을 제공하며 실제 환경에서 광범위하게 검증된 **GPT-4.1**, **Claude 3.7 Sonnet**, **Gemini 2.0 Flash**와 같은 잘 알려진 모델부터 시작하십시오.
</Step>
{" "}
<Step title="특화된 요구 사항 식별">
crew에 코드 작성, reasoning, 속도 등 특정 요구가 있는지 확인하고, 이러한
요구에 부합하는 **Claude 4 Sonnet**(개발용) 또는 **o3**(복잡한 분석용)과 같은
@@ -805,7 +804,6 @@ LLM 선택을 최적화하고자 하는 팀을 위해 **CrewAI AMP 플랫폼**
더불어 **Groq**와 같은 빠른 추론 제공자를 고려할 수 있습니다.
</Step>
{" "}
<Step title="다중 모델 전략 구현">
각 에이전트의 역할에 따라 다양한 모델을 사용하세요. 관리자와 복잡한 작업에는
고성능 모델을, 일상적 운영에는 효율적인 모델을 적용합니다.

View File

@@ -4,6 +4,86 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="19 mai 2026">
## v1.14.5
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5)
## O que Mudou
### Recursos
- Deprecar `CrewAgentExecutor`, definir agentes Crew como `AgentExecutor`
- Melhorar ferramentas do sandbox Daytona
- Adicionar parâmetro de início `restore_from_state_id`
- Adicionar destaques ao `ExaSearchTool`, renomeando de `EXASearchTool`
### Correções de Bugs
- Corrigir vazamento de memória em `git.py` usando `cached_property`
- Exibir chamadas de ferramentas transmitidas quando `available_functions` está ausente
- Garantir eventos de carregamento de `skills` para rastros
- Corrigir caminho do endpoint de status de `/{kickoff_id}/status` para `/status/{kickoff_id}`
- Restaurar bloco de código ausente no guia de primeiro fluxo em pt-BR
- Impedir que `result_as_answer` retorne mensagens de bloqueio de hook ou de erro como resposta final
- Preservar saídas de tarefas durante o descarregamento assíncrono em lote
- Sempre restaurar `task.output_pydantic` no bloco finally
- Lidar com entrada de `BaseModel` em `convert_to_model`
### Documentação
- Atualizar changelog e versão para v1.14.5
- Adicionar guia de migração de atualização OSS & crew-to-flow
- Documentar variáveis de ambiente adicionais para devtools
- Adicionar documentação para `TavilyGetResearch`
### Refatoração
- Extrair CLI para o pacote autônomo `crewai-cli`
## Contribuidores
@NIK-TIGER-BILL, @akaKuruma, @cgoeppinger, @github-actions[bot], @greysonlalonde, @heitorado, @irfaan101, @iris-clawd, @lorenzejay, @manisrinivasan2k1, @minasami-pr, @mislavivanda, @theCyberTech, @theishangoswami, @wishhyt
</Update>
<Update label="18 mai 2026">
## v1.14.5a7
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a7)
## O que Mudou
### Documentação
- Atualizar changelog e versão para v1.14.5a6
### Mudanças Quebradoras
- Depreciar o campo function_calling_llm
## Contributors
@greysonlalonde, @heitorado
</Update>
<Update label="15 mai 2026">
## v1.14.5a6
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a6)
## O que mudou
### Correções de Bugs
- Corrigir chamadas de ferramentas transmitidas quando available_functions está ausente
- Atualizar a dependência langsmith para a versão >=0.8.0 para resolver GHSA-3644-q5cj-c5c7
- Resolver espaços reservados de blocos de código não traduzidos na documentação em português brasileiro
### Documentação
- Adicionar documentação para TavilyGetResearch
- Atualizar changelog e versão para v1.14.5a5
## Contributors
@greysonlalonde, @heitorado, @iris-clawd, @lorenzejay, @manisrinivasan2k1
</Update>
<Update label="13 mai 2026">
## v1.14.5a5

View File

@@ -146,7 +146,6 @@ Veja um fluxo de trabalho típico para criação de um crew com o Crew Studio:
</Step>
{" "}
<Step title="Responder Perguntas">
Responda às perguntas de esclarecimento do Crew Assistant para refinar seus
requisitos.
@@ -161,12 +160,10 @@ Veja um fluxo de trabalho típico para criação de um crew com o Crew Studio:
</Step>
{" "}
<Step title="Aprovar ou Modificar">
Aprove o plano ou solicite alterações, se necessário.
</Step>
{" "}
<Step title="Baixar ou Fazer Deploy">
Baixe o código para personalização ou faça o deploy diretamente na plataforma.
</Step>

View File

@@ -797,7 +797,6 @@ As tabelas abaixo mostram uma amostra dos modelos de maior destaque em cada cate
Inicie com opções consagradas como **GPT-4.1**, **Claude 3.7 Sonnet** ou **Gemini 2.0 Flash**, que oferecem bom desempenho e ampla validação.
</Step>
{" "}
<Step title="Identifique Demandas Especializadas">
Descubra se sua crew possui requisitos específicos (código, raciocínio,
velocidade) que justifiquem modelos como **Claude 4 Sonnet** para
@@ -805,7 +804,6 @@ As tabelas abaixo mostram uma amostra dos modelos de maior destaque em cada cate
velocidade, considere Groq aliado à seleção do modelo.
</Step>
{" "}
<Step title="Implemente Estratégia Multi-Modelo">
Use modelos diferentes para agentes distintos conforme o papel. Modelos de
alta capacidade para managers e tarefas complexas, eficientes para rotinas.

View File

@@ -8,7 +8,7 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.5a5",
"crewai-core==1.14.5",
"click~=8.1.7",
"pydantic>=2.11.9,<2.13",
"pydantic-settings~=2.10.1",

View File

@@ -1 +1 @@
__version__ = "1.14.5a5"
__version__ = "1.14.5"

View File

@@ -26,6 +26,7 @@ from crewai_cli.replay_from_task import replay_task_command
from crewai_cli.reset_memories_command import reset_memories_command
from crewai_cli.run_crew import run_crew
from crewai_cli.settings.main import SettingsCommand
from crewai_cli.skills.main import SkillCommand
from crewai_cli.task_outputs import load_task_outputs
from crewai_cli.tools.main import ToolCommand
from crewai_cli.train_crew import train_crew
@@ -546,6 +547,56 @@ def tool_publish(is_public: bool, force: bool) -> None:
tool_cmd.publish(is_public, force)
@crewai.group()
def skill() -> None:
"""Skill Repository related commands."""
@skill.command(name="create")
@click.argument("name")
@click.option(
"--no-project",
"in_project",
is_flag=True,
default=True,
flag_value=False,
help="Create skill in current dir instead of ./skills/",
)
def skill_create(name: str, in_project: bool) -> None:
skill_cmd = SkillCommand()
skill_cmd.create(name, in_project=in_project)
@skill.command(name="install")
@click.argument("ref")
def skill_install(ref: str) -> None:
skill_cmd = SkillCommand()
skill_cmd.install(ref)
@skill.command(name="publish")
@click.option(
"--force",
is_flag=True,
default=False,
show_default=True,
help="Skip git-state validation.",
)
@click.option("--public", "is_public", flag_value=True, default=False)
@click.option("--private", "is_public", flag_value=False)
@click.option("--org", default=None, help="Organisation slug (overrides settings).")
def skill_publish(is_public: bool, org: str | None, force: bool) -> None:
skill_cmd = SkillCommand()
skill_cmd.publish(is_public, org=org, force=force)
@skill.command(name="list")
def skill_list() -> None:
"""List locally installed skills."""
skill_cmd = SkillCommand()
skill_cmd.list_cached()
@crewai.group()
def template() -> None:
"""Browse and install project templates."""

View File

@@ -1,5 +1,6 @@
from typing import Any
from crewai_core.plus_api import CreateCrewPayload
from rich.console import Console
from crewai_cli import git
@@ -161,7 +162,7 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
self,
env_vars: dict[str, str],
remote_repo_url: str,
) -> dict[str, Any]:
) -> CreateCrewPayload:
"""
Create the payload for crew creation.
@@ -172,6 +173,8 @@ class DeployCommand(BaseCommand, PlusAPIMixin):
Returns:
Dict[str, Any]: The payload for crew creation.
"""
if not self.project_name:
raise ValueError("project_name is required to create a deployment payload")
return {
"deploy": {
"name": self.project_name,

View File

@@ -1,4 +1,4 @@
from functools import lru_cache
from functools import cached_property
import subprocess
@@ -9,7 +9,7 @@ class Repository:
if not self.is_git_installed():
raise ValueError("Git is not installed or not found in your PATH.")
if not self.is_git_repo():
if not self.is_git_repo:
raise ValueError(f"{self.path} is not a Git repository.")
self.fetch()
@@ -40,13 +40,9 @@ class Repository:
encoding="utf-8",
).strip()
@lru_cache(maxsize=None) # noqa: B019
@cached_property
def is_git_repo(self) -> bool:
"""Check if the current directory is a git repository.
Notes:
- TODO: This method is cached to avoid redundant checks, but using lru_cache on methods can lead to memory leaks
"""
"""Check if the current directory is a git repository."""
try:
subprocess.check_output(
["git", "rev-parse", "--is-inside-work-tree"], # noqa: S607

View File

@@ -0,0 +1,415 @@
"""Skill Repository CLI commands for CrewAI."""
from __future__ import annotations
import base64
import io
import json
import os
from pathlib import Path
import tarfile
import zipfile
from rich.console import Console
from rich.table import Table
from crewai_cli.command import BaseCommand, PlusAPIMixin
from crewai_cli.config import Settings
from crewai_cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
console = Console()
_SKILL_MD_TEMPLATE = """\
---
name: {name}
version: 0.1.0
description: |
A short description of what this skill does.
---
## Instructions
Describe the skill behaviour here. This section is shown to the agent at activation time.
"""
class SkillCommand(BaseCommand, PlusAPIMixin):
"""Skill Repository related operations for CrewAI projects."""
def __init__(self) -> None:
BaseCommand.__init__(self)
PlusAPIMixin.__init__(self, telemetry=self._telemetry)
# ------------------------------------------------------------------
# create
# ------------------------------------------------------------------
def create(self, name: str, in_project: bool = True) -> None:
"""Scaffold a new skill directory.
If pyproject.toml is present (crew project), creates ./skills/{name}/.
Otherwise creates ./{name}/.
"""
if in_project and os.path.isfile("pyproject.toml"):
skill_dir = Path("skills") / name
else:
skill_dir = Path(name)
if skill_dir.exists():
console.print(f"[red]Directory {skill_dir} already exists.[/red]")
raise SystemExit(1)
skill_dir.mkdir(parents=True)
(skill_dir / "scripts").mkdir()
(skill_dir / "references").mkdir()
(skill_dir / "assets").mkdir()
skill_md = skill_dir / "SKILL.md"
skill_md.write_text(_SKILL_MD_TEMPLATE.format(name=name))
console.print(
f"[green]Created skill [bold]{name}[/bold] at [bold]{skill_dir}[/bold].[/green]"
)
console.print(f"Edit [bold]{skill_md}[/bold] to define the skill instructions.")
# ------------------------------------------------------------------
# install
# ------------------------------------------------------------------
def install(self, ref: str) -> None:
"""Download and install a registry skill.
Format: @org/name
Inside a crew project (pyproject.toml present): installs to ./skills/{name}/
Outside a project: installs to ~/.crewai/skills/{org}/{name}/
"""
if not ref.startswith("@"):
console.print(
"[red]Invalid skill reference. Use the format @org/name.[/red]"
)
raise SystemExit(1)
without_at = ref[1:]
if without_at.count("/") != 1:
console.print(
"[red]Invalid skill reference. Use the format @org/name.[/red]"
)
raise SystemExit(1)
org, name = without_at.split("/", 1)
if (
not org
or not name
or org.startswith(".")
or name.startswith(".")
or len(Path(org).parts) != 1
or len(Path(name).parts) != 1
):
console.print(
"[red]Invalid skill reference: org and name must be single, "
"non-empty path segments (no slashes, no '..').[/red]"
)
raise SystemExit(1)
self._print_current_organization()
console.print(f"[bold blue]Downloading skill {ref}...[/bold blue]")
get_response = self.plus_api_client.get_skill(org, name)
if get_response.status_code == 404:
console.print(
f"[red]Skill {ref} not found. Ensure it has been published and you have access.[/red]"
)
raise SystemExit(1)
if get_response.status_code != 200:
console.print(
f"[red]Failed to download skill {ref}: {get_response.status_code}[/red]"
)
raise SystemExit(1)
data = get_response.json()
version = data.get("latest_version") or data.get("version")
download_url = data.get("download_url")
if download_url:
import httpx
dl_response = httpx.get(download_url, follow_redirects=True)
dl_response.raise_for_status()
archive_bytes = dl_response.content
else:
encoded = data.get("file", "")
if "," in encoded:
encoded = encoded.split(",", 1)[1]
archive_bytes = base64.b64decode(encoded)
in_project = os.path.isfile("pyproject.toml")
if in_project:
dest = Path("skills") / name
dest.mkdir(parents=True, exist_ok=True)
self._unpack_archive(archive_bytes, dest)
console.print(
f"[green]Installed [bold]{ref}[/bold]{' (' + version + ')' if version else ''} to [bold]{dest}[/bold].[/green]"
)
else:
try:
from crewai.skills.cache import SkillCacheManager
cache = SkillCacheManager()
cache.store(org, name, version, archive_bytes)
except ImportError:
# Fallback if SDK not installed — write directly
cache_dir = Path.home() / ".crewai" / "skills" / org / name
if cache_dir.exists():
import shutil
shutil.rmtree(cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
self._unpack_archive(archive_bytes, cache_dir)
# Write metadata so `crewai skill list` can discover it
from datetime import datetime, timezone
meta = {
"org": org,
"name": name,
"version": version,
"installed_at": datetime.now(tz=timezone.utc).isoformat(),
}
(cache_dir / ".crewai_meta.json").write_text(json.dumps(meta, indent=2))
console.print(
f"[green]Installed [bold]{ref}[/bold]{' (' + version + ')' if version else ''} to global cache.[/green]"
)
# ------------------------------------------------------------------
# publish
# ------------------------------------------------------------------
def publish(self, is_public: bool, org: str | None, force: bool = False) -> None:
"""Publish the skill in the current directory to the registry."""
skill_md = Path("SKILL.md")
if not skill_md.exists():
console.print(
"[red]No SKILL.md found in current directory. "
"Run this command from inside a skill directory.[/red]"
)
raise SystemExit(1)
# Parse frontmatter to extract name + version
try:
frontmatter = self._parse_frontmatter(skill_md.read_text())
except ValueError as exc:
console.print(f"[red]Failed to parse SKILL.md frontmatter: {exc}[/red]")
raise SystemExit(1) from exc
name = frontmatter.get("name")
version = frontmatter.get("version")
description = frontmatter.get("description")
if not name:
console.print(
"[red]SKILL.md frontmatter must include a 'name' field.[/red]"
)
raise SystemExit(1)
if not version:
console.print(
"[red]SKILL.md frontmatter must include a 'version' field before publishing.[/red]"
)
raise SystemExit(1)
settings = Settings()
effective_org = org or settings.org_name
if not effective_org:
console.print(
"[red]No organisation set. Run `crewai org switch <org_id>` first, "
"or pass --org.[/red]"
)
raise SystemExit(1)
self._print_current_organization()
console.print(
f"[bold blue]Publishing skill [bold]{name}[/bold] v{version} to {effective_org}...[/bold blue]"
)
archive_bytes = self._build_skill_tarball()
encoded_file = "data:application/x-gzip;base64," + base64.b64encode(
archive_bytes
).decode("utf-8")
response = self.plus_api_client.publish_skill(
org=effective_org,
name=name,
version=version,
is_public=is_public,
description=description,
encoded_file=encoded_file,
)
self._validate_response(response)
base_url = settings.enterprise_base_url or DEFAULT_CREWAI_ENTERPRISE_URL
console.print(
f"[green]Published [bold]{effective_org}/{name}[/bold] v{version}.\n\n"
"Security checks are running in the background. "
"Your skill will be available once checks complete.\n"
f"Monitor status at: {base_url}/crewai_plus/skills/{effective_org}/{name}[/green]"
)
# ------------------------------------------------------------------
# list_cached
# ------------------------------------------------------------------
def list_cached(self) -> None:
"""Show locally installed skills."""
table = Table(title="Installed Skills", show_lines=True)
table.add_column("Source", style="dim")
table.add_column("Ref")
table.add_column("Version")
table.add_column("Path")
# Project-local ./skills/
local_skills_dir = Path("skills")
if local_skills_dir.is_dir():
for skill_dir in sorted(local_skills_dir.iterdir()):
if skill_dir.is_dir() and (skill_dir / "SKILL.md").exists():
version = self._read_version(skill_dir / "SKILL.md")
table.add_row(
"project",
skill_dir.name,
version or "-",
str(skill_dir),
)
# Global cache
cache_root = Path.home() / ".crewai" / "skills"
if cache_root.exists():
for org_dir in sorted(cache_root.iterdir()):
if not org_dir.is_dir():
continue
for skill_dir in sorted(org_dir.iterdir()):
meta_file = skill_dir / ".crewai_meta.json"
if meta_file.exists():
try:
meta = json.loads(meta_file.read_text())
table.add_row(
"cache",
f"@{meta['org']}/{meta['name']}",
meta.get("version") or "-",
str(skill_dir),
)
except (json.JSONDecodeError, KeyError):
console.print(
f"[yellow]Warning: skipping malformed cache entry at {meta_file}[/yellow]"
)
console.print(table)
# ------------------------------------------------------------------
# internal helpers
# ------------------------------------------------------------------
def _print_current_organization(self) -> None:
settings = Settings()
if settings.org_uuid:
console.print(
f"Current organization: {settings.org_name} ({settings.org_uuid})",
style="bold blue",
)
else:
console.print(
"No organization currently set. We recommend setting one before using: "
"`crewai org switch <org_id>` command.",
style="yellow",
)
def _unpack_archive(self, archive_bytes: bytes, dest: Path) -> None:
"""Unpack a .tar.gz or .zip archive into dest."""
# Try tar first, then zip
try:
with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:gz") as tf:
try:
tf.extractall(dest, filter="data")
except TypeError:
_safe_extractall(tf, dest)
return
except tarfile.TarError:
pass
# Fallback: zip
with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
_safe_extract_zip(zf, dest)
def _build_skill_tarball(self) -> bytes:
"""Build an in-memory .tar.gz of SKILL.md + scripts/ + references/ + assets/."""
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tf:
tf.add("SKILL.md")
for folder in ("scripts", "references", "assets"):
folder_path = Path(folder)
if folder_path.is_dir():
for fpath in sorted(folder_path.rglob("*")):
if fpath.is_file():
tf.add(str(fpath))
return buf.getvalue()
def _parse_frontmatter(self, content: str) -> dict[str, str]:
"""Extract YAML frontmatter fields from a SKILL.md string.
Reuses crewai.skills.parser when available, with a minimal
fallback for environments where the full SDK isn't installed.
"""
try:
from crewai.skills.parser import parse_frontmatter
fm_dict, _ = parse_frontmatter(content)
return fm_dict
except ImportError:
pass
# Fallback: minimal YAML parsing without SDK dependency
import re
match = re.match(r"^---\n(.*?)\n---", content, re.DOTALL)
if not match:
raise ValueError("No YAML frontmatter block found")
try:
import yaml
return yaml.safe_load(match.group(1)) or {}
except ImportError:
result: dict[str, str] = {}
for line in match.group(1).splitlines():
if ":" in line:
key, _, value = line.partition(":")
result[key.strip()] = value.strip()
return result
def _read_version(self, skill_md: Path) -> str | None:
"""Read the version field from a SKILL.md file, or None."""
try:
fm = self._parse_frontmatter(skill_md.read_text())
return fm.get("version")
except Exception:
return None
def _safe_extractall(tf: tarfile.TarFile, dest: Path) -> None:
"""Path-traversal-safe extraction for Python < 3.12."""
dest_resolved = dest.resolve()
for member in tf.getmembers():
member_path = (dest / member.name).resolve()
if not member_path.is_relative_to(dest_resolved):
raise ValueError(f"Blocked path traversal attempt: {member.name!r}")
tf.extractall(dest) # noqa: S202
def _safe_extract_zip(zf: zipfile.ZipFile, dest: Path) -> None:
"""Path-traversal-safe ZIP extraction."""
dest_resolved = dest.resolve()
for member in zf.namelist():
member_path = (dest / member).resolve()
if not member_path.is_relative_to(dest_resolved):
raise ValueError(f"Blocked path traversal attempt: {member!r}")
zf.extractall(dest) # noqa: S202

View File

View File

@@ -0,0 +1,205 @@
"""Tests for SkillCommand CLI."""
from __future__ import annotations
import io
import os
import tempfile
import zipfile
from contextlib import contextmanager
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from crewai_cli.shared.token_manager import TokenManager
@contextmanager
def in_temp_dir():
original = os.getcwd()
with tempfile.TemporaryDirectory() as td:
os.chdir(td)
try:
yield td
finally:
os.chdir(original)
@pytest.fixture
def skill_command():
with tempfile.TemporaryDirectory() as temp_dir:
with patch.object(
TokenManager, "_get_secure_storage_path", return_value=Path(temp_dir)
):
TokenManager().save_tokens(
"test-token", (datetime.now() + timedelta(seconds=36000)).timestamp()
)
from crewai_cli.skills.main import SkillCommand
cmd = SkillCommand()
yield cmd
# ---------------------------------------------------------------------------
# create
# ---------------------------------------------------------------------------
class TestSkillCreate:
def test_create_in_project(self, skill_command, tmp_path):
with in_temp_dir():
# Simulate being inside a project
Path("pyproject.toml").write_text("[tool.poetry]\nname = 'test'\n")
skill_command.create("my-skill")
assert Path("skills/my-skill/SKILL.md").exists()
assert Path("skills/my-skill/scripts").is_dir()
assert Path("skills/my-skill/references").is_dir()
assert Path("skills/my-skill/assets").is_dir()
def test_create_outside_project(self, skill_command, tmp_path):
with in_temp_dir():
skill_command.create("standalone-skill", in_project=False)
assert Path("standalone-skill/SKILL.md").exists()
def test_create_adds_name_to_skill_md(self, skill_command):
with in_temp_dir():
skill_command.create("hello-world", in_project=False)
content = Path("hello-world/SKILL.md").read_text()
assert "name: hello-world" in content
assert "version: 0.1.0" in content
def test_create_fails_if_dir_exists(self, skill_command):
with in_temp_dir():
Path("existing-skill").mkdir()
with pytest.raises(SystemExit):
skill_command.create("existing-skill", in_project=False)
# ---------------------------------------------------------------------------
# install
# ---------------------------------------------------------------------------
class TestSkillInstall:
def _zip_skill(self, name: str) -> bytes:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("SKILL.md", f"---\nname: {name}\ndescription: Test.\n---\nInstructions.")
return buf.getvalue()
def test_install_invalid_ref_no_at(self, skill_command):
with pytest.raises(SystemExit):
skill_command.install("acme/my-skill")
def test_install_invalid_ref_no_slash(self, skill_command):
with pytest.raises(SystemExit):
skill_command.install("@acmeskill")
def test_install_404(self, skill_command):
mock_resp = MagicMock()
mock_resp.status_code = 404
skill_command.plus_api_client.get_skill = MagicMock(return_value=mock_resp)
with pytest.raises(SystemExit):
skill_command.install("@acme/ghost")
def test_install_in_project(self, skill_command):
import base64
archive = self._zip_skill("my-skill")
encoded = "data:application/zip;base64," + base64.b64encode(archive).decode()
mock_resp = MagicMock()
mock_resp.status_code = 200
mock_resp.json.return_value = {"file": encoded, "version": "1.0.0"}
skill_command.plus_api_client.get_skill = MagicMock(return_value=mock_resp)
with in_temp_dir():
Path("pyproject.toml").write_text("[tool]\n")
skill_command.install("@acme/my-skill")
assert Path("skills/my-skill/SKILL.md").exists()
# ---------------------------------------------------------------------------
# publish
# ---------------------------------------------------------------------------
class TestSkillPublish:
def test_publish_no_skill_md(self, skill_command):
with in_temp_dir():
with pytest.raises(SystemExit):
skill_command.publish(is_public=True, org="acme")
def test_publish_missing_version(self, skill_command):
with in_temp_dir():
Path("SKILL.md").write_text(
"---\nname: my-skill\ndescription: Test.\n---\nInstructions."
)
with pytest.raises(SystemExit):
skill_command.publish(is_public=True, org="acme")
def test_publish_missing_name(self, skill_command):
with in_temp_dir():
Path("SKILL.md").write_text(
"---\ndescription: Test.\nversion: 1.0.0\n---\nInstructions."
)
with pytest.raises(SystemExit):
skill_command.publish(is_public=True, org="acme")
def test_publish_no_org(self, skill_command):
with in_temp_dir():
Path("SKILL.md").write_text(
"---\nname: my-skill\nversion: 1.0.0\ndescription: Test.\n---\nInstructions."
)
with patch.object(skill_command, "plus_api_client") as mock_client:
mock_resp = MagicMock()
mock_resp.is_success = True
mock_resp.status_code = 200
mock_resp.json.return_value = {}
mock_client.publish_skill.return_value = mock_resp
# No org set → should SystemExit (no org_name in settings)
with patch("crewai_cli.skills.main.Settings") as mock_settings_cls:
mock_settings_cls.return_value.org_name = None
mock_settings_cls.return_value.enterprise_base_url = None
with pytest.raises(SystemExit):
skill_command.publish(is_public=True, org=None)
def test_publish_calls_api(self, skill_command):
with in_temp_dir():
Path("SKILL.md").write_text(
"---\nname: my-skill\nversion: 1.0.0\ndescription: A test skill.\n---\nInstructions."
)
mock_resp = MagicMock()
mock_resp.is_success = True
mock_resp.status_code = 200
mock_resp.json.return_value = {}
skill_command.plus_api_client.publish_skill = MagicMock(return_value=mock_resp)
with patch("crewai_cli.skills.main.Settings") as mock_settings_cls:
mock_settings_cls.return_value.org_name = "acme"
mock_settings_cls.return_value.enterprise_base_url = None
skill_command.publish(is_public=False, org="acme")
skill_command.plus_api_client.publish_skill.assert_called_once()
call_kwargs = skill_command.plus_api_client.publish_skill.call_args
assert call_kwargs.kwargs["name"] == "my-skill"
assert call_kwargs.kwargs["version"] == "1.0.0"
# ---------------------------------------------------------------------------
# list_cached
# ---------------------------------------------------------------------------
class TestSkillListCached:
def test_list_cached_empty(self, skill_command, capsys):
with in_temp_dir():
skill_command.list_cached()
# Should not raise
def test_list_cached_shows_project_skills(self, skill_command, capsys):
with in_temp_dir():
skill_dir = Path("skills/my-skill")
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"---\nname: my-skill\nversion: 0.5.0\ndescription: A skill.\n---\nBody."
)
skill_command.list_cached()
# Should complete without error

View File

@@ -1 +1 @@
__version__ = "1.14.5a5"
__version__ = "1.14.5"

View File

@@ -3,36 +3,162 @@
from __future__ import annotations
import os
from typing import Any
from typing import Any, Final, Literal, TypedDict, cast
from urllib.parse import urljoin
import httpx
from typing_extensions import NotRequired
from crewai_core.constants import DEFAULT_CREWAI_ENTERPRISE_URL
from crewai_core.settings import Settings
from crewai_core.version import get_crewai_version
HttpMethod = Literal["GET", "POST", "PATCH", "DELETE"]
class AvailableExport(TypedDict):
name: str
class EnvVarEntry(TypedDict):
name: str
description: str
required: bool
default: str | None
class ToolMetadata(TypedDict):
name: str
module: str
humanized_name: str
description: str
run_params_schema: dict[str, Any]
init_params_schema: dict[str, Any]
env_vars: list[EnvVarEntry]
class ToolsMetadataPayload(TypedDict):
package: str
tools: list[ToolMetadata] | None
class PublishToolPayload(TypedDict):
handle: str
public: bool
version: str
file: str
description: str | None
available_exports: list[AvailableExport] | None
tools_metadata: ToolsMetadataPayload | None
class CrewDeploymentSpec(TypedDict):
name: str
repo_clone_url: str
env: dict[str, str]
class CreateCrewPayload(TypedDict):
deploy: CrewDeploymentSpec
class _WithUserIdentifier(TypedDict):
user_identifier: NotRequired[str]
class LoginPayload(_WithUserIdentifier):
pass
class TraceExecutionContext(TypedDict):
crew_fingerprint: str | None
crew_name: str | None
flow_name: str | None
crewai_version: str
privacy_level: str
class TraceExecutionMetadata(TypedDict):
expected_duration_estimate: int
agent_count: int
task_count: int
flow_method_count: int
execution_started_at: str
class TraceBatchInitPayload(_WithUserIdentifier):
trace_id: str
execution_type: str
execution_context: TraceExecutionContext
execution_metadata: TraceExecutionMetadata
ephemeral_trace_id: NotRequired[str]
class TraceBatchMetadata(TypedDict):
events_count: int
batch_sequence: int
is_final_batch: bool
class TraceEventsPayload(TypedDict):
events: list[dict[str, Any]]
batch_metadata: TraceBatchMetadata
class TraceFinalizePayload(TypedDict):
status: Literal["completed"]
duration_ms: float | None
final_event_count: int
class TraceFailedPayload(TypedDict):
status: Literal["failed"]
failure_reason: str
Headers = TypedDict(
"Headers",
{
"Content-Type": str,
"User-Agent": str,
"X-Crewai-Version": str,
"Authorization": NotRequired[str],
"X-Crewai-Organization-Id": NotRequired[str],
},
)
class RequestKwargs(TypedDict):
headers: dict[str, str]
json: NotRequired[Any]
params: NotRequired[dict[str, str]]
timeout: NotRequired[float]
class PlusAPI:
"""Client for working with the CrewAI+ API."""
TOOLS_RESOURCE = "/crewai_plus/api/v1/tools"
ORGANIZATIONS_RESOURCE = "/crewai_plus/api/v1/me/organizations"
CREWS_RESOURCE = "/crewai_plus/api/v1/crews"
AGENTS_RESOURCE = "/crewai_plus/api/v1/agents"
TRACING_RESOURCE = "/crewai_plus/api/v1/tracing"
EPHEMERAL_TRACING_RESOURCE = "/crewai_plus/api/v1/tracing/ephemeral"
INTEGRATIONS_RESOURCE = "/crewai_plus/api/v1/integrations"
TOOLS_RESOURCE: Final = "/crewai_plus/api/v1/tools"
SKILLS_RESOURCE: Final = "/crewai_plus/api/v1/skills"
ORGANIZATIONS_RESOURCE: Final = "/crewai_plus/api/v1/me/organizations"
CREWS_RESOURCE: Final = "/crewai_plus/api/v1/crews"
AGENTS_RESOURCE: Final = "/crewai_plus/api/v1/agents"
TRACING_RESOURCE: Final = "/crewai_plus/api/v1/tracing"
EPHEMERAL_TRACING_RESOURCE: Final = "/crewai_plus/api/v1/tracing/ephemeral"
INTEGRATIONS_RESOURCE: Final = "/crewai_plus/api/v1/integrations"
def __init__(self, api_key: str | None = None) -> None:
version = get_crewai_version()
self.api_key = api_key
self.headers = {
self.headers: Headers = {
"Content-Type": "application/json",
"User-Agent": f"CrewAI-CLI/{get_crewai_version()}",
"X-Crewai-Version": get_crewai_version(),
"User-Agent": f"CrewAI-CLI/{version}",
"X-Crewai-Version": version,
}
if api_key:
self.headers["Authorization"] = f"Bearer {api_key}"
settings = Settings()
if settings.org_uuid:
self.headers["X-Crewai-Organization-Id"] = settings.org_uuid
@@ -44,17 +170,30 @@ class PlusAPI:
)
def _make_request(
self, method: str, endpoint: str, **kwargs: Any
self,
method: HttpMethod,
endpoint: str,
*,
json: Any = None,
params: dict[str, str] | None = None,
timeout: float | None = None,
verify: bool = True,
) -> httpx.Response:
url = urljoin(self.base_url, endpoint)
verify = kwargs.pop("verify", True)
request_kwargs: RequestKwargs = {"headers": cast(dict[str, str], self.headers)}
if json is not None:
request_kwargs["json"] = json
if params is not None:
request_kwargs["params"] = params
if timeout is not None:
request_kwargs["timeout"] = timeout
with httpx.Client(trust_env=False, verify=verify) as client:
return client.request(method, url, headers=self.headers, **kwargs)
return client.request(method, url, **request_kwargs)
def login_to_tool_repository(
self, user_identifier: str | None = None
) -> httpx.Response:
payload = {}
payload: LoginPayload = {}
if user_identifier:
payload["user_identifier"] = user_identifier
return self._make_request("POST", f"{self.TOOLS_RESOURCE}/login", json=payload)
@@ -65,7 +204,7 @@ class PlusAPI:
async def get_agent(self, handle: str) -> httpx.Response:
url = urljoin(self.base_url, f"{self.AGENTS_RESOURCE}/{handle}")
async with httpx.AsyncClient() as client:
return await client.get(url, headers=self.headers)
return await client.get(url, headers=cast(dict[str, str], self.headers))
def publish_tool(
self,
@@ -74,10 +213,10 @@ class PlusAPI:
version: str,
description: str | None,
encoded_file: str,
available_exports: list[dict[str, Any]] | None = None,
tools_metadata: list[dict[str, Any]] | None = None,
available_exports: list[AvailableExport] | None = None,
tools_metadata: list[ToolMetadata] | None = None,
) -> httpx.Response:
params = {
params: PublishToolPayload = {
"handle": handle,
"public": is_public,
"version": version,
@@ -90,6 +229,47 @@ class PlusAPI:
}
return self._make_request("POST", f"{self.TOOLS_RESOURCE}", json=params)
def get_skill(
self, org: str, name: str, version: str | None = None
) -> httpx.Response:
params: dict[str, str] = {}
if version is not None:
params["version"] = version
return self._make_request(
"GET",
f"{self.SKILLS_RESOURCE}/{org}/{name}",
params=params or None,
)
def publish_skill(
self,
org: str,
name: str,
version: str,
is_public: bool,
description: str | None,
encoded_file: str,
) -> httpx.Response:
payload = {
"org": org,
"name": name,
"version": version,
"public": is_public,
"description": description,
"file": encoded_file,
}
return self._make_request("POST", self.SKILLS_RESOURCE, json=payload)
def list_skills(self, org: str | None = None) -> httpx.Response:
params: dict[str, str] = {}
if org is not None:
params["org"] = org
return self._make_request(
"GET",
self.SKILLS_RESOURCE,
params=params or None,
)
def deploy_by_name(self, project_name: str) -> httpx.Response:
return self._make_request(
"POST", f"{self.CREWS_RESOURCE}/by-name/{project_name}/deploy"
@@ -129,13 +309,13 @@ class PlusAPI:
def list_crews(self) -> httpx.Response:
return self._make_request("GET", self.CREWS_RESOURCE)
def create_crew(self, payload: dict[str, Any]) -> httpx.Response:
def create_crew(self, payload: CreateCrewPayload) -> httpx.Response:
return self._make_request("POST", self.CREWS_RESOURCE, json=payload)
def get_organizations(self) -> httpx.Response:
return self._make_request("GET", self.ORGANIZATIONS_RESOURCE)
def initialize_trace_batch(self, payload: dict[str, Any]) -> httpx.Response:
def initialize_trace_batch(self, payload: TraceBatchInitPayload) -> httpx.Response:
return self._make_request(
"POST",
f"{self.TRACING_RESOURCE}/batches",
@@ -144,7 +324,7 @@ class PlusAPI:
)
def initialize_ephemeral_trace_batch(
self, payload: dict[str, Any]
self, payload: TraceBatchInitPayload
) -> httpx.Response:
return self._make_request(
"POST",
@@ -153,7 +333,7 @@ class PlusAPI:
)
def send_trace_events(
self, trace_batch_id: str, payload: dict[str, Any]
self, trace_batch_id: str, payload: TraceEventsPayload
) -> httpx.Response:
return self._make_request(
"POST",
@@ -163,7 +343,7 @@ class PlusAPI:
)
def send_ephemeral_trace_events(
self, trace_batch_id: str, payload: dict[str, Any]
self, trace_batch_id: str, payload: TraceEventsPayload
) -> httpx.Response:
return self._make_request(
"POST",
@@ -173,7 +353,7 @@ class PlusAPI:
)
def finalize_trace_batch(
self, trace_batch_id: str, payload: dict[str, Any]
self, trace_batch_id: str, payload: TraceFinalizePayload
) -> httpx.Response:
return self._make_request(
"PATCH",
@@ -183,7 +363,7 @@ class PlusAPI:
)
def finalize_ephemeral_trace_batch(
self, trace_batch_id: str, payload: dict[str, Any]
self, trace_batch_id: str, payload: TraceFinalizePayload
) -> httpx.Response:
return self._make_request(
"PATCH",
@@ -195,20 +375,28 @@ class PlusAPI:
def mark_trace_batch_as_failed(
self, trace_batch_id: str, error_message: str
) -> httpx.Response:
payload: TraceFailedPayload = {
"status": "failed",
"failure_reason": error_message,
}
return self._make_request(
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}",
json={"status": "failed", "failure_reason": error_message},
json=payload,
timeout=30,
)
def mark_ephemeral_trace_batch_as_failed(
self, trace_batch_id: str, error_message: str
) -> httpx.Response:
payload: TraceFailedPayload = {
"status": "failed",
"failure_reason": error_message,
}
return self._make_request(
"PATCH",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}",
json={"status": "failed", "failure_reason": error_message},
json=payload,
timeout=30,
)

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.5a5"
__version__ = "1.14.5"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.5a5",
"crewai==1.14.5",
"tiktoken>=0.8.0,<0.13",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -330,4 +330,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.5a5"
__version__ = "1.14.5"

View File

@@ -8,8 +8,8 @@ authors = [
]
requires-python = ">=3.10, <3.14"
dependencies = [
"crewai-core==1.14.5a5",
"crewai-cli==1.14.5a5",
"crewai-core==1.14.5",
"crewai-cli==1.14.5",
# Core Dependencies
"pydantic>=2.11.9,<2.13",
"openai>=2.30.0,<3",
@@ -54,7 +54,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.5a5",
"crewai-tools==1.14.5",
]
embeddings = [
"tiktoken>=0.8.0,<0.13"
@@ -105,7 +105,7 @@ a2a = [
"aiocache[redis,memcached]~=0.12.3",
]
file-processing = [
"crewai-files==1.14.5a5",
"crewai-files",
]
qdrant-edge = [
"qdrant-edge-py>=0.6.0",

View File

@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.5a5"
__version__ = "1.14.5"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),

View File

@@ -220,7 +220,11 @@ class Agent(BaseAgent):
str | BaseLLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(description="Language model that will run the agent.", default=None)
] = Field(
description="Language model that will run the agent.",
default=None,
deprecated="function_calling_llm is deprecated and will be removed in a future release.",
)
system_template: str | None = Field(
default=None, description="System format for the agent."
)
@@ -430,7 +434,7 @@ class Agent(BaseAgent):
from crewai.crew import Crew
if resolved_crew_skills is None:
crew_skills: list[Path | SkillModel] | None = (
crew_skills: list[Path | SkillModel | str] | None = (
self.crew.skills
if isinstance(self.crew, Crew) and isinstance(self.crew.skills, list)
else None
@@ -442,7 +446,7 @@ class Agent(BaseAgent):
return
needs_work = self.skills and any(
isinstance(s, Path)
isinstance(s, (Path, str))
or (isinstance(s, SkillModel) and s.disclosure_level < INSTRUCTIONS)
for s in self.skills
)
@@ -450,14 +454,28 @@ class Agent(BaseAgent):
return
seen: set[str] = set()
resolved: list[Path | SkillModel] = []
items: list[Path | SkillModel] = list(self.skills) if self.skills else []
resolved: list[Path | SkillModel | str] = []
items: list[Path | SkillModel | str] = list(self.skills) if self.skills else []
if crew_skills:
items.extend(crew_skills)
for item in items:
if isinstance(item, Path):
if isinstance(item, str):
from crewai.skills.registry import (
is_registry_ref,
parse_registry_ref,
resolve_registry_ref,
)
if is_registry_ref(item):
skill = resolve_registry_ref(item, source=self)
org, _ = parse_registry_ref(item)
dedup_key = f"{org}/{skill.name}"
if dedup_key not in seen:
seen.add(dedup_key)
resolved.append(skill)
elif isinstance(item, Path):
discovered = discover_skills(item, source=self)
for skill in discovered:
if skill.name not in seen:

View File

@@ -51,7 +51,10 @@ class LangGraphAgentAdapter(BaseAgentAdapter):
_graph: Any = PrivateAttr(default=None)
_memory: Any = PrivateAttr(default=None)
_max_iterations: int = PrivateAttr(default=10)
function_calling_llm: Any = Field(default=None)
function_calling_llm: Any = Field(
default=None,
deprecated="function_calling_llm is deprecated and will be removed in a future release.",
)
step_callback: SerializableCallable | None = Field(default=None)
model: str = Field(default="gpt-4o")

View File

@@ -60,7 +60,10 @@ class OpenAIAgentAdapter(BaseAgentAdapter):
_openai_agent: OpenAIAgentProtocol = PrivateAttr()
_logger: Logger = PrivateAttr(default_factory=Logger)
_active_thread: str | None = PrivateAttr(default=None)
function_calling_llm: Any = Field(default=None)
function_calling_llm: Any = Field(
default=None,
deprecated="function_calling_llm is deprecated and will be removed in a future release.",
)
step_callback: Any = Field(default=None)
_tool_adapter: OpenAIAgentToolAdapter = PrivateAttr()
_converter_adapter: OpenAIConverterAdapter = PrivateAttr()

View File

@@ -334,9 +334,9 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
"If not set, falls back to crew memory."
),
)
skills: list[Path | Skill] | None = Field(
skills: list[Path | Skill | str] | None = Field(
default=None,
description="Agent Skills. Accepts paths for discovery or pre-loaded Skill objects.",
description="Agent Skills. Accepts paths for discovery, pre-loaded Skill objects, or '@org/name' registry refs.",
min_length=1,
)
execution_context: ExecutionContext | None = Field(default=None)
@@ -429,6 +429,20 @@ class BaseAgent(BaseModel, ABC, metaclass=AgentMeta):
def process_model_config(cls, values: Any) -> dict[str, Any]:
return process_config(values, cls)
@field_validator("skills", mode="before")
@classmethod
def coerce_skill_strings(cls, skills: Any) -> Any:
"""Coerce plain path strings to Path objects; keep @-prefixed refs as str."""
if not isinstance(skills, list):
return skills
result = []
for item in skills:
if isinstance(item, str) and not item.startswith("@"):
result.append(Path(item))
else:
result.append(item)
return result
@field_validator("tools")
@classmethod
def validate_tools(cls, tools: list[Any]) -> list[BaseTool]:

View File

@@ -251,7 +251,11 @@ class Crew(FlowTrackable, BaseModel):
str | LLM | None,
BeforeValidator(_validate_llm_ref),
PlainSerializer(_serialize_llm_ref, return_type=dict | None, when_used="json"),
] = Field(description="Language model that will run the agent.", default=None)
] = Field(
description="Language model that will run the agent.",
default=None,
deprecated="function_calling_llm is deprecated and will be removed in a future release.",
)
config: Json[dict[str, Any]] | dict[str, Any] | None = Field(default=None)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
share_crew: bool | None = Field(default=False)
@@ -337,9 +341,9 @@ class Crew(FlowTrackable, BaseModel):
default=None,
description="Knowledge for the crew.",
)
skills: list[Path | Skill] | None = Field(
skills: list[Path | Skill | str] | None = Field(
default=None,
description="Skill search paths or pre-loaded Skill objects applied to all agents in the crew.",
description="Skill search paths, pre-loaded Skill objects, or '@org/name' registry refs applied to all agents in the crew.",
)
security_config: SecurityConfig = Field(
@@ -522,6 +526,20 @@ class Crew(FlowTrackable, BaseModel):
if max_seq > 0:
set_emission_counter(max_seq)
@field_validator("skills", mode="before")
@classmethod
def coerce_skill_strings(cls, skills: Any) -> Any:
"""Coerce plain path strings to Path objects; keep @-prefixed refs as str."""
if not isinstance(skills, list):
return skills
result = []
for item in skills:
if isinstance(item, str) and not item.startswith("@"):
result.append(Path(item))
else:
result.append(item)
return result
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: UUID4 | None, info: Any) -> UUID4 | None:

View File

@@ -320,20 +320,24 @@ class EventListener(BaseEventListener):
self._telemetry.flow_execution_span(
event.flow_name, list(source._methods.keys())
)
self.formatter.handle_flow_created(event.flow_name, str(source.flow_id))
self.formatter.handle_flow_started(event.flow_name, str(source.flow_id))
if not getattr(source, "suppress_flow_events", False):
self.formatter.handle_flow_created(event.flow_name, str(source.flow_id))
self.formatter.handle_flow_started(event.flow_name, str(source.flow_id))
@crewai_event_bus.on(FlowFinishedEvent)
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self.formatter.handle_flow_status(
event.flow_name,
source.flow_id,
)
if not getattr(source, "suppress_flow_events", False):
self.formatter.handle_flow_status(
event.flow_name,
source.flow_id,
)
@crewai_event_bus.on(MethodExecutionStartedEvent)
def on_method_execution_started(
_: Any, event: MethodExecutionStartedEvent
source: Any, event: MethodExecutionStartedEvent
) -> None:
if getattr(source, "suppress_flow_events", False):
return
self.formatter.handle_method_status(
event.method_name,
"running",
@@ -341,8 +345,10 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def on_method_execution_finished(
_: Any, event: MethodExecutionFinishedEvent
source: Any, event: MethodExecutionFinishedEvent
) -> None:
if getattr(source, "suppress_flow_events", False):
return
self.formatter.handle_method_status(
event.method_name,
"completed",

View File

@@ -222,6 +222,8 @@ To enable tracing later, do any one of these:
return
self.batch_manager.batch_owner_type = None
self.batch_manager.batch_owner_id = None
self.batch_manager.defer_session_finalization = False
self.batch_manager._batch_finalized = False
self.batch_manager.current_batch = None
self.batch_manager.event_buffer.clear()
self.batch_manager.trace_batch_id = None

View File

@@ -6,6 +6,14 @@ import time
from typing import Any
import uuid
from crewai_core.plus_api import (
TraceBatchInitPayload,
TraceBatchMetadata,
TraceEventsPayload,
TraceExecutionContext,
TraceExecutionMetadata,
TraceFinalizePayload,
)
from crewai_core.settings import Settings
from rich.console import Console
from rich.panel import Panel
@@ -62,6 +70,8 @@ class TraceBatchManager:
self.execution_start_times: dict[str, datetime] = {}
self.batch_owner_type: str | None = None
self.batch_owner_id: str | None = None
self.defer_session_finalization: bool = False
self._batch_finalized: bool = False
self.backend_initialized: bool = False
self.ephemeral_trace_url: str | None = None
try:
@@ -93,6 +103,7 @@ class TraceBatchManager:
user_context=user_context, execution_metadata=execution_metadata
)
self.is_current_batch_ephemeral = use_ephemeral
self._batch_finalized = False
self.record_start_time("execution")
@@ -123,25 +134,27 @@ class TraceBatchManager:
return None
try:
payload = {
execution_context: TraceExecutionContext = {
"crew_fingerprint": execution_metadata.get("crew_fingerprint"),
"crew_name": execution_metadata.get("crew_name", None),
"flow_name": execution_metadata.get("flow_name", None),
"crewai_version": self.current_batch.version,
"privacy_level": user_context.get("privacy_level", "standard"),
}
execution_metadata_payload: TraceExecutionMetadata = {
"expected_duration_estimate": execution_metadata.get(
"expected_duration_estimate", 300
),
"agent_count": execution_metadata.get("agent_count", 0),
"task_count": execution_metadata.get("task_count", 0),
"flow_method_count": execution_metadata.get("flow_method_count", 0),
"execution_started_at": datetime.now(timezone.utc).isoformat(),
}
payload: TraceBatchInitPayload = {
"trace_id": self.current_batch.batch_id,
"execution_type": execution_metadata.get("execution_type", "crew"),
"execution_context": {
"crew_fingerprint": execution_metadata.get("crew_fingerprint"),
"crew_name": execution_metadata.get("crew_name", None),
"flow_name": execution_metadata.get("flow_name", None),
"crewai_version": self.current_batch.version,
"privacy_level": user_context.get("privacy_level", "standard"),
},
"execution_metadata": {
"expected_duration_estimate": execution_metadata.get(
"expected_duration_estimate", 300
),
"agent_count": execution_metadata.get("agent_count", 0),
"task_count": execution_metadata.get("task_count", 0),
"flow_method_count": execution_metadata.get("flow_method_count", 0),
"execution_started_at": datetime.now(timezone.utc).isoformat(),
},
"execution_context": execution_context,
"execution_metadata": execution_metadata_payload,
}
if use_ephemeral:
payload["ephemeral_trace_id"] = self.current_batch.batch_id
@@ -264,13 +277,14 @@ class TraceBatchManager:
if not self.plus_api or not self.trace_batch_id or not self.event_buffer:
return 500
try:
payload = {
batch_metadata: TraceBatchMetadata = {
"events_count": len(self.event_buffer),
"batch_sequence": 1,
"is_final_batch": False,
}
payload: TraceEventsPayload = {
"events": [event.to_dict() for event in self.event_buffer],
"batch_metadata": {
"events_count": len(self.event_buffer),
"batch_sequence": 1,
"is_final_batch": False,
},
"batch_metadata": batch_metadata,
}
response = (
@@ -301,6 +315,9 @@ class TraceBatchManager:
def finalize_batch(self) -> TraceBatch | None:
"""Finalize batch and return it for sending"""
if self._batch_finalized:
return None
if not self.current_batch or not is_tracing_enabled_in_context():
return None
@@ -329,10 +346,8 @@ class TraceBatchManager:
self.current_batch.events = sorted_events
events_sent_count = len(sorted_events)
if sorted_events:
original_buffer = self.event_buffer
self.event_buffer = sorted_events
events_sent_to_backend_status = self._send_events_to_backend()
self.event_buffer = original_buffer
if events_sent_to_backend_status == 500 and self.trace_batch_id:
self._mark_batch_as_failed(
self.trace_batch_id, "Error sending events to backend"
@@ -349,6 +364,7 @@ class TraceBatchManager:
self.event_buffer.clear()
self.trace_batch_id = None
self.is_current_batch_ephemeral = False
self._batch_finalized = True
self._cleanup_batch_data()
@@ -360,11 +376,11 @@ class TraceBatchManager:
Args:
events_count: Number of events that were successfully sent
"""
if not self.plus_api or not self.trace_batch_id:
if self._batch_finalized or not self.plus_api or not self.trace_batch_id:
return
try:
payload = {
payload: TraceFinalizePayload = {
"status": "completed",
"duration_ms": self.calculate_duration("execution"),
"final_event_count": events_count,
@@ -379,6 +395,7 @@ class TraceBatchManager:
)
if response.status_code == 200:
self._batch_finalized = True
access_code = response.json().get("access_code", None)
console = Console()
settings = Settings()

View File

@@ -1,5 +1,6 @@
"""Trace collection listener for orchestrating trace collection."""
from datetime import datetime, timezone
import os
from typing import Any, ClassVar
import uuid
@@ -264,18 +265,18 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source: Any, event: CrewKickoffStartedEvent) -> None:
if self.batch_manager.batch_owner_type != "flow":
# Always call _initialize_crew_batch to claim ownership.
# If batch was already initialized by a concurrent action event
# (e.g. LLM/tool before crew_kickoff_started), initialize_batch()
# returns early but batch_owner_type is still correctly set to "crew".
# Skip only when a parent flow already owns the batch.
# Nested crew inside Flow.kickoff: never claim an existing flow session batch.
if not self._nested_in_flow_execution() and (
not self.batch_manager.is_batch_initialized()
):
self._initialize_crew_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source: Any, event: CrewKickoffCompletedEvent) -> None:
self._handle_trace_event("crew_kickoff_completed", source, event)
if self._nested_in_flow_execution():
return
if self.batch_manager.batch_owner_type == "crew":
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
@@ -286,10 +287,12 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source: Any, event: CrewKickoffFailedEvent) -> None:
self._handle_trace_event("crew_kickoff_failed", source, event)
if self._nested_in_flow_execution():
return
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
self.first_time_handler.handle_execution_completion()
else:
elif self.batch_manager.batch_owner_type == "crew":
self.batch_manager.finalize_batch()
@event_bus.on(TaskStartedEvent)
@@ -708,8 +711,32 @@ class TraceCollectionListener(BaseEventListener):
@on_signal
def handle_signal(source: Any, event: SignalEvent) -> None:
"""Flush trace batch on system signals to prevent data loss."""
if self.batch_manager.is_batch_initialized():
self.batch_manager.finalize_batch()
if not self.batch_manager.is_batch_initialized():
return
# Multi-turn flows defer batch finalization to finalize_session_traces().
if self.batch_manager.defer_session_finalization:
return
self.batch_manager.finalize_batch()
@staticmethod
def _is_inside_active_flow_context() -> bool:
"""True when ``kickoff_async`` has set ``current_flow_id`` (nested crew)."""
from crewai.flow.flow_context import current_flow_id
return current_flow_id.get() is not None
def _flow_owns_trace_batch(self) -> bool:
"""True when an in-flight conversational flow already owns the trace batch."""
if self.batch_manager.batch_owner_type == "flow":
return True
batch = self.batch_manager.current_batch
if batch is not None:
return batch.execution_metadata.get("execution_type") == "flow"
return False
def _nested_in_flow_execution(self) -> bool:
"""True when a crew runs inside a flow session (context or batch ownership)."""
return self._is_inside_active_flow_context() or self._flow_owns_trace_batch()
def _initialize_crew_batch(self, source: Any, event: BaseEvent) -> None:
"""Initialize trace batch.
@@ -730,6 +757,31 @@ class TraceCollectionListener(BaseEventListener):
self._initialize_batch(user_context, execution_metadata)
def _try_initialize_flow_batch_from_context(self, event: Any) -> bool:
"""Claim a flow trace batch when an action event fires inside kickoff.
Flows with ``suppress_flow_events=True`` skip ``FlowStartedEvent``, so
LLM/tool events must not fall back to implicit crew batches.
"""
from crewai.flow.flow_context import current_flow_id, current_flow_name
flow_id = current_flow_id.get()
if flow_id is None:
return False
started_at = getattr(event, "timestamp", None) or datetime.now(timezone.utc)
user_context = self._get_user_context()
execution_metadata = {
"flow_name": current_flow_name.get() or "Unknown Flow",
"execution_start": started_at,
"crewai_version": get_crewai_version(),
"execution_type": "flow",
}
self.batch_manager.batch_owner_type = "flow"
self.batch_manager.batch_owner_id = flow_id
self._initialize_batch(user_context, execution_metadata)
return True
def _initialize_flow_batch(self, source: Any, event: BaseEvent) -> None:
"""Initialize trace batch for Flow execution.
@@ -794,12 +846,19 @@ class TraceCollectionListener(BaseEventListener):
event: Event object.
"""
if not self.batch_manager.is_batch_initialized():
user_context = self._get_user_context()
execution_metadata = {
"crew_name": getattr(source, "name", "Unknown Crew"),
"crewai_version": get_crewai_version(),
}
self._initialize_batch(user_context, execution_metadata)
if self._try_initialize_flow_batch_from_context(event):
pass
elif not self._nested_in_flow_execution():
user_context = self._get_user_context()
execution_metadata = {
"crew_name": getattr(source, "name", "Unknown Crew"),
"crewai_version": get_crewai_version(),
}
self.batch_manager.batch_owner_type = "crew"
self.batch_manager.batch_owner_id = getattr(
source, "id", str(uuid.uuid4())
)
self._initialize_batch(user_context, execution_metadata)
self.batch_manager.begin_event_processing()
try:

View File

@@ -60,3 +60,20 @@ class SkillLoadFailedEvent(SkillEvent):
type: Literal["skill_load_failed"] = "skill_load_failed"
error: str
class SkillDownloadStartedEvent(SkillEvent):
"""Event emitted when a registry skill download begins."""
type: Literal["skill_download_started"] = "skill_download_started"
registry_ref: str
version: str | None = None
class SkillDownloadCompletedEvent(SkillEvent):
"""Event emitted when a registry skill download completes."""
type: Literal["skill_download_completed"] = "skill_download_completed"
registry_ref: str
version: str | None = None
cache_path: Path | None = None

View File

@@ -4,12 +4,24 @@ from crewai.flow.async_feedback import (
HumanFeedbackProvider,
PendingFeedbackContext,
)
from crewai.flow.chat import (
ChatMessage,
ChatSession,
ConversationEventBridge,
TurnResult,
)
from crewai.flow.conversation import (
ChatState,
ConversationalConfig,
ConversationalInputs,
)
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.flow.flow_config import flow_config
from crewai.flow.flow_serializer import flow_structure
from crewai.flow.human_feedback import HumanFeedbackResult, human_feedback
from crewai.flow.input_provider import InputProvider, InputResponse
from crewai.flow.persistence import persist
from crewai.flow.providers import QueueInputProvider
from crewai.flow.visualization import (
FlowStructure,
build_flow_structure,
@@ -18,7 +30,13 @@ from crewai.flow.visualization import (
__all__ = [
"ChatMessage",
"ChatSession",
"ChatState",
"ConsoleProvider",
"ConversationEventBridge",
"ConversationalConfig",
"ConversationalInputs",
"Flow",
"FlowStructure",
"HumanFeedbackPending",
@@ -27,6 +45,8 @@ __all__ = [
"InputProvider",
"InputResponse",
"PendingFeedbackContext",
"QueueInputProvider",
"TurnResult",
"and_",
"build_flow_structure",
"flow_config",

View File

@@ -0,0 +1,310 @@
"""Transport-agnostic chat session bridge for conversational flows."""
from __future__ import annotations
from collections.abc import Callable, Iterator, Sequence
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Literal
from uuid import uuid4
from pydantic import BaseModel, Field
from crewai.flow.conversation import (
get_conversation_messages,
get_conversational_config,
)
from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.flow.flow import Flow
from crewai.llms.base_llm import BaseLLM
from crewai.types.streaming import FlowStreamingOutput
ChatMessageType = Literal[
"user_message",
"assistant_delta",
"assistant_done",
"turn_started",
"turn_finished",
"error",
"tool_started",
"tool_finished",
]
class ChatMessage(BaseModel):
"""Versioned wire format for chat UIs (WebSocket, SSE, webhooks)."""
version: str = "1"
type: ChatMessageType
session_id: str
payload: dict[str, Any] = Field(default_factory=dict)
seq: int | None = None
@dataclass
class TurnResult:
"""Outcome of a single conversational turn."""
session_id: str
output: Any
intent: str | None = None
messages: list[LLMMessage] = field(default_factory=list)
streaming: FlowStreamingOutput | None = None
class ChatSession:
"""Wraps ``Flow.kickoff`` for one chat session (``state.id``)."""
def __init__(
self,
flow: Flow[Any],
session_id: str | None = None,
*,
intents: Sequence[str] | None = None,
intent_llm: str | BaseLLM | None = None,
on_event: Callable[[ChatMessage], None] | None = None,
) -> None:
self._flow = flow
self._session_id = session_id or str(uuid4())
self._intents = list(intents) if intents else None
self._intent_llm = intent_llm
self._on_event = on_event
self._seq = 0
self._bridge: ConversationEventBridge | None = None
config = get_conversational_config(flow)
if config is not None and config.defer_trace_finalization:
flow.defer_trace_finalization = True
if on_event is not None:
self._bridge = ConversationEventBridge(
session_id=self._session_id,
handler=on_event,
)
self._bridge.register()
@property
def session_id(self) -> str:
return self._session_id
def handle_turn(
self,
user_message: str,
*,
stream: bool | None = None,
) -> TurnResult:
"""Run one conversational turn and return output plus message history."""
self._emit("turn_started", {"user_message": user_message})
use_stream = stream if stream is not None else bool(self._flow.stream)
try:
result = self._flow.kickoff(
user_message=user_message,
session_id=self._session_id,
intents=self._intents,
intent_llm=self._intent_llm,
)
except Exception as exc:
self._emit("error", {"message": str(exc)})
raise
streaming = None
output: Any = result
if use_stream and hasattr(result, "__iter__"):
from crewai.types.streaming import FlowStreamingOutput
if isinstance(result, FlowStreamingOutput):
streaming = result
for chunk in result:
text = getattr(chunk, "content", None) or str(chunk)
self._emit("assistant_delta", {"chunk": text})
output = result.result
else:
for chunk in result:
text = getattr(chunk, "content", None) or str(chunk)
self._emit("assistant_delta", {"chunk": text})
intent = None
state = self._flow.state
if hasattr(state, "last_intent"):
intent = getattr(state, "last_intent", None)
elif isinstance(state, dict):
intent = state.get("last_intent")
messages = get_conversation_messages(self._flow)
self._emit(
"assistant_done",
{"output": output, "intent": intent},
)
self._emit("turn_finished", {"output": output})
return TurnResult(
session_id=self._session_id,
output=output,
intent=intent,
messages=messages,
streaming=streaming,
)
def iter_turn_stream(
self,
user_message: str,
) -> Iterator[ChatMessage]:
"""Run a streaming turn and yield ``ChatMessage`` events."""
collected: list[ChatMessage] = []
def _collect(msg: ChatMessage) -> None:
collected.append(msg)
prior = self._on_event
self._on_event = _collect
if self._bridge is None:
self._bridge = ConversationEventBridge(
session_id=self._session_id,
handler=_collect,
)
self._bridge.register()
try:
self.handle_turn(user_message, stream=True)
finally:
self._on_event = prior
yield from collected
def close(self) -> None:
if self._bridge is not None:
self._bridge.unregister()
self._bridge = None
if self._flow._should_defer_trace_finalization():
self._flow.finalize_session_traces()
def _emit(self, msg_type: ChatMessageType, payload: dict[str, Any]) -> None:
if self._on_event is None:
return
self._seq += 1
self._on_event(
ChatMessage(
type=msg_type,
session_id=self._session_id,
payload=payload,
seq=self._seq,
)
)
class ConversationEventBridge:
"""Maps CrewAI bus events to ``ChatMessage`` for a session."""
def __init__(
self,
session_id: str,
handler: Callable[[ChatMessage], None],
) -> None:
self._session_id = session_id
self._handler = handler
self._seq = 0
self._handlers: list[Any] = []
def register(self) -> None:
from crewai.events import crewai_event_bus
from crewai.events.types.flow_events import FlowFinishedEvent
from crewai.events.types.llm_events import (
LLMStreamChunkEvent,
LLMThinkingChunkEvent,
)
from crewai.events.types.tool_usage_events import (
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
bus = crewai_event_bus
@bus.on(LLMStreamChunkEvent)
def _on_chunk(_source: Any, event: LLMStreamChunkEvent) -> None:
if not self._matches(event):
return
chunk = getattr(event, "chunk", None)
if chunk:
self._dispatch(
"assistant_delta",
{"chunk": chunk, "agent_role": getattr(event, "agent_role", "")},
)
@bus.on(LLMThinkingChunkEvent)
def _on_thinking(_source: Any, event: LLMThinkingChunkEvent) -> None:
if not self._matches(event):
return
chunk = getattr(event, "chunk", None)
if chunk:
self._dispatch(
"assistant_delta",
{
"chunk": chunk,
"thinking": True,
"agent_role": getattr(event, "agent_role", ""),
},
)
@bus.on(ToolUsageStartedEvent)
def _on_tool_start(_source: Any, event: ToolUsageStartedEvent) -> None:
if not self._matches(event):
return
self._dispatch(
"tool_started",
{"tool_name": getattr(event, "tool_name", "")},
)
@bus.on(ToolUsageFinishedEvent)
def _on_tool_end(_source: Any, event: ToolUsageFinishedEvent) -> None:
if not self._matches(event):
return
self._dispatch(
"tool_finished",
{"tool_name": getattr(event, "tool_name", "")},
)
@bus.on(FlowFinishedEvent)
def _on_finished(_source: Any, event: FlowFinishedEvent) -> None:
if not self._matches(event):
return
self._dispatch("turn_finished", {"result": getattr(event, "result", None)})
self._handlers = [
_on_chunk,
_on_thinking,
_on_tool_start,
_on_tool_end,
_on_finished,
]
def unregister(self) -> None:
self._handlers.clear()
def _matches(self, event: Any) -> bool:
meta = getattr(event, "fingerprint_metadata", None) or {}
if isinstance(meta, dict) and meta.get("conversation_id") == self._session_id:
return True
fp = getattr(event, "source_fingerprint", None)
return fp == self._session_id
def _dispatch(self, msg_type: ChatMessageType, payload: dict[str, Any]) -> None:
self._seq += 1
self._handler(
ChatMessage(
type=msg_type,
session_id=self._session_id,
payload=payload,
seq=self._seq,
)
)
def stamp_conversation_fingerprint(event: Any, session_id: str) -> None:
"""Stamp ``conversation_id`` on an event before dispatch to external systems."""
if not getattr(event, "source_fingerprint", None):
event.source_fingerprint = session_id
meta = getattr(event, "fingerprint_metadata", None)
if meta is None:
event.fingerprint_metadata = {"conversation_id": session_id}
elif isinstance(meta, dict):
meta.setdefault("conversation_id", session_id)

View File

@@ -0,0 +1,243 @@
"""Conversational turn helpers for CrewAI Flows.
Provides message history utilities, kickoff input normalization, and optional
class-level defaults via ``ConversationalConfig``. Session identity is ``state.id``
(``inputs["id"]`` / ``kickoff(session_id=...)``), not a separate Flow field.
"""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Literal, TypedDict, cast
from uuid import uuid4
from pydantic import BaseModel, Field
from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.flow.flow import Flow
from crewai.llms.base_llm import BaseLLM
TurnMode = Literal["auto", "follow_up", "initial"]
_EXIT_COMMANDS_DEFAULT: tuple[str, ...] = ("exit", "quit")
class ConversationalInputs(TypedDict, total=False):
"""Conventional ``kickoff(inputs=...)`` keys for chat turns."""
id: str
user_message: str | dict[str, Any]
last_intent: str
@dataclass
class ConversationalConfig:
"""Optional class-level defaults for conversational flows.
Override per kickoff via ``user_message``, ``session_id``, ``intents``, etc.
"""
default_intents: Sequence[str] | None = None
intent_llm: str | None = None
interactive_prompt: str = "You: "
interactive_timeout: float | None = None
exit_commands: Sequence[str] = field(default_factory=lambda: _EXIT_COMMANDS_DEFAULT)
defer_trace_finalization: bool = True
class ChatState(BaseModel):
"""Recommended persisted state shape for multi-turn flows."""
id: str = Field(default_factory=lambda: str(uuid4()))
messages: list[LLMMessage] = Field(default_factory=list)
last_user_message: str | None = None
last_intent: str | None = None
session_ready: bool = False
def _coerce_user_message_text(user_message: str | dict[str, Any] | Any) -> str:
if isinstance(user_message, str):
return user_message
if isinstance(user_message, dict):
content = user_message.get("content")
if content is not None:
return str(content)
return str(user_message)
def normalize_kickoff_inputs(
inputs: dict[str, Any] | None,
*,
user_message: str | dict[str, Any] | None = None,
session_id: str | None = None,
) -> dict[str, Any]:
"""Merge conversational kickoff kwargs into the inputs dict."""
merged: dict[str, Any] = dict(inputs or {})
if session_id is not None:
merged["id"] = session_id
if user_message is not None:
merged["user_message"] = user_message
elif "user_message" in merged and isinstance(merged["user_message"], str):
pass
return merged
def get_conversation_messages(flow: Flow[Any]) -> list[LLMMessage]:
"""Read message history from flow state or the internal fallback buffer."""
buffer: list[LLMMessage] = getattr(flow, "_conversation_messages", [])
state = getattr(flow, "_state", None)
if state is None:
return list(buffer)
if isinstance(state, dict):
messages = state.get("messages")
if isinstance(messages, list):
return cast(list[LLMMessage], messages)
elif isinstance(state, BaseModel) and hasattr(state, "messages"):
messages = getattr(state, "messages", None)
if isinstance(messages, list):
return cast(list[LLMMessage], messages)
return list(buffer)
def append_message(
flow: Flow[Any],
role: Literal["user", "assistant", "system", "tool"],
content: str,
**extra: Any,
) -> None:
"""Append a message to ``state.messages`` or the flow fallback buffer."""
message: LLMMessage = {"role": role, "content": content}
for key, value in extra.items():
if key in ("tool_call_id", "name", "tool_calls", "files"):
message[key] = value # type: ignore[literal-required]
state = getattr(flow, "_state", None)
if state is not None:
if isinstance(state, dict):
messages = state.get("messages")
if isinstance(messages, list):
messages.append(message)
return
elif isinstance(state, BaseModel) and hasattr(state, "messages"):
messages = getattr(state, "messages", None)
if messages is None:
object.__setattr__(state, "messages", [])
messages = state.messages
if isinstance(messages, list):
messages.append(message)
return
if not hasattr(flow, "_conversation_messages"):
object.__setattr__(flow, "_conversation_messages", [])
flow._conversation_messages.append(message)
def set_state_field(flow: Flow[Any], name: str, value: Any) -> None:
"""Set a field on structured or dict flow state when present."""
state = getattr(flow, "_state", None)
if state is None:
return
if isinstance(state, dict):
state[name] = value
elif isinstance(state, BaseModel) and hasattr(state, name):
object.__setattr__(state, name, value)
def receive_user_message(
flow: Flow[Any],
text: str,
*,
outcomes: Sequence[str] | None = None,
llm: str | BaseLLM | None = None,
metadata: dict[str, Any] | None = None,
) -> str:
"""Record a user turn: append message and optionally classify intent."""
append_message(flow, "user", text)
set_state_field(flow, "last_user_message", text)
if outcomes and llm is not None:
intent = flow.classify_intent(
text,
outcomes,
llm=llm,
context=get_conversation_messages(flow),
)
set_state_field(flow, "last_intent", intent)
return intent
return text
def prepare_conversational_turn(
flow: Flow[Any],
*,
user_message: str | dict[str, Any] | None = None,
intents: Sequence[str] | None = None,
intent_llm: str | BaseLLM | None = None,
config: ConversationalConfig | None = None,
) -> None:
"""Hydrate conversation state after inputs are merged into flow state."""
if user_message is None:
state = getattr(flow, "_state", None)
if isinstance(state, dict) and "user_message" in state:
user_message = state["user_message"]
elif isinstance(state, BaseModel) and hasattr(state, "user_message"):
user_message = getattr(state, "user_message", None)
if user_message is None:
return
text = _coerce_user_message_text(user_message)
if not text.strip():
return
# Fresh classification each turn (do not reuse prior turn's route label).
set_state_field(flow, "last_intent", None)
resolved_intents = intents
if resolved_intents is None and config is not None:
resolved_intents = config.default_intents
resolved_llm = intent_llm
if resolved_llm is None and config is not None:
resolved_llm = config.intent_llm
if resolved_intents:
if resolved_llm is None:
raise ValueError("intent_llm is required when intents are provided")
receive_user_message(
flow,
text,
outcomes=resolved_intents,
llm=resolved_llm,
)
else:
receive_user_message(flow, text)
def input_history_to_messages(entries: Sequence[Any]) -> list[LLMMessage]:
"""Convert ``Flow.input_history`` entries to LLM message format."""
messages: list[LLMMessage] = []
for entry in entries:
prompt = entry.get("message") if isinstance(entry, dict) else None
response = entry.get("response") if isinstance(entry, dict) else None
if prompt:
messages.append({"role": "assistant", "content": str(prompt)})
if response:
messages.append({"role": "user", "content": str(response)})
return messages
def get_conversational_config(flow: Flow[Any]) -> ConversationalConfig | None:
"""Return class-level ``conversational_config`` if defined."""
return getattr(type(flow), "conversational_config", None)

View File

@@ -83,7 +83,20 @@ from crewai.events.types.flow_events import (
MethodExecutionStartedEvent,
)
from crewai.flow.constants import AND_CONDITION, OR_CONDITION
from crewai.flow.flow_context import current_flow_id, current_flow_request_id
from crewai.flow.conversation import (
ConversationalConfig,
append_message as _append_conversation_message,
get_conversation_messages,
get_conversational_config,
normalize_kickoff_inputs,
prepare_conversational_turn,
receive_user_message as _receive_user_message,
)
from crewai.flow.flow_context import (
current_flow_id,
current_flow_name,
current_flow_request_id,
)
from crewai.flow.flow_wrappers import (
FlowCondition,
FlowConditions,
@@ -952,6 +965,13 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
memory: Memory | MemoryScope | MemorySlice | None = Field(default=None)
input_provider: InputProvider | None = Field(default=None)
suppress_flow_events: bool = Field(default=False)
defer_trace_finalization: bool = Field(
default=False,
description=(
"When True, do not finalize the trace batch at the end of each kickoff. "
"Call finalize_session_traces() when the chat session ends."
),
)
human_feedback_history: list[HumanFeedbackResult] = Field(default_factory=list)
last_human_feedback: HumanFeedbackResult | None = Field(default=None)
@@ -1073,8 +1093,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
_pending_feedback_context: PendingFeedbackContext | None = PrivateAttr(default=None)
_human_feedback_method_outputs: dict[str, Any] = PrivateAttr(default_factory=dict)
_input_history: list[InputHistoryEntry] = PrivateAttr(default_factory=list)
_conversation_messages: list[dict[str, Any]] = PrivateAttr(default_factory=list)
_pending_user_message: str | dict[str, Any] | None = PrivateAttr(default=None)
_pending_intents: Sequence[str] | None = PrivateAttr(default=None)
_pending_intent_llm: str | BaseLLM | None = PrivateAttr(default=None)
_state: Any = PrivateAttr(default=None)
conversational_config: ClassVar[ConversationalConfig | None] = None
def __class_getitem__(cls: type[Flow[T]], item: type[T]) -> type[Flow[T]]: # type: ignore[override]
class _FlowGeneric(cls): # type: ignore[valid-type,misc]
pass
@@ -1199,6 +1225,116 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
result: list[str] = self.memory.extract_memories(content)
return result
@property
def conversation_messages(self) -> list[dict[str, Any]]:
"""Message history from state or the internal conversation buffer."""
return get_conversation_messages(self)
@property
def input_history(self) -> list[InputHistoryEntry]:
"""Read-only view of prompts and responses from ``ask()``."""
return list(self._input_history)
def append_message(
self,
role: Literal["user", "assistant", "system", "tool"],
content: str,
**extra: Any,
) -> None:
"""Append a message to conversation history on state or the fallback buffer."""
_append_conversation_message(self, role, content, **extra)
def classify_intent(
self,
text: str,
outcomes: Sequence[str],
*,
llm: str | BaseLLM,
context: Sequence[dict[str, Any]] | None = None,
) -> str:
"""Map user text to one of the given outcomes using an LLM."""
if context:
context_blob = "\n".join(
f"{m.get('role', 'user')}: {m.get('content', '')}" for m in context
)
feedback = f"{context_blob}\n\nLatest user message: {text}"
else:
feedback = text
return self._collapse_to_outcome(feedback, outcomes, llm)
def receive_user_message(
self,
text: str,
*,
outcomes: Sequence[str] | None = None,
llm: str | BaseLLM | None = None,
) -> str:
"""Append a user message and optionally set ``last_intent`` on state."""
return _receive_user_message(
self,
text,
outcomes=outcomes,
llm=llm,
)
def _configure_conversational_kickoff(
self,
*,
inputs: dict[str, Any] | None = None,
user_message: str | dict[str, Any] | None = None,
session_id: str | None = None,
intents: Sequence[str] | None = None,
intent_llm: str | BaseLLM | None = None,
) -> dict[str, Any]:
"""Store pending conversational turn options for ``kickoff_async``."""
config = get_conversational_config(self) or self.conversational_config
resolved_intents = intents
resolved_llm = intent_llm
if config is not None:
if resolved_intents is None:
resolved_intents = config.default_intents
if resolved_llm is None:
resolved_llm = config.intent_llm
resolved_message = user_message
if resolved_message is None and inputs and "user_message" in inputs:
resolved_message = inputs["user_message"]
self._pending_user_message = resolved_message
self._pending_intents = list(resolved_intents) if resolved_intents else None
self._pending_intent_llm = resolved_llm
if config is not None and config.defer_trace_finalization:
self.defer_trace_finalization = True
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
TraceCollectionListener().batch_manager.defer_session_finalization = True
return normalize_kickoff_inputs(
inputs,
user_message=resolved_message,
session_id=session_id,
)
def _clear_conversational_kickoff(self) -> None:
self._pending_user_message = None
self._pending_intents = None
self._pending_intent_llm = None
def _apply_pending_conversational_turn(self) -> None:
if self._pending_user_message is None:
return
config = get_conversational_config(self) or self.conversational_config
prepare_conversational_turn(
self,
user_message=self._pending_user_message,
intents=self._pending_intents,
intent_llm=self._pending_intent_llm,
config=config,
)
def _mark_or_listener_fired(self, listener_name: FlowMethodName) -> bool:
"""Mark an OR listener as fired atomically.
@@ -1532,20 +1668,19 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
reset_emission_counter()
reset_last_event_id()
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=None,
),
)
if future and isinstance(future, Future):
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowStartedEvent handler failed", exc_info=True)
future = crewai_event_bus.emit(
self,
FlowStartedEvent(
type="flow_started",
flow_name=self._flow_display_name(),
inputs=None,
),
)
if future and isinstance(future, Future):
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowStartedEvent handler failed", exc_info=True)
get_env_context()
@@ -1698,29 +1833,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_result,
state=self._copy_and_serialize_state(),
),
)
if future and isinstance(future, Future):
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowFinishedEvent handler failed", exc_info=True)
if not self._should_defer_trace_finalization():
await self._emit_flow_finished_async(final_result)
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
self._finalize_flow_trace_batch()
return final_result
@@ -2033,6 +2149,15 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
*,
user_message: str | dict[str, Any] | None = None,
session_id: str | None = None,
intents: Sequence[str] | None = None,
intent_llm: str | BaseLLM | None = None,
interactive: bool = False,
interactive_prompt: str | None = None,
interactive_timeout: float | None = None,
exit_commands: Sequence[str] | None = None,
) -> Any | FlowStreamingOutput:
"""Start the flow execution in a synchronous context.
@@ -2052,10 +2177,49 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
If the referenced state is not found, the kickoff falls back
silently to baseline behavior. Cannot be combined with
``from_checkpoint``; passing both raises ``ValueError``.
user_message: Text or ``{"role": "user", "content": "..."}`` for this
chat turn. Appended to ``state.messages`` before the graph runs.
session_id: Conversation session UUID; merged into ``inputs["id"]``
for ``@persist`` restoration.
intents: Optional outcome labels for pre-kickoff intent classification.
intent_llm: LLM used when ``intents`` is set.
interactive: If True, run a CLI loop (``ask`` per line) until exit or
timeout. For local demos only; APIs should pass ``user_message``.
interactive_prompt: Prompt shown by ``ask()`` in interactive mode.
interactive_timeout: Per-line timeout for interactive ``ask()``.
exit_commands: Words that end interactive mode (default exit, quit).
Returns:
The final output from the flow or FlowStreamingOutput if streaming.
"""
if interactive:
if user_message is not None:
raise ValueError(
"Cannot pass user_message with interactive=True; "
"messages are collected via ask()."
)
if self.stream:
raise ValueError("interactive=True is not supported with stream=True")
return self._kickoff_interactive(
inputs=inputs,
input_files=input_files,
session_id=session_id,
intents=intents,
intent_llm=intent_llm,
interactive_prompt=interactive_prompt,
interactive_timeout=interactive_timeout,
exit_commands=exit_commands,
restore_from_state_id=restore_from_state_id,
)
inputs = self._configure_conversational_kickoff(
inputs=inputs,
user_message=user_message,
session_id=session_id,
intents=intents,
intent_llm=intent_llm,
)
if from_checkpoint is not None and restore_from_state_id is not None:
raise ValueError(
"Cannot combine `from_checkpoint` and `restore_from_state_id`. "
@@ -2064,7 +2228,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
restored = apply_checkpoint(self, from_checkpoint)
if restored is not None:
return restored.kickoff(inputs=inputs, input_files=input_files)
return restored.kickoff(
inputs=inputs,
input_files=input_files,
user_message=user_message,
session_id=session_id,
intents=intents,
intent_llm=intent_llm,
)
if self.stream:
result_holder: list[Any] = []
current_task_info: TaskInfo = {
@@ -2087,6 +2258,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
inputs=inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
user_message=self._pending_user_message,
session_id=inputs.get("id") if inputs else None,
intents=self._pending_intents,
intent_llm=self._pending_intent_llm,
)
result_holder.append(result)
except Exception as e:
@@ -2110,11 +2285,18 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
return streaming_output
async def _run_flow() -> Any:
return await self.kickoff_async(
inputs,
input_files,
restore_from_state_id=restore_from_state_id,
)
try:
return await self.kickoff_async(
inputs,
input_files,
restore_from_state_id=restore_from_state_id,
user_message=self._pending_user_message,
session_id=inputs.get("id") if inputs else None,
intents=self._pending_intents,
intent_llm=self._pending_intent_llm,
)
finally:
self._clear_conversational_kickoff()
try:
asyncio.get_running_loop()
@@ -2124,12 +2306,72 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
except RuntimeError:
return asyncio.run(_run_flow())
def _kickoff_interactive(
self,
*,
inputs: dict[str, Any] | None,
input_files: dict[str, FileInput] | None,
session_id: str | None,
intents: Sequence[str] | None,
intent_llm: str | BaseLLM | None,
interactive_prompt: str | None,
interactive_timeout: float | None,
exit_commands: Sequence[str] | None,
restore_from_state_id: str | None,
) -> Any:
config = get_conversational_config(self) or self.conversational_config
prompt = interactive_prompt or (
config.interactive_prompt if config else "You: "
)
timeout = (
interactive_timeout
if interactive_timeout is not None
else (config.interactive_timeout if config else None)
)
exits = {
c.strip().lower()
for c in (
exit_commands or (config.exit_commands if config else ("exit", "quit"))
)
}
sid = session_id
if sid is None and inputs and "id" in inputs:
sid = str(inputs["id"])
if sid is None:
sid = str(uuid4())
last_result: Any = None
while True:
line = self.ask(prompt, timeout=timeout)
if line is None or line.strip().lower() in exits:
break
turn_inputs = self._configure_conversational_kickoff(
inputs=inputs,
user_message=line,
session_id=sid,
intents=intents,
intent_llm=intent_llm,
)
last_result = self.kickoff(
inputs=turn_inputs,
input_files=input_files,
restore_from_state_id=restore_from_state_id,
)
restore_from_state_id = None
self._clear_conversational_kickoff()
return last_result
async def kickoff_async(
self,
inputs: dict[str, Any] | None = None,
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
*,
user_message: str | dict[str, Any] | None = None,
session_id: str | None = None,
intents: Sequence[str] | None = None,
intent_llm: str | BaseLLM | None = None,
) -> Any | FlowStreamingOutput:
"""Start the flow execution asynchronously.
@@ -2150,10 +2392,22 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
separate persistence key. If the referenced state is not
found, falls back silently to baseline. Cannot be combined
with ``from_checkpoint``; passing both raises ``ValueError``.
user_message: User text for this conversational turn.
session_id: Session UUID (``inputs["id"]``).
intents: Optional labels for pre-kickoff classification.
intent_llm: LLM for classification when ``intents`` is set.
Returns:
The final output from the flow, which is the result of the last executed method.
"""
inputs = self._configure_conversational_kickoff(
inputs=inputs,
user_message=user_message,
session_id=session_id,
intents=intents,
intent_llm=intent_llm,
)
if from_checkpoint is not None and restore_from_state_id is not None:
raise ValueError(
"Cannot combine `from_checkpoint` and `restore_from_state_id`. "
@@ -2162,7 +2416,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
restored = apply_checkpoint(self, from_checkpoint)
if restored is not None:
return await restored.kickoff_async(inputs=inputs, input_files=input_files)
return await restored.kickoff_async(
inputs=inputs,
input_files=input_files,
user_message=user_message,
session_id=session_id,
intents=intents,
intent_llm=intent_llm,
)
if self.stream:
result_holder: list[Any] = []
current_task_info: TaskInfo = {
@@ -2215,10 +2476,13 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
flow_id_token = None
request_id_token = None
flow_name_token = None
if current_flow_id.get() is None:
flow_id_token = current_flow_id.set(self.flow_id)
if current_flow_request_id.get() is None:
request_id_token = current_flow_request_id.set(self.flow_id)
if current_flow_name.get() is None:
flow_name_token = current_flow_name.set(self._flow_display_name())
try:
# Reset flow state for fresh execution unless restoring from persistence
@@ -2301,8 +2565,12 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
f"No flow state found for UUID: {restore_uuid}", color="red"
)
# Update state with any additional inputs (ignoring the 'id' key)
filtered_inputs = {k: v for k, v in inputs.items() if k != "id"}
# Update state with any additional inputs (ignoring conversational keys)
filtered_inputs = {
k: v
for k, v in inputs.items()
if k not in ("id", "user_message", "last_intent")
}
if filtered_inputs:
self._initialize_state(filtered_inputs)
@@ -2310,31 +2578,47 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
reset_emission_counter()
reset_last_event_id()
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
),
skip_flow_started = self._should_defer_trace_finalization() and getattr(
self, "_conversation_trace_started", False
)
if not skip_flow_started:
started_event = FlowStartedEvent(
type="flow_started",
flow_name=self._flow_display_name(),
inputs=inputs,
)
future = crewai_event_bus.emit(self, started_event)
if future:
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning("FlowStartedEvent handler failed", exc_info=True)
if self._should_defer_trace_finalization():
object.__setattr__(self, "_conversation_trace_started", True)
object.__setattr__(
self,
"_conversation_flow_started_event_id",
started_event.event_id,
)
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
TraceCollectionListener().batch_manager.defer_session_finalization = True
if not self.suppress_flow_events:
self._log_flow_event(
f"Flow started with ID: {self.flow_id}", color="bold magenta"
)
# After FlowStarted (when not suppressed): env events must not pre-empt
# trace batch init with implicit "crew" execution_type.
# After FlowStarted: env events must not pre-empt trace batch init
# with implicit "crew" execution_type.
get_env_context()
if inputs is not None and "id" not in inputs:
self._initialize_state(inputs)
self._apply_pending_conversational_turn()
if self._is_execution_resuming:
await self._replay_recorded_events()
@@ -2428,35 +2712,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
if not self.suppress_flow_events:
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_output,
state=self._copy_and_serialize_state(),
),
)
if future:
try:
await asyncio.wrap_future(future)
except Exception:
logger.warning(
"FlowFinishedEvent handler failed", exc_info=True
)
if not self._should_defer_trace_finalization():
await self._emit_flow_finished_async(final_output)
if not self.suppress_flow_events:
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
self._finalize_flow_trace_batch()
return final_output
finally:
self._clear_conversational_kickoff()
# Ensure all background memory saves complete before returning
if self.memory is not None and hasattr(self.memory, "drain_writes"):
self.memory.drain_writes()
@@ -2464,6 +2727,8 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
current_flow_request_id.reset(request_id_token)
if flow_id_token is not None:
current_flow_id.reset(flow_id_token)
if flow_name_token is not None:
current_flow_name.reset(flow_name_token)
detach(flow_token)
async def akickoff(
@@ -2472,6 +2737,11 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files: dict[str, FileInput] | None = None,
from_checkpoint: CheckpointConfig | None = None,
restore_from_state_id: str | None = None,
*,
user_message: str | dict[str, Any] | None = None,
session_id: str | None = None,
intents: Sequence[str] | None = None,
intent_llm: str | BaseLLM | None = None,
) -> Any | FlowStreamingOutput:
"""Native async method to start the flow execution. Alias for kickoff_async.
@@ -2483,6 +2753,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
restore_from_state_id: Optional UUID of a previously-persisted flow
whose latest snapshot should hydrate this run's state. See
``kickoff_async`` for full semantics.
user_message: User text for this conversational turn.
session_id: Session UUID (``inputs["id"]``).
intents: Optional labels for pre-kickoff classification.
intent_llm: LLM for classification when ``intents`` is set.
Returns:
The final output from the flow, which is the result of the last executed method.
@@ -2492,6 +2766,10 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
input_files,
from_checkpoint,
restore_from_state_id=restore_from_state_id,
user_message=user_message,
session_id=session_id,
intents=intents,
intent_llm=intent_llm,
)
async def _replay_recorded_events(self) -> None:
@@ -3354,6 +3632,9 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
),
)
if response:
_append_conversation_message(self, "user", response)
return response
def _request_human_feedback(
@@ -3554,6 +3835,98 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
)
return outcomes[0]
def _flow_display_name(self) -> str:
return self.name or self.__class__.__name__
def _should_defer_trace_finalization(self) -> bool:
if self.defer_trace_finalization:
return True
config = get_conversational_config(self)
return bool(config and config.defer_trace_finalization)
async def _emit_flow_finished_async(self, result: Any) -> None:
"""Emit ``FlowFinishedEvent`` and await handlers."""
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self._flow_display_name(),
result=result,
state=self._copy_and_serialize_state(),
),
)
if not future:
return
try:
if isinstance(future, Future):
await asyncio.wrap_future(future)
else:
await future
except Exception:
logger.warning("FlowFinishedEvent handler failed", exc_info=True)
def _emit_flow_finished_sync(self, result: Any) -> None:
"""Emit ``FlowFinishedEvent`` from synchronous session teardown."""
try:
asyncio.get_running_loop()
except RuntimeError:
asyncio.run(self._emit_flow_finished_async(result))
else:
raise RuntimeError(
"Cannot emit flow_finished synchronously while an event loop is running"
)
def finalize_session_traces(self) -> None:
"""Finalize the trace batch after a multi-turn conversational session."""
from crewai.events.event_context import restore_event_scope
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
trace_listener = TraceCollectionListener()
batch_manager = trace_listener.batch_manager
if batch_manager._batch_finalized or not batch_manager.is_batch_initialized():
batch_manager.defer_session_finalization = False
object.__setattr__(self, "_conversation_trace_started", False)
object.__setattr__(self, "_conversation_flow_started_event_id", None)
return
result = self._method_outputs[-1] if self._method_outputs else None
if self._should_defer_trace_finalization() and getattr(
self, "_conversation_trace_started", False
):
started_id = getattr(self, "_conversation_flow_started_event_id", None)
if started_id:
restore_event_scope(((started_id, "flow_started"),))
try:
self._emit_flow_finished_sync(result)
finally:
restore_event_scope(())
object.__setattr__(self, "_conversation_flow_started_event_id", None)
self._finalize_flow_trace_batch(force=True)
object.__setattr__(self, "_conversation_trace_started", False)
batch_manager.defer_session_finalization = False
def _finalize_flow_trace_batch(self, *, force: bool = False) -> None:
"""Finalize the active trace batch when this flow owns it."""
if not force and self._should_defer_trace_finalization():
return
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type != "flow":
return
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
def _log_flow_event(
self,
message: str,

View File

@@ -18,3 +18,7 @@ current_flow_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
current_flow_method_name: contextvars.ContextVar[str] = contextvars.ContextVar(
"flow_method_name", default="unknown"
)
current_flow_name: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"flow_name", default=None
)

View File

@@ -0,0 +1,4 @@
from crewai.flow.providers.queue import QueueInputProvider
__all__ = ["QueueInputProvider"]

View File

@@ -0,0 +1,82 @@
"""Queue-backed input provider for conversational flows."""
from __future__ import annotations
import queue
import threading
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from crewai.flow.flow import Flow
class QueueInputProvider:
"""Blocks on a per-session queue until a user message is pushed.
Use for long-running workers where ``Flow.ask()`` should wait on WebSocket
or another transport without blocking the event loop thread (flow runs ask
in a worker thread).
Example:
```python
provider = QueueInputProvider()
flow.input_provider = provider
# From a WebSocket handler:
provider.push(session_id, "hello")
# Inside the flow:
reply = flow.ask("You: ", metadata={"session_id": session_id})
```
"""
def __init__(self) -> None:
self._queues: dict[str, queue.Queue[str | None]] = {}
self._lock = threading.Lock()
def _get_queue(self, session_id: str) -> queue.Queue[str | None]:
with self._lock:
if session_id not in self._queues:
self._queues[session_id] = queue.Queue()
return self._queues[session_id]
def push(self, session_id: str, text: str) -> None:
"""Enqueue a user message for the given session."""
self._get_queue(session_id).put(text)
def close_session(self, session_id: str) -> None:
"""Signal end of session (unblocks ``ask()`` with None)."""
self._get_queue(session_id).put(None)
def request_input(
self,
message: str,
flow: Flow[Any],
metadata: dict[str, Any] | None = None,
) -> str | None:
session_id = self._resolve_session_id(flow, metadata)
if session_id is None:
return None
try:
return self._get_queue(session_id).get()
except Exception:
return None
@staticmethod
def _resolve_session_id(
flow: Flow[Any],
metadata: dict[str, Any] | None,
) -> str | None:
if metadata and metadata.get("session_id"):
return str(metadata["session_id"])
state = getattr(flow, "_state", None)
if state is None:
return None
if isinstance(state, dict):
value = state.get("id")
return str(value) if value else None
if hasattr(state, "id"):
value = getattr(state, "id", None)
return str(value) if value else None
return None

View File

@@ -940,6 +940,21 @@ class LLM(BaseLLM):
self._track_token_usage_internal(usage_info)
self._handle_streaming_callbacks(callbacks, usage_info, last_chunk)
if accumulated_tool_args and not available_functions:
tool_calls_list: list[ChatCompletionDeltaToolCall] = [
ChatCompletionDeltaToolCall(
index=idx,
function=Function(
name=tool_arg.function.name,
arguments=tool_arg.function.arguments,
),
)
for idx, tool_arg in sorted(accumulated_tool_args.items())
if tool_arg.function.name
]
if tool_calls_list:
return tool_calls_list
if not tool_calls or not available_functions:
if response_model and self.is_litellm:
instructor_instance = InternalInstructor(
@@ -1535,8 +1550,7 @@ class LLM(BaseLLM):
if usage_info:
self._track_token_usage_internal(usage_info)
if accumulated_tool_args and available_functions:
# Convert accumulated tool args to ChatCompletionDeltaToolCall objects
if accumulated_tool_args:
tool_calls_list: list[ChatCompletionDeltaToolCall] = [
ChatCompletionDeltaToolCall(
index=idx,
@@ -1545,21 +1559,24 @@ class LLM(BaseLLM):
arguments=tool_arg.function.arguments,
),
)
for idx, tool_arg in accumulated_tool_args.items()
for idx, tool_arg in sorted(accumulated_tool_args.items())
if tool_arg.function.name
]
if tool_calls_list:
result = self._handle_streaming_tool_calls(
tool_calls=tool_calls_list,
accumulated_tool_args=accumulated_tool_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_id=response_id,
)
if result is not None:
return result
if available_functions:
result = self._handle_streaming_tool_calls(
tool_calls=tool_calls_list,
accumulated_tool_args=accumulated_tool_args,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_id=response_id,
)
if result is not None:
return result
else:
return tool_calls_list
usage_dict = self._usage_to_dict(usage_info)
self._handle_emit_call_events(

View File

@@ -3,15 +3,20 @@
Provides filesystem-based skill packaging with progressive disclosure.
"""
from crewai.skills.cache import SkillCacheManager
from crewai.skills.loader import activate_skill, discover_skills
from crewai.skills.models import Skill, SkillFrontmatter
from crewai.skills.parser import SkillParseError
from crewai.skills.registry import is_registry_ref, resolve_registry_ref
__all__ = [
"Skill",
"SkillCacheManager",
"SkillFrontmatter",
"SkillParseError",
"activate_skill",
"discover_skills",
"is_registry_ref",
"resolve_registry_ref",
]

View File

@@ -0,0 +1,148 @@
"""Cache manager for registry-downloaded skills.
Manages ~/.crewai/skills/{org}/{name}/ as the global skill cache.
One version is stored per skill (last install wins).
"""
from __future__ import annotations
from datetime import datetime, timezone
import json
import logging
from pathlib import Path
import tarfile
from typing import TypedDict
import zipfile
_logger = logging.getLogger(__name__)
_CACHE_ROOT = Path.home() / ".crewai" / "skills"
_META_FILENAME = ".crewai_meta.json"
class SkillMetadata(TypedDict):
org: str
name: str
version: str | None
installed_at: str
class SkillCacheManager:
"""Manages the global skill cache at ~/.crewai/skills/."""
def __init__(self, cache_root: Path | None = None) -> None:
self._root = cache_root or _CACHE_ROOT
def _skill_dir(self, org: str, name: str) -> Path:
return self._root / org / name
def get_cached_path(self, org: str, name: str) -> Path | None:
"""Return the cached skill directory path if it exists, else None."""
skill_dir = self._skill_dir(org, name)
meta_file = skill_dir / _META_FILENAME
if skill_dir.is_dir() and meta_file.exists():
return skill_dir
return None
def store(
self, org: str, name: str, version: str | None, archive_bytes: bytes
) -> Path:
"""Unpack an archive into the cache and write metadata.
Uses tarfile with filter='data' for path-traversal protection.
Args:
org: Organisation slug.
name: Skill name.
version: Semantic version string, or None if unknown.
archive_bytes: Raw bytes of a .tar.gz archive.
Returns:
Path to the stored skill directory.
"""
skill_dir = self._skill_dir(org, name)
# Wipe any previous version
if skill_dir.exists():
import shutil
shutil.rmtree(skill_dir)
skill_dir.mkdir(parents=True, exist_ok=True)
import io
# Try tar.gz first, fall back to zip
try:
with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:gz") as tf:
try:
tf.extractall(skill_dir, filter="data")
except TypeError:
_safe_extractall(tf, skill_dir)
except tarfile.TarError:
with zipfile.ZipFile(io.BytesIO(archive_bytes)) as zf:
_safe_extract_zip(zf, skill_dir)
meta: SkillMetadata = {
"org": org,
"name": name,
"version": version,
"installed_at": datetime.now(tz=timezone.utc).isoformat(),
}
(skill_dir / _META_FILENAME).write_text(json.dumps(meta, indent=2))
return skill_dir
def list_cached(self) -> list[SkillMetadata]:
"""Return metadata for every cached skill."""
results: list[SkillMetadata] = []
if not self._root.exists():
return results
for org_dir in sorted(self._root.iterdir()):
if not org_dir.is_dir():
continue
for skill_dir in sorted(org_dir.iterdir()):
meta_file = skill_dir / _META_FILENAME
if meta_file.exists():
try:
results.append(json.loads(meta_file.read_text()))
except (json.JSONDecodeError, KeyError):
_logger.debug(
"Skipping malformed cache entry: %s",
meta_file,
exc_info=True,
)
return results
def invalidate(self, org: str, name: str) -> bool:
"""Remove a cached skill.
Returns:
True if the cache entry existed and was removed, False otherwise.
"""
skill_dir = self._skill_dir(org, name)
if skill_dir.exists():
import shutil
shutil.rmtree(skill_dir)
return True
return False
def _safe_extractall(tf: tarfile.TarFile, dest: Path) -> None:
"""Path-traversal-safe extraction for Python < 3.12."""
dest_resolved = dest.resolve()
for member in tf.getmembers():
member_path = (dest / member.name).resolve()
if not member_path.is_relative_to(dest_resolved):
raise ValueError(f"Blocked path traversal attempt: {member.name!r}")
tf.extractall(dest) # noqa: S202
def _safe_extract_zip(zf: zipfile.ZipFile, dest: Path) -> None:
"""Path-traversal-safe ZIP extraction."""
dest_resolved = dest.resolve()
for member in zf.namelist():
member_path = (dest / member).resolve()
if not member_path.is_relative_to(dest_resolved):
raise ValueError(f"Blocked path traversal attempt: {member!r}")
zf.extractall(dest) # noqa: S202

View File

@@ -78,6 +78,10 @@ class SkillFrontmatter(BaseModel):
alias="allowed-tools",
description="Pre-approved tool names the skill may use, parsed from a space-delimited string in frontmatter.",
)
version: str | None = Field(
default=None,
description="Semantic version of the skill, e.g. '1.0.0'. Optional for local skills.",
)
@model_validator(mode="before")
@classmethod

View File

@@ -0,0 +1,223 @@
"""Registry reference resolution for the Agent Skills standard.
Handles @org/skill-name references, local-first resolution, and downloads
via the CrewAI+ API with a global cache at ~/.crewai/skills/.
"""
from __future__ import annotations
import logging
from pathlib import Path
import sys
from typing import Any
from crewai.skills.cache import SkillCacheManager
_logger = logging.getLogger(__name__)
class SkillNotCachedError(Exception):
"""Raised when a registry skill is not cached and the environment is non-interactive."""
def __init__(self, ref: str) -> None:
super().__init__(
f"Skill {ref!r} is not cached locally. "
f"Run `crewai skill install {ref}` to install it first."
)
self.ref = ref
def is_registry_ref(value: Any) -> bool:
"""Return True if *value* looks like a registry reference (@org/name)."""
return isinstance(value, str) and value.startswith("@")
def parse_registry_ref(ref: str) -> tuple[str, str]:
"""Parse '@org/skill-name' into (org, name).
Args:
ref: A registry reference, e.g. '@acme/my-skill'.
Returns:
A (org, name) tuple.
Raises:
ValueError: If the reference format is invalid.
"""
if not ref.startswith("@"):
raise ValueError(f"Registry reference must start with '@', got: {ref!r}")
without_at = ref[1:]
if without_at.count("/") != 1:
raise ValueError(
f"Registry reference must be in '@org/name' format, got: {ref!r}"
)
org, name = without_at.split("/", 1)
if (
not org
or not name
or org.startswith(".")
or name.startswith(".")
or "/" in org
or "/" in name
):
raise ValueError(
f"Registry reference org and name must be single, non-empty path "
f"segments (no '..' or leading dots), got: {ref!r}"
)
return org, name
def _is_noninteractive() -> bool:
"""Return True in CI or explicitly non-interactive environments."""
import os
return (
os.environ.get("CI") == "1"
or os.environ.get("CREWAI_NONINTERACTIVE") == "1"
or not sys.stdin.isatty()
)
def resolve_registry_ref(
ref: str,
source: Any = None,
) -> Skill: # type: ignore[name-defined] # noqa: F821
"""Resolve a registry reference to a Skill object.
Resolution order:
1. ./skills/{name}/ in the current working directory (project-local)
2. ~/.crewai/skills/{org}/{name}/ (global cache)
3. Download from registry (interactive only; raises SkillNotCachedError in CI)
Args:
ref: A registry reference, e.g. '@acme/my-skill'.
source: Optional source object passed through to skill loaders (for events).
Returns:
A Skill loaded at INSTRUCTIONS disclosure level.
Raises:
SkillNotCachedError: When not cached and running in non-interactive mode.
"""
from crewai.skills.loader import activate_skill
from crewai.skills.parser import load_skill_metadata
org, name = parse_registry_ref(ref)
# 1. Project-local: ./skills/{name}/
local_path = Path.cwd() / "skills" / name
if local_path.is_dir() and (local_path / "SKILL.md").exists():
try:
skill = load_skill_metadata(local_path)
return activate_skill(skill, source=source)
except Exception:
_logger.debug("Failed to load local skill at %s", local_path, exc_info=True)
# 2. Global cache
cache = SkillCacheManager()
cached_path = cache.get_cached_path(org, name)
if cached_path is not None and (cached_path / "SKILL.md").exists():
try:
skill = load_skill_metadata(cached_path)
return activate_skill(skill, source=source)
except Exception:
_logger.debug(
"Failed to load cached skill at %s", cached_path, exc_info=True
)
# 3. Download
if _is_noninteractive():
raise SkillNotCachedError(ref)
return download_skill(org, name, source=source)
def download_skill(
org: str,
name: str,
source: Any = None,
) -> Skill: # type: ignore[name-defined] # noqa: F821
"""Download a skill from the registry and store it in the cache.
Args:
org: Organisation slug.
name: Skill name.
source: Optional source for event emission.
Returns:
The downloaded Skill at INSTRUCTIONS level.
"""
from crewai.skills.loader import activate_skill
from crewai.skills.parser import load_skill_metadata
ref = f"@{org}/{name}"
try:
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.skill_events import (
SkillDownloadCompletedEvent,
SkillDownloadStartedEvent,
)
_has_events = True
except ImportError:
_has_events = False
if _has_events:
crewai_event_bus.emit(
source,
event=SkillDownloadStartedEvent(
registry_ref=ref,
),
)
try:
from crewai_core.plus_api import PlusAPI
api = PlusAPI()
response = api.get_skill(org, name)
response.raise_for_status()
data = response.json()
except Exception as exc:
raise RuntimeError(
f"Failed to download skill {ref!r} from registry: {exc}"
) from exc
import base64
import httpx
version = data.get("latest_version") or data.get("version")
download_url = data.get("download_url")
if download_url:
dl_response = httpx.get(download_url, follow_redirects=True)
dl_response.raise_for_status()
archive_bytes = dl_response.content
else:
encoded = data.get("file", "")
# Strip data URI prefix if present
if "," in encoded:
encoded = encoded.split(",", 1)[1]
archive_bytes = base64.b64decode(encoded)
cache = SkillCacheManager()
skill_dir = cache.store(org, name, version, archive_bytes)
if _has_events:
crewai_event_bus.emit(
source,
event=SkillDownloadCompletedEvent(
registry_ref=ref,
version=version,
cache_path=skill_dir,
),
)
if not (skill_dir / "SKILL.md").exists():
raise RuntimeError(
f"Skill archive for {ref!r} downloaded but no SKILL.md found in {skill_dir}"
)
skill = load_skill_metadata(skill_dir)
return activate_skill(skill, source=source)

View File

@@ -13,6 +13,7 @@ import sys
import types
from typing import Any, cast, get_type_hints
from crewai_core.plus_api import AvailableExport, EnvVarEntry, ToolMetadata
from crewai_core.project import (
get_project_description as get_project_description,
get_project_name as get_project_name,
@@ -279,7 +280,7 @@ def is_valid_tool(obj: Any) -> bool:
return isinstance(obj, Tool)
def extract_available_exports(dir_path: str = "src") -> list[dict[str, Any]]:
def extract_available_exports(dir_path: str = "src") -> list[AvailableExport]:
"""Extract available tool classes from the project's __init__.py files.
Only includes classes that inherit from BaseTool or functions decorated with @tool.
@@ -338,7 +339,7 @@ def _load_module_from_file(
sys.modules.pop(module_name, None)
def _load_tools_from_init(init_file: Path) -> list[dict[str, Any]]:
def _load_tools_from_init(init_file: Path) -> list[AvailableExport]:
"""Load and validate tools from a given __init__.py file."""
try:
with _load_module_from_file(init_file) as module:
@@ -392,7 +393,7 @@ def _print_no_tools_warning() -> None:
)
def extract_tools_metadata(dir_path: str = "src") -> list[dict[str, Any]]:
def extract_tools_metadata(dir_path: str = "src") -> list[ToolMetadata]:
"""
Extract rich metadata from tool classes in the project.
@@ -404,7 +405,7 @@ def extract_tools_metadata(dir_path: str = "src") -> list[dict[str, Any]]:
- init_params_schema: JSON Schema for __init__ params (filtered)
- env_vars: List of environment variable dicts
"""
tools_metadata: list[dict[str, Any]] = []
tools_metadata: list[ToolMetadata] = []
for init_file in Path(dir_path).glob("**/__init__.py"):
tools = _extract_tool_metadata_from_init(init_file)
@@ -413,7 +414,7 @@ def extract_tools_metadata(dir_path: str = "src") -> list[dict[str, Any]]:
return tools_metadata
def _extract_tool_metadata_from_init(init_file: Path) -> list[dict[str, Any]]:
def _extract_tool_metadata_from_init(init_file: Path) -> list[ToolMetadata]:
"""
Load module from init file and extract metadata from valid tool classes.
"""
@@ -428,7 +429,7 @@ def _extract_tool_metadata_from_init(init_file: Path) -> list[dict[str, Any]]:
if not exported_names:
return []
tools_metadata = []
tools_metadata: list[ToolMetadata] = []
for name in exported_names:
obj = getattr(module, name, None)
if obj is None or not (
@@ -446,7 +447,7 @@ def _extract_tool_metadata_from_init(init_file: Path) -> list[dict[str, Any]]:
return []
def _extract_single_tool_metadata(tool_class: type) -> dict[str, Any] | None:
def _extract_single_tool_metadata(tool_class: type) -> ToolMetadata | None:
"""
Extract metadata from a single tool class.
"""
@@ -470,19 +471,17 @@ def _extract_single_tool_metadata(tool_class: type) -> dict[str, Any] | None:
except (TypeError, ValueError):
module = tool_class.__module__
return {
"name": tool_class.__name__,
"module": module,
"humanized_name": _extract_field_default(
fields.get("name"), fallback=tool_class.__name__
return ToolMetadata(
name=tool_class.__name__,
module=module,
humanized_name=str(
_extract_field_default(fields.get("name"), fallback=tool_class.__name__)
),
"description": str(
_extract_field_default(fields.get("description"))
).strip(),
"run_params_schema": _extract_run_params_schema(fields.get("args_schema")),
"init_params_schema": _extract_init_params_schema(tool_class),
"env_vars": _extract_env_vars(fields.get("env_vars")),
}
description=str(_extract_field_default(fields.get("description"))).strip(),
run_params_schema=_extract_run_params_schema(fields.get("args_schema")),
init_params_schema=_extract_init_params_schema(tool_class),
env_vars=_extract_env_vars(fields.get("env_vars")),
)
except Exception:
return None
@@ -597,7 +596,7 @@ def _extract_init_params_schema(tool_class: type) -> dict[str, Any]:
return {}
def _extract_env_vars(env_vars_field: dict[str, Any] | None) -> list[dict[str, Any]]:
def _extract_env_vars(env_vars_field: dict[str, Any] | None) -> list[EnvVarEntry]:
"""
Extract environment variable definitions from env_vars field.
"""

View File

@@ -0,0 +1,116 @@
"""Tests for SkillCacheManager."""
from __future__ import annotations
import gzip
import io
import json
import tarfile
from pathlib import Path
from crewai.skills.cache import SkillCacheManager
def _make_tar_gz(files: dict[str, str]) -> bytes:
"""Build an in-memory .tar.gz containing the given filename → content mapping."""
buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
gz_buf = io.BytesIO()
with tarfile.open(fileobj=gz_buf, mode="w") as tf:
for name, content in files.items():
data = content.encode()
info = tarfile.TarInfo(name=name)
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
gz.write(gz_buf.getvalue())
buf.seek(0)
# Re-create properly: gzip wrapping a tar stream
out = io.BytesIO()
with tarfile.open(fileobj=out, mode="w:gz") as tf:
for name, content in files.items():
data = content.encode()
info = tarfile.TarInfo(name=name)
info.size = len(data)
tf.addfile(info, io.BytesIO(data))
return out.getvalue()
class TestSkillCacheManager:
def test_get_cached_path_missing(self, tmp_path: Path) -> None:
cache = SkillCacheManager(cache_root=tmp_path)
assert cache.get_cached_path("acme", "my-skill") is None
def test_store_and_retrieve(self, tmp_path: Path) -> None:
cache = SkillCacheManager(cache_root=tmp_path)
archive = _make_tar_gz({"SKILL.md": "---\nname: my-skill\n---\nHello"})
dest = cache.store("acme", "my-skill", "1.0.0", archive)
assert dest.is_dir()
assert (dest / "SKILL.md").exists()
retrieved = cache.get_cached_path("acme", "my-skill")
assert retrieved == dest
def test_store_writes_metadata(self, tmp_path: Path) -> None:
cache = SkillCacheManager(cache_root=tmp_path)
archive = _make_tar_gz({"SKILL.md": "content"})
dest = cache.store("acme", "my-skill", "2.3.4", archive)
meta_file = dest / ".crewai_meta.json"
assert meta_file.exists()
meta = json.loads(meta_file.read_text())
assert meta["org"] == "acme"
assert meta["name"] == "my-skill"
assert meta["version"] == "2.3.4"
assert "installed_at" in meta
def test_store_overwrites_previous_version(self, tmp_path: Path) -> None:
cache = SkillCacheManager(cache_root=tmp_path)
archive_v1 = _make_tar_gz({"SKILL.md": "v1", "extra.txt": "old"})
cache.store("acme", "my-skill", "1.0.0", archive_v1)
archive_v2 = _make_tar_gz({"SKILL.md": "v2"})
dest = cache.store("acme", "my-skill", "2.0.0", archive_v2)
# Old file should be gone
assert not (dest / "extra.txt").exists()
assert (dest / "SKILL.md").read_text() == "v2"
meta = json.loads((dest / ".crewai_meta.json").read_text())
assert meta["version"] == "2.0.0"
def test_list_cached_empty(self, tmp_path: Path) -> None:
cache = SkillCacheManager(cache_root=tmp_path)
assert cache.list_cached() == []
def test_list_cached(self, tmp_path: Path) -> None:
cache = SkillCacheManager(cache_root=tmp_path)
archive = _make_tar_gz({"SKILL.md": "x"})
cache.store("acme", "skill-a", "1.0.0", archive)
cache.store("acme", "skill-b", "0.1.0", archive)
cache.store("other-org", "skill-c", None, archive)
entries = cache.list_cached()
names = {e["name"] for e in entries}
assert names == {"skill-a", "skill-b", "skill-c"}
def test_invalidate_existing(self, tmp_path: Path) -> None:
cache = SkillCacheManager(cache_root=tmp_path)
archive = _make_tar_gz({"SKILL.md": "x"})
cache.store("acme", "my-skill", "1.0.0", archive)
removed = cache.invalidate("acme", "my-skill")
assert removed is True
assert cache.get_cached_path("acme", "my-skill") is None
def test_invalidate_missing(self, tmp_path: Path) -> None:
cache = SkillCacheManager(cache_root=tmp_path)
removed = cache.invalidate("acme", "ghost-skill")
assert removed is False
def test_store_version_none(self, tmp_path: Path) -> None:
cache = SkillCacheManager(cache_root=tmp_path)
archive = _make_tar_gz({"SKILL.md": "x"})
dest = cache.store("acme", "my-skill", None, archive)
meta = json.loads((dest / ".crewai_meta.json").read_text())
assert meta["version"] is None

View File

@@ -0,0 +1,32 @@
"""Tests for the version field added to SkillFrontmatter."""
from __future__ import annotations
import pytest
from pydantic import ValidationError
from crewai.skills.models import SkillFrontmatter
class TestSkillFrontmatterVersion:
def test_version_defaults_to_none(self) -> None:
fm = SkillFrontmatter(name="my-skill", description="A skill.")
assert fm.version is None
def test_version_can_be_set(self) -> None:
fm = SkillFrontmatter(name="my-skill", description="A skill.", version="1.2.3")
assert fm.version == "1.2.3"
def test_existing_frontmatter_without_version_still_valid(self) -> None:
"""Backward compat: existing SKILL.md files without version must still parse."""
fm = SkillFrontmatter(name="old-skill", description="Old skill without version.")
assert fm.version is None
def test_version_is_optional_string(self) -> None:
fm = SkillFrontmatter(name="my-skill", description="Desc.", version=None)
assert fm.version is None
def test_frontmatter_is_frozen(self) -> None:
fm = SkillFrontmatter(name="my-skill", description="A skill.", version="1.0.0")
with pytest.raises(ValidationError):
fm.version = "2.0.0" # type: ignore[misc]

View File

@@ -0,0 +1,129 @@
"""Tests for SkillRegistry."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from crewai.skills.registry import (
SkillNotCachedError,
is_registry_ref,
parse_registry_ref,
)
class TestIsRegistryRef:
def test_at_prefixed(self) -> None:
assert is_registry_ref("@acme/my-skill") is True
def test_plain_string(self) -> None:
assert is_registry_ref("my-skill") is False
def test_path_like_string(self) -> None:
assert is_registry_ref("./skills/my-skill") is False
def test_non_string(self) -> None:
assert is_registry_ref(None) is False
assert is_registry_ref(42) is False
assert is_registry_ref(Path("something")) is False
class TestParseRegistryRef:
def test_valid(self) -> None:
assert parse_registry_ref("@acme/my-skill") == ("acme", "my-skill")
def test_valid_with_dashes(self) -> None:
assert parse_registry_ref("@my-org/cool-skill") == ("my-org", "cool-skill")
def test_missing_at(self) -> None:
with pytest.raises(ValueError, match="must start with '@'"):
parse_registry_ref("acme/my-skill")
def test_missing_slash(self) -> None:
with pytest.raises(ValueError, match="'@org/name' format"):
parse_registry_ref("@acme-skill")
def test_empty_org(self) -> None:
with pytest.raises(ValueError, match="non-empty"):
parse_registry_ref("@/my-skill")
def test_empty_name(self) -> None:
with pytest.raises(ValueError, match="non-empty"):
parse_registry_ref("@acme/")
class TestResolveRegistryRef:
"""Test resolution order and CI mode behaviour."""
def _make_skill_dir(self, base: Path, name: str) -> Path:
"""Write a minimal SKILL.md into base/name/."""
skill_dir = base / name
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
f"---\nname: {name}\ndescription: Test skill.\n---\n\nInstructions."
)
return skill_dir
def test_resolves_project_local(self, tmp_path: Path) -> None:
"""Local ./skills/{name}/ takes priority over cache."""
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
self._make_skill_dir(skills_dir, "my-skill")
# Mock SkillCacheManager to return None (not cached) so only local is hit
mock_cache = MagicMock()
mock_cache.get_cached_path.return_value = None
with (
patch("crewai.skills.registry._is_noninteractive", return_value=False),
patch.object(Path, "cwd", return_value=tmp_path),
patch("crewai.skills.registry.SkillCacheManager", return_value=mock_cache),
):
from crewai.skills.registry import resolve_registry_ref
skill = resolve_registry_ref("@acme/my-skill")
assert skill.name == "my-skill"
def test_raises_in_ci_when_not_cached(self, tmp_path: Path) -> None:
"""In CI mode, raise SkillNotCachedError if no local or cached copy."""
mock_cache = MagicMock()
mock_cache.get_cached_path.return_value = None
with (
patch("crewai.skills.registry._is_noninteractive", return_value=True),
patch.object(Path, "cwd", return_value=tmp_path),
patch("crewai.skills.registry.SkillCacheManager", return_value=mock_cache),
):
from crewai.skills.registry import resolve_registry_ref
with pytest.raises(SkillNotCachedError) as exc_info:
resolve_registry_ref("@acme/ghost-skill")
assert "@acme/ghost-skill" in str(exc_info.value)
def test_resolves_from_cache(self, tmp_path: Path) -> None:
"""Falls back to global cache when no project-local skill exists."""
cache_dir = tmp_path / "acme" / "cached-skill"
cache_dir.mkdir(parents=True)
(cache_dir / "SKILL.md").write_text(
"---\nname: cached-skill\ndescription: Cached.\n---\n\nCached instructions."
)
mock_cache = MagicMock()
mock_cache.get_cached_path.return_value = cache_dir
# tmp_path has no ./skills/ directory
with (
patch("crewai.skills.registry._is_noninteractive", return_value=False),
patch.object(Path, "cwd", return_value=tmp_path),
patch("crewai.skills.registry.SkillCacheManager", return_value=mock_cache),
):
from crewai.skills.registry import resolve_registry_ref
skill = resolve_registry_ref("@acme/cached-skill")
assert skill.name == "cached-skill"
def test_skill_not_cached_error_contains_ref(self) -> None:
err = SkillNotCachedError("@foo/bar")
assert "@foo/bar" in str(err)
assert err.ref == "@foo/bar"

View File

@@ -0,0 +1,480 @@
"""Tests for conversational Flow helpers and kickoff parameters."""
from __future__ import annotations
from typing import Any
from unittest.mock import MagicMock, patch
from uuid import uuid4
import pytest
from pydantic import BaseModel, Field
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.trace_listener import TraceCollectionListener
from crewai.events.types.flow_events import FlowStartedEvent
from crewai.events.types.llm_events import LLMCallStartedEvent
from crewai.flow import Flow, ChatState, listen, start
from crewai.flow.flow_context import current_flow_id, current_flow_name
from crewai.flow.conversation import (
ConversationalConfig,
append_message,
get_conversation_messages,
normalize_kickoff_inputs,
prepare_conversational_turn,
)
from crewai.flow.chat import ChatMessage, ChatSession
from crewai.flow.providers import QueueInputProvider
from crewai.utilities.types import LLMMessage
class SimpleChatFlow(Flow[ChatState]):
@start()
def begin(self):
return "done"
class DictChatFlow(Flow):
@start()
def begin(self):
return self.state.get("marker", "ok")
class TestNormalizeKickoffInputs:
def test_merges_session_and_user_message(self) -> None:
merged = normalize_kickoff_inputs(
{"foo": 1},
user_message="hello",
session_id="sess-1",
)
assert merged["id"] == "sess-1"
assert merged["user_message"] == "hello"
assert merged["foo"] == 1
class TestMessageHelpers:
def test_append_message_on_pydantic_state(self) -> None:
flow = SimpleChatFlow()
flow._state = ChatState()
append_message(flow, "user", "hi")
assert get_conversation_messages(flow) == [{"role": "user", "content": "hi"}]
def test_append_message_fallback_buffer(self) -> None:
flow = DictChatFlow()
class _State:
id = str(uuid4())
flow._state = _State()
append_message(flow, "assistant", "reply")
assert get_conversation_messages(flow) == [
{"role": "assistant", "content": "reply"}
]
assert flow._conversation_messages == [
{"role": "assistant", "content": "reply"}
]
class TestIntentPerTurn:
def test_prepare_clears_stale_last_intent(self) -> None:
flow = SimpleChatFlow()
flow._state = ChatState(last_intent="ORDER", messages=[])
prepare_conversational_turn(flow, user_message="hello")
assert flow.state.last_intent is None
class TestKickoffConversational:
def test_kickoff_user_message_hydrates_state(self) -> None:
flow = SimpleChatFlow()
flow.kickoff(user_message="track my order", session_id="session-abc")
assert flow.state.last_user_message == "track my order"
assert any(
m.get("role") == "user" and m.get("content") == "track my order"
for m in flow.state.messages
)
assert flow.state.id == "session-abc"
def test_kickoff_classifies_intent_when_configured(self) -> None:
flow = SimpleChatFlow()
with patch.object(
flow,
"_collapse_to_outcome",
return_value="order",
) as mock_collapse:
flow.kickoff(
user_message="where is my package",
session_id="s1",
intents=["order", "help"],
intent_llm="gpt-4o-mini",
)
mock_collapse.assert_called_once()
assert flow.state.last_intent == "order"
def test_ask_appends_to_messages(self) -> None:
class AskFlow(Flow[ChatState]):
input_provider = MagicMock()
input_provider.request_input = MagicMock(return_value="user reply")
@start()
def begin(self):
self.ask("Prompt:")
return "ok"
flow = AskFlow()
flow._state = ChatState()
flow.kickoff()
assert any(
m.get("role") == "user" and m.get("content") == "user reply"
for m in flow.state.messages
)
class TestClassifyIntent:
def test_uses_collapse_with_context(self) -> None:
flow = SimpleChatFlow()
flow._state = ChatState(
messages=[{"role": "user", "content": "prior"}],
)
with patch.object(flow, "_collapse_to_outcome", return_value="help") as mock:
outcome = flow.classify_intent(
"I need help",
["order", "help"],
llm="gpt-4o-mini",
context=flow.conversation_messages,
)
assert outcome == "help"
assert "I need help" in mock.call_args[0][0]
class TestQueueInputProvider:
def test_push_and_request_input(self) -> None:
provider = QueueInputProvider()
flow = SimpleChatFlow()
flow._state = ChatState(id="sess-q")
provider.push("sess-q", "hello")
result = provider.request_input(">", flow, metadata={"session_id": "sess-q"})
assert result == "hello"
class TestChatSession:
def test_handle_turn_returns_turn_result(self) -> None:
flow = SimpleChatFlow()
session = ChatSession(
flow,
session_id="chat-1",
intents=["order", "help"],
intent_llm="gpt-4o-mini",
)
with patch.object(flow, "_collapse_to_outcome", return_value="help"):
turn = session.handle_turn("hi there")
assert turn.session_id == "chat-1"
assert turn.output == "done"
assert turn.intent == "help"
assert any(m["role"] == "user" for m in turn.messages)
session.close()
def test_chat_message_model(self) -> None:
msg = ChatMessage(
type="assistant_delta",
session_id="x",
payload={"chunk": "hi"},
)
assert msg.version == "1"
assert msg.type == "assistant_delta"
class TestFlowTracingWhenSuppressed:
def test_flow_started_emitted_when_panel_events_suppressed(self) -> None:
class QuietFlow(Flow[ChatState]):
suppress_flow_events = True
@start()
def begin(self) -> str:
return "ok"
started: list[str] = []
original_emit = crewai_event_bus.emit
def track_emit(source: Any, event: Any, *args: Any, **kwargs: Any) -> Any:
if isinstance(event, FlowStartedEvent):
started.append(event.flow_name)
return original_emit(source, event, *args, **kwargs)
with patch.object(crewai_event_bus, "emit", side_effect=track_emit):
QuietFlow().kickoff()
assert started == ["QuietFlow"]
def test_llm_action_inside_flow_claims_flow_trace_batch(self) -> None:
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.batch_owner_id = None
flow_id_token = current_flow_id.set("flow-test-id")
flow_name_token = current_flow_name.set("DemoSupportFlow")
try:
event = LLMCallStartedEvent(
model="gpt-4o-mini",
messages=[],
call_id="call-test",
)
listener._handle_action_event("llm_call_started", object(), event)
finally:
current_flow_id.reset(flow_id_token)
current_flow_name.reset(flow_name_token)
assert listener.batch_manager.batch_owner_type == "flow"
assert listener.batch_manager.batch_owner_id == "flow-test-id"
assert (
listener.batch_manager.current_batch.execution_metadata["execution_type"]
== "flow"
)
assert (
listener.batch_manager.current_batch.execution_metadata["flow_name"]
== "DemoSupportFlow"
)
class TestDeferTraceFinalization:
def test_conversational_kickoff_enables_defer_flag(self) -> None:
class ChatFlow(Flow[ChatState]):
conversational_config = ConversationalConfig(
defer_trace_finalization=True
)
@start()
def begin(self) -> str:
return "ok"
flow = ChatFlow()
flow._configure_conversational_kickoff(
user_message="hi",
session_id="sess-trace",
)
assert flow.defer_trace_finalization is True
assert flow._should_defer_trace_finalization() is True
def test_finalize_skipped_until_forced(self) -> None:
flow = SimpleChatFlow()
flow.defer_trace_finalization = True
with patch(
"crewai.events.listeners.tracing.trace_listener.TraceCollectionListener"
) as mock_listener_cls:
mock_listener_cls.return_value.batch_manager.batch_owner_type = "flow"
mock_listener_cls.return_value.first_time_handler.is_first_time = False
flow._finalize_flow_trace_batch()
mock_listener_cls.assert_not_called()
flow._finalize_flow_trace_batch(force=True)
mock_listener_cls.assert_called_once()
class TestDeferredFlowLifecycleEvents:
def test_deferred_kickoff_skips_per_turn_flow_finished(self) -> None:
class ChatFlow(Flow[ChatState]):
conversational_config = ConversationalConfig(
defer_trace_finalization=True
)
@start()
def begin(self) -> str:
return "ok"
flow = ChatFlow()
with patch.object(flow, "_emit_flow_finished_async") as mock_finished:
flow.kickoff(user_message="hi", session_id="sess-lifecycle")
mock_finished.assert_not_called()
def test_flow_finished_without_flow_started_warns(self, capsys) -> None:
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import restore_event_scope
from crewai.events.types.flow_events import FlowFinishedEvent
class BareFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "ok"
restore_event_scope(())
flow = BareFlow()
crewai_event_bus.emit(
flow,
FlowFinishedEvent(
type="flow_finished",
flow_name="BareFlow",
result="ok",
state={},
),
)
captured = capsys.readouterr().out
assert "flow_finished" in captured
assert "Missing starting event" in captured
def test_finalize_session_restores_flow_started_scope(self, capsys) -> None:
from crewai.events.listeners.tracing.trace_batch_manager import TraceBatch
class ChatFlow(Flow[ChatState]):
conversational_config = ConversationalConfig(
defer_trace_finalization=True
)
@start()
def begin(self) -> str:
return "ok"
flow = ChatFlow()
flow.defer_trace_finalization = True
object.__setattr__(flow, "_conversation_trace_started", True)
object.__setattr__(flow, "_conversation_flow_started_event_id", "start-evt-1")
flow._method_outputs.append("ok")
listener = TraceCollectionListener()
listener.batch_manager.batch_owner_type = "flow"
listener.batch_manager.current_batch = TraceBatch(
execution_metadata={"execution_type": "flow", "flow_name": "ChatFlow"},
)
listener.batch_manager.defer_session_finalization = True
listener.batch_manager._batch_finalized = False
with patch.object(flow, "_finalize_flow_trace_batch") as mock_finalize:
flow.finalize_session_traces()
captured = capsys.readouterr().out
assert "Missing starting event" not in captured
mock_finalize.assert_called_once_with(force=True)
assert listener.batch_manager.defer_session_finalization is False
def test_finalize_batch_is_idempotent(self) -> None:
from crewai.events.listeners.tracing.trace_batch_manager import TraceBatchManager
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
):
bm = TraceBatchManager()
bm.current_batch = bm.initialize_batch(
user_context={"privacy_level": "standard"},
execution_metadata={"execution_type": "flow", "flow_name": "ChatFlow"},
)
bm.trace_batch_id = "batch-idempotent"
bm.backend_initialized = True
with (
patch.object(
bm.plus_api,
"send_trace_events",
return_value=MagicMock(status_code=200),
),
patch.object(
bm.plus_api,
"finalize_trace_batch",
return_value=MagicMock(status_code=200, json=MagicMock(return_value={})),
) as mock_finalize_api,
):
bm.finalize_batch()
bm.finalize_batch()
assert mock_finalize_api.call_count == 1
assert bm._batch_finalized is True
def test_finalize_session_is_idempotent_after_batch_cleared(self) -> None:
class ChatFlow(Flow[ChatState]):
@start()
def begin(self) -> str:
return "ok"
flow = ChatFlow()
flow.defer_trace_finalization = True
object.__setattr__(flow, "_conversation_trace_started", True)
listener = TraceCollectionListener()
listener.batch_manager.current_batch = None
listener.batch_manager.batch_owner_type = None
listener.batch_manager.trace_batch_id = None
listener.batch_manager._batch_finalized = True
with patch.object(flow, "_emit_flow_finished_sync") as mock_finished:
with patch.object(flow, "_finalize_flow_trace_batch") as mock_finalize:
flow.finalize_session_traces()
flow.finalize_session_traces()
mock_finished.assert_not_called()
mock_finalize.assert_not_called()
def test_sigint_skips_deferred_session_batch(self) -> None:
from crewai.events.listeners.tracing.trace_batch_manager import TraceBatch
listener = TraceCollectionListener()
listener.batch_manager.current_batch = TraceBatch()
listener.batch_manager.defer_session_finalization = True
with patch.object(listener.batch_manager, "finalize_batch") as mock_finalize:
if listener.batch_manager.is_batch_initialized():
if not listener.batch_manager.defer_session_finalization:
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()
class TestNestedCrewTracing:
def test_is_inside_active_flow_context_when_kickoff_running(self) -> None:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.flow.flow_context import current_flow_id
assert TraceCollectionListener._is_inside_active_flow_context() is False
token = current_flow_id.set("parent-flow-id")
try:
assert TraceCollectionListener._is_inside_active_flow_context() is True
finally:
current_flow_id.reset(token)
def test_nested_crew_completion_skips_finalize(self) -> None:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.flow.flow_context import current_flow_id
listener = TraceCollectionListener()
listener.batch_manager.batch_owner_type = "crew"
token = current_flow_id.set("parent-flow-id")
try:
with patch.object(listener.batch_manager, "finalize_batch") as mock_finalize:
if listener._nested_in_flow_execution():
pass
elif listener.batch_manager.batch_owner_type == "crew":
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()
finally:
current_flow_id.reset(token)
def test_flow_owned_batch_skips_finalize_without_flow_context(self) -> None:
from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener,
)
from crewai.events.listeners.tracing.trace_batch_manager import TraceBatch
listener = TraceCollectionListener()
listener.batch_manager.batch_owner_type = "flow"
listener.batch_manager.current_batch = TraceBatch(
execution_metadata={"execution_type": "flow", "flow_name": "Demo"},
)
with patch.object(listener.batch_manager, "finalize_batch") as mock_finalize:
if listener._nested_in_flow_execution():
pass
elif listener.batch_manager.batch_owner_type == "crew":
listener.batch_manager.finalize_batch()
mock_finalize.assert_not_called()

View File

@@ -624,12 +624,15 @@ def test_handle_streaming_tool_calls_no_available_functions(
],
tools=[get_weather_tool_schema],
)
assert response == ""
assert isinstance(response, list)
assert len(response) == 1
assert response[0].function.name == "get_weather"
assert response[0].function.arguments == '{"location":"New York, NY"}'
assert_event_count(
mock_emit=mock_emit,
expected_stream_chunk=9,
expected_completed_llm_call=1,
expected_completed_llm_call=0,
expected_final_chunk_result='{"location":"New York, NY"}',
)

View File

@@ -884,6 +884,49 @@ class TestTraceListenerSetup:
"test_batch_id_12345", "Internal Server Error"
)
def test_finalize_batch_clears_buffer_after_successful_send(self) -> None:
"""Successful send must not restore a stale event buffer (duplicate events)."""
from crewai.events.listeners.tracing.types import TraceEvent
with patch(
"crewai.events.listeners.tracing.trace_batch_manager.is_tracing_enabled_in_context",
return_value=True,
):
batch_manager = TraceBatchManager()
batch_manager.current_batch = batch_manager.initialize_batch(
user_context={"privacy_level": "standard"},
execution_metadata={
"execution_type": "flow",
"flow_name": "TestFlow",
},
)
batch_manager.trace_batch_id = "batch-clear-test"
batch_manager.backend_initialized = True
batch_manager.event_buffer = [
TraceEvent(
type="llm_call_started",
timestamp="2026-01-01T00:00:00",
event_id="evt-1",
emission_sequence=1,
)
]
with (
patch.object(
batch_manager.plus_api,
"send_trace_events",
return_value=MagicMock(status_code=200),
),
patch.object(
batch_manager.plus_api,
"finalize_trace_batch",
return_value=MagicMock(status_code=200, json=MagicMock(return_value={})),
),
):
batch_manager.finalize_batch()
assert batch_manager.event_buffer == []
def test_ephemeral_batch_includes_anon_id(self):
"""Test that ephemeral batch initialization sends anon_id from get_user_id()"""
fake_user_id = "abc123def456"

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.14.5a5"
__version__ = "1.14.5"

View File

@@ -3,6 +3,7 @@
from collections.abc import Mapping
import os
from pathlib import Path
import re
import subprocess
import sys
import tempfile
@@ -355,8 +356,19 @@ def update_pyproject_dependencies(
workspace_packages = _DEFAULT_WORKSPACE_PACKAGES + (extra_packages or [])
current_extra: str | None = None
extra_header = re.compile(r"^\s*([A-Za-z0-9_-]+)\s*=\s*\[")
for i, line in enumerate(lines):
match = extra_header.match(line)
if match:
current_extra = match.group(1)
elif line.strip().startswith("]"):
current_extra = None
for pkg in workspace_packages:
if pkg == "crewai-files" and current_extra == "file-processing":
continue
if f"{pkg}==" in line:
stripped = line.lstrip()
indent = line[: len(line) - len(stripped)]
@@ -732,18 +744,23 @@ def _is_prerelease(version: str) -> bool:
return any(indicator in v for indicator in _PRERELEASE_INDICATORS)
def get_commits_from_last_tag(tag_name: str, version: str) -> tuple[str, str]:
def get_commits_from_last_tag(
tag_name: str, version: str, cwd: Path | None = None
) -> tuple[str, str]:
"""Get commits from the last tag, excluding current version.
Args:
tag_name: Current tag name (e.g., "v1.0.0").
version: Current version (e.g., "1.0.0").
cwd: Directory to run git commands in (defaults to current).
Returns:
Tuple of (commit_range, commits) where commits is newline-separated.
"""
try:
all_tags = run_command(["git", "tag", "--sort=-version:refname"]).split("\n")
all_tags = run_command(
["git", "tag", "--sort=-version:refname"], cwd=cwd
).split("\n")
prev_tags = [t for t in all_tags if t and t != tag_name and t != f"v{version}"]
if not _is_prerelease(version):
@@ -752,22 +769,30 @@ def get_commits_from_last_tag(tag_name: str, version: str) -> tuple[str, str]:
if prev_tags:
last_tag = prev_tags[0]
commit_range = f"{last_tag}..HEAD"
commits = run_command(["git", "log", commit_range, "--pretty=format:%s"])
commits = run_command(
["git", "log", commit_range, "--pretty=format:%s"], cwd=cwd
)
else:
commit_range = "HEAD"
commits = run_command(["git", "log", "--pretty=format:%s"])
commits = run_command(["git", "log", "--pretty=format:%s"], cwd=cwd)
except subprocess.CalledProcessError:
commit_range = "HEAD"
commits = run_command(["git", "log", "--pretty=format:%s"])
commits = run_command(["git", "log", "--pretty=format:%s"], cwd=cwd)
return commit_range, commits
def get_github_contributors(commit_range: str) -> list[str]:
def get_github_contributors(
commit_range: str,
repo: str = "crewAIInc/crewAI",
cwd: Path | None = None,
) -> list[str]:
"""Get GitHub usernames from commit range using GitHub API.
Args:
commit_range: Git commit range (e.g., "abc123..HEAD").
repo: GitHub repo in ``owner/name`` form to resolve commits against.
cwd: Directory to run git commands in (defaults to current).
Returns:
List of GitHub usernames sorted alphabetically.
@@ -779,10 +804,10 @@ def get_github_contributors(commit_range: str) -> list[str]:
gh_token = None
g = Github(login_or_token=gh_token) if gh_token else Github()
github_repo = g.get_repo("crewAIInc/crewAI")
github_repo = g.get_repo(repo)
commit_shas = run_command(
["git", "log", commit_range, "--pretty=format:%H"]
["git", "log", commit_range, "--pretty=format:%H"], cwd=cwd
).split("\n")
contributors = set()
@@ -922,9 +947,26 @@ def _generate_release_notes(
version: str,
tag_name: str,
no_edit: bool,
cwd: Path | None = None,
gh_repo: str = "crewAIInc/crewAI",
openai_client: OpenAI | None = None,
bump_already_done: bool = True,
) -> tuple[str, OpenAI, bool]:
"""Generate, display, and optionally edit release notes.
Args:
version: Version being released.
tag_name: Tag name for the release.
no_edit: Skip the interactive edit prompt.
cwd: Directory to run git commands in (defaults to current).
gh_repo: GitHub repo (``owner/name``) for resolving contributors.
openai_client: Reuse an existing OpenAI client if provided.
bump_already_done: True when the ``feat: bump versions to <version>``
commit for the current release is already in history (the real
release path). False in previews where no bump exists yet — the
most recent bump commit is the *previous* version and must be
used as the range start.
Returns:
Tuple of (release_notes, openai_client, is_prerelease).
"""
@@ -939,7 +981,8 @@ def _generate_release_notes(
"log",
"--grep=^feat: bump versions to",
"--format=%H %s",
]
],
cwd=cwd,
)
bump_entries = [
line for line in prev_bump_output.strip().split("\n") if line.strip()
@@ -947,7 +990,8 @@ def _generate_release_notes(
is_stable = not _is_prerelease(version)
prev_commit = None
for entry in bump_entries[1:]:
scan_entries = bump_entries[1:] if bump_already_done else bump_entries
for entry in scan_entries:
bump_ver = entry.split("feat: bump versions to", 1)[-1].strip()
if is_stable and _is_prerelease(bump_ver):
continue
@@ -957,7 +1001,7 @@ def _generate_release_notes(
if prev_commit:
commit_range = f"{prev_commit}..HEAD"
commits = run_command(
["git", "log", commit_range, "--pretty=format:%s"]
["git", "log", commit_range, "--pretty=format:%s"], cwd=cwd
)
commit_lines = [
@@ -967,14 +1011,21 @@ def _generate_release_notes(
]
commits = "\n".join(commit_lines)
else:
commit_range, commits = get_commits_from_last_tag(tag_name, version)
commit_range, commits = get_commits_from_last_tag(
tag_name, version, cwd=cwd
)
except subprocess.CalledProcessError:
commit_range, commits = get_commits_from_last_tag(tag_name, version)
commit_range, commits = get_commits_from_last_tag(
tag_name, version, cwd=cwd
)
github_contributors = get_github_contributors(commit_range)
github_contributors = get_github_contributors(
commit_range, repo=gh_repo, cwd=cwd
)
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
if openai_client is None:
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
if commits.strip():
contributors_section = ""
@@ -1532,7 +1583,13 @@ def _wait_for_pr_merged(branch: str, cwd: Path) -> None:
time.sleep(_PR_MERGE_POLL_INTERVAL)
def _release_enterprise(version: str, is_prerelease: bool, dry_run: bool) -> None:
def _release_enterprise(
version: str,
is_prerelease: bool,
dry_run: bool,
no_edit: bool = False,
openai_client: OpenAI | None = None,
) -> None:
"""Clone the enterprise repo, bump versions, and create a release PR.
Expects ENTERPRISE_REPO, ENTERPRISE_VERSION_DIRS, and
@@ -1542,6 +1599,8 @@ def _release_enterprise(version: str, is_prerelease: bool, dry_run: bool) -> Non
version: New version string.
is_prerelease: Whether this is a pre-release version.
dry_run: Show what would be done without making changes.
no_edit: Skip the interactive release-notes edit prompt.
openai_client: Reuse OpenAI client from earlier phases if available.
"""
if (
not _ENTERPRISE_REPO
@@ -1559,7 +1618,6 @@ def _release_enterprise(version: str, is_prerelease: bool, dry_run: bool) -> Non
)
if dry_run:
console.print(f"[dim][DRY RUN][/dim] Would clone {enterprise_repo}")
for d in _ENTERPRISE_VERSION_DIRS:
console.print(f"[dim][DRY RUN][/dim] Would update versions in {d}")
console.print(
@@ -1570,6 +1628,26 @@ def _release_enterprise(version: str, is_prerelease: bool, dry_run: bool) -> Non
"[dim][DRY RUN][/dim] Would create bump PR, wait for merge, "
"then tag and release"
)
with tempfile.TemporaryDirectory() as tmp:
repo_dir = Path(tmp) / enterprise_repo.split("/")[-1]
console.print(f"\nCloning {enterprise_repo} (read-only preview)...")
run_command(["gh", "repo", "clone", enterprise_repo, str(repo_dir)])
console.print(f"[green]✓[/green] Cloned {enterprise_repo}")
_generate_release_notes(
version,
version,
no_edit,
cwd=repo_dir,
gh_repo=enterprise_repo,
openai_client=openai_client,
bump_already_done=False,
)
console.print(
"[dim][DRY RUN][/dim] Would tag and create GitHub release "
"with the notes above"
)
return
with tempfile.TemporaryDirectory() as tmp:
@@ -1682,8 +1760,18 @@ def _release_enterprise(version: str, is_prerelease: bool, dry_run: bool) -> Non
run_command(["git", "pull"], cwd=repo_dir)
tag_name = version
release_notes, _, _ = _generate_release_notes(
version,
tag_name,
no_edit,
cwd=repo_dir,
gh_repo=enterprise_repo,
openai_client=openai_client,
)
run_command(
["git", "tag", "-a", tag_name, "-m", f"Release {version}"],
["git", "tag", "-a", tag_name, "-m", release_notes],
cwd=repo_dir,
)
run_command(["git", "push", "origin", tag_name], cwd=repo_dir)
@@ -1699,7 +1787,7 @@ def _release_enterprise(version: str, is_prerelease: bool, dry_run: bool) -> Non
"--title",
tag_name,
"--notes",
f"Release {version}",
release_notes,
]
if is_prerelease:
gh_cmd.append("--prerelease")
@@ -1998,7 +2086,7 @@ def tag(dry_run: bool, no_edit: bool) -> None:
console.print("[green]✓[/green] main branch up to date")
release_notes, openai_client, is_prerelease = _generate_release_notes(
version, tag_name, no_edit
version, tag_name, no_edit, bump_already_done=True
)
docs_branch = _update_docs_and_create_pr(
@@ -2109,7 +2197,7 @@ def release(
if skip_to_enterprise:
try:
_release_enterprise(version, is_prerelease, dry_run)
_release_enterprise(version, is_prerelease, dry_run, no_edit=no_edit)
except BaseException as e:
_print_release_error(e)
_resume_hint(
@@ -2205,7 +2293,7 @@ def release(
console.print("[green]✓[/green] main branch up to date")
release_notes, openai_client, is_prerelease = _generate_release_notes(
version, tag_name, no_edit
version, tag_name, no_edit, bump_already_done=not dry_run
)
docs_branch = _update_docs_and_create_pr(
@@ -2259,7 +2347,13 @@ def release(
if not skip_enterprise:
try:
_release_enterprise(version, is_prerelease, dry_run)
_release_enterprise(
version,
is_prerelease,
dry_run,
no_edit=no_edit,
openai_client=openai_client,
)
except BaseException as e:
_print_release_error(e)
_resume_hint(

View File

@@ -282,6 +282,25 @@ class TestUpdatePyprojectDependencies:
assert '"crewai-files==2.0.0"' in result
assert '"requests>=2.0"' in result
def test_skips_crewai_files_in_file_processing_extra(self, tmp_path: Path) -> None:
pyproject = tmp_path / "pyproject.toml"
pyproject.write_text(
dedent("""\
[project.optional-dependencies]
file-processing = [
"crewai-files==1.0.0",
]
other = [
"crewai-files==1.0.0",
]
""")
)
update_pyproject_dependencies(pyproject, "2.0.0")
result = pyproject.read_text()
assert '"crewai-files==1.0.0"' in result
assert '"crewai-files==2.0.0"' in result
def test_leaves_bare_crewai_pin_alone(self, tmp_path: Path) -> None:
"""`crewai==` must not collide with `crewai-core==` etc."""
pyproject = tmp_path / "pyproject.toml"

View File

@@ -185,8 +185,10 @@ exclude-newer = "3 days"
# python-multipart <0.0.27 has GHSA-pp6c-gr5w-3c5g (DoS via unbounded multipart headers).
# gitpython <3.1.50 has GHSA-mv93-w799-cj2w (config_writer newline injection bypassing the 3.1.49 patch -> RCE via core.hooksPath).
# urllib3 <2.7.0 has GHSA-qccp-gfcp-xxvc (ProxyManager cross-origin redirect leaks Authorization/Cookie) and GHSA-mf9v-mfxr-j63j (streaming decompression-bomb bypass); force 2.7.0+.
# langsmith <0.7.31 has GHSA-rr7j-v2q5-chgv (streaming token redaction bypass); force 0.7.31+.
# langsmith <0.8.0 has GHSA-3644-q5cj-c5c7 (public prompt manifest deserialization, SSRF/secret disclosure); force 0.8.0+.
# authlib <1.6.11 has GHSA-jj8c-mmj3-mmgv (CSRF bypass in cache-based state storage).
# pip <26.1.1 has GHSA-58qw-9mgm-455v (archive handling); OSV considers 26.1.1 unaffected.
# paramiko <5.0.0 has GHSA-r374-rxx8-8654 (SHA-1 in rsakey.py); OSV considers 5.0.0 unaffected. Transitive via composio-core.
# litellm 1.83.8+ hard-pins openai==2.24.0, missing openai.types.responses used by crewai;
# override to >=2.30.0 (the version litellm 1.83.7 used) until upstream relaxes the pin.
override-dependencies = [
@@ -203,8 +205,10 @@ override-dependencies = [
"uv>=0.11.6,<1",
"python-multipart>=0.0.27,<1",
"gitpython>=3.1.50,<4",
"langsmith>=0.7.31,<0.8",
"langsmith>=0.8.0,<1",
"authlib>=1.6.11",
"pip>=26.1.1",
"paramiko>=5.0.0",
]
[tool.uv.workspace]

30
uv.lock generated
View File

@@ -13,7 +13,7 @@ resolution-markers = [
]
[options]
exclude-newer = "2026-05-08T16:33:02.834109Z"
exclude-newer = "2026-05-17T14:20:01.778505Z"
exclude-newer-span = "P3D"
[manifest]
@@ -31,10 +31,12 @@ overrides = [
{ name = "gitpython", specifier = ">=3.1.50,<4" },
{ name = "langchain-core", specifier = ">=1.3.3,<2" },
{ name = "langchain-text-splitters", specifier = ">=1.1.2,<2" },
{ name = "langsmith", specifier = ">=0.7.31,<0.8" },
{ name = "langsmith", specifier = ">=0.8.0,<1" },
{ name = "onnxruntime", marker = "python_full_version < '3.11'", specifier = "<1.24" },
{ name = "openai", specifier = ">=2.30.0,<3" },
{ name = "paramiko", specifier = ">=5.0.0" },
{ name = "pillow", specifier = ">=12.1.1" },
{ name = "pip", specifier = ">=26.1.1" },
{ name = "pypdf", specifier = ">=6.10.2,<7" },
{ name = "python-multipart", specifier = ">=0.0.27,<1" },
{ name = "rich", specifier = ">=13.7.1" },
@@ -3268,11 +3270,11 @@ wheels = [
[[package]]
name = "idna"
version = "3.11"
version = "3.15"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/6f/6d/0703ccc57f3a7233505399edb88de3cbd678da106337b9fcde432b65ed60/idna-3.11.tar.gz", hash = "sha256:795dafcc9c04ed0c1fb032c2aa73654d8e8c5023a7df64a53f39190ada629902", size = 194582, upload-time = "2025-10-12T14:55:20.501Z" }
sdist = { url = "https://files.pythonhosted.org/packages/82/77/7b3966d0b9d1d31a36ddf1746926a11dface89a83409bf1483f0237aa758/idna-3.15.tar.gz", hash = "sha256:ca962446ea538f7092a95e057da437618e886f4d349216d2b1e294abfdb65fdc", size = 199245, upload-time = "2026-05-12T22:45:57.011Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008, upload-time = "2025-10-12T14:55:18.883Z" },
{ url = "https://files.pythonhosted.org/packages/d2/23/408243171aa9aaba178d3e2559159c24c1171a641aa83b67bdd3394ead8e/idna-3.15-py3-none-any.whl", hash = "sha256:048adeaf8c2d788c40fee287673ccaa74c24ffd8dcf09ffa555a2fbb59f10ac8", size = 72340, upload-time = "2026-05-12T22:45:55.733Z" },
]
[[package]]
@@ -3888,7 +3890,7 @@ sdist = { url = "https://files.pythonhosted.org/packages/0e/72/a3add0e4eec4eb9e2
[[package]]
name = "langsmith"
version = "0.7.32"
version = "0.8.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "httpx" },
@@ -3901,9 +3903,9 @@ dependencies = [
{ name = "xxhash" },
{ name = "zstandard" },
]
sdist = { url = "https://files.pythonhosted.org/packages/2f/b4/a0b4a501bee6b8a741ce29f8c48155b132118483cddc6f9247735ddb38fa/langsmith-0.7.32.tar.gz", hash = "sha256:b59b8e106d0e4c4842e158229296086e2aa7c561e3f602acda73d3ad0062e915", size = 1184518, upload-time = "2026-04-15T23:42:41.885Z" }
sdist = { url = "https://files.pythonhosted.org/packages/de/8a/1e8ea5e8bab2a65fa95bd36229ef38e8723ec46e430e20ca2d953487a7f1/langsmith-0.8.3.tar.gz", hash = "sha256:767ff7a8d136ed42926bf99059ac631dc6883542d6e3104b32e71c7625e1fa05", size = 4460330, upload-time = "2026-05-07T19:56:56.18Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/62/bc/148f98ac7dad73ac5e1b1c985290079cfeeb9ba13d760a24f25002beb2c9/langsmith-0.7.32-py3-none-any.whl", hash = "sha256:e1fde928990c4c52f47dc5132708cec674355d9101723d564183e965f383bf5f", size = 378272, upload-time = "2026-04-15T23:42:39.905Z" },
{ url = "https://files.pythonhosted.org/packages/98/a9/51e644c1f1dbc3dd7d22dfd6412eab206d538c81e024e4f287373544bdcb/langsmith-0.8.3-py3-none-any.whl", hash = "sha256:b2e40e308222fa0beb2dccee3b4b30bfee9062d7a4f20a3e3e93df3c51a08ab4", size = 399048, upload-time = "2026-05-07T19:56:53.994Z" },
]
[[package]]
@@ -5788,7 +5790,7 @@ wheels = [
[[package]]
name = "paramiko"
version = "4.0.0"
version = "5.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "bcrypt" },
@@ -5796,9 +5798,9 @@ dependencies = [
{ name = "invoke" },
{ name = "pynacl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/1f/e7/81fdcbc7f190cdb058cffc9431587eb289833bdd633e2002455ca9bb13d4/paramiko-4.0.0.tar.gz", hash = "sha256:6a25f07b380cc9c9a88d2b920ad37167ac4667f8d9886ccebd8f90f654b5d69f", size = 1630743, upload-time = "2025-08-04T01:02:03.711Z" }
sdist = { url = "https://files.pythonhosted.org/packages/62/93/dcc25d52f49022ae6175d15e6bd751f1acc99b98bc61fc55e5155a7be2e7/paramiko-5.0.0.tar.gz", hash = "sha256:36763b5b95c2a0dcfdf1abc48e48156ee425b21efe2f0e787c2dd5a95c0e5e79", size = 1548586, upload-time = "2026-05-09T18:28:52.256Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a9/90/a744336f5af32c433bd09af7854599682a383b37cfd78f7de263de6ad6cb/paramiko-4.0.0-py3-none-any.whl", hash = "sha256:0e20e00ac666503bf0b4eda3b6d833465a2b7aff2e2b3d79a8bba5ef144ee3b9", size = 223932, upload-time = "2025-08-04T01:02:02.029Z" },
{ url = "https://files.pythonhosted.org/packages/82/5b/eadf6d45de38d30ab603f49393b6cd2cbe7e233af8cf90197e32782b68a9/paramiko-5.0.0-py3-none-any.whl", hash = "sha256:b7044611c30140d9a75261653210e2002977b71a0497ff3ba0d98d7edbf62f7c", size = 208919, upload-time = "2026-05-09T18:28:50.295Z" },
]
[[package]]
@@ -6060,11 +6062,11 @@ wheels = [
[[package]]
name = "pip"
version = "26.1"
version = "26.1.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/73/7e/d2b04004e1068ad4fdfa2f227b839b5d03e602e47cdbbf49de71137c9546/pip-26.1.tar.gz", hash = "sha256:81e13ebcca3ffa8cc85e4deff5c27e1ee26dea0aa7fc2f294a073ac208806ff3", size = 1840316, upload-time = "2026-04-26T21:00:05.406Z" }
sdist = { url = "https://files.pythonhosted.org/packages/b6/48/cb9b7a682f6fe01a4221e1728941dd4ac3cd9090a17db3779d6ff490b602/pip-26.1.1.tar.gz", hash = "sha256:d36762751d156a4ee895de8af39aa0abeeeb577f93a2eca6ab62467bbf0f8a78", size = 1840400, upload-time = "2026-05-04T19:02:21.248Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/70/7a/be4bd8bcbb24ea475856dd68159d78b03b2bb53dae369f69c9606b8888f5/pip-26.1-py3-none-any.whl", hash = "sha256:4e8486d821d814b77319acb7b9e8bf5a4ee7590a643e7cb21029f209be8573c1", size = 1812804, upload-time = "2026-04-26T21:00:03.194Z" },
{ url = "https://files.pythonhosted.org/packages/3a/eb/fea4d1d51c49832120f7f285d07306db3960f423a2612c6057caf3e8196f/pip-26.1.1-py3-none-any.whl", hash = "sha256:99cb1c2899893b075ff56e4ed0af55669a955b49ad7fb8d8603ecdaf4ed653fb", size = 1812777, upload-time = "2026-05-04T19:02:18.9Z" },
]
[[package]]