Skip to content

Commit 491111d

Browse files
committed
More performance improvements to FlowMap
1 parent 2a77484 commit 491111d

1 file changed

Lines changed: 54 additions & 32 deletions

File tree

  • util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMap.kt

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import kotlinx.coroutines.flow.MutableSharedFlow
1616
import kotlinx.coroutines.flow.MutableStateFlow
1717
import kotlinx.coroutines.flow.conflate
1818
import kotlinx.coroutines.flow.distinctUntilChanged
19-
import kotlinx.coroutines.flow.filterNotNull
2019
import kotlinx.coroutines.flow.flow
2120
import kotlinx.coroutines.flow.launchIn
2221
import kotlinx.coroutines.flow.map
@@ -133,11 +132,13 @@ private class FlowMapImpl<K : Any, V : Any>(initialMap: PersistentMap<K, V>) :
133132
version = it.state.version
134133
emit(it)
135134
if (held != null) {
136-
var i = 1
137-
do {
138-
val next = held?.remove(expectedVersion + i++)
139-
if (next != null) emit(next)
140-
} while (next != null && held?.isNotEmpty() == true)
135+
var nextVersion = version + 1
136+
while (held?.containsKey(nextVersion) == true) {
137+
val next = held!!.remove(nextVersion)!!
138+
emit(next)
139+
version = next.state.version
140+
nextVersion = version + 1
141+
}
141142
if (held?.isEmpty() == true) held = null
142143
}
143144
} else if (it.state.version > expectedVersion) {
@@ -152,36 +153,57 @@ private class FlowMapImpl<K : Any, V : Any>(initialMap: PersistentMap<K, V>) :
152153
override fun asFlow(predicate: ((K, V) -> Boolean)?): Flow<MapEvent<K, V>> = flow {
153154
val emittedKeys = if (predicate != null) mutableSetOf<K>() else null
154155

155-
suspend fun processEvents(mapEvents: List<MapEvent<K, V>>) {
156-
mapEvents.forEach { mapEvent ->
157-
if (emittedKeys == null || predicate == null) emit(mapEvent)
158-
else
159-
when (mapEvent) {
160-
is Removed -> if (emittedKeys.remove(mapEvent.key)) emit(mapEvent)
161-
is Upsert ->
162-
if (predicate(mapEvent.key, mapEvent.newValue)) {
163-
val newValue =
164-
if (emittedKeys.add(mapEvent.key))
165-
Upsert(mapEvent.key, null, mapEvent.newValue)
166-
else mapEvent
167-
emit(newValue)
168-
} else if (emittedKeys.remove(mapEvent.key))
169-
emit(Removed(mapEvent.key, mapEvent.oldValue!!))
170-
171-
else -> {}
172-
}
173-
}
174-
}
175-
176156
var first = true
177-
orderedSignal.filterNotNull().collect { flowMapEvent ->
157+
orderedSignal.collect { flowMapEvent ->
178158
if (first) {
179-
flowMapEvent.state.map.entries
180-
.map { Upsert(it.key, null, it.value) }
181-
.let { processEvents(it) }
159+
val map = flowMapEvent.state.map
160+
if (predicate == null) {
161+
for (entry in map.entries) {
162+
emit(Upsert(entry.key, null, entry.value))
163+
}
164+
} else {
165+
for (entry in map.entries) {
166+
val key = entry.key
167+
val value = entry.value
168+
if (predicate(key, value)) {
169+
emittedKeys!!.add(key)
170+
emit(Upsert(key, null, value))
171+
}
172+
}
173+
}
182174
emit(Populated)
183175
first = false
184-
} else processEvents(flowMapEvent.events)
176+
} else {
177+
val events = flowMapEvent.events
178+
if (predicate == null) {
179+
for (event in events) {
180+
emit(event)
181+
}
182+
} else {
183+
for (event in events) {
184+
when (event) {
185+
is Removed -> {
186+
if (emittedKeys!!.remove(event.key)) emit(event)
187+
}
188+
189+
is Upsert -> {
190+
val key = event.key
191+
val newValue = event.newValue
192+
if (predicate(key, newValue)) {
193+
val wasEmitted = !emittedKeys!!.add(key)
194+
if (wasEmitted) {
195+
emit(event)
196+
} else {
197+
emit(Upsert(key, null, newValue))
198+
}
199+
} else if (emittedKeys!!.remove(key)) {
200+
emit(Removed(key, event.oldValue!!))
201+
}
202+
}
203+
}
204+
}
205+
}
206+
}
185207
}
186208
}
187209

0 commit comments

Comments
 (0)