This guide walks through adding a new analyzer to the diagnostics pipeline. The pipeline is defined in mod.rs and runs during the streaming analyze() pass: each analyzer declares the ULog topics it needs, receives messages one at a time, and emits structured Diagnostic results at the end.
If you're new to the module, read rc_loss.rs first — it's the shortest complete reference implementation.
Key properties to keep in mind when designing an analyzer:
- Single-pass, streaming. You see each message exactly once, in timestamp order. You don't get a random-access view of the whole log. If your detection needs a window, buffer it yourself inside the analyzer struct.
- One log at a time. There is no batch/training phase. If your detector needs prior data (e.g. a trained model), it has to be baked into the binary as a constant, loaded from disk at startup, or derived from the current log's early samples before you start emitting results.
- Topic-scoped dispatch. Messages are only delivered for topics you list in `required_topics()`. Don't try to filter them yourself in `on_message` — just declare what you need.
- Performance budget. The whole diagnostic pass has a 500ms budget enforced by `cargo bench` in CI. A ~4MB log currently runs in ~37ms end-to-end. Stay cheap per message.
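If your detection needs a sliding window, the "buffer it yourself inside the analyzer struct" advice above usually amounts to a small time-based ring buffer. The sketch below is stand-alone and illustrative — the `Window` type and its names are hypothetical, not part of the pipeline API:

```rust
use std::collections::VecDeque;

/// Hypothetical sliding-window buffer an analyzer might keep as per-flight
/// state. Timestamps are ULog-style microseconds.
struct Window {
    span_us: u64,                  // window length in microseconds
    samples: VecDeque<(u64, f32)>, // (timestamp_us, value)
}

impl Window {
    fn new(span_us: u64) -> Self {
        Self { span_us, samples: VecDeque::new() }
    }

    /// Push one streamed sample and evict anything older than the window.
    fn push(&mut self, ts_us: u64, value: f32) {
        self.samples.push_back((ts_us, value));
        while let Some(&(oldest, _)) = self.samples.front() {
            if ts_us.saturating_sub(oldest) > self.span_us {
                self.samples.pop_front();
            } else {
                break;
            }
        }
    }

    /// Peak absolute value currently inside the window.
    fn peak(&self) -> f32 {
        self.samples.iter().map(|&(_, v)| v.abs()).fold(0.0, f32::max)
    }
}
```

Because eviction happens on every push, memory stays bounded by the window length regardless of log size, which keeps you inside the per-message budget.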
Every analyzer emits typed, structured evidence — not freeform strings or maps. The frontend matches on evidence.type and renders a specific UI for each variant. Changing an existing variant's fields is a breaking schema change, so pick field names carefully the first time.
Edit mod.rs and add your variant:
```rust
pub enum Evidence {
    // ... existing variants ...
    ZAxisVibrationAnomaly {
        score: f32,
        peak_accel_m_s2: f32,
        window_start_us: u64,
        window_end_us: u64,
    },
}
```

Also bump `ANALYSIS_VERSION` in the same file when your analyzer is ready to ship. That tells the reprocessing pipeline that historical logs need a re-scan.
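For reference, if `Evidence` serializes with an internal `type` tag (an assumption — check the serde attributes on the enum in mod.rs), the variant above would reach the frontend as JSON shaped roughly like this (values are placeholders):

```json
{
  "type": "ZAxisVibrationAnomaly",
  "score": 0.87,
  "peak_accel_m_s2": 31.4,
  "window_start_us": 120000000,
  "window_end_us": 121500000
}
```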
Create crates/converter/src/diagnostics/your_analyzer.rs. Minimal skeleton:
```rust
//! Short description of what this analyzer detects and how.
//!
//! Topics consumed, thresholds used, and any known limitations or fixture gaps
//! (use SKIP_FIXTURE: <reason> if no real-world log exists yet).

use super::{parse_field, Analyzer, Diagnostic, Evidence, Severity};
use px4_ulog::stream_parser::model::DataMessage;

const SOME_THRESHOLD: f32 = 2.5;

pub struct YourAnalyzer {
    // Per-flight state. Reset per log.
    detections: Vec<Diagnostic>,
}

impl Default for YourAnalyzer {
    fn default() -> Self { Self::new() }
}

impl YourAnalyzer {
    pub fn new() -> Self {
        Self { detections: Vec::new() }
    }
}

impl Analyzer for YourAnalyzer {
    fn id(&self) -> &str { "your_analyzer" }

    fn description(&self) -> &str { "One-line human description" }

    fn required_topics(&self) -> &[&str] {
        &["sensor_combined", "vehicle_status"]
    }

    fn on_message(&mut self, data: &DataMessage) {
        let topic = data.flattened_format.message_name.as_str();
        let ts = data
            .flattened_format
            .timestamp_field
            .as_ref()
            .map(|tf| tf.parse_timestamp(data.data))
            .unwrap_or(0);
        match topic {
            "sensor_combined" => {
                let Some(az) = parse_field::<f32>(data, "accelerometer_m_s2[2]") else {
                    return;
                };
                // ... update state, maybe push to self.detections ...
                let _ = (ts, az);
            }
            _ => {}
        }
    }

    fn finish(self: Box<Self>) -> Vec<Diagnostic> {
        self.detections
    }
}
```

Notes on the trait methods:
- `id()` is the stable machine identifier stored in the database and exposed via the API's `?diagnostic=` filter. Don't change it after release.
- `required_topics()` must match the exact ULog topic names. Typos mean your analyzer silently never runs.
- `on_message()` must not panic and must handle missing fields gracefully — use `parse_field::<T>()`, which returns `Option<T>`; never unwrap.
- `finish()` takes `Box<Self>` (the pipeline owns the analyzers). Move your accumulated detections out and return them.
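To make the `// ... update state ...` part of the skeleton concrete, here is a stand-alone sketch of one common detection shape: a debounced threshold crossing that emits once per excursion rather than once per sample. `Diagnostic` and `Severity` are stubbed locally so the example compiles on its own — the real types in mod.rs differ:

```rust
// Local stand-ins, purely for illustration; the real Diagnostic and
// Severity live in super:: and have different fields.
#[derive(Debug)]
enum Severity { Warning }

#[derive(Debug)]
struct Diagnostic {
    severity: Severity,
    note: String,
}

const SOME_THRESHOLD: f32 = 2.5;

/// Debounced threshold check: emit one detection per excursion above
/// the threshold, not one per sample while it stays above.
struct Detector {
    above: bool,
    detections: Vec<Diagnostic>,
}

impl Detector {
    fn new() -> Self {
        Self { above: false, detections: Vec::new() }
    }

    fn on_sample(&mut self, ts_us: u64, az: f32) {
        let exceeded = az.abs() > SOME_THRESHOLD;
        if exceeded && !self.above {
            // Rising edge: record one detection for this excursion.
            self.detections.push(Diagnostic {
                severity: Severity::Warning,
                note: format!("z-accel {:.2} m/s^2 at {}us", az, ts_us),
            });
        }
        self.above = exceeded;
    }
}
```

In a real analyzer this state would live in your struct's fields and `on_sample` logic would sit inside the `"sensor_combined"` match arm, with `finish()` returning the accumulated detections.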
In mod.rs, add your analyzer to create_analyzers():
```rust
pub fn create_analyzers() -> Vec<Box<dyn Analyzer>> {
    vec![
        // ... existing ones ...
        Box::new(your_analyzer::YourAnalyzer::new()),
    ]
}
```

And add the `pub mod your_analyzer;` declaration at the top of the file.
Until you do this, nothing in the pipeline will ever construct or call your analyzer. This is the step most first-time contributors miss.
CI runs scripts/ci/check-analyzer.sh on every PR touching this directory. It grep-checks your file for a specific test pattern. At minimum you need:
- `no_false_positives_sample` — runs your analyzer against `tests/fixtures/sample.ulg` (a normal flight) and asserts zero detections.
- A real-world detection test named `detects_real_*` — points at a fixture ULog that actually exhibits the anomaly and asserts the detection fires with the right severity/evidence. If no fixture exists, add `SKIP_FIXTURE: <reason>` to the module doc comment and open an issue to collect one.
- `handles_missing_fields` — feed it a message with no fields and assert it doesn't panic and emits nothing.
- At least one synthetic detection test — uses `MessageBuilder` from `testing.rs` to construct messages by hand. This is where you pin down your detection logic with fast deterministic tests.
- A snapshot test using `insta::assert_json_snapshot!` on the fixture output. Run `cargo insta review` locally to accept the first snapshot.
Copy the test block from rc_loss.rs and adapt it — it hits every required category.
Before opening a PR, run locally:
```sh
# The trait/test/registration checker CI uses
scripts/ci/check-analyzer.sh

# The diagnostic test suite
cargo test -p flight-review --lib diagnostics

# The performance budget
cargo bench -p flight-review --bench convert
```

If check-analyzer.sh complains, it will tell you exactly which of the five criteria you missed. If the bench regresses past the budget, profile your `on_message` — the usual culprit is allocating or parsing the same field multiple times per message.
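On that last point, the fix is usually mechanical: hoist the parse into a local and reuse it. The sketch below stubs `parse_field` as a plain string parse so it runs on its own (the real one takes the `DataMessage` and a field name — this stub is purely for illustration):

```rust
const SOME_THRESHOLD: f32 = 2.5;

// Stand-in for the real super::parse_field::<f32>(data, "field"),
// which parses a field out of a DataMessage.
fn parse_field(raw: &str) -> Option<f32> {
    raw.parse().ok()
}

// Slow shape: parses the same field twice per message.
fn per_message_slow(raw: &str) -> Option<f32> {
    if parse_field(raw)? > SOME_THRESHOLD {
        parse_field(raw) // second parse of the same bytes
    } else {
        None
    }
}

// Cheap shape: parse once, reuse the local.
fn per_message_fast(raw: &str) -> Option<f32> {
    let az = parse_field(raw)?;
    (az > SOME_THRESHOLD).then_some(az)
}
```

At tens of thousands of messages per log, shaving a redundant parse per message is often the difference between staying under the bench budget and regressing it.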
- Defining a new `Analyzer` trait. There's already one in `mod.rs`. Implement it; don't redefine it.
- Putting the file outside `diagnostics/`. It has to live in this directory; otherwise the CI checker and the registration factory won't see it.
- Returning `Option<String>` or a freeform summary. Results must be `Vec<Diagnostic>` with a typed `Evidence` variant.
- Assuming you get the whole log at once. You don't. Design for streaming.
- Pulling in heavy ML dependencies without discussing the perf/memory budget first. The converter is zero-ML today; open an issue before adding something like `extended-isolation-forest`, `smartcore`, etc., so we can agree on how the model is trained, shipped, and benchmarked.
- Skipping the real-world fixture. Synthetic tests alone don't count toward the CI gate. Either ship a fixture or mark `SKIP_FIXTURE` with a reason.
Open a draft PR early and tag @mrpollo. Draft PRs are the right place to get architecture feedback before you go deep on implementation.