Last active
October 20, 2017 17:48
-
-
Save mdigan/3adf93ab07e1d04803ca to your computer and use it in GitHub Desktop.
Pandas and Spark DataFrames with Riak TS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "code", | |
| "execution_count": 236, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "[{\"site\":\"RY2\", \"species\":\"PM10\", \"measurementDate\":\"2006-12-23 23:00:00\", \"value\":\"35.5\"}]\r\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "# Given a file of timeseries data in JSON format\n", | |
| "# (for the purposes of this example, the file has only one row)\n", | |
| "\n", | |
| "!echo '[{\"site\":\"RY2\", \"species\":\"PM10\", \"measurementDate\":\"2006-12-23 23:00:00\", \"value\":\"35.5\"}]' > /tmp/data/ts.json\n", | |
| "\n", | |
| "!cat /tmp/data/ts.json" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 237, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/html": [ | |
| "<div>\n", | |
| "<table border=\"1\" class=\"dataframe\">\n", | |
| " <thead>\n", | |
| " <tr style=\"text-align: right;\">\n", | |
| " <th></th>\n", | |
| " <th>measurementDate</th>\n", | |
| " <th>site</th>\n", | |
| " <th>species</th>\n", | |
| " <th>value</th>\n", | |
| " </tr>\n", | |
| " </thead>\n", | |
| " <tbody>\n", | |
| " <tr>\n", | |
| " <th>0</th>\n", | |
| " <td>2006-12-23 23:00:00</td>\n", | |
| " <td>RY2</td>\n", | |
| " <td>PM10</td>\n", | |
| " <td>35.5</td>\n", | |
| " </tr>\n", | |
| " </tbody>\n", | |
| "</table>\n", | |
| "</div>" | |
| ], | |
| "text/plain": [ | |
| " measurementDate site species value\n", | |
| "0 2006-12-23 23:00:00 RY2 PM10 35.5" | |
| ] | |
| }, | |
| "execution_count": 237, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "# Create a Pandas Dataframe from the JSON data\n", | |
| "\n", | |
| "import pandas as pd\n", | |
| "\n", | |
| "pandasDF = pd.read_json('/tmp/data/ts.json', convert_dates=['measurementDate'])\n", | |
| "\n", | |
| "pandasDF" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 238, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/html": [ | |
| "<div>\n", | |
| "<table border=\"1\" class=\"dataframe\">\n", | |
| " <thead>\n", | |
| " <tr style=\"text-align: right;\">\n", | |
| " <th></th>\n", | |
| " <th>site</th>\n", | |
| " <th>species</th>\n", | |
| " <th>measurementDate</th>\n", | |
| " <th>value</th>\n", | |
| " </tr>\n", | |
| " </thead>\n", | |
| " <tbody>\n", | |
| " <tr>\n", | |
| " <th>0</th>\n", | |
| " <td>RY2</td>\n", | |
| " <td>PM10</td>\n", | |
| " <td>2006-12-23 23:00:00</td>\n", | |
| " <td>35.5</td>\n", | |
| " </tr>\n", | |
| " </tbody>\n", | |
| "</table>\n", | |
| "</div>" | |
| ], | |
| "text/plain": [ | |
| " site species measurementDate value\n", | |
| "0 RY2 PM10 2006-12-23 23:00:00 35.5" | |
| ] | |
| }, | |
| "execution_count": 238, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "# Re-order the columns of the dataframe\n", | |
| "# to match the order of the columns \n", | |
| "# in the Riak TS table\n", | |
| "\n", | |
| "# this is the order of the columns in the Riak TS table:\n", | |
| "# ['site', 'varchar', False, 1L, 1L]\n", | |
| "# ['species', 'varchar', False, 2L, 2L]\n", | |
| "# ['measurementDate', 'timestamp', False, 3L, 3L]\n", | |
| "# ['value', 'double', True, None, None]\n", | |
| "\n", | |
| "pandasDF = pandasDF[['site', 'species', 'measurementDate', 'value']]\n", | |
| "\n", | |
| "pandasDF" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 239, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "site object\n", | |
| "species object\n", | |
| "measurementDate datetime64[ns]\n", | |
| "value float64\n", | |
| "dtype: object" | |
| ] | |
| }, | |
| "execution_count": 239, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "# Show the Pandas dataframe datatypes\n", | |
| "\n", | |
| "pandasDF.dtypes" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 240, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "# Get Spark ready\n", | |
| "\n", | |
| "import findspark\n", | |
| "findspark.init()\n", | |
| "\n", | |
| "import pyspark\n", | |
| "\n", | |
| "sc = pyspark.SparkContext()\n", | |
| "sqlContext = pyspark.SQLContext(sc)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 241, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "+----+-------+-------------------+-----+\n", | |
| "|site|species| measurementDate|value|\n", | |
| "+----+-------+-------------------+-----+\n", | |
| "| RY2| PM10|1166914800000000000| 35.5|\n", | |
| "+----+-------+-------------------+-----+\n", | |
| "\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "# Create a Spark dataframe from the Pandas dataframe\n", | |
| "\n", | |
| "sparkDF = sqlContext.createDataFrame(pandasDF)\n", | |
| "\n", | |
| "sparkDF.show()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 242, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "[('site', 'string'),\n", | |
| " ('species', 'string'),\n", | |
| " ('measurementDate', 'bigint'),\n", | |
| " ('value', 'double')]" | |
| ] | |
| }, | |
| "execution_count": 242, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "# Show the Spark dataframe datatypes\n", | |
| "# Note: Spark is converting the \n", | |
| "# measurementDate column's datatype\n", | |
| "# from a datetime64\n", | |
| "# to a bigint\n", | |
| "\n", | |
| "sparkDF.dtypes" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 243, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "# Write the Spark dataframe to a Riak TS table\n", | |
| "\n", | |
| "host='127.0.0.1'\n", | |
| "pb_port = '8087'\n", | |
| "hostAndPort = ':'.join([host, pb_port])\n", | |
| "\n", | |
| "tableName = 'pyspark-1458254031'\n", | |
| "\n", | |
| "sparkDF.write\\\n", | |
| " .format('org.apache.spark.sql.riak')\\\n", | |
| " .option('spark.riak.connection.host', hostAndPort)\\\n", | |
| " .mode('Append')\\\n", | |
| " .save(tableName) " | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 244, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "+----+-------+-------------------+-----+\n", | |
| "|site|species| measurementDate|value|\n", | |
| "+----+-------+-------------------+-----+\n", | |
| "| RY2| PM10|1166914800000000000| 35.5|\n", | |
| "+----+-------+-------------------+-----+\n", | |
| "\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "# Read the same data from the Riak TS table into a new Spark dataframe \n", | |
| "\n", | |
| "startDate = 1166914800000000000\n", | |
| "endDate = startDate\n", | |
| "site = 'RY2'\n", | |
| "species = 'PM10'\n", | |
| "\n", | |
| "sparkDF2 = sqlContext.read\\\n", | |
| " .format('org.apache.spark.sql.riak')\\\n", | |
| " .option('spark.riak.connection.host', hostAndPort)\\\n", | |
| " .option('spark.riakts.bindings.timestamp', 'useLong')\\\n", | |
| " .load(tableName)\\\n", | |
| " .filter('''measurementDate >= %(startDate)s\n", | |
| " AND measurementDate <= %(endDate)s\n", | |
| " AND site = '%(site)s'\n", | |
| " AND species = '%(species)s'\n", | |
| " ''' % ({'startDate': startDate, 'endDate': endDate, 'site': site, 'species': species})) \n", | |
| " \n", | |
| "sparkDF2.show()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 245, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "# Stop Spark\n", | |
| "\n", | |
| "sc.stop()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 2", | |
| "language": "python", | |
| "name": "python2" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 2 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython2", | |
| "version": "2.7.10" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 0 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment