Compare commits

...

16 Commits

Author SHA1 Message Date
Devin AI
d633aaa5fa fix(hitl): log pre-review failures and add learn_strict mode
Closes #5725.

In _pre_review_with_lessons (lib/crewai/src/crewai/flow/human_feedback.py),
a broad except Exception: return method_output silently swallowed any
LLM, network, auth, or structured-output failure during the pre-review
step. Callers could not distinguish pre-reviewed output from raw output,
and no log or event was surfaced.

Changes:
- Add a module logger.
- Narrow the try/except in _pre_review_with_lessons so that the early
  returns for a missing memory (mem is None) and for no matching lessons
  (not matches) stay outside the failure path (those are not error cases).
- On recall or LLM pre-review failure, log a WARNING with exc_info=True
  so the silent fallback is observable. Continue to fall back to the raw
  method_output so the flow does not break.
- Add an opt-in learn_strict=True parameter on the human_feedback
  decorator and HumanFeedbackConfig that re-raises pre-review failures
  instead of falling back, for callers that need fail-closed behavior.
- Update the Graceful degradation docs section to reflect the new
  observable-by-default behavior and document learn_strict.

Tests (lib/crewai/tests/test_human_feedback_decorator.py):
- New TestHumanFeedbackPreReviewFailure class with 7 tests covering
  WARNING logging on LLM and recall failures, learn_strict propagation
  in both sync and async paths, and config introspection of the new
  learn_strict flag.
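
A minimal usage sketch of the new flag (import paths and decorator wiring are assumptions based on the CrewAI flow docs, not taken from this change):

# Hypothetical sketch -- module paths and method wiring are assumptions.
from crewai.flow.flow import Flow, start
from crewai.flow.human_feedback import human_feedback


class ArticleReviewFlow(Flow):
    # learn_strict=True makes pre-review fail closed: an LLM, network, auth,
    # structured-output, or memory recall error is re-raised out of this
    # method. With the default learn_strict=False the error is logged at
    # WARNING level (exc_info=True) and the raw draft is shown to the human.
    @start()
    @human_feedback(learn=True, learn_strict=True)
    def draft_article(self) -> str:
        return "First draft of the article..."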

Co-Authored-By: João <joao@crewai.com>
2026-05-06 04:25:15 +00:00
iris-clawd
ec8a522c2c fix: correct status endpoint path from /{kickoff_id}/status to /status/{kickoff_id}
2026-05-05 07:29:49 +08:00
Greyson LaLonde
e25f6538a8 fix(deps): bump gitpython to >=3.1.47 for GHSA-rpm5-65cw-6hj4
2026-05-04 23:44:28 +08:00
Greyson LaLonde
470d4035db docs: update changelog and version for v1.14.5a2 2026-05-04 23:04:56 +08:00
Greyson LaLonde
57d1b338f7 feat: bump versions to 1.14.5a2 2026-05-04 22:58:06 +08:00
huang yutong
01df19b029 fix(a2a): always restore task.output_pydantic in finally block
In `_execute_task_with_a2a` and its async variant, the try body
sets `task.output_pydantic = None` before returning an A2A
response. The finally block then checks
`if task.output_pydantic is not None` before restoring the
original value — but since it was just set to None, the condition
is always False and the original value is never restored. This
permanently mutates the Task object.

Remove the guard so `output_pydantic` is unconditionally restored,
matching the unconditional restoration of `description` and
`response_model` in the same block.
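
A condensed sketch of the pattern (function and parameter names are illustrative; the real change is in the diff further down):

def _execute_with_a2a(task, raw_result):
    # Illustrative only: mirrors the fixed restore logic.
    original_output_pydantic = task.output_pydantic
    try:
        task.output_pydantic = None  # cleared while returning the A2A response
        return raw_result
    finally:
        # Before the fix this was guarded by `if task.output_pydantic is not None:`,
        # but the try body just set it to None, so the guard was always False and
        # the Task object stayed mutated. Restore unconditionally instead.
        task.output_pydantic = original_output_pydantic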

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-05-04 22:41:04 +08:00
Rip&Tear
dca2c3160f chore: update security reporting instructions
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-05-04 22:31:35 +08:00
Greyson LaLonde
6494d68ffc fix(gemini): include thoughts_token_count in completion tokens 2026-05-04 21:03:38 +08:00
Greyson LaLonde
f579aa53ae fix: preserve task outputs across async batch flush 2026-05-04 20:24:24 +08:00
minasami-pr
a23e118b11 fix: forward kwargs to loader calls in CrewAIRagAdapter
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-05-04 19:52:24 +08:00
Greyson LaLonde
095f796922 fix: prevent result_as_answer from returning hook-block message as final answer 2026-05-04 19:42:07 +08:00
Zamuldinov Nikita
bfbdba426f fix: prevent result_as_answer from returning error as final answer
When a tool with result_as_answer=True raises an exception, the agent
was receiving result_as_answer=True and returning the error string as
the final answer. Now we set result_as_answer=False when an error event
is emitted, allowing the agent to reflect and retry.

Fixes crewAIInc/crewAI#5156
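
A standalone sketch of the guarding logic (the helper name is hypothetical; the variable names follow the agent_utils diff further down):

def should_promote_result_as_answer(tool, error_event_emitted, hook_blocked):
    # Hypothetical helper: a tool result is only promoted to the final answer
    # when the tool opted in AND the call neither errored (error event emitted)
    # nor was blocked by a hook, so the agent can reflect and retry instead of
    # returning the error string verbatim.
    return bool(
        tool is not None
        and getattr(tool, "result_as_answer", False)
        and not error_event_emitted
        and not hook_blocked
    )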

---------

Co-authored-by: NIK-TIGER-BILL <nik.tiger.bill@github.com>
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-05-04 19:28:21 +08:00
Greyson LaLonde
a058a3b15b fix(task): use acall for output conversion in async paths 2026-05-04 18:42:12 +08:00
Greyson LaLonde
184c228ae9 fix: prevent shared LLM stop words mutation across agents
2026-05-04 14:23:17 +08:00
Greyson LaLonde
c9100cb51d docs(devtools): document additional env vars
2026-05-03 14:50:44 +08:00
Greyson LaLonde
17e82743f6 fix: handle BaseModel input in convert_to_model 2026-05-03 14:17:03 +08:00
51 changed files with 1459 additions and 239 deletions

.github/security.md vendored
View File

@@ -5,7 +5,10 @@ CrewAI ecosystem.
### How to Report
Please submit reports to **crewai-vdp-ess@submit.bugcrowd.com**
Please submit reports through one of the following channels:
- **crewai-vdp-ess@submit.bugcrowd.com**
- https://security.crewai.com
- **Please do not** disclose vulnerabilities via public GitHub issues, pull requests,
or social media

View File

@@ -26,7 +26,7 @@ mode: "wide"
</Step>
<Step title="مراقبة التقدم">
استخدم `GET /{kickoff_id}/status` للتحقق من حالة التنفيذ واسترجاع النتائج.
استخدم `GET /status/{kickoff_id}` للتحقق من حالة التنفيذ واسترجاع النتائج.
</Step>
</Steps>
@@ -65,7 +65,7 @@ https://your-crew-name.crewai.com
1. **الاكتشاف**: استدعِ `GET /inputs` لفهم ما يحتاجه طاقمك
2. **التنفيذ**: أرسل المدخلات عبر `POST /kickoff` لبدء المعالجة
3. **المراقبة**: استعلم عن `GET /{kickoff_id}/status` حتى الاكتمال
3. **المراقبة**: استعلم عن `GET /status/{kickoff_id}` حتى الاكتمال
4. **النتائج**: استخرج المخرجات النهائية من الاستجابة المكتملة
## معالجة الأخطاء

View File

@@ -1,6 +1,6 @@
---
title: "GET /{kickoff_id}/status"
title: "GET /status/{kickoff_id}"
description: "الحصول على حالة التنفيذ"
openapi: "/enterprise-api.en.yaml GET /{kickoff_id}/status"
openapi: "/enterprise-api.en.yaml GET /status/{kickoff_id}"
mode: "wide"
---

View File

@@ -4,6 +4,34 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="4 مايو 2026">
## v1.14.5a2
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## ما الذي تغير
### إصلاحات الأخطاء
- إصلاح استعادة مخرجات المهام في كتلة finally
- تضمين `thoughts_token_count` في رموز الإكمال
- الحفاظ على مخرجات المهام عبر تفريغ دفعات غير متزامنة
- تمرير kwargs إلى استدعاءات المحمل في `CrewAIRagAdapter`
- منع `result_as_answer` من إرجاع رسالة كتلة الخطاف كإجابة نهائية
- منع `result_as_answer` من إرجاع خطأ كإجابة نهائية
- استخدام `acall` لتحويل المخرجات في المسارات غير المتزامنة
- منع تغيير كلمات التوقف المشتركة في LLM عبر الوكلاء
- التعامل مع مدخلات `BaseModel` في `convert_to_model`
### الوثائق
- توثيق متغيرات البيئة الإضافية
- تحديث سجل التغييرات والإصدار لـ v1.14.5a1
## المساهمون
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="1 مايو 2026">
## v1.14.5a1

View File

@@ -26,7 +26,7 @@ Welcome to the CrewAI AMP API reference. This API allows you to programmatically
</Step>
<Step title="Monitor Progress">
Use `GET /{kickoff_id}/status` to check execution status and retrieve results.
Use `GET /status/{kickoff_id}` to check execution status and retrieve results.
</Step>
</Steps>
@@ -65,7 +65,7 @@ Replace `your-crew-name` with your actual crew's URL from the dashboard.
1. **Discovery**: Call `GET /inputs` to understand what your crew needs
2. **Execution**: Submit inputs via `POST /kickoff` to start processing
3. **Monitoring**: Poll `GET /{kickoff_id}/status` until completion
3. **Monitoring**: Poll `GET /status/{kickoff_id}` until completion
4. **Results**: Extract the final output from the completed response
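
A hedged end-to-end sketch of that workflow (base URL, auth header, and response field names are placeholders, not confirmed by this diff):

import time
import requests

BASE = "https://your-crew-name.crewai.com"          # placeholder crew URL
HEADERS = {"Authorization": "Bearer <API_TOKEN>"}    # placeholder auth header

inputs_spec = requests.get(f"{BASE}/inputs", headers=HEADERS).json()      # 1. Discovery
kickoff = requests.post(f"{BASE}/kickoff", headers=HEADERS,
                        json={"inputs": {"topic": "example"}}).json()     # 2. Execution
kickoff_id = kickoff["kickoff_id"]                   # field name assumed

while True:                                          # 3. Monitoring (corrected path)
    status = requests.get(f"{BASE}/status/{kickoff_id}", headers=HEADERS).json()
    if status.get("state") in {"SUCCESS", "FAILED"}:  # state values assumed
        break
    time.sleep(5)

print(status.get("result"))                          # 4. Results (field name assumed)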
## Error Handling

View File

@@ -1,6 +1,6 @@
---
title: "GET /{kickoff_id}/status"
title: "GET /status/{kickoff_id}"
description: "Get execution status"
openapi: "/enterprise-api.en.yaml GET /{kickoff_id}/status"
openapi: "/enterprise-api.en.yaml GET /status/{kickoff_id}"
mode: "wide"
---

View File

@@ -4,6 +4,34 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="May 04, 2026">
## v1.14.5a2
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## What's Changed
### Bug Fixes
- Fix task output restoration in finally block
- Include `thoughts_token_count` in completion tokens
- Preserve task outputs across async batch flush
- Forward kwargs to loader calls in `CrewAIRagAdapter`
- Prevent `result_as_answer` from returning hook-block message as final answer
- Prevent `result_as_answer` from returning error as final answer
- Use `acall` for output conversion in async paths
- Prevent shared LLM stop words mutation across agents
- Handle `BaseModel` input in `convert_to_model`
### Documentation
- Document additional environment variables
- Update changelog and version for v1.14.5a1
## Contributors
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="May 01, 2026">
## v1.14.5a1

View File

@@ -682,6 +682,7 @@ class ArticleReviewFlow(Flow):
| Parameter | Default | Description |
|-----------|---------|-------------|
| `learn` | `False` | Enable HITL learning |
| `learn_strict` | `False` | If `True`, pre-review failures are re-raised instead of falling back to raw output |
| `learn_limit` | `5` | Max past lessons to recall for pre-review |
### Key Design Decisions
@@ -689,7 +690,8 @@ class ArticleReviewFlow(Flow):
- **Same LLM for everything**: The `llm` parameter on the decorator is shared by outcome collapsing, lesson distillation, and pre-review. No need to configure multiple models.
- **Structured output**: Both distillation and pre-review use function calling with Pydantic models when the LLM supports it, falling back to text parsing otherwise.
- **Non-blocking storage**: Lessons are stored via `remember_many()` which runs in a background thread -- the flow continues immediately.
- **Graceful degradation**: If the LLM fails during distillation, nothing is stored. If it fails during pre-review, the raw output is shown. Neither failure blocks the flow.
- **Observable graceful degradation**: If the LLM fails during distillation, nothing is stored. If it fails during pre-review, the raw output is shown to the human and the failure is logged at `WARNING` level with the full traceback (`exc_info=True`) under the `crewai.flow.human_feedback` logger -- so the silent fallback is detectable. Neither failure blocks the flow.
- **Strict mode**: Pass `learn_strict=True` to make pre-review fail closed -- failures (LLM error, network/auth error, structured-output parse error, memory `recall` error) propagate out of the flow method instead of being swallowed. Use this when downstream code must be able to assume that pre-review actually executed.
- **No scope/categories needed**: When storing lessons, only `source` is passed. The encoding pipeline infers scope, categories, and importance automatically.
<Note>

View File

@@ -35,7 +35,7 @@ info:
1. **Discover inputs** using `GET /inputs`
2. **Start execution** using `POST /kickoff`
3. **Monitor progress** using `GET /{kickoff_id}/status`
3. **Monitor progress** using `GET /status/{kickoff_id}`
version: 1.0.0
contact:
name: CrewAI Support
@@ -207,7 +207,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/{kickoff_id}/status:
/status/{kickoff_id}:
get:
summary: Get Execution Status
description: |

View File

@@ -35,7 +35,7 @@ info:
1. **Discover inputs** using `GET /inputs`
2. **Start execution** using `POST /kickoff`
3. **Monitor progress** using `GET /{kickoff_id}/status`
3. **Monitor progress** using `GET /status/{kickoff_id}`
version: 1.0.0
contact:
name: CrewAI Support
@@ -207,7 +207,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/{kickoff_id}/status:
/status/{kickoff_id}:
get:
summary: Get Execution Status
description: |

View File

@@ -84,7 +84,7 @@ paths:
'500':
$ref: '#/components/responses/ServerError'
/{kickoff_id}/status:
/status/{kickoff_id}:
get:
summary: 실행 상태 조회
description: |

View File

@@ -35,7 +35,7 @@ info:
1. **Descubra os inputs** usando `GET /inputs`
2. **Inicie a execução** usando `POST /kickoff`
3. **Monitore o progresso** usando `GET /{kickoff_id}/status`
3. **Monitore o progresso** usando `GET /status/{kickoff_id}`
version: 1.0.0
contact:
name: CrewAI Suporte
@@ -120,7 +120,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/{kickoff_id}/status:
/status/{kickoff_id}:
get:
summary: Obter Status da Execução
description: |

View File

@@ -26,7 +26,7 @@ CrewAI 엔터프라이즈 API 참고 자료에 오신 것을 환영합니다.
</Step>
<Step title="진행 상황 모니터링">
`GET /{kickoff_id}/status`를 사용하여 실행 상태를 확인하고 결과를 조회하세요.
`GET /status/{kickoff_id}`를 사용하여 실행 상태를 확인하고 결과를 조회하세요.
</Step>
</Steps>
@@ -65,7 +65,7 @@ https://your-crew-name.crewai.com
1. **탐색**: `GET /inputs`를 호출하여 crew가 필요한 것을 파악합니다.
2. **실행**: `POST /kickoff`를 통해 입력값을 제출하여 처리를 시작합니다.
3. **모니터링**: 완료될 때까지 `GET /{kickoff_id}/status`를 주기적으로 조회합니다.
3. **모니터링**: 완료될 때까지 `GET /status/{kickoff_id}`를 주기적으로 조회합니다.
4. **결과**: 완료된 응답에서 최종 출력을 추출합니다.
## 오류 처리

View File

@@ -1,6 +1,6 @@
---
title: "GET /{kickoff_id}/status"
title: "GET /status/{kickoff_id}"
description: "실행 상태 조회"
openapi: "/enterprise-api.ko.yaml GET /{kickoff_id}/status"
openapi: "/enterprise-api.ko.yaml GET /status/{kickoff_id}"
mode: "wide"
---

View File

@@ -4,6 +4,34 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 5월 4일">
## v1.14.5a2
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## 변경 사항
### 버그 수정
- finally 블록에서 작업 출력 복원 수정
- 완료 토큰에 `thoughts_token_count` 포함
- 비동기 배치 플러시 간 작업 출력 보존
- `CrewAIRagAdapter`의 로더 호출에 kwargs 전달
- `result_as_answer`가 후크 차단 메시지를 최종 답변으로 반환하지 않도록 방지
- `result_as_answer`가 오류를 최종 답변으로 반환하지 않도록 방지
- 비동기 경로에서 출력 변환을 위해 `acall` 사용
- 에이전트 간 공유 LLM 중지 단어 변형 방지
- `convert_to_model`에서 `BaseModel` 입력 처리
### 문서화
- 추가 환경 변수 문서화
- v1.14.5a1에 대한 변경 로그 및 버전 업데이트
## 기여자
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="2026년 5월 1일">
## v1.14.5a1

View File

@@ -26,7 +26,7 @@ Bem-vindo à referência da API do CrewAI AMP. Esta API permite que você intera
</Step>
<Step title="Monitore o Progresso">
Use `GET /{kickoff_id}/status` para checar o status da execução e recuperar os resultados.
Use `GET /status/{kickoff_id}` para checar o status da execução e recuperar os resultados.
</Step>
</Steps>
@@ -65,7 +65,7 @@ Substitua `your-crew-name` pela URL real do seu crew no painel.
1. **Descoberta**: Chame `GET /inputs` para entender o que seu crew precisa
2. **Execução**: Envie os inputs via `POST /kickoff` para iniciar o processamento
3. **Monitoramento**: Faça polling em `GET /{kickoff_id}/status` até a conclusão
3. **Monitoramento**: Faça polling em `GET /status/{kickoff_id}` até a conclusão
4. **Resultados**: Extraia o output final da resposta concluída
## Tratamento de Erros

View File

@@ -1,6 +1,6 @@
---
title: "GET /{kickoff_id}/status"
title: "GET /status/{kickoff_id}"
description: "Obter o status da execução"
openapi: "/enterprise-api.pt-BR.yaml GET /{kickoff_id}/status"
openapi: "/enterprise-api.pt-BR.yaml GET /status/{kickoff_id}"
mode: "wide"
---

View File

@@ -4,6 +4,34 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="04 mai 2026">
## v1.14.5a2
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## O que Mudou
### Correções de Bugs
- Corrigir a restauração da saída da tarefa no bloco finally
- Incluir `thoughts_token_count` nos tokens de conclusão
- Preservar as saídas das tarefas durante o descarregamento assíncrono em lote
- Encaminhar kwargs para chamadas de carregador em `CrewAIRagAdapter`
- Impedir que `result_as_answer` retorne mensagem de bloqueio de hook como resposta final
- Impedir que `result_as_answer` retorne erro como resposta final
- Usar `acall` para conversão de saída em caminhos assíncronos
- Prevenir a mutação de palavras de parada compartilhadas do LLM entre agentes
- Lidar com entrada `BaseModel` em `convert_to_model`
### Documentação
- Documentar variáveis de ambiente adicionais
- Atualizar changelog e versão para v1.14.5a1
## Contribuidores
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="01 mai 2026">
## v1.14.5a1

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.5a1"
__version__ = "1.14.5a2"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.5a1",
"crewai==1.14.5a2",
"tiktoken>=0.8.0,<0.13",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",
@@ -107,7 +107,7 @@ stagehand = [
"stagehand>=0.4.1",
]
github = [
"gitpython>=3.1.41,<4",
"gitpython>=3.1.47,<4",
"PyGithub==1.59.1",
]
rag = [

View File

@@ -330,4 +330,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.5a1"
__version__ = "1.14.5a2"

View File

@@ -268,7 +268,9 @@ class CrewAIRagAdapter(Adapter):
file_chunker = file_data_type.get_chunker()
file_source = SourceContent(file_path)
file_result: LoaderResult = file_loader.load(file_source)
file_result: LoaderResult = file_loader.load(
file_source, **kwargs
)
file_chunks = file_chunker.chunk(file_result.content)
@@ -319,7 +321,7 @@ class CrewAIRagAdapter(Adapter):
loader = data_type.get_loader()
chunker = data_type.get_chunker()
loader_result: LoaderResult = loader.load(source_content)
loader_result: LoaderResult = loader.load(source_content, **kwargs)
chunks = chunker.chunk(loader_result.content)

View File

@@ -55,7 +55,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.5a1",
"crewai-tools==1.14.5a2",
]
embeddings = [
"tiktoken>=0.8.0,<0.13"

View File

@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.5a1"
__version__ = "1.14.5a2"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),

View File

@@ -386,8 +386,7 @@ def _execute_task_with_a2a(
return raw_result
finally:
task.description = original_description
if task.output_pydantic is not None:
task.output_pydantic = original_output_pydantic
task.output_pydantic = original_output_pydantic
task.response_model = original_response_model
@@ -1534,8 +1533,7 @@ async def _aexecute_task_with_a2a(
return raw_result
finally:
task.description = original_description
if task.output_pydantic is not None:
task.output_pydantic = original_output_pydantic
task.output_pydantic = original_output_pydantic
task.response_model = original_response_model

View File

@@ -1102,16 +1102,6 @@ class Agent(BaseAgent):
self.agent_executor.tools_handler = self.tools_handler
self.agent_executor.request_within_rpm_limit = rpm_limit_fn
if isinstance(self.agent_executor.llm, BaseLLM):
existing_stop = getattr(self.agent_executor.llm, "stop", [])
self.agent_executor.llm.stop = list(
set(
existing_stop + stop_words
if isinstance(existing_stop, list)
else stop_words
)
)
def get_delegation_tools(self, agents: Sequence[BaseAgent]) -> list[BaseTool]:
agent_tools = AgentTools(agents=agents)
return agent_tools.tools()

View File

@@ -49,6 +49,7 @@ from crewai.hooks.tool_hooks import (
)
from crewai.types.callback import SerializableCallable
from crewai.utilities.agent_utils import (
_llm_stop_words_applied,
aget_llm_response,
convert_tools_to_openai_schema,
enforce_rpm_limit,
@@ -141,15 +142,6 @@ class CrewAgentExecutor(BaseAgentExecutor):
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
if not self.after_llm_call_hooks:
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
if self.llm and not isinstance(self.llm, str):
existing_stop = getattr(self.llm, "stop", [])
self.llm.stop = list(
set(
existing_stop + self.stop
if isinstance(existing_stop, list)
else self.stop
)
)
@property
def use_stop_words(self) -> bool:
@@ -210,21 +202,22 @@ class CrewAgentExecutor(BaseAgentExecutor):
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))
try:
formatted_answer = self._invoke_loop()
except AssertionError:
if self.agent.verbose:
PRINTER.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
raise
except Exception as e:
handle_unknown_error(PRINTER, e, verbose=self.agent.verbose)
raise
with _llm_stop_words_applied(self.llm, self):
try:
formatted_answer = self._invoke_loop()
except AssertionError:
if self.agent.verbose:
PRINTER.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
raise
except Exception as e:
handle_unknown_error(PRINTER, e, verbose=self.agent.verbose)
raise
if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
self._save_to_memory(formatted_answer)
return {"output": formatted_answer.output}
@@ -1082,21 +1075,22 @@ class CrewAgentExecutor(BaseAgentExecutor):
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))
try:
formatted_answer = await self._ainvoke_loop()
except AssertionError:
if self.agent.verbose:
PRINTER.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
raise
except Exception as e:
handle_unknown_error(PRINTER, e, verbose=self.agent.verbose)
raise
with _llm_stop_words_applied(self.llm, self):
try:
formatted_answer = await self._ainvoke_loop()
except AssertionError:
if self.agent.verbose:
PRINTER.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
raise
except Exception as e:
handle_unknown_error(PRINTER, e, verbose=self.agent.verbose)
raise
if self.ask_for_human_input:
formatted_answer = await self._ahandle_human_feedback(formatted_answer)
if self.ask_for_human_input:
formatted_answer = await self._ahandle_human_feedback(formatted_answer)
self._save_to_memory(formatted_answer)
return {"output": formatted_answer.output}

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.5a1"
"crewai[tools]==1.14.5a2"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.5a1"
"crewai[tools]==1.14.5a2"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.5a1"
"crewai[tools]==1.14.5a2"
]
[tool.crewai]

View File

@@ -1283,8 +1283,8 @@ class Crew(FlowTrackable, BaseModel):
pending_tasks.append((task, async_task, task_index))
else:
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(
pending_tasks, was_replayed
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
pending_tasks.clear()
@@ -1299,7 +1299,9 @@ class Crew(FlowTrackable, BaseModel):
self._store_execution_log(task, task_output, task_index, was_replayed)
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
return self._create_crew_output(task_outputs)
@@ -1313,7 +1315,9 @@ class Crew(FlowTrackable, BaseModel):
) -> TaskOutput | None:
"""Handle conditional task evaluation using native async."""
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
pending_tasks.clear()
return check_conditional_skip(
@@ -1489,7 +1493,9 @@ class Crew(FlowTrackable, BaseModel):
futures.append((task, future, task_index))
else:
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(
self._process_async_tasks(futures, was_replayed)
)
futures.clear()
context = self._get_context(task, task_outputs)
@@ -1503,7 +1509,7 @@ class Crew(FlowTrackable, BaseModel):
self._store_execution_log(task, task_output, task_index, was_replayed)
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
return self._create_crew_output(task_outputs)
@@ -1516,7 +1522,7 @@ class Crew(FlowTrackable, BaseModel):
was_replayed: bool,
) -> TaskOutput | None:
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
futures.clear()
return check_conditional_skip(

View File

@@ -71,6 +71,7 @@ from crewai.hooks.types import (
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.agent_utils import (
_llm_stop_words_applied,
check_native_tool_support,
enforce_rpm_limit,
extract_tool_call_info,
@@ -215,12 +216,6 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
if self.llm:
existing_stop = getattr(self.llm, "stop", [])
if not isinstance(existing_stop, list):
existing_stop = []
self.llm.stop = list(set(existing_stop + self.stop_words))
self._state = AgentExecutorState()
self.max_method_calls = self.max_iter * 10
@@ -2601,17 +2596,18 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
inputs.get("ask_for_human_input", False)
)
self.kickoff()
with _llm_stop_words_applied(self.llm, self):
self.kickoff()
formatted_answer = self.state.current_answer
formatted_answer = self.state.current_answer
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
self._save_to_memory(formatted_answer)
@@ -2691,18 +2687,20 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
inputs.get("ask_for_human_input", False)
)
# Use async kickoff directly since we're already in an async context
await self.kickoff_async()
with _llm_stop_words_applied(self.llm, self):
await self.kickoff_async()
formatted_answer = self.state.current_answer
formatted_answer = self.state.current_answer
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if self.state.ask_for_human_input:
formatted_answer = await self._ahandle_human_feedback(formatted_answer)
if self.state.ask_for_human_input:
formatted_answer = await self._ahandle_human_feedback(
formatted_answer
)
self._save_to_memory(formatted_answer)

View File

@@ -60,6 +60,7 @@ from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from datetime import datetime
from functools import wraps
import logging
from typing import TYPE_CHECKING, Any, TypeVar
from pydantic import BaseModel, Field
@@ -75,6 +76,8 @@ if TYPE_CHECKING:
F = TypeVar("F", bound=Callable[..., Any])
logger = logging.getLogger(__name__)
def _serialize_llm_for_context(llm: Any) -> dict[str, Any] | str | None:
"""Serialize a BaseLLM object to a dict preserving full config.
@@ -188,6 +191,7 @@ class HumanFeedbackConfig:
provider: HumanFeedbackProvider | None = None
learn: bool = False
learn_source: str = "hitl"
learn_strict: bool = False
class HumanFeedbackMethod(FlowMethod[Any, Any]):
@@ -237,6 +241,7 @@ def human_feedback(
provider: HumanFeedbackProvider | None = None,
learn: bool = False,
learn_source: str = "hitl",
learn_strict: bool = False,
) -> Callable[[F], F]:
"""Decorator for Flow methods that require human feedback.
@@ -275,6 +280,20 @@ def human_feedback(
external systems like Slack, Teams, or webhooks. When the
provider raises HumanFeedbackPending, the flow pauses and
can be resumed later with Flow.resume().
learn: When True, enables HITL learning. After feedback is
collected, the LLM distills generalizable lessons and stores
them in memory. Before the next review, past lessons are
recalled and applied via an LLM pre-review step so the human
sees a progressively improved output.
learn_source: The memory ``source`` tag used when storing and
recalling HITL lessons. Defaults to ``"hitl"``.
learn_strict: When True (default False), pre-review failures are
re-raised instead of falling back to the raw output. By
default, failures are logged at WARNING level with full
traceback (``exc_info=True``) and the raw method output is
shown to the human. Set this to True if downstream callers
must be able to assume that pre-review actually executed
successfully.
Returns:
A decorator function that wraps the method with human feedback
@@ -373,16 +392,40 @@ def human_feedback(
def _pre_review_with_lessons(
flow_instance: Flow[Any], method_output: Any
) -> Any:
"""Recall past HITL lessons and use LLM to pre-review the output."""
try:
mem = flow_instance.memory
if mem is None:
return method_output
query = f"human feedback lessons for {func.__name__}: {method_output!s}"
matches = mem.recall(query, source=learn_source)
if not matches:
return method_output
"""Recall past HITL lessons and use LLM to pre-review the output.
Returns the original ``method_output`` when memory is unavailable
or no lessons match — these are not error cases.
When the recall or LLM pre-review call raises an exception (for
example LLM/network/auth failure or structured-output parse
error), the failure is logged at WARNING level with full
traceback (``exc_info=True``) so callers can detect the silent
fallback. When ``learn_strict=True`` was passed to the decorator,
the exception is re-raised instead of being swallowed.
"""
mem = flow_instance.memory
if mem is None:
return method_output
query = f"human feedback lessons for {func.__name__}: {method_output!s}"
try:
matches = mem.recall(query, source=learn_source)
except Exception:
logger.warning(
"HITL pre-review: memory recall failed for %s; falling "
"back to raw output.",
func.__name__,
exc_info=True,
)
if learn_strict:
raise
return method_output
if not matches:
return method_output
try:
lessons = "\n".join(f"- {m.record.content}" for m in matches)
llm_inst = _resolve_llm_instance()
prompt = _get_hitl_prompt("hitl_pre_review_user").format(
@@ -404,7 +447,14 @@ def human_feedback(
reviewed = llm_inst.call(messages)
return reviewed if isinstance(reviewed, str) else str(reviewed)
except Exception:
return method_output # fallback to raw output on any failure
logger.warning(
"HITL pre-review failed for %s; falling back to raw output.",
func.__name__,
exc_info=True,
)
if learn_strict:
raise
return method_output
def _distill_and_store_lessons(
flow_instance: Flow[Any], method_output: Any, raw_feedback: str
@@ -654,6 +704,7 @@ def human_feedback(
provider=provider,
learn=learn,
learn_source=learn_source,
learn_strict=learn_strict,
)
wrapper.__is_flow_method__ = True

View File

@@ -688,7 +688,9 @@ class LLM(BaseLLM):
"temperature": self.temperature,
"top_p": self.top_p,
"n": self.n,
"stop": (self.stop or None) if self.supports_stop_words() else None,
"stop": (self.stop_sequences or None)
if self.supports_stop_words()
else None,
"max_tokens": self.max_tokens or self.max_completion_tokens,
"presence_penalty": self.presence_penalty,
"frequency_penalty": self.frequency_penalty,

View File

@@ -72,6 +72,9 @@ _JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTAL
_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_call_id", default=None
)
_call_stop_override_var: contextvars.ContextVar[dict[int, list[str]] | None] = (
contextvars.ContextVar("_call_stop_override_var", default=None)
)
@contextmanager
@@ -85,6 +88,31 @@ def llm_call_context() -> Generator[str, None, None]:
_current_call_id.reset(token)
@contextmanager
def call_stop_override(
llm: BaseLLM, stop: list[str] | None
) -> Generator[None, None, None]:
"""Override the stop list for ``llm`` within the current call scope.
Only ``llm``'s reads via :attr:`BaseLLM.stop_sequences` see ``stop``;
other LLM instances (e.g. an agent's ``function_calling_llm``) keep their
own ``stop`` field. Passing ``None`` clears any prior override for ``llm``
in the same scope. The instance-level ``stop`` field is never mutated,
so the override is safe under concurrent execution.
"""
current = _call_stop_override_var.get()
new_overrides: dict[int, list[str]] = dict(current) if current else {}
if stop is None:
new_overrides.pop(id(llm), None)
else:
new_overrides[id(llm)] = stop
token = _call_stop_override_var.set(new_overrides)
try:
yield
finally:
_call_stop_override_var.reset(token)
def get_current_call_id() -> str:
"""Get current call_id from context"""
call_id = _current_call_id.get()
@@ -158,11 +186,18 @@ class BaseLLM(BaseModel, ABC):
@property
def stop_sequences(self) -> list[str]:
"""Alias for ``stop`` — kept for backward compatibility with provider APIs.
"""Stop list active for the current call.
Writes are handled by ``__setattr__``, which normalizes and redirects
``stop_sequences`` assignments to the ``stop`` field.
Returns the per-instance override set via :func:`call_stop_override`
when one is in effect for this LLM; otherwise the instance-level
``stop`` field. Kept under this name for backward compatibility with
provider APIs that already read ``stop_sequences``.
"""
overrides = _call_stop_override_var.get()
if overrides is not None:
override = overrides.get(id(self))
if override is not None:
return override
return self.stop
_token_usage: dict[str, int] = PrivateAttr(
@@ -341,7 +376,7 @@ class BaseLLM(BaseModel, ABC):
Returns:
True if stop words are configured and can be applied
"""
return bool(self.stop)
return bool(self.stop_sequences)
def _apply_stop_words(self, content: str) -> str:
"""Apply stop words to truncate response content.
@@ -363,14 +398,14 @@ class BaseLLM(BaseModel, ABC):
>>> llm._apply_stop_words(response)
"I need to search.\\n\\nAction: search"
"""
if not self.stop or not content:
stops = self.stop_sequences
if not stops or not content:
return content
# Find the earliest occurrence of any stop word
earliest_stop_pos = len(content)
found_stop_word = None
for stop_word in self.stop:
for stop_word in stops:
stop_pos = content.find(stop_word)
if stop_pos != -1 and stop_pos < earliest_stop_pos:
earliest_stop_pos = stop_pos
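
A small usage sketch of the override contract defined above (the LLM constructor mirrors the regression tests below; the top-level LLM import is an assumption):

from crewai import LLM                               # import path assumed
from crewai.llms.base_llm import call_stop_override

shared = LLM(model="gpt-4", stop=["Original:"])

with call_stop_override(shared, ["Original:", "Observation:"]):
    assert shared.stop_sequences == ["Original:", "Observation:"]  # per-call view
    assert shared.stop == ["Original:"]                            # field never mutated

assert shared.stop_sequences == ["Original:"]        # override cleared on exit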

View File

@@ -679,8 +679,9 @@ class AzureCompletion(BaseLLM):
params["presence_penalty"] = self.presence_penalty
if self.max_tokens is not None:
params["max_tokens"] = self.max_tokens
if self.stop and self.supports_stop_words():
params["stop"] = self.stop
stops = self.stop_sequences
if stops and self.supports_stop_words():
params["stop"] = stops
# Handle tools/functions for Azure OpenAI models
if tools and self.is_openai_model:

View File

@@ -1328,9 +1328,11 @@ class GeminiCompletion(BaseLLM):
usage = response.usage_metadata
cached_tokens = getattr(usage, "cached_content_token_count", 0) or 0
thinking_tokens = getattr(usage, "thoughts_token_count", 0) or 0
candidates_tokens = getattr(usage, "candidates_token_count", 0) or 0
result: dict[str, Any] = {
"prompt_token_count": getattr(usage, "prompt_token_count", 0),
"candidates_token_count": getattr(usage, "candidates_token_count", 0),
"candidates_token_count": candidates_tokens,
"completion_tokens": candidates_tokens + thinking_tokens,
"total_token_count": getattr(usage, "total_token_count", 0),
"total_tokens": getattr(usage, "total_token_count", 0),
"cached_prompt_tokens": cached_tokens,

View File

@@ -53,7 +53,11 @@ from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.converter import (
Converter,
async_convert_to_model,
convert_to_model,
)
from crewai.utilities.file_store import (
clear_task_files,
get_all_files,
@@ -681,7 +685,7 @@ class Task(BaseModel):
json_output = None
elif not self._guardrails and not self._guardrail:
raw = result
pydantic_output, json_output = self._export_output(result)
pydantic_output, json_output = await self._aexport_output(result)
else:
raw = result
pydantic_output, json_output = None, None
@@ -1110,7 +1114,7 @@ Follow these guidelines:
)
def _export_output(
self, result: str
self, result: str | BaseModel
) -> tuple[BaseModel | None, dict[str, Any] | None]:
pydantic_output: BaseModel | None = None
json_output: dict[str, Any] | None = None
@@ -1123,19 +1127,44 @@ Follow these guidelines:
self.agent,
self.converter_cls,
)
if isinstance(model_output, BaseModel):
pydantic_output = model_output
elif isinstance(model_output, dict):
json_output = model_output
elif isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
pydantic_output, json_output = self._unpack_model_output(model_output)
return pydantic_output, json_output
async def _aexport_output(
self, result: str | BaseModel
) -> tuple[BaseModel | None, dict[str, Any] | None]:
"""Async equivalent of ``_export_output`` — uses ``acall`` so the event loop is not blocked."""
pydantic_output: BaseModel | None = None
json_output: dict[str, Any] | None = None
if self.output_pydantic or self.output_json:
model_output = await async_convert_to_model(
result,
self.output_pydantic,
self.output_json,
self.agent,
self.converter_cls,
)
pydantic_output, json_output = self._unpack_model_output(model_output)
return pydantic_output, json_output
@staticmethod
def _unpack_model_output(
model_output: dict[str, Any] | BaseModel | str,
) -> tuple[BaseModel | None, dict[str, Any] | None]:
if isinstance(model_output, BaseModel):
return model_output, None
if isinstance(model_output, dict):
return None, model_output
if isinstance(model_output, str):
try:
return None, json.loads(model_output)
except json.JSONDecodeError:
return None, None
return None, None
def _get_output_format(self) -> OutputFormat:
if self.output_json:
return OutputFormat.JSON
@@ -1364,7 +1393,7 @@ Follow these guidelines:
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(
pydantic_output, json_output = await self._aexport_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
@@ -1421,7 +1450,7 @@ Follow these guidelines:
json_output = None
else:
raw = result
pydantic_output, json_output = self._export_output(result)
pydantic_output, json_output = await self._aexport_output(result)
task_output = TaskOutput(
name=self.name or self.description,

View File

@@ -1,8 +1,9 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
from collections.abc import Callable, Iterator, Sequence
import concurrent.futures
import contextlib
import contextvars
from dataclasses import dataclass, field
from datetime import datetime
@@ -22,7 +23,7 @@ from crewai.agents.parser import (
parse,
)
from crewai.cli.config import Settings
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, call_stop_override
from crewai.tools import BaseTool as CrewAITool
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
@@ -238,6 +239,38 @@ def extract_task_section(text: str) -> str:
return text
def _executor_stop_words(
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
) -> list[str]:
"""Return the executor's stop words, regardless of which field name it uses."""
if executor_context is None:
return []
stops = getattr(executor_context, "stop", None)
if stops is None:
stops = getattr(executor_context, "stop_words", None)
return list(stops) if stops else []
@contextlib.contextmanager
def _llm_stop_words_applied(
llm: LLM | BaseLLM,
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
) -> Iterator[None]:
"""Apply the executor's stop words to the LLM for the duration of one call.
Uses :func:`crewai.llms.base_llm.call_stop_override` so the LLM's stop
field is never mutated. Safe under concurrent execution: the override is
propagated via a :class:`contextvars.ContextVar` and is scoped to this
call's task / thread context.
"""
extra = _executor_stop_words(executor_context)
if not extra or not isinstance(llm, BaseLLM) or set(extra).issubset(llm.stop):
yield
return
with call_stop_override(llm, list(set(llm.stop + extra))):
yield
def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
"""Check if the maximum number of iterations has been reached.
@@ -459,18 +492,15 @@ def get_llm_response(
"""
messages = _prepare_llm_call(executor_context, messages, printer, verbose=verbose)
try:
answer = llm.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
raise e
answer = llm.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return _validate_and_finalize_llm_response(
answer, executor_context, printer, verbose=verbose
@@ -515,18 +545,15 @@ async def aget_llm_response(
"""
messages = _prepare_llm_call(executor_context, messages, printer, verbose=verbose)
try:
answer = await llm.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
raise e
answer = await llm.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return _validate_and_finalize_llm_response(
answer, executor_context, printer, verbose=verbose
@@ -1565,11 +1592,12 @@ def execute_single_native_tool_call(
color="green",
)
# Check result_as_answer
is_result_as_answer = bool(
original_tool
and hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer
and not error_event_emitted
and not hook_blocked
)
return NativeToolCallResult(

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
import json
import re
from typing import TYPE_CHECKING, Any, Final, TypedDict
@@ -41,6 +42,45 @@ class ConverterError(Exception):
class Converter(OutputConverter):
"""Class that converts text into either pydantic or json."""
def _build_messages(self) -> list[dict[str, str]]:
return [
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
def _coerce_response_to_pydantic(self, response: Any) -> BaseModel:
"""Validate an LLM response into the configured Pydantic model.
Pure post-processing — performs no I/O. Shared by ``to_pydantic`` and
``ato_pydantic`` so the validation/partial-JSON fallback logic stays in
a single place.
"""
if isinstance(response, BaseModel):
return response
try:
return self.model.model_validate_json(response)
except ValidationError:
partial = handle_partial_json(
result=response,
model=self.model,
is_json_output=False,
agent=None,
)
if isinstance(partial, BaseModel):
return partial
if isinstance(partial, dict):
return self.model.model_validate(partial)
if isinstance(partial, str):
try:
return self.model.model_validate_json(partial)
except Exception as parse_err:
raise ConverterError(
f"Failed to convert partial JSON result into Pydantic: {parse_err}"
) from parse_err
raise ConverterError(
"handle_partial_json returned an unexpected type."
) from None
def to_pydantic(self, current_attempt: int = 1) -> BaseModel:
"""Convert text to pydantic.
@@ -56,50 +96,12 @@ class Converter(OutputConverter):
try:
if self.llm.supports_function_calling():
response = self.llm.call(
messages=[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
],
messages=self._build_messages(),
response_model=self.model,
)
if isinstance(response, BaseModel):
result = response
else:
result = self.model.model_validate_json(response)
else:
response = self.llm.call(
[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
)
try:
# Try to directly validate the response JSON
result = self.model.model_validate_json(response)
except ValidationError:
# If direct validation fails, attempt to extract valid JSON
result = handle_partial_json( # type: ignore[assignment]
result=response,
model=self.model,
is_json_output=False,
agent=None,
)
# Ensure result is a BaseModel instance
if not isinstance(result, BaseModel):
if isinstance(result, dict):
result = self.model.model_validate(result)
elif isinstance(result, str):
try:
result = self.model.model_validate_json(result)
except Exception as parse_err:
raise ConverterError(
f"Failed to convert partial JSON result into Pydantic: {parse_err}"
) from parse_err
else:
raise ConverterError(
"handle_partial_json returned an unexpected type."
) from None
return result
response = self.llm.call(self._build_messages())
return self._coerce_response_to_pydantic(response)
except ValidationError as e:
if current_attempt < self.max_attempts:
return self.to_pydantic(current_attempt + 1)
@@ -113,6 +115,30 @@ class Converter(OutputConverter):
f"Failed to convert text into a Pydantic model due to error: {e}"
) from e
async def ato_pydantic(self, current_attempt: int = 1) -> BaseModel:
"""Async equivalent of ``to_pydantic`` — uses ``acall`` so the event loop is not blocked."""
try:
if self.llm.supports_function_calling():
response = await self.llm.acall(
messages=self._build_messages(),
response_model=self.model,
)
else:
response = await self.llm.acall(self._build_messages())
return self._coerce_response_to_pydantic(response)
except ValidationError as e:
if current_attempt < self.max_attempts:
return await self.ato_pydantic(current_attempt + 1)
raise ConverterError(
f"Failed to convert text into a Pydantic model due to validation error: {e}"
) from e
except Exception as e:
if current_attempt < self.max_attempts:
return await self.ato_pydantic(current_attempt + 1)
raise ConverterError(
f"Failed to convert text into a Pydantic model due to error: {e}"
) from e
def to_json(self, current_attempt: int = 1) -> str | ConverterError | Any: # type: ignore[override]
"""Convert text to json.
@@ -129,19 +155,28 @@ class Converter(OutputConverter):
try:
if self.llm.supports_function_calling():
return self._create_instructor().to_json()
return json.dumps(
self.llm.call(
[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
)
)
return json.dumps(self.llm.call(self._build_messages()))
except Exception as e:
if current_attempt < self.max_attempts:
return self.to_json(current_attempt + 1)
return ConverterError(f"Failed to convert text into JSON, error: {e}.")
async def ato_json(self, current_attempt: int = 1) -> str | ConverterError | Any:
"""Async equivalent of ``to_json``.
The function-calling path delegates to ``InternalInstructor`` (currently
sync-only); we run it via ``asyncio.to_thread`` so the event loop stays
free.
"""
try:
if self.llm.supports_function_calling():
return await asyncio.to_thread(self._create_instructor().to_json)
return json.dumps(await self.llm.acall(self._build_messages()))
except Exception as e:
if current_attempt < self.max_attempts:
return await self.ato_json(current_attempt + 1)
return ConverterError(f"Failed to convert text into JSON, error: {e}.")
def _create_instructor(self) -> InternalInstructor[Any]:
"""Create an instructor."""
@@ -153,16 +188,18 @@ class Converter(OutputConverter):
def convert_to_model(
result: str,
result: str | BaseModel,
output_pydantic: type[BaseModel] | None,
output_json: type[BaseModel] | None,
agent: Agent | BaseAgent | None = None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Convert a result string to a Pydantic model or JSON.
"""Convert a result to a Pydantic model or JSON.
Args:
result: The result string to convert.
result: The result to convert. Usually a JSON string, but a Pydantic
instance is also accepted when an upstream caller already produced
a structured object.
output_pydantic: The Pydantic model class to convert to.
output_json: The Pydantic model class to convert to JSON.
agent: The agent instance.
@@ -175,6 +212,11 @@ def convert_to_model(
if model is None:
return result
if isinstance(result, BaseModel):
if isinstance(result, model):
return result.model_dump() if output_json else result
result = result.model_dump_json()
if converter_cls:
return convert_with_instructions(
result=result,
@@ -347,6 +389,144 @@ def convert_with_instructions(
return exported_result
async def async_convert_to_model(
result: str | BaseModel,
output_pydantic: type[BaseModel] | None,
output_json: type[BaseModel] | None,
agent: Agent | BaseAgent | None = None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Async equivalent of ``convert_to_model`` — uses native ``acall``.
Mirrors the dispatch semantics of the sync version exactly; the only
difference is that LLM-bearing branches are awaited.
"""
model = output_pydantic or output_json
if model is None:
return result
if isinstance(result, BaseModel):
if isinstance(result, model):
return result.model_dump() if output_json else result
result = result.model_dump_json()
if converter_cls:
return await async_convert_with_instructions(
result=result,
model=model,
is_json_output=bool(output_json),
agent=agent,
converter_cls=converter_cls,
)
try:
escaped_result = json.dumps(json.loads(result, strict=False))
return validate_model(
result=escaped_result, model=model, is_json_output=bool(output_json)
)
except (json.JSONDecodeError, ValidationError):
return await async_handle_partial_json(
result=result,
model=model,
is_json_output=bool(output_json),
agent=agent,
converter_cls=converter_cls,
)
except Exception as e:
if agent and getattr(agent, "verbose", True):
PRINTER.print(
content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.",
color="red",
)
return result
async def async_handle_partial_json(
result: str,
model: type[BaseModel],
is_json_output: bool,
agent: Agent | BaseAgent | None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Async equivalent of ``handle_partial_json`` — defers LLM fallback to ``acall``."""
match = _JSON_PATTERN.search(result)
if match:
try:
parsed = json.loads(match.group(), strict=False)
except json.JSONDecodeError:
return await async_convert_with_instructions(
result=result,
model=model,
is_json_output=is_json_output,
agent=agent,
converter_cls=converter_cls,
)
try:
exported_result = model.model_validate(parsed)
if is_json_output:
return exported_result.model_dump()
return exported_result
except ValidationError:
raise
except Exception as e:
if agent and getattr(agent, "verbose", True):
PRINTER.print(
content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.",
color="red",
)
return await async_convert_with_instructions(
result=result,
model=model,
is_json_output=is_json_output,
agent=agent,
converter_cls=converter_cls,
)
async def async_convert_with_instructions(
result: str,
model: type[BaseModel],
is_json_output: bool,
agent: Agent | BaseAgent | None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Async equivalent of ``convert_with_instructions`` — calls ``ato_pydantic``/``ato_json``."""
if agent is None:
raise TypeError("Agent must be provided if converter_cls is not specified.")
llm = getattr(agent, "function_calling_llm", None) or agent.llm
if llm is None:
raise ValueError("Agent must have a valid LLM instance for conversion")
instructions = get_conversion_instructions(model=model, llm=llm)
converter = create_converter(
agent=agent,
converter_cls=converter_cls,
llm=llm,
text=result,
model=model,
instructions=instructions,
)
exported_result = (
await converter.ato_pydantic()
if not is_json_output
else await converter.ato_json()
)
if isinstance(exported_result, ConverterError):
if agent and getattr(agent, "verbose", True):
PRINTER.print(
content=f"Failed to convert result to model: {exported_result}",
color="red",
)
return result
return exported_result
def get_conversion_instructions(
model: type[BaseModel], llm: BaseLLM | LLM | str | Any
) -> str:

View File

@@ -2452,3 +2452,167 @@ def test_agent_mcps_accepts_legacy_prefix_with_tool():
mcps=["crewai-amp:notion#get_page"],
)
assert agent.mcps == ["crewai-amp:notion#get_page"]
class TestSharedLLMStopWords:
"""Regression tests for shared LLM stop words mutation (issue #5141).
Stop words from one executor must not leak into the shared LLM permanently
or pollute other agents sharing that LLM.
"""
@staticmethod
def _make_executor(llm: LLM, stop_words: list[str]) -> CrewAgentExecutor:
"""Build a CrewAgentExecutor with minimal deps."""
from crewai.agents.tools_handler import ToolsHandler
agent = Agent(role="r", goal="g", backstory="b")
task = Task(description="d", expected_output="o", agent=agent)
return CrewAgentExecutor(
agent=agent,
task=task,
llm=llm,
crew=None,
prompt={"prompt": "p {input} {tool_names} {tools}"},
max_iter=5,
tools=[],
tools_names="",
stop_words=stop_words,
tools_description="",
tools_handler=ToolsHandler(),
)
def test_executor_init_does_not_mutate_shared_llm(self) -> None:
"""Constructing executors must not touch the shared LLM's stop list."""
shared = LLM(model="gpt-4", stop=["Original:"])
original = list(shared.stop)
a = self._make_executor(shared, stop_words=["StopA:"])
b = self._make_executor(shared, stop_words=["StopB:"])
assert shared.stop == original
assert a.llm is shared
assert b.llm is shared
def test_effective_stop_reflects_override_inside_context(self) -> None:
"""Inside the helper, the effective stop list includes the executor's words."""
from crewai.utilities.agent_utils import _llm_stop_words_applied
shared = LLM(model="gpt-4", stop=["Original:"])
executor = self._make_executor(shared, stop_words=["Observation:"])
with _llm_stop_words_applied(shared, executor):
assert set(shared.stop_sequences) == {"Original:", "Observation:"}
assert shared.stop == ["Original:"]
assert shared.stop == ["Original:"]
assert shared.stop_sequences == ["Original:"]
def test_override_cleared_when_context_raises(self) -> None:
"""A failed call must still clear the per-call stop override."""
from crewai.utilities.agent_utils import _llm_stop_words_applied
shared = LLM(model="gpt-4", stop=["Original:"])
executor = self._make_executor(shared, stop_words=["Observation:"])
try:
with _llm_stop_words_applied(shared, executor):
raise RuntimeError("boom")
except RuntimeError:
pass
assert shared.stop == ["Original:"]
assert shared.stop_sequences == ["Original:"]
def test_override_applies_for_post_processing_when_api_lacks_stop_support(
self,
) -> None:
"""Models that lack API-level stop support still need the override.
Native providers (e.g. Azure on gpt-5/o-series) read ``stop_sequences``
in ``_apply_stop_words`` to truncate the response post-hoc even when
``supports_stop_words()`` returns False, so the override must be set
regardless of API-level support. (Issue raised by Cursor Bugbot.)
"""
from unittest.mock import patch
from crewai.utilities.agent_utils import _llm_stop_words_applied
shared = LLM(model="gpt-4", stop=["Original:"])
executor = self._make_executor(shared, stop_words=["Observation:"])
with patch.object(shared, "supports_stop_words", return_value=False):
with _llm_stop_words_applied(shared, executor):
assert set(shared.stop_sequences) == {"Original:", "Observation:"}
assert shared.stop == ["Original:"]
assert shared.stop_sequences == ["Original:"]
def test_concurrent_overrides_do_not_collide(self) -> None:
"""Concurrent agents on a shared LLM must each see their own effective stop."""
import asyncio
from crewai.utilities.agent_utils import _llm_stop_words_applied
shared = LLM(model="gpt-4", stop=["Original:"])
exec_a = self._make_executor(shared, stop_words=["StopA:"])
exec_b = self._make_executor(shared, stop_words=["StopB:"])
async def run(executor: CrewAgentExecutor, expected: str) -> set[str]:
with _llm_stop_words_applied(shared, executor):
await asyncio.sleep(0)
seen = set(shared.stop_sequences)
assert expected in seen
return seen
async def main() -> tuple[set[str], set[str]]:
return await asyncio.gather(
run(exec_a, "StopA:"), run(exec_b, "StopB:")
)
a_seen, b_seen = asyncio.run(main())
assert a_seen == {"Original:", "StopA:"}
assert b_seen == {"Original:", "StopB:"}
assert shared.stop == ["Original:"]
assert shared.stop_sequences == ["Original:"]
def test_override_does_not_leak_to_other_llm_instances(self) -> None:
"""Override for one LLM must not affect another LLM (e.g. function_calling_llm).
Regression for Cursor Bugbot: a global ContextVar would leak the
override to every BaseLLM that reads stop_sequences during the scope.
"""
from crewai.utilities.agent_utils import _llm_stop_words_applied
target = LLM(model="gpt-4", stop=["TargetStop:"])
other = LLM(model="gpt-4", stop=["OtherStop:"])
executor = self._make_executor(target, stop_words=["Observation:"])
with _llm_stop_words_applied(target, executor):
assert set(target.stop_sequences) == {"TargetStop:", "Observation:"}
assert other.stop_sequences == ["OtherStop:"]
assert target.stop_sequences == ["TargetStop:"]
assert other.stop_sequences == ["OtherStop:"]
def test_override_propagates_to_nested_direct_llm_calls(self) -> None:
"""Once invoke wraps with the override, nested direct llm.call sites
(StepExecutor, handle_max_iterations_exceeded) see the merged stops.
Regression for Cursor Bugbot: those direct call sites bypass
get_llm_response, so the override must be set at executor entry, not
only around get_llm_response.
"""
from crewai.utilities.agent_utils import _llm_stop_words_applied
shared = LLM(model="gpt-4", stop=["Original:"])
executor = self._make_executor(shared, stop_words=["Observation:"])
seen: list[set[str]] = []
def nested_direct_call() -> None:
seen.append(set(shared.stop_sequences))
with _llm_stop_words_applied(shared, executor):
nested_direct_call()
assert seen == [{"Original:", "Observation:"}]
assert shared.stop == ["Original:"]

View File
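The `_llm_stop_words_applied` helper exercised by these tests is not shown in this diff. The sketch below is one way to get the behavior the tests pin down (the configured `stop` list is never mutated, a merged `stop_sequences` view exists only inside the context and only for the targeted instance, the override is cleared even when the wrapped call raises, and concurrent asyncio tasks stay isolated), using a per-instance ContextVar override. `ToyLLM` and `llm_stop_words_applied` are illustrative and are not crewai's implementation:

from __future__ import annotations

import contextlib
from contextvars import ContextVar
from typing import Iterator

# Per-call overrides keyed by the id of the specific LLM instance, so one
# LLM's override never leaks to another instance. ContextVar keeps concurrent
# asyncio tasks isolated from each other.
_stop_overrides: ContextVar[dict[int, tuple[str, ...]]] = ContextVar(
    "_stop_overrides", default={}
)


class ToyLLM:
    """Illustrative stand-in for a shared LLM (not crewai.llm.LLM)."""

    def __init__(self, stop: list[str]) -> None:
        self.stop = list(stop)  # configured stop words; never mutated

    @property
    def stop_sequences(self) -> list[str]:
        override = _stop_overrides.get().get(id(self), ())
        # Configured stops plus this call's executor stops, de-duplicated in order.
        return list(dict.fromkeys([*self.stop, *override]))


@contextlib.contextmanager
def llm_stop_words_applied(llm: ToyLLM, extra: list[str]) -> Iterator[None]:
    overrides = dict(_stop_overrides.get())
    overrides[id(llm)] = tuple(extra)
    token = _stop_overrides.set(overrides)
    try:
        yield
    finally:
        _stop_overrides.reset(token)  # restored even if the wrapped call raises

With shared = ToyLLM(stop=["Original:"]), entering llm_stop_words_applied(shared, ["Observation:"]) makes shared.stop_sequences read ["Original:", "Observation:"] while shared.stop and any other ToyLLM instance stay untouched, matching the assertions above.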

@@ -596,6 +596,35 @@ def test_gemini_token_usage_tracking():
assert usage.total_tokens > 0
def test_gemini_thoughts_tokens_counted_in_completion_and_total():
"""Gemini's thoughts_token_count must be folded into completion_tokens so the
tracked total matches the API's total_token_count for thinking models."""
from crewai.llms.providers.gemini.completion import GeminiCompletion
llm = GeminiCompletion(model="gemini-2.0-flash-001")
response = MagicMock()
response.usage_metadata = MagicMock(
prompt_token_count=100,
candidates_token_count=50,
thoughts_token_count=25,
total_token_count=175,
cached_content_token_count=0,
)
usage = llm._extract_token_usage(response)
assert usage["candidates_token_count"] == 50
assert usage["completion_tokens"] == 75
assert usage["reasoning_tokens"] == 25
llm._track_token_usage_internal(usage)
summary = llm.get_token_usage_summary()
assert summary.prompt_tokens == 100
assert summary.completion_tokens == 75
assert summary.total_tokens == 175
assert summary.reasoning_tokens == 25
@pytest.mark.vcr()
def test_gemini_tool_returning_float():
"""

View File
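The arithmetic the thinking-token test asserts is simple: thoughts tokens are folded into completion tokens, and the tracked total is prompt plus that folded completion count. A standalone sketch of just that arithmetic follows; the `UsageMetadata` dataclass mirrors the mocked fields, and this is not `GeminiCompletion._extract_token_usage`, which also records candidate and cached counts:

from dataclasses import dataclass


@dataclass
class UsageMetadata:
    # Field names follow the mocked usage_metadata in the test above.
    prompt_token_count: int
    candidates_token_count: int
    thoughts_token_count: int
    total_token_count: int


def fold_thinking_tokens(usage: UsageMetadata) -> dict[str, int]:
    # Thinking tokens are billed like completion tokens, so fold them in.
    completion = usage.candidates_token_count + usage.thoughts_token_count
    return {
        "prompt_tokens": usage.prompt_token_count,
        "completion_tokens": completion,
        "reasoning_tokens": usage.thoughts_token_count,
        "total_tokens": usage.prompt_token_count + completion,
    }


usage = fold_thinking_tokens(UsageMetadata(100, 50, 25, 175))
assert usage["completion_tokens"] == 75 and usage["total_tokens"] == 175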

@@ -1,12 +1,14 @@
"""Tests for async task execution."""
import pytest
from pydantic import BaseModel
from unittest.mock import AsyncMock, MagicMock, patch
from crewai.agent import Agent
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.tasks.output_format import OutputFormat
from crewai.utilities.converter import Converter
@pytest.fixture
@@ -383,4 +385,73 @@ class TestAsyncTaskOutput:
assert result.description == "Test description"
assert result.expected_output == "Test expected"
assert result.raw == "Test result"
assert result.agent == "Test Agent"
assert result.agent == "Test Agent"
class _AsyncOnlyOutput(BaseModel):
value: str
class TestAsyncOutputConversion:
"""Regression tests for native-async output conversion (issue #5230).
Ensures `_aexport_output` reaches the LLM via `acall` and never via the
blocking `call` method.
"""
@pytest.mark.asyncio
async def test_aexport_output_uses_acall_not_call(self) -> None:
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = False
mock_llm.acall = AsyncMock(return_value='{"value": "ok"}')
mock_llm.call = MagicMock(
side_effect=AssertionError("call() must NOT be invoked from async path")
)
converter = Converter(
llm=mock_llm,
model=_AsyncOnlyOutput,
text="raw",
instructions="convert",
max_attempts=1,
)
result = await converter.ato_pydantic()
assert isinstance(result, _AsyncOnlyOutput)
assert result.value == "ok"
mock_llm.acall.assert_awaited_once()
mock_llm.call.assert_not_called()
@pytest.mark.asyncio
async def test_ato_json_function_calling_does_not_block_event_loop(self) -> None:
"""The function-calling JSON path must run via asyncio.to_thread.
``InternalInstructor`` is sync-only; `ato_json` should offload it so the
event loop is not blocked.
"""
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = True
converter = Converter(
llm=mock_llm,
model=_AsyncOnlyOutput,
text="raw",
instructions="convert",
max_attempts=1,
)
sentinel = '{"value": "ok"}'
with patch.object(
converter, "_create_instructor"
) as mock_create, patch(
"crewai.utilities.converter.asyncio.to_thread", new_callable=AsyncMock
) as mock_to_thread:
instructor = MagicMock()
instructor.to_json = MagicMock(return_value=sentinel)
mock_create.return_value = instructor
mock_to_thread.return_value = sentinel
result = await converter.ato_json()
assert result == sentinel
mock_to_thread.assert_awaited_once_with(instructor.to_json)

View File
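The offloading pattern the second converter test checks is plain `asyncio.to_thread`: the sync-only instructor call runs in a worker thread so the event loop keeps servicing other tasks. A minimal sketch, with `SyncOnlyInstructor` as an illustrative stand-in for `InternalInstructor`:

import asyncio


class SyncOnlyInstructor:
    """Illustrative stand-in for a sync-only structured-output helper."""

    def to_json(self) -> str:
        # Imagine a blocking LLM or network call here.
        return '{"value": "ok"}'


async def ato_json_sketch(instructor: SyncOnlyInstructor) -> str:
    # Offload the blocking call so the event loop is not blocked.
    return await asyncio.to_thread(instructor.to_json)


if __name__ == "__main__":
    print(asyncio.run(ato_json_sketch(SyncOnlyInstructor())))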

@@ -1254,6 +1254,119 @@ async def test_async_task_execution_call_count(researcher, writer):
assert mock_execute_sync.call_count == 1
def test_mixed_sync_async_task_outputs_not_dropped(researcher, writer):
"""Sync outputs accumulated before a pending async batch must survive the flush."""
sync1_output = TaskOutput(description="sync1", raw="s1", agent="researcher")
async1_output = TaskOutput(description="async1", raw="a1", agent="researcher")
sync2_output = TaskOutput(description="sync2", raw="s2", agent="writer")
sync1 = Task(description="sync1", expected_output="x", agent=researcher)
async1 = Task(
description="async1",
expected_output="x",
agent=researcher,
async_execution=True,
)
sync2 = Task(description="sync2", expected_output="x", agent=writer)
sync1.output = sync1_output
async1.output = async1_output
sync2.output = sync2_output
crew = Crew(agents=[researcher, writer], tasks=[sync1, async1, sync2])
mock_future = MagicMock(spec=Future)
mock_future.result.return_value = async1_output
with (
patch.object(
Task, "execute_sync", side_effect=[sync1_output, sync2_output]
),
patch.object(Task, "execute_async", return_value=mock_future),
):
result = crew.kickoff()
assert [o.raw for o in result.tasks_output] == ["s1", "a1", "s2"]
@pytest.mark.asyncio
async def test_mixed_sync_async_task_outputs_not_dropped_native_async(
researcher, writer
):
"""Same regression as the sync path, exercised via akickoff (native async)."""
sync1_output = TaskOutput(description="sync1", raw="s1", agent="researcher")
async1_output = TaskOutput(description="async1", raw="a1", agent="researcher")
sync2_output = TaskOutput(description="sync2", raw="s2", agent="writer")
sync1 = Task(description="sync1", expected_output="x", agent=researcher)
async1 = Task(
description="async1",
expected_output="x",
agent=researcher,
async_execution=True,
)
sync2 = Task(description="sync2", expected_output="x", agent=writer)
sync1.output = sync1_output
async1.output = async1_output
sync2.output = sync2_output
crew = Crew(agents=[researcher, writer], tasks=[sync1, async1, sync2])
aexecute_outputs = iter([sync1_output, async1_output, sync2_output])
async def fake_aexecute_sync(*_args: Any, **_kwargs: Any) -> TaskOutput:
return next(aexecute_outputs)
with patch.object(Task, "aexecute_sync", side_effect=fake_aexecute_sync):
result = await crew.akickoff()
assert [o.raw for o in result.tasks_output] == ["s1", "a1", "s2"]
def test_pending_async_outputs_preserved_through_conditional_task(researcher, writer):
"""A conditional task encountered after a pending async batch must not silently drop the async output."""
sync1_output = TaskOutput(description="sync1", raw="s1", agent="researcher")
async1_output = TaskOutput(description="async1", raw="a1", agent="researcher")
def always_skip(_: TaskOutput) -> bool:
return False
sync1 = Task(description="sync1", expected_output="x", agent=researcher)
async1 = Task(
description="async1",
expected_output="x",
agent=researcher,
async_execution=True,
)
conditional = ConditionalTask(
description="conditional",
expected_output="x",
agent=writer,
condition=always_skip,
)
sync1.output = sync1_output
async1.output = async1_output
crew = Crew(
agents=[researcher, writer], tasks=[sync1, async1, conditional]
)
mock_future = MagicMock(spec=Future)
mock_future.result.return_value = async1_output
with (
patch.object(Task, "execute_sync", return_value=sync1_output),
patch.object(Task, "execute_async", return_value=mock_future),
):
result = crew.kickoff()
raws = [o.raw for o in result.tasks_output]
assert raws[:2] == ["s1", "a1"]
assert len(result.tasks_output) == 3
@pytest.mark.vcr()
def test_kickoff_for_each_single_input():
"""Tests if kickoff_for_each works with a single input."""

View File
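The ordering these crew tests pin down (sync outputs accumulated before a pending async batch survive the flush, and pending async results are resolved before any later sync or conditional output is appended) can be shown with a toy scheduler. The `collect_outputs` helper below is illustrative only and is not how `Crew.kickoff` is implemented:

from concurrent.futures import Future
from typing import Any


def collect_outputs(tasks: list[dict[str, Any]]) -> list[str]:
    """Append outputs in declared order, flushing pending async results first."""
    outputs: list[str] = []
    pending: list[Future] = []

    def flush() -> None:
        nonlocal pending
        outputs.extend(f.result() for f in pending)
        pending = []

    for task in tasks:
        if task["async"]:
            pending.append(task["run"]())  # run() returns a Future for async tasks
        else:
            flush()  # resolve earlier async outputs before appending this one
            outputs.append(task["run"]())
    flush()  # drain any async batch left at the end
    return outputs


fut: Future = Future()
fut.set_result("a1")
tasks = [
    {"name": "sync1", "async": False, "run": lambda: "s1"},
    {"name": "async1", "async": True, "run": lambda: fut},
    {"name": "sync2", "async": False, "run": lambda: "s2"},
]
assert collect_outputs(tasks) == ["s1", "a1", "s2"]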

@@ -7,6 +7,7 @@ async support, and attribute preservation functionality.
from __future__ import annotations
import asyncio
import logging
from datetime import datetime
from typing import Any
from unittest.mock import MagicMock, patch
@@ -597,6 +598,277 @@ class TestHumanFeedbackLearn:
assert config.llm == "gpt-4o-mini"
class TestHumanFeedbackPreReviewFailure:
"""Tests for HITL pre-review failure handling (issue #5725).
Pre-review must NOT silently swallow exceptions: failures should be
logged at WARNING level with full traceback, and an opt-in
``learn_strict=True`` should propagate the exception instead of
falling back to raw output.
"""
@staticmethod
def _seeded_memory() -> MagicMock:
"""Return a mock memory object with one stored lesson so that
pre-review proceeds past the ``not matches`` short-circuit."""
from crewai.memory.types import MemoryMatch, MemoryRecord
mem = MagicMock()
mem.recall.return_value = [
MemoryMatch(
record=MemoryRecord(
content="Always include source citations",
embedding=[],
),
score=0.9,
match_reasons=["semantic"],
)
]
return mem
def test_pre_review_llm_failure_logs_warning_and_falls_back(self, caplog):
"""When the pre-review LLM call raises, the failure is logged at
WARNING with exc_info, and the raw method_output is shown to the
human (default fail-open behavior, but now observable)."""
class LearnFlow(Flow):
@start()
@human_feedback(message="Review:", llm="gpt-4o-mini", learn=True)
def produce(self):
return "raw draft"
flow = LearnFlow()
flow.memory = self._seeded_memory()
captured: dict[str, Any] = {}
def capture_feedback(message, output, metadata=None, emit=None):
captured["shown_to_human"] = output
return "approved"
with (
patch.object(flow, "_request_human_feedback", side_effect=capture_feedback),
patch("crewai.llm.LLM") as MockLLM,
caplog.at_level(logging.WARNING, logger="crewai.flow.human_feedback"),
):
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = True
mock_llm.call.side_effect = RuntimeError(
"simulated pre-review failure"
)
MockLLM.return_value = mock_llm
flow.produce()
# The human still sees the raw output (fail-open by default).
assert captured["shown_to_human"] == "raw draft"
# ...but the failure is now observable via a WARNING log with traceback.
warning_records = [
r
for r in caplog.records
if r.name == "crewai.flow.human_feedback"
and r.levelno == logging.WARNING
and "HITL pre-review failed" in r.getMessage()
]
assert len(warning_records) == 1, (
"Expected exactly one HITL pre-review failure warning, got: "
f"{[r.getMessage() for r in caplog.records]}"
)
# exc_info=True must be set so the traceback is captured.
assert warning_records[0].exc_info is not None
# The exception type/message should be visible in the captured exc_info.
exc_type, exc_value, _ = warning_records[0].exc_info
assert exc_type is RuntimeError
assert "simulated pre-review failure" in str(exc_value)
def test_pre_review_recall_failure_logs_warning_and_falls_back(self, caplog):
"""A failure in ``memory.recall`` is also logged and falls back to
the raw output instead of being silently swallowed."""
class LearnFlow(Flow):
@start()
@human_feedback(message="Review:", llm="gpt-4o-mini", learn=True)
def produce(self):
return "raw draft"
flow = LearnFlow()
flow.memory = MagicMock()
flow.memory.recall.side_effect = RuntimeError("recall blew up")
captured: dict[str, Any] = {}
def capture_feedback(message, output, metadata=None, emit=None):
captured["shown_to_human"] = output
return ""
with (
patch.object(flow, "_request_human_feedback", side_effect=capture_feedback),
caplog.at_level(logging.WARNING, logger="crewai.flow.human_feedback"),
):
flow.produce()
assert captured["shown_to_human"] == "raw draft"
warning_records = [
r
for r in caplog.records
if r.name == "crewai.flow.human_feedback"
and r.levelno == logging.WARNING
and "memory recall failed" in r.getMessage()
]
assert len(warning_records) == 1
assert warning_records[0].exc_info is not None
def test_pre_review_failure_with_learn_strict_propagates(self):
"""When ``learn_strict=True``, a pre-review LLM failure is re-raised
instead of silently falling back to raw output."""
class StrictFlow(Flow):
@start()
@human_feedback(
message="Review:",
llm="gpt-4o-mini",
learn=True,
learn_strict=True,
)
def produce(self):
return "raw draft"
flow = StrictFlow()
flow.memory = self._seeded_memory()
with (
patch.object(flow, "_request_human_feedback", return_value="approved"),
patch("crewai.llm.LLM") as MockLLM,
):
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = True
mock_llm.call.side_effect = RuntimeError(
"simulated pre-review failure"
)
MockLLM.return_value = mock_llm
with pytest.raises(RuntimeError, match="simulated pre-review failure"):
flow.produce()
def test_pre_review_recall_failure_with_learn_strict_propagates(self):
"""When ``learn_strict=True``, a ``memory.recall`` failure is re-raised
instead of silently falling back to raw output."""
class StrictFlow(Flow):
@start()
@human_feedback(
message="Review:",
llm="gpt-4o-mini",
learn=True,
learn_strict=True,
)
def produce(self):
return "raw draft"
flow = StrictFlow()
flow.memory = MagicMock()
flow.memory.recall.side_effect = RuntimeError("recall blew up")
with patch.object(flow, "_request_human_feedback", return_value="approved"):
with pytest.raises(RuntimeError, match="recall blew up"):
flow.produce()
@pytest.mark.asyncio
async def test_async_pre_review_failure_logs_warning_and_falls_back(self, caplog):
"""The async wrapper exhibits the same logging + fail-open behavior
as the sync wrapper on pre-review LLM failure."""
class AsyncLearnFlow(Flow):
@start()
@human_feedback(message="Review:", llm="gpt-4o-mini", learn=True)
async def produce(self):
return "raw draft"
flow = AsyncLearnFlow()
flow.memory = self._seeded_memory()
captured: dict[str, Any] = {}
def capture_feedback(message, output, metadata=None, emit=None):
captured["shown_to_human"] = output
return "approved"
with (
patch.object(flow, "_request_human_feedback", side_effect=capture_feedback),
patch("crewai.llm.LLM") as MockLLM,
caplog.at_level(logging.WARNING, logger="crewai.flow.human_feedback"),
):
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = True
mock_llm.call.side_effect = RuntimeError(
"simulated pre-review failure"
)
MockLLM.return_value = mock_llm
await flow.produce()
assert captured["shown_to_human"] == "raw draft"
warning_records = [
r
for r in caplog.records
if r.name == "crewai.flow.human_feedback"
and r.levelno == logging.WARNING
and "HITL pre-review failed" in r.getMessage()
]
assert len(warning_records) == 1
assert warning_records[0].exc_info is not None
@pytest.mark.asyncio
async def test_async_pre_review_failure_with_learn_strict_propagates(self):
"""The async wrapper also re-raises when ``learn_strict=True``."""
class AsyncStrictFlow(Flow):
@start()
@human_feedback(
message="Review:",
llm="gpt-4o-mini",
learn=True,
learn_strict=True,
)
async def produce(self):
return "raw draft"
flow = AsyncStrictFlow()
flow.memory = self._seeded_memory()
with (
patch.object(flow, "_request_human_feedback", return_value="approved"),
patch("crewai.llm.LLM") as MockLLM,
):
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = True
mock_llm.call.side_effect = RuntimeError(
"simulated pre-review failure"
)
MockLLM.return_value = mock_llm
with pytest.raises(RuntimeError, match="simulated pre-review failure"):
await flow.produce()
def test_learn_strict_default_is_false_and_propagates_to_config(self):
"""``learn_strict`` defaults to False and is exposed on the
``HumanFeedbackConfig`` for introspection."""
@human_feedback(message="Review:", learn=True)
def default_method(self):
return "output"
@human_feedback(message="Review:", learn=True, learn_strict=True)
def strict_method(self):
return "output"
assert default_method.__human_feedback_config__.learn_strict is False
assert strict_method.__human_feedback_config__.learn_strict is True
class TestHumanFeedbackFinalOutputPreservation:
"""Tests for preserving method return value as flow's final output when @human_feedback with emit is terminal.

View File
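The behavior these HITL tests describe (fail-open by default with a WARNING that carries the traceback, fail-closed when `learn_strict=True`) boils down to two guarded steps: memory recall and the LLM pre-review call. The schematic sketch below captures that shape; the function name, signature, and exact log messages are assumptions for illustration, not the decorator's internals:

import logging

logger = logging.getLogger("crewai.flow.human_feedback")


def pre_review_sketch(method_output, memory, review_llm_call, learn_strict: bool = False):
    """Fail-open pre-review with an opt-in fail-closed mode (illustrative only)."""
    try:
        matches = memory.recall(method_output)
    except Exception:
        if learn_strict:
            raise  # fail-closed: surface the recall failure to the caller
        logger.warning("HITL pre-review memory recall failed", exc_info=True)
        return method_output
    if not matches:
        return method_output  # nothing learned yet; not an error case
    try:
        return review_llm_call(method_output, matches)
    except Exception:
        if learn_strict:
            raise  # fail-closed: surface the LLM failure to the caller
        logger.warning("HITL pre-review failed", exc_info=True)
        return method_output  # fail-open, but observable via the WARNING above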

@@ -690,6 +690,27 @@ def test_multiple_guardrails_with_pydantic_output():
assert parsed["processed"] is True
def test_export_output_accepts_pydantic_input():
"""Regression test for #5458: _export_output must not crash with TypeError
when called with a Pydantic instance (e.g. when an upstream caller passes
an already-converted model from a context task)."""
from pydantic import BaseModel
class StructuredResult(BaseModel):
value: str
task = create_smart_task(
description="Test pydantic export",
expected_output="Structured output",
output_pydantic=StructuredResult,
)
instance = StructuredResult(value="ok")
pydantic_output, json_output = task._export_output(instance)
assert pydantic_output is instance
assert json_output is None
def test_guardrails_vs_single_guardrail_mutual_exclusion():
"""Test that guardrails list nullifies single guardrail."""

View File
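The regression above is about short-circuiting when the caller already hands over a converted model: export should reuse the instance instead of treating it as a string. A hedged sketch of that fast path (not the actual `Task._export_output` body):

from pydantic import BaseModel


class StructuredResult(BaseModel):
    value: str


def export_output_sketch(result, output_pydantic=None):
    """Return (pydantic_output, json_output); illustrative, not Task._export_output."""
    if output_pydantic and isinstance(result, output_pydantic):
        # Already the expected model: reuse it instead of re-parsing a string.
        return result, None
    # String results would go through the normal conversion path here.
    return None, None


pydantic_output, json_output = export_output_sketch(
    StructuredResult(value="ok"), output_pydantic=StructuredResult
)
assert pydantic_output.value == "ok" and json_output is None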

@@ -17,6 +17,8 @@ from crewai.utilities.agent_utils import (
_format_messages_for_summary,
_split_messages_into_chunks,
convert_tools_to_openai_schema,
execute_single_native_tool_call,
NativeToolCallResult,
parse_tool_call_args,
summarize_messages,
)
@@ -1033,3 +1035,91 @@ class TestParseToolCallArgs:
_, error = parse_tool_call_args("{bad json}", "tool", "call_7")
assert error is not None
assert set(error.keys()) == {"call_id", "func_name", "result", "from_cache", "original_tool"}
class TestExecuteSingleNativeToolCall:
"""Tests for execute_single_native_tool_call."""
def test_result_as_answer_false_on_tool_error(self) -> None:
"""When a tool with result_as_answer=True raises, result_as_answer must be False.
Regression test for https://github.com/crewAIInc/crewAI/issues/5156
"""
from unittest.mock import MagicMock
class FailingTool(BaseTool):
name: str = "failing_tool"
description: str = "A tool that always fails"
result_as_answer: bool = True
def _run(self, **kwargs: Any) -> str:
raise RuntimeError("intentional failure")
tool = FailingTool()
tool_call = MagicMock()
tool_call.id = "call_1"
tool_call.function.name = "failing_tool"
tool_call.function.arguments = "{}"
result = execute_single_native_tool_call(
tool_call,
available_functions={"failing_tool": tool._run},
original_tools=[tool],
structured_tools=None,
tools_handler=None,
agent=None,
task=None,
crew=None,
event_source=MagicMock(),
printer=None,
verbose=False,
)
assert isinstance(result, NativeToolCallResult)
assert result.result_as_answer is False
assert "Error executing tool" in result.result
def test_result_as_answer_false_when_hook_blocks(self) -> None:
"""When a before-hook blocks a tool with result_as_answer=True, result_as_answer must be False."""
from unittest.mock import MagicMock
from crewai.hooks.tool_hooks import (
clear_before_tool_call_hooks,
register_before_tool_call_hook,
)
class BlockedTool(BaseTool):
name: str = "blocked_tool"
description: str = "A tool whose execution will be blocked by a hook"
result_as_answer: bool = True
def _run(self, **kwargs: Any) -> str:
return "should not run"
tool = BlockedTool()
tool_call = MagicMock()
tool_call.id = "call_1"
tool_call.function.name = "blocked_tool"
tool_call.function.arguments = "{}"
register_before_tool_call_hook(lambda _ctx: False)
try:
result = execute_single_native_tool_call(
tool_call,
available_functions={"blocked_tool": tool._run},
original_tools=[tool],
structured_tools=None,
tools_handler=None,
agent=None,
task=None,
crew=None,
event_source=MagicMock(),
printer=None,
verbose=False,
)
finally:
clear_before_tool_call_hooks()
assert isinstance(result, NativeToolCallResult)
assert result.result_as_answer is False
assert "blocked by hook" in result.result

View File
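Both tests encode the same rule: if a `result_as_answer=True` tool errors out or is blocked by a before-hook, its output must not be promoted to the final answer, so the flag is forced to False on the result object. A simplified sketch; `ToolCallResult` and `run_tool_sketch` are illustrative stand-ins for `NativeToolCallResult` and `execute_single_native_tool_call`:

from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class ToolCallResult:
    result: str
    result_as_answer: bool


def run_tool_sketch(
    func: Callable[[], str],
    result_as_answer: bool,
    before_hook: Callable[[], bool] | None = None,
) -> ToolCallResult:
    # A blocked or failed tool must never short-circuit the agent loop with
    # its error text as the final answer, so the flag is forced to False.
    if before_hook is not None and before_hook() is False:
        return ToolCallResult(result="Tool execution blocked by hook", result_as_answer=False)
    try:
        return ToolCallResult(result=func(), result_as_answer=result_as_answer)
    except Exception as e:
        return ToolCallResult(result=f"Error executing tool: {e}", result_as_answer=False)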

@@ -87,6 +87,31 @@ def test_convert_to_model_with_no_model() -> None:
assert output == "Plain text"
def test_convert_to_model_with_basemodel_input_matching_pydantic() -> None:
instance = SimpleModel(name="John", age=30)
output = convert_to_model(instance, SimpleModel, None, None)
assert output is instance
def test_convert_to_model_with_basemodel_input_matching_json() -> None:
instance = SimpleModel(name="John", age=30)
output = convert_to_model(instance, None, SimpleModel, None)
assert output == {"name": "John", "age": 30}
def test_convert_to_model_with_basemodel_input_different_class() -> None:
class OtherModel(BaseModel):
name: str
age: int
extra: str = "default"
instance = OtherModel(name="John", age=30, extra="ignored")
output = convert_to_model(instance, SimpleModel, None, None)
assert isinstance(output, SimpleModel)
assert output.name == "John"
assert output.age == 30
def test_convert_to_model_with_special_characters() -> None:
json_string_test = """
{

View File
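The new `convert_to_model` tests cover three BaseModel fast paths: an instance of the expected pydantic model is returned as-is, a model headed for JSON output is dumped to a dict, and an instance of a different class is revalidated into the target model. A compact sketch of just those branches (not the full converter, which also handles raw strings and error reporting):

from pydantic import BaseModel


class SimpleModel(BaseModel):
    name: str
    age: int


def convert_to_model_sketch(result, output_pydantic=None, output_json=None):
    """Illustrative BaseModel fast paths matching the tests above."""
    if isinstance(result, BaseModel):
        if output_pydantic:
            if isinstance(result, output_pydantic):
                return result  # already the right class: return it untouched
            # Different class: revalidate its fields into the target model.
            return output_pydantic.model_validate(result.model_dump())
        if output_json:
            return result.model_dump()  # JSON output is a plain dict
    return result  # non-model inputs keep going through the normal parsing path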

@@ -11,6 +11,8 @@ Installed automatically via the workspace (`uv sync`). Requires:
- `ENTERPRISE_REPO` env var — GitHub repo for enterprise releases
- `ENTERPRISE_VERSION_DIRS` env var — comma-separated directories to bump in the enterprise repo
- `ENTERPRISE_CREWAI_DEP_PATH` env var — path to the pyproject.toml with the `crewai[tools]` pin in the enterprise repo
- `ENTERPRISE_WORKFLOW_PATHS` env var — comma-separated workflow file paths in the enterprise repo whose `crewai[extras]==<version>` pins should be rewritten on each release (e.g. `.github/workflows/tests.yml`)
- `ENTERPRISE_EXTRA_PACKAGES` env var — comma-separated packages to also pin in enterprise pyproject files, in addition to `crewai` / `crewai[extras]`
## Commands

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.14.5a1"
__version__ = "1.14.5a2"

uv.lock generated
View File

@@ -13,7 +13,7 @@ resolution-markers = [
]
[options]
exclude-newer = "2026-04-28T07:00:00Z"
exclude-newer = "2026-04-27T16:00:00Z"
[manifest]
members = [
@@ -1626,7 +1626,7 @@ requires-dist = [
{ name = "e2b-code-interpreter", marker = "extra == 'e2b'", specifier = "~=2.6.0" },
{ name = "exa-py", marker = "extra == 'exa-py'", specifier = ">=1.8.7" },
{ name = "firecrawl-py", marker = "extra == 'firecrawl-py'", specifier = ">=1.8.0" },
{ name = "gitpython", marker = "extra == 'github'", specifier = ">=3.1.41,<4" },
{ name = "gitpython", marker = "extra == 'github'", specifier = ">=3.1.47,<4" },
{ name = "hyperbrowser", marker = "extra == 'hyperbrowser'", specifier = ">=0.18.0" },
{ name = "langchain-apify", marker = "extra == 'apify'", specifier = ">=0.1.2,<1.0.0" },
{ name = "linkup-sdk", marker = "extra == 'linkup-sdk'", specifier = ">=0.2.2" },
@@ -2619,14 +2619,14 @@ wheels = [
[[package]]
name = "gitpython"
version = "3.1.46"
version = "3.1.47"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "gitdb" },
]
sdist = { url = "https://files.pythonhosted.org/packages/df/b5/59d16470a1f0dfe8c793f9ef56fd3826093fc52b3bd96d6b9d6c26c7e27b/gitpython-3.1.46.tar.gz", hash = "sha256:400124c7d0ef4ea03f7310ac2fbf7151e09ff97f2a3288d64a440c584a29c37f", size = 215371, upload-time = "2026-01-01T15:37:32.073Z" }
sdist = { url = "https://files.pythonhosted.org/packages/c1/bd/50db468e9b1310529a19fce651b3b0e753b5c07954d486cba31bbee9a5d5/gitpython-3.1.47.tar.gz", hash = "sha256:dba27f922bd2b42cb54c87a8ab3cb6beb6bf07f3d564e21ac848913a05a8a3cd", size = 216978, upload-time = "2026-04-22T02:44:44.059Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/6a/09/e21df6aef1e1ffc0c816f0522ddc3f6dcded766c3261813131c78a704470/gitpython-3.1.46-py3-none-any.whl", hash = "sha256:79812ed143d9d25b6d176a10bb511de0f9c67b1fa641d82097b0ab90398a2058", size = 208620, upload-time = "2026-01-01T15:37:30.574Z" },
{ url = "https://files.pythonhosted.org/packages/f2/c5/a1bc0996af85757903cf2bf444a7824e68e0035ce63fb41d6f76f9def68b/gitpython-3.1.47-py3-none-any.whl", hash = "sha256:489f590edfd6d20571b2c0e72c6a6ac6915ee8b8cd04572330e3842207a78905", size = 209547, upload-time = "2026-04-22T02:44:41.271Z" },
]
[[package]]