@mrocklin
Created September 27, 2016 16:12
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\" \n",
" width=\"30%\" \n",
" align=right\n",
" alt=\"Dask logo\">\n",
"\n",
"Custom Workloads\n",
"-------------------------\n",
"\n",
"*Because not all problems are dataframes*"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Client: scheduler=\"127.0.0.1:8786\" processes=1 cores=4>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Client, progress\n",
"c = Client('127.0.0.1:8786')\n",
"c"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from time import sleep\n",
"\n",
"def inc(x):\n",
" from random import random\n",
" sleep(random() / 5)\n",
" return x + 1\n",
"\n",
"def double(x):\n",
" from random import random\n",
" sleep(random() / 5)\n",
" return 2 * x\n",
" \n",
"def add(x, y):\n",
" from random import random\n",
" sleep(random() / 5)\n",
" return x + y "
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"2"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"inc(1)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Future: status: pending, key: inc-7882fdf8d50959500f060f6a0b99eed4>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"future = c.submit(inc, 1) # returns immediately with pending future\n",
"future"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Future: status: finished, type: int, key: inc-7882fdf8d50959500f060f6a0b99eed4>"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"future # scheduler and client talk constantly"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"2"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"future.result()"
]
},
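{
"cell_type": "markdown",
"metadata": {},
"source": [
"`c.submit` and `future.result()` follow the same pattern as `submit` and `result` in Python's standard `concurrent.futures` module. The local sketch below is for comparison only and does not touch the cluster. `slow_inc` and `slow_double` are hypothetical stand-ins for `inc` and `double`. One difference matters for what follows: `concurrent.futures` does not resolve a future passed as an argument, so chaining two steps locally means blocking on `.result()` in between."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from concurrent.futures import ThreadPoolExecutor\n",
"from random import random\n",
"from time import sleep\n",
"\n",
"def slow_inc(x):  # hypothetical local stand-in for inc\n",
"    sleep(random() / 5)\n",
"    return x + 1\n",
"\n",
"def slow_double(x):  # hypothetical local stand-in for double\n",
"    sleep(random() / 5)\n",
"    return 2 * x\n",
"\n",
"with ThreadPoolExecutor(max_workers=4) as pool:\n",
"    x = pool.submit(slow_inc, 1)  # returns a pending future immediately\n",
"    # a local future passed as an argument is not unwrapped,\n",
"    # so block on x.result() before submitting the next step\n",
"    y = pool.submit(slow_double, x.result())\n",
"\n",
"y.result()  # 4"
]
},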
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit many tasks\n",
"\n",
"We submit many tasks that depend on each other in a normal Python for loop"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 404 ms, sys: 4 ms, total: 408 ms\n",
"Wall time: 406 ms\n"
]
}
],
"source": [
"%%time\n",
"zs = []\n",
"for i in range(256):\n",
" x = c.submit(inc, i) # x = inc(i)\n",
" y = c.submit(double, x) # y = inc(x)\n",
" z = c.submit(add, x, y) # z = inc(y)\n",
" zs.append(z)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48]"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"c.gather(zs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Custom computation: Tree summation\n",
"\n",
"As an example of a non-trivial algorithm, consider the classic tree reduction. We accomplish this with a nested for loop and a bit of normal Python logic.\n",
"\n",
"```\n",
"finish total single output\n",
" ^ / \\\n",
" | c1 c2 neighbors merge\n",
" | / \\ / \\\n",
" | b1 b2 b3 b4 neighbors merge\n",
" ^ / \\ / \\ / \\ / \\\n",
"start a1 a2 a3 a4 a5 a6 a7 a8 many inputs\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Widget Javascript not detected. It may not be installed properly. Did you enable the widgetsnbextension? If not, then run \"jupyter nbextension enable --py --sys-prefix widgetsnbextension\"\n"
]
}
],
"source": [
"L = zs\n",
"while len(L) > 1:\n",
" new_L = []\n",
" for i in range(0, len(L), 2):\n",
" future = c.submit(add, L[i], L[i + 1]) # add neighbors\n",
" new_L.append(future)\n",
" L = new_L # swap old list for new\n",
" \n",
"progress(L)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[<Future: status: finished, type: int, key: add-dd368eac4b5e809d2f1ce18277e270a0>]"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"L"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[408]"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"c.gather(L)"
]
},
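{
"cell_type": "markdown",
"metadata": {},
"source": [
"The pairing logic itself does not depend on Dask. As a minimal local sketch, `tree_reduce` (a hypothetical helper, not part of Dask) applies the same neighbor-merging loop to plain Python values. This is a cheap way to check the reduction before submitting it to the cluster. It carries an unpaired last value up to the next level, so the input length need not be a power of two."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"import operator\n",
"\n",
"def tree_reduce(func, values):\n",
"    \"\"\"Hypothetical helper: combine values pairwise, level by level, until one remains.\"\"\"\n",
"    level = list(values)\n",
"    if not level:\n",
"        raise ValueError(\"tree_reduce() of empty sequence\")\n",
"    while len(level) > 1:\n",
"        next_level = []\n",
"        for i in range(0, len(level) - 1, 2):  # merge neighbors (0, 1), (2, 3), ...\n",
"            next_level.append(func(level[i], level[i + 1]))\n",
"        if len(level) % 2 == 1:  # odd length: promote the last value unchanged\n",
"            next_level.append(level[-1])\n",
"        level = next_level\n",
"    return level[0]\n",
"\n",
"zs_local = [3 * (i + 1) for i in range(16)]  # the values c.gather(zs) shows above\n",
"tree_reduce(operator.add, zs_local)  # 408"
]
},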
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Example with data streams\n",
"----------------------------\n",
"\n",
"The executor can map functions over lists or queues. This is nothing more than calling `submit` many times. We can chain maps on queues together to construct simple data processing pipelines.\n",
"\n",
"All of this logic happens on the client-side. None of this logic was hard-coded into the scheduler. This simple streaming system is a good example of the kind of system that becomes easy for users to build when given access to custom task scheduling."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from queue import Queue\n",
"from threading import Thread\n",
"\n",
"def multiplex(n, q, **kwargs):\n",
" \"\"\" Convert one queue into several equivalent Queues\n",
" \n",
" >>> q1, q2, q3 = multiplex(3, in_q)\n",
" \"\"\"\n",
" out_queues = [Queue(**kwargs) for i in range(n)]\n",
" def f():\n",
" while True:\n",
" x = q.get()\n",
" for out_q in out_queues:\n",
" out_q.put(x)\n",
" t = Thread(target=f)\n",
" t.daemon = True\n",
" t.start()\n",
" return out_queues "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```\n",
" ----inc---->\n",
" / \\ \n",
"in_q --> q \\_add__ results\n",
" \\ / \n",
" ---double-->/\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"in_q = Queue()\n",
"q = c.scatter(in_q)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"<Future: status: finished, type: int, key: 8068024105615db5ba4cbd263d42a6db>"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"in_q.put(1)\n",
"q.get()"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"q_1, q_2 = multiplex(2, q)\n",
"\n",
"inc_q = c.map(inc, q_1)\n",
"double_q = c.map(double, q_2)\n",
"\n",
"add_q = c.map(add, inc_q, double_q)\n",
"\n",
"out_q = c.gather(add_q)"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"31"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"in_q.put(10)\n",
"out_q.get()"
]
},
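{
"cell_type": "markdown",
"metadata": {},
"source": [
"One way to picture mapping over a queue is as a background thread that pulls items from one or more input queues and pushes one result per item onto an output queue. The sketch below mimics that shape locally with plain threads. `queue_map` and `fan_out` are hypothetical helpers, not Dask's implementation, and they call functions directly instead of submitting tasks to the cluster. Rebuilding the inc/double/add pipeline this way should give the same answer as the cluster version for the same input."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from queue import Queue\n",
"from threading import Thread\n",
"\n",
"def queue_map(func, *in_queues):\n",
"    \"\"\"Hypothetical local helper: apply func to items taken in lockstep from in_queues.\"\"\"\n",
"    out_q = Queue()\n",
"    def worker():\n",
"        while True:\n",
"            args = [q.get() for q in in_queues]  # one item from each input queue\n",
"            out_q.put(func(*args))\n",
"    Thread(target=worker, daemon=True).start()\n",
"    return out_q\n",
"\n",
"def fan_out(src, n):\n",
"    \"\"\"Hypothetical local helper: copy each item from src onto n new queues.\"\"\"\n",
"    outs = [Queue() for _ in range(n)]\n",
"    def worker():\n",
"        while True:\n",
"            x = src.get()\n",
"            for out in outs:\n",
"                out.put(x)\n",
"    Thread(target=worker, daemon=True).start()\n",
"    return outs\n",
"\n",
"src = Queue()\n",
"a, b = fan_out(src, 2)\n",
"results = queue_map(lambda x, y: x + y,            # add\n",
"                    queue_map(lambda x: x + 1, a),  # inc\n",
"                    queue_map(lambda x: 2 * x, b))  # double\n",
"src.put(10)\n",
"results.get(timeout=5)  # 31, matching the cluster pipeline"
]
},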
{
"cell_type": "code",
"execution_count": 18,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from random import random\n",
"\n",
"def feed(q):\n",
" for i in range(10000):\n",
" sleep(random())\n",
" q.put(i)\n",
" \n",
"t = Thread(target=feed, args=(q,))\n",
"t.daemon = True\n",
"t.start()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"out_q.qsize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [Root]",
"language": "python",
"name": "Python [Root]"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
}
},
"nbformat": 4,
"nbformat_minor": 0
}