Created
February 23, 2011 23:37
-
-
Save kjellwinblad/841435 to your computer and use it in GitHub Desktop.
A Non-Blocking GZIP Decompress Machine Written in Scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.util.zip.GZIPInputStream | |
| import java.io.InputStream | |
| import java.util.concurrent.ArrayBlockingQueue | |
| import java.io.ByteArrayInputStream | |
| import java.util.concurrent.TimeUnit | |
/**
 * Decompresses a GZIP stream on a background thread so that the caller can
 * feed compressed chunks and collect decompressed chunks without blocking for
 * more than a few milliseconds per call.
 *
 * Compressed chunks are handed to the worker through a one-slot queue
 * ([[write]]) and decompressed chunks come back through a second one-slot
 * queue ([[read]]). The worker thread is started by the constructor, which
 * returns once the worker has asked for its first chunk of input.
 */
class GZIPDecompressMachine {
  import scala.util.control.NonFatal

  /** Messages exchanged between the caller and the worker thread. */
  private sealed trait DataHolder
  private final case class Data(bytes: Array[Byte]) extends DataHolder
  private case object Stop extends DataHolder
  private final case class Failed(exception: Throwable) extends DataHolder

  private val toBeDecompressedDataQueue = new ArrayBlockingQueue[DataHolder](1)
  private val decompressedDataQueue = new ArrayBlockingQueue[DataHolder](1)

  // Written by the worker thread and spun on by the constructing thread, so it
  // must be volatile; otherwise the write is not guaranteed to become visible.
  @volatile private var started = false

  /**
   * Hands a chunk of compressed data to the machine, waiting at most 10 ms
   * for a free slot.
   *
   * @param toWrite the next chunk of the GZIP stream
   * @return true if the chunk was accepted, false if the machine was full and
   *         the chunk was not accepted (the caller should retry later)
   */
  def write(toWrite: Array[Byte]): Boolean =
    toBeDecompressedDataQueue.offer(Data(toWrite), 10, TimeUnit.MILLISECONDS)

  /**
   * Fetches the next chunk of decompressed data, waiting at most 10 ms for
   * one to become available.
   *
   * @return a non-empty byte array if decompressed data was available, an
   *         empty array if nothing is available at the moment, or null once
   *         the worker has signalled the end of the stream
   * @throws Exception wrapping the failure if the worker reported an error
   */
  def read(): Array[Byte] =
    decompressedDataQueue.poll(10, TimeUnit.MILLISECONDS) match {
      case null        => new Array[Byte](0)
      case Data(bytes) => bytes
      case Stop        => null
      case Failed(e)   => throw new Exception(e)
    }

  /**
   * Stops the machine and lets it decompress the data that has not been
   * decompressed yet.
   *
   * @return the decompressed chunks that had not been read before, in order
   * @throws Exception if the worker reported an error, or if the stream still
   *                   has not ended after 200 empty reads (in which case the
   *                   worker is shut down first)
   */
  def stop(): Array[Array[Byte]] = {
    // Collected iteratively: one list cell per recursion level would overflow
    // the stack for large streams (every chunk is at most 1024 bytes).
    val chunks = Array.newBuilder[Array[Byte]]
    var emptyReadsLeft = 200
    var finished = false
    while (!finished) {
      // The end marker is offered on every round because the worker consumes
      // one marker each time it runs out of buffered input.
      toBeDecompressedDataQueue.offer(Stop)
      val chunk = read()
      if (chunk == null)
        finished = true
      else if (chunk.length > 0)
        chunks += chunk
      else if (emptyReadsLeft == 0) {
        // Make sure the worker really terminates instead of leaving it parked
        // on one of the queues.
        stopQuietNow()
        throw new Exception("Gziped data does not end properly.")
      } else
        emptyReadsLeft -= 1
    }
    chunks.result()
  }

  /**
   * Stops the machine without letting it process any more data. Returns once
   * the worker thread has terminated; errors reported by the worker are
   * discarded.
   */
  def stopQuietNow(): Unit = {
    while (decompressThread.isAlive) {
      toBeDecompressedDataQueue.offer(Stop)
      // Drain the output slot so a worker blocked on it can make progress; a
      // failure reported by the worker is irrelevant when shutting down.
      try read()
      catch { case NonFatal(_) => () }
      decompressThread.interrupt()
    }
  }

  /** Publishes a message to the reader; gives up silently when interrupted. */
  private def publishQuietly(item: DataHolder): Unit =
    try decompressedDataQueue.put(item)
    catch { case _: InterruptedException => () }

  /**
   * Input stream seen by the GZIP decoder: it serves the bytes of the current
   * chunk and blocks on the input queue whenever that chunk is used up.
   */
  private def compressedInput(): InputStream = new InputStream {
    private var pending = new ByteArrayInputStream(new Array[Byte](0))

    override def read(): Int =
      if (Thread.interrupted())
        throw new Exception("Closed while decompressing")
      else pending.read() match {
        case -1 =>
          // Asking for input for the first time means the worker is up.
          started = true
          toBeDecompressedDataQueue.take() match {
            case Data(bytes) =>
              pending = new ByteArrayInputStream(bytes)
              read()
            case _ => -1 // an end marker is reported as end of stream
          }
        case value => value
      }
  }

  /** Worker loop: inflates the input and publishes it in chunks of up to 1024 bytes. */
  private def decompress(): Unit = {
    val gzip = new GZIPInputStream(compressedInput())
    try {
      val buffer = new Array[Byte](1024)
      var finished = false
      while (!finished && !Thread.interrupted()) {
        gzip.read(buffer) match {
          case -1 =>
            decompressedDataQueue.put(Stop)
            finished = true
          case n =>
            decompressedDataQueue.put(Data(buffer.slice(0, n)))
        }
      }
    } finally {
      gzip.close() // releases the inflater held by the stream
    }
  }

  private val decompressThread = new Thread(new Runnable {
    def run(): Unit =
      try decompress()
      catch {
        case e: InterruptedException => publishQuietly(Failed(e))
        case NonFatal(e)             => publishQuietly(Failed(e))
      } finally {
        // Always leave an end marker behind so that readers terminate.
        publishQuietly(Stop)
      }
  })
  // A machine that is abandoned without being stopped must not keep the JVM alive.
  decompressThread.setDaemon(true)
  decompressThread.start()

  // Wait until the thread has started up properly
  while (!started && decompressThread.isAlive) Thread.`yield`()
}
//Test
object GZIPDecompressMachine {

  /**
   * Round-trip smoke test: compresses a small byte sequence with
   * GZIPOutputStream, feeds the result through a GZIPDecompressMachine and
   * prints the original, compressed and decompressed bytes. Any exception is
   * printed as a stack trace instead of being propagated.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    import java.io.ByteArrayOutputStream
    import java.util.zip.GZIPOutputStream

    try {
      //Create gzip data
      val uncompressedData =
        Array[Byte](1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10, 10, 10, 10, 22)
      println("Decompressed data before " + uncompressedData.mkString(", "))

      val outStream = new ByteArrayOutputStream()
      val zipOutStream = new GZIPOutputStream(outStream)
      // Closing the GZIP stream writes the trailer and flushes everything
      // into outStream, so no separate flush calls are needed.
      try zipOutStream.write(uncompressedData)
      finally zipOutStream.close()
      val compressedData = outStream.toByteArray
      println("Compressed data " + compressedData.mkString(", "))

      //Test decompress with the machine
      val machine = new GZIPDecompressMachine()
      if (!machine.write(compressedData))
        sys.error("The machine did not accept the compressed data")
      println("Decompressed data after " + machine.stop().flatten.mkString(","))
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment