Some checks failed
/ build (map[dockerfile:./services/tasks/Dockerfile name:tasks]) (push) Failing after 1m25s
/ deploy (push) Has been skipped
/ build (map[dockerfile:./services/files/Dockerfile name:files]) (push) Failing after 1m34s
/ build (map[dockerfile:./services/app/Dockerfile name:app]) (push) Successful in 2m9s
/ build (map[dockerfile:./services/api/Dockerfile name:api]) (push) Failing after 1m42s
/ build (map[dockerfile:./services/web/Dockerfile name:web]) (push) Failing after 1m18s
/ build (map[dockerfile:./services/watchers/Dockerfile name:watchers]) (push) Failing after 1m22s
/ build (map[dockerfile:./services/hasura/Dockerfile name:hasura]) (push) Successful in 2m3s
104 lines
2.3 KiB
JavaScript
104 lines
2.3 KiB
JavaScript
const { setTimeout: sleep } = require("timers/promises")
|
|
const murmurhash = require("murmurhash").v3
|
|
|
|
const deepmerge = require("@modjo/core/utils/object/deepmerge")
|
|
const { ctx } = require("@modjo/core")
|
|
|
|
/*
|
|
Goals:
|
|
- allow watchers to scale
|
|
- allow worker retry while interrupted without duplication
|
|
|
|
TODO:
|
|
- add logging
|
|
*/
|
|
|
|
function hashJsonObjectForRedisKey(jsonObject) {
|
|
const serialized = JSON.stringify(jsonObject)
|
|
const hash = murmurhash(serialized).toString(36)
|
|
return hash
|
|
}
|
|
|
|
function getTimestamp() {
|
|
return Math.ceil(Date.now() / 1000)
|
|
}
|
|
|
|
async function recurseDedup(
|
|
queueName,
|
|
data,
|
|
handler,
|
|
factoryOptions,
|
|
hash,
|
|
retryCount = 0
|
|
) {
|
|
const defaultOptions = {
|
|
enabled: false,
|
|
okTTL: 900,
|
|
waitTTL: 600,
|
|
delayMargin: 5,
|
|
retryLoop: false,
|
|
retryLimit: 10,
|
|
}
|
|
const handlerOptions = handler.dedupOptions || {}
|
|
const options = deepmerge(defaultOptions, factoryOptions, handlerOptions)
|
|
|
|
const { enabled, okTTL, waitTTL, delayMargin, retryLoop, retryLimit } =
|
|
options
|
|
|
|
if (!enabled) {
|
|
return handler(data)
|
|
}
|
|
|
|
const baseKey = `qd:${queueName}:${hash}`
|
|
const keyGo = `${baseKey}:go`
|
|
const keyOK = `${baseKey}:ok`
|
|
|
|
// console.log("baseKey", baseKey)
|
|
|
|
const redis = ctx.require("redisQueueDedup")
|
|
|
|
const keyOKExists = await redis.exists(keyOK)
|
|
if (keyOKExists) {
|
|
return null
|
|
}
|
|
|
|
const inserted = await redis.set(keyGo, getTimestamp(), "EX", waitTTL, "NX")
|
|
if (!inserted) {
|
|
const startedTime = await redis.get(keyGo)
|
|
const expires = parseInt(startedTime, 10) + waitTTL
|
|
const delay = expires * 1000 - Date.now()
|
|
await sleep(delay + delayMargin * 1000)
|
|
if (retryLoop && retryCount <= retryLimit) {
|
|
return recurseDedup(
|
|
queueName,
|
|
data,
|
|
handler,
|
|
options,
|
|
hash,
|
|
retryCount + 1
|
|
)
|
|
}
|
|
return false
|
|
}
|
|
|
|
const res = await handler(data)
|
|
|
|
await redis
|
|
.pipeline()
|
|
.set(keyOK, getTimestamp(), "EX", okTTL)
|
|
.del(keyGo)
|
|
.exec()
|
|
|
|
return res
|
|
}
|
|
|
|
async function runWithDedup(queueName, data, handler, options = {}) {
|
|
const hash = hashJsonObjectForRedisKey(data)
|
|
return recurseDedup(queueName, data, handler, options, hash)
|
|
}
|
|
|
|
module.exports = function redisQueueDedupFactory(handler, q, options) {
|
|
return async (data) => {
|
|
return runWithDedup(q, data, handler, options)
|
|
}
|
|
}
|