Created
April 23, 2024 02:44
-
-
Save richytong/e868ac0e8fcac567908dbb3a4d83ebde to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require('rubico/global') | |
| const Transducer = require('rubico/Transducer') | |
| const x = require('rubico/x') | |
| const Dynamo = require('presidium/Dynamo') | |
| const DynamoIndexQueryIterator = require('presidium/DynamoIndexQueryIterator') | |
| const { groupBy } = x | |
// intoMapBy(attr string)(array Array) -> Map
// Index the items of `array` by their `attr` property. When several items
// share a key, the one that appears last is the one kept in the Map.
const intoMapBy = attr => array => new Map(
  [...array].map(entry => [entry[attr], entry]),
)
/**
 * In-memory cache of posts and the records attached to them (images, coins,
 * comments, nested comments, profiles).
 *
 * The cache is filled once from the `*TypeCreateTimeIndex` dependencies and
 * is then kept current by applying the items read from the `*Stream`
 * dependencies. All lookups are served from Maps held on the instance.
 */
class PostCache {
  /**
   * @param {object} options
   * @param {object} options.dependencies - the streams and indexes that
   *   readyPromise destructures (postStream, postTypeCreateTimeIndex, ...).
   */
  constructor(options) {
    this.dependencies = options.dependencies
    // Loading starts right away; `ready` settles once every Map is populated.
    this.ready = this.readyPromise()
  }

  /**
   * Populate every Map from its index, then start consuming every stream.
   *
   * @returns {Promise<void>} resolves after all index loads have finished
   *   and the stream consumers have been scheduled.
   */
  async readyPromise() {
    const {
      postStream,
      postImageStream,
      postCoinStream,
      postCommentStream,
      postCommentImageStream,
      postCommentCoinStream,
      commentCommentStream,
      commentCommentCoinStream,
      commentCommentImageStream,
      profileStream,
      postTypeCreateTimeIndex,
      postImageTypeCreateTimeIndex,
      postCoinTypeCreateTimeIndex,
      postCommentTypeCreateTimeIndex,
      postCommentImageTypeCreateTimeIndex,
      postCommentCoinTypeCreateTimeIndex,
      commentCommentTypeCreateTimeIndex,
      commentCommentCoinTypeCreateTimeIndex,
      commentCommentImageTypeCreateTimeIndex,
      profileTypeCreateTimeIndex,
    } = this.dependencies

    // Each entry: [index, value bound to :type, collection builder, assigner].
    const loads = [
      [
        postTypeCreateTimeIndex,
        'post',
        all({
          postMap: intoMapBy('postSlug'),
          communityPostsMap: groupBy('communitySlug'),
        }),
        ({ postMap, communityPostsMap }) => {
          // postSlug => post
          this.postMap = postMap
          // communitySlug => Array<post>
          this.communityPostsMap = communityPostsMap
        },
      ],
      [postImageTypeCreateTimeIndex, 'image', groupBy('postSlug'), m => {
        // postSlug => Array<postImage>
        this.postImagesMap = m
      }],
      [postCoinTypeCreateTimeIndex, 'coin', groupBy('postSlug'), m => {
        // postSlug => Array<postCoin>
        this.postCoinsMap = m
      }],
      [postCommentTypeCreateTimeIndex, 'comment', groupBy('postSlug'), m => {
        // postSlug => Array<postComment>
        this.postCommentsMap = m
      }],
      [postCommentImageTypeCreateTimeIndex, 'image', groupBy('commentId'), m => {
        // postCommentId => Array<postCommentImage>
        this.postCommentImagesMap = m
      }],
      [postCommentCoinTypeCreateTimeIndex, 'coin', groupBy('commentId'), m => {
        // postCommentId => Array<postCommentCoin>
        this.postCommentCoinsMap = m
      }],
      [commentCommentTypeCreateTimeIndex, 'comment', groupBy('parentCommentId'), m => {
        // postCommentId (parentCommentId) => Array<commentComment>
        this.commentCommentsMap = m
      }],
      [commentCommentCoinTypeCreateTimeIndex, 'coin', groupBy('commentId'), m => {
        // commentCommentId => Array<commentCommentCoin>
        this.commentCommentCoinsMap = m
      }],
      [commentCommentImageTypeCreateTimeIndex, 'image', groupBy('commentId'), m => {
        // commentCommentId => Array<commentCommentImage>
        this.commentCommentImagesMap = m
      }],
      [profileTypeCreateTimeIndex, 'profile', intoMapBy('phoneNumber'), m => {
        // phoneNumber => profile (a single record per key, not an array)
        this.profileMap = m
      }],
    ]

    await Promise.all(loads.map(
      ([index, type, build, assign]) => this._loadIndex(index, type, build, assign),
    ))

    // Streams are only consumed after the initial load so that every handler
    // finds its Map already in place.
    const consumers = [
      [postStream, item => this._handlePostStreamItem(item)],
      [postImageStream, item => this._handlePostImageStreamItem(item)],
      [postCoinStream, item => this._handlePostCoinStreamItem(item)],
      [postCommentStream, item => this._handlePostCommentStreamItem(item)],
      [postCommentImageStream, item => this._handlePostCommentImageStreamItem(item)],
      [postCommentCoinStream, item => this._handlePostCommentCoinStreamItem(item)],
      [commentCommentStream, item => this._handleCommentCommentStreamItem(item)],
      [commentCommentCoinStream, item => this._handleCommentCommentCoinStreamItem(item)],
      [commentCommentImageStream, item => this._handleCommentCommentImageStreamItem(item)],
      [profileStream, item => this._handleProfileStreamItem(item)],
    ]
    for (const [stream, handle] of consumers) {
      this._consumeStream(stream, handle)
    }
  }

  /**
   * Query every record of one `type` from an index, convert each record with
   * Dynamo.attributeValueToJSON, build a collection from the resulting array
   * and hand that collection to `assign`.
   *
   * @param {object} index - index passed to DynamoIndexQueryIterator
   * @param {string} type - value bound to `:type` in the key condition
   * @param {function} build - array of records => collection
   * @param {function} assign - receives the built collection
   * @returns {Promise|*} result of the rubico pipe
   */
  _loadIndex(index, type, build, assign) {
    return pipe(DynamoIndexQueryIterator(
      index,
      'type = :type AND createTime > :createTime',
      { type, createTime: 0 },
      { scanIndexForward: true },
    ), [
      transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
      build,
      assign,
    ])
  }

  /**
   * Schedule a loop that feeds every item of `stream` to `handle`.
   *
   * Nothing awaits or catches the loop: an error thrown by the stream or by
   * `handle` ends the loop and rejects a promise that has no handler.
   *
   * @param {AsyncIterable} stream
   * @param {function} handle - called with each stream item
   */
  _consumeStream(stream, handle) {
    setImmediate(async () => {
      for await (const streamItem of stream) {
        handle(streamItem)
      }
    })
  }

  /**
   * Extract the record carried by a stream item: `dynamodb.OldImage` for
   * REMOVE events, `dynamodb.NewImage` otherwise, with every attribute
   * converted by Dynamo.attributeValueToJSON.
   *
   * @param {object} streamItem
   * @returns {object} the converted record
   */
  _streamItemToJSON(streamItem) {
    const imageName = streamItem.eventName === 'REMOVE' ? 'OldImage' : 'NewImage'
    return pipe(streamItem, [
      get(`dynamodb.${imageName}`),
      map(Dynamo.attributeValueToJSON),
    ])
  }

  /**
   * Apply a stream item to a Map whose values are arrays of records.
   *
   * - REMOVE deletes the matching record from its bucket.
   * - INSERT and MODIFY upsert: a record already present under the same
   *   `arrayKeyName` value is replaced, otherwise the record is appended.
   *   Replacing on INSERT keeps the bucket free of duplicates when the record
   *   is already cached (for example after addPost).
   *
   * Unknown event names are ignored.
   *
   * @param {object} streamItem
   * @param {object} options
   * @param {Map} options.m - Map of mapKey => Array<record>
   * @param {string} options.mapKeyName - record field that selects the bucket
   * @param {string} options.arrayKeyName - record field that identifies the
   *   record inside its bucket
   */
  _genericHandleStreamItemForMapOfArrays(streamItem, options) {
    const { m, mapKeyName, arrayKeyName } = options
    const { eventName } = streamItem
    if (eventName !== 'REMOVE' && eventName !== 'INSERT' && eventName !== 'MODIFY') {
      return
    }

    const item = this._streamItemToJSON(streamItem)
    const mapKey = item[mapKeyName]
    const arrayKey = item[arrayKeyName]
    const reportNotFound = () => {
      console.error(`Item ${arrayKey}, ${mapKey} (${mapKeyName}, ${arrayKeyName}) not found`)
    }

    if (!m.has(mapKey)) {
      // A missing bucket is expected for INSERT only.
      if (eventName !== 'INSERT') {
        reportNotFound()
      }
      m.set(mapKey, [])
    }
    const array = m.get(mapKey)
    const index = array.findIndex(eq(arrayKey, get(arrayKeyName)))

    if (eventName === 'REMOVE') {
      if (index > -1) {
        array.splice(index, 1)
      } else {
        reportNotFound()
      }
      return
    }

    if (index > -1) {
      array[index] = item
      return
    }
    if (eventName === 'MODIFY') {
      // A modification of a record the cache never saw; cache it anyway.
      reportNotFound()
    }
    array.push(item)
  }

  /**
   * Apply a stream item to a Map holding one record per key: REMOVE deletes
   * the key, INSERT and MODIFY store the new record. Unknown event names are
   * ignored.
   *
   * @param {object} streamItem
   * @param {object} options
   * @param {Map} options.m - Map of mapKey => record
   * @param {string} options.mapKeyName - record field used as the Map key
   */
  _genericHandleStreamItemForDictionary(streamItem, options) {
    const { m, mapKeyName } = options
    const { eventName } = streamItem
    if (eventName === 'REMOVE') {
      m.delete(this._streamItemToJSON(streamItem)[mapKeyName])
    } else if (eventName === 'INSERT' || eventName === 'MODIFY') {
      const item = this._streamItemToJSON(streamItem)
      m.set(item[mapKeyName], item)
    }
  }

  /** Apply a post stream item to both postMap and communityPostsMap. */
  _handlePostStreamItem(streamItem) {
    this._genericHandleStreamItemForDictionary(streamItem, {
      m: this.postMap,
      mapKeyName: 'postSlug',
    })
    this._genericHandleStreamItemForMapOfArrays(streamItem, {
      m: this.communityPostsMap,
      mapKeyName: 'communitySlug',
      arrayKeyName: 'postSlug',
    })
  }

  /** Apply a post image stream item to postImagesMap. */
  _handlePostImageStreamItem(streamItem) {
    this._genericHandleStreamItemForMapOfArrays(streamItem, {
      m: this.postImagesMap,
      mapKeyName: 'postSlug',
      arrayKeyName: 'postImageKey',
    })
  }

  /** Apply a post coin stream item to postCoinsMap. */
  _handlePostCoinStreamItem(streamItem) {
    this._genericHandleStreamItemForMapOfArrays(streamItem, {
      m: this.postCoinsMap,
      mapKeyName: 'postSlug',
      arrayKeyName: 'senderReferrerHash',
    })
  }

  /** Apply a post comment stream item to postCommentsMap. */
  _handlePostCommentStreamItem(streamItem) {
    this._genericHandleStreamItemForMapOfArrays(streamItem, {
      m: this.postCommentsMap,
      mapKeyName: 'postSlug',
      arrayKeyName: 'commentId',
    })
  }

  /** Apply a post comment image stream item to postCommentImagesMap. */
  _handlePostCommentImageStreamItem(streamItem) {
    this._genericHandleStreamItemForMapOfArrays(streamItem, {
      m: this.postCommentImagesMap,
      mapKeyName: 'commentId',
      arrayKeyName: 'commentImageKey',
    })
  }

  /** Apply a post comment coin stream item to postCommentCoinsMap. */
  _handlePostCommentCoinStreamItem(streamItem) {
    this._genericHandleStreamItemForMapOfArrays(streamItem, {
      m: this.postCommentCoinsMap,
      mapKeyName: 'commentId',
      arrayKeyName: 'senderReferrerHash',
    })
  }

  /** Apply a nested comment stream item to commentCommentsMap. */
  _handleCommentCommentStreamItem(streamItem) {
    this._genericHandleStreamItemForMapOfArrays(streamItem, {
      m: this.commentCommentsMap,
      mapKeyName: 'parentCommentId',
      arrayKeyName: 'commentId',
    })
  }

  /** Apply a nested comment coin stream item to commentCommentCoinsMap. */
  _handleCommentCommentCoinStreamItem(streamItem) {
    this._genericHandleStreamItemForMapOfArrays(streamItem, {
      m: this.commentCommentCoinsMap,
      mapKeyName: 'commentId',
      arrayKeyName: 'senderReferrerHash',
    })
  }

  /** Apply a nested comment image stream item to commentCommentImagesMap. */
  _handleCommentCommentImageStreamItem(streamItem) {
    this._genericHandleStreamItemForMapOfArrays(streamItem, {
      m: this.commentCommentImagesMap,
      mapKeyName: 'commentId',
      arrayKeyName: 'commentImageKey',
    })
  }

  /** Apply a profile stream item to profileMap. */
  _handleProfileStreamItem(streamItem) {
    this._genericHandleStreamItemForDictionary(streamItem, {
      m: this.profileMap,
      mapKeyName: 'phoneNumber',
    })
  }

  /**
   * Look up the cached profile image URL for a phone number.
   *
   * @param {*} phoneNumber - key into profileMap
   * @param {*} fallbackUrl - returned when the profile is not cached or has
   *   no (non-nullish) profileImageUrl
   */
  _profileImageUrl(phoneNumber, fallbackUrl) {
    return this.profileMap.get(phoneNumber)?.profileImageUrl ?? fallbackUrl
  }

  /**
   * Build a copy of `post` with its cached relations attached: postImages,
   * postCoins, posterProfileUrl and postComments. Each post comment is itself
   * a copy carrying commentImages, postCommentCoins, commenterProfileImageUrl
   * and commentComments; each nested comment is a copy carrying commentImages,
   * commentCommentCoins and commenterProfileImageUrl.
   *
   * The copies are shallow: the attached arrays are the cache's own arrays
   * whenever the corresponding Map has an entry.
   *
   * @param {object} post - a cached post
   * @returns {object} the completed copy
   */
  _completePost(post) {
    const { postSlug } = post
    const {
      postImagesMap,
      postCoinsMap,
      postCommentsMap,
      postCommentImagesMap,
      postCommentCoinsMap,
      commentCommentsMap,
      commentCommentCoinsMap,
      commentCommentImagesMap,
    } = this

    const completeCommentComment = commentComment => {
      const { commentId, commenterPhoneNumber, commenterProfileImageUrl } = commentComment
      const completed = { ...commentComment }
      completed.commentImages = commentCommentImagesMap.get(commentId) ?? []
      completed.commentCommentCoins = commentCommentCoinsMap.get(commentId) ?? []
      completed.commenterProfileImageUrl =
        this._profileImageUrl(commenterPhoneNumber, commenterProfileImageUrl)
      return completed
    }

    const completePostComment = postComment => {
      const { commentId, commenterPhoneNumber, commenterProfileImageUrl } = postComment
      const completed = { ...postComment }
      completed.commentImages = postCommentImagesMap.get(commentId) ?? []
      completed.postCommentCoins = postCommentCoinsMap.get(commentId) ?? []
      completed.commenterProfileImageUrl =
        this._profileImageUrl(commenterPhoneNumber, commenterProfileImageUrl)
      completed.commentComments =
        (commentCommentsMap.get(commentId) ?? []).map(completeCommentComment)
      return completed
    }

    const result = { ...post }
    result.postImages = postImagesMap.get(postSlug) ?? []
    result.postCoins = postCoinsMap.get(postSlug) ?? []
    result.posterProfileUrl =
      this._profileImageUrl(post.posterPhoneNumber, post.posterProfileUrl)
    result.postComments =
      (postCommentsMap.get(postSlug) ?? []).map(completePostComment)
    return result
  }

  /**
   * Put a post into the cache directly, without waiting for the stream.
   * A post already cached under the same postSlug in the community bucket is
   * replaced rather than duplicated.
   *
   * @param {object} post - must carry postSlug and communitySlug
   */
  addPost(post) {
    const { postSlug, communitySlug } = post
    const { postMap, communityPostsMap } = this
    postMap.set(postSlug, post)

    const communityPosts = communityPostsMap.get(communitySlug)
    if (communityPosts === undefined) {
      communityPostsMap.set(communitySlug, [post])
      return
    }
    const index = communityPosts.findIndex(cached => cached.postSlug === postSlug)
    if (index > -1) {
      communityPosts[index] = post
    } else {
      communityPosts.push(post)
    }
  }

  /**
   * @param {string} postSlug
   * @returns {object|undefined} the completed post, or undefined when the
   *   slug is not cached
   */
  getByPostSlug(postSlug) {
    const post = this.postMap.get(postSlug)
    if (post == null) {
      return undefined
    }
    return this._completePost(post)
  }

  /**
   * Return one page of completed posts of a community.
   *
   * @param {string} communitySlug
   * @param {object} [options]
   * @param {number} [options.page=1] - 1-based page number
   * @param {number} [options.pageSize=100]
   * @param {number} [options.order=1] - 1 for ascending, -1 for descending
   * @param {string} [options.sortField='createTime'] - post field to sort by;
   *   values are compared by subtraction
   * @returns {Array<object>} completed posts of the requested page
   */
  findByCommunitySlug(communitySlug, options = {}) {
    const {
      page = 1,
      pageSize = 100,
      order = 1, // ASC, -1 for DESC
      sortField = 'createTime',
    } = options
    const startIndex = (page - 1) * pageSize
    const stopIndex = page * pageSize
    const communityPosts = this.communityPostsMap.get(communitySlug) ?? []
    return (
      // Sort a copy so the cached bucket keeps its order, and complete only
      // the posts that end up on the requested page.
      [...communityPosts]
        .sort((a, b) => order * (a[sortField] - b[sortField]))
        .slice(startIndex, stopIndex)
        .map(post => this._completePost(post))
    )
  }

  /**
   * @param {string} communitySlug
   * @param {object} [options]
   * @param {number} [options.pageSize=100]
   * @returns {number} number of the last page (0 when the community has no
   *   cached posts)
   */
  getLastCommunityPage(communitySlug, options = {}) {
    const { pageSize = 100 } = options
    const communityPosts = this.communityPostsMap.get(communitySlug) ?? []
    return Math.ceil(communityPosts.length / pageSize)
  }
}
| module.exports = PostCache |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment