The simple logging example (`mpi_logger.py`, included below) can be run via:

- `mpiexec -n 4 python mpi_logger.py`

For the compute and logging example (`mock_execution.py`, included below):

- `mpiexec -n 4 python mock_execution.py`

The MPI scheduler is provided by the schwimmbad package, via `schwimmbad.choose_pool(mpi=True)`.
`mock_execution.py`:

```python
#!/usr/bin/env python

from os import makedirs
from os.path import exists
import logging

from mpi4py import MPI
import schwimmbad
import structlog

from mpi_logger import MPIFileHandler, COMMON_PROCESSORS
from task import SomeTask


# configure structlog to log in a specific way
structlog.configure(
    processors=COMMON_PROCESSORS,
    logger_factory=structlog.stdlib.LoggerFactory()
)

# I/O handler
HANDLER = MPIFileHandler("execution.log")
FORMATTER = logging.Formatter('%(message)s')
HANDLER.setFormatter(FORMATTER)

# initialise a logger
LOGGER = logging.getLogger('status')
LOGGER.setLevel(logging.DEBUG)
LOGGER.addHandler(HANDLER)


def main():
    """
    Simulate the execution of a bunch of tasks using MPI,
    as well as set up MPI logging.

    We should see all processor ranks logging similar events right up
    until `pool = schwimmbad.choose_pool(mpi=True)` is called.
    After that, only rank 0 will be logging events and executing lines
    of code.

    All tasks should contain the same value for rank as an argument,
    and that value will be zero.
    The tasks themselves will be carried out by the workers,
    ranks 1 and higher, while rank 0 acts as the scheduler.
    """
    # comm and processor info
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # bind the logs to include the rank and size
    log = structlog.get_logger('status').bind(rank=rank, size=size)
    log.info('checking rank and size')

    # a simple mechanism that tells the workers to wait for the scheduler's instructions
    log.info('about to initialise an MPI pool')
    pool = schwimmbad.choose_pool(mpi=True)

    # after the pool is created, only rank 0 executes the following lines
    log.info('MPI pool chosen')

    # change as required
    outdir = 'outdir'
    bands = range(1, 21)

    # the task class that calls some function
    task = SomeTask()
    log.info('task chosen')

    # define the list of tasks and the input parameters
    args = [(outdir, band, rank) for band in bands]

    # the rank arg should always be zero (remember we defined the MPI pool)
    log.info('tasks defined', args=args)

    if not exists(outdir):
        log.info('create output directory')
        makedirs(outdir)

    # map tasks across the pool of workers;
    # rank 0 acts only as a scheduler, ranks 1 and higher are workers
    pool.map(task, args)
    pool.close()

    log.info('finished processing')


if __name__ == "__main__":
    main()
```
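When debugging the task logic it can be handy to bypass MPI altogether. The sketch below is not part of the original examples; it assumes a hypothetical `serial_debug.py` that pushes the same task arguments through schwimmbad's `SerialPool`, so `SomeTask` (defined in `task.py` further below) can be stepped through in a single process without `mpiexec`.

```python
#!/usr/bin/env python
# serial_debug.py -- hypothetical helper, not part of the original examples.
# Runs the same task arguments through schwimmbad's SerialPool so SomeTask
# can be debugged in a single process without mpiexec. Note that structlog
# is not configured here, so the task's log messages fall back to
# structlog's default printer.
from os import makedirs
from os.path import exists

import schwimmbad

from task import SomeTask


def main():
    outdir = 'outdir'
    bands = range(1, 21)

    if not exists(outdir):
        makedirs(outdir)

    # rank is hard-coded to 0 to mirror the args built by mock_execution.py
    args = [(outdir, band, 0) for band in bands]

    # SerialPool runs everything in the current process; no MPI required.
    # list() forces evaluation in case the pool's map is lazy.
    pool = schwimmbad.SerialPool()
    list(pool.map(SomeTask(), args))
    pool.close()


if __name__ == "__main__":
    main()
```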
`mpi_logger.py`:

```python
#!/usr/bin/env python
"""
Logging configuration for JSON Lines structured logging.

Defines structured logging for:

* Errors -- qualname error
* Status messages -- qualname status
"""
import logging

from mpi4py import MPI
import structlog
from structlog.processors import JSONRenderer


COMMON_PROCESSORS = [
    structlog.stdlib.add_log_level,
    structlog.processors.TimeStamper(fmt="ISO"),
    structlog.processors.StackInfoRenderer(),
    structlog.processors.format_exc_info,
    JSONRenderer(sort_keys=True)
]


def get_wrapped_logger(logger_name: str = 'root', **kwargs):
    """ Returns a structlog equivalent for the named logger """
    return structlog.wrap_logger(
        logging.getLogger(logger_name),
        COMMON_PROCESSORS,
        **kwargs
    )


class FormatJSONL(logging.Formatter):
    """ Prevents printing of the stack trace to enable JSON Lines output """

    def formatException(self, ei):
        """ Disables printing separate stack traces """
        return


ERROR_LOGGER = get_wrapped_logger('error', stack_info=True)
STATUS_LOGGER = get_wrapped_logger('status')


class MPIIOStream(object):
    """
    A very basic MPI stream handler for synchronised I/O.
    """

    def __init__(self, filename, comm, mode):
        self._file = MPI.File.Open(comm, filename, mode)
        self._file.Set_atomicity(True)

    def write(self, msg):
        # if for some reason we don't have a unicode string...
        try:
            msg = msg.encode()
        except AttributeError:
            pass
        self._file.Write_shared(msg)

    def sync(self):
        """
        Synchronise the processes
        """
        self._file.Sync()

    def close(self):
        self.sync()
        self._file.Close()


class MPIFileHandler(logging.StreamHandler):
    """
    A basic MPI file handler for writing log files.
    Internally opens a synchronised MPI I/O stream via MPIIOStream.

    Ideas and some code from:

    * https://groups.google.com/forum/#!topic/mpi4py/SaNzc8bdj6U
    * https://gist.github.com/JohnCEarls/8172807
    * https://stackoverflow.com/questions/45680050/cannot-write-to-shared-mpi-file-with-mpi4py
    """

    def __init__(self, filename,
                 mode=MPI.MODE_WRONLY | MPI.MODE_CREATE, comm=MPI.COMM_WORLD):
        self.filename = filename
        self.mode = mode
        self.comm = comm

        super(MPIFileHandler, self).__init__(self._open())

    def _open(self):
        stream = MPIIOStream(self.filename, self.comm, self.mode)
        return stream

    def close(self):
        if self.stream:
            self.stream.close()
            self.stream = None

    def emit(self, record):
        """
        Emit a record.

        We have to override emit, as the logging.StreamHandler has 2 calls
        to 'write'. The first for the message, and the second for the
        terminator. This posed a problem for MPI, where a second process
        could call 'write' in between these two calls and create a
        conjoined log message.
        """
        msg = self.format(record)
        self.stream.write('{}{}'.format(msg, self.terminator))
        self.flush()


def main():
    """
    A sample test run.
    """
    comm = MPI.COMM_WORLD
    logger = logging.getLogger("node[%i]" % comm.rank)
    # logger = logging.getLogger("func-status")  # another name example
    logger.setLevel(logging.DEBUG)

    mpi_handler = MPIFileHandler("test.log")
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    mpi_handler.setFormatter(formatter)
    logger.addHandler(mpi_handler)

    # sample log levels
    logger.debug('debug message')
    logger.info('info message')
    logger.warning('warning message')
    logger.error('error message')
    logger.critical('critical message')


if __name__ == "__main__":
    main()
```
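Since `COMMON_PROCESSORS` ends with `JSONRenderer`, every record written through the `status` logger ends up in `execution.log` as a standalone JSON document, one per line. The sketch below is a hypothetical post-run helper for inspecting that file; the file name and the `rank`/`timestamp`/`event` keys come from the examples above.

```python
# inspect_log.py -- hypothetical post-run helper; assumes the JSON Lines
# output produced by COMMON_PROCESSORS and the 'execution.log' file name
# used in mock_execution.py.
import json


def read_records(fname='execution.log'):
    """Yield each log record as a dict, one JSON document per line."""
    with open(fname) as src:
        for line in src:
            line = line.strip()
            if line:
                yield json.loads(line)


if __name__ == "__main__":
    for record in read_records():
        # e.g. see which rank emitted each event, and when
        print(record.get('rank'), record.get('timestamp'), record.get('event'))
```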
`task.py`:

```python
#!/usr/bin/env python

from os.path import join as pjoin

import numpy
import h5py
from mpi4py import MPI
import structlog

from mpi_logger import COMMON_PROCESSORS


# log file i/o is initialised elsewhere
LOG = structlog.get_logger('status')

# which rank worker will do the work (should always be 1 or higher)
COMM = MPI.COMM_WORLD
RANK = COMM.Get_rank()


class SomeTask(object):
    """
    A simple class definition (similar to luigi) that defines
    a work method for custom functions.
    """

    def work(self, outdir, band, rank):
        # rank2 will be the process executing the work,
        # but the arg 'rank' will always be zero, as rank 0 defined the work
        log = LOG.bind(rank=rank, rank2=RANK)
        log.info("start processing band: {}".format(band))

        out_fname = pjoin(outdir, 'band-{}.h5'.format(band))
        with h5py.File(out_fname, 'w') as fid:
            data = numpy.random.randint(0, 10001, (4000, 4000))
            kwargs = {'data': data,
                      'compression': 'lzf',
                      'chunks': (1, 4000),
                      'shuffle': True}
            fid.create_dataset('data', **kwargs)

        log.info("finished processing band: {}".format(band))

    def __call__(self, args):
        self.work(*args)
```
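Each task writes a single compressed `data` dataset to `outdir/band-<band>.h5`. Below is a small, hypothetical verification sketch that reuses the same paths and dataset name as `SomeTask` to sanity-check the outputs after a run.

```python
# check_outputs.py -- hypothetical verification helper; assumes the
# 'outdir/band-<n>.h5' files and the 'data' dataset written by SomeTask.
from os.path import join as pjoin

import h5py


def check_band(outdir='outdir', band=1):
    """Print the shape, dtype and compression of a band's dataset."""
    fname = pjoin(outdir, 'band-{}.h5'.format(band))
    with h5py.File(fname, 'r') as fid:
        dset = fid['data']
        print(fname, dset.shape, dset.dtype, dset.compression)


if __name__ == "__main__":
    # same band range as mock_execution.py
    for band in range(1, 21):
        check_band(band=band)
```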