Skip to content

Commit 4e34a9d

Browse files
committed
Align ALP wire format with C++ Arrow for interop compatibility
Switch encode/decode from the division-based formula to multiply-by-reciprocal using separate POW10_NEGATIVE arrays, matching C++ Arrow's approach:

- Encode: fastRound(value * POW10[e] * POW10_NEGATIVE[f])
- Decode: encoded * POW10[f] * POW10_NEGATIVE[e]

Add fastRound helpers with sign branching for correct rounding of negative values. Remove the version byte from the page header (8 -> 7 bytes). Empty pages now emit a 7-byte header with numElements=0. Update all hand-crafted binary tests to match the new header format, and add comprehensive end-to-end tests for overflow boundaries, large-scale data, preset caching, and NaN bit-pattern preservation.
1 parent 6d65eaa commit 4e34a9d

7 files changed

Lines changed: 390 additions & 196 deletions

File tree

parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,9 @@ private AlpConstants() {
3838
}
3939

4040
// Page header fields
41-
public static final int ALP_VERSION = 1;
4241
public static final int ALP_COMPRESSION_MODE = 0;
4342
public static final int ALP_INTEGER_ENCODING_FOR = 0;
44-
public static final int ALP_HEADER_SIZE = 8;
43+
public static final int ALP_HEADER_SIZE = 7;
4544

4645
public static final int DEFAULT_VECTOR_SIZE = 1024;
4746
public static final int DEFAULT_VECTOR_SIZE_LOG = 10;
@@ -67,12 +66,26 @@ private AlpConstants() {
6766
public static final int FLOAT_FOR_INFO_SIZE = 5; // frame_of_reference(4) + bit_width(1)
6867
public static final int DOUBLE_FOR_INFO_SIZE = 9; // frame_of_reference(8) + bit_width(1)
6968

69+
// POWERS_OF_TEN: positive powers used for scaling up during encode/decode.
70+
// Encode: fastRound(value * POW10[e] * POW10_NEGATIVE[f])
71+
// Decode: encoded * POW10[f] * POW10_NEGATIVE[e]
7072
static final float[] FLOAT_POW10 = {1e0f, 1e1f, 1e2f, 1e3f, 1e4f, 1e5f, 1e6f, 1e7f, 1e8f, 1e9f, 1e10f};
7173

7274
static final double[] DOUBLE_POW10 = {
7375
1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12, 1e13, 1e14, 1e15, 1e16, 1e17, 1e18
7476
};
7577

78+
// NEGATIVE_POWERS_OF_TEN: reciprocals used for scaling down (multiply-by-reciprocal).
79+
// Using separate negative-power arrays instead of division ensures C++ wire compatibility.
80+
static final float[] FLOAT_POW10_NEGATIVE = {
81+
1e0f, 1e-1f, 1e-2f, 1e-3f, 1e-4f, 1e-5f, 1e-6f, 1e-7f, 1e-8f, 1e-9f, 1e-10f
82+
};
83+
84+
static final double[] DOUBLE_POW10_NEGATIVE = {
85+
1e0, 1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8, 1e-9, 1e-10, 1e-11, 1e-12, 1e-13, 1e-14, 1e-15, 1e-16,
86+
1e-17, 1e-18
87+
};
88+
7689
static final int FLOAT_NEGATIVE_ZERO_BITS = 0x80000000;
7790
static final long DOUBLE_NEGATIVE_ZERO_BITS = 0x8000000000000000L;
7891

parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java

Lines changed: 40 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,13 @@
2727
* then applying Frame of Reference encoding and bit-packing.
2828
* Values that cannot be losslessly converted are stored as exceptions.
2929
*
30-
* <p>Encoding formula: encoded = round(value * 10^(exponent - factor))
31-
* <p>Decoding formula: value = encoded / 10^(exponent - factor)
30+
* <p>Encoding formula: encoded = fastRound(value * POW10[e] * POW10_NEGATIVE[f])
31+
* <p>Decoding formula: value = encoded * POW10[f] * POW10_NEGATIVE[e]
32+
*
33+
* <p>The order of operations is critical for IEEE 754 correctness. Both formulas must
34+
* be evaluated as single expressions — storing the intermediate multiplication result
35+
* in a variable before the second multiply changes IEEE 754 rounding and produces extra
36+
* exceptions. Uses multiply-by-reciprocal (via POW10_NEGATIVE) for C++ wire compatibility.
3237
*
3338
* <p>Exception conditions:
3439
* <ul>
@@ -41,26 +46,15 @@
4146
*/
4247
final class AlpEncoderDecoder {
4348

49+
private static final double ENCODING_UPPER_LIMIT = 9223372036854774784.0;
50+
private static final double ENCODING_LOWER_LIMIT = -9223372036854774784.0;
51+
private static final float FLOAT_ENCODING_UPPER_LIMIT = 2147483520.0f;
52+
private static final float FLOAT_ENCODING_LOWER_LIMIT = -2147483520.0f;
53+
4454
private AlpEncoderDecoder() {
4555
// Utility class
4656
}
4757

48-
static float getFloatMultiplier(int exponent, int factor) {
49-
float multiplier = FLOAT_POW10[exponent];
50-
if (factor > 0) {
51-
multiplier /= FLOAT_POW10[factor];
52-
}
53-
return multiplier;
54-
}
55-
56-
static double getDoubleMultiplier(int exponent, int factor) {
57-
double multiplier = DOUBLE_POW10[exponent];
58-
if (factor > 0) {
59-
multiplier /= DOUBLE_POW10[factor];
60-
}
61-
return multiplier;
62-
}
63-
6458
/** NaN, Inf, and -0.0 can never be encoded regardless of exponent/factor. */
6559
static boolean isFloatException(float value) {
6660
if (Float.isNaN(value)) {
@@ -77,27 +71,17 @@ static boolean isFloatException(float value, int exponent, int factor) {
7771
if (isFloatException(value)) {
7872
return true;
7973
}
80-
float multiplier = getFloatMultiplier(exponent, factor);
81-
float scaled = value * multiplier;
82-
if (scaled > Integer.MAX_VALUE || scaled < Integer.MIN_VALUE) {
74+
// Check before rounding: overflow or non-finite after scaling
75+
float scaled = value * FLOAT_POW10[exponent] * FLOAT_POW10_NEGATIVE[factor];
76+
if (!Float.isFinite(scaled) || scaled > FLOAT_ENCODING_UPPER_LIMIT || scaled < FLOAT_ENCODING_LOWER_LIMIT) {
8377
return true;
8478
}
8579
int encoded = encodeFloat(value, exponent, factor);
8680
float decoded = decodeFloat(encoded, exponent, factor);
8781
return Float.floatToRawIntBits(value) != Float.floatToRawIntBits(decoded);
8882
}
8983

90-
/** Encode: round(value * 10^exponent / 10^factor) */
91-
static int encodeFloat(float value, int exponent, int factor) {
92-
return fastRoundFloat(value * getFloatMultiplier(exponent, factor));
93-
}
94-
95-
/** Decode: encoded / 10^exponent * 10^factor */
96-
static float decodeFloat(int encoded, int exponent, int factor) {
97-
return encoded / getFloatMultiplier(exponent, factor);
98-
}
99-
100-
// Uses the 2^22+2^23 magic-number trick to round without branching on the FPU.
84+
/** Round float to nearest integer using magic-number trick with sign branching. */
10185
static int fastRoundFloat(float value) {
10286
if (value >= 0) {
10387
return (int) ((value + MAGIC_FLOAT) - MAGIC_FLOAT);
@@ -106,6 +90,16 @@ static int fastRoundFloat(float value) {
10690
}
10791
}
10892

93+
/** Encode: fastRound(value * POW10[e] * POW10_NEGATIVE[f]) — single expression. */
94+
static int encodeFloat(float value, int exponent, int factor) {
95+
return fastRoundFloat(value * FLOAT_POW10[exponent] * FLOAT_POW10_NEGATIVE[factor]);
96+
}
97+
98+
/** Decode: encoded * POW10[f] * POW10_NEGATIVE[e] — single expression. */
99+
static float decodeFloat(int encoded, int exponent, int factor) {
100+
return encoded * FLOAT_POW10[factor] * FLOAT_POW10_NEGATIVE[exponent];
101+
}
102+
109103
static boolean isDoubleException(double value) {
110104
if (Double.isNaN(value)) {
111105
return true;
@@ -120,25 +114,17 @@ static boolean isDoubleException(double value, int exponent, int factor) {
120114
if (isDoubleException(value)) {
121115
return true;
122116
}
123-
double multiplier = getDoubleMultiplier(exponent, factor);
124-
double scaled = value * multiplier;
125-
if (scaled > Long.MAX_VALUE || scaled < Long.MIN_VALUE) {
117+
// Check before rounding: overflow or non-finite after scaling
118+
double scaled = value * DOUBLE_POW10[exponent] * DOUBLE_POW10_NEGATIVE[factor];
119+
if (!Double.isFinite(scaled) || scaled > ENCODING_UPPER_LIMIT || scaled < ENCODING_LOWER_LIMIT) {
126120
return true;
127121
}
128122
long encoded = encodeDouble(value, exponent, factor);
129123
double decoded = decodeDouble(encoded, exponent, factor);
130124
return Double.doubleToRawLongBits(value) != Double.doubleToRawLongBits(decoded);
131125
}
132126

133-
static long encodeDouble(double value, int exponent, int factor) {
134-
return fastRoundDouble(value * getDoubleMultiplier(exponent, factor));
135-
}
136-
137-
static double decodeDouble(long encoded, int exponent, int factor) {
138-
return encoded / getDoubleMultiplier(exponent, factor);
139-
}
140-
141-
// Same trick but with 2^51+2^52 for double precision.
127+
/** Round double to nearest integer using magic-number trick with sign branching. */
142128
static long fastRoundDouble(double value) {
143129
if (value >= 0) {
144130
return (long) ((value + MAGIC_DOUBLE) - MAGIC_DOUBLE);
@@ -147,6 +133,16 @@ static long fastRoundDouble(double value) {
147133
}
148134
}
149135

136+
/** Encode: fastRound(value * POW10[e] * POW10_NEGATIVE[f]) — single expression. */
137+
static long encodeDouble(double value, int exponent, int factor) {
138+
return fastRoundDouble(value * DOUBLE_POW10[exponent] * DOUBLE_POW10_NEGATIVE[factor]);
139+
}
140+
141+
/** Decode: encoded * POW10[f] * POW10_NEGATIVE[e] — single expression. */
142+
static double decodeDouble(long encoded, int exponent, int factor) {
143+
return encoded * DOUBLE_POW10[factor] * DOUBLE_POW10_NEGATIVE[exponent];
144+
}
145+
150146
/** Number of bits needed to represent maxDelta as an unsigned value. */
151147
static int bitWidthForInt(int maxDelta) {
152148
if (maxDelta == 0) {

parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReader.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* <pre>
3535
* ┌─────────┬──────────────────────┬──────────────┬──────────────┬─────┐
3636
* │ Header │ Offset Array │ Vector 0 │ Vector 1 │ ... │
37-
* │ 8 bytes │ 4B &times; numVectors │ (interleaved)│ (interleaved)│ │
37+
* │ 7 bytes │ 4B &times; numVectors │ (interleaved)│ (interleaved)│ │
3838
* └─────────┴──────────────────────┴──────────────┴──────────────┴─────┘
3939
* </pre>
4040
*
@@ -63,15 +63,11 @@ abstract class AlpValuesReader extends ValuesReader {
6363
public void initFromPage(int valuesCount, ByteBufferInputStream stream)
6464
throws ParquetDecodingException, IOException {
6565
ByteBuffer headerBuf = stream.slice(ALP_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN);
66-
int version = headerBuf.get() & 0xFF;
6766
int compressionMode = headerBuf.get() & 0xFF;
6867
int integerEncoding = headerBuf.get() & 0xFF;
6968
int logVectorSize = headerBuf.get() & 0xFF;
7069
int numElements = headerBuf.getInt();
7170

72-
if (version != ALP_VERSION) {
73-
throw new ParquetDecodingException("Unsupported ALP version: " + version + ", expected " + ALP_VERSION);
74-
}
7571
if (compressionMode != ALP_COMPRESSION_MODE) {
7672
throw new ParquetDecodingException("Unsupported ALP compression mode: " + compressionMode);
7773
}

0 commit comments

Comments
 (0)