66 IntegrationStreamDataState ,
77 IntegrationStreamState ,
88 WebhookState ,
9- WebhookType ,
109} from '@crowd/types'
1110import {
1211 IInsertableWebhookStream ,
@@ -31,26 +30,25 @@ export default class IntegrationStreamRepository extends RepositoryBase<Integrat
3130 try {
3231 const results = await this . db ( ) . any (
3332 `
34- select iw.id,
33+ SELECT iw.id,
3534 iw."tenantId",
3635 iw."integrationId",
3736 iw.state,
3837 iw.type,
3938 iw.payload,
40- iw."createdAt" as "createdAt",
41- i.platform as "platform"
42- from "incomingWebhooks" iw
43- inner join integrations i on iw."integrationId" = i.id
44- left join integration.streams s on iw.id = s."webhookId"
45- where s .id is null
46- and iw.type <> $(discourseType )
47- and iw.state = $(pendingState)
48- and iw."createdAt" < now () - interval '1 hour'
49- limit ${ limit }
50- for update skip locked ;
39+ iw."createdAt" AS "createdAt",
40+ i.platform AS "platform"
41+ FROM "incomingWebhooks" iw
42+ INNER JOIN integrations i ON iw."integrationId" = i.id
43+ WHERE NOT EXISTS (
44+ SELECT 1 FROM integration.streams s WHERE iw .id = s."webhookId"
45+ )
46+ AND iw.state = $(pendingState)
47+ AND iw."createdAt" < NOW () - INTERVAL '1 hour'
48+ LIMIT ${ limit }
49+ FOR UPDATE SKIP LOCKED ;
5150 ` ,
5251 {
53- discourseType : WebhookType . DISCOURSE ,
5452 pendingState : WebhookState . PENDING ,
5553 } ,
5654 )
0 commit comments