Skip to content

Commit 8f6ac75

Browse files
author
Uros Marolt
authored
Nodejs worker fixes (#1854)
1 parent f8d9281 commit 8f6ac75

11 files changed

Lines changed: 67 additions & 318 deletions

File tree

backend/src/bin/jobs/checkSqsQueues.ts

Lines changed: 0 additions & 60 deletions
This file was deleted.

backend/src/bin/jobs/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { CrowdJob } from '../../types/jobTypes'
22
import integrationTicks from './integrationTicks'
33
import weeklyAnalyticsEmailsCoordinator from './weeklyAnalyticsEmailsCoordinator'
44
import memberScoreCoordinator from './memberScoreCoordinator'
5-
import checkSqsQueues from './checkSqsQueues'
65
import refreshMaterializedViews from './refreshMaterializedViews'
76
import refreshMaterializedViewsForCube from './refreshMaterializedViewsForCube'
87
import downgradeExpiredPlans from './downgradeExpiredPlans'
@@ -20,7 +19,6 @@ const EMAILS_ENABLED = WEEKLY_EMAILS_CONFIG.enabled === 'true'
2019
const jobs: CrowdJob[] = [
2120
integrationTicks,
2221
memberScoreCoordinator,
23-
checkSqsQueues,
2422
refreshMaterializedViews,
2523
refreshMaterializedViewsForCube,
2624
downgradeExpiredPlans,

backend/src/bin/nodejs-worker.ts

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
import { timeout } from '@crowd/common'
22
import { Logger, getChildLogger, getServiceLogger, logExecutionTimeV2 } from '@crowd/logging'
3+
import {
4+
SqsDeleteMessageRequest,
5+
SqsMessage,
6+
SqsReceiveMessageRequest,
7+
deleteMessage,
8+
receiveMessage,
9+
sendMessage,
10+
} from '@crowd/sqs'
311
import { SpanStatusCode, getServiceTracer } from '@crowd/tracing'
4-
import { DeleteMessageRequest, Message, ReceiveMessageRequest } from 'aws-sdk/clients/sqs'
512
import moment from 'moment'
613
import { SQS_CONFIG } from '../conf'
714
import { processDbOperationsMessage } from '../serverless/dbOperations/workDispatcher'
815
import { processNodeMicroserviceMessage } from '../serverless/microservices/nodejs/workDispatcher'
916
import { NodeWorkerMessageType } from '../serverless/types/workerTypes'
1017
import { sendNodeWorkerMessage } from '../serverless/utils/nodeWorkerSQS'
1118
import { NodeWorkerMessageBase } from '../types/mq/nodeWorkerMessageBase'
12-
import { deleteMessage, receiveMessage, sendMessage } from '../utils/sqs'
1319
import { processIntegration, processWebhook } from './worker/integrations'
20+
import { SQS_CLIENT } from '@/serverless/utils/serviceSQS'
1421

1522
/* eslint-disable no-constant-condition */
1623

@@ -26,24 +33,30 @@ process.on('SIGTERM', async () => {
2633
exiting = true
2734
})
2835

29-
const receive = (delayed?: boolean): Promise<Message | undefined> => {
30-
const params: ReceiveMessageRequest = {
36+
const receive = async (delayed?: boolean): Promise<SqsMessage | undefined> => {
37+
const params: SqsReceiveMessageRequest = {
3138
QueueUrl: delayed ? SQS_CONFIG.nodejsWorkerDelayableQueue : SQS_CONFIG.nodejsWorkerQueue,
3239
MessageAttributeNames: !delayed
3340
? undefined
3441
: ['remainingDelaySeconds', 'tenantId', 'targetQueueUrl'],
3542
}
3643

37-
return receiveMessage(params)
44+
const messages = await receiveMessage(SQS_CLIENT(), params)
45+
46+
if (messages && messages.length === 1) {
47+
return messages[0]
48+
}
49+
50+
return undefined
3851
}
3952

4053
const removeFromQueue = (receiptHandle: string, delayed?: boolean): Promise<void> => {
41-
const params: DeleteMessageRequest = {
54+
const params: SqsDeleteMessageRequest = {
4255
QueueUrl: delayed ? SQS_CONFIG.nodejsWorkerDelayableQueue : SQS_CONFIG.nodejsWorkerQueue,
4356
ReceiptHandle: receiptHandle,
4457
}
4558

46-
return deleteMessage(params)
59+
return deleteMessage(SQS_CLIENT(), params)
4760
}
4861

4962
async function handleDelayedMessages() {
@@ -81,7 +94,7 @@ async function handleDelayedMessages() {
8194
if (message.MessageAttributes.targetQueueUrl) {
8295
const targetQueueUrl = message.MessageAttributes.targetQueueUrl.StringValue
8396
messageLogger.debug({ tenantId, targetQueueUrl }, 'Successfully delayed a message!')
84-
await sendMessage({
97+
await sendMessage(SQS_CLIENT(), {
8598
QueueUrl: targetQueueUrl,
8699
MessageGroupId: tenantId,
87100
MessageDeduplicationId: `${tenantId}-${moment().valueOf()}`,
@@ -129,7 +142,7 @@ async function handleMessages() {
129142
})
130143
handlerLogger.info('Listening for messages!')
131144

132-
const processSingleMessage = async (message: Message): Promise<void> => {
145+
const processSingleMessage = async (message: SqsMessage): Promise<void> => {
133146
await tracer.startActiveSpan('ProcessMessage', async (span) => {
134147
const msg: NodeWorkerMessageBase = JSON.parse(message.Body)
135148

backend/src/bin/scripts/trigger-webhook.ts

Lines changed: 0 additions & 167 deletions
This file was deleted.

backend/src/serverless/utils/nodeWorkerSQS.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import { MessageBodyAttributeMap } from 'aws-sdk/clients/sqs'
2-
import moment from 'moment'
31
import { getServiceChildLogger } from '@crowd/logging'
2+
import { SqsMessageAttributes, sendMessage } from '@crowd/sqs'
43
import { AutomationTrigger } from '@crowd/types'
5-
import { NodeWorkerMessageBase } from '../../types/mq/nodeWorkerMessageBase'
4+
import moment from 'moment'
65
import { IS_TEST_ENV, SQS_CONFIG } from '../../conf'
7-
import { sendMessage } from '../../utils/sqs'
8-
import { NodeWorkerMessageType } from '../types/workerTypes'
6+
import { NodeWorkerMessageBase } from '../../types/mq/nodeWorkerMessageBase'
97
import { ExportableEntity } from '../microservices/nodejs/messageTypes'
8+
import { NodeWorkerMessageType } from '../types/workerTypes'
9+
import { SQS_CLIENT } from './serviceSQS'
1010

1111
const log = getServiceChildLogger('nodeWorkerSQS')
1212

@@ -24,7 +24,7 @@ export const sendNodeWorkerMessage = async (
2424
}
2525

2626
// we can only delay for 15 minutes then we have to re-delay message
27-
let attributes: MessageBodyAttributeMap
27+
let attributes: SqsMessageAttributes
2828
let delay: number
2929
let delayed = false
3030
if (delaySeconds) {
@@ -80,7 +80,7 @@ export const sendNodeWorkerMessage = async (
8080
},
8181
'Sending nodejs-worker sqs message!',
8282
)
83-
await sendMessage(params)
83+
await sendMessage(SQS_CLIENT(), params)
8484
}
8585

8686
export const sendNewActivityNodeSQSMessage = async (
Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import { sendMessage } from '@crowd/sqs'
12
import moment from 'moment'
2-
import { sqs } from '../../services/aws'
33
import { IS_TEST_ENV, KUBE_MODE, SQS_CONFIG } from '../../conf'
44
import { PythonWorkerMessage } from '../types/workerTypes'
5+
import { SQS_CLIENT } from './serviceSQS'
56

67
export const sendPythonWorkerMessage = async (
78
tenantId: string,
@@ -16,12 +17,10 @@ export const sendPythonWorkerMessage = async (
1617
throw new Error("Can't send python-worker SQS message when not in kube mode!")
1718
}
1819

19-
await sqs
20-
.sendMessage({
21-
QueueUrl: SQS_CONFIG.pythonWorkerQueue,
22-
MessageGroupId: tenantId,
23-
MessageDeduplicationId: `${tenantId}-${moment().valueOf()}`,
24-
MessageBody: JSON.stringify(body),
25-
})
26-
.promise()
20+
await sendMessage(SQS_CLIENT(), {
21+
QueueUrl: SQS_CONFIG.pythonWorkerQueue,
22+
MessageGroupId: tenantId,
23+
MessageDeduplicationId: `${tenantId}-${moment().valueOf()}`,
24+
MessageBody: JSON.stringify(body),
25+
})
2726
}

0 commit comments

Comments
 (0)