pandas and Spark DataFrames with Riak TS
{
"cells": [
{
"cell_type": "code",
"execution_count": 236,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[{\"site\":\"RY2\", \"species\":\"PM10\", \"measurementDate\":\"2006-12-23 23:00:00\", \"value\":\"35.5\"}]\r\n"
]
}
],
"source": [
"# Given a file of timeseries data in JSON format\n",
"# (for the purposes of this example, the file has only one row)\n",
"\n",
"!echo '[{\"site\":\"RY2\", \"species\":\"PM10\", \"measurementDate\":\"2006-12-23 23:00:00\", \"value\":\"35.5\"}]' > /tmp/data/ts.json\n",
"\n",
"!cat /tmp/data/ts.json"
]
},
{
"cell_type": "code",
"execution_count": 237,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>measurementDate</th>\n",
" <th>site</th>\n",
" <th>species</th>\n",
" <th>value</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>2006-12-23 23:00:00</td>\n",
" <td>RY2</td>\n",
" <td>PM10</td>\n",
" <td>35.5</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" measurementDate site species value\n",
"0 2006-12-23 23:00:00 RY2 PM10 35.5"
]
},
"execution_count": 237,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Create a Pandas Dataframe from the JSON data\n",
"\n",
"import pandas as pd\n",
"\n",
"pandasDF = pd.read_json('/tmp/data/ts.json', convert_dates=['measurementDate'])\n",
"\n",
"pandasDF"
]
},
{
"cell_type": "code",
"execution_count": 238,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>site</th>\n",
" <th>species</th>\n",
" <th>measurementDate</th>\n",
" <th>value</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>RY2</td>\n",
" <td>PM10</td>\n",
" <td>2006-12-23 23:00:00</td>\n",
" <td>35.5</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" site species measurementDate value\n",
"0 RY2 PM10 2006-12-23 23:00:00 35.5"
]
},
"execution_count": 238,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Re-order the columns of the dataframe\n",
"# to match the order of the columns \n",
"# in the Riak TS table\n",
"\n",
"# this is the order of the columns in the Riak TS table:\n",
"# ['site', 'varchar', False, 1L, 1L]\n",
"# ['species', 'varchar', False, 2L, 2L]\n",
"# ['measurementDate', 'timestamp', False, 3L, 3L]\n",
"# ['value', 'double', True, None, None]\n",
"\n",
"pandasDF = pandasDF[['site', 'species', 'measurementDate', 'value']]\n",
"\n",
"pandasDF"
]
},
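{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# For reference, a sketch of the DDL behind that column order.\n",
"# The 15-minute quantum and the key layout are assumptions, not\n",
"# read from the live table -- check DESCRIBE on your own cluster:\n",
"#\n",
"# CREATE TABLE pyspark-1458254031 (\n",
"#     site            VARCHAR   NOT NULL,\n",
"#     species         VARCHAR   NOT NULL,\n",
"#     measurementDate TIMESTAMP NOT NULL,\n",
"#     value           DOUBLE,\n",
"#     PRIMARY KEY (\n",
"#         (site, species, QUANTUM(measurementDate, 15, 'm')),\n",
"#         site, species, measurementDate\n",
"#     )\n",
"# )"
]
},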
{
"cell_type": "code",
"execution_count": 239,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"site object\n",
"species object\n",
"measurementDate datetime64[ns]\n",
"value float64\n",
"dtype: object"
]
},
"execution_count": 239,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Show the Pandas dataframe datatypes\n",
"\n",
"pandasDF.dtypes"
]
},
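{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# The Riak read/write cells below rely on Basho's spark-riak-connector,\n",
"# which has to be on the Spark classpath before the SparkContext is\n",
"# created. A minimal sketch of one way to do that from a notebook --\n",
"# the package coordinates are an assumption; match them to your\n",
"# Spark/Scala build:\n",
"\n",
"import os\n",
"\n",
"os.environ['PYSPARK_SUBMIT_ARGS'] = (\n",
"    '--packages com.basho.riak:spark-riak-connector_2.10:1.6.0 '\n",
"    'pyspark-shell')"
]
},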
{
"cell_type": "code",
"execution_count": 240,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Get Spark ready\n",
"\n",
"import findspark\n",
"findspark.init()\n",
"\n",
"import pyspark\n",
"\n",
"sc = pyspark.SparkContext()\n",
"sqlContext = pyspark.SQLContext(sc)"
]
},
{
"cell_type": "code",
"execution_count": 241,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+-------+-------------------+-----+\n",
"|site|species| measurementDate|value|\n",
"+----+-------+-------------------+-----+\n",
"| RY2| PM10|1166914800000000000| 35.5|\n",
"+----+-------+-------------------+-----+\n",
"\n"
]
}
],
"source": [
"# Create a Spark dataframe from the Pandas dataframe\n",
"\n",
"sparkDF = sqlContext.createDataFrame(pandasDF)\n",
"\n",
"sparkDF.show()"
]
},
{
"cell_type": "code",
"execution_count": 242,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[('site', 'string'),\n",
" ('species', 'string'),\n",
" ('measurementDate', 'bigint'),\n",
" ('value', 'double')]"
]
},
"execution_count": 242,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Show the Spark dataframe datatypes\n",
"# Note: Spark is converting the \n",
"# measurementDate column's datatype\n",
"# from a datetime64\n",
"# to a bigint\n",
"\n",
"sparkDF.dtypes"
]
},
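{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# A minimal sketch for recovering a real Spark timestamp column from\n",
"# the bigint, if one is needed: casting a double to timestamp\n",
"# interprets the value as seconds since the epoch, so divide the\n",
"# nanoseconds by 1e9 first ('measurementTs' is an illustrative name)\n",
"\n",
"from pyspark.sql.functions import col\n",
"\n",
"sparkDF.withColumn('measurementTs',\n",
"                   (col('measurementDate') / 1e9).cast('timestamp'))\\\n",
"       .show()"
]
},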
{
"cell_type": "code",
"execution_count": 243,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Write the Spark dataframe to a Riak TS table\n",
"\n",
"host='127.0.0.1'\n",
"pb_port = '8087'\n",
"hostAndPort = ':'.join([host, pb_port])\n",
"\n",
"tableName = 'pyspark-1458254031'\n",
"\n",
"sparkDF.write\\\n",
" .format('org.apache.spark.sql.riak')\\\n",
" .option('spark.riak.connection.host', hostAndPort)\\\n",
" .mode('Append')\\\n",
" .save(tableName) "
]
},
{
"cell_type": "code",
"execution_count": 244,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+----+-------+-------------------+-----+\n",
"|site|species| measurementDate|value|\n",
"+----+-------+-------------------+-----+\n",
"| RY2| PM10|1166914800000000000| 35.5|\n",
"+----+-------+-------------------+-----+\n",
"\n"
]
}
],
"source": [
"# Read the same data from the Riak TS table into a new Spark dataframe \n",
"\n",
"startDate = 1166914800000000000\n",
"endDate = startDate\n",
"site = 'RY2'\n",
"species = 'PM10'\n",
"\n",
"sparkDF2 = sqlContext.read\\\n",
" .format('org.apache.spark.sql.riak')\\\n",
" .option('spark.riak.connection.host', hostAndPort)\\\n",
" .option('spark.riakts.bindings.timestamp', 'useLong')\\\n",
" .load(tableName)\\\n",
" .filter('''measurementDate >= %(startDate)s\n",
" AND measurementDate <= %(endDate)s\n",
" AND site = '%(site)s'\n",
" AND species = '%(species)s'\n",
" ''' % ({'startDate': startDate, 'endDate': endDate, 'site': site, 'species': species})) \n",
" \n",
"sparkDF2.show()"
]
},
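{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Close the loop: pull the query result back into pandas for local\n",
"# analysis. Note that toPandas() collects the whole result to the\n",
"# driver, so it is only sensible for small result sets like this one\n",
"\n",
"pandasDF2 = sparkDF2.toPandas()\n",
"\n",
"pandasDF2"
]
},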
{
"cell_type": "code",
"execution_count": 245,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"# Stop Spark\n",
"\n",
"sc.stop()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 0
}