feat: Add duplicate detection, folder management, and enhanced UI

- Implement smart duplicate email detection using Message-ID and fallback signatures
- Add automatic folder creation with existing folder detection and reuse
- Enhance terminal output with colors, progress bars, and professional formatting
- Replace import folder functionality with original folder structure preservation
- Add comprehensive statistics tracking (duplicates, folder creation, etc.)
- Improve error handling with graceful date format fallbacks
- Add universal terminal compatibility with line-based formatting
- Update documentation and configuration files
- Provide clear user feedback for all migration decisions

Migration now intelligently skips duplicates, preserves folder structure,
and provides detailed feedback on what was migrated vs. what was skipped.
This commit is contained in:
Elmar Sönser 2025-09-24 15:21:01 +02:00
commit 6ef7979445
3 changed files with 564 additions and 72 deletions

View file

@ -4,9 +4,99 @@ import email
import email.utils
import ssl
import logging
import sys
import time
import os
import hashlib
from datetime import datetime
from pathlib import Path
class Colors:
    """ANSI escape sequences used to style terminal output.

    Every colour attribute is an empty string when colour output is
    considered unsupported, so callers can interpolate the attributes
    unconditionally.
    """

    # Colours are enabled only when stdout is a TTY, TERM is set to
    # something other than 'dumb', and stdout exposes fileno().
    _colors_supported = (
        sys.stdout.isatty()
        and os.environ.get('TERM', 'dumb') != 'dumb'
        and hasattr(sys.stdout, 'fileno')
    )

    def _sgr(code, _enabled=_colors_supported):
        # Build one escape sequence, or '' when colours are disabled.
        return f'\033[{code}m' if _enabled else ''

    # Text attributes
    RESET = _sgr(0)
    BOLD = _sgr(1)
    DIM = _sgr(2)

    # Bright foreground colours
    RED = _sgr(91)
    GREEN = _sgr(92)
    YELLOW = _sgr(93)
    BLUE = _sgr(94)
    MAGENTA = _sgr(95)
    CYAN = _sgr(96)
    WHITE = _sgr(97)

    # Bright background colours
    BG_RED = _sgr(101)
    BG_GREEN = _sgr(102)
    BG_YELLOW = _sgr(103)
    BG_BLUE = _sgr(104)

    # The helper is only needed while the class body executes.
    del _sgr
def print_banner():
    """Print the script's title banner to stdout."""
    rule = f"{Colors.CYAN}{'='*70}"
    lines = [
        "",  # leading blank line
        rule,
        f"{Colors.BOLD}{Colors.WHITE} EMAIL MIGRATION SCRIPT",
        f"{rule}{Colors.RESET}",
        f"{Colors.DIM}Migrating emails while preserving folder structure...{Colors.RESET}",
        "",  # trailing newline inside the banner text
    ]
    print("\n".join(lines))
def print_status(status, message, color=Colors.WHITE):
    """Print one timestamped, colour-tagged status line to stdout.

    Args:
        status: Tag shown in brackets, centred in a 10-character field.
            Known tags (INFO, SUCCESS, WARNING, ERROR, CONNECTING,
            PROCESSING) have a fixed colour.
        message: Text printed after the tag.
        color: Colour used for the tag when ``status`` is not a known tag.
    """
    palette = {
        'INFO': Colors.BLUE,
        'SUCCESS': Colors.GREEN,
        'WARNING': Colors.YELLOW,
        'ERROR': Colors.RED,
        'CONNECTING': Colors.CYAN,
        'PROCESSING': Colors.MAGENTA
    }
    tag_color = palette.get(status, color)
    now = datetime.now().strftime("%H:%M:%S")

    stamp = f"{Colors.DIM}[{now}]{Colors.RESET}"
    tag = f"{tag_color}[{status:^10}]{Colors.RESET}"
    print(f"{stamp} {tag} {message}")
def print_progress_bar(current, total, folder_name="", width=40):
    """Draw a single-line progress bar on stdout, redrawn in place.

    Args:
        current: Number of items handled so far.
        total: Number of items overall; a value of 0 or less is shown as
            100% complete.
        folder_name: Optional label appended after the counters.
        width: Width of the bar in characters.
    """
    if total <= 0:
        ratio = 1.0
    else:
        # Clamp so an out-of-range counter cannot overflow the bar.
        ratio = min(max(current / total, 0.0), 1.0)
    percentage = int(ratio * 100)
    filled = int(ratio * width)

    # Block glyphs on colour-capable terminals, plain ASCII otherwise.
    if Colors._colors_supported:
        fill_char, empty_char = '█', '░'
    else:
        fill_char, empty_char = '=', '-'
    bar = fill_char * filled + empty_char * (width - filled)

    folder_display = f" {folder_name}" if folder_name else ""

    # Blank the current line first so a shorter redraw leaves no residue.
    print(f"\r{' ' * 80}\r{Colors.CYAN}Progress:{Colors.RESET} [{Colors.GREEN}{bar}{Colors.RESET}] {percentage:3d}% ({current}/{total}){folder_display}", end='', flush=True)

    if current == total:
        print()  # terminate the line once the work is complete
def print_summary_section(title, stats):
    """Print a titled key/value summary framed by horizontal rules.

    Args:
        title: Heading, centred in a 60-character field.
        stats: Mapping of label to already-formatted value; printed in the
            mapping's iteration order.
    """
    # A box-drawing rule on colour-capable terminals, ASCII otherwise, so
    # the rule is always visible.
    rule_char = '─' if Colors._colors_supported else '-'
    separator = f"{Colors.CYAN}{rule_char * 60}{Colors.RESET}"

    print(f"\n{separator}")
    print(f"{Colors.BOLD}{Colors.WHITE}{title:^60}{Colors.RESET}")
    print(separator)

    for key, value in stats.items():
        print(f"{Colors.CYAN}{key}:{Colors.RESET} {value}")

    print(separator)
def load_env_file():
env_vars = {}
with open('.env', 'r') as f:
@ -65,6 +155,7 @@ class IMAPConnection:
login_user = self.username
login_success = True
self.logger.info(f"Connected to {self.server} using username: {self.username}")
print_status("SUCCESS", f"Connected to {self.server} as {self.username}")
except Exception as username_error:
self.logger.debug(f"Username login failed for {self.username}: {username_error}")
@ -75,6 +166,7 @@ class IMAPConnection:
login_user = self.email
login_success = True
self.logger.info(f"Connected to {self.server} using email: {self.email}")
print_status("SUCCESS", f"Connected to {self.server} as {self.email}")
except Exception as email_error:
self.logger.error(f"Email login failed for {self.email}: {email_error}")
@ -109,8 +201,10 @@ class IMAPConnection:
for folder in folders:
parts = folder.decode().split('"')
if len(parts) >= 3:
folder_name = parts[-2]
folder_list.append(folder_name)
# The folder name is the last quoted part
folder_name = parts[-1].strip()
if folder_name: # Skip empty folder names
folder_list.append(folder_name)
return folder_list
except Exception as e:
self.logger.error(f"Error getting folders: {e}")
@ -147,12 +241,36 @@ class IMAPConnection:
def append_message(self, folder, message, flags='', date_time=None):
try:
self.create_folder(folder)
# Don't create folder here - let the migration logic handle it
msg_bytes = message.as_bytes()
date_str = None
if date_time:
date_str = date_time.strftime("%d-%b-%Y %H:%M:%S %z")
status, response = self.connection.append(f'"{folder}"', flags, date_str, msg_bytes)
self.logger.debug(f"Processing date_time: {type(date_time)} - {date_time}")
try:
# Handle both datetime objects and strings
if hasattr(date_time, 'strftime'):
date_str = date_time.strftime("%d-%b-%Y %H:%M:%S %z")
self.logger.debug(f"Formatted date string: {date_str}")
else:
date_str = str(date_time)
self.logger.debug(f"Date as string: {date_str}")
except Exception as date_error:
self.logger.warning(f"Error formatting date {date_time}: {date_error}")
date_str = None
# First try with date if available
if date_str is not None:
try:
status, response = self.connection.append(f'"{folder}"', flags, date_str, msg_bytes)
if status == 'OK':
return True
else:
self.logger.warning(f"Date upload failed for folder '{folder}', trying without date")
except Exception as date_error:
self.logger.warning(f"Date upload error for folder '{folder}': {date_error}, trying without date")
# Fallback: try without date
status, response = self.connection.append(f'"{folder}"', flags, None, msg_bytes)
return status == 'OK'
except Exception as e:
self.logger.error(f"Error appending message to folder '{folder}': {e}")
@ -161,9 +279,49 @@ class IMAPConnection:
def create_folder(self, folder):
    """Ask the server to create ``folder`` and report the outcome.

    Args:
        folder: Mailbox name; it is sent wrapped in double quotes.

    Returns:
        tuple[bool, bool]: ``(usable, already_existed)``.
            ``(True, False)`` - the server created the folder.
            ``(True, True)`` - the server reported it already exists, or
            the request raised (best effort: the folder is assumed present).
            ``(False, False)`` - the server refused for another reason.
    """
    try:
        status, response = self.connection.create(f'"{folder}"')
    except Exception as e:
        # Deliberate best effort: a failed CREATE is treated as "exists"
        # so the migration carries on and the later upload decides.
        self.logger.debug(f"Create request for folder '{folder}' raised: {e}")
        return True, True

    if status == 'OK':
        return True, False
    if 'already exists' in str(response).lower():
        return True, True
    return False, False
