@@ -210,6 +210,7 @@ internal unsafe void ConsumeDirect(byte* record, int recordLength, long currentA
210210 ValidateSublogIndex ( physicalSublogIdx ) ;
211211 replicationManager . SetSublogReplicationOffset ( physicalSublogIdx , currentAddress ) ;
212212 var ptr = record ;
213+ var replicationOffset = currentAddress ;
213214 // logger?.LogError("[{physicalSublogIdx}] = {currentAddress} -> {nextAddress}", physicalSublogIdx, currentAddress, nextAddress);
214215 while ( ptr < record + recordLength )
215216 {
@@ -224,7 +225,7 @@ internal unsafe void ConsumeDirect(byte* record, int recordLength, long currentA
224225 if ( isCheckpointStart )
225226 {
226227 // This is safe to be updated in parallel given that each sublog replay taks will update its own slot with corresponding address of the checkpoint marker
227- replicationManager . ReplicationCheckpointStartOffset [ physicalSublogIdx ] = replicationManager . GetSublogReplicationOffset ( physicalSublogIdx ) ;
228+ replicationManager . ReplicationCheckpointStartOffset [ physicalSublogIdx ] = replicationOffset ;
228229 }
229230 entryLength += TsavoriteLog . UnsafeAlign ( payloadLength ) ;
230231 }
@@ -240,11 +241,13 @@ internal unsafe void ConsumeDirect(byte* record, int recordLength, long currentA
240241 entryLength += TsavoriteLog . UnsafeAlign ( - payloadLength ) ;
241242 }
242243 ptr += entryLength ;
243- replicationManager . IncrementSublogReplicationOffset ( physicalSublogIdx , entryLength ) ;
244+ replicationOffset += entryLength ;
244245 }
245246 // logger?.LogError("[{physicalSublogIdx}] = {currentAddress} -> {nextAddress}", physicalSublogIdx, currentAddress, nextAddress);
246247
247- if ( replicationManager . GetSublogReplicationOffset ( physicalSublogIdx ) != nextAddress )
248+ replicationManager . SetSublogReplicationOffset ( physicalSublogIdx , replicationOffset ) ;
249+
250+ if ( replicationOffset != nextAddress )
248251 {
249252 logger ? . LogError ( "ReplicaReplayTask.Consume NextAddress Mismatch sublogIdx: {sublogIdx}; recordLength:{recordLength}; currentAddress:{currentAddress}; nextAddress:{nextAddress}; replicationOffset:{ReplicationOffset}" , physicalSublogIdx , recordLength , currentAddress , nextAddress , replicationManager . ReplicationOffset [ physicalSublogIdx ] ) ;
250253 throw new GarnetException ( "Failed validating integrity of replay" , LogLevel . Warning , clientResponse : false ) ;
0 commit comments