feat: simplify connection logs with real IP support

This commit is contained in:
2026-01-15 13:43:05 +08:00
parent c7f9ccb0b2
commit daffc3664d
3 changed files with 120 additions and 43 deletions

View File

@@ -7,7 +7,7 @@ use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
use tokio::signal;
use tracing::{error, info, instrument};
use tracing::{error, info};
pub async fn run(config: Config) -> anyhow::Result<()> {
let db = Arc::new(ClickHouseLogger::new(&config.database));
@@ -17,32 +17,48 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
for service in config.services {
let db = db.clone();
for bind in &service.binds {
let service_config = service.clone(); // Clone for the task
let service_config = service.clone();
let bind_addr = bind.addr.clone();
let proxy_protocol = bind.proxy_protocol.is_some();
let bind_type = bind.bind_type;
if bind_type == BindType::Tcp {
let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
error!("[{}] failed to bind {}: {}", service_config.name, bind_addr, e);
e
})?;
info!("[{}] listening on tcp {}", service_config.name, bind_addr);
join_set.spawn(start_tcp_service(
service_config,
bind_addr,
listener,
proxy_protocol,
db.clone(),
));
} else {
info!("Skipping non-TCP bind for now: {:?}", bind_type);
info!("skipping non-tcp bind for now: {:?}", bind_type);
}
}
}
info!("Traudit started.");
info!("traudit started...");
// notify systemd if NOTIFY_SOCKET is set
if let Ok(notify_socket) = std::env::var("NOTIFY_SOCKET") {
if let Ok(sock) = std::os::unix::net::UnixDatagram::unbound() {
if let Err(e) = sock.send_to(b"READY=1", notify_socket) {
error!("failed to notify systemd: {}", e);
}
}
}
match signal::ctrl_c().await {
Ok(()) => {
info!("Shutdown signal received.");
info!("shutdown signal received.");
}
Err(err) => {
error!("Unable to listen for shutdown signal: {}", err);
error!("unable to listen for shutdown signal: {}", err);
}
}
@@ -53,64 +69,72 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
async fn start_tcp_service(
service: ServiceConfig,
addr: String,
listener: TcpListener,
proxy_protocol: bool,
_db: Arc<ClickHouseLogger>,
) {
info!("Service {} listening on TCP {}", service.name, addr);
let listener = match TcpListener::bind(&addr).await {
Ok(l) => l,
Err(e) => {
error!("Failed to bind {}: {}", addr, e);
return;
}
};
loop {
match listener.accept().await {
Ok((mut inbound, client_addr)) => {
info!("New connection from {}", client_addr);
Ok((inbound, _client_addr)) => {
// log moved to handle_connection for consistent real ip logging
let service = service.clone();
// let db = _db.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(inbound, service, proxy_protocol).await {
error!("Connection error: {}", e);
match e.kind() {
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
// normal disconnects, debug log only
tracing::debug!("connection closed: {}", e);
}
_ => {
error!("connection error: {}", e);
}
}
}
});
}
Err(e) => {
error!("Accept error: {}", e);
error!("accept error: {}", e);
}
}
}
}
#[instrument(skip(inbound, service), fields(service = %service.name))]
async fn handle_connection(
mut inbound: tokio::net::TcpStream,
service: ServiceConfig,
proxy_protocol: bool,
) -> std::io::Result<()> {
// 1. Read Proxy Protocol (if configured)
let mut buffer = if proxy_protocol {
let (_proxy_info, buffer) = protocol::read_proxy_header(&mut inbound).await?;
buffer
// read proxy protocol (if configured)
let (_client_addr, mut buffer) = if proxy_protocol {
let (proxy_info, buffer) = protocol::read_proxy_header(&mut inbound).await?;
if let Some(info) = proxy_info {
let physical = inbound.peer_addr()?;
info!("[{}] <- {} ({})", service.name, info.source, physical);
(info.source, buffer)
} else {
let addr = inbound.peer_addr()?;
info!("[{}] <- {}", service.name, addr);
(addr, buffer)
}
} else {
bytes::BytesMut::new()
let addr = inbound.peer_addr()?;
info!("[{}] <- {}", service.name, addr);
(addr, bytes::BytesMut::new())
};
// 2. Connect Upstream
// connect upstream
let mut upstream = UpstreamStream::connect(service.forward_type, &service.forward_addr).await?;
// 3. Forward Header (TODO: if configured)
// forward header (TODO: if configured)
// 4. Write buffered data (peeked bytes)
// write buffered data (peeked bytes)
if !buffer.is_empty() {
upstream.write_all_buf(&mut buffer).await?;
}
// 5. Zero-copy forwarding
// zero-copy forwarding
let inbound_async = crate::core::upstream::AsyncStream::from_tokio_tcp(inbound)?;
let upstream_async = upstream.into_async_stream()?;

View File

@@ -4,25 +4,78 @@ mod db;
mod protocol;
use crate::config::Config;
use anyhow::bail;
use std::env;
use std::path::Path;
use tracing::{error, info};
fn print_help() {
println!("traudit - a reverse proxy with auditing capabilities");
println!();
println!("usage:");
println!(" traudit -f <config_file>");
println!();
println!("options:");
println!(" -f <config_file> path to the yaml configuration file");
println!(" -h, --help print this help message");
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args: Vec<String> = env::args().collect();
let config_path = args.get(1).map(|s| s.as_str()).unwrap_or("config.yaml");
println!("Loading config from {}", config_path);
let mut config_path = None;
// Check if config exists, if not warn (for dev purposes)
if !std::path::Path::new(config_path).exists() {
println!("Warning: Config file '{}' not found.", config_path);
// In a real run we might want to exit, but for init check we proceed or return
} else {
let config = Config::load(config_path).await?;
core::server::run(config).await?;
let mut i = 1;
while i < args.len() {
match args[i].as_str() {
"-f" => {
if i + 1 < args.len() {
config_path = Some(args[i + 1].clone());
i += 2;
} else {
bail!("missing value for -f");
}
}
"-h" | "--help" => {
print_help();
return Ok(());
}
_ => {
bail!("unknown argument: {}\n\nuse -h for help", args[i]);
}
}
}
let config_path = match config_path {
Some(p) => {
let path = Path::new(&p);
if !path.exists() {
bail!("config file '{}' not found", p);
}
std::fs::canonicalize(path)?
}
None => {
print_help();
return Ok(());
}
};
tracing_subscriber::fmt()
.with_target(false)
.with_thread_ids(false)
.with_file(false)
.with_line_number(false)
.init();
info!("loading config from {}", config_path.display());
let config = Config::load(&config_path).await.map_err(|e| {
error!("failed to load config: {}", e);
e
})?;
core::server::run(config).await?;
Ok(())
}

View File

@@ -1,4 +1,4 @@
use bytes::{Buf, BytesMut};
use bytes::BytesMut;
use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use tokio::io::{AsyncRead, AsyncReadExt};