Skip to content

Commit 3804abe

Browse files
authored
feat: kafka-connect-monitor service (#3552)
1 parent c60dc69 commit 3804abe

8 files changed

Lines changed: 294 additions & 0 deletions

File tree

pnpm-lock.yaml

Lines changed: 52 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
DOCKERFILE="./services/docker/Dockerfile.kafka_connect_monitor"
2+
CONTEXT="../"
3+
REPO="sjc.ocir.io/axbydjxa5zuh/kafka-connect-monitor"
4+
SERVICES="kafka-connect-monitor"
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
FROM node:20-alpine as builder
2+
3+
RUN apk add --no-cache python3 make g++
4+
5+
WORKDIR /usr/crowd/app
6+
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate
7+
8+
COPY ./pnpm-workspace.yaml ./pnpm-lock.yaml ./
9+
RUN pnpm fetch
10+
11+
COPY ./services ./services
12+
RUN pnpm i --frozen-lockfile
13+
14+
FROM node:20-bookworm-slim as runner
15+
16+
WORKDIR /usr/crowd/app
17+
RUN npm install -g corepack@latest && corepack enable pnpm && corepack prepare pnpm@9.15.0 --activate && apt update && apt install -y ca-certificates --no-install-recommends && rm -rf /var/lib/apt/lists/*
18+
19+
COPY --from=builder /usr/crowd/app/node_modules ./node_modules
20+
COPY --from=builder /usr/crowd/app/services/base.tsconfig.json ./services/base.tsconfig.json
21+
COPY --from=builder /usr/crowd/app/services/libs ./services/libs
22+
COPY --from=builder /usr/crowd/app/services/apps/kafka_connect_monitor/ ./services/apps/kafka_connect_monitor
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
**/.git
2+
**/node_modules
3+
**/venv*
4+
**/.webpack
5+
**/.serverless
6+
**/.env
7+
**/.env.*
8+
**/.idea
9+
**/.vscode
10+
**/dist
11+
.vscode/
12+
.github/
13+
frontend/
14+
scripts/
15+
.flake8
16+
*.md
17+
Makefile
18+
backend/
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"name": "@crowd/kafka-connect-monitor",
3+
"scripts": {
4+
"start": "SERVICE=kafka-connect-monitor tsx src/main.ts",
5+
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && SERVICE=kafka-connect-monitor LOG_LEVEL=trace tsx --inspect=0.0.0.0:9240 src/main.ts",
6+
"start:debug": "SERVICE=kafka-connect-monitor LOG_LEVEL=trace tsx --inspect=0.0.0.0:9240 src/main.ts",
7+
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
8+
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
9+
"lint": "npx eslint --ext .ts src --max-warnings=0",
10+
"format": "npx prettier --write \"src/**/*.ts\"",
11+
"format-check": "npx prettier --check .",
12+
"tsc-check": "tsc --noEmit"
13+
},
14+
"dependencies": {
15+
"@crowd/logging": "workspace:*",
16+
"@types/express": "^4.17.17",
17+
"@types/node": "^20.8.2",
18+
"bunyan-middleware": "^1.0.2",
19+
"express": "^4.18.2",
20+
"prom-client": "^15.1.0",
21+
"tsx": "^4.7.1",
22+
"typescript": "^5.6.3"
23+
},
24+
"devDependencies": {
25+
"nodemon": "^2.0.22"
26+
}
27+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import bunyanMiddleware from 'bunyan-middleware'
2+
import express, { ErrorRequestHandler, Request, RequestHandler } from 'express'
3+
4+
import { Logger, getChildLogger, getServiceLogger } from '@crowd/logging'
5+
6+
import { installConnectorHealthRoutes } from './routes/health'
7+
8+
const log = getServiceLogger()
9+
const PORT = 8085
10+
11+
setImmediate(async () => {
12+
const app = express()
13+
14+
app.use('/health', async (req, res) => {
15+
res.sendStatus(200)
16+
})
17+
18+
app.use(express.json())
19+
app.use(loggingMiddleware(log))
20+
21+
// Install routes
22+
installConnectorHealthRoutes(app, log)
23+
24+
app.use(errorMiddleware())
25+
26+
app.listen(PORT, () => {
27+
log.info(`Kafka Connect Monitor listening on port ${PORT}!`)
28+
})
29+
})
30+
31+
export const errorMiddleware = (): ErrorRequestHandler => {
32+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
33+
return (err, req, res, _next) => {
34+
const request = req as ApiRequest
35+
36+
request.log.error(err, 'Error occurred!')
37+
res.status(500).send('Internal Server Error')
38+
}
39+
}
40+
41+
export interface ApiRequest extends Request {
42+
log: Logger
43+
}
44+
45+
export const loggingMiddleware = (log: Logger): RequestHandler => {
46+
return bunyanMiddleware({
47+
headerName: 'x-request-id',
48+
propertyName: 'requestId',
49+
logName: `requestId`,
50+
logger: getChildLogger('apiRequest', log),
51+
level: 'trace',
52+
})
53+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
import { Request, Response, Router } from 'express'
2+
import { Gauge, Registry } from 'prom-client'
3+
4+
import { Logger } from '@crowd/logging'
5+
6+
const KAFKA_CONNECT_URL = 'http://localhost:8083'
7+
8+
// All possible Kafka Connect states
9+
const CONNECTOR_STATES = ['RUNNING', 'FAILED', 'PAUSED', 'UNASSIGNED'] as const
10+
const TASK_STATES = ['RUNNING', 'FAILED', 'PAUSED', 'UNASSIGNED'] as const
11+
12+
interface ConnectorTask {
13+
id: number
14+
state: string
15+
worker_id: string
16+
}
17+
18+
interface ConnectorStatus {
19+
name: string
20+
connector: {
21+
state: string
22+
worker_id: string
23+
}
24+
tasks: ConnectorTask[]
25+
type: string
26+
}
27+
28+
interface ConnectorsResponse {
29+
[connectorName: string]: {
30+
status: ConnectorStatus
31+
}
32+
}
33+
34+
export function installConnectorHealthRoutes(app: Router, log: Logger): void {
35+
app.get('/connector-health', async (req: Request, res: Response) => {
36+
try {
37+
// Create a new registry for this request
38+
const register = new Registry()
39+
40+
// Fetch connector statuses from Kafka Connect
41+
const connectorsUrl = `${KAFKA_CONNECT_URL}/connectors?expand=status`
42+
const response = await fetch(connectorsUrl)
43+
44+
if (!response.ok) {
45+
log.error(
46+
{ status: response.status, statusText: response.statusText },
47+
'Failed to fetch connector status from Kafka Connect',
48+
)
49+
res.status(500).json({
50+
error: 'Failed to fetch connector status from Kafka Connect',
51+
status: response.status,
52+
})
53+
return
54+
}
55+
56+
const data = (await response.json()) as ConnectorsResponse
57+
58+
// Create gauges for connector status (one-hot encoding per state)
59+
const connectorStatusGauge = new Gauge({
60+
name: 'connector_status',
61+
help: 'Connector status (one-hot: 1 for active state, 0 otherwise)',
62+
labelNames: ['connector', 'worker_id', 'state'],
63+
registers: [register],
64+
})
65+
66+
// Create gauges for task status (one-hot encoding per state)
67+
const taskStatusGauge = new Gauge({
68+
name: 'task_status',
69+
help: 'Task status (one-hot: 1 for active state, 0 otherwise)',
70+
labelNames: ['connector', 'task_id', 'worker_id', 'state'],
71+
registers: [register],
72+
})
73+
74+
// Process each connector
75+
for (const connectorData of Object.values(data)) {
76+
const status = connectorData.status
77+
78+
// Set connector status metric (one-hot: 1 for current state, 0 for all others)
79+
for (const state of CONNECTOR_STATES) {
80+
connectorStatusGauge.set(
81+
{
82+
connector: status.name,
83+
worker_id: status.connector.worker_id,
84+
state,
85+
},
86+
status.connector.state === state ? 1 : 0,
87+
)
88+
}
89+
90+
// Set task status metrics (one-hot: 1 for current state, 0 for all others)
91+
for (const task of status.tasks) {
92+
for (const state of TASK_STATES) {
93+
taskStatusGauge.set(
94+
{
95+
connector: status.name,
96+
task_id: task.id.toString(),
97+
worker_id: task.worker_id,
98+
state,
99+
},
100+
task.state === state ? 1 : 0,
101+
)
102+
}
103+
}
104+
}
105+
106+
// Return metrics in Prometheus format
107+
res.set('Content-Type', register.contentType)
108+
res.send(await register.metrics())
109+
} catch (err) {
110+
log.error(err, 'Error fetching connector health')
111+
res.status(500).json({ error: 'Internal server error' })
112+
}
113+
})
114+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{
2+
"extends": "../../base.tsconfig.json",
3+
"include": ["src/**/*"]
4+
}

0 commit comments

Comments
 (0)