Skip to content

Instantly share code, notes, and snippets.

@krono
Created January 17, 2025 13:25
Show Gist options
  • Select an option

  • Save krono/d3a80983c01f05f482b18b1ec96f7992 to your computer and use it in GitHub Desktop.

Select an option

Save krono/d3a80983c01f05f482b18b1ec96f7992 to your computer and use it in GitHub Desktop.
some slurm model
#!/usr/bin/python3
import datetime
import math
import os
import json
import random
import subprocess
import sys
import traceback
from datetime import datetime, timedelta
from syslog import openlog, syslog
from dataclasses import dataclass, field, fields
from operator import methodcaller, attrgetter
from enum import Enum, Flag, auto
from collections import OrderedDict
from typing import List, Dict, Union, Optional
# White lie to get int + infinity: fields annotated as ``number`` (Job.cpus,
# Job.memory_mb) nominally hold ints, but the Objectifier turns Slurm's
# "infinite" values into float("inf"), which a plain ``int`` hint would not admit.
number = float
#
class JobState(Enum):
    """
    Job states.

    Members are looked up by *name* from the ``job_state`` field of
    ``scontrol show jobs --json`` (see Job.try_object). The value is the
    compact code shown in the ``ST`` column of Job.as_list's short table
    (presumably matching squeue's compact state codes — confirm).
    """
    BOOT_FAIL = "BF"
    CANCELLED = "CA"
    COMPLETED = "CD"
    CONFIGURING = "CF"
    COMPLETING = "CG"
    DEADLINE = "DL"
    FAILED = "F"
    NODE_FAIL = "NF"
    OUT_OF_MEMORY = "OOM"
    PENDING = "PD"
    PREEMPTED = "PR"
    RUNNING = "R"
    RESV_DEL_HOLD = "RD"
    REQUEUE_FED = "RF"
    REQUEUE_HOLD = "RH"
    REQUEUED = "RQ"
    RESIZING = "RS"
    REVOKED = "RV"
    SIGNALING = "SI"
    SPECIAL_EXIT = "SE"
    STAGE_OUT = "SO"
    STOPPED = "ST"
    SUSPENDED = "S"
    TIMEOUT = "TO"
class NodeStateBase(Enum):
    """
    Base (primary) state of a node.

    Members are looked up by *name* from the ``state`` list of
    ``scontrol show nodes --json`` (see Node._find_state); each value doubles
    as the compact display name.
    """
    UNKNOWN = "unk"
    DOWN = "down"
    IDLE = "idle"
    ALLOCATED = "alloc"
    ERROR = "error"
    MIXED = "mix"
    FUTURE = "future"
    # Aliases: identical values, so Enum binds these names to the members above.
    UNK = UNKNOWN
    ALLOC = ALLOCATED
    MIX = MIXED

    @property
    def short_name(self):
        """Compact display name, i.e. the member's value (e.g. ``"alloc"``)."""
        compact = self._value_
        return compact
class NodeStateFlag(Flag):
    """
    Modifier flags that can accompany a node's base state (e.g. DRAIN in
    ``IDLE+DRAIN``).

    Members are looked up by *name* (see Node._find_state) and combined with
    ``|``. ``str()`` renders a combination as ``"A+B"`` and no flag as ``""``.
    """
    NO_FLAG = 0
    BLOCKED = auto()
    CLOUD = auto()
    COMPLETING = auto()
    DRAIN = auto()
    DYNAMIC_FUTURE = auto()
    DYNAMIC_NORM = auto()
    INVALID_REG = auto()
    FAIL = auto()
    MAINTENANCE = auto()
    POWER_DOWN = auto()
    POWER_UP = auto()
    POWERED_DOWN = auto()
    REBOOT_REQUESTED = auto()
    REBOOT_ISSUED = auto()
    RESERVED = auto()
    RESUME = auto()
    NOT_RESPONDING = auto()
    PLANNED = auto()
    POWERING_UP = auto()
    POWERING_DOWN = auto()
    # Aliases
    MAINT = MAINTENANCE
    NO_RESPOND = NOT_RESPONDING
    RESV = RES = RESERVED
    COMP = COMPLETING

    @property
    def sigil(self):
        """
        One-character marker appended in compact display, or None if no flag
        carrying a marker is set. Flags are checked in a fixed priority order
        and the first match wins.
        NOTE(review): the characters appear to mirror sinfo's state suffixes — confirm.
        """
        cls = type(self)
        if self & cls.NOT_RESPONDING:
            return "*"
        if self & cls.REBOOT_ISSUED:
            return "^"
        if self & cls.POWER_DOWN:
            return "!"
        if self & cls.POWER_UP:
            return "#"
        if self & cls.POWERED_DOWN:
            return "~"
        if self & cls.REBOOT_REQUESTED:
            return "@"
        if self & cls.POWERING_DOWN:
            return "%"
        if self & cls.MAINTENANCE:
            return "$"
        return None

    @property
    def short_name(self):
        """
        Compact state name for flags that take precedence over the base state
        in compact display, or None if none of them is set.
        """
        cls = type(self)
        if self & cls.MAINTENANCE:
            return "maint"
        if self & cls.RESERVED:
            return "resv"
        if self & cls.COMPLETING:
            return "comp"
        return None

    def _is_composite(self):
        """True if more than one flag bit is set (same answer on every Python 3)."""
        return bin(self._value_).count("1") > 1

    def __iter__(self):
        """
        Yield the single flags contained in this value, lowest bit first.

        No try/except wraps the ``yield from``: a bare ``except`` there would
        also swallow GeneratorExit when the iterator is closed early, and then
        yield again (RuntimeError "generator ignored GeneratorExit").
        """
        iter_member = getattr(self, "_iter_member_", None)
        if iter_member is not None:  # 3.11+: Flag provides _iter_member_
            yield from iter_member(self._value_)
        else:  # 3.4-3.10 fallback
            yield from reversed(list(self._iter_member_back(self._value_)))

    def _iter_member_back(self, value):
        """Yield the single-bit members of ``value``, highest bit first."""
        member_type = type(self)
        while value:
            bit = 1 << (value.bit_length() - 1)
            yield member_type(bit)
            value &= ~bit

    def __str__(self):
        """'' for no flag, the name of a single flag, or 'A+B' for combinations."""
        if not self:
            return ""  # no flag
        if not self._is_composite():
            return self.name  # simple
        return "+".join(str(flag) for flag in self)  # composite
# Any of these flags makes a node count as unavailable (see NodeState.is_down_state).
_DOWN_FLAGS = (
    NodeStateFlag.DRAIN
    | NodeStateFlag.FAIL
    | NodeStateFlag.INVALID_REG
    | NodeStateFlag.NOT_RESPONDING
    | NodeStateFlag.MAINTENANCE
    | NodeStateFlag.PLANNED
    | NodeStateFlag.POWER_DOWN
    | NodeStateFlag.POWERING_DOWN
    | NodeStateFlag.POWERED_DOWN
    | NodeStateFlag.POWERING_UP
    | NodeStateFlag.REBOOT_ISSUED
    | NodeStateFlag.REBOOT_REQUESTED
    | NodeStateFlag.RESERVED
)
@dataclass(frozen=True)
class NodeState:
    """
    Immutable pair of a node's base state and its modifier flags.
    """
    state: NodeStateBase
    flags: NodeStateFlag = NodeStateFlag.NO_FLAG

    def is_down_state(self):
        """True if the base state is DOWN or any flag in ``_DOWN_FLAGS`` is set."""
        return self.state == NodeStateBase.DOWN or bool(self.flags & _DOWN_FLAGS)

    def __str__(self):
        """Long form, e.g. ``IDLE`` or ``ALLOCATED+DRAIN+NOT_RESPONDING``."""
        if not self.flags:
            return self.state.name
        return "+".join((self.state.name, str(self.flags)))

    def print_short(self):
        """
        Compact state name plus an optional one-character sigil.

        Priority: a flag short name (maint/resv/comp), then drain handling,
        then the base state's short name.
        """
        state = self.flags.short_name
        if not state:
            if self.flags & NodeStateFlag.DRAIN:
                # A draining node still runs jobs ("drng"); a drained one has
                # none left ("drain"). The original had these two swapped.
                busy = self.state in (NodeStateBase.ALLOCATED, NodeStateBase.MIXED)
                state = "drng" if busy else "drain"
            else:
                state = self.state.short_name
        sigil = self.flags.sigil
        if sigil:
            state += sigil
        return state
@dataclass(frozen=True)
class GPU:
    """
    Describes _an amount_ of GPU (allocated or used).

    ``name`` is the GPU type (the part after ``gres/gpu:`` in a TRES entry) or
    None for an untyped count. ``used`` stays None until usage is merged in by
    ``consume``.
    """
    name: str
    count: int
    used: int = None

    def __str__(self):
        if self.used is not None:
            return self.print_used()
        return f"{self.name}:{self.count}" if self.name else f"{self.count}"

    def __add__(self, other):
        """Add GPU counts; the result is a plain int, so ``sum(gpus)`` works."""
        if isinstance(other, type(self)):
            return self.count + other.count
        elif isinstance(other, int):
            return self.count + other
        return NotImplemented

    __radd__ = __add__

    def print_used(self, do_color=False):
        """'used/count (name)'; highlighted when fully used and ``do_color`` is set."""
        gpu_string_usage = f"{self.used or 0}/{self.count}"
        if self.used == self.count and do_color:
            # NOTE(review): `red` is not defined in this file — confirm it is provided.
            gpu_string_usage = red(gpu_string_usage)
        return f"{gpu_string_usage} ({self.name})"

    def print_free(self) -> str:
        """'free (name)', or '' if nothing is free."""
        free = self.count - (self.used or 0)
        if free > 0:
            return f"{free} ({self.name})"
        return ""

    @staticmethod
    def find_in_tres(tres):
        """
        Extract GPU amounts from a TRES mapping (e.g. ``tres_alloc_str``).

        Typed entries (``gres/gpu:<type>=<n>``) win. Otherwise an untyped
        ``gres/gpu=<n>`` yields a single ``AnyGPU``.

        :param tres: OrderedDict/dict of TRES entries, a raw ``"k=v,k=v"``
            string (a single-entry TRES is not split by the Objectifier), or
            a falsy value.
        :return: list of GPU, in reverse TRES order (as before)
        """
        if not tres:
            return []
        if isinstance(tres, str):
            tres = OrderedDict(entry.split("=", 1) for entry in tres.split(",") if "=" in entry)
        assert isinstance(tres, dict)
        typed = []
        total = None
        # Scan every entry: other TRES (e.g. gres/shard, licenses) may follow
        # the GPUs, so stopping at the first non-GPU key could miss them.
        for key, value in reversed(tres.items()):
            if key.startswith("gres/gpu:"):  # specific gpus requested or allocated
                typed.append(GPU(name=key[len("gres/gpu:"):], count=int(value)))
            elif key == "gres/gpu":  # overall gpus requested or allocated
                total = int(value)
        if typed:
            if total is not None:  # specific ones must add up to the overall count
                assert sum(typed) == total
            return typed
        if total is not None:
            return [AnyGPU(count=total)]
        return []

    @staticmethod
    def consume(gpus, gpus_used):
        """
        Take a list of gpus and do the logical subtraction of 'used' gpus.

        Each GPU in ``gpus`` whose name matches an entry of ``gpus_used`` gets
        that entry's count added to its ``used`` value. Each used entry is
        matched at most once; unmatched GPUs are returned unchanged.
        ``gpus_used`` itself is not modified.
        """
        def _pop_or_none(items, predicate):
            """Remove and return the first item satisfying ``predicate``, else None."""
            for idx, item in enumerate(items):
                if predicate(item):
                    return items.pop(idx)
            return None

        if not (gpus or gpus_used):
            return gpus
        remaining = gpus_used[:]  # work on a copy; popping consumes matches
        return [
            GPU(gpu.name, gpu.count, int(gpu.used or 0) + used_gpu.count)
            if (used_gpu := _pop_or_none(remaining, lambda u_gpu: u_gpu.name == gpu.name))
            else gpu
            for gpu in gpus
        ]
def AnyGPU(count=1):
    """Factory for an untyped GPU amount (``name`` is None), e.g. from a bare ``gres/gpu=N``."""
    untyped = GPU(None, count)
    return untyped
# cached "now", so all jobs compare aganinst the same time.
def _now(__cache=[]):
# This uses the fact that default arguments are MUTABLE
if not len(__cache):
now = datetime.now()
now -= timedelta(microseconds=now.microsecond) # truncate to seconds
__cache.append(now)
return __cache[0]
COMMAND_EXPAND_NODELIST="scontrol show hostnames"
# expand a nodelist like 'cn[020-400]' to individual nodes via scontrol
def _expand_node_list(in_list):
if not in_list: return ""
hosts = get_command_output(COMMAND_EXPAND_NODELIST.split() + [",".join(in_list)])
return [node.strip() for node in hosts.strip().split("\n")]
def _g(m):
"""make gigabyte from megabyte"""
return m >> 10
def _day_str(td: timedelta):
"""report timedelta days as words"""
return "%2d day%s" % (lambda n: (n, abs(n) != 1 and "s" or ""))(td.days)
@dataclass
class Job:
    """
    Describes the properties of a single job, as built by the Objectifier from
    ``scontrol show jobs --json`` output (see ``try_object``).
    """
    job_id: int
    user_id: int = None
    user_name: str = ""
    job_name: str = ""
    job_state: JobState = None
    state_reason: str = None
    run_time: timedelta = field(init=False)
    time_limit: timedelta = None
    submit_time: datetime = None
    start_time: datetime = None
    end_time: datetime = None
    suspend_time: datetime = None
    pre_sus_time: timedelta = None
    partition: List[str] = None
    required_nodes: List[str] = None
    nodes: List[str] = None
    cpus: number = None
    memory_mb: number = None
    gpus_requested: List[GPU] = None
    gpus_allocated: List[GPU] = None
    # objectifier hooks
    additional_time_fields = ("last_sched_evaluation",)
    timedelta_fields = ("pre_sus_time",)
    target_object = "jobs"

    def __post_init__(self):
        self.run_time = self._run_time()

    def _run_time(self):
        """ see scontrol/info_job.c:_sprint_job_info "Line 6" of Slurm """
        if self.job_state == JobState.PENDING or not self.start_time:
            return timedelta(0)
        elif self.job_state == JobState.SUSPENDED:
            return self.pre_sus_time
        else:  # started
            # finished or running; running jobs are measured against the shared "now"
            if self.job_state == JobState.RUNNING or not self.end_time:
                end = _now()
            else:
                end = self.end_time
            if self.suspend_time:
                # time since the last resume plus time run before the suspension
                return (end - self.suspend_time) + self.pre_sus_time
            else:
                return end - self.start_time

    def run_time_pretty(self):
        """'' for no run time, 'H:MM:SS'-style if there is a sub-day part, else 'N days'."""
        if not self.run_time:
            return ""
        elif self.run_time.seconds:
            return str(self.run_time)
        else:
            return _day_str(self.run_time)

    def as_list(self, detailed=False):
        """
        Return some job attributes as a list, to be displayed in a table. The correct headers are returned by get_header
        :param detailed: Detailed (True) or default version with fewer columns (False)
        :return: List of attributes describing this job
        """
        gpu_info = ",".join(map(str, self.gpus_allocated or ()))
        if not gpu_info and self.gpus_requested:
            # Nothing allocated (yet): show the total number of GPUs requested.
            # GPU.__add__/__radd__ make sum() return a plain int.
            gpu_info = str(sum(self.gpus_requested))
        # memory_mb stays None when try_object could not derive a value
        memory_info = "" if self.memory_mb is None else f"{_g(self.memory_mb)}"
        partition_info = ",".join(self.partition or ())
        if detailed:
            if self.nodes:
                nodes_reason = ",".join(self.nodes)
            elif self.state_reason:
                nodes_reason = f"({self.state_reason})"
            else:
                nodes_reason = ""
            if self.time_limit is timedelta.max:
                limit = "∞"
            elif self.time_limit is None:  # limit not set
                limit = ""
            else:
                limit = str(self.time_limit)
            return [f"{self.job_id}", partition_info, f"{self.job_name:.20}", nodes_reason, f"{self.user_id}",
                    self.user_name, self.job_state.name, f"{self.run_time}", limit, f"{self.cpus}", memory_info,
                    gpu_info]
        time_info = self.run_time_pretty()
        return [f"{self.job_id}", partition_info, ",".join(self.nodes or ()), self.user_name,
                self.job_state.value, time_info, f"{self.cpus}", memory_info, gpu_info]

    @staticmethod
    def get_header(detailed=False):
        """
        Returns the header row for the table output. To be used in combination with job.as_list.
        :param detailed: Headers for the detailed (True) or the default (False) version
        :return: The table headers corresponding to the list returned by as_list
        """
        if detailed:
            return ("JOBID", "PARTITION", "NAME", "NODELIST(REASON)", "UID", "USER", "STATE", "TIME", "LIMIT", "CPUS",
                    "MEM (GB)", "GPUS")
        else:
            return ("JOBID", "PARTITION", "NODES", "USER", "ST", "TIME", "CPUS", "MEM", "GPUS")

    @staticmethod
    def try_object(obj):
        """
        Objectifier callback: normalise a raw job dict in place.

        :param obj: the Objectifier wrapping the current dict
        :return: True if ``obj`` looks like a job (has ``job_state``), else False
        """
        if "job_state" not in obj:
            return False
        # lookup state by member name (e.g. "RUNNING")
        state = obj["job_state"]
        if isinstance(state, list):
            # NOTE(review): assumes newer Slurm versions report a list of state
            # names with the base state first — confirm.
            state = state[0]
        obj["job_state"] = JobState[state]
        obj["job_name"] = obj["name"]
        # fix reason: the literal string "None" means no reason
        if obj["state_reason"] == "None":
            obj["state_reason"] = None
        # listify
        obj.force_list_of_strings("required_nodes")
        obj.force_list_of_strings("nodes", value=obj["job_resources"]["nodes"] if obj["job_resources"] else [])
        obj.force_list_of_strings("partition")
        # fix timelimit (minutes): None = not set, inf = unlimited
        time_limit = obj["time_limit"]
        if time_limit is None:
            obj["time_limit"] = None
        elif math.isinf(time_limit):
            obj["time_limit"] = timedelta.max
        else:
            obj["time_limit"] = timedelta(minutes=time_limit)
        obj["gpus_allocated"] = GPU.find_in_tres(obj["tres_alloc_str"])
        obj["gpus_requested"] = GPU.find_in_tres(obj["tres_req_str"])
        # see how the memory is config'ed
        obj["memory_mb"] = None
        try:  # Catch if somewhere an infinity slipped in (int(inf) raises OverflowError).
            if obj["memory_per_cpu"] is not None:
                obj["memory_mb"] = int(obj["memory_per_cpu"] * obj["cpus"])
            elif obj["memory_per_node"] is not None:
                obj["memory_mb"] = int(obj["memory_per_node"] * obj["node_count"])
        except OverflowError:
            pass
        return True


# Names of all dataclass fields; the Objectifier passes only these keys to Job(...).
Job.json_attribs = set(map(attrgetter("name"), fields(Job)))
@dataclass
class Node:
    """
    Describes properties and state of a node, as built by the Objectifier from
    ``scontrol show nodes --json`` output (see ``try_object``).
    """
    name: str
    cores: int
    cpus: int
    sockets: int
    threads: int
    real_memory: int
    architecture: str = None
    cluster_name: str = ""
    partitions: List[Union[str, "Partition"]] = field(default=None, repr=False)
    features: List[str] = None
    active_features: List[str] = None
    comment: str = None
    reason: str = ""
    reason_set_by_user: str = ""
    reservation: str = None
    gpus: List[GPU] = None
    node_state: NodeState = None
    free_mem: int = None
    specialized_memory: int = None
    alloc_memory: int = None
    alloc_cpus: int = None
    cpu_load: int = None
    boot_time: datetime = None
    resume_after: timedelta = None
    # objectifier hooks
    additional_time_fields = ("last_busy",)
    timedelta_fields = ()
    target_object = "nodes"

    # convenience interface
    @property
    def cpus_total(self): return self.cpus

    @property
    def cpus_used(self): return self.alloc_cpus

    @property
    def cpus_free(self): return self.cpus - (self.alloc_cpus or 0)

    @property
    def memory_total(self): return self.real_memory - (self.specialized_memory or 0)

    @property
    def memory_used(self): return self.alloc_memory

    # NOTE(review): returns Slurm's free_mem, not memory_total - memory_used — confirm intent.
    @property
    def memory_free(self): return self.free_mem

    @property
    def partition_names(self): return [str(p) for p in self.partitions]

    def __str__(self): return self.name

    # comparison / hashing by node name only
    def __lt__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented  # Python then raises TypeError, as before
        return self.name < other.name

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return self.name == other.name

    def __hash__(self):
        return hash(self.name)

    def update_partition(self, partition):
        """Replace the entry named like ``partition`` (string or stale object) with ``partition``."""
        if any(str(p) == partition.name for p in self.partitions or ()):
            self.partitions = [partition if str(p) == str(partition) else p for p in self.partitions]

    def update_partitions(self, partitions):
        """Apply ``update_partition`` for one Partition or an iterable of them."""
        try:
            candidates = list(partitions)
        except TypeError:  # a single Partition is not iterable
            candidates = [partitions]
        # Errors raised while updating are no longer mistaken for "not iterable".
        for partition in candidates:
            self.update_partition(partition)

    def _partition_info(self):
        """Comma-separated partition names ('*' marks a default one); plain strings shown as-is."""
        return ",".join(getattr(p, "info_name", str(p)) for p in self.partitions or ())

    @staticmethod
    def _find_state(obj):
        """Build a NodeState from the list of state names in ``obj["state"]``; unknown names are ignored."""
        obj.force_list_of_strings("state")
        base_state = NodeStateBase.UNKNOWN
        flags = NodeStateFlag.NO_FLAG
        for obj_state in obj["state"]:
            if obj_state in NodeStateBase.__members__:
                base_state = NodeStateBase[obj_state]
            elif obj_state in NodeStateFlag.__members__:
                flags |= NodeStateFlag[obj_state]
        # Special case Mixed vs ALLOCATED: only some CPUs allocated
        if base_state == NodeStateBase.ALLOCATED and (
                (obj.get("alloc_cpus") or 0) < (obj.get("cpus") or 0)):
            base_state = NodeStateBase.MIXED
        return NodeState(base_state, flags)

    @staticmethod
    def try_object(obj):
        """
        Objectifier callback: normalise a raw node dict in place.

        :return: True if ``obj`` looks like a node (has ``next_state_after_reboot``), else False
        """
        # good marker for a Node
        if "next_state_after_reboot" not in obj:
            return False
        # lookup state
        obj["node_state"] = Node._find_state(obj)
        # listify
        obj.force_list_of_strings("partitions")
        obj.force_list_of_strings("features")
        obj.force_list_of_strings("active_features")
        # gpus: configured amounts, with usage merged in
        gpus = GPU.find_in_tres(obj["tres"])
        gpus_used = GPU.find_in_tres(obj["tres_used"])
        obj["gpus"] = GPU.consume(gpus, gpus_used)
        return True

    def is_node_down(self) -> bool:
        return self.node_state.is_down_state()

    def get_used_res_strings(self, admin_mode=False) -> List[str]:
        """
        This function can be used to return all the data of a node as list of entries, which can then be printed
        in a table.
        :return: List of "columns" (name, partition, etc. + used/total values for resources), to be displayed as
        a line in a table
        """
        # NOTE(review): `red` is not defined in this file — confirm it is provided.
        state_info = str(self.node_state) if admin_mode else self.node_state.print_short()
        if self.is_node_down():
            state_info = red(state_info)
        cpus_used = self.cpus_used or 0  # alloc_cpus may be missing
        cpu_info = f"{cpus_used: >3}/{self.cpus_total: >3}"
        if cpus_used == self.cpus_total:
            cpu_info = red(cpu_info)
        memory_used = self.memory_used or 0  # alloc_memory may be missing
        memory_info = f"{_g(memory_used): >4}/{_g(self.memory_total): >4} G"
        if memory_used == self.memory_total:
            memory_info = red(memory_info)
        gpu_info = ", ".join(gpu.print_used(True) for gpu in self.gpus or ())
        return [self._partition_info(), self.name, state_info, cpu_info, memory_info, gpu_info]

    def get_free_res_strings(self) -> Optional[List[str]]:
        """
        This function can be used to return all the data of a usable node as list of entries, which can then be printed
        in a table.
        :return: List of "columns" (name, partition, etc. + "free" values for resources), to be displayed as
        a line in a table, OR none if the node can't be used right now (no CPUs/memory available)
        """
        if self.is_node_down() or not self.cpus_free or not self.memory_free:
            return None
        cpu_info = f"{self.cpus_free: >4}"
        memory_info = f"{_g(self.memory_free): >4} G"
        # print_free() yields "" for fully used GPUs; skip those to avoid ", , "
        gpu_info = ", ".join(text for text in (gpu.print_free() for gpu in self.gpus or ()) if text)
        return [self._partition_info(), self.name, cpu_info, memory_info, gpu_info]


