Last active
September 29, 2025 22:45
-
-
Save IceWreck/75ed349b8226668d9c32be771afafdc2 to your computer and use it in GitHub Desktop.
New K8s User Generator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import subprocess | |
| import os | |
| import sys | |
| import argparse | |
| import base64 | |
| import logging | |
| import textwrap | |
| import time | |
| from pathlib import Path | |
# Configure root logging at INFO and create the module-level logger used by
# every function and method in this script for progress and error output.
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
class KubeconfigGenerator:
    """Create a client-certificate kubeconfig for a new Kubernetes user.

    The generator shells out to ``openssl`` (key + CSR) and ``kubectl``
    (submit/approve the CSR, fetch the signed certificate and the cluster CA)
    and renders a kubeconfig document as a string.

    Intermediate files (``<username>.key``, ``.csr``, ``.crt`` and
    ``-csr.yaml``) live next to the requested output file and are always
    removed when :meth:`generate_kubeconfig` finishes, whether it succeeds or
    fails, so the private key is never left behind on disk.

    NOTE(review): ``username``, ``cluster_name`` and ``server_url`` are
    interpolated unquoted into YAML, and ``username`` into the openssl
    ``-subj`` string and the CSR object name. Values containing ``/``, ``:``,
    whitespace or newlines will produce malformed output; callers should pass
    simple identifiers.
    """

    GROUP = "user-creation-script-user"  # group for the user's certificate
    CERT_POLL_ATTEMPTS = 10  # one-second polls while waiting for the signed cert
    _EXTRA_BIN_DIR = "/usr/local/bin"  # common kubectl install location

    def __init__(
        self, server_url: str, cluster_name: str, username: str, output_file: Path
    ):
        """Store the cluster/user details and derive intermediate file paths.

        Args:
            server_url: Base URL of the Kubernetes API server.
            cluster_name: Name given to the cluster inside the kubeconfig.
            username: Name of the user; used as certificate CN, CSR name and
                prefix of the intermediate files.
            output_file: Destination of the kubeconfig; its parent directory
                also hosts the intermediate files.

        Side effects:
            Ensures ``/usr/local/bin`` is on ``PATH`` for child processes.
        """
        self.server_url = server_url
        self.cluster_name = cluster_name
        self.username = username
        self.kubeconfig_output_file = output_file

        # intermediate files
        output_dir_for_intermediates = self.kubeconfig_output_file.parent
        self.key_file = output_dir_for_intermediates / f"{self.username}.key"
        self.csr_file = output_dir_for_intermediates / f"{self.username}.csr"
        self.cert_file = output_dir_for_intermediates / f"{self.username}.crt"
        self.csr_yaml_file = output_dir_for_intermediates / f"{self.username}-csr.yaml"

        # Append the extra directory only once, and never create an empty
        # PATH entry (an empty entry means "current directory" on POSIX).
        current_path = os.environ.get("PATH", "")
        if self._EXTRA_BIN_DIR not in current_path.split(os.pathsep):
            os.environ["PATH"] = (
                f"{current_path}{os.pathsep}{self._EXTRA_BIN_DIR}"
                if current_path
                else self._EXTRA_BIN_DIR
            )

    @staticmethod
    def _run_command(command: list[str]) -> subprocess.CompletedProcess[str]:
        """Run an external command and return its completed process.

        The command is executed without a shell, with captured text output and
        a 30 second timeout. Any failure (binary missing, non-zero exit code,
        timeout) is logged and terminates the program via ``sys.exit(1)``.
        """
        printable = " ".join(command)
        try:
            logger.info("executing: %s", printable)
            return subprocess.run(
                command, check=True, capture_output=True, text=True, timeout=30
            )
        except FileNotFoundError:
            # The missing binary may be openssl or kubectl, so name the
            # actual command instead of always blaming openssl.
            logger.error(
                "command not found: %s. Is it installed and in your PATH?",
                command[0],
            )
            sys.exit(1)
        except subprocess.CalledProcessError as e:
            logger.error("error executing command: %s", printable)
            logger.error("return code: %s", e.returncode)
            logger.error("stdout: %s", e.stdout)
            logger.error("stderr: %s", e.stderr)
            sys.exit(1)
        except subprocess.TimeoutExpired:
            logger.error("command timed out: %s", printable)
            sys.exit(1)

    @staticmethod
    def _read_file_b64(path: Path) -> str:
        """Return the content of ``path`` encoded as a base64 string."""
        with open(path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

    def _create_key_and_csr(self) -> None:
        """Generate the RSA private key and the certificate signing request."""
        logger.info("generating private key")
        self._run_command(["openssl", "genrsa", "-out", str(self.key_file), "2048"])
        # Make sure the key is private regardless of openssl version or umask.
        self.key_file.chmod(0o600)

        logger.info("creating certificate signing request (csr)")
        csr_subject = f"/CN={self.username}/O={self.GROUP}"
        self._run_command(
            [
                "openssl",
                "req",
                "-new",
                "-key",
                str(self.key_file),
                "-out",
                str(self.csr_file),
                "-subj",
                csr_subject,
            ]
        )

    def _submit_and_approve_csr(self, csr_name: str) -> None:
        """Replace any existing CSR object of that name, then approve it."""
        logger.info("deleting existing csr '%s' if it exists", csr_name)
        self._run_command(
            ["kubectl", "delete", "csr", csr_name, "--ignore-not-found=true"]
        )

        logger.info("creating csr in kubernetes")
        csr_content_b64 = self._read_file_b64(self.csr_file)
        csr_yaml = textwrap.dedent(
            f"""
            apiVersion: certificates.k8s.io/v1
            kind: CertificateSigningRequest
            metadata:
              name: {csr_name}
            spec:
              request: {csr_content_b64}
              signerName: kubernetes.io/kube-apiserver-client
              usages:
              - client auth
            """
        )
        logger.info("writing csr yaml to %s", self.csr_yaml_file)
        with open(self.csr_yaml_file, "w") as f:
            f.write(csr_yaml)
        self._run_command(["kubectl", "apply", "-f", str(self.csr_yaml_file)])

        logger.info("approving csr: %s", csr_name)
        self._run_command(["kubectl", "certificate", "approve", csr_name])

    def _fetch_signed_certificate(self, csr_name: str) -> str:
        """Poll the CSR until a certificate is issued; return it base64 encoded.

        Exits the program if no certificate appears after
        ``CERT_POLL_ATTEMPTS`` polls (one second apart).
        """
        logger.info("fetching the signed certificate")
        for _ in range(self.CERT_POLL_ATTEMPTS):
            encoded_cert = self._run_command(
                [
                    "kubectl",
                    "get",
                    "csr",
                    csr_name,
                    "-o",
                    "jsonpath={.status.certificate}",
                ]
            ).stdout.strip()
            if encoded_cert:
                break
            time.sleep(1)
        else:
            logger.error("timed out waiting for certificate to be issued.")
            sys.exit(1)
        # Decode and re-encode in memory: validates the payload and yields the
        # same canonical base64 the former write-then-read round trip produced.
        return base64.b64encode(base64.b64decode(encoded_cert)).decode("utf-8")

    def _fetch_cluster_ca(self) -> str:
        """Return the base64 CA bundle of the current kubectl context."""
        logger.info("fetching cluster ca from current kubectl context")
        ca_data = self._run_command(
            [
                "kubectl",
                "config",
                "view",
                "--raw",
                "--minify",
                "-o",
                "jsonpath={.clusters[0].cluster.certificate-authority-data}",
            ]
        ).stdout.strip()
        if not ca_data:
            logger.error(
                "Failed to obtain cluster CA data from current kubectl context."
            )
            sys.exit(1)
        return ca_data

    def _render_kubeconfig(
        self, ca_data: str, client_cert_data: str, client_key_data: str
    ) -> str:
        """Render the kubeconfig YAML from already base64-encoded credentials."""
        return textwrap.dedent(
            f"""
            apiVersion: v1
            kind: Config
            clusters:
            - cluster:
                certificate-authority-data: {ca_data}
                server: {self.server_url}
              name: {self.cluster_name}
            contexts:
            - context:
                cluster: {self.cluster_name}
                user: {self.username}
              name: {self.username}@{self.cluster_name}
            current-context: {self.username}@{self.cluster_name}
            users:
            - name: {self.username}
              user:
                client-certificate-data: {client_cert_data}
                client-key-data: {client_key_data}
            """
        )

    def _cleanup_intermediate_files(self) -> None:
        """Remove every intermediate file, ignoring ones that were never made."""
        logger.info(
            "cleaning up intermediate files (key, csr, certificate and csr yaml)"
        )
        for leftover in (
            self.csr_file,
            self.csr_yaml_file,
            self.cert_file,
            self.key_file,
        ):
            leftover.unlink(missing_ok=True)

    def generate_kubeconfig(self) -> str:
        """Generate the kubeconfig content and return it as a string.

        Creates the output directory, generates a key and CSR, has the cluster
        sign it, collects the cluster CA and assembles the kubeconfig. The
        kubeconfig is only returned, not written. Any failing step terminates
        the program through ``sys.exit(1)``; intermediate files are removed in
        every case.
        """
        logger.warning(
            "this script requires kubectl to be configured with privileges to approve certificate signing requests."
        )
        logger.info("creating output directory: %s", self.kubeconfig_output_file.parent)
        self.kubeconfig_output_file.parent.mkdir(parents=True, exist_ok=True)

        csr_name = self.username
        try:
            self._create_key_and_csr()
            self._submit_and_approve_csr(csr_name)
            client_cert_data = self._fetch_signed_certificate(csr_name)

            logger.info("using provided cluster information")
            logger.info("server url: %s", self.server_url)
            logger.info("cluster name: %s", self.cluster_name)
            logger.info("username: %s", self.username)

            ca_data = self._fetch_cluster_ca()
            client_key_data = self._read_file_b64(self.key_file)

            logger.info("assembling the kubeconfig file")
            kubeconfig = self._render_kubeconfig(
                ca_data, client_cert_data, client_key_data
            )
        finally:
            # Runs on success and on sys.exit() alike, so the private key
            # never outlives this call.
            self._cleanup_intermediate_files()

        logger.info("done")
        return kubeconfig
def main():
    """Parse CLI arguments, generate the kubeconfig and write it to disk.

    The output path defaults to ``<username>.kubeconfig`` in the current
    working directory. Because the kubeconfig embeds the user's private key,
    the file is created with (and forced to) mode 0600 rather than relying on
    the process umask. A hint for granting cluster-admin is logged at the end;
    no role binding is created by this script.
    """
    parser = argparse.ArgumentParser(
        description="Create a kubeconfig for a k8s user. This script generates a user certificate that allows authentication, but does NOT grant cluster-admin privileges by default. You will need to manually create a ClusterRoleBinding to grant specific permissions.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--server-url",
        required=True,
        help="the base URL of the Kubernetes API server (e.g., https://opxyz.example.com:6443)",
    )
    parser.add_argument(
        "--cluster-name",
        required=True,
        help="the name for the cluster in the kubeconfig (e.g., colo-staging-1)",
    )
    parser.add_argument(
        "--username", required=True, help="the username for the new user"
    )
    parser.add_argument(
        "--output-file",
        type=Path,
        default=None,
        help="path to the output kubeconfig file, defaults to <username>.kubeconfig",
    )
    args = parser.parse_args()

    if args.output_file is None:
        args.output_file = Path.cwd() / f"{args.username}.kubeconfig"

    generator = KubeconfigGenerator(
        args.server_url, args.cluster_name, args.username, args.output_file
    )
    kubeconfig_content = generator.generate_kubeconfig()

    # Write the kubeconfig file. It contains the private key, so create it
    # owner-only; the explicit chmod also tightens a pre-existing file, for
    # which the mode passed to os.open() would be ignored.
    output_path = generator.kubeconfig_output_file
    fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        os.chmod(output_path, 0o600)
        f.write(kubeconfig_content)

    logger.info(f"kubeconfig file created at: {generator.kubeconfig_output_file}")
    logger.info(
        "to grant this user cluster-admin privileges, run the following command on your cluster:"
    )
    logger.info(
        f"kubectl create clusterrolebinding {generator.username}-cluster-admin --clusterrole=cluster-admin --user={generator.username}"
    )
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
| if __name__ == "__main__": | |
| main() |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Tiny CLI script for creating new users in a Kubernetes cluster. It automates the process of generating a user-specific kubeconfig file with the necessary client certificates for authentication.
How it works:
All you need is kubectl access somewhere. There are no deps besides python stdlib so you can literally copy this to the master node, use its kubeconfig and create the first user there.
Perfect for small clusters where you don't want to run a heavy platform (like Rancher Manager) for user authentication.
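Note: this does not create a role binding for you. After running this script you should run something like
`kubectl create clusterrolebinding <username>-cluster-admin --clusterrole=cluster-admin --user=<username>`
and instead of cluster-admin you can also use a custom role or cluster role.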