1use std::error;
14use std::io;
15use std::io::{Cursor, Read, Write, SeekFrom, Error, ErrorKind};
16use byteorder::{ReadBytesExt, LittleEndian};
17use std::collections::HashMap;
18use std::collections::hash_map::Entry;
19use std::fmt::{Display, Formatter, Error as FmtError};
20use std::mem::replace;
21use crc::vorbis_crc32_update;
22use Packet;
23use std::io::Seek;
24
25#[derive(Debug)]
27pub enum OggReadError {
28 NoCapturePatternFound,
31 InvalidStreamStructVer(u8),
34 HashMismatch(u32, u32),
36 ReadError(io::Error),
38 InvalidData,
40}
41
42impl OggReadError {
43 fn description_str(&self) -> &str {
44 match *self {
45 OggReadError::NoCapturePatternFound => "No Ogg capture pattern found",
46 OggReadError::InvalidStreamStructVer(_) =>
47 "A non zero stream structure version was passed",
48 OggReadError::HashMismatch(_, _) => "CRC32 hash mismatch",
49 OggReadError::ReadError(_) => "I/O error",
50 OggReadError::InvalidData => "Constraint violated",
51 }
52 }
53}
54
55impl error::Error for OggReadError {
56 fn description(&self) -> &str {
57 self.description_str()
58 }
59
60 fn cause(&self) -> Option<&dyn error::Error> {
61 match *self {
62 OggReadError::ReadError(ref err) => Some(err as &dyn error::Error),
63 _ => None
64 }
65 }
66}
67
68impl Display for OggReadError {
69 fn fmt(&self, fmt :&mut Formatter) -> Result<(), FmtError> {
70 write!(fmt, "{}", Self::description_str(self))
71 }
72}
73
74impl From<io::Error> for OggReadError {
75 fn from(err :io::Error) -> OggReadError {
76 return OggReadError::ReadError(err);
77 }
78}
79
80struct PageBaseInfo {
82 starts_with_continued :bool,
84 first_page :bool,
86 last_page :bool,
88 absgp :u64,
90 sequence_num :u32,
92 packet_positions :Vec<(u16,u16)>,
98 ends_with_continued :bool,
101}
102
103struct PageInfo {
105 bi :PageBaseInfo,
107 packet_idx :u8,
109 page_body :Vec<u8>,
111
112 last_overlap_pck :Vec<Vec<u8>>,
117}
118
119impl PageInfo {
120 fn is_first_pck_in_pg(&self) -> bool {
123 return self.packet_idx == 0;
124 }
125 fn is_last_pck_in_pg(&self) -> bool {
130 return (self.packet_idx + 1 + (self.bi.ends_with_continued as u8)) as usize
131 == self.bi.packet_positions.len();
132 }
133}
134
135pub struct OggPage(PageParser);
137
138impl OggPage {
139 fn has_packet_end(&self) -> bool {
141 (self.0.bi.packet_positions.len() -
142 self.0.bi.ends_with_continued as usize) > 0
143 }
144 fn has_whole_packet(&self) -> bool {
147 self.0.bi.packet_positions.len().saturating_sub(
148 self.0.bi.ends_with_continued as usize +
149 self.0.bi.starts_with_continued as usize) > 0
150 }
151 fn has_packet_start(&self) -> bool {
153 (self.0.bi.packet_positions.len() -
154 self.0.bi.starts_with_continued as usize) > 0
155 }
156}
157
158pub struct PageParser {
167 bi :PageBaseInfo,
170
171 stream_serial :u32,
172 checksum :u32,
173 header_buf: [u8; 27],
174 packet_count :u16, segments_or_packets_buf :Vec<u8>,
179}
180
181impl PageParser {
182 pub fn new(header_buf :[u8; 27]) -> Result<(PageParser, usize), OggReadError> {
194 let mut header_rdr = Cursor::new(header_buf);
195 header_rdr.set_position(4);
196 let stream_structure_version = tri!(header_rdr.read_u8());
197 if stream_structure_version != 0 {
198 tri!(Err(OggReadError::InvalidStreamStructVer(stream_structure_version)));
199 }
200 let header_type_flag = header_rdr.read_u8().unwrap();
201 let stream_serial;
202
203 Ok((PageParser {
204 bi : PageBaseInfo {
205 starts_with_continued : header_type_flag & 0x01u8 != 0,
206 first_page : header_type_flag & 0x02u8 != 0,
207 last_page : header_type_flag & 0x04u8 != 0,
208 absgp : header_rdr.read_u64::<LittleEndian>().unwrap(),
209 sequence_num : {
210 stream_serial = header_rdr.read_u32::<LittleEndian>().unwrap();
211 header_rdr.read_u32::<LittleEndian>().unwrap()
212 },
213 packet_positions : Vec::new(),
214 ends_with_continued : false,
215 },
216 stream_serial,
217 checksum : header_rdr.read_u32::<LittleEndian>().unwrap(),
218 header_buf,
219 packet_count : 0,
220 segments_or_packets_buf :Vec::new(),
221 },
222 header_rdr.read_u8().unwrap() as usize
224 ))
225 }
226
227 pub fn parse_segments(&mut self, segments_buf :Vec<u8>) -> usize {
233 let mut page_siz :u16 = 0; self.bi.ends_with_continued = self.bi.starts_with_continued;
236
237 for val in &segments_buf {
241 page_siz += *val as u16;
242 self.packet_count += (*val < 255) as u16;
244 self.bi.ends_with_continued = !(*val < 255);
245 }
246
247 let mut packets = Vec::with_capacity(self.packet_count as usize
248 + self.bi.ends_with_continued as usize);
249 let mut cur_packet_siz :u16 = 0;
250 let mut cur_packet_offs :u16 = 0;
251
252 for val in &segments_buf {
255 cur_packet_siz += *val as u16;
256 if *val < 255 {
257 packets.push((cur_packet_offs, cur_packet_siz));
258 cur_packet_offs += cur_packet_siz;
259 cur_packet_siz = 0;
260 }
261 }
262 if self.bi.ends_with_continued {
263 packets.push((cur_packet_offs, cur_packet_siz));
264 }
265
266 self.bi.packet_positions = packets;
267 self.segments_or_packets_buf = segments_buf;
268 page_siz as usize
269 }
270
271 pub fn parse_packet_data(mut self, packet_data :Vec<u8>) ->
275 Result<OggPage, OggReadError> {
276 self.header_buf[22] = 0;
279 self.header_buf[23] = 0;
280 self.header_buf[24] = 0;
281 self.header_buf[25] = 0;
282
283 let mut hash_calculated :u32;
285 hash_calculated = vorbis_crc32_update(0, &self.header_buf);
286 hash_calculated = vorbis_crc32_update(hash_calculated,
287 &self.segments_or_packets_buf);
288 hash_calculated = vorbis_crc32_update(hash_calculated, &packet_data);
289
290 if self.checksum != hash_calculated {
292 if !cfg!(fuzzing) {
296 tri!(Err(OggReadError::HashMismatch(self.checksum, hash_calculated)));
297 }
298 }
299 self.segments_or_packets_buf = packet_data;
300 Ok(OggPage(self))
301 }
302}
303
304pub struct BasePacketReader {
322 page_infos :HashMap<u32, PageInfo>,
328
329 stream_with_stuff :Option<u32>,
333
334 has_seeked :bool,
337}
338
339impl BasePacketReader {
340 pub fn new() -> Self {
345 BasePacketReader { page_infos: HashMap::new(),
346 stream_with_stuff: None, has_seeked: false }
347 }
348 pub fn read_packet(&mut self) -> Option<Packet> {
354 if self.stream_with_stuff == None {
355 return None;
356 }
357 let str_serial :u32 = self.stream_with_stuff.unwrap();
358 let pg_info = self.page_infos.get_mut(&str_serial).unwrap();
359 let (offs, len) = pg_info.bi.packet_positions[pg_info.packet_idx as usize];
360 let need_to_glue = pg_info.packet_idx == 0 &&
363 pg_info.bi.starts_with_continued &&
364 !(pg_info.bi.ends_with_continued && pg_info.bi.packet_positions.len() == 1);
365 let packet_content :Vec<u8> = if need_to_glue {
366 let mut siz :usize = 0;
368 for pck in pg_info.last_overlap_pck.iter() {
369 siz += pck.len();
370 }
371 siz += len as usize;
372 let mut cont :Vec<u8> = Vec::with_capacity(siz);
373
374 for pck in pg_info.last_overlap_pck.iter() {
376 cont.write_all(pck).unwrap();
377 }
378 pg_info.last_overlap_pck = Vec::new();
380 cont.write_all(&pg_info.page_body[offs as usize .. (offs + len) as usize]).unwrap();
381
382 cont
383 } else {
384 let mut cont :Vec<u8> = Vec::with_capacity(len as usize);
385 let cont_slice :&[u8] = &pg_info.page_body[offs as usize .. (offs + len) as usize];
389 cont.write_all(cont_slice).unwrap();
390 cont
391 };
392
393 let first_pck_in_pg = pg_info.is_first_pck_in_pg();
394 let first_pck_overall = pg_info.bi.first_page && first_pck_in_pg;
395
396 let last_pck_in_pg = pg_info.is_last_pck_in_pg();
397 let last_pck_overall = pg_info.bi.last_page && last_pck_in_pg;
398
399 pg_info.packet_idx += 1;
401 if last_pck_in_pg {
404 self.stream_with_stuff = None;
405 }
406
407 return Some(Packet {
408 data: packet_content,
409 first_packet_pg: first_pck_in_pg,
410 first_packet_stream: first_pck_overall,
411 last_packet_pg: last_pck_in_pg,
412 last_packet_stream: last_pck_overall,
413 absgp_page: pg_info.bi.absgp,
414 stream_serial: str_serial,
415 });
416 }
417
418 pub fn push_page(&mut self, page :OggPage) -> Result<(), OggReadError> {
425 let mut pg_prs = page.0;
426 match self.page_infos.entry(pg_prs.stream_serial) {
427 Entry::Occupied(mut o) => {
428 let inf = o.get_mut();
429 if pg_prs.bi.first_page {
430 tri!(Err(OggReadError::InvalidData));
431 }
432 if pg_prs.bi.starts_with_continued != inf.bi.ends_with_continued {
433 if !self.has_seeked {
434 tri!(Err(OggReadError::InvalidData));
435 } else {
436 inf.last_overlap_pck.clear();
440 if pg_prs.bi.starts_with_continued {
441 pg_prs.bi.packet_positions.remove(0);
442 if pg_prs.packet_count != 0 {
443 pg_prs.packet_count -= 1;
445 } else {
446 pg_prs.bi.ends_with_continued = false;
451 }
452 }
453 }
454 } else if pg_prs.bi.starts_with_continued {
455 let (offs, len) = inf.bi.packet_positions[inf.packet_idx as usize];
458 if len as usize != inf.page_body.len() {
459 let mut tmp = Vec::with_capacity(len as usize);
460 tmp.write_all(&inf.page_body[offs as usize .. (offs + len) as usize]).unwrap();
461 inf.last_overlap_pck.push(tmp);
462 } else {
463 inf.last_overlap_pck.push(replace(&mut inf.page_body, vec![0;0]));
465 }
466
467 }
468 inf.bi = pg_prs.bi;
469 inf.packet_idx = 0;
470 inf.page_body = pg_prs.segments_or_packets_buf;
471 },
472 Entry::Vacant(v) => {
473 if !self.has_seeked {
474 if !pg_prs.bi.first_page || pg_prs.bi.starts_with_continued {
475 tri!(Err(OggReadError::InvalidData));
477 }
478 } else {
479 if !pg_prs.bi.first_page {
480 }
482 if pg_prs.bi.starts_with_continued {
483 pg_prs.bi.packet_positions.remove(0);
486 if pg_prs.packet_count != 0 {
487 pg_prs.packet_count -= 1;
489 } else {
490 pg_prs.bi.ends_with_continued = false;
495 }
496 pg_prs.bi.starts_with_continued = false;
498 }
499 }
500 v.insert(PageInfo {
501 bi : pg_prs.bi,
502 packet_idx: 0,
503 page_body: pg_prs.segments_or_packets_buf,
504 last_overlap_pck: Vec::new(),
505 });
506 },
507 }
508 let pg_has_stuff :bool = pg_prs.packet_count > 0;
509
510 if pg_has_stuff {
511 self.stream_with_stuff = Some(pg_prs.stream_serial);
512 } else {
513 self.stream_with_stuff = None;
514 }
515
516 return Ok(());
517 }
518
519 pub fn update_after_seek(&mut self) {
525 self.stream_with_stuff = None;
526 self.page_infos = HashMap::new();
527 self.has_seeked = true;
528 }
529}
530
531#[derive(Clone, Copy)]
532enum UntilPageHeaderReaderMode {
533 Searching,
534 FoundWithNeeded(u8),
535 SeekNeeded(i32),
536 Found,
537}
538
539enum UntilPageHeaderResult {
540 Eof,
541 Found,
542 ReadNeeded,
543 SeekNeeded,
544}
545
546struct UntilPageHeaderReader {
547 mode :UntilPageHeaderReaderMode,
548 cpt_of :u8,
552 ret_buf :[u8; 27],
554 read_amount :usize,
555}
556
557impl UntilPageHeaderReader {
558 pub fn new() -> Self {
559 UntilPageHeaderReader {
560 mode : UntilPageHeaderReaderMode::Searching,
561 cpt_of : 0,
562 ret_buf : [0; 27],
563 read_amount : 0,
564 }
565 }
566 fn check_arr(&mut self, arr :&[u8]) -> Option<usize> {
572 for (i, ch) in arr.iter().enumerate() {
573 match *ch {
574 b'O' => self.cpt_of = 1,
575 b'g' if self.cpt_of == 1 || self.cpt_of == 2 => self.cpt_of += 1,
576 b'S' if self.cpt_of == 3 => return Some(i),
577 _ => self.cpt_of = 0,
578 }
579 }
580 return None;
581 }
582 pub fn do_read<R :Read>(&mut self, mut rdr :R)
587 -> Result<UntilPageHeaderResult, OggReadError> {
588 use self::UntilPageHeaderReaderMode::*;
589 use self::UntilPageHeaderResult as Res;
590 let mut buf :[u8; 1024] = [0; 1024];
593
594 let rd_len = tri!(rdr.read(if self.read_amount < 27 {
595 &mut buf[0 .. 27 - self.read_amount]
599 } else {
600 match self.mode {
601 Searching => &mut buf,
602 FoundWithNeeded(amount) => &mut buf[0 .. amount as usize],
603 SeekNeeded(_) => return Ok(Res::SeekNeeded),
604 Found => return Ok(Res::Found),
605 }
606 }));
607
608 if rd_len == 0 {
609 if self.read_amount == 0 {
611 return Ok(Res::Eof);
615 } else {
616 tri!(Err(OggReadError::NoCapturePatternFound));
621 }
622 }
623 self.read_amount += rd_len;
624
625 let read_amount_max = 150 * 1024;
630 if self.read_amount > read_amount_max {
631 tri!(Err(OggReadError::NoCapturePatternFound));
634 }
635
636 let rd_buf = &buf[0 .. rd_len];
637
638 use std::cmp::min;
639 let (off, needed) = match self.mode {
640 Searching => match self.check_arr(rd_buf) {
641 Some(off) => {
643 self.ret_buf[0] = b'O';
644 self.ret_buf[1] = b'g';
645 self.ret_buf[2] = b'g';
646 self.ret_buf[3] = b'S'; (off, 24)
648 },
649 None => return Ok(Res::ReadNeeded),
651 },
652 FoundWithNeeded(needed) => {
653 (0, needed as usize)
654 },
655 _ => unimplemented!(),
656 };
657
658 let fnd_buf = &rd_buf[off..];
659
660 let copy_amount = min(needed, fnd_buf.len());
661 let start_fill = 27 - needed;
662 (&mut self.ret_buf[start_fill .. copy_amount + start_fill])
663 .copy_from_slice(&fnd_buf[0 .. copy_amount]);
664 if fnd_buf.len() == needed {
665 self.mode = Found;
667 return Ok(Res::Found);
668 } else if fnd_buf.len() < needed {
669 let needed_new = needed - copy_amount;
671 self.mode = FoundWithNeeded(needed_new as u8);
672 return Ok(Res::ReadNeeded);
673 } else {
674 self.mode = SeekNeeded(needed as i32 - fnd_buf.len() as i32);
679 return Ok(Res::SeekNeeded);
680 }
681 }
682 pub fn do_seek<S :Seek>(&mut self, mut skr :S)
683 -> Result<UntilPageHeaderResult, OggReadError> {
684 use self::UntilPageHeaderReaderMode::*;
685 use self::UntilPageHeaderResult as Res;
686 match self.mode {
687 Searching | FoundWithNeeded(_) => Ok(Res::ReadNeeded),
688 SeekNeeded(offs) => {
689 tri!(skr.seek(SeekFrom::Current(offs as i64)));
690 self.mode = Found;
691 Ok(Res::Found)
692 },
693 Found => Ok(Res::Found),
694 }
695 }
696 pub fn into_header(self) -> [u8; 27] {
697 use self::UntilPageHeaderReaderMode::*;
698 match self.mode {
699 Found => self.ret_buf,
700 _ => panic!("wrong mode"),
701 }
702 }
703}
704
705pub struct PacketReader<T :io::Read + io::Seek> {
716 rdr :T,
717
718 base_pck_rdr :BasePacketReader,
719}
720
721impl<T :io::Read + io::Seek> PacketReader<T> {
722 pub fn new(rdr :T) -> PacketReader<T> {
724 PacketReader { rdr, base_pck_rdr : BasePacketReader::new() }
725 }
726 pub fn into_inner(self) -> T {
728 self.rdr
729 }
730 pub fn read_packet(&mut self) -> Result<Option<Packet>, OggReadError> {
734 loop {
738 if let Some(pck) = self.base_pck_rdr.read_packet() {
739 return Ok(Some(pck));
740 }
741 let page = tri!(self.read_ogg_page());
742 match page {
743 Some(page) => tri!(self.base_pck_rdr.push_page(page)),
744 None => return Ok(None),
745 }
746 }
747 }
748 pub fn read_packet_expected(&mut self) -> Result<Packet, OggReadError> {
754 match tri!(self.read_packet()) {
755 Some(p) => Ok(p),
756 None => tri!(Err(Error::new(ErrorKind::UnexpectedEof,
757 "Expected ogg packet but found end of physical stream"))),
758 }
759 }
760
761 fn read_until_pg_header(&mut self) -> Result<Option<[u8; 27]>, OggReadError> {
770 let mut r = UntilPageHeaderReader::new();
771 use self::UntilPageHeaderResult::*;
772 let mut res = tri!(r.do_read(&mut self.rdr));
773 loop {
774 res = match res {
775 Eof => return Ok(None),
776 Found => break,
777 ReadNeeded => tri!(r.do_read(&mut self.rdr)),
778 SeekNeeded => tri!(r.do_seek(&mut self.rdr))
779 }
780 }
781 Ok(Some(r.into_header()))
782 }
783
784 fn read_ogg_page(&mut self) -> Result<Option<OggPage>, OggReadError> {
790 let header_buf :[u8; 27] = match tri!(self.read_until_pg_header()) {
791 Some(s) => s,
792 None => return Ok(None)
793 };
794 let (mut pg_prs, page_segments) = tri!(PageParser::new(header_buf));
795
796 let mut segments_buf = vec![0; page_segments]; tri!(self.rdr.read_exact(&mut segments_buf));
798
799 let page_siz = pg_prs.parse_segments(segments_buf);
800
801 let mut packet_data = vec![0; page_siz as usize];
802 tri!(self.rdr.read_exact(&mut packet_data));
803
804 Ok(Some(tri!(pg_prs.parse_packet_data(packet_data))))
805 }
806
807 pub fn seek_bytes(&mut self, pos :SeekFrom) -> Result<u64, Error> {
814 let r = tri!(self.rdr.seek(pos));
815 self.base_pck_rdr.update_after_seek();
817 return Ok(r);
818 }
819
820 pub fn seek_absgp(&mut self, stream_serial :Option<u32>,
837 pos_goal :u64) -> Result<bool, OggReadError> {
838 macro_rules! found {
839 ($pos:expr) => {{
840 tri!(self.rdr.seek(SeekFrom::Start($pos)));
842 self.base_pck_rdr.update_after_seek();
843 return Ok(true);
844 }};
845 }
846 macro_rules! bt {
847 ($e:expr) => {{
848 match tri!($e) {
849 Some(s) => s,
850 None => return Ok(false),
851 }
852 }};
853 }
854 macro_rules! pg_read_until_end_or_goal {
859 {$goal:expr} => {{
860 let mut pos;
861 let mut pg;
862 loop {
863 let (n_pos, n_pg) = pg_read_match_serial!();
864 pos = n_pos;
865 pg = n_pg;
866 if pg.0.bi.absgp == $goal {
872 found!(pos);
873 }
874 if pg.0.bi.absgp > $goal {
877 break;
878 }
879 if pg.0.bi.last_page {
881 return Ok(false)
882 }
883 }
885 (pos, pg)
886 }};
887 }
888 macro_rules! pg_read_match_serial {
889 {} => {{
890 let mut pos;
891 let mut pg;
892 let mut continued_pck_start = None;
893 loop {
894 pos = tri!(self.rdr.seek(SeekFrom::Current(0)));
895 pg = bt!(self.read_ogg_page());
896 match stream_serial {
901 Some(s) if pg.0.stream_serial != s => (),
904 _ => match continued_pck_start {
905 None if pg.has_whole_packet() => break,
906 None if pg.has_packet_start() => {
907 continued_pck_start = Some(pos);
908 },
909 Some(s) if pg.has_packet_end() => {
910 pos = s;
915 break;
916 },
917 _ => (),
918 },
919 }
920 }
921 (pos, pg)
922 }};
923 }
924
925 let ab_of = |pg :&OggPage| { pg.0.bi.absgp };
932 let seq_of = |pg :&OggPage| { pg.0.bi.sequence_num };
933
934 tri!(self.rdr.seek(SeekFrom::Start(0)));
937 let (mut begin_pos, mut begin_pg) = pg_read_match_serial!();
938
939 if pos_goal == 0 {
941 found!(begin_pos);
943 }
944
945 tri!(seek_before_end(&mut self.rdr, 200 * 1024));
949 let (mut end_pos, mut end_pg) = pg_read_until_end_or_goal!(pos_goal);
950
951 loop {
953 if seq_of(&end_pg) - seq_of(&begin_pg) <= 1 {
956 found!(end_pos);
957 }
958 let pos_to_seek = begin_pos + (end_pos - begin_pos) / 2;
960 tri!(self.rdr.seek(SeekFrom::Start(pos_to_seek)));
961 let (pos, pg) = pg_read_match_serial!();
962 if seq_of(&end_pg) == seq_of(&pg) ||
967 seq_of(&begin_pg) == seq_of(&pg) {
968 let mut pos;
972 let mut pg;
973 let mut last_packet_end_pos = begin_pos;
974 tri!(self.rdr.seek(SeekFrom::Start(begin_pos)));
975 loop {
976 pos = tri!(self.rdr.seek(SeekFrom::Current(0)));
977 pg = bt!(self.read_ogg_page());
978 match stream_serial {
983 Some(s) if pg.0.stream_serial != s => (),
987 _ if ab_of(&pg) == -1i64 as u64 => (),
988 _ if ab_of(&pg) >= pos_goal => found!(last_packet_end_pos),
990 _ => if pg.has_packet_end() {
993 last_packet_end_pos = pos;
994 },
995 }
996 }
997 }
998 if ab_of(&pg) >= pos_goal {
999 end_pos = pos;
1000 end_pg = pg;
1001 } else {
1002 begin_pos = pos;
1003 begin_pg = pg;
1004 }
1005 }
1006 }
1007 pub fn delete_unread_packets(&mut self) {
1010 self.base_pck_rdr.update_after_seek();
1011 }
1012}
1013
1014fn seek_before_end<T :io::Read + io::Seek>(mut rdr :T,
1016 offs :u64) -> Result<u64, OggReadError> {
1017 let end_pos = tri!(rdr.seek(SeekFrom::End(0)));
1018 let end_pos_to_seek = ::std::cmp::min(end_pos, offs);
1019 return Ok(tri!(rdr.seek(SeekFrom::End(-(end_pos_to_seek as i64)))));
1020}
1021
1022#[cfg(feature = "async")]
1023pub mod async_api {
1027 #![allow(deprecated)]
1028
1029 use super::*;
1030 use tokio_io::AsyncRead;
1031 use tokio_io::codec::{Decoder, FramedRead};
1032 use futures::stream::Stream;
1033 use futures::{Async, Poll};
1034 use bytes::BytesMut;
1035
1036 enum PageDecodeState {
1037 Head,
1038 Segments(PageParser, usize),
1039 PacketData(PageParser, usize),
1040 InUpdate,
1041 }
1042
1043 impl PageDecodeState {
1044 fn needed_size(&self) -> usize {
1045 match self {
1046 &PageDecodeState::Head => 27,
1047 &PageDecodeState::Segments(_, s) => s,
1048 &PageDecodeState::PacketData(_, s) => s,
1049 &PageDecodeState::InUpdate => panic!("invalid state"),
1050 }
1051 }
1052 }
1053
1054 struct PageDecoder {
1058 state : PageDecodeState,
1059 }
1060
1061 impl PageDecoder {
1062 fn new() -> Self {
1063 PageDecoder {
1064 state : PageDecodeState::Head,
1065 }
1066 }
1067 }
1068
1069 impl Decoder for PageDecoder {
1070 type Item = OggPage;
1071 type Error = OggReadError;
1072
1073 fn decode(&mut self, buf :&mut BytesMut) ->
1074 Result<Option<OggPage>, OggReadError> {
1075 use self::PageDecodeState::*;
1076 loop {
1077 let needed_size = self.state.needed_size();
1078 if buf.len() < needed_size {
1079 return Ok(None);
1080 }
1081 let mut ret = None;
1082 let consumed_buf = buf.split_to(needed_size).to_vec();
1083
1084 self.state = match ::std::mem::replace(&mut self.state, InUpdate) {
1085 Head => {
1086 let mut hdr_buf = [0; 27];
1087 hdr_buf.copy_from_slice(&consumed_buf);
1090 let tup = tri!(PageParser::new(hdr_buf));
1091 Segments(tup.0, tup.1)
1092 },
1093 Segments(mut pg_prs, _) => {
1094 let new_needed_len = pg_prs.parse_segments(consumed_buf);
1095 PacketData(pg_prs, new_needed_len)
1096 },
1097 PacketData(pg_prs, _) => {
1098 ret = Some(tri!(pg_prs.parse_packet_data(consumed_buf)));
1099 Head
1100 },
1101 InUpdate => panic!("invalid state"),
1102 };
1103 if ret.is_some() {
1104 return Ok(ret);
1105 }
1106 }
1107 }
1108
1109 fn decode_eof(&mut self, buf :&mut BytesMut) ->
1110 Result<Option<OggPage>, OggReadError> {
1111 return self.decode(buf);
1113 }
1114 }
1115
1116 pub struct PacketReader<T> where T :AsyncRead {
1120 base_pck_rdr :BasePacketReader,
1121 pg_rd :FramedRead<T, PageDecoder>,
1122 }
1123
1124 impl<T :AsyncRead> PacketReader<T> {
1125 pub fn new(inner :T) -> Self {
1126 PacketReader {
1127 base_pck_rdr : BasePacketReader::new(),
1128 pg_rd : FramedRead::new(inner, PageDecoder::new()),
1129 }
1130 }
1131 }
1132
1133 impl<T :AsyncRead> Stream for PacketReader<T> {
1134 type Item = Packet;
1135 type Error = OggReadError;
1136
1137 fn poll(&mut self) -> Poll<Option<Packet>, OggReadError> {
1138 loop {
1142 if let Some(pck) = self.base_pck_rdr.read_packet() {
1143 return Ok(Async::Ready(Some(pck)));
1144 }
1145 let page = try_ready!(self.pg_rd.poll());
1146 match page {
1147 Some(page) => tri!(self.base_pck_rdr.push_page(page)),
1148 None => return Ok(Async::Ready(None)),
1149 }
1150 }
1151 }
1152 }
1153
1154}