Skip to content

Commit 369bdb8

Browse files
authored
chore: change nango trigger timings and refactor cursors out of integration.settings (CM-962) (#3845)
Signed-off-by: Uroš Marolt <uros@marolt.me>
1 parent d031d74 commit 369bdb8

15 files changed

Lines changed: 354 additions & 181 deletions

File tree

backend/src/database/migrations/U1770818540__createNangoCursorsTable.sql

Whitespace-only changes.
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
-- Backup settings before any modifications
2+
CREATE TABLE integration.integrations_settings_backup_02_13_2026 AS
3+
SELECT id, settings FROM integrations;
4+
5+
CREATE TABLE integration.nango_cursors (
6+
"integrationId" UUID NOT NULL,
7+
"connectionId" TEXT NOT NULL,
8+
platform TEXT NOT NULL,
9+
model TEXT NOT NULL,
10+
cursor TEXT NOT NULL,
11+
"lastCheckedAt" TIMESTAMPTZ,
12+
"createdAt" TIMESTAMPTZ NOT NULL DEFAULT now(),
13+
"updatedAt" TIMESTAMPTZ NOT NULL DEFAULT now(),
14+
PRIMARY KEY ("integrationId", "connectionId", model),
15+
FOREIGN KEY ("integrationId") REFERENCES integrations(id) ON DELETE CASCADE
16+
);
17+
18+
CREATE INDEX ix_nango_cursors_lastCheckedAt ON integration.nango_cursors ("lastCheckedAt" NULLS FIRST);
19+
CREATE INDEX ix_nango_cursors_connectionId ON integration.nango_cursors ("connectionId");
20+
21+
-- GitHub-nango: unnest nangoMapping keys x cursor models
22+
INSERT INTO integration.nango_cursors ("integrationId", "connectionId", platform, model, cursor)
23+
SELECT
24+
i.id,
25+
nm.key,
26+
'github',
27+
cm.key,
28+
cm.value #>> '{}'
29+
FROM integrations i,
30+
jsonb_each(i.settings->'nangoMapping') nm,
31+
jsonb_each(COALESCE(i.settings->'cursors'->nm.key, '{}'::jsonb)) cm
32+
WHERE i.platform = 'github-nango'
33+
AND i."deletedAt" IS NULL
34+
AND i.settings->'nangoMapping' IS NOT NULL
35+
ON CONFLICT DO NOTHING;
36+
37+
-- Non-GitHub nango: connectionId = integrationId, unnest cursor models
38+
INSERT INTO integration.nango_cursors ("integrationId", "connectionId", platform, model, cursor)
39+
SELECT
40+
i.id,
41+
i.id::text,
42+
CASE
43+
WHEN i.platform = 'gerrit' THEN 'gerrit'
44+
WHEN i.platform = 'jira' THEN COALESCE(i.settings->>'nangoIntegrationName', 'jira-basic')
45+
WHEN i.platform = 'confluence' THEN COALESCE(i.settings->>'nangoIntegrationName', 'confluence')
46+
ELSE i.platform
47+
END,
48+
cm.key,
49+
cm.value #>> '{}'
50+
FROM integrations i,
51+
jsonb_each(COALESCE(i.settings->'cursors'->i.id::text, '{}'::jsonb)) cm
52+
WHERE i.platform IN ('gerrit', 'jira', 'confluence')
53+
AND i."deletedAt" IS NULL
54+
ON CONFLICT DO NOTHING;
55+
56+
-- Clean up settings.cursors
57+
UPDATE integrations
58+
SET settings = settings - 'cursors'
59+
WHERE settings->'cursors' IS NOT NULL;

backend/src/services/integrationService.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -869,11 +869,6 @@ export default class IntegrationService {
869869
platform: PlatformType.GITHUB_NANGO,
870870
settings: {
871871
...settings,
872-
...(integration.settings.cursors
873-
? {
874-
cursors: integration.settings.cursors,
875-
}
876-
: {}),
877872
...(integration.settings.nangoMapping
878873
? {
879874
nangoMapping: integration.settings.nangoMapping,

services/apps/cron_service/src/jobs/nangoMonitoring.job.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
1111
import {
1212
INangoIntegrationData,
13+
fetchNangoCursorRowsForIntegration,
1314
fetchNangoIntegrationData,
1415
} from '@crowd/data-access-layer/src/integrations'
1516
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
@@ -98,9 +99,12 @@ const job: IJobDefinition = {
9899

99100
// then collect nango connection status checks for each connection
100101
if (int.settings.nangoMapping) {
102+
const cursorRows = await fetchNangoCursorRowsForIntegration(pgpQx(dbConnection), int.id)
103+
const connectionIdsWithCursors = new Set(cursorRows.map((r) => r.connectionId))
104+
101105
for (const connectionId of Object.keys(int.settings.nangoMapping)) {
102106
// check if we have cursors already for this connection
103-
if (!int.settings.cursors || !int.settings.cursors[connectionId]) {
107+
if (!connectionIdsWithCursors.has(connectionId)) {
104108
if (ghNoCursorsYet.has(int.id)) {
105109
ghNoCursorsYet.set(int.id, ghNoCursorsYet.get(int.id) + 1)
106110
} else {

services/apps/cron_service/src/jobs/nangoTrigger.job.ts

Lines changed: 92 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -2,152 +2,141 @@ import CronTime from 'cron-time-generator'
22

33
import { ConcurrencyLimiter, IS_DEV_ENV } from '@crowd/common'
44
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
5-
import { fetchNangoIntegrationDataForCheck } from '@crowd/data-access-layer/src/integrations'
5+
import {
6+
fetchNangoIntegrationDataForCheck,
7+
fetchNangoLastCheckedAt,
8+
} from '@crowd/data-access-layer/src/integrations'
69
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
710
import {
811
ALL_NANGO_INTEGRATIONS,
9-
INangoWebhookPayload,
12+
INangoConnectionToCheck,
1013
NANGO_INTEGRATION_CONFIG,
1114
NangoIntegration,
1215
nangoIntegrationToPlatform,
1316
platformToNangoIntegration,
1417
} from '@crowd/nango'
15-
import { TEMPORAL_CONFIG, WorkflowIdReusePolicy, getTemporalClient } from '@crowd/temporal'
18+
import {
19+
TEMPORAL_CONFIG,
20+
WorkflowIdConflictPolicy,
21+
WorkflowIdReusePolicy,
22+
getTemporalClient,
23+
} from '@crowd/temporal'
1624
import { PlatformType } from '@crowd/types'
1725

1826
import { IJobDefinition } from '../types'
1927

28+
// How old an integration must be before we reduce its check frequency
29+
const AGE_THRESHOLD_MS = IS_DEV_ENV
30+
? 20 * 60 * 1000 // 20 minutes for local testing
31+
: 30 * 24 * 60 * 60 * 1000 // 1 month
32+
33+
// Minimum interval between checks for new integrations
34+
const NEW_INTERVAL_MS = IS_DEV_ENV
35+
? 5 * 60 * 1000 // 5 minutes
36+
: 60 * 60 * 1000 // 1 hour
37+
38+
// Minimum interval between checks for old integrations
39+
const OLD_INTERVAL_MS = IS_DEV_ENV
40+
? 15 * 60 * 1000 // 15 minutes
41+
: 6 * 60 * 60 * 1000 // 6 hours
42+
2043
const job: IJobDefinition = {
2144
name: 'nango-trigger',
22-
cronTime: IS_DEV_ENV ? CronTime.everyMinute() : CronTime.everyHour(),
45+
cronTime: IS_DEV_ENV ? CronTime.every(5).minutes() : CronTime.everyHour(),
2346
timeout: 4 * 60 * 60, // 4 hours
2447
process: async (ctx) => {
2548
ctx.log.info('Triggering nango API check as if a webhook was received!')
2649

2750
const temporal = await getTemporalClient(TEMPORAL_CONFIG())
2851

2952
const dbConnection = await getDbConnection(READ_DB_CONFIG(), 3, 0)
53+
const qx = pgpQx(dbConnection)
3054

31-
const integrationsToTrigger = await fetchNangoIntegrationDataForCheck(pgpQx(dbConnection), [
32-
...new Set(ALL_NANGO_INTEGRATIONS.map(nangoIntegrationToPlatform)),
33-
])
55+
const platforms = [...new Set(ALL_NANGO_INTEGRATIONS.map(nangoIntegrationToPlatform))]
3456

35-
const limiter = new ConcurrencyLimiter(5)
57+
const allIntegrations = await fetchNangoIntegrationDataForCheck(qx, platforms)
3658

37-
// Collect all workflow start operations
38-
const workflowStarts: Array<() => Promise<void>> = []
59+
// Batch-fetch lastCheckedAt for all connections
60+
const lastCheckedAtRows = await fetchNangoLastCheckedAt(qx, platforms)
61+
const lastCheckedAtMap = new Map<string, string | null>()
62+
for (const row of lastCheckedAtRows) {
63+
lastCheckedAtMap.set(`${row.integrationId}/${row.connectionId}`, row.lastCheckedAt)
64+
}
3965

40-
for (let i = 0; i < integrationsToTrigger.length; i++) {
41-
const int = integrationsToTrigger[i]
66+
const now = new Date()
67+
const limiter = new ConcurrencyLimiter(5)
68+
const workflowStarts: Array<() => Promise<void>> = []
69+
let skippedConnections = 0
4270

71+
for (let i = 0; i < allIntegrations.length; i++) {
72+
const int = allIntegrations[i]
4373
const { id, settings } = int
4474

45-
ctx.log.info(
46-
`${i + 1}/${integrationsToTrigger.length} Triggering nango integration check for ${id} (${int.platform})`,
47-
)
48-
4975
const platform = platformToNangoIntegration(int.platform as PlatformType, settings)
5076

5177
if (platform === NangoIntegration.GITHUB && !settings.nangoMapping) {
5278
// ignore non-nango github integrations
5379
continue
5480
}
5581

56-
for (const model of Object.values(NANGO_INTEGRATION_CONFIG[platform].models)) {
57-
ctx.log.debug(
58-
{
59-
integrationId: id,
60-
platform,
61-
model,
62-
},
63-
'Triggering nango integration check!',
82+
const integrationAgeMs = now.getTime() - new Date(int.createdAt).getTime()
83+
const isOld = integrationAgeMs >= AGE_THRESHOLD_MS
84+
const requiredInterval = isOld ? OLD_INTERVAL_MS : NEW_INTERVAL_MS
85+
86+
// Determine connectionIds for this integration
87+
const connectionIds: string[] =
88+
platform === NangoIntegration.GITHUB ? Object.keys(settings.nangoMapping) : [id]
89+
90+
const models = Object.values(NANGO_INTEGRATION_CONFIG[platform].models) as string[]
91+
const connections: INangoConnectionToCheck[] = []
92+
93+
for (const connectionId of connectionIds) {
94+
const key = `${id}/${connectionId}`
95+
const lastCheckedAt = lastCheckedAtMap.get(key)
96+
97+
// Skip if checked recently enough
98+
if (lastCheckedAt) {
99+
const elapsed = now.getTime() - new Date(lastCheckedAt).getTime()
100+
if (elapsed < requiredInterval) {
101+
skippedConnections++
102+
continue
103+
}
104+
}
105+
106+
ctx.log.info(
107+
`${i + 1}/${allIntegrations.length} Triggering nango integration check for ${id} / ${connectionId} (${platform})`,
64108
)
65109

110+
let workflowIdPrefix = ''
66111
if (platform === NangoIntegration.GITHUB) {
67-
// trigger for each connection id - could be multiple because 1 integration can have multiple repositories and each repository has a connection id on nango
68-
for (const connectionId of Object.keys(settings.nangoMapping)) {
69-
const payload: INangoWebhookPayload = {
70-
connectionId: connectionId,
71-
providerConfigKey: platform,
72-
syncName: 'not important',
73-
model,
74-
responseResults: { added: 1, updated: 1, deleted: 1 },
75-
syncType: 'INCREMENTAL',
76-
modifiedAfter: new Date().toISOString(),
77-
}
78-
79-
workflowStarts.push(async () => {
80-
try {
81-
await temporal.workflow.start('processNangoWebhook', {
82-
taskQueue: 'nango',
83-
workflowId: `nango-webhook/${platform}/${id}/${connectionId}/${model}/cron-triggered`,
84-
workflowIdReusePolicy:
85-
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
86-
retry: {
87-
maximumAttempts: 10,
88-
},
89-
args: [payload],
90-
})
91-
} catch (error) {
92-
if (error.name === 'WorkflowExecutionAlreadyStartedError') {
93-
ctx.log.debug(
94-
{
95-
integrationId: id,
96-
platform,
97-
model,
98-
connectionId,
99-
},
100-
'Workflow already running, skipping...',
101-
)
102-
return
103-
}
104-
throw error
105-
}
106-
})
107-
}
108-
} else {
109-
const payload: INangoWebhookPayload = {
110-
connectionId: id,
111-
providerConfigKey: platform,
112-
syncName: 'not important',
113-
model,
114-
responseResults: { added: 1, updated: 1, deleted: 1 },
115-
syncType: 'INCREMENTAL',
116-
modifiedAfter: new Date().toISOString(),
117-
}
112+
const mapping = settings.nangoMapping[connectionId]
113+
workflowIdPrefix = `${mapping.owner}/${mapping.repoName}/${connectionId}`
114+
}
118115

119-
workflowStarts.push(async () => {
120-
try {
121-
await temporal.workflow.start('processNangoWebhook', {
122-
taskQueue: 'nango',
123-
workflowId: `nango-webhook/${platform}/${id}/${model}/cron-triggered`,
124-
workflowIdReusePolicy:
125-
WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
126-
retry: {
127-
maximumAttempts: 10,
128-
},
129-
args: [payload],
130-
})
131-
} catch (error) {
132-
if (error.name === 'WorkflowExecutionAlreadyStartedError') {
133-
ctx.log.debug(
134-
{
135-
integrationId: id,
136-
platform,
137-
model,
138-
},
139-
'Workflow already running, skipping...',
140-
)
141-
return
142-
}
143-
throw error
144-
}
116+
connections.push({ connectionId, models, workflowIdPrefix })
117+
}
118+
119+
if (connections.length > 0) {
120+
const workflowId = `nango-trigger/${platform}/${id}/cron-triggered`
121+
122+
workflowStarts.push(async () => {
123+
await temporal.workflow.start('triggerNangoIntegrationCheck', {
124+
taskQueue: 'nango',
125+
workflowId,
126+
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE,
127+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.USE_EXISTING,
128+
retry: {
129+
maximumAttempts: 10,
130+
},
131+
args: [{ integrationId: id, providerConfigKey: platform, connections }],
145132
})
146-
}
133+
})
147134
}
148135
}
149136

150-
ctx.log.info(`Triggering nango integration checks with ${workflowStarts.length} workflows!`)
137+
ctx.log.info(
138+
`Triggering ${workflowStarts.length} workflows (skipped ${skippedConnections} connections due to recent checks)`,
139+
)
151140

152141
// Track completed workflows
153142
let completedWorkflows = 0

0 commit comments

Comments
 (0)