Skip to content

Commit cbcaa5c

Browse files
authored
fix: offload delete member and try to handle member identities conflicts better (CM-1054) (#3974)
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent d953ce0 commit cbcaa5c

10 files changed

Lines changed: 310 additions & 44 deletions

File tree

backend/src/database/migrations/U1774609007__data-sink-worker-optimizations.sql

Whitespace-only changes.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- Drop 4 unused activityRelations indexes: one through its constraint, three directly
2+
-- (already dropped on prod 2026-03-27, see ACTIVITYRELATIONS_INDEX_CLEANUP.md).
-- The IF EXISTS guards keep every drop idempotent.
3+
alter table "activityRelations" drop constraint if exists "activityRelations_activityId_memberId_key";
4+
5+
drop index concurrently if exists "ix_activityRelations_memberId_segmentId_include";
6+
drop index concurrently if exists "ix_activityRelations_organizationId_segmentId_include";
7+
drop index concurrently if exists "ix_activityRelations_platform_username";
8+
9+
-- Add an index on organizationSegmentsAgg ("organizationId", "segmentId") that also carries
-- "memberCount" as an included column; IF NOT EXISTS keeps the creation idempotent.
create index concurrently if not exists idx_osa_org_segment_membercount
10+
on "organizationSegmentsAgg" ("organizationId", "segmentId")
11+
include ("memberCount");

services/apps/data_sink_worker/src/service/activity.service.ts

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ import {
2828
} from '@crowd/data-access-layer'
2929
import { IDbActivityRelation } from '@crowd/data-access-layer/src/activityRelations/types'
3030
import { DbStore, arePrimitivesDbEqual } from '@crowd/data-access-layer/src/database'
31-
import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge'
3231
import {
3332
IActivityRelationCreateOrUpdateData,
3433
IDbActivity,
@@ -56,7 +55,7 @@ import {
5655
} from '@crowd/types'
5756

5857
import { IActivityUpdateData, ISentimentActivityInput } from './activity.data'
59-
import MemberService from './member.service'
58+
import MemberService, { mergeIfAllowed } from './member.service'
6059
import { IProcessActivityResult } from './types'
6160

6261
/* eslint-disable @typescript-eslint/no-explicit-any */
@@ -293,6 +292,28 @@ export default class ActivityService extends LoggerBase {
293292
}
294293
}
295294

295+
// When activity.username is set but differs from the member's platform identity value,
296+
// override it so the member lookup and the identity insert use the same key.
297+
// Example: git activities set activity.username to the author display name (e.g. "John Doe")
298+
// while the identity stores the email (e.g. "john.doe@example.com"). Without this correction
299+
// the lookup misses the existing member, creating an unnecessary orphan member.
300+
if (username && member) {
301+
const platformIdentity = member.identities.find(
302+
(i) =>
303+
i.platform === platform &&
304+
i.type === MemberIdentityType.USERNAME &&
305+
i.value &&
306+
i.verified,
307+
)
308+
if (platformIdentity && platformIdentity.value !== username) {
309+
this.log.debug(
310+
{ platform, originalUsername: username, correctedUsername: platformIdentity.value },
311+
'Overriding activity.username with member platform identity value',
312+
)
313+
activity.username = platformIdentity.value
314+
}
315+
}
316+
296317
member.identities = member.identities.filter((i) => i.value)
297318

298319
if (!username) {
@@ -1721,32 +1742,24 @@ export default class ActivityService extends LoggerBase {
17211742
const originalId = metadata.memberWithIdentity as string
17221743
const targetId = metadata.memberIdToUpdate as string
17231744

1724-
// but first check memberNoMerge table
1725-
const noMergeMemberIds = await getMemberNoMerge(this.pgQx, [originalId, targetId])
1726-
1727-
const noMerge = singleOrDefault(
1728-
noMergeMemberIds,
1729-
(m) =>
1730-
(m.memberId === originalId && m.noMergeId === targetId) ||
1731-
(m.memberId === targetId && m.noMergeId === originalId),
1732-
)
1733-
1734-
if (noMerge) {
1735-
metadata.noMerge = true
1736-
} else {
1737-
try {
1738-
await this.pgQx.tx(async (txPgQx) => {
1739-
const service = new CommonMemberService(txPgQx, this.temporal, this.log)
1740-
await service.merge(originalId, targetId)
1741-
})
1742-
1745+
try {
1746+
const merged = await mergeIfAllowed(
1747+
this.pgQx,
1748+
this.temporal,
1749+
this.log,
1750+
originalId,
1751+
targetId,
1752+
)
1753+
if (merged) {
17431754
return originalId
1744-
} catch (err) {
1745-
metadata.mergeError = {
1746-
errorMessage: err?.message ?? '<no error message>',
1747-
errorStack: err?.stack,
1748-
err,
1749-
}
1755+
} else {
1756+
metadata.noMerge = true
1757+
}
1758+
} catch (err) {
1759+
metadata.mergeError = {
1760+
errorMessage: err?.message ?? '<no error message>',
1761+
errorStack: err?.stack,
1762+
err,
17501763
}
17511764
}
17521765
}

services/apps/data_sink_worker/src/service/member.service.ts

Lines changed: 221 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,16 @@ import {
1212
isObjectEmpty,
1313
singleOrDefault,
1414
} from '@crowd/common'
15-
import { BotDetectionService } from '@crowd/common_services'
15+
import { BotDetectionService, CommonMemberService } from '@crowd/common_services'
1616
import { QueryExecutor, createMember, dbStoreQx, updateMember } from '@crowd/data-access-layer'
17-
import { findIdentitiesForMembers, findMembersByVerifiedUsernames } from '@crowd/data-access-layer'
17+
import {
18+
findIdentitiesForMembers,
19+
findMemberIdByVerifiedIdentity,
20+
findMembersByVerifiedUsernames,
21+
moveToNewMember,
22+
} from '@crowd/data-access-layer'
1823
import { DbStore } from '@crowd/data-access-layer/src/database'
24+
import { getMemberNoMerge } from '@crowd/data-access-layer/src/member_merge'
1925
import IntegrationRepository from '@crowd/data-access-layer/src/old/apps/data_sink_worker/repo/integration.repo'
2026
import {
2127
IDbMember,
@@ -60,6 +66,36 @@ function orgCacheKey(org: IOrganization): string | null {
6066
return null
6167
}
6268

69+
/**
 * Merges `secondaryId` into `primaryId` unless the pair is listed in the memberNoMerge table.
 *
 * The no-merge lookup runs on `pgQx`; the merge itself runs inside a transaction opened with
 * `pgQx.tx` and is delegated to `CommonMemberService.merge`.
 *
 * @param pgQx - Query executor used for the no-merge lookup and to open the merge transaction.
 * @param temporal - Temporal client handed to `CommonMemberService`.
 * @param log - Logger used for the skip warning and handed to `CommonMemberService`.
 * @param primaryId - Member that `secondaryId` is merged into.
 * @param secondaryId - Member that gets merged into `primaryId`.
 * @returns `true` if the merge was performed, `false` if a no-merge record prevents it.
 * @throws Whatever the no-merge lookup or the merge transaction throws.
 */
export async function mergeIfAllowed(
  pgQx: QueryExecutor,
  temporal: TemporalClient,
  log: Logger,
  primaryId: string,
  secondaryId: string,
): Promise<boolean> {
  const noMergeRows = await getMemberNoMerge(pgQx, [primaryId, secondaryId])

  // Any matching row means "do not merge", no matter how many there are. The pair may be
  // recorded in both directions (primary -> secondary and secondary -> primary), so an
  // existence check is used instead of a "single match" helper.
  // NOTE(review): assumes singleOrDefault rejects multiple matches — confirm in @crowd/common.
  const isNoMerge = noMergeRows.some(
    (m) =>
      (m.memberId === primaryId && m.noMergeId === secondaryId) ||
      (m.memberId === secondaryId && m.noMergeId === primaryId),
  )

  if (isNoMerge) {
    log.warn({ primaryId, secondaryId }, 'Members are marked as no-merge — skipping merge')
    return false
  }

  // Run the merge in its own transaction so a failure leaves both members untouched.
  await pgQx.tx(async (txPgQx) => {
    const service = new CommonMemberService(txPgQx, temporal, log)
    await service.merge(primaryId, secondaryId)
  })

  return true
}
98+
6399
export default class MemberService extends LoggerBase {
64100
private readonly memberRepo: MemberRepository
65101
private readonly pgQx: QueryExecutor
@@ -91,8 +127,18 @@ export default class MemberService extends LoggerBase {
91127
try {
92128
this.log.debug('Creating a new member!')
93129

94-
// prevent empty identity handles
95-
data.identities = data.identities.filter((i) => i.value)
130+
// filter empty handles and deduplicate by (platform, value, type, verified)
131+
data.identities = data.identities.filter(
132+
(identity, idx) =>
133+
!!identity.value &&
134+
data.identities.findIndex(
135+
(j) =>
136+
j.platform === identity.platform &&
137+
j.value === identity.value &&
138+
j.type === identity.type &&
139+
j.verified === identity.verified,
140+
) === idx,
141+
)
96142

97143
if (data.identities.length === 0) {
98144
throw new Error('Member must have at least one identity!')
@@ -154,20 +200,174 @@ export default class MemberService extends LoggerBase {
154200
'memberService -> create -> createMember',
155201
)
156202

203+
let insertedCount: number
157204
try {
158-
await logExecutionTimeV2(
205+
insertedCount = await logExecutionTimeV2(
159206
() => this.memberRepo.insertIdentities(id, integrationId, data.identities),
160207
this.log,
161208
'memberService -> create -> insertIdentities',
162209
)
163210
} catch (err) {
164-
this.log.error(err, { memberId: id }, 'Error while inserting identities!')
211+
this.log.error(err, { memberId: id }, 'Error inserting member identities!')
165212
await logExecutionTimeV2(
166-
async () => this.memberRepo.destroyMemberAfterError(id, false),
213+
async () => this.memberRepo.destroyMemberAfterError(id, true),
167214
this.log,
168215
'memberService -> create -> destroyMemberAfterError',
169216
)
170-
throw new ApplicationError('Error while inserting identities for a new member!', err)
217+
throw err
218+
}
219+
220+
if (insertedCount < data.identities.length) {
221+
// At least one verified identity conflicted. Walk every verified identity to:
222+
// (a) find the existing member(s) that own the conflicting ones, and
223+
// (b) collect identities that were successfully inserted for the orphan.
224+
let existingMemberId: string | null = null
225+
const orphanVerifiedIdentities: IMemberIdentity[] = []
226+
227+
for (const identity of data.identities.filter((i) => i.verified)) {
228+
const owner = await findMemberIdByVerifiedIdentity(
229+
this.pgQx,
230+
identity.platform,
231+
identity.value,
232+
identity.type,
233+
)
234+
235+
if (!owner) {
236+
// The identity disappeared between INSERT and SELECT — unusual race condition.
237+
// Cannot safely resolve; schedule orphan deletion and throw.
238+
this.log.error(
239+
{ orphanMemberId: id, identity },
240+
'Verified identity not found after conflict detection — scheduling orphan deletion',
241+
)
242+
await this.scheduleOrphanMemberDeletion(id)
243+
throw new ApplicationError(
244+
`Identity conflict during member creation: owner not found for identity (${identity.platform}, ${identity.value}, ${identity.type})`,
245+
)
246+
} else if (owner === id) {
247+
// Successfully inserted for the orphan — will be moved to the existing member below
248+
orphanVerifiedIdentities.push(identity)
249+
} else if (!existingMemberId) {
250+
// First conflicting owner found
251+
existingMemberId = owner
252+
} else if (existingMemberId !== owner) {
253+
// A second conflicting owner — two existing members each own a different verified
254+
// identity of this incoming member, so the data source asserts they are the same
255+
// person. Auto-merge the second into the first.
256+
this.log.warn(
257+
{
258+
orphanMemberId: id,
259+
primaryMemberId: existingMemberId,
260+
secondaryMemberId: owner,
261+
identity,
262+
},
263+
'Multiple conflicting verified identities belong to different existing members — merging automatically',
264+
)
265+
let merged: boolean
266+
try {
267+
merged = await mergeIfAllowed(
268+
this.pgQx,
269+
this.temporal,
270+
this.log,
271+
existingMemberId,
272+
owner,
273+
)
274+
} catch (mergeErr) {
275+
this.log.error(
276+
mergeErr,
277+
{
278+
orphanMemberId: id,
279+
primaryMemberId: existingMemberId,
280+
secondaryMemberId: owner,
281+
},
282+
'Auto-merge of conflicting members failed — scheduling orphan deletion',
283+
)
284+
await this.scheduleOrphanMemberDeletion(id)
285+
throw new ApplicationError(
286+
`Identity conflict during member creation: auto-merge of members ${existingMemberId} and ${owner} failed for identity (${identity.platform}, ${identity.value}, ${identity.type})`,
287+
)
288+
}
289+
if (!merged) {
290+
this.log.error(
291+
{
292+
orphanMemberId: id,
293+
primaryMemberId: existingMemberId,
294+
secondaryMemberId: owner,
295+
},
296+
'Auto-merge prevented by noMerge record — scheduling orphan deletion',
297+
)
298+
await this.scheduleOrphanMemberDeletion(id)
299+
throw new ApplicationError(
300+
`Identity conflict during member creation: members ${existingMemberId} and ${owner} are marked as no-merge but share identity (${identity.platform}, ${identity.value}, ${identity.type})`,
301+
)
302+
}
303+
// existingMemberId (primary) survives; owner (secondary) was absorbed
304+
this.log.info(
305+
{
306+
orphanMemberId: id,
307+
survivingMemberId: existingMemberId,
308+
mergedMemberId: owner,
309+
identity,
310+
},
311+
'Auto-merge of conflicting members succeeded',
312+
)
313+
}
314+
// else: owner === existingMemberId — same member owns this identity too, nothing to do
315+
}
316+
317+
if (existingMemberId) {
318+
// Move any verified identities that were inserted for the orphan to the existing
319+
// member so they are not lost when the orphan is cascade-deleted.
320+
// UPDATE memberId rather than INSERT to avoid unique constraint violations.
321+
for (const identity of orphanVerifiedIdentities) {
322+
try {
323+
await moveToNewMember(this.pgQx, {
324+
oldMemberId: id,
325+
newMemberId: existingMemberId,
326+
platform: identity.platform,
327+
value: identity.value,
328+
type: identity.type,
329+
})
330+
} catch (moveErr) {
331+
this.log.error(
332+
moveErr,
333+
{ orphanMemberId: id, existingMemberId, identity },
334+
'Failed to move orphan verified identity to existing member — scheduling orphan deletion',
335+
)
336+
await this.scheduleOrphanMemberDeletion(id)
337+
throw new ApplicationError(
338+
`Failed to move identity (${identity.platform}, ${identity.value}, ${identity.type}) from orphan ${id} to existing member ${existingMemberId}`,
339+
)
340+
}
341+
}
342+
this.log.warn(
343+
{
344+
orphanMemberId: id,
345+
existingMemberId,
346+
transferredIdentities: orphanVerifiedIdentities.length,
347+
},
348+
'Identity conflict during member creation — reusing existing member, scheduling orphan deletion',
349+
)
350+
await logExecutionTimeV2(
351+
() => this.memberRepo.addToSegments(existingMemberId, segmentIds),
352+
this.log,
353+
'memberService -> create -> addToSegments (conflict path)',
354+
)
355+
if (releaseMemberLock) {
356+
await releaseMemberLock()
357+
}
358+
await this.scheduleOrphanMemberDeletion(id)
359+
return existingMemberId
360+
}
361+
362+
// insertedCount < data.identities.length but no conflicting owner found — unexpected
363+
this.log.error(
364+
{ memberId: id },
365+
'Identity conflict during member creation but existing member not found — scheduling orphan deletion',
366+
)
367+
await this.scheduleOrphanMemberDeletion(id)
368+
throw new ApplicationError(
369+
`Identity conflict during member creation for member ${id}: inserted ${insertedCount} of ${data.identities.length} identities but found no conflicting owner`,
370+
)
171371
}
172372

173373
try {
@@ -396,7 +596,7 @@ export default class MemberService extends LoggerBase {
396596
this.log.trace({ memberId: id }, 'Inserting new identities!')
397597
try {
398598
await logExecutionTimeV2(
399-
() => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate),
599+
() => this.memberRepo.insertIdentities(id, integrationId, identitiesToCreate, true),
400600
this.log,
401601
'memberService -> update -> insertIdentities',
402602
)
@@ -822,6 +1022,18 @@ export default class MemberService extends LoggerBase {
8221022
return out
8231023
}
8241024

1025+
/**
 * Best-effort start of the `deleteOrphanMember` workflow for the given member.
 *
 * The workflow is started on the `entity-merging` task queue with a workflow id derived
 * from the member id. Any error raised while starting it is logged and swallowed, so this
 * method never rejects.
 *
 * @param memberId - Id of the orphan member to delete; also passed as the workflow argument.
 */
private async scheduleOrphanMemberDeletion(memberId: string): Promise<void> {
  try {
    const startOptions = {
      taskQueue: 'entity-merging',
      workflowId: `${TemporalWorkflowId.DELETE_ORPHAN_MEMBER}/${memberId}`,
      args: [memberId],
    }
    await this.temporal.workflow.start('deleteOrphanMember', startOptions)
  } catch (err) {
    // Deliberately not rethrown: callers treat orphan cleanup as fire-and-forget.
    this.log.error(err, { memberId }, 'Failed to schedule orphan member deletion!')
  }
}
1036+
8251037
private async startMemberBotAnalysisWithLLMWorkflow(memberId: string): Promise<void> {
8261038
await this.temporal.workflow.start('processMemberBotAnalysisWithLLM', {
8271039
taskQueue: 'profiles',

services/apps/entity_merging_worker/src/workflows.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export {
2+
deleteOrphanMember,
23
finishMemberMerging,
34
finishOrganizationMerging,
45
finishMemberUnmerging,

0 commit comments

Comments
 (0)