Skip to content

Instantly share code, notes, and snippets.

@kawasin73
Last active November 23, 2024 06:10
Show Gist options
  • Select an option

  • Save kawasin73/f8765f7581bb425d5a11aa68453c6144 to your computer and use it in GitHub Desktop.

Select an option

Save kawasin73/f8765f7581bb425d5a11aa68453c6144 to your computer and use it in GitHub Desktop.
cmp_memfd
[package]
name = "cmp_memfd"
version = "0.1.0"
edition = "2021"
[dependencies]
memmap = "*"
libc = "*"
nix = {version = "*", features = ["socket", "uio"]}
tempfile = "*"
use nix::sys::socket::{recvmsg, sendmsg, ControlMessage, ControlMessageOwned, MsgFlags};
use std::ffi::CString;
use std::fs::File;
use std::io::IoSlice;
use std::io::IoSliceMut;
use std::io::Read;
use std::io::Write;
use std::ops::BitXorAssign;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream};
use std::sync::Arc;
use std::sync::Barrier;
use std::time::Instant;
// 3MB
const DATA_LEN: usize = 3 * 1024 * 1024;
const N_DATA_PER_PRODUCER: usize = 1000;
const N_PRODUCERS: usize = 3;
const QUEUE_SIZE: usize = N_PRODUCERS * 10;
fn main() {
let barrier = Arc::new(Barrier::new(N_PRODUCERS + 1));
let allocate_sem = Arc::new(Semaphore::new("alloc_memfd", 0));
for _ in 0..QUEUE_SIZE {
allocate_sem.post();
}
let path = "/tmp/rust-unixstream-fd.sock";
let _ = std::fs::remove_file(path);
// UnixListenerを作成
let listener = UnixListener::bind(path).expect("Failed to bind socket");
let client = UnixStream::connect(path).expect("Failed to connect to socket");
let (server, _) = listener.accept().expect("Failed to accept connection");
for i in 0..N_PRODUCERS {
let barrier = barrier.clone();
let allocate_sem = allocate_sem.clone();
let client = client.try_clone().expect("Failed to clone client");
std::thread::spawn(move || {
println!("producer {i}");
producer(client, allocate_sem, barrier);
println!("producer {i} finished");
});
}
barrier.wait();
println!("start");
let start = Instant::now();
consumer(server, allocate_sem);
let elapsed = start.elapsed();
println!("finished: {:?}", elapsed);
}
fn producer(client: UnixStream, allocate_sem: Arc<Semaphore>, barrier: Arc<Barrier>) {
let mut src_data = vec![0; DATA_LEN];
File::open("/dev/urandom")
.unwrap()
.read_exact(&mut src_data)
.unwrap();
barrier.wait();
for _ in 0..N_DATA_PER_PRODUCER {
allocate_sem.wait();
let mut file = tempfile::tempfile().unwrap();
file.write_all(&src_data).unwrap();
send_fd(&client, file.as_raw_fd());
}
}
fn consumer(server: UnixStream, allocate_sem: Arc<Semaphore>) {
for _ in 0..(N_DATA_PER_PRODUCER * N_PRODUCERS) {
let fd = recv_fd(&server);
let file = unsafe { std::fs::File::from_raw_fd(fd) };
let m = unsafe { memmap::Mmap::map(&file) }.unwrap();
let mut sum = 0;
for i in 0..DATA_LEN / 8 {
let v = u64::from_ne_bytes(m[i * 8..(i + 1) * 8].try_into().expect("try_into failed"));
sum.bitxor_assign(v);
}
std::hint::black_box(sum);
allocate_sem.post();
}
}
fn send_fd(stream: &UnixStream, fd: RawFd) {
let mut buf = [0u8; 1];
let mut iov = [IoSlice::new(&mut buf)];
let fds = [fd];
let cmsg = [ControlMessage::ScmRights(&fds)];
sendmsg::<()>(stream.as_raw_fd(), &mut iov, &cmsg, MsgFlags::empty(), None)
.expect("Failed to send file descriptor");
}
fn recv_fd(stream: &UnixStream) -> RawFd {
let mut buf = [0u8; 1];
let mut iov = [IoSliceMut::new(&mut buf)];
let mut cmsgspace = nix::cmsg_space!([RawFd; 1]);
let msg = recvmsg::<()>(
stream.as_raw_fd(),
&mut iov,
Some(&mut cmsgspace),
MsgFlags::empty(),
)
.expect("Failed to receive message");
for cmsg in msg.cmsgs().unwrap() {
if let ControlMessageOwned::ScmRights(fds) = cmsg {
return fds[0];
}
}
panic!("No file descriptor received");
}
struct Semaphore {
sem_name: CString,
sem: *mut libc::sem_t,
}
unsafe impl Send for Semaphore {}
unsafe impl Sync for Semaphore {}
impl Semaphore {
fn new(name: &str, value: u32) -> Self {
let sem_name = CString::new(name).expect("CString::new failed");
let sem = unsafe { libc::sem_open(sem_name.as_ptr(), libc::O_CREAT, 0o644, value) };
if sem == libc::SEM_FAILED {
panic!("Failed to open semaphore");
}
Self { sem_name, sem }
}
fn wait(&self) {
unsafe {
libc::sem_wait(self.sem);
}
}
fn post(&self) {
unsafe {
libc::sem_post(self.sem);
}
}
}
impl Drop for Semaphore {
fn drop(&mut self) {
unsafe {
if libc::sem_close(self.sem) != 0 {
eprintln!("Failed to close semaphore");
}
}
unsafe {
if libc::sem_unlink(self.sem_name.as_ptr()) != 0 {
eprintln!("Failed to unlink semaphore");
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment