Location: crates/scanner-engine/src/engine/stream_decode.rs
The stream_decode module implements incremental, streaming decoding of transform output (Base64, URL-encoded data) while simultaneously scanning for pattern matches. Instead of decoding entire encoded spans into memory before scanning, this module:
- Decodes transformations in chunks via streaming decoder callbacks
- Maintains decoded bytes in a ring buffer to avoid full materialization
- Emits candidate pattern match windows via Vectorscan stream databases
- Re-decodes windows on-demand when they fall outside the ring buffer
- Falls back to full decode only on `force_full` safety violations; truncation/error paths abort the current span
The module provides two primary paths:
- Streaming path (`decode_stream_and_scan`): preferred, memory-efficient streaming decode
- Fallback path (`decode_span_fallback`): full decode when streaming is unsafe or unavailable
- Batch approach: Decodes entire encoded spans into temporary buffers, requiring O(n) memory for decoded output
- Streaming approach: Maintains a fixed-size ring buffer, reducing peak memory from unbounded to bounded
- Streaming processes decoded data as it arrives, enabling early pattern detection
- Eliminates waiting for full decode completion before scanning can begin
- Enables responsive handling of large encoded payloads
- Vectorscan provides native stream mode APIs (`scan_stream`) designed for incremental input
- The streaming path leverages this, maintaining per-pattern state across chunks
- Reduces duplicate work across overlapping chunks
- Per-transform and per-scan decode budget checks on every chunk
- Prevents runaway decode operations that would saturate resources
- Budget exhaustion aborts the current stream decode and drops that span's staged output
- For `Gate::AnchorsInDecoded` Base64 transforms, a decoded-space gate stream is scanned alongside the main stream
- Final gate enforcement can discard staged output when no anchor evidence is found
The streaming decoder processes encoded input through a callback-based pattern:
```rust
// From transform.rs
pub(super) fn stream_decode(
    tc: &TransformConfig,
    input: &[u8],
    on_bytes: impl FnMut(&[u8]) -> ControlFlow<()>,
) -> Result<(), ()>
```
Key characteristics:
- Chunks are emitted incrementally as the decoder progresses
- Chunk boundaries depend on the transformation (Base64, URL-percent decoding)
- Chunk sizes vary based on the transformation engine's internal buffering
- Callback returns `ControlFlow::Break(())` to stop early or `ControlFlow::Continue(())` to continue
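The callback contract above can be illustrated with a toy streaming decoder. This is a sketch only: `stream_decode_percent`, its fixed chunk size, and the hex handling are invented for the example and are not the module's real decoder, which dispatches on `TransformConfig`.

```rust
use std::ops::ControlFlow;

/// Toy percent-decoder that emits decoded bytes in small chunks and lets the
/// callback stop the stream early, mirroring the `stream_decode` contract.
fn stream_decode_percent(
    input: &[u8],
    mut on_bytes: impl FnMut(&[u8]) -> ControlFlow<()>,
) -> Result<(), ()> {
    const CHUNK: usize = 8; // emit decoded output in small chunks
    let mut buf = Vec::with_capacity(CHUNK);
    let mut i = 0;
    while i < input.len() {
        let b = match input[i] {
            b'%' => {
                // Malformed escapes are a decode error, reported as Err(()).
                let hi = *input.get(i + 1).ok_or(())?;
                let lo = *input.get(i + 2).ok_or(())?;
                let hex = |c: u8| (c as char).to_digit(16).ok_or(());
                i += 3;
                (hex(hi)? * 16 + hex(lo)?) as u8
            }
            other => {
                i += 1;
                other
            }
        };
        buf.push(b);
        if buf.len() == CHUNK {
            if let ControlFlow::Break(()) = on_bytes(&buf) {
                return Ok(()); // early stop requested by the caller
            }
            buf.clear();
        }
    }
    if !buf.is_empty() {
        if let ControlFlow::Break(()) = on_bytes(&buf) {
            return Ok(());
        }
    }
    Ok(())
}
```

The caller accumulates or scans each chunk as it arrives; nothing forces the full decoded output to exist at once.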
```rust
// From stream_decode.rs
scratch.decode_ring.push(chunk);
```
The ring buffer operates as a circular buffer:
- Each incoming chunk is pushed into the ring
- Decoded bytes are tracked with monotonically increasing offsets (`decoded_offset`)
- Old bytes are discarded when the ring wraps and overwrites them
- Ring size is bounded by `scratch.decode_ring` capacity
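The offset bookkeeping can be sketched with a minimal ring. `ByteRingSketch` and its methods are hypothetical stand-ins for `ByteRing`; the real type lives elsewhere in the engine:

```rust
/// Bounded decode ring keyed by logical stream offsets: `push` appends
/// decoded bytes, overwriting the oldest once full; `copy_range` succeeds
/// only while the requested logical range is still resident.
struct ByteRingSketch {
    buf: Vec<u8>, // circular storage, at most `cap` bytes
    cap: usize,
    end: u64, // logical offset one past the newest byte
}

impl ByteRingSketch {
    fn new(cap: usize) -> Self {
        Self { buf: Vec::with_capacity(cap), cap, end: 0 }
    }

    fn push(&mut self, chunk: &[u8]) {
        for &b in chunk {
            if self.buf.len() < self.cap {
                self.buf.push(b);
            } else {
                // Overwrite the oldest byte in place.
                let idx = (self.end % self.cap as u64) as usize;
                self.buf[idx] = b;
            }
            self.end += 1;
        }
    }

    /// Oldest logical offset still resident in the ring.
    fn start(&self) -> u64 {
        self.end - self.buf.len() as u64
    }

    /// Copy logical range [lo, hi) into `out`; false if already overwritten.
    fn copy_range(&self, lo: u64, hi: u64, out: &mut Vec<u8>) -> bool {
        if lo < self.start() || hi > self.end || lo > hi {
            return false;
        }
        out.clear();
        for off in lo..hi {
            out.push(self.buf[(off % self.cap as u64) as usize]);
        }
        true
    }
}
```

A failed `copy_range` is exactly the situation where the module falls back to `redecode_window_into`.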
Two budget layers are enforced:
- Per-transform budget (`tc.max_decoded_bytes`):
  ```rust
  let max_out = tc.max_decoded_bytes.min(remaining);
  ```
  - Limits total decoded output for a single transform
  - Configured per transform
- Per-scan budget (`max_total_decode_output_bytes`):
  ```rust
  let remaining = self.tuning.max_total_decode_output_bytes
      .saturating_sub(scratch.total_decode_output_bytes);
  if remaining == 0 {
      return;
  }
  ```
  - Global limit across all transforms in a single scan pass
  - Prevents unbounded decode work across multiple transforms
Budget checking occurs on each chunk:
- Local chunk count tracked in `local_out`
- Global total updated incrementally
- Truncation triggered if limits exceeded
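The two layers compose into a single per-chunk admission check. The sketch below is illustrative: `Budgets` and `allowed` are invented names, though the field names mirror the ones described above.

```rust
/// The two budget layers checked per chunk.
struct Budgets {
    max_decoded_bytes: usize,             // per-transform cap
    max_total_decode_output_bytes: usize, // per-scan global cap
}

/// How many bytes of `chunk_len` may still be emitted, given bytes already
/// produced locally (this transform) and globally (this scan pass).
/// Zero means the stream decode must truncate.
fn allowed(b: &Budgets, local_out: usize, total_out: usize, chunk_len: usize) -> usize {
    let per_transform = b.max_decoded_bytes.saturating_sub(local_out);
    let per_scan = b.max_total_decode_output_bytes.saturating_sub(total_out);
    chunk_len.min(per_transform).min(per_scan)
}
```

Whichever budget is tighter binds; `saturating_sub` keeps the arithmetic safe once a budget is already exhausted.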
The module maintains strict invariants to ensure correctness:
- Monotonic decoded offsets:
  ```rust
  decoded_offset = decoded_offset.saturating_add(chunk.len() as u64);
  ```
  - Offsets only increase; they never reset or go backward
  - Enables predictable window scheduling
- Timing wheel for window scheduling:
  ```rust
  match scratch.pending_windows.push(pending.hi, pending) {
      Ok(PushOutcome::Scheduled) => {}
      Ok(PushOutcome::Ready(win)) => {
          // Window is immediately ready for processing
      }
  }
  ```
  - Windows are keyed by their end offset (`hi`) in a timing wheel
  - Windows only process once the decoded offset reaches their `hi`
  - Ensures all bytes are decoded before window materialization
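The release discipline can be demonstrated without the timing wheel itself. The sketch below uses a binary min-heap keyed on `hi` as a stand-in; `Window` and `WindowScheduler` are invented names:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

#[derive(Debug, PartialEq)]
struct Window {
    lo: u64,
    hi: u64,
}

/// Heap-based stand-in for the timing wheel: windows are released only once
/// the decoded stream has reached their end offset, so every byte of the
/// window is guaranteed to have been decoded.
struct WindowScheduler {
    pending: BinaryHeap<Reverse<(u64, u64)>>, // (hi, lo), min-heap on hi
}

impl WindowScheduler {
    fn new() -> Self {
        Self { pending: BinaryHeap::new() }
    }

    fn schedule(&mut self, w: Window) {
        self.pending.push(Reverse((w.hi, w.lo)));
    }

    /// Pop every window whose end offset is <= the current decoded offset.
    fn advance(&mut self, decoded_offset: u64) -> Vec<Window> {
        let mut ready = Vec::new();
        while let Some(&Reverse((hi, lo))) = self.pending.peek() {
            if hi > decoded_offset {
                break;
            }
            self.pending.pop();
            ready.push(Window { lo, hi });
        }
        ready
    }
}
```

A real timing wheel buckets keys into slots for O(1) insertion, but the observable ordering guarantee is the same.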
- Slab append-only semantics:
  ```rust
  let slab_start = scratch.slab.buf.len();
  // ... streaming work ...
  // On abort or fallback:
  scratch.slab.buf.truncate(slab_start);
  ```
  - Decoded bytes are appended to the slab during streaming
  - On fallback or abort, the slab is truncated to its pre-decode state
  - Ensures atomicity of the decode operation
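The append-then-truncate discipline is easy to show in isolation. `DecodeSlabSketch` and `decode_into_slab` below are illustrative names, not the module's types:

```rust
/// Append-only slab discipline: record the length before streaming, append
/// chunks, and roll back with a single truncate on abort.
struct DecodeSlabSketch {
    buf: Vec<u8>,
}

/// Returns true if the decoded chunks were committed, false if rolled back.
fn decode_into_slab(slab: &mut DecodeSlabSketch, chunks: &[&[u8]], abort: bool) -> bool {
    let slab_start = slab.buf.len(); // rollback point
    for c in chunks {
        slab.buf.extend_from_slice(c);
    }
    if abort {
        slab.buf.truncate(slab_start); // cheap, atomic rollback
        return false;
    }
    true
}
```

Because nothing else writes into the slab mid-decode, the truncate restores exactly the pre-decode state regardless of how many chunks were staged.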
- Per-rule window cap:
  ```rust
  let max_hits = self.tuning.max_windows_per_rule_variant as u32;
  if *hit > max_hits {
      force_full = true; // Fallback triggered
  }
  ```
  - Limits windows per rule/variant to prevent unbounded work
  - Exceeding the cap forces fallback to full decode
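The cap check amounts to a counter per (rule, variant) slot. A minimal sketch, with `note_window` as an invented helper name:

```rust
/// Count one emitted window for a (rule, variant) slot; returns true when
/// the cap is exceeded, i.e. the caller should set `force_full`.
fn note_window(hit_counts: &mut [u32], slot: usize, max_hits: u32) -> bool {
    hit_counts[slot] += 1;
    hit_counts[slot] > max_hits
}
```

Tracking touched slots separately (as `stream_hit_touched` does) lets the counters be reset without zeroing the whole array.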
Ring buffer state:
```rust
scratch.decode_ring // Circular buffer of decoded bytes
decoded_offset      // Current position in logical decoded stream
```
Window scheduling:
```rust
scratch.pending_windows    // Timing wheel for scheduled windows
scratch.stream_hit_counts  // Per-rule-variant window counter
scratch.stream_hit_touched // Touched indices for efficient reset
```
Vectorscan stream state:
```rust
stream            // Vectorscan stream handle
vs_scratch        // Vectorscan scratch space
vs_stream_matches // Matches emitted by callback
```
UTF-16 variant tracking:
```rust
utf16_stream         // Optional UTF-16 stream handle
utf16_stream_scratch // UTF-16-specific scratch space
utf16_stream_ctx     // UTF-16 match context
decoded_has_nul      // Flag indicating NUL bytes (UTF-16 marker)
```
Gate state:
```rust
gate_stream    // Optional gate database stream
gate_scratch   // Gate scratch space
gate_hit       // Flag indicating gate match
gate_db_active // Whether gate is currently active
gate_db_failed // Whether gate failed to open/scan
```
Streaming achieves dedupe without materializing the full buffer using an AEGIS-128L MAC:
```rust
let key = [0u8; 16];
let mut mac = aegis::aegis128l::Aegis128LMac::<16>::new(&key);
// Per chunk:
mac.update(chunk);
// Final:
let h = mix_root_hint_hash(u128::from_le_bytes(mac.finalize()), &root_hint);
if !scratch.seen.insert(h) {
    // Duplicate detected; discard
}
```
- Incremental MAC computation over streaming chunks
- The final hash is mixed with `root_hint` so identical decoded bytes at different root spans are treated independently
- The mixed hash is checked against the seen set for deduplication
- No need to buffer the entire decoded output for hashing
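The essential property is that the hash state is updated chunk by chunk and is invariant to chunk boundaries. The sketch below demonstrates this with FNV-1a as a stand-in for the AEGIS-128L MAC (the real module uses the `aegis` crate); `StreamHash` and its hint-mixing are invented for illustration:

```rust
/// Streaming dedupe hash: state updated per chunk, no full buffer needed.
/// FNV-1a here is a stand-in for the AEGIS-128L MAC; it is chunk-boundary
/// invariant by construction, which is the property dedupe relies on.
struct StreamHash(u64);

impl StreamHash {
    fn new() -> Self {
        Self(0xcbf29ce484222325) // FNV-1a offset basis
    }

    fn update(&mut self, chunk: &[u8]) {
        for &b in chunk {
            self.0 ^= b as u64;
            self.0 = self.0.wrapping_mul(0x100000001b3); // FNV prime
        }
    }

    /// Mix in the root hint so identical decoded bytes at different root
    /// spans produce distinct dedupe keys.
    fn finalize(self, root_hint: u64) -> u64 {
        self.0 ^ root_hint.wrapping_mul(0x9e3779b97f4a7c15)
    }
}
```

Identical decoded streams fed with different chunkings hash identically; only the root hint distinguishes them.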
Stream decode uses TransformConfig from the transform module:
```rust
pub(super) fn decode_span_fallback(
    &self,
    tc: &TransformConfig,
    transform_idx: usize,
    enc_ref: &EncRef,
    enc: &[u8],
    ...
)
```
Key config fields:
- `tc.id`: Transform type (Base64, UrlPercent, etc.)
- `tc.gate`: Gate mode (e.g., `Gate::AnchorsInDecoded`)
- `tc.min_len`: Minimum encoded length to process
- `tc.max_decoded_bytes`: Per-transform decode budget
- `tc.plus_to_space`: URL decode flag
- `tc.max_spans_per_buffer`: Span stream limits
```rust
let res = stream_decode(tc, encoded, |chunk| {
    // Process chunk callbacks here
    // ...
});
```
Transforms supported:
- URL-Percent (`TransformId::UrlPercent`):
  - Calls `stream_decode_url_percent()` from transform.rs
  - Supports the `plus_to_space` flag
- Base64 (`TransformId::Base64`):
  - Calls `stream_decode_base64()` from transform.rs
  - Validates base64 alphabet and padding
Streaming detectors for child transforms emit spans as decoded bytes arrive:
```rust
for (tidx, tcfg) in self.transforms.iter().enumerate() {
    let state = match tcfg.id {
        TransformId::UrlPercent => SpanStreamState::Url(UrlSpanStream::new(tcfg)),
        TransformId::Base64 => SpanStreamState::Base64(Base64SpanStream::new(tcfg)),
    };
    scratch.span_streams.push(SpanStreamEntry {
        transform_idx: tidx,
        state,
        spans_emitted: 0,
        max_spans: tcfg.max_spans_per_buffer,
    });
}
```
Per-chunk span feeding:
```rust
match &mut entry.state {
    SpanStreamState::Url(state) => state.feed(chunk, chunk_start, &mut on_span),
    SpanStreamState::Base64(state) => state.feed(chunk, chunk_start, &mut on_span),
}
```
- `UrlSpanStream::feed()` detects percent-encoded sequences
- `Base64SpanStream::feed()` detects base64-like sequences
- Emitted spans become pending decode work items at the end of the stream
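A span detector must carry run state across chunk boundaries because an encoded span may straddle chunks. The sketch below is a simplified stand-in for `Base64SpanStream` (`Base64RunDetector` and its exact classification rules are invented); spans are reported as absolute `(lo, hi)` offsets:

```rust
/// Simplified base64-like span detector: tracks runs of base64-alphabet
/// bytes across chunk boundaries and emits (start, end) spans, in absolute
/// stream offsets, once a run of at least `min_len` bytes ends.
struct Base64RunDetector {
    min_len: usize,
    run_start: Option<u64>, // absolute offset where the current run began
}

impl Base64RunDetector {
    fn new(min_len: usize) -> Self {
        Self { min_len, run_start: None }
    }

    fn feed(&mut self, chunk: &[u8], chunk_start: u64, on_span: &mut impl FnMut(u64, u64)) {
        for (i, &b) in chunk.iter().enumerate() {
            let is_b64 = b.is_ascii_alphanumeric() || matches!(b, b'+' | b'/' | b'=');
            let off = chunk_start + i as u64;
            match (is_b64, self.run_start) {
                (true, None) => self.run_start = Some(off),
                (false, Some(start)) => {
                    if (off - start) as usize >= self.min_len {
                        on_span(start, off);
                    }
                    self.run_start = None;
                }
                _ => {}
            }
        }
    }

    /// Flush a run still open at end of stream.
    fn finish(&mut self, end: u64, on_span: &mut impl FnMut(u64, u64)) {
        if let Some(start) = self.run_start.take() {
            if (end - start) as usize >= self.min_len {
                on_span(start, end);
            }
        }
    }
}
```

Note the run survives the boundary between the two `feed` calls; per-chunk detectors without carried state would split or miss such spans.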
For `Gate::AnchorsInDecoded` Base64 transforms, a Vectorscan gate is applied:
```rust
if gate_enabled {
    if let Some(db) = self.vs_gate.as_ref() {
        // Open a gate stream
        gate_db_active = true;
        gate_stream = Some(stream);
        gate_scratch = Some(vs_gate_scratch);
    }
}
```
Per-chunk gate scanning:
```rust
if gate_db_active && gate_hit == 0 {
    if db.scan_stream(gstream, chunk, gscratch, gate_cb, ...).is_err() {
        gate_db_active = false;
        gate_db_failed = true;
    }
}
```
- Gate patterns are checked in decoded space
- Early exit once gate patterns match (anchors found)
- Fallback logic if the gate fails
Signature:
```rust
pub(super) fn decode_stream_and_scan(
    &self,
    vs_stream: &VsStreamDb,
    tc: &TransformConfig,
    transform_idx: usize,
    enc_ref: &EncRef,
    encoded: &[u8],
    step_id: StepId,
    root_hint: Option<Range<usize>>,
    root_hint_maps_encoded: bool,
    depth: usize,
    base_offset: u64,
    file_id: FileId,
    scratch: &mut ScanScratch,
)
```
Responsibilities:
- Stream-decode `encoded` in chunks
- Feed chunks to the Vectorscan stream database
- Collect windows from pattern matches
- Schedule windows in the timing wheel
- Materialize and process windows as they become ready
- Detect and emit child transform spans
- Handle UTF-16 scanning if enabled
- Apply gate filtering on Base64
Return behavior:
- Modifies `scratch` with findings and work items
- Falls back to `decode_span_fallback()` on safety violations
- Returns early on budget exhaustion or truncation
Signature:
```rust
pub(super) fn decode_span_fallback(
    &self,
    tc: &TransformConfig,
    transform_idx: usize,
    enc_ref: &EncRef,
    enc: &[u8],
    step_id: StepId,
    root_hint: Option<Range<usize>>,
    depth: usize,
    scratch: &mut ScanScratch,
)
```
Responsibilities:
- Fully decode `enc` into the slab
- Check length and budget constraints
- Apply gate filtering if enabled
- Dedupe decoded output via a 128-bit hash
- Enqueue as a batch `ScanBuf` work item
Return behavior:
- On dedupe match: truncates slab and returns early
- On decode error: returns early without enqueueing
- On success: enqueues for batch scanning
Signature:
```rust
pub(super) fn redecode_window_into(
    &self,
    tc: &TransformConfig,
    encoded: &[u8],
    lo: u64,
    hi: u64,
    max_out: usize,
    out: &mut Vec<u8>,
) -> bool
```
Responsibilities:
- Re-decode the window `[lo, hi)` from `encoded`
- Extract only bytes in the range
- Enforce the max output limit
- Clear and fill `out` with the window bytes
Returns:
- `true` if exactly `hi - lo` bytes were reconstructed
- `false` if the decode fails, truncates, or exceeds the limit
Usage context:
- Called when a window has fallen out of the ring buffer
- Enables lazy materialization of distant windows
- Triggers fallback if the re-decode fails
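The core of the contract is easy to model. In the sketch below the streaming decoder is abstracted as an iterator of decoded chunks (`redecode_window` is an invented name; the real function re-runs `stream_decode` over the encoded bytes):

```rust
/// Re-decode a window on demand: walk the decoded stream, keep only bytes
/// whose logical offsets fall in [lo, hi), and succeed only if exactly
/// hi - lo bytes were reconstructed within `max_out`.
fn redecode_window<'a>(
    chunks: impl Iterator<Item = &'a [u8]>,
    lo: u64,
    hi: u64,
    max_out: usize,
    out: &mut Vec<u8>,
) -> bool {
    out.clear();
    let mut off = 0u64;
    for chunk in chunks {
        for &b in chunk {
            if off >= hi {
                break;
            }
            if off >= lo {
                if out.len() == max_out {
                    return false; // window would exceed the output limit
                }
                out.push(b);
            }
            off += 1;
        }
        if off >= hi {
            break; // everything needed has been seen
        }
    }
    out.len() as u64 == hi - lo // exact reconstruction required
}
```

The strict "exactly `hi - lo` bytes" check is what lets the caller trust the materialized window; anything short means the stream truncated and the fallback path must take over.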
```rust
pub(super) struct PendingWindow {
    pub(super) hi: u64,          // Window end offset (key in timing wheel)
    pub(super) lo: u64,          // Window start offset
    pub(super) rule_id: u32,     // Rule that matched
    pub(super) variant: Variant, // Raw/UTF-16LE/UTF-16BE
    pub(super) anchor_hint: u64, // Anchor position hint
}
```
Role: Represents a pattern match window ready for rule evaluation
```rust
pub(super) struct SpanStreamEntry {
    pub(super) transform_idx: usize,   // Child transform index
    pub(super) state: SpanStreamState, // UrlSpanStream or Base64SpanStream
    pub(super) spans_emitted: usize,   // Count of spans emitted
    pub(super) max_spans: usize,       // Max allowed spans per buffer
}
```
Role: Tracks a streaming span detector for child transforms
```rust
pub(super) enum SpanStreamState {
    Url(UrlSpanStream),
    Base64(Base64SpanStream),
}
```
Role: Variant enumeration for URL and Base64 span detectors
```rust
pub struct ScanScratch {
    pub decode_ring: ByteRing,               // Circular buffer of decoded bytes
    pub pending_windows: TimingWheel<...>,   // Windows scheduled for processing
    pub stream_hit_counts: Vec<u32>,         // Per-rule-variant window counter
    pub stream_hit_touched: ScratchVec<u32>, // Touched indices for reset
    pub vs_stream_matches: Vec<...>,         // Matches from Vectorscan callback
    pub vs_stream_scratch: Option<...>,      // Vectorscan scratch reuse
    pub pending_spans: Vec<...>,             // Child transform spans
    pub span_streams: Vec<...>,              // Span detector instances
    pub window_bytes: Vec<u8>,               // Temporary window materialization
    pub total_decode_output_bytes: usize,    // Running total across transforms
    pub slab: DecodeSlab,                    // Appended decoded output
    pub seen: FixedSet128,                   // Dedupe hashes
    pub tmp_findings: Vec<...>,              // Temporary findings buffer
    pub findings_dropped: usize,             // Count of dropped findings
}
```
- Vectorscan stream database handle
- Provides `open_stream()`, `scan_stream()`, `close_stream()` methods
- Maintains per-pattern state and metadata

- Stream handle for ongoing Vectorscan scanning
- Passed to callbacks for incremental matching
- Closed after all chunks are processed

- Context passed to the Vectorscan callback
- Contains the pending match vector and metadata pointers
- Callback appends matches to `pending`

- UTF-16 variant of the match context
- Tracks targets, pattern offsets, pattern lengths
- Base offset for absolute positioning
- Window cap exceeded:
  ```rust
  if *hit > max_hits {
      force_full = true;
  }
  ```
  - Per-rule-variant window count exceeds `max_windows_per_rule_variant`
  - Prevents unbounded window materialization and processing
- Ring buffer unable to reconstruct window (stream decode path):
  ```rust
  if !scratch.decode_ring.extend_range_to(lo, hi, &mut scratch.window_bytes)
      && !self.redecode_window_into(tc, encoded, lo, hi, max_out, &mut scratch.window_bytes)
  {
      force_full = true;
  }
  ```
  - Ring buffer has overwritten the window bytes
  - Re-decode also failed or exceeded limits
  - Cannot reliably materialize the window
- Child span cannot be materialized from ring (stream/end-of-stream span emission):
  - Nested transform span bytes must be present in the decode ring
  - Missing bytes force `force_full = true` to preserve correctness
- Decode budget exceeded:
  ```rust
  if local_out.saturating_add(chunk.len()) > max_out {
      truncated = true;
  }
  ```
  - Per-transform budget exhausted
  - Further decoding halted
- Total decode budget exceeded:
  ```rust
  if scratch.total_decode_output_bytes.saturating_add(chunk.len())
      > self.tuning.max_total_decode_output_bytes
  {
      truncated = true;
  }
  ```
  - Per-scan global budget exhausted
  - No more decoding across any transform
- Stream decode/scan error:
  ```rust
  if vs_stream.scan_stream(...).is_err() {
      truncated = true;
  }
  ```
  - Vectorscan streaming failed
  - Cannot continue pattern matching
- Gate database failure:
  ```rust
  if db.scan_stream(...).is_err() {
      gate_db_active = false;
      gate_db_failed = true;
  }
  ```
  - Gate scanning failed; gate enforcement is relaxed to avoid false negatives
For `Gate::AnchorsInDecoded` Base64 transforms:
```rust
let gate_enabled = tc.gate == Gate::AnchorsInDecoded;
if gate_enabled {
    if let Some(db) = self.vs_gate.as_ref() {
        gate_db_active = true;
        gate_stream = Some(stream);
    }
}
```
Benefits:
- Gate patterns are evaluated in decoded space (more accurate)
- Gate stream scanning stops after the first hit (`gate_hit != 0`)
- Prevents committing non-gated decoded output when enforcement is active
Per-chunk gate scanning:
- Gate database scanned alongside the main pattern database
- Pattern scanning continues regardless; `gate_hit` drives final enforcement
- If the gate is enforced and never matches, staged decoded output is discarded
If the gate database is unavailable or fails:
```rust
let gate_satisfied = if gate_db_active || gate_hit != 0 {
    gate_hit != 0
} else {
    prefilter_gate_hit // Fall back to prefilter hits
};
let enforce_gate = if gate_enabled {
    if gate_db_failed {
        false // Don't enforce; be permissive
    } else if gate_db_active || gate_hit != 0 {
        true // Enforce based on gate DB
    } else {
        !self.tuning.scan_utf16_variants || !self.has_utf16_anchors
    }
} else {
    false
};
```
Logic:
- If the gate DB failed, don't enforce the gate (avoid false negatives)
- Relax enforcement if UTF-16 scanning is enabled (UTF-16 anchors may be missed by the prefilter)
- Otherwise, discard non-gated bytes
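The decision can be isolated as a pure function over the flags, which makes the permissive-on-failure behavior easy to check. This sketch flattens the flags to booleans (`enforce_gate` as a free function is an invented framing; the real code computes it inline):

```rust
/// Pure-function sketch of the gate-enforcement decision above.
fn enforce_gate(
    gate_enabled: bool,
    gate_db_failed: bool,
    gate_db_active: bool,
    gate_hit: bool,
    scan_utf16_variants: bool,
    has_utf16_anchors: bool,
) -> bool {
    if !gate_enabled {
        return false;
    }
    if gate_db_failed {
        return false; // be permissive: avoid false negatives
    }
    if gate_db_active || gate_hit {
        return true; // the gate DB ran; trust its verdict
    }
    // Gate DB never opened: enforce only when prefilter evidence is
    // reliable, i.e. UTF-16 scanning cannot have hidden anchors from it.
    !scan_utf16_variants || !has_utf16_anchors
}
```

Note the asymmetry: every failure path resolves to "don't enforce", so a broken gate can only widen results, never suppress a true finding.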
```
Entry: decode_stream_and_scan()
  |
  +-> Allocate/open Vectorscan stream
  |
  +-> Per-chunk loop: stream_decode(tc, encoded, |chunk| {
  |     |
  |     +-> Budget check (per-transform, per-scan)
  |     |
  |     +-> Feed to Vectorscan stream DB
  |     |
  |     +-> Feed to gate DB (if enabled)
  |     |
  |     +-> Feed to UTF-16 stream (if enabled)
  |     |
  |     +-> Feed chunk to span detectors
  |     |
  |     +-> Collect Vectorscan matches into timing wheel
  |     |
  |     +-> Advance timing wheel: process ready windows
  |     |     |
  |     |     +-> Materialize window (ring buffer or re-decode)
  |     |
  |     +-> Process window through rule evaluation
  |     |
  |     +-> Continue or Break based on conditions
  |   })
  |
  +-> Close stream, finalize matches
  |
  +-> Process end-of-stream spans
  |
  +-> UTF-16 scanning (if enabled, deferred)
  |
  +-> Gate satisfaction check
  |
  +-> Dedupe check (AEGIS-128L MAC)
  |
  +-> Enqueue findings
  |
  +-> Enqueue pending child transform spans
  |
  +-> Exit
```
On fallback (`force_full`):
```
  +-> Truncate slab to pre-decode state
  |
  +-> Reset state collections
  |
  +-> Call decode_span_fallback()
```
On stream abort (`truncated` or decode/scan error):
```
  +-> Truncate slab to pre-decode state
  |
  +-> Return without full-decode fallback
```
- Ring buffer size is fixed and bounded
- Timing wheel overhead linear in window count (capped)
- State collections reset between spans
- Streaming enables early pattern matching
- Avoids redundant decoding for overlapping regions
- Vectorscan stream mode minimizes state transitions
- Reuse `vs_stream_scratch`, `gate_scratch`, `utf16_stream_scratch` across calls
- Append-only slab reduces allocation fragmentation
- Temporary `window_bytes` buffer reused per window
- Slab rollback is O(1) truncate operation
- State reset via `clear()` and `reset()` methods
- No deep copying on the fallback path