Skip to content

Instantly share code, notes, and snippets.

@richytong
Created April 23, 2024 02:44
Show Gist options
  • Select an option

  • Save richytong/e868ac0e8fcac567908dbb3a4d83ebde to your computer and use it in GitHub Desktop.

Select an option

Save richytong/e868ac0e8fcac567908dbb3a4d83ebde to your computer and use it in GitHub Desktop.
require('rubico/global')
const Transducer = require('rubico/Transducer')
const x = require('rubico/x')
const Dynamo = require('presidium/Dynamo')
const DynamoIndexQueryIterator = require('presidium/DynamoIndexQueryIterator')
const { groupBy } = x
// intoMapBy(attr string)(array Array) -> Map
const intoMapBy = attr => array => {
const result = new Map()
for (const item of array) {
const key = item[attr]
result.set(key, item)
}
return result
}
class PostCache {
constructor(options) {
this.dependencies = options.dependencies
this.ready = this.readyPromise()
}
async readyPromise() {
const {
postStream,
postImageStream,
postCoinStream,
postCommentStream,
postCommentImageStream,
postCommentCoinStream,
commentCommentStream,
commentCommentCoinStream,
commentCommentImageStream,
profileStream,
postTypeCreateTimeIndex,
postImageTypeCreateTimeIndex,
postCoinTypeCreateTimeIndex,
postCommentTypeCreateTimeIndex,
postCommentImageTypeCreateTimeIndex,
postCommentCoinTypeCreateTimeIndex,
commentCommentTypeCreateTimeIndex,
commentCommentCoinTypeCreateTimeIndex,
commentCommentImageTypeCreateTimeIndex,
profileTypeCreateTimeIndex,
} = this.dependencies
await Promise.all([
pipe(DynamoIndexQueryIterator(
postTypeCreateTimeIndex,
'type = :type AND createTime > :createTime',
{ type: 'post', createTime: 0 },
{ scanIndexForward: true },
), [
transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
all({
postMap: intoMapBy('postSlug'),
communityPostsMap: groupBy('communitySlug'),
}),
({ postMap, communityPostsMap }) => {
this.postMap = postMap
this.communityPostsMap = communityPostsMap
},
]),
pipe(DynamoIndexQueryIterator(
postImageTypeCreateTimeIndex,
'type = :type AND createTime > :createTime',
{ type: 'image', createTime: 0 },
{ scanIndexForward: true },
), [
transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
groupBy('postSlug'),
m => {
// postSlug => Array<postImage>
this.postImagesMap = m
},
]),
pipe(DynamoIndexQueryIterator(
postCoinTypeCreateTimeIndex,
'type = :type AND createTime > :createTime',
{ type: 'coin', createTime: 0 },
{ scanIndexForward: true },
), [
transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
groupBy('postSlug'),
m => {
// postSlug => Array<postCoin>
this.postCoinsMap = m
},
]),
pipe(DynamoIndexQueryIterator(
postCommentTypeCreateTimeIndex,
'type = :type AND createTime > :createTime',
{ type: 'comment', createTime: 0 },
{ scanIndexForward: true },
), [
transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
groupBy('postSlug'),
m => {
// postSlug => Array<postComment>
this.postCommentsMap = m
},
]),
pipe(DynamoIndexQueryIterator(
postCommentImageTypeCreateTimeIndex,
'type = :type AND createTime > :createTime',
{ type: 'image', createTime: 0 },
{ scanIndexForward: true },
), [
transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
groupBy('commentId'),
m => {
// postCommentId => Array<postCommentImage>
this.postCommentImagesMap = m
},
]),
pipe(DynamoIndexQueryIterator(
postCommentCoinTypeCreateTimeIndex,
'type = :type AND createTime > :createTime',
{ type: 'coin', createTime: 0 },
{ scanIndexForward: true },
), [
transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
groupBy('commentId'),
m => {
// postCommentId => Array<postCommentCoin>
this.postCommentCoinsMap = m
},
]),
pipe(DynamoIndexQueryIterator(
commentCommentTypeCreateTimeIndex,
'type = :type AND createTime > :createTime',
{ type: 'comment', createTime: 0 },
{ scanIndexForward: true },
), [
transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
groupBy('parentCommentId'),
m => {
// postCommentId => Array<commentComment>
this.commentCommentsMap = m
},
]),
pipe(DynamoIndexQueryIterator(
commentCommentCoinTypeCreateTimeIndex,
'type = :type AND createTime > :createTime',
{ type: 'coin', createTime: 0 },
{ scanIndexForward: true },
), [
transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
groupBy('commentId'),
m => {
// commentCommentId => Array<commentCommentCoin>
this.commentCommentCoinsMap = m
},
]),
pipe(DynamoIndexQueryIterator(
commentCommentImageTypeCreateTimeIndex,
'type = :type AND createTime > :createTime',
{ type: 'image', createTime: 0 },
{ scanIndexForward: true },
), [
transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
groupBy('commentId'),
m => {
// commentCommentId => Array<commentCommentImage>
this.commentCommentImagesMap = m
},
]),
pipe(DynamoIndexQueryIterator(
profileTypeCreateTimeIndex,
'type = :type AND createTime > :createTime',
{ type: 'profile', createTime: 0 },
{ scanIndexForward: true },
), [
transform(Transducer.map(map(Dynamo.attributeValueToJSON)), []),
intoMapBy('phoneNumber'),
m => {
// phoneNumber => Array<profile>
this.profileMap = m
},
]),
])
setImmediate(async () => {
for await (const streamItem of postStream) {
this._handlePostStreamItem(streamItem)
}
})
setImmediate(async () => {
for await (const streamItem of postImageStream) {
this._handlePostImageStreamItem(streamItem)
}
})
setImmediate(async () => {
for await (const streamItem of postCoinStream) {
this._handlePostCoinStreamItem(streamItem)
}
})
setImmediate(async () => {
for await (const streamItem of postCommentStream) {
this._handlePostCommentStreamItem(streamItem)
}
})
setImmediate(async () => {
for await (const streamItem of postCommentImageStream) {
this._handlePostCommentImageStreamItem(streamItem)
}
})
setImmediate(async () => {
for await (const streamItem of postCommentCoinStream) {
this._handlePostCommentCoinStreamItem(streamItem)
}
})
setImmediate(async () => {
for await (const streamItem of commentCommentStream) {
this._handleCommentCommentStreamItem(streamItem)
}
})
setImmediate(async () => {
for await (const streamItem of commentCommentCoinStream) {
this._handleCommentCommentCoinStreamItem(streamItem)
}
})
setImmediate(async () => {
for await (const streamItem of commentCommentImageStream) {
this._handleCommentCommentImageStreamItem(streamItem)
}
})
setImmediate(async () => {
for await (const streamItem of profileStream) {
this._handleProfileStreamItem(streamItem)
}
})
}
_genericHandleStreamItemForMapOfArrays(streamItem, options) {
const { m, mapKeyName, arrayKeyName } = options
if (streamItem.eventName == 'REMOVE') {
const item = pipe(streamItem, [
get('dynamodb.OldImage'),
map(Dynamo.attributeValueToJSON),
])
const mapKey = item[mapKeyName]
const arrayKey = item[arrayKeyName]
if (!m.has(mapKey)) {
console.error(`Item ${arrayKey}, ${mapKey} (${mapKeyName}, ${arrayKeyName}) not found`)
m.set(mapKey, [])
}
const array = m.get(mapKey)
const index = array.findIndex(eq(arrayKey, get(arrayKeyName)))
if (index > -1) {
array.splice(index, 1)
} else {
console.error(`Item ${arrayKey}, ${mapKey} (${mapKeyName}, ${arrayKeyName}) not found`)
}
} else if (streamItem.eventName == 'INSERT') {
const item = pipe(streamItem, [
get('dynamodb.NewImage'),
map(Dynamo.attributeValueToJSON),
])
const mapKey = item[mapKeyName]
const arrayKey = item[arrayKeyName]
if (!m.has(mapKey)) {
m.set(mapKey, [])
}
m.get(mapKey).push(item)
} else if (streamItem.eventName == 'MODIFY') {
const item = pipe(streamItem, [
get('dynamodb.NewImage'),
map(Dynamo.attributeValueToJSON),
])
const mapKey = item[mapKeyName]
const arrayKey = item[arrayKeyName]
if (!m.has(mapKey)) {
console.error(`Item ${arrayKey}, ${mapKey} (${mapKeyName}, ${arrayKeyName}) not found`)
m.set(mapKey, [])
}
const array = m.get(mapKey)
const index = array.findIndex(eq(arrayKey, get(arrayKeyName)))
if (index > -1) {
array[index] = item
} else {
console.error(`Item ${arrayKey}, ${mapKey} (${mapKeyName}, ${arrayKeyName}) not found`)
array.push(item)
}
}
}
_genericHandleStreamItemForDictionary(streamItem, options) {
const { m, mapKeyName } = options
if (streamItem.eventName == 'REMOVE') {
const item = pipe(streamItem, [
get('dynamodb.OldImage'),
map(Dynamo.attributeValueToJSON),
])
const mapKey = item[mapKeyName]
m.delete(mapKey)
} else if (streamItem.eventName == 'INSERT' || streamItem.eventName == 'MODIFY') {
const item = pipe(streamItem, [
get('dynamodb.NewImage'),
map(Dynamo.attributeValueToJSON),
])
const mapKey = item[mapKeyName]
m.set(mapKey, item)
}
}
_handlePostStreamItem(streamItem) {
this._genericHandleStreamItemForDictionary(streamItem, {
m: this.postMap,
mapKeyName: 'postSlug',
})
this._genericHandleStreamItemForMapOfArrays(streamItem, {
m: this.communityPostsMap,
mapKeyName: 'communitySlug',
arrayKeyName: 'postSlug',
})
}
_handlePostImageStreamItem(streamItem) {
this._genericHandleStreamItemForMapOfArrays(streamItem, {
m: this.postImagesMap,
mapKeyName: 'postSlug',
arrayKeyName: 'postImageKey',
})
}
_handlePostCoinStreamItem(streamItem) {
this._genericHandleStreamItemForMapOfArrays(streamItem, {
m: this.postCoinsMap,
mapKeyName: 'postSlug',
arrayKeyName: 'senderReferrerHash',
})
}
_handlePostCommentStreamItem(streamItem) {
this._genericHandleStreamItemForMapOfArrays(streamItem, {
m: this.postCommentsMap,
mapKeyName: 'postSlug',
arrayKeyName: 'commentId',
})
}
_handlePostCommentImageStreamItem(streamItem) {
this._genericHandleStreamItemForMapOfArrays(streamItem, {
m: this.postCommentImagesMap,
mapKeyName: 'commentId',
arrayKeyName: 'commentImageKey',
})
}
_handlePostCommentCoinStreamItem(streamItem) {
this._genericHandleStreamItemForMapOfArrays(streamItem, {
m: this.postCommentCoinsMap,
mapKeyName: 'commentId',
arrayKeyName: 'senderReferrerHash',
})
}
_handleCommentCommentStreamItem(streamItem) {
this._genericHandleStreamItemForMapOfArrays(streamItem, {
m: this.commentCommentsMap,
mapKeyName: 'parentCommentId',
arrayKeyName: 'commentId',
})
}
_handleCommentCommentCoinStreamItem(streamItem) {
this._genericHandleStreamItemForMapOfArrays(streamItem, {
m: this.commentCommentCoinsMap,
mapKeyName: 'commentId',
arrayKeyName: 'senderReferrerHash',
})
}
_handleCommentCommentImageStreamItem(streamItem) {
this._genericHandleStreamItemForMapOfArrays(streamItem, {
m: this.commentCommentImagesMap,
mapKeyName: 'commentId',
arrayKeyName: 'commentImageKey',
})
}
_handleProfileStreamItem(streamItem) {
this._genericHandleStreamItemForDictionary(streamItem, {
m: this.profileMap,
mapKeyName: 'phoneNumber',
})
}
_completePost(post) {
const { postSlug } = post
const {
postImagesMap,
postCoinsMap,
postCommentsMap,
postCommentImagesMap,
postCommentCoinsMap,
commentCommentsMap,
commentCommentCoinsMap,
commentCommentImagesMap,
profileMap,
} = this
const result = { ...post }
result.postImages = postImagesMap.get(postSlug) ?? []
result.postCoins = postCoinsMap.get(postSlug) ?? []
result.posterProfileUrl =
profileMap.get(post.posterPhoneNumber)?.profileImageUrl
?? post.posterProfileUrl
result.postComments =
(postCommentsMap.get(postSlug) ?? []).map(postComment => {
const {
commentId,
commenterPhoneNumber,
commenterProfileImageUrl,
} = postComment
const result2 = { ...postComment }
result2.commentImages = postCommentImagesMap.get(commentId) ?? []
result2.postCommentCoins = postCommentCoinsMap.get(commentId) ?? []
result2.commenterProfileImageUrl =
profileMap.get(commenterPhoneNumber)?.profileImageUrl
?? commenterProfileImageUrl
result2.commentComments =
(commentCommentsMap.get(commentId) ?? []).map(commentComment => {
const {
commentId,
commenterPhoneNumber,
commenterProfileImageUrl,
} = commentComment
const result3 = { ...commentComment }
result3.commentImages =
commentCommentImagesMap.get(commentId) ?? []
result3.commentCommentCoins =
commentCommentCoinsMap.get(commentId) ?? []
result3.commenterProfileImageUrl =
profileMap.get(commenterPhoneNumber)?.profileImageUrl
?? commenterProfileImageUrl
return result3
})
return result2
})
return result
}
addPost(post) {
const { postSlug, communitySlug } = post
const { postMap, communityPostsMap } = this
postMap.set(postSlug, post)
if (communityPostsMap.has(communitySlug)) {
communityPostsMap.get(communitySlug).push(post)
} else {
communityPostsMap.set(communitySlug, [post])
}
}
getByPostSlug(postSlug) {
const { postMap } = this
const post = postMap.get(postSlug)
if (post == null) {
return undefined
}
return this._completePost(post)
}
findByCommunitySlug(communitySlug, options = {}) {
const { communityPostsMap } = this
const {
page = 1,
pageSize = 100,
order = 1, // ASC, -1 for DESC
sortField = 'createTime',
} = options
const startIndex = (page - 1) * pageSize
const stopIndex = page * pageSize
const communityPosts = communityPostsMap.get(communitySlug) ?? []
return (
communityPosts
.map(post => this._completePost(post))
.sort((a, b) => order * (a[sortField] - b[sortField]))
.slice(startIndex, stopIndex)
)
}
getLastCommunityPage(communitySlug, options = {}) {
const { communityPostsMap } = this
const { pageSize = 100 } = options
const communityPosts = communityPostsMap.get(communitySlug) ?? []
return Math.ceil(communityPosts.length / pageSize)
}
}
module.exports = PostCache
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment