fix(ws): stabilization try 1

This commit is contained in:
devthejo 2026-01-15 19:17:58 +01:00
parent 4d71c229d6
commit 147e514d03
No known key found for this signature in database
GPG key ID: 00CCA7A92B1D5351
8 changed files with 150 additions and 35 deletions

View file

@ -29,6 +29,7 @@ import {
} from "react-native-safe-area-context"; } from "react-native-safe-area-context";
import useTrackLocation from "~/hooks/useTrackLocation"; import useTrackLocation from "~/hooks/useTrackLocation";
import useWsWatchdog from "~/hooks/useWsWatchdog";
// import { initializeBackgroundFetch } from "~/services/backgroundFetch"; // import { initializeBackgroundFetch } from "~/services/backgroundFetch";
import useMount from "~/hooks/useMount"; import useMount from "~/hooks/useMount";
@ -224,6 +225,7 @@ function AppContent() {
useUpdates(); useUpdates();
useNetworkListener(); useNetworkListener();
useTrackLocation(); useTrackLocation();
useWsWatchdog();
// useMount(() => { // useMount(() => {
// const setupBackgroundFetch = async () => { // const setupBackgroundFetch = async () => {

View file

@ -47,6 +47,7 @@ export default function useLatestWithSubscription(
const retryCountRef = useRef(0); const retryCountRef = useRef(0);
const subscriptionErrorRef = useRef(null); const subscriptionErrorRef = useRef(null);
const timeoutIdRef = useRef(null); const timeoutIdRef = useRef(null);
const unsubscribeRef = useRef(null);
useEffect(() => { useEffect(() => {
const currentVarsHash = JSON.stringify(variables); const currentVarsHash = JSON.stringify(variables);
@ -134,6 +135,17 @@ export default function useLatestWithSubscription(
if (!subscribeToMore) return; if (!subscribeToMore) return;
if (highestIdRef.current === null) return; // Wait until we have the highest ID if (highestIdRef.current === null) return; // Wait until we have the highest ID
// Always cleanup any previous active subscription before creating a new one.
// React only runs the cleanup returned directly from the effect.
if (unsubscribeRef.current) {
try {
unsubscribeRef.current();
} catch (_e) {
// ignore
}
unsubscribeRef.current = null;
}
// Check if max retries reached and we have an error // Check if max retries reached and we have an error
if (retryCountRef.current >= maxRetries && subscriptionErrorRef.current) { if (retryCountRef.current >= maxRetries && subscriptionErrorRef.current) {
console.error( console.error(
@ -283,15 +295,7 @@ export default function useLatestWithSubscription(
}, },
}); });
// Cleanup on unmount or re-run unsubscribeRef.current = unsubscribe;
return () => {
console.log(`[${subscriptionKey}] Cleaning up subscription`);
if (timeoutIdRef.current) {
clearTimeout(timeoutIdRef.current);
timeoutIdRef.current = null;
}
unsubscribe();
};
} catch (error) { } catch (error) {
// Handle setup errors (like malformed queries) // Handle setup errors (like malformed queries)
console.error( console.error(
@ -331,22 +335,24 @@ export default function useLatestWithSubscription(
console.error("Failed to report to Sentry:", sentryError); console.error("Failed to report to Sentry:", sentryError);
} }
} }
return () => {
if (timeoutIdRef.current) {
clearTimeout(timeoutIdRef.current);
timeoutIdRef.current = null;
}
};
} }
}, backoffDelay); }, backoffDelay);
// Cleanup function that will run when component unmounts or effect re-runs // Cleanup function that will run when component unmounts or effect re-runs
return () => { return () => {
console.log(`[${subscriptionKey}] Cleaning up subscription`);
if (timeoutIdRef.current) { if (timeoutIdRef.current) {
clearTimeout(timeoutIdRef.current); clearTimeout(timeoutIdRef.current);
timeoutIdRef.current = null; timeoutIdRef.current = null;
} }
if (unsubscribeRef.current) {
try {
unsubscribeRef.current();
} catch (_e) {
// ignore
}
unsubscribeRef.current = null;
}
}; };
}, [ }, [
skip, skip,

View file

@ -40,6 +40,7 @@ export default function useStreamQueryWithSubscription(
const retryCountRef = useRef(0); const retryCountRef = useRef(0);
const subscriptionErrorRef = useRef(null); const subscriptionErrorRef = useRef(null);
const timeoutIdRef = useRef(null); const timeoutIdRef = useRef(null);
const unsubscribeRef = useRef(null);
useEffect(() => { useEffect(() => {
const currentVarsHash = JSON.stringify(variables); const currentVarsHash = JSON.stringify(variables);
@ -124,6 +125,18 @@ export default function useStreamQueryWithSubscription(
if (skip) return; // If skipping, do nothing if (skip) return; // If skipping, do nothing
if (!subscribeToMore) return; if (!subscribeToMore) return;
// If we're about to (re)subscribe, always cleanup any previous subscription first.
// This is critical because React effect cleanups must be returned synchronously
// from the effect, not from inside async callbacks.
if (unsubscribeRef.current) {
try {
unsubscribeRef.current();
} catch (_e) {
// ignore
}
unsubscribeRef.current = null;
}
// Check if max retries reached and we have an error - this check must be done regardless of other conditions // Check if max retries reached and we have an error - this check must be done regardless of other conditions
if (retryCountRef.current >= maxRetries && subscriptionErrorRef.current) { if (retryCountRef.current >= maxRetries && subscriptionErrorRef.current) {
console.error( console.error(
@ -289,15 +302,7 @@ export default function useStreamQueryWithSubscription(
}, },
}); });
// Cleanup on unmount or re-run unsubscribeRef.current = unsubscribe;
return () => {
console.log(`[${subscriptionKey}] Cleaning up subscription`);
if (timeoutIdRef.current) {
clearTimeout(timeoutIdRef.current);
timeoutIdRef.current = null;
}
unsubscribe();
};
} catch (error) { } catch (error) {
// Handle setup errors (like malformed queries) // Handle setup errors (like malformed queries)
console.error( console.error(
@ -337,22 +342,24 @@ export default function useStreamQueryWithSubscription(
console.error("Failed to report to Sentry:", sentryError); console.error("Failed to report to Sentry:", sentryError);
} }
} }
return () => {
if (timeoutIdRef.current) {
clearTimeout(timeoutIdRef.current);
timeoutIdRef.current = null;
}
};
} }
}, backoffDelay); }, backoffDelay);
// Cleanup function that will run when component unmounts or effect re-runs // Cleanup function that will run when component unmounts or effect re-runs
return () => { return () => {
console.log(`[${subscriptionKey}] Cleaning up subscription`);
if (timeoutIdRef.current) { if (timeoutIdRef.current) {
clearTimeout(timeoutIdRef.current); clearTimeout(timeoutIdRef.current);
timeoutIdRef.current = null; timeoutIdRef.current = null;
} }
if (unsubscribeRef.current) {
try {
unsubscribeRef.current();
} catch (_e) {
// ignore
}
unsubscribeRef.current = null;
}
}; };
}, [ }, [
skip, skip,

View file

@ -0,0 +1,74 @@
import { useEffect, useRef } from "react";
import { useNetworkState, networkActions } from "~/stores";
import network from "~/network";
import { createLogger } from "~/lib/logger";
import { NETWORK_SCOPES } from "~/lib/logger/scopes";
const watchdogLogger = createLogger({
module: NETWORK_SCOPES.WEBSOCKET,
feature: "watchdog",
});
const HEARTBEAT_STALE_MS = 45_000;
const CHECK_EVERY_MS = 10_000;
const MIN_RESTART_INTERVAL_MS = 30_000;
export default function useWsWatchdog({ enabled = true } = {}) {
const { wsConnected, wsLastHeartbeatDate, hasInternetConnection } =
useNetworkState([
"wsConnected",
"wsLastHeartbeatDate",
"hasInternetConnection",
]);
const lastRestartRef = useRef(0);
useEffect(() => {
if (!enabled) return;
const interval = setInterval(() => {
if (!hasInternetConnection) return;
if (!wsConnected) return;
if (!wsLastHeartbeatDate) return;
const last = Date.parse(wsLastHeartbeatDate);
if (!Number.isFinite(last)) return;
const age = Date.now() - last;
if (age < HEARTBEAT_STALE_MS) return;
const now = Date.now();
if (now - lastRestartRef.current < MIN_RESTART_INTERVAL_MS) return;
lastRestartRef.current = now;
watchdogLogger.warn("WS heartbeat stale, triggering recovery", {
ageMs: age,
lastHeartbeatDate: wsLastHeartbeatDate,
});
try {
// First line recovery: restart websocket transport
network.apolloClient?.restartWS?.();
} catch (error) {
watchdogLogger.error("WS restart failed", { error });
}
// Second line recovery: if WS stays stale, do a full client reload
setTimeout(() => {
const last2 = Date.parse(wsLastHeartbeatDate);
const age2 = Number.isFinite(last2) ? Date.now() - last2 : Infinity;
if (age2 >= HEARTBEAT_STALE_MS) {
watchdogLogger.warn(
"WS still stale after restart, triggering reload",
{
ageMs: age2,
},
);
networkActions.triggerReload();
}
}, 10_000);
}, CHECK_EVERY_MS);
return () => clearInterval(interval);
}, [enabled, hasInternetConnection, wsConnected, wsLastHeartbeatDate]);
}

View file

@ -81,6 +81,7 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) {
activeSocket = socket; activeSocket = socket;
reconnectAttempts = 0; // Reset attempts on successful connection reconnectAttempts = 0; // Reset attempts on successful connection
networkActions.WSConnected(); networkActions.WSConnected();
networkActions.WSTouch();
cancelReconnect(); // Cancel any pending reconnects cancelReconnect(); // Cancel any pending reconnects
// Clear any lingering ping timeouts // Clear any lingering ping timeouts
@ -114,6 +115,7 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) {
}, },
ping: (received) => { ping: (received) => {
// wsLogger.debug("WebSocket ping", { received }); // wsLogger.debug("WebSocket ping", { received });
networkActions.WSTouch();
if (!received) { if (!received) {
// Clear any existing ping timeout // Clear any existing ping timeout
if (pingTimeout) { if (pingTimeout) {
@ -138,6 +140,7 @@ export default function createWsLink({ store, GRAPHQL_WS_URL }) {
}, },
pong: (received) => { pong: (received) => {
// wsLogger.debug("WebSocket pong", { received }); // wsLogger.debug("WebSocket pong", { received });
networkActions.WSTouch();
if (received) { if (received) {
clearTimeout(pingTimeout); // pong is received, clear connection close timeout clearTimeout(pingTimeout); // pong is received, clear connection close timeout
} }

View file

@ -17,11 +17,14 @@ export default withConnectivity(function Params() {
deviceId, deviceId,
}, },
}); });
if (loading || !data) { if (loading) {
return <Loader />; return <Loader />;
} }
if (error) { if (error) {
return <Error />; return <Error />;
} }
if (!data) {
return <Error />;
}
return <ParamsView data={data} />; return <ParamsView data={data} />;
}); });

