as-services/libs/redis-queue-dedup/index.js
devthejo 16b7e7d6aa
All checks were successful
/ build (map[dockerfile:./services/hasura/Dockerfile name:hasura]) (push) Successful in 47s
/ build (map[dockerfile:./services/web/Dockerfile name:web]) (push) Successful in 1m47s
/ build (map[dockerfile:./services/watchers/Dockerfile name:watchers]) (push) Successful in 2m37s
/ build (map[dockerfile:./services/files/Dockerfile name:files]) (push) Successful in 2m52s
/ build (map[dockerfile:./services/api/Dockerfile name:api]) (push) Successful in 3m2s
/ build (map[dockerfile:./services/app/Dockerfile name:app]) (push) Successful in 31s
/ build (map[dockerfile:./services/tasks/Dockerfile name:tasks]) (push) Successful in 2m44s
/ deploy (push) Successful in 48s
chore(init): available sources
2025-04-13 10:46:53 +02:00

104 lines
2.3 KiB
JavaScript

const { setTimeout: sleep } = require("timers/promises")
const murmurhash = require("murmurhash").v3
const deepmerge = require("@modjo/core/utils/object/deepmerge")
const { ctx } = require("@modjo/core")
/*
Goals:
- allow watchers to scale
- allow worker retry while interrupted without duplication
TODO:
- add logging
*/
function hashJsonObjectForRedisKey(jsonObject) {
const serialized = JSON.stringify(jsonObject)
const hash = murmurhash(serialized).toString(36)
return hash
}
function getTimestamp() {
return Math.ceil(Date.now() / 1000)
}
async function recurseDedup(
queueName,
data,
handler,
factoryOptions,
hash,
retryCount = 0
) {
const defaultOptions = {
enabled: false,
okTTL: 900,
waitTTL: 600,
delayMargin: 5,
retryLoop: false,
retryLimit: 10,
}
const handlerOptions = handler.dedupOptions || {}
const options = deepmerge(defaultOptions, factoryOptions, handlerOptions)
const { enabled, okTTL, waitTTL, delayMargin, retryLoop, retryLimit } =
options
if (!enabled) {
return handler(data)
}
const baseKey = `qd:${queueName}:${hash}`
const keyGo = `${baseKey}:go`
const keyOK = `${baseKey}:ok`
// console.log("baseKey", baseKey)
const redis = ctx.require("redisQueueDedup")
const keyOKExists = await redis.exists(keyOK)
if (keyOKExists) {
return null
}
const inserted = await redis.set(keyGo, getTimestamp(), "EX", waitTTL, "NX")
if (!inserted) {
const startedTime = await redis.get(keyGo)
const expires = parseInt(startedTime, 10) + waitTTL
const delay = expires * 1000 - Date.now()
await sleep(delay + delayMargin * 1000)
if (retryLoop && retryCount <= retryLimit) {
return recurseDedup(
queueName,
data,
handler,
options,
hash,
retryCount + 1
)
}
return false
}
const res = await handler(data)
await redis
.pipeline()
.set(keyOK, getTimestamp(), "EX", okTTL)
.del(keyGo)
.exec()
return res
}
async function runWithDedup(queueName, data, handler, options = {}) {
const hash = hashJsonObjectForRedisKey(data)
return recurseDedup(queueName, data, handler, options, hash)
}
module.exports = function redisQueueDedupFactory(handler, q, options) {
return async (data) => {
return runWithDedup(q, data, handler, options)
}
}