From aade47beb376a8736acbe975a479350dbd3b5e69 Mon Sep 17 00:00:00 2001 From: devthejo Date: Thu, 15 Jan 2026 22:35:00 +0100 Subject: [PATCH] fix: chat subscription hangs up --- .../AggregatedMessagesSubscription.js | 3 + src/hooks/useLatestWithSubscription.js | 67 +++++++++++++++-- src/hooks/useStreamQueryWithSubscription.js | 71 ++++++++++++++++--- 3 files changed, 126 insertions(+), 15 deletions(-) diff --git a/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js b/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js index bff82f5..0ed9cdc 100644 --- a/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js +++ b/src/app/subscriptions/aggregatedMessages/AggregatedMessagesSubscription.js @@ -34,6 +34,9 @@ const AggregatedMessagesSubscription = () => { subscriptionKey: "aggregated-messages", fetchPolicy: "network-only", nextFetchPolicy: "cache-first", + // Chat is latency-sensitive; if the WS transport is up but this subscription + // delivers nothing for a while, force a resubscribe. + livenessStaleMs: 60_000, }, ); diff --git a/src/hooks/useLatestWithSubscription.js b/src/hooks/useLatestWithSubscription.js index 71bb564..b5246b8 100644 --- a/src/hooks/useLatestWithSubscription.js +++ b/src/hooks/useLatestWithSubscription.js @@ -9,6 +9,9 @@ const MAX_RETRIES = 5; const INITIAL_BACKOFF_MS = 1000; // 1 second const MAX_BACKOFF_MS = 30000; // 30 seconds +const DEFAULT_CONTEXT = {}; +const DEFAULT_SHOULD_INCLUDE_ITEM = () => true; + /** * Hook that queries for items with custom sorting (e.g., acknowledged first, then by newest) * while still using ID-based cursor for subscriptions to new items. @@ -27,15 +30,20 @@ export default function useLatestWithSubscription( variables: paramVariables = {}, skip = false, subscriptionKey = "default", - context = {}, - shouldIncludeItem = () => true, + context = DEFAULT_CONTEXT, + shouldIncludeItem = DEFAULT_SHOULD_INCLUDE_ITEM, maxRetries = MAX_RETRIES, + livenessStaleMs = null, + livenessCheckEveryMs = 15_000, ...queryParams } = {}, ) { const variables = useShallowMemo(() => paramVariables, paramVariables); - const { wsClosedDate } = useNetworkState(["wsClosedDate"]); + const { wsClosedDate, wsConnected } = useNetworkState([ + "wsClosedDate", + "wsConnected", + ]); // State to force re-render and retry subscription const [retryTrigger, setRetryTrigger] = useState(0); @@ -49,6 +57,50 @@ export default function useLatestWithSubscription( const timeoutIdRef = useRef(null); const unsubscribeRef = useRef(null); + // Avoid resubscribe loops caused by unstable inline params (object/function identity). + // We deliberately do NOT put these in the subscribe effect dependency array. + const contextRef = useRef(context); + const shouldIncludeItemRef = useRef(shouldIncludeItem); + useEffect(() => { + contextRef.current = context; + }, [context]); + useEffect(() => { + shouldIncludeItemRef.current = shouldIncludeItem; + }, [shouldIncludeItem]); + + // Per-subscription liveness watchdog + const lastSubscriptionDataAtRef = useRef(Date.now()); + const lastLivenessKickAtRef = useRef(0); + + useEffect(() => { + if (!livenessStaleMs) return; + if (skip) return; + + const interval = setInterval(() => { + if (!wsConnected) return; + const age = Date.now() - lastSubscriptionDataAtRef.current; + if (age < livenessStaleMs) return; + + const now = Date.now(); + if (now - lastLivenessKickAtRef.current < livenessStaleMs) return; + lastLivenessKickAtRef.current = now; + + console.warn( + `[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe`, + ); + lastSubscriptionDataAtRef.current = now; + setRetryTrigger((prev) => prev + 1); + }, livenessCheckEveryMs); + + return () => clearInterval(interval); + }, [ + livenessStaleMs, + livenessCheckEveryMs, + skip, + subscriptionKey, + wsConnected, + ]); + useEffect(() => { const currentVarsHash = JSON.stringify(variables); if (currentVarsHash !== variableHashRef.current) { @@ -221,7 +273,7 @@ export default function useLatestWithSubscription( [cursorVar]: highestIdRef.current, }, context: { - ...context, + ...contextRef.current, subscriptionKey, }, onError: (error) => { @@ -252,6 +304,7 @@ export default function useLatestWithSubscription( if (subscriptionData.data) { retryCountRef.current = 0; subscriptionErrorRef.current = null; + lastSubscriptionDataAtRef.current = Date.now(); } if (!subscriptionData.data) return prev; @@ -266,7 +319,7 @@ export default function useLatestWithSubscription( // Filter new items const filteredNewItems = newItems.filter( (item) => - shouldIncludeItem(item, context) && + shouldIncludeItemRef.current(item, contextRef.current) && !existingItems.some( (existing) => existing[uniqKey] === item[uniqKey], ), @@ -364,10 +417,10 @@ export default function useLatestWithSubscription( uniqKey, cursorKey, subscriptionKey, - context, - shouldIncludeItem, retryTrigger, maxRetries, + livenessStaleMs, + livenessCheckEveryMs, ]); return { diff --git a/src/hooks/useStreamQueryWithSubscription.js b/src/hooks/useStreamQueryWithSubscription.js index 1474dfe..326b95c 100644 --- a/src/hooks/useStreamQueryWithSubscription.js +++ b/src/hooks/useStreamQueryWithSubscription.js @@ -9,6 +9,9 @@ const MAX_RETRIES = 5; const INITIAL_BACKOFF_MS = 1000; // 1 second const MAX_BACKOFF_MS = 30000; // 30 seconds +const DEFAULT_CONTEXT = {}; +const DEFAULT_SHOULD_INCLUDE_ITEM = () => true; + export default function useStreamQueryWithSubscription( initialQuery, subscription, @@ -20,15 +23,20 @@ export default function useStreamQueryWithSubscription( initialCursor = -1, skip = false, subscriptionKey = "default", - context = {}, - shouldIncludeItem = () => true, + context = DEFAULT_CONTEXT, + shouldIncludeItem = DEFAULT_SHOULD_INCLUDE_ITEM, maxRetries = MAX_RETRIES, // Allow overriding default max retries + livenessStaleMs = null, + livenessCheckEveryMs = 15_000, ...queryParams } = {}, ) { const variables = useShallowMemo(() => paramVariables, paramVariables); - const { wsClosedDate } = useNetworkState(["wsClosedDate"]); + const { wsClosedDate, wsConnected } = useNetworkState([ + "wsClosedDate", + "wsConnected", + ]); // State to force re-render and retry subscription const [retryTrigger, setRetryTrigger] = useState(0); @@ -42,6 +50,52 @@ export default function useStreamQueryWithSubscription( const timeoutIdRef = useRef(null); const unsubscribeRef = useRef(null); + // Avoid resubscribe loops caused by unstable inline params (object/function identity). + // We deliberately do NOT put these in the subscribe effect dependency array. + const contextRef = useRef(context); + const shouldIncludeItemRef = useRef(shouldIncludeItem); + useEffect(() => { + contextRef.current = context; + }, [context]); + useEffect(() => { + shouldIncludeItemRef.current = shouldIncludeItem; + }, [shouldIncludeItem]); + + // Per-subscription liveness watchdog: if WS is connected but this subscription + // hasn't delivered any payload for some time, trigger a resubscribe. + const lastSubscriptionDataAtRef = useRef(Date.now()); + const lastLivenessKickAtRef = useRef(0); + + useEffect(() => { + if (!livenessStaleMs) return; + if (skip) return; + + const interval = setInterval(() => { + if (!wsConnected) return; + const age = Date.now() - lastSubscriptionDataAtRef.current; + if (age < livenessStaleMs) return; + + const now = Date.now(); + // Avoid spamming resubscribe triggers. + if (now - lastLivenessKickAtRef.current < livenessStaleMs) return; + lastLivenessKickAtRef.current = now; + + console.warn( + `[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe`, + ); + lastSubscriptionDataAtRef.current = now; + setRetryTrigger((prev) => prev + 1); + }, livenessCheckEveryMs); + + return () => clearInterval(interval); + }, [ + livenessStaleMs, + livenessCheckEveryMs, + skip, + subscriptionKey, + wsConnected, + ]); + useEffect(() => { const currentVarsHash = JSON.stringify(variables); if (currentVarsHash !== variableHashRef.current) { @@ -212,7 +266,7 @@ export default function useStreamQueryWithSubscription( [cursorVar]: lastCursorRef.current, }, context: { - ...context, + ...contextRef.current, subscriptionKey, }, onError: (error) => { @@ -243,6 +297,7 @@ export default function useStreamQueryWithSubscription( if (subscriptionData.data) { retryCountRef.current = 0; subscriptionErrorRef.current = null; + lastSubscriptionDataAtRef.current = Date.now(); } if (!subscriptionData.data) return prev; @@ -258,14 +313,14 @@ export default function useStreamQueryWithSubscription( const itemMap = new Map(); existingItems.forEach((item) => { // If the user's filter says "include," we add it - if (shouldIncludeItem(item, context)) { + if (shouldIncludeItemRef.current(item, contextRef.current)) { itemMap.set(item[uniqKey], item); } }); // 2) Merge new items newItems.forEach((item) => { - if (!shouldIncludeItem(item, context)) { + if (!shouldIncludeItemRef.current(item, contextRef.current)) { return; } @@ -371,10 +426,10 @@ export default function useStreamQueryWithSubscription( uniqKey, cursorKey, subscriptionKey, - context, - shouldIncludeItem, retryTrigger, maxRetries, + livenessStaleMs, + livenessCheckEveryMs, ]); return {