fix: opti cleanup geodata
All checks were successful
/ build (map[dockerfile:./services/api/Dockerfile name:api]) (push) Successful in 2m41s
/ build (map[dockerfile:./services/web/Dockerfile name:web]) (push) Successful in 1m50s
/ build (map[dockerfile:./services/tasks/Dockerfile name:tasks]) (push) Successful in 2m24s
/ build (map[dockerfile:./services/app/Dockerfile name:app]) (push) Successful in 45s
/ build (map[dockerfile:./services/hasura/Dockerfile name:hasura]) (push) Successful in 1m3s
/ build (map[dockerfile:./services/watchers/Dockerfile name:watchers]) (push) Successful in 2m10s
/ build (map[dockerfile:./services/files/Dockerfile name:files]) (push) Successful in 2m31s
/ deploy (push) Successful in 14s
parent dc351b2ed6
commit f63766314b
1 changed file with 55 additions and 103 deletions
@@ -21,61 +21,13 @@ module.exports = async function () {
   return async function geodataCleanupCron() {
     logger.info("watcher geodataCleanupCron: daemon started")
 
-    // this is temporary function (fixing actual data)
-    async function cleanupOrphanedHotGeodata() {
-      // Get all devices from hot storage
-      const hotDevices = new Set()
-      let hotCursor = "0"
-      do {
-        // Use zscan to iterate through the sorted set
-        const [newCursor, items] = await redisHot.zscan(
-          HOTGEODATA_KEY,
-          hotCursor,
-          "COUNT",
-          "100"
-        )
-        hotCursor = newCursor
-
-        // Extract device IDs (every other item in the result is a score)
-        for (let i = 0; i < items.length; i += 2) {
-          hotDevices.add(items[i])
-        }
-      } while (hotCursor !== "0")
-
-      // Process each hot device
-      await async.eachLimit(
-        [...hotDevices],
-        MAX_PARALLEL_PROCESS,
-        async (deviceId) => {
-          try {
-            // Check if device exists in cold storage
-            const coldKey = `${COLDGEODATA_DEVICE_KEY_PREFIX}${deviceId}`
-            const exists = await redisCold.exists(coldKey)
-
-            // If device doesn't exist in cold storage, remove it from hot storage
-            if (!exists) {
-              await redisHot.zrem(HOTGEODATA_KEY, deviceId)
-              logger.debug(
-                { deviceId },
-                "Removed orphaned device data from hot storage (not found in cold storage)"
-              )
-            }
-          } catch (error) {
-            logger.error(
-              { error, deviceId },
-              "Error checking orphaned device data"
-            )
-          }
-        }
-      )
-    }
-
-    // TODO optimize by removing memory accumulation (cursor iteration to make it scalable)
+    // Process keys in batches to avoid memory accumulation
     async function cleanupOldGeodata() {
       const now = Math.floor(Date.now() / 1000) // Current time in seconds
-      const coldKeys = new Set() // Store cold geodata keys
       let coldCursor = "0"
-
       do {
+        // Get batch of keys using SCAN
         const [newCursor, keys] = await redisCold.scan(
           coldCursor,
           "MATCH",
@@ -84,77 +36,77 @@ module.exports = async function () {
           "100"
         )
         coldCursor = newCursor
-        keys.forEach((key) => coldKeys.add(key))
-      } while (coldCursor !== "0")
-
-      await async.eachLimit(
-        [...coldKeys],
-        MAX_PARALLEL_PROCESS,
-        async (key) => {
+
+        // Process this batch of keys immediately
+        if (keys.length > 0) {
+          await async.eachLimit(keys, MAX_PARALLEL_PROCESS, async (key) => {
             const deviceId = key.slice(COLDGEODATA_DEVICE_KEY_PREFIX.length)
 
             try {
               // Get device data from cold storage
               const deviceData = await redisCold.get(key)
               if (!deviceData) {
                 return
               }
 
               // Parse stored JSON to get `updatedAt`
               const data = JSON.parse(deviceData)
 
               // const age = now - data.updatedAt
               const age = data.updatedAt ? now - data.updatedAt : Infinity // trick is for missing updatedAt field on old entries
 
               // If data is older than maxAge
               if (age > maxAge) {
                 try {
                   // Remove from hot storage
                   await redisHot.zrem(HOTGEODATA_KEY, deviceId)
 
                   // Move to cleaned prefix in cold storage
                   const oldKey = `${COLDGEODATA_OLD_KEY_PREFIX}${deviceId}`
                   await redisCold.rename(key, oldKey)
 
                   logger.debug(
                     { deviceId, age: `${Math.floor(age / 3600)}h` },
                     "Removed old device data from hot storage and marked as cleaned in cold storage"
                   )
 
                   // Enqueue task to notify user about lost background geolocation
                   try {
                     await addTask(tasks.BACKGROUND_GEOLOCATION_LOST_NOTIFY, {
                       deviceId,
                     })
 
                     logger.info(
                       { deviceId },
                       "Enqueued background geolocation lost notification task"
                     )
                   } catch (notifError) {
                     logger.error(
                       { deviceId, error: notifError },
                       "Error enqueueing background geolocation lost notification task"
                     )
                   }
                 } catch (error) {
-                  logger.error({ error, deviceId }, "Error cleaning device data")
+                  logger.error(
+                    { error, deviceId },
+                    "Error cleaning device data"
+                  )
                 }
               }
             } catch (error) {
               logger.error(
                 { error, key },
                 "Error processing device data from cold storage"
               )
             }
-        }
-      )
+          })
+        }
+      } while (coldCursor !== "0")
     }
 
     // Schedule both cleanup functions to run periodically
     cron.schedule(CLEANUP_CRON, async () => {
       await cleanupOldGeodata()
-      await cleanupOrphanedHotGeodata()
     })
   }
 }
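For reference, the batching pattern the rewritten cleanupOldGeodata follows can be isolated as a small standalone sketch: each SCAN batch is handled immediately instead of accumulating every key in a Set first. The sketch below assumes ioredis as the Redis client and the async library for bounded concurrency; KEY_PREFIX, MAX_PARALLEL and handleKey are illustrative names, not identifiers from this repository.

// Sketch only: iterate keys with SCAN and handle each batch before fetching
// the next one, so memory stays bounded by the batch size plus in-flight work.
const Redis = require("ioredis")
const async = require("async")

const redis = new Redis()
const KEY_PREFIX = "geodata:device:" // hypothetical prefix, not the real one
const MAX_PARALLEL = 10

async function scanInBatches(handleKey) {
  let cursor = "0"
  do {
    // SCAN returns [nextCursor, keys]; COUNT is only a hint for the batch size
    const [nextCursor, keys] = await redis.scan(
      cursor,
      "MATCH",
      `${KEY_PREFIX}*`,
      "COUNT",
      "100"
    )
    cursor = nextCursor

    // Process this batch with bounded concurrency before scanning further
    if (keys.length > 0) {
      await async.eachLimit(keys, MAX_PARALLEL, handleKey)
    }
  } while (cursor !== "0") // SCAN is complete when the cursor returns to "0"
}

// Example usage: scanInBatches(async (key) => { /* read, check age, clean up */ })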