mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-08 07:38:29 +00:00
Compare commits
6 Commits
1.0.0b3
...
copilot/fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d88ed3ef9 | ||
|
|
09226e34b2 | ||
|
|
67398db656 | ||
|
|
203d5fd211 | ||
|
|
c4b1f824a1 | ||
|
|
043bbd73cd |
249
docs/encrypted_communication.md
Normal file
249
docs/encrypted_communication.md
Normal file
@@ -0,0 +1,249 @@
|
||||
# Encrypted Agent Communication
|
||||
|
||||
CrewAI now supports encrypted agent-to-agent communication to ensure secure information exchange between agents in multi-agent workflows.
|
||||
|
||||
## Features
|
||||
|
||||
- **Fernet Encryption**: Uses industry-standard Fernet symmetric encryption
|
||||
- **Fingerprint-based Key Derivation**: Unique keys derived from agent fingerprints
|
||||
- **Secure Message Routing**: Messages can only be decrypted by intended recipients
|
||||
- **Backward Compatible**: Non-encrypted agents continue to work normally
|
||||
- **Optional**: Encryption can be enabled per-agent or per-crew
|
||||
|
||||
## Quick Start
|
||||
|
||||
### 1. Enable Encryption for Agents
|
||||
|
||||
```python
|
||||
from crewai import Agent
|
||||
from crewai.security import SecurityConfig
|
||||
|
||||
# Create agents with encryption enabled
|
||||
researcher = Agent(
|
||||
role="Research Analyst",
|
||||
goal="Conduct research and analysis",
|
||||
backstory="Expert researcher with deep analytical skills",
|
||||
security_config=SecurityConfig(encrypted_communication=True)
|
||||
)
|
||||
|
||||
writer = Agent(
|
||||
role="Content Writer",
|
||||
goal="Create engaging content",
|
||||
backstory="Skilled writer who creates compelling narratives",
|
||||
security_config=SecurityConfig(encrypted_communication=True)
|
||||
)
|
||||
```
|
||||
|
||||
### 2. Use Encrypted Communication in Tasks
|
||||
|
||||
```python
|
||||
from crewai import Crew, Task
|
||||
from crewai.tools.agent_tools import AskQuestionTool
|
||||
|
||||
# Create tools that support encrypted communication
|
||||
agent_tools = [
|
||||
AskQuestionTool(
|
||||
agents=[researcher, writer],
|
||||
description="Ask questions to team members with encrypted communication"
|
||||
)
|
||||
]
|
||||
|
||||
# Create tasks that will use encrypted communication
|
||||
research_task = Task(
|
||||
description="Research the latest AI trends",
|
||||
expected_output="Comprehensive research report",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
writing_task = Task(
|
||||
description="Ask the researcher about their findings and create a blog post",
|
||||
expected_output="Engaging blog post based on research",
|
||||
agent=writer,
|
||||
tools=agent_tools # These will use encrypted communication automatically
|
||||
)
|
||||
|
||||
# Run the crew - communications will be automatically encrypted
|
||||
crew = Crew(agents=[researcher, writer], tasks=[research_task, writing_task])
|
||||
result = crew.kickoff()
|
||||
```
|
||||
|
||||
## How It Works
|
||||
|
||||
### Security Architecture
|
||||
|
||||
1. **Agent Fingerprints**: Each agent gets a unique cryptographic fingerprint
|
||||
2. **Key Derivation**: Communication keys are derived from sender/recipient fingerprint pairs
|
||||
3. **Message Encryption**: Payloads are encrypted using Fernet with derived keys
|
||||
4. **Secure Routing**: Only intended recipients can decrypt messages
|
||||
|
||||
### Message Flow
|
||||
|
||||
```
|
||||
Sender Agent → Encrypt Message → Encrypted Payload → Recipient Agent → Decrypt Message
|
||||
↓ ↓ ↓ ↓ ↓
|
||||
Fingerprint Derive Key EncryptedMessage Derive Key Original Message
|
||||
```
|
||||
|
||||
### Encryption Details
|
||||
|
||||
- **Algorithm**: Fernet (AES 128 in CBC mode with HMAC-SHA256)
|
||||
- **Key Length**: 256-bit encryption keys
|
||||
- **Key Derivation**: SHA-256 based on sorted agent fingerprints
|
||||
- **Message Format**: JSON with encrypted payload and metadata
|
||||
|
||||
## Configuration Options
|
||||
|
||||
### SecurityConfig Parameters
|
||||
|
||||
```python
|
||||
from crewai.security import SecurityConfig
|
||||
|
||||
# Enable encryption
|
||||
security_config = SecurityConfig(encrypted_communication=True)
|
||||
|
||||
# Disable encryption (default)
|
||||
security_config = SecurityConfig(encrypted_communication=False)
|
||||
```
|
||||
|
||||
### Mixed Encryption Scenarios
|
||||
|
||||
```python
|
||||
# Some agents with encryption, others without
|
||||
encrypted_agent = Agent(
|
||||
role="Secure Agent",
|
||||
security_config=SecurityConfig(encrypted_communication=True),
|
||||
# ... other params
|
||||
)
|
||||
|
||||
plain_agent = Agent(
|
||||
role="Regular Agent",
|
||||
security_config=SecurityConfig(encrypted_communication=False),
|
||||
# ... other params
|
||||
)
|
||||
|
||||
# Agent tools automatically handle mixed scenarios:
|
||||
# - Encrypted agents → Encrypted communication
|
||||
# - Non-encrypted agents → Plain communication
|
||||
# - Mixed → Falls back to plain communication with warning
|
||||
```
|
||||
|
||||
## Security Considerations
|
||||
|
||||
### What Is Protected
|
||||
|
||||
✅ **Task descriptions and context** passed between agents
|
||||
✅ **Questions and responses** in agent-to-agent communication
|
||||
✅ **Delegation payloads** including sensitive instructions
|
||||
✅ **Agent-to-agent metadata** like sender/recipient information
|
||||
|
||||
### What Is NOT Protected
|
||||
|
||||
❌ **Agent configurations** (roles, goals, backstories)
|
||||
❌ **Task outputs** stored in crew results
|
||||
❌ **LLM API calls** to external services
|
||||
❌ **Tool executions** outside of agent communication
|
||||
|
||||
### Best Practices
|
||||
|
||||
1. **Enable encryption** for sensitive workflows
|
||||
2. **Use unique fingerprints** per deployment/environment
|
||||
3. **Monitor logs** for encryption failures or downgrades
|
||||
4. **Test mixed scenarios** with both encrypted and non-encrypted agents
|
||||
5. **Keep fingerprints secure** - they are used for key derivation
|
||||
|
||||
## Advanced Usage
|
||||
|
||||
### Direct Encryption API
|
||||
|
||||
For advanced use cases, you can use the encryption API directly:
|
||||
|
||||
```python
|
||||
from crewai.security import AgentCommunicationEncryption, Fingerprint
|
||||
|
||||
# Create encryption handlers
|
||||
sender_fp = Fingerprint()
|
||||
recipient_fp = Fingerprint()
|
||||
sender_encryption = AgentCommunicationEncryption(sender_fp)
|
||||
recipient_encryption = AgentCommunicationEncryption(recipient_fp)
|
||||
|
||||
# Encrypt a message
|
||||
message = {"task": "Analyze data", "context": "Q4 results"}
|
||||
encrypted_msg = sender_encryption.encrypt_message(
|
||||
message,
|
||||
recipient_fp,
|
||||
message_type="analysis_request"
|
||||
)
|
||||
|
||||
# Decrypt the message
|
||||
decrypted_msg = recipient_encryption.decrypt_message(encrypted_msg)
|
||||
```
|
||||
|
||||
### Custom Fingerprints
|
||||
|
||||
```python
|
||||
# Generate deterministic fingerprints from seeds
|
||||
fp = Fingerprint.generate(seed="agent-role-environment")
|
||||
|
||||
# Use custom metadata
|
||||
fp = Fingerprint.generate(
|
||||
seed="unique-seed",
|
||||
metadata={"environment": "production", "version": "1.0"}
|
||||
)
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Common Issues
|
||||
|
||||
**Q: "Message not intended for this agent" error**
|
||||
A: This happens when an agent tries to decrypt a message meant for another agent. Check that the correct recipient agent is being used.
|
||||
|
||||
**Q: "Encryption failed, falling back to plain communication" warning**
|
||||
A: This indicates the encryption process failed and the system fell back to unencrypted communication. Check agent security configurations.
|
||||
|
||||
**Q: Mixed encrypted/non-encrypted agents not working**
|
||||
A: Ensure at least one agent has encryption enabled for the encryption to activate. If no agents have encryption, all communication will be plain text.
|
||||
|
||||
### Debug Logging
|
||||
|
||||
Enable debug logging to see encryption activities:
|
||||
|
||||
```python
|
||||
import logging
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
# Look for log messages like:
|
||||
# DEBUG:crewai.security.encrypted_communication:Encrypted message from abc12345... to def67890...
|
||||
# DEBUG:crewai.tools.agent_tools.base_agent_tools:Executing encrypted communication task...
|
||||
```
|
||||
|
||||
## Performance Impact
|
||||
|
||||
- **Encryption/Decryption**: Minimal overhead (~1-2ms per message)
|
||||
- **Key Derivation**: Cached after first use per agent pair
|
||||
- **Memory**: Small increase for encryption handlers and cached keys
|
||||
- **Network**: No additional network calls (all local encryption)
|
||||
|
||||
## Migration Guide
|
||||
|
||||
### From Non-Encrypted to Encrypted
|
||||
|
||||
1. Add `security_config` to your agent definitions
|
||||
2. No code changes required for agent tools
|
||||
3. Test with mixed encrypted/non-encrypted agents first
|
||||
4. Enable encryption for all agents in production
|
||||
|
||||
```python
|
||||
# Before
|
||||
agent = Agent(role="Analyst", goal="...", backstory="...")
|
||||
|
||||
# After
|
||||
agent = Agent(
|
||||
role="Analyst",
|
||||
goal="...",
|
||||
backstory="...",
|
||||
security_config=SecurityConfig(encrypted_communication=True)
|
||||
)
|
||||
```
|
||||
|
||||
That's it! Your agents will automatically use encrypted communication when both sender and recipient support it.
|
||||
103
examples/encrypted_communication_example.py
Normal file
103
examples/encrypted_communication_example.py
Normal file
@@ -0,0 +1,103 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Example demonstrating encrypted agent-to-agent communication in CrewAI.
|
||||
|
||||
This example shows how to:
|
||||
1. Enable encrypted communication for agents
|
||||
2. Use existing agent tools with encryption
|
||||
3. Verify that communication is encrypted between agents
|
||||
"""
|
||||
|
||||
from crewai import Agent, Crew, Task
|
||||
from crewai.security import SecurityConfig
|
||||
from crewai.tools.agent_tools.ask_question_tool import AskQuestionTool
|
||||
|
||||
|
||||
def main():
|
||||
"""Demonstrate encrypted agent communication."""
|
||||
print("🔒 CrewAI Encrypted Agent Communication Example")
|
||||
print("=" * 50)
|
||||
|
||||
# Create agents with encrypted communication enabled
|
||||
print("Creating agents with encrypted communication...")
|
||||
|
||||
# Researcher agent with encryption enabled
|
||||
researcher = Agent(
|
||||
role="Senior Research Analyst",
|
||||
goal="Conduct thorough research and analysis",
|
||||
backstory="You are an expert researcher with years of experience in data analysis.",
|
||||
security_config=SecurityConfig(encrypted_communication=True),
|
||||
verbose=True
|
||||
)
|
||||
|
||||
# Writer agent with encryption enabled
|
||||
writer = Agent(
|
||||
role="Content Writer",
|
||||
goal="Create compelling content based on research",
|
||||
backstory="You are a skilled writer who transforms complex research into engaging content.",
|
||||
security_config=SecurityConfig(encrypted_communication=True),
|
||||
verbose=True
|
||||
)
|
||||
|
||||
print(f"✓ Researcher agent created with encryption: {researcher.security_config.encrypted_communication}")
|
||||
print(f"✓ Writer agent created with encryption: {writer.security_config.encrypted_communication}")
|
||||
print(f"✓ Researcher fingerprint: {researcher.security_config.fingerprint.uuid_str[:8]}...")
|
||||
print(f"✓ Writer fingerprint: {writer.security_config.fingerprint.uuid_str[:8]}...")
|
||||
|
||||
# Create agent tools - these will automatically use encryption when available
|
||||
agent_tools = [
|
||||
AskQuestionTool(
|
||||
agents=[researcher, writer],
|
||||
description="Tool for asking questions to coworkers with encrypted communication support"
|
||||
)
|
||||
]
|
||||
|
||||
print(f"✓ Agent tools created with encryption capability")
|
||||
|
||||
# Create tasks that will involve encrypted communication
|
||||
research_task = Task(
|
||||
description="Research the latest trends in artificial intelligence and machine learning",
|
||||
expected_output="A comprehensive research report on AI/ML trends",
|
||||
agent=researcher
|
||||
)
|
||||
|
||||
writing_task = Task(
|
||||
description="Ask the researcher about their findings and write a blog post",
|
||||
expected_output="An engaging blog post about AI trends",
|
||||
agent=writer,
|
||||
tools=agent_tools
|
||||
)
|
||||
|
||||
# Create crew with both agents
|
||||
crew = Crew(
|
||||
agents=[researcher, writer],
|
||||
tasks=[research_task, writing_task],
|
||||
verbose=True
|
||||
)
|
||||
|
||||
print("\n🚀 Starting crew execution with encrypted communication...")
|
||||
print("Note: Agent communications will be automatically encrypted!")
|
||||
|
||||
# Execute the crew - agent tools will use encryption automatically
|
||||
try:
|
||||
result = crew.kickoff()
|
||||
print("\n✅ Crew execution completed successfully!")
|
||||
print("=" * 50)
|
||||
print("RESULT:")
|
||||
print(result)
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n❌ Execution failed: {e}")
|
||||
print("This is expected in a demo environment without proper LLM configuration")
|
||||
|
||||
print("\n🔍 Key Features Demonstrated:")
|
||||
print("- Agents created with SecurityConfig(encrypted_communication=True)")
|
||||
print("- Unique fingerprints generated for each agent")
|
||||
print("- Agent tools automatically detect encryption capability")
|
||||
print("- Communication payloads encrypted using Fernet symmetric encryption")
|
||||
print("- Keys derived from agent fingerprints for secure communication")
|
||||
print("- Backward compatibility maintained - non-encrypted agents still work")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,3 +1,4 @@
|
||||
import logging
|
||||
import uuid
|
||||
from abc import ABC, abstractmethod
|
||||
from copy import copy as shallow_copy
|
||||
@@ -30,6 +31,8 @@ from crewai.utilities.string_utils import interpolate_only
|
||||
|
||||
T = TypeVar("T", bound="BaseAgent")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseAgent(ABC, BaseModel):
|
||||
"""Abstract Base Class for all third party agents compatible with CrewAI.
|
||||
@@ -217,6 +220,12 @@ class BaseAgent(ABC, BaseModel):
|
||||
if self.security_config is None:
|
||||
self.security_config = SecurityConfig()
|
||||
|
||||
# Log encryption status for agent initialization
|
||||
if hasattr(self.security_config, 'encrypted_communication') and self.security_config.encrypted_communication:
|
||||
logger.info(f"Agent '{self.role}' initialized with encrypted communication enabled (fingerprint: {self.security_config.fingerprint.uuid_str[:8]}...)")
|
||||
else:
|
||||
logger.debug(f"Agent '{self.role}' initialized with encrypted communication disabled")
|
||||
|
||||
return self
|
||||
|
||||
@field_validator("id", mode="before")
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
import warnings
|
||||
@@ -89,6 +90,8 @@ from crewai.utilities.training_handler import CrewTrainingHandler
|
||||
|
||||
warnings.filterwarnings("ignore", category=SyntaxWarning, module="pysbd")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Crew(FlowTrackable, BaseModel):
|
||||
"""
|
||||
@@ -381,6 +384,20 @@ class Crew(FlowTrackable, BaseModel):
|
||||
self._setup_from_config()
|
||||
|
||||
if self.agents:
|
||||
# Count agents with encryption enabled
|
||||
encryption_enabled_agents = [
|
||||
agent for agent in self.agents
|
||||
if hasattr(agent, 'security_config')
|
||||
and agent.security_config
|
||||
and getattr(agent.security_config, 'encrypted_communication', False)
|
||||
]
|
||||
|
||||
if encryption_enabled_agents:
|
||||
logger.info(f"Crew initialized with {len(encryption_enabled_agents)} agent(s) having encrypted communication enabled: {[agent.role for agent in encryption_enabled_agents]}")
|
||||
logger.info("Agent-to-agent communication will be automatically encrypted when using delegation tools")
|
||||
else:
|
||||
logger.debug(f"Crew initialized with {len(self.agents)} agent(s) - encrypted communication disabled for all agents")
|
||||
|
||||
for agent in self.agents:
|
||||
if self.cache:
|
||||
agent.set_cache_handler(self._cache_handler)
|
||||
|
||||
@@ -4,10 +4,15 @@ CrewAI security module.
|
||||
This module provides security-related functionality for CrewAI, including:
|
||||
- Fingerprinting for component identity and tracking
|
||||
- Security configuration for controlling access and permissions
|
||||
- Encrypted agent-to-agent communication
|
||||
- Future: authentication, scoping, and delegation mechanisms
|
||||
"""
|
||||
|
||||
from crewai.security.fingerprint import Fingerprint
|
||||
from crewai.security.security_config import SecurityConfig
|
||||
from crewai.security.encrypted_communication import (
|
||||
AgentCommunicationEncryption,
|
||||
EncryptedMessage
|
||||
)
|
||||
|
||||
__all__ = ["Fingerprint", "SecurityConfig"]
|
||||
__all__ = ["Fingerprint", "SecurityConfig", "AgentCommunicationEncryption", "EncryptedMessage"]
|
||||
|
||||
266
src/crewai/security/encrypted_communication.py
Normal file
266
src/crewai/security/encrypted_communication.py
Normal file
@@ -0,0 +1,266 @@
|
||||
"""
|
||||
Encrypted Communication Module
|
||||
|
||||
This module provides functionality for encrypting and decrypting agent-to-agent
|
||||
communication in CrewAI. It leverages existing security infrastructure including
|
||||
agent fingerprints and Fernet encryption.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Dict, Optional, Union
|
||||
|
||||
from cryptography.fernet import Fernet
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from crewai.security.fingerprint import Fingerprint
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.encryption_events import (
|
||||
EncryptionStartedEvent,
|
||||
EncryptionCompletedEvent,
|
||||
DecryptionStartedEvent,
|
||||
DecryptionCompletedEvent,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EncryptedMessage(BaseModel):
|
||||
"""
|
||||
Represents an encrypted message between agents.
|
||||
|
||||
Attributes:
|
||||
encrypted_payload (str): The encrypted message content
|
||||
sender_fingerprint (str): The fingerprint of the sending agent
|
||||
recipient_fingerprint (str): The fingerprint of the intended recipient
|
||||
message_type (str): The type of message (task, question, response, etc.)
|
||||
"""
|
||||
encrypted_payload: str = Field(..., description="The encrypted message content")
|
||||
sender_fingerprint: str = Field(..., description="Sender agent's fingerprint")
|
||||
recipient_fingerprint: str = Field(..., description="Recipient agent's fingerprint")
|
||||
message_type: str = Field(default="communication", description="Type of message")
|
||||
|
||||
|
||||
class AgentCommunicationEncryption:
|
||||
"""
|
||||
Handles encryption and decryption of agent-to-agent communication.
|
||||
|
||||
Uses Fernet symmetric encryption with keys derived from agent fingerprints.
|
||||
Provides methods to encrypt and decrypt communication payloads.
|
||||
"""
|
||||
|
||||
def __init__(self, agent_fingerprint: Fingerprint, agent=None):
|
||||
"""
|
||||
Initialize encryption handler for an agent.
|
||||
|
||||
Args:
|
||||
agent_fingerprint (Fingerprint): The agent's unique fingerprint
|
||||
agent: The agent instance (optional, needed for events)
|
||||
"""
|
||||
self.agent_fingerprint = agent_fingerprint
|
||||
self.agent = agent
|
||||
self._encryption_keys: Dict[str, Fernet] = {}
|
||||
|
||||
def _derive_communication_key(self, sender_fp: str, recipient_fp: str) -> bytes:
|
||||
"""
|
||||
Derive a communication key from sender and recipient fingerprints.
|
||||
|
||||
Creates a deterministic key based on both agent fingerprints to ensure
|
||||
both agents can derive the same key for encrypted communication.
|
||||
|
||||
Args:
|
||||
sender_fp (str): Sender agent's fingerprint
|
||||
recipient_fp (str): Recipient agent's fingerprint
|
||||
|
||||
Returns:
|
||||
bytes: 32-byte encryption key for Fernet
|
||||
"""
|
||||
# Sort fingerprints to ensure consistent key derivation regardless of role
|
||||
fp_pair = tuple(sorted([sender_fp, recipient_fp]))
|
||||
key_material = f"crewai_comm_{fp_pair[0]}_{fp_pair[1]}".encode('utf-8')
|
||||
|
||||
# Use SHA-256 to derive a 32-byte key from the fingerprint pair
|
||||
import hashlib
|
||||
key_hash = hashlib.sha256(key_material).digest()
|
||||
|
||||
# Fernet requires base64-encoded 32-byte key
|
||||
import base64
|
||||
return base64.urlsafe_b64encode(key_hash)
|
||||
|
||||
def _get_fernet(self, sender_fp: str, recipient_fp: str) -> Fernet:
|
||||
"""
|
||||
Get or create Fernet instance for communication between two agents.
|
||||
|
||||
Args:
|
||||
sender_fp (str): Sender agent's fingerprint
|
||||
recipient_fp (str): Recipient agent's fingerprint
|
||||
|
||||
Returns:
|
||||
Fernet: Encryption instance for this agent pair
|
||||
"""
|
||||
# Create cache key from sorted fingerprints
|
||||
cache_key = "_".join(sorted([sender_fp, recipient_fp]))
|
||||
|
||||
if cache_key not in self._encryption_keys:
|
||||
key = self._derive_communication_key(sender_fp, recipient_fp)
|
||||
self._encryption_keys[cache_key] = Fernet(key)
|
||||
|
||||
return self._encryption_keys[cache_key]
|
||||
|
||||
def encrypt_message(
|
||||
self,
|
||||
message: Union[str, Dict[str, Any]],
|
||||
recipient_fingerprint: Fingerprint,
|
||||
message_type: str = "communication",
|
||||
recipient_agent=None
|
||||
) -> EncryptedMessage:
|
||||
"""
|
||||
Encrypt a message for a specific recipient agent.
|
||||
|
||||
Args:
|
||||
message (Union[str, Dict[str, Any]]): The message to encrypt
|
||||
recipient_fingerprint (Fingerprint): The recipient agent's fingerprint
|
||||
message_type (str): Type of message being sent
|
||||
recipient_agent: The recipient agent instance (optional, needed for events)
|
||||
|
||||
Returns:
|
||||
EncryptedMessage: Encrypted message container
|
||||
|
||||
Raises:
|
||||
ValueError: If encryption fails
|
||||
"""
|
||||
try:
|
||||
# Emit encryption started event if both agents are available
|
||||
if self.agent and recipient_agent:
|
||||
crewai_event_bus.emit(
|
||||
self.agent,
|
||||
EncryptionStartedEvent(
|
||||
sender_agent=self.agent,
|
||||
recipient_agent=recipient_agent,
|
||||
message_type=message_type
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"Starting encryption for {message_type} message to recipient {recipient_fingerprint.uuid_str[:8]}...")
|
||||
|
||||
# Convert message to JSON string if it's a dict
|
||||
if isinstance(message, dict):
|
||||
message_str = json.dumps(message)
|
||||
else:
|
||||
message_str = str(message)
|
||||
|
||||
# Get Fernet instance for this communication pair
|
||||
fernet = self._get_fernet(
|
||||
self.agent_fingerprint.uuid_str,
|
||||
recipient_fingerprint.uuid_str
|
||||
)
|
||||
|
||||
# Encrypt the message
|
||||
encrypted_bytes = fernet.encrypt(message_str.encode('utf-8'))
|
||||
encrypted_payload = encrypted_bytes.decode('utf-8')
|
||||
|
||||
# Emit encryption completed event if both agents are available
|
||||
if self.agent and recipient_agent:
|
||||
crewai_event_bus.emit(
|
||||
self.agent,
|
||||
EncryptionCompletedEvent(
|
||||
sender_agent=self.agent,
|
||||
recipient_agent=recipient_agent,
|
||||
message_type=message_type
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"Successfully encrypted {message_type} message from {self.agent_fingerprint.uuid_str[:8]}... to {recipient_fingerprint.uuid_str[:8]}...")
|
||||
logger.debug(f"Encrypted message from {self.agent_fingerprint.uuid_str[:8]}... to {recipient_fingerprint.uuid_str[:8]}...")
|
||||
|
||||
return EncryptedMessage(
|
||||
encrypted_payload=encrypted_payload,
|
||||
sender_fingerprint=self.agent_fingerprint.uuid_str,
|
||||
recipient_fingerprint=recipient_fingerprint.uuid_str,
|
||||
message_type=message_type
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to encrypt message: {e}")
|
||||
raise ValueError(f"Message encryption failed: {e}")
|
||||
|
||||
def decrypt_message(self, encrypted_message: EncryptedMessage) -> Union[str, Dict[str, Any]]:
|
||||
"""
|
||||
Decrypt a message intended for this agent.
|
||||
|
||||
Args:
|
||||
encrypted_message (EncryptedMessage): The encrypted message to decrypt
|
||||
|
||||
Returns:
|
||||
Union[str, Dict[str, Any]]: The decrypted message content
|
||||
|
||||
Raises:
|
||||
ValueError: If decryption fails or message is not for this agent
|
||||
"""
|
||||
try:
|
||||
# Emit decryption started event if agent is available
|
||||
if self.agent:
|
||||
crewai_event_bus.emit(
|
||||
self.agent,
|
||||
DecryptionStartedEvent(
|
||||
recipient_agent=self.agent,
|
||||
sender_fingerprint=encrypted_message.sender_fingerprint,
|
||||
message_type=encrypted_message.message_type
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"Starting decryption of {encrypted_message.message_type} message from sender {encrypted_message.sender_fingerprint[:8]}...")
|
||||
|
||||
# Verify this message is intended for this agent
|
||||
if encrypted_message.recipient_fingerprint != self.agent_fingerprint.uuid_str:
|
||||
raise ValueError(f"Message not intended for this agent. Expected {self.agent_fingerprint.uuid_str[:8]}..., got {encrypted_message.recipient_fingerprint[:8]}...")
|
||||
|
||||
# Get Fernet instance for this communication pair
|
||||
fernet = self._get_fernet(
|
||||
encrypted_message.sender_fingerprint,
|
||||
encrypted_message.recipient_fingerprint
|
||||
)
|
||||
|
||||
# Decrypt the message
|
||||
decrypted_bytes = fernet.decrypt(encrypted_message.encrypted_payload.encode('utf-8'))
|
||||
decrypted_str = decrypted_bytes.decode('utf-8')
|
||||
|
||||
# Try to parse as JSON, fallback to string
|
||||
try:
|
||||
decrypted_content = json.loads(decrypted_str)
|
||||
except json.JSONDecodeError:
|
||||
decrypted_content = decrypted_str
|
||||
|
||||
# Emit decryption completed event if agent is available
|
||||
if self.agent:
|
||||
crewai_event_bus.emit(
|
||||
self.agent,
|
||||
DecryptionCompletedEvent(
|
||||
recipient_agent=self.agent,
|
||||
sender_fingerprint=encrypted_message.sender_fingerprint,
|
||||
message_type=encrypted_message.message_type
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"Successfully decrypted {encrypted_message.message_type} message from {encrypted_message.sender_fingerprint[:8]}... to {encrypted_message.recipient_fingerprint[:8]}...")
|
||||
|
||||
return decrypted_content
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to decrypt message: {e}")
|
||||
raise ValueError(f"Message decryption failed: {e}")
|
||||
|
||||
def is_encrypted_communication(self, message: Any) -> bool:
|
||||
"""
|
||||
Check if a message is an encrypted communication.
|
||||
|
||||
Args:
|
||||
message (Any): Message to check
|
||||
|
||||
Returns:
|
||||
bool: True if message is encrypted, False otherwise
|
||||
"""
|
||||
return isinstance(message, (EncryptedMessage, dict)) and (
|
||||
hasattr(message, 'encrypted_payload') or
|
||||
(isinstance(message, dict) and 'encrypted_payload' in message)
|
||||
)
|
||||
@@ -24,12 +24,14 @@ class SecurityConfig(BaseModel):
|
||||
This class manages security settings for CrewAI agents, including:
|
||||
- Authentication credentials *TODO*
|
||||
- Identity information (agent fingerprints)
|
||||
- Encrypted agent-to-agent communication
|
||||
- Scoping rules *TODO*
|
||||
- Impersonation/delegation tokens *TODO*
|
||||
|
||||
Attributes:
|
||||
version (str): Version of the security configuration
|
||||
fingerprint (Fingerprint): The unique fingerprint automatically generated for the component
|
||||
encrypted_communication (bool): Enable encrypted agent-to-agent communication
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(
|
||||
@@ -46,6 +48,11 @@ class SecurityConfig(BaseModel):
|
||||
default_factory=Fingerprint,
|
||||
description="Unique identifier for the component"
|
||||
)
|
||||
|
||||
encrypted_communication: bool = Field(
|
||||
default=False,
|
||||
description="Enable encrypted communication between agents"
|
||||
)
|
||||
|
||||
def is_compatible(self, min_version: str) -> bool:
|
||||
"""
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities import I18N
|
||||
import logging
|
||||
|
||||
from .ask_question_tool import AskQuestionTool
|
||||
from .delegate_work_tool import DelegateWorkTool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AgentTools:
|
||||
"""Manager class for agent-related tools"""
|
||||
@@ -16,6 +19,19 @@ class AgentTools:
|
||||
def tools(self) -> list[BaseTool]:
|
||||
"""Get all available agent tools"""
|
||||
coworkers = ", ".join([f"{agent.role}" for agent in self.agents])
|
||||
|
||||
# Check encryption capabilities of agents
|
||||
encryption_enabled_agents = [
|
||||
agent for agent in self.agents
|
||||
if hasattr(agent, 'security_config')
|
||||
and agent.security_config
|
||||
and getattr(agent.security_config, 'encrypted_communication', False)
|
||||
]
|
||||
|
||||
if encryption_enabled_agents:
|
||||
logger.info(f"Creating agent communication tools with encryption support for {len(encryption_enabled_agents)} agent(s): {[agent.role for agent in encryption_enabled_agents]}")
|
||||
else:
|
||||
logger.debug(f"Creating agent communication tools without encryption (no agents have encrypted_communication enabled)")
|
||||
|
||||
delegate_tool = DelegateWorkTool(
|
||||
agents=self.agents,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
from typing import Optional, Union, Dict, Any
|
||||
|
||||
from pydantic import Field
|
||||
|
||||
@@ -7,18 +7,173 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.task import Task
|
||||
from crewai.tools.base_tool import BaseTool
|
||||
from crewai.utilities import I18N
|
||||
from crewai.security import AgentCommunicationEncryption, EncryptedMessage
|
||||
from crewai.utilities.events.crewai_event_bus import crewai_event_bus
|
||||
from crewai.utilities.events.encryption_events import (
|
||||
EncryptedCommunicationStartedEvent,
|
||||
EncryptedCommunicationEstablishedEvent,
|
||||
EncryptedTaskExecutionEvent,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseAgentTool(BaseTool):
|
||||
"""Base class for agent-related tools"""
|
||||
"""Base class for agent-related tools with optional encrypted communication support"""
|
||||
|
||||
agents: list[BaseAgent] = Field(description="List of available agents")
|
||||
i18n: I18N = Field(
|
||||
default_factory=I18N, description="Internationalization settings"
|
||||
)
|
||||
|
||||
def __init__(self, **data):
|
||||
"""Initialize BaseAgentTool with optional encryption support."""
|
||||
super().__init__(**data)
|
||||
self._encryption_handler: Optional[AgentCommunicationEncryption] = None
|
||||
|
||||
@property
|
||||
def encryption_enabled(self) -> bool:
|
||||
"""Check if encryption is enabled for agent communication."""
|
||||
# Check if any agent has encryption enabled
|
||||
return any(
|
||||
hasattr(agent, 'security_config') and
|
||||
agent.security_config and
|
||||
getattr(agent.security_config, 'encrypted_communication', False)
|
||||
for agent in self.agents
|
||||
)
|
||||
|
||||
def _get_encryption_handler(self, sender_agent: BaseAgent) -> Optional[AgentCommunicationEncryption]:
|
||||
"""Get encryption handler for a specific agent."""
|
||||
if not hasattr(sender_agent, 'security_config') or not sender_agent.security_config:
|
||||
return None
|
||||
|
||||
if not getattr(sender_agent.security_config, 'encrypted_communication', False):
|
||||
return None
|
||||
|
||||
# Create encryption handler if it doesn't exist, passing the agent instance
|
||||
if self._encryption_handler is None:
|
||||
self._encryption_handler = AgentCommunicationEncryption(
|
||||
sender_agent.security_config.fingerprint,
|
||||
agent=sender_agent
|
||||
)
|
||||
|
||||
return self._encryption_handler
|
||||
|
||||
def _prepare_communication_payload(
|
||||
self,
|
||||
sender_agent: BaseAgent,
|
||||
recipient_agent: BaseAgent,
|
||||
task: str,
|
||||
context: Optional[str] = None
|
||||
) -> Union[Dict[str, Any], EncryptedMessage]:
|
||||
"""
|
||||
Prepare communication payload, with optional encryption.
|
||||
|
||||
Args:
|
||||
sender_agent: The agent sending the communication
|
||||
recipient_agent: The agent receiving the communication
|
||||
task: The task or question to communicate
|
||||
context: Optional context for the communication
|
||||
|
||||
Returns:
|
||||
Union[Dict[str, Any], EncryptedMessage]: Plain or encrypted message
|
||||
"""
|
||||
# Prepare the base message
|
||||
message_payload = {
|
||||
"task": task,
|
||||
"context": context or "",
|
||||
"sender_role": getattr(sender_agent, 'role', 'unknown'),
|
||||
"message_type": "agent_communication"
|
||||
}
|
||||
|
||||
# Check if encryption should be used
|
||||
encryption_handler = self._get_encryption_handler(sender_agent)
|
||||
if encryption_handler and hasattr(recipient_agent, 'security_config') and recipient_agent.security_config:
|
||||
try:
|
||||
# Emit communication started event
|
||||
crewai_event_bus.emit(
|
||||
sender_agent,
|
||||
EncryptedCommunicationStartedEvent(
|
||||
sender_agent=sender_agent,
|
||||
recipient_agent=recipient_agent
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"Starting encrypted communication from '{sender_agent.role}' to '{recipient_agent.role}'")
|
||||
# Encrypt the message for the recipient
|
||||
encrypted_msg = encryption_handler.encrypt_message(
|
||||
message_payload,
|
||||
recipient_agent.security_config.fingerprint,
|
||||
message_type="agent_communication",
|
||||
recipient_agent=recipient_agent
|
||||
)
|
||||
|
||||
# Emit communication established event
|
||||
crewai_event_bus.emit(
|
||||
sender_agent,
|
||||
EncryptedCommunicationEstablishedEvent(
|
||||
sender_agent=sender_agent,
|
||||
recipient_agent=recipient_agent
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"Encrypted communication established between '{sender_agent.role}' and '{recipient_agent.role}'")
|
||||
logger.debug(f"Encrypted communication from {sender_agent.role} to {recipient_agent.role}")
|
||||
return encrypted_msg
|
||||
except Exception as e:
|
||||
logger.warning(f"Encryption failed, falling back to plain communication: {e}")
|
||||
|
||||
return message_payload
|
||||
|
||||
def _process_received_communication(
|
||||
self,
|
||||
recipient_agent: BaseAgent,
|
||||
message: Union[str, Dict[str, Any], EncryptedMessage]
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
"""
|
||||
Process received communication, with optional decryption.
|
||||
|
||||
Args:
|
||||
recipient_agent: The agent receiving the communication
|
||||
message: The message to process (may be encrypted)
|
||||
|
||||
Returns:
|
||||
Union[str, Dict[str, Any]]: Processed message content
|
||||
"""
|
||||
# Handle encrypted messages
|
||||
if isinstance(message, EncryptedMessage) or (
|
||||
isinstance(message, dict) and 'encrypted_payload' in message
|
||||
):
|
||||
# We need an encryption handler for the recipient agent
|
||||
recipient_encryption_handler = None
|
||||
if hasattr(recipient_agent, 'security_config') and recipient_agent.security_config:
|
||||
if getattr(recipient_agent.security_config, 'encrypted_communication', False):
|
||||
recipient_encryption_handler = AgentCommunicationEncryption(
|
||||
recipient_agent.security_config.fingerprint,
|
||||
agent=recipient_agent
|
||||
)
|
||||
|
||||
if recipient_encryption_handler:
|
||||
try:
|
||||
logger.info(f"Starting decryption of received communication for '{recipient_agent.role}'")
|
||||
# Convert dict to EncryptedMessage if needed
|
||||
if isinstance(message, dict):
|
||||
message = EncryptedMessage(**message)
|
||||
|
||||
decrypted = recipient_encryption_handler.decrypt_message(message)
|
||||
logger.info(f"Successfully decrypted communication for '{recipient_agent.role}'")
|
||||
logger.debug(f"Decrypted communication for {recipient_agent.role}")
|
||||
return decrypted
|
||||
except Exception as e:
|
||||
logger.error(f"Decryption failed for {recipient_agent.role}: {e}")
|
||||
raise ValueError(f"Failed to decrypt communication: {e}")
|
||||
else:
|
||||
logger.warning(f"Received encrypted message but {recipient_agent.role} has no decryption capability")
|
||||
raise ValueError("Received encrypted message but agent cannot decrypt it")
|
||||
|
||||
# Return message as-is for plain communication
|
||||
return message
|
||||
|
||||
def sanitize_agent_name(self, name: str) -> str:
|
||||
"""
|
||||
Sanitize agent role name by normalizing whitespace and setting to lowercase.
|
||||
@@ -54,6 +209,7 @@ class BaseAgentTool(BaseTool):
|
||||
) -> str:
|
||||
"""
|
||||
Execute delegation to an agent with case-insensitive and whitespace-tolerant matching.
|
||||
Supports both encrypted and non-encrypted communication based on agent configuration.
|
||||
|
||||
Args:
|
||||
agent_name: Name/role of the agent to delegate to (case-insensitive)
|
||||
@@ -106,19 +262,63 @@ class BaseAgentTool(BaseTool):
|
||||
error=f"No agent found with role '{sanitized_name}'"
|
||||
)
|
||||
|
||||
agent = agent[0]
|
||||
target_agent = agent[0]
|
||||
|
||||
# Determine sender agent (first agent with security config, or first agent as fallback)
|
||||
sender_agent = None
|
||||
for a in self.agents:
|
||||
if hasattr(a, 'security_config') and a.security_config:
|
||||
sender_agent = a
|
||||
break
|
||||
if not sender_agent:
|
||||
sender_agent = self.agents[0] if self.agents else target_agent
|
||||
|
||||
try:
|
||||
# Prepare communication with optional encryption
|
||||
communication_payload = self._prepare_communication_payload(
|
||||
sender_agent=sender_agent,
|
||||
recipient_agent=target_agent,
|
||||
task=task,
|
||||
context=context
|
||||
)
|
||||
|
||||
# Create task for execution
|
||||
task_with_assigned_agent = Task(
|
||||
description=task,
|
||||
agent=agent,
|
||||
expected_output=agent.i18n.slice("manager_request"),
|
||||
i18n=agent.i18n,
|
||||
agent=target_agent,
|
||||
expected_output=target_agent.i18n.slice("manager_request"),
|
||||
i18n=target_agent.i18n,
|
||||
)
|
||||
logger.debug(f"Created task for agent '{self.sanitize_agent_name(agent.role)}': {task}")
|
||||
return agent.execute_task(task_with_assigned_agent, context)
|
||||
|
||||
# Execute with processed communication context
|
||||
if isinstance(communication_payload, EncryptedMessage):
|
||||
# Emit encrypted task execution event
|
||||
crewai_event_bus.emit(
|
||||
target_agent,
|
||||
EncryptedTaskExecutionEvent(
|
||||
agent=target_agent
|
||||
)
|
||||
)
|
||||
|
||||
logger.info(f"Executing encrypted communication task for agent '{self.sanitize_agent_name(target_agent.role)}'")
|
||||
logger.debug(f"Executing encrypted communication task for agent '{self.sanitize_agent_name(target_agent.role)}'")
|
||||
# For encrypted messages, pass the encrypted payload as additional context
|
||||
# The target agent will need to handle decryption during execution
|
||||
enhanced_context = f"ENCRYPTED_COMMUNICATION: {communication_payload.model_dump_json()}"
|
||||
if context:
|
||||
enhanced_context += f"\nADDITIONAL_CONTEXT: {context}"
|
||||
result = target_agent.execute_task(task_with_assigned_agent, enhanced_context)
|
||||
else:
|
||||
logger.info(f"Executing plain communication task for agent '{self.sanitize_agent_name(target_agent.role)}'")
|
||||
logger.debug(f"Executing plain communication task for agent '{self.sanitize_agent_name(target_agent.role)}'")
|
||||
result = target_agent.execute_task(task_with_assigned_agent, context)
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
# Handle task creation or execution errors
|
||||
logger.error(f"Task execution failed for agent '{self.sanitize_agent_name(target_agent.role)}': {e}")
|
||||
return self.i18n.errors("agent_tool_execution_error").format(
|
||||
agent_role=self.sanitize_agent_name(agent.role),
|
||||
agent_role=self.sanitize_agent_name(target_agent.role),
|
||||
error=str(e)
|
||||
)
|
||||
|
||||
165
src/crewai/utilities/events/encryption_events.py
Normal file
165
src/crewai/utilities/events/encryption_events.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""
|
||||
Encryption events for agent-to-agent communication
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from .base_events import BaseEvent
|
||||
|
||||
|
||||
class EncryptionStartedEvent(BaseEvent):
|
||||
"""Event emitted when agent-to-agent encryption starts"""
|
||||
|
||||
sender_agent: BaseAgent
|
||||
recipient_agent: BaseAgent
|
||||
message_type: str = "agent_communication"
|
||||
type: str = "encryption_started"
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
def __init__(self, **data):
|
||||
super().__init__(**data)
|
||||
# Set fingerprint data from the sender agent
|
||||
if hasattr(self.sender_agent, "fingerprint") and self.sender_agent.fingerprint:
|
||||
self.source_fingerprint = self.sender_agent.fingerprint.uuid_str
|
||||
self.source_type = "agent"
|
||||
if (
|
||||
hasattr(self.sender_agent.fingerprint, "metadata")
|
||||
and self.sender_agent.fingerprint.metadata
|
||||
):
|
||||
self.fingerprint_metadata = self.sender_agent.fingerprint.metadata
|
||||
|
||||
|
||||
class EncryptionCompletedEvent(BaseEvent):
|
||||
"""Event emitted when agent-to-agent encryption completes successfully"""
|
||||
|
||||
sender_agent: BaseAgent
|
||||
recipient_agent: BaseAgent
|
||||
message_type: str = "agent_communication"
|
||||
type: str = "encryption_completed"
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
def __init__(self, **data):
|
||||
super().__init__(**data)
|
||||
# Set fingerprint data from the sender agent
|
||||
if hasattr(self.sender_agent, "fingerprint") and self.sender_agent.fingerprint:
|
||||
self.source_fingerprint = self.sender_agent.fingerprint.uuid_str
|
||||
self.source_type = "agent"
|
||||
if (
|
||||
hasattr(self.sender_agent.fingerprint, "metadata")
|
||||
and self.sender_agent.fingerprint.metadata
|
||||
):
|
||||
self.fingerprint_metadata = self.sender_agent.fingerprint.metadata
|
||||
|
||||
|
||||
class DecryptionStartedEvent(BaseEvent):
|
||||
"""Event emitted when agent-to-agent decryption starts"""
|
||||
|
||||
recipient_agent: BaseAgent
|
||||
sender_fingerprint: str
|
||||
message_type: str = "agent_communication"
|
||||
type: str = "decryption_started"
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
def __init__(self, **data):
|
||||
super().__init__(**data)
|
||||
# Set fingerprint data from the recipient agent
|
||||
if hasattr(self.recipient_agent, "fingerprint") and self.recipient_agent.fingerprint:
|
||||
self.source_fingerprint = self.recipient_agent.fingerprint.uuid_str
|
||||
self.source_type = "agent"
|
||||
if (
|
||||
hasattr(self.recipient_agent.fingerprint, "metadata")
|
||||
and self.recipient_agent.fingerprint.metadata
|
||||
):
|
||||
self.fingerprint_metadata = self.recipient_agent.fingerprint.metadata
|
||||
|
||||
|
||||
class DecryptionCompletedEvent(BaseEvent):
|
||||
"""Event emitted when agent-to-agent decryption completes successfully"""
|
||||
|
||||
recipient_agent: BaseAgent
|
||||
sender_fingerprint: str
|
||||
message_type: str = "agent_communication"
|
||||
type: str = "decryption_completed"
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
def __init__(self, **data):
|
||||
super().__init__(**data)
|
||||
# Set fingerprint data from the recipient agent
|
||||
if hasattr(self.recipient_agent, "fingerprint") and self.recipient_agent.fingerprint:
|
||||
self.source_fingerprint = self.recipient_agent.fingerprint.uuid_str
|
||||
self.source_type = "agent"
|
||||
if (
|
||||
hasattr(self.recipient_agent.fingerprint, "metadata")
|
||||
and self.recipient_agent.fingerprint.metadata
|
||||
):
|
||||
self.fingerprint_metadata = self.recipient_agent.fingerprint.metadata
|
||||
|
||||
|
||||
class EncryptedCommunicationStartedEvent(BaseEvent):
|
||||
"""Event emitted when encrypted communication between agents begins"""
|
||||
|
||||
sender_agent: BaseAgent
|
||||
recipient_agent: BaseAgent
|
||||
type: str = "encrypted_communication_started"
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
def __init__(self, **data):
|
||||
super().__init__(**data)
|
||||
# Set fingerprint data from the sender agent
|
||||
if hasattr(self.sender_agent, "fingerprint") and self.sender_agent.fingerprint:
|
||||
self.source_fingerprint = self.sender_agent.fingerprint.uuid_str
|
||||
self.source_type = "agent"
|
||||
if (
|
||||
hasattr(self.sender_agent.fingerprint, "metadata")
|
||||
and self.sender_agent.fingerprint.metadata
|
||||
):
|
||||
self.fingerprint_metadata = self.sender_agent.fingerprint.metadata
|
||||
|
||||
|
||||
class EncryptedCommunicationEstablishedEvent(BaseEvent):
|
||||
"""Event emitted when encrypted communication is successfully established"""
|
||||
|
||||
sender_agent: BaseAgent
|
||||
recipient_agent: BaseAgent
|
||||
type: str = "encrypted_communication_established"
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
def __init__(self, **data):
|
||||
super().__init__(**data)
|
||||
# Set fingerprint data from the sender agent
|
||||
if hasattr(self.sender_agent, "fingerprint") and self.sender_agent.fingerprint:
|
||||
self.source_fingerprint = self.sender_agent.fingerprint.uuid_str
|
||||
self.source_type = "agent"
|
||||
if (
|
||||
hasattr(self.sender_agent.fingerprint, "metadata")
|
||||
and self.sender_agent.fingerprint.metadata
|
||||
):
|
||||
self.fingerprint_metadata = self.sender_agent.fingerprint.metadata
|
||||
|
||||
|
||||
class EncryptedTaskExecutionEvent(BaseEvent):
|
||||
"""Event emitted when an encrypted communication task is being executed"""
|
||||
|
||||
agent: BaseAgent
|
||||
task_type: str = "encrypted_communication"
|
||||
type: str = "encrypted_task_execution"
|
||||
|
||||
model_config = {"arbitrary_types_allowed": True}
|
||||
|
||||
def __init__(self, **data):
|
||||
super().__init__(**data)
|
||||
# Set fingerprint data from the agent
|
||||
if hasattr(self.agent, "fingerprint") and self.agent.fingerprint:
|
||||
self.source_fingerprint = self.agent.fingerprint.uuid_str
|
||||
self.source_type = "agent"
|
||||
if (
|
||||
hasattr(self.agent.fingerprint, "metadata")
|
||||
and self.agent.fingerprint.metadata
|
||||
):
|
||||
self.fingerprint_metadata = self.agent.fingerprint.metadata
|
||||
@@ -37,6 +37,15 @@ from .agent_events import (
|
||||
LiteAgentExecutionErrorEvent,
|
||||
LiteAgentExecutionStartedEvent,
|
||||
)
|
||||
from .encryption_events import (
|
||||
EncryptionStartedEvent,
|
||||
EncryptionCompletedEvent,
|
||||
DecryptionStartedEvent,
|
||||
DecryptionCompletedEvent,
|
||||
EncryptedCommunicationStartedEvent,
|
||||
EncryptedCommunicationEstablishedEvent,
|
||||
EncryptedTaskExecutionEvent,
|
||||
)
|
||||
from .crew_events import (
|
||||
CrewKickoffCompletedEvent,
|
||||
CrewKickoffFailedEvent,
|
||||
@@ -513,5 +522,70 @@ class EventListener(BaseEventListener):
|
||||
event.verbose,
|
||||
)
|
||||
|
||||
# Encryption event handlers
|
||||
@crewai_event_bus.on(EncryptedCommunicationStartedEvent)
|
||||
def on_encrypted_communication_started(source, event: EncryptedCommunicationStartedEvent):
|
||||
self.formatter.handle_encryption_communication_started(
|
||||
event.sender_agent.role,
|
||||
event.recipient_agent.role
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(EncryptedCommunicationEstablishedEvent)
|
||||
def on_encrypted_communication_established(source, event: EncryptedCommunicationEstablishedEvent):
|
||||
self.formatter.handle_encryption_communication_established(
|
||||
event.sender_agent.role,
|
||||
event.recipient_agent.role
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(EncryptionStartedEvent)
|
||||
def on_encryption_started(source, event: EncryptionStartedEvent):
|
||||
recipient_fingerprint = ""
|
||||
if hasattr(event.recipient_agent, "fingerprint") and event.recipient_agent.fingerprint:
|
||||
recipient_fingerprint = event.recipient_agent.fingerprint.uuid_str[:8] + "..."
|
||||
self.formatter.handle_encryption_started(
|
||||
event.message_type,
|
||||
recipient_fingerprint
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(EncryptionCompletedEvent)
|
||||
def on_encryption_completed(source, event: EncryptionCompletedEvent):
|
||||
sender_fingerprint = ""
|
||||
recipient_fingerprint = ""
|
||||
if hasattr(event.sender_agent, "fingerprint") and event.sender_agent.fingerprint:
|
||||
sender_fingerprint = event.sender_agent.fingerprint.uuid_str[:8] + "..."
|
||||
if hasattr(event.recipient_agent, "fingerprint") and event.recipient_agent.fingerprint:
|
||||
recipient_fingerprint = event.recipient_agent.fingerprint.uuid_str[:8] + "..."
|
||||
self.formatter.handle_encryption_completed(
|
||||
event.message_type,
|
||||
sender_fingerprint,
|
||||
recipient_fingerprint
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(DecryptionStartedEvent)
|
||||
def on_decryption_started(source, event: DecryptionStartedEvent):
|
||||
sender_fingerprint = event.sender_fingerprint[:8] + "..." if event.sender_fingerprint else ""
|
||||
self.formatter.handle_decryption_started(
|
||||
event.message_type,
|
||||
sender_fingerprint
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(DecryptionCompletedEvent)
|
||||
def on_decryption_completed(source, event: DecryptionCompletedEvent):
|
||||
sender_fingerprint = event.sender_fingerprint[:8] + "..." if event.sender_fingerprint else ""
|
||||
recipient_fingerprint = ""
|
||||
if hasattr(event.recipient_agent, "fingerprint") and event.recipient_agent.fingerprint:
|
||||
recipient_fingerprint = event.recipient_agent.fingerprint.uuid_str[:8] + "..."
|
||||
self.formatter.handle_decryption_completed(
|
||||
event.message_type,
|
||||
sender_fingerprint,
|
||||
recipient_fingerprint
|
||||
)
|
||||
|
||||
@crewai_event_bus.on(EncryptedTaskExecutionEvent)
|
||||
def on_encrypted_task_execution(source, event: EncryptedTaskExecutionEvent):
|
||||
self.formatter.handle_encrypted_task_execution(
|
||||
event.agent.role
|
||||
)
|
||||
|
||||
|
||||
event_listener = EventListener()
|
||||
|
||||
@@ -1754,3 +1754,151 @@ class ConsoleFormatter:
|
||||
Attempts=f"{retry_count + 1}",
|
||||
)
|
||||
self.print_panel(content, "🛡️ Guardrail Failed", "red")
|
||||
|
||||
# Encryption event handlers
|
||||
|
||||
def handle_encryption_communication_started(
|
||||
self, sender_role: str, recipient_role: str
|
||||
) -> None:
|
||||
"""Handle encrypted communication started event."""
|
||||
if not self.verbose:
|
||||
return
|
||||
|
||||
content = Text()
|
||||
content.append("Starting encrypted communication from '", style="white")
|
||||
content.append(f"{sender_role}", style="bright_cyan bold")
|
||||
content.append("' to '", style="white")
|
||||
content.append(f"{recipient_role}", style="bright_cyan bold")
|
||||
content.append("'", style="white")
|
||||
|
||||
panel = Panel(
|
||||
content,
|
||||
title="🔐 Encrypted Communication",
|
||||
border_style="cyan",
|
||||
padding=(0, 1),
|
||||
)
|
||||
self.print(panel)
|
||||
|
||||
def handle_encryption_communication_established(
|
||||
self, sender_role: str, recipient_role: str
|
||||
) -> None:
|
||||
"""Handle encrypted communication established event."""
|
||||
if not self.verbose:
|
||||
return
|
||||
|
||||
content = Text()
|
||||
content.append("Encrypted communication established between '", style="white")
|
||||
content.append(f"{sender_role}", style="bright_green bold")
|
||||
content.append("' and '", style="white")
|
||||
content.append(f"{recipient_role}", style="bright_green bold")
|
||||
content.append("'", style="white")
|
||||
|
||||
panel = Panel(
|
||||
content,
|
||||
title="✅ Communication Secured",
|
||||
border_style="green",
|
||||
padding=(0, 1),
|
||||
)
|
||||
self.print(panel)
|
||||
|
||||
def handle_encryption_started(
|
||||
self, message_type: str, recipient_fingerprint: str
|
||||
) -> None:
|
||||
"""Handle encryption started event."""
|
||||
if not self.verbose:
|
||||
return
|
||||
|
||||
content = Text()
|
||||
content.append(f"Starting encryption for {message_type} message to recipient ", style="white")
|
||||
content.append(f"{recipient_fingerprint}", style="bright_yellow")
|
||||
content.append("...", style="white")
|
||||
|
||||
panel = Panel(
|
||||
content,
|
||||
title="🔒 Encrypting Message",
|
||||
border_style="yellow",
|
||||
padding=(0, 1),
|
||||
)
|
||||
self.print(panel)
|
||||
|
||||
def handle_encryption_completed(
|
||||
self, message_type: str, sender_fingerprint: str, recipient_fingerprint: str
|
||||
) -> None:
|
||||
"""Handle encryption completed event."""
|
||||
if not self.verbose:
|
||||
return
|
||||
|
||||
content = Text()
|
||||
content.append(f"Successfully encrypted {message_type} message from ", style="white")
|
||||
content.append(f"{sender_fingerprint}", style="bright_green")
|
||||
content.append(" to ", style="white")
|
||||
content.append(f"{recipient_fingerprint}", style="bright_green")
|
||||
content.append("...", style="white")
|
||||
|
||||
panel = Panel(
|
||||
content,
|
||||
title="✅ Message Encrypted",
|
||||
border_style="green",
|
||||
padding=(0, 1),
|
||||
)
|
||||
self.print(panel)
|
||||
|
||||
def handle_decryption_started(
|
||||
self, message_type: str, sender_fingerprint: str
|
||||
) -> None:
|
||||
"""Handle decryption started event."""
|
||||
if not self.verbose:
|
||||
return
|
||||
|
||||
content = Text()
|
||||
content.append(f"Starting decryption of {message_type} message from sender ", style="white")
|
||||
content.append(f"{sender_fingerprint}", style="bright_yellow")
|
||||
content.append("...", style="white")
|
||||
|
||||
panel = Panel(
|
||||
content,
|
||||
title="🔓 Decrypting Message",
|
||||
border_style="yellow",
|
||||
padding=(0, 1),
|
||||
)
|
||||
self.print(panel)
|
||||
|
||||
def handle_decryption_completed(
|
||||
self, message_type: str, sender_fingerprint: str, recipient_fingerprint: str
|
||||
) -> None:
|
||||
"""Handle decryption completed event."""
|
||||
if not self.verbose:
|
||||
return
|
||||
|
||||
content = Text()
|
||||
content.append(f"Successfully decrypted {message_type} message from ", style="white")
|
||||
content.append(f"{sender_fingerprint}", style="bright_green")
|
||||
content.append(" to ", style="white")
|
||||
content.append(f"{recipient_fingerprint}", style="bright_green")
|
||||
content.append("...", style="white")
|
||||
|
||||
panel = Panel(
|
||||
content,
|
||||
title="✅ Message Decrypted",
|
||||
border_style="green",
|
||||
padding=(0, 1),
|
||||
)
|
||||
self.print(panel)
|
||||
|
||||
def handle_encrypted_task_execution(self, agent_role: str) -> None:
|
||||
"""Handle encrypted task execution event."""
|
||||
if not self.verbose:
|
||||
return
|
||||
|
||||
content = Text()
|
||||
content.append("Executing encrypted communication task for agent '", style="white")
|
||||
content.append(f"{agent_role}", style="bright_blue bold")
|
||||
content.append("'", style="white")
|
||||
|
||||
panel = Panel(
|
||||
content,
|
||||
title="🔐 Executing Encrypted Task",
|
||||
border_style="blue",
|
||||
padding=(0, 1),
|
||||
)
|
||||
self.print(panel)
|
||||
|
||||
314
tests/security/test_encrypted_communication.py
Normal file
314
tests/security/test_encrypted_communication.py
Normal file
@@ -0,0 +1,314 @@
|
||||
"""Test encrypted agent communication functionality."""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from crewai.security import (
|
||||
AgentCommunicationEncryption,
|
||||
EncryptedMessage,
|
||||
Fingerprint,
|
||||
SecurityConfig
|
||||
)
|
||||
from crewai.agent import Agent
|
||||
from crewai.tools.agent_tools.ask_question_tool import AskQuestionTool
|
||||
from crewai.tools.agent_tools.delegate_work_tool import DelegateWorkTool
|
||||
|
||||
|
||||
class TestAgentCommunicationEncryption:
|
||||
"""Test the encryption/decryption functionality."""
|
||||
|
||||
def test_encryption_initialization(self):
|
||||
"""Test initialization of encryption handler."""
|
||||
fp = Fingerprint()
|
||||
encryption = AgentCommunicationEncryption(fp)
|
||||
|
||||
assert encryption.agent_fingerprint == fp
|
||||
assert encryption._encryption_keys == {}
|
||||
|
||||
def test_key_derivation_consistency(self):
|
||||
"""Test that key derivation is consistent for same agent pair."""
|
||||
fp1 = Fingerprint()
|
||||
fp2 = Fingerprint()
|
||||
|
||||
encryption1 = AgentCommunicationEncryption(fp1)
|
||||
encryption2 = AgentCommunicationEncryption(fp2)
|
||||
|
||||
# Keys should be the same regardless of which agent derives them
|
||||
key1_from_1 = encryption1._derive_communication_key(fp1.uuid_str, fp2.uuid_str)
|
||||
key1_from_2 = encryption2._derive_communication_key(fp1.uuid_str, fp2.uuid_str)
|
||||
key2_from_1 = encryption1._derive_communication_key(fp2.uuid_str, fp1.uuid_str)
|
||||
key2_from_2 = encryption2._derive_communication_key(fp2.uuid_str, fp1.uuid_str)
|
||||
|
||||
assert key1_from_1 == key1_from_2
|
||||
assert key1_from_1 == key2_from_1
|
||||
assert key1_from_1 == key2_from_2
|
||||
|
||||
def test_message_encryption_decryption(self):
|
||||
"""Test basic message encryption and decryption."""
|
||||
sender_fp = Fingerprint()
|
||||
recipient_fp = Fingerprint()
|
||||
|
||||
sender_encryption = AgentCommunicationEncryption(sender_fp)
|
||||
recipient_encryption = AgentCommunicationEncryption(recipient_fp)
|
||||
|
||||
original_message = "Hello, this is a test message"
|
||||
|
||||
# Encrypt message
|
||||
encrypted_msg = sender_encryption.encrypt_message(
|
||||
original_message,
|
||||
recipient_fp,
|
||||
"test_message"
|
||||
)
|
||||
|
||||
assert isinstance(encrypted_msg, EncryptedMessage)
|
||||
assert encrypted_msg.sender_fingerprint == sender_fp.uuid_str
|
||||
assert encrypted_msg.recipient_fingerprint == recipient_fp.uuid_str
|
||||
assert encrypted_msg.message_type == "test_message"
|
||||
assert encrypted_msg.encrypted_payload != original_message
|
||||
|
||||
# Decrypt message
|
||||
decrypted_message = recipient_encryption.decrypt_message(encrypted_msg)
|
||||
|
||||
assert decrypted_message == original_message
|
||||
|
||||
def test_dict_message_encryption_decryption(self):
|
||||
"""Test encryption and decryption of dictionary messages."""
|
||||
sender_fp = Fingerprint()
|
||||
recipient_fp = Fingerprint()
|
||||
|
||||
sender_encryption = AgentCommunicationEncryption(sender_fp)
|
||||
recipient_encryption = AgentCommunicationEncryption(recipient_fp)
|
||||
|
||||
original_message = {
|
||||
"task": "Analyze this data",
|
||||
"context": "This is important context",
|
||||
"priority": "high"
|
||||
}
|
||||
|
||||
# Encrypt message
|
||||
encrypted_msg = sender_encryption.encrypt_message(
|
||||
original_message,
|
||||
recipient_fp
|
||||
)
|
||||
|
||||
# Decrypt message
|
||||
decrypted_message = recipient_encryption.decrypt_message(encrypted_msg)
|
||||
|
||||
assert decrypted_message == original_message
|
||||
|
||||
def test_wrong_recipient_decryption_fails(self):
|
||||
"""Test that decryption fails for wrong recipient."""
|
||||
sender_fp = Fingerprint()
|
||||
recipient_fp = Fingerprint()
|
||||
wrong_recipient_fp = Fingerprint()
|
||||
|
||||
sender_encryption = AgentCommunicationEncryption(sender_fp)
|
||||
wrong_recipient_encryption = AgentCommunicationEncryption(wrong_recipient_fp)
|
||||
|
||||
original_message = "Secret message"
|
||||
|
||||
# Encrypt for correct recipient
|
||||
encrypted_msg = sender_encryption.encrypt_message(
|
||||
original_message,
|
||||
recipient_fp
|
||||
)
|
||||
|
||||
# Try to decrypt with wrong recipient
|
||||
with pytest.raises(ValueError, match="Message not intended for this agent"):
|
||||
wrong_recipient_encryption.decrypt_message(encrypted_msg)
|
||||
|
||||
def test_is_encrypted_communication(self):
|
||||
"""Test detection of encrypted communication."""
|
||||
fp = Fingerprint()
|
||||
encryption = AgentCommunicationEncryption(fp)
|
||||
|
||||
# Test with EncryptedMessage
|
||||
encrypted_msg = EncryptedMessage(
|
||||
encrypted_payload="test",
|
||||
sender_fingerprint="sender",
|
||||
recipient_fingerprint="recipient"
|
||||
)
|
||||
assert encryption.is_encrypted_communication(encrypted_msg) is True
|
||||
|
||||
# Test with dict containing encrypted_payload
|
||||
encrypted_dict = {
|
||||
"encrypted_payload": "test",
|
||||
"sender_fingerprint": "sender"
|
||||
}
|
||||
assert encryption.is_encrypted_communication(encrypted_dict) is True
|
||||
|
||||
# Test with regular message
|
||||
regular_msg = "Plain text message"
|
||||
assert encryption.is_encrypted_communication(regular_msg) is False
|
||||
|
||||
# Test with regular dict
|
||||
regular_dict = {"task": "Do something"}
|
||||
assert encryption.is_encrypted_communication(regular_dict) is False
|
||||
|
||||
|
||||
class TestSecurityConfigEncryption:
|
||||
"""Test SecurityConfig encryption settings."""
|
||||
|
||||
def test_security_config_encryption_default(self):
|
||||
"""Test default encryption setting."""
|
||||
config = SecurityConfig()
|
||||
assert config.encrypted_communication is False
|
||||
|
||||
def test_security_config_encryption_enabled(self):
|
||||
"""Test enabling encryption."""
|
||||
config = SecurityConfig(encrypted_communication=True)
|
||||
assert config.encrypted_communication is True
|
||||
|
||||
|
||||
class TestAgentToolsEncryption:
|
||||
"""Test encrypted communication in agent tools."""
|
||||
|
||||
@pytest.fixture
|
||||
def agents_with_encryption(self):
|
||||
"""Create test agents with encryption enabled."""
|
||||
# Create agents with security configs
|
||||
sender_agent = Mock()
|
||||
sender_agent.role = "sender"
|
||||
sender_agent.security_config = SecurityConfig(encrypted_communication=True)
|
||||
sender_agent.i18n = Mock()
|
||||
|
||||
recipient_agent = Mock()
|
||||
recipient_agent.role = "recipient"
|
||||
recipient_agent.security_config = SecurityConfig(encrypted_communication=True)
|
||||
recipient_agent.i18n = Mock()
|
||||
recipient_agent.i18n.slice.return_value = "Expected output"
|
||||
recipient_agent.execute_task = Mock(return_value="Task completed")
|
||||
|
||||
return [sender_agent, recipient_agent]
|
||||
|
||||
@pytest.fixture
|
||||
def agents_without_encryption(self):
|
||||
"""Create test agents without encryption."""
|
||||
sender_agent = Mock()
|
||||
sender_agent.role = "sender"
|
||||
sender_agent.security_config = SecurityConfig(encrypted_communication=False)
|
||||
sender_agent.i18n = Mock()
|
||||
|
||||
recipient_agent = Mock()
|
||||
recipient_agent.role = "recipient"
|
||||
recipient_agent.security_config = SecurityConfig(encrypted_communication=False)
|
||||
recipient_agent.i18n = Mock()
|
||||
recipient_agent.i18n.slice.return_value = "Expected output"
|
||||
recipient_agent.execute_task = Mock(return_value="Task completed")
|
||||
|
||||
return [sender_agent, recipient_agent]
|
||||
|
||||
def test_encryption_enabled_detection(self, agents_with_encryption):
|
||||
"""Test detection of encryption capability."""
|
||||
tool = AskQuestionTool(agents=agents_with_encryption, description="Test tool")
|
||||
assert tool.encryption_enabled is True
|
||||
|
||||
def test_encryption_disabled_detection(self, agents_without_encryption):
|
||||
"""Test detection when encryption is disabled."""
|
||||
tool = AskQuestionTool(agents=agents_without_encryption, description="Test tool")
|
||||
assert tool.encryption_enabled is False
|
||||
|
||||
def test_prepare_encrypted_communication_payload(self, agents_with_encryption):
|
||||
"""Test preparation of encrypted communication payload."""
|
||||
sender, recipient = agents_with_encryption
|
||||
tool = AskQuestionTool(agents=[sender, recipient], description="Test tool")
|
||||
|
||||
payload = tool._prepare_communication_payload(
|
||||
sender_agent=sender,
|
||||
recipient_agent=recipient,
|
||||
task="Test task",
|
||||
context="Test context"
|
||||
)
|
||||
|
||||
assert isinstance(payload, EncryptedMessage)
|
||||
assert payload.sender_fingerprint == sender.security_config.fingerprint.uuid_str
|
||||
assert payload.recipient_fingerprint == recipient.security_config.fingerprint.uuid_str
|
||||
|
||||
def test_prepare_plain_communication_payload(self, agents_without_encryption):
|
||||
"""Test preparation of plain communication payload."""
|
||||
sender, recipient = agents_without_encryption
|
||||
tool = AskQuestionTool(agents=[sender, recipient], description="Test tool")
|
||||
|
||||
payload = tool._prepare_communication_payload(
|
||||
sender_agent=sender,
|
||||
recipient_agent=recipient,
|
||||
task="Test task",
|
||||
context="Test context"
|
||||
)
|
||||
|
||||
assert isinstance(payload, dict)
|
||||
assert payload["task"] == "Test task"
|
||||
assert payload["context"] == "Test context"
|
||||
assert payload["sender_role"] == "sender"
|
||||
|
||||
def test_execute_with_encryption_enabled(self, agents_with_encryption):
|
||||
"""Test task execution with encryption enabled."""
|
||||
sender, recipient = agents_with_encryption
|
||||
tool = AskQuestionTool(agents=[sender, recipient], description="Test tool")
|
||||
|
||||
result = tool._run(
|
||||
question="What is AI?",
|
||||
context="Test context",
|
||||
coworker="recipient"
|
||||
)
|
||||
|
||||
# Verify task was executed
|
||||
assert result == "Task completed"
|
||||
|
||||
# Verify execute_task was called with encrypted context
|
||||
recipient.execute_task.assert_called_once()
|
||||
call_args = recipient.execute_task.call_args
|
||||
context_arg = call_args[0][1] # Second argument is context
|
||||
|
||||
assert "ENCRYPTED_COMMUNICATION:" in context_arg
|
||||
|
||||
def test_execute_with_encryption_disabled(self, agents_without_encryption):
|
||||
"""Test task execution with encryption disabled."""
|
||||
sender, recipient = agents_without_encryption
|
||||
tool = AskQuestionTool(agents=[sender, recipient], description="Test tool")
|
||||
|
||||
result = tool._run(
|
||||
question="What is AI?",
|
||||
context="Test context",
|
||||
coworker="recipient"
|
||||
)
|
||||
|
||||
# Verify task was executed
|
||||
assert result == "Task completed"
|
||||
|
||||
# Verify execute_task was called with plain context
|
||||
recipient.execute_task.assert_called_once()
|
||||
call_args = recipient.execute_task.call_args
|
||||
context_arg = call_args[0][1] # Second argument is context
|
||||
|
||||
assert context_arg == "Test context"
|
||||
|
||||
|
||||
class TestBackwardCompatibility:
|
||||
"""Test that existing functionality still works."""
|
||||
|
||||
def test_agents_without_security_config_work(self):
|
||||
"""Test that agents without security config still function."""
|
||||
# Create agents without security config
|
||||
agent1 = Mock()
|
||||
agent1.role = "agent1"
|
||||
agent1.i18n = Mock()
|
||||
agent1.i18n.slice.return_value = "Expected output"
|
||||
agent1.execute_task = Mock(return_value="Task completed")
|
||||
# No security_config attribute
|
||||
|
||||
agent2 = Mock()
|
||||
agent2.role = "agent2"
|
||||
agent2.i18n = Mock()
|
||||
|
||||
tool = AskQuestionTool(agents=[agent1, agent2], description="Test tool")
|
||||
|
||||
result = tool._run(
|
||||
question="Test question",
|
||||
context="Test context",
|
||||
coworker="agent1"
|
||||
)
|
||||
|
||||
assert result == "Task completed"
|
||||
agent1.execute_task.assert_called_once()
|
||||
Reference in New Issue
Block a user