Skip to content

Instantly share code, notes, and snippets.

@peterbmarks
Last active July 21, 2025 00:25
Show Gist options
  • Select an option

  • Save peterbmarks/b3beeeeb22ae0d7282cd0dc895e00d1f to your computer and use it in GitHub Desktop.

Select an option

Save peterbmarks/b3beeeeb22ae0d7282cd0dc895e00d1f to your computer and use it in GitHub Desktop.
Migrate a Google Blogger Takeout feed.atom export to WordPress, including posts, images and comments.
#!/usr/bin/env python3
"""
Google Takeout Atom Feed to WordPress Migrator.

Reads a Google Takeout feed.atom file and posts its content, together with
comments and images, to WordPress via the REST API.

You need to edit your WordPress user to create an Application Password and
put it in PASSWORD below in order to use the REST API.

If download_images is True, images are pulled from the original blog and
added to WordPress.
Set limit=None for an unlimited import count.

Anthropic Claude did most of this work, I just fixed a few things.
Peter Marks
"""
# --- Settings read by main(); edit these before running the script ---
WORDPRESS_URL = "https://www.XXXXXXX" # Replace with your WordPress site URL
USERNAME = "XXXXXXX" # Replace with your WordPress username
PASSWORD = "XXXX XXXX XXXX XXXX XXXX XXXX" # Replace with WordPress application password
ATOM_FILE = "feed.atom" # Path to your Google Takeout feed.atom file
# Passed to migrate_feed() as start_post_number: the index of the first parsed
# entry to migrate (earlier entries are skipped).
START_POST_NUMBER = 0
# Seconds migrate_feed() sleeps after each entry.
PAUSE_BETWEEN_SECONDS = 5 # throttle upload to prevent being blocked
import xml.etree.ElementTree as ET
import requests
import json
import base64
from datetime import datetime
import html
import re
import os
import mimetypes
from urllib.parse import urljoin, urlparse
from pathlib import Path
import time
def main():
    """Preview the feed named by ATOM_FILE, migrate it, then remove temp images.

    The connection settings come from the module-level constants. The preview
    prints the first entry and first comment so a bad export is visible before
    anything is uploaded. The temporary image directory is cleaned up even if
    the migration raises.
    """
    migrator = TakeoutToWordPress(
        WORDPRESS_URL,
        USERNAME,
        PASSWORD,
        download_images=True,       # download images and re-upload them
        images_dir="temp_images",   # scratch directory for downloaded images
        migrate_comments=True,      # recreate comments on the new posts
    )
    try:
        # --- Preview -------------------------------------------------------
        print("Parsing feed...")
        entries, comments = migrator.parse_atom_feed(ATOM_FILE)
        print(f"Found {len(entries)} entries and {len(comments)} comments")

        if entries:
            first_entry = entries[0]
            print("\nFirst entry preview:")
            print(f"Title: {first_entry['title']}")
            print(f"Published: {first_entry['published']}")
            print(f"Categories: {first_entry['categories']}")
            print(f"Content preview: {first_entry['content'][:200]}...")
            if migrator.download_images:
                found_images = migrator._find_images_in_content(first_entry['content'])
                print(f"Images found in first entry: {len(found_images)}")
                if found_images:
                    print(f"First image URL: {found_images[0]}")

        if comments:
            first_comment = comments[0]
            print(f"\nFirst comment preview:")
            print(f"Author: {first_comment['author_name']}")
            print(f"Content: {first_comment['content'][:100]}...")
            print(f"Published: {first_comment['published']}")
            # Show how many comments belong to each post.
            grouped = migrator._group_comments_by_post(comments, entries)
            print(f"\nComments grouped by post:")
            for post_id, post_comments in grouped.items():
                print(f" Post {post_id}: {len(post_comments)} comments")

        # --- Migration -----------------------------------------------------
        print("\nStarting migration...")
        migrator.migrate_feed(
            ATOM_FILE,
            post_status='publish',
            start_post_number=START_POST_NUMBER,
        )  # add limit=N to cap the number of posts
    finally:
        # Always remove the temporary images directory.
        migrator.cleanup_images_directory()
        print("Cleanup completed")
class TakeoutToWordPress:
    """Migrate a Google Takeout Atom feed to WordPress through the REST API.

    The feed is parsed into posts and comments; each post is created on the
    WordPress site, its images are optionally downloaded and re-uploaded to
    the media library, and its comments are recreated as top-level comments.
    """

    # Timeout (seconds) applied to every WordPress REST API request so that a
    # stalled connection cannot hang the migration indefinitely.
    REQUEST_TIMEOUT = 60

    # Placeholder address used when a feed comment carries no author e-mail.
    # example.com is a reserved documentation domain.
    DEFAULT_COMMENT_EMAIL = "anonymous@example.com"

    def __init__(self, wordpress_url, username, password, download_images=True,
                 images_dir="downloaded_images", migrate_comments=True):
        """
        Initialize the migrator
        Args:
            wordpress_url (str): Your WordPress site URL (e.g., 'https://yoursite.com')
            username (str): WordPress username
            password (str): WordPress application password
            download_images (bool): Whether to download and upload images
            images_dir (str): Directory to temporarily store downloaded images
            migrate_comments (bool): Whether to migrate comments
        """
        self.wordpress_url = wordpress_url.rstrip('/')
        self.api_url = f"{self.wordpress_url}/wp-json/wp/v2/"
        self.auth_header = self._create_auth_header(username, password)
        self.download_images = download_images
        self.migrate_comments = migrate_comments
        self.images_dir = Path(images_dir)
        # Create images directory (and any missing parents) if needed
        if self.download_images:
            self.images_dir.mkdir(parents=True, exist_ok=True)
        # Uploaded media keyed by source image URL, to avoid duplicate uploads
        self.uploaded_images = {}
        # Feed entry ID -> WordPress post ID
        self.post_id_mapping = {}
        # Lower-cased category name -> WordPress category ID
        self._category_cache = {}
        # Session for image downloads; a browser-like User-Agent is sent
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def _create_auth_header(self, username, password):
        """Create Basic Auth header for WordPress API"""
        credentials = f"{username}:{password}"
        encoded_credentials = base64.b64encode(credentials.encode()).decode()
        return {"Authorization": f"Basic {encoded_credentials}"}

    def _json_headers(self):
        """Return the auth header plus a JSON content type for API writes."""
        return {**self.auth_header, 'Content-Type': 'application/json'}

    # ------------------------------------------------------------------
    # Feed parsing
    # ------------------------------------------------------------------
    def parse_atom_feed(self, atom_file_path):
        """
        Parse Google Takeout Atom feed file
        Args:
            atom_file_path (str): Path to the feed.atom file
        Returns:
            tuple: (entries, comments) - Lists of parsed entries and comments.
            Both lists are empty if the file is missing or is not valid XML.
        """
        try:
            root = ET.parse(atom_file_path).getroot()
        except ET.ParseError as e:
            print(f"Error parsing XML: {e}")
            return [], []
        except FileNotFoundError:
            print(f"File not found: {atom_file_path}")
            return [], []

        namespaces = {
            'atom': 'http://www.w3.org/2005/Atom',
            'thr': 'http://purl.org/syndication/thread/1.0',
            'app': 'http://www.w3.org/2007/app',
            'blogger': 'http://schemas.google.com/blogger/2018'
        }
        entries = []
        comments = []
        for entry in root.findall('.//atom:entry', namespaces):
            # An entry with a blogger:parent child is treated as a comment
            if entry.find('blogger:parent', namespaces) is not None:
                comment_data = self._parse_comment(entry, namespaces)
                if comment_data:
                    comments.append(comment_data)
            else:
                entry_data = self._parse_entry(entry, namespaces)
                if entry_data:
                    entries.append(entry_data)
        return entries, comments

    @staticmethod
    def _element_text(parent, path, namespaces, default=None):
        """Return the text of the first element matching *path*.

        *default* is returned only when no element matches; an element that
        exists but is empty yields None.
        """
        node = parent.find(path, namespaces)
        return default if node is None else node.text

    @staticmethod
    def _content_text(entry, namespaces):
        """Return the entry's atom:content text ('' when absent or empty)."""
        content_elem = entry.find('atom:content', namespaces)
        if content_elem is None:
            return ""
        content = content_elem.text or ""
        if content_elem.get('type') == 'html':
            # NOTE(review): the XML parser has already decoded one level of
            # escaping; this decodes a second level - confirm the export
            # really is double-escaped.
            content = html.unescape(content)
        return content

    def _parse_entry(self, entry, namespaces):
        """Parse a post entry into a dict, or return None if parsing fails."""
        try:
            categories = [
                term
                for term in (c.get('term') for c in entry.findall('atom:category', namespaces))
                if term
            ]
            # Link to the original post
            link_elem = entry.find('atom:link[@rel="alternate"]', namespaces)
            original_url = link_elem.get('href') if link_elem is not None else None
            return {
                'title': self._element_text(entry, 'atom:title', namespaces, "Untitled"),
                'content': self._content_text(entry, namespaces),
                'published': self._element_text(entry, 'atom:published', namespaces),
                'updated': self._element_text(entry, 'atom:updated', namespaces),
                'author': self._element_text(entry, 'atom:author/atom:name', namespaces),
                'categories': categories,
                'entry_id': self._element_text(entry, 'atom:id', namespaces),
                'original_url': original_url
            }
        except Exception as e:
            # Best effort: one malformed entry must not abort the whole parse
            print(f"Error parsing entry: {e}")
            return None

    def _parse_comment(self, comment_entry, namespaces):
        """Parse a comment entry into a dict, or return None if parsing fails.

        'parent_ref' holds the blogger:parent element itself; its text is the
        ID that _group_comments_by_post matches against post entry IDs.
        Missing author name, e-mail and URI fall back to "Anonymous",
        DEFAULT_COMMENT_EMAIL and "" respectively.
        """
        try:
            return {
                'parent_ref': comment_entry.find('blogger:parent', namespaces),
                'title': self._element_text(comment_entry, 'atom:title', namespaces, ""),
                'content': self._content_text(comment_entry, namespaces),
                'published': self._element_text(comment_entry, 'atom:published', namespaces),
                'author_name': self._element_text(
                    comment_entry, 'atom:author/atom:name', namespaces) or "Anonymous",
                'author_email': self._element_text(
                    comment_entry, 'atom:author/atom:email', namespaces) or self.DEFAULT_COMMENT_EMAIL,
                # An empty <uri/> has text None; always send a string
                'author_url': self._element_text(
                    comment_entry, 'atom:author/atom:uri', namespaces) or "",
                'comment_id': self._element_text(comment_entry, 'atom:id', namespaces)
            }
        except Exception as e:
            print(f"Error parsing comment: {e}")
            return None

    # ------------------------------------------------------------------
    # Comments
    # ------------------------------------------------------------------
    def _create_wordpress_comment(self, comment_data, post_id, parent_comment_id=None):
        """
        Create a WordPress comment
        Args:
            comment_data (dict): Comment data from Atom feed
            post_id (int): WordPress post ID
            parent_comment_id (int): Parent comment ID for replies
        Returns:
            dict: WordPress comment creation response, or None on failure
        """
        wp_comment = {
            'post': post_id,
            'content': self._clean_content(comment_data['content']),
            'author_name': comment_data['author_name'],
            'author_email': comment_data['author_email'],
            'author_url': comment_data['author_url'],
            'status': 'approved'  # You might want to set this to 'hold' for moderation
        }
        # Add parent comment if this is a reply
        if parent_comment_id:
            wp_comment['parent'] = parent_comment_id
        # Add date if available
        if comment_data['published']:
            formatted_date = self._convert_date_format(comment_data['published'])
            if formatted_date:
                wp_comment['date'] = formatted_date
        try:
            response = requests.post(
                f"{self.api_url}comments",
                headers=self._json_headers(),
                json=wp_comment,
                timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 201:
                return response.json()
            print(f"Error creating comment: {response.status_code}")
            print(f"Response: {response.text}")
            return None
        except requests.exceptions.RequestException as e:
            print(f"Request error creating comment: {e}")
            return None

    def _group_comments_by_post(self, comments, entries):
        """Group comments by their parent post.

        Returns a dict mapping a post's entry ID to the list of comments whose
        parent reference names it. Comments whose parent is not among
        *entries* are dropped.
        """
        # Set lookup instead of rescanning all entries for every comment
        known_entry_ids = {entry['entry_id'] for entry in entries}
        comment_groups = {}
        for comment in comments:
            parent_ref = comment['parent_ref']
            if parent_ref is None:
                continue
            parent_id = parent_ref.text
            if parent_id in known_entry_ids:
                comment_groups.setdefault(parent_id, []).append(comment)
        return comment_groups

    def _process_comments_for_post(self, post_id, comments, original_entry_id):
        """Create all comments for one post, oldest first.

        Returns a dict with 'success' and 'failed' counts. All comments are
        created as top-level comments; reply threading is not reconstructed.
        *original_entry_id* is accepted for interface compatibility and is
        not used.
        """
        results = {'success': 0, 'failed': 0}
        if not comments:
            return results
        # Sort a copy so the caller's list is not reordered
        ordered = sorted(comments, key=lambda c: c['published'] or '')
        print(f"Processing {len(ordered)} comments for post {post_id}")
        for comment in ordered:
            if self._create_wordpress_comment(comment, post_id):
                results['success'] += 1
                print(f" ✓ Created comment by {comment['author_name']}")
            else:
                results['failed'] += 1
                print(f" ✗ Failed to create comment by {comment['author_name']}")
        return results

    # ------------------------------------------------------------------
    # Content helpers
    # ------------------------------------------------------------------
    def _convert_date_format(self, date_string):
        """Convert an ISO 8601 date string to 'YYYY-MM-DDTHH:MM:SS'.

        The UTC offset, if any, is dropped without converting the time.
        Returns None for empty or unparseable input.
        """
        if not date_string:
            return None
        try:
            # fromisoformat does not accept a trailing 'Z' on older Pythons
            dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
            return dt.strftime('%Y-%m-%dT%H:%M:%S')
        except ValueError:
            print(f"Could not parse date: {date_string}")
            return None

    def _clean_content(self, content):
        """Strip script/style blocks and normalize whitespace in HTML content.

        Note that whitespace is collapsed everywhere, including inside any
        preformatted blocks.
        """
        if not content:
            return ""
        # HTML tag names are case-insensitive, so match <SCRIPT> as well
        strip_flags = re.DOTALL | re.IGNORECASE
        content = re.sub(r'<script.*?</script>', '', content, flags=strip_flags)
        content = re.sub(r'<style.*?</style>', '', content, flags=strip_flags)
        content = content.replace('&nbsp;', ' ')
        content = re.sub(r'\s+', ' ', content)  # Normalize whitespace
        return content.strip()

    def _find_images_in_content(self, content):
        """Return the distinct image URLs in *content*, in document order.

        URLs from <img src=...> tags come first, followed by any other URL
        ending in a known image extension. Order is preserved so the choice
        of featured image is reproducible.
        """
        if not content:
            return []
        img_pattern = r'<img[^>]+src=["\']([^"\']+)["\'][^>]*>'
        images = re.findall(img_pattern, content, re.IGNORECASE)
        direct_image_pattern = r'https?://[^\s<>"\']+\.(?:jpg|jpeg|png|gif|webp|svg)(?:\?[^\s<>"\']*)?'
        direct_images = re.findall(direct_image_pattern, content, re.IGNORECASE)
        # dict.fromkeys de-duplicates while keeping first-seen order
        return list(dict.fromkeys(images + direct_images))

    def _get_first_image_for_featured(self, content):
        """Return the first image URL in *content*, or None if there is none."""
        if not content:
            return None
        img_match = re.search(r'<img[^>]+src=["\']([^"\']+)["\'][^>]*>', content, re.IGNORECASE)
        if img_match:
            return img_match.group(1)
        direct_match = re.search(r'https?://[^\s<>"\']+\.(?:jpg|jpeg|png|gif|webp)(?:\?[^\s<>"\']*)?', content, re.IGNORECASE)
        if direct_match:
            return direct_match.group(0)
        return None

    # ------------------------------------------------------------------
    # Images
    # ------------------------------------------------------------------
    def _download_image(self, image_url):
        """Download *image_url* into images_dir.

        Returns the local Path, or None if the download or write fails.
        """
        try:
            print(f"Downloading image: {image_url}")
            image_url = image_url.strip()
            # Derive a filename from the URL path; invent one if it has none
            filename = os.path.basename(urlparse(image_url).path)
            if not filename or '.' not in filename:
                filename = f"image_{int(time.time())}.jpg"
            response = self.session.get(image_url, timeout=30)
            response.raise_for_status()
            local_path = self.images_dir / filename
            with open(local_path, 'wb') as f:
                f.write(response.content)
            print(f"Downloaded: {filename}")
            return local_path
        except Exception as e:
            print(f"Error downloading image {image_url}: {e}")
            return None

    def _upload_image_to_wordpress(self, image_path, post_id=None, cache_key=None):
        """Upload an image file to the WordPress media library.

        Args:
            image_path (Path): Local file to upload
            post_id (int): Post to attach the media to, if any
            cache_key (str): Key under which the result is cached in
                uploaded_images. Defaults to the local path; callers should
                pass the source URL, because different URLs can share a
                filename and would otherwise collide.
        Returns:
            dict: WordPress media response, or None on failure
        """
        try:
            if not image_path.exists():
                print(f"Image file not found: {image_path}")
                return None
            image_key = cache_key if cache_key is not None else str(image_path)
            if image_key in self.uploaded_images:
                print(f"Image already uploaded: {image_path.name}")
                return self.uploaded_images[image_key]
            mime_type, _ = mimetypes.guess_type(str(image_path))
            if not mime_type:
                mime_type = 'image/jpeg'
            data = {'post': post_id} if post_id else {}
            with open(image_path, 'rb') as f:
                response = requests.post(
                    f"{self.api_url}media",
                    headers=self.auth_header.copy(),
                    files={'file': (image_path.name, f, mime_type)},
                    data=data,
                    timeout=self.REQUEST_TIMEOUT
                )
            if response.status_code == 201:
                media_data = response.json()
                print(f"Uploaded image: {media_data['source_url']}")
                self.uploaded_images[image_key] = media_data
                return media_data
            print(f"Error uploading image: {response.status_code}")
            print(f"Response: {response.text}")
            return None
        except Exception as e:
            print(f"Error uploading image {image_path}: {e}")
            return None

    def _process_images_in_content(self, content, post_id=None):
        """Re-host every image in *content* and rewrite its URL.

        Returns (updated_content, featured_image_id). The featured image is
        the first image that was successfully uploaded, or None. Images that
        fail to download or upload keep their original URL.
        """
        if not self.download_images or not content:
            return content, None
        images = self._find_images_in_content(content)
        if not images:
            return content, None
        print(f"Found {len(images)} images to process")
        featured_image_id = None
        updated_content = content
        for image_url in images:
            # Reuse an earlier upload of the same URL without re-downloading
            media_data = self.uploaded_images.get(image_url)
            if media_data is None:
                local_image = self._download_image(image_url)
                if not local_image:
                    continue
                media_data = self._upload_image_to_wordpress(
                    local_image, post_id, cache_key=image_url
                )
                # The local copy is only a staging file; removal is best effort
                try:
                    local_image.unlink()
                except OSError:
                    pass
            if not media_data:
                continue
            updated_content = updated_content.replace(image_url, media_data['source_url'])
            if featured_image_id is None:
                featured_image_id = media_data['id']
        return updated_content, featured_image_id

    # ------------------------------------------------------------------
    # Posts and categories
    # ------------------------------------------------------------------
    def create_wordpress_post(self, entry_data, status='draft'):
        """
        Create a WordPress post from entry data
        Args:
            entry_data (dict): Parsed entry data
            status (str): Post status ('publish', 'draft', 'private')
        Returns:
            dict: WordPress API response for the post, or None if the post
            could not be created. A failure while re-hosting images does not
            discard an already-created post.
        """
        cleaned_content = self._clean_content(entry_data['content'])
        post_data = {
            'title': entry_data['title'],
            'content': cleaned_content,
            'status': status,
            'format': 'standard'
        }
        if entry_data['published']:
            formatted_date = self._convert_date_format(entry_data['published'])
            if formatted_date:
                post_data['date'] = formatted_date
        if entry_data['categories']:
            category_ids = []
            for category_name in entry_data['categories']:
                category_id = self._get_or_create_category(category_name)
                if category_id:
                    category_ids.append(category_id)
            if category_ids:
                post_data['categories'] = category_ids
        try:
            print(f"create post: {self.api_url}posts")
            response = requests.post(
                f"{self.api_url}posts",
                headers=self._json_headers(),
                json=post_data,
                timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code != 201:
                print(f"Error creating post: {response.status_code}")
                print(f"Response: {response.text}")
                return None
            post_response = response.json()
        except requests.exceptions.RequestException as e:
            print(f"Request error: {e}")
            return None

        post_id = post_response['id']
        if self.download_images and cleaned_content:
            print(f"Processing images for post: {entry_data['title']}")
            post_response = self._apply_processed_images(
                post_id, cleaned_content, post_response
            )
        # Store post ID mapping for comments
        self.post_id_mapping[entry_data['entry_id']] = post_id
        return post_response

    def _apply_processed_images(self, post_id, cleaned_content, post_response):
        """Re-host the post's images and update the post to use them.

        Returns the updated post response, or *post_response* unchanged when
        nothing needed updating or the update failed.
        """
        updated_content, featured_image_id = self._process_images_in_content(
            cleaned_content, post_id
        )
        update_data = {}
        if updated_content != cleaned_content:
            update_data['content'] = updated_content
        if featured_image_id:
            update_data['featured_media'] = featured_image_id
        if not update_data:
            return post_response
        try:
            update_response = requests.post(
                f"{self.api_url}posts/{post_id}",
                headers=self._json_headers(),
                json=update_data,
                timeout=self.REQUEST_TIMEOUT
            )
            if update_response.status_code == 200:
                print(f"Updated post with processed images")
                return update_response.json()
            print(f"Warning: Could not update post with images: {update_response.status_code}")
        except requests.exceptions.RequestException as e:
            # The post itself exists; keep it so its comments still migrate
            print(f"Warning: Could not update post with images: {e}")
        return post_response

    @staticmethod
    def _existing_term_id(response):
        """Return the term ID from a 'term_exists' error response, else None."""
        if response.status_code != 400:
            return None
        try:
            payload = response.json()
        except ValueError:
            return None
        if not isinstance(payload, dict) or payload.get('code') != 'term_exists':
            return None
        data = payload.get('data')
        return data.get('term_id') if isinstance(data, dict) else None

    def _get_or_create_category(self, category_name):
        """Return the WordPress ID of *category_name*, creating it if needed.

        Results are cached per lower-cased name so each category is looked up
        once per run. Returns None if the category can be neither found nor
        created.
        """
        cache_key = category_name.lower()
        if cache_key in self._category_cache:
            return self._category_cache[cache_key]
        print(f"get or create category '{category_name}'...")
        category_id = None
        try:
            search_response = requests.get(
                f"{self.api_url}categories",
                params={'search': category_name},
                timeout=self.REQUEST_TIMEOUT
            )
            print(f"status_code = {search_response.status_code}")
            if search_response.status_code == 200:
                for cat in search_response.json():
                    # Returned names may be HTML-escaped (e.g. '&amp;')
                    if html.unescape(cat['name']).lower() == cache_key:
                        print("found category")
                        category_id = cat['id']
                        break
            if category_id is None:
                print("creating category")
                # Never print the auth header: it contains the credentials
                create_response = requests.post(
                    f"{self.api_url}categories",
                    headers=self._json_headers(),
                    json={'name': category_name},
                    timeout=self.REQUEST_TIMEOUT
                )
                if create_response.status_code == 201:
                    category_id = create_response.json()['id']
                else:
                    # The search can miss a category that already exists
                    category_id = self._existing_term_id(create_response)
                    if category_id is None:
                        print(f"failed to create category {category_name}")
        except requests.exceptions.RequestException as e:
            print(f"Error with category '{category_name}': {e}")
            return None
        if category_id is not None:
            self._category_cache[cache_key] = category_id
        return category_id

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def migrate_feed(self, atom_file_path, post_status='draft', start_post_number=0, limit=None):
        """
        Migrate entire feed to WordPress
        Args:
            atom_file_path (str): Path to feed.atom file
            post_status (str): Status for created posts ('publish', 'draft', 'private')
            start_post_number (int): Index of the first parsed entry to
                migrate; earlier entries are skipped
            limit (int): Maximum number of posts to migrate (None for all)
        Returns:
            dict: Migration results with 'success', 'failed' and 'total' post
            counts and a nested 'comments' dict holding the same three counts
        """
        entries, comments = self.parse_atom_feed(atom_file_path)
        if not entries:
            return {'success': 0, 'failed': 0, 'total': 0, 'comments': {'success': 0, 'failed': 0, 'total': 0}}
        entries = entries[start_post_number:]
        # Compare against None so that limit=0 means "no posts", not "all"
        if limit is not None:
            entries = entries[:limit]
        results = {
            'success': 0,
            'failed': 0,
            'total': len(entries),
            'comments': {'success': 0, 'failed': 0, 'total': 0}
        }
        print(f"Starting migration of {len(entries)} entries...")
        if self.download_images:
            print(f"Image processing enabled - images will be downloaded to: {self.images_dir}")
        if self.migrate_comments:
            print(f"Comment migration enabled - found {len(comments)} comments")
        comment_groups = self._group_comments_by_post(comments, entries) if self.migrate_comments else {}
        for i, entry in enumerate(entries):
            print(f"\nProcessing entry {i+1}/{len(entries)}: {entry['title']}")
            result = self.create_wordpress_post(entry, post_status)
            if result:
                results['success'] += 1
                post_id = result['id']
                print(f"✓ Created post: {result.get('link', 'N/A')}")
                if self.download_images and result.get('featured_media'):
                    print(f" → Featured image set: ID {result['featured_media']}")
                # Comments can only be attached once the post exists
                if self.migrate_comments and entry['entry_id'] in comment_groups:
                    comment_results = self._process_comments_for_post(
                        post_id, comment_groups[entry['entry_id']], entry['entry_id']
                    )
                    results['comments']['success'] += comment_results['success']
                    results['comments']['failed'] += comment_results['failed']
                    results['comments']['total'] += comment_results['success'] + comment_results['failed']
            else:
                results['failed'] += 1
                print(f"✗ Failed to create post: {entry['title']}")
            # Small delay to be respectful to the server
            time.sleep(PAUSE_BETWEEN_SECONDS)
        print(f"\nMigration complete!")
        print(f"Posts - Successful: {results['success']}, Failed: {results['failed']}, Total: {results['total']}")
        if self.migrate_comments:
            print(f"Comments - Successful: {results['comments']['success']}, Failed: {results['comments']['failed']}, Total: {results['comments']['total']}")
        if self.download_images:
            print(f"Images processed: {len(self.uploaded_images)}")
        return results

    def cleanup_images_directory(self):
        """Delete every entry directly inside images_dir, then the directory.

        Best effort: entries that cannot be removed are skipped, in which
        case the directory itself is left in place.
        """
        if not self.images_dir.exists():
            return
        for file in self.images_dir.glob('*'):
            try:
                file.unlink()
            except OSError:
                pass
        try:
            self.images_dir.rmdir()
        except OSError:
            pass
# Run the migration only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment