test: add comprehensive zero-downtime reload stress test

This commit is contained in:
2026-02-03 20:36:38 +08:00
parent c1fdcd46d0
commit ba609187eb
4 changed files with 580 additions and 56 deletions

321
Cargo.lock generated
View File

@@ -232,8 +232,8 @@ dependencies = [
"axum-core",
"bytes",
"futures-util",
"http",
"http-body",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"itoa",
"matchit",
@@ -256,8 +256,8 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"mime",
"pin-project-lite",
@@ -346,9 +346,9 @@ dependencies = [
"futures-util",
"hex",
"home",
"http",
"http 1.4.0",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-named-pipe",
"hyper-rustls",
"hyper-util",
@@ -581,7 +581,7 @@ dependencies = [
"futures-channel",
"futures-util",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-util",
"lz4_flex",
"polonius-the-crab",
@@ -649,6 +649,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpufeatures"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.5.0"
@@ -1188,6 +1197,25 @@ version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7"
[[package]]
name = "h2"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http 0.2.12",
"indexmap 2.13.0",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "h2"
version = "0.4.13"
@@ -1199,7 +1227,7 @@ dependencies = [
"fnv",
"futures-core",
"futures-sink",
"http",
"http 1.4.0",
"indexmap 2.13.0",
"slab",
"tokio",
@@ -1235,6 +1263,30 @@ dependencies = [
"foldhash 0.2.0",
]
[[package]]
name = "headers"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270"
dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http 0.2.12",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http 0.2.12",
]
[[package]]
name = "heck"
version = "0.4.1"
@@ -1293,6 +1345,17 @@ dependencies = [
"windows-link",
]
[[package]]
name = "http"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http"
version = "1.4.0"
@@ -1303,6 +1366,17 @@ dependencies = [
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http 0.2.12",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.1"
@@ -1310,7 +1384,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
dependencies = [
"bytes",
"http",
"http 1.4.0",
]
[[package]]
@@ -1321,8 +1395,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http 1.4.0",
"http-body 1.0.1",
"pin-project-lite",
]
@@ -1338,6 +1412,30 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "hyper"
version = "0.14.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2 0.3.27",
"http 0.2.12",
"http-body 0.4.6",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.5.10",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper"
version = "1.8.1"
@@ -1348,9 +1446,9 @@ dependencies = [
"bytes",
"futures-channel",
"futures-core",
"h2",
"http",
"http-body",
"h2 0.4.13",
"http 1.4.0",
"http-body 1.0.1",
"httparse",
"httpdate",
"itoa",
@@ -1368,7 +1466,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278"
dependencies = [
"hex",
"hyper",
"hyper 1.8.1",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -1382,8 +1480,8 @@ version = "0.27.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58"
dependencies = [
"http",
"hyper",
"http 1.4.0",
"hyper 1.8.1",
"hyper-util",
"rustls",
"rustls-pki-types",
@@ -1399,7 +1497,7 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
dependencies = [
"hyper",
"hyper 1.8.1",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -1414,7 +1512,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
dependencies = [
"bytes",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-util",
"native-tls",
"tokio",
@@ -1433,14 +1531,14 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"http",
"http-body",
"hyper",
"http 1.4.0",
"http-body 1.0.1",
"hyper 1.8.1",
"ipnet",
"libc",
"percent-encoding",
"pin-project-lite",
"socket2",
"socket2 0.6.1",
"system-configuration",
"tokio",
"tower-service",
@@ -1456,7 +1554,7 @@ checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7"
dependencies = [
"hex",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -1831,6 +1929,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -1858,6 +1966,24 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "multer"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http 0.2.12",
"httparse",
"log",
"memchr",
"mime",
"spin",
"version_check",
]
[[package]]
name = "native-tls"
version = "0.2.14"
@@ -2252,7 +2378,7 @@ dependencies = [
"cf-rustracing",
"cf-rustracing-jaeger",
"hex",
"http",
"http 1.4.0",
"httparse",
"httpdate",
"indexmap 1.9.3",
@@ -2291,8 +2417,8 @@ dependencies = [
"derivative",
"flate2",
"futures",
"h2",
"http",
"h2 0.4.13",
"http 1.4.0",
"httparse",
"httpdate",
"libc",
@@ -2314,7 +2440,7 @@ dependencies = [
"serde",
"serde_yaml 0.8.26",
"sfv",
"socket2",
"socket2 0.6.1",
"strum",
"strum_macros",
"tokio",
@@ -2337,7 +2463,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "252a16def05c7adbbdda776e87b2be36e9481c8a77249207a2f3b563e8933b35"
dependencies = [
"bytes",
"http",
"http 1.4.0",
"httparse",
"pingora-error",
"pingora-http",
@@ -2353,7 +2479,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3542fd0fd0a83212882c5066ae739ba51804f20d624ff7e12ec85113c5c89a"
dependencies = [
"bytes",
"http",
"http 1.4.0",
"pingora-error",
]
@@ -2377,7 +2503,7 @@ dependencies = [
"derivative",
"fnv",
"futures",
"http",
"http 1.4.0",
"log",
"pingora-core",
"pingora-error",
@@ -2438,8 +2564,8 @@ dependencies = [
"bytes",
"clap",
"futures",
"h2",
"http",
"h2 0.4.13",
"http 1.4.0",
"log",
"once_cell",
"pingora-cache",
@@ -2643,7 +2769,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls",
"socket2",
"socket2 0.6.1",
"thiserror 2.0.17",
"tokio",
"tracing",
@@ -2680,7 +2806,7 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2",
"socket2 0.6.1",
"tracing",
"windows-sys 0.60.2",
]
@@ -2851,11 +2977,11 @@ dependencies = [
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"h2 0.4.13",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-rustls",
"hyper-tls",
"hyper-util",
@@ -3068,6 +3194,12 @@ dependencies = [
"serde_json",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -3264,6 +3396,17 @@ dependencies = [
"rust_decimal",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
@@ -3307,6 +3450,16 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.1"
@@ -3317,6 +3470,12 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "stable_deref_trait"
version = "1.2.1"
@@ -3634,7 +3793,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.6.1",
"tokio-macros",
"windows-sys 0.61.2",
]
@@ -3703,6 +3862,18 @@ dependencies = [
"tokio-stream",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.7.18"
@@ -3726,16 +3897,16 @@ dependencies = [
"axum",
"base64 0.22.1",
"bytes",
"h2",
"http",
"http-body",
"h2 0.4.13",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"socket2",
"socket2 0.6.1",
"sync_wrapper",
"tokio",
"tokio-stream",
@@ -3784,8 +3955,8 @@ dependencies = [
"bitflags 2.10.0",
"bytes",
"futures-util",
"http",
"http-body",
"http 1.4.0",
"http-body 1.0.1",
"iri-string",
"pin-project-lite",
"tower",
@@ -3811,6 +3982,7 @@ version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -3894,7 +4066,7 @@ dependencies = [
"bytes",
"clickhouse",
"ctor",
"http",
"http 1.4.0",
"httparse",
"ipnet",
"libc",
@@ -3912,7 +4084,7 @@ dependencies = [
"serde_json",
"serde_repr",
"serde_yaml 0.9.34+deprecated",
"socket2",
"socket2 0.6.1",
"tempfile",
"testcontainers",
"thiserror 2.0.17",
@@ -3923,6 +4095,7 @@ dependencies = [
"tracing",
"tracing-subscriber",
"url",
"warp",
]
[[package]]
@@ -3931,6 +4104,25 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.4.0",
"httparse",
"log",
"rand 0.8.5",
"sha1",
"thiserror 1.0.69",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.19.0"
@@ -3984,7 +4176,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d81f9efa9df032be5934a46a068815a10a042b494b6a58cb0a1a97bb5467ed6f"
dependencies = [
"base64 0.22.1",
"http",
"http 1.4.0",
"httparse",
"log",
]
@@ -4041,6 +4233,35 @@ dependencies = [
"try-lock",
]
[[package]]
name = "warp"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"headers",
"http 0.2.12",
"hyper 0.14.32",
"log",
"mime",
"mime_guess",
"multer",
"percent-encoding",
"pin-project",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-tungstenite",
"tokio-util",
"tower-service",
"tracing",
]
[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"

View File

@@ -48,7 +48,8 @@ rustls-pemfile = "2.1"
rand = "0.9"
testcontainers = "0.26"
once_cell = "1.19"
ctor = "0.6"
warp = "0.3"
ctor = "0.6.3"
[profile.release]
opt-level = 3

View File

@@ -245,11 +245,9 @@ pub async fn handle_connection(
bytes_recv,
};
tokio::spawn(async move {
if let Err(e) = db.insert_log(log_entry).await {
error!("failed to insert tcp log: {}", e);
}
});
if let Err(e) = db.insert_log(log_entry).await {
error!("failed to insert tcp log: {}", e);
}
Ok(total_bytes)
}

304
tests/reload_test.rs Normal file
View File

@@ -0,0 +1,304 @@
mod common;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UnixStream};
use tokio::process::Command;
use warp::Filter;
// Shared stats
static BYTES_SENT: AtomicUsize = AtomicUsize::new(0);
#[tokio::test]
async fn test_reload_stress() -> anyhow::Result<()> {
// Setup Environment: Initialize DB and clear tables
let host_port = common::get_shared_db_port().await;
// Create tables using system client if needed, or rely on Traudit
let system_client = common::get_db_client(host_port, "default");
let _ = system_client
.query("CREATE DATABASE IF NOT EXISTS traudit")
.execute()
.await;
let client = common::get_db_client(host_port, "traudit");
// Clean start
let _ = client.query("DROP TABLE IF EXISTS tcp_log").execute().await;
let _ = client
.query("DROP TABLE IF EXISTS http_log")
.execute()
.await;
let tcp_backend = TcpListener::bind("127.0.0.1:0").await?;
let tcp_backend_port = tcp_backend.local_addr()?.port();
tokio::spawn(async move {
loop {
if let Ok((mut socket, _)) = tcp_backend.accept().await {
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();
let _ = tokio::io::copy(&mut rd, &mut wr).await;
});
}
}
});
// HTTP Backend
let http_backend_port = {
let (addr, server) = warp::serve(
warp::any().map(|| "ok"), // Fix: closure taking 0 args
)
.bind_ephemeral(([127, 0, 0, 1], 0));
let port: u16 = addr.port(); // Fix: explicit type
tokio::spawn(server);
port
};
// Config: Use fixed random high ports to ensure reloading works on same port
let t_tcp_port = 30000 + (rand::random::<u16>() % 1000);
// Ensure unique manually if conflict, but probability low for test.
let t_http_port = t_tcp_port + 1;
let t_unix_path = format!("/tmp/traudit_test_{}.sock", rand::random::<u32>());
// Write Config
let config_content = format!(
r#"
database:
type: clickhouse
dsn: http://traudit:traudit@127.0.0.1:{}/traudit
batch_size: 1
batch_timeout_secs: 1
services:
- name: tcp-bench
type: tcp
forward_to: 127.0.0.1:{}
binds:
- addr: 127.0.0.1:{}
- name: unix-bench
type: tcp
forward_to: 127.0.0.1:{}
binds:
- addr: unix://{}
mode: 666
- name: http-bench
type: http
forward_to: 127.0.0.1:{}
binds:
- addr: 127.0.0.1:{}
"#,
host_port,
tcp_backend_port,
t_tcp_port,
tcp_backend_port,
t_unix_path,
http_backend_port,
t_http_port
);
let config_path = std::env::temp_dir().join("stress_test.yaml");
std::fs::write(&config_path, config_content)?;
// Start Traudit using cargo run to ensure correct binary execution
let mut _child = Command::new("cargo")
.arg("run")
.arg("--bin")
.arg("traudit")
.arg("--")
.arg("-f")
.arg(&config_path)
.spawn()?;
// Give it time to start
tokio::time::sleep(Duration::from_secs(5)).await;
// Run Test Loop for 10 seconds generating mixed traffic
let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
// Task: Staggered Connections
let r_stagger = running.clone(); // Clone for staggered task loop
let staggered_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
for _ in 0..10 {
// 0 to 9
interval.tick().await;
if !r_stagger.load(Ordering::Relaxed) {
break;
}
// TCP Client
let t_addr = format!("127.0.0.1:{}", t_tcp_port);
let r_inner = r_stagger.clone();
tokio::spawn(async move {
// Retry connect logic
for _ in 0..5 {
if let Ok(mut stream) = TcpStream::connect(&t_addr).await {
let mut buf = [0u8; 1024];
while r_inner.load(Ordering::Relaxed) {
if stream.write_all(b"PING").await.is_ok() {
BYTES_SENT.fetch_add(4, Ordering::SeqCst);
let _ = stream.read(&mut buf).await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
// Unix Client
let u_path = t_unix_path.clone(); // Clone path string for new task
let r_inner = r_stagger.clone();
tokio::spawn(async move {
for _ in 0..5 {
if let Ok(mut stream) = UnixStream::connect(&u_path).await {
let mut buf = [0u8; 1024];
while r_inner.load(Ordering::Relaxed) {
if stream.write_all(b"PING").await.is_ok() {
BYTES_SENT.fetch_add(4, Ordering::SeqCst);
let _ = stream.read(&mut buf).await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
// HTTP Keep-Alive Client
let h_url = format!("http://127.0.0.1:{}", t_http_port);
let r_inner = r_stagger.clone();
tokio::spawn(async move {
let client = reqwest::Client::builder().build().unwrap();
while r_inner.load(Ordering::Relaxed) {
if let Ok(_) = client.post(&h_url).body("PING").send().await {
BYTES_SENT.fetch_add(4, Ordering::SeqCst);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
}
});
// Task: High Freq HTTP
let r_high = running.clone();
let high_freq_url = format!("http://127.0.0.1:{}", t_http_port);
let high_handle = tokio::spawn(async move {
let client = reqwest::Client::new();
while r_high.load(Ordering::Relaxed) {
if let Ok(_) = client.post(&high_freq_url).body("FAST").send().await {
BYTES_SENT.fetch_add(4, Ordering::SeqCst);
}
// Slight delay to not overwhelm test runner completely
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
// Reload Sequence: Trigger SIGHUP at T=5s
tokio::time::sleep(Duration::from_secs(5)).await;
// Send SIGHUP
let output = Command::new("pgrep")
.arg("-f")
.arg(&config_path.to_string_lossy().to_string())
.output()
.await?;
let pid_str = String::from_utf8(output.stdout)?;
println!("Found PIDs for Reload: {}", pid_str);
for line in pid_str.lines() {
if let Ok(pid) = line.trim().parse::<i32>() {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid),
nix::sys::signal::Signal::SIGHUP,
);
}
}
// Wait remaining time
tokio::time::sleep(Duration::from_secs(5)).await;
// Stop Sequence: Stop generators and signal shutdown
running.store(false, Ordering::Relaxed);
let _ = staggered_handle.await;
let _ = high_handle.await;
// Wait for clients to actually disconnect and server to process logs
tokio::time::sleep(Duration::from_secs(3)).await;
// Kill traudit
let output = Command::new("pgrep")
.arg("-f")
.arg(&config_path.to_string_lossy().to_string())
.output()
.await?;
let pid_str = String::from_utf8(output.stdout)?;
for line in pid_str.lines() {
if let Ok(pid) = line.trim().parse::<i32>() {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid),
nix::sys::signal::Signal::SIGINT,
);
}
}
// Wait for buffered records to flush (batch_timeout_secs: 1)
tokio::time::sleep(Duration::from_secs(3)).await;
// Verify: Aggregated DB logs must cover Client payload (DB >= Client due to headers)
let client_sent = BYTES_SENT.load(Ordering::SeqCst) as u64;
let tcp_res = client
.query("SELECT sum(bytes_recv) FROM tcp_log")
.fetch_one::<u64>()
.await;
let tcp_bytes = match tcp_res {
Ok(n) => n,
Err(e) => {
println!("TCP Query Error: {}", e);
0
}
};
let http_res = client
.query("SELECT sum(req_body_size) FROM http_log")
.fetch_one::<u64>()
.await;
let http_bytes = match http_res {
Ok(n) => n,
Err(e) => {
println!("HTTP Query Error: {}", e);
0
}
};
let db_sent = tcp_bytes + http_bytes;
println!(
"Client Sent Payload: {}, DB Recorded (TCP Recv + HTTP Req Body): {}",
client_sent, db_sent
);
assert!(
db_sent > 0,
"DB recorded 0 bytes! Queries failed or no data."
);
// Ensure DB recorded at least as many bytes as the client sent.
assert!(
db_sent >= client_sent,
"Data loss detected! DB {} < Client {}",
db_sent,
client_sent
);
Ok(())
}