Apache Paimon + MediaWiki demo

Screen Recording: https://vimeo.com/942403540?share=copy

Setup

Run Paimon's mysql_sync_database Flink action to continuously sync the MediaWiki MariaDB database into Paimon tables:

./bin/flink run ../paimon-flink-action-0.8-20240501.002151-76.jar mysql_sync_database \
    --warehouse file:/tmp/paimon \
    --database mediawiki1 \
    --mysql_conf hostname=localhost \
    --mysql_conf username=root \
    --mysql_conf password=main_root_password \
    --mysql_conf database-name='my_database' \
    --mysql_conf server-time-zone='UTC'

Check the Flink UI to see all the crazy tasks syncing all the MariaDB tables!

Flink Batch SQL Query

Start the Flink SQL client: ./bin/sql-client.sh

Create the (temporary) Paimon catalog and switch to the 'mediawiki1' database that the mysql_sync_database job already created:

CREATE CATALOG my_catalog WITH (
    'type'='paimon',
    'warehouse'='file:/tmp/paimon'
);

USE CATALOG my_catalog;

-- Database already created by paimon mysql_sync_database job
USE mediawiki1;
-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';

-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';


SHOW TABLES;
DESCRIBE page;

-- Join and select page_title, rev_id, content_body WHERE page_id = 4

/*
NOTE: 
We have to DECODE to UTF-8 because MediaWiki stores strings as varbinary, 
and Flink mysql-cdc doesn't know about that :/
 */
SELECT 
    DECODE(page.page_title, 'UTF-8') as page_title, 
    revision.rev_id, 
    DECODE(text.old_text, 'UTF-8') as content_body
FROM page, revision, slots, content, text
WHERE 
    /*
    NOTE: 
    In the local dev wiki, the content_address looks like tt:1.
    Here we get the text.old_id join key via SUBSTRING.
    We also have to cast the string to an int so the join can be done.
    In prod we couldn't join like this, as the 'text' table doesn't exist;
    it is in 'external storage'.
    */
    CAST(SUBSTRING(DECODE(content.content_address, 'UTF-8'), 4) AS BIGINT)  = text.old_id AND
    slots.slot_content_id = content.content_id AND
    revision.rev_id = slots.slot_revision_id AND
    revision.rev_page = page.page_id AND
    page_id = 4
;

Streaming queries are also possible via:

SET 'execution.runtime-mode' = 'streaming';
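
For example (a rough sketch, not part of the recorded demo), an unbounded scan over one of the synced tables keeps running and emits new rows as the sync job replicates them:

-- This query never finishes in streaming mode; it emits new revisions
-- as the mysql_sync_database job syncs them from MariaDB.
SELECT rev_id, rev_page FROM revision;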

Locally, I have not allocated enough resources, so it takes a while for the replication updates to propagate.

Query with Spark

TODO!!!
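
In the meantime, a minimal sketch based on the Paimon Spark quickstart (the 'paimon' catalog name and warehouse path are assumptions, not from the demo): start spark-sql with the paimon-spark bundle jar on the classpath and configure a catalog, e.g. --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog --conf spark.sql.catalog.paimon.warehouse=file:/tmp/paimon. The synced tables should then be queryable with ordinary Spark SQL:

-- Assumes a Spark catalog named 'paimon' pointing at the same warehouse.
-- Same varbinary caveat as in Flink: decode page_title to read it as text.
SELECT page_id, DECODE(page_title, 'UTF-8') AS page_title
FROM paimon.mediawiki1.page
LIMIT 10;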
