Skip to content

Commit

Permalink
[Features][CustomSched][BPFLib]
Browse files Browse the repository at this point in the history
    * Custom Scheduling
    * Capturing cgroup stats on per invoke level
    * BPFLibrary to create a pinned BPFMap of function characteristics
      * used to share data with scx scheduler
  • Loading branch information
abrehman94 authored and Abdul Rehman committed Nov 20, 2024
1 parent 696b116 commit 39e4a48
Show file tree
Hide file tree
Showing 77 changed files with 3,899 additions and 165 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ __pycache__/*.lock
**.lock
**__pycache__**
.idea/
.vscode/
.vscode/
**.swp
15 changes: 15 additions & 0 deletions pr.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@



```
Error when --all-features is used for check command.
Compiling libbpf-sys v1.5.0+v1.5.0
error: could not find native static library `elf`, perhaps an -L flag is missing?
error: could not compile `libbpf-sys` (lib) due to 1 previous error
make: *** [Makefile:26: check] Error 101
```



4 changes: 2 additions & 2 deletions src/Ilúvatar/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ iluvatar_worker/src/worker.dev*json
iluvatar_worker_library/tests/resources/worker.dev*json
iluvatar_controller/src/controller.dev*json
target/
**.swp
.vscode/
**.sw*
.vscode/
4 changes: 3 additions & 1 deletion src/Ilúvatar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ members = [
"iluvatar_library",
"iluvatar_worker_library",
"iluvatar_energy_mon",
"iluvatar_rpc"
"iluvatar_rpc",
"iluvatar_bpf_library",
"fs_policy_tsksz",
]
resolver = "2"

Expand Down
6 changes: 5 additions & 1 deletion src/Ilúvatar/Cross.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,9 @@ passthrough = ["ARCH=amd64", "GO_VERSION=1.22.0", "CNI_VERSION=v1.1.1", "GOPATH=
default-target = "x86_64-unknown-linux-gnu" # use this target if none is explicitly provided
pre-build = [ # additional commands to run prior to building the package
"dpkg --add-architecture $CROSS_DEB_ARCH",
"apt-get update && apt-get --assume-yes install protobuf-compiler iproute2 wget curl runc bridge-utils iptables net-tools sysstat"
"apt-get update && apt-get --assume-yes install protobuf-compiler iproute2 wget curl runc bridge-utils iptables net-tools sysstat libelf-dev lsb-release wget software-properties-common gnupg",
"wget https://apt.llvm.org/llvm.sh && chmod +x llvm.sh && ./llvm.sh all && ln -s /usr/bin/ld.lld-18 /usr/bin/ld.lld && ln -s /usr/bin/clang-18 /usr/bin/clang && ln -s /usr/bin/clang++-18 /usr/bin/clang++ && rm llvm.sh"
]



8 changes: 6 additions & 2 deletions src/Ilúvatar/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ endif
RUST_C?=cross
DEBUG_FLAGS=--all-targets $(TARGET) -j $(NPROCS)
RELEASE_FLAGS=--lib --bins $(TARGET) -j $(NPROCS) --release
CARGO_ARGS?=""
CARGO_ARGS?=

default: debug

Expand All @@ -23,7 +23,7 @@ clean:
@$(RUST_C) clean
check:
@echo "Checking"
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) check --all-features $(DEBUG_FLAGS) $(CARGO_ARGS)
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) check $(DEBUG_FLAGS) $(CARGO_ARGS)
release:
@echo "Building release"
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) build $(RELEASE_FLAGS) $(CARGO_ARGS)
Expand All @@ -37,6 +37,10 @@ tiny:
debug:
@echo "Building debug"
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) build $(DEBUG_FLAGS) $(CARGO_ARGS)
fix:
@echo "Fixing lint errors"
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) fix $(DEBUG_FLAGS) $(CARGO_ARGS)
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) clippy --fix --allow-no-vcs --workspace --examples --benches --no-deps -- -Dclippy::suspicious -Dclippy::correctness -Dclippy::perf -Aclippy::single_match -Aclippy::new_without_default -Aclippy::too_many_arguments -Aclippy::type-complexity -Dclippy::from_over_into -Aclippy::redundant-field-names -Dwarnings
spans:
@echo "Building full_spans"
@RUSTFLAGS=$(RUST_FLAGS) $(RUST_C) build --features full_spans $(RELEASE_FLAGS) $(CARGO_ARGS)
Expand Down
8 changes: 8 additions & 0 deletions src/Ilúvatar/ansible/worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@
remote_src: "{{__remote_bin_src}}"
become: yes

- name: copy fs_policy_tsksz
ansible.builtin.copy:
src: "{{__bin_src}}/fs_policy_tsksz"
dest: "{{bin_dir}}/"
mode: "preserve"
remote_src: "{{__remote_bin_src}}"
become: yes

- name: copy worker config
ansible.builtin.copy:
src: "{{__bin_src}}/{{worker.config_name}}"
Expand Down
Empty file.
33 changes: 33 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "fs_policy_tsksz"
version = "0.0.3"
authors = ["Abdul Rehman <[email protected]>"]
edition = "2021"
description = "A simple scheduler that preserves locality for a function cgroup"
license = "GPL-2.0-only"

[dependencies]
anyhow = "1.0.65"
plain = "0.2.3"
ctrlc = { version = "3.1", features = ["termination"] }
libbpf-rs = "0.24.1"
libc = "0.2.137"
scx_utils = { version = "1.0.6" }
scx_rustland_core = { version = "2.2.3" }

# Specific to iluvatar
iluvatar_library = { path = "../iluvatar_library" }
iluvatar_worker_library = { path = "../iluvatar_worker_library" }
clap = { version = "4.5.4", features = ["derive"] }
ipc-channel = { version = "0.18.1", features = ["memfd"] }
serde = { version = "1.0" }

[build-dependencies]
scx_utils = { version = "1.0.6" }
scx_rustland_core = { version = "2.2.3" }

[features]
enable_backtrace = []



1 change: 1 addition & 0 deletions src/Ilúvatar/fs_policy_tsksz/LICENSE
20 changes: 20 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# scx_rlfifo

This is a single user-defined scheduler used within [sched_ext](https://github.com/sched-ext/scx/tree/main), which is a Linux kernel feature which enables implementing kernel thread schedulers in BPF and dynamically loading them. [Read more about sched_ext](https://github.com/sched-ext/scx/tree/main).

## Overview

scx_rlfifo is a simple FIFO scheduler runs in user-space, based on the
scx_rustland_core framework.

## Typical Use Case

This scheduler is provided as a simple template that can be used as a baseline
to test more complex scheduling policies.

## Production Ready?

Definitely not. Using this scheduler in a production environment is not
recommended, unless there are specific requirements that necessitate a basic
FIFO scheduling approach. Even then, it's still recommended to use the kernel's
SCHED_FIFO real-time class.
11 changes: 11 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

fn main() {
scx_utils::BpfBuilder::new()
.unwrap()
.enable_intf("src/bpf/intf.h", "bpf_intf.rs")
.enable_skel("src/bpf/main.bpf.c", "bpf")
.build()
.unwrap();
}
14 changes: 14 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/meson.build
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
if serialize
sched_deps = [libbpf, bpftool_target, sched]
else
sched_deps = [libbpf, bpftool_target]
endif

sched = custom_target('scx_rlfifo',
output: '@PLAINNAME@.__PHONY__',
input: 'Cargo.toml',
command: [cargo, 'build', '--manifest-path=@INPUT@', '--target-dir=@OUTDIR@',
cargo_build_args],
env: cargo_env,
depends: sched_deps,
build_always_stale: true)
8 changes: 8 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Get help on options with `rustfmt --help=config`
# Please keep these in alphabetical order.
edition = "2021"
group_imports = "StdExternalCrate"
imports_granularity = "Item"
merge_derives = false
use_field_init_shorthand = true
version = "Two"
187 changes: 187 additions & 0 deletions src/Ilúvatar/fs_policy_tsksz/src/bpf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright (c) Andrea Righi <[email protected]>

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

use std::mem::MaybeUninit;

use crate::bpf_intf;
use crate::bpf_skel::*;

use anyhow::Context;
use anyhow::Result;

use libbpf_rs::skel::OpenSkel;
use libbpf_rs::skel::Skel;
use libbpf_rs::skel::SkelBuilder;
use libbpf_rs::OpenObject;

use libc::{pthread_self, pthread_setschedparam, sched_param};

#[cfg(target_env = "musl")]
use libc::timespec;

use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::UserExitInfo;

use scx_rustland_core::ALLOCATOR;

// Defined in UAPI
const SCHED_EXT: i32 = 7;

pub struct BpfScheduler<'cb> {
pub skel: BpfSkel<'cb>, // Low-level BPF connector
struct_ops: Option<libbpf_rs::Link>, // Low-level BPF methods
queued_stats: libbpf_rs::RingBuffer<'cb>, // ring buffer of tasks pids to be switched to schedext
}

#[derive(Clone, Copy, Debug)]
#[allow(non_camel_case_types, dead_code)]
pub struct lpolicy_stats(bpf_intf::policy_stats);

macro_rules! define_buffer {
( $bufname: ident, $abufname: ident, $abuf: ident, $callback: ident, $tdst: ty ) => {
const $bufname: usize = std::mem::size_of::<$tdst>();
#[repr(align(8))]
struct $abufname([u8; $bufname]);
static mut $abuf: $abufname = $abufname([0; $bufname]);
fn $callback(data: &[u8]) -> i32 {
unsafe {
$abuf.0.copy_from_slice(data);
}
LIBBPF_STOP
}
};
}

define_buffer!(
BUFSIZE_STATS,
AlignedBufferstats,
BUF_STATS,
callback_stats,
bpf_intf::policy_stats
);
fn fetch_stats(bytes: &[u8]) -> lpolicy_stats {
let ps = unsafe { *(bytes.as_ptr() as *const bpf_intf::policy_stats) };
lpolicy_stats(ps)
}

// Special negative error code for libbpf to stop after consuming just one item from a BPF
// ring buffer.
const LIBBPF_STOP: i32 = -255;

impl<'cb> BpfScheduler<'cb> {
pub fn init(
open_object: &'cb mut MaybeUninit<OpenObject>,
slice_us: u64,
exit_dump_len: u32,
verbose: bool,
) -> Result<Self> {
// Open the BPF prog first for verification.
let mut skel_builder = BpfSkelBuilder::default();
skel_builder.obj_builder.debug(verbose);
let mut skel = scx_ops_open!(skel_builder, open_object, tsksz_ops)?;

// Lock all the memory to prevent page faults that could trigger potential deadlocks during
// scheduling.
ALLOCATOR.lock_memory();

skel.struct_ops.tsksz_ops_mut().exit_dump_len = exit_dump_len;
skel.maps.bss_data.usersched_pid = std::process::id();
skel.maps.rodata_data.effective_slice_ns = slice_us * 1000;

let path = "/sys/fs/bpf/func_metadata";
let func_metadata = &mut skel.maps.func_metadata;
assert!(func_metadata.reuse_pinned_map("/asdf").is_err());
func_metadata
.reuse_pinned_map(path)
.expect("failed to reuse map");

// Attach BPF scheduler.
let mut skel = scx_ops_load!(skel, tsksz_ops, uei)?;
let struct_ops = Some(scx_ops_attach!(skel, tsksz_ops)?);

// Build the ring buffer of queued tasks.
let rb_map = &mut skel.maps.queued_stats;
let mut builder = libbpf_rs::RingBufferBuilder::new();
builder.add(rb_map, callback_stats).unwrap();
let queued_stats = builder.build().unwrap();

// Make sure to use the SCHED_EXT class at least for the scheduler itself.
match Self::use_sched_ext() {
0 => Ok(Self {
skel,
struct_ops,
queued_stats,
}),
err => Err(anyhow::Error::msg(format!(
"sched_setscheduler error: {}",
err
))),
}
}

// Receive stats from the BPF scheduler to switch to schedext policy.
pub fn dequeue_stats(&mut self) -> Result<Option<lpolicy_stats>, i32> {
match self.queued_stats.consume_raw() {
0 => Ok(None),
LIBBPF_STOP => {
// A valid pid is received, convert data to a proper pid.
let stats = unsafe { fetch_stats(&BUF_STATS.0) };
Ok(Some(stats))
}
res if res < 0 => Err(res),
res => panic!(
"Unexpected return value from libbpf-rs::consume_raw(): {}",
res
),
}
}

// Set scheduling class for the scheduler itself to SCHED_EXT
fn use_sched_ext() -> i32 {
#[cfg(target_env = "gnu")]
let param: sched_param = sched_param { sched_priority: 0 };
#[cfg(target_env = "musl")]
let param: sched_param = sched_param {
sched_priority: 0,
sched_ss_low_priority: 0,
sched_ss_repl_period: timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_init_budget: timespec {
tv_sec: 0,
tv_nsec: 0,
},
sched_ss_max_repl: 0,
};

unsafe { pthread_setschedparam(pthread_self(), SCHED_EXT, &param as *const sched_param) }
}

// Read exit code from the BPF part.
pub fn exited(&mut self) -> bool {
uei_exited!(&self.skel, uei)
}

// Called on exit to shutdown and report exit message from the BPF part.
pub fn shutdown_and_report(&mut self) -> Result<UserExitInfo> {
self.struct_ops.take();
uei_report!(&self.skel, uei)
}
}

// Disconnect the low-level BPF scheduler.
impl<'a> Drop for BpfScheduler<'a> {
fn drop(&mut self) {
if let Some(struct_ops) = self.struct_ops.take() {
drop(struct_ops);
}
ALLOCATOR.unlock_memory();
}
}
Loading

0 comments on commit 39e4a48

Please sign in to comment.