Skip to content

Instantly share code, notes, and snippets.

@IceWreck
Last active September 29, 2025 22:45
Show Gist options
  • Select an option

  • Save IceWreck/75ed349b8226668d9c32be771afafdc2 to your computer and use it in GitHub Desktop.

Select an option

Save IceWreck/75ed349b8226668d9c32be771afafdc2 to your computer and use it in GitHub Desktop.
New K8s User Generator
#!/usr/bin/env python3
import subprocess
import os
import sys
import argparse
import base64
import logging
import textwrap
import time
from pathlib import Path
# Configure the root logger so the script's INFO-level progress messages are shown.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every function and method in this script.
logger = logging.getLogger(__name__)
class KubeconfigGenerator:
    """Build a client-certificate kubeconfig for a new Kubernetes user.

    The generator shells out to ``openssl`` to create a private key and a
    certificate signing request, then to ``kubectl`` to submit and approve
    that request and to read back the signed certificate and the cluster CA.
    The ``kubectl`` context in use must be allowed to create and approve
    CertificateSigningRequests.

    Intermediate files (key, csr, crt, csr yaml) are written next to the
    requested output file and removed again before ``generate_kubeconfig``
    returns or exits.
    """

    GROUP = "user-creation-script-user"  # group for the user's certificate

    def __init__(
        self, server_url: str, cluster_name: str, username: str, output_file: Path
    ):
        """Store the target cluster/user details and derive intermediate paths.

        Args:
            server_url: Base URL of the Kubernetes API server.
            cluster_name: Name given to the cluster inside the kubeconfig.
            username: Certificate CN; also used as the CSR object name and as
                the stem of the intermediate file names.
            output_file: Destination of the kubeconfig; its parent directory
                also hosts the intermediate files.
        """
        self.server_url = server_url
        self.cluster_name = cluster_name
        self.username = username
        self.kubeconfig_output_file = output_file

        # intermediate files
        output_dir_for_intermediates = self.kubeconfig_output_file.parent
        self.key_file = output_dir_for_intermediates / f"{self.username}.key"
        self.csr_file = output_dir_for_intermediates / f"{self.username}.csr"
        self.cert_file = output_dir_for_intermediates / f"{self.username}.crt"
        self.csr_yaml_file = output_dir_for_intermediates / f"{self.username}-csr.yaml"

        # Make sure /usr/local/bin is searched for openssl/kubectl. Avoid
        # producing an empty PATH entry (which means "current directory") when
        # PATH is empty or unset, and avoid appending a duplicate on every
        # instantiation.
        extra_dir = "/usr/local/bin"
        current_path = os.environ.get("PATH", "")
        if extra_dir not in current_path.split(os.pathsep):
            os.environ["PATH"] = (
                f"{current_path}{os.pathsep}{extra_dir}" if current_path else extra_dir
            )

    @staticmethod
    def _run_command(command: list[str]) -> subprocess.CompletedProcess[str]:
        """Run *command* without a shell and return the completed process.

        Output is captured as text and the command is given 30 seconds.
        A missing executable, a non-zero exit status, or a timeout is logged
        and terminates the script via ``sys.exit(1)``.
        """
        printable = " ".join(command)
        logger.info(f"executing: {printable}")
        try:
            return subprocess.run(
                command, check=True, capture_output=True, text=True, timeout=30
            )
        except FileNotFoundError:
            # The missing tool may be openssl or kubectl, so name it generically.
            logger.error(
                f"command not found: {command[0]}. Is it installed and in your PATH?"
            )
        except subprocess.CalledProcessError as e:
            logger.error(f"error executing command: {printable}")
            logger.error(f"return code: {e.returncode}")
            logger.error(f"stdout: {e.stdout}")
            logger.error(f"stderr: {e.stderr}")
        except subprocess.TimeoutExpired:
            logger.error(f"command timed out: {printable}")
        sys.exit(1)

    @staticmethod
    def _read_file_b64(path: Path) -> str:
        """Return the base64 encoding (as text) of the file at *path*."""
        with open(path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

    @staticmethod
    def _render_csr_yaml(csr_name: str, csr_content_b64: str) -> str:
        """Return the CertificateSigningRequest manifest for the given CSR."""
        return textwrap.dedent(
            f"""
            apiVersion: certificates.k8s.io/v1
            kind: CertificateSigningRequest
            metadata:
              name: {csr_name}
            spec:
              request: {csr_content_b64}
              signerName: kubernetes.io/kube-apiserver-client
              usages:
              - client auth
            """
        )

    def _render_kubeconfig(
        self, ca_data: str, client_cert_data: str, client_key_data: str
    ) -> str:
        """Return the kubeconfig text; all three arguments are base64 strings."""
        return textwrap.dedent(
            f"""
            apiVersion: v1
            kind: Config
            clusters:
            - cluster:
                certificate-authority-data: {ca_data}
                server: {self.server_url}
              name: {self.cluster_name}
            contexts:
            - context:
                cluster: {self.cluster_name}
                user: {self.username}
              name: {self.username}@{self.cluster_name}
            current-context: {self.username}@{self.cluster_name}
            users:
            - name: {self.username}
              user:
                client-certificate-data: {client_cert_data}
                client-key-data: {client_key_data}
            """
        )

    def _create_key_and_csr(self) -> None:
        """Generate the RSA private key and the CSR file with openssl."""
        logger.info("generating private key")
        self._run_command(["openssl", "genrsa", "-out", str(self.key_file), "2048"])

        logger.info("creating certificate signing request (csr)")
        csr_subject = f"/CN={self.username}/O={self.GROUP}"
        self._run_command(
            [
                "openssl",
                "req",
                "-new",
                "-key",
                str(self.key_file),
                "-out",
                str(self.csr_file),
                "-subj",
                csr_subject,
            ]
        )

    def _submit_and_approve_csr(self, csr_name: str) -> None:
        """Replace any existing CSR object of that name, then approve the new one."""
        # ensure CSR doesn't already exist
        logger.info(f"deleting existing csr '{csr_name}' if it exists")
        self._run_command(
            ["kubectl", "delete", "csr", csr_name, "--ignore-not-found=true"]
        )

        # create CSR
        logger.info("creating csr in kubernetes")
        csr_yaml = self._render_csr_yaml(csr_name, self._read_file_b64(self.csr_file))
        logger.info(f"writing csr yaml to {self.csr_yaml_file}")
        with open(self.csr_yaml_file, "w") as f:
            f.write(csr_yaml)
        self._run_command(["kubectl", "apply", "-f", str(self.csr_yaml_file)])

        # approve CSR
        logger.info(f"approving csr: {csr_name}")
        self._run_command(["kubectl", "certificate", "approve", csr_name])

    def _fetch_signed_certificate(self, csr_name: str) -> None:
        """Poll the CSR until a certificate is issued and write it to ``cert_file``.

        Exits the script if no certificate shows up after 10 attempts.
        """
        logger.info("fetching the signed certificate")
        for _ in range(10):  # poll for up to 10 seconds
            encoded_cert: str = self._run_command(
                [
                    "kubectl",
                    "get",
                    "csr",
                    csr_name,
                    "-o",
                    "jsonpath={.status.certificate}",
                ]
            ).stdout.strip()
            if encoded_cert:
                break
            time.sleep(1)
        else:
            # the loop ran out without ever seeing a certificate
            logger.error("timed out waiting for certificate to be issued.")
            sys.exit(1)

        with open(self.cert_file, "wb") as f:
            f.write(base64.b64decode(encoded_cert))

    def _fetch_cluster_ca(self) -> str:
        """Return the base64 CA data of the current kubectl context, or exit."""
        logger.info("fetching cluster ca from current kubectl context")
        ca_data = self._run_command(
            [
                "kubectl",
                "config",
                "view",
                "--raw",
                "--minify",
                "-o",
                "jsonpath={.clusters[0].cluster.certificate-authority-data}",
            ]
        ).stdout.strip()
        if not ca_data:
            logger.error(
                "Failed to obtain cluster CA data from current kubectl context."
            )
            sys.exit(1)
        return ca_data

    def _cleanup_intermediates(self) -> None:
        """Delete every intermediate file, ignoring ones that were never created."""
        for leftover in (
            self.csr_file,
            self.csr_yaml_file,
            self.cert_file,
            self.key_file,
        ):
            leftover.unlink(missing_ok=True)

    def generate_kubeconfig(self) -> str:
        """Generate the kubeconfig content and return it as a string.

        Any failing external command terminates the script through
        ``sys.exit(1)``. The intermediate files, which include the user's
        private key, are removed on success and on failure alike.
        """
        logger.warning(
            "this script requires kubectl to be configured with privileges to approve certificate signing requests."
        )
        logger.info(f"creating output directory: {self.kubeconfig_output_file.parent}")
        self.kubeconfig_output_file.parent.mkdir(parents=True, exist_ok=True)

        try:
            # create private key and signing request
            self._create_key_and_csr()

            csr_name = self.username
            self._submit_and_approve_csr(csr_name)
            self._fetch_signed_certificate(csr_name)

            logger.info("using provided cluster information")
            logger.info(f"server url: {self.server_url}")
            logger.info(f"cluster name: {self.cluster_name}")
            logger.info(f"username: {self.username}")

            ca_data = self._fetch_cluster_ca()
            client_cert_data = self._read_file_b64(self.cert_file)
            client_key_data = self._read_file_b64(self.key_file)

            logger.info("assembling the kubeconfig file")
            kubeconfig = self._render_kubeconfig(
                ca_data, client_cert_data, client_key_data
            )
        finally:
            # sys.exit raises SystemExit, so this also runs on every failure
            # path and the private key is never left behind on disk.
            logger.info("cleaning up intermediate files")
            self._cleanup_intermediates()

        logger.info("done")
        return kubeconfig
def main():
    """Parse the command line, generate the kubeconfig and write it to disk.

    The output path defaults to ``<username>.kubeconfig`` in the current
    working directory. Because the kubeconfig embeds the user's private key,
    the file is created with (and tightened to) owner-only 0600 permissions.
    """
    parser = argparse.ArgumentParser(
        description="Create a kubeconfig for a k8s user. This script generates a user certificate that allows authentication, but does NOT grant cluster-admin privileges by default. You will need to manually create a ClusterRoleBinding to grant specific permissions.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--server-url",
        required=True,
        help="the base URL of the Kubernetes API server (e.g., https://opxyz.example.com:6443)",
    )
    parser.add_argument(
        "--cluster-name",
        required=True,
        help="the name for the cluster in the kubeconfig (e.g., colo-staging-1)",
    )
    parser.add_argument(
        "--username", required=True, help="the username for the new user"
    )
    parser.add_argument(
        "--output-file",
        type=Path,
        default=None,
        help="path to the output kubeconfig file, defaults to <username>.kubeconfig",
    )
    args = parser.parse_args()

    if args.output_file is None:
        args.output_file = Path.cwd() / f"{args.username}.kubeconfig"

    generator = KubeconfigGenerator(
        args.server_url, args.cluster_name, args.username, args.output_file
    )
    kubeconfig_content = generator.generate_kubeconfig()

    # Write the kubeconfig file. It contains the client private key, so create
    # it owner-only instead of relying on the process umask.
    output_path = generator.kubeconfig_output_file
    fd = os.open(output_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        # The mode passed to os.open only applies to newly created files;
        # tighten a pre-existing file before the secret is written into it.
        os.chmod(output_path, 0o600)
        f.write(kubeconfig_content)

    logger.info(f"kubeconfig file created at: {generator.kubeconfig_output_file}")
    logger.info(
        "to grant this user cluster-admin privileges, run the following command on your cluster:"
    )
    logger.info(
        f"kubectl create clusterrolebinding {generator.username}-cluster-admin --clusterrole=cluster-admin --user={generator.username}"
    )


if __name__ == "__main__":
    main()
@IceWreck
Copy link
Author

IceWreck commented Sep 29, 2025

Tiny CLI script for creating new users in a Kubernetes cluster. It automates the process of generating a user-specific kubeconfig file with the necessary client certificates for authentication.

How it works:

  1. Generates a new private key for the user.
  2. Creates a Certificate Signing Request (CSR) and sends it to the Kubernetes API.
  3. Approves the CSR to get a signed client certificate.
  4. Assembles a ready-to-use kubeconfig file.

All you need is kubectl access somewhere. There are no deps besides python stdlib so you can literally copy this to the master node, use its kubeconfig and create the first user there.

Perfect for small clusters where you don't want a heavy platform (like Rancher Manager) for user authentication.

Note: this does not create a rolebinding for you. After running this script you should run something like

kubectl create clusterrolebinding yourusername-cluster-admin --clusterrole=cluster-admin --user=yourusername

and instead of cluster-admin you can also use a custom role or cluster role.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment