#!/usr/bin/env python3 import imaplib import email import ssl import logging from datetime import datetime from pathlib import Path def load_env_file(): env_vars = {} with open('.env', 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) env_vars[key.strip()] = value.strip() return env_vars def setup_logging(log_level): numeric_level = getattr(logging, log_level.upper(), logging.INFO) logging.basicConfig( level=numeric_level, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('email_migration.log'), logging.StreamHandler() ] ) return logging.getLogger(__name__) class IMAPConnection: def __init__(self, server, port, email_addr, password, use_ssl=True, timeout=60): self.server = server self.port = port self.email = email_addr self.password = password self.use_ssl = use_ssl self.timeout = timeout self.connection = None self.logger = logging.getLogger(__name__) def connect(self): try: if self.use_ssl: context = ssl.create_default_context() self.connection = imaplib.IMAP4_SSL(self.server, self.port, ssl_context=context) else: self.connection = imaplib.IMAP4(self.server, self.port) self.connection.sock.settimeout(self.timeout) self.connection.login(self.email, self.password) self.logger.info(f"Connected to {self.server} as {self.email}") return True except Exception as e: self.logger.error(f"Failed to connect to {self.server}: {e}") return False def disconnect(self): if self.connection: try: self.connection.close() self.connection.logout() self.logger.info(f"Disconnected from {self.server}") except: pass def get_folders(self): try: status, folders = self.connection.list() if status == 'OK': folder_list = [] for folder in folders: parts = folder.decode().split('"') if len(parts) >= 3: folder_name = parts[-2] folder_list.append(folder_name) return folder_list except Exception as e: self.logger.error(f"Error getting folders: {e}") return [] def select_folder(self, folder): try: status, response = self.connection.select(f'"{folder}"') if status == 'OK': count = int(response[0]) return True, count except Exception as e: self.logger.error(f"Error selecting folder '{folder}': {e}") return False, 0 def get_message_ids(self): try: status, messages = self.connection.search(None, 'ALL') if status == 'OK': return messages[0].split() except Exception as e: self.logger.error(f"Error getting message IDs: {e}") return [] def fetch_message(self, msg_id): try: status, msg_data = self.connection.fetch(msg_id, '(RFC822)') if status == 'OK': raw_email = msg_data[0][1] return email.message_from_bytes(raw_email) except Exception as e: self.logger.error(f"Error fetching message {msg_id}: {e}") return None def append_message(self, folder, message, flags='', date_time=None): try: self.create_folder(folder) msg_bytes = message.as_bytes() date_str = None if date_time: date_str = date_time.strftime("%d-%b-%Y %H:%M:%S %z") status, response = self.connection.append(f'"{folder}"', flags, date_str, msg_bytes) return status == 'OK' except Exception as e: self.logger.error(f"Error appending message to folder '{folder}': {e}") return False def create_folder(self, folder): try: status, response = self.connection.create(f'"{folder}"') return status == 'OK' or 'already exists' in str(response).lower() except: return True class EmailMigrator: def __init__(self, config): self.config = config self.logger = setup_logging(config.get('LOG_LEVEL', 'INFO')) self.temp_dir = Path(config.get('TEMP_DOWNLOAD_DIR', './temp_emails')) self.temp_dir.mkdir(exist_ok=True) self.batch_size = int(config.get('BATCH_SIZE', '50')) self.preserve_flags = config.get('PRESERVE_FLAGS', 'True').lower() == 'true' self.preserve_dates = config.get('PRESERVE_DATES', 'True').lower() == 'true' include_str = config.get('INCLUDE_FOLDERS', '') exclude_str = config.get('EXCLUDE_FOLDERS', '') self.include_folders = [f.strip() for f in include_str.split(',') if f.strip()] if include_str else [] self.exclude_folders = [f.strip() for f in exclude_str.split(',') if f.strip()] if exclude_str else [] timeout = int(config.get('IMAP_TIMEOUT', '60')) self.source = IMAPConnection( config['SOURCE_IMAP_SERVER'], int(config['SOURCE_IMAP_PORT']), config['SOURCE_EMAIL'], config['SOURCE_PASSWORD'], config.get('SOURCE_IMAP_USE_SSL', 'True').lower() == 'true', timeout ) self.destination = IMAPConnection( config['DEST_IMAP_SERVER'], int(config['DEST_IMAP_PORT']), config['DEST_EMAIL'], config['DEST_PASSWORD'], config.get('DEST_IMAP_USE_SSL', 'True').lower() == 'true', timeout ) def should_process_folder(self, folder): if self.include_folders and folder not in self.include_folders: return False if self.exclude_folders and folder in self.exclude_folders: return False return True def download_emails_from_folder(self, folder): self.logger.info(f"Downloading emails from folder: {folder}") success, count = self.source.select_folder(folder) if not success: self.logger.error(f"Failed to select source folder: {folder}") return [] message_ids = self.source.get_message_ids() self.logger.info(f"Found {len(message_ids)} messages in folder: {folder}") emails = [] for i, msg_id in enumerate(message_ids, 1): try: msg = self.source.fetch_message(msg_id) if msg: emails.append({ 'message': msg, 'folder': folder, 'original_id': msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id) }) if i % self.batch_size == 0: self.logger.info(f"Downloaded {i}/{len(message_ids)} messages from {folder}") except Exception as e: self.logger.error(f"Error downloading message {msg_id} from {folder}: {e}") continue self.logger.info(f"Successfully downloaded {len(emails)} messages from {folder}") return emails def upload_emails_to_folder(self, emails, folder): self.logger.info(f"Uploading {len(emails)} emails to folder: {folder}") uploaded = 0 for i, email_data in enumerate(emails, 1): try: message = email_data['message'] flags = '\\Seen' if self.preserve_flags else '' date_obj = None if self.preserve_dates and message.get('Date'): try: date_obj = email.utils.parsedate_to_datetime(message['Date']) except: pass if self.destination.append_message(folder, message, flags, date_obj): uploaded += 1 if i % self.batch_size == 0: self.logger.info(f"Uploaded {i}/{len(emails)} messages to {folder}") except Exception as e: self.logger.error(f"Error uploading message to {folder}: {e}") continue self.logger.info(f"Successfully uploaded {uploaded}/{len(emails)} messages to {folder}") return uploaded def migrate_folder(self, folder): stats = {'downloaded': 0, 'uploaded': 0} if not self.should_process_folder(folder): self.logger.info(f"Skipping folder: {folder} (filtered)") return stats try: emails = self.download_emails_from_folder(folder) stats['downloaded'] = len(emails) if emails: stats['uploaded'] = self.upload_emails_to_folder(emails, folder) except Exception as e: self.logger.error(f"Error migrating folder {folder}: {e}") return stats def run_migration(self): self.logger.info("Starting email migration...") total_stats = {'folders_processed': 0, 'total_downloaded': 0, 'total_uploaded': 0, 'errors': 0} try: if not self.source.connect(): self.logger.error("Failed to connect to source server") return total_stats if not self.destination.connect(): self.logger.error("Failed to connect to destination server") return total_stats folders = self.source.get_folders() self.logger.info(f"Found {len(folders)} folders to process") for folder in folders: try: self.logger.info(f"Processing folder: {folder}") stats = self.migrate_folder(folder) total_stats['folders_processed'] += 1 total_stats['total_downloaded'] += stats['downloaded'] total_stats['total_uploaded'] += stats['uploaded'] self.logger.info(f"Folder '{folder}' completed: {stats['downloaded']} downloaded, {stats['uploaded']} uploaded") except Exception as e: self.logger.error(f"Error processing folder {folder}: {e}") total_stats['errors'] += 1 finally: self.source.disconnect() self.destination.disconnect() return total_stats def main(): print("Email Migration Script") print("=" * 50) try: config = load_env_file() except Exception as e: print(f"Error loading .env file: {e}") exit(1) required_vars = ['SOURCE_IMAP_SERVER', 'SOURCE_EMAIL', 'SOURCE_PASSWORD', 'DEST_IMAP_SERVER', 'DEST_EMAIL', 'DEST_PASSWORD'] missing_vars = [var for var in required_vars if not config.get(var)] if missing_vars: print(f"Error: Missing required environment variables: {', '.join(missing_vars)}") print("Please check your .env file.") exit(1) migrator = EmailMigrator(config) try: stats = migrator.run_migration() print("\nMigration completed!") print(f"Folders processed: {stats['folders_processed']}") print(f"Total emails downloaded: {stats['total_downloaded']}") print(f"Total emails uploaded: {stats['total_uploaded']}") print(f"Errors encountered: {stats['errors']}") if stats['errors'] > 0: print("\nCheck the log file 'email_migration.log' for error details.") except KeyboardInterrupt: print("\nMigration interrupted by user.") except Exception as e: print(f"Migration failed: {e}") exit(1) if __name__ == "__main__": main()