# Names of all dataclass fields; the Objectifier passes only these keys to Node(...).
Node.json_attribs = set(map(attrgetter("name"), fields(Node)))
@dataclass
class Partition:
    """
    This class stores some details about a partition
    """
    name: str
    default: bool = False
    hidden: bool = False
    time_limit: timedelta = timedelta.max
    default_time: timedelta = None
    nodes: List[Union[str, "Node"]] = field(default=None, repr=False)
    # objectifier hooks
    additional_time_fields = ()
    timedelta_fields = ()
    target_object = "partitions"

    def __str__(self): return self.name

    # comparison: default partition(s) first, then alphabetically by name
    def __lt__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented  # Python then raises TypeError, as before
        # The old `(a.default and not b.default) or a.name < b.name` put a
        # non-default partition before a default one when its name was smaller.
        if bool(self.default) != bool(other.default):
            return bool(self.default)
        return self.name < other.name

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return (self.default is other.default) and (self.name == other.name)

    def __hash__(self):
        return hash((self.default, self.name))

    def is_default(self) -> bool:
        return self.default

    def update_addtitional(self, partition_dict):
        """Set ``default``/``hidden`` from ``scontrol show partition --oneliner`` fields ("NO" = False)."""
        self.default = (partition_dict.get("Default", "NO") != "NO")
        self.hidden = (partition_dict.get("Hidden", "NO") != "NO")

    # Correctly spelled alias; the misspelled name stays for existing callers.
    update_additional = update_addtitional

    def update_nodes(self, node_list):
        """Keep the nodes of ``node_list`` that list this partition, and link them back to it."""
        # update us; compare by name since entries may be strings or Partition objects
        self.nodes = [node for node in node_list if self.name in map(str, node.partitions or ())]
        # update them
        for node in self.nodes:
            node.update_partitions(self)

    @property
    def info_name(self):
        """Name, suffixed with '*' for a default partition."""
        return self.name + "*" if self.default else self.name

    def time_limit_pretty(self):
        if self.time_limit is timedelta.max:
            return "∞"
        elif self.time_limit.seconds:
            return str(self.time_limit)
        else:
            return _day_str(self.time_limit)

    def default_time_pretty(self):
        return str(self.default_time) if self.default_time else ""

    @staticmethod
    def try_object(obj):
        """
        Objectifier callback: normalise a raw partition dict in place.

        :return: True if ``obj`` looks like a partition (has ``node_sets``), else False
        """
        if "node_sets" not in obj:  # good partition identifier
            return False
        # fix timelimit (minutes; inf = unlimited); unset/0 keeps the field default
        if tl := obj["maximums"]["time"]:
            obj["time_limit"] = timedelta(minutes=tl) if not math.isinf(tl) else timedelta.max
        if dl := obj["defaults"]["time"]:
            obj["default_time"] = timedelta(minutes=dl) if not math.isinf(dl) else timedelta.max
        # we handle that later (see update_nodes)
        if "nodes" in obj:
            del obj["nodes"]
        return True


# Names of all dataclass fields; the Objectifier passes only these keys to Partition(...).
Partition.json_attribs = set(map(attrgetter("name"), fields(Partition)))
@dataclass
class Reservation:
    """
    Stores the most important information about a pending reservation
    """
    name: str
    start_time: datetime
    end_time: datetime
    # list of node names, later (partly) replaced by Node objects via update_nodes
    node_list: List[Union[str, "Node"]] = field(default=None, repr=False)
    partition: Union[str, "Partition"] = field(default=None, repr=False)
    flags: List[str] = None
    # objectifier hooks
    additional_time_fields = ()
    timedelta_fields = ()
    target_object = "reservations"

    def __str__(self): return self.name

    @staticmethod
    def try_object(obj):
        """
        Objectifier callback: normalise a raw reservation dict in place.

        :return: True if ``obj`` looks like a reservation (has ``purge_completed``), else False
        """
        if "purge_completed" not in obj:  # good reservation identifier
            return False
        obj.force_list_of_strings("node_list")
        # expand foo[01-20] into individual node names
        obj["node_list"] = _expand_node_list(obj["node_list"])
        obj.force_list_of_strings("flags")
        return True

    def is_future(self):
        return self.start_time > _now()

    def eta(self):
        """Time until the reservation starts; zero once it has started."""
        if self.is_future():
            return self.start_time - _now()
        else:
            return timedelta(0)

    @property
    def node_names(self):
        return [str(node) for node in self.node_list]

    def update_node(self, node):
        # replace node _string_ with node _object_ of same name if there.
        if node.name in self.node_list:
            self.node_list = [node if str(n) == str(node) else n for n in self.node_list]

    def update_nodes(self, nodes):
        """Apply ``update_node`` for one Node or an iterable of them."""
        try:
            candidates = list(nodes)
        except TypeError:  # a single Node is not iterable
            candidates = [nodes]
        for node in candidates:
            self.update_node(node)

    def print_pending_pretty(self):
        """
        Return a one-line notice about this reservation, or None if it starts
        later than the longest time limit of any visible partition of its
        (already resolved) nodes.
        """
        limits = [
            partition.time_limit
            for node in self.node_list or ()
            if isinstance(node, Node)
            for partition in node.partitions or ()
            # partitions not yet resolved to objects carry no limit information
            if isinstance(partition, Partition) and not partition.hidden
        ]
        # Without any known limit we cannot rule out a collision, so always report.
        if limits and self.eta() > max(limits):
            # reservation not near enough, ignore
            return None
        start_in = f" (starting in '{slurm_time(self.eta())}')" if self.is_future() else ""
        line = "[MAINTENANCE] " if "MAINT" in (self.flags or ()) else ""
        line += f"There is a reservation '{self.name}' between {self.start_time}{start_in} and " \
                f"{self.end_time}. "
        line += f"The following nodes are reserved and cannot be used during that time: {', '.join(self.node_names)}. "
        return line


