Skip to content

Instantly share code, notes, and snippets.

@westonpace
Forked from akhildevelops/main.rs
Created August 8, 2025 12:47
Show Gist options
  • Select an option

  • Save westonpace/f7fc7bebb3deaeb6e187fdb9d2e2a7f4 to your computer and use it in GitHub Desktop.

Select an option

Save westonpace/f7fc7bebb3deaeb6e187fdb9d2e2a7f4 to your computer and use it in GitHub Desktop.
Arrow vs Lance
// WriteOps for 10GB
//// Observed:
//// Arrow write duration: 35.340100258s
//// Lance write duration: 112.428478707s
//ReadOps for 10GB:
//// Observed:
//// Arrow read: 2.873216313s
//// Lance read: 4.404420162s
//// Dependencies in Cargo.toml
// [package]
// name = "benchmark"
// version = "0.1.0"
// edition = "2024"
// [dependencies]
// arrow = "55"
// flatbuffers = "25.2.10"
// futures = "0.3.31"
// lance = "0.32.0"
// tokio = { version = "1.47.1", features = ["rt-multi-thread"] }
// src/main.rs
#![allow(unused)]
use std::fs::File;
use std::mem::MaybeUninit;
use std::path::PathBuf;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
use arrow::array::Array;
use arrow::array::Float32Builder;
use arrow::array::Int32Array;
use arrow::array::Int32Builder;
use arrow::array::LargeBinaryBuilder;
use arrow::array::LargeStringBuilder;
use arrow::array::RecordBatch;
use arrow::array::RecordBatchIterator;
// Total amount of data (10 GiB) the generator produces before stopping.
const TOTAL_BYTES: usize = 10 * 1024 * 1024 * 1024;
use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use arrow::datatypes::Schema;
use arrow::error::ArrowError;
use arrow::ipc::reader::FileReader;
use arrow::ipc::writer::FileWriter;
use futures::StreamExt;
use lance::Dataset;
use tokio::runtime;
/// Minimal xorshift32 PRNG (Marsaglia's 13/17/5 variant), kept
/// dependency-free so the benchmark's data generation is fully
/// transparent and reproducible.
#[derive(Clone)]
pub struct Rng {
    // Current generator state; must never be zero (0 is a fixed point
    // of the xorshift transform and would yield 0 forever).
    state: u32,
}

impl Rng {
    /// Creates a generator seeded with `seed`.
    ///
    /// # Panics
    /// Panics if `seed` is zero — a zero seed silently degenerates the
    /// original implementation into an all-zero stream.
    fn init(seed: u32) -> Self {
        assert!(seed != 0, "xorshift32 requires a non-zero seed");
        Self { state: seed }
    }
}

impl Iterator for Rng {
    type Item = u32;

    /// Advances the state with the classic xorshift triple (<<13, >>17, <<5)
    /// and returns it. The stream is infinite: this never returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        self.state ^= self.state << 13;
        self.state ^= self.state >> 17;
        self.state ^= self.state << 5;
        Some(self.state)
    }
}
// Size in bytes of the scratch buffer used when synthesizing strings.
const BUFFER_LEN: usize = 4096;
// Approximate target size (1 GiB) of a single generated RecordBatch.
const RB_SIZE: usize = 1024 * 1024 * 1024 * 1;
// Fixed RNG seed so every run generates identical data.
const SEED: u32 = 139849023;
// Generator of synthetic Arrow RecordBatches; see the `Iterator` impl for
// the batch layout. Stops once roughly TOTAL_BYTES have been produced.
#[derive(Clone)]
struct RBGen {
// Deterministic xorshift RNG driving all generated values.
r: Rng,
// Running total of (approximate) bytes emitted so far.
total_bytes: usize,
// Schema shared by every produced batch (int32, f32, string, blob).
schema: Schema,
}
impl Iterator for RBGen {
    type Item = Result<RecordBatch, ArrowError>;

