tiny_http/util/
sequential.rs

1use std::io::Result as IoResult;
2use std::io::{Read, Write};
3
4use std::sync::mpsc::channel;
5use std::sync::mpsc::{Receiver, Sender};
6use std::sync::{Arc, Mutex};
7
8use std::mem;
9
10pub struct SequentialReaderBuilder<R>
11where
12    R: Read + Send,
13{
14    inner: SequentialReaderBuilderInner<R>,
15}
16
/// Internal state of a [`SequentialReaderBuilder`].
enum SequentialReaderBuilderInner<R: Read + Send> {
    /// No reader has been yielded yet; the builder still owns the underlying
    /// reader.
    First(R),
    /// At least one reader has been yielded; the underlying reader will arrive
    /// on this channel once the most recently yielded reader passes it on.
    NotFirst(Receiver<R>),
}
24
25pub struct SequentialReader<R>
26where
27    R: Read + Send,
28{
29    inner: SequentialReaderInner<R>,
30    next: Sender<R>,
31}
32
/// Ownership state of a [`SequentialReader`] with respect to the underlying
/// reader.
enum SequentialReaderInner<R: Read + Send> {
    /// This link owns the underlying reader and may read from it directly.
    MyTurn(R),
    /// The underlying reader has not arrived yet; it will be received on this
    /// channel.
    Waiting(Receiver<R>),
    /// Placeholder left behind once the state has been moved out (done in
    /// `Drop`).
    Empty,
}
41
/// Source of [`SequentialWriter`]s that all share one underlying writer.
///
/// The builder is used through its `Iterator` implementation: every item it
/// yields is a new `SequentialWriter` chained behind the previously yielded one.
pub struct SequentialWriterBuilder<W: Write + Send> {
    /// The shared underlying writer.
    writer: Arc<Mutex<W>>,
    /// Receiver on which the most recently yielded writer signals that it is
    /// finished; `None` until the first writer has been yielded.
    next_trigger: Option<Receiver<()>>,
}
49
/// One link in a chain of writers that take turns on a shared underlying writer.
///
/// Writing and flushing block until the previous link has signalled that it is
/// finished; dropping this value sends that signal to the next link.
pub struct SequentialWriter<W: Write + Send> {
    /// Signal from the previous link; `None` for the first link and once the
    /// signal has been consumed.
    trigger: Option<Receiver<()>>,
    /// The shared underlying writer.
    writer: Arc<Mutex<W>>,
    /// Channel used to tell the next link that this one is finished.
    on_finish: Sender<()>,
}
58
59impl<R: Read + Send> SequentialReaderBuilder<R> {
60    pub fn new(reader: R) -> SequentialReaderBuilder<R> {
61        SequentialReaderBuilder {
62            inner: SequentialReaderBuilderInner::First(reader),
63        }
64    }
65}
66
67impl<W: Write + Send> SequentialWriterBuilder<W> {
68    pub fn new(writer: W) -> SequentialWriterBuilder<W> {
69        SequentialWriterBuilder {
70            writer: Arc::new(Mutex::new(writer)),
71            next_trigger: None,
72        }
73    }
74}
75
76impl<R: Read + Send> Iterator for SequentialReaderBuilder<R> {
77    type Item = SequentialReader<R>;
78
79    fn next(&mut self) -> Option<SequentialReader<R>> {
80        let (tx, rx) = channel();
81
82        let inner = mem::replace(&mut self.inner, SequentialReaderBuilderInner::NotFirst(rx));
83
84        match inner {
85            SequentialReaderBuilderInner::First(reader) => Some(SequentialReader {
86                inner: SequentialReaderInner::MyTurn(reader),
87                next: tx,
88            }),
89
90            SequentialReaderBuilderInner::NotFirst(previous) => Some(SequentialReader {
91                inner: SequentialReaderInner::Waiting(previous),
92                next: tx,
93            }),
94        }
95    }
96}
97
98impl<W: Write + Send> Iterator for SequentialWriterBuilder<W> {
99    type Item = SequentialWriter<W>;
100    fn next(&mut self) -> Option<SequentialWriter<W>> {
101        let (tx, rx) = channel();
102        let mut next_next_trigger = Some(rx);
103        ::std::mem::swap(&mut next_next_trigger, &mut self.next_trigger);
104
105        Some(SequentialWriter {
106            trigger: next_next_trigger,
107            writer: self.writer.clone(),
108            on_finish: tx,
109        })
110    }
111}
112
113impl<R: Read + Send> Read for SequentialReader<R> {
114    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
115        let mut reader = match self.inner {
116            SequentialReaderInner::MyTurn(ref mut reader) => return reader.read(buf),
117            SequentialReaderInner::Waiting(ref mut recv) => recv.recv().unwrap(),
118            SequentialReaderInner::Empty => unreachable!(),
119        };
120
121        let result = reader.read(buf);
122        self.inner = SequentialReaderInner::MyTurn(reader);
123        result
124    }
125}
126
127impl<W: Write + Send> Write for SequentialWriter<W> {
128    fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
129        if let Some(v) = self.trigger.as_mut() {
130            v.recv().unwrap()
131        }
132        self.trigger = None;
133
134        self.writer.lock().unwrap().write(buf)
135    }
136
137    fn flush(&mut self) -> IoResult<()> {
138        if let Some(v) = self.trigger.as_mut() {
139            v.recv().unwrap()
140        }
141        self.trigger = None;
142
143        self.writer.lock().unwrap().flush()
144    }
145}
146
147impl<R> Drop for SequentialReader<R>
148where
149    R: Read + Send,
150{
151    fn drop(&mut self) {
152        let inner = mem::replace(&mut self.inner, SequentialReaderInner::Empty);
153
154        match inner {
155            SequentialReaderInner::MyTurn(reader) => {
156                self.next.send(reader).ok();
157            }
158            SequentialReaderInner::Waiting(recv) => {
159                let reader = recv.recv().unwrap();
160                self.next.send(reader).ok();
161            }
162            SequentialReaderInner::Empty => (),
163        }
164    }
165}
166
167impl<W> Drop for SequentialWriter<W>
168where
169    W: Write + Send,
170{
171    fn drop(&mut self) {
172        self.on_finish.send(()).ok();
173    }
174}