feat: add support for http/tls traffic auditing via pingora

This commit is contained in:
2026-01-18 14:47:01 +08:00
parent 278b22e57e
commit d757a23c7a
10 changed files with 2161 additions and 93 deletions

View File

@@ -1,2 +1,2 @@
[build]
target = "x86_64-unknown-linux-musl"
# target = "x86_64-unknown-linux-musl"

1732
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[package]
name = "traudit"
version = "0.0.1"
version = "0.0.2"
edition = "2021"
authors = ["awfufu"]
description = "A reverse proxy that streams audit records directly to databases."
@@ -15,7 +15,7 @@ serde_yaml = "0.9"
socket2 = "0.6"
libc = "0.2"
tracing = "0.1"
tracing-subscriber = "0.3"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
anyhow = "1.0"
thiserror = "2.0"
bytes = "1.1"
@@ -23,6 +23,7 @@ url = "2.5"
async-trait = "0.1"
time = { version = "0.3.45", features = ["serde", "macros", "formatting", "parsing"] }
serde_repr = "0.1.20"
pingora = { version = "0.6", features = ["lb", "openssl"] }
[dev-dependencies]
tempfile = "3"

View File

@@ -20,3 +20,19 @@ services:
- addr: "0.0.0.0:2222"
forward_to: "127.0.0.1:22"
- name: "https-web"
type: "tcp"
binds:
- addr: "0.0.0.0:443"
tls:
cert: "/etc/ssl/certs/site.pem"
key: "/etc/ssl/private/site.key"
- addr: "0.0.0.0:4433"
tls:
cert: "/etc/ssl/certs/site.pem"
key: "/etc/ssl/private/site.key"
proxy: "v2"
forward_to: "127.0.0.1:8080"

View File

@@ -48,6 +48,13 @@ pub struct BindEntry {
pub proxy: Option<String>,
#[serde(default = "default_socket_mode", deserialize_with = "deserialize_mode")]
pub mode: u32,
pub tls: Option<TlsConfig>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct TlsConfig {
pub cert: String,
pub key: Option<String>,
}
fn default_socket_mode() -> u32 {

View File

@@ -1,3 +1,4 @@
pub mod forwarder;
pub mod pingora_proxy;
pub mod server;
pub mod upstream;

174
src/core/pingora_proxy.rs Normal file
View File

@@ -0,0 +1,174 @@
use crate::config::ServiceConfig;
use crate::db::clickhouse::{ClickHouseLogger, HttpLog, HttpMethod};
use async_trait::async_trait;
use pingora::prelude::*;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Instant;
pub struct TrauditProxy {
pub db: Arc<ClickHouseLogger>,
pub service_config: ServiceConfig,
}
pub struct HttpContext {
pub src_ip: IpAddr,
pub method: HttpMethod,
pub host: String,
pub path: String,
pub user_agent: String,
pub status_code: u16,
pub resp_body_size: u64,
pub req_body_size: u64,
pub start_ts: Option<Instant>,
}
#[async_trait]
impl ProxyHttp for TrauditProxy {
type CTX = HttpContext;
fn new_ctx(&self) -> Self::CTX {
HttpContext {
start_ts: None,
method: HttpMethod::Other,
path: String::new(),
host: String::new(),
src_ip: IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
user_agent: "unknown".to_string(),
status_code: 0,
resp_body_size: 0,
req_body_size: 0,
}
}
async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
// IP Priority: Proxy Protocol > XFF > X-Real-IP > Peer
let mut client_ip: Option<IpAddr> = session
.client_addr()
.and_then(|a| a.as_inet())
.map(|a| a.ip());
// Check headers for overrides
if let Some(xff) = session.req_header().headers.get("x-forwarded-for") {
if let Ok(xff_str) = xff.to_str() {
if let Some(first_ip) = xff_str.split(',').next() {
if let Ok(parsed_ip) = first_ip.trim().parse::<IpAddr>() {
client_ip = Some(parsed_ip); // Overwrite
}
}
}
}
if let Some(ip) = client_ip {
ctx.src_ip = ip;
} else {
// fallback to 0.0.0.0 if entirely missing (unlikely)
ctx.src_ip = IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0));
}
// 2. Audit Info
ctx.method = match session.req_header().method.as_str() {
"GET" => HttpMethod::Get,
"POST" => HttpMethod::Post,
"PUT" => HttpMethod::Put,
"DELETE" => HttpMethod::Delete,
"HEAD" => HttpMethod::Head,
"PATCH" => HttpMethod::Patch,
"OPTIONS" => HttpMethod::Options,
"CONNECT" => HttpMethod::Connect,
"TRACE" => HttpMethod::Trace,
_ => HttpMethod::Other,
};
ctx.path = session.req_header().uri.path().to_string();
ctx.host = session
.req_header()
.uri
.host()
.map(|h| h.to_string())
.unwrap_or_default();
if ctx.host.is_empty() {
if let Some(h) = session.req_header().headers.get("host") {
ctx.host = h.to_str().unwrap_or("").to_string();
}
}
ctx.user_agent = session
.req_header()
.headers
.get("user-agent")
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
Ok(false) // false to continue processing
}
async fn upstream_peer(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
let addr = &self.service_config.forward_to;
let peer = Box::new(HttpPeer::new(addr, false, "".to_string()));
Ok(peer)
}
fn response_body_filter(
&self,
_session: &mut Session,
body: &mut Option<bytes::Bytes>,
_end_of_stream: bool,
ctx: &mut Self::CTX,
) -> Result<Option<std::time::Duration>> {
if let Some(b) = body {
ctx.resp_body_size += b.len() as u64;
}
Ok(None)
}
async fn logging(&self, session: &mut Session, _e: Option<&Error>, ctx: &mut Self::CTX) {
let duration = if let Some(start) = ctx.start_ts {
start.elapsed().as_millis() as u32
} else {
0
};
// Status code
if let Some(header) = session.response_written() {
ctx.status_code = header.status.as_u16();
}
// Bytes (resp_body_size accumulated in filter)
ctx.req_body_size = session.body_bytes_read() as u64;
let addr_family = if ctx.src_ip.is_ipv4() {
crate::db::clickhouse::AddrFamily::Ipv4
} else {
crate::db::clickhouse::AddrFamily::Ipv6
};
let log = HttpLog {
service: self.service_config.name.clone(),
conn_ts: time::OffsetDateTime::now_utc(),
duration,
addr_family,
ip: ctx.src_ip,
proxy_proto: crate::db::clickhouse::ProxyProto::None,
resp_body_size: ctx.resp_body_size,
req_body_size: ctx.req_body_size,
status_code: ctx.status_code,
method: ctx.method,
host: ctx.host.clone(),
path: ctx.path.clone(),
user_agent: ctx.user_agent.clone(),
};
let db = self.db.clone();
tokio::spawn(async move {
if let Err(e) = db.insert_http_log(log).await {
// log error
tracing::error!("failed to insert http log: {}", e);
}
});
}
}

View File

@@ -35,22 +35,30 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
let mut join_set = tokio::task::JoinSet::new();
let mut socket_guards = Vec::new();
// Pingora server initialization (TLS only)
let mut pingora_services = Vec::new();
for service in config.services {
let db = db.clone();
// Only support TCP, skipping others
for bind in &service.binds {
let service_config = service.clone();
let bind_addr = bind.addr.clone();
// proxy is Option<String>
let proxy_proto_config = bind.proxy.clone();
let mode = bind.mode;
// Check if this bind is TLS/Pingora managed
if let Some(tls_config) = &bind.tls {
// This is a Pingora service
pingora_services.push((service_config, bind.clone(), tls_config.clone()));
continue;
}
// Legacy TCP/Unix Logic
if bind_addr.starts_with("unix://") {
let path = bind_addr.trim_start_matches("unix://");
// bind_robust handles cleanup and permission checks
// Bind robustly
let (listener, guard) = bind_robust(path, mode, &service_config.name).await?;
// Push guard to keep it alive
@@ -69,7 +77,6 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
bind.addr.clone(),
));
} else {
// BindType removed, assume TCP bind
let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
error!(
"[{}] failed to bind {}: {}",
@@ -102,6 +109,52 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
}
}
// Run Pingora in a separate thread if needed
if !pingora_services.is_empty() {
info!(
"initializing pingora for {} tls services",
pingora_services.len()
);
// Spawn Pingora
std::thread::spawn(move || {
use crate::core::pingora_proxy::TrauditProxy;
use pingora::proxy::http_proxy_service;
use pingora::server::configuration::Opt;
use pingora::server::Server;
if let Err(e) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut server = Server::new(Some(Opt::default())).unwrap();
server.bootstrap();
for (svc_config, bind, tls) in pingora_services {
let proxy = TrauditProxy {
db: db.clone(),
service_config: svc_config.clone(),
};
let mut service = http_proxy_service(&server.configuration, proxy);
// Key path fallback
let key_path = tls.key.as_deref().unwrap_or(&tls.cert);
service.add_tls(&bind.addr, &tls.cert, key_path).unwrap();
info!("[{}] listening on tcp {}", svc_config.name, bind.addr);
server.add_service(service);
}
info!("starting pingora server run_forever loop");
server.run_forever();
})) {
error!("pingora server panicked: {:?}", e);
}
error!("pingora server exited unexpectedly!");
error!("pingora server exited unexpectedly!");
});
}
// Always wait for signals
match signal::ctrl_c().await {
Ok(()) => {
info!("shutdown signal received.");

View File

@@ -1,12 +1,13 @@
use crate::config::DatabaseConfig;
use clickhouse::{Client, Row};
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use std::net::{IpAddr, Ipv6Addr};
use tracing::{error, info};
use tracing::info;
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr, PartialEq)]
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize_repr, Deserialize_repr)]
#[repr(i8)]
pub enum ProxyProto {
None = 0,
V1 = 1,
@@ -14,7 +15,7 @@ pub enum ProxyProto {
}
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
#[repr(i8)]
pub enum AddrFamily {
Unix = 1,
Ipv4 = 2,
@@ -46,6 +47,73 @@ struct TcpLogNew {
pub bytes: u64,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "UPPERCASE")]
pub enum HttpMethod {
Other,
Get,
Post,
Put,
Delete,
Head,
Patch,
Options,
Connect,
Trace,
}
impl HttpMethod {
pub fn as_str(&self) -> &'static str {
match self {
Self::Get => "GET",
Self::Post => "POST",
Self::Put => "PUT",
Self::Delete => "DELETE",
Self::Head => "HEAD",
Self::Patch => "PATCH",
Self::Options => "OPTIONS",
Self::Connect => "CONNECT",
Self::Trace => "TRACE",
Self::Other => "OTHER",
}
}
}
#[derive(Debug, Clone)]
pub struct HttpLog {
pub service: String,
pub conn_ts: time::OffsetDateTime,
pub duration: u32,
pub addr_family: AddrFamily,
pub ip: IpAddr,
pub proxy_proto: ProxyProto,
pub resp_body_size: u64,
pub req_body_size: u64,
pub status_code: u16,
pub method: HttpMethod,
pub host: String,
pub path: String,
pub user_agent: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Row)]
struct HttpLogRow {
pub service: String,
#[serde(with = "clickhouse::serde::time::datetime64::millis")]
pub conn_ts: time::OffsetDateTime,
pub duration: u32,
pub addr_family: AddrFamily,
pub ip: Ipv6Addr,
pub proxy_proto: ProxyProto,
pub resp_body_size: u64,
pub req_body_size: u64,
pub status_code: u16,
pub method: String,
pub host: String,
pub path: String,
pub user_agent: String,
}
pub struct ClickHouseLogger {
client: Client,
}
@@ -61,7 +129,7 @@ impl ClickHouseLogger {
.ok_or_else(|| anyhow::anyhow!("database name is required in dsn"))?
.to_string();
// Clear path from URL so client doesn't append it to requests
// Clear path so client doesn't append it
url.set_path("");
let mut client = Client::default().with_url(url.as_str());
@@ -82,11 +150,41 @@ impl ClickHouseLogger {
pub async fn init(&self) -> anyhow::Result<()> {
// Check migrations
self.check_migrations().await?;
self.ensure_http_table().await?;
info!("connected to database");
Ok(())
}
async fn ensure_http_table(&self) -> anyhow::Result<()> {
let sql_create = r#"
CREATE TABLE IF NOT EXISTS http_log (
service LowCardinality(String) CODEC(ZSTD(1)),
conn_ts DateTime64(3) CODEC(Delta, ZSTD(1)),
duration UInt32,
addr_family LowCardinality(String),
ip IPv6,
proxy_proto LowCardinality(String),
bytes_sent UInt64,
bytes_recv UInt64,
status_code UInt16,
method LowCardinality(String),
host LowCardinality(String),
path String CODEC(ZSTD(1)),
user_agent String CODEC(ZSTD(1))
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(conn_ts)
ORDER BY (conn_ts, service, host);
"#;
self
.client
.query(sql_create)
.execute()
.await
.map_err(|e| anyhow::anyhow!("failed to create http_log table: {}", e))?;
Ok(())
}
async fn check_migrations(&self) -> anyhow::Result<()> {
// Create migrations table
self
@@ -123,57 +221,45 @@ impl ClickHouseLogger {
.map(|r| (r.version, r.success == 1))
.unwrap_or_else(|| ("v0.0.0".to_string(), true));
if current_db_version == crate::VERSION && success {
// Consolidate everything to v0.0.2
let target_version = "v0.0.2";
if current_db_version == target_version && success {
return Ok(());
}
if !success {
error!(
"previous migration to {} failed. retrying...",
current_db_version
);
} else {
info!(
"migrating database from {} to {}",
current_db_version,
crate::VERSION
);
}
self.run_migrations(&current_db_version, success).await?;
info!(
"migrating database from {} to {}",
current_db_version, target_version
);
self.run_bootstrap_migration(target_version).await?;
Ok(())
}
async fn run_migrations(&self, from_version: &str, last_success: bool) -> anyhow::Result<()> {
if from_version < "v0.0.1" || (from_version == "v0.0.1" && !last_success) {
info!("applying migration v0.0.1...");
if let Err(e) = self.apply_v0_0_1().await {
error!("migration v0.0.1 failed: {}", e);
// Record failure
let _ = self
.client
.query("INSERT INTO db_migrations (version, success) VALUES (?, 0)")
.bind(crate::VERSION)
.execute()
.await;
return Err(e);
}
// Record success
self
.client
.query("INSERT INTO db_migrations (version, success) VALUES (?, 1)")
.bind(crate::VERSION)
.execute()
.await
.map_err(|e| anyhow::anyhow!("failed to record migration success: {}", e))?;
info!("migration v0.0.1 applied successfully");
}
Ok(())
}
async fn run_bootstrap_migration(&self, version: &str) -> anyhow::Result<()> {
info!("applying bootstrap migration {}...", version);
// Clean old tables
self
.client
.query("DROP TABLE IF EXISTS tcp_log")
.execute()
.await?;
self
.client
.query("DROP VIEW IF EXISTS tcp_log_view")
.execute()
.await?;
self
.client
.query("DROP TABLE IF EXISTS http_log")
.execute()
.await?;
async fn apply_v0_0_1(&self) -> anyhow::Result<()> {
// 1. Create table (tcp_log)
let sql_create = r#"
let sql_create_tcp = r#"
CREATE TABLE IF NOT EXISTS tcp_log (
service LowCardinality(String),
conn_ts DateTime64(3),
@@ -186,16 +272,16 @@ impl ClickHouseLogger {
) ENGINE = MergeTree()
ORDER BY (service, conn_ts);
"#;
self.client.query(sql_create).execute().await?;
self.client.query(sql_create_tcp).execute().await?;
// 2. Create View
let sql_view_refined = r#"
let sql_view_tcp = r#"
CREATE VIEW IF NOT EXISTS tcp_log_view AS
SELECT
service, conn_ts, duration, addr_family,
multiIf(
addr_family = 1, 'unix socket',
addr_family = 2, IPv4NumToString(toIPv4(ip)),
addr_family = 'unix', 'unix socket',
addr_family = 'ipv4', IPv4NumToString(toIPv4(ip)),
IPv6NumToString(ip)
) as ip_str,
port,
@@ -203,9 +289,40 @@ impl ClickHouseLogger {
formatReadableSize(bytes) AS traffic
FROM tcp_log
"#;
self.client.query(sql_view_tcp).execute().await?;
self.client.query(sql_view_refined).execute().await?;
// 3. Create table (http_log)
let sql_create_http = r#"
CREATE TABLE IF NOT EXISTS http_log (
service LowCardinality(String) CODEC(ZSTD(1)),
conn_ts DateTime64(3) CODEC(Delta, ZSTD(1)),
duration UInt32,
addr_family Enum8('unix'=1, 'ipv4'=2, 'ipv6'=10),
ip IPv6,
proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
resp_body_size UInt64,
req_body_size UInt64,
status_code UInt16,
method LowCardinality(String),
host LowCardinality(String),
path String CODEC(ZSTD(1)),
user_agent String CODEC(ZSTD(1))
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(conn_ts)
ORDER BY (conn_ts, service, host);
"#;
self.client.query(sql_create_http).execute().await?;
// Record success
self
.client
.query("INSERT INTO db_migrations (version, success) VALUES (?, 1)")
.bind(version)
.execute()
.await
.map_err(|e| anyhow::anyhow!("failed to record migration success: {}", e))?;
info!("bootstrap migration {} applied successfully", version);
Ok(())
}
@@ -232,4 +349,33 @@ impl ClickHouseLogger {
Ok(())
}
pub async fn insert_http_log(&self, log: HttpLog) -> anyhow::Result<()> {
let ipv6 = match log.ip {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => ip,
};
let row = HttpLogRow {
service: log.service,
conn_ts: log.conn_ts,
duration: log.duration,
addr_family: log.addr_family,
ip: ipv6,
proxy_proto: log.proxy_proto,
resp_body_size: log.resp_body_size,
req_body_size: log.req_body_size,
status_code: log.status_code,
method: log.method.as_str().to_string(),
host: log.host,
path: log.path,
user_agent: log.user_agent,
};
let mut insert = self.client.insert::<HttpLogRow>("http_log").await?;
insert.write(&row).await?;
insert.end().await?;
Ok(())
}
}

View File

@@ -28,7 +28,11 @@ fn print_help() {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,pingora=error".into());
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_target(false)
.with_thread_ids(false)
.with_file(false)