By RubberDuck


2019-01-07 12:45:31 8 Comments

I was bored the other day and got to wondering how I would implement a circular FIFO buffer in Rust. My goal was to create an implementation I could use on a bare metal microcontroller. This means no dynamic allocation and a fixed size known at compile time, so I used a primitive fixed size array instead of a vector as a backing store.

My next pass at this will be to make the type stored in the FIFO buffer generic and to find a way to specify the capacity, but I wanted to get some feedback before doing so. Am I missing any test cases? What can I improve here?

src/lib.rs

const FIFO_CAPACITY: usize = 32;

pub struct Fifo {
    size: usize,
    read_idx: usize,
    write_idx: usize,
    buffer: [u8; FIFO_CAPACITY]
}

impl Fifo {
    pub fn new() -> Fifo {
        Fifo {
            size: 0,
            read_idx: 0,
            write_idx: 0,
            buffer: [0; FIFO_CAPACITY]
        }
    }

    pub fn push(&mut self, item: u8) -> Result<(), &'static str> {
        if self.buffer_full() {
            Err("Buffer full.")
        } else {
            self.buffer[self.write_idx] = item;

            self.write_idx = Fifo::increment_index(self.write_idx);
            self.size = self.size + 1;

            Ok(())
        }
    }

    pub fn pop(&mut self) -> Option<u8> {
        if self.size == 0 {
            None
        } else {
            let result = self.buffer[self.read_idx];

            self.read_idx = Fifo::increment_index(self.read_idx); 
            self.size = self.size - 1;

            Some(result)        
        }
    }

    pub fn buffer_full(&self) -> bool {
        self.size == FIFO_CAPACITY
    }

    fn increment_index(idx: usize) -> usize {
        (idx + 1) % FIFO_CAPACITY
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pop_item_that_was_pushed_to_buffer() {
        let mut buffer = Fifo::new();
        assert!(buffer.push(5).is_ok());
        let pop_result = buffer.pop();
        assert_eq!(Some(5), pop_result);
    }

    #[test]
    fn popping_empty_buffer_returns_none() {
        let mut buffer = Fifo::new();
        assert_eq!(None, buffer.pop());
    }

    #[test]
    fn popping_returns_first_pushed_first() {
        let mut buffer = Fifo::new();
        assert!(buffer.push(1).is_ok());
        assert!(buffer.push(2).is_ok());

        assert_eq!(Some(1), buffer.pop());
        assert_eq!(Some(2), buffer.pop());
    }

    #[test]
    fn pop_beyond_write_index_returns_none() {
        let mut buffer = Fifo::new();
        assert!(buffer.push(1).is_ok());
        assert_eq!(Some(1), buffer.pop());
        assert_eq!(None, buffer.pop());
    }

    #[test]
    fn pop_beyond_write_index_continuing_on_works() {
        let mut buffer = Fifo::new();
        assert!(buffer.push(1).is_ok());
        assert_eq!(Some(1), buffer.pop());
        assert_eq!(None, buffer.pop());

        assert!(buffer.push(2).is_ok());
        assert_eq!(Some(2), buffer.pop());
    }

    #[test]
    fn buffer_wraps_around() {
        let mut buffer = Fifo::new();
        let capacity = FIFO_CAPACITY as u8;
        for x in 0..=(capacity * 3) {
            assert!(buffer.push(x).is_ok());
            assert_eq!(Some(x), buffer.pop());
        }
    }

    #[test]
    fn push_fails_if_buffer_is_full() {
        let mut buffer = Fifo::new();
        let capacity = FIFO_CAPACITY as u8;

        for x in 0..(capacity) {
            assert!(buffer.push(x).is_ok());
        }

        assert!(buffer.push(capacity + 1).is_err())
    }
}

2 comments

@Ilmari Karonen 2019-01-07 19:31:44

As a general remark, note that your FIFO always satisfies the invariant:

write_idx == (read_idx + size) % FIFO_CAPACITY

Thus, if you wanted to save some space, you could get rid of the write_idx property entirely and rewrite your push method as:

pub fn push(&mut self, item: u8) -> Result<(), &'static str> {
    if self.buffer_full() {
        Err("Buffer full.")
    } else {
        let write_idx = (self.read_idx + self.size) % FIFO_CAPACITY;
        self.buffer[write_idx] = item;

        self.size = self.size + 1;

        Ok(())
    }
}

(I hope that's syntactically correct, anyway... my Rust is rather, um, rusty and I haven't actually tested to see if the code above compiles. If not, please assume it's meant to be pseudocode.)

Note that storing only read_idx and write_idx and getting rid of size instead would not work, since there are two different situations where read_idx == write_idx: when the buffer is empty, and when it is full. Storing size explicitly lets you differentiate between those two cases, since an empty FIFO has size == 0 while a full one has size == FIFO_CAPACITY.
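To make the invariant and the empty/full ambiguity concrete, here is a minimal sketch. It re-declares a trimmed copy of the Fifo from the question so that it stands alone; the two helper methods invariant_holds and indices_coincide are hypothetical names added purely for illustration and are not part of the original code.

```rust
const FIFO_CAPACITY: usize = 32;

pub struct Fifo {
    size: usize,
    read_idx: usize,
    write_idx: usize,
    buffer: [u8; FIFO_CAPACITY],
}

impl Fifo {
    pub fn new() -> Fifo {
        Fifo { size: 0, read_idx: 0, write_idx: 0, buffer: [0; FIFO_CAPACITY] }
    }

    pub fn push(&mut self, item: u8) -> Result<(), &'static str> {
        if self.size == FIFO_CAPACITY {
            return Err("Buffer full.");
        }
        self.buffer[self.write_idx] = item;
        self.write_idx = (self.write_idx + 1) % FIFO_CAPACITY;
        self.size += 1;
        Ok(())
    }

    pub fn pop(&mut self) -> Option<u8> {
        if self.size == 0 {
            return None;
        }
        let result = self.buffer[self.read_idx];
        self.read_idx = (self.read_idx + 1) % FIFO_CAPACITY;
        self.size -= 1;
        Some(result)
    }

    /// Hypothetical helper: checks write_idx == (read_idx + size) % FIFO_CAPACITY.
    pub fn invariant_holds(&self) -> bool {
        self.write_idx == (self.read_idx + self.size) % FIFO_CAPACITY
    }

    /// Hypothetical helper: true when both indices point at the same slot,
    /// which is the case for an empty buffer and for a full one.
    pub fn indices_coincide(&self) -> bool {
        self.read_idx == self.write_idx
    }
}
```

Calling invariant_holds after every push and pop in a test is a cheap way to confirm that write_idx carries no information beyond read_idx and size, while indices_coincide returning true for both a fresh and a completely filled buffer shows why size cannot be the field that gets dropped.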


Also, since you say this code is intended for a microcontroller, it's worth keeping in mind that division and remainder can be rather slow operations on low-end microcontrollers.

If your FIFO capacity is always a power of two (like it is in your example code), and given that you're working with unsigned integers, it's likely that the compiler will be able to optimize the % FIFO_CAPACITY operation into a bitwise AND, in which case your current code is probably fine. Otherwise, however, you may want to consider manually replacing the remainder operation with a comparison, something like this:

fn increment_index(idx: usize) -> usize {
    if idx + 1 < FIFO_CAPACITY {
        idx + 1
    } else {
        0
    }
}

The compiler will probably not be able to make this optimization automatically, since this function will behave differently than your original if idx >= FIFO_CAPACITY. We know that can never actually happen in your code, but the compiler (probably) isn't that smart.

Of course, you should probably benchmark both versions of the code, and/or examine the generated assembly code, to see what the compiler actually does with them and whether it actually makes any difference in practice. You might even consider using different code paths for the cases where FIFO_CAPACITY is a power of two, since in those cases using bit masking is probably faster.


Also, to combine the two optimizations above, I would recommend replacing

self.read_idx = Fifo::increment_index(self.read_idx);

in your pop method with something like

self.read_idx = Fifo::wrap_around(self.read_idx + 1);

where the wrap_around function would look e.g. like this:

fn wrap_around(idx: usize) -> usize {
    if idx < FIFO_CAPACITY {
        idx
    } else {
        idx - FIFO_CAPACITY
    }
}

This returns the same results as idx % FIFO_CAPACITY for all values of idx < 2*FIFO_CAPACITY, and thus allows you to also replace:

let write_idx = (self.read_idx + self.size) % FIFO_CAPACITY;

with:

let write_idx = Fifo::wrap_around(self.read_idx + self.size);
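The claim that wrap_around matches idx % FIFO_CAPACITY only below 2*FIFO_CAPACITY is easy to check exhaustively. The sketch below assumes a capacity of 37 (any positive value would do) and uses a hypothetical helper, agrees_with_remainder_below, that exists only for this check.

```rust
const FIFO_CAPACITY: usize = 37;

fn wrap_around(idx: usize) -> usize {
    if idx < FIFO_CAPACITY {
        idx
    } else {
        idx - FIFO_CAPACITY
    }
}

/// Hypothetical check: true if wrap_around agrees with `%`
/// for every index strictly below `limit`.
fn agrees_with_remainder_below(limit: usize) -> bool {
    (0..limit).all(|idx| wrap_around(idx) == idx % FIFO_CAPACITY)
}
```

With these definitions, agrees_with_remainder_below(2 * FIFO_CAPACITY) holds, while the first disagreement is at idx == 2 * FIFO_CAPACITY, where a single subtraction leaves FIFO_CAPACITY instead of 0.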

Addendum: It turns out that godbolt.org supports Rust, so we can do some experiments to see how these changes affect the generated assembly. Unfortunately, it seems that for now the only supported target is x86, but that should be enough for a general idea of what the compiler is likely to do.

First, let's take a look at your original code, with FIFO_CAPACITY set to 32. I'll compile it with the -O switch, which enables a moderate level of compiler optimization.

With these settings, the code for your increment_index compiles down to just:

    add     eax, 1
    and     eax, 31

That's one increment and one bitwise AND with FIFO_CAPACITY - 1, which is about as good as we can possibly expect. So, as expected, for power-of-two buffer sizes, your increment_index is just about as efficient as it can be.

But wait! Looking at the surrounding code, there is something slightly suboptimal. For example, here's the assembly code for the else branch of push (the code for pop is pretty similar):

    mov     eax, esi
    mov     rsi, qword ptr [rdi + 16]
    cmp     rsi, 31
    ja      .LBB1_5
    mov     byte ptr [rdi + rsi + 24], al

    mov     eax, dword ptr [rdi + 16]
    add     eax, 1
    and     eax, 31
    mov     qword ptr [rdi + 16], rax

    add     qword ptr [rdi], 1

Here, the first five lines actually write the item into the buffer, the next four increment write_idx, and the last line increments size. I've added blank lines between these for readability.

Can you spot something that could be optimized here?

Well, first of all, the code is loading self.write_idx twice from memory (at [rdi + 16]), instead of reusing the value already in rsi, which AFAICT is really just a missed optimization that the compiler could have done but for some reason wasn't smart enough to do. (Presumably the mix of 64-bit and 32-bit operations somehow confused it.)

But what caught my eye was the cmp rsi, 31 / ja .LBB1_5 pair. What is it? It's the automatic array bounds check inserted by the Rust compiler! Of course, we know that it's redundant here, since the index can never point outside the buffer, but the compiler doesn't. It can't know it, really, since it's only looking at one function at a time, and so for all it knows, some other code might have set write_idx to any random value before the call to push.

One thing we could do to fix that would be to add a redundant

self.write_idx = self.write_idx % FIFO_CAPACITY;

at the beginning of the else block. Hopefully, the compiler should be smart enough to optimize that to a single and rsi, 31 and then skip the bounds check. Let's see if it's smart enough to do that:

    mov     rcx, qword ptr [rdi + 16]
    mov     edx, ecx
    and     edx, 31
    mov     byte ptr [rdi + rdx + 24], sil

    add     ecx, 1
    and     ecx, 31
    mov     qword ptr [rdi + 16], rcx

    add     rax, 1
    mov     qword ptr [rdi], rax

Yes, yes it is! No more comparisons, no more branches, just a simple linear chain of movs, ands and adds. And this time the compiler is also a lot smarter about memory accesses, loading write_idx from memory only once and also reusing the value of size that it loaded into rax earlier for the buffer_full test.

But it still needs to do a redundant second and after the increment. While the actual cost of that and is probably negligible, it would still be nice to get rid of it. Can we? Yes, simply by not reducing the index modulo FIFO_CAPACITY after the increment. After all, we know it's going to get wrapped around on the next call anyway! With all these changes, here's what our optimized push looks like:

pub fn push(&mut self, item: u8) -> Result<(), &'static str> {
    if self.buffer_full() {
        Err("Buffer full.")
    } else {
        self.write_idx = self.write_idx % FIFO_CAPACITY;
        self.buffer[self.write_idx] = item;

        self.write_idx = self.write_idx + 1;
        self.size = self.size + 1;

        Ok(())
    }
}

Let's try compiling it:

    mov     rcx, qword ptr [rdi + 16]
    and     ecx, 31
    mov     byte ptr [rdi + rcx + 24], sil

    add     rcx, 1
    mov     qword ptr [rdi + 16], rcx

    add     rax, 1
    mov     qword ptr [rdi], rax

Curiously, not only is the redundant and gone, but the compiler now also realized that it doesn't actually need to copy the value of write_idx into both ecx and edx, when just one register will do. So this simple change saved us not one but two redundant instructions per push / pop.

But wait, what about my suggestion above of getting rid of write_idx entirely? Of course, it's mostly a memory usage optimization, but at least it would be nice to check that it doesn't make the generated assembly code too much less efficient.

Well, the code is already up there, so let's try compiling it:

    mov     ecx, dword ptr [rdi + 8]
    add     ecx, eax
    and     ecx, 31
    mov     byte ptr [rdi + rcx + 16], sil

    add     rax, 1
    mov     qword ptr [rdi], rax

Well, that looks even better than before! We save one memory store per push, since we now only need to increment size. Of course, it would be nice to see if other target architectures produce similar results, and maybe run some benchmarks. But I'd definitely expect these changes (getting rid of write_idx entirely, and reducing read_idx modulo FIFO_CAPACITY at the beginning of pop, rather than at the end) to give a real and measurable, if possibly minor, performance improvement.
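Put together, the two changes might look roughly like the following. This is only a sketch under the assumption that FIFO_CAPACITY stays a power of two; it has not been benchmarked, and the pop body is my reading of "reduce at the beginning, increment without reducing at the end" rather than code taken from the experiments above.

```rust
const FIFO_CAPACITY: usize = 32; // assumed to be a power of two in this sketch

pub struct Fifo {
    size: usize,
    read_idx: usize,
    buffer: [u8; FIFO_CAPACITY],
}

impl Fifo {
    pub fn new() -> Fifo {
        Fifo { size: 0, read_idx: 0, buffer: [0; FIFO_CAPACITY] }
    }

    pub fn push(&mut self, item: u8) -> Result<(), &'static str> {
        if self.size == FIFO_CAPACITY {
            Err("Buffer full.")
        } else {
            // The write position is derived from read_idx and size,
            // so no separate write_idx field is stored.
            let write_idx = (self.read_idx + self.size) % FIFO_CAPACITY;
            self.buffer[write_idx] = item;
            self.size += 1;
            Ok(())
        }
    }

    pub fn pop(&mut self) -> Option<u8> {
        if self.size == 0 {
            None
        } else {
            // Reduce first, so the index is provably in range right before
            // the array access; the increment below is left unreduced until
            // the next call.
            self.read_idx %= FIFO_CAPACITY;
            let result = self.buffer[self.read_idx];
            self.read_idx += 1;
            self.size -= 1;
            Some(result)
        }
    }
}
```

Note that read_idx may temporarily equal FIFO_CAPACITY between calls here. That is harmless because every use of it goes through a % FIFO_CAPACITY first, but it is worth a comment in real code.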


OK, but what about non-power-of-two buffer sizes? Well, ideally, you'd probably want to avoid them entirely, if you want to maximize performance. But what if you just had to use a buffer capacity that wasn't a power of two?

Well, here's what happens to the same else block of my optimized push with FIFO_CAPACITY set to 37:

    mov     rcx, qword ptr [rdi + 8]
    add     rcx, r8

    movabs  rdx, -2492803253203993461
    mov     rax, rcx
    mul     rdx
    shr     rdx, 5
    lea     rax, [rdx + 8*rdx]
    lea     rax, [rdx + 4*rax]
    sub     rcx, rax

    mov     byte ptr [rdi + rcx + 16], sil

    add     r8, 1
    mov     qword ptr [rdi], r8

Wait, what?! OK, the first two instructions are clear enough, at least if we look a few lines back and see that r8 contains the value of self.size, while [rdi + 8] is the address of self.read_idx. And the last three instructions (for actually storing the item in the buffer and incrementing size) are pretty much the same as before. But what the heck happens in between?

Well, what happens is reciprocal multiplication. Basically, since division is one of the slowest arithmetic operations on any modern CPU, compilers use clever arithmetic tricks to replace division (and modulo) by a constant with a combination of multiplication and shifts.

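So, basically, instead of calculating idx % 37 directly, the assembly code generated by the compiler effectively calculates

idx - ((0xDD67C8A60DD67C8B * idx) >> 69) * 37

where the product is the full 128-bit result of an unsigned 64-bit multiplication (the mul instruction leaves the high half in rdx, so the shr rdx, 5 amounts to a total shift of 64 + 5 = 69 bits). The constant 0xDD67C8A60DD67C8B, which the disassembly prints as the signed value -2492803253203993461, is 2^69 / 37 rounded up, i.e. a fixed-point reciprocal of 37, and the final multiplication by 37 before the subtraction is itself optimized into two lea instructions.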

(And yes, you can verify that something similar happens with your original code as well.)
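If you'd rather not take the disassembly on faith, the trick can be replayed in plain Rust. The sketch below mirrors the mul / shr / lea / sub sequence with 128-bit arithmetic: taking the high 64 bits of the product and shifting them right by 5 is the same as shifting the full product right by 69. The names RECIPROCAL_OF_37 and remainder_by_37 are made up for this example.

```rust
/// 2^69 / 37 rounded up; the same bit pattern as the constant in the
/// disassembly, written as an unsigned value.
const RECIPROCAL_OF_37: u64 = 0xDD67_C8A6_0DD6_7C8B;

/// Hypothetical helper that computes idx % 37 without a division.
fn remainder_by_37(idx: u64) -> u64 {
    // `mul` produces a 128-bit product; shifting its high half right by 5
    // equals shifting the whole product right by 64 + 5 = 69 bits.
    let quotient = ((u128::from(idx) * u128::from(RECIPROCAL_OF_37)) >> 69) as u64;
    // The two `lea` instructions compute quotient * 37; subtracting that
    // from idx leaves the remainder.
    idx - quotient * 37
}
```

Comparing remainder_by_37(idx) against idx % 37 over a range of inputs (including u64::MAX) is a quick sanity check that the constant and the shift amount were read correctly.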

OK, so that's very clever of the compiler. But surely we can do better, especially given that multiplication can also be pretty slow on microcontrollers?

Well, let's try the alternative wrap-around implementation I suggested above:

pub fn push(&mut self, item: u8) -> Result<(), &'static str> {
    if self.buffer_full() {
        Err("Buffer full.")
    } else {
        let write_idx = Fifo::wrap_around(self.read_idx + self.size);
        self.buffer[write_idx] = item;
        self.size = self.size + 1;
        Ok(())
    }
}

pub fn pop(&mut self) -> Option<u8> {
    if self.size == 0 {
        None
    } else {
        self.read_idx = Fifo::wrap_around(self.read_idx); 
        let result = self.buffer[self.read_idx];
        self.read_idx = self.read_idx + 1;
        self.size = self.size - 1;
        Some(result)        
    }
}

fn wrap_around(idx: usize) -> usize {
    if idx < FIFO_CAPACITY {
        idx
    } else {
        idx - FIFO_CAPACITY
    }
}

Compiling it, this is what we get for, once more, the else block of push (which seems to be a pretty good representative section; if you want to see what happens to pop, check out the godbolt.org links yourself):

    mov     rcx, qword ptr [rdi + 8]

    lea     rdx, [rcx + rax]
    cmp     rdx, 37
    lea     rax, [rcx + rax - 37]
    cmovb   rax, rdx

    cmp     rax, 36
    ja      .LBB1_5
    mov     byte ptr [rdi + rax + 16], sil

    add     qword ptr [rdi], 1

Yeah, that's a bit more like it. Instead of all that reciprocal multiplication stuff, we just have a comparison and a conditional register move (cmovb) to select between read_idx + size and read_idx + size - FIFO_CAPACITY without branches.

Alas, the array bounds check is also back, since the compiler can no longer be sure that a single subtraction of FIFO_CAPACITY is really enough to reduce the index into the valid range. Unfortunately, I don't think there's any way to avoid that, but even with that redundant never-taken branch, this code should still be faster than the reciprocal multiplication, at least on CPUs without a fast hardware multiplier.

Speaking of bounds checks, the implementation above has a non-trivial edge case that should be well tested: when the buffer is full, it's possible for both self.read_idx and self.size to simultaneously equal FIFO_CAPACITY, causing Fifo::wrap_around(self.read_idx + self.size) not to be a valid index into the buffer. For example, this happens after pushing FIFO_CAPACITY items, popping all of them (which leaves read_idx at FIFO_CAPACITY, since pop only wraps the index on its next call), and then pushing FIFO_CAPACITY more items. Fortunately, that situation can only occur when the buffer is full, in which case pushing more items will fail anyway, so the invalid array access will never actually be attempted. But it definitely should be thoroughly tested and documented.

(Also, it seems that in this particular case the compiler forgot to save the value of self.size before clobbering the rax register, so it has to reload it again for the increment. At least it should be cached.)
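Since the edge case with read_idx and size both reaching FIFO_CAPACITY deserves a test of its own, here is a self-contained sketch of the wrap_around-based FIFO that such a test could drive. The capacity of 37 matches the experiment above; the accessors raw_read_idx and len are hypothetical additions that only exist so a test can observe the internal state.

```rust
const FIFO_CAPACITY: usize = 37; // deliberately not a power of two

pub struct Fifo {
    size: usize,
    read_idx: usize,
    buffer: [u8; FIFO_CAPACITY],
}

impl Fifo {
    pub fn new() -> Fifo {
        Fifo { size: 0, read_idx: 0, buffer: [0; FIFO_CAPACITY] }
    }

    pub fn push(&mut self, item: u8) -> Result<(), &'static str> {
        if self.size == FIFO_CAPACITY {
            // This guard is what keeps the out-of-range index from being used.
            Err("Buffer full.")
        } else {
            let write_idx = Fifo::wrap_around(self.read_idx + self.size);
            self.buffer[write_idx] = item;
            self.size += 1;
            Ok(())
        }
    }

    pub fn pop(&mut self) -> Option<u8> {
        if self.size == 0 {
            None
        } else {
            self.read_idx = Fifo::wrap_around(self.read_idx);
            let result = self.buffer[self.read_idx];
            self.read_idx += 1; // may reach FIFO_CAPACITY until the next pop
            self.size -= 1;
            Some(result)
        }
    }

    fn wrap_around(idx: usize) -> usize {
        if idx < FIFO_CAPACITY { idx } else { idx - FIFO_CAPACITY }
    }

    /// Hypothetical accessor, for tests only: the unreduced read index.
    pub fn raw_read_idx(&self) -> usize {
        self.read_idx
    }

    /// Hypothetical accessor, for tests only: number of stored items.
    pub fn len(&self) -> usize {
        self.size
    }
}
```

A test can fill the buffer, drain it, and fill it again; at that point raw_read_idx() and len() should both equal FIFO_CAPACITY, a further push should return Err, and the items should still pop in insertion order.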


Unfortunately, while the wrap_around implementation above is more efficient than just using idx % FIFO_CAPACITY for non-power-of-two buffer sizes, it's less efficient when the capacity is a power of two. But with a bit of cleverness and trust in the compiler's optimization (specifically, constant folding and dead code elimination) skills, we can actually get the best of both worlds:

fn wrap_around(idx: usize) -> usize {
    if FIFO_CAPACITY & (FIFO_CAPACITY-1) == 0 {
        idx % FIFO_CAPACITY  // capacity is a power of 2
    } else if idx < FIFO_CAPACITY {
        idx
    } else {
        idx - FIFO_CAPACITY
    }
}

The expression FIFO_CAPACITY & (FIFO_CAPACITY-1) evaluates to zero if and only if FIFO_CAPACITY is a power of two (or zero, but that's not a valid capacity anyway). Since FIFO_CAPACITY is a constant, the compiler will evaluate it at compile time, and optimize away the branch that isn't taken. Thus, we get both highly efficient code for power-of-two sizes, and slightly slower but still pretty well optimized assembly for sizes that are not powers of two.
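For anyone who wants to convince themselves of the bit trick, the sketch below checks it against the standard library's usize::is_power_of_two. The helper name is_power_of_two_or_zero is made up, and it takes the capacity as an argument (rather than reading the constant) only so that many values can be tested.

```rust
/// Hypothetical helper: the bit trick from the text, with the capacity
/// passed in as an argument so it can be checked for many values.
fn is_power_of_two_or_zero(capacity: usize) -> bool {
    // wrapping_sub avoids an overflow panic for capacity == 0;
    // in Rust, `&` binds more tightly than `==`, so no parentheses are needed.
    capacity & capacity.wrapping_sub(1) == 0
}
```

For every non-zero capacity this agrees with capacity.is_power_of_two(), so in real code FIFO_CAPACITY.is_power_of_two() is probably the more readable way to spell the condition, and it also rejects zero.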

@RubberDuck 2019-01-07 21:32:04

The write index is not a function of read index. If I write twice, but never pop, the write index moves while the read index stays stationary.

@RubberDuck 2019-01-07 21:33:22

I’m also extremely dubious of the “optimization” you propose. I’m willing to bet the assembly is nearly identical.

@Bergi 2019-01-07 21:39:49

@RubberDuck The write index is a function of read index and size.

@return true 2019-01-07 22:59:20

It's more a question of consistency (the invariant stated in the answer always holds) than a question of space optimization.

@hellow 2019-01-08 07:29:49

You can choose the target architecture by using --target=arm-unknown-linux-gnueabi. To get a list of known targets use --print target-list.

@RubberDuck 2019-01-08 12:50:16

I appreciate the effort, but it does nothing to convince me. A) This makes the code harder to read, and until/unless I use it in a real situation where I can prove this is an actual bottleneck, this makes the code worse. B) I'm not targeting x86_64, so it's likely the assembly would be different for my target. C) If I was interested in this kind of micro optimization, I would inline the assembly code.

@hellow 2019-01-07 13:14:12

Your code looks pretty decent. My two cents:

  1. Implement Default

    impl Default for Fifo {
        fn default() -> Fifo {
            Fifo {
                size: 0,
                read_idx: 0,
                write_idx: 0,
                buffer: [0; FIFO_CAPACITY],
            }
        }
    }
    

    Then you could simplify your new to:

    pub fn new() -> Fifo {
        Fifo::default()
    }
    
  2. Simplify expressions

    Line 27 (in push): self.size = self.size + 1 will become self.size += 1
    Line 40 (in pop): self.size = self.size - 1 will become self.size -= 1

  3. Replace str error with enums

    pub enum FifoError {
        FifoFull,
    }
    
    pub fn push(&mut self, item: u8) -> Result<(), FifoError> {
        if self.buffer_full() {
            Err(FifoError::FifoFull)
        } else {
            ...
    }
    
  4. Add assert_eq!(None, buffer.pop()); to the end of every test where feasible, e.g.

    • pop_item_that_was_pushed_to_buffer
    • popping_returns_first_pushed_first
    • pop_beyond_write_index_continuing_on_works
    • buffer_wraps_around

For further exercises I would recommend:

  1. Implement Iterator/IntoIterator/FromIterator
  2. Next, implement Debug, which is fairly easy (Hint: Take a look at the implementation of Debug for slice)
  3. Make it accept a generic type
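To give an idea of where several of these suggestions could lead when combined, here is a rough sketch of a generic version with an enum error type and a Default impl. It assumes a toolchain with const generics (stable since Rust 1.51, so not available when this review was written), and the T: Copy + Default bound is just one simple way to initialise the backing array, not the only option.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum FifoError {
    FifoFull,
}

pub struct Fifo<T, const N: usize> {
    size: usize,
    read_idx: usize,
    write_idx: usize,
    buffer: [T; N],
}

impl<T: Copy + Default, const N: usize> Default for Fifo<T, N> {
    fn default() -> Self {
        Fifo {
            size: 0,
            read_idx: 0,
            write_idx: 0,
            // Array repeat expressions need T: Copy here.
            buffer: [T::default(); N],
        }
    }
}

impl<T: Copy + Default, const N: usize> Fifo<T, N> {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn push(&mut self, item: T) -> Result<(), FifoError> {
        if self.size == N {
            Err(FifoError::FifoFull)
        } else {
            self.buffer[self.write_idx] = item;
            self.write_idx = (self.write_idx + 1) % N;
            self.size += 1;
            Ok(())
        }
    }

    pub fn pop(&mut self) -> Option<T> {
        if self.size == 0 {
            None
        } else {
            let result = self.buffer[self.read_idx];
            self.read_idx = (self.read_idx + 1) % N;
            self.size -= 1;
            Some(result)
        }
    }
}
```

A buffer is then declared with its element type and capacity in the type, e.g. let mut fifo: Fifo<u16, 4> = Fifo::new();. Nothing here allocates or needs std, so it should still fit the bare metal goal from the question.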

@RubberDuck 2019-01-07 13:20:29

Nice review! I think I might actually implement a custom error type instead of the enum. Would you mind explaining the benefit of implementing Default for the struct? I think I understand what it does, but don't quite see how that helps me in this particular case.

@hellow 2019-01-07 13:35:39

Some traits or methods require Default, e.g. Vec::resize_default, which is a more convenient way of resize.

@hellow 2019-01-07 13:40:36

It's getting even more fancy if you do something like this

@hellow 2019-01-07 13:45:36

Quoting Matthieu: "There's no introspection in Rust, you can't try to invoke a constructor with no parameter and hope it works in generic code: a trait MUST describe the available functionality. Using Default is the way to ensure that whoever wishes to use the struct where Default is required will be able to without wrapping it."
