feat: add http_log_view for human-readable HTTP logs and refactor migration logic

This commit is contained in:
2026-01-19 11:35:58 +08:00
parent bfd61867c3
commit 967a22bf0f

View File

@@ -103,68 +103,16 @@ impl Migrator {
// Bootstrap (Create Latest Schema directly)
// ==========================================
async fn bootstrap_latest(&self) -> Result<()> {
// v0.0.3 Schema
let sql_create_tcp = r#"
CREATE TABLE IF NOT EXISTS tcp_log (
service LowCardinality(String),
conn_ts DateTime64(3),
duration UInt32,
addr_family Enum8('unix'=1, 'ipv4'=2, 'ipv6'=10),
ip IPv6,
port UInt16,
proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
bytes UInt64,
bytes_sent UInt64,
bytes_recv UInt64
) ENGINE = MergeTree()
ORDER BY (service, conn_ts);
"#;
self.client.query(sql_create_tcp).execute().await?;
let sql_view_tcp = r#"
CREATE OR REPLACE VIEW tcp_log_view AS
SELECT
service, conn_ts, duration, addr_family,
multiIf(
addr_family = 'unix', 'unix socket',
addr_family = 'ipv4', IPv4NumToString(toIPv4(ip)),
IPv6NumToString(ip)
) as ip_str,
port,
proxy_proto,
formatReadableSize(bytes) AS traffic,
formatReadableSize(bytes_sent) AS traffic_sent,
formatReadableSize(bytes_recv) AS traffic_recv
FROM tcp_log
"#;
self.client.query(sql_view_tcp).execute().await?;
// HTTP Table (Unchanged from v0.0.2 plan but good to include)
let sql_create_http = r#"
CREATE TABLE IF NOT EXISTS http_log (
service LowCardinality(String) CODEC(ZSTD(1)),
conn_ts DateTime64(3) CODEC(Delta, ZSTD(1)),
duration UInt32,
addr_family Enum8('unix'=1, 'ipv4'=2, 'ipv6'=10),
ip IPv6,
proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
resp_body_size UInt64,
req_body_size UInt64,
status_code UInt16,
method LowCardinality(String),
host LowCardinality(String),
path String CODEC(ZSTD(1)),
user_agent String CODEC(ZSTD(1))
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(conn_ts)
ORDER BY (conn_ts, service, host);
"#;
self.client.query(sql_create_http).execute().await?;
self.create_table_tcp_log_v3().await?;
self.create_view_tcp_log_v3().await?;
self.create_table_http_log().await?;
self.create_view_http_log().await?;
Ok(())
}
// ==========================================
// Individual Migrations
// ==========================================
async fn migrate_v0_0_2(&self) -> Result<()> {
// Baseline schema (tcp_log, http_log)
@@ -183,7 +131,9 @@ impl Migrator {
"#;
self.client.query(sql_create_tcp).execute().await?;
// ... HTTP table etc normally goes here
// v0.0.2 introduced http_log as well
self.create_table_http_log().await?;
Ok(())
}
@@ -200,8 +150,39 @@ impl Migrator {
.execute()
.await;
// Update View
let sql_view_tcp = r#"
// Update Views
self.create_view_tcp_log_v3().await?;
self.create_view_http_log().await?;
Ok(())
}
// ==========================================
// Helpers
// ==========================================
async fn create_table_tcp_log_v3(&self) -> Result<()> {
let sql = r#"
CREATE TABLE IF NOT EXISTS tcp_log (
service LowCardinality(String),
conn_ts DateTime64(3),
duration UInt32,
addr_family Enum8('unix'=1, 'ipv4'=2, 'ipv6'=10),
ip IPv6,
port UInt16,
proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
bytes UInt64,
bytes_sent UInt64,
bytes_recv UInt64
) ENGINE = MergeTree()
ORDER BY (service, conn_ts);
"#;
self.client.query(sql).execute().await?;
Ok(())
}
async fn create_view_tcp_log_v3(&self) -> Result<()> {
let sql = r#"
CREATE OR REPLACE VIEW tcp_log_view AS
SELECT
service, conn_ts, duration, addr_family,
@@ -216,9 +197,56 @@ impl Migrator {
formatReadableSize(bytes_sent) AS traffic_sent,
formatReadableSize(bytes_recv) AS traffic_recv
FROM tcp_log
"#;
self.client.query(sql_view_tcp).execute().await?;
"#;
self.client.query(sql).execute().await?;
Ok(())
}
async fn create_table_http_log(&self) -> Result<()> {
let sql = r#"
CREATE TABLE IF NOT EXISTS http_log (
service LowCardinality(String) CODEC(ZSTD(1)),
conn_ts DateTime64(3) CODEC(Delta, ZSTD(1)),
duration UInt32,
addr_family Enum8('unix'=1, 'ipv4'=2, 'ipv6'=10),
ip IPv6,
proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
resp_body_size UInt64,
req_body_size UInt64,
status_code UInt16,
method LowCardinality(String),
host LowCardinality(String),
path String CODEC(ZSTD(1)),
user_agent String CODEC(ZSTD(1))
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(conn_ts)
ORDER BY (conn_ts, service, host);
"#;
self.client.query(sql).execute().await?;
Ok(())
}
async fn create_view_http_log(&self) -> Result<()> {
let sql = r#"
CREATE OR REPLACE VIEW http_log_view AS
SELECT
service, conn_ts, duration, addr_family,
multiIf(
addr_family = 'unix', 'unix socket',
addr_family = 'ipv4', IPv4NumToString(toIPv4(ip)),
IPv6NumToString(ip)
) as ip_str,
proxy_proto,
formatReadableSize(req_body_size) AS req_traffic,
formatReadableSize(resp_body_size) AS resp_traffic,
status_code,
method,
host,
path,
user_agent
FROM http_log
"#;
self.client.query(sql).execute().await?;
Ok(())
}
}