30 分钟 | 中高级 | 掌握 WiFi CSI 姿态估计原理,部署 Python/Rust 双版,ESP32 感知节点入网跑通端到端 pipeline
目标读者
- 对 WiFi 感知、泛在计算感兴趣的开发者
- 有 Python 基础、想了解 Rust 高性能推理的工程师
- 想探索非摄像头人体感知方案的 researcher
最低要求:能读懂 Python 类和基本 Rust 语法,有一台 Linux/macOS 或 WSL2 环境。
核心依赖与环境
| 依赖 | 版本要求 | 作用 |
|---|---|---|
| Python | ≥ 3.9 | 主代码库 v1 |
| Rust | ≥ 1.75 | 高性能推理端口 |
| PyTorch | ≥ 2.1 | 神经网络推理 |
| NumPy + SciPy | 最新稳定版 | CSI 信号处理 |
| OpenCV | ≥ 4.8 | 姿态可视化 |
| ESP32-S3 开发板 | 8MB Flash | WiFi CSI 硬件节点(可选) |
| 支持 AP 的路由器 | 802.11n 及以上 | CSI 数据来源 |
WARNING
普通 ESP32(初代)和 ESP32-C3 不支持 CSI 采集。项目明确需要 ESP32-S3(Xtensa 双核)或 ESP32-C6 + 60GHz mmWave 模块。
完整项目结构
RuView/
├── v1/ # Python 主代码库
│ ├── src/
│ │ ├── core/
│ │ │ ├── csi_processor.py # CSI 数据结构与处理
│ │ │ ├── phase_sanitizer.py # 相位噪声校正
│ │ │ └── router_interface.py # 路由器/ESP32 通信
│ │ ├── hardware/
│ │ │ ├── csi_extractor.py # ESP32 串口数据提取
│ │ │ └── router_interface.py # WiFi CSI 帧抓取
│ │ └── services/
│ │ └── ... # 信号处理 pipeline
│ ├── data/
│ │ └── proof/
│ │ ├── sample_csi_data.json # 1000帧确定性合成CSI(seed=42)
│ │ └── verify.py # SHA-256 pipeline 验证脚本
│ └── tests/ # pytest 测试套件
├── rust-port/wifi-densepose-rs/ # Rust 移植版
│ └── crates/
│ ├── wifi-densepose-core/ # 核心类型、CSI frame 原语
│ ├── wifi-densepose-signal/ # RuvSense SOTA 信号处理(14个模块)
│ ├── wifi-densepose-nn/ # ONNX/PyTorch/Candle 推理后端
│ ├── wifi-densepose-train/ # RuVector 训练 pipeline
│ ├── wifi-densepose-hardware/ # ESP32 TDM 协议
│ └── wifi-densepose-api/ # Axum REST API
├── firmware/esp32-csi-node/ # ESP32-S3 固件源码
├── docs/adr/ # 43 个架构决策记录(ADR)
└── pyproject.toml # Python 依赖定义
手把手步骤
第一步:理解 WiFi CSI 原理(无需硬件先跑 Python 仿真)
WiFi CSI 的本质是:利用无线信号在空间中传播时,被人体反射导致幅度和相位变化来实现无接触感知。
当 WiFi 设备(路由器)和接收端之间有人移动时,CSI 报告会记录下每个子载波的复数值——包括幅度(amplitude)和相位(phase)。这些变化虽然微小,但足以反推出人体的姿态和动作。
RuView 的厉害之处在于,它把 CSI 信号通过一套完整的 pipeline(相位校正 → 多径抑制 → 神经网络推理)转换为 17 个关键点(keypoint)的姿态估计。
先不接硬件,用内置合成数据跑通全流程:
# 克隆项目
git clone https://github.com/ruvnet/RuView.git
cd RuView
# 安装 Python 依赖
cd v1
pip install -e ".[dev]"
# 运行确定性 pipeline 验证(无需任何硬件)
python data/proof/verify.py
如果一切正常,你会看到:
===== WiFi-DensePose Pipeline Verification =====
Generator: generate_reference_signal.py v1.0.0
Seed: 42
Frames: 1000 | Antennas: 3 | Subcarriers: 56
Loading sample CSI data... done
Running pipeline...
[1/4] Phase sanitization ............ OK
[2/4] Multistatic signal processing .. OK
[3/4] Neural network inference ....... OK
[4/4] Keypoint extraction ........... OK
Computing reference hash (SHA-256)...
VERDICT: PASS
TIP
这个验证脚本使用的是 seed=42 生成的合成 CSI 数据,所有结果完全可复现。跑通这一步,说明你的 Python 环境没问题,可以进入下一步。
第二步:环境准备(Python + Rust + WSL2)
Python 环境(推荐 venv)
# 推荐用 Python 3.10 或 3.11,兼容性最好
python3.10 -m venv .venv
source .venv/bin/activate # Linux/macOS
# .venv\Scripts\activate # Windows PowerShell
pip install --upgrade pip
pip install -e ".[gpu]" # 有 NVIDIA GPU 时
pip install -e ".[dev]" # 开发依赖(含 pytest)
Rust 环境(用于高性能推理端口)
# 安装 Rust(如果还没有)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# 验证
rustc --version # 应输出 rustc 1.75+
cargo --version # 应输出 cargo 1.75+
# Rust 工具链支持 ESP32 开发(可选,有硬件才需要)
rustup target add xtensa-esp32-elf riscv32imc-esp-elf
Windows 用户:WSL2 是必需的
# 在 PowerShell 中以管理员运行
wsl --install
# 重启后进入 WSL
wsl -d Ubuntu-22.04
WARNING
ESP-IDF(ESP32 固件编译工具链)在 Windows 原生 Git Bash 下存在环境变量冲突(MSYSTEM 变量会干扰 ESP-IDF v5.4)。如果要在 Windows 上编译 ESP32 固件,推荐使用 WSL2 或纯 PowerShell。
第三步:Python 版跑通姿态估计 pipeline
现在我们用真实代码把 CSI 数据送进推理 pipeline。
3.1 CSI 数据结构
# v1/src/core/csi_processor.py
from __future__ import annotations
import numpy as np
from dataclasses import dataclass
from typing import List
@dataclass
class CSIFrame:
"""单个 CSI 帧 — 对应 WiFi 的一次信道观测"""
timestamp_s: float
amplitude: np.ndarray # shape: (num_antennas, num_subcarriers)
phase: np.ndarray # shape: (num_antennas, num_subcarriers)
subcarrier_indices: np.ndarray # 子载波编号
@classmethod
def from_json(cls, data: dict) -> CSIFrame:
return cls(
timestamp_s=data["timestamp_s"],
amplitude=np.array(data["amplitude"], dtype=np.float32),
phase=np.zeros_like(data["amplitude"], dtype=np.float32), # JSON不含相位,手动初始化
subcarrier_indices=np.arange(56), # 默认56个子载波
)
@dataclass
class PoseKeypoints:
"""17 个 COCO 关键点姿态结果"""
keypoints: List[np.ndarray] # 每个 keypoint 的 (x, y, confidence)
pose_id: int
def get_skeleton(self) -> np.ndarray:
"""返回所有关键点的 (N, 2) 坐标数组,方便绘图"""
return np.array([p[:2] for p in self.keypoints])
3.2 相位校正(消除 LO 频偏)
原始 CSI 相位被 LO(本地振荡器)噪声严重污染,需要先校正才能用:
# v1/src/core/phase_sanitizer.py
import numpy as np
def sanitize_phase(csi: np.ndarray) -> np.ndarray:
"""
迭代 LO 相位偏移估计 + 圆形均值校正
参考文献: ADR-014 SOTA Signal Processing
参数:
csi: shape (num_antennas, num_subcarriers) 的复数CSI矩阵
返回:
校正后的相位矩阵 shape 相同
"""
num_antennas, num_subcarriers = csi.shape
phase = np.angle(csi) # 原始相位 [-π, π]
# 迭代相位偏移估计(3次迭代通常收敛)
for _ in range(3):
# 在每个子载波上,用相邻天线的相位差估算偏移
phase_offset = np.zeros(num_antennas)
for a in range(1, num_antennas):
delta = phase[a] - phase[0]
# 圆形均值(考虑相位缠绕)
delta_mean = np.arctan2(
np.mean(np.sin(delta)),
np.mean(np.cos(delta))
)
phase_offset[a] = delta_mean
phase[a] -= phase_offset[a]
return phase
def extract_clean_amplitude(csi: np.ndarray) -> np.ndarray:
"""提取干净幅度,忽略被噪声污染严重的子载波"""
amplitude = np.abs(csi)
# Z-score 过滤:丢弃幅度异常低的子载波(可能是干扰)
z_scores = (amplitude - np.mean(amplitude, axis=1, keepdims=True)) \
/ (np.std(amplitude, axis=1, keepdims=True) + 1e-8)
mask = z_scores > -2.5
amplitude = np.where(mask, amplitude, 0)
return amplitude
3.3 端到端推理流程
# v1/demo_pipeline.py
import json
import numpy as np
from pathlib import Path
from src.core.csi_processor import CSIFrame, PoseKeypoints
from src.core.phase_sanitizer import sanitize_phase, extract_clean_amplitude
def load_csi_frames(json_path: str) -> list[CSIFrame]:
"""从 JSON 文件加载 CSI 帧序列"""
with open(json_path) as f:
data = json.load(f)
return [CSIFrame.from_json(frame) for frame in data["frames"]]
def run_pipeline(csi_frame: CSIFrame) -> PoseKeypoints:
"""
完整的 CSI → Pose 推理 pipeline
流程:
1. 相位校正(去除 LO 噪声)
2. 幅度清理(异常值过滤)
3. 多径抑制(RuvSense 信号处理核心)
4. 神经网络推理(简化的关键点输出)
5. 卡尔曼滤波跟踪(17-keypoint tracker)
"""
# Step 1: 相位校正
# 这里用全1复数模拟原始CSI(演示用)
csi = csi_frame.amplitude * np.exp(1j * csi_frame.phase)
clean_phase = sanitize_phase(csi)
# Step 2: 干净幅度
clean_amplitude = extract_clean_amplitude(csi)
# Step 3: 多径抑制(简化版 — 取空间平均)
suppressed = clean_amplitude.mean(axis=0) # (56,) 沿天线维度平均
# Step 4: 简化的姿态推理(演示用)
# 实际调用 PyTorch 模型: self.model(suppressed.unsqueeze(0))
# 这里用假输出模拟 17 个关键点的 (x, y, conf)
keypoints = []
for i in range(17):
x = 0.5 + 0.05 * np.sin(i * 0.5) + np.random.normal(0, 0.02)
y = 0.5 + 0.3 * np.cos(i * 0.3) + np.random.normal(0, 0.02)
conf = 0.9 + np.random.normal(0, 0.05)
keypoints.append(np.array([x, y, np.clip(conf, 0, 1)]))
return PoseKeypoints(keypoints=keypoints, pose_id=0)
def main():
csi_path = Path(__file__).parent / "data/proof/sample_csi_data.json"
frames = load_csi_frames(str(csi_path))
print(f"Loaded {len(frames)} CSI frames")
# 取前 30 帧跑推理
for i, frame in enumerate(frames[:30]):
pose = run_pipeline(frame)
skeleton = pose.get_skeleton()
if i % 10 == 0:
print(f"Frame {i}: {len(skeleton)} keypoints, "
f"avg confidence={np.mean([k[2] for k in pose.keypoints]):.3f}")
print("Pipeline run complete.")
if __name__ == "__main__":
main()
运行:
python demo_pipeline.py
# 输出示例:
# Loaded 1000 CSI frames
# Frame 0: 17 keypoints, avg confidence=0.901
# Frame 10: 17 keypoints, avg confidence=0.898
# ...
# Pipeline run complete.
第四步:Rust 版编译与测试(极速推理)
Python 版验证了逻辑正确性,现在切换到 Rust 版——这是真正能跑到毫秒级延迟的生产路径。
cd rust-port/wifi-densepose-rs
# 检查编译(无需 GPU,单 crate 快速验证)
cargo check -p wifi-densepose-core --no-default-features
cargo check -p wifi-densepose-signal --no-default-features
# 运行完整测试套件(1,031+ 个测试,约 2 分钟)
cargo test --workspace --no-default-features
TIP
--no-default-features 跳过 GPU/CUDA 相关代码,在没有 NVIDIA 驱动的机器上也能跑。测试通过说明核心 pipeline 逻辑完全正确。
核心 Rust 代码结构如下——你会发现它的设计思路和 Python 版完全对应:
// crates/wifi-densepose-core/src/types.rs
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
// WiFi CSI 帧 — Rust 版本用强类型替代 Python 的 dataclass
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CSIFrame {
pub timestamp_us: u64,
pub amplitude: Vec<Vec<f32>>, // (antennas, subcarriers)
pub phase_raw: Vec<Vec<f32>>, // 原始相位(未校正)
pub subcarrier_idx: Vec<i32>,
}
// RuvSense 处理后的姿态关键点
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoseKeypoints {
pub keypoints: Vec<Keypoint>,
pub pose_id: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Keypoint {
pub x: f32,
pub y: f32,
pub confidence: f32,
pub label: &'static str, // "nose", "left_shoulder", 等
}
// 错误类型 — Rust 风格
#[derive(Debug, thiserror::Error)]
pub enum CSIPipelineError {
#[error("Insufficient antennas: got {0}, need >= 2")]
InsufficientAntennas(usize),
#[error("NaN detected in CSI data")]
NaNInCSI,
#[error("Phase sanitization failed: {0}")]
PhaseSanitization(String),
}
// crates/wifi-densepose-signal/src/ruvsense/phase_align.rs
// 相位校正 — Rust 高性能版(SIMD 加速)
use std::f32::consts::PI;
pub fn circular_mean(samples: &[f32]) -> f32 {
// 圆形均值 = atan2(sum(sin), sum(cos))
let (sin_sum, cos_sum) = samples.iter()
.map(|&x| (x.sin(), x.cos()))
.fold((0.0_f32, 0.0_f32), |(s, c), (si, co)| (s + si, c + co));
sin_sum.atan2(cos_sum)
}
/// 迭代 LO 相位偏移校正(对应 Python 版 sanitize_phase)
pub fn align_phase(phase: &mut [[f32; 56]; 3]) {
for _iter in 0..3 {
for ant in 1..3 {
let delta: [f32; 56] = std::array::from_fn(|sc| phase[ant][sc] - phase[0][sc]);
let offset = circular_mean(&delta);
for sc in 0..56 {
phase[ant][sc] -= offset;
// 折叠到 [-π, π]
phase[ant][sc] = ((phase[ant][sc] + PI) % (2.0 * PI)) - PI;
}
}
}
}
第五步:ESP32-S3 固件烧录与 WiFi 入网
有硬件的同学看这里。没硬件的可以跳过,直接进入第六步。
5.1 固件编译
WARNING
下面的命令在 Windows WSL2 或 Linux/macOS 下执行。不要在 Windows 原生 Git Bash 中运行 ESP-IDF,会遇到 MSYSTEM 环境变量冲突。
cd firmware/esp32-csi-node
# 配置 — 8MB Flash 版本(标准)
cp sdkconfig.defaults.8mb sdkconfig.defaults
# 编译固件(完整 ESP-IDF 编译命令较长,这里是关键步骤)
# 需要先 source esp-idf 环境
. /path/to/esp-idf/export.sh # Linux/macOS/WSL2
# C:\Espressif\esp-idf\export.ps1 # Windows PowerShell
idf.py build
TIP
ESP-IDF v5.4 + ESP32-S3 的 WiFi CSI 需要特定 Kconfig 配置:开启 CONFIG_ESP_WIFI_CSI_ENABLED=y。默认模板已预置。
5.2 烧录到 ESP32-S3
# 查找串口(Linux/macOS: /dev/ttyUSB0, Windows: COM7)
idf.py -p /dev/ttyUSB0 flash monitor
5.3 WiFi 入网配置
python firmware/esp32-csi-node/provision.py \
--port /dev/ttyUSB0 \
--ssid "YourRouterSSID" \
--password "YourRouterPassword" \
--target-ip 192.168.1.20 # ESP32 期望连接的 CSI 采集端 IP
5.4 验证 CSI 数据流出
# 打开串口监视器
python -m serial.tools.miniterm /dev/ttyUSB0 115200
如果看到连续输出类似以下内容,说明 CSI 数据正在正常传输:
CSI: ts=1234567890 ant=0 sc=0 amp=1.23 phase=-0.45
CSI: ts=1234567890 ant=0 sc=1 amp=1.45 phase=-0.32
CSI: ts=1234567890 ant=1 sc=0 amp=0.98 phase=0.12
...
第六步:端到端验证 — CSI 采集到姿态输出
把 Python 的 hardware/csi_extractor.py 接入 ESP32 数据流,或者直接用路由器镜像模式采集真实 CSI:
# 启动 CSI 采集服务(连接 ESP32 节点)
cd v1
python -m src.hardware.csi_extractor --port /dev/ttyUSB0 --baud 115200
# 同时在另一个终端启动 REST API(Rust Axum 服务)
cd rust-port/wifi-densepose-rs
cargo run -p wifi-densepose-api
API 端点:
# 查看当前检测到的人数
curl http://localhost:8080/api/v1/poses/current
# 获取最近 10 帧的骨架坐标
curl http://localhost:8080/api/v1/poses/history?limit=10
返回示例:
{
"poses": [
{
"pose_id": 1,
"keypoints": [
{"label": "nose", "x": 0.52, "y": 0.15, "confidence": 0.94},
{"label": "left_shoulder", "x": 0.38, "y": 0.28, "confidence": 0.91},
{"label": "right_shoulder","x": 0.65, "y": 0.27, "confidence": 0.89},
...
],
"timestamp_us": 1712345678901234
}
]
}
第七步:运行官方测试套件(可选扩展)
# Python 测试
cd v1
python -m pytest tests/ -x -q
# Rust 测试
cd rust-port/wifi-densepose-rs
cargo test --workspace --no-default-features
# 生成完整验证包(ADR-028 标准)
cd ../..
bash scripts/generate-witness-bundle.sh
cd dist/witness-bundle-ADR028-*/
bash VERIFY.sh
成功的话,验证包会输出 7/7 PASS,包含 Rust 测试结果、Python proof 哈希和固件文件 SHA-256。
常见问题排查
Q1: ESP32 不吐 CSI 数据,一片空白
WiFi CSI 需要路由器和 ESP32 都支持 802.11n,且部分路由器需要手动开启 CSI 采集功能(华硕/网件部分型号支持原生 CSI)。确认路由器固件版本,或换用 ESP32 作为 AP 模式的数据源。
Q2: python data/proof/verify.py 报错 numpy.linalg.LinAlgError
通常是 numpy/scipy 版本不兼容导致的。确保使用 numpy >= 1.24, scipy >= 1.11,不要用 conda 的旧版本镜像。建议用 venv 隔离环境:
pip install "numpy>=1.24" "scipy>=1.11"
python data/proof/verify.py
Q3: Rust cargo test 编译报错 linker cc not found
Windows 上缺少 MSVC toolchain。安装 Visual Studio Build Tools,勾选 "C++ CMake tools for Windows"。
Q4: Phase sanitization 之后幅度全是零或 NaN
检查输入 CSI JSON 的 amplitude 数值范围。如果原始数据是整数(未归一化),需要先除以缩放因子。参考 v1/data/proof/sample_csi_data.json 的数值范围(1.0~2.0)。
Q5: ESP32 固件烧录成功但 WiFi 连接失败
确认路由器没开 AP 隔离(Airplay/HomeCast 等功能有时会开启隔离)。另外 ESP32-S3 只支持 2.4GHz WiFi,确认路由器 SSID 不是 5GHz only。
Q6: 姿态关键点跳动剧烈,信号噪声大
这是最常见的问题,通常由多径干扰引起。处理方法:把 ESP32 天线和路由器天线错开 45° 角放置;或增加 RuvSense 的多径抑制参数(field_model.rs 中的 SVD 降噪)。
扩展阅读 / 进阶方向
1. RuvSense 多视角注意力融合
RuView 的核心亮点之一是 RuvSense 模块。它通过 attention.rs 中的 CrossViewpointAttention 机制,融合多个 ESP32 节点的 CSI 视角。几何偏置(GeometricBias)会加权不同节点的贡献,Cramer-Rao 界分析可以告诉你当前部署是否达到了理论精度上限。
2. 深入读 ADR 架构决策
项目包含 43 个 ADR,推荐从这几个入手:
3. 用 RuVector 训练自己的模型
wifi-densepose-train crate 已经集成了 RuVector v2.0.4 训练 pipeline。你可以用自己的室内 CSI 数据集(需要多视角采集和人工标注关键点)微调 ONNX 模型。ADR-016 记录了完整流程。
4. 搭建 RuvSense 多节点 mesh 网络
ADR-029/ADR-032 描述了多节点感知模式。用多个 ESP32-S3 + ESP32-C6 组成感知 mesh,配合 RuvSense 的 MultistaticArray 融合逻辑,可以实现跨房间的姿态跟踪。
5. ESP32 CSI 的固件层原理
想真正理解 CSI 数据从哪来的,可以读 firmware/esp32-csi-node/main/wifi_csi.c。核心是 ESP-IDF 的 wifi_csi_cb_t 回调函数——每次收到符合条件的 WiFi 帧就会触发一次 CSI 采集。TDM(时分复用)协议在 hardware/src/esp32/tdm.rs 中实现,保证多节点不会相互干扰。