Skip to content

Instantly share code, notes, and snippets.

@katagaki
Created July 31, 2025 04:17
Show Gist options
  • Select an option

  • Save katagaki/9ca71d4b2443df952a73330062329b1b to your computer and use it in GitHub Desktop.

Select an option

Save katagaki/9ca71d4b2443df952a73330062329b1b to your computer and use it in GitHub Desktop.
Execute q queries using PyKX from a threaded application such as Flask or FastAPI
import os
from concurrent.futures import ProcessPoolExecutor, Future
from io import BytesIO
from pandas import DataFrame, read_parquet
# Requires installation of pyarrow as well
def run_query_in_process(query: str) -> bytes:
    """Run a q query over a synchronous PyKX connection and return parquet bytes.

    Meant to be executed in a separate worker process: ``pykx`` is imported
    inside the function rather than at module level, and the result is
    serialised to parquet so that plain ``bytes`` are returned.

    Connection settings may be overridden with the environment variables
    ``KDB_HOST``, ``KDB_PORT``, ``KDB_USERNAME`` and ``KDB_PASSWORD``. When a
    variable is unset, the original built-in value is used, so behaviour is
    unchanged by default.

    Args:
        query: q expression to send to the server.

    Returns:
        The query result converted with ``.pd()`` and then ``.to_parquet()``.

    Raises:
        ValueError: If ``KDB_PORT`` is set to something that is not an integer.
    """
    # Resolve the connection settings first so a malformed port fails fast,
    # before any connection attempt is made.
    host = os.environ.get("KDB_HOST", "data.kivotos-finance.com")
    port = int(os.environ.get("KDB_PORT", "5000"))
    username = os.environ.get("KDB_USERNAME", "SShiroko")
    password = os.environ.get("KDB_PASSWORD", "ILoveBanks")

    # Import pykx here to avoid import failure when running in thread.
    # The flag is set before the import so it is in place when pykx loads.
    os.environ["PYKX_UNLICENSED"] = "true"
    import pykx as kx

    with kx.SyncQConnection(
        host=host,
        port=port,
        username=username,
        password=password,
        tls=True,
    ) as q_connection:
        # NOTE(review): assumes the query yields a result whose .pd() form
        # offers to_parquet() (i.e. a DataFrame) -- confirm for non-table
        # results such as atoms or lists.
        frame = q_connection(query).pd()
        # With no path argument, to_parquet() returns the encoded bytes.
        return frame.to_parquet()
def run_query(query: str, timeout: float = 60) -> DataFrame:
    """Execute a q query in a one-off worker process and return a DataFrame.

    The query is handed to ``run_query_in_process`` through a single-worker
    ``ProcessPoolExecutor``; the parquet bytes it returns are parsed back
    into a DataFrame in the calling process.

    Args:
        query: q expression to execute.
        timeout: Maximum number of seconds to wait for the worker's result.
            Defaults to 60, the value that was previously hard-coded.

    Returns:
        The query result as a pandas DataFrame.

    Raises:
        concurrent.futures.TimeoutError: If no result arrives within
            ``timeout`` seconds.
        Exception: Any exception raised inside the worker is re-raised here
            by ``Future.result``.
    """
    executor = ProcessPoolExecutor(max_workers=1)
    try:
        future: Future = executor.submit(run_query_in_process, query)
        parquet_bytes: bytes = future.result(timeout=timeout)
    finally:
        # Do not block on shutdown. Leaving a ``with`` block calls
        # shutdown(wait=True), which would keep the caller waiting for a
        # still-running query even after the timeout has fired. With
        # wait=False the executor's resources are released once the worker
        # finishes on its own.
        executor.shutdown(wait=False)

    # Parse the result back into a DataFrame from parquet bytes
    return read_parquet(BytesIO(parquet_bytes))
if __name__ == "__main__":
    # Demo entry point. The same run_query call can be made from inside a
    # threaded application (Flask, FastAPI, etc).
    print(run_query(".z.l"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment