Skip to content

Instantly share code, notes, and snippets.

@adamziel
Last active June 4, 2025 12:31
Show Gist options
  • Select an option

  • Save adamziel/c859da85ba8a8aec4f45fe3491113b52 to your computer and use it in GitHub Desktop.

Select an option

Save adamziel/c859da85ba8a8aec4f45fe3491113b52 to your computer and use it in GitHub Desktop.
Data Liberation Boilerplate
<?php
/**
* Data Liberation Boilerplate
* 1. Clone the https://github.com/WordPress/php-toolkit/ repo
* 2. Switch to branch push-qvpwqtzqzyrw (this PR https://github.com/WordPress/php-toolkit/pull/127)
* 3. Put this file in the repo root
* 4. Adjust the require paths below
* 5. Run the script
*
* It will import content from the accessibility unit test data WXR file at
* https://raw.githubusercontent.com/wpaccessibility/a11y-theme-unit-test/refs/heads/master/a11y-theme-unit-test-data.xml
*/
use WordPress\Blueprints\DataReference\DataReference;
use WordPress\Blueprints\DataReference\DataReferenceResolver;
use WordPress\Blueprints\DataReference\ExecutionContextPath;
use WordPress\Blueprints\DataReference\File;
use WordPress\Blueprints\Progress\Tracker;
use WordPress\HttpClient\Client;
use WordPress\DataLiberation\EntityReader\WXREntityReader;
use WordPress\DataLiberation\Importer\ImportSession;
use WordPress\DataLiberation\Importer\RetryFrontloadingIterator;
use WordPress\DataLiberation\Importer\StreamImporter;
use WordPress\Filesystem\LocalFilesystem;
// Run in context of a working WordPress site:
require_once getenv('DOCROOT') . '/wp-load.php';
/**
* You can easily start a fresh site with the Blueprints CLI tool:
*
* echo '{"version": 2}' > blueprint.json
* php components/Blueprints/bin/blueprint.php exec \
* ./blueprint.json \
* --site-path=./new-wp-site \
* --site-url=http://127.0.0.1:2457 \
* --db-engine=sqlite \
* --mode=create-new-site \
* --truncate-new-site-directory
*/
// Include the php-toolkit autoloader
require_once __DIR__ . '/vendor/autoload.php';
// Progress reporting – you can skip this section on a first read {{{
/**
 * Sink for user-facing import status.
 *
 * Concrete reporters choose the presentation (live terminal bar, JSON lines,
 * ...); the importer only ever talks to these four methods.
 */
interface ProgressReporter {
    /**
     * Announce an intermediate progress update.
     *
     * @param float  $progress Completion percentage, expected in the 0-100 range.
     * @param string $caption  Short description of the current activity.
     */
    public function reportProgress(float $progress, string $caption): void;

    /**
     * Announce that the whole job has finished successfully.
     *
     * @param string $message Text shown to the user on success.
     */
    public function reportCompletion(string $message): void;

    /**
     * Announce a failure.
     *
     * @param string          $message   Human-readable error description.
     * @param \Throwable|null $exception Optional exception carrying extra details.
     */
    public function reportError(string $message, ?\Throwable $exception = null): void;

    /**
     * Release any resources (streams, files) held by the reporter.
     */
    public function close(): void;
}
/**
 * Shows progress as a single, continuously redrawn bar on an interactive terminal.
 *
 * When stdout is not a TTY it falls back to printing one line per update.
 * Once close() has been called every method becomes a no-op, so late reports
 * (or a second close()) cannot crash the script on a closed stream.
 */
class TerminalProgressReporter implements ProgressReporter {
    /** @var resource|null Dedicated php://stdout handle; null after close(). */
    private $stdout;
    /** @var float|int Last reported progress, used to skip identical redraws. */
    private $lastProgress = -1;
    /** @var string Last reported caption, used to skip identical redraws. */
    private $lastCaption = '';
    /** @var int Number of characters between the bar's brackets. */
    private $progressBarWidth = 50;

    /**
     * @throws \RuntimeException When php://stdout cannot be opened.
     */
    public function __construct() {
        $handle = fopen('php://stdout', 'w');
        if ($handle === false) {
            throw new \RuntimeException('Could not open php://stdout for progress output');
        }
        $this->stdout = $handle;
    }

    public function reportProgress(float $progress, string $caption): void {
        // Don't repeat identical progress.
        if ($this->lastProgress === $progress && $this->lastCaption === $caption) {
            return;
        }
        $this->lastProgress = $progress;
        $this->lastCaption = $caption;

        $percentage = min(100, max(0, $progress));
        $status = sprintf(
            '[%s] %3.1f%% - %s',
            $this->renderBar($percentage),
            $percentage,
            $caption
        );

        if ($this->isTty()) {
            // Return to column 0 and clear the line, then redraw in place.
            $this->write("\r\033[K" . $status);
        } else {
            // Non-TTY (pipe/file): a carriage return would only be noise, so
            // emit one plain line per update.
            $this->write($status . "\n");
        }
    }

    public function reportError(string $message, ?\Throwable $exception = null): void {
        $this->clearCurrentLine();
        $errorMsg = "\033[1;31mError:\033[0m " . $message;
        if ($exception !== null) {
            $errorMsg .= " (" . $exception->getMessage() . ")";
        }
        $this->write($errorMsg . "\n");
    }

    public function reportCompletion(string $message): void {
        $this->clearCurrentLine();
        $this->write("\033[1;32m" . $message . "\033[0m\n");
    }

    public function close(): void {
        if ($this->stdout !== null) {
            fclose($this->stdout);
            // Forget the handle so repeated close() calls and later reports are no-ops.
            $this->stdout = null;
        }
    }

    /**
     * Build the bar body: "=" for the filled part, ">" marking the leading
     * edge while the bar is incomplete, and spaces for the remainder.
     */
    private function renderBar(float $percentage): string {
        $filled = (int) round($this->progressBarWidth * ($percentage / 100));
        $empty = $this->progressBarWidth - $filled;
        if ($empty <= 0) {
            return str_repeat('=', $filled);
        }
        return str_repeat('=', $filled) . '>' . str_repeat(' ', $empty - 1);
    }

    /** Write and flush, silently skipping once the stream has been closed. */
    private function write(string $text): void {
        if ($this->stdout === null) {
            return;
        }
        fwrite($this->stdout, $text);
        fflush($this->stdout);
    }

    private function clearCurrentLine(): void {
        if ($this->isTty()) {
            $this->write("\r\033[K");
        }
    }

    private function isTty(): bool {
        return $this->stdout !== null && stream_isatty($this->stdout);
    }
}
/**
 * Emits one JSON object per line for machine consumers.
 *
 * Output goes to the path in the OUTPUT_FILE environment variable, or to
 * php://stdout when that variable is unset or empty. Message shapes:
 *   {"type":"progress","progress":<number>,"caption":<string>}
 *   {"type":"error","message":<string>[,"details":{...}]}
 *   {"type":"completion","message":<string>}
 * Once close() has been called every method becomes a no-op.
 */
class JsonProgressReporter implements ProgressReporter {
    /** @var resource|null Output stream; null after close(). */
    private $outputFile;

    /**
     * @throws \RuntimeException When the output stream cannot be opened.
     */
    public function __construct() {
        $outputPath = getenv('OUTPUT_FILE') ?: 'php://stdout';
        $handle = fopen($outputPath, 'w');
        if ($handle === false) {
            throw new \RuntimeException('Could not open progress output file: ' . $outputPath);
        }
        $this->outputFile = $handle;
    }

    public function reportProgress(float $progress, string $caption): void {
        $this->writeJsonMessage([
            'type' => 'progress',
            'progress' => round($progress, 2),
            'caption' => $caption
        ]);
    }

    public function reportError(string $message, ?\Throwable $exception = null): void {
        $errorData = [
            'type' => 'error',
            'message' => $message
        ];
        if ($exception !== null) {
            $errorData['details'] = [
                'exception' => get_class($exception),
                'message' => $exception->getMessage(),
                'file' => $exception->getFile(),
                'line' => $exception->getLine(),
                'trace' => $exception->getTraceAsString()
            ];
        }
        $this->writeJsonMessage($errorData);
    }

    public function reportCompletion(string $message): void {
        $this->writeJsonMessage([
            'type' => 'completion',
            'message' => $message
        ]);
    }

    public function close(): void {
        if ($this->outputFile !== null) {
            fclose($this->outputFile);
            // Forget the handle so repeated close() calls and later reports are no-ops.
            $this->outputFile = null;
        }
    }

    private function writeJsonMessage(array $data): void {
        if ($this->outputFile === null) {
            return;
        }
        // Captions and exception messages can contain invalid UTF-8 (e.g. taken
        // from imported content) and progress can be NaN/INF. Substitute bad
        // bytes / unencodable values instead of letting json_encode() return
        // false, which would otherwise be written as an empty line.
        $json = json_encode($data, JSON_INVALID_UTF8_SUBSTITUTE | JSON_PARTIAL_OUTPUT_ON_ERROR);
        if ($json === false) {
            return;
        }
        fwrite($this->outputFile, $json . "\n");
        fflush($this->outputFile);
    }
}
/**
 * Pick the reporter that suits the current environment.
 *
 * JSON lines are used when OUTPUT_FILE is set, when stdout is not an
 * interactive terminal (pipes, CI logs), or when the STDOUT constant does not
 * exist (PHP only defines it under the CLI SAPI). Otherwise a live terminal
 * progress bar is shown.
 */
function createProgressReporter(): ProgressReporter {
    $interactive = defined('STDOUT') && stream_isatty(STDOUT);
    if (getenv('OUTPUT_FILE') || !$interactive) {
        return new JsonProgressReporter();
    }
    return new TerminalProgressReporter();
}
// }}}
// Single reporter shared by the whole script; the functions below reach it via `global`.
$reporter = createProgressReporter();
/**
 * Handle a fatal failure in your environment. In CLI we can
 * just throw an exception.
 *
 * The message is sent through the global $reporter first, so the user (or
 * the JSON consumer) sees it even if the exception is never caught.
 *
 * @param string $message Human-readable description of the failure.
 * @throws \InvalidArgumentException Always.
 */
function bail_out( $message ) {
    global $reporter;
    $reporter->reportError($message);
    throw new \InvalidArgumentException( $message );
}
/**
 * Find a URL worth opening once the import has finished.
 *
 * Looks at the first ten published pages ordered by ascending ID and returns
 * the permalink of the first one whose content is non-empty.
 *
 * @param string $fallback_url Returned when no such page exists.
 * @return mixed The get_permalink() result for the chosen page, or $fallback_url.
 */
function data_liberation_find_landing_url( $fallback_url ) {
    $pages = get_posts(
        array(
            'numberposts' => 10,
            'orderby'     => 'ID',
            'order'       => 'ASC',
            'post_type'   => 'page',
            'post_status' => 'publish',
        )
    );
    foreach ( $pages as $page ) {
        if ( ! empty( $page->post_content ) ) {
            return get_permalink( $page );
        }
    }
    return $fallback_url;
}

/**
 * Import content from $options['source'] into the current WordPress site.
 *
 * Resolves the source to a file, streams it through a WXREntityReader into a
 * StreamImporter, and drives the importer in time-boxed batches (see
 * data_liberation_unit_of_work()) until it reaches STAGE_FINISHED. Progress,
 * errors and the final "see your content at" URL go to the global $reporter,
 * which is always closed before this function returns or throws.
 *
 * Recognised options:
 *  - source                  (required) URL, local path, ... resolved via DataReference.
 *  - execution_context_root  Local directory handed to the resolver as its execution context.
 *  - additional_site_urls    Optional list of URLs rewritten to this site's URL.
 *  - media_url               Optional value for StreamImporter's 'source_media_root_urls'.
 *
 * @param array $options Import options, see above.
 * @throws \Throwable Any failure; it is reported and then rethrown unchanged.
 */
function run_content_import( $options ) {
    global $reporter;

    $mainTracker = new Tracker();
    // Forward every tracker progress event to the reporter.
    $mainTracker->events->addListener(
        'WordPress\Blueprints\Progress\ProgressEvent',
        static function ( $event ) use ( $reporter ) {
            $reporter->reportProgress( $event->getProgress(), $event->getCaption() );
        }
    );

    try {
        if ( ! isset( $options['source'] ) ) {
            throw new \InvalidArgumentException( 'The "source" option is required.' );
        }

        // Guarded so calling this function twice in one process does not
        // trigger a "constant already defined" warning.
        if ( ! defined( 'NEW_SITE_CONTENT_ROOT' ) ) {
            define( 'NEW_SITE_CONTENT_ROOT', get_site_url() );
        }
        $reporter->reportProgress( 0, 'Target site URL: ' . NEW_SITE_CONTENT_ROOT );

        // Split the overall progress into weighted stages.
        $mainTracker->split([
            'setup' => ['ratio' => 10, 'caption' => 'Setting up import'],
            'indexing' => ['ratio' => 20, 'caption' => 'Indexing entities'],
            'assets' => ['ratio' => 30, 'caption' => 'Processing assets'],
            'importing' => ['ratio' => 40, 'caption' => 'Importing content']
        ]);

        $setupTracker = $mainTracker['setup'];
        $setupTracker->set( 10, 'Resolving content source' );
        $httpClient = new Client();
        $content_source = DataReference::create( $options['source'], [
            ExecutionContextPath::class,
        ] );
        $execution_context = LocalFilesystem::create( $options['execution_context_root'] );
        $resolver = new DataReferenceResolver( $httpClient );
        $resolver->setExecutionContext( $execution_context );
        $resolved_source = $resolver->resolve_uncached( $content_source );

        $setupTracker->set( 30, 'Configuring import mode' );
        // Both stay null in this boilerplate; they are handed to StreamImporter
        // below as 'source_from_filesystem' and 'source_site_url'.
        $chrooted_fs = null;
        $source_site_url = null;
        if ( ! ( $resolved_source instanceof File ) ) {
            throw new \InvalidArgumentException( 'The "source" option must resolve to a file.' );
        }

        /**
         * This is where you'd plug in ShopifyEntityReader, CSVEntityReader etc.
         * The factory rewinds the stream on every call so the importer can
         * re-open the source from any saved cursor.
         */
        $entity_reader_factory = function ( $cursor ) use ( $resolved_source ) {
            $stream = $resolved_source->getStream();
            $stream->seek( 0 );
            return WXREntityReader::create(
                $stream,
                $cursor
            );
        };
        $setupTracker->finish();

        $source = $options['source'];
        $reporter->reportProgress( $mainTracker->getProgress(), "Importing static files from $source" );

        // Set up the URL mapping – see the StreamImporter internals for details on
        // how URL mapping works. Tl;dr it parses all the content markup and replaces
        // the old site domain with the new site domain.
        $additional_url_mappings = array();
        foreach ( $options['additional_site_urls'] ?? [] as $url ) {
            $additional_url_mappings[] = array(
                'from' => $url,
                'to'   => NEW_SITE_CONTENT_ROOT,
            );
        }

        $importer = StreamImporter::create(
            $entity_reader_factory,
            array(
                'source_site_url'               => $source_site_url,
                'new_site_content_root_url'     => NEW_SITE_CONTENT_ROOT,
                // NOTE(review): without a 'media_url' option this passes array( null ) –
                // confirm StreamImporter tolerates a null media root.
                'source_media_root_urls'        => $options['media_url'] ?? array( $source_site_url ),
                'additional_url_mappings'       => $additional_url_mappings,
                'index_batch_size'              => 1,
                'attachment_downloader_options' => array(
                    'source_from_filesystem' => $chrooted_fs,
                ),
            )
        );

        $import_session = ImportSession::create(
            array(
                'data_source' => 'local_directory',
                // @TODO: the phrase "file_name" doesn't make sense here. We're sourcing
                // data from a directory, not a file. This string tells the user in the
                // UI what they're importing in this import session. Let's rename it to
                // something more descriptive.
                'file_name'   => $options['source'],
            )
        );

        $retries_iterator = new RetryFrontloadingIterator( $import_session->get_id() );
        $importer->set_frontloading_retries_iterator( $retries_iterator );

        // Drive the import in time-boxed batches until it finishes.
        $ignored_message_printed = false;
        while ( true ) {
            $result = data_liberation_unit_of_work( $import_session, $importer, $mainTracker );

            if ( $importer->get_stage() === StreamImporter::STAGE_FINISHED ) {
                $reporter->reportProgress( 100, 'Import completed successfully' );
                $url = data_liberation_find_landing_url( NEW_SITE_CONTENT_ROOT );
                $reporter->reportCompletion( "Import finished! See your imported content at: " . $url );
                break;
            }

            if ( false !== $result ) {
                // The batch ran out of time; start the next one.
                continue;
            }

            if ( $importer->get_stage() !== StreamImporter::STAGE_FRONTLOAD_ASSETS ) {
                // We could not index or insert an entity – that's a fatal error.
                throw new \RuntimeException( 'Could not index or insert an entity, aborting' );
            }

            if ( ! $ignored_message_printed ) {
                $reporter->reportProgress( $mainTracker->getProgress(), "Some assets could not be downloaded – they will be ignored so we can continue with the import." );
                $ignored_message_printed = true;
            }
            // We are explicitly ignoring media download errors in this
            // simple example. In Woo importer you'll need to decide how
            // to handle this. Ignoring could be fine, letting the user know
            // and letting them provide their own media files would also be
            // a good option.
            $import_session->mark_frontloading_errors_as_ignored();
        }
    } catch ( \Throwable $e ) {
        // Report with full exception details, then rethrow the original exception
        // so its type and stack trace survive. Failures raised through bail_out()
        // (e.g. inside data_liberation_unit_of_work()) were already reported by it.
        $reporter->reportError( 'Import failed', $e );
        throw $e;
    } finally {
        $reporter->close();
    }
}
/**
 * Runs a batch of importing work up to the time constraint. It could also
 * support memory constraints, time-of-day limits, maximum import rate, etc.
 *
 * Repeatedly calls $importer->next_step(). It saves the importer's reentrancy
 * cursor into $session, and also the stage whenever the stage changes, so a
 * later call can resume where this one stopped. It pushes per-stage progress
 * into the 'indexing', 'assets' and 'importing' entries of $mainTracker.
 *
 * @param ImportSession  $session     Persistent state of the current import.
 * @param StreamImporter $importer    The importer being driven.
 * @param Tracker        $mainTracker Tracker exposing 'indexing', 'assets' and 'importing' sub-trackers.
 * @return bool true when the time budget was used up or the import reached
 *              STAGE_FINISHED; false when frontloading left unfinished stubs
 *              or advance_to_next_stage() refused to advance.
 * @throws \InvalidArgumentException Via bail_out() when a step fails and the
 *                                   importer reports no next stage.
 */
function data_liberation_unit_of_work( ImportSession $session, StreamImporter $importer, Tracker $mainTracker ) {
// Past the soft limit the batch ends at the next check; the hard limit always ends it.
$soft_time_limit_seconds = 15;
$hard_time_limit_seconds = 25;
$start_time = microtime( true );
// NOTE(review): $fetched_files is never incremented in this function, so the
// `$fetched_files > 0` check below can never pass. During
// STAGE_FRONTLOAD_ASSETS only the hard limit therefore ends the batch.
// Confirm what should count here and increment it.
$fetched_files = 0;
while ( true ) {
$time_taken = microtime( true ) - $start_time;
if ( $time_taken >= $soft_time_limit_seconds ) {
// While frontloading assets, only yield at the soft limit once at least one
// file has been fetched; every other stage yields as soon as it is reached.
if ( $importer->get_stage() === StreamImporter::STAGE_FRONTLOAD_ASSETS ) {
if ( $fetched_files > 0 ) {
return true;
}
} else {
return true;
}
}
if ( $time_taken >= $hard_time_limit_seconds ) {
return true;
}
// A non-true result from next_step() is treated as end-of-stage when the
// importer reports a next stage, and as a failure otherwise.
if ( true !== $importer->next_step() ) {
$session->set_reentrancy_cursor( $importer->get_reentrancy_cursor() );
$should_advance_to_next_stage = null !== $importer->get_next_stage();
if ( ! $should_advance_to_next_stage ) {
// bail_out() reports and throws, so execution does not continue past here.
// NOTE(review): the trailing "\n" becomes part of the reported message.
bail_out('Step failed in the middle of a stage: ' . $session->get_stage() . "\n");
}
if ( StreamImporter::STAGE_FRONTLOAD_ASSETS === $importer->get_stage() ) {
$resolved_all_failures = $session->count_unfinished_frontloading_stubs() === 0;
if ( ! $resolved_all_failures ) {
// Refuse to process if we have unresolved frontloading failures.
// It's a design choice. We could keep going and just let the user know about
// the failures.
return false;
}
}
if ( ! $importer->advance_to_next_stage() ) {
return false;
}
// Persist the new stage and its starting cursor so a resumed run begins there.
$session->set_stage( $importer->get_stage() );
$session->set_reentrancy_cursor( $importer->get_reentrancy_cursor() );
if ( $session->get_stage() === StreamImporter::STAGE_FINISHED ) {
return true;
}
continue;
}
// The step succeeded: fold its results into the session and update progress.
switch ( $importer->get_stage() ) {
case StreamImporter::STAGE_INDEX_ENTITIES:
$entities_counts = $importer->get_indexed_entities_counts();
$session->create_frontloading_stubs( $importer->get_indexed_assets_urls() );
$session->bump_total_number_of_entities( $entities_counts );
$totalEntities = array_sum( $session->get_total_number_of_entities() );
// NOTE(review): ($totalEntities / 100) * 100 equals $totalEntities, so this
// reports the raw entity count capped at 100, not a percentage of a known total.
$mainTracker['indexing']->set(
min(100, ($totalEntities / 100) * 100), // Rough progress calculation
'Indexing entities (' . $totalEntities . ' found)'
);
break;
case StreamImporter::STAGE_FRONTLOAD_ASSETS:
$progress = $importer->get_frontloading_progress();
$session->bump_frontloading_progress(
$progress,
$importer->get_frontloading_events()
);
$remaining = $session->count_unfinished_frontloading_stubs();
// NOTE(review): assumes $progress is an array that may carry a 'downloaded'
// count – confirm against StreamImporter::get_frontloading_progress().
$mainTracker['assets']->set(
max(0, 100 - ($remaining / max(1, $remaining + ($progress['downloaded'] ?? 0)) * 100)),
'Fetching media files (' . $remaining . ' remaining)'
);
break;
case StreamImporter::STAGE_IMPORT_ENTITIES:
$imported_counts = $importer->get_imported_entities_counts();
$session->bump_imported_entities_counts( $imported_counts );
$imported = $session->count_all_imported_entities();
$total = $session->count_remaining_entities() + $imported;
$progress = $total > 0 ? ($imported / $total) * 100 : 0;
$mainTracker['importing']->set(
$progress,
'Importing entities (' . $imported . '/' . $total . ')'
);
break;
}
// Save the cursor after every successful step so the next batch resumes here.
$session->set_reentrancy_cursor( $importer->get_reentrancy_cursor() );
}
// Unreachable: the loop above only exits via return (or bail_out()'s exception).
return false;
}
// Finally, run the import. The source defaults to the accessibility theme unit
// test WXR file; pass a different source as the first CLI argument to import
// something else.
run_content_import( [
    // It can be a URL, local path, git repo, etc. We parse this value and resolve
    // based on what it seems to be. `$argv` is undefined outside the CLI, where
    // `??` quietly falls back to the default.
    'source' => $argv[1] ?? 'https://raw.githubusercontent.com/wpaccessibility/a11y-theme-unit-test/refs/heads/master/a11y-theme-unit-test-data.xml',
    'execution_context_root' => __DIR__,
] );
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment