Skip to content

Commit e81fabf

Browse files
authored
feat(snowflake-connectors): meetings implementation [CM-1034] (#3998)
Signed-off-by: Mouad BANI <mouad-mb@outlook.com> Signed-off-by: Mouad BANI <mbani@contractor.linuxfoundation.org>
1 parent aaa5305 commit e81fabf

15 files changed

Lines changed: 372 additions & 28 deletions

File tree

.claude/skills/scaffold-snowflake-connector/SKILL.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ After Step 1 and/or Step 2, build a column registry per table:
196196

197197
**Store this as the canonical column reference. Every column name used in generated code must appear in this registry. Never assume or invent a column name.**
198198

199+
**Flag non-VARCHAR column types** (e.g., `DATE`, `TIME`, `TIMESTAMP_TZ`, `BOOLEAN`, `NUMBER`) — these arrive as native JS types from the Parquet reader, not strings (see touch point 9 rules).
200+
199201
For each JOIN table, check whether any existing transformer in `services/apps/snowflake_connectors/src/integrations/` queries from the same table. If yes, inherit its column mappings; if no, treat every column as unknown and derive it from sample data in the Pre-Analysis step below.
200202

201203
### Step 3 — Sample data
@@ -342,7 +344,10 @@ After all identity fields are confirmed, summarize how `buildMemberIdentities()`
342344
343345
### 3b. Organization Mapping
344346
345-
If Pre-Analysis determined there is no org data (no org-related columns found in any table), confirm: "I don't see any organization columns in the schema. Does this source have org/company data?" — if yes, proceed; if no, skip to 3c.
347+
If Pre-Analysis determined there is no org data (no org-related columns found in any table): before asking the user, first read existing transformers in `services/apps/snowflake_connectors/src/integrations/` to check whether any of them join an org table using a key that also exists in the user's tables. If a match is found, prompt the user:
348+
> "I don't see org columns in the tables you provided, but [EXISTING_PLATFORM] sources org data from `{ORG_TABLE}` via `{join_key}` — which also appears in your table. Did you mean to include this? (Recommended)"
349+
350+
If no existing pattern is joinable, ask: "I don't see any org columns. Does this source have org/company data?" — if yes, ask for the table; if no, skip to 3c.
346351
347352
If Pre-Analysis identified org columns:
348353
@@ -565,6 +570,7 @@ File: `services/apps/snowflake_connectors/src/integrations/{platform}/{source}/b
565570
**Rules (enforced — do not deviate):**
566571
- Use explicit column names only. Do not use `table.*` or `table.* EXCLUDE (...)` in new implementations — existing sources (TNC, CVENT) use these patterns but new sources should list columns explicitly to avoid parquet encoding/decoding issues
567572
- If any TIMESTAMP_TZ columns exist in the schema, exclude and re-cast them as TIMESTAMP_NTZ (see CVENT pattern)
573+
- Do not concatenate or transform date/time columns in SQL — keep them as separate columns and let the transformer handle type coercion (see touch point 9 rules)
568574
- Follow the CTE structure:
569575
1. `org_accounts` CTE (if org data present)
570576
2. `CDP_MATCHED_SEGMENTS` CTE (always)
@@ -585,6 +591,8 @@ Show the full generated file and ask for confirmation before writing.
585591
File: `services/apps/snowflake_connectors/src/integrations/{platform}/{source}/transformer.ts`
586592

587593
**Rules (enforced — do not deviate):**
594+
595+
- **Parquet type coercion — never blindly cast `row.COLUMN as string`.** Snowflake types may arrive as native JS types after Parquet decoding (e.g., `DATE``Date` object, `TIME``number` in ms, `BOOLEAN``boolean`). Always check the Snowflake column type from the schema registry and handle the actual JS type the Parquet reader delivers — do not assume every column is a string.
588596
- All string comparisons must be case-insensitive: use `.toLowerCase()` on both sides of comparison only; preserve the original value in the output
589597
- No broad `else` statements — every branch must have an explicit condition
590598
- All column names referenced in code must exactly match the schema registry — never assumed

backend/src/database/migrations/U1775219382__addMeetingsActivityTypes.sql

Whitespace-only changes.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
INSERT INTO "activityTypes" ("activityType", platform, "isCodeContribution", "isCollaboration", description, "label") VALUES
2+
('invited-meeting', 'meetings', false, false, 'User is invited to a meeting', 'Invited to a meeting'),
3+
('attended-meeting', 'meetings', false, false, 'User attends a meeting', 'Attended a meeting');

services/apps/snowflake_connectors/src/consumer/transformerConsumer.ts

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -89,24 +89,26 @@ export class TransformerConsumer {
8989
let resolveSkippedCount = 0
9090

9191
for await (const row of this.s3Service.streamParquetRows(job.s3Path)) {
92-
const result = source.transformer.safeTransformRow(row)
93-
if (!result) {
92+
const results = source.transformer.safeTransformRow(row)
93+
if (!results) {
9494
transformSkippedCount++
9595
continue
9696
}
9797

98-
const resolved = await this.integrationResolver.resolve(platform, result.segment)
99-
if (!resolved) {
100-
resolveSkippedCount++
101-
continue
98+
for (const result of results) {
99+
const resolved = await this.integrationResolver.resolve(platform, result.segment)
100+
if (!resolved) {
101+
resolveSkippedCount++
102+
continue
103+
}
104+
105+
await this.emitter.createAndProcessActivityResult(
106+
resolved.segmentId,
107+
resolved.integrationId,
108+
result.activity,
109+
)
110+
transformedCount++
102111
}
103-
104-
await this.emitter.createAndProcessActivityResult(
105-
resolved.segmentId,
106-
resolved.integrationId,
107-
result.activity,
108-
)
109-
transformedCount++
110112
}
111113

112114
const skippedCount = transformSkippedCount + resolveSkippedCount

services/apps/snowflake_connectors/src/core/transformerBase.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ export abstract class TransformerBase {
2424
abstract readonly platform: PlatformType
2525

2626
/**
27-
* Transform a single raw row from the S3 export into an activity
27+
* Transform a single raw row from the S3 export into one or more activities
2828
* along with routing metadata. Returns null if the row should be skipped.
2929
*/
30-
abstract transformRow(row: Record<string, unknown>): TransformedActivity | null
30+
abstract transformRow(
31+
row: Record<string, unknown>,
32+
): TransformedActivity | TransformedActivity[] | null
3133

3234
private static readonly INDIVIDUAL_NO_ACCOUNT_RE = /^individual\s*(?:[-?]|with)\s*no\s+account$/i
3335

@@ -104,10 +106,16 @@ export abstract class TransformerBase {
104106

105107
/**
106108
* Safe wrapper around transformRow that catches errors and returns null.
109+
* Always normalizes the result to an array for consistent consumption.
107110
*/
108-
safeTransformRow(row: Record<string, unknown>): TransformedActivity | null {
111+
safeTransformRow(row: Record<string, unknown>): TransformedActivity[] | null {
109112
try {
110-
return this.transformRow(row)
113+
const result = this.transformRow(row)
114+
if (result === null) {
115+
return null
116+
}
117+
const arr = Array.isArray(result) ? result : [result]
118+
return arr.length > 0 ? arr : null
111119
} catch (err) {
112120
const message = err instanceof Error ? err.message : String(err)
113121
const stack = err instanceof Error ? err.stack : undefined

services/apps/snowflake_connectors/src/integrations/committees/committees/transformer.ts

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -111,16 +111,14 @@ export class CommitteesCommitteesTransformer extends TransformerBase {
111111
{
112112
displayName,
113113
source: OrganizationSource.COMMITTEES,
114-
identities: website
115-
? [
116-
{
117-
platform: PlatformType.COMMITTEES,
118-
value: website,
119-
type: OrganizationIdentityType.PRIMARY_DOMAIN,
120-
verified: true,
121-
},
122-
]
123-
: [],
114+
identities: [
115+
{
116+
platform: PlatformType.COMMITTEES,
117+
value: website,
118+
type: OrganizationIdentityType.PRIMARY_DOMAIN,
119+
verified: true,
120+
},
121+
],
124122
},
125123
]
126124
}

services/apps/snowflake_connectors/src/integrations/index.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import { buildSourceQuery as committeesCommitteesBuildQuery } from './committees
1010
import { CommitteesCommitteesTransformer } from './committees/committees/transformer'
1111
import { buildSourceQuery as cventBuildSourceQuery } from './cvent/event-registrations/buildSourceQuery'
1212
import { CventTransformer } from './cvent/event-registrations/transformer'
13+
import { buildSourceQuery as meetingAttendanceBuildQuery } from './meetings/meeting-attendance/buildSourceQuery'
14+
import { MeetingAttendanceTransformer } from './meetings/meeting-attendance/transformer'
1315
import { buildSourceQuery as tncCertificatesBuildQuery } from './tnc/certificates/buildSourceQuery'
1416
import { TncCertificatesTransformer } from './tnc/certificates/transformer'
1517
import { buildSourceQuery as tncCoursesBuildQuery } from './tnc/courses/buildSourceQuery'
@@ -22,6 +24,15 @@ export type { BuildSourceQuery, DataSource, PlatformDefinition } from './types'
2224
export { DataSourceName } from './types'
2325

2426
const supported: Partial<Record<PlatformType, PlatformDefinition>> = {
27+
[PlatformType.MEETINGS]: {
28+
sources: [
29+
{
30+
name: DataSourceName.MEETINGS_MEETING_ATTENDANCE,
31+
buildSourceQuery: meetingAttendanceBuildQuery,
32+
transformer: new MeetingAttendanceTransformer(),
33+
},
34+
],
35+
},
2536
[PlatformType.COMMITTEES]: {
2637
sources: [
2738
{
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import { IS_PROD_ENV } from '@crowd/common'
2+
3+
const CDP_MATCHED_SEGMENTS = `
4+
cdp_matched_segments AS (
5+
SELECT DISTINCT
6+
s.SOURCE_ID AS sourceId,
7+
s.slug
8+
FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s
9+
WHERE s.PARENT_SLUG IS NOT NULL
10+
AND s.GRANDPARENTS_SLUG IS NOT NULL
11+
AND s.SOURCE_ID IS NOT NULL
12+
)`
13+
14+
const ORG_ACCOUNTS = `
15+
org_accounts AS (
16+
SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES
17+
FROM analytics.bronze_fivetran_salesforce.accounts
18+
WHERE website IS NOT NULL
19+
UNION ALL
20+
SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES
21+
FROM analytics.bronze_fivetran_salesforce_b2b.accounts
22+
WHERE website IS NOT NULL
23+
)`
24+
25+
const LF_SSO_LOOKUP = `
26+
lf_sso_lookup AS (
27+
SELECT INVITEE_LF_USER_ID AS LF_USER_ID, MAX(INVITEE_LF_SSO) AS LF_SSO
28+
FROM ANALYTICS.SILVER_FACT.MEETING_ATTENDANCE
29+
WHERE INVITEE_LF_SSO IS NOT NULL
30+
AND INVITEE_LF_USER_ID IS NOT NULL
31+
GROUP BY INVITEE_LF_USER_ID
32+
)`
33+
34+
export const buildSourceQuery = (sinceTimestamp?: string): string => {
35+
let select = `
36+
SELECT
37+
MD5(COALESCE(CAST(t.PRIMARY_KEY AS VARCHAR), '') || '|' || COALESCE(CAST(t.COMMITTEE_ID AS VARCHAR), '')) AS GENERATED_SOURCE_ID,
38+
CAST(t.MEETING_ID AS VARCHAR) AS MEETING_ID,
39+
t.MEETING_NAME,
40+
t.PROJECT_ID,
41+
t.PROJECT_NAME,
42+
t.PROJECT_SLUG,
43+
t.ACCOUNT_ID,
44+
t.ACCOUNT_NAME,
45+
t.MEETING_DATE,
46+
t.MEETING_TIME,
47+
t.INVITEE_FULL_NAME,
48+
COALESCE(t.INVITEE_LF_SSO, sso.LF_SSO) AS INVITEE_LF_SSO,
49+
t.INVITEE_LF_USER_ID,
50+
t.INVITEE_EMAIL,
51+
t.INVITEE_ATTENDED,
52+
t.WAS_INVITED,
53+
t.RAW_COMMITTEE_TYPE,
54+
t.UPDATED_TS,
55+
org.website AS ORG_WEBSITE,
56+
org.domain_aliases AS ORG_DOMAIN_ALIASES,
57+
org.logo_url AS LOGO_URL,
58+
org.industry AS ORGANIZATION_INDUSTRY,
59+
CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE
60+
FROM ANALYTICS.SILVER_FACT.MEETING_ATTENDANCE t
61+
INNER JOIN cdp_matched_segments cms
62+
ON cms.slug = t.PROJECT_SLUG
63+
AND cms.sourceId = t.PROJECT_ID
64+
LEFT JOIN lf_sso_lookup sso
65+
ON t.INVITEE_LF_USER_ID = sso.LF_USER_ID
66+
LEFT JOIN org_accounts org
67+
ON t.ACCOUNT_ID = org.account_id
68+
WHERE (t.WAS_INVITED = TRUE OR t.INVITEE_ATTENDED = TRUE)
69+
AND NULLIF(TRIM(t.INVITEE_EMAIL), '') IS NOT NULL`
70+
71+
if (!IS_PROD_ENV) {
72+
select += ` AND t.PROJECT_SLUG = 'cncf'`
73+
}
74+
75+
const dedup = `
76+
QUALIFY ROW_NUMBER() OVER (PARTITION BY t.PRIMARY_KEY ORDER BY org.website DESC) = 1`
77+
78+
if (!sinceTimestamp) {
79+
return `
80+
WITH ${ORG_ACCOUNTS},
81+
${CDP_MATCHED_SEGMENTS},
82+
${LF_SSO_LOOKUP}
83+
${select}
84+
${dedup}`.trim()
85+
}
86+
87+
return `
88+
WITH ${ORG_ACCOUNTS},
89+
${CDP_MATCHED_SEGMENTS},
90+
${LF_SSO_LOOKUP},
91+
new_cdp_segments AS (
92+
SELECT DISTINCT
93+
s.SOURCE_ID AS sourceId,
94+
s.slug
95+
FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s
96+
WHERE s.CREATED_TS >= '${sinceTimestamp}'
97+
AND s.PARENT_SLUG IS NOT NULL
98+
AND s.GRANDPARENTS_SLUG IS NOT NULL
99+
AND s.SOURCE_ID IS NOT NULL
100+
)
101+
102+
-- Updated records in existing segments
103+
${select}
104+
AND t.UPDATED_TS > '${sinceTimestamp}'
105+
${dedup}
106+
107+
UNION
108+
109+
-- All records in newly created segments
110+
${select}
111+
AND EXISTS (
112+
SELECT 1 FROM new_cdp_segments ncs
113+
WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
114+
)
115+
${dedup}`.trim()
116+
}

0 commit comments

Comments
 (0)