Go DCP Cassandra streams documents from Couchbase using the Database Change Protocol (DCP) and writes them to Cassandra tables in near real time.
- Custom Cassandra queries per DCP event.
- Update multiple documents for a DCP event (see Example).
- Handle different DCP events such as expiration, deletion and mutation (see Example).
- Buffer accumulation with configurable size and time limits for high-throughput batching.
- Concurrent writes with configurable in-flight request cap.
- Per-event unlogged batches for mappers that emit multiple rows per DCP event.
- Scale up and down with custom membership algorithms (Couchbase, KubernetesHa, Kubernetes StatefulSet or Static, see examples).
- Easily manageable configurations.
- Flexible mapping with support for both default mapper (configuration-based) and custom mapper (code-based).
- Graceful shutdown with safe batch writing to Cassandra.
Note: If you prefer to use the default mapper by entering the configuration instead of creating a custom mapper, please refer to this topic. Otherwise, you can refer to the example provided below:
```go
package main

import (
	dcpcassandra "github.com/Trendyol/go-dcp-cassandra"
	// The two subpackage paths below are assumed from the identifiers used in this example.
	"github.com/Trendyol/go-dcp-cassandra/cassandra"
	"github.com/Trendyol/go-dcp-cassandra/couchbase"
)

// CustomMapper converts one DCP event into zero or more Cassandra models.
func CustomMapper(event couchbase.Event) []cassandra.Model {
	// Build and return the models for this event here.
	return nil
}

func main() {
	connector, err := dcpcassandra.NewConnectorBuilder("config.yml").
		SetMapper(CustomMapper). // Not needed with the default mapper; just call Build().
		Build()
	if err != nil {
		panic(err)
	}

	defer connector.Close()
	connector.Start()
}
```

```
Couchbase DCP
     │
     ▼
  mapper()             ← your code transforms a DCP event into []cassandra.Model
     │
     ▼
  active buffer        ← shared in-memory buffer, mutex protected
     │
     ├── flush trigger: batchSizeLimit, batchByteSizeLimit, or batchTickerDuration
     │
     ▼
  swap buffer          ← active buffer swapped out; DCP immediately resumes writing to fresh buffer
     │
     ├── wait for previous flush to complete (max 2 buffers: one active, one in-flight)
     │   if Cassandra is slow, AddActions blocks here providing backpressure
     │
     ▼
  concurrent writes    ← up to maxInFlightRequests goroutines write to Cassandra simultaneously
     │                   if batchPerEvent: multi-row events grouped into a single UNLOGGED BATCH
     │
     ▼
  all writes done → ack all items in order → dcpClient.Commit()
     │
     ▼
  Cassandra
```
DCP events are accumulated in a shared in-memory buffer. A flush is triggered when any of the following conditions are met:
- The number of items reaches `batchSizeLimit`
- The estimated byte size of the buffer reaches `batchByteSizeLimit`
- The `batchTickerDuration` interval elapses
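The three flush conditions can be read as a single "any of" check. The sketch below is only an illustration: the `flushLimits` struct and `shouldFlush` function are hypothetical names, and the connector itself may use a ticker rather than an elapsed-time comparison.

```go
package main

import (
	"fmt"
	"time"
)

// flushLimits mirrors the three documented settings; the struct itself is hypothetical.
type flushLimits struct {
	BatchSizeLimit      int
	BatchByteSizeLimit  int
	BatchTickerDuration time.Duration
}

// shouldFlush reports whether at least one of the three flush conditions holds.
func shouldFlush(l flushLimits, items, bytes int, sinceLastFlush time.Duration) bool {
	return items >= l.BatchSizeLimit ||
		bytes >= l.BatchByteSizeLimit ||
		sinceLastFlush >= l.BatchTickerDuration
}

func main() {
	l := flushLimits{
		BatchSizeLimit:      2000,
		BatchByteSizeLimit:  10485760,
		BatchTickerDuration: 10 * time.Second,
	}
	fmt.Println(shouldFlush(l, 1999, 1024, time.Second))  // below every limit
	fmt.Println(shouldFlush(l, 2000, 1024, time.Second))  // item count reached
	fmt.Println(shouldFlush(l, 10, 1024, 10*time.Second)) // interval elapsed
}
```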
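On flush, the buffer is swapped atomically: the active buffer is replaced with a fresh empty one and the old one is returned immediately, so `AddActions` does not wait for that flush's Cassandra writes. It blocks only while a previous flush is still in flight, which is how backpressure is applied when Cassandra is slow.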
Only one flush is in flight at a time. If a new flush is triggered while the previous one is still writing, it waits until the previous flush's workers finish before starting its own. This preserves correct checkpoint ordering.
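A minimal sketch of the swap-then-wait behaviour, assuming a mutex-protected slice and a one-slot channel as the "in flight" marker. The `doubleBuffer` type and its methods are hypothetical and are not the connector's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// doubleBuffer is a hypothetical stand-in for the connector's internal buffer.
type doubleBuffer struct {
	mu       sync.Mutex
	active   []string
	inFlight chan struct{} // capacity 1: at most one flush writes at a time
}

func newDoubleBuffer() *doubleBuffer {
	return &doubleBuffer{inFlight: make(chan struct{}, 1)}
}

// add appends an item to the active buffer.
func (b *doubleBuffer) add(item string) {
	b.mu.Lock()
	b.active = append(b.active, item)
	b.mu.Unlock()
}

// flush swaps the active buffer for a fresh one, then waits for any previous
// flush before writing the swapped-out batch in the background.
func (b *doubleBuffer) flush(write func(batch []string)) {
	b.mu.Lock()
	batch := b.active
	b.active = nil
	b.mu.Unlock()

	b.inFlight <- struct{}{} // blocks here while the previous flush is running
	go func() {
		defer func() { <-b.inFlight }()
		write(batch)
	}()
}

// wait blocks until no flush is in flight.
func (b *doubleBuffer) wait() {
	b.inFlight <- struct{}{}
	<-b.inFlight
}

func main() {
	b := newDoubleBuffer()
	var written [][]string
	record := func(batch []string) { written = append(written, batch) }

	b.add("a")
	b.add("b")
	b.flush(record)
	b.add("c") // lands in the fresh buffer while the first batch is written
	b.flush(record)
	b.wait()
	fmt.Println(written)
}
```

Because the second flush cannot start writing until the first has released its slot, batches are written in the order they were swapped out, which is what keeps checkpoint ordering simple.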
During a flush, up to maxInFlightRequests goroutines write to Cassandra concurrently. This is the primary throughput knob — set it to match the number of Cassandra nodes × connections per node.
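One common way to cap in-flight work in Go is a buffered channel used as a semaphore. The sketch below assumes that approach; `writeAll` is a hypothetical helper, and the node and connection counts in `main` are made-up numbers for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// writeAll calls write for every item, running at most maxInFlight calls at once.
// maxInFlight must be at least 1. It returns the highest number of simultaneous
// writes it observed.
func writeAll(items []int, maxInFlight int, write func(int)) int {
	sem := make(chan struct{}, maxInFlight) // one slot per allowed in-flight write
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, item := range items {
		sem <- struct{}{} // blocks while maxInFlight writes are already running
		wg.Add(1)
		go func(item int) {
			defer wg.Done()
			defer func() { <-sem }()

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			write(item)

			mu.Lock()
			running--
			mu.Unlock()
		}(item)
	}
	wg.Wait()
	return peak
}

func main() {
	// Hypothetical cluster shape: 3 nodes with 2 connections each.
	maxInFlight := 3 * 2
	items := make([]int, 100)
	peak := writeAll(items, maxInFlight, func(int) {})
	fmt.Println(peak <= maxInFlight)
}
```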
With writeTimestamp: event_time or writeTimestamp: now, concurrent writes for the same key are safe — the USING TIMESTAMP clause ensures the most recent event always wins in Cassandra regardless of write completion order.
When batchPerEvent: true, actions returned by the mapper for a single DCP event are grouped into one CQL UNLOGGED BATCH statement. This reduces round trips for mappers that emit multiple rows per event (e.g. fan-out to multiple tables).
When batchPerEvent: false (default), every action is an individual prepared statement routed directly to the owning Cassandra replica via token-aware routing.
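The difference between the two modes can be sketched as a planning step over the statements each event produces. `planRequests` is a hypothetical helper; it assumes that single-statement events are sent as plain statements even when `batchPerEvent` is enabled, and it renders the batch as CQL text rather than using a driver.

```go
package main

import (
	"fmt"
	"strings"
)

// planRequests turns the CQL statements emitted for each DCP event into the
// requests that would be sent. eventStatements[i] holds the statements for event i.
func planRequests(eventStatements [][]string, batchPerEvent bool) []string {
	var requests []string
	for _, stmts := range eventStatements {
		if batchPerEvent && len(stmts) > 1 {
			// Multi-row event: one round trip carrying all of its statements.
			requests = append(requests,
				"BEGIN UNLOGGED BATCH "+strings.Join(stmts, "; ")+"; APPLY BATCH")
			continue
		}
		// Otherwise every statement is its own request.
		requests = append(requests, stmts...)
	}
	return requests
}

func main() {
	events := [][]string{
		{"INSERT INTO orders (id) VALUES (?)", "INSERT INTO orders_by_user (id) VALUES (?)"},
		{"INSERT INTO orders (id) VALUES (?)"},
	}
	fmt.Println(len(planRequests(events, false))) // 3 individual statements
	fmt.Println(len(planRequests(events, true)))  // 1 batch + 1 statement = 2
}
```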
dcpClient.Commit() is called once per flush, after all writes in the flush complete. This is much cheaper than committing per-event — at high throughput, commits happen at the flush boundary (controlled by batchSizeLimit and batchTickerDuration) rather than on every DCP event.
All items in a flush are acked after writes complete, so the checkpoint always reflects a consistent written state. On a crash, the entire last flush window is replayed.
Set writeTimestamp to attach a USING TIMESTAMP clause to every write:
- `none` (default): no timestamp clause; Cassandra uses the server-side write time
- `event_time`: uses the DCP event time in microseconds; correct ordering even with concurrent writes
- `now`: uses the wall clock at ingestion time in microseconds; same correctness guarantee as `event_time`
Recommended when using concurrent writes (maxInFlightRequests > 1) to ensure last-write-wins correctness.
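As a rough illustration of how the three modes could translate into CQL, the sketch below appends a `USING TIMESTAMP` suffix in microseconds. `timestampClause` is a hypothetical helper, and inlining the value (rather than binding it) is a simplification.

```go
package main

import (
	"fmt"
	"time"
)

// timestampClause returns the CQL suffix for a writeTimestamp mode.
// Both inputs are microseconds since the Unix epoch.
func timestampClause(mode string, eventMicros, nowMicros int64) (string, error) {
	switch mode {
	case "none":
		return "", nil // Cassandra assigns the server-side write time
	case "event_time":
		return fmt.Sprintf(" USING TIMESTAMP %d", eventMicros), nil
	case "now":
		return fmt.Sprintf(" USING TIMESTAMP %d", nowMicros), nil
	default:
		return "", fmt.Errorf("unknown writeTimestamp mode %q", mode)
	}
}

func main() {
	eventTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	clause, err := timestampClause("event_time", eventTime.UnixMicro(), time.Now().UnixMicro())
	if err != nil {
		panic(err)
	}
	fmt.Println("INSERT INTO example_table (id, data) VALUES (?, ?)" + clause)
}
```

With an explicit timestamp, Cassandra resolves two writes to the same cell by keeping the one with the larger timestamp, so the order in which concurrent writes complete no longer matters.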
```yaml
hosts:
  - http://localhost:8091
bucketName: example_bucket
logging:
  level: info
dcp:
  group:
    name: example_group
checkpoint:
  type: auto     # go-dcp persists checkpoints automatically on an interval
  interval: 10s  # tune based on acceptable replay window on crash
metadata:
  config:
    bucket: metadata
cassandra:
  hosts:
    - localhost:9042
  keyspace: example_keyspace
  timeout: 10s
  batchSizeLimit: 2000
  batchByteSizeLimit: 10485760
  batchTickerDuration: 10s
  maxInFlightRequests: 100
  batchPerEvent: false
  writeTimestamp: event_time
  hostSelectionPolicy: token_aware
  collectionTableMapping:
    - collection: example_collection
      tableName: example_table
      fieldMappings:
        id: "id"                      # Document key → id column
        partition_key: "partitionId"  # JSON field partitionId → partition_key column
        data: "documentData"          # Full JSON document → data column
        metadata: "meta"              # JSON field meta → metadata column
        status: "status"              # JSON field status → status column
```

For the DCP configuration options, check out go-dcp.
| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| `cassandra.hosts` | []string | yes | | Cassandra connection hosts |
| `cassandra.username` | string | yes | | Cassandra username |
| `cassandra.password` | string | yes | | Cassandra password |
| `cassandra.keyspace` | string | yes | | Cassandra keyspace name |
| `cassandra.timeout` | time.Duration | no | 10s | Cassandra query timeout |
| `cassandra.batchSizeLimit` | int | no | 2000 | Flush the buffer when this many items have accumulated |
| `cassandra.batchByteSizeLimit` | int | no | 10485760 | Flush the buffer when its estimated byte size exceeds this limit |
| `cassandra.batchTickerDuration` | time.Duration | no | 10s | Flush the buffer at this interval even if size/byte limits are not reached |
| `cassandra.maxInFlightRequests` | int | no | 100 | Maximum concurrent Cassandra writes during a flush. Set to Cassandra node count × connections per node |
| `cassandra.batchPerEvent` | bool | no | false | Group multiple rows from the same DCP event into a single CQL UNLOGGED BATCH. Useful when the mapper emits multiple rows per event |
| `cassandra.writeTimestamp` | string | no | none | `none`, `event_time` (DCP event time in µs), or `now` (ingestion wall clock in µs). Recommended when `maxInFlightRequests` > 1 |
| `cassandra.hostSelectionPolicy` | string | no | token_aware | `token_aware` (default) or `round_robin` |
| `cassandra.consistency` | string | no | QUORUM | Cassandra consistency level |
| `cassandra.tableName` | string | no | | Target table name (used when no collection mapping is configured) |
| `cassandra.collectionTableMapping` | []CollectionTableMapping | no | | Used by the default mapper. See next section |
When implementing a custom mapper, the couchbase.Event you receive has different payloads depending on the event type:
| Event Type | `event.Key` | `event.Value` | Description |
|---|---|---|---|
| Mutation | Document key | Full document body (JSON) | Document was created or updated |
| Deletion | Document key | Document body (may be present depending on DCP stream config) | Document was deleted |
| Expiration | Document key | Always nil (Couchbase DCP protocol does not include body for expirations) | Document TTL expired |
When writing a custom mapper, rely on the event type to determine the operation rather than checking whether event.Value is nil.
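A sketch of a mapper that branches on the event type. To stay self-contained it uses local stand-in types: `Event` (with assumed `IsMutated`, `IsDeleted` and `IsExpired` flags) stands in for `couchbase.Event`, and `Op` stands in for whatever `cassandra.Model` the real mapper returns. Check the library's actual types before adapting it.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for couchbase.Event; the field names are assumed.
type Event struct {
	Key, Value                      []byte
	IsMutated, IsDeleted, IsExpired bool
}

// Op is a hypothetical stand-in for the model a real mapper would return.
type Op struct {
	Kind string // "upsert" or "delete"
	Key  string
	Body []byte
}

// mapEvent chooses the operation from the event type, never from Value being nil.
func mapEvent(e Event) []Op {
	switch {
	case e.IsMutated:
		return []Op{{Kind: "upsert", Key: string(e.Key), Body: e.Value}}
	case e.IsDeleted, e.IsExpired:
		// Value may be present for deletions and is nil for expirations,
		// so only the key is used.
		return []Op{{Kind: "delete", Key: string(e.Key)}}
	default:
		return nil
	}
}

func main() {
	events := []Event{
		{Key: []byte("doc1"), Value: []byte(`{"a":1}`), IsMutated: true},
		{Key: []byte("doc2"), Value: []byte(`{"a":2}`), IsDeleted: true},
		{Key: []byte("doc3"), IsExpired: true},
	}
	for _, e := range events {
		for _, op := range mapEvent(e) {
			fmt.Println(op.Kind, op.Key)
		}
	}
}
```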
Collection table mapping configuration is optional. This configuration should only be provided if you are using the default mapper. If you are implementing your own custom mapper function, this configuration is not needed.
| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| `cassandra.collectionTableMapping[].collection` | string | yes | | Couchbase collection name |
| `cassandra.collectionTableMapping[].tableName` | string | yes | | Target Cassandra table name |
| `cassandra.collectionTableMapping[].fieldMappings` | map | yes | | Mapping between Cassandra columns and JSON document fields. Key is Cassandra column name, value is source field name. Special values: `_key` for document key, `documentData` for full JSON document |
| `cassandra.collectionTableMapping[].primaryKeyFields` | []string | no | | Cassandra column names that form the primary key. When set, DELETE and expiration operations only include these columns in the WHERE clause, preventing tombstones from null non-PK columns. Each name must exist as a key in `fieldMappings`. |
Given a Couchbase document:
```json
{
  "userId": "user123",
  "name": "John Doe",
  "email": "john@example.com",
  "metadata": {
    "createdAt": "2024-01-01T00:00:00Z",
    "version": 1
  }
}
```

And field mappings:

```yaml
fieldMappings:
  id: "id"                   # Document key → id column
  user_id: "userId"          # JSON field userId → user_id column
  full_name: "name"          # JSON field name → full_name column
  email_address: "email"     # JSON field email → email_address column
  raw_data: "documentData"   # Full JSON document → raw_data column
  meta_info: "metadata"      # JSON field metadata → meta_info column
```

The resulting Cassandra row will have:

- `id`: Document key (e.g., "doc123")
- `user_id`: "user123"
- `full_name`: "John Doe"
- `email_address`: "john@example.com"
- `raw_data`: Full JSON string
- `meta_info`: {"createdAt": "2024-01-01T00:00:00Z", "version": 1}
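The mapping rules can be sketched as a small function over a decoded JSON document. `mapFields` is a hypothetical helper, not the default mapper's code. It uses the `_key` special value from the table above for the document key, and it assumes that source fields missing from the document are skipped.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mapFields builds a column → value row from a document key, its JSON body,
// and a fieldMappings table (Cassandra column → source field).
func mapFields(key string, doc []byte, fieldMappings map[string]string) (map[string]any, error) {
	var fields map[string]any
	if err := json.Unmarshal(doc, &fields); err != nil {
		return nil, err
	}
	row := make(map[string]any, len(fieldMappings))
	for column, source := range fieldMappings {
		switch source {
		case "_key": // special value: the document key
			row[column] = key
		case "documentData": // special value: the full JSON document
			row[column] = string(doc)
		default:
			if v, ok := fields[source]; ok {
				row[column] = v
			}
		}
	}
	return row, nil
}

func main() {
	doc := []byte(`{"userId":"user123","name":"John Doe","email":"john@example.com"}`)
	row, err := mapFields("doc123", doc, map[string]string{
		"id":            "_key",
		"user_id":       "userId",
		"full_name":     "name",
		"email_address": "email",
		"raw_data":      "documentData",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(row["id"], row["user_id"], row["full_name"], row["email_address"])
}
```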
When a document is deleted or expires, the default mapper builds a DELETE FROM table WHERE ... query using all mapped fields. For tables with non-PK columns, this writes null values and creates Cassandra tombstones over time.
Setting primaryKeyFields limits the WHERE clause to only the primary key columns:
```yaml
collectionTableMapping:
  - collection: orders
    tableName: orders_table
    primaryKeyFields: ["order_id", "partition_id"]
    fieldMappings:
      order_id: "_key"
      partition_id: "partitionId"
      status: "status"
      data: "documentData"
```

With this config, a delete event produces `DELETE FROM orders_table WHERE order_id = ? AND partition_id = ?` instead of including `status` and `data` in the query.
Without primaryKeyFields, existing behavior is preserved and all mapped fields are included.
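The effect of `primaryKeyFields` on the generated statement can be sketched as follows. `buildDelete` is a hypothetical helper; sorting the columns in the fallback case is an assumption made here only to get a stable statement, since Go map iteration order is random.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildDelete returns a DELETE statement whose WHERE clause uses primaryKeyFields
// when set, and every mapped column otherwise.
func buildDelete(table string, fieldMappings map[string]string, primaryKeyFields []string) (string, error) {
	columns := primaryKeyFields
	if len(columns) == 0 {
		columns = make([]string, 0, len(fieldMappings))
		for c := range fieldMappings {
			columns = append(columns, c)
		}
		sort.Strings(columns)
	}
	conds := make([]string, len(columns))
	for i, c := range columns {
		// Each primary key name must exist as a key in fieldMappings.
		if _, ok := fieldMappings[c]; !ok {
			return "", fmt.Errorf("column %q is not in fieldMappings", c)
		}
		conds[i] = c + " = ?"
	}
	return "DELETE FROM " + table + " WHERE " + strings.Join(conds, " AND "), nil
}

func main() {
	mappings := map[string]string{
		"order_id":     "_key",
		"partition_id": "partitionId",
		"status":       "status",
		"data":         "documentData",
	}
	withPK, _ := buildDelete("orders_table", mappings, []string{"order_id", "partition_id"})
	withoutPK, _ := buildDelete("orders_table", mappings, nil)
	fmt.Println(withPK)
	fmt.Println(withoutPK)
}
```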
| Metric Name | Description | Labels | Value Type |
|---|---|---|---|
| go_dcp_cassandra_connector_latency_ms_current | Time taken to add an event to the batch. | N/A | Gauge |
| go_dcp_cassandra_connector_bulk_request_process_latency_ms_current | Time taken to process a bulk request. | N/A | Gauge |
You can also use all DCP-related metrics explained here. They are injected automatically, so no extra setup is needed.
- Cassandra write errors: Application panics and does not commit to Couchbase to ensure data consistency
- Cassandra connection errors
- Couchbase connection errors
- Document parsing errors
- Timeout errors
- Network connection issues
Go DCP Cassandra is always open for direct contributions. For more information please check our Contribution Guideline document.
Released under the MIT License.