Created
August 27, 2017 22:03
-
-
Save neoeahit/4c80af355d5166eab07fbae1079060fe to your computer and use it in GitHub Desktop.
Issues in files upload
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Sink that buffers incoming [[Message]]s in memory and, once the buffer reaches
 * `flushThreshold` entries, hands them over to `createPendingFiles()`.
 *
 * On every checkpoint the in-memory buffer is copied into operator list state and the
 * list of pending files is processed and cleared. On (re)initialisation the buffer is
 * refilled from that state.
 *
 * NOTE(review): `createPendingFiles`, `resetState`, `handlePreviousPendingFiles` and the
 * upload step in `snapshotState` are placeholders in this file; their described behaviour
 * comes from the original author's comments, not from code.
 */
class FileArchievar()
  extends RichSinkFunction[Message]
  with CheckpointedFunction
  with CheckpointedRestoring[ArrayBuffer[Message]] {

  /** Number of buffered messages at which `invoke` triggers `createPendingFiles()`. */
  private val flushThreshold = 10000

  /** Operator state handle; assigned in `initializeState`, null before that. */
  private var checkpointedMessages: ListState[Message] = _

  /** Messages received since the last call to `createPendingFiles()`. */
  private val bufferredMessages = ListBuffer[Message]()

  /** Names of files waiting to be handled at the next checkpoint. */
  private val pendingFiles = ListBuffer[String]()

  /**
   * Appends `message` to the in-memory buffer and triggers `createPendingFiles()` once
   * the buffer holds at least `flushThreshold` messages.
   *
   * @param message the record to buffer
   */
  @throws[IOException]
  def invoke(message: Message): Unit = {
    bufferredMessages += message
    if (bufferredMessages.size >= flushThreshold) {
      createPendingFiles()
    }
  }

  /**
   * Replaces the contents of the checkpointed list state with the current buffer, then
   * processes and clears the pending-file list while holding the `pendingFiles` monitor.
   *
   * @param context snapshot context supplied by the runtime (unused here)
   */
  @throws[IOException]
  def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Overwrite, not append: the state must mirror exactly what is buffered right now.
    checkpointedMessages.clear()
    bufferredMessages.foreach(checkpointedMessages.add)

    pendingFiles synchronized {
      if (pendingFiles.nonEmpty) {
        // Placeholder: move every pending file to S3 (the sink in this case) and
        // delete the local copies afterwards.
      }
      pendingFiles.clear()
    }
  }

  /**
   * Placeholder: if pending files are still present after a program crash, move them to
   * the sink (upload to S3) and then delete them from local storage.
   */
  def handlePreviousPendingFiles(): Unit = {
    // Intentionally empty in this file.
  }

  /**
   * Recovers leftovers from a previous run, obtains the operator list state and copies
   * any checkpointed messages back into the in-memory buffer.
   *
   * @param context initialization context used to obtain the operator state store
   */
  @throws[IOException]
  def initializeState(context: FunctionInitializationContext): Unit = {
    // Files may already exist in /tmp if the program crashed before they were uploaded
    // to S3; they have to be uploaded and the directory cleared before continuing.
    handlePreviousPendingFiles()

    val descriptor = new ListStateDescriptor[Message](
      STATE_UID,
      TypeInformation.of(new TypeHint[Message]() {})
    )
    checkpointedMessages = context.getOperatorStateStore.getListState(descriptor)

    // Explicit converters instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._
    // Guard against a null iterable; presumably only possible when nothing was ever
    // checkpointed — TODO confirm against the Flink version in use.
    Option(checkpointedMessages.get).foreach { restored =>
      bufferredMessages ++= restored.asScala
    }
  }

  /**
   * Appends messages restored through the `CheckpointedRestoring` path to the buffer.
   *
   * @param state previously checkpointed messages
   */
  @throws[Exception]
  def restoreState(state: ArrayBuffer[Message]): Unit = {
    bufferredMessages ++= state
  }

  /** Placeholder: push the buffered messages to a pending file, then reset local state. */
  private def createPendingFiles(): Unit = {
    resetState()
  }

  /** Placeholder: reset various counters etc. */
  private def resetState(): Unit = {
    // Intentionally empty in this file.
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment