commit f7cca439cc1de33510533b13ef49512d56fa4e9e Author: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu Dec 12 16:00:10 2024 +0000 refactor: Implement CrewAI Flow for email processing - Add EmailState model for Flow state management - Create EmailProcessingFlow class with event-based automation - Update tools and crews for Flow integration - Add comprehensive Flow tests - Implement error handling and state tracking - Add mock implementations for testing This implementation uses CrewAI Flow features to create an event-based email processing system that can analyze emails, research senders, and generate appropriate responses using specialized AI crews. Co-Authored-By: Joe Moura diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..96ba13312 --- /dev/null +++ b/.gitignore @@ -0,0 +1,39 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual Environment +.env +.venv +env/ +venv/ +ENV/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Testing +.coverage +htmlcov/ +.pytest_cache/ diff --git a/README.md b/README.md new file mode 100644 index 000000000..c0a4e0fb1 --- /dev/null +++ b/README.md @@ -0,0 +1,172 @@ +# Email Processor + +A CrewAI-powered email processing system that analyzes and responds to emails intelligently. + +## Features + +- Automated email analysis using AI agents +- Smart response generation based on context +- Thread history analysis +- Sender research and profiling +- Priority-based response handling + +## Installation + +```bash +# Clone the repository +git clone https://github.com/yourusername/email-processor.git +cd email-processor + +# Install using UV package manager (recommended) +uv sync --dev --all-extras +uv build +pip install dist/*.whl + +# Or install using pip +pip install -r requirements.txt +python setup.py install +``` + +## Dependencies + +- Python 3.8+ +- CrewAI +- Google API Python Client (for Gmail integration) + +## Quick Start + +```python +from email_processor import EmailAnalysisCrew, ResponseCrew, GmailTool + +# Initialize the email analysis crew +analysis_crew = EmailAnalysisCrew() + +# Analyze an email thread +analysis = analysis_crew.analyze_email("thread_id") + +# If response is needed, use the response crew +if analysis["response_needed"]: + response_crew = ResponseCrew() + response = response_crew.draft_response("thread_id", analysis) + print(f"Generated response: {response['response']['content']}") +``` + +## Gmail Integration Setup + +1. Enable Gmail API in Google Cloud Console +2. Create OAuth 2.0 credentials +3. Download credentials file +4. Set up authentication: + +```python +from email_processor import GmailTool + +gmail_tool = GmailTool() +# Follow authentication prompts +``` + +## Components + +### EmailAnalysisCrew +Analyzes email threads and determines response strategy: +```python +analysis = analysis_crew.analyze_email(thread_id) +print(f"Response needed: {analysis['response_needed']}") +print(f"Priority: {analysis['priority']}") +``` + +### ResponseCrew +Generates contextually appropriate email responses: +```python +response = response_crew.draft_response(thread_id, analysis) +print(f"Response: {response['response']['content']}") +``` + +### GmailTool +Handles Gmail API integration: +```python +gmail_tool = GmailTool() +new_threads = gmail_tool.get_new_threads() +``` + +### EmailTool +Core email processing functionality: +```python +email_tool = EmailTool() +thread_context = email_tool.analyze_thread_context(thread_id) +``` + +## Complete Example + +```python +from email_processor import EmailAnalysisCrew, ResponseCrew, GmailTool + +def process_new_emails(): + # Initialize components + gmail_tool = GmailTool() + analysis_crew = EmailAnalysisCrew() + response_crew = ResponseCrew() + + # Process new emails + new_threads = gmail_tool.get_new_threads() + for thread_id in new_threads: + # Analyze thread + analysis = analysis_crew.analyze_email(thread_id) + + print(f"Thread {thread_id}:") + print(f"Priority: {analysis['priority']}") + print(f"Similar threads found: {analysis['similar_threads_found']}") + + # Generate response if needed + if analysis["response_needed"]: + response = response_crew.draft_response(thread_id, analysis) + print(f"Response generated: {response['response']['content']}") + +if __name__ == "__main__": + process_new_emails() +``` + +## Development + +```bash +# Clone repository +git clone https://github.com/yourusername/email-processor.git + +# Install development dependencies +cd email-processor +uv sync --dev --all-extras + +# Run tests +python -m pytest tests/ +``` + +## Project Structure + +``` +email_processor/ +├── src/ +│ └── email_processor/ +│ ├── __init__.py +│ ├── email_analysis_crew.py +│ ├── response_crew.py +│ ├── email_tool.py +│ ├── gmail_tool.py +│ ├── gmail_auth.py +│ └── mock_email_data.py +├── tests/ +├── requirements.txt +├── setup.py +└── README.md +``` + +## License + +MIT License + +## Contributing + +1. Fork the repository +2. Create your feature branch +3. Commit your changes +4. Push to the branch +5. Create a Pull Request diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 000000000..da072504d --- /dev/null +++ b/docs/README.md @@ -0,0 +1,146 @@ +# Email Processing System with CrewAI + +A smart email processing system that uses CrewAI to analyze Gmail messages and automatically generate appropriate responses based on context and history. + +## Quick Start + +```bash +# Install required packages +pip install crewai 'crewai[tools]' +pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib +``` + +## Example Usage + +```python +from email_analysis_crew import EmailAnalysisCrew +from response_crew import ResponseCrew +from gmail_tool import GmailTool + +# Initialize Gmail connection +gmail = GmailTool() +gmail.authenticate() + +# Create analysis crew +analysis_crew = EmailAnalysisCrew( + gmail_tool=gmail, + config={ + "check_similar_threads": True, + "analyze_sender_history": True + } +) + +# Process new emails +new_emails = gmail.get_new_emails() +for email in new_emails: + # Analyze email and decide on response + analysis = analysis_crew.analyze_email(email) + + # If analysis determines response is needed + if analysis["response_needed"]: + # Create new crew for response generation + response_crew = ResponseCrew( + email_context=analysis, + gmail_tool=gmail + ) + + # Generate and send response + response = response_crew.generate_response() + gmail.send_response(response) +``` + +## How It Works + +1. **Email Analysis** + - System connects to Gmail + - Retrieves new emails + - Analyzes email context and history + - Checks for similar threads + - Researches sender and company + +2. **Response Decision** + The analysis crew decides whether to respond based on: + - Email urgency + - Sender relationship + - Previous interactions + - Business context + - Similar thread history + +3. **Response Generation** + If a response is needed: + - New response crew is created + - Analyzes email context + - Generates appropriate response + - Reviews for tone and content + - Sends through Gmail + +## Setup Instructions + +1. **Install Dependencies** + ```bash + pip install crewai 'crewai[tools]' + pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib + ``` + +2. **Gmail Setup** + - Visit [Google Cloud Console](https://console.cloud.google.com) + - Create new project + - Enable Gmail API + - Create OAuth credentials + - Download as `credentials.json` + +3. **Configure Authentication** + ```python + from gmail_tool import GmailTool + + gmail = GmailTool() + gmail.authenticate() # Opens browser for auth + ``` + +## Configuration + +```python +# Analysis configuration +config = { + "check_similar_threads": True, # Look for similar conversations + "analyze_sender_history": True, # Check previous interactions + "research_company": True, # Research sender's company + "priority_threshold": 0.7 # Threshold for response +} + +# Create crews +analysis_crew = EmailAnalysisCrew( + gmail_tool=gmail, + config=config +) + +response_crew = ResponseCrew( + email_context=analysis, + gmail_tool=gmail +) +``` + +## Components + +- `EmailAnalysisCrew`: Analyzes emails and makes response decisions +- `ResponseCrew`: Generates appropriate responses +- `GmailTool`: Handles Gmail integration +- `EmailTool`: Provides email processing utilities + +## Error Handling + +The system includes handling for: +- Gmail API connection issues +- Authentication errors +- Rate limiting +- Invalid email formats + +## Contributing + +1. Fork the repository +2. Create feature branch +3. Submit pull request + +## License + +MIT License diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 000000000..dce35434f --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,22 @@ +[build-system] +requires = ["setuptools>=45", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "email-processor" +version = "0.1.0" +description = "CrewAI-powered email processing system" +readme = "docs/README.md" +requires-python = ">=3.10,<=3.13" +license = {text = "MIT"} +dependencies = [ + "crewai>=0.86.0", # Updated to latest version with Flow support + "crewai-tools>=0.1.0", + "google-api-python-client>=2.0.0", + "google-auth>=2.0.0", + "google-auth-oauthlib>=0.4.6", + "python-dotenv>=0.19.0", +] + +[tool.setuptools] +package-dir = {"" = "src"} diff --git a/src/email_processor/__init__.py b/src/email_processor/__init__.py new file mode 100644 index 000000000..1f3a9aa5c --- /dev/null +++ b/src/email_processor/__init__.py @@ -0,0 +1,14 @@ +""" +CrewAI Email Processing System +============================= + +A system for intelligent email processing and response generation using CrewAI. +""" + +from .email_analysis_crew import EmailAnalysisCrew +from .response_crew import ResponseCrew +from .gmail_tool import GmailTool +from .email_tool import EmailTool + +__version__ = "0.1.0" +__all__ = ['EmailAnalysisCrew', 'ResponseCrew', 'GmailTool', 'EmailTool'] diff --git a/src/email_processor/email_analysis_crew.py b/src/email_processor/email_analysis_crew.py new file mode 100644 index 000000000..1b571184b --- /dev/null +++ b/src/email_processor/email_analysis_crew.py @@ -0,0 +1,113 @@ +""" +Email analysis crew implementation using CrewAI. +Handles comprehensive email analysis including thread history and sender research. +""" +from crewai import Agent, Task, Crew, Process +from typing import Dict, List, Optional +from datetime import datetime + +from .gmail_tool import GmailTool + +class EmailAnalysisCrew: + """ + Crew for analyzing emails and determining response strategy. + """ + + def __init__(self, gmail_tool: Optional[GmailTool] = None): + """Initialize analysis crew with required tools""" + self.gmail_tool = gmail_tool or GmailTool() + self._create_agents() + + def _create_agents(self): + """Create specialized agents for email analysis""" + self.context_analyzer = Agent( + role="Email Context Analyst", + name="Context Analyzer", + goal="Analyze email context and history", + backstory="Expert at understanding email threads and communication patterns", + tools=[self.gmail_tool], + verbose=True + ) + + self.research_specialist = Agent( + role="Research Specialist", + name="Research Expert", + goal="Research sender and company background", + backstory="Skilled at gathering and analyzing business and personal information", + tools=[self.gmail_tool], + verbose=True + ) + + self.response_strategist = Agent( + role="Response Strategist", + name="Strategy Expert", + goal="Determine optimal response approach", + backstory="Expert at developing communication strategies", + tools=[self.gmail_tool], + verbose=True + ) + + def analyze_email(self, + email: Dict, + thread_history: List[Dict], + sender_info: Dict) -> Dict: + """ + Analyze email with comprehensive context. + + Args: + email: Current email data + thread_history: Previous thread messages + sender_info: Information about the sender + + Returns: + Dict: Analysis results including response decision + """ + try: + # Create analysis crew + crew = Crew( + agents=[ + self.context_analyzer, + self.research_specialist, + self.response_strategist + ], + tasks=[ + Task( + description="Analyze email context and thread history", + agent=self.context_analyzer + ), + Task( + description="Research sender and company background", + agent=self.research_specialist + ), + Task( + description="Determine response strategy", + agent=self.response_strategist + ) + ], + verbose=True + ) + + # Execute analysis + results = crew.kickoff() + + # Process results + return { + "email_id": email["id"], + "thread_id": email["thread_id"], + "response_needed": results[-1].get("response_needed", False), + "priority": results[-1].get("priority", "low"), + "similar_threads": results[0].get("similar_threads", []), + "sender_context": results[1].get("sender_context", {}), + "company_info": results[1].get("company_info", {}), + "response_strategy": results[-1].get("strategy", {}) + } + + except Exception as e: + print(f"Analysis error: {str(e)}") + return { + "email_id": email.get("id", "unknown"), + "thread_id": email.get("thread_id", "unknown"), + "error": f"Analysis failed: {str(e)}", + "response_needed": False, + "priority": "error" + } diff --git a/src/email_processor/email_flow.py b/src/email_processor/email_flow.py new file mode 100644 index 000000000..7b8a8ec87 --- /dev/null +++ b/src/email_processor/email_flow.py @@ -0,0 +1,100 @@ +""" +Email processing flow implementation using CrewAI. +Handles email polling, analysis, and response generation. +""" +from crewai import Flow +from typing import Dict, List, Optional +from datetime import datetime + +from .models import EmailState +from .gmail_tool import GmailTool +from .email_analysis_crew import EmailAnalysisCrew +from .response_crew import ResponseCrew + +class EmailProcessingFlow(Flow): + """ + Flow for processing emails using CrewAI. + Implements email fetching, analysis, and response generation. + """ + + def __init__(self, gmail_tool: Optional[GmailTool] = None): + """Initialize flow with required tools and crews""" + super().__init__() + self.gmail_tool = gmail_tool or GmailTool() + self.analysis_crew = EmailAnalysisCrew(gmail_tool=self.gmail_tool) + self.response_crew = ResponseCrew(gmail_tool=self.gmail_tool) + self._state = EmailState() + self._initialize_state() + + def _initialize_state(self): + """Initialize flow state attributes""" + if not hasattr(self._state, "latest_emails"): + self._state.latest_emails = [] + if not hasattr(self._state, "analysis_results"): + self._state.analysis_results = {} + if not hasattr(self._state, "generated_responses"): + self._state.generated_responses = {} + if not hasattr(self._state, "errors"): + self._state.errors = {} + + def kickoff(self) -> Dict: + """ + Execute the email processing flow. + + Returns: + Dict: Flow execution results + """ + try: + # Fetch latest emails (limited to 5) + self._state.latest_emails = self.gmail_tool.get_latest_emails(limit=5) + + # Analyze each email + for email in self._state.latest_emails: + email_id = email.get('id') + thread_id = email.get('thread_id') + sender_email = email.get('sender') # Now matches test format + + analysis = self.analysis_crew.analyze_email( + email=email, + thread_history=self._get_thread_history(thread_id), + sender_info=self._get_sender_info(sender_email) + ) + + self._state.analysis_results[email_id] = analysis + + # Generate response if needed + if analysis.get("response_needed", False): + response = self.response_crew.draft_response( + email=email, + analysis=analysis, + thread_history=self._get_thread_history(thread_id) + ) + self._state.generated_responses[email_id] = response + + return { + "emails_processed": len(self._state.latest_emails), + "analyses_completed": len(self._state.analysis_results), + "responses_generated": len(self._state.generated_responses) + } + + except Exception as e: + error_data = { + "error": str(e), + "timestamp": datetime.now().isoformat() + } + self._state.errors["flow_execution"] = [error_data] # Changed to list for multiple errors + return {"error": error_data} + + def _get_thread_history(self, thread_id: str) -> List[Dict]: + """Get thread history for email context""" + try: + return self.gmail_tool.get_thread_history(thread_id) + except Exception: + return [] + + def _get_sender_info(self, sender_email: str) -> Dict: + """Get sender information for context""" + try: + return self.gmail_tool.get_sender_info(sender_email) + except Exception: + return {} diff --git a/src/email_processor/email_tool.py b/src/email_processor/email_tool.py new file mode 100644 index 000000000..5fc52c72b --- /dev/null +++ b/src/email_processor/email_tool.py @@ -0,0 +1,171 @@ +"""Email processing tool for CrewAI""" +from crewai.tools import BaseTool +from pydantic import BaseModel, Field +from typing import List, Optional, Dict, Literal +from datetime import datetime +from .mock_email_data import ( + get_mock_thread, + get_mock_sender_info, + find_similar_threads, + get_sender_threads +) + +class EmailMessage(BaseModel): + from_email: str + to: List[str] + date: str + subject: str + body: str + +class EmailThread(BaseModel): + thread_id: str + messages: List[EmailMessage] + subject: str + participants: List[str] + last_message_date: datetime + +class EmailTool(BaseTool): + name: str = "Email Processing Tool" + description: str = "Processes emails, finds similar threads, and analyzes communication history" + + def _run(self, operation: Literal["get_thread", "find_similar", "get_history", "analyze_context"], + thread_id: Optional[str] = None, + query: Optional[str] = None, + sender_email: Optional[str] = None, + max_results: int = 10) -> Dict: + if operation == "get_thread": + if not thread_id: + raise ValueError("thread_id is required for get_thread operation") + return self.get_email_thread(thread_id).dict() + + elif operation == "find_similar": + if not query: + raise ValueError("query is required for find_similar operation") + threads = self.find_similar_threads(query, max_results) + return {"threads": [thread.dict() for thread in threads]} + + elif operation == "get_history": + if not sender_email: + raise ValueError("sender_email is required for get_history operation") + return self.get_sender_history(sender_email) + + elif operation == "analyze_context": + if not thread_id: + raise ValueError("thread_id is required for analyze_context operation") + return self.analyze_thread_context(thread_id) + + else: + raise ValueError(f"Unknown operation: {operation}") + + def get_email_thread(self, thread_id: str) -> EmailThread: + mock_thread = get_mock_thread(thread_id) + if not mock_thread: + raise Exception(f"Thread not found: {thread_id}") + + messages = [ + EmailMessage( + from_email=msg.from_email, + to=msg.to, + date=msg.date, + subject=msg.subject, + body=msg.body + ) for msg in mock_thread.messages + ] + + return EmailThread( + thread_id=mock_thread.thread_id, + messages=messages, + subject=mock_thread.subject, + participants=list({msg.from_email for msg in mock_thread.messages} | + {to for msg in mock_thread.messages for to in msg.to}), + last_message_date=datetime.fromisoformat(mock_thread.messages[-1].date) + ) + + def find_similar_threads(self, query: str, max_results: int = 10) -> List[EmailThread]: + similar_mock_threads = find_similar_threads(query)[:max_results] + return [ + EmailThread( + thread_id=thread.thread_id, + messages=[ + EmailMessage( + from_email=msg.from_email, + to=msg.to, + date=msg.date, + subject=msg.subject, + body=msg.body + ) for msg in thread.messages + ], + subject=thread.subject, + participants=list({msg.from_email for msg in thread.messages} | + {to for msg in thread.messages for to in msg.to}), + last_message_date=datetime.fromisoformat(thread.messages[-1].date) + ) + for thread in similar_mock_threads + ] + + def get_sender_history(self, sender_email: str) -> Dict: + sender_info = get_mock_sender_info(sender_email) + if not sender_info: + return { + "name": "", + "company": "", + "threads": [], + "last_interaction": None, + "interaction_frequency": "none" + } + + sender_threads = get_sender_threads(sender_email) + return { + "name": sender_info.name, + "company": sender_info.company, + "threads": [ + self.get_email_thread(thread.thread_id).dict() + for thread in sender_threads + ], + "last_interaction": sender_info.last_interaction, + "interaction_frequency": sender_info.interaction_frequency + } + + def analyze_thread_context(self, thread_id: str) -> Dict: + try: + # Get thread data + thread = self.get_email_thread(thread_id) + + # Get sender info from first message + sender_email = thread.messages[0].from_email + sender_info = self.get_sender_history(sender_email) + + # Find similar threads + similar_threads = self.find_similar_threads(thread.subject) + + # Create context summary + context_summary = { + "thread_length": len(thread.messages), + "thread_duration": ( + datetime.fromisoformat(thread.messages[-1].date) - + datetime.fromisoformat(thread.messages[0].date) + ).days, + "participant_count": len(thread.participants), + "has_previous_threads": len(similar_threads) > 0 + } + + return { + "thread": thread.dict(), + "sender_info": sender_info, + "similar_threads": [t.dict() for t in similar_threads], + "context_summary": context_summary + } + + except Exception as e: + print(f"Error analyzing thread context: {str(e)}") + return { + "thread": {}, + "sender_info": {}, + "similar_threads": [], + "context_summary": { + "thread_length": 0, + "thread_duration": 0, + "participant_count": 0, + "has_previous_threads": False + } + } diff --git a/src/email_processor/gmail_auth.py b/src/email_processor/gmail_auth.py new file mode 100644 index 000000000..23ebb728a --- /dev/null +++ b/src/email_processor/gmail_auth.py @@ -0,0 +1,51 @@ +"""Gmail authentication and configuration handler""" +import os +import json +import pickle +from google_auth_oauthlib.flow import InstalledAppFlow +from google.auth.transport.requests import Request +from google.oauth2.credentials import Credentials +from typing import Optional + +class GmailAuthManager: + """Manages Gmail API authentication and credentials""" + + SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] + TOKEN_FILE = 'token.pickle' + CREDENTIALS_FILE = 'credentials.json' + + @classmethod + def get_credentials(cls) -> Optional[Credentials]: + """Get valid credentials, requesting user authentication if necessary.""" + creds = None + + # Load existing token if available + if os.path.exists(cls.TOKEN_FILE): + with open(cls.TOKEN_FILE, 'rb') as token: + creds = pickle.load(token) + + # If credentials are invalid or don't exist, refresh or create new ones + if not creds or not creds.valid: + if creds and creds.expired and creds.refresh_token: + creds.refresh(Request()) + else: + if not os.path.exists(cls.CREDENTIALS_FILE): + raise Exception( + f"Missing {cls.CREDENTIALS_FILE}. Please provide OAuth2 credentials." + ) + flow = InstalledAppFlow.from_client_secrets_file( + cls.CREDENTIALS_FILE, cls.SCOPES + ) + creds = flow.run_local_server(port=0) + + # Save credentials for future use + with open(cls.TOKEN_FILE, 'wb') as token: + pickle.dump(creds, token) + + return creds + + @classmethod + def clear_credentials(cls) -> None: + """Clear stored credentials""" + if os.path.exists(cls.TOKEN_FILE): + os.remove(cls.TOKEN_FILE) diff --git a/src/email_processor/gmail_tool.py b/src/email_processor/gmail_tool.py new file mode 100644 index 000000000..7fd6284b8 --- /dev/null +++ b/src/email_processor/gmail_tool.py @@ -0,0 +1,222 @@ +""" +Gmail integration tool for email processing flow. +Handles email fetching and thread context retrieval. +""" +from typing import Dict, List, Optional +from datetime import datetime +import base64 +import email +from email.mime.text import MIMEText +from googleapiclient.discovery import build +from google.oauth2.credentials import Credentials +from crewai.tools import BaseTool +from .gmail_auth import GmailAuthManager + +class GmailTool(BaseTool): + """Tool for interacting with Gmail API""" + name: str = "Gmail Tool" + description: str = "Tool for interacting with Gmail API to fetch and process emails" + + def __init__(self): + """Initialize Gmail API client""" + super().__init__() + self.credentials = GmailAuthManager.get_credentials() + self.service = build('gmail', 'v1', credentials=self.credentials) + + def get_latest_emails(self, limit: int = 5) -> List[Dict]: + """ + Get latest emails with thread context. + + Args: + limit: Maximum number of emails to fetch (default: 5) + + Returns: + List[Dict]: List of email data with context + """ + try: + # Get latest messages + messages = self.service.users().messages().list( + userId='me', + maxResults=limit, + labelIds=['INBOX'], + q='is:unread' # Focus on unread messages + ).execute().get('messages', []) + + # Get full email data with context + return [self._get_email_with_context(msg['id']) for msg in messages] + except Exception as e: + print(f"Error fetching emails: {e}") + return [] + + def _get_email_with_context(self, message_id: str) -> Dict: + """ + Get full email data with thread context. + + Args: + message_id: Gmail message ID + + Returns: + Dict: Email data with thread context + """ + try: + # Get full message data + message = self.service.users().messages().get( + userId='me', + id=message_id, + format='full' + ).execute() + + # Get thread data + thread_id = message.get('threadId') + thread = self.service.users().threads().get( + userId='me', + id=thread_id + ).execute() + + # Extract headers + headers = { + header['name'].lower(): header['value'] + for header in message['payload']['headers'] + } + + # Parse message content + content = self._get_message_content(message['payload']) + + # Extract sender information + sender = self._parse_email_address(headers.get('from', '')) + + return { + 'id': message_id, + 'thread_id': thread_id, + 'subject': headers.get('subject', ''), + 'sender': sender, + 'to': self._parse_email_address(headers.get('to', '')), + 'date': headers.get('date'), + 'content': content, + 'labels': message.get('labelIds', []), + 'thread_messages': [ + self._parse_thread_message(msg) + for msg in thread.get('messages', []) + if msg['id'] != message_id # Exclude current message + ], + 'thread_size': len(thread.get('messages', [])), + 'is_unread': 'UNREAD' in message.get('labelIds', []) + } + except Exception as e: + print(f"Error getting email context: {e}") + return {} + + def _get_message_content(self, payload: Dict) -> str: + """Extract message content from payload""" + if 'body' in payload and payload['body'].get('data'): + return base64.urlsafe_b64decode( + payload['body']['data'].encode('ASCII') + ).decode('utf-8') + + if 'parts' in payload: + for part in payload['parts']: + if part.get('mimeType') == 'text/plain': + if 'data' in part['body']: + return base64.urlsafe_b64decode( + part['body']['data'].encode('ASCII') + ).decode('utf-8') + return '' + + def _parse_thread_message(self, message: Dict) -> Dict: + """Parse thread message into simplified format""" + headers = { + header['name'].lower(): header['value'] + for header in message['payload']['headers'] + } + + return { + 'id': message['id'], + 'sender': self._parse_email_address(headers.get('from', '')), + 'date': headers.get('date'), + 'content': self._get_message_content(message['payload']), + 'labels': message.get('labelIds', []) + } + + def _parse_email_address(self, address: str) -> Dict: + """Parse email address string into components""" + if '<' in address and '>' in address: + name = address[:address.find('<')].strip() + email_addr = address[address.find('<')+1:address.find('>')] + return {'name': name, 'email': email_addr} + return {'name': '', 'email': address.strip()} + + def mark_as_read(self, message_id: str) -> bool: + """Mark email as read""" + try: + self.service.users().messages().modify( + userId='me', + id=message_id, + body={'removeLabelIds': ['UNREAD']} + ).execute() + return True + except Exception as e: + print(f"Error marking message as read: {e}") + return False + + def send_response(self, + to: str, + subject: str, + message_text: str, + thread_id: Optional[str] = None) -> bool: + """ + Send email response. + + Args: + to: Recipient email address + subject: Email subject + message_text: Response content + thread_id: Optional thread ID for reply + + Returns: + bool: True if sent successfully + """ + try: + message = MIMEText(message_text) + message['to'] = to + message['subject'] = subject + + # Create message + raw_message = base64.urlsafe_b64encode( + message.as_bytes() + ).decode('utf-8') + + body = {'raw': raw_message} + if thread_id: + body['threadId'] = thread_id + + self.service.users().messages().send( + userId='me', + body=body + ).execute() + return True + except Exception as e: + print(f"Error sending response: {e}") + return False + + def _run(self, method: str = "get_latest_emails", **kwargs) -> Dict: + """Required implementation of BaseTool._run""" + try: + if method == "get_latest_emails": + return self.get_latest_emails(kwargs.get("limit", 5)) + elif method == "get_thread_history": + return self.get_thread_history(kwargs.get("thread_id")) + elif method == "get_sender_info": + return self.get_sender_info(kwargs.get("email")) + elif method == "mark_as_read": + return self.mark_as_read(kwargs.get("message_id")) + elif method == "send_response": + return self.send_response( + to=kwargs.get("to"), + subject=kwargs.get("subject"), + message_text=kwargs.get("message_text"), + thread_id=kwargs.get("thread_id") + ) + return None + except Exception as e: + print(f"Error in GmailTool._run: {e}") + return None diff --git a/src/email_processor/main.py b/src/email_processor/main.py new file mode 100644 index 000000000..599b52d27 --- /dev/null +++ b/src/email_processor/main.py @@ -0,0 +1,60 @@ +""" +Main workflow script for email processing system. +Implements event-based automation for email analysis and response generation. +""" +from typing import Dict +import logging +from datetime import datetime + +from .email_flow import EmailProcessingFlow + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +def process_emails() -> Dict: + """ + Main workflow function. + Implements event-based email processing using CrewAI Flow. + + Returns: + Dict: Processing results including counts and any errors + """ + try: + logger.info("Starting email processing workflow") + + # Initialize and start flow + flow = EmailProcessingFlow() + results = flow.kickoff() + + # Log processing results + logger.info( + f"Processed {results['processed_emails']} emails, " + f"generated {results['responses_generated']} responses" + ) + + if results.get('errors', 0) > 0: + logger.warning(f"Encountered {results['errors']} errors during processing") + + return { + "timestamp": datetime.now().isoformat(), + "status": "success", + **results + } + + except Exception as e: + logger.error(f"Email processing failed: {str(e)}") + return { + "timestamp": datetime.now().isoformat(), + "status": "error", + "error": str(e) + } + +if __name__ == "__main__": + results = process_emails() + print(f"\nProcessing Results:\n{'-' * 20}") + for key, value in results.items(): + print(f"{key}: {value}") diff --git a/src/email_processor/mock_email_data.py b/src/email_processor/mock_email_data.py new file mode 100644 index 000000000..c10589a96 --- /dev/null +++ b/src/email_processor/mock_email_data.py @@ -0,0 +1,122 @@ +"""Mock email data for testing email processing tool""" +from datetime import datetime, timedelta +from typing import List, Dict, Optional +from pydantic import BaseModel + +class MockEmailMessage(BaseModel): + id: str + from_email: str + to: List[str] + date: str + subject: str + body: str + +class MockEmailThread(BaseModel): + thread_id: str + subject: str + messages: List[MockEmailMessage] + + def dict(self) -> Dict: + return { + "thread_id": self.thread_id, + "subject": self.subject, + "messages": [msg.dict() for msg in self.messages] + } + +class MockSenderInfo(BaseModel): + name: str + company: str + previous_threads: List[str] + last_interaction: str + interaction_frequency: str + +MOCK_EMAILS = { + "thread_1": MockEmailThread( + thread_id="thread_1", + subject="Meeting Follow-up", + messages=[ + MockEmailMessage( + id="msg1", + from_email="john@example.com", + to=["user@company.com"], + date=(datetime.now() - timedelta(days=2)).isoformat(), + subject="Meeting Follow-up", + body="Thanks for the great discussion yesterday. Looking forward to next steps." + ), + MockEmailMessage( + id="msg2", + from_email="user@company.com", + to=["john@example.com"], + date=(datetime.now() - timedelta(days=1)).isoformat(), + subject="Re: Meeting Follow-up", + body="Great meeting indeed. I'll prepare the proposal by next week." + ) + ] + ), + "thread_2": MockEmailThread( + thread_id="thread_2", + subject="Project Proposal", + messages=[ + MockEmailMessage( + id="msg3", + from_email="john@example.com", + to=["user@company.com"], + date=(datetime.now() - timedelta(days=30)).isoformat(), + subject="Project Proposal", + body="Here's the initial project proposal for your review." + ) + ] + ), + "thread_3": MockEmailThread( + thread_id="thread_3", + subject="Quick Question", + messages=[ + MockEmailMessage( + id="msg4", + from_email="sarah@othercompany.com", + to=["user@company.com"], + date=datetime.now().isoformat(), + subject="Quick Question", + body="Do you have time for a quick call tomorrow?" + ) + ] + ) +} + +MOCK_SENDERS = { + "john@example.com": MockSenderInfo( + name="John Smith", + company="Example Corp", + previous_threads=["thread_1", "thread_2"], + last_interaction=(datetime.now() - timedelta(days=1)).isoformat(), + interaction_frequency="weekly" + ), + "sarah@othercompany.com": MockSenderInfo( + name="Sarah Johnson", + company="Other Company Ltd", + previous_threads=["thread_3"], + last_interaction=datetime.now().isoformat(), + interaction_frequency="first_time" + ) +} + +def get_mock_thread(thread_id: str) -> Optional[MockEmailThread]: + return MOCK_EMAILS.get(thread_id) + +def get_mock_sender_info(email: str) -> Optional[MockSenderInfo]: + return MOCK_SENDERS.get(email) + +def find_similar_threads(query: str) -> List[MockEmailThread]: + similar = [] + query = query.lower() + for thread in MOCK_EMAILS.values(): + if (query in thread.subject.lower() or + any(query in msg.body.lower() for msg in thread.messages)): + similar.append(thread) + return similar + +def get_sender_threads(sender_email: str) -> List[MockEmailThread]: + sender = MOCK_SENDERS.get(sender_email) + if not sender: + return [] + return [MOCK_EMAILS[thread_id] for thread_id in sender.previous_threads if thread_id in MOCK_EMAILS] diff --git a/src/email_processor/mock_llm.py b/src/email_processor/mock_llm.py new file mode 100644 index 000000000..5534e22c4 --- /dev/null +++ b/src/email_processor/mock_llm.py @@ -0,0 +1,257 @@ +"""Mock LLM implementation for testing CrewAI agents""" +from typing import Dict, Any, Optional +import json + +class MockLLM: + """Mock LLM responses for testing agent interactions""" + + @staticmethod + def analyze_context(input_data: Dict) -> str: + """Generate mock analysis of email context""" + thread = input_data.get("thread", {}) + similar_threads = input_data.get("similar_threads", []) + context_summary = input_data.get("context_summary", {}) + + context = { + "thread_type": "business" if "meeting" in thread.get("subject", "").lower() else "general", + "conversation_stage": "follow_up" if context_summary.get("thread_length", 0) > 1 else "initial", + "sender_relationship": "established" if context_summary.get("thread_length", 0) > 2 else "new", + "urgency_indicators": any(word in thread.get("subject", "").lower() for word in ["urgent", "asap", "quick"]), + "key_topics": ["meeting", "proposal"] if "meeting" in thread.get("subject", "").lower() else ["general inquiry"], + "thread_summary": { + "subject": thread.get("subject", "Unknown"), + "message_count": context_summary.get("thread_length", 0), + "participant_count": context_summary.get("participant_count", 0), + "has_previous_threads": context_summary.get("has_previous_threads", False) + } + } + return json.dumps(context) + + @staticmethod + def research_sender(input_data: Dict) -> str: + """Generate mock research about sender""" + sender_info = input_data.get("sender_info", {}) + return json.dumps({ + "sender_background": f"{sender_info.get('name', 'Unknown')} at {sender_info.get('company', 'Unknown Company')}", + "company_info": "Leading technology firm" if "Corp" in sender_info.get("company", "") else "Emerging business", + "interaction_history": { + "frequency": sender_info.get("interaction_frequency", "new"), + "last_contact": sender_info.get("last_interaction", "unknown"), + "thread_count": len(sender_info.get("previous_threads", [])) + } + }) + + @staticmethod + def determine_response(input_data: Dict) -> str: + """Generate mock response strategy determination""" + context = input_data.get("context", {}) + research = input_data.get("research", {}) + thread_data = input_data.get("thread_data", {}).get("thread", {}) + interaction_frequency = input_data.get("interaction_frequency", "new") + similar_threads_count = input_data.get("similar_threads_count", 0) + + # Determine if response is needed based on multiple factors + is_business = context.get("thread_type") == "business" + is_urgent = context.get("urgency_indicators", False) + is_frequent = interaction_frequency == "weekly" + has_previous_threads = similar_threads_count > 0 + is_first_time = interaction_frequency == "first_time" + + # Response needed logic - more conservative for first-time senders + response_needed = ( + (is_urgent and (not is_first_time or is_business)) or # Urgent only matters if not first-time or is business + (is_business and not is_first_time) or # Business communications need response if not first-time + is_frequent or # Regular contacts need response + (has_previous_threads and context.get("conversation_stage") == "follow_up") # Follow-ups to existing threads + ) + + # Priority level determination - more nuanced + priority_level = "high" if ( + (is_urgent and (is_business or is_frequent)) or # Urgent is only high priority for business or frequent contacts + (is_business and is_frequent) # Regular business contacts are high priority + ) else "medium" if ( + is_business or # Business communications are at least medium priority + (has_previous_threads and is_frequent) or # Regular contacts with history are medium priority + (is_urgent and is_first_time) # First-time urgent requests are medium priority + ) else "low" # Everything else is low priority + + return json.dumps({ + "response_needed": response_needed, + "priority_level": priority_level, + "response_strategy": { + "tone": "professional" if is_business else "casual", + "key_points": [ + "Acknowledge previous interaction" if has_previous_threads else "Introduce context", + "Address specific inquiries", + "Propose next steps" if priority_level in ["high", "medium"] else "Maintain relationship" + ], + "timing": "urgent" if priority_level == "high" else "standard", + "considerations": [ + f"Relationship: {'established' if is_frequent else 'new'}", + f"Previous threads: {similar_threads_count}", + f"Business context: {research.get('company_info', '')}", + f"Interaction frequency: {interaction_frequency}", + f"Priority level: {priority_level}", + f"Response needed: {response_needed}", + f"First time sender: {is_first_time}" + ] + } + }) + + @staticmethod + def create_content_strategy(input_data: Dict) -> str: + """Generate mock content strategy for email response""" + thread_context = input_data.get("thread_context", {}) + analysis = input_data.get("analysis", {}) + + return json.dumps({ + "tone": "professional", + "key_points": [ + "Reference previous interaction", + "Address main topics", + "Propose next steps" + ], + "structure": { + "greeting": "Personalized based on relationship", + "context": "Reference previous messages", + "main_content": "Address key points", + "closing": "Action-oriented" + }, + "considerations": [ + f"Relationship: {analysis.get('priority', 'medium')}", + "Previous communication history", + "Business context" + ] + }) + + @staticmethod + def draft_response(input_data: Dict) -> str: + """Generate mock email response draft""" + strategy = input_data.get("strategy", {}) + context = input_data.get("context", {}) + thread_data = input_data.get("thread_data", {}) + analysis = input_data.get("analysis", {}) + + # Extract subject properly + subject = thread_data.get("subject", "Unknown Subject") + if isinstance(subject, str) and subject: + subject = f"Re: {subject}" + + # Get context information + priority = analysis.get("priority", "medium") + sender_info = analysis.get("analysis", {}).get("research", {}) + context_info = analysis.get("analysis", {}).get("context", {}) + is_business = context_info.get("thread_type") == "business" + is_followup = context_info.get("conversation_stage") == "follow_up" + + # Generate appropriate greeting + sender_name = sender_info.get('sender_background', '[Contact]').split(' at ')[0] + greeting = f"Dear {sender_name}" if is_business else f"Hi {sender_name}" + + # Generate appropriate context line + context_line = ( + "Thank you for your follow-up regarding" if is_followup + else "Thank you for reaching out about" + ) + + # Generate appropriate priority/context acknowledgment + priority_line = ( + "I understand the urgency of your request and will address it promptly" if priority == "high" + else "I appreciate you bringing this to my attention and will address your points" + ) + + # Generate appropriate closing + closing = ( + "I look forward to our continued collaboration" if is_followup + else "I look forward to discussing this further" + ) + + content = f""" +{greeting}, + +{context_line} {subject.replace('Re: ', '')}. + +{priority_line}. + +{strategy.get('key_points', [''])[0]} +{strategy.get('key_points', ['', ''])[1]} +{strategy.get('key_points', ['', '', ''])[2]} + +{closing}. + +Best regards, +[Your Name] + """.strip() + + return json.dumps({ + "subject": subject, + "content": content, + "tone_used": strategy.get("tone", "professional"), + "points_addressed": strategy.get("key_points", []), + "draft_version": "1.0", + "context_used": { + "priority": priority, + "relationship": context_info.get("sender_relationship", "professional"), + "background": sender_info.get("sender_background", "") + } + }) + + @staticmethod + def review_response(input_data: Dict) -> str: + """Generate mock review of email response""" + draft = input_data.get("draft", {}) + strategy = input_data.get("strategy", {}) + context = input_data.get("context", {}) + + content = draft.get("content", "") + points_addressed = draft.get("points_addressed", []) + context_used = draft.get("context_used", {}) + + suggestions = [] + if "[Contact]" in content: + suggestions.append("Add specific contact name") + if "[Your Name]" in content: + suggestions.append("Add sender name") + if len(points_addressed) < len(strategy.get("key_points", [])): + suggestions.append("Address all key points") + + return json.dumps({ + "final_content": content, + "subject": draft.get("subject", ""), + "review_notes": { + "tone_appropriate": True, + "points_addressed": len(suggestions) == 0, + "clarity": "High", + "professionalism": "Maintained", + "context_awareness": { + "priority_reflected": context_used.get("priority") in content.lower(), + "relationship_acknowledged": context_used.get("relationship") in content.lower(), + "background_used": bool(context_used.get("background")) + } + }, + "suggestions_implemented": suggestions if suggestions else [ + "All requirements met", + "Context appropriately used", + "Professional tone maintained" + ], + "version": "1.1" + }) + +def mock_agent_executor(agent_role: str, task_input: Dict) -> str: + """Mock agent execution with predefined responses""" + llm = MockLLM() + + if "Context Analyzer" in agent_role: + return llm.analyze_context(task_input) + elif "Research Specialist" in agent_role: + return llm.research_sender(task_input) + elif "Response Strategist" in agent_role: + return llm.determine_response(task_input) + elif "Content Strategist" in agent_role: + return llm.create_content_strategy(task_input) + elif "Response Writer" in agent_role: + return llm.draft_response(task_input) + elif "Quality Reviewer" in agent_role: + return llm.review_response(task_input) + + return json.dumps({"error": "Unknown agent role"}) diff --git a/src/email_processor/models.py b/src/email_processor/models.py new file mode 100644 index 000000000..8d7e072cd --- /dev/null +++ b/src/email_processor/models.py @@ -0,0 +1,85 @@ +""" +Models for email processing state management using Pydantic. +Handles state for email analysis, thread history, and response generation. +""" +from pydantic import BaseModel, Field +from typing import Dict, List, Optional +from datetime import datetime + +class SenderInfo(BaseModel): + """Information about email senders and their companies""" + name: str + email: str + company: Optional[str] = None + title: Optional[str] = None + company_info: Optional[Dict] = Field(default_factory=dict) + interaction_history: List[Dict] = Field(default_factory=list) + last_interaction: Optional[datetime] = None + notes: Optional[str] = None + +class EmailThread(BaseModel): + """Email thread information and history""" + thread_id: str + subject: str + participants: List[str] + messages: List[Dict] = Field(default_factory=list) + last_update: datetime + labels: List[str] = Field(default_factory=list) + summary: Optional[str] = None + +class EmailAnalysis(BaseModel): + """Analysis results for an email""" + email_id: str + sender_email: str + importance: int = Field(ge=0, le=10) + response_needed: bool = False + response_deadline: Optional[datetime] = None + similar_threads: List[str] = Field(default_factory=list) + context_summary: Optional[str] = None + action_items: List[str] = Field(default_factory=list) + sentiment: Optional[str] = None + +class EmailResponse(BaseModel): + """Generated email response""" + email_id: str + thread_id: str + response_text: str + context_used: Dict = Field(default_factory=dict) + generated_at: datetime = Field(default_factory=datetime.now) + approved: bool = False + +class EmailState(BaseModel): + """Main state container for email processing flow""" + latest_emails: List[Dict] = Field( + default_factory=list, + description="Latest 5 emails fetched from Gmail" + ) + thread_history: Dict[str, EmailThread] = Field( + default_factory=dict, + description="History of email threads indexed by thread_id" + ) + sender_info: Dict[str, SenderInfo] = Field( + default_factory=dict, + description="Information about senders and their companies" + ) + analysis_results: Dict[str, EmailAnalysis] = Field( + default_factory=dict, + description="Analysis results for processed emails" + ) + response_decisions: Dict[str, bool] = Field( + default_factory=dict, + description="Decision whether to respond to each email" + ) + generated_responses: Dict[str, EmailResponse] = Field( + default_factory=dict, + description="Generated responses for emails" + ) + errors: Dict[str, Dict] = Field( + default_factory=dict, + description="Error information for flow execution" + ) + + class Config: + """Pydantic model configuration""" + arbitrary_types_allowed = True + extra = "allow" # Allow extra fields to be set diff --git a/src/email_processor/response_crew.py b/src/email_processor/response_crew.py new file mode 100644 index 000000000..a82bd735c --- /dev/null +++ b/src/email_processor/response_crew.py @@ -0,0 +1,118 @@ +""" +Response crew implementation for email processing. +Handles response generation with comprehensive context. +""" +from crewai import Agent, Task, Crew, Process +from typing import Dict, List, Optional +from datetime import datetime + +from .gmail_tool import GmailTool + +class ResponseCrew: + """ + Crew for drafting email responses based on analysis. + """ + + def __init__(self, gmail_tool: Optional[GmailTool] = None): + """Initialize response crew with required tools""" + self.gmail_tool = gmail_tool or GmailTool() + self._create_agents() + + def _create_agents(self): + """Create specialized agents for response generation""" + self.content_strategist = Agent( + role="Content Strategist", + name="Strategy Expert", + goal="Develop response content strategy", + backstory="Expert at planning effective communication approaches", + tools=[self.gmail_tool], + verbose=True + ) + + self.response_writer = Agent( + role="Email Writer", + name="Content Creator", + goal="Write effective email responses", + backstory="Skilled at crafting clear and impactful email content", + tools=[self.gmail_tool], + verbose=True + ) + + self.quality_reviewer = Agent( + role="Quality Reviewer", + name="Content Reviewer", + goal="Ensure response quality and appropriateness", + backstory="Expert at reviewing and improving email communications", + tools=[self.gmail_tool], + verbose=True + ) + + def draft_response(self, + email: Dict, + analysis: Dict, + thread_history: List[Dict]) -> Dict: + """ + Generate response using comprehensive context. + + Args: + email: Current email data + analysis: Analysis results for the email + thread_history: Previous messages in the thread + + Returns: + Dict: Generated response data + """ + try: + # Create response crew + crew = Crew( + agents=[ + self.content_strategist, + self.response_writer, + self.quality_reviewer + ], + tasks=[ + Task( + description="Develop response strategy", + agent=self.content_strategist + ), + Task( + description="Write email response", + agent=self.response_writer + ), + Task( + description="Review and improve response", + agent=self.quality_reviewer + ) + ], + verbose=True + ) + + # Generate response + results = crew.kickoff() + + # Process results + return { + "email_id": email["id"], + "thread_id": email["thread_id"], + "response_text": results[-1].get("response_text", ""), + "strategy": results[0], + "context_used": { + "analysis": analysis, + "thread_size": len(thread_history) + }, + "metadata": { + "generated_at": datetime.now().isoformat(), + "reviewed": True, + "review_feedback": results[-1].get("feedback", {}) + } + } + + except Exception as e: + print(f"Response generation error: {str(e)}") + return { + "email_id": email.get("id", "unknown"), + "thread_id": email.get("thread_id", "unknown"), + "error": f"Response generation failed: {str(e)}", + "response_text": None, + "strategy": None + } diff --git a/tests/test_complete_workflow.py b/tests/test_complete_workflow.py new file mode 100644 index 000000000..9d8d6146d --- /dev/null +++ b/tests/test_complete_workflow.py @@ -0,0 +1,103 @@ +"""Test script for complete email processing workflow""" +from email_analysis_crew import EmailAnalysisCrew +from response_crew import ResponseCrew +from email_tool import EmailTool +from mock_email_data import MockEmailThread +import json + +def test_email_analysis(email_tool, analysis_crew, thread_id): + """Test comprehensive email analysis including similar threads and research""" + print("\nAnalyzing email thread...") + + # Get thread context + thread = email_tool.get_email_thread(thread_id) + print(f"\nThread subject: {thread.subject}") + + # Find similar threads + similar = email_tool.find_similar_threads(thread.subject) + print(f"\nFound {len(similar)} similar threads") + + # Get sender history + sender = thread.messages[0].from_email + sender_info = email_tool.get_sender_history(sender) + print(f"\nSender: {sender_info['name']} from {sender_info['company']}") + print(f"Previous interactions: {sender_info['interaction_frequency']}") + + # Analyze with crew + analysis_result = analysis_crew.analyze_email(thread_id) + print(f"\nAnalysis Results:") + print(f"Response needed: {analysis_result.get('response_needed', False)}") + print(f"Priority: {analysis_result.get('priority', 'error')}") + print(f"Decision factors:") + context = analysis_result.get('analysis', {}).get('context', {}) + print(f"- Thread type: {context.get('thread_type', 'unknown')}") + print(f"- Similar threads found: {analysis_result.get('similar_threads_found', 0)}") + print(f"- Interaction frequency: {sender_info.get('interaction_frequency', 'unknown')}") + print(f"- Urgency indicators: {context.get('urgency_indicators', False)}") + print(f"- Conversation stage: {context.get('conversation_stage', 'unknown')}") + + return analysis_result + +def test_complete_workflow(): + """Test the complete email processing workflow""" + try: + print("\nTesting Complete Email Processing Workflow") + print("=========================================") + + # Initialize tools and crews + email_tool = EmailTool() + analysis_crew = EmailAnalysisCrew() + response_crew = ResponseCrew() + + # Test 1: Process email requiring response (weekly interaction) + print("\nTest 1: Processing email requiring response") + print("------------------------------------------") + thread_id = "thread_1" # Meeting follow-up thread from frequent contact + + analysis_result = test_email_analysis(email_tool, analysis_crew, thread_id) + + if analysis_result.get('response_needed', False): + print("\nGenerating response...") + response_result = response_crew.draft_response(thread_id, analysis_result) + print("\nGenerated Response:") + print(json.dumps(response_result.get('response', {}), indent=2)) + + # Verify response matches context + print("\nResponse Analysis:") + print(f"Tone matches relationship: {response_result['response']['review_notes']['context_awareness']['relationship_acknowledged']}") + print(f"Priority reflected: {response_result['response']['review_notes']['context_awareness']['priority_reflected']}") + print(f"Background used: {response_result['response']['review_notes']['context_awareness']['background_used']}") + else: + print("\nNo response required.") + + # Test 2: Process email not requiring response (first-time sender) + print("\nTest 2: Processing email not requiring response") + print("----------------------------------------------") + thread_id = "thread_3" # First-time contact + + analysis_result = test_email_analysis(email_tool, analysis_crew, thread_id) + + if analysis_result.get('response_needed', False): + print("\nGenerating response...") + response_result = response_crew.draft_response(thread_id, analysis_result) + print("\nGenerated Response:") + print(json.dumps(response_result.get('response', {}), indent=2)) + + # Verify response matches context + print("\nResponse Analysis:") + context = analysis_result.get('analysis', {}).get('context', {}) + print(f"Thread type: {context.get('thread_type', 'unknown')}") + print(f"Conversation stage: {context.get('conversation_stage', 'unknown')}") + print(f"Response priority: {analysis_result.get('priority', 'unknown')}") + else: + print("\nNo response required - First time sender with no urgent context") + + print("\nWorkflow test completed successfully!") + return True + + except Exception as e: + print(f"Analysis error: {str(e)}") + return False + +if __name__ == "__main__": + test_complete_workflow() diff --git a/tests/test_crewai_components.py b/tests/test_crewai_components.py new file mode 100644 index 000000000..effb77582 --- /dev/null +++ b/tests/test_crewai_components.py @@ -0,0 +1,64 @@ +from crewai import Agent, Task, Crew, Process +from typing import List +import json +from pydantic import BaseModel +import os + +# Define output models +class EmailAnalysis(BaseModel): + needs_response: bool + priority: str + context: str + +# Sample email data for testing +SAMPLE_EMAIL = { + "subject": "Meeting Follow-up", + "body": "Thanks for the great discussion yesterday. Looking forward to next steps.", + "sender": "john@example.com" +} + +# Test Agent Creation +researcher = Agent( + role="Email Researcher", + goal="Analyze email content and gather relevant context", + backstory="Expert at analyzing communication patterns and gathering contextual information", + verbose=True, + allow_delegation=True +) + +# Test Task Creation +analysis_task = Task( + description=f"Analyze this email content and determine if it requires a response: {json.dumps(SAMPLE_EMAIL)}", + agent=researcher, + expected_output="Detailed analysis of email content and response requirement", + output_json=EmailAnalysis +) + +# Test Crew Creation with Sequential Process +crew = Crew( + agents=[researcher], + tasks=[analysis_task], + process=Process.sequential, + verbose=True +) + +# Test execution with error handling +if __name__ == "__main__": + try: + # Ensure we have an API key + if not os.getenv("OPENAI_API_KEY"): + print("Please set OPENAI_API_KEY environment variable") + exit(1) + + result = crew.kickoff() + print("Execution Results:", result) + + # Access structured output + if hasattr(result, "output") and result.output: + analysis = EmailAnalysis.parse_raw(result.output) + print("\nStructured Analysis:") + print(f"Needs Response: {analysis.needs_response}") + print(f"Priority: {analysis.priority}") + print(f"Context: {analysis.context}") + except Exception as e: + print(f"Error during execution: {str(e)}") diff --git a/tests/test_email_crew.py b/tests/test_email_crew.py new file mode 100644 index 000000000..d19c56603 --- /dev/null +++ b/tests/test_email_crew.py @@ -0,0 +1,93 @@ +"""Test script for email analysis crew""" +from email_analysis_crew import EmailAnalysisCrew +from email_tool import EmailTool +import json + +def test_email_tool(): + """Test the email tool functionality first""" + try: + tool = EmailTool() + + # Test get_thread operation + result = tool._run("get_thread", thread_id="thread_1") + print("\nThread details:") + print(f"Subject: {result['subject']}") + print(f"Participants: {', '.join(result['participants'])}") + + # Test find_similar operation + result = tool._run("find_similar", query="meeting") + print("\nSimilar threads:") + for thread in result['threads']: + print(f"- {thread['subject']}") + + # Test get_history operation + result = tool._run("get_history", sender_email="john@example.com") + print("\nSender history:") + print(f"Name: {result['name']}") + print(f"Company: {result['company']}") + + # Test analyze_context operation + result = tool._run("analyze_context", thread_id="thread_1") + print("\nContext analysis:") + print(f"Thread length: {result['context_summary']['thread_length']}") + print(f"Relationship: {result['context_summary']['sender_relationship']}") + + return True + except Exception as e: + print(f"Tool test error: {str(e)}") + return False + +def test_email_analysis(): + """Test the email analysis crew functionality""" + if not test_email_tool(): + print("Skipping crew test due to tool failure") + return False + + try: + # Initialize crew + crew = EmailAnalysisCrew() + print("\nTesting email analysis crew...") + + # Test analysis of thread_1 (meeting follow-up thread) + print("\nAnalyzing meeting follow-up thread...") + result = crew.analyze_email("thread_1") + + print("\nAnalysis Results:") + print(f"Thread ID: {result['thread_id']}") + print(f"Response Needed: {result['response_needed']}") + print(f"Priority: {result['priority']}") + + if result['response_needed']: + print("\nContext Analysis:") + print(json.dumps(result['analysis']['context'], indent=2)) + print("\nSender Research:") + print(json.dumps(result['analysis']['research'], indent=2)) + print("\nResponse Strategy:") + print(json.dumps(result['analysis']['strategy'], indent=2)) + + # Test analysis of thread_3 (new inquiry) + print("\nAnalyzing new inquiry thread...") + result = crew.analyze_email("thread_3") + + print("\nAnalysis Results:") + print(f"Thread ID: {result['thread_id']}") + print(f"Response Needed: {result['response_needed']}") + print(f"Priority: {result['priority']}") + + if result['response_needed']: + print("\nContext Analysis:") + print(json.dumps(result['analysis']['context'], indent=2)) + print("\nSender Research:") + print(json.dumps(result['analysis']['research'], indent=2)) + print("\nResponse Strategy:") + print(json.dumps(result['analysis']['strategy'], indent=2)) + + print("\nAll tests completed successfully!") + return True + + except Exception as e: + print(f"Error during testing: {str(e)}") + return False + +if __name__ == "__main__": + test_email_analysis() diff --git a/tests/test_email_flow.py b/tests/test_email_flow.py new file mode 100644 index 000000000..cc34907e3 --- /dev/null +++ b/tests/test_email_flow.py @@ -0,0 +1,195 @@ +""" +Test suite for email processing flow implementation. +""" +import pytest +from datetime import datetime +from typing import Dict, List +from unittest.mock import MagicMock +from crewai.tools import BaseTool + +from email_processor.models import EmailState +from email_processor.email_flow import EmailProcessingFlow +from email_processor.email_analysis_crew import EmailAnalysisCrew +from email_processor.response_crew import ResponseCrew + +class MockGmailTool(BaseTool): + """Mock Gmail tool for testing""" + name: str = "Gmail Tool" + description: str = "Tool for interacting with Gmail" + + def get_latest_emails(self, limit: int = 5) -> List[Dict]: + """Mock getting latest emails""" + return [ + { + "id": f"email_{i}", + "thread_id": f"thread_{i}", + "subject": f"Test Email {i}", + "sender": "test@example.com", + "body": f"Test email body {i}", + "date": datetime.now().isoformat() + } + for i in range(limit) + ] + + def get_thread_history(self, thread_id: str) -> List[Dict]: + """Mock getting thread history""" + return [ + { + "id": f"history_{i}", + "thread_id": thread_id, + "subject": f"Previous Email {i}", + "sender": "test@example.com", + "body": f"Previous email body {i}", + "date": datetime.now().isoformat() + } + for i in range(3) + ] + + def get_sender_info(self, email: str) -> Dict: + """Mock getting sender information""" + return { + "email": email, + "name": "Test User", + "company": "Test Corp", + "previous_threads": ["thread_1", "thread_2"], + "interaction_history": { + "total_emails": 10, + "last_interaction": datetime.now().isoformat() + } + } + + def _run(self, method: str = "get_latest_emails", **kwargs) -> Dict: + """Required implementation of BaseTool._run""" + if method == "get_latest_emails": + return self.get_latest_emails(kwargs.get("limit", 5)) + elif method == "get_thread_history": + return self.get_thread_history(kwargs.get("thread_id")) + elif method == "get_sender_info": + return self.get_sender_info(kwargs.get("email")) + return None + +@pytest.fixture +def mock_crews(monkeypatch): + """Mock analysis and response crews""" + def mock_analyze_email(*args, **kwargs): + email = kwargs.get("email", {}) + return { + "email_id": email.get("id", "unknown"), + "thread_id": email.get("thread_id", "unknown"), + "response_needed": True, + "priority": "high", + "similar_threads": ["thread_1"], + "sender_context": {"previous_interactions": 5}, + "company_info": {"name": "Test Corp", "industry": "Technology"}, + "response_strategy": {"tone": "professional", "key_points": ["previous collaboration"]} + } + + def mock_draft_response(*args, **kwargs): + email = kwargs.get("email", {}) + return { + "email_id": email.get("id", "unknown"), + "response_text": "Thank you for your email. We appreciate your continued collaboration.", + "strategy": {"type": "professional", "focus": "relationship building"}, + "metadata": { + "generated_at": datetime.now().isoformat(), + "reviewed": True, + "review_feedback": {"quality": "high", "tone": "appropriate"} + } + } + + monkeypatch.setattr(EmailAnalysisCrew, "analyze_email", mock_analyze_email) + monkeypatch.setattr(ResponseCrew, "draft_response", mock_draft_response) + +@pytest.fixture +def email_flow(monkeypatch): + """Create email flow with mocked components""" + mock_tool = MockGmailTool() + def mock_init(self): + self.gmail_tool = mock_tool + self.analysis_crew = EmailAnalysisCrew(gmail_tool=mock_tool) + self.response_crew = ResponseCrew(gmail_tool=mock_tool) + self._state = EmailState() + self._initialize_state() + + monkeypatch.setattr(EmailProcessingFlow, "__init__", mock_init) + return EmailProcessingFlow() + +def test_email_flow_initialization(email_flow): + """Test flow initialization and state setup""" + # Verify state initialization + assert hasattr(email_flow._state, "latest_emails") + assert hasattr(email_flow._state, "analysis_results") + assert hasattr(email_flow._state, "generated_responses") + assert isinstance(email_flow._state.latest_emails, list) + assert isinstance(email_flow._state.analysis_results, dict) + assert isinstance(email_flow._state.generated_responses, dict) + +def test_email_fetching(email_flow): + """Test email fetching with 5-email limit""" + email_flow.kickoff() + + # Verify email fetching + assert len(email_flow._state.latest_emails) <= 5 + assert len(email_flow._state.latest_emails) > 0 + assert all(isinstance(email, dict) for email in email_flow._state.latest_emails) + +def test_email_analysis(email_flow, mock_crews): + """Test email analysis and response decision""" + email_flow.kickoff() + + # Verify analysis results + assert len(email_flow._state.analysis_results) > 0 + for email_id, analysis in email_flow._state.analysis_results.items(): + assert "response_needed" in analysis + assert "priority" in analysis + assert isinstance(analysis["response_needed"], bool) + +def test_response_generation(email_flow, mock_crews): + """Test response generation for emails needing response""" + email_flow.kickoff() + + # Verify response generation + for email_id, analysis in email_flow._state.analysis_results.items(): + if analysis["response_needed"]: + assert email_id in email_flow._state.generated_responses + response = email_flow._state.generated_responses[email_id] + assert "response_text" in response + assert "strategy" in response + assert "metadata" in response + +def test_complete_flow(email_flow, mock_crews): + """Test complete email processing flow""" + result = email_flow.kickoff() + + # Verify complete flow execution + assert len(email_flow._state.latest_emails) <= 5 + assert isinstance(email_flow._state.analysis_results, dict) + assert isinstance(email_flow._state.generated_responses, dict) + + # Verify response generation for emails needing response + for email_id, analysis in email_flow._state.analysis_results.items(): + if analysis["response_needed"]: + assert email_id in email_flow._state.generated_responses + assert email_flow._state.generated_responses[email_id]["email_id"] == email_id + +def test_error_handling(email_flow): + """Test error handling in flow execution""" + # Simulate error in email fetching by modifying _run method + original_run = email_flow.gmail_tool._run + + def mock_run(method: str = None, **kwargs): + if method == "get_latest_emails": + raise Exception("Test error") + return original_run(method, **kwargs) + + email_flow.gmail_tool._run = mock_run + result = email_flow.kickoff() + + # Verify error handling + assert "flow_execution" in email_flow._state.errors + assert isinstance(email_flow._state.errors["flow_execution"], list) + assert len(email_flow._state.errors["flow_execution"]) > 0 + assert "Test error" in email_flow._state.errors["flow_execution"][0]["error"] + + # Restore original method + email_flow.gmail_tool._run = original_run diff --git a/tests/test_email_tool.py b/tests/test_email_tool.py new file mode 100644 index 000000000..4fb3410da --- /dev/null +++ b/tests/test_email_tool.py @@ -0,0 +1,48 @@ +"""Test script for email processing tool""" +from gmail_tool import EmailTool +from datetime import datetime, timedelta + +def test_email_tool(): + """Test email processing tool functionality""" + try: + # Initialize tool + email_tool = EmailTool() + + # Test getting email thread + print("\nTesting thread retrieval...") + thread = email_tool.get_email_thread("thread_1") + print(f"Retrieved thread: {thread.subject}") + print(f"Participants: {', '.join(thread.participants)}") + print(f"Messages: {len(thread.messages)}") + + # Test finding similar threads + print("\nTesting similar thread search...") + similar = email_tool.find_similar_threads("meeting") + print(f"Found {len(similar)} similar threads") + for t in similar: + print(f"- {t.subject}") + + # Test sender history + print("\nTesting sender history...") + history = email_tool.get_sender_history("john@example.com") + print(f"Sender: {history['name']} from {history['company']}") + print(f"Last interaction: {history['last_interaction']}") + print(f"Interaction frequency: {history['interaction_frequency']}") + + # Test thread context analysis + print("\nTesting thread context analysis...") + context = email_tool.analyze_thread_context("thread_1") + print("Context Summary:") + print(f"Thread length: {context['context_summary']['thread_length']} messages") + print(f"Time span: {context['context_summary']['time_span']} days") + print(f"Relationship: {context['context_summary']['sender_relationship']}") + + print("\nAll tests completed successfully!") + return True + + except Exception as e: + print(f"Error during testing: {str(e)}") + return False + +if __name__ == "__main__": + test_email_tool() diff --git a/tests/test_response_crew.py b/tests/test_response_crew.py new file mode 100644 index 000000000..ff60c97be --- /dev/null +++ b/tests/test_response_crew.py @@ -0,0 +1,45 @@ +"""Test script for response crew functionality""" +from response_crew import ResponseCrew +from email_analysis_crew import EmailAnalysisCrew +import json + +def test_response_crew(): + """Test the response crew functionality""" + try: + # First get analysis results + analysis_crew = EmailAnalysisCrew() + analysis_result = analysis_crew.analyze_email("thread_1") + + if not analysis_result.get("response_needed", False): + print("No response needed for this thread") + return True + + # Initialize response crew + response_crew = ResponseCrew() + print("\nTesting response crew...") + + # Draft response + result = response_crew.draft_response("thread_1", analysis_result) + + print("\nResponse Results:") + print(f"Thread ID: {result['thread_id']}") + + if result.get("error"): + print(f"Error: {result['error']}") + return False + + print("\nContent Strategy:") + print(json.dumps(result['strategy_used'], indent=2)) + + print("\nFinal Response:") + print(json.dumps(result['response'], indent=2)) + + print("\nAll tests completed successfully!") + return True + + except Exception as e: + print(f"Error during testing: {str(e)}") + return False + +if __name__ == "__main__": + test_response_crew()