    /// Builds one ~`RB_SIZE` RecordBatch of synthetic data, or returns
    /// `None` once roughly `TOTAL_BYTES` worth of batches have been emitted.
    ///
    /// Exactly one RNG draw is made per batch; every row in a batch reuses
    /// that single `value`, so rows within one batch are identical.
    fn next(&mut self) -> Option<Self::Item> {
        if self.total_bytes >= TOTAL_BYTES {
            return None;
        }
        let mut int32_builder = Int32Builder::new();
        let mut f32_builder = Float32Builder::new();
        let mut str_builder = LargeStringBuilder::new();
        let mut blob_builder = LargeBinaryBuilder::new();
        let value = self.r.next().unwrap();
        let mut rb_size = 0;
        // Alternate real values and nulls, starting with a real value.
        // (The original called this flag `is_null`, but its meaning was
        // inverted: `true` appended a value.)
        let mut append_value = true;
        // Scratch buffer, hoisted out of the loops: it is fully overwritten
        // on every fill pass below, so reuse does not change the output.
        let mut string_buffer = vec![0x30_u8; BUFFER_LEN];
        while rb_size < RB_SIZE {
            if append_value {
                int32_builder.append_value(value as i32);
                f32_builder.append_value(f32::from_bits(value));
            } else {
                int32_builder.append_null();
                f32_builder.append_null();
            }
            append_value = !append_value;
            rb_size += std::mem::size_of::<u32>();
            rb_size += std::mem::size_of::<f32>();
            // Build a printable-ASCII string: ~1 GiB for even `value`,
            // otherwise one BUFFER_LEN chunk.
            // NOTE(review): the inner loop rounds the length up to a multiple
            // of BUFFER_LEN, so the nominal "1 char" case actually emits 4096
            // chars — preserved as-is since the published timings depend on it.
            let mut written_chars: u32 = 0;
            let u8_value = (value % 256) as u8;
            let n_chars = if value % 2 == 0 { 1024 * 1024 * 1024 } else { 1 };
            // Clamp the starting byte into the printable range 0x21..=0x7E.
            let mut initial_char = u8_value.clamp(0x21, 0x7E);
            let mut string = String::with_capacity(n_chars as usize);
            while written_chars < n_chars {
                // Fill the buffer with consecutive printable ASCII bytes,
                // wrapping from 0x7E back to 0x21.
                for byte in string_buffer.iter_mut() {
                    *byte = initial_char;
                    initial_char += 1;
                    if initial_char == 0x7F {
                        initial_char = 0x21;
                    }
                }
                written_chars += string_buffer.len() as u32;
                // Every byte is in 0x21..=0x7E, so this is always valid
                // UTF-8; the checked conversion replaces the original
                // `unsafe { str::from_boxed_utf8_unchecked(..) }`.
                string.push_str(
                    str::from_utf8(&string_buffer).expect("printable ASCII is valid UTF-8"),
                );
            }
            str_builder.append_value(&string);
            rb_size += string.len();
            blob_builder.append_value(&string);
            rb_size += string.len();
        }
        self.total_bytes += rb_size;
        let int32_arr = int32_builder.finish();
        let f32_arr = f32_builder.finish();
        let string_arr = str_builder.finish();
        let blob_arr = blob_builder.finish();
        let rb = RecordBatch::try_new(
            Arc::new(self.schema.clone()),
            vec![
                Arc::new(int32_arr),
                Arc::new(f32_arr),
                Arc::new(string_arr),
                Arc::new(blob_arr),
            ],
        )
        .unwrap();
        Some(Ok(rb))
    }
}
/// Generates ~10 GiB of synthetic data and writes it twice — once as an
/// Arrow IPC file (`./blob`) and once as a Lance dataset (`./blob.lance`) —
/// printing the wall-clock duration of each write.
fn write_arrow_lance() {
    // Custom random generator for transparency (see `Rng`).
    let r = Rng::init(SEED);
    // Four different datatype columns, all nullable:
    // int32, f32, LargeUtf8 and LargeBinary.
    let int32_field = Field::new("int32", DataType::Int32, true);
    let f32_field = Field::new("f32", DataType::Float32, true);
    let string_field = Field::new("string", DataType::LargeUtf8, true);
    let blob_field = Field::new("blob", DataType::LargeBinary, true);
    let schema = Schema::new(vec![int32_field, f32_field, string_field, blob_field]);
    // Generates RecordBatches until TOTAL_BYTES (10 GiB) is reached.
    let rb_gen = RBGen {
        r,
        total_bytes: 0,
        schema: schema.clone(),
    };
    // --- Arrow write ---
    let mut file = File::create("./blob").unwrap();
    let mut fw = FileWriter::try_new(&mut file, &schema).unwrap();
    let arrow_file_start = std::time::Instant::now();
    for rb in rb_gen.clone() {
        fw.write(&rb.unwrap()).unwrap();
    }
    fw.finish().unwrap();
    println!("Arrow write duration: {:?}", arrow_file_start.elapsed());
    // --- Lance write ---
    let rb_iter = RecordBatchIterator::new(rb_gen, Arc::new(schema));
    // Lance writes a directory; remove any stale dataset from a prior run.
    let lance_file = PathBuf::from_str("./blob.lance").unwrap();
    if lance_file.exists() {
        std::fs::remove_dir_all(&lance_file).unwrap();
    }
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    let lance_file_start = std::time::Instant::now();
    rt.block_on(Dataset::write(rb_iter, "./blob.lance", None))
        .unwrap();
    println!("Lance write duration: {:?}", lance_file_start.elapsed());
}
/// Reads back the Arrow IPC file `./blob` batch by batch, touching each
/// batch (row count, memory size, null count, int32 column sum) so the
/// read work cannot be skipped.
fn read_arrow() {
    let file = File::open("./blob").unwrap();
    let fr = FileReader::try_new(file, None).unwrap();
    // `FileReader` implements `Iterator<Item = Result<RecordBatch, _>>`;
    // an idiomatic `for` replaces the original `loop { if let .. else break }`.
    for rb_result in fr {
        let rb = rb_result.unwrap();
        let n_rows = rb.num_rows();
        let size = rb.get_array_memory_size();
        let col = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
        let nulls = col.null_count();
        let value = arrow::compute::sum(col).unwrap();
        // Uncomment to inspect per-batch stats (kept out of the timed path):
        // println!("{},{},{},{}", n_rows, size, value, nulls);
    }
}
/// Reads back the Lance dataset `./blob.lance` via a scan stream, touching
/// each batch (row count, memory size, null count, int32 column sum) so the
/// read work cannot be skipped.
fn read_lance() {
    let block = async {
        let dataset = Dataset::open("./blob.lance").await.unwrap();
        let scanner = dataset.scan();
        let mut stream = scanner.try_into_stream().await.unwrap();
        // Idiomatic `while let` replaces the original
        // `loop { if let .. else break }` shape.
        while let Some(rb_result) = stream.next().await {
            let rb = rb_result.unwrap();
            let n_rows = rb.num_rows();
            let size = rb.get_array_memory_size();
            let col = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
            let nulls = col.null_count();
            let value = arrow::compute::sum(col).unwrap();
            // Uncomment to inspect per-batch stats (kept out of the timed path):
            // println!("{},{},{},{}", n_rows, size, value, nulls);
        }
    };
    let rt = runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(block);
}
/// Entry point: produce both datasets from the same source, then time a
/// full read of each.
fn main() {
    // Write the Arrow file and Lance dataset. Observed on the author's box:
    //   Arrow write duration: 35.340100258s
    //   Lance write duration: 112.428478707s
    write_arrow_lance();
    // Time the reads. (Per-batch timing prints inside the readers are
    // commented out so they don't interfere with the measured operation.)
    // Observed:
    //   Arrow read: 2.873216313s
    //   Lance read: 4.404420162s
    println!("==========ARROW READ===========");
    let started = std::time::Instant::now();
    read_arrow();
    println!("{:?}", started.elapsed());
    println!("==========LANCE READ===========");
    let started = std::time::Instant::now();
    read_lance();
    println!("{:?}", started.elapsed());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment