mirror of
https://github.com/awfufu/traudit
synced 2026-03-01 05:29:44 +08:00
refactor: optimize migration logic with column auto-discovery and stateful view creation
This commit is contained in:
@@ -122,11 +122,9 @@ impl ClickHouseLogger {
|
||||
table_v6
|
||||
);
|
||||
|
||||
let drop_view = format!("DROP VIEW IF EXISTS {}", view_name);
|
||||
|
||||
let sql_view = format!(
|
||||
r#"
|
||||
CREATE VIEW {} AS
|
||||
CREATE VIEW IF NOT EXISTS {} AS
|
||||
SELECT
|
||||
service, conn_ts, duration, port,
|
||||
IPv4NumToString(ip) AS ip_str,
|
||||
@@ -151,32 +149,6 @@ impl ClickHouseLogger {
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("failed to create v4 table: {}", e))?;
|
||||
|
||||
// Migrations
|
||||
let _ = self
|
||||
.client
|
||||
.query(&format!(
|
||||
"ALTER TABLE {} RENAME COLUMN IF EXISTS bytes_transferred TO bytes",
|
||||
table_v4
|
||||
))
|
||||
.execute()
|
||||
.await;
|
||||
let _ = self
|
||||
.client
|
||||
.query(&format!(
|
||||
"ALTER TABLE {} RENAME COLUMN IF EXISTS traffic TO bytes",
|
||||
table_v4
|
||||
))
|
||||
.execute()
|
||||
.await;
|
||||
let _ = self
|
||||
.client
|
||||
.query(&format!(
|
||||
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS bytes UInt64",
|
||||
table_v4
|
||||
))
|
||||
.execute()
|
||||
.await;
|
||||
|
||||
self
|
||||
.client
|
||||
.query(&sql_v6)
|
||||
@@ -184,37 +156,31 @@ impl ClickHouseLogger {
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("failed to create v6 table: {}", e))?;
|
||||
|
||||
let _ = self
|
||||
.client
|
||||
.query(&format!(
|
||||
"ALTER TABLE {} RENAME COLUMN IF EXISTS bytes_transferred TO bytes",
|
||||
table_v6
|
||||
))
|
||||
.execute()
|
||||
.await;
|
||||
let _ = self
|
||||
.client
|
||||
.query(&format!(
|
||||
"ALTER TABLE {} RENAME COLUMN IF EXISTS traffic TO bytes",
|
||||
table_v6
|
||||
))
|
||||
.execute()
|
||||
.await;
|
||||
let _ = self
|
||||
.client
|
||||
.query(&format!(
|
||||
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS bytes UInt64",
|
||||
table_v6
|
||||
))
|
||||
.execute()
|
||||
.await;
|
||||
// Schema Check / Migration
|
||||
for (table, is_v6) in [(&table_v4, false), (&table_v6, true)] {
|
||||
let ip_type = if is_v6 { "IPv6" } else { "IPv4" };
|
||||
let columns = [
|
||||
("service", "LowCardinality(String)"),
|
||||
("conn_ts", "DateTime('UTC')"),
|
||||
("duration", "UInt32"),
|
||||
("port", "UInt16"),
|
||||
("ip", ip_type),
|
||||
("proxy_proto", "Enum8('None' = 0, 'V1' = 1, 'V2' = 2)"),
|
||||
("bytes", "UInt64"),
|
||||
];
|
||||
for (name, type_def) in columns {
|
||||
self
|
||||
.client
|
||||
.query(&format!(
|
||||
"ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}",
|
||||
table, name, type_def
|
||||
))
|
||||
.execute()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("failed to add column {} to {}: {}", name, table, e))?;
|
||||
}
|
||||
}
|
||||
|
||||
self
|
||||
.client
|
||||
.query(&drop_view)
|
||||
.execute()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("failed to drop view: {}", e))?;
|
||||
self
|
||||
.client
|
||||
.query(&sql_view)
|
||||
|
||||
Reference in New Issue
Block a user