diff --git a/services/watchers/src/watchers/geodata-cleanup-cron.js b/services/watchers/src/watchers/geodata-cleanup-cron.js index 66cbe62..5a034e9 100644 --- a/services/watchers/src/watchers/geodata-cleanup-cron.js +++ b/services/watchers/src/watchers/geodata-cleanup-cron.js @@ -21,61 +21,13 @@ module.exports = async function () { return async function geodataCleanupCron() { logger.info("watcher geodataCleanupCron: daemon started") - // this is temporary function (fixing actual data) - async function cleanupOrphanedHotGeodata() { - // Get all devices from hot storage - const hotDevices = new Set() - let hotCursor = "0" - do { - // Use zscan to iterate through the sorted set - const [newCursor, items] = await redisHot.zscan( - HOTGEODATA_KEY, - hotCursor, - "COUNT", - "100" - ) - hotCursor = newCursor - - // Extract device IDs (every other item in the result is a score) - for (let i = 0; i < items.length; i += 2) { - hotDevices.add(items[i]) - } - } while (hotCursor !== "0") - - // Process each hot device - await async.eachLimit( - [...hotDevices], - MAX_PARALLEL_PROCESS, - async (deviceId) => { - try { - // Check if device exists in cold storage - const coldKey = `${COLDGEODATA_DEVICE_KEY_PREFIX}${deviceId}` - const exists = await redisCold.exists(coldKey) - - // If device doesn't exist in cold storage, remove it from hot storage - if (!exists) { - await redisHot.zrem(HOTGEODATA_KEY, deviceId) - logger.debug( - { deviceId }, - "Removed orphaned device data from hot storage (not found in cold storage)" - ) - } - } catch (error) { - logger.error( - { error, deviceId }, - "Error checking orphaned device data" - ) - } - } - ) - } - - // TODO optimize by removing memory accumulation (cursor iteration to make it scalable) + // Process keys in batches to avoid memory accumulation async function cleanupOldGeodata() { const now = Math.floor(Date.now() / 1000) // Current time in seconds - const coldKeys = new Set() // Store cold geodata keys let coldCursor = "0" + do { + // Get batch of keys using SCAN const [newCursor, keys] = await redisCold.scan( coldCursor, "MATCH", @@ -84,77 +36,77 @@ module.exports = async function () { "100" ) coldCursor = newCursor - keys.forEach((key) => coldKeys.add(key)) - } while (coldCursor !== "0") - await async.eachLimit( - [...coldKeys], - MAX_PARALLEL_PROCESS, - async (key) => { - const deviceId = key.slice(COLDGEODATA_DEVICE_KEY_PREFIX.length) + // Process this batch of keys immediately + if (keys.length > 0) { + await async.eachLimit(keys, MAX_PARALLEL_PROCESS, async (key) => { + const deviceId = key.slice(COLDGEODATA_DEVICE_KEY_PREFIX.length) - try { - // Get device data from cold storage - const deviceData = await redisCold.get(key) - if (!deviceData) { - return - } + try { + // Get device data from cold storage + const deviceData = await redisCold.get(key) + if (!deviceData) { + return + } - // Parse stored JSON to get `updatedAt` - const data = JSON.parse(deviceData) + // Parse stored JSON to get `updatedAt` + const data = JSON.parse(deviceData) - // const age = now - data.updatedAt - const age = data.updatedAt ? now - data.updatedAt : Infinity // trick is for missing updatedAt field on old entries + // const age = now - data.updatedAt + const age = data.updatedAt ? now - data.updatedAt : Infinity // trick is for missing updatedAt field on old entries - // If data is older than maxAge - if (age > maxAge) { - try { - // Remove from hot storage - await redisHot.zrem(HOTGEODATA_KEY, deviceId) - - // Move to cleaned prefix in cold storage - const oldKey = `${COLDGEODATA_OLD_KEY_PREFIX}${deviceId}` - await redisCold.rename(key, oldKey) - - logger.debug( - { deviceId, age: `${Math.floor(age / 3600)}h` }, - "Removed old device data from hot storage and marked as cleaned in cold storage" - ) - - // Enqueue task to notify user about lost background geolocation + // If data is older than maxAge + if (age > maxAge) { try { - await addTask(tasks.BACKGROUND_GEOLOCATION_LOST_NOTIFY, { - deviceId, - }) + // Remove from hot storage + await redisHot.zrem(HOTGEODATA_KEY, deviceId) - logger.info( - { deviceId }, - "Enqueued background geolocation lost notification task" + // Move to cleaned prefix in cold storage + const oldKey = `${COLDGEODATA_OLD_KEY_PREFIX}${deviceId}` + await redisCold.rename(key, oldKey) + + logger.debug( + { deviceId, age: `${Math.floor(age / 3600)}h` }, + "Removed old device data from hot storage and marked as cleaned in cold storage" ) - } catch (notifError) { + + // Enqueue task to notify user about lost background geolocation + try { + await addTask(tasks.BACKGROUND_GEOLOCATION_LOST_NOTIFY, { + deviceId, + }) + + logger.info( + { deviceId }, + "Enqueued background geolocation lost notification task" + ) + } catch (notifError) { + logger.error( + { deviceId, error: notifError }, + "Error enqueueing background geolocation lost notification task" + ) + } + } catch (error) { logger.error( - { deviceId, error: notifError }, - "Error enqueueing background geolocation lost notification task" + { error, deviceId }, + "Error cleaning device data" ) } - } catch (error) { - logger.error({ error, deviceId }, "Error cleaning device data") } + } catch (error) { + logger.error( + { error, key }, + "Error processing device data from cold storage" + ) } - } catch (error) { - logger.error( - { error, key }, - "Error processing device data from cold storage" - ) - } + }) } - ) + } while (coldCursor !== "0") } // Schedule both cleanup functions to run periodically cron.schedule(CLEANUP_CRON, async () => { await cleanupOldGeodata() - await cleanupOrphanedHotGeodata() }) } }