Created
May 25, 2025 13:27
-
-
Save Archonic944/10ded3db054e0599833b75effe22d254 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # qtpy_esp32s2_i2s_stream.py | |
| # | |
| # Continuous 16-kHz mono recording → HTTP POST | |
| # – no sample loss, no packet loss – | |
| # tailored for Adafruit QT Py ESP32-S2 4 MB Flash / 2 MB PSRAM | |
| # | |
| import _thread, gc, time, network, usocket | |
| from machine import Pin, I2S, reset | |
| ### ---------- user settings ---------- ### | |
| SSID = "YOUR-SSID" | |
| PASSWORD = "YOUR-PASS" | |
| POST_URL = "http://143.244.173.85/mic1/upload/{epoch}?password=GreatSecurityForSending" | |
| # GPIO mapping – adjust only if you rewired the mic breakout | |
| PIN_SCK = 10 # BCK | |
| PIN_WS = 11 # LRCL | |
| PIN_SD = 15 # DATA | |
| ### ------------------------------------ ### | |
| SAMPLE_RATE = 16_000 # Hz | |
| BITS = 16 | |
| CHANNELS = 1 | |
| SECONDS_PER_CHUNK = 1 # duration of one buffer | |
| BYTES_PER_CHUNK = SAMPLE_RATE * SECONDS_PER_CHUNK * CHANNELS * (BITS // 8) | |
| NUM_BUFFERS = 4 # 4 × 32 kB = 128 kB, safe on PSRAM builds | |
| # ------------------------------------------------------------------------- | |
| # fixed objects – every allocation here happens only once at boot | |
| # ------------------------------------------------------------------------- | |
| BUFFERS = [bytearray(BYTES_PER_CHUNK) for _ in range(NUM_BUFFERS)] | |
| BUF_READY = [False] * NUM_BUFFERS | |
| BUF_COUNTER = [0] * NUM_BUFFERS | |
| LOCK = _thread.allocate_lock() | |
| # WAV header is constant because chunk size is constant | |
| def _wav_header(datasize, sr=SAMPLE_RATE, bits=BITS, ch=CHANNELS): | |
| riff = b"RIFF" + (datasize + 36).to_bytes(4, "little") + b"WAVE" | |
| fmt_blk = (b"fmt " + (16).to_bytes(4, "little") + | |
| (1).to_bytes(2, "little") + # PCM | |
| ch.to_bytes(2, "little") + | |
| sr.to_bytes(4, "little") + | |
| (sr*ch*bits//8).to_bytes(4, "little") + | |
| (ch*bits//8).to_bytes(2, "little") + | |
| bits.to_bytes(2, "little")) | |
| data = b"data" + datasize.to_bytes(4, "little") | |
| return riff + fmt_blk + data | |
| WAV_HDR = _wav_header(BYTES_PER_CHUNK) # 44 bytes | |
| HTTP_HDR = ( | |
| "POST /mic1/upload/{epoch}?password=GreatSecurityForSending HTTP/1.1\r\n" | |
| "Host: 143.244.173.85\r\n" | |
| "Content-Type: audio/wav\r\n" | |
| "Content-Length: {length}\r\n\r\n" | |
| ).encode() | |
| # ------------------------------------------------------------------------- | |
| # Wi-Fi helper – blocks until link is up | |
| # ------------------------------------------------------------------------- | |
| def wifi_connect(): | |
| wlan = network.WLAN(network.STA_IF) | |
| if not wlan.active(): | |
| wlan.active(True) | |
| if not wlan.isconnected(): | |
| wlan.connect(SSID, PASSWORD) | |
| t0 = time.time() | |
| while not wlan.isconnected(): | |
| if time.time() - t0 > 20: | |
| print("Wi-Fi timeout; resetting") | |
| reset() | |
| time.sleep_ms(200) | |
| print("Wi-Fi OK", wlan.ifconfig()) | |
| # ------------------------------------------------------------------------- | |
| # Recorder (thread 0 – foreground) | |
| # ------------------------------------------------------------------------- | |
| def record_audio(): | |
| i2s = I2S( | |
| 0, | |
| sck=Pin(PIN_SCK), ws=Pin(PIN_WS), sd=Pin(PIN_SD), | |
| mode=I2S.RX, bits=BITS, format=I2S.MONO, | |
| rate=SAMPLE_RATE, ibuf=4096 # small HW DMA buffer | |
| ) | |
| buf_idx, chunk_num = 0, 0 | |
| while True: | |
| # read one whole chunk in place | |
| mv = memoryview(BUFFERS[buf_idx]) | |
| read = 0 | |
| while read < BYTES_PER_CHUNK: # under-run guard | |
| read += i2s.readinto(mv[read:]) # never allocates | |
| # flag buffer ready | |
| with LOCK: | |
| BUF_READY[buf_idx] = True | |
| BUF_COUNTER[buf_idx] = chunk_num | |
| chunk_num += 1 | |
| # stroll to the next free slot; wait if uploader is behind | |
| buf_idx = (buf_idx + 1) % NUM_BUFFERS | |
| while BUF_READY[buf_idx]: | |
| time.sleep_ms(2) # back-pressure without dropping samples | |
| gc.collect() # keep the heap tidy | |
| # ------------------------------------------------------------------------- | |
| # Uploader (thread 1 – background) | |
| # ------------------------------------------------------------------------- | |
| def _http_post(buf, epoch): | |
| total = len(WAV_HDR) + BYTES_PER_CHUNK | |
| hdr = HTTP_HDR.replace(b"{epoch}", str(epoch).encode(), 1 | |
| ).replace(b"{length}", str(total).encode(), 1) | |
| s = usocket.socket() | |
| try: | |
| s.connect(("143.244.173.85", 80)) | |
| s.sendall(hdr) # header first | |
| s.sendall(WAV_HDR) # then 44-byte WAV header | |
| s.sendall(buf) # finally raw PCM | |
| # minimal response read – ignore body, just parse status | |
| line = s.readline() | |
| ok = line and line.startswith(b"HTTP/1.1 200") | |
| return ok | |
| finally: | |
| s.close() | |
| def upload_loop(): | |
| next_idx = 0 | |
| while True: | |
| with LOCK: | |
| ready = BUF_READY[next_idx] | |
| if not ready: | |
| next_idx = (next_idx + 1) % NUM_BUFFERS | |
| time.sleep_ms(10) | |
| continue | |
| epoch = int(time.time()) | |
| # make a zero-copy memoryview while lock is released | |
| mv = memoryview(BUFFERS[next_idx]) | |
| ok = _http_post(mv, epoch) | |
| if ok: | |
| with LOCK: | |
| BUF_READY[next_idx] = False | |
| else: | |
| # leave BUF_READY set so we retry; small delay avoids thrashing | |
| time.sleep(1) | |
| # ------------------------------------------------------------------------- | |
| # Boot sequence | |
| # ------------------------------------------------------------------------- | |
| def main(): | |
| wifi_connect() | |
| _thread.start_new_thread(upload_loop, ()) | |
| record_audio() # never returns | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment