Synchronizing concurrent slurm jobs using advisory file locks
| """Synchronize slurm jobs using advisory file locks (flock). | |
| This module provides functionality for synchronizing concurrent work across | |
| multiple jobs running on multiple nodes in a slurm cluster. Synchronization is | |
| achieved using advisory file locks, or flock. This module is particularly useful | |
| for easily-parallelizable jobs that divide a large problem into chunks that can | |
| be processed independently. | |
| To use an flock like a mutex: | |
| >>> from slurm_sync import LockFile | |
| >>> with LockFile('/path/to/lock_file'): | |
| >>> print('Do mutually exclusive things.') | |
| For more advanced synchronization using chunks: | |
| >>> from slurm_sync import ChunkFile | |
| >>> | |
| >>> # Reserve a chunk to work on. | |
| >>> # ChunkFile acquires a lock similar to LockFile. | |
| >>> my_chunk = 42 # Plan to do work on chunk 42. | |
| >>> with ChunkFile('/path/to/lock_file') as chunk_file: | |
| >>> if my_chunk in chunk_file.all_chunks: | |
| >>> exit() | |
| >>> chunk_file.my_chunks.add(my_chunk) | |
| >>> # Read in data to do work on my chunk. | |
| >>> | |
| >>> # Do work on my chunk. | |
| >>> | |
| >>> # Save the results of my work and release the chunk. | |
| >>> with ChunkFile('/path/to/lock_file') as chunk_file: | |
| >>> chunk_file.my_chunks.remove(my_chunk) | |
| >>> # Write out results of my work. | |
| See the bottom of this file for a fully-featured working example. | |
| Nota bene: CephFS and NFS have global flock by default. BeeGFS also supports | |
| flock, but it is local to each node by default and therefore unsuitable for | |
| synchronizing activity across nodes. Check /etc/beegfs/beegfs-client.conf and | |
| verify that tuneUseGlobalFileLocks is true. | |
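
For example, on a typical BeeGFS client the relevant line of the config file
(the exact file location and surrounding defaults may differ between
installations) should read:

    tuneUseGlobalFileLocks = true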
| """ | |
| import fcntl | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import typing | |
| SLURM_JOB_ID = int(os.getenv('SLURM_JOB_ID')) | |
| TIME_LIMIT_SECONDS = None | |

def time_limit_seconds() -> int:
    """Return the slurm time limit for the currently running job, in seconds.
| """ | |
| # Ask slurm how much time was allocated. | |
| time_limit_str = subprocess.run(['squeue', '-j', str(SLURM_JOB_ID), '-h', '--Format', 'TimeLimit'], stdout=subprocess.PIPE).stdout.splitlines()[0].strip().decode() | |
| # Parse into seconds. | |
| match = re.search('([0-9]+-)?([0-9]+):([0-9]+):([0-9]+)', time_limit_str) | |
| time_limit = 0 | |
| # Days | |
| if match.group(1): | |
| time_limit += int(match.group(1)[0:-1]) * 86400 | |
| # Hours | |
| time_limit += int(match.group(2)) * 3600 | |
| # Minutes | |
| time_limit += int(match.group(3)) * 60 | |
| # Seconds | |
| time_limit += int(match.group(4)) | |
| return time_limit | |
| def _init_time_limit_seconds(): | |
| global TIME_LIMIT_SECONDS | |
| if TIME_LIMIT_SECONDS is None: | |
| TIME_LIMIT_SECONDS = time_limit_seconds() | |
| _init_time_limit_seconds() | |
| def running_jobs() -> set[int]: | |
| """Return an integer set of the currently running slurm jobs. | |
| """ | |
| return {int(jobid.strip().decode()) for jobid in subprocess.run(['squeue', '-h', '--Format', 'JOBID'], stdout=subprocess.PIPE).stdout.splitlines()} | |

class JobChunk:
    """A combination of a SLURM_JOB_ID and a set of chunks reserved by that job.

    Attributes:
        slurm_job_id: The slurm job id (as an int).
        chunks: An integer set of chunks the slurm job is working on.
    """
    def __init__(self, slurm_job_id: int = 0, chunks: typing.Iterable[int] = []):
        self.slurm_job_id = slurm_job_id
        self.chunks = {chunk for chunk in chunks}

    def to_json(self) -> str:
        """Serialize the JobChunk to a json string.

        Example output:
            '{"SLURM_JOB_ID": 12345, "chunks": [6, 7, 8]}'
        """
        return json.dumps({'SLURM_JOB_ID': self.slurm_job_id, 'chunks': list(self.chunks)})

    @staticmethod
    def from_json(json_data: str | dict) -> 'JobChunk':
        """Parse a JobChunk from a string of json data.
        See to_json().
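
        Example (round-trip with the format shown in to_json()):

        >>> jc = JobChunk.from_json('{"SLURM_JOB_ID": 12345, "chunks": [6, 7, 8]}')
        >>> jc.slurm_job_id
        12345
        >>> sorted(jc.chunks)
        [6, 7, 8]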
| """ | |
| if isinstance(json_data, str): | |
| json_data = json.loads(json_data) | |
| return JobChunk(json_data['SLURM_JOB_ID'], json_data['chunks']) | |
| class _ChunkFile: | |
| _PRIVATE_CONSTRUCTOR_KEY = object() | |
| def __init__(self, private_constructor, file: os.PathLike | typing.IO): | |
| """Private constructor for _ChunkFile. | |
| Do not instantiate a _ChunkFile directly. Instead, use the ChunkFile | |
| context manager. | |
| Example: | |
| >>> with ChunkFile('/path/to/lock_file') as chunk_file: | |
| >>> print(chunk_file.chunks) | |
| Attributes: | |
| chunks: A dictionary mapping slurm job id to a set of chunks. | |
| """ | |
| # Discourage calling the constructor directly. | |
| if private_constructor is not _ChunkFile._PRIVATE_CONSTRUCTOR_KEY: | |
| raise ValueError('Do not instantiate _ChunkFile directly; use a context manager instead.') | |
| # Check if the file is path-like or IO-like. | |
| if hasattr(file, 'readlines') and callable(file.readlines): | |
| # The file is IO-like and supports readlines. | |
| self._file = file | |
| else: | |
| # The file is path-like, open it. | |
| self._file = open(file, 'r+') | |
| # Verify the file is open in the correct mode. | |
| assert not self._file.closed | |
| assert self._file.mode == 'r+' | |
| # Parse the contents of the file. | |
| # Read the first line. | |
| line1 = self._file.readline() | |
| # Ensure the file is empty or the format string is correct. | |
| assert line1 == '' or line1 == 'CHUNKFILE 1.0\n' | |
| # Make a dictionary whose keys are the SLURM_JOB_ID | |
| # and whose values are the set of chunks reserved by each job. | |
| self.job_chunks = {job_chunk.slurm_job_id: job_chunk.chunks for job_chunk in [JobChunk.from_json(line) for line in self._file.readlines()]} | |

    def cleanup(self):
        """Remove chunk file entries for jobs that are not running.
        """
        running_jobs_now = running_jobs()
        for jobid in list(self.job_chunks.keys()):
            if jobid not in running_jobs_now:
                print(f"Removing stale entry for job {jobid}; running jobs are: {running_jobs_now}")
                del self.job_chunks[jobid]

    def flush(self):
        """Overwrite the contents of the chunk file with the current job chunks.

        Note: This method is called automatically when leaving the context
        manager.
        """
        # Truncate the file.
        self._file.seek(0)
        self._file.truncate(0)
        # Write the header line.
        print('CHUNKFILE 1.0', file=self._file)
        # Serialize each job chunk as a line of json.
        for key, val in self.job_chunks.items():
            job_chunk = JobChunk(key, val)
            print(f"{job_chunk.to_json()}", file=self._file)
        self._file.flush()

    @property
    def my_chunks(self) -> set[int]:
        """Chunks for this job.
        """
        if SLURM_JOB_ID not in self.job_chunks:
            self.job_chunks[SLURM_JOB_ID] = set()
        return self.job_chunks[SLURM_JOB_ID]

    @property
    def all_chunks(self) -> set[int]:
        """Chunks for all jobs in the chunk file.
        """
        return {chunk for chunks in self.job_chunks.values() for chunk in chunks}

class LockFile:
    """A simple context manager for a lock file.

    The lock file is atomically created if it does not yet exist. Then the
    context manager acquires an advisory file lock and opens the lock file. The
    lock file is automatically closed and the flock released when exiting the
    context manager.

    >>> with LockFile('/path/to/lock_file') as lock_file:
    >>>     # Do mutually exclusive things.
    >>>     # Optionally, read from or write to the lock_file IO object.

    Or if you don't need to do anything with the lock file:

    >>> with LockFile('/path/to/lock_file'):
    >>>     # Do mutually exclusive things.
    """
    def __init__(self, lock_filename: os.PathLike):
        """Instantiate a LockFile context manager.
        """
        # Store the filename.
        self.lock_filename = lock_filename
        self.lock_file = None
        # Atomically create the lock file if it does not already exist.
        try:
            f = os.open(lock_filename, os.O_CREAT | os.O_EXCL)
            os.close(f)
        except FileExistsError:
            # If the file already exists that's OK.
            pass

    def __enter__(self):
        # Open and lock the lockfile.
        self.lock_file = open(self.lock_filename, 'r+')
        try:
            fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_EX)
        except:
            self.lock_file.close()
            raise
        # Pass the lockfile into the context.
        return self.lock_file

    def __exit__(self, exc_type, exc_value, traceback):
        # Unlock and close the lockfile.
        try:
            fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_UN)
        finally:
            if not self.lock_file.closed:
                self.lock_file.close()
            self.lock_file = None
        # Return False to propagate any exceptions.
        return False

class ChunkFile:
    """Context manager for a lock file in which we can store a list of chunks.

    >>> with ChunkFile('/path/to/lock_file') as chunk_file:
    >>>     print(chunk_file.job_chunks)

    Similar to LockFile, this class provides a context manager for a lock file.
    Entering the context acquires an advisory file lock and opens the lock file.
    Exiting the context closes the lock file and releases the flock.

    In addition, ChunkFile manages a list of jobs and what chunks they have
    reserved for work. The contents of the file look like this:

        CHUNKFILE 1.0
        {"SLURM_JOB_ID": 123, "chunks": [1, 2, 3]}
        {"SLURM_JOB_ID": 124, "chunks": [4, 5]}
        {"SLURM_JOB_ID": 125, "chunks": [6, 7, 8]}

    Within the context manager, you can access a _ChunkFile with an attribute
    job_chunks, which is a dictionary mapping slurm job ids to a set of chunks
    each job has reserved. The _ChunkFile class also provides convenience
    properties all_chunks and my_chunks, which are sets. You can reserve and
    release chunks with my_chunks.add() and my_chunks.remove(). Changes are
    automatically written to the lock file when exiting the context.
    """
    def __init__(self, lock_filename: os.PathLike, cleanup: bool = True):
        """Instantiate a ChunkFile from the file name of a lock file.
        Creates the lock file if it does not already exist.
        """
        self.cleanup = cleanup
        self._lock_file = LockFile(lock_filename)

    def __enter__(self):
        # Delegate to LockFile to open and lock the chunk file.
        lock_file = self._lock_file.__enter__()
        # Try to parse the lock file as a chunk file.
        try:
            self.chunk_file = _ChunkFile(_ChunkFile._PRIVATE_CONSTRUCTOR_KEY, lock_file)
            if self.cleanup:
                self.chunk_file.cleanup()
            return self.chunk_file
        except:
            # If there is a problem, ensure the lock is released.
            exc_type, exc_value, traceback = sys.exc_info()
            self._lock_file.__exit__(exc_type, exc_value, traceback)
            raise

    def __exit__(self, exc_type, exc_value, traceback):
        # Flush the set to the lock file.
        try:
            self.chunk_file.flush()
            self.chunk_file = None
        finally:
            # Ensure the lock is released.
            self._lock_file.__exit__(exc_type, exc_value, traceback)
        # Return False to propagate any exceptions.
        return False

# Example
if __name__ == "__main__":
    import time
    import numpy
    lock_filename: os.PathLike = '/ceph/chpc/shared/benjamin_kay_group/file.lock'
    out_filename: os.PathLike = '/ceph/chpc/shared/benjamin_kay_group/outfile.npy'
    chunks: typing.Iterable[int] = range(0, 10)
    sleep_for_seconds: float = 10
    # Track the time it takes to process each chunk.
    n_chunks_processed = 0
    start_time = time.time()
    # Convert chunks to a set.
    chunks = {chunk for chunk in chunks}
    print(f"Working on the following chunks: {chunks}")
    # Do work while there are chunks to work on.
    while chunks:
        # Check if we have time to process more chunks.
        # Give ourselves a 5 minute margin for error.
        if n_chunks_processed > 0:
            elapsed_time = time.time() - start_time
            print(f"Job has been running for {elapsed_time} seconds.")
            print(f"Processing a chunk every {elapsed_time/n_chunks_processed} seconds.")
            print(f"Slurm job has {TIME_LIMIT_SECONDS - elapsed_time} seconds remaining.")
            if elapsed_time / n_chunks_processed + 300 > TIME_LIMIT_SECONDS - elapsed_time:
                print("We are out of time.")
                # Call sbatch to spawn a new job.
                # subprocess.run(['sbatch', 'my_job.sh'])
                break
        # Acquire the lock.
        print(f"Acquiring advisory file lock on {lock_filename}")
        with ChunkFile(lock_filename) as chunk_file:
            print('Lock acquired.')
            # Create the output file if it does not exist.
            if not os.path.exists(out_filename):
                print(f"Initialized empty npy file {out_filename}")
                numpy.save(out_filename, numpy.empty(0, dtype=int))
            # Check if there is work to do.
            # First check the chunk file.
            chunks = chunks.difference(chunk_file.all_chunks)
            # Then check the output file.
            out_data = numpy.load(out_filename)
            for chunk in out_data:
                chunks.discard(chunk)
            print(f"The following chunks still need work: {chunks}")
            # Are there any chunks left to work on?
            if not chunks:
                print('All chunks have been processed, exiting.')
                break
            # Reserve the next chunk to work on by adding it to the set in the
            # lock file.
            chunk = chunks.pop()
            chunk_file.my_chunks.add(chunk)
            print(f"Reserving chunk {chunk} in the lock file {lock_filename}")
        # Release the lock and work on the next chunk.
        print('Lock released.')
        print(f"Working on chunk {chunk}")
        time.sleep(sleep_for_seconds)
        # Reacquire the lock.
        print(f"Acquiring advisory file lock on {lock_filename}")
        with ChunkFile(lock_filename) as chunk_file:
            print('Lock acquired.')
            # Write the result of our work to the output file.
            out_data = numpy.load(out_filename)
            out_data = numpy.append(out_data, chunk)
            numpy.save(out_filename, out_data)
            print(f"Wrote npy file {out_filename} with the following chunks: {out_data}")
            # Our work is completed; remove this chunk from the reserved set.
            chunk_file.my_chunks.remove(chunk)
            print(f"Releasing chunk {chunk} from the chunk file {lock_filename}")
        print('Lock released.')
        # Increment the number of chunks processed.
        n_chunks_processed += 1
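
# A slurm submission script for launching several copies of this example could
# look something like the following sketch. The directives and paths are
# hypothetical and will vary by cluster; 'my_job.sh' matches the name used in
# the commented-out sbatch call above.
#
#     #!/bin/bash
#     #SBATCH --job-name=slurm_sync_example
#     #SBATCH --ntasks=1
#     #SBATCH --time=01:00:00
#     python slurm_sync.py
#
# Submitting it several times, e.g. `for i in 1 2 3 4; do sbatch my_job.sh; done`,
# starts multiple jobs that coordinate through the same lock file.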