Skip to content

Instantly share code, notes, and snippets.

@seansummers
Last active September 23, 2025 20:15
Show Gist options
  • Select an option

  • Save seansummers/77ea808dc59c7521fa57018765c4553d to your computer and use it in GitHub Desktop.

Select an option

Save seansummers/77ea808dc59c7521fa57018765c4553d to your computer and use it in GitHub Desktop.
HTTP Client Implementations
"""This module implements a basic resquests-like wrapper around awscrt.http.
Functional for one-shot GET and POST (no chunking).
It does not directly support url cookies, multipart upload, auth, or redirects.
It does not support sessions and creates a new connection for every requests_* call.
It supports limited kwargs from the requests API (most notably 'headers').
Simple cli with `uv run awscrtrequests.py`
"""
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "awscrt",
# ]
# ///
from collections import deque
from contextlib import suppress
from functools import cache
from json import dumps, loads
from io import BytesIO
from types import SimpleNamespace
from typing import Any, Iterable, Mapping, Sequence
from urllib.parse import quote_plus, urlparse, urlunparse
from urllib.error import HTTPError
from awscrt.http import HttpClientConnection, HttpClientStream, HttpHeaders, HttpRequest
from awscrt.io import (
ClientTlsContext,
TlsContextOptions,
TlsConnectionOptions,
)
from awscrt.io import LogLevel, init_logging
# Configure awscrt's logging at import time: level `LogLevel.Warn`, written to stderr.
init_logging(LogLevel.Warn, "stderr")
# Module-wide client TLS context built from default-constructed options;
# get_tls_options() falls back to it when the caller supplies no context.
TLS_CTX = ClientTlsContext(TlsContextOptions())
class AwsCrtHttpResponse:
    """Collector for the callbacks of one `awscrt.http.HttpRequest` exchange.

    The bound methods `on_response` and `on_body` are meant to be passed as
    the callbacks of a request; each one simply records what it is handed.
    Only `chunks` exists on a fresh instance -- `http_stream`, `status_code`
    and `headers` are created by `on_response`.
    """

    http_stream: HttpClientStream
    status_code: int
    headers: list[tuple[str, str]]
    chunks: deque[bytes]

    def __init__(self):
        # Body buffer; chunks are kept in arrival order.
        self.chunks = deque()

    def on_response(
        self,
        http_stream: HttpClientStream,
        status_code: int,
        headers: list[tuple[str, str]],
        **_kwargs,
    ):
        """Callback: record the stream, status code and response headers."""
        self.http_stream, self.status_code, self.headers = (
            http_stream,
            status_code,
            headers,
        )

    def on_body(
        self, http_stream: HttpClientStream, chunk: bytes, **_kwargs
    ):  # pyright: ignore[reportUnusedParameter]
        """Callback: store one piece of the response body (called 0+ times)."""
        received = self.chunks
        received.append(chunk)
class HttpResponse(AwsCrtHttpResponse):
    """Accumulate a single `awscrt.http.HttpRequest` result and provide an
    interface similar to the popular `requests.Response`.

    `http_stream`, `status_code` and `headers` only appear on the instance
    once the `on_response` callback has run, so the accessors below wait for
    the attribute they need instead of assuming it is already present.
    """

    def _await_attribute(self, name: str) -> Any:
        """Return attribute *name*, waiting until a callback has set it.

        Raises:
            Exception: whatever the stream's completion future holds, if the
                stream is already known and finished without *name* being set.
            RuntimeError: if that finished stream reports no failure either.
        """
        # Local import: `time` is not among this module's top-level imports.
        from time import sleep

        while not hasattr(self, name):
            stream = getattr(self, "http_stream", None)
            if stream is not None and stream.completion_future.done():
                # The exchange is over. Re-check once (the attribute may have
                # been set between the two tests), then report the failure
                # rather than waiting for a callback that will never come.
                if hasattr(self, name):
                    break
                stream.completion_future.result()
                raise RuntimeError(
                    f"HTTP exchange finished without providing {name!r}"
                )
            # Yield the CPU between polls instead of spinning flat out.
            sleep(0.001)
        return getattr(self, name)

    def _content(self, wait: bool = False) -> bytes:
        """Content of the response (possibly incomplete), in bytes.

        If wait == True, return after the response is complete; a failure
        recorded on the stream's completion future is raised.
        """
        if wait:
            # might be called before .on_response runs!
            stream = self._await_attribute("http_stream")
            stream.completion_future.result()
        return b"".join(self.iter_content())

    @property
    def ok(self) -> bool:
        """requests: Returns True if status_code is less than 400, False if not."""
        # might be called before .on_response runs!
        return self._await_attribute("status_code") < 400

    def iter_content(self, **_kwargs) -> Iterable[bytes]:
        """requests: Start returning chunks, possibly before the request is complete"""
        # Iterate over a snapshot: `on_body` may append while we are yielding,
        # and iterating the live deque would then raise RuntimeError.
        yield from tuple(self.chunks)

    @property
    def content(self) -> bytes:
        """requests: Content of the response, in bytes."""
        return self._content(wait=True)

    @property
    def text(self) -> str:
        """requests: Content of the response, in unicode."""
        return self.content.decode()

    def json(self, **kwargs) -> Any:
        """requests: Decodes the JSON response body (if any) as a Python object.

        Keyword arguments are forwarded to `json.loads`.
        """
        return loads(self.content, **kwargs)

    def raise_for_status(self) -> None:
        """requests: Raises `urllib.error.HTTPError`, if one occurred."""
        if self.ok:
            return
        # Local import: `http.client` is not among this module's imports.
        from http.client import responses

        code = self.status_code
        connection = self.http_stream.connection
        netloc = f"https://{connection.host_name}:{connection.port}"
        failure = self.http_stream.completion_future.exception()
        # Prefer the stream's own failure; otherwise use the standard reason
        # phrase so the error does not read "HTTP Error 404: None".
        msg = failure if failure is not None else responses.get(code, "")
        raise HTTPError(netloc, code, msg, self.headers, None)
class HttpConnection(SimpleNamespace):
    """This container holds the components needed for HTTP[S] Connection

    Instances are created with keyword arguments (see `setup_http_connection`);
    the annotations below name the fields that are expected to be present.
    """

    # Host to connect to, as parsed from the request URL.
    host_name: str
    # TCP port to connect to.
    port: int
    # Request target handed to `HttpRequest(path=...)`.
    path: str
    # Request headers handed to `HttpRequest(headers=...)`.
    headers: HttpHeaders
    # TLS options for the connection; None when TLS is not used.
    tls_connection_options: TlsConnectionOptions | None
@cache
def get_tls_options(
    host_name: str, tls_ctx: ClientTlsContext | None = TLS_CTX
) -> TlsConnectionOptions:
    """Build the TLS connection options for *host_name* (memoised by `@cache`).

    Args:
        host_name: passed to `set_server_name` on the new options object.
        tls_ctx: TLS context to derive the options from; a falsy value means
            the module-level `TLS_CTX`.

    Returns:
        The `TlsConnectionOptions` for this (host_name, tls_ctx) pair; repeated
        calls with the same arguments return the same cached object.
    """
    context = tls_ctx if tls_ctx else TLS_CTX
    options = TlsConnectionOptions(context)
    options.set_server_name(host_name)
    return options
def setup_http_connection(
    url: str,
    headers: list[tuple[str, str]] | None = None,
    tls_ctx: ClientTlsContext | None = None,
) -> HttpConnection:
    """HTTP Connection setup that is common across request types.

    Args:
        url: absolute URL of the request (scheme and host are required).
        headers: request headers as a mapping, a sequence of (name, value)
            pairs, or None. Any other object is used as-is.
        tls_ctx: TLS context to use; when given, TLS is set up regardless of
            the URL scheme.

    Returns:
        An `HttpConnection` holding host, port, request target, headers and
        (when TLS applies) the TLS connection options.

    Raises:
        ValueError: if *url* contains no host name (or an invalid port).
    """
    # parse out the URL components and fix them up
    parsed_url = urlparse(url)
    host_name = parsed_url.hostname
    if not host_name:
        raise ValueError(f"URL has no host name: {url!r}")
    # An explicit port always wins; only the *default* depends on the scheme.
    # (Without the parentheses the conditional swallowed the `or`, and every
    # non-https URL ended up on port 80.)
    port = parsed_url.port or (443 if parsed_url.scheme == "https" else 80)
    parsed_url = parsed_url._replace(
        netloc=f"{host_name}:{port}",
        path=parsed_url.path or "/",
        params=quote_plus(parsed_url.params, safe="=&"),
        query=quote_plus(parsed_url.query, safe="=&"),
        # The fragment is client-side only and is not part of a request target.
        fragment="",
    )
    # path should not need scheme, host, port
    path = urlunparse(parsed_url).removeprefix(
        f"{parsed_url.scheme}://{parsed_url.netloc}"
    )

    # Set up TLS if needed ('host' is required for SNI)
    tls_connection_options = None
    if tls_ctx or parsed_url.scheme == "https":
        tls_connection_options = get_tls_options(host_name, tls_ctx)

    # set up Headers ('host' is almost universally required)
    if headers is None:
        headers = HttpHeaders()
    elif isinstance(headers, Mapping):
        headers = HttpHeaders(headers.items())
    elif isinstance(headers, Sequence):
        headers = HttpHeaders(headers)
    if not headers.get("host"):
        headers.add("host", host_name)

    return HttpConnection(
        host_name=host_name,
        port=port,
        path=path,
        headers=headers,
        tls_connection_options=tls_connection_options,
    )
def send_request(
    http_request: HttpRequest, http_client_connection: HttpClientConnection
) -> HttpResponse:
    """Return a Python HttpResponse to collect the results from the HTTP request.

    Args:
        http_request: the request to send.
        http_client_connection: an established connection to send it on.

    Returns:
        An `HttpResponse` whose callbacks are wired to the new stream; it
        fills in as the response arrives.
    """
    response = HttpResponse()
    stream = http_client_connection.request(
        http_request, response.on_response, response.on_body
    )
    # Attach the stream right away instead of waiting for `on_response` to do
    # it: if the exchange fails before any response headers arrive, that
    # callback never runs and the response would otherwise have no way to
    # reach the stream's completion future (and its error).
    response.http_stream = stream
    stream.activate()
    return response
def requests_post(
    url: str,
    data: dict[str, Any] | Sequence | bytes | str | None = None,
    json: dict[str, Any] | None = None,
    headers: list[tuple[str, str]] | None = None,
    **_kwargs,
) -> HttpResponse:
    """requests.post: Sends a POST request (supports only one-shot payloads)

    Args:
        url: absolute URL to post to.
        data: request body. `str` is encoded to bytes, `bytes` are sent as-is,
            any other non-empty object is serialised as compact JSON.
        json: body to send as JSON; takes precedence over *data*. A `str` or
            `bytes` value is assumed to be serialised already. Empty
            containers such as `{}` are sent (as ``{}``), not dropped.
        headers: extra request headers.

    Returns:
        The `HttpResponse` collecting the result.

    Notes:
        ``Content-type: application/json`` is added for JSON text bodies and
        ``Content-length`` is always added, unless the caller already set them.
    """
    # start up the connection
    connection = setup_http_connection(url, headers)
    client_connection_future = HttpClientConnection.new(
        connection.host_name,
        connection.port,
        tls_connection_options=connection.tls_connection_options,
    )

    # convert body to bytes, adding headers if needed
    # `json` counts as supplied unless it is None or empty text; testing
    # `is not None` (rather than truthiness) keeps {} / [] / 0 / False.
    empty_text = isinstance(json, (str, bytes)) and not json
    is_json = json is not None and not empty_text
    body = json if is_json else (data or b"")
    if not isinstance(body, (str, bytes)):
        # anything that is not text yet is serialised as (compact) JSON
        body = dumps(body, separators=(",", ":"))
        is_json = True
    if isinstance(body, str):
        if is_json and not connection.headers.get("Content-type"):
            # if json, add Content-type (if not pre-existing)
            connection.headers.add("Content-type", "application/json")
        body = body.encode()
    if not connection.headers.get("Content-length"):
        connection.headers.add("Content-length", str(len(body)))

    request = HttpRequest(
        method="POST",
        path=connection.path,
        headers=connection.headers,
        body_stream=BytesIO(body),
    )
    response = send_request(request, client_connection_future.result())
    return response
def requests_get(
    url: str, params: dict[str, Any] | Sequence | bytes | None = None, **kwargs
) -> HttpResponse:
    """requests.get: Sends a GET request.

    Args:
        url: absolute URL to fetch.
        params: extra query parameters. A mapping or a sequence of
            (name, value) pairs is form-encoded; `str`/`bytes` are appended
            verbatim (they must already be encoded).
        **kwargs: only `headers` is honoured.

    Returns:
        The `HttpResponse` collecting the result.
    """
    # Local import: `urlencode` is not among this module's top-level imports.
    from urllib.parse import urlencode

    # start up the connection
    connection = setup_http_connection(url, kwargs.get("headers"))
    client_connection_future = HttpClientConnection.new(
        connection.host_name,
        connection.port,
        tls_connection_options=connection.tls_connection_options,
    )

    path = connection.path
    if params:
        if isinstance(params, bytes):
            extra_query = params.decode()
        elif isinstance(params, str):
            extra_query = params
        else:
            extra_query = urlencode(params, doseq=True)
        # Append to the already-prepared request target so the encoded
        # parameters are not quoted a second time. Anything after '#' is a
        # fragment, which must not end up in the middle of the query.
        target, _, _fragment = path.partition("#")
        separator = "&" if "?" in target else "?"
        path = f"{target}{separator}{extra_query}"

    request = HttpRequest(
        method="GET",
        path=path,
        headers=connection.headers,
    )
    response = send_request(request, client_connection_future.result())
    return response
def main(args):
    """Simple CLI for testing get/post HTTP[S] requests.

    Usage: ``get URL [-H NAME=VALUE ...]`` or
    ``post URL [-H NAME=VALUE ...] [-d DATA | -j JSON]``.
    The response body is printed to stdout; invalid arguments make argparse
    exit with status 2.

    Args:
        args: command-line arguments, without the program name.
    """
    from argparse import ArgumentParser, ArgumentTypeError

    def header_pair(text: str) -> tuple[str, str]:
        """Parse one NAME=VALUE header; the value may itself contain '='."""
        name, separator, value = text.partition("=")
        if not (separator and name):
            raise ArgumentTypeError(f"expected NAME=VALUE, got {text!r}")
        return name, value

    parser = ArgumentParser(description="Simple HTTP cli using awscrt.")
    subparser = parser.add_subparsers(title="HTTP verbs", required=True)
    verb_parsers = {}
    for verb in ("get", "post"):
        verb_parser = subparser.add_parser(verb)
        verb_parser.set_defaults(verb=verb)
        verb_parser.add_argument("url")
        verb_parser.add_argument(
            "-H", "--header", nargs="+", type=header_pair, dest="headers"
        )
        verb_parsers[verb] = verb_parser
    post_group = verb_parsers["post"].add_mutually_exclusive_group()
    post_group.add_argument("-d", "--data", nargs="?")
    post_group.add_argument("-j", "--json", nargs="?")

    # The sub-command is required, so `args.verb` is always set from here on.
    args = parser.parse_args(args)
    if args.verb == "get":
        # forward the parsed headers (they used to be parsed and then dropped)
        response = requests_get(args.url, headers=args.headers)
    else:
        response = requests_post(
            args.url, data=args.data, json=args.json, headers=args.headers
        )
    print(response.text)
# Script entry point: pass the command-line arguments (without the program
# name) to main().
if __name__ == "__main__":
    import sys
    main(sys.argv[1:])
# Names exported by a star-import of this module.
__all__ = ["HttpResponse", "requests_get", "requests_post"]
"""Pure Python (no dependencies) clone of basic `requests` functionality."""
# /// script
# requires-python = ">=3.8"
# dependencies = [
# ]
# ///
import json
from functools import singledispatchmethod
from http.client import HTTPMessage, HTTPResponse
from io import BytesIO
from typing import Iterable
from urllib.error import HTTPError
from urllib.request import Request, urlopen
def json_dumps(obj: dict) -> str:
    """Serialise *obj* to JSON text in its most compact form.

    The separators carry no padding, so no whitespace is emitted after
    commas or colons.
    """
    compact_separators = (",", ":")
    return json.dumps(obj, separators=compact_separators)
class Response:
    """Minimal stand-in for `requests.Response`.

    Built either from a successful response object or from the `HTTPError`
    raised for an error status; the body is read eagerly in both cases.
    """

    url: str
    status_code: int
    reason: str
    headers: HTTPMessage
    content: bytes
    # The HTTPError this instance was built from; stays None otherwise.
    _http_error = None

    @singledispatchmethod
    def __init__(self, response: HTTPResponse):
        """Default constructor: copy the fields of a response object."""
        self.url = response.url
        self.status_code = response.status
        self.reason = response.reason
        self.headers = response.headers
        self.content = response.read()

    @__init__.register
    def _(self, http_error: HTTPError):
        """Constructor used when the argument is an `HTTPError`."""
        self.url = http_error.url
        self.status_code = http_error.code
        self.reason = http_error.reason
        self.headers = http_error.headers
        # An HTTPError may carry no body stream at all.
        body_stream = http_error.fp
        self.content = body_stream.read() if body_stream is not None else b""
        self._http_error = http_error

    @property
    def text(self) -> str:
        """Body decoded with the default codec (UTF-8)."""
        return self.content.decode()

    # The annotation is a string so it is never evaluated at definition time
    # (the `|` form would fail there on Python 3.8/3.9).
    def json(self) -> "dict | list | str | int | float | bool | None":
        """Body parsed as JSON, or None when the body is empty."""
        body_text = self.text  # decode once
        return json.loads(body_text) if body_text else None

    @property
    def ok(self) -> bool:
        """True when the status code is below 400."""
        return self.status_code < 400

    def raise_for_status(self) -> None:
        """Raise `urllib.error.HTTPError` if the status code is 400 or above.

        Re-raises the original error when there is one; otherwise builds an
        equivalent `HTTPError` from the stored fields.
        """
        if self.ok:
            return
        if self._http_error is not None:
            raise self._http_error
        raise HTTPError(
            self.url,
            self.status_code,
            self.reason,
            self.headers,
            BytesIO(self.content),
        )
def do_request(request: Request) -> Response:
    """Send *request* with `urlopen` and wrap the outcome in a `Response`.

    An `HTTPError` raised by the call is not propagated; it is wrapped in a
    `Response` as well, so the caller can inspect the status code and body.
    """
    try:
        with urlopen(request) as raw_response:
            outcome = Response(raw_response)
    except HTTPError as error_response:
        outcome = Response(error_response)
    return outcome
def get(base_url: str, headers: "dict | None" = None) -> Response:
    """Send a GET request to *base_url* and return the `Response`.

    Args:
        base_url: URL to fetch.
        headers: request headers, as a mapping or as an iterable of
            (name, value) pairs; None means no extra headers.
    """
    # dict() copies the caller's mapping and also accepts (name, value)
    # pairs, which `Request` itself cannot handle.
    request = Request(base_url, headers=dict(headers or ()))
    return do_request(request)
def post(
    base_url: str,
    json: "dict | None" = None,
    data: "bytes | BytesIO | Iterable[bytes] | str | None" = None,
    headers: "dict | None" = None,
) -> Response:
    """Send a POST request to *base_url* and return the `Response`.

    Args:
        base_url: URL to post to.
        json: object to send as a compact JSON body; when given (anything but
            None, including ``{}``) it replaces *data* and sets the
            ``content-type`` header to ``application/json``.
        data: request body; `str` is encoded to bytes, other values are
            handed to `Request` unchanged. An empty body is not sent.
        headers: request headers, as a mapping or as an iterable of
            (name, value) pairs. The caller's object is never modified.
    """
    # Work on a copy: adding the content-type below must not leak into the
    # caller's dict. dict() also accepts (name, value) pairs.
    request_headers = dict(headers or ())
    if json is not None:
        data = json_dumps(json)
        request_headers["content-type"] = "application/json"
    if isinstance(data, str):
        data = data.encode()

    args = {"url": base_url, "method": "POST", "headers": request_headers}
    if data:
        args["data"] = data
    request = Request(**args)
    return do_request(request)
def main(args):
    """Simple CLI for testing get/post HTTP[S] requests.

    Usage: ``get URL [-H NAME=VALUE ...]`` or
    ``post URL [-H NAME=VALUE ...] [-d DATA | -j JSON]``.
    The response body is printed to stdout; invalid arguments (including a
    malformed header or JSON document) make argparse exit with status 2.

    Args:
        args: command-line arguments, without the program name.
    """
    from argparse import ArgumentParser, ArgumentTypeError

    def header_pair(text):
        """Parse one NAME=VALUE header; the value may itself contain '='."""
        name, separator, value = text.partition("=")
        if not (separator and name):
            raise ArgumentTypeError(f"expected NAME=VALUE, got {text!r}")
        return name, value

    def json_document(text):
        """Parse the -j argument, reporting bad JSON as a usage error."""
        try:
            return json.loads(text)
        except ValueError as error:
            raise ArgumentTypeError(f"invalid JSON: {error}") from None

    parser = ArgumentParser(description="Simple HTTP cli using urllib.request.")
    subparser = parser.add_subparsers(title="HTTP verbs", required=True)
    verb_parsers = {}
    for verb in ("get", "post"):
        verb_parser = subparser.add_parser(verb)
        verb_parser.set_defaults(verb=verb)
        verb_parser.add_argument("url")
        verb_parser.add_argument(
            "-H", "--header", nargs="+", type=header_pair, dest="headers"
        )
        verb_parsers[verb] = verb_parser
    post_group = verb_parsers["post"].add_mutually_exclusive_group()
    post_group.add_argument("-d", "--data", nargs="?")
    post_group.add_argument("-j", "--json", nargs="?", type=json_document)

    # The sub-command is required, so `args.verb` is always set from here on.
    args = parser.parse_args(args)
    # get()/post() hand headers to urllib's Request, which needs a mapping.
    headers = dict(args.headers) if args.headers else None
    # Plain if/else instead of `match`, which is a syntax error before 3.10.
    if args.verb == "get":
        response = get(args.url, headers=headers)
    else:
        response = post(args.url, data=args.data, json=args.json, headers=headers)
    print(response.text)
# Script entry point: pass the command-line arguments (without the program
# name) to main().
if __name__ == "__main__":
    import sys
    main(sys.argv[1:])
# Names exported by a star-import of this module.
__all__ = ["get", "post"]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment