Compare commits

..

13 Commits

Author SHA1 Message Date
dependabot[bot]
9d905709aa chore(deps): bump pyasn1
Bumps the security-updates group with 1 update in the / directory: [pyasn1](https://github.com/pyasn1/pyasn1).


Updates `pyasn1` from 0.6.2 to 0.6.3
- [Release notes](https://github.com/pyasn1/pyasn1/releases)
- [Changelog](https://github.com/pyasn1/pyasn1/blob/main/CHANGES.rst)
- [Commits](https://github.com/pyasn1/pyasn1/compare/v0.6.2...v0.6.3)

---
updated-dependencies:
- dependency-name: pyasn1
  dependency-version: 0.6.3
  dependency-type: indirect
  dependency-group: security-updates
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-03-17 20:35:56 +00:00
Tanishq
0b07b4c45f docs: update Exa Search Tool page with improved naming, description, and configuration options (#4800)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
* docs: update Exa Search Tool page with improved naming, description, and configuration options

Co-Authored-By: Tanishq Jaiswal <tanishq.jaiswal97@gmail.com>

* docs: fix API key link and remove neural/keyword search type references

Co-Authored-By: Tanishq Jaiswal <tanishq.jaiswal97@gmail.com>

* docs: add instant, fast, auto, deep search types

Co-Authored-By: Tanishq Jaiswal <tanishq.jaiswal97@gmail.com>

---------

Co-authored-by: João Moura <joaomdmoura@gmail.com>
2026-03-17 12:27:41 -03:00
João Moura
6235810844 fix: enhance LLM response handling and serialization (#4909)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
* fix: enhance LLM response handling and serialization

* Updated the Flow class to improve error handling when both structured and simple prompting fail, ensuring the first outcome is returned as a fallback.
* Introduced a new function, _serialize_llm_for_context, to properly serialize LLM objects with provider prefixes for better context management.
* Added tests to validate the new serialization logic and ensure correct behavior when LLM calls fail.

This update enhances the robustness of LLM interactions and improves the overall flow of handling outcomes.

* fix: patch VCR response handling to prevent httpx.ResponseNotRead errors (#4917)

* fix: enhance LLM response handling and serialization

* Updated the Flow class to improve error handling when both structured and simple prompting fail, ensuring the first outcome is returned as a fallback.
* Introduced a new function, _serialize_llm_for_context, to properly serialize LLM objects with provider prefixes for better context management.
* Added tests to validate the new serialization logic and ensure correct behavior when LLM calls fail.

This update enhances the robustness of LLM interactions and improves the overall flow of handling outcomes.

* fix: patch VCR response handling to prevent httpx.ResponseNotRead errors

VCR's _from_serialized_response mocks httpx.Response.read(), which
prevents the response's internal _content attribute from being properly
initialized. When OpenAI's client (using with_raw_response) accesses
response.content, httpx raises ResponseNotRead.

This patch explicitly sets response._content after the response is
created, ensuring that tests using VCR cassettes work correctly with
the OpenAI client's raw response handling.

Fixes tests:
- test_hierarchical_crew_creation_tasks_with_sync_last
- test_conditional_task_last_task_when_conditional_is_false
- test_crew_log_file_output

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Joao Moura <joaomdmoura@gmail.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: alex-clawd <alex@crewai.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-17 05:19:31 -03:00
Matt Aitchison
b95486c187 fix: upgrade vulnerable transitive dependencies (authlib, PyJWT, snowflake-connector-python) (#4913)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
- authlib 1.6.7 → 1.6.9 (CVE-2026-27962 critical, CVE-2026-28498, CVE-2026-28490)
- PyJWT 2.11.0 → 2.12.1 (CVE-2026-32597)
- snowflake-connector-python 4.2.0 → 4.3.0
2026-03-16 19:02:39 -05:00
Lucas Gomide
ead8e8d6e6 docs: add Custom MCP Servers in How-To Guide (#4911) 2026-03-16 17:01:41 -04:00
Vini Brasil
5bbf9c8e03 Update OTEL collectors documentation (#4908)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Check Documentation Broken Links / Check broken links (push) Has been cancelled
* Update OTEL collectors documentation

* Add translations
2026-03-16 13:27:57 -03:00
Greyson LaLonde
5053fae8a1 docs: update changelog and version for v1.11.0rc1 2026-03-16 09:55:45 -04:00
Lucas Gomide
9facd96aad docs: update MCP documentation (#4904) 2026-03-16 09:13:10 -04:00
Rip&Tear
9acb327d9f fix: replace os.system with subprocess.run in unsafe mode pip install
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* fix: replace os.system with subprocess.run in unsafe mode pip install

Eliminates shell injection risk (A05) where a malicious library name like
"pkg; rm -rf /" could execute arbitrary host commands. Using list-form
subprocess.run with shell=False ensures the library name is always treated
as a single argument with no shell metacharacter expansion.

Adds two tests: one verifying list-form invocation, one verifying that
shell metacharacters in a library name cannot trigger shell execution.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: use sys.executable -m pip to satisfy S607 linting rule

S607 flags partial executable paths like ["pip", ...]. Using
[sys.executable, "-m", "pip", ...] provides an absolute path and also
ensures installation targets the correct Python environment.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
2026-03-16 02:04:24 -04:00
Greyson LaLonde
aca0817421 feat: bump versions to 1.11.0rc1 2026-03-15 23:37:20 -04:00
Greyson LaLonde
4d21c6e4ad feat(a2a): add plus api token auth
* feat(a2a): add plus api token auth

* feat(a2a): use stub for plus api

* fix: use dynamic separator in slugify for dedup and strip

* fix: remove unused _DUPLICATE_SEPARATOR_PATTERN, cache compiled regex in slugify
2026-03-15 23:30:29 -04:00
Lorenze Jay
32d7b4a8d4 Lorenze/feat/plan execute pattern (#4817)
* feat: introduce PlanningConfig for enhanced agent planning capabilities (#4344)

* feat: introduce PlanningConfig for enhanced agent planning capabilities

This update adds a new PlanningConfig class to manage agent planning configurations, allowing for customizable planning behavior before task execution. The existing reasoning parameter is deprecated in favor of this new configuration, ensuring backward compatibility while enhancing the planning process. Additionally, the Agent class has been updated to utilize this new configuration, and relevant utility functions have been adjusted accordingly. Tests have been added to validate the new planning functionality and ensure proper integration with existing agent workflows.

* dropping redundancy

* fix test

* revert handle_reasoning here

* refactor: update reasoning handling in Agent class

This commit modifies the Agent class to conditionally call the handle_reasoning function based on the executor class being used. The legacy CrewAgentExecutor will continue to utilize handle_reasoning, while the new AgentExecutor will manage planning internally. Additionally, the PlanningConfig class has been referenced in the documentation to clarify its role in enabling or disabling planning. Tests have been updated to reflect these changes and ensure proper functionality.

* improve planning prompts

* matching

* refactor: remove default enabled flag from PlanningConfig in Agent class

* more cassettes

* fix test

* refactor: update planning prompt and remove deprecated methods in reasoning handler

* improve planning prompt

* Lorenze/feat planning pt 2 todo list gen (#4449)

* feat: introduce PlanningConfig for enhanced agent planning capabilities

This update adds a new PlanningConfig class to manage agent planning configurations, allowing for customizable planning behavior before task execution. The existing reasoning parameter is deprecated in favor of this new configuration, ensuring backward compatibility while enhancing the planning process. Additionally, the Agent class has been updated to utilize this new configuration, and relevant utility functions have been adjusted accordingly. Tests have been added to validate the new planning functionality and ensure proper integration with existing agent workflows.

* dropping redundancy

* fix test

* revert handle_reasoning here

* refactor: update reasoning handling in Agent class

This commit modifies the Agent class to conditionally call the handle_reasoning function based on the executor class being used. The legacy CrewAgentExecutor will continue to utilize handle_reasoning, while the new AgentExecutor will manage planning internally. Additionally, the PlanningConfig class has been referenced in the documentation to clarify its role in enabling or disabling planning. Tests have been updated to reflect these changes and ensure proper functionality.

* improve planning prompts

* matching

* refactor: remove default enabled flag from PlanningConfig in Agent class

* more cassettes

* fix test

* feat: enhance agent planning with structured todo management

This commit introduces a new planning system within the AgentExecutor class, allowing for the creation of structured todo items from planning steps. The TodoList and TodoItem models have been added to facilitate tracking of plan execution. The reasoning plan now includes a list of steps, improving the clarity and organization of agent tasks. Additionally, tests have been added to validate the new planning functionality and ensure proper integration with existing workflows.

* refactor: update planning prompt and remove deprecated methods in reasoning handler

* improve planning prompt

* improve handler

* linted

* linted

* Lorenze/feat/planning pt 3 todo list execution (#4450)

* feat: introduce PlanningConfig for enhanced agent planning capabilities

This update adds a new PlanningConfig class to manage agent planning configurations, allowing for customizable planning behavior before task execution. The existing reasoning parameter is deprecated in favor of this new configuration, ensuring backward compatibility while enhancing the planning process. Additionally, the Agent class has been updated to utilize this new configuration, and relevant utility functions have been adjusted accordingly. Tests have been added to validate the new planning functionality and ensure proper integration with existing agent workflows.

* dropping redundancy

* fix test

* revert handle_reasoning here

* refactor: update reasoning handling in Agent class

This commit modifies the Agent class to conditionally call the handle_reasoning function based on the executor class being used. The legacy CrewAgentExecutor will continue to utilize handle_reasoning, while the new AgentExecutor will manage planning internally. Additionally, the PlanningConfig class has been referenced in the documentation to clarify its role in enabling or disabling planning. Tests have been updated to reflect these changes and ensure proper functionality.

* improve planning prompts

* matching

* refactor: remove default enabled flag from PlanningConfig in Agent class

* more cassettes

* fix test

* feat: enhance agent planning with structured todo management

This commit introduces a new planning system within the AgentExecutor class, allowing for the creation of structured todo items from planning steps. The TodoList and TodoItem models have been added to facilitate tracking of plan execution. The reasoning plan now includes a list of steps, improving the clarity and organization of agent tasks. Additionally, tests have been added to validate the new planning functionality and ensure proper integration with existing workflows.

* refactor: update planning prompt and remove deprecated methods in reasoning handler

* improve planning prompt

* improve handler

* execute todos and be able to track them

* feat: introduce PlannerObserver and StepExecutor for enhanced plan execution

This commit adds the PlannerObserver and StepExecutor classes to the CrewAI framework, implementing the observation phase of the Plan-and-Execute architecture. The PlannerObserver analyzes step execution results, determines plan validity, and suggests refinements, while the StepExecutor executes individual todo items in isolation. These additions improve the overall planning and execution process, allowing for more dynamic and responsive agent behavior. Additionally, new observation events have been defined to facilitate monitoring and logging of the planning process.

* refactor: enhance final answer synthesis in AgentExecutor

This commit improves the synthesis of final answers in the AgentExecutor class by implementing a more coherent approach to combining results from multiple todo items. The method now utilizes a single LLM call to generate a polished response, falling back to concatenation if the synthesis fails. Additionally, the test cases have been updated to reflect the changes in planning and execution, ensuring that the results are properly validated and that the plan-and-execute architecture is functioning as intended.

* refactor: enhance final answer synthesis in AgentExecutor

This commit improves the synthesis of final answers in the AgentExecutor class by implementing a more coherent approach to combining results from multiple todo items. The method now utilizes a single LLM call to generate a polished response, falling back to concatenation if the synthesis fails. Additionally, the test cases have been updated to reflect the changes in planning and execution, ensuring that the results are properly validated and that the plan-and-execute architecture is functioning as intended.

* refactor: implement structured output handling in final answer synthesis

This commit enhances the final answer synthesis process in the AgentExecutor class by introducing support for structured outputs when a response model is specified. The synthesis method now utilizes the response model to produce outputs that conform to the expected schema, while still falling back to concatenation in case of synthesis failures. This change ensures that intermediate steps yield free-text results, but the final output can be structured, improving the overall coherence and usability of the synthesized answers.

* regen tests

* linted

* fix

* Enhance PlanningConfig and AgentExecutor with Reasoning Effort Levels

This update introduces a new  attribute in the  class, allowing users to customize the observation and replanning behavior during task execution. The  class has been modified to utilize this new attribute, routing step observations based on the specified reasoning effort level: low, medium, or high.

Additionally, tests have been added to validate the functionality of the reasoning effort levels, ensuring that the agent behaves as expected under different configurations. This enhancement improves the adaptability and efficiency of the planning process in agent execution.

* regen cassettes for test and fix test

* cassette regen

* fixing tests

* dry

* Refactor PlannerObserver and StepExecutor to Utilize I18N for Prompts

This update enhances the PlannerObserver and StepExecutor classes by integrating the I18N utility for managing prompts and messages. The system and user prompts are now retrieved from the I18N module, allowing for better localization and maintainability. Additionally, the code has been cleaned up to remove hardcoded strings, improving readability and consistency across the planning and execution processes.

* Refactor PlannerObserver and StepExecutor to Utilize I18N for Prompts

This update enhances the PlannerObserver and StepExecutor classes by integrating the I18N utility for managing prompts and messages. The system and user prompts are now retrieved from the I18N module, allowing for better localization and maintainability. Additionally, the code has been cleaned up to remove hardcoded strings, improving readability and consistency across the planning and execution processes.

* consolidate agent logic

* fix datetime

* improving step executor

* refactor: streamline observation and refinement process in PlannerObserver

- Updated the PlannerObserver to apply structured refinements directly from observations without requiring a second LLM call.
- Renamed  method to  for clarity.
- Enhanced documentation to reflect changes in how refinements are handled.
- Removed unnecessary LLM message building and parsing logic, simplifying the refinement process.
- Updated event emissions to include summaries of refinements instead of raw data.

* enhance step executor with tool usage events and validation

- Added event emissions for tool usage, including started and finished events, to track tool execution.
- Implemented validation to ensure expected tools are called during step execution, raising errors when not.
- Refactored the  method to handle tool execution with event logging.
- Introduced a new method  for parsing tool input into a structured format.
- Updated tests to cover new functionality and ensure correct behavior of tool usage events.

* refactor: enhance final answer synthesis logic in AgentExecutor

- Updated the finalization process to conditionally skip synthesis when the last todo result is sufficient as a complete answer.
- Introduced a new method to determine if the last todo result can be used directly, improving efficiency.
- Added tests to verify the new behavior, ensuring synthesis is skipped when appropriate and maintained when a response model is set.

* fix: update observation handling in PlannerObserver for LLM errors

- Modified the error handling in the PlannerObserver to default to a conservative replan when an LLM call fails.
- Updated the return values to indicate that the step was not completed successfully and that a full replan is needed.
- Added a new test to verify the behavior of the observer when an LLM error occurs, ensuring the correct replan logic is triggered.

* refactor: enhance planning and execution flow in agents

- Updated the PlannerObserver to accept a kickoff input for standalone task execution, improving flexibility in task handling.
- Refined the step execution process in StepExecutor to support multi-turn action loops, allowing for iterative tool execution and observation.
- Introduced a method to extract relevant task sections from descriptions, ensuring clarity in task requirements.
- Enhanced the AgentExecutor to manage step failures more effectively, triggering replans only when necessary and preserving completed task history.
- Updated translations to reflect changes in planning principles and execution prompts, emphasizing concrete and executable steps.

* refactor: update setup_native_tools to include tool_name_mapping

- Modified the setup_native_tools function to return an additional mapping of tool names.
- Updated StepExecutor and AgentExecutor classes to accommodate the new return value from setup_native_tools.

* fix tests

* linted

* linted

* feat: enhance image block handling in Anthropic provider and update AgentExecutor logic

- Added a method to convert OpenAI-style image_url blocks to Anthropic's required format.
- Updated AgentExecutor to handle cases where no todos are ready, introducing a needs_replan return state.
- Improved fallback answer generation in AgentExecutor to prevent RuntimeErrors when no final output is produced.

* lint

* lint

* 1. Added failed to TodoStatus (planning_types.py)

  - TodoStatus now includes failed as a valid state: Literal[pending, running, completed, failed]
  - Added mark_failed(step_number, result) method to TodoList
  - Added get_failed_todos() method to TodoList
  - Updated is_complete to treat both completed and failed as terminal states
  - Updated replace_pending_todos docstring to mention failed items are preserved

  2. Mark running todos as failed before replan (agent_executor.py)

  All three effort-level handlers now call mark_failed() on the current todo before routing to replan_now:

  - Low effort (handle_step_observed_low): hard-failure branch
  - Medium effort (handle_step_observed_medium): needs_full_replan branch
  - High effort (decide_next_action): both needs_full_replan and step_completed_successfully=False branches

  3. Updated _should_replan to use get_failed_todos()

  Previously filtered on todo.status == failed which was dead code. Now uses the proper accessor method that will actually find failed items.

  What this fixes: Before these changes, a step that triggered a replan would stay in running status permanently, causing is_complete to never
  return True and next_pending to skip it — leading to stuck execution states. Now failed steps are properly tracked, replanning context correctly
  reports them, and LiteAgentOutput.failed_todos will actually return results.

* fix test

* imp on failed states

* adjusted the var name from AgentReActState to AgentExecutorState

* addressed p0 bugs

* more improvements

* linted

* regen cassette

* addressing crictical comments

* ensure configurable timeouts, max_replans and max step iterations

* adjusted tools

* dropping debug statements

* addressed comment

* fix  linter

* lints and test fixes

* fix: default observation parse fallback to failure and clean up plan-execute types

When _parse_observation_response fails all parse attempts, default to
step_completed_successfully=False instead of True to avoid silently
masking failures. Extract duplicate _extract_task_section into a shared
utility in agent_utils. Type PlanningConfig.llm as str | BaseLLM | None
instead of str | Any | None. Make StepResult a frozen dataclass for
immutability consistency with StepExecutionContext.

* fix: remove Any from function_calling_llm union type in step_executor

* fix: make BaseTool usage count thread-safe for parallel step execution

Add _usage_lock and _claim_usage() to BaseTool for atomic
check-and-increment of current_usage_count. This prevents race
conditions when parallel plan steps invoke the same tool concurrently
via execute_todos_parallel. Remove the racy pre-check from
execute_single_native_tool_call since the limit is now enforced
atomically inside tool.run().

---------

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
Co-authored-by: Greyson LaLonde <greyson@crewai.com>
2026-03-15 18:33:17 -07:00
Rip&Tear
fb2323b3de Code interpreter sandbox escape (#4791)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Build uv cache / build-cache (3.10) (push) Has been cancelled
Build uv cache / build-cache (3.11) (push) Has been cancelled
Build uv cache / build-cache (3.12) (push) Has been cancelled
Build uv cache / build-cache (3.13) (push) Has been cancelled
Nightly Canary Release / Check for new commits (push) Has been cancelled
Nightly Canary Release / Build nightly packages (push) Has been cancelled
Nightly Canary Release / Publish nightly to PyPI (push) Has been cancelled
* [SECURITY] Fix sandbox escape vulnerability in CodeInterpreterTool (F-001)

This commit addresses a critical security vulnerability where the CodeInterpreterTool
could be exploited via sandbox escape attacks when Docker was unavailable.

Changes:
- Remove insecure fallback to restricted sandbox in run_code_safety()
- Now fails closed with RuntimeError when Docker is unavailable
- Mark run_code_in_restricted_sandbox() as deprecated and insecure
- Add clear security warnings to SandboxPython class documentation
- Update tests to reflect secure-by-default behavior
- Add test demonstrating the sandbox escape vulnerability
- Update README with security requirements and best practices

The previous implementation would fall back to a Python-based 'restricted sandbox'
when Docker was unavailable. However, this sandbox could be easily bypassed using
Python object introspection to recover the original __import__ function, allowing
arbitrary module access and command execution on the host.

The fix enforces Docker as a requirement for safe code execution. Users who cannot
use Docker must explicitly enable unsafe_mode=True, acknowledging the security risks.

Security Impact:
- Prevents RCE via sandbox escape when Docker is unavailable
- Enforces fail-closed security model
- Maintains backward compatibility via unsafe_mode flag

References:
- https://docs.crewai.com/tools/ai-ml/codeinterpretertool

Co-authored-by: Rip&Tear <theCyberTech@users.noreply.github.com>

* Add security fix documentation for F-001

Co-authored-by: Rip&Tear <theCyberTech@users.noreply.github.com>

* Add Slack summary for security fix

Co-authored-by: Rip&Tear <theCyberTech@users.noreply.github.com>

* Delete SECURITY_FIX_F001.md

* Delete SLACK_SUMMARY.md

* chore: regen cassettes

* chore: regen more cassettes

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: Rip&Tear <theCyberTech@users.noreply.github.com>
Co-authored-by: Greyson LaLonde <greyson@crewai.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-03-15 13:18:02 +08:00
329 changed files with 46047 additions and 12797 deletions

View File

@@ -55,7 +55,6 @@ jobs:
echo "${{ steps.changed-files.outputs.files }}" \
| tr ' ' '\n' \
| grep -v 'src/crewai/cli/templates/' \
| grep -v 'src/crewai_cli/templates/' \
| grep -v '/tests/' \
| xargs -I{} uv run ruff check "{}"

View File

@@ -19,7 +19,7 @@ repos:
language: system
pass_filenames: true
types: [python]
exclude: ^(lib/crewai/src/crewai/cli/templates/|lib/cli/|lib/crewai/tests/|lib/crewai-tools/tests/|lib/crewai-files/tests/)
exclude: ^(lib/crewai/src/crewai/cli/templates/|lib/crewai/tests/|lib/crewai-tools/tests/|lib/crewai-files/tests/)
- repo: https://github.com/astral-sh/uv-pre-commit
rev: 0.9.3
hooks:

View File

@@ -43,6 +43,35 @@ def _patched_make_vcr_request(httpx_request: Any, **kwargs: Any) -> Any:
httpx_stubs._make_vcr_request = _patched_make_vcr_request
# Patch the response-side of VCR to fix httpx.ResponseNotRead errors.
# VCR's _from_serialized_response mocks httpx.Response.read(), which prevents
# the response's internal _content attribute from being properly initialized.
# When OpenAI's client (using with_raw_response) accesses response.content,
# httpx raises ResponseNotRead because read() was never actually called.
# This patch ensures _content is explicitly set after response creation.
_original_from_serialized_response = getattr(
httpx_stubs, "_from_serialized_response", None
)
if _original_from_serialized_response is not None:
def _patched_from_serialized_response(
request: Any, serialized_response: Any, history: Any = None
) -> Any:
"""Patched version that ensures response._content is properly set."""
response = _original_from_serialized_response(request, serialized_response, history)
# Explicitly set _content to avoid ResponseNotRead errors
# The content was passed to the constructor but the mocked read() prevents
# proper initialization of the internal state
body_content = serialized_response.get("body", {}).get("string", b"")
if isinstance(body_content, str):
body_content = body_content.encode("utf-8")
response._content = body_content
return response
httpx_stubs._from_serialized_response = _patched_from_serialized_response
@pytest.fixture(autouse=True, scope="function")
def cleanup_event_handlers() -> Generator[None, Any, None]:
"""Clean up event bus handlers after each test to prevent test pollution."""
@@ -226,7 +255,7 @@ def vcr_cassette_dir(request: Any) -> str:
for parent in test_file.parents:
if (
parent.name in ("crewai", "crewai-tools", "crewai-files", "cli")
parent.name in ("crewai", "crewai-tools", "crewai-files")
and parent.parent.name == "lib"
):
package_root = parent

View File

@@ -458,6 +458,7 @@
"en/enterprise/guides/capture_telemetry_logs",
"en/enterprise/guides/azure-openai-setup",
"en/enterprise/guides/tool-repository",
"en/enterprise/guides/custom-mcp-server",
"en/enterprise/guides/react-component-export",
"en/enterprise/guides/team-management",
"en/enterprise/guides/human-in-the-loop",
@@ -917,6 +918,7 @@
"en/enterprise/guides/capture_telemetry_logs",
"en/enterprise/guides/azure-openai-setup",
"en/enterprise/guides/tool-repository",
"en/enterprise/guides/custom-mcp-server",
"en/enterprise/guides/react-component-export",
"en/enterprise/guides/team-management",
"en/enterprise/guides/human-in-the-loop",
@@ -1366,8 +1368,10 @@
"pt-BR/enterprise/guides/kickoff-crew",
"pt-BR/enterprise/guides/update-crew",
"pt-BR/enterprise/guides/enable-crew-studio",
"pt-BR/enterprise/guides/capture_telemetry_logs",
"pt-BR/enterprise/guides/azure-openai-setup",
"pt-BR/enterprise/guides/tool-repository",
"pt-BR/enterprise/guides/custom-mcp-server",
"pt-BR/enterprise/guides/react-component-export",
"pt-BR/enterprise/guides/team-management",
"pt-BR/enterprise/guides/human-in-the-loop",
@@ -1803,8 +1807,10 @@
"pt-BR/enterprise/guides/kickoff-crew",
"pt-BR/enterprise/guides/update-crew",
"pt-BR/enterprise/guides/enable-crew-studio",
"pt-BR/enterprise/guides/capture_telemetry_logs",
"pt-BR/enterprise/guides/azure-openai-setup",
"pt-BR/enterprise/guides/tool-repository",
"pt-BR/enterprise/guides/custom-mcp-server",
"pt-BR/enterprise/guides/react-component-export",
"pt-BR/enterprise/guides/team-management",
"pt-BR/enterprise/guides/human-in-the-loop",
@@ -2282,8 +2288,10 @@
"ko/enterprise/guides/kickoff-crew",
"ko/enterprise/guides/update-crew",
"ko/enterprise/guides/enable-crew-studio",
"ko/enterprise/guides/capture_telemetry_logs",
"ko/enterprise/guides/azure-openai-setup",
"ko/enterprise/guides/tool-repository",
"ko/enterprise/guides/custom-mcp-server",
"ko/enterprise/guides/react-component-export",
"ko/enterprise/guides/team-management",
"ko/enterprise/guides/human-in-the-loop",
@@ -2731,8 +2739,10 @@
"ko/enterprise/guides/kickoff-crew",
"ko/enterprise/guides/update-crew",
"ko/enterprise/guides/enable-crew-studio",
"ko/enterprise/guides/capture_telemetry_logs",
"ko/enterprise/guides/azure-openai-setup",
"ko/enterprise/guides/tool-repository",
"ko/enterprise/guides/custom-mcp-server",
"ko/enterprise/guides/react-component-export",
"ko/enterprise/guides/team-management",
"ko/enterprise/guides/human-in-the-loop",

View File

@@ -4,6 +4,29 @@ description: "Product updates, improvements, and bug fixes for CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="Mar 15, 2026">
## v1.11.0rc1
[View release on GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.11.0rc1)
## What's Changed
### Features
- Add Plus API token authentication in a2a
- Implement plan execute pattern
### Bug Fixes
- Resolve code interpreter sandbox escape issue
### Documentation
- Update changelog and version for v1.10.2rc2
## Contributors
@Copilot, @greysonlalonde, @lorenzejay, @theCyberTech
</Update>
<Update label="Mar 14, 2026">
## v1.10.2rc2

View File

@@ -219,6 +219,16 @@ CrewAI provides a wide range of events that you can listen for:
- **ToolExecutionErrorEvent**: Emitted when a tool execution encounters an error
- **ToolSelectionErrorEvent**: Emitted when there's an error selecting a tool
### MCP Events
- **MCPConnectionStartedEvent**: Emitted when starting to connect to an MCP server. Contains the server name, URL, transport type, connection timeout, and whether it's a reconnection attempt.
- **MCPConnectionCompletedEvent**: Emitted when successfully connected to an MCP server. Contains the server name, connection duration in milliseconds, and whether it was a reconnection.
- **MCPConnectionFailedEvent**: Emitted when connection to an MCP server fails. Contains the server name, error message, and error type (`timeout`, `authentication`, `network`, etc.).
- **MCPToolExecutionStartedEvent**: Emitted when starting to execute an MCP tool. Contains the server name, tool name, and tool arguments.
- **MCPToolExecutionCompletedEvent**: Emitted when MCP tool execution completes successfully. Contains the server name, tool name, result, and execution duration in milliseconds.
- **MCPToolExecutionFailedEvent**: Emitted when MCP tool execution fails. Contains the server name, tool name, error message, and error type (`timeout`, `validation`, `server_error`, etc.).
- **MCPConfigFetchFailedEvent**: Emitted when fetching an MCP server configuration fails (e.g., the MCP is not connected in your account, API error, or connection failure after config was fetched). Contains the slug, error message, and error type (`not_connected`, `api_error`, `connection_failed`).
### Knowledge Events
- **KnowledgeRetrievalStartedEvent**: Emitted when a knowledge retrieval is started

View File

@@ -1,30 +1,39 @@
---
title: "Open Telemetry Logs"
description: "Understand how to capture telemetry logs from your CrewAI AMP deployments"
title: "OpenTelemetry Export"
description: "Export traces and logs from your CrewAI AMP deployments to your own OpenTelemetry collector"
icon: "magnifying-glass-chart"
mode: "wide"
---
CrewAI AMP provides a powerful way to capture telemetry logs from your deployments. This allows you to monitor the performance of your agents and workflows, and to debug issues that may arise.
CrewAI AMP can export OpenTelemetry **traces** and **logs** from your deployments directly to your own collector. This lets you monitor agent performance, track LLM calls, and debug issues using your existing observability stack.
Telemetry data follows the [OpenTelemetry GenAI semantic conventions](https://opentelemetry.io/docs/specs/semconv/gen-ai/) plus additional CrewAI-specific attributes.
## Prerequisites
<CardGroup cols={2}>
<Card title="ENTERPRISE OTEL SETUP enabled" icon="users">
Your organization should have ENTERPRISE OTEL SETUP enabled
<Card title="CrewAI AMP account" icon="users">
Your organization must have an active CrewAI AMP account.
</Card>
<Card title="OTEL collector setup" icon="server">
Your organization should have an OTEL collector setup or a provider like
Datadog log intake setup
<Card title="OpenTelemetry collector" icon="server">
You need an OpenTelemetry-compatible collector endpoint (e.g., your own OTel Collector, Datadog, Grafana, or any OTLP-compatible backend).
</Card>
</CardGroup>
## How to capture telemetry logs
## Setting up a collector
1. Go to settings/organization tab
2. Configure your OTEL collector setup
3. Save
1. In CrewAI AMP, go to **Settings** > **OpenTelemetry Collectors**.
2. Click **Add Collector**.
3. Select an integration type — **OpenTelemetry Traces** or **OpenTelemetry Logs**.
4. Configure the connection:
- **Endpoint** — Your collector's OTLP endpoint (e.g., `https://otel-collector.example.com:4317`).
- **Service Name** — A name to identify this service in your observability platform.
- **Custom Headers** *(optional)* — Add authentication or routing headers as key-value pairs.
- **Certificate** *(optional)* — Provide a TLS certificate if your collector requires one.
5. Click **Save**.
Example to setup OTEL log collection capture to Datadog.
<Frame>![OpenTelemetry Collector Configuration](/images/crewai-otel-collector-config.png)</Frame>
<Frame>![Capture Telemetry Logs](/images/crewai-otel-export.png)</Frame>
<Tip>
You can add multiple collectors — for example, one for traces and another for logs, or send to different backends for different purposes.
</Tip>

View File

@@ -0,0 +1,136 @@
---
title: "Custom MCP Servers"
description: "Connect your own MCP servers to CrewAI AMP with public access, API key authentication, or OAuth 2.0"
icon: "plug"
mode: "wide"
---
CrewAI AMP supports connecting to any MCP server that implements the [Model Context Protocol](https://modelcontextprotocol.io/). You can bring public servers that require no authentication, servers protected by an API key or bearer token, and servers that use OAuth 2.0 for secure delegated access.
## Prerequisites
<CardGroup cols={2}>
<Card title="CrewAI AMP Account" icon="user">
You need an active [CrewAI AMP](https://app.crewai.com) account.
</Card>
<Card title="MCP Server URL" icon="link">
The URL of the MCP server you want to connect. The server must be accessible from the internet and support Streamable HTTP transport.
</Card>
</CardGroup>
## Adding a Custom MCP Server
<Steps>
<Step title="Open Tools & Integrations">
Navigate to **Tools & Integrations** in the left sidebar of CrewAI AMP, then select the **Connections** tab.
</Step>
<Step title="Start adding a Custom MCP Server">
Click the **Add Custom MCP Server** button. A dialog will appear with the configuration form.
</Step>
<Step title="Fill in the basic information">
- **Name** (required): A descriptive name for your MCP server (e.g., "My Internal Tools Server").
- **Description**: An optional summary of what this MCP server provides.
- **Server URL** (required): The full URL to your MCP server endpoint (e.g., `https://my-server.example.com/mcp`).
</Step>
<Step title="Choose an authentication method">
Select one of the three available authentication methods based on how your MCP server is secured. See the sections below for details on each method.
</Step>
<Step title="Add custom headers (optional)">
If your MCP server requires additional headers on every request (e.g., tenant identifiers or routing headers), click **+ Add Header** and provide the header name and value. You can add multiple custom headers.
</Step>
<Step title="Create the connection">
Click **Create MCP Server** to save the connection. Your custom MCP server will now appear in the Connections list and its tools will be available for use in your crews.
</Step>
</Steps>
## Authentication Methods
### No Authentication
Choose this option when your MCP server is publicly accessible and does not require any credentials. This is common for open-source or internal servers running behind a VPN.
### Authentication Token
Use this method when your MCP server is protected by an API key or bearer token.
<Frame>
<img src="/images/enterprise/custom-mcp-auth-token.png" alt="Custom MCP Server with Authentication Token" />
</Frame>
| Field | Required | Description |
|-------|----------|-------------|
| **Header Name** | Yes | The name of the HTTP header that carries the token (e.g., `X-API-Key`, `Authorization`). |
| **Value** | Yes | Your API key or bearer token. |
| **Add to** | No | Where to attach the credential — **Header** (default) or **Query parameter**. |
<Tip>
If your server expects a `Bearer` token in the `Authorization` header, set the Header Name to `Authorization` and the Value to `Bearer <your-token>`.
</Tip>
### OAuth 2.0
Use this method for MCP servers that require OAuth 2.0 authorization. CrewAI will handle the full OAuth flow, including token refresh.
<Frame>
<img src="/images/enterprise/custom-mcp-oauth.png" alt="Custom MCP Server with OAuth 2.0" />
</Frame>
| Field | Required | Description |
|-------|----------|-------------|
| **Redirect URI** | — | Pre-filled and read-only. Copy this URI and register it as an authorized redirect URI in your OAuth provider. |
| **Authorization Endpoint** | Yes | The URL where users are sent to authorize access (e.g., `https://auth.example.com/oauth/authorize`). |
| **Token Endpoint** | Yes | The URL used to exchange the authorization code for an access token (e.g., `https://auth.example.com/oauth/token`). |
| **Client ID** | Yes | The OAuth client ID issued by your provider. |
| **Client Secret** | No | The OAuth client secret. Not required for public clients using PKCE. |
| **Scopes** | No | Space-separated list of scopes to request (e.g., `read write`). |
| **Token Auth Method** | No | How the client credentials are sent when exchanging tokens — **Standard (POST body)** or **Basic Auth (header)**. Defaults to Standard. |
| **PKCE Supported** | No | Enable if your OAuth provider supports Proof Key for Code Exchange. Recommended for improved security. |
<Info>
**Discover OAuth Config**: If your OAuth provider supports OpenID Connect Discovery, click the **Discover OAuth Config** link to auto-populate the authorization and token endpoints from the provider's `/.well-known/openid-configuration` URL.
</Info>
#### Setting Up OAuth 2.0 Step by Step
<Steps>
<Step title="Register the redirect URI">
Copy the **Redirect URI** shown in the form and add it as an authorized redirect URI in your OAuth provider's application settings.
</Step>
<Step title="Enter endpoints and credentials">
Fill in the **Authorization Endpoint**, **Token Endpoint**, **Client ID**, and optionally the **Client Secret** and **Scopes**.
</Step>
<Step title="Configure token exchange method">
Select the appropriate **Token Auth Method**. Most providers use the default **Standard (POST body)**. Some older providers require **Basic Auth (header)**.
</Step>
<Step title="Enable PKCE (recommended)">
Check **PKCE Supported** if your provider supports it. PKCE adds an extra layer of security to the authorization code flow and is recommended for all new integrations.
</Step>
<Step title="Create and authorize">
Click **Create MCP Server**. You will be redirected to your OAuth provider to authorize access. Once authorized, CrewAI will store the tokens and automatically refresh them as needed.
</Step>
</Steps>
## Using Your Custom MCP Server
Once connected, your custom MCP server's tools appear alongside built-in connections on the **Tools & Integrations** page. You can:
- **Assign tools to agents** in your crews just like any other CrewAI tool.
- **Manage visibility** to control which team members can use the server.
- **Edit or remove** the connection at any time from the Connections list.
<Warning>
If your MCP server becomes unreachable or the credentials expire, tool calls using that server will fail. Make sure the server URL is stable and credentials are kept up to date.
</Warning>
<Card title="Need Help?" icon="headset" href="mailto:support@crewai.com">
Contact our support team for assistance with custom MCP server configuration or troubleshooting.
</Card>

View File

@@ -62,22 +62,22 @@ Use the `#` syntax to select specific tools from a server:
"https://mcp.exa.ai/mcp?api_key=your_key#web_search_exa"
```
### CrewAI AMP Marketplace
### Connected MCP Integrations
Access tools from the CrewAI AMP marketplace:
Connect MCP servers from the CrewAI catalog or bring your own. Once connected in your account, reference them by slug:
```python
# Full service with all tools
"crewai-amp:financial-data"
# Connected MCP with all tools
"snowflake"
# Specific tool from AMP service
"crewai-amp:research-tools#pubmed_search"
# Specific tool from a connected MCP
"stripe#list_invoices"
# Multiple AMP services
# Multiple connected MCPs
mcps=[
"crewai-amp:weather-insights",
"crewai-amp:market-analysis",
"crewai-amp:social-media-monitoring"
"snowflake",
"stripe",
"github"
]
```
@@ -99,10 +99,10 @@ multi_source_agent = Agent(
"https://mcp.exa.ai/mcp?api_key=your_exa_key&profile=research",
"https://weather.api.com/mcp#get_current_conditions",
# CrewAI AMP marketplace
"crewai-amp:financial-insights",
"crewai-amp:academic-research#pubmed_search",
"crewai-amp:market-intelligence#competitor_analysis"
# Connected MCPs from catalog
"snowflake",
"stripe#list_invoices",
"github#search_repositories"
]
)
@@ -147,7 +147,7 @@ agent = Agent(
mcps=[
"https://mcp.exa.ai/mcp?api_key=key", # Tools: mcp_exa_ai_*
"https://weather.service.com/mcp", # Tools: weather_service_com_*
"crewai-amp:financial-data" # Tools: financial_data_*
"snowflake" # Tools: snowflake_*
]
)
@@ -170,7 +170,7 @@ agent = Agent(
"https://primary-server.com/mcp", # Primary data source
"https://backup-server.com/mcp", # Backup if primary fails
"https://unreachable-server.com/mcp", # Will be skipped with warning
"crewai-amp:reliable-service" # Reliable AMP service
"snowflake" # Connected MCP from catalog
]
)
@@ -254,7 +254,7 @@ agent = Agent(
apps=["gmail", "slack"], # Platform integrations
mcps=[ # MCP servers
"https://mcp.exa.ai/mcp?api_key=key",
"crewai-amp:research-tools"
"snowflake"
],
verbose=True,
@@ -298,7 +298,7 @@ agent = Agent(
mcps=[
"https://primary-api.com/mcp", # Primary choice
"https://backup-api.com/mcp", # Backup option
"crewai-amp:reliable-service" # AMP fallback
"snowflake" # Connected MCP fallback
]
```
@@ -311,7 +311,7 @@ agent = Agent(
backstory="Financial analyst with access to weather data for agricultural market insights",
mcps=[
"https://weather.service.com/mcp#get_forecast",
"crewai-amp:financial-data#stock_analysis"
"stripe#list_invoices"
]
)
```

View File

@@ -17,7 +17,7 @@ Use the `mcps` field directly on agents for seamless MCP tool integration. The D
#### String-Based References (Quick Setup)
Perfect for remote HTTPS servers and CrewAI AMP marketplace:
Perfect for remote HTTPS servers and connected MCP integrations from the CrewAI catalog:
```python
from crewai import Agent
@@ -29,8 +29,8 @@ agent = Agent(
mcps=[
"https://mcp.exa.ai/mcp?api_key=your_key", # External MCP server
"https://api.weather.com/mcp#get_forecast", # Specific tool from server
"crewai-amp:financial-data", # CrewAI AMP marketplace
"crewai-amp:research-tools#pubmed_search" # Specific AMP tool
"snowflake", # Connected MCP from catalog
"stripe#list_invoices" # Specific tool from connected MCP
]
)
# MCP tools are now automatically available to your agent!
@@ -127,7 +127,7 @@ research_agent = Agent(
backstory="Expert researcher with access to multiple data sources",
mcps=[
"https://mcp.exa.ai/mcp?api_key=your_key&profile=your_profile",
"crewai-amp:weather-service#current_conditions"
"snowflake#run_query"
]
)
@@ -204,19 +204,22 @@ mcps=[
]
```
#### CrewAI AMP Marketplace
#### Connected MCP Integrations
Connect MCP servers from the CrewAI catalog or bring your own. Once connected in your account, reference them by slug:
```python
mcps=[
# Full AMP MCP service - get all available tools
"crewai-amp:financial-data",
# Connected MCP - get all available tools
"snowflake",
# Specific tool from AMP service using # syntax
"crewai-amp:research-tools#pubmed_search",
# Specific tool from a connected MCP using # syntax
"stripe#list_invoices",
# Multiple AMP services
"crewai-amp:weather-service",
"crewai-amp:market-analysis"
# Multiple connected MCPs
"snowflake",
"stripe",
"github"
]
```
@@ -299,7 +302,7 @@ from crewai.mcp import MCPServerStdio, MCPServerHTTP
mcps=[
# String references
"https://external-api.com/mcp", # External server
"crewai-amp:financial-insights", # AMP service
"snowflake", # Connected MCP from catalog
# Structured configurations
MCPServerStdio(
@@ -409,7 +412,7 @@ agent = Agent(
# String references
"https://reliable-server.com/mcp", # Will work
"https://unreachable-server.com/mcp", # Will be skipped gracefully
"crewai-amp:working-service", # Will work
"snowflake", # Connected MCP from catalog
# Structured configs
MCPServerStdio(

View File

@@ -1,53 +1,110 @@
---
title: EXA Search Web Loader
description: The `EXASearchTool` is designed to perform a semantic search for a specified query from a text's content across the internet.
icon: globe-pointer
title: "Exa Search Tool"
description: "Search the web using the Exa Search API to find the most relevant results for any query, with options for full page content, highlights, and summaries."
icon: "magnifying-glass"
mode: "wide"
---
# `EXASearchTool`
## Description
The EXASearchTool is designed to perform a semantic search for a specified query from a text's content across the internet.
It utilizes the [exa.ai](https://exa.ai/) API to fetch and display the most relevant search results based on the query provided by the user.
The `EXASearchTool` lets CrewAI agents search the web using the [Exa](https://exa.ai/) search API. It returns the most relevant results for any query, with options for full page content and AI-generated summaries.
## Installation
To incorporate this tool into your project, follow the installation instructions below:
Install the CrewAI tools package:
```shell
pip install 'crewai[tools]'
```
## Example
## Environment Variables
The following example demonstrates how to initialize the tool and execute a search with a given query:
Set your Exa API key as an environment variable:
```python Code
from crewai_tools import EXASearchTool
# Initialize the tool for internet searching capabilities
tool = EXASearchTool()
```bash
export EXA_API_KEY='your_exa_api_key'
```
## Steps to Get Started
Get an API key from the [Exa dashboard](https://dashboard.exa.ai/api-keys).
To effectively use the EXASearchTool, follow these steps:
## Example Usage
<Steps>
<Step title="Package Installation">
Confirm that the `crewai[tools]` package is installed in your Python environment.
</Step>
<Step title="API Key Acquisition">
Acquire a [exa.ai](https://exa.ai/) API key by registering for a free account at [exa.ai](https://exa.ai/).
</Step>
<Step title="Environment Configuration">
Store your obtained API key in an environment variable named `EXA_API_KEY` to facilitate its use by the tool.
</Step>
</Steps>
Here's how to use the `EXASearchTool` within a CrewAI agent:
## Conclusion
```python
import os
from crewai import Agent, Task, Crew
from crewai_tools import EXASearchTool
By integrating the `EXASearchTool` into Python projects, users gain the ability to conduct real-time, relevant searches across the internet directly from their applications.
By adhering to the setup and usage guidelines provided, incorporating this tool into projects is streamlined and straightforward.
# Initialize the tool
exa_tool = EXASearchTool()
# Create an agent that uses the tool
researcher = Agent(
role='Research Analyst',
goal='Find the latest information on any topic',
backstory='An expert researcher who finds the most relevant and up-to-date information.',
tools=[exa_tool],
verbose=True
)
# Create a task for the agent
research_task = Task(
description='Find the top 3 recent breakthroughs in quantum computing.',
expected_output='A summary of the top 3 breakthroughs with source URLs.',
agent=researcher
)
# Form the crew and kick it off
crew = Crew(
agents=[researcher],
tasks=[research_task],
verbose=True
)
result = crew.kickoff()
print(result)
```
## Configuration Options
The `EXASearchTool` accepts the following parameters during initialization:
- `type` (str, optional): The search type to use. Defaults to `"auto"`. Options: `"auto"`, `"instant"`, `"fast"`, `"deep"`.
- `content` (bool, optional): Whether to include full page content in results. Defaults to `False`.
- `summary` (bool, optional): Whether to include AI-generated summaries of each result. Requires `content=True`. Defaults to `False`.
- `api_key` (str, optional): Your Exa API key. Falls back to the `EXA_API_KEY` environment variable if not provided.
- `base_url` (str, optional): Custom API server URL. Falls back to the `EXA_BASE_URL` environment variable if not provided.
When calling the tool (or when an agent invokes it), the following search parameters are available:
- `search_query` (str): **Required**. The search query string.
- `start_published_date` (str, optional): Filter results published after this date (ISO 8601 format, e.g. `"2024-01-01"`).
- `end_published_date` (str, optional): Filter results published before this date (ISO 8601 format).
- `include_domains` (list[str], optional): A list of domains to restrict the search to.
## Advanced Usage
You can configure the tool with custom parameters for richer results:
```python
# Get full page content with AI summaries
exa_tool = EXASearchTool(
content=True,
summary=True,
type="deep"
)
# Use it in an agent
agent = Agent(
role="Deep Researcher",
goal="Conduct thorough research with full content and summaries",
tools=[exa_tool]
)
```
## Features
- **Semantic Search**: Find results based on meaning, not just keywords
- **Full Content Retrieval**: Get the full text of web pages alongside search results
- **AI Summaries**: Get concise, AI-generated summaries of each result
- **Date Filtering**: Limit results to specific time periods with published date filters
- **Domain Filtering**: Restrict searches to specific domains

Binary file not shown.

After

Width:  |  Height:  |  Size: 356 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 317 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 67 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 69 KiB

View File

@@ -4,6 +4,29 @@ description: "CrewAI의 제품 업데이트, 개선 사항 및 버그 수정"
icon: "clock"
mode: "wide"
---
<Update label="2026년 3월 15일">
## v1.11.0rc1
[GitHub 릴리스 보기](https://github.com/crewAIInc/crewAI/releases/tag/1.11.0rc1)
## 변경 사항
### 기능
- Plus API 토큰 인증 추가
- 에서 계획 실행 패턴 구현
### 버그 수정
- 코드 인터프리터 샌드박스 탈출 문제 해결
### 문서
- v1.10.2rc2의 변경 로그 및 버전 업데이트
## 기여자
@Copilot, @greysonlalonde, @lorenzejay, @theCyberTech
</Update>
<Update label="2026년 3월 14일">
## v1.10.2rc2

View File

@@ -0,0 +1,39 @@
---
title: "OpenTelemetry 내보내기"
description: "CrewAI AMP 배포에서 자체 OpenTelemetry 수집기로 트레이스와 로그를 내보내기"
icon: "magnifying-glass-chart"
mode: "wide"
---
CrewAI AMP는 배포에서 OpenTelemetry **트레이스**와 **로그**를 자체 수집기로 직접 내보낼 수 있습니다. 이를 통해 기존 관측 가능성 스택을 사용하여 에이전트 성능을 모니터링하고, LLM 호출을 추적하고, 문제를 디버깅할 수 있습니다.
텔레메트리 데이터는 [OpenTelemetry GenAI 시맨틱 규칙](https://opentelemetry.io/docs/specs/semconv/gen-ai/)과 추가적인 CrewAI 전용 속성을 따릅니다.
## 사전 요구 사항
<CardGroup cols={2}>
<Card title="CrewAI AMP 계정" icon="users">
조직에 활성 CrewAI AMP 계정이 있어야 합니다.
</Card>
<Card title="OpenTelemetry 수집기" icon="server">
OpenTelemetry 호환 수집기 엔드포인트가 필요합니다 (예: 자체 OTel Collector, Datadog, Grafana 또는 OTLP 호환 백엔드).
</Card>
</CardGroup>
## 수집기 설정
1. CrewAI AMP에서 **Settings** > **OpenTelemetry Collectors**로 이동합니다.
2. **Add Collector**를 클릭합니다.
3. 통합 유형을 선택합니다 — **OpenTelemetry Traces** 또는 **OpenTelemetry Logs**.
4. 연결을 구성합니다:
- **Endpoint** — 수집기의 OTLP 엔드포인트 (예: `https://otel-collector.example.com:4317`).
- **Service Name** — 관측 가능성 플랫폼에서 이 서비스를 식별하기 위한 이름.
- **Custom Headers** *(선택 사항)* — 인증 또는 라우팅 헤더를 키-값 쌍으로 추가합니다.
- **Certificate** *(선택 사항)* — 수집기에서 TLS 인증서가 필요한 경우 제공합니다.
5. **Save**를 클릭합니다.
<Frame>![OpenTelemetry 수집기 구성](/images/crewai-otel-collector-config.png)</Frame>
<Tip>
여러 수집기를 추가할 수 있습니다 — 예를 들어, 트레이스용 하나와 로그용 하나를 추가하거나, 다른 목적을 위해 다른 백엔드로 전송할 수 있습니다.
</Tip>

View File

@@ -0,0 +1,136 @@
---
title: "커스텀 MCP 서버"
description: "공개 액세스, API 키 인증 또는 OAuth 2.0을 사용하여 자체 MCP 서버를 CrewAI AMP에 연결하세요"
icon: "plug"
mode: "wide"
---
CrewAI AMP는 [Model Context Protocol](https://modelcontextprotocol.io/)을 구현하는 모든 MCP 서버에 연결할 수 있습니다. 인증이 필요 없는 공개 서버, API 키 또는 Bearer 토큰으로 보호되는 서버, OAuth 2.0을 사용하는 서버를 연결할 수 있습니다.
## 사전 요구사항
<CardGroup cols={2}>
<Card title="CrewAI AMP 계정" icon="user">
활성화된 [CrewAI AMP](https://app.crewai.com) 계정이 필요합니다.
</Card>
<Card title="MCP 서버 URL" icon="link">
연결하려는 MCP 서버의 URL입니다. 서버는 인터넷에서 접근 가능해야 하며 Streamable HTTP 전송을 지원해야 합니다.
</Card>
</CardGroup>
## 커스텀 MCP 서버 추가하기
<Steps>
<Step title="Tools & Integrations 열기">
CrewAI AMP 왼쪽 사이드바에서 **Tools & Integrations**로 이동한 후 **Connections** 탭을 선택합니다.
</Step>
<Step title="커스텀 MCP 서버 추가 시작">
**Add Custom MCP Server** 버튼을 클릭합니다. 구성 양식이 포함된 대화 상자가 나타납니다.
</Step>
<Step title="기본 정보 입력">
- **Name** (필수): MCP 서버의 설명적 이름 (예: "내부 도구 서버").
- **Description**: 이 MCP 서버가 제공하는 기능에 대한 선택적 요약.
- **Server URL** (필수): MCP 서버 엔드포인트의 전체 URL (예: `https://my-server.example.com/mcp`).
</Step>
<Step title="인증 방법 선택">
MCP 서버의 보안 방식에 따라 세 가지 인증 방법 중 하나를 선택합니다. 각 방법에 대한 자세한 내용은 아래 섹션을 참조하세요.
</Step>
<Step title="커스텀 헤더 추가 (선택사항)">
MCP 서버가 모든 요청에 추가 헤더를 요구하는 경우 (예: 테넌트 식별자 또는 라우팅 헤더), **+ Add Header**를 클릭하고 헤더 이름과 값을 입력합니다. 여러 커스텀 헤더를 추가할 수 있습니다.
</Step>
<Step title="연결 생성">
**Create MCP Server**를 클릭하여 연결을 저장합니다. 커스텀 MCP 서버가 Connections 목록에 나타나고 해당 도구를 crew에서 사용할 수 있게 됩니다.
</Step>
</Steps>
## 인증 방법
### 인증 없음
MCP 서버가 공개적으로 접근 가능하고 자격 증명이 필요 없을 때 이 옵션을 선택합니다. 오픈 소스 서버나 VPN 뒤에서 실행되는 내부 서버에 일반적입니다.
### 인증 토큰
MCP 서버가 API 키 또는 Bearer 토큰으로 보호되는 경우 이 방법을 사용합니다.
<Frame>
<img src="/images/enterprise/custom-mcp-auth-token.png" alt="인증 토큰을 사용하는 커스텀 MCP 서버" />
</Frame>
| 필드 | 필수 | 설명 |
|------|------|------|
| **Header Name** | 예 | 토큰을 전달하는 HTTP 헤더 이름 (예: `X-API-Key`, `Authorization`). |
| **Value** | 예 | API 키 또는 Bearer 토큰. |
| **Add to** | 아니오 | 자격 증명을 첨부할 위치 — **Header** (기본값) 또는 **Query parameter**. |
<Tip>
서버가 `Authorization` 헤더에 `Bearer` 토큰을 예상하는 경우, Header Name을 `Authorization`으로, Value를 `Bearer <토큰>`으로 설정하세요.
</Tip>
### OAuth 2.0
OAuth 2.0 인증이 필요한 MCP 서버에 이 방법을 사용합니다. CrewAI가 토큰 갱신을 포함한 전체 OAuth 흐름을 처리합니다.
<Frame>
<img src="/images/enterprise/custom-mcp-oauth.png" alt="OAuth 2.0을 사용하는 커스텀 MCP 서버" />
</Frame>
| 필드 | 필수 | 설명 |
|------|------|------|
| **Redirect URI** | — | 자동으로 채워지며 읽기 전용입니다. 이 URI를 복사하여 OAuth 제공자에 승인된 리디렉션 URI로 등록하세요. |
| **Authorization Endpoint** | 예 | 사용자가 접근을 승인하기 위해 이동하는 URL (예: `https://auth.example.com/oauth/authorize`). |
| **Token Endpoint** | 예 | 인증 코드를 액세스 토큰으로 교환하는 데 사용되는 URL (예: `https://auth.example.com/oauth/token`). |
| **Client ID** | 예 | OAuth 제공자가 발급한 클라이언트 ID. |
| **Client Secret** | 아니오 | OAuth 클라이언트 시크릿. PKCE를 사용하는 공개 클라이언트에는 필요하지 않습니다. |
| **Scopes** | 아니오 | 요청할 스코프의 공백으로 구분된 목록 (예: `read write`). |
| **Token Auth Method** | 아니오 | 토큰 교환 시 클라이언트 자격 증명을 보내는 방법 — **Standard (POST body)** 또는 **Basic Auth (header)**. 기본값은 Standard입니다. |
| **PKCE Supported** | 아니오 | OAuth 제공자가 Proof Key for Code Exchange를 지원하는 경우 활성화합니다. 보안 강화를 위해 권장됩니다. |
<Info>
**Discover OAuth Config**: OAuth 제공자가 OpenID Connect Discovery를 지원하는 경우, **Discover OAuth Config** 링크를 클릭하여 제공자의 `/.well-known/openid-configuration` URL에서 인증 및 토큰 엔드포인트를 자동으로 채울 수 있습니다.
</Info>
#### OAuth 2.0 단계별 설정
<Steps>
<Step title="리디렉션 URI 등록">
양식에 표시된 **Redirect URI**를 복사하여 OAuth 제공자의 애플리케이션 설정에서 승인된 리디렉션 URI로 추가합니다.
</Step>
<Step title="엔드포인트 및 자격 증명 입력">
**Authorization Endpoint**, **Token Endpoint**, **Client ID**를 입력하고, 선택적으로 **Client Secret**과 **Scopes**를 입력합니다.
</Step>
<Step title="토큰 교환 방법 구성">
적절한 **Token Auth Method**를 선택합니다. 대부분의 제공자는 기본값인 **Standard (POST body)**를 사용합니다. 일부 오래된 제공자는 **Basic Auth (header)**를 요구합니다.
</Step>
<Step title="PKCE 활성화 (권장)">
제공자가 지원하는 경우 **PKCE Supported**를 체크합니다. PKCE는 인증 코드 흐름에 추가 보안 계층을 제공하며 모든 새 통합에 권장됩니다.
</Step>
<Step title="생성 및 인증">
**Create MCP Server**를 클릭합니다. OAuth 제공자로 리디렉션되어 접근을 인증합니다. 인증 완료 후 CrewAI가 토큰을 저장하고 필요에 따라 자동으로 갱신합니다.
</Step>
</Steps>
## 커스텀 MCP 서버 사용하기
연결이 완료되면 커스텀 MCP 서버의 도구가 **Tools & Integrations** 페이지에서 기본 제공 연결과 함께 표시됩니다. 다음을 수행할 수 있습니다:
- 다른 CrewAI 도구와 마찬가지로 crew의 **에이전트에 도구를 할당**합니다.
- **가시성을 관리**하여 어떤 팀원이 서버를 사용할 수 있는지 제어합니다.
- Connections 목록에서 언제든지 연결을 **편집하거나 제거**합니다.
<Warning>
MCP 서버에 접근할 수 없거나 자격 증명이 만료되면 해당 서버를 사용하는 도구 호출이 실패합니다. 서버 URL이 안정적이고 자격 증명이 최신 상태인지 확인하세요.
</Warning>
<Card title="도움이 필요하신가요?" icon="headset" href="mailto:support@crewai.com">
커스텀 MCP 서버 구성 또는 문제 해결에 대한 도움이 필요하면 지원팀에 문의하세요.
</Card>

View File

@@ -62,22 +62,22 @@ agent = Agent(
"https://mcp.exa.ai/mcp?api_key=your_key#web_search_exa"
```
### CrewAI AMP 마켓플레이스
### 연결된 MCP 통합
CrewAI AMP 마켓플레이스의 도구에 액세스하세요:
CrewAI 카탈로그에서 MCP 서버를 연결하거나 직접 가져올 수 있습니다. 계정에 연결한 후 슬러그로 참조하세요:
```python
# 모든 도구가 포함된 전체 서비스
"crewai-amp:financial-data"
# 모든 도구가 포함된 연결된 MCP
"snowflake"
# AMP 서비스의 특정 도구
"crewai-amp:research-tools#pubmed_search"
# 연결된 MCP의 특정 도구
"stripe#list_invoices"
# 다중 AMP 서비스
# 여러 연결된 MCP
mcps=[
"crewai-amp:weather-insights",
"crewai-amp:market-analysis",
"crewai-amp:social-media-monitoring"
"snowflake",
"stripe",
"github"
]
```
@@ -99,10 +99,10 @@ multi_source_agent = Agent(
"https://mcp.exa.ai/mcp?api_key=your_exa_key&profile=research",
"https://weather.api.com/mcp#get_current_conditions",
# CrewAI AMP 마켓플레이스
"crewai-amp:financial-insights",
"crewai-amp:academic-research#pubmed_search",
"crewai-amp:market-intelligence#competitor_analysis"
# 카탈로그에서 연결된 MCP
"snowflake",
"stripe#list_invoices",
"github#search_repositories"
]
)
@@ -154,7 +154,7 @@ agent = Agent(
"https://reliable-server.com/mcp", # 작동할 것
"https://unreachable-server.com/mcp", # 우아하게 건너뛸 것
"https://slow-server.com/mcp", # 우아하게 타임아웃될 것
"crewai-amp:working-service" # 작동할 것
"snowflake" # 카탈로그에서 연결된 MCP
]
)
# 에이전트는 작동하는 서버의 도구를 사용하고 실패한 서버에 대한 경고를 로그에 남깁니다
@@ -229,6 +229,6 @@ agent = Agent(
mcps=[
"https://primary-api.com/mcp", # 주요 선택
"https://backup-api.com/mcp", # 백업 옵션
"crewai-amp:reliable-service" # AMP 폴백
"snowflake" # 연결된 MCP 폴백
]
```

View File

@@ -25,8 +25,8 @@ agent = Agent(
mcps=[
"https://mcp.exa.ai/mcp?api_key=your_key", # 외부 MCP 서버
"https://api.weather.com/mcp#get_forecast", # 서버의 특정 도구
"crewai-amp:financial-data", # CrewAI AMP 마켓플레이스
"crewai-amp:research-tools#pubmed_search" # 특정 AMP 도구
"snowflake", # 카탈로그에서 연결된 MCP
"stripe#list_invoices" # 연결된 MCP의 특정 도구
]
)
# MCP 도구들이 이제 자동으로 에이전트에서 사용 가능합니다!

View File

@@ -4,6 +4,29 @@ description: "Atualizações de produto, melhorias e correções do CrewAI"
icon: "clock"
mode: "wide"
---
<Update label="15 mar 2026">
## v1.11.0rc1
[Ver release no GitHub](https://github.com/crewAIInc/crewAI/releases/tag/1.11.0rc1)
## O que Mudou
### Funcionalidades
- Adicionar autenticação de token da API Plus
- Implementar padrão de execução de plano
### Correções de Bugs
- Resolver problema de escape do sandbox do interpretador de código
### Documentação
- Atualizar changelog e versão para v1.10.2rc2
## Contribuidores
@Copilot, @greysonlalonde, @lorenzejay, @theCyberTech
</Update>
<Update label="14 mar 2026">
## v1.10.2rc2

View File

@@ -0,0 +1,39 @@
---
title: "Exportação OpenTelemetry"
description: "Exporte traces e logs das suas implantações CrewAI AMP para seu próprio coletor OpenTelemetry"
icon: "magnifying-glass-chart"
mode: "wide"
---
O CrewAI AMP pode exportar **traces** e **logs** do OpenTelemetry das suas implantações diretamente para seu próprio coletor. Isso permite que você monitore o desempenho dos agentes, rastreie chamadas de LLM e depure problemas usando sua stack de observabilidade existente.
Os dados de telemetria seguem as [convenções semânticas GenAI do OpenTelemetry](https://opentelemetry.io/docs/specs/semconv/gen-ai/) além de atributos adicionais específicos do CrewAI.
## Pré-requisitos
<CardGroup cols={2}>
<Card title="Conta CrewAI AMP" icon="users">
Sua organização deve ter uma conta CrewAI AMP ativa.
</Card>
<Card title="Coletor OpenTelemetry" icon="server">
Você precisa de um endpoint de coletor compatível com OpenTelemetry (por exemplo, seu próprio OTel Collector, Datadog, Grafana ou qualquer backend compatível com OTLP).
</Card>
</CardGroup>
## Configurando um coletor
1. No CrewAI AMP, vá para **Settings** > **OpenTelemetry Collectors**.
2. Clique em **Add Collector**.
3. Selecione um tipo de integração — **OpenTelemetry Traces** ou **OpenTelemetry Logs**.
4. Configure a conexão:
- **Endpoint** — O endpoint OTLP do seu coletor (por exemplo, `https://otel-collector.example.com:4317`).
- **Service Name** — Um nome para identificar este serviço na sua plataforma de observabilidade.
- **Custom Headers** *(opcional)* — Adicione headers de autenticação ou roteamento como pares chave-valor.
- **Certificate** *(opcional)* — Forneça um certificado TLS se o seu coletor exigir um.
5. Clique em **Save**.
<Frame>![Configuração do Coletor OpenTelemetry](/images/crewai-otel-collector-config.png)</Frame>
<Tip>
Você pode adicionar múltiplos coletores — por exemplo, um para traces e outro para logs, ou enviar para diferentes backends para diferentes propósitos.
</Tip>

View File

@@ -0,0 +1,136 @@
---
title: "Servidores MCP Personalizados"
description: "Conecte seus próprios servidores MCP ao CrewAI AMP com acesso público, autenticação por token ou OAuth 2.0"
icon: "plug"
mode: "wide"
---
O CrewAI AMP suporta a conexão com qualquer servidor MCP que implemente o [Model Context Protocol](https://modelcontextprotocol.io/). Você pode conectar servidores públicos que não exigem autenticação, servidores protegidos por chave de API ou token bearer, e servidores que utilizam OAuth 2.0 para acesso delegado seguro.
## Pré-requisitos
<CardGroup cols={2}>
<Card title="Conta CrewAI AMP" icon="user">
Você precisa de uma conta ativa no [CrewAI AMP](https://app.crewai.com).
</Card>
<Card title="URL do Servidor MCP" icon="link">
A URL do servidor MCP que você deseja conectar. O servidor deve ser acessível pela internet e suportar transporte Streamable HTTP.
</Card>
</CardGroup>
## Adicionando um Servidor MCP Personalizado
<Steps>
<Step title="Acesse Tools & Integrations">
Navegue até **Tools & Integrations** no menu lateral esquerdo do CrewAI AMP e selecione a aba **Connections**.
</Step>
<Step title="Inicie a adição de um Servidor MCP Personalizado">
Clique no botão **Add Custom MCP Server**. Um diálogo aparecerá com o formulário de configuração.
</Step>
<Step title="Preencha as informações básicas">
- **Name** (obrigatório): Um nome descritivo para seu servidor MCP (ex.: "Meu Servidor de Ferramentas Internas").
- **Description**: Um resumo opcional do que este servidor MCP fornece.
- **Server URL** (obrigatório): A URL completa do endpoint do seu servidor MCP (ex.: `https://my-server.example.com/mcp`).
</Step>
<Step title="Escolha um método de autenticação">
Selecione um dos três métodos de autenticação disponíveis com base em como seu servidor MCP está protegido. Veja as seções abaixo para detalhes sobre cada método.
</Step>
<Step title="Adicione headers personalizados (opcional)">
Se seu servidor MCP requer headers adicionais em cada requisição (ex.: identificadores de tenant ou headers de roteamento), clique em **+ Add Header** e forneça o nome e valor do header. Você pode adicionar múltiplos headers personalizados.
</Step>
<Step title="Crie a conexão">
Clique em **Create MCP Server** para salvar a conexão. Seu servidor MCP personalizado aparecerá na lista de Connections e suas ferramentas estarão disponíveis para uso nas suas crews.
</Step>
</Steps>
## Métodos de Autenticação
### Sem Autenticação
Escolha esta opção quando seu servidor MCP é publicamente acessível e não requer nenhuma credencial. Isso é comum para servidores open-source ou servidores internos rodando atrás de uma VPN.
### Token de Autenticação
Use este método quando seu servidor MCP é protegido por uma chave de API ou token bearer.
<Frame>
<img src="/images/enterprise/custom-mcp-auth-token.png" alt="Servidor MCP Personalizado com Token de Autenticação" />
</Frame>
| Campo | Obrigatório | Descrição |
|-------|-------------|-----------|
| **Header Name** | Sim | O nome do header HTTP que carrega o token (ex.: `X-API-Key`, `Authorization`). |
| **Value** | Sim | Sua chave de API ou token bearer. |
| **Add to** | Não | Onde anexar a credencial — **Header** (padrão) ou **Query parameter**. |
<Tip>
Se seu servidor espera um token `Bearer` no header `Authorization`, defina o Header Name como `Authorization` e o Value como `Bearer <seu-token>`.
</Tip>
### OAuth 2.0
Use este método para servidores MCP que requerem autorização OAuth 2.0. O CrewAI gerenciará todo o fluxo OAuth, incluindo a renovação de tokens.
<Frame>
<img src="/images/enterprise/custom-mcp-oauth.png" alt="Servidor MCP Personalizado com OAuth 2.0" />
</Frame>
| Campo | Obrigatório | Descrição |
|-------|-------------|-----------|
| **Redirect URI** | — | Preenchido automaticamente e somente leitura. Copie esta URI e registre-a como URI de redirecionamento autorizada no seu provedor OAuth. |
| **Authorization Endpoint** | Sim | A URL para onde os usuários são enviados para autorizar o acesso (ex.: `https://auth.example.com/oauth/authorize`). |
| **Token Endpoint** | Sim | A URL usada para trocar o código de autorização por um token de acesso (ex.: `https://auth.example.com/oauth/token`). |
| **Client ID** | Sim | O Client ID OAuth emitido pelo seu provedor. |
| **Client Secret** | Não | O Client Secret OAuth. Não é necessário para clientes públicos usando PKCE. |
| **Scopes** | Não | Lista de escopos separados por espaço a solicitar (ex.: `read write`). |
| **Token Auth Method** | Não | Como as credenciais do cliente são enviadas ao trocar tokens — **Standard (POST body)** ou **Basic Auth (header)**. Padrão é Standard. |
| **PKCE Supported** | Não | Ative se seu provedor OAuth suporta Proof Key for Code Exchange. Recomendado para maior segurança. |
<Info>
**Discover OAuth Config**: Se seu provedor OAuth suporta OpenID Connect Discovery, clique no link **Discover OAuth Config** para preencher automaticamente os endpoints de autorização e token a partir da URL `/.well-known/openid-configuration` do provedor.
</Info>
#### Configurando OAuth 2.0 Passo a Passo
<Steps>
<Step title="Registre a URI de redirecionamento">
Copie a **Redirect URI** exibida no formulário e adicione-a como URI de redirecionamento autorizada nas configurações do seu provedor OAuth.
</Step>
<Step title="Insira os endpoints e credenciais">
Preencha o **Authorization Endpoint**, **Token Endpoint**, **Client ID** e, opcionalmente, o **Client Secret** e **Scopes**.
</Step>
<Step title="Configure o método de troca de tokens">
Selecione o **Token Auth Method** apropriado. A maioria dos provedores usa o padrão **Standard (POST body)**. Alguns provedores mais antigos requerem **Basic Auth (header)**.
</Step>
<Step title="Ative o PKCE (recomendado)">
Marque **PKCE Supported** se seu provedor suporta. O PKCE adiciona uma camada extra de segurança ao fluxo de código de autorização e é recomendado para todas as novas integrações.
</Step>
<Step title="Crie e autorize">
Clique em **Create MCP Server**. Você será redirecionado ao seu provedor OAuth para autorizar o acesso. Uma vez autorizado, o CrewAI armazenará os tokens e os renovará automaticamente conforme necessário.
</Step>
</Steps>
## Usando Seu Servidor MCP Personalizado
Uma vez conectado, as ferramentas do seu servidor MCP personalizado aparecem junto com as conexões integradas na página **Tools & Integrations**. Você pode:
- **Atribuir ferramentas a agentes** nas suas crews, assim como qualquer outra ferramenta CrewAI.
- **Gerenciar visibilidade** para controlar quais membros da equipe podem usar o servidor.
- **Editar ou remover** a conexão a qualquer momento na lista de Connections.
<Warning>
Se seu servidor MCP ficar inacessível ou as credenciais expirarem, as chamadas de ferramentas usando esse servidor falharão. Certifique-se de que a URL do servidor seja estável e as credenciais estejam atualizadas.
</Warning>
<Card title="Precisa de Ajuda?" icon="headset" href="mailto:support@crewai.com">
Entre em contato com nossa equipe de suporte para assistência com configuração ou resolução de problemas de servidores MCP personalizados.
</Card>

View File

@@ -62,22 +62,22 @@ Use a sintaxe `#` para selecionar ferramentas específicas de um servidor:
"https://mcp.exa.ai/mcp?api_key=sua_chave#web_search_exa"
```
### Marketplace CrewAI AMP
### Integrações MCP Conectadas
Acesse ferramentas do marketplace CrewAI AMP:
Conecte servidores MCP do catálogo CrewAI ou traga os seus próprios. Uma vez conectados em sua conta, referencie-os pelo slug:
```python
# Serviço completo com todas as ferramentas
"crewai-amp:financial-data"
# MCP conectado com todas as ferramentas
"snowflake"
# Ferramenta específica do serviço AMP
"crewai-amp:research-tools#pubmed_search"
# Ferramenta específica de um MCP conectado
"stripe#list_invoices"
# Múltiplos serviços AMP
# Múltiplos MCPs conectados
mcps=[
"crewai-amp:weather-insights",
"crewai-amp:market-analysis",
"crewai-amp:social-media-monitoring"
"snowflake",
"stripe",
"github"
]
```
@@ -99,10 +99,10 @@ agente_multi_fonte = Agent(
"https://mcp.exa.ai/mcp?api_key=sua_chave_exa&profile=pesquisa",
"https://weather.api.com/mcp#get_current_conditions",
# Marketplace CrewAI AMP
"crewai-amp:financial-insights",
"crewai-amp:academic-research#pubmed_search",
"crewai-amp:market-intelligence#competitor_analysis"
# MCPs conectados do catálogo
"snowflake",
"stripe#list_invoices",
"github#search_repositories"
]
)
@@ -154,7 +154,7 @@ agente = Agent(
"https://servidor-confiavel.com/mcp", # Vai funcionar
"https://servidor-inalcancavel.com/mcp", # Será ignorado graciosamente
"https://servidor-lento.com/mcp", # Timeout gracioso
"crewai-amp:servico-funcionando" # Vai funcionar
"snowflake" # MCP conectado do catálogo
]
)
# O agente usará ferramentas de servidores funcionais e registrará avisos para os que falharem
@@ -229,6 +229,6 @@ agente = Agent(
mcps=[
"https://api-principal.com/mcp", # Escolha principal
"https://api-backup.com/mcp", # Opção de backup
"crewai-amp:servico-confiavel" # Fallback AMP
"snowflake" # Fallback MCP conectado
]
```

View File

@@ -25,8 +25,8 @@ agent = Agent(
mcps=[
"https://mcp.exa.ai/mcp?api_key=sua_chave", # Servidor MCP externo
"https://api.weather.com/mcp#get_forecast", # Ferramenta específica do servidor
"crewai-amp:financial-data", # Marketplace CrewAI AMP
"crewai-amp:research-tools#pubmed_search" # Ferramenta AMP específica
"snowflake", # MCP conectado do catálogo
"stripe#list_invoices" # Ferramenta específica de MCP conectado
]
)
# Ferramentas MCP agora estão automaticamente disponíveis para seu agente!

View File

@@ -1,15 +0,0 @@
# crewai-cli
CLI for CrewAI - scaffold, run, deploy and manage AI agent crews without installing the full framework.
## Installation
```bash
pip install crewai-cli
```
Or install alongside the full framework:
```bash
pip install crewai[cli]
```

View File

@@ -1,39 +0,0 @@
[project]
name = "crewai-cli"
version = "1.10.0"
description = "CLI for CrewAI - scaffold, run, deploy and manage AI agent crews without installing the full framework."
readme = "README.md"
authors = [
{ name = "Joao Moura", email = "joao@crewai.com" }
]
requires-python = ">=3.10, <3.14"
dependencies = [
"click~=8.1.7",
"pydantic~=2.11.9",
"pydantic-settings~=2.10.1",
"appdirs~=1.4.4",
"httpx~=0.28.1",
"pyjwt>=2.9.0,<3",
"rich>=13.7.1",
"tomli~=2.0.2",
"tomli-w~=1.1.0",
"packaging>=23.0",
"python-dotenv~=1.1.1",
"uv~=0.9.13",
"portalocker~=2.7.0",
]
[project.urls]
Homepage = "https://crewai.com"
Documentation = "https://docs.crewai.com"
Repository = "https://github.com/crewAIInc/crewAI"
[project.scripts]
crewai = "crewai_cli.cli:crewai"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/crewai_cli"]

View File

@@ -1 +0,0 @@
__version__ = "1.10.0"

View File

@@ -1,4 +0,0 @@
from crewai_cli.authentication.main import AuthenticationCommand
__all__ = ["AuthenticationCommand"]

View File

@@ -1,23 +0,0 @@
"""Wrapper for the crew chat command.
Delegates to ``crewai.utilities.crew_chat.run_chat`` when the full crewai
package is installed, otherwise prints a helpful error message.
"""
from __future__ import annotations
import click
def run_chat() -> None:
try:
from crewai.utilities.crew_chat import run_chat as _run_chat
except ImportError:
click.secho(
"The 'chat' command requires the full crewai package.\n"
"Install it with: pip install crewai",
fg="red",
)
raise SystemExit(1) from None
_run_chat()

View File

@@ -1,210 +0,0 @@
import os
from typing import Any
from urllib.parse import urljoin
import httpx
from crewai_cli.config import Settings
from crewai_cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
from crewai_cli.version import get_crewai_version
class PlusAPI:
"""
This class exposes methods for working with the CrewAI+ API.
"""
TOOLS_RESOURCE = "/crewai_plus/api/v1/tools"
ORGANIZATIONS_RESOURCE = "/crewai_plus/api/v1/me/organizations"
CREWS_RESOURCE = "/crewai_plus/api/v1/crews"
AGENTS_RESOURCE = "/crewai_plus/api/v1/agents"
TRACING_RESOURCE = "/crewai_plus/api/v1/tracing"
EPHEMERAL_TRACING_RESOURCE = "/crewai_plus/api/v1/tracing/ephemeral"
INTEGRATIONS_RESOURCE = "/crewai_plus/api/v1/integrations"
def __init__(self, api_key: str) -> None:
self.api_key = api_key
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"User-Agent": f"CrewAI-CLI/{get_crewai_version()}",
"X-Crewai-Version": get_crewai_version(),
}
settings = Settings()
if settings.org_uuid:
self.headers["X-Crewai-Organization-Id"] = settings.org_uuid
self.base_url = (
os.getenv("CREWAI_PLUS_URL")
or str(settings.enterprise_base_url)
or DEFAULT_CREWAI_ENTERPRISE_URL
)
def _make_request(
self, method: str, endpoint: str, **kwargs: Any
) -> httpx.Response:
url = urljoin(self.base_url, endpoint)
verify = kwargs.pop("verify", True)
with httpx.Client(trust_env=False, verify=verify) as client:
return client.request(method, url, headers=self.headers, **kwargs)
def login_to_tool_repository(self) -> httpx.Response:
return self._make_request("POST", f"{self.TOOLS_RESOURCE}/login")
def get_tool(self, handle: str) -> httpx.Response:
return self._make_request("GET", f"{self.TOOLS_RESOURCE}/{handle}")
async def get_agent(self, handle: str) -> httpx.Response:
url = urljoin(self.base_url, f"{self.AGENTS_RESOURCE}/{handle}")
async with httpx.AsyncClient() as client:
return await client.get(url, headers=self.headers)
def publish_tool(
self,
handle: str,
is_public: bool,
version: str,
description: str | None,
encoded_file: str,
available_exports: list[dict[str, Any]] | None = None,
) -> httpx.Response:
params = {
"handle": handle,
"public": is_public,
"version": version,
"file": encoded_file,
"description": description,
"available_exports": available_exports,
}
return self._make_request("POST", f"{self.TOOLS_RESOURCE}", json=params)
def deploy_by_name(self, project_name: str) -> httpx.Response:
return self._make_request(
"POST", f"{self.CREWS_RESOURCE}/by-name/{project_name}/deploy"
)
def deploy_by_uuid(self, uuid: str) -> httpx.Response:
return self._make_request("POST", f"{self.CREWS_RESOURCE}/{uuid}/deploy")
def crew_status_by_name(self, project_name: str) -> httpx.Response:
return self._make_request(
"GET", f"{self.CREWS_RESOURCE}/by-name/{project_name}/status"
)
def crew_status_by_uuid(self, uuid: str) -> httpx.Response:
return self._make_request("GET", f"{self.CREWS_RESOURCE}/{uuid}/status")
def crew_by_name(
self, project_name: str, log_type: str = "deployment"
) -> httpx.Response:
return self._make_request(
"GET", f"{self.CREWS_RESOURCE}/by-name/{project_name}/logs/{log_type}"
)
def crew_by_uuid(self, uuid: str, log_type: str = "deployment") -> httpx.Response:
return self._make_request(
"GET", f"{self.CREWS_RESOURCE}/{uuid}/logs/{log_type}"
)
def delete_crew_by_name(self, project_name: str) -> httpx.Response:
return self._make_request(
"DELETE", f"{self.CREWS_RESOURCE}/by-name/{project_name}"
)
def delete_crew_by_uuid(self, uuid: str) -> httpx.Response:
return self._make_request("DELETE", f"{self.CREWS_RESOURCE}/{uuid}")
def list_crews(self) -> httpx.Response:
return self._make_request("GET", self.CREWS_RESOURCE)
def create_crew(self, payload: dict[str, Any]) -> httpx.Response:
return self._make_request("POST", self.CREWS_RESOURCE, json=payload)
def get_organizations(self) -> httpx.Response:
return self._make_request("GET", self.ORGANIZATIONS_RESOURCE)
def initialize_trace_batch(self, payload: dict[str, Any]) -> httpx.Response:
return self._make_request(
"POST",
f"{self.TRACING_RESOURCE}/batches",
json=payload,
timeout=30,
)
def initialize_ephemeral_trace_batch(
self, payload: dict[str, Any]
) -> httpx.Response:
return self._make_request(
"POST",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches",
json=payload,
)
def send_trace_events(
self, trace_batch_id: str, payload: dict[str, Any]
) -> httpx.Response:
return self._make_request(
"POST",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
timeout=30,
)
def send_ephemeral_trace_events(
self, trace_batch_id: str, payload: dict[str, Any]
) -> httpx.Response:
return self._make_request(
"POST",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/events",
json=payload,
timeout=30,
)
def finalize_trace_batch(
self, trace_batch_id: str, payload: dict[str, Any]
) -> httpx.Response:
return self._make_request(
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
timeout=30,
)
def finalize_ephemeral_trace_batch(
self, trace_batch_id: str, payload: dict[str, Any]
) -> httpx.Response:
return self._make_request(
"PATCH",
f"{self.EPHEMERAL_TRACING_RESOURCE}/batches/{trace_batch_id}/finalize",
json=payload,
timeout=30,
)
def mark_trace_batch_as_failed(
self, trace_batch_id: str, error_message: str
) -> httpx.Response:
return self._make_request(
"PATCH",
f"{self.TRACING_RESOURCE}/batches/{trace_batch_id}",
json={"status": "failed", "failure_reason": error_message},
timeout=30,
)
def get_mcp_configs(self, slugs: list[str]) -> httpx.Response:
"""Get MCP server configurations for the given slugs."""
return self._make_request(
"GET",
f"{self.INTEGRATIONS_RESOURCE}/mcp_configs",
params={"slugs": ",".join(slugs)},
timeout=30,
)
def get_triggers(self) -> httpx.Response:
"""Get all available triggers from integrations."""
return self._make_request("GET", f"{self.INTEGRATIONS_RESOURCE}/apps")
def get_trigger_payload(self, app_slug: str, trigger_slug: str) -> httpx.Response:
"""Get sample payload for a specific trigger."""
return self._make_request(
"GET", f"{self.INTEGRATIONS_RESOURCE}/{app_slug}/{trigger_slug}/payload"
)

View File

@@ -1,31 +0,0 @@
"""Wrapper for the reset-memories command.
Delegates to ``crewai.utilities.reset_memories`` when the full crewai
package is installed, otherwise prints a helpful error message.
"""
from __future__ import annotations
import click
def reset_memories_command(
memory: bool,
knowledge: bool,
agent_knowledge: bool,
kickoff_outputs: bool,
all: bool,
) -> None:
try:
from crewai.utilities.reset_memories import (
reset_memories_command as _reset,
)
except ImportError:
click.secho(
"The 'reset-memories' command requires the full crewai package.\n"
"Install it with: pip install crewai",
fg="red",
)
raise SystemExit(1) from None
_reset(memory, knowledge, agent_knowledge, kickoff_outputs, all)

View File

@@ -1,54 +0,0 @@
"""Lightweight SQLite reader for kickoff task outputs.
Only used by the ``crewai log-tasks-outputs`` CLI command. Depends solely on
the standard library + *appdirs* so crewai-cli can read stored outputs without
importing the full crewai framework.
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
import sqlite3
from typing import Any
from crewai_cli.user_data import _db_storage_path
logger = logging.getLogger(__name__)
def load_task_outputs(db_path: str | None = None) -> list[dict[str, Any]]:
"""Return all rows from the kickoff task outputs database."""
if db_path is None:
db_path = str(Path(_db_storage_path()) / "latest_kickoff_task_outputs.db")
if not Path(db_path).exists():
return []
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT *
FROM latest_kickoff_task_outputs
ORDER BY task_index
""")
rows = cursor.fetchall()
results: list[dict[str, Any]] = [
{
"task_id": row[0],
"expected_output": row[1],
"output": json.loads(row[2]),
"task_index": row[3],
"inputs": json.loads(row[4]),
"was_replayed": row[5],
"timestamp": row[6],
}
for row in rows
]
return results
except sqlite3.Error as e:
logger.error("Failed to load task outputs: %s", e)
return []

View File

@@ -1,66 +0,0 @@
"""Standalone user-data helpers for the CLI package.
These mirror the functions in ``crewai.events.listeners.tracing.utils`` but
depend only on the standard library + *appdirs* so that crewai-cli can work
without importing the full crewai framework.
"""
from __future__ import annotations
import json
import logging
import os
from pathlib import Path
from typing import Any, cast
import appdirs
logger = logging.getLogger(__name__)
def _get_project_directory_name() -> str:
return os.environ.get("CREWAI_STORAGE_DIR", Path.cwd().name)
def _db_storage_path() -> str:
app_name = _get_project_directory_name()
app_author = "CrewAI"
data_dir = Path(appdirs.user_data_dir(app_name, app_author))
data_dir.mkdir(parents=True, exist_ok=True)
return str(data_dir)
def _user_data_file() -> Path:
base = Path(_db_storage_path())
base.mkdir(parents=True, exist_ok=True)
return base / ".crewai_user.json"
def _load_user_data() -> dict[str, Any]:
p = _user_data_file()
if p.exists():
try:
return cast(dict[str, Any], json.loads(p.read_text()))
except (json.JSONDecodeError, OSError, PermissionError) as e:
logger.warning("Failed to load user data: %s", e)
return {}
def _save_user_data(data: dict[str, Any]) -> None:
try:
p = _user_data_file()
p.write_text(json.dumps(data, indent=2))
except (OSError, PermissionError) as e:
logger.warning("Failed to save user data: %s", e)
def is_tracing_enabled() -> bool:
"""Check if tracing is enabled (mirrors crewai core logic)."""
data = _load_user_data()
if (
data.get("first_execution_done", False)
and data.get("trace_consent", False) is False
):
return False
return os.getenv("CREWAI_TRACING_ENABLED", "false").lower() == "true"

View File

@@ -1,369 +0,0 @@
from __future__ import annotations
from functools import reduce
from inspect import getmro, isclass
import os
from pathlib import Path
import shutil
import sys
from typing import Any, cast
import click
from rich.console import Console
import tomli
from crewai_cli.config import Settings
from crewai_cli.constants import ENV_VARS
if sys.version_info >= (3, 11):
import tomllib
console = Console()
def copy_template(
src: Path, dst: Path, name: str, class_name: str, folder_name: str
) -> None:
"""Copy a file from src to dst."""
with open(src, "r") as file:
content = file.read()
content = content.replace("{{name}}", name)
content = content.replace("{{crew_name}}", class_name)
content = content.replace("{{folder_name}}", folder_name)
with open(dst, "w") as file:
file.write(content)
click.secho(f" - Created {dst}", fg="green")
def read_toml(file_path: str = "pyproject.toml") -> dict[str, Any]:
"""Read the content of a TOML file and return it as a dictionary."""
with open(file_path, "rb") as f:
return tomli.load(f)
def parse_toml(content: str) -> dict[str, Any]:
if sys.version_info >= (3, 11):
return tomllib.loads(content)
return tomli.loads(content)
def get_project_name(
pyproject_path: str = "pyproject.toml", require: bool = False
) -> str | None:
"""Get the project name from the pyproject.toml file."""
return _get_project_attribute(pyproject_path, ["project", "name"], require=require)
def get_project_version(
pyproject_path: str = "pyproject.toml", require: bool = False
) -> str | None:
"""Get the project version from the pyproject.toml file."""
return _get_project_attribute(
pyproject_path, ["project", "version"], require=require
)
def get_project_description(
pyproject_path: str = "pyproject.toml", require: bool = False
) -> str | None:
"""Get the project description from the pyproject.toml file."""
return _get_project_attribute(
pyproject_path, ["project", "description"], require=require
)
def _get_project_attribute(
pyproject_path: str, keys: list[str], require: bool
) -> Any | None:
"""Get an attribute from the pyproject.toml file."""
attribute = None
try:
with open(pyproject_path, "r") as f:
pyproject_content = parse_toml(f.read())
dependencies = (
_get_nested_value(pyproject_content, ["project", "dependencies"]) or []
)
if not any(True for dep in dependencies if "crewai" in dep):
raise Exception("crewai is not in the dependencies.")
attribute = _get_nested_value(pyproject_content, keys)
except FileNotFoundError:
console.print(f"Error: {pyproject_path} not found.", style="bold red")
except KeyError:
console.print(
f"Error: {pyproject_path} is not a valid pyproject.toml file.",
style="bold red",
)
except Exception as e:
if sys.version_info >= (3, 11) and isinstance(e, tomllib.TOMLDecodeError):
console.print(
f"Error: {pyproject_path} is not a valid TOML file.", style="bold red"
)
else:
console.print(
f"Error reading the pyproject.toml file: {e}", style="bold red"
)
if require and not attribute:
console.print(
f"Unable to read '{'.'.join(keys)}' in the pyproject.toml file. Please verify that the file exists and contains the specified attribute.",
style="bold red",
)
raise SystemExit
return attribute
def _get_nested_value(data: dict[str, Any], keys: list[str]) -> Any:
return reduce(dict.__getitem__, keys, data)
def fetch_and_json_env_file(env_file_path: str = ".env") -> dict[str, Any]:
"""Fetch the environment variables from a .env file and return them as a dictionary."""
try:
with open(env_file_path, "r") as f:
env_content = f.read()
env_dict = {}
for line in env_content.splitlines():
if line.strip() and not line.strip().startswith("#"):
key, value = line.split("=", 1)
env_dict[key.strip()] = value.strip()
return env_dict
except FileNotFoundError:
console.print(f"Error: {env_file_path} not found.", style="bold red")
except Exception as e:
console.print(f"Error reading the .env file: {e}", style="bold red")
return {}
def tree_copy(source: Path, destination: Path) -> None:
"""Copies the entire directory structure from the source to the destination."""
for item in os.listdir(source):
source_item = os.path.join(source, item)
destination_item = os.path.join(destination, item)
if os.path.isdir(source_item):
shutil.copytree(source_item, destination_item)
else:
shutil.copy2(source_item, destination_item)
def tree_find_and_replace(directory: Path, find: str, replace: str) -> None:
"""Recursively searches through a directory, replacing a target string in
both file contents and filenames with a specified replacement string.
"""
for path, dirs, files in os.walk(os.path.abspath(directory), topdown=False):
for filename in files:
filepath = os.path.join(path, filename)
with open(filepath, "r", encoding="utf-8", errors="ignore") as file:
contents = file.read()
with open(filepath, "w") as file:
file.write(contents.replace(find, replace))
if find in filename:
new_filename = filename.replace(find, replace)
new_filepath = os.path.join(path, new_filename)
os.rename(filepath, new_filepath)
for dirname in dirs:
if find in dirname:
new_dirname = dirname.replace(find, replace)
new_dirpath = os.path.join(path, new_dirname)
old_dirpath = os.path.join(path, dirname)
os.rename(old_dirpath, new_dirpath)
def load_env_vars(folder_path: Path) -> dict[str, Any]:
"""Loads environment variables from a .env file in the specified folder path."""
env_file_path = folder_path / ".env"
env_vars = {}
if env_file_path.exists():
with open(env_file_path, "r") as file:
for line in file:
key, _, value = line.strip().partition("=")
if key and value:
env_vars[key] = value
return env_vars
def update_env_vars(
env_vars: dict[str, Any], provider: str, model: str
) -> dict[str, Any] | None:
"""Updates environment variables with the API key for the selected provider and model."""
provider_config = cast(
list[str],
ENV_VARS.get(
provider,
[
click.prompt(
f"Enter the environment variable name for your {provider.capitalize()} API key",
type=str,
)
],
),
)
api_key_var = provider_config[0]
if api_key_var not in env_vars:
try:
env_vars[api_key_var] = click.prompt(
f"Enter your {provider.capitalize()} API key", type=str, hide_input=True
)
except click.exceptions.Abort:
click.secho("Operation aborted by the user.", fg="red")
return None
else:
click.secho(f"API key already exists for {provider.capitalize()}.", fg="yellow")
env_vars["MODEL"] = model
click.secho(f"Selected model: {model}", fg="green")
return env_vars
def write_env_file(folder_path: Path, env_vars: dict[str, Any]) -> None:
"""Writes environment variables to a .env file in the specified folder."""
env_file_path = folder_path / ".env"
with open(env_file_path, "w") as file:
for key, value in env_vars.items():
file.write(f"{key.upper()}={value}\n")
def is_valid_tool(obj: Any) -> bool:
"""Check if an object is a valid tool class.
Works without importing crewai by checking MRO class names.
Falls back to crewai's ``is_valid_tool`` when available.
"""
try:
from crewai.utilities.project_utils import is_valid_tool as _core_is_valid_tool
return _core_is_valid_tool(obj)
except ImportError:
pass
if isclass(obj):
try:
return any(base.__name__ == "BaseTool" for base in getmro(obj))
except (TypeError, AttributeError):
return False
return False
def extract_available_exports(dir_path: str = "src") -> list[dict[str, Any]]:
"""Extract available tool classes from the project's __init__.py files."""
try:
init_files = Path(dir_path).glob("**/__init__.py")
available_exports: list[dict[str, Any]] = []
for init_file in init_files:
tools = _load_tools_from_init(init_file)
available_exports.extend(tools)
if not available_exports:
_print_no_tools_warning()
raise SystemExit(1)
return available_exports
except SystemExit:
raise
except Exception as e:
console.print(f"[red]Error: Could not extract tool classes: {e!s}[/red]")
console.print(
"Please ensure your project contains valid tools (classes inheriting from BaseTool or functions with @tool decorator)."
)
raise SystemExit(1) from e
def _load_tools_from_init(init_file: Path) -> list[dict[str, Any]]:
"""Load and validate tools from a given __init__.py file."""
import importlib.util as _importlib_util
spec = _importlib_util.spec_from_file_location("temp_module", init_file)
if not spec or not spec.loader:
return []
module = _importlib_util.module_from_spec(spec)
sys.modules["temp_module"] = module
try:
spec.loader.exec_module(module)
if not hasattr(module, "__all__"):
console.print(
f"Warning: No __all__ defined in {init_file}",
style="bold yellow",
)
raise SystemExit(1)
return [
{"name": name}
for name in module.__all__
if hasattr(module, name) and is_valid_tool(getattr(module, name))
]
except SystemExit:
raise
except Exception as e:
console.print(f"[red]Warning: Could not load {init_file}: {e!s}[/red]")
raise SystemExit(1) from e
finally:
sys.modules.pop("temp_module", None)
def _print_no_tools_warning() -> None:
"""Display warning and usage instructions if no tools were found."""
console.print(
"\n[bold yellow]Warning: No valid tools were exposed in your __init__.py file![/bold yellow]"
)
console.print(
"Your __init__.py file must contain all classes that inherit from [bold]BaseTool[/bold] "
"or functions decorated with [bold]@tool[/bold]."
)
console.print(
"\nExample:\n[dim]# In your __init__.py file[/dim]\n"
"[green]__all__ = ['YourTool', 'your_tool_function'][/green]\n\n"
"[dim]# In your tool.py file[/dim]\n"
"[green]from crewai.tools import BaseTool, tool\n\n"
"# Tool class example\n"
"class YourTool(BaseTool):\n"
' name = "your_tool"\n'
' description = "Your tool description"\n'
" # ... rest of implementation\n\n"
"# Decorated function example\n"
"@tool\n"
"def your_tool_function(text: str) -> str:\n"
' """Your tool description"""\n'
" # ... implementation\n"
" return result\n"
)
def build_env_with_tool_repository_credentials(
repository_handle: str,
) -> dict[str, Any]:
repository_handle = repository_handle.upper().replace("-", "_")
settings = Settings()
env = os.environ.copy()
env[f"UV_INDEX_{repository_handle}_USERNAME"] = str(
settings.tool_repository_username or ""
)
env[f"UV_INDEX_{repository_handle}_PASSWORD"] = str(
settings.tool_repository_password or ""
)
return env

View File

@@ -1,91 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.auth0 import Auth0Provider
class TestAuth0Provider:
@pytest.fixture(autouse=True)
def setup_method(self):
self.valid_settings = Oauth2Settings(
provider="auth0",
domain="test-domain.auth0.com",
client_id="test-client-id",
audience="test-audience"
)
self.provider = Auth0Provider(self.valid_settings)
def test_initialization_with_valid_settings(self):
provider = Auth0Provider(self.valid_settings)
assert provider.settings == self.valid_settings
assert provider.settings.provider == "auth0"
assert provider.settings.domain == "test-domain.auth0.com"
assert provider.settings.client_id == "test-client-id"
assert provider.settings.audience == "test-audience"
def test_get_authorize_url(self):
expected_url = "https://test-domain.auth0.com/oauth/device/code"
assert self.provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_different_domain(self):
settings = Oauth2Settings(
provider="auth0",
domain="my-company.auth0.com",
client_id="test-client",
audience="test-audience"
)
provider = Auth0Provider(settings)
expected_url = "https://my-company.auth0.com/oauth/device/code"
assert provider.get_authorize_url() == expected_url
def test_get_token_url(self):
expected_url = "https://test-domain.auth0.com/oauth/token"
assert self.provider.get_token_url() == expected_url
def test_get_token_url_with_different_domain(self):
settings = Oauth2Settings(
provider="auth0",
domain="another-domain.auth0.com",
client_id="test-client",
audience="test-audience"
)
provider = Auth0Provider(settings)
expected_url = "https://another-domain.auth0.com/oauth/token"
assert provider.get_token_url() == expected_url
def test_get_jwks_url(self):
expected_url = "https://test-domain.auth0.com/.well-known/jwks.json"
assert self.provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_different_domain(self):
settings = Oauth2Settings(
provider="auth0",
domain="dev.auth0.com",
client_id="test-client",
audience="test-audience"
)
provider = Auth0Provider(settings)
expected_url = "https://dev.auth0.com/.well-known/jwks.json"
assert provider.get_jwks_url() == expected_url
def test_get_issuer(self):
expected_issuer = "https://test-domain.auth0.com/"
assert self.provider.get_issuer() == expected_issuer
def test_get_issuer_with_different_domain(self):
settings = Oauth2Settings(
provider="auth0",
domain="prod.auth0.com",
client_id="test-client",
audience="test-audience"
)
provider = Auth0Provider(settings)
expected_issuer = "https://prod.auth0.com/"
assert provider.get_issuer() == expected_issuer
def test_get_audience(self):
assert self.provider.get_audience() == "test-audience"
def test_get_client_id(self):
assert self.provider.get_client_id() == "test-client-id"

View File

@@ -1,141 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.entra_id import EntraIdProvider
class TestEntraIdProvider:
@pytest.fixture(autouse=True)
def setup_method(self):
self.valid_settings = Oauth2Settings(
provider="entra_id",
domain="tenant-id-abcdef123456",
client_id="test-client-id",
audience="test-audience",
extra={
"scope": "openid profile email api://crewai-cli-dev/read"
}
)
self.provider = EntraIdProvider(self.valid_settings)
def test_initialization_with_valid_settings(self):
provider = EntraIdProvider(self.valid_settings)
assert provider.settings == self.valid_settings
assert provider.settings.provider == "entra_id"
assert provider.settings.domain == "tenant-id-abcdef123456"
assert provider.settings.client_id == "test-client-id"
assert provider.settings.audience == "test-audience"
def test_get_authorize_url(self):
expected_url = "https://login.microsoftonline.com/tenant-id-abcdef123456/oauth2/v2.0/devicecode"
assert self.provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_different_domain(self):
# For EntraID, the domain is the tenant ID.
settings = Oauth2Settings(
provider="entra_id",
domain="my-company.entra.id",
client_id="test-client",
audience="test-audience",
)
provider = EntraIdProvider(settings)
expected_url = "https://login.microsoftonline.com/my-company.entra.id/oauth2/v2.0/devicecode"
assert provider.get_authorize_url() == expected_url
def test_get_token_url(self):
expected_url = "https://login.microsoftonline.com/tenant-id-abcdef123456/oauth2/v2.0/token"
assert self.provider.get_token_url() == expected_url
def test_get_token_url_with_different_domain(self):
# For EntraID, the domain is the tenant ID.
settings = Oauth2Settings(
provider="entra_id",
domain="another-domain.entra.id",
client_id="test-client",
audience="test-audience",
)
provider = EntraIdProvider(settings)
expected_url = "https://login.microsoftonline.com/another-domain.entra.id/oauth2/v2.0/token"
assert provider.get_token_url() == expected_url
def test_get_jwks_url(self):
expected_url = "https://login.microsoftonline.com/tenant-id-abcdef123456/discovery/v2.0/keys"
assert self.provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_different_domain(self):
# For EntraID, the domain is the tenant ID.
settings = Oauth2Settings(
provider="entra_id",
domain="dev.entra.id",
client_id="test-client",
audience="test-audience",
)
provider = EntraIdProvider(settings)
expected_url = "https://login.microsoftonline.com/dev.entra.id/discovery/v2.0/keys"
assert provider.get_jwks_url() == expected_url
def test_get_issuer(self):
expected_issuer = "https://login.microsoftonline.com/tenant-id-abcdef123456/v2.0"
assert self.provider.get_issuer() == expected_issuer
def test_get_issuer_with_different_domain(self):
# For EntraID, the domain is the tenant ID.
settings = Oauth2Settings(
provider="entra_id",
domain="other-tenant-id-xpto",
client_id="test-client",
audience="test-audience",
)
provider = EntraIdProvider(settings)
expected_issuer = "https://login.microsoftonline.com/other-tenant-id-xpto/v2.0"
assert provider.get_issuer() == expected_issuer
def test_get_audience(self):
assert self.provider.get_audience() == "test-audience"
def test_get_audience_assertion_error_when_none(self):
settings = Oauth2Settings(
provider="entra_id",
domain="test-tenant-id",
client_id="test-client-id",
audience=None,
)
provider = EntraIdProvider(settings)
with pytest.raises(ValueError, match="Audience is required"):
provider.get_audience()
def test_get_client_id(self):
assert self.provider.get_client_id() == "test-client-id"
def test_get_required_fields(self):
assert set(self.provider.get_required_fields()) == set(["scope"])
def test_get_oauth_scopes(self):
settings = Oauth2Settings(
provider="entra_id",
domain="tenant-id-abcdef123456",
client_id="test-client-id",
audience="test-audience",
extra={
"scope": "api://crewai-cli-dev/read"
}
)
provider = EntraIdProvider(settings)
assert provider.get_oauth_scopes() == ["openid", "profile", "email", "api://crewai-cli-dev/read"]
def test_get_oauth_scopes_with_multiple_custom_scopes(self):
settings = Oauth2Settings(
provider="entra_id",
domain="tenant-id-abcdef123456",
client_id="test-client-id",
audience="test-audience",
extra={
"scope": "api://crewai-cli-dev/read api://crewai-cli-dev/write custom-scope1 custom-scope2"
}
)
provider = EntraIdProvider(settings)
assert provider.get_oauth_scopes() == ["openid", "profile", "email", "api://crewai-cli-dev/read", "api://crewai-cli-dev/write", "custom-scope1", "custom-scope2"]
def test_base_url(self):
assert self.provider._base_url() == "https://login.microsoftonline.com/tenant-id-abcdef123456"

View File

@@ -1,138 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.keycloak import KeycloakProvider
class TestKeycloakProvider:
@pytest.fixture(autouse=True)
def setup_method(self):
self.valid_settings = Oauth2Settings(
provider="keycloak",
domain="keycloak.example.com",
client_id="test-client-id",
audience="test-audience",
extra={
"realm": "test-realm"
}
)
self.provider = KeycloakProvider(self.valid_settings)
def test_initialization_with_valid_settings(self):
provider = KeycloakProvider(self.valid_settings)
assert provider.settings == self.valid_settings
assert provider.settings.provider == "keycloak"
assert provider.settings.domain == "keycloak.example.com"
assert provider.settings.client_id == "test-client-id"
assert provider.settings.audience == "test-audience"
assert provider.settings.extra.get("realm") == "test-realm"
def test_get_authorize_url(self):
expected_url = "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/auth/device"
assert self.provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_different_domain(self):
settings = Oauth2Settings(
provider="keycloak",
domain="auth.company.com",
client_id="test-client",
audience="test-audience",
extra={
"realm": "my-realm"
}
)
provider = KeycloakProvider(settings)
expected_url = "https://auth.company.com/realms/my-realm/protocol/openid-connect/auth/device"
assert provider.get_authorize_url() == expected_url
def test_get_token_url(self):
expected_url = "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/token"
assert self.provider.get_token_url() == expected_url
def test_get_token_url_with_different_domain(self):
settings = Oauth2Settings(
provider="keycloak",
domain="sso.enterprise.com",
client_id="test-client",
audience="test-audience",
extra={
"realm": "enterprise-realm"
}
)
provider = KeycloakProvider(settings)
expected_url = "https://sso.enterprise.com/realms/enterprise-realm/protocol/openid-connect/token"
assert provider.get_token_url() == expected_url
def test_get_jwks_url(self):
expected_url = "https://keycloak.example.com/realms/test-realm/protocol/openid-connect/certs"
assert self.provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_different_domain(self):
settings = Oauth2Settings(
provider="keycloak",
domain="identity.org",
client_id="test-client",
audience="test-audience",
extra={
"realm": "org-realm"
}
)
provider = KeycloakProvider(settings)
expected_url = "https://identity.org/realms/org-realm/protocol/openid-connect/certs"
assert provider.get_jwks_url() == expected_url
def test_get_issuer(self):
expected_issuer = "https://keycloak.example.com/realms/test-realm"
assert self.provider.get_issuer() == expected_issuer
def test_get_issuer_with_different_domain(self):
settings = Oauth2Settings(
provider="keycloak",
domain="login.myapp.io",
client_id="test-client",
audience="test-audience",
extra={
"realm": "app-realm"
}
)
provider = KeycloakProvider(settings)
expected_issuer = "https://login.myapp.io/realms/app-realm"
assert provider.get_issuer() == expected_issuer
def test_get_audience(self):
assert self.provider.get_audience() == "test-audience"
def test_get_client_id(self):
assert self.provider.get_client_id() == "test-client-id"
def test_get_required_fields(self):
assert self.provider.get_required_fields() == ["realm"]
def test_oauth2_base_url(self):
assert self.provider._oauth2_base_url() == "https://keycloak.example.com"
def test_oauth2_base_url_strips_https_prefix(self):
settings = Oauth2Settings(
provider="keycloak",
domain="https://keycloak.example.com",
client_id="test-client-id",
audience="test-audience",
extra={
"realm": "test-realm"
}
)
provider = KeycloakProvider(settings)
assert provider._oauth2_base_url() == "https://keycloak.example.com"
def test_oauth2_base_url_strips_http_prefix(self):
settings = Oauth2Settings(
provider="keycloak",
domain="http://keycloak.example.com",
client_id="test-client-id",
audience="test-audience",
extra={
"realm": "test-realm"
}
)
provider = KeycloakProvider(settings)
assert provider._oauth2_base_url() == "https://keycloak.example.com"

View File

@@ -1,257 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.okta import OktaProvider
class TestOktaProvider:
@pytest.fixture(autouse=True)
def setup_method(self):
self.valid_settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience="test-audience",
)
self.provider = OktaProvider(self.valid_settings)
def test_initialization_with_valid_settings(self):
provider = OktaProvider(self.valid_settings)
assert provider.settings == self.valid_settings
assert provider.settings.provider == "okta"
assert provider.settings.domain == "test-domain.okta.com"
assert provider.settings.client_id == "test-client-id"
assert provider.settings.audience == "test-audience"
def test_get_authorize_url(self):
expected_url = "https://test-domain.okta.com/oauth2/default/v1/device/authorize"
assert self.provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_different_domain(self):
settings = Oauth2Settings(
provider="okta",
domain="my-company.okta.com",
client_id="test-client",
audience="test-audience",
)
provider = OktaProvider(settings)
expected_url = "https://my-company.okta.com/oauth2/default/v1/device/authorize"
assert provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_custom_authorization_server_name(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": False,
"authorization_server_name": "my_auth_server_xxxAAA777"
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777/v1/device/authorize"
assert provider.get_authorize_url() == expected_url
def test_get_authorize_url_when_using_org_auth_server(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": True,
"authorization_server_name": None
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/v1/device/authorize"
assert provider.get_authorize_url() == expected_url
def test_get_token_url(self):
expected_url = "https://test-domain.okta.com/oauth2/default/v1/token"
assert self.provider.get_token_url() == expected_url
def test_get_token_url_with_different_domain(self):
settings = Oauth2Settings(
provider="okta",
domain="another-domain.okta.com",
client_id="test-client",
audience="test-audience",
)
provider = OktaProvider(settings)
expected_url = "https://another-domain.okta.com/oauth2/default/v1/token"
assert provider.get_token_url() == expected_url
def test_get_token_url_with_custom_authorization_server_name(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": False,
"authorization_server_name": "my_auth_server_xxxAAA777"
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777/v1/token"
assert provider.get_token_url() == expected_url
def test_get_token_url_when_using_org_auth_server(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": True,
"authorization_server_name": None
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/v1/token"
assert provider.get_token_url() == expected_url
def test_get_jwks_url(self):
expected_url = "https://test-domain.okta.com/oauth2/default/v1/keys"
assert self.provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_different_domain(self):
settings = Oauth2Settings(
provider="okta",
domain="dev.okta.com",
client_id="test-client",
audience="test-audience",
)
provider = OktaProvider(settings)
expected_url = "https://dev.okta.com/oauth2/default/v1/keys"
assert provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_custom_authorization_server_name(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": False,
"authorization_server_name": "my_auth_server_xxxAAA777"
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777/v1/keys"
assert provider.get_jwks_url() == expected_url
def test_get_jwks_url_when_using_org_auth_server(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": True,
"authorization_server_name": None
}
)
provider = OktaProvider(settings)
expected_url = "https://test-domain.okta.com/oauth2/v1/keys"
assert provider.get_jwks_url() == expected_url
def test_get_issuer(self):
expected_issuer = "https://test-domain.okta.com/oauth2/default"
assert self.provider.get_issuer() == expected_issuer
def test_get_issuer_with_different_domain(self):
settings = Oauth2Settings(
provider="okta",
domain="prod.okta.com",
client_id="test-client",
audience="test-audience",
)
provider = OktaProvider(settings)
expected_issuer = "https://prod.okta.com/oauth2/default"
assert provider.get_issuer() == expected_issuer
def test_get_issuer_with_custom_authorization_server_name(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": False,
"authorization_server_name": "my_auth_server_xxxAAA777"
}
)
provider = OktaProvider(settings)
expected_issuer = "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777"
assert provider.get_issuer() == expected_issuer
def test_get_issuer_when_using_org_auth_server(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": True,
"authorization_server_name": None
}
)
provider = OktaProvider(settings)
expected_issuer = "https://test-domain.okta.com"
assert provider.get_issuer() == expected_issuer
def test_get_audience(self):
assert self.provider.get_audience() == "test-audience"
def test_get_audience_assertion_error_when_none(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
)
provider = OktaProvider(settings)
with pytest.raises(ValueError, match="Audience is required"):
provider.get_audience()
def test_get_client_id(self):
assert self.provider.get_client_id() == "test-client-id"
def test_get_required_fields(self):
assert set(self.provider.get_required_fields()) == set(["authorization_server_name", "using_org_auth_server"])
def test_oauth2_base_url(self):
assert self.provider._oauth2_base_url() == "https://test-domain.okta.com/oauth2/default"
def test_oauth2_base_url_with_custom_authorization_server_name(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": False,
"authorization_server_name": "my_auth_server_xxxAAA777"
}
)
provider = OktaProvider(settings)
assert provider._oauth2_base_url() == "https://test-domain.okta.com/oauth2/my_auth_server_xxxAAA777"
def test_oauth2_base_url_when_using_org_auth_server(self):
settings = Oauth2Settings(
provider="okta",
domain="test-domain.okta.com",
client_id="test-client-id",
audience=None,
extra={
"using_org_auth_server": True,
"authorization_server_name": None
}
)
provider = OktaProvider(settings)
assert provider._oauth2_base_url() == "https://test-domain.okta.com/oauth2"

View File

@@ -1,100 +0,0 @@
import pytest
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.workos import WorkosProvider
class TestWorkosProvider:
@pytest.fixture(autouse=True)
def setup_method(self):
self.valid_settings = Oauth2Settings(
provider="workos",
domain="login.company.com",
client_id="test-client-id",
audience="test-audience"
)
self.provider = WorkosProvider(self.valid_settings)
def test_initialization_with_valid_settings(self):
provider = WorkosProvider(self.valid_settings)
assert provider.settings == self.valid_settings
assert provider.settings.provider == "workos"
assert provider.settings.domain == "login.company.com"
assert provider.settings.client_id == "test-client-id"
assert provider.settings.audience == "test-audience"
def test_get_authorize_url(self):
expected_url = "https://login.company.com/oauth2/device_authorization"
assert self.provider.get_authorize_url() == expected_url
def test_get_authorize_url_with_different_domain(self):
settings = Oauth2Settings(
provider="workos",
domain="login.example.com",
client_id="test-client",
audience="test-audience"
)
provider = WorkosProvider(settings)
expected_url = "https://login.example.com/oauth2/device_authorization"
assert provider.get_authorize_url() == expected_url
def test_get_token_url(self):
expected_url = "https://login.company.com/oauth2/token"
assert self.provider.get_token_url() == expected_url
def test_get_token_url_with_different_domain(self):
settings = Oauth2Settings(
provider="workos",
domain="api.workos.com",
client_id="test-client",
audience="test-audience"
)
provider = WorkosProvider(settings)
expected_url = "https://api.workos.com/oauth2/token"
assert provider.get_token_url() == expected_url
def test_get_jwks_url(self):
expected_url = "https://login.company.com/oauth2/jwks"
assert self.provider.get_jwks_url() == expected_url
def test_get_jwks_url_with_different_domain(self):
settings = Oauth2Settings(
provider="workos",
domain="auth.enterprise.com",
client_id="test-client",
audience="test-audience"
)
provider = WorkosProvider(settings)
expected_url = "https://auth.enterprise.com/oauth2/jwks"
assert provider.get_jwks_url() == expected_url
def test_get_issuer(self):
expected_issuer = "https://login.company.com"
assert self.provider.get_issuer() == expected_issuer
def test_get_issuer_with_different_domain(self):
settings = Oauth2Settings(
provider="workos",
domain="sso.company.com",
client_id="test-client",
audience="test-audience"
)
provider = WorkosProvider(settings)
expected_issuer = "https://sso.company.com"
assert provider.get_issuer() == expected_issuer
def test_get_audience(self):
assert self.provider.get_audience() == "test-audience"
def test_get_audience_fallback_to_default(self):
settings = Oauth2Settings(
provider="workos",
domain="login.company.com",
client_id="test-client-id",
audience=None
)
provider = WorkosProvider(settings)
assert provider.get_audience() == ""
def test_get_client_id(self):
assert self.provider.get_client_id() == "test-client-id"

View File

@@ -1,348 +0,0 @@
from datetime import datetime, timedelta
from unittest.mock import MagicMock, call, patch
import pytest
import httpx
from crewai_cli.authentication.main import AuthenticationCommand
from crewai_cli.constants import (
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN,
)
class TestAuthenticationCommand:
def setup_method(self):
# Mock Settings so we always use default constants regardless of local config.
with patch("crewai_cli.authentication.main.Settings") as mock_settings:
instance = mock_settings.return_value
instance.oauth2_provider = "workos"
instance.oauth2_domain = CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN
instance.oauth2_client_id = CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID
instance.oauth2_audience = CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE
instance.oauth2_extra = {}
self.auth_command = AuthenticationCommand()
@pytest.mark.parametrize(
"user_provider,expected_urls",
[
(
"workos",
{
"device_code_url": f"https://{CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN}/oauth2/device_authorization",
"token_url": f"https://{CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN}/oauth2/token",
"client_id": CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID,
"audience": CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE,
"domain": CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN,
},
),
],
)
@patch("crewai_cli.authentication.main.AuthenticationCommand._get_device_code")
@patch(
"crewai_cli.authentication.main.AuthenticationCommand._display_auth_instructions"
)
@patch("crewai_cli.authentication.main.AuthenticationCommand._poll_for_token")
@patch("crewai_cli.authentication.main.console.print")
def test_login(
self,
mock_console_print,
mock_poll,
mock_display,
mock_get_device,
user_provider,
expected_urls,
):
mock_get_device.return_value = {
"device_code": "test_code",
"user_code": "123456",
}
self.auth_command.login()
mock_console_print.assert_called_once_with(
"Signing in to CrewAI AMP...\n", style="bold blue"
)
mock_get_device.assert_called_once()
mock_display.assert_called_once_with(
{"device_code": "test_code", "user_code": "123456"}
)
mock_poll.assert_called_once_with(
{"device_code": "test_code", "user_code": "123456"},
)
assert (
self.auth_command.oauth2_provider.get_client_id()
== expected_urls["client_id"]
)
assert (
self.auth_command.oauth2_provider.get_audience()
== expected_urls["audience"]
)
assert (
self.auth_command.oauth2_provider._get_domain() == expected_urls["domain"]
)
@patch("crewai_cli.authentication.main.webbrowser")
@patch("crewai_cli.authentication.main.console.print")
def test_display_auth_instructions(self, mock_console_print, mock_webbrowser):
device_code_data = {
"verification_uri_complete": "https://example.com/auth",
"user_code": "123456",
}
self.auth_command._display_auth_instructions(device_code_data)
expected_calls = [
call("1. Navigate to: ", "https://example.com/auth"),
call("2. Enter the following code: ", "123456"),
]
mock_console_print.assert_has_calls(expected_calls)
mock_webbrowser.open.assert_called_once_with("https://example.com/auth")
@pytest.mark.parametrize(
"user_provider,jwt_config",
[
(
"workos",
{
"jwks_url": f"https://{CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN}/oauth2/jwks",
"issuer": f"https://{CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN}",
"audience": CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE,
},
),
],
)
@pytest.mark.parametrize("has_expiration", [True, False])
@patch("crewai_cli.authentication.main.validate_jwt_token")
@patch("crewai_cli.authentication.main.TokenManager.save_tokens")
def test_validate_and_save_token(
self,
mock_save_tokens,
mock_validate_jwt,
user_provider,
jwt_config,
has_expiration,
):
from crewai_cli.authentication.main import Oauth2Settings
from crewai_cli.authentication.providers.workos import WorkosProvider
if user_provider == "workos":
self.auth_command.oauth2_provider = WorkosProvider(
settings=Oauth2Settings(
provider=user_provider,
client_id="test-client-id",
domain=CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN,
audience=jwt_config["audience"],
)
)
token_data = {"access_token": "test_access_token", "id_token": "test_id_token"}
if has_expiration:
future_timestamp = int((datetime.now() + timedelta(days=100)).timestamp())
decoded_token = {"exp": future_timestamp}
else:
decoded_token = {}
mock_validate_jwt.return_value = decoded_token
self.auth_command._validate_and_save_token(token_data)
mock_validate_jwt.assert_called_once_with(
jwt_token="test_access_token",
jwks_url=jwt_config["jwks_url"],
issuer=jwt_config["issuer"],
audience=jwt_config["audience"],
)
if has_expiration:
mock_save_tokens.assert_called_once_with(
"test_access_token", future_timestamp
)
else:
mock_save_tokens.assert_called_once_with("test_access_token", 0)
@patch("crewai_cli.tools.main.ToolCommand")
@patch("crewai_cli.authentication.main.Settings")
@patch("crewai_cli.authentication.main.console.print")
def test_login_to_tool_repository_success(
self, mock_console_print, mock_settings, mock_tool_command
):
mock_tool_instance = MagicMock()
mock_tool_command.return_value = mock_tool_instance
mock_settings_instance = MagicMock()
mock_settings_instance.org_name = "Test Org"
mock_settings_instance.org_uuid = "test-uuid-123"
mock_settings.return_value = mock_settings_instance
self.auth_command._login_to_tool_repository()
mock_tool_command.assert_called_once()
mock_tool_instance.login.assert_called_once()
expected_calls = [
call(
"Now logging you in to the Tool Repository... ",
style="bold blue",
end="",
),
call("Success!\n", style="bold green"),
call(
"You are now authenticated to the tool repository for organization [bold cyan]'Test Org'[/bold cyan]",
style="green",
),
]
mock_console_print.assert_has_calls(expected_calls)
@patch("crewai_cli.tools.main.ToolCommand")
@patch("crewai_cli.authentication.main.console.print")
def test_login_to_tool_repository_error(
self, mock_console_print, mock_tool_command
):
mock_tool_instance = MagicMock()
mock_tool_instance.login.side_effect = Exception("Tool repository error")
mock_tool_command.return_value = mock_tool_instance
self.auth_command._login_to_tool_repository()
mock_tool_command.assert_called_once()
mock_tool_instance.login.assert_called_once()
expected_calls = [
call(
"Now logging you in to the Tool Repository... ",
style="bold blue",
end="",
),
call(
"\n[bold yellow]Warning:[/bold yellow] Authentication with the Tool Repository failed.",
style="yellow",
),
call(
"Other features will work normally, but you may experience limitations with downloading and publishing tools.\nRun [bold]crewai login[/bold] to try logging in again.\n",
style="yellow",
),
]
mock_console_print.assert_has_calls(expected_calls)
@patch("crewai_cli.authentication.main.httpx.post")
def test_get_device_code(self, mock_post):
mock_response = MagicMock()
mock_response.json.return_value = {
"device_code": "test_device_code",
"user_code": "123456",
"verification_uri_complete": "https://example.com/auth",
}
mock_post.return_value = mock_response
self.auth_command.oauth2_provider = MagicMock()
self.auth_command.oauth2_provider.get_client_id.return_value = "test_client"
self.auth_command.oauth2_provider.get_authorize_url.return_value = (
"https://example.com/device"
)
self.auth_command.oauth2_provider.get_audience.return_value = "test_audience"
self.auth_command.oauth2_provider.get_oauth_scopes.return_value = ["openid", "profile", "email"]
result = self.auth_command._get_device_code()
mock_post.assert_called_once_with(
url="https://example.com/device",
data={
"client_id": "test_client",
"scope": "openid profile email",
"audience": "test_audience",
},
timeout=20,
)
assert result == {
"device_code": "test_device_code",
"user_code": "123456",
"verification_uri_complete": "https://example.com/auth",
}
@patch("crewai_cli.authentication.main.httpx.post")
@patch("crewai_cli.authentication.main.console.print")
def test_poll_for_token_success(self, mock_console_print, mock_post):
mock_response_success = MagicMock()
mock_response_success.status_code = 200
mock_response_success.json.return_value = {
"access_token": "test_access_token",
"id_token": "test_id_token",
}
mock_post.return_value = mock_response_success
device_code_data = {"device_code": "test_device_code", "interval": 1}
with (
patch.object(
self.auth_command, "_validate_and_save_token"
) as mock_validate,
patch.object(
self.auth_command, "_login_to_tool_repository"
) as mock_tool_login,
):
self.auth_command.oauth2_provider = MagicMock()
self.auth_command.oauth2_provider.get_token_url.return_value = (
"https://example.com/token"
)
self.auth_command.oauth2_provider.get_client_id.return_value = "test_client"
self.auth_command._poll_for_token(device_code_data)
mock_post.assert_called_once_with(
"https://example.com/token",
data={
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": "test_device_code",
"client_id": "test_client",
},
timeout=30,
)
mock_validate.assert_called_once()
mock_tool_login.assert_called_once()
expected_calls = [
call("\nWaiting for authentication... ", style="bold blue", end=""),
call("Success!", style="bold green"),
call("\n[bold green]Welcome to CrewAI AMP![/bold green]\n"),
]
mock_console_print.assert_has_calls(expected_calls)
@patch("crewai_cli.authentication.main.httpx.post")
@patch("crewai_cli.authentication.main.console.print")
def test_poll_for_token_timeout(self, mock_console_print, mock_post):
mock_response_pending = MagicMock()
mock_response_pending.status_code = 400
mock_response_pending.json.return_value = {"error": "authorization_pending"}
mock_post.return_value = mock_response_pending
device_code_data = {
"device_code": "test_device_code",
"interval": 0.1, # Short interval for testing
}
self.auth_command._poll_for_token(device_code_data)
mock_console_print.assert_any_call(
"Timeout: Failed to get the token. Please try again.", style="bold red"
)
@patch("crewai_cli.authentication.main.httpx.post")
def test_poll_for_token_error(self, mock_post):
"""Test the method to poll for token (error path)."""
# Setup mock to return error
mock_response_error = MagicMock()
mock_response_error.status_code = 400
mock_response_error.json.return_value = {
"error": "access_denied",
"error_description": "User denied access",
}
mock_post.return_value = mock_response_error
device_code_data = {"device_code": "test_device_code", "interval": 1}
with pytest.raises(httpx.HTTPError):
self.auth_command._poll_for_token(device_code_data)

View File

@@ -1,107 +0,0 @@
import unittest
from unittest.mock import MagicMock, patch
import jwt
from crewai_cli.authentication.utils import validate_jwt_token
@patch("crewai_cli.authentication.utils.PyJWKClient", return_value=MagicMock())
@patch("crewai_cli.authentication.utils.jwt")
class TestUtils(unittest.TestCase):
def test_validate_jwt_token(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.return_value = {"exp": 1719859200}
# Create signing key object mock with a .key attribute
mock_pyjwkclient.return_value.get_signing_key_from_jwt.return_value = MagicMock(
key="mock_signing_key"
)
jwt_token = "aaaaa.bbbbbb.cccccc" # noqa: S105
decoded_token = validate_jwt_token(
jwt_token=jwt_token,
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
mock_jwt.decode.assert_called_with(
jwt_token,
"mock_signing_key",
algorithms=["RS256"],
audience="app_id_xxxx",
issuer="https://mock_issuer",
leeway=10.0,
options={
"verify_signature": True,
"verify_exp": True,
"verify_nbf": True,
"verify_iat": True,
"require": ["exp", "iat", "iss", "aud", "sub"],
},
)
mock_pyjwkclient.assert_called_once_with("https://mock_jwks_url")
self.assertEqual(decoded_token, {"exp": 1719859200})
def test_validate_jwt_token_expired(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.ExpiredSignatureError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
def test_validate_jwt_token_invalid_audience(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.InvalidAudienceError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
def test_validate_jwt_token_invalid_issuer(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.InvalidIssuerError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
def test_validate_jwt_token_missing_required_claims(
self, mock_jwt, mock_pyjwkclient
):
mock_jwt.decode.side_effect = jwt.MissingRequiredClaimError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
def test_validate_jwt_token_jwks_error(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.exceptions.PyJWKClientError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)
def test_validate_jwt_token_invalid_token(self, mock_jwt, mock_pyjwkclient):
mock_jwt.decode.side_effect = jwt.InvalidTokenError
with self.assertRaises(Exception): # noqa: B017
validate_jwt_token(
jwt_token="aaaaa.bbbbbb.cccccc", # noqa: S106
jwks_url="https://mock_jwks_url",
issuer="https://mock_issuer",
audience="app_id_xxxx",
)

View File

@@ -1,255 +0,0 @@
from pathlib import Path
from unittest import mock
import pytest
from click.testing import CliRunner
from crewai_cli.cli import (
deploy_create,
deploy_list,
deploy_logs,
deploy_push,
deploy_remove,
deply_status,
flow_add_crew,
login,
reset_memories,
test,
train,
version,
)
@pytest.fixture
def runner():
return CliRunner()
@mock.patch("crewai_cli.cli.train_crew")
def test_train_default_iterations(train_crew, runner):
result = runner.invoke(train)
train_crew.assert_called_once_with(5, "trained_agents_data.pkl")
assert result.exit_code == 0
assert "Training the Crew for 5 iterations" in result.output
@mock.patch("crewai_cli.cli.train_crew")
def test_train_custom_iterations(train_crew, runner):
result = runner.invoke(train, ["--n_iterations", "10"])
train_crew.assert_called_once_with(10, "trained_agents_data.pkl")
assert result.exit_code == 0
assert "Training the Crew for 10 iterations" in result.output
@mock.patch("crewai_cli.cli.train_crew")
def test_train_invalid_string_iterations(train_crew, runner):
result = runner.invoke(train, ["--n_iterations", "invalid"])
train_crew.assert_not_called()
assert result.exit_code == 2
assert (
"Usage: train [OPTIONS]\nTry 'train --help' for help.\n\nError: Invalid value for '-n' / '--n_iterations': 'invalid' is not a valid integer.\n"
in result.output
)
def test_reset_no_memory_flags(runner):
result = runner.invoke(
reset_memories,
)
assert (
result.output
== "Please specify at least one memory type to reset using the appropriate flags.\n"
)
def test_version_flag(runner):
result = runner.invoke(version)
assert result.exit_code == 0
assert "crewai version:" in result.output
def test_version_command(runner):
result = runner.invoke(version)
assert result.exit_code == 0
assert "crewai version:" in result.output
def test_version_command_with_tools(runner):
result = runner.invoke(version, ["--tools"])
assert result.exit_code == 0
assert "crewai version:" in result.output
assert (
"crewai tools version:" in result.output
or "crewai tools not installed" in result.output
)
@mock.patch("crewai_cli.cli.evaluate_crew")
def test_test_default_iterations(evaluate_crew, runner):
result = runner.invoke(test)
evaluate_crew.assert_called_once_with(3, "gpt-4o-mini")
assert result.exit_code == 0
assert "Testing the crew for 3 iterations with model gpt-4o-mini" in result.output
@mock.patch("crewai_cli.cli.evaluate_crew")
def test_test_custom_iterations(evaluate_crew, runner):
result = runner.invoke(test, ["--n_iterations", "5", "--model", "gpt-4o"])
evaluate_crew.assert_called_once_with(5, "gpt-4o")
assert result.exit_code == 0
assert "Testing the crew for 5 iterations with model gpt-4o" in result.output
@mock.patch("crewai_cli.cli.evaluate_crew")
def test_test_invalid_string_iterations(evaluate_crew, runner):
result = runner.invoke(test, ["--n_iterations", "invalid"])
evaluate_crew.assert_not_called()
assert result.exit_code == 2
assert (
"Usage: test [OPTIONS]\nTry 'test --help' for help.\n\nError: Invalid value for '-n' / '--n_iterations': 'invalid' is not a valid integer.\n"
in result.output
)
@mock.patch("crewai_cli.cli.AuthenticationCommand")
def test_login(command, runner):
mock_auth = command.return_value
result = runner.invoke(login)
assert result.exit_code == 0
mock_auth.login.assert_called_once()
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_create(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deploy_create)
assert result.exit_code == 0
mock_deploy.create_crew.assert_called_once()
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_list(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deploy_list)
assert result.exit_code == 0
mock_deploy.list_crews.assert_called_once()
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_push(command, runner):
mock_deploy = command.return_value
uuid = "test-uuid"
result = runner.invoke(deploy_push, ["-u", uuid])
assert result.exit_code == 0
mock_deploy.deploy.assert_called_once_with(uuid=uuid)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_push_no_uuid(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deploy_push)
assert result.exit_code == 0
mock_deploy.deploy.assert_called_once_with(uuid=None)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_status(command, runner):
mock_deploy = command.return_value
uuid = "test-uuid"
result = runner.invoke(deply_status, ["-u", uuid])
assert result.exit_code == 0
mock_deploy.get_crew_status.assert_called_once_with(uuid=uuid)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_status_no_uuid(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deply_status)
assert result.exit_code == 0
mock_deploy.get_crew_status.assert_called_once_with(uuid=None)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_logs(command, runner):
mock_deploy = command.return_value
uuid = "test-uuid"
result = runner.invoke(deploy_logs, ["-u", uuid])
assert result.exit_code == 0
mock_deploy.get_crew_logs.assert_called_once_with(uuid=uuid)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_logs_no_uuid(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deploy_logs)
assert result.exit_code == 0
mock_deploy.get_crew_logs.assert_called_once_with(uuid=None)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_remove(command, runner):
mock_deploy = command.return_value
uuid = "test-uuid"
result = runner.invoke(deploy_remove, ["-u", uuid])
assert result.exit_code == 0
mock_deploy.remove_crew.assert_called_once_with(uuid=uuid)
@mock.patch("crewai_cli.cli.DeployCommand")
def test_deploy_remove_no_uuid(command, runner):
mock_deploy = command.return_value
result = runner.invoke(deploy_remove)
assert result.exit_code == 0
mock_deploy.remove_crew.assert_called_once_with(uuid=None)
@mock.patch("crewai_cli.add_crew_to_flow.create_embedded_crew")
@mock.patch("pathlib.Path.exists", return_value=True)
def test_flow_add_crew(mock_path_exists, mock_create_embedded_crew, runner):
crew_name = "new_crew"
result = runner.invoke(flow_add_crew, [crew_name])
assert result.exit_code == 0, f"Command failed with output: {result.output}"
assert f"Adding crew {crew_name} to the flow" in result.output
mock_create_embedded_crew.assert_called_once()
call_args, call_kwargs = mock_create_embedded_crew.call_args
assert call_args[0] == crew_name
assert "parent_folder" in call_kwargs
assert isinstance(call_kwargs["parent_folder"], Path)
def test_add_crew_to_flow_not_in_root(runner):
with mock.patch("pathlib.Path.exists", autospec=True) as mock_exists:
def exists_side_effect(self):
if self.name == "pyproject.toml":
return False
return True
mock_exists.side_effect = exists_side_effect
result = runner.invoke(flow_add_crew, ["new_crew"])
assert result.exit_code != 0
assert "This command must be run from the root of a flow project." in str(
result.output
)

View File

@@ -1,148 +0,0 @@
import json
import shutil
import tempfile
import unittest
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai_cli.config import (
CLI_SETTINGS_KEYS,
DEFAULT_CLI_SETTINGS,
USER_SETTINGS_KEYS,
Settings,
)
from crewai_cli.shared.token_manager import TokenManager
class TestSettings(unittest.TestCase):
def setUp(self):
self.test_dir = Path(tempfile.mkdtemp())
self.config_path = self.test_dir / "settings.json"
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_empty_initialization(self):
settings = Settings(config_path=self.config_path)
self.assertIsNone(settings.tool_repository_username)
self.assertIsNone(settings.tool_repository_password)
def test_initialization_with_data(self):
settings = Settings(
config_path=self.config_path, tool_repository_username="user1"
)
self.assertEqual(settings.tool_repository_username, "user1")
self.assertIsNone(settings.tool_repository_password)
def test_initialization_with_existing_file(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with self.config_path.open("w") as f:
json.dump({"tool_repository_username": "file_user"}, f)
settings = Settings(config_path=self.config_path)
self.assertEqual(settings.tool_repository_username, "file_user")
def test_merge_file_and_input_data(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with self.config_path.open("w") as f:
json.dump(
{
"tool_repository_username": "file_user",
"tool_repository_password": "file_pass",
},
f,
)
settings = Settings(
config_path=self.config_path, tool_repository_username="new_user"
)
self.assertEqual(settings.tool_repository_username, "new_user")
self.assertEqual(settings.tool_repository_password, "file_pass")
def test_clear_user_settings(self):
user_settings = {key: f"value_for_{key}" for key in USER_SETTINGS_KEYS}
settings = Settings(config_path=self.config_path, **user_settings)
settings.clear_user_settings()
for key in user_settings.keys():
self.assertEqual(getattr(settings, key), None)
@patch("crewai_cli.config.TokenManager")
def test_reset_settings(self, mock_token_manager):
user_settings = {key: f"value_for_{key}" for key in USER_SETTINGS_KEYS}
cli_settings = {key: f"value_for_{key}" for key in CLI_SETTINGS_KEYS if key != "oauth2_extra"}
cli_settings["oauth2_extra"] = {"scope": "xxx", "other": "yyy"}
settings = Settings(
config_path=self.config_path, **user_settings, **cli_settings
)
mock_token_manager.return_value = MagicMock()
TokenManager().save_tokens(
"aaa.bbb.ccc", (datetime.now() + timedelta(seconds=36000)).timestamp()
)
settings.reset()
for key in user_settings.keys():
self.assertEqual(getattr(settings, key), None)
for key in cli_settings.keys():
self.assertEqual(getattr(settings, key), DEFAULT_CLI_SETTINGS.get(key))
mock_token_manager.return_value.clear_tokens.assert_called_once()
def test_dump_new_settings(self):
settings = Settings(
config_path=self.config_path, tool_repository_username="user1"
)
settings.dump()
with self.config_path.open("r") as f:
saved_data = json.load(f)
self.assertEqual(saved_data["tool_repository_username"], "user1")
def test_update_existing_settings(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with self.config_path.open("w") as f:
json.dump({"existing_setting": "value"}, f)
settings = Settings(
config_path=self.config_path, tool_repository_username="user1"
)
settings.dump()
with self.config_path.open("r") as f:
saved_data = json.load(f)
self.assertEqual(saved_data["existing_setting"], "value")
self.assertEqual(saved_data["tool_repository_username"], "user1")
def test_none_values(self):
settings = Settings(config_path=self.config_path, tool_repository_username=None)
settings.dump()
with self.config_path.open("r") as f:
saved_data = json.load(f)
self.assertIsNone(saved_data.get("tool_repository_username"))
def test_invalid_json_in_config(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
with self.config_path.open("w") as f:
f.write("invalid json")
try:
settings = Settings(config_path=self.config_path)
self.assertIsNone(settings.tool_repository_username)
except json.JSONDecodeError:
self.fail("Settings initialization should handle invalid JSON")
def test_empty_config_file(self):
self.config_path.parent.mkdir(parents=True, exist_ok=True)
self.config_path.touch()
settings = Settings(config_path=self.config_path)
self.assertIsNone(settings.tool_repository_username)

View File

@@ -1,20 +0,0 @@
from crewai_cli.constants import ENV_VARS, MODELS, PROVIDERS
def test_huggingface_in_providers():
"""Test that Huggingface is in the PROVIDERS list."""
assert "huggingface" in PROVIDERS
def test_huggingface_env_vars():
"""Test that Huggingface environment variables are properly configured."""
assert "huggingface" in ENV_VARS
assert any(
detail.get("key_name") == "HF_TOKEN" for detail in ENV_VARS["huggingface"]
)
def test_huggingface_models():
"""Test that Huggingface models are properly configured."""
assert "huggingface" in MODELS
assert len(MODELS["huggingface"]) > 0

View File

@@ -1,356 +0,0 @@
import os
import unittest
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
from crewai_cli.plus_api import PlusAPI
class TestPlusAPI(unittest.TestCase):
def setUp(self):
self.api_key = "test_api_key"
self.api = PlusAPI(self.api_key)
self.org_uuid = "test-org-uuid"
def test_init(self):
self.assertEqual(self.api.api_key, self.api_key)
self.assertEqual(self.api.headers["Authorization"], f"Bearer {self.api_key}")
self.assertEqual(self.api.headers["Content-Type"], "application/json")
self.assertTrue("CrewAI-CLI/" in self.api.headers["User-Agent"])
self.assertTrue(self.api.headers["X-Crewai-Version"])
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_login_to_tool_repository(self, mock_make_request):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
response = self.api.login_to_tool_repository()
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools/login"
)
self.assertEqual(response, mock_response)
def assert_request_with_org_id(
self, mock_client_instance, method: str, endpoint: str, **kwargs
):
mock_client_instance.request.assert_called_once_with(
method,
f"{os.getenv('CREWAI_PLUS_URL')}{endpoint}",
headers={
"Authorization": ANY,
"Content-Type": ANY,
"User-Agent": ANY,
"X-Crewai-Version": ANY,
"X-Crewai-Organization-Id": self.org_uuid,
},
**kwargs,
)
@patch("crewai_cli.plus_api.Settings")
@patch("crewai_cli.plus_api.httpx.Client")
def test_login_to_tool_repository_with_org_uuid(
self, mock_client_class, mock_settings_class
):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
mock_settings_class.return_value = mock_settings
self.api = PlusAPI(self.api_key)
mock_client_instance = MagicMock()
mock_response = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client_class.return_value.__enter__.return_value = mock_client_instance
response = self.api.login_to_tool_repository()
self.assert_request_with_org_id(
mock_client_instance, "POST", "/crewai_plus/api/v1/tools/login"
)
self.assertEqual(response, mock_response)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_get_tool(self, mock_make_request):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
response = self.api.get_tool("test_tool_handle")
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/tools/test_tool_handle"
)
self.assertEqual(response, mock_response)
@patch("crewai_cli.plus_api.Settings")
@patch("crewai_cli.plus_api.httpx.Client")
def test_get_tool_with_org_uuid(self, mock_client_class, mock_settings_class):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
mock_settings_class.return_value = mock_settings
self.api = PlusAPI(self.api_key)
mock_client_instance = MagicMock()
mock_response = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client_class.return_value.__enter__.return_value = mock_client_instance
response = self.api.get_tool("test_tool_handle")
self.assert_request_with_org_id(
mock_client_instance, "GET", "/crewai_plus/api/v1/tools/test_tool_handle"
)
self.assertEqual(response, mock_response)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_publish_tool(self, mock_make_request):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
handle = "test_tool_handle"
public = True
version = "1.0.0"
description = "Test tool description"
encoded_file = "encoded_test_file"
response = self.api.publish_tool(
handle, public, version, description, encoded_file
)
params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,
"available_exports": None,
}
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools", json=params
)
self.assertEqual(response, mock_response)
@patch("crewai_cli.plus_api.Settings")
@patch("crewai_cli.plus_api.httpx.Client")
def test_publish_tool_with_org_uuid(self, mock_client_class, mock_settings_class):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
mock_settings_class.return_value = mock_settings
self.api = PlusAPI(self.api_key)
mock_client_instance = MagicMock()
mock_response = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client_class.return_value.__enter__.return_value = mock_client_instance
handle = "test_tool_handle"
public = True
version = "1.0.0"
description = "Test tool description"
encoded_file = "encoded_test_file"
response = self.api.publish_tool(
handle, public, version, description, encoded_file
)
expected_params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,
"available_exports": None,
}
self.assert_request_with_org_id(
mock_client_instance, "POST", "/crewai_plus/api/v1/tools", json=expected_params
)
self.assertEqual(response, mock_response)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_publish_tool_without_description(self, mock_make_request):
mock_response = MagicMock()
mock_make_request.return_value = mock_response
handle = "test_tool_handle"
public = False
version = "2.0.0"
description = None
encoded_file = "encoded_test_file"
response = self.api.publish_tool(
handle, public, version, description, encoded_file
)
params = {
"handle": handle,
"public": public,
"version": version,
"file": encoded_file,
"description": description,
"available_exports": None,
}
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/tools", json=params
)
self.assertEqual(response, mock_response)
@patch("crewai_cli.plus_api.httpx.Client")
def test_make_request(self, mock_client_class):
mock_client_instance = MagicMock()
mock_response = MagicMock()
mock_client_instance.request.return_value = mock_response
mock_client_class.return_value.__enter__.return_value = mock_client_instance
response = self.api._make_request("GET", "test_endpoint")
mock_client_class.assert_called_once_with(trust_env=False, verify=True)
mock_client_instance.request.assert_called_once_with(
"GET", f"{self.api.base_url}/test_endpoint", headers=self.api.headers
)
self.assertEqual(response, mock_response)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_deploy_by_name(self, mock_make_request):
self.api.deploy_by_name("test_project")
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/crews/by-name/test_project/deploy"
)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_deploy_by_uuid(self, mock_make_request):
self.api.deploy_by_uuid("test_uuid")
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/crews/test_uuid/deploy"
)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_crew_status_by_name(self, mock_make_request):
self.api.crew_status_by_name("test_project")
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/crews/by-name/test_project/status"
)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_crew_status_by_uuid(self, mock_make_request):
self.api.crew_status_by_uuid("test_uuid")
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/crews/test_uuid/status"
)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_crew_by_name(self, mock_make_request):
self.api.crew_by_name("test_project")
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/crews/by-name/test_project/logs/deployment"
)
self.api.crew_by_name("test_project", "custom_log")
mock_make_request.assert_called_with(
"GET", "/crewai_plus/api/v1/crews/by-name/test_project/logs/custom_log"
)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_crew_by_uuid(self, mock_make_request):
self.api.crew_by_uuid("test_uuid")
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/crews/test_uuid/logs/deployment"
)
self.api.crew_by_uuid("test_uuid", "custom_log")
mock_make_request.assert_called_with(
"GET", "/crewai_plus/api/v1/crews/test_uuid/logs/custom_log"
)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_delete_crew_by_name(self, mock_make_request):
self.api.delete_crew_by_name("test_project")
mock_make_request.assert_called_once_with(
"DELETE", "/crewai_plus/api/v1/crews/by-name/test_project"
)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_delete_crew_by_uuid(self, mock_make_request):
self.api.delete_crew_by_uuid("test_uuid")
mock_make_request.assert_called_once_with(
"DELETE", "/crewai_plus/api/v1/crews/test_uuid"
)
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_list_crews(self, mock_make_request):
self.api.list_crews()
mock_make_request.assert_called_once_with("GET", "/crewai_plus/api/v1/crews")
@patch("crewai_cli.plus_api.PlusAPI._make_request")
def test_create_crew(self, mock_make_request):
payload = {"name": "test_crew"}
self.api.create_crew(payload)
mock_make_request.assert_called_once_with(
"POST", "/crewai_plus/api/v1/crews", json=payload
)
@patch("crewai_cli.plus_api.Settings")
@patch.dict(os.environ, {"CREWAI_PLUS_URL": ""})
def test_custom_base_url(self, mock_settings_class):
mock_settings = MagicMock()
mock_settings.enterprise_base_url = "https://custom-url.com/api"
mock_settings_class.return_value = mock_settings
custom_api = PlusAPI("test_key")
self.assertEqual(
custom_api.base_url,
"https://custom-url.com/api",
)
@patch.dict(os.environ, {"CREWAI_PLUS_URL": "https://custom-url-from-env.com"})
def test_custom_base_url_from_env(self):
custom_api = PlusAPI("test_key")
self.assertEqual(
custom_api.base_url,
"https://custom-url-from-env.com",
)
@pytest.mark.asyncio
@patch("httpx.AsyncClient")
async def test_get_agent(mock_async_client_class):
api = PlusAPI("test_api_key")
mock_response = MagicMock()
mock_client_instance = AsyncMock()
mock_client_instance.get.return_value = mock_response
mock_async_client_class.return_value.__aenter__.return_value = mock_client_instance
response = await api.get_agent("test_agent_handle")
mock_client_instance.get.assert_called_once_with(
f"{api.base_url}/crewai_plus/api/v1/agents/test_agent_handle",
headers=api.headers,
)
assert response == mock_response
@pytest.mark.asyncio
@patch("httpx.AsyncClient")
@patch("crewai_cli.plus_api.Settings")
async def test_get_agent_with_org_uuid(mock_settings_class, mock_async_client_class):
org_uuid = "test-org-uuid"
mock_settings = MagicMock()
mock_settings.org_uuid = org_uuid
mock_settings.enterprise_base_url = os.getenv("CREWAI_PLUS_URL")
mock_settings_class.return_value = mock_settings
api = PlusAPI("test_api_key")
mock_response = MagicMock()
mock_client_instance = AsyncMock()
mock_client_instance.get.return_value = mock_response
mock_async_client_class.return_value.__aenter__.return_value = mock_client_instance
response = await api.get_agent("test_agent_handle")
mock_client_instance.get.assert_called_once_with(
f"{api.base_url}/crewai_plus/api/v1/agents/test_agent_handle",
headers=api.headers,
)
assert "X-Crewai-Organization-Id" in api.headers
assert api.headers["X-Crewai-Organization-Id"] == org_uuid
assert response == mock_response

View File

@@ -1,294 +0,0 @@
"""Tests for TokenManager with atomic file operations."""
import json
import os
import tempfile
import unittest
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import patch
from cryptography.fernet import Fernet
from crewai_cli.shared.token_manager import TokenManager
class TestTokenManager(unittest.TestCase):
"""Test cases for TokenManager."""
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def setUp(self, mock_get_key: unittest.mock.MagicMock) -> None:
"""Set up test fixtures."""
mock_get_key.return_value = Fernet.generate_key()
self.token_manager = TokenManager()
@patch("crewai_cli.shared.token_manager.TokenManager._read_secure_file")
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def test_get_or_create_key_existing(
self,
mock_get_or_create: unittest.mock.MagicMock,
mock_read: unittest.mock.MagicMock,
) -> None:
"""Test that existing key is returned when present."""
mock_key = Fernet.generate_key()
mock_get_or_create.return_value = mock_key
token_manager = TokenManager()
result = token_manager.key
self.assertEqual(result, mock_key)
def test_get_or_create_key_new(self) -> None:
"""Test that new key is created when none exists."""
mock_key = Fernet.generate_key()
with (
patch.object(self.token_manager, "_read_secure_file", return_value=None) as mock_read,
patch.object(self.token_manager, "_atomic_create_secure_file", return_value=True) as mock_atomic_create,
patch("crewai_cli.shared.token_manager.Fernet.generate_key", return_value=mock_key) as mock_generate,
):
result = self.token_manager._get_or_create_key()
self.assertEqual(result, mock_key)
mock_read.assert_called_with("secret.key")
mock_generate.assert_called_once()
mock_atomic_create.assert_called_once_with("secret.key", mock_key)
def test_get_or_create_key_race_condition(self) -> None:
"""Test that another process's key is used when atomic create fails."""
our_key = Fernet.generate_key()
their_key = Fernet.generate_key()
with (
patch.object(self.token_manager, "_read_secure_file", side_effect=[None, their_key]) as mock_read,
patch.object(self.token_manager, "_atomic_create_secure_file", return_value=False) as mock_atomic_create,
patch("crewai_cli.shared.token_manager.Fernet.generate_key", return_value=our_key),
):
result = self.token_manager._get_or_create_key()
self.assertEqual(result, their_key)
self.assertEqual(mock_read.call_count, 2)
@patch("crewai_cli.shared.token_manager.TokenManager._atomic_write_secure_file")
def test_save_tokens(
self, mock_write: unittest.mock.MagicMock
) -> None:
"""Test saving tokens encrypts and writes atomically."""
access_token = "test_token"
expires_at = int((datetime.now() + timedelta(seconds=3600)).timestamp())
self.token_manager.save_tokens(access_token, expires_at)
mock_write.assert_called_once()
args = mock_write.call_args[0]
self.assertEqual(args[0], "tokens.enc")
decrypted_data = self.token_manager.fernet.decrypt(args[1])
data = json.loads(decrypted_data)
self.assertEqual(data["access_token"], access_token)
expiration = datetime.fromisoformat(data["expiration"])
self.assertEqual(expiration, datetime.fromtimestamp(expires_at))
@patch("crewai_cli.shared.token_manager.TokenManager._read_secure_file")
def test_get_token_valid(
self, mock_read: unittest.mock.MagicMock
) -> None:
"""Test getting a valid non-expired token."""
access_token = "test_token"
expiration = (datetime.now() + timedelta(hours=1)).isoformat()
data = {"access_token": access_token, "expiration": expiration}
encrypted_data = self.token_manager.fernet.encrypt(json.dumps(data).encode())
mock_read.return_value = encrypted_data
result = self.token_manager.get_token()
self.assertEqual(result, access_token)
@patch("crewai_cli.shared.token_manager.TokenManager._read_secure_file")
def test_get_token_expired(
self, mock_read: unittest.mock.MagicMock
) -> None:
"""Test that expired token returns None."""
access_token = "test_token"
expiration = (datetime.now() - timedelta(hours=1)).isoformat()
data = {"access_token": access_token, "expiration": expiration}
encrypted_data = self.token_manager.fernet.encrypt(json.dumps(data).encode())
mock_read.return_value = encrypted_data
result = self.token_manager.get_token()
self.assertIsNone(result)
@patch("crewai_cli.shared.token_manager.TokenManager._read_secure_file")
def test_get_token_not_found(
self, mock_read: unittest.mock.MagicMock
) -> None:
"""Test that missing token file returns None."""
mock_read.return_value = None
result = self.token_manager.get_token()
self.assertIsNone(result)
@patch("crewai_cli.shared.token_manager.TokenManager._delete_secure_file")
def test_clear_tokens(
self, mock_delete: unittest.mock.MagicMock
) -> None:
"""Test clearing tokens deletes the token file."""
self.token_manager.clear_tokens()
mock_delete.assert_called_once_with("tokens.enc")
class TestAtomicFileOperations(unittest.TestCase):
"""Test atomic file operations directly."""
def setUp(self) -> None:
"""Set up test fixtures with temp directory."""
self.temp_dir = tempfile.mkdtemp()
self.original_get_path = TokenManager._get_secure_storage_path
# Patch to use temp directory
def mock_get_path() -> Path:
return Path(self.temp_dir)
TokenManager._get_secure_storage_path = staticmethod(mock_get_path)
def tearDown(self) -> None:
"""Clean up temp directory."""
TokenManager._get_secure_storage_path = staticmethod(self.original_get_path)
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def test_atomic_create_new_file(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test atomic create succeeds for new file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
result = tm._atomic_create_secure_file("test.txt", b"content")
self.assertTrue(result)
file_path = Path(self.temp_dir) / "test.txt"
self.assertTrue(file_path.exists())
self.assertEqual(file_path.read_bytes(), b"content")
self.assertEqual(file_path.stat().st_mode & 0o777, 0o600)
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def test_atomic_create_existing_file(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test atomic create fails for existing file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
# Create file first
file_path = Path(self.temp_dir) / "test.txt"
file_path.write_bytes(b"original")
result = tm._atomic_create_secure_file("test.txt", b"new content")
self.assertFalse(result)
self.assertEqual(file_path.read_bytes(), b"original")
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def test_atomic_write_new_file(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test atomic write creates new file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
tm._atomic_write_secure_file("test.txt", b"content")
file_path = Path(self.temp_dir) / "test.txt"
self.assertTrue(file_path.exists())
self.assertEqual(file_path.read_bytes(), b"content")
self.assertEqual(file_path.stat().st_mode & 0o777, 0o600)
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def test_atomic_write_overwrites(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test atomic write overwrites existing file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
file_path = Path(self.temp_dir) / "test.txt"
file_path.write_bytes(b"original")
tm._atomic_write_secure_file("test.txt", b"new content")
self.assertEqual(file_path.read_bytes(), b"new content")
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def test_atomic_write_no_temp_file_on_success(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test that temp file is cleaned up after successful write."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
tm._atomic_write_secure_file("test.txt", b"content")
# Check no temp files remain
temp_files = list(Path(self.temp_dir).glob(".test.txt.*"))
self.assertEqual(len(temp_files), 0)
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def test_read_secure_file_exists(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test reading existing file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
file_path = Path(self.temp_dir) / "test.txt"
file_path.write_bytes(b"content")
result = tm._read_secure_file("test.txt")
self.assertEqual(result, b"content")
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def test_read_secure_file_not_exists(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test reading non-existent file returns None."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
result = tm._read_secure_file("nonexistent.txt")
self.assertIsNone(result)
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def test_delete_secure_file_exists(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test deleting existing file."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
file_path = Path(self.temp_dir) / "test.txt"
file_path.write_bytes(b"content")
tm._delete_secure_file("test.txt")
self.assertFalse(file_path.exists())
@patch("crewai_cli.shared.token_manager.TokenManager._get_or_create_key")
def test_delete_secure_file_not_exists(
self, mock_get_key: unittest.mock.MagicMock
) -> None:
"""Test deleting non-existent file doesn't raise."""
mock_get_key.return_value = Fernet.generate_key()
tm = TokenManager()
# Should not raise
tm._delete_secure_file("nonexistent.txt")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,146 +0,0 @@
import os
import shutil
import tempfile
from pathlib import Path
import pytest
from crewai_cli import utils
@pytest.fixture
def temp_tree():
root_dir = tempfile.mkdtemp()
create_file(os.path.join(root_dir, "file1.txt"), "Hello, world!")
create_file(os.path.join(root_dir, "file2.txt"), "Another file")
os.mkdir(os.path.join(root_dir, "empty_dir"))
nested_dir = os.path.join(root_dir, "nested_dir")
os.mkdir(nested_dir)
create_file(os.path.join(nested_dir, "nested_file.txt"), "Nested content")
yield root_dir
shutil.rmtree(root_dir)
def create_file(path, content):
with open(path, "w") as f:
f.write(content)
def test_tree_find_and_replace_file_content(temp_tree):
utils.tree_find_and_replace(temp_tree, "world", "universe")
with open(os.path.join(temp_tree, "file1.txt"), "r") as f:
assert f.read() == "Hello, universe!"
def test_tree_find_and_replace_file_name(temp_tree):
old_path = os.path.join(temp_tree, "file2.txt")
new_path = os.path.join(temp_tree, "file2_renamed.txt")
os.rename(old_path, new_path)
utils.tree_find_and_replace(temp_tree, "renamed", "modified")
assert os.path.exists(os.path.join(temp_tree, "file2_modified.txt"))
assert not os.path.exists(new_path)
def test_tree_find_and_replace_directory_name(temp_tree):
utils.tree_find_and_replace(temp_tree, "empty", "renamed")
assert os.path.exists(os.path.join(temp_tree, "renamed_dir"))
assert not os.path.exists(os.path.join(temp_tree, "empty_dir"))
def test_tree_find_and_replace_nested_content(temp_tree):
utils.tree_find_and_replace(temp_tree, "Nested", "Updated")
with open(os.path.join(temp_tree, "nested_dir", "nested_file.txt"), "r") as f:
assert f.read() == "Updated content"
def test_tree_find_and_replace_no_matches(temp_tree):
utils.tree_find_and_replace(temp_tree, "nonexistent", "replacement")
assert set(os.listdir(temp_tree)) == {
"file1.txt",
"file2.txt",
"empty_dir",
"nested_dir",
}
def test_tree_copy_full_structure(temp_tree):
dest_dir = tempfile.mkdtemp()
try:
utils.tree_copy(temp_tree, dest_dir)
assert set(os.listdir(dest_dir)) == set(os.listdir(temp_tree))
assert os.path.isfile(os.path.join(dest_dir, "file1.txt"))
assert os.path.isfile(os.path.join(dest_dir, "file2.txt"))
assert os.path.isdir(os.path.join(dest_dir, "empty_dir"))
assert os.path.isdir(os.path.join(dest_dir, "nested_dir"))
assert os.path.isfile(os.path.join(dest_dir, "nested_dir", "nested_file.txt"))
finally:
shutil.rmtree(dest_dir)
def test_tree_copy_preserve_content(temp_tree):
dest_dir = tempfile.mkdtemp()
try:
utils.tree_copy(temp_tree, dest_dir)
with open(os.path.join(dest_dir, "file1.txt"), "r") as f:
assert f.read() == "Hello, world!"
with open(os.path.join(dest_dir, "nested_dir", "nested_file.txt"), "r") as f:
assert f.read() == "Nested content"
finally:
shutil.rmtree(dest_dir)
def test_tree_copy_to_existing_directory(temp_tree):
dest_dir = tempfile.mkdtemp()
try:
create_file(os.path.join(dest_dir, "existing_file.txt"), "I was here first")
utils.tree_copy(temp_tree, dest_dir)
assert os.path.isfile(os.path.join(dest_dir, "existing_file.txt"))
assert os.path.isfile(os.path.join(dest_dir, "file1.txt"))
finally:
shutil.rmtree(dest_dir)
@pytest.fixture
def temp_project_dir():
"""Create a temporary directory for testing tool extraction."""
with tempfile.TemporaryDirectory() as temp_dir:
yield Path(temp_dir)
def create_init_file(directory, content):
return create_file(directory / "__init__.py", content)
def test_extract_available_exports_empty_project(temp_project_dir, capsys):
with pytest.raises(SystemExit):
utils.extract_available_exports(dir_path=temp_project_dir)
captured = capsys.readouterr()
assert "No valid tools were exposed in your __init__.py file" in captured.out
def test_extract_available_exports_no_init_file(temp_project_dir, capsys):
(temp_project_dir / "some_file.py").write_text("print('hello')")
with pytest.raises(SystemExit):
utils.extract_available_exports(dir_path=temp_project_dir)
captured = capsys.readouterr()
assert "No valid tools were exposed in your __init__.py file" in captured.out
def test_extract_available_exports_empty_init_file(temp_project_dir, capsys):
create_init_file(temp_project_dir, "")
with pytest.raises(SystemExit):
utils.extract_available_exports(dir_path=temp_project_dir)
captured = capsys.readouterr()
assert "Warning: No __all__ defined in" in captured.out
# Tests for extract_available_exports with crewai.tools (BaseTool, @tool)
# remain in lib/crewai/tests/cli/test_utils.py as they require the crewai core package.
# Tests for get_crews, get_flows, fetch_crews, is_valid_tool
# remain in lib/crewai/tests/cli/test_utils.py as they require the crewai core package.

View File

@@ -1,372 +0,0 @@
"""Test for version management."""
import json
from datetime import datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
from crewai_cli.version import get_crewai_version as _get_ver
from crewai_cli.version import (
_find_latest_non_yanked_version,
_get_cache_file,
_is_cache_valid,
_is_version_yanked,
get_crewai_version,
get_latest_version_from_pypi,
is_current_version_yanked,
is_newer_version_available,
)
def test_dynamic_versioning_consistency() -> None:
"""Test that dynamic versioning provides consistent version across all access methods."""
cli_version = get_crewai_version()
package_version = _get_ver()
assert cli_version == package_version
assert package_version is not None
assert len(package_version.strip()) > 0
class TestVersionChecking:
"""Test version checking utilities."""
def test_get_crewai_version(self) -> None:
"""Test getting current crewai version."""
version = get_crewai_version()
assert isinstance(version, str)
assert len(version) > 0
def test_get_cache_file(self) -> None:
"""Test cache file path generation."""
cache_file = _get_cache_file()
assert isinstance(cache_file, Path)
assert cache_file.name == "version_cache.json"
def test_is_cache_valid_with_fresh_cache(self) -> None:
"""Test cache validation with fresh cache."""
cache_data = {"timestamp": datetime.now().isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is True
def test_is_cache_valid_with_stale_cache(self) -> None:
"""Test cache validation with stale cache."""
old_time = datetime.now() - timedelta(hours=25)
cache_data = {"timestamp": old_time.isoformat(), "version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
def test_is_cache_valid_with_missing_timestamp(self) -> None:
"""Test cache validation with missing timestamp."""
cache_data = {"version": "1.0.0"}
assert _is_cache_valid(cache_data) is False
@patch("crewai_cli.version.Path.exists")
@patch("crewai_cli.version.request.urlopen")
def test_get_latest_version_from_pypi_success(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test successful PyPI version fetch uses releases data."""
mock_exists.return_value = False
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0": [{"yanked": False}],
"2.1.0": [{"yanked": True, "yanked_reason": "bad release"}],
}
mock_response = MagicMock()
mock_response.read.return_value = json.dumps(
{"info": {"version": "2.1.0"}, "releases": releases}
).encode()
mock_urlopen.return_value.__enter__.return_value = mock_response
version = get_latest_version_from_pypi()
assert version == "2.0.0"
@patch("crewai_cli.version.Path.exists")
@patch("crewai_cli.version.request.urlopen")
def test_get_latest_version_from_pypi_failure(
self, mock_urlopen: MagicMock, mock_exists: MagicMock
) -> None:
"""Test PyPI version fetch failure."""
from urllib.error import URLError
mock_exists.return_value = False
mock_urlopen.side_effect = URLError("Network error")
version = get_latest_version_from_pypi()
assert version is None
@patch("crewai_cli.version.get_crewai_version")
@patch("crewai_cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_true(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when newer version is available."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is True
assert current == "1.0.0"
assert latest == "2.0.0"
@patch("crewai_cli.version.get_crewai_version")
@patch("crewai_cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_false(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when no newer version is available."""
mock_current.return_value = "2.0.0"
mock_latest.return_value = "2.0.0"
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "2.0.0"
assert latest == "2.0.0"
@patch("crewai_cli.version.get_crewai_version")
@patch("crewai_cli.version.get_latest_version_from_pypi")
def test_is_newer_version_available_with_none_latest(
self, mock_latest: MagicMock, mock_current: MagicMock
) -> None:
"""Test when PyPI fetch fails."""
mock_current.return_value = "1.0.0"
mock_latest.return_value = None
is_newer, current, latest = is_newer_version_available()
assert is_newer is False
assert current == "1.0.0"
assert latest is None
class TestFindLatestNonYankedVersion:
"""Test _find_latest_non_yanked_version helper."""
def test_skips_yanked_versions(self) -> None:
"""Test that yanked versions are skipped."""
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0": [{"yanked": True}],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_returns_highest_non_yanked(self) -> None:
"""Test that the highest non-yanked version is returned."""
releases = {
"1.0.0": [{"yanked": False}],
"1.5.0": [{"yanked": False}],
"2.0.0": [{"yanked": True}],
}
assert _find_latest_non_yanked_version(releases) == "1.5.0"
def test_returns_none_when_all_yanked(self) -> None:
"""Test that None is returned when all versions are yanked."""
releases = {
"1.0.0": [{"yanked": True}],
"2.0.0": [{"yanked": True}],
}
assert _find_latest_non_yanked_version(releases) is None
def test_skips_prerelease_versions(self) -> None:
"""Test that pre-release versions are skipped."""
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0a1": [{"yanked": False}],
"2.0.0rc1": [{"yanked": False}],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_skips_versions_with_empty_files(self) -> None:
"""Test that versions with no files are skipped."""
releases: dict[str, list[dict[str, bool]]] = {
"1.0.0": [{"yanked": False}],
"2.0.0": [],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_handles_invalid_version_strings(self) -> None:
"""Test that invalid version strings are skipped."""
releases = {
"1.0.0": [{"yanked": False}],
"not-a-version": [{"yanked": False}],
}
assert _find_latest_non_yanked_version(releases) == "1.0.0"
def test_partially_yanked_files_not_considered_yanked(self) -> None:
"""Test that a version with some non-yanked files is not yanked."""
releases = {
"1.0.0": [{"yanked": False}],
"2.0.0": [{"yanked": True}, {"yanked": False}],
}
assert _find_latest_non_yanked_version(releases) == "2.0.0"
class TestIsVersionYanked:
"""Test _is_version_yanked helper."""
def test_non_yanked_version(self) -> None:
"""Test a non-yanked version returns False."""
releases = {"1.0.0": [{"yanked": False}]}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is False
assert reason == ""
def test_yanked_version_with_reason(self) -> None:
"""Test a yanked version returns True with reason."""
releases = {
"1.0.0": [{"yanked": True, "yanked_reason": "critical bug"}],
}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is True
assert reason == "critical bug"
def test_yanked_version_without_reason(self) -> None:
"""Test a yanked version returns True with empty reason."""
releases = {"1.0.0": [{"yanked": True}]}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is True
assert reason == ""
def test_unknown_version(self) -> None:
"""Test an unknown version returns False."""
releases = {"1.0.0": [{"yanked": False}]}
is_yanked, reason = _is_version_yanked("9.9.9", releases)
assert is_yanked is False
assert reason == ""
def test_partially_yanked_files(self) -> None:
"""Test a version with mixed yanked/non-yanked files is not yanked."""
releases = {
"1.0.0": [{"yanked": True}, {"yanked": False}],
}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is False
assert reason == ""
def test_multiple_yanked_files_picks_first_reason(self) -> None:
"""Test that the first available reason is returned."""
releases = {
"1.0.0": [
{"yanked": True, "yanked_reason": ""},
{"yanked": True, "yanked_reason": "second reason"},
],
}
is_yanked, reason = _is_version_yanked("1.0.0", releases)
assert is_yanked is True
assert reason == "second reason"
class TestIsCurrentVersionYanked:
"""Test is_current_version_yanked public function."""
@patch("crewai_cli.version.get_crewai_version")
@patch("crewai_cli.version._get_cache_file")
def test_reads_from_valid_cache(
self, mock_cache_file: MagicMock, mock_version: MagicMock, tmp_path: Path
) -> None:
"""Test reading yanked status from a valid cache."""
mock_version.return_value = "1.0.0"
cache_file = tmp_path / "version_cache.json"
cache_data = {
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"current_version": "1.0.0",
"current_version_yanked": True,
"current_version_yanked_reason": "bad release",
}
cache_file.write_text(json.dumps(cache_data))
mock_cache_file.return_value = cache_file
is_yanked, reason = is_current_version_yanked()
assert is_yanked is True
assert reason == "bad release"
@patch("crewai_cli.version.get_crewai_version")
@patch("crewai_cli.version._get_cache_file")
def test_not_yanked_from_cache(
self, mock_cache_file: MagicMock, mock_version: MagicMock, tmp_path: Path
) -> None:
"""Test non-yanked status from a valid cache."""
mock_version.return_value = "2.0.0"
cache_file = tmp_path / "version_cache.json"
cache_data = {
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"current_version": "2.0.0",
"current_version_yanked": False,
"current_version_yanked_reason": "",
}
cache_file.write_text(json.dumps(cache_data))
mock_cache_file.return_value = cache_file
is_yanked, reason = is_current_version_yanked()
assert is_yanked is False
assert reason == ""
@patch("crewai_cli.version.get_latest_version_from_pypi")
@patch("crewai_cli.version.get_crewai_version")
@patch("crewai_cli.version._get_cache_file")
def test_triggers_fetch_on_stale_cache(
self,
mock_cache_file: MagicMock,
mock_version: MagicMock,
mock_fetch: MagicMock,
tmp_path: Path,
) -> None:
"""Test that a stale cache triggers a re-fetch."""
mock_version.return_value = "1.0.0"
cache_file = tmp_path / "version_cache.json"
old_time = datetime.now() - timedelta(hours=25)
cache_data = {
"version": "2.0.0",
"timestamp": old_time.isoformat(),
"current_version": "1.0.0",
"current_version_yanked": True,
"current_version_yanked_reason": "old reason",
}
cache_file.write_text(json.dumps(cache_data))
mock_cache_file.return_value = cache_file
fresh_cache = {
"version": "2.0.0",
"timestamp": datetime.now().isoformat(),
"current_version": "1.0.0",
"current_version_yanked": False,
"current_version_yanked_reason": "",
}
def write_fresh_cache() -> str:
cache_file.write_text(json.dumps(fresh_cache))
return "2.0.0"
mock_fetch.side_effect = lambda: write_fresh_cache()
is_yanked, reason = is_current_version_yanked()
assert is_yanked is False
mock_fetch.assert_called_once()
@patch("crewai_cli.version.get_latest_version_from_pypi")
@patch("crewai_cli.version.get_crewai_version")
@patch("crewai_cli.version._get_cache_file")
def test_returns_false_on_fetch_failure(
self,
mock_cache_file: MagicMock,
mock_version: MagicMock,
mock_fetch: MagicMock,
tmp_path: Path,
) -> None:
"""Test that fetch failure returns not yanked."""
mock_version.return_value = "1.0.0"
cache_file = tmp_path / "version_cache.json"
mock_cache_file.return_value = cache_file
mock_fetch.return_value = None
is_yanked, reason = is_current_version_yanked()
assert is_yanked is False
assert reason == ""
# TestConsoleFormatterVersionCheck tests remain in lib/crewai/tests/cli/test_version.py
# as they depend on crewai.events.utils.console_formatter (core package).

View File

@@ -152,4 +152,4 @@ __all__ = [
"wrap_file_source",
]
__version__ = "1.10.2rc2"
__version__ = "1.11.0rc1"

View File

@@ -11,7 +11,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.10.2rc2",
"crewai==1.11.0rc1",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",
"python-docx~=1.2.0",

View File

@@ -309,4 +309,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.10.2rc2"
__version__ = "1.11.0rc1"

View File

@@ -1,13 +1,27 @@
# CodeInterpreterTool
## Description
This tool is used to give the Agent the ability to run code (Python3) from the code generated by the Agent itself. The code is executed in a sandboxed environment, so it is safe to run any code.
This tool is used to give the Agent the ability to run code (Python3) from the code generated by the Agent itself. The code is executed in a Docker container for secure isolation.
It is incredible useful since it allows the Agent to generate code, run it in the same environment, get the result and use it to make decisions.
It is incredibly useful since it allows the Agent to generate code, run it in an isolated environment, get the result and use it to make decisions.
## ⚠️ Security Requirements
**Docker is REQUIRED** for safe code execution. The tool will refuse to execute code without Docker to prevent security vulnerabilities.
### Why Docker is Required
Previous versions included a "restricted sandbox" fallback when Docker was unavailable. This has been **removed** due to critical security vulnerabilities:
- The Python-based sandbox could be escaped via object introspection
- Attackers could recover the original `__import__` function and access any module
- This allowed arbitrary command execution on the host system
**Docker provides real process isolation** and is the only secure way to execute untrusted code.
## Requirements
- Docker
- **Docker (REQUIRED)** - Install from [docker.com](https://docs.docker.com/get-docker/)
## Installation
Install the crewai_tools package
@@ -17,7 +31,9 @@ pip install 'crewai[tools]'
## Example
Remember that when using this tool, the code must be generated by the Agent itself. The code must be a Python3 code. And it will take some time for the first time to run because it needs to build the Docker image.
Remember that when using this tool, the code must be generated by the Agent itself. The code must be Python3 code. It will take some time the first time to run because it needs to build the Docker image.
### Basic Usage (Docker Container - Recommended)
```python
from crewai_tools import CodeInterpreterTool
@@ -28,7 +44,9 @@ Agent(
)
```
Or if you need to pass your own Dockerfile just do this
### Custom Dockerfile
If you need to pass your own Dockerfile:
```python
from crewai_tools import CodeInterpreterTool
@@ -39,15 +57,39 @@ Agent(
)
```
If it is difficult to connect to docker daemon automatically (especially for macOS users), you can do this to setup docker host manually
### Manual Docker Host Configuration
If it is difficult to connect to the Docker daemon automatically (especially for macOS users), you can set up the Docker host manually:
```python
from crewai_tools import CodeInterpreterTool
Agent(
...
tools=[CodeInterpreterTool(user_docker_base_url="<Docker Host Base Url>",
user_dockerfile_path="<Dockerfile_path>")],
tools=[CodeInterpreterTool(
user_docker_base_url="<Docker Host Base Url>",
user_dockerfile_path="<Dockerfile_path>"
)],
)
```
### Unsafe Mode (NOT RECOMMENDED)
If you absolutely cannot use Docker and **fully trust the code source**, you can use unsafe mode:
```python
from crewai_tools import CodeInterpreterTool
# WARNING: Only use with fully trusted code!
Agent(
...
tools=[CodeInterpreterTool(unsafe_mode=True)],
)
```
**⚠️ SECURITY WARNING:** `unsafe_mode=True` executes code directly on the host without any isolation. Only use this if:
- You completely trust the code being executed
- You understand the security risks
- You cannot install Docker in your environment
For production use, **always use Docker** (the default mode).

View File

@@ -8,6 +8,7 @@ potentially unsafe operations and importing restricted modules.
import importlib.util
import os
import subprocess
import sys
from types import ModuleType
from typing import Any, ClassVar, TypedDict
@@ -50,11 +51,16 @@ class CodeInterpreterSchema(BaseModel):
class SandboxPython:
"""A restricted Python execution environment for running code safely.
"""INSECURE: A restricted Python execution environment with known vulnerabilities.
This class provides methods to safely execute Python code by restricting access to
potentially dangerous modules and built-in functions. It creates a sandboxed
environment where harmful operations are blocked.
WARNING: This class does NOT provide real security isolation and is vulnerable to
sandbox escape attacks via Python object introspection. Attackers can recover the
original __import__ function and bypass all restrictions.
DO NOT USE for untrusted code execution. Use Docker containers instead.
This class attempts to restrict access to dangerous modules and built-in functions
but provides no real security boundary against a motivated attacker.
"""
BLOCKED_MODULES: ClassVar[set[str]] = {
@@ -299,8 +305,8 @@ class CodeInterpreterTool(BaseTool):
def run_code_safety(self, code: str, libraries_used: list[str]) -> str:
"""Runs code in the safest available environment.
Attempts to run code in Docker if available, falls back to a restricted
sandbox if Docker is not available.
Requires Docker to be available for secure code execution. Fails closed
if Docker is not available to prevent sandbox escape vulnerabilities.
Args:
code: The Python code to execute as a string.
@@ -308,10 +314,24 @@ class CodeInterpreterTool(BaseTool):
Returns:
The output of the executed code as a string.
Raises:
RuntimeError: If Docker is not available, as the restricted sandbox
is vulnerable to escape attacks and should not be used
for untrusted code execution.
"""
if self._check_docker_available():
return self.run_code_in_docker(code, libraries_used)
return self.run_code_in_restricted_sandbox(code)
error_msg = (
"Docker is required for safe code execution but is not available. "
"The restricted sandbox fallback has been removed due to security vulnerabilities "
"that allow sandbox escape via Python object introspection. "
"Please install Docker (https://docs.docker.com/get-docker/) or use unsafe_mode=True "
"if you trust the code source and understand the security risks."
)
Printer.print(error_msg, color="bold_red")
raise RuntimeError(error_msg)
def run_code_in_docker(self, code: str, libraries_used: list[str]) -> str:
"""Runs Python code in a Docker container for safe isolation.
@@ -342,10 +362,19 @@ class CodeInterpreterTool(BaseTool):
@staticmethod
def run_code_in_restricted_sandbox(code: str) -> str:
"""Runs Python code in a restricted sandbox environment.
"""DEPRECATED AND INSECURE: Runs Python code in a restricted sandbox environment.
Executes the code with restricted access to potentially dangerous modules and
built-in functions for basic safety when Docker is not available.
WARNING: This method is vulnerable to sandbox escape attacks via Python object
introspection and should NOT be used for untrusted code execution. It has been
deprecated and is only kept for backward compatibility with trusted code.
The "restricted" environment can be bypassed by attackers who can:
- Use object graph introspection to recover the original __import__ function
- Access any Python module including os, subprocess, sys, etc.
- Execute arbitrary commands on the host system
Use run_code_in_docker() for secure code execution, or run_code_unsafe()
if you explicitly acknowledge the security risks.
Args:
code: The Python code to execute as a string.
@@ -354,7 +383,10 @@ class CodeInterpreterTool(BaseTool):
The value of the 'result' variable from the executed code,
or an error message if execution failed.
"""
Printer.print("Running code in restricted sandbox", color="yellow")
Printer.print(
"WARNING: Running code in INSECURE restricted sandbox (vulnerable to escape attacks)",
color="bold_red"
)
exec_locals: dict[str, Any] = {}
try:
SandboxPython.exec(code=code, locals_=exec_locals)
@@ -380,7 +412,7 @@ class CodeInterpreterTool(BaseTool):
Printer.print("WARNING: Running code in unsafe mode", color="bold_magenta")
# Install libraries on the host machine
for library in libraries_used:
os.system(f"pip install {library}") # noqa: S605
subprocess.run([sys.executable, "-m", "pip", "install", library], check=False) # noqa: S603
# Execute the code
try:

View File

@@ -1,3 +1,4 @@
import sys
from unittest.mock import patch
from crewai_tools.tools.code_interpreter_tool.code_interpreter_tool import (
@@ -76,24 +77,22 @@ print("This is line 2")"""
)
def test_restricted_sandbox_basic_code_execution(printer_mock, docker_unavailable_mock):
"""Test basic code execution."""
def test_docker_unavailable_raises_error(printer_mock, docker_unavailable_mock):
"""Test that execution fails when Docker is unavailable in safe mode."""
tool = CodeInterpreterTool()
code = """
result = 2 + 2
print(result)
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
assert result == 4
with pytest.raises(RuntimeError) as exc_info:
tool.run(code=code, libraries_used=[])
assert "Docker is required for safe code execution" in str(exc_info.value)
assert "sandbox escape" in str(exc_info.value)
def test_restricted_sandbox_running_with_blocked_modules(
printer_mock, docker_unavailable_mock
):
"""Test that restricted modules cannot be imported."""
def test_restricted_sandbox_running_with_blocked_modules():
"""Test that restricted modules cannot be imported when using the deprecated sandbox directly."""
tool = CodeInterpreterTool()
restricted_modules = SandboxPython.BLOCKED_MODULES
@@ -102,18 +101,15 @@ def test_restricted_sandbox_running_with_blocked_modules(
import {module}
result = "Import succeeded"
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
# Note: run_code_in_restricted_sandbox is deprecated and insecure
# This test verifies the old behavior but should not be used in production
result = tool.run_code_in_restricted_sandbox(code)
assert f"An error occurred: Importing '{module}' is not allowed" in result
def test_restricted_sandbox_running_with_blocked_builtins(
printer_mock, docker_unavailable_mock
):
"""Test that restricted builtins are not available."""
def test_restricted_sandbox_running_with_blocked_builtins():
"""Test that restricted builtins are not available when using the deprecated sandbox directly."""
tool = CodeInterpreterTool()
restricted_builtins = SandboxPython.UNSAFE_BUILTINS
@@ -122,25 +118,23 @@ def test_restricted_sandbox_running_with_blocked_builtins(
{builtin}("test")
result = "Builtin available"
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
# Note: run_code_in_restricted_sandbox is deprecated and insecure
# This test verifies the old behavior but should not be used in production
result = tool.run_code_in_restricted_sandbox(code)
assert f"An error occurred: name '{builtin}' is not defined" in result
def test_restricted_sandbox_running_with_no_result_variable(
printer_mock, docker_unavailable_mock
):
"""Test behavior when no result variable is set."""
"""Test behavior when no result variable is set in deprecated sandbox."""
tool = CodeInterpreterTool()
code = """
x = 10
"""
result = tool.run(code=code, libraries_used=[])
printer_mock.assert_called_with(
"Running code in restricted sandbox", color="yellow"
)
# Note: run_code_in_restricted_sandbox is deprecated and insecure
# This test verifies the old behavior but should not be used in production
result = tool.run_code_in_restricted_sandbox(code)
assert result == "No result variable found."
@@ -159,6 +153,44 @@ x = 10
assert result == "No result variable found."
@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run")
def test_unsafe_mode_installs_libraries_without_shell(
subprocess_run_mock, printer_mock, docker_unavailable_mock
):
"""Test that library installation uses subprocess.run with shell=False, not os.system."""
tool = CodeInterpreterTool(unsafe_mode=True)
code = "result = 1"
libraries_used = ["numpy", "pandas"]
tool.run(code=code, libraries_used=libraries_used)
assert subprocess_run_mock.call_count == 2
for call, library in zip(subprocess_run_mock.call_args_list, libraries_used):
args, kwargs = call
# Must be list form (no shell expansion possible)
assert args[0] == [sys.executable, "-m", "pip", "install", library]
# shell= must not be True (defaults to False)
assert kwargs.get("shell", False) is False
@patch("crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run")
def test_unsafe_mode_library_name_with_shell_metacharacters_does_not_invoke_shell(
subprocess_run_mock, printer_mock, docker_unavailable_mock
):
"""Test that a malicious library name cannot inject shell commands."""
tool = CodeInterpreterTool(unsafe_mode=True)
code = "result = 1"
malicious_library = "numpy; rm -rf /"
tool.run(code=code, libraries_used=[malicious_library])
subprocess_run_mock.assert_called_once()
args, kwargs = subprocess_run_mock.call_args
# The entire malicious string is passed as a single argument — no shell parsing
assert args[0] == [sys.executable, "-m", "pip", "install", malicious_library]
assert kwargs.get("shell", False) is False
def test_unsafe_mode_running_unsafe_code(printer_mock, docker_unavailable_mock):
"""Test behavior when no result variable is set."""
tool = CodeInterpreterTool(unsafe_mode=True)
@@ -172,3 +204,50 @@ result = eval("5/1")
"WARNING: Running code in unsafe mode", color="bold_magenta"
)
assert 5.0 == result
@pytest.mark.xfail(
reason=(
"run_code_in_restricted_sandbox is known to be vulnerable to sandbox "
"escape via object introspection. This test encodes the desired secure "
"behavior (no escape possible) and will start passing once the "
"vulnerability is fixed or the function is removed."
)
)
def test_sandbox_escape_vulnerability_demonstration(printer_mock):
"""Demonstrate that the restricted sandbox is vulnerable to escape attacks.
This test shows that an attacker can use Python object introspection to bypass
the restricted sandbox and access blocked modules like 'os'. This is why the
sandbox should never be used for untrusted code execution.
NOTE: This test uses the deprecated run_code_in_restricted_sandbox directly
to demonstrate the vulnerability. In production, Docker is now required.
"""
tool = CodeInterpreterTool()
# Classic Python sandbox escape via object introspection
escape_code = """
# Recover the real __import__ function via object introspection
for cls in ().__class__.__bases__[0].__subclasses__():
if cls.__name__ == 'catch_warnings':
# Get the real builtins module
real_builtins = cls()._module.__builtins__
real_import = real_builtins['__import__']
# Now we can import os and execute commands
os = real_import('os')
# Demonstrate we have escaped the sandbox
result = "SANDBOX_ESCAPED" if hasattr(os, 'system') else "FAILED"
break
"""
# The deprecated sandbox is vulnerable to this attack
result = tool.run_code_in_restricted_sandbox(escape_code)
# Desired behavior: the restricted sandbox should prevent this escape.
# If this assertion fails, run_code_in_restricted_sandbox remains vulnerable.
assert result != "SANDBOX_ESCAPED", (
"The restricted sandbox was bypassed via object introspection. "
"This indicates run_code_in_restricted_sandbox is still vulnerable and "
"is why Docker is now required for safe code execution."
)

View File

@@ -53,7 +53,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.10.2rc2",
"crewai-tools==1.11.0rc1",
]
embeddings = [
"tiktoken~=0.8.0"
@@ -105,15 +105,10 @@ a2a = [
file-processing = [
"crewai-files",
]
cli = [
"crewai-cli",
]
# CLI entry point has moved to the crewai-cli package.
# Install it via: pip install crewai[cli]
# [project.scripts]
# crewai = "crewai.cli.cli:crewai"
[project.scripts]
crewai = "crewai.cli.cli:crewai"
# PyTorch index configuration, since torch 2.5.0 is not compatible with python 3.13

View File

@@ -5,6 +5,7 @@ import urllib.request
import warnings
from crewai.agent.core import Agent
from crewai.agent.planning_config import PlanningConfig
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.flow.flow import Flow
@@ -41,7 +42,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.10.2rc2"
__version__ = "1.11.0rc1"
_telemetry_submitted = False
@@ -102,6 +103,7 @@ __all__ = [
"Knowledge",
"LLMGuardrail",
"Memory",
"PlanningConfig",
"Process",
"Task",
"TaskOutput",

View File

@@ -13,6 +13,7 @@ from crewai.a2a.auth.client_schemes import (
)
from crewai.a2a.auth.server_schemes import (
AuthenticatedUser,
EnterpriseTokenAuth,
OIDCAuth,
ServerAuthScheme,
SimpleTokenAuth,
@@ -25,6 +26,7 @@ __all__ = [
"AuthenticatedUser",
"BearerTokenAuth",
"ClientAuthScheme",
"EnterpriseTokenAuth",
"HTTPBasicAuth",
"HTTPDigestAuth",
"OAuth2AuthorizationCode",

View File

@@ -4,6 +4,7 @@ These schemes validate incoming requests to A2A server endpoints.
Supported authentication methods:
- Simple token validation with static bearer tokens
- Enterprise token validation (via PlusAPI)
- OpenID Connect with JWT validation using JWKS
- OAuth2 with JWT validation or token introspection
"""
@@ -16,6 +17,7 @@ import logging
import os
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal
import httpx
import jwt
from jwt import PyJWKClient
from pydantic import (
@@ -33,6 +35,7 @@ from typing_extensions import Self
if TYPE_CHECKING:
from a2a.types import OAuth2SecurityScheme
from jwt.types import Options
logger = logging.getLogger(__name__)
@@ -183,6 +186,24 @@ class SimpleTokenAuth(ServerAuthScheme):
)
class EnterpriseTokenAuth(ServerAuthScheme):
"""Enterprise token authentication.
Validates tokens via the PlusAPI enterprise verification endpoint.
"""
async def authenticate(self, token: str) -> AuthenticatedUser:
"""Authenticate using enterprise token verification.
Args:
token: The bearer token to authenticate.
Raises:
NotImplementedError
"""
raise NotImplementedError
class OIDCAuth(ServerAuthScheme):
"""OpenID Connect authentication.
@@ -475,7 +496,7 @@ class OAuth2ServerAuth(ServerAuthScheme):
try:
signing_key = self._jwk_client.get_signing_key_from_jwt(token)
decode_options: dict[str, Any] = {
decode_options: Options = {
"require": self.required_claims,
}
@@ -556,7 +577,6 @@ class OAuth2ServerAuth(ServerAuthScheme):
async def _authenticate_introspection(self, token: str) -> AuthenticatedUser:
"""Authenticate using OAuth2 token introspection (RFC 7662)."""
import httpx
if not self.introspection_url:
raise HTTPException(

View File

@@ -633,6 +633,10 @@ class A2AServerConfig(BaseModel):
default=False,
description="Whether agent provides extended card to authenticated users",
)
extended_skills: list[AgentSkill] = Field(
default_factory=list,
description="Additional skills visible only to authenticated users in the extended card",
)
url: Url | None = Field(
default=None,
description="Preferred endpoint URL for the agent. Set at runtime if not provided.",

View File

@@ -63,6 +63,9 @@ class A2AErrorCode(IntEnum):
INVALID_AGENT_RESPONSE = -32006
"""The agent produced an invalid response."""
AUTHENTICATED_EXTENDED_CARD_NOT_CONFIGURED = -32007
"""Authenticated extended card feature is not configured."""
# CrewAI Custom Extensions (-32768 to -32100)
UNSUPPORTED_VERSION = -32009
"""The requested A2A protocol version is not supported."""
@@ -108,6 +111,7 @@ ERROR_MESSAGES: dict[int, str] = {
A2AErrorCode.UNSUPPORTED_OPERATION: "This operation is not supported",
A2AErrorCode.CONTENT_TYPE_NOT_SUPPORTED: "Incompatible content types",
A2AErrorCode.INVALID_AGENT_RESPONSE: "Invalid agent response",
A2AErrorCode.AUTHENTICATED_EXTENDED_CARD_NOT_CONFIGURED: "Authenticated Extended Card is not configured",
A2AErrorCode.UNSUPPORTED_VERSION: "Unsupported A2A version",
A2AErrorCode.UNSUPPORTED_EXTENSION: "Client does not support required extensions",
A2AErrorCode.AUTHENTICATION_REQUIRED: "Authentication required",
@@ -284,6 +288,15 @@ class InvalidAgentResponseError(A2AError):
code: int = field(default=A2AErrorCode.INVALID_AGENT_RESPONSE, init=False)
@dataclass
class AuthenticatedExtendedCardNotConfiguredError(A2AError):
"""Authenticated extended card is not configured."""
code: int = field(
default=A2AErrorCode.AUTHENTICATED_EXTENDED_CARD_NOT_CONFIGURED, init=False
)
@dataclass
class UnsupportedVersionError(A2AError):
"""The requested A2A version is not supported."""

View File

@@ -23,6 +23,7 @@ from pydantic import (
)
from typing_extensions import Self
from crewai.agent.planning_config import PlanningConfig
from crewai.agent.utils import (
ahandle_knowledge_retrieval,
apply_training_data,
@@ -192,13 +193,23 @@ class Agent(BaseAgent):
default="safe",
description="Mode for code execution: 'safe' (using Docker) or 'unsafe' (direct execution).",
)
reasoning: bool = Field(
planning_config: PlanningConfig | None = Field(
default=None,
description="Configuration for agent planning before task execution.",
)
planning: bool = Field(
default=False,
description="Whether the agent should reflect and create a plan before executing a task.",
)
reasoning: bool = Field(
default=False,
description="[DEPRECATED: Use planning_config instead] Whether the agent should reflect and create a plan before executing a task.",
deprecated=True,
)
max_reasoning_attempts: int | None = Field(
default=None,
description="Maximum number of reasoning attempts before executing the task. If None, will try until ready.",
description="[DEPRECATED: Use planning_config.max_attempts instead] Maximum number of reasoning attempts before executing the task. If None, will try until ready.",
deprecated=True,
)
embedder: EmbedderConfig | None = Field(
default=None,
@@ -265,8 +276,26 @@ class Agent(BaseAgent):
if self.allow_code_execution:
self._validate_docker_installation()
# Handle backward compatibility: convert reasoning=True to planning_config
if self.reasoning and self.planning_config is None:
import warnings
warnings.warn(
"The 'reasoning' parameter is deprecated. Use 'planning_config=PlanningConfig()' instead.",
DeprecationWarning,
stacklevel=2,
)
self.planning_config = PlanningConfig(
max_attempts=self.max_reasoning_attempts,
)
return self
@property
def planning_enabled(self) -> bool:
"""Check if planning is enabled for this agent."""
return self.planning_config is not None or self.planning
def _setup_agent_executor(self) -> None:
if not self.cache_handler:
self.cache_handler = CacheHandler()
@@ -335,7 +364,11 @@ class Agent(BaseAgent):
ValueError: If the max execution time is not a positive integer.
RuntimeError: If the agent execution fails for other reasons.
"""
handle_reasoning(self, task)
# Only call handle_reasoning for legacy CrewAgentExecutor
# For AgentExecutor, planning is handled in AgentExecutor.generate_plan()
if self.executor_class is not AgentExecutor:
handle_reasoning(self, task)
self._inject_date_to_task(task)
if self.tools_handler:
@@ -577,7 +610,10 @@ class Agent(BaseAgent):
ValueError: If the max execution time is not a positive integer.
RuntimeError: If the agent execution fails for other reasons.
"""
handle_reasoning(self, task)
if self.executor_class is not AgentExecutor:
handle_reasoning(
self, task
) # we need this till CrewAgentExecutor migrates to AgentExecutor
self._inject_date_to_task(task)
if self.tools_handler:
@@ -1423,17 +1459,19 @@ class Agent(BaseAgent):
except Exception as e:
self._logger.log("error", f"Failed to save kickoff result to memory: {e}")
def _execute_and_build_output(
def _build_output_from_result(
self,
result: dict[str, Any],
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent and build the output object.
"""Build a LiteAgentOutput from an executor result dict.
Shared logic used by both sync and async execution paths.
Args:
result: The result dictionary from executor.invoke / invoke_async.
executor: The executor instance.
inputs: Input dictionary for execution.
response_format: Optional response format.
Returns:
@@ -1441,8 +1479,6 @@ class Agent(BaseAgent):
"""
import json
# Execute the agent (this is called from sync path, so invoke returns dict)
result = cast(dict[str, Any], executor.invoke(inputs))
output = result.get("output", "")
# Handle response format conversion
@@ -1490,91 +1526,39 @@ class Agent(BaseAgent):
else str(raw_output)
)
todo_results = LiteAgentOutput.from_todo_items(executor.state.todos.items)
return LiteAgentOutput(
raw=raw_str,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
messages=list(executor.state.messages),
plan=executor.state.plan,
todos=todo_results,
replan_count=executor.state.replan_count,
last_replan_reason=executor.state.last_replan_reason,
)
def _execute_and_build_output(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent synchronously and build the output object."""
result = cast(dict[str, Any], executor.invoke(inputs))
return self._build_output_from_result(result, executor, response_format)
async def _execute_and_build_output_async(
self,
executor: AgentExecutor,
inputs: dict[str, str],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
"""Execute the agent asynchronously and build the output object.
This is the async version of _execute_and_build_output that uses
invoke_async() for native async execution within event loops.
Args:
executor: The executor instance.
inputs: Input dictionary for execution.
response_format: Optional response format.
Returns:
LiteAgentOutput with raw output, formatted result, and metrics.
"""
import json
# Execute the agent asynchronously
"""Execute the agent asynchronously and build the output object."""
result = await executor.invoke_async(inputs)
output = result.get("output", "")
# Handle response format conversion
formatted_result: BaseModel | None = None
raw_output: str
if isinstance(output, BaseModel):
formatted_result = output
raw_output = output.model_dump_json()
elif response_format:
raw_output = str(output) if not isinstance(output, str) else output
try:
model_schema = generate_model_description(response_format)
schema = json.dumps(model_schema, indent=2)
instructions = self.i18n.slice("formatted_task_instructions").format(
output_format=schema
)
converter = Converter(
llm=self.llm,
text=raw_output,
model=response_format,
instructions=instructions,
)
conversion_result = converter.to_pydantic()
if isinstance(conversion_result, BaseModel):
formatted_result = conversion_result
except ConverterError:
pass # Keep raw output if conversion fails
else:
raw_output = str(output) if not isinstance(output, str) else output
# Get token usage metrics
if isinstance(self.llm, BaseLLM):
usage_metrics = self.llm.get_token_usage_summary()
else:
usage_metrics = self._token_process.get_summary()
raw_str = (
raw_output
if isinstance(raw_output, str)
else raw_output.model_dump_json()
if isinstance(raw_output, BaseModel)
else str(raw_output)
)
return LiteAgentOutput(
raw=raw_str,
pydantic=formatted_result,
agent_role=self.role,
usage_metrics=usage_metrics.model_dump() if usage_metrics else None,
messages=executor.messages,
)
return self._build_output_from_result(result, executor, response_format)
def _process_kickoff_guardrail(
self,

View File

@@ -0,0 +1,138 @@
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, Field
from crewai.llms.base_llm import BaseLLM
class PlanningConfig(BaseModel):
"""Configuration for agent planning/reasoning before task execution.
This allows users to customize the planning behavior including prompts,
iteration limits, the LLM used for planning, and the reasoning effort
level that controls post-step observation and replanning behavior.
Note: To disable planning, don't pass a planning_config or set planning=False
on the Agent. The presence of a PlanningConfig enables planning.
Attributes:
reasoning_effort: Controls observation and replanning after each step.
- "low": Observe each step (validates success), but skip the
decide/replan/refine pipeline. Steps are marked complete and
execution continues linearly. Fastest option.
- "medium": Observe each step. On failure, trigger replanning.
On success, skip refinement and continue. Balanced option.
- "high": Full observation pipeline — observe every step, then
route through decide_next_action which can trigger early goal
achievement, full replanning, or lightweight refinement.
Most adaptive but adds latency per step.
max_attempts: Maximum number of planning refinement attempts.
If None, will continue until the agent indicates readiness.
max_steps: Maximum number of steps in the generated plan.
system_prompt: Custom system prompt for planning. Uses default if None.
plan_prompt: Custom prompt for creating the initial plan.
refine_prompt: Custom prompt for refining the plan.
llm: LLM to use for planning. Uses agent's LLM if None.
Example:
```python
from crewai import Agent
from crewai.agent.planning_config import PlanningConfig
# Simple usage — fast, linear execution (default)
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
planning_config=PlanningConfig(),
)
# Balanced — replan only when steps fail
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
planning_config=PlanningConfig(
reasoning_effort="medium",
),
)
# Full adaptive planning with refinement and replanning
agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
planning_config=PlanningConfig(
reasoning_effort="high",
max_attempts=3,
max_steps=10,
plan_prompt="Create a focused plan for: {description}",
llm="gpt-4o-mini", # Use cheaper model for planning
),
)
```
"""
reasoning_effort: Literal["low", "medium", "high"] = Field(
default="medium",
description=(
"Controls post-step observation and replanning behavior. "
"'low' observes steps but skips replanning/refinement (fastest). "
"'medium' observes and replans only on step failure (balanced). "
"'high' runs full observation pipeline with replanning, refinement, "
"and early goal detection (most adaptive, highest latency)."
),
)
max_attempts: int | None = Field(
default=None,
description=(
"Maximum number of planning refinement attempts. "
"If None, will continue until the agent indicates readiness."
),
)
max_steps: int = Field(
default=20,
description="Maximum number of steps in the generated plan.",
ge=1,
)
system_prompt: str | None = Field(
default=None,
description="Custom system prompt for planning. Uses default if None.",
)
plan_prompt: str | None = Field(
default=None,
description="Custom prompt for creating the initial plan.",
)
refine_prompt: str | None = Field(
default=None,
description="Custom prompt for refining the plan.",
)
max_replans: int = Field(
default=3,
description="Maximum number of full replanning attempts before finalizing.",
ge=0,
)
max_step_iterations: int = Field(
default=15,
description=(
"Maximum LLM iterations per step in the StepExecutor multi-turn loop. "
"Lower values make steps faster but less thorough."
),
ge=1,
)
step_timeout: int | None = Field(
default=None,
description=(
"Maximum wall-clock seconds for a single step execution. "
"If exceeded, the step is marked as failed and observation decides "
"whether to continue or replan. None means no per-step timeout."
),
)
llm: str | BaseLLM | None = Field(
default=None,
description="LLM to use for planning. Uses agent's LLM if None.",
)
model_config = {"arbitrary_types_allowed": True}

View File

@@ -28,13 +28,20 @@ if TYPE_CHECKING:
def handle_reasoning(agent: Agent, task: Task) -> None:
"""Handle the reasoning process for an agent before task execution.
"""Handle the reasoning/planning process for an agent before task execution.
This function checks if planning is enabled for the agent and, if so,
creates a plan that gets appended to the task description.
Note: This function is used by CrewAgentExecutor (legacy path).
For AgentExecutor, planning is handled in AgentExecutor.generate_plan().
Args:
agent: The agent performing the task.
task: The task to execute.
"""
if not agent.reasoning:
# Check if planning is enabled using the planning_enabled property
if not getattr(agent, "planning_enabled", False):
return
try:
@@ -43,13 +50,13 @@ def handle_reasoning(agent: Agent, task: Task) -> None:
AgentReasoningOutput,
)
reasoning_handler = AgentReasoning(task=task, agent=agent)
reasoning_output: AgentReasoningOutput = (
reasoning_handler.handle_agent_reasoning()
planning_handler = AgentReasoning(agent=agent, task=task)
planning_output: AgentReasoningOutput = (
planning_handler.handle_agent_reasoning()
)
task.description += f"\n\nReasoning Plan:\n{reasoning_output.plan.plan}"
task.description += f"\n\nPlanning:\n{planning_output.plan.plan}"
except Exception as e:
agent._logger.log("error", f"Error during reasoning process: {e!s}")
agent._logger.log("error", f"Error during planning: {e!s}")
def build_task_prompt_with_schema(task: Task, task_prompt: str, i18n: I18N) -> str:

View File

@@ -0,0 +1,345 @@
"""PlannerObserver: Observation phase after each step execution.
Implements the "Observe" phase. After every step execution, the Planner
analyzes what happened, what new information was learned, and whether the
remaining plan is still valid.
This is NOT an error detector — it runs on every step, including successes,
to incorporate runtime observations into the remaining plan.
Refinements are structured (StepRefinement objects) and applied directly
from the observation result — no second LLM call required.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.observation_events import (
StepObservationCompletedEvent,
StepObservationFailedEvent,
StepObservationStartedEvent,
)
from crewai.utilities.agent_utils import extract_task_section
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.planning_types import StepObservation, TodoItem
from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.task import Task
logger = logging.getLogger(__name__)
class PlannerObserver:
"""Observes step execution results and decides on plan continuation.
After EVERY step execution, this class:
1. Analyzes what the step accomplished
2. Identifies new information learned
3. Decides if the remaining plan is still valid
4. Suggests lightweight refinements or triggers full replanning
LLM resolution (magical fallback):
- If ``agent.planning_config.llm`` is explicitly set → use that
- Otherwise → fall back to ``agent.llm`` (same LLM for everything)
Args:
agent: The agent instance (for LLM resolution and config).
task: Optional task context (for description and expected output).
"""
def __init__(
self,
agent: Agent,
task: Task | None = None,
kickoff_input: str = "",
) -> None:
self.agent = agent
self.task = task
self.kickoff_input = kickoff_input
self.llm = self._resolve_llm()
self._i18n: I18N = get_i18n()
def _resolve_llm(self) -> Any:
"""Resolve which LLM to use for observation/planning.
Mirrors AgentReasoning._resolve_llm(): uses planning_config.llm
if explicitly set, otherwise falls back to agent.llm.
Returns:
The resolved LLM instance.
"""
from crewai.llm import LLM
config = getattr(self.agent, "planning_config", None)
if config is not None and config.llm is not None:
if isinstance(config.llm, LLM):
return config.llm
return create_llm(config.llm)
return self.agent.llm
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def observe(
self,
completed_step: TodoItem,
result: str,
all_completed: list[TodoItem],
remaining_todos: list[TodoItem],
) -> StepObservation:
"""Observe a step's result and decide on plan continuation.
This runs after EVERY step execution — not just failures.
Args:
completed_step: The todo item that was just executed.
result: The final result string from the step.
all_completed: All previously completed todos (for context).
remaining_todos: The pending todos still in the plan.
Returns:
StepObservation with the Planner's analysis. Any suggested
refinements are structured StepRefinement objects ready for
direct application — no second LLM call needed.
"""
agent_role = self.agent.role
crewai_event_bus.emit(
self.agent,
event=StepObservationStartedEvent(
agent_role=agent_role,
step_number=completed_step.step_number,
step_description=completed_step.description,
from_task=self.task,
from_agent=self.agent,
),
)
messages = self._build_observation_messages(
completed_step, result, all_completed, remaining_todos
)
try:
response = self.llm.call(
messages,
response_model=StepObservation,
from_task=self.task,
from_agent=self.agent,
)
observation = self._parse_observation_response(response)
refinement_summaries = (
[
f"Step {r.step_number}: {r.new_description}"
for r in observation.suggested_refinements
]
if observation.suggested_refinements
else None
)
crewai_event_bus.emit(
self.agent,
event=StepObservationCompletedEvent(
agent_role=agent_role,
step_number=completed_step.step_number,
step_description=completed_step.description,
step_completed_successfully=observation.step_completed_successfully,
key_information_learned=observation.key_information_learned,
remaining_plan_still_valid=observation.remaining_plan_still_valid,
needs_full_replan=observation.needs_full_replan,
replan_reason=observation.replan_reason,
goal_already_achieved=observation.goal_already_achieved,
suggested_refinements=refinement_summaries,
from_task=self.task,
from_agent=self.agent,
),
)
return observation
except Exception as e:
logger.warning(
f"Observation LLM call failed: {e}. Defaulting to conservative replan."
)
crewai_event_bus.emit(
self.agent,
event=StepObservationFailedEvent(
agent_role=agent_role,
step_number=completed_step.step_number,
step_description=completed_step.description,
error=str(e),
from_task=self.task,
from_agent=self.agent,
),
)
# Don't force a full replan — the step may have succeeded even if the
# observer LLM failed to parse the result. Defaulting to "continue" is
# far less disruptive than wiping the entire plan on every observer error.
return StepObservation(
step_completed_successfully=True,
key_information_learned="",
remaining_plan_still_valid=True,
needs_full_replan=False,
)
def apply_refinements(
self,
observation: StepObservation,
remaining_todos: list[TodoItem],
) -> list[TodoItem]:
"""Apply structured refinements from the observation directly to todo descriptions.
No LLM call needed — refinements are already structured StepRefinement
objects produced by the observation call. This is a pure in-memory update.
Args:
observation: The observation containing structured refinements.
remaining_todos: The pending todos to update in-place.
Returns:
The same todo list with updated descriptions where refinements applied.
"""
if not observation.suggested_refinements:
return remaining_todos
todo_by_step: dict[int, TodoItem] = {t.step_number: t for t in remaining_todos}
for refinement in observation.suggested_refinements:
if refinement.step_number in todo_by_step and refinement.new_description:
todo_by_step[
refinement.step_number
].description = refinement.new_description
return remaining_todos
# ------------------------------------------------------------------
# Internal: Message building
# ------------------------------------------------------------------
def _build_observation_messages(
self,
completed_step: TodoItem,
result: str,
all_completed: list[TodoItem],
remaining_todos: list[TodoItem],
) -> list[LLMMessage]:
"""Build messages for the observation LLM call."""
task_desc = ""
task_goal = ""
if self.task:
task_desc = self.task.description or ""
task_goal = self.task.expected_output or ""
elif self.kickoff_input:
# Standalone kickoff path — no Task object, but we have the raw input.
# Extract just the ## Task section so the observer sees the actual goal,
# not the full enriched instruction with env/tools/verification noise.
task_desc = extract_task_section(self.kickoff_input)
task_goal = "Complete the task successfully"
system_prompt = self._i18n.retrieve("planning", "observation_system_prompt")
# Build context of what's been done
completed_summary = ""
if all_completed:
completed_lines = []
for todo in all_completed:
result_preview = (todo.result or "")[:200]
completed_lines.append(
f" Step {todo.step_number}: {todo.description}\n"
f" Result: {result_preview}"
)
completed_summary = "\n## Previously completed steps:\n" + "\n".join(
completed_lines
)
# Build remaining plan
remaining_summary = ""
if remaining_todos:
remaining_lines = [
f" Step {todo.step_number}: {todo.description}"
for todo in remaining_todos
]
remaining_summary = "\n## Remaining plan steps:\n" + "\n".join(
remaining_lines
)
user_prompt = self._i18n.retrieve("planning", "observation_user_prompt").format(
task_description=task_desc,
task_goal=task_goal,
completed_summary=completed_summary,
step_number=completed_step.step_number,
step_description=completed_step.description,
step_result=result,
remaining_summary=remaining_summary,
)
return [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
@staticmethod
def _parse_observation_response(response: Any) -> StepObservation:
"""Parse the LLM response into a StepObservation.
The LLM may return:
- A StepObservation instance directly (streaming + litellm path)
- A JSON string (non-streaming path serialises model_dump_json())
- A dict (some provider paths)
- Something else (unexpected)
We handle all cases to avoid silently falling back to a
hardcoded success default.
"""
if isinstance(response, StepObservation):
return response
# JSON string path — most common miss before this fix
if isinstance(response, str):
text = response.strip()
try:
return StepObservation.model_validate_json(text)
except Exception: # noqa: S110
pass
# Some LLMs wrap the JSON in markdown fences
if text.startswith("```"):
lines = text.split("\n")
# Strip first and last lines (``` markers)
inner = "\n".join(
lines[1:-1] if lines[-1].strip() == "```" else lines[1:]
)
try:
return StepObservation.model_validate_json(inner.strip())
except Exception: # noqa: S110
pass
# Dict path
if isinstance(response, dict):
try:
return StepObservation.model_validate(response)
except Exception: # noqa: S110
pass
# Last resort — log what we got so it's diagnosable
logger.warning(
"Could not parse observation response (type=%s). "
"Falling back to default failure observation. Preview: %.200s",
type(response).__name__,
str(response),
)
return StepObservation(
step_completed_successfully=False,
key_information_learned=str(response) if response else "",
remaining_plan_still_valid=False,
)

View File

@@ -0,0 +1,629 @@
"""StepExecutor: Isolated executor for a single plan step.
Implements the direct-action execution pattern from Plan-and-Act
(arxiv 2503.09572): the Executor receives one step description,
makes a single LLM call, executes any tool call returned, and
returns the result immediately.
There is no inner loop. Recovery from failure (retry, replan) is
the responsibility of PlannerObserver and AgentExecutor — keeping
this class single-purpose and fast.
"""
from __future__ import annotations
from collections.abc import Callable
from datetime import datetime
import json
import time
from typing import TYPE_CHECKING, Any, cast
from pydantic import BaseModel
from crewai.agents.parser import AgentAction, AgentFinish
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.tool_usage_events import (
ToolUsageErrorEvent,
ToolUsageFinishedEvent,
ToolUsageStartedEvent,
)
from crewai.utilities.agent_utils import (
build_tool_calls_assistant_message,
check_native_tool_support,
enforce_rpm_limit,
execute_single_native_tool_call,
extract_task_section,
format_message_for_llm,
is_tool_call_list,
process_llm_response,
setup_native_tools,
)
from crewai.utilities.i18n import I18N, get_i18n
from crewai.utilities.planning_types import TodoItem
from crewai.utilities.printer import Printer
from crewai.utilities.step_execution_context import StepExecutionContext, StepResult
from crewai.utilities.string_utils import sanitize_tool_name
from crewai.utilities.tool_utils import execute_tool_and_check_finality
from crewai.utilities.types import LLMMessage
if TYPE_CHECKING:
from crewai.agent import Agent
from crewai.agents.tools_handler import ToolsHandler
from crewai.crew import Crew
from crewai.llms.base_llm import BaseLLM
from crewai.task import Task
from crewai.tools.base_tool import BaseTool
from crewai.tools.structured_tool import CrewStructuredTool
class StepExecutor:
"""Executes a SINGLE todo item using direct-action execution.
The StepExecutor owns its own message list per invocation. It never reads
or writes the AgentExecutor's state. Results flow back via StepResult.
Execution pattern (per Plan-and-Act, arxiv 2503.09572):
1. Build messages from todo + context
2. Call LLM once (with or without native tools)
3. If tool call → execute it → return tool result
4. If text answer → return it directly
No inner loop — recovery is PlannerObserver's responsibility.
Args:
llm: The language model to use for execution.
tools: Structured tools available to the executor.
agent: The agent instance (for role/goal/verbose/config).
original_tools: Original BaseTool instances (needed for native tool schema).
tools_handler: Optional tools handler for caching and delegation tracking.
task: Optional task context.
crew: Optional crew context.
function_calling_llm: Optional separate LLM for function calling.
request_within_rpm_limit: Optional RPM limit function.
callbacks: Optional list of callbacks.
i18n: Optional i18n instance.
"""
def __init__(
self,
llm: BaseLLM,
tools: list[CrewStructuredTool],
agent: Agent,
original_tools: list[BaseTool] | None = None,
tools_handler: ToolsHandler | None = None,
task: Task | None = None,
crew: Crew | None = None,
function_calling_llm: BaseLLM | None = None,
request_within_rpm_limit: Callable[[], bool] | None = None,
callbacks: list[Any] | None = None,
i18n: I18N | None = None,
) -> None:
self.llm = llm
self.tools = tools
self.agent = agent
self.original_tools = original_tools or []
self.tools_handler = tools_handler
self.task = task
self.crew = crew
self.function_calling_llm = function_calling_llm
self.request_within_rpm_limit = request_within_rpm_limit
self.callbacks = callbacks or []
self._i18n: I18N = i18n or get_i18n()
self._printer: Printer = Printer()
# Native tool support — set up once
self._use_native_tools = check_native_tool_support(
self.llm, self.original_tools
)
self._openai_tools: list[dict[str, Any]] = []
self._available_functions: dict[str, Callable[..., Any]] = {}
if self._use_native_tools and self.original_tools:
(
self._openai_tools,
self._available_functions,
_,
) = setup_native_tools(self.original_tools)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def execute(
self,
todo: TodoItem,
context: StepExecutionContext,
max_step_iterations: int = 15,
step_timeout: int | None = None,
) -> StepResult:
"""Execute a single todo item using a multi-turn action loop.
Enforces the RPM limit, builds a fresh message list, then iterates
LLM call → tool execution → observation until the LLM signals it is
done (text answer) or max_step_iterations is reached. Never touches
external AgentExecutor state.
Args:
todo: The todo item to execute.
context: Immutable context with task info and dependency results.
max_step_iterations: Maximum LLM iterations in the multi-turn loop.
step_timeout: Maximum wall-clock seconds for this step. None = no limit.
Returns:
StepResult with the outcome.
"""
start_time = time.monotonic()
tool_calls_made: list[str] = []
try:
enforce_rpm_limit(self.request_within_rpm_limit)
messages = self._build_isolated_messages(todo, context)
if self._use_native_tools:
result_text = self._execute_native(
messages,
tool_calls_made,
max_step_iterations=max_step_iterations,
step_timeout=step_timeout,
start_time=start_time,
)
else:
result_text = self._execute_text_parsed(
messages,
tool_calls_made,
max_step_iterations=max_step_iterations,
step_timeout=step_timeout,
start_time=start_time,
)
self._validate_expected_tool_usage(todo, tool_calls_made)
elapsed = time.monotonic() - start_time
return StepResult(
success=True,
result=result_text,
tool_calls_made=tool_calls_made,
execution_time=elapsed,
)
except Exception as e:
elapsed = time.monotonic() - start_time
return StepResult(
success=False,
result="",
error=str(e),
tool_calls_made=tool_calls_made,
execution_time=elapsed,
)
# ------------------------------------------------------------------
# Internal: Message building
# ------------------------------------------------------------------
def _build_isolated_messages(
self, todo: TodoItem, context: StepExecutionContext
) -> list[LLMMessage]:
"""Build a fresh message list for this step's execution.
System prompt tells the LLM it is an Executor focused on one step.
User prompt provides the step description, dependencies, and tools.
"""
system_prompt = self._build_system_prompt()
user_prompt = self._build_user_prompt(todo, context)
return [
format_message_for_llm(system_prompt, role="system"),
format_message_for_llm(user_prompt, role="user"),
]
def _build_system_prompt(self) -> str:
"""Build the Executor's system prompt."""
role = self.agent.role if self.agent else "Assistant"
goal = self.agent.goal if self.agent else "Complete tasks efficiently"
backstory = getattr(self.agent, "backstory", "") or ""
tools_section = ""
if self.tools and not self._use_native_tools:
tool_names = ", ".join(sanitize_tool_name(t.name) for t in self.tools)
tools_section = self._i18n.retrieve(
"planning", "step_executor_tools_section"
).format(tool_names=tool_names)
elif self.tools:
tool_names = ", ".join(sanitize_tool_name(t.name) for t in self.tools)
tools_section = f"\n\nAvailable tools: {tool_names}"
return self._i18n.retrieve("planning", "step_executor_system_prompt").format(
role=role,
backstory=backstory,
goal=goal,
tools_section=tools_section,
)
def _build_user_prompt(self, todo: TodoItem, context: StepExecutionContext) -> str:
"""Build the user prompt for this specific step."""
parts: list[str] = []
# Include overall task context so the executor knows the full goal and
# required output format/location — critical for knowing WHAT to produce.
# We extract only the task body (not tool instructions or verification
# sections) to avoid duplicating directives already in the system prompt.
if context.task_description:
task_section = extract_task_section(context.task_description)
if task_section:
parts.append(
self._i18n.retrieve(
"planning", "step_executor_task_context"
).format(
task_context=task_section,
)
)
parts.append(
self._i18n.retrieve("planning", "step_executor_user_prompt").format(
step_description=todo.description,
)
)
if todo.tool_to_use:
parts.append(
self._i18n.retrieve("planning", "step_executor_suggested_tool").format(
tool_to_use=todo.tool_to_use,
)
)
# Include dependency results (final results only, no traces)
if context.dependency_results:
parts.append(
self._i18n.retrieve("planning", "step_executor_context_header")
)
for step_num, result in sorted(context.dependency_results.items()):
parts.append(
self._i18n.retrieve(
"planning", "step_executor_context_entry"
).format(step_number=step_num, result=result)
)
parts.append(self._i18n.retrieve("planning", "step_executor_complete_step"))
return "\n".join(parts)
# ------------------------------------------------------------------
# Internal: Multi-turn execution loop
# ------------------------------------------------------------------
def _execute_text_parsed(
self,
messages: list[LLMMessage],
tool_calls_made: list[str],
max_step_iterations: int = 15,
step_timeout: int | None = None,
start_time: float | None = None,
) -> str:
"""Execute step using text-parsed tool calling with a multi-turn loop.
Iterates LLM call → tool execution → observation until the LLM
produces a Final Answer or max_step_iterations is reached.
This allows the agent to: run a command, see the output, adjust its
approach, and run another command — all within a single plan step.
"""
use_stop_words = self.llm.supports_stop_words() if self.llm else False
last_tool_result = ""
for _ in range(max_step_iterations):
# Check step timeout
if step_timeout and start_time:
elapsed = time.monotonic() - start_time
if elapsed >= step_timeout:
return last_tool_result or f"Step timed out after {elapsed:.0f}s"
answer = self.llm.call(
messages,
callbacks=self.callbacks,
from_task=self.task,
from_agent=self.agent,
)
if not answer:
raise ValueError("Empty response from LLM")
answer_str = str(answer)
formatted = process_llm_response(answer_str, use_stop_words)
if isinstance(formatted, AgentFinish):
return str(formatted.output)
if isinstance(formatted, AgentAction):
tool_calls_made.append(formatted.tool)
tool_result = self._execute_text_tool_with_events(formatted)
last_tool_result = tool_result
# Append the assistant's reasoning + action, then the observation.
# _build_observation_message handles vision sentinels so the LLM
# receives an image content block instead of raw base64 text.
messages.append({"role": "assistant", "content": answer_str})
messages.append(self._build_observation_message(tool_result))
continue
# Raw text response with no Final Answer marker — treat as done
return answer_str
# Max iterations reached — return the last tool result we accumulated
return last_tool_result
def _execute_text_tool_with_events(self, formatted: AgentAction) -> str:
"""Execute text-parsed tool calls with tool usage events."""
args_dict = self._parse_tool_args(formatted.tool_input)
agent_key = getattr(self.agent, "key", "unknown") if self.agent else "unknown"
started_at = datetime.now()
crewai_event_bus.emit(
self,
event=ToolUsageStartedEvent(
tool_name=formatted.tool,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
),
)
try:
fingerprint_context = {}
if (
self.agent
and hasattr(self.agent, "security_config")
and hasattr(self.agent.security_config, "fingerprint")
):
fingerprint_context = {
"agent_fingerprint": str(self.agent.security_config.fingerprint)
}
tool_result = execute_tool_and_check_finality(
agent_action=formatted,
fingerprint_context=fingerprint_context,
tools=self.tools,
i18n=self._i18n,
agent_key=self.agent.key if self.agent else None,
agent_role=self.agent.role if self.agent else None,
tools_handler=self.tools_handler,
task=self.task,
agent=self.agent,
function_calling_llm=self.function_calling_llm,
crew=self.crew,
)
except Exception as e:
crewai_event_bus.emit(
self,
event=ToolUsageErrorEvent(
tool_name=formatted.tool,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
error=e,
),
)
raise
crewai_event_bus.emit(
self,
event=ToolUsageFinishedEvent(
output=str(tool_result.result),
tool_name=formatted.tool,
tool_args=args_dict,
from_agent=self.agent,
from_task=self.task,
agent_key=agent_key,
started_at=started_at,
finished_at=datetime.now(),
),
)
return str(tool_result.result)
def _parse_tool_args(self, tool_input: Any) -> dict[str, Any]:
"""Parse tool args from the parser output into a dict payload for events."""
if isinstance(tool_input, dict):
return tool_input
if isinstance(tool_input, str):
stripped_input = tool_input.strip()
if not stripped_input:
return {}
try:
parsed = json.loads(stripped_input)
if isinstance(parsed, dict):
return parsed
return {"input": parsed}
except json.JSONDecodeError:
return {"input": stripped_input}
return {"input": str(tool_input)}
# ------------------------------------------------------------------
# Internal: Vision support
# ------------------------------------------------------------------
@staticmethod
def _parse_vision_sentinel(raw: str) -> tuple[str, str] | None:
"""Parse a VISION_IMAGE sentinel into (media_type, base64_data), or None."""
prefix = "VISION_IMAGE:"
if not raw.startswith(prefix):
return None
rest = raw[len(prefix) :]
sep = rest.find(":")
if sep <= 0:
return None
return rest[:sep], rest[sep + 1 :]
@staticmethod
def _build_observation_message(tool_result: str) -> LLMMessage:
"""Build an observation message, converting vision sentinels to image blocks.
When a tool returns a VISION_IMAGE sentinel (e.g. from read_image),
we build a multimodal content block so the LLM can actually *see*
the image rather than receiving a wall of base64 text.
Uses the standard image_url / data-URI format so each LLM provider's
SDK (OpenAI, LiteLLM, etc.) handles the provider-specific conversion.
Format: ``VISION_IMAGE:<media_type>:<base64_data>``
"""
parsed = StepExecutor._parse_vision_sentinel(tool_result)
if parsed:
media_type, b64_data = parsed
return {
"role": "user",
"content": [
{"type": "text", "text": "Observation: Here is the image:"},
{
"type": "image_url",
"image_url": {
"url": f"data:{media_type};base64,{b64_data}",
},
},
],
}
return {"role": "user", "content": f"Observation: {tool_result}"}
def _validate_expected_tool_usage(
self,
todo: TodoItem,
tool_calls_made: list[str],
) -> None:
"""Fail step execution when a required tool is configured but not called."""
expected_tool = getattr(todo, "tool_to_use", None)
if not expected_tool:
return
expected_tool_name = sanitize_tool_name(expected_tool)
available_tool_names = {
sanitize_tool_name(tool.name)
for tool in self.tools
if getattr(tool, "name", "")
} | set(self._available_functions.keys())
if expected_tool_name not in available_tool_names:
return
called_names = {sanitize_tool_name(name) for name in tool_calls_made}
if expected_tool_name not in called_names:
raise ValueError(
f"Expected tool '{expected_tool_name}' was not called "
f"for step {todo.step_number}."
)
def _execute_native(
self,
messages: list[LLMMessage],
tool_calls_made: list[str],
max_step_iterations: int = 15,
step_timeout: int | None = None,
start_time: float | None = None,
) -> str:
"""Execute step using native function calling with a multi-turn loop.
Iterates LLM call → tool execution → appended results until the LLM
returns a text answer (no more tool calls) or max_step_iterations is
reached. This lets the agent run a shell command, observe the output,
correct mistakes, and issue follow-up commands — all within one step.
"""
accumulated_results: list[str] = []
for _ in range(max_step_iterations):
# Check step timeout
if step_timeout and start_time:
elapsed = time.monotonic() - start_time
if elapsed >= step_timeout:
return (
"\n\n".join(accumulated_results)
if accumulated_results
else f"Step timed out after {elapsed:.0f}s"
)
answer = self.llm.call(
messages,
tools=self._openai_tools,
callbacks=self.callbacks,
from_task=self.task,
from_agent=self.agent,
)
if not answer:
raise ValueError("Empty response from LLM")
if isinstance(answer, BaseModel):
return answer.model_dump_json()
if isinstance(answer, list) and answer and is_tool_call_list(answer):
# _execute_native_tool_calls appends assistant + tool messages
# to `messages` as a side-effect, so the next LLM call will
# see the full conversation history including tool outputs.
result = self._execute_native_tool_calls(
answer, messages, tool_calls_made
)
accumulated_results.append(result)
continue
# Text answer → LLM decided the step is done
return str(answer)
# Max iterations reached — return everything we accumulated
return "\n".join(filter(None, accumulated_results))
def _execute_native_tool_calls(
self,
tool_calls: list[Any],
messages: list[LLMMessage],
tool_calls_made: list[str],
) -> str:
"""Execute a batch of native tool calls and return their results.
Returns the result of the first tool marked result_as_answer if any,
otherwise returns all tool results concatenated.
"""
assistant_message, _reports = build_tool_calls_assistant_message(tool_calls)
if assistant_message:
messages.append(assistant_message)
tool_results: list[str] = []
for tool_call in tool_calls:
call_result = execute_single_native_tool_call(
tool_call,
available_functions=self._available_functions,
original_tools=self.original_tools,
structured_tools=self.tools,
tools_handler=self.tools_handler,
agent=self.agent,
task=self.task,
crew=self.crew,
event_source=self,
printer=self._printer,
verbose=bool(self.agent and self.agent.verbose),
)
if call_result.func_name:
tool_calls_made.append(call_result.func_name)
if call_result.result_as_answer:
return str(call_result.result)
if call_result.tool_message:
raw_content = call_result.tool_message.get("content", "")
if isinstance(raw_content, str):
parsed = self._parse_vision_sentinel(raw_content)
if parsed:
media_type, b64_data = parsed
# Replace the sentinel with a standard image_url content block.
# Each provider's _format_messages handles conversion to
# its native format (e.g. Anthropic image blocks).
modified: LLMMessage = cast(
LLMMessage, dict(call_result.tool_message)
)
modified["content"] = [
{
"type": "image_url",
"image_url": {
"url": f"data:{media_type};base64,{b64_data}",
},
}
]
messages.append(modified)
tool_results.append("[image]")
else:
messages.append(call_result.tool_message)
if raw_content:
tool_results.append(raw_content)
else:
messages.append(call_result.tool_message)
if raw_content:
tool_results.append(str(raw_content))
return "\n".join(tool_results) if tool_results else ""

View File

@@ -1,7 +0,0 @@
"""Authentication utilities for the CrewAI platform."""
from crewai.auth.oauth2 import AuthenticationCommand
from crewai.auth.token import AuthError, get_auth_token
__all__ = ["AuthError", "AuthenticationCommand", "get_auth_token"]

View File

@@ -1,3 +0,0 @@
"""Authentication constants."""
ALGORITHMS = ["RS256"]

View File

@@ -1,184 +0,0 @@
"""OAuth2 authentication for the CrewAI platform."""
import time
from typing import TYPE_CHECKING, Any, TypeVar, cast
import webbrowser
import httpx
from pydantic import BaseModel, Field
from rich.console import Console
from crewai.auth.token_manager import TokenManager
from crewai.auth.utils import validate_jwt_token
from crewai.settings import Settings
console = Console()
TOauth2Settings = TypeVar("TOauth2Settings", bound="Oauth2Settings")
class Oauth2Settings(BaseModel):
"""OAuth2 provider configuration."""
provider: str = Field(
description="OAuth2 provider used for authentication (e.g., workos, okta, auth0)."
)
client_id: str = Field(
description="OAuth2 client ID issued by the provider, used during authentication requests."
)
domain: str = Field(
description="OAuth2 provider's domain (e.g., your-org.auth0.com) used for issuing tokens."
)
audience: str | None = Field(
description="OAuth2 audience value, typically used to identify the target API or resource.",
default=None,
)
extra: dict[str, Any] = Field(
description="Extra configuration for the OAuth2 provider.",
default={},
)
@classmethod
def from_settings(cls: type[TOauth2Settings]) -> TOauth2Settings:
"""Create an Oauth2Settings instance from the CLI settings."""
settings = Settings()
return cls(
provider=settings.oauth2_provider,
domain=settings.oauth2_domain,
client_id=settings.oauth2_client_id,
audience=settings.oauth2_audience,
extra=settings.oauth2_extra,
)
if TYPE_CHECKING:
from crewai.auth.providers.base_provider import BaseProvider
class ProviderFactory:
"""Factory for creating OAuth2 providers from settings."""
@classmethod
def from_settings(
cls: type["ProviderFactory"], # noqa: UP037
settings: Oauth2Settings | None = None,
) -> "BaseProvider": # noqa: UP037
"""Create a provider instance from settings."""
settings = settings or Oauth2Settings.from_settings()
import importlib
module = importlib.import_module(
f"crewai.auth.providers.{settings.provider.lower()}"
)
provider = getattr(
module,
f"{''.join(word.capitalize() for word in settings.provider.split('_'))}Provider",
)
return cast("BaseProvider", provider(settings))
class AuthenticationCommand:
"""Handles authentication with the CrewAI platform."""
def __init__(self) -> None:
self.token_manager = TokenManager()
self.oauth2_provider = ProviderFactory.from_settings()
def login(self) -> None:
"""Sign up to CrewAI+"""
console.print("Signing in to CrewAI AMP...\n", style="bold blue")
device_code_data = self._get_device_code()
self._display_auth_instructions(device_code_data)
return self._poll_for_token(device_code_data)
def _get_device_code(self) -> dict[str, Any]:
"""Get the device code to authenticate the user."""
device_code_payload = {
"client_id": self.oauth2_provider.get_client_id(),
"scope": " ".join(self.oauth2_provider.get_oauth_scopes()),
"audience": self.oauth2_provider.get_audience(),
}
response = httpx.post(
url=self.oauth2_provider.get_authorize_url(),
data=device_code_payload,
timeout=20,
)
response.raise_for_status()
return cast(dict[str, Any], response.json())
def _display_auth_instructions(self, device_code_data: dict[str, str]) -> None:
"""Display the authentication instructions to the user."""
verification_uri = device_code_data.get(
"verification_uri_complete", device_code_data.get("verification_uri", "")
)
console.print("1. Navigate to: ", verification_uri)
console.print("2. Enter the following code: ", device_code_data["user_code"])
webbrowser.open(verification_uri)
def _poll_for_token(self, device_code_data: dict[str, Any]) -> None:
"""Polls the server for the token until it is received, or max attempts are reached."""
token_payload = {
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device_code_data["device_code"],
"client_id": self.oauth2_provider.get_client_id(),
}
console.print("\nWaiting for authentication... ", style="bold blue", end="")
attempts = 0
while True and attempts < 10:
response = httpx.post(
self.oauth2_provider.get_token_url(), data=token_payload, timeout=30
)
token_data = response.json()
if response.status_code == 200:
self._validate_and_save_token(token_data)
console.print(
"Success!",
style="bold green",
)
self._post_login()
console.print("\n[bold green]Welcome to CrewAI AMP![/bold green]\n")
return
if token_data["error"] not in ("authorization_pending", "slow_down"):
raise httpx.HTTPError(
token_data.get("error_description") or token_data.get("error")
)
time.sleep(device_code_data["interval"])
attempts += 1
console.print(
"Timeout: Failed to get the token. Please try again.", style="bold red"
)
def _validate_and_save_token(self, token_data: dict[str, Any]) -> None:
"""Validates the JWT token and saves the token to the token manager."""
jwt_token = token_data["access_token"]
issuer = self.oauth2_provider.get_issuer()
jwt_token_data = {
"jwt_token": jwt_token,
"jwks_url": self.oauth2_provider.get_jwks_url(),
"issuer": issuer,
"audience": self.oauth2_provider.get_audience(),
}
decoded_token = validate_jwt_token(**jwt_token_data)
expires_at = decoded_token.get("exp", 0)
self.token_manager.save_tokens(jwt_token, expires_at)
def _post_login(self) -> None:
"""Hook called after successful login. Override in subclasses for additional behavior."""

View File

@@ -1 +0,0 @@
"""OAuth2 authentication providers."""

View File

@@ -1,38 +0,0 @@
"""Auth0 OAuth2 provider."""
from crewai.auth.providers.base_provider import BaseProvider
class Auth0Provider(BaseProvider):
"""Auth0 OAuth2 provider implementation."""
def get_authorize_url(self) -> str:
return f"https://{self._get_domain()}/oauth/device/code"
def get_token_url(self) -> str:
return f"https://{self._get_domain()}/oauth/token"
def get_jwks_url(self) -> str:
return f"https://{self._get_domain()}/.well-known/jwks.json"
def get_issuer(self) -> str:
return f"https://{self._get_domain()}/"
def get_audience(self) -> str:
if self.settings.audience is None:
raise ValueError(
"Audience is required. Please set it in the configuration."
)
return self.settings.audience
def get_client_id(self) -> str:
if self.settings.client_id is None:
raise ValueError(
"Client ID is required. Please set it in the configuration."
)
return self.settings.client_id
def _get_domain(self) -> str:
if self.settings.domain is None:
raise ValueError("Domain is required. Please set it in the configuration.")
return self.settings.domain

View File

@@ -1,38 +0,0 @@
"""Base OAuth2 provider interface."""
from abc import ABC, abstractmethod
from crewai.auth.oauth2 import Oauth2Settings
class BaseProvider(ABC):
"""Abstract base class for OAuth2 providers."""
def __init__(self, settings: Oauth2Settings):
self.settings = settings
@abstractmethod
def get_authorize_url(self) -> str: ...
@abstractmethod
def get_token_url(self) -> str: ...
@abstractmethod
def get_jwks_url(self) -> str: ...
@abstractmethod
def get_issuer(self) -> str: ...
@abstractmethod
def get_audience(self) -> str: ...
@abstractmethod
def get_client_id(self) -> str: ...
def get_required_fields(self) -> list[str]:
"""Returns which provider-specific fields inside the "extra" dict will be required."""
return []
def get_oauth_scopes(self) -> list[str]:
"""Returns the OAuth scopes to request."""
return ["openid", "profile", "email"]

View File

@@ -1,47 +0,0 @@
"""Entra ID (Azure AD) OAuth2 provider."""
from typing import cast
from crewai.auth.providers.base_provider import BaseProvider
class EntraIdProvider(BaseProvider):
"""Entra ID (Azure AD) OAuth2 provider implementation."""
def get_authorize_url(self) -> str:
return f"{self._base_url()}/oauth2/v2.0/devicecode"
def get_token_url(self) -> str:
return f"{self._base_url()}/oauth2/v2.0/token"
def get_jwks_url(self) -> str:
return f"{self._base_url()}/discovery/v2.0/keys"
def get_issuer(self) -> str:
return f"{self._base_url()}/v2.0"
def get_audience(self) -> str:
if self.settings.audience is None:
raise ValueError(
"Audience is required. Please set it in the configuration."
)
return self.settings.audience
def get_client_id(self) -> str:
if self.settings.client_id is None:
raise ValueError(
"Client ID is required. Please set it in the configuration."
)
return self.settings.client_id
def get_oauth_scopes(self) -> list[str]:
return [
*super().get_oauth_scopes(),
*cast(str, self.settings.extra.get("scope", "")).split(),
]
def get_required_fields(self) -> list[str]:
return ["scope"]
def _base_url(self) -> str:
return f"https://login.microsoftonline.com/{self.settings.domain}"

View File

@@ -1,36 +0,0 @@
"""Keycloak OAuth2 provider."""
from crewai.auth.providers.base_provider import BaseProvider
class KeycloakProvider(BaseProvider):
"""Keycloak OAuth2 provider implementation."""
def get_authorize_url(self) -> str:
return f"{self._oauth2_base_url()}/realms/{self.settings.extra.get('realm')}/protocol/openid-connect/auth/device"
def get_token_url(self) -> str:
return f"{self._oauth2_base_url()}/realms/{self.settings.extra.get('realm')}/protocol/openid-connect/token"
def get_jwks_url(self) -> str:
return f"{self._oauth2_base_url()}/realms/{self.settings.extra.get('realm')}/protocol/openid-connect/certs"
def get_issuer(self) -> str:
return f"{self._oauth2_base_url()}/realms/{self.settings.extra.get('realm')}"
def get_audience(self) -> str:
return self.settings.audience or "no-audience-provided"
def get_client_id(self) -> str:
if self.settings.client_id is None:
raise ValueError(
"Client ID is required. Please set it in the configuration."
)
return self.settings.client_id
def get_required_fields(self) -> list[str]:
return ["realm"]
def _oauth2_base_url(self) -> str:
domain = self.settings.domain.removeprefix("https://").removeprefix("http://")
return f"https://{domain}"

View File

@@ -1,46 +0,0 @@
"""Okta OAuth2 provider."""
from crewai.auth.providers.base_provider import BaseProvider
class OktaProvider(BaseProvider):
"""Okta OAuth2 provider implementation."""
def get_authorize_url(self) -> str:
return f"{self._oauth2_base_url()}/v1/device/authorize"
def get_token_url(self) -> str:
return f"{self._oauth2_base_url()}/v1/token"
def get_jwks_url(self) -> str:
return f"{self._oauth2_base_url()}/v1/keys"
def get_issuer(self) -> str:
return self._oauth2_base_url().removesuffix("/oauth2")
def get_audience(self) -> str:
if self.settings.audience is None:
raise ValueError(
"Audience is required. Please set it in the configuration."
)
return self.settings.audience
def get_client_id(self) -> str:
if self.settings.client_id is None:
raise ValueError(
"Client ID is required. Please set it in the configuration."
)
return self.settings.client_id
def get_required_fields(self) -> list[str]:
return ["authorization_server_name", "using_org_auth_server"]
def _oauth2_base_url(self) -> str:
using_org_auth_server = self.settings.extra.get("using_org_auth_server", False)
if using_org_auth_server:
base_url = f"https://{self.settings.domain}/oauth2"
else:
base_url = f"https://{self.settings.domain}/oauth2/{self.settings.extra.get('authorization_server_name', 'default')}"
return f"{base_url}"

View File

@@ -1,34 +0,0 @@
"""WorkOS OAuth2 provider."""
from crewai.auth.providers.base_provider import BaseProvider
class WorkosProvider(BaseProvider):
"""WorkOS OAuth2 provider implementation."""
def get_authorize_url(self) -> str:
return f"https://{self._get_domain()}/oauth2/device_authorization"
def get_token_url(self) -> str:
return f"https://{self._get_domain()}/oauth2/token"
def get_jwks_url(self) -> str:
return f"https://{self._get_domain()}/oauth2/jwks"
def get_issuer(self) -> str:
return f"https://{self._get_domain()}"
def get_audience(self) -> str:
return self.settings.audience or ""
def get_client_id(self) -> str:
if self.settings.client_id is None:
raise ValueError(
"Client ID is required. Please set it in the configuration."
)
return self.settings.client_id
def _get_domain(self) -> str:
if self.settings.domain is None:
raise ValueError("Domain is required. Please set it in the configuration.")
return self.settings.domain

View File

@@ -1,15 +0,0 @@
"""Authentication token retrieval."""
from crewai.auth.token_manager import TokenManager
class AuthError(Exception):
"""Raised when authentication fails."""
def get_auth_token() -> str:
"""Get the authentication token."""
access_token = TokenManager().get_token()
if not access_token:
raise AuthError("No token found, make sure you are logged in")
return access_token

View File

@@ -1,188 +0,0 @@
"""Manages encrypted token storage."""
from datetime import datetime
import json
import os
from pathlib import Path
import sys
import tempfile
from typing import Final, Literal, cast
from cryptography.fernet import Fernet
_FERNET_KEY_LENGTH: Final[Literal[44]] = 44
class TokenManager:
"""Manages encrypted token storage."""
def __init__(self, file_path: str = "tokens.enc") -> None:
"""Initialize the TokenManager.
Args:
file_path: The file path to store encrypted tokens.
"""
self.file_path = file_path
self.key = self._get_or_create_key()
self.fernet = Fernet(self.key)
def _get_or_create_key(self) -> bytes:
"""Get or create the encryption key.
Returns:
The encryption key as bytes.
"""
key_filename: str = "secret.key"
key = self._read_secure_file(key_filename)
if key is not None and len(key) == _FERNET_KEY_LENGTH:
return key
new_key = Fernet.generate_key()
if self._atomic_create_secure_file(key_filename, new_key):
return new_key
key = self._read_secure_file(key_filename)
if key is not None and len(key) == _FERNET_KEY_LENGTH:
return key
raise RuntimeError("Failed to create or read encryption key")
def save_tokens(self, access_token: str, expires_at: int) -> None:
"""Save the access token and its expiration time.
Args:
access_token: The access token to save.
expires_at: The UNIX timestamp of the expiration time.
"""
expiration_time = datetime.fromtimestamp(expires_at)
data = {
"access_token": access_token,
"expiration": expiration_time.isoformat(),
}
encrypted_data = self.fernet.encrypt(json.dumps(data).encode())
self._atomic_write_secure_file(self.file_path, encrypted_data)
def get_token(self) -> str | None:
"""Get the access token if it is valid and not expired.
Returns:
The access token if valid and not expired, otherwise None.
"""
encrypted_data = self._read_secure_file(self.file_path)
if encrypted_data is None:
return None
decrypted_data = self.fernet.decrypt(encrypted_data)
data = json.loads(decrypted_data)
expiration = datetime.fromisoformat(data["expiration"])
if expiration <= datetime.now():
return None
return cast(str | None, data.get("access_token"))
def clear_tokens(self) -> None:
"""Clear the stored tokens."""
self._delete_secure_file(self.file_path)
@staticmethod
def _get_secure_storage_path() -> Path:
"""Get the secure storage path based on the operating system.
Returns:
The secure storage path.
"""
if sys.platform == "win32":
base_path = os.environ.get("LOCALAPPDATA")
elif sys.platform == "darwin":
base_path = os.path.expanduser("~/Library/Application Support")
else:
base_path = os.path.expanduser("~/.local/share")
app_name = "crewai/credentials"
storage_path = Path(base_path) / app_name
storage_path.mkdir(parents=True, exist_ok=True)
return storage_path
def _atomic_create_secure_file(self, filename: str, content: bytes) -> bool:
"""Create a file only if it doesn't exist.
Args:
filename: The name of the file.
content: The content to write.
Returns:
True if file was created, False if it already exists.
"""
storage_path = self._get_secure_storage_path()
file_path = storage_path / filename
try:
fd = os.open(file_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
try:
os.write(fd, content)
finally:
os.close(fd)
return True
except FileExistsError:
return False
def _atomic_write_secure_file(self, filename: str, content: bytes) -> None:
"""Write content to a secure file.
Args:
filename: The name of the file.
content: The content to write.
"""
storage_path = self._get_secure_storage_path()
file_path = storage_path / filename
fd, temp_path = tempfile.mkstemp(dir=storage_path, prefix=f".{filename}.")
fd_closed = False
try:
os.write(fd, content)
os.close(fd)
fd_closed = True
os.chmod(temp_path, 0o600)
os.replace(temp_path, file_path)
except Exception:
if not fd_closed:
os.close(fd)
if os.path.exists(temp_path):
os.unlink(temp_path)
raise
def _read_secure_file(self, filename: str) -> bytes | None:
"""Read the content of a secure file.
Args:
filename: The name of the file.
Returns:
The content of the file if it exists, otherwise None.
"""
storage_path = self._get_secure_storage_path()
file_path = storage_path / filename
try:
with open(file_path, "rb") as f:
return f.read()
except FileNotFoundError:
return None
def _delete_secure_file(self, filename: str) -> None:
"""Delete a secure file.
Args:
filename: The name of the file.
"""
storage_path = self._get_secure_storage_path()
file_path = storage_path / filename
try:
file_path.unlink()
except FileNotFoundError:
pass

View File

@@ -1,67 +0,0 @@
"""JWT token validation utilities."""
from typing import Any
import jwt
from jwt import PyJWKClient
def validate_jwt_token(
jwt_token: str, jwks_url: str, issuer: str, audience: str
) -> Any:
"""Verify the token's signature and claims using PyJWT.
Args:
jwt_token: The JWT (JWS) string to validate.
jwks_url: The URL of the JWKS endpoint.
issuer: The expected issuer of the token.
audience: The expected audience of the token.
Returns:
The decoded token.
Raises:
Exception: If the token is invalid for any reason.
"""
try:
jwk_client = PyJWKClient(jwks_url)
signing_key = jwk_client.get_signing_key_from_jwt(jwt_token)
_unverified_decoded_token = jwt.decode(
jwt_token, options={"verify_signature": False}
)
return jwt.decode(
jwt_token,
signing_key.key,
algorithms=["RS256"],
audience=audience,
issuer=issuer,
leeway=10.0,
options={
"verify_signature": True,
"verify_exp": True,
"verify_nbf": True,
"verify_iat": True,
"require": ["exp", "iat", "iss", "aud", "sub"],
},
)
except jwt.ExpiredSignatureError as e:
raise Exception("Token has expired.") from e
except jwt.InvalidAudienceError as e:
actual_audience = _unverified_decoded_token.get("aud", "[no audience found]")
raise Exception(
f"Invalid token audience. Got: '{actual_audience}'. Expected: '{audience}'"
) from e
except jwt.InvalidIssuerError as e:
actual_issuer = _unverified_decoded_token.get("iss", "[no issuer found]")
raise Exception(
f"Invalid token issuer. Got: '{actual_issuer}'. Expected: '{issuer}'"
) from e
except jwt.MissingRequiredClaimError as e:
raise Exception(f"Token is missing required claims: {e!s}") from e
except jwt.exceptions.PyJWKClientError as e:
raise Exception(f"JWKS or key processing error: {e!s}") from e
except jwt.InvalidTokenError as e:
raise Exception(f"Invalid token: {e!s}") from e

View File

@@ -2,15 +2,19 @@ from pathlib import Path
import click
from crewai_cli.utils import copy_template
from crewai.cli.utils import copy_template
from crewai.utilities.printer import Printer
_printer = Printer()
def add_crew_to_flow(crew_name: str) -> None:
"""Add a new crew to the current flow."""
# Check if pyproject.toml exists in the current directory
if not Path("pyproject.toml").exists():
click.secho(
"This command must be run from the root of a flow project.", fg="red"
_printer.print(
"This command must be run from the root of a flow project.", color="red"
)
raise click.ClickException(
"This command must be run from the root of a flow project."
@@ -21,7 +25,7 @@ def add_crew_to_flow(crew_name: str) -> None:
crews_folder = flow_folder / "src" / flow_folder.name / "crews"
if not crews_folder.exists():
click.secho("Crews folder does not exist in the current flow.", fg="red")
_printer.print("Crews folder does not exist in the current flow.", color="red")
raise click.ClickException("Crews folder does not exist in the current flow.")
# Create the crew within the flow's crews directory

View File

@@ -0,0 +1,4 @@
from crewai.cli.authentication.main import AuthenticationCommand
__all__ = ["AuthenticationCommand"]

View File

@@ -6,9 +6,9 @@ import httpx
from pydantic import BaseModel, Field
from rich.console import Console
from crewai_cli.authentication.utils import validate_jwt_token
from crewai_cli.config import Settings
from crewai_cli.shared.token_manager import TokenManager
from crewai.cli.authentication.utils import validate_jwt_token
from crewai.cli.config import Settings
from crewai.cli.shared.token_manager import TokenManager
console = Console()
@@ -51,7 +51,7 @@ class Oauth2Settings(BaseModel):
if TYPE_CHECKING:
from crewai_cli.authentication.providers.base_provider import BaseProvider
from crewai.cli.authentication.providers.base_provider import BaseProvider
class ProviderFactory:
@@ -65,7 +65,7 @@ class ProviderFactory:
import importlib
module = importlib.import_module(
f"crewai_cli.authentication.providers.{settings.provider.lower()}"
f"crewai.cli.authentication.providers.{settings.provider.lower()}"
)
# Converts from snake_case to CamelCase to obtain the provider class name.
provider = getattr(
@@ -180,7 +180,7 @@ class AuthenticationCommand:
def _login_to_tool_repository(self) -> None:
"""Login to the tool repository."""
from crewai_cli.tools.main import ToolCommand
from crewai.cli.tools.main import ToolCommand
try:
console.print(

View File

@@ -1,4 +1,4 @@
from crewai_cli.authentication.providers.base_provider import BaseProvider
from crewai.cli.authentication.providers.base_provider import BaseProvider
class Auth0Provider(BaseProvider):

View File

@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from crewai_cli.authentication.main import Oauth2Settings
from crewai.cli.authentication.main import Oauth2Settings
class BaseProvider(ABC):

View File

@@ -1,6 +1,6 @@
from typing import cast
from crewai_cli.authentication.providers.base_provider import BaseProvider
from crewai.cli.authentication.providers.base_provider import BaseProvider
class EntraIdProvider(BaseProvider):

View File

@@ -1,4 +1,4 @@
from crewai_cli.authentication.providers.base_provider import BaseProvider
from crewai.cli.authentication.providers.base_provider import BaseProvider
class KeycloakProvider(BaseProvider):

View File

@@ -1,4 +1,4 @@
from crewai_cli.authentication.providers.base_provider import BaseProvider
from crewai.cli.authentication.providers.base_provider import BaseProvider
class OktaProvider(BaseProvider):

View File

@@ -1,4 +1,4 @@
from crewai_cli.authentication.providers.base_provider import BaseProvider
from crewai.cli.authentication.providers.base_provider import BaseProvider
class WorkosProvider(BaseProvider):

View File

@@ -1,4 +1,4 @@
from crewai_cli.shared.token_manager import TokenManager
from crewai.cli.shared.token_manager import TokenManager
class AuthError(Exception):

View File

@@ -1,5 +1,3 @@
from __future__ import annotations
from importlib.metadata import version as get_version
import os
import subprocess
@@ -7,58 +5,44 @@ from typing import Any
import click
from crewai_cli.add_crew_to_flow import add_crew_to_flow
from crewai_cli.authentication.main import AuthenticationCommand
from crewai_cli.config import Settings
from crewai_cli.create_crew import create_crew
from crewai_cli.create_flow import create_flow
from crewai_cli.crew_chat import run_chat
from crewai_cli.deploy.main import DeployCommand
from crewai_cli.enterprise.main import EnterpriseConfigureCommand
from crewai_cli.evaluate_crew import evaluate_crew
from crewai_cli.install_crew import install_crew
from crewai_cli.kickoff_flow import kickoff_flow
from crewai_cli.organization.main import OrganizationCommand
from crewai_cli.plot_flow import plot_flow
from crewai_cli.replay_from_task import replay_task_command
from crewai_cli.reset_memories_command import reset_memories_command
from crewai_cli.run_crew import run_crew
from crewai_cli.settings.main import SettingsCommand
from crewai_cli.task_outputs import load_task_outputs
from crewai_cli.tools.main import ToolCommand
from crewai_cli.train_crew import train_crew
from crewai_cli.triggers.main import TriggersCommand
from crewai_cli.update_crew import update_crew
from crewai_cli.user_data import (
_load_user_data,
_save_user_data,
is_tracing_enabled,
from crewai.cli.add_crew_to_flow import add_crew_to_flow
from crewai.cli.authentication.main import AuthenticationCommand
from crewai.cli.config import Settings
from crewai.cli.create_crew import create_crew
from crewai.cli.create_flow import create_flow
from crewai.cli.crew_chat import run_chat
from crewai.cli.deploy.main import DeployCommand
from crewai.cli.enterprise.main import EnterpriseConfigureCommand
from crewai.cli.evaluate_crew import evaluate_crew
from crewai.cli.install_crew import install_crew
from crewai.cli.kickoff_flow import kickoff_flow
from crewai.cli.organization.main import OrganizationCommand
from crewai.cli.plot_flow import plot_flow
from crewai.cli.replay_from_task import replay_task_command
from crewai.cli.reset_memories_command import reset_memories_command
from crewai.cli.run_crew import run_crew
from crewai.cli.settings.main import SettingsCommand
from crewai.cli.tools.main import ToolCommand
from crewai.cli.train_crew import train_crew
from crewai.cli.triggers.main import TriggersCommand
from crewai.cli.update_crew import update_crew
from crewai.cli.utils import build_env_with_tool_repository_credentials, read_toml
from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
)
from crewai_cli.utils import build_env_with_tool_repository_credentials, read_toml
def _get_cli_version() -> str:
"""Return the best available version string for the CLI."""
# Prefer crewai version if installed (keeps existing UX)
try:
return get_version("crewai")
except Exception: # noqa: S110
pass
try:
return get_version("crewai-cli")
except Exception:
return "unknown"
@click.group()
@click.version_option(_get_cli_version())
@click.version_option(get_version("crewai"))
def crewai():
"""Top-level command group for crewai."""
@crewai.command(
name="uv",
context_settings={"ignore_unknown_options": True},
context_settings=dict(
ignore_unknown_options=True,
),
)
@click.argument("uv_args", nargs=-1, type=click.UNPROCESSED)
def uv(uv_args):
@@ -123,7 +107,7 @@ def version(tools):
if tools:
try:
tools_version = get_version("crewai-tools")
tools_version = get_version("crewai")
click.echo(f"crewai tools version: {tools_version}")
except Exception:
click.echo("crewai tools not installed")
@@ -158,7 +142,12 @@ def train(n_iterations: int, filename: str):
help="Replay the crew from this task ID, including all subsequent tasks.",
)
def replay(task_id: str) -> None:
"""Replay the crew execution from a specific task."""
"""
Replay the crew execution from a specific task.
Args:
task_id (str): The ID of the task to replay from.
"""
try:
click.echo(f"Replaying the crew from task {task_id}")
replay_task_command(task_id)
@@ -168,9 +157,12 @@ def replay(task_id: str) -> None:
@crewai.command()
def log_tasks_outputs() -> None:
"""Retrieve your latest crew.kickoff() task outputs."""
"""
Retrieve your latest crew.kickoff() task outputs.
"""
try:
tasks = load_task_outputs()
storage = KickoffTaskOutputsSQLiteStorage()
tasks = storage.load()
if not tasks:
click.echo(
@@ -228,8 +220,11 @@ def reset_memories(
agent_knowledge: bool,
all: bool,
) -> None:
"""Reset the crew memories (memory, knowledge, agent_knowledge, kickoff_outputs). This will delete all the data saved."""
"""
Reset the crew memories (memory, knowledge, agent_knowledge, kickoff_outputs). This will delete all the data saved.
"""
try:
# Treat legacy flags as --memory with a deprecation warning
if long or short or entities:
legacy_used = [
f
@@ -296,7 +291,7 @@ def memory(
) -> None:
"""Open the Memory TUI to browse scopes and recall memories."""
try:
from crewai_cli.memory_tui import MemoryTUI
from crewai.cli.memory_tui import MemoryTUI
except ImportError as exc:
click.echo(
"Textual is required for the memory TUI but could not be imported. "
@@ -346,10 +341,10 @@ def test(n_iterations: int, model: str):
@crewai.command(
context_settings={
"ignore_unknown_options": True,
"allow_extra_args": True,
}
context_settings=dict(
ignore_unknown_options=True,
allow_extra_args=True,
)
)
@click.pass_context
def install(context):
@@ -514,12 +509,14 @@ def triggers_run(trigger_path: str):
@crewai.command()
def chat():
"""Start a conversation with the Crew, collecting user-supplied inputs,
"""
Start a conversation with the Crew, collecting user-supplied inputs,
and using the Chat LLM to generate responses.
"""
click.secho(
"\nStarting a conversation with the Crew\nType 'exit' or Ctrl+C to quit.\n",
)
run_chat()
@@ -630,7 +627,7 @@ def env_view():
table.add_row(
"CREWAI_TRACING_ENABLED",
"[dim]Not set[/dim]",
"[dim]---[/dim]",
"[dim][/dim]",
)
# Check other related env vars
@@ -649,7 +646,7 @@ def env_view():
# Check if .env file exists
table.add_row(
".env file",
"Found" if env_file_exists else "Not found",
"Found" if env_file_exists else "Not found",
str(env_file.resolve()) if env_file_exists else "N/A",
)
@@ -665,11 +662,11 @@ def env_view():
# Show helpful message
if env_file_exists:
console.print(
"\n[dim]Tip: To enable tracing via .env, add: CREWAI_TRACING_ENABLED=true[/dim]"
"\n[dim]💡 Tip: To enable tracing via .env, add: CREWAI_TRACING_ENABLED=true[/dim]"
)
else:
console.print(
"\n[dim]Tip: Create a .env file in your project root and add: CREWAI_TRACING_ENABLED=true[/dim]"
"\n[dim]💡 Tip: Create a .env file in your project root and add: CREWAI_TRACING_ENABLED=true[/dim]"
)
console.print()
@@ -685,16 +682,14 @@ def traces_enable():
from rich.console import Console
from rich.panel import Panel
from crewai.events.listeners.tracing.utils import update_user_data
console = Console()
# Update user data to enable traces
user_data = _load_user_data()
user_data["trace_consent"] = True
user_data["first_execution_done"] = True
_save_user_data(user_data)
update_user_data({"trace_consent": True, "first_execution_done": True})
panel = Panel(
"Trace collection has been enabled!\n\n"
"Trace collection has been enabled!\n\n"
"Your crew/flow executions will now send traces to CrewAI+.\n"
"Use 'crewai traces disable' to turn off trace collection.",
title="Traces Enabled",
@@ -710,16 +705,14 @@ def traces_disable():
from rich.console import Console
from rich.panel import Panel
from crewai.events.listeners.tracing.utils import update_user_data
console = Console()
# Update user data to disable traces
user_data = _load_user_data()
user_data["trace_consent"] = False
user_data["first_execution_done"] = True
_save_user_data(user_data)
update_user_data({"trace_consent": False, "first_execution_done": True})
panel = Panel(
"Trace collection has been disabled!\n\n"
"Trace collection has been disabled!\n\n"
"Your crew/flow executions will no longer send traces.\n"
"Use 'crewai traces enable' to turn trace collection back on.",
title="Traces Disabled",
@@ -738,6 +731,11 @@ def traces_status():
from rich.panel import Panel
from rich.table import Table
from crewai.events.listeners.tracing.utils import (
_load_user_data,
is_tracing_enabled,
)
console = Console()
user_data = _load_user_data()
@@ -752,19 +750,19 @@ def traces_status():
# Check user consent
trace_consent = user_data.get("trace_consent")
if trace_consent is True:
consent_status = "Enabled (user consented)"
consent_status = "Enabled (user consented)"
elif trace_consent is False:
consent_status = "Disabled (user declined)"
consent_status = "Disabled (user declined)"
else:
consent_status = "Not set (first-time user)"
consent_status = "Not set (first-time user)"
table.add_row("User Consent", consent_status)
# Check overall status
if is_tracing_enabled():
overall_status = "ENABLED"
overall_status = "ENABLED"
border_style = "green"
else:
overall_status = "DISABLED"
overall_status = "DISABLED"
border_style = "red"
table.add_row("Overall Status", overall_status)

View File

@@ -1,12 +1,11 @@
from __future__ import annotations
import json
import httpx
from rich.console import Console
from crewai_cli.authentication.token import get_auth_token
from crewai_cli.plus_api import PlusAPI
from crewai.cli.authentication.token import get_auth_token
from crewai.cli.plus_api import PlusAPI
from crewai.telemetry.telemetry import Telemetry
console = Console()
@@ -14,14 +13,17 @@ console = Console()
class BaseCommand:
def __init__(self) -> None:
pass
self._telemetry = Telemetry()
self._telemetry.set_tracer()
class PlusAPIMixin:
def __init__(self) -> None:
def __init__(self, telemetry: Telemetry) -> None:
try:
telemetry.set_tracer()
self.plus_api_client = PlusAPI(api_key=get_auth_token())
except Exception:
telemetry.deploy_signup_error_span()
console.print(
"Please sign up/login to CrewAI+ before using the CLI.",
style="bold red",
@@ -30,6 +32,12 @@ class PlusAPIMixin:
raise SystemExit from None
def _validate_response(self, response: httpx.Response) -> None:
"""
Handle and display error messages from API responses.
Args:
response (httpx.Response): The response from the Plus API
"""
try:
json_response = response.json()
except (json.JSONDecodeError, ValueError):

View File

@@ -6,14 +6,14 @@ from typing import Any
from pydantic import BaseModel, Field
from crewai_cli.constants import (
from crewai.cli.constants import (
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_AUDIENCE,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_CLIENT_ID,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_DOMAIN,
CREWAI_ENTERPRISE_DEFAULT_OAUTH2_PROVIDER,
DEFAULT_CREWAI_ENTERPRISE_URL,
)
from crewai_cli.shared.token_manager import TokenManager
from crewai.cli.shared.token_manager import TokenManager
logger = getLogger(__name__)

Some files were not shown because too many files have changed in this diff Show More