Migrate a Google Blogger Takeout feed.atom to WordPress, including posts, images, and comments.
#!/usr/bin/env python3
"""
Google Takeout Atom Feed to WordPress Migrator.

Reads a Google Takeout feed.atom file and posts its content, with comments
and images, to WordPress via the REST API.

You need to edit your WordPress user to create an Application Password and
use it below in order to use the REST API.

If download_images is True, images are pulled from the original blog and
uploaded to WordPress.

Set limit=None for an unlimited import count.

Anthropic Claude did most of this work, I just fixed a few things.
Peter Marks
"""
import xml.etree.ElementTree as ET
import requests
import json
import base64
from datetime import datetime
import html
import re
import os
import mimetypes
from urllib.parse import urljoin, urlparse
from pathlib import Path
import time

WORDPRESS_URL = "https://www.XXXXXXX"       # Replace with your WordPress site URL
USERNAME = "XXXXXXX"                        # Replace with your WordPress username
PASSWORD = "XXXX XXXX XXXX XXXX XXXX XXXX"  # Replace with your WordPress application password
ATOM_FILE = "feed.atom"                     # Path to your Google Takeout feed.atom file
START_POST_NUMBER = 0                       # Skip entries before this index when resuming
PAUSE_BETWEEN_SECONDS = 5                   # Throttle uploads to avoid being blocked

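
# A minimal, optional check (not called by the migrator) for confirming the
# Application Password works before a long run; /wp-json/wp/v2/users/me is the
# standard WordPress REST endpoint for the authenticated user.
def verify_wordpress_credentials():
    """Return True if WORDPRESS_URL accepts USERNAME/PASSWORD via Basic auth."""
    credentials = base64.b64encode(f"{USERNAME}:{PASSWORD}".encode()).decode()
    response = requests.get(
        f"{WORDPRESS_URL.rstrip('/')}/wp-json/wp/v2/users/me",
        headers={"Authorization": f"Basic {credentials}"},
    )
    print(f"Credential check: HTTP {response.status_code}")
    return response.status_code == 200
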

def main():
    """Parse the feed, preview the first entry and comment, then migrate."""
    # Initialize migrator with image processing and comment migration enabled
    migrator = TakeoutToWordPress(
        WORDPRESS_URL,
        USERNAME,
        PASSWORD,
        download_images=True,      # Enable image downloading
        images_dir="temp_images",  # Directory for temporary image storage
        migrate_comments=True      # Enable comment migration
    )
    try:
        # Test parsing (optional preview before the real migration)
        print("Parsing feed...")
        entries, comments = migrator.parse_atom_feed(ATOM_FILE)
        print(f"Found {len(entries)} entries and {len(comments)} comments")

        if entries:
            print("\nFirst entry preview:")
            print(f"Title: {entries[0]['title']}")
            print(f"Published: {entries[0]['published']}")
            print(f"Categories: {entries[0]['categories']}")
            print(f"Content preview: {entries[0]['content'][:200]}...")

            # Check for images in the first entry
            if migrator.download_images:
                images = migrator._find_images_in_content(entries[0]['content'])
                print(f"Images found in first entry: {len(images)}")
                if images:
                    print(f"First image URL: {images[0]}")

        # Show a comment preview
        if comments:
            print("\nFirst comment preview:")
            print(f"Author: {comments[0]['author_name']}")
            print(f"Content: {comments[0]['content'][:100]}...")
            print(f"Published: {comments[0]['published']}")

            # Group comments by post
            comment_groups = migrator._group_comments_by_post(comments, entries)
            print("\nComments grouped by post:")
            for entry_id, post_comments in comment_groups.items():
                print(f"  Post {entry_id}: {len(post_comments)} comments")

        # Migrate posts
        print("\nStarting migration...")
        migrator.migrate_feed(ATOM_FILE, post_status='publish',
                              start_post_number=START_POST_NUMBER)  # , limit=1000
    finally:
        # Clean up temporary images directory
        migrator.cleanup_images_directory()
        print("Cleanup completed")


class TakeoutToWordPress:
    def __init__(self, wordpress_url, username, password, download_images=True,
                 images_dir="downloaded_images", migrate_comments=True):
        """
        Initialize the migrator

        Args:
            wordpress_url (str): Your WordPress site URL (e.g. 'https://yoursite.com')
            username (str): WordPress username
            password (str): WordPress application password
            download_images (bool): Whether to download and upload images
            images_dir (str): Directory to temporarily store downloaded images
            migrate_comments (bool): Whether to migrate comments
        """
        self.wordpress_url = wordpress_url.rstrip('/')
        self.api_url = f"{self.wordpress_url}/wp-json/wp/v2/"
        self.auth_header = self._create_auth_header(username, password)
        self.download_images = download_images
        self.migrate_comments = migrate_comments
        self.images_dir = Path(images_dir)

        # Create the images directory if it doesn't exist
        if self.download_images:
            self.images_dir.mkdir(exist_ok=True)

        # Track uploaded images to avoid duplicates
        self.uploaded_images = {}
        # Track post ID mappings for comments
        self.post_id_mapping = {}

        # Session for image downloads
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def _create_auth_header(self, username, password):
        """Create a Basic Auth header for the WordPress API"""
        credentials = f"{username}:{password}"
        encoded_credentials = base64.b64encode(credentials.encode()).decode()
        return {"Authorization": f"Basic {encoded_credentials}"}

    def parse_atom_feed(self, atom_file_path):
        """
        Parse a Google Takeout Atom feed file

        Args:
            atom_file_path (str): Path to the feed.atom file

        Returns:
            tuple: (entries, comments) - lists of parsed entries and comments
        """
        try:
            tree = ET.parse(atom_file_path)
            root = tree.getroot()

            # Namespaces commonly used in Blogger Atom feeds
            namespaces = {
                'atom': 'http://www.w3.org/2005/Atom',
                'thr': 'http://purl.org/syndication/thread/1.0',
                'app': 'http://www.w3.org/2007/app',
                'blogger': 'http://schemas.google.com/blogger/2018'
            }

            entries = []
            comments = []
            for entry in root.findall('.//atom:entry', namespaces):
                # Comments carry a blogger:parent element pointing at their post
                parent = entry.find('blogger:parent', namespaces)
                if parent is not None:
                    # This is a comment
                    comment_data = self._parse_comment(entry, namespaces)
                    if comment_data:
                        comments.append(comment_data)
                else:
                    # This is a regular post
                    entry_data = self._parse_entry(entry, namespaces)
                    if entry_data:
                        entries.append(entry_data)
            return entries, comments
        except ET.ParseError as e:
            print(f"Error parsing XML: {e}")
            return [], []
        except FileNotFoundError:
            print(f"File not found: {atom_file_path}")
            return [], []

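    # Illustrative entry shapes (element names are the ones the code above
    # looks for; real Takeout feeds carry more elements and attributes):
    #
    #   <entry>                                   <!-- a post -->
    #     <id>tag:blogger.com,1999:...post-123</id>
    #     <title>My post</title>
    #     <content type="html">&lt;p&gt;...&lt;/p&gt;</content>
    #   </entry>
    #
    #   <entry>                                   <!-- a comment -->
    #     <id>tag:blogger.com,1999:...post-456</id>
    #     <blogger:parent>tag:blogger.com,1999:...post-123</blogger:parent>
    #     <content type="html">Nice post!</content>
    #   </entry>
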
    def _parse_entry(self, entry, namespaces):
        """Parse an individual post entry from the Atom feed"""
        try:
            # Extract basic information
            title_elem = entry.find('atom:title', namespaces)
            title = title_elem.text if title_elem is not None else "Untitled"

            # Get content
            content_elem = entry.find('atom:content', namespaces)
            content = ""
            if content_elem is not None:
                content = content_elem.text or ""
                # Unescape HTML content
                if content_elem.get('type') == 'html':
                    content = html.unescape(content)

            # Get published date
            published_elem = entry.find('atom:published', namespaces)
            published = published_elem.text if published_elem is not None else None

            # Get updated date
            updated_elem = entry.find('atom:updated', namespaces)
            updated = updated_elem.text if updated_elem is not None else None

            # Get author
            author_elem = entry.find('atom:author/atom:name', namespaces)
            author = author_elem.text if author_elem is not None else None

            # Get categories/tags
            categories = []
            for category in entry.findall('atom:category', namespaces):
                term = category.get('term')
                if term:
                    categories.append(term)

            # Get ID
            id_elem = entry.find('atom:id', namespaces)
            entry_id = id_elem.text if id_elem is not None else None

            # Get link to the original post (useful for comment processing)
            link_elem = entry.find('atom:link[@rel="alternate"]', namespaces)
            original_url = link_elem.get('href') if link_elem is not None else None

            return {
                'title': title,
                'content': content,
                'published': published,
                'updated': updated,
                'author': author,
                'categories': categories,
                'entry_id': entry_id,
                'original_url': original_url
            }
        except Exception as e:
            print(f"Error parsing entry: {e}")
            return None

    def _create_wordpress_comment(self, comment_data, post_id, parent_comment_id=None):
        """
        Create a WordPress comment

        Args:
            comment_data (dict): Comment data from the Atom feed
            post_id (int): WordPress post ID
            parent_comment_id (int): Parent comment ID for replies

        Returns:
            dict: WordPress comment creation response, or None on failure
        """
        # Prepare comment data
        wp_comment = {
            'post': post_id,
            'content': self._clean_content(comment_data['content']),
            'author_name': comment_data['author_name'],
            'author_email': comment_data['author_email'],
            'author_url': comment_data['author_url'],
            'status': 'approved'  # Set this to 'hold' if you want moderation
        }

        # Add parent comment if this is a reply
        if parent_comment_id:
            wp_comment['parent'] = parent_comment_id

        # Add date if available
        if comment_data['published']:
            formatted_date = self._convert_date_format(comment_data['published'])
            if formatted_date:
                wp_comment['date'] = formatted_date

        try:
            response = requests.post(
                f"{self.api_url}comments",
                headers={
                    **self.auth_header,
                    'Content-Type': 'application/json'
                },
                # verify=False,
                json=wp_comment
            )
            if response.status_code == 201:
                return response.json()
            else:
                print(f"Error creating comment: {response.status_code}")
                print(f"Response: {response.text}")
                return None
        except requests.exceptions.RequestException as e:
            print(f"Request error creating comment: {e}")
            return None

    def _group_comments_by_post(self, comments, entries):
        """Group comments by their parent post"""
        comment_groups = {}
        for comment in comments:
            parent_ref = comment['parent_ref']
            if parent_ref is not None:
                parent_id = parent_ref.text
                # Only keep comments whose parent is an entry we parsed
                for entry in entries:
                    if entry['entry_id'] == parent_id:
                        if parent_id not in comment_groups:
                            comment_groups[parent_id] = []
                        comment_groups[parent_id].append(comment)
                        break
        return comment_groups

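    # The returned mapping is keyed by the parent entry's Atom id, e.g.
    # (illustrative):
    #   {'tag:blogger.com,1999:...post-123': [comment_dict, comment_dict]}
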
    def _process_comments_for_post(self, post_id, comments, original_entry_id):
        """Create all comments for a specific post"""
        if not comments:
            return {'success': 0, 'failed': 0}

        results = {'success': 0, 'failed': 0}
        comment_id_mapping = {}  # Map original comment IDs to WordPress comment IDs

        # Sort comments by date to preserve their order on the post
        comments.sort(key=lambda x: x['published'] or '')
        print(f"Processing {len(comments)} comments for post {post_id}")

        for comment in comments:
            # For now, all comments are created as top-level; proper reply
            # threading would need the thr:in-reply-to references as well
            parent_comment_id = None
            wp_comment = self._create_wordpress_comment(
                comment, post_id, parent_comment_id
            )
            if wp_comment:
                results['success'] += 1
                comment_id_mapping[comment['comment_id']] = wp_comment['id']
                print(f"  ✓ Created comment by {comment['author_name']}")
            else:
                results['failed'] += 1
                print(f"  ✗ Failed to create comment by {comment['author_name']}")
        return results

    def _parse_comment(self, comment_entry, namespaces):
        """Parse a comment entry from the Atom feed"""
        try:
            # Get the reference to the parent post
            parent_ref = comment_entry.find('blogger:parent', namespaces)
            # Alternative via the threading extension:
            # reply_to = comment_entry.find('thr:in-reply-to', namespaces)
            # parent_ref = reply_to.get('ref') if reply_to is not None else None

            # Extract basic information
            title_elem = comment_entry.find('atom:title', namespaces)
            title = title_elem.text if title_elem is not None else ""

            # Get content
            content_elem = comment_entry.find('atom:content', namespaces)
            content = ""
            if content_elem is not None:
                content = content_elem.text or ""
                if content_elem.get('type') == 'html':
                    content = html.unescape(content)

            # Get published date
            published_elem = comment_entry.find('atom:published', namespaces)
            published = published_elem.text if published_elem is not None else None

            # Get author name
            author_elem = comment_entry.find('atom:author/atom:name', namespaces)
            author_name = "Anonymous"
            if author_elem is not None and author_elem.text is not None:
                author_name = author_elem.text

            # Get author email if available
            author_email_elem = comment_entry.find('atom:author/atom:email', namespaces)
            author_email = "[email protected]"
            if author_email_elem is not None and author_email_elem.text is not None:
                author_email = author_email_elem.text

            # Get author URI if available
            author_uri_elem = comment_entry.find('atom:author/atom:uri', namespaces)
            author_uri = author_uri_elem.text if author_uri_elem is not None else ""

            # Get comment ID
            id_elem = comment_entry.find('atom:id', namespaces)
            comment_id = id_elem.text if id_elem is not None else None

            return {
                'parent_ref': parent_ref,
                'title': title,
                'content': content,
                'published': published,
                'author_name': author_name,
                'author_email': author_email,
                'author_url': author_uri,
                'comment_id': comment_id
            }
        except Exception as e:
            print(f"Error parsing comment: {e}")
            return None

    def _convert_date_format(self, date_string):
        """Convert an Atom date string to WordPress format"""
        if not date_string:
            return None
        try:
            # Parse ISO 8601 date format
            dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
            return dt.strftime('%Y-%m-%dT%H:%M:%S')
        except ValueError:
            print(f"Could not parse date: {date_string}")
            return None

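    # For example:
    #   _convert_date_format('2025-07-21T00:25:00.001-07:00')
    #   -> '2025-07-21T00:25:00'
    # The offset is dropped, so WordPress interprets the value in the site's
    # local timezone.
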
    def _clean_content(self, content):
        """Clean and prepare content for WordPress"""
        if not content:
            return ""
        # Remove script and style tags along with their contents
        content = re.sub(r'<script.*?</script>', '', content, flags=re.DOTALL | re.IGNORECASE)
        content = re.sub(r'<style.*?</style>', '', content, flags=re.DOTALL | re.IGNORECASE)
        # Fix common HTML issues
        content = content.replace('&nbsp;', ' ')
        # Normalize whitespace (note: this also collapses newlines)
        content = re.sub(r'\s+', ' ', content)
        return content.strip()

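    # For example:
    #   _clean_content('<p>Hi&nbsp;there</p>\n<script>alert(1)</script>')
    #   -> '<p>Hi there</p>'
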
    def _find_images_in_content(self, content):
        """Find all image URLs in content"""
        if not content:
            return []
        # Find img tags
        img_pattern = r'<img[^>]+src=["\']([^"\']+)["\'][^>]*>'
        images = re.findall(img_pattern, content, re.IGNORECASE)

        # Also look for direct image links
        direct_image_pattern = r'https?://[^\s<>"\']+\.(?:jpg|jpeg|png|gif|webp|svg)(?:\?[^\s<>"\']*)?'
        direct_images = re.findall(direct_image_pattern, content, re.IGNORECASE)

        # Combine and deduplicate, preserving document order so the first
        # image found can be used as the featured image
        all_images = list(dict.fromkeys(images + direct_images))
        return all_images

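    # For example:
    #   _find_images_in_content('<img src="https://a.example/pic.png">')
    #   -> ['https://a.example/pic.png']
    # (the direct-link pattern matches the same URL, but deduplication keeps
    # a single copy)
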
    def _download_image(self, image_url):
        """Download an image from a URL into the local images directory"""
        try:
            print(f"Downloading image: {image_url}")
            # Clean the URL
            image_url = image_url.strip()

            # Get filename from URL
            parsed_url = urlparse(image_url)
            filename = os.path.basename(parsed_url.path)

            # If no usable filename, generate one
            if not filename or '.' not in filename:
                filename = f"image_{int(time.time())}.jpg"

            # Download image
            response = self.session.get(image_url, timeout=30)
            response.raise_for_status()

            # Save to local file
            local_path = self.images_dir / filename
            with open(local_path, 'wb') as f:
                f.write(response.content)

            print(f"Downloaded: {filename}")
            return local_path
        except Exception as e:
            print(f"Error downloading image {image_url}: {e}")
            return None

    def _upload_image_to_wordpress(self, image_path, post_id=None):
        """Upload an image to the WordPress media library"""
        try:
            if not image_path.exists():
                print(f"Image file not found: {image_path}")
                return None

            # Check if already uploaded
            image_key = str(image_path)
            if image_key in self.uploaded_images:
                print(f"Image already uploaded: {image_path.name}")
                return self.uploaded_images[image_key]

            # Determine MIME type
            mime_type, _ = mimetypes.guess_type(str(image_path))
            if not mime_type:
                mime_type = 'image/jpeg'

            # Upload as multipart form data; the file must stay open for the
            # duration of the request
            with open(image_path, 'rb') as f:
                files = {
                    'file': (image_path.name, f, mime_type)
                }
                # Do not set Content-Type here; requests generates the
                # multipart boundary itself
                headers = self.auth_header.copy()

                # Attach to a post if an ID was provided
                data = {}
                if post_id:
                    data['post'] = post_id

                response = requests.post(
                    f"{self.api_url}media",
                    headers=headers,
                    # verify=False,
                    files=files,
                    data=data
                )

            if response.status_code == 201:
                media_data = response.json()
                print(f"Uploaded image: {media_data['source_url']}")
                # Cache the result
                self.uploaded_images[image_key] = media_data
                return media_data
            else:
                print(f"Error uploading image: {response.status_code}")
                print(f"Response: {response.text}")
                return None
        except Exception as e:
            print(f"Error uploading image {image_path}: {e}")
            return None

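    # The media object used above is the standard WP REST attachment response;
    # the two fields this script relies on look like (illustrative values):
    #   {"id": 321, "source_url": "https://yoursite.com/wp-content/uploads/...", ...}
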
    def _process_images_in_content(self, content, post_id=None):
        """Download and replace images in content"""
        if not self.download_images or not content:
            return content, None

        # Find all images
        images = self._find_images_in_content(content)
        if not images:
            return content, None

        print(f"Found {len(images)} images to process")
        featured_image_id = None
        updated_content = content

        for i, image_url in enumerate(images):
            # Download image
            local_image = self._download_image(image_url)
            if local_image:
                # Upload to WordPress
                media_data = self._upload_image_to_wordpress(local_image, post_id)
                if media_data:
                    # Replace the original URL in the content
                    new_url = media_data['source_url']
                    updated_content = updated_content.replace(image_url, new_url)

                    # Use the first image as the featured image
                    if i == 0:
                        featured_image_id = media_data['id']

                # Clean up local file
                try:
                    local_image.unlink()
                except OSError:
                    pass
        return updated_content, featured_image_id

    def _get_first_image_for_featured(self, content):
        """Extract the first image URL for use as a featured image
        (not currently called; _process_images_in_content selects the
        featured image during upload)"""
        if not content:
            return None
        # Find the first img tag
        img_match = re.search(r'<img[^>]+src=["\']([^"\']+)["\'][^>]*>', content, re.IGNORECASE)
        if img_match:
            return img_match.group(1)
        # Fall back to a direct image link
        direct_match = re.search(r'https?://[^\s<>"\']+\.(?:jpg|jpeg|png|gif|webp)(?:\?[^\s<>"\']*)?', content, re.IGNORECASE)
        if direct_match:
            return direct_match.group(0)
        return None

    def create_wordpress_post(self, entry_data, status='draft'):
        """
        Create a WordPress post from entry data

        Args:
            entry_data (dict): Parsed entry data
            status (str): Post status ('publish', 'draft', 'private')

        Returns:
            dict: WordPress API response, or None on failure
        """
        # Clean content first
        cleaned_content = self._clean_content(entry_data['content'])

        # Prepare post data
        post_data = {
            'title': entry_data['title'],
            'content': cleaned_content,
            'status': status,
            'format': 'standard'
        }

        # Add date if available
        if entry_data['published']:
            formatted_date = self._convert_date_format(entry_data['published'])
            if formatted_date:
                post_data['date'] = formatted_date

        # Add categories if available
        if entry_data['categories']:
            # First, get or create categories
            category_ids = []
            for category_name in entry_data['categories']:
                category_id = self._get_or_create_category(category_name)
                if category_id:
                    category_ids.append(category_id)
            if category_ids:
                post_data['categories'] = category_ids

        # Make API request to create the post
        try:
            print(f"create post: {self.api_url}posts")
            # print(f"post_data={post_data}")
            # print(f"headers={self.auth_header}")
            response = requests.post(
                f"{self.api_url}posts",
                headers={
                    **self.auth_header,
                    'Content-Type': 'application/json'
                },
                # verify=False,
                json=post_data
            )
            if response.status_code == 201:
                post_response = response.json()
                post_id = post_response['id']

                # Process images if enabled
                if self.download_images and cleaned_content:
                    print(f"Processing images for post: {entry_data['title']}")
                    updated_content, featured_image_id = self._process_images_in_content(
                        cleaned_content, post_id
                    )

                    # Collect changes: rewritten content and featured image
                    update_data = {}
                    if updated_content != cleaned_content:
                        update_data['content'] = updated_content
                    if featured_image_id:
                        update_data['featured_media'] = featured_image_id

                    # Update the post if we have changes
                    if update_data:
                        update_response = requests.post(
                            f"{self.api_url}posts/{post_id}",
                            headers={
                                **self.auth_header,
                                'Content-Type': 'application/json'
                            },
                            # verify=False,
                            json=update_data
                        )
                        if update_response.status_code == 200:
                            post_response = update_response.json()
                            print("Updated post with processed images")
                        else:
                            print(f"Warning: Could not update post with images: {update_response.status_code}")

                # Store post ID mapping for comments
                self.post_id_mapping[entry_data['entry_id']] = post_id
                return post_response
            else:
                print(f"Error creating post: {response.status_code}")
                print(f"Response: {response.text}")
                return None
        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}")
            return None

    def _get_or_create_category(self, category_name):
        """Get an existing category ID or create a new category"""
        print(f"get or create category '{category_name}'...")
        try:
            # Search for an existing category, e.g.
            # https://www.homebrewradio.us/blog/wp-json/wp/v2/categories
            print(f"{self.api_url}categories")
            search_response = requests.get(
                f"{self.api_url}categories",
                # headers=self.auth_header,
                # verify=False,
                params={'search': category_name}
            )
            print(f"status_code = {search_response.status_code}")
            if search_response.status_code == 200:
                categories = search_response.json()
                print(f"retrieved categories: {categories}")
                for cat in categories:
                    print(f"  {cat['name']}")
                    if cat['name'].lower() == category_name.lower():
                        print("found category")
                        return cat['id']

            # Create a new category if not found
            print("creating category")
            create_response = requests.post(
                f"{self.api_url}categories",
                headers={
                    **self.auth_header,
                    'Content-Type': 'application/json'
                },
                # verify=False,
                json={'name': category_name}
            )
            if create_response.status_code == 201:
                return create_response.json()['id']
            else:
                print(f"Failed to create category '{category_name}'")
        except requests.exceptions.RequestException as e:
            print(f"Error with category '{category_name}': {e}")
        return None

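    # Note: the search above returns only one page of results (10 by default),
    # so on sites with many categories an existing term can be missed and the
    # create call will then fail with WordPress's "term_exists" error. Passing
    # params={'search': category_name, 'per_page': 100} is a simple mitigation.
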
    def migrate_feed(self, atom_file_path, post_status='draft', start_post_number=0, limit=None):
        """
        Migrate an entire feed to WordPress

        Args:
            atom_file_path (str): Path to the feed.atom file
            post_status (str): Status for created posts ('publish', 'draft', 'private')
            start_post_number (int): Index of the first entry to migrate (for resuming)
            limit (int): Maximum number of posts to migrate (None for all)

        Returns:
            dict: Migration results
        """
        # Parse both entries and comments
        entries, comments = self.parse_atom_feed(atom_file_path)
        if not entries:
            return {'success': 0, 'failed': 0, 'total': 0,
                    'comments': {'success': 0, 'failed': 0, 'total': 0}}

        entries = entries[start_post_number:]
        if limit:
            entries = entries[:limit]

        results = {
            'success': 0,
            'failed': 0,
            'total': len(entries),
            'comments': {'success': 0, 'failed': 0, 'total': 0}
        }

        print(f"Starting migration of {len(entries)} entries...")
        if self.download_images:
            print(f"Image processing enabled - images will be downloaded to: {self.images_dir}")
        if self.migrate_comments:
            print(f"Comment migration enabled - found {len(comments)} comments")

        # Group comments by their parent posts
        comment_groups = self._group_comments_by_post(comments, entries) if self.migrate_comments else {}

        # Migrate posts first, then their comments
        for i, entry in enumerate(entries):
            print(f"\nProcessing entry {i+1}/{len(entries)}: {entry['title']}")
            result = self.create_wordpress_post(entry, post_status)
            if result:
                results['success'] += 1
                post_id = result['id']
                print(f"✓ Created post: {result.get('link', 'N/A')}")

                # Show image processing results
                if self.download_images and result.get('featured_media'):
                    print(f"  → Featured image set: ID {result['featured_media']}")

                # Process comments for this post
                if self.migrate_comments and entry['entry_id'] in comment_groups:
                    post_comments = comment_groups[entry['entry_id']]
                    comment_results = self._process_comments_for_post(
                        post_id, post_comments, entry['entry_id']
                    )
                    results['comments']['success'] += comment_results['success']
                    results['comments']['failed'] += comment_results['failed']
                    results['comments']['total'] += comment_results['success'] + comment_results['failed']
            else:
                results['failed'] += 1
                print(f"✗ Failed to create post: {entry['title']}")

            # Pause between posts to be respectful to the server
            time.sleep(PAUSE_BETWEEN_SECONDS)

        print("\nMigration complete!")
        print(f"Posts - Successful: {results['success']}, Failed: {results['failed']}, Total: {results['total']}")
        if self.migrate_comments:
            print(f"Comments - Successful: {results['comments']['success']}, "
                  f"Failed: {results['comments']['failed']}, Total: {results['comments']['total']}")
        if self.download_images:
            print(f"Images processed: {len(self.uploaded_images)}")
        return results

    def cleanup_images_directory(self):
        """Remove downloaded image files and the images directory itself"""
        if self.images_dir.exists():
            for file in self.images_dir.glob('*'):
                try:
                    file.unlink()
                except OSError:
                    pass
            try:
                self.images_dir.rmdir()
            except OSError:
                pass

| if __name__ == "__main__": | |
| main() |