Last active
June 26, 2019 15:19
-
-
Save carbonin/0a189a4bdf45e180e71f3f8e4bc54c9e to your computer and use it in GitHub Desktop.
AWX encrypt / vmdb decrypt test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require 'openssl' | |
| require 'base64' | |
# Verifies and decrypts tokens that follow the Fernet token layout but are
# keyed with a 64-byte key (32 bytes for signing, 32 bytes for AES-256-CBC).
#
# Layout of a decoded token, as sliced by this class:
#   bytes 0..8     header (version marker and timestamp per the Fernet spec;
#                  not inspected here)
#   bytes 9..24    AES-CBC initialization vector
#   bytes 25..-33  ciphertext
#   last 32 bytes  HMAC-SHA256 over every byte that precedes it
class Fernet256
  attr_reader :hmac, :cipher, :signing_key, :encryption_key

  InvalidToken = Class.new(ArgumentError)

  HEADER_BYTES    = 9
  IV_BYTES        = 16
  BLOCK_BYTES     = 16
  SIGNATURE_BYTES = 32
  # Header + IV + at least one padded cipher block + signature.
  MIN_TOKEN_BYTES = HEADER_BYTES + IV_BYTES + BLOCK_BYTES + SIGNATURE_BYTES

  # @param key [String] url-safe base64 encoding of exactly 64 key bytes
  # @raise [ArgumentError] if the key is not valid base64 or not 64 bytes long
  def initialize(key)
    decoded_key = Base64.urlsafe_decode64(key)
    if decoded_key.bytesize != 64
      raise ArgumentError, "Fernet key must be 64 url-safe base64-encoded bytes."
    end

    @signing_key    = decoded_key[0, 32]
    @encryption_key = decoded_key[32, 32]
    @cipher         = OpenSSL::Cipher::AES256.new(:CBC)
    @hmac           = OpenSSL::HMAC.new signing_key, OpenSSL::Digest::SHA256.new
  end

  # Checks the token signature and returns the decrypted plaintext.
  #
  # Safe to call repeatedly on the same instance.
  #
  # @param token [String] url-safe base64 encoded token
  # @return [String] decrypted bytes (binary encoded)
  # @raise [InvalidToken] if the token is too short or its signature is wrong
  # @raise [ArgumentError] if the token is not valid url-safe base64
  def decrypt(token)
    data = Base64.urlsafe_decode64(token)
    verify data

    cipher.decrypt
    cipher.iv  = data.byteslice(HEADER_BYTES, IV_BYTES)
    cipher.key = encryption_key

    ciphertext_start = HEADER_BYTES + IV_BYTES
    ciphertext = data.byteslice(ciphertext_start, data.bytesize - ciphertext_start - SIGNATURE_BYTES)
    cipher.update(ciphertext) << cipher.final
  end

  private

  # Raises InvalidToken unless the trailing 32 bytes of +data+ are the
  # HMAC-SHA256 of everything before them.
  def verify(data)
    raise InvalidToken, "token is too short" if data.bytesize < MIN_TOKEN_BYTES

    # The HMAC object lives as long as this instance; without a reset, bytes
    # from a previous verification would be folded into this digest.
    hmac.reset
    hmac << data.byteslice(0, data.bytesize - SIGNATURE_BYTES)
    signature = data.byteslice(-SIGNATURE_BYTES, SIGNATURE_BYTES)
    return if secure_compare(hmac.digest, signature)

    raise InvalidToken
  end

  # Compares two byte strings without stopping at the first mismatch, so the
  # time taken does not reveal how many leading bytes matched.
  def secure_compare(expected, actual)
    return false unless expected.bytesize == actual.bytesize

    expected.bytes.zip(actual.bytes).reduce(0) { |diff, (a, b)| diff | (a ^ b) }.zero?
  end
end
# Decrypts a field value stored in the "$encrypted$[UTF8$]AESCBC$<base64>"
# format, deriving the per-field key the same way the encrypting side does:
# SHA-512 over secret, optional primary key, and field name.
class AnsibleDecrypt
  attr_reader :encryption_key, :encrypted_data

  ENCRYPTED_PREFIX = '$encrypted$'.freeze

  # @param secret_key [String] shared secret used to derive the field key
  # @param value [String] the stored "$encrypted$..." value
  # @param field_name [String] name of the encrypted field
  # @param primary_key [Integer, String, nil] row id mixed into the key; it is
  #   hashed in its string form, so 42 and "42" derive the same key
  # @raise [Fernet256::InvalidToken] if +value+ is not in the expected format
  def initialize(secret_key, value, field_name, primary_key)
    @encryption_key = get_encryption_key(secret_key, field_name, primary_key)
    @encrypted_data = parse_raw_data(value)
  end

  # @return [String] the decrypted value with one trailing line terminator
  #   removed (String#chomp)
  # NOTE(review): chomp also drops a newline that was part of the original
  # plaintext - confirm that is intended.
  def decrypt
    Fernet256.new(encryption_key).decrypt(encrypted_data).chomp
  end

  private

  # Builds the url-safe base64 encoded SHA-512 digest used as the Fernet256 key.
  def get_encryption_key(secret, field, pk = nil)
    key_hash = OpenSSL::Digest::SHA512.new
    key_hash << secret
    # Digest#<< only accepts strings; an integer id must be stringified first.
    key_hash << pk.to_s if pk
    key_hash << field
    Base64.urlsafe_encode64(key_hash.digest)
  end

  # Strips the "$encrypted$" prefix and optional "UTF8$" marker, checks the
  # algorithm tag, and returns the base64-decoded payload.
  def parse_raw_data(value)
    text = value.to_s
    unless text.start_with?(ENCRYPTED_PREFIX)
      raise Fernet256::InvalidToken, "value does not start with #{ENCRYPTED_PREFIX}"
    end

    raw_data = text[ENCRYPTED_PREFIX.length..-1]
    raw_data = raw_data[5..-1] if raw_data.start_with?('UTF8$')
    algorithm, base64_data = raw_data.split('$', 2)
    if algorithm != 'AESCBC'
      raise Fernet256::InvalidToken, "unsupported algorithm: #{algorithm}"
    end
    raise Fernet256::InvalidToken, "missing encrypted payload" if base64_data.nil?

    Base64.decode64(base64_data)
  end
end
| require 'awesome_spawn' | |
| require 'securerandom' | |
# Encrypts one random value with the Python script and checks that the Ruby
# decryptor returns the same string.
#
# @param random_utf8 [Enumerator] endless source of single-character strings
# @raise [RuntimeError] if the decrypted value differs from the original
def run_test(random_utf8)
  field_name  = SecureRandom.alphanumeric(10)
  secret_key  = SecureRandom.hex(16)
  # Upper limit of integers in PG (the awx primary key is an integer).
  primary_key = SecureRandom.random_number(2147483647).to_s
  # Roughly the length of an rsa private key.
  length      = SecureRandom.random_number(4000)

  printable_chars = random_utf8.lazy.grep(/[[:print:]]/).first(length)
  value = printable_chars.join.force_encoding('utf-8')

  env = {
    "FIELD_NAME"  => field_name,
    "VALUE"       => value,
    "SECRET_KEY"  => secret_key,
    "PRIMARY_KEY" => primary_key
  }
  encrypted = AwesomeSpawn.run!("python3", :params => ["ansible_encrypt.py"], :env => env).output

  decryptor = AnsibleDecrypt.new(secret_key, encrypted, field_name, primary_key)
  decrypted = decryptor.decrypt.force_encoding('utf-8')
  return if decrypted == value

  raise "Decrypted value #{decrypted} does not match original value #{value}"
end
# Endless stream of one-character UTF-8 strings drawn from random codepoints
# below 0x10000; run_test takes a random-length prefix of it to build a value.
# Mostly from https://stackoverflow.com/questions/52654509/random-generate-a-valid-unicode-character-in-ruby
random_utf8 = Enumerator.new do |yielder|
  loop do
    begin
      codepoint = rand(0x10000)
      yielder << codepoint.chr('UTF-8')
    rescue RangeError
      # Nothing is yielded for this draw; the loop simply tries again.
      next
    end
  end
end
# Run round-trip tests until one raises, rewriting a single console line with
# the index of the iteration that just passed.
completed = 0
loop do
  run_test(random_utf8)
  print("\r#{completed} ")
  completed += 1
end
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import base64 | |
| import hashlib | |
| import os | |
| from cryptography.fernet import Fernet | |
| from cryptography.hazmat.backends import default_backend | |
| from django.utils.encoding import smart_str, smart_bytes | |
class Fernet256(Fernet):
    """Fernet subclass keyed with 64 bytes instead of the base class's key.

    Not technically Fernet: per the original note it keeps the base of the
    Fernet spec but uses AES-256-CBC instead of AES-128-CBC. This class only
    replaces key handling; everything else is inherited from ``Fernet``.
    """

    def __init__(self, key, backend=None):
        """Split a url-safe base64 encoded 64-byte key into its two halves.

        :param key: url-safe base64 encoding of exactly 64 bytes; the first
            32 become the signing key, the last 32 the encryption key.
        :param backend: backend object to store; ``default_backend()`` is
            used when omitted.
        :raises ValueError: if the decoded key is not 64 bytes long.
        """
        chosen_backend = default_backend() if backend is None else backend

        raw_key = base64.urlsafe_b64decode(key)
        if len(raw_key) != 64:
            raise ValueError(
                "Fernet key must be 64 url-safe base64-encoded bytes."
            )

        self._signing_key, self._encryption_key = raw_key[:32], raw_key[32:]
        self._backend = chosen_backend
def get_encryption_key(secret_key, field_name, pk=None):
    """Derive the per-field key as url-safe base64 of a SHA-512 digest.

    The digest covers, in order: the secret key, ``str(pk)`` when ``pk`` is
    not None, and the field name.

    :returns: url-safe base64 encoded digest (bytes).
    """
    parts = [secret_key]
    if pk is not None:
        parts.append(str(pk))
    parts.append(field_name)

    digest = hashlib.sha512()
    for part in parts:
        digest.update(smart_bytes(part))
    return base64.urlsafe_b64encode(digest.digest())
def encrypt_field(secret_key, field_name, pk, value):
    """Return ``value`` encrypted as ``$encrypted$UTF8$AESCBC$<base64>``.

    A value that already starts with ``$encrypted$`` is returned unchanged
    (after ``smart_str`` conversion) rather than being encrypted again.
    """
    plaintext = smart_str(value)
    if plaintext.startswith('$encrypted$'):
        return plaintext

    fernet = Fernet256(get_encryption_key(secret_key, field_name, pk))
    token = fernet.encrypt(smart_bytes(plaintext))
    payload = smart_str(base64.b64encode(token))
    return '$'.join(['$encrypted', 'UTF8', 'AESCBC', payload])
# Inputs arrive through the environment; a missing variable raises KeyError.
field_name, value, secret_key, primary_key = (
    os.environ[name]
    for name in ("FIELD_NAME", "VALUE", "SECRET_KEY", "PRIMARY_KEY")
)

# No trailing newline, so the caller receives exactly the encrypted string.
encrypted_value = encrypt_field(secret_key, field_name, primary_key, value)
print(encrypted_value, end='')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment