Skip to content

Instantly share code, notes, and snippets.

@typenoob
Last active October 30, 2025 03:31
Show Gist options
  • Select an option

  • Save typenoob/7598e5b6639191ef9af7fea874f1c694 to your computer and use it in GitHub Desktop.

Select an option

Save typenoob/7598e5b6639191ef9af7fea874f1c694 to your computer and use it in GitHub Desktop.
华为交换机TLS连接限制
- sal(签名算法列表,即 signature algorithms)中需包含 RSA+SHA256
- cipher 中需包含 AES256-SHA256、AES256-SHA、AES128-SHA 其中的一个
- max_sal=18
@typenoob
Copy link
Author

typenoob commented Oct 30, 2025

from dataclasses import dataclass
import os
import re
import requests
from typing import Optional, Dict, Any
from urllib.parse import parse_qs
import ssl
import urllib.request
import requests
from urllib3.util.ssl_ import create_urllib3_context
import warnings

# Silence the "Unverified HTTPS request" warning that is emitted when
# requests are sent with certificate verification turned off.
warnings.filterwarnings('ignore', message='Unverified HTTPS request')

# Sentinel error values. These are Exception *instances*, not exception
# classes: HWClient.init() returns them (it does not raise them), so callers
# are expected to compare the returned object against these names.
# DriverSkipError is currently only mentioned in commented-out code.
DriverSkipError = Exception("Driver skip error")
DriverNotMatchError = Exception("Driver not match error")
DriverAuthError = Exception("Driver auth error")


class CustomSSLContext(requests.adapters.HTTPAdapter):
    """HTTP adapter that forces a restricted TLS 1.2 client configuration.

    Every connection pool created by this adapter uses an ``ssl.SSLContext``
    that:

    * speaks TLS 1.2 only (minimum and maximum version are both TLS 1.2);
    * offers only the ``AES256-SHA256``, ``AES256-SHA`` and ``AES128-SHA``
      cipher suites;
    * advertises a fixed, short list of signature algorithms that includes
      ``RSA+SHA256``;
    * performs no hostname check and no certificate verification.
    """

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager with the restricted TLS context injected.

        All positional and keyword arguments are forwarded unchanged to
        ``HTTPAdapter.init_poolmanager``; only ``ssl_context`` is overridden.
        """
        # Build the context from scratch. (A urllib3 default context used to
        # be created here first, but it was discarded before being used.)
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

        # NOTE(review): set_server_sigalgs is only available on recent Python
        # versions -- confirm the minimum interpreter this script must support.
        context.set_server_sigalgs(
            "RSA+SHA384:RSA+SHA512:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:DSA+SHA256:RSA+SHA256")
        context.set_ciphers("AES256-SHA256:AES256-SHA:AES128-SHA")

        # Pin the protocol to exactly TLS 1.2.
        context.max_version = ssl.TLSVersion.TLSv1_2
        context.minimum_version = ssl.TLSVersion.TLSv1_2

        # check_hostname must be disabled before verify_mode can be set to
        # CERT_NONE, so the order of these two lines matters.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

        kwargs['ssl_context'] = context
        return super().init_poolmanager(*args, **kwargs)


@dataclass
class DeviceConfig:
    """Credentials and URL/payload templates used to talk to one device type.

    The ``*_uri`` fields are ``str.format`` templates that receive the device
    IP as their single positional argument; ``payload`` receives the username
    and the password, in that order.
    """

    # Login name sent in the login form and in the "UserName" cookie.
    username: str
    # Login password sent in the login form.
    password: str
    # Login form body template: "{}" placeholders for username, then password.
    payload: str
    # Login endpoint template ("{}" = device IP).
    login_uri: str
    # Configuration/CLI endpoint template ("{}" = device IP).
    config_uri: str
    # URL template fetched by the connectivity probe ("{}" = device IP).
    test_uri: str


class HWClient:
    """Client for the HTTPS web-management interface of a Huawei switch.

    Typical use::

        with HWClient() as client:
            if client.init({"ip": "192.0.2.1"}) is None:
                print(client.get_host_name())

    Credentials are read from the ``USERNAME`` and ``PASSWORD`` environment
    variables when the client is constructed.
    """

    # Timeout (seconds) for login and CLI requests, so an unresponsive device
    # cannot block the caller forever.
    REQUEST_TIMEOUT = 10
    # Shorter timeout (seconds) for the initial connectivity probe.
    PROBE_TIMEOUT = 3

    # Content-Type sent with the login and CLI POST requests.
    _POST_CONTENT_TYPE = "application/x-www-form-urlencoded; text/xml; charset=UTF-8"
    # The device returns a fresh token in the body of CLI responses.
    _TOKEN_PATTERN = re.compile(r'Token=([A-Za-z0-9]+)')
    # Request body that runs one "display ..." CLI command. The embedded
    # newlines and leading spaces are sent to the device as-is.
    _CLI_PAYLOAD_TEMPLATE = (
        'MessageID={message_id}&<rpc message-id="{message_id}" '
        'xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">\n'
        '        <edit-config operation="merge">\n'
        '        <target><running/></target><error-option>stop-on-error</error-option>\n'
        '        <config><featurename istop="true" type="cli">'
        '<display>{command}</display></featurename></config>\n'
        '        </edit-config>\n'
        '        </rpc>]]>]]>'
    )

    def __init__(self):
        """Build the device configuration from the environment and open a session."""
        self.name = "HUAWEI"
        self.ip = None
        self.token = None
        self.session = None
        self.config = DeviceConfig(
            # NOTE(review): USERNAME is usually predefined on Windows as the
            # logged-in user -- confirm this variable name is intended.
            username=os.getenv("USERNAME", ""),
            password=os.getenv("PASSWORD", ""),
            payload="UserName={}&Password={}&Edition=0",
            login_uri="https://{}/login.cgi?_=0.5615007049300567",
            config_uri="https://{}/config.cgi?_=0.05967055919149211",
            test_uri="https://{}/simple/style/default/image/a.png"
        )
        self._init_session()

    def _init_session(self):
        """Create the requests session used for every call to the device."""
        self.session = requests.Session()
        # Route all HTTPS traffic through the restricted TLS adapter.
        self.session.mount('https://', CustomSSLContext())
        # Certificate verification is disabled for this session.
        self.session.verify = False
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0",
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8"
        })
        # Timeouts are passed explicitly on each request (REQUEST_TIMEOUT /
        # PROBE_TIMEOUT); a requests.Session has no session-wide timeout.

    def init(self, args: Dict[str, Any]) -> Optional[Exception]:
        """Probe the device at ``args["ip"]`` and log in.

        Args:
            args: Mapping that must contain the key ``"ip"``.

        Returns:
            ``None`` on success, ``DriverNotMatchError`` when the connectivity
            probe fails, or ``DriverAuthError`` when no token was obtained.
            The error objects are returned, not raised.

        Raises:
            KeyError: If ``args`` has no ``"ip"`` entry.
        """
        self.ip = args["ip"]

        # TODO: the original design skipped devices that a log lookup had
        # already recorded as handled (returning DriverSkipError); that lookup
        # is not implemented here.

        if self.test_connection() is not None:
            return DriverNotMatchError

        self.auth()
        if not self.token:
            print(f"密码错误,交换机ip:{self.ip}")
            return DriverAuthError

        return None

    def destroy(self):
        """Close the underlying session, if any. Safe to call more than once."""
        if self.session:
            self.session.close()
            self.session = None

    def auth(self):
        """Log in and store the returned token in ``self.token``.

        ``self.token`` is ``None`` afterwards if the request fails, the status
        is not 200, or the response carries no ``Token`` field.
        """
        url = self.config.login_uri.format(self.ip)
        # NOTE(review): credentials are inserted without URL-encoding, so a
        # password containing '&' or '=' would corrupt the form body --
        # confirm whether the device expects percent-encoded values.
        payload = self.config.payload.format(self.config.username, self.config.password)

        # Reset first so a failed attempt can never leave a stale token behind.
        self.token = None

        try:
            response = self.session.post(
                url,
                data=payload,
                headers={"Content-Type": self._POST_CONTENT_TYPE},
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.exceptions.RequestException as e:
            print(f"认证请求失败: {e}")
            return

        if response.status_code != 200:
            print(f"认证失败,HTTP状态码: {response.status_code}")
            return

        # The response body is parsed as a query string ("Token=...&...").
        token = parse_qs(response.text).get("Token", [None])[0]
        if not token:
            # A 200 response without a token is a failed login, not a success.
            print("认证失败,响应中未包含Token")
            return

        self.token = token
        # NOTE(review): this prints a live session token to stdout.
        print(f"认证成功,Token: {self.token}")

    def test_connection(self) -> Optional[Exception]:
        """Fetch a static image from the device to check it is reachable.

        Returns:
            ``None`` when the probe answers with HTTP 200, otherwise a new
            ``Exception`` describing the failure (timeout, connection failure,
            or a non-200 status).
        """
        url = self.config.test_uri.format(self.ip)

        try:
            response = self.session.get(url, timeout=self.PROBE_TIMEOUT)
        except requests.exceptions.Timeout:
            print("连接测试超时")
            return Exception("connection timeout")
        except requests.exceptions.RequestException as e:
            print(f"连接测试失败: {e}")
            return Exception("connection failed")

        print(f"连接测试成功,HTTP状态码: {response.status_code}")
        if response.status_code == 200:
            return None
        return Exception("driver not match")

    def _run_display_command(self, message_id: int, command: str, label: str) -> Optional[str]:
        """Run one ``display`` CLI command and extract a name from its output.

        Args:
            message_id: Value used for both ``MessageID`` and ``message-id``.
            command: Text placed inside the ``<display>`` element.
            label: Noun inserted into the printed diagnostic messages.

        Returns:
            The extracted name, or ``None`` when not authenticated, when the
            request fails, or when the status is not 200.

        Raises:
            ValueError: If the response body cannot be parsed (see
                ``extract_name_from_response``).
        """
        if not self.token:
            print(f"未认证,无法获取{label}")
            return None

        url = self.config.config_uri.format(self.ip)
        payload = self._CLI_PAYLOAD_TEMPLATE.format(message_id=message_id, command=command)

        try:
            response = self.session.post(
                url,
                data=payload,
                headers={
                    "Content-Type": self._POST_CONTENT_TYPE,
                    "Token": self.token
                },
                cookies={"UserName": self.config.username},
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.exceptions.RequestException as e:
            print(f"获取{label}失败: {e}")
            return None

        if response.status_code != 200:
            print(f"获取{label}失败,HTTP状态码: {response.status_code}")
            return None

        # Keep the token current: use the one in the response for later calls.
        token_match = self._TOKEN_PATTERN.search(response.text)
        if token_match:
            self.token = token_match.group(1)

        return self.extract_name_from_response(response.text)

    def get_host_name(self) -> Optional[str]:
        """Return the host name taken from ``current-configuration | include sysname``.

        Returns ``None`` when not authenticated or when the request fails;
        raises ``ValueError`` if the response cannot be parsed.
        """
        return self._run_display_command(
            292, "current-configuration | include sysname", "主机名")

    def get_device_name(self) -> Optional[str]:
        """Return the device name taken from ``version | include Switch``.

        Returns ``None`` when not authenticated or when the request fails;
        raises ``ValueError`` if the response cannot be parsed.
        """
        return self._run_display_command(
            293, "version | include Switch", "设备名")

    def get_name(self) -> str:
        """Return the driver name (``"HUAWEI"``)."""
        return self.name

    @staticmethod
    def extract_name_from_response(response_text: str) -> str:
        """Return the second whitespace-separated word of the last line.

        Leading and trailing whitespace of the whole response is ignored, so
        trailing blank lines do not count as the last line.

        Raises:
            ValueError: If the response is empty or blank, or if its last line
                has fewer than two words.
        """
        text = response_text.strip()
        # str.split never returns an empty list, so emptiness has to be
        # checked on the stripped text itself.
        if not text:
            raise ValueError("提取主机名或设备类型失败!响应内容为空")

        words = text.split('\n')[-1].split()
        if len(words) < 2:
            raise ValueError("提取主机名或设备类型失败!最后一行格式不正确")
        return words[1]

    def __enter__(self):
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the session on leaving the ``with`` block; exceptions propagate."""
        self.destroy()


# Registration hook (kept in case a factory-style registration mechanism is needed).
def register_client():
    """Factory: build and return a fresh ``HWClient`` instance."""
    client = HWClient()
    return client


if __name__ == "__main__":
    # Demo credentials; these overwrite whatever the environment already holds.
    os.environ.update(USERNAME='admin', PASSWORD='P@ssw5rd')

    # The context manager guarantees the session is closed on exit.
    with HWClient() as client:
        outcome = client.init({"ip": "192.168.128.19"})
        if outcome is not None:
            print(f"初始化失败: {outcome}")
        else:
            print("初始化成功")
            # Query both names first, then report them.
            host_name = client.get_host_name()
            device_name = client.get_device_name()
            print(f"主机名: {host_name}")
            print(f"设备名: {device_name}")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment