Skip to content

Commit 2a77484

Browse files
committed
More improvements
1 parent 7f59f07 commit 2a77484

14 files changed

Lines changed: 522 additions & 161 deletions

File tree

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/Buffer.kt

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@ import kotlinx.coroutines.flow.channelFlow
1111
import kotlinx.coroutines.flow.produceIn
1212
import kotlinx.coroutines.selects.onTimeout
1313
import kotlinx.coroutines.selects.whileSelect
14+
import kotlinx.coroutines.yield
1415

1516
/**
1617
* Buffers all elements emitted until there is a period of no emissions greater than
1718
* [timeoutMillis], then emits all buffered elements within a [List].
1819
*
1920
* If the upstream [Flow] completes, any remaining elements are emitted immediately.
2021
*/
21-
fun <T : Any?> Flow<T>.bufferingDebounce(timeoutMillis: Long): Flow<List<T>> = channelFlow {
22+
fun <T> Flow<T>.bufferingDebounce(timeoutMillis: Long): Flow<List<T>> = channelFlow {
2223
val itemChannel = produceIn(this)
2324
var bufferedItems = mutableListOf<T>()
2425
whileSelect {
@@ -31,7 +32,14 @@ fun <T : Any?> Flow<T>.bufferingDebounce(timeoutMillis: Long): Flow<List<T>> = c
3132
itemChannel.onReceiveCatching { result ->
3233
result
3334
.onSuccess { item -> bufferedItems += item }
34-
.onFailure { if (bufferedItems.isNotEmpty()) send(bufferedItems) }
35+
.onFailure {
36+
if (bufferedItems.isNotEmpty()) {
37+
send(bufferedItems)
38+
bufferedItems = mutableListOf()
39+
yield()
40+
}
41+
it?.let { throw it }
42+
}
3543
.isSuccess
3644
}
3745
}
@@ -43,5 +51,5 @@ fun <T : Any?> Flow<T>.bufferingDebounce(timeoutMillis: Long): Flow<List<T>> = c
4351
*
4452
* If the upstream [Flow] completes, any remaining elements are emitted immediately.
4553
*/
46-
fun <T : Any?> Flow<T>.bufferingDebounce(timeout: Duration): Flow<List<T>> =
54+
fun <T> Flow<T>.bufferingDebounce(timeout: Duration): Flow<List<T>> =
4755
bufferingDebounce(timeout.toMillis())

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/Demultiplex.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ fun <T, K, R> Flow<T>.demultiplexBy(
4545
keyResponseChannels[key] = keyResponseChannel
4646
flow { flowProducer(key, keyResponseChannel.consumeAsFlow()) }
4747
.onEach { response -> send(response) }
48-
.onCompletion { completedKeysChannel.send(key) }
48+
.onCompletion { completedKeysChannel.trySend(key) }
4949
.launchIn(this@callbackFlow)
5050
}
5151
}

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/MapEvent.kt

Lines changed: 48 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import com.caplin.integration.datasourcex.util.serializable
1010
import java.io.Serializable
1111
import kotlinx.collections.immutable.persistentMapOf
1212
import kotlinx.coroutines.ExperimentalCoroutinesApi
13+
import kotlinx.coroutines.channels.onFailure
14+
import kotlinx.coroutines.channels.onSuccess
1315
import kotlinx.coroutines.channels.produce
1416
import kotlinx.coroutines.flow.Flow
1517
import kotlinx.coroutines.flow.channelFlow
1618
import kotlinx.coroutines.flow.flow
17-
import kotlinx.coroutines.selects.whileSelect
19+
import kotlinx.coroutines.selects.select
1820

1921
/**
2022
* Events representing a mutation to a [Map].
@@ -209,45 +211,57 @@ fun <K : Any, V : Any> Flow<MapEvent<K, V>>.conflateKeys() = channelFlow {
209211
fun nextValueToSend(): MapEvent<K, V>? =
210212
unsentValues.entries.firstOrNull()?.value ?: Populated.takeIf { unsentPopulated }
211213

212-
whileSelect {
213-
nextValueToSend()?.let { value ->
214-
onSend(value) {
215-
when (value) {
216-
is EntryEvent -> unsentValues.remove(value.key)
217-
is Populated -> unsentPopulated = false
214+
var upstreamClosed = false
215+
while (true) {
216+
val value = nextValueToSend()
217+
if (upstreamClosed && value == null) break
218+
219+
select<Unit> {
220+
if (value != null) {
221+
onSend(value) {
222+
when (value) {
223+
is EntryEvent<K, V> -> unsentValues.remove(value.key)
224+
is Populated -> unsentPopulated = false
225+
}
218226
}
219-
true
220227
}
221-
}
222-
upstream.onReceive { event ->
223-
when (event) {
224-
is Populated -> unsentPopulated = true
225-
is EntryEvent -> {
226-
val key = event.key
227-
val oldEvent = unsentValues[key]
228-
if (oldEvent == null) {
229-
unsentValues[key] = event // Nothing to conflate
230-
} else {
231-
val oldValue = oldEvent.oldValue
232-
when (event) {
233-
is Removed -> {
234-
when (oldEvent) {
235-
is Removed -> error("Two Removed events for the same key")
236-
is Upsert -> if (oldValue != null) unsentValues[key] = Removed(key, oldValue)
228+
if (!upstreamClosed) {
229+
upstream.onReceiveCatching { result ->
230+
result
231+
.onSuccess { event ->
232+
when (event) {
233+
is Populated -> unsentPopulated = true
234+
is EntryEvent -> {
235+
val key = event.key
236+
val oldEvent = unsentValues[key]
237+
if (oldEvent == null) {
238+
unsentValues[key] = event // Nothing to conflate
239+
} else {
240+
val oldValue = oldEvent.oldValue
241+
when (event) {
242+
is Removed -> {
243+
when (oldEvent) {
244+
is Removed -> error("Two Removed events for the same key")
245+
is Upsert ->
246+
if (oldValue != null) unsentValues[key] = Removed(key, oldValue)
247+
else unsentValues.remove(key)
248+
}
249+
}
250+
251+
is Upsert -> {
252+
when (oldEvent) {
253+
is Removed -> unsentValues[key] = Upsert(key, null, event.newValue)
254+
is Upsert -> unsentValues[key] = Upsert(key, oldValue, event.newValue)
255+
}
256+
}
257+
}
258+
}
259+
}
237260
}
238261
}
239-
240-
is Upsert -> {
241-
when (oldEvent) {
242-
is Removed -> unsentValues[key] = Upsert(key, null, event.newValue)
243-
is Upsert -> unsentValues[key] = Upsert(key, oldValue, event.newValue)
244-
}
245-
}
246-
}
247-
}
262+
.onFailure { upstreamClosed = true }
248263
}
249264
}
250-
true
251265
}
252266
}
253267
}

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/SetEvent.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,14 @@ fun <V : Any> Flow<SetEvent<V>>.runningFoldToSet(
128128
is Removed -> {
129129
set = oldSet.remove(setEvent.value)
130130
val changed = oldSet !== set
131-
if (relaxed && !changed) error("Received $setEvent but this did not exist")
131+
if (!relaxed && !changed) error("Received $setEvent but this did not exist")
132132
if (changed && (populated || emitPartials)) emit = true
133133
}
134134

135135
is Insert -> {
136136
set = oldSet.add(setEvent.value)
137137
val changed = oldSet !== set
138-
if (relaxed && !changed) error("Received $setEvent but this already existed")
138+
if (!relaxed && !changed) error("Received $setEvent but this already existed")
139139
if (changed && (populated || emitPartials)) emit = true
140140
}
141141

util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/BufferKtTest.kt

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@ package com.caplin.integration.datasourcex.util.flow
55
import app.cash.turbine.test
66
import io.kotest.core.spec.style.FunSpec
77
import io.kotest.matchers.equals.shouldBeEqual
8+
import io.kotest.matchers.types.shouldBeInstanceOf
9+
import java.time.Duration
810
import kotlinx.coroutines.ExperimentalCoroutinesApi
911
import kotlinx.coroutines.channels.Channel
1012
import kotlinx.coroutines.delay
1113
import kotlinx.coroutines.flow.consumeAsFlow
14+
import kotlinx.coroutines.flow.receiveAsFlow
1215

1316
class BufferKtTest :
1417
FunSpec({
15-
test("Buffered debounce") {
18+
test("Buffered debounce with millis") {
1619
val channel = Channel<String>(Channel.BUFFERED)
1720
channel.consumeAsFlow().bufferingDebounce(10).test {
1821
delay(1)
@@ -34,4 +37,38 @@ class BufferKtTest :
3437
awaitComplete()
3538
}
3639
}
40+
41+
test("Buffered debounce with Duration") {
42+
val channel = Channel<String>(Channel.BUFFERED)
43+
channel.consumeAsFlow().bufferingDebounce(Duration.ofMillis(10)).test {
44+
delay(1)
45+
channel.send("A")
46+
delay(11)
47+
awaitItem() shouldBeEqual listOf("A")
48+
channel.close()
49+
awaitComplete()
50+
}
51+
}
52+
53+
test("Buffered debounce upstream error propagation") {
54+
val channel = Channel<String>(Channel.BUFFERED)
55+
channel.receiveAsFlow().bufferingDebounce(10).test {
56+
channel.send("A")
57+
delay(50) // Ensure "A" is processed into bufferedItems
58+
channel.close(IllegalArgumentException("test error"))
59+
awaitItem() shouldBeEqual listOf("A")
60+
awaitError().shouldBeInstanceOf<IllegalArgumentException>()
61+
}
62+
}
63+
64+
test("Buffered debounce immediate emit on completion") {
65+
val channel = Channel<String>(Channel.BUFFERED)
66+
channel.consumeAsFlow().bufferingDebounce(1000).test {
67+
channel.send("A")
68+
channel.send("B")
69+
channel.close()
70+
awaitItem() shouldBeEqual listOf("A", "B")
71+
awaitComplete()
72+
}
73+
}
3774
})
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package com.caplin.integration.datasourcex.util.flow
2+
3+
import app.cash.turbine.test
4+
import io.kotest.assertions.throwables.shouldThrow
5+
import io.kotest.core.spec.style.FunSpec
6+
import io.kotest.matchers.equals.shouldBeEqual
7+
import kotlinx.coroutines.flow.flowOf
8+
9+
class CastKtTest :
10+
FunSpec({
11+
test("cast successfully casts items") {
12+
val flow = flowOf(1, 2, 3)
13+
flow.cast<Number>().test {
14+
awaitItem() shouldBeEqual 1
15+
awaitItem() shouldBeEqual 2
16+
awaitItem() shouldBeEqual 3
17+
awaitComplete()
18+
}
19+
}
20+
21+
test("cast throws ClassCastException on failure when accessed") {
22+
val flow = flowOf(1, "string")
23+
val castFlow = flow.cast<Int>()
24+
shouldThrow<ClassCastException> {
25+
castFlow.collect {
26+
// The cast happens here because 'it' is typed as Int
27+
@Suppress("UNUSED_VARIABLE") val i: Int = it
28+
}
29+
}
30+
}
31+
})
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.caplin.integration.datasourcex.util.flow
2+
3+
import io.kotest.core.spec.style.FunSpec
4+
import io.kotest.matchers.nulls.shouldNotBeNull
5+
6+
class CommonKtTest :
7+
FunSpec({ test("UNSET is a valid non-null instance") { UNSET.shouldNotBeNull() } })

0 commit comments

Comments
 (0)