diff --git a/tools/sched_ext/Makefile b/tools/sched_ext/Makefile index 71b5809243e30..73c43782837d4 100644 --- a/tools/sched_ext/Makefile +++ b/tools/sched_ext/Makefile @@ -85,6 +85,8 @@ CFLAGS += -g -O2 -rdynamic -pthread -Wall -Werror $(GENFLAGS) \ -I$(INCLUDE_DIR) -I$(GENDIR) -I$(LIBDIR) \ -I$(TOOLSINCDIR) -I$(APIDIR) +CARGOFLAGS := --release + # Silence some warnings when compiled with clang ifneq ($(LLVM),) CFLAGS += -Wno-unused-command-line-argument @@ -116,7 +118,7 @@ BPF_CFLAGS = -g -D__TARGET_ARCH_$(SRCARCH) \ -O2 -mcpu=v3 all: scx_example_simple scx_example_qmap scx_example_central scx_example_pair \ - scx_example_flatcg scx_example_userland + scx_example_flatcg scx_example_userland atropos # sort removes libbpf duplicates when not cross-building MAKE_DIRS := $(sort $(BUILD_DIR)/libbpf $(HOST_BUILD_DIR)/libbpf \ @@ -192,13 +194,20 @@ scx_example_userland: scx_example_userland.c scx_example_userland.skel.h \ $(CC) $(CFLAGS) -c $< -o $@.o $(CC) -o $@ $@.o $(HOST_BPFOBJ) $(LDFLAGS) +atropos: export RUSTFLAGS = -C link-args=-lzstd -C link-args=-lz -C link-args=-lelf -L $(BPFOBJ_DIR) +atropos: export ATROPOS_CLANG = $(CLANG) +atropos: export ATROPOS_BPF_CFLAGS = $(BPF_CFLAGS) +atropos: $(INCLUDE_DIR)/vmlinux.h + cargo build --manifest-path=atropos/Cargo.toml $(CARGOFLAGS) + clean: + cargo clean --manifest-path=atropos/Cargo.toml rm -rf $(SCRATCH_DIR) $(HOST_SCRATCH_DIR) rm -f *.o *.bpf.o *.skel.h *.subskel.h rm -f scx_example_simple scx_example_qmap scx_example_central \ scx_example_pair scx_example_flatcg scx_example_userland -.PHONY: all clean +.PHONY: all atropos clean # delete failed targets .DELETE_ON_ERROR: diff --git a/tools/sched_ext/atropos/.gitignore b/tools/sched_ext/atropos/.gitignore new file mode 100644 index 0000000000000..186dba259ec21 --- /dev/null +++ b/tools/sched_ext/atropos/.gitignore @@ -0,0 +1,3 @@ +src/bpf/.output +Cargo.lock +target diff --git a/tools/sched_ext/atropos/Cargo.toml b/tools/sched_ext/atropos/Cargo.toml new file mode 100644 index 0000000000000..7462a836d53dd --- /dev/null +++ b/tools/sched_ext/atropos/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "scx_atropos" +version = "0.5.0" +authors = ["Dan Schatzberg ", "Meta"] +edition = "2021" +description = "Userspace scheduling with BPF" +license = "GPL-2.0-only" + +[dependencies] +anyhow = "1.0.65" +bitvec = { version = "1.0", features = ["serde"] } +clap = { version = "4.1", features = ["derive", "env", "unicode", "wrap_help"] } +ctrlc = { version = "3.1", features = ["termination"] } +fb_procfs = { git = "https://github.com/facebookincubator/below.git", rev = "f305730"} +hex = "0.4.3" +libbpf-rs = "0.19.1" +libbpf-sys = { version = "1.0.4", features = ["novendor", "static"] } +libc = "0.2.137" +log = "0.4.17" +ordered-float = "3.4.0" +simplelog = "0.12.0" + +[build-dependencies] +bindgen = { version = "0.61.0", features = ["logging", "static"], default-features = false } +libbpf-cargo = "0.13.0" + +[features] +enable_backtrace = [] diff --git a/tools/sched_ext/atropos/build.rs b/tools/sched_ext/atropos/build.rs new file mode 100644 index 0000000000000..26e792c5e17e9 --- /dev/null +++ b/tools/sched_ext/atropos/build.rs @@ -0,0 +1,70 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. +extern crate bindgen; + +use std::env; +use std::fs::create_dir_all; +use std::path::Path; +use std::path::PathBuf; + +use libbpf_cargo::SkeletonBuilder; + +const HEADER_PATH: &str = "src/bpf/atropos.h"; + +fn bindgen_atropos() { + // Tell cargo to invalidate the built crate whenever the wrapper changes + println!("cargo:rerun-if-changed={}", HEADER_PATH); + + // The bindgen::Builder is the main entry point + // to bindgen, and lets you build up options for + // the resulting bindings. + let bindings = bindgen::Builder::default() + // The input header we would like to generate + // bindings for. + .header(HEADER_PATH) + // Tell cargo to invalidate the built crate whenever any of the + // included header files changed. + .parse_callbacks(Box::new(bindgen::CargoCallbacks)) + // Finish the builder and generate the bindings. + .generate() + // Unwrap the Result and panic on failure. + .expect("Unable to generate bindings"); + + // Write the bindings to the $OUT_DIR/bindings.rs file. + let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); + bindings + .write_to_file(out_path.join("atropos-sys.rs")) + .expect("Couldn't write bindings!"); +} + +fn gen_bpf_sched(name: &str) { + let bpf_cflags = env::var("ATROPOS_BPF_CFLAGS").unwrap(); + let clang = env::var("ATROPOS_CLANG").unwrap(); + eprintln!("{}", clang); + let outpath = format!("./src/bpf/.output/{}.skel.rs", name); + let skel = Path::new(&outpath); + let src = format!("./src/bpf/{}.bpf.c", name); + SkeletonBuilder::new() + .source(src.clone()) + .clang(clang) + .clang_args(bpf_cflags) + .build_and_generate(&skel) + .unwrap(); + println!("cargo:rerun-if-changed={}", src); +} + +fn main() { + bindgen_atropos(); + // It's unfortunate we cannot use `OUT_DIR` to store the generated skeleton. + // Reasons are because the generated skeleton contains compiler attributes + // that cannot be `include!()`ed via macro. And we cannot use the `#[path = "..."]` + // trick either because you cannot yet `concat!(env!("OUT_DIR"), "/skel.rs")` inside + // the path attribute either (see https://github.com/rust-lang/rust/pull/83366). + // + // However, there is hope! When the above feature stabilizes we can clean this + // all up. + create_dir_all("./src/bpf/.output").unwrap(); + gen_bpf_sched("atropos"); +} diff --git a/tools/sched_ext/atropos/rustfmt.toml b/tools/sched_ext/atropos/rustfmt.toml new file mode 100644 index 0000000000000..b7258ed0a8d84 --- /dev/null +++ b/tools/sched_ext/atropos/rustfmt.toml @@ -0,0 +1,8 @@ +# Get help on options with `rustfmt --help=config` +# Please keep these in alphabetical order. +edition = "2021" +group_imports = "StdExternalCrate" +imports_granularity = "Item" +merge_derives = false +use_field_init_shorthand = true +version = "Two" diff --git a/tools/sched_ext/atropos/src/atropos_sys.rs b/tools/sched_ext/atropos/src/atropos_sys.rs new file mode 100644 index 0000000000000..bbeaf856d40e8 --- /dev/null +++ b/tools/sched_ext/atropos/src/atropos_sys.rs @@ -0,0 +1,10 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(dead_code)] + +include!(concat!(env!("OUT_DIR"), "/atropos-sys.rs")); diff --git a/tools/sched_ext/atropos/src/bpf/atropos.bpf.c b/tools/sched_ext/atropos/src/bpf/atropos.bpf.c new file mode 100644 index 0000000000000..c26ecf0e77a84 --- /dev/null +++ b/tools/sched_ext/atropos/src/bpf/atropos.bpf.c @@ -0,0 +1,751 @@ +/* Copyright (c) Meta Platforms, Inc. and affiliates. */ +/* + * This software may be used and distributed according to the terms of the + * GNU General Public License version 2. + * + * Atropos is a multi-domain BPF / userspace hybrid scheduler where the BPF + * part does simple round robin in each domain and the userspace part + * calculates the load factor of each domain and tells the BPF part how to load + * balance the domains. + * + * Every task has an entry in the task_data map which lists which domain the + * task belongs to. When a task first enters the system (atropos_prep_enable), + * they are round-robined to a domain. + * + * atropos_select_cpu is the primary scheduling logic, invoked when a task + * becomes runnable. The lb_data map is populated by userspace to inform the BPF + * scheduler that a task should be migrated to a new domain. Otherwise, the task + * is scheduled in priority order as follows: + * * The current core if the task was woken up synchronously and there are idle + * cpus in the system + * * The previous core, if idle + * * The pinned-to core if the task is pinned to a specific core + * * Any idle cpu in the domain + * + * If none of the above conditions are met, then the task is enqueued to a + * dispatch queue corresponding to the domain (atropos_enqueue). + * + * atropos_dispatch will attempt to consume a task from its domain's + * corresponding dispatch queue (this occurs after scheduling any tasks directly + * assigned to it due to the logic in atropos_select_cpu). If no task is found, + * then greedy load stealing will attempt to find a task on another dispatch + * queue to run. + * + * Load balancing is almost entirely handled by userspace. BPF populates the + * task weight, dom mask and current dom in the task_data map and executes the + * load balance based on userspace populating the lb_data map. + */ +#include "../../../scx_common.bpf.h" +#include "atropos.h" + +#include +#include +#include +#include +#include +#include + +char _license[] SEC("license") = "GPL"; + +/* + * const volatiles are set during initialization and treated as consts by the + * jit compiler. + */ + +/* + * Domains and cpus + */ +const volatile __u32 nr_doms = 32; /* !0 for veristat, set during init */ +const volatile __u32 nr_cpus = 64; /* !0 for veristat, set during init */ +const volatile __u32 cpu_dom_id_map[MAX_CPUS]; +const volatile __u64 dom_cpumasks[MAX_DOMS][MAX_CPUS / 64]; + +const volatile bool kthreads_local; +const volatile bool fifo_sched; +const volatile bool switch_partial; +const volatile __u32 greedy_threshold; + +/* base slice duration */ +const volatile __u64 slice_us = 20000; + +/* + * Exit info + */ +int exit_type = SCX_EXIT_NONE; +char exit_msg[SCX_EXIT_MSG_LEN]; + +struct pcpu_ctx { + __u32 dom_rr_cur; /* used when scanning other doms */ + + /* libbpf-rs does not respect the alignment, so pad out the struct explicitly */ + __u8 _padding[CACHELINE_SIZE - sizeof(u64)]; +} __attribute__((aligned(CACHELINE_SIZE))); + +struct pcpu_ctx pcpu_ctx[MAX_CPUS]; + +/* + * Domain context + */ +struct dom_ctx { + struct bpf_cpumask __kptr *cpumask; + u64 vtime_now; +}; + +struct { + __uint(type, BPF_MAP_TYPE_ARRAY); + __type(key, u32); + __type(value, struct dom_ctx); + __uint(max_entries, MAX_DOMS); + __uint(map_flags, 0); +} dom_ctx SEC(".maps"); + +/* + * Statistics + */ +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __uint(key_size, sizeof(u32)); + __uint(value_size, sizeof(u64)); + __uint(max_entries, ATROPOS_NR_STATS); +} stats SEC(".maps"); + +static inline void stat_add(enum stat_idx idx, u64 addend) +{ + u32 idx_v = idx; + + u64 *cnt_p = bpf_map_lookup_elem(&stats, &idx_v); + if (cnt_p) + (*cnt_p) += addend; +} + +/* Map pid -> task_ctx */ +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __type(key, pid_t); + __type(value, struct task_ctx); + __uint(max_entries, 1000000); + __uint(map_flags, 0); +} task_data SEC(".maps"); + +/* + * This is populated from userspace to indicate which pids should be reassigned + * to new doms. + */ +struct { + __uint(type, BPF_MAP_TYPE_HASH); + __type(key, pid_t); + __type(value, u32); + __uint(max_entries, 1000); + __uint(map_flags, 0); +} lb_data SEC(".maps"); + +static inline bool vtime_before(u64 a, u64 b) +{ + return (s64)(a - b) < 0; +} + +static bool task_set_dsq(struct task_ctx *task_ctx, struct task_struct *p, + u32 new_dom_id) +{ + struct dom_ctx *old_domc, *new_domc; + struct bpf_cpumask *d_cpumask, *t_cpumask; + u32 old_dom_id = task_ctx->dom_id; + s64 vtime_delta; + + old_domc = bpf_map_lookup_elem(&dom_ctx, &old_dom_id); + if (!old_domc) { + scx_bpf_error("No dom%u", old_dom_id); + return false; + } + + vtime_delta = p->scx.dsq_vtime - old_domc->vtime_now; + + new_domc = bpf_map_lookup_elem(&dom_ctx, &new_dom_id); + if (!new_domc) { + scx_bpf_error("No dom%u", new_dom_id); + return false; + } + + d_cpumask = bpf_cpumask_kptr_get(&new_domc->cpumask); + if (!d_cpumask) { + scx_bpf_error("Failed to get domain %u cpumask kptr", + new_dom_id); + return false; + } + + t_cpumask = bpf_cpumask_kptr_get(&task_ctx->cpumask); + if (!t_cpumask) { + scx_bpf_error("Failed to look up task cpumask"); + bpf_cpumask_release(d_cpumask); + return false; + } + + /* + * set_cpumask might have happened between userspace requesting LB and + * here and @p might not be able to run in @dom_id anymore. Verify. + */ + if (bpf_cpumask_intersects((const struct cpumask *)d_cpumask, + p->cpus_ptr)) { + p->scx.dsq_vtime = new_domc->vtime_now + vtime_delta; + task_ctx->dom_id = new_dom_id; + bpf_cpumask_and(t_cpumask, (const struct cpumask *)d_cpumask, + p->cpus_ptr); + } + + bpf_cpumask_release(d_cpumask); + bpf_cpumask_release(t_cpumask); + + return task_ctx->dom_id == new_dom_id; +} + +s32 BPF_STRUCT_OPS(atropos_select_cpu, struct task_struct *p, int prev_cpu, + u32 wake_flags) +{ + s32 cpu; + pid_t pid = p->pid; + struct task_ctx *task_ctx = bpf_map_lookup_elem(&task_data, &pid); + struct bpf_cpumask *p_cpumask; + + if (!task_ctx) + return -ENOENT; + + if (kthreads_local && + (p->flags & PF_KTHREAD) && p->nr_cpus_allowed == 1) { + cpu = prev_cpu; + stat_add(ATROPOS_STAT_DIRECT_DISPATCH, 1); + goto local; + } + + /* + * If WAKE_SYNC and the machine isn't fully saturated, wake up @p to the + * local dsq of the waker. + */ + if (p->nr_cpus_allowed > 1 && (wake_flags & SCX_WAKE_SYNC)) { + struct task_struct *current = (void *)bpf_get_current_task(); + + if (!(BPF_CORE_READ(current, flags) & PF_EXITING) && + task_ctx->dom_id < MAX_DOMS) { + struct dom_ctx *domc; + struct bpf_cpumask *d_cpumask; + const struct cpumask *idle_cpumask; + bool has_idle; + + domc = bpf_map_lookup_elem(&dom_ctx, &task_ctx->dom_id); + if (!domc) { + scx_bpf_error("Failed to find dom%u", + task_ctx->dom_id); + return prev_cpu; + } + d_cpumask = bpf_cpumask_kptr_get(&domc->cpumask); + if (!d_cpumask) { + scx_bpf_error("Failed to acquire domain %u cpumask kptr", + task_ctx->dom_id); + return prev_cpu; + } + + idle_cpumask = scx_bpf_get_idle_cpumask(); + + has_idle = bpf_cpumask_intersects((const struct cpumask *)d_cpumask, + idle_cpumask); + + bpf_cpumask_release(d_cpumask); + scx_bpf_put_idle_cpumask(idle_cpumask); + + if (has_idle) { + cpu = bpf_get_smp_processor_id(); + if (bpf_cpumask_test_cpu(cpu, p->cpus_ptr)) { + stat_add(ATROPOS_STAT_WAKE_SYNC, 1); + goto local; + } + } + } + } + + /* if the previous CPU is idle, dispatch directly to it */ + if (scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + stat_add(ATROPOS_STAT_PREV_IDLE, 1); + cpu = prev_cpu; + goto local; + } + + /* If only one core is allowed, dispatch */ + if (p->nr_cpus_allowed == 1) { + stat_add(ATROPOS_STAT_PINNED, 1); + cpu = prev_cpu; + goto local; + } + + p_cpumask = bpf_cpumask_kptr_get(&task_ctx->cpumask); + if (!p_cpumask) + return -ENOENT; + + /* If there is an eligible idle CPU, dispatch directly */ + cpu = scx_bpf_pick_idle_cpu((const struct cpumask *)p_cpumask); + if (cpu >= 0) { + bpf_cpumask_release(p_cpumask); + stat_add(ATROPOS_STAT_DIRECT_DISPATCH, 1); + goto local; + } + + /* + * @prev_cpu may be in a different domain. Returning an out-of-domain + * CPU can lead to stalls as all in-domain CPUs may be idle by the time + * @p gets enqueued. + */ + if (bpf_cpumask_test_cpu(prev_cpu, (const struct cpumask *)p_cpumask)) + cpu = prev_cpu; + else + cpu = bpf_cpumask_any((const struct cpumask *)p_cpumask); + + bpf_cpumask_release(p_cpumask); + return cpu; + +local: + task_ctx->dispatch_local = true; + return cpu; +} + +void BPF_STRUCT_OPS(atropos_enqueue, struct task_struct *p, u32 enq_flags) +{ + pid_t pid = p->pid; + struct task_ctx *task_ctx = bpf_map_lookup_elem(&task_data, &pid); + u32 *new_dom; + + if (!task_ctx) { + scx_bpf_error("No task_ctx[%d]", pid); + return; + } + + new_dom = bpf_map_lookup_elem(&lb_data, &pid); + if (new_dom && *new_dom != task_ctx->dom_id && + task_set_dsq(task_ctx, p, *new_dom)) { + struct bpf_cpumask *p_cpumask; + s32 cpu; + + stat_add(ATROPOS_STAT_LOAD_BALANCE, 1); + + /* + * If dispatch_local is set, We own @p's idle state but we are + * not gonna put the task in the associated local dsq which can + * cause the CPU to stall. Kick it. + */ + if (task_ctx->dispatch_local) { + task_ctx->dispatch_local = false; + scx_bpf_kick_cpu(scx_bpf_task_cpu(p), 0); + } + + p_cpumask = bpf_cpumask_kptr_get(&task_ctx->cpumask); + if (!p_cpumask) { + scx_bpf_error("Failed to get task_ctx->cpumask"); + return; + } + cpu = scx_bpf_pick_idle_cpu((const struct cpumask *)p_cpumask); + bpf_cpumask_release(p_cpumask); + + if (cpu >= 0) + scx_bpf_kick_cpu(cpu, 0); + } + + if (task_ctx->dispatch_local) { + task_ctx->dispatch_local = false; + scx_bpf_dispatch(p, SCX_DSQ_LOCAL, slice_us * 1000, enq_flags); + return; + } + + if (fifo_sched) { + scx_bpf_dispatch(p, task_ctx->dom_id, slice_us * 1000, + enq_flags); + } else { + u64 vtime = p->scx.dsq_vtime; + u32 dom_id = task_ctx->dom_id; + struct dom_ctx *domc; + + domc = bpf_map_lookup_elem(&dom_ctx, &dom_id); + if (!domc) { + scx_bpf_error("No dom[%u]", dom_id); + return; + } + + /* + * Limit the amount of budget that an idling task can accumulate + * to one slice. + */ + if (vtime_before(vtime, domc->vtime_now - slice_us * 1000)) + vtime = domc->vtime_now - slice_us * 1000; + + scx_bpf_dispatch_vtime(p, task_ctx->dom_id, SCX_SLICE_DFL, vtime, + enq_flags); + } +} + +static u32 cpu_to_dom_id(s32 cpu) +{ + const volatile u32 *dom_idp; + + if (nr_doms <= 1) + return 0; + + dom_idp = MEMBER_VPTR(cpu_dom_id_map, [cpu]); + if (!dom_idp) + return MAX_DOMS; + + return *dom_idp; +} + +static bool cpumask_intersects_domain(const struct cpumask *cpumask, u32 dom_id) +{ + s32 cpu; + + if (dom_id >= MAX_DOMS) + return false; + + bpf_for(cpu, 0, nr_cpus) { + if (bpf_cpumask_test_cpu(cpu, cpumask) && + (dom_cpumasks[dom_id][cpu / 64] & (1LLU << (cpu % 64)))) + return true; + } + return false; +} + +static u32 dom_rr_next(s32 cpu) +{ + struct pcpu_ctx *pcpuc; + u32 dom_id; + + pcpuc = MEMBER_VPTR(pcpu_ctx, [cpu]); + if (!pcpuc) + return 0; + + dom_id = (pcpuc->dom_rr_cur + 1) % nr_doms; + + if (dom_id == cpu_to_dom_id(cpu)) + dom_id = (dom_id + 1) % nr_doms; + + pcpuc->dom_rr_cur = dom_id; + return dom_id; +} + +void BPF_STRUCT_OPS(atropos_dispatch, s32 cpu, struct task_struct *prev) +{ + u32 dom = cpu_to_dom_id(cpu); + + if (scx_bpf_consume(dom)) { + stat_add(ATROPOS_STAT_DSQ_DISPATCH, 1); + return; + } + + if (!greedy_threshold) + return; + + bpf_repeat(nr_doms - 1) { + u32 dom_id = dom_rr_next(cpu); + + if (scx_bpf_dsq_nr_queued(dom_id) >= greedy_threshold && + scx_bpf_consume(dom_id)) { + stat_add(ATROPOS_STAT_GREEDY, 1); + break; + } + } +} + +void BPF_STRUCT_OPS(atropos_runnable, struct task_struct *p, u64 enq_flags) +{ + pid_t pid = p->pid; + struct task_ctx *task_ctx = bpf_map_lookup_elem(&task_data, &pid); + + if (!task_ctx) { + scx_bpf_error("No task_ctx[%d]", pid); + return; + } + + task_ctx->runnable_at = bpf_ktime_get_ns(); +} + +void BPF_STRUCT_OPS(atropos_running, struct task_struct *p) +{ + struct task_ctx *taskc; + struct dom_ctx *domc; + pid_t pid = p->pid; + u32 dom_id; + + if (fifo_sched) + return; + + taskc = bpf_map_lookup_elem(&task_data, &pid); + if (!taskc) { + scx_bpf_error("No task_ctx[%d]", pid); + return; + } + dom_id = taskc->dom_id; + + domc = bpf_map_lookup_elem(&dom_ctx, &dom_id); + if (!domc) { + scx_bpf_error("No dom[%u]", dom_id); + return; + } + + /* + * Global vtime always progresses forward as tasks start executing. The + * test and update can be performed concurrently from multiple CPUs and + * thus racy. Any error should be contained and temporary. Let's just + * live with it. + */ + if (vtime_before(domc->vtime_now, p->scx.dsq_vtime)) + domc->vtime_now = p->scx.dsq_vtime; +} + +void BPF_STRUCT_OPS(atropos_stopping, struct task_struct *p, bool runnable) +{ + if (fifo_sched) + return; + + /* scale the execution time by the inverse of the weight and charge */ + p->scx.dsq_vtime += (SCX_SLICE_DFL - p->scx.slice) * 100 / p->scx.weight; +} + +void BPF_STRUCT_OPS(atropos_quiescent, struct task_struct *p, u64 deq_flags) +{ + pid_t pid = p->pid; + struct task_ctx *task_ctx = bpf_map_lookup_elem(&task_data, &pid); + + if (!task_ctx) { + scx_bpf_error("No task_ctx[%d]", pid); + return; + } + + task_ctx->runnable_for += bpf_ktime_get_ns() - task_ctx->runnable_at; + task_ctx->runnable_at = 0; +} + +void BPF_STRUCT_OPS(atropos_set_weight, struct task_struct *p, u32 weight) +{ + pid_t pid = p->pid; + struct task_ctx *task_ctx = bpf_map_lookup_elem(&task_data, &pid); + + if (!task_ctx) { + scx_bpf_error("No task_ctx[%d]", pid); + return; + } + + task_ctx->weight = weight; +} + +struct pick_task_domain_loop_ctx { + struct task_struct *p; + const struct cpumask *cpumask; + u64 dom_mask; + u32 dom_rr_base; + u32 dom_id; +}; + +static int pick_task_domain_loopfn(u32 idx, void *data) +{ + struct pick_task_domain_loop_ctx *lctx = data; + u32 dom_id = (lctx->dom_rr_base + idx) % nr_doms; + + if (dom_id >= MAX_DOMS) + return 1; + + if (cpumask_intersects_domain(lctx->cpumask, dom_id)) { + lctx->dom_mask |= 1LLU << dom_id; + if (lctx->dom_id == MAX_DOMS) + lctx->dom_id = dom_id; + } + return 0; +} + +static u32 pick_task_domain(struct task_ctx *task_ctx, struct task_struct *p, + const struct cpumask *cpumask) +{ + struct pick_task_domain_loop_ctx lctx = { + .p = p, + .cpumask = cpumask, + .dom_id = MAX_DOMS, + }; + s32 cpu = bpf_get_smp_processor_id(); + + if (cpu < 0 || cpu >= MAX_CPUS) + return MAX_DOMS; + + lctx.dom_rr_base = ++(pcpu_ctx[cpu].dom_rr_cur); + + bpf_loop(nr_doms, pick_task_domain_loopfn, &lctx, 0); + task_ctx->dom_mask = lctx.dom_mask; + + return lctx.dom_id; +} + +static void task_set_domain(struct task_ctx *task_ctx, struct task_struct *p, + const struct cpumask *cpumask) +{ + u32 dom_id = 0; + + if (nr_doms > 1) + dom_id = pick_task_domain(task_ctx, p, cpumask); + + if (!task_set_dsq(task_ctx, p, dom_id)) + scx_bpf_error("Failed to set domain %d for %s[%d]", + dom_id, p->comm, p->pid); +} + +void BPF_STRUCT_OPS(atropos_set_cpumask, struct task_struct *p, + const struct cpumask *cpumask) +{ + pid_t pid = p->pid; + struct task_ctx *task_ctx = bpf_map_lookup_elem(&task_data, &pid); + if (!task_ctx) { + scx_bpf_error("No task_ctx[%d]", pid); + return; + } + + task_set_domain(task_ctx, p, cpumask); +} + +s32 BPF_STRUCT_OPS(atropos_prep_enable, struct task_struct *p, + struct scx_enable_args *args) +{ + struct bpf_cpumask *cpumask; + struct task_ctx task_ctx, *map_value; + long ret; + pid_t pid; + + memset(&task_ctx, 0, sizeof(task_ctx)); + + pid = p->pid; + ret = bpf_map_update_elem(&task_data, &pid, &task_ctx, BPF_NOEXIST); + if (ret) { + stat_add(ATROPOS_STAT_TASK_GET_ERR, 1); + return ret; + } + + /* + * Read the entry from the map immediately so we can add the cpumask + * with bpf_kptr_xchg(). + */ + map_value = bpf_map_lookup_elem(&task_data, &pid); + if (!map_value) + /* Should never happen -- it was just inserted above. */ + return -EINVAL; + + cpumask = bpf_cpumask_create(); + if (!cpumask) { + bpf_map_delete_elem(&task_data, &pid); + return -ENOMEM; + } + + cpumask = bpf_kptr_xchg(&map_value->cpumask, cpumask); + if (cpumask) { + /* Should never happen as we just inserted it above. */ + bpf_cpumask_release(cpumask); + bpf_map_delete_elem(&task_data, &pid); + return -EINVAL; + } + + task_set_domain(map_value, p, p->cpus_ptr); + + return 0; +} + +void BPF_STRUCT_OPS(atropos_disable, struct task_struct *p) +{ + pid_t pid = p->pid; + long ret = bpf_map_delete_elem(&task_data, &pid); + if (ret) { + stat_add(ATROPOS_STAT_TASK_GET_ERR, 1); + return; + } +} + +static int create_dom_dsq(u32 idx, void *data) +{ + struct dom_ctx domc_init = {}, *domc; + struct bpf_cpumask *cpumask; + u32 cpu, dom_id = idx; + s32 ret; + + ret = scx_bpf_create_dsq(dom_id, -1); + if (ret < 0) { + scx_bpf_error("Failed to create dsq %u (%d)", dom_id, ret); + return 1; + } + + ret = bpf_map_update_elem(&dom_ctx, &dom_id, &domc_init, 0); + if (ret) { + scx_bpf_error("Failed to add dom_ctx entry %u (%d)", dom_id, ret); + return 1; + } + + domc = bpf_map_lookup_elem(&dom_ctx, &dom_id); + if (!domc) { + /* Should never happen, we just inserted it above. */ + scx_bpf_error("No dom%u", dom_id); + return 1; + } + + cpumask = bpf_cpumask_create(); + if (!cpumask) { + scx_bpf_error("Failed to create BPF cpumask for domain %u", dom_id); + return 1; + } + + for (cpu = 0; cpu < MAX_CPUS; cpu++) { + const volatile __u64 *dmask; + + dmask = MEMBER_VPTR(dom_cpumasks, [dom_id][cpu / 64]); + if (!dmask) { + scx_bpf_error("array index error"); + bpf_cpumask_release(cpumask); + return 1; + } + + if (*dmask & (1LLU << (cpu % 64))) + bpf_cpumask_set_cpu(cpu, cpumask); + } + + cpumask = bpf_kptr_xchg(&domc->cpumask, cpumask); + if (cpumask) { + scx_bpf_error("Domain %u was already present", dom_id); + bpf_cpumask_release(cpumask); + return 1; + } + + return 0; +} + +int BPF_STRUCT_OPS_SLEEPABLE(atropos_init) +{ + if (!switch_partial) + scx_bpf_switch_all(); + + bpf_loop(nr_doms, create_dom_dsq, NULL, 0); + + for (u32 i = 0; i < nr_cpus; i++) + pcpu_ctx[i].dom_rr_cur = i; + + return 0; +} + +void BPF_STRUCT_OPS(atropos_exit, struct scx_exit_info *ei) +{ + bpf_probe_read_kernel_str(exit_msg, sizeof(exit_msg), ei->msg); + exit_type = ei->type; +} + +SEC(".struct_ops") +struct sched_ext_ops atropos = { + .select_cpu = (void *)atropos_select_cpu, + .enqueue = (void *)atropos_enqueue, + .dispatch = (void *)atropos_dispatch, + .runnable = (void *)atropos_runnable, + .running = (void *)atropos_running, + .stopping = (void *)atropos_stopping, + .quiescent = (void *)atropos_quiescent, + .set_weight = (void *)atropos_set_weight, + .set_cpumask = (void *)atropos_set_cpumask, + .prep_enable = (void *)atropos_prep_enable, + .disable = (void *)atropos_disable, + .init = (void *)atropos_init, + .exit = (void *)atropos_exit, + .flags = 0, + .name = "atropos", +}; diff --git a/tools/sched_ext/atropos/src/bpf/atropos.h b/tools/sched_ext/atropos/src/bpf/atropos.h new file mode 100644 index 0000000000000..addf29ca104a5 --- /dev/null +++ b/tools/sched_ext/atropos/src/bpf/atropos.h @@ -0,0 +1,44 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. +#ifndef __ATROPOS_H +#define __ATROPOS_H + +#include +#ifndef __kptr +#ifdef __KERNEL__ +#error "__kptr_ref not defined in the kernel" +#endif +#define __kptr +#endif + +#define MAX_CPUS 512 +#define MAX_DOMS 64 /* limited to avoid complex bitmask ops */ +#define CACHELINE_SIZE 64 + +/* Statistics */ +enum stat_idx { + ATROPOS_STAT_TASK_GET_ERR, + ATROPOS_STAT_WAKE_SYNC, + ATROPOS_STAT_PREV_IDLE, + ATROPOS_STAT_PINNED, + ATROPOS_STAT_DIRECT_DISPATCH, + ATROPOS_STAT_DSQ_DISPATCH, + ATROPOS_STAT_GREEDY, + ATROPOS_STAT_LOAD_BALANCE, + ATROPOS_STAT_LAST_TASK, + ATROPOS_NR_STATS, +}; + +struct task_ctx { + unsigned long long dom_mask; /* the domains this task can run on */ + struct bpf_cpumask __kptr *cpumask; + unsigned int dom_id; + unsigned int weight; + unsigned long long runnable_at; + unsigned long long runnable_for; + bool dispatch_local; +}; + +#endif /* __ATROPOS_H */ diff --git a/tools/sched_ext/atropos/src/main.rs b/tools/sched_ext/atropos/src/main.rs new file mode 100644 index 0000000000000..0d313662f7136 --- /dev/null +++ b/tools/sched_ext/atropos/src/main.rs @@ -0,0 +1,942 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. +#[path = "bpf/.output/atropos.skel.rs"] +mod atropos; +pub use atropos::*; +pub mod atropos_sys; + +use std::cell::Cell; +use std::collections::{BTreeMap, BTreeSet}; +use std::ffi::CStr; +use std::ops::Bound::{Included, Unbounded}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, SystemTime}; + +use ::fb_procfs as procfs; +use anyhow::{anyhow, bail, Context, Result}; +use bitvec::prelude::*; +use clap::Parser; +use log::{info, trace, warn}; +use ordered_float::OrderedFloat; + +/// Atropos is a multi-domain BPF / userspace hybrid scheduler where the BPF +/// part does simple round robin in each domain and the userspace part +/// calculates the load factor of each domain and tells the BPF part how to load +/// balance the domains. + +/// This scheduler demonstrates dividing scheduling logic between BPF and +/// userspace and using rust to build the userspace part. An earlier variant of +/// this scheduler was used to balance across six domains, each representing a +/// chiplet in a six-chiplet AMD processor, and could match the performance of +/// production setup using CFS. +#[derive(Debug, Parser)] +struct Opts { + /// Scheduling slice duration in microseconds. + #[clap(short, long, default_value = "20000")] + slice_us: u64, + + /// Monitoring and load balance interval in seconds. + #[clap(short, long, default_value = "2.0")] + interval: f64, + + /// Build domains according to how CPUs are grouped at this cache level + /// as determined by /sys/devices/system/cpu/cpuX/cache/indexI/id. + #[clap(short = 'c', long, default_value = "3")] + cache_level: u32, + + /// Instead of using cache locality, set the cpumask for each domain + /// manually, provide multiple --cpumasks, one for each domain. E.g. + /// --cpumasks 0xff_00ff --cpumasks 0xff00 will create two domains with + /// the corresponding CPUs belonging to each domain. Each CPU must + /// belong to precisely one domain. + #[clap(short = 'C', long, num_args = 1.., conflicts_with = "cache_level")] + cpumasks: Vec, + + /// When non-zero, enable greedy task stealing. When a domain is idle, a + /// cpu will attempt to steal tasks from a domain with at least + /// greedy_threshold tasks enqueued. These tasks aren't permanently + /// stolen from the domain. + #[clap(short, long, default_value = "4")] + greedy_threshold: u32, + + /// The load decay factor. Every interval, the existing load is decayed + /// by this factor and new load is added. Must be in the range [0.0, + /// 0.99]. The smaller the value, the more sensitive load calculation + /// is to recent changes. When 0.0, history is ignored and the load + /// value from the latest period is used directly. + #[clap(short, long, default_value = "0.5")] + load_decay_factor: f64, + + /// Disable load balancing. Unless disabled, periodically userspace will + /// calculate the load factor of each domain and instruct BPF which + /// processes to move. + #[clap(short, long, action = clap::ArgAction::SetTrue)] + no_load_balance: bool, + + /// Put per-cpu kthreads directly into local dsq's. + #[clap(short, long, action = clap::ArgAction::SetTrue)] + kthreads_local: bool, + + /// Use FIFO scheduling instead of weighted vtime scheduling. + #[clap(short, long, action = clap::ArgAction::SetTrue)] + fifo_sched: bool, + + /// If specified, only tasks which have their scheduling policy set to + /// SCHED_EXT using sched_setscheduler(2) are switched. Otherwise, all + /// tasks are switched. + #[clap(short, long, action = clap::ArgAction::SetTrue)] + partial: bool, + + /// Enable verbose output including libbpf details. Specify multiple + /// times to increase verbosity. + #[clap(short, long, action = clap::ArgAction::Count)] + verbose: u8, +} + +fn read_total_cpu(reader: &mut procfs::ProcReader) -> Result { + Ok(reader + .read_stat() + .context("Failed to read procfs")? + .total_cpu + .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?) +} + +fn now_monotonic() -> u64 { + let mut time = libc::timespec { + tv_sec: 0, + tv_nsec: 0, + }; + let ret = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut time) }; + assert!(ret == 0); + time.tv_sec as u64 * 1_000_000_000 + time.tv_nsec as u64 +} + +fn clear_map(map: &mut libbpf_rs::Map) { + // XXX: libbpf_rs has some design flaw that make it impossible to + // delete while iterating despite it being safe so we alias it here + let deleter: &mut libbpf_rs::Map = unsafe { &mut *(map as *mut _) }; + for key in map.keys() { + let _ = deleter.delete(&key); + } +} + +#[derive(Debug)] +struct TaskLoad { + runnable_for: u64, + load: f64, +} + +#[derive(Debug)] +struct TaskInfo { + pid: i32, + dom_mask: u64, + migrated: Cell, +} + +struct LoadBalancer<'a, 'b, 'c> { + maps: AtroposMapsMut<'a>, + task_loads: &'b mut BTreeMap, + nr_doms: usize, + load_decay_factor: f64, + + tasks_by_load: Vec, TaskInfo>>, + load_avg: f64, + dom_loads: Vec, + + imbal: Vec, + doms_to_push: BTreeMap, u32>, + doms_to_pull: BTreeMap, u32>, + + nr_lb_data_errors: &'c mut u64, +} + +impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> { + const LOAD_IMBAL_HIGH_RATIO: f64 = 0.10; + const LOAD_IMBAL_REDUCTION_MIN_RATIO: f64 = 0.1; + const LOAD_IMBAL_PUSH_MAX_RATIO: f64 = 0.50; + + fn new( + maps: AtroposMapsMut<'a>, + task_loads: &'b mut BTreeMap, + nr_doms: usize, + load_decay_factor: f64, + nr_lb_data_errors: &'c mut u64, + ) -> Self { + Self { + maps, + task_loads, + nr_doms, + load_decay_factor, + + tasks_by_load: (0..nr_doms).map(|_| BTreeMap::<_, _>::new()).collect(), + load_avg: 0f64, + dom_loads: vec![0.0; nr_doms], + + imbal: vec![0.0; nr_doms], + doms_to_pull: BTreeMap::new(), + doms_to_push: BTreeMap::new(), + + nr_lb_data_errors, + } + } + + fn read_task_loads(&mut self, period: Duration) -> Result<()> { + let now_mono = now_monotonic(); + let task_data = self.maps.task_data(); + let mut this_task_loads = BTreeMap::::new(); + let mut load_sum = 0.0f64; + self.dom_loads = vec![0f64; self.nr_doms]; + + for key in task_data.keys() { + if let Some(task_ctx_vec) = task_data + .lookup(&key, libbpf_rs::MapFlags::ANY) + .context("Failed to lookup task_data")? + { + let task_ctx = + unsafe { &*(task_ctx_vec.as_slice().as_ptr() as *const atropos_sys::task_ctx) }; + let pid = i32::from_ne_bytes( + key.as_slice() + .try_into() + .context("Invalid key length in task_data map")?, + ); + + let (this_at, this_for, weight) = unsafe { + ( + std::ptr::read_volatile(&task_ctx.runnable_at as *const u64), + std::ptr::read_volatile(&task_ctx.runnable_for as *const u64), + std::ptr::read_volatile(&task_ctx.weight as *const u32), + ) + }; + + let (mut delta, prev_load) = match self.task_loads.get(&pid) { + Some(prev) => (this_for - prev.runnable_for, Some(prev.load)), + None => (this_for, None), + }; + + // Non-zero this_at indicates that the task is currently + // runnable. Note that we read runnable_at and runnable_for + // without any synchronization and there is a small window + // where we end up misaccounting. While this can cause + // temporary error, it's unlikely to cause any noticeable + // misbehavior especially given the load value clamping. + if this_at > 0 && this_at < now_mono { + delta += now_mono - this_at; + } + + delta = delta.min(period.as_nanos() as u64); + let this_load = (weight as f64 * delta as f64 / period.as_nanos() as f64) + .clamp(0.0, weight as f64); + + let this_load = match prev_load { + Some(prev_load) => { + prev_load * self.load_decay_factor + + this_load * (1.0 - self.load_decay_factor) + } + None => this_load, + }; + + this_task_loads.insert( + pid, + TaskLoad { + runnable_for: this_for, + load: this_load, + }, + ); + + load_sum += this_load; + self.dom_loads[task_ctx.dom_id as usize] += this_load; + // Only record pids that are eligible for load balancing + if task_ctx.dom_mask == (1u64 << task_ctx.dom_id) { + continue; + } + self.tasks_by_load[task_ctx.dom_id as usize].insert( + OrderedFloat(this_load), + TaskInfo { + pid, + dom_mask: task_ctx.dom_mask, + migrated: Cell::new(false), + }, + ); + } + } + + self.load_avg = load_sum / self.nr_doms as f64; + *self.task_loads = this_task_loads; + Ok(()) + } + + // To balance dom loads we identify doms with lower and higher load than average + fn calculate_dom_load_balance(&mut self) -> Result<()> { + for (dom, dom_load) in self.dom_loads.iter().enumerate() { + let imbal = dom_load - self.load_avg; + if imbal.abs() >= self.load_avg * Self::LOAD_IMBAL_HIGH_RATIO { + if imbal > 0f64 { + self.doms_to_push.insert(OrderedFloat(imbal), dom as u32); + } else { + self.doms_to_pull.insert(OrderedFloat(-imbal), dom as u32); + } + self.imbal[dom] = imbal; + } + } + Ok(()) + } + + // Find the first candidate pid which hasn't already been migrated and + // can run in @pull_dom. + fn find_first_candidate<'d, I>(tasks_by_load: I, pull_dom: u32) -> Option<(f64, &'d TaskInfo)> + where + I: IntoIterator, &'d TaskInfo)>, + { + match tasks_by_load + .into_iter() + .skip_while(|(_, task)| task.migrated.get() || task.dom_mask & (1 << pull_dom) == 0) + .next() + { + Some((OrderedFloat(load), task)) => Some((*load, task)), + None => None, + } + } + + fn pick_victim( + &self, + (push_dom, to_push): (u32, f64), + (pull_dom, to_pull): (u32, f64), + ) -> Option<(&TaskInfo, f64)> { + let to_xfer = to_pull.min(to_push); + + trace!( + "considering dom {}@{:.2} -> {}@{:.2}", + push_dom, + to_push, + pull_dom, + to_pull + ); + + let calc_new_imbal = |xfer: f64| (to_push - xfer).abs() + (to_pull - xfer).abs(); + + trace!( + "to_xfer={:.2} tasks_by_load={:?}", + to_xfer, + &self.tasks_by_load[push_dom as usize] + ); + + // We want to pick a task to transfer from push_dom to pull_dom to + // maximize the reduction of load imbalance between the two. IOW, + // pick a task which has the closest load value to $to_xfer that can + // be migrated. Find such task by locating the first migratable task + // while scanning left from $to_xfer and the counterpart while + // scanning right and picking the better of the two. + let (load, task, new_imbal) = match ( + Self::find_first_candidate( + self.tasks_by_load[push_dom as usize] + .range((Unbounded, Included(&OrderedFloat(to_xfer)))) + .rev(), + pull_dom, + ), + Self::find_first_candidate( + self.tasks_by_load[push_dom as usize] + .range((Included(&OrderedFloat(to_xfer)), Unbounded)), + pull_dom, + ), + ) { + (None, None) => return None, + (Some((load, task)), None) | (None, Some((load, task))) => { + (load, task, calc_new_imbal(load)) + } + (Some((load0, task0)), Some((load1, task1))) => { + let (new_imbal0, new_imbal1) = (calc_new_imbal(load0), calc_new_imbal(load1)); + if new_imbal0 <= new_imbal1 { + (load0, task0, new_imbal0) + } else { + (load1, task1, new_imbal1) + } + } + }; + + // If the best candidate can't reduce the imbalance, there's nothing + // to do for this pair. + let old_imbal = to_push + to_pull; + if old_imbal * (1.0 - Self::LOAD_IMBAL_REDUCTION_MIN_RATIO) < new_imbal { + trace!( + "skipping pid {}, dom {} -> {} won't improve imbal {:.2} -> {:.2}", + task.pid, + push_dom, + pull_dom, + old_imbal, + new_imbal + ); + return None; + } + + trace!( + "migrating pid {}, dom {} -> {}, imbal={:.2} -> {:.2}", + task.pid, + push_dom, + pull_dom, + old_imbal, + new_imbal, + ); + + Some((task, load)) + } + + // Actually execute the load balancing. Concretely this writes pid -> dom + // entries into the lb_data map for bpf side to consume. + fn load_balance(&mut self) -> Result<()> { + clear_map(self.maps.lb_data()); + + trace!("imbal={:?}", &self.imbal); + trace!("doms_to_push={:?}", &self.doms_to_push); + trace!("doms_to_pull={:?}", &self.doms_to_pull); + + // Push from the most imbalanced to least. + while let Some((OrderedFloat(mut to_push), push_dom)) = self.doms_to_push.pop_last() { + let push_max = self.dom_loads[push_dom as usize] * Self::LOAD_IMBAL_PUSH_MAX_RATIO; + let mut pushed = 0f64; + + // Transfer tasks from push_dom to reduce imbalance. + loop { + let last_pushed = pushed; + + // Pull from the most imbalaned to least. + let mut doms_to_pull = BTreeMap::<_, _>::new(); + std::mem::swap(&mut self.doms_to_pull, &mut doms_to_pull); + let mut pull_doms = doms_to_pull.into_iter().rev().collect::>(); + + for (to_pull, pull_dom) in pull_doms.iter_mut() { + if let Some((task, load)) = + self.pick_victim((push_dom, to_push), (*pull_dom, f64::from(*to_pull))) + { + // Execute migration. + task.migrated.set(true); + to_push -= load; + *to_pull -= load; + pushed += load; + + // Ask BPF code to execute the migration. + let pid = task.pid; + let cpid = (pid as libc::pid_t).to_ne_bytes(); + if let Err(e) = self.maps.lb_data().update( + &cpid, + &pull_dom.to_ne_bytes(), + libbpf_rs::MapFlags::NO_EXIST, + ) { + warn!( + "Failed to update lb_data map for pid={} error={:?}", + pid, &e + ); + *self.nr_lb_data_errors += 1; + } + + // Always break after a successful migration so that + // the pulling domains are always considered in the + // descending imbalance order. + break; + } + } + + pull_doms + .into_iter() + .map(|(k, v)| self.doms_to_pull.insert(k, v)) + .count(); + + // Stop repeating if nothing got transferred or pushed enough. + if pushed == last_pushed || pushed >= push_max { + break; + } + } + } + Ok(()) + } +} + +struct Scheduler<'a> { + skel: AtroposSkel<'a>, + struct_ops: Option, + + nr_cpus: usize, + nr_doms: usize, + load_decay_factor: f64, + balance_load: bool, + + proc_reader: procfs::ProcReader, + + prev_at: SystemTime, + prev_total_cpu: procfs::CpuStat, + task_loads: BTreeMap, + + nr_lb_data_errors: u64, +} + +impl<'a> Scheduler<'a> { + // Returns Vec of cpuset for each dq and a vec of dq for each cpu + fn parse_cpusets( + cpumasks: &[String], + nr_cpus: usize, + ) -> Result<(Vec>, Vec)> { + if cpumasks.len() > atropos_sys::MAX_DOMS as usize { + bail!( + "Number of requested DSQs ({}) is greater than MAX_DOMS ({})", + cpumasks.len(), + atropos_sys::MAX_DOMS + ); + } + let mut cpus = vec![-1i32; nr_cpus]; + let mut cpusets = + vec![bitvec![u64, Lsb0; 0; atropos_sys::MAX_CPUS as usize]; cpumasks.len()]; + for (dq, cpumask) in cpumasks.iter().enumerate() { + let hex_str = { + let mut tmp_str = cpumask + .strip_prefix("0x") + .unwrap_or(cpumask) + .replace('_', ""); + if tmp_str.len() % 2 != 0 { + tmp_str = "0".to_string() + &tmp_str; + } + tmp_str + }; + let byte_vec = hex::decode(&hex_str) + .with_context(|| format!("Failed to parse cpumask: {}", cpumask))?; + + for (index, &val) in byte_vec.iter().rev().enumerate() { + let mut v = val; + while v != 0 { + let lsb = v.trailing_zeros() as usize; + v &= !(1 << lsb); + let cpu = index * 8 + lsb; + if cpu > nr_cpus { + bail!( + concat!( + "Found cpu ({}) in cpumask ({}) which is larger", + " than the number of cpus on the machine ({})" + ), + cpu, + cpumask, + nr_cpus + ); + } + if cpus[cpu] != -1 { + bail!( + "Found cpu ({}) with dq ({}) but also in cpumask ({})", + cpu, + cpus[cpu], + cpumask + ); + } + cpus[cpu] = dq as i32; + cpusets[dq].set(cpu, true); + } + } + cpusets[dq].set_uninitialized(false); + } + + for (cpu, &dq) in cpus.iter().enumerate() { + if dq < 0 { + bail!( + "Cpu {} not assigned to any dq. Make sure it is covered by some --cpumasks argument.", + cpu + ); + } + } + + Ok((cpusets, cpus)) + } + + // Returns Vec of cpuset for each dq and a vec of dq for each cpu + fn cpusets_from_cache( + level: u32, + nr_cpus: usize, + ) -> Result<(Vec>, Vec)> { + let mut cpu_to_cache = vec![]; // (cpu_id, cache_id) + let mut cache_ids = BTreeSet::::new(); + let mut nr_not_found = 0; + + // Build cpu -> cache ID mapping. + for cpu in 0..nr_cpus { + let path = format!("/sys/devices/system/cpu/cpu{}/cache/index{}/id", cpu, level); + let id = match std::fs::read_to_string(&path) { + Ok(val) => val + .trim() + .parse::() + .with_context(|| format!("Failed to parse {:?}'s content {:?}", &path, &val))?, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + nr_not_found += 1; + 0 + } + Err(e) => return Err(e).with_context(|| format!("Failed to open {:?}", &path)), + }; + + cpu_to_cache.push(id); + cache_ids.insert(id); + } + + if nr_not_found > 1 { + warn!( + "Couldn't determine level {} cache IDs for {} CPUs out of {}, assigned to cache ID 0", + level, nr_not_found, nr_cpus + ); + } + + // Cache IDs may have holes. Assign consecutive domain IDs to + // existing cache IDs. + let mut cache_to_dom = BTreeMap::::new(); + let mut nr_doms = 0; + for cache_id in cache_ids.iter() { + cache_to_dom.insert(*cache_id, nr_doms); + nr_doms += 1; + } + + if nr_doms > atropos_sys::MAX_DOMS { + bail!( + "Total number of doms {} is greater than MAX_DOMS ({})", + nr_doms, + atropos_sys::MAX_DOMS + ); + } + + // Build and return dom -> cpumask and cpu -> dom mappings. + let mut cpusets = + vec![bitvec![u64, Lsb0; 0; atropos_sys::MAX_CPUS as usize]; nr_doms as usize]; + let mut cpu_to_dom = vec![]; + + for cpu in 0..nr_cpus { + let dom_id = cache_to_dom[&cpu_to_cache[cpu]]; + cpusets[dom_id as usize].set(cpu, true); + cpu_to_dom.push(dom_id as i32); + } + + Ok((cpusets, cpu_to_dom)) + } + + fn init(opts: &Opts) -> Result { + // Open the BPF prog first for verification. + let mut skel_builder = AtroposSkelBuilder::default(); + skel_builder.obj_builder.debug(opts.verbose > 0); + let mut skel = skel_builder.open().context("Failed to open BPF program")?; + + let nr_cpus = libbpf_rs::num_possible_cpus().unwrap(); + if nr_cpus > atropos_sys::MAX_CPUS as usize { + bail!( + "nr_cpus ({}) is greater than MAX_CPUS ({})", + nr_cpus, + atropos_sys::MAX_CPUS + ); + } + + // Initialize skel according to @opts. + let (cpusets, cpus) = if opts.cpumasks.len() > 0 { + Self::parse_cpusets(&opts.cpumasks, nr_cpus)? + } else { + Self::cpusets_from_cache(opts.cache_level, nr_cpus)? + }; + let nr_doms = cpusets.len(); + skel.rodata().nr_doms = nr_doms as u32; + skel.rodata().nr_cpus = nr_cpus as u32; + + for (cpu, dom) in cpus.iter().enumerate() { + skel.rodata().cpu_dom_id_map[cpu] = *dom as u32; + } + + for (dom, cpuset) in cpusets.iter().enumerate() { + let raw_cpuset_slice = cpuset.as_raw_slice(); + let dom_cpumask_slice = &mut skel.rodata().dom_cpumasks[dom]; + let (left, _) = dom_cpumask_slice.split_at_mut(raw_cpuset_slice.len()); + left.clone_from_slice(cpuset.as_raw_slice()); + let cpumask_str = dom_cpumask_slice + .iter() + .take((nr_cpus + 63) / 64) + .rev() + .fold(String::new(), |acc, x| format!("{} {:016X}", acc, x)); + info!( + "DOM[{:02}] cpumask{} ({} cpus)", + dom, + &cpumask_str, + cpuset.count_ones() + ); + } + + skel.rodata().slice_us = opts.slice_us; + skel.rodata().kthreads_local = opts.kthreads_local; + skel.rodata().fifo_sched = opts.fifo_sched; + skel.rodata().switch_partial = opts.partial; + skel.rodata().greedy_threshold = opts.greedy_threshold; + + // Attach. + let mut skel = skel.load().context("Failed to load BPF program")?; + skel.attach().context("Failed to attach BPF program")?; + let struct_ops = Some( + skel.maps_mut() + .atropos() + .attach_struct_ops() + .context("Failed to attach atropos struct ops")?, + ); + info!("Atropos Scheduler Attached"); + + // Other stuff. + let mut proc_reader = procfs::ProcReader::new(); + let prev_total_cpu = read_total_cpu(&mut proc_reader)?; + + Ok(Self { + skel, + struct_ops, // should be held to keep it attached + + nr_cpus, + nr_doms, + load_decay_factor: opts.load_decay_factor.clamp(0.0, 0.99), + balance_load: !opts.no_load_balance, + + proc_reader, + + prev_at: SystemTime::now(), + prev_total_cpu, + task_loads: BTreeMap::new(), + + nr_lb_data_errors: 0, + }) + } + + fn get_cpu_busy(&mut self) -> Result { + let total_cpu = read_total_cpu(&mut self.proc_reader)?; + let busy = match (&self.prev_total_cpu, &total_cpu) { + ( + procfs::CpuStat { + user_usec: Some(prev_user), + nice_usec: Some(prev_nice), + system_usec: Some(prev_system), + idle_usec: Some(prev_idle), + iowait_usec: Some(prev_iowait), + irq_usec: Some(prev_irq), + softirq_usec: Some(prev_softirq), + stolen_usec: Some(prev_stolen), + guest_usec: _, + guest_nice_usec: _, + }, + procfs::CpuStat { + user_usec: Some(curr_user), + nice_usec: Some(curr_nice), + system_usec: Some(curr_system), + idle_usec: Some(curr_idle), + iowait_usec: Some(curr_iowait), + irq_usec: Some(curr_irq), + softirq_usec: Some(curr_softirq), + stolen_usec: Some(curr_stolen), + guest_usec: _, + guest_nice_usec: _, + }, + ) => { + let idle_usec = curr_idle - prev_idle; + let iowait_usec = curr_iowait - prev_iowait; + let user_usec = curr_user - prev_user; + let system_usec = curr_system - prev_system; + let nice_usec = curr_nice - prev_nice; + let irq_usec = curr_irq - prev_irq; + let softirq_usec = curr_softirq - prev_softirq; + let stolen_usec = curr_stolen - prev_stolen; + + let busy_usec = + user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec; + let total_usec = idle_usec + busy_usec + iowait_usec; + busy_usec as f64 / total_usec as f64 + } + _ => { + bail!("Some procfs stats are not populated!"); + } + }; + + self.prev_total_cpu = total_cpu; + Ok(busy) + } + + fn read_bpf_stats(&mut self) -> Result> { + let mut maps = self.skel.maps_mut(); + let stats_map = maps.stats(); + let mut stats: Vec = Vec::new(); + let zero_vec = vec![vec![0u8; stats_map.value_size() as usize]; self.nr_cpus]; + + for stat in 0..atropos_sys::stat_idx_ATROPOS_NR_STATS { + let cpu_stat_vec = stats_map + .lookup_percpu(&(stat as u32).to_ne_bytes(), libbpf_rs::MapFlags::ANY) + .with_context(|| format!("Failed to lookup stat {}", stat))? + .expect("per-cpu stat should exist"); + let sum = cpu_stat_vec + .iter() + .map(|val| { + u64::from_ne_bytes( + val.as_slice() + .try_into() + .expect("Invalid value length in stat map"), + ) + }) + .sum(); + stats_map + .update_percpu( + &(stat as u32).to_ne_bytes(), + &zero_vec, + libbpf_rs::MapFlags::ANY, + ) + .context("Failed to zero stat")?; + stats.push(sum); + } + Ok(stats) + } + + fn report( + &self, + stats: &Vec, + cpu_busy: f64, + processing_dur: Duration, + load_avg: f64, + dom_loads: &Vec, + imbal: &Vec, + ) { + let stat = |idx| stats[idx as usize]; + let total = stat(atropos_sys::stat_idx_ATROPOS_STAT_WAKE_SYNC) + + stat(atropos_sys::stat_idx_ATROPOS_STAT_PREV_IDLE) + + stat(atropos_sys::stat_idx_ATROPOS_STAT_PINNED) + + stat(atropos_sys::stat_idx_ATROPOS_STAT_DIRECT_DISPATCH) + + stat(atropos_sys::stat_idx_ATROPOS_STAT_DSQ_DISPATCH) + + stat(atropos_sys::stat_idx_ATROPOS_STAT_GREEDY) + + stat(atropos_sys::stat_idx_ATROPOS_STAT_LAST_TASK); + + info!( + "cpu={:6.1} load_avg={:7.1} bal={} task_err={} lb_data_err={} proc={:?}ms", + cpu_busy * 100.0, + load_avg, + stats[atropos_sys::stat_idx_ATROPOS_STAT_LOAD_BALANCE as usize], + stats[atropos_sys::stat_idx_ATROPOS_STAT_TASK_GET_ERR as usize], + self.nr_lb_data_errors, + processing_dur.as_millis(), + ); + + let stat_pct = |idx| stat(idx) as f64 / total as f64 * 100.0; + + info!( + "tot={:6} wsync={:4.1} prev_idle={:4.1} pin={:4.1} dir={:4.1} dq={:4.1} greedy={:4.1}", + total, + stat_pct(atropos_sys::stat_idx_ATROPOS_STAT_WAKE_SYNC), + stat_pct(atropos_sys::stat_idx_ATROPOS_STAT_PREV_IDLE), + stat_pct(atropos_sys::stat_idx_ATROPOS_STAT_PINNED), + stat_pct(atropos_sys::stat_idx_ATROPOS_STAT_DIRECT_DISPATCH), + stat_pct(atropos_sys::stat_idx_ATROPOS_STAT_DSQ_DISPATCH), + stat_pct(atropos_sys::stat_idx_ATROPOS_STAT_GREEDY), + ); + + for i in 0..self.nr_doms { + info!( + "DOM[{:02}] load={:7.1} to_pull={:7.1} to_push={:7.1}", + i, + dom_loads[i], + if imbal[i] < 0.0 { -imbal[i] } else { 0.0 }, + if imbal[i] > 0.0 { imbal[i] } else { 0.0 }, + ); + } + } + + fn step(&mut self) -> Result<()> { + let started_at = std::time::SystemTime::now(); + let bpf_stats = self.read_bpf_stats()?; + let cpu_busy = self.get_cpu_busy()?; + + let mut lb = LoadBalancer::new( + self.skel.maps_mut(), + &mut self.task_loads, + self.nr_doms, + self.load_decay_factor, + &mut self.nr_lb_data_errors, + ); + + lb.read_task_loads(started_at.duration_since(self.prev_at)?)?; + lb.calculate_dom_load_balance()?; + + if self.balance_load { + lb.load_balance()?; + } + + // Extract fields needed for reporting and drop lb to release + // mutable borrows. + let (load_avg, dom_loads, imbal) = (lb.load_avg, lb.dom_loads, lb.imbal); + + self.report( + &bpf_stats, + cpu_busy, + std::time::SystemTime::now().duration_since(started_at)?, + load_avg, + &dom_loads, + &imbal, + ); + + self.prev_at = started_at; + Ok(()) + } + + fn read_bpf_exit_type(&mut self) -> i32 { + unsafe { std::ptr::read_volatile(&self.skel.bss().exit_type as *const _) } + } + + fn report_bpf_exit_type(&mut self) -> Result<()> { + // Report msg if EXT_OPS_EXIT_ERROR. + match self.read_bpf_exit_type() { + 0 => Ok(()), + etype if etype == 2 => { + let cstr = unsafe { CStr::from_ptr(self.skel.bss().exit_msg.as_ptr() as *const _) }; + let msg = cstr + .to_str() + .context("Failed to convert exit msg to string") + .unwrap(); + bail!("BPF exit_type={} msg={}", etype, msg); + } + etype => { + info!("BPF exit_type={}", etype); + Ok(()) + } + } + } +} + +impl<'a> Drop for Scheduler<'a> { + fn drop(&mut self) { + if let Some(struct_ops) = self.struct_ops.take() { + drop(struct_ops); + } + } +} + +fn main() -> Result<()> { + let opts = Opts::parse(); + + let llv = match opts.verbose { + 0 => simplelog::LevelFilter::Info, + 1 => simplelog::LevelFilter::Debug, + _ => simplelog::LevelFilter::Trace, + }; + let mut lcfg = simplelog::ConfigBuilder::new(); + lcfg.set_time_level(simplelog::LevelFilter::Error) + .set_location_level(simplelog::LevelFilter::Off) + .set_target_level(simplelog::LevelFilter::Off) + .set_thread_level(simplelog::LevelFilter::Off); + simplelog::TermLogger::init( + llv, + lcfg.build(), + simplelog::TerminalMode::Stderr, + simplelog::ColorChoice::Auto, + )?; + + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_clone = shutdown.clone(); + ctrlc::set_handler(move || { + shutdown_clone.store(true, Ordering::Relaxed); + }) + .context("Error setting Ctrl-C handler")?; + + let mut sched = Scheduler::init(&opts)?; + + while !shutdown.load(Ordering::Relaxed) && sched.read_bpf_exit_type() == 0 { + std::thread::sleep(Duration::from_secs_f64(opts.interval)); + sched.step()?; + } + + sched.report_bpf_exit_type() +}