Skip to content

Commit 5d9868d

Browse files
committed
Avoid another copy on write
1 parent 08c3fcd commit 5d9868d

5 files changed

Lines changed: 55 additions & 43 deletions

File tree

util/src/main/kotlin/com/caplin/integration/datasourcex/util/flow/Timeout.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import java.time.Duration
66
import java.util.concurrent.TimeoutException
77
import kotlinx.coroutines.ExperimentalCoroutinesApi
88
import kotlinx.coroutines.channels.consumeEach
9+
import kotlinx.coroutines.channels.onSuccess
910
import kotlinx.coroutines.channels.produce
1011
import kotlinx.coroutines.flow.Flow
1112
import kotlinx.coroutines.flow.channelFlow
@@ -21,7 +22,7 @@ fun <T, R> Flow<T>.timeoutFirstOrDefault(millis: Long, default: () -> R): Flow<R
2122
val receiveChannel = produce { collect { send(it) } }
2223

2324
select {
24-
receiveChannel.onReceiveCatching { result -> result.getOrNull()?.let { send(it as R) } }
25+
receiveChannel.onReceiveCatching { result -> result.onSuccess { send(it as R) } }
2526
onTimeout(millis) { send(default()) }
2627
}
2728
receiveChannel.consumeEach { send(it as R) }
@@ -45,7 +46,7 @@ fun <T, R> Flow<T>.timeoutFirstOrDefault(millis: Long, default: R): Flow<R> =
4546
* If the upstream emits no first event within [duration] it will emit the event [default] followed
4647
* by all later emissions from the upstream.
4748
*/
48-
fun <T, R : Any?> Flow<T>.timeoutFirstOrDefault(duration: Duration, default: R): Flow<R> =
49+
fun <T, R> Flow<T>.timeoutFirstOrDefault(duration: Duration, default: R): Flow<R> =
4950
timeoutFirstOrDefault(duration.toMillis(), default)
5051

5152
/**

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/PersistentHashMapSerializer.kt

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,27 @@ import org.apache.fory.serializer.collection.MapSerializer
88

99
/** A Fory [MapSerializer] for [PersistentMap] (Hash implementation). */
1010
internal class PersistentHashMapSerializer(fory: Fory, type: Class<PersistentMap<*, *>>) :
11-
MapSerializer<PersistentMap<*, *>>(fory, type) {
11+
MapSerializer<PersistentMap<*, *>>(fory, type, true) {
1212

13-
override fun write(buffer: MemoryBuffer, value: PersistentMap<*, *>) {
14-
fory.writeRef(buffer, HashMap(value))
13+
override fun newMap(buffer: MemoryBuffer): MutableMap<*, *> {
14+
val numElements = buffer.readVarUint32Small7()
15+
setNumElements(numElements)
16+
val map = HashMap<Any?, Any?>(numElements)
17+
refResolver.reference(map)
18+
return map
1519
}
1620

17-
@Suppress("UNCHECKED_CAST")
18-
override fun read(buffer: MemoryBuffer): PersistentMap<*, *> {
19-
val map = fory.readRef(buffer) as Map<Any, Any>
20-
return map.toPersistentHashMap()
21+
override fun newMap(map: Map<*, *>): MutableMap<*, *> {
22+
return HashMap<Any?, Any?>(map.size)
2123
}
2224

23-
override fun newMap(buffer: MemoryBuffer): MutableMap<*, *> {
24-
return HashMap<Any, Any>()
25+
@Suppress("UNCHECKED_CAST")
26+
override fun onMapRead(map: Map<*, *>): PersistentMap<*, *> {
27+
return (map as Map<Any, Any>).toPersistentHashMap()
2528
}
2629

2730
@Suppress("UNCHECKED_CAST")
28-
override fun onMapRead(map: Map<*, *>): PersistentMap<*, *> {
31+
override fun onMapCopy(map: Map<*, *>): PersistentMap<*, *> {
2932
return (map as Map<Any, Any>).toPersistentHashMap()
3033
}
3134
}

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/PersistentHashSetSerializer.kt

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,27 @@ import org.apache.fory.serializer.collection.CollectionSerializer
88

99
/** A Fory [CollectionSerializer] for [PersistentSet] (Hash implementation). */
1010
internal class PersistentHashSetSerializer(fory: Fory, type: Class<PersistentSet<*>>) :
11-
CollectionSerializer<PersistentSet<*>>(fory, type) {
11+
CollectionSerializer<PersistentSet<*>>(fory, type, true) {
1212

13-
override fun write(buffer: MemoryBuffer, value: PersistentSet<*>) {
14-
fory.writeRef(buffer, HashSet(value))
13+
override fun newCollection(buffer: MemoryBuffer): MutableCollection<*> {
14+
val numElements = buffer.readVarUint32Small7()
15+
setNumElements(numElements)
16+
val set = HashSet<Any?>(numElements)
17+
refResolver.reference(set)
18+
return set
1519
}
1620

17-
@Suppress("UNCHECKED_CAST")
18-
override fun read(buffer: MemoryBuffer): PersistentSet<*> {
19-
val collection = fory.readRef(buffer) as Collection<Any>
20-
return collection.toPersistentHashSet()
21+
override fun newCollection(collection: Collection<*>): MutableCollection<*> {
22+
return HashSet<Any?>(collection.size)
2123
}
2224

23-
override fun newCollection(buffer: MemoryBuffer): MutableCollection<*> {
24-
return HashSet<Any>()
25+
@Suppress("UNCHECKED_CAST")
26+
override fun onCollectionRead(collection: Collection<*>): PersistentSet<*> {
27+
return (collection as Collection<Any>).toPersistentHashSet()
2528
}
2629

2730
@Suppress("UNCHECKED_CAST")
28-
override fun onCollectionRead(collection: Collection<*>): PersistentSet<*> {
31+
fun onCollectionCopy(collection: Collection<*>): PersistentSet<*> {
2932
return (collection as Collection<Any>).toPersistentHashSet()
3033
}
3134
}

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/PersistentOrderedMapSerializer.kt

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,27 @@ import org.apache.fory.serializer.collection.MapSerializer
88

99
/** A Fory [MapSerializer] for [PersistentMap] (Ordered implementation). */
1010
internal class PersistentOrderedMapSerializer(fory: Fory, type: Class<PersistentMap<*, *>>) :
11-
MapSerializer<PersistentMap<*, *>>(fory, type) {
11+
MapSerializer<PersistentMap<*, *>>(fory, type, true) {
1212

13-
override fun write(buffer: MemoryBuffer, value: PersistentMap<*, *>) {
14-
fory.writeRef(buffer, LinkedHashMap(value))
13+
override fun newMap(buffer: MemoryBuffer): MutableMap<*, *> {
14+
val numElements = buffer.readVarUint32Small7()
15+
setNumElements(numElements)
16+
val map = LinkedHashMap<Any?, Any?>(numElements)
17+
refResolver.reference(map)
18+
return map
1519
}
1620

17-
@Suppress("UNCHECKED_CAST")
18-
override fun read(buffer: MemoryBuffer): PersistentMap<*, *> {
19-
val map = fory.readRef(buffer) as Map<Any, Any>
20-
return map.toPersistentMap()
21+
override fun newMap(map: Map<*, *>): MutableMap<*, *> {
22+
return LinkedHashMap<Any?, Any?>(map.size)
2123
}
2224

23-
override fun newMap(buffer: MemoryBuffer): MutableMap<*, *> {
24-
return LinkedHashMap<Any, Any>()
25+
@Suppress("UNCHECKED_CAST")
26+
override fun onMapRead(map: Map<*, *>): PersistentMap<*, *> {
27+
return (map as Map<Any, Any>).toPersistentMap()
2528
}
2629

2730
@Suppress("UNCHECKED_CAST")
28-
override fun onMapRead(map: Map<*, *>): PersistentMap<*, *> {
29-
// toPersistentMap() on a LinkedHashMap preserves order in kotlinx-collections-immutable
31+
override fun onMapCopy(map: Map<*, *>): PersistentMap<*, *> {
3032
return (map as Map<Any, Any>).toPersistentMap()
3133
}
3234
}

util/src/main/kotlin/com/caplin/integration/datasourcex/util/serialization/fory/PersistentOrderedSetSerializer.kt

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,27 @@ import org.apache.fory.serializer.collection.CollectionSerializer
88

99
/** A Fory [CollectionSerializer] for [PersistentSet] (Ordered implementation). */
1010
internal class PersistentOrderedSetSerializer(fory: Fory, type: Class<PersistentSet<*>>) :
11-
CollectionSerializer<PersistentSet<*>>(fory, type) {
11+
CollectionSerializer<PersistentSet<*>>(fory, type, true) {
1212

13-
override fun write(buffer: MemoryBuffer, value: PersistentSet<*>) {
14-
fory.writeRef(buffer, LinkedHashSet(value))
13+
override fun newCollection(buffer: MemoryBuffer): MutableCollection<*> {
14+
val numElements = buffer.readVarUint32Small7()
15+
setNumElements(numElements)
16+
val set = LinkedHashSet<Any?>(numElements)
17+
refResolver.reference(set)
18+
return set
1519
}
1620

17-
@Suppress("UNCHECKED_CAST")
18-
override fun read(buffer: MemoryBuffer): PersistentSet<*> {
19-
val collection = fory.readRef(buffer) as Collection<Any>
20-
return collection.toPersistentSet()
21+
override fun newCollection(collection: Collection<*>): MutableCollection<*> {
22+
return LinkedHashSet<Any?>(collection.size)
2123
}
2224

23-
override fun newCollection(buffer: MemoryBuffer): MutableCollection<*> {
24-
return LinkedHashSet<Any>()
25+
@Suppress("UNCHECKED_CAST")
26+
override fun onCollectionRead(collection: Collection<*>): PersistentSet<*> {
27+
return (collection as Collection<Any>).toPersistentSet()
2528
}
2629

2730
@Suppress("UNCHECKED_CAST")
28-
override fun onCollectionRead(collection: Collection<*>): PersistentSet<*> {
31+
fun onCollectionCopy(collection: Collection<*>): PersistentSet<*> {
2932
return (collection as Collection<Any>).toPersistentSet()
3033
}
3134
}

0 commit comments

Comments
 (0)