Skip to content

Commit 71f5731

Browse files
committed
refactor(sdk): extract shared MAM catch-up utilities into mamCatchUpUtils
Consolidate duplicated findNewestMessage, connection error checks, and hardcoded constants from chatSideEffects, roomSideEffects, MAM, and backgroundSync into a single shared module.
1 parent 707f8c0 commit 71f5731

10 files changed

Lines changed: 264 additions & 87 deletions

File tree

apps/fluux/src-tauri/tauri.conf.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@
6868
],
6969
"macOS": {
7070
"minimumSystemVersion": "10.13",
71-
"bundleVersion": "12987ef",
71+
"bundleVersion": "707f8c0",
7272
"entitlements": "Entitlements.plist",
7373
"signingIdentity": null
7474
},

packages/fluux-sdk/src/core/backgroundSync.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { connectionStore, chatStore, roomStore } from '../stores'
2020
import { NS_MAM } from './namespaces'
2121
import { logInfo } from './logger'
2222
import { buildScopedStorageKey } from '../utils/storageScope'
23+
import { MAM_ROOM_CATCHUP_DELAY_MS } from '../utils/mamCatchUpUtils'
2324

2425
/**
2526
* Sets up background sync side effects that run after a fresh session.
@@ -173,7 +174,7 @@ export function setupBackgroundSyncSideEffects(
173174
// Silently ignore member discovery errors
174175
}
175176
})()
176-
}, 10_000)
177+
}, MAM_ROOM_CATCHUP_DELAY_MS)
177178
}
178179

179180
// Fresh session: 'online' fires only on fresh sessions (not SM resumption).

packages/fluux-sdk/src/core/chatSideEffects.ts

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,12 @@ import { chatStore, connectionStore } from '../stores'
1616
import { NS_MAM } from './namespaces'
1717
import { logInfo } from './logger'
1818
import { getDomain } from './jid'
19-
20-
/**
21-
* Find the newest message in the array (regardless of delay status).
22-
*
23-
* Used as the catch-up cursor for MAM forward queries. Previously this
24-
* skipped delayed messages, but that caused a subtle bug: when ALL cached
25-
* messages are delayed (common after previous MAM catch-ups), the cursor
26-
* returned undefined, triggering a backward query. The backward query's
27-
* merge (`prependOlderMessages`) incorrectly placed newer messages (sent
28-
* from another client while offline) at the top of the list, making them
29-
* invisible to the user and preventing the sidebar lastMessage update.
30-
*/
31-
function findNewestMessage(messages: Array<{ timestamp?: Date }>): { timestamp: Date } | undefined {
32-
for (let i = messages.length - 1; i >= 0; i--) {
33-
if (messages[i].timestamp) return messages[i] as { timestamp: Date }
34-
}
35-
return undefined
36-
}
19+
import {
20+
findNewestMessage,
21+
buildCatchUpStartTime,
22+
isConnectionError,
23+
MAM_CACHE_LOAD_LIMIT,
24+
} from '../utils/mamCatchUpUtils'
3725

