By RubberDuck


2019-01-07 12:45:31 8 Comments

I was bored the other day and got to wondering how I would implement a circular FIFO buffer in Rust. My goal was to create an implementation I could use on a bare metal microcontroller. This means no dynamic allocation and a fixed size known at compile time, so I used a primitive fixed size array instead of a vector as a backing store.

My next pass at this will be to make the type stored in the FIFO buffer generic and to find a way to specify the capacity, but I wanted to get some feedback before doing so. Am I missing any test cases? What can I improve here?

src/lib.rs

const FIFO_CAPACITY: usize = 32;

pub struct Fifo {
    size: usize,
    read_idx: usize,
    write_idx: usize,
    buffer: [u8; FIFO_CAPACITY]
}

impl Fifo {
    pub fn new() -> Fifo {
        Fifo {
            size: 0,
            read_idx: 0,
            write_idx: 0,
            buffer: [0; FIFO_CAPACITY]
        }
    }

    pub fn push(&mut self, item: u8) -> Result<(), &'static str> {
        if self.buffer_full() {
            Err("Buffer full.")
        } else {
            self.buffer[self.write_idx] = item;

            self.write_idx = Fifo::increment_index(self.write_idx);
            self.size = self.size + 1;

            Ok(())
        }
    }

    pub fn pop(&mut self) -> Option<u8> {
        if self.size == 0 {
            None
        } else {
            let result = self.buffer[self.read_idx];

            self.read_idx = Fifo::increment_index(self.read_idx); 
            self.size = self.size - 1;

            Some(result)        
        }
    }

    pub fn buffer_full(&self) -> bool {
        self.size == FIFO_CAPACITY
    }

    fn increment_index(idx: usize) -> usize {
        (idx + 1) % FIFO_CAPACITY
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pop_item_that_was_pushed_to_buffer() {
        let mut buffer = Fifo::new();
        assert!(buffer.push(5).is_ok());
        let pop_result = buffer.pop();
        assert_eq!(Some(5), pop_result);
    }

    #[test]
    fn popping_empty_buffer_returns_none() {
        let mut buffer = Fifo::new();
        assert_eq!(None, buffer.pop());
    }

    #[test]
    fn popping_returns_first_pushed_first() {
        let mut buffer = Fifo::new();
        assert!(buffer.push(1).is_ok());
        assert!(buffer.push(2).is_ok());

        assert_eq!(Some(1), buffer.pop());
        assert_eq!(Some(2), buffer.pop());
    }

    #[test]
    fn pop_beyond_write_index_returns_none() {
        let mut buffer = Fifo::new();
        assert!(buffer.push(1).is_ok());
        assert_eq!(Some(1), buffer.pop());
        assert_eq!(None, buffer.pop());
    }

    #[test]
    fn pop_beyond_write_index_continuing_on_works() {
        let mut buffer = Fifo::new();
        assert!(buffer.push(1).is_ok());
        assert_eq!(Some(1), buffer.pop());
        assert_eq!(None, buffer.pop());

        assert!(buffer.push(2).is_ok());
        assert_eq!(Some(2), buffer.pop());
    }

    #[test]
    fn buffer_wraps_around() {
        let mut buffer = Fifo::new();
        let capacity = FIFO_CAPACITY as u8;
        for x in 0..=(capacity * 3) {
            assert!(buffer.push(x).is_ok());
            assert_eq!(Some(x), buffer.pop());
        }
    }

    #[test]
    fn push_fails_if_buffer_is_full() {
        let mut buffer = Fifo::new();
        let capacity = FIFO_CAPACITY as u8;

        for x in 0..(capacity) {
            assert!(buffer.push(x).is_ok());
        }

        assert!(buffer.push(capacity + 1).is_err())
    }
}

2 comments

@Ilmari Karonen 2019-01-07 19:31:44

As a generic remark, note that your FIFO always satisfies the invariant:

write_idx == (read_idx + size) % FIFO_CAPACITY

Thus, if you wanted to save some space, you could get rid of the write_idx property entirely and rewrite your push method as:

pub fn push(&mut self, item: u8) -> Result<(), &'static str> {
    if self.buffer_full() {
        Err("Buffer full.")
    } else {
        let write_idx = Fifo::wrap_around(self.read_idx + self.size);
        self.buffer[write_idx] = item;

        self.size = self.size + 1;

        Ok(())
    }
}

fn wrap_around(idx: usize) -> usize {
    idx % FIFO_CAPACITY
}

Note that storing only read_idx and write_idx and getting rid of size instead would not work, since there are two different situations where read_idx == write_idx: when the buffer is empty, and when it is full. Storing size explicitly lets you differentiate between those two cases, since an empty FIFO has size == 0 while a full one has size == FIFO_CAPACITY.
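
To make that ambiguity concrete, here is a minimal sketch that tracks only the two indices of a 4-slot buffer. The `Indices` type, the `CAPACITY` constant and the helper functions are hypothetical names made up for this illustration; they are not part of the reviewed code.

```rust
// Hypothetical illustration: only the two indices, no `size` field.
const CAPACITY: usize = 4;

#[derive(Debug, Clone, Copy, PartialEq)]
struct Indices {
    read_idx: usize,
    write_idx: usize,
}

impl Indices {
    fn new() -> Indices {
        Indices { read_idx: 0, write_idx: 0 }
    }

    // Advance the write index as `push` would, ignoring any fullness check.
    fn after_push(self) -> Indices {
        Indices { write_idx: (self.write_idx + 1) % CAPACITY, ..self }
    }
}

// Index state of a freshly created (empty) buffer.
fn empty_state() -> Indices {
    Indices::new()
}

// Index state after pushing CAPACITY items without popping (full buffer).
fn full_state() -> Indices {
    let mut state = Indices::new();
    for _ in 0..CAPACITY {
        state = state.after_push();
    }
    state
}
```

Both functions return the same pair of indices, so a FIFO that stored nothing else could not tell an empty buffer from a full one.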

I would also replace the line

self.read_idx = Fifo::increment_index(self.read_idx);

in your pop method with

self.read_idx = Fifo::wrap_around(self.read_idx + 1);

and get rid of the increment_index method entirely, since it's kind of redundant with the more general-purpose wrap_around method above.


One interesting side effect of the push rewrite I suggested above is that (as seen below) it allows the compiler to omit the array bounds check, since it can tell that the index returned by the wrap_around method is always within the bounds of the array. We can enable the same optimization for pop by moving the wrap_around call before the array access, e.g. like this:

pub fn pop(&mut self) -> Option<u8> {
    if self.size == 0 {
        None
    } else {
        self.read_idx = Fifo::wrap_around(self.read_idx); 
        let result = self.buffer[self.read_idx];
        self.read_idx = self.read_idx + 1;
        self.size = self.size - 1;
        Some(result)        
    }
}

Note that, with this change, it becomes possible for self.read_idx to be equal to FIFO_CAPACITY after a call to pop. But that doesn't matter, since any values there will still be correctly wrapped before being used to access the buffer (but see the note at the end of the next section below!).


Also, since you say this code is intended for a microcontroller, it's worth keeping in mind that division and remainder can be rather slow operations on low-end microcontrollers.

If your FIFO capacity is always a power of two (like it is in your example code), and given that you're working with unsigned integers, it's likely that the compiler will be able to optimize the idx % FIFO_CAPACITY operation into a bitwise AND, in which case your current code is probably optimal. Otherwise, however, you may want to consider manually replacing the remainder operation with a comparison, something like this:

fn wrap_around(idx: usize) -> usize {
    if idx < FIFO_CAPACITY {
        idx
    } else {
        idx - FIFO_CAPACITY
    }
}

The compiler will not be able to make this optimization automatically, since this function will behave differently than your original if idx >= 2 * FIFO_CAPACITY. We know that can never actually happen in this code, but the compiler (probably) isn't that smart.
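
The difference between the two versions can be checked directly. The following is a small sketch, assuming a capacity of 37; the function names are invented for the comparison and do not appear in the original code.

```rust
const FIFO_CAPACITY: usize = 37; // assumed non-power-of-two capacity

// Remainder-based wrap, as in the original code.
fn wrap_by_remainder(idx: usize) -> usize {
    idx % FIFO_CAPACITY
}

// Comparison-based wrap; only equivalent for idx < 2 * FIFO_CAPACITY.
fn wrap_by_comparison(idx: usize) -> usize {
    if idx < FIFO_CAPACITY {
        idx
    } else {
        idx - FIFO_CAPACITY
    }
}

// Returns true if both versions agree for every index below `limit`.
fn versions_agree_below(limit: usize) -> bool {
    (0..limit).all(|idx| wrap_by_remainder(idx) == wrap_by_comparison(idx))
}
```

The two agree for every index below 2 * FIFO_CAPACITY and first diverge at exactly that value, where the remainder is 0 but the comparison version returns FIFO_CAPACITY.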

Unfortunately, while this version of wrap_around is more efficient than the original for non-power-of-two buffer sizes, it's likely to be less efficient when the capacity is a power of two. But with a bit of cleverness and trust in the compiler's optimization (specifically, constant folding and dead code elimination) skills, we can actually get optimal code for both cases, like this:

fn wrap_around(idx: usize) -> usize {
    if Fifo::is_power_of_2(FIFO_CAPACITY) {
        idx & (FIFO_CAPACITY - 1)  // faster when capacity is a power of 2
    } else if idx < FIFO_CAPACITY {
        idx
    } else {
        idx - FIFO_CAPACITY
    }
}

fn is_power_of_2(num: usize) -> bool {
    num & (num - 1) == 0
}

The expression num & (num - 1) evaluates to zero if and only if num is a power of two (or zero, but that's not a valid capacity anyway). Since FIFO_CAPACITY is a constant, the compiler will evaluate Fifo::is_power_of_2(FIFO_CAPACITY) at compile time, and optimize away the branch that isn't taken. Thus, we get both highly efficient code for power-of-two sizes, and nearly as fast code for sizes that are not powers of two.
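
As a quick sanity check of the bit trick, the sketch below lists which small values it accepts. Two things here are adjustments made for this illustration rather than part of the answer: `wrapping_sub` replaces the plain subtraction so that an input of 0 does not trip the overflow check in a debug build, and the two `*_below` helpers are hypothetical. The standard library's `usize::is_power_of_two` is shown alongside for comparison; unlike the bit trick, it rejects 0.

```rust
// Bit trick from the answer, with `wrapping_sub` so that 0 is handled
// without an overflow panic in debug builds.
fn is_power_of_2(num: usize) -> bool {
    num & num.wrapping_sub(1) == 0
}

// Values below `limit` that the bit trick accepts.
fn accepted_below(limit: usize) -> Vec<usize> {
    (0..limit).filter(|&n| is_power_of_2(n)).collect()
}

// Values below `limit` accepted by the standard library method.
fn std_accepted_below(limit: usize) -> Vec<usize> {
    (0..limit).filter(|n| n.is_power_of_two()).collect()
}
```

For a capacity constant that is never zero, either check could serve as the compile-time condition in wrap_around.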

Ps. The combination of all these optimizations does create a somewhat subtle edge case: with the optimized pop implementation, it's possible for both self.read_idx and self.size to equal FIFO_CAPACITY when the buffer is full, potentially causing Fifo::wrap_around(self.read_idx + self.size) not to be a valid index into the buffer if the buffer size is not a power of two. (This can happen e.g. after pushing FIFO_CAPACITY items into a new FIFO, popping them all off and then pushing FIFO_CAPACITY more items again.) Fortunately, this can only occur when the buffer is full, in which case pushing more items will fail anyway, so the invalid array access will never actually be attempted. (And of course we're still using Rust, so the compiler does add bounds checks to make sure of that!) But it's a case that should definitely be tested.
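
A test for that edge case might look roughly like the sketch below. It assumes a capacity of 37 and combines the push and pop rewrites with the comparison-based wrap_around; the `survives_fill_drain_refill` helper is a hypothetical name, and its body could just as well live in a `#[test]` function.

```rust
const FIFO_CAPACITY: usize = 37; // assumed non-power-of-two capacity

pub struct Fifo {
    size: usize,
    read_idx: usize,
    buffer: [u8; FIFO_CAPACITY],
}

impl Fifo {
    pub fn new() -> Fifo {
        Fifo { size: 0, read_idx: 0, buffer: [0; FIFO_CAPACITY] }
    }

    pub fn push(&mut self, item: u8) -> Result<(), &'static str> {
        if self.size == FIFO_CAPACITY {
            Err("Buffer full.")
        } else {
            let write_idx = Fifo::wrap_around(self.read_idx + self.size);
            self.buffer[write_idx] = item;
            self.size += 1;
            Ok(())
        }
    }

    pub fn pop(&mut self) -> Option<u8> {
        if self.size == 0 {
            None
        } else {
            // Wrap before the access; read_idx may end up equal to FIFO_CAPACITY.
            self.read_idx = Fifo::wrap_around(self.read_idx);
            let result = self.buffer[self.read_idx];
            self.read_idx += 1;
            self.size -= 1;
            Some(result)
        }
    }

    fn wrap_around(idx: usize) -> usize {
        if idx < FIFO_CAPACITY { idx } else { idx - FIFO_CAPACITY }
    }
}

// Fill the buffer, drain it, then fill it again. Returns true if every step
// behaves as a FIFO should, including the rejected push on the full buffer.
pub fn survives_fill_drain_refill() -> bool {
    let mut fifo = Fifo::new();
    let capacity = FIFO_CAPACITY as u8;

    // First batch: after draining it, read_idx == FIFO_CAPACITY.
    for x in 0..capacity {
        if fifo.push(x).is_err() { return false; }
    }
    for x in 0..capacity {
        if fifo.pop() != Some(x) { return false; }
    }

    // Second batch: read_idx and size both reach FIFO_CAPACITY here.
    for x in 0..capacity {
        if fifo.push(x + 100).is_err() { return false; }
    }
    if fifo.push(0).is_ok() { return false; }

    (0..capacity).all(|x| fifo.pop() == Some(x + 100)) && fifo.pop().is_none()
}
```

The second batch is offset by 100 so that a stale value left over from the first batch would be noticed when popping.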


Addendum: It turns out that godbolt.org supports Rust, so we can do some experiments to see how these changes affect the generated assembly.

First, let's take a look at your original code, with FIFO_CAPACITY set to 32. I'll compile it with the -O switch, which enables a moderate level of compiler optimization, and with --target=arm-unknown-linux-gnueabi to produce ARM instead of x86 assembly (thanks, hellow!).

Here's what your push and pop methods look like in ARM assembly, with some manual annotations for readers not so familiar with the syntax. Note how the calls to buffer_full and increment_index have been inlined:

example::Fifo::push:
        ldr     r2, [r0]        @ r2 = self.size
        cmp     r2, #32         @ r2 == FIFO_CAPACITY?

        ldreq   r0, .LCPI1_0    @ Err("Buffer full.")
        moveq   r1, #12
        addeq   r0, pc, r0
        bxeq    lr

        ldr     r2, [r0, #8]    @ r2 = self.write_idx
        cmp     r2, #31         @ [array bounds check]

        addls   r2, r0, r2      @ self.buffer[r2] = r1
        strbls  r1, [r2, #12]

        ldrls   r1, [r0]        @ r1 = self.size
        ldrls   r2, [r0, #8]    @ r2 = self.write_idx
        addls   r1, r1, #1      @ r1 = r1 + 1
        strls   r1, [r0]        @ self.size = r1

        addls   r1, r2, #1      @ r1 = r2 + 1
        andls   r1, r1, #31     @ r1 = r1 & (FIFO_CAPACITY-1)
        strls   r1, [r0, #8]    @ self.write_idx = r1

        movls   r1, #0          @ Ok(())
        movls   r0, #0
        bxls    lr

        push    {r11, lr}       @ [array bounds check failed]
        ldr     r0, .LCPI1_1
        mov     r1, r2
        mov     r2, #32
        add     r0, pc, r0
        bl      core::panicking::panic_bounds_check

example::Fifo::pop:
        ldr     r3, [r0]        @ r3 = self.size
        cmp     r3, #0          @ if (r3 == 0) goto .LBB2_2 
        beq     .LBB2_2

        ldr     r2, [r0, #4]    @ r2 = self.read_idx
        cmp     r2, #31         @ [array bounds check]

        addls   r1, r0, r2      @ r1 = self.buffer[r2] (interleaved...)

        addls   r2, r2, #1      @ r2 = r2 + 1
        subls   r3, r3, #1      @ r3 = r3 - 1
        andls   r2, r2, #31     @ r2 = r2 & (FIFO_CAPACITY-1)

        ldrbls  r1, [r1, #12]   @ r1 = self.buffer[r2] (...interleaved)

        strls   r3, [r0]        @ self.size = r3
        strls   r2, [r0, #4]    @ self.read_idx = r2

        movls   r0, #1          @ Some(r1)
        bxls    lr

        push    {r11, lr}       @ [array bounds check failed]
        ldr     r0, .LCPI2_0
        mov     r1, r2
        mov     r2, #32
        add     r0, pc, r0
        bl      core::panicking::panic_bounds_check

.LBB2_2:
        mov     r0, #0          @ None
        bx      lr

In general, this doesn't look too bad. For push there are four loads (one of which the compiler could have optimized out, but didn't), three stores and no branches (due to the use of conditional code instead), while pop has three loads, two stores and one branch (for the self.size == 0 case) that the compiler for some reason didn't replace with conditional code. There's no particularly slow arithmetic (since the % operation was optimized into a bitwise &), and while the unnecessary array bounds checks bloat the code a little bit, their effect on execution time should be negligible.

Now let's see how the same code would look with the modifications I suggested:

example::Fifo::push:
        ldr     r2, [r0]        @ r2 = self.size
        cmp     r2, #32         @ r2 == FIFO_CAPACITY?

        ldreq   r0, .LCPI1_0    @ Err("Buffer full.")
        moveq   r1, #12
        addeq   r0, pc, r0
        bxeq    lr

        ldr     r3, [r0, #4]    @ r3 = self.read_idx
        add     r3, r3, r2      @ r3 = r3 + r2
        and     r3, r3, #31     @ r3 = r3 & (FIFO_CAPACITY-1)

        add     r3, r0, r3      @ self.buffer[r3] = r1
        strb    r1, [r3, #8]

        add     r1, r2, #1      @ r1 = r2 + 1
        str     r1, [r0]        @ self.size = r1

        mov     r1, #0          @ Ok(())
        mov     r0, #0
        bx      lr

example::Fifo::pop:
        ldr     r2, [r0]        @ r2 = self.size
        cmp     r2, #0          @ if (r2 == 0) goto .LBB2_2
        beq     .LBB2_2

        ldr     r1, [r0, #4]    @ r1 = self.read_idx
        sub     r2, r2, #1      @ r2 = r2 - 1
        and     r3, r1, #31     @ r3 = r1 & (FIFO_CAPACITY-1)

        add     r1, r0, r3      @ r1 = self.buffer[r3] (interleaved...)
        add     r3, r3, #1      @ r3 = r3 + 1
        ldrb    r1, [r1, #8]    @ r1 = self.buffer[r3] (...interleaved)

        stm     r0, {r2, r3}    @ self.size = r2, self.read_idx = r3

        mov     r0, #1          @ Some(r1)
        bx      lr

.LBB2_2:
        mov     r0, #0          @ None
        bx      lr

The first six instructions in push (which implement the buffer fullness check) are exactly the same. The rest, however, looks a bit simpler: now we have only two loads and two stores, and the unnecessary array bounds check is also gone (because the compiler can now tell that the wrapped index can never overflow the array).

In the pop method, the self.size == 0 check is compiled into the exact same code as before (still with an explicit branch, for some reason), and we still have the same number of loads and stores (although this time the compiler managed to merge the two stores into a single stm instruction). Here, as well, avoiding the array bounds check makes the code shorter and simpler.


OK, but what about non-power-of-two buffer sizes? Well, ideally, you'd probably want to avoid them entirely, if you want to maximize performance. But what if you just had to use a buffer capacity that wasn't a power of two?

Well, here's what the increment_index call in your push method compiles to with FIFO_CAPACITY set to 37:

        addls   r1, r2, #1              @ r1 = self.write_idx + 1
        ldrls   r2, .LCPI1_0            @ r2 = 3134165325 (!)
        umullls r2, r3, r1, r2          @ r3 = (r1 * r2) >> 32 
        subls   r2, r1, r3              @ r2 = r1 - r3
        addls   r2, r3, r2, lsr #1      @ r2 = r3 + (r2 >> 1)
        movls   r3, #37                 @ r3 = FIFO_CAPACITY
        lsrls   r2, r2, #5              @ r2 = r2 >> 5
        mulls   r2, r2, r3              @ r2 = r2 * r3
        subls   r1, r1, r2              @ r1 = r1 - r2

.LCPI1_0:
        .long   3134165325

Wait, what the heck is going on here?

Well, what's going on is reciprocal multiplication. Basically, since division is one of the slowest arithmetic operations on any modern CPU, compilers use clever arithmetic tricks to replace division (and modulo) by a constant with a combination of multiplications and shifts.

So, basically, instead of calculating idx = idx % 37 directly, the assembly code generated by the compiler effectively calculates

tmp = (3134165325 * idx) >> 32;
avg = tmp + ((idx - tmp) >> 1);
idx = idx - (avg >> 5) * 37

using unsigned 32-bit arithmetic (except with the first multiplication calculated as a 64-bit result, the lower half of which is immediately discarded). If you want, you can verify that this indeed produces the same results as the normal remainder calculation!

(It may be illustrative to do the calculation step by step for idx = 37. You'll find that tmp works out to 27, and their average avg to 32, which when shifted right by 5 bits yields 1. If idx = 36, however, then tmp = 26 and avg = 31, which yields 0 when shifted right. Clever!)
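
For anyone who would rather let the machine do that verification, here is a minimal sketch that transcribes the three steps into Rust and compares them with the `%` operator. The function names are made up for this example; the constant is the one from the assembly listing.

```rust
// Remainder modulo 37 via reciprocal multiplication, following the three
// steps above with the magic constant from the generated assembly.
fn rem37_by_reciprocal(idx: u32) -> u32 {
    // 64-bit product, of which only the upper 32 bits are kept.
    let tmp = ((3134165325u64 * idx as u64) >> 32) as u32;
    let avg = tmp + ((idx - tmp) >> 1);
    idx - (avg >> 5) * 37
}

// Compare against the `%` operator for every index below `limit`.
fn matches_remainder_below(limit: u32) -> bool {
    (0..limit).all(|idx| rem37_by_reciprocal(idx) == idx % 37)
}
```

Calling `matches_remainder_below` with a limit of a few thousand already covers every index a small FIFO would ever see.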

Meanwhile, however, in my optimized version the equivalent code (sans increment) compiles to just this:

        subs    r2, r3, #37     @ r2 = r3 - 37
        movlo   r2, r3          @ if (r2 < 0) r2 = r3

Not nearly as clever and enigmatic, perhaps, but a lot simpler and faster.

@RubberDuck 2019-01-07 21:32:04

The write index is not a function of read index. If I write twice, but never pop, the write index moves while the read index stays stationary.

@RubberDuck 2019-01-07 21:33:22

I’m also extremely dubious of the “optimization” you propose. I’m willing to bet the assembly is nearly identical.

@Bergi 2019-01-07 21:39:49

@RubberDuck The write index is a function of read index and size.

@return true 2019-01-07 22:59:20

It's more a question of consistency (as the invariant stated in the answer always holds) than a question of space optimization.

@hellow 2019-01-08 07:29:49

You can choose the target architecture by using --target=arm-unknown-linux-gnueabi. To get a list of known targets use --print target-list.

@RubberDuck 2019-01-08 12:50:16

I appreciate the effort, but it does nothing to convince me. A) This makes the code harder to read, and until/unless I use it in a real situation where I can prove this is an actual bottleneck, this makes the code worse. B) I'm not targeting x86_64, so it's likely the assembly would be different for my target. C) If I was interested in this kind of micro optimization, I would inline the assembly code.

@hellow 2019-01-07 13:14:12

Your code looks pretty decent. My two cents:

  1. Implement Default

    impl Default for Fifo {
        fn default() -> Fifo {
            Fifo {
                size: 0,
                read_idx: 0,
                write_idx: 0,
                buffer: [0; FIFO_CAPACITY],
            }
        }
    }
    

    Then you could simplify your new to:

    pub fn new() -> Fifo {
        Fifo::default()
    }
    
  2. Simplify expressions

    In push: self.size = self.size + 1 will become self.size += 1
    In pop: self.size = self.size - 1 will become self.size -= 1

  3. Replace str error with enums

    pub enum FifoError {
        FifoFull,
    }
    
    pub fn push(&mut self, item: u8) -> Result<(), FifoError> {
        if self.buffer_full() {
            Err(FifoError::FifoFull)
        } else {
            ...
    }
    
  4. Add assert_eq!(None, buffer.pop()); to the end of every test where feasible, e.g.

    • pop_item_that_was_pushed_to_buffer
    • popping_returns_first_pushed_first
    • pop_beyond_write_index_continuing_on_works
    • buffer_wraps_around
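
Taken together, the first three suggestions could look roughly like the sketch below. It is one possible arrangement, not the only one: the `Debug` and `PartialEq` derives on `FifoError` are an addition assumed here so that results can be compared in tests, and the helper methods of the original are inlined to keep the example short.

```rust
const FIFO_CAPACITY: usize = 32;

// Error type replacing the &'static str; the derives are an assumption
// added so that results can be compared with assert_eq!.
#[derive(Debug, PartialEq)]
pub enum FifoError {
    FifoFull,
}

pub struct Fifo {
    size: usize,
    read_idx: usize,
    write_idx: usize,
    buffer: [u8; FIFO_CAPACITY],
}

impl Default for Fifo {
    fn default() -> Fifo {
        Fifo { size: 0, read_idx: 0, write_idx: 0, buffer: [0; FIFO_CAPACITY] }
    }
}

impl Fifo {
    pub fn new() -> Fifo {
        Fifo::default()
    }

    pub fn push(&mut self, item: u8) -> Result<(), FifoError> {
        if self.size == FIFO_CAPACITY {
            Err(FifoError::FifoFull)
        } else {
            self.buffer[self.write_idx] = item;
            self.write_idx = (self.write_idx + 1) % FIFO_CAPACITY;
            self.size += 1;
            Ok(())
        }
    }

    pub fn pop(&mut self) -> Option<u8> {
        if self.size == 0 {
            None
        } else {
            let result = self.buffer[self.read_idx];
            self.read_idx = (self.read_idx + 1) % FIFO_CAPACITY;
            self.size -= 1;
            Some(result)
        }
    }
}
```

With the enum in place, a caller can match on `FifoError::FifoFull` instead of comparing strings.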

For further exercises I would recommend:

  1. Implement Iterator/IntoIterator/FromIterator
  2. Next, implement Debug, which is fairly easy (Hint: Take a look at the implementation of Debug for slice)
  3. Make it accept a generic type
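
For the last item, one possible shape is sketched below. It assumes a compiler with const generics (stable since Rust 1.51, so newer than this discussion), which also addresses the capacity question from the original post. The `Copy + Default` bounds and the choice to hand a rejected item back in `Err` are design assumptions of this sketch, not something proposed in the thread.

```rust
// Sketch only: element type and capacity are both type parameters.
pub struct Fifo<T, const N: usize> {
    size: usize,
    read_idx: usize,
    buffer: [T; N],
}

impl<T: Copy + Default, const N: usize> Fifo<T, N> {
    pub fn new() -> Self {
        // `Copy + Default` lets the array be filled without allocation.
        Self { size: 0, read_idx: 0, buffer: [T::default(); N] }
    }

    pub fn push(&mut self, item: T) -> Result<(), T> {
        if self.size == N {
            Err(item) // hand the rejected item back to the caller
        } else {
            let write_idx = (self.read_idx + self.size) % N;
            self.buffer[write_idx] = item;
            self.size += 1;
            Ok(())
        }
    }

    pub fn pop(&mut self) -> Option<T> {
        if self.size == 0 {
            None
        } else {
            let result = self.buffer[self.read_idx];
            self.read_idx = (self.read_idx + 1) % N;
            self.size -= 1;
            Some(result)
        }
    }
}
```

A buffer would then be declared with its element type and capacity in the type itself, for example `let mut fifo: Fifo<u16, 3> = Fifo::new();`.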

@RubberDuck 2019-01-07 13:20:29

Nice review! I think I might actually implement a custom error type instead of the enum. Would you mind explaining the benefit of implementing Default for the struct? I think I understand what it does, but don't quite see how that helps me in this particular case.

@hellow 2019-01-07 13:35:39

Some traits or methods require Default, e.g. Vec::resize_default, which is a more convenient way of resize.

@hellow 2019-01-07 13:40:36

It's getting even more fancy if you do something like this

@hellow 2019-01-07 13:45:36

Quoting Matthieu: "There's no introspection in Rust, you can't try to invoke a constructor with no parameter and hope it works in generic code: a trait MUST describe the available functionality. Using Default is the way to ensure that whoever wishes to use the struct where Default is required will be able to without wrapping it."
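
To illustrate the point of that quote, here is a small sketch of generic code that can only build a value through the `Default` trait. The `Uart` struct, its fields, the `len` accessor and the `fresh` helper are all hypothetical names invented for the example.

```rust
const FIFO_CAPACITY: usize = 32;

pub struct Fifo {
    size: usize,
    read_idx: usize,
    write_idx: usize,
    buffer: [u8; FIFO_CAPACITY],
}

impl Default for Fifo {
    fn default() -> Fifo {
        Fifo { size: 0, read_idx: 0, write_idx: 0, buffer: [0; FIFO_CAPACITY] }
    }
}

impl Fifo {
    // Hypothetical accessor, added only so the example can inspect the state.
    pub fn len(&self) -> usize {
        self.size
    }
}

// Hypothetical driver struct: the derive compiles only because every field,
// including both FIFOs, implements Default.
#[derive(Default)]
pub struct Uart {
    pub rx: Fifo,
    pub tx: Fifo,
    pub baud_rate: u32,
}

// Generic helper: the `Default` bound is the only thing that tells the
// compiler how to construct a `T`.
pub fn fresh<T: Default>() -> T {
    T::default()
}
```

Without the `Default` impl on `Fifo`, neither the derive on `Uart` nor a call such as `fresh::<Uart>()` would compile, even though `Fifo::new()` exists.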
