Skip to content

Commit cf51b4d

Browse files
committed
Allow exception types to be preserved when serializing ValueOrCompletion objects
1 parent b49132a commit cf51b4d

3 files changed

Lines changed: 61 additions & 12 deletions

File tree

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/DataSourceModule.kt

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,13 @@ import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion
88
import org.apache.fory.Fory
99

1010
/** Registers serializers for internal types with the provided [Fory] instance. */
11-
fun Fory.registerDataSourceSerializers(): Fory = apply {
11+
@Suppress("UNCHECKED_CAST")
12+
fun Fory.registerDataSourceSerializers(preserveExceptionTypes: Boolean = false): Fory = apply {
13+
if (preserveExceptionTypes)
14+
check(config.trackingRef()) {
15+
"Tracking references must be enabled for exception types preservation"
16+
}
17+
1218
// Register serializers for FlowMapStreamEvent value classes
1319
registerSerializer(FlowMapStreamEvent::class.java, FlowMapStreamEventSerializer::class.java)
1420
registerSerializer(
@@ -24,7 +30,15 @@ fun Fory.registerDataSourceSerializers(): Fory = apply {
2430
registerSerializer(MapEvent::class.java, MapEventSerializer::class.java)
2531
registerSerializer(SimpleMapEvent::class.java, SimpleMapEventSerializer::class.java)
2632
registerSerializer(SetEvent::class.java, SetEventSerializer::class.java)
27-
registerSerializer(ValueOrCompletion::class.java, ValueOrCompletionSerializer::class.java)
33+
34+
registerSerializer(
35+
ValueOrCompletion::class.java,
36+
ValueOrCompletionSerializer(
37+
this,
38+
ValueOrCompletion::class.java,
39+
preserveExceptionTypes,
40+
),
41+
)
2842
}
2943

3044
fun Fory.registerPersistentCollectionSerializers(): Fory = apply {

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/ValueOrCompletionSerializer.kt

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import org.apache.fory.Fory
55
import org.apache.fory.memory.MemoryBuffer
66
import org.apache.fory.serializer.Serializer
77

8-
internal class ValueOrCompletionSerializer(fory: Fory, type: Class<ValueOrCompletion<*>>) :
9-
Serializer<ValueOrCompletion<*>>(fory, type) {
8+
internal class ValueOrCompletionSerializer(
9+
fory: Fory,
10+
type: Class<ValueOrCompletion<*>>,
11+
private val preserveExceptionTypes: Boolean = true,
12+
) : Serializer<ValueOrCompletion<*>>(fory, type) {
1013

1114
private enum class Type {
1215
VALUE,
@@ -21,8 +24,12 @@ internal class ValueOrCompletionSerializer(fory: Fory, type: Class<ValueOrComple
2124
}
2225
is ValueOrCompletion.Completion -> {
2326
buffer.writeByte(Type.COMPLETION.ordinal.toByte())
24-
val message = value.throwable?.message ?: value.throwable?.toString()
25-
fory.writeRef(buffer, message)
27+
if (preserveExceptionTypes) {
28+
fory.writeRef(buffer, value.throwable)
29+
} else {
30+
val message = value.throwable?.message ?: value.throwable?.toString()
31+
fory.writeRef(buffer, message)
32+
}
2633
}
2734
}
2835
}
@@ -34,8 +41,13 @@ internal class ValueOrCompletionSerializer(fory: Fory, type: Class<ValueOrComple
3441
ValueOrCompletion.Value(value)
3542
}
3643
Type.COMPLETION -> {
37-
val message = fory.readRef(buffer) as String?
38-
ValueOrCompletion.Completion(message?.let { RuntimeException(it) })
44+
if (preserveExceptionTypes) {
45+
val throwable = fory.readRef(buffer) as Throwable?
46+
ValueOrCompletion.Completion(throwable)
47+
} else {
48+
val message = fory.readRef(buffer) as String?
49+
ValueOrCompletion.Completion(message?.let { RuntimeException(it) })
50+
}
3951
}
4052
}
4153
}

util/src/test/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/ValueOrCompletionSerializationTest.kt

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,27 @@ package com.caplin.integration.datasourcex.util.serialization.fory
33
import com.caplin.integration.datasourcex.util.flow.ValueOrCompletion
44
import io.kotest.core.spec.style.FunSpec
55
import io.kotest.matchers.shouldBe
6+
import io.kotest.matchers.types.shouldBeInstanceOf
67
import org.apache.fory.Fory
78
import org.apache.fory.config.Language
89

10+
class CustomException(val customMessage: String) : Exception(customMessage)
11+
912
class ValueOrCompletionSerializationTest :
1013
FunSpec({
1114
val fory =
1215
Fory.builder()
1316
.withLanguage(Language.JAVA)
1417
.requireClassRegistration(false)
18+
.withRefTracking(true)
19+
.build()
20+
.registerDataSourceSerializers(preserveExceptionTypes = true)
21+
22+
val foryNoType =
23+
Fory.builder()
24+
.withLanguage(Language.JAVA)
25+
.requireClassRegistration(false)
26+
.withRefTracking(false)
1527
.build()
1628
.registerDataSourceSerializers()
1729

@@ -37,11 +49,22 @@ class ValueOrCompletionSerializationTest :
3749
deserialized shouldBe event
3850
}
3951

40-
test("Completion") {
41-
val event: ValueOrCompletion.Completion = ValueOrCompletion.Completion(null)
52+
test("Completion with custom exception (preserved type)") {
53+
val event: ValueOrCompletion.Completion =
54+
ValueOrCompletion.Completion(CustomException("aah"))
4255
val bytes = fory.serialize(event)
43-
val deserialized = fory.deserialize(bytes)
44-
deserialized shouldBe event
56+
val deserialized = fory.deserialize(bytes) as ValueOrCompletion.Completion
57+
val throwable = deserialized.throwable.shouldBeInstanceOf<CustomException>()
58+
throwable.customMessage shouldBe "aah"
59+
}
60+
61+
test("Completion with exception (no type)") {
62+
val event: ValueOrCompletion.Completion =
63+
ValueOrCompletion.Completion(IllegalStateException("aah"))
64+
val bytes = foryNoType.serialize(event)
65+
val deserialized = foryNoType.deserialize(bytes) as ValueOrCompletion.Completion
66+
deserialized.throwable.shouldBeInstanceOf<RuntimeException>()
67+
deserialized.throwable?.message shouldBe "aah"
4568
}
4669
}
4770
})

0 commit comments

Comments
 (0)