fix(ws): stabilization try 2

This commit is contained in:
devthejo 2026-01-17 13:20:29 +01:00
parent f182c9f080
commit 239ca4d86d
No known key found for this signature in database
GPG key ID: 00CCA7A92B1D5351
11 changed files with 790 additions and 28 deletions

View file

@ -37,6 +37,9 @@ const AggregatedMessagesSubscription = () => {
// Chat is latency-sensitive; if the WS transport is up but this subscription // Chat is latency-sensitive; if the WS transport is up but this subscription
// delivers nothing for a while, force a resubscribe. // delivers nothing for a while, force a resubscribe.
livenessStaleMs: 60_000, livenessStaleMs: 60_000,
// If WS reconnects, refetch base query once before resubscribing to reduce cursor gaps.
refetchOnReconnect: true,
}, },
); );

View file

@ -31,7 +31,7 @@ export const AGGREGATED_MESSAGES_SUBSCRIPTION = gql`
subscription aggregatedMessagesSubscription($cursor: Int) { subscription aggregatedMessagesSubscription($cursor: Int) {
selectStreamMessage( selectStreamMessage(
cursor: { initial_value: { id: $cursor }, ordering: ASC } cursor: { initial_value: { id: $cursor }, ordering: ASC }
batch_size: 100 batch_size: 30
) { ) {
...AggMessageFields ...AggMessageFields
} }

View file

@ -24,6 +24,14 @@ const AlertingSubscription = () => {
cursorKey: "updatedSeq", cursorKey: "updatedSeq",
uniqKey: "id", uniqKey: "id",
initialCursor: -1, initialCursor: -1,
subscriptionKey: "alerting",
// Alerting is latency-sensitive; add per-subscription liveness so it can't go stale
// while WS transport heartbeat stays fresh.
livenessStaleMs: 45_000,
livenessCheckEveryMs: 12_000,
// If WS reconnects, refetch base query once before resubscribing to reduce cursor gaps.
refetchOnReconnect: true,
}); });
if (alertingError) { if (alertingError) {

View file

@ -58,7 +58,7 @@ export const ALERTING_SUBSCRIPTION = gql`
subscription alertingSubscription($cursor: bigint!) { subscription alertingSubscription($cursor: bigint!) {
selectStreamAlerting( selectStreamAlerting(
cursor: { initial_value: { updatedSeq: $cursor }, ordering: ASC } cursor: { initial_value: { updatedSeq: $cursor }, ordering: ASC }
batch_size: 100 batch_size: 30
) { ) {
...AlertingFields ...AlertingFields
} }

View file

@ -10,6 +10,7 @@ import {
permissionsActions, permissionsActions,
usePermissionWizardState, usePermissionWizardState,
useNetworkState, useNetworkState,
networkActions,
} from "~/stores"; } from "~/stores";
import { secureStore } from "~/storage/memorySecureStore"; import { secureStore } from "~/storage/memorySecureStore";
import memoryAsyncStorage from "~/storage/memoryAsyncStorage"; import memoryAsyncStorage from "~/storage/memoryAsyncStorage";
@ -234,6 +235,7 @@ const AppLifecycleListener = () => {
activeTimeout.current = setTimeout(() => { activeTimeout.current = setTimeout(() => {
try { try {
lifecycleLogger.info("Restarting WebSocket connection"); lifecycleLogger.info("Restarting WebSocket connection");
networkActions.WSRecoveryTouch();
network.apolloClient.restartWS(); network.apolloClient.restartWS();
} catch (error) { } catch (error) {
lifecycleLogger.error("Failed to restart WebSocket", { error }); lifecycleLogger.error("Failed to restart WebSocket", { error });

View file

@ -1,11 +1,15 @@
import React, { useMemo, useEffect } from "react"; import React, { useEffect, useMemo, useRef } from "react";
import { useIsFocused } from "@react-navigation/native"; import { useIsFocused } from "@react-navigation/native";
import { useQuery } from "@apollo/client";
import * as Sentry from "@sentry/react-native";
import ChatMessages from "~/containers/ChatMessages"; import ChatMessages from "~/containers/ChatMessages";
import { import {
useAggregatedMessagesState, useAggregatedMessagesState,
aggregatedMessagesActions, aggregatedMessagesActions,
} from "~/stores"; } from "~/stores";
import { SELECT_MANY_MESSAGE_QUERY } from "../gql";
const LiveMessagesFetcher = ({ scrollViewRef, alertId }) => { const LiveMessagesFetcher = ({ scrollViewRef, alertId }) => {
// Get messages from aggregated messages store // Get messages from aggregated messages store
const { const {
@ -29,6 +33,65 @@ const LiveMessagesFetcher = ({ scrollViewRef, alertId }) => {
const isFocused = useIsFocused(); const isFocused = useIsFocused();
// Fallback: when focused on a chat, periodically refetch messages for this alert.
// This helps recover faster if the global aggregated subscription is delayed/stale.
const lastSeenCountRef = useRef(0);
const lastChangeAtRef = useRef(Date.now());
const { refetch } = useQuery(SELECT_MANY_MESSAGE_QUERY, {
variables: { alertId },
skip: !isFocused || !alertId,
fetchPolicy: "network-only",
nextFetchPolicy: "cache-first",
notifyOnNetworkStatusChange: false,
});
useEffect(() => {
if (!isFocused) return;
const count = messagesList?.length ?? 0;
if (count !== lastSeenCountRef.current) {
lastSeenCountRef.current = count;
lastChangeAtRef.current = Date.now();
}
}, [isFocused, messagesList]);
useEffect(() => {
if (!isFocused) return;
if (!alertId) return;
const CHECK_EVERY_MS = 15_000;
const STALE_MS = 45_000;
const interval = setInterval(() => {
const age = Date.now() - lastChangeAtRef.current;
if (age < STALE_MS) return;
try {
Sentry.addBreadcrumb({
category: "chat",
level: "warning",
message: "chat fallback refetch (stale)",
data: { alertId, ageMs: age },
});
} catch (_e) {
// ignore
}
refetch?.().catch((e) => {
try {
Sentry.captureException(e, {
tags: { context: "chat-fallback-refetch" },
extra: { alertId, ageMs: age },
});
} catch (_e2) {
// ignore
}
});
}, CHECK_EVERY_MS);
return () => clearInterval(interval);
}, [alertId, isFocused, refetch]);
useEffect(() => { useEffect(() => {
if (!isFocused || !messagesList) { if (!isFocused || !messagesList) {
return; return;

View file

@ -1,7 +1,9 @@
import { useRef, useEffect, useMemo, useState } from "react"; import { useRef, useEffect, useMemo, useState } from "react";
import { useQuery } from "@apollo/client"; import { useQuery } from "@apollo/client";
import * as Sentry from "@sentry/react-native"; import * as Sentry from "@sentry/react-native";
import { useNetworkState } from "~/stores"; import { AppState } from "react-native";
import { useNetworkState, networkActions } from "~/stores";
import network from "~/network";
import useShallowMemo from "./useShallowMemo"; import useShallowMemo from "./useShallowMemo";
// Constants for retry configuration // Constants for retry configuration
@ -35,18 +37,23 @@ export default function useLatestWithSubscription(
maxRetries = MAX_RETRIES, maxRetries = MAX_RETRIES,
livenessStaleMs = null, livenessStaleMs = null,
livenessCheckEveryMs = 15_000, livenessCheckEveryMs = 15_000,
refetchOnReconnect = false,
...queryParams ...queryParams
} = {}, } = {},
) { ) {
const variables = useShallowMemo(() => paramVariables, paramVariables); const variables = useShallowMemo(() => paramVariables, paramVariables);
const { wsClosedDate, wsConnected } = useNetworkState([ const { wsClosedDate, wsConnected, wsLastHeartbeatDate, wsLastRecoveryDate } =
"wsClosedDate", useNetworkState([
"wsConnected", "wsClosedDate",
]); "wsConnected",
"wsLastHeartbeatDate",
"wsLastRecoveryDate",
]);
// State to force re-render and retry subscription // State to force re-render and retry subscription
const [retryTrigger, setRetryTrigger] = useState(0); const [retryTrigger, setRetryTrigger] = useState(0);
const [reconnectSyncTrigger, setReconnectSyncTrigger] = useState(0);
const variableHashRef = useRef(JSON.stringify(variables)); const variableHashRef = useRef(JSON.stringify(variables));
const highestIdRef = useRef(null); const highestIdRef = useRef(null);
@ -71,12 +78,105 @@ export default function useLatestWithSubscription(
// Per-subscription liveness watchdog // Per-subscription liveness watchdog
const lastSubscriptionDataAtRef = useRef(Date.now()); const lastSubscriptionDataAtRef = useRef(Date.now());
const lastLivenessKickAtRef = useRef(0); const lastLivenessKickAtRef = useRef(0);
const consecutiveStaleKicksRef = useRef(0);
const lastWsRestartAtRef = useRef(0);
const lastReloadAtRef = useRef(0);
const wsLastHeartbeatDateRef = useRef(wsLastHeartbeatDate);
const appStateRef = useRef(AppState.currentState);
const wsLastRecoveryDateRef = useRef(wsLastRecoveryDate);
// Optional refetch-on-reconnect support.
// Goal: if WS was reconnected (wsClosedDate changes), force a base refetch once before resubscribing
// to reduce chances of cursor gaps.
const reconnectRefetchPendingRef = useRef(false);
const lastReconnectRefetchKeyRef = useRef(null);
useEffect(() => {
wsLastHeartbeatDateRef.current = wsLastHeartbeatDate;
}, [wsLastHeartbeatDate]);
useEffect(() => {
wsLastRecoveryDateRef.current = wsLastRecoveryDate;
}, [wsLastRecoveryDate]);
useEffect(() => {
const sub = AppState.addEventListener("change", (next) => {
appStateRef.current = next;
if (next === "active") {
// Timers may have been paused/throttled; reset stale timers to avoid false kicks.
lastSubscriptionDataAtRef.current = Date.now();
lastLivenessKickAtRef.current = 0;
consecutiveStaleKicksRef.current = 0;
}
});
return () => sub.remove();
}, []);
useEffect(() => {
if (!refetchOnReconnect) return;
if (skip) return;
if (appStateRef.current !== "active") return;
if (!wsClosedDate) return;
if (!refetch) return;
// Only refetch once per wsClosedDate value.
if (lastReconnectRefetchKeyRef.current === wsClosedDate) return;
lastReconnectRefetchKeyRef.current = wsClosedDate;
reconnectRefetchPendingRef.current = true;
(async () => {
try {
try {
Sentry.addBreadcrumb({
category: "graphql-subscription",
level: "info",
message: "refetch-on-reconnect start",
data: { subscriptionKey, wsClosedDate },
});
} catch (_e) {
// ignore
}
console.log(
`[${subscriptionKey}] WS reconnect detected, refetching base query to prevent gaps`,
{ wsClosedDate },
);
await refetch();
} catch (e) {
console.warn(
`[${subscriptionKey}] Refetch-on-reconnect failed (continuing with resubscribe)`,
e,
);
try {
Sentry.captureException(e, {
tags: {
subscriptionKey,
context: "refetch-on-reconnect",
},
extra: { wsClosedDate },
});
} catch (_e2) {
// ignore
}
} finally {
reconnectRefetchPendingRef.current = false;
setReconnectSyncTrigger((x) => x + 1);
}
})();
}, [refetch, refetchOnReconnect, skip, subscriptionKey, wsClosedDate]);
useEffect(() => { useEffect(() => {
if (!livenessStaleMs) return; if (!livenessStaleMs) return;
if (skip) return; if (skip) return;
const STALE_KICKS_BEFORE_WS_RESTART = 2;
const STALE_KICKS_BEFORE_RELOAD = 4;
const GLOBAL_RECOVERY_COOLDOWN_MS = 30_000;
// Separate throttle for escalations; resubscribe kicks are already throttled by livenessStaleMs.
const MIN_ESCALATION_INTERVAL_MS = 60_000;
const interval = setInterval(() => { const interval = setInterval(() => {
if (appStateRef.current !== "active") return;
if (!wsConnected) return; if (!wsConnected) return;
const age = Date.now() - lastSubscriptionDataAtRef.current; const age = Date.now() - lastSubscriptionDataAtRef.current;
if (age < livenessStaleMs) return; if (age < livenessStaleMs) return;
@ -85,9 +185,131 @@ export default function useLatestWithSubscription(
if (now - lastLivenessKickAtRef.current < livenessStaleMs) return; if (now - lastLivenessKickAtRef.current < livenessStaleMs) return;
lastLivenessKickAtRef.current = now; lastLivenessKickAtRef.current = now;
consecutiveStaleKicksRef.current += 1;
const wsHeartbeatAgeMs = (() => {
const hb = wsLastHeartbeatDateRef.current;
if (!hb) return null;
const last = Date.parse(hb);
return Number.isFinite(last) ? Date.now() - last : null;
})();
console.warn( console.warn(
`[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe`, `[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe (wsHeartbeatAgeMs=${
wsHeartbeatAgeMs ?? "n/a"
}, kicks=${consecutiveStaleKicksRef.current})`,
); );
try {
Sentry.addBreadcrumb({
category: "graphql-subscription",
level: "warning",
message: "liveness stale kick",
data: {
subscriptionKey,
ageMs: age,
livenessStaleMs,
wsHeartbeatAgeMs,
kicks: consecutiveStaleKicksRef.current,
},
});
} catch (_e) {
// ignore
}
// Escalation policy for repeated consecutive stale kicks.
if (
consecutiveStaleKicksRef.current >= STALE_KICKS_BEFORE_RELOAD &&
now - lastReloadAtRef.current >= MIN_ESCALATION_INTERVAL_MS
) {
const lastRecovery = wsLastRecoveryDateRef.current
? Date.parse(wsLastRecoveryDateRef.current)
: NaN;
if (
Number.isFinite(lastRecovery) &&
now - lastRecovery < GLOBAL_RECOVERY_COOLDOWN_MS
) {
return;
}
lastReloadAtRef.current = now;
networkActions.WSRecoveryTouch();
console.warn(
`[${subscriptionKey}] Escalation: triggering reload after ${consecutiveStaleKicksRef.current} stale kicks`,
);
try {
Sentry.captureMessage("subscription escalated to reload", {
level: "warning",
tags: { subscriptionKey, context: "liveness" },
extra: {
consecutiveKicks: consecutiveStaleKicksRef.current,
wsHeartbeatAgeMs,
ageMs: age,
livenessStaleMs,
},
});
} catch (_e) {
// ignore
}
networkActions.triggerReload();
} else if (
consecutiveStaleKicksRef.current >= STALE_KICKS_BEFORE_WS_RESTART &&
now - lastWsRestartAtRef.current >= MIN_ESCALATION_INTERVAL_MS
) {
const lastRecovery = wsLastRecoveryDateRef.current
? Date.parse(wsLastRecoveryDateRef.current)
: NaN;
if (
Number.isFinite(lastRecovery) &&
now - lastRecovery < GLOBAL_RECOVERY_COOLDOWN_MS
) {
return;
}
lastWsRestartAtRef.current = now;
networkActions.WSRecoveryTouch();
try {
console.warn(
`[${subscriptionKey}] Escalation: restarting WS after ${consecutiveStaleKicksRef.current} stale kicks`,
);
try {
Sentry.captureMessage("subscription escalated to ws restart", {
level: "warning",
tags: { subscriptionKey, context: "liveness" },
extra: {
consecutiveKicks: consecutiveStaleKicksRef.current,
wsHeartbeatAgeMs,
ageMs: age,
livenessStaleMs,
},
});
} catch (_e2) {
// ignore
}
network.apolloClient?.restartWS?.();
} catch (error) {
console.warn(
`[${subscriptionKey}] Escalation: WS restart failed`,
error,
);
try {
Sentry.captureException(error, {
tags: {
subscriptionKey,
context: "liveness-ws-restart-failed",
},
});
} catch (_e2) {
// ignore
}
}
}
lastSubscriptionDataAtRef.current = now; lastSubscriptionDataAtRef.current = now;
setRetryTrigger((prev) => prev + 1); setRetryTrigger((prev) => prev + 1);
}, livenessCheckEveryMs); }, livenessCheckEveryMs);
@ -119,6 +341,7 @@ export default function useLatestWithSubscription(
loading, loading,
error, error,
subscribeToMore, subscribeToMore,
refetch,
} = useQuery(initialQuery, { } = useQuery(initialQuery, {
...queryParams, ...queryParams,
variables, variables,
@ -187,6 +410,18 @@ export default function useLatestWithSubscription(
if (!subscribeToMore) return; if (!subscribeToMore) return;
if (highestIdRef.current === null) return; // Wait until we have the highest ID if (highestIdRef.current === null) return; // Wait until we have the highest ID
if (appStateRef.current !== "active") return;
// If we opted into refetch-on-reconnect and a reconnect refetch is still pending,
// wait to (re)subscribe until the base query has been refreshed.
if (
refetchOnReconnect &&
wsClosedDate &&
reconnectRefetchPendingRef.current
) {
return;
}
// Always cleanup any previous active subscription before creating a new one. // Always cleanup any previous active subscription before creating a new one.
// React only runs the cleanup returned directly from the effect. // React only runs the cleanup returned directly from the effect.
if (unsubscribeRef.current) { if (unsubscribeRef.current) {
@ -305,6 +540,7 @@ export default function useLatestWithSubscription(
retryCountRef.current = 0; retryCountRef.current = 0;
subscriptionErrorRef.current = null; subscriptionErrorRef.current = null;
lastSubscriptionDataAtRef.current = Date.now(); lastSubscriptionDataAtRef.current = Date.now();
consecutiveStaleKicksRef.current = 0;
} }
if (!subscriptionData.data) return prev; if (!subscriptionData.data) return prev;
@ -316,6 +552,8 @@ export default function useLatestWithSubscription(
const newItems = subscriptionData.data[subscriptionRootKey] || []; const newItems = subscriptionData.data[subscriptionRootKey] || [];
const existingItems = prev[queryRootKey] || []; const existingItems = prev[queryRootKey] || [];
const mergeStart = Date.now();
// Filter new items // Filter new items
const filteredNewItems = newItems.filter( const filteredNewItems = newItems.filter(
(item) => (item) =>
@ -325,7 +563,15 @@ export default function useLatestWithSubscription(
), ),
); );
if (filteredNewItems.length === 0) return prev; if (filteredNewItems.length === 0) {
const tookMs = Date.now() - mergeStart;
if (tookMs > 100) {
console.warn(
`[${subscriptionKey}] updateQuery merge took ${tookMs}ms (existing=${existingItems.length}, new=${newItems.length}, result=${existingItems.length})`,
);
}
return prev;
}
// Update highestId if we received any new items // Update highestId if we received any new items
filteredNewItems.forEach((item) => { filteredNewItems.forEach((item) => {
@ -341,9 +587,18 @@ export default function useLatestWithSubscription(
); );
// For latest items pattern, we prepend new items (DESC order in UI) // For latest items pattern, we prepend new items (DESC order in UI)
const resultItems = [...filteredNewItems, ...existingItems];
const tookMs = Date.now() - mergeStart;
if (tookMs > 100) {
console.warn(
`[${subscriptionKey}] updateQuery merge took ${tookMs}ms (existing=${existingItems.length}, new=${newItems.length}, result=${resultItems.length})`,
);
}
return { return {
...prev, ...prev,
[queryRootKey]: [...filteredNewItems, ...existingItems], [queryRootKey]: resultItems,
}; };
}, },
}); });
@ -421,6 +676,8 @@ export default function useLatestWithSubscription(
maxRetries, maxRetries,
livenessStaleMs, livenessStaleMs,
livenessCheckEveryMs, livenessCheckEveryMs,
refetchOnReconnect,
reconnectSyncTrigger,
]); ]);
return { return {

View file

@ -1,7 +1,9 @@
import { useRef, useEffect, useMemo, useState } from "react"; import { useRef, useEffect, useMemo, useState } from "react";
import { useQuery } from "@apollo/client"; import { useQuery } from "@apollo/client";
import * as Sentry from "@sentry/react-native"; import * as Sentry from "@sentry/react-native";
import { useNetworkState } from "~/stores"; import { AppState } from "react-native";
import { useNetworkState, networkActions } from "~/stores";
import network from "~/network";
import useShallowMemo from "./useShallowMemo"; import useShallowMemo from "./useShallowMemo";
// Constants for retry configuration // Constants for retry configuration
@ -28,18 +30,23 @@ export default function useStreamQueryWithSubscription(
maxRetries = MAX_RETRIES, // Allow overriding default max retries maxRetries = MAX_RETRIES, // Allow overriding default max retries
livenessStaleMs = null, livenessStaleMs = null,
livenessCheckEveryMs = 15_000, livenessCheckEveryMs = 15_000,
refetchOnReconnect = false,
...queryParams ...queryParams
} = {}, } = {},
) { ) {
const variables = useShallowMemo(() => paramVariables, paramVariables); const variables = useShallowMemo(() => paramVariables, paramVariables);
const { wsClosedDate, wsConnected } = useNetworkState([ const { wsClosedDate, wsConnected, wsLastHeartbeatDate, wsLastRecoveryDate } =
"wsClosedDate", useNetworkState([
"wsConnected", "wsClosedDate",
]); "wsConnected",
"wsLastHeartbeatDate",
"wsLastRecoveryDate",
]);
// State to force re-render and retry subscription // State to force re-render and retry subscription
const [retryTrigger, setRetryTrigger] = useState(0); const [retryTrigger, setRetryTrigger] = useState(0);
const [reconnectSyncTrigger, setReconnectSyncTrigger] = useState(0);
const variableHashRef = useRef(JSON.stringify(variables)); const variableHashRef = useRef(JSON.stringify(variables));
const lastCursorRef = useRef(initialCursor); const lastCursorRef = useRef(initialCursor);
@ -65,12 +72,105 @@ export default function useStreamQueryWithSubscription(
// hasn't delivered any payload for some time, trigger a resubscribe. // hasn't delivered any payload for some time, trigger a resubscribe.
const lastSubscriptionDataAtRef = useRef(Date.now()); const lastSubscriptionDataAtRef = useRef(Date.now());
const lastLivenessKickAtRef = useRef(0); const lastLivenessKickAtRef = useRef(0);
const consecutiveStaleKicksRef = useRef(0);
const lastWsRestartAtRef = useRef(0);
const lastReloadAtRef = useRef(0);
const wsLastHeartbeatDateRef = useRef(wsLastHeartbeatDate);
const appStateRef = useRef(AppState.currentState);
const wsLastRecoveryDateRef = useRef(wsLastRecoveryDate);
// Optional refetch-on-reconnect support.
// Goal: if WS was reconnected (wsClosedDate changes), force a base refetch once before resubscribing
// to reduce chances of cursor gaps.
const reconnectRefetchPendingRef = useRef(false);
const lastReconnectRefetchKeyRef = useRef(null);
useEffect(() => {
wsLastHeartbeatDateRef.current = wsLastHeartbeatDate;
}, [wsLastHeartbeatDate]);
useEffect(() => {
wsLastRecoveryDateRef.current = wsLastRecoveryDate;
}, [wsLastRecoveryDate]);
useEffect(() => {
const sub = AppState.addEventListener("change", (next) => {
appStateRef.current = next;
if (next === "active") {
// Timers may have been paused/throttled; reset stale timers to avoid false kicks.
lastSubscriptionDataAtRef.current = Date.now();
lastLivenessKickAtRef.current = 0;
consecutiveStaleKicksRef.current = 0;
}
});
return () => sub.remove();
}, []);
useEffect(() => {
if (!refetchOnReconnect) return;
if (skip) return;
if (appStateRef.current !== "active") return;
if (!wsClosedDate) return;
if (!refetch) return;
// Only refetch once per wsClosedDate value.
if (lastReconnectRefetchKeyRef.current === wsClosedDate) return;
lastReconnectRefetchKeyRef.current = wsClosedDate;
reconnectRefetchPendingRef.current = true;
(async () => {
try {
try {
Sentry.addBreadcrumb({
category: "graphql-subscription",
level: "info",
message: "refetch-on-reconnect start",
data: { subscriptionKey, wsClosedDate },
});
} catch (_e) {
// ignore
}
console.log(
`[${subscriptionKey}] WS reconnect detected, refetching base query to prevent gaps`,
{ wsClosedDate },
);
await refetch();
} catch (e) {
console.warn(
`[${subscriptionKey}] Refetch-on-reconnect failed (continuing with resubscribe)`,
e,
);
try {
Sentry.captureException(e, {
tags: {
subscriptionKey,
context: "refetch-on-reconnect",
},
extra: { wsClosedDate },
});
} catch (_e2) {
// ignore
}
} finally {
reconnectRefetchPendingRef.current = false;
setReconnectSyncTrigger((x) => x + 1);
}
})();
}, [refetch, refetchOnReconnect, skip, subscriptionKey, wsClosedDate]);
useEffect(() => { useEffect(() => {
if (!livenessStaleMs) return; if (!livenessStaleMs) return;
if (skip) return; if (skip) return;
const STALE_KICKS_BEFORE_WS_RESTART = 2;
const STALE_KICKS_BEFORE_RELOAD = 4;
const GLOBAL_RECOVERY_COOLDOWN_MS = 30_000;
// Separate throttle for escalations; resubscribe kicks are already throttled by livenessStaleMs.
const MIN_ESCALATION_INTERVAL_MS = 60_000;
const interval = setInterval(() => { const interval = setInterval(() => {
if (appStateRef.current !== "active") return;
if (!wsConnected) return; if (!wsConnected) return;
const age = Date.now() - lastSubscriptionDataAtRef.current; const age = Date.now() - lastSubscriptionDataAtRef.current;
if (age < livenessStaleMs) return; if (age < livenessStaleMs) return;
@ -80,9 +180,131 @@ export default function useStreamQueryWithSubscription(
if (now - lastLivenessKickAtRef.current < livenessStaleMs) return; if (now - lastLivenessKickAtRef.current < livenessStaleMs) return;
lastLivenessKickAtRef.current = now; lastLivenessKickAtRef.current = now;
consecutiveStaleKicksRef.current += 1;
const wsHeartbeatAgeMs = (() => {
const hb = wsLastHeartbeatDateRef.current;
if (!hb) return null;
const last = Date.parse(hb);
return Number.isFinite(last) ? Date.now() - last : null;
})();
console.warn( console.warn(
`[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe`, `[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe (wsHeartbeatAgeMs=${
wsHeartbeatAgeMs ?? "n/a"
}, kicks=${consecutiveStaleKicksRef.current})`,
); );
try {
Sentry.addBreadcrumb({
category: "graphql-subscription",
level: "warning",
message: "liveness stale kick",
data: {
subscriptionKey,
ageMs: age,
livenessStaleMs,
wsHeartbeatAgeMs,
kicks: consecutiveStaleKicksRef.current,
},
});
} catch (_e) {
// ignore
}
// Escalation policy for repeated consecutive stale kicks.
if (
consecutiveStaleKicksRef.current >= STALE_KICKS_BEFORE_RELOAD &&
now - lastReloadAtRef.current >= MIN_ESCALATION_INTERVAL_MS
) {
const lastRecovery = wsLastRecoveryDateRef.current
? Date.parse(wsLastRecoveryDateRef.current)
: NaN;
if (
Number.isFinite(lastRecovery) &&
now - lastRecovery < GLOBAL_RECOVERY_COOLDOWN_MS
) {
return;
}
lastReloadAtRef.current = now;
networkActions.WSRecoveryTouch();
console.warn(
`[${subscriptionKey}] Escalation: triggering reload after ${consecutiveStaleKicksRef.current} stale kicks`,
);
try {
Sentry.captureMessage("subscription escalated to reload", {
level: "warning",
tags: { subscriptionKey, context: "liveness" },
extra: {
consecutiveKicks: consecutiveStaleKicksRef.current,
wsHeartbeatAgeMs,
ageMs: age,
livenessStaleMs,
},
});
} catch (_e) {
// ignore
}
networkActions.triggerReload();
} else if (
consecutiveStaleKicksRef.current >= STALE_KICKS_BEFORE_WS_RESTART &&
now - lastWsRestartAtRef.current >= MIN_ESCALATION_INTERVAL_MS
) {
const lastRecovery = wsLastRecoveryDateRef.current
? Date.parse(wsLastRecoveryDateRef.current)
: NaN;
if (
Number.isFinite(lastRecovery) &&
now - lastRecovery < GLOBAL_RECOVERY_COOLDOWN_MS
) {
return;
}
lastWsRestartAtRef.current = now;
networkActions.WSRecoveryTouch();
try {
console.warn(
`[${subscriptionKey}] Escalation: restarting WS after ${consecutiveStaleKicksRef.current} stale kicks`,
);
try {
Sentry.captureMessage("subscription escalated to ws restart", {
level: "warning",
tags: { subscriptionKey, context: "liveness" },
extra: {
consecutiveKicks: consecutiveStaleKicksRef.current,
wsHeartbeatAgeMs,
ageMs: age,
livenessStaleMs,
},
});
} catch (_e2) {
// ignore
}
network.apolloClient?.restartWS?.();
} catch (error) {
console.warn(
`[${subscriptionKey}] Escalation: WS restart failed`,
error,
);
try {
Sentry.captureException(error, {
tags: {
subscriptionKey,
context: "liveness-ws-restart-failed",
},
});
} catch (_e2) {
// ignore
}
}
}
lastSubscriptionDataAtRef.current = now; lastSubscriptionDataAtRef.current = now;
setRetryTrigger((prev) => prev + 1); setRetryTrigger((prev) => prev + 1);
}, livenessCheckEveryMs); }, livenessCheckEveryMs);
@ -123,6 +345,7 @@ export default function useStreamQueryWithSubscription(
loading, loading,
error, error,
subscribeToMore, subscribeToMore,
refetch,
} = useQuery(initialQuery, { } = useQuery(initialQuery, {
...queryParams, ...queryParams,
variables: queryVariables, variables: queryVariables,
@ -179,6 +402,18 @@ export default function useStreamQueryWithSubscription(
if (skip) return; // If skipping, do nothing if (skip) return; // If skipping, do nothing
if (!subscribeToMore) return; if (!subscribeToMore) return;
if (appStateRef.current !== "active") return;
// If we opted into refetch-on-reconnect and a reconnect refetch is still pending,
// wait to (re)subscribe until the base query has been refreshed.
if (
refetchOnReconnect &&
wsClosedDate &&
reconnectRefetchPendingRef.current
) {
return;
}
// If we're about to (re)subscribe, always cleanup any previous subscription first. // If we're about to (re)subscribe, always cleanup any previous subscription first.
// This is critical because React effect cleanups must be returned synchronously // This is critical because React effect cleanups must be returned synchronously
// from the effect, not from inside async callbacks. // from the effect, not from inside async callbacks.
@ -298,6 +533,7 @@ export default function useStreamQueryWithSubscription(
retryCountRef.current = 0; retryCountRef.current = 0;
subscriptionErrorRef.current = null; subscriptionErrorRef.current = null;
lastSubscriptionDataAtRef.current = Date.now(); lastSubscriptionDataAtRef.current = Date.now();
consecutiveStaleKicksRef.current = 0;
} }
if (!subscriptionData.data) return prev; if (!subscriptionData.data) return prev;
@ -309,6 +545,74 @@ export default function useStreamQueryWithSubscription(
const newItems = subscriptionData.data[subscriptionRootKey] || []; const newItems = subscriptionData.data[subscriptionRootKey] || [];
const existingItems = prev[queryRootKey] || []; const existingItems = prev[queryRootKey] || [];
const mergeStart = Date.now();
// Fast path: when uniqKey === cursorKey, cursor ordering is ASC, and incoming items
// are strictly newer than the last existing item, we can append without rebuilding
// a full map + sort (avoids O(N log N) work on large lists).
if (uniqKey === cursorKey && existingItems.length > 0) {
const lastExistingCursor =
existingItems[existingItems.length - 1]?.[cursorKey];
// Filter items first (and update cursor), while verifying monotonicity.
let monotonic = true;
const filteredNewItems = [];
for (const item of newItems) {
if (!shouldIncludeItemRef.current(item, contextRef.current)) {
continue;
}
const itemCursor = item[cursorKey];
if (itemCursor == null) continue;
if (
typeof lastExistingCursor === "number" &&
typeof itemCursor === "number" &&
itemCursor <= lastExistingCursor
) {
monotonic = false;
break;
}
// Update last cursor if item is newer
if (
!lastCursorRef.current ||
itemCursor > lastCursorRef.current
) {
lastCursorRef.current = itemCursor;
}
filteredNewItems.push(item);
}
if (monotonic) {
if (filteredNewItems.length === 0) {
const tookMs = Date.now() - mergeStart;
if (tookMs > 100) {
console.warn(
`[${subscriptionKey}] updateQuery merge took ${tookMs}ms (fast-path, existing=${existingItems.length}, new=${newItems.length}, result=${existingItems.length})`,
);
}
return prev;
}
const resultItems = [...existingItems, ...filteredNewItems];
const tookMs = Date.now() - mergeStart;
if (tookMs > 100) {
console.warn(
`[${subscriptionKey}] updateQuery merge took ${tookMs}ms (fast-path, existing=${existingItems.length}, new=${newItems.length}, result=${resultItems.length})`,
);
}
return {
...prev,
[queryRootKey]: resultItems,
};
}
}
// 1) Build a map from existing items // 1) Build a map from existing items
const itemMap = new Map(); const itemMap = new Map();
existingItems.forEach((item) => { existingItems.forEach((item) => {
@ -351,6 +655,13 @@ export default function useStreamQueryWithSubscription(
return aCursor - bCursor; return aCursor - bCursor;
}); });
const tookMs = Date.now() - mergeStart;
if (tookMs > 100) {
console.warn(
`[${subscriptionKey}] updateQuery merge took ${tookMs}ms (existing=${existingItems.length}, new=${newItems.length}, result=${sortedItems.length})`,
);
}
return { return {
[queryRootKey]: sortedItems, [queryRootKey]: sortedItems,
}; };
@ -430,6 +741,8 @@ export default function useStreamQueryWithSubscription(
maxRetries, maxRetries,
livenessStaleMs, livenessStaleMs,
livenessCheckEveryMs, livenessCheckEveryMs,
refetchOnReconnect,
reconnectSyncTrigger,
]); ]);
return { return {

View file

@ -1,4 +1,6 @@
import { useEffect, useRef } from "react"; import { useEffect, useRef } from "react";
import { AppState } from "react-native";
import * as Sentry from "@sentry/react-native";
import { useNetworkState, networkActions } from "~/stores"; import { useNetworkState, networkActions } from "~/stores";
import network from "~/network"; import network from "~/network";
import { createLogger } from "~/lib/logger"; import { createLogger } from "~/lib/logger";
@ -14,24 +16,52 @@ const CHECK_EVERY_MS = 10_000;
const MIN_RESTART_INTERVAL_MS = 30_000; const MIN_RESTART_INTERVAL_MS = 30_000;
export default function useWsWatchdog({ enabled = true } = {}) { export default function useWsWatchdog({ enabled = true } = {}) {
const { wsConnected, wsLastHeartbeatDate, hasInternetConnection } = const {
useNetworkState([ wsConnected,
"wsConnected", wsLastHeartbeatDate,
"wsLastHeartbeatDate", wsLastRecoveryDate,
"hasInternetConnection", hasInternetConnection,
]); } = useNetworkState([
"wsConnected",
"wsLastHeartbeatDate",
"wsLastRecoveryDate",
"hasInternetConnection",
]);
const lastRestartRef = useRef(0); const lastRestartRef = useRef(0);
const wsLastHeartbeatDateRef = useRef(wsLastHeartbeatDate);
const appStateRef = useRef(AppState.currentState);
const wsLastRecoveryDateRef = useRef(wsLastRecoveryDate);
useEffect(() => {
wsLastHeartbeatDateRef.current = wsLastHeartbeatDate;
}, [wsLastHeartbeatDate]);
useEffect(() => {
wsLastRecoveryDateRef.current = wsLastRecoveryDate;
}, [wsLastRecoveryDate]);
useEffect(() => {
const sub = AppState.addEventListener("change", (next) => {
appStateRef.current = next;
if (next === "active") {
// Avoid false positives right after app foreground (timers may have been throttled).
lastRestartRef.current = Date.now();
}
});
return () => sub.remove();
}, []);
useEffect(() => { useEffect(() => {
if (!enabled) return; if (!enabled) return;
const interval = setInterval(() => { const interval = setInterval(() => {
if (appStateRef.current !== "active") return;
if (!hasInternetConnection) return; if (!hasInternetConnection) return;
if (!wsConnected) return; if (!wsConnected) return;
if (!wsLastHeartbeatDate) return; if (!wsLastHeartbeatDateRef.current) return;
const last = Date.parse(wsLastHeartbeatDate); const last = Date.parse(wsLastHeartbeatDateRef.current);
if (!Number.isFinite(last)) return; if (!Number.isFinite(last)) return;
const age = Date.now() - last; const age = Date.now() - last;
@ -39,23 +69,67 @@ export default function useWsWatchdog({ enabled = true } = {}) {
const now = Date.now(); const now = Date.now();
if (now - lastRestartRef.current < MIN_RESTART_INTERVAL_MS) return; if (now - lastRestartRef.current < MIN_RESTART_INTERVAL_MS) return;
// Global recovery throttle: avoid double restarts from multiple sources.
const lastRecovery = wsLastRecoveryDateRef.current
? Date.parse(wsLastRecoveryDateRef.current)
: NaN;
if (Number.isFinite(lastRecovery)) {
const recoveryAge = now - lastRecovery;
if (recoveryAge < MIN_RESTART_INTERVAL_MS) return;
}
lastRestartRef.current = now; lastRestartRef.current = now;
networkActions.WSRecoveryTouch();
watchdogLogger.warn("WS heartbeat stale, triggering recovery", { watchdogLogger.warn("WS heartbeat stale, triggering recovery", {
ageMs: age, ageMs: age,
lastHeartbeatDate: wsLastHeartbeatDate, lastHeartbeatDate: wsLastHeartbeatDateRef.current,
}); });
try {
Sentry.addBreadcrumb({
category: "websocket",
level: "warning",
message: "ws watchdog heartbeat stale",
data: {
ageMs: age,
lastHeartbeatDate: wsLastHeartbeatDateRef.current,
},
});
} catch (_e) {
// ignore
}
try { try {
// First line recovery: restart websocket transport // First line recovery: restart websocket transport
try {
Sentry.captureMessage("ws watchdog restarting transport", {
level: "warning",
extra: {
ageMs: age,
lastHeartbeatDate: wsLastHeartbeatDateRef.current,
},
});
} catch (_e) {
// ignore
}
network.apolloClient?.restartWS?.(); network.apolloClient?.restartWS?.();
} catch (error) { } catch (error) {
watchdogLogger.error("WS restart failed", { error }); watchdogLogger.error("WS restart failed", { error });
try {
Sentry.captureException(error, {
tags: { context: "ws-watchdog-restart-failed" },
});
} catch (_e) {
// ignore
}
} }
// Second line recovery: if WS stays stale, do a full client reload // Second line recovery: if WS stays stale, do a full client reload
setTimeout(() => { setTimeout(() => {
const last2 = Date.parse(wsLastHeartbeatDate); const last2 = Date.parse(wsLastHeartbeatDateRef.current);
const age2 = Number.isFinite(last2) ? Date.now() - last2 : Infinity; const age2 = Number.isFinite(last2) ? Date.now() - last2 : Infinity;
if (age2 >= HEARTBEAT_STALE_MS) { if (age2 >= HEARTBEAT_STALE_MS) {
watchdogLogger.warn( watchdogLogger.warn(
@ -64,11 +138,21 @@ export default function useWsWatchdog({ enabled = true } = {}) {
ageMs: age2, ageMs: age2,
}, },
); );
try {
Sentry.captureMessage("ws watchdog triggering reload", {
level: "warning",
extra: { ageMs: age2 },
});
} catch (_e) {
// ignore
}
networkActions.triggerReload(); networkActions.triggerReload();
} }
}, 10_000); }, 10_000);
}, CHECK_EVERY_MS); }, CHECK_EVERY_MS);
return () => clearInterval(interval); return () => clearInterval(interval);
}, [enabled, hasInternetConnection, wsConnected, wsLastHeartbeatDate]); }, [enabled, hasInternetConnection, wsConnected]);
} }

View file

@ -19,6 +19,11 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) {
// const MAX_RECONNECT_ATTEMPTS = 5; // Limit reconnection attempts // const MAX_RECONNECT_ATTEMPTS = 5; // Limit reconnection attempts
const MAX_RECONNECT_ATTEMPTS = Infinity; // Limit reconnection attempts const MAX_RECONNECT_ATTEMPTS = Infinity; // Limit reconnection attempts
// Graceful degradation: after prolonged WS reconnecting, surface app-level recovery
// via the existing reload mechanism (NetworkProviders will recreate Apollo).
const MAX_RECONNECT_TIME_MS = 5 * 60 * 1000;
let firstFailureAt = null;
let reconnectAttempts = 0; let reconnectAttempts = 0;
function getReconnectDelay() { function getReconnectDelay() {
// Exponential backoff with max delay // Exponential backoff with max delay
@ -80,6 +85,7 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) {
wsLogger.info("WebSocket connected"); wsLogger.info("WebSocket connected");
activeSocket = socket; activeSocket = socket;
reconnectAttempts = 0; // Reset attempts on successful connection reconnectAttempts = 0; // Reset attempts on successful connection
firstFailureAt = null;
networkActions.WSConnected(); networkActions.WSConnected();
networkActions.WSTouch(); networkActions.WSTouch();
cancelReconnect(); // Cancel any pending reconnects cancelReconnect(); // Cancel any pending reconnects
@ -98,6 +104,10 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) {
}); });
networkActions.WSClosed(); networkActions.WSClosed();
if (!firstFailureAt) {
firstFailureAt = Date.now();
}
// Clear socket and timeouts // Clear socket and timeouts
activeSocket = undefined; activeSocket = undefined;
if (pingTimeout) { if (pingTimeout) {
@ -107,6 +117,20 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) {
// Schedule reconnect unless explicitly closed (1000) or going away (1001) // Schedule reconnect unless explicitly closed (1000) or going away (1001)
if (event.code !== 1000 && event.code !== 1001) { if (event.code !== 1000 && event.code !== 1001) {
const reconnectAge = Date.now() - firstFailureAt;
if (reconnectAge >= MAX_RECONNECT_TIME_MS) {
wsLogger.warn(
"WebSocket reconnecting too long, triggering app reload",
{
reconnectAgeMs: reconnectAge,
reconnectAttempts,
lastCloseCode: event.code,
},
);
networkActions.triggerReload();
return;
}
reconnectAttempts++; reconnectAttempts++;
scheduleReconnect(); scheduleReconnect();
} else { } else {

View file

@ -7,6 +7,7 @@ export default createAtom(({ merge, get }) => {
wsConnectedDate: null, wsConnectedDate: null,
wsClosedDate: null, wsClosedDate: null,
wsLastHeartbeatDate: null, wsLastHeartbeatDate: null,
wsLastRecoveryDate: null,
triggerReload: false, triggerReload: false,
initialized: true, initialized: true,
hasInternetConnection: true, hasInternetConnection: true,
@ -48,6 +49,13 @@ export default createAtom(({ merge, get }) => {
wsLastHeartbeatDate: new Date().toISOString(), wsLastHeartbeatDate: new Date().toISOString(),
}); });
}, },
WSRecoveryTouch: () => {
// Shared throttle marker to avoid multiple parts of the app triggering WS recovery
// at the same time (watchdog + per-subscription liveness + lifecycle).
merge({
wsLastRecoveryDate: new Date().toISOString(),
});
},
setHasInternetConnection: (status) => setHasInternetConnection: (status) =>
merge({ hasInternetConnection: status }), merge({ hasInternetConnection: status }),
}, },