From 74430f63bac6c9be99ee41e8e9296156a401c79b Mon Sep 17 00:00:00 2001 From: devthejo Date: Wed, 23 Jul 2025 14:01:57 +0200 Subject: [PATCH] fix: geodata cleanup cron decouple notification and cleanup --- services/watchers/src/constants/time.js | 3 +- .../src/watchers/geodata-cleanup-cron.js | 91 ++++++++++++------- 2 files changed, 59 insertions(+), 35 deletions(-) diff --git a/services/watchers/src/constants/time.js b/services/watchers/src/constants/time.js index 94c5006..30a27ba 100644 --- a/services/watchers/src/constants/time.js +++ b/services/watchers/src/constants/time.js @@ -8,7 +8,8 @@ module.exports = { SCAN_AUTO_CLOSE_CRON: "15 * * * *", // At minute 15 SCAN_AUTO_ARCHIVE_CRON: "0 4 * * *", // At 4:00 RELATIVE_UNREGISTERED_RECONCILIATION_CRON: "0 4 * * *", // At 4:00 - DEVICE_GEODATA_MAX_AGE: "36 hours", // Maximum age before removal from hot storage + DEVICE_GEODATA_NOTIFICATION_AGE: "36 hours", // When to send push notification + DEVICE_GEODATA_CLEANUP_AGE: "48 hours", // When to remove/clean data } // cheat on https://crontab.guru/ diff --git a/services/watchers/src/watchers/geodata-cleanup-cron.js b/services/watchers/src/watchers/geodata-cleanup-cron.js index 30d4a16..58441c0 100644 --- a/services/watchers/src/watchers/geodata-cleanup-cron.js +++ b/services/watchers/src/watchers/geodata-cleanup-cron.js @@ -2,15 +2,20 @@ const async = require("async") const { ctx } = require("@modjo/core") const ms = require("ms") const cron = require("~/libs/cron") -const { DEVICE_GEODATA_MAX_AGE } = require("~/constants/time") +const { + DEVICE_GEODATA_NOTIFICATION_AGE, + DEVICE_GEODATA_CLEANUP_AGE, +} = require("~/constants/time") const tasks = require("~/tasks") const CLEANUP_CRON = "0 9-19 * * *" // Run every hour from 9h to 19h const MAX_PARALLEL_PROCESS = 10 const COLDGEODATA_DEVICE_KEY_PREFIX = "device:geodata:" const COLDGEODATA_OLD_KEY_PREFIX = "old:device:geodata:" +const COLDGEODATA_NOTIFIED_KEY_PREFIX = "notified:device:geodata:" const HOTGEODATA_KEY = "device" // The key where hot geodata is stored -const maxAge = Math.floor(ms(DEVICE_GEODATA_MAX_AGE) / 1000) // Convert to seconds +const notificationAge = Math.floor(ms(DEVICE_GEODATA_NOTIFICATION_AGE) / 1000) // Convert to seconds +const cleanupAge = Math.floor(ms(DEVICE_GEODATA_CLEANUP_AGE) / 1000) // Convert to seconds module.exports = async function () { const logger = ctx.require("logger") @@ -21,8 +26,8 @@ module.exports = async function () { return async function geodataCleanupCron() { logger.info("watcher geodataCleanupCron: daemon started") - // Process keys in batches to avoid memory accumulation - async function cleanupOldGeodata() { + // Process geodata cleanup with single loop for both notifications and cleanup + async function processGeodataCleanup() { const now = Math.floor(Date.now() / 1000) // Current time in seconds let coldCursor = "0" @@ -51,41 +56,27 @@ module.exports = async function () { // Parse stored JSON to get `updatedAt` const data = JSON.parse(deviceData) + const age = data.updatedAt ? now - data.updatedAt : Infinity - // const age = now - data.updatedAt - const age = data.updatedAt ? now - data.updatedAt : Infinity // trick is for missing updatedAt field on old entries - - // If data is older than maxAge - if (age > maxAge) { + // Handle cleanup first (48h+) - this takes priority + if (age > cleanupAge) { try { // Remove from hot storage await redisHot.zrem(HOTGEODATA_KEY, deviceId) - // Move to cleaned prefix in cold storage + // Move to cleaned prefix in cold storage and clean notification flag atomically const oldKey = `${COLDGEODATA_OLD_KEY_PREFIX}${deviceId}` - await redisCold.rename(key, oldKey) + const notifiedKey = `${COLDGEODATA_NOTIFIED_KEY_PREFIX}${deviceId}` + + const transaction = redisCold.multi() + transaction.rename(key, oldKey) + transaction.del(notifiedKey) + await transaction.exec() logger.debug( { deviceId, age: `${Math.floor(age / 3600)}h` }, "Removed old device data from hot storage and marked as cleaned in cold storage" ) - - // Enqueue task to notify user about lost background geolocation - try { - await addTask(tasks.BACKGROUND_GEOLOCATION_LOST_NOTIFY, { - deviceId, - }) - - logger.info( - { deviceId }, - "Enqueued background geolocation lost notification task" - ) - } catch (notifError) { - logger.error( - { deviceId, error: notifError }, - "Error enqueueing background geolocation lost notification task" - ) - } } catch (error) { logger.error( { error, deviceId }, @@ -93,11 +84,43 @@ module.exports = async function () { ) } } + // Handle notification (36h+ but less than 48h) + else if (age > notificationAge) { + const notifiedKey = `${COLDGEODATA_NOTIFIED_KEY_PREFIX}${deviceId}` + + try { + // Check if we've already notified for this device + const alreadyNotified = await redisCold.exists(notifiedKey) + + if (!alreadyNotified) { + // Enqueue task to notify user about lost background geolocation + try { + await addTask(tasks.BACKGROUND_GEOLOCATION_LOST_NOTIFY, { + deviceId, + }) + + logger.info( + { deviceId, age: `${Math.floor(age / 3600)}h` }, + "Enqueued background geolocation lost notification task" + ) + } catch (notifError) { + logger.error( + { deviceId, error: notifError }, + "Error enqueueing background geolocation lost notification task" + ) + } + // Mark as notified with 48h expiry (cleanup age) + await redisCold.set(notifiedKey, "1", "EX", cleanupAge) + } + } catch (error) { + logger.error( + { error, deviceId }, + "Error processing notification for device" + ) + } + } } catch (error) { - logger.error( - { error, key }, - "Error processing device data from cold storage" - ) + logger.error({ error, key }, "Error processing device data") } }) } @@ -153,9 +176,9 @@ module.exports = async function () { // ) // } - // Schedule both cleanup functions to run periodically + // Schedule cleanup function to run periodically cron.schedule(CLEANUP_CRON, async () => { - await cleanupOldGeodata() + await processGeodataCleanup() // await cleanupOrphanedHotGeodata() }) }