Rust — write gunzip from scratch 10
In this series, we will be writing a gunzip decompressor from scratch in Rust. We want to write it ourselves not only to learn Rust but also to understand how .gz compression works under the hood. For the full source code, check out this Github repo. You can find all articles of the series below:
- part 1: main() function and skeletal structure
- part 2: bitread module for reading bits from a byte stream
- part 3: gzip header & footer for parsing the metadata and checksum
- part 4: inflate for block type 0 (uncompressed) data
- part 5: codebook and huffman_decoder modules for decoding Huffman codes
- part 6: lz77 and sliding_window modules for decompressing LZ77-encoded data
- part 7: inflate for block type 1 and 2 (compressed) data using fixed or dynamic Huffman codes
- part 8: checksum_write module for verifying the decompressed data
- part 9: performance optimization
- part 10: multithread support
- part 11: streaming support
- part 12: memory optimization
- part 13: bug fix to reject non-compliant .gz files
Today, we are going to add multi-threading support. We live in a world where it is almost impossible to find a computer with only a single-core CPU — heck, even smartwatches released back in 2020 were equipped with dual-core chips! This is a perfect opportunity to learn about fearless concurrency in Rust.
First, let’s think about how we can benefit from having two threads. The inherent design of .gz and deflate can’t be efficiently parallelized, because we need to read and process data sequentially. However, the typical bottleneck in a computer lies in its IO operations, i.e., input and output. So, what we can do is create two threads: one for reading and another for writing. Technically, we could create yet another thread for data processing in between, but that would make our design too complex, so we will stick with just two threads running in parallel. The speed-up will be far less than 2x, but it will still give us a noticeable boost.
Here is how we will re-group our pipeline. The first thread will take care of the stages from Read to Inflate, while the second thread will take care of Checksum and Write.

This design is commonly referred to as single-producer single-consumer, and there is standard library support for it in Rust. The communication layer between the producer and the consumer is typically called a channel. Let’s first start from the top level, i.e., the main() function:
// replace in src/bin/gunzip.rs
fn usage(program: &str) {
    eprintln!("Usage: {} [-t]", program);
    eprintln!("\tDecompresses .gz file read from stdin and outputs to stdout");
    eprintln!("\t-t: employ two threads");
    eprintln!("Example: {} < input.gz > output", program);
}

fn main() -> Result<()> {
    let mut args = std::env::args();
    let program = args.next().unwrap();
    let multithread = match args.next() {
        Some(arg) => {
            if arg == "-t" {
                true
            } else {
                usage(&program);
                std::process::exit(-1);
            }
        }
        None => false,
    };
    let reader = std::io::stdin();
    let writer = std::io::stdout();
    gunzip(reader, writer, multithread)
}
We are adding the -t option to enable multi-threading. Other than that, the only change is to pass the multithread option to the gunzip() function.
Now it is time to decide the interface between Producer and Consumer. Which granularity of data should be transmitted from Producer to Consumer over the channel? To me, the simplest way is to pass the following through the channel:
// append to src/lib.rs
pub enum Produce {
    Header(Header),
    Footer(Footer),
    Data(Vec<u8>),
    Err(Error),
}
Here, Header and Footer are the metadata within each Member, and the Data variant contains a chunk of actual decompressed data.
To communicate efficiently between threads in parallel programs, we should select an appropriate size for the data units that are exchanged.
- If the data chunk size is too small, then there will be more overhead in creating, sending, and receiving many small messages. This can reduce the parallelism and increase the latency of the communication. Moreover, small data chunks may not fully utilize the available bandwidth or memory resources of the system.
- If the data chunk size is too large, then there will be more contention and blocking among threads that are waiting for their turn to access or process a large message. This can also reduce the parallelism and increase the latency of the communication. Moreover, large data chunks may exceed the capacity or limit of the system resources, such as buffer size or message queue length.
As a typical compressed .gz file consists of multiple blocks that each carry roughly 32 kB of data, we will transmit each block’s data as a Data variant over the channel. This approach won’t work efficiently if large data (> 10 MB) is compressed as a single block, but let’s not worry about that for our simple implementation.
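To get a feel for this tradeoff, here is a rough, self-contained micro-benchmark sketch, separate from the gunzip code; the function send_in_chunks is made up for illustration. It sends the same total amount of data through an mpsc channel, once in tiny chunks and once in block-sized chunks:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Send `total` bytes through a channel in pieces of `chunk` bytes,
// and measure the elapsed wall-clock time (illustrative, not rigorous).
fn send_in_chunks(total: usize, chunk: usize) -> Duration {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let start = Instant::now();
    let producer = thread::spawn(move || {
        let mut sent = 0;
        while sent < total {
            let n = chunk.min(total - sent);
            tx.send(vec![0u8; n]).unwrap();
            sent += n;
        }
        // tx is dropped here, which closes the channel
    });
    let mut received = 0;
    for buf in rx {
        received += buf.len();
    }
    producer.join().unwrap();
    assert_eq!(received, total);
    start.elapsed()
}

fn main() {
    let total = 8 << 20; // 8 MiB
    let small = send_in_chunks(total, 64); // many tiny messages
    let large = send_in_chunks(total, 32 * 1024); // one message per ~32 kB block
    println!("64 B chunks:  {:?}", small);
    println!("32 kB chunks: {:?}", large);
}
```

On most systems the tiny-chunk run is noticeably slower, because each message pays allocation and synchronization overhead.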
Now, let’s write the Producer. It will eventually replace Inflate.
// src/producer.rs
use crate::bitread::{BitRead, BitReader};
use crate::sliding_window::SlidingWindow;
use crate::Produce;
use std::io::Read;

pub struct Producer<R: Read> {
    reader: BitReader<R>,
    state: State, // todo
    member_idx: usize,
    window: SlidingWindow,
}

impl<R: Read> Iterator for Producer<R> {
    type Item = Produce;

    fn next(&mut self) -> Option<Self::Item> {
        todo!()
    }
}
We will implement the rest of it soon, but what is important now is that we make Producer an iterator. With this, the consumer-side logic is pretty straightforward:
// append in src/lib.rs
pub mod producer;
use producer::Producer;

// replace in src/lib.rs
pub fn gunzip(read: impl Read + Send + 'static, write: impl Write, multithread: bool) -> Result<()> {
    // producer: takes Read and produces data blocks
    // consumer: takes Write and consumes data blocks
    let iter = if multithread {
        let (tx, rx) = std::sync::mpsc::channel::<Produce>();
        std::thread::spawn(move || {
            for produce in Producer::new(read) {
                tx.send(produce)
                    .expect("error while transmitting produce over the channel");
            }
        });
        Box::new(rx.into_iter()) as Box<dyn Iterator<Item = Produce>>
    } else {
        Box::new(Producer::new(read))
    };
    let mut writer = Crc32Writer::new(write);
    for xs in iter {
        match xs {
            Produce::Header(_) => (),
            Produce::Footer(footer) => {
                let checksum = writer.checksum();
                let size = writer.len();
                if footer.crc32 != checksum {
                    return Err(Error::ChecksumMismatch);
                }
                if footer.size as usize != size & 0xFFFFFFFF {
                    return Err(Error::SizeMismatch);
                }
                writer.reset_len();
            }
            Produce::Data(xs) => {
                writer.write_all(&xs)?;
            }
            Produce::Err(e) => {
                return Err(e);
            }
        }
    }
    Ok(())
}
All we need is an iterator over Produce. If multithread is set, we use the standard library std::sync::mpsc::channel() function to create a channel over which the producer and consumer can communicate. We then run the Producer in a new thread, continuously sending its Produce items to the consumer. If multithread is not set, then we simply use the Producer itself as an iterator. Either way, the consumer sees an iterator and does not need to know the detailed implementation behind it, whether it spans multiple threads or not.
As for the consumer logic, it writes each Data chunk into Write, which computes a running checksum as before. For Footer, it performs the usual data integrity check, which we have already seen before.
Let’s implement Producer. We can borrow most of the code from Inflate with minor modifications. We will first bring in the inflate() method:
// append to src/producer.rs
use crate::codebook::CodeBook;
use crate::error::{Error, Result};
use crate::footer::Footer;
use crate::header::Header;
use crate::huffman_decoder::HuffmanDecoder;
use crate::lz77::{decode, CodeIterator, DecodeResult};

impl<R: Read> Producer<R> {
    fn inflate(
        &mut self,
        ll_decoder: HuffmanDecoder,
        dist_decoder: HuffmanDecoder,
    ) -> Result<Produce> {
        let mut iter = CodeIterator::new(&mut self.reader, ll_decoder, dist_decoder);
        let mut done = false;
        let mut buf = Vec::new();
        loop {
            let boundary = self.window.boundary();
            let n = match decode(self.window.buffer(), boundary, &mut iter)? {
                DecodeResult::Done(n) => {
                    done = true;
                    n
                }
                DecodeResult::WindowIsFull(n) => n,
                DecodeResult::Error(e) => {
                    return Err(e);
                }
            };
            // accumulate data for the entire block
            // this may not be efficient if the block size is large
            buf.extend_from_slice(&self.window.write_buffer()[..n]);
            self.window.slide(n);
            if done {
                break;
            }
        }
        Ok(Produce::Data(buf))
    }
}
One difference from Inflate to note is that we accumulate the data until the entire block is read. This simplifies our iterator logic. A more efficient implementation would transmit each partial chunk of block data, but that would require us to write an iterator function within the inflate() method. This is left as an exercise for readers who want to further optimize our multi-threading pipeline.
Next, let’s implement the inflate_block*() methods:
// append to Producer impl in src/producer.rs
fn inflate_block0(&mut self) -> Result<Produce> {
    self.reader.byte_align();
    let len = self.reader.read_bits(16)?;
    let nlen = self.reader.read_bits(16)?;
    if len ^ nlen != 0xFFFF {
        Err(Error::BlockType0LenMismatch)
    } else {
        let mut buf = vec![0; len as usize];
        self.reader.read_exact(&mut buf)?;
        self.window.write_buffer()[..len as usize].copy_from_slice(&buf);
        self.window.slide(len as usize);
        Ok(Produce::Data(buf))
    }
}

fn inflate_block1(&mut self) -> Result<Produce> {
    let ll_decoder = HuffmanDecoder::new(CodeBook::default_ll());
    let dist_decoder = HuffmanDecoder::new(CodeBook::default_dist());
    self.inflate(ll_decoder, dist_decoder)
}

fn inflate_block2(&mut self) -> Result<Produce> {
    let (ll_decoder, dist_decoder) = self.read_dynamic_codebooks()?;
    self.inflate(ll_decoder, dist_decoder)
}
Again, the main logic is the same as before, but we return the data rather than writing it to the Write trait object. As for read_dynamic_codebooks(), there is no change at all, so simply copy the method from Inflate to Producer.
For the next() method of the Iterator implementation, we will, as before, delegate to a next_helper() method to make it easy to propagate errors of the Result<> type with the ? operator:
// replace in src/producer.rs
impl<R: Read> Iterator for Producer<R> {
    type Item = Produce;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_helper()
            .unwrap_or_else(|e| Some(Produce::Err(e)))
    }
}

// append to impl Producer in src/producer.rs
fn next_helper(&mut self) -> Result<Option<Produce>> {
    let produce = match self.state {
        State::Header => {
            if !self.reader.has_data_left()? {
                return if self.member_idx == 0 {
                    Err(Error::EmptyInput)
                } else {
                    Ok(None)
                };
            }
            self.state = State::Block;
            self.member_idx += 1;
            Produce::Header(Header::read(&mut self.reader)?)
        }
        State::Block => {
            let header = self.reader.read_bits(3)?;
            if header & 1 == 1 {
                self.state = State::Footer;
            }
            match header & 0b110 {
                0b000 => self.inflate_block0()?,
                0b010 => self.inflate_block1()?,
                0b100 => self.inflate_block2()?,
                _ => return Err(Error::InvalidBlockType),
            }
        }
        State::Footer => {
            self.state = State::Header;
            Produce::Footer(Footer::read(&mut self.reader)?)
        }
    };
    Ok(Some(produce))
}
The main logic is the same. The only added complexity compared to Inflate is that we now need to keep track of which state the producer is in. We can easily divide the work into three stages: Header, Block, and Footer, corresponding to Produce. That is where State comes in.
// append to src/producer.rs
enum State {
    Header,
    Block,
    Footer,
}
Unfortunately, Rust does not support generators, and hence we are required to manage the states manually. If generators were supported, the code would be much simpler.
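When the state is simple enough to live in a closure, std::iter::from_fn can sometimes stand in for a generator. This toy counter, not part of the gunzip code, illustrates the idea; our State machine is too branchy for this to be a clean fit, which is why we manage the enum explicitly:

```rust
fn main() {
    // std::iter::from_fn keeps its state in the closure's captured
    // variables instead of an explicit state enum.
    let mut n = 0;
    let counter = std::iter::from_fn(move || {
        n += 1;
        if n <= 3 {
            Some(n)
        } else {
            None
        }
    });
    assert_eq!(counter.collect::<Vec<_>>(), vec![1, 2, 3]);
}
```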
Finally, we can implement the trivial new() method:
// add to impl Producer in src/producer.rs
pub fn new(read: R) -> Self {
    Self {
        reader: BitReader::new(read),
        state: State::Header,
        member_idx: 0,
        window: SlidingWindow::new(),
    }
}
Remember to delete the src/inflate.rs file, as we don’t need it anymore, and remove the lines pub mod inflate and use inflate::Inflate from src/lib.rs.
Build the code and run it with the -t option to see how much speed gain you get from multi-threading. On my system, I get about a 20% boost!
OK, this concludes our work on adding multi-threading support. Though we had to make some code changes, hopefully the overall process was not too difficult. The producer-consumer-iterator pattern allows seamless integration of single-threaded and multi-threaded implementations, re-using as much code as possible. In the next article, we will add streaming support, so stay tuned!
Previous in series: https://techhara.me/posts/2023-10-14_rust—write-gunzip-from-scratch-9-66cbda36cc0c/
Next in series: https://techhara.me/posts/2023-10-18_rust—write-gunzip-from-scratch-11-1b4e7cbcc0ef/