/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.commons.compress.compressors.snappy;

import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.utils.BoundedInputStream;
import org.apache.commons.compress.utils.IOUtils;

/**
 * CompressorInputStream for the framing Snappy format.
 *
 * <p>Based on the "spec" in the version "Last revised: 2013-10-25"</p>
 *
 * <p>The stream is consumed chunk by chunk: each chunk starts with a
 * one byte type followed by a three byte little-endian length.
 * Uncompressed chunks are read directly from the underlying stream,
 * compressed chunks are delegated to a
 * {@link SnappyCompressorInputStream} bounded to the chunk's
 * length.</p>
 *
 * @see <a href="https://github.com/google/snappy/blob/master/framing_format.txt">Snappy framing format description</a>
 * @since 1.7
 */
public class FramedSnappyCompressorInputStream extends CompressorInputStream {

    /**
     * Offset that is subtracted from a masked checksum before it is
     * rotated back by {@link #unmask}.
     *
     * <p>package private for tests only.</p>
     */
    static final long MASK_OFFSET = 0xa282ead8L;

    private static final int STREAM_IDENTIFIER_TYPE = 0xff;
    private static final int COMPRESSED_CHUNK_TYPE = 0;
    private static final int UNCOMPRESSED_CHUNK_TYPE = 1;
    private static final int PADDING_CHUNK_TYPE = 0xfe;
    private static final int MIN_UNSKIPPABLE_TYPE = 2;
    private static final int MAX_UNSKIPPABLE_TYPE = 0x7f;
    private static final int MAX_SKIPPABLE_TYPE = 0xfd;

    /** Size in bytes of the checksum stored at the start of a chunk's data. */
    private static final int CRC_LENGTH = 4;

    private static final byte[] SZ_SIGNATURE = new byte[] {
        (byte) STREAM_IDENTIFIER_TYPE, // tag
        6, 0, 0, // length
        's', 'N', 'a', 'P', 'p', 'Y'
    };

    /** The underlying stream to read compressed data from */
    private final PushbackInputStream in;

    /** The dialect to expect */
    private final FramedSnappyDialect dialect;

    /** Decompressor for the current compressed chunk, {@code null} if there is none. */
    private SnappyCompressorInputStream currentCompressedChunk;

    // used in no-arg read method
    private final byte[] oneByte = new byte[1];

    private boolean endReached, inUncompressedChunk;

    /** Number of payload bytes left in the current uncompressed chunk. */
    private int uncompressedBytesRemaining;

    /** Checksum expected for the current chunk, -1 if nothing is to be verified. */
    private long expectedChecksum = -1;

    /** Running checksum of the data handed out from the current chunk. */
    private final PureJavaCrc32C checksum = new PureJavaCrc32C();

    /**
     * Constructs a new input stream that decompresses
     * snappy-framed-compressed data from the specified input stream
     * using the {@link FramedSnappyDialect#STANDARD} dialect.
     * @param in  the InputStream from which to read the compressed data
     * @throws IOException if reading fails
     */
    public FramedSnappyCompressorInputStream(final InputStream in) throws IOException {
        this(in, FramedSnappyDialect.STANDARD);
    }

    /**
     * Constructs a new input stream that decompresses snappy-framed-compressed data
     * from the specified input stream.
     *
     * <p>If the dialect has a stream identifier it is read and
     * verified immediately.</p>
     *
     * @param in  the InputStream from which to read the compressed data
     * @param dialect the dialect used by the compressed stream
     * @throws IOException if reading fails
     */
    public FramedSnappyCompressorInputStream(final InputStream in,
                                             final FramedSnappyDialect dialect)
        throws IOException {
        this.in = new PushbackInputStream(in, 1);
        this.dialect = dialect;
        if (dialect.hasStreamIdentifier()) {
            readStreamIdentifier();
        }
    }

    /** {@inheritDoc} */
    @Override
    public int read() throws IOException {
        return read(oneByte, 0, 1) == -1 ? -1 : oneByte[0] & 0xFF;
    }

    /**
     * Closes the decompressor of the current chunk (if any) and the
     * underlying stream.
     *
     * <p>The underlying stream is closed even if closing the current
     * chunk fails.</p>
     *
     * @throws IOException if closing either stream fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (currentCompressedChunk != null) {
                currentCompressedChunk.close();
                currentCompressedChunk = null;
            }
        } finally {
            in.close();
        }
    }

    /**
     * Reads up to {@code len} decompressed bytes into {@code b}.
     *
     * <p>Chunks that don't contribute any data are passed over so
     * that -1 is only returned once the underlying stream is
     * exhausted.</p>
     *
     * @param b the buffer to read into
     * @param off offset of the first byte to write inside {@code b}
     * @param len maximum number of bytes to read
     * @return the number of bytes read, 0 if {@code len} is 0, or -1
     * if the end of the stream has been reached
     * @throws IOException if reading fails, the stream is malformed
     * or a checksum doesn't match
     */
    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        if (len == 0) {
            // must not be mistaken for "end of chunk" by readOnce
            return 0;
        }
        int read = readOnce(b, off, len);
        // an empty chunk yields -1 right away, keep going until a
        // chunk provides data or the stream ends
        while (read == -1) {
            readNextBlock();
            if (endReached) {
                return -1;
            }
            read = readOnce(b, off, len);
        }
        return read;
    }

    /** {@inheritDoc} */
    @Override
    public int available() throws IOException {
        if (inUncompressedChunk) {
            return Math.min(uncompressedBytesRemaining,
                            in.available());
        } else if (currentCompressedChunk != null) {
            return currentCompressedChunk.available();
        }
        return 0;
    }

    /**
     * Read from the current chunk into the given array.
     *
     * <p>Bytes that have been read are added to the running
     * checksum.</p>
     *
     * @return -1 if there is no current chunk or the number of bytes
     * read from the current chunk (which may be -1 if the end of the
     * chunk is reached).
     */
    private int readOnce(final byte[] b, final int off, final int len) throws IOException {
        int read = -1;
        if (inUncompressedChunk) {
            final int amount = Math.min(uncompressedBytesRemaining, len);
            if (amount == 0) {
                return -1;
            }
            read = in.read(b, off, amount);
            if (read != -1) {
                uncompressedBytesRemaining -= read;
                count(read);
            }
        } else if (currentCompressedChunk != null) {
            final long before = currentCompressedChunk.getBytesRead();
            read = currentCompressedChunk.read(b, off, len);
            if (read == -1) {
                currentCompressedChunk.close();
                currentCompressedChunk = null;
            } else {
                // count the compressed bytes consumed, not the decompressed ones
                count(currentCompressedChunk.getBytesRead() - before);
            }
        }
        if (read > 0) {
            checksum.update(b, off, read);
        }
        return read;
    }

    /**
     * Verifies the checksum of the chunk just finished and advances
     * to the next chunk that holds data.
     *
     * <p>Stream identifier, padding and skippable chunks are consumed
     * in a loop so a long run of them cannot exhaust the call
     * stack. Sets {@code endReached} if the underlying stream has no
     * more chunks.</p>
     *
     * @throws IOException if reading fails, the checksum of the
     * previous chunk doesn't match, an unskippable chunk is found or
     * a chunk is too short to hold its checksum
     */
    private void readNextBlock() throws IOException {
        verifyLastChecksumAndReset();
        inUncompressedChunk = false;
        while (true) {
            final int type = readOneByte();
            if (type == -1) {
                endReached = true;
                return;
            }
            if (type == STREAM_IDENTIFIER_TYPE) {
                // readStreamIdentifier expects to see the tag byte as well
                in.unread(type);
                pushedBackBytes(1);
                readStreamIdentifier();
            } else if (type == PADDING_CHUNK_TYPE
                       || (type > MAX_UNSKIPPABLE_TYPE && type <= MAX_SKIPPABLE_TYPE)) {
                skipBlock();
            } else if (type >= MIN_UNSKIPPABLE_TYPE && type <= MAX_UNSKIPPABLE_TYPE) {
                throw new IOException("unskippable chunk with type " + type
                                      + " (hex " + Integer.toHexString(type) + ")"
                                      + " detected.");
            } else if (type == UNCOMPRESSED_CHUNK_TYPE) {
                final int size = readSize() - CRC_LENGTH;
                if (size < 0) {
                    throw new IOException("Found illegal chunk with negative size");
                }
                inUncompressedChunk = true;
                uncompressedBytesRemaining = size;
                expectedChecksum = unmask(readCrc());
                return;
            } else if (type == COMPRESSED_CHUNK_TYPE) {
                final boolean expectChecksum = dialect.usesChecksumWithCompressedChunks();
                final long size = readSize() - (expectChecksum ? (long) CRC_LENGTH : 0L);
                if (size < 0) {
                    throw new IOException("Found illegal chunk with negative size");
                }
                if (expectChecksum) {
                    expectedChecksum = unmask(readCrc());
                } else {
                    expectedChecksum = -1;
                }
                currentCompressedChunk =
                    new SnappyCompressorInputStream(new BoundedInputStream(in, size));
                // constructor reads uncompressed size
                count(currentCompressedChunk.getBytesRead());
                return;
            } else {
                // impossible as all potential byte values have been covered
                throw new IOException("unknown chunk type " + type
                                      + " detected.");
            }
        }
    }

    /**
     * Reads the four byte little-endian checksum of a chunk.
     *
     * @return the checksum as stored in the stream, i.e. still masked
     * @throws IOException if fewer than four bytes are available
     */
    private long readCrc() throws IOException {
        final byte[] b = new byte[CRC_LENGTH];
        final int read = IOUtils.readFully(in, b);
        count(read);
        if (read != CRC_LENGTH) {
            throw new IOException("premature end of stream");
        }
        long crc = 0;
        for (int i = 0; i < CRC_LENGTH; i++) {
            crc |= (b[i] & 0xFFL) << (8 * i);
        }
        return crc;
    }

    /**
     * Reverts the masking applied to a checksum: subtracts
     * {@link #MASK_OFFSET} and rotates the 32 bit result right by 17
     * bits.
     *
     * @param x the masked checksum
     * @return the unmasked checksum as unsigned 32 bit value
     */
    static long unmask(long x) {
        // ugly, maybe we should just have used ints and deal with the
        // overflow
        x -= MASK_OFFSET;
        x &= 0xffffFFFFL;
        return ((x >> 17) | (x << 15)) & 0xffffFFFFL;
    }

    /**
     * Reads the three byte little-endian length of a chunk.
     *
     * @return the length, a value between 0 and 0xFFFFFF
     * @throws IOException if fewer than three bytes are available
     */
    private int readSize() throws IOException {
        int sz = 0;
        for (int i = 0; i < 3; i++) {
            final int b = readOneByte();
            if (b == -1) {
                throw new IOException("premature end of stream");
            }
            sz |= (b << (i * 8));
        }
        return sz;
    }

    /**
     * Reads the length of the current chunk and skips that many
     * bytes.
     *
     * @throws IOException if the stream ends before the whole chunk
     * has been skipped
     */
    private void skipBlock() throws IOException {
        final int size = readSize();
        final long read = IOUtils.skip(in, size);
        count(read);
        if (read != size) {
            throw new IOException("premature end of stream");
        }
    }

    /**
     * Reads a complete stream identifier chunk (including tag and
     * length) and verifies it.
     *
     * @throws IOException if the next ten bytes are not the expected
     * stream identifier
     */
    private void readStreamIdentifier() throws IOException {
        final byte[] b = new byte[SZ_SIGNATURE.length];
        final int read = IOUtils.readFully(in, b);
        count(read);
        if (SZ_SIGNATURE.length != read || !matches(b, SZ_SIGNATURE.length)) {
            throw new IOException("Not a framed Snappy stream");
        }
    }

    /**
     * Reads a single byte from the underlying stream and counts it.
     *
     * @return the byte as value between 0 and 255 or -1 if the end of
     * the stream has been reached
     */
    private int readOneByte() throws IOException {
        final int b = in.read();
        if (b != -1) {
            count(1);
            return b & 0xFF;
        }
        return -1;
    }

    /**
     * Compares the running checksum to the expected one - if there is
     * one - and resets both for the next chunk.
     *
     * @throws IOException if the checksums differ
     */
    private void verifyLastChecksumAndReset() throws IOException {
        if (expectedChecksum >= 0 && expectedChecksum != checksum.getValue()) {
            throw new IOException("Checksum verification failed");
        }
        expectedChecksum = -1;
        checksum.reset();
    }

    /**
     * Checks if the signature matches what is expected for a .sz file.
     *
     * <p>.sz files start with a chunk with tag 0xff and content sNaPpY.</p>
     *
     * @param signature the bytes to check
     * @param length    the number of bytes to check
     * @return          true if this is a .sz stream, false otherwise
     */
    public static boolean matches(final byte[] signature, final int length) {

        if (length < SZ_SIGNATURE.length) {
            return false;
        }

        // only the leading bytes are relevant, anything beyond the
        // signature's length is ignored
        byte[] shortenedSig = signature;
        if (signature.length > SZ_SIGNATURE.length) {
            shortenedSig = new byte[SZ_SIGNATURE.length];
            System.arraycopy(signature, 0, shortenedSig, 0, SZ_SIGNATURE.length);
        }

        return Arrays.equals(shortenedSig, SZ_SIGNATURE);
    }

}