Skip to content

Commit 8783134

Browse files
Fix Redis to reconnect in Sentinel (Chris Staite) (TraceMachina#2190)
1 parent 86b86e1 commit 8783134

16 files changed

Lines changed: 847 additions & 274 deletions

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-redis-tester/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ rust_library(
1414
"src/fake_redis.rs",
1515
"src/lib.rs",
1616
"src/pubsub.rs",
17+
"src/read_only_redis.rs",
1718
],
1819
visibility = ["//visibility:public"],
1920
deps = [
2021
"//nativelink-util",
22+
"@crates//:either",
2123
"@crates//:redis",
2224
"@crates//:redis-protocol",
2325
"@crates//:redis-test",

nativelink-redis-tester/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ version = "1.0.0-rc2"
99
[dependencies]
1010
nativelink-util = { path = "../nativelink-util" }
1111

12+
either = { version = "1.15.0", default-features = false }
1213
redis = { version = "1.0.0", default-features = false }
1314
redis-protocol = { version = "6.0.0", default-features = false, features = [
1415
"bytes",

nativelink-redis-tester/src/dynamic_fake_redis.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ impl<S: SubscriptionManagerNotify + Send + 'static + Sync> FakeRedisBackend<S> {
9494
};
9595

9696
let ret: Value = match cmd.as_str() {
97+
"HELLO" => Value::Map(vec![(
98+
Value::SimpleString("server".into()),
99+
Value::SimpleString("redis".into()),
100+
)]),
97101
"CLIENT" => {
98102
// We can safely ignore these, as it's just setting the library name/version
99103
Value::Int(0)
@@ -350,7 +354,7 @@ impl<S: SubscriptionManagerNotify + Send + 'static + Sync> FakeRedisBackend<S> {
350354
}
351355
output
352356
};
353-
fake_redis_internal(listener, inner).await;
357+
fake_redis_internal(listener, vec![inner]).await;
354358
}
355359

356360
pub async fn run(self) -> u16 {

nativelink-redis-tester/src/fake_redis.rs

Lines changed: 73 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -78,23 +78,54 @@ fn args_as_string(args: Vec<Value>) -> String {
7878
output
7979
}
8080

81-
fn add_to_response<B: BuildHasher>(
81+
pub fn add_to_response<B: BuildHasher>(
8282
response: &mut HashMap<String, String, B>,
8383
cmd: &redis::Cmd,
8484
args: Vec<Value>,
8585
) {
86-
response.insert(cmd_as_string(cmd), args_as_string(args));
86+
add_to_response_raw(response, cmd, args_as_string(args));
87+
}
88+
89+
pub fn add_to_response_raw<B: BuildHasher>(
90+
response: &mut HashMap<String, String, B>,
91+
cmd: &redis::Cmd,
92+
args: String,
93+
) {
94+
response.insert(cmd_as_string(cmd), args);
8795
}
8896

8997
fn setinfo(responses: &mut HashMap<String, String>) {
90-
// Library sends both lib-name and lib-ver in one go, so we respond to both
91-
add_to_response(
92-
responses,
98+
// We do raw inserts of command here, because the library sends 3/4 commands in one go
99+
// They always start with HELLO, then optionally SELECT, so we use this to differentiate
100+
let hello = cmd_as_string(redis::cmd("HELLO").arg("3"));
101+
let setinfo = cmd_as_string(
93102
redis::cmd("CLIENT")
94103
.arg("SETINFO")
95104
.arg("LIB-NAME")
96105
.arg("redis-rs"),
97-
vec![Value::Okay, Value::Okay],
106+
);
107+
responses.insert(
108+
[hello.clone(), setinfo.clone()].join(""),
109+
args_as_string(vec![
110+
Value::Map(vec![(
111+
Value::SimpleString("server".into()),
112+
Value::SimpleString("redis".into()),
113+
)]),
114+
Value::Okay,
115+
Value::Okay,
116+
]),
117+
);
118+
responses.insert(
119+
[hello, cmd_as_string(redis::cmd("SELECT").arg(3)), setinfo].join(""),
120+
args_as_string(vec![
121+
Value::Map(vec![(
122+
Value::SimpleString("server".into()),
123+
Value::SimpleString("redis".into()),
124+
)]),
125+
Value::Okay,
126+
Value::Okay,
127+
Value::Okay,
128+
]),
98129
);
99130
}
100131

@@ -159,10 +190,11 @@ pub fn fake_redis_sentinel_stream(master_name: &str, redis_port: u16) -> HashMap
159190
response
160191
}
161192

162-
pub(crate) async fn fake_redis_internal<H>(listener: TcpListener, handler: H)
193+
pub(crate) async fn fake_redis_internal<H>(listener: TcpListener, handlers: Vec<H>)
163194
where
164-
H: Fn(&[u8]) -> String + Send + Clone + 'static,
195+
H: Fn(&[u8]) -> String + Send + Clone + 'static + Sync,
165196
{
197+
let mut handler_iter = handlers.iter().cloned().cycle();
166198
loop {
167199
info!(
168200
"Waiting for connection on {}",
@@ -173,7 +205,7 @@ where
173205
panic!("error");
174206
};
175207
info!("Accepted new connection");
176-
let local_handler = handler.clone();
208+
let local_handler = handler_iter.next().unwrap();
177209
background_spawn!("thread", async move {
178210
loop {
179211
let mut buf = vec![0; 8192];
@@ -189,32 +221,38 @@ where
189221
}
190222
}
191223

192-
async fn fake_redis<B>(listener: TcpListener, responses: HashMap<String, String, B>)
224+
async fn fake_redis<B>(listener: TcpListener, all_responses: Vec<HashMap<String, String, B>>)
193225
where
194-
B: BuildHasher + Clone + Send + 'static,
226+
B: BuildHasher + Clone + Send + 'static + Sync,
195227
{
196-
info!("Responses are: {:?}", responses);
197-
let values = responses.clone();
198-
let inner = move |buf: &[u8]| -> String {
199-
let str_buf = str::from_utf8(buf);
200-
if let Ok(s) = str_buf {
201-
for (key, value) in &values {
202-
if s.starts_with(key) {
203-
info!("Responding to {}", s.replace("\r\n", "\\r\\n"));
204-
return value.clone();
228+
let funcs = all_responses
229+
.iter()
230+
.map(|responses| {
231+
info!("Responses are: {:?}", responses);
232+
let values = responses.clone();
233+
move |buf: &[u8]| -> String {
234+
let str_buf = String::from_utf8_lossy(buf).into_owned();
235+
for (key, value) in &values {
236+
if str_buf.starts_with(key) {
237+
info!("Responding to {}", str_buf.replace("\r\n", "\\r\\n"));
238+
return value.clone();
239+
}
205240
}
241+
warn!(
242+
"Unknown command: {}",
243+
str_buf.chars().take(1000).collect::<String>()
244+
);
245+
String::new()
206246
}
207-
warn!("Unknown command: {s}");
208-
} else {
209-
warn!("Bytes buffer: {:?}", &buf);
210-
}
211-
String::new()
212-
};
213-
fake_redis_internal(listener, inner).await;
247+
})
248+
.collect();
249+
fake_redis_internal(listener, funcs).await;
214250
}
215251

216-
pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'static>(
217-
responses: HashMap<String, String, B>,
252+
pub async fn make_fake_redis_with_multiple_responses<
253+
B: BuildHasher + Clone + Send + 'static + Sync,
254+
>(
255+
responses: Vec<HashMap<String, String, B>>,
218256
) -> u16 {
219257
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
220258
let port = listener.local_addr().unwrap().port();
@@ -226,3 +264,9 @@ pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'sta
226264

227265
port
228266
}
267+
268+
pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'static + Sync>(
269+
responses: HashMap<String, String, B>,
270+
) -> u16 {
271+
make_fake_redis_with_multiple_responses(vec![responses]).await
272+
}

nativelink-redis-tester/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
mod dynamic_fake_redis;
1616
mod fake_redis;
1717
mod pubsub;
18+
mod read_only_redis;
1819

1920
pub use dynamic_fake_redis::{FakeRedisBackend, SubscriptionManagerNotify};
2021
pub use fake_redis::{
21-
add_lua_script, fake_redis_sentinel_master_stream, fake_redis_sentinel_stream,
22-
fake_redis_stream, make_fake_redis_with_responses,
22+
add_lua_script, add_to_response, add_to_response_raw, fake_redis_sentinel_master_stream,
23+
fake_redis_sentinel_stream, fake_redis_stream, make_fake_redis_with_multiple_responses,
24+
make_fake_redis_with_responses,
2325
};
2426
pub use pubsub::MockPubSub;
27+
pub use read_only_redis::ReadOnlyRedis;
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright 2026 The NativeLink Authors. All rights reserved.
2+
//
3+
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// See LICENSE file for details
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use core::fmt::Write;
16+
use core::sync::atomic::{AtomicBool, Ordering};
17+
use std::sync::Arc;
18+
19+
use either::Either;
20+
use nativelink_util::background_spawn;
21+
use redis::Value;
22+
use redis_protocol::resp2::decode::decode;
23+
use redis_protocol::resp2::types::OwnedFrame;
24+
use tokio::net::TcpListener;
25+
use tracing::info;
26+
27+
use crate::fake_redis::{arg_as_string, fake_redis_internal};
28+
29+
const FAKE_SCRIPT_SHA: &str = "b22b9926cbce9dd9ba97fa7ba3626f89feea1ed5";
30+
31+
#[derive(Clone, Debug)]
32+
pub struct ReadOnlyRedis {
33+
// The first time we hit SETRANGE/HMSET, we output a ReadOnly. Next time, we assume we're reconnected and do correct values
34+
readonly_triggered: Arc<AtomicBool>,
35+
}
36+
37+
impl Default for ReadOnlyRedis {
38+
fn default() -> Self {
39+
Self::new()
40+
}
41+
}
42+
43+
impl ReadOnlyRedis {
44+
pub fn new() -> Self {
45+
Self {
46+
readonly_triggered: Arc::new(AtomicBool::new(false)),
47+
}
48+
}
49+
50+
async fn dynamic_fake_redis(self, listener: TcpListener) {
51+
let readonly_err_str = "READONLY You can't write against a read only replica.";
52+
let readonly_err = format!("!{}\r\n{readonly_err_str}\r\n", readonly_err_str.len());
53+
54+
let inner = move |buf: &[u8]| -> String {
55+
let mut output = String::new();
56+
let mut buf_index = 0;
57+
loop {
58+
let frame = match decode(&buf[buf_index..]).unwrap() {
59+
Some((frame, amt)) => {
60+
buf_index += amt;
61+
frame
62+
}
63+
None => {
64+
panic!("No frame!");
65+
}
66+
};
67+
let (cmd, args) = {
68+
if let OwnedFrame::Array(a) = frame {
69+
if let OwnedFrame::BulkString(s) = a.first().unwrap() {
70+
let args: Vec<_> = a[1..].to_vec();
71+
(str::from_utf8(s).unwrap().to_string(), args)
72+
} else {
73+
panic!("Array not starting with cmd: {a:?}");
74+
}
75+
} else {
76+
panic!("Non array cmd: {frame:?}");
77+
}
78+
};
79+
80+
let ret: Either<Value, String> = match cmd.as_str() {
81+
"HELLO" => Either::Left(Value::Map(vec![(
82+
Value::SimpleString("server".into()),
83+
Value::SimpleString("redis".into()),
84+
)])),
85+
"CLIENT" => {
86+
// We can safely ignore these, as it's just setting the library name/version
87+
Either::Left(Value::Int(0))
88+
}
89+
"SCRIPT" => {
90+
assert_eq!(args[0], OwnedFrame::BulkString(b"LOAD".to_vec()));
91+
92+
let OwnedFrame::BulkString(ref _script) = args[1] else {
93+
panic!("Script should be a bulkstring: {args:?}");
94+
};
95+
Either::Left(Value::SimpleString(FAKE_SCRIPT_SHA.to_string()))
96+
}
97+
"ROLE" => Either::Left(Value::Array(vec![
98+
Value::BulkString(b"master".to_vec()),
99+
Value::Int(0),
100+
Value::Array(vec![]),
101+
])),
102+
"SETRANGE" => {
103+
let value = self.readonly_triggered.load(Ordering::Relaxed);
104+
if value {
105+
Either::Left(Value::Int(5))
106+
} else {
107+
self.readonly_triggered.store(true, Ordering::Relaxed);
108+
Either::Right(readonly_err.clone())
109+
}
110+
}
111+
"STRLEN" => Either::Left(Value::Int(5)),
112+
"RENAME" | "HMSET" => {
113+
let value = self.readonly_triggered.load(Ordering::Relaxed);
114+
if value {
115+
Either::Left(Value::Okay)
116+
} else {
117+
self.readonly_triggered.store(true, Ordering::Relaxed);
118+
Either::Right(readonly_err.clone())
119+
}
120+
}
121+
"EVALSHA" => Either::Left(Value::Array(vec![Value::Int(1), Value::Int(0)])),
122+
actual => {
123+
panic!("Mock command not implemented! {actual:?}");
124+
}
125+
};
126+
127+
match ret {
128+
Either::Left(v) => {
129+
arg_as_string(&mut output, v);
130+
}
131+
Either::Right(s) => {
132+
write!(&mut output, "{s}").unwrap();
133+
}
134+
}
135+
136+
if buf_index == buf.len() {
137+
break;
138+
}
139+
}
140+
output
141+
};
142+
fake_redis_internal(listener, vec![inner]).await;
143+
}
144+
145+
pub async fn run(self) -> u16 {
146+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
147+
let port = listener.local_addr().unwrap().port();
148+
info!("Using port {port}");
149+
150+
background_spawn!("listener", async move {
151+
self.dynamic_fake_redis(listener).await;
152+
});
153+
154+
port
155+
}
156+
}

0 commit comments

Comments
 (0)