-
-
Save aivarsk/2b26854c956e36fdfd73349586f2b168 to your computer and use it in GitHub Desktop.
TigerBeetle as a file storage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import argparse | |
| import mmap | |
| import os | |
| from itertools import batched | |
| import tigerbeetle as tb | |
# Value used as both `ledger` and `code` on every account and transfer
# created by this script.
FILE = 73

# Number of transfers submitted per create_transfers call, and the page size
# requested when reading transfers back.
BULK = 8_000

# Module-wide synchronous TigerBeetle client; the replica address comes from
# the TB_ADDRESS environment variable and falls back to "3000".
client = tb.ClientSync(
    cluster_id=0,
    replica_addresses=os.environ.get("TB_ADDRESS", "3000"),
)
def create_a_file(filename, size):
    """Create the TigerBeetle account that represents a stored file.

    The encoded file name (at most 16 bytes) is interpreted as a big-endian
    integer and used as the account id. The expected file size is stored in
    ``user_data_64`` and the encoded name length in ``user_data_32``.

    Args:
        filename: Name of the file as a ``str``.
        size: Size of the file in bytes.

    Returns:
        The ``tb.Account`` object that was submitted.

    Raises:
        ValueError: If the encoded name is longer than 16 bytes, or if
            TigerBeetle reports an error for the account creation (the first
            reported error is the exception argument).
    """
    raw_name = filename.encode()
    name_length = len(raw_name)
    if name_length > 16:
        raise ValueError("Invalid filename, more than 16 bytes")

    account = tb.Account(
        id=int.from_bytes(raw_name, "big"),
        user_data_64=size,
        user_data_32=name_length,
        ledger=FILE,
        code=FILE,
    )

    failures = client.create_accounts([account])
    if failures:
        raise ValueError(failures[0])
    return account
def get_a_file(filename):
    """Look up the TigerBeetle account that represents *filename*.

    The account id is the encoded file name read as a big-endian integer,
    which is the same mapping ``create_a_file`` uses.

    Args:
        filename: Name of the file as a ``str``.

    Returns:
        The account object returned by ``client.lookup_accounts``.

    Raises:
        ValueError: If the encoded name is longer than 16 bytes.
        IndexError: If no account exists for the name.
    """
    # Validate the *encoded* length: a name with multi-byte characters can be
    # at most 16 characters long and still exceed the 16 bytes an id can hold.
    encoded = filename.encode()
    if len(encoded) > 16:
        raise ValueError("Invalid filename, more than 16 bytes")

    accounts = client.lookup_accounts([int.from_bytes(encoded, "big")])
    if not accounts:
        # An empty result used to surface as an IndexError from `[0]`; keep the
        # exception type but say what actually went wrong.
        raise IndexError(f"No such TigerBeetle file: {filename!r}")
    return accounts[0]
def upload_a_file(file_id, filename, verbose):
    """Store the contents of a local file as TigerBeetle transfers.

    The file is read in blocks of up to 28 bytes. Each block becomes one
    transfer from the "." account to the file's account: the first 16 bytes
    go into ``user_data_128``, the next 8 into ``user_data_64``, the last 4
    into ``user_data_32``, and ``amount`` records the block length.
    Transfers are submitted in batches of ``BULK``.

    Args:
        file_id: Id of the TigerBeetle account that receives the credits.
        filename: Path of the local file to read.
        verbose: When true, print a running byte count after every batch.

    Raises:
        RuntimeError: If TigerBeetle reports errors for a batch of transfers.
    """
    # Only a missing "." account should trigger its creation; any other
    # failure (connection problems, Ctrl-C, ...) must propagate.
    try:
        system_id = get_a_file(".").id
    except LookupError:
        system_id = create_a_file(".", 0).id

    def submit(pending, total):
        # Send one batch and report progress.
        errors = client.create_transfers(pending)
        if errors:
            raise RuntimeError(errors)
        if verbose:
            print(f"\r{total} bytes written", flush=True, end="")

    if verbose:
        print()

    transfers = []
    written = 0
    # Plain buffered reads instead of mmap: mmap cannot map an empty file and
    # iterating it yields one byte at a time.
    with open(filename, "rb") as f:
        while block := f.read(28):
            transfers.append(
                tb.Transfer(
                    id=tb.id(),
                    debit_account_id=system_id,
                    credit_account_id=file_id,
                    amount=len(block),
                    user_data_128=int.from_bytes(block[:16], "big"),
                    user_data_64=int.from_bytes(block[16:24], "big"),
                    user_data_32=int.from_bytes(block[24:], "big"),
                    ledger=FILE,
                    code=FILE,
                )
            )
            written += len(block)
            if len(transfers) >= BULK:
                submit(transfers, written)
                transfers = []

    # Flush whatever is left over after the last full batch.
    if transfers:
        submit(transfers, written)
def _decode_block(transfer):
    """Rebuild the bytes that one transfer carries in its user_data fields.

    ``transfer.amount`` is the number of payload bytes. They are taken in
    order from ``user_data_128`` (up to 16 bytes), ``user_data_64`` (up to 8)
    and ``user_data_32`` (up to 4); a field that is only partially used holds
    just the bytes that were available when the block was written.
    """
    remaining = transfer.amount
    parts = []
    for value, width in (
        (transfer.user_data_128, 16),
        (transfer.user_data_64, 8),
        (transfer.user_data_32, 4),
    ):
        if remaining <= 0:
            break
        used = min(remaining, width)
        parts.append(value.to_bytes(used, "big"))
        remaining -= used
    return b"".join(parts)


def download_a_file(file_id, filename, verbose):
    """Write the contents stored under a TigerBeetle account to a local file.

    Credit transfers of the account are fetched in pages of ``BULK``; paging
    continues from the timestamp of the last transfer seen, plus one. Each
    transfer is decoded back into its bytes and appended to the output file.

    Args:
        file_id: Id of the TigerBeetle account holding the file.
        filename: Path of the local file to create or overwrite.
        verbose: When true, print a running byte count after every page.
    """
    timestamp_min = 0
    if verbose:
        print()
    written = 0
    with open(filename, "wb") as f:
        while True:
            transfers = client.get_account_transfers(
                tb.AccountFilter(
                    account_id=file_id,
                    flags=tb.AccountFilterFlags.CREDITS,
                    limit=BULK,
                    timestamp_min=timestamp_min,
                )
            )
            for transfer in transfers:
                # Decode every transfer of the page. A short block must not
                # end the page early, and the transfer object is left intact.
                f.write(_decode_block(transfer))
                written += transfer.amount
                timestamp_min = transfer.timestamp
            if verbose:
                print(f"\r{written} bytes written", flush=True, end="")
            # A page shorter than the limit means there is nothing left.
            if len(transfers) < BULK:
                break
            # Resume strictly after the last transfer already written.
            timestamp_min += 1
# Command-line entry point: copy a file between the local filesystem and
# TigerBeetle. Exactly one of the two paths must carry the "tb:" prefix.
parser = argparse.ArgumentParser(prog="tbcp", formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("-v", "--verbose", help="verbose", action="store_true")
parser.add_argument("source", help="Source filesystem or TigerBeetle file")
parser.add_argument("destination", help="Destination filesystem or TigerBeetle file")
args = parser.parse_args()

source_in_tb = args.source.startswith("tb:")
destination_in_tb = args.destination.startswith("tb:")

if source_in_tb and not destination_in_tb:
    # TigerBeetle -> local file.
    try:
        account = get_a_file(args.source.removeprefix("tb:"))
    except LookupError:
        # Report a missing file as a normal CLI error instead of a traceback.
        parser.exit(1, f"tbcp: {args.source}: no such file\n")
    # The account records the expected size in user_data_64; refuse to
    # download a file whose posted credits do not add up to it.
    if account.credits_posted != account.user_data_64:
        raise RuntimeError(f"{args.source} is incomplete: {account.credits_posted} < {account.user_data_64}")
    download_a_file(account.id, args.destination, args.verbose)
elif destination_in_tb and not source_in_tb:
    # Local file -> TigerBeetle.
    account = create_a_file(args.destination.removeprefix("tb:"), os.path.getsize(args.source))
    upload_a_file(account.id, args.source, args.verbose)
else:
    # Both or neither path is a TigerBeetle file: show usage and signal
    # failure through the exit status.
    parser.print_help()
    parser.exit(2)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment