Last active
June 11, 2025 22:59
-
-
Save sxlijin/2269b43a8f04a57f9f6d5b04f904cc5a to your computer and use it in GitHub Desktop.
ClickhouseJson has a bug around RowCursor boundaries
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| async fn test_json_table_operations() -> Result<()> { | |
| let client = clickhouse::Client::default() | |
| .with_url( | |
| std::env::var("CLICKHOUSE_HOST_STUDIO2") | |
| .context("CLICKHOUSE_HOST_STUDIO2 must be set")?, | |
| ) | |
| .with_user("default") | |
| .with_password( | |
| std::env::var("CLICKHOUSE_PASSWORD_STUDIO2") | |
| .context("CLICKHOUSE_PASSWORD_STUDIO2 must be set")?, | |
| ) | |
| .with_option("enable_json_type", "1") | |
| .with_option("schema_inference_make_json_columns_nullable", "1") | |
| // Enable inserting JSON columns as a string | |
| .with_option("input_format_binary_read_json_as_string", "1") | |
| // Enable selecting JSON columns as a string | |
| .with_option("output_format_binary_write_json_as_string", "1") | |
| .with_option("input_format_json_infer_incomplete_types_as_strings", "1") | |
| .with_option("output_format_json_quote_64bit_integers", "0") | |
| // dont cast uint64 that are quoted into actual uint64! | |
| .with_option("input_format_json_try_infer_numbers_from_strings", "0") | |
| .with_option("enable_analyzer", "1") | |
| .with_database("sam_test"); | |
| // Create database and table | |
| let ddl = r#" | |
| CREATE DATABASE IF NOT EXISTS sam_test; | |
| DROP TABLE IF EXISTS sam_test.repro2_table; | |
| CREATE TABLE sam_test.repro2_table ( | |
| id String, | |
| data Json | |
| ) ENGINE = MergeTree() | |
| ORDER BY id | |
| SETTINGS enable_json_type = 1; | |
| "#; | |
| for statement in ddl.split(";") { | |
| let statement = statement.trim(); | |
| if statement.is_empty() { | |
| continue; | |
| } | |
| tracing::info!("Executing statement: {}", statement); | |
| client.query(statement).execute().await?; | |
| } | |
| #[derive(Debug, Deserialize, Serialize)] | |
| struct MyDataStruct { | |
| name: String, | |
| value: i64, | |
| large_data: String, // Will be filled with ~500KiB of data | |
| } | |
| #[derive(Debug, Deserialize, Serialize, clickhouse::Row)] | |
| struct TestRowDataAsString { | |
| id: String, | |
| data: String, | |
| } | |
| // Generate and insert 1000 rows | |
| let mut insert = client.insert("sam_test.repro2_table")?; | |
| for i in 0..50 { | |
| let large_data = "x".repeat(30 * 1024); // 100KiB of 'x' characters | |
| let data = MyDataStruct { | |
| name: format!("item_{:05} (len: {})", i, large_data.len()), | |
| value: 5, | |
| large_data, | |
| }; | |
| let row = TestRowDataAsString { | |
| id: format!("id_{}", i), | |
| data: serde_json::to_string(&data).unwrap(), | |
| }; | |
| insert.write(&row).await?; | |
| } | |
| insert.end().await?; | |
| // Read back all rows | |
| let query = client.query("SELECT id, data FROM sam_test.repro2_table"); | |
| let rows = query.fetch_all::<TestRowDataAsString>().await?; | |
| tracing::info!("Read {} rows back from table", rows.len()); | |
| for row in rows { | |
| if row.data.len() != 102461 { | |
| tracing::info!("row data len: {}", row.data.len()); | |
| } | |
| match serde_json::from_str::<MyDataStruct>(&row.data) { | |
| Ok(data) => {} | |
| Err(e) => tracing::info!("not parsable? {}", e), | |
| } | |
| } | |
| tracing::info!("done with TestRowDataAsString"); | |
| #[derive(Debug, Deserialize, Serialize, clickhouse::Row)] | |
| struct TestRowUsingClickhouseJson { | |
| id: String, | |
| data: ClickhouseJson<MyDataStruct>, | |
| } | |
| let query = client.query("SELECT id, data FROM sam_test.repro2_table LIMIT 35"); | |
| let rows_result = query.fetch_all::<TestRowUsingClickhouseJson>().await; | |
| match rows_result { | |
| Ok(rows) => { | |
| tracing::info!("Read {} rows back from table", rows.len()); | |
| } | |
| Err(e) => { | |
| tracing::error!("Error fetching rows: {:#?}", e); | |
| return Err(e.into()); | |
| } | |
| } | |
| tracing::info!("done with TestRowUsingClickhouseJson"); | |
| let query = client.query("SELECT id, data FROM sam_test.repro2_table"); | |
| let mut rows = query.fetch_bytes("RowBinary")?; | |
| loop { | |
| let row = rows.next().await?; | |
| match row { | |
| Some(row) => { | |
| tracing::info!("row: {:#?}", row.len()); | |
| } | |
| None => { | |
| break; | |
| } | |
| } | |
| } | |
| // let rows = query.fetch_all::<TestRowUsingClickhouseJson>().await?; | |
| // tracing::info!("Read {} rows back from table", rows.len()); | |
| Ok(()) | |
| } | |
/// Newtype around a value of type `T`.
///
/// The `serde` implementations for this type in this file encode the inner value as a JSON
/// string and decode it from one.
#[derive(Debug, Clone, Copy)]
pub struct ClickhouseJson<T>(T);

impl<T> ClickhouseJson<T> {
    /// Consumes the wrapper and returns the inner value.
    pub fn value(self) -> T {
        self.0
    }

    /// Borrows the inner value.
    // Kept as an inherent method so existing callers do not need the `AsRef` trait in scope.
    #[allow(clippy::should_implement_trait)]
    pub fn as_ref(&self) -> &T {
        &self.0
    }
}
| impl<T> serde::Serialize for ClickhouseJson<T> | |
| where | |
| T: serde::Serialize, | |
| { | |
| fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> | |
| where | |
| S: serde::Serializer, | |
| { | |
| serializer.serialize_str(&serde_json::to_string(&self.0).map_err(|e| { | |
| serde::ser::Error::custom(format!( | |
| "Failed to serialize ClickhouseJson<{}> due to {e}", | |
| std::any::type_name::<T>() | |
| )) | |
| })?) | |
| } | |
| } | |
| // struct MyStringVisitor; | |
| // impl<'de> serde::de::Visitor<'de> for MyStringVisitor { | |
| // type Value = String; | |
| // fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { | |
| // formatter.write_str("an integer between -2^31 and 2^31") | |
| // } | |
| // fn visit_str<E>(self, value: &str) -> Result<Self::Value, E> | |
| // where | |
| // E: serde::de::Error, | |
| // { | |
| // Ok(value.to_string()) | |
| // } | |
| // } | |
| impl<'de, T> serde::Deserialize<'de> for ClickhouseJson<T> | |
| where | |
| T: serde::de::DeserializeOwned, | |
| { | |
| fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> | |
| where | |
| D: serde::Deserializer<'de>, | |
| { | |
| let s: String = serde::Deserialize::deserialize(deserializer).map_err(|e| { | |
| serde::de::Error::custom(format!( | |
| "Failed to deserialize ClickhouseJson<{}> in deserialize_str due to <<<{e}>>>", | |
| std::any::type_name::<T>() | |
| )) | |
| })?; | |
| let v = serde_json::from_str(&s).map_err(|e| { | |
| serde::de::Error::custom(format!( | |
| "Failed to deserialize ClickhouseJson<{}> in serde_json::from_str due to {e} \n\n s (length: {}): {s:#?}", | |
| std::any::type_name::<T>(), | |
| s.len(), | |
| )) | |
| })?; | |
| Ok(Self(v)) | |
| } | |
| } | |
| impl<T> From<T> for ClickhouseJson<T> | |
| where | |
| T: serde::Serialize, | |
| { | |
| fn from(value: T) -> Self { | |
| Self(value) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment