Created
July 13, 2025 12:35
-
-
Save AliKhadivi/62b63dbb9a655f99ecfbb12642613218 to your computer and use it in GitHub Desktop.
Download large Docker images with a high-speed downloader (wget2 must be installed!)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import sys | |
| import gzip | |
| from io import BytesIO | |
| import json | |
| import hashlib | |
| import shutil | |
| import requests | |
| import tarfile | |
| import subprocess | |
| import urllib3 | |
| urllib3.disable_warnings() | |
if len(sys.argv) != 2:
    print('Usage:\n\tdocker_pull.py [registry/][repository/]image[:tag|@digest]\n')
    sys.exit(1)

# Look for the Docker image to download.
# Defaults used when the reference names neither a namespace nor a tag.
repo = 'library'
tag = 'latest'
imgparts = sys.argv[1].split('/')
image_ref = imgparts[-1]
if '@' in image_ref:
    # "image[:tag]@sha256:..." -- the digest identifies the image; drop any
    # redundant tag so it does not end up inside the repository path.
    img, _, tag = image_ref.partition('@')
    img = img.partition(':')[0]
elif ':' in image_ref:
    img, _, tag = image_ref.partition(':')
else:
    img = image_ref

# The Docker client only treats the first element as a registry when it
# contains a '.' or ':' (or is exactly "localhost").
first_is_registry = len(imgparts) > 1 and (
    '.' in imgparts[0] or ':' in imgparts[0] or imgparts[0] == 'localhost'
)
if first_is_registry:
    registry = imgparts[0]
    repo = '/'.join(imgparts[1:-1])
else:
    registry = 'registry-1.docker.io'
    if imgparts[:-1]:
        repo = '/'.join(imgparts[:-1])
    else:
        repo = 'library'

# A registry-qualified image without a namespace ("reg.example.com/img")
# leaves repo empty; skip it so the API path does not start with '/'.
repository = '{}/{}'.format(repo, img) if repo else img
# Get Docker authentication endpoint when it is required.
# These defaults are kept when the registry does not answer the probe
# with a 401 challenge.
auth_url = 'https://auth.docker.io/token'
reg_service = 'registry.docker.io'
resp = requests.get('https://{}/v2/'.format(registry), verify=False)
if resp.status_code == 401:
    # Parse 'Bearer realm="...",service="..."' by parameter name rather than
    # by quote position, so ordering or a missing header cannot break it.
    challenge = resp.headers.get('WWW-Authenticate', '')
    challenge_params = {}
    for item in challenge.partition(' ')[2].split(','):
        key, sep, value = item.partition('=')
        if sep:
            challenge_params[key.strip().lower()] = value.strip().strip('"')
    # Keep the default token endpoint if the challenge names no realm;
    # an absent service becomes the empty string.
    auth_url = challenge_params.get('realm', auth_url)
    reg_service = challenge_params.get('service', '')
# Get Docker token (this function is useless for unauthenticated registries like Microsoft)
def get_auth_head(type):
    """Request a pull token and return the headers for a registry call.

    Reads the module-level ``auth_url``, ``reg_service`` and ``repository``
    to build the token request. TLS verification is disabled for the call.

    Args:
        type: Value sent as the ``Accept`` header (a manifest media type, or
            several joined with commas). The name shadows the builtin but is
            kept so existing callers keep working.

    Returns:
        A dict with ``Authorization`` (``Bearer <token>``) and ``Accept``.

    Raises:
        KeyError: If the token response carries neither ``token`` nor
            ``access_token``.
    """
    resp = requests.get(
        '{}?service={}&scope=repository:{}:pull'.format(auth_url, reg_service, repository),
        verify=False,
    )
    payload = resp.json()
    # The token spec allows the bearer token under either field name.
    access_token = payload.get('token') or payload.get('access_token')
    if not access_token:
        raise KeyError('token')
    return {'Authorization': 'Bearer ' + access_token, 'Accept': type}
