Skip to content

Commit cde8af1

Browse files
kevin-dpclaudeautofix-ci[bot]
authored
feat(electric-db-collection): DNF/active_conditions support (#1270)
* feat(electric-db-collection): add DNF/active_conditions support for arbitrary boolean WHERE clauses Support the new Electric server wire protocol (PR electric-sql/electric#3791): - Change tag delimiter from `|` to `/`, replace `_` wildcards with empty segments (NON_PARTICIPATING positions) - Add `active_conditions` header support for DNF visibility evaluation - Shapes with subquery dependencies use DNF: a row is visible if ANY disjunct has ALL its positions satisfied in active_conditions - Simple shapes (no subquery dependencies) retain existing behavior: row deleted when tag set becomes empty - Derive disjunct_positions once per shape (not per-row like the Elixir client) since the DNF structure is fixed by the WHERE clause Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * ci: apply automated fixes * chore: add changeset for DNF/active_conditions support Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat(electric-db-collection): add move-in support and rename MoveOutPattern to MovePattern - Rename MoveOutPattern to MovePattern in tag-index.ts and electric.ts - Add isMoveInMessage() type guard - Add processMoveInEvent(): re-activates conditions for matching rows (silent operation, no messages emitted to collection) - Refactor move-out in DNF mode to preserve tag index entries for visible rows, so move-in can find them to re-activate positions - Handle move-in in message loop and progressive mode atomic swap Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test(electric-db-collection): add move-in and DNF coverage tests Add tests ported from the Elixir client's tag_tracker_test.exs: - Move-in activates correct positions - Move-out → move-in → move-out full lifecycle cycle - Deleted rows not resurrected by move-in (tag index cleaned up) - Orphaned tag index entries don't cause phantom deletes - Deleted row cleans up ALL tag index entries - Multiple patterns deactivating same row in one call - Unit tests for parseTag, rowVisible, and deriveDisjunctPositions Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * chore: use preview @electric-sql/client with move-in support Switch to the preview package from electric-sql/electric#4043 so that move-in event types are available without unsafe casts in tests. NOTE: replace the preview URL with the released version before merging. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * ci: apply automated fixes * Update electric-sql/client version * Bump electric-sql/client version --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent c314c36 commit cde8af1

9 files changed

Lines changed: 3505 additions & 1649 deletions

File tree

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@tanstack/electric-db-collection': minor
3+
---
4+
5+
feat: add DNF/active_conditions support for arbitrary boolean WHERE clauses
6+
7+
Support the new Electric server wire protocol (electric-sql/electric#3791). Tags now use `/` delimiter with empty segments for non-participating positions. Shapes with subquery dependencies send `active_conditions` headers and use DNF evaluation for row visibility. Simple shapes without subqueries retain existing empty-tag-set deletion behavior.

packages/electric-db-collection/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
"src"
4747
],
4848
"dependencies": {
49-
"@electric-sql/client": "^1.5.13",
49+
"@electric-sql/client": "^1.5.14",
5050
"@standard-schema/spec": "^1.1.0",
5151
"@tanstack/db": "workspace:*",
5252
"@tanstack/store": "^0.9.2",

packages/electric-db-collection/src/electric.ts

Lines changed: 107 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,21 @@ import {
1616
import { compileSQL } from './sql-compiler'
1717
import {
1818
addTagToIndex,
19+
deriveDisjunctPositions,
1920
findRowsMatchingPattern,
2021
getTagLength,
22+
isMoveInMessage,
2123
isMoveOutMessage,
24+
parseTag as parseTagString,
2225
removeTagFromIndex,
26+
rowVisible,
2327
tagMatchesPattern,
2428
} from './tag-index'
2529
import type { ColumnEncoder } from './sql-compiler'
2630
import type {
27-
MoveOutPattern,
31+
ActiveConditions,
32+
DisjunctPositions,
33+
MovePattern,
2834
MoveTag,
2935
ParsedMoveTag,
3036
RowId,
@@ -981,16 +987,16 @@ function createElectricSync<T extends Row<unknown>>(
981987

982988
const tagCache = new Map<MoveTag, ParsedMoveTag>()
983989

984-
// Parses a tag string into a MoveTag.
990+
// Parses a tag string into a ParsedMoveTag.
985991
// It memoizes the result parsed tag such that future calls
986-
// for the same tag string return the same MoveTag array.
992+
// for the same tag string return the same ParsedMoveTag array.
987993
const parseTag = (tag: MoveTag): ParsedMoveTag => {
988994
const cachedTag = tagCache.get(tag)
989995
if (cachedTag) {
990996
return cachedTag
991997
}
992998

993-
const parsedTag = tag.split(`|`)
999+
const parsedTag = parseTagString(tag)
9941000
tagCache.set(tag, parsedTag)
9951001
return parsedTag
9961002
}
@@ -1000,6 +1006,11 @@ function createElectricSync<T extends Row<unknown>>(
10001006
const tagIndex: TagIndex = []
10011007
let tagLength: number | undefined = undefined
10021008

1009+
// DNF state: active_conditions are per-row, disjunct_positions are global
1010+
// (fixed by the shape's WHERE clause, derived once from the first tagged message).
1011+
const rowActiveConditions = new Map<RowId, ActiveConditions>()
1012+
let disjunctPositions: DisjunctPositions | undefined = undefined
1013+
10031014
/**
10041015
* Initialize the tag index with the correct length
10051016
*/
@@ -1074,6 +1085,7 @@ function createElectricSync<T extends Row<unknown>>(
10741085
tags: Array<MoveTag> | undefined,
10751086
removedTags: Array<MoveTag> | undefined,
10761087
rowId: RowId,
1088+
activeConditions?: ActiveConditions,
10771089
): Set<MoveTag> => {
10781090
// Initialize tag set for this row if it doesn't exist (needed for checking deletion)
10791091
if (!rowTagSets.has(rowId)) {
@@ -1084,13 +1096,24 @@ function createElectricSync<T extends Row<unknown>>(
10841096
// Add new tags
10851097
if (tags) {
10861098
addTagsToRow(tags, rowId, rowTagSet)
1099+
1100+
// Derive disjunct positions once — they are fixed by the shape's WHERE clause.
1101+
if (disjunctPositions === undefined) {
1102+
const parsedTags = tags.map(parseTag)
1103+
disjunctPositions = deriveDisjunctPositions(parsedTags)
1104+
}
10871105
}
10881106

10891107
// Remove tags
10901108
if (removedTags) {
10911109
removeTagsFromRow(removedTags, rowId, rowTagSet)
10921110
}
10931111

1112+
// Store active conditions if provided (overwrite on re-send)
1113+
if (activeConditions && activeConditions.length > 0) {
1114+
rowActiveConditions.set(rowId, [...activeConditions])
1115+
}
1116+
10941117
return rowTagSet
10951118
}
10961119

@@ -1101,6 +1124,8 @@ function createElectricSync<T extends Row<unknown>>(
11011124
rowTagSets.clear()
11021125
tagIndex.length = 0
11031126
tagLength = undefined
1127+
rowActiveConditions.clear()
1128+
disjunctPositions = undefined
11041129
}
11051130

11061131
/**
@@ -1129,22 +1154,45 @@ function createElectricSync<T extends Row<unknown>>(
11291154

11301155
// Remove the row from the tag sets map
11311156
rowTagSets.delete(rowId)
1157+
rowActiveConditions.delete(rowId)
11321158
}
11331159

11341160
/**
11351161
* Remove matching tags from a row based on a pattern
1136-
* Returns true if the row's tag set is now empty
1162+
* Returns true if the row should be deleted (no longer visible)
11371163
*/
11381164
const removeMatchingTagsFromRow = (
11391165
rowId: RowId,
1140-
pattern: MoveOutPattern,
1166+
pattern: MovePattern,
11411167
): boolean => {
11421168
const rowTagSet = rowTagSets.get(rowId)
11431169
if (!rowTagSet) {
11441170
return false
11451171
}
11461172

1147-
// Find tags that match this pattern and remove them
1173+
// DNF mode: check visibility using active conditions.
1174+
// Tag index entries are preserved so that move-in can re-activate positions.
1175+
const activeConditions = rowActiveConditions.get(rowId)
1176+
if (activeConditions && disjunctPositions) {
1177+
// Set the condition at this pattern's position to false
1178+
activeConditions[pattern.pos] = false
1179+
1180+
if (!rowVisible(activeConditions, disjunctPositions)) {
1181+
// Row is no longer visible — clean up all state including tag index
1182+
for (const tag of rowTagSet) {
1183+
const parsedTag = parseTag(tag)
1184+
removeTagFromIndex(parsedTag, rowId, tagIndex, tagLength!)
1185+
tagCache.delete(tag)
1186+
}
1187+
rowTagSets.delete(rowId)
1188+
rowActiveConditions.delete(rowId)
1189+
return true
1190+
}
1191+
return false
1192+
}
1193+
1194+
// Simple shape (no subquery dependencies — server sends no active_conditions):
1195+
// Remove matching tags and delete if tag set is empty
11481196
for (const tag of rowTagSet) {
11491197
const parsedTag = parseTag(tag)
11501198
if (tagMatchesPattern(parsedTag, pattern)) {
@@ -1153,7 +1201,6 @@ function createElectricSync<T extends Row<unknown>>(
11531201
}
11541202
}
11551203

1156-
// Check if row's tag set is now empty
11571204
if (rowTagSet.size === 0) {
11581205
rowTagSets.delete(rowId)
11591206
return true
@@ -1166,7 +1213,7 @@ function createElectricSync<T extends Row<unknown>>(
11661213
* Process move-out event: remove matching tags from rows and delete rows with empty tag sets
11671214
*/
11681215
const processMoveOutEvent = (
1169-
patterns: Array<MoveOutPattern>,
1216+
patterns: Array<MovePattern>,
11701217
begin: () => void,
11711218
write: (message: ChangeMessageOrDeleteKeyMessage<T>) => void,
11721219
transactionStarted: boolean,
@@ -1204,6 +1251,30 @@ function createElectricSync<T extends Row<unknown>>(
12041251
return txStarted
12051252
}
12061253

1254+
/**
1255+
* Process move-in event: re-activate conditions for rows matching the patterns.
1256+
* This is a silent operation — no messages are emitted to the collection.
1257+
*/
1258+
const processMoveInEvent = (patterns: Array<MovePattern>): void => {
1259+
if (tagLength === undefined) {
1260+
debug(
1261+
`${collectionId ? `[${collectionId}] ` : ``}Received move-in message but no tag length set yet, ignoring`,
1262+
)
1263+
return
1264+
}
1265+
1266+
for (const pattern of patterns) {
1267+
const affectedRowIds = findRowsMatchingPattern(pattern, tagIndex)
1268+
1269+
for (const rowId of affectedRowIds) {
1270+
const activeConditions = rowActiveConditions.get(rowId)
1271+
if (activeConditions) {
1272+
activeConditions[pattern.pos] = true
1273+
}
1274+
}
1275+
}
1276+
}
1277+
12071278
/**
12081279
* Get the sync metadata for insert operations
12091280
* @returns Record containing relation information
@@ -1433,6 +1504,11 @@ function createElectricSync<T extends Row<unknown>>(
14331504
const removedTags = changeMessage.headers.removed_tags
14341505
const hasTags = tags || removedTags
14351506

1507+
// Extract active_conditions from headers (DNF support)
1508+
const activeConditions = changeMessage.headers.active_conditions as
1509+
| ActiveConditions
1510+
| undefined
1511+
14361512
const rowId = collection.getKeyFromItem(changeMessage.value)
14371513
const operation = changeMessage.headers.operation
14381514

@@ -1453,7 +1529,12 @@ function createElectricSync<T extends Row<unknown>>(
14531529
if (isDelete) {
14541530
clearTagsForRow(rowId)
14551531
} else if (hasTags) {
1456-
processTagsForChangeMessage(tags, removedTags, rowId)
1532+
processTagsForChangeMessage(
1533+
tags,
1534+
removedTags,
1535+
rowId,
1536+
activeConditions,
1537+
)
14571538
}
14581539

14591540
write({
@@ -1496,7 +1577,11 @@ function createElectricSync<T extends Row<unknown>>(
14961577

14971578
for (const message of messages) {
14981579
// Add message to current batch buffer (for race condition handling)
1499-
if (isChangeMessage(message) || isMoveOutMessage(message)) {
1580+
if (
1581+
isChangeMessage(message) ||
1582+
isMoveOutMessage(message) ||
1583+
isMoveInMessage(message)
1584+
) {
15001585
currentBatchMessages.setState((currentBuffer) => {
15011586
const newBuffer = [...currentBuffer, message]
15021587
// Limit buffer size for safety
@@ -1593,6 +1678,14 @@ function createElectricSync<T extends Row<unknown>>(
15931678
transactionStarted,
15941679
)
15951680
}
1681+
} else if (isMoveInMessage(message)) {
1682+
// Handle move-in event: re-activate conditions for matching rows.
1683+
// Buffer if buffering, otherwise process immediately.
1684+
if (isBufferingInitialSync() && !transactionStarted) {
1685+
bufferedMessages.push(message)
1686+
} else {
1687+
processMoveInEvent(message.headers.patterns)
1688+
}
15961689
} else if (isMustRefetchMessage(message)) {
15971690
debug(
15981691
`${collectionId ? `[${collectionId}] ` : ``}Received must-refetch message, starting transaction with truncate`,
@@ -1672,6 +1765,9 @@ function createElectricSync<T extends Row<unknown>>(
16721765
write,
16731766
transactionStarted,
16741767
)
1768+
} else if (isMoveInMessage(bufferedMsg)) {
1769+
// Process buffered move-in messages during atomic swap
1770+
processMoveInEvent(bufferedMsg.headers.patterns)
16751771
}
16761772
}
16771773

0 commit comments

Comments
 (0)