Skip to content

Commit 8fd2975

Browse files
committed
Review fixes
1 parent b84223c commit 8fd2975

8 files changed

Lines changed: 55 additions & 15 deletions

File tree

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMap.kt

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ sealed interface FlowMapStreamEvent<out K : Any, out V : Any> {
9595
/** Emitted for subsequent updates, containing only the delta ([event]). */
9696
@JvmInline
9797
value class EventUpdate<K : Any, V : Any>(val event: EntryEvent<K, V>) : FlowMapStreamEvent<K, V>
98+
99+
/** Emitted when the map is cleared. */
100+
object Cleared : FlowMapStreamEvent<Nothing, Nothing> {
101+
override fun toString(): String = "Cleared"
102+
}
98103
}
99104

100105
interface MapFlow<K : Any, V : Any> {
@@ -150,6 +155,11 @@ fun <K : Any, V : Any> Flow<FlowMapStreamEvent<K, V>>.runningFoldToMap(): Flow<M
150155
emit(map!!)
151156
}
152157

158+
is FlowMapStreamEvent.Cleared -> {
159+
map = persistentMapOf()
160+
emit(map!!)
161+
}
162+
153163
is FlowMapStreamEvent.EventUpdate -> {
154164
val currentMap = map ?: error("InitialState must be received before EventUpdate")
155165
when (val mapEvent = streamEvent.event) {
@@ -180,6 +190,7 @@ private class FlowMapImpl<K : Any, V : Any>(initialMap: PersistentMap<K, V>) :
180190
private data class FlowMapEvent<K : Any, V : Any>(
181191
val state: State<K, V>,
182192
val events: List<EntryEvent<K, V>>,
193+
val isClear: Boolean = false,
183194
)
184195

185196
private val state = MutableStateFlow(State(0L, initialMap))
@@ -281,9 +292,13 @@ private class FlowMapImpl<K : Any, V : Any>(initialMap: PersistentMap<K, V>) :
281292
emit(FlowMapStreamEvent.InitialState(flowMapEvent.state.map))
282293
first = false
283294
} else {
284-
val events = flowMapEvent.events
285-
for (event in events) {
286-
emit(FlowMapStreamEvent.EventUpdate(event))
295+
if (flowMapEvent.isClear) {
296+
emit(FlowMapStreamEvent.Cleared)
297+
} else {
298+
val events = flowMapEvent.events
299+
for (event in events) {
300+
emit(FlowMapStreamEvent.EventUpdate(event))
301+
}
287302
}
288303
}
289304
}
@@ -334,14 +349,20 @@ private class FlowMapImpl<K : Any, V : Any>(initialMap: PersistentMap<K, V>) :
334349
override fun containsKey(key: K): Boolean = state.value.map.containsKey(key)
335350

336351
override fun putAll(from: Map<out K, V>) {
337-
val (prev, next) = state.updateAndGetPrevAndNext { State(it.version + 1, it.map.putAll(from)) }
338-
339-
val events =
340-
from.mapNotNull { (key, newValue) ->
341-
val oldValue = prev.map[key]
342-
if (newValue != oldValue) Upsert(key, oldValue, newValue) else null
352+
val (prev, next) =
353+
state.updateAndGetPrevAndNext {
354+
val nextMap = it.map.putAll(from)
355+
if (nextMap === it.map) it else State(it.version + 1, nextMap)
343356
}
344-
if (events.isNotEmpty()) signal.tryEmit(FlowMapEvent(next, events))
357+
358+
if (prev != next) {
359+
val events =
360+
from.mapNotNull { (key, newValue) ->
361+
val oldValue = prev.map[key]
362+
if (newValue != oldValue) Upsert(key, oldValue, newValue) else null
363+
}
364+
if (events.isNotEmpty()) signal.tryEmit(FlowMapEvent(next, events))
365+
}
345366
}
346367

347368
override fun clear() {
@@ -350,7 +371,9 @@ private class FlowMapImpl<K : Any, V : Any>(initialMap: PersistentMap<K, V>) :
350371
if (it.map.isEmpty()) it else State(it.version + 1, it.map.clear())
351372
}
352373
if (prev.map.isNotEmpty())
353-
signal.tryEmit(FlowMapEvent(next, prev.map.map { Removed(it.key, it.value) }))
374+
signal.tryEmit(
375+
FlowMapEvent(next, prev.map.map { Removed(it.key, it.value) }, isClear = true)
376+
)
354377
}
355378
}
356379

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,15 +159,19 @@ fun <V : Any, R> Flow<SetEvent<V>>.flatMapLatestAndMerge(
159159
val jobs = ConcurrentHashMap<V, Job>()
160160
collect { setEvent ->
161161
when (setEvent) {
162-
is EntryEvent<V> -> {
162+
is Insert<V> -> {
163163
jobs[setEvent.value]?.cancelAndJoin()
164164
jobs[setEvent.value] =
165165
entryEventTransformer(setEvent)
166166
.onEach { send(it) }
167-
.onCompletion { throwable -> if (throwable == null) jobs.remove(setEvent.value) }
167+
.onCompletion { jobs.remove(setEvent.value) }
168168
.launchIn(this@channelFlow)
169169
}
170170

171+
is Removed<V> -> {
172+
jobs.remove(setEvent.value)?.cancelAndJoin()
173+
}
174+
171175
is Populated -> {}
172176
}
173177
}

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SimpleMapEvent.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ fun <K : Any, V : Any> Flow<SimpleMapEvent<K, V>>.runningFoldToMap(
113113
var populated = false
114114
var map = persistentMapOf<K, V>()
115115

116+
if (emitPartials) emit(map)
117+
116118
collect { mapEvent ->
117119
var emit = false
118120
when (mapEvent) {
@@ -130,8 +132,9 @@ fun <K : Any, V : Any> Flow<SimpleMapEvent<K, V>>.runningFoldToMap(
130132
}
131133

132134
is Populated -> {
135+
if (populated) error("Populated event already received")
133136
populated = true
134-
if (!emitted || !emitPartials) emit = true
137+
if (!emitted && !emitPartials) emit = true
135138
}
136139
}
137140
if (emit) {

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/Timeout.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ fun <T, R> Flow<T>.timeoutFirstOrDefault(millis: Long, default: () -> R): Flow<R
2121
val receiveChannel = produce { collect { send(it) } }
2222

2323
select {
24-
receiveChannel.onReceive { result -> send(result as R) }
24+
receiveChannel.onReceiveCatching { result -> result.getOrNull()?.let { send(it as R) } }
2525
onTimeout(millis) { send(default()) }
2626
}
2727
receiveChannel.consumeEach { send(it as R) }

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/FlowMapStreamEventSerializer.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ internal class FlowMapStreamEventSerializer(fory: Fory, type: Class<FlowMapStrea
1212
private enum class Type {
1313
INITIAL_STATE,
1414
EVENT_UPDATE,
15+
CLEARED,
1516
}
1617

1718
override fun write(buffer: MemoryBuffer, value: FlowMapStreamEvent<*, *>) {
@@ -24,6 +25,9 @@ internal class FlowMapStreamEventSerializer(fory: Fory, type: Class<FlowMapStrea
2425
buffer.writeByte(Type.EVENT_UPDATE.ordinal.toByte())
2526
fory.writeRef(buffer, value.event)
2627
}
28+
is FlowMapStreamEvent.Cleared -> {
29+
buffer.writeByte(Type.CLEARED.ordinal.toByte())
30+
}
2731
}
2832
}
2933

@@ -37,6 +41,7 @@ internal class FlowMapStreamEventSerializer(fory: Fory, type: Class<FlowMapStrea
3741
val event = fory.readRef(buffer) as MapEvent.EntryEvent<Any, Any>
3842
FlowMapStreamEvent.EventUpdate(event)
3943
}
44+
Type.CLEARED -> FlowMapStreamEvent.Cleared
4045
}
4146
}
4247
}

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson/FlowMapStreamEventDeserializer.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ internal class FlowMapStreamEventDeserializer :
3131
as MapEvent.EntryEvent<Any, Any>
3232
FlowMapStreamEvent.EventUpdate(event)
3333
}
34+
"cleared" -> FlowMapStreamEvent.Cleared
3435
else -> throw JsonMappingException.from(p, "Unknown type: $type")
3536
}
3637
}

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/jackson/FlowMapStreamEventSerializer.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ internal class FlowMapStreamEventSerializer :
2424
gen.writeFieldName("event")
2525
provider.defaultSerializeValue(value.event, gen)
2626
}
27+
is FlowMapStreamEvent.Cleared -> {
28+
gen.writeStringField("type", "cleared")
29+
}
2730
}
2831
gen.writeEndObject()
2932
}

util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/SimpleMapEventKtTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class SimpleMapEventKtTest :
4242
)
4343
.runningFoldToMap(emitPartials = true)
4444
.test {
45+
awaitItem() shouldContainExactly emptyMap()
4546
awaitItem() shouldContainExactly mapOf("K" to "v1")
4647
awaitItem() shouldContainExactly mapOf("K" to "v2")
4748
awaitItem() shouldContainExactly mapOf("K" to "v2", "K2" to "v3")

0 commit comments

Comments
 (0)