3826
/**
3927
* Options for configuring side effects behavior.
@@ -105,28 +93,24 @@ export function setupChatSideEffects(
10593
// handler races with the conversation subscriber's cache load, and
10694
// messages may be empty — causing a backward query instead of a
10795
// forward catch-up from the newest cached message.
108-
await chatStore.getState().loadMessagesFromCache(conversationId, { limit: 100 })
96+
await chatStore.getState().loadMessagesFromCache(conversationId, { limit: MAM_CACHE_LOAD_LIMIT })
10997

11098
// Re-read messages after cache load (store was mutated)
11199
const cachedMessages = chatStore.getState().messages.get(conversationId) || []
112100
const newestMessage = findNewestMessage(cachedMessages)
113101

114102
const queryOptions: { with: string; start?: string } = { with: conversation.id }
115103
if (newestMessage?.timestamp) {
116-
const startTime = new Date(newestMessage.timestamp.getTime() + 1)
117-
queryOptions.start = startTime.toISOString()
104+
queryOptions.start = buildCatchUpStartTime(newestMessage.timestamp)
118105
}
119106

120107
await client.chat.queryMAM(queryOptions)
121108
logInfo('Chat: MAM sync complete')
122109
} catch (error) {
123-
// Only log if it's not a disconnection error (those are expected during reconnect)
124-
const isConnectionError = error instanceof Error &&
125-
(error.message.includes('disconnected') ||
126-
error.message.includes('Not connected') ||
127-
error.message.includes('Socket not available'))
110+
// Allow retry on next conversation switch or reconnect
111+
fetchInitiated.delete(conversationId)
128112

129-
if (isConnectionError) {
113+
if (isConnectionError(error)) {
130114
if (debug) console.log('[SideEffects] Chat: MAM skipped - client disconnected')
131115
} else {
132116
console.error('[SideEffects] Chat: MAM fetch failed:', error)
@@ -163,7 +147,7 @@ export function setupChatSideEffects(
163147
// Step 1: Always load from IndexedDB cache (deduplication is handled by loadMessagesFromCache).
164148
// This is a fallback for cases where the hook's cache load didn't run (e.g., reconnection).
165149
if (debug) console.log('[SideEffects] Chat: Loading from cache')
166-
await chatStore.getState().loadMessagesFromCache(activeConversationId, { limit: 100 })
150+
await chatStore.getState().loadMessagesFromCache(activeConversationId, { limit: MAM_CACHE_LOAD_LIMIT })
167151

168152
// Step 2: Background MAM fetch (skip if already initiated this session)
169153
if (fetchInitiated.has(activeConversationId)) {

packages/fluux-sdk/src/core/defaultStoreBindings.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ export function createDefaultStoreBindings(options: DefaultStoreBindingsOptions
125125
markAllNeedsCatchUp: chatStore.getState().markAllNeedsCatchUp,
126126
clearNeedsCatchUp: chatStore.getState().clearNeedsCatchUp,
127127
updateLastMessagePreview: chatStore.getState().updateLastMessagePreview,
128+
loadMessagesFromCache: chatStore.getState().loadMessagesFromCache,
128129
getAllConversations: () => {
129130
const state = chatStore.getState()
130131
// Use activeConversations() which efficiently returns only non-archived

packages/fluux-sdk/src/core/modules/MAM.ts

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ import { getBareJid, getResource } from '../jid'
3333
import { generateUUID, generateStableMessageId } from '../../utils/uuid'
3434
import { executeWithConcurrency } from '../../utils/concurrencyUtils'
3535
import { parseRSMResponse } from '../../utils/rsm'
36+
import {
37+
findNewestMessage,
38+
buildCatchUpStartTime,
39+
isConnectionError,
40+
MAM_CATCHUP_FORWARD_MAX,
41+
MAM_CATCHUP_BACKWARD_MAX,
42+
MAM_CACHE_LOAD_LIMIT,
43+
} from '../../utils/mamCatchUpUtils'
3644
import {
3745
NS_MAM,
3846
NS_RSM,
@@ -86,21 +94,6 @@ interface UnresolvedModifications {
8694
reactions: { targetId: string; from: string; emojis: string[] }[]
8795
}
8896

89-
/**
90-
* Find the newest message in an array (regardless of delay status).
91-
*
92-
* Used as the catch-up cursor for MAM forward queries. Including delayed
93-
* messages ensures the catch-up always uses a forward query, which merges
94-
* correctly via full sort. Previously skipping delayed messages caused
95-
* backward queries whose prepend-based merge put newer messages (sent
96-
* from another client while offline) at the wrong position.
97-
*/
98-
function findNewestMessage(messages: Array<{ timestamp?: Date }>): { timestamp: Date } | undefined {
99-
for (let i = messages.length - 1; i >= 0; i--) {
100-
if (messages[i].timestamp) return messages[i] as { timestamp: Date }
101-
}
102-
return undefined
103-
}
10497

10598
/**
10699
* Message Archive Management (XEP-0313) module.
@@ -272,8 +265,7 @@ export class MAM extends BaseModule {
272265
return { messages: allMessages, complete: isComplete, rsm: lastRsm }
273266
} catch (error) {
274267
const msg = error instanceof Error ? error.message : 'Unknown error'
275-
const isConnectionError = msg.includes('Not connected') || msg.includes('Socket not available')
276-
if (isConnectionError) {
268+
if (isConnectionError(error)) {
277269
logInfo(`MAM skipped: ...@${getDomain(conversationId) || '*'}${msg}`)
278270
} else {
279271
logErr(`MAM error: ...@${getDomain(conversationId) || '*'}${msg}`)
@@ -407,8 +399,7 @@ export class MAM extends BaseModule {
407399
return { messages: allMessages, complete: isComplete, rsm: lastRsm }
408400
} catch (error) {
409401
const msg = error instanceof Error ? error.message : 'Unknown error'
410-
const isConnectionError = msg.includes('Not connected') || msg.includes('Socket not available')
411-
if (isConnectionError) {
402+
if (isConnectionError(error)) {
412403
logInfo(`Room MAM skipped: ${roomJid}${msg}`)
413404
} else {
414405
logErr(`Room MAM error: ${roomJid}${msg}`)
@@ -822,23 +813,30 @@ export class MAM extends BaseModule {
822813
// Skip if disconnected (avoid queuing doomed queries)
823814
if (this.deps.stores?.connection.getStatus() !== 'online') return
824815

825-
const messages = conv.messages || []
816+
// Load IndexedDB cache first so we know the newest cached message
817+
// and can do a proper forward catch-up instead of fetching only latest.
818+
// Without this, conv.messages is empty after app restart (runtime-only),
819+
// causing a backward "before:''" query that creates gaps with old cache.
820+
await this.deps.stores?.chat.loadMessagesFromCache?.(conv.id, { limit: MAM_CACHE_LOAD_LIMIT })
821+
822+
// Re-read messages after cache load (store was mutated)
823+
const updatedConv = this.deps.stores?.chat.getAllConversations()?.find(c => c.id === conv.id)
824+
const messages = updatedConv?.messages || conv.messages || []
826825
const newestMessage = findNewestMessage(messages)
827826

828827
if (newestMessage?.timestamp) {
829828
// Forward query from the last known message
830-
const startTime = new Date(newestMessage.timestamp.getTime() + 1)
831829
await this.queryArchive({
832830
with: conv.id,
833-
start: startTime.toISOString(),
834-
max: 100,
831+
start: buildCatchUpStartTime(newestMessage.timestamp),
832+
max: MAM_CATCHUP_FORWARD_MAX,
835833
})
836834
} else {
837835
// No messages (empty) — fetch latest from MAM
838836
await this.queryArchive({
839837
with: conv.id,
840838
before: '',
841-
max: 50,
839+
max: MAM_CATCHUP_BACKWARD_MAX,
842840
})
843841
}
844842
} catch (_error) {
@@ -940,10 +938,10 @@ export class MAM extends BaseModule {
940938
if (this.deps.stores?.connection.getStatus() !== 'online') return
941939

942940
// Load IndexedDB cache first so we know the newest cached message
943-
// and can do a proper forward catch-up instead of fetching only latest 50.
941+
// and can do a proper forward catch-up instead of fetching only latest.
944942
// Without this, room.messages is empty after app restart (runtime-only),
945943
// causing a backward "before:''" query that creates gaps with old cache.
946-
await this.deps.stores?.room.loadMessagesFromCache(room.jid, { limit: 100 })
944+
await this.deps.stores?.room.loadMessagesFromCache(room.jid, { limit: MAM_CACHE_LOAD_LIMIT })
947945

948946
// Re-read room after cache load (store was mutated)
949947
const updatedRoom = this.deps.stores?.room.getRoom(room.jid)
@@ -952,18 +950,17 @@ export class MAM extends BaseModule {
952950

953951
if (newestMessage?.timestamp) {
954952
// Forward query from the last known message
955-
const startTime = new Date(newestMessage.timestamp.getTime() + 1)
956953
await this.queryRoomArchive({
957954
roomJid: room.jid,
958-
start: startTime.toISOString(),
959-
max: 100,
955+
start: buildCatchUpStartTime(newestMessage.timestamp),
956+
max: MAM_CATCHUP_FORWARD_MAX,
960957
})
961958
} else {
962959
// No messages (empty) — fetch latest from MAM
963960
await this.queryRoomArchive({
964961
roomJid: room.jid,
965962
before: '',
966-
max: 50,
963+
max: MAM_CATCHUP_BACKWARD_MAX,
967964
})
968965
}
969966
} catch (_error) {

packages/fluux-sdk/src/core/roomSideEffects.ts

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,14 @@ import type { XMPPClient } from './XMPPClient'
1818
import type { SideEffectsOptions } from './chatSideEffects'
1919
import { roomStore, connectionStore } from '../stores'
2020
import { logInfo } from './logger'
21-
22-
/**
23-
* Find the newest message in the array (regardless of delay status).
24-
*
25-
* Used as the catch-up cursor for MAM forward queries. Including delayed
26-
* messages ensures the catch-up always uses a forward query, which merges
27-
* correctly via full sort.
28-
*/
29-
function findNewestMessage(messages: Array<{ timestamp?: Date }>): { timestamp: Date } | undefined {
30-
for (let i = messages.length - 1; i >= 0; i--) {
31-
if (messages[i].timestamp) return messages[i] as { timestamp: Date }
32-
}
33-
return undefined
34-
}
21+
import {
22+
findNewestMessage,
23+
buildCatchUpStartTime,
24+
isConnectionError,
25+
MAM_CATCHUP_FORWARD_MAX,
26+
MAM_CATCHUP_BACKWARD_MAX,
27+
MAM_CACHE_LOAD_LIMIT,
28+
} from '../utils/mamCatchUpUtils'
3529

3630
/**
3731
* Sets up room-related side effects.
@@ -111,7 +105,7 @@ export function setupRoomSideEffects(
111105
// handler races with the conversation subscriber's cache load, and
112106
// room.messages may be empty — causing a backward "before:''" query
113107
// instead of a forward catch-up from the newest cached message.
114-
await roomStore.getState().loadMessagesFromCache(roomJid, { limit: 100 })
108+
await roomStore.getState().loadMessagesFromCache(roomJid, { limit: MAM_CACHE_LOAD_LIMIT })
115109

116110
// Re-read room after cache load (store was mutated)
117111
const roomAfterCache = roomStore.getState().rooms.get(roomJid)
@@ -120,32 +114,25 @@ export function setupRoomSideEffects(
120114

121115
if (newestMessage?.timestamp) {
122116
// Query for messages AFTER the newest known message (catchup)
123-
const startTime = new Date(newestMessage.timestamp.getTime() + 1)
124117
await client.chat.queryRoomMAM({
125118
roomJid,
126-
start: startTime.toISOString(),
127-
max: 100,
119+
start: buildCatchUpStartTime(newestMessage.timestamp),
120+
max: MAM_CATCHUP_FORWARD_MAX,
128121
})
129122
} else {
130123
// No cached messages - fetch latest
131124
await client.chat.queryRoomMAM({
132125
roomJid,
133126
before: '', // Empty = get latest
134-
max: 50,
127+
max: MAM_CATCHUP_BACKWARD_MAX,
135128
})
136129
}
137130
logInfo('Room: MAM catch-up complete')
138131
} catch (error) {
139132
// Allow backup handlers (room:joined, supportsMAM watcher) to retry
140133
fetchInitiated.delete(roomJid)
141134

142-
// Only log if it's not a disconnection error (those are expected during reconnect)
143-
const isConnectionError = error instanceof Error &&
144-
(error.message.includes('disconnected') ||
145-
error.message.includes('Not connected') ||
146-
error.message.includes('Socket not available'))
147-
148-
if (isConnectionError) {
135+
if (isConnectionError(error)) {
149136
if (debug) console.log('[SideEffects] Room: MAM skipped - client disconnected')
150137
} else {
151138
console.error('[SideEffects] Room: MAM catchup failed:', error)
@@ -187,7 +174,7 @@ export function setupRoomSideEffects(
187174
// Step 1: Always load from IndexedDB cache (deduplication is handled by loadMessagesFromCache).
188175
// This is a fallback for cases where the hook's cache load didn't run (e.g., reconnection).
189176
if (debug) console.log('[SideEffects] Room: Loading from cache')
190-
await roomStore.getState().loadMessagesFromCache(activeRoomJid, { limit: 100 })
177+
await roomStore.getState().loadMessagesFromCache(activeRoomJid, { limit: MAM_CACHE_LOAD_LIMIT })
191178

192179
// Step 2: Background MAM fetch for catchup (skip if already initiated this session)
193180
if (fetchInitiated.has(activeRoomJid)) {

packages/fluux-sdk/src/core/test-utils.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ export const createMockStores = (): MockStoreBindings => ({
642642
markAllNeedsCatchUp: vi.fn(),
643643
clearNeedsCatchUp: vi.fn(),
644644
updateLastMessagePreview: vi.fn(),
645+
loadMessagesFromCache: vi.fn().mockResolvedValue([]),
645646
getAllConversations: vi.fn().mockReturnValue([]),
646647
getArchivedConversations: vi.fn().mockReturnValue([]),
647648
archiveConversation: vi.fn(),

packages/fluux-sdk/src/core/types/client.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ export interface StoreBindings {
9696
clearNeedsCatchUp: (conversationId: string) => void
9797
// Update sidebar preview without affecting message history
9898
updateLastMessagePreview: (conversationId: string, lastMessage: Message) => void
99+
// Load messages from IndexedDB cache into the conversation's in-memory message array
100+
loadMessagesFromCache?: (conversationId: string, options?: { limit?: number }) => Promise<unknown>
99101
// Get all conversations for MAM catch-up
100102
getAllConversations: () => Array<{ id: string; messages: Message[] }>
101103
// Smart MAM: archived conversation preview refresh

0 commit comments

Comments
 (0)