Skip to content

Instantly share code, notes, and snippets.

@sxlijin
Last active June 11, 2025 22:59
Show Gist options
  • Select an option

  • Save sxlijin/2269b43a8f04a57f9f6d5b04f904cc5a to your computer and use it in GitHub Desktop.

Select an option

Save sxlijin/2269b43a8f04a57f9f6d5b04f904cc5a to your computer and use it in GitHub Desktop.
ClickhouseJson has a bug around RowCursor boundaries
/// Reproduction for reading large ClickHouse `JSON` columns through `ClickhouseJson`.
///
/// Steps:
/// 1. Recreate `sam_test.repro2_table`.
/// 2. Insert `ROW_COUNT` rows whose `data` column is a JSON document carrying a
///    `LARGE_DATA_LEN`-byte payload.
/// 3. Read the rows back three ways:
///    - as plain strings (verifying length and parseability per row),
///    - as `ClickhouseJson<MyDataStruct>`,
///    - as raw RowBinary byte chunks.
///
/// Requires the `CLICKHOUSE_HOST_STUDIO2` and `CLICKHOUSE_PASSWORD_STUDIO2` environment
/// variables.
///
/// # Errors
/// Returns an error if either environment variable is missing, or if any ClickHouse
/// request fails. Also returns an error if a generated row cannot be serialized to JSON.
async fn test_json_table_operations() -> Result<()> {
    // Number of rows inserted into the repro table.
    const ROW_COUNT: usize = 50;
    // Bytes of 'x' stored in each row's `large_data` field (30 KiB). Presumably sized so
    // that rows straddle response-chunk boundaries when streamed back — TODO confirm.
    const LARGE_DATA_LEN: usize = 30 * 1024;

    let client = clickhouse::Client::default()
        .with_url(
            std::env::var("CLICKHOUSE_HOST_STUDIO2")
                .context("CLICKHOUSE_HOST_STUDIO2 must be set")?,
        )
        .with_user("default")
        .with_password(
            std::env::var("CLICKHOUSE_PASSWORD_STUDIO2")
                .context("CLICKHOUSE_PASSWORD_STUDIO2 must be set")?,
        )
        .with_option("enable_json_type", "1")
        .with_option("schema_inference_make_json_columns_nullable", "1")
        // Enable inserting JSON columns as a string
        .with_option("input_format_binary_read_json_as_string", "1")
        // Enable selecting JSON columns as a string
        .with_option("output_format_binary_write_json_as_string", "1")
        .with_option("input_format_json_infer_incomplete_types_as_strings", "1")
        .with_option("output_format_json_quote_64bit_integers", "0")
        // dont cast uint64 that are quoted into actual uint64!
        .with_option("input_format_json_try_infer_numbers_from_strings", "0")
        .with_option("enable_analyzer", "1")
        .with_database("sam_test");

    // Recreate the database and table from scratch. The HTTP interface runs one statement
    // per request, so the script is split on ';' and the statements are executed in order.
    let ddl = r#"
CREATE DATABASE IF NOT EXISTS sam_test;
DROP TABLE IF EXISTS sam_test.repro2_table;
CREATE TABLE sam_test.repro2_table (
id String,
data Json
) ENGINE = MergeTree()
ORDER BY id
SETTINGS enable_json_type = 1;
"#;
    for statement in ddl.split(';').map(str::trim).filter(|s| !s.is_empty()) {
        tracing::info!("Executing statement: {}", statement);
        client.query(statement).execute().await?;
    }

    #[derive(Debug, Deserialize, Serialize)]
    struct MyDataStruct {
        name: String,
        value: i64,
        large_data: String, // LARGE_DATA_LEN bytes of 'x'
    }

    #[derive(Debug, Deserialize, Serialize, clickhouse::Row)]
    struct TestRowDataAsString {
        id: String,
        data: String,
    }

    // Generate and insert ROW_COUNT rows. The length of each JSON document sent is
    // recorded per id, so the read-back below can detect truncated or altered payloads.
    let mut sent_lens = std::collections::HashMap::with_capacity(ROW_COUNT);
    let mut insert = client.insert("sam_test.repro2_table")?;
    for i in 0..ROW_COUNT {
        let large_data = "x".repeat(LARGE_DATA_LEN);
        let data = MyDataStruct {
            name: format!("item_{:05} (len: {})", i, large_data.len()),
            value: 5,
            large_data,
        };
        let row = TestRowDataAsString {
            id: format!("id_{}", i),
            data: serde_json::to_string(&data)?,
        };
        sent_lens.insert(row.id.clone(), row.data.len());
        insert.write(&row).await?;
    }
    insert.end().await?;

    // Read back all rows as plain strings and sanity-check each one.
    let rows = client
        .query("SELECT id, data FROM sam_test.repro2_table")
        .fetch_all::<TestRowDataAsString>()
        .await?;
    tracing::info!("Read {} rows back from table", rows.len());
    for row in &rows {
        let expected_len = sent_lens.get(&row.id).copied();
        if expected_len != Some(row.data.len()) {
            tracing::warn!(
                "row {}: data len {} (sent {:?})",
                row.id,
                row.data.len(),
                expected_len
            );
        }
        match serde_json::from_str::<MyDataStruct>(&row.data) {
            Ok(data) if data.large_data.len() != LARGE_DATA_LEN => tracing::warn!(
                "row {}: large_data len {} (expected {})",
                row.id,
                data.large_data.len(),
                LARGE_DATA_LEN
            ),
            Ok(_) => {}
            Err(e) => tracing::warn!("row {}: not parsable? {}", row.id, e),
        }
    }
    tracing::info!("done with TestRowDataAsString");

    #[derive(Debug, Deserialize, Serialize, clickhouse::Row)]
    struct TestRowUsingClickhouseJson {
        id: String,
        data: ClickhouseJson<MyDataStruct>,
    }

    // Read rows through the ClickhouseJson wrapper; log the full error before propagating.
    let rows = client
        .query("SELECT id, data FROM sam_test.repro2_table LIMIT 35")
        .fetch_all::<TestRowUsingClickhouseJson>()
        .await
        .map_err(|e| {
            tracing::error!("Error fetching rows: {:#?}", e);
            e
        })?;
    tracing::info!("Read {} rows back from table", rows.len());
    tracing::info!("done with TestRowUsingClickhouseJson");

    // Stream the raw RowBinary response. `fetch_bytes` yields response chunks, not rows,
    // so each log line reports the size of one chunk.
    let mut chunks = client
        .query("SELECT id, data FROM sam_test.repro2_table")
        .fetch_bytes("RowBinary")?;
    while let Some(chunk) = chunks.next().await? {
        tracing::info!("chunk: {} bytes", chunk.len());
    }

    Ok(())
}
/// Newtype around a value `T` that is carried as a JSON document encoded in a string.
///
/// It is meant for ClickHouse `JSON` columns when the client is configured to read and
/// write them as strings. That configuration is assumed here — TODO confirm against the
/// client options in use.
#[derive(Debug, Clone, Copy)]
pub struct ClickhouseJson<T>(T);

impl<T> ClickhouseJson<T> {
    /// Consumes the wrapper and returns the wrapped value.
    pub fn value(self) -> T {
        self.0
    }

    /// Borrows the wrapped value.
    ///
    /// This is kept as an inherent method for existing callers. The same access is also
    /// available generically through the `AsRef<T>` impl below.
    pub fn as_ref(&self) -> &T {
        &self.0
    }
}

impl<T> AsRef<T> for ClickhouseJson<T> {
    fn as_ref(&self) -> &T {
        &self.0
    }
}
impl<T> serde::Serialize for ClickhouseJson<T>
where
T: serde::Serialize,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&serde_json::to_string(&self.0).map_err(|e| {
serde::ser::Error::custom(format!(
"Failed to serialize ClickhouseJson<{}> due to {e}",
std::any::type_name::<T>()
))
})?)
}
}
// struct MyStringVisitor;
// impl<'de> serde::de::Visitor<'de> for MyStringVisitor {
// type Value = String;
// fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
// formatter.write_str("an integer between -2^31 and 2^31")
// }
// fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
// where
// E: serde::de::Error,
// {
// Ok(value.to_string())
// }
// }
impl<'de, T> serde::Deserialize<'de> for ClickhouseJson<T>
where
T: serde::de::DeserializeOwned,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s: String = serde::Deserialize::deserialize(deserializer).map_err(|e| {
serde::de::Error::custom(format!(
"Failed to deserialize ClickhouseJson<{}> in deserialize_str due to <<<{e}>>>",
std::any::type_name::<T>()
))
})?;
let v = serde_json::from_str(&s).map_err(|e| {
serde::de::Error::custom(format!(
"Failed to deserialize ClickhouseJson<{}> in serde_json::from_str due to {e} \n\n s (length: {}): {s:#?}",
std::any::type_name::<T>(),
s.len(),
))
})?;
Ok(Self(v))
}
}
impl<T> From<T> for ClickhouseJson<T>
where
T: serde::Serialize,
{
fn from(value: T) -> Self {
Self(value)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment