/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
019package org.apache.commons.compress.compressors.snappy;
020
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PushbackInputStream;
024import java.util.Arrays;
025
026import org.apache.commons.compress.compressors.CompressorInputStream;
027import org.apache.commons.compress.utils.BoundedInputStream;
028import org.apache.commons.compress.utils.IOUtils;
029
030/**
031 * CompressorInputStream for the framing Snappy format.
032 *
033 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p>
034 *
035 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
036 * @since 1.7
037 */
038public class FramedSnappyCompressorInputStream extends CompressorInputStream {
039
040    /**
041     * package private for tests only.
042     */
043    static final long MASK_OFFSET = 0xa282ead8L;
044
045    private static final int STREAM_IDENTIFIER_TYPE = 0xff;
046    private static final int COMPRESSED_CHUNK_TYPE = 0;
047    private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
048    private static final int PADDING_CHUNK_TYPE = 0xfe;
049    private static final int MIN_UNSKIPPABLE_TYPE = 2;
050    private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
051    private static final int MAX_SKIPPABLE_TYPE = 0xfd;
052
053    private static final byte[] SZ_SIGNATURE = new byte[] {
054        (byte) STREAM_IDENTIFIER_TYPE, // tag
055        6, 0, 0, // length
056        's', 'N', 'a', 'P', 'p', 'Y'
057    };
058
059    /** The underlying stream to read compressed data from */
060    private final PushbackInputStream in;
061    
062    /** The dialect to expect */
063    private final FramedSnappyDialect dialect;
064
065    private SnappyCompressorInputStream currentCompressedChunk;
066
067    // used in no-arg read method
068    private final byte[] oneByte = new byte[1];
069
070    private boolean endReached, inUncompressedChunk;
071
072    private int uncompressedBytesRemaining;
073    private long expectedChecksum = -1;
074    private final PureJavaCrc32C checksum = new PureJavaCrc32C();
075
076    /**
077     * Constructs a new input stream that decompresses
078     * snappy-framed-compressed data from the specified input stream
079     * using the {@link FramedSnappyDialect#STANDARD} dialect.
080     * @param in  the InputStream from which to read the compressed data
081     * @throws IOException if reading fails
082     */
083    public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
084        this(in, FramedSnappyDialect.STANDARD);
085    }
086
087    /**
088     * Constructs a new input stream that decompresses snappy-framed-compressed data
089     * from the specified input stream.
090     * @param in  the InputStream from which to read the compressed data
091     * @param dialect the dialect used by the compressed stream
092     * @throws IOException if reading fails
093     */
094    public FramedSnappyCompressorInputStream(final InputStream in,
095                                             final FramedSnappyDialect dialect)
096        throws IOException {
097        this.in = new PushbackInputStream(in, 1);
098        this.dialect = dialect;
099        if (dialect.hasStreamIdentifier()) {
100            readStreamIdentifier();
101        }
102    }
103
104    /** {@inheritDoc} */
105    @Override
106    public int read() throws IOException {
107        return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
108    }
109
110    /** {@inheritDoc} */
111    @Override
112    public void close() throws IOException {
113        if (currentCompressedChunk != null) {
114            currentCompressedChunk.close();
115            currentCompressedChunk = null;
116        }
117        in.close();
118    }
119
120    /** {@inheritDoc} */
121    @Override
122    public int read(final byte[] b, final int off, final int len) throws IOException {
123        int read = readOnce(b, off, len);
124        if (read == -1) {
125            readNextBlock();
126            if (endReached) {
127                return -1;
128            }
129            read = readOnce(b, off, len);
130        }
131        return read;
132    }
133
134    /** {@inheritDoc} */
135    @Override
136    public int available() throws IOException {
137        if (inUncompressedChunk) {
138            return Math.min(uncompressedBytesRemaining,
139                            in.available());
140        } else if (currentCompressedChunk != null) {
141            return currentCompressedChunk.available();
142        }
143        return 0;
144    }
145
146    /**
147     * Read from the current chunk into the given array.
148     *
149     * @return -1 if there is no current chunk or the number of bytes
150     * read from the current chunk (which may be -1 if the end of the
151     * chunk is reached).
152     */
153    private int readOnce(final byte[] b, final int off, final int len) throws IOException {
154        int read = -1;
155        if (inUncompressedChunk) {
156            final int amount = Math.min(uncompressedBytesRemaining, len);
157            if (amount == 0) {
158                return -1;
159            }
160            read = in.read(b, off, amount);
161            if (read != -1) {
162                uncompressedBytesRemaining -= read;
163                count(read);
164            }
165        } else if (currentCompressedChunk != null) {
166            final long before = currentCompressedChunk.getBytesRead();
167            read = currentCompressedChunk.read(b, off, len);
168            if (read == -1) {
169                currentCompressedChunk.close();
170                currentCompressedChunk = null;
171            } else {
172                count(currentCompressedChunk.getBytesRead() - before);
173            }
174        }
175        if (read > 0) {
176            checksum.update(b, off, read);
177        }
178        return read;
179    }
180
181    private void readNextBlock() throws IOException {
182        verifyLastChecksumAndReset();
183        inUncompressedChunk = false;
184        final int type = readOneByte();
185        if (type == -1) {
186            endReached = true;
187        } else if (type == STREAM_IDENTIFIER_TYPE) {
188            in.unread(type);
189            pushedBackBytes(1);
190            readStreamIdentifier();
191            readNextBlock();
192        } else if (type == PADDING_CHUNK_TYPE
193                   || (type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE)) {
194            skipBlock();
195            readNextBlock();
196        } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
197            throw new IOException("unskippable chunk with type " + type
198                                  + " (hex " + Integer.toHexString(type) + ")"
199                                  + " detected.");
200        } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
201            inUncompressedChunk = true;
202            uncompressedBytesRemaining = readSize() - 4 /* CRC */;
203            expectedChecksum = unmask(readCrc());
204        } else if (type == COMPRESSED_CHUNK_TYPE) {
205            final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
206            final long size = readSize() - (expectChecksum ? 4l : 0l);
207            if (expectChecksum) {
208                expectedChecksum = unmask(readCrc());
209            } else {
210                expectedChecksum = -1;
211            }
212            currentCompressedChunk =
213                new SnappyCompressorInputStream(new BoundedInputStream(in, size));
214            // constructor reads uncompressed size
215            count(currentCompressedChunk.getBytesRead());
216        } else {
217            // impossible as all potential byte values have been covered
218            throw new IOException("unknown chunk type " + type
219                                  + " detected.");
220        }
221    }
222
223    private long readCrc() throws IOException {
224        final byte[] b = new byte[4];
225        final int read = IOUtils.readFully(in, b);
226        count(read);
227        if (read != 4) {
228            throw new IOException("premature end of stream");
229        }
230        long crc = 0;
231        for (int i = 0; i < 4; i++) {
232            crc |= (b[i] & 0xFFL) << (8 * i);
233        }
234        return crc;
235    }
236
237    static long unmask(long x) {
238        // ugly, maybe we should just have used ints and deal with the
239        // overflow
240        x -= MASK_OFFSET;
241        x &= 0xffffFFFFL;
242        return ((x >> 17) | (x << 15)) & 0xffffFFFFL;
243    }
244
245    private int readSize() throws IOException {
246        int b = 0;
247        int sz = 0;
248        for (int i = 0; i < 3; i++) {
249            b = readOneByte();
250            if (b == -1) {
251                throw new IOException("premature end of stream");
252            }
253            sz |= (b << (i * 8));
254        }
255        return sz;
256    }
257
258    private void skipBlock() throws IOException {
259        final int size = readSize();
260        final long read = IOUtils.skip(in, size);
261        count(read);
262        if (read != size) {
263            throw new IOException("premature end of stream");
264        }
265    }
266
267    private void readStreamIdentifier() throws IOException {
268        final byte[] b = new byte[10];
269        final int read = IOUtils.readFully(in, b);
270        count(read);
271        if (10 != read || !matches(b, 10)) {
272            throw new IOException("Not a framed Snappy stream");
273        }
274    }
275
276    private int readOneByte() throws IOException {
277        final int b = in.read();
278        if (b != -1) {
279            count(1);
280            return b & 0xFF;
281        }
282        return -1;
283    }
284
285    private void verifyLastChecksumAndReset() throws IOException {
286        if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
287            throw new IOException("Checksum verification failed");
288        }
289        expectedChecksum = -1;
290        checksum.reset();
291    }
292
293    /**
294     * Checks if the signature matches what is expected for a .sz file.
295     *
296     * <p>.sz files start with a chunk with tag 0xff and content sNaPpY.</p>
297     * 
298     * @param signature the bytes to check
299     * @param length    the number of bytes to check
300     * @return          true if this is a .sz stream, false otherwise
301     */
302    public static boolean matches(final byte[] signature, final int length) {
303
304        if (length < SZ_SIGNATURE.length) {
305            return false;
306        }
307
308        byte[] shortenedSig = signature;
309        if (signature.length > SZ_SIGNATURE.length) {
310            shortenedSig = new byte[SZ_SIGNATURE.length];
311            System.arraycopy(signature, 0, shortenedSig, 0, SZ_SIGNATURE.length);
312        }
313
314        return Arrays.equals(shortenedSig, SZ_SIGNATURE);
315    }
316
317}