Skip to content

Commit 1c09b5a

Browse files
authored
chore: add script to block organization members affiliation (#3469)
1 parent 0e87f0e commit 1c09b5a

6 files changed

Lines changed: 113 additions & 0 deletions

File tree

services/apps/script_executor_worker/src/activities.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
import {
2+
blockMemberOrganizationAffiliation,
3+
getOrganizationMembers,
4+
} from './activities/block-organization-affiliation'
15
import {
26
findDuplicateMembersAfterDate,
37
moveMemberActivityRelations,
@@ -124,4 +128,6 @@ export {
124128
checkActivitiesWithTimestampExistInQuestDb,
125129
saveMissingActivityInQuestDb,
126130
getMissingActivityInQuestDb,
131+
blockMemberOrganizationAffiliation,
132+
getOrganizationMembers,
127133
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { pgpQx } from '@crowd/data-access-layer'
2+
import { changeOverride } from '@crowd/data-access-layer/src/member_organization_affiliation_overrides'
3+
import OrganizationRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/organization.repo'
4+
import { IMemberOrganization } from '@crowd/types'
5+
6+
import { svc } from '../main'
7+
8+
export async function getOrganizationMembers(
9+
organizationId: string,
10+
limit = 100,
11+
offset = 0,
12+
): Promise<IMemberOrganization[]> {
13+
try {
14+
const orgRepo = new OrganizationRepository(svc.postgres.reader.connection(), svc.log)
15+
return orgRepo.findOrganizationMembers(organizationId, limit, offset)
16+
} catch (error) {
17+
svc.log.error(error, 'Error getting organization members!')
18+
throw error
19+
}
20+
}
21+
22+
export async function blockMemberOrganizationAffiliation(
23+
memberId: string,
24+
memberOrganizationId: string,
25+
): Promise<void> {
26+
try {
27+
const qx = pgpQx(svc.postgres.writer.connection())
28+
return changeOverride(qx, {
29+
memberId,
30+
memberOrganizationId,
31+
allowAffiliation: false,
32+
isPrimaryWorkExperience: false,
33+
})
34+
} catch (error) {
35+
svc.log.error(error, 'Error blocking organization affiliation!')
36+
throw error
37+
}
38+
}

services/apps/script_executor_worker/src/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,8 @@ export interface IDedupActivityRelationsArgs extends IScriptBatchTestArgs {
6363
groupsPerRun?: number
6464
cursor?: Omit<IActivityRelationDuplicateGroup, 'activityIds'>
6565
}
66+
67+
export interface IBlockOrganizationAffiliationArgs {
68+
organizationId: string
69+
offset?: number
70+
}

services/apps/script_executor_worker/src/workflows.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { blockOrganizationAffiliation } from './workflows/block-organization-affiliation'
12
import { cleanupDuplicateMembers } from './workflows/cleanup/duplicate-members'
23
import { cleanupMembers } from './workflows/cleanup/members'
34
import { cleanupOrganizations } from './workflows/cleanup/organizations'
@@ -30,4 +31,5 @@ export {
3031
cleanupDuplicateMembers,
3132
fixBotMembersAffiliation,
3233
dedupActivityRelations,
34+
blockOrganizationAffiliation,
3335
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { continueAsNew, proxyActivities } from '@temporalio/workflow'
2+
3+
import * as activities from '../activities'
4+
import { IBlockOrganizationAffiliationArgs } from '../types'
5+
import { chunkArray } from '../utils/common'
6+
7+
const { getOrganizationMembers, blockMemberOrganizationAffiliation, calculateMemberAffiliations } =
8+
proxyActivities<typeof activities>({
9+
startToCloseTimeout: '30 minutes',
10+
})
11+
12+
export async function blockOrganizationAffiliation(
13+
args: IBlockOrganizationAffiliationArgs,
14+
): Promise<void> {
15+
const MEMBERS_PER_RUN = 500
16+
const BATCH_SIZE = 50
17+
const OFFSET = args.offset ?? 0
18+
19+
const memberOrganizations = await getOrganizationMembers(
20+
args.organizationId,
21+
MEMBERS_PER_RUN,
22+
OFFSET,
23+
)
24+
25+
if (memberOrganizations.length === 0) {
26+
console.log('No more organization members to block!')
27+
return
28+
}
29+
30+
// Step 1: Block all affiliations in batches
31+
for (const chunk of chunkArray(memberOrganizations, BATCH_SIZE)) {
32+
await Promise.all(chunk.map((mo) => blockMemberOrganizationAffiliation(mo.memberId, mo.id)))
33+
}
34+
35+
// Step 2: Deduplicate memberIds and calculate affiliations
36+
const uniqueMemberIds = Array.from(new Set(memberOrganizations.map((mo) => mo.memberId)))
37+
for (const chunk of chunkArray(uniqueMemberIds, BATCH_SIZE)) {
38+
await Promise.all(chunk.map((memberId) => calculateMemberAffiliations(memberId)))
39+
}
40+
41+
// Step 3: Continue pagination
42+
await continueAsNew<typeof blockOrganizationAffiliation>({
43+
...args,
44+
offset: OFFSET + MEMBERS_PER_RUN,
45+
})
46+
}

services/libs/data-access-layer/src/old/apps/script_executor_worker/organization.repo.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { DbConnection, DbTransaction } from '@crowd/database'
22
import { Logger } from '@crowd/logging'
3+
import { IMemberOrganization } from '@crowd/types'
34

45
class OrganizationRepository {
56
constructor(
@@ -161,6 +162,21 @@ class OrganizationRepository {
161162
{ memberId },
162163
)
163164
}
165+
166+
async findOrganizationMembers(
167+
organizationId: string,
168+
limit: number,
169+
offset: number,
170+
): Promise<IMemberOrganization[]> {
171+
return this.connection.any(
172+
`
173+
SELECT * FROM "memberOrganizations"
174+
WHERE "organizationId" = $(organizationId) AND "deletedAt" IS NULL
175+
ORDER BY "memberId", "id"
176+
LIMIT $(limit) OFFSET $(offset)`,
177+
{ organizationId, limit, offset },
178+
)
179+
}
164180
}
165181

166182
export default OrganizationRepository

0 commit comments

Comments
 (0)