Compare commits

...

16 Commits

Author SHA1 Message Date
Devin AI
6725d86304 fix(memory): escape user input in LanceDBStorage SQL filters (#5728)
LanceDBStorage interpolated caller-supplied scope paths and record IDs
directly into the WHERE clauses passed to LanceDB's where(), which
accepts a raw DataFusion SQL expression and does not support
parameterized queries. A malicious or unprivileged caller could escape
the configured scope sandbox -- for example, calling
delete(scope_prefix="/alpha' OR scope LIKE '/%") would wipe every
record in the table instead of just the /alpha subtree -- and ordinary
strings containing apostrophes (e.g. O'Brien) could crash the SQL
parser.

Add _escape_sql_str() and _escape_like() helpers and route every
user-controlled value through them in search(), delete(), reset(), and
the shared _scan_rows() reader. The LIKE clauses now also use
ESCAPE '\\' so % and _ in caller-supplied prefixes are treated as
literals instead of wildcards.

Adds tests/memory/test_lancedb_storage_security.py covering each
sink (search, delete by scope, delete by id, reset, scan-based
readers) with both injection payloads and legitimate apostrophe-
containing scopes/IDs.
2026-05-06 06:23:02 +00:00
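A standalone sketch of the escaping described above (helper names mirror the commit, but this is an illustrative reimplementation, not the actual LanceDBStorage code):

```python
def escape_sql_str(value) -> str:
    # Double single quotes so the value cannot terminate the SQL string
    # literal early (DataFusion expressions have no parameterized form).
    return str(value).replace("'", "''")


def escape_like(value) -> str:
    # Escape the backslash first, then the LIKE wildcards % and _,
    # then double single quotes; the result is meant to be paired
    # with an ESCAPE '\' clause.
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
        .replace("'", "''")
    )


# The injection payload from the commit message becomes an inert literal:
payload = "/alpha' OR scope LIKE '/%"
clause = f"scope LIKE '{escape_like(payload)}%' ESCAPE '\\'"
```

With the helpers in place, the payload can no longer close the quoted literal or widen the LIKE match.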
iris-clawd
ec8a522c2c fix: correct status endpoint path from /{kickoff_id}/status to /status/{kickoff_id}
2026-05-05 07:29:49 +08:00
Greyson LaLonde
e25f6538a8 fix(deps): bump gitpython to >=3.1.47 for GHSA-rpm5-65cw-6hj4
2026-05-04 23:44:28 +08:00
Greyson LaLonde
470d4035db docs: update changelog and version for v1.14.5a2 2026-05-04 23:04:56 +08:00
Greyson LaLonde
57d1b338f7 feat: bump versions to 1.14.5a2 2026-05-04 22:58:06 +08:00
huang yutong
01df19b029 fix(a2a): always restore task.output_pydantic in finally block
In `_execute_task_with_a2a` and its async variant, the try body
sets `task.output_pydantic = None` before returning an A2A
response. The finally block then checks
`if task.output_pydantic is not None` before restoring the
original value — but since it was just set to None, the condition
is always False and the original value is never restored. This
permanently mutates the Task object.

Remove the guard so `output_pydantic` is unconditionally restored,
matching the unconditional restoration of `description` and
`response_model` in the same block.

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-05-04 22:41:04 +08:00
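The guard bug can be reproduced with a minimal stand-in for the real Task object (class and field names are illustrative):

```python
class Task:
    def __init__(self, output_pydantic):
        self.output_pydantic = output_pydantic


def run_buggy(task):
    original = task.output_pydantic
    try:
        task.output_pydantic = None  # cleared before returning the A2A response
        return "a2a-response"
    finally:
        # The try body just set the field to None, so this guard is
        # always False and the original value is never restored.
        if task.output_pydantic is not None:
            task.output_pydantic = original


def run_fixed(task):
    original = task.output_pydantic
    try:
        task.output_pydantic = None
        return "a2a-response"
    finally:
        task.output_pydantic = original  # unconditional restore, as in the fix
```

Running `run_buggy` leaves the Task permanently mutated; `run_fixed` restores the original value.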
Rip&Tear
dca2c3160f chore: update security reporting instructions
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-05-04 22:31:35 +08:00
Greyson LaLonde
6494d68ffc fix(gemini): include thoughts_token_count in completion tokens 2026-05-04 21:03:38 +08:00
Greyson LaLonde
f579aa53ae fix: preserve task outputs across async batch flush 2026-05-04 20:24:24 +08:00
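A minimal sketch of the flush bug this one-line summary refers to, assuming the pattern visible in the crew.py hunks later in this diff: reassigning the accumulated list drops earlier outputs, while extending preserves them.

```python
task_outputs = ["out-1"]       # outputs gathered before the flush
flushed = ["out-2", "out-3"]   # results of the pending async batch

# Before the fix: reassignment discards out-1.
buggy = flushed

# After the fix: extend keeps the earlier outputs.
task_outputs.extend(flushed)
```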
minasami-pr
a23e118b11 fix: forward kwargs to loader calls in CrewAIRagAdapter
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-05-04 19:52:24 +08:00
Greyson LaLonde
095f796922 fix: prevent result_as_answer from returning hook-block message as final answer 2026-05-04 19:42:07 +08:00
Zamuldinov Nikita
bfbdba426f fix: prevent result_as_answer from returning error as final answer
When a tool with result_as_answer=True raises an exception, the agent
still received result_as_answer=True and returned the error string as
the final answer. Now result_as_answer is set to False when an error
event is emitted, allowing the agent to reflect and retry.

Fixes crewAIInc/crewAI#5156

---------

Co-authored-by: NIK-TIGER-BILL <nik.tiger.bill@github.com>
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-05-04 19:28:21 +08:00
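A hedged sketch of the corrected control flow (the function and return shapes are illustrative, not crewAI internals):

```python
def handle_tool_result(result, error, result_as_answer):
    # On a tool error, drop result_as_answer so the error string is fed
    # back to the agent as an observation to reflect on and retry,
    # instead of being promoted to the final answer.
    if error is not None:
        result_as_answer = False
    if result_as_answer:
        return ("final_answer", result)
    return ("observation", error if error is not None else result)
```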
Greyson LaLonde
a058a3b15b fix(task): use acall for output conversion in async paths 2026-05-04 18:42:12 +08:00
Greyson LaLonde
184c228ae9 fix: prevent shared LLM stop words mutation across agents
2026-05-04 14:23:17 +08:00
Greyson LaLonde
c9100cb51d docs(devtools): document additional env vars
2026-05-03 14:50:44 +08:00
Greyson LaLonde
17e82743f6 fix: handle BaseModel input in convert_to_model 2026-05-03 14:17:03 +08:00
50 changed files with 1445 additions and 239 deletions

.github/security.md vendored
View File

@@ -5,7 +5,10 @@ CrewAI ecosystem.
### How to Report
Please submit reports to **crewai-vdp-ess@submit.bugcrowd.com**
Please submit reports through one of the following channels:
- **crewai-vdp-ess@submit.bugcrowd.com**
- https://security.crewai.com
- **Please do not** disclose vulnerabilities via public GitHub issues, pull requests,
or social media

View File

@@ -26,7 +26,7 @@ mode: "wide"
</Step>
<Step title="مراقبة التقدم">
استخدم `GET /{kickoff_id}/status` للتحقق من حالة التنفيذ واسترجاع النتائج.
استخدم `GET /status/{kickoff_id}` للتحقق من حالة التنفيذ واسترجاع النتائج.
</Step>
</Steps>
@@ -65,7 +65,7 @@ https://your-crew-name.crewai.com
1. **الاكتشاف**: استدعِ `GET /inputs` لفهم ما يحتاجه طاقمك
2. **التنفيذ**: أرسل المدخلات عبر `POST /kickoff` لبدء المعالجة
3. **المراقبة**: استعلم عن `GET /{kickoff_id}/status` حتى الاكتمال
3. **المراقبة**: استعلم عن `GET /status/{kickoff_id}` حتى الاكتمال
4. **النتائج**: استخرج المخرجات النهائية من الاستجابة المكتملة
## معالجة الأخطاء

View File

@@ -1,6 +1,6 @@
---
title: "GET /{kickoff_id}/status"
title: "GET /status/{kickoff_id}"
description: "الحصول على حالة التنفيذ"
openapi: "/enterprise-api.en.yaml GET /{kickoff_id}/status"
openapi: "/enterprise-api.en.yaml GET /status/{kickoff_id}"
mode: "wide"
---

View File

@@ -4,6 +4,34 @@ description: "تحديثات المنتج والتحسينات وإصلاحات
icon: "clock"
mode: "wide"
---
<Update label="4 مايو 2026">
## v1.14.5a2
[عرض الإصدار على GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## ما الذي تغير
### إصلاحات الأخطاء
- إصلاح استعادة مخرجات المهام في كتلة finally
- تضمين `thoughts_token_count` في رموز الإكمال
- الحفاظ على مخرجات المهام عبر تفريغ دفعات غير متزامنة
- تمرير kwargs إلى استدعاءات المحمل في `CrewAIRagAdapter`
- منع `result_as_answer` من إرجاع رسالة كتلة الخطاف كإجابة نهائية
- منع `result_as_answer` من إرجاع خطأ كإجابة نهائية
- استخدام `acall` لتحويل المخرجات في المسارات غير المتزامنة
- منع تغيير كلمات التوقف المشتركة في LLM عبر الوكلاء
- التعامل مع مدخلات `BaseModel` في `convert_to_model`
### الوثائق
- توثيق متغيرات البيئة الإضافية
- تحديث سجل التغييرات والإصدار لـ v1.14.5a1
## المساهمون
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="1 مايو 2026">
## v1.14.5a1

View File

@@ -26,7 +26,7 @@ Welcome to the CrewAI AMP API reference. This API allows you to programmatically
</Step>
<Step title="Monitor Progress">
Use `GET /{kickoff_id}/status` to check execution status and retrieve results.
Use `GET /status/{kickoff_id}` to check execution status and retrieve results.
</Step>
</Steps>
@@ -65,7 +65,7 @@ Replace `your-crew-name` with your actual crew's URL from the dashboard.
1. **Discovery**: Call `GET /inputs` to understand what your crew needs
2. **Execution**: Submit inputs via `POST /kickoff` to start processing
3. **Monitoring**: Poll `GET /{kickoff_id}/status` until completion
3. **Monitoring**: Poll `GET /status/{kickoff_id}` until completion
4. **Results**: Extract the final output from the completed response
## Error Handling

View File

@@ -1,6 +1,6 @@
---
title: "GET /{kickoff_id}/status"
title: "GET /status/{kickoff_id}"
description: "Get execution status"
openapi: "/enterprise-api.en.yaml GET /{kickoff_id}/status"
openapi: "/enterprise-api.en.yaml GET /status/{kickoff_id}"
mode: "wide"
---

View File

@@ -4,6 +4,34 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="May 04, 2026">
## v1.14.5a2
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## What's Changed
### Bug Fixes
- Fix task output restoration in finally block
- Include `thoughts_token_count` in completion tokens
- Preserve task outputs across async batch flush
- Forward kwargs to loader calls in `CrewAIRagAdapter`
- Prevent `result_as_answer` from returning hook-block message as final answer
- Prevent `result_as_answer` from returning error as final answer
- Use `acall` for output conversion in async paths
- Prevent shared LLM stop words mutation across agents
- Handle `BaseModel` input in `convert_to_model`
### Documentation
- Document additional environment variables
- Update changelog and version for v1.14.5a1
## Contributors
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="May 01, 2026">
## v1.14.5a1

View File

@@ -35,7 +35,7 @@ info:
1. **Discover inputs** using `GET /inputs`
2. **Start execution** using `POST /kickoff`
3. **Monitor progress** using `GET /{kickoff_id}/status`
3. **Monitor progress** using `GET /status/{kickoff_id}`
version: 1.0.0
contact:
name: CrewAI Support
@@ -207,7 +207,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/{kickoff_id}/status:
/status/{kickoff_id}:
get:
summary: Get Execution Status
description: |

View File

@@ -35,7 +35,7 @@ info:
1. **Discover inputs** using `GET /inputs`
2. **Start execution** using `POST /kickoff`
3. **Monitor progress** using `GET /{kickoff_id}/status`
3. **Monitor progress** using `GET /status/{kickoff_id}`
version: 1.0.0
contact:
name: CrewAI Support
@@ -207,7 +207,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/{kickoff_id}/status:
/status/{kickoff_id}:
get:
summary: Get Execution Status
description: |

View File

@@ -84,7 +84,7 @@ paths:
'500':
$ref: '#/components/responses/ServerError'
/{kickoff_id}/status:
/status/{kickoff_id}:
get:
summary: 실행 상태 조회
description: |

View File

@@ -35,7 +35,7 @@ info:
1. **Descubra os inputs** usando `GET /inputs`
2. **Inicie a execução** usando `POST /kickoff`
3. **Monitore o progresso** usando `GET /{kickoff_id}/status`
3. **Monitore o progresso** usando `GET /status/{kickoff_id}`
version: 1.0.0
contact:
name: CrewAI Suporte
@@ -120,7 +120,7 @@ paths:
"500":
$ref: "#/components/responses/ServerError"
/{kickoff_id}/status:
/status/{kickoff_id}:
get:
summary: Obter Status da Execução
description: |

View File

@@ -26,7 +26,7 @@ CrewAI 엔터프라이즈 API 참고 자료에 오신 것을 환영합니다.
</Step>
<Step title="진행 상황 모니터링">
`GET /{kickoff_id}/status`를 사용하여 실행 상태를 확인하고 결과를 조회하세요.
`GET /status/{kickoff_id}`를 사용하여 실행 상태를 확인하고 결과를 조회하세요.
</Step>
</Steps>
@@ -65,7 +65,7 @@ https://your-crew-name.crewai.com
1. **탐색**: `GET /inputs`를 호출하여 crew가 필요한 것을 파악합니다.
2. **실행**: `POST /kickoff`를 통해 입력값을 제출하여 처리를 시작합니다.
3. **모니터링**: 완료될 때까지 `GET /{kickoff_id}/status`를 주기적으로 조회합니다.
3. **모니터링**: 완료될 때까지 `GET /status/{kickoff_id}`를 주기적으로 조회합니다.
4. **결과**: 완료된 응답에서 최종 출력을 추출합니다.
## 오류 처리

View File

@@ -1,6 +1,6 @@
---
title: "GET /{kickoff_id}/status"
title: "GET /status/{kickoff_id}"
description: "실행 상태 조회"
openapi: "/enterprise-api.ko.yaml GET /{kickoff_id}/status"
openapi: "/enterprise-api.ko.yaml GET /status/{kickoff_id}"
mode: "wide"
---

View File

@@ -4,6 +4,34 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 5월 4일">
## v1.14.5a2
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## 변경 사항
### 버그 수정
- finally 블록에서 작업 출력 복원 수정
- 완료 토큰에 `thoughts_token_count` 포함
- 비동기 배치 플러시 간 작업 출력 보존
- `CrewAIRagAdapter`의 로더 호출에 kwargs 전달
- `result_as_answer`가 후크 차단 메시지를 최종 답변으로 반환하지 않도록 방지
- `result_as_answer`가 오류를 최종 답변으로 반환하지 않도록 방지
- 비동기 경로에서 출력 변환을 위해 `acall` 사용
- 에이전트 간 공유 LLM 중지 단어 변형 방지
- `convert_to_model`에서 `BaseModel` 입력 처리
### 문서화
- 추가 환경 변수 문서화
- v1.14.5a1에 대한 변경 로그 및 버전 업데이트
## 기여자
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="2026년 5월 1일">
## v1.14.5a1

View File

@@ -26,7 +26,7 @@ Bem-vindo à referência da API do CrewAI AMP. Esta API permite que você intera
</Step>
<Step title="Monitore o Progresso">
Use `GET /{kickoff_id}/status` para checar o status da execução e recuperar os resultados.
Use `GET /status/{kickoff_id}` para checar o status da execução e recuperar os resultados.
</Step>
</Steps>
@@ -65,7 +65,7 @@ Substitua `your-crew-name` pela URL real do seu crew no painel.
1. **Descoberta**: Chame `GET /inputs` para entender o que seu crew precisa
2. **Execução**: Envie os inputs via `POST /kickoff` para iniciar o processamento
3. **Monitoramento**: Faça polling em `GET /{kickoff_id}/status` até a conclusão
3. **Monitoramento**: Faça polling em `GET /status/{kickoff_id}` até a conclusão
4. **Resultados**: Extraia o output final da resposta concluída
## Tratamento de Erros

View File

@@ -1,6 +1,6 @@
---
title: "GET /{kickoff_id}/status"
title: "GET /status/{kickoff_id}"
description: "Obter o status da execução"
openapi: "/enterprise-api.pt-BR.yaml GET /{kickoff_id}/status"
openapi: "/enterprise-api.pt-BR.yaml GET /status/{kickoff_id}"
mode: "wide"
---

View File

@@ -4,6 +4,34 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="04 mai 2026">
## v1.14.5a2
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.14.5a2)
## O que Mudou
### Correções de Bugs
- Corrigir a restauração da saída da tarefa no bloco finally
- Incluir `thoughts_token_count` nos tokens de conclusão
- Preservar as saídas das tarefas durante o descarregamento assíncrono em lote
- Encaminhar kwargs para chamadas de carregador em `CrewAIRagAdapter`
- Impedir que `result_as_answer` retorne mensagem de bloqueio de hook como resposta final
- Impedir que `result_as_answer` retorne erro como resposta final
- Usar `acall` para conversão de saída em caminhos assíncronos
- Prevenir a mutação de palavras de parada compartilhadas do LLM entre agentes
- Lidar com entrada `BaseModel` em `convert_to_model`
### Documentação
- Documentar variáveis de ambiente adicionais
- Atualizar changelog e versão para v1.14.5a1
## Contribuidores
@NIK-TIGER-BILL, @greysonlalonde, @lorenzejay, @minasami-pr, @theCyberTech, @wishhyt
</Update>
<Update label="01 mai 2026">
## v1.14.5a1

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.14.5a1"
__version__ = "1.14.5a2"

View File

@@ -10,7 +10,7 @@ requires-python = ">=3.10, <3.14"
dependencies = [
"pytube~=15.0.0",
"requests>=2.33.0,<3",
"crewai==1.14.5a1",
"crewai==1.14.5a2",
"tiktoken>=0.8.0,<0.13",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",
@@ -107,7 +107,7 @@ stagehand = [
"stagehand>=0.4.1",
]
github = [
"gitpython>=3.1.41,<4",
"gitpython>=3.1.47,<4",
"PyGithub==1.59.1",
]
rag = [

View File

@@ -330,4 +330,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.14.5a1"
__version__ = "1.14.5a2"

View File

@@ -268,7 +268,9 @@ class CrewAIRagAdapter(Adapter):
file_chunker = file_data_type.get_chunker()
file_source = SourceContent(file_path)
file_result: LoaderResult = file_loader.load(file_source)
file_result: LoaderResult = file_loader.load(
file_source, **kwargs
)
file_chunks = file_chunker.chunk(file_result.content)
@@ -319,7 +321,7 @@ class CrewAIRagAdapter(Adapter):
loader = data_type.get_loader()
chunker = data_type.get_chunker()
loader_result: LoaderResult = loader.load(source_content)
loader_result: LoaderResult = loader.load(source_content, **kwargs)
chunks = chunker.chunk(loader_result.content)

View File

@@ -55,7 +55,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.14.5a1",
"crewai-tools==1.14.5a2",
]
embeddings = [
"tiktoken>=0.8.0,<0.13"

View File

@@ -48,7 +48,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.14.5a1"
__version__ = "1.14.5a2"
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
"Memory": ("crewai.memory.unified_memory", "Memory"),

View File

@@ -386,8 +386,7 @@ def _execute_task_with_a2a(
return raw_result
finally:
task.description = original_description
if task.output_pydantic is not None:
task.output_pydantic = original_output_pydantic
task.output_pydantic = original_output_pydantic
task.response_model = original_response_model
@@ -1534,8 +1533,7 @@ async def _aexecute_task_with_a2a(
return raw_result
finally:
task.description = original_description
if task.output_pydantic is not None:
task.output_pydantic = original_output_pydantic
task.output_pydantic = original_output_pydantic
task.response_model = original_response_model

View File

@@ -1102,16 +1102,6 @@ class Agent(BaseAgent):
self.agent_executor.tools_handler = self.tools_handler
self.agent_executor.request_within_rpm_limit = rpm_limit_fn
if isinstance(self.agent_executor.llm, BaseLLM):
existing_stop = getattr(self.agent_executor.llm, "stop", [])
self.agent_executor.llm.stop = list(
set(
existing_stop + stop_words
if isinstance(existing_stop, list)
else stop_words
)
)
def get_delegation_tools(self, agents: Sequence[BaseAgent]) -> list[BaseTool]:
agent_tools = AgentTools(agents=agents)
return agent_tools.tools()

View File

@@ -49,6 +49,7 @@ from crewai.hooks.tool_hooks import (
)
from crewai.types.callback import SerializableCallable
from crewai.utilities.agent_utils import (
_llm_stop_words_applied,
aget_llm_response,
convert_tools_to_openai_schema,
enforce_rpm_limit,
@@ -141,15 +142,6 @@ class CrewAgentExecutor(BaseAgentExecutor):
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
if not self.after_llm_call_hooks:
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
if self.llm and not isinstance(self.llm, str):
existing_stop = getattr(self.llm, "stop", [])
self.llm.stop = list(
set(
existing_stop + self.stop
if isinstance(existing_stop, list)
else self.stop
)
)
@property
def use_stop_words(self) -> bool:
@@ -210,21 +202,22 @@ class CrewAgentExecutor(BaseAgentExecutor):
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))
try:
formatted_answer = self._invoke_loop()
except AssertionError:
if self.agent.verbose:
PRINTER.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
raise
except Exception as e:
handle_unknown_error(PRINTER, e, verbose=self.agent.verbose)
raise
with _llm_stop_words_applied(self.llm, self):
try:
formatted_answer = self._invoke_loop()
except AssertionError:
if self.agent.verbose:
PRINTER.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
raise
except Exception as e:
handle_unknown_error(PRINTER, e, verbose=self.agent.verbose)
raise
if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
if self.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
self._save_to_memory(formatted_answer)
return {"output": formatted_answer.output}
@@ -1082,21 +1075,22 @@ class CrewAgentExecutor(BaseAgentExecutor):
self.ask_for_human_input = bool(inputs.get("ask_for_human_input", False))
try:
formatted_answer = await self._ainvoke_loop()
except AssertionError:
if self.agent.verbose:
PRINTER.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
raise
except Exception as e:
handle_unknown_error(PRINTER, e, verbose=self.agent.verbose)
raise
with _llm_stop_words_applied(self.llm, self):
try:
formatted_answer = await self._ainvoke_loop()
except AssertionError:
if self.agent.verbose:
PRINTER.print(
content="Agent failed to reach a final answer. This is likely a bug - please report it.",
color="red",
)
raise
except Exception as e:
handle_unknown_error(PRINTER, e, verbose=self.agent.verbose)
raise
if self.ask_for_human_input:
formatted_answer = await self._ahandle_human_feedback(formatted_answer)
if self.ask_for_human_input:
formatted_answer = await self._ahandle_human_feedback(formatted_answer)
self._save_to_memory(formatted_answer)
return {"output": formatted_answer.output}

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.5a1"
"crewai[tools]==1.14.5a2"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.5a1"
"crewai[tools]==1.14.5a2"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "Power up your crews with {{folder_name}}"
readme = "README.md"
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.14.5a1"
"crewai[tools]==1.14.5a2"
]
[tool.crewai]

View File

@@ -1283,8 +1283,8 @@ class Crew(FlowTrackable, BaseModel):
pending_tasks.append((task, async_task, task_index))
else:
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(
pending_tasks, was_replayed
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
pending_tasks.clear()
@@ -1299,7 +1299,9 @@ class Crew(FlowTrackable, BaseModel):
self._store_execution_log(task, task_output, task_index, was_replayed)
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
return self._create_crew_output(task_outputs)
@@ -1313,7 +1315,9 @@ class Crew(FlowTrackable, BaseModel):
) -> TaskOutput | None:
"""Handle conditional task evaluation using native async."""
if pending_tasks:
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
pending_tasks.clear()
return check_conditional_skip(
@@ -1489,7 +1493,9 @@ class Crew(FlowTrackable, BaseModel):
futures.append((task, future, task_index))
else:
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(
self._process_async_tasks(futures, was_replayed)
)
futures.clear()
context = self._get_context(task, task_outputs)
@@ -1503,7 +1509,7 @@ class Crew(FlowTrackable, BaseModel):
self._store_execution_log(task, task_output, task_index, was_replayed)
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
return self._create_crew_output(task_outputs)
@@ -1516,7 +1522,7 @@ class Crew(FlowTrackable, BaseModel):
was_replayed: bool,
) -> TaskOutput | None:
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
futures.clear()
return check_conditional_skip(

View File

@@ -71,6 +71,7 @@ from crewai.hooks.types import (
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.agent_utils import (
_llm_stop_words_applied,
check_native_tool_support,
enforce_rpm_limit,
extract_tool_call_info,
@@ -215,12 +216,6 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
if self.llm:
existing_stop = getattr(self.llm, "stop", [])
if not isinstance(existing_stop, list):
existing_stop = []
self.llm.stop = list(set(existing_stop + self.stop_words))
self._state = AgentExecutorState()
self.max_method_calls = self.max_iter * 10
@@ -2601,17 +2596,18 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
inputs.get("ask_for_human_input", False)
)
self.kickoff()
with _llm_stop_words_applied(self.llm, self):
self.kickoff()
formatted_answer = self.state.current_answer
formatted_answer = self.state.current_answer
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
if self.state.ask_for_human_input:
formatted_answer = self._handle_human_feedback(formatted_answer)
self._save_to_memory(formatted_answer)
@@ -2691,18 +2687,20 @@ class AgentExecutor(Flow[AgentExecutorState], BaseAgentExecutor):
inputs.get("ask_for_human_input", False)
)
# Use async kickoff directly since we're already in an async context
await self.kickoff_async()
with _llm_stop_words_applied(self.llm, self):
await self.kickoff_async()
formatted_answer = self.state.current_answer
formatted_answer = self.state.current_answer
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if not isinstance(formatted_answer, AgentFinish):
raise RuntimeError(
"Agent execution ended without reaching a final answer."
)
if self.state.ask_for_human_input:
formatted_answer = await self._ahandle_human_feedback(formatted_answer)
if self.state.ask_for_human_input:
formatted_answer = await self._ahandle_human_feedback(
formatted_answer
)
self._save_to_memory(formatted_answer)

View File

@@ -688,7 +688,9 @@ class LLM(BaseLLM):
"temperature": self.temperature,
"top_p": self.top_p,
"n": self.n,
"stop": (self.stop or None) if self.supports_stop_words() else None,
"stop": (self.stop_sequences or None)
if self.supports_stop_words()
else None,
"max_tokens": self.max_tokens or self.max_completion_tokens,
"presence_penalty": self.presence_penalty,
"frequency_penalty": self.frequency_penalty,

View File

@@ -72,6 +72,9 @@ _JSON_EXTRACTION_PATTERN: Final[re.Pattern[str]] = re.compile(r"\{.*}", re.DOTAL
_current_call_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"_current_call_id", default=None
)
_call_stop_override_var: contextvars.ContextVar[dict[int, list[str]] | None] = (
contextvars.ContextVar("_call_stop_override_var", default=None)
)
@contextmanager
@@ -85,6 +88,31 @@ def llm_call_context() -> Generator[str, None, None]:
_current_call_id.reset(token)
@contextmanager
def call_stop_override(
llm: BaseLLM, stop: list[str] | None
) -> Generator[None, None, None]:
"""Override the stop list for ``llm`` within the current call scope.
Only ``llm``'s reads via :attr:`BaseLLM.stop_sequences` see ``stop``;
other LLM instances (e.g. an agent's ``function_calling_llm``) keep their
own ``stop`` field. Passing ``None`` clears any prior override for ``llm``
in the same scope. The instance-level ``stop`` field is never mutated,
so the override is safe under concurrent execution.
"""
current = _call_stop_override_var.get()
new_overrides: dict[int, list[str]] = dict(current) if current else {}
if stop is None:
new_overrides.pop(id(llm), None)
else:
new_overrides[id(llm)] = stop
token = _call_stop_override_var.set(new_overrides)
try:
yield
finally:
_call_stop_override_var.reset(token)
def get_current_call_id() -> str:
"""Get current call_id from context"""
call_id = _current_call_id.get()
@@ -158,11 +186,18 @@ class BaseLLM(BaseModel, ABC):
@property
def stop_sequences(self) -> list[str]:
"""Alias for ``stop`` — kept for backward compatibility with provider APIs.
"""Stop list active for the current call.
Writes are handled by ``__setattr__``, which normalizes and redirects
``stop_sequences`` assignments to the ``stop`` field.
Returns the per-instance override set via :func:`call_stop_override`
when one is in effect for this LLM; otherwise the instance-level
``stop`` field. Kept under this name for backward compatibility with
provider APIs that already read ``stop_sequences``.
"""
overrides = _call_stop_override_var.get()
if overrides is not None:
override = overrides.get(id(self))
if override is not None:
return override
return self.stop
_token_usage: dict[str, int] = PrivateAttr(
@@ -341,7 +376,7 @@ class BaseLLM(BaseModel, ABC):
Returns:
True if stop words are configured and can be applied
"""
return bool(self.stop)
return bool(self.stop_sequences)
def _apply_stop_words(self, content: str) -> str:
"""Apply stop words to truncate response content.
@@ -363,14 +398,14 @@ class BaseLLM(BaseModel, ABC):
>>> llm._apply_stop_words(response)
"I need to search.\\n\\nAction: search"
"""
if not self.stop or not content:
stops = self.stop_sequences
if not stops or not content:
return content
# Find the earliest occurrence of any stop word
earliest_stop_pos = len(content)
found_stop_word = None
for stop_word in self.stop:
for stop_word in stops:
stop_pos = content.find(stop_word)
if stop_pos != -1 and stop_pos < earliest_stop_pos:
earliest_stop_pos = stop_pos

View File

@@ -679,8 +679,9 @@ class AzureCompletion(BaseLLM):
params["presence_penalty"] = self.presence_penalty
if self.max_tokens is not None:
params["max_tokens"] = self.max_tokens
if self.stop and self.supports_stop_words():
params["stop"] = self.stop
stops = self.stop_sequences
if stops and self.supports_stop_words():
params["stop"] = stops
# Handle tools/functions for Azure OpenAI models
if tools and self.is_openai_model:

View File

@@ -1328,9 +1328,11 @@ class GeminiCompletion(BaseLLM):
usage = response.usage_metadata
cached_tokens = getattr(usage, "cached_content_token_count", 0) or 0
thinking_tokens = getattr(usage, "thoughts_token_count", 0) or 0
candidates_tokens = getattr(usage, "candidates_token_count", 0) or 0
result: dict[str, Any] = {
"prompt_token_count": getattr(usage, "prompt_token_count", 0),
"candidates_token_count": getattr(usage, "candidates_token_count", 0),
"candidates_token_count": candidates_tokens,
"completion_tokens": candidates_tokens + thinking_tokens,
"total_token_count": getattr(usage, "total_token_count", 0),
"total_tokens": getattr(usage, "total_token_count", 0),
"cached_prompt_tokens": cached_tokens,

View File

@@ -112,6 +112,37 @@ class LanceDBStorage:
with store_lock(self._lock_name):
self._table = self._create_table(vector_dim)
@staticmethod
def _escape_sql_str(value: Any) -> str:
"""Escape a string for inclusion inside a single-quoted SQL literal.
LanceDB's ``where()`` accepts an Apache DataFusion SQL expression as a
raw string -- it does not support parameterized queries. Any caller-
supplied value (scope path, record id, etc.) that is interpolated into
a string literal MUST first have its single quotes doubled, otherwise
an attacker (or simply a record with an apostrophe in its id) can
terminate the literal early and inject arbitrary SQL.
"""
return str(value).replace("'", "''")
@staticmethod
def _escape_like(value: Any) -> str:
"""Escape a string for use as a literal prefix inside a ``LIKE`` clause.
Doubles single quotes (so the pattern can't break out of its literal)
and escapes the SQL ``LIKE`` metacharacters ``%`` and ``_`` so that
callers passing those characters in a scope path don't accidentally
(or maliciously) widen the match. The returned pattern fragment is
intended to be paired with ``ESCAPE '\\'``.
"""
return (
str(value)
.replace("\\", "\\\\")
.replace("%", "\\%")
.replace("_", "\\_")
.replace("'", "''")
)
@staticmethod
def _infer_dim_from_table(table: Any) -> int:
"""Read vector dimension from an existing table's schema."""
@@ -317,7 +348,7 @@ class LanceDBStorage:
"""Update a record by ID. Preserves created_at, updates last_accessed."""
with store_lock(self._lock_name):
self._ensure_table()
safe_id = str(record.id).replace("'", "''")
safe_id = self._escape_sql_str(record.id)
self._do_write("delete", f"id = '{safe_id}'")
row = self._record_to_row(record)
if row["vector"] is None or len(row["vector"]) != self._vector_dim:
@@ -338,7 +369,7 @@ class LanceDBStorage:
return
with store_lock(self._lock_name):
now = datetime.utcnow().isoformat()
safe_ids = [str(rid).replace("'", "''") for rid in record_ids]
safe_ids = [self._escape_sql_str(rid) for rid in record_ids]
ids_expr = ", ".join(f"'{rid}'" for rid in safe_ids)
self._do_write(
"update",
@@ -350,7 +381,7 @@ class LanceDBStorage:
"""Return a single record by ID, or None if not found."""
if self._table is None:
return None
safe_id = str(record_id).replace("'", "''")
safe_id = self._escape_sql_str(record_id)
rows = self._table.search().where(f"id = '{safe_id}'").limit(1).to_list()
if not rows:
return None
@@ -370,8 +401,8 @@ class LanceDBStorage:
query = self._table.search(query_embedding)
if scope_prefix is not None and scope_prefix.strip("/"):
prefix = scope_prefix.rstrip("/")
like_val = prefix + "%"
query = query.where(f"scope LIKE '{like_val}'")
like_val = self._escape_like(prefix) + "%"
query = query.where(f"scope LIKE '{like_val}' ESCAPE '\\'")
results = query.limit(
limit * 3 if (categories or metadata_filter) else limit
).to_list()
@@ -405,7 +436,8 @@ class LanceDBStorage:
with store_lock(self._lock_name):
if record_ids and not (categories or metadata_filter):
before = int(self._table.count_rows())
ids_expr = ", ".join(f"'{rid}'" for rid in record_ids)
safe_ids = [self._escape_sql_str(rid) for rid in record_ids]
ids_expr = ", ".join(f"'{rid}'" for rid in safe_ids)
self._do_write("delete", f"id IN ({ids_expr})")
return before - int(self._table.count_rows())
if categories or metadata_filter:
@@ -427,7 +459,8 @@ class LanceDBStorage:
if not to_delete:
return 0
before = int(self._table.count_rows())
ids_expr = ", ".join(f"'{rid}'" for rid in to_delete)
safe_ids = [self._escape_sql_str(rid) for rid in to_delete]
ids_expr = ", ".join(f"'{rid}'" for rid in safe_ids)
self._do_write("delete", f"id IN ({ids_expr})")
return before - int(self._table.count_rows())
conditions = []
@@ -435,9 +468,13 @@ class LanceDBStorage:
prefix = scope_prefix.rstrip("/")
if not prefix.startswith("/"):
prefix = "/" + prefix
conditions.append(f"scope LIKE '{prefix}%' OR scope = '/'")
like_val = self._escape_like(prefix) + "%"
conditions.append(
f"(scope LIKE '{like_val}' ESCAPE '\\' OR scope = '/')"
)
if older_than is not None:
conditions.append(f"created_at < '{older_than.isoformat()}'")
safe_ts = self._escape_sql_str(older_than.isoformat())
conditions.append(f"created_at < '{safe_ts}'")
if not conditions:
before = int(self._table.count_rows())
self._do_write("delete", "id != ''")
@@ -469,7 +506,8 @@ class LanceDBStorage:
return []
q = self._table.search()
if scope_prefix is not None and scope_prefix.strip("/"):
q = q.where(f"scope LIKE '{scope_prefix.rstrip('/')}%'")
like_val = self._escape_like(scope_prefix.rstrip("/")) + "%"
q = q.where(f"scope LIKE '{like_val}' ESCAPE '\\'")
if columns is not None:
q = q.select(columns)
result: list[dict[str, Any]] = q.limit(limit).to_list()
@@ -595,8 +633,10 @@ class LanceDBStorage:
return
prefix = scope_prefix.rstrip("/")
if prefix:
safe_prefix = self._escape_sql_str(prefix)
self._do_write(
"delete", f"scope >= '{prefix}' AND scope < '{prefix}/\uffff'"
"delete",
f"scope >= '{safe_prefix}' AND scope < '{safe_prefix}/\uffff'",
)
def optimize(self) -> None:

View File
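The two escaping helpers above can be exercised in isolation. This is a rough standalone sketch of why the escaping order and the `ESCAPE '\'` pairing matter; the actual pattern matching is done by LanceDB/DataFusion, not shown here:

```python
from typing import Any

def escape_sql_str(value: Any) -> str:
    # Double single quotes so the value cannot terminate the SQL literal early.
    return str(value).replace("'", "''")

def escape_like(value: Any) -> str:
    # Backslash first, then the LIKE metacharacters, then quotes. Order matters:
    # escaping backslash last would re-escape the backslashes just introduced.
    return (
        str(value)
        .replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
        .replace("'", "''")
    )

# The payload from issue #5728: unescaped, it widened the WHERE clause to
# match every scope. Escaped, it is an inert literal prefix.
payload = "/alpha' OR scope LIKE '/%"
clause = f"scope LIKE '{escape_like(payload)}%' ESCAPE '\\'"
print(clause)
```

Note the two helpers are deliberately distinct: `_escape_sql_str` is enough for equality comparisons (`id = '...'`), but a `LIKE` pattern additionally treats `%` and `_` as wildcards, so prefix matches need the stronger `_escape_like` plus the `ESCAPE` clause telling the SQL engine which character introduces the escapes.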

@@ -53,7 +53,11 @@ from crewai.tasks.task_output import TaskOutput
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.converter import (
Converter,
async_convert_to_model,
convert_to_model,
)
from crewai.utilities.file_store import (
clear_task_files,
get_all_files,
@@ -681,7 +685,7 @@ class Task(BaseModel):
json_output = None
elif not self._guardrails and not self._guardrail:
raw = result
pydantic_output, json_output = self._export_output(result)
pydantic_output, json_output = await self._aexport_output(result)
else:
raw = result
pydantic_output, json_output = None, None
@@ -1110,7 +1114,7 @@ Follow these guidelines:
)
def _export_output(
self, result: str
self, result: str | BaseModel
) -> tuple[BaseModel | None, dict[str, Any] | None]:
pydantic_output: BaseModel | None = None
json_output: dict[str, Any] | None = None
@@ -1123,19 +1127,44 @@ Follow these guidelines:
self.agent,
self.converter_cls,
)
if isinstance(model_output, BaseModel):
pydantic_output = model_output
elif isinstance(model_output, dict):
json_output = model_output
elif isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
pydantic_output, json_output = self._unpack_model_output(model_output)
return pydantic_output, json_output
async def _aexport_output(
self, result: str | BaseModel
) -> tuple[BaseModel | None, dict[str, Any] | None]:
"""Async equivalent of ``_export_output`` — uses ``acall`` so the event loop is not blocked."""
pydantic_output: BaseModel | None = None
json_output: dict[str, Any] | None = None
if self.output_pydantic or self.output_json:
model_output = await async_convert_to_model(
result,
self.output_pydantic,
self.output_json,
self.agent,
self.converter_cls,
)
pydantic_output, json_output = self._unpack_model_output(model_output)
return pydantic_output, json_output
@staticmethod
def _unpack_model_output(
model_output: dict[str, Any] | BaseModel | str,
) -> tuple[BaseModel | None, dict[str, Any] | None]:
if isinstance(model_output, BaseModel):
return model_output, None
if isinstance(model_output, dict):
return None, model_output
if isinstance(model_output, str):
try:
return None, json.loads(model_output)
except json.JSONDecodeError:
return None, None
return None, None
def _get_output_format(self) -> OutputFormat:
if self.output_json:
return OutputFormat.JSON
@@ -1364,7 +1393,7 @@ Follow these guidelines:
if isinstance(guardrail_result.result, str):
task_output.raw = guardrail_result.result
pydantic_output, json_output = self._export_output(
pydantic_output, json_output = await self._aexport_output(
guardrail_result.result
)
task_output.pydantic = pydantic_output
@@ -1421,7 +1450,7 @@ Follow these guidelines:
json_output = None
else:
raw = result
pydantic_output, json_output = self._export_output(result)
pydantic_output, json_output = await self._aexport_output(result)
task_output = TaskOutput(
name=self.name or self.description,

View File

@@ -1,8 +1,9 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable, Sequence
from collections.abc import Callable, Iterator, Sequence
import concurrent.futures
import contextlib
import contextvars
from dataclasses import dataclass, field
from datetime import datetime
@@ -22,7 +23,7 @@ from crewai.agents.parser import (
parse,
)
from crewai.cli.config import Settings
from crewai.llms.base_llm import BaseLLM
from crewai.llms.base_llm import BaseLLM, call_stop_override
from crewai.tools import BaseTool as CrewAITool
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
@@ -238,6 +239,38 @@ def extract_task_section(text: str) -> str:
return text
def _executor_stop_words(
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
) -> list[str]:
"""Return the executor's stop words, regardless of which field name it uses."""
if executor_context is None:
return []
stops = getattr(executor_context, "stop", None)
if stops is None:
stops = getattr(executor_context, "stop_words", None)
return list(stops) if stops else []
@contextlib.contextmanager
def _llm_stop_words_applied(
llm: LLM | BaseLLM,
executor_context: CrewAgentExecutor | AgentExecutor | LiteAgent | None,
) -> Iterator[None]:
"""Apply the executor's stop words to the LLM for the duration of one call.
Uses :func:`crewai.llms.base_llm.call_stop_override` so the LLM's stop
field is never mutated. Safe under concurrent execution: the override is
propagated via a :class:`contextvars.ContextVar` and is scoped to this
call's task / thread context.
"""
extra = _executor_stop_words(executor_context)
if not extra or not isinstance(llm, BaseLLM) or set(extra).issubset(llm.stop):
yield
return
with call_stop_override(llm, list(set(llm.stop + extra))):
yield
def has_reached_max_iterations(iterations: int, max_iterations: int) -> bool:
"""Check if the maximum number of iterations has been reached.
@@ -459,18 +492,15 @@ def get_llm_response(
"""
messages = _prepare_llm_call(executor_context, messages, printer, verbose=verbose)
try:
answer = llm.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
raise e
answer = llm.call(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return _validate_and_finalize_llm_response(
answer, executor_context, printer, verbose=verbose
@@ -515,18 +545,15 @@ async def aget_llm_response(
"""
messages = _prepare_llm_call(executor_context, messages, printer, verbose=verbose)
try:
answer = await llm.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
except Exception as e:
raise e
answer = await llm.acall(
messages,
tools=tools,
callbacks=callbacks,
available_functions=available_functions,
from_task=from_task,
from_agent=from_agent,
response_model=response_model,
)
return _validate_and_finalize_llm_response(
answer, executor_context, printer, verbose=verbose
@@ -1565,11 +1592,12 @@ def execute_single_native_tool_call(
color="green",
)
# Check result_as_answer
is_result_as_answer = bool(
original_tool
and hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer
and not error_event_emitted
and not hook_blocked
)
return NativeToolCallResult(

View File
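The `call_stop_override` mechanism imported above scopes a per-call stop list without ever mutating the shared LLM's `stop` field. A minimal reimplementation of the idea with a `ContextVar`, purely to illustrate the mechanism; names and the `id()`-keyed map are simplifications, not the actual CrewAI API:

```python
import contextlib
from collections.abc import Iterator
from contextvars import ContextVar

# Maps id(llm) -> overridden stop list, scoped to the current task/thread context.
_stop_override: ContextVar[dict[int, list[str]]] = ContextVar("stop_override", default={})

class TinyLLM:
    def __init__(self, stop: list[str]) -> None:
        self.stop = stop  # persistent, shared field: never mutated

    @property
    def stop_sequences(self) -> list[str]:
        # A per-call override wins; otherwise fall back to the persistent field.
        return _stop_override.get().get(id(self), self.stop)

@contextlib.contextmanager
def call_stop_override(llm: TinyLLM, stops: list[str]) -> Iterator[None]:
    current = dict(_stop_override.get())  # copy-on-write keeps siblings isolated
    current[id(llm)] = stops
    token = _stop_override.set(current)
    try:
        yield
    finally:
        _stop_override.reset(token)  # cleared even if the call raises

shared = TinyLLM(stop=["Original:"])
other = TinyLLM(stop=["Other:"])
with call_stop_override(shared, ["Original:", "Observation:"]):
    assert shared.stop_sequences == ["Original:", "Observation:"]
    assert other.stop_sequences == ["Other:"]  # no leak to other instances
assert shared.stop_sequences == ["Original:"]  # nothing persisted
```

Because `ContextVar` state is copied per asyncio task and per thread context, two executors sharing one LLM each see only their own merged stop list, which is exactly what the concurrency regression tests below pin down.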

@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
import json
import re
from typing import TYPE_CHECKING, Any, Final, TypedDict
@@ -41,6 +42,45 @@ class ConverterError(Exception):
class Converter(OutputConverter):
"""Class that converts text into either pydantic or json."""
def _build_messages(self) -> list[dict[str, str]]:
return [
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
def _coerce_response_to_pydantic(self, response: Any) -> BaseModel:
"""Validate an LLM response into the configured Pydantic model.
Pure post-processing — performs no I/O. Shared by ``to_pydantic`` and
``ato_pydantic`` so the validation/partial-JSON fallback logic stays in
a single place.
"""
if isinstance(response, BaseModel):
return response
try:
return self.model.model_validate_json(response)
except ValidationError:
partial = handle_partial_json(
result=response,
model=self.model,
is_json_output=False,
agent=None,
)
if isinstance(partial, BaseModel):
return partial
if isinstance(partial, dict):
return self.model.model_validate(partial)
if isinstance(partial, str):
try:
return self.model.model_validate_json(partial)
except Exception as parse_err:
raise ConverterError(
f"Failed to convert partial JSON result into Pydantic: {parse_err}"
) from parse_err
raise ConverterError(
"handle_partial_json returned an unexpected type."
) from None
def to_pydantic(self, current_attempt: int = 1) -> BaseModel:
"""Convert text to pydantic.
@@ -56,50 +96,12 @@ class Converter(OutputConverter):
try:
if self.llm.supports_function_calling():
response = self.llm.call(
messages=[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
],
messages=self._build_messages(),
response_model=self.model,
)
if isinstance(response, BaseModel):
result = response
else:
result = self.model.model_validate_json(response)
else:
response = self.llm.call(
[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
)
try:
# Try to directly validate the response JSON
result = self.model.model_validate_json(response)
except ValidationError:
# If direct validation fails, attempt to extract valid JSON
result = handle_partial_json( # type: ignore[assignment]
result=response,
model=self.model,
is_json_output=False,
agent=None,
)
# Ensure result is a BaseModel instance
if not isinstance(result, BaseModel):
if isinstance(result, dict):
result = self.model.model_validate(result)
elif isinstance(result, str):
try:
result = self.model.model_validate_json(result)
except Exception as parse_err:
raise ConverterError(
f"Failed to convert partial JSON result into Pydantic: {parse_err}"
) from parse_err
else:
raise ConverterError(
"handle_partial_json returned an unexpected type."
) from None
return result
response = self.llm.call(self._build_messages())
return self._coerce_response_to_pydantic(response)
except ValidationError as e:
if current_attempt < self.max_attempts:
return self.to_pydantic(current_attempt + 1)
@@ -113,6 +115,30 @@ class Converter(OutputConverter):
f"Failed to convert text into a Pydantic model due to error: {e}"
) from e
async def ato_pydantic(self, current_attempt: int = 1) -> BaseModel:
"""Async equivalent of ``to_pydantic`` — uses ``acall`` so the event loop is not blocked."""
try:
if self.llm.supports_function_calling():
response = await self.llm.acall(
messages=self._build_messages(),
response_model=self.model,
)
else:
response = await self.llm.acall(self._build_messages())
return self._coerce_response_to_pydantic(response)
except ValidationError as e:
if current_attempt < self.max_attempts:
return await self.ato_pydantic(current_attempt + 1)
raise ConverterError(
f"Failed to convert text into a Pydantic model due to validation error: {e}"
) from e
except Exception as e:
if current_attempt < self.max_attempts:
return await self.ato_pydantic(current_attempt + 1)
raise ConverterError(
f"Failed to convert text into a Pydantic model due to error: {e}"
) from e
def to_json(self, current_attempt: int = 1) -> str | ConverterError | Any: # type: ignore[override]
"""Convert text to json.
@@ -129,19 +155,28 @@ class Converter(OutputConverter):
try:
if self.llm.supports_function_calling():
return self._create_instructor().to_json()
return json.dumps(
self.llm.call(
[
{"role": "system", "content": self.instructions},
{"role": "user", "content": self.text},
]
)
)
return json.dumps(self.llm.call(self._build_messages()))
except Exception as e:
if current_attempt < self.max_attempts:
return self.to_json(current_attempt + 1)
return ConverterError(f"Failed to convert text into JSON, error: {e}.")
async def ato_json(self, current_attempt: int = 1) -> str | ConverterError | Any:
"""Async equivalent of ``to_json``.
The function-calling path delegates to ``InternalInstructor`` (currently
sync-only); we run it via ``asyncio.to_thread`` so the event loop stays
free.
"""
try:
if self.llm.supports_function_calling():
return await asyncio.to_thread(self._create_instructor().to_json)
return json.dumps(await self.llm.acall(self._build_messages()))
except Exception as e:
if current_attempt < self.max_attempts:
return await self.ato_json(current_attempt + 1)
return ConverterError(f"Failed to convert text into JSON, error: {e}.")
def _create_instructor(self) -> InternalInstructor[Any]:
"""Create an instructor."""
@@ -153,16 +188,18 @@ class Converter(OutputConverter):
def convert_to_model(
result: str,
result: str | BaseModel,
output_pydantic: type[BaseModel] | None,
output_json: type[BaseModel] | None,
agent: Agent | BaseAgent | None = None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Convert a result string to a Pydantic model or JSON.
"""Convert a result to a Pydantic model or JSON.
Args:
result: The result string to convert.
result: The result to convert. Usually a JSON string, but a Pydantic
instance is also accepted when an upstream caller already produced
a structured object.
output_pydantic: The Pydantic model class to convert to.
output_json: The Pydantic model class to convert to JSON.
agent: The agent instance.
@@ -175,6 +212,11 @@ def convert_to_model(
if model is None:
return result
if isinstance(result, BaseModel):
if isinstance(result, model):
return result.model_dump() if output_json else result
result = result.model_dump_json()
if converter_cls:
return convert_with_instructions(
result=result,
@@ -347,6 +389,144 @@ def convert_with_instructions(
return exported_result
async def async_convert_to_model(
result: str | BaseModel,
output_pydantic: type[BaseModel] | None,
output_json: type[BaseModel] | None,
agent: Agent | BaseAgent | None = None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Async equivalent of ``convert_to_model`` — uses native ``acall``.
Mirrors the dispatch semantics of the sync version exactly; the only
difference is that LLM-bearing branches are awaited.
"""
model = output_pydantic or output_json
if model is None:
return result
if isinstance(result, BaseModel):
if isinstance(result, model):
return result.model_dump() if output_json else result
result = result.model_dump_json()
if converter_cls:
return await async_convert_with_instructions(
result=result,
model=model,
is_json_output=bool(output_json),
agent=agent,
converter_cls=converter_cls,
)
try:
escaped_result = json.dumps(json.loads(result, strict=False))
return validate_model(
result=escaped_result, model=model, is_json_output=bool(output_json)
)
except (json.JSONDecodeError, ValidationError):
return await async_handle_partial_json(
result=result,
model=model,
is_json_output=bool(output_json),
agent=agent,
converter_cls=converter_cls,
)
except Exception as e:
if agent and getattr(agent, "verbose", True):
PRINTER.print(
content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.",
color="red",
)
return result
async def async_handle_partial_json(
result: str,
model: type[BaseModel],
is_json_output: bool,
agent: Agent | BaseAgent | None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Async equivalent of ``handle_partial_json`` — defers LLM fallback to ``acall``."""
match = _JSON_PATTERN.search(result)
if match:
try:
parsed = json.loads(match.group(), strict=False)
except json.JSONDecodeError:
return await async_convert_with_instructions(
result=result,
model=model,
is_json_output=is_json_output,
agent=agent,
converter_cls=converter_cls,
)
try:
exported_result = model.model_validate(parsed)
if is_json_output:
return exported_result.model_dump()
return exported_result
except ValidationError:
raise
except Exception as e:
if agent and getattr(agent, "verbose", True):
PRINTER.print(
content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.",
color="red",
)
return await async_convert_with_instructions(
result=result,
model=model,
is_json_output=is_json_output,
agent=agent,
converter_cls=converter_cls,
)
async def async_convert_with_instructions(
result: str,
model: type[BaseModel],
is_json_output: bool,
agent: Agent | BaseAgent | None,
converter_cls: type[Converter] | None = None,
) -> dict[str, Any] | BaseModel | str:
"""Async equivalent of ``convert_with_instructions`` — calls ``ato_pydantic``/``ato_json``."""
if agent is None:
raise TypeError("Agent must be provided if converter_cls is not specified.")
llm = getattr(agent, "function_calling_llm", None) or agent.llm
if llm is None:
raise ValueError("Agent must have a valid LLM instance for conversion")
instructions = get_conversion_instructions(model=model, llm=llm)
converter = create_converter(
agent=agent,
converter_cls=converter_cls,
llm=llm,
text=result,
model=model,
instructions=instructions,
)
exported_result = (
await converter.ato_pydantic()
if not is_json_output
else await converter.ato_json()
)
if isinstance(exported_result, ConverterError):
if agent and getattr(agent, "verbose", True):
PRINTER.print(
content=f"Failed to convert result to model: {exported_result}",
color="red",
)
return result
return exported_result
def get_conversion_instructions(
model: type[BaseModel], llm: BaseLLM | LLM | str | Any
) -> str:

View File

@@ -2452,3 +2452,167 @@ def test_agent_mcps_accepts_legacy_prefix_with_tool():
mcps=["crewai-amp:notion#get_page"],
)
assert agent.mcps == ["crewai-amp:notion#get_page"]
class TestSharedLLMStopWords:
"""Regression tests for shared LLM stop words mutation (issue #5141).
Stop words from one executor must not leak into the shared LLM permanently
or pollute other agents sharing that LLM.
"""
@staticmethod
def _make_executor(llm: LLM, stop_words: list[str]) -> CrewAgentExecutor:
"""Build a CrewAgentExecutor with minimal deps."""
from crewai.agents.tools_handler import ToolsHandler
agent = Agent(role="r", goal="g", backstory="b")
task = Task(description="d", expected_output="o", agent=agent)
return CrewAgentExecutor(
agent=agent,
task=task,
llm=llm,
crew=None,
prompt={"prompt": "p {input} {tool_names} {tools}"},
max_iter=5,
tools=[],
tools_names="",
stop_words=stop_words,
tools_description="",
tools_handler=ToolsHandler(),
)
def test_executor_init_does_not_mutate_shared_llm(self) -> None:
"""Constructing executors must not touch the shared LLM's stop list."""
shared = LLM(model="gpt-4", stop=["Original:"])
original = list(shared.stop)
a = self._make_executor(shared, stop_words=["StopA:"])
b = self._make_executor(shared, stop_words=["StopB:"])
assert shared.stop == original
assert a.llm is shared
assert b.llm is shared
def test_effective_stop_reflects_override_inside_context(self) -> None:
"""Inside the helper, the effective stop list includes the executor's words."""
from crewai.utilities.agent_utils import _llm_stop_words_applied
shared = LLM(model="gpt-4", stop=["Original:"])
executor = self._make_executor(shared, stop_words=["Observation:"])
with _llm_stop_words_applied(shared, executor):
assert set(shared.stop_sequences) == {"Original:", "Observation:"}
assert shared.stop == ["Original:"]
assert shared.stop == ["Original:"]
assert shared.stop_sequences == ["Original:"]
def test_override_cleared_when_context_raises(self) -> None:
"""A failed call must still clear the per-call stop override."""
from crewai.utilities.agent_utils import _llm_stop_words_applied
shared = LLM(model="gpt-4", stop=["Original:"])
executor = self._make_executor(shared, stop_words=["Observation:"])
try:
with _llm_stop_words_applied(shared, executor):
raise RuntimeError("boom")
except RuntimeError:
pass
assert shared.stop == ["Original:"]
assert shared.stop_sequences == ["Original:"]
def test_override_applies_for_post_processing_when_api_lacks_stop_support(
self,
) -> None:
"""Models that lack API-level stop support still need the override.
Native providers (e.g. Azure on gpt-5/o-series) read ``stop_sequences``
in ``_apply_stop_words`` to truncate the response post-hoc even when
``supports_stop_words()`` returns False, so the override must be set
regardless of API-level support. (Issue raised by Cursor Bugbot.)
"""
from unittest.mock import patch
from crewai.utilities.agent_utils import _llm_stop_words_applied
shared = LLM(model="gpt-4", stop=["Original:"])
executor = self._make_executor(shared, stop_words=["Observation:"])
with patch.object(shared, "supports_stop_words", return_value=False):
with _llm_stop_words_applied(shared, executor):
assert set(shared.stop_sequences) == {"Original:", "Observation:"}
assert shared.stop == ["Original:"]
assert shared.stop_sequences == ["Original:"]
def test_concurrent_overrides_do_not_collide(self) -> None:
"""Concurrent agents on a shared LLM must each see their own effective stop."""
import asyncio
from crewai.utilities.agent_utils import _llm_stop_words_applied
shared = LLM(model="gpt-4", stop=["Original:"])
exec_a = self._make_executor(shared, stop_words=["StopA:"])
exec_b = self._make_executor(shared, stop_words=["StopB:"])
async def run(executor: CrewAgentExecutor, expected: str) -> set[str]:
with _llm_stop_words_applied(shared, executor):
await asyncio.sleep(0)
seen = set(shared.stop_sequences)
assert expected in seen
return seen
async def main() -> tuple[set[str], set[str]]:
return await asyncio.gather(
run(exec_a, "StopA:"), run(exec_b, "StopB:")
)
a_seen, b_seen = asyncio.run(main())
assert a_seen == {"Original:", "StopA:"}
assert b_seen == {"Original:", "StopB:"}
assert shared.stop == ["Original:"]
assert shared.stop_sequences == ["Original:"]
def test_override_does_not_leak_to_other_llm_instances(self) -> None:
"""Override for one LLM must not affect another LLM (e.g. function_calling_llm).
Regression for Cursor Bugbot: a global ContextVar would leak the
override to every BaseLLM that reads stop_sequences during the scope.
"""
from crewai.utilities.agent_utils import _llm_stop_words_applied
target = LLM(model="gpt-4", stop=["TargetStop:"])
other = LLM(model="gpt-4", stop=["OtherStop:"])
executor = self._make_executor(target, stop_words=["Observation:"])
with _llm_stop_words_applied(target, executor):
assert set(target.stop_sequences) == {"TargetStop:", "Observation:"}
assert other.stop_sequences == ["OtherStop:"]
assert target.stop_sequences == ["TargetStop:"]
assert other.stop_sequences == ["OtherStop:"]
def test_override_propagates_to_nested_direct_llm_calls(self) -> None:
"""Once invoke wraps with the override, nested direct llm.call sites
(StepExecutor, handle_max_iterations_exceeded) see the merged stops.
Regression for Cursor Bugbot: those direct call sites bypass
get_llm_response, so the override must be set at executor entry, not
only around get_llm_response.
"""
from crewai.utilities.agent_utils import _llm_stop_words_applied
shared = LLM(model="gpt-4", stop=["Original:"])
executor = self._make_executor(shared, stop_words=["Observation:"])
seen: list[set[str]] = []
def nested_direct_call() -> None:
seen.append(set(shared.stop_sequences))
with _llm_stop_words_applied(shared, executor):
nested_direct_call()
assert seen == [{"Original:", "Observation:"}]
assert shared.stop == ["Original:"]

View File

@@ -596,6 +596,35 @@ def test_gemini_token_usage_tracking():
assert usage.total_tokens > 0
def test_gemini_thoughts_tokens_counted_in_completion_and_total():
"""Gemini's thoughts_token_count must be folded into completion_tokens so the
tracked total matches the API's total_token_count for thinking models."""
from crewai.llms.providers.gemini.completion import GeminiCompletion
llm = GeminiCompletion(model="gemini-2.0-flash-001")
response = MagicMock()
response.usage_metadata = MagicMock(
prompt_token_count=100,
candidates_token_count=50,
thoughts_token_count=25,
total_token_count=175,
cached_content_token_count=0,
)
usage = llm._extract_token_usage(response)
assert usage["candidates_token_count"] == 50
assert usage["completion_tokens"] == 75
assert usage["reasoning_tokens"] == 25
llm._track_token_usage_internal(usage)
summary = llm.get_token_usage_summary()
assert summary.prompt_tokens == 100
assert summary.completion_tokens == 75
assert summary.total_tokens == 175
assert summary.reasoning_tokens == 25
@pytest.mark.vcr()
def test_gemini_tool_returning_float():
"""

View File

@@ -0,0 +1,271 @@
"""Regression tests for SQL-injection hardening of ``LanceDBStorage``.
Issue: GH #5728
LanceDB's ``where()`` accepts a raw Apache DataFusion SQL expression and does
not support parameterized queries. Earlier versions of ``LanceDBStorage``
interpolated caller-supplied scope paths and record IDs directly into the
WHERE clause via f-strings, which allowed:
* an unprivileged caller to escape the configured ``scope`` sandbox and
read / delete records belonging to any other scope, and
* legitimate strings containing single quotes (e.g. ``"O'Brien"``) to crash
with a SQL parse error.
These tests pin the hardened behaviour so the bug can never silently
regress.
"""
from __future__ import annotations
from pathlib import Path
import pytest
from crewai.memory.storage.lancedb_storage import LanceDBStorage
from crewai.memory.types import MemoryRecord
@pytest.fixture
def storage(tmp_path: Path) -> LanceDBStorage:
return LanceDBStorage(path=str(tmp_path / "mem"), vector_dim=4)
def _seed(storage: LanceDBStorage) -> None:
storage.save(
[
MemoryRecord(
id="alpha-1",
content="alpha",
scope="/alpha",
embedding=[0.1, 0.2, 0.3, 0.4],
),
MemoryRecord(
id="bravo-1",
content="bravo",
scope="/bravo",
embedding=[0.5, 0.6, 0.7, 0.8],
),
]
)
# ----------------------------------------------------------------------
# Helper unit tests
# ----------------------------------------------------------------------
def test_escape_sql_str_doubles_single_quotes() -> None:
assert LanceDBStorage._escape_sql_str("O'Brien") == "O''Brien"
assert LanceDBStorage._escape_sql_str("a'; DROP TABLE t;--") == "a''; DROP TABLE t;--"
# Non-string inputs are coerced.
assert LanceDBStorage._escape_sql_str(42) == "42"
def test_escape_like_escapes_metacharacters() -> None:
# Backslash is escaped first so subsequent escapes don't double-escape.
assert LanceDBStorage._escape_like("a\\b") == "a\\\\b"
assert LanceDBStorage._escape_like("a%b") == "a\\%b"
assert LanceDBStorage._escape_like("a_b") == "a\\_b"
assert LanceDBStorage._escape_like("O'Brien") == "O''Brien"
# All metacharacters together.
assert (
LanceDBStorage._escape_like("100%_off'\\")
== "100\\%\\_off''\\\\"
)
# ----------------------------------------------------------------------
# Sink 1: search() must not leak across scopes via injected scope_prefix
# ----------------------------------------------------------------------
def test_search_scope_prefix_injection_returns_no_match(
storage: LanceDBStorage,
) -> None:
_seed(storage)
# Classic ``' OR '1'='1`` style payload aimed at widening the LIKE.
payload = "/alpha' OR '1'='1"
results = storage.search([0.1, 0.2, 0.3, 0.4], scope_prefix=payload, limit=10)
assert results == []
def test_search_scope_prefix_with_apostrophe_does_not_crash(
storage: LanceDBStorage,
) -> None:
storage.save(
[
MemoryRecord(
id="x-1",
content="x",
scope="/O'Brien",
embedding=[0.1, 0.2, 0.3, 0.4],
)
]
)
# Must round-trip a legitimate scope containing an apostrophe.
results = storage.search(
[0.1, 0.2, 0.3, 0.4], scope_prefix="/O'Brien", limit=10
)
assert len(results) == 1
assert results[0][0].scope == "/O'Brien"
def test_search_scope_prefix_percent_is_literal_not_wildcard(
storage: LanceDBStorage,
) -> None:
"""A ``%`` in the user-supplied prefix must be treated as a literal,
not as a SQL ``LIKE`` wildcard that would match unrelated scopes."""
_seed(storage)
# ``/%`` would, without escaping, match every scope that starts with ``/``.
results = storage.search([0.1, 0.2, 0.3, 0.4], scope_prefix="/%", limit=10)
assert results == []
# ----------------------------------------------------------------------
# Sink 2: delete(scope_prefix=...) must not let an attacker wipe other scopes
# ----------------------------------------------------------------------
def test_delete_scope_prefix_injection_does_not_bypass_isolation(
storage: LanceDBStorage,
) -> None:
"""The most damaging payload from issue #5728: a malicious scope_prefix
that, before the fix, deleted every record in the table by appending
``OR scope LIKE '/%`` to the WHERE clause."""
_seed(storage)
assert storage.count() == 2
# Pre-fix, this WHERE evaluated to:
# scope LIKE '/alpha' OR scope LIKE '/%' OR scope = '/'
# which deletes /alpha AND /bravo. Post-fix the entire payload is
# treated as a literal prefix and matches nothing.
payload = "/alpha' OR scope LIKE '/%"
n = storage.delete(scope_prefix=payload)
assert n == 0
assert storage.count() == 2
# And the legitimate scoped delete must still work.
n = storage.delete(scope_prefix="/alpha")
assert n == 1
assert storage.count() == 1
remaining = storage.list_records()
assert [r.scope for r in remaining] == ["/bravo"]
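To make the pre-fix/post-fix comment above concrete, here is a hypothetical reconstruction of the filter strings (the exact clause shape used by LanceDBStorage is an assumption inferred from the comments; only the escaping steps are taken from the tests):

```python
payload = "/alpha' OR scope LIKE '/%"

# Vulnerable: raw interpolation closes the string literal early, so the
# attacker's OR branch becomes live SQL and matches every scope.
vulnerable = f"scope LIKE '{payload}%'"

# Fixed: quotes are doubled and LIKE wildcards neutralized; paired with
# ESCAPE '\' the whole payload is a literal prefix that matches nothing.
escaped = (
    payload.replace("\\", "\\\\")
    .replace("%", "\\%")
    .replace("_", "\\_")
    .replace("'", "''")
)
fixed = f"scope LIKE '{escaped}%' ESCAPE '\\'"
```

In the fixed clause the payload's embedded quote survives only as `''` inside the literal, so no second `LIKE` branch ever reaches the SQL parser.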
# ----------------------------------------------------------------------
# Sink 3: delete(record_ids=[...]) must escape IDs
# ----------------------------------------------------------------------
def test_delete_record_ids_injection_does_not_match_real_rows(
storage: LanceDBStorage,
) -> None:
_seed(storage)
# An attacker-controlled "id" containing a quote used to either
# crash the SQL tokenizer or, worse, evaluate a tautology.
n = storage.delete(record_ids=["' OR '1'='1"])
assert n == 0
assert storage.count() == 2

def test_delete_record_ids_with_apostrophe_round_trips(
storage: LanceDBStorage,
) -> None:
storage.save(
[
MemoryRecord(
id="O'Reilly-42",
content="ok",
scope="/team",
embedding=[0.0] * 4,
)
]
)
assert storage.count() == 1
n = storage.delete(record_ids=["O'Reilly-42"])
assert n == 1
assert storage.count() == 0
# ----------------------------------------------------------------------
# Sink 4: reset(scope_prefix=...) must not crash on apostrophes
# ----------------------------------------------------------------------
def test_reset_scope_prefix_with_apostrophe_does_not_crash(
storage: LanceDBStorage,
) -> None:
storage.save(
[
MemoryRecord(
id="r-1",
content="x",
scope="/O'Brien/team",
embedding=[0.0] * 4,
),
MemoryRecord(
id="r-2",
content="y",
scope="/other",
embedding=[0.0] * 4,
),
]
)
# Must not raise and must scope the reset correctly.
storage.reset(scope_prefix="/O'Brien")
remaining = storage.list_records()
assert [r.scope for r in remaining] == ["/other"]

def test_reset_scope_prefix_injection_does_not_drop_unrelated_scopes(
storage: LanceDBStorage,
) -> None:
_seed(storage)
# ``' OR scope >= ''`` would, without escaping, broaden the range
# comparison to every row.
storage.reset(scope_prefix="/alpha' OR scope >= '")
assert storage.count() == 2 # nothing should have been deleted
# ----------------------------------------------------------------------
# Scan-based readers: list_records / list_scopes / get_scope_info /
# list_categories / count all flow through ``_scan_rows`` and used to
# crash on apostrophes and leak across scopes via ``%``/``_`` wildcards.
# ----------------------------------------------------------------------
def test_scan_methods_with_apostrophe_in_scope(
storage: LanceDBStorage,
) -> None:
storage.save(
[
MemoryRecord(
id="s-1",
content="x",
scope="/O'Brien",
categories=["c1"],
embedding=[0.0] * 4,
)
]
)
assert [r.id for r in storage.list_records(scope_prefix="/O'Brien")] == ["s-1"]
info = storage.get_scope_info("/O'Brien")
assert info.record_count == 1
assert info.path == "/O'Brien"
assert storage.list_scopes("/O'Brien") == []
assert storage.list_categories(scope_prefix="/O'Brien") == {"c1": 1}
assert storage.count(scope_prefix="/O'Brien") == 1

def test_scan_methods_treat_percent_as_literal(
storage: LanceDBStorage,
) -> None:
_seed(storage)
# ``/%`` should NOT match every scope rooted at ``/``; it should match
# only literal ``/<percent>...`` prefixes (of which there are none).
assert storage.list_records(scope_prefix="/%") == []
assert storage.count(scope_prefix="/%") == 0
assert storage.list_categories(scope_prefix="/%") == {}


@@ -1,12 +1,14 @@
"""Tests for async task execution."""

import pytest
from pydantic import BaseModel
from unittest.mock import AsyncMock, MagicMock, patch
from crewai.agent import Agent
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.tasks.output_format import OutputFormat
from crewai.utilities.converter import Converter

@pytest.fixture
@@ -383,4 +385,73 @@ class TestAsyncTaskOutput:
assert result.description == "Test description"
assert result.expected_output == "Test expected"
assert result.raw == "Test result"
assert result.agent == "Test Agent"

class _AsyncOnlyOutput(BaseModel):
value: str

class TestAsyncOutputConversion:
"""Regression tests for native-async output conversion (issue #5230).
Ensures `_aexport_output` reaches the LLM via `acall` and never via the
blocking `call` method.
"""

    @pytest.mark.asyncio
async def test_aexport_output_uses_acall_not_call(self) -> None:
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = False
mock_llm.acall = AsyncMock(return_value='{"value": "ok"}')
mock_llm.call = MagicMock(
side_effect=AssertionError("call() must NOT be invoked from async path")
)
converter = Converter(
llm=mock_llm,
model=_AsyncOnlyOutput,
text="raw",
instructions="convert",
max_attempts=1,
)
result = await converter.ato_pydantic()
assert isinstance(result, _AsyncOnlyOutput)
assert result.value == "ok"
mock_llm.acall.assert_awaited_once()
mock_llm.call.assert_not_called()

    @pytest.mark.asyncio
async def test_ato_json_function_calling_does_not_block_event_loop(self) -> None:
"""The function-calling JSON path must run via asyncio.to_thread.
``InternalInstructor`` is sync-only; `ato_json` should offload it so the
event loop is not blocked.
"""
mock_llm = MagicMock()
mock_llm.supports_function_calling.return_value = True
converter = Converter(
llm=mock_llm,
model=_AsyncOnlyOutput,
text="raw",
instructions="convert",
max_attempts=1,
)
sentinel = '{"value": "ok"}'
with patch.object(
converter, "_create_instructor"
) as mock_create, patch(
"crewai.utilities.converter.asyncio.to_thread", new_callable=AsyncMock
) as mock_to_thread:
instructor = MagicMock()
instructor.to_json = MagicMock(return_value=sentinel)
mock_create.return_value = instructor
mock_to_thread.return_value = sentinel
result = await converter.ato_json()
assert result == sentinel
mock_to_thread.assert_awaited_once_with(instructor.to_json)
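The offloading pattern the second test asserts can be sketched independently of crewAI: a sync-only callable is wrapped in `asyncio.to_thread` so awaiting it never blocks the event loop (the function names here are illustrative stand-ins, not the real converter API):

```python
import asyncio


def sync_only_to_json(text: str) -> str:
    # Stand-in for a blocking, sync-only converter like InternalInstructor.
    return text.upper()


async def ato_json_like(text: str) -> str:
    # Offload the blocking call to a worker thread; the event loop stays
    # free to run other coroutines while the sync work executes.
    return await asyncio.to_thread(sync_only_to_json, text)


result = asyncio.run(ato_json_like('{"value": "ok"}'))
```

Calling `sync_only_to_json` directly inside the coroutine would produce the same value but stall every other task on the loop for the duration of the call.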


@@ -1254,6 +1254,119 @@ async def test_async_task_execution_call_count(researcher, writer):
assert mock_execute_sync.call_count == 1

def test_mixed_sync_async_task_outputs_not_dropped(researcher, writer):
"""Sync outputs accumulated before a pending async batch must survive the flush."""
sync1_output = TaskOutput(description="sync1", raw="s1", agent="researcher")
async1_output = TaskOutput(description="async1", raw="a1", agent="researcher")
sync2_output = TaskOutput(description="sync2", raw="s2", agent="writer")
sync1 = Task(description="sync1", expected_output="x", agent=researcher)
async1 = Task(
description="async1",
expected_output="x",
agent=researcher,
async_execution=True,
)
sync2 = Task(description="sync2", expected_output="x", agent=writer)
sync1.output = sync1_output
async1.output = async1_output
sync2.output = sync2_output
crew = Crew(agents=[researcher, writer], tasks=[sync1, async1, sync2])
mock_future = MagicMock(spec=Future)
mock_future.result.return_value = async1_output
with (
patch.object(
Task, "execute_sync", side_effect=[sync1_output, sync2_output]
),
patch.object(Task, "execute_async", return_value=mock_future),
):
result = crew.kickoff()
assert [o.raw for o in result.tasks_output] == ["s1", "a1", "s2"]

@pytest.mark.asyncio
async def test_mixed_sync_async_task_outputs_not_dropped_native_async(
researcher, writer
):
"""Same regression as the sync path, exercised via akickoff (native async)."""
sync1_output = TaskOutput(description="sync1", raw="s1", agent="researcher")
async1_output = TaskOutput(description="async1", raw="a1", agent="researcher")
sync2_output = TaskOutput(description="sync2", raw="s2", agent="writer")
sync1 = Task(description="sync1", expected_output="x", agent=researcher)
async1 = Task(
description="async1",
expected_output="x",
agent=researcher,
async_execution=True,
)
sync2 = Task(description="sync2", expected_output="x", agent=writer)
sync1.output = sync1_output
async1.output = async1_output
sync2.output = sync2_output
crew = Crew(agents=[researcher, writer], tasks=[sync1, async1, sync2])
aexecute_outputs = iter([sync1_output, async1_output, sync2_output])
async def fake_aexecute_sync(*_args: Any, **_kwargs: Any) -> TaskOutput:
return next(aexecute_outputs)
with patch.object(Task, "aexecute_sync", side_effect=fake_aexecute_sync):
result = await crew.akickoff()
assert [o.raw for o in result.tasks_output] == ["s1", "a1", "s2"]

def test_pending_async_outputs_preserved_through_conditional_task(researcher, writer):
"""A conditional task encountered after a pending async batch must not silently drop the async output."""
sync1_output = TaskOutput(description="sync1", raw="s1", agent="researcher")
async1_output = TaskOutput(description="async1", raw="a1", agent="researcher")
def always_skip(_: TaskOutput) -> bool:
return False
sync1 = Task(description="sync1", expected_output="x", agent=researcher)
async1 = Task(
description="async1",
expected_output="x",
agent=researcher,
async_execution=True,
)
conditional = ConditionalTask(
description="conditional",
expected_output="x",
agent=writer,
condition=always_skip,
)
sync1.output = sync1_output
async1.output = async1_output
crew = Crew(
agents=[researcher, writer], tasks=[sync1, async1, conditional]
)
mock_future = MagicMock(spec=Future)
mock_future.result.return_value = async1_output
with (
patch.object(Task, "execute_sync", return_value=sync1_output),
patch.object(Task, "execute_async", return_value=mock_future),
):
result = crew.kickoff()
raws = [o.raw for o in result.tasks_output]
assert raws[:2] == ["s1", "a1"]
assert len(result.tasks_output) == 3
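The ordering invariant all three regressions assert reduces to a few lines: any pending async batch must be flushed before the next sync output is appended, so earlier results are neither dropped nor reordered. A sketch of that bookkeeping (not crewAI's actual implementation, which interleaves this with agent execution):

```python
from concurrent.futures import Future


def collect_outputs(tasks: list[tuple[str, str]]) -> list[str]:
    outputs: list[str] = []
    pending: list[Future] = []
    for kind, value in tasks:
        if kind == "async":
            fut: Future = Future()
            fut.set_result(value)  # stand-in for execute_async
            pending.append(fut)
        else:
            # Flush the pending async batch BEFORE appending the sync
            # output, so results stay in declared task order.
            outputs.extend(f.result() for f in pending)
            pending.clear()
            outputs.append(value)
    outputs.extend(f.result() for f in pending)
    return outputs
```

The pre-fix bug the tests guard against amounts to replacing `outputs` with the flushed batch instead of extending it.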

@pytest.mark.vcr()
def test_kickoff_for_each_single_input():
"""Tests if kickoff_for_each works with a single input."""


@@ -690,6 +690,27 @@ def test_multiple_guardrails_with_pydantic_output():
assert parsed["processed"] is True

def test_export_output_accepts_pydantic_input():
"""Regression test for #5458: _export_output must not crash with TypeError
when called with a Pydantic instance (e.g. when an upstream caller passes
an already-converted model from a context task)."""
from pydantic import BaseModel
class StructuredResult(BaseModel):
value: str
task = create_smart_task(
description="Test pydantic export",
expected_output="Structured output",
output_pydantic=StructuredResult,
)
instance = StructuredResult(value="ok")
pydantic_output, json_output = task._export_output(instance)
assert pydantic_output is instance
assert json_output is None

def test_guardrails_vs_single_guardrail_mutual_exclusion():
"""Test that guardrails list nullifies single guardrail."""


@@ -17,6 +17,8 @@ from crewai.utilities.agent_utils import (
_format_messages_for_summary,
_split_messages_into_chunks,
convert_tools_to_openai_schema,
execute_single_native_tool_call,
NativeToolCallResult,
parse_tool_call_args,
summarize_messages,
)
@@ -1033,3 +1035,91 @@ class TestParseToolCallArgs:
_, error = parse_tool_call_args("{bad json}", "tool", "call_7")
assert error is not None
assert set(error.keys()) == {"call_id", "func_name", "result", "from_cache", "original_tool"}

class TestExecuteSingleNativeToolCall:
"""Tests for execute_single_native_tool_call."""

    def test_result_as_answer_false_on_tool_error(self) -> None:
"""When a tool with result_as_answer=True raises, result_as_answer must be False.
Regression test for https://github.com/crewAIInc/crewAI/issues/5156
"""
from unittest.mock import MagicMock
class FailingTool(BaseTool):
name: str = "failing_tool"
description: str = "A tool that always fails"
result_as_answer: bool = True
def _run(self, **kwargs: Any) -> str:
raise RuntimeError("intentional failure")
tool = FailingTool()
tool_call = MagicMock()
tool_call.id = "call_1"
tool_call.function.name = "failing_tool"
tool_call.function.arguments = "{}"
result = execute_single_native_tool_call(
tool_call,
available_functions={"failing_tool": tool._run},
original_tools=[tool],
structured_tools=None,
tools_handler=None,
agent=None,
task=None,
crew=None,
event_source=MagicMock(),
printer=None,
verbose=False,
)
assert isinstance(result, NativeToolCallResult)
assert result.result_as_answer is False
assert "Error executing tool" in result.result

    def test_result_as_answer_false_when_hook_blocks(self) -> None:
"""When a before-hook blocks a tool with result_as_answer=True, result_as_answer must be False."""
from unittest.mock import MagicMock
from crewai.hooks.tool_hooks import (
clear_before_tool_call_hooks,
register_before_tool_call_hook,
)
class BlockedTool(BaseTool):
name: str = "blocked_tool"
description: str = "A tool whose execution will be blocked by a hook"
result_as_answer: bool = True
def _run(self, **kwargs: Any) -> str:
return "should not run"
tool = BlockedTool()
tool_call = MagicMock()
tool_call.id = "call_1"
tool_call.function.name = "blocked_tool"
tool_call.function.arguments = "{}"
register_before_tool_call_hook(lambda _ctx: False)
try:
result = execute_single_native_tool_call(
tool_call,
available_functions={"blocked_tool": tool._run},
original_tools=[tool],
structured_tools=None,
tools_handler=None,
agent=None,
task=None,
crew=None,
event_source=MagicMock(),
printer=None,
verbose=False,
)
finally:
clear_before_tool_call_hooks()
assert isinstance(result, NativeToolCallResult)
assert result.result_as_answer is False
assert "blocked by hook" in result.result


@@ -87,6 +87,31 @@ def test_convert_to_model_with_no_model() -> None:
assert output == "Plain text"

def test_convert_to_model_with_basemodel_input_matching_pydantic() -> None:
instance = SimpleModel(name="John", age=30)
output = convert_to_model(instance, SimpleModel, None, None)
assert output is instance

def test_convert_to_model_with_basemodel_input_matching_json() -> None:
instance = SimpleModel(name="John", age=30)
output = convert_to_model(instance, None, SimpleModel, None)
assert output == {"name": "John", "age": 30}

def test_convert_to_model_with_basemodel_input_different_class() -> None:
class OtherModel(BaseModel):
name: str
age: int
extra: str = "default"
instance = OtherModel(name="John", age=30, extra="ignored")
output = convert_to_model(instance, SimpleModel, None, None)
assert isinstance(output, SimpleModel)
assert output.name == "John"
assert output.age == 30
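A sketch of the passthrough behavior these three tests pin down (simplified: the real `convert_to_model` also handles string inputs and converters; this version is duck-typed on `model_dump()` so it runs standalone without pydantic):

```python
def convert_model_input(result, output_pydantic=None, output_json=None):
    """Sketch of BaseModel-input handling in convert_to_model."""
    # Already the requested class: return the instance untouched.
    if output_pydantic is not None and isinstance(result, output_pydantic):
        return result
    if hasattr(result, "model_dump"):
        data = result.model_dump()
        if output_json is not None:
            # JSON mode: validate against the schema, hand back a dict.
            return output_json(**data).model_dump()
        if output_pydantic is not None:
            # Different model class: re-validate into the requested one,
            # dropping fields the target schema does not declare.
            return output_pydantic(**data)
    return result
```

The identity passthrough in the first branch is what `test_convert_to_model_with_basemodel_input_matching_pydantic` asserts with `is`.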

def test_convert_to_model_with_special_characters() -> None:
json_string_test = """
{


@@ -11,6 +11,8 @@ Installed automatically via the workspace (`uv sync`). Requires:
- `ENTERPRISE_REPO` env var — GitHub repo for enterprise releases
- `ENTERPRISE_VERSION_DIRS` env var — comma-separated directories to bump in the enterprise repo
- `ENTERPRISE_CREWAI_DEP_PATH` env var — path to the pyproject.toml with the `crewai[tools]` pin in the enterprise repo
- `ENTERPRISE_WORKFLOW_PATHS` env var — comma-separated workflow file paths in the enterprise repo whose `crewai[extras]==<version>` pins should be rewritten on each release (e.g. `.github/workflows/tests.yml`)
- `ENTERPRISE_EXTRA_PACKAGES` env var — comma-separated packages to also pin in enterprise pyproject files, in addition to `crewai` / `crewai[extras]`
## Commands


@@ -1,3 +1,3 @@
"""CrewAI development tools."""
-__version__ = "1.14.5a1"
+__version__ = "1.14.5a2"

uv.lock generated

@@ -13,7 +13,7 @@ resolution-markers = [
]
[options]
-exclude-newer = "2026-04-28T07:00:00Z"
+exclude-newer = "2026-04-27T16:00:00Z"
[manifest]
members = [
@@ -1626,7 +1626,7 @@ requires-dist = [
{ name = "e2b-code-interpreter", marker = "extra == 'e2b'", specifier = "~=2.6.0" },
{ name = "exa-py", marker = "extra == 'exa-py'", specifier = ">=1.8.7" },
{ name = "firecrawl-py", marker = "extra == 'firecrawl-py'", specifier = ">=1.8.0" },
{ name = "gitpython", marker = "extra == 'github'", specifier = ">=3.1.41,<4" },
{ name = "gitpython", marker = "extra == 'github'", specifier = ">=3.1.47,<4" },
{ name = "hyperbrowser", marker = "extra == 'hyperbrowser'", specifier = ">=0.18.0" },
{ name = "langchain-apify", marker = "extra == 'apify'", specifier = ">=0.1.2,<1.0.0" },
{ name = "linkup-sdk", marker = "extra == 'linkup-sdk'", specifier = ">=0.2.2" },
@@ -2619,14 +2619,14 @@ wheels = [
[[package]]
name = "gitpython"
version = "3.1.46"
version = "3.1.47"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "gitdb" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/df/b5/59d16470a1f0dfe8c793f9ef56fd3826093fc52b3bd96d6b9d6c26c7e27b/gitpython-3.1.46.tar.gz", hash = "sha256:400124c7d0ef4ea03f7310ac2fbf7151e09ff97f2a3288d64a440c584a29c37f", size = 215371, upload-time = "2026-01-01T15:37:32.073Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/c1/bd/50db468e9b1310529a19fce651b3b0e753b5c07954d486cba31bbee9a5d5/gitpython-3.1.47.tar.gz", hash = "sha256:dba27f922bd2b42cb54c87a8ab3cb6beb6bf07f3d564e21ac848913a05a8a3cd", size = 216978, upload-time = "2026-04-22T02:44:44.059Z" }
wheels = [
-{ url = "https://files.pythonhosted.org/packages/6a/09/e21df6aef1e1ffc0c816f0522ddc3f6dcded766c3261813131c78a704470/gitpython-3.1.46-py3-none-any.whl", hash = "sha256:79812ed143d9d25b6d176a10bb511de0f9c67b1fa641d82097b0ab90398a2058", size = 208620, upload-time = "2026-01-01T15:37:30.574Z" },
+{ url = "https://files.pythonhosted.org/packages/f2/c5/a1bc0996af85757903cf2bf444a7824e68e0035ce63fb41d6f76f9def68b/gitpython-3.1.47-py3-none-any.whl", hash = "sha256:489f590edfd6d20571b2c0e72c6a6ac6915ee8b8cd04572330e3842207a78905", size = 209547, upload-time = "2026-04-22T02:44:41.271Z" },
]
[[package]]