Created
January 31, 2026 08:53
-
-
Save zavolo/efa07496095d70425857cb73db48558f to your computer and use it in GitHub Desktop.
shindows
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import json | |
| import subprocess | |
| import re | |
| import sys | |
| import shutil | |
| import ctypes | |
| import urllib.request | |
| import urllib.error | |
def get_callback_url(gateway):
    """Build the provisioning callback URL from the default gateway address.

    The gateway must be a dotted quad whose last octet is ``1``. The callback
    host lives in the same /24 and its last octet depends on the gateway's
    third octet: ``0`` maps to ``.2`` and ``3`` maps to ``.22``.

    Args:
        gateway: Gateway address as a string (e.g. ``"10.0.0.1"``), or a
            falsy value when no gateway is known.

    Returns:
        ``"http://<callback-ip>:5000/vm/callback"``, or ``None`` when the
        gateway is missing or does not match a supported layout.
    """
    if not gateway:
        return None

    octets = gateway.split('.')
    if len(octets) != 4 or octets[3] != '1':
        return None

    # Third octet of the gateway -> last octet of the callback host.
    host_suffix = {'0': '2', '3': '22'}.get(octets[2])
    if host_suffix is None:
        return None

    callback_ip = '.'.join(octets[:3] + [host_suffix])
    return f"http://{callback_ip}:5000/vm/callback"
def find_config_drive():
    """Locate the drive that holds the OpenStack config-drive metadata.

    Probes drive roots ``A:\\`` through ``Z:\\`` in alphabetical order and
    picks the first one containing ``openstack/latest/meta_data.json``.

    Returns:
        The drive root (e.g. ``"D:\\"``), or ``None`` if no drive matches.
    """
    marker = ("openstack", "latest", "meta_data.json")
    roots = (f"{letter}:\\" for letter in "ABCDEFGHIJKLMNOPQRSTUVWXYZ")
    return next(
        (root for root in roots if os.path.exists(os.path.join(root, *marker))),
        None,
    )
def get_admin_name():
    """Return the name of the built-in local administrator account.

    Asks PowerShell for the local user whose SID matches ``S-1-5-*-500`` and
    decodes the console output as cp866.

    Returns:
        The account name, or ``"Администратор"`` when the lookup fails or
        yields no output.
    """
    fallback = "Администратор"
    cmd = 'powershell -Command "(Get-LocalUser | Where-Object {$_.SID -like \'S-1-5-*-500\'}).Name"'
    try:
        name = subprocess.check_output(cmd, shell=True).decode('cp866').strip()
    except Exception:
        # Best-effort lookup: any failure (PowerShell missing, non-zero exit,
        # decode error) falls back to the default name. Unlike a bare
        # ``except:``, KeyboardInterrupt and SystemExit still propagate.
        return fallback
    return name or fallback
def eject_config_drive():
    """Open the tray of the drive whose volume label is ``config-2``.

    The drive letter is taken from the second line of the ``wmic`` output
    (the line after the ``Caption`` header), and the tray is opened through
    MCI commands sent via ``winmm``. This is best-effort: any failure is
    ignored and the function always returns ``None``.
    """
    try:
        output = subprocess.check_output(
            'wmic logicaldisk where VolumeName="config-2" get Caption',
            shell=True,
        ).decode('cp866')
        lines = output.split('\n')
        caption_line = lines[1] if len(lines) > 1 else ''
        # Keep only the letter from a caption such as "E:".
        drive_letter = ''.join(filter(str.isalpha, caption_line))
        if drive_letter:
            mci_send = ctypes.windll.WINMM.mciSendStringW
            mci_send(f"open {drive_letter}: type cdaudio alias d_drive", None, 0, None)
            mci_send("set d_drive door open", None, 0, None)
    except Exception:
        # Ejecting is optional; failures (wmic missing, not on Windows, MCI
        # errors) must not abort provisioning. KeyboardInterrupt and
        # SystemExit are deliberately not swallowed.
        pass
def cleanup_scripts():
    """Delete everything inside the Cloudbase-Init ``LocalScripts`` directory.

    Files are removed with ``os.remove`` and sub-directories with
    ``shutil.rmtree``. The directory itself is kept. Filesystem errors are
    ignored per entry, so one undeletable item does not stop the rest from
    being cleaned up. A missing or unreadable directory is a no-op.
    """
    scripts_path = r"C:\Program Files\Cloudbase Solutions\Cloudbase-Init\LocalScripts"
    try:
        entries = os.listdir(scripts_path)
    except OSError:
        # Directory does not exist or cannot be listed: nothing to clean.
        return

    for item in entries:
        item_path = os.path.join(scripts_path, item)
        try:
            if os.path.isfile(item_path):
                os.remove(item_path)
            elif os.path.isdir(item_path):
                shutil.rmtree(item_path)
        except OSError:
            # Best-effort: skip entries that are locked or not permitted.
            continue
