Created
July 31, 2025 04:17
-
-
Save katagaki/9ca71d4b2443df952a73330062329b1b to your computer and use it in GitHub Desktop.
Execute Q queries using PyKx from a threaded application such as Flask or FastAPI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| from concurrent.futures import ProcessPoolExecutor, Future | |
| from io import BytesIO | |
| from pandas import DataFrame, read_parquet | |
| # Requires installation of pyarrow as well | |
| def run_query_in_process(query: str) -> bytes: | |
| # Import pykx here to avoid import failure when running in thread | |
| os.environ["PYKX_UNLICENSED"] = "true" | |
| import pykx as kx | |
| with kx.SyncQConnection( | |
| host="data.kivotos-finance.com", port=5000, | |
| username="SShiroko", password="ILoveBanks", tls=True | |
| ) as q_connection: | |
| # Return the connection result as a DataFrame converted to parquet bytes | |
| return ( | |
| q_connection(query) | |
| .pd() | |
| .to_parquet() | |
| ) | |
| def run_query(query: str) -> DataFrame: | |
| with ProcessPoolExecutor(max_workers=1) as executor: | |
| future: Future = executor.submit(run_query_in_process, query) | |
| future_result: bytes = future.result(timeout=60) | |
| result_buffer = BytesIO(future_result) | |
| # Parse the result back into a DataFrame from parquet bytes | |
| result: DataFrame = read_parquet(result_buffer) | |
| return result | |
| if __name__ == "__main__": | |
| # This code can be executed in a threaded application (Flask, FastAPI, etc) | |
| zl_dataframe: DataFrame = run_query(".z.l") | |
| print(zl_dataframe) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment