Skip to content

Instantly share code, notes, and snippets.

@kjellwinblad
Created February 23, 2011 23:37
Show Gist options
  • Select an option

  • Save kjellwinblad/841435 to your computer and use it in GitHub Desktop.

Select an option

Save kjellwinblad/841435 to your computer and use it in GitHub Desktop.
A Non-Blocking GZIP Decompress Machine Written in Scala
import java.util.zip.GZIPInputStream
import java.io.InputStream
import java.util.concurrent.ArrayBlockingQueue
import java.io.ByteArrayInputStream
import java.util.concurrent.TimeUnit
/**
 * Decompresses a GZIP stream that is handed over piece by piece.
 *
 * Compressed chunks are passed in with `write`, decompressed chunks are fetched
 * with `read`. The decompression itself runs on a dedicated thread that is
 * started while the machine is constructed. The caller and that thread talk to
 * each other through two single-slot blocking queues, and both `write` and
 * `read` wait at most 10 milliseconds before giving up.
 */
class GZIPDecompressMachine {

  /** Messages exchanged with the decompress thread through the two queues. */
  private sealed abstract class DataHolder
  /** A chunk of (compressed or decompressed) bytes. */
  private final case class Data(bytes: Array[Byte]) extends DataHolder
  /** End-of-data marker, used in both directions. */
  private case object Stop extends DataHolder
  /** A failure raised on the decompress thread, reported to the reader. */
  private final case class Failed(exception: Throwable) extends DataHolder

  /** How long `write`, `read` and `stopQuietNow` wait on a queue. */
  private val QueueTimeoutMillis = 10L
  /** How many reads without data `stop` tolerates before giving up. */
  private val MaxEmptyReadsOnStop = 200

  private val toBeDecompressedDataQueue = new ArrayBlockingQueue[DataHolder](1)
  private val decompressedDataQueue = new ArrayBlockingQueue[DataHolder](1)

  // Written by the decompress thread and polled by the constructing thread,
  // so it has to be volatile for the update to be guaranteed visible.
  @volatile private var started = false

  /**
   * Hands a chunk of compressed data to the machine.
   *
   * @param toWrite the next bytes of the GZIP stream
   * @return true if the data was accepted, false if the machine was still full
   *         after waiting 10 milliseconds and nothing was written
   */
  def write(toWrite: Array[Byte]): Boolean =
    toBeDecompressedDataQueue.offer(Data(toWrite), QueueTimeoutMillis, TimeUnit.MILLISECONDS)

  /**
   * Fetches the next chunk of decompressed data, waiting at most 10 milliseconds.
   *
   * @return a non-empty byte array if decompressed data is available, an empty
   *         array if nothing became available in time, or null once the
   *         decompress thread has signalled the end of the data
   * @throws Exception wrapping the failure if the decompress thread failed
   */
  def read(): Array[Byte] =
    decompressedDataQueue.poll(QueueTimeoutMillis, TimeUnit.MILLISECONDS) match {
      case null => new Array[Byte](0)
      case Data(bytes) => bytes
      case Stop => null
      case Failed(e) => throw new Exception(e)
    }

  /**
   * Stops the machine and lets it decompress the data that has not been
   * decompressed before.
   *
   * The end-of-input marker is offered again on every round because the
   * single-slot input queue may still be occupied by earlier data.
   *
   * @return the decompressed chunks that had not been read before, in order
   * @throws Exception if the end of the data is not reported after 200 reads
   *                   that produced nothing (the decompress thread is
   *                   interrupted first), or if the decompress thread failed
   */
  def stop(): Array[Array[Byte]] = {
    // Collected iteratively so the number of chunks cannot overflow the stack.
    val chunks = Array.newBuilder[Array[Byte]]
    var emptyReadsLeft = MaxEmptyReadsOnStop
    var finished = false
    while (!finished) {
      toBeDecompressedDataQueue.offer(Stop)
      val chunk = read()
      if (chunk == null) finished = true
      else if (chunk.length > 0) chunks += chunk
      else if (emptyReadsLeft == 0) {
        decompressThread.interrupt()
        throw new Exception("Gziped data does not end properly.")
      } else emptyReadsLeft -= 1
    }
    chunks.result()
  }

  /**
   * Stops the machine without letting it process any more data.
   *
   * Returns once the decompress thread has terminated. Pending output,
   * including a failure reported by the decompress thread, is discarded.
   */
  def stopQuietNow(): Unit = {
    while (decompressThread.isAlive) {
      toBeDecompressedDataQueue.offer(Stop)
      // Drain the queue directly instead of calling read(): read() rethrows a
      // queued failure, which would make this "quiet" stop throw.
      decompressedDataQueue.poll(QueueTimeoutMillis, TimeUnit.MILLISECONDS)
      decompressThread.interrupt()
    }
  }

  private val decompressThread = new Thread(new Runnable {

    /** Presents the chunks arriving on the input queue as one continuous stream. */
    private def compressedInput(): InputStream = new InputStream {
      private var chunk = new ByteArrayInputStream(new Array[Byte](0))

      override def read(): Int = {
        @scala.annotation.tailrec
        def nextByte(): Int =
          if (Thread.interrupted())
            throw new Exception("Closed while decompressing")
          else chunk.read() match {
            case -1 =>
              // About to wait for input: the machine is now ready for writes.
              started = true
              toBeDecompressedDataQueue.take() match {
                case Data(bytes) =>
                  chunk = new ByteArrayInputStream(bytes)
                  nextByte()
                case Stop | Failed(_) => -1
              }
            case value => value
          }
        nextByte()
      }
    }

    /** Pumps decompressed data to the output queue until the stream ends or the thread is interrupted. */
    private def decompress(): Unit = {
      val gzipStream = new GZIPInputStream(compressedInput())
      try {
        val buffer = new Array[Byte](1024)
        var endOfStream = false
        while (!endOfStream && !Thread.interrupted()) {
          gzipStream.read(buffer) match {
            case -1 =>
              decompressedDataQueue.put(Stop)
              endOfStream = true
            case n =>
              decompressedDataQueue.put(Data(buffer.slice(0, n)))
          }
        }
      } finally gzipStream.close()
    }

    override def run(): Unit = {
      try {
        try decompress()
        catch {
          // Everything is forwarded on purpose: the reader is the only place
          // where a failure on this thread can be observed.
          case e: Throwable => decompressedDataQueue.put(Failed(e))
        } finally {
          // Always leave an end marker so readers do not wait for more data.
          decompressedDataQueue.put(Stop)
        }
      } catch {
        // Interrupted while publishing the final messages: the machine is
        // being shut down, so there is nobody left to report to.
        case _: InterruptedException => ()
      }
    }
  })

  decompressThread.start()
  // Wait until the thread has started up properly
  while (!started) Thread.`yield`()
}
//Test
/**
 * Manual smoke test: compresses a small byte sequence with GZIP, feeds it
 * through a [[GZIPDecompressMachine]] and prints the data before compression,
 * after compression and after decompression.
 */
object GZIPDecompressMachine {

  /**
   * Runs the round trip and prints the three stages to standard output.
   * Any exception is printed as a stack trace instead of being propagated.
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    import java.io.ByteArrayOutputStream
    import java.util.zip.GZIPOutputStream
    try {
      //Create gzip data
      val uncompressedData =
        List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 10, 10, 10, 10, 10, 10, 22).map(_.toByte).toArray
      println("Decompressed data before " + uncompressedData.mkString(", "))
      val outStream = new ByteArrayOutputStream()
      val zipOutStream = new GZIPOutputStream(outStream)
      zipOutStream.write(uncompressedData)
      // Closing the GZIP stream completes the compressed data and closes outStream as well.
      zipOutStream.close()
      val compressedData = outStream.toByteArray
      println("Compressed data " + compressedData.toList.mkString(", "))
      //Test decompress with the machine
      val machine = new GZIPDecompressMachine()
      machine.write(compressedData)
      println("Decompressed data after " + machine.stop().flatten.mkString(","))
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment