{
"cells": [
{
"cell_type": "code",
"execution_count": 201,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import riak\n",
"import datetime, time, random"
]
},
{
"cell_type": "code",
"execution_count": 278,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"127.0.0.1:8087\n"
]
}
],
"source": [
| "host='127.0.0.1'\n", | |
| "pb_port = '8087'\n", | |
| "hostAndPort = \":\".join([host, pb_port])\n", | |
| "\n", | |
| "client = riak.RiakClient(host=host, pb_port=pb_port)\n" | |
]
},
{
"cell_type": "code",
"execution_count": 203,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"table_name = 'ingest9999'\n",
"table = client.table(table_name)"
]
},
{
"cell_type": "code",
"execution_count": 204,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CREATE TABLE ingest9999 (\n",
"site varchar not null,\n",
"species varchar not null,\n",
"measurementDate timestamp not null,\n",
"value double, \n",
"PRIMARY KEY ((site, species, quantum(measurementDate, 24, m)),\n",
" site, species, measurementDate))\n",
"\n"
]
}
],
"source": [
| "create_sql = \"\"\"CREATE TABLE %(table_name)s (\n", | |
| "site varchar not null,\n", | |
| "species varchar not null,\n", | |
| "measurementDate timestamp not null,\n", | |
| "value double, \n", | |
| "PRIMARY KEY ((site, species, quantum(measurementDate, 24, m)),\n", | |
| " site, species, measurementDate))\n", | |
| "\"\"\" % ({'table_name': table_name})\n", | |
| "\n", | |
| "print create_sql\n", | |
| "\n", | |
| "result = table.query(create_sql)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 205, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "['site', 'varchar', False, 1L, 1L]\n", | |
| "['species', 'varchar', False, 2L, 2L]\n", | |
| "['measurementDate', 'timestamp', False, 3L, 3L]\n", | |
| "['value', 'double', True, None, None]\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "schema = table.describe().rows\n", | |
| "\n", | |
| "for r in schema:\n", | |
| " print r" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 206, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "['AA', 'FF', 1458161129, 1.6720172646914264]\n", | |
| "['AA', 'FF', 1458161130, 81.83513205882498]\n", | |
| "['AA', 'FF', 1458161131, 75.80167120967674]\n", | |
| "['AA', 'FF', 1458161132, 3.144925271372525]\n", | |
| "['AA', 'FF', 1458161133, 59.62361060515761]\n", | |
| "['AA', 'FF', 1458161134, 40.41685371165855]\n", | |
| "['AA', 'FF', 1458161135, 95.61059941300488]\n", | |
| "['AA', 'FF', 1458161136, 39.98067632522474]\n", | |
| "['AA', 'FF', 1458161137, 96.73061593045676]\n", | |
| "['AA', 'FF', 1458161138, 11.234444541198297]\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "site = 'AA'\n", | |
| "species = 'FF'\n", | |
| "\n", | |
| "start_date = int(time.time())\n", | |
| "\n", | |
| "events = []\n", | |
| "for i in range(10):\n", | |
| " measurementDate = start_date + i\n", | |
| " value = random.uniform(-20, 110)\n", | |
| " events.append([site, species, measurementDate, value])\n", | |
| "\n", | |
| "end_date = measurementDate \n", | |
| " \n", | |
| "#client.ts_put instead? \n", | |
| "\n", | |
| "result = table.new(events).store()\n", | |
| "\n", | |
| "for e in events:\n", | |
| " print e\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 207, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "SELECT *\n", | |
| "FROM ingest9999\n", | |
| "WHERE measurementDate > 1458161129\n", | |
| "AND measurementDate < 1458161138\n", | |
| "AND site = 'AA'\n", | |
| "AND species = 'FF'\n", | |
| "\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 130000), 81.83513205882498]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 131000), 75.80167120967674]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 132000), 3.144925271372525]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 133000), 59.62361060515761]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 134000), 40.41685371165855]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 135000), 95.61059941300488]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 136000), 39.98067632522474]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 137000), 96.73061593045676]\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "select_sql = \"\"\"SELECT *\n", | |
| "FROM %(table_name)s\n", | |
| "WHERE measurementDate > %(start_date)s\n", | |
| "AND measurementDate < %(end_date)s\n", | |
| "AND site = '%(site)s'\n", | |
| "AND species = '%(species)s'\n", | |
| "\"\"\" % ({'table_name': table_name, 'start_date': start_date, 'end_date': end_date, 'site': site, 'species': species})\n", | |
| "\n", | |
| "print select_sql\n", | |
| "\n", | |
| "result = table.query(select_sql)\n", | |
| "\n", | |
| "for r in result.rows:\n", | |
| " print r\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 208, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 129000)]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 130000)]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 131000)]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 132000)]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 133000)]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 134000)]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 135000)]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 136000)]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 137000)]\n", | |
| "['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 138000)]\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "stream = table.stream_keys()\n", | |
| "\n", | |
| "for keys in stream:\n", | |
| " for k in sorted(keys):\n", | |
| " print(k)\n", | |
| " #table.delete(t, k)\n", | |
| "\n", | |
| "stream.close()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 313, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "spark.app.name=pyspark-shell\n", | |
| "spark.driver.extraClassPath=file:///tmp/spark-riak-connector/spark-riak-connector-1.3.0-beta1-uber.jar\n", | |
| "spark.master=local[*]\n", | |
| "spark.submit.deployMode=client\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "import findspark\n", | |
| "findspark.init()\n", | |
| "\n", | |
| "import pyspark\n", | |
| "\n", | |
| "sc.stop()\n", | |
| "\n", | |
| "#wget https://bintray.com/artifact/download/basho/data-platform/com/basho/riak/spark-riak-connector/1.3.0-beta1/spark-riak-connector-1.3.0-beta1-uber.jar\n", | |
| "\n", | |
| "spark_connector_path = 'file:///tmp/spark-riak-connector/spark-riak-connector-1.3.0-beta1-uber.jar'\n", | |
| "\n", | |
| "conf = pyspark.SparkConf()\n", | |
| "#http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-td22685.html\n", | |
| "conf.set('spark.driver.extraClassPath', spark_connector_path)\n", | |
| "#conf.set('spark.jars', spark_connector_path)\n", | |
| "\n", | |
| "print conf.toDebugString()\n", | |
| "\n", | |
| "sc = pyspark.SparkContext(conf=conf)\n", | |
| "\n", | |
| "#http://stackoverflow.com/questions/28080419/how-to-get-sparkcontext-from-javasparkcontext-in-pyspark\n", | |
| "sc._jsc.sc().addJar(spark_connector_path)\n", | |
| "\n", | |
| "\n", | |
| "#print sc._jsc.sc()\n", | |
| "\n", | |
| "sqlContext = pyspark.SQLContext(sc)\n", | |
| "\n", | |
| "#https://github.com/TargetHolding/pyspark-cassandra/issues/2\n", | |
| "#sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(\"org.apache.spark.sql.riak\").newInstance()\n", | |
| "\n", | |
| "\n", | |
| "\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 291, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "[[0], [2], [3], [4], [6]]" | |
| ] | |
| }, | |
| "execution_count": 291, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 314, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "ename": "Py4JJavaError", | |
| "evalue": "An error occurred while calling o1241.load.\n: java.lang.ClassNotFoundException: Failed to load class for data source: org.apache.spark.sql.riak.\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)\n\tat sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:497)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:207)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.ClassNotFoundException: org.apache.spark.sql.riak.DefaultSource\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:381)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60)\n\tat scala.util.Try$.apply(Try.scala:161)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60)\n\tat scala.util.Try.orElse(Try.scala:82)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:60)\n\t... 13 more\n", | |
| "output_type": "error", | |
| "traceback": [ | |
| "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", | |
| "\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)", | |
| "\u001b[0;32m<ipython-input-314-448212ed26d7>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 2\u001b[0;31m df = sqlContext.read .format(\"org.apache.spark.sql.riak\") .option(\"spark.riak.connection.host\", hostAndPort) .option(\"spark.riakts.bindings.timestamp\", \"useLong\") .load(\"ingest\") .filter(\"\"\"measurementDate >= 1438405200000\n\u001b[0m\u001b[1;32m 3\u001b[0m \u001b[0mAND\u001b[0m \u001b[0mmeasurementDate\u001b[0m \u001b[0;34m<=\u001b[0m \u001b[0;36m1444939200000\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0mAND\u001b[0m \u001b[0msite\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m'ZV2'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0mAND\u001b[0m \u001b[0mspecies\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m'PM10'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
| "\u001b[0;32m/usr/local/opt/apache-spark/libexec/python/pyspark/sql/readwriter.pyc\u001b[0m in \u001b[0;36mload\u001b[0;34m(self, path, format, schema, **options)\u001b[0m\n\u001b[1;32m 119\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0moptions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m**\u001b[0m\u001b[0moptions\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 120\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mpath\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 121\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mload\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 122\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 123\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mload\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
| "\u001b[0;32m/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 536\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 537\u001b[0m return_value = get_return_value(answer, self.gateway_client,\n\u001b[0;32m--> 538\u001b[0;31m self.target_id, self.name)\n\u001b[0m\u001b[1;32m 539\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 540\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
| "\u001b[0;32m/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.pyc\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 34\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 35\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 36\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 37\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 38\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", | |
| "\u001b[0;32m/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 298\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 299\u001b[0m \u001b[0;34m'An error occurred while calling {0}{1}{2}.\\n'\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 300\u001b[0;31m format(target_id, '.', name), value)\n\u001b[0m\u001b[1;32m 301\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 302\u001b[0m raise Py4JError(\n", | |
| "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o1241.load.\n: java.lang.ClassNotFoundException: Failed to load class for data source: org.apache.spark.sql.riak.\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)\n\tat sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:497)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:207)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.ClassNotFoundException: org.apache.spark.sql.riak.DefaultSource\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:381)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60)\n\tat scala.util.Try$.apply(Try.scala:161)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60)\n\tat scala.util.Try.orElse(Try.scala:82)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:60)\n\t... 13 more\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "\n", | |
| "df = sqlContext.read \\\n", | |
| " .format(\"org.apache.spark.sql.riak\") \\\n", | |
| " .option(\"spark.riak.connection.host\", hostAndPort) \\\n", | |
| " .option(\"spark.riakts.bindings.timestamp\", \"useLong\") \\\n", | |
| " .load(\"ingest\") \\\n", | |
| " .filter(\"\"\"measurementDate >= 1438405200000\n", | |
| " AND measurementDate <= 1444939200000\n", | |
| " AND site = 'ZV2'\n", | |
| " AND species = 'PM10'\n", | |
| " \"\"\")\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 2", | |
| "language": "python", | |
| "name": "python2" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 2 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython2", | |
| "version": "2.7.10" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 0 | |
| } |