Created
July 30, 2019 22:30
-
-
Save calibr/c8764d796db270c7c60460f254451580 to your computer and use it in GitHub Desktop.
yjs v13(refactored) level db persistence
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import level = require('level') | |
| import * as encoding from 'lib0/dist/encoding' | |
| import * as decoding from 'lib0/dist/decoding' | |
| import * as syncProtocol from 'y-protocols/dist/sync.js' | |
| import * as authProtocol from 'y-protocols/dist/auth.js' | |
| import { createMutex } from 'lib0/dist/mutex.js' | |
| const mux = createMutex() | |
| function getEmptyEncodedStateVector() { | |
| const encoder = encoding.createEncoder() | |
| encoding.writeVarUint(encoder, 0) | |
| return encoding.toUint8Array(encoder) | |
| } | |
| /* | |
| * Improves the uniqueness of timestamps. | |
| * We gamble with the fact that users won't create more than 10000 changes on a single document | |
| * within one millisecond (also assuming clock works correctly). | |
| */ | |
| let timestampIterator = 0 | |
| /** | |
| * @return {string} A random, time-based string starting with "${roomName}:" | |
| */ | |
| const getNextTimestamp = () => { | |
| timestampIterator = (timestampIterator + 1) % 10000 | |
| return `${Date.now()}${timestampIterator.toString().padStart(4, '0')}` | |
| } | |
| /** | |
| * @param {string} docName | |
| * @return {string} | |
| */ | |
| const generateEntryKey = docName => `${docName}#${getNextTimestamp()}` | |
| /** | |
| * | |
| * @param {any} db | |
| * @param {string} docName | |
| * @param {Uint8Array | ArrayBuffer} buf | |
| */ | |
| const writeEntry = (db, docName, buf) => db.put(generateEntryKey(docName), buf) | |
| /** | |
| * @param {Uint8Array} arr | |
| * @param {Y.Y} ydocument | |
| */ | |
| const readEntry = (arr, ydocument) => mux(() => | |
| syncProtocol.readSyncMessage( | |
| decoding.createDecoder(arr), encoding.createEncoder(), ydocument | |
| ) | |
| ) | |
| /** | |
| * @param {any} db | |
| * @param {string} docName | |
| * @param {Y.Y} ydocument | |
| */ | |
| const loadFromPersistence = (db, docName, ydocument) => new Promise((resolve, reject) => | |
| db.createReadStream({ | |
| gte: `${docName}#`, | |
| lte: `${docName}#Z`, | |
| keys: false, | |
| values: true | |
| }) | |
| .on('data', data => readEntry(data, ydocument)) | |
| .on('error', reject) | |
| .on('end', resolve) | |
| .on('close', resolve) | |
| ) | |
| const persistState = (db, docName, ydocument) => { | |
| const encoder = encoding.createEncoder() | |
| syncProtocol.writeSyncStep2(encoder, ydocument, getEmptyEncodedStateVector()) | |
| const entryKey = generateEntryKey(docName) | |
| const entryPromise = db.put(entryKey, encoding.toUint8Array(encoder)) | |
| const delOps = [] | |
| return new Promise((resolve, reject) => db.createKeyStream({ | |
| gte: `${docName}#`, | |
| lt: entryKey | |
| }) | |
| .on('data', key => delOps.push({ type: 'del', key })) | |
| .on('error', reject) | |
| .on('end', resolve) | |
| .on('close', resolve) | |
| ).then(() => entryPromise).then(() => db.batch(delOps)) | |
| } | |
| /** | |
| * Persistence layer for Leveldb. | |
| */ | |
| export class LevelDbPersistence { | |
| /** | |
| * @param {string} fpath Path to leveldb database | |
| */ | |
| constructor (fpath, conf = {}) { | |
| this.db = level(fpath, { valueEncoding: 'binary' }) | |
| this.conf = Object.assign({ | |
| writeStateOnLoad: true | |
| }, conf) | |
| } | |
| /** | |
| * Retrieve all data from LevelDB and automatically persist all document updates to leveldb. | |
| * | |
| * @param {string} docName | |
| * @param {Y.Y} ydocument | |
| */ | |
| bindState (docName, ydocument) { | |
| const broadcastUpdate = (update, origin, y) => { | |
| if (origin !== 'remote') { | |
| const encoder = encoding.createEncoder() | |
| encoding.writeVarUint(encoder, messageSync) | |
| syncProtocol.writeUpdate(encoder, update) | |
| const buf = encoding.toUint8Array(encoder) | |
| if (y.wsconnected) { | |
| // @ts-ignore We know that wsconnected = true | |
| y.ws.send(buf) | |
| } | |
| } | |
| } | |
| // write all updates received from other clients | |
| // - unless it is created by this persistence layer (e.g. loadFromPersistence, we we mux). | |
| ydocument.on('update', (update, origin, y) => { | |
| mux(() => { | |
| const encoder = encoding.createEncoder() | |
| syncProtocol.writeUpdate(encoder, update) | |
| writeEntry(this.db, docName, encoding.toUint8Array(encoder)) | |
| }) | |
| }) | |
| // read all data from persistence | |
| return loadFromPersistence(this.db, docName, ydocument).then(() => { | |
| // write current state (just in case anything was added before state was bound) | |
| if(this.conf.writeStateOnLoad) { | |
| this.writeState(docName, ydocument) | |
| } | |
| }) | |
| } | |
| /** | |
| * Write current state to persistence layer. Deletes all entries that were made before. | |
| * Call this method at any time - the recommended time to call this method is before the ydocument is destroyed. | |
| * | |
| * @param {string} docName | |
| * @param {Y.Y} ydocument | |
| */ | |
| writeState (docName, ydocument) { | |
| return persistState(this.db, docName, ydocument) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For future learners: you can also use
performance.now()instead of the customgetNextTimestampfunction above. See yjs/yjs#170 (comment)