{
"cells": [
{
"cell_type": "code",
"execution_count": 201,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"import riak\n",
"import datetime, time, random"
]
},
{
"cell_type": "code",
"execution_count": 278,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"127.0.0.1:8087\n"
]
}
],
"source": [
"host='127.0.0.1'\n",
"pb_port = '8087'\n",
"hostAndPort = \":\".join([host, pb_port])\n",
"\n",
"client = riak.RiakClient(host=host, pb_port=pb_port)\n"
]
},
{
"cell_type": "code",
"execution_count": 203,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"table_name = 'ingest9999'\n",
"table = client.table(table_name)"
]
},
{
"cell_type": "code",
"execution_count": 204,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CREATE TABLE ingest9999 (\n",
"site varchar not null,\n",
"species varchar not null,\n",
"measurementDate timestamp not null,\n",
"value double, \n",
"PRIMARY KEY ((site, species, quantum(measurementDate, 24, m)),\n",
" site, species, measurementDate))\n",
"\n"
]
}
],
"source": [
"create_sql = \"\"\"CREATE TABLE %(table_name)s (\n",
"site varchar not null,\n",
"species varchar not null,\n",
"measurementDate timestamp not null,\n",
"value double, \n",
"PRIMARY KEY ((site, species, quantum(measurementDate, 24, m)),\n",
" site, species, measurementDate))\n",
"\"\"\" % ({'table_name': table_name})\n",
"\n",
"print create_sql\n",
"\n",
"result = table.query(create_sql)"
]
},
{
"cell_type": "code",
"execution_count": 205,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['site', 'varchar', False, 1L, 1L]\n",
"['species', 'varchar', False, 2L, 2L]\n",
"['measurementDate', 'timestamp', False, 3L, 3L]\n",
"['value', 'double', True, None, None]\n"
]
}
],
"source": [
"schema = table.describe().rows\n",
"\n",
"for r in schema:\n",
" print r"
]
},
{
"cell_type": "code",
"execution_count": 206,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['AA', 'FF', 1458161129, 1.6720172646914264]\n",
"['AA', 'FF', 1458161130, 81.83513205882498]\n",
"['AA', 'FF', 1458161131, 75.80167120967674]\n",
"['AA', 'FF', 1458161132, 3.144925271372525]\n",
"['AA', 'FF', 1458161133, 59.62361060515761]\n",
"['AA', 'FF', 1458161134, 40.41685371165855]\n",
"['AA', 'FF', 1458161135, 95.61059941300488]\n",
"['AA', 'FF', 1458161136, 39.98067632522474]\n",
"['AA', 'FF', 1458161137, 96.73061593045676]\n",
"['AA', 'FF', 1458161138, 11.234444541198297]\n"
]
}
],
"source": [
"site = 'AA'\n",
"species = 'FF'\n",
"\n",
"start_date = int(time.time())\n",
"\n",
"events = []\n",
"for i in range(10):\n",
" measurementDate = start_date + i\n",
" value = random.uniform(-20, 110)\n",
" events.append([site, species, measurementDate, value])\n",
"\n",
"end_date = measurementDate \n",
" \n",
"#client.ts_put instead? \n",
"\n",
"result = table.new(events).store()\n",
"\n",
"for e in events:\n",
" print e\n"
]
},
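{
"cell_type": "markdown",
"metadata": {},
"source": [
"Since Riak TS `timestamp` columns hold milliseconds since the epoch, a corrected version of the insert above would scale the epochs by 1000. A minimal sketch (illustrative, not executed here; the `store()` call is left commented out):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Sketch: the same insert, using the millisecond epochs Riak TS expects.\n",
"start_ms = int(time.time() * 1000)\n",
"\n",
"events_ms = []\n",
"for i in range(10):\n",
"    # one reading per second, expressed in milliseconds\n",
"    events_ms.append([site, species, start_ms + i * 1000, random.uniform(-20, 110)])\n",
"\n",
"#result = table.new(events_ms).store()"
]
},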
{
"cell_type": "code",
"execution_count": 207,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"SELECT *\n",
"FROM ingest9999\n",
"WHERE measurementDate > 1458161129\n",
"AND measurementDate < 1458161138\n",
"AND site = 'AA'\n",
"AND species = 'FF'\n",
"\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 130000), 81.83513205882498]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 131000), 75.80167120967674]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 132000), 3.144925271372525]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 133000), 59.62361060515761]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 134000), 40.41685371165855]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 135000), 95.61059941300488]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 136000), 39.98067632522474]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 137000), 96.73061593045676]\n"
]
}
],
"source": [
"select_sql = \"\"\"SELECT *\n",
"FROM %(table_name)s\n",
"WHERE measurementDate > %(start_date)s\n",
"AND measurementDate < %(end_date)s\n",
"AND site = '%(site)s'\n",
"AND species = '%(species)s'\n",
"\"\"\" % ({'table_name': table_name, 'start_date': start_date, 'end_date': end_date, 'site': site, 'species': species})\n",
"\n",
"print select_sql\n",
"\n",
"result = table.query(select_sql)\n",
"\n",
"for r in result.rows:\n",
" print r\n"
]
},
{
"cell_type": "code",
"execution_count": 208,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 129000)]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 130000)]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 131000)]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 132000)]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 133000)]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 134000)]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 135000)]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 136000)]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 137000)]\n",
"['AA', 'FF', datetime.datetime(1970, 1, 17, 21, 2, 41, 138000)]\n"
]
}
],
"source": [
"stream = table.stream_keys()\n",
"\n",
"for keys in stream:\n",
" for k in sorted(keys):\n",
" print(k)\n",
" #table.delete(t, k)\n",
"\n",
"stream.close()"
]
},
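{
"cell_type": "markdown",
"metadata": {},
"source": [
"The commented-out delete above hints at clearing the table row by row. A hedged sketch of that, assuming the client exposes `ts_delete(table_name, key)` as in the Basho TS docs; the destructive call is left commented out so the cell is safe to run:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Sketch: delete every row by primary key, reusing the key stream above.\n",
"# Assumes client.ts_delete(table_name, key) per the Basho Riak TS docs.\n",
"stream = table.stream_keys()\n",
"for keys in stream:\n",
"    for k in keys:\n",
"        pass  #client.ts_delete(table_name, k)\n",
"stream.close()"
]
},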
{
"cell_type": "code",
"execution_count": 313,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"spark.app.name=pyspark-shell\n",
"spark.driver.extraClassPath=file:///tmp/spark-riak-connector/spark-riak-connector-1.3.0-beta1-uber.jar\n",
"spark.master=local[*]\n",
"spark.submit.deployMode=client\n"
]
}
],
"source": [
"import findspark\n",
"findspark.init()\n",
"\n",
"import pyspark\n",
"\n",
"sc.stop()\n",
"\n",
"#wget https://bintray.com/artifact/download/basho/data-platform/com/basho/riak/spark-riak-connector/1.3.0-beta1/spark-riak-connector-1.3.0-beta1-uber.jar\n",
"\n",
"spark_connector_path = 'file:///tmp/spark-riak-connector/spark-riak-connector-1.3.0-beta1-uber.jar'\n",
"\n",
"conf = pyspark.SparkConf()\n",
"#http://apache-spark-user-list.1001560.n3.nabble.com/How-to-add-jars-to-standalone-pyspark-program-td22685.html\n",
"conf.set('spark.driver.extraClassPath', spark_connector_path)\n",
"#conf.set('spark.jars', spark_connector_path)\n",
"\n",
"print conf.toDebugString()\n",
"\n",
"sc = pyspark.SparkContext(conf=conf)\n",
"\n",
"#http://stackoverflow.com/questions/28080419/how-to-get-sparkcontext-from-javasparkcontext-in-pyspark\n",
"sc._jsc.sc().addJar(spark_connector_path)\n",
"\n",
"\n",
"#print sc._jsc.sc()\n",
"\n",
"sqlContext = pyspark.SQLContext(sc)\n",
"\n",
"#https://github.com/TargetHolding/pyspark-cassandra/issues/2\n",
"#sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(\"org.apache.spark.sql.riak\").newInstance()\n",
"\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 291,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[[0], [2], [3], [4], [6]]"
]
},
"execution_count": 291,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()"
]
},
{
"cell_type": "code",
"execution_count": 314,
"metadata": {
"collapsed": false
},
"outputs": [
{
"ename": "Py4JJavaError",
"evalue": "An error occurred while calling o1241.load.\n: java.lang.ClassNotFoundException: Failed to load class for data source: org.apache.spark.sql.riak.\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)\n\tat sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:497)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:207)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.ClassNotFoundException: org.apache.spark.sql.riak.DefaultSource\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:381)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60)\n\tat scala.util.Try$.apply(Try.scala:161)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60)\n\tat scala.util.Try.orElse(Try.scala:82)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:60)\n\t... 13 more\n",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-314-448212ed26d7>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m 1\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 2\u001b[0;31m df = sqlContext.read .format(\"org.apache.spark.sql.riak\") .option(\"spark.riak.connection.host\", hostAndPort) .option(\"spark.riakts.bindings.timestamp\", \"useLong\") .load(\"ingest\") .filter(\"\"\"measurementDate >= 1438405200000\n\u001b[0m\u001b[1;32m 3\u001b[0m \u001b[0mAND\u001b[0m \u001b[0mmeasurementDate\u001b[0m \u001b[0;34m<=\u001b[0m \u001b[0;36m1444939200000\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0mAND\u001b[0m \u001b[0msite\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m'ZV2'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[0mAND\u001b[0m \u001b[0mspecies\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m'PM10'\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/local/opt/apache-spark/libexec/python/pyspark/sql/readwriter.pyc\u001b[0m in \u001b[0;36mload\u001b[0;34m(self, path, format, schema, **options)\u001b[0m\n\u001b[1;32m 119\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0moptions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m**\u001b[0m\u001b[0moptions\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 120\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mpath\u001b[0m \u001b[0;32mis\u001b[0m \u001b[0;32mnot\u001b[0m \u001b[0mNone\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 121\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mload\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 122\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 123\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mload\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 536\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 537\u001b[0m return_value = get_return_value(answer, self.gateway_client,\n\u001b[0;32m--> 538\u001b[0;31m self.target_id, self.name)\n\u001b[0m\u001b[1;32m 539\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 540\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.pyc\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 34\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 35\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 36\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 37\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 38\u001b[0m \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 298\u001b[0m raise Py4JJavaError(\n\u001b[1;32m 299\u001b[0m \u001b[0;34m'An error occurred while calling {0}{1}{2}.\\n'\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 300\u001b[0;31m format(target_id, '.', name), value)\n\u001b[0m\u001b[1;32m 301\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 302\u001b[0m raise Py4JError(\n",
"\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o1241.load.\n: java.lang.ClassNotFoundException: Failed to load class for data source: org.apache.spark.sql.riak.\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)\n\tat org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)\n\tat sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:497)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:207)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.ClassNotFoundException: org.apache.spark.sql.riak.DefaultSource\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:381)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4$$anonfun$apply$1.apply(ResolvedDataSource.scala:60)\n\tat scala.util.Try$.apply(Try.scala:161)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$$anonfun$4.apply(ResolvedDataSource.scala:60)\n\tat scala.util.Try.orElse(Try.scala:82)\n\tat org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:60)\n\t... 13 more\n"
]
}
],
"source": [
"\n",
"df = sqlContext.read \\\n",
" .format(\"org.apache.spark.sql.riak\") \\\n",
" .option(\"spark.riak.connection.host\", hostAndPort) \\\n",
" .option(\"spark.riakts.bindings.timestamp\", \"useLong\") \\\n",
" .load(\"ingest\") \\\n",
" .filter(\"\"\"measurementDate >= 1438405200000\n",
" AND measurementDate <= 1444939200000\n",
" AND site = 'ZV2'\n",
" AND species = 'PM10'\n",
" \"\"\")\n"
]
},
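{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `ClassNotFoundException` above suggests the connector jar never reached the driver JVM: by the time `conf.set('spark.driver.extraClassPath', ...)` is applied, pyspark's gateway JVM is already running, so the setting has no effect in this session. One common workaround (a sketch, assuming a plain Jupyter + findspark setup): set `PYSPARK_SUBMIT_ARGS` in a fresh kernel before the first `import pyspark`, then rebuild the context."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Sketch: put the connector jar on the classpath before the JVM starts.\n",
"# Run in a fresh kernel, before any SparkContext exists.\n",
"import os\n",
"import findspark\n",
"\n",
"jar = '/tmp/spark-riak-connector/spark-riak-connector-1.3.0-beta1-uber.jar'\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = (\n",
"    '--jars {0} --driver-class-path {0} pyspark-shell'.format(jar))\n",
"\n",
"findspark.init()\n",
"import pyspark\n",
"\n",
"#sc = pyspark.SparkContext()\n",
"#sqlContext = pyspark.SQLContext(sc)"
]
},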
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}