Synchronizing concurrent slurm jobs using advisory file locks
"""Synchronize slurm jobs using advisory file locks (flock).
This module provides functionality for synchronizing concurrent work across
multiple jobs running on multiple nodes in a slurm cluster. Synchronization is
achieved using advisory file locks, or flock. This module is particularly useful
for easily-parallelizable jobs that divide a large problem into chunks that can
be processed independently.
To use an flock like a mutex:
>>> from slurm_sync import LockFile
>>> with LockFile('/path/to/lock_file'):
>>> print('Do mutually exclusive things.')
For more advanced synchronization using chunks:
>>> from slurm_sync import ChunkFile
>>>
>>> # Reserve a chunk to work on.
>>> # ChunkFile acquires a lock similar to LockFile.
>>> my_chunk = 42 # Plan to do work on chunk 42.
>>> with ChunkFile('/path/to/lock_file') as chunk_file:
>>> if my_chunk in chunk_file.all_chunks:
>>> exit()
>>> chunk_file.my_chunks.add(my_chunk)
>>> # Read in data to do work on my chunk.
>>>
>>> # Do work on my chunk.
>>>
>>> # Save the results of my work and release the chunk.
>>> with ChunkFile('/path/to/lock_file') as chunk_file:
>>> chunk_file.my_chunks.remove(my_chunk)
>>> # Write out results of my work.
See the bottom of this file for a fully-featured working example.
Nota bene: CephFS and NFS have global flock by default. BeeGFS also supports
flock, but it is local to each node by default and therefore unsuitable for
synchronizing activity across nodes. Check /etc/beegfs/beegfs-client.conf and
verify that tuneUseGlobalFileLocks is true.
"""
import fcntl
import json
import os
import re
import subprocess
import sys
import typing
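
# Sketch (not part of the original module): a best-effort check of the BeeGFS
# tuneUseGlobalFileLocks setting mentioned in the module docstring. The config
# path is the usual default; adjust it for your site. Returns False if the
# file cannot be read or the setting is absent.
def _beegfs_global_flock_enabled(conf_path: os.PathLike = '/etc/beegfs/beegfs-client.conf') -> bool:
    try:
        with open(conf_path) as conf_file:
            for line in conf_file:
                # Look for a line like "tuneUseGlobalFileLocks = true".
                match = re.match(r'\s*tuneUseGlobalFileLocks\s*=\s*(\S+)', line)
                if match:
                    return match.group(1).lower() == 'true'
    except OSError:
        pass
    return False
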
SLURM_JOB_ID = int(os.getenv('SLURM_JOB_ID'))
TIME_LIMIT_SECONDS = None

def time_limit_seconds() -> int:
    """Return the slurm time limit for the currently running job, in seconds.
    """
    # Ask slurm how much time was allocated.
    time_limit_str = subprocess.run(
        ['squeue', '-j', str(SLURM_JOB_ID), '-h', '--Format', 'TimeLimit'],
        stdout=subprocess.PIPE
    ).stdout.splitlines()[0].strip().decode()
    # Parse into seconds.
    match = re.search('([0-9]+-)?([0-9]+):([0-9]+):([0-9]+)', time_limit_str)
    time_limit = 0
    # Days
    if match.group(1):
        time_limit += int(match.group(1)[0:-1]) * 86400
    # Hours
    time_limit += int(match.group(2)) * 3600
    # Minutes
    time_limit += int(match.group(3)) * 60
    # Seconds
    time_limit += int(match.group(4))
    return time_limit
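
# For example (illustrative value, not from the original gist): a TimeLimit of
# '1-02:03:04' reported by squeue parses to
# 1*86400 + 2*3600 + 3*60 + 4 = 93784 seconds.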

def _init_time_limit_seconds():
    global TIME_LIMIT_SECONDS
    if TIME_LIMIT_SECONDS is None:
        TIME_LIMIT_SECONDS = time_limit_seconds()
_init_time_limit_seconds()

def running_jobs() -> set[int]:
    """Return an integer set of the currently running slurm jobs.
    """
    return {
        int(jobid.strip().decode())
        for jobid in subprocess.run(
            ['squeue', '-h', '--Format', 'JOBID'],
            stdout=subprocess.PIPE
        ).stdout.splitlines()
    }

class JobChunk:
    """A combination of a SLURM_JOB_ID and a set of chunks reserved by that job.

    Attributes:
        slurm_job_id: The slurm job id (as an int).
        chunks: An integer set of chunks the slurm job is working on.
    """
    def __init__(self, slurm_job_id: int = 0, chunks: typing.Iterable[int] = []):
        self.slurm_job_id = slurm_job_id
        self.chunks = {chunk for chunk in chunks}

    def to_json(self) -> str:
        """Serialize the JobChunk to a json string.

        Example output:
            '{"SLURM_JOB_ID": 12345, "chunks": [6, 7, 8]}'
        """
        return json.dumps({'SLURM_JOB_ID': self.slurm_job_id, 'chunks': list(self.chunks)})

    @staticmethod
    def from_json(json_data: str | dict) -> typing.Self:
        """Parse a JobChunk from a string of json data.

        See to_json().
        """
        if isinstance(json_data, str):
            json_data = json.loads(json_data)
        return JobChunk(json_data['SLURM_JOB_ID'], json_data['chunks'])
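
# For example, JobChunk.from_json('{"SLURM_JOB_ID": 12345, "chunks": [6, 7, 8]}')
# round-trips the to_json() output shown above, yielding a JobChunk with
# slurm_job_id == 12345 and chunks == {6, 7, 8}.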

class _ChunkFile:
    _PRIVATE_CONSTRUCTOR_KEY = object()

    def __init__(self, private_constructor, file: os.PathLike | typing.IO):
        """Private constructor for _ChunkFile.

        Do not instantiate a _ChunkFile directly. Instead, use the ChunkFile
        context manager.

        Example:
        >>> with ChunkFile('/path/to/lock_file') as chunk_file:
        >>>     print(chunk_file.job_chunks)

        Attributes:
            job_chunks: A dictionary mapping slurm job id to a set of chunks.
        """
        # Discourage calling the constructor directly.
        if private_constructor is not _ChunkFile._PRIVATE_CONSTRUCTOR_KEY:
            raise ValueError('Do not instantiate _ChunkFile directly; use a context manager instead.')
        # Check if the file is path-like or IO-like.
        if hasattr(file, 'readlines') and callable(file.readlines):
            # The file is IO-like and supports readlines.
            self._file = file
        else:
            # The file is path-like, open it.
            self._file = open(file, 'r+')
        # Verify the file is open in the correct mode.
        assert not self._file.closed
        assert self._file.mode == 'r+'
        # Parse the contents of the file.
        # Read the first line.
        line1 = self._file.readline()
        # Ensure the file is empty or the format string is correct.
        assert line1 == '' or line1 == 'CHUNKFILE 1.0\n'
        # Make a dictionary whose keys are the SLURM_JOB_ID
        # and whose values are the set of chunks reserved by each job.
        self.job_chunks = {
            job_chunk.slurm_job_id: job_chunk.chunks
            for job_chunk in [JobChunk.from_json(line) for line in self._file.readlines()]
        }

    def cleanup(self):
        """Remove chunk file entries for jobs that are not running.
        """
        running_jobs_now = running_jobs()
        for jobid in list(self.job_chunks.keys()):
            if jobid not in running_jobs_now:
                print(f"jobid: {jobid}, running_jobs_now: {running_jobs_now}")
                del self.job_chunks[jobid]

    def flush(self):
        """Overwrite the contents of the chunk file with the current job chunks.

        Note: This method is called automatically when leaving the context
        manager.
        """
        # Truncate the file.
        self._file.seek(0)
        self._file.truncate(0)
        # Write the header line.
        print('CHUNKFILE 1.0', file=self._file)
        # Serialize each job chunk as a line of json.
        for key, val in self.job_chunks.items():
            job_chunk = JobChunk(key, val)
            print(f"{job_chunk.to_json()}", file=self._file)
        self._file.flush()

    @property
    def my_chunks(self) -> set[int]:
        """Chunks for this job.
        """
        if SLURM_JOB_ID not in self.job_chunks:
            self.job_chunks[SLURM_JOB_ID] = set()
        return self.job_chunks[SLURM_JOB_ID]

    @property
    def all_chunks(self) -> set[int]:
        """Chunks for all jobs in the chunk file.
        """
        return {chunk for chunks in self.job_chunks.values() for chunk in chunks}

class LockFile:
    """A simple context manager for a lock file.

    The lock file is atomically created if it does not yet exist. Then the
    context manager acquires an advisory file lock and opens the lock file. The
    lock file is automatically closed and the flock released when exiting the
    context manager.

    >>> with LockFile('/path/to/lock_file') as lock_file:
    >>>     # Do mutually exclusive things.
    >>>     # Optionally, read from/write to the lock_file IO object.

    Or if you don't need to do anything with the lock file:

    >>> with LockFile('/path/to/lock_file'):
    >>>     # Do mutually exclusive things.
    """
    def __init__(self, lock_filename: os.PathLike):
        """Instantiate a LockFile context manager.
        """
        # Store the filename.
        self.lock_filename = lock_filename
        self.lock_file = None
        # Atomically create the lock file if it does not already exist.
        try:
            f = os.open(lock_filename, os.O_CREAT | os.O_EXCL)
            os.close(f)
        except FileExistsError:
            # If the file already exists that's OK.
            pass

    def __enter__(self):
        # Open and lock the lockfile.
        self.lock_file = open(self.lock_filename, 'r+')
        try:
            fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_EX)
        except:
            self.lock_file.close()
            raise
        # Pass the lockfile into the context.
        return self.lock_file

    def __exit__(self, exc_type, exc_value, traceback):
        # Unlock and close the lockfile.
        try:
            fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_UN)
        finally:
            if not self.lock_file.closed:
                self.lock_file.close()
            self.lock_file = None
        # Return False to propagate any exceptions.
        return False
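
# Usage sketch (not part of the original module; the path is illustrative):
# the IO object returned by the context manager can be used to read from or
# write to the lock file itself while the lock is held, e.g.
#
#     with LockFile('/path/to/lock_file') as lock_file:
#         lock_file.seek(0, os.SEEK_END)
#         print('did some mutually exclusive work', file=lock_file)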

class ChunkFile:
    """Context manager for a lock file in which we can store a list of chunks.

    >>> with ChunkFile('/path/to/lock_file') as chunk_file:
    >>>     print(chunk_file.job_chunks)

    Similar to LockFile, this class provides a context manager for a lock file.
    Entering the context acquires an advisory file lock and opens the lock file.
    Exiting the context closes the lock file and releases the flock.

    In addition, ChunkFile manages a list of jobs and what chunks they have
    reserved for work. The contents of the file look like this:

        {"SLURM_JOB_ID": 123, "chunks": [1, 2, 3]}
        {"SLURM_JOB_ID": 124, "chunks": [4, 5]}
        {"SLURM_JOB_ID": 125, "chunks": [6, 7, 8]}

    Within the context manager, you can access a _ChunkFile with an attribute
    job_chunks, which is a dictionary mapping slurm job ids to a set of chunks
    each job has reserved. The _ChunkFile class also provides convenience
    properties all_chunks and my_chunks, which are sets. You can reserve and
    release chunks with my_chunks.add() and my_chunks.remove(). Changes are
    automatically written to the lock file when exiting the context.
    """
    def __init__(self, lock_filename: os.PathLike, cleanup: bool = True):
        """Instantiate a ChunkFile from the file name of a lock file.

        Creates the lock file if it does not already exist.
        """
        self.cleanup = cleanup
        self._lock_file = LockFile(lock_filename)

    def __enter__(self):
        # Delegate to LockFile to open and lock the chunk file.
        lock_file = self._lock_file.__enter__()
        # Try to parse the lock file as a chunk file.
        try:
            self.chunk_file = _ChunkFile(_ChunkFile._PRIVATE_CONSTRUCTOR_KEY, lock_file)
            if self.cleanup:
                self.chunk_file.cleanup()
            return self.chunk_file
        except:
            # If there is a problem, ensure the lock is released.
            exc_type, exc_value, traceback = sys.exc_info()
            self._lock_file.__exit__(exc_type, exc_value, traceback)
            raise

    def __exit__(self, exc_type, exc_value, traceback):
        # Flush the set to the lock file.
        try:
            self.chunk_file.flush()
            self.chunk_file = None
        finally:
            # Ensure the lock is released.
            self._lock_file.__exit__(exc_type, exc_value, traceback)
        # Return False to propagate any exceptions.
        return False

# Example
if __name__ == "__main__":
    import time
    import numpy

    lock_filename: os.PathLike = '/ceph/chpc/shared/benjamin_kay_group/file.lock'
    out_filename: os.PathLike = '/ceph/chpc/shared/benjamin_kay_group/outfile.npy'
    chunks: typing.Iterable[int] = range(0,10)
    sleep_for_seconds: float = 10

    # Track the time it takes to process each chunk.
    n_chunks_processed = 0
    start_time = time.time()
    # Convert chunks to a set.
    chunks = {chunk for chunk in chunks}
    print(f"Working on the following chunks: {chunks}")
    # Do work while there are chunks to work on.
    while chunks:
        # Check if we have time to process more chunks.
        # Give ourselves a 5 minute margin for error.
        if n_chunks_processed > 0:
            elapsed_time = time.time() - start_time
            print(f"Job has been running for {elapsed_time} seconds.")
            print(f"Processing a chunk every {elapsed_time/n_chunks_processed} seconds.")
            print(f"Slurm job has {TIME_LIMIT_SECONDS - elapsed_time} seconds remaining.")
            if elapsed_time / n_chunks_processed + 300 > TIME_LIMIT_SECONDS - elapsed_time:
                print(f"We are out of time.")
                # Call sbatch to spawn a new job.
                # subprocess.run(['sbatch', 'my_job.sh'])
                break
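        # (Illustrative numbers for the check above, not from the original
        # gist: with an average of 60 seconds per chunk after 3300 seconds of
        # a 3600-second limit, 60 + 300 = 360 > 3600 - 3300 = 300, so the job
        # breaks out of the loop rather than start a chunk it cannot finish.)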
        # Acquire the lock.
        print(f"Acquiring advisory file lock on {lock_filename}")
        with ChunkFile(lock_filename) as chunk_file:
            print('Lock acquired.')
            # Create the output file if it does not exist.
            if not os.path.exists(out_filename):
                print(f"Initialized empty npy file {out_filename}")
                numpy.save(out_filename, numpy.empty(0, dtype=int))
            # Check if there is work to do.
            # First check the chunk file.
            chunks = chunks.difference(chunk_file.all_chunks)
            # Then check the output file.
            out_data = numpy.load(out_filename)
            for chunk in out_data:
                chunks.discard(chunk)
            print(f"The following chunks still need work: {chunks}")
            # Are there any chunks left to work on?
            if not chunks:
                print('All chunks have been processed, exiting.')
                break
            # Reserve the next chunk to work on by adding it to the set in the
            # lock file.
            chunk = chunks.pop()
            chunk_file.my_chunks.add(chunk)
            print(f"Reserving chunk {chunk} in the lock file {lock_filename}")
        # Release the lock and work on the next chunk.
        print('Lock released.')
        print(f"Working on chunk {chunk}")
        time.sleep(sleep_for_seconds)
        # Reacquire the lock.
        print(f"Acquiring advisory file lock on {lock_filename}")
        with ChunkFile(lock_filename) as chunk_file:
            print('Lock acquired.')
            # Write the result of our work to the output file.
            out_data = numpy.load(out_filename)
            out_data = numpy.append(out_data, chunk)
            numpy.save(out_filename, out_data)
            print(f"Wrote npy file {out_filename} with the following chunks: {out_data}")
            # Our work is completed; remove this chunk from the reserved set.
            chunk_file.my_chunks.remove(chunk)
            print(f"Releasing chunk {chunk} from the chunk file {lock_filename}")
        print('Lock released.')
        # Increment the number of chunks processed.
        n_chunks_processed += 1
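
# To run the example concurrently (sketch, not part of the original gist):
# submit several copies of a batch script that runs this module, e.g. the
# hypothetical my_job.sh referenced in the commented sbatch call above:
#
#     for _ in range(4):
#         subprocess.run(['sbatch', 'my_job.sh'], check=True)
#
# Each job reserves chunks through the shared lock file, so concurrent jobs
# divide the chunks among themselves without duplicating work.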