tiny_http/util/
sequential.rs
1use std::io::Result as IoResult;
2use std::io::{Read, Write};
3
4use std::sync::mpsc::channel;
5use std::sync::mpsc::{Receiver, Sender};
6use std::sync::{Arc, Mutex};
7
8use std::mem;
9
10pub struct SequentialReaderBuilder<R>
11where
12 R: Read + Send,
13{
14 inner: SequentialReaderBuilderInner<R>,
15}
16
17enum SequentialReaderBuilderInner<R>
18where
19 R: Read + Send,
20{
21 First(R),
22 NotFirst(Receiver<R>),
23}
24
25pub struct SequentialReader<R>
26where
27 R: Read + Send,
28{
29 inner: SequentialReaderInner<R>,
30 next: Sender<R>,
31}
32
33enum SequentialReaderInner<R>
34where
35 R: Read + Send,
36{
37 MyTurn(R),
38 Waiting(Receiver<R>),
39 Empty,
40}
41
42pub struct SequentialWriterBuilder<W>
43where
44 W: Write + Send,
45{
46 writer: Arc<Mutex<W>>,
47 next_trigger: Option<Receiver<()>>,
48}
49
50pub struct SequentialWriter<W>
51where
52 W: Write + Send,
53{
54 trigger: Option<Receiver<()>>,
55 writer: Arc<Mutex<W>>,
56 on_finish: Sender<()>,
57}
58
59impl<R: Read + Send> SequentialReaderBuilder<R> {
60 pub fn new(reader: R) -> SequentialReaderBuilder<R> {
61 SequentialReaderBuilder {
62 inner: SequentialReaderBuilderInner::First(reader),
63 }
64 }
65}
66
67impl<W: Write + Send> SequentialWriterBuilder<W> {
68 pub fn new(writer: W) -> SequentialWriterBuilder<W> {
69 SequentialWriterBuilder {
70 writer: Arc::new(Mutex::new(writer)),
71 next_trigger: None,
72 }
73 }
74}
75
76impl<R: Read + Send> Iterator for SequentialReaderBuilder<R> {
77 type Item = SequentialReader<R>;
78
79 fn next(&mut self) -> Option<SequentialReader<R>> {
80 let (tx, rx) = channel();
81
82 let inner = mem::replace(&mut self.inner, SequentialReaderBuilderInner::NotFirst(rx));
83
84 match inner {
85 SequentialReaderBuilderInner::First(reader) => Some(SequentialReader {
86 inner: SequentialReaderInner::MyTurn(reader),
87 next: tx,
88 }),
89
90 SequentialReaderBuilderInner::NotFirst(previous) => Some(SequentialReader {
91 inner: SequentialReaderInner::Waiting(previous),
92 next: tx,
93 }),
94 }
95 }
96}
97
98impl<W: Write + Send> Iterator for SequentialWriterBuilder<W> {
99 type Item = SequentialWriter<W>;
100 fn next(&mut self) -> Option<SequentialWriter<W>> {
101 let (tx, rx) = channel();
102 let mut next_next_trigger = Some(rx);
103 ::std::mem::swap(&mut next_next_trigger, &mut self.next_trigger);
104
105 Some(SequentialWriter {
106 trigger: next_next_trigger,
107 writer: self.writer.clone(),
108 on_finish: tx,
109 })
110 }
111}
112
113impl<R: Read + Send> Read for SequentialReader<R> {
114 fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
115 let mut reader = match self.inner {
116 SequentialReaderInner::MyTurn(ref mut reader) => return reader.read(buf),
117 SequentialReaderInner::Waiting(ref mut recv) => recv.recv().unwrap(),
118 SequentialReaderInner::Empty => unreachable!(),
119 };
120
121 let result = reader.read(buf);
122 self.inner = SequentialReaderInner::MyTurn(reader);
123 result
124 }
125}
126
127impl<W: Write + Send> Write for SequentialWriter<W> {
128 fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
129 if let Some(v) = self.trigger.as_mut() {
130 v.recv().unwrap()
131 }
132 self.trigger = None;
133
134 self.writer.lock().unwrap().write(buf)
135 }
136
137 fn flush(&mut self) -> IoResult<()> {
138 if let Some(v) = self.trigger.as_mut() {
139 v.recv().unwrap()
140 }
141 self.trigger = None;
142
143 self.writer.lock().unwrap().flush()
144 }
145}
146
147impl<R> Drop for SequentialReader<R>
148where
149 R: Read + Send,
150{
151 fn drop(&mut self) {
152 let inner = mem::replace(&mut self.inner, SequentialReaderInner::Empty);
153
154 match inner {
155 SequentialReaderInner::MyTurn(reader) => {
156 self.next.send(reader).ok();
157 }
158 SequentialReaderInner::Waiting(recv) => {
159 let reader = recv.recv().unwrap();
160 self.next.send(reader).ok();
161 }
162 SequentialReaderInner::Empty => (),
163 }
164 }
165}
166
167impl<W> Drop for SequentialWriter<W>
168where
169 W: Write + Send,
170{
171 fn drop(&mut self) {
172 self.on_finish.send(()).ok();
173 }
174}