Home | History | Annotate | Download | only in zip
      1 /*
      2  *  Licensed to the Apache Software Foundation (ASF) under one or more
      3  *  contributor license agreements.  See the NOTICE file distributed with
      4  *  this work for additional information regarding copyright ownership.
      5  *  The ASF licenses this file to You under the Apache License, Version 2.0
      6  *  (the "License"); you may not use this file except in compliance with
      7  *  the License.  You may obtain a copy of the License at
      8  *
      9  *      http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *  See the License for the specific language governing permissions and
     15  *  limitations under the License.
     16  *
     17  */
     18 package org.apache.commons.compress.archivers.zip;
     19 
     20 import org.apache.commons.compress.parallel.FileBasedScatterGatherBackingStore;
     21 import org.apache.commons.compress.parallel.InputStreamSupplier;
     22 import org.apache.commons.compress.parallel.ScatterGatherBackingStore;
     23 import org.apache.commons.compress.parallel.ScatterGatherBackingStoreSupplier;
     24 
     25 import java.io.File;
     26 import java.io.IOException;
     27 import java.util.ArrayList;
     28 import java.util.List;
     29 import java.util.concurrent.Callable;
     30 import java.util.concurrent.ExecutionException;
     31 import java.util.concurrent.ExecutorService;
     32 import java.util.concurrent.Executors;
     33 import java.util.concurrent.Future;
     34 import java.util.concurrent.TimeUnit;
     35 import java.util.concurrent.atomic.AtomicInteger;
     36 import java.util.zip.Deflater;
     37 
     38 import static java.util.Collections.synchronizedList;
     39 import static org.apache.commons.compress.archivers.zip.ZipArchiveEntryRequest.createZipArchiveEntryRequest;
     40 
     41 /**
     42  * Creates a zip in parallel by using multiple threadlocal {@link ScatterZipOutputStream} instances.
     43  * <p>
     44  * Note that this class generally makes no guarantees about the order of things written to
     45  * the output file. Things that need to come in a specific order (manifests, directories)
     46  * must be handled by the client of this class, usually by writing these things to the
     47  * {@link ZipArchiveOutputStream} <em>before</em> calling {@link #writeTo writeTo} on this class.</p>
     48  * <p>
     49  * The client can supply an {@link java.util.concurrent.ExecutorService}, but for reasons of
     50  * memory model consistency, this will be shut down by this class prior to completion.
     51  * </p>
     52  * @since 1.10
     53  */
     54 public class ParallelScatterZipCreator {
     55     private final List<ScatterZipOutputStream> streams = synchronizedList(new ArrayList<ScatterZipOutputStream>());
     56     private final ExecutorService es;
     57     private final ScatterGatherBackingStoreSupplier backingStoreSupplier;
     58     private final List<Future<Object>> futures = new ArrayList<>();
     59 
     60     private final long startedAt = System.currentTimeMillis();
     61     private long compressionDoneAt = 0;
     62     private long scatterDoneAt;
     63 
     64     private static class DefaultBackingStoreSupplier implements ScatterGatherBackingStoreSupplier {
     65         final AtomicInteger storeNum = new AtomicInteger(0);
     66 
     67         @Override
     68         public ScatterGatherBackingStore get() throws IOException {
     69             final File tempFile = File.createTempFile("parallelscatter", "n" + storeNum.incrementAndGet());
     70             return new FileBasedScatterGatherBackingStore(tempFile);
     71         }
     72     }
     73 
     74     private ScatterZipOutputStream createDeferred(final ScatterGatherBackingStoreSupplier scatterGatherBackingStoreSupplier)
     75             throws IOException {
     76         final ScatterGatherBackingStore bs = scatterGatherBackingStoreSupplier.get();
     77         // lifecycle is bound to the ScatterZipOutputStream returned
     78         final StreamCompressor sc = StreamCompressor.create(Deflater.DEFAULT_COMPRESSION, bs); //NOSONAR
     79         return new ScatterZipOutputStream(bs, sc);
     80     }
     81 
     82     private final ThreadLocal<ScatterZipOutputStream> tlScatterStreams = new ThreadLocal<ScatterZipOutputStream>() {
     83         @Override
     84         protected ScatterZipOutputStream initialValue() {
     85             try {
     86                 final ScatterZipOutputStream scatterStream = createDeferred(backingStoreSupplier);
     87                 streams.add(scatterStream);
     88                 return scatterStream;
     89             } catch (final IOException e) {
     90                 throw new RuntimeException(e); //NOSONAR
     91             }
     92         }
     93     };
     94 
     95     /**
     96      * Create a ParallelScatterZipCreator with default threads, which is set to the number of available
     97      * processors, as defined by {@link java.lang.Runtime#availableProcessors}
     98      */
     99     public ParallelScatterZipCreator() {
    100         this(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    101     }
    102 
    103     /**
    104      * Create a ParallelScatterZipCreator
    105      *
    106      * @param executorService The executorService to use for parallel scheduling. For technical reasons,
    107      *                        this will be shut down by this class.
    108      */
    109     public ParallelScatterZipCreator(final ExecutorService executorService) {
    110         this(executorService, new DefaultBackingStoreSupplier());
    111     }
    112 
    113     /**
    114      * Create a ParallelScatterZipCreator
    115      *
    116      * @param executorService The executorService to use. For technical reasons, this will be shut down
    117      *                        by this class.
    118      * @param backingStoreSupplier The supplier of backing store which shall be used
    119      */
    120     public ParallelScatterZipCreator(final ExecutorService executorService,
    121                                      final ScatterGatherBackingStoreSupplier backingStoreSupplier) {
    122         this.backingStoreSupplier = backingStoreSupplier;
    123         es = executorService;
    124     }
    125 
    126     /**
    127      * Adds an archive entry to this archive.
    128      * <p>
    129      * This method is expected to be called from a single client thread
    130      * </p>
    131      *
    132      * @param zipArchiveEntry The entry to add.
    133      * @param source          The source input stream supplier
    134      */
    135 
    136     public void addArchiveEntry(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
    137         submit(createCallable(zipArchiveEntry, source));
    138     }
    139 
    140     /**
    141      * Adds an archive entry to this archive.
    142      * <p>
    143      * This method is expected to be called from a single client thread
    144      * </p>
    145      *
    146      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
    147      * @since 1.13
    148      */
    149     public void addArchiveEntry(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
    150         submit(createCallable(zipArchiveEntryRequestSupplier));
    151     }
    152 
    153     /**
    154      * Submit a callable for compression.
    155      *
    156      * @see ParallelScatterZipCreator#createCallable for details of if/when to use this.
    157      *
    158      * @param callable The callable to run, created by {@link #createCallable createCallable}, possibly wrapped by caller.
    159      */
    160     public final void submit(final Callable<Object> callable) {
    161         futures.add(es.submit(callable));
    162     }
    163 
    164     /**
    165      * Create a callable that will compress the given archive entry.
    166      *
    167      * <p>This method is expected to be called from a single client thread.</p>
    168      *
    169      * Consider using {@link #addArchiveEntry addArchiveEntry}, which wraps this method and {@link #submit submit}.
    170      * The most common use case for using {@link #createCallable createCallable} and {@link #submit submit} from a
    171      * client is if you want to wrap the callable in something that can be prioritized by the supplied
    172      * {@link ExecutorService}, for instance to process large or slow files first.
    173      * Since the creation of the {@link ExecutorService} is handled by the client, all of this is up to the client.
    174      *
    175      * @param zipArchiveEntry The entry to add.
    176      * @param source          The source input stream supplier
    177      * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The
    178      * value of this callable is not used, but any exceptions happening inside the compression
    179      * will be propagated through the callable.
    180      */
    181 
    182     public final Callable<Object> createCallable(final ZipArchiveEntry zipArchiveEntry, final InputStreamSupplier source) {
    183         final int method = zipArchiveEntry.getMethod();
    184         if (method == ZipMethod.UNKNOWN_CODE) {
    185             throw new IllegalArgumentException("Method must be set on zipArchiveEntry: " + zipArchiveEntry);
    186         }
    187         final ZipArchiveEntryRequest zipArchiveEntryRequest = createZipArchiveEntryRequest(zipArchiveEntry, source);
    188         return new Callable<Object>() {
    189             @Override
    190             public Object call() throws Exception {
    191                 tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequest);
    192                 return null;
    193             }
    194         };
    195     }
    196 
    197     /**
    198      * Create a callable that will compress archive entry supplied by {@link ZipArchiveEntryRequestSupplier}.
    199      *
    200      * <p>This method is expected to be called from a single client thread.</p>
    201      *
    202      * The same as {@link #createCallable(ZipArchiveEntry, InputStreamSupplier)}, but the archive entry
    203      * to be added is supplied by a {@link ZipArchiveEntryRequestSupplier}.
    204      *
    205      * @see #createCallable(ZipArchiveEntry, InputStreamSupplier)
    206      *
    207      * @param zipArchiveEntryRequestSupplier Should supply the entry to be added.
    208      * @return A callable that should subsequently passed to #submit, possibly in a wrapped/adapted from. The
    209      * value of this callable is not used, but any exceptions happening inside the compression
    210      * will be propagated through the callable.
    211      * @since 1.13
    212      */
    213     public final Callable<Object> createCallable(final ZipArchiveEntryRequestSupplier zipArchiveEntryRequestSupplier) {
    214         return new Callable<Object>() {
    215             @Override
    216             public Object call() throws Exception {
    217                 tlScatterStreams.get().addArchiveEntry(zipArchiveEntryRequestSupplier.get());
    218                 return null;
    219             }
    220         };
    221     }
    222 
    223     /**
    224      * Write the contents this to the target {@link ZipArchiveOutputStream}.
    225      * <p>
    226      * It may be beneficial to write things like directories and manifest files to the targetStream
    227      * before calling this method.
    228      * </p>
    229      *
    230      * <p>Calling this method will shut down the {@link ExecutorService} used by this class. If any of the {@link
    231      * Callable}s {@link #submit}ted to this instance throws an exception, the archive can not be created properly and
    232      * this method will throw an exception.</p>
    233      *
    234      * @param targetStream The {@link ZipArchiveOutputStream} to receive the contents of the scatter streams
    235      * @throws IOException          If writing fails
    236      * @throws InterruptedException If we get interrupted
    237      * @throws ExecutionException   If something happens in the parallel execution
    238      */
    239     public void writeTo(final ZipArchiveOutputStream targetStream)
    240             throws IOException, InterruptedException, ExecutionException {
    241 
    242         // Make sure we catch any exceptions from parallel phase
    243         try {
    244             for (final Future<?> future : futures) {
    245                 future.get();
    246             }
    247         } finally {
    248             es.shutdown();
    249         }
    250 
    251         es.awaitTermination(1000 * 60L, TimeUnit.SECONDS);  // == Infinity. We really *must* wait for this to complete
    252 
    253         // It is important that all threads terminate before we go on, ensure happens-before relationship
    254         compressionDoneAt = System.currentTimeMillis();
    255 
    256         synchronized (streams) {
    257             for (final ScatterZipOutputStream scatterStream : streams) {
    258                 scatterStream.writeTo(targetStream);
    259                 scatterStream.close();
    260             }
    261         }
    262 
    263         scatterDoneAt = System.currentTimeMillis();
    264     }
    265 
    266     /**
    267      * Returns a message describing the overall statistics of the compression run
    268      *
    269      * @return A string
    270      */
    271     public ScatterStatistics getStatisticsMessage() {
    272         return new ScatterStatistics(compressionDoneAt - startedAt, scatterDoneAt - compressionDoneAt);
    273     }
    274 }
    275 
    276