Each super source type and node type creates its own queue and worker:
N super sources × M node types = N×M queues/workers
Example with current sources:
- Linear (LinearTeam → LinearIssue, LinearComment)
- Notion (NotionWorkspace → NotionPage, NotionDatabase)
- GitHub (GithubRepo → GithubIssue, GithubPullRequest)
- Google Drive, Asana, Attio, Hubspot, Git, GitLab, Website...
This results in 20+ queues/workers, and grows with every new source.
- Memory: Each worker consumes memory even when idle
- Complexity: Adding a new source requires no code changes, but it silently spawns yet more queues and workers
- Monitoring: Hard to monitor 20+ queues
- Resource waste: Most queues are idle most of the time
Instead of per-source queues, consolidate into speed tiers based on API rate limits and concurrency needs.
Different sources have different API constraints:
- Fast APIs (Linear, GitHub): Can handle high concurrency (5+)
- Slow APIs (Notion, Google Drive): Need lower concurrency (2) due to rate limits
Using a single queue with groups would require either:
- Grouping without `organizationId` → lose per-org isolation
- Dynamic `setGroupConcurrency` per org → complex and impractical
Speed tiers give us:
- ✅ Per-source concurrency tuning
- ✅ Per-org isolation (via groups within each tier)
- ✅ Fewer queues (6 vs 20+)
- ✅ Simple to add new sources (assign to a tier)
| Queue | Concurrency | Sources |
|---|---|---|
| ImportSuperSourceRootFast | 1 | Linear, GitHub, Git, GitLab |
| ImportSuperSourceRootSlow | 1 | Notion, Google Drive, Attio, Hubspot, Asana |
| IndexSuperSourceNodeFast | 5 | Linear, GitHub, Git, GitLab |
| IndexSuperSourceNodeSlow | 2 | Notion, Google Drive, Attio, Hubspot, Asana |
| HandleSuperWebhookFast | 5 | Linear, GitHub |
| HandleSuperWebhookSlow | 2 | Others |
Total: 6 queues (vs 20+ currently)
// src/workers/utils/super-source-tiers.ts
/**
 * Speed tier for a super source, chosen from its API rate-limit profile.
 * The member value is interpolated into queue names (e.g.
 * `ImportSuperSourceRoot${tier}` → ImportSuperSourceRootFast), so these
 * string values must stay stable across deploys.
 */
export enum SuperSourceTier {
  FAST = 'Fast',
  SLOW = 'Slow',
}
/**
 * Concurrency knobs per tier. These values feed the worker options in the
 * factories below:
 * - importConcurrency: per-org group concurrency for root imports
 * - indexConcurrency: per-org group concurrency for node indexing
 * - webhookConcurrency: plain (ungrouped) worker concurrency for webhooks
 */
export const SUPER_SOURCE_TIER_CONFIG: Record<SuperSourceTier, {
  importConcurrency: number
  indexConcurrency: number
  webhookConcurrency: number
}> = {
  [SuperSourceTier.FAST]: {
    importConcurrency: 1,
    indexConcurrency: 5,
    webhookConcurrency: 5,
  },
  [SuperSourceTier.SLOW]: {
    // Rate-limit-sensitive APIs (Notion, Google Drive, ...) get lower
    // index/webhook concurrency to stay under their limits.
    importConcurrency: 1,
    indexConcurrency: 2,
    webhookConcurrency: 2,
  },
}
// Map each source to a tier.
// FAST = APIs that tolerate high concurrency; SLOW = rate-limit-sensitive APIs.
// Because the record is keyed by SuperSource, the compiler forces every
// source to be assigned a tier.
export const SUPER_SOURCE_TO_TIER: Record<SuperSource, SuperSourceTier> = {
  [SuperSource.Linear]: SuperSourceTier.FAST,
  [SuperSource.GitHub]: SuperSourceTier.FAST,
  [SuperSource.Git]: SuperSourceTier.FAST,
  [SuperSource.GitLab]: SuperSourceTier.FAST,
  [SuperSource.Notion]: SuperSourceTier.SLOW,
  [SuperSource.GoogleDrive]: SuperSourceTier.SLOW,
  [SuperSource.Attio]: SuperSourceTier.SLOW,
  [SuperSource.Hubspot]: SuperSourceTier.SLOW,
  [SuperSource.Asana]: SuperSourceTier.SLOW,
  [SuperSource.Website]: SuperSourceTier.SLOW,
  // Add new sources here
}
/**
 * Resolves the speed tier for a source. Falls back to SLOW — the
 * conservative choice — for any source missing from the map at runtime.
 */
export function getTierForSource(source: SuperSource): SuperSourceTier {
  const mapped = SUPER_SOURCE_TO_TIER[source]
  return mapped === undefined ? SuperSourceTier.SLOW : mapped
}
// src/workers/utils/queue.ts
// Payload for a root-node import job. Uniqueness is enforced on
// (organizationId, rootNodeConnectionId, rootNodeId) by the queue config below.
type SuperImportPayload = {
  superSource: SuperSource
  superRootNodeType: SuperRootNodeType
  organizationId: string
  rootNodeId: string
  rootNodeConnectionId: string
  // Presumably set on the first import of a connection — confirm with callers.
  firstRun?: boolean
  // Opaque resume state threaded into SystemImportsSuperRootNode.
  progress?: Record<string, unknown>
}
/**
 * Builds the root-import queue/worker pair for one speed tier
 * (queue name `ImportSuperSourceRoot${tier}`, e.g. ImportSuperSourceRootFast).
 *
 * NOTE(review): `services` appears unused in this factory — presumably kept
 * for signature parity with the other factories; confirm before removing.
 */
export function makeSuperImportWorkerAndQueue(
  services: QueueServiceDependencies,
  tier: SuperSourceTier
) {
  const tierConfig = SUPER_SOURCE_TIER_CONFIG[tier]
  return makeSliteWorkerAndQueue<void, SuperImportPayload>({
    queueName: `ImportSuperSourceRoot${tier}`,
    factoryOptions: {
      defaultQueueOptions: {
        // At most one queued job per (org, connection, root node).
        unique: {
          byKeys: ['organizationId', 'rootNodeConnectionId', 'rootNodeId'],
        },
        removeOnFail: false, // keep failed jobs around for inspection
        retryStrategy: {
          attempts: 5,
          backoff: { type: 'custom' }, // resolved by externalServiceBackoffStrategy below
        },
        // Per-org isolation within the tier.
        group: { byKeys: ['organizationId'] },
      },
      defaultWorkerOptions: {
        group: {
          // Per-group (per-org) concurrency from the tier config.
          concurrency: tierConfig.importConcurrency,
        },
        settings: {
          backoffStrategy: externalServiceBackoffStrategy,
        },
      },
    },
    async processor(processorServices, job, token) {
      const {
        superSource,
        superRootNodeType,
        rootNodeId,
        rootNodeConnectionId,
        organizationId,
        progress,
        firstRun,
      } = job.data
      // Check rate limit before doing any work; if the connector is
      // throttled, park the job until the limit resets.
      const resetAt = await checkRateLimitAndGetResetAt({
        services: setupServicesForSuperSourceConnector(processorServices),
        connectionId: rootNodeConnectionId,
        source: superSource,
      })
      if (resetAt !== null) {
        await job.moveToDelayed(resetAt.getTime(), token)
        // DelayedError signals "parked", not "failed", to the worker.
        throw new DelayedError('Rate limit exceeded, retry later')
      }
      try {
        await SystemImportsSuperRootNode(processorServices, {
          superSource,
          superRootNodeType,
          organizationId,
          rootNodeId,
          rootNodeConnectionId,
          progress,
          firstRun,
          job,
        })
      } catch (error) {
        // Mid-run throttling: reschedule after the advertised delay.
        if (error instanceof RateLimitExceededError) {
          await job.moveToDelayed(Date.now() + error.retryAfterMs, token)
          throw new DelayedError('Rate limit exceeded, retry later')
        }
        throw error
      }
    },
  })
}

// Payload for indexing a single node fetched from a super source.
// Uniqueness adds nodeId on top of the import-job key.
type SuperIndexPayload = {
  superSource: SuperSource
  superRootNodeType: SuperRootNodeType
  superIndexableNodeType: SuperIndexableNodeType
  organizationId: string
  rootNodeId: string
  rootNodeConnectionId: string
  nodeId: string
  node: SuperSourceIndexableNodeWithFragmentId
  // ISO string — serialized across the queue; revived to Date in the processor.
  synchronizationDate?: string
}
/**
 * Builds the node-indexing queue/worker pair for one speed tier
 * (queue name `IndexSuperSourceNode${tier}`, e.g. IndexSuperSourceNodeFast).
 *
 * Mirrors makeSuperImportWorkerAndQueue: jobs are deduplicated per node,
 * grouped by organization for per-org isolation, and parked via
 * moveToDelayed when the connector is rate limited.
 *
 * NOTE(review): `services` appears unused here — presumably kept for
 * signature parity with the other factories; confirm before removing.
 */
export function makeSuperIndexWorkerAndQueue(
  services: QueueServiceDependencies,
  tier: SuperSourceTier
) {
  const tierConfig = SUPER_SOURCE_TIER_CONFIG[tier]
  return makeSliteWorkerAndQueue<void, SuperIndexPayload>({
    queueName: `IndexSuperSourceNode${tier}`,
    factoryOptions: {
      defaultQueueOptions: {
        // At most one queued job per node within a connection.
        unique: {
          byKeys: ['organizationId', 'rootNodeConnectionId', 'rootNodeId', 'nodeId'],
        },
        removeOnFail: false, // keep failed jobs around for inspection
        retryStrategy: {
          attempts: 5,
          backoff: { type: 'custom' }, // resolved by externalServiceBackoffStrategy below
        },
        // Per-org isolation within the tier.
        group: { byKeys: ['organizationId'] },
      },
      defaultWorkerOptions: {
        group: {
          // Per-group (per-org) concurrency from the tier config.
          concurrency: tierConfig.indexConcurrency,
        },
        settings: {
          backoffStrategy: externalServiceBackoffStrategy,
        },
      },
    },
    async processor(processorServices, job, token) {
      // Note: superRootNodeType is present in the payload but not needed here.
      const {
        superSource,
        superIndexableNodeType,
        synchronizationDate: synchronizationDateStr,
        rootNodeId,
        rootNodeConnectionId,
        node,
        organizationId,
      } = job.data
      // Dates cross the queue as ISO strings; revive before use.
      const synchronizationDate = synchronizationDateStr
        ? new Date(synchronizationDateStr)
        : undefined
      // Check rate limit before doing any work; if the connector is
      // throttled, park the job until the limit resets.
      const resetAt = await checkRateLimitAndGetResetAt({
        services: setupServicesForSuperSourceConnector(processorServices),
        connectionId: rootNodeConnectionId,
        source: superSource,
      })
      // Explicit null check for consistency with the import worker (the
      // previous truthiness check behaved identically for Date | null).
      if (resetAt !== null) {
        await job.moveToDelayed(resetAt.getTime(), token)
        // DelayedError signals "parked", not "failed", to the worker.
        throw new DelayedError('Rate limit exceeded, retry later')
      }
      try {
        const reindexed = await SystemIndexesSuperIndexableNode(processorServices, {
          superSource,
          superIndexableNodeType,
          organizationId,
          rootNodeId,
          connectionId: rootNodeConnectionId,
          node,
          synchronizationDate,
        })
        // Distinguish real indexation work from a no-op (already indexed).
        if (reindexed) {
          processorServices.metrics.increment(
            `super.${slugifySuperSource(superSource)}.indexation`
          )
        } else {
          processorServices.metrics.increment(
            `super.${slugifySuperSource(superSource)}.already_indexed`
          )
        }
      } catch (error) {
        // Mid-run throttling: reschedule after the advertised delay.
        if (error instanceof RateLimitExceededError) {
          await job.moveToDelayed(Date.now() + error.retryAfterMs, token)
          throw new DelayedError('Rate limit exceeded, retry later')
        }
        throw error
      }
    },
  })
}

// Payload for handling one webhook delivery from an external source.
type SuperWebhookPayload = {
  superSource: SuperSource
  // Path suffix the webhook arrived on; used to select a processor below.
  webhookPath: string | undefined
  data: SuperSourceWebhookOnReceiveOutput
}
/**
 * Builds the webhook-handling queue/worker pair for one speed tier
 * (queue name `HandleSuperWebhook${tier}`, e.g. HandleSuperWebhookFast).
 *
 * Unlike the import/index factories, this queue has no `group` key, so
 * there is no per-org isolation here — concurrency is a flat per-worker
 * setting from the tier config.
 *
 * NOTE(review): `services` appears unused in this factory — presumably kept
 * for signature parity with the other factories; confirm before removing.
 */
export function makeSuperWebhookWorkerAndQueue(
  services: QueueServiceDependencies,
  tier: SuperSourceTier
) {
  const tierConfig = SUPER_SOURCE_TIER_CONFIG[tier]
  return makeSliteWorkerAndQueue<void, SuperWebhookPayload>({
    queueName: `HandleSuperWebhook${tier}`,
    factoryOptions: {
      defaultQueueOptions: {
        removeOnFail: false, // keep failed jobs around for inspection
        retryStrategy: {
          attempts: 5,
          backoff: { type: 'custom' }, // resolved by externalServiceBackoffStrategy below
        },
      },
      defaultWorkerOptions: {
        concurrency: tierConfig.webhookConcurrency,
        settings: {
          backoffStrategy: externalServiceBackoffStrategy,
        },
      },
    },
    async processor(processorServices, job, token) {
      const { superSource, webhookPath, data } = job.data
      // Look up the source's webhook processor by path. A missing processor
      // is logged and swallowed (not retried) — the job completes.
      const superSourceConfig = SuperSources[superSource]
      const webhook = superSourceConfig?.webhooks?.find(w => w.path === webhookPath)
      if (!webhook?.processor) {
        processorServices.logger.warn(
          `No webhook processor found for ${superSource}/${webhookPath}`
        )
        return
      }
      try {
        // The processor may return indexation params for follow-up work.
        const indexationParamsToSchedule = await webhook.processor(
          setupServicesForSuperSourceConnector(processorServices),
          data
        )
        if (indexationParamsToSchedule) {
          await scheduleNodeIndexations(processorServices, {
            indexationParamsToSchedule,
            superSource,
          })
        }
      } catch (error) {
        // Throttled while processing: reschedule after the advertised delay.
        if (error instanceof RateLimitExceededError) {
          await job.moveToDelayed(Date.now() + error.retryAfterMs, token)
          throw new DelayedError('Rate limit exceeded, retry later')
        }
        throw error
      }
    },
  })
}
// src/services/superQueues.ts
/**
 * Creates the six tier-scoped queue/worker pairs (import/index/webhook ×
 * FAST/SLOW) and returns them keyed by tier, plus helpers that resolve the
 * right queue for a given source.
 *
 * The helpers capture the queue maps in closures instead of reading
 * `this.import[...]` — object-literal methods that rely on `this` break
 * when destructured off the returned object, while closures remain safe.
 */
export function setupSuperQueues(services: QueueServiceDependencies) {
  // Create queues for each tier
  const importQueues = {
    [SuperSourceTier.FAST]: makeSuperImportWorkerAndQueue(services, SuperSourceTier.FAST),
    [SuperSourceTier.SLOW]: makeSuperImportWorkerAndQueue(services, SuperSourceTier.SLOW),
  }
  const indexQueues = {
    [SuperSourceTier.FAST]: makeSuperIndexWorkerAndQueue(services, SuperSourceTier.FAST),
    [SuperSourceTier.SLOW]: makeSuperIndexWorkerAndQueue(services, SuperSourceTier.SLOW),
  }
  const webhookQueues = {
    [SuperSourceTier.FAST]: makeSuperWebhookWorkerAndQueue(services, SuperSourceTier.FAST),
    [SuperSourceTier.SLOW]: makeSuperWebhookWorkerAndQueue(services, SuperSourceTier.SLOW),
  }
  return {
    import: importQueues,
    index: indexQueues,
    webhook: webhookQueues,
    // Helpers to get the right queue for a source.
    getImportQueue(source: SuperSource) {
      return importQueues[getTierForSource(source)]
    },
    getIndexQueue(source: SuperSource) {
      return indexQueues[getTierForSource(source)]
    },
    getWebhookQueue(source: SuperSource) {
      return webhookQueues[getTierForSource(source)]
    },
  }
}
// When scheduling an import
// Option 1: resolve the tier explicitly, then schedule on that tier's queue.
const tier = getTierForSource(superSource)
await services.superQueues.import[tier].queue.schedule({
  superSource,
  superRootNodeType,
  organizationId,
  rootNodeId,
  rootNodeConnectionId,
  firstRun,
})
// Or using the helper
// Option 2: equivalent — the helper resolves the tier internally.
await services.superQueues.getImportQueue(superSource).queue.schedule({
  superSource,
  superRootNodeType,
  organizationId,
  rootNodeId,
  rootNodeConnectionId,
  firstRun,
})
- Create the tier configuration
- Add the 6 new unified queues alongside existing ones
- Deploy and verify queues are created
- Update scheduling code to write to both old and new queues
- Monitor both sets of queues
- Verify jobs complete successfully in new queues
- Stop writing to old queues
- Wait for old queues to drain
- Monitor new queues exclusively
- Remove old queue definitions
- Remove old worker code
- Update documentation
| Metric | Before | After |
|---|---|---|
| Number of queues | 20+ | 6 |
| Number of workers | 20+ | 6 |
| Memory overhead | High | Low |
| Adding new source | New workers | Assign to tier |
| Per-source concurrency | Yes | Yes (via tiers) |
| Per-org isolation | Yes | Yes (via groups) |
| Monitoring complexity | High | Low |
- Add the source to `SUPER_SOURCE_TO_TIER`:
  `[SuperSource.NewSource]: SuperSourceTier.SLOW, // or FAST`
- That's it. No new queues or workers needed.
- Add a MEDIUM tier if needed for sources with moderate rate limits
- Dynamic tier assignment based on runtime metrics
- Per-source rate limiting using BullMQ Pro's `setGroupRateLimit` if needed