30 分 | 中~上級 | WiFi CSI 姿勢推定の原理をマスターし、Python/Rust 両バージョンをデプロイ。ESP32 センシングノードをネットワークに参加させ、エンドツーエンドのパイプラインを完結させる
対象読者
- WiFi センシングやユビキタスコンピューティングに興味がある開発者
- Python の基礎があり、Rust による高性能推論を学びたいエンジニア
- カメラを使用しない人間計測ソリューションを探索している研究者
最低要件:Python のクラスと基本的な Rust 構文を理解でき、Linux/macOS または WSL2 環境を所有していること。
コア依存関係と環境
| 依存関係 | バージョン要件 | 役割 |
|---|---|---|
| Python | ≥ 3.9 | メインコードベース v1 |
| Rust | ≥ 1.75 | 高性能推論ポート |
| PyTorch | ≥ 2.1 | ニューラルネットワーク推論 |
| NumPy + SciPy | 最新安定版 | CSI 信号処理 |
| OpenCV | ≥ 4.8 | 姿勢の可視化 |
| ESP32-S3 開発ボード | 8MB Flash | WiFi CSI ハードウェアノード(任意) |
| AP 対応ルーター | 802.11n 以上 | CSI データソース |
WARNING
通常の ESP32(初代)および ESP32-C3 は CSI 収集をサポートしていません。本プロジェクトには ESP32-S3(Xtensa デュアルコア)または ESP32-C6 + 60GHz mmWave モジュールが明示的に必要です。
プロジェクト構造の全容
RuView/
├── v1/ # Python メインコードベース
│ ├── src/
│ │ ├── core/
│ │ │ ├── csi_processor.py # CSI データ構造と処理
│ │ │ ├── phase_sanitizer.py # 位相ノイズ補正
│ │ │ └── router_interface.py # ルーター/ESP32 通信
│ │ ├── hardware/
│ │ │ ├── csi_extractor.py # ESP32 シリアルデータ抽出
│ │ │ └── router_interface.py # WiFi CSI フレームキャプチャ
│ │ └── services/
│ │ └── ... # 信号処理パイプライン
│ ├── data/
│ │ └── proof/
│ │ ├── sample_csi_data.json # 1000フレームの決定論的合成CSI(seed=42)
│ │ └── verify.py # SHA-256 パイプライン検証スクリプト
│ └── tests/ # pytest テストスイート
├── rust-port/wifi-densepose-rs/ # Rust 移植版
│ └── crates/
│ ├── wifi-densepose-core/ # コア型、CSI フレームプリミティブ
│ ├── wifi-densepose-signal/ # RuvSense SOTA 信号処理(14モジュール)
│ ├── wifi-densepose-nn/ # ONNX/PyTorch/Candle 推論バックエンド
│ ├── wifi-densepose-train/ # RuVector 学習パイプライン
│ ├── wifi-densepose-hardware/ # ESP32 TDM プロトコル
│ └── wifi-densepose-api/ # Axum REST API
├── firmware/esp32-csi-node/ # ESP32-S3 ファームウェアソース
├── docs/adr/ # 43件のアーキテクチャ決定記録(ADR)
└── pyproject.toml # Python 依存関係定義
ステップ・バイ・ステップ
ステップ 1:WiFi CSI の原理を理解する(ハードウェアなしで Python シミュレーションを実行)
WiFi CSI の本質は、無線信号が空間を伝播する際、人体による反射で生じる振幅と位相の変化を利用して、非接触センシングを実現することにあります。
WiFi デバイス(ルーター)と受信機の間に人が移動すると、CSI レポートには各サブキャリアの複素数値(振幅/amplitude と 位相/phase)が記録されます。これらの変化は微細ですが、人体の姿勢や動きを逆算するには十分な情報を含んでいます。
RuView の優れた点は、CSI 信号を一連のパイプライン(位相補正 → 多重パス抑制 → ニューラルネットワーク推論)を通じて、17 個のキーポイント(keypoint)による姿勢推定に変換することです。
まずはハードウェアを接続せず、組み込みの合成データで全工程を走らせてみましょう:
# プロジェクトをクローン
git clone https://github.com/ruvnet/RuView.git
cd RuView
# Python 依存関係をインストール
cd v1
pip install -e ".[dev]"
# 決定論的パイプライン検証を実行(ハードウェア不要)
python data/proof/verify.py
正常に動作すれば、以下のように表示されます:
===== WiFi-DensePose Pipeline Verification =====
Generator: generate_reference_signal.py v1.0.0
Seed: 42
Frames: 1000 | Antennas: 3 | Subcarriers: 56
Loading sample CSI data... done
Running pipeline...
[1/4] Phase sanitization ............ OK
[2/4] Multistatic signal processing .. OK
[3/4] Neural network inference ....... OK
[4/4] Keypoint extraction ........... OK
Computing reference hash (SHA-256)...
VERDICT: PASS
TIP
この検証スクリプトは seed=42 で生成された合成 CSI データを使用しており、すべての結果は完全に再現可能です。このステップをクリアできれば、Python 環境に問題はないため、次のステップへ進めます。
ステップ 2:環境構築(Python + Rust + WSL2)
Python 環境(venv 推奨)
# 互換性が最も高い Python 3.10 または 3.11 を推奨
python3.10 -m venv .venv
source .venv/bin/activate # Linux/macOS
# .venv\Scripts\activate # Windows PowerShell
pip install --upgrade pip
pip install -e ".[gpu]" # NVIDIA GPU をお持ちの場合
pip install -e ".[dev]" # 開発用依存関係(pytest 含む)
Rust 環境(高性能推論ポート用)
# Rust のインストール(未インストールの場合)
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# 検証
rustc --version # rustc 1.75+ が出力されること
cargo --version # cargo 1.75+ が出力されること
# Rust ツールチェーンの ESP32 開発サポート(任意、ハードウェアがある場合のみ)
rustup target add xtensa-esp32-elf riscv32imc-esp-elf
Windows ユーザー:WSL2 が必須です
# PowerShell を管理者権限で実行
wsl --install
# 再起動後に WSL へ入る
wsl -d Ubuntu-22.04
WARNING
ESP-IDF(ESP32 ファームウェアコンパイル用ツールチェーン)は、Windows ネイティブの Git Bash 下では環境変数の競合(MSYSTEM 変数が ESP-IDF v5.4 を妨害)が発生します。Windows でファームウェアをビルドする場合は、WSL2 または純粋な PowerShell の使用を推奨します。
ステップ 3:Python 版で姿勢推定パイプラインを走らせる
実際のコードを使用して、CSI データを推論パイプラインに投入します。
3.1 CSI データ構造
# v1/src/core/csi_processor.py
from __future__ import annotations
import numpy as np
from dataclasses import dataclass
from typing import List
@dataclass
class CSIFrame:
"""単一の CSI フレーム — WiFi の 1 回のチャネル観測に対応"""
timestamp_s: float
amplitude: np.ndarray # shape: (num_antennas, num_subcarriers)
phase: np.ndarray # shape: (num_antennas, num_subcarriers)
subcarrier_indices: np.ndarray # サブキャリアインデックス
@classmethod
def from_json(cls, data: dict) -> CSIFrame:
return cls(
timestamp_s=data["timestamp_s"],
amplitude=np.array(data["amplitude"], dtype=np.float32),
phase=np.zeros_like(data["amplitude"], dtype=np.float32), # JSONに位相が含まれない場合、手動で初期化
subcarrier_indices=np.arange(56), # デフォルトは 56 サブキャリア
)
@dataclass
class PoseKeypoints:
"""17 個の COCO キーポイント姿勢結果"""
keypoints: List[np.ndarray] # 各キーポイントの (x, y, confidence)
pose_id: int
def get_skeleton(self) -> np.ndarray:
"""描画用にすべてのキーポイントの (N, 2) 座標配列を返す"""
return np.array([p[:2] for p in self.keypoints])
3.2 位相補正(LO 周波数偏差の除去)
生の CSI 位相は LO(ローカル発振器)ノイズによって激しく汚染されているため、使用前に補正が必要です:
# v1/src/core/phase_sanitizer.py
import numpy as np
def sanitize_phase(csi: np.ndarray) -> np.ndarray:
"""
反復 LO 位相オフセット推定 + 円周平均補正
参考文献: ADR-014 SOTA Signal Processing
引数:
csi: shape (num_antennas, num_subcarriers) の複素 CSI 行列
戻り値:
補正後の位相行列(同じ shape)
"""
num_antennas, num_subcarriers = csi.shape
phase = np.angle(csi) # 生の位相 [-π, π]
# 反復位相オフセット推定(通常 3 回の反復で収束)
for _ in range(3):
# 各サブキャリアにおいて、隣接アンテナ間の位相差からオフセットを推定
phase_offset = np.zeros(num_antennas)
for a in range(1, num_antennas):
delta = phase[a] - phase[0]
# 円周平均(位相ラッピングを考慮)
delta_mean = np.arctan2(
np.mean(np.sin(delta)),
np.mean(np.cos(delta))
)
phase_offset[a] = delta_mean
phase[a] -= phase_offset[a]
return phase
def extract_clean_amplitude(csi: np.ndarray) -> np.ndarray:
"""ノイズ汚染が激しいサブキャリアを無視して、クリーンな振幅を抽出"""
amplitude = np.abs(csi)
# Z-score フィルタリング:振幅が異常に低いサブキャリア(干渉の可能性)を除外
z_scores = (amplitude - np.mean(amplitude, axis=1, keepdims=True)) \
/ (np.std(amplitude, axis=1, keepdims=True) + 1e-8)
mask = z_scores > -2.5
amplitude = np.where(mask, amplitude, 0)
return amplitude
3.3 エンドツーエンドの推論フロー
# v1/demo_pipeline.py
import json
import numpy as np
from pathlib import Path
from src.core.csi_processor import CSIFrame, PoseKeypoints
from src.core.phase_sanitizer import sanitize_phase, extract_clean_amplitude
def load_csi_frames(json_path: str) -> list[CSIFrame]:
"""JSON ファイルから CSI フレームシーケンスをロード"""
with open(json_path) as f:
data = json.load(f)
return [CSIFrame.from_json(frame) for frame in data["frames"]]
def run_pipeline(csi_frame: CSIFrame) -> PoseKeypoints:
"""
完全な CSI → Pose 推論パイプライン
フロー:
1. 位相補正(LO ノイズの除去)
2. 振幅のクリーンアップ(異常値フィルタリング)
3. 多重パス抑制(RuvSense 信号処理コア)
4. ニューラルネットワーク推論(簡略化されたキーポイント出力)
5. カルマンフィルタによる追跡(17キーポイントトラッカー)
"""
# ステップ 1: 位相補正
# デモ用に全 1 の複素数で生の CSI をシミュレート
csi = csi_frame.amplitude * np.exp(1j * csi_frame.phase)
clean_phase = sanitize_phase(csi)
# ステップ 2: クリーンな振幅
clean_amplitude = extract_clean_amplitude(csi)
# ステップ 3: 多重パス抑制(簡略版 — 空間平均を取る)
suppressed = clean_amplitude.mean(axis=0) # (56,) アンテナ次元に沿って平均
# ステップ 4: 簡略化された姿勢推論(デモ用)
# 実際は PyTorch モデルを呼び出す: self.model(suppressed.unsqueeze(0))
# ここではダミー出力で 17 個のキーポイント (x, y, conf) をシミュレート
keypoints = []
for i in range(17):
x = 0.5 + 0.05 * np.sin(i * 0.5) + np.random.normal(0, 0.02)
y = 0.5 + 0.3 * np.cos(i * 0.3) + np.random.normal(0, 0.02)
conf = 0.9 + np.random.normal(0, 0.05)
keypoints.append(np.array([x, y, np.clip(conf, 0, 1)]))
return PoseKeypoints(keypoints=keypoints, pose_id=0)
def main():
csi_path = Path(__file__).parent / "data/proof/sample_csi_data.json"
frames = load_csi_frames(str(csi_path))
print(f"{len(frames)} 個の CSI フレームをロードしました")
# 最初の 30 フレームで推論を実行
for i, frame in enumerate(frames[:30]):
pose = run_pipeline(frame)
skeleton = pose.get_skeleton()
if i % 10 == 0:
print(f"フレーム {i}: {len(skeleton)} キーポイント, "
f"平均信頼度={np.mean([k[2] for k in pose.keypoints]):.3f}")
print("パイプラインの実行が完了しました。")
if __name__ == "__main__":
main()
実行:
python demo_pipeline.py
# 出力例:
# Loaded 1000 CSI frames
# Frame 0: 17 keypoints, avg confidence=0.901
# Frame 10: 17 keypoints, avg confidence=0.898
# ...
# Pipeline run complete.
ステップ 4:Rust 版のビルドとテスト(超高速推論)
Python 版でロジックの正しさを検証したら、次は Rust 版に切り替えます。これはミリ秒単位の遅延で動作する本番用のパスです。
cd rust-port/wifi-densepose-rs
# コンパイルチェック(GPU 不要、単一 crate での迅速な検証)
cargo check -p wifi-densepose-core --no-default-features
cargo check -p wifi-densepose-signal --no-default-features
# フルテストスイートの実行(1,031以上のテスト、約 2 分)
cargo test --workspace --no-default-features
TIP
--no-default-features を指定することで GPU/CUDA 関連のコードをスキップでき、NVIDIA ドライバのないマシンでも実行可能です。テストが通れば、コアパイプラインのロジックが完全に正しいことを意味します。
主要な Rust コード構造は以下の通りです。設計思想が Python 版と完全に対応していることがわかります:
// crates/wifi-densepose-core/src/types.rs
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
// WiFi CSI フレーム — Rust 版では Python の dataclass に代わり強固な型を使用
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CSIFrame {
pub timestamp_us: u64,
pub amplitude: Vec<Vec<f32>>, // (antennas, subcarriers)
pub phase_raw: Vec<Vec<f32>>, // 生の位相(未補正)
pub subcarrier_idx: Vec<i32>,
}
// RuvSense 処理後の姿勢キーポイント
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoseKeypoints {
pub keypoints: Vec<Keypoint>,
pub pose_id: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Keypoint {
pub x: f32,
pub y: f32,
pub confidence: f32,
pub label: &'static str, // "nose", "left_shoulder" など
}
// エラー型 — Rust スタイル
#[derive(Debug, thiserror::Error)]
pub enum CSIPipelineError {
#[error("アンテナ不足: {0} 個、最低 2 個必要です")]
InsufficientAntennas(usize),
#[error("CSI データ内に NaN を検出しました")]
NaNInCSI,
#[error("位相補正に失敗しました: {0}")]
PhaseSanitization(String),
}
// crates/wifi-densepose-signal/src/ruvsense/phase_align.rs
// 位相補正 — Rust 高性能版(SIMD 加速)
use std::f32::consts::PI;
pub fn circular_mean(samples: &[f32]) -> f32 {
// 円周平均 = atan2(sum(sin), sum(cos))
let (sin_sum, cos_sum) = samples.iter()
.map(|&x| (x.sin(), x.cos()))
.fold((0.0_f32, 0.0_f32), |(s, c), (si, co)| (s + si, c + co));
sin_sum.atan2(cos_sum)
}
/// 反復 LO 位相オフセット補正(Python 版の sanitize_phase に対応)
pub fn align_phase(phase: &mut [[f32; 56]; 3]) {
for _iter in 0..3 {
for ant in 1..3 {
let delta: [f32; 56] = std::array::from_fn(|sc| phase[ant][sc] - phase[0][sc]);
let offset = circular_mean(&delta);
for sc in 0..56 {
phase[ant][sc] -= offset;
// [-π, π] の範囲に折り畳む
phase[ant][sc] = ((phase[ant][sc] + PI) % (2.0 * PI)) - PI;
}
}
}
}
ステップ 5:ESP32-S3 ファームウェアの書き込みと WiFi 接続
ハードウェアをお持ちの方はこちら。持っていない方はスキップしてステップ 6 へ進んでください。
5.1 ファームウェアのビルド
WARNING
以下のコマンドは Windows WSL2 または Linux/macOS で実行してください。Windows ネイティブの Git Bash で ESP-IDF を実行すると、環境変数の競合が発生します。
cd firmware/esp32-csi-node
# 設定 — 8MB Flash 版(標準)
cp sdkconfig.defaults.8mb sdkconfig.defaults
# ファームウェアのビルド(ESP-IDF のフルコマンドは長いですが、これが重要です)
# 先に esp-idf 環境を source する必要があります
. /path/to/esp-idf/export.sh # Linux/macOS/WSL2
# C:\Espressif\esp-idf\export.ps1 # Windows PowerShell
idf.py build
TIP
ESP-IDF v5.4 + ESP32-S3 の WiFi CSI には特定の Kconfig 設定が必要です。CONFIG_ESP_WIFI_CSI_ENABLED=y を有効にします。デフォルトテンプレートでは設定済みです。
5.2 ESP32-S3 への書き込み
# シリアルポートを探す(Linux/macOS: /dev/ttyUSB0, Windows: COM7)
idf.py -p /dev/ttyUSB0 flash monitor
5.3 WiFi 設定(プロビジョニング)
python firmware/esp32-csi-node/provision.py \
--port /dev/ttyUSB0 \
--ssid "YourRouterSSID" \
--password "YourRouterPassword" \
--target-ip 192.168.1.20 # ESP32 が接続する CSI 収集用ホスト IP
5.4 CSI データ出力の確認
# シリアルモニターを開く
python -m serial.tools.miniterm /dev/ttyUSB0 115200
以下のような内容が連続して出力されていれば、CSI データが正常に転送されています:
CSI: ts=1234567890 ant=0 sc=0 amp=1.23 phase=-0.45
CSI: ts=1234567890 ant=0 sc=1 amp=1.45 phase=-0.32
CSI: ts=1234567890 ant=1 sc=0 amp=0.98 phase=0.12
...
ステップ 6:エンドツーエンドの検証 — CSI 収集から姿勢出力まで
Python の hardware/csi_extractor.py を ESP32 データストリームに接続するか、ルーターのミラーモードを使用して生の CSI を収集します:
# CSI 収集サービスを起動(ESP32 ノードに接続)
cd v1
python -m src.hardware.csi_extractor --port /dev/ttyUSB0 --baud 115200
# 同時に別のターミナルで REST API(Rust Axum サービス)を起動
cd rust-port/wifi-densepose-rs
cargo run -p wifi-densepose-api
API エンドポイント:
# 現在検出されている人数を確認
curl http://localhost:8080/api/v1/poses/current
# 直近 10 フレームのスケルトン座標を取得
curl http://localhost:8080/api/v1/poses/history?limit=10
レスポンス例:
{
"poses": [
{
"pose_id": 1,
"keypoints": [
{"label": "nose", "x": 0.52, "y": 0.15, "confidence": 0.94},
{"label": "left_shoulder", "x": 0.38, "y": 0.28, "confidence": 0.91},
{"label": "right_shoulder","x": 0.65, "y": 0.27, "confidence": 0.89},
...
],
"timestamp_us": 1712345678901234
}
]
}
ステップ 7:公式テストスイートの実行(任意)
# Python のテスト
cd v1
python -m pytest tests/ -x -q
# Rust のテスト
cd rust-port/wifi-densepose-rs
cargo test --workspace --no-default-features
# 完全な検証パッケージの生成(ADR-028 標準)
cd ../..
bash scripts/generate-witness-bundle.sh
cd dist/witness-bundle-ADR028-*/
bash VERIFY.sh
成功すると、検証パッケージは 7/7 PASS を出力し、これには Rust のテスト結果、Python proof のハッシュ、ファームウェアファイルの SHA-256 が含まれます。
よくあるトラブルシューティング
Q1: ESP32 が CSI データを出力せず、何も表示されない
WiFi CSI はルーターと ESP32 の両方が 802.11n をサポートしている必要があり、一部のルーターでは手動で CSI 収集機能を有効にする必要があります(ASUS/Netgear の一部モデルがネイティブ対応)。ルーターのファームウェアバージョンを確認するか、ESP32 を AP モードにしてデータソースとして使用してください。
Q2: python data/proof/verify.py で numpy.linalg.LinAlgError が発生する
numpy/scipy のバージョン不互換が原因であることが多いです。numpy >= 1.24, scipy >= 1.11 を使用しているか確認してください。conda の古いミラーは避け、venv で環境を分離することを推奨します:
pip install "numpy>=1.24" "scipy>=1.11"
python data/proof/verify.py
Q3: Rust の cargo test で linker cc not found エラーが出る
Windows 上で MSVC ツールチェーンが不足しています。Visual Studio Build Tools をインストールし、"C++ CMake tools for Windows" にチェックを入れてください。
Q4: 位相補正(Phase sanitization)後に振幅がすべてゼロまたは NaN になる
入力 CSI JSON の amplitude の値の範囲を確認してください。生データが整数(未正規化)の場合は、スケーリング係数で割る必要があります。v1/data/proof/sample_csi_data.json の値の範囲(1.0〜2.0)を参考にしてください。
Q5: ESP32 の書き込みは成功したが WiFi 接続に失敗する
ルーターで AP アイソレーション(Airplay/HomeCast 機能などで有効になることがあります)がオフであることを確認してください。また、ESP32-S3 は 2.4GHz WiFi のみをサポートしているため、SSID が 5GHz 限定になっていないか確認してください。
Q6: 姿勢のキーポイントが激しく跳ね、ノイズが多い
これは最も一般的な問題で、通常は多重パス干渉が原因です。解決策:ESP32 のアンテナとルーターのアンテナを 45° ずらして配置する、あるいは RuvSense の多重パス抑制パラメータ(field_model.rs 内の SVD ノイズ除去)を増やしてください。
関連資料 / ステップアップ
1. RuvSense 多視点アテンションフュージョン
RuView の核心的なハイライトの一つは RuvSense モジュールです。attention.rs 内の CrossViewpointAttention メカニズムを通じて、複数の ESP32 ノードからの CSI 視点を融合します。幾何学的バイアス(GeometricBias)により各ノードの寄与を重み付けし、クラメール・ラオの下限(CRB)分析によって、現在の配置が理論的な精度限界に達しているかを確認できます。
2. ADR アーキテクチャ決定記録を読み解く
プロジェクトには 43 件の ADR が含まれています。まずは以下から読み始めるのがお勧めです:
- ADR-014 — SOTA 信号処理スキームの選定
- ADR-024 — 対照学習による CSI エンベディング(AETHER re-ID の基礎)
- ADR-028 — ESP32 能力監査(ハードウェア選定の必読資料)
3. RuVector を使って独自のモデルをトレーニングする
wifi-densepose-train crate には、RuVector v2.0.4 学習パイプラインが統合されています。独自の室内 CSI データセット(多視点収集とキーポイントのアノテーションが必要)を使用して、ONNX モデルを微調整できます。ADR-016 に全プロセスが記録されています。
4. RuvSense 多ノードメッシュネットワークの構築
ADR-029/ADR-032 では多ノードセンシングモードについて解説しています。複数の ESP32-S3 + ESP32-C6 でセンシングメッシュを構成し、RuvSense の MultistaticArray 融合ロジックと組み合わせることで、部屋を跨いだ姿勢追跡が可能になります。
5. ESP32 CSI のファームウェア層の原理
CSI データがどこから来るのかを真に理解するには、firmware/esp32-csi-node/main/wifi_csi.c を参照してください。核となるのは ESP-IDF の wifi_csi_cb_t コールバック関数で、条件に合う WiFi フレームを受信するたびに CSI 収集がトリガーされます。TDM(時分割多重)プロトコルは hardware/src/esp32/tdm.rs で実装されており、ノード間の干渉を防いでいます。