Skip to content

Instantly share code, notes, and snippets.

@Archonic944
Created May 25, 2025 13:27
Show Gist options
  • Select an option

  • Save Archonic944/10ded3db054e0599833b75effe22d254 to your computer and use it in GitHub Desktop.

Select an option

Save Archonic944/10ded3db054e0599833b75effe22d254 to your computer and use it in GitHub Desktop.
# qtpy_esp32s2_i2s_stream.py
#
# Continuous 16-kHz mono recording → HTTP POST
# – no sample loss, no packet loss –
# tailored for Adafruit QT Py ESP32-S2 4 MB Flash / 2 MB PSRAM
#
import _thread, gc, time, network, usocket
from machine import Pin, I2S, reset
### ---------- user settings ---------- ###
SSID = "YOUR-SSID"
PASSWORD = "YOUR-PASS"
POST_URL = "http://143.244.173.85/mic1/upload/{epoch}?password=GreatSecurityForSending"
# GPIO mapping – adjust only if you rewired the mic breakout
PIN_SCK = 10 # BCK
PIN_WS = 11 # LRCL
PIN_SD = 15 # DATA
### ------------------------------------ ###
SAMPLE_RATE = 16_000 # Hz
BITS = 16
CHANNELS = 1
SECONDS_PER_CHUNK = 1 # duration of one buffer
BYTES_PER_CHUNK = SAMPLE_RATE * SECONDS_PER_CHUNK * CHANNELS * (BITS // 8)
NUM_BUFFERS = 4 # 4 × 32 kB = 128 kB, safe on PSRAM builds
# -------------------------------------------------------------------------
# fixed objects – every allocation here happens only once at boot
# -------------------------------------------------------------------------
BUFFERS = [bytearray(BYTES_PER_CHUNK) for _ in range(NUM_BUFFERS)]
BUF_READY = [False] * NUM_BUFFERS
BUF_COUNTER = [0] * NUM_BUFFERS
LOCK = _thread.allocate_lock()
# WAV header is constant because chunk size is constant
def _wav_header(datasize, sr=SAMPLE_RATE, bits=BITS, ch=CHANNELS):
riff = b"RIFF" + (datasize + 36).to_bytes(4, "little") + b"WAVE"
fmt_blk = (b"fmt " + (16).to_bytes(4, "little") +
(1).to_bytes(2, "little") + # PCM
ch.to_bytes(2, "little") +
sr.to_bytes(4, "little") +
(sr*ch*bits//8).to_bytes(4, "little") +
(ch*bits//8).to_bytes(2, "little") +
bits.to_bytes(2, "little"))
data = b"data" + datasize.to_bytes(4, "little")
return riff + fmt_blk + data
WAV_HDR = _wav_header(BYTES_PER_CHUNK) # 44 bytes
HTTP_HDR = (
"POST /mic1/upload/{epoch}?password=GreatSecurityForSending HTTP/1.1\r\n"
"Host: 143.244.173.85\r\n"
"Content-Type: audio/wav\r\n"
"Content-Length: {length}\r\n\r\n"
).encode()
# -------------------------------------------------------------------------
# Wi-Fi helper – blocks until link is up
# -------------------------------------------------------------------------
def wifi_connect():
wlan = network.WLAN(network.STA_IF)
if not wlan.active():
wlan.active(True)
if not wlan.isconnected():
wlan.connect(SSID, PASSWORD)
t0 = time.time()
while not wlan.isconnected():
if time.time() - t0 > 20:
print("Wi-Fi timeout; resetting")
reset()
time.sleep_ms(200)
print("Wi-Fi OK", wlan.ifconfig())
# -------------------------------------------------------------------------
# Recorder (thread 0 – foreground)
# -------------------------------------------------------------------------
def record_audio():
i2s = I2S(
0,
sck=Pin(PIN_SCK), ws=Pin(PIN_WS), sd=Pin(PIN_SD),
mode=I2S.RX, bits=BITS, format=I2S.MONO,
rate=SAMPLE_RATE, ibuf=4096 # small HW DMA buffer
)
buf_idx, chunk_num = 0, 0
while True:
# read one whole chunk in place
mv = memoryview(BUFFERS[buf_idx])
read = 0
while read < BYTES_PER_CHUNK: # under-run guard
read += i2s.readinto(mv[read:]) # never allocates
# flag buffer ready
with LOCK:
BUF_READY[buf_idx] = True
BUF_COUNTER[buf_idx] = chunk_num
chunk_num += 1
# stroll to the next free slot; wait if uploader is behind
buf_idx = (buf_idx + 1) % NUM_BUFFERS
while BUF_READY[buf_idx]:
time.sleep_ms(2) # back-pressure without dropping samples
gc.collect() # keep the heap tidy
# -------------------------------------------------------------------------
# Uploader (thread 1 – background)
# -------------------------------------------------------------------------
def _http_post(buf, epoch):
total = len(WAV_HDR) + BYTES_PER_CHUNK
hdr = HTTP_HDR.replace(b"{epoch}", str(epoch).encode(), 1
).replace(b"{length}", str(total).encode(), 1)
s = usocket.socket()
try:
s.connect(("143.244.173.85", 80))
s.sendall(hdr) # header first
s.sendall(WAV_HDR) # then 44-byte WAV header
s.sendall(buf) # finally raw PCM
# minimal response read – ignore body, just parse status
line = s.readline()
ok = line and line.startswith(b"HTTP/1.1 200")
return ok
finally:
s.close()
def upload_loop():
next_idx = 0
while True:
with LOCK:
ready = BUF_READY[next_idx]
if not ready:
next_idx = (next_idx + 1) % NUM_BUFFERS
time.sleep_ms(10)
continue
epoch = int(time.time())
# make a zero-copy memoryview while lock is released
mv = memoryview(BUFFERS[next_idx])
ok = _http_post(mv, epoch)
if ok:
with LOCK:
BUF_READY[next_idx] = False
else:
# leave BUF_READY set so we retry; small delay avoids thrashing
time.sleep(1)
# -------------------------------------------------------------------------
# Boot sequence
# -------------------------------------------------------------------------
def main():
wifi_connect()
_thread.start_new_thread(upload_loop, ())
record_audio() # never returns
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment