Mirror of https://github.com/crewAIInc/crewAI.git, synced 2026-01-10 16:48:30 +00:00.
* feat: add `apps` & `actions` attributes to Agent (#3504)
* feat: add app attributes to Agent
* feat: add actions attribute to Agent
* chore: resolve linter issues
* refactor: merge the apps and actions parameters into a single one
* fix: remove unnecessary print
* feat: logging error when CrewaiPlatformTools fails
* chore: export CrewaiPlatformTools directly from crewai_tools
* style: resolve linter issues
* test: fix broken tests
* style: solve linter issues
* fix: fix broken test
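A rough sketch of what the new attribute could look like on an Agent. The `apps` parameter name and its list-of-identifiers shape are assumptions inferred from these commit messages (apps and actions were merged into a single parameter), not the confirmed API.

```python
# Hypothetical sketch only: the `apps` parameter and its value format are
# assumed from the commit messages above, not taken from the released API.
from crewai import Agent

support_agent = Agent(
    role="Support Analyst",
    goal="Resolve customer tickets using connected platform apps",
    backstory="An analyst with access to the team's integrated platform tools.",
    apps=["gmail", "slack/send_message"],  # assumed: platform apps and/or specific actions
)
```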
* feat: monorepo restructure and test/ci updates
- Add crewai workspace member
- Fix vcr cassette paths and restore test dirs
- Resolve ci failures and update linter/pytest rules
* chore: update python version to 3.13 and package metadata
* feat: add crewai-tools workspace and fix tests/dependencies
* feat: add crewai-tools workspace structure
* Squashed 'temp-crewai-tools/' content from commit 9bae5633
git-subtree-dir: temp-crewai-tools
git-subtree-split: 9bae56339096cb70f03873e600192bd2cd207ac9
* feat: configure crewai-tools workspace package with dependencies
* fix: apply ruff auto-formatting to crewai-tools code
* chore: update lockfile
* fix: don't allow tool tests yet
* fix: comment out extra pytest flags for now
* fix: remove conflicting conftest.py from crewai-tools tests
* fix: resolve dependency conflicts and test issues
- Pin vcrpy to 7.0.0 to fix pytest-recording compatibility
- Comment out types-requests to resolve urllib3 conflict
- Update requests requirement in crewai-tools to >=2.32.0
* chore: update CI workflows and docs for monorepo structure
* chore: update CI workflows and docs for monorepo structure
* fix: actions syntax
* chore: ci publish and pin versions
* fix: add permission to action
* chore: bump version to 1.0.0a1 across all packages
- Updated version to 1.0.0a1 in pyproject.toml for crewai and crewai-tools
- Adjusted version in __init__.py files for consistency
* WIP: v1 docs (#3626)
(cherry picked from commit d46e20fa09bcd2f5916282f5553ddeb7183bd92c)
* docs: parity for all translations
* docs: full name of acronym AMP
* docs: fix lingering unused code
* docs: expand contextual options in docs.json
* docs: add contextual action to request feature on GitHub (#3635)
* chore: apply linting fixes to crewai-tools
* feat: add required env var validation for brightdata
Co-authored-by: Greyson Lalonde <greyson.r.lalonde@gmail.com>
* fix: properly handle anyOf/oneOf/allOf schema props
Co-authored-by: Greyson Lalonde <greyson.r.lalonde@gmail.com>
* feat: bump version to 1.0.0a2
* Lorenze/native inference sdks (#3619)
* ruff linted
* using native sdks with litellm fallback
* drop exa
* drop print on completion
* Refactor LLM and utility functions for type consistency
- Updated `max_tokens` parameter in `LLM` class to accept `float` in addition to `int`.
- Modified `create_llm` function to ensure consistent type hints and return types, now returning `LLM | BaseLLM | None`.
- Adjusted type hints for various parameters in `create_llm` and `_llm_via_environment_or_fallback` functions for improved clarity and type safety.
- Enhanced test cases to reflect changes in type handling and ensure proper instantiation of LLM instances.
* fix agent_tests
* fix litellm tests and usagemetrics fix
* drop print
* Refactor LLM event handling and improve test coverage
- Removed commented-out event emission for LLM call failures in `llm.py`.
- Added `from_agent` parameter to `CrewAgentExecutor` for better context in LLM responses.
- Enhanced test for LLM call failure to simulate OpenAI API failure and updated assertions for clarity.
- Updated agent and task ID assertions in tests to ensure they are consistently treated as strings.
* fix test_converter
* fixed tests/agents/test_agent.py
* Refactor LLM context length exception handling and improve provider integration
- Renamed `LLMContextLengthExceededException` to `LLMContextLengthExceededExceptionError` for clarity and consistency.
- Updated LLM class to pass the provider parameter correctly during initialization.
- Enhanced error handling in various LLM provider implementations to raise the new exception type.
- Adjusted tests to reflect the updated exception name and ensure proper error handling in context length scenarios.
* Enhance LLM context window handling across providers
- Introduced CONTEXT_WINDOW_USAGE_RATIO to adjust context window sizes dynamically for Anthropic, Azure, Gemini, and OpenAI LLMs.
- Added validation for context window sizes in Azure and Gemini providers to ensure they fall within acceptable limits.
- Updated context window size calculations to use the new ratio, improving consistency and adaptability across different models.
- Removed hardcoded context window sizes in favor of ratio-based calculations for better flexibility.
* fix test agent again
* fix test agent
* feat: add native LLM providers for Anthropic, Azure, and Gemini
- Introduced new completion implementations for Anthropic, Azure, and Gemini, integrating their respective SDKs.
- Added utility functions for tool validation and extraction to support function calling across LLM providers.
- Enhanced context window management and token usage extraction for each provider.
- Created a common utility module for shared functionality among LLM providers.
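As a rough illustration of the native-SDK path described above: the provider is taken from the model string, and the `is_litellm` flag referenced in the test updates forces the LiteLLM fallback instead. Treat the model identifier and the exact flag semantics as assumptions drawn from these commit messages.

```python
# Illustrative sketch, not the confirmed API surface.
from crewai import LLM

# Assumed: "anthropic/..." routes through the native Anthropic completion class.
native_llm = LLM(model="anthropic/claude-3-5-sonnet-20241022")

# Assumed: is_litellm=True forces the LiteLLM code path instead of the native SDK.
litellm_llm = LLM(model="anthropic/claude-3-5-sonnet-20241022", is_litellm=True)
```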
* chore: update dependencies and improve context management
- Removed direct dependency on `litellm` from the main dependencies and added it under extras for better modularity.
- Updated the `litellm` dependency specification to allow for greater flexibility in versioning.
- Refactored context length exception handling across various LLM providers to use a consistent error class.
- Enhanced platform-specific dependency markers for NVIDIA packages to ensure compatibility across different systems.
* refactor(tests): update LLM instantiation to include is_litellm flag in test cases
- Modified multiple test cases in test_llm.py to set the is_litellm parameter to True when instantiating the LLM class.
- This change ensures that the tests are aligned with the latest LLM configuration requirements and improves consistency across test scenarios.
- Adjusted relevant assertions and comments to reflect the updated LLM behavior.
* linter
* linted
* revert constants
* fix(tests): correct type hint in expected model description
- Updated the expected description in the test_generate_model_description_dict_field function to use 'Dict' instead of 'dict' for consistency with type hinting conventions.
- This change ensures that the test accurately reflects the expected output format for model descriptions.
* refactor(llm): enhance LLM instantiation and error handling
- Updated the LLM class to include validation for the model parameter, ensuring it is a non-empty string.
- Improved error handling by logging warnings when the native SDK fails, allowing for a fallback to LiteLLM.
- Adjusted the instantiation of LLM in test cases to consistently include the is_litellm flag, aligning with recent changes in LLM configuration.
- Modified relevant tests to reflect these updates, ensuring better coverage and accuracy in testing scenarios.
* fixed test
* refactor(llm): enhance token usage tracking and add copy methods
- Updated the LLM class to track token usage and log callbacks in streaming mode, improving monitoring capabilities.
- Introduced shallow and deep copy methods for the LLM instance, allowing for better management of LLM configurations and parameters.
- Adjusted test cases to instantiate LLM with the is_litellm flag, ensuring alignment with recent changes in LLM configuration.
* refactor(tests): reorganize imports and enhance error messages in test cases
- Cleaned up import statements in test_crew.py for better organization and readability.
- Enhanced error messages in test cases to use `re.escape` for improved regex matching, ensuring more robust error handling.
- Adjusted comments for clarity and consistency across test scenarios.
- Ensured that all necessary modules are imported correctly to avoid potential runtime issues.
* feat: add base devtooling
* fix: ensure dep refs are updated for devtools
* fix: allow pre-release
* feat: allow release after tag
* feat: bump versions to 1.0.0a3
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
* fix: match tag and release title, ignore devtools build for pypi
* fix: allow failed pypi publish
* feat: introduce trigger listing and execution commands for local development (#3643)
* chore: exclude tests from ruff linting
* chore: exclude tests from GitHub Actions linter
* fix: replace print statements with logger in agent and memory handling
* chore: add noqa for intentional print in printer utility
* fix: resolve linting errors across codebase
* feat: update docs with new approach to consume Platform Actions (#3675)
* fix: remove duplicate line and add explicit env var
* feat: bump versions to 1.0.0a4 (#3686)
* Update triggers docs (#3678)
* docs: introduce triggers list & triggers run command
* docs: add KO triggers docs
* docs: ensure CREWAI_PLATFORM_INTEGRATION_TOKEN is mentioned on docs (#3687)
* Lorenze/bedrock llm (#3693)
* feat: add AWS Bedrock support and update dependencies
- Introduced BedrockCompletion class for AWS Bedrock integration in LLM.
- Added boto3 as a new dependency in both pyproject.toml and uv.lock.
- Updated LLM class to support Bedrock provider.
- Created new files for Bedrock provider implementation.
* using converse api
* converse
* linted
* refactor: update BedrockCompletion class to improve parameter handling
- Changed max_tokens from a fixed integer to an optional integer.
- Simplified model ID assignment by removing the inference profile mapping method.
- Cleaned up comments and unnecessary code related to tool specifications and model-specific parameters.
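A hedged sketch of using the Bedrock provider via the Converse API mentioned above; the `bedrock/` model prefix and the reliance on standard AWS environment credentials are assumptions, not documented behavior.

```python
# Sketch under assumptions: the model prefix and credential handling may differ.
import os

from crewai import LLM

os.environ.setdefault("AWS_REGION", "us-east-1")  # boto3 resolves credentials/region from the environment

bedrock_llm = LLM(
    model="bedrock/anthropic.claude-3-sonnet-20240229-v1:0",  # assumed Converse-compatible model id
    max_tokens=1024,  # optional, per the BedrockCompletion parameter refactor above
)
```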
* feat: improve event bus thread safety and async support
Add thread-safe, async-compatible event bus with read–write locking and
handler dependency ordering. Remove blinker dependency and implement
direct dispatch. Improve type safety, error handling, and deterministic
event synchronization.
Refactor tests to auto-wait for async handlers, ensure clean teardown,
and add comprehensive concurrency coverage. Replace thread-local state
in AgentEvaluator with instance-based locking for correct cross-thread
access. Enhance tracing reliability and event finalization.
* feat: enhance OpenAICompletion class with additional client parameters (#3701)
* feat: enhance OpenAICompletion class with additional client parameters
- Added support for default_headers, default_query, and client_params in the OpenAICompletion class.
- Refactored client initialization to use a dedicated method for client parameter retrieval.
- Introduced new test cases to validate the correct usage of OpenAICompletion with various parameters.
* fix: correct test case for unsupported OpenAI model
- Updated the test_openai.py to ensure that the LLM instance is created before calling the method, maintaining proper error handling for unsupported models.
- This change ensures that the test accurately checks for the NotFoundError when an invalid model is specified.
* fix: enhance error handling in OpenAICompletion class
- Added specific exception handling for NotFoundError and APIConnectionError in the OpenAICompletion class to provide clearer error messages and improve logging.
- Updated the test case for unsupported models to ensure it raises a ValueError with the appropriate message when a non-existent model is specified.
- This change improves the robustness of the OpenAI API integration and enhances the clarity of error reporting.
* fix: improve test for unsupported OpenAI model handling
- Refactored the test case in test_openai.py to create the LLM instance after mocking the OpenAI client, ensuring proper error handling for unsupported models.
- This change enhances the clarity of the test by accurately checking for ValueError when a non-existent model is specified, aligning with recent improvements in error handling for the OpenAICompletion class.
* feat: bump versions to 1.0.0b1 (#3706)
* Lorenze/tools drop litellm (#3710)
* completely drop litellm and correctly pass config for qdrant
* feat: add support for additional embedding models in EmbeddingService
- Expanded the list of supported embedding models to include Google Vertex, Hugging Face, Jina, Ollama, OpenAI, Roboflow, Watson X, custom embeddings, Sentence Transformers, Text2Vec, OpenClip, and Instructor.
- This enhancement improves the versatility of the EmbeddingService by allowing integration with a wider range of embedding providers.
* fix: update collection parameter handling in CrewAIRagAdapter
- Changed the condition for setting vectors_config in the CrewAIRagAdapter to check for QdrantConfig instance instead of using hasattr. This improves type safety and ensures proper configuration handling for Qdrant integration.
* moved stagehand as optional dep (#3712)
* feat: bump versions to 1.0.0b2 (#3713)
* feat: enhance AnthropicCompletion class with additional client parame… (#3707)
* feat: enhance AnthropicCompletion class with additional client parameters and tool handling
- Added support for client_params in the AnthropicCompletion class to allow for additional client configuration.
- Refactored client initialization to use a dedicated method for retrieving client parameters.
- Implemented a new method to handle tool use conversation flow, ensuring proper execution and response handling.
- Introduced comprehensive test cases to validate the functionality of the AnthropicCompletion class, including tool use scenarios and parameter handling.
* drop print statements
* test: add fixture to mock ANTHROPIC_API_KEY for tests
- Introduced a pytest fixture to automatically mock the ANTHROPIC_API_KEY environment variable for all tests in the test_anthropic.py module.
- This change ensures that tests can run without requiring a real API key, improving test isolation and reliability.
* refactor: streamline streaming message handling in AnthropicCompletion class
- Removed the 'stream' parameter from the API call as it is set internally by the SDK.
- Simplified the handling of tool use events and response construction by extracting token usage from the final message.
- Enhanced the flow for managing tool use conversation, ensuring proper integration with the streaming API response.
* fix streaming here too
* fix: improve error handling in tool conversion for AnthropicCompletion class
- Enhanced exception handling during tool conversion by catching KeyError and ValueError.
- Added logging for conversion errors to aid in debugging and maintain robustness in tool integration.
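A minimal sketch of the `client_params` pass-through added here, assuming the keyword is forwarded unchanged to the underlying Anthropic client; the specific keys shown are assumptions.

```python
# Hedged sketch: the exact pass-through keys are assumptions.
from crewai import LLM

anthropic_llm = LLM(
    model="anthropic/claude-3-5-sonnet-20241022",
    client_params={"timeout": 60.0, "max_retries": 2},  # assumed to be forwarded to the Anthropic SDK client
)
```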
* feat: enhance GeminiCompletion class with client parameter support (#3717)
* feat: enhance GeminiCompletion class with client parameter support
- Added support for client_params in the GeminiCompletion class to allow for additional client configuration.
- Refactored client initialization into a dedicated method for improved parameter handling.
- Introduced a new method to retrieve client parameters, ensuring compatibility with the base class.
- Enhanced error handling during client initialization to provide clearer messages for missing configuration.
- Updated documentation to reflect the changes in client parameter usage.
* add optional dependencies
* refactor: update test fixture to mock GOOGLE_API_KEY
- Renamed the fixture from `mock_anthropic_api_key` to `mock_google_api_key` to reflect the change in the environment variable being mocked.
- This update ensures that all tests in the module can run with a mocked GOOGLE_API_KEY, improving test isolation and reliability.
* fix tests
* feat: enhance BedrockCompletion class with advanced features
* feat: enhance BedrockCompletion class with advanced features and error handling
- Added support for guardrail configuration, additional model request fields, and custom response field paths in the BedrockCompletion class.
- Improved error handling for AWS exceptions and added token usage tracking with stop reason logging.
- Enhanced streaming response handling with comprehensive event management, including tool use and content block processing.
- Updated documentation to reflect new features and initialization parameters.
- Introduced a new test suite for BedrockCompletion to validate functionality and ensure robust integration with AWS Bedrock APIs.
* chore: add boto typing
* fix: use typing_extensions.Required for Python 3.10 compatibility
---------
Co-authored-by: Greyson Lalonde <greyson.r.lalonde@gmail.com>
* feat: azure native tests
* feat: add Azure AI Inference support and related tests
- Introduced the `azure-ai-inference` package with version `1.0.0b9` and its dependencies in `uv.lock` and `pyproject.toml`.
- Added new test files for Azure LLM functionality, including tests for Azure completion and tool handling.
- Implemented comprehensive test cases to validate Azure-specific behavior and integration with the CrewAI framework.
- Enhanced the testing framework to mock Azure credentials and ensure proper isolation during tests.
* feat: enhance AzureCompletion class with Azure OpenAI support
- Added support for the Azure OpenAI endpoint in the AzureCompletion class, allowing for flexible endpoint configurations.
- Implemented endpoint validation and correction to ensure proper URL formats for Azure OpenAI deployments.
- Enhanced error handling to provide clearer messages for common HTTP errors, including authentication and rate limit issues.
- Updated tests to validate the new endpoint handling and error messaging, ensuring robust integration with Azure AI Inference.
- Refactored parameter preparation to conditionally include the model parameter based on the endpoint type.
* refactor: convert project module to metaclass with full typing
* Lorenze/OpenAI base url backwards support (#3723)
* fix: enhance OpenAICompletion class base URL handling
- Updated the base URL assignment in the OpenAICompletion class to prioritize the new `api_base` attribute and fallback to the environment variable `OPENAI_BASE_URL` if both are not set.
- Added `api_base` to the list of parameters in the OpenAICompletion class to ensure proper configuration and flexibility in API endpoint management.
* feat: enhance OpenAICompletion class with api_base support
- Added the `api_base` parameter to the OpenAICompletion class to allow for flexible API endpoint configuration.
- Updated the `_get_client_params` method to prioritize `base_url` over `api_base`, ensuring correct URL handling.
- Introduced comprehensive tests to validate the behavior of `api_base` and `base_url` in various scenarios, including environment variable fallback.
- Enhanced test coverage for client parameter retrieval, ensuring robust integration with the OpenAI API.
* fix: improve OpenAICompletion class configuration handling
- Added a debug print statement to log the client configuration parameters during initialization for better traceability.
- Updated the base URL assignment logic to ensure it defaults to None if no valid base URL is provided, enhancing robustness in API endpoint configuration.
- Refined the retrieval of the `api_base` environment variable to streamline the configuration process.
* drop print
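A sketch of the backwards-compatible base URL handling these commits describe: an explicit `base_url` wins, `api_base` is still honored for older configurations, and `OPENAI_BASE_URL` is the environment fallback. The precedence order is taken from the commit messages and should be read as an assumption.

```python
# Sketch of the assumed precedence: base_url > api_base > OPENAI_BASE_URL env var.
import os

from crewai import LLM

# Older-style configuration via api_base still works after this change.
proxy_llm = LLM(model="gpt-4o", api_base="https://openai-proxy.example.internal/v1")

# With neither base_url nor api_base set, the environment variable is used.
os.environ["OPENAI_BASE_URL"] = "https://openai-proxy.example.internal/v1"
env_llm = LLM(model="gpt-4o")
```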
* feat: improvements on import native sdk support (#3725)
* feat: add support for Anthropic provider and enhance logging
- Introduced the `anthropic` package with version `0.69.0` in `pyproject.toml` and `uv.lock`, allowing for integration with the Anthropic API.
- Updated logging in the LLM class to provide clearer error messages when importing native providers, enhancing debugging capabilities.
- Improved error handling in the AnthropicCompletion class to guide users on installation via the updated error message format.
- Refactored import error handling in other provider classes to maintain consistency in error messaging and installation instructions.
* feat: enhance LLM support with Bedrock provider and update dependencies
- Added support for the `bedrock` provider in the LLM class, allowing integration with AWS Bedrock APIs.
- Updated `uv.lock` to replace `boto3` with `bedrock` in the dependencies, reflecting the new provider structure.
- Introduced `SUPPORTED_NATIVE_PROVIDERS` to include `bedrock` and ensure proper error handling when instantiating native providers.
- Enhanced error handling in the LLM class to raise informative errors when native provider instantiation fails.
- Added tests to validate the behavior of the new Bedrock provider and ensure fallback mechanisms work correctly for unsupported providers.
* test: update native provider fallback tests to expect ImportError
* adjust the test with the expected behavior - raising ImportError
* this is expecting the litellm format, all gemini native tests are in test_google.py
---------
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
* fix: remove stdout prints, improve test determinism, and update trace handling
Removed `print` statements from the `LLMStreamChunkEvent` handler to prevent
LLM response chunks from being written directly to stdout. The listener now
only tracks chunks internally.
Fixes #3715
Added explicit return statements for trace-related tests.
Updated cassette for `test_failed_evaluation` to reflect new behavior where
an empty trace dict is used instead of returning early.
Ensured deterministic cleanup order in test fixtures by making
`clear_event_bus_handlers` depend on `setup_test_environment`. This guarantees
event bus shutdown and file handle cleanup occur before temporary directory
deletion, resolving intermittent “Directory not empty” errors in CI.
* chore: remove lib/crewai exclusion from pre-commit hooks
* feat: enhance task guardrail functionality and validation
* feat: enhance task guardrail functionality and validation
- Introduced support for multiple guardrails in the Task class, allowing for sequential processing of guardrails.
- Added a new `guardrails` field to the Task model to accept a list of callable guardrails or string descriptions.
- Implemented validation to ensure guardrails are processed correctly, including handling of retries and error messages.
- Enhanced the `_invoke_guardrail_function` method to manage guardrail execution and integrate with existing task output processing.
- Updated tests to cover various scenarios involving multiple guardrails, including success, failure, and retry mechanisms.
This update improves the flexibility and robustness of task execution by allowing for more complex validation scenarios.
* refactor: enhance guardrail type handling in Task model
- Updated the Task class to improve guardrail type definitions, introducing GuardrailType and GuardrailsType for better clarity and type safety.
- Simplified the validation logic for guardrails, ensuring that both single and multiple guardrails are processed correctly.
- Enhanced error messages for guardrail validation to provide clearer feedback when incorrect types are provided.
- This refactor improves the maintainability and robustness of task execution by standardizing guardrail handling.
* feat: implement per-guardrail retry tracking in Task model
- Introduced a new private attribute `_guardrail_retry_counts` to the Task class for tracking retry attempts on a per-guardrail basis.
- Updated the guardrail processing logic to utilize the new retry tracking, allowing for independent retry counts for each guardrail.
- Enhanced error handling to provide clearer feedback when guardrails fail validation after exceeding retry limits.
- Modified existing tests to validate the new retry tracking behavior, ensuring accurate assertions on guardrail retries.
This update improves the robustness and flexibility of task execution by allowing for more granular control over guardrail validation and retry mechanisms.
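The guardrail contract is visible in the test file below: a guardrail callable receives the TaskOutput and returns a `(passed, result)` tuple. The list-valued `guardrails` field is sketched from these commit messages and is shown commented out as an assumption.

```python
# The single-guardrail form is exercised in the tests below; the `guardrails`
# list form follows the commit messages above and is an assumption here.
from crewai import Task
from crewai.tasks.task_output import TaskOutput


def non_empty(output: TaskOutput) -> tuple[bool, TaskOutput | str]:
    # Pass the output through when it has content, otherwise ask for a retry.
    if output.raw.strip():
        return True, output
    return False, "Output was empty; please produce real content."


task = Task(
    description="List 5 article ideas about AI agents.",
    expected_output="Bullet point list of 5 ideas.",
    guardrail=non_empty,
    # guardrails=[non_empty, another_check],  # assumed list form with per-guardrail retries
)
```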
* chore: 1.0.0b3 bump (#3734)
* chore: full ruff and mypy
- Improved linting, the pre-commit setup, and internal architecture.
- Configured Ruff to respect .gitignore, added stricter rules, and introduced a lock pre-commit hook with virtualenv activation.
- Fixed type shadowing in EXASearchTool using a type_ alias to avoid PEP 563 conflicts and resolved circular imports in the agent executor and guardrail modules.
- Removed agent-ops attributes, deprecated the watson alias, and dropped crewai-enterprise tools with corresponding test updates.
- Refactored cache and memoization for thread safety and cleaned up structured output adapters and related logic.
* New MCL DSL (#3738)
* Adding MCP implementation
* New tests for MCP implementation
* fix tests
* update docs
* Revert "New tests for MCP implementation"
This reverts commit 0bbe6dee90.
* linter
* linter
* fix
* verify mcp package exists
* adjust docs to be clear only remote servers are supported
* reverted
* ensure args schema generated properly
* properly close out
---------
Co-authored-by: lorenzejay <lorenzejaytech@gmail.com>
Co-authored-by: Greyson Lalonde <greyson.r.lalonde@gmail.com>
* feat: a2a experimental
experimental a2a support
---------
Co-authored-by: Lucas Gomide <lucaslg200@gmail.com>
Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
Co-authored-by: Mike Plachta <mplachta@users.noreply.github.com>
Co-authored-by: João Moura <joaomdmoura@gmail.com>
1683 lines
54 KiB
Python
"""Test Agent creation and execution basic functionality."""
|
|
|
|
import ast
|
|
import json
|
|
import os
|
|
import time
|
|
from functools import partial
|
|
from hashlib import md5
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
from pydantic import BaseModel
|
|
from pydantic_core import ValidationError
|
|
|
|
from crewai import Agent, Crew, Process, Task
|
|
from crewai.tasks.conditional_task import ConditionalTask
|
|
from crewai.tasks.task_output import TaskOutput
|
|
from crewai.utilities.converter import Converter
|
|
from crewai.utilities.string_utils import interpolate_only
|
|
|
|
|
|
def test_task_tool_reflect_agent_tools():
    from crewai.tools import tool

    @tool
    def fake_tool() -> None:
        "Fake tool"

    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
        tools=[fake_tool],
        allow_delegation=False,
    )

    task = Task(
        description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 ideas.",
        agent=researcher,
    )

    assert task.tools == [fake_tool]


def test_task_tool_takes_precedence_over_agent_tools():
    from crewai.tools import tool

    @tool
    def fake_tool() -> None:
        "Fake tool"

    @tool
    def fake_task_tool() -> None:
        "Fake tool"

    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
        tools=[fake_tool],
        allow_delegation=False,
    )

    task = Task(
        description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 ideas.",
        agent=researcher,
        tools=[fake_task_tool],
    )

    assert task.tools == [fake_task_tool]


def test_task_prompt_includes_expected_output():
    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 interesting ideas.",
        agent=researcher,
    )

    with patch.object(Agent, "execute_task") as execute:
        execute.return_value = "ok"
        task.execute_sync(agent=researcher)
        execute.assert_called_once_with(task=task, context=None, tools=[])


def test_task_callback():
    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )

    task_completed = MagicMock(return_value="done")

    task = Task(
        name="Brainstorm",
        description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 interesting ideas.",
        agent=researcher,
        callback=task_completed,
    )

    with patch.object(Agent, "execute_task") as execute:
        execute.return_value = "ok"
        task.execute_sync(agent=researcher)
        task_completed.assert_called_once_with(task.output)

    assert task.output.description == task.description
    assert task.output.expected_output == task.expected_output
    assert task.output.name == task.name


def test_task_callback_returns_task_output():
    from crewai.tasks.output_format import OutputFormat

    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )

    task_completed = MagicMock(return_value="done")

    task = Task(
        description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 interesting ideas.",
        agent=researcher,
        callback=task_completed,
    )

    with patch.object(Agent, "execute_task") as execute:
        execute.return_value = "exported_ok"
        task.execute_sync(agent=researcher)
        # Ensure the callback is called with a TaskOutput object serialized to JSON
        task_completed.assert_called_once()
        callback_data = task_completed.call_args[0][0]

        # Check if callback_data is TaskOutput object or JSON string
        if isinstance(callback_data, TaskOutput):
            callback_data = json.dumps(callback_data.model_dump())

        assert isinstance(callback_data, str)
        output_dict = json.loads(callback_data)
        expected_output = {
            "description": task.description,
            "raw": "exported_ok",
            "pydantic": None,
            "json_dict": None,
            "agent": researcher.role,
            "summary": "Give me a list of 5 interesting ideas to explore...",
            "name": task.name or task.description,
            "expected_output": "Bullet point list of 5 interesting ideas.",
            "output_format": OutputFormat.RAW,
        }
        assert output_dict == expected_output


def test_execute_with_agent():
    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 interesting ideas.",
    )

    with patch.object(Agent, "execute_task", return_value="ok") as execute:
        task.execute_sync(agent=researcher)
        execute.assert_called_once_with(task=task, context=None, tools=[])


def test_async_execution():
    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 interesting ideas.",
        async_execution=True,
        agent=researcher,
    )

    with patch.object(Agent, "execute_task", return_value="ok") as execute:
        execution = task.execute_async(agent=researcher)
        result = execution.result()
        assert result.raw == "ok"
        execute.assert_called_once_with(task=task, context=None, tools=[])


def test_multiple_output_type_error():
    class Output(BaseModel):
        field: str

    with pytest.raises(ValidationError):
        Task(
            description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
            expected_output="Bullet point list of 5 interesting ideas.",
            output_json=Output,
            output_pydantic=Output,
        )


def test_guardrail_type_error():
    desc = "Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting."
    expected_output = "Bullet point list of 5 interesting ideas."
    # Lambda function
    Task(
        description=desc,
        expected_output=expected_output,
        guardrail=lambda x: (True, x),
    )

    # Function
    def guardrail_fn(x: TaskOutput) -> tuple[bool, TaskOutput]:
        return (True, x)

    Task(
        description=desc,
        expected_output=expected_output,
        guardrail=guardrail_fn,
    )

    class Object:
        def guardrail_fn(self, x: TaskOutput) -> tuple[bool, TaskOutput]:
            return (True, x)

        @classmethod
        def guardrail_class_fn(cls, x: TaskOutput) -> tuple[bool, str]:
            return (True, x)

        @staticmethod
        def guardrail_static_fn(x: TaskOutput) -> tuple[bool, str | TaskOutput]:
            return (True, x)

    obj = Object()
    # Method
    Task(
        description=desc,
        expected_output=expected_output,
        guardrail=obj.guardrail_fn,
    )
    # Class method
    Task(
        description=desc,
        expected_output=expected_output,
        guardrail=Object.guardrail_class_fn,
    )
    # Static method
    Task(
        description=desc,
        expected_output=expected_output,
        guardrail=Object.guardrail_static_fn,
    )

    def error_fn(x: TaskOutput, y: bool) -> tuple[bool, TaskOutput]:
        return (y, x)

    Task(
        description=desc,
        expected_output=expected_output,
        guardrail=partial(error_fn, y=True),
    )

    with pytest.raises(ValidationError):
        Task(
            description=desc,
            expected_output=expected_output,
            guardrail=error_fn,
        )


@pytest.mark.vcr(filter_headers=["authorization"])
def test_output_pydantic_sequential():
    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_pydantic=ScoreOutput,
        agent=scorer,
    )

    crew = Crew(agents=[scorer], tasks=[task], process=Process.sequential)
    result = crew.kickoff()
    assert isinstance(result.pydantic, ScoreOutput)
    assert result.to_dict() == {"score": 4}


@pytest.mark.vcr(filter_headers=["authorization"])
def test_output_pydantic_hierarchical():
    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_pydantic=ScoreOutput,
        agent=scorer,
    )

    crew = Crew(
        agents=[scorer],
        tasks=[task],
        process=Process.hierarchical,
        manager_llm="gpt-4o",
    )
    result = crew.kickoff()
    assert isinstance(result.pydantic, ScoreOutput)
    assert result.to_dict() == {"score": 4}


@pytest.mark.vcr(filter_headers=["authorization"])
def test_output_json_sequential():
    import uuid

    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    output_file = f"score_{uuid.uuid4()}.json"
    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_json=ScoreOutput,
        output_file=output_file,
        agent=scorer,
    )

    crew = Crew(agents=[scorer], tasks=[task], process=Process.sequential)
    result = crew.kickoff()
    assert '{"score": 4}' == result.json
    assert result.to_dict() == {"score": 4}

    if os.path.exists(output_file):
        os.remove(output_file)


@pytest.mark.vcr(filter_headers=["authorization"])
def test_output_json_hierarchical():
    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_json=ScoreOutput,
        agent=scorer,
    )

    crew = Crew(
        agents=[scorer],
        tasks=[task],
        process=Process.hierarchical,
        manager_llm="gpt-4o",
    )
    result = crew.kickoff()
    assert result.json == '{"score": 4}'
    assert result.to_dict() == {"score": 4}


@pytest.mark.vcr(filter_headers=["authorization"])
def test_inject_date():
    reporter = Agent(
        role="Reporter",
        goal="Report the date",
        backstory="You're an expert reporter, specialized in reporting the date.",
        allow_delegation=False,
        inject_date=True,
    )

    task = Task(
        description="What is the date today?",
        expected_output="The date today as you were told, same format as the date you were told.",
        agent=reporter,
    )

    crew = Crew(
        agents=[reporter],
        tasks=[task],
        process=Process.sequential,
    )
    result = crew.kickoff()
    assert "2025-05-21" in result.raw


@pytest.mark.vcr(filter_headers=["authorization"])
def test_inject_date_custom_format():
    reporter = Agent(
        role="Reporter",
        goal="Report the date",
        backstory="You're an expert reporter, specialized in reporting the date.",
        allow_delegation=False,
        inject_date=True,
        date_format="%B %d, %Y",
    )

    task = Task(
        description="What is the date today?",
        expected_output="The date today.",
        agent=reporter,
    )

    crew = Crew(
        agents=[reporter],
        tasks=[task],
        process=Process.sequential,
    )
    result = crew.kickoff()
    assert "May 21, 2025" in result.raw


@pytest.mark.vcr(filter_headers=["authorization"])
def test_no_inject_date():
    reporter = Agent(
        role="Reporter",
        goal="Report the date",
        backstory="You're an expert reporter, specialized in reporting the date.",
        allow_delegation=False,
        inject_date=False,
    )

    task = Task(
        description="What is the date today?",
        expected_output="The date today.",
        agent=reporter,
    )

    crew = Crew(
        agents=[reporter],
        tasks=[task],
        process=Process.sequential,
    )
    result = crew.kickoff()
    assert "2025-05-21" not in result.raw


@pytest.mark.vcr(filter_headers=["authorization"])
def test_json_property_without_output_json():
    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_pydantic=ScoreOutput,  # Using output_pydantic instead of output_json
        agent=scorer,
    )

    crew = Crew(agents=[scorer], tasks=[task], process=Process.sequential)
    result = crew.kickoff()

    with pytest.raises(ValueError) as excinfo:
        _ = result.json  # Attempt to access the json property

    assert "No JSON output found in the final task." in str(excinfo.value)


@pytest.mark.vcr(filter_headers=["authorization"])
def test_output_json_dict_sequential():
    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_json=ScoreOutput,
        agent=scorer,
    )

    crew = Crew(agents=[scorer], tasks=[task], process=Process.sequential)
    result = crew.kickoff()
    assert {"score": 4} == result.json_dict
    assert result.to_dict() == {"score": 4}


@pytest.mark.vcr(filter_headers=["authorization"])
def test_output_json_dict_hierarchical():
    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_json=ScoreOutput,
        agent=scorer,
    )

    crew = Crew(
        agents=[scorer],
        tasks=[task],
        process=Process.hierarchical,
        manager_llm="gpt-4o",
    )
    result = crew.kickoff()
    assert {"score": 4} == result.json_dict
    assert result.to_dict() == {"score": 4}


@pytest.mark.vcr(filter_headers=["authorization"])
def test_output_pydantic_to_another_task():
    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
        llm="gpt-4-0125-preview",
        function_calling_llm="gpt-3.5-turbo-0125",
        verbose=True,
    )

    task1 = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_pydantic=ScoreOutput,
        agent=scorer,
    )

    task2 = Task(
        description="Given the score the title 'The impact of AI in the future of work' got, give me an integer score between 1-5 for the following title: 'Return of the Jedi', you MUST give it a score, use your best judgment",
        expected_output="The score of the title.",
        output_pydantic=ScoreOutput,
        agent=scorer,
    )

    crew = Crew(agents=[scorer], tasks=[task1, task2], verbose=True)
    result = crew.kickoff()
    pydantic_result = result.pydantic
    assert isinstance(pydantic_result, ScoreOutput), (
        "Expected pydantic result to be of type ScoreOutput"
    )
    assert pydantic_result.score == 5


@pytest.mark.vcr(filter_headers=["authorization"])
def test_output_json_to_another_task():
    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task1 = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_json=ScoreOutput,
        agent=scorer,
    )

    task2 = Task(
        description="Given the score the title 'The impact of AI in the future of work' got, give me an integer score between 1-5 for the following title: 'Return of the Jedi'",
        expected_output="The score of the title.",
        output_json=ScoreOutput,
        agent=scorer,
    )

    crew = Crew(agents=[scorer], tasks=[task1, task2])
    result = crew.kickoff()
    assert '{"score": 4}' == result.json


@pytest.mark.vcr(filter_headers=["authorization"])
def test_save_task_output():
    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_file="score.json",
        agent=scorer,
    )

    crew = Crew(agents=[scorer], tasks=[task])

    with patch.object(Task, "_save_file") as save_file:
        save_file.return_value = None
        crew.kickoff()
        save_file.assert_called_once()


@pytest.mark.vcr(filter_headers=["authorization"])
def test_save_task_json_output():
    from unittest.mock import patch

    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_file="score.json",
        output_json=ScoreOutput,
        agent=scorer,
    )

    crew = Crew(agents=[scorer], tasks=[task])

    # Mock only the _save_file method to avoid actual file I/O
    with patch.object(Task, "_save_file") as mock_save:
        result = crew.kickoff()
        assert result is not None
        mock_save.assert_called_once()

        call_args = mock_save.call_args
        if call_args:
            saved_content = call_args[0][0]
            if isinstance(saved_content, str):
                data = json.loads(saved_content)
                assert "score" in data


@pytest.mark.vcr(filter_headers=["authorization"])
def test_save_task_pydantic_output():
    import uuid

    class ScoreOutput(BaseModel):
        score: int

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    output_file = f"score_{uuid.uuid4()}.json"
    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_file=output_file,
        output_pydantic=ScoreOutput,
        agent=scorer,
    )

    crew = Crew(agents=[scorer], tasks=[task])
    crew.kickoff()

    output_file_exists = os.path.exists(output_file)
    assert output_file_exists
    assert {"score": 4} == json.loads(open(output_file).read())
    if output_file_exists:
        os.remove(output_file)


@pytest.mark.vcr(filter_headers=["authorization"])
def test_custom_converter_cls():
    class ScoreOutput(BaseModel):
        score: int

    class ScoreConverter(Converter):
        pass

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        output_pydantic=ScoreOutput,
        converter_cls=ScoreConverter,
        agent=scorer,
    )

    crew = Crew(agents=[scorer], tasks=[task])

    with patch.object(
        ScoreConverter, "to_pydantic", return_value=ScoreOutput(score=5)
    ) as mock_to_pydantic:
        crew.kickoff()
        mock_to_pydantic.assert_called_once()


@pytest.mark.vcr(filter_headers=["authorization"])
def test_increment_delegations_for_hierarchical_process():
    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
    )

    crew = Crew(
        agents=[scorer],
        tasks=[task],
        process=Process.hierarchical,
        manager_llm="gpt-4o",
    )

    with patch.object(Task, "increment_delegations") as increment_delegations:
        increment_delegations.return_value = None
        crew.kickoff()
        increment_delegations.assert_called_once()


@pytest.mark.vcr(filter_headers=["authorization"])
def test_increment_delegations_for_sequential_process():
    manager = Agent(
        role="Manager",
        goal="Coordinate scoring processes",
        backstory="You're great at delegating work about scoring.",
        allow_delegation=True,
    )

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        allow_delegation=True,
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'",
        expected_output="The score of the title.",
        agent=manager,
    )

    crew = Crew(
        agents=[manager, scorer],
        tasks=[task],
        process=Process.sequential,
    )

    with patch.object(Task, "increment_delegations") as increment_delegations:
        increment_delegations.return_value = None
        crew.kickoff()
        increment_delegations.assert_called_once()


@pytest.mark.vcr(filter_headers=["authorization"])
def test_increment_tool_errors():
    from crewai.tools import tool

    @tool
    def scoring_examples() -> None:
        "Useful examples for scoring titles."
        raise Exception("Error")

    scorer = Agent(
        role="Scorer",
        goal="Score the title",
        backstory="You're an expert scorer, specialized in scoring titles.",
        tools=[scoring_examples],
    )

    task = Task(
        description="Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work', check examples to based your evaluation.",
        expected_output="The score of the title.",
    )

    crew = Crew(
        agents=[scorer],
        tasks=[task],
        process=Process.hierarchical,
        manager_llm="gpt-4-0125-preview",
    )

    with patch.object(Task, "increment_tools_errors") as increment_tools_errors:
        increment_tools_errors.return_value = None
        crew.kickoff()
        assert len(increment_tools_errors.mock_calls) > 0


def test_task_definition_based_on_dict():
    config = {
        "description": "Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work', check examples to based your evaluation.",
        "expected_output": "The score of the title.",
    }

    task = Task(**config)

    assert task.description == config["description"]
    assert task.expected_output == config["expected_output"]
    assert task.agent is None


def test_conditional_task_definition_based_on_dict():
    config = {
        "description": "Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work', check examples to based your evaluation.",
        "expected_output": "The score of the title.",
    }

    task = ConditionalTask(**config, condition=lambda x: True)

    assert task.description == config["description"]
    assert task.expected_output == config["expected_output"]
    assert task.agent is None


def test_conditional_task_copy_preserves_type():
    task_config = {
        "description": "Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work', check examples to based your evaluation.",
        "expected_output": "The score of the title.",
    }
    original_task = Task(**task_config)
    copied_task = original_task.copy(agents=[], task_mapping={})
    assert isinstance(copied_task, Task)

    original_conditional_config = {
        "description": "Give me an integer score between 1-5 for the following title: 'The impact of AI in the future of work'. Check examples to base your evaluation on.",
        "expected_output": "The score of the title.",
        "condition": lambda x: True,
    }
    original_conditional_task = ConditionalTask(**original_conditional_config)
    copied_conditional_task = original_conditional_task.copy(agents=[], task_mapping={})
    assert isinstance(copied_conditional_task, ConditionalTask)


def test_interpolate_inputs(tmp_path):
|
|
task = Task(
|
|
description="Give me a list of 5 interesting ideas about {topic} to explore for an article, what makes them unique and interesting.",
|
|
expected_output="Bullet point list of 5 interesting ideas about {topic}.",
|
|
output_file=str(tmp_path / "{topic}" / "output_{date}.txt"),
|
|
)
|
|
|
|
task.interpolate_inputs_and_add_conversation_history(
|
|
inputs={"topic": "AI", "date": "2025"}
|
|
)
|
|
assert (
|
|
task.description
|
|
== "Give me a list of 5 interesting ideas about AI to explore for an article, what makes them unique and interesting."
|
|
)
|
|
assert task.expected_output == "Bullet point list of 5 interesting ideas about AI."
|
|
assert task.output_file == str(tmp_path / "AI" / "output_2025.txt")
|
|
|
|
task.interpolate_inputs_and_add_conversation_history(
|
|
inputs={"topic": "ML", "date": "2025"}
|
|
)
|
|
assert (
|
|
task.description
|
|
== "Give me a list of 5 interesting ideas about ML to explore for an article, what makes them unique and interesting."
|
|
)
|
|
assert task.expected_output == "Bullet point list of 5 interesting ideas about ML."
|
|
assert task.output_file == str(tmp_path / "ML" / "output_2025.txt")
|
|
|
|
|
|
def test_interpolate_only():
|
|
"""Test the interpolate_only method for various scenarios including JSON structure preservation."""
|
|
|
|
# Test JSON structure preservation
|
|
json_string = '{"info": "Look at {placeholder}", "nested": {"val": "{nestedVal}"}}'
|
|
result = interpolate_only(
|
|
input_string=json_string,
|
|
inputs={"placeholder": "the data", "nestedVal": "something else"},
|
|
)
|
|
assert '"info": "Look at the data"' in result
|
|
assert '"val": "something else"' in result
|
|
assert "{placeholder}" not in result
|
|
assert "{nestedVal}" not in result
|
|
|
|
# Test normal string interpolation
|
|
normal_string = "Hello {name}, welcome to {place}!"
|
|
result = interpolate_only(
|
|
input_string=normal_string, inputs={"name": "John", "place": "CrewAI"}
|
|
)
|
|
assert result == "Hello John, welcome to CrewAI!"
|
|
|
|
# Test empty string
|
|
result = interpolate_only(input_string="", inputs={"unused": "value"})
|
|
assert result == ""
|
|
|
|
# Test string with no placeholders
|
|
no_placeholders = "Hello, this is a test"
|
|
result = interpolate_only(input_string=no_placeholders, inputs={"unused": "value"})
|
|
assert result == no_placeholders
|
|
|
|
|
|
def test_interpolate_only_with_dict_inside_expected_output():
    """Test that interpolate_only leaves a JSON string unchanged when the inputs contain nested dictionary values."""

    json_string = '{"questions": {"main_question": "What is the user\'s name?", "secondary_question": "What is the user\'s age?"}}'
    result = interpolate_only(
        input_string=json_string,
        inputs={
            "questions": {
                "main_question": "What is the user's name?",
                "secondary_question": "What is the user's age?",
            }
        },
    )
    assert '"main_question": "What is the user\'s name?"' in result
    assert '"secondary_question": "What is the user\'s age?"' in result
    assert result == json_string

    normal_string = "Hello {name}, welcome to {place}!"
    result = interpolate_only(
        input_string=normal_string, inputs={"name": "John", "place": "CrewAI"}
    )
    assert result == "Hello John, welcome to CrewAI!"

    result = interpolate_only(input_string="", inputs={"unused": "value"})
    assert result == ""

    no_placeholders = "Hello, this is a test"
    result = interpolate_only(input_string=no_placeholders, inputs={"unused": "value"})
    assert result == no_placeholders


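# The tests below exercise TaskOutput.__str__ across output formats. Judging by the
# assertions, str() appears to prefer the pydantic model, then json_dict, then raw,
# and to fall back to an empty string when none of them are set.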
def test_task_output_str_with_pydantic():
    from crewai.tasks.output_format import OutputFormat

    class ScoreOutput(BaseModel):
        score: int

    score_output = ScoreOutput(score=4)
    task_output = TaskOutput(
        description="Test task",
        agent="Test Agent",
        pydantic=score_output,
        output_format=OutputFormat.PYDANTIC,
    )

    assert str(task_output) == str(score_output)


def test_task_output_str_with_json_dict():
    from crewai.tasks.output_format import OutputFormat

    json_dict = {"score": 4}
    task_output = TaskOutput(
        description="Test task",
        agent="Test Agent",
        json_dict=json_dict,
        output_format=OutputFormat.JSON,
    )

    assert str(task_output) == str(json_dict)


def test_task_output_str_with_raw():
    from crewai.tasks.output_format import OutputFormat

    raw_output = "Raw task output"
    task_output = TaskOutput(
        description="Test task",
        agent="Test Agent",
        raw=raw_output,
        output_format=OutputFormat.RAW,
    )

    assert str(task_output) == raw_output


def test_task_output_str_with_pydantic_and_json_dict():
    from crewai.tasks.output_format import OutputFormat

    class ScoreOutput(BaseModel):
        score: int

    score_output = ScoreOutput(score=4)
    json_dict = {"score": 4}
    task_output = TaskOutput(
        description="Test task",
        agent="Test Agent",
        pydantic=score_output,
        json_dict=json_dict,
        output_format=OutputFormat.PYDANTIC,
    )

    # When both pydantic and json_dict are present, pydantic should take precedence
    assert str(task_output) == str(score_output)


def test_task_output_str_with_none():
    from crewai.tasks.output_format import OutputFormat

    task_output = TaskOutput(
        description="Test task",
        agent="Test Agent",
        output_format=OutputFormat.RAW,
    )

    assert str(task_output) == ""


def test_key():
    original_description = "Give me a list of 5 interesting ideas about {topic} to explore for an article, what makes them unique and interesting."
    original_expected_output = "Bullet point list of 5 interesting ideas about {topic}."
    task = Task(
        description=original_description,
        expected_output=original_expected_output,
    )
    hash = md5(
        f"{original_description}|{original_expected_output}".encode(),
        usedforsecurity=False,
    ).hexdigest()

    assert task.key == hash, "The key should be the hash of the description."

    task.interpolate_inputs_and_add_conversation_history(inputs={"topic": "AI"})
    assert task.key == hash, (
        "The key should be the hash of the non-interpolated description."
    )


def test_output_file_validation(tmp_path):
    """Test output file path validation."""
    # Valid paths
    assert (
        Task(
            description="Test task",
            expected_output="Test output",
            output_file="output.txt",
        ).output_file
        == "output.txt"
    )
    # Use secure temporary path instead of /tmp
    temp_file = tmp_path / "output.txt"
    assert (
        Task(
            description="Test task",
            expected_output="Test output",
            output_file=str(temp_file),
        ).output_file
        == str(temp_file).lstrip("/")  # Remove leading slash to match expected behavior
    )
    assert (
        Task(
            description="Test task",
            expected_output="Test output",
            output_file="{dir}/output_{date}.txt",
        ).output_file
        == "{dir}/output_{date}.txt"
    )

    # Invalid paths
    with pytest.raises(ValueError, match="Path traversal"):
        Task(
            description="Test task",
            expected_output="Test output",
            output_file="../output.txt",
        )
    with pytest.raises(ValueError, match="Path traversal"):
        Task(
            description="Test task",
            expected_output="Test output",
            output_file="folder/../output.txt",
        )
    with pytest.raises(ValueError, match="Shell special characters"):
        Task(
            description="Test task",
            expected_output="Test output",
            output_file="output.txt | rm -rf /",
        )
    with pytest.raises(ValueError, match="Shell expansion"):
        Task(
            description="Test task",
            expected_output="Test output",
            output_file="~/output.txt",
        )
    with pytest.raises(ValueError, match="Shell expansion"):
        Task(
            description="Test task",
            expected_output="Test output",
            output_file="$HOME/output.txt",
        )
    with pytest.raises(ValueError, match="Invalid template variable"):
        Task(
            description="Test task",
            expected_output="Test output",
            output_file="{invalid-name}/output.txt",
        )


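# The create_directory tests below cover Task._save_file behaviour: with
# create_directory=True (the default) missing parent directories are created on
# demand, while create_directory=False raises RuntimeError when the target
# directory does not exist.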
def test_create_directory_true():
    """Test that directories are created when create_directory=True."""
    from pathlib import Path

    output_path = "test_create_dir/output.txt"

    task = Task(
        description="Test task",
        expected_output="Test output",
        output_file=output_path,
        create_directory=True,
    )

    resolved_path = Path(output_path).expanduser().resolve()
    resolved_dir = resolved_path.parent

    if resolved_path.exists():
        resolved_path.unlink()
    if resolved_dir.exists():
        import shutil

        shutil.rmtree(resolved_dir)

    assert not resolved_dir.exists()

    task._save_file("test content")

    assert resolved_dir.exists()
    assert resolved_path.exists()

    if resolved_path.exists():
        resolved_path.unlink()
    if resolved_dir.exists():
        import shutil

        shutil.rmtree(resolved_dir)


def test_create_directory_false():
    """Test that directories are not created when create_directory=False."""
    from pathlib import Path

    output_path = "nonexistent_test_dir/output.txt"

    task = Task(
        description="Test task",
        expected_output="Test output",
        output_file=output_path,
        create_directory=False,
    )

    resolved_path = Path(output_path).expanduser().resolve()
    resolved_dir = resolved_path.parent

    if resolved_dir.exists():
        import shutil

        shutil.rmtree(resolved_dir)

    assert not resolved_dir.exists()

    with pytest.raises(
        RuntimeError, match=r"Directory .* does not exist and create_directory is False"
    ):
        task._save_file("test content")


def test_create_directory_default():
    """Test that create_directory defaults to True for backward compatibility."""
    task = Task(
        description="Test task",
        expected_output="Test output",
        output_file="output.txt",
    )

    assert task.create_directory is True


def test_create_directory_with_existing_directory():
    """Test that create_directory=False works when directory already exists."""
    from pathlib import Path

    output_path = "existing_test_dir/output.txt"

    resolved_path = Path(output_path).expanduser().resolve()
    resolved_dir = resolved_path.parent
    resolved_dir.mkdir(parents=True, exist_ok=True)

    task = Task(
        description="Test task",
        expected_output="Test output",
        output_file=output_path,
        create_directory=False,
    )

    task._save_file("test content")
    assert resolved_path.exists()

    if resolved_path.exists():
        resolved_path.unlink()
    if resolved_dir.exists():
        import shutil

        shutil.rmtree(resolved_dir)


def test_github_issue_3149_reproduction():
    """Test that reproduces the exact issue from GitHub issue #3149."""
    task = Task(
        description="Test task for issue reproduction",
        expected_output="Test output",
        output_file="test_output.txt",
        create_directory=True,
    )

    assert task.create_directory is True
    assert task.output_file == "test_output.txt"


@pytest.mark.vcr(filter_headers=["authorization"])
def test_task_execution_times():
    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )

    task = Task(
        description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 interesting ideas.",
        agent=researcher,
    )

    assert task.start_time is None
    assert task.end_time is None
    assert task.execution_duration is None

    task.execute_sync(agent=researcher)

    assert task.start_time is not None
    assert task.end_time is not None
    assert task.execution_duration == (task.end_time - task.start_time).total_seconds()


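# The interpolation tests below feed non-string values (lists, dicts, nested
# structures) through interpolate_only. Judging by the assertions, such values are
# rendered with Python's repr, so results can be round-tripped with
# ast.literal_eval, e.g. (a minimal illustration, not part of the recorded suite):
#     interpolate_only("{items}", {"items": ["a", "b"]})  # -> "['a', 'b']"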
def test_interpolate_with_list_of_strings():
    # Test simple list of strings
    input_str = "Available items: {items}"
    inputs = {"items": ["apple", "banana", "cherry"]}
    result = interpolate_only(input_str, inputs)
    assert result == f"Available items: {inputs['items']}"

    # Test empty list
    empty_list_input = {"items": []}
    result = interpolate_only(input_str, empty_list_input)
    assert result == "Available items: []"


def test_interpolate_with_list_of_dicts():
    input_data = {
        "people": [
            {"name": "Alice", "age": 30, "skills": ["Python", "AI"]},
            {"name": "Bob", "age": 25, "skills": ["Java", "Cloud"]},
        ]
    }
    result = interpolate_only("{people}", input_data)

    parsed_result = ast.literal_eval(result)
    assert isinstance(parsed_result, list)
    assert len(parsed_result) == 2
    assert parsed_result[0]["name"] == "Alice"
    assert parsed_result[0]["age"] == 30
    assert parsed_result[0]["skills"] == ["Python", "AI"]
    assert parsed_result[1]["name"] == "Bob"
    assert parsed_result[1]["age"] == 25
    assert parsed_result[1]["skills"] == ["Java", "Cloud"]


def test_interpolate_with_nested_structures():
    input_data = {
        "company": {
            "name": "TechCorp",
            "departments": [
                {
                    "name": "Engineering",
                    "employees": 50,
                    "tools": ["Git", "Docker", "Kubernetes"],
                },
                {"name": "Sales", "employees": 20, "regions": {"north": 5, "south": 3}},
            ],
        }
    }
    result = interpolate_only("{company}", input_data)
    parsed = ast.literal_eval(result)

    assert parsed["name"] == "TechCorp"
    assert len(parsed["departments"]) == 2
    assert parsed["departments"][0]["tools"] == ["Git", "Docker", "Kubernetes"]
    assert parsed["departments"][1]["regions"]["north"] == 5


def test_interpolate_with_special_characters():
    input_data = {
        "special_data": {
            "quotes": """This has "double" and 'single' quotes""",
            "unicode": "文字化けテスト",
            "symbols": "!@#$%^&*()",
            "empty": "",
        }
    }
    result = interpolate_only("{special_data}", input_data)
    parsed = ast.literal_eval(result)

    assert parsed["quotes"] == """This has "double" and 'single' quotes"""
    assert parsed["unicode"] == "文字化けテスト"
    assert parsed["symbols"] == "!@#$%^&*()"
    assert parsed["empty"] == ""


def test_interpolate_mixed_types():
    input_data = {
        "data": {
            "name": "Test Dataset",
            "samples": 1000,
            "features": ["age", "income", "location"],
            "metadata": {
                "source": "public",
                "validated": True,
                "tags": ["demo", "test", "temp"],
            },
        }
    }
    result = interpolate_only("{data}", input_data)
    parsed = ast.literal_eval(result)

    assert parsed["name"] == "Test Dataset"
    assert parsed["samples"] == 1000
    assert parsed["metadata"]["tags"] == ["demo", "test", "temp"]


def test_interpolate_complex_combination():
    input_data = {
        "report": [
            {
                "month": "January",
                "metrics": {"sales": 15000, "expenses": 8000, "profit": 7000},
                "top_products": ["Product A", "Product B"],
            },
            {
                "month": "February",
                "metrics": {"sales": 18000, "expenses": 8500, "profit": 9500},
                "top_products": ["Product C", "Product D"],
            },
        ]
    }
    result = interpolate_only("{report}", input_data)
    parsed = ast.literal_eval(result)

    assert len(parsed) == 2
    assert parsed[0]["month"] == "January"
    assert parsed[1]["metrics"]["profit"] == 9500
    assert "Product D" in parsed[1]["top_products"]


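# The two validation tests below cover the failure path: values that are not
# JSON-compatible (sets, arbitrary custom objects) are expected to raise ValueError
# with an "Unsupported type ..." message, no matter how deeply they are nested.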
def test_interpolate_invalid_type_validation():
    # Test with invalid top-level type
    with pytest.raises(ValueError) as excinfo:
        interpolate_only("{data}", {"data": set()})  # type: ignore we are purposely testing this failure

    assert "Unsupported type set" in str(excinfo.value)

    # Test with invalid nested type
    invalid_nested = {
        "profile": {
            "name": "John",
            "age": 30,
            "tags": {"a", "b", "c"},  # Set is invalid
        }
    }
    with pytest.raises(ValueError) as excinfo:
        interpolate_only("{data}", {"data": invalid_nested})
    assert "Unsupported type set" in str(excinfo.value)


def test_interpolate_custom_object_validation():
    class CustomObject:
        def __init__(self, value):
            self.value = value

        def __str__(self):
            return str(self.value)

    # Test with custom object at top level
    with pytest.raises(ValueError) as excinfo:
        interpolate_only("{obj}", {"obj": CustomObject(5)})  # type: ignore we are purposely testing this failure
    assert "Unsupported type CustomObject" in str(excinfo.value)

    # Test with nested custom object in dictionary
    with pytest.raises(ValueError) as excinfo:
        interpolate_only("{data}", {"data": {"valid": 1, "invalid": CustomObject(5)}})
    assert "Unsupported type CustomObject" in str(excinfo.value)

    # Test with nested custom object in list
    with pytest.raises(ValueError) as excinfo:
        interpolate_only("{data}", {"data": [1, "valid", CustomObject(5)]})
    assert "Unsupported type CustomObject" in str(excinfo.value)

    # Test with deeply nested custom object
    with pytest.raises(ValueError) as excinfo:
        interpolate_only(
            "{data}", {"data": {"level1": {"level2": [{"level3": CustomObject(5)}]}}}
        )
    assert "Unsupported type CustomObject" in str(excinfo.value)


def test_interpolate_valid_complex_types():
    # Valid complex structure
    valid_data = {
        "name": "Valid Dataset",
        "stats": {
            "count": 1000,
            "distribution": [0.2, 0.3, 0.5],
            "features": ["age", "income"],
            "nested": {"deep": [1, 2, 3], "deeper": {"a": 1, "b": 2.5}},
        },
    }

    # Should not raise any errors
    result = interpolate_only("{data}", {"data": valid_data})
    parsed = ast.literal_eval(result)
    assert parsed["name"] == "Valid Dataset"
    assert parsed["stats"]["nested"]["deeper"]["b"] == 2.5


def test_interpolate_edge_cases():
    # Test empty dict and list
    assert interpolate_only("{}", {"data": {}}) == "{}"
    assert interpolate_only("[]", {"data": []}) == "[]"

    # Test numeric types
    assert interpolate_only("{num}", {"num": 42}) == "42"
    assert interpolate_only("{num}", {"num": 3.14}) == "3.14"

    # Test boolean values (valid JSON types)
    assert interpolate_only("{flag}", {"flag": True}) == "True"
    assert interpolate_only("{flag}", {"flag": False}) == "False"


def test_interpolate_valid_types():
    # Test with boolean and null values (valid JSON types)
    valid_data = {
        "name": "Test",
        "active": True,
        "deleted": False,
        "optional": None,
        "nested": {"flag": True, "empty": None},
    }

    result = interpolate_only("{data}", {"data": valid_data})
    parsed = ast.literal_eval(result)

    assert parsed["active"] is True
    assert parsed["deleted"] is False
    assert parsed["optional"] is None
    assert parsed["nested"]["flag"] is True
    assert parsed["nested"]["empty"] is None


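# The remaining execution tests focus on max_execution_time handling: no limit set,
# a limit that is not hit, and a limit that is exceeded (expected to raise
# TimeoutError).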
def test_task_with_no_max_execution_time():
    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and are now working on doing research and analysis for a new customer.",
        allow_delegation=False,
        max_execution_time=None,
    )

    task = Task(
        description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 interesting ideas.",
        agent=researcher,
    )

    with patch.object(Agent, "_execute_without_timeout", return_value="ok") as execute:
        result = task.execute_sync(agent=researcher)
        assert result.raw == "ok"
        execute.assert_called_once()


@pytest.mark.vcr(filter_headers=["authorization"])
def test_task_with_max_execution_time():
    """Test that execution completes normally when it finishes within max_execution_time."""
    from crewai.tools import tool

    @tool("what amazing tool", result_as_answer=True)
    def my_tool() -> str:
        "My tool"
        time.sleep(1)
        return "okay"

    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents. Use the tool provided to you.",
        backstory=(
            "You're an expert researcher, specialized in technology, software engineering, AI and startups. "
            "You work as a freelancer and are now working on doing research and analysis for a new customer."
        ),
        allow_delegation=False,
        tools=[my_tool],
        max_execution_time=4,
    )

    task = Task(
        description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 interesting ideas.",
        agent=researcher,
    )

    result = task.execute_sync(agent=researcher)
    assert result.raw == "okay"


@pytest.mark.vcr(filter_headers=["authorization"])
def test_task_with_max_execution_time_exceeded():
    """Test that execution raises TimeoutError when max_execution_time is exceeded."""
    from crewai.tools import tool

    @tool("what amazing tool", result_as_answer=True)
    def my_tool() -> str:
        "My tool"
        time.sleep(10)
        return "okay"

    researcher = Agent(
        role="Researcher",
        goal="Make the best research and analysis on content about AI and AI agents. Use the tool provided to you.",
        backstory=(
            "You're an expert researcher, specialized in technology, software engineering, AI and startups. "
            "You work as a freelancer and are now working on doing research and analysis for a new customer."
        ),
        allow_delegation=False,
        tools=[my_tool],
        max_execution_time=1,
    )

    task = Task(
        description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 interesting ideas.",
        agent=researcher,
    )

    with pytest.raises(TimeoutError):
        task.execute_sync(agent=researcher)


@pytest.mark.vcr(filter_headers=["authorization"])
def test_task_interpolation_with_hyphens():
    agent = Agent(
        role="Researcher",
        goal="be an assistant that responds with {interpolation-with-hyphens}",
        backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
        allow_delegation=False,
    )
    task = Task(
        description="be an assistant that responds with {interpolation-with-hyphens}",
        expected_output="The response should be addressing: {interpolation-with-hyphens}",
        agent=agent,
    )
    crew = Crew(
        agents=[agent],
        tasks=[task],
        verbose=True,
    )
    result = crew.kickoff(inputs={"interpolation-with-hyphens": "say hello world"})
    assert "say hello world" in task.prompt()

    assert result.raw == "Hello, World!"


def test_task_copy_with_none_context():
    original_task = Task(
        description="Test task",
        expected_output="Test output",
        context=None
    )

    new_task = original_task.copy(agents=[], task_mapping={})
    assert original_task.context is None
    assert new_task.context is None


def test_task_copy_with_not_specified_context():
    from crewai.utilities.constants import NOT_SPECIFIED
    original_task = Task(
        description="Test task",
        expected_output="Test output",
    )

    new_task = original_task.copy(agents=[], task_mapping={})
    assert original_task.context is NOT_SPECIFIED
    assert new_task.context is NOT_SPECIFIED


def test_task_copy_with_list_context():
    """Test that copying a task with list context works correctly."""
    task1 = Task(
        description="Task 1",
        expected_output="Output 1"
    )
    task2 = Task(
        description="Task 2",
        expected_output="Output 2",
        context=[task1]
    )

    task_mapping = {task1.key: task1}

    copied_task2 = task2.copy(agents=[], task_mapping=task_mapping)

    assert isinstance(copied_task2.context, list)
    assert len(copied_task2.context) == 1
    assert copied_task2.context[0] is task1