def folder_exists(self, folder):
    """Report whether ``folder`` can be selected on the server.

    The check issues a read-only SELECT, so on success ``folder`` becomes
    the connection's currently selected mailbox.

    Args:
        folder: Mailbox name; it is sent wrapped in double quotes.

    Returns:
        bool: True when the SELECT status is 'OK'; False when it is not or
        when the request raises an ordinary exception.
    """
    try:
        status, _response = self.connection.select(f'"{folder}"', readonly=True)
    except Exception:
        # Only ordinary errors count as "missing"; KeyboardInterrupt and
        # SystemExit must still stop the program.
        return False
    return status == 'OK'
def get_message_ids_with_headers(self):
    """List the messages of the selected mailbox with identifying headers.

    Only the Message-ID, Subject, Date and From header fields are fetched
    (via BODY.PEEK) rather than whole messages.

    Returns:
        list[dict]: One ``{'id': <message number>, 'headers': <raw header
        bytes>}`` entry per message whose headers were fetched. Always a
        list: empty when the search fails, the mailbox is empty, or an
        unexpected error occurs.
    """
    try:
        status, messages = self.connection.search(None, 'ALL')
        if status != 'OK' or not messages or not messages[0]:
            return []

        messages_info = []
        for msg_id in messages[0].split():
            try:
                # Fetch only headers for efficiency
                status, msg_data = self.connection.fetch(
                    msg_id, '(BODY.PEEK[HEADER.FIELDS (MESSAGE-ID SUBJECT DATE FROM)])')
            except Exception as e:
                self.logger.debug(f"Error fetching headers for message {msg_id}: {e}")
                continue

            if status != 'OK' or not msg_data:
                continue
            part = msg_data[0]
            # Only a tuple part carries a payload at index 1; indexing a
            # bare bytes part would yield an int instead of header bytes.
            if not isinstance(part, tuple) or len(part) < 2:
                continue
            messages_info.append({
                'id': msg_id,
                'headers': part[1] or b''
            })
        return messages_info
    except Exception as e:
        self.logger.error(f"Error getting message IDs with headers: {e}")
        return []
class EmailMigrator:
def __init__(self, config):
@ -175,15 +333,12 @@ class EmailMigrator:
self.preserve_flags = config.get('PRESERVE_FLAGS', 'True').lower() == 'true'
self.preserve_dates = config.get('PRESERVE_DATES', 'True').lower() == 'true'
# New: Import folder configuration
self.import_folder_name = config.get('IMPORT_FOLDER_NAME', '').strip()
if not self.import_folder_name:
self.import_folder_name = None
# Track duplicates and existing folders
self.duplicate_stats = {}
self.existing_folders = set()
self.created_folders = set()
if self.import_folder_name:
self.logger.info(f"Import folder configuration: All emails will be imported to subfolders within \"{self.import_folder_name}\"")
else:
self.logger.info("Import folder configuration: All emails will be imported directly to INBOX")
self.logger.info("Migration will preserve original folder structure")
include_str = config.get('INCLUDE_FOLDERS', '')
exclude_str = config.get('EXCLUDE_FOLDERS', '')
@ -215,20 +370,64 @@ class EmailMigrator:
def get_destination_folder(self, source_folder):
    """Return the destination folder for a source folder.

    The source name is returned unchanged, which keeps the original
    folder structure on the destination.

    Args:
        source_folder (str): Original folder name from source

    Returns:
        str: Destination folder name (same as source)
    """
    return source_folder
def generate_message_signature(self, message):
    """Derive a duplicate-detection signature for an email message.

    The MD5 hex digest of the Message-ID header is used when that header
    is present. Otherwise the digest of ``"<subject>|<date>|<from>"`` is
    used. MD5 serves only as a compact fingerprint here, not as a
    security measure.

    Args:
        message: Object exposing ``get(header, default)``.

    Returns:
        str | None: Hex digest, or None when the message carries none of
        the identifying headers or an error occurs. None means "cannot
        decide", so callers should not treat such a message as a duplicate.
    """
    try:
        # str() copes with header objects that are not plain strings.
        message_id = str(message.get('Message-ID', '') or '').strip()
        if message_id:
            return hashlib.md5(message_id.encode()).hexdigest()

        # Fallback: combine subject, date, and from
        fields = [
            str(message.get(name, '') or '').strip()
            for name in ('Subject', 'Date', 'From')
        ]
        if not any(fields):
            # Without this guard every header-less message would share
            # one signature and be skipped as a duplicate of the others.
            return None
        return hashlib.md5('|'.join(fields).encode()).hexdigest()
    except Exception as e:
        self.logger.debug(f"Error generating message signature: {e}")
        return None
def get_existing_message_signatures(self, folder):
    """Collect the signatures of messages already in a destination folder.

    Args:
        folder: Destination folder name.

    Returns:
        set: Signatures of the messages found; empty when the folder does
        not exist, cannot be selected, or an error occurs.
    """
    try:
        dest = self.destination
        if not dest.folder_exists(folder):
            return set()

        selected, _count = dest.select_folder(folder)
        if not selected:
            return set()

        found = set()
        for entry in dest.get_message_ids_with_headers():
            try:
                # The raw header bytes are enough to rebuild a message
                # object for signature generation.
                parsed = email.message_from_bytes(entry['headers'])
                sig = self.generate_message_signature(parsed)
                if sig:
                    found.add(sig)
            except Exception as e:
                self.logger.debug(f"Error processing existing message: {e}")
                continue
        return found
    except Exception as e:
        self.logger.error(f"Error getting existing message signatures for {folder}: {e}")
        return set()
def should_process_folder(self, folder):
if self.include_folders and folder not in self.include_folders:
@ -239,36 +438,101 @@ class EmailMigrator:
def download_emails_from_folder(self, folder):
    """Download the messages of a source folder that the destination lacks.

    Each source message is fetched and its signature compared with the
    signatures already present in the matching destination folder;
    matches are counted as duplicates and left out of the result.

    Args:
        folder: Source folder name.

    Returns:
        list[dict]: One entry per new message with the keys ``'message'``,
        ``'folder'``, ``'original_id'`` and ``'signature'``. Empty when the
        folder cannot be selected, is empty, or holds only duplicates.

    Side effects:
        Records the duplicate count in ``self.duplicate_stats[folder]``
        and prints status lines and a progress bar to stdout.
    """
    self.logger.info(f"Downloading emails from folder: {folder}")
    print_status("PROCESSING", f"Accessing folder: {folder}")

    success, _count = self.source.select_folder(folder)
    if not success:
        self.logger.error(f"Failed to select source folder: {folder}")
        print_status("ERROR", f"Could not access folder: {folder}")
        return []

    message_ids = self.source.get_message_ids()
    msg_count = len(message_ids)
    self.logger.info(f"Found {msg_count} messages in folder: {folder}")

    if msg_count == 0:
        print_status("INFO", f"Folder '{folder}' is empty")
        return []

    # Get existing message signatures from destination to check for duplicates
    destination_folder = self.get_destination_folder(folder)
    print_status("INFO", f"Checking for existing emails in destination '{destination_folder}'")
    existing_signatures = self.get_existing_message_signatures(destination_folder)

    print_status("INFO", f"Downloading {msg_count} messages from '{folder}'")

    emails = []
    duplicates_found = 0

    for i, msg_id in enumerate(message_ids, 1):
        try:
            msg = self.source.fetch_message(msg_id)
            if msg:
                signature = self.generate_message_signature(msg)
                if signature and signature in existing_signatures:
                    duplicates_found += 1
                    subject = str(msg.get('Subject', 'No Subject'))[:50]
                    self.logger.debug(f"Duplicate message found: {subject}")
                else:
                    emails.append({
                        'message': msg,
                        'folder': folder,
                        'original_id': msg_id.decode() if isinstance(msg_id, bytes) else str(msg_id),
                        'signature': signature
                    })

            if i % self.batch_size == 0:
                self.logger.info(f"Downloaded {i}/{msg_count} messages from {folder}")
        except Exception as e:
            self.logger.error(f"Error downloading message {msg_id} from {folder}: {e}")
        finally:
            # Advance the bar on every iteration, including duplicates and
            # failures, so it always reaches 100% and ends its line.
            if msg_count > 1:
                print_progress_bar(i, msg_count, f"from {folder}")

    # Update duplicate statistics
    if duplicates_found > 0:
        self.duplicate_stats[folder] = duplicates_found
        print_status("INFO", f"Skipped {duplicates_found} duplicate emails in '{folder}'")

    self.logger.info(f"Successfully downloaded {len(emails)} new messages from {folder}")
    if emails:
        print_status("SUCCESS", f"Downloaded {len(emails)} new messages from '{folder}'")
    elif duplicates_found > 0:
        print_status("INFO", f"All {duplicates_found} emails in '{folder}' already exist in destination")

    return emails
def upload_emails_to_folder(self, emails, destination_folder):
self.logger.info(f"Uploading {len(emails)} emails to folder: {destination_folder}")
email_count = len(emails)
if email_count == 0:
return 0
self.logger.info(f"Uploading {email_count} emails to folder: {destination_folder}")
# Check if folder exists and create if necessary
folder_exists = self.destination.folder_exists(destination_folder)
if folder_exists:
if destination_folder not in self.existing_folders:
self.existing_folders.add(destination_folder)
print_status("INFO", f"Using existing folder: '{destination_folder}'")
else:
created, was_existing = self.destination.create_folder(destination_folder)
if created:
if was_existing:
self.existing_folders.add(destination_folder)
print_status("INFO", f"Using existing folder: '{destination_folder}'")
else:
self.created_folders.add(destination_folder)
print_status("SUCCESS", f"Created new folder: '{destination_folder}'")
else:
print_status("ERROR", f"Failed to create folder: '{destination_folder}'")
return 0
print_status("PROCESSING", f"Uploading to folder: {destination_folder}")
uploaded = 0
for i, email_data in enumerate(emails, 1):
try:
@ -279,19 +543,27 @@ class EmailMigrator:
if self.preserve_dates and message.get('Date'):
try:
date_obj = email.utils.parsedate_to_datetime(message['Date'])
except:
pass
self.logger.debug(f"Parsed date object: {type(date_obj)} - {date_obj}")
except Exception as e:
self.logger.warning(f"Failed to parse date '{message.get('Date')}': {e}")
date_obj = None
if self.destination.append_message(destination_folder, message, flags, date_obj):
uploaded += 1
# Update progress bar
if email_count > 1:
print_progress_bar(i, email_count, f"to {destination_folder}")
if i % self.batch_size == 0:
self.logger.info(f"Uploaded {i}/{len(emails)} messages to {destination_folder}")
self.logger.info(f"Uploaded {i}/{email_count} messages to {destination_folder}")
except Exception as e:
self.logger.error(f"Error uploading message to {destination_folder}: {e}")
continue
self.logger.info(f"Successfully uploaded {uploaded}/{len(emails)} messages to {destination_folder}")
self.logger.info(f"Successfully uploaded {uploaded}/{email_count} messages to {destination_folder}")
if uploaded > 0:
print_status("SUCCESS", f"Uploaded {uploaded}/{email_count} messages to '{destination_folder}'")
return uploaded
def migrate_folder(self, source_folder):
@ -299,6 +571,7 @@ class EmailMigrator:
if not self.should_process_folder(source_folder):
self.logger.info(f"Skipping folder: {source_folder} (filtered)")
print_status("INFO", f"Skipping folder '{source_folder}' (filtered)")
return stats
try:
@ -318,28 +591,44 @@ class EmailMigrator:
def run_migration(self):
self.logger.info("Starting email migration...")
total_stats = {'folders_processed': 0, 'total_downloaded': 0, 'total_uploaded': 0, 'errors': 0}
print_status("INFO", "Initializing migration process...")
total_stats = {
'folders_processed': 0,
'total_downloaded': 0,
'total_uploaded': 0,
'errors': 0,
'total_duplicates': 0,
'folders_created': 0,
'folders_existed': 0
}
try:
print_status("CONNECTING", "Connecting to source server...")
if not self.source.connect():
self.logger.error("Failed to connect to source server")
print_status("ERROR", "Failed to connect to source server")
return total_stats
print_status("CONNECTING", "Connecting to destination server...")
if not self.destination.connect():
self.logger.error("Failed to connect to destination server")
print_status("ERROR", "Failed to connect to destination server")
return total_stats
folders = self.source.get_folders()
self.logger.info(f"Found {len(folders)} folders to process")
folder_count = len(folders)
self.logger.info(f"Found {folder_count} folders to process")
print_status("INFO", f"Found {folder_count} folders to analyze")
# Create the main import folder if specified
if self.import_folder_name:
self.logger.info(f"Creating main import folder: {self.import_folder_name}")
self.destination.create_folder(self.import_folder_name)
print(f"\n{Colors.CYAN}{'' * 50}")
print(f"{Colors.BOLD}{Colors.WHITE}MIGRATION PROGRESS{Colors.RESET}")
print(f"{Colors.CYAN}{'' * 50}{Colors.RESET}")
for folder in folders:
for i, folder in enumerate(folders, 1):
try:
self.logger.info(f"Processing folder: {folder}")
print(f"\n{Colors.MAGENTA}[{i}/{folder_count}]{Colors.RESET} Processing folder: {Colors.BOLD}{folder}{Colors.RESET}")
stats = self.migrate_folder(folder)
total_stats['folders_processed'] += 1
@ -348,33 +637,51 @@ class EmailMigrator:
destination_folder = self.get_destination_folder(folder)
self.logger.info(f"Folder '{folder}' -> '{destination_folder}' completed: {stats['downloaded']} downloaded, {stats['uploaded']} uploaded")
# Show folder completion status
if stats['downloaded'] > 0:
print_status("SUCCESS", f"Folder '{folder}' completed: {stats['uploaded']}/{stats['downloaded']} emails migrated")
else:
print_status("INFO", f"Folder '{folder}' was empty - skipped")
except Exception as e:
self.logger.error(f"Error processing folder {folder}: {e}")
print_status("ERROR", f"Failed to process folder '{folder}': {e}")
total_stats['errors'] += 1
# Calculate final statistics
total_stats['total_duplicates'] = sum(self.duplicate_stats.values())
total_stats['folders_created'] = len(self.created_folders)
total_stats['folders_existed'] = len(self.existing_folders)
finally:
print_status("INFO", "Closing connections...")
self.source.disconnect()
self.destination.disconnect()
return total_stats
def main():
print("Email Migration Script")
print("=" * 50)
print_banner()
try:
print_status("INFO", "Loading configuration from .env file...")
config = load_env_file()
print_status("SUCCESS", "Configuration loaded successfully")
except Exception as e:
print(f"Error loading .env file: {e}")
print_status("ERROR", f"Failed to load .env file: {e}")
exit(1)
# Basic required variables
print_status("INFO", "Validating configuration...")
required_vars = ['SOURCE_IMAP_SERVER', 'SOURCE_PASSWORD',
'DEST_IMAP_SERVER', 'DEST_PASSWORD']
missing_vars = [var for var in required_vars if not config.get(var)]
if missing_vars:
print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
print("Please check your .env file.")
print_status("ERROR", f"Missing required variables: {', '.join(missing_vars)}")
print_status("ERROR", "Please check your .env file")
exit(1)
# Validate authentication methods
@ -384,32 +691,71 @@ def main():
dest_username = config.get('DEST_USERNAME', '').strip()
if not source_email and not source_username:
print("Error: Either SOURCE_EMAIL or SOURCE_USERNAME must be provided for source account")
print_status("ERROR", "Either SOURCE_EMAIL or SOURCE_USERNAME must be provided for source account")
exit(1)
if not dest_email and not dest_username:
print("Error: Either DEST_EMAIL or DEST_USERNAME must be provided for destination account")
print_status("ERROR", "Either DEST_EMAIL or DEST_USERNAME must be provided for destination account")
exit(1)
print_status("SUCCESS", "Configuration validation completed")
print_status("INFO", "Initializing migration engine...")
migrator = EmailMigrator(config)
try:
stats = migrator.run_migration()
print("\nMigration completed!")
print(f"Folders processed: {stats['folders_processed']}")
print(f"Total emails downloaded: {stats['total_downloaded']}")
print(f"Total emails uploaded: {stats['total_uploaded']}")
print(f"Errors encountered: {stats['errors']}")
print_status("SUCCESS", "Migration completed successfully!")
# Display summary
summary_stats = {
"Folders processed": f"{Colors.CYAN}{stats['folders_processed']}{Colors.RESET}",
"Emails downloaded": f"{Colors.GREEN}{stats['total_downloaded']}{Colors.RESET}",
"Emails uploaded": f"{Colors.GREEN}{stats['total_uploaded']}{Colors.RESET}",
"Duplicates skipped": f"{Colors.YELLOW}{stats['total_duplicates']}{Colors.RESET}",
"Folders created": f"{Colors.CYAN}{stats['folders_created']}{Colors.RESET}",
"Folders existed": f"{Colors.BLUE}{stats['folders_existed']}{Colors.RESET}",
"Errors encountered": f"{Colors.RED if stats['errors'] > 0 else Colors.GREEN}{stats['errors']}{Colors.RESET}"
}
print_summary_section("MIGRATION SUMMARY", summary_stats)
# Show duplicate details if any
if stats['total_duplicates'] > 0:
duplicate_separator = f"{Colors.YELLOW}{'' * 60}{Colors.RESET}"
print(f"\n{duplicate_separator}")
print(f"{Colors.BOLD}{Colors.WHITE}{'DUPLICATE EMAILS SKIPPED':^60}{Colors.RESET}")
print(f"{duplicate_separator}")
for folder, count in migrator.duplicate_stats.items():
print(f"{Colors.YELLOW}{folder}:{Colors.RESET} {count}")
print(f"{duplicate_separator}")
print(f"\n{Colors.YELLOW}Note: {stats['total_duplicates']} duplicate emails were not imported{Colors.RESET}")
print(f"{Colors.YELLOW}because they already exist in the destination folders.{Colors.RESET}")
if stats['errors'] > 0:
print("\nCheck the log file 'email_migration.log' for error details.")
print_status("WARNING", "Check 'email_migration.log' for error details")
# Important notice section
notice_separator = f"{Colors.YELLOW}{'' * 60}{Colors.RESET}"
print(f"\n{notice_separator}")
print(f"{Colors.BOLD}{Colors.WHITE}{'IMPORTANT NOTE':^60}{Colors.RESET}")
print(f"{notice_separator}")
print(f"{Colors.YELLOW}Some webmail clients may not show new folders immediately.{Colors.RESET}")
print(f"{Colors.YELLOW}You may need to:{Colors.RESET}")
print(f"{Colors.YELLOW} {Colors.CYAN}1.{Colors.RESET} {Colors.YELLOW}Refresh your webmail interface{Colors.RESET}")
print(f"{Colors.YELLOW} {Colors.CYAN}2.{Colors.RESET} {Colors.YELLOW}Check folder settings/preferences{Colors.RESET}")
print(f"{Colors.YELLOW} {Colors.CYAN}3.{Colors.RESET} {Colors.YELLOW}Manually subscribe to new folders{Colors.RESET}")
print(f"{notice_separator}")
except KeyboardInterrupt:
print("\nMigration interrupted by user.")
print_status("WARNING", "Migration interrupted by user")
sys.exit(0)
except Exception as e:
print(f"Migration failed: {e}")
exit(1)
print_status("ERROR", f"Migration failed: {e}")
sys.exit(1)
if __name__ == "__main__":
main()