(具体的な実装についてはなるべく触れず、なんかイメージ的な話をする。)
- 「これこれのイベントが起こったら、この続く処理を実行してくれ」
| import array | |
| import json | |
| import os | |
| import socket | |
| import sys | |
| SOCKET_PATH = '/tmp/pyd.sock' | |
| sock = socket.socket(socket.AF_UNIX) |
| #!/bin/sh | |
| # XXX: 雑 | |
| mkdir -p ~/.piping | |
| if [ "$1" = "-w" ]; then | |
| mkfifo ~/.piping/"$2" || exit 1 | |
| cat > ~/.piping/"$2" | |
| rm ~/.piping/"$2" | |
| elif [ "$1" = "-r" ]; then | |
| cat ~/.piping/"$2" |
| import email.message | |
| msg = email.message.EmailMessage() | |
| msg['Subject'] = '[こんにちは、世界] 新しい朝が来た、希望の朝だ' | |
| print(msg.as_string()) |
| import asyncio | |
| class FunctionPool: | |
| '''Connection pool の関数版みたいなやつ考える | |
| * 同時に使っていい分だけ関数が pool されている | |
| * 関数を使いたいときはそこから1個取り出して使う | |
| * 使い終わったら pool に戻す | |
| * pool が空なら誰かが使い終わって戻してくれるまで待つ |
| import asyncio | |
| import functools | |
| import time | |
| import urllib.parse | |
| import aiohttp | |
| def throttling(sleep): | |
| def decorator(func): |
| python3 <<EOP | |
| import gi | |
| gi.require_version('Gtk', '3.0') | |
| gi.require_version('WebKit2', '4.0') | |
| from gi.repository import Gtk | |
| from gi.repository import WebKit2 |
| import threading | |
| from asyncio_extras.threads import threadpool | |
| async def main(): | |
| print('main:', threading.current_thread()) | |
| for _ in range(4): | |
| async with threadpool(): |
| class A(object): | |
| # XXX: こういう、お互いに循環して使っているクラスの type hint ってどうやって書くの? | |
| #def use_B(self, b: B): | |
| def use_B(self, b): | |
| print("I'm using {}".format(b)) | |
| class B(object): | |
| def use_A(self, a: A): | |
| print("I'm using {}".format(a)) |
| import asyncio | |
| import threading | |
| def unset_event_loop_policy_forever(): | |
| while True: | |
| asyncio.set_event_loop_policy(None) | |
| t = threading.Thread(target=unset_event_loop_policy_forever) |