loader.py (gist by @recoilme, created November 24, 2025)
#!/usr/bin/env python3
import json
import logging
import os
import random
import re
import signal
import socket
import sys
import threading
import time
from datetime import datetime, timedelta
from urllib.parse import urlparse
import requests
import yaml
import yt.type_info.typing as ti
import yt.wrapper as yt
from requests.adapters import HTTPAdapter, Retry
CONF_PATH = "/usr/local/livelogloader/livelogloader.yaml"
CONFIG = {}
SCHEMAS = {
"trg_binlog": [
{"name": "timestamp", "type_v3": "utf8", "sort_order": "ascending"},
{"name": "shard", "type_v3": "utf8", "sort_order": "ascending"},
{"name": "msgnum", "type_v3": "uint64", "sort_order": "ascending"},
{"name": "filename", "type_v3": "utf8"},
{"name": "msgtype", "type_v3": "uint16"},
{"name": "payload", "type_v3": "string"},
],
"rb_binlog": [
{"name": "timestamp", "type_v3": "utf8", "sort_order": "ascending"},
{"name": "filename", "type_v3": "utf8", "sort_order": "ascending"},
{"name": "msgnum", "type_v3": "uint64", "sort_order": "ascending"},
{"name": "msgtype", "type_v3": "uint16"},
{"name": "payload", "type_v3": "string"},
],
}
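# Both binlog schemas describe sorted tables: the leading columns (timestamp plus shard or
# filename, then msgnum) form the sort key, while msgtype and the raw payload are plain
# value columns.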
def parse_data_format(task: dict) -> tuple[str, list[dict], dict[str, str]]:
"""
Returns
1. Data format: tsv/trg_binlog/rb_binlog
2. Table schema
3. Additional environment params for calling YT job
"""
data_format = task.get("format", "trg_binlog")
if data_format == "trg_binlog":
return data_format, SCHEMAS["trg_binlog"], {}
elif data_format == "rb_binlog":
return data_format, SCHEMAS["rb_binlog"], {}
elif data_format == "tsv":
columns_num = task["tsv_columns_num"]
column_names = [f"column{i}" for i in range(columns_num)]
schema = [{
"name": "filename", "type_v3": "utf8", "sort_order": "ascending"
}, {
"name": "msgnum", "type_v3": "uint64", "sort_order": "ascending"
}] + [{
"name": column_names[i], "type_v3": "string",
} for i in range(columns_num)] + [{
"name": "rest_fields", "type_v3": {"type_name": "list", "item": "string"}
}]
return data_format, schema, {"TSV_COLUMNS": ','.join(column_names)}
elif data_format == "raw_json":
schema = [{
"name": "filename", "type_v3": "utf8", "sort_order": "ascending"
}, {
"name": "msgnum", "type_v3": "uint64", "sort_order": "ascending"
}, {
"name": "body", "type_v3": "json"
}]
return "raw_json", schema, {}
else:
raise Exception(f"Unknown data_format: {data_format}")
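# Illustrative example (values invented): for a task like {"format": "tsv", "tsv_columns_num": 2}
# this returns ("tsv", <schema with filename, msgnum, column0, column1, rest_fields>,
# {"TSV_COLUMNS": "column0,column1"}).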
# Configure the root logger
def setup_logging():
"""
Set up logging configuration based on CONFIG.
If 'log_path' is specified in CONFIG, logs are written to the file with rotation.
Otherwise, logs are output to the console.
"""
# Create a custom formatter to include microseconds
class MicrosecondFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
ct = datetime.fromtimestamp(record.created)
if datefmt:
s = ct.strftime(datefmt)
else:
s = ct.strftime('%Y-%m-%d %H:%M:%S.%f')
return s
formatter = MicrosecondFormatter(
'%(asctime)s.%(msecs)03d [%(threadName)s] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# Remove existing handlers
root_logger.handlers = []
# Check if 'log_path' is specified in CONFIG
log_path = CONFIG.get('log_path')
if log_path:
# Set up a RotatingFileHandler with log rotation
from logging.handlers import RotatingFileHandler
# Configure the handler
file_handler = RotatingFileHandler(
filename=log_path,
mode='a',
maxBytes=100 * 1024 * 1024, # 100 MB
backupCount=5, # Keep up to 5 backup files
encoding='utf-8',
delay=0
)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# Optional: Also log to console
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
else:
# Log to console only
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
setup_logging()
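# Note: this initial call runs at import time with an empty CONFIG, so it logs to the console
# only; main() calls setup_logging() again after the YAML config is loaded, which is when a
# configured log_path takes effect.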
class NewLoader:
def load_table_api(self, ytclient, task, list_table, output_table_live, hdfs_proxy_addrs,
certkey_name, data_format):
output_table_live = yt.TablePath(output_table_live)
output_table_live.attributes["schema"] = SCHEMAS.get(data_format)
if ytclient.exists(output_table_live):
ytclient.remove(output_table_live)
list_count = ytclient.get(yt.ypath.ypath_join(list_table, "@row_count"))
logging.info(list_count)
last_file_in_list_table = ytclient.read_table(f"{list_table}" + "{path}" + f"[#{list_count - 1}:#{list_count}]",
format="json")
last_file_in_list_table = list(last_file_in_list_table)[0]['path'].split('/')[-1]
logging.info(last_file_in_list_table)
opacl = [{'subjects': ['adtech_admins'], 'action': 'allow', 'permissions': ['read', 'manage']}]
job_count = list_count
CERT_DIR = yt.ypath.ypath_join(task["yt_cluster"]["base_path"], "sys/secure/cert")
spec_builder = (
yt.spec_builders.MapSpecBuilder()
.title("Load %s" % output_table_live)
.pool(task["yt_cluster"]["pool"])
.acl(opacl)
.pool_trees([task["yt_cluster"]["pool_trees"]])
.input_table_paths([list_table])
.output_table_paths([output_table_live])
.job_count(job_count)
.begin_mapper()
.command(
" ".join(
(
f"HDFS_PROXY_URLS='{hdfs_proxy_addrs}'",
f"TLS_CRT_PATH='{certkey_name}.crt'",
f"TLS_KEY_PATH='{certkey_name}.key'",
f"./{task['yt_cluster']['rbhdfspull_yt_path'].split('/')[-1]} {data_format}",
f"--timeout {task.get('timeout_per_one_load', 900)}",
f"--shard-position 2"
)
)
)
.input_format(yt.JsonFormat())
.output_format(yt.YsonFormat())
.file_paths(
[
yt.ypath.ypath_join(task['yt_cluster']["rbhdfspull_yt_path"]),
yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.crt"),
yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.key"),
]
)
.environment(
{
"DATATRANSFER_HTTP_PROXY": task["yt_cluster"]["datatransfer_http_proxy"]
if task.get("use_proxy", False) else "",
}
)
.end_mapper().spec({"max_failed_job_count": 10000})
)
return ytclient.run_operation(spec_builder, sync=False)
def process(self, ytclient, config, shard, rbhdfspull_yt_path):
logging.info(CONFIG)
tasks = fetch_api_tasks(shard)
logging.info(f"Featch API tasks: {len(tasks)} items found")
if not tasks:
return
filenames = {
'tpsd': {'dst_path': '//home/dwh/raw/adtech/binlog/tpsd', 'src_path': '/data/target/tpsd', 'files': []},
'antifraud': {'dst_path': '//home/dwh/raw/adtech/binlog/antifraud', 'src_path': '/data/target/antifraud',
'files': []},
'charge2d': {'dst_path': '//home/dwh/raw/adtech/binlog/charged', 'src_path': '/data/target/charge2d',
'files': []},
'partnerd': {'dst_path': '//home/dwh/raw/adtech/binlog/partnerd', 'src_path': '/data/target/partnerd',
'files': []},
}
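# filenames maps each binlog producer to its source directory (as passed to the rbhdfspull job),
# its destination path in YT, and the list of files collected for it from the API tasks.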
reg = re.compile(r'^((\d{4})(\d{2})(\d{2}))\d{6}\..*\..*\.pb\.zst$')
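# The pattern expects names like '20251124133400.<shard>.<seq>.pb.zst' (illustrative); group 1
# is the YYYYMMDD prefix, used below to keep only files belonging to a single date per run.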
date_str = ""
ids_to_commit = []
for t in tasks:
_id = t['id']
added = False
for f, fs in t['filenames'].items():
if f not in filenames:
logging.info(f'ERROR!: unexpected filename group {f}, aborting')
sys.exit(1)
m = reg.match(fs)
mg = m.groups()
if not date_str:
date_str = m.groups()[0]
else:
if mg[0] != date_str:
logging.warn(f"skipping {t} because date differs from chosen {date_str}")
continue
filenames[f]['files'].append(fs)
added = True
if added:
ids_to_commit.append(_id)
_, y, mth, d = mg
task = {
'load_over_s3': True,
'yt_cluster': {
'base_path': '//home/hc/adtech',
'datatransfer_http_proxy': 'http://running.rb-hadoop-proxy.datatransfer-yt.hc.one-infra.ru:3128',
'pool_trees': 'default_hc',
'pool': 'adtech-hadoop-load',
'rbhdfspull_yt_path': rbhdfspull_yt_path,
},
'format': 'trg_binlog',
}
schema = (
yt.schema.TableSchema()
.add_column("path", ti.Utf8)
.add_column("type", ti.Utf8)
.add_column("length", ti.Uint64)
.add_column("mtime", ti.Uint64)
)
list_table = ytclient.create_temp_table(
path=config["remote_temp_tables_directory"],
attributes={"schema": schema},
expiration_timeout=60 * 60 * 1000,
)
logging.info(f"list_table: {list_table}")
certkey_name = 'yt-adtech-load'
logging.info(task.get('load_over_s3', False))
if task.get('load_over_s3', False):
src_url = task.get('src_url', "https://running.s3storage.hdfs-misc.clouds.adcld.ru:9443")
protocol, hostname = src_url.split("://")
hdfs_proxy_addrs = ','.join([
f"{protocol}://" + v
for v in get_ip_addresses_for_url(f"http://{hostname}", True)
])
elif task.get('load_over_rbhp-hdfs-proxy', False):
hdfs_proxy_addrs = "https://hdfs-proxy-vip.rbdev.mail.ru:9443"
certkey_name = 'rbhp-hdfs-proxy'
else:
hdfs_proxy_addrs = ','.join(["https://" + v for v in get_rbprodhdfs_proxy_addrs()])
logging.info(hdfs_proxy_addrs)
data_format = 'trg_binlog'
print(filenames)
with ytclient.Transaction():
with yt.operations_tracker.OperationsTrackerBase() as ot:
for t, d in filenames.items():
if not d['files']:
logging.info(f"No files for {t}")
continue
list_table = ytclient.create_temp_table(
path=config["remote_temp_tables_directory"],
attributes={"schema": schema},
expiration_timeout=60 * 60 * 1000,
)
output_table = f'{d["dst_path"]}/{date_str}'
output_table_live = output_table + '_live'
d['output_table'] = output_table
d['output_table_live'] = output_table_live
logging.info(f"list_table: {list_table}, {output_table}. File count: {len(d['files'])}")
files_data = []
for file in d['files']:
if task.get("src_files_grouped_by_date", True):
file_path = f'{d["src_path"]}/{date_str}/{file}'
else:
file_path = f'{d["src_path"]}/{file}'
files_data.append({
"path": file_path,
'type': '',
'length': 0,
'mtime': 0
})
ytclient.write_table(yt.TablePath(list_table, append=True), files_data)
op = self.load_table_api(ytclient, task, list_table, output_table + '_live',
hdfs_proxy_addrs, certkey_name, data_format)
logging.info("add op")
ot.add(op)
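# The OperationsTracker context waits for the tracked map operations when the `with` block
# exits, so the per-target '_live' tables should be fully loaded before the merge/move pass below.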
with yt.operations_tracker.OperationsTrackerBase() as ot:
for t, d in filenames.items():
if not d['files']:
continue
if not ytclient.exists(d['output_table']):
ytclient.move(d['output_table_live'], d['output_table'])
else:
logging.info("add op")
op = ytclient.run_merge(
[d["output_table"], d["output_table_live"]],
d["output_table"],
mode="auto",
sync=False,
)
ot.add(op)
logging.info(f"Update API tasks: {len(ids_to_commit)} items, status: done")
update_api_task_state(ids_to_commit, 'done')
for t, d in filenames.items():
output_table_live = d['output_table_live']
if ytclient.exists(output_table_live):
ytclient.remove(output_table_live)
def worker(self):
cluster_name = "jupiter"
yt_cluster = CONFIG.get("yt_clusters", {}).get(cluster_name)
task = {}
if not yt_cluster:
logging.info(f"ERROR!: Cluster config for {cluster_name} not found")
return
config = yt.default_config.get_config_from_env()
config["proxy"]["url"] = yt_cluster["yt_proxy"]
config["proxy"]["enable_proxy_discovery"] = False
config["create_table_attributes"] = {"compression_codec": "zstd_5"}
config["remote_temp_tables_directory"] = f"{yt_cluster['base_path']}/tmp"
config["token"] = CONFIG["yt_token"]
config["proxy"]["operation_link_pattern"] = (
f"https://yt.vk.team/{cluster_name}/operations/{{id}}/details"
)
ytclient = yt.YtClient(config=config)
task["yt_cluster"] = yt_cluster
while not stop_event.is_set():
try:
self.process(ytclient, config, CONFIG["shards"], yt_cluster["rbhdfspull_yt_path"])
if stop_event.is_set():
break
except Exception as e:
logging.info(f"ERROR!: Error in processing NewLoader: {type(e)}: {e}")
# Handle unrecoverable errors
if 'Some subjects mentioned in ACL are missing' not in str(e):
stop_event.set()
logging.info("sleep 1 before next iteration")
time.sleep(1) # Wait before next iteration
def get_ip_addresses_for_url(url: str, shuffle_ips: bool = True):
"""
Given a URL like http://my-service:8080/path, parse the hostname, resolve it,
and return a list of ip:port addresses. Optionally shuffle them for load balancing.
:param url: The URL to resolve, e.g. 'http://example.com:80'
:param shuffle_ips: If True, randomly shuffle the resolved IPs
:return: A list of strings, each in the form 'ip:port'
"""
parsed_url = urlparse(url)
if not parsed_url.hostname:
logging.info("ERROR!: URL missing a hostname: %s", url)
return []
hostname = parsed_url.hostname
port = parsed_url.port or 80 # default to 80 if port is not specified or None
# Attempt to resolve hostname
try:
# gethostbyname_ex can return multiple IPs (aliases, etc.)
# - official_name is the canonical name of the host.
# - aliaslist is a (possibly empty) list of aliases for the host.
# - ipaddrlist is a list of IPv4 addresses for the host.
official_name, aliaslist, ipaddrlist = socket.gethostbyname_ex(hostname)
except socket.gaierror as e:
logging.info("ERROR!: Failed to resolve hostname %s: %s", hostname, e)
return []
if not ipaddrlist:
logging.info("ERROR!: No IP addresses found for hostname %s", hostname)
return []
# Construct 'ip:port' strings
addresses = [f"{ip}:{port}" for ip in ipaddrlist]
# Shuffle if desired
if shuffle_ips:
random.shuffle(addresses)
logging.info("Resolved %s -> %s", url, addresses)
return addresses
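# Illustrative usage (addresses invented):
# get_ip_addresses_for_url("http://proxy.example:9443") -> ["10.0.0.2:9443", "10.0.0.1:9443"]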
def adjust_loggers():
logger_names = ['yt', 'yt.wrapper', 'yt.packages.requests', 'requests']
for name in logger_names:
logger = logging.getLogger(name)
logger.handlers = []
logger.propagate = True
logger.setLevel(logging.INFO)
# Alternatively, adjust all loggers
for name, logger in logging.Logger.manager.loggerDict.items():
if isinstance(logger, logging.Logger):
logger.handlers = []
logger.propagate = True
logger.setLevel(logging.INFO)
# Call the function to adjust loggers
adjust_loggers()
API_TIMEOUT = 120
API_STAGE = "yt_binlog_loader"
def fetch_api_tasks(shard):
url = f"{CONFIG['statapid_url']}/api/v1/tasks/new"
params = {
"stage": API_STAGE,
"shard": shard,
"limit": 10000,
}
try:
r = requests_session().get(url, params=params, timeout=API_TIMEOUT)
r.raise_for_status()
tasks = r.json()
# Return a list even if the API response is empty or null
return tasks or []
except Exception as e:
logging.info("ERROR!: API fetch failed: %s", e)
return []
def update_api_task_state(
ids: list[int],
status: str = "done",
task_limit: int = 10000, # 10k is max for API
) -> None:
if not ids:
return
session = requests_session()
task_ids = list(set(ids))
logging.info(f"Updating API tasks, set status {status}")
for i in range(0, len(task_ids), task_limit):
batch_ids = task_ids[i: i + task_limit]
for attempt in range(1, 6):
try:
response = session.post(
f"{CONFIG['statapid_url']}/api/v1/update_tasks_statuses",
json={"task_ids": batch_ids},
params={"stage": API_STAGE, "status": status},
timeout=API_TIMEOUT,
)
response.raise_for_status()
break
except Exception as e:
logging.warning(
f"Batch update for tasks {batch_ids} failed (try {attempt}/5): {e}"
)
if attempt == 5:
raise RuntimeError(
f"Could not set '{status}' for tasks: {sorted(batch_ids)}"
)
time.sleep(0.5 * 2 ** (attempt - 1))
logging.info(f"Successfully updated status for {len(task_ids)} tasks.")
def requests_session():
s = requests.Session()
retries = Retry(total=10, backoff_factor=0.5, status_forcelist=[503])
s.mount('http://', HTTPAdapter(max_retries=retries))
s.mount('https://', HTTPAdapter(max_retries=retries))
return s
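# The session retries up to 10 times on HTTP 503 for both http and https mounts, with
# exponential backoff (roughly 0.5s, 1s, 2s, ... between attempts).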
stop_event = threading.Event() # Event to signal threads to stop
CONFIG = {
'logfile': None,
'loglevel': 'DEBUG',
'targets_export_path': None,
'default_target': {},
'targets': [],
'rbprod_etcd_user': 'rbhadoop',
'rbprod_etcd_addr': ["rbzoo%s.i:9443" % x for x in range(1, 6)],
'rbprod_hdfs_pref_ips': None,
'rbprod_hdfs_cert': None,
'ciri_baseurl': None,
'ciri_auth_token': None,
'statsd': None,
}
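# These defaults are overridden in main() via CONFIG.update() with the YAML file at CONF_PATH,
# which must also provide keys used elsewhere but not defaulted here, such as yt_token,
# statapid_url and yt_clusters.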
def process(ytclient, output_table, config, table_path, load_date, task):
logging.info(CONFIG)
dst_datestr = load_date.strftime("%Y-%m-%d")
src_datestr = load_date.strftime(task.get("src_date_format", "%Y%m%d"))
if task.get("src_files_grouped_by_date", True):
filename_regex = task.get("filename_regex", '^[^\.].*\.(zst|gz|lz4|bz2|json)')
else:
filename_regex = task.get("filename_regex", f'^{src_datestr}.*\.(zst|gz|lz4|bz2|json)')
loading_mode = 'current'
BIN_DIR = yt.ypath.ypath_join(task["yt_cluster"]["base_path"], "sys/bin")
CERT_DIR = yt.ypath.ypath_join(task["yt_cluster"]["base_path"], "sys/secure/cert")
output_table_path = yt.ypath.ypath_join(output_table, dst_datestr)
success_attr_path = yt.ypath.ypath_join(output_table_path, '@seen_success')
output_table_path_live = output_table_path + "_live"
if task.get("src_files_grouped_by_date", True):
input_table_path = os.path.join(table_path, src_datestr)
else:
input_table_path = os.path.join(table_path)
if not ytclient.exists(output_table):
logging.info(f"Map node {output_table} is absent, creating...")
ytclient.create("map_node", output_table, recursive=True, force=True)
last_seen_file = False
try:
last_seen_file = ytclient.get(yt.ypath.ypath_join(output_table_path, "@last_seen_file"))
except yt.errors.YtResolveError:
pass
logging.info(f"Process: {output_table_path}, last_seen_file {last_seen_file}")
opacl = [{'subjects': ['adtech_admins', 'o.vinogradov'], 'action': 'allow', 'permissions': ['read', 'manage']}]
schema = (
yt.schema.TableSchema()
.add_column("path", ti.Utf8)
.add_column("type", ti.Utf8)
.add_column("length", ti.Uint64)
.add_column("mtime", ti.Uint64)
)
list_table = ytclient.create_temp_table(
path=config["remote_temp_tables_directory"],
attributes={"schema": schema},
expiration_timeout=60 * 60 * 1000,
)
logging.info(f"list_table: {list_table}")
uploaded_files_schema = (
yt.schema.TableSchema()
.add_column("date", ti.Date)
.add_column("path", ti.Utf8)
)
certkey_name = 'yt-adtech-load'
logging.info(task.get('load_over_s3', False))
if task.get('load_over_s3', False):
src_url = task.get('src_url', "https://running.s3storage.hdfs-misc.clouds.adcld.ru:9443")
protocol, hostname = src_url.split("://")
hdfs_proxy_addrs = ','.join([
f"{protocol}://" + v
for v in get_ip_addresses_for_url(f"http://{hostname}", True)
])
elif task.get('load_over_rbhp-hdfs-proxy', False):
hdfs_proxy_addrs = "https://hdfs-proxy-vip.rbdev.mail.ru:9443"
certkey_name = 'rbhp-hdfs-proxy'
else:
hdfs_proxy_addrs = ','.join(["https://" + v for v in get_rbprodhdfs_proxy_addrs()])
logging.info(hdfs_proxy_addrs)
if task.get("use_parallel_daily_jobs", False):
meta_path = f"meta/{dst_datestr}"
else:
meta_path = f"meta"
output_table_uploaded_files = output_table + f"/{meta_path}/uploaded_files_as_table"
output_table_uploaded_files_commited = output_table_uploaded_files + "_commited"
if not ytclient.exists(os.path.join(output_table, meta_path)):
logging.info("Meta map node is absent, creating...")
ytclient.create(
"map_node",
os.path.join(output_table, meta_path),
recursive=True
)
secure_vault = {
"YT_TOKEN": CONFIG['yt_token'],
}
update_env_script = "update_operation_env.sh"
command = " ".join(
[
f"source {update_env_script} &&",
f"YT_PROXY={task['yt_cluster']['yt_proxy']}",
"YT_ALLOW_HTTP_REQUESTS_TO_YT_FROM_JOB=1",
f"HDFS_PROXY_URLS='{hdfs_proxy_addrs}'",
f"TLS_CRT_PATH='{certkey_name}.crt'",
f"TLS_KEY_PATH='{certkey_name}.key'",
f"./{task['yt_cluster']['rbhdfspull_yt_path'].split('/')[-1]} ls {input_table_path}",
f"--skip-file-list-path {output_table_uploaded_files} ",
f"--success-attr-path {success_attr_path}",
(f"--filename-regex '{filename_regex}' " if filename_regex else ""),
f"--limit {task.get('parallel_files_load', 10000)}",
]
)
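# The assembled command sources the env-update script and runs the rbhdfspull binary in "ls"
# mode against input_table_path; judging by the flags, files already recorded in
# output_table_uploaded_files are skipped, and the resulting listing is written to list_table
# (the operation's output table).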
logging.info(command)
spec_builder = (
yt.spec_builders.VanillaSpecBuilder()
.title("Listing hdfs %s" % input_table_path)
.pool(task["yt_cluster"]["pool"])
.acl(opacl)
.secure_vault(secure_vault)
.max_failed_job_count(3)
.begin_task("hdfs ls")
.command(command)
.file_paths(
[
task['yt_cluster']["rbhdfspull_yt_path"],
yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.crt"),
yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.key"),
yt.ypath.ypath_join(BIN_DIR, f"{update_env_script}"),
]
)
.job_count(1)
.memory_limit(2 * 1024 * 1024 * 1024)
.environment(
{
"YT_ALLOW_HTTP_REQUESTS_TO_YT_FROM_JOB": "1",
"DATATRANSFER_HTTP_PROXY": task["yt_cluster"]["datatransfer_http_proxy"]
if task.get("use_proxy", False) else "",
}
)
.output_format(yt.JsonFormat())
.output_table_paths([list_table]) # PATCHED
.end_task()
)
if not ytclient.exists(output_table_uploaded_files):
ytclient.create("table", output_table_uploaded_files, attributes={"schema": uploaded_files_schema},
recursive=True)
if not ytclient.exists(output_table_uploaded_files_commited):
ytclient.create("table", output_table_uploaded_files_commited, attributes={"schema": uploaded_files_schema},
recursive=True)
logging.info(output_table_uploaded_files)
logging.info(output_table_uploaded_files_commited)
logging.info(output_table_path_live)
if ytclient.exists(output_table_path_live):
ytclient.remove(output_table_path_live)
ytclient.run_operation(spec_builder)
list_count = ytclient.get(yt.ypath.ypath_join(list_table, "@row_count"))
if list_count == 0 and loading_mode != "previous":
logging.info("Nothing to load")
return
with ytclient.Transaction() as t:
ytclient.move(output_table_uploaded_files, output_table_uploaded_files_commited, force=True)
if not ytclient.exists(list_table):
ytclient.create("table", list_table, recursive=True)
if list_count == 0 and loading_mode == 'previous':
ytclient.set(yt.ypath.ypath_join(output_table_path, "@success"), "true")
if ytclient.exists(output_table_uploaded_files_commited):
pass
return
logging.info(list_count)
last_file_in_list_table = ytclient.read_table(f"{list_table}" + "{path}" + f"[#{list_count - 1}:#{list_count}]",
format="json")
last_file_in_list_table = list(last_file_in_list_table)[0]['path'].split('/')[-1]
logging.info(last_file_in_list_table)
logging.info(f"list_count: {list_count}")
data_format, schema, additional_env_params = parse_data_format(task)
output_table = yt.TablePath(output_table_path_live)
output_table.attributes["schema"] = schema
job_count = list_count
if job_count <= 0:
job_count = 1
spec_builder = (
yt.spec_builders.MapSpecBuilder()
.title("Load %s" % output_table_path_live)
.pool(task["yt_cluster"]["pool"])
.acl(opacl)
.pool_trees([task["yt_cluster"]["pool_trees"]])
.input_table_paths([list_table])
.output_table_paths([output_table])
.job_count(job_count)
.begin_mapper()
.memory_limit(2 * 1024 * 1024 * 1024)
.command(
" ".join(
(
f"HDFS_PROXY_URLS='{hdfs_proxy_addrs}'",
f"TLS_CRT_PATH='{certkey_name}.crt'",
f"TLS_KEY_PATH='{certkey_name}.key'",
f"./{task['yt_cluster']['rbhdfspull_yt_path'].split('/')[-1]} {data_format}",
f"--timeout {task.get('timeout_per_one_load', 1800)}",
f"--shard-position {task.get('shard_position', 2)}",
)
)
)
.input_format(yt.JsonFormat())
.output_format(yt.YsonFormat())
.file_paths(
[
task['yt_cluster']["rbhdfspull_yt_path"],
yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.crt"),
yt.ypath.ypath_join(CERT_DIR, f"{certkey_name}.key"),
]
)
.environment(
{
"DATATRANSFER_HTTP_PROXY": task["yt_cluster"]["datatransfer_http_proxy"]
if task.get("use_proxy", False) else "",
**additional_env_params
}
)
.end_mapper().spec({"max_failed_job_count": 10000})
)
ytclient.run_operation(spec_builder)
if not ytclient.exists(output_table_path):
ytclient.move(output_table_path_live, output_table_path)
else:
ytclient.run_merge([output_table_path, output_table_path_live], output_table_path, mode="auto")
ytclient.set(yt.ypath.ypath_join(output_table_path, "@last_seen_file"), last_file_in_list_table)
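# After the load, a fresh '_live' table either becomes the daily table (first load of the day)
# or is merged into it, and @last_seen_file is updated to the newest file from this run's listing.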
def get_rbprodhdfs_proxy_addrs():
upsync_nodes = None
etcd_addr = CONFIG['rbprod_etcd_addr']
random.shuffle(etcd_addr)
for h in etcd_addr:
url = "http://%s:%s@%s/v2/keys/upsync/rbhdfs-proxy/" % (CONFIG['rbprod_etcd_user'],
CONFIG['rbprod_etcd_password'], h)
r = requests_session().get(url, timeout=10)
if r.status_code == 200:
try:
upsync_nodes = json.loads(r.text)['node']['nodes']
break
except Exception as e:
logging.info(f"ERROR!: get_rbprodhdfs_proxy_addrs: {e}")
pass
if not upsync_nodes:
return
nodes = []
other_nodes = []
random.shuffle(upsync_nodes)
for node in upsync_nodes:
ip, port = node['key'].split('/')[3].split(':')
addr = "%s:%s" % (ip, port)
other_nodes.append(addr)
if not nodes:
nodes += other_nodes
if nodes:
logging.info(nodes[0])
return nodes
logging.info("ERROR!: get_rbprodhdfs_proxy_addrs: empty list")
def worker(task):
cluster_name = task.get("cluster", "jupiter")
yt_cluster = CONFIG.get("yt_clusters", {}).get(cluster_name)
if not yt_cluster:
logging.info(f"ERROR!: Cluster config for {cluster_name} not found")
return
config = yt.default_config.get_config_from_env()
config["proxy"]["url"] = yt_cluster["yt_proxy"]
config["proxy"]["enable_proxy_discovery"] = False
config["create_table_attributes"] = {"compression_codec": "zstd_5"}
config["remote_temp_tables_directory"] = f"{yt_cluster['base_path']}/tmp"
config["token"] = CONFIG["yt_token"]
config["proxy"]["operation_link_pattern"] = (
f"https://yt.vk.team/{cluster_name}/operations/{{id}}/details"
)
task["yt_cluster"] = yt_cluster
src = task['src']
dst = task['dst']
table_path = src
output_table = dst
while not stop_event.is_set():
backfill_depth = task.get("backfill_depth", 2)
days_lag = task.get("days_lag", 0)
now = datetime.now() - timedelta(days_lag)
days_to_load = reversed([
now - timedelta(days=i) for i in range(backfill_depth)
])
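# Example: with backfill_depth=2 and days_lag=0 this yields [yesterday, today] (oldest first),
# so earlier partitions are reloaded before the current day.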
threads = []
for day in days_to_load:
name = f"{task['src']}->{task['dst']}::{day.strftime('%Y-%m-%d')}"
client = yt.YtClient(config=config)
t = threading.Thread(target=process,
args=(client, output_table, config, table_path, day, task),
name=name
)
threads.append(t)
process_threads(threads, concurrent=task.get("use_parallel_daily_jobs", False))
def process_threads(threads, concurrent: bool = False):
def wait_result(thread):
try:
thread.join()
except Exception as e:
logging.info(f"ERROR!: Error in process {thread._name}: {e}")
raise
else:
logging.info(f"Successfully finished {thread._name}")
if concurrent:
for t in threads:
logging.info(f"Starting {t._name}")
t.start()
for t in threads:
wait_result(t)
else:
for t in threads:
logging.info(f"Starting {t._name}")
t.start()
wait_result(t)
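# With concurrent=True all per-day threads are started first and then joined; otherwise each
# day is started and waited for in turn, keeping the backfill strictly ordered.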
def signal_handler(sig, frame):
logging.info("Signal received, shutting down.")
stop_event.set()
def main():
global CONF_PATH
signal.signal(signal.SIGTERM, signal_handler)
# Optionally handle SIGINT for Ctrl+C
signal.signal(signal.SIGINT, signal_handler)
if len(sys.argv) > 1:
CONF_PATH = sys.argv[1]
with open(CONF_PATH) as f:
conf = yaml.safe_load(f)
CONFIG.update(conf)
os.environ['YT_TOKEN'] = CONFIG['yt_token']
# Set up logging
setup_logging()
adjust_loggers()
# Get tasks from CONFIG['targets'], each containing 'src' and 'dst'
tasks = CONFIG.get('targets', [])
if not tasks:
logging.info("ERROR!: No tasks found in CONFIG['targets']")
sys.exit(1)
threads = []
if CONFIG.get("use_new_loader", True):
NL = NewLoader()
t = threading.Thread(target=NL.worker, args=(), name="NewLoader")
t.start()
threads.append(t)
for task in tasks:
t = threading.Thread(target=worker, args=(task,), name=f"{task['src']}->{task['dst']}")
t.start()
threads.append(t)
# Wait for threads to complete
try:
for t in threads:
t.join()
except KeyboardInterrupt:
logging.info("KeyboardInterrupt received, stopping threads")
stop_event.set()
for t in threads:
t.join()
except Exception as e:
logging.info(f"ERROR!: Unexpected error: {e}")
raise
if __name__ == "__main__":
main()