def send_status(callback_url, vm_id, status, message=""):
    """POST a provisioning status update to the callback endpoint.

    The payload is a JSON object with the keys ``vm_id``, ``status`` and
    ``message``. Delivery is best-effort: any failure is ignored so that
    status reporting can never break provisioning.

    Args:
        callback_url: Endpoint URL; when falsy nothing is sent.
        vm_id: Identifier of the VM being configured.
        status: Status keyword to report.
        message: Optional human-readable detail.

    Returns:
        None.
    """
    if not callback_url:
        return
    try:
        payload = json.dumps({
            'vm_id': vm_id,
            'status': status,
            'message': message
        }).encode('utf-8')
        req = urllib.request.Request(
            callback_url,
            data=payload,
            headers={'Content-Type': 'application/json'},
            method='POST'
        )
        # Use the response as a context manager so the connection is closed
        # instead of being leaked until garbage collection.
        with urllib.request.urlopen(req, timeout=5):
            pass
    except Exception:
        # Swallow network, HTTP, URL and serialization errors; unlike a bare
        # ``except:``, KeyboardInterrupt and SystemExit still propagate.
        pass
def _parse_network_config(net_data):
    """Extract address, netmask, gateway and DNS values from the network file text."""
    def first_match(pattern):
        found = re.search(pattern, net_data)
        return found.group(1) if found else None

    return {
        'address': first_match(r'address\s+([0-9.]+)'),
        'netmask': first_match(r'netmask\s+([0-9.]+)'),
        'gateway': first_match(r'gateway\s+([0-9.]+)'),
        'dns': first_match(r'dns-nameservers\s+(.+)'),
    }


def _set_admin_password(name, admin_pass):
    """Set and activate the administrator password and disable its expiry."""
    # Arguments are passed as a list (no shell), so a password containing
    # spaces or shell metacharacters (&, |, ", ^ ...) is handed to net.exe
    # verbatim instead of being interpreted by cmd.exe.
    subprocess.run(['net', 'user', name, str(admin_pass), '/active:yes'])
    ps_name = name.replace("'", "''")  # escape for a PowerShell single-quoted string
    subprocess.run([
        'powershell', '-Command',
        f"Set-LocalUser -Name '{ps_name}' -PasswordNeverExpires $true",
    ])


def _apply_static_ip(net):
    """Assign the parsed static IP settings to the first network adapter that is up."""
    address = net.get('address')
    netmask = net.get('netmask')
    if not (address and netmask):
        return

    # Prefix length = number of set bits in the dotted netmask.
    prefix_len = sum(bin(int(octet)).count('1') for octet in netmask.split('.'))

    new_ip = (
        f"New-NetIPAddress -InterfaceIndex $iface.ifIndex -IPAddress {address} "
        f"-PrefixLength {prefix_len}"
    )
    if net.get('gateway'):
        new_ip += f" -DefaultGateway {net['gateway']}"
    new_ip += " -Confirm:$false"
    statements = [new_ip]

    # Keep only space-separated tokens that start with a digit.
    dns_servers = [
        token for token in (net.get('dns') or '').split(' ')
        if re.match(r'\d', token)
    ]
    if dns_servers:
        quoted = ','.join("'" + token.replace("'", "''") + "'" for token in dns_servers)
        statements.append(
            "Set-DnsClientServerAddress -InterfaceIndex $iface.ifIndex "
            f"-ServerAddresses {quoted}"
        )

    # Single-line script, run without a shell: cmd.exe never gets to
    # interpret the pipes or line breaks of the PowerShell code.
    script = (
        "$iface = Get-NetAdapter | Where-Object {$_.Status -eq 'Up'} "
        "| Select-Object -First 1; "
        "if ($iface) { " + "; ".join(statements) + " }"
    )
    subprocess.run(['powershell', '-Command', script])


def main():
    """Configure the VM from the config drive and report progress.

    Steps, in order:
      1. Find the config drive; exit with status 0 if there is none.
      2. Parse the network file (``openstack/content/0000``) and derive the
         callback URL from its gateway.
      3. Load ``meta_data.json``, report ``configuring``, and set the
         administrator password when ``admin_pass`` is present.
      4. Apply the static IP configuration when address and netmask exist.
      5. Eject the config drive, clean up local scripts, report ``ready``.

    Raises:
        Exception: any error during configuration is reported as ``error``
            (when both the VM id and callback URL are known) and re-raised.
    """
    vm_id = None
    callback_url = None
    try:
        drive = find_config_drive()
        if not drive:
            sys.exit(0)

        meta_path = os.path.join(drive, "openstack", "latest", "meta_data.json")
        network_path = os.path.join(drive, "openstack", "content", "0000")

        # Read and parse the network file once; an absent file leaves every
        # value unset.
        net = {}
        if os.path.exists(network_path):
            with open(network_path, 'r', encoding='utf-8') as f:
                net = _parse_network_config(f.read())
        callback_url = get_callback_url(net.get('gateway'))

        # Default to an empty mapping so a vanished metadata file cannot
        # leave ``meta`` unbound.
        meta = {}
        if os.path.exists(meta_path):
            with open(meta_path, 'r', encoding='utf-8') as f:
                meta = json.load(f)
        vm_id = meta.get('uuid') or meta.get('instance_id')
        send_status(callback_url, vm_id, 'configuring', 'Начало настройки')

        admin_pass = meta.get('admin_pass')
        if admin_pass:
            _set_admin_password(get_admin_name(), admin_pass)

        _apply_static_ip(net)

        eject_config_drive()
        cleanup_scripts()
        send_status(callback_url, vm_id, 'ready', 'Настройка завершена')
    except Exception as e:
        if vm_id and callback_url:
            send_status(callback_url, vm_id, 'error', str(e))
        raise
# Run the provisioning flow only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment