Skip to content

Commit 4bfdfb9

Browse files
committed
Drop native java serialization support, add fory and jackson serializers, and add benchmarks
1 parent f612425 commit 4bfdfb9

19 files changed

Lines changed: 550 additions & 236 deletions

File tree

.idea/compiler.xml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gradle/libs.versions.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ springBoot = "3.5.10"
33
kotlin = "1.9.25" # Keep in sync with Spring - https://docs.spring.io/spring-boot/docs/current/reference/html/dependency-versions.html
44
kotlinCollectionsImmutable = "0.3.8"
55
jsonPatch = "1.13"
6+
jmh = "1.37"
7+
jmh-plugin = "0.7.3"
68
kotlinpoet = "2.2.0"
79
ktfmt = "0.61"
10+
fory = "0.15.0"
811

912
# Caplin Dependencies
1013
datasource = "8.0.10-1695-5ddb3798"
@@ -28,6 +31,10 @@ vanniktech-maven-publish-plugin = "0.31.0"
2831
kotlin-collections-immutable = { module = "org.jetbrains.kotlinx:kotlinx-collections-immutable", version.ref = "kotlinCollectionsImmutable" }
2932
json-patch = { module = "com.github.java-json-tools:json-patch", version.ref = "jsonPatch" }
3033
kotlinpoet = { module = "com.squareup:kotlinpoet", version.ref = "kotlinpoet" }
34+
fory-core = { module = "org.apache.fory:fory-core", version.ref = "fory" }
35+
fory-kotlin = { module = "org.apache.fory:fory-kotlin", version.ref = "fory" }
36+
jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" }
37+
jmh-generator = { module = "org.openjdk.jmh:jmh-generator-annprocess", version.ref = "jmh" }
3138

3239
# Caplin Dependencies
3340
datasource = { module = "com.caplin.platform.integration.java:datasource", version.ref = "datasource" }
@@ -54,4 +61,5 @@ spring-boot-dependencies = { module = "org.springframework.boot:spring-boot-depe
5461
spring-boot-configuration-processor = { module = "org.springframework.boot:spring-boot-configuration-processor", version.ref = "springBoot" }
5562

5663
[plugins]
57-
spring-boot = { id = "org.springframework.boot", version.ref = "springBoot" }
64+
spring-boot = { id = "org.springframework.boot", version.ref = "springBoot" }
65+
jmh = { id = "me.champeau.jmh", version.ref = "jmh-plugin" }

util/build.gradle.kts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
plugins { `common-library` }
1+
plugins {
2+
`common-library`
3+
alias(libs.plugins.jmh)
4+
}
25

36
description = "Utility classes for DataSource extensions"
47

@@ -15,12 +18,22 @@ dependencies {
1518
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
1619
implementation(libs.kotlin.collections.immutable)
1720

21+
compileOnly(libs.fory.core)
22+
compileOnly(libs.fory.kotlin)
23+
1824
testRuntimeOnly("org.slf4j:slf4j-simple")
1925

2026
testImplementation("org.springframework:spring-core") // For testing the RegexPathMatcher
2127
testImplementation(libs.turbine)
2228
testImplementation(libs.kotest.assertions)
2329
testImplementation(libs.kotest.runner)
30+
testImplementation(libs.fory.core)
31+
testImplementation(libs.fory.kotlin)
32+
33+
jmh(libs.jmh.core)
34+
jmh(libs.jmh.generator)
2435
}
2536

37+
jmh { duplicateClassesStrategy.set(DuplicatesStrategy.EXCLUDE) }
38+
2639
dokka { dokkaSourceSets.configureEach { includes.from("README.md") } }
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
package com.caplin.integration.datasourcex.util.flow
2+
3+
import com.caplin.integration.datasourcex.util.flow.MapEvent.EntryEvent.Upsert
4+
import com.caplin.integration.datasourcex.util.flow.MapEvent.Populated
5+
import java.util.concurrent.TimeUnit
6+
import kotlinx.coroutines.CoroutineScope
7+
import kotlinx.coroutines.Dispatchers
8+
import kotlinx.coroutines.cancel
9+
import kotlinx.coroutines.flow.flow
10+
import kotlinx.coroutines.flow.launchIn
11+
import kotlinx.coroutines.flow.take
12+
import kotlinx.coroutines.flow.takeWhile
13+
import kotlinx.coroutines.runBlocking
14+
import org.openjdk.jmh.annotations.*
15+
16+
/**
17+
* Benchmarks for [FlowMap] implementation, focusing on mutation performance, lookup efficiency, and
18+
* Flow-based state reconstruction.
19+
*/
20+
@State(Scope.Benchmark)
21+
@BenchmarkMode(Mode.Throughput)
22+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
23+
@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
24+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
25+
@Fork(1)
26+
open class FlowMapBenchmark {
27+
28+
private lateinit var flowMap: MutableFlowMap<String, String>
29+
30+
@Setup
31+
fun setup() {
32+
flowMap = mutableFlowMapOf()
33+
}
34+
35+
/**
36+
* Measures the throughput of single [MutableFlowMap.put] operations on an initially empty map.
37+
*/
38+
@Benchmark
39+
fun putSingle() {
40+
flowMap.put("key", "value")
41+
}
42+
43+
/**
44+
* Measures the throughput of a [MutableFlowMap.put] followed by a [MutableFlowMap.remove] on the
45+
* same key, exercising the event emission and state update logic.
46+
*/
47+
@Benchmark
48+
fun putAndRemove() {
49+
flowMap.put("key", "value")
50+
flowMap.remove("key")
51+
}
52+
53+
/**
54+
* Measures the throughput of [MutableFlowMap.putAll] with a small map, which triggers multiple
55+
* events in a single state update.
56+
*/
57+
@Benchmark
58+
fun putAllSmall() {
59+
flowMap.putAll(mapOf("1" to "A", "2" to "B", "3" to "C"))
60+
}
61+
62+
/** State holder for a [FlowMap] pre-populated with 1,000 entries. */
63+
@State(Scope.Benchmark)
64+
open class PopulatedFlowMapState {
65+
lateinit var flowMap: MutableFlowMap<String, Int>
66+
67+
@Setup
68+
fun setup() {
69+
flowMap = mutableFlowMapOf()
70+
repeat(1000) { flowMap.put("key$it", it) }
71+
}
72+
}
73+
74+
/** State holder for measuring mutation throughput with multiple active subscribers. */
75+
@State(Scope.Benchmark)
76+
open class ActiveSubscriberState {
77+
@Param("1", "10", "100") var subscriberCount: Int = 0
78+
79+
lateinit var flowMap: MutableFlowMap<String, String>
80+
lateinit var scope: CoroutineScope
81+
82+
@Setup
83+
fun setup() {
84+
flowMap = mutableFlowMapOf()
85+
// Using Dispatchers.Default for subscribers to simulate real-world processing
86+
scope = CoroutineScope(Dispatchers.Default)
87+
repeat(subscriberCount) { flowMap.asFlow().launchIn(scope) }
88+
}
89+
90+
@TearDown
91+
fun tearDown() {
92+
scope.cancel()
93+
}
94+
}
95+
96+
/**
97+
* Measures the throughput of [MutableFlowMap.put] when there are multiple active subscribers
98+
* collecting from the map. This identifies contention or overhead in the event dispatching logic.
99+
*/
100+
@Benchmark
101+
fun putWithSubscribers(state: ActiveSubscriberState) {
102+
state.flowMap.put("key", "value")
103+
}
104+
105+
/** Measures the throughput of retrieving a value from a large, pre-populated [FlowMap]. */
106+
@Benchmark
107+
fun getFromLargeMap(state: PopulatedFlowMapState): Int? {
108+
return state.flowMap["key500"]
109+
}
110+
111+
/**
112+
* Measures the time taken to collect the initial state of a large [FlowMap] via [FlowMap.asFlow].
113+
*/
114+
@Benchmark
115+
fun asFlowCollection(state: PopulatedFlowMapState) = runBlocking {
116+
state.flowMap
117+
.asFlow()
118+
.takeWhile { it != Populated }
119+
.collect {
120+
// just collect
121+
}
122+
}
123+
124+
/**
125+
* Measures the time taken to collect the initial state of a large [FlowMap] via
126+
* [FlowMap.asFlowWithState]. This avoids emitting individual Upsert events, making it much
127+
* faster.
128+
*/
129+
@Benchmark
130+
fun asFlowWithStateCollection(state: PopulatedFlowMapState) = runBlocking {
131+
state.flowMap.asFlowWithState().take(1).collect {
132+
// just collect
133+
}
134+
}
135+
136+
/**
137+
* Measures the time taken to collect the initial state of a large [FlowMap] via [FlowMap.asFlow]
138+
* when a predicate is applied, exercising the filtering logic within the flow.
139+
*/
140+
@Benchmark
141+
fun asFlowWithPredicateCollection(state: PopulatedFlowMapState) = runBlocking {
142+
state.flowMap
143+
.asFlow { _, value -> value % 2 == 0 }
144+
.takeWhile { it != Populated }
145+
.collect {
146+
// just collect
147+
}
148+
}
149+
150+
/**
151+
* Measures the overhead of reconstructing a [FlowMap] from a stream of events using
152+
* [toFlowMapIn].
153+
*/
154+
@Benchmark
155+
fun toFlowMapInBenchmark() = runBlocking {
156+
val events = flow {
157+
repeat(100) { emit(Upsert("key$it", null, it)) }
158+
emit(Populated)
159+
}
160+
val scope = CoroutineScope(Dispatchers.Default)
161+
events.toFlowMapIn(scope)
162+
scope.cancel()
163+
}
164+
}

util/src/main/kotlin/com/caplin/integration/datasourcex/util/SerializablePersistentMap.kt

Lines changed: 0 additions & 71 deletions
This file was deleted.

util/src/main/kotlin/com/caplin/integration/datasourcex/util/SerializablePersistentSet.kt

Lines changed: 0 additions & 61 deletions
This file was deleted.

util/src/main/kotlin/com/caplin/integration/datasourcex/util/SimpleDataSourceFactory.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.caplin.integration.datasourcex.util
22

33
import com.caplin.datasource.DataSource
44
import com.caplin.datasource.messaging.json.JacksonJsonHandler
5+
import com.caplin.integration.datasourcex.util.flow.registerDataSourceModule
56
import com.fasterxml.jackson.databind.ObjectMapper
67
import com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS
78
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
@@ -19,6 +20,7 @@ object SimpleDataSourceFactory {
1920
jacksonObjectMapper()
2021
.configure(WRITE_DATES_AS_TIMESTAMPS, false)
2122
.registerModule(JavaTimeModule())
23+
.registerDataSourceModule()
2224

2325
/**
2426
* Creates a data source based on the given simple configuration.

0 commit comments

Comments
 (0)