Skip to content

Instantly share code, notes, and snippets.

@neil90-db
Last active April 16, 2024 21:37
Show Gist options
  • Select an option

  • Save neil90-db/135646db0167bbf8a30387d0daa58402 to your computer and use it in GitHub Desktop.

Select an option

Save neil90-db/135646db0167bbf8a30387d0daa58402 to your computer and use it in GitHub Desktop.
%scala
import java.io
import java.io.{File, FileWriter}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener._
/**
 * Writes `str` followed by a single newline to `file`, replacing any existing content.
 *
 * Missing parent directories are created first, and the text is always encoded as UTF-8
 * rather than the JVM's platform-default charset.
 *
 * @param file destination file; created if absent, truncated if present
 * @param str  text to write (a trailing "\n" is appended)
 * @throws java.io.IOException if the file cannot be created or written
 */
def writeToFile(file: File, str: String): Unit = {
  // getParentFile is null for a bare relative name; mkdirs() is a no-op when the directory exists.
  Option(file.getParentFile).foreach(_.mkdirs())
  // Files.write opens, truncates, writes and closes in one call, so no writer can be leaked.
  Files.write(file.toPath, (str + "\n").getBytes(StandardCharsets.UTF_8))
  // Only reached when the write above did not throw.
  println("saving progress file successful")
}
/**
 * Streaming listener that, on every progress event, prints the progress, saves it to a
 * file and then stops the query that reported it. The stop reason used below states the
 * intent: only one batch should be processed.
 */
val streamListener = new StreamingQueryListener() {

  /**
   * Stops the query that produced `queryProgress`, printing why.
   *
   * @param queryProgress progress event identifying the query to stop
   * @param reason        human-readable explanation, printed before stopping
   */
  def streamKiller(queryProgress: QueryProgressEvent, reason: String): Unit = {
    val progress = queryProgress.progress
    println(s"Killing stream ${progress.name} (${progress.id})")
    println(s"Reason for terminating: ${reason}")
    // StreamingQueryManager.get returns null when no active query has this id (for example
    // when it was already stopped), so guard the lookup instead of dereferencing it blindly.
    Option(spark.streams.get(progress.id)).fold(
      println(s"Stream ${progress.id} is no longer active; nothing to stop")
    )(_.stop())
  }

  /**
   * Saves the progress report of one batch as `<runId>_<batchId>.json` inside `savePath`.
   *
   * @param queryProgress progress event to persist
   * @param savePath      directory that receives the file; created if missing
   */
  def saveQueryProgress(queryProgress: QueryProgressEvent, savePath: String): Unit = {
    val progress = queryProgress.progress
    val targetDir = new File(savePath)
    // Create the directory here so the write does not depend on it already existing.
    targetDir.mkdirs()
    // Build the path with File(parent, child) so a separator is always inserted between the
    // directory and the file name; plain string concatenation fused the two together.
    val target = new File(targetDir, s"${progress.runId}_${progress.batchId}.json")
    writeToFile(target, progress.toString)
  }

  /** Prints the progress, saves it and stops the query. */
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    val progress = queryProgress.progress
    println(s"Query made progress: $progress")
    println(s"Number of Input Records ${progress.numInputRows}")

    val savePath = "/dbfs/home/adam/queryProgressFiles"
    // Act on every batch whose id is a multiple of this interval. With an interval of 1 the
    // condition is always true, so the first progress event already triggers the stop.
    val batchInterval = 1L

    if (progress.batchId % batchInterval == 0) {
      // Stop the stream even when saving fails; the failure still propagates to the caller.
      try saveQueryProgress(queryProgress, savePath)
      finally streamKiller(queryProgress, "Only want to process 1 Batch")
    }

    // Alternative trigger (disabled): stop once a batch arrives with no input rows.
    // if (progress.numInputRows == 0) {
    //   saveQueryProgress(queryProgress, savePath)
    //   streamKiller(queryProgress, "No input rows in batch")
    // }
  }

  /** Nothing to do when a query starts. */
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = ()

  /** Prints the id of the query that terminated. */
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit =
    println(s"Stream Terminated ${queryTerminated.id}")
}

// Register the listener with the session's streaming query manager.
spark.streams.addListener(streamListener)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment