Key improvements: - Either email OR username can be empty (but not both) - Enhanced validation logic for authentication methods - Improved error messages showing available auth methods - More flexible IMAPConnection constructor with better defaults Authentication options: 1. Email-only: SOURCE_EMAIL=user@domain.de, SOURCE_USERNAME= 2. Username-only: SOURCE_EMAIL=, SOURCE_USERNAME=username123 3. Both (fallback): Provide both for automatic fallback 4. Dual method: Username tried first, email as fallback Configuration updates: - Update .env.template with all four authentication examples - Clarify requirements and optional nature of fields - Add comprehensive authentication scenarios Documentation updates: - Expand README with four authentication options - Add troubleshooting for different auth combinations - Clarify flexible requirements (either email OR username required) This makes the script compatible with even more email providers and hosting scenarios.
415 lines
16 KiB
Python
Executable file
415 lines
16 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import imaplib
|
|
import email
|
|
import email.utils
|
|
import ssl
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
def load_env_file():
|
|
env_vars = {}
|
|
with open('.env', 'r') as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line and not line.startswith('#') and '=' in line:
|
|
key, value = line.split('=', 1)
|
|
env_vars[key.strip()] = value.strip()
|
|
return env_vars
|
|
|
|
def setup_logging(log_level):
|
|
numeric_level = getattr(logging, log_level.upper(), logging.INFO)
|
|
logging.basicConfig(
|
|
level=numeric_level,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.FileHandler('email_migration.log'),
|
|
logging.StreamHandler()
|
|
]
|
|
)
|
|
return logging.getLogger(__name__)
|
|
|
|
class IMAPConnection:
|
|
def __init__(self, server, port, email_addr, username, password, use_ssl=True, timeout=60):
|
|
self.server = server
|
|
self.port = port
|
|
self.email = email_addr or ""
|
|
self.username = username or ""
|
|
self.password = password
|
|
self.use_ssl = use_ssl
|
|
self.timeout = timeout
|
|
self.connection = None
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
# Validate that we have at least one authentication method
|
|
if not self.email.strip() and not self.username.strip():
|
|
raise ValueError("Either email or username must be provided for authentication")
|
|
|
|
def connect(self):
|
|
try:
|
|
if self.use_ssl:
|
|
context = ssl.create_default_context()
|
|
self.connection = imaplib.IMAP4_SSL(self.server, self.port, ssl_context=context)
|
|
else:
|
|
self.connection = imaplib.IMAP4(self.server, self.port)
|
|
|
|
self.connection.sock.settimeout(self.timeout)
|
|
|
|
# Try authentication with username first (if provided), then fall back to email
|
|
login_success = False
|
|
login_user = None
|
|
|
|
if self.username and self.username.strip():
|
|
try:
|
|
self.connection.login(self.username, self.password)
|
|
login_user = self.username
|
|
login_success = True
|
|
self.logger.info(f"Connected to {self.server} using username: {self.username}")
|
|
except Exception as username_error:
|
|
self.logger.debug(f"Username login failed for {self.username}: {username_error}")
|
|
|
|
# If username login failed or no username provided, try email
|
|
if not login_success and self.email.strip():
|
|
try:
|
|
self.connection.login(self.email, self.password)
|
|
login_user = self.email
|
|
login_success = True
|
|
self.logger.info(f"Connected to {self.server} using email: {self.email}")
|
|
except Exception as email_error:
|
|
self.logger.error(f"Email login failed for {self.email}: {email_error}")
|
|
|
|
if not login_success:
|
|
available_methods = []
|
|
if self.username.strip():
|
|
available_methods.append("username")
|
|
if self.email.strip():
|
|
available_methods.append("email")
|
|
self.logger.error(f"Failed to authenticate with {self.server} using {' and '.join(available_methods)}")
|
|
return False
|
|
|
|
return True
|
|
except Exception as e:
|
|
self.logger.error(f"Failed to connect to {self.server}: {e}")
|
|
return False
|
|
|
|
def disconnect(self):
|
|
if self.connection:
|
|
try:
|
|
self.connection.close()
|
|
self.connection.logout()
|
|
self.logger.info(f"Disconnected from {self.server}")
|
|
except:
|
|
pass
|
|
|
|
def get_folders(self):
|
|
try:
|
|
status, folders = self.connection.list()
|
|
if status == 'OK':
|
|
folder_list = []
|
|
for folder in folders:
|
|
parts = folder.decode().split('"')
|
|
if len(parts) >= 3:
|
|
folder_name = parts[-2]
|
|
folder_list.append(folder_name)
|
|
return folder_list
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting folders: {e}")
|
|
return []
|
|
|
|
def select_folder(self, folder):
|
|
try:
|
|
status, response = self.connection.select(f'"{folder}"')
|
|
if status == 'OK':
|
|
count = int(response[0])
|
|
return True, count
|
|
except Exception as e:
|
|
self.logger.error(f"Error selecting folder '{folder}': {e}")
|
|
return False, 0
|
|
|
|
def get_message_ids(self):
|
|
try:
|
|
status, messages = self.connection.search(None, 'ALL')
|
|
if status == 'OK':
|
|
return messages[0].split()
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting message IDs: {e}")
|
|
return []
|
|
|
|
def fetch_message(self, msg_id):
|
|
try:
|
|
status, msg_data = self.connection.fetch(msg_id, '(RFC822)')
|
|
if status == 'OK':
|
|
raw_email = msg_data[0][1]
|
|
return email.message_from_bytes(raw_email)
|
|
except Exception as e:
|
|
self.logger.error(f"Error fetching message {msg_id}: {e}")
|
|
return None
|
|
|
|
def append_message(self, folder, message, flags='', date_time=None):
|
|
try:
|
|
self.create_folder(folder)
|
|
msg_bytes = message.as_bytes()
|
|
date_str = None
|
|
if date_time:
|
|
date_str = date_time.strftime("%d-%b-%Y %H:%M:%S %z")
|
|
status, response = self.connection.append(f'"{folder}"', flags, date_str, msg_bytes)
|
|
return status == 'OK'
|
|
except Exception as e:
|
|
self.logger.error(f"Error appending message to folder '{folder}': {e}")
|
|
return False
|
|
|
|
def create_folder(self, folder):
|
|
try:
|
|
status, response = self.connection.create(f'"{folder}"')
|
|
return status == 'OK' or 'already exists' in str(response).lower()
|
|
except:
|
|
return True
|
|
|
|
class EmailMigrator:
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.logger = setup_logging(config.get('LOG_LEVEL', 'INFO'))
|
|
self.temp_dir = Path(config.get('TEMP_DOWNLOAD_DIR', './temp_emails'))
|
|
self.temp_dir.mkdir(exist_ok=True)
|
|
self.batch_size = int(config.get('BATCH_SIZE', '50'))
|
|
self.preserve_flags = config.get('PRESERVE_FLAGS', 'True').lower() == 'true'
|
|
self.preserve_dates = config.get('PRESERVE_DATES', 'True').lower() == 'true'
|
|
|
|
# New: Import folder configuration
|
|
self.import_folder_name = config.get('IMPORT_FOLDER_NAME', '').strip()
|
|
if not self.import_folder_name:
|
|
self.import_folder_name = None
|
|
|
|
if self.import_folder_name:
|
|
self.logger.info(f"Import folder configuration: All emails will be imported to subfolders within \"{self.import_folder_name}\"")
|
|
else:
|
|
self.logger.info("Import folder configuration: All emails will be imported directly to INBOX")
|
|
|
|
include_str = config.get('INCLUDE_FOLDERS', '')
|
|
exclude_str = config.get('EXCLUDE_FOLDERS', '')
|
|
|
|
self.include_folders = [f.strip() for f in include_str.split(',') if f.strip()] if include_str else []
|
|
self.exclude_folders = [f.strip() for f in exclude_str.split(',') if f.strip()] if exclude_str else []
|
|
|
|
timeout = int(config.get('IMAP_TIMEOUT', '60'))
|
|
|
|
self.source = IMAPConnection(
|
|
config['SOURCE_IMAP_SERVER'],
|
|
int(config['SOURCE_IMAP_PORT']),
|
|
config.get('SOURCE_EMAIL', ''),
|
|
config.get('SOURCE_USERNAME', ''),
|
|
config['SOURCE_PASSWORD'],
|
|
config.get('SOURCE_IMAP_USE_SSL', 'True').lower() == 'true',
|
|
timeout
|
|
)
|
|
|
|
self.destination = IMAPConnection(
|
|
config['DEST_IMAP_SERVER'],
|
|
int(config['DEST_IMAP_PORT']),
|
|
config.get('DEST_EMAIL', ''),
|
|
config.get('DEST_USERNAME', ''),
|
|
config['DEST_PASSWORD'],
|
|
config.get('DEST_IMAP_USE_SSL', 'True').lower() == 'true',
|
|
timeout
|
|
)
|
|
|
|
def get_destination_folder(self, source_folder):
|
|
"""
|
|
Determine the destination folder based on the import configuration.
|
|
|
|
Args:
|
|
source_folder (str): Original folder name from source
|
|
|
|
Returns:
|
|
str: Destination folder name
|
|
"""
|
|
if self.import_folder_name:
|
|
# Import into subfolders within the specified import folder
|
|
return f"{self.import_folder_name}/{source_folder}"
|
|
else:
|
|
# Import all emails directly to INBOX
|
|
return "INBOX"
|
|
|
|
def should_process_folder(self, folder):
|
|
if self.include_folders and folder not in self.include_folders:
|
|
return False
|
|
if self.exclude_folders and folder in self.exclude_folders:
|
|
return False
|
|
return True
|
|
|
|
def download_emails_from_folder(self, folder):
|
|
self.logger.info(f"Downloading emails from folder: {folder}")
|
|
success, count = self.source.select_folder(folder)
|
|
if not success:
|
|
self.logger.error(f"Failed to select source folder: {folder}")
|
|
return []
|
|
|
|
message_ids = self.source.get_message_ids()
|
|
self.logger.info(f"Found {len(message_ids)} messages in folder: {folder}")
|
|
|
|
emails = []
|
|
for i, msg_id in enumerate(message_ids, 1):
|
|
try:
|
|
msg = self.source.fetch_message(msg_id)
|
|
if msg:
|
|
emails.append({
|
|
'message': msg,
|
|
'folder': folder,
|
|
'original_id': msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id)
|
|
})
|
|
|
|
if i % self.batch_size == 0:
|
|
self.logger.info(f"Downloaded {i}/{len(message_ids)} messages from {folder}")
|
|
except Exception as e:
|
|
self.logger.error(f"Error downloading message {msg_id} from {folder}: {e}")
|
|
continue
|
|
|
|
self.logger.info(f"Successfully downloaded {len(emails)} messages from {folder}")
|
|
return emails
|
|
|
|
def upload_emails_to_folder(self, emails, destination_folder):
|
|
self.logger.info(f"Uploading {len(emails)} emails to folder: {destination_folder}")
|
|
uploaded = 0
|
|
for i, email_data in enumerate(emails, 1):
|
|
try:
|
|
message = email_data['message']
|
|
flags = '\\Seen' if self.preserve_flags else ''
|
|
date_obj = None
|
|
|
|
if self.preserve_dates and message.get('Date'):
|
|
try:
|
|
date_obj = email.utils.parsedate_to_datetime(message['Date'])
|
|
except:
|
|
pass
|
|
|
|
if self.destination.append_message(destination_folder, message, flags, date_obj):
|
|
uploaded += 1
|
|
|
|
if i % self.batch_size == 0:
|
|
self.logger.info(f"Uploaded {i}/{len(emails)} messages to {destination_folder}")
|
|
except Exception as e:
|
|
self.logger.error(f"Error uploading message to {destination_folder}: {e}")
|
|
continue
|
|
|
|
self.logger.info(f"Successfully uploaded {uploaded}/{len(emails)} messages to {destination_folder}")
|
|
return uploaded
|
|
|
|
def migrate_folder(self, source_folder):
|
|
stats = {'downloaded': 0, 'uploaded': 0}
|
|
|
|
if not self.should_process_folder(source_folder):
|
|
self.logger.info(f"Skipping folder: {source_folder} (filtered)")
|
|
return stats
|
|
|
|
try:
|
|
# Determine destination folder based on configuration
|
|
destination_folder = self.get_destination_folder(source_folder)
|
|
self.logger.info(f"Migrating '{source_folder}' -> '{destination_folder}'")
|
|
|
|
emails = self.download_emails_from_folder(source_folder)
|
|
stats['downloaded'] = len(emails)
|
|
|
|
if emails:
|
|
stats['uploaded'] = self.upload_emails_to_folder(emails, destination_folder)
|
|
except Exception as e:
|
|
self.logger.error(f"Error migrating folder {source_folder}: {e}")
|
|
|
|
return stats
|
|
|
|
def run_migration(self):
|
|
self.logger.info("Starting email migration...")
|
|
total_stats = {'folders_processed': 0, 'total_downloaded': 0, 'total_uploaded': 0, 'errors': 0}
|
|
|
|
try:
|
|
if not self.source.connect():
|
|
self.logger.error("Failed to connect to source server")
|
|
return total_stats
|
|
|
|
if not self.destination.connect():
|
|
self.logger.error("Failed to connect to destination server")
|
|
return total_stats
|
|
|
|
folders = self.source.get_folders()
|
|
self.logger.info(f"Found {len(folders)} folders to process")
|
|
|
|
# Create the main import folder if specified
|
|
if self.import_folder_name:
|
|
self.logger.info(f"Creating main import folder: {self.import_folder_name}")
|
|
self.destination.create_folder(self.import_folder_name)
|
|
|
|
for folder in folders:
|
|
try:
|
|
self.logger.info(f"Processing folder: {folder}")
|
|
stats = self.migrate_folder(folder)
|
|
|
|
total_stats['folders_processed'] += 1
|
|
total_stats['total_downloaded'] += stats['downloaded']
|
|
total_stats['total_uploaded'] += stats['uploaded']
|
|
|
|
destination_folder = self.get_destination_folder(folder)
|
|
self.logger.info(f"Folder '{folder}' -> '{destination_folder}' completed: {stats['downloaded']} downloaded, {stats['uploaded']} uploaded")
|
|
except Exception as e:
|
|
self.logger.error(f"Error processing folder {folder}: {e}")
|
|
total_stats['errors'] += 1
|
|
finally:
|
|
self.source.disconnect()
|
|
self.destination.disconnect()
|
|
|
|
return total_stats
|
|
|
|
def main():
|
|
print("Email Migration Script")
|
|
print("=" * 50)
|
|
|
|
try:
|
|
config = load_env_file()
|
|
except Exception as e:
|
|
print(f"Error loading .env file: {e}")
|
|
exit(1)
|
|
|
|
# Basic required variables
|
|
required_vars = ['SOURCE_IMAP_SERVER', 'SOURCE_PASSWORD',
|
|
'DEST_IMAP_SERVER', 'DEST_PASSWORD']
|
|
|
|
missing_vars = [var for var in required_vars if not config.get(var)]
|
|
if missing_vars:
|
|
print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
|
|
print("Please check your .env file.")
|
|
exit(1)
|
|
|
|
# Validate authentication methods
|
|
source_email = config.get('SOURCE_EMAIL', '').strip()
|
|
source_username = config.get('SOURCE_USERNAME', '').strip()
|
|
dest_email = config.get('DEST_EMAIL', '').strip()
|
|
dest_username = config.get('DEST_USERNAME', '').strip()
|
|
|
|
if not source_email and not source_username:
|
|
print("Error: Either SOURCE_EMAIL or SOURCE_USERNAME must be provided for source account")
|
|
exit(1)
|
|
|
|
if not dest_email and not dest_username:
|
|
print("Error: Either DEST_EMAIL or DEST_USERNAME must be provided for destination account")
|
|
exit(1)
|
|
|
|
migrator = EmailMigrator(config)
|
|
|
|
try:
|
|
stats = migrator.run_migration()
|
|
|
|
print("\nMigration completed!")
|
|
print(f"Folders processed: {stats['folders_processed']}")
|
|
print(f"Total emails downloaded: {stats['total_downloaded']}")
|
|
print(f"Total emails uploaded: {stats['total_uploaded']}")
|
|
print(f"Errors encountered: {stats['errors']}")
|
|
|
|
if stats['errors'] > 0:
|
|
print("\nCheck the log file 'email_migration.log' for error details.")
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nMigration interrupted by user.")
|
|
except Exception as e:
|
|
print(f"Migration failed: {e}")
|
|
exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|