diff --git a/services/watchers/src/watchers/geodata-cleanup-cron.js b/services/watchers/src/watchers/geodata-cleanup-cron.js index 5a034e9..68c5f3b 100644 --- a/services/watchers/src/watchers/geodata-cleanup-cron.js +++ b/services/watchers/src/watchers/geodata-cleanup-cron.js @@ -104,9 +104,59 @@ module.exports = async function () { } while (coldCursor !== "0") } + // this is temporary function (fixing actual data) + async function cleanupOrphanedHotGeodata() { + // Get all devices from hot storage + const hotDevices = new Set() + let hotCursor = "0" + do { + // Use zscan to iterate through the sorted set + const [newCursor, items] = await redisHot.zscan( + HOTGEODATA_KEY, + hotCursor, + "COUNT", + "100" + ) + hotCursor = newCursor + + // Extract device IDs (every other item in the result is a score) + for (let i = 0; i < items.length; i += 2) { + hotDevices.add(items[i]) + } + } while (hotCursor !== "0") + + // Process each hot device + await async.eachLimit( + [...hotDevices], + MAX_PARALLEL_PROCESS, + async (deviceId) => { + try { + // Check if device exists in cold storage + const coldKey = `${COLDGEODATA_DEVICE_KEY_PREFIX}${deviceId}` + const exists = await redisCold.exists(coldKey) + + // If device doesn't exist in cold storage, remove it from hot storage + if (!exists) { + await redisHot.zrem(HOTGEODATA_KEY, deviceId) + logger.debug( + { deviceId }, + "Removed orphaned device data from hot storage (not found in cold storage)" + ) + } + } catch (error) { + logger.error( + { error, deviceId }, + "Error checking orphaned device data" + ) + } + } + ) + } + // Schedule both cleanup functions to run periodically cron.schedule(CLEANUP_CRON, async () => { await cleanupOldGeodata() + await cleanupOrphanedHotGeodata() }) } }