Created
November 21, 2025 03:58
-
-
Save nneul/60f7f6f66efdd673724a0da6456c8bdd to your computer and use it in GitHub Desktop.
Example on publishing cert into vcenter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/local/certmgmt/.venv/bin/python | |
| import sys | |
| import argparse | |
| import json | |
| import ssl | |
| import requests | |
| from requests.auth import HTTPBasicAuth | |
| from requests.packages.urllib3.exceptions import InsecureRequestWarning | |
| import OpenSSL.crypto | |
| __author__ = "nneul" | |
| def main(): | |
| """Set up vault environment, retrieve and convert cert and key, install on vcenter appliance""" | |
| # Set up logger | |
| log = Logger() | |
| # SNIP LOCAL STUFF RE VAULT | |
| # Silence whining errors about invalid certs | |
| requests.packages.urllib3.disable_warnings(InsecureRequestWarning) | |
| # | |
| # Retrieve the cert/key contents - grab this from certbot/lego files instead | |
| # | |
| vault_secret = vault.get_secret(args.vault_path) | |
| try: | |
| private_key = vault_secret["private_key"] | |
| certificate = vault_secret["certificate"] | |
| chain = vault_secret["chain"] | |
| full_chain = vault_secret["full_chain"] | |
| except BaseException: | |
| raise Exception("Unable to extract secret info from vault") | |
| # | |
| # Check to see if we actually need to replace cert | |
| # | |
| new_cert_parsed = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, certificate) | |
| new_cert_dumped = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, new_cert_parsed) | |
| host_cert_raw = ssl.get_server_certificate((args.name, 443)) | |
| host_cert_parsed = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, host_cert_raw) | |
| host_cert_dumped = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, host_cert_parsed) | |
| if new_cert_dumped == host_cert_dumped: | |
| if args.force: | |
| log.info("force replacing unchanged cert") | |
| else: | |
| log.info("certificate already identical, no action required") | |
| sys.exit(0) | |
| else: | |
| log.info("certificate on host does not match, continuing with installation") | |
| # | |
| # Retrieve the vcenter sso creds | |
| # | |
| admin_secret = vault.get_secret(args.admin_creds) | |
| try: | |
| admin_user = admin_secret["username"] | |
| admin_pw = admin_secret["password"] | |
| except BaseException: | |
| raise Exception("Unable to extract vcenter sso admin secret from vault") | |
| url = "https://{name}/api/session".format(name=args.name) | |
| res = requests.post(url, auth=HTTPBasicAuth(admin_user, admin_pw), verify=False) | |
| sid = res.json() | |
| if not sid: | |
| raise Exception("Unable to get session id") | |
| # | |
| # Load certs for LetsEncrypt roots from locally comitted copies | |
| # | |
| with open("/local/certmgmt/etc/letsencrypt-isrg-x1-root.crt", "r") as tmp_fh: | |
| root_cert_x1 = "".join(tmp_fh.readlines()) | |
| # | |
| # Common url and headers for chain requests | |
| # | |
| url = "https://{name}/api/vcenter/certificate-management/vcenter/trusted-root-chains".format(name=args.name) | |
| headers = {"vmware-api-session-id": sid, "Content-Type": "application/json"} | |
| # | |
| # Install a cert chain that has X1 CA from LetsEncrypt | |
| # | |
| body = { | |
| "cert_chain": { | |
| "cert_chain": [ | |
| root_cert_x1, | |
| ] | |
| } | |
| } | |
| res = requests.post(url, headers=headers, data=json.dumps(body), verify=False) | |
| if not res.ok: | |
| raise Exception("Failed to install root_cert_x1 chain: %s" % res.content) | |
| else: | |
| print("chain install res = %s" % res.content) | |
| # | |
| # Install a cert chain from LetsEncrypt | |
| # | |
| body = { | |
| "cert_chain": { | |
| "cert_chain": [ | |
| chain, | |
| ] | |
| } | |
| } | |
| res = requests.post(url, headers=headers, data=json.dumps(body), verify=False) | |
| if not res.ok: | |
| raise Exception("Failed to install chain: %s" % res.content) | |
| else: | |
| print("chain install res = %s" % res.content) | |
| # | |
| # Install a cert chain that has both X1 root and intermediate CA's from LetsEncrypt, not sure this is actually needed | |
| # | |
| body = {"cert_chain": {"cert_chain": [root_cert_x1, chain]}} | |
| res = requests.post(url, headers=headers, data=json.dumps(body), verify=False) | |
| if not res.ok: | |
| raise Exception("Failed to install root_cert_x1+chain: %s" % res.content) | |
| else: | |
| print("chain install root_cert_x1+chain res = %s" % res.content) | |
| # | |
| # Install actual certificate | |
| # | |
| body = {"cert": vault_secret["certificate"], "key": vault_secret["private_key"], "root_cert": chain + root_cert_x1} | |
| url = "https://{name}/api/vcenter/certificate-management/vcenter/tls".format(name=args.name) | |
| res = requests.put(url, headers=headers, data=json.dumps(body), verify=False) | |
| if not res.ok: | |
| raise Exception("Failed to install certificate: %s" % res.content) | |
| else: | |
| print("cert install res = %s (expected to be empty): " % res.content) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment