- Add IMPORT_FOLDER_NAME configuration variable
- Support organized import (subfolders) or consolidated import (all to INBOX)
- Implement get_destination_folder() method for folder mapping
- Add email.utils import for proper date handling
- Update logging to show source → destination folder mappings
- Auto-create import folders as needed

Configuration examples:
- IMPORT_FOLDER_NAME=Imported → organized subfolders
- IMPORT_FOLDER_NAME= → all emails to INBOX

- Update .env.template with comprehensive explanations
- Add Host Europe and German hosting provider examples
- Include detailed configuration guides and troubleshooting
- Add realistic migration scenarios
- Update README.md with complete documentation
- Add feature overview and configuration guide
- Include usage examples and troubleshooting section
- Document both import modes with clear examples
359 lines
13 KiB
Python
Executable file
359 lines
13 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
import email
import email.utils
import imaplib
import logging
import re
import ssl
import sys
from datetime import datetime, timezone
from pathlib import Path
|
|
|
|
def load_env_file(path='.env'):
    """Parse a simple ``KEY=VALUE`` file into a dictionary.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Only the first ``=`` separates key from value, so values may
    themselves contain ``=``. Keys and values are stripped of surrounding
    whitespace; quotes are kept verbatim.

    Args:
        path: File to read. Defaults to ``.env`` in the current directory.

    Returns:
        dict mapping each key to its string value.

    Raises:
        OSError: If the file cannot be opened.
    """
    env_vars = {}
    # utf-8-sig reads plain UTF-8 and also drops a leading BOM, which would
    # otherwise become part of the first key.
    with open(path, 'r', encoding='utf-8-sig') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            env_vars[key.strip()] = value.strip()
    return env_vars
|
|
|
|
def setup_logging(log_level):
    """Configure root logging to a file and the console.

    Log records go to ``email_migration.log`` in the current directory and
    to a stream handler.

    Args:
        log_level: Name of a ``logging`` level such as ``"INFO"`` or
            ``"debug"`` (case-insensitive). Anything that does not name an
            integer level falls back to ``logging.INFO``.

    Returns:
        The logger for this module.
    """
    numeric_level = getattr(logging, str(log_level).upper(), None)
    # getattr can also find non-level attributes (e.g. BASIC_FORMAT is a
    # string); passing those to basicConfig would raise.
    if not isinstance(numeric_level, int):
        numeric_level = logging.INFO

    logging.basicConfig(
        level=numeric_level,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            # Explicit encoding: log messages may contain non-ASCII text.
            logging.FileHandler('email_migration.log', encoding='utf-8'),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__)
|
|
|
|
class IMAPConnection:
    """Wrapper around one ``imaplib`` session for a single mailbox account.

    Methods catch errors and signal failure through their return value
    (``False``, ``None`` or an empty list) instead of raising.
    """

    # One LIST response line: "(flags) delimiter name".
    _LIST_LINE = re.compile(
        r'\((?P<flags>[^)]*)\)\s+'
        r'(?P<delimiter>"(?:[^"\\]|\\.)*"|NIL)\s+'
        r'(?P<name>.*)$',
        re.IGNORECASE | re.DOTALL,
    )

    def __init__(self, server, port, email_addr, password, use_ssl=True, timeout=60):
        """Store connection settings; no network activity happens here.

        Args:
            server: IMAP host name.
            port: IMAP port number.
            email_addr: Login name.
            password: Login password.
            use_ssl: Use ``IMAP4_SSL`` when true, plain ``IMAP4`` otherwise.
            timeout: Socket timeout in seconds applied after connecting.
        """
        self.server = server
        self.port = port
        self.email = email_addr
        self.password = password
        self.use_ssl = use_ssl
        self.timeout = timeout
        self.connection = None
        self.logger = logging.getLogger(__name__)
        # Folders already sent a CREATE in this session, so append_message
        # does not repeat the round trip for every message.
        self._ensured_folders = set()

    @staticmethod
    def _quote(folder):
        """Return *folder* as an IMAP quoted string (escaping ``\\`` and ``"``)."""
        escaped = folder.replace('\\', '\\\\').replace('"', '\\"')
        return f'"{escaped}"'

    @classmethod
    def _parse_list_entry(cls, entry):
        """Extract the folder name from one item of a LIST response.

        Returns None for items that are not folder lines or that carry the
        ``\\Noselect`` flag (such folders cannot be selected).
        """
        if entry is None:
            return None
        literal_name = None
        if isinstance(entry, tuple):
            # imaplib yields (header, payload) when the server sent the
            # name as a literal.
            literal_name = entry[1].decode()
            entry = entry[0]
        match = cls._LIST_LINE.match(entry.decode())
        if not match:
            return None
        if '\\noselect' in match.group('flags').lower().split():
            return None
        if literal_name is not None:
            return literal_name
        name = match.group('name').strip()
        if len(name) >= 2 and name[0] == '"' and name[-1] == '"':
            # Quoted string: drop the quotes and undo backslash escapes.
            name = re.sub(r'\\(.)', r'\1', name[1:-1])
        return name or None

    def _abandon_connection(self):
        """Close the socket of a half-open session and forget it."""
        if self.connection is not None:
            try:
                self.connection.shutdown()
            except Exception as e:
                self.logger.debug(f"Error shutting down connection to {self.server}: {e}")
        self.connection = None
        self._ensured_folders.clear()

    def connect(self):
        """Open the connection and log in.

        Returns:
            True on success, False if connecting or logging in failed.
        """
        try:
            if self.use_ssl:
                context = ssl.create_default_context()
                self.connection = imaplib.IMAP4_SSL(self.server, self.port, ssl_context=context)
            else:
                self.connection = imaplib.IMAP4(self.server, self.port)

            self.connection.sock.settimeout(self.timeout)
            self.connection.login(self.email, self.password)
            self.logger.info(f"Connected to {self.server} as {self.email}")
            return True
        except Exception as e:
            self.logger.error(f"Failed to connect to {self.server}: {e}")
            # Do not leave a connected-but-unauthenticated socket behind.
            self._abandon_connection()
            return False

    def disconnect(self):
        """Log out and drop the connection; does nothing if not connected."""
        connection = self.connection
        if not connection:
            return
        try:
            # CLOSE is only legal while a mailbox is selected; calling it in
            # any other state raises and used to prevent the logout below.
            if getattr(connection, 'state', None) == 'SELECTED':
                connection.close()
        except Exception as e:
            self.logger.debug(f"Error closing mailbox on {self.server}: {e}")
        try:
            connection.logout()
            self.logger.info(f"Disconnected from {self.server}")
        except Exception as e:
            self.logger.debug(f"Error logging out from {self.server}: {e}")
        finally:
            self.connection = None
            self._ensured_folders.clear()

    def get_folders(self):
        """Return the names of all selectable folders on the server.

        Returns:
            List of folder names; empty if the LIST command failed.
        """
        try:
            status, entries = self.connection.list()
            if status != 'OK':
                self.logger.error(f"Error getting folders: server answered {status}")
                return []
            folder_list = []
            for entry in entries:
                folder_name = self._parse_list_entry(entry)
                if folder_name:
                    folder_list.append(folder_name)
            return folder_list
        except Exception as e:
            self.logger.error(f"Error getting folders: {e}")
            return []

    def select_folder(self, folder):
        """Select *folder* as the current mailbox.

        Returns:
            ``(True, message_count)`` on success, ``(False, 0)`` otherwise.
        """
        try:
            status, response = self.connection.select(self._quote(folder))
            if status == 'OK':
                return True, int(response[0])
        except Exception as e:
            self.logger.error(f"Error selecting folder '{folder}': {e}")
        return False, 0

    def get_message_ids(self):
        """Return the ids of all messages in the selected mailbox.

        Returns:
            List of ids as returned by SEARCH ALL; empty on failure.
        """
        try:
            status, messages = self.connection.search(None, 'ALL')
            if status == 'OK':
                return messages[0].split()
        except Exception as e:
            self.logger.error(f"Error getting message IDs: {e}")
        return []

    def fetch_message(self, msg_id):
        """Fetch one message from the selected mailbox and parse it.

        Returns:
            An ``email.message.Message``, or None if the fetch failed.
        """
        try:
            # BODY.PEEK[] returns the full message like RFC822 does, but
            # without setting \Seen on the source message.
            status, msg_data = self.connection.fetch(msg_id, '(BODY.PEEK[])')
            if status == 'OK':
                for part in msg_data:
                    # The message payload is the second element of the
                    # (header, literal) tuple; other items are plain bytes.
                    if isinstance(part, tuple) and len(part) >= 2:
                        return email.message_from_bytes(part[1])
                self.logger.error(f"Error fetching message {msg_id}: no message body in response")
        except Exception as e:
            self.logger.error(f"Error fetching message {msg_id}: {e}")
        return None

    def append_message(self, folder, message, flags='', date_time=None):
        """Append *message* to *folder*, creating the folder on first use.

        Args:
            folder: Destination mailbox name.
            message: ``email.message.Message`` to store.
            flags: IMAP flags string (e.g. ``'\\\\Seen'``) or empty for none.
            date_time: Optional ``datetime`` used as the message's internal
                date. Naive values are treated as UTC.

        Returns:
            True if the server accepted the message, False otherwise.
        """
        try:
            if folder not in self._ensured_folders:
                self.create_folder(folder)
                self._ensured_folders.add(folder)

            msg_bytes = message.as_bytes()

            date_str = None
            if date_time:
                try:
                    if date_time.tzinfo is None:
                        # Time2Internaldate rejects naive datetimes.
                        date_time = date_time.replace(tzinfo=timezone.utc)
                    # Produces the quoted, locale-independent form that
                    # imaplib's append() accepts.
                    date_str = imaplib.Time2Internaldate(date_time)
                except (ValueError, OverflowError) as e:
                    self.logger.warning(f"Ignoring unusable message date {date_time!r}: {e}")
                    date_str = None

            status, response = self.connection.append(self._quote(folder), flags, date_str, msg_bytes)
            return status == 'OK'
        except Exception as e:
            self.logger.error(f"Error appending message to folder '{folder}': {e}")
            return False

    def create_folder(self, folder):
        """Create *folder* on the server if it does not exist yet.

        Returns:
            True if the folder was created or the server reported that it
            already exists; False if the command failed or was refused.
        """
        try:
            status, response = self.connection.create(self._quote(folder))
        except Exception as e:
            self.logger.warning(f"Could not create folder '{folder}': {e}")
            return False
        if status == 'OK':
            return True
        # Covers wordings such as "already exists" and "[ALREADYEXISTS]".
        return 'exists' in str(response).lower()
|
|
|
|
class EmailMigrator:
    """Copy mail, folder by folder, from a source IMAP account to a destination.

    Each source folder is downloaded completely into memory and then
    uploaded to the folder chosen by ``get_destination_folder``.
    """

    # System flags replayed on upload. \Recent is excluded because clients
    # cannot set it; keyword flags are skipped as the destination may not
    # accept them (assumption, not verified against any server).
    _PRESERVABLE_FLAGS = ('\\seen', '\\answered', '\\flagged', '\\deleted', '\\draft')

    def __init__(self, config):
        """Read settings from *config* and build both connections (not yet opened).

        Args:
            config: Mapping of string settings. Required keys:
                ``SOURCE_IMAP_SERVER``, ``SOURCE_EMAIL``, ``SOURCE_PASSWORD``,
                ``DEST_IMAP_SERVER``, ``DEST_EMAIL``, ``DEST_PASSWORD``.
                Optional keys: ``LOG_LEVEL``, ``TEMP_DOWNLOAD_DIR``,
                ``BATCH_SIZE``, ``PRESERVE_FLAGS``, ``PRESERVE_DATES``,
                ``IMPORT_FOLDER_NAME``, ``INCLUDE_FOLDERS``,
                ``EXCLUDE_FOLDERS``, ``IMAP_TIMEOUT``,
                ``SOURCE_IMAP_PORT``/``DEST_IMAP_PORT`` and
                ``SOURCE_IMAP_USE_SSL``/``DEST_IMAP_USE_SSL``.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If a numeric setting is not an integer.
        """
        self.config = config
        self.logger = setup_logging(config.get('LOG_LEVEL', 'INFO'))
        self.temp_dir = Path(config.get('TEMP_DOWNLOAD_DIR', './temp_emails'))
        self.temp_dir.mkdir(parents=True, exist_ok=True)
        # Used as a modulus for progress logging, so it must be at least 1.
        self.batch_size = max(1, int(config.get('BATCH_SIZE', '50')))
        self.preserve_flags = config.get('PRESERVE_FLAGS', 'True').lower() == 'true'
        self.preserve_dates = config.get('PRESERVE_DATES', 'True').lower() == 'true'

        # Empty or missing IMPORT_FOLDER_NAME means "put everything in INBOX".
        self.import_folder_name = config.get('IMPORT_FOLDER_NAME', '').strip() or None

        # Built outside the f-string: backslashes inside an f-string
        # expression are a syntax error before Python 3.12.
        if self.import_folder_name:
            import_mode = 'All emails will be imported to subfolders within "' + self.import_folder_name + '"'
        else:
            import_mode = 'All emails will be imported directly to INBOX'
        self.logger.info(f"Import folder configuration: {import_mode}")

        include_str = config.get('INCLUDE_FOLDERS', '')
        exclude_str = config.get('EXCLUDE_FOLDERS', '')
        self.include_folders = [f.strip() for f in include_str.split(',') if f.strip()] if include_str else []
        self.exclude_folders = [f.strip() for f in exclude_str.split(',') if f.strip()] if exclude_str else []

        timeout = int(config.get('IMAP_TIMEOUT', '60'))

        self.source = self._build_connection(config, 'SOURCE', timeout)
        self.destination = self._build_connection(config, 'DEST', timeout)

    @staticmethod
    def _build_connection(config, prefix, timeout):
        """Create the IMAPConnection described by the ``<prefix>_*`` settings."""
        use_ssl = config.get(f'{prefix}_IMAP_USE_SSL', 'True').lower() == 'true'
        # Fall back to the standard IMAP ports when none is configured.
        port = int(config.get(f'{prefix}_IMAP_PORT') or (993 if use_ssl else 143))
        return IMAPConnection(
            config[f'{prefix}_IMAP_SERVER'],
            port,
            config[f'{prefix}_EMAIL'],
            config[f'{prefix}_PASSWORD'],
            use_ssl,
            timeout,
        )

    def get_destination_folder(self, source_folder):
        """
        Determine the destination folder based on the import configuration.

        Args:
            source_folder (str): Original folder name from source

        Returns:
            str: ``"<import folder>/<source_folder>"`` when an import folder
            is configured, otherwise ``"INBOX"``.
        """
        if self.import_folder_name:
            # NOTE(review): "/" is hard-coded; confirm the destination
            # server uses it as its hierarchy delimiter.
            return f"{self.import_folder_name}/{source_folder}"
        return "INBOX"

    def should_process_folder(self, folder):
        """Return True if *folder* passes the include and exclude filters."""
        if self.include_folders and folder not in self.include_folders:
            return False
        if self.exclude_folders and folder in self.exclude_folders:
            return False
        return True

    def _fetch_source_flags(self, msg_id):
        """Return the source message's system flags as an APPEND-ready string.

        Returns None when the flags could not be determined, so the caller
        can fall back to its default.
        """
        try:
            status, data = self.source.connection.fetch(msg_id, '(FLAGS)')
            if status != 'OK' or not data or data[0] is None:
                return None
            raw_flags = imaplib.ParseFlags(data[0])
        except Exception as e:
            self.logger.debug(f"Could not read flags of message {msg_id}: {e}")
            return None
        kept = [flag.decode() for flag in raw_flags
                if flag.decode().lower() in self._PRESERVABLE_FLAGS]
        return ' '.join(kept)

    def download_emails_from_folder(self, folder):
        """Download every message of *folder* from the source into memory.

        Returns:
            List of dicts with keys ``message``, ``folder``, ``original_id``
            and ``flags`` (system flags string, or None if unknown or flag
            preservation is off). Empty if the folder cannot be selected.
        """
        self.logger.info(f"Downloading emails from folder: {folder}")
        success, count = self.source.select_folder(folder)
        if not success:
            self.logger.error(f"Failed to select source folder: {folder}")
            return []

        message_ids = self.source.get_message_ids()
        self.logger.info(f"Found {len(message_ids)} messages in folder: {folder}")

        emails = []
        for i, msg_id in enumerate(message_ids, 1):
            try:
                # Read flags before the body so a body fetch that marks the
                # message as seen cannot distort them.
                flags = self._fetch_source_flags(msg_id) if self.preserve_flags else None
                msg = self.source.fetch_message(msg_id)
                if msg:
                    emails.append({
                        'message': msg,
                        'folder': folder,
                        'original_id': msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id),
                        'flags': flags,
                    })

                if i % self.batch_size == 0:
                    self.logger.info(f"Downloaded {i}/{len(message_ids)} messages from {folder}")
            except Exception as e:
                self.logger.error(f"Error downloading message {msg_id} from {folder}: {e}")
                continue

        self.logger.info(f"Successfully downloaded {len(emails)} messages from {folder}")
        return emails

    def _flags_for_upload(self, email_data):
        """Choose the flags string to send with APPEND for one downloaded message."""
        if not self.preserve_flags:
            return ''
        flags = email_data.get('flags')
        # Unknown flags (None): keep the previous behaviour of marking the
        # message as read.
        return '\\Seen' if flags is None else flags

    @staticmethod
    def _message_date(message):
        """Return the parsed ``Date`` header of *message*, or None if absent or invalid."""
        raw_date = message.get('Date')
        if not raw_date:
            return None
        try:
            return email.utils.parsedate_to_datetime(str(raw_date))
        except (TypeError, ValueError, IndexError, OverflowError):
            return None

    def upload_emails_to_folder(self, emails, destination_folder):
        """Append the downloaded *emails* to *destination_folder*.

        Args:
            emails: Dicts as produced by ``download_emails_from_folder``;
                the ``flags`` key is optional.
            destination_folder: Mailbox name on the destination server.

        Returns:
            Number of messages the destination accepted.
        """
        self.logger.info(f"Uploading {len(emails)} emails to folder: {destination_folder}")
        uploaded = 0
        for i, email_data in enumerate(emails, 1):
            try:
                message = email_data['message']
                flags = self._flags_for_upload(email_data)
                date_obj = self._message_date(message) if self.preserve_dates else None

                if self.destination.append_message(destination_folder, message, flags, date_obj):
                    uploaded += 1

                if i % self.batch_size == 0:
                    self.logger.info(f"Uploaded {i}/{len(emails)} messages to {destination_folder}")
            except Exception as e:
                self.logger.error(f"Error uploading message to {destination_folder}: {e}")
                continue

        self.logger.info(f"Successfully uploaded {uploaded}/{len(emails)} messages to {destination_folder}")
        return uploaded

    def migrate_folder(self, source_folder):
        """Migrate one source folder.

        Returns:
            Dict with ``downloaded`` and ``uploaded`` message counts and
            ``errors`` (messages that failed to upload, plus one if the
            folder aborted with an exception). All zero for filtered folders.
        """
        stats = {'downloaded': 0, 'uploaded': 0, 'errors': 0}

        if not self.should_process_folder(source_folder):
            self.logger.info(f"Skipping folder: {source_folder} (filtered)")
            return stats

        try:
            destination_folder = self.get_destination_folder(source_folder)
            self.logger.info(f"Migrating '{source_folder}' -> '{destination_folder}'")

            emails = self.download_emails_from_folder(source_folder)
            stats['downloaded'] = len(emails)

            if emails:
                stats['uploaded'] = self.upload_emails_to_folder(emails, destination_folder)
                stats['errors'] += stats['downloaded'] - stats['uploaded']
        except Exception as e:
            self.logger.error(f"Error migrating folder {source_folder}: {e}")
            stats['errors'] += 1

        return stats

    def run_migration(self):
        """Connect to both servers and migrate every source folder.

        Both connections are closed again before returning, whatever happens.

        Returns:
            Dict with ``folders_processed``, ``total_downloaded``,
            ``total_uploaded`` and ``errors``.
        """
        self.logger.info("Starting email migration...")
        total_stats = {'folders_processed': 0, 'total_downloaded': 0, 'total_uploaded': 0, 'errors': 0}

        try:
            if not self.source.connect():
                self.logger.error("Failed to connect to source server")
                return total_stats

            if not self.destination.connect():
                self.logger.error("Failed to connect to destination server")
                return total_stats

            folders = self.source.get_folders() or []
            self.logger.info(f"Found {len(folders)} folders to process")

            # Create the main import folder if specified
            if self.import_folder_name:
                self.logger.info(f"Creating main import folder: {self.import_folder_name}")
                self.destination.create_folder(self.import_folder_name)

            for folder in folders:
                try:
                    self.logger.info(f"Processing folder: {folder}")
                    stats = self.migrate_folder(folder)

                    total_stats['folders_processed'] += 1
                    total_stats['total_downloaded'] += stats['downloaded']
                    total_stats['total_uploaded'] += stats['uploaded']
                    # migrate_folder handles its own exceptions, so its
                    # failures only reach the totals through this counter.
                    total_stats['errors'] += stats.get('errors', 0)

                    destination_folder = self.get_destination_folder(folder)
                    self.logger.info(f"Folder '{folder}' -> '{destination_folder}' completed: {stats['downloaded']} downloaded, {stats['uploaded']} uploaded")
                except Exception as e:
                    self.logger.error(f"Error processing folder {folder}: {e}")
                    total_stats['errors'] += 1
        finally:
            self.source.disconnect()
            self.destination.disconnect()

        return total_stats
|
|
|
|
def main():
    """Run the migration described by the ``.env`` file and print a summary.

    Exits with status 1 if the configuration cannot be loaded, is missing
    required settings or is invalid, or if the migration raises.
    """
    print("Email Migration Script")
    print("=" * 50)

    try:
        config = load_env_file()
    except Exception as e:
        print(f"Error loading .env file: {e}")
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under "python -S").
        sys.exit(1)

    required_vars = ['SOURCE_IMAP_SERVER', 'SOURCE_EMAIL', 'SOURCE_PASSWORD',
                     'DEST_IMAP_SERVER', 'DEST_EMAIL', 'DEST_PASSWORD']

    missing_vars = [var for var in required_vars if not config.get(var)]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        print("Please check your .env file.")
        sys.exit(1)

    try:
        migrator = EmailMigrator(config)
    except (KeyError, ValueError) as e:
        # The constructor reads further settings (ports, batch size,
        # timeout); report a bad or missing one instead of a traceback.
        print(f"Error: Invalid configuration: {e!r}")
        print("Please check your .env file.")
        sys.exit(1)

    try:
        stats = migrator.run_migration()

        print("\nMigration completed!")
        print(f"Folders processed: {stats['folders_processed']}")
        print(f"Total emails downloaded: {stats['total_downloaded']}")
        print(f"Total emails uploaded: {stats['total_uploaded']}")
        print(f"Errors encountered: {stats['errors']}")

        if stats['errors'] > 0:
            print("\nCheck the log file 'email_migration.log' for error details.")

    except KeyboardInterrupt:
        print("\nMigration interrupted by user.")
    except Exception as e:
        print(f"Migration failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
|