1use super::backward_references::{AnyHasher, BrotliEncoderParams, CloneWithAlloc, UnionHasher};
2use super::encode::{
3 BrotliEncoderCompressStream, BrotliEncoderCreateInstance, BrotliEncoderDestroyInstance,
4 BrotliEncoderMaxCompressedSize, BrotliEncoderOperation,
5 BrotliEncoderSetCustomDictionaryWithOptionalPrecomputedHasher, HasherSetup, SanitizeParams,
6};
7use super::BrotliAlloc;
8use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
9use concat::{BroCatli, BroCatliResult};
10use core::any;
11use core::marker::PhantomData;
12use core::mem;
13use core::ops::Range;
14#[cfg(feature = "std")]
15use std;
16pub type PoisonedThreadError = ();
17
18#[cfg(feature = "std")]
19pub type LowLevelThreadError = std::boxed::Box<dyn any::Any + Send + 'static>;
20#[cfg(not(feature = "std"))]
21pub type LowLevelThreadError = ();
22
23pub trait AnyBoxConstructor {
24 fn new(data: LowLevelThreadError) -> Self;
25}
26
27pub trait Joinable<T: Send + 'static, U: Send + 'static>: Sized {
28 fn join(self) -> Result<T, U>;
29}
30#[derive(Debug)]
31pub enum BrotliEncoderThreadError {
32 InsufficientOutputSpace,
33 ConcatenationDidNotProcessFullFile,
34 ConcatenationError(BroCatliResult),
35 ConcatenationFinalizationError(BroCatliResult),
36 OtherThreadPanic,
37 ThreadExecError(LowLevelThreadError),
38}
39
40impl AnyBoxConstructor for BrotliEncoderThreadError {
41 fn new(data: LowLevelThreadError) -> Self {
42 BrotliEncoderThreadError::ThreadExecError(data)
43 }
44}
45
46pub struct CompressedFileChunk<Alloc: BrotliAlloc + Send + 'static>
47where
48 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
49{
50 data_backing: <Alloc as Allocator<u8>>::AllocatedMemory,
51 data_size: usize,
52}
53pub struct CompressionThreadResult<Alloc: BrotliAlloc + Send + 'static>
54where
55 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
56{
57 compressed: Result<CompressedFileChunk<Alloc>, BrotliEncoderThreadError>,
58 alloc: Alloc,
59}
60pub enum InternalSendAlloc<
61 ReturnVal: Send + 'static,
62 ExtraInput: Send + 'static,
63 Alloc: BrotliAlloc + Send + 'static,
64 Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
65> where
66 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
67{
68 A(Alloc, ExtraInput),
69 Join(Join),
70 SpawningOrJoining(PhantomData<ReturnVal>),
71}
72impl<
73 ReturnVal: Send + 'static,
74 ExtraInput: Send + 'static,
75 Alloc: BrotliAlloc + Send + 'static,
76 Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
77 > InternalSendAlloc<ReturnVal, ExtraInput, Alloc, Join>
78where
79 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
80{
81 fn unwrap_input(&mut self) -> (&mut Alloc, &mut ExtraInput) {
82 match *self {
83 InternalSendAlloc::A(ref mut alloc, ref mut extra) => (alloc, extra),
84 _ => panic!("Bad state for allocator"),
85 }
86 }
87}
88
89pub struct SendAlloc<
90 ReturnValue: Send + 'static,
91 ExtraInput: Send + 'static,
92 Alloc: BrotliAlloc + Send + 'static,
93 Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
94>(pub InternalSendAlloc<ReturnValue, ExtraInput, Alloc, Join>)
95where
97 <Alloc as Allocator<u8>>::AllocatedMemory: Send;
98
99impl<
100 ReturnValue: Send + 'static,
101 ExtraInput: Send + 'static,
102 Alloc: BrotliAlloc + Send + 'static,
103 Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
104 > SendAlloc<ReturnValue, ExtraInput, Alloc, Join>
105where
106 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
107{
108 pub fn new(alloc: Alloc, extra_input: ExtraInput) -> Self {
109 SendAlloc::<ReturnValue, ExtraInput, Alloc, Join>(InternalSendAlloc::A(alloc, extra_input))
110 }
111 pub fn unwrap_or(self, other: Alloc, other_extra: ExtraInput) -> (Alloc, ExtraInput) {
112 match self.0 {
113 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
114 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
115 (other, other_extra)
116 }
117 }
118 }
119 fn unwrap_view_mut(&mut self) -> (&mut Alloc, &mut ExtraInput) {
120 match self.0 {
121 InternalSendAlloc::A(ref mut alloc, ref mut extra_input) => (alloc, extra_input),
122 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
123 panic!("Item permanently borrowed/leaked")
124 }
125 }
126 }
127 pub fn unwrap(self) -> (Alloc, ExtraInput) {
128 match self.0 {
129 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
130 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
131 panic!("Item permanently borrowed/leaked")
132 }
133 }
134 }
135 pub fn replace_with_default(&mut self) -> (Alloc, ExtraInput) {
136 match mem::replace(
137 &mut self.0,
138 InternalSendAlloc::SpawningOrJoining(PhantomData),
139 ) {
140 InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
141 InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
142 panic!("Item permanently borrowed/leaked")
143 }
144 }
145 }
146}
147
148pub enum InternalOwned<T> {
149 Item(T),
151 Borrowed,
152}
153
154pub struct Owned<T>(pub InternalOwned<T>); impl<T> Owned<T> {
156 pub fn new(data: T) -> Self {
157 Owned::<T>(InternalOwned::Item(data))
158 }
159 pub fn unwrap_or(self, other: T) -> T {
160 if let InternalOwned::Item(x) = self.0 {
161 x
162 } else {
163 other
164 }
165 }
166 pub fn unwrap(self) -> T {
167 if let InternalOwned::Item(x) = self.0 {
168 x
169 } else {
170 panic!("Item permanently borrowed")
171 }
172 }
173 pub fn view(&self) -> &T {
174 if let InternalOwned::Item(ref x) = self.0 {
175 x
176 } else {
177 panic!("Item permanently borrowed")
178 }
179 }
180}
181
182pub trait OwnedRetriever<U: Send + 'static> {
183 fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError>;
184 fn unwrap(self) -> Result<U, PoisonedThreadError>;
185}
186
187#[cfg(feature = "std")]
188impl<U: Send + 'static> OwnedRetriever<U> for std::sync::Arc<std::sync::RwLock<U>> {
189 fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
190 match self.read() {
191 Ok(ref u) => Ok(f(u)),
192 Err(_) => Err(PoisonedThreadError::default()),
193 }
194 }
195 fn unwrap(self) -> Result<U, PoisonedThreadError> {
196 match std::sync::Arc::try_unwrap(self) {
197 Ok(rwlock) => match rwlock.into_inner() {
198 Ok(u) => Ok(u),
199 Err(_) => Err(PoisonedThreadError::default()),
200 },
201 Err(_) => Err(PoisonedThreadError::default()),
202 }
203 }
204}
205
206pub trait BatchSpawnable<
207 ReturnValue: Send + 'static,
208 ExtraInput: Send + 'static,
209 Alloc: BrotliAlloc + Send + 'static,
210 U: Send + 'static + Sync,
211> where
212 <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
213{
214 type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
215 type FinalJoinHandle: OwnedRetriever<U>;
216 fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
225 fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
226 &mut self,
227 handle: &mut Self::FinalJoinHandle,
228 alloc: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
229 index: usize,
230 num_threads: usize,
231 f: F,
232 );
233}
234
235pub trait BatchSpawnableLite<
236 ReturnValue: Send + 'static,
237 ExtraInput: Send + 'static,
238 Alloc: BrotliAlloc + Send + 'static,
239 U: Send + 'static + Sync,
240> where
241 <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
242{
243 type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
244 type FinalJoinHandle: OwnedRetriever<U>;
245 fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
246 fn spawn(
247 &mut self,
248 handle: &mut Self::FinalJoinHandle,
249 alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
250 index: usize,
251 num_threads: usize,
252 f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
253 );
254}
255pub fn CompressMultiSlice<
274 Alloc: BrotliAlloc + Send + 'static,
275 Spawner: BatchSpawnableLite<
276 CompressionThreadResult<Alloc>,
277 UnionHasher<Alloc>,
278 Alloc,
279 (
280 <Alloc as Allocator<u8>>::AllocatedMemory,
281 BrotliEncoderParams,
282 ),
283 >,
284>(
285 params: &BrotliEncoderParams,
286 input_slice: &[u8],
287 output: &mut [u8],
288 alloc_per_thread: &mut [SendAlloc<
289 CompressionThreadResult<Alloc>,
290 UnionHasher<Alloc>,
291 Alloc,
292 Spawner::JoinHandle,
293 >],
294 thread_spawner: &mut Spawner,
295) -> Result<usize, BrotliEncoderThreadError>
296where
297 <Alloc as Allocator<u8>>::AllocatedMemory: Send + Sync,
298 <Alloc as Allocator<u16>>::AllocatedMemory: Send + Sync,
299 <Alloc as Allocator<u32>>::AllocatedMemory: Send + Sync,
300{
301 let input = if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
302 let mut input = <Alloc as Allocator<u8>>::alloc_cell(alloc, input_slice.len());
303 input.slice_mut().clone_from_slice(input_slice);
304 input
305 } else {
306 <Alloc as Allocator<u8>>::AllocatedMemory::default()
307 };
308 let mut owned_input = Owned::new(input);
309 let ret = CompressMulti(
310 params,
311 &mut owned_input,
312 output,
313 alloc_per_thread,
314 thread_spawner,
315 );
316 if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
317 <Alloc as Allocator<u8>>::free_cell(alloc, owned_input.unwrap());
318 }
319 ret
320}
321
322fn get_range(thread_index: usize, num_threads: usize, file_size: usize) -> Range<usize> {
323 ((thread_index * file_size) / num_threads)..(((thread_index + 1) * file_size) / num_threads)
324}
325
326fn compress_part<Alloc: BrotliAlloc + Send + 'static, SliceW: SliceWrapper<u8>>(
327 hasher: UnionHasher<Alloc>,
328 thread_index: usize,
329 num_threads: usize,
330 input_and_params: &(SliceW, BrotliEncoderParams),
331 mut alloc: Alloc,
332) -> CompressionThreadResult<Alloc>
333where
334 <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
335{
336 let mut range = get_range(thread_index, num_threads, input_and_params.0.len());
337 let mut mem = <Alloc as Allocator<u8>>::alloc_cell(
338 &mut alloc,
339 BrotliEncoderMaxCompressedSize(range.end - range.start),
340 );
341 let mut state = BrotliEncoderCreateInstance(alloc);
342 state.params = input_and_params.1.clone();
343 if thread_index != 0 {
344 state.params.catable = true; state.params.magic_number = false; }
347 state.params.appendable = true; if thread_index != 0 {
349 BrotliEncoderSetCustomDictionaryWithOptionalPrecomputedHasher(
350 &mut state,
351 range.start,
352 &input_and_params.0.slice()[..range.start],
353 hasher,
354 );
355 }
356 let mut out_offset = 0usize;
357 let compression_result;
358 let mut available_out = mem.len();
359 loop {
360 let mut next_in_offset = 0usize;
361 let mut available_in = range.end - range.start;
362 let result = BrotliEncoderCompressStream(
363 &mut state,
364 BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
365 &mut available_in,
366 &input_and_params.0.slice()[range.clone()],
367 &mut next_in_offset,
368 &mut available_out,
369 mem.slice_mut(),
370 &mut out_offset,
371 &mut None,
372 &mut |_a, _b, _c, _d| (),
373 );
374 let new_range = range.start + next_in_offset..range.end;
375 range = new_range;
376 if result != 0 {
377 compression_result = Ok(out_offset);
378 break;
379 } else if available_out == 0 {
380 compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); break;
382 }
383 }
384 BrotliEncoderDestroyInstance(&mut state);
385 match compression_result {
386 Ok(size) => CompressionThreadResult::<Alloc> {
387 compressed: Ok(CompressedFileChunk {
388 data_backing: mem,
389 data_size: size,
390 }),
391 alloc: state.m8,
392 },
393 Err(e) => {
394 <Alloc as Allocator<u8>>::free_cell(&mut state.m8, mem);
395 CompressionThreadResult::<Alloc> {
396 compressed: Err(e),
397 alloc: state.m8,
398 }
399 }
400 }
401}
402
403pub fn CompressMulti<
404 Alloc: BrotliAlloc + Send + 'static,
405 SliceW: SliceWrapper<u8> + Send + 'static + Sync,
406 Spawner: BatchSpawnableLite<
407 CompressionThreadResult<Alloc>,
408 UnionHasher<Alloc>,
409 Alloc,
410 (SliceW, BrotliEncoderParams),
411 >,
412>(
413 params: &BrotliEncoderParams,
414 owned_input: &mut Owned<SliceW>,
415 output: &mut [u8],
416 alloc_per_thread: &mut [SendAlloc<
417 CompressionThreadResult<Alloc>,
418 UnionHasher<Alloc>,
419 Alloc,
420 Spawner::JoinHandle,
421 >],
422 thread_spawner: &mut Spawner,
423) -> Result<usize, BrotliEncoderThreadError>
424where
425 <Alloc as Allocator<u8>>::AllocatedMemory: Send,
426 <Alloc as Allocator<u16>>::AllocatedMemory: Send,
427 <Alloc as Allocator<u32>>::AllocatedMemory: Send,
428{
429 let num_threads = alloc_per_thread.len();
430 let actually_owned_mem = mem::replace(owned_input, Owned(InternalOwned::Borrowed));
431 let mut owned_input_pair = Owned::new((actually_owned_mem.unwrap(), params.clone()));
432 let mut spawner_and_input = thread_spawner.make_spawner(&mut owned_input_pair);
434 if num_threads > 1 {
435 thread_spawner.spawn(
437 &mut spawner_and_input,
438 &mut alloc_per_thread[0],
439 0,
440 num_threads,
441 compress_part,
442 );
443 }
444 let mut compression_last_thread_result;
446 if num_threads > 1 && params.favor_cpu_efficiency {
447 let mut local_params = params.clone();
448 SanitizeParams(&mut local_params);
449 let mut hasher = UnionHasher::Uninit;
450 HasherSetup(
451 alloc_per_thread[num_threads - 1].0.unwrap_input().0,
452 &mut hasher,
453 &mut local_params,
454 &[],
455 0,
456 0,
457 0,
458 );
459 for thread_index in 1..num_threads {
460 let res = spawner_and_input.view(|input_and_params: &(SliceW, BrotliEncoderParams)| {
461 let range = get_range(thread_index - 1, num_threads, input_and_params.0.len());
462 let overlap = hasher.StoreLookahead().wrapping_sub(1);
463 if range.end - range.start > overlap {
464 hasher.BulkStoreRange(
465 input_and_params.0.slice(),
466 !(0),
467 if range.start > overlap {
468 range.start - overlap
469 } else {
470 0
471 },
472 range.end - overlap,
473 );
474 }
475 });
476 if let Err(_e) = res {
477 return Err(BrotliEncoderThreadError::OtherThreadPanic);
478 }
479 if thread_index + 1 != num_threads {
480 {
481 let (alloc, out_hasher) = alloc_per_thread[thread_index].unwrap_view_mut();
482 *out_hasher = hasher.clone_with_alloc(alloc);
483 }
484 thread_spawner.spawn(
485 &mut spawner_and_input,
486 &mut alloc_per_thread[thread_index],
487 thread_index,
488 num_threads,
489 compress_part,
490 );
491 }
492 }
493 let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
494 compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
495 compress_part(hasher,
496 num_threads - 1,
497 num_threads,
498 input_and_params,
499 alloc,
500 )
501 });
502 } else {
503 if num_threads > 1 {
504 for thread_index in 1..num_threads - 1 {
505 thread_spawner.spawn(
506 &mut spawner_and_input,
507 &mut alloc_per_thread[thread_index],
508 thread_index,
509 num_threads,
510 compress_part,
511 );
512 }
513 }
514 let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
515 compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
516 compress_part(UnionHasher::Uninit,
517 num_threads - 1,
518 num_threads,
519 input_and_params,
520 alloc,
521 )
522 });
523 }
524 let mut compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
525 let mut out_file_size = 0usize;
526 let mut bro_cat_li = BroCatli::new();
527 for (index, thread) in alloc_per_thread.iter_mut().enumerate() {
528 let mut cur_result = if index + 1 == num_threads {
529 match mem::replace(&mut compression_last_thread_result, Err(())) {
530 Ok(result) => result,
531 Err(_err) => return Err(BrotliEncoderThreadError::OtherThreadPanic),
532 }
533 } else {
534 match mem::replace(
535 &mut thread.0,
536 InternalSendAlloc::SpawningOrJoining(PhantomData),
537 ) {
538 InternalSendAlloc::A(_, _) | InternalSendAlloc::SpawningOrJoining(_) => {
539 panic!("Thread not properly spawned")
540 }
541 InternalSendAlloc::Join(join) => match join.join() {
542 Ok(result) => result,
543 Err(err) => {
544 return Err(err);
545 }
546 },
547 }
548 };
549 match cur_result.compressed {
550 Ok(compressed_out) => {
551 bro_cat_li.new_brotli_file();
552 let mut in_offset = 0usize;
553 let cat_result = bro_cat_li.stream(
554 &compressed_out.data_backing.slice()[..compressed_out.data_size],
555 &mut in_offset,
556 output,
557 &mut out_file_size,
558 );
559 match cat_result {
560 BroCatliResult::Success | BroCatliResult::NeedsMoreInput => {
561 compression_result = Ok(out_file_size);
562 }
563 BroCatliResult::NeedsMoreOutput => {
564 compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
565 }
567 err => {
568 compression_result = Err(BrotliEncoderThreadError::ConcatenationError(err));
569 }
571 }
572 <Alloc as Allocator<u8>>::free_cell(
573 &mut cur_result.alloc,
574 compressed_out.data_backing,
575 );
576 }
577 Err(e) => {
578 compression_result = Err(e);
579 }
580 }
581 thread.0 = InternalSendAlloc::A(cur_result.alloc, UnionHasher::Uninit);
582 }
583 compression_result?;
584 match bro_cat_li.finish(output, &mut out_file_size) {
585 BroCatliResult::Success => compression_result = Ok(out_file_size),
586 err => {
587 compression_result = Err(BrotliEncoderThreadError::ConcatenationFinalizationError(
588 err,
589 ))
590 }
591 }
592 if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
593 *owned_input = Owned::new(retrieved_owned_input.0); } else if compression_result.is_ok() {
595 compression_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
596 }
597 compression_result
598}