# Docker style progress bar
def progress_bar(ublob, nb_traits, width=49):
    """Redraw a single-line download progress bar on stdout.

    Args:
        ublob: Layer digest; characters 7..18 (the part right after a
            ``sha256:`` prefix) are shown as the short id.
        nb_traits: Number of bar cells to fill. The last filled cell is
            drawn as ``>`` and the ones before it as ``=``.
        width: Number of cells between the brackets when the bar is not
            over-filled. Defaults to 49.
    """
    arrow = '=' * (nb_traits - 1) + '>' if nb_traits > 0 else ''
    # A negative repeat count yields '', so an over-full bar gets no padding.
    padding = ' ' * (width - nb_traits)
    # One write instead of one per character; the leading '\r' makes each
    # call overwrite the previous bar.
    sys.stdout.write('\r' + ublob[7:19] + ': Downloading [' + arrow + padding + ']')
    sys.stdout.flush()
# Fetch the image manifest and get image layer digests.
# The manifest is always requested here; the earlier /v2/ probe response
# says nothing about whether the manifest itself is available.
INDEX_MEDIA_TYPES = (
    'application/vnd.docker.distribution.manifest.list.v2+json',
    'application/vnd.oci.image.index.v1+json',
)
MANIFEST_MEDIA_TYPES = (
    'application/vnd.docker.distribution.manifest.v2+json',
    'application/vnd.oci.image.manifest.v1+json',
)

# Accept both multi-arch indexes and plain manifests so that single-arch
# images work without a second round trip.
auth_head = get_auth_head(', '.join(INDEX_MEDIA_TYPES + MANIFEST_MEDIA_TYPES))
resp = requests.get(f'https://{registry}/v2/{repository}/manifests/{tag}', headers=auth_head, verify=False)
if resp.status_code != 200:
    print(f'[-] Cannot fetch manifest for {repository}:{tag} [HTTP {resp.status_code}]')
    print(resp.content)
    sys.exit(1)
manifest = resp.json()

if 'manifests' in manifest:
    # Multi-arch image: auto-select linux/amd64.
    amd64_digest = None
    for entry in manifest['manifests']:
        platform = entry.get('platform', {})
        if platform.get('os') == 'linux' and platform.get('architecture') == 'amd64':
            amd64_digest = entry['digest']
            break
    if not amd64_digest:
        print("[-] No suitable linux/amd64 manifest found in manifest list.")
        sys.exit(1)
    print(f'[+] Selected linux/amd64 manifest: {amd64_digest}')

    # Now fetch the actual image manifest.
    auth_head = get_auth_head(', '.join(MANIFEST_MEDIA_TYPES))
    resp = requests.get(f'https://{registry}/v2/{repository}/manifests/{amd64_digest}', headers=auth_head, verify=False)
    if resp.status_code != 200:
        print(f'[-] Failed to fetch selected image manifest [HTTP {resp.status_code}]')
        sys.exit(1)
    manifest = resp.json()

# From here on, resp holds the actual image manifest (the code below reads
# resp.json() again for the config digest).
if 'layers' not in manifest:
    print(f'[-] Manifest for {repository}:{tag} has no "layers" entry (unsupported manifest format)')
    sys.exit(1)
layers = manifest['layers']
# Create tmp folder that will hold the image (any previous one is wiped).
imgdir = 'tmp_{}_{}'.format(img, tag.replace(':', '@'))
if os.path.exists(imgdir):
    shutil.rmtree(imgdir)
os.mkdir(imgdir)
print('Creating image structure in: ' + imgdir)

# Download the image config blob named by the manifest.
config = resp.json()['config']['digest']
confresp = requests.get('https://{}/v2/{}/blobs/{}'.format(registry, repository, config), headers=auth_head, verify=False)
if confresp.status_code != 200:
    # Do not store an error body as the image configuration.
    print(f'[-] Cannot fetch image config {config} [HTTP {confresp.status_code}]')
    shutil.rmtree(imgdir)
    sys.exit(1)

# File name is the digest without its "<algorithm>:" prefix.
config_name = config.partition(':')[2] + '.json'
with open('{}/{}'.format(imgdir, config_name), 'wb') as config_file:
    config_file.write(confresp.content)

# Skeleton of manifest.json; 'Layers' is filled in as layers are downloaded.
content = [{
    'Config': config_name,
    'RepoTags': [],
    'Layers': [],
}]
if imgparts[:-1]:
    content[0]['RepoTags'].append('/'.join(imgparts[:-1]) + '/' + img + ':' + tag)
else:
    content[0]['RepoTags'].append(img + ':' + tag)

