fix: geodata cleanup cron decouple notification and cleanup
This commit is contained in:
		
							parent
							
								
									a0cbf8ff4b
								
							
						
					
					
						commit
						74430f63ba
					
				
					 2 changed files with 59 additions and 35 deletions
				
			
		|  | @ -8,7 +8,8 @@ module.exports = { | ||||||
|   SCAN_AUTO_CLOSE_CRON: "15 * * * *", // At minute 15
 |   SCAN_AUTO_CLOSE_CRON: "15 * * * *", // At minute 15
 | ||||||
|   SCAN_AUTO_ARCHIVE_CRON: "0 4 * * *", // At 4:00
 |   SCAN_AUTO_ARCHIVE_CRON: "0 4 * * *", // At 4:00
 | ||||||
|   RELATIVE_UNREGISTERED_RECONCILIATION_CRON: "0 4 * * *", // At 4:00
 |   RELATIVE_UNREGISTERED_RECONCILIATION_CRON: "0 4 * * *", // At 4:00
 | ||||||
|   DEVICE_GEODATA_MAX_AGE: "36 hours", // Maximum age before removal from hot storage
 |   DEVICE_GEODATA_NOTIFICATION_AGE: "36 hours", // When to send push notification
 | ||||||
|  |   DEVICE_GEODATA_CLEANUP_AGE: "48 hours", // When to remove/clean data
 | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // cheat on https://crontab.guru/
 | // cheat on https://crontab.guru/
 | ||||||
|  |  | ||||||
|  | @ -2,15 +2,20 @@ const async = require("async") | ||||||
| const { ctx } = require("@modjo/core") | const { ctx } = require("@modjo/core") | ||||||
| const ms = require("ms") | const ms = require("ms") | ||||||
| const cron = require("~/libs/cron") | const cron = require("~/libs/cron") | ||||||
| const { DEVICE_GEODATA_MAX_AGE } = require("~/constants/time") | const { | ||||||
|  |   DEVICE_GEODATA_NOTIFICATION_AGE, | ||||||
|  |   DEVICE_GEODATA_CLEANUP_AGE, | ||||||
|  | } = require("~/constants/time") | ||||||
| const tasks = require("~/tasks") | const tasks = require("~/tasks") | ||||||
| 
 | 
 | ||||||
| const CLEANUP_CRON = "0 9-19 * * *" // Run every hour from 9h to 19h
 | const CLEANUP_CRON = "0 9-19 * * *" // Run every hour from 9h to 19h
 | ||||||
| const MAX_PARALLEL_PROCESS = 10 | const MAX_PARALLEL_PROCESS = 10 | ||||||
| const COLDGEODATA_DEVICE_KEY_PREFIX = "device:geodata:" | const COLDGEODATA_DEVICE_KEY_PREFIX = "device:geodata:" | ||||||
| const COLDGEODATA_OLD_KEY_PREFIX = "old:device:geodata:" | const COLDGEODATA_OLD_KEY_PREFIX = "old:device:geodata:" | ||||||
|  | const COLDGEODATA_NOTIFIED_KEY_PREFIX = "notified:device:geodata:" | ||||||
| const HOTGEODATA_KEY = "device" // The key where hot geodata is stored
 | const HOTGEODATA_KEY = "device" // The key where hot geodata is stored
 | ||||||
| const maxAge = Math.floor(ms(DEVICE_GEODATA_MAX_AGE) / 1000) // Convert to seconds
 | const notificationAge = Math.floor(ms(DEVICE_GEODATA_NOTIFICATION_AGE) / 1000) // Convert to seconds
 | ||||||
|  | const cleanupAge = Math.floor(ms(DEVICE_GEODATA_CLEANUP_AGE) / 1000) // Convert to seconds
 | ||||||
| 
 | 
 | ||||||
| module.exports = async function () { | module.exports = async function () { | ||||||
|   const logger = ctx.require("logger") |   const logger = ctx.require("logger") | ||||||
|  | @ -21,8 +26,8 @@ module.exports = async function () { | ||||||
|   return async function geodataCleanupCron() { |   return async function geodataCleanupCron() { | ||||||
|     logger.info("watcher geodataCleanupCron: daemon started") |     logger.info("watcher geodataCleanupCron: daemon started") | ||||||
| 
 | 
 | ||||||
|     // Process keys in batches to avoid memory accumulation
 |     // Process geodata cleanup with single loop for both notifications and cleanup
 | ||||||
|     async function cleanupOldGeodata() { |     async function processGeodataCleanup() { | ||||||
|       const now = Math.floor(Date.now() / 1000) // Current time in seconds
 |       const now = Math.floor(Date.now() / 1000) // Current time in seconds
 | ||||||
|       let coldCursor = "0" |       let coldCursor = "0" | ||||||
| 
 | 
 | ||||||
|  | @ -51,25 +56,43 @@ module.exports = async function () { | ||||||
| 
 | 
 | ||||||
|               // Parse stored JSON to get `updatedAt`
 |               // Parse stored JSON to get `updatedAt`
 | ||||||
|               const data = JSON.parse(deviceData) |               const data = JSON.parse(deviceData) | ||||||
|  |               const age = data.updatedAt ? now - data.updatedAt : Infinity | ||||||
| 
 | 
 | ||||||
|               // const age = now - data.updatedAt
 |               // Handle cleanup first (48h+) - this takes priority
 | ||||||
|               const age = data.updatedAt ? now - data.updatedAt : Infinity // trick is for missing updatedAt field on old entries
 |               if (age > cleanupAge) { | ||||||
| 
 |  | ||||||
|               // If data is older than maxAge
 |  | ||||||
|               if (age > maxAge) { |  | ||||||
|                 try { |                 try { | ||||||
|                   // Remove from hot storage
 |                   // Remove from hot storage
 | ||||||
|                   await redisHot.zrem(HOTGEODATA_KEY, deviceId) |                   await redisHot.zrem(HOTGEODATA_KEY, deviceId) | ||||||
| 
 | 
 | ||||||
|                   // Move to cleaned prefix in cold storage
 |                   // Move to cleaned prefix in cold storage and clean notification flag atomically
 | ||||||
|                   const oldKey = `${COLDGEODATA_OLD_KEY_PREFIX}${deviceId}` |                   const oldKey = `${COLDGEODATA_OLD_KEY_PREFIX}${deviceId}` | ||||||
|                   await redisCold.rename(key, oldKey) |                   const notifiedKey = `${COLDGEODATA_NOTIFIED_KEY_PREFIX}${deviceId}` | ||||||
|  | 
 | ||||||
|  |                   const transaction = redisCold.multi() | ||||||
|  |                   transaction.rename(key, oldKey) | ||||||
|  |                   transaction.del(notifiedKey) | ||||||
|  |                   await transaction.exec() | ||||||
| 
 | 
 | ||||||
|                   logger.debug( |                   logger.debug( | ||||||
|                     { deviceId, age: `${Math.floor(age / 3600)}h` }, |                     { deviceId, age: `${Math.floor(age / 3600)}h` }, | ||||||
|                     "Removed old device data from hot storage and marked as cleaned in cold storage" |                     "Removed old device data from hot storage and marked as cleaned in cold storage" | ||||||
|                   ) |                   ) | ||||||
|  |                 } catch (error) { | ||||||
|  |                   logger.error( | ||||||
|  |                     { error, deviceId }, | ||||||
|  |                     "Error cleaning device data" | ||||||
|  |                   ) | ||||||
|  |                 } | ||||||
|  |               } | ||||||
|  |               // Handle notification (36h+ but less than 48h)
 | ||||||
|  |               else if (age > notificationAge) { | ||||||
|  |                 const notifiedKey = `${COLDGEODATA_NOTIFIED_KEY_PREFIX}${deviceId}` | ||||||
| 
 | 
 | ||||||
|  |                 try { | ||||||
|  |                   // Check if we've already notified for this device
 | ||||||
|  |                   const alreadyNotified = await redisCold.exists(notifiedKey) | ||||||
|  | 
 | ||||||
|  |                   if (!alreadyNotified) { | ||||||
|                     // Enqueue task to notify user about lost background geolocation
 |                     // Enqueue task to notify user about lost background geolocation
 | ||||||
|                     try { |                     try { | ||||||
|                       await addTask(tasks.BACKGROUND_GEOLOCATION_LOST_NOTIFY, { |                       await addTask(tasks.BACKGROUND_GEOLOCATION_LOST_NOTIFY, { | ||||||
|  | @ -77,7 +100,7 @@ module.exports = async function () { | ||||||
|                       }) |                       }) | ||||||
| 
 | 
 | ||||||
|                       logger.info( |                       logger.info( | ||||||
|                       { deviceId }, |                         { deviceId, age: `${Math.floor(age / 3600)}h` }, | ||||||
|                         "Enqueued background geolocation lost notification task" |                         "Enqueued background geolocation lost notification task" | ||||||
|                       ) |                       ) | ||||||
|                     } catch (notifError) { |                     } catch (notifError) { | ||||||
|  | @ -86,18 +109,18 @@ module.exports = async function () { | ||||||
|                         "Error enqueueing background geolocation lost notification task" |                         "Error enqueueing background geolocation lost notification task" | ||||||
|                       ) |                       ) | ||||||
|                     } |                     } | ||||||
|  |                     // Mark as notified with 48h expiry (cleanup age)
 | ||||||
|  |                     await redisCold.set(notifiedKey, "1", "EX", cleanupAge) | ||||||
|  |                   } | ||||||
|                 } catch (error) { |                 } catch (error) { | ||||||
|                   logger.error( |                   logger.error( | ||||||
|                     { error, deviceId }, |                     { error, deviceId }, | ||||||
|                     "Error cleaning device data" |                     "Error processing notification for device" | ||||||
|                   ) |                   ) | ||||||
|                 } |                 } | ||||||
|               } |               } | ||||||
|             } catch (error) { |             } catch (error) { | ||||||
|               logger.error( |               logger.error({ error, key }, "Error processing device data") | ||||||
|                 { error, key }, |  | ||||||
|                 "Error processing device data from cold storage" |  | ||||||
|               ) |  | ||||||
|             } |             } | ||||||
|           }) |           }) | ||||||
|         } |         } | ||||||
|  | @ -153,9 +176,9 @@ module.exports = async function () { | ||||||
|     //   )
 |     //   )
 | ||||||
|     // }
 |     // }
 | ||||||
| 
 | 
 | ||||||
|     // Schedule both cleanup functions to run periodically
 |     // Schedule cleanup function to run periodically
 | ||||||
|     cron.schedule(CLEANUP_CRON, async () => { |     cron.schedule(CLEANUP_CRON, async () => { | ||||||
|       await cleanupOldGeodata() |       await processGeodataCleanup() | ||||||
|       // await cleanupOrphanedHotGeodata()
 |       // await cleanupOrphanedHotGeodata()
 | ||||||
|     }) |     }) | ||||||
|   } |   } | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue