I am reading Chapter 29 of Operating Systems: Three Easy Pieces, which is about concurrent data structures. The first example there is an approximate counter. This data structure increments a count by using a global mutex and several local mutexes with local counters. When a local counter hits a threshold, the updater grabs the global mutex and flushes the local count into the global counter.
The chapter shows the code in C. Since I'm practising Rust, I implemented the data structure in Rust. Looking at my code, I feel there must be room to improve both its quality and its performance.
Notes:
The book shows a graph in which the approximate counter scales very well, but in my test code the performance of my approximate counter was not good.
Because the original C code uses mutexes, I used mutexes in the Rust code. However, the Rust book on the official site shows an example using the std::sync::mpsc module. Would it be better to use message passing rather than mutexes, to avoid human mistakes?
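For comparison, here is a minimal sketch of what a channel-based version could look like (the function name channel_counter and the batching scheme are my own invention, not from the book): worker threads batch increments locally and send batches over an mpsc channel to a single receiver that owns the total. This avoids shared mutable state entirely, though every flush still serializes through one receiver, so it is not obviously faster than per-CPU counters.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical channel-based counter: workers batch increments locally
// and send each full batch to a single aggregator; no locks are needed.
fn channel_counter(num_threads: usize, increments_per_thread: i32, batch: i32) -> i32 {
    let (tx, rx) = mpsc::channel::<i32>();

    let mut handles = Vec::new();
    for _ in 0..num_threads {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            let mut local = 0;
            for _ in 0..increments_per_thread {
                local += 1;
                if local >= batch {
                    // Flush the local batch, mirroring the approximate counter.
                    tx.send(local).unwrap();
                    local = 0;
                }
            }
            if local > 0 {
                // Flush any below-threshold remainder before exiting.
                tx.send(local).unwrap();
            }
        }));
    }
    drop(tx); // close the channel once all worker senders are done

    // The iterator ends when every sender has been dropped.
    let total: i32 = rx.iter().sum();
    for h in handles {
        h.join().unwrap();
    }
    total
}

fn main() {
    // 4 threads x 1000 increments = 4000
    println!("{}", channel_counter(4, 1_000, 250));
}
```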
Original code in OS book:
typedef struct __counter_t {
    int global;                     // global count
    pthread_mutex_t glock;          // global lock
    int local[NUMCPUS];             // per-CPU count
    pthread_mutex_t llock[NUMCPUS]; // ... and locks
    int threshold;                  // update frequency
} counter_t;

// init: record threshold, init locks, init values
// of all local counts and global count
void init(counter_t *c, int threshold) {
    c->threshold = threshold;
    c->global = 0;
    pthread_mutex_init(&c->glock, NULL);
    int i;
    for (i = 0; i < NUMCPUS; i++) {
        c->local[i] = 0;
        pthread_mutex_init(&c->llock[i], NULL);
    }
}

// update: usually, just grab local lock and update
// local amount; once local count has risen to 'threshold',
// grab global lock and transfer local values to it
void update(counter_t *c, int threadID, int amt) {
    int cpu = threadID % NUMCPUS;
    pthread_mutex_lock(&c->llock[cpu]);
    c->local[cpu] += amt;
    if (c->local[cpu] >= c->threshold) {
        // transfer to global (assumes amt > 0)
        pthread_mutex_lock(&c->glock);
        c->global += c->local[cpu];
        pthread_mutex_unlock(&c->glock);
        c->local[cpu] = 0;
    }
    pthread_mutex_unlock(&c->llock[cpu]);
}

// get: just return global amount (approximate)
int get(counter_t *c) {
    pthread_mutex_lock(&c->glock);
    int val = c->global;
    pthread_mutex_unlock(&c->glock);
    return val; // only approximate!
}
My Counter types in my counter.rs module:
use std::fmt;
use std::sync::Mutex;

pub struct Counter {
    value: Mutex<i32>,
}

impl Counter {
    pub fn new() -> Self {
        Counter { value: Mutex::new(0) }
    }

    pub fn test_and_increment(&self) -> i32 {
        let mut value = self.value.lock().unwrap();
        *value += 1;
        if *value >= 250 {
            let old = *value;
            *value = 0;
            return old;
        }
        else {
            return 0;
        }
    }

    pub fn get(&self) -> i32 {
        *(self.value.lock().unwrap())
    }

    pub fn add(&self, value: i32) {
        *(self.value.lock().unwrap()) += value;
    }
}

impl fmt::Display for Counter {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", *self.value.lock().unwrap())
    }
}

pub struct ApproximateCounter {
    value: Counter,
    local_counters: [Counter; 4],
}

impl ApproximateCounter {
    pub fn new() -> Self {
        ApproximateCounter {
            value: Counter::new(),
            local_counters: [Counter::new(), Counter::new(), Counter::new(), Counter::new()],
        }
    }

    pub fn increment(&self, i: usize) {
        let local_value = self.local_counters[i].test_and_increment();
        if local_value > 0 {
            self.value.add(local_value);
        }
    }

    pub fn get(&self) -> i32 {
        self.value.get()
    }
}
main.rs
use std::time::Instant;
use std::sync::Arc;
use std::thread;

mod counter;

const NUM_TO_LOOP: i32 = 1000000000;

fn main() {
    {
        let now = Instant::now();
        let counter = counter::Counter::new();
        let mut result = 0;
        for _ in 0..NUM_TO_LOOP {
            result += counter.test_and_increment();
        }
        let elapsed = now.elapsed();
        println!("{}", result);
        println!("[one thread + thread-safe counter] Elapsed: {:.2?}", elapsed);
    }
    {
        let now = Instant::now();
        let counter = Arc::new(counter::ApproximateCounter::new());
        let mut threads = Vec::new();
        for i in 0..4 {
            let c_counter = counter.clone();
            threads.push(thread::spawn(move || {
                for _ in 0..(NUM_TO_LOOP / 4) {
                    c_counter.increment(i);
                }
            }));
        }
        for thread in threads {
            thread.join().unwrap();
        }
        println!("{}", counter.get());
        let elapsed = now.elapsed();
        println!("[four threads + ApproximateCounter]: Elapsed: {:.2?}", elapsed);
    }
}
Result of running main.rs:
1000000000
[one thread + thread-safe counter] Elapsed: 146.02s
1000000000
[four threads + ApproximateCounter]: Elapsed: 104.38s
After following the answer by @Matthieu M., I got the following results.
Using Mutex without false sharing
1000000000
[one thread + thread-safe counter] Elapsed: 144.44s
1000000000
[four threads + ApproximateCounter]: Elapsed: 77.16s
Using Atomic with false sharing
999999813
[one thread + thread-safe counter] Elapsed: 35.47s
999999060
[four threads + ApproximateCounter]: Elapsed: 36.40s
Using atomic without false sharing
999999813
[one thread + thread-safe counter] Elapsed: 46.65s
999999060
[four threads + ApproximateCounter]: Elapsed: 28.29s
Still need to figure out why the atomic version does not give 1000000000 in the one-thread test.
Your code is looking pretty good, overall. Well done.
Embrace Everything is an Expression
In Rust, nearly everything is an expression. In particular, an if is an expression, a block is an expression, etc...
As a result, it is superfluous to use a return statement as the last statement of a function: the function body will return the value of the function "block" expression anyway. For example:
impl Counter {
pub fn test_and_increment(&self) -> i32 {
let mut value = self.value.lock().unwrap();
*value += 1;
if *value >= 250 {
let old = *value;
*value = 0;
old
} else {
0
}
}
}
mem is Full of Goodies
The core::mem (or std::mem) module is full of simple functions to perform swaps/exchanges of values: replace, swap, take, ...
In this case, both replace and take would be appropriate, with the former being more explicit: take(x) is replace(x, Default::default()), but not everybody may immediately realize that the default is 0.
use std::mem;

impl Counter {
    pub fn test_and_increment(&self) -> i32 {
        let mut value = self.value.lock().unwrap();
        *value += 1;
        if *value >= 250 {
            mem::replace(&mut *value, 0)
        } else {
            0
        }
    }
}
Implement Default when appropriate
When a struct can be constructed without any argument, it should implement the Default trait.
You can do so manually:
impl Default for Counter {
    fn default() -> Self {
        Self::new()
    }
}
Although if no special logic is necessary, it's simpler to just derive it:
#[derive(Default)]
struct Counter { ... }
The derive will simply use the Default value for each field.
The same applies to ApproximateCounter.
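As a sketch of how that plays out (assuming Counter derives Default as above): since [T; 4] implements Default whenever T does, ApproximateCounter can also be derived with no hand-written code.

```rust
use std::sync::Mutex;

#[derive(Default)]
pub struct Counter {
    value: Mutex<i32>,
}

// With Counter: Default, the [Counter; 4] field gets its Default impl
// for free, so the outer struct can be derived as well.
#[derive(Default)]
pub struct ApproximateCounter {
    value: Counter,
    local_counters: [Counter; 4],
}

fn main() {
    let counter = ApproximateCounter::default();
    // Every counter starts at zero, and there are four local counters.
    assert_eq!(*counter.value.value.lock().unwrap(), 0);
    assert_eq!(counter.local_counters.len(), 4);
}
```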
Derive more
Speaking of derive, it is generally good form to derive a number of useful properties. The ones available in the standard library are:
#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct Counter { ... }
Each derives the trait of the same name.
Now, they may not all be appropriate -- or even possible -- however I would recommend always implementing Debug at the very least, and Clone whenever possible as well.
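A quick sketch of what Debug buys you here: Mutex<i32> itself implements Debug, so the derive just works on the original Counter and makes ad-hoc inspection with {:?} possible.

```rust
use std::sync::Mutex;

// Deriving Debug costs one line and enables {:?} formatting.
#[derive(Debug, Default)]
pub struct Counter {
    value: Mutex<i32>,
}

fn main() {
    let counter = Counter::default();
    let repr = format!("{counter:?}");
    // The exact output of a derived Debug is unspecified,
    // but it always starts with the type name.
    assert!(repr.starts_with("Counter"));
    println!("{repr}");
}
```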
Do Not Repeat Yourself
Your implementation of Display reaches deep inside the value, when there's a perfectly good getter that already does the job:
impl fmt::Display for Counter {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}", self.get())
    }
}
Use Clippy!
You can run Clippy with cargo clippy. Formally, Clippy is a linter, but I like to think of it as a mentor: it would have highlighted that you should implement Default, for example.
It contains lots of simple lints to make your code simpler, and more idiomatic, and it's constantly updated to keep up with new standard library functions or new language features.
Use the Rust Playground
The Rust Playground allows easily trying out small snippets of code.
You can build the code (the default), and on the right you'll find tools such as cargo fmt (to reformat your code) and cargo clippy (to lint errors).
It's also very handy to share your code with others, as people can immediately start tinkering.
Perf Hint: Atomics & Alignment
The goal of the exercise was to use Mutex; however, it is not the best tool for the job here: an AtomicI32 would be equally suited, and would yield better performance.
For example, here is an atomic-based implementation (also available on the playground):
use std::sync::atomic::{AtomicI32, Ordering};

#[derive(Debug, Default)]
pub struct Counter {
    value: AtomicI32,
}

impl Counter {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn test_and_increment(&self) -> i32 {
        let value = self.value.fetch_add(1, Ordering::Relaxed);
        if value >= 250 {
            self.value.swap(0, Ordering::Relaxed)
        } else {
            0
        }
    }

    pub fn get(&self) -> i32 {
        self.value.load(Ordering::Relaxed)
    }

    pub fn add(&self, value: i32) {
        self.value.fetch_add(value, Ordering::Relaxed);
    }
}
This should be quite a bit faster.
Another avenue of improvement is alignment. CPUs in general operate at the cache-line level, which is often 64 bytes. This means that if two values are within the same 64 bytes, even though they are independent, writing to one requires exclusive access to both. This leads to a "ping-pong" if one core writes to one in a loop and another core writes to the other in a loop in parallel. This is called false sharing.
Intel CPUs are worse in this regard, in that they often grab 2 cache lines at a time instead of 1, and thus require values to be in separate 128-byte bins.
It can be beneficial to pad structures, or force their alignment, to avoid this. This is as simple as using repr with the align directive:
#[derive(Debug, Default)]
#[repr(align(128))]
pub struct Counter {
    value: AtomicI32,
}
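A quick way to check that the padding took effect is to inspect the type with std::mem: the align directive pads the 4-byte atomic out to a full 128 bytes, so adjacent counters in an array land in separate 128-byte bins.

```rust
use std::mem;
use std::sync::atomic::AtomicI32;

#[derive(Debug, Default)]
#[repr(align(128))]
pub struct Counter {
    value: AtomicI32,
}

fn main() {
    // Alignment is forced to 128, and size is rounded up to a
    // multiple of the alignment, so each Counter occupies 128 bytes.
    assert_eq!(mem::align_of::<Counter>(), 128);
    assert_eq!(mem::size_of::<Counter>(), 128);

    // In an array, no two counters share a 128-byte bin.
    let counters: [Counter; 4] = Default::default();
    assert_eq!(mem::size_of_val(&counters), 4 * 128);
}
```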
Comment (ybjeon01, Oct 12, 2022): Thank you so much for spending your time on this question. It's nice to know what else to study in Rust.
Comment: counter.test_and_increment() only returns non-zero when the count is above a threshold. So unless you massively fluke and hit the threshold on exactly iteration number 1,000,000,000, it won't return the final count. Or, put another way: right after the loop, try result += counter.get();.
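Following up on that last comment, here is a minimal single-threaded sketch of the fix (using the mutex-based counter and a small iteration count for illustration): the loop only accumulates flushed batches of 250, so any below-threshold remainder must be read out with get() afterwards.

```rust
use std::mem;
use std::sync::Mutex;

pub struct Counter {
    value: Mutex<i32>,
}

impl Counter {
    pub fn new() -> Self {
        Counter { value: Mutex::new(0) }
    }

    // Returns the flushed batch when the threshold is reached, else 0.
    pub fn test_and_increment(&self) -> i32 {
        let mut value = self.value.lock().unwrap();
        *value += 1;
        if *value >= 250 {
            mem::replace(&mut *value, 0)
        } else {
            0
        }
    }

    pub fn get(&self) -> i32 {
        *self.value.lock().unwrap()
    }
}

fn main() {
    let counter = Counter::new();
    let mut result = 0;
    for _ in 0..1_003 {
        result += counter.test_and_increment();
    }
    // The loop flushed 4 batches of 250 (= 1000); the remaining 3
    // increments still sit inside the counter below the threshold.
    result += counter.get();
    assert_eq!(result, 1_003);
    println!("{result}");
}
```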