refactor: split server.rs into modules and upgrade dependencies

This commit is contained in:
2026-01-16 21:18:46 +08:00
parent d62c0248f0
commit 278b22e57e
9 changed files with 677 additions and 769 deletions

291
Cargo.lock generated
View File

@@ -31,6 +31,12 @@ version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"
[[package]]
name = "bnum"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "119771309b95163ec7aaf79810da82f7cd0599c19722d48b9c03894dca833966"
[[package]]
name = "bstr"
version = "1.12.1"
@@ -45,6 +51,9 @@ name = "bytes"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
dependencies = [
"serde",
]
[[package]]
name = "cfg-if"
@@ -60,35 +69,35 @@ checksum = "93a719913643003b84bd13022b4b7e703c09342cd03b679c4641c7d2e50dc34d"
[[package]]
name = "clickhouse"
version = "0.13.3"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a9a81a1dffadd762ee662635ce409232258ce9beebd7cc0fa227df0b5e7efc0"
checksum = "d975a05171c6f8a453f60ec6287c0018c90911d5a8a46d9b6abe386ea359fab3"
dependencies = [
"bnum",
"bstr",
"bytes",
"cityhash-rs",
"clickhouse-derive",
"futures",
"clickhouse-macros",
"clickhouse-types",
"futures-channel",
"futures-util",
"http-body-util",
"hyper",
"hyper-util",
"lz4_flex",
"replace_with",
"sealed",
"polonius-the-crab",
"serde",
"static_assertions",
"thiserror 1.0.69",
"thiserror",
"time",
"tokio",
"url",
]
[[package]]
name = "clickhouse-derive"
version = "0.2.0"
name = "clickhouse-macros"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d70f3e2893f7d3e017eeacdc9a708fbc29a10488e3ebca21f9df6a5d2b616dbb"
checksum = "ff6669899e23cb87b43daf7996f0ea3b9c07d0fb933d745bb7b815b052515ae3"
dependencies = [
"proc-macro2",
"quote",
@@ -96,6 +105,16 @@ dependencies = [
"syn",
]
[[package]]
name = "clickhouse-types"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "358fbfd439fb0bed02a3e2ecc5131f6a9d039ba5639aed650cf0e845f6ebfc16"
dependencies = [
"bytes",
"thiserror",
]
[[package]]
name = "deranged"
version = "0.5.5"
@@ -148,21 +167,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
@@ -179,34 +183,12 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
@@ -225,10 +207,8 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
@@ -255,6 +235,17 @@ version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
[[package]]
name = "higher-kinded-types"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e690f8474c6c5d8ff99656fcbc195a215acc3949481a8b0b3351c838972dc776"
dependencies = [
"macro_rules_attribute",
"never-say-never",
"paste",
]
[[package]]
name = "http"
version = "1.4.0"
@@ -330,7 +321,7 @@ dependencies = [
"hyper",
"libc",
"pin-project-lite",
"socket2 0.6.1",
"socket2",
"tokio",
"tower-service",
"tracing",
@@ -499,6 +490,22 @@ version = "0.11.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a"
[[package]]
name = "macro_rules_attribute"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65049d7923698040cd0b1ddcced9b0eb14dd22c5f86ae59c3740eab64a676520"
dependencies = [
"macro_rules_attribute-proc_macro",
"paste",
]
[[package]]
name = "macro_rules_attribute-proc_macro"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670fdfda89751bc4a84ac13eaa63e205cf0fd22b4c9a5fbfa085b63c1f1d3a30"
[[package]]
name = "memchr"
version = "2.7.6"
@@ -516,6 +523,12 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "never-say-never"
version = "6.6.666"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf5a574dadd7941adeaa71823ecba5e28331b8313fb2e1c6a5c7e5981ea53ad6"
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
@@ -560,6 +573,12 @@ dependencies = [
"windows-link",
]
[[package]]
name = "paste"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "percent-encoding"
version = "2.3.2"
@@ -578,6 +597,16 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "polonius-the-crab"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec242d7eccbb2fd8b3b5b6e3cf89f94a91a800f469005b44d154359609f8af72"
dependencies = [
"higher-kinded-types",
"never-say-never",
]
[[package]]
name = "potential_utf"
version = "0.1.4"
@@ -626,12 +655,6 @@ dependencies = [
"bitflags",
]
[[package]]
name = "replace_with"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51743d3e274e2b18df81c4dc6caf8a5b8e15dbe799e0dca05c7617380094e884"
[[package]]
name = "rustix"
version = "1.1.3"
@@ -657,17 +680,6 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "sealed"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22f968c5ea23d555e670b449c1c5e7b2fc399fdaec1d304a17cd48e288abc107"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde"
version = "1.0.228"
@@ -764,16 +776,6 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.1"
@@ -790,12 +792,6 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "syn"
version = "2.0.114"
@@ -831,33 +827,13 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "thiserror"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52"
dependencies = [
"thiserror-impl 1.0.69",
]
[[package]]
name = "thiserror"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
dependencies = [
"thiserror-impl 2.0.17",
]
[[package]]
name = "thiserror-impl"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1"
dependencies = [
"proc-macro2",
"quote",
"syn",
"thiserror-impl",
]
[[package]]
@@ -933,7 +909,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.6.1",
"socket2",
"tokio-macros",
"windows-sys 0.61.2",
]
@@ -1024,9 +1000,9 @@ dependencies = [
"serde",
"serde_repr",
"serde_yaml",
"socket2 0.5.10",
"socket2",
"tempfile",
"thiserror 2.0.17",
"thiserror",
"time",
"tokio",
"tracing",
@@ -1093,9 +1069,9 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
[[package]]
name = "wasip2"
version = "1.0.1+wasi-0.2.4"
version = "1.0.2+wasi-0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7"
checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5"
dependencies = [
"wit-bindgen",
]
@@ -1106,22 +1082,13 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb"
dependencies = [
"windows-targets 0.53.5",
"windows-targets",
]
[[package]]
@@ -1133,22 +1100,6 @@ dependencies = [
"windows-link",
]
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm 0.52.6",
"windows_aarch64_msvc 0.52.6",
"windows_i686_gnu 0.52.6",
"windows_i686_gnullvm 0.52.6",
"windows_i686_msvc 0.52.6",
"windows_x86_64_gnu 0.52.6",
"windows_x86_64_gnullvm 0.52.6",
"windows_x86_64_msvc 0.52.6",
]
[[package]]
name = "windows-targets"
version = "0.53.5"
@@ -1156,106 +1107,58 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3"
dependencies = [
"windows-link",
"windows_aarch64_gnullvm 0.53.1",
"windows_aarch64_msvc 0.53.1",
"windows_i686_gnu 0.53.1",
"windows_i686_gnullvm 0.53.1",
"windows_i686_msvc 0.53.1",
"windows_x86_64_gnu 0.53.1",
"windows_x86_64_gnullvm 0.53.1",
"windows_x86_64_msvc 0.53.1",
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_aarch64_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnu"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_i686_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnu"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "windows_x86_64_msvc"
version = "0.53.1"
@@ -1264,9 +1167,9 @@ checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
[[package]]
name = "wit-bindgen"
version = "0.46.0"
version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5"
[[package]]
name = "writeable"

View File

@@ -9,10 +9,10 @@ repository = "https://github.com/awfufu/traudit"
[dependencies]
tokio = { version = "1", features = ["full"] }
clickhouse = { version = "0.13", features = ["time"] }
clickhouse = { version = "0.14", features = ["time"] }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
socket2 = "0.5"
socket2 = "0.6"
libc = "0.2"
tracing = "0.1"
tracing-subscriber = "0.3"

View File

@@ -34,6 +34,7 @@ fn default_timeout_secs() -> u64 {
pub struct ServiceConfig {
pub name: String,
#[serde(rename = "type")]
#[allow(dead_code)]
pub service_type: String,
pub binds: Vec<BindEntry>,
#[serde(rename = "forward_to")]

View File

@@ -1,572 +0,0 @@
use crate::config::{Config, ServiceConfig};
use crate::core::forwarder;
use crate::core::upstream::UpstreamStream;
use crate::db::clickhouse::ClickHouseLogger;
use crate::protocol;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
use tokio::signal;
use tracing::{error, info};
pub async fn run(config: Config) -> anyhow::Result<()> {
let db_logger = ClickHouseLogger::new(&config.database).map_err(|e| {
error!("database: {}", e);
e
})?;
let db = Arc::new(db_logger);
// init db table
if let Err(e) = db.init().await {
let msg = e.to_string();
if msg.len() > 200 {
error!("failed to init database: {}... (truncated)", &msg[..200]);
} else {
error!("failed to init database: {}", msg);
}
return Err(e);
}
let mut join_set = tokio::task::JoinSet::new();
let mut socket_guards = Vec::new();
for service in config.services {
let db = db.clone();
// Only support TCP service type for now, as per user instructions implied context
if service.service_type != "tcp" {
info!("skipping non-tcp service: {}", service.name);
continue;
}
for bind in &service.binds {
let service_config = service.clone();
let bind_addr = bind.addr.clone();
// proxy is now Option<String>
let proxy_proto_config = bind.proxy.clone();
let mode = bind.mode;
if bind_addr.starts_with("unix://") {
let path = bind_addr.trim_start_matches("unix://");
// bind_robust handles cleanup, existing file checks, and permission checks
let (listener, guard) = bind_robust(path, mode, &service_config.name).await?;
// Push guard to keep it alive until shutdown
socket_guards.push(guard);
info!(
"[{}] listening on unix {} (mode {:o})",
service_config.name, path, mode
);
join_set.spawn(start_unix_service(
service_config,
listener,
proxy_proto_config,
db.clone(),
bind.addr.clone(),
));
} else {
// BindType is removed, assume TCP bind for "tcp" service
let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
error!(
"[{}] failed to bind {}: {}",
service_config.name, bind_addr, e
);
e
})?;
info!("[{}] listening on tcp {}", service_config.name, bind_addr);
join_set.spawn(start_tcp_service(
service_config,
listener,
proxy_proto_config,
db.clone(),
bind.addr.clone(),
));
}
}
}
info!("traudit started...");
// notify systemd if NOTIFY_SOCKET is set
if let Ok(notify_socket) = std::env::var("NOTIFY_SOCKET") {
if let Ok(sock) = std::os::unix::net::UnixDatagram::unbound() {
if let Err(e) = sock.send_to(b"READY=1", notify_socket) {
error!("failed to notify systemd: {}", e);
}
}
}
match signal::ctrl_c().await {
Ok(()) => {
info!("shutdown signal received.");
}
Err(err) => {
error!("unable to listen for shutdown signal: {}", err);
}
}
join_set.shutdown().await;
// socket_guards are dropped here, cleaning up files
Ok(())
}
struct UnixSocketGuard {
path: std::path::PathBuf,
}
impl Drop for UnixSocketGuard {
fn drop(&mut self) {
if let Err(e) = std::fs::remove_file(&self.path) {
// It's possible the file is already gone or we lost permissions, just log debug.
tracing::debug!("failed to remove socket file {:?}: {}", self.path, e);
} else {
tracing::debug!("removed socket file {:?}", self.path);
}
}
}
async fn bind_robust(
path: &str,
mode: u32,
service_name: &str,
) -> anyhow::Result<(UnixListener, UnixSocketGuard)> {
let path_buf = std::path::Path::new(path).to_path_buf();
if path_buf.exists() {
// Check permissions first: if we cannot write to it, we certainly cannot remove it.
// metadata() follows symlinks, symlink_metadata() does not. Unix sockets are regular files-ish.
match std::fs::symlink_metadata(&path_buf) {
Ok(_meta) => {
// We rely on subsequent operations (connect/remove) to fail with PermissionDenied if we lack access.
}
Err(e) => {
if e.kind() == std::io::ErrorKind::PermissionDenied {
anyhow::bail!("Permission denied accessing existing socket: {}", path);
}
}
}
// Try to connect to check if it's active
match UnixStream::connect(&path_buf).await {
Ok(_) => {
// Active!
anyhow::bail!("Address already in use: {}", path);
}
Err(e) if e.kind() == std::io::ErrorKind::ConnectionRefused => {
// Stale! Remove it.
info!("[{}] removing stale socket file: {}", service_name, path);
if let Err(rm_err) = std::fs::remove_file(&path_buf) {
anyhow::bail!("failed to remove stale socket {}: {}", path, rm_err);
}
}
Err(e) => {
// Other error (e.g. Permission Denied during connect?), bail
anyhow::bail!("failed to check existing socket {}: {}", path, e);
}
}
}
// Now bind
let listener = UnixListener::bind(&path_buf).map_err(|e| {
error!("[{}] failed to bind {}: {}", service_name, path, e);
e
})?;
// Set permissions
use std::os::unix::fs::PermissionsExt;
if let Ok(metadata) = std::fs::metadata(&path_buf) {
let mut permissions = metadata.permissions();
// Verify if we need to change it
if permissions.mode() & 0o777 != mode & 0o777 {
permissions.set_mode(mode);
if let Err(e) = std::fs::set_permissions(&path_buf, permissions) {
// This is not fatal but worth error log
error!(
"[{}] failed to set permissions on {}: {}",
service_name, path, e
);
}
}
}
Ok((listener, UnixSocketGuard { path: path_buf }))
}
async fn start_tcp_service(
service: ServiceConfig,
listener: TcpListener,
proxy_cfg: Option<String>,
db: Arc<ClickHouseLogger>,
listen_addr: String,
) {
// Startup liveness check
if let Err(e) = UpstreamStream::connect(&service.forward_to).await {
match e.kind() {
std::io::ErrorKind::ConnectionRefused => {
tracing::warn!("[{}] -> '{}': {}", service.name, service.forward_to, e);
}
std::io::ErrorKind::NotFound => {
tracing::warn!("[{}] -> '{}': {}", service.name, service.forward_to, e);
}
_ => {
// For other startup errors, we might want to warn or just debug, but let's stick to user request for WARNING
tracing::warn!(
"[{}] -> '{}': startup check failed: {}",
service.name,
service.forward_to,
e
);
}
}
}
loop {
match listener.accept().await {
Ok((inbound, _client_addr)) => {
let service = service.clone();
let db = db.clone();
let proxy_cfg = proxy_cfg.clone();
let listen_addr = listen_addr.clone();
tokio::spawn(async move {
let svc_name = service.name.clone();
let svc_target = service.forward_to.clone();
let inbound = InboundStream::Tcp(inbound);
if let Err(e) = handle_connection(inbound, service, proxy_cfg, db, listen_addr).await {
match e.kind() {
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
// normal disconnects, debug log only
tracing::debug!("connection closed: {}", e);
}
std::io::ErrorKind::ConnectionRefused => {
tracing::warn!("[{}] -> '{}': {}", svc_name, svc_target, e);
}
std::io::ErrorKind::NotFound => {
tracing::warn!("[{}] -> '{}': {}", svc_name, svc_target, e);
}
_ => {
error!("connection error: {}", e);
}
}
}
});
}
Err(e) => {
error!("accept error: {}", e);
}
}
}
}
async fn start_unix_service(
service: ServiceConfig,
listener: UnixListener,
proxy_cfg: Option<String>,
db: Arc<ClickHouseLogger>,
listen_addr: String,
) {
// Startup liveness check (same as TCP)
if let Err(e) = UpstreamStream::connect(&service.forward_to).await {
match e.kind() {
std::io::ErrorKind::ConnectionRefused => {
tracing::warn!("[{}] -> '{}': {}", service.name, service.forward_to, e);
}
std::io::ErrorKind::NotFound => {
tracing::warn!("[{}] -> '{}': {}", service.name, service.forward_to, e);
}
_ => {
tracing::warn!(
"[{}] -> '{}': startup check failed: {}",
service.name,
service.forward_to,
e
);
}
}
}
loop {
match listener.accept().await {
Ok((inbound, _addr)) => {
let service = service.clone();
let db = db.clone();
let proxy_cfg = proxy_cfg.clone();
let listen_addr = listen_addr.clone();
tokio::spawn(async move {
let svc_name = service.name.clone();
let svc_target = service.forward_to.clone();
let inbound = InboundStream::Unix(inbound);
if let Err(e) = handle_connection(inbound, service, proxy_cfg, db, listen_addr).await {
match e.kind() {
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
tracing::debug!("connection closed: {}", e);
}
std::io::ErrorKind::ConnectionRefused => {
tracing::warn!("[{}] -> '{}': {}", svc_name, svc_target, e);
}
std::io::ErrorKind::NotFound => {
tracing::warn!("[{}] -> '{}': {}", svc_name, svc_target, e);
}
_ => {
error!("connection error: {}", e);
}
}
}
});
}
Err(e) => {
error!("accept error: {}", e);
}
}
}
}
async fn handle_connection(
mut inbound: InboundStream,
service: ServiceConfig,
proxy_cfg: Option<String>,
db: Arc<ClickHouseLogger>,
listen_addr: String,
) -> std::io::Result<u64> {
let conn_ts = time::OffsetDateTime::now_utc();
let start_instant = std::time::Instant::now();
// Default metadata
// We use this flag to help decide addr_family logic later, or infer from inbound type
let is_unix = matches!(inbound, InboundStream::Unix(_));
let (mut final_ip, mut final_port) = match &inbound {
InboundStream::Tcp(s) => {
let addr = s.peer_addr()?;
(addr.ip(), addr.port())
}
InboundStream::Unix(_) => (
std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
0,
),
};
let mut proto_enum = crate::db::clickhouse::ProxyProto::None;
let mut skip_log = false;
let result = async {
// read proxy protocol (if configured)
let mut buffer = bytes::BytesMut::new();
if proxy_cfg.is_some() {
// If configured, we attempt to read.
match protocol::read_proxy_header(&mut inbound).await {
Ok((proxy_info, buf)) => {
buffer = buf;
if let Some(info) = proxy_info {
let physical = inbound.peer_addr_string()?;
// Format: [ssh] unix://test.sock <- RealIP:Port (local)
// or [ssh] 0.0.0.0:2222 <- RealIP:Port (1.2.3.4:5678)
let physical_fmt = if matches!(inbound, InboundStream::Unix(_)) {
"local".to_string()
} else {
physical
};
info!(
"[{}] {} <- {} ({})",
service.name, listen_addr, info.source, physical_fmt
);
final_ip = info.source.ip();
final_port = info.source.port();
// Note: If we get proxy info, it's effectively "proxied TCP" usually.
// So we rely on the IP address family of final_ip later.
proto_enum = match info.version {
protocol::Version::V1 => crate::db::clickhouse::ProxyProto::V1,
protocol::Version::V2 => crate::db::clickhouse::ProxyProto::V2,
};
// Optional: verify version matches config if strictly required
if let Some(ref required_ver) = proxy_cfg {
match required_ver.as_str() {
"v1" if info.version != protocol::Version::V1 => {
// warn mismatch?
}
"v2" if info.version != protocol::Version::V2 => {
// warn mismatch?
}
_ => {}
}
}
} else {
// Strict enforcement: if configured with proxy_protocol, MUST have a header
let physical = inbound.peer_addr_string()?;
let msg = format!("strict proxy protocol violation from {}", physical);
error!("[{}] {}", service.name, msg);
skip_log = true;
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, msg));
}
}
Err(e) => {
skip_log = true;
return Err(e);
}
}
} else {
let addr = if matches!(inbound, InboundStream::Unix(_)) {
// [ssh] unix://test.sock <- local
"local".to_string()
} else {
inbound.peer_addr_string()?
};
info!("[{}] {} <- {}", service.name, listen_addr, addr);
}
// connect upstream
let mut upstream = UpstreamStream::connect(&service.forward_to).await?;
// write buffered data (peeked bytes)
if !buffer.is_empty() {
upstream.write_all_buf(&mut buffer).await?;
}
// zero-copy forwarding
let inbound_async = match inbound {
InboundStream::Tcp(s) => crate::core::upstream::AsyncStream::from_tokio_tcp(s)?,
InboundStream::Unix(s) => crate::core::upstream::AsyncStream::from_tokio_unix(s)?,
};
let upstream_async = upstream.into_async_stream()?;
let (spliced_bytes, splice_res) =
forwarder::zero_copy_bidirectional(inbound_async, upstream_async).await;
if let Err(e) = splice_res {
match e.kind() {
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
tracing::debug!("[{}] connection closed with error: {}", service.name, e);
}
_ => {
error!("[{}] connection error: {}", service.name, e);
}
}
} else {
// Clean close logging removed as per request
// info!("[{}] connection closed cleanly", service.name);
}
// Total bytes = initial peeked/buffered payload + filtered bytes
Ok(spliced_bytes + buffer.len() as u64)
}
.await;
let duration = if result.is_ok() {
start_instant.elapsed().as_millis() as u32
} else {
0
};
let bytes_transferred = result.as_ref().unwrap_or(&0).clone();
// Finalize AddrFamily based on final_ip
// But if it was originally Unix AND no proxy info changed the IP (so it's still 127.0.0.1?)
// Wait, if Unix without proxy, final_ip IS 127.0.0.1.
// We want AddrFamily::Unix (1) for proper unix socket.
// If Unix WITH proxy, final_ip is Real IP -> AddrFamily::Ipv4/6.
let mut addr_family = match final_ip {
std::net::IpAddr::V4(_) => crate::db::clickhouse::AddrFamily::Ipv4,
std::net::IpAddr::V6(_) => crate::db::clickhouse::AddrFamily::Ipv6,
};
if is_unix && proto_enum == crate::db::clickhouse::ProxyProto::None {
// Unix socket, direct connection (or no proxy header received)
addr_family = crate::db::clickhouse::AddrFamily::Unix;
// Store 0 (::)
final_ip = std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED);
final_port = 0;
}
let log_entry = crate::db::clickhouse::TcpLog {
service: service.name.clone(),
conn_ts,
duration: duration as u32,
addr_family,
ip: final_ip,
port: final_port,
proxy_proto: proto_enum,
bytes: bytes_transferred,
};
if !skip_log {
tokio::spawn(async move {
if let Err(e) = db.insert_log(log_entry).await {
error!("failed to insert tcp log: {}", e);
}
});
}
result
}
enum InboundStream {
Tcp(TcpStream),
Unix(UnixStream),
}
impl InboundStream {
fn peer_addr_string(&self) -> std::io::Result<String> {
match self {
InboundStream::Tcp(s) => Ok(s.peer_addr()?.to_string()),
InboundStream::Unix(_) => Ok("unix_socket".to_string()),
}
}
}
impl AsyncRead for InboundStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match self.get_mut() {
InboundStream::Tcp(s) => Pin::new(s).poll_read(cx, buf),
InboundStream::Unix(s) => Pin::new(s).poll_read(cx, buf),
}
}
}
impl AsyncWrite for InboundStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
match self.get_mut() {
InboundStream::Tcp(s) => Pin::new(s).poll_write(cx, buf),
InboundStream::Unix(s) => Pin::new(s).poll_write(cx, buf),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
match self.get_mut() {
InboundStream::Tcp(s) => Pin::new(s).poll_flush(cx),
InboundStream::Unix(s) => Pin::new(s).poll_flush(cx),
}
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
match self.get_mut() {
InboundStream::Tcp(s) => Pin::new(s).poll_shutdown(cx),
InboundStream::Unix(s) => Pin::new(s).poll_shutdown(cx),
}
}
}

185
src/core/server/handler.rs Normal file
View File

@@ -0,0 +1,185 @@
use super::stream::InboundStream;
use crate::config::ServiceConfig;
use crate::core::forwarder;
use crate::core::upstream::UpstreamStream;
use crate::db::clickhouse::ClickHouseLogger;
use crate::protocol;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tracing::{error, info};
pub async fn handle_connection(
mut inbound: InboundStream,
service: ServiceConfig,
proxy_cfg: Option<String>,
db: Arc<ClickHouseLogger>,
listen_addr: String,
) -> std::io::Result<u64> {
let conn_ts = time::OffsetDateTime::now_utc();
let start_instant = std::time::Instant::now();
// Use this flag or inbound type to determine if it's a Unix socket
let is_unix = matches!(inbound, InboundStream::Unix(_));
let (mut final_ip, mut final_port) = match &inbound {
InboundStream::Tcp(s) => {
let addr = s.peer_addr()?;
(addr.ip(), addr.port())
}
InboundStream::Unix(_) => (
std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
0,
),
};
let mut proto_enum = crate::db::clickhouse::ProxyProto::None;
let mut skip_log = false;
let result = async {
// read proxy protocol (if configured)
let mut buffer = bytes::BytesMut::new();
if proxy_cfg.is_some() {
// If configured, we attempt to read.
match protocol::read_proxy_header(&mut inbound).await {
Ok((proxy_info, buf)) => {
buffer = buf;
if let Some(info) = proxy_info {
let physical = inbound.peer_addr_string()?;
// Format: [ssh] unix://test.sock <- RealIP:Port (local) or [ssh] 0.0.0.0:2222 <- RealIP:Port (1.2.3.4:5678)
let physical_fmt = if matches!(inbound, InboundStream::Unix(_)) {
"local".to_string()
} else {
physical
};
info!(
"[{}] {} <- {} ({})",
service.name, listen_addr, info.source, physical_fmt
);
final_ip = info.source.ip();
final_port = info.source.port();
// Proxy info implies "proxied TCP" usually; rely on final_ip family later
proto_enum = match info.version {
protocol::Version::V1 => crate::db::clickhouse::ProxyProto::V1,
protocol::Version::V2 => crate::db::clickhouse::ProxyProto::V2,
};
// Verify version matches config if required
if let Some(ref required_ver) = proxy_cfg {
match required_ver.as_str() {
"v1" if info.version != protocol::Version::V1 => {
// warn mismatch?
}
"v2" if info.version != protocol::Version::V2 => {
// warn mismatch?
}
_ => {}
}
}
} else {
// Strict enforcement: config requires header
let physical = inbound.peer_addr_string()?;
let msg = format!("strict proxy protocol violation from {}", physical);
error!("[{}] {}", service.name, msg);
skip_log = true;
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, msg));
}
}
Err(e) => {
skip_log = true;
return Err(e);
}
}
} else {
let addr = if matches!(inbound, InboundStream::Unix(_)) {
// [ssh] unix://test.sock <- local
"local".to_string()
} else {
inbound.peer_addr_string()?
};
info!("[{}] {} <- {}", service.name, listen_addr, addr);
}
// connect upstream
let mut upstream = UpstreamStream::connect(&service.forward_to).await?;
// write buffered data (peeked bytes)
if !buffer.is_empty() {
upstream.write_all_buf(&mut buffer).await?;
}
// zero-copy forwarding
let inbound_async = match inbound {
InboundStream::Tcp(s) => crate::core::upstream::AsyncStream::from_tokio_tcp(s)?,
InboundStream::Unix(s) => crate::core::upstream::AsyncStream::from_tokio_unix(s)?,
};
let upstream_async = upstream.into_async_stream()?;
let (spliced_bytes, splice_res) =
forwarder::zero_copy_bidirectional(inbound_async, upstream_async).await;
if let Err(e) = splice_res {
match e.kind() {
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
tracing::debug!("[{}] connection closed with error: {}", service.name, e);
}
_ => {
error!("[{}] connection error: {}", service.name, e);
}
}
} else {
// Clean close logging removed
}
// Total bytes = initial peeked + filtered
Ok(spliced_bytes + buffer.len() as u64)
}
.await;
let duration = if result.is_ok() {
start_instant.elapsed().as_millis() as u32
} else {
0
};
let bytes_transferred = *result.as_ref().unwrap_or(&0);
// Finalize AddrFamily based on final_ip; Unix logic handled below
let mut addr_family = match final_ip {
std::net::IpAddr::V4(_) => crate::db::clickhouse::AddrFamily::Ipv4,
std::net::IpAddr::V6(_) => crate::db::clickhouse::AddrFamily::Ipv6,
};
if is_unix && proto_enum == crate::db::clickhouse::ProxyProto::None {
// Unix socket, direct connection (or no proxy header received)
addr_family = crate::db::clickhouse::AddrFamily::Unix;
// Store 0 (::)
final_ip = std::net::IpAddr::V6(std::net::Ipv6Addr::UNSPECIFIED);
final_port = 0;
}
let log_entry = crate::db::clickhouse::TcpLog {
service: service.name.clone(),
conn_ts,
duration: duration as u32,
addr_family,
ip: final_ip,
port: final_port,
proxy_proto: proto_enum,
bytes: bytes_transferred,
};
if !skip_log {
tokio::spawn(async move {
if let Err(e) = db.insert_log(log_entry).await {
error!("failed to insert tcp log: {}", e);
}
});
}
result
}

