email_migration/email_migration.py
Elmar Sönser 1d7fe31845 Add dual authentication support and comprehensive provider examples
Core functionality:
- Add dual authentication support (username + email fallback)
- Enhance IMAPConnection to try username first, then email
- Add SOURCE_USERNAME and DEST_USERNAME configuration options
- Improve authentication error handling and logging

Configuration updates:
- Add Host Europe to Securehost.de migration examples
- Include authentication method explanations for various providers
- Add comprehensive provider-specific settings (Host Europe, Securehost.de, etc.)
- Document username vs email login methods with examples

Documentation updates:
- Add dual authentication section in README
- Include Host Europe and Securehost.de specific examples
- Expand troubleshooting section with authentication help
- Add provider-specific troubleshooting guidance
- Include migration best practices

Features:
- Automatic fallback from username to email authentication
- Enhanced logging showing which authentication method succeeded
- Support for various hosting providers and their login requirements
- Improved error messages for authentication failures
2025-09-24 14:16:49 +02:00

391 lines
15 KiB
Python
Executable file

#!/usr/bin/env python3
import email
import email.utils
import imaplib
import logging
import re
import ssl
import sys
from datetime import datetime, timezone
from pathlib import Path
def load_env_file(path='.env'):
    """Parse a ``KEY=VALUE`` style environment file into a dictionary.

    Blank lines, lines starting with ``#`` and lines that contain no ``=``
    are skipped. Every other line is split at its first ``=``; the key and
    the value are each stripped of surrounding whitespace. Values are kept
    verbatim otherwise (no quote removal, no inline-comment handling).

    Args:
        path: File to read. Defaults to ``.env`` in the current working
            directory, which is what the script entry point relies on.

    Returns:
        dict: Mapping of variable name to string value. When a key occurs
        more than once the last occurrence wins.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    env_vars = {}
    # Decode explicitly instead of relying on the platform default, so that
    # non-ASCII passwords read the same everywhere. "utf-8-sig" additionally
    # drops a leading BOM, which would otherwise become part of the first key.
    with open(path, 'r', encoding='utf-8-sig') as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            env_vars[key.strip()] = value.strip()
    return env_vars
def setup_logging(log_level):
    """Configure root logging to a file and the console and return a logger.

    Args:
        log_level: Level name such as ``"debug"`` or ``"WARNING"``. It is
            upper-cased and looked up on the ``logging`` module; a name that
            does not exist there falls back to ``logging.INFO``.

    Returns:
        logging.Logger: The logger named after this module.
    """
    level = getattr(logging, log_level.upper(), logging.INFO)

    # Records go both to 'email_migration.log' and to the console stream.
    file_handler = logging.FileHandler('email_migration.log')
    console_handler = logging.StreamHandler()

    logging.basicConfig(
        level=level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[file_handler, console_handler],
    )
    return logging.getLogger(__name__)
class IMAPConnection:
    """Small convenience wrapper around one ``imaplib`` connection.

    The wrapper logs in with an optional username first and falls back to the
    e-mail address, and exposes the handful of operations the migration needs
    (list folders, select, search, fetch, create, append). All public methods
    report failure through their return value and a log entry instead of
    raising.
    """

    # One untagged LIST response line: ``(<flags>) <delimiter> <mailbox>``.
    # The delimiter is either NIL or a quoted string; the mailbox may be an
    # atom, a quoted string, or a ``{n}`` literal marker.
    _LIST_LINE = re.compile(
        r'\((?P<flags>[^)]*)\)\s+'
        r'(?P<delim>NIL|"(?:[^"\\]|\\.)*")\s+'
        r'(?P<name>.*)$',
        re.IGNORECASE | re.DOTALL,
    )

    def __init__(self, server, port, email_addr, username, password, use_ssl=True, timeout=60):
        """Store the connection settings; no network traffic happens here.

        Args:
            server: IMAP host name.
            port: IMAP port number.
            email_addr: E-mail address, used as the fallback login name.
            username: Optional login name tried before ``email_addr``.
            password: Password used for both login attempts.
            use_ssl: Connect with ``IMAP4_SSL`` when true, plain ``IMAP4``
                otherwise.
            timeout: Socket timeout in seconds applied after connecting.
        """
        self.server = server
        self.port = port
        self.email = email_addr
        self.username = username
        self.password = password
        self.use_ssl = use_ssl
        self.timeout = timeout
        self.connection = None
        self.logger = logging.getLogger(__name__)
        # Folders for which CREATE was already issued on this connection, so
        # append_message() does not repeat the command for every message.
        self._ensured_folders = set()

    @staticmethod
    def _quote(mailbox):
        """Return *mailbox* as an IMAP quoted string (``\\`` and ``"`` escaped)."""
        escaped = mailbox.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    def connect(self):
        """Open the connection and authenticate.

        The username (when non-blank) is tried first; if that login fails or
        no username is configured, the e-mail address is tried.

        Returns:
            bool: True when one of the logins succeeded, False otherwise.
        """
        try:
            if self.use_ssl:
                context = ssl.create_default_context()
                self.connection = imaplib.IMAP4_SSL(self.server, self.port, ssl_context=context)
            else:
                self.connection = imaplib.IMAP4(self.server, self.port)
            self.connection.sock.settimeout(self.timeout)

            # Try authentication with username first (if provided), then fall back to email
            if self.username and self.username.strip():
                try:
                    self.connection.login(self.username, self.password)
                except Exception as username_error:
                    self.logger.debug(f"Username login failed for {self.username}: {username_error}")
                else:
                    self.logger.info(f"Connected to {self.server} using username: {self.username}")
                    return True

            try:
                self.connection.login(self.email, self.password)
            except Exception as email_error:
                self.logger.error(f"Email login failed for {self.email}: {email_error}")
            else:
                self.logger.info(f"Connected to {self.server} using email: {self.email}")
                return True

            self.logger.error(f"Failed to authenticate with {self.server} using both username and email")
            return False
        except Exception as e:
            self.logger.error(f"Failed to connect to {self.server}: {e}")
            return False

    def disconnect(self):
        """Log out and drop the connection; never raises."""
        conn = self.connection
        if not conn:
            return
        # CLOSE is only legal while a mailbox is selected. Issuing it in any
        # other state raises, which previously caused LOGOUT to be skipped.
        if getattr(conn, 'state', None) == 'SELECTED':
            try:
                conn.close()
            except Exception as e:
                self.logger.debug(f"Error closing mailbox on {self.server}: {e}")
        try:
            conn.logout()
            self.logger.info(f"Disconnected from {self.server}")
        except Exception as e:
            self.logger.debug(f"Error logging out from {self.server}: {e}")
        finally:
            self.connection = None

    def _parse_list_entry(self, entry):
        """Extract the mailbox name from one ``list()`` result item, or None."""
        literal = None
        if isinstance(entry, tuple):
            # imaplib returns (header, literal) when the server sent the
            # mailbox name as a literal.
            entry, literal = entry[0], entry[1]
        if not isinstance(entry, bytes) or not entry.strip():
            # None (no mailboxes) or the empty remainder after a literal.
            return None
        try:
            match = self._LIST_LINE.match(entry.decode())
            if match is None:
                self.logger.warning(f"Skipping unparsable LIST response: {entry!r}")
                return None
            if literal is not None:
                return literal.decode()
        except UnicodeDecodeError:
            self.logger.warning(f"Skipping undecodable LIST response: {entry!r}")
            return None
        token = match.group('name').strip()
        if len(token) >= 2 and token[0] == '"' and token[-1] == '"':
            # Quoted string: drop the quotes and undo backslash escaping.
            return re.sub(r'\\(.)', r'\1', token[1:-1])
        return token

    def get_folders(self):
        """Return the names of all mailboxes reported by LIST.

        Handles quoted names, unquoted (atom) names and names sent as
        literals. Names are returned exactly as the server encodes them.

        Returns:
            list[str]: Mailbox names; empty on any error.
        """
        try:
            status, folders = self.connection.list()
        except Exception as e:
            self.logger.error(f"Error getting folders: {e}")
            return []
        if status != 'OK':
            return []
        parsed = (self._parse_list_entry(entry) for entry in folders or [])
        return [name for name in parsed if name]

    def select_folder(self, folder, readonly=True):
        """Select *folder* and report how many messages it holds.

        Args:
            folder: Mailbox name.
            readonly: Select without write access (the default), so reading
                messages cannot alter flags and a later CLOSE cannot expunge.

        Returns:
            tuple[bool, int]: ``(True, message_count)`` on success,
            ``(False, 0)`` otherwise.
        """
        try:
            status, response = self.connection.select(self._quote(folder), readonly=readonly)
            if status == 'OK':
                return True, int(response[0])
        except Exception as e:
            self.logger.error(f"Error selecting folder '{folder}': {e}")
        return False, 0

    def get_message_ids(self):
        """Return the identifiers of all messages in the selected folder.

        Returns:
            list: Identifiers as returned by ``SEARCH ALL``; empty on error.
        """
        try:
            status, messages = self.connection.search(None, 'ALL')
            if status == 'OK':
                return messages[0].split()
        except Exception as e:
            self.logger.error(f"Error getting message IDs: {e}")
        return []

    def fetch_message(self, msg_id):
        """Fetch and parse one message from the selected folder.

        Args:
            msg_id: Message identifier as returned by get_message_ids().

        Returns:
            email.message.Message | None: Parsed message, or None on failure.
        """
        try:
            # BODY.PEEK[] returns the full message like RFC822 does, but
            # without implicitly setting the \Seen flag.
            status, msg_data = self.connection.fetch(msg_id, '(BODY.PEEK[])')
            if status == 'OK':
                # The payload is the first (header, literal) pair; plain
                # bytes items are other response lines and are skipped.
                for part in msg_data or []:
                    if isinstance(part, tuple) and len(part) >= 2:
                        return email.message_from_bytes(part[1])
                self.logger.error(f"Error fetching message {msg_id}: no message data returned")
        except Exception as e:
            self.logger.error(f"Error fetching message {msg_id}: {e}")
        return None

    def append_message(self, folder, message, flags='', date_time=None):
        """Append *message* to *folder*, creating the folder first if needed.

        Args:
            folder: Destination mailbox name.
            message: ``email.message.Message`` to upload.
            flags: IMAP flags string (may be empty).
            date_time: Optional internal date. A naive ``datetime`` is
                treated as UTC; any other value accepted by
                ``imaplib.Time2Internaldate`` is passed through.

        Returns:
            bool: True when the server answered OK, False otherwise.
        """
        try:
            # INBOX always exists; other folders get one CREATE per connection.
            if folder.upper() != 'INBOX' and folder not in self._ensured_folders:
                if self.create_folder(folder):
                    self._ensured_folders.add(folder)
            msg_bytes = message.as_bytes()
            date_str = None
            if date_time:
                if isinstance(date_time, datetime) and date_time.tzinfo is None:
                    date_time = date_time.replace(tzinfo=timezone.utc)
                # imaplib only accepts a string here when it is already a
                # double-quoted INTERNALDATE; Time2Internaldate builds exactly
                # that, with locale-independent month names.
                date_str = imaplib.Time2Internaldate(date_time)
            status, response = self.connection.append(self._quote(folder), flags, date_str, msg_bytes)
            return status == 'OK'
        except Exception as e:
            self.logger.error(f"Error appending message to folder '{folder}': {e}")
            return False

    def create_folder(self, folder):
        """Create *folder* on the server (best effort).

        Returns:
            bool: True when the folder was created, when the server reports
            that it already exists, or when the command itself failed with an
            exception (the subsequent operation will surface real problems);
            False for any other negative answer.
        """
        try:
            status, response = self.connection.create(self._quote(folder))
        except Exception as e:
            self.logger.debug(f"Error creating folder '{folder}': {e}")
            return True
        if status == 'OK':
            return True
        text = str(response).lower()
        # Free-text "already exists" or the ALREADYEXISTS response code.
        return 'already exists' in text or 'alreadyexists' in text
class EmailMigrator:
    """Copy all messages of a source IMAP account to a destination account.

    Folders are processed one at a time: every message of a source folder is
    downloaded into memory and then appended to the matching destination
    folder. Per-message failures are logged and counted, never raised.
    """

    # System flags replayed on upload. \Recent cannot be set by a client and
    # custom keywords are dropped because a destination may reject them.
    _PORTABLE_FLAGS = ('\\Seen', '\\Answered', '\\Flagged', '\\Deleted', '\\Draft')

    def __init__(self, config):
        """Build the migrator and both connections from a settings mapping.

        Args:
            config: Mapping of string settings. Required keys are the
                ``SOURCE_``/``DEST_`` variants of ``IMAP_SERVER``, ``EMAIL``
                and ``PASSWORD``. Optional keys: ``*_USERNAME``,
                ``*_IMAP_PORT`` (default 993 with SSL, 143 without),
                ``*_IMAP_USE_SSL``, ``LOG_LEVEL``, ``TEMP_DOWNLOAD_DIR``,
                ``BATCH_SIZE``, ``PRESERVE_FLAGS``, ``PRESERVE_DATES``,
                ``IMPORT_FOLDER_NAME``, ``INCLUDE_FOLDERS``,
                ``EXCLUDE_FOLDERS`` and ``IMAP_TIMEOUT``.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If a numeric setting is not an integer.
        """
        self.config = config
        self.logger = setup_logging(config.get('LOG_LEVEL', 'INFO'))
        self.temp_dir = Path(config.get('TEMP_DOWNLOAD_DIR', './temp_emails'))
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        # Used as a modulus for progress logging, so it must be at least 1.
        self.batch_size = max(1, int(config.get('BATCH_SIZE', '50')))
        self.preserve_flags = config.get('PRESERVE_FLAGS', 'True').lower() == 'true'
        self.preserve_dates = config.get('PRESERVE_DATES', 'True').lower() == 'true'

        # Import folder configuration: blank means "no import folder".
        self.import_folder_name = config.get('IMPORT_FOLDER_NAME', '').strip() or None
        if self.import_folder_name:
            self.logger.info(f"Import folder configuration: All emails will be imported to subfolders within \"{self.import_folder_name}\"")
        else:
            self.logger.info("Import folder configuration: All emails will be imported directly to INBOX")

        self.include_folders = self._split_csv(config.get('INCLUDE_FOLDERS', ''))
        self.exclude_folders = self._split_csv(config.get('EXCLUDE_FOLDERS', ''))

        timeout = int(config.get('IMAP_TIMEOUT', '60'))
        self.source = self._build_connection(config, 'SOURCE', timeout)
        self.destination = self._build_connection(config, 'DEST', timeout)

    @staticmethod
    def _split_csv(value):
        """Split a comma separated setting into a list of non-blank items."""
        return [item.strip() for item in value.split(',') if item.strip()] if value else []

    @staticmethod
    def _build_connection(config, prefix, timeout):
        """Create the IMAPConnection described by the ``<prefix>_*`` settings."""
        use_ssl = config.get(f'{prefix}_IMAP_USE_SSL', 'True').lower() == 'true'
        # A missing/blank port falls back to the standard port for the mode.
        port = int(config.get(f'{prefix}_IMAP_PORT') or (993 if use_ssl else 143))
        return IMAPConnection(
            config[f'{prefix}_IMAP_SERVER'],
            port,
            config[f'{prefix}_EMAIL'],
            config.get(f'{prefix}_USERNAME', ''),
            config[f'{prefix}_PASSWORD'],
            use_ssl,
            timeout,
        )

    def get_destination_folder(self, source_folder):
        """Determine the destination folder based on the import configuration.

        Args:
            source_folder (str): Original folder name from source

        Returns:
            str: ``"<import folder>/<source folder>"`` when an import folder
            is configured, otherwise ``"INBOX"``.
        """
        if self.import_folder_name:
            # NOTE(review): "/" is used as the hierarchy separator regardless
            # of what the destination server reports - confirm for servers
            # that use a different delimiter.
            return f"{self.import_folder_name}/{source_folder}"
        # Import all emails directly to INBOX
        return "INBOX"

    def should_process_folder(self, folder):
        """Return True unless *folder* is filtered out by include/exclude lists."""
        if self.include_folders and folder not in self.include_folders:
            return False
        if self.exclude_folders and folder in self.exclude_folders:
            return False
        return True

    def _load_source_flags(self):
        """Read the flags of every message in the selected source folder.

        Returns:
            dict[int, str]: Message number -> space separated portable flags
            (possibly empty). Empty dict when the flags cannot be read.
        """
        flags_by_seq = {}
        try:
            status, data = self.source.connection.fetch('1:*', '(FLAGS)')
        except Exception as e:
            self.logger.warning(f"Could not read message flags from source: {e}")
            return flags_by_seq
        if status != 'OK':
            return flags_by_seq
        for item in data or []:
            if not isinstance(item, bytes) or b'FLAGS' not in item:
                continue
            head = item.split(None, 1)
            if not head or not head[0].isdigit():
                continue
            present = {flag.decode('ascii', 'replace').lower() for flag in imaplib.ParseFlags(item)}
            flags_by_seq[int(head[0])] = ' '.join(
                flag for flag in self._PORTABLE_FLAGS if flag.lower() in present
            )
        return flags_by_seq

    def _download_folder(self, folder):
        """Download *folder*; return ``(emails, failed_count)``."""
        self.logger.info(f"Downloading emails from folder: {folder}")
        success, count = self.source.select_folder(folder)
        if not success:
            self.logger.error(f"Failed to select source folder: {folder}")
            return [], 1
        message_ids = self.source.get_message_ids()
        self.logger.info(f"Found {len(message_ids)} messages in folder: {folder}")

        # Flags are read before any message body is fetched, so the values
        # reflect the mailbox state prior to this run touching the messages.
        flags_by_seq = {}
        if self.preserve_flags and message_ids:
            flags_by_seq = self._load_source_flags()

        emails = []
        failed = 0
        for i, msg_id in enumerate(message_ids, 1):
            try:
                msg = self.source.fetch_message(msg_id)
                # Compare against None: a Message without headers is falsy.
                if msg is not None:
                    try:
                        flags = flags_by_seq.get(int(msg_id))
                    except (TypeError, ValueError):
                        flags = None
                    emails.append({
                        'message': msg,
                        'folder': folder,
                        'original_id': msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id),
                        'flags': flags,
                    })
                else:
                    failed += 1
                if i % self.batch_size == 0:
                    self.logger.info(f"Downloaded {i}/{len(message_ids)} messages from {folder}")
            except Exception as e:
                failed += 1
                self.logger.error(f"Error downloading message {msg_id} from {folder}: {e}")
        self.logger.info(f"Successfully downloaded {len(emails)} messages from {folder}")
        return emails, failed

    def download_emails_from_folder(self, folder):
        """Download every message of a source folder into memory.

        Args:
            folder: Source folder name.

        Returns:
            list[dict]: One dict per downloaded message with the keys
            ``message``, ``folder``, ``original_id`` and ``flags`` (portable
            source flags as a string, or None when unknown). Empty when the
            folder cannot be selected.
        """
        emails, _failed = self._download_folder(folder)
        return emails

    def _message_date(self, message):
        """Return the message's ``Date`` header as a datetime, or None."""
        raw_date = message.get('Date')
        if not raw_date:
            return None
        try:
            return email.utils.parsedate_to_datetime(str(raw_date))
        except Exception as e:
            # Best effort: an unparsable date just means no date is sent.
            self.logger.debug(f"Ignoring unparsable Date header {raw_date!r}: {e}")
            return None

    def upload_emails_to_folder(self, emails, destination_folder):
        """Append downloaded messages to a destination folder.

        Args:
            emails: Dicts as produced by download_emails_from_folder().
            destination_folder: Destination folder name.

        Returns:
            int: Number of messages the destination accepted.
        """
        self.logger.info(f"Uploading {len(emails)} emails to folder: {destination_folder}")
        uploaded = 0
        for i, email_data in enumerate(emails, 1):
            try:
                message = email_data['message']
                # Replay the flags read from the source; when they are
                # unknown no flag is invented.
                flags = (email_data.get('flags') or '') if self.preserve_flags else ''
                date_obj = self._message_date(message) if self.preserve_dates else None
                if self.destination.append_message(destination_folder, message, flags, date_obj):
                    uploaded += 1
                else:
                    self.logger.warning(f"Message {email_data.get('original_id')} was not accepted by {destination_folder}")
                if i % self.batch_size == 0:
                    self.logger.info(f"Uploaded {i}/{len(emails)} messages to {destination_folder}")
            except Exception as e:
                self.logger.error(f"Error uploading message to {destination_folder}: {e}")
        self.logger.info(f"Successfully uploaded {uploaded}/{len(emails)} messages to {destination_folder}")
        return uploaded

    def migrate_folder(self, source_folder):
        """Migrate one source folder.

        Args:
            source_folder: Source folder name.

        Returns:
            dict: ``downloaded`` and ``uploaded`` message counts plus
            ``errors`` (messages that could not be downloaded or uploaded,
            and folder-level failures). All zero for a filtered folder.
        """
        stats = {'downloaded': 0, 'uploaded': 0, 'errors': 0}
        if not self.should_process_folder(source_folder):
            self.logger.info(f"Skipping folder: {source_folder} (filtered)")
            return stats
        try:
            # Determine destination folder based on configuration
            destination_folder = self.get_destination_folder(source_folder)
            self.logger.info(f"Migrating '{source_folder}' -> '{destination_folder}'")
            emails, failed = self._download_folder(source_folder)
            stats['downloaded'] = len(emails)
            stats['errors'] += failed
            if emails:
                stats['uploaded'] = self.upload_emails_to_folder(emails, destination_folder)
                stats['errors'] += len(emails) - stats['uploaded']
        except Exception as e:
            stats['errors'] += 1
            self.logger.error(f"Error migrating folder {source_folder}: {e}")
        return stats

    def run_migration(self):
        """Connect to both servers and migrate every source folder.

        Returns:
            dict: Totals with the keys ``folders_processed``,
            ``total_downloaded``, ``total_uploaded`` and ``errors``. All
            zero when either connection cannot be established.
        """
        self.logger.info("Starting email migration...")
        total_stats = {'folders_processed': 0, 'total_downloaded': 0, 'total_uploaded': 0, 'errors': 0}
        try:
            if not self.source.connect():
                self.logger.error("Failed to connect to source server")
                return total_stats
            if not self.destination.connect():
                self.logger.error("Failed to connect to destination server")
                return total_stats
            folders = self.source.get_folders()
            self.logger.info(f"Found {len(folders)} folders to process")
            # Create the main import folder if specified
            if self.import_folder_name:
                self.logger.info(f"Creating main import folder: {self.import_folder_name}")
                self.destination.create_folder(self.import_folder_name)
            for folder in folders:
                try:
                    self.logger.info(f"Processing folder: {folder}")
                    stats = self.migrate_folder(folder)
                    total_stats['folders_processed'] += 1
                    total_stats['total_downloaded'] += stats['downloaded']
                    total_stats['total_uploaded'] += stats['uploaded']
                    total_stats['errors'] += stats.get('errors', 0)
                    destination_folder = self.get_destination_folder(folder)
                    self.logger.info(f"Folder '{folder}' -> '{destination_folder}' completed: {stats['downloaded']} downloaded, {stats['uploaded']} uploaded")
                except Exception as e:
                    self.logger.error(f"Error processing folder {folder}: {e}")
                    total_stats['errors'] += 1
        finally:
            # Always release both connections, also after an early return.
            self.source.disconnect()
            self.destination.disconnect()
        return total_stats
def main():
    """Run the migration from the command line.

    Loads the settings from ``.env``, checks that the required variables are
    present, fills in default IMAP ports, runs the migration and prints a
    summary. Exits with status 1 when the settings cannot be loaded, when a
    required variable is missing, or when the migration raises.
    """
    print("Email Migration Script")
    print("=" * 50)
    try:
        config = load_env_file()
    except Exception as e:
        print(f"Error loading .env file: {e}")
        sys.exit(1)

    required_vars = ['SOURCE_IMAP_SERVER', 'SOURCE_EMAIL', 'SOURCE_PASSWORD',
                     'DEST_IMAP_SERVER', 'DEST_EMAIL', 'DEST_PASSWORD']
    missing_vars = [var for var in required_vars if not config.get(var)]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Please check your .env file.")
        sys.exit(1)

    # The ports are read unconditionally when the migrator is built but are
    # not part of the required list, so supply the standard port for the
    # configured mode (993 with SSL, 143 without) when one is left out.
    for prefix in ('SOURCE', 'DEST'):
        port_key = f'{prefix}_IMAP_PORT'
        if not config.get(port_key):
            use_ssl = config.get(f'{prefix}_IMAP_USE_SSL', 'True').lower() == 'true'
            config[port_key] = '993' if use_ssl else '143'

    try:
        # Construction parses numeric settings, so it belongs inside the
        # guarded section as well.
        migrator = EmailMigrator(config)
        stats = migrator.run_migration()
        print("\nMigration completed!")
        print(f"Folders processed: {stats['folders_processed']}")
        print(f"Total emails downloaded: {stats['total_downloaded']}")
        print(f"Total emails uploaded: {stats['total_uploaded']}")
        print(f"Errors encountered: {stats['errors']}")
        if stats['errors'] > 0:
            print("\nCheck the log file 'email_migration.log' for error details.")
    except KeyboardInterrupt:
        print("\nMigration interrupted by user.")
    except Exception as e:
        print(f"Migration failed: {e}")
        sys.exit(1)


# Only run the migration when executed as a script, not when imported.
if __name__ == "__main__":
    main()