brotli/enc/
threading.rs

1use super::backward_references::{AnyHasher, BrotliEncoderParams, CloneWithAlloc, UnionHasher};
2use super::encode::{
3    BrotliEncoderCompressStream, BrotliEncoderCreateInstance, BrotliEncoderDestroyInstance,
4    BrotliEncoderMaxCompressedSize, BrotliEncoderOperation,
5    BrotliEncoderSetCustomDictionaryWithOptionalPrecomputedHasher, HasherSetup, SanitizeParams,
6};
7use super::BrotliAlloc;
8use alloc::{Allocator, SliceWrapper, SliceWrapperMut};
9use concat::{BroCatli, BroCatliResult};
10use core::any;
11use core::marker::PhantomData;
12use core::mem;
13use core::ops::Range;
14#[cfg(feature = "std")]
15use std;
16pub type PoisonedThreadError = ();
17
18#[cfg(feature = "std")]
19pub type LowLevelThreadError = std::boxed::Box<dyn any::Any + Send + 'static>;
20#[cfg(not(feature = "std"))]
21pub type LowLevelThreadError = ();
22
23pub trait AnyBoxConstructor {
24    fn new(data: LowLevelThreadError) -> Self;
25}
26
27pub trait Joinable<T: Send + 'static, U: Send + 'static>: Sized {
28    fn join(self) -> Result<T, U>;
29}
30#[derive(Debug)]
31pub enum BrotliEncoderThreadError {
32    InsufficientOutputSpace,
33    ConcatenationDidNotProcessFullFile,
34    ConcatenationError(BroCatliResult),
35    ConcatenationFinalizationError(BroCatliResult),
36    OtherThreadPanic,
37    ThreadExecError(LowLevelThreadError),
38}
39
40impl AnyBoxConstructor for BrotliEncoderThreadError {
41    fn new(data: LowLevelThreadError) -> Self {
42        BrotliEncoderThreadError::ThreadExecError(data)
43    }
44}
45
46pub struct CompressedFileChunk<Alloc: BrotliAlloc + Send + 'static>
47where
48    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
49{
50    data_backing: <Alloc as Allocator<u8>>::AllocatedMemory,
51    data_size: usize,
52}
53pub struct CompressionThreadResult<Alloc: BrotliAlloc + Send + 'static>
54where
55    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
56{
57    compressed: Result<CompressedFileChunk<Alloc>, BrotliEncoderThreadError>,
58    alloc: Alloc,
59}
60pub enum InternalSendAlloc<
61    ReturnVal: Send + 'static,
62    ExtraInput: Send + 'static,
63    Alloc: BrotliAlloc + Send + 'static,
64    Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
65> where
66    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
67{
68    A(Alloc, ExtraInput),
69    Join(Join),
70    SpawningOrJoining(PhantomData<ReturnVal>),
71}
72impl<
73        ReturnVal: Send + 'static,
74        ExtraInput: Send + 'static,
75        Alloc: BrotliAlloc + Send + 'static,
76        Join: Joinable<ReturnVal, BrotliEncoderThreadError>,
77    > InternalSendAlloc<ReturnVal, ExtraInput, Alloc, Join>
78where
79    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
80{
81    fn unwrap_input(&mut self) -> (&mut Alloc, &mut ExtraInput) {
82        match *self {
83            InternalSendAlloc::A(ref mut alloc, ref mut extra) => (alloc, extra),
84            _ => panic!("Bad state for allocator"),
85        }
86    }
87}
88
89pub struct SendAlloc<
90    ReturnValue: Send + 'static,
91    ExtraInput: Send + 'static,
92    Alloc: BrotliAlloc + Send + 'static,
93    Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
94>(pub InternalSendAlloc<ReturnValue, ExtraInput, Alloc, Join>)
95//FIXME pub
96where
97    <Alloc as Allocator<u8>>::AllocatedMemory: Send;
98
99impl<
100        ReturnValue: Send + 'static,
101        ExtraInput: Send + 'static,
102        Alloc: BrotliAlloc + Send + 'static,
103        Join: Joinable<ReturnValue, BrotliEncoderThreadError>,
104    > SendAlloc<ReturnValue, ExtraInput, Alloc, Join>
105where
106    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
107{
108    pub fn new(alloc: Alloc, extra_input: ExtraInput) -> Self {
109        SendAlloc::<ReturnValue, ExtraInput, Alloc, Join>(InternalSendAlloc::A(alloc, extra_input))
110    }
111    pub fn unwrap_or(self, other: Alloc, other_extra: ExtraInput) -> (Alloc, ExtraInput) {
112        match self.0 {
113            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
114            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
115                (other, other_extra)
116            }
117        }
118    }
119    fn unwrap_view_mut(&mut self) -> (&mut Alloc, &mut ExtraInput) {
120        match self.0 {
121            InternalSendAlloc::A(ref mut alloc, ref mut extra_input) => (alloc, extra_input),
122            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
123                panic!("Item permanently borrowed/leaked")
124            }
125        }
126    }
127    pub fn unwrap(self) -> (Alloc, ExtraInput) {
128        match self.0 {
129            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
130            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
131                panic!("Item permanently borrowed/leaked")
132            }
133        }
134    }
135    pub fn replace_with_default(&mut self) -> (Alloc, ExtraInput) {
136        match mem::replace(
137            &mut self.0,
138            InternalSendAlloc::SpawningOrJoining(PhantomData),
139        ) {
140            InternalSendAlloc::A(alloc, extra_input) => (alloc, extra_input),
141            InternalSendAlloc::SpawningOrJoining(_) | InternalSendAlloc::Join(_) => {
142                panic!("Item permanently borrowed/leaked")
143            }
144        }
145    }
146}
147
148pub enum InternalOwned<T> {
149    // FIXME pub
150    Item(T),
151    Borrowed,
152}
153
154pub struct Owned<T>(pub InternalOwned<T>); // FIXME pub
155impl<T> Owned<T> {
156    pub fn new(data: T) -> Self {
157        Owned::<T>(InternalOwned::Item(data))
158    }
159    pub fn unwrap_or(self, other: T) -> T {
160        if let InternalOwned::Item(x) = self.0 {
161            x
162        } else {
163            other
164        }
165    }
166    pub fn unwrap(self) -> T {
167        if let InternalOwned::Item(x) = self.0 {
168            x
169        } else {
170            panic!("Item permanently borrowed")
171        }
172    }
173    pub fn view(&self) -> &T {
174        if let InternalOwned::Item(ref x) = self.0 {
175            x
176        } else {
177            panic!("Item permanently borrowed")
178        }
179    }
180}
181
182pub trait OwnedRetriever<U: Send + 'static> {
183    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError>;
184    fn unwrap(self) -> Result<U, PoisonedThreadError>;
185}
186
187#[cfg(feature = "std")]
188impl<U: Send + 'static> OwnedRetriever<U> for std::sync::Arc<std::sync::RwLock<U>> {
189    fn view<T, F: FnOnce(&U) -> T>(&self, f: F) -> Result<T, PoisonedThreadError> {
190        match self.read() {
191            Ok(ref u) => Ok(f(u)),
192            Err(_) => Err(PoisonedThreadError::default()),
193        }
194    }
195    fn unwrap(self) -> Result<U, PoisonedThreadError> {
196        match std::sync::Arc::try_unwrap(self) {
197            Ok(rwlock) => match rwlock.into_inner() {
198                Ok(u) => Ok(u),
199                Err(_) => Err(PoisonedThreadError::default()),
200            },
201            Err(_) => Err(PoisonedThreadError::default()),
202        }
203    }
204}
205
206pub trait BatchSpawnable<
207    ReturnValue: Send + 'static,
208    ExtraInput: Send + 'static,
209    Alloc: BrotliAlloc + Send + 'static,
210    U: Send + 'static + Sync,
211> where
212    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
213{
214    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
215    type FinalJoinHandle: OwnedRetriever<U>;
216    // this function takes in an input slice
217    // a SendAlloc per thread and converts them all into JoinHandle
218    // the input is borrowed until the joins complete
219    // owned is set to borrowed
220    // the final join handle is a r/w lock which will return the SliceW to the owner
221    // the FinalJoinHandle is only to be called when each individual JoinHandle has been examined
222    // the function is called with the thread_index, the num_threads, a reference to the slice under a read lock,
223    // and an allocator from the alloc_per_thread
224    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
225    fn spawn<F: Fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue + Send + 'static + Copy>(
226        &mut self,
227        handle: &mut Self::FinalJoinHandle,
228        alloc: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
229        index: usize,
230        num_threads: usize,
231        f: F,
232    );
233}
234
235pub trait BatchSpawnableLite<
236    ReturnValue: Send + 'static,
237    ExtraInput: Send + 'static,
238    Alloc: BrotliAlloc + Send + 'static,
239    U: Send + 'static + Sync,
240> where
241    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
242{
243    type JoinHandle: Joinable<ReturnValue, BrotliEncoderThreadError>;
244    type FinalJoinHandle: OwnedRetriever<U>;
245    fn make_spawner(&mut self, input: &mut Owned<U>) -> Self::FinalJoinHandle;
246    fn spawn(
247        &mut self,
248        handle: &mut Self::FinalJoinHandle,
249        alloc_per_thread: &mut SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>,
250        index: usize,
251        num_threads: usize,
252        f: fn(ExtraInput, usize, usize, &U, Alloc) -> ReturnValue,
253    );
254}
255/*
256impl<ReturnValue:Send+'static,
257     ExtraInput:Send+'static,
258     Alloc:BrotliAlloc+Send+'static,
259     U:Send+'static+Sync>
260     BatchSpawnableLite<T, Alloc, U> for BatchSpawnable<T, Alloc, U> {
261  type JoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::JoinHandle;
262  type FinalJoinHandle = <Self as BatchSpawnable<T, Alloc, U>>::FinalJoinHandle;
263  fn batch_spawn(
264    &mut self,
265    input: &mut Owned<U>,
266    alloc_per_thread:&mut [SendAlloc<ReturnValue, ExtraInput, Alloc, Self::JoinHandle>],
267    f: fn(usize, usize, &U, Alloc) -> T,
268  ) -> Self::FinalJoinHandle {
269   <Self as BatchSpawnable<ReturnValue, ExtraInput,  Alloc, U>>::batch_spawn(self, input, alloc_per_thread, f)
270  }
271}*/
272
273pub fn CompressMultiSlice<
274    Alloc: BrotliAlloc + Send + 'static,
275    Spawner: BatchSpawnableLite<
276        CompressionThreadResult<Alloc>,
277        UnionHasher<Alloc>,
278        Alloc,
279        (
280            <Alloc as Allocator<u8>>::AllocatedMemory,
281            BrotliEncoderParams,
282        ),
283    >,
284>(
285    params: &BrotliEncoderParams,
286    input_slice: &[u8],
287    output: &mut [u8],
288    alloc_per_thread: &mut [SendAlloc<
289        CompressionThreadResult<Alloc>,
290        UnionHasher<Alloc>,
291        Alloc,
292        Spawner::JoinHandle,
293    >],
294    thread_spawner: &mut Spawner,
295) -> Result<usize, BrotliEncoderThreadError>
296where
297    <Alloc as Allocator<u8>>::AllocatedMemory: Send + Sync,
298    <Alloc as Allocator<u16>>::AllocatedMemory: Send + Sync,
299    <Alloc as Allocator<u32>>::AllocatedMemory: Send + Sync,
300{
301    let input = if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
302        let mut input = <Alloc as Allocator<u8>>::alloc_cell(alloc, input_slice.len());
303        input.slice_mut().clone_from_slice(input_slice);
304        input
305    } else {
306        <Alloc as Allocator<u8>>::AllocatedMemory::default()
307    };
308    let mut owned_input = Owned::new(input);
309    let ret = CompressMulti(
310        params,
311        &mut owned_input,
312        output,
313        alloc_per_thread,
314        thread_spawner,
315    );
316    if let InternalSendAlloc::A(ref mut alloc, ref _extra) = alloc_per_thread[0].0 {
317        <Alloc as Allocator<u8>>::free_cell(alloc, owned_input.unwrap());
318    }
319    ret
320}
321
322fn get_range(thread_index: usize, num_threads: usize, file_size: usize) -> Range<usize> {
323    ((thread_index * file_size) / num_threads)..(((thread_index + 1) * file_size) / num_threads)
324}
325
326fn compress_part<Alloc: BrotliAlloc + Send + 'static, SliceW: SliceWrapper<u8>>(
327    hasher: UnionHasher<Alloc>,
328    thread_index: usize,
329    num_threads: usize,
330    input_and_params: &(SliceW, BrotliEncoderParams),
331    mut alloc: Alloc,
332) -> CompressionThreadResult<Alloc>
333where
334    <Alloc as Allocator<u8>>::AllocatedMemory: Send + 'static,
335{
336    let mut range = get_range(thread_index, num_threads, input_and_params.0.len());
337    let mut mem = <Alloc as Allocator<u8>>::alloc_cell(
338        &mut alloc,
339        BrotliEncoderMaxCompressedSize(range.end - range.start),
340    );
341    let mut state = BrotliEncoderCreateInstance(alloc);
342    state.params = input_and_params.1.clone();
343    if thread_index != 0 {
344        state.params.catable = true; // make sure we can concatenate this to the other work results
345        state.params.magic_number = false; // no reason to pepper this around
346    }
347    state.params.appendable = true; // make sure we are at least appendable, so that future items can be catted in
348    if thread_index != 0 {
349        BrotliEncoderSetCustomDictionaryWithOptionalPrecomputedHasher(
350            &mut state,
351            range.start,
352            &input_and_params.0.slice()[..range.start],
353            hasher,
354        );
355    }
356    let mut out_offset = 0usize;
357    let compression_result;
358    let mut available_out = mem.len();
359    loop {
360        let mut next_in_offset = 0usize;
361        let mut available_in = range.end - range.start;
362        let result = BrotliEncoderCompressStream(
363            &mut state,
364            BrotliEncoderOperation::BROTLI_OPERATION_FINISH,
365            &mut available_in,
366            &input_and_params.0.slice()[range.clone()],
367            &mut next_in_offset,
368            &mut available_out,
369            mem.slice_mut(),
370            &mut out_offset,
371            &mut None,
372            &mut |_a, _b, _c, _d| (),
373        );
374        let new_range = range.start + next_in_offset..range.end;
375        range = new_range;
376        if result != 0 {
377            compression_result = Ok(out_offset);
378            break;
379        } else if available_out == 0 {
380            compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace); // mark no space??
381            break;
382        }
383    }
384    BrotliEncoderDestroyInstance(&mut state);
385    match compression_result {
386        Ok(size) => CompressionThreadResult::<Alloc> {
387            compressed: Ok(CompressedFileChunk {
388                data_backing: mem,
389                data_size: size,
390            }),
391            alloc: state.m8,
392        },
393        Err(e) => {
394            <Alloc as Allocator<u8>>::free_cell(&mut state.m8, mem);
395            CompressionThreadResult::<Alloc> {
396                compressed: Err(e),
397                alloc: state.m8,
398            }
399        }
400    }
401}
402
403pub fn CompressMulti<
404    Alloc: BrotliAlloc + Send + 'static,
405    SliceW: SliceWrapper<u8> + Send + 'static + Sync,
406    Spawner: BatchSpawnableLite<
407        CompressionThreadResult<Alloc>,
408        UnionHasher<Alloc>,
409        Alloc,
410        (SliceW, BrotliEncoderParams),
411    >,
412>(
413    params: &BrotliEncoderParams,
414    owned_input: &mut Owned<SliceW>,
415    output: &mut [u8],
416    alloc_per_thread: &mut [SendAlloc<
417        CompressionThreadResult<Alloc>,
418        UnionHasher<Alloc>,
419        Alloc,
420        Spawner::JoinHandle,
421    >],
422    thread_spawner: &mut Spawner,
423) -> Result<usize, BrotliEncoderThreadError>
424where
425    <Alloc as Allocator<u8>>::AllocatedMemory: Send,
426    <Alloc as Allocator<u16>>::AllocatedMemory: Send,
427    <Alloc as Allocator<u32>>::AllocatedMemory: Send,
428{
429    let num_threads = alloc_per_thread.len();
430    let actually_owned_mem = mem::replace(owned_input, Owned(InternalOwned::Borrowed));
431    let mut owned_input_pair = Owned::new((actually_owned_mem.unwrap(), params.clone()));
432    // start thread spawner
433    let mut spawner_and_input = thread_spawner.make_spawner(&mut owned_input_pair);
434    if num_threads > 1 {
435        // spawn first thread without "custom dictionary" while we compute the custom dictionary for other work items
436        thread_spawner.spawn(
437            &mut spawner_and_input,
438            &mut alloc_per_thread[0],
439            0,
440            num_threads,
441            compress_part,
442        );
443    }
444    // populate all hashers at once, cloning them one by one
445    let mut compression_last_thread_result;
446    if num_threads > 1 && params.favor_cpu_efficiency {
447        let mut local_params = params.clone();
448        SanitizeParams(&mut local_params);
449        let mut hasher = UnionHasher::Uninit;
450        HasherSetup(
451            alloc_per_thread[num_threads - 1].0.unwrap_input().0,
452            &mut hasher,
453            &mut local_params,
454            &[],
455            0,
456            0,
457            0,
458        );
459        for thread_index in 1..num_threads {
460            let res = spawner_and_input.view(|input_and_params: &(SliceW, BrotliEncoderParams)| {
461                let range = get_range(thread_index - 1, num_threads, input_and_params.0.len());
462                let overlap = hasher.StoreLookahead().wrapping_sub(1);
463                if range.end - range.start > overlap {
464                    hasher.BulkStoreRange(
465                        input_and_params.0.slice(),
466                        !(0),
467                        if range.start > overlap {
468                            range.start - overlap
469                        } else {
470                            0
471                        },
472                        range.end - overlap,
473                    );
474                }
475            });
476            if let Err(_e) = res {
477                return Err(BrotliEncoderThreadError::OtherThreadPanic);
478            }
479            if thread_index + 1 != num_threads {
480                {
481                    let (alloc, out_hasher) = alloc_per_thread[thread_index].unwrap_view_mut();
482                    *out_hasher = hasher.clone_with_alloc(alloc);
483                }
484                thread_spawner.spawn(
485                    &mut spawner_and_input,
486                    &mut alloc_per_thread[thread_index],
487                    thread_index,
488                    num_threads,
489                    compress_part,
490                );
491            }
492        }
493        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
494        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
495        compress_part(hasher,
496                      num_threads - 1,
497                      num_threads,
498                      input_and_params,
499                      alloc,
500        )
501      });
502    } else {
503        if num_threads > 1 {
504            for thread_index in 1..num_threads - 1 {
505                thread_spawner.spawn(
506                    &mut spawner_and_input,
507                    &mut alloc_per_thread[thread_index],
508                    thread_index,
509                    num_threads,
510                    compress_part,
511                );
512            }
513        }
514        let (alloc, _extra) = alloc_per_thread[num_threads - 1].replace_with_default();
515        compression_last_thread_result = spawner_and_input.view(move |input_and_params:&(SliceW, BrotliEncoderParams)| -> CompressionThreadResult<Alloc> {
516        compress_part(UnionHasher::Uninit,
517                      num_threads - 1,
518                      num_threads,
519                      input_and_params,
520                      alloc,
521        )
522      });
523    }
524    let mut compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
525    let mut out_file_size = 0usize;
526    let mut bro_cat_li = BroCatli::new();
527    for (index, thread) in alloc_per_thread.iter_mut().enumerate() {
528        let mut cur_result = if index + 1 == num_threads {
529            match mem::replace(&mut compression_last_thread_result, Err(())) {
530                Ok(result) => result,
531                Err(_err) => return Err(BrotliEncoderThreadError::OtherThreadPanic),
532            }
533        } else {
534            match mem::replace(
535                &mut thread.0,
536                InternalSendAlloc::SpawningOrJoining(PhantomData),
537            ) {
538                InternalSendAlloc::A(_, _) | InternalSendAlloc::SpawningOrJoining(_) => {
539                    panic!("Thread not properly spawned")
540                }
541                InternalSendAlloc::Join(join) => match join.join() {
542                    Ok(result) => result,
543                    Err(err) => {
544                        return Err(err);
545                    }
546                },
547            }
548        };
549        match cur_result.compressed {
550            Ok(compressed_out) => {
551                bro_cat_li.new_brotli_file();
552                let mut in_offset = 0usize;
553                let cat_result = bro_cat_li.stream(
554                    &compressed_out.data_backing.slice()[..compressed_out.data_size],
555                    &mut in_offset,
556                    output,
557                    &mut out_file_size,
558                );
559                match cat_result {
560                    BroCatliResult::Success | BroCatliResult::NeedsMoreInput => {
561                        compression_result = Ok(out_file_size);
562                    }
563                    BroCatliResult::NeedsMoreOutput => {
564                        compression_result = Err(BrotliEncoderThreadError::InsufficientOutputSpace);
565                        // not enough space
566                    }
567                    err => {
568                        compression_result = Err(BrotliEncoderThreadError::ConcatenationError(err));
569                        // misc error
570                    }
571                }
572                <Alloc as Allocator<u8>>::free_cell(
573                    &mut cur_result.alloc,
574                    compressed_out.data_backing,
575                );
576            }
577            Err(e) => {
578                compression_result = Err(e);
579            }
580        }
581        thread.0 = InternalSendAlloc::A(cur_result.alloc, UnionHasher::Uninit);
582    }
583    compression_result?;
584    match bro_cat_li.finish(output, &mut out_file_size) {
585        BroCatliResult::Success => compression_result = Ok(out_file_size),
586        err => {
587            compression_result = Err(BrotliEncoderThreadError::ConcatenationFinalizationError(
588                err,
589            ))
590        }
591    }
592    if let Ok(retrieved_owned_input) = spawner_and_input.unwrap() {
593        *owned_input = Owned::new(retrieved_owned_input.0); // return the input to its rightful owner before returning
594    } else if compression_result.is_ok() {
595        compression_result = Err(BrotliEncoderThreadError::OtherThreadPanic);
596    }
597    compression_result
598}