View File

@@ -0,0 +1,83 @@
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use tokio::net::{UnixListener, UnixStream};
use tracing::{error, info};
pub struct UnixSocketGuard {
pub path: PathBuf,
}
impl Drop for UnixSocketGuard {
fn drop(&mut self) {
if let Err(_e) = std::fs::remove_file(&self.path) {
// File potentially gone or no permissions, debug log only
} else {
tracing::debug!("removed socket file {:?}", self.path);
}
}
}
pub async fn bind_robust(
path: &str,
mode: u32,
service_name: &str,
) -> anyhow::Result<(UnixListener, UnixSocketGuard)> {
let path_buf = std::path::Path::new(path).to_path_buf();
if path_buf.exists() {
// Check permissions; we need write access to remove it
match std::fs::symlink_metadata(&path_buf) {
Ok(_meta) => {
// We rely on subsequent operations (connect/remove) to fail with PermissionDenied if we lack access.
}
Err(e) => {
if e.kind() == std::io::ErrorKind::PermissionDenied {
anyhow::bail!("Permission denied accessing existing socket: {}", path);
}
}
}
// Try to connect to check if it's active
match UnixStream::connect(&path_buf).await {
Ok(_) => {
// Active!
anyhow::bail!("Address already in use: {}", path);
}
Err(e) if e.kind() == std::io::ErrorKind::ConnectionRefused => {
// Stale socket, remove it
info!("[{}] removing stale socket file: {}", service_name, path);
if let Err(rm_err) = std::fs::remove_file(&path_buf) {
anyhow::bail!("failed to remove stale socket {}: {}", path, rm_err);
}
}
Err(e) => {
// Other error, bail
anyhow::bail!("failed to check existing socket {}: {}", path, e);
}
}
}
// Now bind
let listener = UnixListener::bind(&path_buf).map_err(|e| {
error!("[{}] failed to bind {}: {}", service_name, path, e);
e
})?;
// Set permissions
if let Ok(metadata) = std::fs::metadata(&path_buf) {
let mut permissions = metadata.permissions();
// Verify if we need to change it
if permissions.mode() & 0o777 != mode & 0o777 {
permissions.set_mode(mode);
if let Err(e) = std::fs::set_permissions(&path_buf, permissions) {
// Non-fatal error, log only
error!(
"[{}] failed to set permissions on {}: {}",
service_name, path, e
);
}
}
}
Ok((listener, UnixSocketGuard { path: path_buf }))
}

250
src/core/server/mod.rs Normal file
View File

@@ -0,0 +1,250 @@
use crate::config::{Config, ServiceConfig};
use crate::core::upstream::UpstreamStream;
use crate::db::clickhouse::ClickHouseLogger;
use std::sync::Arc;
use tokio::net::{TcpListener, UnixListener};
use tokio::signal;
use tracing::{error, info};
mod handler;
mod listener;
mod stream;
use self::handler::handle_connection;
use self::listener::bind_robust;
use self::stream::InboundStream;
pub async fn run(config: Config) -> anyhow::Result<()> {
let db_logger = ClickHouseLogger::new(&config.database).map_err(|e| {
error!("database: {}", e);
e
})?;
let db = Arc::new(db_logger);
// init db table
if let Err(e) = db.init().await {
let msg = e.to_string();
if msg.len() > 200 {
error!("failed to init database: {}... (truncated)", &msg[..200]);
} else {
error!("failed to init database: {}", msg);
}
return Err(e);
}
let mut join_set = tokio::task::JoinSet::new();
let mut socket_guards = Vec::new();
for service in config.services {
let db = db.clone();
// Only support TCP, skipping others
for bind in &service.binds {
let service_config = service.clone();
let bind_addr = bind.addr.clone();
// proxy is Option<String>
let proxy_proto_config = bind.proxy.clone();
let mode = bind.mode;
if bind_addr.starts_with("unix://") {
let path = bind_addr.trim_start_matches("unix://");
// bind_robust handles cleanup and permission checks
let (listener, guard) = bind_robust(path, mode, &service_config.name).await?;
// Push guard to keep it alive
socket_guards.push(guard);
info!(
"[{}] listening on unix {} (mode {:o})",
service_config.name, path, mode
);
join_set.spawn(start_unix_service(
service_config,
listener,
proxy_proto_config,
db.clone(),
bind.addr.clone(),
));
} else {
// BindType removed, assume TCP bind
let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
error!(
"[{}] failed to bind {}: {}",
service_config.name, bind_addr, e
);
e
})?;
info!("[{}] listening on tcp {}", service_config.name, bind_addr);
join_set.spawn(start_tcp_service(
service_config,
listener,
proxy_proto_config,
db.clone(),
bind.addr.clone(),
));
}
}
}
info!("traudit started...");
// notify systemd if configured
if let Ok(notify_socket) = std::env::var("NOTIFY_SOCKET") {
if let Ok(sock) = std::os::unix::net::UnixDatagram::unbound() {
if let Err(e) = sock.send_to(b"READY=1", notify_socket) {
error!("failed to notify systemd: {}", e);
}
}
}
match signal::ctrl_c().await {
Ok(()) => {
info!("shutdown signal received.");
}
Err(err) => {
error!("unable to listen for shutdown signal: {}", err);
}
}
join_set.shutdown().await;
// socket_guards dropped here, cleaning up files
Ok(())
}
async fn start_tcp_service(
service: ServiceConfig,
listener: TcpListener,
proxy_cfg: Option<String>,
db: Arc<ClickHouseLogger>,
listen_addr: String,
) {
// Startup liveness check
if let Err(e) = UpstreamStream::connect(&service.forward_to).await {
match e.kind() {
std::io::ErrorKind::ConnectionRefused => {
tracing::warn!("[{}] -> '{}': {}", service.name, service.forward_to, e);
}
std::io::ErrorKind::NotFound => {
tracing::warn!("[{}] -> '{}': {}", service.name, service.forward_to, e);
}
_ => {
// Log other startup errors as warnings
tracing::warn!(
"[{}] -> '{}': startup check failed: {}",
service.name,
service.forward_to,
e
);
}
}
}
loop {
match listener.accept().await {
Ok((inbound, _client_addr)) => {
let service = service.clone();
let db = db.clone();
let proxy_cfg = proxy_cfg.clone();
let listen_addr = listen_addr.clone();
tokio::spawn(async move {
let svc_name = service.name.clone();
let svc_target = service.forward_to.clone();
let inbound = InboundStream::Tcp(inbound);
if let Err(e) = handle_connection(inbound, service, proxy_cfg, db, listen_addr).await {
match e.kind() {
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
// normal disconnects, debug log only
tracing::debug!("connection closed: {}", e);
}
std::io::ErrorKind::ConnectionRefused => {
tracing::warn!("[{}] -> '{}': {}", svc_name, svc_target, e);
}
std::io::ErrorKind::NotFound => {
tracing::warn!("[{}] -> '{}': {}", svc_name, svc_target, e);
}
_ => {
error!("connection error: {}", e);
}
}
}
});
}
Err(e) => {
error!("accept error: {}", e);
}
}
}
}
async fn start_unix_service(
service: ServiceConfig,
listener: UnixListener,
proxy_cfg: Option<String>,
db: Arc<ClickHouseLogger>,
listen_addr: String,
) {
// Startup liveness check (same as TCP)
if let Err(e) = UpstreamStream::connect(&service.forward_to).await {
match e.kind() {
std::io::ErrorKind::ConnectionRefused => {
tracing::warn!("[{}] -> '{}': {}", service.name, service.forward_to, e);
}
std::io::ErrorKind::NotFound => {
tracing::warn!("[{}] -> '{}': {}", service.name, service.forward_to, e);
}
_ => {
tracing::warn!(
"[{}] -> '{}': startup check failed: {}",
service.name,
service.forward_to,
e
);
}
}
}
loop {
match listener.accept().await {
Ok((inbound, _addr)) => {
let service = service.clone();
let db = db.clone();
let proxy_cfg = proxy_cfg.clone();
let listen_addr = listen_addr.clone();
tokio::spawn(async move {
let svc_name = service.name.clone();
let svc_target = service.forward_to.clone();
let inbound = InboundStream::Unix(inbound);
if let Err(e) = handle_connection(inbound, service, proxy_cfg, db, listen_addr).await {
match e.kind() {
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
tracing::debug!("connection closed: {}", e);
}
std::io::ErrorKind::ConnectionRefused => {
tracing::warn!("[{}] -> '{}': {}", svc_name, svc_target, e);
}
std::io::ErrorKind::NotFound => {
tracing::warn!("[{}] -> '{}': {}", svc_name, svc_target, e);
}
_ => {
error!("connection error: {}", e);
}
}
}
});
}
Err(e) => {
error!("accept error: {}", e);
}
}
}
}

58
src/core/server/stream.rs Normal file
View File

@@ -0,0 +1,58 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::{TcpStream, UnixStream};
pub enum InboundStream {
Tcp(TcpStream),
Unix(UnixStream),
}
impl InboundStream {
pub fn peer_addr_string(&self) -> std::io::Result<String> {
match self {
InboundStream::Tcp(s) => Ok(s.peer_addr()?.to_string()),
InboundStream::Unix(_) => Ok("unix_socket".to_string()),
}
}
}
impl AsyncRead for InboundStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match self.get_mut() {
InboundStream::Tcp(s) => Pin::new(s).poll_read(cx, buf),
InboundStream::Unix(s) => Pin::new(s).poll_read(cx, buf),
}
}
}
impl AsyncWrite for InboundStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
match self.get_mut() {
InboundStream::Tcp(s) => Pin::new(s).poll_write(cx, buf),
InboundStream::Unix(s) => Pin::new(s).poll_write(cx, buf),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
match self.get_mut() {
InboundStream::Tcp(s) => Pin::new(s).poll_flush(cx),
InboundStream::Unix(s) => Pin::new(s).poll_flush(cx),
}
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
match self.get_mut() {
InboundStream::Tcp(s) => Pin::new(s).poll_shutdown(cx),
InboundStream::Unix(s) => Pin::new(s).poll_shutdown(cx),
}
}
}

View File

@@ -226,7 +226,7 @@ impl ClickHouseLogger {
bytes: log.bytes,
};
let mut insert = self.client.insert("tcp_log")?;
let mut insert = self.client.insert::<TcpLogNew>("tcp_log").await?;
insert.write(&row).await?;
insert.end().await?;