chunked_transfer/
encoder.rs

1use std::io::Result as IoResult;
2use std::io::Write;
3
/// Splits the incoming data into HTTP chunks.
///
/// Data written to the encoder is buffered until `chunks_size` bytes have
/// accumulated. At that point one chunk (`<hex length>\r\n<data>\r\n`) is
/// written to the underlying writer. [`flush`](Write::flush) sends whatever is
/// buffered as a (possibly shorter) chunk. Dropping the encoder sends any
/// remaining data and then the terminating zero-length chunk `0\r\n\r\n`.
///
/// # Example
///
/// ```
/// use chunked_transfer::Encoder;
/// use std::io::Write;
///
/// let decoded = "hello world";
/// let mut encoded: Vec<u8> = vec![];
///
/// {
///     let mut encoder = Encoder::with_chunks_size(&mut encoded, 5);
///     encoder.write_all(decoded.as_bytes()).unwrap();
/// }
///
/// assert_eq!(encoded, b"5\r\nhello\r\n5\r\n worl\r\n1\r\nd\r\n0\r\n\r\n");
/// ```
pub struct Encoder<W>
where
    W: Write,
{
    /// Where the encoded chunks are written.
    output: W,

    /// Maximum number of payload bytes per chunk; always in
    /// `1..=MAX_CHUNK_SIZE` (enforced by `with_chunks_size`).
    chunks_size: usize,

    /// Payload waiting to be sent. The first `MAX_HEADER_SIZE` bytes are
    /// reserved for the chunk-size line (hex length + `\r\n`). `send` fills
    /// that line in once the length is known. The vector is therefore never
    /// shorter than `MAX_HEADER_SIZE`.
    buffer: Vec<u8>,

    /// When `true`, every `write` call sends its data as a chunk right away
    /// instead of waiting for a full chunk. Useful when downstream consumers
    /// should see data as soon as it is produced.
    flush_after_write: bool,
}

/// Largest chunk payload the encoder will buffer before sending.
const MAX_CHUNK_SIZE: usize = u32::MAX as usize;

/// Room reserved at the front of `buffer` for the chunk-size line. This is up
/// to eight hex digits (enough for any length up to `u32::MAX`) plus `\r\n`.
const MAX_HEADER_SIZE: usize = 10;

/// Chunk size used by `Encoder::new` and `Encoder::with_flush_after_write`.
const DEFAULT_CHUNK_SIZE: usize = 8192;

impl<W> Encoder<W>
where
    W: Write,
{
    /// Creates an encoder that writes to `output` using 8192-byte chunks.
    pub fn new(output: W) -> Encoder<W> {
        Encoder::with_chunks_size(output, DEFAULT_CHUNK_SIZE)
    }

    /// Gets a reference to the underlying value in this encoder.
    pub fn get_ref(&self) -> &W {
        &self.output
    }

    /// Gets a mutable reference to the underlying value in this encoder.
    pub fn get_mut(&mut self) -> &mut W {
        &mut self.output
    }

    /// Creates an encoder that writes to `output` in chunks of at most
    /// `chunks` payload bytes.
    ///
    /// `chunks` is clamped to `1..=u32::MAX`. A chunk size of zero could never
    /// make progress, and larger sizes would not fit in the reserved header.
    pub fn with_chunks_size(output: W, chunks: usize) -> Encoder<W> {
        Encoder {
            output,
            chunks_size: chunks.clamp(1, MAX_CHUNK_SIZE),
            buffer: vec![0; MAX_HEADER_SIZE],
            flush_after_write: false,
        }
    }

    /// Creates an encoder with the default chunk size. Every `write` call
    /// sends its data immediately instead of waiting for a full chunk.
    pub fn with_flush_after_write(output: W) -> Encoder<W> {
        let mut encoder = Encoder::with_chunks_size(output, DEFAULT_CHUNK_SIZE);
        encoder.flush_after_write = true;
        encoder
    }

    /// Drops any buffered payload, keeping only the reserved header space.
    fn reset_buffer(&mut self) {
        self.buffer.truncate(MAX_HEADER_SIZE);
    }

    /// Returns `true` when no payload is waiting to be sent.
    fn is_buffer_empty(&self) -> bool {
        self.buffer.len() == MAX_HEADER_SIZE
    }

    /// Number of payload bytes waiting to be sent.
    fn buffer_len(&self) -> usize {
        self.buffer.len() - MAX_HEADER_SIZE
    }

    /// Writes the buffered payload to `output` as a single chunk and empties
    /// the buffer. Does nothing when the buffer is empty.
    ///
    /// # Errors
    ///
    /// Returns any error from writing to `output`. On error the buffered
    /// payload is kept exactly as it was, so the encoder's state stays
    /// consistent. Bytes that `output` may already have accepted before
    /// failing are not tracked.
    fn send(&mut self) -> IoResult<()> {
        // Never send an empty buffer, because that would be interpreted
        // as the end of the stream, which we indicate explicitly on drop.
        if self.is_buffer_empty() {
            return Ok(());
        }

        let prelude = format!("{:x}\r\n", self.buffer_len());
        let prelude = prelude.as_bytes();

        // Cannot fail: the length is at most MAX_CHUNK_SIZE (u32::MAX), whose
        // hex form has at most eight digits. MAX_HEADER_SIZE reserves room for
        // eight digits plus "\r\n".
        assert!(
            prelude.len() <= MAX_HEADER_SIZE,
            "invariant failed: prelude longer than MAX_HEADER_SIZE"
        );

        // Right-align the prelude in the reserved space so it sits directly
        // before the payload. Short lengths leave unused leading bytes, which
        // are skipped below.
        let offset = MAX_HEADER_SIZE - prelude.len();
        self.buffer[offset..MAX_HEADER_SIZE].copy_from_slice(prelude);

        // Append the chunk-terminating \r\n so the whole chunk goes out in a
        // single write_all call.
        let payload_end = self.buffer.len();
        self.buffer.extend_from_slice(b"\r\n");

        match self.output.write_all(&self.buffer[offset..]) {
            Ok(()) => {
                self.reset_buffer();
                Ok(())
            }
            Err(err) => {
                // Remove the trailer again. Otherwise a later send would treat
                // it as payload, and buffer_len() could exceed chunks_size.
                self.buffer.truncate(payload_end);
                Err(err)
            }
        }
    }
}

impl<W> Write for Encoder<W>
where
    W: Write,
{
    /// Buffers all of `data` and sends a chunk each time the buffer fills up
    /// to `chunks_size` bytes. With `flush_after_write`, whatever remains
    /// buffered is also sent before returning. Returns `data.len()` on
    /// success.
    ///
    /// On error, part of `data` may already have been buffered or sent.
    fn write(&mut self, data: &[u8]) -> IoResult<usize> {
        // Iterate rather than recurse, so a large write with a small chunk
        // size cannot overflow the stack. chunks_size >= 1 (enforced by the
        // constructor) guarantees every iteration makes progress.
        let mut pending = data;
        loop {
            let room = self.chunks_size - self.buffer_len();
            let (head, tail) = pending.split_at(room.min(pending.len()));
            self.buffer.extend_from_slice(head);
            pending = tail;
            if pending.is_empty() {
                break;
            }
            // The buffer is full and more data is waiting: emit a chunk.
            self.send()?;
        }

        if self.flush_after_write {
            self.send()?;
        }
        Ok(data.len())
    }

    /// Sends any buffered data as a (possibly short) chunk, then flushes
    /// `output` so the data actually reaches its destination. Does not end
    /// the stream; the terminating chunk is only written on drop.
    fn flush(&mut self) -> IoResult<()> {
        self.send()?;
        self.output.flush()
    }
}

impl<W> Drop for Encoder<W>
where
    W: Write,
{
    /// Sends any buffered data, writes the terminating zero-length chunk and
    /// flushes `output`.
    ///
    /// Errors cannot be reported from `drop`, so this is best effort. If the
    /// buffered data could not be sent, the terminator is deliberately not
    /// written. The receiver then sees an incomplete body rather than a
    /// complete-looking one that is silently missing data.
    fn drop(&mut self) {
        if self.send().is_ok() && self.output.write_all(b"0\r\n\r\n").is_ok() {
            let _ = self.output.flush();
        }
    }
}
168
#[cfg(test)]
mod test {
    use super::Encoder;
    use std::io;
    use std::str::from_utf8;

    /// Five-byte chunks split "hello world" into two full chunks plus a
    /// one-byte tail. The tail stays buffered until the encoder is dropped.
    #[test]
    fn test() {
        let mut reader: &[u8] = b"hello world";
        let mut sink = Vec::<u8>::new();

        {
            let mut encoder = Encoder::with_chunks_size(&mut sink, 5);
            io::copy(&mut reader, &mut encoder).unwrap();
            assert!(!encoder.is_buffer_empty());
        }

        assert_eq!(
            from_utf8(&sink).unwrap(),
            "5\r\nhello\r\n5\r\n worl\r\n1\r\nd\r\n0\r\n\r\n"
        );
    }

    /// With flush-after-write, the whole input goes out as one chunk
    /// immediately and nothing is left buffered.
    #[test]
    fn flush_after_write() {
        let mut reader: &[u8] = b"hello world";
        let mut sink = Vec::<u8>::new();

        {
            let mut encoder = Encoder::with_flush_after_write(&mut sink);
            io::copy(&mut reader, &mut encoder).unwrap();
            assert!(encoder.is_buffer_empty());
        }

        assert_eq!(
            from_utf8(&sink).unwrap(),
            "b\r\nhello world\r\n0\r\n\r\n"
        );
    }
}
207}