Skip to content

Commit 5bff7a7

Browse files
committed
Add serializers
1 parent 6dcdb52 commit 5bff7a7

43 files changed

Lines changed: 1408 additions & 296 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitattributes

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
*.sh text eol=lf
44
*.conf text eol=lf
55
gradlew text eol=lf
6+
*.api text eol=lf
67

78
# These files are text and should be normalized (Convert crlf <=> lf)
89
*.kt text

util/api/datasourcex-util.api

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,6 @@ public final class com/caplin/integration/datasourcex/util/ReadWriteLock {
7777
public final fun writeUnlock ()V
7878
}
7979

80-
public final class com/caplin/integration/datasourcex/util/SerializablePersistentMapKt {
81-
public static final fun serializable (Lkotlinx/collections/immutable/PersistentMap;)Lkotlinx/collections/immutable/PersistentMap;
82-
}
83-
84-
public final class com/caplin/integration/datasourcex/util/SerializablePersistentSetKt {
85-
public static final fun serializable (Lkotlinx/collections/immutable/PersistentSet;)Lkotlinx/collections/immutable/PersistentSet;
86-
}
87-
8880
public abstract interface class com/caplin/integration/datasourcex/util/SimpleDataSourceConfig {
8981
public abstract fun getExtraConfig ()Ljava/lang/String;
9082
public abstract fun getLocalLabel ()Ljava/lang/String;
@@ -190,16 +182,48 @@ public abstract interface class com/caplin/integration/datasourcex/util/flow/Flo
190182
public final class com/caplin/integration/datasourcex/util/flow/FlowMapKt {
191183
public static final fun mutableFlowMapOf ()Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap;
192184
public static final fun mutableFlowMapOf ([Lkotlin/Pair;)Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap;
185+
public static final fun runningFoldToMapFlowMapStreamEvent (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
193186
public static final fun simpleToFlowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
194187
public static final fun toFlowMapIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
195188
public static final fun toMutableFlowMap (Ljava/util/Map;)Lcom/caplin/integration/datasourcex/util/flow/MutableFlowMap;
196189
}
197190

191+
public abstract interface class com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent {
192+
}
193+
194+
public final class com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent$EventUpdate : com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent {
195+
public static final synthetic fun box-impl (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;)Lcom/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent$EventUpdate;
196+
public static fun constructor-impl (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;)Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;
197+
public fun equals (Ljava/lang/Object;)Z
198+
public static fun equals-impl (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;Ljava/lang/Object;)Z
199+
public static final fun equals-impl0 (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;)Z
200+
public final fun getEvent ()Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;
201+
public fun hashCode ()I
202+
public static fun hashCode-impl (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;)I
203+
public fun toString ()Ljava/lang/String;
204+
public static fun toString-impl (Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;)Ljava/lang/String;
205+
public final synthetic fun unbox-impl ()Lcom/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent;
206+
}
207+
208+
public final class com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent$InitialState : com/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent {
209+
public static final synthetic fun box-impl (Ljava/util/Map;)Lcom/caplin/integration/datasourcex/util/flow/FlowMapStreamEvent$InitialState;
210+
public static fun constructor-impl (Ljava/util/Map;)Ljava/util/Map;
211+
public fun equals (Ljava/lang/Object;)Z
212+
public static fun equals-impl (Ljava/util/Map;Ljava/lang/Object;)Z
213+
public static final fun equals-impl0 (Ljava/util/Map;Ljava/util/Map;)Z
214+
public final fun getMap ()Ljava/util/Map;
215+
public fun hashCode ()I
216+
public static fun hashCode-impl (Ljava/util/Map;)I
217+
public fun toString ()Ljava/lang/String;
218+
public static fun toString-impl (Ljava/util/Map;)Ljava/lang/String;
219+
public final synthetic fun unbox-impl ()Ljava/util/Map;
220+
}
221+
198222
public abstract interface class com/caplin/integration/datasourcex/util/flow/LoadingCompletingSharedFlowCache {
199223
public abstract fun get (Ljava/lang/Object;)Lcom/caplin/integration/datasourcex/util/flow/CompletingSharedFlow;
200224
}
201225

202-
public abstract interface class com/caplin/integration/datasourcex/util/flow/MapEvent : java/io/Serializable {
226+
public abstract interface class com/caplin/integration/datasourcex/util/flow/MapEvent {
203227
}
204228

205229
public abstract interface class com/caplin/integration/datasourcex/util/flow/MapEvent$EntryEvent : com/caplin/integration/datasourcex/util/flow/MapEvent {
@@ -257,6 +281,7 @@ public final class com/caplin/integration/datasourcex/util/flow/MapEventKt {
257281

258282
public abstract interface class com/caplin/integration/datasourcex/util/flow/MapFlow {
259283
public abstract fun asFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
284+
public abstract fun asFlowWithState ()Lkotlinx/coroutines/flow/Flow;
260285
public abstract fun valueFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
261286
}
262287

@@ -294,7 +319,7 @@ public final class com/caplin/integration/datasourcex/util/flow/RetryKt {
294319
public static synthetic fun retryWithExponentialBackoff$default (Lkotlinx/coroutines/flow/Flow;JJLkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
295320
}
296321

297-
public abstract interface class com/caplin/integration/datasourcex/util/flow/SetEvent : java/io/Serializable {
322+
public abstract interface class com/caplin/integration/datasourcex/util/flow/SetEvent {
298323
}
299324

300325
public abstract interface class com/caplin/integration/datasourcex/util/flow/SetEvent$EntryEvent : com/caplin/integration/datasourcex/util/flow/SetEvent {
@@ -337,7 +362,7 @@ public final class com/caplin/integration/datasourcex/util/flow/SetEventKt {
337362
public static final fun toEvents (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
338363
}
339364

340-
public abstract interface class com/caplin/integration/datasourcex/util/flow/SimpleMapEvent : java/io/Serializable {
365+
public abstract interface class com/caplin/integration/datasourcex/util/flow/SimpleMapEvent {
341366
}
342367

343368
public abstract interface class com/caplin/integration/datasourcex/util/flow/SimpleMapEvent$EntryEvent : com/caplin/integration/datasourcex/util/flow/SimpleMapEvent {
@@ -393,7 +418,7 @@ public final class com/caplin/integration/datasourcex/util/flow/TimeoutKt {
393418
public static final fun timeoutFirstOrNull (Lkotlinx/coroutines/flow/Flow;Ljava/time/Duration;)Lkotlinx/coroutines/flow/Flow;
394419
}
395420

396-
public abstract interface class com/caplin/integration/datasourcex/util/flow/ValueOrCompletion : java/io/Serializable {
421+
public abstract interface class com/caplin/integration/datasourcex/util/flow/ValueOrCompletion {
397422
public abstract fun map (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
398423
}
399424

@@ -430,3 +455,15 @@ public final class com/caplin/integration/datasourcex/util/flow/ValueOrCompletio
430455
public static final fun materializeUnboxed (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
431456
}
432457

458+
public final class com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModuleKt {
459+
public static final fun registerDataSourceSerializers (Lorg/apache/fory/Fory;)Lorg/apache/fory/Fory;
460+
}
461+
462+
public final class com/caplin/integration/datasourcex/util/serialization/jackson/DataSourceModule : com/fasterxml/jackson/databind/module/SimpleModule {
463+
public static final field INSTANCE Lcom/caplin/integration/datasourcex/util/serialization/jackson/DataSourceModule;
464+
}
465+
466+
public final class com/caplin/integration/datasourcex/util/serialization/jackson/DataSourceModuleKt {
467+
public static final fun registerDataSourceModule (Lcom/fasterxml/jackson/databind/ObjectMapper;)Lcom/fasterxml/jackson/databind/ObjectMapper;
468+
}
469+

util/src/main/kotlin/com/caplin/integration/datasourcex/util/SimpleDataSourceFactory.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package com.caplin.integration.datasourcex.util
22

33
import com.caplin.datasource.DataSource
44
import com.caplin.datasource.messaging.json.JacksonJsonHandler
5-
import com.caplin.integration.datasourcex.util.flow.registerDataSourceModule
5+
import com.caplin.integration.datasourcex.util.serialization.jackson.registerDataSourceModule
66
import com.fasterxml.jackson.databind.ObjectMapper
77
import com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS
88
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/DataSourceJacksonModule.kt

Lines changed: 0 additions & 145 deletions
This file was deleted.

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMap.kt

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,11 @@ interface FlowMap<K : Any, V : Any> : MapFlow<K, V>, Map<K, V> {
9090
sealed interface FlowMapStreamEvent<out K : Any, out V : Any> {
9191
/** Emitted on initial collection, containing the entire initial [map] state. */
9292
@JvmInline
93-
value class InitialState<K : Any, V : Any>(val map: PersistentMap<K, V>) :
94-
FlowMapStreamEvent<K, V>
93+
value class InitialState<K : Any, V : Any>(val map: Map<K, V>) : FlowMapStreamEvent<K, V>
9594

9695
/** Emitted for subsequent updates, containing only the delta ([event]). */
9796
@JvmInline
98-
value class EventUpdate<K : Any, V : Any>(val event: MapEvent<K, V>) : FlowMapStreamEvent<K, V>
97+
value class EventUpdate<K : Any, V : Any>(val event: EntryEvent<K, V>) : FlowMapStreamEvent<K, V>
9998
}
10099

101100
interface MapFlow<K : Any, V : Any> {
@@ -139,6 +138,41 @@ interface MapFlow<K : Any, V : Any> {
139138
fun valueFlow(key: K): Flow<V?>
140139
}
141140

141+
/** Folds a flow of [FlowMapStreamEvent]s into a flow of [Map]. */
142+
@JvmName("runningFoldToMapFlowMapStreamEvent")
143+
fun <K : Any, V : Any> Flow<FlowMapStreamEvent<K, V>>.runningFoldToMap(): Flow<Map<K, V>> = flow {
144+
var map: PersistentMap<K, V>? = null
145+
146+
collect { streamEvent ->
147+
when (streamEvent) {
148+
is FlowMapStreamEvent.InitialState -> {
149+
map = streamEvent.map.toPersistentMap()
150+
emit(map!!)
151+
}
152+
153+
is FlowMapStreamEvent.EventUpdate -> {
154+
val currentMap = map ?: error("InitialState must be received before EventUpdate")
155+
when (val mapEvent = streamEvent.event) {
156+
is Removed -> {
157+
map =
158+
currentMap.remove(mapEvent.key).also { newMap ->
159+
check(newMap !== currentMap) {
160+
"Attempted to remove non existent key ${mapEvent.key}"
161+
}
162+
}
163+
emit(map!!)
164+
}
165+
166+
is Upsert -> {
167+
map = currentMap.put(mapEvent.key, mapEvent.newValue)
168+
emit(map!!)
169+
}
170+
}
171+
}
172+
}
173+
}
174+
}
175+
142176
private class FlowMapImpl<K : Any, V : Any>(initialMap: PersistentMap<K, V>) :
143177
MutableFlowMap<K, V> {
144178
private data class State<K, V>(val version: Long, val map: PersistentMap<K, V>)

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapForySupport.kt

Lines changed: 0 additions & 35 deletions
This file was deleted.

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/Retry.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import kotlinx.coroutines.flow.retry
1212
* be modified by proving [onRetry] - if a call to this returns `false` it will stop retrying and
1313
* propagate the error downstream.
1414
*/
15-
fun <T : Any?> Flow<T>.retryWithExponentialBackoff(
15+
fun <T> Flow<T>.retryWithExponentialBackoff(
1616
minMillis: Long = 100L,
1717
maxMillis: Long = 60000L,
1818
onRetry: (suspend (Throwable, Long) -> Boolean) = { _, _ -> true },

0 commit comments

Comments
 (0)