Skip to content

Instantly share code, notes, and snippets.

@bagbag
Created April 3, 2025 12:51
Show Gist options
  • Select an option

  • Save bagbag/9306415f6fea269f020ede970a9fecef to your computer and use it in GitHub Desktop.

Select an option

Save bagbag/9306415f6fea269f020ede970a9fecef to your computer and use it in GitHub Desktop.
embassy trace
use core::sync::atomic::{AtomicU32, Ordering};
use defmt::trace;
use embassy_sync::{
blocking_mutex::{CriticalSectionMutex, raw::CriticalSectionRawMutex},
signal::Signal,
};
use embassy_time::{Duration, Instant};
use heapless::Vec;
// type PendingTasksMap = FnvIndexMap<u32, TaskInstant, 16>;
type PendingTasksVec = Vec<TaskInstant, 16>;
const UPDATE_PERIOD_MS: u64 = 1000;
pub struct TaskStats {
pub count: u32,
pub max: Duration,
pub total: Duration,
}
impl TaskStats {
pub fn into_avg(&self) -> Duration {
match self.count > 0 {
true => self.total / self.count,
false => Duration::from_ticks(0), // Avoid division by zero
}
}
}
pub static PENDING_DURATION_SIGNAL: Signal<CriticalSectionRawMutex, TaskStats> = Signal::new();
pub static RUNNING_DURATION_SIGNAL: Signal<CriticalSectionRawMutex, TaskStats> = Signal::new();
static mut PENDING_TASKS: CriticalSectionMutex<PendingTasksVec> = CriticalSectionMutex::new(PendingTasksVec::new());
static mut RUNNING_TASKS: CriticalSectionMutex<PendingTasksVec> = CriticalSectionMutex::new(PendingTasksVec::new());
static PENDING_COUNT: AtomicU32 = AtomicU32::new(0);
static PENDING_MAX_DURATION: AtomicU32 = AtomicU32::new(0);
static PENDING_DURATION_SUM: AtomicU32 = AtomicU32::new(0);
static PENDING_LAST_UPDATE_PERIOD: AtomicU32 = AtomicU32::new(0);
static RUNNING_COUNT: AtomicU32 = AtomicU32::new(0);
static RUNNING_MAX_DURATION: AtomicU32 = AtomicU32::new(0);
static RUNNING_DURATION_SUM: AtomicU32 = AtomicU32::new(0);
static RUNNING_LAST_UPDATE_PERIOD: AtomicU32 = AtomicU32::new(0);
#[derive(Clone, Copy, Debug)]
struct TaskInstant {
task_id: u32,
instant: Instant,
}
#[inline]
#[allow(static_mut_refs)]
fn add_task(store: &mut CriticalSectionMutex<PendingTasksVec>, _executor_id: u32, task_id: u32, instant: Instant) {
let entry = TaskInstant { task_id, instant };
critical_section::with(|_| store.get_mut().push(entry).unwrap());
}
#[inline]
#[allow(static_mut_refs)]
fn get_task(store: &mut CriticalSectionMutex<PendingTasksVec>, _executor_id: u32, task_id: u32) -> Option<TaskInstant> {
critical_section::with(|_| {
let v = store.get_mut();
let index = v.iter().position(|x| x.task_id == task_id);
match index {
Some(index) => Some(v.remove(index)),
None => None,
}
})
}
#[unsafe(no_mangle)]
#[allow(static_mut_refs)]
pub extern "Rust" fn _embassy_trace_task_ready_begin(executor_id: u32, task_id: u32) {
add_task(unsafe { &mut PENDING_TASKS }, executor_id, task_id, Instant::now());
}
#[unsafe(no_mangle)]
#[allow(static_mut_refs)]
pub extern "Rust" fn _embassy_trace_task_exec_begin(executor_id: u32, task_id: u32) {
let pending_task = get_task(unsafe { &mut PENDING_TASKS }, executor_id, task_id);
add_task(unsafe { &mut RUNNING_TASKS }, executor_id, task_id, Instant::now());
if let Some(stats) = aggregate_end_event(
pending_task,
&PENDING_LAST_UPDATE_PERIOD,
&PENDING_COUNT,
&PENDING_DURATION_SUM,
&PENDING_MAX_DURATION,
) {
PENDING_DURATION_SIGNAL.signal(stats);
}
}
#[unsafe(no_mangle)]
pub extern "Rust" fn _embassy_trace_task_new(executor_id: u32, task_id: u32) {
trace!("[TRACE] TaskNew: exec={} task={}", executor_id, task_id);
}
#[unsafe(no_mangle)]
#[allow(static_mut_refs)]
pub extern "Rust" fn _embassy_trace_task_exec_end(executor_id: u32, task_id: u32) {
let running_task = get_task(unsafe { &mut RUNNING_TASKS }, executor_id, task_id);
if let Some(stats) = aggregate_end_event(
running_task,
&RUNNING_LAST_UPDATE_PERIOD,
&RUNNING_COUNT,
&RUNNING_DURATION_SUM,
&RUNNING_MAX_DURATION,
) {
RUNNING_DURATION_SIGNAL.signal(stats);
}
}
#[unsafe(no_mangle)]
pub extern "Rust" fn _embassy_trace_executor_idle(executor_id: u32) {
trace!("[TRACE] ExecutorIdle: exec={}", executor_id);
}
fn aggregate_end_event(
task_instant: Option<TaskInstant>,
period_store: &AtomicU32,
count_store: &AtomicU32,
sum_store: &AtomicU32,
max_store: &AtomicU32,
) -> Option<TaskStats> {
match task_instant {
Some(TaskInstant { task_id: _, instant }) => {
let now = Instant::now();
let delay = now.duration_since(instant).as_micros() as u32;
let current_period = (now.as_millis() / UPDATE_PERIOD_MS) as u32;
let previous_period = period_store.load(Ordering::Relaxed);
if current_period > previous_period {
let mut delay_stats_value: Option<TaskStats> = None;
// Attempt to atomically update the period. Only one task should succeed.
// If successful, this task is responsible for finalizing the *previous* period.
match period_store.compare_exchange(
previous_period, // Expected current value
current_period, // New value if expected matches
Ordering::AcqRel, // Ordering on success
Ordering::Relaxed, // Ordering on failure (value read is potentially stale, but okay)
) {
Ok(_) => {
// --- Success: We are the first task in the new period ---
// Atomically read the totals from the *previous* period and reset/initialize them.
// For MAX: read previous max and simultaneously set the new period's initial max to 'delay'.
let previous_max = max_store.swap(delay, Ordering::AcqRel); // Optimized: Read previous max & set initial max for new period
// For SUM/COUNTER: read previous totals and reset to the current task's values.
let previous_sum = sum_store.swap(delay, Ordering::AcqRel); // Read previous sum & set initial sum for new period
let previous_count = count_store.swap(1, Ordering::AcqRel); // Read previous count & set initial count for new period
// Calculate statistics for the *previous* period
delay_stats_value = Some(TaskStats {
count: previous_count,
max: Duration::from_micros(previous_max as u64),
total: Duration::from_micros(previous_sum as u64),
});
}
Err(_) => {
// --- Failure: Another task already updated the period ---
// Just add the current task's delay to the ongoing statistics for the *current* period.
// These atomics were potentially reset by the winning task moments ago.
max_store.fetch_max(delay, Ordering::AcqRel);
sum_store.fetch_add(delay, Ordering::AcqRel);
count_store.fetch_add(1, Ordering::AcqRel);
}
}
delay_stats_value
} else {
// --- Period has not changed ---
// Just add the current task's delay to the ongoing statistics for the current period.
max_store.fetch_max(delay, Ordering::AcqRel);
sum_store.fetch_add(delay, Ordering::AcqRel);
count_store.fetch_add(1, Ordering::AcqRel);
None
}
}
_ => None,
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment