From 40d3156e888df9b5e29b4bd18cfa71079d619d59 Mon Sep 17 00:00:00 2001 From: awfufu Date: Fri, 16 Jan 2026 10:21:21 +0800 Subject: [PATCH] refactor: optimize migration logic with column auto-discovery and stateful view creation --- src/db/clickhouse.rs | 84 +++++++++++++------------------------------- 1 file changed, 25 insertions(+), 59 deletions(-) diff --git a/src/db/clickhouse.rs b/src/db/clickhouse.rs index 5e6584d..592e328 100644 --- a/src/db/clickhouse.rs +++ b/src/db/clickhouse.rs @@ -122,11 +122,9 @@ impl ClickHouseLogger { table_v6 ); - let drop_view = format!("DROP VIEW IF EXISTS {}", view_name); - let sql_view = format!( r#" - CREATE VIEW {} AS + CREATE VIEW IF NOT EXISTS {} AS SELECT service, conn_ts, duration, port, IPv4NumToString(ip) AS ip_str, @@ -151,32 +149,6 @@ impl ClickHouseLogger { .await .map_err(|e| anyhow::anyhow!("failed to create v4 table: {}", e))?; - // Migrations - let _ = self - .client - .query(&format!( - "ALTER TABLE {} RENAME COLUMN IF EXISTS bytes_transferred TO bytes", - table_v4 - )) - .execute() - .await; - let _ = self - .client - .query(&format!( - "ALTER TABLE {} RENAME COLUMN IF EXISTS traffic TO bytes", - table_v4 - )) - .execute() - .await; - let _ = self - .client - .query(&format!( - "ALTER TABLE {} ADD COLUMN IF NOT EXISTS bytes UInt64", - table_v4 - )) - .execute() - .await; - self .client .query(&sql_v6) @@ -184,37 +156,31 @@ impl ClickHouseLogger { .await .map_err(|e| anyhow::anyhow!("failed to create v6 table: {}", e))?; - let _ = self - .client - .query(&format!( - "ALTER TABLE {} RENAME COLUMN IF EXISTS bytes_transferred TO bytes", - table_v6 - )) - .execute() - .await; - let _ = self - .client - .query(&format!( - "ALTER TABLE {} RENAME COLUMN IF EXISTS traffic TO bytes", - table_v6 - )) - .execute() - .await; - let _ = self - .client - .query(&format!( - "ALTER TABLE {} ADD COLUMN IF NOT EXISTS bytes UInt64", - table_v6 - )) - .execute() - .await; + // Schema Check / Migration + for (table, is_v6) in [(&table_v4, false), (&table_v6, true)] { + let ip_type = if is_v6 { "IPv6" } else { "IPv4" }; + let columns = [ + ("service", "LowCardinality(String)"), + ("conn_ts", "DateTime('UTC')"), + ("duration", "UInt32"), + ("port", "UInt16"), + ("ip", ip_type), + ("proxy_proto", "Enum8('None' = 0, 'V1' = 1, 'V2' = 2)"), + ("bytes", "UInt64"), + ]; + for (name, type_def) in columns { + self + .client + .query(&format!( + "ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}", + table, name, type_def + )) + .execute() + .await + .map_err(|e| anyhow::anyhow!("failed to add column {} to {}: {}", name, table, e))?; + } + } - self - .client - .query(&drop_view) - .execute() - .await - .map_err(|e| anyhow::anyhow!("failed to drop view: {}", e))?; self .client .query(&sql_view)