#!/usr/bin/env python3
"""Migrate e-mail between two IMAP accounts while preserving folder structure.

Configuration is read from a ``.env`` file in the current working directory.
For every selectable folder on the source server the script downloads the
messages that are not yet present in the same-named destination folder
(duplicates are detected via Message-ID, or Subject/Date/From as a fallback)
and appends them to the destination, creating folders as required.
"""
import email
import email.utils
import hashlib
import imaplib
import logging
import os
import re
import ssl
import sys
import time
from datetime import datetime, timezone
from pathlib import Path


def _detect_color_support():
    """Return True when stdout looks like a terminal that understands ANSI."""
    stream = sys.stdout
    if stream is None or not hasattr(stream, 'isatty') or not hasattr(stream, 'fileno'):
        return False
    try:
        is_tty = stream.isatty()
    except (ValueError, OSError):
        # stdout may already be closed or detached.
        return False
    return bool(is_tty and os.environ.get('TERM', 'dumb') != 'dumb')


class Colors:
    """ANSI color codes for terminal output.

    Every code collapses to an empty string when colors are not supported,
    so callers can interpolate them unconditionally.
    """

    # Detect if colors are supported
    _colors_supported = _detect_color_support()

    # Color codes (will be empty strings if colors not supported)
    RESET = '\033[0m' if _colors_supported else ''
    BOLD = '\033[1m' if _colors_supported else ''
    DIM = '\033[2m' if _colors_supported else ''
    RED = '\033[91m' if _colors_supported else ''
    GREEN = '\033[92m' if _colors_supported else ''
    YELLOW = '\033[93m' if _colors_supported else ''
    BLUE = '\033[94m' if _colors_supported else ''
    MAGENTA = '\033[95m' if _colors_supported else ''
    CYAN = '\033[96m' if _colors_supported else ''
    WHITE = '\033[97m' if _colors_supported else ''

    # Background colors
    BG_RED = '\033[101m' if _colors_supported else ''
    BG_GREEN = '\033[102m' if _colors_supported else ''
    BG_YELLOW = '\033[103m' if _colors_supported else ''
    BG_BLUE = '\033[104m' if _colors_supported else ''


def print_banner():
    """Print the start-up banner."""
    rule = '=' * 70
    title = 'EMAIL MIGRATION SCRIPT'
    banner = f"""
{Colors.CYAN}{rule}
{Colors.BOLD}{Colors.WHITE}{title:^70}
{Colors.CYAN}{rule}{Colors.RESET}
{Colors.DIM}Migrating emails while preserving folder structure...{Colors.RESET}
"""
    print(banner)


def print_status(status, message, color=Colors.WHITE):
    """Print a timestamped, color-coded status line.

    Args:
        status: Short status tag (e.g. ``'INFO'``, ``'ERROR'``).
        message: Text printed after the tag.
        color: Color used when ``status`` is not one of the known tags.
    """
    timestamp = datetime.now().strftime("%H:%M:%S")
    status_colors = {
        'INFO': Colors.BLUE,
        'SUCCESS': Colors.GREEN,
        'WARNING': Colors.YELLOW,
        'ERROR': Colors.RED,
        'CONNECTING': Colors.CYAN,
        'PROCESSING': Colors.MAGENTA,
    }
    status_color = status_colors.get(status, color)
    print(f"{Colors.DIM}[{timestamp}]{Colors.RESET} {status_color}[{status:^10}]{Colors.RESET} {message}")


def print_progress_bar(current, total, folder_name="", width=40):
    """Redraw a single-line progress bar.

    Args:
        current: Number of items handled so far.
        total: Total number of items; ``0`` is rendered as 100 %.
        folder_name: Optional label appended after the counters.
        width: Width of the bar in characters.
    """
    if total <= 0:
        ratio = 1.0
    else:
        # Clamp so an out-of-range counter can never overflow the bar.
        ratio = min(max(current / total, 0.0), 1.0)
    percentage = int(ratio * 100)
    filled = int(ratio * width)

    # Use different characters based on color support
    if Colors._colors_supported:
        bar = '█' * filled + '░' * (width - filled)
    else:
        bar = '=' * filled + '-' * (width - filled)

    folder_display = f" {folder_name}" if folder_name else ""

    # Clear the line and print progress
    print(
        f"\r{' ' * 80}\r{Colors.CYAN}Progress:{Colors.RESET} "
        f"[{Colors.GREEN}{bar}{Colors.RESET}] {percentage:3d}% ({current}/{total}){folder_display}",
        end='',
        flush=True,
    )
    if current >= total:
        print()  # New line when complete


def print_summary_section(title, stats):
    """Print ``stats`` (a mapping of label -> value) under a centered title."""
    separator = f"{Colors.CYAN}{'─' * 60}{Colors.RESET}"
    print(f"\n{separator}")
    print(f"{Colors.BOLD}{Colors.WHITE}{title:^60}{Colors.RESET}")
    print(f"{separator}")
    for key, value in stats.items():
        print(f"{Colors.CYAN}{key}:{Colors.RESET} {value}")
    print(f"{separator}")


def parse_env_lines(lines):
    """Parse ``KEY=VALUE`` lines into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Keys and values are stripped, and one pair of matching surrounding
    quotes (single or double) is removed from the value, following the
    usual ``.env`` convention.

    Args:
        lines: Iterable of text lines.

    Returns:
        dict mapping variable names to string values.
    """
    env_vars = {}
    for raw_line in lines:
        line = raw_line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, value = line.split('=', 1)
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]
        env_vars[key.strip()] = value
    return env_vars


def load_env_file(path='.env'):
    """Load configuration variables from a ``.env`` style file.

    Args:
        path: File to read; defaults to ``.env`` in the working directory.

    Returns:
        dict of configuration values (see :func:`parse_env_lines`).

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return parse_env_lines(f)


def setup_logging(log_level):
    """Configure root logging to ``email_migration.log`` and the console.

    Args:
        log_level: Level name such as ``'INFO'``; unknown names fall back
            to ``INFO``.

    Returns:
        The module logger.
    """
    numeric_level = getattr(logging, str(log_level).upper(), None)
    if not isinstance(numeric_level, int):
        numeric_level = logging.INFO
    logging.basicConfig(
        level=numeric_level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            # Subjects and folder names may contain non-ASCII characters.
            logging.FileHandler('email_migration.log', encoding='utf-8'),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__)


def _as_bool(value):
    """Interpret a configuration string as a boolean."""
    return str(value).strip().lower() in ('true', '1', 'yes', 'on')


class IMAPConnection:
    """Thin wrapper around :mod:`imaplib` used for both source and destination.

    All public methods swallow IMAP/network errors, log them, and report
    failure through their return value so a single bad folder or message
    does not abort the whole migration.
    """

    # One untagged LIST response: (flags) delimiter name
    _LIST_RESPONSE = re.compile(
        rb'\((?P<flags>[^)]*)\)\s+(?P<delimiter>"(?:[^"\\]|\\.)*"|NIL)\s+(?P<name>.*)',
        re.IGNORECASE | re.DOTALL,
    )
    # Mailboxes carrying one of these attributes cannot be SELECTed.
    _UNSELECTABLE = (b'\\noselect', b'\\nonexistent')

    def __init__(self, server, port, email_addr, username, password, use_ssl=True, timeout=60):
        """Store connection parameters; no network I/O happens here.

        Raises:
            ValueError: If neither ``email_addr`` nor ``username`` is given.
        """
        self.server = server
        self.port = port
        self.email = email_addr or ""
        self.username = username or ""
        self.password = password
        self.use_ssl = use_ssl
        self.timeout = timeout
        self.connection = None
        self.logger = logging.getLogger(__name__)

        # Validate that we have at least one authentication method
        if not self.email.strip() and not self.username.strip():
            raise ValueError("Either email or username must be provided for authentication")

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _quote_mailbox(folder):
        """Return ``folder`` as an IMAP quoted string (``\\`` and ``"`` escaped)."""
        escaped = str(folder).replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    @staticmethod
    def _parse_list_line(line):
        """Extract the mailbox name from one LIST response item.

        Args:
            line: ``bytes`` as returned by ``IMAP4.list()``, or a
                ``(header, literal)`` tuple when the server sent the name
                as a literal.

        Returns:
            The mailbox name, or ``None`` when the line cannot be parsed,
            the name is empty, or the mailbox is not selectable.
        """
        literal = None
        if isinstance(line, tuple):
            if len(line) < 2:
                return None
            line, literal = line[0], line[1]
        if not isinstance(line, bytes):
            return None

        match = IMAPConnection._LIST_RESPONSE.match(line.strip())
        if match is None:
            return None

        flags = match.group('flags').lower().split()
        if any(flag in IMAPConnection._UNSELECTABLE for flag in flags):
            return None

        if literal is not None:
            raw_name = literal
        else:
            raw_name = match.group('name').strip()
            if len(raw_name) >= 2 and raw_name.startswith(b'"') and raw_name.endswith(b'"'):
                # Quoted string: drop the quotes and undo backslash escapes.
                raw_name = re.sub(rb'\\(.)', rb'\1', raw_name[1:-1], flags=re.DOTALL)

        name = raw_name.decode('utf-8', 'replace')
        return name or None

    @staticmethod
    def _format_internal_date(date_time):
        """Convert ``date_time`` to the quoted form ``IMAP4.append`` accepts.

        ``imaplib`` rejects unquoted date strings, so the value is always
        returned wrapped in double quotes.  Naive datetimes are treated as
        UTC (``email.utils.parsedate_to_datetime`` returns a naive value
        for a ``-0000`` offset).

        Returns:
            A quoted date-time string, or ``None`` when no date is given.
        """
        if date_time is None:
            return None
        if isinstance(date_time, datetime):
            if date_time.tzinfo is None:
                date_time = date_time.replace(tzinfo=timezone.utc)
            # Uses English month names regardless of the process locale.
            return imaplib.Time2Internaldate(date_time)
        text = str(date_time).strip()
        if not text:
            return None
        if len(text) >= 2 and text.startswith('"') and text.endswith('"'):
            return text
        return f'"{text}"'

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------
    def _close_quietly(self):
        """Best-effort logout used when authentication fails."""
        if self.connection is None:
            return
        try:
            self.connection.logout()
        except Exception as e:
            self.logger.debug(f"Error while closing connection to {self.server}: {e}")
        finally:
            self.connection = None

    def connect(self):
        """Open the connection and log in.

        The username is tried first (if provided), then the e-mail address.

        Returns:
            True on successful login, False otherwise.
        """
        try:
            if self.use_ssl:
                context = ssl.create_default_context()
                self.connection = imaplib.IMAP4_SSL(self.server, self.port, ssl_context=context)
            else:
                self.connection = imaplib.IMAP4(self.server, self.port)

            self.connection.sock.settimeout(self.timeout)

            candidates = []
            if self.username.strip():
                candidates.append(('username', self.username))
            # Do not retry the very same credentials twice.
            if self.email.strip() and self.email != self.username:
                candidates.append(('email', self.email))

            for method, login_user in candidates:
                try:
                    self.connection.login(login_user, self.password)
                except Exception as login_error:
                    self.logger.warning(f"{method.capitalize()} login failed for {login_user}: {login_error}")
                    continue
                self.logger.info(f"Connected to {self.server} using {method}: {login_user}")
                print_status("SUCCESS", f"Connected to {self.server} as {login_user}")
                return True

            tried = ' and '.join(method for method, _ in candidates)
            self.logger.error(f"Failed to authenticate with {self.server} using {tried}")
            self._close_quietly()
            return False

        except Exception as e:
            self.logger.error(f"Failed to connect to {self.server}: {e}")
            self._close_quietly()
            return False

    def disconnect(self):
        """Close the selected mailbox (if any) and log out.

        ``close()`` fails when no mailbox is selected; that must not
        prevent ``logout()``, so the two steps are attempted independently.
        """
        if not self.connection:
            return
        try:
            self.connection.close()
        except Exception as e:
            self.logger.debug(f"No mailbox to close on {self.server}: {e}")
        try:
            self.connection.logout()
            self.logger.info(f"Disconnected from {self.server}")
        except Exception as e:
            self.logger.debug(f"Error logging out from {self.server}: {e}")
        finally:
            self.connection = None

    # ------------------------------------------------------------------
    # Folder operations
    # ------------------------------------------------------------------
    def get_folders(self):
        """Return the names of all selectable folders (empty list on error)."""
        try:
            status, folders = self.connection.list()
            if status == 'OK':
                folder_list = []
                for entry in folders:
                    folder_name = self._parse_list_line(entry)
                    if folder_name:
                        folder_list.append(folder_name)
                return folder_list
        except Exception as e:
            self.logger.error(f"Error getting folders: {e}")
        return []

    def select_folder(self, folder, readonly=False):
        """Select ``folder``.

        Args:
            folder: Mailbox name.
            readonly: Open the mailbox read-only so that fetching or
                closing cannot modify it.

        Returns:
            ``(True, message_count)`` on success, ``(False, 0)`` otherwise.
        """
        try:
            status, response = self.connection.select(self._quote_mailbox(folder), readonly=readonly)
            if status == 'OK':
                count = int(response[0])
                return True, count
        except Exception as e:
            self.logger.error(f"Error selecting folder '{folder}': {e}")
        return False, 0

    def create_folder(self, folder):
        """Create ``folder`` on the server.

        Returns:
            ``(ok, already_existed)``.  Errors are treated as "already
            exists" (best effort); a genuinely missing folder then shows up
            as failed uploads.
        """
        try:
            status, response = self.connection.create(self._quote_mailbox(folder))
            if status == 'OK':
                return True, False  # Created successfully
            elif 'already exists' in str(response).lower():
                return True, True  # Already exists
            else:
                return False, False  # Failed to create
        except Exception as e:
            self.logger.warning(f"Error creating folder '{folder}', assuming it exists: {e}")
            return True, True  # Assume exists on error

    def folder_exists(self, folder):
        """Check if a folder exists by selecting it read-only."""
        try:
            status, response = self.connection.select(self._quote_mailbox(folder), readonly=True)
            return status == 'OK'
        except Exception as e:
            self.logger.debug(f"Folder '{folder}' could not be selected: {e}")
            return False

    # ------------------------------------------------------------------
    # Message operations (operate on the currently selected folder)
    # ------------------------------------------------------------------
    def get_message_ids(self):
        """Return the sequence numbers of all messages (empty list on error)."""
        try:
            status, messages = self.connection.search(None, 'ALL')
            if status == 'OK' and messages and messages[0]:
                return messages[0].split()
        except Exception as e:
            self.logger.error(f"Error getting message IDs: {e}")
        return []

    def fetch_message_with_flags(self, msg_id):
        """Fetch one message together with its system flags.

        ``BODY.PEEK[]`` is used so the fetch never sets ``\\Seen`` on the
        server.  Only system flags (those starting with a backslash) are
        returned; ``\\Recent`` is dropped because clients cannot set it.

        Returns:
            ``(message, flags)`` where ``flags`` is a space-separated
            string, or ``(None, '')`` on failure.
        """
        try:
            status, msg_data = self.connection.fetch(msg_id, '(FLAGS BODY.PEEK[])')
            if status != 'OK' or not msg_data:
                return None, ''

            raw_email = None
            flags = []
            for part in msg_data:
                if isinstance(part, tuple):
                    meta = part[0]
                    if raw_email is None and len(part) > 1:
                        raw_email = part[1]
                else:
                    # FLAGS may follow the literal in a trailing bytes item.
                    meta = part
                if not isinstance(meta, bytes):
                    continue
                for flag in imaplib.ParseFlags(meta):
                    text = flag.decode('ascii', 'replace')
                    if text.startswith('\\') and text.lower() != '\\recent' and text not in flags:
                        flags.append(text)

            if raw_email is None:
                self.logger.error(f"Error fetching message {msg_id}: empty response")
                return None, ''
            return email.message_from_bytes(raw_email), ' '.join(flags)
        except Exception as e:
            self.logger.error(f"Error fetching message {msg_id}: {e}")
        return None, ''

    def fetch_message(self, msg_id):
        """Fetch one message; returns an ``email.message.Message`` or None."""
        message, _flags = self.fetch_message_with_flags(msg_id)
        return message

    def append_message(self, folder, message, flags='', date_time=None):
        """Append ``message`` to ``folder``.

        Args:
            folder: Destination mailbox (must already exist).
            message: ``email.message.Message`` to upload.
            flags: Space-separated IMAP flags, or ``''``.
            date_time: Optional internal date (datetime or string).  If the
                server rejects the dated append, it is retried without one.

        Returns:
            True if the server accepted the message.
        """
        try:
            # Don't create folder here - let the migration logic handle it
            msg_bytes = message.as_bytes()
            mailbox = self._quote_mailbox(folder)

            date_str = None
            if date_time:
                try:
                    date_str = self._format_internal_date(date_time)
                    self.logger.debug(f"Formatted date string: {date_str}")
                except Exception as date_error:
                    self.logger.warning(f"Error formatting date {date_time}: {date_error}")
                    date_str = None

            # First try with date if available
            if date_str is not None:
                try:
                    status, response = self.connection.append(mailbox, flags, date_str, msg_bytes)
                    if status == 'OK':
                        return True
                    self.logger.warning(f"Date upload failed for folder '{folder}', trying without date")
                except Exception as date_error:
                    self.logger.warning(f"Date upload error for folder '{folder}': {date_error}, trying without date")

            # Fallback: try without date
            status, response = self.connection.append(mailbox, flags, None, msg_bytes)
            return status == 'OK'

        except Exception as e:
            self.logger.error(f"Error appending message to folder '{folder}': {e}")
            return False

    def get_message_ids_with_headers(self):
        """Get message IDs with basic headers for duplicate detection.

        Returns:
            List of ``{'id': bytes, 'headers': bytes}`` dicts; messages
            whose headers cannot be fetched are skipped.
        """
        try:
            status, messages = self.connection.search(None, 'ALL')
            if status == 'OK' and messages and messages[0]:
                messages_info = []
                for msg_id in messages[0].split():
                    try:
                        # Fetch only headers for efficiency
                        status, msg_data = self.connection.fetch(
                            msg_id, '(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID SUBJECT DATE FROM)])'
                        )
                        if status == 'OK' and msg_data and isinstance(msg_data[0], tuple):
                            header_data = msg_data[0][1] if msg_data[0][1] else b''
                            messages_info.append({'id': msg_id, 'headers': header_data})
                    except Exception as e:
                        self.logger.debug(f"Error fetching headers for message {msg_id}: {e}")
                        continue
                return messages_info
        except Exception as e:
            self.logger.error(f"Error getting message IDs with headers: {e}")
        return []


class EmailMigrator:
    """Copy messages folder by folder from a source to a destination account."""

    def __init__(self, config):
        """Build both connections from ``config`` (a dict of ``.env`` values).

        Raises:
            KeyError: If a required server/password entry is missing.
            ValueError: If a numeric setting is malformed or no login
                identity is configured for an account.
        """
        self.config = config
        self.logger = setup_logging(config.get('LOG_LEVEL', 'INFO'))

        self.temp_dir = Path(config.get('TEMP_DOWNLOAD_DIR', './temp_emails'))
        self.temp_dir.mkdir(parents=True, exist_ok=True)

        # Used as a modulus for periodic log lines, so it must be >= 1.
        self.batch_size = max(1, int(config.get('BATCH_SIZE', '50')))
        self.preserve_flags = _as_bool(config.get('PRESERVE_FLAGS', 'True'))
        self.preserve_dates = _as_bool(config.get('PRESERVE_DATES', 'True'))

        # Track duplicates, failures and existing folders
        self.duplicate_stats = {}
        self.download_failures = {}
        self.existing_folders = set()
        self.created_folders = set()

        self.logger.info("Migration will preserve original folder structure")

        include_str = config.get('INCLUDE_FOLDERS', '')
        exclude_str = config.get('EXCLUDE_FOLDERS', '')
        self.include_folders = [f.strip() for f in include_str.split(',') if f.strip()] if include_str else []
        self.exclude_folders = [f.strip() for f in exclude_str.split(',') if f.strip()] if exclude_str else []

        timeout = int(config.get('IMAP_TIMEOUT', '60'))
        self.source = self._build_connection('SOURCE', timeout)
        self.destination = self._build_connection('DEST', timeout)

    def _build_connection(self, prefix, timeout):
        """Create the ``IMAPConnection`` described by the ``<prefix>_*`` settings."""
        config = self.config
        use_ssl = _as_bool(config.get(f'{prefix}_IMAP_USE_SSL', 'True'))
        # Fall back to the standard IMAPS/IMAP port when none is configured.
        port = int(config.get(f'{prefix}_IMAP_PORT') or (993 if use_ssl else 143))
        return IMAPConnection(
            config[f'{prefix}_IMAP_SERVER'],
            port,
            config.get(f'{prefix}_EMAIL', ''),
            config.get(f'{prefix}_USERNAME', ''),
            config[f'{prefix}_PASSWORD'],
            use_ssl,
            timeout,
        )

    def get_destination_folder(self, source_folder):
        """Return the destination folder, which is the same as the source folder.

        Args:
            source_folder (str): Original folder name from source

        Returns:
            str: Destination folder name (same as source)
        """
        return source_folder

    def generate_message_signature(self, message):
        """Generate a signature used to recognise a message as a duplicate.

        The MD5 of the Message-ID is used when present; otherwise the MD5
        of ``subject|date|from``.  Returns None if hashing fails.
        """
        try:
            # Header values may be Header objects rather than str.
            def header(name):
                return str(message.get(name, '') or '').strip()

            # Use Message-ID if available (most reliable)
            message_id = header('Message-ID')
            if message_id:
                return hashlib.md5(message_id.encode('utf-8', 'surrogateescape')).hexdigest()

            # Fallback: combine subject, date, and from
            signature_string = f"{header('Subject')}|{header('Date')}|{header('From')}"
            return hashlib.md5(signature_string.encode('utf-8', 'surrogateescape')).hexdigest()

        except Exception as e:
            self.logger.debug(f"Error generating message signature: {e}")
            return None

    def get_existing_message_signatures(self, folder):
        """Get signatures of existing messages in destination folder.

        Returns an empty set when the folder does not exist or on error.
        """
        try:
            if not self.destination.folder_exists(folder):
                return set()

            success, _count = self.destination.select_folder(folder, readonly=True)
            if not success:
                return set()

            signatures = set()
            for msg_info in self.destination.get_message_ids_with_headers():
                try:
                    # Parse headers to create email object
                    msg = email.message_from_bytes(msg_info['headers'])
                    signature = self.generate_message_signature(msg)
                    if signature:
                        signatures.add(signature)
                except Exception as e:
                    self.logger.debug(f"Error processing existing message: {e}")
                    continue

            return signatures

        except Exception as e:
            self.logger.error(f"Error getting existing message signatures for {folder}: {e}")
            return set()

    def should_process_folder(self, folder):
        """Apply the INCLUDE_FOLDERS / EXCLUDE_FOLDERS filters to ``folder``."""
        if self.include_folders and folder not in self.include_folders:
            return False
        if self.exclude_folders and folder in self.exclude_folders:
            return False
        return True

    def download_emails_from_folder(self, folder):
        """Download every message of ``folder`` not yet on the destination.

        The source folder is opened read-only.  Per-folder duplicate and
        failure counts are recorded in ``duplicate_stats`` and
        ``download_failures``.

        Returns:
            List of dicts with keys ``message``, ``folder``,
            ``original_id``, ``signature`` and ``flags``.
        """
        self.logger.info(f"Downloading emails from folder: {folder}")
        print_status("PROCESSING", f"Accessing folder: {folder}")
        self.download_failures.pop(folder, None)

        success, _count = self.source.select_folder(folder, readonly=True)
        if not success:
            self.logger.error(f"Failed to select source folder: {folder}")
            print_status("ERROR", f"Could not access folder: {folder}")
            self.download_failures[folder] = 1
            return []

        message_ids = self.source.get_message_ids()
        msg_count = len(message_ids)
        self.logger.info(f"Found {msg_count} messages in folder: {folder}")

        if msg_count == 0:
            print_status("INFO", f"Folder '{folder}' is empty")
            return []

        # Get existing message signatures from destination to check for duplicates
        destination_folder = self.get_destination_folder(folder)
        print_status("INFO", f"Checking for existing emails in destination '{destination_folder}'")
        existing_signatures = self.get_existing_message_signatures(destination_folder)

        print_status("INFO", f"Downloading {msg_count} messages from '{folder}'")

        emails = []
        duplicates_found = 0
        failed = 0

        for i, msg_id in enumerate(message_ids, 1):
            try:
                msg, flags = self.source.fetch_message_with_flags(msg_id)
                if msg is None:
                    failed += 1
                else:
                    # Check if this message already exists in destination
                    signature = self.generate_message_signature(msg)
                    if signature and signature in existing_signatures:
                        duplicates_found += 1
                        subject = str(msg.get('Subject', 'No Subject'))[:50]
                        self.logger.debug(f"Duplicate message found: {subject}")
                    else:
                        emails.append({
                            'message': msg,
                            'folder': folder,
                            'original_id': msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id),
                            'signature': signature,
                            'flags': flags,
                        })
            except Exception as e:
                failed += 1
                self.logger.error(f"Error downloading message {msg_id} from {folder}: {e}")

            # Update the bar for every message, including duplicates and
            # failures, so it always reaches 100% and ends its line.
            if msg_count > 1:
                print_progress_bar(i, msg_count, f"from {folder}")

            if i % self.batch_size == 0:
                self.logger.info(f"Downloaded {i}/{msg_count} messages from {folder}")

        # Update duplicate and failure statistics
        if duplicates_found > 0:
            self.duplicate_stats[folder] = duplicates_found
            print_status("INFO", f"Skipped {duplicates_found} duplicate emails in '{folder}'")
        if failed > 0:
            self.download_failures[folder] = failed
            print_status("WARNING", f"Failed to download {failed} messages from '{folder}'")

        self.logger.info(f"Successfully downloaded {len(emails)} new messages from {folder}")
        if len(emails) > 0:
            print_status("SUCCESS", f"Downloaded {len(emails)} new messages from '{folder}'")
        elif duplicates_found > 0:
            print_status("INFO", f"All {duplicates_found} emails in '{folder}' already exist in destination")

        return emails

    def upload_emails_to_folder(self, emails, destination_folder):
        """Append ``emails`` to ``destination_folder``, creating it if needed.

        When PRESERVE_FLAGS is enabled the flags captured at download time
        are re-applied (entries without a ``flags`` key are marked
        ``\\Seen``); when PRESERVE_DATES is enabled the ``Date`` header is
        used as the internal date.

        Returns:
            Number of messages the destination accepted.
        """
        email_count = len(emails)
        if email_count == 0:
            return 0

        self.logger.info(f"Uploading {email_count} emails to folder: {destination_folder}")

        # Check if folder exists and create if necessary
        if self.destination.folder_exists(destination_folder):
            if destination_folder not in self.existing_folders:
                self.existing_folders.add(destination_folder)
                print_status("INFO", f"Using existing folder: '{destination_folder}'")
        else:
            created, was_existing = self.destination.create_folder(destination_folder)
            if not created:
                print_status("ERROR", f"Failed to create folder: '{destination_folder}'")
                return 0
            if was_existing:
                self.existing_folders.add(destination_folder)
                print_status("INFO", f"Using existing folder: '{destination_folder}'")
            else:
                self.created_folders.add(destination_folder)
                print_status("SUCCESS", f"Created new folder: '{destination_folder}'")

        print_status("PROCESSING", f"Uploading to folder: {destination_folder}")

        uploaded = 0
        for i, email_data in enumerate(emails, 1):
            try:
                message = email_data['message']
                flags = email_data.get('flags', '\\Seen') if self.preserve_flags else ''

                date_obj = None
                if self.preserve_dates and message.get('Date'):
                    try:
                        date_obj = email.utils.parsedate_to_datetime(str(message['Date']))
                        self.logger.debug(f"Parsed date object: {type(date_obj)} - {date_obj}")
                    except Exception as e:
                        self.logger.warning(f"Failed to parse date '{message.get('Date')}': {e}")
                        date_obj = None

                if self.destination.append_message(destination_folder, message, flags, date_obj):
                    uploaded += 1
                else:
                    self.logger.warning(
                        f"Destination rejected message {email_data.get('original_id', '?')} "
                        f"for folder '{destination_folder}'"
                    )
            except Exception as e:
                self.logger.error(f"Error uploading message to {destination_folder}: {e}")

            # Update progress bar
            if email_count > 1:
                print_progress_bar(i, email_count, f"to {destination_folder}")

            if i % self.batch_size == 0:
                self.logger.info(f"Uploaded {i}/{email_count} messages to {destination_folder}")

        self.logger.info(f"Successfully uploaded {uploaded}/{email_count} messages to {destination_folder}")
        if uploaded > 0:
            print_status("SUCCESS", f"Uploaded {uploaded}/{email_count} messages to '{destination_folder}'")
        return uploaded

    def migrate_folder(self, source_folder):
        """Migrate one folder.

        Returns:
            dict with ``downloaded``, ``uploaded``, ``errors`` (messages or
            steps that failed) and ``skipped`` (True when the folder was
            filtered out by configuration).
        """
        stats = {'downloaded': 0, 'uploaded': 0, 'errors': 0, 'skipped': False}

        if not self.should_process_folder(source_folder):
            self.logger.info(f"Skipping folder: {source_folder} (filtered)")
            print_status("INFO", f"Skipping folder '{source_folder}' (filtered)")
            stats['skipped'] = True
            return stats

        try:
            # Folder names are carried over unchanged.
            destination_folder = self.get_destination_folder(source_folder)
            self.logger.info(f"Migrating '{source_folder}' -> '{destination_folder}'")

            emails = self.download_emails_from_folder(source_folder)
            stats['downloaded'] = len(emails)
            stats['errors'] += self.download_failures.get(source_folder, 0)

            if emails:
                stats['uploaded'] = self.upload_emails_to_folder(emails, destination_folder)
                stats['errors'] += stats['downloaded'] - stats['uploaded']

        except Exception as e:
            self.logger.error(f"Error migrating folder {source_folder}: {e}")
            stats['errors'] += 1

        return stats

    def run_migration(self):
        """Connect to both servers and migrate every source folder.

        Returns:
            dict of totals.  ``connected`` is False when either server
            could not be reached, in which case nothing was migrated.
        """
        self.logger.info("Starting email migration...")
        print_status("INFO", "Initializing migration process...")

        total_stats = {
            'folders_processed': 0,
            'total_downloaded': 0,
            'total_uploaded': 0,
            'errors': 0,
            'total_duplicates': 0,
            'folders_created': 0,
            'folders_existed': 0,
            'connected': False,
        }

        try:
            print_status("CONNECTING", "Connecting to source server...")
            if not self.source.connect():
                self.logger.error("Failed to connect to source server")
                print_status("ERROR", "Failed to connect to source server")
                return total_stats

            print_status("CONNECTING", "Connecting to destination server...")
            if not self.destination.connect():
                self.logger.error("Failed to connect to destination server")
                print_status("ERROR", "Failed to connect to destination server")
                return total_stats

            total_stats['connected'] = True

            folders = self.source.get_folders()
            folder_count = len(folders)
            self.logger.info(f"Found {folder_count} folders to process")
            print_status("INFO", f"Found {folder_count} folders to analyze")

            print(f"\n{Colors.CYAN}{'─' * 50}")
            print(f"{Colors.BOLD}{Colors.WHITE}MIGRATION PROGRESS{Colors.RESET}")
            print(f"{Colors.CYAN}{'─' * 50}{Colors.RESET}")

            for i, folder in enumerate(folders, 1):
                try:
                    self.logger.info(f"Processing folder: {folder}")
                    print(f"\n{Colors.MAGENTA}[{i}/{folder_count}]{Colors.RESET} Processing folder: {Colors.BOLD}{folder}{Colors.RESET}")

                    stats = self.migrate_folder(folder)
                    if stats.get('skipped'):
                        # Filtered folders were not processed at all.
                        continue

                    total_stats['folders_processed'] += 1
                    total_stats['total_downloaded'] += stats['downloaded']
                    total_stats['total_uploaded'] += stats['uploaded']
                    total_stats['errors'] += stats.get('errors', 0)

                    destination_folder = self.get_destination_folder(folder)
                    self.logger.info(f"Folder '{folder}' -> '{destination_folder}' completed: {stats['downloaded']} downloaded, {stats['uploaded']} uploaded")

                    # Show folder completion status
                    if stats['downloaded'] > 0:
                        print_status("SUCCESS", f"Folder '{folder}' completed: {stats['uploaded']}/{stats['downloaded']} emails migrated")
                    elif self.duplicate_stats.get(folder):
                        print_status("INFO", f"Folder '{folder}' is already up to date - nothing to migrate")
                    elif stats.get('errors'):
                        print_status("WARNING", f"Folder '{folder}' could not be migrated")
                    else:
                        print_status("INFO", f"Folder '{folder}' was empty - skipped")

                except Exception as e:
                    self.logger.error(f"Error processing folder {folder}: {e}")
                    print_status("ERROR", f"Failed to process folder '{folder}': {e}")
                    total_stats['errors'] += 1

            # Calculate final statistics
            total_stats['total_duplicates'] = sum(self.duplicate_stats.values())
            total_stats['folders_created'] = len(self.created_folders)
            total_stats['folders_existed'] = len(self.existing_folders)

        finally:
            print_status("INFO", "Closing connections...")
            self.source.disconnect()
            self.destination.disconnect()

        return total_stats


