Skip to content

Commit 215b7f9

Browse files
authored
chore: add script to fix work experience with epoch dates (CM-914) (#3813)
1 parent 8a7b3a3 commit 215b7f9

8 files changed

Lines changed: 161 additions & 8 deletions

File tree

services/apps/members_enrichment_worker/src/activities/enrichment.ts

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -442,8 +442,8 @@ export async function updateMemberUsingSquashedPayload(
442442

443443
if (newMemberOrgId) {
444444
newOrUpdatedMemberOrgs.push({
445+
id: newMemberOrgId,
445446
organizationId: org.organizationId,
446-
memberOrganizationId: newMemberOrgId,
447447
})
448448
}
449449
}
@@ -461,7 +461,7 @@ export async function updateMemberUsingSquashedPayload(
461461

462462
if (updatedMemberOrgId) {
463463
newOrUpdatedMemberOrgs.push({
464-
memberOrganizationId: updatedMemberOrgId,
464+
id: updatedMemberOrgId,
465465
organizationId: memberOrg.orgId,
466466
})
467467
}
@@ -478,7 +478,7 @@ export async function updateMemberUsingSquashedPayload(
478478
await changeMemberOrganizationAffiliationOverrides(qx, [
479479
{
480480
memberId,
481-
memberOrganizationId: mo.memberOrganizationId,
481+
memberOrganizationId: mo.id,
482482
allowAffiliation: false,
483483
},
484484
])

services/apps/script_executor_worker/src/activities.ts

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,10 @@ import {
5151
isLfxMember,
5252
updateOrganizationIdentity,
5353
} from './activities/fix-organization-identities-with-wrong-urls'
54+
import {
55+
findMemberWorkExperienceWithEpochDates,
56+
updateMemberWorkExperience,
57+
} from './activities/fix-work-experience-epoch-dates'
5458
import {
5559
findMembersWithSamePlatformIdentitiesDifferentCapitalization,
5660
findMembersWithSameVerifiedEmailsInDifferentPlatforms,
@@ -100,4 +104,6 @@ export {
100104
deleteOrphanMembersSegmentsAgg,
101105
getOrphanOrganizationSegmentsAgg,
102106
deleteOrphanOrganizationSegmentsAgg,
107+
findMemberWorkExperienceWithEpochDates,
108+
updateMemberWorkExperience,
103109
}
services/apps/script_executor_worker/src/activities/fix-work-experience-epoch-dates.ts

Lines changed: 24 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,24 @@
1+
import { fetchMemberWorkExperienceWithEpochDates, pgpQx } from '@crowd/data-access-layer'
2+
import { updateMemberOrg } from '@crowd/data-access-layer/src/old/apps/members_enrichment_worker'
3+
import { IMemberOrganization, IMemberOrganizationData } from '@crowd/types'
4+
5+
import { svc } from '../main'
6+
7+
/**
 * Activity: loads up to `batchSize` member work-experience rows by delegating to
 * `fetchMemberWorkExperienceWithEpochDates`, using the service's Postgres reader connection.
 *
 * @param batchSize - Maximum number of rows to request.
 * @returns The rows produced by the data-access-layer query.
 */
export async function findMemberWorkExperienceWithEpochDates(
  batchSize: number,
): Promise<IMemberOrganization[]> {
  const readerConnection = svc.postgres.reader.connection()
  const rows = await fetchMemberWorkExperienceWithEpochDates(pgpQx(readerConnection), batchSize)
  return rows
}
13+
14+
/**
 * Activity: applies `toUpdate` to one member work-experience row inside a writer transaction.
 *
 * The actual write is delegated to `updateMemberOrg`. According to the original author's note,
 * that helper already handles duplicate detection and soft-deletes the source row if another
 * row with the target (memberId, orgId, dateStart, dateEnd) already exists.
 *
 * @param memberId - Member owning the work experience.
 * @param original - Current state of the row being changed.
 * @param toUpdate - Fields to overwrite on the row.
 */
export async function updateMemberWorkExperience(
  memberId: string,
  original: IMemberOrganizationData,
  toUpdate: Partial<IMemberOrganization>,
): Promise<void> {
  const writer = svc.postgres.writer

  await writer.transactionally(async (tx) => {
    const txConnection = tx.transaction()
    await updateMemberOrg(txConnection, memberId, original, toUpdate)
  })
}

services/apps/script_executor_worker/src/workflows.ts

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@ import { dissectMember } from './workflows/dissectMember'
88
import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization'
99
import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms'
1010
import { fixBotMembersAffiliation } from './workflows/fix-bot-members-affiliation'
11+
import { fixWorkExperienceEpochDates } from './workflows/fix-work-experience-epoch-dates'
1112
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'
1213
import { processLLMVerifiedMerges } from './workflows/processLLMVerifiedMerges'
1314
import { recalculateMemberAffiliations } from './workflows/recalculate-member-affiliations'
@@ -26,4 +27,5 @@ export {
2627
fixBotMembersAffiliation,
2728
blockProjectOrganizationAffiliations,
2829
recalculateMemberAffiliations,
30+
fixWorkExperienceEpochDates,
2931
}
services/apps/script_executor_worker/src/workflows/fix-work-experience-epoch-dates.ts

Lines changed: 98 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,98 @@
1+
import {
2+
ChildWorkflowCancellationType,
3+
ParentClosePolicy,
4+
continueAsNew,
5+
proxyActivities,
6+
startChild,
7+
workflowInfo,
8+
} from '@temporalio/workflow'
9+
10+
import * as activities from '../activities'
11+
import { IScriptBatchTestArgs } from '../types'
12+
import { chunkArray } from '../utils/common'
13+
14+
import { recalculateMemberAffiliations } from './recalculate-member-affiliations'
15+
16+
// Activity stubs used by the workflow below; every activity is proxied with a
// start-to-close timeout of 30 minutes.
const epochDateFixActivities = proxyActivities<typeof activities>({
  startToCloseTimeout: '30 minutes',
})
const findMemberWorkExperienceWithEpochDates =
  epochDateFixActivities.findMemberWorkExperienceWithEpochDates
const updateMemberWorkExperience = epochDateFixActivities.updateMemberWorkExperience
const markMemberForAffiliationRecalc = epochDateFixActivities.markMemberForAffiliationRecalc
23+
24+
/**
 * Returns true when `v` represents the Unix epoch (a timestamp of exactly 0 ms).
 * Missing values (null, undefined, empty string) are never treated as epoch dates.
 */
function isEpochDate(v?: string | Date | null): boolean {
  if (!v) return false
  const d = v instanceof Date ? v : new Date(v)
  return d.getTime() === 0
}

/**
 * Workflow: replaces epoch (1970-01-01) start/end dates on member work experiences with null.
 *
 * Each run processes one batch:
 *  1. fetch up to `args.batchSize` (default 1000) work experiences with epoch dates;
 *  2. if there are none, start `recalculateMemberAffiliations` as a child workflow and finish;
 *  3. otherwise update the rows in parallel chunks of 10, mark the affected members for
 *     affiliation recalculation, and continue-as-new to process the next batch.
 *
 * When `args.testRun` is set, each update is logged and the workflow stops after the first
 * batch. Note that the updates for that first batch are still applied.
 *
 * @param args - Batch size and test-run flag for the script.
 */
export async function fixWorkExperienceEpochDates(args: IScriptBatchTestArgs): Promise<void> {
  const info = workflowInfo()
  const WORK_EXPERIENCES_PER_RUN = args.batchSize ?? 1000

  const workExperiences = await findMemberWorkExperienceWithEpochDates(WORK_EXPERIENCES_PER_RUN)

  // Treat a missing result the same as an empty one. The previous check
  // (`workExperiences?.length === 0`) let a nullish value fall through to
  // chunkArray()/map() below, where it would throw.
  if (!workExperiences?.length) {
    console.log('No more work experiences to fix, triggering recalculation of member affiliations!')

    // The child is started with ABANDON policies so that it is not tied to this
    // workflow's cancellation or completion.
    await startChild(recalculateMemberAffiliations, {
      workflowId: `recalculateMemberAffiliations/${info.workflowId}`,
      cancellationType: ChildWorkflowCancellationType.ABANDON,
      parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON,
      retry: {
        backoffCoefficient: 2,
        initialInterval: 2 * 1000,
        maximumInterval: 30 * 1000,
      },
      args: [
        {
          batchSize: 500,
        },
      ],
    })

    return
  }

  // Update at most 10 work experiences concurrently.
  for (const chunk of chunkArray(workExperiences, 10)) {
    await Promise.all(
      chunk.map((we) => {
        // prepare original object
        const original = {
          id: we.id,
          orgId: we.organizationId,
          jobTitle: we.title,
          dateStart: we.dateStart as string,
          dateEnd: we.dateEnd as string,
          source: we.source,
        }

        // prepare toUpdate object: only the fields that currently hold an epoch date are nulled
        const toUpdate = {
          ...(isEpochDate(original.dateStart) && { dateStart: null }),
          ...(isEpochDate(original.dateEnd) && { dateEnd: null }),
        }

        if (args.testRun) {
          console.log(`Updating work experience for member ${we.memberId}`)
          console.log(`Original: ${JSON.stringify(original)}`)
        }

        return updateMemberWorkExperience(we.memberId, original, toUpdate)
      }),
    )
  }

  // deduplicate memberIds and queue for affiliation recalculation
  const uniqueMemberIds = Array.from(new Set(workExperiences.map((we) => we.memberId)))

  await markMemberForAffiliationRecalc(uniqueMemberIds)

  if (args.testRun) {
    console.log('Test run completed - stopping after first batch!')
    return
  }

  // Start a fresh run with the same arguments to process the next batch.
  await continueAsNew<typeof fixWorkExperienceEpochDates>(args)
}

services/libs/data-access-layer/src/members/organizations.ts

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -973,3 +973,25 @@ export async function mergeRoles(
973973
}
974974
}
975975
}
976+
977+
/**
 * Selects non-deleted "memberOrganizations" rows whose "dateStart" or "dateEnd" equals
 * 1970-01-01 00:00:00+00 (the Unix epoch), ordered by id ascending.
 *
 * @param qx - Query executor used to run the select.
 * @param batchSize - Maximum number of rows to return (bound to the LIMIT clause).
 * @returns Matching rows with id, memberId, organizationId, dateStart, dateEnd, title and source.
 */
export async function fetchMemberWorkExperienceWithEpochDates(
  qx: QueryExecutor,
  batchSize: number,
): Promise<IMemberOrganization[]> {
  const epochDatesQuery = `
    SELECT id, "memberId", "organizationId", "dateStart", "dateEnd", "title", "source"
    FROM "memberOrganizations"
    WHERE (
      "dateStart" = '1970-01-01 00:00:00+00'::timestamptz
      OR "dateEnd" = '1970-01-01 00:00:00+00'::timestamptz
    )
      AND "deletedAt" IS NULL
    ORDER BY "id" ASC
    LIMIT $(batchSize);
    `
  const rows = await qx.select(epochDatesQuery, { batchSize })

  return rows
}

services/libs/data-access-layer/src/old/apps/members_enrichment_worker/index.ts

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -478,14 +478,15 @@ export async function updateMemberOrg(
478478
return null
479479
}
480480

481-
// first check if another row like this exists
482-
// so that we don't get unique index violations
481+
// First check if another row like this exists so that we don't get unique index violations.
482+
// We compute the "target" state after applying toUpdate to decide what to look for.
483483
const params = {
484484
memberId,
485485
id: original.id,
486486
organizationId: original.orgId,
487-
dateStart: toUpdate.dateStart === undefined ? toUpdate.dateStart : original.dateStart,
488-
dateEnd: toUpdate.dateEnd === undefined ? toUpdate.dateEnd : original.dateEnd,
487+
// Use updated value if provided, otherwise keep original
488+
dateStart: toUpdate.dateStart !== undefined ? toUpdate.dateStart : original.dateStart,
489+
dateEnd: toUpdate.dateEnd !== undefined ? toUpdate.dateEnd : original.dateEnd,
489490
}
490491

491492
let dateEndFilter = `and "dateEnd" = $(dateEnd)`

services/libs/types/src/enrichment.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ export interface IEnrichableMemberIdentityActivityAggregate {
3434
export interface IMemberOrganizationData {
3535
id: string
3636
orgId: string
37-
orgName: string
37+
orgName?: string
3838
jobTitle: string
3939
dateStart: string
4040
dateEnd: string

0 commit comments

Comments
 (0)