// Ogg decoder and encoder written in Rust
//
// Copyright (c) 2016-2017 est31 <MTest31@outlook.com>
// and contributors. All rights reserved.
// Redistribution or use only under the terms
// specified in the LICENSE file attached to this
// source distribution.

/*!
Reading logic
*/

use std::error;
use std::io;
use std::io::{Cursor, Read, Write, SeekFrom, Error, ErrorKind};
use byteorder::{ReadBytesExt, LittleEndian};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::fmt::{Display, Formatter, Error as FmtError};
use std::mem::replace;
use crc::vorbis_crc32_update;
use Packet;
use std::io::Seek;

/// Error that can be raised when decoding an Ogg transport.
#[derive(Debug)]
pub enum OggReadError {
	/// The capture pattern for a new page was not found
	/// where one was expected.
	NoCapturePatternFound,
	/// Invalid stream structure version, with the given one
	/// attached.
	InvalidStreamStructVer(u8),
	/// Mismatch of the hash value, with the (expected, calculated) values attached.
	HashMismatch(u32, u32),
	/// An I/O error occurred.
	ReadError(io::Error),
	/// Some constraint required by the spec was not met.
	InvalidData,
}

impl OggReadError {
	fn description_str(&self) -> &str {
		match *self {
			OggReadError::NoCapturePatternFound => "No Ogg capture pattern found",
			OggReadError::InvalidStreamStructVer(_) =>
				"A non-zero stream structure version was passed",
			OggReadError::HashMismatch(_, _) => "CRC32 hash mismatch",
			OggReadError::ReadError(_) => "I/O error",
			OggReadError::InvalidData => "Constraint violated",
		}
	}
}

impl error::Error for OggReadError {
	fn description(&self) -> &str {
		self.description_str()
	}

	fn cause(&self) -> Option<&dyn error::Error> {
		match *self {
			OggReadError::ReadError(ref err) => Some(err as &dyn error::Error),
			_ => None
		}
	}
}

impl Display for OggReadError {
	fn fmt(&self, fmt :&mut Formatter) -> Result<(), FmtError> {
		write!(fmt, "{}", self.description_str())
	}
}

impl From<io::Error> for OggReadError {
	fn from(err :io::Error) -> OggReadError {
		return OggReadError::ReadError(err);
	}
}

/// Information about an Ogg page that is shared between multiple places
struct PageBaseInfo {
	/// `true` if the first packet is continued from the previous page, `false` if it's a "fresh" one
	starts_with_continued :bool,
	/// `true` if this page is the first one in the logical bitstream
	first_page :bool,
	/// `true` if this page is the last one in the logical bitstream
	last_page :bool,
	/// Absolute granule position. The codec defines further meaning.
	absgp :u64,
	/// Page counter
	sequence_num :u32,
	/// Packet information:
	/// the index is the packet's number within the page,
	/// the tuple is the (offset, length) of the packet.
	/// If `ends_with_continued` is true, the last element contains
	/// information about the continued packet.
	packet_positions :Vec<(u16,u16)>,
	/// `true` if the last packet is continued in subsequent page(s),
	/// `false` if it ends with a segment of length < 255 inside this page
	ends_with_continued :bool,
}

/// Internal helper struct for PacketReader state
struct PageInfo {
	/// Basic information about the last read page
	bi :PageBaseInfo,
	/// The index of the first "unread" packet
	packet_idx :u8,
	/// Contains the page's packet data
	page_body :Vec<u8>,

	/// If a packet spans multiple pages, this field contains the residue
	/// from previous pages. Using a Vec<Vec<u8>> instead of a flat Vec<u8>
	/// gives us O(n) instead of O(n^2) complexity,
	/// where `n` is the number of pages the packet spans.
	last_overlap_pck :Vec<Vec<u8>>,
}

impl PageInfo {
	/// Returns `true` if the first "unread" packet is the first one
	/// in the page, `false` otherwise.
	fn is_first_pck_in_pg(&self) -> bool {
		return self.packet_idx == 0;
	}
	/// Returns `true` if the first "unread" packet is the last one
	/// in the page, `false` otherwise.
	/// If the first "unread" packet isn't completed in this page
	/// (spans page borders), this returns `false`.
	fn is_last_pck_in_pg(&self) -> bool {
		return (self.packet_idx + 1 + (self.bi.ends_with_continued as u8)) as usize
			== self.bi.packet_positions.len();
	}
}

/// Contains a fully parsed OGG page.
pub struct OggPage(PageParser);

impl OggPage {
	/// Returns whether there is an ending packet in the page
	fn has_packet_end(&self) -> bool {
		(self.0.bi.packet_positions.len() -
			self.0.bi.ends_with_continued as usize) > 0
	}
	/// Returns whether there is a packet that both
	/// starts and ends inside the page
	fn has_whole_packet(&self) -> bool {
		self.0.bi.packet_positions.len().saturating_sub(
			self.0.bi.ends_with_continued as usize +
			self.0.bi.starts_with_continued as usize) > 0
	}
	/// Returns whether there is a starting packet in the page
	fn has_packet_start(&self) -> bool {
		(self.0.bi.packet_positions.len() -
			self.0.bi.starts_with_continued as usize) > 0
	}
}

/**
Helper struct for parsing pages

It's created using the `new` function and then it's fed more data via the `parse_segments`
and `parse_packet_data` functions, each called exactly once and in that precise order.

Then later code uses the `OggPage` returned by the `parse_packet_data` function.
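
A rough usage sketch of how the three steps fit together (not a verbatim
walkthrough): `rdr` is assumed to be some `std::io::Read` whose position sits
right at a capture pattern, and errors are propagated with `?`.

```ignore
// Step 1: read the 27 header bytes and create the parser.
let mut header_buf = [0u8; 27];
rdr.read_exact(&mut header_buf)?;
let (mut pg_prs, segments_len) = PageParser::new(header_buf)?;

// Step 2: read the segment table of the requested size and hand it over.
let mut segments_buf = vec![0u8; segments_len];
rdr.read_exact(&mut segments_buf)?;
let body_len = pg_prs.parse_segments(segments_buf);

// Step 3: read the page body and obtain the finished OggPage.
let mut packet_data = vec![0u8; body_len];
rdr.read_exact(&mut packet_data)?;
let page = pg_prs.parse_packet_data(packet_data)?;
```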
*/
pub struct PageParser {
	// Members packet_positions, ends_with_continued and packet_count
	// get populated after segments have been parsed
	bi :PageBaseInfo,

	stream_serial :u32,
	checksum :u32,
	header_buf: [u8; 27],
	/// Number of packet ending segments
	packet_count :u16, // Gets populated after segments have been parsed
	/// After segments have been parsed, this contains the segments buffer;
	/// after the packet data has been read, it contains the packets buffer.
	segments_or_packets_buf :Vec<u8>,
}

impl PageParser {
	/// Creates a new Page parser
	///
	/// The `header_buf` param contains the first 27 bytes of a new OGG page.
	/// Determining when one begins is your responsibility. Usually they
	/// begin directly after the end of a previous OGG page, but
	/// after you've performed a seek you might end up in the middle of a page
	/// and need to recapture.
	///
	/// Returns a page parser, and the requested size of the segments array.
	/// You should allocate and fill such an array, in order to pass it to the `parse_segments`
	/// function.
	pub fn new(header_buf :[u8; 27]) -> Result<(PageParser, usize), OggReadError> {
		let mut header_rdr = Cursor::new(header_buf);
		header_rdr.set_position(4);
		let stream_structure_version = tri!(header_rdr.read_u8());
		if stream_structure_version != 0 {
			tri!(Err(OggReadError::InvalidStreamStructVer(stream_structure_version)));
		}
		let header_type_flag = header_rdr.read_u8().unwrap();
		let stream_serial;

		Ok((PageParser {
			bi : PageBaseInfo {
				starts_with_continued : header_type_flag & 0x01u8 != 0,
				first_page : header_type_flag & 0x02u8 != 0,
				last_page : header_type_flag & 0x04u8 != 0,
				absgp : header_rdr.read_u64::<LittleEndian>().unwrap(),
				sequence_num : {
					stream_serial = header_rdr.read_u32::<LittleEndian>().unwrap();
					header_rdr.read_u32::<LittleEndian>().unwrap()
				},
				packet_positions : Vec::new(),
				ends_with_continued : false,
			},
			stream_serial,
			checksum : header_rdr.read_u32::<LittleEndian>().unwrap(),
			header_buf,
			packet_count : 0,
			segments_or_packets_buf :Vec::new(),
		},
			// Number of page segments
			header_rdr.read_u8().unwrap() as usize
		))
	}

	/// Parses the segments buffer, and returns the requested size
	/// of the packets content array.
	///
	/// You should allocate and fill such an array, in order to pass it to the `parse_packet_data`
	/// function.
	pub fn parse_segments(&mut self, segments_buf :Vec<u8>) -> usize {
		let mut page_siz :u16 = 0; // Size of the page's body
		// Whether our page ends with a continued packet
		self.bi.ends_with_continued = self.bi.starts_with_continued;

		// First run: get the number of packets,
		// whether the page ends with a continued packet,
		// and the size of the page's body
		for val in &segments_buf {
			page_siz += *val as u16;
			// Increment by 1 if val < 255, otherwise by 0
			self.packet_count += (*val < 255) as u16;
			self.bi.ends_with_continued = !(*val < 255);
		}

		let mut packets = Vec::with_capacity(self.packet_count as usize
			+ self.bi.ends_with_continued as usize);
		let mut cur_packet_siz :u16 = 0;
		let mut cur_packet_offs :u16 = 0;

		// Second run: get the offsets of the packets
		// Not that we need it right now, but it's much more fun this way, am I right
		for val in &segments_buf {
			cur_packet_siz += *val as u16;
			if *val < 255 {
				packets.push((cur_packet_offs, cur_packet_siz));
				cur_packet_offs += cur_packet_siz;
				cur_packet_siz = 0;
			}
		}
		if self.bi.ends_with_continued {
			packets.push((cur_packet_offs, cur_packet_siz));
		}

		self.bi.packet_positions = packets;
		self.segments_or_packets_buf = segments_buf;
		page_siz as usize
	}

	/// Parses the packets data and verifies the checksum.
	///
	/// Returns an `OggPage` to be used by later code.
	pub fn parse_packet_data(mut self, packet_data :Vec<u8>) ->
			Result<OggPage, OggReadError> {
		// Now to hash calculation.
		// 1. Zero out the checksum field in the header buffer
		self.header_buf[22] = 0;
		self.header_buf[23] = 0;
		self.header_buf[24] = 0;
		self.header_buf[25] = 0;

		// 2. Calculate the hash
		let mut hash_calculated :u32;
		hash_calculated = vorbis_crc32_update(0, &self.header_buf);
		hash_calculated = vorbis_crc32_update(hash_calculated,
			&self.segments_or_packets_buf);
		hash_calculated = vorbis_crc32_update(hash_calculated, &packet_data);

		// 3. Compare to the extracted one
		if self.checksum != hash_calculated {
			// Do not verify the checksum when the decoder is being fuzzed.
			// This allows random input from fuzzers to reach decoding code that's actually interesting,
			// instead of being rejected early due to a checksum mismatch.
			if !cfg!(fuzzing) {
				tri!(Err(OggReadError::HashMismatch(self.checksum, hash_calculated)));
			}
		}
		self.segments_or_packets_buf = packet_data;
		Ok(OggPage(self))
	}
}

/**
Low level struct for reading from an Ogg stream.

Note that most of the time you'll want the higher level `PacketReader` struct.

It takes care of most of the internal parsing and logic; you
only have to hand over your data.

Essentially, it manages a cache of packet data for each logical
bitstream, and when the cache of every logical bitstream is empty,
it asks for a fresh page. You will then need to feed the struct
one via the `push_page` function.

All functions on this struct are async ready.
They get their data fed, instead of calling and blocking
in order to get it.
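
A rough sketch of that feed loop, assuming a hypothetical `obtain_page()`
helper that yields `OggPage`s (e.g. built via a `PageParser`) until the
physical stream ends:

```ignore
let mut base_rdr = BasePacketReader::new();
loop {
	if let Some(pck) = base_rdr.read_packet() {
		// Hand the packet to the codec level...
		continue;
	}
	// The cache is empty: feed a fresh page, or stop at the end of the stream.
	match obtain_page() {
		Some(page) => base_rdr.push_page(page)?,
		None => break,
	}
}
```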
*/
pub struct BasePacketReader {
	// TODO the hashmap plus the set is perhaps smart ass perfect design but could be made more performant I guess...
	// I mean: in > 99% of all cases we'll just have one or two streams.
	// AND: their setup changes only very rarely.

	/// Contains info about all logical streams that have been encountered so far
	page_infos :HashMap<u32, PageInfo>,

	/// Contains the stream_serial of the stream that contains some unprocessed packet data.
	/// There is always at most one such stream, because while one exists, no new pages are read, so a second one can't be added.
	/// `None` if there is no such stream and a new page has to be read.
	stream_with_stuff :Option<u32>,

	// Bool that is set to true when a seek of the stream has occurred.
	// This helps validator code to decide whether to accept certain strange data.
	has_seeked :bool,
}

impl BasePacketReader {
	/// Constructs a new blank `BasePacketReader`.
	///
	/// You can feed it data using the `push_page` function, and
	/// obtain data using the `read_packet` function.
	pub fn new() -> Self {
		BasePacketReader { page_infos: HashMap::new(),
			stream_with_stuff: None, has_seeked: false }
	}
	/// Extracts a packet from the cache, if the cache contains valid packet data,
	/// otherwise it returns `None`.
	///
	/// If this function returns `None`, you'll need to add a page to the cache
	/// by using the `push_page` function.
	pub fn read_packet(&mut self) -> Option<Packet> {
		if self.stream_with_stuff == None {
			return None;
		}
		let str_serial :u32 = self.stream_with_stuff.unwrap();
		let pg_info = self.page_infos.get_mut(&str_serial).unwrap();
		let (offs, len) = pg_info.bi.packet_positions[pg_info.packet_idx as usize];
		// If there is a continued packet, and we are at the start right now,
		// and we actually have its end in the current page, glue it together.
		let need_to_glue = pg_info.packet_idx == 0 &&
				pg_info.bi.starts_with_continued &&
				!(pg_info.bi.ends_with_continued && pg_info.bi.packet_positions.len() == 1);
		let packet_content :Vec<u8> = if need_to_glue {
			// First find out the size of our spanning packet
			let mut siz :usize = 0;
			for pck in pg_info.last_overlap_pck.iter() {
				siz += pck.len();
			}
			siz += len as usize;
			let mut cont :Vec<u8> = Vec::with_capacity(siz);

			// Then do the copying
			for pck in pg_info.last_overlap_pck.iter() {
				cont.write_all(pck).unwrap();
			}
			// Now reset the overlap container again
			pg_info.last_overlap_pck = Vec::new();
			cont.write_all(&pg_info.page_body[offs as usize .. (offs + len) as usize]).unwrap();

			cont
		} else {
			let mut cont :Vec<u8> = Vec::with_capacity(len as usize);
			// TODO The copy below is totally unnecessary. It is only needed so that we don't have to carry around the old Vec's.
			// TODO get something like the shared_slice crate for RefCells, so that we can also have mutable data, shared through
			// slices.
			let cont_slice :&[u8] = &pg_info.page_body[offs as usize .. (offs + len) as usize];
			cont.write_all(cont_slice).unwrap();
			cont
		};

		let first_pck_in_pg = pg_info.is_first_pck_in_pg();
		let first_pck_overall = pg_info.bi.first_page && first_pck_in_pg;

		let last_pck_in_pg = pg_info.is_last_pck_in_pg();
		let last_pck_overall = pg_info.bi.last_page && last_pck_in_pg;

		// Update the last read index.
		pg_info.packet_idx += 1;
		// Set stream_with_stuff to None so that future packet reads
		// trigger a page read first
		if last_pck_in_pg {
			self.stream_with_stuff = None;
		}

		return Some(Packet {
			data: packet_content,
			first_packet_pg: first_pck_in_pg,
			first_packet_stream: first_pck_overall,
			last_packet_pg: last_pck_in_pg,
			last_packet_stream: last_pck_overall,
			absgp_page: pg_info.bi.absgp,
			stream_serial: str_serial,
		});
	}

	/// Pushes a given Ogg page, updating the internal structures
	/// with its contents.
	///
	/// If you want the code to function properly, you should first call
	/// `parse_segments`, then `parse_packet_data` on a `PageParser`
	/// before passing the resulting `OggPage` to this function.
	pub fn push_page(&mut self, page :OggPage) -> Result<(), OggReadError> {
		let mut pg_prs = page.0;
		match self.page_infos.entry(pg_prs.stream_serial) {
			Entry::Occupied(mut o) => {
				let inf = o.get_mut();
				if pg_prs.bi.first_page {
					tri!(Err(OggReadError::InvalidData));
				}
				if pg_prs.bi.starts_with_continued != inf.bi.ends_with_continued {
					if !self.has_seeked {
						tri!(Err(OggReadError::InvalidData));
					} else {
						// If we have seeked, we are more tolerant here,
						// and just drop the continued packet's content.

						inf.last_overlap_pck.clear();
						if pg_prs.bi.starts_with_continued {
							pg_prs.bi.packet_positions.remove(0);
							if pg_prs.packet_count != 0 {
								// Decrease packet count by one. Normal case.
								pg_prs.packet_count -= 1;
							} else {
								// If the packet count is 0, this means
								// that we start and end with the same continued packet.
								// So now that we ignore that packet, we must clear the
								// ends_with_continued state as well.
								pg_prs.bi.ends_with_continued = false;
							}
						}
					}
				} else if pg_prs.bi.starts_with_continued {
					// Remember the packet at the end so that it can be glued together once
					// we encounter the next segment with length < 255 (doesn't have to be in this page)
					let (offs, len) = inf.bi.packet_positions[inf.packet_idx as usize];
					if len as usize != inf.page_body.len() {
						let mut tmp = Vec::with_capacity(len as usize);
						tmp.write_all(&inf.page_body[offs as usize .. (offs + len) as usize]).unwrap();
						inf.last_overlap_pck.push(tmp);
					} else {
						// Little optimisation: don't copy if not necessary
						inf.last_overlap_pck.push(replace(&mut inf.page_body, vec![0;0]));
					}

				}
				inf.bi = pg_prs.bi;
				inf.packet_idx = 0;
				inf.page_body = pg_prs.segments_or_packets_buf;
			},
			Entry::Vacant(v) => {
				if !self.has_seeked {
					if !pg_prs.bi.first_page || pg_prs.bi.starts_with_continued {
						// If we haven't seeked, this is an error.
						tri!(Err(OggReadError::InvalidData));
					}
				} else {
					if !pg_prs.bi.first_page {
						// We can just ignore this.
					}
					if pg_prs.bi.starts_with_continued {
						// Ignore the continued packet's content.
						// This is a normal occurrence if we have just seeked.
						pg_prs.bi.packet_positions.remove(0);
						if pg_prs.packet_count != 0 {
							// Decrease packet count by one. Normal case.
							pg_prs.packet_count -= 1;
						} else {
							// If the packet count is 0, this means
							// that we start and end with the same continued packet.
							// So now that we ignore that packet, we must clear the
							// ends_with_continued state as well.
							pg_prs.bi.ends_with_continued = false;
						}
						// Not actually needed, but good for consistency
						pg_prs.bi.starts_with_continued = false;
					}
				}
				v.insert(PageInfo {
					bi : pg_prs.bi,
					packet_idx: 0,
					page_body: pg_prs.segments_or_packets_buf,
					last_overlap_pck: Vec::new(),
				});
			},
		}
		let pg_has_stuff :bool = pg_prs.packet_count > 0;

		if pg_has_stuff {
			self.stream_with_stuff = Some(pg_prs.stream_serial);
		} else {
			self.stream_with_stuff = None;
		}

		return Ok(());
	}

	/// Reset the internal state after a seek
	///
	/// It flushes the cache so that no partial data is left inside.
	/// It also tells the parsing logic to expect minor inconsistencies
	/// due to the read position not being at the start.
	pub fn update_after_seek(&mut self) {
		self.stream_with_stuff = None;
		self.page_infos = HashMap::new();
		self.has_seeked = true;
	}
}

#[derive(Clone, Copy)]
enum UntilPageHeaderReaderMode {
	Searching,
	FoundWithNeeded(u8),
	SeekNeeded(i32),
	Found,
}

enum UntilPageHeaderResult {
	Eof,
	Found,
	ReadNeeded,
	SeekNeeded,
}

struct UntilPageHeaderReader {
	mode :UntilPageHeaderReaderMode,
	/// Capture pattern offset. Needed so that if we only partially
	/// recognized the capture pattern, we later on only check the
	/// remaining part.
	cpt_of :u8,
	/// The return buffer.
	ret_buf :[u8; 27],
	read_amount :usize,
}

impl UntilPageHeaderReader {
	pub fn new() -> Self {
		UntilPageHeaderReader {
			mode : UntilPageHeaderReaderMode::Searching,
			cpt_of : 0,
			ret_buf : [0; 27],
			read_amount : 0,
		}
	}
	/// Returns Some(off), where off is the offset of the last byte
	/// of the capture pattern if it's found, None if the capture pattern
	/// is not inside the passed slice.
	///
	/// Changes the capture pattern offset accordingly
	fn check_arr(&mut self, arr :&[u8]) -> Option<usize> {
		for (i, ch) in arr.iter().enumerate() {
			match *ch {
				b'O' => self.cpt_of = 1,
				b'g' if self.cpt_of == 1 || self.cpt_of == 2 => self.cpt_of += 1,
				b'S' if self.cpt_of == 3 => return Some(i),
				_ => self.cpt_of = 0,
			}
		}
		return None;
	}
	/// Performs exactly one read on the given reader.
	///
	/// On success, the returned `UntilPageHeaderResult` tells you whether the
	/// full header has been read and can be extracted via `into_header`,
	/// whether another read or a seek is needed first, or whether the stream
	/// has ended (`Eof`).
	pub fn do_read<R :Read>(&mut self, mut rdr :R)
			-> Result<UntilPageHeaderResult, OggReadError> {
		use self::UntilPageHeaderReaderMode::*;
		use self::UntilPageHeaderResult as Res;
		// The array's size can be chosen freely, but must be > 27,
		// and must fit comfortably into an i32 (it needs to be stored in SeekNeeded)
		let mut buf :[u8; 1024] = [0; 1024];

		let rd_len = tri!(rdr.read(if self.read_amount < 27 {
			// This is an optimisation for the most likely case:
			// the next page directly follows the current read position.
			// Then it would be a waste to read more than the needed amount.
			&mut buf[0 .. 27 - self.read_amount]
		} else {
			match self.mode {
				Searching => &mut buf,
				FoundWithNeeded(amount) => &mut buf[0 .. amount as usize],
				SeekNeeded(_) => return Ok(Res::SeekNeeded),
				Found => return Ok(Res::Found),
			}
		}));

		if rd_len == 0 {
			// Reached EOF.
			if self.read_amount == 0 {
				// If we have read nothing yet, there is no non-Ogg data
				// at the end, meaning the stream ends legally
				// and without corruption.
				return Ok(Res::Eof);
			} else {
				// There is most likely corruption here.
				// I'm not sure, but the ogg spec doesn't say that
				// random data past the last ogg page is allowed,
				// so we just assume it's not allowed.
				tri!(Err(OggReadError::NoCapturePatternFound));
			}
		}
		self.read_amount += rd_len;

		// 150 KB gives us a bit of safety: we can survive
		// up to one page with a corrupted capture pattern
		// after having seeked right after a capture pattern
		// of an earlier page.
		let read_amount_max = 150 * 1024;
		if self.read_amount > read_amount_max {
			// Exhaustive searching for the capture pattern
			// has found no Ogg capture pattern.
			tri!(Err(OggReadError::NoCapturePatternFound));
		}

		let rd_buf = &buf[0 .. rd_len];

		use std::cmp::min;
		let (off, needed) = match self.mode {
			Searching => match self.check_arr(rd_buf) {
				// Capture pattern found
				Some(off) => {
					self.ret_buf[0] = b'O';
					self.ret_buf[1] = b'g';
					self.ret_buf[2] = b'g';
					self.ret_buf[3] = b'S'; // (Not actually needed)
					(off, 24)
				},
				// Nothing found
				None => return Ok(Res::ReadNeeded),
			},
			FoundWithNeeded(needed) => {
				(0, needed as usize)
			},
			_ => unimplemented!(),
		};

		let fnd_buf = &rd_buf[off..];

		let copy_amount = min(needed, fnd_buf.len());
		let start_fill = 27 - needed;
		(&mut self.ret_buf[start_fill .. copy_amount + start_fill])
				.copy_from_slice(&fnd_buf[0 .. copy_amount]);
		if fnd_buf.len() == needed {
			// Capture pattern found!
			self.mode = Found;
			return Ok(Res::Found);
		} else if fnd_buf.len() < needed {
			// We still have to read some content.
			let needed_new = needed - copy_amount;
			self.mode = FoundWithNeeded(needed_new as u8);
			return Ok(Res::ReadNeeded);
		} else {
			// We have read too much content (exceeding the header).
			// Seek back so that we are at the position
			// right after the header.

			self.mode = SeekNeeded(needed as i32 - fnd_buf.len() as i32);
			return Ok(Res::SeekNeeded);
		}
	}
	pub fn do_seek<S :Seek>(&mut self, mut skr :S)
			-> Result<UntilPageHeaderResult, OggReadError> {
		use self::UntilPageHeaderReaderMode::*;
		use self::UntilPageHeaderResult as Res;
		match self.mode {
			Searching | FoundWithNeeded(_) => Ok(Res::ReadNeeded),
			SeekNeeded(offs) => {
				tri!(skr.seek(SeekFrom::Current(offs as i64)));
				self.mode = Found;
				Ok(Res::Found)
			},
			Found => Ok(Res::Found),
		}
	}
	pub fn into_header(self) -> [u8; 27] {
		use self::UntilPageHeaderReaderMode::*;
		match self.mode {
			Found => self.ret_buf,
			_ => panic!("wrong mode"),
		}
	}
}

/**
Reader for packets from an Ogg stream.

This reads codec packets belonging to several different logical streams from one physical Ogg container stream.

This reader is not async ready. It does not keep its internal state
consistent when it encounters the `WouldBlock` error kind.
If you desire async functionality, consider enabling the `async` feature
and looking into the async module.
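
A minimal usage sketch (the file name is made up, and errors are propagated
with `?`; a packet's raw bytes live in its `data` field):

```ignore
let f = std::fs::File::open("example.ogg")?;
let mut pck_rdr = PacketReader::new(f);
while let Some(pck) = pck_rdr.read_packet()? {
	println!("Read packet with {} bytes", pck.data.len());
}
```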
*/
pub struct PacketReader<T :io::Read + io::Seek> {
	rdr :T,

	base_pck_rdr :BasePacketReader,
}

impl<T :io::Read + io::Seek> PacketReader<T> {
	/// Constructs a new `PacketReader` with a given `Read`.
	pub fn new(rdr :T) -> PacketReader<T> {
		PacketReader { rdr, base_pck_rdr : BasePacketReader::new() }
	}
	/// Returns the wrapped reader, consuming the `PacketReader`.
	pub fn into_inner(self) -> T {
		self.rdr
	}
	/// Reads a packet, and returns it on success.
	///
	/// Ok(None) is returned if the physical stream has ended.
	pub fn read_packet(&mut self) -> Result<Option<Packet>, OggReadError> {
		// Read pages until we get a valid entire packet
		// (packets may span multiple pages, so reading one page
		// doesn't always suffice to give us a valid packet)
		loop {
			if let Some(pck) = self.base_pck_rdr.read_packet() {
				return Ok(Some(pck));
			}
			let page = tri!(self.read_ogg_page());
			match page {
				Some(page) => tri!(self.base_pck_rdr.push_page(page)),
				None => return Ok(None),
			}
		}
	}
	/// Reads a packet, and returns it on success.
	///
	/// The difference from the `read_packet` function is that this function
	/// returns an Err(_) if the physical stream has ended.
	/// This function is useful if you expect another packet to come.
	pub fn read_packet_expected(&mut self) -> Result<Packet, OggReadError> {
		match tri!(self.read_packet()) {
			Some(p) => Ok(p),
			None => tri!(Err(Error::new(ErrorKind::UnexpectedEof,
				"Expected ogg packet but found end of physical stream"))),
		}
	}

	/// Reads until the next page header, and then returns the page header array.
	///
	/// If no new page header is immediately found, it performs a "recapture",
	/// meaning it searches for the capture pattern, and if it finds it, it
	/// reads the complete first 27 bytes of the header, and returns them.
	///
	/// Ok(None) is returned if the stream has ended without an incomplete page
	/// or non-page data after the last page (if any).
	fn read_until_pg_header(&mut self) -> Result<Option<[u8; 27]>, OggReadError> {
		let mut r = UntilPageHeaderReader::new();
		use self::UntilPageHeaderResult::*;
		let mut res = tri!(r.do_read(&mut self.rdr));
		loop {
			res = match res {
				Eof => return Ok(None),
				Found => break,
				ReadNeeded => tri!(r.do_read(&mut self.rdr)),
				SeekNeeded => tri!(r.do_seek(&mut self.rdr))
			}
		}
		Ok(Some(r.into_header()))
	}

	/// Parses and reads a new OGG page
	///
	/// To support seeking, this does not assume that the capture pattern
	/// is at the current reader position.
	/// Instead, it searches until it finds the capture pattern.
	fn read_ogg_page(&mut self) -> Result<Option<OggPage>, OggReadError> {
		let header_buf :[u8; 27] = match tri!(self.read_until_pg_header()) {
			Some(s) => s,
			None => return Ok(None)
		};
		let (mut pg_prs, page_segments) = tri!(PageParser::new(header_buf));

		let mut segments_buf = vec![0; page_segments]; // TODO fix this, we initialize memory for NOTHING!!! For some reason, this is seen as "unsafe" by rustc.
		tri!(self.rdr.read_exact(&mut segments_buf));

		let page_siz = pg_prs.parse_segments(segments_buf);

		let mut packet_data = vec![0; page_siz as usize];
		tri!(self.rdr.read_exact(&mut packet_data));

		Ok(Some(tri!(pg_prs.parse_packet_data(packet_data))))
	}

	/// Seeks the underlying reader
	///
	/// Seeks the reader that this PacketReader is based on by the specified
	/// number of bytes. All new pages will be read from the new position.
	///
	/// This also flushes all the unread packets in the queue.
	pub fn seek_bytes(&mut self, pos :SeekFrom) -> Result<u64, Error> {
		let r = tri!(self.rdr.seek(pos));
		// Reset the internal state
		self.base_pck_rdr.update_after_seek();
		return Ok(r);
	}

	/// Seeks to absolute granule pos
	///
	/// More specifically, it seeks to the first Ogg page
	/// that has an `absgp` greater than or equal to the specified one.
	/// In the case of continued packets, the seek operation may also end up
	/// at the last page that comes before such a page and has a packet start.
	///
	/// The passed `stream_serial` parameter controls the stream
	/// serial number to filter our search by. If it's `None`, no
	/// filtering is applied, but if it is `Some(n)`, we filter for
	/// streams with the serial number `n`.
	/// Note that the `None` case is only intended for physical streams
	/// where only one logical stream exists; the seek may misbehave
	/// if `None` gets passed when multiple streams exist.
	///
	/// The returned bool indicates whether the seek was successful.
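	///
	/// A minimal sketch (`pck_rdr` is a `PacketReader` over a seekable source;
	/// the granule position 44_100 is just an example value):
	///
	/// ```ignore
	/// if pck_rdr.seek_absgp(None, 44_100)? {
	/// 	// Continue reading packets from the new position.
	/// 	let pck = pck_rdr.read_packet_expected()?;
	/// }
	/// ```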
	pub fn seek_absgp(&mut self, stream_serial :Option<u32>,
			pos_goal :u64) -> Result<bool, OggReadError> {
		macro_rules! found {
			($pos:expr) => {{
				// println!("found: {}", $pos);
				tri!(self.rdr.seek(SeekFrom::Start($pos)));
				self.base_pck_rdr.update_after_seek();
				return Ok(true);
			}};
		}
		macro_rules! bt {
			($e:expr) => {{
				match tri!($e) {
					Some(s) => s,
					None => return Ok(false),
				}
			}};
		}
		// The task of this macro is to read to the
		// end of the logical stream. For optimisation reasons,
		// it returns early if we find our goal
		// or any page past it.
		macro_rules! pg_read_until_end_or_goal {
			{$goal:expr} => {{
				let mut pos;
				let mut pg;
				loop {
					let (n_pos, n_pg) = pg_read_match_serial!();
					pos = n_pos;
					pg = n_pg;
					// If the absgp matches our goal, the seek process is done.
					// This is a nice shortcut as we don't need to perform
					// the remainder of the seek process any more.
					// Of course, an exact match only happens in the rarest
					// of cases
					if pg.0.bi.absgp == $goal {
						found!(pos);
					}
					// If we found a page past our goal, we already
					// found a position that can serve as the end post of the search.
					if pg.0.bi.absgp > $goal {
						break;
					}
					// Stop the search if the stream has ended.
					if pg.0.bi.last_page {
						return Ok(false)
					}
					// If the page is not interesting, seek over it.
				}
				(pos, pg)
			}};
		}
		macro_rules! pg_read_match_serial {
			{} => {{
				let mut pos;
				let mut pg;
				let mut continued_pck_start = None;
				loop {
					pos = tri!(self.rdr.seek(SeekFrom::Current(0)));
					pg = bt!(self.read_ogg_page());
					/*println!("absgp {} serial {} wh {} pe {} @ {}",
						pg.0.bi.absgp, pg.0.bi.sequence_num,
						pg.has_whole_packet(), pg.has_packet_end(), pos);// */

					match stream_serial {
						// Continue the search if we encounter a
						// page with a different stream serial
						Some(s) if pg.0.stream_serial != s => (),
						_ => match continued_pck_start {
								None if pg.has_whole_packet() => break,
								None if pg.has_packet_start() => {
									continued_pck_start = Some(pos);
								},
								Some(s) if pg.has_packet_end() => {
									// We have remembered a packet start,
									// and have just encountered a packet end.
									// Return the position of the start with the
									// info from the end (for the absgp).
									pos = s;
									break;
								},
								_ => (),
						},
					}
				}
				(pos, pg)
			}};
		}

		// Bisection seeking algorithm.
		// Start by finding boundaries, e.g. at the start and
		// end of the file, then bisect those boundaries successively
		// until a page is found.

		//println!("seek start. goal = {}", pos_goal);
		let ab_of = |pg :&OggPage| { pg.0.bi.absgp };
		let seq_of = |pg :&OggPage| { pg.0.bi.sequence_num };

		// First, find initial "boundaries"
		// Seek to the start of the file to get the starting boundary
		tri!(self.rdr.seek(SeekFrom::Start(0)));
		let (mut begin_pos, mut begin_pg) = pg_read_match_serial!();

		// If the goal is the beginning, we are done.
		if pos_goal == 0 {
			//println!("Seeking to the beginning of the stream - skipping bisect.");
			found!(begin_pos);
		}

		// Seek to the end of the file to get the ending boundary
		// TODO the 200 KB is just a guessed number, any ideas
		// to improve it?
		tri!(seek_before_end(&mut self.rdr, 200 * 1024));
		let (mut end_pos, mut end_pg) = pg_read_until_end_or_goal!(pos_goal);

		// Then perform the bisection
		loop {
			// Search is done if the two limits are the same page,
			// or consecutive pages.
			if seq_of(&end_pg) - seq_of(&begin_pg) <= 1 {
				found!(end_pos);
			}
			// Perform the bisection step
			let pos_to_seek = begin_pos + (end_pos - begin_pos) / 2;
			tri!(self.rdr.seek(SeekFrom::Start(pos_to_seek)));
			let (pos, pg) = pg_read_match_serial!();
			/*println!("seek {} {} . {} @ {} {} . {}",
				ab_of(&begin_pg), ab_of(&end_pg), ab_of(&pg),
				begin_pos, end_pos, pos);// */

			if seq_of(&end_pg) == seq_of(&pg) ||
					seq_of(&begin_pg) == seq_of(&pg) {
				//println!("switching to linear.");
				// The bisection seek doesn't bring us any further.
				// Switch to a linear seek to get the last details.
				let mut pos;
				let mut pg;
				let mut last_packet_end_pos = begin_pos;
				tri!(self.rdr.seek(SeekFrom::Start(begin_pos)));
				loop {
					pos = tri!(self.rdr.seek(SeekFrom::Current(0)));
					pg = bt!(self.read_ogg_page());
					/*println!("absgp {} pck_start {} whole_pck {} pck_end {} @ {} {}",
						ab_of(&pg), pg.has_packet_start(), pg.has_whole_packet(),
						pg.has_packet_end(),
						pos, last_packet_end_pos);// */
					match stream_serial {
						// Continue the search if we encounter a
						// page with a different stream serial,
						// or one with an absgp of -1.
						Some(s) if pg.0.stream_serial != s => (),
						_ if ab_of(&pg) == -1i64 as u64 => (),
						// The page is found if the absgp is >= our goal
						_ if ab_of(&pg) >= pos_goal => found!(last_packet_end_pos),
						// If we encounter a page with a packet end,
						// update the last seen packet end position accordingly.
						_ => if pg.has_packet_end() {
							last_packet_end_pos = pos;
						},
					}
				}
			}
			if ab_of(&pg) >= pos_goal {
				end_pos = pos;
				end_pg = pg;
			} else {
				begin_pos = pos;
				begin_pg = pg;
			}
		}
	}
	/// Resets the internal state by deleting all
	/// unread packets.
	pub fn delete_unread_packets(&mut self) {
		self.base_pck_rdr.update_after_seek();
	}
}

// util function
fn seek_before_end<T :io::Read + io::Seek>(mut rdr :T,
		offs :u64) -> Result<u64, OggReadError> {
	let end_pos = tri!(rdr.seek(SeekFrom::End(0)));
	let end_pos_to_seek = ::std::cmp::min(end_pos, offs);
	return Ok(tri!(rdr.seek(SeekFrom::End(-(end_pos_to_seek as i64)))));
}

#[cfg(feature = "async")]
/**
Asynchronous Ogg decoding
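
A rough consumption sketch using the futures 0.1 combinators this module is
built on (`conn` stands for some `AsyncRead` source and is made up for the
example):

```ignore
let pck_rdr = async_api::PacketReader::new(conn);
// PacketReader implements Stream, so the usual combinators apply.
let fut = pck_rdr.for_each(|pck| {
	println!("Read packet with {} bytes", pck.data.len());
	Ok(())
});
```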
*/
pub mod async_api {

	#![allow(deprecated)]

	use super::*;
	use tokio_io::AsyncRead;
	use tokio_io::codec::{Decoder, FramedRead};
	use futures::stream::Stream;
	use futures::{Async, Poll};
	use bytes::BytesMut;

	enum PageDecodeState {
		Head,
		Segments(PageParser, usize),
		PacketData(PageParser, usize),
		InUpdate,
	}

	impl PageDecodeState {
		fn needed_size(&self) -> usize {
			match self {
				&PageDecodeState::Head => 27,
				&PageDecodeState::Segments(_, s) => s,
				&PageDecodeState::PacketData(_, s) => s,
				&PageDecodeState::InUpdate => panic!("invalid state"),
			}
		}
	}

	/**
	Async page reading functionality.
	*/
	struct PageDecoder {
		state : PageDecodeState,
	}

	impl PageDecoder {
		fn new() -> Self {
			PageDecoder {
				state : PageDecodeState::Head,
			}
		}
	}

	impl Decoder for PageDecoder {
		type Item = OggPage;
		type Error = OggReadError;

		fn decode(&mut self, buf :&mut BytesMut) ->
				Result<Option<OggPage>, OggReadError> {
			use self::PageDecodeState::*;
			loop {
				let needed_size = self.state.needed_size();
				if buf.len() < needed_size {
					return Ok(None);
				}
				let mut ret = None;
				let consumed_buf = buf.split_to(needed_size).to_vec();

				self.state = match ::std::mem::replace(&mut self.state, InUpdate) {
					Head => {
						let mut hdr_buf = [0; 27];
						// TODO once we have const generics, the copy below can be done
						// much nicer, maybe with a new into_array fn on Vec's
						hdr_buf.copy_from_slice(&consumed_buf);
						let tup = tri!(PageParser::new(hdr_buf));
						Segments(tup.0, tup.1)
					},
					Segments(mut pg_prs, _) => {
						let new_needed_len = pg_prs.parse_segments(consumed_buf);
						PacketData(pg_prs, new_needed_len)
					},
					PacketData(pg_prs, _) => {
						ret = Some(tri!(pg_prs.parse_packet_data(consumed_buf)));
						Head
					},
					InUpdate => panic!("invalid state"),
				};
				if ret.is_some() {
					return Ok(ret);
				}
			}
		}

		fn decode_eof(&mut self, buf :&mut BytesMut) ->
				Result<Option<OggPage>, OggReadError> {
			// Ugly hack for "bytes remaining on stream" error
			return self.decode(buf);
		}
	}

	/**
	Async packet reading functionality.
	*/
	pub struct PacketReader<T> where T :AsyncRead {
		base_pck_rdr :BasePacketReader,
		pg_rd :FramedRead<T, PageDecoder>,
	}

	impl<T :AsyncRead> PacketReader<T> {
		pub fn new(inner :T) -> Self {
			PacketReader {
				base_pck_rdr : BasePacketReader::new(),
				pg_rd : FramedRead::new(inner, PageDecoder::new()),
			}
		}
	}

	impl<T :AsyncRead> Stream for PacketReader<T> {
		type Item = Packet;
		type Error = OggReadError;

		fn poll(&mut self) -> Poll<Option<Packet>, OggReadError> {
			// Read pages until we get a valid entire packet
			// (packets may span multiple pages, so reading one page
			// doesn't always suffice to give us a valid packet)
			loop {
				if let Some(pck) = self.base_pck_rdr.read_packet() {
					return Ok(Async::Ready(Some(pck)));
				}
				let page = try_ready!(self.pg_rd.poll());
				match page {
					Some(page) => tri!(self.base_pck_rdr.push_page(page)),
					None => return Ok(Async::Ready(None)),
				}
			}
		}
	}

}