From 5dfb064c2cda2f1a1ca9346d6d659d8339a25af9 Mon Sep 17 00:00:00 2001 From: devthejo Date: Sun, 18 Jan 2026 16:22:42 +0100 Subject: [PATCH] fix(ws): stabilization try 6 + typo --- .../AggregatedMessagesSubscription.js | 34 +++++++++++++++++++ src/hooks/useLatestWithSubscription.js | 7 +++- src/hooks/useStreamQueryWithSubscription.js | 8 ++++- src/network/WebSocketLink.js | 12 ++++++- src/stores/tree.js | 2 +- 5 files changed, 59 insertions(+), 4 deletions(-) diff --git a/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js b/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js index 8ad8698..0b6e460 100644 --- a/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js +++ b/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js @@ -3,6 +3,7 @@ import useStreamQueryWithSubscription from "~/hooks/useStreamQueryWithSubscripti import { aggregatedMessagesActions } from "~/stores"; import { createLogger } from "~/lib/logger"; import { FEATURE_SCOPES, NETWORK_SCOPES } from "~/lib/logger/scopes"; +import { AppState } from "react-native"; import { AGGREGATED_MESSAGES_QUERY, @@ -17,12 +18,14 @@ const messagesLogger = createLogger({ const AggregatedMessagesSubscription = () => { // Ref to track if we've already run the cleanup const initRunRef = useRef(false); + const lastForegroundCatchupAtRef = useRef(0); // Aggregated messages subscription const { data: messagesData, error: messagesError, loading, + refetch, } = useStreamQueryWithSubscription( AGGREGATED_MESSAGES_QUERY, AGGREGATED_MESSAGES_SUBSCRIPTION, @@ -47,6 +50,37 @@ const AggregatedMessagesSubscription = () => { }, ); + // Foreground catch-up: on mobile, WS can take time to resume after background. + // Do a lightweight refresh shortly after the app becomes active. + useEffect(() => { + const sub = AppState.addEventListener("change", (next) => { + if (next !== "active") return; + + const now = Date.now(); + // Avoid spamming refetches if the app toggles state quickly. + if (now - lastForegroundCatchupAtRef.current < 15_000) return; + lastForegroundCatchupAtRef.current = now; + + if (!refetch) return; + try { + messagesLogger.info( + "Foreground catch-up: refetching aggregated messages", + ); + Promise.resolve() + .then(() => refetch()) + .catch((e) => { + messagesLogger.warn("Foreground catch-up refetch failed", { + error: e?.message, + }); + }); + } catch (_e) { + // ignore + } + }); + + return () => sub.remove(); + }, [refetch]); + // Update loading state useEffect(() => { aggregatedMessagesActions.setLoading(loading); diff --git a/src/hooks/useLatestWithSubscription.js b/src/hooks/useLatestWithSubscription.js index 16b35af..ac8156a 100644 --- a/src/hooks/useLatestWithSubscription.js +++ b/src/hooks/useLatestWithSubscription.js @@ -140,7 +140,12 @@ export default function useLatestWithSubscription( `[${subscriptionKey}] WS reconnect detected, refetching base query to prevent gaps`, { wsClosedDate }, ); - await refetch(); + // Don't block re-subscription forever if refetch is slow/stuck. + const maxWaitMs = 8000; + await Promise.race([ + Promise.resolve().then(() => refetch()), + new Promise((resolve) => setTimeout(resolve, maxWaitMs)), + ]); } catch (e) { console.warn( `[${subscriptionKey}] Refetch-on-reconnect failed (continuing with resubscribe)`, diff --git a/src/hooks/useStreamQueryWithSubscription.js b/src/hooks/useStreamQueryWithSubscription.js index d3f11d9..4d0aa24 100644 --- a/src/hooks/useStreamQueryWithSubscription.js +++ b/src/hooks/useStreamQueryWithSubscription.js @@ -143,7 +143,12 @@ export default function useStreamQueryWithSubscription( `[${subscriptionKey}] WS reconnect detected, refetching base query to prevent gaps`, { wsClosedDate }, ); - await refetch(); + // Don't block re-subscription forever if refetch is slow/stuck. + const maxWaitMs = 8000; + await Promise.race([ + Promise.resolve().then(() => refetch()), + new Promise((resolve) => setTimeout(resolve, maxWaitMs)), + ]); } catch (e) { console.warn( `[${subscriptionKey}] Refetch-on-reconnect failed (continuing with resubscribe)`, @@ -800,5 +805,6 @@ export default function useStreamQueryWithSubscription( data: queryData, loading, error, + refetch, }; } diff --git a/src/network/WebSocketLink.js b/src/network/WebSocketLink.js index b4927ce..7f88021 100644 --- a/src/network/WebSocketLink.js +++ b/src/network/WebSocketLink.js @@ -3,6 +3,7 @@ import { print } from "graphql"; // import { createClient } from "graphql-ws"; import { createRestartableClient } from "./graphqlWs"; import network from "~/network"; +import { networkActions } from "~/stores"; export default class WebSocketLink extends ApolloLink { constructor(options) { @@ -18,7 +19,16 @@ export default class WebSocketLink extends ApolloLink { return this.client.subscribe( { ...operation, query: print(operation.query) }, { - next: sink.next.bind(sink), + next: (value) => { + // Any subscription payload means the transport is alive. + // Touch WS heartbeat so watchdog/liveness logic doesn't falsely conclude staleness. + try { + networkActions.WSTouch(); + } catch (_e) { + // ignore + } + sink.next(value); + }, complete: sink.complete.bind(sink), error: (err) => { // Don't propagate client restart events as errors diff --git a/src/stores/tree.js b/src/stores/tree.js index 1d84313..94988a2 100644 --- a/src/stores/tree.js +++ b/src/stores/tree.js @@ -15,7 +15,7 @@ export default createAtom(({ merge, getActions }) => { const networkActions = getActions("network"); const alertActions = getActions("alert"); - const navActions = getActions("alert"); + const navActions = getActions("nav"); const fcmActions = getActions("fcm"); const paramsActions = getActions("params"); const notificationsActions = getActions("notifications");