Created
July 27, 2017 14:26
-
-
Save maasg/9d51a2a42fc831e385cf744b84e80479 to your computer and use it in GitHub Desktop.
Build paths for different events and assign a global ID
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "metadata": { | |
| "id": "295a615e-43da-4ebb-9aac-776a28141f84", | |
| "name": "GlobalUniqueState.snb", | |
| "user_save_timestamp": "2017-07-27T16:18:09.707Z", | |
| "auto_save_timestamp": "1970-01-01T01:00:00.000Z", | |
| "language_info": { | |
| "name": "scala", | |
| "file_extension": "scala", | |
| "codemirror_mode": "text/x-scala" | |
| }, | |
| "trusted": true, | |
| "sparkNotebook": null, | |
| "customLocalRepo": null, | |
| "customRepos": null, | |
| "customDeps": null, | |
| "customImports": null, | |
| "customArgs": null, | |
| "customSparkConf": null, | |
| "customVars": null | |
| }, | |
| "cells": [ | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "798981D2E2BA4F8B9EE088E3BE243145" | |
| }, | |
| "cell_type": "code", | |
| "source": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "import org.apache.spark.streaming._\nimport org.apache.spark.streaming.dstream.QueueInputDStream\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 1, | |
| "time": "Took: 1.085s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "6D72421639A845418934F7C62011627A" | |
| }, | |
| "cell_type": "code", | |
| "source": "val data = Seq( \"1|a,1|b,3|c\", \"2|d,2|e,2|f\", \"3|g,3|h,3|i,4|j\", \"5|k\", \"4|f,1|g\", \"6|h\")\nval inputRDDs = data.map(str => str.split(\",\")).map(arr => sparkContext.parallelize(arr))\n/** expected output: one line per chain, each entry written as id followed by payload\n(ZZ, XX and YY stand for whatever ids genGlobalId generates)\nZZ => 1a, 1b, 2d, 2e, 2f, 3g, 3h, 3i, 4f\nXX => 3c, 4j, 5k, 6h\nYY => 1g\nNote: 2 is similar to the latest id of both ZZ (1) and XX (3), and 4 to the latest id of both ZZ (3) and XX (5);\nthis output assumes such ties go to the chain whose latest id is lower.\n**/\n", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "data: Seq[String] = List(1|a,1|b,3|c, 2|d,2|e,2|f, 3|g,3|h,3|i,4|j, 5|k, 4|f,1|g, 6|h)\ninputRDDs: Seq[org.apache.spark.rdd.RDD[String]] = List(ParallelCollectionRDD[0] at parallelize at <console>:72, ParallelCollectionRDD[1] at parallelize at <console>:72, ParallelCollectionRDD[2] at parallelize at <console>:72, ParallelCollectionRDD[3] at parallelize at <console>:72, ParallelCollectionRDD[4] at parallelize at <console>:72, ParallelCollectionRDD[5] at parallelize at <console>:72)\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 2, | |
| "time": "Took: 1.242s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "4FDAFE2A3E7B458086D5C235B3226697" | |
| }, | |
| "cell_type": "code", | |
| "source": "/** A single input record: an integer id and its string payload. */\ncase class Event(\n  id: Int,\n  payload: String\n)", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "defined class Event\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 3, | |
| "time": "Took: 0.894s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "3DBA2B7EB1124740A16DADA6755B69B6" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Streaming context built on the notebook's SparkContext, with a 5-second batch interval.\n@transient val ssc = new StreamingContext(sparkContext, batchDuration = Seconds(5))", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@5db87859\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 4, | |
| "time": "Took: 0.733s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "59152C091B524251A6741AECC244CCA5" | |
| }, | |
| "cell_type": "code", | |
| "source": "// The pre-built test RDDs, in order; queueStream below takes them from this queue.\nval queue = scala.collection.mutable.Queue[RDD[String]](inputRDDs: _*)", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "queue: scala.collection.mutable.Queue[org.apache.spark.rdd.RDD[String]] = Queue(ParallelCollectionRDD[0] at parallelize at <console>:72, ParallelCollectionRDD[1] at parallelize at <console>:72, ParallelCollectionRDD[2] at parallelize at <console>:72, ParallelCollectionRDD[3] at parallelize at <console>:72, ParallelCollectionRDD[4] at parallelize at <console>:72, ParallelCollectionRDD[5] at parallelize at <console>:72)\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 5, | |
| "time": "Took: 0.711s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "C51FDEBB5F9043D8A0D045F2B0BE2A9E" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Test-only source: each batch dequeues exactly one RDD from `queue` (oneAtATime = true).\n// Replace with a real stream source for actual use.\n@transient val queueDStream: org.apache.spark.streaming.dstream.InputDStream[String] = ssc.queueStream(queue, oneAtATime = true)", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "queueDStream: org.apache.spark.streaming.dstream.InputDStream[String] = org.apache.spark.streaming.dstream.QueueInputDStream@6b7cdd97\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 6, | |
| "time": "Took: 0.716s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "147F4ABC9B834C768197D79C303F26C0" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Similarity function: two event ids are similar when they differ by exactly one. Replace with something appropriate.\n// Function values are used instead of defs because they are cleaner to serialize into Spark closures.\nval isSimilar: Int => Int => Boolean = event1 => event2 => math.abs(event2 - event1) == 1\n\n// Global id generator. Ids must be unique: two chains that drew the same id would be merged under one key\n// in `states`. A random UUID makes collisions negligible, unlike Random.nextInt(10000) (only 10000 values).\nval genGlobalId: () => String = () => \"gen-\" + java.util.UUID.randomUUID().toString", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "isSimilar: Int => (Int => Boolean) = <function1>\ngenGlobalId: () => String = <function0>\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 7, | |
| "time": "Took: 0.855s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "BD368F7AC4C645568923DB4AD92372F9" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Initial Event stream: every raw entry has the form id|payload.\n@transient val eventStream = queueDStream.map { line =>\n  line.split(\"\\\\|\") match {\n    case Array(id, payload) => Event(id.toInt, payload)\n  }\n}", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "eventStream: org.apache.spark.streaming.dstream.DStream[Event] = org.apache.spark.streaming.dstream.MappedDStream@56ab87b6\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 8, | |
| "time": "Took: 0.868s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "3C96904AD563474CA8B4185F6B5B8A49" | |
| }, | |
| "cell_type": "code", | |
| "source": "// State accumulated over all batches: (globalId, (eventId, batchTimeMillis)), one entry per id assigned\n// in each batch. Starts empty and is replaced every batch by the taggedEvents transform.\n@transient var states: RDD[(String, (Int, Long))] = sparkContext.emptyRDD[(String, (Int, Long))]", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "states: org.apache.spark.rdd.RDD[(String, (Int, Long))] = EmptyRDD[7] at emptyRDD at <console>:71\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 9, | |
| "time": "Took: 0.690s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "5BF0201296CF47538F052BED61EF0509" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Only the (globalId, (eventId, batchTimeMillis)) entries created by the most recent batch; shown in currentTrans.\n@transient var currentState: RDD[(String, (Int, Long))] = sparkContext.emptyRDD[(String, (Int, Long))]", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "currentState: org.apache.spark.rdd.RDD[(String, (Int, Long))] = EmptyRDD[8] at emptyRDD at <console>:71\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 10, | |
| "time": "Took: 0.675s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "5EC3D2A2AC2D45C2831C88AA68992DA7" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Key each event by its id, then gather all events sharing an id within the same batch.\n@transient val eventsById = eventStream.map(e => e.id -> e)\n@transient val groupedEvents = eventsById.groupByKey()", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "eventsById: org.apache.spark.streaming.dstream.DStream[(Int, Event)] = org.apache.spark.streaming.dstream.MappedDStream@5ab031f5\ngroupedEvents: org.apache.spark.streaming.dstream.DStream[(Int, Iterable[Event])] = org.apache.spark.streaming.dstream.ShuffledDStream@2367d0e2\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 11, | |
| "time": "Took: 0.751s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "95C4DDBD66A6419C8E6C7EFE7334B056" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Tag every incoming event with a global id: an event joins the chain whose latest id it is similar to,\n// otherwise it starts a new chain. Also appends this batch's assignments to `states`.\n@transient val taggedEvents = groupedEvents.transform{ (events, currentTime) =>\n  // Latest (eventId, batchTime) per global id, i.e. the current end of each chain.\n  val currentTransitions = states.reduceByKey{case (event1, event2) => Seq(event1, event2).maxBy{case (id, ts) => ts}}\n  val currentMappings = currentTransitions.map{case (globalId, (currentId, maxTs)) => (currentId, globalId)}\n\n  val newEventIds = events.keys // ids of the incoming (grouped) events\n  // An id can be similar to several chain ends (e.g. 2 is similar to both 1 and 3); collectAsMap would keep an\n  // arbitrary one. Pick deterministically instead: lowest current id first, then lowest global id.\n  val similarityJoinMap = newEventIds.cartesian(currentMappings)\n    .collect{case (eventId, (currentId, globalId)) if isSimilar(currentId)(eventId) => (eventId, (currentId, globalId))}\n    .collect()\n    .groupBy{case (eventId, _) => eventId}\n    .map{case (eventId, candidates) => (eventId, candidates.map{case (_, chainEnd) => chainEnd}.min._2)}\n\n  // Assign keys on the driver. genGlobalId is non-deterministic, so calling it inside an RDD transformation\n  // (even a cached one) can produce different ids whenever partitions are recomputed (e.g. after cache\n  // eviction or executor loss); emitted tags and recorded states would then disagree. A parallelized local\n  // collection always recomputes to the same values.\n  val assignedKeys = newEventIds.collect().map(id => (id, similarityJoinMap.getOrElse(id, genGlobalId())))\n  val newGlobalKeys = events.sparkContext.parallelize(assignedKeys)\n\n  val newTaggedEvents = events.join(newGlobalKeys).flatMap{case (eventId, (eventGroup, globalKey)) =>\n    eventGroup.map(event => (event.id, event.payload, globalKey))\n  }\n  val newStates = newGlobalKeys.map{case (eventId, globalKey) => (globalKey, (eventId, currentTime.milliseconds))}\n  currentState = newStates\n  states.unpersist(false)\n  // NOTE(review): every union keeps all previous partitions, so partition count and lineage grow with each\n  // batch; consider periodic checkpointing for long-running jobs.\n  states = newStates.union(states)\n  states.cache()\n  newTaggedEvents\n}", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "taggedEvents: org.apache.spark.streaming.dstream.DStream[(Int, String, String)] = org.apache.spark.streaming.dstream.TransformedDStream@6a596725\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 12, | |
| "time": "Took: 0.971s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "EE22776EA26647BFA2DF78B80B444A10" | |
| }, | |
| "cell_type": "code", | |
| "source": "// List widget: one line per batch with that batch's raw parsed events (appended by eventStream.foreachRDD).\n// NOTE(review): the 20 passed to ul presumably limits how many items are shown - confirm in the Spark Notebook widget docs.\n@transient val rawEventBox = ul(20)\nrawEventBox", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "rawEventBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres15: notebook.front.widgets.HtmlList = <HtmlList widget>\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anon66a44b169710c8728ef4ba67ac6a09c7&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 14, | |
| "time": "Took: 0.863s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "3381C07E0EAE430E90362A9733923446" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Show each batch's parsed events as a single comma-separated line in rawEventBox.\neventStream.foreachRDD { batch =>\n  val rendered = batch.collect().mkString(\", \")\n  rawEventBox.append(rendered)\n}", | |
| "outputs": [ | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 15, | |
| "time": "Took: 0.785s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "FE764B4A05EC481C8EA40A88ED6D473B" | |
| }, | |
| "cell_type": "code", | |
| "source": "// List widget: per batch, the state entries created by that batch (appended from currentState by taggedEvents.foreachRDD).\n// NOTE(review): ul(20) presumably limits how many items are shown - confirm.\n@transient val currentTrans = ul(20)\ncurrentTrans", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "currentTrans: notebook.front.widgets.HtmlList = <HtmlList widget>\nres19: notebook.front.widgets.HtmlList = <HtmlList widget>\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anonfc521a3a25c9a0a5958acd946894c1a9&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 16, | |
| "time": "Took: 0.725s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "80A8C316BEBB440E899157A51627AE1F" | |
| }, | |
| "cell_type": "code", | |
| "source": "// List widget: tagged events per batch, rendered as id|payload: globalId and preceded by a \"---\" marker line.\n// NOTE(review): ul(20) presumably limits how many items are shown - confirm.\n@transient val eventBox = ul(20)\neventBox", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "eventBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres21: notebook.front.widgets.HtmlList = <HtmlList widget>\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anona38d37904c4f09c626ff390359d2c8af&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 17, | |
| "time": "Took: 0.650s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "6056967780B14BFC8BE5EBC3635362E5" | |
| }, | |
| "cell_type": "code", | |
| "source": "// List widget: per batch, one line per global id showing its chain of event ids, oldest first.\n// NOTE(review): ul(20) presumably limits how many items are shown - confirm.\n@transient val transitionChainBox = ul(20)\ntransitionChainBox", | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "transitionChainBox: notebook.front.widgets.HtmlList = <HtmlList widget>\nres23: notebook.front.widgets.HtmlList = <HtmlList widget>\n" | |
| }, | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "<ul data-bind=\"foreach: value\"><li data-bind=\"html: $data\"></li><script data-this=\"{&quot;valueId&quot;:&quot;anon1022644358033f118f633706870a441a&quot;}\" type=\"text/x-scoped-javascript\">/*<![CDATA[*/\nreq(\n['observable', 'knockout'],\nfunction (O, ko) {\n ko.applyBindings({\n value: O.makeObservable(valueId)\n },\n this\n );\n});\n /*]]>*/</script></ul>" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 18, | |
| "time": "Took: 0.667s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "48D1F71E85F04A63B0F9026622D7E78E" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Per batch: log the tagged events, every chain of ids per global id (oldest first), and the state entries added by this batch.\ntaggedEvents.foreachRDD { batch =>\n  eventBox.append(\"---\")\n  val tagged = for ((id, payload, globalKey) <- batch.collect()) yield s\"$id|$payload: $globalKey\"\n  eventBox.append(tagged.mkString(\",\"))\n\n  val chainById = states.groupByKey().mapValues { entries =>\n    entries.toList.sortBy(_._2).map(_._1).mkString(\"<-\")\n  }\n  transitionChainBox.append(\"---\")\n  for ((globalId, chain) <- chainById.collect()) transitionChainBox.append(s\"$globalId: $chain\")\n\n  currentTrans.append(currentState.collect().map(_.toString).mkString(\",\"))\n}", | |
| "outputs": [ | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 19, | |
| "time": "Took: 0.924s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "C19D7885F7B2437EB9B89D69462D35F9" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Start the streaming computation; the output operations registered above then run on every batch.\nssc.start()", | |
| "outputs": [ | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 20, | |
| "time": "Took: 0.627s, at 2017-07-27 16:16" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": false, | |
| "id": "054D3411347247539EBC9AB16488F987" | |
| }, | |
| "cell_type": "code", | |
| "source": "// Stop the streaming computation while leaving the notebook's SparkContext running.\nssc.stop(stopSparkContext = false)", | |
| "outputs": [ | |
| { | |
| "metadata": {}, | |
| "data": { | |
| "text/html": "" | |
| }, | |
| "output_type": "execute_result", | |
| "execution_count": 21, | |
| "time": "Took: 0.766s, at 2017-07-27 16:17" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "input_collapsed": false, | |
| "collapsed": true, | |
| "id": "0342E90E4BE949748B7D837F71336974" | |
| }, | |
| "cell_type": "code", | |
| "source": "", | |
| "outputs": [] | |
| } | |
| ], | |
| "nbformat": 4 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment