feat: upgrade to v0.0.3 with split traffic metrics and refactored migration logic

This commit is contained in:
2026-01-19 11:14:17 +08:00
parent dd72f152a6
commit bfd61867c3
6 changed files with 248 additions and 130 deletions

2
Cargo.lock generated
View File

@@ -2489,7 +2489,7 @@ dependencies = [
[[package]]
name = "traudit"
version = "0.0.2"
version = "0.0.3"
dependencies = [
"anyhow",
"async-trait",

View File

@@ -1,6 +1,6 @@
[package]
name = "traudit"
version = "0.0.2"
version = "0.0.3"
edition = "2021"
authors = ["awfufu"]
description = "A reverse proxy that streams audit records directly to databases."

View File

@@ -56,7 +56,7 @@ async fn splice_loop(read: &AsyncStream, write: &AsyncStream) -> (u64, io::Resul
pub async fn zero_copy_bidirectional(
inbound: AsyncStream,
outbound: AsyncStream,
) -> (u64, io::Result<()>) {
) -> ((u64, u64), io::Result<()>) {
// We own the streams now, so we can split references to them for the join.
let ((c2s_bytes, c2s_res), (s2c_bytes, s2c_res)) = tokio::join!(
async {
@@ -71,15 +71,17 @@ pub async fn zero_copy_bidirectional(
}
);
let total = c2s_bytes + s2c_bytes;
// s2c = sent (upstream -> client)
// c2s = recv (client -> upstream)
let metrics = (s2c_bytes, c2s_bytes);
// Prefer returning the first error encountered, but prioritize keeping the bytes
if let Err(e) = c2s_res {
return (total, Err(e));
return (metrics, Err(e));
}
if let Err(e) = s2c_res {
return (total, Err(e));
return (metrics, Err(e));
}
(total, Ok(()))
(metrics, Ok(()))
}

View File

@@ -91,7 +91,7 @@ pub async fn handle_connection(
};
let upstream_async = upstream.into_async_stream()?;
let (spliced_bytes, splice_res) =
let ((s2c_bytes, c2s_bytes), splice_res) =
forwarder::zero_copy_bidirectional(inbound_async, upstream_async).await;
if let Err(e) = splice_res {
@@ -104,7 +104,9 @@ pub async fn handle_connection(
}
// Calculate total bytes
let total_bytes = spliced_bytes + read_buffer.len() as u64;
let bytes_sent = s2c_bytes;
let bytes_recv = c2s_bytes + read_buffer.len() as u64;
let total_bytes = bytes_sent + bytes_recv;
// Logging logic
let duration = start_instant.elapsed().as_millis() as u32;
@@ -133,6 +135,8 @@ pub async fn handle_connection(
port: log_port,
proxy_proto: proto_enum,
bytes: total_bytes,
bytes_sent,
bytes_recv,
};
tokio::spawn(async move {

View File

@@ -0,0 +1,224 @@
use anyhow::Result;
use clickhouse::Client;
use tracing::info;
/// Ordered list of schema versions this binary knows how to migrate to.
///
/// Entries must be appended in ascending version order: `run()` walks this
/// list front-to-back and applies every version newer than the recorded one,
/// and fresh installs bootstrap straight to the last entry.
pub const VERSION_LIST: &[&str] = &[
    "v0.0.2", // add http_log table
    "v0.0.3", // add bytes_sent/bytes_recv for tcp_log
];
/// Applies ClickHouse schema migrations for the log tables (`tcp_log`,
/// `http_log`), tracking applied versions in the `db_migrations` table.
pub struct Migrator {
    // Handle to the target ClickHouse database; all DDL/DML goes through it.
    client: Client,
}
impl Migrator {
    /// Creates a migrator that runs against the given ClickHouse client.
    pub fn new(client: Client) -> Self {
        Self { client }
    }

    /// Runs all pending migrations.
    ///
    /// Fresh installs (no successfully recorded version) are bootstrapped
    /// directly to the latest schema; existing installs are upgraded one
    /// version at a time, recording each step in `db_migrations`.
    ///
    /// # Errors
    /// Returns an error if any DDL statement or migration bookkeeping query
    /// fails against ClickHouse.
    pub async fn run(&self) -> Result<()> {
        self.ensure_migration_table().await?;

        let current_version = self.get_current_version().await?;
        // VERSION_LIST is a non-empty compile-time slice, so `last()` cannot fail.
        let target_version = VERSION_LIST.last().unwrap();

        let Some(current_ver_str) = current_version else {
            // Fresh install: create the latest schema directly and record only
            // the final version (intermediate steps are implied by it).
            info!(
                "fresh install detected, bootstrapping to {}",
                target_version
            );
            self.bootstrap_latest().await?;
            self.record_migration(target_version).await?;
            return Ok(());
        };

        // Iterative upgrade. Versions are compared numerically component by
        // component — NOT as raw strings — so that e.g. "v0.0.10" correctly
        // sorts after "v0.0.9" (plain string comparison would skip it).
        let current_key = Self::version_key(&current_ver_str);
        for &version in VERSION_LIST {
            if Self::version_key(version) > current_key {
                info!("migrating database from {} to {}", current_ver_str, version);
                match version {
                    "v0.0.2" => self.migrate_v0_0_2().await?,
                    "v0.0.3" => self.migrate_v0_0_3().await?,
                    // Known but no-op versions fall through; each applied step
                    // is still recorded below.
                    _ => {}
                }
                self.record_migration(version).await?;
            }
        }
        Ok(())
    }

    /// Parses a version tag like "v0.0.3" into numeric components so tags can
    /// be ordered correctly. Non-numeric components are treated as 0.
    /// `Vec<u64>` compares lexicographically element-by-element, which matches
    /// dotted-version ordering.
    fn version_key(version: &str) -> Vec<u64> {
        version
            .trim_start_matches('v')
            .split('.')
            .map(|part| part.parse::<u64>().unwrap_or(0))
            .collect()
    }

    /// Creates the `db_migrations` bookkeeping table if it does not exist.
    /// ReplacingMergeTree keyed by version keeps only the latest record per
    /// version (deduplicated on `apply_ts` at merge time).
    async fn ensure_migration_table(&self) -> Result<()> {
        self.client
            .query(
                "CREATE TABLE IF NOT EXISTS db_migrations (
                    version String,
                    success UInt8,
                    apply_ts DateTime64 DEFAULT now()
                ) ENGINE = ReplacingMergeTree(apply_ts)
                ORDER BY version",
            )
            .execute()
            .await
            .map_err(|e| anyhow::anyhow!("failed to create migrations table: {}", e))
    }

    /// Returns the most recently applied version, or `None` when there is no
    /// record or the latest attempt was not successful (success != 1) —
    /// callers treat that as a fresh install.
    async fn get_current_version(&self) -> Result<Option<String>> {
        #[derive(clickhouse::Row, serde::Deserialize)]
        struct MigrationRow {
            version: String,
            success: u8,
        }

        let row = self
            .client
            .query("SELECT version, success FROM db_migrations ORDER BY apply_ts DESC LIMIT 1")
            .fetch_optional::<MigrationRow>()
            .await
            .map_err(|e| anyhow::anyhow!("failed to fetch version: {}", e))?;

        if let Some(r) = row {
            if r.success == 1 {
                return Ok(Some(r.version));
            }
        }
        Ok(None)
    }

    /// Records a successfully applied migration version.
    async fn record_migration(&self, version: &str) -> Result<()> {
        self.client
            .query("INSERT INTO db_migrations (version, success) VALUES (?, 1)")
            .bind(version)
            .execute()
            .await
            .map_err(|e| anyhow::anyhow!("failed to record migration: {}", e))
    }

    // ==========================================
    // Bootstrap (Create Latest Schema directly)
    // ==========================================

    /// Creates the latest (v0.0.3) schema from scratch: `tcp_log` with the
    /// split sent/recv counters, its human-readable view, and `http_log`.
    async fn bootstrap_latest(&self) -> Result<()> {
        // v0.0.3 Schema
        let sql_create_tcp = r#"
            CREATE TABLE IF NOT EXISTS tcp_log (
                service LowCardinality(String),
                conn_ts DateTime64(3),
                duration UInt32,
                addr_family Enum8('unix'=1, 'ipv4'=2, 'ipv6'=10),
                ip IPv6,
                port UInt16,
                proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
                bytes UInt64,
                bytes_sent UInt64,
                bytes_recv UInt64
            ) ENGINE = MergeTree()
            ORDER BY (service, conn_ts);
        "#;
        self.client.query(sql_create_tcp).execute().await?;

        // Convenience view: renders the IP per address family and the byte
        // counters as human-readable sizes.
        let sql_view_tcp = r#"
            CREATE OR REPLACE VIEW tcp_log_view AS
            SELECT
                service, conn_ts, duration, addr_family,
                multiIf(
                    addr_family = 'unix', 'unix socket',
                    addr_family = 'ipv4', IPv4NumToString(toIPv4(ip)),
                    IPv6NumToString(ip)
                ) as ip_str,
                port,
                proxy_proto,
                formatReadableSize(bytes) AS traffic,
                formatReadableSize(bytes_sent) AS traffic_sent,
                formatReadableSize(bytes_recv) AS traffic_recv
            FROM tcp_log
        "#;
        self.client.query(sql_view_tcp).execute().await?;

        // HTTP Table (Unchanged from v0.0.2 plan but good to include)
        let sql_create_http = r#"
            CREATE TABLE IF NOT EXISTS http_log (
                service LowCardinality(String) CODEC(ZSTD(1)),
                conn_ts DateTime64(3) CODEC(Delta, ZSTD(1)),
                duration UInt32,
                addr_family Enum8('unix'=1, 'ipv4'=2, 'ipv6'=10),
                ip IPv6,
                proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
                resp_body_size UInt64,
                req_body_size UInt64,
                status_code UInt16,
                method LowCardinality(String),
                host LowCardinality(String),
                path String CODEC(ZSTD(1)),
                user_agent String CODEC(ZSTD(1))
            ) ENGINE = MergeTree()
            PARTITION BY toYYYYMM(conn_ts)
            ORDER BY (conn_ts, service, host);
        "#;
        self.client.query(sql_create_http).execute().await?;
        Ok(())
    }

    // Individual Migrations

    /// v0.0.2 step: baseline `tcp_log` schema (without split counters).
    async fn migrate_v0_0_2(&self) -> Result<()> {
        // Baseline schema (tcp_log, http_log)
        let sql_create_tcp = r#"
            CREATE TABLE IF NOT EXISTS tcp_log (
                service LowCardinality(String),
                conn_ts DateTime64(3),
                duration UInt32,
                addr_family Enum8('unix'=1, 'ipv4'=2, 'ipv6'=10),
                ip IPv6,
                port UInt16,
                proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
                bytes UInt64
            ) ENGINE = MergeTree()
            ORDER BY (service, conn_ts);
        "#;
        self.client.query(sql_create_tcp).execute().await?;
        // ... HTTP table etc normally goes here
        Ok(())
    }

    /// v0.0.3 step: adds `bytes_sent`/`bytes_recv` to `tcp_log` and refreshes
    /// the view to expose them.
    async fn migrate_v0_0_3(&self) -> Result<()> {
        // ADD columns. Errors are deliberately ignored: with IF NOT EXISTS the
        // statements are idempotent, and a failure here (e.g. column already
        // present in an odd state) should not block the view update below.
        // NOTE(review): a hard ALTER failure is swallowed too — confirm this
        // best-effort behavior is intended.
        let _ = self
            .client
            .query("ALTER TABLE tcp_log ADD COLUMN IF NOT EXISTS bytes_sent UInt64 DEFAULT 0")
            .execute()
            .await;
        let _ = self
            .client
            .query("ALTER TABLE tcp_log ADD COLUMN IF NOT EXISTS bytes_recv UInt64 DEFAULT 0")
            .execute()
            .await;

        // Update View
        let sql_view_tcp = r#"
            CREATE OR REPLACE VIEW tcp_log_view AS
            SELECT
                service, conn_ts, duration, addr_family,
                multiIf(
                    addr_family = 'unix', 'unix socket',
                    addr_family = 'ipv4', IPv4NumToString(toIPv4(ip)),
                    IPv6NumToString(ip)
                ) as ip_str,
                port,
                proxy_proto,
                formatReadableSize(bytes) AS traffic,
                formatReadableSize(bytes_sent) AS traffic_sent,
                formatReadableSize(bytes_recv) AS traffic_recv
            FROM tcp_log
        "#;
        self.client.query(sql_view_tcp).execute().await?;
        Ok(())
    }
}

View File

@@ -1,5 +1,6 @@
use crate::config::DatabaseConfig;
use clickhouse::{Client, Row};
mod migration;
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
@@ -32,6 +33,8 @@ pub struct TcpLog {
pub port: u16,
pub proxy_proto: ProxyProto,
pub bytes: u64,
pub bytes_sent: u64,
pub bytes_recv: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, Row)]
@@ -45,6 +48,8 @@ struct TcpLogNew {
pub port: u16,
pub proxy_proto: ProxyProto,
pub bytes: u64,
pub bytes_sent: u64,
pub bytes_recv: u64,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
@@ -186,127 +191,8 @@ impl ClickHouseLogger {
}
async fn check_migrations(&self) -> anyhow::Result<()> {
// Create migrations table
self
.client
.query(
"
CREATE TABLE IF NOT EXISTS db_migrations (
version String,
success UInt8,
apply_ts DateTime64 DEFAULT now()
) ENGINE = ReplacingMergeTree(apply_ts)
ORDER BY version
",
)
.execute()
.await
.map_err(|e| anyhow::anyhow!("failed to create migrations table: {}", e))?;
// Get current DB version
#[derive(Row, Deserialize)]
struct MigrationRow {
version: String,
success: u8,
}
let last_migration = self
.client
.query("SELECT version, success FROM db_migrations ORDER BY apply_ts DESC LIMIT 1")
.fetch_optional::<MigrationRow>()
.await
.map_err(|e| anyhow::anyhow!("failed to fetch last migration: {}", e))?;
let (current_db_version, success) = last_migration
.map(|r| (r.version, r.success == 1))
.unwrap_or_else(|| ("v0.0.0".to_string(), true));
// Consolidate everything to v0.0.2
let target_version = "v0.0.2";
if current_db_version == target_version && success {
return Ok(());
}
info!(
"migrating database from {} to {}",
current_db_version, target_version
);
self.run_bootstrap_migration(target_version).await?;
Ok(())
}
async fn run_bootstrap_migration(&self, version: &str) -> anyhow::Result<()> {
info!("applying bootstrap migration {}...", version);
// 1. Create table (tcp_log)
let sql_create_tcp = r#"
CREATE TABLE IF NOT EXISTS tcp_log (
service LowCardinality(String),
conn_ts DateTime64(3),
duration UInt32,
addr_family Enum8('unix'=1, 'ipv4'=2, 'ipv6'=10),
ip IPv6,
port UInt16,
proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
bytes UInt64
) ENGINE = MergeTree()
ORDER BY (service, conn_ts);
"#;
self.client.query(sql_create_tcp).execute().await?;
// 2. Create View
let sql_view_tcp = r#"
CREATE OR REPLACE VIEW tcp_log_view AS
SELECT
service, conn_ts, duration, addr_family,
multiIf(
addr_family = 'unix', 'unix socket',
addr_family = 'ipv4', IPv4NumToString(toIPv4(ip)),
IPv6NumToString(ip)
) as ip_str,
port,
proxy_proto,
formatReadableSize(bytes) AS traffic
FROM tcp_log
"#;
self.client.query(sql_view_tcp).execute().await?;
// 3. Create table (http_log)
let sql_create_http = r#"
CREATE TABLE IF NOT EXISTS http_log (
service LowCardinality(String) CODEC(ZSTD(1)),
conn_ts DateTime64(3) CODEC(Delta, ZSTD(1)),
duration UInt32,
addr_family Enum8('unix'=1, 'ipv4'=2, 'ipv6'=10),
ip IPv6,
proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
resp_body_size UInt64,
req_body_size UInt64,
status_code UInt16,
method LowCardinality(String),
host LowCardinality(String),
path String CODEC(ZSTD(1)),
user_agent String CODEC(ZSTD(1))
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(conn_ts)
ORDER BY (conn_ts, service, host);
"#;
self.client.query(sql_create_http).execute().await?;
// Record success
self
.client
.query("INSERT INTO db_migrations (version, success) VALUES (?, 1)")
.bind(version)
.execute()
.await
.map_err(|e| anyhow::anyhow!("failed to record migration success: {}", e))?;
info!("bootstrap migration {} applied successfully", version);
Ok(())
let migrator = migration::Migrator::new(self.client.clone());
migrator.run().await
}
pub async fn insert_log(&self, log: TcpLog) -> anyhow::Result<()> {
@@ -324,6 +210,8 @@ impl ClickHouseLogger {
port: log.port,
proxy_proto: log.proxy_proto,
bytes: log.bytes,
bytes_sent: log.bytes_sent,
bytes_recv: log.bytes_recv,
};
let mut insert = self.client.insert::<TcpLogNew>("tcp_log").await?;