# JSON written for every layer except the last one (parsed with json.loads
# later, so whitespace between members is irrelevant).
empty_json = (
    '{"created":"1970-01-01T00:00:00Z","container_config":{"Hostname":"","Domainname":"","User":"","AttachStdin":false, '
    '"AttachStdout":false,"AttachStderr":false,"Tty":false,"OpenStdin":false, "StdinOnce":false,"Env":null,"Cmd":null,"Image":"", '
    '"Volumes":null,"WorkingDir":"","Entrypoint":null,"OnBuild":null,"Labels":null}}'
)
# Build layer folders
parentid = ''
fake_layerid = ''  # stays empty if the manifest lists no layers
last_index = len(layers) - 1
for index, layer in enumerate(layers):
    ublob = layer['digest']
    short_id = ublob[7:19]
    # Layer folder id: hash of the parent id chained with this layer's digest.
    fake_layerid = hashlib.sha256((parentid + '\n' + ublob + '\n').encode('utf-8')).hexdigest()
    layerdir = imgdir + '/' + fake_layerid
    os.mkdir(layerdir)

    # Create VERSION file
    with open(layerdir + '/VERSION', 'w') as f:
        f.write('1.0')

    print(f'{short_id}: Downloading via wget2...')
    blob_url = f'https://{registry}/v2/{repository}/blobs/{ublob}'
    # A token is requested again for every layer.
    headers = get_auth_head('application/vnd.docker.distribution.manifest.v2+json')
    auth_header = headers['Authorization']
    # NOTE(review): the bearer token is passed on the command line and is
    # therefore visible in the local process list while wget2 runs.
    wget2_cmd = [
        'wget2',
        '--header', f'Authorization: {auth_header}',
        '--check-certificate=off',
        '--no-check-certificate',
        f'--output-document={layerdir}/layer_gzip.tar',
        '--progress', 'bar',
        '--verbose',
        '--timeout=60',
        '--max-threads=7',
        blob_url,
    ]
    result = subprocess.run(wget2_cmd)
    if result.returncode != 0:
        print(f'ERROR: wget2 failed for {short_id}')
        print(result)
        shutil.rmtree(imgdir)
        sys.exit(1)

    # The downloaded blob is gunzipped into layer.tar, then removed.
    print(f'{short_id}: Extracting...')
    with gzip.open(f'{layerdir}/layer_gzip.tar', 'rb') as gz_in, \
            open(f'{layerdir}/layer.tar', 'wb') as tar_out:
        shutil.copyfileobj(gz_in, tar_out)
    os.remove(f'{layerdir}/layer_gzip.tar')
    print(f'{short_id}: Pull complete')
    content[0]['Layers'].append(fake_layerid + '/layer.tar')

    # Create layer json. Only the last layer carries the image config
    # (minus history/rootfs); decide by position, because the same digest
    # may appear more than once in the layer list.
    if index == last_index:
        json_obj = json.loads(confresp.content)
        json_obj.pop('history', None)
        json_obj.pop('rootfs', None)
        # Alternate spelling of the key -- presumably emitted by some
        # registries; TODO confirm.
        json_obj.pop('rootfS', None)
    else:
        json_obj = json.loads(empty_json)
    json_obj['id'] = fake_layerid
    if parentid:
        json_obj['parent'] = parentid
    parentid = json_obj['id']
    with open(f'{layerdir}/json', 'w') as f:
        f.write(json.dumps(json_obj))

with open(imgdir + '/manifest.json', 'w') as f:
    f.write(json.dumps(content))

# 'repositories' maps the image name and tag to the top-most layer id.
if imgparts[:-1]:
    content = {'/'.join(imgparts[:-1]) + '/' + img: {tag: fake_layerid}}
else:  # when pulling only an img (without repo and registry)
    content = {img: {tag: fake_layerid}}
with open(imgdir + '/repositories', 'w') as f:
    f.write(json.dumps(content))
# Create image tar and clean tmp folder
docker_tar = repo.replace('/', '_') + '_' + img + '.tar'
sys.stdout.write("Creating archive...")
sys.stdout.flush()
# The context manager closes the archive even if adding a member fails.
# arcname=os.path.sep stores the folder's contents at the archive root.
with tarfile.open(docker_tar, "w") as tar:
    tar.add(imgdir, arcname=os.path.sep)
shutil.rmtree(imgdir)
# '\r' overwrites the "Creating archive..." status on the same line.
print('\rDocker image pulled: ' + docker_tar)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment