Compare commits
4 commits
d5f63107b9
...
ace5657057
Author | SHA1 | Date | |
---|---|---|---|
ace5657057 | |||
74d999a9b8 | |||
74430f63ba | |||
a0cbf8ff4b |
22 changed files with 373 additions and 96 deletions
|
@ -1,6 +1,11 @@
|
|||
module.exports = async function ({ services: { authTokenHandler } }) {
|
||||
async function doAuthLoginToken(req) {
|
||||
const { authTokenJwt, phoneModel = null, deviceUuid = null } = req.body
|
||||
const {
|
||||
authTokenJwt,
|
||||
phoneModel = null,
|
||||
phoneOS = null,
|
||||
deviceUuid = null,
|
||||
} = req.body
|
||||
|
||||
// Validate the auth token JWT and extract the auth token
|
||||
const authToken = authTokenHandler.decodeAuthToken(authTokenJwt)
|
||||
|
@ -10,6 +15,7 @@ module.exports = async function ({ services: { authTokenHandler } }) {
|
|||
await authTokenHandler.getOrCreateUserSession(
|
||||
authToken,
|
||||
phoneModel,
|
||||
phoneOS,
|
||||
deviceUuid
|
||||
)
|
||||
|
||||
|
|
|
@ -11,6 +11,8 @@ requestBody:
|
|||
format: apiJwt
|
||||
phoneModel:
|
||||
type: string
|
||||
phoneOS:
|
||||
type: string
|
||||
deviceUuid:
|
||||
type: string
|
||||
format: uuid
|
||||
|
|
|
@ -22,6 +22,7 @@ module.exports = ({ services }) => {
|
|||
async function getOrCreateUserSession(
|
||||
authToken,
|
||||
phoneModel = null,
|
||||
phoneOS = null,
|
||||
deviceUuid = null
|
||||
) {
|
||||
let userId
|
||||
|
@ -54,8 +55,8 @@ module.exports = ({ services }) => {
|
|||
id
|
||||
`
|
||||
;[{ id: deviceId }] = await sql`
|
||||
INSERT INTO "device" ("user_id", "phone_model", "uuid")
|
||||
VALUES (${userId}, ${phoneModel}, ${deviceUuid})
|
||||
INSERT INTO "device" ("user_id", "phone_model", "phone_os", "uuid")
|
||||
VALUES (${userId}, ${phoneModel}, ${phoneOS}, ${deviceUuid})
|
||||
RETURNING
|
||||
id
|
||||
`
|
||||
|
@ -104,8 +105,8 @@ module.exports = ({ services }) => {
|
|||
if (!deviceId) {
|
||||
// Only create new device if UUID doesn't exist
|
||||
;[{ id: deviceId }] = await sql`
|
||||
INSERT INTO "device" ("user_id", "phone_model", "uuid")
|
||||
VALUES (${userId}, ${phoneModel}, ${deviceUuid})
|
||||
INSERT INTO "device" ("user_id", "phone_model", "phone_os", "uuid")
|
||||
VALUES (${userId}, ${phoneModel}, ${phoneOS}, ${deviceUuid})
|
||||
RETURNING
|
||||
id
|
||||
`
|
||||
|
|
|
@ -3,10 +3,22 @@ table:
|
|||
schema: public
|
||||
configuration:
|
||||
column_config:
|
||||
acknowledged_around_count:
|
||||
custom_name: acknowledgedAroundCount
|
||||
acknowledged_connect_count:
|
||||
custom_name: acknowledgedConnectCount
|
||||
acknowledged_relative_count:
|
||||
custom_name: acknowledgedRelativeCount
|
||||
alert_id:
|
||||
custom_name: alertId
|
||||
alert_tag:
|
||||
custom_name: alertTag
|
||||
alerting_around_count:
|
||||
custom_name: alertingAroundCount
|
||||
alerting_connect_count:
|
||||
custom_name: alertingConnectCount
|
||||
alerting_relative_count:
|
||||
custom_name: alertingRelativeCount
|
||||
altitude_accuracy:
|
||||
custom_name: altitudeAccuracy
|
||||
archive_created_at:
|
||||
|
@ -21,8 +33,20 @@ configuration:
|
|||
custom_name: createdAt
|
||||
device_id:
|
||||
custom_name: deviceId
|
||||
emergency_calling_notification_sent:
|
||||
custom_name: emergencyCallingNotificationSent
|
||||
follow_location:
|
||||
custom_name: followLocation
|
||||
initial_location:
|
||||
custom_name: initialLocation
|
||||
keep_open_at:
|
||||
custom_name: keepOpenAt
|
||||
last_address:
|
||||
custom_name: lastAddress
|
||||
last_nearest_place:
|
||||
custom_name: lastNearestPlace
|
||||
last_what3words:
|
||||
custom_name: lastWhat3Words
|
||||
nearest_place:
|
||||
custom_name: nearestPlace
|
||||
notified_count:
|
||||
|
@ -44,8 +68,14 @@ configuration:
|
|||
what3words:
|
||||
custom_name: what3Words
|
||||
custom_column_names:
|
||||
acknowledged_around_count: acknowledgedAroundCount
|
||||
acknowledged_connect_count: acknowledgedConnectCount
|
||||
acknowledged_relative_count: acknowledgedRelativeCount
|
||||
alert_id: alertId
|
||||
alert_tag: alertTag
|
||||
alerting_around_count: alertingAroundCount
|
||||
alerting_connect_count: alertingConnectCount
|
||||
alerting_relative_count: alertingRelativeCount
|
||||
altitude_accuracy: altitudeAccuracy
|
||||
archive_created_at: archiveCreatedAt
|
||||
call_emergency: callEmergency
|
||||
|
@ -53,7 +83,13 @@ configuration:
|
|||
closed_by: closedBy
|
||||
created_at: createdAt
|
||||
device_id: deviceId
|
||||
emergency_calling_notification_sent: emergencyCallingNotificationSent
|
||||
follow_location: followLocation
|
||||
initial_location: initialLocation
|
||||
keep_open_at: keepOpenAt
|
||||
last_address: lastAddress
|
||||
last_nearest_place: lastNearestPlace
|
||||
last_what3words: lastWhat3Words
|
||||
nearest_place: nearestPlace
|
||||
notified_count: notifiedCount
|
||||
notify_around: notifyAround
|
||||
|
|
|
@ -15,6 +15,8 @@ configuration:
|
|||
custom_name: notificationAlertLevel
|
||||
phone_model:
|
||||
custom_name: phoneModel
|
||||
phone_os:
|
||||
custom_name: phoneOs
|
||||
preferred_emergency_call:
|
||||
custom_name: preferredEmergencyCall
|
||||
radius_all:
|
||||
|
@ -32,6 +34,7 @@ configuration:
|
|||
follow_location: followLocation
|
||||
notification_alert_level: notificationAlertLevel
|
||||
phone_model: phoneModel
|
||||
phone_os: phoneOs
|
||||
preferred_emergency_call: preferredEmergencyCall
|
||||
radius_all: radiusAll
|
||||
radius_reach: radiusReach
|
||||
|
@ -59,6 +62,7 @@ insert_permissions:
|
|||
user_id: X-Hasura-User-Id
|
||||
columns:
|
||||
- phone_model
|
||||
- phone_os
|
||||
- uuid
|
||||
select_permissions:
|
||||
- role: owner
|
||||
|
@ -73,6 +77,7 @@ select_permissions:
|
|||
- location
|
||||
- notification_alert_level
|
||||
- phone_model
|
||||
- phone_os
|
||||
- preferred_emergency_call
|
||||
- radius_all
|
||||
- radius_reach
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
table:
|
||||
name: enum_phone_os
|
||||
schema: public
|
||||
is_enum: true
|
||||
configuration:
|
||||
column_config: {}
|
||||
custom_column_names: {}
|
||||
custom_root_fields:
|
||||
delete: deleteManyEnumPhoneOs
|
||||
delete_by_pk: deleteOneEnumPhoneOs
|
||||
insert: insertManyEnumPhoneOs
|
||||
insert_one: insertOneEnumPhoneOs
|
||||
select: selectManyEnumPhoneOs
|
||||
select_aggregate: selectAggEnumPhoneOs
|
||||
select_by_pk: selectOneEnumPhoneOs
|
||||
select_stream: selectStreamEnumPhoneOs
|
||||
update: updateManyEnumPhoneOs
|
||||
update_by_pk: updateOneEnumPhoneOs
|
||||
update_many: updateBatchEnumPhoneOs
|
|
@ -19,6 +19,7 @@
|
|||
- "!include public_enum_content_type.yaml"
|
||||
- "!include public_enum_emergency_call.yaml"
|
||||
- "!include public_enum_notification_type.yaml"
|
||||
- "!include public_enum_phone_os.yaml"
|
||||
- "!include public_enum_user_login_request_type.yaml"
|
||||
- "!include public_enum_user_role.yaml"
|
||||
- "!include public_external_public_config.yaml"
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
-- Could not auto-generate a down migration.
|
||||
-- Please write an appropriate down migration for the SQL below:
|
||||
-- alter table "public"."device" add column "phone_os" text
|
||||
-- null;
|
|
@ -0,0 +1,2 @@
|
|||
alter table "public"."device" add column "phone_os" text
|
||||
null;
|
|
@ -0,0 +1 @@
|
|||
DROP TABLE "public"."enum_phone_os";
|
|
@ -0,0 +1 @@
|
|||
CREATE TABLE "public"."enum_phone_os" ("value" text NOT NULL, PRIMARY KEY ("value") );
|
|
@ -0,0 +1 @@
|
|||
DELETE FROM "public"."enum_phone_os" WHERE "value" = 'ios';
|
|
@ -0,0 +1 @@
|
|||
INSERT INTO "public"."enum_phone_os"("value") VALUES (E'ios');
|
|
@ -0,0 +1 @@
|
|||
DELETE FROM "public"."enum_phone_os" WHERE "value" = 'android';
|
|
@ -0,0 +1 @@
|
|||
INSERT INTO "public"."enum_phone_os"("value") VALUES (E'android');
|
|
@ -0,0 +1 @@
|
|||
alter table "public"."device" drop constraint "device_phone_os_fkey";
|
|
@ -0,0 +1,5 @@
|
|||
alter table "public"."device"
|
||||
add constraint "device_phone_os_fkey"
|
||||
foreign key ("phone_os")
|
||||
references "public"."enum_phone_os"
|
||||
("value") on update restrict on delete restrict;
|
120
services/tasks/src/queues/ios-geolocation-heartbeat-sync.js
Normal file
120
services/tasks/src/queues/ios-geolocation-heartbeat-sync.js
Normal file
|
@ -0,0 +1,120 @@
|
|||
const { ctx } = require("@modjo/core")
|
||||
const { taskCtx } = require("@modjo/microservice-worker/ctx")
|
||||
|
||||
const pushNotification = require("~/services/push-notification")
|
||||
|
||||
function createIosGeolocationHeartbeatSyncNotification() {
|
||||
return {
|
||||
data: {
|
||||
action: "geolocation-heartbeat-sync",
|
||||
},
|
||||
// Silent push notification
|
||||
notification: {
|
||||
silent: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = async function () {
|
||||
return Object.assign(
|
||||
async function iosGeolocationHeartbeatSync(params) {
|
||||
const logger = taskCtx.require("logger")
|
||||
const sql = ctx.require("postgres")
|
||||
const redisCold = ctx.require("keydbColdGeodata")
|
||||
|
||||
const { deviceId } = params
|
||||
|
||||
try {
|
||||
// Get the device information including phone_os and fcm_token
|
||||
const deviceResult = await sql`
|
||||
SELECT
|
||||
"user_id" as "userId",
|
||||
"phone_os" as "phoneOs",
|
||||
"fcm_token" as "fcmToken"
|
||||
FROM
|
||||
"device"
|
||||
WHERE
|
||||
id = ${deviceId}
|
||||
`
|
||||
|
||||
if (!deviceResult || deviceResult.length === 0) {
|
||||
logger.warn(
|
||||
{ deviceId },
|
||||
"No device found when sending iOS geolocation heartbeat sync"
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
const { userId, phoneOs, fcmToken } = deviceResult[0]
|
||||
|
||||
// Only proceed if device is iOS
|
||||
if (phoneOs !== "ios") {
|
||||
logger.debug(
|
||||
{ deviceId, phoneOs },
|
||||
"Skipping iOS heartbeat sync - device is not iOS"
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
if (!fcmToken) {
|
||||
logger.warn(
|
||||
{ deviceId, userId },
|
||||
"No FCM token found for iOS device when sending heartbeat sync"
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if we've already sent a heartbeat sync for this device in the last 24h
|
||||
const heartbeatSentKey = `ios_heartbeat_sent:device:${deviceId}`
|
||||
const alreadySent = await redisCold.exists(heartbeatSentKey)
|
||||
|
||||
if (alreadySent) {
|
||||
logger.debug(
|
||||
{ deviceId, userId },
|
||||
"iOS heartbeat sync already sent for this device in the last 24h"
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// Create silent notification config
|
||||
const notificationConfig =
|
||||
createIosGeolocationHeartbeatSyncNotification()
|
||||
|
||||
// Send silent push notification
|
||||
logger.info(
|
||||
{ deviceId, userId, notificationConfig },
|
||||
"Sending iOS silent push for geolocation heartbeat sync"
|
||||
)
|
||||
|
||||
const { success } = await pushNotification({
|
||||
fcmToken,
|
||||
deviceId,
|
||||
notification: notificationConfig.notification,
|
||||
data: notificationConfig.data,
|
||||
})
|
||||
|
||||
if (!success) {
|
||||
throw new Error(
|
||||
"Unable to send iOS geolocation heartbeat sync notification"
|
||||
)
|
||||
}
|
||||
|
||||
// Mark as sent with 24h expiry to prevent duplicate sends
|
||||
await redisCold.set(heartbeatSentKey, "1", "EX", 24 * 60 * 60)
|
||||
|
||||
logger.info(
|
||||
{ deviceId, userId },
|
||||
"Successfully sent iOS geolocation heartbeat sync notification"
|
||||
)
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{ deviceId, error },
|
||||
"Error sending iOS geolocation heartbeat sync notification"
|
||||
)
|
||||
}
|
||||
},
|
||||
{
|
||||
dedupOptions: { enabled: true },
|
||||
}
|
||||
)
|
||||
}
|
|
@ -48,7 +48,9 @@ function deriveNotificationConfig({
|
|||
android = {},
|
||||
apns = {},
|
||||
uid,
|
||||
silent = false,
|
||||
}) {
|
||||
const isVisible = !silent
|
||||
const notification = {
|
||||
// https://firebase.google.com/docs/reference/admin/node/firebase-admin.messaging.notification.md#notification_interface
|
||||
title,
|
||||
|
@ -74,7 +76,14 @@ function deriveNotificationConfig({
|
|||
priority: "high",
|
||||
visibility: "public",
|
||||
defaultSound: true,
|
||||
clickAction: `com.alertesecours.${snakeCase(actionId).toUpperCase()}`,
|
||||
// Only include clickAction for visible notifications (not silent ones)
|
||||
...(actionId && isVisible
|
||||
? {
|
||||
clickAction: `com.alertesecours.${snakeCase(
|
||||
actionId
|
||||
).toUpperCase()}`,
|
||||
}
|
||||
: {}),
|
||||
...(android.notification || {}),
|
||||
},
|
||||
restrictedPackageName: "com.alertesecours",
|
||||
|
@ -85,7 +94,7 @@ function deriveNotificationConfig({
|
|||
// https://firebase.google.com/docs/reference/admin/node/firebase-admin.messaging.apnsconfig.md#apnsconfig_interface
|
||||
headers: {
|
||||
"apns-priority": priority === "high" ? "10" : "5",
|
||||
"apns-push-type": "alert",
|
||||
"apns-push-type": isVisible ? "alert" : "background", // Use background for silent pushes
|
||||
"apns-collapse-id": uid, // https://firebase.google.com/docs/cloud-messaging/concept-options
|
||||
...(apns.headers || {}),
|
||||
},
|
||||
|
@ -93,16 +102,20 @@ function deriveNotificationConfig({
|
|||
aps: {
|
||||
category: channel,
|
||||
threadId: channel, // Thread ID for grouping notifications
|
||||
// Critical alerts for high importance
|
||||
sound: {
|
||||
critical: priority === "high",
|
||||
name: "default",
|
||||
volume: 1.0,
|
||||
},
|
||||
// Content available flag for background processing
|
||||
contentAvailable: true,
|
||||
// Support for modification of notification content
|
||||
mutableContent: true,
|
||||
// Only include sound for non-silent notifications
|
||||
...(isVisible
|
||||
? {
|
||||
sound: {
|
||||
critical: priority === "high",
|
||||
name: "default",
|
||||
volume: 1.0,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
|
||||
// alert: {
|
||||
// // https://firebase.google.com/docs/reference/admin/node/firebase-admin.messaging.apsalert.md#apsalert_interface
|
||||
|
@ -160,14 +173,21 @@ async function pushNotification({
|
|||
// https://firebase.google.com/docs/reference/admin/node/firebase-admin.messaging.basemessage
|
||||
const message = {
|
||||
token: fcmToken,
|
||||
// Basic notification for platforms that don't need specific configs
|
||||
notification: derivedNotification.notification,
|
||||
data: { json: JSON.stringify(data), uid, actionId: notification.actionId },
|
||||
data: { json: JSON.stringify(data), uid, actionId: notification?.actionId },
|
||||
// Platform specific configurations
|
||||
android: derivedNotification.android,
|
||||
apns: derivedNotification.apns,
|
||||
}
|
||||
|
||||
// Only include notification for non-silent pushes
|
||||
if (
|
||||
notification &&
|
||||
!notification.silent &&
|
||||
(notification.title || notification.body)
|
||||
) {
|
||||
message.notification = derivedNotification.notification
|
||||
}
|
||||
|
||||
try {
|
||||
const res = await admin.messaging().send(message)
|
||||
logger.info(
|
||||
|
|
|
@ -8,7 +8,9 @@ module.exports = {
|
|||
SCAN_AUTO_CLOSE_CRON: "15 * * * *", // At minute 15
|
||||
SCAN_AUTO_ARCHIVE_CRON: "0 4 * * *", // At 4:00
|
||||
RELATIVE_UNREGISTERED_RECONCILIATION_CRON: "0 4 * * *", // At 4:00
|
||||
DEVICE_GEODATA_MAX_AGE: "36 hours", // Maximum age before removal from hot storage
|
||||
DEVICE_GEODATA_IOS_SILENT_PUSH_AGE: "24 hours", // When to send iOS silent push for heartbeat sync
|
||||
DEVICE_GEODATA_NOTIFICATION_AGE: "36 hours", // When to send push notification
|
||||
DEVICE_GEODATA_CLEANUP_AGE: "48 hours", // When to remove/clean data
|
||||
}
|
||||
|
||||
// cheat on https://crontab.guru/
|
||||
|
|
|
@ -10,4 +10,5 @@ module.exports = {
|
|||
RELATIVE_INVITATION_NOTIFY: "relativeInvitationNotify",
|
||||
ALERT_CALL_EMERGENCY_INFO_NOTIFY: "alertCallEmergencyInfoNotify",
|
||||
BACKGROUND_GEOLOCATION_LOST_NOTIFY: "backgroundGeolocationLostNotify",
|
||||
IOS_GEOLOCATION_HEARTBEAT_SYNC: "iosGeolocationHeartbeatSync",
|
||||
}
|
||||
|
|
|
@ -2,15 +2,24 @@ const async = require("async")
|
|||
const { ctx } = require("@modjo/core")
|
||||
const ms = require("ms")
|
||||
const cron = require("~/libs/cron")
|
||||
const { DEVICE_GEODATA_MAX_AGE } = require("~/constants/time")
|
||||
const {
|
||||
DEVICE_GEODATA_IOS_SILENT_PUSH_AGE,
|
||||
DEVICE_GEODATA_NOTIFICATION_AGE,
|
||||
DEVICE_GEODATA_CLEANUP_AGE,
|
||||
} = require("~/constants/time")
|
||||
const tasks = require("~/tasks")
|
||||
|
||||
const CLEANUP_CRON = "0 9-19 * * *" // Run every hour from 9h to 19h
|
||||
const MAX_PARALLEL_PROCESS = 10
|
||||
const COLDGEODATA_DEVICE_KEY_PREFIX = "device:geodata:"
|
||||
const COLDGEODATA_OLD_KEY_PREFIX = "old:device:geodata:"
|
||||
const COLDGEODATA_NOTIFIED_KEY_PREFIX = "notified:device:geodata:"
|
||||
const HOTGEODATA_KEY = "device" // The key where hot geodata is stored
|
||||
const maxAge = Math.floor(ms(DEVICE_GEODATA_MAX_AGE) / 1000) // Convert to seconds
|
||||
const iosHeartbeatAge = Math.floor(
|
||||
ms(DEVICE_GEODATA_IOS_SILENT_PUSH_AGE) / 1000
|
||||
) // Convert to seconds
|
||||
const notificationAge = Math.floor(ms(DEVICE_GEODATA_NOTIFICATION_AGE) / 1000) // Convert to seconds
|
||||
const cleanupAge = Math.floor(ms(DEVICE_GEODATA_CLEANUP_AGE) / 1000) // Convert to seconds
|
||||
|
||||
module.exports = async function () {
|
||||
const logger = ctx.require("logger")
|
||||
|
@ -21,8 +30,8 @@ module.exports = async function () {
|
|||
return async function geodataCleanupCron() {
|
||||
logger.info("watcher geodataCleanupCron: daemon started")
|
||||
|
||||
// Process keys in batches to avoid memory accumulation
|
||||
async function cleanupOldGeodata() {
|
||||
// Process geodata cleanup with single loop for both notifications and cleanup
|
||||
async function processGeodataCleanup() {
|
||||
const now = Math.floor(Date.now() / 1000) // Current time in seconds
|
||||
let coldCursor = "0"
|
||||
|
||||
|
@ -51,41 +60,27 @@ module.exports = async function () {
|
|||
|
||||
// Parse stored JSON to get `updatedAt`
|
||||
const data = JSON.parse(deviceData)
|
||||
const age = data.updatedAt ? now - data.updatedAt : Infinity
|
||||
|
||||
// const age = now - data.updatedAt
|
||||
const age = data.updatedAt ? now - data.updatedAt : Infinity // trick is for missing updatedAt field on old entries
|
||||
|
||||
// If data is older than maxAge
|
||||
if (age > maxAge) {
|
||||
// Handle cleanup first (48h+) - this takes priority
|
||||
if (age > cleanupAge) {
|
||||
try {
|
||||
// Remove from hot storage
|
||||
await redisHot.zrem(HOTGEODATA_KEY, deviceId)
|
||||
|
||||
// Move to cleaned prefix in cold storage
|
||||
// Move to cleaned prefix in cold storage and clean notification flag atomically
|
||||
const oldKey = `${COLDGEODATA_OLD_KEY_PREFIX}${deviceId}`
|
||||
await redisCold.rename(key, oldKey)
|
||||
const notifiedKey = `${COLDGEODATA_NOTIFIED_KEY_PREFIX}${deviceId}`
|
||||
|
||||
const transaction = redisCold.multi()
|
||||
transaction.rename(key, oldKey)
|
||||
transaction.del(notifiedKey)
|
||||
await transaction.exec()
|
||||
|
||||
logger.debug(
|
||||
{ deviceId, age: `${Math.floor(age / 3600)}h` },
|
||||
"Removed old device data from hot storage and marked as cleaned in cold storage"
|
||||
)
|
||||
|
||||
// Enqueue task to notify user about lost background geolocation
|
||||
try {
|
||||
await addTask(tasks.BACKGROUND_GEOLOCATION_LOST_NOTIFY, {
|
||||
deviceId,
|
||||
})
|
||||
|
||||
logger.info(
|
||||
{ deviceId },
|
||||
"Enqueued background geolocation lost notification task"
|
||||
)
|
||||
} catch (notifError) {
|
||||
logger.error(
|
||||
{ deviceId, error: notifError },
|
||||
"Error enqueueing background geolocation lost notification task"
|
||||
)
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{ error, deviceId },
|
||||
|
@ -93,11 +88,62 @@ module.exports = async function () {
|
|||
)
|
||||
}
|
||||
}
|
||||
// Handle iOS silent push for heartbeat sync (24h+ but less than 36h)
|
||||
else if (age > iosHeartbeatAge) {
|
||||
try {
|
||||
// Enqueue task to send iOS silent push for geolocation heartbeat sync
|
||||
await addTask(tasks.IOS_GEOLOCATION_HEARTBEAT_SYNC, {
|
||||
deviceId,
|
||||
})
|
||||
|
||||
logger.info(
|
||||
{ deviceId, age: `${Math.floor(age / 3600)}h` },
|
||||
"Enqueued iOS geolocation heartbeat sync task"
|
||||
)
|
||||
} catch (heartbeatError) {
|
||||
logger.error(
|
||||
{ deviceId, error: heartbeatError },
|
||||
"Error enqueueing iOS geolocation heartbeat sync task"
|
||||
)
|
||||
}
|
||||
}
|
||||
// Handle notification (36h+ but less than 48h)
|
||||
else if (age > notificationAge) {
|
||||
const notifiedKey = `${COLDGEODATA_NOTIFIED_KEY_PREFIX}${deviceId}`
|
||||
|
||||
try {
|
||||
// Check if we've already notified for this device
|
||||
const alreadyNotified = await redisCold.exists(notifiedKey)
|
||||
|
||||
if (!alreadyNotified) {
|
||||
// Enqueue task to notify user about lost background geolocation
|
||||
try {
|
||||
await addTask(tasks.BACKGROUND_GEOLOCATION_LOST_NOTIFY, {
|
||||
deviceId,
|
||||
})
|
||||
|
||||
logger.info(
|
||||
{ deviceId, age: `${Math.floor(age / 3600)}h` },
|
||||
"Enqueued background geolocation lost notification task"
|
||||
)
|
||||
} catch (notifError) {
|
||||
logger.error(
|
||||
{ deviceId, error: notifError },
|
||||
"Error enqueueing background geolocation lost notification task"
|
||||
)
|
||||
}
|
||||
// Mark as notified with 48h expiry (cleanup age)
|
||||
await redisCold.set(notifiedKey, "1", "EX", cleanupAge)
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{ error, deviceId },
|
||||
"Error processing notification for device"
|
||||
)
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{ error, key },
|
||||
"Error processing device data from cold storage"
|
||||
)
|
||||
logger.error({ error, key }, "Error processing device data")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -105,58 +151,58 @@ module.exports = async function () {
|
|||
}
|
||||
|
||||
// this is temporary function (fixing actual data)
|
||||
async function cleanupOrphanedHotGeodata() {
|
||||
// Get all devices from hot storage
|
||||
const hotDevices = new Set()
|
||||
let hotCursor = "0"
|
||||
do {
|
||||
// Use zscan to iterate through the sorted set
|
||||
const [newCursor, items] = await redisHot.zscan(
|
||||
HOTGEODATA_KEY,
|
||||
hotCursor,
|
||||
"COUNT",
|
||||
"100"
|
||||
)
|
||||
hotCursor = newCursor
|
||||
// async function cleanupOrphanedHotGeodata() {
|
||||
// // Get all devices from hot storage
|
||||
// const hotDevices = new Set()
|
||||
// let hotCursor = "0"
|
||||
// do {
|
||||
// // Use zscan to iterate through the sorted set
|
||||
// const [newCursor, items] = await redisHot.zscan(
|
||||
// HOTGEODATA_KEY,
|
||||
// hotCursor,
|
||||
// "COUNT",
|
||||
// "100"
|
||||
// )
|
||||
// hotCursor = newCursor
|
||||
|
||||
// Extract device IDs (every other item in the result is a score)
|
||||
for (let i = 0; i < items.length; i += 2) {
|
||||
hotDevices.add(items[i])
|
||||
}
|
||||
} while (hotCursor !== "0")
|
||||
// // Extract device IDs (every other item in the result is a score)
|
||||
// for (let i = 0; i < items.length; i += 2) {
|
||||
// hotDevices.add(items[i])
|
||||
// }
|
||||
// } while (hotCursor !== "0")
|
||||
|
||||
// Process each hot device
|
||||
await async.eachLimit(
|
||||
[...hotDevices],
|
||||
MAX_PARALLEL_PROCESS,
|
||||
async (deviceId) => {
|
||||
try {
|
||||
// Check if device exists in cold storage
|
||||
const coldKey = `${COLDGEODATA_DEVICE_KEY_PREFIX}${deviceId}`
|
||||
const exists = await redisCold.exists(coldKey)
|
||||
// // Process each hot device
|
||||
// await async.eachLimit(
|
||||
// [...hotDevices],
|
||||
// MAX_PARALLEL_PROCESS,
|
||||
// async (deviceId) => {
|
||||
// try {
|
||||
// // Check if device exists in cold storage
|
||||
// const coldKey = `${COLDGEODATA_DEVICE_KEY_PREFIX}${deviceId}`
|
||||
// const exists = await redisCold.exists(coldKey)
|
||||
|
||||
// If device doesn't exist in cold storage, remove it from hot storage
|
||||
if (!exists) {
|
||||
await redisHot.zrem(HOTGEODATA_KEY, deviceId)
|
||||
logger.debug(
|
||||
{ deviceId },
|
||||
"Removed orphaned device data from hot storage (not found in cold storage)"
|
||||
)
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
{ error, deviceId },
|
||||
"Error checking orphaned device data"
|
||||
)
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
// // If device doesn't exist in cold storage, remove it from hot storage
|
||||
// if (!exists) {
|
||||
// await redisHot.zrem(HOTGEODATA_KEY, deviceId)
|
||||
// logger.debug(
|
||||
// { deviceId },
|
||||
// "Removed orphaned device data from hot storage (not found in cold storage)"
|
||||
// )
|
||||
// }
|
||||
// } catch (error) {
|
||||
// logger.error(
|
||||
// { error, deviceId },
|
||||
// "Error checking orphaned device data"
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
// )
|
||||
// }
|
||||
|
||||
// Schedule both cleanup functions to run periodically
|
||||
// Schedule cleanup function to run periodically
|
||||
cron.schedule(CLEANUP_CRON, async () => {
|
||||
await cleanupOldGeodata()
|
||||
await cleanupOrphanedHotGeodata()
|
||||
await processGeodataCleanup()
|
||||
// await cleanupOrphanedHotGeodata()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue