chore: cleanup geodata wip
All checks were successful
/ build (map[dockerfile:./services/watchers/Dockerfile name:watchers]) (push) Successful in 1m40s
/ build (map[dockerfile:./services/files/Dockerfile name:files]) (push) Successful in 1m8s
/ build (map[dockerfile:./services/app/Dockerfile name:app]) (push) Successful in 1m28s
/ build (map[dockerfile:./services/api/Dockerfile name:api]) (push) Successful in 1m13s
/ build (map[dockerfile:./services/tasks/Dockerfile name:tasks]) (push) Successful in 1m28s
/ build (map[dockerfile:./services/hasura/Dockerfile name:hasura]) (push) Successful in 1m42s
/ build (map[dockerfile:./services/web/Dockerfile name:web]) (push) Successful in 1m40s
/ deploy (push) Successful in 8s
parent e9887b29ae
commit ddaff39619
1 changed file with 50 additions and 0 deletions
@@ -104,9 +104,59 @@ module.exports = async function () {
     } while (coldCursor !== "0")
   }
 
+  // this is a temporary function (fixing existing data)
+  async function cleanupOrphanedHotGeodata() {
+    // Get all devices from hot storage
+    const hotDevices = new Set()
+    let hotCursor = "0"
+    do {
+      // Use zscan to iterate through the sorted set
+      const [newCursor, items] = await redisHot.zscan(
+        HOTGEODATA_KEY,
+        hotCursor,
+        "COUNT",
+        "100"
+      )
+      hotCursor = newCursor
+
+      // Extract device IDs (every other item in the result is a score)
+      for (let i = 0; i < items.length; i += 2) {
+        hotDevices.add(items[i])
+      }
+    } while (hotCursor !== "0")
+
+    // Process each hot device
+    await async.eachLimit(
+      [...hotDevices],
+      MAX_PARALLEL_PROCESS,
+      async (deviceId) => {
+        try {
+          // Check if device exists in cold storage
+          const coldKey = `${COLDGEODATA_DEVICE_KEY_PREFIX}${deviceId}`
+          const exists = await redisCold.exists(coldKey)
+
+          // If device doesn't exist in cold storage, remove it from hot storage
+          if (!exists) {
+            await redisHot.zrem(HOTGEODATA_KEY, deviceId)
+            logger.debug(
+              { deviceId },
+              "Removed orphaned device data from hot storage (not found in cold storage)"
+            )
+          }
+        } catch (error) {
+          logger.error(
+            { error, deviceId },
+            "Error checking orphaned device data"
+          )
+        }
+      }
+    )
+  }
+
   // Schedule both cleanup functions to run periodically
   cron.schedule(CLEANUP_CRON, async () => {
     await cleanupOldGeodata()
+    await cleanupOrphanedHotGeodata()
   })
 }
 }
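Note on the cursor loop above: assuming the Redis client in this service is ioredis, zscan resolves to [nextCursor, items] where items is a flat array alternating member and score, which is why the loop advances by two and repeats until the cursor comes back as "0". A minimal standalone sketch of that pattern; the collectSortedSetMembers helper and the "geodata:hot" key are illustrative placeholders, not names from this repository:

// Sketch only: gather every member of a sorted set via ZSCAN, assuming ioredis.
const Redis = require("ioredis")

async function collectSortedSetMembers(redis, key) {
  const members = new Set()
  let cursor = "0"
  do {
    // zscan resolves to [nextCursor, items]; items = [member, score, member, score, ...]
    const [nextCursor, items] = await redis.zscan(key, cursor, "COUNT", "100")
    cursor = nextCursor
    for (let i = 0; i < items.length; i += 2) {
      members.add(items[i])
    }
  } while (cursor !== "0")
  return members
}

// Example usage (hypothetical connection and key):
// const redis = new Redis()
// collectSortedSetMembers(redis, "geodata:hot").then((ids) => console.log(ids.size))

The async.eachLimit call in the commit then visits the collected members with at most MAX_PARALLEL_PROCESS concurrent iterations, keeping the EXISTS/ZREM traffic bounded while this one-off cleanup runs.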