#!/usr/bin/env python3
"""Copy mail from one IMAP account to another.

Settings are read from a ``.env`` file in the working directory. The source
mailbox is only ever opened read-only, so running the script does not change
flags or expunge messages on the source server.
"""
import email
import email.utils
import imaplib
import logging
import re
import ssl
import sys
from datetime import datetime, timezone
from pathlib import Path

# Spellings accepted as "enabled" for boolean settings.
_TRUE_VALUES = frozenset({'true', '1', 'yes', 'on'})

# System flags a client may set through APPEND (``\Recent`` is server-owned).
_APPENDABLE_FLAGS = {
    '\\seen': '\\Seen',
    '\\answered': '\\Answered',
    '\\flagged': '\\Flagged',
    '\\deleted': '\\Deleted',
    '\\draft': '\\Draft',
}

# One LIST response line: (flags) delimiter mailbox-name
_LIST_RESPONSE_RE = re.compile(
    r'^\((?P<flags>[^)]*)\)\s+'
    r'(?P<delimiter>NIL|"(?:\\.|[^"\\])*")\s+'
    r'(?P<name>.+)$',
    re.DOTALL | re.IGNORECASE,
)
_ESCAPED_CHAR_RE = re.compile(r'\\(.)')


def _unquote(token):
    """Strip surrounding double quotes from *token* and undo backslash escapes."""
    if len(token) >= 2 and token[0] == '"' and token[-1] == '"':
        return _ESCAPED_CHAR_RE.sub(r'\1', token[1:-1])
    return token


def quote_mailbox(name):
    """Return *name* as an IMAP quoted string, escaping ``\\`` and ``"``."""
    escaped = name.replace('\\', '\\\\').replace('"', '\\"')
    return f'"{escaped}"'


def parse_list_response(line):
    """Parse one entry of an IMAP LIST response.

    Args:
        line: ``bytes``/``str`` response line, or the ``(header, literal)``
            tuple imaplib produces when the server sends the name as a literal.

    Returns:
        ``(flags, delimiter, name)`` where *flags* is a list of strings and
        *delimiter* is ``None`` for ``NIL``; ``None`` if the entry cannot be
        parsed.
    """
    literal_name = None
    if isinstance(line, tuple):
        line, literal_name = line[0], line[1]
    if isinstance(line, (bytes, bytearray)):
        line = bytes(line).decode('utf-8', errors='replace')
    match = _LIST_RESPONSE_RE.match(line.strip())
    if match is None:
        return None

    delimiter_token = match.group('delimiter')
    delimiter = None if delimiter_token.upper() == 'NIL' else _unquote(delimiter_token)
    if literal_name is not None:
        if isinstance(literal_name, (bytes, bytearray)):
            literal_name = bytes(literal_name).decode('utf-8', errors='replace')
        name = literal_name
    else:
        # Names may arrive quoted or as a bare atom; both forms are valid.
        name = _unquote(match.group('name').strip())
    if not name:
        return None
    return match.group('flags').split(), delimiter, name


def format_append_flags(flags):
    """Reduce fetched *flags* to a space-separated string usable with APPEND.

    Only the standard system flags are kept (``\\Recent`` and server-specific
    keywords are dropped); duplicates are removed and spelling is normalised.
    """
    kept = []
    for flag in flags:
        if isinstance(flag, (bytes, bytearray)):
            flag = bytes(flag).decode('ascii', errors='ignore')
        canonical = _APPENDABLE_FLAGS.get(flag.lower())
        if canonical and canonical not in kept:
            kept.append(canonical)
    return ' '.join(kept)


def _config_flag(config, key, default='True'):
    """Read a boolean setting from *config* (true/1/yes/on, case-insensitive)."""
    return str(config.get(key, default)).strip().lower() in _TRUE_VALUES


def _config_port(config, key, use_ssl):
    """Read a port from *config*, defaulting to 993 (SSL) or 143 (plain)."""
    raw = str(config.get(key, '')).strip()
    if raw:
        return int(raw)
    return 993 if use_ssl else 143


def load_env_file(path='.env'):
    """Parse a ``KEY=VALUE`` file into a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    Keys and values are stripped; one pair of matching surrounding quotes is
    removed from a value.

    Args:
        path: File to read; defaults to ``.env`` in the working directory.

    Raises:
        OSError: If the file cannot be opened.
    """
    env_vars = {}
    # utf-8-sig also tolerates the BOM some editors prepend.
    with open(path, 'r', encoding='utf-8-sig') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                value = value[1:-1]
            env_vars[key.strip()] = value
    return env_vars


def setup_logging(log_level):
    """Configure root logging to ``email_migration.log`` and the console.

    Args:
        log_level: Level name such as ``"INFO"``; unknown names fall back to
            ``INFO``.

    Returns:
        The module logger.
    """
    numeric_level = getattr(logging, str(log_level).upper(), None)
    if not isinstance(numeric_level, int):
        # getattr can hit non-level attributes of the logging module.
        numeric_level = logging.INFO
    logging.basicConfig(
        level=numeric_level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('email_migration.log', encoding='utf-8'),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__)


class IMAPConnection:
    """Thin wrapper around one ``imaplib`` connection."""

    def __init__(self, server, port, email_addr, username, password, use_ssl=True, timeout=60):
        """Store connection settings; no network I/O happens here.

        Raises:
            ValueError: If neither *email_addr* nor *username* is given.
        """
        self.server = server
        self.port = port
        self.email = email_addr or ""
        self.username = username or ""
        self.password = password
        self.use_ssl = use_ssl
        self.timeout = timeout
        self.connection = None
        self.logger = logging.getLogger(__name__)
        # Mailboxes known to exist, so CREATE is not re-sent for every message.
        self._created_folders = set()

        # Validate that we have at least one authentication method
        if not self.email.strip() and not self.username.strip():
            raise ValueError("Either email or username must be provided for authentication")

    def _login(self, identity, label, failure_level):
        """Try to log in as *identity*; return True on success."""
        if not identity.strip():
            return False
        try:
            self.connection.login(identity, self.password)
        except Exception as exc:  # best-effort: the caller may try another identity
            self.logger.log(failure_level, f"{label.capitalize()} login failed for {identity}: {exc}")
            return False
        self.logger.info(f"Connected to {self.server} using {label}: {identity}")
        return True

    def _logout(self, conn):
        """Log out of *conn*, closing the socket even if LOGOUT fails."""
        try:
            conn.logout()
        except (imaplib.IMAP4.error, OSError) as exc:
            self.logger.debug(f"Logout from {self.server} failed: {exc}")
            try:
                conn.shutdown()
            except (imaplib.IMAP4.error, OSError):
                pass
            return False
        return True

    def _abandon_connection(self):
        """Release a connection that could not be fully established."""
        conn, self.connection = self.connection, None
        if conn is not None:
            self._logout(conn)

    def connect(self):
        """Open the connection and authenticate.

        The username is tried first (if set), then the e-mail address.

        Returns:
            True when logged in; False otherwise (the socket is released).
        """
        try:
            if self.use_ssl:
                context = ssl.create_default_context()
                self.connection = imaplib.IMAP4_SSL(self.server, self.port, ssl_context=context)
            else:
                self.connection = imaplib.IMAP4(self.server, self.port)
            self.connection.sock.settimeout(self.timeout)
            self._created_folders.clear()

            if self._login(self.username, "username", logging.DEBUG):
                return True
            if self._login(self.email, "email", logging.ERROR):
                return True

            available_methods = []
            if self.username.strip():
                available_methods.append("username")
            if self.email.strip():
                available_methods.append("email")
            self.logger.error(
                f"Failed to authenticate with {self.server} using {' and '.join(available_methods)}"
            )
            self._abandon_connection()
            return False
        except Exception as e:
            self.logger.error(f"Failed to connect to {self.server}: {e}")
            self._abandon_connection()
            return False

    def disconnect(self):
        """Close the selected mailbox (if any) and log out."""
        conn, self.connection = self.connection, None
        if conn is None:
            return
        try:
            conn.close()
        except (imaplib.IMAP4.error, OSError):
            # CLOSE is illegal when no mailbox is selected; LOGOUT must still run.
            pass
        if self._logout(conn):
            self.logger.info(f"Disconnected from {self.server}")

    def get_folders(self):
        """Return the names of all selectable mailboxes ([] on error)."""
        try:
            status, folders = self.connection.list()
        except Exception as e:
            self.logger.error(f"Error getting folders: {e}")
            return []
        if status != 'OK':
            return []

        folder_list = []
        for entry in folders:
            if not entry:
                # imaplib yields None / empty filler entries for some responses.
                continue
            parsed = parse_list_response(entry)
            if parsed is None:
                self.logger.warning(f"Skipping unparseable LIST entry: {entry!r}")
                continue
            flags, _delimiter, name = parsed
            if any(flag.lower() == '\\noselect' for flag in flags):
                # Pure hierarchy containers hold no messages and cannot be selected.
                continue
            folder_list.append(name)
        return folder_list

    def select_folder(self, folder, readonly=False):
        """Select *folder*.

        Args:
            folder: Mailbox name.
            readonly: Open with EXAMINE semantics so nothing in the mailbox
                is modified.

        Returns:
            ``(True, message_count)`` on success, ``(False, 0)`` otherwise.
        """
        try:
            status, response = self.connection.select(quote_mailbox(folder), readonly=readonly)
            if status == 'OK':
                count = int(response[0])
                return True, count
        except Exception as e:
            self.logger.error(f"Error selecting folder '{folder}': {e}")
        return False, 0

    def get_message_ids(self):
        """Return the sequence numbers of all messages in the selected mailbox."""
        try:
            status, messages = self.connection.search(None, 'ALL')
            if status == 'OK':
                return messages[0].split()
        except Exception as e:
            self.logger.error(f"Error getting message IDs: {e}")
        return []

    def fetch_raw_message(self, msg_id):
        """Fetch one message without marking it as read.

        Returns:
            ``(raw_bytes, flags)`` where *flags* is a tuple of ``bytes``, or
            ``None`` if the message could not be fetched.
        """
        try:
            # BODY.PEEK[] returns the full message but leaves \Seen untouched.
            status, msg_data = self.connection.fetch(msg_id, '(FLAGS BODY.PEEK[])')
            if status != 'OK':
                return None
            raw_email = None
            flags = ()
            for part in msg_data:
                # FLAGS may precede the literal (tuple header) or follow it (bytes).
                if isinstance(part, tuple):
                    flags += imaplib.ParseFlags(part[0])
                    if raw_email is None:
                        raw_email = part[1]
                elif isinstance(part, bytes):
                    flags += imaplib.ParseFlags(part)
            if raw_email is None:
                self.logger.warning(f"No message data returned for message {msg_id}")
                return None
            return raw_email, flags
        except Exception as e:
            self.logger.error(f"Error fetching message {msg_id}: {e}")
        return None

    def fetch_message(self, msg_id):
        """Fetch one message and return it parsed, or ``None`` on failure."""
        fetched = self.fetch_raw_message(msg_id)
        if fetched is None:
            return None
        return email.message_from_bytes(fetched[0])

    def append_message(self, folder, message, flags='', date_time=None):
        """Append *message* to *folder*, creating the folder if needed.

        Args:
            folder: Destination mailbox name.
            message: ``email.message.Message`` or the raw message bytes.
            flags: Space-separated IMAP flags, or ``''`` for none.
            date_time: Optional ``datetime`` used as the internal date; a
                naive value is treated as UTC.

        Returns:
            True if the server accepted the message.
        """
        try:
            self.create_folder(folder)
            if isinstance(message, (bytes, bytearray)):
                msg_bytes = bytes(message)
            else:
                msg_bytes = message.as_bytes()
            if isinstance(date_time, datetime) and date_time.tzinfo is None:
                # imaplib refuses naive datetimes.
                date_time = date_time.replace(tzinfo=timezone.utc)
            # imaplib formats (and quotes) the datetime itself.
            status, response = self.connection.append(quote_mailbox(folder), flags, date_time, msg_bytes)
            if status != 'OK':
                self.logger.warning(f"Server rejected message for folder '{folder}': {response}")
            return status == 'OK'
        except Exception as e:
            self.logger.error(f"Error appending message to folder '{folder}': {e}")
            return False

    def create_folder(self, folder):
        """Create *folder* if it does not exist yet.

        Returns:
            True if the folder exists afterwards or the attempt raised
            (best-effort: a following APPEND reports the real problem).
        """
        if folder in self._created_folders or folder.upper() == 'INBOX':
            return True
        try:
            status, response = self.connection.create(quote_mailbox(folder))
        except (imaplib.IMAP4.error, OSError) as exc:
            self.logger.debug(f"CREATE failed for folder '{folder}': {exc}")
            return True
        text = str(response).lower()
        exists = status == 'OK' or 'already exists' in text or 'alreadyexists' in text
        if exists:
            self._created_folders.add(folder)
        return exists


class EmailMigrator:
    """Copies folders and messages from a source to a destination account."""

    def __init__(self, config):
        """Build both connections from *config* (a dict of ``.env`` settings).

        Raises:
            KeyError: If a server or password setting is missing.
            ValueError: If a numeric setting is not an integer, or an account
                has neither e-mail nor username.
        """
        self.config = config
        self.logger = setup_logging(config.get('LOG_LEVEL', 'INFO'))
        self.temp_dir = Path(config.get('TEMP_DOWNLOAD_DIR', './temp_emails'))
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        # Messages are moved in chunks of this size; must be at least 1.
        self.batch_size = max(1, int(config.get('BATCH_SIZE', '50')))
        self.preserve_flags = _config_flag(config, 'PRESERVE_FLAGS')
        self.preserve_dates = _config_flag(config, 'PRESERVE_DATES')

        # Import folder configuration
        self.import_folder_name = config.get('IMPORT_FOLDER_NAME', '').strip() or None
        if self.import_folder_name:
            self.logger.info(f"Import folder configuration: All emails will be imported to subfolders within \"{self.import_folder_name}\"")
        else:
            self.logger.info("Import folder configuration: All emails will be imported directly to INBOX")

        include_str = config.get('INCLUDE_FOLDERS', '')
        exclude_str = config.get('EXCLUDE_FOLDERS', '')
        self.include_folders = [f.strip() for f in include_str.split(',') if f.strip()]
        self.exclude_folders = [f.strip() for f in exclude_str.split(',') if f.strip()]

        timeout = int(config.get('IMAP_TIMEOUT', '60'))
        source_ssl = _config_flag(config, 'SOURCE_IMAP_USE_SSL')
        dest_ssl = _config_flag(config, 'DEST_IMAP_USE_SSL')

        self.source = IMAPConnection(
            config['SOURCE_IMAP_SERVER'],
            _config_port(config, 'SOURCE_IMAP_PORT', source_ssl),
            config.get('SOURCE_EMAIL', ''),
            config.get('SOURCE_USERNAME', ''),
            config['SOURCE_PASSWORD'],
            source_ssl,
            timeout,
        )
        self.destination = IMAPConnection(
            config['DEST_IMAP_SERVER'],
            _config_port(config, 'DEST_IMAP_PORT', dest_ssl),
            config.get('DEST_EMAIL', ''),
            config.get('DEST_USERNAME', ''),
            config['DEST_PASSWORD'],
            dest_ssl,
            timeout,
        )

    def get_destination_folder(self, source_folder):
        """
        Determine the destination folder based on the import configuration.

        Args:
            source_folder (str): Original folder name from source

        Returns:
            str: Destination folder name
        """
        if self.import_folder_name:
            # Import into subfolders within the specified import folder.
            # NOTE(review): "/" is assumed to be the destination's hierarchy
            # delimiter; confirm for servers that use ".".
            return f"{self.import_folder_name}/{source_folder}"
        # Import all emails directly to INBOX
        return "INBOX"

    def should_process_folder(self, folder):
        """Return True if *folder* passes the include/exclude filters."""
        if self.include_folders and folder not in self.include_folders:
            return False
        if self.exclude_folders and folder in self.exclude_folders:
            return False
        return True

    def _open_source_folder(self, folder):
        """Select *folder* read-only on the source; return its ids or None."""
        success, _count = self.source.select_folder(folder, readonly=True)
        if not success:
            self.logger.error(f"Failed to select source folder: {folder}")
            return None
        message_ids = self.source.get_message_ids()
        self.logger.info(f"Found {len(message_ids)} messages in folder: {folder}")
        return message_ids

    def _download_messages(self, folder, message_ids):
        """Fetch *message_ids* from the selected source folder."""
        emails = []
        for msg_id in message_ids:
            try:
                fetched = self.source.fetch_raw_message(msg_id)
                if fetched is None:
                    continue
                raw, flags = fetched
                emails.append({
                    'message': email.message_from_bytes(raw),
                    'raw': raw,
                    'flags': flags,
                    'folder': folder,
                    'original_id': msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id),
                })
            except Exception as e:
                self.logger.error(f"Error downloading message {msg_id} from {folder}: {e}")
        return emails

    def download_emails_from_folder(self, folder):
        """Download every message in *folder* from the source account.

        Returns:
            A list of dicts with keys ``message``, ``raw``, ``flags``,
            ``folder`` and ``original_id``; empty if the folder cannot be
            selected.
        """
        self.logger.info(f"Downloading emails from folder: {folder}")
        message_ids = self._open_source_folder(folder)
        if message_ids is None:
            return []

        emails = []
        total = len(message_ids)
        for start in range(0, total, self.batch_size):
            batch_ids = message_ids[start:start + self.batch_size]
            emails.extend(self._download_messages(folder, batch_ids))
            self.logger.info(f"Downloaded {start + len(batch_ids)}/{total} messages from {folder}")

        self.logger.info(f"Successfully downloaded {len(emails)} messages from {folder}")
        return emails

    def _flags_for_upload(self, email_data):
        """Return the APPEND flag string for one downloaded message."""
        if not self.preserve_flags:
            return ''
        if 'flags' in email_data:
            return format_append_flags(email_data['flags'])
        # Entries without recorded flags keep the historical behaviour.
        return '\\Seen'

    def _date_for_upload(self, message):
        """Return the message's ``Date`` header as a datetime, or None."""
        if not self.preserve_dates or not message.get('Date'):
            return None
        try:
            return email.utils.parsedate_to_datetime(message['Date'])
        except (TypeError, ValueError, IndexError, OverflowError):
            # Malformed Date header: let the server assign the arrival time.
            return None

    def upload_emails_to_folder(self, emails, destination_folder):
        """Append *emails* to *destination_folder*; return how many succeeded."""
        self.logger.info(f"Uploading {len(emails)} emails to folder: {destination_folder}")

        uploaded = 0
        for i, email_data in enumerate(emails, 1):
            try:
                message = email_data['message']
                # Prefer the untouched bytes; re-serialising can alter or
                # fail on malformed messages.
                payload = email_data.get('raw') or message
                flags = self._flags_for_upload(email_data)
                date_obj = self._date_for_upload(message)

                if self.destination.append_message(destination_folder, payload, flags, date_obj):
                    uploaded += 1

                if i % self.batch_size == 0:
                    self.logger.info(f"Uploaded {i}/{len(emails)} messages to {destination_folder}")
            except Exception as e:
                self.logger.error(f"Error uploading message to {destination_folder}: {e}")

        self.logger.info(f"Successfully uploaded {uploaded}/{len(emails)} messages to {destination_folder}")
        return uploaded

    def migrate_folder(self, source_folder):
        """Copy one source folder to its destination, a batch at a time.

        Returns:
            Dict with ``downloaded``, ``uploaded`` and ``errors`` counts and a
            ``skipped`` flag (True when the folder was filtered out).
        """
        stats = {'downloaded': 0, 'uploaded': 0, 'errors': 0, 'skipped': False}

        if not self.should_process_folder(source_folder):
            self.logger.info(f"Skipping folder: {source_folder} (filtered)")
            stats['skipped'] = True
            return stats

        try:
            # Determine destination folder based on configuration
            destination_folder = self.get_destination_folder(source_folder)
            self.logger.info(f"Migrating '{source_folder}' -> '{destination_folder}'")

            message_ids = self._open_source_folder(source_folder)
            if message_ids is None:
                stats['errors'] += 1
                return stats

            # Working in batches bounds memory use and keeps both
            # connections active on large folders.
            for start in range(0, len(message_ids), self.batch_size):
                batch_ids = message_ids[start:start + self.batch_size]
                emails = self._download_messages(source_folder, batch_ids)
                uploaded = 0
                if emails:
                    uploaded = self.upload_emails_to_folder(emails, destination_folder)
                stats['downloaded'] += len(emails)
                stats['uploaded'] += uploaded
                stats['errors'] += len(batch_ids) - uploaded
        except Exception as e:
            self.logger.error(f"Error migrating folder {source_folder}: {e}")
            stats['errors'] += 1

        return stats

    def run_migration(self):
        """Connect to both servers and migrate every eligible folder.

        Returns:
            Dict with ``folders_processed``, ``total_downloaded``,
            ``total_uploaded`` and ``errors``.
        """
        self.logger.info("Starting email migration...")

        total_stats = {'folders_processed': 0, 'total_downloaded': 0, 'total_uploaded': 0, 'errors': 0}

        try:
            if not self.source.connect():
                self.logger.error("Failed to connect to source server")
                total_stats['errors'] += 1
                return total_stats

            if not self.destination.connect():
                self.logger.error("Failed to connect to destination server")
                total_stats['errors'] += 1
                return total_stats

            folders = self.source.get_folders()
            self.logger.info(f"Found {len(folders)} folders to process")

            # Create the main import folder if specified
            if self.import_folder_name:
                self.logger.info(f"Creating main import folder: {self.import_folder_name}")
                self.destination.create_folder(self.import_folder_name)

            for folder in folders:
                try:
                    self.logger.info(f"Processing folder: {folder}")
                    stats = self.migrate_folder(folder)
                    total_stats['errors'] += stats.get('errors', 0)
                    if stats.get('skipped'):
                        continue

                    total_stats['folders_processed'] += 1
                    total_stats['total_downloaded'] += stats['downloaded']
                    total_stats['total_uploaded'] += stats['uploaded']

                    destination_folder = self.get_destination_folder(folder)
                    self.logger.info(f"Folder '{folder}' -> '{destination_folder}' completed: {stats['downloaded']} downloaded, {stats['uploaded']} uploaded")
                except Exception as e:
                    self.logger.error(f"Error processing folder {folder}: {e}")
                    total_stats['errors'] += 1
        finally:
            self.source.disconnect()
            self.destination.disconnect()

        return total_stats


def main():
    """Command-line entry point: validate settings, migrate, print a summary."""
    print("Email Migration Script")
    print("=" * 50)

    try:
        config = load_env_file()
    except Exception as e:
        print(f"Error loading .env file: {e}")
        sys.exit(1)

    # Basic required variables
    required_vars = ['SOURCE_IMAP_SERVER', 'SOURCE_PASSWORD', 'DEST_IMAP_SERVER', 'DEST_PASSWORD']
    missing_vars = [var for var in required_vars if not config.get(var)]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Please check your .env file.")
        sys.exit(1)

    # Validate authentication methods
    source_email = config.get('SOURCE_EMAIL', '').strip()
    source_username = config.get('SOURCE_USERNAME', '').strip()
    dest_email = config.get('DEST_EMAIL', '').strip()
    dest_username = config.get('DEST_USERNAME', '').strip()

    if not source_email and not source_username:
        print("Error: Either SOURCE_EMAIL or SOURCE_USERNAME must be provided for source account")
        sys.exit(1)

    if not dest_email and not dest_username:
        print("Error: Either DEST_EMAIL or DEST_USERNAME must be provided for destination account")
        sys.exit(1)

    try:
        migrator = EmailMigrator(config)
    except (KeyError, ValueError, OSError) as e:
        # e.g. a non-numeric port/timeout or an unwritable temp directory
        print(f"Invalid configuration: {e}")
        sys.exit(1)

    try:
        stats = migrator.run_migration()

        print("\nMigration completed!")
        print(f"Folders processed: {stats['folders_processed']}")
        print(f"Total emails downloaded: {stats['total_downloaded']}")
        print(f"Total emails uploaded: {stats['total_uploaded']}")
        print(f"Errors encountered: {stats['errors']}")

        if stats['errors'] > 0:
            print("\nCheck the log file 'email_migration.log' for error details.")
    except KeyboardInterrupt:
        print("\nMigration interrupted by user.")
    except Exception as e:
        print(f"Migration failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()