Compare commits

..

8 Commits

Author SHA1 Message Date
Greyson LaLonde
9f921cba86 fix: docs import path for json search tool
- updated import path to crewai-tools
- removed old comment
2025-04-17 07:47:21 -07:00
Lucas Gomide
a96a27f064 docs: fix guardrail documentation usage (#2630) 2025-04-17 10:34:50 -04:00
Vidit Ostwal
a2f3566cd9 Pr branch (#2312)
* Adjust checking for callable crew object.

Changes back to how it was being done before.
Fixes #2307

* Fix specific memory reset errors.

When not initiated, the function should raise
the "memory system is not initialized" RuntimeError.

* Remove print statement

* Fixes test case

---------

Co-authored-by: Carlos Souza <carloshrsouza@gmail.com>
2025-04-17 08:59:15 -04:00
Greyson LaLonde
e655412aca refactor: create constants.py & use in telemetry (#2627)
Some checks are pending
Notify Downstream / notify-downstream (push) Waiting to run
- created `constants.py` for telemetry base url and service name
- updated `telemetry.py` to reflect changes
- ran ruff --fix to apply lint fixes
2025-04-16 12:46:15 -07:00
Lorenze Jay
1d91ab5d1b fix: pass original agent reference to lite agent initialization (#2625)
Some checks are pending
Notify Downstream / notify-downstream (push) Waiting to run
2025-04-16 10:05:09 -07:00
Vini Brasil
37359a34f0 Remove redundant comment from sqlite.py (#2622) 2025-04-16 11:25:41 -03:00
Vini Brasil
6eb4045339 Update .github/workflows/notify-downstream.yml (#2621) 2025-04-16 10:39:51 -03:00
Vini Brasil
aebbc75dea Notify downstream repo of changes (#2615)
* Notify downstream repo of changes

* Add permissions block
2025-04-16 10:18:26 -03:00
11 changed files with 77 additions and 85 deletions

33
.github/workflows/notify-downstream.yml vendored Normal file
View File

@@ -0,0 +1,33 @@
name: Notify Downstream
on:
push:
branches:
- main
permissions:
contents: read
jobs:
notify-downstream:
runs-on: ubuntu-latest
steps:
- name: Generate GitHub App token
id: app-token
uses: tibdex/github-app-token@v2
with:
app_id: ${{ secrets.OSS_SYNC_APP_ID }}
private_key: ${{ secrets.OSS_SYNC_APP_PRIVATE_KEY }}
- name: Notify Repo B
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ steps.app-token.outputs.token }}
repository: ${{ secrets.OSS_SYNC_DOWNSTREAM_REPO }}
event-type: upstream-commit
client-payload: |
{
"commit_sha": "${{ github.sha }}"
}

View File

@@ -288,26 +288,20 @@ To add a guardrail to a task, provide a validation function through the `guardra
```python Code
from typing import Tuple, Union, Dict, Any
from crewai import TaskOutput
def validate_blog_content(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
def validate_blog_content(result: TaskOutput) -> Tuple[bool, Any]:
"""Validate blog content meets requirements."""
try:
# Check word count
word_count = len(result.split())
if word_count > 200:
return (False, {
"error": "Blog content exceeds 200 words",
"code": "WORD_COUNT_ERROR",
"context": {"word_count": word_count}
})
return (False, "Blog content exceeds 200 words")
# Additional validation logic here
return (True, result.strip())
except Exception as e:
return (False, {
"error": "Unexpected error during validation",
"code": "SYSTEM_ERROR"
})
return (False, "Unexpected error during validation")
blog_task = Task(
description="Write a blog post about AI",
@@ -325,29 +319,24 @@ blog_task = Task(
- Type hints are recommended but optional
2. **Return Values**:
- Success: Return `(True, validated_result)`
- Failure: Return `(False, error_details)`
- On success: it returns a tuple of `(bool, Any)`. For example: `(True, validated_result)`
- On Failure: it returns a tuple of `(bool, str)`. For example: `(False, "Error message explain the failure")`
### Error Handling Best Practices
1. **Structured Error Responses**:
```python Code
def validate_with_context(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
from crewai import TaskOutput
def validate_with_context(result: TaskOutput) -> Tuple[bool, Any]:
try:
# Main validation logic
validated_data = perform_validation(result)
return (True, validated_data)
except ValidationError as e:
return (False, {
"error": str(e),
"code": "VALIDATION_ERROR",
"context": {"input": result}
})
return (False, f"VALIDATION_ERROR: {str(e)}")
except Exception as e:
return (False, {
"error": "Unexpected error",
"code": "SYSTEM_ERROR"
})
return (False, str(e))
```
2. **Error Categories**:
@@ -358,28 +347,25 @@ def validate_with_context(result: str) -> Tuple[bool, Union[Dict[str, Any], str]
3. **Validation Chain**:
```python Code
from typing import Any, Dict, List, Tuple, Union
from crewai import TaskOutput
def complex_validation(result: str) -> Tuple[bool, Union[str, Dict[str, Any]]]:
def complex_validation(result: TaskOutput) -> Tuple[bool, Any]:
"""Chain multiple validation steps."""
# Step 1: Basic validation
if not result:
return (False, {"error": "Empty result", "code": "EMPTY_INPUT"})
return (False, "Empty result")
# Step 2: Content validation
try:
validated = validate_content(result)
if not validated:
return (False, {"error": "Invalid content", "code": "CONTENT_ERROR"})
return (False, "Invalid content")
# Step 3: Format validation
formatted = format_output(validated)
return (True, formatted)
except Exception as e:
return (False, {
"error": str(e),
"code": "VALIDATION_ERROR",
"context": {"step": "content_validation"}
})
return (False, str(e))
```
### Handling Guardrail Results
@@ -394,19 +380,16 @@ When a guardrail returns `(False, error)`:
Example with retry handling:
```python Code
from typing import Optional, Tuple, Union
from crewai import TaskOutput, Task
def validate_json_output(result: str) -> Tuple[bool, Union[Dict[str, Any], str]]:
def validate_json_output(result: TaskOutput) -> Tuple[bool, Any]:
"""Validate and parse JSON output."""
try:
# Try to parse as JSON
data = json.loads(result)
return (True, data)
except json.JSONDecodeError as e:
return (False, {
"error": "Invalid JSON format",
"code": "JSON_ERROR",
"context": {"line": e.lineno, "column": e.colno}
})
return (False, "Invalid JSON format")
task = Task(
description="Generate a JSON report",

View File

@@ -30,7 +30,7 @@ pip install 'crewai[tools]'
Here are updated examples on how to utilize the JSONSearchTool effectively for searching within JSON files. These examples take into account the current implementation and usage patterns identified in the codebase.
```python Code
from crewai.json_tools import JSONSearchTool # Updated import path
from crewai_tools import JSONSearchTool
# General JSON content search
# This approach is suitable when the JSON path is either known beforehand or can be dynamically identified.

View File

@@ -535,6 +535,7 @@ class Agent(BaseAgent):
verbose=self.verbose,
response_format=response_format,
i18n=self.i18n,
original_agent=self,
)
return await lite_agent.kickoff_async(messages)

View File

@@ -273,11 +273,9 @@ def get_crew(crew_path: str = "crew.py", require: bool = False) -> Crew | None:
for attr_name in dir(module):
attr = getattr(module, attr_name)
try:
if isinstance(attr, Crew) and hasattr(attr, "kickoff"):
print(
f"Found valid crew object in attribute '{attr_name}' at {crew_os_path}."
)
return attr
if callable(attr) and hasattr(attr, "crew"):
crew_instance = attr().crew()
return crew_instance
except Exception as e:
print(f"Error processing attribute {attr_name}: {e}")

View File

@@ -1399,12 +1399,12 @@ class Crew(BaseModel):
RuntimeError: If the specified memory system fails to reset
"""
reset_functions = {
"long": (self._long_term_memory, "long term"),
"short": (self._short_term_memory, "short term"),
"entity": (self._entity_memory, "entity"),
"knowledge": (self.knowledge, "knowledge"),
"kickoff_outputs": (self._task_output_handler, "task output"),
"external": (self._external_memory, "external"),
"long": (getattr(self, "_long_term_memory", None), "long term"),
"short": (getattr(self, "_short_term_memory", None), "short term"),
"entity": (getattr(self, "_entity_memory", None), "entity"),
"knowledge": (getattr(self, "knowledge", None), "knowledge"),
"kickoff_outputs": (getattr(self, "_task_output_handler", None), "task output"),
"external": (getattr(self, "_external_memory", None), "external"),
}
memory_system, name = reset_functions[memory_type]

View File

@@ -719,10 +719,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
async def run_flow():
return await self.kickoff_async(inputs)
try:
return asyncio.run(run_flow())
except KeyboardInterrupt:
raise
return asyncio.run(run_flow())
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = None) -> Any:
"""

View File

@@ -21,7 +21,7 @@ class SQLiteFlowPersistence(FlowPersistence):
moderate performance requirements.
"""
db_path: str # Type annotation for instance variable
db_path: str
def __init__(self, db_path: Optional[str] = None):
"""Initialize SQLite persistence.

View File

@@ -0,0 +1,2 @@
CREWAI_TELEMETRY_BASE_URL: str = "https://telemetry.crewai.com:4319"
CREWAI_TELEMETRY_SERVICE_NAME: str = "crewAI-telemetry"

View File

@@ -9,6 +9,11 @@ from contextlib import contextmanager
from importlib.metadata import version
from typing import TYPE_CHECKING, Any, Optional
from crewai.telemetry.constants import (
CREWAI_TELEMETRY_BASE_URL,
CREWAI_TELEMETRY_SERVICE_NAME,
)
@contextmanager
def suppress_warnings():
@@ -52,16 +57,15 @@ class Telemetry:
return
try:
telemetry_endpoint = "https://telemetry.crewai.com:4319"
self.resource = Resource(
attributes={SERVICE_NAME: "crewAI-telemetry"},
attributes={SERVICE_NAME: CREWAI_TELEMETRY_SERVICE_NAME},
)
with suppress_warnings():
self.provider = TracerProvider(resource=self.resource)
processor = BatchSpanProcessor(
OTLPSpanExporter(
endpoint=f"{telemetry_endpoint}/v1/traces",
endpoint=f"{CREWAI_TELEMETRY_BASE_URL}/v1/traces",
timeout=30,
)
)
@@ -75,12 +79,12 @@ class Telemetry:
):
raise # Re-raise the exception to not interfere with system signals
self.ready = False
def _is_telemetry_disabled(self) -> bool:
"""Check if telemetry should be disabled based on environment variables."""
return (
os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true" or
os.getenv("CREWAI_DISABLE_TELEMETRY", "false").lower() == "true"
os.getenv("OTEL_SDK_DISABLED", "false").lower() == "true"
or os.getenv("CREWAI_DISABLE_TELEMETRY", "false").lower() == "true"
)
def set_tracer(self):

View File

@@ -755,29 +755,3 @@ def test_multiple_routers_from_same_trigger():
assert execution_order.index("anemia_analysis") > execution_order.index(
"anemia_router"
)
def test_flow_keyboard_interrupt_handling():
"""Test that a flow properly terminates when a keyboard interrupt is received."""
execution_order = []
class KeyboardInterruptFlow(Flow):
@start()
def step_1(self):
execution_order.append("step_1")
@listen(step_1)
def step_2(self):
execution_order.append("step_2")
raise KeyboardInterrupt()
@listen(step_2)
def step_3(self):
execution_order.append("step_3")
flow = KeyboardInterruptFlow()
with pytest.raises(KeyboardInterrupt):
flow.kickoff()
assert execution_order == ["step_1", "step_2"]