loader.py
#!/bin/python3
import json
import logging
import os
import random
import re
import signal
import socket
import sys
import threading
import time
from datetime import datetime, timedelta
from urllib.parse import urlparse

import requests
import yaml
import yt.type_info.typing as ti
import yt.wrapper as yt
from requests.adapters import HTTPAdapter, Retry

CONF_PATH = "/usr/local/livelogloader/livelogloader.yaml"
CONFIG = {}

SCHEMAS = {
    "trg_binlog": [
        {"name": "timestamp", "type_v3": "utf8", "sort_order": "ascending"},
        {"name": "shard", "type_v3": "utf8", "sort_order": "ascending"},
        {"name": "msgnum", "type_v3": "uint64", "sort_order": "ascending"},
        {"name": "filename", "type_v3": "utf8"},
        {"name": "msgtype", "type_v3": "uint16"},
        {"name": "payload", "type_v3": "string"},
    ],
    "rb_binlog": [
        {"name": "timestamp", "type_v3": "utf8", "sort_order": "ascending"},
        {"name": "filename", "type_v3": "utf8", "sort_order": "ascending"},
        {"name": "msgnum", "type_v3": "uint64", "sort_order": "ascending"},
        {"name": "msgtype", "type_v3": "uint16"},
        {"name": "payload", "type_v3": "string"},
    ],
}


def parse_data_format(task: dict) -> tuple[str, list[dict], dict[str, str]]:
    """
    Returns
    1. Data format: tsv/trg_binlog/rb_binlog/raw_json
    2. Table schema
    3. Additional environment params for the YT job
    """
    data_format = task.get("format", "trg_binlog")
    if data_format == "trg_binlog":
        return data_format, SCHEMAS["trg_binlog"], {}
    elif data_format == "rb_binlog":
        return data_format, SCHEMAS["rb_binlog"], {}
    elif data_format == "tsv":
        columns_num = task["tsv_columns_num"]
        column_names = [f"column{i}" for i in range(columns_num)]
        schema = [{
            "name": "filename", "type_v3": "utf8", "sort_order": "ascending"
        }, {
            "name": "msgnum", "type_v3": "uint64", "sort_order": "ascending"
        }] + [{
            "name": column_names[i], "type_v3": "string",
        } for i in range(columns_num)] + [{
            "name": "rest_fields", "type_v3": {"type_name": "list", "item": "string"}
        }]
        return data_format, schema, {"TSV_COLUMNS": ','.join(column_names)}
    elif data_format == "raw_json":
        schema = [{
            "name": "filename", "type_v3": "utf8", "sort_order": "ascending"
        }, {
            "name": "msgnum", "type_v3": "uint64", "sort_order": "ascending"
        }, {
            "name": "body", "type_v3": "json"
        }]
        return "raw_json", schema, {}
    else:
        raise Exception(f"Unknown data_format: {data_format}")


## Step 1: Configure the root logger
def setup_logging():
    """
    Set up logging configuration based on CONFIG.
    If 'log_path' is specified in CONFIG, logs are written to the file with rotation.
    Otherwise, logs are output to the console.
    """
    # Create a custom formatter to include microseconds
    class MicrosecondFormatter(logging.Formatter):
        def formatTime(self, record, datefmt=None):
            ct = datetime.fromtimestamp(record.created)
            if datefmt:
                s = ct.strftime(datefmt)
            else:
                s = ct.strftime('%Y-%m-%d %H:%M:%S.%f')
            return s

    formatter = MicrosecondFormatter(
        '%(asctime)s.%(msecs)03d [%(threadName)s] %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    # Remove existing handlers
    root_logger.handlers = []
    # Check if 'log_path' is specified in CONFIG
    log_path = CONFIG.get('log_path')
    if log_path:
        # Set up a RotatingFileHandler with log rotation
        from logging.handlers import RotatingFileHandler
        # Configure the handler
        file_handler = RotatingFileHandler(
            filename=log_path,
            mode='a',
            maxBytes=100 * 1024 * 1024,  # 100 MB per file
            backupCount=5,  # Keep up to 5 backup files
            encoding='utf-8',
            delay=0
        )
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)
        # Optional: also log to console
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        root_logger.addHandler(console_handler)
    else:
        # Log to console only
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        root_logger.addHandler(console_handler)


setup_logging()
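
# With the formatter configured above, a record looks roughly like
#   2025-11-24 13:34:56.123 [MainThread] INFO: list_table: //home/.../tmp/...
# (date and time with milliseconds, thread name, level, message).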


class NewLoader:
    def load_table_api(self, ytclient, task, list_table, output_table_live, hdfs_proxy_addrs,
                       certkey_name, data_format):
        output_table_live = yt.TablePath(output_table_live)
        output_table_live.attributes["schema"] = SCHEMAS.get(data_format)
        if ytclient.exists(output_table_live):
            ytclient.remove(output_table_live)
        list_count = ytclient.get(yt.ypath.ypath_join(list_table, "@row_count"))
        logging.info(list_count)
        last_file_in_list_table = ytclient.read_table(f"{list_table}" + "{path}" + f"[#{list_count - 1}:#{list_count}]",
                                                      format="json")
        last_file_in_list_table = list(last_file_in_list_table)[0]['path'].split('/')[-1]
        logging.info(last_file_in_list_table)
        opacl = [{'subjects': ['adtech_admins'], 'action': 'allow', 'permissions': ['read', 'manage']}]
        job_count = list_count
        CERT_DIR = yt.ypath.ypath_join(task["yt_cluster"]["base_path"], "sys/secure/cert")
        spec_builder = (
            yt.spec_builders.MapSpecBuilder()
            .title("Load %s" % output_table_live)
            .pool(task["yt_cluster"]["pool"])
            .acl(opacl)
            .pool_trees([task["yt_cluster"]["pool_trees"]])
            .input_table_paths([list_table])
            .output_table_paths([output_table_live])
            .job_count(job_count)
            .begin_mapper()
            .command(
                " ".join(
                    (
                        f"HDFS_PROXY_URLS='{hdfs_proxy_addrs}'",
                        f"TLS_CRT_PATH='{certkey_name}.crt'",
                        f"TLS_KEY_PATH='{certkey_name}.key'",
                        f"./{task['yt_cluster']['rbhdfspull_yt_path'].split('/')[-1]} {data_format}",
                        f"--timeout {task.get('timeout_per_one_load', 900)}",
                        "--shard-position 2",
                    )
                )
            )
            .input_format(yt.JsonFormat())
            .output_format(yt.YsonFormat())
            .file_paths(
                [
                    yt.ypath.ypath_join(task['yt_cluster']["rbhdfspull_yt_path"]),
                    yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.crt"),
                    yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.key"),
                ]
            )
            .environment(
                {
                    "DATATRANSFER_HTTP_PROXY": task["yt_cluster"]["datatransfer_http_proxy"]
                    if task.get("use_proxy", False) else "",
                }
            )
            .end_mapper().spec({"max_failed_job_count": 10000})
        )
        return ytclient.run_operation(spec_builder, sync=False)

    def process(self, ytclient, config, shard, rbhdfspull_yt_path):
        logging.info(CONFIG)
        tasks = fetch_api_tasks(shard)
        logging.info(f"Fetch API tasks: {len(tasks)} items found")
        if not tasks:
            return
        filenames = {
            'tpsd': {'dst_path': '//home/dwh/raw/adtech/binlog/tpsd', 'src_path': '/data/target/tpsd', 'files': []},
            'antifraud': {'dst_path': '//home/dwh/raw/adtech/binlog/antifraud', 'src_path': '/data/target/antifraud',
                          'files': []},
            'charge2d': {'dst_path': '//home/dwh/raw/adtech/binlog/charged', 'src_path': '/data/target/charge2d',
                         'files': []},
            'partnerd': {'dst_path': '//home/dwh/raw/adtech/binlog/partnerd', 'src_path': '/data/target/partnerd',
                         'files': []},
        }
        reg = re.compile(r'^((\d{4})(\d{2})(\d{2}))\d{6}\..*\..*\.pb\.zst$')
        date_str = ""
        ids_to_commit = []
        for t in tasks:
            _id = t['id']
            added = False
            for f, fs in t['filenames'].items():
                if f not in filenames:
                    logging.info(f"ERROR!: unexpected filename key: {f}")
                    sys.exit(1)
                m = reg.match(fs)
                mg = m.groups()
                if not date_str:
                    date_str = m.groups()[0]
                else:
                    if mg[0] != date_str:
                        logging.warning(f"skipping {t} because date differs from chosen {date_str}")
                        continue
                filenames[f]['files'].append(fs)
                added = True
            if added:
                ids_to_commit.append(_id)
        _, y, mth, d = mg
        task = {
            'load_over_s3': True,
            'yt_cluster': {
                'base_path': '//home/hc/adtech',
                'datatransfer_http_proxy': 'http://running.rb-hadoop-proxy.datatransfer-yt.hc.one-infra.ru:3128',
                'pool_trees': 'default_hc',
                'pool': 'adtech-hadoop-load',
                'rbhdfspull_yt_path': rbhdfspull_yt_path,
            },
            'format': 'trg_binlog',
        }
        schema = (
            yt.schema.TableSchema()
            .add_column("path", ti.Utf8)
            .add_column("type", ti.Utf8)
            .add_column("length", ti.Uint64)
            .add_column("mtime", ti.Uint64)
        )
        list_table = ytclient.create_temp_table(
            path=config["remote_temp_tables_directory"],
            attributes={"schema": schema},
            expiration_timeout=60 * 60 * 1000,
        )
        logging.info(f"list_table: {list_table}")
        certkey_name = 'yt-adtech-load'
        logging.info(task.get('load_over_s3', False))
        if task.get('load_over_s3', False):
            src_url = task.get('src_url', "https://running.s3storage.hdfs-misc.clouds.adcld.ru:9443")
            protocol, hostname = src_url.split("://")
            hdfs_proxy_addrs = ','.join([
                f"{protocol}://" + v
                for v in get_ip_addresses_for_url(f"http://{hostname}", True)
            ])
        elif task.get('load_over_rbhp-hdfs-proxy', False):
            hdfs_proxy_addrs = "https://hdfs-proxy-vip.rbdev.mail.ru:9443"
            certkey_name = 'rbhp-hdfs-proxy'
        else:
            hdfs_proxy_addrs = ','.join(["https://" + v for v in get_rbprodhdfs_proxy_addrs()])
        logging.info(hdfs_proxy_addrs)
        data_format = 'trg_binlog'
        logging.info(filenames)
        with ytclient.Transaction():
            with yt.operations_tracker.OperationsTrackerBase() as ot:
                for name, d in filenames.items():
                    if not d['files']:
                        logging.info(f"No files for {name}")
                        continue
                    list_table = ytclient.create_temp_table(
                        path=config["remote_temp_tables_directory"],
                        attributes={"schema": schema},
                        expiration_timeout=60 * 60 * 1000,
                    )
                    output_table = f'{d["dst_path"]}/{date_str}'
                    output_table_live = output_table + '_live'
                    d['output_table'] = output_table
                    d['output_table_live'] = output_table_live
                    logging.info(f"list_table: {list_table}, {output_table}. File count: {len(d['files'])}")
                    files_data = []
                    for file in d['files']:
                        if task.get("src_files_grouped_by_date", True):
                            file_path = f'{d["src_path"]}/{date_str}/{file}'
                        else:
                            file_path = f'{d["src_path"]}/{file}'
                        files_data.append({
                            "path": file_path,
                            'type': '',
                            'length': 0,
                            'mtime': 0
                        })
                    ytclient.write_table(yt.TablePath(list_table, append=True), files_data)
                    op = self.load_table_api(ytclient, task, list_table, output_table + '_live',
                                             hdfs_proxy_addrs, certkey_name, data_format)
                    logging.info("add op")
                    ot.add(op)
            with yt.operations_tracker.OperationsTrackerBase() as ot:
                for name, d in filenames.items():
                    if not d['files']:
                        continue
                    if not ytclient.exists(d['output_table']):
                        ytclient.move(d['output_table_live'], d['output_table'])
                    else:
                        logging.info("add op")
                        op = ytclient.run_merge(
                            [d["output_table"], d["output_table_live"]],
                            d["output_table"],
                            mode="auto",
                            sync=False,
                        )
                        ot.add(op)
        logging.info(f"Update API tasks: {len(ids_to_commit)} items, status: done")
        update_api_task_state(ids_to_commit, 'done')
        for name, d in filenames.items():
            if not d['files']:
                continue
            output_table_live = d['output_table_live']
            if ytclient.exists(output_table_live):
                ytclient.remove(output_table_live)

    def worker(self):
        cluster_name = "jupiter"
        yt_cluster = CONFIG.get("yt_clusters", {}).get(cluster_name)
        task = {}
        if not yt_cluster:
            logging.info(f"ERROR!: Cluster config for {cluster_name} not found")
            return
        config = yt.default_config.get_config_from_env()
        config["proxy"]["url"] = yt_cluster["yt_proxy"]
        config["proxy"]["enable_proxy_discovery"] = False
        config["create_table_attributes"] = {"compression_codec": "zstd_5"}
        config["remote_temp_tables_directory"] = f"{yt_cluster['base_path']}/tmp"
        config["token"] = CONFIG["yt_token"]
        config["proxy"]["operation_link_pattern"] = (
            f"https://yt.vk.team/{cluster_name}/operations/{{id}}/details"
        )
        ytclient = yt.YtClient(config=config)
        task["yt_cluster"] = yt_cluster
        while not stop_event.is_set():
            try:
                self.process(ytclient, config, CONFIG["shards"], yt_cluster["rbhdfspull_yt_path"])
                if stop_event.is_set():
                    break
            except Exception as e:
                logging.info(f"ERROR!: Error in processing NewLoader: {type(e)}: {e}")
                # Handle unrecoverable errors
                if 'Some subjects mentioned in ACL are missing' not in str(e):
                    stop_event.set()
            logging.info("sleep 1 before next iteration")
            time.sleep(1)  # Wait before next iteration


def get_ip_addresses_for_url(url: str, shuffle_ips: bool = True):
    """
    Given a URL like http://my-service:8080/path, parse the hostname, resolve it,
    and return a list of ip:port addresses. Optionally shuffle them for load balancing.
    :param url: The URL to resolve, e.g. 'http://example.com:80'
    :param shuffle_ips: If True, randomly shuffle the resolved IPs
    :return: A list of strings, each in the form 'ip:port'
    """
    parsed_url = urlparse(url)
    if not parsed_url.hostname:
        logging.info("ERROR!: URL missing a hostname: %s", url)
        return []
    hostname = parsed_url.hostname
    port = parsed_url.port or 80  # default to 80 if port is not specified or None
    # Attempt to resolve hostname
    try:
        # gethostbyname_ex can return multiple IPs (aliases, etc.)
        # - official_name is the canonical name of the host.
        # - aliaslist is a (possibly empty) list of aliases for the host.
        # - ipaddrlist is a list of IPv4 addresses for the host.
        official_name, aliaslist, ipaddrlist = socket.gethostbyname_ex(hostname)
    except socket.gaierror as e:
        logging.info("ERROR!: Failed to resolve hostname %s: %s", hostname, e)
        return []
    if not ipaddrlist:
        logging.info("ERROR!: No IP addresses found for hostname %s", hostname)
        return []
    # Construct 'ip:port' strings
    addresses = [f"{ip}:{port}" for ip in ipaddrlist]
    # Shuffle if desired
    if shuffle_ips:
        random.shuffle(addresses)
    logging.info("Resolved %s -> %s", url, addresses)
    return addresses
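

# Illustrative call (hostname and addresses are made up, not real infrastructure):
#   get_ip_addresses_for_url("http://example.internal:9443")
#   -> ["10.0.0.2:9443", "10.0.0.1:9443"]   # all resolved A records, shuffled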


def adjust_loggers():
    logger_names = ['yt', 'yt.wrapper', 'yt.packages.requests', 'requests']
    for name in logger_names:
        logger = logging.getLogger(name)
        logger.handlers = []
        logger.propagate = True
        logger.setLevel(logging.INFO)
    # Also normalize every other registered logger
    for name, logger in logging.Logger.manager.loggerDict.items():
        if isinstance(logger, logging.Logger):
            logger.handlers = []
            logger.propagate = True
            logger.setLevel(logging.INFO)


# Call the function to adjust loggers
adjust_loggers()

API_TIMEOUT = 120
API_STAGE = "yt_binlog_loader"


def fetch_api_tasks(shard):
    url = f"{CONFIG['statapid_url']}/api/v1/tasks/new"
    params = {
        "stage": API_STAGE,
        "shard": shard,
        "limit": 10000,
    }
    try:
        r = requests_session().get(url, params=params, timeout=API_TIMEOUT)
        r.raise_for_status()
        return r.json()
    except Exception as e:
        logging.info("ERROR!: API fetch failed: %s", e)
        return []


def update_api_task_state(
    ids: list[int],
    status: str = "done",
    task_limit: int = 10000,  # 10k is max for API
) -> None:
    if not ids:
        return
    session = requests_session()
    task_ids = list(set(ids))
    logging.info(f"Updating API tasks, set status {status}")
    for i in range(0, len(task_ids), task_limit):
        batch_ids = task_ids[i: i + task_limit]
        for attempt in range(1, 6):
            try:
                response = session.post(
                    f"{CONFIG['statapid_url']}/api/v1/update_tasks_statuses",
                    json={"task_ids": batch_ids},
                    params={"stage": API_STAGE, "status": status},
                    timeout=API_TIMEOUT,
                )
                response.raise_for_status()
                break
            except Exception as e:
                logging.warning(
                    f"Batch update for tasks {batch_ids} failed (try {attempt}/5): {e}"
                )
                if attempt == 5:
                    raise RuntimeError(
                        f"Could not set '{status}' for tasks: {sorted(batch_ids)}"
                    )
                time.sleep(0.5 * 2 ** (attempt - 1))
    logging.info(f"Successfully updated status for {len(task_ids)} tasks.")


def requests_session():
    s = requests.Session()
    retries = Retry(total=10, backoff_factor=0.5, status_forcelist=[503])
    s.mount('http://', HTTPAdapter(max_retries=retries))
    s.mount('https://', HTTPAdapter(max_retries=retries))
    return s


stop_event = threading.Event()  # Event to signal threads to stop

CONFIG = {
    'logfile': None,
    'loglevel': 'DEBUG',
    'targets_export_path': None,
    'default_target': {},
    'targets': [],
    'rbprod_etcd_user': 'rbhadoop',
    'rbprod_etcd_addr': ["rbzoo%s.i:9443" % x for x in range(1, 6)],
    'rbprod_hdfs_pref_ips': None,
    'rbprod_hdfs_cert': None,
    'ciri_baseurl': None,
    'ciri_auth_token': None,
    'statsd': None,
}


def process(ytclient, output_table, config, table_path, load_date, task):
    logging.info(CONFIG)
    dst_datestr = load_date.strftime("%Y-%m-%d")
    src_datestr = load_date.strftime(task.get("src_date_format", "%Y%m%d"))
    if task.get("src_files_grouped_by_date", True):
        filename_regex = task.get("filename_regex", r'^[^\.].*\.(zst|gz|lz4|bz2|json)')
    else:
        filename_regex = task.get("filename_regex", rf'^{src_datestr}.*\.(zst|gz|lz4|bz2|json)')
    loading_mode = 'current'
    BIN_DIR = yt.ypath.ypath_join(task["yt_cluster"]["base_path"], "sys/bin")
    CERT_DIR = yt.ypath.ypath_join(task["yt_cluster"]["base_path"], "sys/secure/cert")
    output_table_path = yt.ypath.ypath_join(output_table, dst_datestr)
    success_attr_path = yt.ypath.ypath_join(output_table_path, '@seen_success')
    output_table_path_live = output_table_path + "_live"
    if task.get("src_files_grouped_by_date", True):
        input_table_path = os.path.join(table_path, src_datestr)
    else:
        input_table_path = os.path.join(table_path)
    if not ytclient.exists(output_table):
        logging.info(f"Map node {output_table} is absent, creating...")
        ytclient.create("map_node", output_table, recursive=True, force=True)
    last_seen_file = False
    try:
        last_seen_file = ytclient.get(yt.ypath.ypath_join(output_table_path, "@last_seen_file"))
    except yt.errors.YtResolveError:
        pass
    logging.info(f"Process: {output_table_path}, last_seen_file {last_seen_file}")
    opacl = [{'subjects': ['adtech_admins', 'o.vinogradov'], 'action': 'allow', 'permissions': ['read', 'manage']}]
    schema = (
        yt.schema.TableSchema()
        .add_column("path", ti.Utf8)
        .add_column("type", ti.Utf8)
        .add_column("length", ti.Uint64)
        .add_column("mtime", ti.Uint64)
    )
    list_table = ytclient.create_temp_table(
        path=config["remote_temp_tables_directory"],
        attributes={"schema": schema},
        expiration_timeout=60 * 60 * 1000,
    )
    logging.info(f"list_table: {list_table}")
    uploaded_files_schema = (
        yt.schema.TableSchema()
        .add_column("date", ti.Date)
        .add_column("path", ti.Utf8)
    )
    certkey_name = 'yt-adtech-load'
    logging.info(task.get('load_over_s3', False))
    if task.get('load_over_s3', False):
        src_url = task.get('src_url', "https://running.s3storage.hdfs-misc.clouds.adcld.ru:9443")
        protocol, hostname = src_url.split("://")
        hdfs_proxy_addrs = ','.join([
            f"{protocol}://" + v
            for v in get_ip_addresses_for_url(f"http://{hostname}", True)
        ])
    elif task.get('load_over_rbhp-hdfs-proxy', False):
        hdfs_proxy_addrs = "https://hdfs-proxy-vip.rbdev.mail.ru:9443"
        certkey_name = 'rbhp-hdfs-proxy'
    else:
        hdfs_proxy_addrs = ','.join(["https://" + v for v in get_rbprodhdfs_proxy_addrs()])
    logging.info(hdfs_proxy_addrs)
    if task.get("use_parallel_daily_jobs", False):
        meta_path = f"meta/{dst_datestr}"
    else:
        meta_path = "meta"
    output_table_uploaded_files = output_table + f"/{meta_path}/uploaded_files_as_table"
    output_table_uploaded_files_commited = output_table_uploaded_files + "_commited"
    if not ytclient.exists(os.path.join(output_table, meta_path)):
        logging.info("Meta map node is absent, creating...")
        ytclient.create(
            "map_node",
            os.path.join(output_table, meta_path),
            recursive=True
        )
    secure_vault = {
        "YT_TOKEN": CONFIG['yt_token'],
    }
    update_env_script = "update_operation_env.sh"
    command = " ".join(
        [
            f"source {update_env_script} &&",
            f"YT_PROXY={task['yt_cluster']['yt_proxy']}",
            "YT_ALLOW_HTTP_REQUESTS_TO_YT_FROM_JOB=1",
            f"HDFS_PROXY_URLS='{hdfs_proxy_addrs}'",
            f"TLS_CRT_PATH='{certkey_name}.crt'",
            f"TLS_KEY_PATH='{certkey_name}.key'",
            f"./{task['yt_cluster']['rbhdfspull_yt_path'].split('/')[-1]} ls {input_table_path}",
            f"--skip-file-list-path {output_table_uploaded_files} ",
            f"--success-attr-path {success_attr_path}",
            (f"--filename-regex '{filename_regex}' " if filename_regex else ""),
            f"--limit {task.get('parallel_files_load', 10000)}",
        ]
    )
    logging.info(command)
    spec_builder = (
        yt.spec_builders.VanillaSpecBuilder()
        .title("Listing hdfs %s" % input_table_path)
        .pool(task["yt_cluster"]["pool"])
        .acl(opacl)
        .secure_vault(secure_vault)
        .max_failed_job_count(3)
        .begin_task("hdfs ls")
        .command(command)
        .file_paths(
            [
                task['yt_cluster']["rbhdfspull_yt_path"],
                yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.crt"),
                yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.key"),
                yt.ypath.ypath_join(BIN_DIR, f"{update_env_script}"),
            ]
        )
        .job_count(1)
        .memory_limit(2 * 1024 * 1024 * 1024)
        .environment(
            {
                "YT_ALLOW_HTTP_REQUESTS_TO_YT_FROM_JOB": "1",
                "DATATRANSFER_HTTP_PROXY": task["yt_cluster"]["datatransfer_http_proxy"]
                if task.get("use_proxy", False) else "",
            }
        )
        .output_format(yt.JsonFormat())
        .output_table_paths([list_table])  # PATCHED
        .end_task()
    )
    if not ytclient.exists(output_table_uploaded_files):
        ytclient.create("table", output_table_uploaded_files, attributes={"schema": uploaded_files_schema},
                        recursive=True)
    if not ytclient.exists(output_table_uploaded_files_commited):
        ytclient.create("table", output_table_uploaded_files_commited, attributes={"schema": uploaded_files_schema},
                        recursive=True)
    logging.info(output_table_uploaded_files)
    logging.info(output_table_uploaded_files_commited)
    logging.info(output_table_path_live)
    if ytclient.exists(output_table_path_live):
        ytclient.remove(output_table_path_live)
    ytclient.run_operation(spec_builder)
    list_count = ytclient.get(yt.ypath.ypath_join(list_table, "@row_count"))
    if list_count == 0 and loading_mode != "previous":
        logging.info("Nothing to load")
        return
    with ytclient.Transaction():
        ytclient.move(output_table_uploaded_files, output_table_uploaded_files_commited, force=True)
        if not ytclient.exists(list_table):
            ytclient.create("table", list_table, recursive=True)
        if list_count == 0 and loading_mode == 'previous':
            ytclient.set(yt.ypath.ypath_join(output_table_path, "@success"), "true")
            if ytclient.exists(output_table_uploaded_files_commited):
                pass
            return
        logging.info(list_count)
        last_file_in_list_table = ytclient.read_table(f"{list_table}" + "{path}" + f"[#{list_count - 1}:#{list_count}]",
                                                      format="json")
        last_file_in_list_table = list(last_file_in_list_table)[0]['path'].split('/')[-1]
        logging.info(last_file_in_list_table)
        logging.info(f"list_count: {list_count}")
        data_format, schema, additional_env_params = parse_data_format(task)
        output_table = yt.TablePath(output_table_path_live)
        output_table.attributes["schema"] = schema
        job_count = list_count
        if job_count <= 0:
            job_count = 1
        spec_builder = (
            yt.spec_builders.MapSpecBuilder()
            .title("Load %s" % output_table_path_live)
            .pool(task["yt_cluster"]["pool"])
            .acl(opacl)
            .pool_trees([task["yt_cluster"]["pool_trees"]])
            .input_table_paths([list_table])
            .output_table_paths([output_table])
            .job_count(job_count)
            .begin_mapper()
            .memory_limit(2 * 1024 * 1024 * 1024)
            .command(
                " ".join(
                    (
                        f"HDFS_PROXY_URLS='{hdfs_proxy_addrs}'",
                        f"TLS_CRT_PATH='{certkey_name}.crt'",
                        f"TLS_KEY_PATH='{certkey_name}.key'",
                        f"./{task['yt_cluster']['rbhdfspull_yt_path'].split('/')[-1]} {data_format}",
                        f"--timeout {task.get('timeout_per_one_load', 1800)}",
                        f"--shard-position {task.get('shard_position', 2)}",
                    )
                )
            )
            .input_format(yt.JsonFormat())
            .output_format(yt.YsonFormat())
            .file_paths(
                [
                    task['yt_cluster']["rbhdfspull_yt_path"],
                    yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.crt"),
                    yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.key"),
                ]
            )
            .environment(
                {
                    "DATATRANSFER_HTTP_PROXY": task["yt_cluster"]["datatransfer_http_proxy"]
                    if task.get("use_proxy", False) else "",
                    **additional_env_params
                }
            )
            .end_mapper().spec({"max_failed_job_count": 10000})
        )
        ytclient.run_operation(spec_builder)
        if not ytclient.exists(output_table_path):
            ytclient.move(output_table_path_live, output_table_path)
        else:
            ytclient.run_merge([output_table_path, output_table_path_live], output_table_path, mode="auto")
        ytclient.set(yt.ypath.ypath_join(output_table_path, "@last_seen_file"), last_file_in_list_table)


def get_rbprodhdfs_proxy_addrs():
    upsync_nodes = None
    etcd_addr = CONFIG['rbprod_etcd_addr']
    random.shuffle(etcd_addr)
    for h in etcd_addr:
        url = "http://%s:%s@%s/v2/keys/upsync/rbhdfs-proxy/" % (CONFIG['rbprod_etcd_user'],
                                                                CONFIG['rbprod_etcd_password'], h)
        r = requests_session().get(url, timeout=10)
        if r.status_code == 200:
            try:
                upsync_nodes = json.loads(r.text)['node']['nodes']
                break
            except Exception as e:
                logging.info(f"ERROR!: get_rbprodhdfs_proxy_addrs: {e}")
    if not upsync_nodes:
        return
    nodes = []
    other_nodes = []
    random.shuffle(upsync_nodes)
    for node in upsync_nodes:
        ip, port = node['key'].split('/')[3].split(':')
        addr = "%s:%s" % (ip, port)
        other_nodes.append(addr)
    if not nodes:
        nodes += other_nodes
    if nodes:
        logging.info(nodes[0])
        return nodes
    logging.info("ERROR!: get_rbprodhdfs_proxy_addrs: empty list")


def worker(task):
    cluster_name = task.get("cluster", "jupiter")
    yt_cluster = CONFIG.get("yt_clusters", {}).get(cluster_name)
    if not yt_cluster:
        logging.info(f"ERROR!: Cluster config for {cluster_name} not found")
        return
    config = yt.default_config.get_config_from_env()
    config["proxy"]["url"] = yt_cluster["yt_proxy"]
    config["proxy"]["enable_proxy_discovery"] = False
    config["create_table_attributes"] = {"compression_codec": "zstd_5"}
    config["remote_temp_tables_directory"] = f"{yt_cluster['base_path']}/tmp"
    config["token"] = CONFIG["yt_token"]
    config["proxy"]["operation_link_pattern"] = (
        f"https://yt.vk.team/{cluster_name}/operations/{{id}}/details"
    )
    task["yt_cluster"] = yt_cluster
    src = task['src']
    dst = task['dst']
    table_path = src
    output_table = dst
    while not stop_event.is_set():
        backfill_depth = task.get("backfill_depth", 2)
        days_lag = task.get("days_lag", 0)
        now = datetime.now() - timedelta(days=days_lag)
        days_to_load = reversed([
            now - timedelta(days=i) for i in range(backfill_depth)
        ])
        threads = []
        for day in days_to_load:
            name = f"{task['src']}->{task['dst']}::{day.strftime('%Y-%m-%d')}"
            client = yt.YtClient(config=config)
            t = threading.Thread(target=process,
                                 args=(client, output_table, config, table_path, day, task),
                                 name=name
                                 )
            threads.append(t)
        process_threads(threads, concurrent=task.get("use_parallel_daily_jobs", False))


def process_threads(threads, concurrent: bool = False):
    def wait_result(thread):
        try:
            thread.join()
        except Exception as e:
            logging.info(f"ERROR!: Error in process {thread.name}: {e}")
            raise
        else:
            logging.info(f"Successfully finished {thread.name}")

    if concurrent:
        for t in threads:
            logging.info(f"Starting {t.name}")
            t.start()
        for t in threads:
            wait_result(t)
    else:
        for t in threads:
            logging.info(f"Starting {t.name}")
            t.start()
            wait_result(t)


def signal_handler(sig, frame):
    logging.info("Signal received, shutting down.")
    stop_event.set()


def main():
    global CONF_PATH
    signal.signal(signal.SIGTERM, signal_handler)
    # Optionally handle SIGINT for Ctrl+C
    signal.signal(signal.SIGINT, signal_handler)
    if len(sys.argv) > 1:
        CONF_PATH = sys.argv[1]
    with open(CONF_PATH) as f:
        conf = yaml.safe_load(f)
    CONFIG.update(conf)
    os.environ['YT_TOKEN'] = CONFIG['yt_token']
    # Set up logging
    setup_logging()
    adjust_loggers()
    # Get tasks from CONFIG['targets'], each containing 'src' and 'dst'
    tasks = CONFIG.get('targets', [])
    if not tasks:
        logging.info("ERROR!: No tasks found in CONFIG['targets']")
        sys.exit(1)
    threads = []
    if CONFIG.get("use_new_loader", True):
        NL = NewLoader()
        t = threading.Thread(target=NL.worker, args=(), name="NewLoader")
        t.start()
        threads.append(t)
    for task in tasks:
        t = threading.Thread(target=worker, args=(task,), name=f"{task['src']}->{task['dst']}")
        t.start()
        threads.append(t)
    # Wait for threads to complete
    try:
        for t in threads:
            t.join()
    except KeyboardInterrupt:
        logging.info("KeyboardInterrupt received, stopping threads")
        stop_event.set()
        for t in threads:
            t.join()
    except Exception as e:
        logging.info(f"ERROR!: Unexpected error: {e}")
        raise


if __name__ == "__main__":
    main()
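

# Rough sketch of a livelogloader.yaml this script expects. The keys are inferred
# from the CONFIG lookups above; every hostname, path and secret below is a
# placeholder, not a real value:
#
#   yt_token: "<yt-token>"
#   statapid_url: "https://statapid.example.internal"
#   shards: "shard01"
#   log_path: /var/log/livelogloader/livelogloader.log
#   use_new_loader: true
#   rbprod_etcd_password: "<etcd-password>"
#   yt_clusters:
#     jupiter:
#       yt_proxy: "<yt-proxy-fqdn>"
#       base_path: "//home/hc/adtech"
#       pool: adtech-hadoop-load
#       pool_trees: default_hc
#       rbhdfspull_yt_path: "//home/hc/adtech/sys/bin/rbhdfspull"
#       datatransfer_http_proxy: "http://<proxy-host>:3128"
#   targets:
#     - src: /data/target/tpsd
#       dst: //home/dwh/raw/adtech/binlog/tpsd
#       cluster: jupiter
#       format: trg_binlog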