mirror of
https://github.com/awfufu/traudit
synced 2026-03-01 05:29:44 +08:00
feat: integrate clickhouse database adapter for traffic auditing
This commit is contained in:
1089
Cargo.lock
generated
1089
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -7,8 +7,7 @@ description = "A reverse proxy with auditing capabilities."
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "mysql", "postgres", "sqlite"] }
|
||||
clickhouse = { version = "0.13", features = ["test-util"] }
|
||||
clickhouse = { version = "0.13", features = ["time"] }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_yaml = "0.9"
|
||||
socket2 = "0.5"
|
||||
@@ -20,6 +19,8 @@ thiserror = "2.0"
|
||||
bytes = "1.1"
|
||||
url = "2.5"
|
||||
async-trait = "0.1"
|
||||
time = { version = "0.3.45", features = ["serde", "macros", "formatting", "parsing"] }
|
||||
serde_repr = "0.1.20"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
|
||||
27
README.md
27
README.md
@@ -21,17 +21,16 @@ See [config_example.yaml](config_example.yaml).
|
||||
|
||||
- [x] Core Implementation
|
||||
- [x] Configuration parsing (`serde_yaml`)
|
||||
- [x] TCP/UDP/Unix Listener abstraction
|
||||
- [x] Proxy Protocol parsing & handling
|
||||
- [x] Zero-copy forwarding loop (`splice`)
|
||||
- [ ] Database Integration
|
||||
- [ ] Define Audit Log schema
|
||||
- [ ] Implement `AuditLogger` trait
|
||||
- [ ] ClickHouse adapter
|
||||
- [ ] SQLite/MySQL adapters
|
||||
- [x] Testing
|
||||
- [ ] Unit tests for config & protocol
|
||||
- [x] End-to-end forwarding tests
|
||||
- [ ] Documentation
|
||||
- [ ] Detailed configuration guide
|
||||
- [ ] Deployment guide
|
||||
- [x] TCP Proxy & Zero-copy forwarding (`splice`)
|
||||
- [x] Proxy Protocol V1/V2 parsing
|
||||
- [ ] UDP Forwarding (Planned)
|
||||
- [ ] Unix Socket Forwarding (Planned)
|
||||
- [x] Database Integration
|
||||
- [x] ClickHouse Adapter (Native Interface)
|
||||
- [x] Traffic Accounting (Bytes/Bandwidth)
|
||||
- [x] IPv6 Support
|
||||
- [ ] SQLite/MySQL Adapters (Future)
|
||||
- [ ] Documentation & Testing
|
||||
- [x] Basic End-to-end tests
|
||||
- [ ] Comprehensive Unit Tests
|
||||
- [ ] Deployment Guide
|
||||
|
||||
25
README_cn.md
25
README_cn.md
@@ -21,17 +21,16 @@ traudit 是一个支持 TCP/UDP/Unix Socket 的反向代理程序,专注于连
|
||||
|
||||
- [x] 核心功能
|
||||
- [x] 配置文件解析 (`serde_yaml`)
|
||||
- [x] 监听器抽象 (TCP/UDP/Unix)
|
||||
- [x] Proxy Protocol 解析与处理
|
||||
- [x] 零拷贝转发循环 (`splice`)
|
||||
- [ ] 数据库
|
||||
- [ ] 定义审计日志结构
|
||||
- [ ] 实现 `AuditLogger` Trait
|
||||
- [ ] ClickHouse 适配器
|
||||
- [ ] SQLite/MySQL 适配器
|
||||
- [x] 测试
|
||||
- [ ] 单元测试 (配置与协议)
|
||||
- [x] 端到端转发测试
|
||||
- [ ] 文档
|
||||
- [ ] 详细配置指南
|
||||
- [x] TCP 代理与零拷贝转发 (`splice`)
|
||||
- [x] Proxy Protocol V1/V2 解析
|
||||
- [ ] UDP 转发 (计划中)
|
||||
- [ ] Unix Socket 转发 (计划中)
|
||||
- [x] 数据库集成
|
||||
- [x] ClickHouse 适配器 (原生接口)
|
||||
- [x] 流量统计 (字节数)
|
||||
- [x] IPv6 支持
|
||||
- [ ] SQLite/MySQL 适配器 (计划中)
|
||||
- [ ] 文档与测试
|
||||
- [x] 基础端到端测试
|
||||
- [ ] 单元测试
|
||||
- [ ] 部署文档
|
||||
|
||||
@@ -11,22 +11,27 @@ pub struct Config {
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct DatabaseConfig {
|
||||
pub dsn: String,
|
||||
#[allow(dead_code)]
|
||||
pub batch: BatchConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct BatchConfig {
|
||||
#[allow(dead_code)]
|
||||
pub size: usize,
|
||||
#[allow(dead_code)]
|
||||
pub timeout_secs: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct ServiceConfig {
|
||||
pub name: String,
|
||||
#[allow(dead_code)]
|
||||
pub db_table: String,
|
||||
pub binds: Vec<BindConfig>,
|
||||
pub forward_type: ForwardType,
|
||||
pub forward_addr: String,
|
||||
#[allow(dead_code)]
|
||||
pub forward_proxy_protocol: Option<ProxyProtocolVersion>,
|
||||
}
|
||||
|
||||
@@ -35,6 +40,7 @@ pub struct BindConfig {
|
||||
#[serde(rename = "type")]
|
||||
pub bind_type: BindType,
|
||||
pub addr: String,
|
||||
#[serde(alias = "proxy")]
|
||||
pub proxy_protocol: Option<ProxyProtocolVersion>,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,13 +1,10 @@
|
||||
use crate::core::upstream::AsyncStream;
|
||||
use std::io;
|
||||
|
||||
// Actual implementation below
|
||||
// Spliceable trait and its implementations are removed as AsyncStream handles readiness internally.
|
||||
|
||||
async fn splice_loop(read: &AsyncStream, write: &AsyncStream) -> io::Result<u64> {
|
||||
async fn splice_loop(read: &AsyncStream, write: &AsyncStream) -> (u64, io::Result<()>) {
|
||||
let mut pipe = [0i32; 2];
|
||||
if unsafe { libc::pipe2(pipe.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC) } < 0 {
|
||||
return Err(io::Error::last_os_error());
|
||||
return (0, Err(io::Error::last_os_error()));
|
||||
}
|
||||
let (pipe_rd, pipe_wr) = (pipe[0], pipe[1]);
|
||||
|
||||
@@ -26,26 +23,32 @@ async fn splice_loop(read: &AsyncStream, write: &AsyncStream) -> io::Result<u64>
|
||||
|
||||
loop {
|
||||
// src -> pipe
|
||||
// splice_read handles readiness internally with AsyncFd
|
||||
let len = match read.splice_read(pipe_wr, 65536).await {
|
||||
Ok(0) => return Ok(total_bytes), // EOF
|
||||
Ok(0) => return (total_bytes, Ok(())), // EOF
|
||||
Ok(n) => n,
|
||||
Err(e) => return Err(e),
|
||||
Err(e) => return (total_bytes, Err(e)),
|
||||
};
|
||||
|
||||
// pipe -> dst
|
||||
let mut written = 0;
|
||||
while written < len {
|
||||
let to_write = len - written;
|
||||
let n = write.splice_write(pipe_rd, to_write).await?;
|
||||
if n == 0 {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::WriteZero,
|
||||
"Zero write in splice logic",
|
||||
));
|
||||
match write.splice_write(pipe_rd, to_write).await {
|
||||
Ok(0) => {
|
||||
return (
|
||||
total_bytes,
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::WriteZero,
|
||||
"Zero write in splice logic",
|
||||
)),
|
||||
);
|
||||
}
|
||||
Ok(n) => {
|
||||
written += n;
|
||||
total_bytes += n as u64;
|
||||
}
|
||||
Err(e) => return (total_bytes, Err(e)),
|
||||
}
|
||||
written += n;
|
||||
total_bytes += n as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -53,13 +56,22 @@ async fn splice_loop(read: &AsyncStream, write: &AsyncStream) -> io::Result<u64>
|
||||
pub async fn zero_copy_bidirectional(
|
||||
inbound: AsyncStream,
|
||||
outbound: AsyncStream,
|
||||
) -> io::Result<()> {
|
||||
) -> (u64, io::Result<()>) {
|
||||
// We own the streams now, so we can split references to them for the join.
|
||||
let (c2s, s2c) = tokio::join!(
|
||||
let ((c2s_bytes, c2s_res), (s2c_bytes, s2c_res)) = tokio::join!(
|
||||
splice_loop(&inbound, &outbound),
|
||||
splice_loop(&outbound, &inbound)
|
||||
);
|
||||
c2s?;
|
||||
s2c?;
|
||||
Ok(())
|
||||
|
||||
let total = c2s_bytes + s2c_bytes;
|
||||
|
||||
// Prefer returning the first error encountered, but prioritize keeping the bytes
|
||||
if let Err(e) = c2s_res {
|
||||
return (total, Err(e));
|
||||
}
|
||||
if let Err(e) = s2c_res {
|
||||
return (total, Err(e));
|
||||
}
|
||||
|
||||
(total, Ok(()))
|
||||
}
|
||||
|
||||
@@ -12,6 +12,12 @@ use tracing::{error, info};
|
||||
pub async fn run(config: Config) -> anyhow::Result<()> {
|
||||
let db = Arc::new(ClickHouseLogger::new(&config.database));
|
||||
|
||||
// init db table
|
||||
if let Err(e) = db.init().await {
|
||||
error!("failed to init database: {}", e);
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
let mut join_set = tokio::task::JoinSet::new();
|
||||
|
||||
for service in config.services {
|
||||
@@ -24,7 +30,10 @@ pub async fn run(config: Config) -> anyhow::Result<()> {
|
||||
|
||||
if bind_type == BindType::Tcp {
|
||||
let listener = TcpListener::bind(&bind_addr).await.map_err(|e| {
|
||||
error!("[{}] failed to bind {}: {}", service_config.name, bind_addr, e);
|
||||
error!(
|
||||
"[{}] failed to bind {}: {}",
|
||||
service_config.name, bind_addr, e
|
||||
);
|
||||
e
|
||||
})?;
|
||||
|
||||
@@ -71,25 +80,24 @@ async fn start_tcp_service(
|
||||
service: ServiceConfig,
|
||||
listener: TcpListener,
|
||||
proxy_protocol: bool,
|
||||
_db: Arc<ClickHouseLogger>,
|
||||
db: Arc<ClickHouseLogger>,
|
||||
) {
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((inbound, _client_addr)) => {
|
||||
// log moved to handle_connection for consistent real ip logging
|
||||
let service = service.clone();
|
||||
// let db = _db.clone();
|
||||
let db = db.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_connection(inbound, service, proxy_protocol).await {
|
||||
if let Err(e) = handle_connection(inbound, service, proxy_protocol, db).await {
|
||||
match e.kind() {
|
||||
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
|
||||
// normal disconnects, debug log only
|
||||
tracing::debug!("connection closed: {}", e);
|
||||
}
|
||||
_ => {
|
||||
error!("connection error: {}", e);
|
||||
}
|
||||
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
|
||||
// normal disconnects, debug log only
|
||||
tracing::debug!("connection closed: {}", e);
|
||||
}
|
||||
_ => {
|
||||
error!("connection error: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -105,40 +113,104 @@ async fn handle_connection(
|
||||
mut inbound: tokio::net::TcpStream,
|
||||
service: ServiceConfig,
|
||||
proxy_protocol: bool,
|
||||
) -> std::io::Result<()> {
|
||||
// read proxy protocol (if configured)
|
||||
let (_client_addr, mut buffer) = if proxy_protocol {
|
||||
let (proxy_info, buffer) = protocol::read_proxy_header(&mut inbound).await?;
|
||||
if let Some(info) = proxy_info {
|
||||
let physical = inbound.peer_addr()?;
|
||||
info!("[{}] <- {} ({})", service.name, info.source, physical);
|
||||
(info.source, buffer)
|
||||
db: Arc<ClickHouseLogger>,
|
||||
) -> std::io::Result<u64> {
|
||||
let conn_ts = time::OffsetDateTime::now_utc();
|
||||
let start_instant = std::time::Instant::now();
|
||||
|
||||
// Default metadata
|
||||
let mut final_ip = inbound.peer_addr()?.ip();
|
||||
let mut final_port = inbound.peer_addr()?.port();
|
||||
let mut proto_enum = crate::db::clickhouse::ProxyProto::None;
|
||||
|
||||
let result = async {
|
||||
// read proxy protocol (if configured)
|
||||
let mut buffer = bytes::BytesMut::new();
|
||||
|
||||
if proxy_protocol {
|
||||
match protocol::read_proxy_header(&mut inbound).await {
|
||||
Ok((proxy_info, buf)) => {
|
||||
buffer = buf;
|
||||
if let Some(info) = proxy_info {
|
||||
let physical = inbound.peer_addr()?;
|
||||
info!("[{}] <- {} ({})", service.name, info.source, physical);
|
||||
final_ip = info.source.ip();
|
||||
final_port = info.source.port();
|
||||
proto_enum = match info.version {
|
||||
protocol::Version::V1 => crate::db::clickhouse::ProxyProto::V1,
|
||||
protocol::Version::V2 => crate::db::clickhouse::ProxyProto::V2,
|
||||
};
|
||||
} else {
|
||||
// Strict enforcement: if configured with proxy_protocol, MUST have a header
|
||||
let physical = inbound.peer_addr()?;
|
||||
let msg = format!("strict proxy protocol violation from {}", physical);
|
||||
error!("[{}] {}", service.name, msg);
|
||||
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, msg));
|
||||
}
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
} else {
|
||||
let addr = inbound.peer_addr()?;
|
||||
info!("[{}] <- {}", service.name, addr);
|
||||
(addr, buffer)
|
||||
}
|
||||
|
||||
// connect upstream
|
||||
let mut upstream = UpstreamStream::connect(service.forward_type, &service.forward_addr).await?;
|
||||
|
||||
// write buffered data (peeked bytes)
|
||||
if !buffer.is_empty() {
|
||||
upstream.write_all_buf(&mut buffer).await?;
|
||||
}
|
||||
|
||||
// zero-copy forwarding
|
||||
let inbound_async = crate::core::upstream::AsyncStream::from_tokio_tcp(inbound)?;
|
||||
let upstream_async = upstream.into_async_stream()?;
|
||||
|
||||
let (spliced_bytes, splice_res) =
|
||||
forwarder::zero_copy_bidirectional(inbound_async, upstream_async).await;
|
||||
|
||||
if let Err(e) = splice_res {
|
||||
match e.kind() {
|
||||
std::io::ErrorKind::ConnectionReset | std::io::ErrorKind::BrokenPipe => {
|
||||
tracing::debug!("[{}] connection closed with error: {}", service.name, e);
|
||||
}
|
||||
_ => {
|
||||
error!("[{}] connection error: {}", service.name, e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!("[{}] connection closed cleanly", service.name);
|
||||
}
|
||||
|
||||
// Total bytes = initial peeked/buffered payload + filtered bytes
|
||||
Ok(spliced_bytes + buffer.len() as u64)
|
||||
}
|
||||
.await;
|
||||
|
||||
let duration = if result.is_ok() {
|
||||
start_instant.elapsed().as_millis() as u32
|
||||
} else {
|
||||
let addr = inbound.peer_addr()?;
|
||||
info!("[{}] <- {}", service.name, addr);
|
||||
(addr, bytes::BytesMut::new())
|
||||
0
|
||||
};
|
||||
|
||||
// connect upstream
|
||||
let mut upstream = UpstreamStream::connect(service.forward_type, &service.forward_addr).await?;
|
||||
let bytes_transferred = result.as_ref().unwrap_or(&0).clone();
|
||||
|
||||
// forward header (TODO: if configured)
|
||||
let log_entry = crate::db::clickhouse::TcpLog {
|
||||
service: service.name.clone(),
|
||||
conn_ts,
|
||||
duration,
|
||||
port: final_port,
|
||||
ip: final_ip,
|
||||
proxy_proto: proto_enum,
|
||||
bytes: bytes_transferred,
|
||||
};
|
||||
|
||||
// write buffered data (peeked bytes)
|
||||
if !buffer.is_empty() {
|
||||
upstream.write_all_buf(&mut buffer).await?;
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = db.insert_log(log_entry).await {
|
||||
error!("failed to insert tcp log: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
// zero-copy forwarding
|
||||
let inbound_async = crate::core::upstream::AsyncStream::from_tokio_tcp(inbound)?;
|
||||
let upstream_async = upstream.into_async_stream()?;
|
||||
|
||||
forwarder::zero_copy_bidirectional(inbound_async, upstream_async).await?;
|
||||
|
||||
Ok(())
|
||||
result
|
||||
}
|
||||
|
||||
@@ -1,20 +1,222 @@
|
||||
use super::{AuditEvent, AuditLogger};
|
||||
use crate::config::DatabaseConfig;
|
||||
use async_trait::async_trait;
|
||||
use clickhouse::{Client, Row};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_repr::{Deserialize_repr, Serialize_repr};
|
||||
use std::net::{IpAddr, Ipv6Addr};
|
||||
use tracing::info;
|
||||
|
||||
pub struct ClickHouseLogger;
|
||||
|
||||
impl ClickHouseLogger {
|
||||
pub fn new(_config: &DatabaseConfig) -> Self {
|
||||
// TODO: Initialize ClickHouse client
|
||||
ClickHouseLogger
|
||||
}
|
||||
#[derive(Debug, Clone, Copy, Serialize_repr, Deserialize_repr)]
|
||||
#[repr(u8)]
|
||||
pub enum ProxyProto {
|
||||
None = 0,
|
||||
V1 = 1,
|
||||
V2 = 2,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AuditLogger for ClickHouseLogger {
|
||||
async fn log(&self, _event: AuditEvent) -> anyhow::Result<()> {
|
||||
// TODO: Implement insertion logic
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TcpLog {
|
||||
pub service: String,
|
||||
pub conn_ts: time::OffsetDateTime,
|
||||
pub duration: u32,
|
||||
pub port: u16,
|
||||
pub ip: IpAddr,
|
||||
pub proxy_proto: ProxyProto,
|
||||
pub bytes: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Row)]
|
||||
struct TcpLogV4 {
|
||||
pub service: String,
|
||||
#[serde(with = "clickhouse::serde::time::datetime")]
|
||||
pub conn_ts: time::OffsetDateTime,
|
||||
pub duration: u32,
|
||||
pub port: u16,
|
||||
pub ip: u32,
|
||||
pub proxy_proto: ProxyProto,
|
||||
pub bytes: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Row)]
|
||||
struct TcpLogV6 {
|
||||
pub service: String,
|
||||
#[serde(with = "clickhouse::serde::time::datetime")]
|
||||
pub conn_ts: time::OffsetDateTime,
|
||||
pub duration: u32,
|
||||
pub port: u16,
|
||||
pub ip: Ipv6Addr,
|
||||
pub proxy_proto: ProxyProto,
|
||||
pub bytes: u64,
|
||||
}
|
||||
|
||||
pub struct ClickHouseLogger {
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl ClickHouseLogger {
|
||||
pub fn new(config: &DatabaseConfig) -> Self {
|
||||
let url = url::Url::parse(&config.dsn).expect("invalid dsn");
|
||||
let mut client = Client::default().with_url(url.as_str());
|
||||
|
||||
if let (Some(u), Some(p)) = (Some(url.username()), url.password()) {
|
||||
if !u.is_empty() {
|
||||
client = client.with_user(u).with_password(p);
|
||||
}
|
||||
} else if !url.username().is_empty() {
|
||||
client = client.with_user(url.username());
|
||||
}
|
||||
|
||||
if let Some(path) = url.path_segments().map(|c| c.collect::<Vec<_>>()) {
|
||||
if let Some(db) = path.first() {
|
||||
if !db.is_empty() {
|
||||
client = client.with_database(*db);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Self { client }
|
||||
}
|
||||
|
||||
pub async fn init(&self) -> anyhow::Result<()> {
|
||||
let sql_v4 = r#"
|
||||
CREATE TABLE IF NOT EXISTS tcp_log_v4 (
|
||||
service LowCardinality(String),
|
||||
conn_ts DateTime('UTC'),
|
||||
duration UInt32,
|
||||
port UInt16,
|
||||
ip IPv4,
|
||||
proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
|
||||
bytes UInt64
|
||||
) ENGINE = MergeTree()
|
||||
ORDER BY (service, conn_ts);
|
||||
"#;
|
||||
|
||||
let sql_v6 = r#"
|
||||
CREATE TABLE IF NOT EXISTS tcp_log_v6 (
|
||||
service LowCardinality(String),
|
||||
conn_ts DateTime('UTC'),
|
||||
duration UInt32,
|
||||
port UInt16,
|
||||
ip IPv6,
|
||||
proxy_proto Enum8('None' = 0, 'V1' = 1, 'V2' = 2),
|
||||
bytes UInt64
|
||||
) ENGINE = MergeTree()
|
||||
ORDER BY (service, conn_ts);
|
||||
"#;
|
||||
|
||||
let drop_view = "DROP VIEW IF EXISTS tcp_log";
|
||||
|
||||
let sql_view = r#"
|
||||
CREATE VIEW tcp_log AS
|
||||
SELECT
|
||||
service, conn_ts, duration, port,
|
||||
IPv4NumToString(ip) AS ip_str,
|
||||
proxy_proto,
|
||||
formatReadableSize(bytes) AS traffic
|
||||
FROM tcp_log_v4
|
||||
UNION ALL
|
||||
SELECT
|
||||
service, conn_ts, duration, port,
|
||||
IPv6NumToString(ip) AS ip_str,
|
||||
proxy_proto,
|
||||
formatReadableSize(bytes) AS traffic
|
||||
FROM tcp_log_v6;
|
||||
"#;
|
||||
|
||||
self
|
||||
.client
|
||||
.query(sql_v4)
|
||||
.execute()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("failed to create v4 table: {}", e))?;
|
||||
|
||||
// Migrations
|
||||
let _ = self
|
||||
.client
|
||||
.query("ALTER TABLE tcp_log_v4 RENAME COLUMN IF EXISTS bytes_transferred TO bytes")
|
||||
.execute()
|
||||
.await;
|
||||
let _ = self
|
||||
.client
|
||||
.query("ALTER TABLE tcp_log_v4 RENAME COLUMN IF EXISTS traffic TO bytes")
|
||||
.execute()
|
||||
.await;
|
||||
let _ = self
|
||||
.client
|
||||
.query("ALTER TABLE tcp_log_v4 ADD COLUMN IF NOT EXISTS bytes UInt64")
|
||||
.execute()
|
||||
.await;
|
||||
|
||||
self
|
||||
.client
|
||||
.query(sql_v6)
|
||||
.execute()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("failed to create v6 table: {}", e))?;
|
||||
|
||||
let _ = self
|
||||
.client
|
||||
.query("ALTER TABLE tcp_log_v6 RENAME COLUMN IF EXISTS bytes_transferred TO bytes")
|
||||
.execute()
|
||||
.await;
|
||||
let _ = self
|
||||
.client
|
||||
.query("ALTER TABLE tcp_log_v6 RENAME COLUMN IF EXISTS traffic TO bytes")
|
||||
.execute()
|
||||
.await;
|
||||
let _ = self
|
||||
.client
|
||||
.query("ALTER TABLE tcp_log_v6 ADD COLUMN IF NOT EXISTS bytes UInt64")
|
||||
.execute()
|
||||
.await;
|
||||
|
||||
self
|
||||
.client
|
||||
.query(drop_view)
|
||||
.execute()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("failed to drop view: {}", e))?;
|
||||
self
|
||||
.client
|
||||
.query(sql_view)
|
||||
.execute()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("failed to create view: {}", e))?;
|
||||
|
||||
info!("ensured tables and view exist");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn insert_log(&self, log: TcpLog) -> anyhow::Result<()> {
|
||||
match log.ip {
|
||||
IpAddr::V4(ip) => {
|
||||
let row = TcpLogV4 {
|
||||
service: log.service,
|
||||
conn_ts: log.conn_ts,
|
||||
duration: log.duration,
|
||||
port: log.port,
|
||||
ip: u32::from(ip),
|
||||
proxy_proto: log.proxy_proto,
|
||||
bytes: log.bytes,
|
||||
};
|
||||
let mut insert = self.client.insert("tcp_log_v4")?;
|
||||
insert.write(&row).await?;
|
||||
insert.end().await?;
|
||||
}
|
||||
IpAddr::V6(ip) => {
|
||||
let row = TcpLogV6 {
|
||||
service: log.service,
|
||||
conn_ts: log.conn_ts,
|
||||
duration: log.duration,
|
||||
port: log.port,
|
||||
ip,
|
||||
proxy_proto: log.proxy_proto,
|
||||
bytes: log.bytes,
|
||||
};
|
||||
let mut insert = self.client.insert("tcp_log_v6")?;
|
||||
insert.write(&row).await?;
|
||||
insert.end().await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
pub mod clickhouse;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AuditEvent {
|
||||
// TODO: Define audit event fields (src, dst, timestamp, etc.)
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait AuditLogger: Send + Sync {
|
||||
// TODO: Finalize log interface
|
||||
async fn log(&self, event: AuditEvent) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ use tokio::io::{AsyncRead, AsyncReadExt};
|
||||
pub struct ProxyInfo {
|
||||
pub version: Version,
|
||||
pub source: SocketAddr,
|
||||
pub destination: SocketAddr,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -24,28 +23,45 @@ pub async fn read_proxy_header<T: AsyncRead + Unpin>(
|
||||
) -> io::Result<(Option<ProxyInfo>, BytesMut)> {
|
||||
let mut buf = BytesMut::with_capacity(512);
|
||||
|
||||
// Read enough to distinguish version
|
||||
loop {
|
||||
if !buf.is_empty() {
|
||||
// Check potential V2 match
|
||||
let v2_match = if buf.len() <= V2_PREFIX.len() {
|
||||
V2_PREFIX.starts_with(&buf)
|
||||
} else {
|
||||
buf.starts_with(V2_PREFIX)
|
||||
};
|
||||
|
||||
// Initial read
|
||||
let n = stream.read_buf(&mut buf).await?;
|
||||
if n == 0 {
|
||||
return Ok((None, buf));
|
||||
// Check potential V1 match
|
||||
let v1_match = if buf.len() <= V1_PREFIX.len() {
|
||||
V1_PREFIX.starts_with(&buf)
|
||||
} else {
|
||||
buf.starts_with(V1_PREFIX)
|
||||
};
|
||||
|
||||
// If matches neither, it's not a proxy header
|
||||
if !v2_match && !v1_match {
|
||||
return Ok((None, buf));
|
||||
}
|
||||
|
||||
// If identified as V2 (full signature match)
|
||||
if buf.len() >= 12 && buf.starts_with(V2_PREFIX) {
|
||||
return parse_v2(stream, buf).await;
|
||||
}
|
||||
|
||||
// If identified as V1 (full signature match)
|
||||
if buf.len() >= 6 && buf.starts_with(V1_PREFIX) {
|
||||
return parse_v1(stream, buf).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Need more data to decide
|
||||
let n = stream.read_buf(&mut buf).await?;
|
||||
if n == 0 {
|
||||
// EOF before header complete/identified
|
||||
return Ok((None, buf));
|
||||
}
|
||||
}
|
||||
|
||||
// Check V2
|
||||
if buf.len() >= 12 && &buf[..12] == V2_PREFIX {
|
||||
// v2
|
||||
return parse_v2(stream, buf).await;
|
||||
}
|
||||
|
||||
// Check V1
|
||||
if buf.len() >= 6 && &buf[..6] == V1_PREFIX {
|
||||
// v1
|
||||
return parse_v1(stream, buf).await;
|
||||
}
|
||||
|
||||
// Neither
|
||||
Ok((None, buf))
|
||||
}
|
||||
|
||||
async fn parse_v1<T: AsyncRead + Unpin>(
|
||||
@@ -65,13 +81,13 @@ async fn parse_v1<T: AsyncRead + Unpin>(
|
||||
let src_ip: IpAddr = parts[2]
|
||||
.parse()
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid src IP"))?;
|
||||
let dst_ip: IpAddr = parts[3]
|
||||
let _dst_ip: IpAddr = parts[3]
|
||||
.parse()
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid dst IP"))?;
|
||||
let src_port: u16 = parts[4]
|
||||
.parse()
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid src port"))?;
|
||||
let dst_port: u16 = parts[5]
|
||||
let _dst_port: u16 = parts[5]
|
||||
.parse()
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid dst port"))?;
|
||||
|
||||
@@ -79,7 +95,6 @@ async fn parse_v1<T: AsyncRead + Unpin>(
|
||||
Some(ProxyInfo {
|
||||
version: Version::V1,
|
||||
source: SocketAddr::new(src_ip, src_port),
|
||||
destination: SocketAddr::new(dst_ip, dst_port),
|
||||
}),
|
||||
buf,
|
||||
));
|
||||
@@ -160,14 +175,13 @@ async fn parse_v2<T: AsyncRead + Unpin>(
|
||||
// TCP/UDP over IPv4
|
||||
if payload.len() >= 12 {
|
||||
let src_ip = Ipv4Addr::new(payload[0], payload[1], payload[2], payload[3]);
|
||||
let dst_ip = Ipv4Addr::new(payload[4], payload[5], payload[6], payload[7]);
|
||||
let _dst_ip = Ipv4Addr::new(payload[4], payload[5], payload[6], payload[7]);
|
||||
let src_port = u16::from_be_bytes([payload[8], payload[9]]);
|
||||
let dst_port = u16::from_be_bytes([payload[10], payload[11]]);
|
||||
let _dst_port = u16::from_be_bytes([payload[10], payload[11]]);
|
||||
return Ok((
|
||||
Some(ProxyInfo {
|
||||
version: Version::V2,
|
||||
source: SocketAddr::new(IpAddr::V4(src_ip), src_port),
|
||||
destination: SocketAddr::new(IpAddr::V4(dst_ip), dst_port),
|
||||
}),
|
||||
buf,
|
||||
));
|
||||
@@ -180,15 +194,14 @@ async fn parse_v2<T: AsyncRead + Unpin>(
|
||||
// Keep it brief
|
||||
let mut src = [0u8; 16];
|
||||
src.copy_from_slice(&payload[0..16]);
|
||||
let mut dst = [0u8; 16];
|
||||
dst.copy_from_slice(&payload[16..32]);
|
||||
let mut _dst = [0u8; 16];
|
||||
_dst.copy_from_slice(&payload[16..32]);
|
||||
let src_port = u16::from_be_bytes([payload[32], payload[33]]);
|
||||
let dst_port = u16::from_be_bytes([payload[34], payload[35]]);
|
||||
let _dst_port = u16::from_be_bytes([payload[34], payload[35]]);
|
||||
return Ok((
|
||||
Some(ProxyInfo {
|
||||
version: Version::V2,
|
||||
source: SocketAddr::new(IpAddr::V6(Ipv6Addr::from(src)), src_port),
|
||||
destination: SocketAddr::new(IpAddr::V6(Ipv6Addr::from(dst)), dst_port),
|
||||
}),
|
||||
buf,
|
||||
));
|
||||
|
||||
Reference in New Issue
Block a user