Created
September 27, 2016 16:12
-
-
Save mrocklin/85bd3f35892d1ef0f63d5b5774ad8712 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\" \n", | |
| " width=\"30%\" \n", | |
| " align=right\n", | |
| " alt=\"Dask logo\">\n", | |
| "\n", | |
| "Custom Workloads\n", | |
| "-------------------------\n", | |
| "\n", | |
| "*Because not all problems are dataframes*" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 2, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "<Client: scheduler=\"127.0.0.1:8786\" processes=1 cores=4>" | |
| ] | |
| }, | |
| "execution_count": 2, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "from dask.distributed import Client, progress\n", | |
| "c = Client('127.0.0.1:8786')\n", | |
| "c" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 3, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "from time import sleep\n", | |
| "\n", | |
| "def inc(x):\n", | |
| " from random import random\n", | |
| " sleep(random() / 5)\n", | |
| " return x + 1\n", | |
| "\n", | |
| "def double(x):\n", | |
| " from random import random\n", | |
| " sleep(random() / 5)\n", | |
| " return 2 * x\n", | |
| " \n", | |
| "def add(x, y):\n", | |
| " from random import random\n", | |
| " sleep(random() / 5)\n", | |
| " return x + y " | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 4, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "2" | |
| ] | |
| }, | |
| "execution_count": 4, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "inc(1)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 5, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "<Future: status: pending, key: inc-7882fdf8d50959500f060f6a0b99eed4>" | |
| ] | |
| }, | |
| "execution_count": 5, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "future = c.submit(inc, 1) # returns immediately with pending future\n", | |
| "future" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 6, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "<Future: status: finished, type: int, key: inc-7882fdf8d50959500f060f6a0b99eed4>" | |
| ] | |
| }, | |
| "execution_count": 6, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "future # scheduler and client talk constantly" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 7, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "2" | |
| ] | |
| }, | |
| "execution_count": 7, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "future.result()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "### Submit many tasks\n", | |
| "\n", | |
| "We submit many tasks that depend on each other in a normal Python `for` loop." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 8, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "CPU times: user 404 ms, sys: 4 ms, total: 408 ms\n", | |
| "Wall time: 406 ms\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "%%time\n", | |
| "zs = []\n", | |
| "for i in range(256):\n", | |
| " x = c.submit(inc, i) # x = inc(i)\n", | |
| " y = c.submit(double, x) # y = double(x)\n", | |
| " z = c.submit(add, x, y) # z = add(x, y)\n", | |
| " zs.append(z)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 8, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "[3, 6, 9, 12, 15, 18, 21, 24, 27, 30, 33, 36, 39, 42, 45, 48]" | |
| ] | |
| }, | |
| "execution_count": 8, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "c.gather(zs)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "### Custom computation: Tree summation\n", | |
| "\n", | |
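| "As an example of a non-trivial algorithm, consider the classic tree reduction. We accomplish this with a `for` loop nested inside a `while` loop and a bit of normal Python logic. Because each pass pairs `L[i]` with `L[i + 1]`, this simple version assumes the number of inputs is a power of two, as it is here.\n", | |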
| "\n", | |
| "```\n", | |
| "finish total single output\n", | |
| " ^ / \\\n", | |
| " | c1 c2 neighbors merge\n", | |
| " | / \\ / \\\n", | |
| " | b1 b2 b3 b4 neighbors merge\n", | |
| " ^ / \\ / \\ / \\ / \\\n", | |
| "start a1 a2 a3 a4 a5 a6 a7 a8 many inputs\n", | |
| "```" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 8, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "Widget Javascript not detected. It may not be installed properly. Did you enable the widgetsnbextension? If not, then run \"jupyter nbextension enable --py --sys-prefix widgetsnbextension\"\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "L = zs\n", | |
| "while len(L) > 1:\n", | |
| " new_L = []\n", | |
| " for i in range(0, len(L), 2):\n", | |
| " future = c.submit(add, L[i], L[i + 1]) # add neighbors\n", | |
| " new_L.append(future)\n", | |
| " L = new_L # swap old list for new\n", | |
| " \n", | |
| "progress(L)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 11, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "[<Future: status: finished, type: int, key: add-dd368eac4b5e809d2f1ce18277e270a0>]" | |
| ] | |
| }, | |
| "execution_count": 11, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "L" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 12, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "[408]" | |
| ] | |
| }, | |
| "execution_count": 12, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "c.gather(L)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "Example with data streams\n", | |
| "----------------------------\n", | |
| "\n", | |
| "The client can map functions over lists or queues; this is nothing more than calling `submit` many times. We can chain maps on queues together to construct simple data-processing pipelines.\n", | |
| "\n", | |
| "All of this logic happens on the client side; none of it is hard-coded into the scheduler. This simple streaming system is a good example of the kind of system that becomes easy for users to build when given access to custom task scheduling." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 13, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "from queue import Queue\n", | |
| "from threading import Thread\n", | |
| "\n", | |
| "def multiplex(n, q, **kwargs):\n", | |
| " \"\"\" Convert one queue into several equivalent Queues\n", | |
| " \n", | |
| " >>> q1, q2, q3 = multiplex(3, in_q)\n", | |
| " \"\"\"\n", | |
| " out_queues = [Queue(**kwargs) for i in range(n)]\n", | |
| " def f():\n", | |
| " while True:\n", | |
| " x = q.get()\n", | |
| " for out_q in out_queues:\n", | |
| " out_q.put(x)\n", | |
| " t = Thread(target=f)\n", | |
| " t.daemon = True\n", | |
| " t.start()\n", | |
| " return out_queues " | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "```\n", | |
| " ----inc---->\n", | |
| " / \\ \n", | |
| "in_q --> q \\_add__ results\n", | |
| " \\ / \n", | |
| " ---double-->/\n", | |
| "```" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 14, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "in_q = Queue()\n", | |
| "q = c.scatter(in_q)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 15, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "<Future: status: finished, type: int, key: 8068024105615db5ba4cbd263d42a6db>" | |
| ] | |
| }, | |
| "execution_count": 15, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "in_q.put(1)\n", | |
| "q.get()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 16, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "q_1, q_2 = multiplex(2, q)\n", | |
| "\n", | |
| "inc_q = c.map(inc, q_1)\n", | |
| "double_q = c.map(double, q_2)\n", | |
| "\n", | |
| "add_q = c.map(add, inc_q, double_q)\n", | |
| "\n", | |
| "out_q = c.gather(add_q)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 17, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "31" | |
| ] | |
| }, | |
| "execution_count": 17, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "in_q.put(10)\n", | |
| "out_q.get()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 18, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "from random import random\n", | |
| "\n", | |
| "def feed(q):\n", | |
| " for i in range(10000):\n", | |
| " sleep(random())\n", | |
| " q.put(i)\n", | |
| " \n", | |
| "t = Thread(target=feed, args=(q,))\n", | |
| "t.daemon = True\n", | |
| "t.start()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": { | |
| "collapsed": false | |
| }, | |
| "outputs": [], | |
| "source": [ | |
| "out_q.qsize()" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": { | |
| "collapsed": true | |
| }, | |
| "outputs": [], | |
| "source": [] | |
| } | |
| ], | |
| "metadata": { | |
| "anaconda-cloud": {}, | |
| "kernelspec": { | |
| "display_name": "Python [Root]", | |
| "language": "python", | |
| "name": "Python [Root]" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.5.2" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 0 | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment