Skip to content

Commit 6dcdb52

Browse files
committed
Drop native java serialization support, add fory and jackson serializers, and add benchmarks
1 parent 4bfdfb9 commit 6dcdb52

4 files changed

Lines changed: 27 additions & 15 deletions

File tree

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMap.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ interface FlowMap<K : Any, V : Any> : MapFlow<K, V>, Map<K, V> {
9090
sealed interface FlowMapStreamEvent<out K : Any, out V : Any> {
9191
/** Emitted on initial collection, containing the entire initial [map] state. */
9292
@JvmInline
93-
value class InitialState<K : Any, V : Any>(val map: PersistentMap<K, V>) : FlowMapStreamEvent<K, V>
93+
value class InitialState<K : Any, V : Any>(val map: PersistentMap<K, V>) :
94+
FlowMapStreamEvent<K, V>
9495

9596
/** Emitted for subsequent updates, containing only the delta ([event]). */
9697
@JvmInline

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapForySupport.kt

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,27 @@ import org.apache.fory.Fory
66
import org.apache.fory.serializer.collection.MapSerializer
77

88
/**
9-
* Registers serializers for [PersistentMap] and other internal types with the provided [Fory] instance.
9+
* Registers serializers for [PersistentMap] and other internal types with the provided [Fory]
10+
* instance.
1011
*/
1112
fun Fory.registerDataSourceSerializers(): Fory = apply {
1213
registerSerializer(PersistentMap::class.java, PersistentMapSerializer::class.java)
1314
// Kotlin immutable collections implementations often have internal names
1415
val hashMapClass =
15-
runCatching { Class.forName("kotlinx.collections.immutable.implementations.immutableMap.PersistentHashMap") }.getOrNull()
16+
runCatching {
17+
Class.forName(
18+
"kotlinx.collections.immutable.implementations.immutableMap.PersistentHashMap"
19+
)
20+
}
21+
.getOrNull()
1622
if (hashMapClass != null) {
1723
registerSerializer(hashMapClass, PersistentMapSerializer::class.java)
1824
}
1925
}
2026

21-
/**
22-
* A Fory [MapSerializer] for [PersistentMap].
23-
*/
24-
class PersistentMapSerializer(fory: Fory, type: Class<PersistentMap<*, *>>) : MapSerializer<PersistentMap<*, *>>(fory, type) {
27+
/** A Fory [MapSerializer] for [PersistentMap]. */
28+
class PersistentMapSerializer(fory: Fory, type: Class<PersistentMap<*, *>>) :
29+
MapSerializer<PersistentMap<*, *>>(fory, type) {
2530

2631
@Suppress("UNCHECKED_CAST")
2732
override fun onMapRead(map: Map<*, *>): PersistentMap<*, *> {

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/ValueOrCompletion.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ import kotlinx.coroutines.flow.transformWhile
2525
sealed interface ValueOrCompletion<out T> {
2626

2727
@Suppress("UNCHECKED_CAST")
28-
suspend fun <R> map(block: suspend (T) -> R): ValueOrCompletion<R> =
29-
this as ValueOrCompletion<R>
28+
suspend fun <R> map(block: suspend (T) -> R): ValueOrCompletion<R> = this as ValueOrCompletion<R>
3029

3130
class Value<out T>(val value: T) : ValueOrCompletion<T> {
3231
operator fun component1(): T = value

util/src/test/kotlin/com/caplin/integration/datasourcex/util/flow/FlowMapSerializationTest.kt

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,8 @@ import org.apache.fory.config.Language
1414

1515
class FlowMapSerializationTest :
1616
FunSpec({
17-
1817
context("Jackson Serialization") {
19-
val mapper: ObjectMapper = jacksonObjectMapper()
20-
.registerDataSourceModule()
18+
val mapper: ObjectMapper = jacksonObjectMapper().registerDataSourceModule()
2119

2220
test("serialize and deserialize InitialState") {
2321
val map = mutableFlowMapOf("1" to "A", "2" to "B")
@@ -26,7 +24,10 @@ class FlowMapSerializationTest :
2624

2725
val json = mapper.writeValueAsString(initialState)
2826
val deserialized: FlowMapStreamEvent<String, String> =
29-
mapper.readValue(json, object : TypeReference<FlowMapStreamEvent<String, String>>() {})
27+
mapper.readValue(
28+
json,
29+
object : TypeReference<FlowMapStreamEvent<String, String>>() {},
30+
)
3031

3132
deserialized.shouldBeInstanceOf<FlowMapStreamEvent.InitialState<String, String>>()
3233
deserialized.map shouldContainExactly mapOf("1" to "A", "2" to "B")
@@ -38,7 +39,10 @@ class FlowMapSerializationTest :
3839

3940
val json = mapper.writeValueAsString(event)
4041
val deserialized: FlowMapStreamEvent<String, String> =
41-
mapper.readValue(json, object : TypeReference<FlowMapStreamEvent<String, String>>() {})
42+
mapper.readValue(
43+
json,
44+
object : TypeReference<FlowMapStreamEvent<String, String>>() {},
45+
)
4246

4347
deserialized.shouldBeInstanceOf<FlowMapStreamEvent.EventUpdate<String, String>>()
4448
deserialized.event.shouldBeInstanceOf<Upsert<String, String>>()
@@ -61,7 +65,10 @@ class FlowMapSerializationTest :
6165

6266
context("Apache Fory Serialization") {
6367
val fory =
64-
Fory.builder().withLanguage(Language.JAVA).requireClassRegistration(false).build()
68+
Fory.builder()
69+
.withLanguage(Language.JAVA)
70+
.requireClassRegistration(false)
71+
.build()
6572
.registerDataSourceSerializers()
6673

6774
test("serialize and deserialize InitialState (raw PersistentMap)") {

0 commit comments

Comments
 (0)