From 6e717077f9acc2cbb853b5bf2f09bc3463a0a861 Mon Sep 17 00:00:00 2001 From: devthejo Date: Sat, 17 Jan 2026 21:34:11 +0100 Subject: [PATCH] fix(ws): stabilization try 3 --- .../AggregatedMessagesSubscription.js | 4 ++ .../alerting/AlertingSubscription.js | 5 ++ src/containers/AppLifecycleListener.js | 59 ++++++++++----- .../components/LiveMessagesFetcher.js | 65 +---------------- src/containers/MessagesFetcher/gql.js | 2 +- src/hooks/useStreamQueryWithSubscription.js | 65 +++++++++++++++-- src/scenes/Profile/index.js | 72 ++++++++++++++++++- src/stores/aggregatedMessages.js | 21 ++++++ 8 files changed, 202 insertions(+), 91 deletions(-) diff --git a/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js b/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js index f2dca24..8ad8698 100644 --- a/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js +++ b/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js @@ -34,9 +34,13 @@ const AggregatedMessagesSubscription = () => { subscriptionKey: "aggregated-messages", fetchPolicy: "network-only", nextFetchPolicy: "cache-first", + notifyOnNetworkStatusChange: false, // Chat is latency-sensitive; if the WS transport is up but this subscription // delivers nothing for a while, force a resubscribe. livenessStaleMs: 60_000, + // If we detect staleness, first do a base refetch to catch-up, then resubscribe. + refetchOnStale: true, + refetchOnStaleCooldownMs: 60_000, // If WS reconnects, refetch base query once before resubscribing to reduce cursor gaps. refetchOnReconnect: true, diff --git a/src/app/subscriptions/alerting/AlertingSubscription.js b/src/app/subscriptions/alerting/AlertingSubscription.js index 674ee34..f2f04e8 100644 --- a/src/app/subscriptions/alerting/AlertingSubscription.js +++ b/src/app/subscriptions/alerting/AlertingSubscription.js @@ -25,11 +25,16 @@ const AlertingSubscription = () => { uniqKey: "id", initialCursor: -1, subscriptionKey: "alerting", + notifyOnNetworkStatusChange: false, // Alerting is latency-sensitive; add per-subscription liveness so it can't go stale // while WS transport heartbeat stays fresh. livenessStaleMs: 45_000, livenessCheckEveryMs: 12_000, + // If we detect staleness, first do a base refetch to catch-up, then resubscribe. + refetchOnStale: true, + refetchOnStaleCooldownMs: 60_000, + // If WS reconnects, refetch base query once before resubscribing to reduce cursor gaps. refetchOnReconnect: true, }); diff --git a/src/containers/AppLifecycleListener.js b/src/containers/AppLifecycleListener.js index c932e8f..6d94306 100644 --- a/src/containers/AppLifecycleListener.js +++ b/src/containers/AppLifecycleListener.js @@ -177,8 +177,15 @@ const AppLifecycleListener = () => { const appState = useRef(AppState.currentState); const activeTimeout = useRef(null); const lastActiveTimestamp = useRef(Date.now()); + const lastWsRestartAtRef = useRef(0); + const MIN_WS_RESTART_INTERVAL_MS = 15_000; const { completed } = usePermissionWizardState(["completed"]); - const { hasInternetConnection } = useNetworkState(["hasInternetConnection"]); + const { hasInternetConnection, wsConnected, wsLastHeartbeatDate } = + useNetworkState([ + "hasInternetConnection", + "wsConnected", + "wsLastHeartbeatDate", + ]); useEffect(() => { const handleAppStateChange = (nextAppState) => { @@ -202,9 +209,10 @@ const AppLifecycleListener = () => { (appState.current === "background" || appState.current === "inactive") ) { const timeSinceLastActive = Date.now() - lastActiveTimestamp.current; - if (timeSinceLastActive > 10000) { - clearTimeout(activeTimeout.current); + clearTimeout(activeTimeout.current); + // Permissions/sync are heavier; keep them for longer background durations. + if (timeSinceLastActive > 10_000) { // First check permissions immediately lifecycleLogger.info( "App returned to foreground, checking permissions", @@ -230,20 +238,37 @@ const AppLifecycleListener = () => { error: error.message, }); }); - - // Then handle WebSocket reconnection with proper error handling - activeTimeout.current = setTimeout(() => { - try { - lifecycleLogger.info("Restarting WebSocket connection"); - networkActions.WSRecoveryTouch(); - network.apolloClient.restartWS(); - } catch (error) { - lifecycleLogger.error("Failed to restart WebSocket", { error }); - } finally { - activeTimeout.current = null; - } - }, 2000); } + + // Always consider restarting WS on foreground (iOS can suspend sockets even for short durations). + activeTimeout.current = setTimeout(() => { + try { + const now = Date.now(); + if (now - lastWsRestartAtRef.current < MIN_WS_RESTART_INTERVAL_MS) { + return; + } + + const hbMs = wsLastHeartbeatDate + ? Date.parse(wsLastHeartbeatDate) + : NaN; + const heartbeatAgeMs = Number.isFinite(hbMs) ? now - hbMs : null; + + lifecycleLogger.info("Foreground WS check", { + inactiveTime: timeSinceLastActive, + wsConnected, + heartbeatAgeMs, + }); + + lastWsRestartAtRef.current = now; + lifecycleLogger.info("Restarting WebSocket connection"); + networkActions.WSRecoveryTouch(); + network.apolloClient.restartWS(); + } catch (error) { + lifecycleLogger.error("Failed to restart WebSocket", { error }); + } finally { + activeTimeout.current = null; + } + }, 1500); } appState.current = nextAppState; @@ -266,7 +291,7 @@ const AppLifecycleListener = () => { activeTimeout.current = null; } }; - }, [completed, hasInternetConnection]); + }, [completed, hasInternetConnection, wsConnected, wsLastHeartbeatDate]); return null; }; diff --git a/src/containers/MessagesFetcher/components/LiveMessagesFetcher.js b/src/containers/MessagesFetcher/components/LiveMessagesFetcher.js index 42360a2..2b7faf0 100644 --- a/src/containers/MessagesFetcher/components/LiveMessagesFetcher.js +++ b/src/containers/MessagesFetcher/components/LiveMessagesFetcher.js @@ -1,15 +1,11 @@ -import React, { useEffect, useMemo, useRef } from "react"; +import React, { useEffect } from "react"; import { useIsFocused } from "@react-navigation/native"; -import { useQuery } from "@apollo/client"; -import * as Sentry from "@sentry/react-native"; import ChatMessages from "~/containers/ChatMessages"; import { useAggregatedMessagesState, aggregatedMessagesActions, } from "~/stores"; -import { SELECT_MANY_MESSAGE_QUERY } from "../gql"; - const LiveMessagesFetcher = ({ scrollViewRef, alertId }) => { // Get messages from aggregated messages store const { @@ -33,65 +29,6 @@ const LiveMessagesFetcher = ({ scrollViewRef, alertId }) => { const isFocused = useIsFocused(); - // Fallback: when focused on a chat, periodically refetch messages for this alert. - // This helps recover faster if the global aggregated subscription is delayed/stale. - const lastSeenCountRef = useRef(0); - const lastChangeAtRef = useRef(Date.now()); - - const { refetch } = useQuery(SELECT_MANY_MESSAGE_QUERY, { - variables: { alertId }, - skip: !isFocused || !alertId, - fetchPolicy: "network-only", - nextFetchPolicy: "cache-first", - notifyOnNetworkStatusChange: false, - }); - - useEffect(() => { - if (!isFocused) return; - const count = messagesList?.length ?? 0; - if (count !== lastSeenCountRef.current) { - lastSeenCountRef.current = count; - lastChangeAtRef.current = Date.now(); - } - }, [isFocused, messagesList]); - - useEffect(() => { - if (!isFocused) return; - if (!alertId) return; - - const CHECK_EVERY_MS = 15_000; - const STALE_MS = 45_000; - - const interval = setInterval(() => { - const age = Date.now() - lastChangeAtRef.current; - if (age < STALE_MS) return; - - try { - Sentry.addBreadcrumb({ - category: "chat", - level: "warning", - message: "chat fallback refetch (stale)", - data: { alertId, ageMs: age }, - }); - } catch (_e) { - // ignore - } - - refetch?.().catch((e) => { - try { - Sentry.captureException(e, { - tags: { context: "chat-fallback-refetch" }, - extra: { alertId, ageMs: age }, - }); - } catch (_e2) { - // ignore - } - }); - }, CHECK_EVERY_MS); - - return () => clearInterval(interval); - }, [alertId, isFocused, refetch]); - useEffect(() => { if (!isFocused || !messagesList) { return; diff --git a/src/containers/MessagesFetcher/gql.js b/src/containers/MessagesFetcher/gql.js index 97a2271..ff4081e 100644 --- a/src/containers/MessagesFetcher/gql.js +++ b/src/containers/MessagesFetcher/gql.js @@ -33,7 +33,7 @@ export const SELECT_STREAM_MESSAGE_SUBSCRIPTION = gql` selectStreamMessage( where: { alertId: { _eq: $alertId } } cursor: { initial_value: { id: $cursor }, ordering: ASC } - batch_size: 100 + batch_size: 30 ) { ...MessageFields } diff --git a/src/hooks/useStreamQueryWithSubscription.js b/src/hooks/useStreamQueryWithSubscription.js index 8d01376..d3f11d9 100644 --- a/src/hooks/useStreamQueryWithSubscription.js +++ b/src/hooks/useStreamQueryWithSubscription.js @@ -31,18 +31,26 @@ export default function useStreamQueryWithSubscription( livenessStaleMs = null, livenessCheckEveryMs = 15_000, refetchOnReconnect = false, + refetchOnStale = false, + refetchOnStaleCooldownMs = 60_000, ...queryParams } = {}, ) { const variables = useShallowMemo(() => paramVariables, paramVariables); - const { wsClosedDate, wsConnected, wsLastHeartbeatDate, wsLastRecoveryDate } = - useNetworkState([ - "wsClosedDate", - "wsConnected", - "wsLastHeartbeatDate", - "wsLastRecoveryDate", - ]); + const { + wsClosedDate, + wsConnected, + wsLastHeartbeatDate, + wsLastRecoveryDate, + hasInternetConnection, + } = useNetworkState([ + "wsClosedDate", + "wsConnected", + "wsLastHeartbeatDate", + "wsLastRecoveryDate", + "hasInternetConnection", + ]); // State to force re-render and retry subscription const [retryTrigger, setRetryTrigger] = useState(0); @@ -72,6 +80,7 @@ export default function useStreamQueryWithSubscription( // hasn't delivered any payload for some time, trigger a resubscribe. const lastSubscriptionDataAtRef = useRef(Date.now()); const lastLivenessKickAtRef = useRef(0); + const lastStaleRefetchAtRef = useRef(0); const consecutiveStaleKicksRef = useRef(0); const lastWsRestartAtRef = useRef(0); const lastReloadAtRef = useRef(0); @@ -171,11 +180,49 @@ export default function useStreamQueryWithSubscription( const interval = setInterval(() => { if (appStateRef.current !== "active") return; + if (!hasInternetConnection) return; if (!wsConnected) return; const age = Date.now() - lastSubscriptionDataAtRef.current; if (age < livenessStaleMs) return; const now = Date.now(); + + // Catch-up refetch: if enabled, first try an HTTP refetch (base query) + // to fill potential gaps, then proceed with resubscribe logic. + if ( + refetchOnStale && + refetch && + now - lastStaleRefetchAtRef.current >= refetchOnStaleCooldownMs + ) { + lastStaleRefetchAtRef.current = now; + try { + Sentry.addBreadcrumb({ + category: "graphql-subscription", + level: "warning", + message: "refetch-on-stale start", + data: { + subscriptionKey, + ageMs: age, + livenessStaleMs, + }, + }); + } catch (_e) { + // ignore + } + + Promise.resolve() + .then(() => refetch()) + .catch((e) => { + try { + Sentry.captureException(e, { + tags: { subscriptionKey, context: "refetch-on-stale" }, + extra: { ageMs: age, livenessStaleMs }, + }); + } catch (_e2) { + // ignore + } + }); + } // Avoid spamming resubscribe triggers. if (now - lastLivenessKickAtRef.current < livenessStaleMs) return; lastLivenessKickAtRef.current = now; @@ -313,9 +360,13 @@ export default function useStreamQueryWithSubscription( }, [ livenessStaleMs, livenessCheckEveryMs, + refetchOnStale, + refetchOnStaleCooldownMs, + refetch, skip, subscriptionKey, wsConnected, + hasInternetConnection, ]); useEffect(() => { diff --git a/src/scenes/Profile/index.js b/src/scenes/Profile/index.js index 5c8e734..da31fe4 100644 --- a/src/scenes/Profile/index.js +++ b/src/scenes/Profile/index.js @@ -1,19 +1,22 @@ -import React, { useEffect } from "react"; +import React, { useEffect, useRef } from "react"; import { ScrollView, View } from "react-native"; import Loader from "~/components/Loader"; import { useSubscription } from "@apollo/client"; import Error from "~/components/Error"; +import * as Sentry from "@sentry/react-native"; import { LOAD_PROFILE_SUBSCRIPTION } from "./gql"; -import { useSessionState } from "~/stores"; +import { useNetworkState, useSessionState } from "~/stores"; import { createLogger } from "~/lib/logger"; import { FEATURE_SCOPES } from "~/lib/logger/scopes"; import withConnectivity from "~/hoc/withConnectivity"; +import { useIsFocused } from "@react-navigation/native"; + import Form from "./Form"; const profileLogger = createLogger({ @@ -23,6 +26,14 @@ const profileLogger = createLogger({ export default withConnectivity(function Profile({ navigation, route }) { const { userId } = useSessionState(["userId"]); + const { wsClosedDate, wsConnected, hasInternetConnection } = useNetworkState([ + "wsClosedDate", + "wsConnected", + "hasInternetConnection", + ]); + const isFocused = useIsFocused(); + + const lastDataAtRef = useRef(Date.now()); // profileLogger.debug("Profile user ID", { userId }); const { data, loading, error, restart } = useSubscription( LOAD_PROFILE_SUBSCRIPTION, @@ -38,6 +49,63 @@ export default withConnectivity(function Profile({ navigation, route }) { // eslint-disable-next-line react-hooks/exhaustive-deps }, [userId]); + useEffect(() => { + if (!wsClosedDate) return; + // WS was closed/reconnected; restart the subscription to avoid being stuck. + try { + profileLogger.info( + "WS reconnect detected, restarting profile subscription", + { + wsClosedDate, + }, + ); + restart(); + } catch (_e) { + // ignore + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [wsClosedDate]); + + useEffect(() => { + if (!isFocused) return; + if (!hasInternetConnection) return; + if (!wsConnected) return; + + const interval = setInterval(() => { + const age = Date.now() - lastDataAtRef.current; + if (age < 45_000) return; + + try { + Sentry.addBreadcrumb({ + category: "profile", + level: "warning", + message: "profile subscription stale, restarting", + data: { ageMs: age }, + }); + } catch (_e) { + // ignore + } + + profileLogger.warn("Profile subscription stale, restarting", { + ageMs: age, + }); + try { + lastDataAtRef.current = Date.now(); + restart(); + } catch (_e) { + // ignore + } + }, 15_000); + + return () => clearInterval(interval); + }, [hasInternetConnection, isFocused, restart, wsConnected]); + + useEffect(() => { + if (data?.selectOneUser) { + lastDataAtRef.current = Date.now(); + } + }, [data]); + const clearAuthWaitParams = React.useCallback(() => { navigation.setParams({ waitingSmsType: undefined, diff --git a/src/stores/aggregatedMessages.js b/src/stores/aggregatedMessages.js index a63be0c..224cc28 100644 --- a/src/stores/aggregatedMessages.js +++ b/src/stores/aggregatedMessages.js @@ -156,6 +156,26 @@ export default createAtom(({ merge, set, get, reset }) => { mergeMessagesList({ realMessagesList: messages }); }; + const replaceMessagesForAlert = (alertId, messagesForAlert) => { + const { realMessagesList } = get(); + + const remaining = realMessagesList.filter( + (m) => m.alertId !== alertId && m.alertId !== Number(alertId), + ); + + const next = [...remaining, ...(messagesForAlert || [])]; + + // Preserve global ordering by id asc. + next.sort((a, b) => { + const aId = a?.id; + const bId = b?.id; + if (typeof aId === "number" && typeof bId === "number") return aId - bId; + return String(aId).localeCompare(String(bId)); + }); + + mergeMessagesList({ realMessagesList: next }); + }; + const debouncedUpdateMessagesList = debounce(updateMessagesList, 300, { trailing: true, }); @@ -218,6 +238,7 @@ export default createAtom(({ merge, set, get, reset }) => { init, reset, updateMessagesList, + replaceMessagesForAlert, debouncedUpdateMessagesList, initializeAlert: (alert) => { addVirtualFirstMessage(alert);