Skip to content

Instantly share code, notes, and snippets.

@neoeahit
Created August 27, 2017 22:03
Show Gist options
  • Select an option

  • Save neoeahit/4c80af355d5166eab07fbae1079060fe to your computer and use it in GitHub Desktop.

Select an option

Save neoeahit/4c80af355d5166eab07fbae1079060fe to your computer and use it in GitHub Desktop.
Issues with file uploads
/**
 * Flink sink that buffers incoming [[Message]]s, rolls them into local "pending"
 * files once enough have accumulated, and ships those files to the final store
 * (S3 in our case) when a checkpoint is taken.
 *
 * Messages that are buffered but not yet rolled are mirrored into Flink operator
 * list state on every snapshot, so they are restored after a restart.
 * `restoreState` (from `CheckpointedRestoring`) also appends an `ArrayBuffer`
 * handed over by Flink. Presumably this is state written by the older
 * checkpointing interface; confirm if relied upon.
 *
 * NOTE(review): writing pending files and uploading them are still stubs here.
 *
 * @param maxBufferedMessages number of buffered messages that triggers rolling
 *                            the buffer into a pending file. Must be positive;
 *                            defaults to 10000, the previously hard-coded value.
 */
class FileArchievar(maxBufferedMessages: Int = 10000)
  extends RichSinkFunction[Message]
  with CheckpointedFunction
  with CheckpointedRestoring[ArrayBuffer[Message]] {

  require(maxBufferedMessages > 0, s"maxBufferedMessages must be positive, got $maxBufferedMessages")

  /** Operator state mirroring `bufferedMessages`; assigned in `initializeState`. */
  private var checkpointedMessages: ListState[Message] = _

  /** Messages received since the buffer was last rolled into a pending file. */
  private val bufferedMessages = ListBuffer[Message]()

  /** Local files that have been written but not yet shipped to the sink. */
  private val pendingFiles = ListBuffer[String]()

  /**
   * Buffers a single message. Once the buffer holds `maxBufferedMessages`
   * entries, it is rolled into a pending file.
   */
  @throws[IOException]
  def invoke(message: Message): Unit = {
    bufferedMessages += message
    if (bufferedMessages.size >= maxBufferedMessages) {
      createPendingFiles()
    }
  }

  /**
   * Copies the current buffer into operator state, then ships and forgets any
   * pending files.
   */
  @throws[IOException]
  def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedMessages.clear()
    bufferedMessages.foreach(checkpointedMessages.add)
    pendingFiles.synchronized {
      if (pendingFiles.nonEmpty) {
        // TODO: move every pending file to S3 (the sink in our case),
        // then delete the local copies.
      }
      // NOTE(review): the list is cleared even though the upload above is not
      // implemented yet. Once it is, clear only after a successful upload.
      pendingFiles.clear()
    }
  }

  /**
   * Recovery hook for files left behind by a crash. If any pending files are
   * present, they should be moved to the sink (uploaded to S3 in our case) and
   * then deleted from local storage.
   */
  def handlePreviousPendingFiles(): Unit = {
    // TODO: upload leftover pending files and delete them locally.
  }

  /**
   * Recovers leftover pending files, then registers the operator list state and
   * reloads any previously checkpointed messages into the buffer.
   */
  @throws[IOException]
  def initializeState(context: FunctionInitializationContext): Unit = {
    // Files may already exist locally (e.g. in /tmp) if the program crashed
    // before they were uploaded to S3. Recover them: upload, then clear the
    // directory.
    handlePreviousPendingFiles()
    checkpointedMessages = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[Message](STATE_UID, TypeInformation.of(new TypeHint[Message]() {})))
    import scala.collection.JavaConverters._
    // ListState#get can return null when the state is empty, so guard before iterating.
    Option(checkpointedMessages.get).foreach(restored => bufferedMessages ++= restored.asScala)
  }

  /** Appends messages handed over through `CheckpointedRestoring` to the buffer. */
  @throws[Exception]
  def restoreState(state: ArrayBuffer[Message]): Unit = {
    bufferedMessages ++= state
  }

  /** Rolls the buffered messages into a new pending file and resets bookkeeping. */
  private def createPendingFiles(): Unit = {
    // TODO: push buffered messages to a pending file (presumably also recording
    // its path in pendingFiles so snapshotState can ship it).
    resetState()
  }

  /** Resets per-file bookkeeping after a roll. */
  private def resetState(): Unit = {
    // TODO: reset various counters etc.
    // NOTE(review): bufferedMessages is never cleared. After the threshold is
    // reached, every later invoke() triggers another roll, and snapshotState
    // re-checkpoints messages that were already rolled. Clear the buffer here
    // once createPendingFiles actually persists its contents.
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment