30 Minutes | Intermediate-Advanced | Master WiFi CSI pose estimation principles, deploy Python/Rust dual versions, and complete an end-to-end pipeline with ESP32 sensing nodes.
Target Audience
- Developers interested in WiFi sensing and ubiquitous computing
- Engineers with a Python foundation who want to explore high-performance inference with Rust
- Researchers exploring non-camera-based human sensing solutions
Minimum requirements: Ability to understand Python classes and basic Rust syntax, with a Linux/macOS or WSL2 environment.
Core Dependencies and Environment
| Dependency | Version Requirement | Role |
|---|---|---|
| Python | ≥ 3.9 | Main codebase v1 |
| Rust | ≥ 1.75 | High-performance inference port |
| PyTorch | ≥ 2.1 | Neural network inference |
| NumPy + SciPy | Latest Stable | CSI signal processing |
| OpenCV | ≥ 4.8 | Pose visualization |
| ESP32-S3 Board | 8MB Flash | WiFi CSI hardware node (Optional) |
| AP-enabled Router | 802.11n and above | CSI data source |
WARNING
Standard ESP32 (original) and ESP32-C3 do not support CSI collection. This project explicitly requires ESP32-S3 (Xtensa dual-core) or ESP32-C6 + 60GHz mmWave module.
Full Project Structure
RuView/
├── v1/                                   # Python Main Codebase
│   ├── src/
│   │   ├── core/
│   │   │   ├── csi_processor.py          # CSI data structures and processing
│   │   │   ├── phase_sanitizer.py        # Phase noise correction
│   │   │   └── router_interface.py       # Router/ESP32 communication
│   │   ├── hardware/
│   │   │   ├── csi_extractor.py          # ESP32 serial data extraction
│   │   │   └── router_interface.py       # WiFi CSI frame capturing
│   │   └── services/
│   │       └── ...                       # Signal processing pipeline
│   ├── data/
│   │   └── proof/
│   │       ├── sample_csi_data.json      # 1000 frames of deterministic synthetic CSI (seed=42)
│   │       └── verify.py                 # SHA-256 pipeline verification script
│   └── tests/                            # pytest suite
├── rust-port/wifi-densepose-rs/          # Rust Port
│   └── crates/
│       ├── wifi-densepose-core/          # Core types, CSI frame primitives
│       ├── wifi-densepose-signal/        # RuvSense SOTA signal processing (14 modules)
│       ├── wifi-densepose-nn/            # ONNX/PyTorch/Candle inference backend
│       ├── wifi-densepose-train/         # RuVector training pipeline
│       ├── wifi-densepose-hardware/      # ESP32 TDM protocol
│       └── wifi-densepose-api/           # Axum REST API
├── firmware/esp32-csi-node/              # ESP32-S3 firmware source
├── docs/adr/                             # 43 Architecture Decision Records (ADR)
└── pyproject.toml                        # Python dependency definitions
Step-by-Step Guide
Step 1: Understanding WiFi CSI Principles (Run Python simulation without hardware)
WiFi CSI sensing works by measuring the changes in amplitude and phase that human body reflections impose on wireless signals as they propagate through space, which makes contact-free sensing possible.
When a person moves between a WiFi device (router) and a receiver, the CSI report records a complex value for each subcarrier, encoding both amplitude and phase. These changes are subtle, but they are sufficient to infer human poses and actions.
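To make the amplitude and phase picture concrete, here is a minimal NumPy sketch of a two-path channel: one static direct path plus one reflection off a body. The channel 6 centre frequency, the path lengths, and the 0.4 reflection gain are illustrative assumptions, not values taken from RuView.

```python
import numpy as np

C = 3.0e8              # speed of light, m/s
CENTER_HZ = 2.437e9    # assumed: 2.4 GHz channel 6 centre frequency
SPACING_HZ = 312.5e3   # 802.11n subcarrier spacing
NUM_SUBCARRIERS = 56   # matches the sample data used in this tutorial


def two_path_csi(direct_m: float, reflected_m: float, reflect_gain: float = 0.4) -> np.ndarray:
    """Complex CSI per subcarrier for a direct path plus one body reflection."""
    offsets = np.arange(NUM_SUBCARRIERS) - NUM_SUBCARRIERS // 2
    freqs = CENTER_HZ + offsets * SPACING_HZ
    direct = np.exp(-2j * np.pi * freqs * direct_m / C)
    reflected = reflect_gain * np.exp(-2j * np.pi * freqs * reflected_m / C)
    return direct + reflected


# The reflected path grows by 3 cm, e.g. because the person shifted slightly
csi_before = two_path_csi(direct_m=4.0, reflected_m=6.00)
csi_after = two_path_csi(direct_m=4.0, reflected_m=6.03)

amplitude_change = np.abs(csi_after) - np.abs(csi_before)
phase_change = np.angle(csi_after * np.conj(csi_before))

print(f"max |amplitude change| = {np.max(np.abs(amplitude_change)):.3f}")
print(f"max |phase change|     = {np.max(np.abs(phase_change)):.3f} rad")
```

Even a path-length change of a few centimetres is a sizeable fraction of the roughly 12 cm wavelength at 2.4 GHz, which is why small body movements leave a measurable trace on every subcarrier.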
RuView's strength lies in its ability to convert CSI signals into 17-keypoint pose estimations through a complete pipeline (Phase Correction → Multipath Suppression → Neural Network Inference).
Run the full flow using built-in synthetic data before connecting hardware:
# Clone the project
git clone https://github.com/ruvnet/RuView.git
cd RuView
# Install Python dependencies
cd v1
pip install -e ".[dev]"
# Run deterministic pipeline verification (no hardware required)
python data/proof/verify.py
If everything is correct, you will see:
===== WiFi-DensePose Pipeline Verification =====
Generator: generate_reference_signal.py v1.0.0
Seed: 42
Frames: 1000 | Antennas: 3 | Subcarriers: 56
Loading sample CSI data... done
Running pipeline...
[1/4] Phase sanitization ............ OK
[2/4] Multistatic signal processing .. OK
[3/4] Neural network inference ....... OK
[4/4] Keypoint extraction ........... OK
Computing reference hash (SHA-256)...
VERDICT: PASS
TIP
This verification script uses synthetic CSI data generated with seed=42, making all results fully reproducible. Passing this step indicates your Python environment is ready for the next phase.
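The reproducibility idea behind a seeded, hash-checked pipeline can be sketched in a few lines. This is not the logic of verify.py: `synthetic_csi`, `toy_pipeline`, and the rounding precision are hypothetical stand-ins chosen only to show why a fixed seed plus a SHA-256 digest gives a repeatable verdict.

```python
import hashlib

import numpy as np


def synthetic_csi(seed: int, frames: int = 10, antennas: int = 3, subcarriers: int = 56) -> np.ndarray:
    """Deterministic synthetic amplitudes: the same seed always yields the same array."""
    rng = np.random.default_rng(seed)
    return rng.uniform(1.0, 2.0, size=(frames, antennas, subcarriers)).astype(np.float32)


def toy_pipeline(csi: np.ndarray) -> np.ndarray:
    """Placeholder processing step (hypothetical): average over antennas."""
    return csi.mean(axis=1)


def output_digest(seed: int) -> str:
    """SHA-256 hex digest of the pipeline output for a given seed."""
    result = toy_pipeline(synthetic_csi(seed))
    # Assumption of this sketch: rounding makes the digest less sensitive
    # to last-bit floating-point differences.
    canonical = np.round(result.astype(np.float64), 6)
    return hashlib.sha256(canonical.tobytes()).hexdigest()


reference = output_digest(42)
print("reproducible:", output_digest(42) == reference)
print("seed-sensitive:", output_digest(43) != reference)
```

A stored reference digest then turns "did the pipeline change?" into a single string comparison.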
Step 2: Environment Setup (Python + Rust + WSL2)
Python Environment (venv recommended)
# Python 3.10 or 3.11 is recommended for best compatibility
python3.10 -m venv .venv
source .venv/bin/activate # Linux/macOS
# .venv\Scripts\activate # Windows PowerShell
pip install --upgrade pip
pip install -e ".[gpu]" # If NVIDIA GPU is available
pip install -e ".[dev]" # Development dependencies (including pytest)
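After installing, a quick check of the interpreter and package versions can catch mismatches early. The sketch below uses only the standard library. The distribution names in `PACKAGES` are assumptions (OpenCV, for example, may be installed as `opencv-python-headless` instead), so adjust them to your environment.

```python
import sys
from importlib import metadata

REQUIRED_PYTHON = (3, 9)
# Assumed PyPI distribution names for the dependencies in the table above
PACKAGES = ["torch", "numpy", "scipy", "opencv-python"]


def python_ok(minimum=REQUIRED_PYTHON) -> bool:
    """True if the running interpreter meets the minimum (major, minor) version."""
    return sys.version_info[:2] >= tuple(minimum)


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if it is missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


print(f"Python {sys.version_info.major}.{sys.version_info.minor} ok: {python_ok()}")
for name, version in installed_versions(PACKAGES).items():
    print(f"{name:15s} {version or 'NOT INSTALLED'}")
```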
Rust Environment (For high-performance inference port)
# Install Rust (if not already installed)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Verify
rustc --version # Should output rustc 1.75+
cargo --version # Should output cargo 1.75+
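# Rust toolchain support for ESP32 development (Optional, only for hardware)
# Xtensa targets such as the ESP32-S3 are not distributed through `rustup target add`;
# the esp-rs project's espup tool installs the required toolchain instead
cargo install espup
espup install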
For Windows Users: WSL2 is required
# Run in PowerShell as Administrator; name the distribution so the next command finds it
wsl --install -d Ubuntu-22.04
# Enter WSL after restart
wsl -d Ubuntu-22.04
WARNING
ESP-IDF (ESP32 firmware toolchain) has environment variable conflicts with native Git Bash on Windows (the MSYSTEM variable interferes with ESP-IDF v5.4). To compile ESP32 firmware on Windows, use WSL2 or pure PowerShell.
Step 3: Running the Pose Estimation Pipeline (Python Version)
Now we use real code to feed CSI data into the inference pipeline.
3.1 CSI Data Structure
# v1/src/core/csi_processor.py
from __future__ import annotations

from dataclasses import dataclass
from typing import List

import numpy as np


@dataclass
class CSIFrame:
    """A single CSI frame: one WiFi channel observation."""
    timestamp_s: float
    amplitude: np.ndarray           # shape: (num_antennas, num_subcarriers)
    phase: np.ndarray               # shape: (num_antennas, num_subcarriers)
    subcarrier_indices: np.ndarray  # Subcarrier indices

    @classmethod
    def from_json(cls, data: dict) -> CSIFrame:
        amplitude = np.array(data["amplitude"], dtype=np.float32)
        return cls(
            timestamp_s=data["timestamp_s"],
            amplitude=amplitude,
            # JSON lacks phase, so initialize it to zeros of the same shape
            phase=np.zeros_like(amplitude),
            # Derive the subcarrier count from the data (56 in the sample file)
            subcarrier_indices=np.arange(amplitude.shape[-1]),
        )


@dataclass
class PoseKeypoints:
    """17 COCO keypoint pose results"""
    keypoints: List[np.ndarray]  # (x, y, confidence) for each keypoint
    pose_id: int

    def get_skeleton(self) -> np.ndarray:
        """Returns an (N, 2) array of all keypoint coordinates for plotting"""
        return np.array([p[:2] for p in self.keypoints])
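When working with the 17 keypoints it helps to attach names to them. The sketch below assumes the keypoints follow the standard COCO ordering; the tutorial only states that the output is "17 COCO keypoints", so verify the order against the model you use. `label_keypoints` is a hypothetical helper, not part of the RuView codebase.

```python
import numpy as np

# Standard COCO keypoint order (assumed to match the model output)
COCO_KEYPOINT_NAMES = [
    "nose", "left_eye", "right_eye", "left_ear", "right_ear",
    "left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
    "left_wrist", "right_wrist", "left_hip", "right_hip",
    "left_knee", "right_knee", "left_ankle", "right_ankle",
]


def label_keypoints(keypoints, min_confidence: float = 0.5) -> dict:
    """Return {name: (x, y)} for keypoints whose confidence passes the threshold."""
    if len(keypoints) != len(COCO_KEYPOINT_NAMES):
        raise ValueError(f"expected {len(COCO_KEYPOINT_NAMES)} keypoints, got {len(keypoints)}")
    return {
        name: (float(kp[0]), float(kp[1]))
        for name, kp in zip(COCO_KEYPOINT_NAMES, keypoints)
        if kp[2] >= min_confidence
    }


# Example: 17 synthetic (x, y, confidence) triples, one with low confidence
example = [np.array([i * 0.01, i * 0.02, 0.9]) for i in range(17)]
example[3][2] = 0.1  # left_ear falls below the threshold
labeled = label_keypoints(example)
print(sorted(labeled)[:3], "... total:", len(labeled))
```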
3.2 Phase Correction (Eliminating LO Frequency Offset)
Raw CSI phase is heavily contaminated by LO (Local Oscillator) noise and must be corrected before use:
# v1/src/core/phase_sanitizer.py
import numpy as np


def sanitize_phase(csi: np.ndarray) -> np.ndarray:
    """
    Iterative LO phase offset estimation + circular mean correction
    Ref: ADR-014 SOTA Signal Processing

    Args:
        csi: Complex CSI matrix of shape (num_antennas, num_subcarriers)
    Returns:
        Sanitized phase matrix of the same shape
    """
    num_antennas = csi.shape[0]
    phase = np.angle(csi)  # Raw phase in [-π, π]
    # Iterative phase offset estimation (3 iterations usually converge)
    for _ in range(3):
        # Estimate one offset per antenna, relative to antenna 0, averaged over subcarriers
        for a in range(1, num_antennas):
            delta = phase[a] - phase[0]
            # Circular mean (accounts for phase wrapping)
            delta_mean = np.arctan2(
                np.mean(np.sin(delta)),
                np.mean(np.cos(delta))
            )
            # Remove the offset, then wrap the result back into [-π, π]
            phase[a] = np.angle(np.exp(1j * (phase[a] - delta_mean)))
    return phase


def extract_clean_amplitude(csi: np.ndarray) -> np.ndarray:
    """Extracts clean amplitude, zeroing subcarriers heavily contaminated by noise"""
    amplitude = np.abs(csi)
    # Z-score filtering: zero out subcarriers with abnormally low amplitude (likely interference)
    z_scores = (amplitude - np.mean(amplitude, axis=1, keepdims=True)) \
        / (np.std(amplitude, axis=1, keepdims=True) + 1e-8)
    mask = z_scores > -2.5
    amplitude = np.where(mask, amplitude, 0)
    return amplitude
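The reason for using a circular mean rather than an arithmetic mean is easiest to see on phases that sit near the ±π boundary. The following standalone sketch (helper names are illustrative) compares the two and then recovers a known offset from wrapped phase differences.

```python
import numpy as np


def circular_mean(angles) -> float:
    """Mean direction of a set of angles, computed as atan2(mean(sin), mean(cos))."""
    angles = np.asarray(angles, dtype=float)
    return float(np.arctan2(np.mean(np.sin(angles)), np.mean(np.cos(angles))))


def wrap_to_pi(angles) -> np.ndarray:
    """Wrap angles into [-pi, pi)."""
    return (np.asarray(angles, dtype=float) + np.pi) % (2 * np.pi) - np.pi


# Two phases just either side of the wrap point: both point "left" on the unit circle
near_boundary = np.array([np.pi - 0.1, -np.pi + 0.1])
naive = float(np.mean(near_boundary))  # arithmetic mean points the opposite way
circ = circular_mean(near_boundary)    # stays at the boundary (+pi or -pi)
print(f"arithmetic mean = {naive:.3f}, circular mean = {circ:.3f}")

# Recover a constant offset between two antennas despite wrapping
reference = np.linspace(-1.0, 1.0, 56)
true_offset = 2.9
shifted = wrap_to_pi(reference + true_offset)  # some values wrap past +pi
estimated = circular_mean(shifted - reference)
print(f"true offset = {true_offset}, estimated = {estimated:.3f}")
```

Because sine and cosine are periodic, a difference of `2.9` and one of `2.9 - 2π` contribute identically, so wrapping does not bias the estimate.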
3.3 End-to-End Inference Flow
# v1/demo_pipeline.py
import json
from pathlib import Path

import numpy as np

from src.core.csi_processor import CSIFrame, PoseKeypoints
from src.core.phase_sanitizer import sanitize_phase, extract_clean_amplitude


def load_csi_frames(json_path: str) -> list[CSIFrame]:
    """Loads CSI frame sequence from a JSON file"""
    with open(json_path) as f:
        data = json.load(f)
    return [CSIFrame.from_json(frame) for frame in data["frames"]]


def run_pipeline(csi_frame: CSIFrame) -> PoseKeypoints:
    """
    Complete CSI → Pose inference pipeline
    Flow:
        1. Phase Sanitization (Remove LO noise)
        2. Amplitude Cleaning (Outlier filtering)
        3. Multipath Suppression (RuvSense core signal processing)
        4. Neural Network Inference (Simplified keypoint output)
        5. Kalman Filter Tracking (17-keypoint tracker; not implemented in this demo)
    """
    # Step 1: Phase Sanitization
    # Rebuild complex CSI from the stored amplitude and phase (for demo purposes)
    csi = csi_frame.amplitude * np.exp(1j * csi_frame.phase)
    clean_phase = sanitize_phase(csi)  # computed for completeness; the mock inference below does not use it

    # Step 2: Clean Amplitude
    clean_amplitude = extract_clean_amplitude(csi)

    # Step 3: Multipath Suppression (Simplified version: spatial averaging)
    suppressed = clean_amplitude.mean(axis=0)  # (56,) averaged across antennas

    # Step 4: Simplified Pose Inference (Demo only)
    # Actual call to PyTorch model: self.model(suppressed.unsqueeze(0))
    # Mock output for 17 keypoints (x, y, conf); random, so it varies between runs
    keypoints = []
    for i in range(17):
        x = 0.5 + 0.05 * np.sin(i * 0.5) + np.random.normal(0, 0.02)
        y = 0.5 + 0.3 * np.cos(i * 0.3) + np.random.normal(0, 0.02)
        conf = 0.9 + np.random.normal(0, 0.05)
        keypoints.append(np.array([x, y, np.clip(conf, 0, 1)]))
    return PoseKeypoints(keypoints=keypoints, pose_id=0)


def main():
    csi_path = Path(__file__).parent / "data/proof/sample_csi_data.json"
    frames = load_csi_frames(str(csi_path))
    print(f"Loaded {len(frames)} CSI frames")
    # Run inference on the first 30 frames
    for i, frame in enumerate(frames[:30]):
        pose = run_pipeline(frame)
        skeleton = pose.get_skeleton()
        if i % 10 == 0:
            print(f"Frame {i}: {len(skeleton)} keypoints, "
                  f"avg confidence={np.mean([k[2] for k in pose.keypoints]):.3f}")
    print("Pipeline run complete.")


if __name__ == "__main__":
    main()
Run:
python demo_pipeline.py
# Example Output (confidence values vary between runs because the mock keypoints are random):
# Loaded 1000 CSI frames
# Frame 0: 17 keypoints, avg confidence=0.901
# Frame 10: 17 keypoints, avg confidence=0.898
# ...
# Pipeline run complete.
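The pipeline docstring lists Kalman filter tracking as its final stage, which the demo does not implement. As a rough idea of what that stage does, here is a minimal constant-velocity Kalman filter for a single keypoint coordinate. The frame rate and noise variances are assumed values for illustration, and this is not RuView's tracker.

```python
import numpy as np


class ConstantVelocityKalman1D:
    """Tracks one coordinate with state [position, velocity]."""

    def __init__(self, dt: float = 1 / 30, process_var: float = 1e-2, measurement_var: float = 4e-4):
        self.x = np.zeros(2)                         # state estimate
        self.P = np.eye(2)                           # state covariance
        self.F = np.array([[1.0, dt], [0.0, 1.0]])   # constant-velocity transition
        self.H = np.array([[1.0, 0.0]])              # only position is observed
        # Process noise for a white-acceleration model
        self.Q = process_var * np.array([[dt**4 / 4, dt**3 / 2],
                                         [dt**3 / 2, dt**2]])
        self.R = np.array([[measurement_var]])
        self.initialized = False

    def update(self, z: float) -> float:
        """Feed one measurement and return the filtered position."""
        if not self.initialized:
            self.x[0] = z
            self.initialized = True
            return float(z)
        # Predict
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        # Correct
        innovation = z - (self.H @ self.x)
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        self.x = self.x + K @ innovation
        self.P = (np.eye(2) - K @ self.H) @ self.P
        return float(self.x[0])


def smooth_track(measurements, **kwargs) -> np.ndarray:
    """Filter a sequence of noisy positions for one coordinate."""
    kf = ConstantVelocityKalman1D(**kwargs)
    return np.array([kf.update(float(z)) for z in measurements])


rng = np.random.default_rng(0)
noisy_x = 0.5 + rng.normal(0, 0.02, size=200)  # a stationary keypoint with jitter
smooth_x = smooth_track(noisy_x)
print(f"std before: {noisy_x[100:].std():.4f}, after: {smooth_x[100:].std():.4f}")
```

A full tracker would run one such filter per coordinate of each keypoint, or a single joint filter over all 17 keypoints.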
Step 4: Rust Version Compilation and Testing (High-Speed Inference)
While the Python version validates the logic, we switch to Rust for the production path capable of millisecond-level latency.
cd rust-port/wifi-densepose-rs
# Check compilation (No GPU required, fast validation of single crate)
cargo check -p wifi-densepose-core --no-default-features
cargo check -p wifi-densepose-signal --no-default-features
# Run full test suite (1,031+ tests, ~2 minutes)
cargo test --workspace --no-default-features
TIP
--no-default-features skips GPU/CUDA-related code, allowing it to run on machines without NVIDIA drivers. Passing tests indicate that the core pipeline logic behaves as the test suite expects.
The core Rust structure corresponds directly to the Python logic:
// crates/wifi-densepose-core/src/types.rs
use serde::{Deserialize, Serialize};

// WiFi CSI Frame: Rust uses strong typing instead of Python's dataclass
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CSIFrame {
    pub timestamp_us: u64,
    pub amplitude: Vec<Vec<f32>>, // (antennas, subcarriers)
    pub phase_raw: Vec<Vec<f32>>, // Raw phase (unsanitized)
    pub subcarrier_idx: Vec<i32>,
}

// Pose keypoints processed by RuvSense
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoseKeypoints {
    pub keypoints: Vec<Keypoint>,
    pub pose_id: u32,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Keypoint {
    pub x: f32,
    pub y: f32,
    pub confidence: f32,
    // "nose", "left_shoulder", etc. An owned String lets the struct be
    // deserialized from non-static input, which &'static str would not allow.
    pub label: String,
}

// Error types, Rust style
#[derive(Debug, thiserror::Error)]
pub enum CSIPipelineError {
    #[error("Insufficient antennas: got {0}, need >= 2")]
    InsufficientAntennas(usize),
    #[error("NaN detected in CSI data")]
    NaNInCSI,
    #[error("Phase sanitization failed: {0}")]
    PhaseSanitization(String),
}
// crates/wifi-densepose-signal/src/ruvsense/phase_align.rs
// Phase Sanitization: high-performance Rust version (SIMD accelerated)
use std::f32::consts::PI;

pub fn circular_mean(samples: &[f32]) -> f32 {
    // Circular mean = atan2(sum(sin), sum(cos))
    let (sin_sum, cos_sum) = samples.iter()
        .map(|&x| (x.sin(), x.cos()))
        .fold((0.0_f32, 0.0_f32), |(s, c), (si, co)| (s + si, c + co));
    sin_sum.atan2(cos_sum)
}

/// Iterative LO Phase Offset Correction (corresponds to Python's sanitize_phase)
pub fn align_phase(phase: &mut [[f32; 56]; 3]) {
    for _iter in 0..3 {
        for ant in 1..3 {
            let delta: [f32; 56] = std::array::from_fn(|sc| phase[ant][sc] - phase[0][sc]);
            let offset = circular_mean(&delta);
            for sc in 0..56 {
                // Subtract the offset, then wrap to [-π, π). rem_euclid is used because
                // `%` keeps the sign of the dividend and would leave negative inputs below -π.
                phase[ant][sc] = (phase[ant][sc] - offset + PI).rem_euclid(2.0 * PI) - PI;
            }
        }
    }
}
Step 5: ESP32-S3 Firmware Flashing and WiFi Provisioning
If you have the hardware, follow these steps. Otherwise, skip to Step 6.
5.1 Firmware Compilation
WARNING
Execute the following commands in Windows WSL2 or Linux/macOS. Do not run ESP-IDF in native Windows Git Bash due to environment variable conflicts.
cd firmware/esp32-csi-node
# Configuration: 8MB Flash version (standard)
cp sdkconfig.defaults.8mb sdkconfig.defaults
# Compile firmware (Assuming ESP-IDF is sourced)
. /path/to/esp-idf/export.sh # Linux/macOS/WSL2
# C:\Espressif\esp-idf\export.ps1 # Windows PowerShell
idf.py build
TIP
WiFi CSI with ESP-IDF v5.4 + ESP32-S3 requires specific Kconfig: CONFIG_ESP_WIFI_CSI_ENABLED=y. This is pre-set in the template.
5.2 Flashing to ESP32-S3
# Find your serial port (Linux/macOS: /dev/ttyUSB0, Windows: COM7)
idf.py -p /dev/ttyUSB0 flash monitor
5.3 WiFi Provisioning
python firmware/esp32-csi-node/provision.py \
--port /dev/ttyUSB0 \
--ssid "YourRouterSSID" \
--password "YourRouterPassword" \
--target-ip 192.168.1.20 # IP of the collection machine
5.4 Verifying CSI Stream
# Open serial monitor
python -m serial.tools.miniterm /dev/ttyUSB0 115200
If you see continuous output like the following, CSI data is transmitting normally:
CSI: ts=1234567890 ant=0 sc=0 amp=1.23 phase=-0.45
CSI: ts=1234567890 ant=0 sc=1 amp=1.45 phase=-0.32
CSI: ts=1234567890 ant=1 sc=0 amp=0.98 phase=0.12
...
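Lines in this format are straightforward to parse on the host side. The sketch below assumes the serial output matches the sample lines above exactly (`CSI: ts=… ant=… sc=… amp=… phase=…`); `CsiSample` and `parse_csi_line` are hypothetical names, not part of csi_extractor.py.

```python
import re
from typing import NamedTuple, Optional


class CsiSample(NamedTuple):
    ts: int       # timestamp as printed by the firmware
    ant: int      # antenna index
    sc: int       # subcarrier index
    amp: float    # amplitude
    phase: float  # phase in radians


_NUMBER = r"-?\d+(?:\.\d+)?"
_CSI_LINE = re.compile(
    r"^CSI:\s+ts=(?P<ts>\d+)\s+ant=(?P<ant>\d+)\s+sc=(?P<sc>\d+)"
    rf"\s+amp=(?P<amp>{_NUMBER})\s+phase=(?P<phase>{_NUMBER})$"
)


def parse_csi_line(line: str) -> Optional[CsiSample]:
    """Parse one serial line; return None for anything that is not a CSI record."""
    match = _CSI_LINE.match(line.strip())
    if match is None:
        return None
    return CsiSample(
        ts=int(match["ts"]),
        ant=int(match["ant"]),
        sc=int(match["sc"]),
        amp=float(match["amp"]),
        phase=float(match["phase"]),
    )


sample_lines = [
    "CSI: ts=1234567890 ant=0 sc=0 amp=1.23 phase=-0.45",
    "CSI: ts=1234567890 ant=0 sc=1 amp=1.45 phase=-0.32",
    "CSI: ts=1234567890 ant=1 sc=0 amp=0.98 phase=0.12",
]
for text in sample_lines:
    print(parse_csi_line(text))
```

Returning `None` for non-matching lines lets the reader skip boot messages and other log output that shares the serial port.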
Step 6: End-to-End Validation: CSI Collection to Pose Output
Connect Python's hardware/csi_extractor.py to the ESP32 stream, or use router mirror mode for real CSI:
# Start CSI extraction service (connected to ESP32 node)
cd v1
python -m src.hardware.csi_extractor --port /dev/ttyUSB0 --baud 115200
# Simultaneously start the REST API (Rust Axum service) in another terminal
cd rust-port/wifi-densepose-rs
cargo run -p wifi-densepose-api
API Endpoints:
# Fetch the current pose estimates
curl http://localhost:8080/api/v1/poses/current
# Get skeleton coordinates for the last 10 frames (quoted so the shell does not treat "?" as a glob)
curl "http://localhost:8080/api/v1/poses/history?limit=10"
Example Response:
{
  "poses": [
    {
      "pose_id": 1,
      "keypoints": [
        {"label": "nose", "x": 0.52, "y": 0.15, "confidence": 0.94},
        {"label": "left_shoulder", "x": 0.38, "y": 0.28, "confidence": 0.91},
        {"label": "right_shoulder", "x": 0.65, "y": 0.27, "confidence": 0.89},
        ...
      ],
      "timestamp_us": 1712345678901234
    }
  ]
}
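A client consuming this response typically needs pixel coordinates for drawing. The sketch below parses a trimmed copy of the example response and scales the keypoints to an image size. It assumes `x` and `y` are normalized to the range 0 to 1, which the example values suggest but the tutorial does not state; `keypoints_to_pixels` is a hypothetical helper.

```python
import json

# Trimmed copy of the example response (the elided keypoints are left out)
SAMPLE_RESPONSE = """
{
  "poses": [
    {
      "pose_id": 1,
      "keypoints": [
        {"label": "nose", "x": 0.52, "y": 0.15, "confidence": 0.94},
        {"label": "left_shoulder", "x": 0.38, "y": 0.28, "confidence": 0.91},
        {"label": "right_shoulder", "x": 0.65, "y": 0.27, "confidence": 0.89}
      ],
      "timestamp_us": 1712345678901234
    }
  ]
}
"""


def keypoints_to_pixels(response_text: str, width: int, height: int, min_confidence: float = 0.5) -> dict:
    """Return {pose_id: {label: (px, py)}} for keypoints above the confidence threshold."""
    payload = json.loads(response_text)
    result = {}
    for pose in payload["poses"]:
        points = {}
        for kp in pose["keypoints"]:
            if kp["confidence"] >= min_confidence:
                # Assumption: x and y are normalized to [0, 1]
                points[kp["label"]] = (round(kp["x"] * width), round(kp["y"] * height))
        result[pose["pose_id"]] = points
    return result


pixels = keypoints_to_pixels(SAMPLE_RESPONSE, width=640, height=480)
for label, (px, py) in pixels[1].items():
    print(f"{label:15s} -> ({px}, {py})")
```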
Step 7: Running the Official Test Suite (Optional Extension)
# Python Tests
cd v1
python -m pytest tests/ -x -q
# Rust Tests
cd rust-port/wifi-densepose-rs
cargo test --workspace --no-default-features
# Generate full witness bundle (ADR-028 Standard)
cd ../..
bash scripts/generate-witness-bundle.sh
cd dist/witness-bundle-ADR028-*/
bash VERIFY.sh
Upon success, the bundle will output 7/7 PASS, including Rust test results, Python proof hashes, and firmware SHA-256 files.
Troubleshooting
Q1: ESP32 is not outputting CSI data, it's blank
WiFi CSI requires both the router and ESP32 to support 802.11n. Some routers require manual activation of CSI features. Verify router firmware or use another ESP32 in AP mode as the data source.
Q2: python data/proof/verify.py errors with numpy.linalg.LinAlgError
Usually caused by incompatible NumPy/SciPy versions. Ensure numpy >= 1.24, scipy >= 1.11. Avoid outdated conda mirrors. Use venv for isolation:
pip install "numpy>=1.24" "scipy>=1.11"
python data/proof/verify.py
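Q3: Rust cargo test fails with linker cc not found
No C toolchain is installed. On Debian/Ubuntu (including WSL2), install one with sudo apt install build-essential. On native Windows the equivalent failure reports that link.exe is missing; install Visual Studio Build Tools and select the "Desktop development with C++" workload.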
Q4: Amplitude is all zeros or NaN after phase sanitization
Check the amplitude range in your CSI JSON. If raw data is integer-based (unnormalized), divide by a scaling factor. Refer to the range in v1/data/proof/sample_csi_data.json (1.0~2.0).
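One way to bring raw integer amplitudes into the sample file's range is a linear rescale. The sketch below is only one possible approach, with `rescale_amplitude` as a hypothetical helper. It maps the observed minimum and maximum onto 1.0 and 2.0 and replaces non-finite entries with the lower bound so that NaN does not propagate downstream.

```python
import numpy as np


def rescale_amplitude(raw, low: float = 1.0, high: float = 2.0) -> np.ndarray:
    """Linearly map finite amplitudes onto [low, high]; non-finite values become `low`."""
    raw = np.asarray(raw, dtype=np.float32)
    finite = np.isfinite(raw)
    if not finite.any():
        raise ValueError("no finite amplitude values to rescale")
    lo = raw[finite].min()
    hi = raw[finite].max()
    if hi == lo:
        # Constant input: place everything at the middle of the target range
        return np.full_like(raw, (low + high) / 2)
    scaled = low + (raw - lo) * (high - low) / (hi - lo)
    return np.where(finite, scaled, low).astype(np.float32)


raw_counts = np.array([[0, 50, 100], [25, 75, np.nan]])
print(rescale_amplitude(raw_counts))
```

A fixed scaling factor derived from the hardware's known output range is preferable when it is available, because per-array min-max scaling removes absolute amplitude information.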
Q5: Firmware flashed successfully but WiFi connection fails
Ensure AP isolation (also called client isolation) is disabled on the router, because it blocks traffic between the ESP32 and the collection machine. Also, the ESP32-S3 supports only 2.4 GHz WiFi, so make sure the SSID is not 5 GHz only.
Q6: Pose keypoints are jittery with high signal noise
This is usually caused by multipath interference. Solutions: Stagger ESP32 and router antennas at 45° angles, or increase RuvSense multipath suppression parameters (SVD denoising in field_model.rs).
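For intuition about what SVD denoising does, the sketch below applies a truncated SVD to a synthetic frames-by-subcarriers amplitude matrix. It is a generic illustration with assumed signal and noise parameters, not the implementation in field_model.rs.

```python
import numpy as np


def svd_denoise(matrix: np.ndarray, rank: int) -> np.ndarray:
    """Keep only the `rank` largest singular components of the matrix."""
    U, s, Vt = np.linalg.svd(matrix, full_matrices=False)
    return (U[:, :rank] * s[:rank]) @ Vt[:rank]


rng = np.random.default_rng(0)
frames, subcarriers = 200, 56

# Synthetic rank-1 signal: one temporal pattern shared by all subcarriers
temporal = np.sin(np.linspace(0, 4 * np.pi, frames))
spectral = np.linspace(1.0, 2.0, subcarriers)
clean = np.outer(temporal, spectral)

noisy = clean + rng.normal(0, 0.2, size=clean.shape)
denoised = svd_denoise(noisy, rank=1)

print(f"error before: {np.linalg.norm(noisy - clean):.2f}")
print(f"error after:  {np.linalg.norm(denoised - clean):.2f}")
```

Choosing the rank is the main tuning decision: too low discards real motion components, too high lets noise back in.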
Extended Reading / Advanced Directions
1. RuvSense Multi-view Attention Fusion
One of RuView's highlights is the RuvSense module. It fuses CSI viewpoints from multiple ESP32 nodes via the CrossViewpointAttention mechanism in attention.rs. GeometricBias weights contributions from different nodes, while Cramer-Rao Bound analysis indicates if your deployment reaches theoretical accuracy limits.
2. Deep Dive into ADRs
The project includes 43 ADRs. Recommended starting points:
- ADR-014: SOTA Signal Processing Selection
- ADR-024: Contrastive CSI Embeddings (AETHER re-ID base)
- ADR-028: ESP32 Capability Audit (Hardware selection guide)
3. Train Your Own Models with RuVector
The wifi-densepose-train crate integrates the RuVector v2.0.4 pipeline. You can fine-tune ONNX models using your own indoor CSI datasets (requires multi-view collection and manual keypoint annotation). ADR-016 documents the full process.
4. Build a RuvSense Multi-node Mesh Network
ADR-029 and ADR-032 describe multi-node sensing. Combining multiple ESP32-S3 + ESP32-C6 units into a mesh with RuvSense's MultistaticArray logic enables cross-room pose tracking.
5. ESP32 CSI Firmware Internals
To understand where CSI data originates, read firmware/esp32-csi-node/main/wifi_csi.c. The core is the ESP-IDF wifi_csi_cb_t callback, triggered for every valid WiFi frame. The TDM (Time Division Multiplexing) protocol implemented in hardware/src/esp32/tdm.rs ensures nodes do not interfere with each other.
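The core idea of time division multiplexing can be sketched independently of the firmware: time is cut into fixed slots, and each node transmits only in its own slot of a repeating cycle. The node count, slot length, and function names below are hypothetical and do not reflect the protocol in tdm.rs.

```python
def tdm_slot_owner(time_us: int, num_nodes: int, slot_us: int) -> int:
    """Index of the node allowed to transmit at the given time."""
    return (time_us // slot_us) % num_nodes


def next_slot_start(node_id: int, now_us: int, num_nodes: int, slot_us: int) -> int:
    """Start time of the node's current slot if it is active, else of its next slot."""
    cycle_us = num_nodes * slot_us
    cycle_start = (now_us // cycle_us) * cycle_us
    start = cycle_start + node_id * slot_us
    if start + slot_us <= now_us:
        # This cycle's slot has already ended, so wait for the next cycle
        start += cycle_us
    return start


NUM_NODES = 4    # assumed mesh size
SLOT_US = 5_000  # assumed slot length: 5 ms

for t in (0, 5_000, 12_345, 19_999, 20_000):
    print(f"t={t:>6} us -> node {tdm_slot_owner(t, NUM_NODES, SLOT_US)} transmits")
print("node 0 next slot after t=6000:", next_slot_start(0, 6_000, NUM_NODES, SLOT_US))
```

This only works if the nodes share a common clock, so a real protocol also needs a synchronization mechanism and guard intervals between slots.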