Last active
November 23, 2024 06:10
-
-
Save kawasin73/f8765f7581bb425d5a11aa68453c6144 to your computer and use it in GitHub Desktop.
cmp_memfd
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| [package] | |
| name = "cmp_memfd" | |
| version = "0.1.0" | |
| edition = "2021" | |
| [dependencies] | |
| memmap = "*" | |
| libc = "*" | |
| nix = {version = "*", features = ["socket", "uio"]} | |
| tempfile = "*" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use nix::sys::socket::{recvmsg, sendmsg, ControlMessage, ControlMessageOwned, MsgFlags}; | |
| use std::ffi::CString; | |
| use std::fs::File; | |
| use std::io::IoSlice; | |
| use std::io::IoSliceMut; | |
| use std::io::Read; | |
| use std::io::Write; | |
| use std::ops::BitXorAssign; | |
| use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; | |
| use std::os::unix::net::{UnixListener, UnixStream}; | |
| use std::sync::Arc; | |
| use std::sync::Barrier; | |
| use std::time::Instant; | |
| // 3MB | |
| const DATA_LEN: usize = 3 * 1024 * 1024; | |
| const N_DATA_PER_PRODUCER: usize = 1000; | |
| const N_PRODUCERS: usize = 3; | |
| const QUEUE_SIZE: usize = N_PRODUCERS * 10; | |
| fn main() { | |
| let barrier = Arc::new(Barrier::new(N_PRODUCERS + 1)); | |
| let allocate_sem = Arc::new(Semaphore::new("alloc_memfd", 0)); | |
| for _ in 0..QUEUE_SIZE { | |
| allocate_sem.post(); | |
| } | |
| let path = "/tmp/rust-unixstream-fd.sock"; | |
| let _ = std::fs::remove_file(path); | |
| // UnixListenerを作成 | |
| let listener = UnixListener::bind(path).expect("Failed to bind socket"); | |
| let client = UnixStream::connect(path).expect("Failed to connect to socket"); | |
| let (server, _) = listener.accept().expect("Failed to accept connection"); | |
| for i in 0..N_PRODUCERS { | |
| let barrier = barrier.clone(); | |
| let allocate_sem = allocate_sem.clone(); | |
| let client = client.try_clone().expect("Failed to clone client"); | |
| std::thread::spawn(move || { | |
| println!("producer {i}"); | |
| producer(client, allocate_sem, barrier); | |
| println!("producer {i} finished"); | |
| }); | |
| } | |
| barrier.wait(); | |
| println!("start"); | |
| let start = Instant::now(); | |
| consumer(server, allocate_sem); | |
| let elapsed = start.elapsed(); | |
| println!("finished: {:?}", elapsed); | |
| } | |
| fn producer(client: UnixStream, allocate_sem: Arc<Semaphore>, barrier: Arc<Barrier>) { | |
| let mut src_data = vec![0; DATA_LEN]; | |
| File::open("/dev/urandom") | |
| .unwrap() | |
| .read_exact(&mut src_data) | |
| .unwrap(); | |
| barrier.wait(); | |
| for _ in 0..N_DATA_PER_PRODUCER { | |
| allocate_sem.wait(); | |
| let mut file = tempfile::tempfile().unwrap(); | |
| file.write_all(&src_data).unwrap(); | |
| send_fd(&client, file.as_raw_fd()); | |
| } | |
| } | |
| fn consumer(server: UnixStream, allocate_sem: Arc<Semaphore>) { | |
| for _ in 0..(N_DATA_PER_PRODUCER * N_PRODUCERS) { | |
| let fd = recv_fd(&server); | |
| let file = unsafe { std::fs::File::from_raw_fd(fd) }; | |
| let m = unsafe { memmap::Mmap::map(&file) }.unwrap(); | |
| let mut sum = 0; | |
| for i in 0..DATA_LEN / 8 { | |
| let v = u64::from_ne_bytes(m[i * 8..(i + 1) * 8].try_into().expect("try_into failed")); | |
| sum.bitxor_assign(v); | |
| } | |
| std::hint::black_box(sum); | |
| allocate_sem.post(); | |
| } | |
| } | |
| fn send_fd(stream: &UnixStream, fd: RawFd) { | |
| let mut buf = [0u8; 1]; | |
| let mut iov = [IoSlice::new(&mut buf)]; | |
| let fds = [fd]; | |
| let cmsg = [ControlMessage::ScmRights(&fds)]; | |
| sendmsg::<()>(stream.as_raw_fd(), &mut iov, &cmsg, MsgFlags::empty(), None) | |
| .expect("Failed to send file descriptor"); | |
| } | |
| fn recv_fd(stream: &UnixStream) -> RawFd { | |
| let mut buf = [0u8; 1]; | |
| let mut iov = [IoSliceMut::new(&mut buf)]; | |
| let mut cmsgspace = nix::cmsg_space!([RawFd; 1]); | |
| let msg = recvmsg::<()>( | |
| stream.as_raw_fd(), | |
| &mut iov, | |
| Some(&mut cmsgspace), | |
| MsgFlags::empty(), | |
| ) | |
| .expect("Failed to receive message"); | |
| for cmsg in msg.cmsgs().unwrap() { | |
| if let ControlMessageOwned::ScmRights(fds) = cmsg { | |
| return fds[0]; | |
| } | |
| } | |
| panic!("No file descriptor received"); | |
| } | |
| struct Semaphore { | |
| sem_name: CString, | |
| sem: *mut libc::sem_t, | |
| } | |
| unsafe impl Send for Semaphore {} | |
| unsafe impl Sync for Semaphore {} | |
| impl Semaphore { | |
| fn new(name: &str, value: u32) -> Self { | |
| let sem_name = CString::new(name).expect("CString::new failed"); | |
| let sem = unsafe { libc::sem_open(sem_name.as_ptr(), libc::O_CREAT, 0o644, value) }; | |
| if sem == libc::SEM_FAILED { | |
| panic!("Failed to open semaphore"); | |
| } | |
| Self { sem_name, sem } | |
| } | |
| fn wait(&self) { | |
| unsafe { | |
| libc::sem_wait(self.sem); | |
| } | |
| } | |
| fn post(&self) { | |
| unsafe { | |
| libc::sem_post(self.sem); | |
| } | |
| } | |
| } | |
| impl Drop for Semaphore { | |
| fn drop(&mut self) { | |
| unsafe { | |
| if libc::sem_close(self.sem) != 0 { | |
| eprintln!("Failed to close semaphore"); | |
| } | |
| } | |
| unsafe { | |
| if libc::sem_unlink(self.sem_name.as_ptr()) != 0 { | |
| eprintln!("Failed to unlink semaphore"); | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment