Skip to content

Instantly share code, notes, and snippets.

@chadgeary
Created June 3, 2021 13:55
Show Gist options
  • Select an option

  • Save chadgeary/b9266b758d6bd359596562275c0ce0d2 to your computer and use it in GitHub Desktop.

Select an option

Save chadgeary/b9266b758d6bd359596562275c0ce0d2 to your computer and use it in GitHub Desktop.
ca.py
import boto3
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat import backends
from cryptography import x509
from cryptography.x509.oid import NameOID
import datetime
import json
import os
def lambda_handler(event, context):
now = datetime.datetime.now()
expire_date = now + datetime.timedelta(days=3650)
s3 = boto3.resource('s3')
http = urllib3.PoolManager()
files = ['nifi/certificates/nifi-cert.pem','nifi/certificates/nifi-key.key','nifi/certificates/admin/admin_cert.pem','nifi/certificates/admin/private_key.pem', ]
s3_object = list(s3.Bucket(os.environ['BUCKET']).objects.filter(Prefix='nifi/certificates/nifi-key.key'))
if len(s3_object) > 0
print('CA found, skipping.')
else:
print('CA not found, generating.')
# Valid dates
now = datetime.datetime.now()
expire_date = now + datetime.timedelta(days=3650)
print(expire_date)
# CA
ca_private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=backends.default_backend()
)
ca_public_key = ca_private_key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u"NIFICA"),
]))
builder = builder.issuer_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u"NIFICA"),
]))
builder = builder.not_valid_before(now)
builder = builder.not_valid_after(expire_date)
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(ca_public_key)
builder = builder.add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True)
ca_certificate = builder.sign(
private_key=ca_private_key, algorithm=hashes.SHA256(), backend=backends.default_backend()
)
ca_private_bytes = ca_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
ca_public_bytes = ca_certificate.public_bytes(
encoding=serialization.Encoding.PEM)
# ADMIN
admin_private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=backends.default_backend()
)
admin_public_key = admin_private_key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(x509.Name([
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"NIFI"),
x509.NameAttribute(NameOID.COMMON_NAME, u"admin")
]))
builder = builder.issuer_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u"NIFICA"),
]))
builder = builder.not_valid_before(now)
builder = builder.not_valid_after(expire_date)
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(admin_public_key)
admin_certificate = builder.sign(
private_key=ca_private_key, algorithm=hashes.SHA256(), backend=backends.default_backend()
)
private_bytes = admin_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
public_bytes = admin_certificate.public_bytes(
encoding=serialization.Encoding.PEM)
# UPLOAD
for prefixes in files:
s3.meta.client.upload_fileobj(
http.request('GET', urls[key], preload_content=False),
os.environ['BUCKET'],
key,
ExtraArgs={'ServerSideEncryption':'aws:kms','SSEKMSKeyId':os.environ['KEY']})
print(key + ' put to s3.')
return {
'statusCode': 200,
'body': json.dumps('Complete')
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment