Rust — write gunzip from scratch 11
In this series, we are writing a gunzip decompressor from scratch in Rust. We want to write it ourselves not only to learn Rust but also to understand how .gz compression works under the hood. For the full source code, check out this GitHub repo. You can find all articles of the series below:
- part 1: main() function and skeletal structure
- part 2: bitread module for reading bits from a byte stream
- part 3: gzip header & footer for parsing the metadata and checksum
- part 4: inflate for block type 0 (uncompressed) data
- part 5: codebook and huffman_decoder modules for decoding Huffman codes
- part 6: lz77 and sliding_window modules for decompressing LZ77-encoded data
- part 7: inflate for block type 1 and 2 (compressed) data using fixed or dynamic Huffman codes
- part 8: checksum_write module for verifying the decompressed data
- part 9: performance optimization
- part 10: multithread support
- part 11: streaming support
- part 12: memory optimization
- part 13: bug fix to reject non-compliant .gz files
Today, in part 11 of the series, we will add streaming support to our gunzip library. Because we are mostly re-arranging and re-structuring the existing code, I won't go into much detail about what the code does. So, let's dive in!
Recall the function signature of the library
pub fn gunzip(read: impl Read + Send + 'static, write: impl Write, multithread: bool) -> Result<()>
We ask the user to pass in a Read implementation and a Write implementation. We read the compressed .gz data from Read and eagerly write the output to Write. But what if the user needs, say, only the first 10 kB of the decompressed data? Or what if the user wants to consume the output in a controlled manner? With the current function signature, the user is forced to decompress the entire .gz data at once, because the function takes a Write. The API we are providing has a limitation.
A more flexible API from the user's perspective is a streaming API, where we provide a Decompressor struct that implements the Read trait. Here is how bin/gunzip.rs would change with the new streaming API:
// replace in bin/gunzip.rs
use gunzip::Decompressor; // todo

// replace in bin/gunzip.rs
fn main() -> Result<()> {
    let mut args = std::env::args();
    let program = args.next().unwrap();
    let multithread = match args.next() {
        Some(arg) => {
            if arg == "-t" {
                true
            } else {
                usage(&program);
                std::process::exit(-1);
            }
        }
        None => false,
    };
    let reader = std::io::stdin();
    let mut writer = std::io::stdout();
    let mut decompressor = Decompressor::new(reader, multithread);
    std::io::copy(&mut decompressor, &mut writer)?;
    Ok(())
}
Here, we provide a Decompressor struct that takes in a Read implementation for reading the compressed data. The struct itself implements the Read trait for the decompressed data, so the user has better control over when and how much decompressed data to process. Specifically, if multithread is not set, decompression should be performed lazily, i.e., we do only the minimal work needed for each Read::read() request from the user.
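To see why a Read-based API helps, here is a small sketch of the kind of partial read it enables. Decompressor does not exist yet at this point, so std::io::repeat stands in for it as an arbitrary Read source; the point is only that the caller, not the library, decides how many bytes to pull.

```rust
use std::io::Read;

fn main() {
    // Stand-in for the streaming Decompressor: any type implementing Read.
    // std::io::repeat yields an endless byte stream, like a huge .gz could.
    let reader = std::io::repeat(b'x');

    // With a streaming API, the caller decides how much to consume:
    // take only the first 10 kB and stop, no full decompression needed.
    let mut first_10k = Vec::new();
    reader.take(10 * 1024).read_to_end(&mut first_10k).unwrap();

    assert_eq!(first_10k.len(), 10 * 1024);
    println!("read {} bytes lazily", first_10k.len());
}
```

The same take() call would work unchanged on a Decompressor, since it only relies on the Read trait.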
Let's now write the skeletal structure of Decompressor:
// append to src/lib.rs
use std::io::Read;

// replace in src/lib.rs
pub mod checksum; // todo
use checksum::Checksum;
use checksum::Crc32Checksum;

// replace gunzip in src/lib.rs
pub struct Decompressor {
    iter: Box<dyn Iterator<Item = Produce>>,
    buf: Vec<u8>,
    begin: usize,
    checksum: Crc32Checksum,
}

impl Read for Decompressor {
    fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
        todo!()
    }
}
First, recall that we will no longer use the Write trait, so src/checksum_write.rs is not needed in its current form. Instead, we will transform it into a simple Checksum trait without the Write component. Most of the code, except for the Write part, is retained as below:
// rename src/checksum_write.rs to src/checksum.rs and replace its contents with the following
use crc32fast::Hasher;

pub trait Checksum {
    // update the checksum
    fn update(&mut self, xs: &[u8]);
    // returns the checksum and resets it
    fn checksum(&mut self) -> u32;
    // total number of bytes updated so far
    fn len(&self) -> usize;
    // reset the length counter
    fn reset_len(&mut self);
}

pub struct Crc32Checksum {
    hasher: Hasher,
    n: usize,
}

impl Crc32Checksum {
    pub fn new() -> Self {
        Self { hasher: Hasher::new(), n: 0 }
    }
}

impl Checksum for Crc32Checksum {
    fn update(&mut self, xs: &[u8]) {
        self.hasher.update(xs);
        self.n += xs.len();
    }
    fn checksum(&mut self) -> u32 {
        let hasher = std::mem::replace(&mut self.hasher, Hasher::new());
        hasher.finalize()
    }
    fn len(&self) -> usize {
        self.n
    }
    fn reset_len(&mut self) {
        self.n = 0;
    }
}
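As a sanity check on the trait's contract, here is a self-contained sketch. The Crc32 struct below is a minimal bitwise CRC-32 written for this illustration (a stand-in for the crc32fast Hasher the real module uses); it demonstrates that chunked updates produce the same checksum as a one-shot update, which is exactly the property that lets us checksum each decompressed block as it streams by.

```rust
// Minimal bitwise CRC-32 (IEEE polynomial, reflected), a stand-in for
// crc32fast::Hasher, to illustrate incremental checksumming.
struct Crc32 {
    state: u32,
}

impl Crc32 {
    fn new() -> Self {
        Self { state: 0xFFFF_FFFF }
    }
    fn update(&mut self, xs: &[u8]) {
        for &b in xs {
            self.state ^= b as u32;
            for _ in 0..8 {
                // mask is 0xFFFFFFFF if the low bit is set, else 0
                let mask = (self.state & 1).wrapping_neg();
                self.state = (self.state >> 1) ^ (0xEDB8_8320 & mask);
            }
        }
    }
    fn finalize(&self) -> u32 {
        !self.state
    }
}

fn main() {
    // One-shot over the whole input...
    let mut a = Crc32::new();
    a.update(b"hello world");

    // ...equals chunked updates, which is what lets the Decompressor
    // checksum each data block as it arrives.
    let mut b = Crc32::new();
    b.update(b"hello ");
    b.update(b"world");
    assert_eq!(a.finalize(), b.finalize());

    // Standard check value: CRC-32 of "123456789" is 0xCBF43926.
    let mut c = Crc32::new();
    c.update(b"123456789");
    assert_eq!(c.finalize(), 0xCBF4_3926);
    println!("crc32 ok: {:08x}", c.finalize());
}
```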
Note the buf: Vec<u8> and begin: usize fields in Decompressor. Decompressor is implemented with an internal buffer, which significantly simplifies our code. Here, begin points to where the unread data starts within buf.
Let's implement the read() method, where most of the work is delegated to a fill_buf() method:
// replace in src/lib.rs
impl Read for Decompressor {
    fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
        let mut nbytes = 0;
        loop {
            let n = buf.len().min(self.buf[self.begin..].len());
            buf[..n].copy_from_slice(&self.buf[self.begin..self.begin + n]);
            buf = &mut buf[n..];
            nbytes += n;
            self.begin += n;
            if buf.is_empty() || self.fill_buf()? == 0 {
                break;
            }
        }
        Ok(nbytes)
    }
}

// append in src/lib.rs
impl Decompressor {
    fn fill_buf(&mut self) -> std::io::Result<usize> {
        todo!()
    }
}
The fill_buf() method is where we iterate through Produce items and carry out the consumer logic as before:
// replace in src/lib.rs
fn fill_buf(&mut self) -> std::io::Result<usize> {
    loop {
        match self.iter.next() {
            Some(Produce::Err(e)) => {
                return Err(e.into());
            }
            Some(Produce::Header(_)) => { /* nothing to do */ }
            Some(Produce::Data(xs)) => {
                if xs.is_empty() {
                    continue;
                }
                self.checksum.update(&xs);
                self.buf = xs;
                self.begin = 0;
                return Ok(self.buf.len());
            }
            Some(Produce::Footer(footer)) => {
                if self.checksum.checksum() != footer.crc32 {
                    return Err(Error::ChecksumMismatch.into());
                }
                if self.checksum.len() & 0xFFFFFFFF != footer.size as usize {
                    return Err(Error::SizeMismatch.into());
                }
                self.checksum.reset_len();
            }
            None => return Ok(0),
        }
    }
}
Note that the fill_buf() method is called only after the internal buffer has been fully consumed. This is why we can simply set self.buf = xs and self.begin = 0 above.
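The buf/begin scheme can be illustrated in isolation. The ChunkReader below is a toy type invented for this sketch: it exposes an iterator of byte chunks (standing in for Produce::Data payloads) through the Read trait using the same internal-buffer idea, written here with explicit indices rather than slice reborrowing.

```rust
use std::io::Read;

// Toy version of the Decompressor's buffering scheme: an iterator of
// ready-made chunks, exposed through Read via a buf/begin pair.
struct ChunkReader {
    iter: std::vec::IntoIter<Vec<u8>>,
    buf: Vec<u8>,
    begin: usize,
}

impl Read for ChunkReader {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let mut nbytes = 0;
        while nbytes < out.len() {
            if self.begin == self.buf.len() {
                // Internal buffer fully consumed: pull the next chunk.
                match self.iter.next() {
                    Some(chunk) => {
                        self.buf = chunk;
                        self.begin = 0;
                    }
                    None => break, // no more chunks: EOF
                }
                continue; // the new chunk may be empty; re-check
            }
            // Copy as much unread data as fits into the caller's buffer.
            let n = (out.len() - nbytes).min(self.buf.len() - self.begin);
            out[nbytes..nbytes + n]
                .copy_from_slice(&self.buf[self.begin..self.begin + n]);
            nbytes += n;
            self.begin += n;
        }
        Ok(nbytes)
    }
}

fn main() {
    let chunks = vec![b"hel".to_vec(), b"lo ".to_vec(), b"world".to_vec()];
    let mut r = ChunkReader { iter: chunks.into_iter(), buf: vec![], begin: 0 };
    let mut s = String::new();
    r.read_to_string(&mut s).unwrap();
    assert_eq!(s, "hello world");
    println!("{}", s);
}
```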
The constructor for Decompressor is straightforward and mostly borrows code from the gunzip() function, which we no longer need:
// append to impl Decompressor in src/lib.rs
pub fn new<R: Read + Send + 'static>(read: R, multithread: bool) -> Self {
    let iter = if multithread {
        let (tx, rx) = std::sync::mpsc::channel::<Produce>();
        std::thread::spawn(move || {
            for produce in Producer::new(read) {
                tx.send(produce)
                    .expect("error while transmitting produce over the channel");
            }
        });
        Box::new(rx.into_iter()) as Box<dyn Iterator<Item = Produce>>
    } else {
        Box::new(Producer::new(read))
    };
    Self {
        iter,
        buf: vec![],
        begin: 0,
        checksum: Crc32Checksum::new(),
    }
}
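The multithread branch can also be sketched in isolation. The snippet below uses toy u32 items in place of Produce values, but the structure is the same: a producer thread feeds a channel, and the receiving end is type-erased into a boxed iterator, matching the single-threaded branch's type.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // A producer thread sends items over a channel (toy u32 items here,
    // standing in for Produce values from the real Producer).
    let (tx, rx) = mpsc::channel::<u32>();
    thread::spawn(move || {
        for x in 0..5 {
            tx.send(x).expect("error while transmitting over the channel");
        }
        // tx is dropped here, which ends the iterator on the consumer side.
    });

    // Either branch can be erased behind Box<dyn Iterator<Item = u32>>,
    // so the consumer code is identical with or without the thread.
    let iter: Box<dyn Iterator<Item = u32>> = Box::new(rx.into_iter());
    let collected: Vec<u32> = iter.collect();
    assert_eq!(collected, vec![0, 1, 2, 3, 4]);
    println!("{:?}", collected);
}
```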
Finally, we need a conversion from error::Error to std::io::Error for use within the fill_buf() method. For that, we simply implement From:
// append to src/error.rs
impl From<Error> for std::io::Error {
    fn from(value: Error) -> Self {
        match value {
            Error::StdIoError(e) => Self::from(e),
            e => Self::new(std::io::ErrorKind::Other, e),
        }
    }
}
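To see the conversion in action, here is a self-contained sketch with a toy Error enum that mirrors the shape of the library's error type (the variants and Display text here are illustrative, not the library's actual ones). The `?` operator picks up the From impl automatically.

```rust
use std::io;

// Toy error type standing in for the library's error::Error.
#[derive(Debug)]
enum Error {
    ChecksumMismatch,
    StdIoError(io::Error),
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Error::ChecksumMismatch => write!(f, "checksum mismatch"),
            Error::StdIoError(e) => write!(f, "io error: {}", e),
        }
    }
}

impl std::error::Error for Error {}

impl From<Error> for io::Error {
    fn from(value: Error) -> Self {
        match value {
            // Unwrap a wrapped io::Error instead of double-boxing it.
            Error::StdIoError(e) => e,
            // Everything else becomes an "other" io::Error.
            e => io::Error::new(io::ErrorKind::Other, e),
        }
    }
}

// Mimics a fill_buf-style function: returns io::Result, but fails with
// the library error; `?` converts it via the From impl above.
fn fill_buf_like() -> io::Result<usize> {
    Err(Error::ChecksumMismatch)?
}

fn main() {
    let err = fill_buf_like().unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::Other);
    println!("{}", err);
}
```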
OK, so that’s about it. We may want to clean up our code to remove unused imports. The easiest way is to use clippy
cargo clippy
The code should compile and work! By the way, I observed about 10% slower runtime after this change in the single-threaded configuration.
So, this wraps up streaming support for our gunzip library. Over the series so far, we not only implemented a .gz decompressor in Rust, but also added advanced features like multithreading and streaming support, all in just about 1000 lines of code! I hope you enjoyed the journey as much as I did, and feel free to let me know what your experience was like.
Previous in series: https://medium.com/towardsdev/rust-write-gunzip-from-scratch-10-f368d5511349
Next in series: https://medium.com/@techhara/rust-write-gunzip-from-scratch-12-f29e26679884