# Names of all dataclass fields; the Objectifier passes only these keys to Reservation(...).
Reservation.json_attribs = set(map(attrgetter("name"), fields(Reservation)))
#
class Objectifier:
    """
    Callable ``object_hook`` for ``json.loads`` that turns the dicts of
    ``scontrol show ... --json`` output into instances of ``target``
    (e.g. Job, Node, Partition, Reservation).

    ``json.loads`` calls the hook for every decoded JSON object, inner objects
    first. For each dict the hook:
      * converts ``*_time`` / ``target.additional_time_fields`` entries to datetimes,
        and ``target.timedelta_fields`` entries to timedeltas;
      * splits comma-separated strings into lists;
      * turns lists of ``key=value`` strings into OrderedDicts;
      * collapses Slurm's ``{"set", "infinite", "number"}`` wrappers;
      * unwraps the outer ``{"meta", "errors", "warnings", ...}`` envelope;
      * builds a ``target`` instance when ``target.try_object`` accepts the dict.

    ``target`` must provide ``additional_time_fields``, ``timedelta_fields``,
    ``target_object``, ``json_attribs`` and a static ``try_object(obj)``.
    """
    def __init__(self, target):
        self.target = target
        self.obj = None

    class Done(Exception):
        """Raised internally to finish early with ``value`` as the hook's result."""
        def __init__(self, value):
            super().__init__(value)
            self.value = value

    # simple access to obj
    def __getitem__(self, key):
        return self.obj[key]

    def __setitem__(self, key, value):
        self.obj[key] = value

    def __delitem__(self, key):
        del self.obj[key]

    def __contains__(self, item):
        try:
            return item in self.obj
        except TypeError:  # no current object (self.obj is None) or unhashable item
            return False

    # emulate a dict
    def items(self):
        return self.obj.items()

    def keys(self):
        return self.obj.keys()

    def get(self, key, default=None):
        return self.obj.get(key, default)

    def __call__(self, obj):
        """ process obj (a dict) coming from json.loads """
        try:
            self.obj = obj
            # improve datatypes
            self.timify()
            self.listify()
            self.dictify()
            # try to match objects; these are ok to raise Done when done
            self.try_value()
            self.try_meta()
            # callback
            self.try_object()
        except self.Done as done:
            return done.value
        else:
            return self.obj
        finally:
            self.obj = None

    def _ify(self, predicate, transform):
        """
        If ``predicate(key, value)`` holds for any entry, rebuild ``self.obj``
        with ``transform(key, value)`` applied to every matching entry.
        """
        def _t(k, v):
            if predicate(k, v):
                return transform(k, v)
            else:
                return k, v
        if any(predicate(k, v) for k, v in self.items()):
            self.obj = dict(_t(key, value) for key, value in self.items())

    def timify(self):
        """
        Make time objects from POSIX stamps but treat 0 as 'absent'.

        Timedelta fields: None stays None, +/-inf becomes timedelta.max/min,
        otherwise ``timedelta(seconds=v)``. Other time fields: non-positive or
        missing values become None, +inf becomes datetime.max, otherwise
        ``datetime.fromtimestamp(v)`` (local time).
        """
        def _transform(k, v):
            if k in self.target.timedelta_fields:
                if v is None:  # "not set"
                    return k, None
                if math.isinf(v):
                    return k, timedelta.max if v > 0 else timedelta.min
                return k, timedelta(seconds=v)
            elif v and v > 0:
                if math.isinf(v):  # fromtimestamp(inf) would raise OverflowError
                    return k, datetime.max
                return k, datetime.fromtimestamp(v)
            return k, None
        self._ify(
            predicate=lambda k, v: k.endswith("_time") or k in self.target.additional_time_fields,
            transform=_transform)

    def listify(self):
        """ make comma-separated strings (except 'name') into lists """
        self._ify(
            predicate=lambda k, v: k != "name" and isinstance(v, str) and "," in v,
            transform=lambda k, v: (k, v.split(",")))

    def dictify(self):
        """ make OrderedDicts from non-empty lists of 'key=value' strings """
        def _predicate(k, v):
            return isinstance(v, list) and \
                len(v) and \
                all([isinstance(ea, str) and ea.count("=") == 1 for ea in v])
        self._ify(
            predicate=_predicate,
            transform=lambda k, v: (k, OrderedDict(map(methodcaller("split", "="), v))))

    def try_value(self):
        """ slurm uses a funny serialization for maybe set/maybe infinite values:
        map to None (not set), inf (infinite), or the number.
        """
        if self.keys() == {"set", "infinite", "number"}:
            if self["infinite"]:
                raise self.Done(float("inf"))
            elif not self["set"]:
                raise self.Done(None)
            else:
                raise self.Done(self["number"])

    def force_list_of_strings(self, attrib, value=None):
        """
        Make sure attrib is a (maybe empty) list of strings, even if it was a simple string before.
        A falsy ``value`` (None, "", []) falls back to the current ``self[attrib]``.
        """
        if not value:
            value = self[attrib]
        if not isinstance(value, list):
            assert isinstance(value, str)
            value = [value] if value != "" else []  # allow empty
        assert all([isinstance(each, str) for each in value])
        self[attrib] = value

    def try_meta(self):
        """
        Handle the outermost envelope: print warnings to stderr, raise on
        errors, and otherwise return the list under ``target.target_object``.
        """
        meta = {"meta", "errors", "warnings"}
        if meta.intersection(self.keys()) == meta:
            for warning in self["warnings"]:  # might be empty
                print(f"Warning: {warning['description']}", file=sys.stderr)
            if len(self["errors"]):  # hopefully not.
                raise Exception([error["description"] for error in self["errors"]])
            # hand back the objects we are interested in (e.g. "jobs")
            raise self.Done(self[self.target.target_object])

    def try_object(self):
        """Build ``target`` from the known fields if ``target.try_object`` accepts this dict."""
        if self.target.try_object(self):
            # assemble
            relevant = {k: v for k, v in self.items() if k in self.target.json_attribs}
            # Done in this case
            raise self.Done(self.target(**relevant))
#
def get_command_output(command):
    """
    Run a command on the system (no shell) and return its output.
    :param command: Command to run, as an argument list
    :return: Output stdout of that command, as text
    :raises Exception: if the command exits non-zero; the message contains the
        exit code and the command's stderr (falling back to stdout)
    """
    try:
        proc = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as cpe:
        cmd = command if isinstance(command, str) else " ".join(command)
        # With capture_output=True the diagnostics are in stderr; cpe.output is stdout only.
        details = (cpe.stderr or cpe.output or "").strip()
        raise Exception(f'command "{cmd}" failed ({cpe.returncode}) with "{details}"') from cpe
    return proc.stdout
def get_structured(json_source, thing):
    """
    Decode the JSON printed by 'scontrol show SOMETHING --json' into objects.
    :param json_source: Raw JSON text from scontrol
    :param thing: target class handed to the Objectifier (e.g. Job or Node)
    :return: whatever the outermost object hook yields — for scontrol output,
        the list stored under ``thing.target_object``
    """
    hook = Objectifier(thing)
    parsed = json.loads(json_source, object_hook=hook)
    return parsed
def get_additional_partition_data(slurm_oneline_source):
    """
    Parse 'scontrol show partition --oneliner' style text into a dict of dicts,
    keyed by each line's PartitionName.
    This is because the json output for partitions is incomplete as of yet.

    Blank lines and repeated spaces are tolerated. A token without '=' is taken
    as the continuation of the previous value (which contained spaces). Lines
    without a PartitionName are skipped.
    """
    parts = {}
    for part_line in slurm_oneline_source.splitlines():
        part = {}
        last_key = None
        for entry in part_line.split():
            if "=" in entry:
                last_key, value = entry.split("=", 1)
                part[last_key] = value
            elif last_key is not None:
                part[last_key] += " " + entry
        if "PartitionName" in part:
            parts[part["PartitionName"]] = part
    return parts
def slurm_time(td: timedelta) -> str:
    """
    Format ``td`` as a Slurm-style duration that can be handed to a Slurm
    command's '-t'/'--time' option: whole days, if any, come first and are
    separated from the time of day by '-'.
    :param td: Timedelta to be formatted
    :return: e.g. "2-3:00:00", "0:05:00", "1-0" or "0"; "<ERROR>" for negative input
    """
    if td.total_seconds() < 0:
        return "<ERROR>"
    days = td.days
    remainder = td - timedelta(days=days)
    prefix = f"{days}-" if days > 0 else ""
    return prefix + (str(remainder) if remainder else "0")
#
if __name__ == "__main__":
    # Debug entry point: print the parsed nodes of the current cluster.
    from pprint import pprint
    raw_json = get_command_output(["scontrol", "show", "nodes", "--json"])
    pprint(get_structured(raw_json, Node))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment