Last active
September 23, 2025 20:15
-
-
Save seansummers/77ea808dc59c7521fa57018765c4553d to your computer and use it in GitHub Desktop.
HTTP Client Implementations
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """This module implements a basic requests-like wrapper around awscrt.http. | |
| Functional for one-shot GET and POST (no chunking). | |
| It does not directly support url cookies, multipart upload, auth, or redirects. | |
| It does not support sessions and creates a new connection for every requests_* call. | |
| It supports limited kwargs from the requests API (most notably 'headers'). | |
| Simple cli with `uv run awscrtrequests.py` | |
| """ | |
| # /// script | |
| # requires-python = ">=3.11" | |
| # dependencies = [ | |
| # "awscrt", | |
| # ] | |
| # /// | |
| from collections import deque | |
| from contextlib import suppress | |
| from functools import cache | |
| from json import dumps, loads | |
| from io import BytesIO | |
| from types import SimpleNamespace | |
| from typing import Any, Iterable, Mapping, Sequence | |
| from urllib.parse import quote_plus, urlparse, urlunparse | |
| from urllib.error import HTTPError | |
| from awscrt.http import HttpClientConnection, HttpClientStream, HttpHeaders, HttpRequest | |
| from awscrt.io import ( | |
| ClientTlsContext, | |
| TlsContextOptions, | |
| TlsConnectionOptions, | |
| ) | |
| from awscrt.io import LogLevel, init_logging | |
# Send awscrt's native log output to stderr, limited to warnings and above.
init_logging(LogLevel.Warn, "stderr")

# Module-wide default client TLS context, built from default TlsContextOptions.
# get_tls_options() falls back to it when no context is supplied.
TLS_CTX = ClientTlsContext(TlsContextOptions())
class AwsCrtHttpResponse:
    """Collect the pieces of one `awscrt.http` response as its callbacks fire.

    `on_response` and `on_body` are handed to `HttpClientConnection.request`
    as callbacks.  The `http_stream`, `status_code` and `headers` attributes
    do not exist on the instance until `on_response` has run.
    """

    http_stream: HttpClientStream
    status_code: int
    headers: list[tuple[str, str]]
    chunks: deque[bytes]

    def __init__(self):
        # Body chunks are kept in arrival order.
        self.chunks = deque()

    def on_response(
        self,
        http_stream: HttpClientStream,
        status_code: int,
        headers: list[tuple[str, str]],
        **_kwargs,
    ):
        """Callback: store the stream, status code and main response headers."""
        # Assignment order is kept as-is: readers poll for these attributes.
        self.http_stream = http_stream
        self.status_code = status_code
        self.headers = headers

    def on_body(
        self, http_stream: HttpClientStream, chunk: bytes, **_kwargs
    ):  # pyright: ignore[reportUnusedParameter]
        """Callback: queue one received body chunk (invoked zero or more times)."""
        self.chunks.append(chunk)
class HttpResponse(AwsCrtHttpResponse):
    """Accumulate a single `awscrt.http.HttpRequest` result and provide an
    interface similar to the popular `requests.Response`.

    The instance is returned to the caller before the response callbacks have
    necessarily run, so accessors that need the status line or the stream
    wait until `on_response` has populated them.
    """

    def _await_attribute(self, name: str) -> Any:
        """Return attribute *name*, waiting until `on_response` has set it.

        Sleeps briefly between checks instead of spinning, so the waiting
        thread does not monopolise a CPU core.

        NOTE: this waits forever if `on_response` is never invoked.
        """
        # Local import keeps the module's import block untouched.
        from time import sleep

        while not hasattr(self, name):
            sleep(0.001)
        return getattr(self, name)

    def _content(self, wait: bool = False) -> bytes:
        """Content of the response (possibly incomplete), in bytes.

        If wait == True, return only after the response is complete; a failed
        exchange re-raises its exception from the completion future.
        """
        if wait:
            stream = self._await_attribute("http_stream")
            # Blocks until the request/response exchange has finished.
            stream.completion_future.result()
        return b"".join(self.iter_content())

    @property
    def ok(self) -> bool:
        """requests: Returns True if status_code is less than 400, False if not."""
        return self._await_attribute("status_code") < 400

    def iter_content(self, **_kwargs) -> Iterable[bytes]:
        """requests: Start returning chunks, possibly before the request is complete"""
        yield from self.chunks

    @property
    def content(self) -> bytes:
        """requests: Content of the response, in bytes."""
        return self._content(wait=True)

    @property
    def text(self) -> str:
        """requests: Content of the response, in unicode."""
        return self.content.decode()

    def json(self, **kwargs) -> Any:
        """requests: Decodes the JSON response body (if any) as a Python object.

        Keyword arguments are forwarded to `json.loads`.
        """
        return loads(self.content, **kwargs)

    def raise_for_status(self) -> None:
        """requests: Raises `urllib.error.HTTPError`, if one occurred.

        Returns None when the status code is below 400.  Otherwise waits for
        the exchange to finish and raises an HTTPError carrying the status
        code and the response headers.
        """
        if self.ok:
            return
        code = self.status_code
        connection = self.http_stream.connection
        # The scheme is not recorded on the stream; "https" is always reported.
        netloc = f"https://{connection.host_name}:{connection.port}"
        # exception() blocks until completion and is None when the exchange
        # itself succeeded, so fall back to a message built from the status.
        failure = self.http_stream.completion_future.exception()
        msg = str(failure) if failure else f"HTTP status {code}"
        raise HTTPError(netloc, code, msg, self.headers, None)
class HttpConnection(SimpleNamespace):
    """Attribute bag with everything needed to open an HTTP[S] connection
    and address a request on it.
    """

    # Host to connect to.
    host_name: str
    # TCP port to connect to.
    port: int
    # Request target sent to the server (scheme, host and port stripped).
    path: str
    # Request headers.
    headers: HttpHeaders
    # TLS options for the connection, or None when TLS is not used.
    tls_connection_options: TlsConnectionOptions | None
@cache
def get_tls_options(
    host_name: str, tls_ctx: ClientTlsContext | None = TLS_CTX
) -> TlsConnectionOptions:
    """Return TLS connection options with the server name set to *host_name*.

    Results are memoized by `functools.cache` on the argument values.  A
    falsy *tls_ctx* selects the module-level `TLS_CTX`.
    """
    context = tls_ctx if tls_ctx else TLS_CTX
    options = TlsConnectionOptions(context)
    options.set_server_name(host_name)
    return options
def setup_http_connection(
    url: str,
    headers: list[tuple[str, str]] | Mapping[str, str] | None = None,
    tls_ctx: ClientTlsContext | None = None,
) -> HttpConnection:
    """HTTP connection setup that is common across request types.

    Args:
        url: Absolute URL of the request.
        headers: Request headers as a mapping, a sequence of (name, value)
            pairs, or None.  Any other object is used as-is.
        tls_ctx: TLS context to use; when given, TLS options are created even
            if the URL scheme is not "https".

    Returns:
        An `HttpConnection` holding host, port, request path, headers
        (with a "host" header added when absent) and TLS options.
    """
    # parse out the URL components and fix them up
    parsed_url = urlparse(url)
    # An explicit port always wins; otherwise use the scheme's default.
    # (Parenthesised on purpose: `a or b if c else d` is `(a or b) if c else d`,
    # which would throw away an explicit port on http:// URLs.)
    default_port = 443 if parsed_url.scheme == "https" else 80
    port = int(parsed_url.port or default_port)
    parsed_url = parsed_url._replace(
        netloc=f"{parsed_url.hostname}:{port}",
        path=parsed_url.path or "/",
        params=quote_plus(parsed_url.params, safe="=&"),
        query=quote_plus(parsed_url.query, safe="=&"),
        fragment=quote_plus(parsed_url.fragment, safe="=&"),
    )
    # path should not need scheme, host, port
    path = urlunparse(parsed_url).removeprefix(
        f"{parsed_url.scheme}://{parsed_url.netloc}"
    )

    # Set up TLS if needed ('host' is required for SNI)
    tls_connection_options = None
    if tls_ctx or parsed_url.scheme == "https":
        tls_connection_options = get_tls_options(parsed_url.hostname, tls_ctx)

    # set up Headers ('host' is almost universally required)
    if headers is None:
        headers = HttpHeaders()
    elif isinstance(headers, Mapping):
        headers = HttpHeaders(headers.items())
    elif isinstance(headers, Sequence):
        headers = HttpHeaders(headers)
    if not headers.get("host"):
        headers.add("host", parsed_url.hostname)

    return HttpConnection(
        host_name=parsed_url.hostname,
        port=port,
        path=path,
        headers=headers,
        tls_connection_options=tls_connection_options,
    )
def send_request(
    http_request: HttpRequest, http_client_connection: HttpClientConnection
) -> HttpResponse:
    """Issue *http_request* on the connection and return its `HttpResponse`.

    The returned object is registered as the callback target and fills in
    as the response arrives.
    """
    collector = HttpResponse()
    # Create the stream with the collector's callbacks, then start it.
    http_client_connection.request(
        http_request, collector.on_response, collector.on_body
    ).activate()
    return collector
def requests_post(
    url: str,
    data: dict[str, Any] | Sequence | bytes | str = None,
    json: dict[str, Any] = None,
    headers: list[tuple[str, str]] = None,
    **_kwargs,
) -> HttpResponse:
    """requests.post: Sends a POST request (supports only one-shot payloads).

    The payload is the first truthy value of `json`, `data`, `b""`.  Anything
    that is not `str`/`bytes` is serialized as compact JSON.  A JSON payload
    gets a "Content-type: application/json" header unless one is already set,
    and "Content-length" is added when absent.
    """
    # Begin connecting first; the payload is prepared while that is pending.
    connection = setup_http_connection(url, headers)
    pending_connection = HttpClientConnection.new(
        connection.host_name,
        connection.port,
        tls_connection_options=connection.tls_connection_options,
    )

    # Reduce the payload to bytes, adding headers where needed.
    payload = json or data or b""
    is_json = bool(json)
    if not isinstance(payload, (str, bytes)):
        # Structured payloads are always sent as JSON text.
        payload = dumps(payload, separators=(",", ":"))
        is_json = True
    if isinstance(payload, str):
        if is_json and not connection.headers.get("Content-type"):
            connection.headers.add("Content-type", "application/json")
        payload = payload.encode()

    if not connection.headers.get("Content-length"):
        connection.headers.add("Content-length", str(len(payload)))

    request = HttpRequest(
        method="POST",
        path=connection.path,
        headers=connection.headers,
        body_stream=BytesIO(payload),
    )
    return send_request(request, pending_connection.result())
def _append_query_params(url: str, params) -> str:
    """Return *url* with *params* appended to its query string, unescaped."""
    if not params:
        return url
    if isinstance(params, bytes):
        extra = params.decode()
    elif isinstance(params, str):
        extra = params
    else:
        pairs = params.items() if isinstance(params, Mapping) else params
        fields = []
        for key, value in pairs:
            if value is None:
                continue
            # A list/tuple value repeats the key once per item.
            values = value if isinstance(value, (list, tuple)) else (value,)
            fields.extend(f"{key}={item}" for item in values)
        extra = "&".join(fields)
    if not extra:
        return url
    parsed = urlparse(url)
    query = f"{parsed.query}&{extra}" if parsed.query else extra
    return urlunparse(parsed._replace(query=query))


def requests_get(
    url: str, params: dict[str, Any] | Sequence | bytes = None, **kwargs
) -> HttpResponse:
    """requests.get: Sends a GET request.

    Args:
        url: Absolute URL of the request.
        params: Query parameters as a mapping, a sequence of (key, value)
            pairs, or a ready-made query string (`bytes`/`str`).  They are
            appended to any query already present in *url*.
        **kwargs: Only `headers` is used.

    Parameters are appended unescaped because `setup_http_connection`
    percent-encodes the whole query string itself, treating "=" and "&" as
    separators.  A value containing "&" therefore cannot be represented.
    """
    # start up the connection
    connection = setup_http_connection(
        _append_query_params(url, params), kwargs.get("headers")
    )
    client_connection_future = HttpClientConnection.new(
        connection.host_name,
        connection.port,
        tls_connection_options=connection.tls_connection_options,
    )
    request = HttpRequest(
        method="GET",
        path=connection.path,
        headers=connection.headers,
    )
    response = send_request(request, client_connection_future.result())
    return response
| def main(args): | |
| """Simple CLI for testing get/post HTTP[S] requests""" | |
| from argparse import ArgumentParser | |
| parser = ArgumentParser(description="Simple HTTP cli using awscrt.") | |
| subparser = parser.add_subparsers(title="HTTP verbs", required=True) | |
| verb_parser = subparser.add_parser("get") | |
| verb_parser.set_defaults(verb="get") | |
| verb_parser.add_argument("url") | |
| verb_parser.add_argument( | |
| "-H", "--header", nargs="+", type=lambda _: tuple(_.split("=")), dest="headers" | |
| ) | |
| verb_parser = subparser.add_parser("post") | |
| verb_parser.set_defaults(verb="post") | |
| verb_parser.add_argument("url") | |
| verb_parser.add_argument( | |
| "-H", "--header", nargs="+", type=lambda _: tuple(_.split("=")), dest="headers" | |
| ) | |
| post_group = verb_parser.add_mutually_exclusive_group() | |
| post_group.add_argument("-d", "--data", nargs="?") | |
| post_group.add_argument("-j", "--json", nargs="?") | |
| args = parser.parse_args(args) | |
| if not hasattr(args, "verb"): | |
| raise SystemExit(parser.print_usage()) | |
| match args.verb: | |
| case "get": | |
| response = requests_get(args.url) | |
| case "post": | |
| response = requests_post( | |
| args.url, data=args.data, json=args.json, headers=args.headers | |
| ) | |
| print(response.text) | |
# Script entry point: pass the command-line arguments (minus the program
# name) to main().
if __name__ == "__main__":
    import sys

    main(sys.argv[1:])

# Names exported by a star-import of this module.
__all__ = ["HttpResponse", "requests_get", "requests_post"]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Pure Python (no dependencies) clone of basic `requests` functionality.""" | |
| # /// script | |
| # requires-python = ">=3.10" | |
| # dependencies = [ | |
| # ] | |
| # /// | |
| import json | |
| from functools import singledispatchmethod | |
| from http.client import HTTPMessage, HTTPResponse | |
| from io import BytesIO | |
| from typing import Iterable | |
| from urllib.error import HTTPError | |
| from urllib.request import Request, urlopen | |
def json_dumps(obj: dict) -> str:
    """Serialize *obj* as compact JSON (no spaces after "," or ":")."""
    item_separator, key_separator = ",", ":"
    return json.dumps(obj, separators=(item_separator, key_separator))
| class Response: | |
| url: str | |
| status_code: int | |
| reason: str | |
| headers: HTTPMessage | |
| content: bytes | |
| @singledispatchmethod | |
| def __init__(self, response: HTTPResponse): | |
| self.url = response.url | |
| self.status_code = response.status | |
| self.reason = response.reason | |
| self.headers = response.headers | |
| self.content = response.read() | |
| @__init__.register | |
| def _(self, http_error: HTTPError): | |
| self.url = http_error.url | |
| self.status_code = http_error.code | |
| self.reason = http_error.reason | |
| self.headers = http_error.headers | |
| self.content = http_error.fp.read() | |
| self._http_error = http_error | |
| @property | |
| def text(self) -> str: | |
| return self.content.decode() | |
| def json(self) -> str | None: | |
| return json.loads(self.text) if self.text else None | |
| @property | |
| def ok(self) -> bool: | |
| return self.status_code < 400 | |
| def raise_for_status(self) -> None: | |
| if self.ok: | |
| return | |
| raise self._http_error | |
def do_request(request: Request) -> Response:
    """Open *request* and wrap the outcome in a `Response`.

    An `HTTPError` is wrapped as well instead of being propagated.
    """
    try:
        http_response = urlopen(request)
        with http_response:
            wrapped = Response(http_response)
    except HTTPError as http_error:
        wrapped = Response(http_error)
    return wrapped
def get(base_url: str, headers: dict = None) -> Response:
    """Send a GET request to *base_url* with optional *headers*."""
    header_fields = headers or {}
    return do_request(Request(base_url, headers=header_fields))
def post(
    base_url: str,
    json: dict = None,
    data: bytes | BytesIO | Iterable[bytes] | str = None,
    headers: dict = None,
) -> Response:
    """Send a POST request to *base_url*.

    Args:
        base_url: Absolute URL of the request.
        json: Object sent as compact JSON with a JSON content-type header.
            A truthy value takes precedence over *data*; a falsy but
            non-None value (`{}`, `[]`, `0`) is sent when no *data* is given.
        data: Raw body; a `str` is encoded to bytes.
        headers: Request headers as a dict or an iterable of (name, value)
            pairs.  The argument itself is never modified.

    Returns:
        The `Response`, including for HTTP error statuses.
    """
    # Work on a copy so adding content-type does not leak into the caller's dict.
    request_headers = dict(headers or {})
    if json or (json is not None and not data):
        data = json_dumps(json)
        request_headers["content-type"] = "application/json"
    if isinstance(data, str):
        data = data.encode()
    request_args = {"url": base_url, "method": "POST", "headers": request_headers}
    if data:
        request_args["data"] = data
    return do_request(Request(**request_args))
| def main(args): | |
| """Simple CLI for testing get/post HTTP[S] requests""" | |
| from argparse import ArgumentParser | |
| parser = ArgumentParser(description="Simple HTTP cli using awscrt.") | |
| subparser = parser.add_subparsers(title="HTTP verbs", required=True) | |
| verb_parser = subparser.add_parser("get") | |
| verb_parser.set_defaults(verb="get") | |
| verb_parser.add_argument("url") | |
| verb_parser.add_argument( | |
| "-H", "--header", nargs="+", type=lambda _: tuple(_.split("=")), dest="headers" | |
| ) | |
| verb_parser = subparser.add_parser("post") | |
| verb_parser.set_defaults(verb="post") | |
| verb_parser.add_argument("url") | |
| verb_parser.add_argument( | |
| "-H", "--header", nargs="+", type=lambda _: tuple(_.split("=")), dest="headers" | |
| ) | |
| post_group = verb_parser.add_mutually_exclusive_group() | |
| post_group.add_argument("-d", "--data", nargs="?") | |
| post_group.add_argument("-j", "--json", nargs="?") | |
| args = parser.parse_args(args) | |
| if not hasattr(args, "verb"): | |
| raise SystemExit(parser.print_usage()) | |
| match args.verb: | |
| case "get": | |
| response = get(args.url) | |
| case "post": | |
| if args.json: | |
| args.json = json.loads(args.json) | |
| response = post( | |
| args.url, data=args.data, json=args.json, headers=args.headers | |
| ) | |
| print(response.text) | |
# Script entry point: pass the command-line arguments (minus the program
# name) to main().
if __name__ == "__main__":
    import sys

    main(sys.argv[1:])

# Names exported by a star-import of this module.
__all__ = ["get", "post"]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment