Skip to content

Commit f629a46

Browse files
committed
Merge branch 'flatMapLatestAndMerge-fix'
2 parents 9c44a29 + cf51b4d commit f629a46

3 files changed

Lines changed: 97 additions & 46 deletions

File tree

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModule.kt

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@ import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion
88
import org.apache.fory.Fory
99

1010
/** Registers serializers for internal types with the provided [Fory] instance. */
11-
fun Fory.registerDataSourceSerializers(): Fory = apply {
11+
@Suppress("UNCHECKED_CAST")
12+
fun Fory.registerDataSourceSerializers(preserveExceptionTypes: Boolean = false): Fory = apply {
13+
if (preserveExceptionTypes)
14+
check(config.trackingRef()) {
15+
"Tracking references must be enabled for exception types preservation"
16+
}
17+
1218
// Register serializers for FlowMapStreamEvent value classes
1319
registerSerializer(FlowMapStreamEvent::class.java, FlowMapStreamEventSerializer::class.java)
1420
registerSerializer(
@@ -24,7 +30,15 @@ fun Fory.registerDataSourceSerializers(): Fory = apply {
2430
registerSerializer(MapEvent::class.java, MapEventSerializer::class.java)
2531
registerSerializer(SimpleMapEvent::class.java, SimpleMapEventSerializer::class.java)
2632
registerSerializer(SetEvent::class.java, SetEventSerializer::class.java)
27-
registerSerializer(ValueOrCompletion::class.java, ValueOrCompletionSerializer::class.java)
33+
34+
registerSerializer(
35+
ValueOrCompletion::class.java,
36+
ValueOrCompletionSerializer(
37+
this,
38+
ValueOrCompletion::class.java,
39+
preserveExceptionTypes,
40+
),
41+
)
2842
}
2943

3044
fun Fory.registerPersistentCollectionSerializers(): Fory = apply {

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/ValueOrCompletionSerializer.kt

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import org.apache.fory.Fory
55
import org.apache.fory.memory.MemoryBuffer
66
import org.apache.fory.serializer.Serializer
77

8-
internal class ValueOrCompletionSerializer(fory: Fory, type: Class<ValueOrCompletion<*>>) :
9-
Serializer<ValueOrCompletion<*>>(fory, type) {
8+
internal class ValueOrCompletionSerializer(
9+
fory: Fory,
10+
type: Class<ValueOrCompletion<*>>,
11+
private val preserveExceptionTypes: Boolean = true,
12+
) : Serializer<ValueOrCompletion<*>>(fory, type) {
1013

1114
private enum class Type {
1215
VALUE,
@@ -21,8 +24,12 @@ internal class ValueOrCompletionSerializer(fory: Fory, type: Class<ValueOrComple
2124
}
2225
is ValueOrCompletion.Completion -> {
2326
buffer.writeByte(Type.COMPLETION.ordinal.toByte())
24-
val message = value.throwable?.message ?: value.throwable?.toString()
25-
fory.writeRef(buffer, message)
27+
if (preserveExceptionTypes) {
28+
fory.writeRef(buffer, value.throwable)
29+
} else {
30+
val message = value.throwable?.message ?: value.throwable?.toString()
31+
fory.writeRef(buffer, message)
32+
}
2633
}
2734
}
2835
}
@@ -34,8 +41,13 @@ internal class ValueOrCompletionSerializer(fory: Fory, type: Class<ValueOrComple
3441
ValueOrCompletion.Value(value)
3542
}
3643
Type.COMPLETION -> {
37-
val message = fory.readRef(buffer) as String?
38-
ValueOrCompletion.Completion(message?.let { RuntimeException(it) })
44+
if (preserveExceptionTypes) {
45+
val throwable = fory.readRef(buffer) as Throwable?
46+
ValueOrCompletion.Completion(throwable)
47+
} else {
48+
val message = fory.readRef(buffer) as String?
49+
ValueOrCompletion.Completion(message?.let { RuntimeException(it) })
50+
}
3951
}
4052
}
4153
}

util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/ValueOrCompletionSerializationTest.kt

Lines changed: 63 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,45 +3,70 @@ package com.caplin.integration.datasourcex.util.serialization.fory
33
import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion
44
import io.kotest.core.spec.style.FunSpec
55
import io.kotest.matchers.shouldBe
6+
import io.kotest.matchers.types.shouldBeInstanceOf
67
import org.apache.fory.Fory
78
import org.apache.fory.config.Language
89

10+
class CustomException(val customMessage: String) : Exception(customMessage)
11+
912
class ValueOrCompletionSerializationTest :
10-
FunSpec({
11-
val fory =
12-
Fory.builder()
13-
.withLanguage(Language.JAVA)
14-
.requireClassRegistration(false)
15-
.build()
16-
.registerDataSourceSerializers()
17-
18-
test("Value") {
19-
val event = ValueOrCompletion.Value("value")
20-
val bytes = fory.serialize(event)
21-
val deserialized = fory.deserialize(bytes)
22-
deserialized shouldBe event
23-
}
24-
25-
test("Completion") {
26-
val event = ValueOrCompletion.Completion(null)
27-
val bytes = fory.serialize(event)
28-
val deserialized = fory.deserialize(bytes)
29-
deserialized shouldBe event
30-
}
31-
32-
context("Value and Completion specifically") {
33-
test("Value") {
34-
val event: ValueOrCompletion.Value<String> = ValueOrCompletion.Value("value")
35-
val bytes = fory.serialize(event)
36-
val deserialized = fory.deserialize(bytes)
37-
deserialized shouldBe event
38-
}
39-
40-
test("Completion") {
41-
val event: ValueOrCompletion.Completion = ValueOrCompletion.Completion(null)
42-
val bytes = fory.serialize(event)
43-
val deserialized = fory.deserialize(bytes)
44-
deserialized shouldBe event
45-
}
46-
}
47-
})
13+
FunSpec(
14+
{
15+
val fory =
16+
Fory.builder()
17+
.withLanguage(Language.JAVA)
18+
.requireClassRegistration(false)
19+
.withRefTracking(true)
20+
.build()
21+
.registerDataSourceSerializers(preserveExceptionTypes = true)
22+
23+
val foryNoType =
24+
Fory.builder()
25+
.withLanguage(Language.JAVA)
26+
.requireClassRegistration(false)
27+
.withRefTracking(false)
28+
.build()
29+
.registerDataSourceSerializers()
30+
31+
test("Value") {
32+
val event = ValueOrCompletion.Value("value")
33+
val bytes = fory.serialize(event)
34+
val deserialized = fory.deserialize(bytes)
35+
deserialized shouldBe event
36+
}
37+
38+
test("Completion") {
39+
val event = ValueOrCompletion.Completion(null)
40+
val bytes = fory.serialize(event)
41+
val deserialized = fory.deserialize(bytes)
42+
deserialized shouldBe event
43+
}
44+
45+
context("Value and Completion specifically") {
46+
test("Value") {
47+
val event: ValueOrCompletion.Value<String> = ValueOrCompletion.Value("value")
48+
val bytes = fory.serialize(event)
49+
val deserialized = fory.deserialize(bytes)
50+
deserialized shouldBe event
51+
}
52+
53+
test("Completion with custom exception (preserved type)") {
54+
val event: ValueOrCompletion.Completion =
55+
ValueOrCompletion.Completion(CustomException("aah"))
56+
val bytes = fory.serialize(event)
57+
val deserialized = fory.deserialize(bytes) as ValueOrCompletion.Completion
58+
val throwable = deserialized.throwable.shouldBeInstanceOf<CustomException>()
59+
throwable.customMessage shouldBe "aah"
60+
}
61+
62+
test("Completion with exception (no type)") {
63+
val event: ValueOrCompletion.Completion =
64+
ValueOrCompletion.Completion(IllegalStateException("aah"))
65+
val bytes = foryNoType.serialize(event)
66+
val deserialized = foryNoType.deserialize(bytes) as ValueOrCompletion.Completion
67+
deserialized.throwable.shouldBeInstanceOf<RuntimeException>()
68+
deserialized.throwable.message shouldBe "aah"
69+
}
70+
}
71+
},
72+
)

0 commit comments

Comments
 (0)