Skip to content

Commit f7ad9b6

Browse files
authored
fix: handle and log problematic records in data-sink-worker (#3975)
Signed-off-by: Mouad BANI <mouad-mb@outlook.com>
1 parent bfc1bae commit f7ad9b6

1 file changed

Lines changed: 41 additions & 24 deletions

File tree

services/apps/data_sink_worker/src/service/dataSink.service.ts

Lines changed: 41 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -488,31 +488,48 @@ export default class DataSinkService extends LoggerBase {
488488
const end = performance.now()
489489
const totalTime = end - start
490490

491-
for (const type of types) {
492-
const items = groupedByType.get(type)
493-
const msPerItem = Math.floor(totalTime / items.length)
494-
495-
const args = { type }
496-
497-
if (type === IntegrationResultType.ACTIVITY) {
498-
items.forEach((item) => {
499-
const activityArgs = {
500-
...args,
501-
platform: item.platform,
502-
integrationId: item.integrationId,
503-
onboarding:
504-
item.onboarding === null || item.onboarding === undefined
505-
? '<not-set>'
506-
: item.onboarding.toString(),
507-
channel: (item.data.data as IActivityData).channel,
508-
}
509-
telemetry.distribution('data_sink_worker.process_result', msPerItem, activityArgs)
510-
})
511-
} else {
512-
items.forEach(() => {
513-
telemetry.distribution('data_sink_worker.process_result', msPerItem, args)
514-
})
491+
try {
492+
for (const type of types) {
493+
const items = groupedByType.get(type)
494+
const msPerItem = Math.floor(totalTime / items.length)
495+
496+
const args = { type }
497+
498+
if (type === IntegrationResultType.ACTIVITY) {
499+
items.forEach((item) => {
500+
const activityData = item.data?.data as IActivityData | undefined
501+
if (!activityData) {
502+
this.log.warn(
503+
{
504+
resultId: item.id,
505+
integrationId: item.integrationId,
506+
platform: item.platform,
507+
streamId: item.streamId,
508+
dataType: item.data?.type,
509+
},
510+
'Activity result has missing data payload (data.data is undefined)!',
511+
)
512+
}
513+
const activityArgs = {
514+
...args,
515+
platform: item.platform,
516+
integrationId: item.integrationId,
517+
onboarding:
518+
item.onboarding === null || item.onboarding === undefined
519+
? '<not-set>'
520+
: item.onboarding.toString(),
521+
channel: activityData?.channel,
522+
}
523+
telemetry.distribution('data_sink_worker.process_result', msPerItem, activityArgs)
524+
})
525+
} else {
526+
items.forEach(() => {
527+
telemetry.distribution('data_sink_worker.process_result', msPerItem, args)
528+
})
529+
}
515530
}
531+
} catch (telemetryErr) {
532+
this.log.error(telemetryErr, 'Error while reporting telemetry for processed results!')
516533
}
517534
}
518535
}

0 commit comments

Comments
 (0)