Compare commits


4 Commits

SHA1        Author                   Message                                                               Date
2fd5c46b42  Heitor Sammuel Carvalho  Emit FlowFailedEvent and finalise batch via added event listener     2025-12-18 11:04:46 -03:00
97fed5229f  Heitor Sammuel Carvalho  Remove redundant parameter from emit call                             2025-12-18 11:03:53 -03:00
0f28d14e61  Heitor Sammuel Carvalho  Move flow trace collection and batch finalisation to event listener  2025-12-18 11:03:10 -03:00
b4b6434480  Heitor Sammuel Carvalho  Add flow failed event                                                 2025-12-18 11:00:16 -03:00
23 changed files with 73 additions and 632 deletions

View File

@@ -12,7 +12,7 @@ dependencies = [
"pytube~=15.0.0",
"requests~=2.32.5",
"docker~=7.1.0",
"crewai==1.7.2",
"crewai==1.7.1",
"lancedb~=0.5.4",
"tiktoken~=0.8.0",
"beautifulsoup4~=4.13.4",

View File

@@ -291,4 +291,4 @@ __all__ = [
"ZapierActionTools",
]
__version__ = "1.7.2"
__version__ = "1.7.1"

View File

@@ -1,5 +1,5 @@
"""Crewai Enterprise Tools."""
import os
import json
import re
from typing import Any, Optional, Union, cast, get_origin
@@ -432,11 +432,7 @@ class CrewAIPlatformActionTool(BaseTool):
payload = cleaned_kwargs
response = requests.post(
url=api_url,
headers=headers,
json=payload,
timeout=60,
verify=os.environ.get("CREWAI_FACTORY", "false").lower() != "true",
url=api_url, headers=headers, json=payload, timeout=60
)
data = response.json()
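
For reference, the verify argument removed in this hunk is driven by a single environment check. A minimal standalone sketch of that expression (the function name is illustrative, not from the codebase):

import os

def tls_verify() -> bool:
    # Verification stays on unless CREWAI_FACTORY is exactly "true",
    # compared case-insensitively; unset therefore means verify=True.
    return os.environ.get("CREWAI_FACTORY", "false").lower() != "true"

assert tls_verify() is True           # variable unset: safe default
os.environ["CREWAI_FACTORY"] = "TRUE"
assert tls_verify() is False          # opt-out is case-insensitive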

View File

@@ -1,5 +1,5 @@
from typing import Any
import os
from crewai.tools import BaseTool
import requests
@@ -37,7 +37,6 @@ class CrewaiPlatformToolBuilder:
headers=headers,
timeout=30,
params={"apps": ",".join(self._apps)},
verify=os.environ.get("CREWAI_FACTORY", "false").lower() != "true",
)
response.raise_for_status()
except Exception:

View File

@@ -1,6 +1,4 @@
from typing import Union, get_args, get_origin
from unittest.mock import patch, Mock
import os
from crewai_tools.tools.crewai_platform_tools.crewai_platform_action_tool import (
CrewAIPlatformActionTool,
@@ -251,109 +249,3 @@ class TestSchemaProcessing:
result_type = tool._process_schema_type(test_schema, "TestFieldAllOfMixed")
assert result_type is str
class TestCrewAIPlatformActionToolVerify:
"""Test suite for SSL verification behavior based on CREWAI_FACTORY environment variable"""
def setup_method(self):
self.action_schema = {
"function": {
"name": "test_action",
"parameters": {
"properties": {
"test_param": {
"type": "string",
"description": "Test parameter"
}
},
"required": []
}
}
}
def create_test_tool(self):
return CrewAIPlatformActionTool(
description="Test action tool",
action_name="test_action",
action_schema=self.action_schema
)
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token"}, clear=True)
@patch("crewai_tools.tools.crewai_platform_tools.crewai_platform_action_tool.requests.post")
def test_run_with_ssl_verification_default(self, mock_post):
"""Test that _run uses SSL verification by default when CREWAI_FACTORY is not set"""
mock_response = Mock()
mock_response.ok = True
mock_response.json.return_value = {"result": "success"}
mock_post.return_value = mock_response
tool = self.create_test_tool()
tool._run(test_param="test_value")
mock_post.assert_called_once()
call_args = mock_post.call_args
assert call_args.kwargs["verify"] is True
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token", "CREWAI_FACTORY": "false"}, clear=True)
@patch("crewai_tools.tools.crewai_platform_tools.crewai_platform_action_tool.requests.post")
def test_run_with_ssl_verification_factory_false(self, mock_post):
"""Test that _run uses SSL verification when CREWAI_FACTORY is 'false'"""
mock_response = Mock()
mock_response.ok = True
mock_response.json.return_value = {"result": "success"}
mock_post.return_value = mock_response
tool = self.create_test_tool()
tool._run(test_param="test_value")
mock_post.assert_called_once()
call_args = mock_post.call_args
assert call_args.kwargs["verify"] is True
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token", "CREWAI_FACTORY": "FALSE"}, clear=True)
@patch("crewai_tools.tools.crewai_platform_tools.crewai_platform_action_tool.requests.post")
def test_run_with_ssl_verification_factory_false_uppercase(self, mock_post):
"""Test that _run uses SSL verification when CREWAI_FACTORY is 'FALSE' (case-insensitive)"""
mock_response = Mock()
mock_response.ok = True
mock_response.json.return_value = {"result": "success"}
mock_post.return_value = mock_response
tool = self.create_test_tool()
tool._run(test_param="test_value")
mock_post.assert_called_once()
call_args = mock_post.call_args
assert call_args.kwargs["verify"] is True
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token", "CREWAI_FACTORY": "true"}, clear=True)
@patch("crewai_tools.tools.crewai_platform_tools.crewai_platform_action_tool.requests.post")
def test_run_without_ssl_verification_factory_true(self, mock_post):
"""Test that _run disables SSL verification when CREWAI_FACTORY is 'true'"""
mock_response = Mock()
mock_response.ok = True
mock_response.json.return_value = {"result": "success"}
mock_post.return_value = mock_response
tool = self.create_test_tool()
tool._run(test_param="test_value")
mock_post.assert_called_once()
call_args = mock_post.call_args
assert call_args.kwargs["verify"] is False
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token", "CREWAI_FACTORY": "TRUE"}, clear=True)
@patch("crewai_tools.tools.crewai_platform_tools.crewai_platform_action_tool.requests.post")
def test_run_without_ssl_verification_factory_true_uppercase(self, mock_post):
"""Test that _run disables SSL verification when CREWAI_FACTORY is 'TRUE' (case-insensitive)"""
mock_response = Mock()
mock_response.ok = True
mock_response.json.return_value = {"result": "success"}
mock_post.return_value = mock_response
tool = self.create_test_tool()
tool._run(test_param="test_value")
mock_post.assert_called_once()
call_args = mock_post.call_args
assert call_args.kwargs["verify"] is False
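
These tests pass clear=True to patch.dict so that "CREWAI_FACTORY is not set" is guaranteed rather than inherited from the developer's shell. A short demonstration of that behaviour, standard library only (TOKEN is a placeholder key):

import os
from unittest.mock import patch

os.environ["CREWAI_FACTORY"] = "true"   # pretend the shell exported it
with patch.dict("os.environ", {"TOKEN": "t"}, clear=True):
    # Inside the context the environment holds only TOKEN.
    print("CREWAI_FACTORY" in os.environ)   # -> False
print(os.environ["CREWAI_FACTORY"])         # -> true (restored on exit)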

View File

@@ -258,98 +258,3 @@ class TestCrewaiPlatformToolBuilder(unittest.TestCase):
assert "simple_string" in description_text
assert "nested_object" in description_text
assert "array_prop" in description_text
class TestCrewaiPlatformToolBuilderVerify(unittest.TestCase):
"""Test suite for SSL verification behavior in CrewaiPlatformToolBuilder"""
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token"}, clear=True)
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder.requests.get"
)
def test_fetch_actions_with_ssl_verification_default(self, mock_get):
"""Test that _fetch_actions uses SSL verification by default when CREWAI_FACTORY is not set"""
mock_response = Mock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"actions": {}}
mock_get.return_value = mock_response
builder = CrewaiPlatformToolBuilder(apps=["github"])
builder._fetch_actions()
mock_get.assert_called_once()
call_args = mock_get.call_args
assert call_args.kwargs["verify"] is True
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token", "CREWAI_FACTORY": "false"}, clear=True)
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder.requests.get"
)
def test_fetch_actions_with_ssl_verification_factory_false(self, mock_get):
"""Test that _fetch_actions uses SSL verification when CREWAI_FACTORY is 'false'"""
mock_response = Mock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"actions": {}}
mock_get.return_value = mock_response
builder = CrewaiPlatformToolBuilder(apps=["github"])
builder._fetch_actions()
mock_get.assert_called_once()
call_args = mock_get.call_args
assert call_args.kwargs["verify"] is True
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token", "CREWAI_FACTORY": "FALSE"}, clear=True)
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder.requests.get"
)
def test_fetch_actions_with_ssl_verification_factory_false_uppercase(self, mock_get):
"""Test that _fetch_actions uses SSL verification when CREWAI_FACTORY is 'FALSE' (case-insensitive)"""
mock_response = Mock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"actions": {}}
mock_get.return_value = mock_response
builder = CrewaiPlatformToolBuilder(apps=["github"])
builder._fetch_actions()
mock_get.assert_called_once()
call_args = mock_get.call_args
assert call_args.kwargs["verify"] is True
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token", "CREWAI_FACTORY": "true"}, clear=True)
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder.requests.get"
)
def test_fetch_actions_without_ssl_verification_factory_true(self, mock_get):
"""Test that _fetch_actions disables SSL verification when CREWAI_FACTORY is 'true'"""
mock_response = Mock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"actions": {}}
mock_get.return_value = mock_response
builder = CrewaiPlatformToolBuilder(apps=["github"])
builder._fetch_actions()
mock_get.assert_called_once()
call_args = mock_get.call_args
assert call_args.kwargs["verify"] is False
@patch.dict("os.environ", {"CREWAI_PLATFORM_INTEGRATION_TOKEN": "test_token", "CREWAI_FACTORY": "TRUE"}, clear=True)
@patch(
"crewai_tools.tools.crewai_platform_tools.crewai_platform_tool_builder.requests.get"
)
def test_fetch_actions_without_ssl_verification_factory_true_uppercase(self, mock_get):
"""Test that _fetch_actions disables SSL verification when CREWAI_FACTORY is 'TRUE' (case-insensitive)"""
mock_response = Mock()
mock_response.raise_for_status.return_value = None
mock_response.json.return_value = {"actions": {}}
mock_get.return_value = mock_response
builder = CrewaiPlatformToolBuilder(apps=["github"])
builder._fetch_actions()
mock_get.assert_called_once()
call_args = mock_get.call_args
assert call_args.kwargs["verify"] is False

View File

@@ -49,7 +49,7 @@ Repository = "https://github.com/crewAIInc/crewAI"
[project.optional-dependencies]
tools = [
"crewai-tools==1.7.2",
"crewai-tools==1.7.1",
]
embeddings = [
"tiktoken~=0.8.0"

View File

@@ -40,7 +40,7 @@ def _suppress_pydantic_deprecation_warnings() -> None:
_suppress_pydantic_deprecation_warnings()
__version__ = "1.7.2"
__version__ = "1.7.1"
_telemetry_submitted = False

View File

@@ -149,9 +149,7 @@ class AuthenticationCommand:
return
if token_data["error"] not in ("authorization_pending", "slow_down"):
raise requests.HTTPError(
token_data.get("error_description") or token_data.get("error")
)
raise requests.HTTPError(token_data["error_description"])
time.sleep(device_code_data["interval"])
attempts += 1
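
The two raise calls above differ in how they handle an OAuth error payload that omits error_description: the .get(...) chain degrades to the bare error code, while direct indexing raises KeyError. Illustration with a hypothetical payload:

token_data = {"error": "access_denied"}   # no "error_description" key

message = token_data.get("error_description") or token_data.get("error")
print(message)   # -> access_denied

# token_data["error_description"] would raise KeyError on the same payload.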

View File

@@ -1,6 +1,6 @@
from typing import Any
from urllib.parse import urljoin
import os
import requests
from crewai.cli.config import Settings
@@ -33,7 +33,9 @@ class PlusAPI:
if settings.org_uuid:
self.headers["X-Crewai-Organization-Id"] = settings.org_uuid
self.base_url = os.getenv("CREWAI_PLUS_URL") or str(settings.enterprise_base_url) or DEFAULT_CREWAI_ENTERPRISE_URL
self.base_url = (
str(settings.enterprise_base_url) or DEFAULT_CREWAI_ENTERPRISE_URL
)
def _make_request(
self, method: str, endpoint: str, **kwargs: Any
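
The removed one-liner resolves the base URL by or-chaining, so each falsy candidate falls through to the next. A standalone sketch (the default value is assumed here, not taken from this diff):

import os

DEFAULT_CREWAI_ENTERPRISE_URL = "https://app.crewai.com"   # assumed value

def resolve_base_url(enterprise_base_url) -> str:
    # First truthy candidate wins. Caveat: str(None) == "None" is truthy,
    # so the default is only reached when the settings value is an empty
    # string, mirroring the behaviour of the removed line.
    return (
        os.getenv("CREWAI_PLUS_URL")
        or str(enterprise_base_url)
        or DEFAULT_CREWAI_ENTERPRISE_URL
    )

print(resolve_base_url(""))   # -> https://app.crewai.com (no env override)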

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.7.2"
"crewai[tools]==1.7.1"
]
[project.scripts]

View File

@@ -5,7 +5,7 @@ description = "{{name}} using crewAI"
authors = [{ name = "Your Name", email = "you@example.com" }]
requires-python = ">=3.10,<3.14"
dependencies = [
"crewai[tools]==1.7.2"
"crewai[tools]==1.7.1"
]
[project.scripts]

View File

@@ -12,7 +12,6 @@ from rich.console import Console
from crewai.cli import git
from crewai.cli.command import BaseCommand, PlusAPIMixin
from crewai.cli.config import Settings
from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
from crewai.cli.utils import (
build_env_with_tool_repository_credentials,
extract_available_exports,
@@ -132,13 +131,10 @@ class ToolCommand(BaseCommand, PlusAPIMixin):
self._validate_response(publish_response)
published_handle = publish_response.json()["handle"]
settings = Settings()
base_url = settings.enterprise_base_url or DEFAULT_CREWAI_ENTERPRISE_URL
console.print(
f"Successfully published `{published_handle}` ({project_version}).\n\n"
+ "⚠️ Security checks are running in the background. Your tool will be available once these are complete.\n"
+ f"You can monitor the status or access your tool here:\n{base_url}/crewai_plus/tools/{published_handle}",
+ f"You can monitor the status or access your tool here:\nhttps://app.crewai.com/crewai_plus/tools/{published_handle}",
style="bold green",
)

View File

@@ -957,8 +957,8 @@ class Crew(FlowTrackable, BaseModel):
pending_tasks.append((task, async_task, task_index))
else:
if pending_tasks:
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
task_outputs = await self._aprocess_async_tasks(
pending_tasks, was_replayed
)
pending_tasks.clear()
@@ -973,9 +973,7 @@ class Crew(FlowTrackable, BaseModel):
self._store_execution_log(task, task_output, task_index, was_replayed)
if pending_tasks:
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
return self._create_crew_output(task_outputs)
@@ -989,9 +987,7 @@ class Crew(FlowTrackable, BaseModel):
) -> TaskOutput | None:
"""Handle conditional task evaluation using native async."""
if pending_tasks:
task_outputs.extend(
await self._aprocess_async_tasks(pending_tasks, was_replayed)
)
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
pending_tasks.clear()
return check_conditional_skip(
@@ -1156,7 +1152,7 @@ class Crew(FlowTrackable, BaseModel):
futures.append((task, future, task_index))
else:
if futures:
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
context = self._get_context(task, task_outputs)
@@ -1170,7 +1166,7 @@ class Crew(FlowTrackable, BaseModel):
self._store_execution_log(task, task_output, task_index, was_replayed)
if futures:
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
task_outputs = self._process_async_tasks(futures, was_replayed)
return self._create_crew_output(task_outputs)
@@ -1183,7 +1179,7 @@ class Crew(FlowTrackable, BaseModel):
was_replayed: bool,
) -> TaskOutput | None:
if futures:
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
return check_conditional_skip(
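
The repeated change in this file swaps task_outputs.extend(...) for plain assignment, and the test suites for issue #4137 elsewhere in this compare exercise exactly that difference: extend appends the async batch to outputs already collected, while assignment replaces them. A minimal illustration, with list values standing in for TaskOutput objects:

sync_outputs = ["sync-1"]            # collected before the async batch
async_batch = ["async-2"]            # what _aprocess_async_tasks returns

task_outputs = list(sync_outputs)
task_outputs.extend(async_batch)
print(task_outputs)                  # -> ['sync-1', 'async-2']

task_outputs = list(async_batch)     # assignment form
print(task_outputs)                  # -> ['async-2']: earlier outputs lost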

View File

@@ -9,8 +9,6 @@ from rich.console import Console
from rich.panel import Panel
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.config import Settings
from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
from crewai.cli.plus_api import PlusAPI
from crewai.cli.version import get_crewai_version
from crewai.events.listeners.tracing.types import TraceEvent
@@ -18,6 +16,7 @@ from crewai.events.listeners.tracing.utils import (
is_tracing_enabled_in_context,
should_auto_collect_first_time_traces,
)
from crewai.utilities.constants import CREWAI_BASE_URL
logger = getLogger(__name__)
@@ -327,12 +326,10 @@ class TraceBatchManager:
if response.status_code == 200:
access_code = response.json().get("access_code", None)
console = Console()
settings = Settings()
base_url = settings.enterprise_base_url or DEFAULT_CREWAI_ENTERPRISE_URL
return_link = (
f"{base_url}/crewai_plus/trace_batches/{self.trace_batch_id}"
f"{CREWAI_BASE_URL}/crewai_plus/trace_batches/{self.trace_batch_id}"
if not self.is_current_batch_ephemeral and access_code is None
else f"{base_url}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}"
else f"{CREWAI_BASE_URL}/crewai_plus/ephemeral_trace_batches/{self.trace_batch_id}?access_code={access_code}"
)
if self.is_current_batch_ephemeral:

View File

@@ -33,6 +33,7 @@ from crewai.events.types.crew_events import (
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFailedEvent,
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
@@ -194,6 +195,22 @@ class TraceCollectionListener(BaseEventListener):
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source: Any, event: FlowFinishedEvent) -> None:
self._handle_trace_event("flow_finished", source, event)
if self.batch_manager.batch_owner_type == "flow":
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
self.first_time_handler.handle_execution_completion()
else:
self.batch_manager.finalize_batch()
@event_bus.on(FlowFailedEvent)
def on_flow_failed(source: Any, event: FlowFailedEvent) -> None:
self._handle_trace_event("flow_failed", source, event)
if self.batch_manager.batch_owner_type == "flow":
if self.first_time_handler.is_first_time:
self.first_time_handler.mark_events_collected()
self.first_time_handler.handle_execution_completion()
else:
self.batch_manager.finalize_batch()
@event_bus.on(FlowPlotEvent)
def on_flow_plot(source: Any, event: FlowPlotEvent) -> None:
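
on_flow_finished and on_flow_failed share their finalisation branch verbatim; factored out as a helper (purely illustrative, not part of this change), it reads:

def finalize_flow_batch(listener) -> None:
    # Equivalent of the branch duplicated in on_flow_finished and
    # on_flow_failed above; listener is a TraceCollectionListener.
    if listener.batch_manager.batch_owner_type != "flow":
        return
    if listener.first_time_handler.is_first_time:
        listener.first_time_handler.mark_events_collected()
        listener.first_time_handler.handle_execution_completion()
    else:
        listener.batch_manager.finalize_batch()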

View File

@@ -67,6 +67,16 @@ class FlowFinishedEvent(FlowEvent):
state: dict[str, Any] | BaseModel
class FlowFailedEvent(FlowEvent):
"""Event emitted when a flow fails execution"""
flow_name: str
error: Exception
type: str = "flow_failed"
model_config = ConfigDict(arbitrary_types_allowed=True)
class FlowPlotEvent(FlowEvent):
"""Event emitted when a flow plot is created"""

View File

@@ -40,6 +40,7 @@ from crewai.events.listeners.tracing.utils import (
)
from crewai.events.types.flow_events import (
FlowCreatedEvent,
FlowFailedEvent,
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
@@ -977,7 +978,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
future = crewai_event_bus.emit(
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
),
@@ -1005,7 +1005,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
future = crewai_event_bus.emit(
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.name or self.__class__.__name__,
result=final_output,
state=self._copy_and_serialize_state(),
@@ -1020,15 +1019,18 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
return final_output
except Exception as e:
future = crewai_event_bus.emit(
self,
FlowFailedEvent(
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
if future:
self._event_futures.append(future)
raise e
finally:
detach(flow_token)
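
The except block added here emits the failure event before re-raising, so listeners observe the terminal state without the exception being swallowed. The shape of that path, reduced to a standalone sketch (the list and event dict are stand-ins for the event bus):

events = []

def run(kickoff, flow_name: str):
    try:
        return kickoff()
    except Exception as e:
        # Notify listeners (tracing, batch finalisation) and propagate.
        events.append({"type": "flow_failed", "flow_name": flow_name, "error": e})
        raise

try:
    run(lambda: 1 / 0, "demo_flow")
except ZeroDivisionError:
    pass
print(events[0]["type"])   # -> flow_failed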

View File

@@ -30,3 +30,4 @@ NOT_SPECIFIED: Final[
"allows us to distinguish between 'not passed at all' and 'explicitly passed None' or '[]'.",
]
] = _NotSpecified()
CREWAI_BASE_URL: Final[str] = "https://app.crewai.com"

View File

@@ -1,7 +1,7 @@
import os
import unittest
from unittest.mock import ANY, MagicMock, patch
from crewai.cli.constants import DEFAULT_CREWAI_ENTERPRISE_URL
from crewai.cli.plus_api import PlusAPI
@@ -35,7 +35,7 @@ class TestPlusAPI(unittest.TestCase):
):
mock_make_request.assert_called_once_with(
method,
f"{os.getenv('CREWAI_PLUS_URL')}{endpoint}",
f"{DEFAULT_CREWAI_ENTERPRISE_URL}{endpoint}",
headers={
"Authorization": ANY,
"Content-Type": ANY,
@@ -53,7 +53,7 @@ class TestPlusAPI(unittest.TestCase):
):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
mock_settings.enterprise_base_url = DEFAULT_CREWAI_ENTERPRISE_URL
mock_settings_class.return_value = mock_settings
# re-initialize Client
self.api = PlusAPI(self.api_key)
@@ -84,7 +84,7 @@ class TestPlusAPI(unittest.TestCase):
def test_get_agent_with_org_uuid(self, mock_make_request, mock_settings_class):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
mock_settings.enterprise_base_url = DEFAULT_CREWAI_ENTERPRISE_URL
mock_settings_class.return_value = mock_settings
# re-initialize Client
self.api = PlusAPI(self.api_key)
@@ -115,7 +115,7 @@ class TestPlusAPI(unittest.TestCase):
def test_get_tool_with_org_uuid(self, mock_make_request, mock_settings_class):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
mock_settings.enterprise_base_url = DEFAULT_CREWAI_ENTERPRISE_URL
mock_settings_class.return_value = mock_settings
# re-initialize Client
self.api = PlusAPI(self.api_key)
@@ -163,7 +163,7 @@ class TestPlusAPI(unittest.TestCase):
def test_publish_tool_with_org_uuid(self, mock_make_request, mock_settings_class):
mock_settings = MagicMock()
mock_settings.org_uuid = self.org_uuid
mock_settings.enterprise_base_url = os.getenv('CREWAI_PLUS_URL')
mock_settings.enterprise_base_url = DEFAULT_CREWAI_ENTERPRISE_URL
mock_settings_class.return_value = mock_settings
# re-initialize Client
self.api = PlusAPI(self.api_key)
@@ -320,7 +320,6 @@ class TestPlusAPI(unittest.TestCase):
)
@patch("crewai.cli.plus_api.Settings")
@patch.dict(os.environ, {"CREWAI_PLUS_URL": ""})
def test_custom_base_url(self, mock_settings_class):
mock_settings = MagicMock()
mock_settings.enterprise_base_url = "https://custom-url.com/api"
@@ -330,11 +329,3 @@ class TestPlusAPI(unittest.TestCase):
custom_api.base_url,
"https://custom-url.com/api",
)
@patch.dict(os.environ, {"CREWAI_PLUS_URL": "https://custom-url-from-env.com"})
def test_custom_base_url_from_env(self):
custom_api = PlusAPI("test_key")
self.assertEqual(
custom_api.base_url,
"https://custom-url-from-env.com",
)

View File

@@ -381,171 +381,4 @@ class TestAsyncProcessAsyncTasks:
async def test_aprocess_async_tasks_empty(self, test_crew: Crew) -> None:
"""Test processing empty list of async tasks."""
result = await test_crew._aprocess_async_tasks([])
assert result == []
class TestMixedSyncAsyncTaskOutputs:
"""Tests for issue #4137: Task outputs lost when mixing sync and async tasks.
These tests verify that when a Crew executes a mix of synchronous and
asynchronous tasks, all task outputs are preserved correctly.
"""
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_async_sync_task_before_async_task_outputs_preserved(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test that sync task outputs before async tasks are preserved.
Scenario: sync -> async -> sync
Expected: All 3 task outputs should be in the result.
"""
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=test_agent,
async_execution=False,
)
task2 = Task(
description="Async task 2",
expected_output="Output 2",
agent=test_agent,
async_execution=True,
)
task3 = Task(
description="Sync task 3",
expected_output="Output 3",
agent=test_agent,
async_execution=False,
)
crew = Crew(
agents=[test_agent],
tasks=[task1, task2, task3],
verbose=False,
)
mock_output1 = TaskOutput(
description="Sync task 1",
raw="Result 1",
agent="Test Agent",
)
mock_output2 = TaskOutput(
description="Async task 2",
raw="Result 2",
agent="Test Agent",
)
mock_output3 = TaskOutput(
description="Sync task 3",
raw="Result 3",
agent="Test Agent",
)
mock_execute.side_effect = [mock_output1, mock_output2, mock_output3]
result = await crew._aexecute_tasks(crew.tasks)
assert result is not None
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "Result 1"
assert result.tasks_output[1].raw == "Result 2"
assert result.tasks_output[2].raw == "Result 3"
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_async_crew_ending_with_async_task_preserves_outputs(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test that outputs are preserved when crew ends with async task.
Scenario: sync -> async (final)
Expected: Both task outputs should be in the result.
"""
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=test_agent,
async_execution=False,
)
task2 = Task(
description="Async task 2",
expected_output="Output 2",
agent=test_agent,
async_execution=True,
)
crew = Crew(
agents=[test_agent],
tasks=[task1, task2],
verbose=False,
)
mock_output1 = TaskOutput(
description="Sync task 1",
raw="Result 1",
agent="Test Agent",
)
mock_output2 = TaskOutput(
description="Async task 2",
raw="Result 2",
agent="Test Agent",
)
mock_execute.side_effect = [mock_output1, mock_output2]
result = await crew._aexecute_tasks(crew.tasks)
assert result is not None
assert len(result.tasks_output) == 2
assert result.tasks_output[0].raw == "Result 1"
assert result.tasks_output[1].raw == "Result 2"
@pytest.mark.asyncio
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
async def test_async_multiple_sync_before_async_all_preserved(
self, mock_execute: AsyncMock, test_agent: Agent
) -> None:
"""Test that multiple sync task outputs before async are preserved.
Scenario: sync -> sync -> async -> sync
Expected: All 4 task outputs should be in the result.
"""
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=test_agent,
async_execution=False,
)
task2 = Task(
description="Sync task 2",
expected_output="Output 2",
agent=test_agent,
async_execution=False,
)
task3 = Task(
description="Async task 3",
expected_output="Output 3",
agent=test_agent,
async_execution=True,
)
task4 = Task(
description="Sync task 4",
expected_output="Output 4",
agent=test_agent,
async_execution=False,
)
crew = Crew(
agents=[test_agent],
tasks=[task1, task2, task3, task4],
verbose=False,
)
mock_outputs = [
TaskOutput(description=f"Task {i}", raw=f"Result {i}", agent="Test Agent")
for i in range(1, 5)
]
mock_execute.side_effect = mock_outputs
result = await crew._aexecute_tasks(crew.tasks)
assert result is not None
assert len(result.tasks_output) == 4
for i in range(4):
assert result.tasks_output[i].raw == f"Result {i + 1}"

View File

@@ -1251,200 +1251,6 @@ async def test_async_task_execution_call_count(researcher, writer):
assert mock_execute_sync.call_count == 1
def test_sync_task_outputs_preserved_when_mixing_sync_async_tasks():
"""Test for issue #4137: Task outputs lost when mixing sync and async tasks.
Scenario: sync -> async -> sync
Expected: All 3 task outputs should be in the result.
"""
researcher_agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
allow_delegation=False,
)
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=researcher_agent,
async_execution=False,
)
task2 = Task(
description="Async task 2",
expected_output="Output 2",
agent=researcher_agent,
async_execution=True,
)
task3 = Task(
description="Sync task 3",
expected_output="Output 3",
agent=researcher_agent,
async_execution=False,
)
crew = Crew(
agents=[researcher_agent],
tasks=[task1, task2, task3],
verbose=False,
)
mock_output1 = TaskOutput(
description="Sync task 1",
raw="Result 1",
agent="Researcher",
)
mock_output2 = TaskOutput(
description="Async task 2",
raw="Result 2",
agent="Researcher",
)
mock_output3 = TaskOutput(
description="Sync task 3",
raw="Result 3",
agent="Researcher",
)
mock_future = MagicMock(spec=Future)
mock_future.result.return_value = mock_output2
with (
patch.object(Task, "execute_sync", side_effect=[mock_output1, mock_output3]),
patch.object(Task, "execute_async", return_value=mock_future),
):
result = crew.kickoff()
assert result is not None
assert len(result.tasks_output) == 3
assert result.tasks_output[0].raw == "Result 1"
assert result.tasks_output[1].raw == "Result 2"
assert result.tasks_output[2].raw == "Result 3"
def test_sync_task_outputs_preserved_when_crew_ends_with_async_task():
"""Test for issue #4137: Task outputs preserved when crew ends with async task.
Scenario: sync -> async (final)
Expected: Both task outputs should be in the result.
"""
researcher_agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
allow_delegation=False,
)
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=researcher_agent,
async_execution=False,
)
task2 = Task(
description="Async task 2",
expected_output="Output 2",
agent=researcher_agent,
async_execution=True,
)
crew = Crew(
agents=[researcher_agent],
tasks=[task1, task2],
verbose=False,
)
mock_output1 = TaskOutput(
description="Sync task 1",
raw="Result 1",
agent="Researcher",
)
mock_output2 = TaskOutput(
description="Async task 2",
raw="Result 2",
agent="Researcher",
)
mock_future = MagicMock(spec=Future)
mock_future.result.return_value = mock_output2
with (
patch.object(Task, "execute_sync", return_value=mock_output1),
patch.object(Task, "execute_async", return_value=mock_future),
):
result = crew.kickoff()
assert result is not None
assert len(result.tasks_output) == 2
assert result.tasks_output[0].raw == "Result 1"
assert result.tasks_output[1].raw == "Result 2"
def test_sync_multiple_sync_tasks_before_async_all_preserved():
"""Test for issue #4137: Multiple sync task outputs before async are preserved.
Scenario: sync -> sync -> async -> sync
Expected: All 4 task outputs should be in the result.
"""
researcher_agent = Agent(
role="Researcher",
goal="Research topics",
backstory="Expert researcher",
allow_delegation=False,
)
task1 = Task(
description="Sync task 1",
expected_output="Output 1",
agent=researcher_agent,
async_execution=False,
)
task2 = Task(
description="Sync task 2",
expected_output="Output 2",
agent=researcher_agent,
async_execution=False,
)
task3 = Task(
description="Async task 3",
expected_output="Output 3",
agent=researcher_agent,
async_execution=True,
)
task4 = Task(
description="Sync task 4",
expected_output="Output 4",
agent=researcher_agent,
async_execution=False,
)
crew = Crew(
agents=[researcher_agent],
tasks=[task1, task2, task3, task4],
verbose=False,
)
mock_outputs = [
TaskOutput(description=f"Task {i}", raw=f"Result {i}", agent="Researcher")
for i in range(1, 5)
]
mock_future = MagicMock(spec=Future)
mock_future.result.return_value = mock_outputs[2]
with (
patch.object(
Task, "execute_sync", side_effect=[mock_outputs[0], mock_outputs[1], mock_outputs[3]]
),
patch.object(Task, "execute_async", return_value=mock_future),
):
result = crew.kickoff()
assert result is not None
assert len(result.tasks_output) == 4
for i in range(4):
assert result.tasks_output[i].raw == f"Result {i + 1}"
@pytest.mark.vcr()
def test_kickoff_for_each_single_input():
"""Tests if kickoff_for_each works with a single input."""

View File

@@ -1,3 +1,3 @@
"""CrewAI development tools."""
__version__ = "1.7.2"
__version__ = "1.7.1"