feat: bg location lost notify + fix cleanupOrphanedHotGeodata
This commit is contained in:
		
							parent
							
								
									f4313ce9ed
								
							
						
					
					
						commit
						dc351b2ed6
					
				
					 5 changed files with 180 additions and 2 deletions
				
			
		|  | @ -0,0 +1 @@ | ||||||
|  | DELETE FROM "public"."enum_notification_type" WHERE "value" = 'background_geolocation_lost'; | ||||||
|  | @ -0,0 +1 @@ | ||||||
|  | INSERT INTO "public"."enum_notification_type"("value") VALUES (E'background_geolocation_lost'); | ||||||
							
								
								
									
										103
									
								
								services/tasks/src/queues/background-geolocation-lost-notify.js
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										103
									
								
								services/tasks/src/queues/background-geolocation-lost-notify.js
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,103 @@ | ||||||
|  | const { ctx } = require("@modjo/core") | ||||||
|  | const { taskCtx } = require("@modjo/microservice-worker/ctx") | ||||||
|  | 
 | ||||||
|  | const addNotification = require("~/services/add-notification") | ||||||
|  | 
 | ||||||
|  | function createBackgroundGeolocationLostNotification() { | ||||||
|  |   return { | ||||||
|  |     data: { | ||||||
|  |       action: "background_geolocation_lost", | ||||||
|  |     }, | ||||||
|  |     notification: { | ||||||
|  |       title: "Localisation en arrière-plan désactivée", | ||||||
|  |       body: "Votre localisation en arrière-plan a été désactivée. Veuillez vérifier les paramètres de l'application.", | ||||||
|  |       channel: "system", | ||||||
|  |       priority: "high", | ||||||
|  |       actionId: "open-settings", | ||||||
|  |     }, | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | module.exports = async function () { | ||||||
|  |   return Object.assign( | ||||||
|  |     async function backgroundGeolocationLostNotify(params) { | ||||||
|  |       const logger = taskCtx.require("logger") | ||||||
|  |       const sql = ctx.require("postgres") | ||||||
|  | 
 | ||||||
|  |       const { deviceId } = params | ||||||
|  | 
 | ||||||
|  |       try { | ||||||
|  |         // Get the user ID associated with this device
 | ||||||
|  |         const userResult = await sql` | ||||||
|  |           SELECT | ||||||
|  |             "user_id" as "userId" | ||||||
|  |           FROM | ||||||
|  |             "device" | ||||||
|  |           WHERE | ||||||
|  |             id = ${deviceId} | ||||||
|  |           ` | ||||||
|  | 
 | ||||||
|  |         if (!userResult || userResult.length === 0) { | ||||||
|  |           logger.warn( | ||||||
|  |             { deviceId }, | ||||||
|  |             "No user found for device when sending background geolocation lost notification" | ||||||
|  |           ) | ||||||
|  |           return | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         const { userId } = userResult[0] | ||||||
|  | 
 | ||||||
|  |         // Get the FCM token for this device
 | ||||||
|  |         const deviceResult = await sql` | ||||||
|  |           SELECT | ||||||
|  |             "fcm_token" as "fcmToken" | ||||||
|  |           FROM | ||||||
|  |             "device" | ||||||
|  |           WHERE | ||||||
|  |             id = ${deviceId} | ||||||
|  |           ` | ||||||
|  | 
 | ||||||
|  |         if (!deviceResult[0]?.fcmToken) { | ||||||
|  |           logger.warn( | ||||||
|  |             { deviceId, userId }, | ||||||
|  |             "No FCM token found for device when sending background geolocation lost notification" | ||||||
|  |           ) | ||||||
|  |           return | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         const { fcmToken } = deviceResult[0] | ||||||
|  | 
 | ||||||
|  |         // Create notification config
 | ||||||
|  |         const notificationConfig = createBackgroundGeolocationLostNotification() | ||||||
|  | 
 | ||||||
|  |         // Send notification
 | ||||||
|  |         const { success } = await addNotification({ | ||||||
|  |           fcmToken, | ||||||
|  |           deviceId, | ||||||
|  |           userId, | ||||||
|  |           type: "background_geolocation_lost", | ||||||
|  |           ...notificationConfig, | ||||||
|  |         }) | ||||||
|  | 
 | ||||||
|  |         if (!success) { | ||||||
|  |           throw new Error( | ||||||
|  |             "Unable to send background geolocation lost notification" | ||||||
|  |           ) | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         logger.info( | ||||||
|  |           { deviceId, userId }, | ||||||
|  |           "Successfully sent background geolocation lost notification" | ||||||
|  |         ) | ||||||
|  |       } catch (error) { | ||||||
|  |         logger.error( | ||||||
|  |           { deviceId, error }, | ||||||
|  |           "Error sending background geolocation lost notification" | ||||||
|  |         ) | ||||||
|  |       } | ||||||
|  |     }, | ||||||
|  |     { | ||||||
|  |       dedupOptions: { enabled: true }, | ||||||
|  |     } | ||||||
|  |   ) | ||||||
|  | } | ||||||
|  | @ -9,4 +9,5 @@ module.exports = { | ||||||
|   RELATIVE_ALLOW_ASK_NOTIFY: "relativeAllowAskNotify", |   RELATIVE_ALLOW_ASK_NOTIFY: "relativeAllowAskNotify", | ||||||
|   RELATIVE_INVITATION_NOTIFY: "relativeInvitationNotify", |   RELATIVE_INVITATION_NOTIFY: "relativeInvitationNotify", | ||||||
|   ALERT_CALL_EMERGENCY_INFO_NOTIFY: "alertCallEmergencyInfoNotify", |   ALERT_CALL_EMERGENCY_INFO_NOTIFY: "alertCallEmergencyInfoNotify", | ||||||
|  |   BACKGROUND_GEOLOCATION_LOST_NOTIFY: "backgroundGeolocationLostNotify", | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -3,6 +3,7 @@ const { ctx } = require("@modjo/core") | ||||||
| const ms = require("ms") | const ms = require("ms") | ||||||
| const cron = require("~/libs/cron") | const cron = require("~/libs/cron") | ||||||
| const { DEVICE_GEODATA_MAX_AGE } = require("~/constants/time") | const { DEVICE_GEODATA_MAX_AGE } = require("~/constants/time") | ||||||
|  | const tasks = require("~/tasks") | ||||||
| 
 | 
 | ||||||
| const CLEANUP_CRON = "0 */1 * * *" // Run every hour
 | const CLEANUP_CRON = "0 */1 * * *" // Run every hour
 | ||||||
| const MAX_PARALLEL_PROCESS = 10 | const MAX_PARALLEL_PROCESS = 10 | ||||||
|  | @ -15,10 +16,61 @@ module.exports = async function () { | ||||||
|   const logger = ctx.require("logger") |   const logger = ctx.require("logger") | ||||||
|   const redisCold = ctx.require("keydbColdGeodata") |   const redisCold = ctx.require("keydbColdGeodata") | ||||||
|   const redisHot = ctx.require("redisHotGeodata") |   const redisHot = ctx.require("redisHotGeodata") | ||||||
|  |   const { addTask } = ctx.require("amqp") | ||||||
| 
 | 
 | ||||||
|   return async function geodataCleanupCron() { |   return async function geodataCleanupCron() { | ||||||
|     logger.info("watcher geodataCleanupCron: daemon started") |     logger.info("watcher geodataCleanupCron: daemon started") | ||||||
| 
 | 
 | ||||||
|  |     // this is temporary function (fixing actual data)
 | ||||||
|  |     async function cleanupOrphanedHotGeodata() { | ||||||
|  |       // Get all devices from hot storage
 | ||||||
|  |       const hotDevices = new Set() | ||||||
|  |       let hotCursor = "0" | ||||||
|  |       do { | ||||||
|  |         // Use zscan to iterate through the sorted set
 | ||||||
|  |         const [newCursor, items] = await redisHot.zscan( | ||||||
|  |           HOTGEODATA_KEY, | ||||||
|  |           hotCursor, | ||||||
|  |           "COUNT", | ||||||
|  |           "100" | ||||||
|  |         ) | ||||||
|  |         hotCursor = newCursor | ||||||
|  | 
 | ||||||
|  |         // Extract device IDs (every other item in the result is a score)
 | ||||||
|  |         for (let i = 0; i < items.length; i += 2) { | ||||||
|  |           hotDevices.add(items[i]) | ||||||
|  |         } | ||||||
|  |       } while (hotCursor !== "0") | ||||||
|  | 
 | ||||||
|  |       // Process each hot device
 | ||||||
|  |       await async.eachLimit( | ||||||
|  |         [...hotDevices], | ||||||
|  |         MAX_PARALLEL_PROCESS, | ||||||
|  |         async (deviceId) => { | ||||||
|  |           try { | ||||||
|  |             // Check if device exists in cold storage
 | ||||||
|  |             const coldKey = `${COLDGEODATA_DEVICE_KEY_PREFIX}${deviceId}` | ||||||
|  |             const exists = await redisCold.exists(coldKey) | ||||||
|  | 
 | ||||||
|  |             // If device doesn't exist in cold storage, remove it from hot storage
 | ||||||
|  |             if (!exists) { | ||||||
|  |               await redisHot.zrem(HOTGEODATA_KEY, deviceId) | ||||||
|  |               logger.debug( | ||||||
|  |                 { deviceId }, | ||||||
|  |                 "Removed orphaned device data from hot storage (not found in cold storage)" | ||||||
|  |               ) | ||||||
|  |             } | ||||||
|  |           } catch (error) { | ||||||
|  |             logger.error( | ||||||
|  |               { error, deviceId }, | ||||||
|  |               "Error checking orphaned device data" | ||||||
|  |             ) | ||||||
|  |           } | ||||||
|  |         } | ||||||
|  |       ) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     // TODO optimize by removing memory accumulation (cursor iteration to make it scalable)
 | ||||||
|     async function cleanupOldGeodata() { |     async function cleanupOldGeodata() { | ||||||
|       const now = Math.floor(Date.now() / 1000) // Current time in seconds
 |       const now = Math.floor(Date.now() / 1000) // Current time in seconds
 | ||||||
|       const coldKeys = new Set() // Store cold geodata keys
 |       const coldKeys = new Set() // Store cold geodata keys
 | ||||||
|  | @ -68,6 +120,23 @@ module.exports = async function () { | ||||||
|                   { deviceId, age: `${Math.floor(age / 3600)}h` }, |                   { deviceId, age: `${Math.floor(age / 3600)}h` }, | ||||||
|                   "Removed old device data from hot storage and marked as cleaned in cold storage" |                   "Removed old device data from hot storage and marked as cleaned in cold storage" | ||||||
|                 ) |                 ) | ||||||
|  | 
 | ||||||
|  |                 // Enqueue task to notify user about lost background geolocation
 | ||||||
|  |                 try { | ||||||
|  |                   await addTask(tasks.BACKGROUND_GEOLOCATION_LOST_NOTIFY, { | ||||||
|  |                     deviceId, | ||||||
|  |                   }) | ||||||
|  | 
 | ||||||
|  |                   logger.info( | ||||||
|  |                     { deviceId }, | ||||||
|  |                     "Enqueued background geolocation lost notification task" | ||||||
|  |                   ) | ||||||
|  |                 } catch (notifError) { | ||||||
|  |                   logger.error( | ||||||
|  |                     { deviceId, error: notifError }, | ||||||
|  |                     "Error enqueueing background geolocation lost notification task" | ||||||
|  |                   ) | ||||||
|  |                 } | ||||||
|               } catch (error) { |               } catch (error) { | ||||||
|                 logger.error({ error, deviceId }, "Error cleaning device data") |                 logger.error({ error, deviceId }, "Error cleaning device data") | ||||||
|               } |               } | ||||||
|  | @ -82,7 +151,10 @@ module.exports = async function () { | ||||||
|       ) |       ) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     // Schedule the cleanup to run periodically
 |     // Schedule both cleanup functions to run periodically
 | ||||||
|     cron.schedule(CLEANUP_CRON, cleanupOldGeodata) |     cron.schedule(CLEANUP_CRON, async () => { | ||||||
|  |       await cleanupOldGeodata() | ||||||
|  |       await cleanupOrphanedHotGeodata() | ||||||
|  |     }) | ||||||
|   } |   } | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue