fix: resolve argument mismatch and optimize log structure

This commit is contained in:
2026-01-19 23:27:47 +08:00
parent f9ca59e14d
commit 22360750a1
5 changed files with 172 additions and 80 deletions

View File

@@ -143,54 +143,64 @@ impl ProxyHttp for TrauditProxy {
}
// Log connection info
let src_fmt = resolved_ip.to_string();
let physical_fmt = peer_addr.to_string();
let (physical_addr, proxy_info) = crate::core::server::context::CONNECTION_META
.try_with(|meta| (meta.physical_addr, meta.proxy_info.clone()))
.unwrap_or((std::net::SocketAddr::new(peer_addr, 0), None));
let physical_fmt = physical_addr.to_string();
if src_fmt == physical_fmt {
// If we stuck to physical, check if there was an XFF we ignored
let xff_msg = if let Some(xff) = session.req_header().headers.get("x-forwarded-for") {
if let Ok(v) = xff.to_str() {
// Only show if we actually have RealIpConfig that denied us
if let Some(cfg) = &self.real_ip {
if !cfg.is_trusted(peer_addr) {
format!("(untrusted) xff: {}", v)
} else {
"".to_string()
}
} else {
"".to_string()
}
} else {
"".to_string()
}
} else {
"".to_string()
let mut extras = Vec::new();
let is_untrusted = self
.real_ip
.as_ref()
.map_or(false, |cfg| !cfg.is_trusted(physical_addr.ip()));
if let Some(info) = proxy_info {
let v = match info.version {
crate::protocol::Version::V1 => "proxy.v1",
_ => "proxy.v2",
};
extras.push(format!("{}: {}", v, info.source));
}
if !xff_msg.is_empty() {
tracing::info!(
"[{}] {} <- {} {}",
self.service_config.name,
self.listen_addr,
src_fmt,
xff_msg
);
} else {
tracing::info!(
"[{}] {} <- {}",
self.service_config.name,
self.listen_addr,
src_fmt
);
if let Some(xff) = session.req_header().headers.get("x-forwarded-for") {
if let Ok(v) = xff.to_str() {
if resolved_ip != peer_addr {
extras.push(format!("xff: {}", resolved_ip));
} else if self
.real_ip
.as_ref()
.map_or(false, |cfg| !cfg.is_trusted(peer_addr))
{
extras.push(format!("xff: {}", v));
}
}
} else {
}
let mut extra_str = String::new();
if is_untrusted {
extra_str.push_str("(untrusted) ");
}
for (i, e) in extras.iter().enumerate() {
extra_str.push_str(&format!("({})", e));
if i < extras.len() - 1 {
extra_str.push_str(" ");
}
}
if extra_str.is_empty() {
tracing::info!(
"[{}] {} <- {} ({})",
"[{}] {} <- {}",
self.service_config.name,
self.listen_addr,
src_fmt,
physical_fmt
);
} else {
tracing::info!(
"[{}] {} <- {} {}",
self.service_config.name,
self.listen_addr,
physical_fmt,
extra_str
);
}
// 2. Audit Info

View File

@@ -0,0 +1,12 @@
use crate::protocol::ProxyInfo;
use std::net::SocketAddr;
#[derive(Clone, Debug)]
pub struct ConnectionMetadata {
pub physical_addr: SocketAddr,
pub proxy_info: Option<ProxyInfo>,
}
tokio::task_local! {
pub static CONNECTION_META: ConnectionMetadata;
}

View File

@@ -19,6 +19,8 @@ pub async fn handle_connection(
service: ServiceConfig,
db: Arc<ClickHouseLogger>,
listen_addr: String,
physical_addr: SocketAddr,
real_ip_config: Option<RealIpConfig>,
) -> std::io::Result<u64> {
let conn_ts = time::OffsetDateTime::now_utc();
let start_instant = std::time::Instant::now();
@@ -29,7 +31,6 @@ pub async fn handle_connection(
if let Some(pingora::protocols::l4::socket::SocketAddr::Inet(addr)) = d.peer_addr() {
(addr.ip(), addr.port())
} else {
// Should not match other types if logic is correct
(std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), 0)
}
} else {
@@ -55,17 +56,53 @@ pub async fn handle_connection(
false
};
let remote_addr = if let Some(ref inbound) = inbound_enum {
match inbound {
InboundStream::Tcp(s) => s.peer_addr()?,
InboundStream::Unix(_) => {
SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)), 0)
}
}
let src_fmt = if is_unix {
"local".to_string()
} else {
SocketAddr::new(final_ip, final_port)
physical_addr.to_string()
};
let mut extras = Vec::new();
let mut is_untrusted = false;
// Determine untrusted status based solely on RealIpConfig trust range
if let Some(ref cfg) = real_ip_config {
if !cfg.is_trusted(physical_addr.ip()) {
is_untrusted = true;
}
}
// If we have proxy info, we should show it.
if let Some(ref info) = proxy_info {
// Only show (untrusted) if we have proxy info and the source is not trusted
if is_untrusted {
extras.push("(untrusted)".to_string());
}
let helper_str;
let version_str = match info.version {
protocol::Version::V1 => "proxy.v1",
protocol::Version::V2 => "proxy.v2",
};
helper_str = format!("{}: {}", version_str, info.source);
extras.push(format!("({})", helper_str));
} else if is_untrusted && real_ip_config.is_some() {
}
let log_msg = if extras.is_empty() {
format!("[{}] {} <- {}", service.name, listen_addr, src_fmt)
} else {
format!(
"[{}] {} <- {} {}",
service.name,
listen_addr,
src_fmt,
extras.join(" ")
)
};
info!("{}", log_msg);
// skip redundant proxy/IP resolution (done by listener); determine ProxyProto for logging
let proto_enum = if let Some(ref info) = proxy_info {
match info.version {
@@ -76,27 +113,6 @@ pub async fn handle_connection(
ProxyProto::None
};
// Log connection info
let src_fmt = if is_unix && proto_enum == ProxyProto::None {
"local".to_string()
} else {
final_ip.to_string()
};
let physical_fmt = if is_unix {
"local".to_string()
} else {
remote_addr.to_string()
};
if src_fmt == physical_fmt {
info!("[{}] {} <- {}", service.name, listen_addr, src_fmt);
} else {
info!(
"[{}] {} <- {} ({})",
service.name, listen_addr, src_fmt, physical_fmt
);
}
// 3. Connect Upstream
let mut upstream = UpstreamStream::connect(&service.forward_to).await?;

View File

@@ -113,7 +113,7 @@ pub async fn serve_listener_loop<F, Fut>(
_shutdown: ShutdownWatch,
handler: F,
) where
F: Fn(UnifiedPingoraStream, Option<crate::protocol::ProxyInfo>) -> Fut
F: Fn(UnifiedPingoraStream, Option<crate::protocol::ProxyInfo>, std::net::SocketAddr) -> Fut
+ Send
+ Sync
+ 'static
@@ -228,7 +228,7 @@ pub async fn serve_listener_loop<F, Fut>(
};
// 5. Handler
handler(stream, proxy_info).await;
handler(stream, proxy_info, client_addr).await;
});
}
Err(e) => {

View File

@@ -7,6 +7,7 @@ use std::sync::{Arc, Barrier};
use tokio::signal;
use tracing::{error, info};
pub mod context;
pub mod handler;
mod listener;
mod pingora_compat;
@@ -107,15 +108,55 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
UnifiedListener::Tcp(_) => "tcp",
};
let mut extra_info = String::new();
if let Some(p) = &service_config.binds[0].proxy {
let v = match p.as_str() {
"v1" => "(proxy.v1)",
"v2" => "(proxy.v2)",
_ => "",
};
extra_info.push_str(v);
}
let proxy_tag = match proxy_proto_config.as_deref() {
Some("v1") => "(proxy.v1)",
Some("v2") => "(proxy.v2)",
_ => "",
};
let xff_tag = if real_ip_config
.as_ref()
.map(|c| c.source == crate::config::RealIpSource::Xff)
.unwrap_or(false)
{
"(xff)"
} else {
""
};
let mut tags = Vec::new();
if !proxy_tag.is_empty() {
tags.push(proxy_tag);
}
if !xff_tag.is_empty() {
tags.push(xff_tag);
}
let tag_str = if tags.is_empty() {
String::new()
} else {
format!(" {}", tags.join(" "))
};
if is_http_proxy {
info!(
"[{}] listening on http {} {} (PROXY support)",
service_config.name, listen_type, bind_addr
"[{}] listening on http {} {}{}",
service_config.name, listen_type, bind_addr, tag_str
);
} else {
info!(
"[{}] listening on {} {}",
service_config.name, listen_type, bind_addr
"[{}] listening on {} {}{}",
service_config.name, listen_type, bind_addr, tag_str
);
}
@@ -141,16 +182,19 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
join_set.spawn(serve_listener_loop(
listener,
service_config,
real_ip_config,
real_ip_config.clone(),
proxy_proto_config,
tls_acceptor,
shutdown_dummy,
move |stream, info| {
move |stream, info, client_addr| {
let db = db.clone();
let svc = svc_cfg.clone();
let addr = listen_addr_log.clone();
let real_ip_cloned = real_ip_config.clone();
async move {
if let Err(e) = handle_connection(stream, info, svc, db, addr).await {
if let Err(e) =
handle_connection(stream, info, svc, db, addr, client_addr, real_ip_cloned).await
{
match e.kind() {
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
tracing::debug!("connection closed: {}", e);
@@ -189,13 +233,23 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
proxy_proto_config,
tls_acceptor,
shutdown_dummy.clone(),
move |stream, _info| {
move |stream, info, client_addr| {
let app = app.clone();
let shutdown = shutdown_dummy.clone();
async move {
// stream is UnifiedPingoraStream
// Coerce to Box<dyn IO> (trait object implementation check)
app.process_new(Box::new(stream), &shutdown).await;
let meta = crate::core::server::context::ConnectionMetadata {
physical_addr: client_addr,
proxy_info: info,
};
crate::core::server::context::CONNECTION_META
.scope(meta, async move {
app.process_new(Box::new(stream), &shutdown).await;
})
.await;
}
},
));