def main():
    """Command-line entry point: load ``.env``, validate it, run the migration."""
    print_banner()

    try:
        print_status("INFO", "Loading configuration from .env file...")
        config = load_env_file()
        print_status("SUCCESS", "Configuration loaded successfully")
    except (OSError, UnicodeDecodeError) as e:
        print_status("ERROR", f"Failed to load .env file: {e}")
        sys.exit(1)

    # Basic required variables
    print_status("INFO", "Validating configuration...")
    required_vars = ['SOURCE_IMAP_SERVER', 'SOURCE_PASSWORD', 'DEST_IMAP_SERVER', 'DEST_PASSWORD']
    missing_vars = [var for var in required_vars if not config.get(var)]
    if missing_vars:
        print_status("ERROR", f"Missing required variables: {', '.join(missing_vars)}")
        print_status("ERROR", "Please check your .env file")
        sys.exit(1)

    # Validate authentication methods
    source_email = config.get('SOURCE_EMAIL', '').strip()
    source_username = config.get('SOURCE_USERNAME', '').strip()
    dest_email = config.get('DEST_EMAIL', '').strip()
    dest_username = config.get('DEST_USERNAME', '').strip()

    if not source_email and not source_username:
        print_status("ERROR", "Either SOURCE_EMAIL or SOURCE_USERNAME must be provided for source account")
        sys.exit(1)

    if not dest_email and not dest_username:
        print_status("ERROR", "Either DEST_EMAIL or DEST_USERNAME must be provided for destination account")
        sys.exit(1)

    print_status("SUCCESS", "Configuration validation completed")

    try:
        print_status("INFO", "Initializing migration engine...")
        # Inside the try block so malformed numeric settings are reported
        # as a normal error instead of a traceback.
        migrator = EmailMigrator(config)

        stats = migrator.run_migration()

        if not stats.get('connected', True):
            print_status("ERROR", "Migration aborted: could not connect to both servers")
            sys.exit(1)

        if stats['errors'] > 0:
            print_status("WARNING", "Migration completed with errors")
        else:
            print_status("SUCCESS", "Migration completed successfully!")

        # Display summary
        summary_stats = {
            "Folders processed": f"{Colors.CYAN}{stats['folders_processed']}{Colors.RESET}",
            "Emails downloaded": f"{Colors.GREEN}{stats['total_downloaded']}{Colors.RESET}",
            "Emails uploaded": f"{Colors.GREEN}{stats['total_uploaded']}{Colors.RESET}",
            "Duplicates skipped": f"{Colors.YELLOW}{stats['total_duplicates']}{Colors.RESET}",
            "Folders created": f"{Colors.CYAN}{stats['folders_created']}{Colors.RESET}",
            "Folders existed": f"{Colors.BLUE}{stats['folders_existed']}{Colors.RESET}",
            "Errors encountered": f"{Colors.RED if stats['errors'] > 0 else Colors.GREEN}{stats['errors']}{Colors.RESET}",
        }

        print_summary_section("MIGRATION SUMMARY", summary_stats)

        # Show duplicate details if any
        if stats['total_duplicates'] > 0:
            duplicate_separator = f"{Colors.YELLOW}{'─' * 60}{Colors.RESET}"
            print(f"\n{duplicate_separator}")
            print(f"{Colors.BOLD}{Colors.WHITE}{'DUPLICATE EMAILS SKIPPED':^60}{Colors.RESET}")
            print(f"{duplicate_separator}")
            for folder, count in migrator.duplicate_stats.items():
                print(f"{Colors.YELLOW}{folder}:{Colors.RESET} {count}")
            print(f"{duplicate_separator}")
            print(f"\n{Colors.YELLOW}Note: {stats['total_duplicates']} duplicate emails were not imported{Colors.RESET}")
            print(f"{Colors.YELLOW}because they already exist in the destination folders.{Colors.RESET}")

        if stats['errors'] > 0:
            print_status("WARNING", "Check 'email_migration.log' for error details")

        # Important notice section
        notice_separator = f"{Colors.YELLOW}{'─' * 60}{Colors.RESET}"
        print(f"\n{notice_separator}")
        print(f"{Colors.BOLD}{Colors.WHITE}{'IMPORTANT NOTE':^60}{Colors.RESET}")
        print(f"{notice_separator}")
        print(f"{Colors.YELLOW}Some webmail clients may not show new folders immediately.{Colors.RESET}")
        print(f"{Colors.YELLOW}You may need to:{Colors.RESET}")
        print(f"{Colors.YELLOW}  {Colors.CYAN}1.{Colors.RESET} {Colors.YELLOW}Refresh your webmail interface{Colors.RESET}")
        print(f"{Colors.YELLOW}  {Colors.CYAN}2.{Colors.RESET} {Colors.YELLOW}Check folder settings/preferences{Colors.RESET}")
        print(f"{Colors.YELLOW}  {Colors.CYAN}3.{Colors.RESET} {Colors.YELLOW}Manually subscribe to new folders{Colors.RESET}")
        print(f"{notice_separator}")

    except KeyboardInterrupt:
        print_status("WARNING", "Migration interrupted by user")
        sys.exit(0)
    except Exception as e:
        print_status("ERROR", f"Migration failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()