init: first commit

This commit is contained in:
2026-01-15 09:39:49 +08:00
commit e0d57dc975
14 changed files with 2929 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

2333
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

25
Cargo.toml Normal file
View File

@@ -0,0 +1,25 @@
[package]
name = "traudit"
version = "0.1.0"
edition = "2021"
authors = ["awfufu"]
description = "A reverse proxy with auditing capabilities."
[dependencies]
tokio = { version = "1", features = ["full"] }
sqlx = { version = "0.8", features = ["runtime-tokio", "tls-rustls", "mysql", "postgres", "sqlite"] }
clickhouse = { version = "0.13", features = ["test-util"] }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
socket2 = "0.5"
libc = "0.2"
tracing = "0.1"
tracing-subscriber = "0.3"
anyhow = "1.0"
thiserror = "2.0"
bytes = "1.1"
url = "2.5"
async-trait = "0.1"
[dev-dependencies]
tempfile = "3"

37
README.md Normal file
View File

@@ -0,0 +1,37 @@
# traudit (Traffic Audit)
English | [简体中文](README_cn.md)
traudit is a reverse proxy supporting TCP/UDP/Unix Sockets, focused on connection auditing with support for multiple databases.
## Features
- **Multi-Protocol Support**: TCP, UDP, Unix Domain Sockets.
- **Proxy Protocol**: Support Proxy Protocol to record real IP.
- **Audit Logging**: Store connection information in databases (ClickHouse, MySQL, PostgreSQL, SQLite).
- **Zero-Copy Forwarding**: Uses `splice` on Linux for zero-copy forwarding.
What? You don't need a database? Then go use [HAProxy](https://www.haproxy.org/).
## Configuration
See [config_example.yaml](config_example.yaml).
## TODO List
- [ ] Core Implementation
- [ ] Configuration parsing (`serde_yaml`)
- [ ] TCP/UDP/Unix Listener abstraction
- [ ] Proxy Protocol parsing & handling
- [ ] Zero-copy forwarding loop (`splice`)
- [ ] Database Integration
- [ ] Define Audit Log schema
- [ ] Implement `AuditLogger` trait
- [ ] ClickHouse adapter
- [ ] SQLite/MySQL adapters
- [ ] Testing
- [ ] Unit tests for config & protocol
- [ ] End-to-end forwarding tests
- [ ] Documentation
- [ ] Detailed configuration guide
- [ ] Deployment guide

37
README_cn.md Normal file
View File

@@ -0,0 +1,37 @@
# traudit (Traffic Audit)
[English](README.md) | 简体中文
traudit 是一个支持 TCP/UDP/Unix Socket 的反向代理程序,专注于连接审计,支持多种数据库。
## 功能
- **多协议支持**: 支持 TCP, UDP, Unix Domain Socket。
- **Proxy Protocol**: 支持 Proxy Protocol 以记录真实 IP。
- **审计日志**: 将连接信息存入数据库 (ClickHouse, MySQL, PostgreSQL, SQLite)。
- **高性能转发**: 在 Linux 下使用 `splice` 实现零拷贝转发。
什么?你不需要数据库?那你去用 [HAProxy](https://www.haproxy.org/) 吧。
## 配置
请查看 [config_example.yaml](config_example.yaml)。
## TODO List
- [ ] 核心功能
- [ ] 配置文件解析 (`serde_yaml`)
- [ ] 监听器抽象 (TCP/UDP/Unix)
- [ ] Proxy Protocol 解析与处理
- [ ] 零拷贝转发循环 (`splice`)
- [ ] 数据库
- [ ] 定义审计日志结构
- [ ] 实现 `AuditLogger` Trait
- [ ] ClickHouse 适配器
- [ ] SQLite/MySQL 适配器
- [ ] 测试
- [ ] 单元测试 (配置与协议)
- [ ] 端到端转发测试
- [ ] 文档
- [ ] 详细配置指南
- [ ] 部署文档

42
config_example.yaml Normal file
View File

@@ -0,0 +1,42 @@
# Traudit Configuration Example
database:
dsn: "clickhouse://admin:password@127.0.0.1:8123/audit_db"
batch:
size: 50
timeout_secs: 5
services:
# Scenario: SSH High-Level Audit Gateway
# Receives traffic from FRP with v2 Proxy Protocol header, audits it,
# strips the header, and forwards pure TCP to local SSHD.
- name: "ssh-prod"
db_table: "ssh_audit_logs"
binds:
# Entry 1: Public traffic from FRP
- type: "tcp"
addr: "0.0.0.0:1222"
proxy_protocol: "v2"
# Entry 2: LAN direct traffic (no Proxy Protocol)
- type: "tcp"
addr: "0.0.0.0:1223"
forward_type: "tcp"
forward_addr: "127.0.0.1:22"
# forward_proxy_protocol omitted, sends pure stream to SSHD
# Scenario: Protocol Conversion and Local Socket Forwarding
# Receives normal TCP traffic, converts to v1 Proxy Protocol header,
# and forwards to local Unix socket (Nginx).
- name: "web-gateway"
db_table: "http_access_audit"
binds:
- type: "tcp"
addr: "0.0.0.0:8080"
forward_type: "unix"
forward_addr: "/run/nginx/web.sock"
forward_proxy_protocol: "v1"

2
rustfmt.toml Normal file
View File

@@ -0,0 +1,2 @@
tab_spaces = 2
edition = "2024"

116
src/config/mod.rs Normal file
View File

@@ -0,0 +1,116 @@
use serde::Deserialize;
use std::path::Path;
use tokio::fs;
#[derive(Debug, Deserialize, Clone)]
pub struct Config {
pub database: DatabaseConfig,
pub services: Vec<ServiceConfig>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct DatabaseConfig {
pub dsn: String,
pub batch: BatchConfig,
}
#[derive(Debug, Deserialize, Clone)]
pub struct BatchConfig {
pub size: usize,
pub timeout_secs: u64,
}
#[derive(Debug, Deserialize, Clone)]
pub struct ServiceConfig {
pub name: String,
pub db_table: String,
pub binds: Vec<BindConfig>,
pub forward_type: ForwardType,
pub forward_addr: String,
pub forward_proxy_protocol: Option<ProxyProtocolVersion>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct BindConfig {
#[serde(rename = "type")]
pub bind_type: BindType,
pub addr: String,
pub proxy_protocol: Option<ProxyProtocolVersion>,
}
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum BindType {
Tcp,
Udp,
Unix,
}
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ForwardType {
Tcp,
Udp,
Unix,
}
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ProxyProtocolVersion {
V1,
V2,
}
impl Config {
pub async fn load<P: AsRef<Path>>(path: P) -> Result<Self, anyhow::Error> {
let content = fs::read_to_string(path).await?;
let config: Config = serde_yaml::from_str(&content)?;
Ok(config)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
#[tokio::test]
async fn test_load_config() {
let config_str = r#"
database:
dsn: "clickhouse://admin:password@127.0.0.1:8123/audit_db"
batch:
size: 50
timeout_secs: 5
services:
- name: "ssh-prod"
db_table: "ssh_audit_logs"
binds:
- type: "tcp"
addr: "0.0.0.0:22222"
proxy_protocol: "v2"
forward_type: "tcp"
forward_addr: "127.0.0.1:22"
"#;
let mut file = tempfile::NamedTempFile::new().unwrap();
write!(file, "{}", config_str).unwrap();
let path = file.path().to_path_buf();
// Close the file handle so tokio can read it, or just keep it open and read by path?
// tempfile deletes on drop. We need to keep `file` alive.
let config = Config::load(&path).await.expect("Failed to load config");
assert_eq!(
config.database.dsn,
"clickhouse://admin:password@127.0.0.1:8123/audit_db"
);
assert_eq!(config.services.len(), 1);
assert_eq!(config.services[0].name, "ssh-prod");
assert_eq!(config.services[0].binds[0].bind_type, BindType::Tcp);
assert_eq!(
config.services[0].binds[0].proxy_protocol,
Some(ProxyProtocolVersion::V2)
);
}
}

1
src/core/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod server;

70
src/core/server.rs Normal file
View File

@@ -0,0 +1,70 @@
use crate::config::{BindType, Config};
use crate::db::clickhouse::ClickHouseLogger;
use crate::db::AuditLogger;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::signal;
use tracing::{error, info};
pub async fn run(config: Config) -> anyhow::Result<()> {
let db = Arc::new(ClickHouseLogger::new(&config.database));
let mut join_set = tokio::task::JoinSet::new();
for service in config.services {
let db = db.clone();
for bind in service.binds {
let service_name = service.name.clone();
let bind_addr = bind.addr.clone();
let bind_type = bind.bind_type;
// TODO: Handle UDP and Unix
if bind_type == BindType::Tcp {
let db = db.clone();
join_set.spawn(start_tcp_service(service_name, bind_addr, db));
} else {
info!("Skipping non-TCP bind for now: {:?}", bind_type);
}
}
}
info!("Traudit started.");
match signal::ctrl_c().await {
Ok(()) => {
info!("Shutdown signal received.");
}
Err(err) => {
error!("Unable to listen for shutdown signal: {}", err);
}
}
// Abort all tasks
join_set.shutdown().await;
Ok(())
}
async fn start_tcp_service(name: String, addr: String, _db: Arc<ClickHouseLogger>) {
info!("Service {} listening on TCP {}", name, addr);
let listener = match TcpListener::bind(&addr).await {
Ok(l) => l,
Err(e) => {
error!("Failed to bind {}: {}", addr, e);
return;
}
};
loop {
match listener.accept().await {
Ok((_socket, client_addr)) => {
info!("New connection from {}", client_addr);
// Spawn handler
// tokio::spawn(handle_connection(_socket, ...));
}
Err(e) => {
error!("Accept error: {}", e);
}
}
}
}

20
src/db/clickhouse.rs Normal file
View File

@@ -0,0 +1,20 @@
use super::{AuditEvent, AuditLogger};
use crate::config::DatabaseConfig;
use async_trait::async_trait;
pub struct ClickHouseLogger;
impl ClickHouseLogger {
pub fn new(_config: &DatabaseConfig) -> Self {
// TODO: Initialize ClickHouse client
ClickHouseLogger
}
}
#[async_trait]
impl AuditLogger for ClickHouseLogger {
async fn log(&self, _event: AuditEvent) -> anyhow::Result<()> {
// TODO: Implement insertion logic
Ok(())
}
}

14
src/db/mod.rs Normal file
View File

@@ -0,0 +1,14 @@
use async_trait::async_trait;
pub mod clickhouse;
#[derive(Debug, Clone)]
pub struct AuditEvent {
// TODO: Define audit event fields (src, dst, timestamp, etc.)
}
#[async_trait]
pub trait AuditLogger: Send + Sync {
// TODO: Finalize log interface
async fn log(&self, event: AuditEvent) -> anyhow::Result<()>;
}

28
src/main.rs Normal file
View File

@@ -0,0 +1,28 @@
mod config;
mod core;
mod db;
mod protocol;
use crate::config::Config;
use std::env;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args: Vec<String> = env::args().collect();
let config_path = args.get(1).map(|s| s.as_str()).unwrap_or("config.yaml");
println!("Loading config from {}", config_path);
// Check if config exists, if not warn (for dev purposes)
if !std::path::Path::new(config_path).exists() {
println!("Warning: Config file '{}' not found.", config_path);
// In a real run we might want to exit, but for init check we proceed or return
} else {
let config = Config::load(config_path).await?;
core::server::run(config).await?;
}
Ok(())
}

203
src/protocol/mod.rs Normal file
View File

@@ -0,0 +1,203 @@
use bytes::{Buf, BytesMut};
use std::io;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use tokio::io::{AsyncRead, AsyncReadExt};
#[derive(Debug, Clone)]
pub struct ProxyInfo {
pub version: Version,
pub source: SocketAddr,
pub destination: SocketAddr,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Version {
V1,
V2,
}
const V1_PREFIX: &[u8] = b"PROXY ";
const V2_PREFIX: &[u8] = b"\x0D\x0A\x0D\x0A\x00\x0D\x0A\x51\x55\x49\x54\x0A"; // 12 bytes
pub async fn read_proxy_header<T: AsyncRead + Unpin>(
stream: &mut T,
) -> io::Result<(Option<ProxyInfo>, BytesMut)> {
let mut buf = BytesMut::with_capacity(512);
// Read enough to distinguish version
// Initial read
let n = stream.read_buf(&mut buf).await?;
if n == 0 {
return Ok((None, buf));
}
// Check V2
if buf.len() >= 12 && &buf[..12] == V2_PREFIX {
// v2
return parse_v2(stream, buf).await;
}
// Check V1
if buf.len() >= 6 && &buf[..6] == V1_PREFIX {
// v1
return parse_v1(stream, buf).await;
}
// Neither
Ok((None, buf))
}
async fn parse_v1<T: AsyncRead + Unpin>(
stream: &mut T,
mut buf: BytesMut,
) -> io::Result<(Option<ProxyInfo>, BytesMut)> {
// Read line until \r\n
loop {
if let Some(pos) = buf.windows(2).position(|w| w == b"\r\n") {
// Found line end
let line_bytes = buf.split_to(pos + 2); // Consumes line including \r\n
let line = String::from_utf8_lossy(&line_bytes[..line_bytes.len() - 2]); // drop \r\n
let parts: Vec<&str> = line.split(' ').collect();
// PROXY TCP4 1.2.3.4 5.6.7.8 80 8080
if parts.len() >= 6 && parts[0] == "PROXY" {
let src_ip: IpAddr = parts[2]
.parse()
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid src IP"))?;
let dst_ip: IpAddr = parts[3]
.parse()
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid dst IP"))?;
let src_port: u16 = parts[4]
.parse()
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid src port"))?;
let dst_port: u16 = parts[5]
.parse()
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid dst port"))?;
return Ok((
Some(ProxyInfo {
version: Version::V1,
source: SocketAddr::new(src_ip, src_port),
destination: SocketAddr::new(dst_ip, dst_port),
}),
buf,
));
}
return Ok((None, buf));
}
// Read more
let n = stream.read_buf(&mut buf).await?;
if n == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Incomplete V1 header",
));
}
if buf.len() > 256 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"V1 header too too long",
));
}
}
}
async fn parse_v2<T: AsyncRead + Unpin>(
stream: &mut T,
mut buf: BytesMut,
) -> io::Result<(Option<ProxyInfo>, BytesMut)> {
// We already have at least 12 bytes.
// 13th byte: ver_cmd (version 4 bits + command 4 bits)
// 14th byte: fam (family 4 bits + proto 4 bits)
// 15th-16th: len (u16 big endian)
while buf.len() < 16 {
let n = stream.read_buf(&mut buf).await?;
if n == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Incomplete V2 header",
));
}
}
// Check version (should be 2) and command (0=Local, 1=Proxy)
let ver_cmd = buf[12];
if (ver_cmd >> 4) != 2 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid Proxy Protocol version",
));
}
// Length
let len_bytes = [buf[14], buf[15]];
let len = u16::from_be_bytes(len_bytes) as usize;
// Read payload
while buf.len() < 16 + len {
let n = stream.read_buf(&mut buf).await?;
if n == 0 {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Incomplete V2 payload",
));
}
}
// Consume header + payload
let full_len = 16 + len;
let header_bytes = buf.split_to(full_len); // Now buf has remaining data
// Parse addresses from payload (header_bytes[16..])
let fam = header_bytes[13];
let payload = &header_bytes[16..];
match fam {
0x11 | 0x12 => {
// TCP/UDP over IPv4
if payload.len() >= 12 {
let src_ip = Ipv4Addr::new(payload[0], payload[1], payload[2], payload[3]);
let dst_ip = Ipv4Addr::new(payload[4], payload[5], payload[6], payload[7]);
let src_port = u16::from_be_bytes([payload[8], payload[9]]);
let dst_port = u16::from_be_bytes([payload[10], payload[11]]);
return Ok((
Some(ProxyInfo {
version: Version::V2,
source: SocketAddr::new(IpAddr::V4(src_ip), src_port),
destination: SocketAddr::new(IpAddr::V4(dst_ip), dst_port),
}),
buf,
));
}
}
0x21 | 0x22 => {
// TCP/UDP over IPv6
if payload.len() >= 36 {
// IPv6 parsing...
// Keep it brief
let mut src = [0u8; 16];
src.copy_from_slice(&payload[0..16]);
let mut dst = [0u8; 16];
dst.copy_from_slice(&payload[16..32]);
let src_port = u16::from_be_bytes([payload[32], payload[33]]);
let dst_port = u16::from_be_bytes([payload[34], payload[35]]);
return Ok((
Some(ProxyInfo {
version: Version::V2,
source: SocketAddr::new(IpAddr::V6(Ipv6Addr::from(src)), src_port),
destination: SocketAddr::new(IpAddr::V6(Ipv6Addr::from(dst)), dst_port),
}),
buf,
));
}
}
_ => {}
}
// If unsupported family or LOCAL command, return Info with dummy/empty or just ignore
// For now, if we can't parse addr, we return None but consume header.
Ok((None, buf))
}