some slurm model

#!/usr/bin/python3
import datetime
import math
import os
import json
import random
import subprocess
import sys
import traceback
from datetime import datetime, timedelta
from syslog import openlog, syslog
from dataclasses import dataclass, field, fields
from operator import methodcaller, attrgetter
from enum import Enum, Flag, auto
from collections import OrderedDict
from typing import List, Dict, Union, Optional

# white lie to get int + infinity
number = float

#
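# NOTE: red() is used by the print helpers further down but is not defined in
# this gist; the following is a minimal ANSI-escape sketch (an assumption, not
# the original implementation).
def red(text: str) -> str:
    """wrap text in ANSI escape codes so terminals render it in red"""
    return f"\033[31m{text}\033[0m"
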
class JobState(Enum):
    BOOT_FAIL = "BF"
    CANCELLED = "CA"
    COMPLETED = "CD"
    CONFIGURING = "CF"
    COMPLETING = "CG"
    DEADLINE = "DL"
    FAILED = "F"
    NODE_FAIL = "NF"
    OUT_OF_MEMORY = "OOM"
    PENDING = "PD"
    PREEMPTED = "PR"
    RUNNING = "R"
    RESV_DEL_HOLD = "RD"
    REQUEUE_FED = "RF"
    REQUEUE_HOLD = "RH"
    REQUEUED = "RQ"
    RESIZING = "RS"
    REVOKED = "RV"
    SIGNALING = "SI"
    SPECIAL_EXIT = "SE"
    STAGE_OUT = "SO"
    STOPPED = "ST"
    SUSPENDED = "S"
    TIMEOUT = "TO"

class NodeStateBase(Enum):
    UNKNOWN = "unk"
    DOWN = "down"
    IDLE = "idle"
    ALLOCATED = "alloc"
    ERROR = "error"
    MIXED = "mix"
    FUTURE = "future"
    # Aliases
    UNK = UNKNOWN
    ALLOC = ALLOCATED
    MIX = MIXED

    @property
    def short_name(self):
        return self.value

class NodeStateFlag(Flag):
    NO_FLAG = 0
    BLOCKED = auto()
    CLOUD = auto()
    COMPLETING = auto()
    DRAIN = auto()
    DYNAMIC_FUTURE = auto()
    DYNAMIC_NORM = auto()
    INVALID_REG = auto()
    FAIL = auto()
    MAINTENANCE = auto()
    POWER_DOWN = auto()
    POWER_UP = auto()
    POWERED_DOWN = auto()
    REBOOT_REQUESTED = auto()
    REBOOT_ISSUED = auto()
    RESERVED = auto()
    RESUME = auto()
    NOT_RESPONDING = auto()
    PLANNED = auto()
    POWERING_UP = auto()
    POWERING_DOWN = auto()
    # Aliases
    MAINT = MAINTENANCE
    NO_RESPOND = NOT_RESPONDING
    RESV = RES = RESERVED
    COMP = COMPLETING

    @property
    def sigil(self):
        if self & self.NOT_RESPONDING: return "*"
        if self & self.REBOOT_ISSUED: return "^"
        if self & self.POWER_DOWN: return "!"
        if self & self.POWER_UP: return "#"
        if self & self.POWERED_DOWN: return "~"
        if self & self.REBOOT_REQUESTED: return "@"
        if self & self.POWERING_DOWN: return "%"
        if self & self.MAINTENANCE: return "$"

    @property
    def short_name(self):
        if self & self.MAINTENANCE: return "maint"
        if self & self.RESERVED: return "resv"
        if self & self.COMPLETING: return "comp"

    def _is_composite(self):
        try:  # 3.11+
            return len(self) > 1
        except TypeError:  # 3.4-10
            return self not in NodeStateFlag

    def __iter__(self):
        try:  # 3.11+
            yield from self._iter_member_(self._value_)
        except AttributeError:  # 3.4-10: no _iter_member_ yet
            yield from reversed(list(self._iter_member_back(self._value_)))

    def _iter_member_back(self, value):
        try:
            while value:
                tag = value.bit_length() - 1
                yield type(self)(2**tag)
                value &= ~(2**tag)
        except StopIteration:
            return

    def __str__(self):
        if not self: return ""  # no flag
        if not self._is_composite(): return self.name  # simple
        return "+".join(str(flag) for flag in self)  # composite

_DOWN_FLAGS = \
    NodeStateFlag.DRAIN | \
    NodeStateFlag.FAIL | \
    NodeStateFlag.INVALID_REG | \
    NodeStateFlag.NOT_RESPONDING | \
    NodeStateFlag.MAINTENANCE | \
    NodeStateFlag.PLANNED | \
    NodeStateFlag.POWER_DOWN | \
    NodeStateFlag.POWERING_DOWN | \
    NodeStateFlag.POWERED_DOWN | \
    NodeStateFlag.POWERING_UP | \
    NodeStateFlag.REBOOT_ISSUED | \
    NodeStateFlag.REBOOT_REQUESTED | \
    NodeStateFlag.RESERVED

@dataclass(frozen=True)
class NodeState:
    state: NodeStateBase
    flags: NodeStateFlag = NodeStateFlag.NO_FLAG

    def is_down_state(self):
        return self.state == NodeStateBase.DOWN or bool(self.flags & _DOWN_FLAGS)

    def __str__(self):
        info = [self.state.name]
        if self.flags:
            info += ["+", str(self.flags)]
        return "".join(info)

    def print_short(self):
        if flag_prio := self.flags.short_name:
            state = flag_prio
        elif self.flags & self.flags.DRAIN:
            if self.state is self.state.IDLE:
                state = "drng"
            else:
                state = "drain"
        else:
            state = self.state.short_name
        if sigil := self.flags.sigil:
            state += sigil
        return state

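# Illustrative example (hypothetical node state): NodeState(NodeStateBase.IDLE,
# NodeStateFlag.RESERVED) renders as "IDLE+RESERVED" via str() and as "resv" via
# print_short(); adding NodeStateFlag.NOT_RESPONDING would append the "*" sigil.
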
@dataclass(frozen=True)
class GPU:
    """
    Describes _an amount_ of GPU (allocated or used)
    """
    name: str
    count: int
    used: int = None

    def __str__(self):
        if self.used is not None:
            return self.print_used()
        return f"{self.name}:{self.count}" if self.name else f"{self.count}"

    def __add__(self, other):
        if isinstance(other, type(self)):
            return self.count + other.count
        elif isinstance(other, int):
            return self.count + other
        return NotImplemented
    __radd__ = __add__

    def print_used(self, do_color=False):
        gpu_string_usage = f"{self.used or 0}/{self.count}"
        if self.used == self.count and do_color:
            gpu_string_usage = red(gpu_string_usage)
        return f"{gpu_string_usage} ({self.name})"

    def print_free(self) -> str:
        free = self.count - (self.used or 0)
        if free > 0:
            return f"{free} ({self.name})"
        else:
            return ""

    @staticmethod
    def find_in_tres(tres):
        if not tres:
            return []
        assert isinstance(tres, OrderedDict)
        gpus = []
        # reverse over tres, most specific at end.
        for key, value in reversed(tres.items()):
            if key.startswith("gres/gpu:"):  # specific gpus requested or allocated
                gpus.append(GPU(name=key[len("gres/gpu:"):], count=int(value)))
            elif key == "gres/gpu":  # overall gpus requested or allocated
                if len(gpus):  # if specific ones are already there, make sure count is right
                    assert sum(gpus) == int(value)
                else:
                    gpus.append(AnyGPU(count=int(value)))
            else:  # we're beyond GPUs at the moment
                break
        return gpus

    @staticmethod
    def consume(gpus, gpus_used):
        """
        Take a list of gpus and do the logical subtraction of 'used' gpus.
        """
        def _pop_or_none(l, fun):
            """ find item in l based on callable predicate fun
            removes item if found, and returns it; otherwise returns None"""
            for idx, itm in enumerate(l):
                if fun(itm):
                    l.pop(idx)
                    return itm
        if not (gpus or gpus_used):
            return gpus
        gpus_used = gpus_used[:]
        return [
            GPU(gpu.name, gpu.count, int(gpu.used or 0) + used_gpu.count)
            if (used_gpu := _pop_or_none(gpus_used, lambda u_gpu: u_gpu.name == gpu.name))
            else gpu
            for gpu in gpus
        ]


def AnyGPU(count=1):
    return GPU(name=None, count=count)

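# Illustrative example (hypothetical tres values): for a tres dict such as
#   OrderedDict([("cpu", "8"), ("mem", "64000M"), ("gres/gpu", "2"), ("gres/gpu:a100", "2")])
# find_in_tres() returns [GPU(name="a100", count=2)]; consuming a usage list of
# [GPU(name="a100", count=1)] then yields [GPU("a100", 2, used=1)], which
# print_used() renders as "1/2 (a100)".
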
# cached "now", so all jobs compare against the same time.
def _now(__cache=[]):
    # This uses the fact that default arguments are MUTABLE
    if not len(__cache):
        now = datetime.now()
        now -= timedelta(microseconds=now.microsecond)  # truncate to seconds
        __cache.append(now)
    return __cache[0]

COMMAND_EXPAND_NODELIST = "scontrol show hostnames"


# expand a nodelist like 'cn[020-400]' to individual nodes via scontrol
def _expand_node_list(in_list):
    if not in_list: return ""
    hosts = get_command_output(COMMAND_EXPAND_NODELIST.split() + [",".join(in_list)])
    return [node.strip() for node in hosts.strip().split("\n")]


def _g(m):
    """make gigabyte from megabyte"""
    return m >> 10


def _day_str(td: timedelta):
    """report timedelta days as words"""
    return "%2d day%s" % (lambda n: (n, abs(n) != 1 and "s" or ""))(td.days)

@dataclass
class Job:
    """
    Describes the properties of a single job
    """
    job_id: int
    user_id: int = None
    user_name: str = ""
    job_name: str = ""
    job_state: JobState = None
    state_reason: str = None
    run_time: timedelta = field(init=False)
    time_limit: timedelta = None
    submit_time: datetime = None
    start_time: datetime = None
    end_time: datetime = None
    suspend_time: datetime = None
    pre_sus_time: timedelta = None
    partition: List[str] = None
    required_nodes: List[str] = None
    nodes: List[str] = None
    cpus: number = None
    memory_mb: number = None
    gpus_requested: List[GPU] = None
    gpus_allocated: List[GPU] = None
    # objectifier hooks
    additional_time_fields = ("last_sched_evaluation",)
    timedelta_fields = ("pre_sus_time",)
    target_object = "jobs"

    def __post_init__(self):
        self.run_time = self._run_time()

    def _run_time(self):
        """ see scontrol/info_job.c:_sprint_job_info "Line 6" of Slurm """
        if self.job_state == JobState.PENDING or not self.start_time:
            return timedelta(0)
        elif self.job_state == JobState.SUSPENDED:
            return self.pre_sus_time
        else:  # started
            # finished or running
            if self.job_state == JobState.RUNNING or not self.end_time:
                end = _now()
            else:
                end = self.end_time
            if self.suspend_time:
                return (end - self.suspend_time) + self.pre_sus_time
            else:
                return end - self.start_time

    def run_time_pretty(self):
        if not self.run_time:
            return ""
        elif self.run_time.seconds:
            return str(self.run_time)
        else:
            return _day_str(self.run_time)

    def as_list(self, detailed=False):
        """
        Return some job attributes as a list, to be displayed in a table. The correct headers are returned by get_header
        :param detailed: Detailed (True) or default version with fewer columns (False)
        :return: List of attributes describing this job
        """
        gpu_info = ",".join(map(str, self.gpus_allocated))
        if gpu_info == "" and len(self.gpus_requested):
            gpu_info = str(len(self.gpus_requested))
        memory_info = f'{_g(self.memory_mb)}'
        if detailed:
            nodes_reason = ""
            if self.nodes:
                nodes_reason = ",".join(self.nodes)
            elif self.state_reason:
                nodes_reason = f"({self.state_reason})"
            limit = str(self.time_limit) if self.time_limit is not timedelta.max else "∞"
            return [f"{self.job_id}", ",".join(self.partition), f"{self.job_name:.20}", nodes_reason, f"{self.user_id}",
                    self.user_name, self.job_state.name, f"{self.run_time}", limit, f"{self.cpus}", memory_info, gpu_info]
        else:
            time_info = self.run_time_pretty()
            return [f"{self.job_id}", ",".join(self.partition), ",".join(self.nodes), self.user_name,
                    self.job_state.value, time_info, f"{self.cpus}", memory_info, gpu_info]

    @staticmethod
    def get_header(detailed=False):
        """
        Returns the header row for the table output. To be used in combination with job.as_list.
        :param detailed: Headers for the detailed (True) or the default (False) version
        :return: The table headers corresponding to the list returned by as_list
        """
        if detailed:
            return ("JOBID", "PARTITION", "NAME", "NODELIST(REASON)", "UID", "USER", "STATE", "TIME", "LIMIT", "CPUS",
                    "MEM (GB)", "GPUS")
        else:
            return ("JOBID", "PARTITION", "NODES", "USER", "ST", "TIME", "CPUS", "MEM", "GPUS")

    @staticmethod
    def try_object(obj):
        if "job_state" not in obj:
            return False
        # lookup state
        obj["job_state"] = JobState[obj["job_state"]]
        obj["job_name"] = obj["name"]
        # fix reason
        if obj["state_reason"] == "None":
            obj["state_reason"] = None
        # listify
        obj.force_list_of_strings("required_nodes")
        obj.force_list_of_strings("nodes", value=obj["job_resources"]["nodes"] if obj["job_resources"] else [])
        obj.force_list_of_strings("partition")
        # fix timelimit:
        obj["time_limit"] = timedelta(minutes=obj["time_limit"]) if not math.isinf(obj["time_limit"]) else timedelta.max
        obj["gpus_allocated"] = GPU.find_in_tres(obj["tres_alloc_str"])
        obj["gpus_requested"] = GPU.find_in_tres(obj["tres_req_str"])
        # see how the memory is config'ed
        obj["memory_mb"] = None
        try:  # Catch if somewhere an infinity slipped in.
            if obj["memory_per_cpu"] is not None:
                obj["memory_mb"] = int(obj["memory_per_cpu"] * obj["cpus"])
            elif obj["memory_per_node"] is not None:
                obj["memory_mb"] = int(obj["memory_per_node"] * obj["node_count"])
        except OverflowError:
            pass
        return True


Job.json_attribs = set(map(attrgetter("name"), fields(Job)))

@dataclass
class Node:
    """
    Describes properties and state of a node
    """
    name: str
    cores: int
    cpus: int
    sockets: int
    threads: int
    real_memory: int
    architecture: str = None
    cluster_name: str = ""
    partitions: List[Union[str, "Partition"]] = field(default=None, repr=False)
    features: List[str] = None
    active_features: List[str] = None
    comment: str = None
    reason: str = ""
    reason_set_by_user: str = ""
    reservation: str = None
    gpus: List[GPU] = None
    node_state: NodeState = None
    free_mem: int = None
    specialized_memory: int = None
    alloc_memory: int = None
    alloc_cpus: int = None
    cpu_load: int = None
    boot_time: datetime = None
    resume_after: timedelta = None
    # objectifier hooks
    additional_time_fields = ("last_busy",)
    timedelta_fields = ()
    target_object = "nodes"

    # convenience interface
    @property
    def cpus_total(self): return self.cpus
    @property
    def cpus_used(self): return self.alloc_cpus
    @property
    def cpus_free(self): return self.cpus - self.alloc_cpus
    @property
    def memory_total(self): return self.real_memory - (self.specialized_memory or 0)
    @property
    def memory_used(self): return self.alloc_memory
    @property
    def memory_free(self): return self.free_mem
    @property
    def partition_names(self): return [str(p) for p in self.partitions]

    def __str__(self): return self.name

    # comparison
    def __lt__(self, other):
        if not isinstance(other, type(self)):
            raise TypeError
        return self.name < other.name

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.name == other.name

    def __hash__(self):
        return hash(self.name)

    def update_partition(self, partition):
        # replace partition _string_ with partition _object_ of same name if there.
        if partition.name in self.partitions:
            self.partitions = [partition if str(p) == str(partition) else p for p in self.partitions]

    def update_partitions(self, partitions):
        try:
            for partition in partitions:
                self.update_partition(partition)
        except TypeError:  # if only one given, no iterable
            self.update_partition(partitions)

    @staticmethod
    def _find_state(obj):
        obj.force_list_of_strings("state")
        base_state = NodeStateBase.UNKNOWN
        flags = NodeStateFlag.NO_FLAG
        for obj_state in obj["state"]:
            try:
                base_state = NodeStateBase[obj_state]
                continue
            except KeyError:
                pass
            try:
                new_flag = NodeStateFlag[obj_state]
                flags = new_flag if not flags else flags | new_flag
                continue
            except KeyError:
                pass
        # Special case Mixed vs ALLOCATED
        if base_state == NodeStateBase.ALLOCATED and (
                obj.get("alloc_cpus", 0) < obj.get("cpus", 0)):
            base_state = NodeStateBase.MIXED
        return NodeState(base_state, flags)

    @staticmethod
    def try_object(obj):
        # good marker for a Node
        if "next_state_after_reboot" not in obj:
            return False
        # lookup state
        obj["node_state"] = Node._find_state(obj)
        obj["name"] = obj["name"]
        # listify
        obj.force_list_of_strings("partitions")
        obj.force_list_of_strings("features")
        obj.force_list_of_strings("active_features")
        # gpus
        gpus = GPU.find_in_tres(obj["tres"])
        gpus_used = GPU.find_in_tres(obj["tres_used"])
        obj["gpus"] = GPU.consume(gpus, gpus_used)
        return True

    def is_node_down(self) -> bool:
        return self.node_state.is_down_state()

    def get_used_res_strings(self, admin_mode=False) -> List[str]:
        """
        This function can be used to return all the data of a node as a list of entries, which can then be printed
        in a table.
        :return: List of "columns" (name, partition, etc. + used/total values for resources), to be displayed as
                 a line in a table
        """
        state_info = self.node_state.print_short() if not admin_mode else str(self.node_state)
        if self.is_node_down():
            state_info = red(state_info)
        cpu_info = f"{self.cpus_used: >3}/{self.cpus_total: >3}"
        if self.cpus_used == self.cpus_total:
            cpu_info = red(cpu_info)
        memory_info = f"{_g(self.memory_used): >4}/{_g(self.memory_total): >4} G"
        if self.memory_used == self.memory_total:
            memory_info = red(memory_info)
        gpu_info = ", ".join(gpu.print_used(True) for gpu in self.gpus)
        partition_info = ",".join(p.info_name for p in self.partitions)
        return [partition_info, self.name, state_info, cpu_info, memory_info, gpu_info]

    def get_free_res_strings(self) -> Optional[List[str]]:
        """
        This function can be used to return all the data of a usable node as a list of entries, which can then be
        printed in a table.
        :return: List of "columns" (name, partition, etc. + "free" values for resources), to be displayed as
                 a line in a table, OR None if the node can't be used right now (no CPUs/memory available)
        """
        if self.is_node_down():
            return
        if not self.cpus_free:
            return
        cpu_info = f"{self.cpus_free: >4}"
        if not self.memory_free:
            return
        memory_info = f"{_g(self.memory_free): >4} G"
        gpu_info = ", ".join([gpu.print_free() for gpu in self.gpus])
        partition_info = ",".join(p.info_name for p in self.partitions)
        return [partition_info, self.name, cpu_info, memory_info, gpu_info]


Node.json_attribs = set(map(attrgetter("name"), fields(Node)))

@dataclass
class Partition:
    """
    This class stores some details about a partition
    """
    name: str
    default: bool = False
    hidden: bool = False
    time_limit: timedelta = timedelta.max
    default_time: timedelta = None
    nodes: List[Union[str, "Node"]] = field(default=None, repr=False)
    # objectifier hooks
    additional_time_fields = ()
    timedelta_fields = ()
    target_object = "partitions"

    def __str__(self): return self.name

    # comparison
    def __lt__(self, other):
        if not isinstance(other, type(self)):
            raise TypeError
        return (self.default and not other.default) or (self.name < other.name)

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return (self.default is other.default) and (self.name == other.name)

    def __hash__(self):
        return hash((self.default, self.name))

    def is_default(self) -> bool:
        return self.default

    def update_addtitional(self, partition_dict):
        self.default = (partition_dict.get("Default", "NO") != "NO")
        self.hidden = (partition_dict.get("Hidden", "NO") != "NO")

    def update_nodes(self, node_list):
        # update us
        self.nodes = [node for node in node_list if self.name in node.partitions]
        # update them
        for node in self.nodes:
            node.update_partitions(self)

    @property
    def info_name(self):
        return self.name + "*" if self.default else self.name

    def time_limit_pretty(self):
        if self.time_limit is timedelta.max:
            return "∞"
        elif self.time_limit.seconds:
            return str(self.time_limit)
        else:
            return _day_str(self.time_limit)

    def default_time_pretty(self):
        return str(self.default_time) if self.default_time else ""

    @staticmethod
    def try_object(obj):
        if "node_sets" not in obj:  # good partition identifier
            return False
        # fix timelimit:
        if tl := obj["maximums"]["time"]:
            obj["time_limit"] = timedelta(minutes=tl) if not math.isinf(tl) else timedelta.max
        if dl := obj["defaults"]["time"]:
            obj["default_time"] = timedelta(minutes=dl) if not math.isinf(dl) else timedelta.max
        # we handle that later
        if "nodes" in obj:
            del obj["nodes"]
        return True


Partition.json_attribs = set(map(attrgetter("name"), fields(Partition)))

@dataclass
class Reservation:
    """
    Stores the most important information about a pending reservation
    """
    name: str
    start_time: datetime
    end_time: datetime
    node_list: str = field(default=None, repr=False)
    partition: Union[str, "Partition"] = field(default=None, repr=False)
    flags: List[str] = None
    # objectifier hooks
    additional_time_fields = ()
    timedelta_fields = ()
    target_object = "reservations"

    def __str__(self): return self.name

    @staticmethod
    def try_object(obj):
        if "purge_completed" not in obj:  # good reservation identifier
            return False
        obj.force_list_of_strings("node_list")
        # expand foo[01-20]
        obj["node_list"] = _expand_node_list(obj["node_list"])
        obj.force_list_of_strings("flags")
        return True

    def is_future(self):
        return self.start_time > _now()

    def eta(self):
        if self.is_future():
            return self.start_time - _now()
        else:
            return timedelta(0)

    @property
    def node_names(self):
        return [str(node) for node in self.node_list]

    def update_node(self, node):
        # replace node _string_ with node _object_ of same name if there.
        if node.name in self.node_list:
            self.node_list = [node if str(n) == str(node) else n for n in self.node_list]

    def update_nodes(self, nodes):
        try:
            for node in nodes:
                self.update_node(node)
        except TypeError:  # if only one given, no iterable
            self.update_node(nodes)

    def print_pending_pretty(self):
        limits = []
        for node in self.node_list:
            if isinstance(node, Node):
                for partition in node.partitions:
                    if not partition.hidden:
                        limits.append(partition.time_limit)
        max_time = max(limits)
        if self.eta() > max_time:
            # reservation not near enough, ignore
            return
        start_in = f" (starting in '{slurm_time(self.eta())}')" if self.is_future() else ""
        line = "[MAINTENANCE] " if "MAINT" in self.flags else ""
        line += f"There is a reservation '{self.name}' between {self.start_time}{start_in} and " \
                f"{self.end_time}. "
        line += f"The following nodes are reserved and cannot be used during that time: {', '.join(self.node_names)}. "
        return line


Reservation.json_attribs = set(map(attrgetter("name"), fields(Reservation)))

#
class Objectifier:
    """
    Objectifier is a callable object that transforms slurm scontrol json into the
    target objects (e.g. Job, Node, Partition, Reservation).
    """
    def __init__(self, target):
        self.target = target
        self.obj = None

    class Done(Exception):
        def __init__(self, value):
            self.value = value

    # simple access to obj
    def __getitem__(self, key):
        return self.obj[key]

    def __setitem__(self, key, value):
        self.obj[key] = value

    def __delitem__(self, key):
        del self.obj[key]

    def __contains__(self, item):
        try:
            return item in self.obj
        except TypeError:
            return False

    # emulate a dict
    def items(self):
        return self.obj.items()

    def keys(self):
        return self.obj.keys()

    def get(self, key, default=None):
        return self.obj.get(key, default)

    def __call__(self, obj):
        """ process obj (a dict) coming from json.loads """
        try:
            self.obj = obj
            # improve datatypes
            self.timify()
            self.listify()
            self.dictify()
            # try to match objects; these are ok to raise Done when done
            self.try_value()
            self.try_meta()
            # callback
            self.try_object()
        except self.Done as done:
            return done.value
        else:
            return self.obj
        finally:
            self.obj = None

    def _ify(self, predicate, transform):
        """
        transform all entries with transform iff predicate matches for any of them
        """
        def _t(k, v):
            if predicate(k, v):
                return transform(k, v)
            else:
                return k, v
        if any(predicate(k, v) for k, v in self.items()):
            self.obj = dict(_t(key, value) for key, value in self.items())

    def timify(self):
        """ make time objects from POSIX stamps but treat 0 as 'absent' """
        def _transform(k, v):
            if k in self.target.timedelta_fields:
                return k, timedelta(seconds=v)
            elif v and v > 0:
                return k, datetime.fromtimestamp(v)
            return k, None
        self._ify(
            predicate=lambda k, v: k.endswith("_time") or k in self.target.additional_time_fields,
            transform=_transform)

    def listify(self):
        """ make comma-separated strings into lists """
        self._ify(
            predicate=lambda k, v: k != "name" and isinstance(v, str) and "," in v,
            transform=lambda k, v: (k, v.split(",")))

    def dictify(self):
        """ make (ordered) dicts from lists of "key=value" strings """
        def _predicate(k, v):
            return isinstance(v, list) and \
                len(v) and \
                all([isinstance(ea, str) and ea.count("=") == 1 for ea in v])
        self._ify(
            predicate=_predicate,
            transform=lambda k, v: (k, OrderedDict(map(methodcaller("split", "="), v))))

    def try_value(self):
        """ slurm uses a funny serialization for maybe-set/maybe-infinite values;
        map them to None, inf, or the plain value.
        """
        if self.keys() == {"set", "infinite", "number"}:
            if self["infinite"]:
                raise self.Done(float("inf"))
            elif not self["set"]:
                raise self.Done(None)
            else:
                raise self.Done(self["number"])

    def force_list_of_strings(self, attrib, value=None):
        """
        make sure attrib is a (maybe empty) list of strings, even if it was a simple string before
        """
        if not value:
            value = self[attrib]
        if not isinstance(value, list):
            assert isinstance(value, str)
            value = [value] if value != "" else []  # allow empty
        assert all([isinstance(each, str) for each in value])
        self[attrib] = value

    def try_meta(self):
        """
        Handle the outermost object:
        report warnings, raise on errors, and return the payload we care about.
        """
        meta = {"meta", "errors", "warnings"}
        if meta.intersection(self.keys()) == meta:
            for warning in self["warnings"]:  # might be empty
                print(f"Warning: {warning['description']}", file=sys.stderr)
            if len(self["errors"]):  # hopefully not.
                raise Exception([error["description"] for error in self["errors"]])
            # we're only interested in the target objects
            raise self.Done(self[self.target.target_object])

    def try_object(self):
        if self.target.try_object(self):
            # assemble
            relevant = {k: v for k, v in self.items() if k in self.target.json_attribs}
            # Done in this case
            raise self.Done(self.target(**relevant))

#
def get_command_output(command):
    """
    Run a command on the system and return the output
    :param command: Command to run
    :return: Output stdout of that command
    """
    try:
        proc = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as cpe:
        cmd = " ".join(command)
        raise Exception(f'command "{cmd}" failed ({cpe.returncode}) with "{cpe.output}"') from cpe
    else:
        return proc.stdout


def get_structured(json_source, thing):
    """
    Parse a json returned by 'scontrol show SOMETHING --json'
    :param json_source: Raw JSON from scontrol
    :param thing: The target dataclass to build (Job, Node, Partition, or Reservation)
    :return: List of Things parsed.
    """
    return json.loads(json_source, object_hook=Objectifier(thing))

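# For example, jobs can be loaded the same way the nodes are in __main__ below
# (a sketch; assumes the local Slurm's `scontrol show jobs --json` layout
# matches the hooks above):
#
#   jobs = get_structured(get_command_output("scontrol show jobs --json".split()), Job)
#   print("\t".join(Job.get_header()))
#   for job in jobs:
#       print("\t".join(job.as_list()))
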
def get_additional_partition_data(slurm_oneline_source):
    """
    Parse a slurm oneline string to a dict of dicts.
    This is needed because the json output for partitions is still incomplete.
    """
    parts = {}
    for part_line in slurm_oneline_source.strip().split("\n"):
        part = {}
        for entry in part_line.split(" "):
            k, v = entry.split("=", 1)
            part[k] = v
        parts[part["PartitionName"]] = part
    return parts

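# Illustrative input/output (abridged, hypothetical values): a oneliner line such as
#   "PartitionName=gpu Default=YES Hidden=NO MaxTime=7-00:00:00"
# parses to
#   {"gpu": {"PartitionName": "gpu", "Default": "YES", "Hidden": "NO", "MaxTime": "7-00:00:00"}}
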
def slurm_time(td: timedelta) -> str:
    """
    Format :param td: as Slurm-like time, which can then be supplied to a Slurm
    command with the '-t'/'--time' parameter.
    Specifically, days, if present, are separated from the time by '-'.
    :param td: Timedelta to be formatted
    :return: A string in the format "d-HH:MM:SS"
    """
    if td.total_seconds() < 0:
        return "<ERROR>"
    ret = ""
    if td.days > 0:
        ret = str(td.days) + "-"
        td -= timedelta(days=td.days)
    ret += str(td or 0)
    return ret

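# e.g. slurm_time(timedelta(days=2, hours=3)) == "2-3:00:00"
#      slurm_time(timedelta(minutes=90)) == "1:30:00"
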
#
if __name__ == "__main__":
    nodes_list = get_command_output("scontrol show nodes --json".split())
    nodes = get_structured(nodes_list, Node)

    from pprint import pprint
    pprint(nodes)