Compare commits

...

8 Commits

Author SHA1 Message Date
Devin AI
8ab236a68e Fix lint issues: remove unused imports
- Remove unused 'Crew' import from both test files
- Remove unused 'pytest' import from test_langdb_documentation.py
- Keep only imports that are actually used in the code

Fixes lint check failure in PR #3241

Co-Authored-By: João <joao@crewai.com>
2025-07-30 10:06:59 +00:00
Devin AI
dce11df0b7 Add LangDB AI Gateway documentation to observability section
- Add comprehensive LangDB documentation following Portkey pattern
- Include installation, configuration, and integration examples
- Add LangDB card to observability overview page
- Include tests for documentation examples
- Addresses issue #3240: Feature request for LangDB observability docs

Features documented:
- Complete end-to-end tracing of agent interactions
- Real-time cost monitoring and optimization
- Performance analytics with detailed metrics
- Enterprise security and governance features
- Multi-environment setup configurations
- Advanced metadata and filtering capabilities

Co-Authored-By: João <joao@crewai.com>
2025-07-30 10:02:49 +00:00
Lorenze Jay
cb522cf500 Enhance Flow class to support custom flow names (#3234)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
- Added an optional `name` attribute to the Flow class for better identification.
- Updated event emissions to utilize the new `name` attribute, ensuring accurate flow naming in events.
- Added tests to verify the correct flow name is set and emitted during flow execution.
2025-07-29 15:41:30 -07:00
Vini Brasil
017acc74f5 Add timezone to event timestamps (#3231)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
Events were lacking timezone information, making them naive datetimes,
which can be ambiguous.
2025-07-28 17:09:06 -03:00
Greyson LaLonde
fab86d197a Refactor: Move RAG components to dedicated top-level module (#3222)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* Move RAG components to top-level module

- Create src/crewai/rag directory structure
- Move embeddings configurator from utilities to rag module
- Update imports across codebase and documentation
- Remove deprecated embedding files

* Remove empty knowledge/embedder directory
2025-07-25 10:55:31 -04:00
Vidit Ostwal
864e9bfb76 Changed the default value in Mem0 config (#3216)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* Changed the default value in Mem0 config

* Added regression test for this

* Fixed Linting issues
2025-07-24 13:20:18 -04:00
Lucas Gomide
d3b45d197c fix: remove crewai signup references, replaced by crewai login (#3213) 2025-07-24 07:47:35 -04:00
Manuka Yasas
579153b070 docs: fix incorrect model naming in Google Vertex AI documentation (#3189)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
- Change model format from "gemini/gemini-1.5-pro-latest" to "gemini-1.5-pro-latest"
  in Vertex AI section examples
- Update both English and Portuguese documentation files
- Fixes incorrect provider prefix usage for Vertex AI models
- Ensures consistency with Vertex AI provider requirements

Files changed:
- docs/en/concepts/llms.mdx (line 272)
- docs/pt-BR/concepts/llms.mdx (line 270)

Co-authored-by: Tony Kipkemboi <iamtonykipkemboi@gmail.com>
2025-07-23 16:58:57 -04:00
26 changed files with 736 additions and 92 deletions

View File

@@ -270,7 +270,7 @@ In this section, you'll find detailed examples that help you select, configure,
from crewai import LLM
llm = LLM(
model="gemini/gemini-1.5-pro-latest",
model="gemini-1.5-pro-latest", # or vertex_ai/gemini-1.5-pro-latest
temperature=0.7,
vertex_credentials=vertex_credentials_json
)

View File

@@ -623,7 +623,7 @@ for provider in providers_to_test:
**Model not found errors:**
```python
# Verify model availability
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
try:
@@ -720,7 +720,7 @@ crew = Crew(
```
### Advanced Mem0 Configuration
When using Mem0 Client, you can customize the memory configuration further, by using parameters like 'includes', 'excludes', 'custom_categories' and 'run_id' (this is only for short-term memory).
When using Mem0 Client, you can customize the memory configuration further, by using parameters like 'includes', 'excludes', 'custom_categories', 'infer' and 'run_id' (this is only for short-term memory).
You can find more details in the [Mem0 documentation](https://docs.mem0.ai/).
```python
@@ -744,7 +744,7 @@ crew = Crew(
"run_id": "my_run_id", # Optional - for short-term memory
"includes": "include1", # Optional
"excludes": "exclude1", # Optional
"infer": True
"infer": True # Optional defaults to True
"custom_categories": new_categories # Optional - custom categories for user memory
},
"user_memory": {}
@@ -776,7 +776,7 @@ crew = Crew(
"config": {"api_key": "your-api-key", "model": "text-embedding-3-small"}
}
},
"infer": True
"infer": True # Optional defaults to True
},
"user_memory": {}
}

View File

@@ -0,0 +1,356 @@
---
title: LangDB Integration
description: How to use LangDB AI Gateway with CrewAI
icon: database
---
<img src="https://raw.githubusercontent.com/LangDB/assets/main/langdb-crewai-header.png" alt="LangDB CrewAI Header Image" width="70%" />
## Introduction
LangDB is the fastest enterprise AI gateway that enhances CrewAI with production-ready observability and optimization features. It provides:
- **Complete end-to-end tracing** of every agent interaction and LLM call
- **Real-time cost monitoring** and optimization across 250+ LLMs
- **Performance analytics** with detailed metrics and insights
- **Secure governance** for enterprise AI deployments
- **OpenAI-compatible APIs** for seamless integration
- **Fine-grained control** over agent workflows and resource usage
### Installation & Setup
<Steps>
<Step title="Install the required packages">
```bash
pip install -U crewai langdb
```
</Step>
<Step title="Set up environment variables" icon="lock">
Configure your LangDB credentials from the [LangDB dashboard](https://app.langdb.ai/):
```bash
export LANGDB_API_KEY="your_langdb_api_key"
export LANGDB_PROJECT_ID="your_project_id"
```
</Step>
<Step title="Initialize LangDB with CrewAI">
The integration requires a single initialization call before creating your agents:
```python
from langdb import LangDB
from crewai import Agent, Task, Crew, LLM
# Initialize LangDB tracing
LangDB.init()
# Create LLM instance - LangDB automatically traces all calls
llm = LLM(
model="gpt-4o",
temperature=0.7
)
# Create your agents as usual
@agent
def research_agent(self) -> Agent:
return Agent(
role="Senior Research Analyst",
goal="Conduct comprehensive research on assigned topics",
backstory="You are an expert researcher with deep analytical skills.",
llm=llm,
verbose=True
)
```
</Step>
</Steps>
## Key Features
### 1. Comprehensive Observability
LangDB provides complete visibility into your CrewAI agent workflows with minimal setup overhead.
<Tabs>
<Tab title="Request Tracing">
LangDB automatically captures every LLM interaction in your crew execution:
```python
from langdb import LangDB
from crewai import Agent, Task, Crew, LLM
# Initialize with custom trace metadata
LangDB.init(
metadata={
"environment": "production",
"crew_type": "research_workflow",
"user_id": "user_123"
}
)
# All agent interactions are automatically traced
crew = Crew(
agents=[research_agent, writer_agent],
tasks=[research_task, writing_task],
verbose=True
)
# Execute with full tracing
result = crew.kickoff(inputs={"topic": "AI trends 2025"})
```
View detailed traces in the LangDB dashboard showing:
- Complete agent conversation flows
- Tool usage and function calls
- Task execution timelines
- LLM request/response pairs
</Tab>
<Tab title="Performance Metrics">
LangDB tracks comprehensive performance metrics for your crews:
- **Execution Time**: Total and per-task execution duration
- **Token Usage**: Input/output tokens for cost optimization
- **Success Rates**: Task completion and failure analytics
- **Latency Analysis**: Response times and bottleneck identification
```python
# Access metrics programmatically
from langdb import LangDB
# Get crew execution metrics
metrics = LangDB.get_metrics(
project_id="your_project_id",
filters={
"crew_type": "research_workflow",
"time_range": "last_24h"
}
)
print(f"Average execution time: {metrics.avg_execution_time}")
print(f"Total cost: ${metrics.total_cost}")
print(f"Success rate: {metrics.success_rate}%")
```
</Tab>
<Tab title="Cost Monitoring">
Track and optimize AI spending across your CrewAI deployments:
```python
from langdb import LangDB
# Initialize with cost tracking
LangDB.init(
cost_tracking=True,
budget_alerts={
"daily_limit": 100.0, # $100 daily limit
"alert_threshold": 0.8 # Alert at 80% of limit
}
)
# LangDB automatically tracks costs for all LLM calls
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
# View cost breakdown
cost_report = LangDB.get_cost_report(
breakdown_by=["model", "agent", "task"]
)
```
Features include:
- Real-time cost tracking across all models
- Budget alerts and spending limits
- Cost optimization recommendations
- Detailed cost attribution by agent and task
</Tab>
</Tabs>
### 2. Advanced Analytics & Insights
LangDB provides powerful analytics to optimize your CrewAI workflows.
<Tabs>
<Tab title="Agent Performance Analysis">
Analyze individual agent performance and identify optimization opportunities:
```python
from langdb import LangDB
# Get agent-specific analytics
analytics = LangDB.get_agent_analytics(
agent_role="Senior Research Analyst",
time_range="last_week"
)
print(f"Average task completion time: {analytics.avg_completion_time}")
print(f"Most used tools: {analytics.top_tools}")
print(f"Success rate: {analytics.success_rate}%")
print(f"Cost per task: ${analytics.cost_per_task}")
```
</Tab>
<Tab title="Workflow Optimization">
Identify bottlenecks and optimization opportunities in your crew workflows:
```python
# Analyze crew workflow patterns
workflow_analysis = LangDB.analyze_workflow(
crew_id="research_crew_v1",
optimization_focus=["speed", "cost", "quality"]
)
# Get optimization recommendations
recommendations = workflow_analysis.recommendations
for rec in recommendations:
print(f"Optimization: {rec.type}")
print(f"Potential savings: {rec.estimated_savings}")
print(f"Implementation: {rec.implementation_guide}")
```
</Tab>
</Tabs>
### 3. Production-Ready Features
<CardGroup cols="2">
<Card title="Error Monitoring" icon="exclamation-triangle" href="https://docs.langdb.ai/features/error-monitoring">
Automatic detection and alerting for agent failures, LLM errors, and workflow issues.
</Card>
<Card title="Rate Limiting" icon="gauge" href="https://docs.langdb.ai/features/rate-limiting">
Intelligent rate limiting to prevent API quota exhaustion and optimize throughput.
</Card>
<Card title="Caching" icon="bolt" href="https://docs.langdb.ai/features/caching">
Smart caching of LLM responses to reduce costs and improve response times.
</Card>
<Card title="Load Balancing" icon="scale-balanced" href="https://docs.langdb.ai/features/load-balancing">
Distribute requests across multiple LLM providers for reliability and performance.
</Card>
</CardGroup>
### 4. Enterprise Security & Governance
LangDB provides enterprise-grade security features for production CrewAI deployments:
```python
from langdb import LangDB
# Initialize with security configurations
LangDB.init(
security_config={
"pii_detection": True,
"content_filtering": True,
"audit_logging": True,
"data_retention_days": 90
}
)
# All crew interactions are automatically secured
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
```
Security features include:
- **PII Detection**: Automatic detection and redaction of sensitive information
- **Content Filtering**: Block inappropriate or harmful content
- **Audit Logging**: Complete audit trails for compliance
- **Data Governance**: Configurable data retention and privacy controls
## Advanced Configuration
### Custom Metadata and Filtering
Add custom metadata to enable powerful filtering and analytics:
```python
from langdb import LangDB
from crewai import Agent, Crew, Task
# Initialize with rich metadata
LangDB.init(
metadata={
"environment": "production",
"team": "research_team",
"version": "v2.1.0",
"customer_tier": "enterprise"
}
)
# Add task-specific metadata
@task
def research_task(self) -> Task:
return Task(
description="Research the latest AI trends",
expected_output="Comprehensive research report",
agent=research_agent,
metadata={
"task_type": "research",
"priority": "high",
"estimated_duration": "30min"
}
)
```
### Multi-Environment Setup
Configure different LangDB projects for different environments:
```python
import os
from langdb import LangDB
# Environment-specific configuration
environment = os.getenv("ENVIRONMENT", "development")
if environment == "production":
LangDB.init(
project_id="prod_project_id",
sampling_rate=1.0, # Trace all requests
cost_tracking=True
)
elif environment == "staging":
LangDB.init(
project_id="staging_project_id",
sampling_rate=0.5, # Sample 50% of requests
cost_tracking=False
)
else:
LangDB.init(
project_id="dev_project_id",
sampling_rate=0.1, # Sample 10% of requests
cost_tracking=False
)
```
## Best Practices
### Development Phase
- Use detailed tracing to understand agent behavior patterns
- Monitor resource usage during testing and development
- Set up cost alerts to prevent unexpected spending
- Implement comprehensive error handling and monitoring
### Production Phase
- Enable full request tracing for complete observability
- Set up automated alerts for performance degradation
- Implement cost optimization strategies based on analytics
- Use metadata for detailed filtering and analysis
### Continuous Improvement
- Regular performance reviews using LangDB analytics
- A/B testing of different agent configurations
- Cost optimization based on usage patterns
- Workflow optimization using bottleneck analysis
## Getting Started
1. **Sign up** for a LangDB account at [app.langdb.ai](https://app.langdb.ai)
2. **Install** the required packages: `pip install -U crewai langdb`
3. **Initialize** LangDB in your CrewAI application
4. **Deploy** your crews with automatic observability
5. **Monitor** and optimize using the LangDB dashboard
<Card title="LangDB Documentation" icon="book" href="https://docs.langdb.ai">
Explore comprehensive LangDB documentation and advanced features
</Card>
LangDB transforms your CrewAI agents into production-ready, observable, and optimized AI workflows with minimal code changes and maximum insights.

View File

@@ -56,6 +56,10 @@ Observability is crucial for understanding how your CrewAI agents perform, ident
<Card title="Weave" icon="network-wired" href="/en/observability/weave">
Weights & Biases platform for tracking and evaluating AI applications.
</Card>
<Card title="LangDB" icon="database" href="/en/observability/langdb">
Enterprise AI gateway with comprehensive tracing, cost optimization, and performance analytics.
</Card>
</CardGroup>
### Evaluation & Quality Assurance

View File

@@ -84,8 +84,8 @@ filename = "seu_modelo.pkl"
try:
SuaCrew().crew().train(
n_iterations=n_iterations,
inputs=inputs,
n_iterations=n_iterations,
inputs=inputs,
filename=filename
)
except Exception as e:
@@ -103,7 +103,7 @@ crewai replay [OPTIONS]
- `-t, --task_id TEXT`: Reexecuta o crew a partir deste task ID, incluindo todas as tarefas subsequentes
Exemplo:
```shell Terminal
```shell Terminal
crewai replay -t task_123456
```
@@ -149,7 +149,7 @@ crewai test [OPTIONS]
- `-m, --model TEXT`: Modelo LLM para executar os testes no Crew (padrão: "gpt-4o-mini")
Exemplo:
```shell Terminal
```shell Terminal
crewai test -n 5 -m gpt-3.5-turbo
```
@@ -203,10 +203,7 @@ def crew(self) -> Crew:
Implemente o crew ou flow no [CrewAI Enterprise](https://app.crewai.com).
- **Autenticação**: Você precisa estar autenticado para implementar no CrewAI Enterprise.
```shell Terminal
crewai signup
```
Caso já tenha uma conta, você pode fazer login com:
Você pode fazer login ou criar uma conta com:
```shell Terminal
crewai login
```
@@ -253,7 +250,7 @@ Você deve estar autenticado no CrewAI Enterprise para usar estes comandos de ge
- **Implantar o Crew**: Depois de autenticado, você pode implantar seu crew ou flow no CrewAI Enterprise.
```shell Terminal
crewai deploy push
```
```
- Inicia o processo de deployment na plataforma CrewAI Enterprise.
- Após a iniciação bem-sucedida, será exibida a mensagem Deployment created successfully! juntamente com o Nome do Deployment e um Deployment ID (UUID) único.
@@ -326,4 +323,4 @@ Ao escolher um provedor, o CLI solicitará que você informe o nome da chave e a
Veja o seguinte link para o nome de chave de cada provedor:
* [LiteLLM Providers](https://docs.litellm.ai/docs/providers)
* [LiteLLM Providers](https://docs.litellm.ai/docs/providers)

View File

@@ -268,7 +268,7 @@ Nesta seção, você encontrará exemplos detalhados que ajudam a selecionar, co
from crewai import LLM
llm = LLM(
model="gemini/gemini-1.5-pro-latest",
model="gemini-1.5-pro-latest", # or vertex_ai/gemini-1.5-pro-latest
temperature=0.7,
vertex_credentials=vertex_credentials_json
)

View File

@@ -623,7 +623,7 @@ for provider in providers_to_test:
**Erros de modelo não encontrado:**
```python
# Verifique disponibilidade do modelo
from crewai.utilities.embedding_configurator import EmbeddingConfigurator
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
configurator = EmbeddingConfigurator()
try:

View File

@@ -26,7 +26,7 @@ class PlusAPIMixin:
"Please sign up/login to CrewAI+ before using the CLI.",
style="bold red",
)
console.print("Run 'crewai signup' to sign up/login.", style="bold green")
console.print("Run 'crewai login' to sign up/login.", style="bold green")
raise SystemExit
def _validate_response(self, response: requests.Response) -> None:

View File

@@ -436,6 +436,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
_routers: Set[str] = set()
_router_paths: Dict[str, List[str]] = {}
initial_state: Union[Type[T], T, None] = None
name: Optional[str] = None
def __class_getitem__(cls: Type["Flow"], item: Type[T]) -> Type["Flow"]:
class _FlowGeneric(cls): # type: ignore
@@ -473,7 +474,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowCreatedEvent(
type="flow_created",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
),
)
@@ -769,7 +770,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowStartedEvent(
type="flow_started",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
inputs=inputs,
),
)
@@ -792,7 +793,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowFinishedEvent(
type="flow_finished",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
result=final_output,
),
)
@@ -834,7 +835,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionStartedEvent(
type="method_execution_started",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
params=dumped_params,
state=self._copy_state(),
),
@@ -856,7 +857,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionFinishedEvent(
type="method_execution_finished",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
state=self._copy_state(),
result=result,
),
@@ -869,7 +870,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
MethodExecutionFailedEvent(
type="method_execution_failed",
method_name=method_name,
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
error=e,
),
)
@@ -1076,7 +1077,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self,
FlowPlotEvent(
type="flow_plot",
flow_name=self.__class__.__name__,
flow_name=self.name or self.__class__.__name__,
),
)
plot_flow(self, filename)

View File

@@ -1,55 +0,0 @@
from abc import ABC, abstractmethod
from typing import List
import numpy as np
class BaseEmbedder(ABC):
    """Interface that all text-embedding backends must implement.

    Concrete subclasses wrap a specific embedding model and convert text
    into fixed-size numeric vectors.
    """

    @abstractmethod
    def embed_chunks(self, chunks: List[str]) -> np.ndarray:
        """Embed a list of pre-chunked text segments.

        Args:
            chunks: Text chunks to embed.

        Returns:
            Array with one embedding per chunk.
        """
        ...

    @abstractmethod
    def embed_texts(self, texts: List[str]) -> np.ndarray:
        """Embed a list of full texts.

        Args:
            texts: Texts to embed.

        Returns:
            Array with one embedding per text.
        """
        ...

    @abstractmethod
    def embed_text(self, text: str) -> np.ndarray:
        """Embed a single text.

        Args:
            text: Text to embed.

        Returns:
            The embedding vector for the text.
        """
        ...

    @property
    @abstractmethod
    def dimension(self) -> int:
        """Dimensionality of the vectors produced by this embedder."""
        ...

View File

@@ -13,7 +13,7 @@ from chromadb.api.types import OneOrMany
from chromadb.config import Settings
from crewai.knowledge.storage.base_knowledge_storage import BaseKnowledgeStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.chromadb import sanitize_collection_name
from crewai.utilities.constants import KNOWLEDGE_DIRECTORY
from crewai.utilities.logger import Logger

View File

@@ -43,7 +43,7 @@ class Mem0Storage(Storage):
self.includes = cfg.get("includes")
self.excludes = cfg.get("excludes")
self.custom_categories = cfg.get("custom_categories")
self.infer = cfg.get("infer", False)
self.infer = cfg.get("infer", True)
def _initialize_memory(self):
api_key = self.config.get("api_key") or os.getenv("MEM0_API_KEY")

View File

@@ -7,8 +7,8 @@ import uuid
from typing import Any, Dict, List, Optional
from chromadb.api import ClientAPI
from crewai.memory.storage.base_rag_storage import BaseRAGStorage
from crewai.utilities import EmbeddingConfigurator
from crewai.rag.storage.base_rag_storage import BaseRAGStorage
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
from crewai.utilities.chromadb import create_persistent_client
from crewai.utilities.constants import MAX_FILE_NAME_LENGTH
from crewai.utilities.paths import db_storage_path

View File

@@ -0,0 +1 @@
"""RAG (Retrieval-Augmented Generation) infrastructure for CrewAI."""

View File

@@ -0,0 +1 @@
"""Embedding components for RAG infrastructure."""

View File

@@ -0,0 +1 @@
"""Storage components for RAG infrastructure."""

View File

@@ -10,7 +10,6 @@ from .rpm_controller import RPMController
from .exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededException,
)
from .embedding_configurator import EmbeddingConfigurator
__all__ = [
"Converter",
@@ -24,5 +23,4 @@ __all__ = [
"RPMController",
"YamlParser",
"LLMContextLengthExceededException",
"EmbeddingConfigurator",
]

View File

@@ -1,6 +1,5 @@
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from crewai.utilities.serialization import to_serializable
@@ -9,7 +8,7 @@ from crewai.utilities.serialization import to_serializable
class BaseEvent(BaseModel):
"""Base class for all events"""
timestamp: datetime = Field(default_factory=datetime.now)
timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
type: str
source_fingerprint: Optional[str] = None # UUID string of the source entity
source_type: Optional[str] = None # "agent", "task", "crew", "memory", "entity_memory", "short_term_memory", "long_term_memory", "external_memory"

View File

@@ -755,3 +755,15 @@ def test_multiple_routers_from_same_trigger():
assert execution_order.index("anemia_analysis") > execution_order.index(
"anemia_router"
)
def test_flow_name():
    """A `name` class attribute on a Flow subclass is exposed on instances."""

    class NamedFlow(Flow):
        name = "MyFlow"

        @start()
        def start(self):
            return "Hello, world!"

    instance = NamedFlow()
    assert instance.name == "MyFlow"

View File

@@ -0,0 +1,142 @@
from crewai import Agent, Task, LLM
def test_langdb_basic_integration_example():
    """Exercise the minimal LangDB + CrewAI setup shown in the documentation."""

    class MockLangDB:
        @staticmethod
        def init(**kwargs):
            pass

    # The docs call LangDB.init() once before any agents exist; the mock stands in.
    MockLangDB.init()

    llm = LLM(model="gpt-4o", temperature=0.7)

    agent = Agent(
        role="Senior Research Analyst",
        goal="Conduct comprehensive research on assigned topics",
        backstory="You are an expert researcher with deep analytical skills.",
        llm=llm,
        verbose=True,
    )

    assert agent.role == "Senior Research Analyst"
    assert agent.goal == "Conduct comprehensive research on assigned topics"
    assert agent.llm == llm
def test_langdb_metadata_configuration_example():
    """The docs' metadata example passes the expected keys to LangDB.init."""

    class MockLangDB:
        @staticmethod
        def init(metadata=None, **kwargs):
            # The documented call must supply trace metadata with these keys.
            assert metadata is not None
            assert "environment" in metadata
            assert "crew_type" in metadata

    sample_metadata = {
        "environment": "production",
        "crew_type": "research_workflow",
        "user_id": "user_123",
    }
    MockLangDB.init(metadata=sample_metadata)
def test_langdb_cost_tracking_example():
    """The docs' cost-tracking example enables tracking and budget alerts."""

    class MockLangDB:
        @staticmethod
        def init(cost_tracking=None, budget_alerts=None, **kwargs):
            # The documented call must turn on tracking and provide alert limits.
            assert cost_tracking is True
            assert budget_alerts is not None
            assert "daily_limit" in budget_alerts
            assert "alert_threshold" in budget_alerts

    alerts = {"daily_limit": 100.0, "alert_threshold": 0.8}
    MockLangDB.init(cost_tracking=True, budget_alerts=alerts)
def test_langdb_security_configuration_example():
    """The docs' security example passes the expected security flags."""

    class MockLangDB:
        @staticmethod
        def init(security_config=None, **kwargs):
            # The documented call must include each advertised security control.
            assert security_config is not None
            assert "pii_detection" in security_config
            assert "content_filtering" in security_config
            assert "audit_logging" in security_config

    security = {
        "pii_detection": True,
        "content_filtering": True,
        "audit_logging": True,
        "data_retention_days": 90,
    }
    MockLangDB.init(security_config=security)
def test_langdb_environment_specific_setup():
    """The multi-environment setup example configures LangDB per environment.

    The documentation shows distinct project IDs, sampling rates, and cost
    tracking flags for production/staging/development. The previous mock only
    asserted the arguments were non-None, so a wrong sampling rate or flag
    would still pass; record every call and verify the exact combination for
    each environment instead. The mock class is also hoisted out of the loop
    so it is defined once.
    """
    recorded = {}

    class MockLangDB:
        @staticmethod
        def init(project_id=None, sampling_rate=None, cost_tracking=None, **kwargs):
            assert project_id is not None
            assert sampling_rate is not None
            assert cost_tracking is not None
            recorded[project_id] = (sampling_rate, cost_tracking)

    environments = ["production", "staging", "development"]
    for env in environments:
        if env == "production":
            MockLangDB.init(
                project_id="prod_project_id",
                sampling_rate=1.0,
                cost_tracking=True,
            )
        elif env == "staging":
            MockLangDB.init(
                project_id="staging_project_id",
                sampling_rate=0.5,
                cost_tracking=False,
            )
        else:
            MockLangDB.init(
                project_id="dev_project_id",
                sampling_rate=0.1,
                cost_tracking=False,
            )

    # Exact per-environment expectations from the documentation example.
    assert recorded == {
        "prod_project_id": (1.0, True),
        "staging_project_id": (0.5, False),
        "dev_project_id": (0.1, False),
    }
def test_langdb_task_with_metadata():
    """Build the documentation's research task and verify its fields."""
    llm = LLM(model="gpt-4o")
    agent = Agent(
        role="Senior Research Analyst",
        goal="Conduct research",
        backstory="Expert researcher",
        llm=llm,
    )
    task = Task(
        description="Research the latest AI trends",
        expected_output="Comprehensive research report",
        agent=agent,
    )

    assert task.description == "Research the latest AI trends"
    assert task.expected_output == "Comprehensive research report"
    assert task.agent == agent

View File

@@ -0,0 +1,141 @@
"""Test for the LangDB documentation examples."""
from crewai import Agent, Task, LLM
def test_langdb_basic_integration_example():
    """Smoke-test the minimal LangDB + CrewAI wiring from the documentation."""

    class MockLangDB:
        @staticmethod
        def init(**kwargs):
            pass

    # The docs require a single LangDB.init() call before agents are created.
    MockLangDB.init()

    llm = LLM(model="gpt-4o", temperature=0.7)
    agent = Agent(
        role="Senior Research Analyst",
        goal="Conduct comprehensive research on assigned topics",
        backstory="You are an expert researcher with deep analytical skills.",
        llm=llm,
    )

    assert agent.role == "Senior Research Analyst"
    assert agent.goal == "Conduct comprehensive research on assigned topics"
    assert agent.llm == llm
def test_langdb_metadata_configuration_example():
    """The documented metadata example supplies the expected keys to init."""

    class MockLangDB:
        @staticmethod
        def init(metadata=None, **kwargs):
            # Trace metadata from the docs must carry these keys.
            assert metadata is not None
            assert "environment" in metadata
            assert "crew_type" in metadata

    MockLangDB.init(
        metadata={
            "environment": "production",
            "crew_type": "research_workflow",
            "user_id": "user_123",
        },
    )
def test_langdb_cost_tracking_example():
    """The documented cost-tracking example enables tracking with alerts."""

    class MockLangDB:
        @staticmethod
        def init(cost_tracking=None, budget_alerts=None, **kwargs):
            # Tracking must be on and the alert configuration complete.
            assert cost_tracking is True
            assert budget_alerts is not None
            assert "daily_limit" in budget_alerts
            assert "alert_threshold" in budget_alerts

    budget = {"daily_limit": 100.0, "alert_threshold": 0.8}
    MockLangDB.init(cost_tracking=True, budget_alerts=budget)
def test_langdb_security_configuration_example():
    """The documented security example includes every advertised control."""

    class MockLangDB:
        @staticmethod
        def init(security_config=None, **kwargs):
            assert security_config is not None
            assert "pii_detection" in security_config
            assert "content_filtering" in security_config
            assert "audit_logging" in security_config

    config = {
        "pii_detection": True,
        "content_filtering": True,
        "audit_logging": True,
        "data_retention_days": 90,
    }
    MockLangDB.init(security_config=config)
def test_langdb_environment_specific_setup():
    """The multi-environment example configures LangDB per environment.

    The documentation shows distinct project IDs, sampling rates, and cost
    tracking flags for production/staging/development. The previous mock only
    checked that the arguments were non-None, so incorrect values would still
    pass; record each call and compare the exact configuration instead. The
    mock is defined once instead of being recreated on every loop iteration.
    """
    seen = {}

    class MockLangDB:
        @staticmethod
        def init(project_id=None, sampling_rate=None, cost_tracking=None, **kwargs):
            assert project_id is not None
            assert sampling_rate is not None
            assert cost_tracking is not None
            seen[project_id] = (sampling_rate, cost_tracking)

    for env in ["production", "staging", "development"]:
        if env == "production":
            MockLangDB.init(
                project_id="prod_project_id",
                sampling_rate=1.0,
                cost_tracking=True,
            )
        elif env == "staging":
            MockLangDB.init(
                project_id="staging_project_id",
                sampling_rate=0.5,
                cost_tracking=False,
            )
        else:
            MockLangDB.init(
                project_id="dev_project_id",
                sampling_rate=0.1,
                cost_tracking=False,
            )

    # Exact per-environment expectations from the documentation example.
    assert seen == {
        "prod_project_id": (1.0, True),
        "staging_project_id": (0.5, False),
        "dev_project_id": (0.1, False),
    }
def test_langdb_task_with_metadata():
    """Create the documentation's research task and check its attributes."""
    model = LLM(model="gpt-4o")
    researcher = Agent(
        role="Senior Research Analyst",
        goal="Conduct research",
        backstory="Expert researcher",
        llm=model,
    )
    research_task = Task(
        description="Research the latest AI trends",
        expected_output="Comprehensive research report",
        agent=researcher,
    )

    assert research_task.description == "Research the latest AI trends"
    assert research_task.expected_output == "Comprehensive research report"
    assert research_task.agent == researcher

View File

@@ -270,3 +270,20 @@ def test_search_method_with_memory_client(mem0_storage_with_memory_client_using_
assert len(results) == 2
assert results[0]["content"] == "Result 1"
def test_mem0_storage_default_infer_value(mock_mem0_memory_client):
    """Short-term Mem0Storage defaults `infer` to True when unset in config."""
    memory_config = {
        "provider": "mem0",
        "config": {
            "user_id": "test_user",
            "api_key": "ABCDEFGH",
        },
    }
    # Patch MemoryClient construction so no real Mem0 client is created.
    with patch.object(MemoryClient, "__new__", return_value=mock_mem0_memory_client):
        crew = MockCrew(memory_config=memory_config)
        mem0_storage = Mem0Storage(type="short_term", crew=crew)
        assert mem0_storage.infer is True

View File

@@ -64,7 +64,8 @@ def base_agent():
llm="gpt-4o-mini",
goal="Just say hi",
backstory="You are a helpful assistant that just says hi",
)
)
@pytest.fixture(scope="module")
def base_task(base_agent):
@@ -74,6 +75,7 @@ def base_task(base_agent):
agent=base_agent,
)
event_listener = EventListener()
@@ -448,6 +450,27 @@ def test_flow_emits_start_event():
assert received_events[0].type == "flow_started"
def test_flow_name_emitted_to_event_bus():
    """FlowStartedEvent carries the custom `name` set on the Flow subclass."""
    captured = []

    class MyFlowClass(Flow):
        name = "PRODUCTION_FLOW"

        @start()
        def start(self):
            return "Hello, world!"

    @crewai_event_bus.on(FlowStartedEvent)
    def handle_flow_start(source, event):
        captured.append(event)

    MyFlowClass().kickoff()

    assert len(captured) == 1
    assert captured[0].flow_name == "PRODUCTION_FLOW"
def test_flow_emits_finish_event():
received_events = []
@@ -756,6 +779,7 @@ def test_streaming_empty_response_handling():
received_chunks = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
received_chunks.append(event.chunk)
@@ -793,6 +817,7 @@ def test_streaming_empty_response_handling():
# Restore the original method
llm.call = original_call
@pytest.mark.vcr(filter_headers=["authorization"])
def test_stream_llm_emits_event_with_task_and_agent_info():
completed_event = []
@@ -801,6 +826,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
stream_event = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
failed_event.append(event)
@@ -827,7 +853,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
description="Just say hi",
expected_output="hi",
llm=LLM(model="gpt-4o-mini", stream=True),
agent=agent
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
@@ -855,6 +881,7 @@ def test_stream_llm_emits_event_with_task_and_agent_info():
assert set(all_task_id) == {task.id}
assert set(all_task_name) == {task.name}
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
completed_event = []
@@ -863,6 +890,7 @@ def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
stream_event = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
failed_event.append(event)
@@ -904,6 +932,7 @@ def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
assert set(all_task_id) == {base_task.id}
assert set(all_task_name) == {base_task.name}
@pytest.mark.vcr(filter_headers=["authorization"])
def test_llm_emits_event_with_lite_agent():
completed_event = []
@@ -912,6 +941,7 @@ def test_llm_emits_event_with_lite_agent():
stream_event = []
with crewai_event_bus.scoped_handlers():
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
failed_event.append(event)
@@ -936,7 +966,6 @@ def test_llm_emits_event_with_lite_agent():
)
agent.kickoff(messages=[{"role": "user", "content": "say hi!"}])
assert len(completed_event) == 2
assert len(failed_event) == 0
assert len(started_event) == 2