fix: chat subscription hangs up

This commit is contained in:
devthejo 2026-01-15 22:35:00 +01:00
parent 6cf49086c0
commit aade47beb3
No known key found for this signature in database
GPG key ID: 00CCA7A92B1D5351
3 changed files with 126 additions and 15 deletions

View file

@ -34,6 +34,9 @@ const AggregatedMessagesSubscription = () => {
subscriptionKey: "aggregated-messages",
fetchPolicy: "network-only",
nextFetchPolicy: "cache-first",
// Chat is latency-sensitive; if the WS transport is up but this subscription
// delivers nothing for a while, force a resubscribe.
livenessStaleMs: 60_000,
},
);

View file

@ -9,6 +9,9 @@ const MAX_RETRIES = 5;
const INITIAL_BACKOFF_MS = 1000; // 1 second
const MAX_BACKOFF_MS = 30000; // 30 seconds
const DEFAULT_CONTEXT = {};
const DEFAULT_SHOULD_INCLUDE_ITEM = () => true;
/**
* Hook that queries for items with custom sorting (e.g., acknowledged first, then by newest)
* while still using ID-based cursor for subscriptions to new items.
@ -27,15 +30,20 @@ export default function useLatestWithSubscription(
variables: paramVariables = {},
skip = false,
subscriptionKey = "default",
context = {},
shouldIncludeItem = () => true,
context = DEFAULT_CONTEXT,
shouldIncludeItem = DEFAULT_SHOULD_INCLUDE_ITEM,
maxRetries = MAX_RETRIES,
livenessStaleMs = null,
livenessCheckEveryMs = 15_000,
...queryParams
} = {},
) {
const variables = useShallowMemo(() => paramVariables, paramVariables);
const { wsClosedDate } = useNetworkState(["wsClosedDate"]);
const { wsClosedDate, wsConnected } = useNetworkState([
"wsClosedDate",
"wsConnected",
]);
// State to force re-render and retry subscription
const [retryTrigger, setRetryTrigger] = useState(0);
@ -49,6 +57,50 @@ export default function useLatestWithSubscription(
const timeoutIdRef = useRef(null);
const unsubscribeRef = useRef(null);
// Avoid resubscribe loops caused by unstable inline params (object/function identity).
// We deliberately do NOT put these in the subscribe effect dependency array.
const contextRef = useRef(context);
const shouldIncludeItemRef = useRef(shouldIncludeItem);
useEffect(() => {
contextRef.current = context;
}, [context]);
useEffect(() => {
shouldIncludeItemRef.current = shouldIncludeItem;
}, [shouldIncludeItem]);
// Per-subscription liveness watchdog
const lastSubscriptionDataAtRef = useRef(Date.now());
const lastLivenessKickAtRef = useRef(0);
useEffect(() => {
if (!livenessStaleMs) return;
if (skip) return;
const interval = setInterval(() => {
if (!wsConnected) return;
const age = Date.now() - lastSubscriptionDataAtRef.current;
if (age < livenessStaleMs) return;
const now = Date.now();
if (now - lastLivenessKickAtRef.current < livenessStaleMs) return;
lastLivenessKickAtRef.current = now;
console.warn(
`[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe`,
);
lastSubscriptionDataAtRef.current = now;
setRetryTrigger((prev) => prev + 1);
}, livenessCheckEveryMs);
return () => clearInterval(interval);
}, [
livenessStaleMs,
livenessCheckEveryMs,
skip,
subscriptionKey,
wsConnected,
]);
useEffect(() => {
const currentVarsHash = JSON.stringify(variables);
if (currentVarsHash !== variableHashRef.current) {
@ -221,7 +273,7 @@ export default function useLatestWithSubscription(
[cursorVar]: highestIdRef.current,
},
context: {
...context,
...contextRef.current,
subscriptionKey,
},
onError: (error) => {
@ -252,6 +304,7 @@ export default function useLatestWithSubscription(
if (subscriptionData.data) {
retryCountRef.current = 0;
subscriptionErrorRef.current = null;
lastSubscriptionDataAtRef.current = Date.now();
}
if (!subscriptionData.data) return prev;
@ -266,7 +319,7 @@ export default function useLatestWithSubscription(
// Filter new items
const filteredNewItems = newItems.filter(
(item) =>
shouldIncludeItem(item, context) &&
shouldIncludeItemRef.current(item, contextRef.current) &&
!existingItems.some(
(existing) => existing[uniqKey] === item[uniqKey],
),
@ -364,10 +417,10 @@ export default function useLatestWithSubscription(
uniqKey,
cursorKey,
subscriptionKey,
context,
shouldIncludeItem,
retryTrigger,
maxRetries,
livenessStaleMs,
livenessCheckEveryMs,
]);
return {

View file

@ -9,6 +9,9 @@ const MAX_RETRIES = 5;
const INITIAL_BACKOFF_MS = 1000; // 1 second
const MAX_BACKOFF_MS = 30000; // 30 seconds
const DEFAULT_CONTEXT = {};
const DEFAULT_SHOULD_INCLUDE_ITEM = () => true;
export default function useStreamQueryWithSubscription(
initialQuery,
subscription,
@ -20,15 +23,20 @@ export default function useStreamQueryWithSubscription(
initialCursor = -1,
skip = false,
subscriptionKey = "default",
context = {},
shouldIncludeItem = () => true,
context = DEFAULT_CONTEXT,
shouldIncludeItem = DEFAULT_SHOULD_INCLUDE_ITEM,
maxRetries = MAX_RETRIES, // Allow overriding default max retries
livenessStaleMs = null,
livenessCheckEveryMs = 15_000,
...queryParams
} = {},
) {
const variables = useShallowMemo(() => paramVariables, paramVariables);
const { wsClosedDate } = useNetworkState(["wsClosedDate"]);
const { wsClosedDate, wsConnected } = useNetworkState([
"wsClosedDate",
"wsConnected",
]);
// State to force re-render and retry subscription
const [retryTrigger, setRetryTrigger] = useState(0);
@ -42,6 +50,52 @@ export default function useStreamQueryWithSubscription(
const timeoutIdRef = useRef(null);
const unsubscribeRef = useRef(null);
// Avoid resubscribe loops caused by unstable inline params (object/function identity).
// We deliberately do NOT put these in the subscribe effect dependency array.
const contextRef = useRef(context);
const shouldIncludeItemRef = useRef(shouldIncludeItem);
useEffect(() => {
contextRef.current = context;
}, [context]);
useEffect(() => {
shouldIncludeItemRef.current = shouldIncludeItem;
}, [shouldIncludeItem]);
// Per-subscription liveness watchdog: if WS is connected but this subscription
// hasn't delivered any payload for some time, trigger a resubscribe.
const lastSubscriptionDataAtRef = useRef(Date.now());
const lastLivenessKickAtRef = useRef(0);
useEffect(() => {
if (!livenessStaleMs) return;
if (skip) return;
const interval = setInterval(() => {
if (!wsConnected) return;
const age = Date.now() - lastSubscriptionDataAtRef.current;
if (age < livenessStaleMs) return;
const now = Date.now();
// Avoid spamming resubscribe triggers.
if (now - lastLivenessKickAtRef.current < livenessStaleMs) return;
lastLivenessKickAtRef.current = now;
console.warn(
`[${subscriptionKey}] Liveness stale (${age}ms >= ${livenessStaleMs}ms), forcing resubscribe`,
);
lastSubscriptionDataAtRef.current = now;
setRetryTrigger((prev) => prev + 1);
}, livenessCheckEveryMs);
return () => clearInterval(interval);
}, [
livenessStaleMs,
livenessCheckEveryMs,
skip,
subscriptionKey,
wsConnected,
]);
useEffect(() => {
const currentVarsHash = JSON.stringify(variables);
if (currentVarsHash !== variableHashRef.current) {
@ -212,7 +266,7 @@ export default function useStreamQueryWithSubscription(
[cursorVar]: lastCursorRef.current,
},
context: {
...context,
...contextRef.current,
subscriptionKey,
},
onError: (error) => {
@ -243,6 +297,7 @@ export default function useStreamQueryWithSubscription(
if (subscriptionData.data) {
retryCountRef.current = 0;
subscriptionErrorRef.current = null;
lastSubscriptionDataAtRef.current = Date.now();
}
if (!subscriptionData.data) return prev;
@ -258,14 +313,14 @@ export default function useStreamQueryWithSubscription(
const itemMap = new Map();
existingItems.forEach((item) => {
// If the user's filter says "include," we add it
if (shouldIncludeItem(item, context)) {
if (shouldIncludeItemRef.current(item, contextRef.current)) {
itemMap.set(item[uniqKey], item);
}
});
// 2) Merge new items
newItems.forEach((item) => {
if (!shouldIncludeItem(item, context)) {
if (!shouldIncludeItemRef.current(item, contextRef.current)) {
return;
}
@ -371,10 +426,10 @@ export default function useStreamQueryWithSubscription(
uniqKey,
cursorKey,
subscriptionKey,
context,
shouldIncludeItem,
retryTrigger,
maxRetries,
livenessStaleMs,
livenessCheckEveryMs,
]);
return {