5 Commits

10 changed files with 780 additions and 190 deletions

324
Cargo.lock generated
View File

@@ -232,8 +232,8 @@ dependencies = [
"axum-core",
"bytes",
"futures-util",
"http",
"http-body",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"itoa",
"matchit",
@@ -256,8 +256,8 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"mime",
"pin-project-lite",
@@ -346,9 +346,9 @@ dependencies = [
"futures-util",
"hex",
"home",
"http",
"http 1.4.0",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-named-pipe",
"hyper-rustls",
"hyper-util",
@@ -581,7 +581,7 @@ dependencies = [
"futures-channel",
"futures-util",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-util",
"lz4_flex",
"polonius-the-crab",
@@ -649,6 +649,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpufeatures"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.5.0"
@@ -1188,6 +1197,25 @@ version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7"
[[package]]
name = "h2"
version = "0.3.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http 0.2.12",
"indexmap 2.13.0",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "h2"
version = "0.4.13"
@@ -1199,7 +1227,7 @@ dependencies = [
"fnv",
"futures-core",
"futures-sink",
"http",
"http 1.4.0",
"indexmap 2.13.0",
"slab",
"tokio",
@@ -1235,6 +1263,30 @@ dependencies = [
"foldhash 0.2.0",
]
[[package]]
name = "headers"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270"
dependencies = [
"base64 0.21.7",
"bytes",
"headers-core",
"http 0.2.12",
"httpdate",
"mime",
"sha1",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http 0.2.12",
]
[[package]]
name = "heck"
version = "0.4.1"
@@ -1293,6 +1345,17 @@ dependencies = [
"windows-link",
]
[[package]]
name = "http"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http"
version = "1.4.0"
@@ -1303,6 +1366,17 @@ dependencies = [
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http 0.2.12",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.1"
@@ -1310,7 +1384,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184"
dependencies = [
"bytes",
"http",
"http 1.4.0",
]
[[package]]
@@ -1321,8 +1395,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http 1.4.0",
"http-body 1.0.1",
"pin-project-lite",
]
@@ -1338,6 +1412,30 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "hyper"
version = "0.14.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2 0.3.27",
"http 0.2.12",
"http-body 0.4.6",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.5.10",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper"
version = "1.8.1"
@@ -1348,9 +1446,9 @@ dependencies = [
"bytes",
"futures-channel",
"futures-core",
"h2",
"http",
"http-body",
"h2 0.4.13",
"http 1.4.0",
"http-body 1.0.1",
"httparse",
"httpdate",
"itoa",
@@ -1368,7 +1466,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278"
dependencies = [
"hex",
"hyper",
"hyper 1.8.1",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -1382,8 +1480,8 @@ version = "0.27.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58"
dependencies = [
"http",
"hyper",
"http 1.4.0",
"hyper 1.8.1",
"hyper-util",
"rustls",
"rustls-pki-types",
@@ -1399,7 +1497,7 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
dependencies = [
"hyper",
"hyper 1.8.1",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -1414,7 +1512,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
dependencies = [
"bytes",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-util",
"native-tls",
"tokio",
@@ -1433,14 +1531,14 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
"http",
"http-body",
"hyper",
"http 1.4.0",
"http-body 1.0.1",
"hyper 1.8.1",
"ipnet",
"libc",
"percent-encoding",
"pin-project-lite",
"socket2",
"socket2 0.6.1",
"system-configuration",
"tokio",
"tower-service",
@@ -1456,7 +1554,7 @@ checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7"
dependencies = [
"hex",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -1831,6 +1929,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -1858,6 +1966,24 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "multer"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http 0.2.12",
"httparse",
"log",
"memchr",
"mime",
"spin",
"version_check",
]
[[package]]
name = "native-tls"
version = "0.2.14"
@@ -2252,7 +2378,7 @@ dependencies = [
"cf-rustracing",
"cf-rustracing-jaeger",
"hex",
"http",
"http 1.4.0",
"httparse",
"httpdate",
"indexmap 1.9.3",
@@ -2291,8 +2417,8 @@ dependencies = [
"derivative",
"flate2",
"futures",
"h2",
"http",
"h2 0.4.13",
"http 1.4.0",
"httparse",
"httpdate",
"libc",
@@ -2314,7 +2440,7 @@ dependencies = [
"serde",
"serde_yaml 0.8.26",
"sfv",
"socket2",
"socket2 0.6.1",
"strum",
"strum_macros",
"tokio",
@@ -2337,7 +2463,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "252a16def05c7adbbdda776e87b2be36e9481c8a77249207a2f3b563e8933b35"
dependencies = [
"bytes",
"http",
"http 1.4.0",
"httparse",
"pingora-error",
"pingora-http",
@@ -2353,7 +2479,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3542fd0fd0a83212882c5066ae739ba51804f20d624ff7e12ec85113c5c89a"
dependencies = [
"bytes",
"http",
"http 1.4.0",
"pingora-error",
]
@@ -2377,7 +2503,7 @@ dependencies = [
"derivative",
"fnv",
"futures",
"http",
"http 1.4.0",
"log",
"pingora-core",
"pingora-error",
@@ -2438,8 +2564,8 @@ dependencies = [
"bytes",
"clap",
"futures",
"h2",
"http",
"h2 0.4.13",
"http 1.4.0",
"log",
"once_cell",
"pingora-cache",
@@ -2643,7 +2769,7 @@ dependencies = [
"quinn-udp",
"rustc-hash",
"rustls",
"socket2",
"socket2 0.6.1",
"thiserror 2.0.17",
"tokio",
"tracing",
@@ -2680,7 +2806,7 @@ dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2",
"socket2 0.6.1",
"tracing",
"windows-sys 0.60.2",
]
@@ -2851,11 +2977,11 @@ dependencies = [
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"h2 0.4.13",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-rustls",
"hyper-tls",
"hyper-util",
@@ -3068,6 +3194,12 @@ dependencies = [
"serde_json",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294"
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -3264,6 +3396,17 @@ dependencies = [
"rust_decimal",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
@@ -3307,6 +3450,16 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.1"
@@ -3317,6 +3470,12 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "spin"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "stable_deref_trait"
version = "1.2.1"
@@ -3634,7 +3793,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.6.1",
"tokio-macros",
"windows-sys 0.61.2",
]
@@ -3703,6 +3862,18 @@ dependencies = [
"tokio-stream",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.7.18"
@@ -3726,16 +3897,16 @@ dependencies = [
"axum",
"base64 0.22.1",
"bytes",
"h2",
"http",
"http-body",
"h2 0.4.13",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"hyper",
"hyper 1.8.1",
"hyper-timeout",
"hyper-util",
"percent-encoding",
"pin-project",
"socket2",
"socket2 0.6.1",
"sync_wrapper",
"tokio",
"tokio-stream",
@@ -3784,8 +3955,8 @@ dependencies = [
"bitflags 2.10.0",
"bytes",
"futures-util",
"http",
"http-body",
"http 1.4.0",
"http-body 1.0.1",
"iri-string",
"pin-project-lite",
"tower",
@@ -3811,6 +3982,7 @@ version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@@ -3887,14 +4059,14 @@ dependencies = [
[[package]]
name = "traudit"
version = "0.0.7"
version = "0.0.8"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"clickhouse",
"ctor",
"http",
"http 1.4.0",
"httparse",
"ipnet",
"libc",
@@ -3909,9 +4081,10 @@ dependencies = [
"rustls-pemfile",
"serde",
"serde_ignored",
"serde_json",
"serde_repr",
"serde_yaml 0.9.34+deprecated",
"socket2",
"socket2 0.6.1",
"tempfile",
"testcontainers",
"thiserror 2.0.17",
@@ -3922,6 +4095,7 @@ dependencies = [
"tracing",
"tracing-subscriber",
"url",
"warp",
]
[[package]]
@@ -3930,6 +4104,25 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "tungstenite"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http 1.4.0",
"httparse",
"log",
"rand 0.8.5",
"sha1",
"thiserror 1.0.69",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.19.0"
@@ -3983,7 +4176,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d81f9efa9df032be5934a46a068815a10a042b494b6a58cb0a1a97bb5467ed6f"
dependencies = [
"base64 0.22.1",
"http",
"http 1.4.0",
"httparse",
"log",
]
@@ -4040,6 +4233,35 @@ dependencies = [
"try-lock",
]
[[package]]
name = "warp"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"headers",
"http 0.2.12",
"hyper 0.14.32",
"log",
"mime",
"mime_guess",
"multer",
"percent-encoding",
"pin-project",
"scoped-tls",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-tungstenite",
"tokio-util",
"tower-service",
"tracing",
]
[[package]]
name = "wasi"
version = "0.11.1+wasi-snapshot-preview1"

View File

@@ -1,6 +1,6 @@
[package]
name = "traudit"
version = "0.0.7"
version = "0.0.8"
edition = "2021"
authors = ["awfufu"]
description = "A reverse proxy that streams audit records directly to databases."
@@ -12,6 +12,7 @@ tokio = { version = "1", features = ["full"] }
clickhouse = { version = "0.14", features = ["time"] }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
serde_json = "1"
http = "1"
socket2 = "0.6"
libc = "0.2"
@@ -30,7 +31,7 @@ httparse = "1.10.1"
openssl = { version = "0.10" }
serde_ignored = "0.1.14"
tokio-openssl = "0.6"
nix = { version = "0.31.1", features = ["signal", "process", "socket"] }
nix = { version = "0.31.1", features = ["signal", "process", "socket", "fs"] }
[features]
default = []
@@ -47,7 +48,8 @@ rustls-pemfile = "2.1"
rand = "0.9"
testcontainers = "0.26"
once_cell = "1.19"
ctor = "0.6"
warp = "0.3"
ctor = "0.6.3"
[profile.release]
opt-level = 3

View File

@@ -32,5 +32,5 @@ See [config_example.yaml](config_example.yaml).
- [ ] SQLite/MySQL Adapters (Future)
- [ ] Documentation & Testing
- [x] Basic End-to-end tests
- [ ] Comprehensive Unit Tests
- [x] Comprehensive Unit Tests
- [ ] Deployment Guide

View File

@@ -32,5 +32,5 @@ traudit 是一个支持 TCP/UDP/Unix Socket 的反向代理程序,专注于连
- [ ] SQLite/MySQL 适配器 (计划中)
- [ ] 文档与测试
- [x] 基础端到端测试
- [ ] 单元测试
- [x] 单元测试
- [ ] 部署文档

View File

@@ -245,11 +245,9 @@ pub async fn handle_connection(
bytes_recv,
};
tokio::spawn(async move {
if let Err(e) = db.insert_log(log_entry).await {
error!("failed to insert tcp log: {}", e);
}
});
if let Err(e) = db.insert_log(log_entry).await {
error!("failed to insert tcp log: {}", e);
}
Ok(total_bytes)
}

View File

@@ -4,7 +4,6 @@ use crate::core::server::stream::InboundStream;
use bytes::BytesMut;
use openssl::ssl::{Ssl, SslAcceptor};
use pingora::protocols::l4::socket::SocketAddr;
// ShutdownWatch removed
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::sync::Arc;
@@ -46,12 +45,52 @@ impl UnifiedListener {
}
}
// Global registry for FDs to be passed during reload
pub static FD_REGISTRY: std::sync::OnceLock<
std::sync::Mutex<std::collections::HashMap<String, std::os::unix::io::RawFd>>,
> = std::sync::OnceLock::new();
pub fn get_fd_registry(
) -> &'static std::sync::Mutex<std::collections::HashMap<String, std::os::unix::io::RawFd>> {
FD_REGISTRY.get_or_init(|| std::sync::Mutex::new(std::collections::HashMap::new()))
}
pub async fn bind_listener(
addr_str: &str,
mode: u32,
service_name: &str,
) -> anyhow::Result<UnifiedListener> {
if let Some(path) = addr_str.strip_prefix("unix://") {
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
// Check if we inherited an FD for this service
let inherited_fds_json = std::env::var("TRAUDIT_INHERITED_FDS").ok();
let mut inherited_fd: Option<RawFd> = None;
if let Some(json) = inherited_fds_json {
let map: std::collections::HashMap<String, RawFd> =
serde_json::from_str(&json).unwrap_or_default();
if let Some(&fd) = map.get(service_name) {
info!("[{}] inherited fd: {}", service_name, fd);
inherited_fd = Some(fd);
}
}
let listener = if let Some(fd) = inherited_fd {
// Determine type based on address string prefix
if addr_str.starts_with("unix://") {
let l = unsafe { std::os::unix::net::UnixListener::from_raw_fd(fd) };
// We must set it non-blocking as tokio expects
l.set_nonblocking(true)?;
let l = UnixListener::from_std(l)?;
let path = std::path::PathBuf::from(addr_str.trim_start_matches("unix://"));
UnifiedListener::Unix(l, path)
} else {
let l = unsafe { std::net::TcpListener::from_raw_fd(fd) };
l.set_nonblocking(true)?;
let l = TcpListener::from_std(l)?;
UnifiedListener::Tcp(l)
}
} else if let Some(path) = addr_str.strip_prefix("unix://") {
// Robust bind logic adapted from previous implementation
let path_buf = std::path::Path::new(path).to_path_buf();
@@ -93,7 +132,7 @@ pub async fn bind_listener(
}
}
Ok(UnifiedListener::Unix(listener, path_buf))
UnifiedListener::Unix(listener, path_buf)
} else {
// TCP with SO_REUSEPORT
use nix::sys::socket::{setsockopt, sockopt};
@@ -158,8 +197,41 @@ pub async fn bind_listener(
e
})?;
Ok(UnifiedListener::Tcp(listener))
UnifiedListener::Tcp(listener)
};
// Register duplicated FD for reload to pass to the next process.
let raw_fd = match &listener {
UnifiedListener::Tcp(l) => l.as_raw_fd(),
UnifiedListener::Unix(l, _) => l.as_raw_fd(),
};
// Use libc for dup to avoid nix version issues
let dup_fd = unsafe { libc::dup(raw_fd) };
if dup_fd < 0 {
let err = std::io::Error::last_os_error();
error!("failed to dup fd: {}", err);
return Err(anyhow::anyhow!(err));
}
// Set CLOEXEC on the dup_fd
let flags = unsafe { libc::fcntl(dup_fd, libc::F_GETFD) };
if flags < 0 {
let _ = unsafe { libc::close(dup_fd) };
return Err(anyhow::anyhow!(std::io::Error::last_os_error()));
}
if unsafe { libc::fcntl(dup_fd, libc::F_SETFD, flags | libc::FD_CLOEXEC) } < 0 {
let _ = unsafe { libc::close(dup_fd) };
return Err(anyhow::anyhow!(std::io::Error::last_os_error()));
}
get_fd_registry()
.lock()
.unwrap()
.insert(service_name.to_string(), dup_fd);
Ok(listener)
}
pub async fn serve_listener_loop<F, Fut>(
@@ -256,22 +328,45 @@ pub async fn serve_listener_loop<F, Fut>(
}
// 2. Resolve Real IP (consumes stream/buffer for XFF peeking if needed).
let (real_peer_ip, real_peer_port) = match crate::core::server::handler::resolve_real_ip(
&real_ip_config,
client_addr,
&proxy_info,
&mut stream,
&mut buffer,
)
.await
{
Ok((ip, port)) => (ip, port),
Err(e) => {
error!("[{}] real ip resolution failed: {}", service.name, e);
// Fallback or abort?
// Abort is safer if I/O broken.
return;
}
// [FIX] If TLS is enabled, we CANNOT peek for XFF headers on the raw stream because it's encrypted.
// In that case, we skip XFF resolution here and let the proxy application (Pingora) handle it
// after decryption (though Pingora might need its own config for that).
// For now, avoiding the deadlock is priority.
let perform_xff = if tls_acceptor.is_some() {
if let Some(ref cfg) = real_ip_config {
// If source is Xff, we must skip.
// If source is ProxyProtocol, we can still do it (already done via proxy_info above).
cfg.source != crate::config::RealIpSource::Xff
} else {
true
}
} else {
true
};
let (real_peer_ip, real_peer_port) = if perform_xff {
match crate::core::server::handler::resolve_real_ip(
&real_ip_config,
client_addr,
&proxy_info,
&mut stream,
&mut buffer,
)
.await
{
Ok((ip, port)) => (ip, port),
Err(e) => {
error!("[{}] real ip resolution failed: {}", service.name, e);
return;
}
}
} else {
// Fallback to what we know (Proxy Protocol or Physical)
if let Some(info) = &proxy_info {
(info.source.ip(), info.source.port())
} else {
(client_addr.ip(), client_addr.port())
}
};
let local_addr = match &stream {

View File

@@ -2,13 +2,12 @@ use crate::config::Config;
use crate::core::upstream::UpstreamStream;
use crate::db::clickhouse::ClickHouseLogger;
use pingora::apps::ServerApp;
use std::os::unix::fs::PermissionsExt;
use std::sync::{Arc, Barrier};
use std::sync::Arc;
use tracing::{error, info};
pub mod context;
pub mod handler;
mod listener;
pub mod listener;
mod pingora_compat;
pub mod stream;
@@ -37,11 +36,9 @@ pub async fn run(
return Err(e);
}
// JoinSet to manage all server tasks
let mut join_set = tokio::task::JoinSet::new();
// Pingora server initialization (TLS only or Standard HTTP)
let mut pingora_services = Vec::new();
for service in config.services {
let db = db.clone();
for bind in &service.binds {
@@ -51,26 +48,9 @@ pub async fn run(
let mode = bind.mode;
let real_ip_config = bind.real_ip.clone();
// Use custom loop for TCP services or HTTP services requiring PROXY protocol parsing (not fully supported by pingora standard loop).
let is_tcp_service = service.service_type == "tcp";
// Use custom loop if Proxy Protocol is enabled, even if TLS is used
let is_http_proxy = service.service_type == "http" && bind.proxy.is_some();
let use_custom_loop = is_tcp_service || is_http_proxy;
if !use_custom_loop {
// Use Standard Pingora Service (For TLS, or Pure HTTP, or Unix HTTP without PROXY)
pingora_services.push((
service_config,
bind.clone(),
bind.tls.clone(),
real_ip_config,
));
continue;
}
// --- Custom Loop Logic ---
// Custom Loop
let mut tls_acceptor = None;
if let Some(tls_config) = &bind.tls {
@@ -91,7 +71,7 @@ pub async fn run(
error!("failed to load cert chain {}: {}", tls_config.cert, e);
anyhow::anyhow!(e)
})?;
// ALPN support matching Pingora's defaults?
// ALPN support matching Pingora's defaults
acceptor.set_alpn_protos(b"\x02h2\x08http/1.1").ok();
tls_acceptor = Some(Arc::new(acceptor.build()));
}
@@ -150,14 +130,14 @@ pub async fn run(
format!(" {}", tags.join(" "))
};
if is_http_proxy {
if is_tcp_service {
info!(
"[{}] listening on http {} {}{}",
"[{}] listening on {} {}{}",
service_config.name, listen_type, bind_addr, tag_str
);
} else {
info!(
"[{}] listening on {} {}{}",
"[{}] listening on http {} {}{}",
service_config.name, listen_type, bind_addr, tag_str
);
}
@@ -262,61 +242,6 @@ pub async fn run(
}
}
// Run Pingora in a separate thread if needed
if !pingora_services.is_empty() {
let barrier = Arc::new(Barrier::new(2));
let barrier_clone = barrier.clone();
std::thread::spawn(move || {
use crate::core::pingora_proxy::TrauditProxy;
use pingora::proxy::http_proxy_service;
use pingora::server::configuration::Opt;
use pingora::server::Server;
if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut server = Server::new(Some(Opt::default())).unwrap();
server.bootstrap();
for (svc_config, bind, tls, real_ip) in pingora_services {
let proxy = TrauditProxy {
db: db.clone(),
service_config: svc_config.clone(),
listen_addr: bind.addr.clone(),
real_ip,
add_xff_header: bind.add_xff_header,
};
let mut service = http_proxy_service(&server.configuration, proxy);
if let Some(tls_config) = tls {
let key_path = tls_config.key.as_deref().unwrap_or(&tls_config.cert);
service
.add_tls(&bind.addr, &tls_config.cert, key_path)
.unwrap();
info!("[{}] listening on https {}", svc_config.name, bind.addr);
} else if bind.addr.starts_with("unix://") {
let path = bind.addr.trim_start_matches("unix://");
service.add_uds(path, Some(std::fs::Permissions::from_mode(bind.mode)));
info!("[{}] listening on http unix {}", svc_config.name, path);
} else {
service.add_tcp(&bind.addr);
info!("[{}] listening on http {}", svc_config.name, bind.addr);
}
server.add_service(service);
}
barrier_clone.wait();
server.run_forever();
})) {
error!("pingora server panicked: {:?}", e);
}
error!("pingora server exited unexpectedly!");
});
barrier.wait();
}
info!("traudit started...");
// notify systemd if configured

View File

@@ -127,11 +127,40 @@ async fn main() -> anyhow::Result<()> {
tokio::select! {
_ = sighup.recv() => {
info!("received SIGHUP (reload). spawning new process...");
// Prepare FDs to pass
let fd_map = {
let registry = traudit::core::server::listener::get_fd_registry().lock().unwrap();
registry.clone()
};
let fd_json = serde_json::to_string(&fd_map).unwrap_or_default();
info!("passing fds: {}", fd_json);
// Spawn new process
let args: Vec<String> = env::args().collect();
match std::process::Command::new(&args[0])
.args(&args[1..])
.spawn() {
let mut cmd = std::process::Command::new(&args[0]);
cmd.args(&args[1..]);
cmd.env("TRAUDIT_INHERITED_FDS", fd_json);
unsafe {
// Use pre_exec to clear CLOEXEC on the FDs to be inherited.
let fd_map_for_closure = fd_map.clone();
use std::os::unix::process::CommandExt;
cmd.pre_exec(move || {
for (_, &fd) in &fd_map_for_closure {
// Clear FD_CLOEXEC flag
let flags = libc::fcntl(fd, libc::F_GETFD);
if flags >= 0 {
libc::fcntl(fd, libc::F_SETFD, flags & !libc::FD_CLOEXEC);
}
}
Ok(())
});
}
match cmd.spawn() {
Ok(child) => {
let child_pid = child.id();
info!("spawned new process with pid: {}", child_pid);

View File

@@ -1,7 +1,7 @@
use std::net::SocketAddr;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use traudit::config::{
BindEntry, Config, DatabaseConfig, RealIpConfig, RealIpSource, ServiceConfig,
};
@@ -164,6 +164,7 @@ async fn test_https_v2_append_xff() {
.await;
}
// Helper for Chain Test
// Helper for Chain Test
struct ChainTestResources {
config: Config,
@@ -176,26 +177,12 @@ async fn prepare_chain_env() -> ChainTestResources {
// E4 Upstream (Mock Server)
let (e4_upstream_addr, _) = spawn_mock_upstream().await;
// Assign ports dynamically
let l1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
let p1 = l1.local_addr().unwrap().port();
let addr1 = format!("127.0.0.1:{}", p1);
drop(l1);
let l2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
let p2 = l2.local_addr().unwrap().port();
let addr2 = format!("127.0.0.1:{}", p2);
drop(l2);
let l3 = TcpListener::bind("127.0.0.1:0").await.unwrap();
let p3 = l3.local_addr().unwrap().port();
let addr3 = format!("127.0.0.1:{}", p3);
drop(l3);
let l4 = TcpListener::bind("127.0.0.1:0").await.unwrap();
let p4 = l4.local_addr().unwrap().port();
let addr4 = format!("127.0.0.1:{}", p4);
drop(l4);
// Use Unix sockets to avoid port collisions/stealing
let suffix = rand::random::<u64>();
let addr1 = format!("unix:///tmp/traudit_e1_{}.sock", suffix);
let addr2 = format!("unix:///tmp/traudit_e2_{}.sock", suffix);
let addr3 = format!("unix:///tmp/traudit_e3_{}.sock", suffix);
let addr4 = format!("unix:///tmp/traudit_e4_{}.sock", suffix);
// DB Config
let db_port = get_shared_db_port().await;
@@ -307,8 +294,9 @@ async fn test_proxy_chain() {
});
tokio::time::sleep(Duration::from_millis(2000)).await;
// Connect to E1
let mut stream = TcpStream::connect(&res.e1_addr)
// Connect to E1 (Unix)
let path = res.e1_addr.strip_prefix("unix://").unwrap();
let mut stream = tokio::net::UnixStream::connect(path)
.await
.expect("Failed to connect to E1");
@@ -325,4 +313,8 @@ async fn test_proxy_chain() {
response, b"chain_test_ping",
"Chain test failed: response mismatch"
);
// Cleanup E1 socket (others are cleaned up by server, but E1 we explicitly connected to)
// Actually all are cleaned up by server::run when it drops listener.
// We don't need manual cleanup here unless we want to be pedantic.
}

327
tests/reload_test.rs Normal file
View File

@@ -0,0 +1,327 @@
mod common;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UnixStream};
use tokio::process::Command;
use warp::Filter;
// Shared stats
static BYTES_SENT: AtomicUsize = AtomicUsize::new(0);
#[tokio::test]
async fn test_reload_stress() -> anyhow::Result<()> {
// Setup Environment: Initialize DB and clear tables
let host_port = common::get_shared_db_port().await;
// Create tables using system client if needed, or rely on Traudit
let system_client = common::get_db_client(host_port, "default");
let _ = system_client
.query("CREATE DATABASE IF NOT EXISTS traudit")
.execute()
.await;
let client = common::get_db_client(host_port, "traudit");
// Clean start
let _ = client.query("DROP TABLE IF EXISTS tcp_log").execute().await;
let _ = client
.query("DROP TABLE IF EXISTS http_log")
.execute()
.await;
let tcp_backend = TcpListener::bind("127.0.0.1:0").await?;
let tcp_backend_port = tcp_backend.local_addr()?.port();
tokio::spawn(async move {
loop {
if let Ok((mut socket, _)) = tcp_backend.accept().await {
tokio::spawn(async move {
let (mut rd, mut wr) = socket.split();
let _ = tokio::io::copy(&mut rd, &mut wr).await;
});
}
}
});
// HTTP Backend
let http_backend_port = {
let (addr, server) = warp::serve(
warp::any().map(|| "ok"), // Fix: closure taking 0 args
)
.bind_ephemeral(([127, 0, 0, 1], 0));
let port: u16 = addr.port(); // Fix: explicit type
tokio::spawn(server);
port
};
// Config: Use fixed random high ports to ensure reloading works on same port
let t_tcp_port = 30000 + (rand::random::<u16>() % 1000);
// Ensure unique manually if conflict, but probability low for test.
let t_http_port = t_tcp_port + 1;
let t_unix_path = format!("/tmp/traudit_test_{}.sock", rand::random::<u32>());
// Write Config
let config_content = format!(
r#"
database:
type: clickhouse
dsn: http://traudit:traudit@127.0.0.1:{}/traudit
batch_size: 1
batch_timeout_secs: 1
services:
- name: tcp-bench
type: tcp
forward_to: 127.0.0.1:{}
binds:
- addr: 127.0.0.1:{}
- name: unix-bench
type: tcp
forward_to: 127.0.0.1:{}
binds:
- addr: unix://{}
mode: 666
- name: http-bench
type: http
forward_to: 127.0.0.1:{}
binds:
- addr: 127.0.0.1:{}
"#,
host_port,
tcp_backend_port,
t_tcp_port,
tcp_backend_port,
t_unix_path,
http_backend_port,
t_http_port
);
let config_path = std::env::temp_dir().join("stress_test.yaml");
std::fs::write(&config_path, config_content)?;
// Pre-build to ensure cargo run doesn't time out compiling
let _ = Command::new("cargo")
.arg("build")
.arg("--bin")
.arg("traudit")
.status()
.await?;
// Start Traudit using cargo run to ensure correct binary execution
let mut _child = Command::new("cargo")
.arg("run")
.arg("--bin")
.arg("traudit")
.arg("--")
.arg("-f")
.arg(&config_path)
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()?;
// Wait for port to be open (up to 30s)
let addr = format!("127.0.0.1:{}", t_tcp_port);
let mut started = false;
for _ in 0..60 {
// 30s total (500ms * 60)
if TcpStream::connect(&addr).await.is_ok() {
started = true;
break;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
if !started {
panic!("Traudit failed to start on port {} within 30s", t_tcp_port);
}
// Run Test Loop for 10 seconds generating mixed traffic
let running = Arc::new(std::sync::atomic::AtomicBool::new(true));
// Task: Staggered Connections
let r_stagger = running.clone(); // Clone for staggered task loop
let staggered_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
for _ in 0..10 {
// 0 to 9
interval.tick().await;
if !r_stagger.load(Ordering::Relaxed) {
break;
}
// TCP Client
let t_addr = format!("127.0.0.1:{}", t_tcp_port);
let r_inner = r_stagger.clone();
tokio::spawn(async move {
// Retry connect logic
for _ in 0..5 {
if let Ok(mut stream) = TcpStream::connect(&t_addr).await {
let mut buf = [0u8; 1024];
while r_inner.load(Ordering::Relaxed) {
if stream.write_all(b"PING").await.is_ok() {
BYTES_SENT.fetch_add(4, Ordering::SeqCst);
let _ = stream.read(&mut buf).await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
// Unix Client
let u_path = t_unix_path.clone(); // Clone path string for new task
let r_inner = r_stagger.clone();
tokio::spawn(async move {
for _ in 0..5 {
if let Ok(mut stream) = UnixStream::connect(&u_path).await {
let mut buf = [0u8; 1024];
while r_inner.load(Ordering::Relaxed) {
if stream.write_all(b"PING").await.is_ok() {
BYTES_SENT.fetch_add(4, Ordering::SeqCst);
let _ = stream.read(&mut buf).await;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
// HTTP Keep-Alive Client
let h_url = format!("http://127.0.0.1:{}", t_http_port);
let r_inner = r_stagger.clone();
tokio::spawn(async move {
let client = reqwest::Client::builder().build().unwrap();
while r_inner.load(Ordering::Relaxed) {
if let Ok(_) = client.post(&h_url).body("PING").send().await {
BYTES_SENT.fetch_add(4, Ordering::SeqCst);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
}
});
// Task: High Freq HTTP
let r_high = running.clone();
let high_freq_url = format!("http://127.0.0.1:{}", t_http_port);
let high_handle = tokio::spawn(async move {
let client = reqwest::Client::new();
while r_high.load(Ordering::Relaxed) {
if let Ok(_) = client.post(&high_freq_url).body("FAST").send().await {
BYTES_SENT.fetch_add(4, Ordering::SeqCst);
}
// Slight delay to not overwhelm test runner completely
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
// Reload Sequence: Trigger SIGHUP at T=5s
tokio::time::sleep(Duration::from_secs(5)).await;
// Send SIGHUP
let output = Command::new("pgrep")
.arg("-f")
.arg(&config_path.to_string_lossy().to_string())
.output()
.await?;
let pid_str = String::from_utf8(output.stdout)?;
println!("Found PIDs for Reload: {}", pid_str);
for line in pid_str.lines() {
if let Ok(pid) = line.trim().parse::<i32>() {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid),
nix::sys::signal::Signal::SIGHUP,
);
}
}
// Wait remaining time
tokio::time::sleep(Duration::from_secs(5)).await;
// Stop Sequence: Stop generators and signal shutdown
running.store(false, Ordering::Relaxed);
let _ = staggered_handle.await;
let _ = high_handle.await;
// Wait for clients to actually disconnect and server to process logs
tokio::time::sleep(Duration::from_secs(3)).await;
// Kill traudit
let output = Command::new("pgrep")
.arg("-f")
.arg(&config_path.to_string_lossy().to_string())
.output()
.await?;
let pid_str = String::from_utf8(output.stdout)?;
for line in pid_str.lines() {
if let Ok(pid) = line.trim().parse::<i32>() {
let _ = nix::sys::signal::kill(
nix::unistd::Pid::from_raw(pid),
nix::sys::signal::Signal::SIGINT,
);
}
}
// Wait for buffered records to flush (batch_timeout_secs: 1)
tokio::time::sleep(Duration::from_secs(3)).await;
// Verify: Aggregated DB logs must cover Client payload (DB >= Client due to headers)
let client_sent = BYTES_SENT.load(Ordering::SeqCst) as u64;
let tcp_res = client
.query("SELECT sum(bytes_recv) FROM tcp_log")
.fetch_one::<u64>()
.await;
let tcp_bytes = match tcp_res {
Ok(n) => n,
Err(e) => {
println!("TCP Query Error: {}", e);
0
}
};
let http_res = client
.query("SELECT sum(req_body_size) FROM http_log")
.fetch_one::<u64>()
.await;
let http_bytes = match http_res {
Ok(n) => n,
Err(e) => {
println!("HTTP Query Error: {}", e);
0
}
};
let db_sent = tcp_bytes + http_bytes;
println!(
"Client Sent Payload: {}, DB Recorded (TCP Recv + HTTP Req Body): {}",
client_sent, db_sent
);
assert!(
db_sent > 0,
"DB recorded 0 bytes! Queries failed or no data."
);
// Ensure DB recorded at least as many bytes as the client sent.
assert!(
db_sent >= client_sent,
"Data loss detected! DB {} < Client {}",
db_sent,
client_sent
);
Ok(())
}