diff --git a/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js b/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js index 0ed9cdc..f2dca24 100644 --- a/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js +++ b/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js @@ -37,6 +37,9 @@ const AggregatedMessagesSubscription = () => { // Chat is latency-sensitive; if the WS transport is up but this subscription // delivers nothing for a while, force a resubscribe. livenessStaleMs: 60_000, + + // If WS reconnects, refetch base query once before resubscribing to reduce cursor gaps. + refetchOnReconnect: true, }, ); diff --git a/src/app/subscriptions/aggregatedMessages/gql.js b/src/app/subscriptions/aggregatedMessages/gql.js index a435466..e6dfe82 100644 --- a/src/app/subscriptions/aggregatedMessages/gql.js +++ b/src/app/subscriptions/aggregatedMessages/gql.js @@ -31,7 +31,7 @@ export const AGGREGATED_MESSAGES_SUBSCRIPTION = gql` subscription aggregatedMessagesSubscription($cursor: Int) { selectStreamMessage( cursor: { initial_value: { id: $cursor }, ordering: ASC } - batch_size: 100 + batch_size: 30 ) { ...AggMessageFields } diff --git a/src/app/subscriptions/alerting/AlertingSubscription.js b/src/app/subscriptions/alerting/AlertingSubscription.js index 290f62c..674ee34 100644 --- a/src/app/subscriptions/alerting/AlertingSubscription.js +++ b/src/app/subscriptions/alerting/AlertingSubscription.js @@ -24,6 +24,14 @@ const AlertingSubscription = () => { cursorKey: "updatedSeq", uniqKey: "id", initialCursor: -1, + subscriptionKey: "alerting", + // Alerting is latency-sensitive; add per-subscription liveness so it can't go stale + // while WS transport heartbeat stays fresh. + livenessStaleMs: 45_000, + livenessCheckEveryMs: 12_000, + + // If WS reconnects, refetch base query once before resubscribing to reduce cursor gaps. + refetchOnReconnect: true, }); if (alertingError) { diff --git a/src/app/subscriptions/alerting/gql.js b/src/app/subscriptions/alerting/gql.js index b43919f..0129efc 100644 --- a/src/app/subscriptions/alerting/gql.js +++ b/src/app/subscriptions/alerting/gql.js @@ -58,7 +58,7 @@ export const ALERTING_SUBSCRIPTION = gql` subscription alertingSubscription($cursor: bigint!) { selectStreamAlerting( cursor: { initial_value: { updatedSeq: $cursor }, ordering: ASC } - batch_size: 100 + batch_size: 30 ) { ...AlertingFields } diff --git a/src/containers/AppLifecycleListener.js b/src/containers/AppLifecycleListener.js index 5c86e7e..c932e8f 100644 --- a/src/containers/AppLifecycleListener.js +++ b/src/containers/AppLifecycleListener.js @@ -10,6 +10,7 @@ import { permissionsActions, usePermissionWizardState, useNetworkState, + networkActions, } from "~/stores"; import { secureStore } from "~/storage/memorySecureStore"; import memoryAsyncStorage from "~/storage/memoryAsyncStorage"; @@ -234,6 +235,7 @@ const AppLifecycleListener = () => { activeTimeout.current = setTimeout(() => { try { lifecycleLogger.info("Restarting WebSocket connection"); + networkActions.WSRecoveryTouch(); network.apolloClient.restartWS(); } catch (error) { lifecycleLogger.error("Failed to restart WebSocket", { error }); diff --git a/src/containers/MessagesFetcher/components/LiveMessagesFetcher.js b/src/containers/MessagesFetcher/components/LiveMessagesFetcher.js index 38e050d..42360a2 100644 --- a/src/containers/MessagesFetcher/components/LiveMessagesFetcher.js +++ b/src/containers/MessagesFetcher/components/LiveMessagesFetcher.js @@ -1,11 +1,15 @@ -import React, { useMemo, useEffect } from "react"; +import React, { useEffect, useMemo, useRef } from "react"; import { useIsFocused } from "@react-navigation/native"; +import { useQuery } from "@apollo/client"; +import * as Sentry from "@sentry/react-native"; import ChatMessages from "~/containers/ChatMessages"; import { useAggregatedMessagesState, aggregatedMessagesActions, } from "~/stores"; +import { SELECT_MANY_MESSAGE_QUERY } from "../gql"; + const LiveMessagesFetcher = ({ scrollViewRef, alertId }) => { // Get messages from aggregated messages store const { @@ -29,6 +33,65 @@ const LiveMessagesFetcher = ({ scrollViewRef, alertId }) => { const isFocused = useIsFocused(); + // Fallback: when focused on a chat, periodically refetch messages for this alert. + // This helps recover faster if the global aggregated subscription is delayed/stale. + const lastSeenCountRef = useRef(0); + const lastChangeAtRef = useRef(Date.now()); + + const { refetch } = useQuery(SELECT_MANY_MESSAGE_QUERY, { + variables: { alertId }, + skip: !isFocused || !alertId, + fetchPolicy: "network-only", + nextFetchPolicy: "cache-first", + notifyOnNetworkStatusChange: false, + }); + + useEffect(() => { + if (!isFocused) return; + const count = messagesList?.length ?? 0; + if (count !== lastSeenCountRef.current) { + lastSeenCountRef.current = count; + lastChangeAtRef.current = Date.now(); + } + }, [isFocused, messagesList]); + + useEffect(() => { + if (!isFocused) return; + if (!alertId) return; + + const CHECK_EVERY_MS = 15_000; + const STALE_MS = 45_000; + + const interval = setInterval(() => { + const age = Date.now() - lastChangeAtRef.current; + if (age < STALE_MS) return; + + try { + Sentry.addBreadcrumb({ + category: "chat", + level: "warning", + message: "chat fallback refetch (stale)", + data: { alertId, ageMs: age }, + }); + } catch (_e) { + // ignore + } + + refetch?.().catch((e) => { + try { + Sentry.captureException(e, { + tags: { context: "chat-fallback-refetch" }, + extra: { alertId, ageMs: age }, + }); + } catch (_e2) { + // ignore + } + }); + }, CHECK_EVERY_MS); + + return () => clearInterval(interval); + }, [alertId, isFocused, refetch]); + useEffect(() => { if (!isFocused || !messagesList) { return; diff --git a/src/hooks/useLatestWithSubscription.js b/src/hooks/useLatestWithSubscription.js index b5246b8..16b35af 100644 --- a/src/hooks/useLatestWithSubscription.js +++ b/src/hooks/useLatestWithSubscription.js @@ -1,7 +1,9 @@ import { useRef, useEffect, useMemo, useState } from "react"; import { useQuery } from "@apollo/client"; import * as Sentry from "@sentry/react-native"; -import { useNetworkState } from "~/stores"; +import { AppState } from "react-native"; +import { useNetworkState, networkActions } from "~/stores"; +import network from "~/network"; import useShallowMemo from "./useShallowMemo"; // Constants for retry configuration @@ -35,18 +37,23 @@ export default function useLatestWithSubscription( maxRetries = MAX_RETRIES, livenessStaleMs = null, livenessCheckEveryMs = 15_000, + refetchOnReconnect = false, ...queryParams } = {}, ) { const variables = useShallowMemo(() => paramVariables, paramVariables); - const { wsClosedDate, wsConnected } = useNetworkState([ - "wsClosedDate", - "wsConnected", - ]); + const { wsClosedDate, wsConnected, wsLastHeartbeatDate, wsLastRecoveryDate } = + useNetworkState([ + "wsClosedDate", + "wsConnected", + "wsLastHeartbeatDate", + "wsLastRecoveryDate", + ]); // State to force re-render and retry subscription const [retryTrigger, setRetryTrigger] = useState(0); + const [reconnectSyncTrigger, setReconnectSyncTrigger] = useState(0); const variableHashRef = useRef(JSON.stringify(variables)); const highestIdRef = useRef(null); @@ -71,12 +78,105 @@ export default function useLatestWithSubscription( // Per-subscription liveness watchdog const lastSubscriptionDataAtRef = useRef(Date.now()); const lastLivenessKickAtRef = useRef(0); + const consecutiveStaleKicksRef = useRef(0); + const lastWsRestartAtRef = useRef(0); + const lastReloadAtRef = useRef(0); + const wsLastHeartbeatDateRef = useRef(wsLastHeartbeatDate); + const appStateRef = useRef(AppState.currentState); + const wsLastRecoveryDateRef = useRef(wsLastRecoveryDate); + + // Optional refetch-on-reconnect support. + // Goal: if WS was reconnected (wsClosedDate changes), force a base refetch once before resubscribing + // to reduce chances of cursor gaps. + const reconnectRefetchPendingRef = useRef(false); + const lastReconnectRefetchKeyRef = useRef(null); + + useEffect(() => { + wsLastHeartbeatDateRef.current = wsLastHeartbeatDate; + }, [wsLastHeartbeatDate]); + + useEffect(() => { + wsLastRecoveryDateRef.current = wsLastRecoveryDate; + }, [wsLastRecoveryDate]); + + useEffect(() => { + const sub = AppState.addEventListener("change", (next) => { + appStateRef.current = next; + if (next === "active") { + // Timers may have been paused/throttled; reset stale timers to avoid false kicks. + lastSubscriptionDataAtRef.current = Date.now(); + lastLivenessKickAtRef.current = 0; + consecutiveStaleKicksRef.current = 0; + } + }); + return () => sub.remove(); + }, []); + + useEffect(() => { + if (!refetchOnReconnect) return; + if (skip) return; + if (appStateRef.current !== "active") return; + if (!wsClosedDate) return; + if (!refetch) return; + + // Only refetch once per wsClosedDate value. + if (lastReconnectRefetchKeyRef.current === wsClosedDate) return; + lastReconnectRefetchKeyRef.current = wsClosedDate; + reconnectRefetchPendingRef.current = true; + + (async () => { + try { + try { + Sentry.addBreadcrumb({ + category: "graphql-subscription", + level: "info", + message: "refetch-on-reconnect start", + data: { subscriptionKey, wsClosedDate }, + }); + } catch (_e) { + // ignore + } + console.log( + `[${subscriptionKey}] WS reconnect detected, refetching base query to prevent gaps`, + { wsClosedDate }, + ); + await refetch(); + } catch (e) { + console.warn( + `[${subscriptionKey}] Refetch-on-reconnect failed (continuing with resubscribe)`, + e, + ); + + try { + Sentry.captureException(e, { + tags: { + subscriptionKey, + context: "refetch-on-reconnect", + }, + extra: { wsClosedDate }, + }); + } catch (_e2) { + // ignore + } + } finally { + reconnectRefetchPendingRef.current = false; + setReconnectSyncTrigger((x) => x + 1); + } + })(); + }, [refetch, refetchOnReconnect, skip, subscriptionKey, wsClosedDate]); useEffect(() => { if (!livenessStaleMs) return; if (skip) return; + const STALE_KICKS_BEFORE_WS_RESTART = 2; + const STALE_KICKS_BEFORE_RELOAD = 4; + const GLOBAL_RECOVERY_COOLDOWN_MS = 30_000; + // Separate throttle for escalations; resubscribe kicks are already throttled by livenessStaleMs. + const MIN_ESCALATION_INTERVAL_MS = 60_000; + const interval = setInterval(() => { + if (appStateRef.current !== "active") return; if (!wsConnected) return; const age = Date.now() - lastSubscriptionDataAtRef.current; if (age < livenessStaleMs) return; @@ -85,9 +185,131 @@ export default function useLatestWithSubscription( if (now - lastLivenessKickAtRef.current < livenessStaleMs) return; lastLivenessKickAtRef.current = now; + consecutiveStaleKicksRef.current += 1; + + const wsHeartbeatAgeMs = (() => { + const hb = wsLastHeartbeatDateRef.current; + if (!hb) return null; + const last = Date.parse(hb); + return Number.isFinite(last) ? Date.now() - last : null; + })(); + console.warn( - `[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe`, + `[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe (wsHeartbeatAgeMs=${ + wsHeartbeatAgeMs ?? "n/a" + }, kicks=${consecutiveStaleKicksRef.current})`, ); + + try { + Sentry.addBreadcrumb({ + category: "graphql-subscription", + level: "warning", + message: "liveness stale kick", + data: { + subscriptionKey, + ageMs: age, + livenessStaleMs, + wsHeartbeatAgeMs, + kicks: consecutiveStaleKicksRef.current, + }, + }); + } catch (_e) { + // ignore + } + + // Escalation policy for repeated consecutive stale kicks. + if ( + consecutiveStaleKicksRef.current >= STALE_KICKS_BEFORE_RELOAD && + now - lastReloadAtRef.current >= MIN_ESCALATION_INTERVAL_MS + ) { + const lastRecovery = wsLastRecoveryDateRef.current + ? Date.parse(wsLastRecoveryDateRef.current) + : NaN; + if ( + Number.isFinite(lastRecovery) && + now - lastRecovery < GLOBAL_RECOVERY_COOLDOWN_MS + ) { + return; + } + + lastReloadAtRef.current = now; + networkActions.WSRecoveryTouch(); + console.warn( + `[${subscriptionKey}] Escalation: triggering reload after ${consecutiveStaleKicksRef.current} stale kicks`, + ); + + try { + Sentry.captureMessage("subscription escalated to reload", { + level: "warning", + tags: { subscriptionKey, context: "liveness" }, + extra: { + consecutiveKicks: consecutiveStaleKicksRef.current, + wsHeartbeatAgeMs, + ageMs: age, + livenessStaleMs, + }, + }); + } catch (_e) { + // ignore + } + + networkActions.triggerReload(); + } else if ( + consecutiveStaleKicksRef.current >= STALE_KICKS_BEFORE_WS_RESTART && + now - lastWsRestartAtRef.current >= MIN_ESCALATION_INTERVAL_MS + ) { + const lastRecovery = wsLastRecoveryDateRef.current + ? Date.parse(wsLastRecoveryDateRef.current) + : NaN; + if ( + Number.isFinite(lastRecovery) && + now - lastRecovery < GLOBAL_RECOVERY_COOLDOWN_MS + ) { + return; + } + + lastWsRestartAtRef.current = now; + networkActions.WSRecoveryTouch(); + try { + console.warn( + `[${subscriptionKey}] Escalation: restarting WS after ${consecutiveStaleKicksRef.current} stale kicks`, + ); + + try { + Sentry.captureMessage("subscription escalated to ws restart", { + level: "warning", + tags: { subscriptionKey, context: "liveness" }, + extra: { + consecutiveKicks: consecutiveStaleKicksRef.current, + wsHeartbeatAgeMs, + ageMs: age, + livenessStaleMs, + }, + }); + } catch (_e2) { + // ignore + } + + network.apolloClient?.restartWS?.(); + } catch (error) { + console.warn( + `[${subscriptionKey}] Escalation: WS restart failed`, + error, + ); + + try { + Sentry.captureException(error, { + tags: { + subscriptionKey, + context: "liveness-ws-restart-failed", + }, + }); + } catch (_e2) { + // ignore + } + } + } + lastSubscriptionDataAtRef.current = now; setRetryTrigger((prev) => prev + 1); }, livenessCheckEveryMs); @@ -119,6 +341,7 @@ export default function useLatestWithSubscription( loading, error, subscribeToMore, + refetch, } = useQuery(initialQuery, { ...queryParams, variables, @@ -187,6 +410,18 @@ export default function useLatestWithSubscription( if (!subscribeToMore) return; if (highestIdRef.current === null) return; // Wait until we have the highest ID + if (appStateRef.current !== "active") return; + + // If we opted into refetch-on-reconnect and a reconnect refetch is still pending, + // wait to (re)subscribe until the base query has been refreshed. + if ( + refetchOnReconnect && + wsClosedDate && + reconnectRefetchPendingRef.current + ) { + return; + } + // Always cleanup any previous active subscription before creating a new one. // React only runs the cleanup returned directly from the effect. if (unsubscribeRef.current) { @@ -305,6 +540,7 @@ export default function useLatestWithSubscription( retryCountRef.current = 0; subscriptionErrorRef.current = null; lastSubscriptionDataAtRef.current = Date.now(); + consecutiveStaleKicksRef.current = 0; } if (!subscriptionData.data) return prev; @@ -316,6 +552,8 @@ export default function useLatestWithSubscription( const newItems = subscriptionData.data[subscriptionRootKey] || []; const existingItems = prev[queryRootKey] || []; + const mergeStart = Date.now(); + // Filter new items const filteredNewItems = newItems.filter( (item) => @@ -325,7 +563,15 @@ export default function useLatestWithSubscription( ), ); - if (filteredNewItems.length === 0) return prev; + if (filteredNewItems.length === 0) { + const tookMs = Date.now() - mergeStart; + if (tookMs > 100) { + console.warn( + `[${subscriptionKey}] updateQuery merge took ${tookMs}ms (existing=${existingItems.length}, new=${newItems.length}, result=${existingItems.length})`, + ); + } + return prev; + } // Update highestId if we received any new items filteredNewItems.forEach((item) => { @@ -341,9 +587,18 @@ export default function useLatestWithSubscription( ); // For latest items pattern, we prepend new items (DESC order in UI) + const resultItems = [...filteredNewItems, ...existingItems]; + + const tookMs = Date.now() - mergeStart; + if (tookMs > 100) { + console.warn( + `[${subscriptionKey}] updateQuery merge took ${tookMs}ms (existing=${existingItems.length}, new=${newItems.length}, result=${resultItems.length})`, + ); + } + return { ...prev, - [queryRootKey]: [...filteredNewItems, ...existingItems], + [queryRootKey]: resultItems, }; }, }); @@ -421,6 +676,8 @@ export default function useLatestWithSubscription( maxRetries, livenessStaleMs, livenessCheckEveryMs, + refetchOnReconnect, + reconnectSyncTrigger, ]); return { diff --git a/src/hooks/useStreamQueryWithSubscription.js b/src/hooks/useStreamQueryWithSubscription.js index 326b95c..8d01376 100644 --- a/src/hooks/useStreamQueryWithSubscription.js +++ b/src/hooks/useStreamQueryWithSubscription.js @@ -1,7 +1,9 @@ import { useRef, useEffect, useMemo, useState } from "react"; import { useQuery } from "@apollo/client"; import * as Sentry from "@sentry/react-native"; -import { useNetworkState } from "~/stores"; +import { AppState } from "react-native"; +import { useNetworkState, networkActions } from "~/stores"; +import network from "~/network"; import useShallowMemo from "./useShallowMemo"; // Constants for retry configuration @@ -28,18 +30,23 @@ export default function useStreamQueryWithSubscription( maxRetries = MAX_RETRIES, // Allow overriding default max retries livenessStaleMs = null, livenessCheckEveryMs = 15_000, + refetchOnReconnect = false, ...queryParams } = {}, ) { const variables = useShallowMemo(() => paramVariables, paramVariables); - const { wsClosedDate, wsConnected } = useNetworkState([ - "wsClosedDate", - "wsConnected", - ]); + const { wsClosedDate, wsConnected, wsLastHeartbeatDate, wsLastRecoveryDate } = + useNetworkState([ + "wsClosedDate", + "wsConnected", + "wsLastHeartbeatDate", + "wsLastRecoveryDate", + ]); // State to force re-render and retry subscription const [retryTrigger, setRetryTrigger] = useState(0); + const [reconnectSyncTrigger, setReconnectSyncTrigger] = useState(0); const variableHashRef = useRef(JSON.stringify(variables)); const lastCursorRef = useRef(initialCursor); @@ -65,12 +72,105 @@ export default function useStreamQueryWithSubscription( // hasn't delivered any payload for some time, trigger a resubscribe. const lastSubscriptionDataAtRef = useRef(Date.now()); const lastLivenessKickAtRef = useRef(0); + const consecutiveStaleKicksRef = useRef(0); + const lastWsRestartAtRef = useRef(0); + const lastReloadAtRef = useRef(0); + const wsLastHeartbeatDateRef = useRef(wsLastHeartbeatDate); + const appStateRef = useRef(AppState.currentState); + const wsLastRecoveryDateRef = useRef(wsLastRecoveryDate); + + // Optional refetch-on-reconnect support. + // Goal: if WS was reconnected (wsClosedDate changes), force a base refetch once before resubscribing + // to reduce chances of cursor gaps. + const reconnectRefetchPendingRef = useRef(false); + const lastReconnectRefetchKeyRef = useRef(null); + + useEffect(() => { + wsLastHeartbeatDateRef.current = wsLastHeartbeatDate; + }, [wsLastHeartbeatDate]); + + useEffect(() => { + wsLastRecoveryDateRef.current = wsLastRecoveryDate; + }, [wsLastRecoveryDate]); + + useEffect(() => { + const sub = AppState.addEventListener("change", (next) => { + appStateRef.current = next; + if (next === "active") { + // Timers may have been paused/throttled; reset stale timers to avoid false kicks. + lastSubscriptionDataAtRef.current = Date.now(); + lastLivenessKickAtRef.current = 0; + consecutiveStaleKicksRef.current = 0; + } + }); + return () => sub.remove(); + }, []); + + useEffect(() => { + if (!refetchOnReconnect) return; + if (skip) return; + if (appStateRef.current !== "active") return; + if (!wsClosedDate) return; + if (!refetch) return; + + // Only refetch once per wsClosedDate value. + if (lastReconnectRefetchKeyRef.current === wsClosedDate) return; + lastReconnectRefetchKeyRef.current = wsClosedDate; + reconnectRefetchPendingRef.current = true; + + (async () => { + try { + try { + Sentry.addBreadcrumb({ + category: "graphql-subscription", + level: "info", + message: "refetch-on-reconnect start", + data: { subscriptionKey, wsClosedDate }, + }); + } catch (_e) { + // ignore + } + console.log( + `[${subscriptionKey}] WS reconnect detected, refetching base query to prevent gaps`, + { wsClosedDate }, + ); + await refetch(); + } catch (e) { + console.warn( + `[${subscriptionKey}] Refetch-on-reconnect failed (continuing with resubscribe)`, + e, + ); + + try { + Sentry.captureException(e, { + tags: { + subscriptionKey, + context: "refetch-on-reconnect", + }, + extra: { wsClosedDate }, + }); + } catch (_e2) { + // ignore + } + } finally { + reconnectRefetchPendingRef.current = false; + setReconnectSyncTrigger((x) => x + 1); + } + })(); + }, [refetch, refetchOnReconnect, skip, subscriptionKey, wsClosedDate]); useEffect(() => { if (!livenessStaleMs) return; if (skip) return; + const STALE_KICKS_BEFORE_WS_RESTART = 2; + const STALE_KICKS_BEFORE_RELOAD = 4; + const GLOBAL_RECOVERY_COOLDOWN_MS = 30_000; + // Separate throttle for escalations; resubscribe kicks are already throttled by livenessStaleMs. + const MIN_ESCALATION_INTERVAL_MS = 60_000; + const interval = setInterval(() => { + if (appStateRef.current !== "active") return; if (!wsConnected) return; const age = Date.now() - lastSubscriptionDataAtRef.current; if (age < livenessStaleMs) return; @@ -80,9 +180,131 @@ export default function useStreamQueryWithSubscription( if (now - lastLivenessKickAtRef.current < livenessStaleMs) return; lastLivenessKickAtRef.current = now; + consecutiveStaleKicksRef.current += 1; + + const wsHeartbeatAgeMs = (() => { + const hb = wsLastHeartbeatDateRef.current; + if (!hb) return null; + const last = Date.parse(hb); + return Number.isFinite(last) ? Date.now() - last : null; + })(); + console.warn( - `[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe`, + `[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe (wsHeartbeatAgeMs=${ + wsHeartbeatAgeMs ?? "n/a" + }, kicks=${consecutiveStaleKicksRef.current})`, ); + + try { + Sentry.addBreadcrumb({ + category: "graphql-subscription", + level: "warning", + message: "liveness stale kick", + data: { + subscriptionKey, + ageMs: age, + livenessStaleMs, + wsHeartbeatAgeMs, + kicks: consecutiveStaleKicksRef.current, + }, + }); + } catch (_e) { + // ignore + } + + // Escalation policy for repeated consecutive stale kicks. + if ( + consecutiveStaleKicksRef.current >= STALE_KICKS_BEFORE_RELOAD && + now - lastReloadAtRef.current >= MIN_ESCALATION_INTERVAL_MS + ) { + const lastRecovery = wsLastRecoveryDateRef.current + ? Date.parse(wsLastRecoveryDateRef.current) + : NaN; + if ( + Number.isFinite(lastRecovery) && + now - lastRecovery < GLOBAL_RECOVERY_COOLDOWN_MS + ) { + return; + } + + lastReloadAtRef.current = now; + networkActions.WSRecoveryTouch(); + console.warn( + `[${subscriptionKey}] Escalation: triggering reload after ${consecutiveStaleKicksRef.current} stale kicks`, + ); + + try { + Sentry.captureMessage("subscription escalated to reload", { + level: "warning", + tags: { subscriptionKey, context: "liveness" }, + extra: { + consecutiveKicks: consecutiveStaleKicksRef.current, + wsHeartbeatAgeMs, + ageMs: age, + livenessStaleMs, + }, + }); + } catch (_e) { + // ignore + } + + networkActions.triggerReload(); + } else if ( + consecutiveStaleKicksRef.current >= STALE_KICKS_BEFORE_WS_RESTART && + now - lastWsRestartAtRef.current >= MIN_ESCALATION_INTERVAL_MS + ) { + const lastRecovery = wsLastRecoveryDateRef.current + ? Date.parse(wsLastRecoveryDateRef.current) + : NaN; + if ( + Number.isFinite(lastRecovery) && + now - lastRecovery < GLOBAL_RECOVERY_COOLDOWN_MS + ) { + return; + } + + lastWsRestartAtRef.current = now; + networkActions.WSRecoveryTouch(); + try { + console.warn( + `[${subscriptionKey}] Escalation: restarting WS after ${consecutiveStaleKicksRef.current} stale kicks`, + ); + + try { + Sentry.captureMessage("subscription escalated to ws restart", { + level: "warning", + tags: { subscriptionKey, context: "liveness" }, + extra: { + consecutiveKicks: consecutiveStaleKicksRef.current, + wsHeartbeatAgeMs, + ageMs: age, + livenessStaleMs, + }, + }); + } catch (_e2) { + // ignore + } + + network.apolloClient?.restartWS?.(); + } catch (error) { + console.warn( + `[${subscriptionKey}] Escalation: WS restart failed`, + error, + ); + + try { + Sentry.captureException(error, { + tags: { + subscriptionKey, + context: "liveness-ws-restart-failed", + }, + }); + } catch (_e2) { + // ignore + } + } + } + lastSubscriptionDataAtRef.current = now; setRetryTrigger((prev) => prev + 1); }, livenessCheckEveryMs); @@ -123,6 +345,7 @@ export default function useStreamQueryWithSubscription( loading, error, subscribeToMore, + refetch, } = useQuery(initialQuery, { ...queryParams, variables: queryVariables, @@ -179,6 +402,18 @@ export default function useStreamQueryWithSubscription( if (skip) return; // If skipping, do nothing if (!subscribeToMore) return; + if (appStateRef.current !== "active") return; + + // If we opted into refetch-on-reconnect and a reconnect refetch is still pending, + // wait to (re)subscribe until the base query has been refreshed. + if ( + refetchOnReconnect && + wsClosedDate && + reconnectRefetchPendingRef.current + ) { + return; + } + // If we're about to (re)subscribe, always cleanup any previous subscription first. // This is critical because React effect cleanups must be returned synchronously // from the effect, not from inside async callbacks. @@ -298,6 +533,7 @@ export default function useStreamQueryWithSubscription( retryCountRef.current = 0; subscriptionErrorRef.current = null; lastSubscriptionDataAtRef.current = Date.now(); + consecutiveStaleKicksRef.current = 0; } if (!subscriptionData.data) return prev; @@ -309,6 +545,74 @@ export default function useStreamQueryWithSubscription( const newItems = subscriptionData.data[subscriptionRootKey] || []; const existingItems = prev[queryRootKey] || []; + const mergeStart = Date.now(); + + // Fast path: when uniqKey === cursorKey, cursor ordering is ASC, and incoming items + // are strictly newer than the last existing item, we can append without rebuilding + // a full map + sort (avoids O(N log N) work on large lists). + if (uniqKey === cursorKey && existingItems.length > 0) { + const lastExistingCursor = + existingItems[existingItems.length - 1]?.[cursorKey]; + + // Filter items first (and update cursor), while verifying monotonicity. + let monotonic = true; + const filteredNewItems = []; + + for (const item of newItems) { + if (!shouldIncludeItemRef.current(item, contextRef.current)) { + continue; + } + + const itemCursor = item[cursorKey]; + if (itemCursor == null) continue; + + if ( + typeof lastExistingCursor === "number" && + typeof itemCursor === "number" && + itemCursor <= lastExistingCursor + ) { + monotonic = false; + break; + } + + // Update last cursor if item is newer + if ( + !lastCursorRef.current || + itemCursor > lastCursorRef.current + ) { + lastCursorRef.current = itemCursor; + } + + filteredNewItems.push(item); + } + + if (monotonic) { + if (filteredNewItems.length === 0) { + const tookMs = Date.now() - mergeStart; + if (tookMs > 100) { + console.warn( + `[${subscriptionKey}] updateQuery merge took ${tookMs}ms (fast-path, existing=${existingItems.length}, new=${newItems.length}, result=${existingItems.length})`, + ); + } + return prev; + } + + const resultItems = [...existingItems, ...filteredNewItems]; + + const tookMs = Date.now() - mergeStart; + if (tookMs > 100) { + console.warn( + `[${subscriptionKey}] updateQuery merge took ${tookMs}ms (fast-path, existing=${existingItems.length}, new=${newItems.length}, result=${resultItems.length})`, + ); + } + + return { + ...prev, + [queryRootKey]: resultItems, + }; + } + } + // 1) Build a map from existing items const itemMap = new Map(); existingItems.forEach((item) => { @@ -351,6 +655,13 @@ export default function useStreamQueryWithSubscription( return aCursor - bCursor; }); + const tookMs = Date.now() - mergeStart; + if (tookMs > 100) { + console.warn( + `[${subscriptionKey}] updateQuery merge took ${tookMs}ms (existing=${existingItems.length}, new=${newItems.length}, result=${sortedItems.length})`, + ); + } + return { [queryRootKey]: sortedItems, }; @@ -430,6 +741,8 @@ export default function useStreamQueryWithSubscription( maxRetries, livenessStaleMs, livenessCheckEveryMs, + refetchOnReconnect, + reconnectSyncTrigger, ]); return { diff --git a/src/hooks/useWsWatchdog.js b/src/hooks/useWsWatchdog.js index dec52c5..9beeb01 100644 --- a/src/hooks/useWsWatchdog.js +++ b/src/hooks/useWsWatchdog.js @@ -1,4 +1,6 @@ import { useEffect, useRef } from "react"; +import { AppState } from "react-native"; +import * as Sentry from "@sentry/react-native"; import { useNetworkState, networkActions } from "~/stores"; import network from "~/network"; import { createLogger } from "~/lib/logger"; @@ -14,24 +16,52 @@ const CHECK_EVERY_MS = 10_000; const MIN_RESTART_INTERVAL_MS = 30_000; export default function useWsWatchdog({ enabled = true } = {}) { - const { wsConnected, wsLastHeartbeatDate, hasInternetConnection } = - useNetworkState([ - "wsConnected", - "wsLastHeartbeatDate", - "hasInternetConnection", - ]); + const { + wsConnected, + wsLastHeartbeatDate, + wsLastRecoveryDate, + hasInternetConnection, + } = useNetworkState([ + "wsConnected", + "wsLastHeartbeatDate", + "wsLastRecoveryDate", + "hasInternetConnection", + ]); const lastRestartRef = useRef(0); + const wsLastHeartbeatDateRef = useRef(wsLastHeartbeatDate); + const appStateRef = useRef(AppState.currentState); + const wsLastRecoveryDateRef = useRef(wsLastRecoveryDate); + + useEffect(() => { + wsLastHeartbeatDateRef.current = wsLastHeartbeatDate; + }, [wsLastHeartbeatDate]); + + useEffect(() => { + wsLastRecoveryDateRef.current = wsLastRecoveryDate; + }, [wsLastRecoveryDate]); + + useEffect(() => { + const sub = AppState.addEventListener("change", (next) => { + appStateRef.current = next; + if (next === "active") { + // Avoid false positives right after app foreground (timers may have been throttled). + lastRestartRef.current = Date.now(); + } + }); + return () => sub.remove(); + }, []); useEffect(() => { if (!enabled) return; const interval = setInterval(() => { + if (appStateRef.current !== "active") return; if (!hasInternetConnection) return; if (!wsConnected) return; - if (!wsLastHeartbeatDate) return; + if (!wsLastHeartbeatDateRef.current) return; - const last = Date.parse(wsLastHeartbeatDate); + const last = Date.parse(wsLastHeartbeatDateRef.current); if (!Number.isFinite(last)) return; const age = Date.now() - last; @@ -39,23 +69,67 @@ export default function useWsWatchdog({ enabled = true } = {}) { const now = Date.now(); if (now - lastRestartRef.current < MIN_RESTART_INTERVAL_MS) return; + + // Global recovery throttle: avoid double restarts from multiple sources. + const lastRecovery = wsLastRecoveryDateRef.current + ? Date.parse(wsLastRecoveryDateRef.current) + : NaN; + if (Number.isFinite(lastRecovery)) { + const recoveryAge = now - lastRecovery; + if (recoveryAge < MIN_RESTART_INTERVAL_MS) return; + } + lastRestartRef.current = now; + networkActions.WSRecoveryTouch(); watchdogLogger.warn("WS heartbeat stale, triggering recovery", { ageMs: age, - lastHeartbeatDate: wsLastHeartbeatDate, + lastHeartbeatDate: wsLastHeartbeatDateRef.current, }); + try { + Sentry.addBreadcrumb({ + category: "websocket", + level: "warning", + message: "ws watchdog heartbeat stale", + data: { + ageMs: age, + lastHeartbeatDate: wsLastHeartbeatDateRef.current, + }, + }); + } catch (_e) { + // ignore + } + try { // First line recovery: restart websocket transport + try { + Sentry.captureMessage("ws watchdog restarting transport", { + level: "warning", + extra: { + ageMs: age, + lastHeartbeatDate: wsLastHeartbeatDateRef.current, + }, + }); + } catch (_e) { + // ignore + } network.apolloClient?.restartWS?.(); } catch (error) { watchdogLogger.error("WS restart failed", { error }); + + try { + Sentry.captureException(error, { + tags: { context: "ws-watchdog-restart-failed" }, + }); + } catch (_e) { + // ignore + } } // Second line recovery: if WS stays stale, do a full client reload setTimeout(() => { - const last2 = Date.parse(wsLastHeartbeatDate); + const last2 = Date.parse(wsLastHeartbeatDateRef.current); const age2 = Number.isFinite(last2) ? Date.now() - last2 : Infinity; if (age2 >= HEARTBEAT_STALE_MS) { watchdogLogger.warn( @@ -64,11 +138,21 @@ export default function useWsWatchdog({ enabled = true } = {}) { ageMs: age2, }, ); + + try { + Sentry.captureMessage("ws watchdog triggering reload", { + level: "warning", + extra: { ageMs: age2 }, + }); + } catch (_e) { + // ignore + } + networkActions.triggerReload(); } }, 10_000); }, CHECK_EVERY_MS); return () => clearInterval(interval); - }, [enabled, hasInternetConnection, wsConnected, wsLastHeartbeatDate]); + }, [enabled, hasInternetConnection, wsConnected]); } diff --git a/src/network/wsLink.js b/src/network/wsLink.js index 2d4a9bd..bc7abc5 100644 --- a/src/network/wsLink.js +++ b/src/network/wsLink.js @@ -19,6 +19,11 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) { // const MAX_RECONNECT_ATTEMPTS = 5; // Limit reconnection attempts const MAX_RECONNECT_ATTEMPTS = Infinity; // Limit reconnection attempts + // Graceful degradation: after prolonged WS reconnecting, surface app-level recovery + // via the existing reload mechanism (NetworkProviders will recreate Apollo). + const MAX_RECONNECT_TIME_MS = 5 * 60 * 1000; + let firstFailureAt = null; + let reconnectAttempts = 0; function getReconnectDelay() { // Exponential backoff with max delay @@ -80,6 +85,7 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) { wsLogger.info("WebSocket connected"); activeSocket = socket; reconnectAttempts = 0; // Reset attempts on successful connection + firstFailureAt = null; networkActions.WSConnected(); networkActions.WSTouch(); cancelReconnect(); // Cancel any pending reconnects @@ -98,6 +104,10 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) { }); networkActions.WSClosed(); + if (!firstFailureAt) { + firstFailureAt = Date.now(); + } + // Clear socket and timeouts activeSocket = undefined; if (pingTimeout) { @@ -107,6 +117,20 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) { // Schedule reconnect unless explicitly closed (1000) or going away (1001) if (event.code !== 1000 && event.code !== 1001) { + const reconnectAge = Date.now() - firstFailureAt; + if (reconnectAge >= MAX_RECONNECT_TIME_MS) { + wsLogger.warn( + "WebSocket reconnecting too long, triggering app reload", + { + reconnectAgeMs: reconnectAge, + reconnectAttempts, + lastCloseCode: event.code, + }, + ); + networkActions.triggerReload(); + return; + } + reconnectAttempts++; scheduleReconnect(); } else { diff --git a/src/stores/network.js b/src/stores/network.js index 3b8f89b..2113f08 100644 --- a/src/stores/network.js +++ b/src/stores/network.js @@ -7,6 +7,7 @@ export default createAtom(({ merge, get }) => { wsConnectedDate: null, wsClosedDate: null, wsLastHeartbeatDate: null, + wsLastRecoveryDate: null, triggerReload: false, initialized: true, hasInternetConnection: true, @@ -48,6 +49,13 @@ export default createAtom(({ merge, get }) => { wsLastHeartbeatDate: new Date().toISOString(), }); }, + WSRecoveryTouch: () => { + // Shared throttle marker to avoid multiple parts of the app triggering WS recovery + // at the same time (watchdog + per-subscription liveness + lifecycle). + merge({ + wsLastRecoveryDate: new Date().toISOString(), + }); + }, setHasInternetConnection: (status) => merge({ hasInternetConnection: status }), },