View file

@ -4,6 +4,7 @@ import { ScrollView, View } from "react-native";
import Loader from "~/components/Loader"; import Loader from "~/components/Loader";
import { useSubscription } from "@apollo/client"; import { useSubscription } from "@apollo/client";
import Error from "~/components/Error";
import { LOAD_PROFILE_SUBSCRIPTION } from "./gql"; import { LOAD_PROFILE_SUBSCRIPTION } from "./gql";
@ -23,7 +24,7 @@ const profileLogger = createLogger({
export default withConnectivity(function Profile({ navigation, route }) { export default withConnectivity(function Profile({ navigation, route }) {
const { userId } = useSessionState(["userId"]); const { userId } = useSessionState(["userId"]);
// profileLogger.debug("Profile user ID", { userId }); // profileLogger.debug("Profile user ID", { userId });
const { data, loading, restart } = useSubscription( const { data, loading, error, restart } = useSubscription(
LOAD_PROFILE_SUBSCRIPTION, LOAD_PROFILE_SUBSCRIPTION,
{ {
variables: { variables: {
@ -44,10 +45,21 @@ export default withConnectivity(function Profile({ navigation, route }) {
}); });
}, [navigation]); }, [navigation]);
if (loading || !data?.selectOneUser) { if (loading) {
return <Loader />; return <Loader />;
} }
if (error) {
profileLogger.error("Profile subscription error", { error });
return <Error />;
}
if (!data?.selectOneUser) {
// No error surfaced, but no payload either. Avoid infinite loader.
profileLogger.error("Profile subscription returned no user", { userId });
return <Error />;
}
return ( return (
<ScrollView <ScrollView
style={{ style={{

View file

@ -6,6 +6,7 @@ export default createAtom(({ merge, get }) => {
wsConnected: false, wsConnected: false,
wsConnectedDate: null, wsConnectedDate: null,
wsClosedDate: null, wsClosedDate: null,
wsLastHeartbeatDate: null,
triggerReload: false, triggerReload: false,
initialized: true, initialized: true,
hasInternetConnection: true, hasInternetConnection: true,
@ -27,6 +28,7 @@ export default createAtom(({ merge, get }) => {
merge({ merge({
wsConnected: true, wsConnected: true,
wsConnectedDate: new Date().toISOString(), wsConnectedDate: new Date().toISOString(),
wsLastHeartbeatDate: new Date().toISOString(),
}); });
}, },
WSClosed: () => { WSClosed: () => {
@ -40,6 +42,12 @@ export default createAtom(({ merge, get }) => {
wsClosedDate: new Date().toISOString(), wsClosedDate: new Date().toISOString(),
}); });
}, },
WSTouch: () => {
// Update whenever we get any WS-level signal: connected, ping/pong, or a message.
merge({
wsLastHeartbeatDate: new Date().toISOString(),
});
},
setHasInternetConnection: (status) => setHasInternetConnection: (status) =>
merge({ hasInternetConnection: status }), merge({ hasInternetConnection: status }),
}, },