Skip to content

Instantly share code, notes, and snippets.

@carbonin
Last active June 26, 2019 15:19
Show Gist options
  • Select an option

  • Save carbonin/0a189a4bdf45e180e71f3f8e4bc54c9e to your computer and use it in GitHub Desktop.

Select an option

Save carbonin/0a189a4bdf45e180e71f3f8e4bc54c9e to your computer and use it in GitHub Desktop.
AWX encrypt / vmdb decrypt test
require 'openssl'
require 'base64'
# Decrypts tokens that follow the Fernet token layout but use AES-256-CBC,
# keyed by a 64-byte secret (32 bytes for signing, 32 bytes for encryption).
#
# Byte layout this class relies on (after url-safe base64 decoding):
#   bytes 0..8    header (version + timestamp per the Fernet spec; not inspected here)
#   bytes 9..24   16-byte IV
#   bytes 25..-33 ciphertext
#   last 32 bytes HMAC-SHA256 over everything that precedes it
class Fernet256
  attr_reader :hmac, :cipher, :signing_key, :encryption_key

  # Raised when a token is too short or its signature does not verify.
  InvalidToken = Class.new(ArgumentError)

  # 9 header bytes + 16 IV bytes + at least one 16-byte cipher block + 32 MAC bytes.
  MIN_TOKEN_SIZE = 9 + 16 + 16 + 32

  # @param key [String] url-safe base64 encoding of exactly 64 bytes
  # @raise [ArgumentError] if the decoded key is not 64 bytes long
  def initialize(key)
    decoded_key = Base64.urlsafe_decode64(key)
    if decoded_key.size != 64
      raise ArgumentError, "Fernet key must be 64 url-safe base64-encoded bytes."
    end

    @signing_key    = decoded_key[0, 32]
    @encryption_key = decoded_key[32, 32]
    @cipher         = OpenSSL::Cipher::AES256.new(:CBC)
    @hmac           = OpenSSL::HMAC.new signing_key, OpenSSL::Digest::SHA256.new
  end

  # Verifies the token signature and returns the decrypted payload.
  # Safe to call repeatedly on the same instance.
  #
  # @param token [String] url-safe base64 encoded token
  # @return [String] decrypted bytes (binary encoded)
  # @raise [InvalidToken] if the token is truncated or the signature mismatches
  def decrypt(token)
    data = Base64.urlsafe_decode64(token)
    verify data

    # Re-initialise the cipher for every token so no state leaks between calls.
    cipher.decrypt
    cipher.key = encryption_key
    cipher.iv  = data[9, 16]

    ciphertext = data[25..-33]
    decrypted = cipher.update ciphertext
    decrypted << cipher.final
  end

  private

  # Checks the trailing HMAC-SHA256 against the rest of the token.
  def verify(data)
    raise InvalidToken, "token is too short" if data.bytesize < MIN_TOKEN_SIZE

    # The HMAC object is shared across calls; without a reset it would keep
    # accumulating earlier tokens and every later verification would fail.
    hmac.reset
    hmac << data[0..-33]
    signature = data[-32..-1]
    return if secure_compare(hmac.digest, signature)

    raise InvalidToken
  end

  # Compares two byte strings without short-circuiting on the first differing
  # byte, so the comparison time does not reveal how much of the MAC matched.
  def secure_compare(expected, actual)
    return false unless expected.bytesize == actual.bytesize

    expected.bytes.zip(actual.bytes).reduce(0) { |diff, (a, b)| diff | (a ^ b) }.zero?
  end
end
# Decrypts a field value of the form
#   $encrypted$[UTF8$]AESCBC$<base64 of a Fernet256 token>
# using a key derived from a secret, the field name and an optional primary key.
class AnsibleDecrypt
  attr_reader :encryption_key, :encrypted_data

  # Marker that every encrypted field value must start with.
  ENCRYPTED_PREFIX = '$encrypted$'.freeze

  # @param secret_key [String] shared secret the key is derived from
  # @param value [String] the stored "$encrypted$..." field value
  # @param field_name [String] name of the encrypted field
  # @param primary_key [String, Integer, nil] record id mixed into the key
  # @raise [Fernet256::InvalidToken] if value is not a supported encrypted field
  def initialize(secret_key, value, field_name, primary_key)
    @encryption_key = get_encryption_key(secret_key, field_name, primary_key)
    @encrypted_data = parse_raw_data(value)
  end

  # @return [String] the decrypted field value
  def decrypt
    # NOTE(review): chomp drops one trailing line terminator, so a plaintext
    # that legitimately ends in a newline is not returned byte-for-byte.
    Fernet256.new(encryption_key).decrypt(encrypted_data).chomp
  end

  private

  # Derives the url-safe base64 key: SHA-512 over secret, then pk (if given),
  # then field name.
  def get_encryption_key(secret, field, pk=nil)
    key_hash = OpenSSL::Digest::SHA512.new
    key_hash << secret
    # Stringify so an Integer primary key hashes like its decimal string
    # instead of raising TypeError.
    key_hash << pk.to_s if pk
    key_hash << field
    Base64.urlsafe_encode64(key_hash.digest)
  end

  # Strips the "$encrypted$" marker and optional "UTF8$" tag, checks the
  # algorithm tag, and returns the base64-decoded payload.
  def parse_raw_data(value)
    value = value.to_s
    unless value.start_with?(ENCRYPTED_PREFIX)
      raise Fernet256::InvalidToken, "value is not an $encrypted$ field"
    end

    raw_data = value[ENCRYPTED_PREFIX.length..-1]
    raw_data = raw_data[5..-1] if raw_data.start_with?('UTF8$')
    algorithm, base64_data = raw_data.split('$', 2)
    if algorithm != 'AESCBC'
      raise Fernet256::InvalidToken, "unsupported algorithm: #{algorithm}"
    end
    raise Fernet256::InvalidToken, "missing encrypted payload" if base64_data.nil?

    Base64.decode64(base64_data)
  end
end
require 'awesome_spawn'
require 'securerandom'
# Runs one encrypt/decrypt round trip: generates random inputs, has the Python
# helper script encrypt the value, decrypts it in Ruby and raises if the result
# differs from the original.
#
# random_utf8 - an Enumerator yielding single random UTF-8 characters.
def run_test(random_utf8)
  field_name  = SecureRandom.alphanumeric(10)
  secret_key  = SecureRandom.hex(16)
  # upper limit of integers in PG (awx primary key is an integer)
  primary_key = SecureRandom.random_number(2147483647).to_s
  # roughly the length of an rsa private key
  length      = SecureRandom.random_number(4000)

  # Keep only printable characters, taking exactly `length` of them.
  printable = random_utf8.lazy.grep(/[[:print:]]/).first(length)
  value     = printable.join.force_encoding('utf-8')

  script_env = {
    "FIELD_NAME"  => field_name,
    "VALUE"       => value,
    "SECRET_KEY"  => secret_key,
    "PRIMARY_KEY" => primary_key
  }
  result    = AwesomeSpawn.run!("python3", :params => ["ansible_encrypt.py"], :env => script_env)
  encrypted = result.output

  decryptor = AnsibleDecrypt.new(secret_key, encrypted, field_name, primary_key)
  decrypted = decryptor.decrypt.force_encoding('utf-8')
  return if decrypted == value.force_encoding('utf-8')

  raise "Decrypted value #{decrypted} does not match original value #{value}"
end
# Endless source of random single-character UTF-8 strings, one per code point
# drawn from 0...0x10000. Callers take as many characters as they need.
# Mostly from https://stackoverflow.com/questions/52654509/random-generate-a-valid-unicode-character-in-ruby
random_utf8 = Enumerator.new do |out|
  loop do
    begin
      out << rand(0x10000).chr('UTF-8')
    rescue RangeError
      # Code point cannot be encoded as UTF-8; skip it and draw again.
      next
    end
  end
end
# Run round-trip tests forever, rewriting a single console line with the
# number of iterations completed before the current one.
iteration = 0
loop do
  run_test(random_utf8)
  print("\r#{iteration} ")
  iteration += 1
end
import base64
import hashlib
import os
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from django.utils.encoding import smart_str, smart_bytes
class Fernet256(Fernet):
    """Fernet variant keyed for AES-256-CBC.

    Not technically Fernet: it follows the base of the Fernet spec but uses
    AES-256-CBC instead of AES-128-CBC. All other functionality remains
    identical.
    """

    def __init__(self, key, backend=None):
        """Split a 64-byte url-safe base64 key into signing and encryption halves.

        Raises ValueError when the decoded key is not exactly 64 bytes.
        """
        resolved_backend = default_backend() if backend is None else backend

        raw_key = base64.urlsafe_b64decode(key)
        if len(raw_key) != 64:
            raise ValueError("Fernet key must be 64 url-safe base64-encoded bytes.")

        # First 32 bytes sign, the remaining 32 bytes encrypt.
        self._signing_key, self._encryption_key = raw_key[:32], raw_key[32:]
        self._backend = resolved_backend
def get_encryption_key(secret_key, field_name, pk=None):
    """Derive the url-safe base64 key for a field.

    The key is the SHA-512 digest of the secret key, followed by the
    stringified primary key when one is given, followed by the field name.
    """
    material = smart_bytes(secret_key)
    if pk is not None:
        material += smart_bytes(str(pk))
    material += smart_bytes(field_name)
    return base64.urlsafe_b64encode(hashlib.sha512(material).digest())
def encrypt_field(secret_key, field_name, pk, value):
    """Encrypt a field value into a '$encrypted$UTF8$AESCBC$<base64>' string.

    A value that already starts with '$encrypted$' is returned unchanged.
    """
    plaintext = smart_str(value)
    if plaintext.startswith('$encrypted$'):
        return plaintext

    fernet = Fernet256(get_encryption_key(secret_key, field_name, pk))
    token = fernet.encrypt(smart_bytes(plaintext))
    # The token is base64-encoded once more before being embedded.
    encoded = smart_str(base64.b64encode(token))
    return '$'.join(('$encrypted', 'UTF8', 'AESCBC', encoded))
# Inputs arrive through environment variables; the encrypted field is written
# to stdout without a trailing newline so the caller gets the exact value.
field_name, value, secret_key, primary_key = (
    os.environ[name]
    for name in ("FIELD_NAME", "VALUE", "SECRET_KEY", "PRIMARY_KEY")
)
print(encrypt_field(secret_key, field_name, primary_key, value), end='')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment