# Source: GitHub gist goors/cce159ecd9704e89e11f6fc576a44d90 (by @goors, created August 26, 2025 13:37).
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
voyage_from_logabstract_final.py

End-to-end pipeline:
  - Read LOG_ABSTRACT.csv and BUNKER_SUMMARY.csv
  - Segment voyages from the event log
  - Compute energy (MJ), gCO2e, and compliance balance vs. the FuelEU target
  - Determine EU scope from the alpha-2 prefixes of the from/to UNLOCODEs
  - Compute fuel splits (MDO/MGO, HFO, LFO, LNG by system)
  - Group voyages with the same route (from/to/scope) into single rows
  - Compute the FuelEU penalty with a configurable €/tCO2e price
  - Export voyage_template.csv with the desired column order

Usage example:
    python voyage_from_logabstract_final.py \
        --log 20250808_CAROLE-M_LogAbstract.csv \
        --bdn 20250808_CAROLE-M_BunkerSummary.csv \
        --ship-file-name 20250808_CAROLE-M_LogAbstract.csv \
        --cb-price 240
"""
import argparse, os, re, math, sys
from datetime import datetime
from typing import Optional, Dict, Any, Tuple, List
import pandas as pd
from pydantic import ValidationError
from app.contracts.ships.ship_voyage_validation import ShipVoyageValidation
# ----------------------- CONSTANTS -----------------------
# Reference GHG intensity (gCO2e/MJ, per the name); run_pipeline scales it by
# (1 - reduction_for(year)) to obtain each leg's target intensity.
FUELEU_REF_GPMJ = 91.16
# (first applicable year, fractional reduction) steps consumed by reduction_for().
REDUCTIONS = [(2025,0.02),(2030,0.06),(2035,0.145),(2040,0.31),(2045,0.62),(2050,0.80)]
# Default lower calorific value per fuel label, used by lhv_for_fuel() when a
# bunker row carries none. NOTE(review): presumably MJ/kg, since run_pipeline
# computes energy as consumption * 1000 * lcv — confirm the consumption unit.
DEFAULT_LHV = {"HFO":40.4,"LFO":41.2,"VLSFO":41.2,"MDO":42.7,"MGO":42.7,"LNG":48.0,"BLEND":41.2}
# GHG intensity used when a bunker row has no EU_GHG_Intensity or when an
# event's BDN number is not present in the bunker summary.
FALLBACK_GPMJ = 91.2
# Two-letter country prefixes that is_eu_alpha2() treats as EU.
EU_ALPHA2 = {
"AT","BE","BG","HR","CY","CZ","DK","EE","FI","FR","DE","GR","HU",
"IE","IT","LV","LT","LU","MT","NL","PL","PT","RO","SK","SI","ES","SE"
}
# Column order of the per-voyage frame; the grouped output starts with these
# columns and appends a few extra ones (see run_pipeline).
VOYAGE_COLUMNS = [
"operatorName","operatorImo",
"shipName","shipImo",
"startDate","endDate",
"fromPortName","fromPortCode","toPortName","toPortCode",
"scope",
"timeAtSea","anchorHrs","distanceTravelled","cargoOnboard","transportWork",
"mdoMgo","hfo","lfo","lngMainEngine","lngAuxiliaryEngine","lngBoiler","lngIgg",
"biofuel_1","biofuel_2","biofuel_3","biofuel_4","biofuelData",
"totalCO2","totalCO2EUMRV","totalCH4EUMRV","totalN2OEUMRV","totalCO2EUETS","finalEUAs",
"ghgieActual","ghgieTarget","total_gCo2eq","energy_mj","complianceBalance",
"fuelEUPenalty"
]
# ----------------------- HELPERS -----------------------
def f(x, default=0.0) -> float:
    """Parse *x* as a float, falling back to *default* when it is unusable.

    Missing values (``None``, NaN, ``pd.NA``), blank strings and anything
    ``float()`` rejects all yield ``float(default)``.

    Args:
        x: Value to convert (number, numeric string, or a missing marker).
        default: Value returned, as a float, when *x* cannot be converted.

    Returns:
        The parsed float, or ``float(default)``.
    """
    try:
        if pd.isna(x):
            return float(default)
        if isinstance(x, str) and not x.strip():
            return float(default)
        return float(x)
    except (TypeError, ValueError, OverflowError):
        # TypeError/ValueError: unparsable input, or a sequence whose
        # pd.isna() result has no single truth value.
        # OverflowError: an int too large for a float.
        return float(default)
def to_ts(date_str, time_str) -> Optional[datetime]:
    """Combine a date cell and an optional time cell into a ``datetime``.

    Args:
        date_str: Date value; a missing value (``None``/NaN) yields ``None``.
        time_str: Optional time value. Missing or blank values are ignored so
            that a date-only cell still parses.

    Returns:
        The parsed naive ``datetime``, or ``None`` when no supported format
        matches.
    """
    if pd.isna(date_str):
        return None
    stamp = str(date_str).strip()
    if time_str is not None and not pd.isna(time_str):
        clock = str(time_str).strip()
        # A blank time would leave a trailing space that matches no format.
        if clock:
            stamp = f"{stamp} {clock}"
    formats = (
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d %H:%M:%S",
        "%Y/%m/%d %H:%M",
        "%d.%m.%Y %H:%M",
        "%Y-%m-%d",
        "%d/%m/%Y %H:%M",
    )
    for fmt in formats:
        try:
            return datetime.strptime(stamp, fmt)
        except ValueError:
            continue
    return None
def reduction_for(year: int, schedule: Optional[List[Tuple[int, float]]] = None) -> float:
    """Return the fractional GHG-intensity reduction in force for *year*.

    Args:
        year: Calendar year to look up.
        schedule: Optional ``(first_year, reduction)`` steps; defaults to the
            module-level ``REDUCTIONS``. The steps may be in any order.

    Returns:
        The reduction of the latest step whose first year is ``<= year``, or
        ``0.0`` when no step applies yet.
    """
    steps = REDUCTIONS if schedule is None else schedule
    applicable = [(first_year, cut) for first_year, cut in steps if year >= first_year]
    if not applicable:
        return 0.0
    # Pick by year rather than by list position so an unsorted schedule works.
    return max(applicable, key=lambda step: step[0])[1]
def lhv_for_fuel(ftype: Optional[str]) -> float:
    """Return the default lower calorific value for a fuel-type label.

    The label is matched case-insensitively: first as an exact key of
    ``DEFAULT_LHV``, then by substring against known fuel families.

    Args:
        ftype: Fuel-type label. Non-string values (``None``, or NaN from an
            empty CSV cell) are treated as an unknown fuel instead of raising.

    Returns:
        The matching ``DEFAULT_LHV`` value, or ``42.7`` when nothing matches.
    """
    label = ftype.strip().upper() if isinstance(ftype, str) else ""
    if label in DEFAULT_LHV:
        return DEFAULT_LHV[label]
    # Order matters: it mirrors the original precedence of the substring checks.
    families = (
        (("HFO",), "HFO"),
        (("VLSFO", "LSFO", "LFO"), "LFO"),
        (("MGO", "MDO", "DMA", "DMX"), "MGO"),
        (("LNG",), "LNG"),
        (("BLEND",), "BLEND"),
    )
    for needles, key in families:
        if any(needle in label for needle in needles):
            return DEFAULT_LHV[key]
    return 42.7
def extract_ship_name(fname: str) -> str:
    """Extract the ship name from a ``<date>_<SHIP>_<kind>.<ext>`` file name.

    Args:
        fname: File name or path; both ``/`` and ``\\`` separators are accepted.

    Returns:
        The second ``_``-separated token of the extension-less base name, or
        the whole base name when it contains no underscore.
    """
    # Normalise Windows separators so basename() also works on POSIX hosts.
    base = os.path.basename(str(fname).replace("\\", "/"))
    stem = re.sub(r"\.[^.]+$", "", base)
    tokens = stem.split("_")
    return tokens[1] if len(tokens) >= 2 else tokens[0]
def first_non_null_unique(series: pd.Series) -> str:
    """Return the first non-null, non-blank value of *series* as a string.

    Whole-number floats are rendered without the trailing ``.0``: an integer
    column such as the IMO number is upcast to float by pandas as soon as it
    contains a missing cell.

    Args:
        series: Column to inspect.

    Returns:
        The first usable value as text, or ``""`` when there is none. A warning
        is written to stderr when more than one distinct value is present.
    """
    vals: List[str] = []
    for raw in series.dropna().unique():
        if isinstance(raw, float) and raw.is_integer():
            raw = int(raw)
        text = str(raw)
        if text.strip() != "" and text not in vals:
            vals.append(text)
    if not vals:
        return ""
    if len(vals) > 1:
        print(f"[warn] Multiple values found, taking first of: {vals}", file=sys.stderr)
    return vals[0]
def build_bdn_map(bdn_df: pd.DataFrame) -> Dict[str, Dict[str, Any]]:
    """Index the bunker summary by BDN number.

    Args:
        bdn_df: Bunker summary frame. Reads the columns ``BDN_Number``,
            ``Fuel_Type``, ``EU_Lower_Calorific_Value``, ``EU_GHG_Intensity``
            and ``TtW_CO2_CF``; absent columns are treated as missing values.

    Returns:
        ``{bdn_number: {"fuel", "lcv", "ghg", "cf"}}``. ``lcv`` falls back to
        ``lhv_for_fuel(fuel)`` and ``ghg`` to ``FALLBACK_GPMJ`` when the cell
        is missing or unparsable; ``cf`` is passed through untouched. When a
        BDN number occurs more than once, the last row wins.
    """
    out: Dict[str, Dict[str, Any]] = {}
    for _, rec in bdn_df.iterrows():
        raw_no = rec.get("BDN_Number", "")
        # A missing number must not be indexed as the literal key "nan":
        # events with a missing BDN would otherwise resolve to this row.
        if pd.isna(raw_no):
            continue
        bdn = str(raw_no).strip()
        if not bdn:
            continue
        fuel = rec.get("Fuel_Type", "")
        if pd.isna(fuel):
            fuel = ""  # empty CSV cell; keep downstream string handling safe
        out[bdn] = {
            "fuel": fuel,
            "lcv": f(rec.get("EU_Lower_Calorific_Value", None), lhv_for_fuel(fuel)),
            "ghg": f(rec.get("EU_GHG_Intensity", None), FALLBACK_GPMJ),
            "cf": rec.get("TtW_CO2_CF", None),
        }
    return out
def is_eu_alpha2(code: str) -> bool:
    """Tell whether a port code starts with an EU country prefix.

    Args:
        code: Port code (e.g. a UNLOCODE). Missing values, blank strings and
            codes shorter than two characters are never EU.

    Returns:
        ``True`` when the first two characters, upper-cased and with the
        ``UK``/``EL`` aliases mapped to ``GB``/``GR``, are in ``EU_ALPHA2``.
    """
    if code is None or (isinstance(code, float) and math.isnan(code)):
        return False
    text = str(code).strip()
    if len(text) < 2:
        return False
    prefix = text[:2].upper()
    aliases = {"UK": "GB", "EL": "GR"}
    return aliases.get(prefix, prefix) in EU_ALPHA2
def classify_and_add(totals: dict, t: float, fuelType: Optional[str], system: str):
    """Accumulate tonnage into fuel buckets by system.

    Args:
        totals: Mutable bucket totals; must already contain the keys
            ``mdoMgo``, ``hfo``, ``lfo``, ``lngMainEngine``,
            ``lngAuxiliaryEngine`` and ``lngBoiler``.
        t: Quantity to add. Non-numeric, NaN, zero and negative values are
            ignored.
        fuelType: Fuel label, matched case-insensitively by substring.
            Non-string values (``None``/NaN) are treated as unknown.
        system: ``"ME"`` or ``"AE"``; any other value books LNG to the boiler.

    Fuel labels that match no family are silently skipped.
    """
    try:
        tonnes = float(t)
    except (TypeError, ValueError):
        return
    # "not > 0" (rather than "<= 0") also rejects NaN, which would otherwise
    # poison the running totals.
    if not tonnes > 0:
        return
    label = fuelType.upper() if isinstance(fuelType, str) else ""
    if "LNG" in label:
        bucket = {"ME": "lngMainEngine", "AE": "lngAuxiliaryEngine"}.get(system, "lngBoiler")
    elif any(tag in label for tag in ("MGO", "MDO", "DMA", "DMX")):
        bucket = "mdoMgo"
    elif "HFO" in label:
        bucket = "hfo"
    elif any(tag in label for tag in ("VLSFO", "LSFO", "LFO")):
        bucket = "lfo"
    else:
        return
    totals[bucket] += tonnes
# ----------------------- PIPELINE -----------------------
def run_pipeline_via_file(
    log_path: str,
    bdn_path: str,
    ship_file_name: str,
    company: str,
    company_imo: int,
    cb_price: Optional[float] = None,
) -> pd.DataFrame:
    """Load the two CSV inputs from disk and run the pipeline on them.

    Args:
        log_path: Path to the LOG_ABSTRACT CSV.
        bdn_path: Path to the BUNKER_SUMMARY CSV.
        ship_file_name: Original file name, used to derive the ship name.
        company: Operator name written to every output row.
        company_imo: Operator IMO number written to every output row.
        cb_price: Optional penalty price forwarded to ``run_pipeline``.

    Returns:
        The frame produced by ``run_pipeline``.
    """
    events = pd.read_csv(log_path)
    bunkers = pd.read_csv(bdn_path)
    return run_pipeline(
        E=events,
        B=bunkers,
        ship_file_name=ship_file_name,
        company=company,
        company_imo=company_imo,
        cb_price=cb_price,
    )
# Log events that open a sea leg (and a voyage when none is in progress).
_LEG_START_EVENTS = frozenset({"Begin Of Sea Passage", "Departure", "Heave Anchor", "Leaving Canal"})
# Log events that close the currently open sea leg.
_LEG_END_EVENTS = frozenset({"End Of Sea Passage", "Arrival", "Drop Anchor", "Entering Canal"})
# Leg-closing events that also finish the voyage.
_VOYAGE_END_EVENTS = frozenset({"Arrival", "End Of Sea Passage"})
# (consumption column, BDN column) pairs that contribute energy and emissions.
_ENERGY_CONSUMERS = (
    ("ME_Consumption", "ME_Fuel_BDN"),
    ("AE_Consumption", "AE_Fuel_BDN"),
    ("Boiler_Consumption", "Boiler_Fuel_BDN"),
    ("DPP_Consumption", "DPP_Cargo_Pump_Fuel_BDN"),
)
# Event fields kept per report for the fuel-split computation.
# NOTE(review): DPP consumption counts towards energy but is not part of the
# fuel splits — confirm this is intended.
_SNAPSHOT_FIELDS = (
    "ME_Consumption", "AE_Consumption", "Boiler_Consumption",
    "ME_Fuel_BDN", "AE_Fuel_BDN", "Boiler_Fuel_BDN",
)
# Lower calorific value assumed when an event's BDN number is unknown.
_UNKNOWN_BDN_LCV = 42.7
# Columns identifying a route; voyages sharing all of them are merged.
_ROUTE_KEYS = ["shipName", "shipImo", "fromPortCode", "toPortCode", "scope"]
# Per-voyage numeric columns that are summed when voyages are merged.
_ROUTE_SUM_COLUMNS = [
    "timeAtSea", "anchorHrs", "distanceTravelled",
    "mdoMgo", "hfo", "lfo", "lngMainEngine", "lngAuxiliaryEngine", "lngBoiler",
    "total_gCo2eq", "energy_mj", "complianceBalance",
]
# Output columns the pipeline does not compute; emitted as empty (None).
_BLANK_COLUMNS = (
    "cargoOnboard", "transportWork",
    "biofuel_1", "biofuel_2", "biofuel_3", "biofuel_4", "biofuelData",
    "totalCO2", "totalCO2EUMRV", "totalCH4EUMRV", "totalN2OEUMRV", "totalCO2EUETS", "finalEUAs",
)
# Extra empty columns appended after the VOYAGE_COLUMNS and ice/leg columns.
_TRAILING_BLANK_COLUMNS = (
    "waps", "pprop", "pwind", "slipMe", "slipAe", "slipBoiler", "slipIgg",
    "totalGHGEmissionsEUMRV", "totalCO2EUETSWithIceClassReduction",
)


def _find_column(frame: pd.DataFrame, name: str) -> Optional[str]:
    """Return *name* if it is a column of *frame*, else its first case-insensitive match."""
    if name in frame.columns:
        return name
    wanted = name.lower()
    return next((col for col in frame.columns if str(col).lower() == wanted), None)


def _code_at(ev: pd.Series, col: Optional[str]) -> Optional[str]:
    """Read a port code from an event row; missing or blank cells give ``None``."""
    if col is None:
        return None
    value = ev.get(col)
    if value is None or pd.isna(value):
        return None
    return str(value).strip() or None


def _new_voyage(ts, from_code: Optional[str]) -> Dict[str, Any]:
    """Create an empty voyage record starting at *ts*."""
    return {"legs": [], "startTs": ts, "fromCode": from_code, "toCode": None, "endTs": None}


def _close_leg(leg: Dict[str, Any], end_ts) -> Dict[str, Any]:
    """Stamp the leg's end time and derive its target intensity and compliance balance."""
    leg["endTs"] = end_ts
    if leg["energy_mj"] > 0:
        # The reduction step is chosen by the year of the leg's midpoint.
        midpoint = leg["startTs"] + (end_ts - leg["startTs"]) / 2
        target = FUELEU_REF_GPMJ * (1 - reduction_for(midpoint.year))
        actual = leg["w_g"] / leg["energy_mj"]
        leg["comp_g"] = (target - actual) * leg["energy_mj"]
        leg["ghgieTarget"] = target
    else:
        leg["comp_g"] = 0.0
        leg["ghgieTarget"] = ""
    return leg


def _accumulate_event(leg: Dict[str, Any], ev: pd.Series, bdn_map: Dict[str, Dict[str, Any]]) -> None:
    """Add one in-leg report's energy, emissions, distance and fuel snapshot to *leg*."""
    for qty_col, bdn_col in _ENERGY_CONSUMERS:
        tonnes = f(ev.get(qty_col, 0), 0.0)
        if tonnes <= 0:
            continue
        bdn_no = ev.get(bdn_col)
        info = bdn_map.get(str(bdn_no).strip()) if bdn_no is not None else None
        lcv = info["lcv"] if info else _UNKNOWN_BDN_LCV
        intensity = info["ghg"] if info else FALLBACK_GPMJ
        energy = tonnes * 1000.0 * lcv
        leg["energy_mj"] += energy
        leg["w_g"] += energy * intensity
    # One snapshot and one distance increment per report, not per consumer.
    leg["snapshots"].append({field: ev.get(field) for field in _SNAPSHOT_FIELDS})
    leg["distance_nm"] += f(ev.get("Distance", 0), 0.0)


def _segment_voyages(events: pd.DataFrame, bdn_map: Dict[str, Dict[str, Any]],
                     from_col: Optional[str], to_col: Optional[str]) -> List[Dict[str, Any]]:
    """Walk the time-sorted event log and cut it into voyages made of sea legs."""
    voyages: List[Dict[str, Any]] = []
    voyage: Optional[Dict[str, Any]] = None
    leg: Optional[Dict[str, Any]] = None
    for _, ev in events.iterrows():
        ts = ev["_ts"]
        # pd.isna() is required here: NaT is not falsy, so "not ts" lets it through.
        if pd.isna(ts):
            continue
        raw_event = ev["Event"]
        event = "" if pd.isna(raw_event) else str(raw_event).strip()
        if event in _LEG_START_EVENTS:
            # A leg is only ever opened inside a voyage, so voyage is set here.
            if leg is not None:
                voyage["legs"].append(_close_leg(leg, ts))
            from_code = _code_at(ev, from_col)
            if voyage is None:
                voyage = _new_voyage(ts, from_code)
            leg = {"type": "SEA", "startTs": ts, "fromCode": from_code, "toCode": _code_at(ev, to_col),
                   "distance_nm": 0.0, "energy_mj": 0.0, "w_g": 0.0, "snapshots": []}
        elif event in _LEG_END_EVENTS:
            if leg is not None:
                voyage["legs"].append(_close_leg(leg, ts))
                leg = None
            if voyage is None:
                voyage = _new_voyage(ts, _code_at(ev, from_col))
            voyage["endTs"] = ts
            to_code = _code_at(ev, to_col)
            if to_code and not voyage.get("toCode"):
                voyage["toCode"] = to_code
            if event in _VOYAGE_END_EVENTS:
                voyages.append(voyage)
                voyage = None
        elif leg is not None:
            _accumulate_event(leg, ev, bdn_map)
    # Close whatever is still open when the log ends.
    if leg is not None:
        last_ts = events["_ts"].dropna().max()
        _close_leg(leg, last_ts)
        if voyage is None:
            voyage = _new_voyage(leg["startTs"], leg.get("fromCode"))
            voyage["toCode"] = leg.get("toCode")
        voyage["legs"].append(leg)
        # Without an end time the voyage would be discarded when rows are built.
        voyage["endTs"] = last_ts
    if voyage is not None:
        voyages.append(voyage)
    return voyages


def _fuel_splits(legs: List[Dict[str, Any]], bdn_map: Dict[str, Dict[str, Any]]) -> Dict[str, float]:
    """Sum the legs' reported consumption into the fuel/system buckets."""
    totals = {"mdoMgo": 0.0, "hfo": 0.0, "lfo": 0.0,
              "lngMainEngine": 0.0, "lngAuxiliaryEngine": 0.0, "lngBoiler": 0.0}

    def fuel_of(bdn_no) -> Optional[str]:
        fuel = bdn_map.get(str(bdn_no).strip(), {}).get("fuel")
        # Guard against NaN fuel types coming from empty bunker cells.
        return fuel if isinstance(fuel, str) else None

    for leg in legs:
        for snap in leg.get("snapshots", []):
            ae_fuel = fuel_of(snap.get("AE_Fuel_BDN"))
            classify_and_add(totals, f(snap.get("ME_Consumption"), 0.0), fuel_of(snap.get("ME_Fuel_BDN")), "ME")
            classify_and_add(totals, f(snap.get("AE_Consumption"), 0.0), ae_fuel, "AE")
            # A boiler without its own BDN is booked against the AE fuel.
            classify_and_add(totals, f(snap.get("Boiler_Consumption"), 0.0),
                             fuel_of(snap.get("Boiler_Fuel_BDN")) or ae_fuel, "BOILER")
    return totals


def _build_voyage_rows(voyages: List[Dict[str, Any]], bdn_map: Dict[str, Dict[str, Any]],
                       shipName: str, shipImo: str) -> List[Dict[str, Any]]:
    """Turn segmented voyages into output rows, skipping voyages without legs or energy."""
    rows: List[Dict[str, Any]] = []
    for voyage in voyages:
        legs = voyage["legs"]
        if not legs:
            continue
        start_ts = voyage.get("startTs")
        if start_ts is None:
            start_ts = legs[0]["startTs"]
        end_ts = voyage.get("endTs")
        if end_ts is None:
            end_ts = legs[-1]["endTs"]
        if start_ts is None or end_ts is None or pd.isna(start_ts) or pd.isna(end_ts):
            continue
        energy_mj = round(sum(leg.get("energy_mj", 0.0) for leg in legs), 2)
        if not energy_mj > 0:
            continue  # no intensity can be computed without energy
        total_g = round(sum(leg.get("w_g", 0.0) for leg in legs), 2)
        comp_g = round(sum(leg.get("comp_g", 0.0) for leg in legs), 2)

        from_code = (voyage.get("fromCode")
                     or next((leg.get("fromCode") for leg in legs if leg.get("fromCode")), None)
                     or "")
        to_code = (voyage.get("toCode")
                   or next((leg.get("toCode") for leg in reversed(legs) if leg.get("toCode")), None)
                   or "")
        from_eu = is_eu_alpha2(from_code)
        to_eu = is_eu_alpha2(to_code)
        if from_eu and to_eu:
            scope = "EU-EU"
        elif from_eu or to_eu:
            scope = "EU-nonEU"
        else:
            scope = "nonEU-nonEU"

        # Energy-weighted mean of the per-leg targets.
        weighted = [(leg["ghgieTarget"], leg.get("energy_mj", 0.0)) for leg in legs if leg.get("ghgieTarget")]
        weight = sum(w for _, w in weighted)
        ghgie_target = round(sum(t * w for t, w in weighted) / weight, 6) if weight > 0 else ""

        totals = _fuel_splits(legs, bdn_map)
        row = dict.fromkeys(VOYAGE_COLUMNS, "")
        row.update({
            "shipName": shipName,
            "shipImo": shipImo,
            "startDate": start_ts.isoformat(),
            "endDate": end_ts.isoformat(),
            "fromPortName": f"Port - {from_code}",
            "fromPortCode": from_code,
            "toPortName": f"Port - {to_code}",
            "toPortCode": to_code,
            "scope": scope,
            "timeAtSea": (end_ts - start_ts).total_seconds() / 3600,
            "anchorHrs": 0,
            "distanceTravelled": round(sum(leg.get("distance_nm", 0.0) for leg in legs), 2),
            "mdoMgo": round(totals["mdoMgo"], 3),
            "hfo": round(totals["hfo"], 3),
            "lfo": round(totals["lfo"], 3),
            "lngMainEngine": round(totals["lngMainEngine"], 3),
            "lngAuxiliaryEngine": round(totals["lngAuxiliaryEngine"], 3),
            "lngBoiler": round(totals["lngBoiler"], 3),
            "lngIgg": 0,
            "ghgieActual": round(total_g / energy_mj, 6),
            "ghgieTarget": ghgie_target,
            "total_gCo2eq": total_g,
            "energy_mj": energy_mj,
            "complianceBalance": comp_g,
            "fuelEUPenalty": "",  # filled after grouping
        })
        rows.append(row)
    return rows


def _group_routes(df: pd.DataFrame, company: str, company_imo: int,
                  cb_price: Optional[float]) -> pd.DataFrame:
    """Merge voyages that share a route into one row each and build the final frame."""
    if df.empty:
        return df.copy()
    work = df.copy()
    work["_start"] = pd.to_datetime(work["startDate"], errors="coerce")
    work["_end"] = pd.to_datetime(work["endDate"], errors="coerce")
    # Numerator of the energy-weighted target; summed per route below.
    work["_target_x_energy"] = (
        pd.to_numeric(work["ghgieTarget"], errors="coerce").fillna(0) * work["energy_mj"]
    )
    agg = work.groupby(_ROUTE_KEYS, dropna=False).agg(
        startDate_min=("_start", "min"),
        endDate_max=("_end", "max"),
        **{col: (col, "sum") for col in _ROUTE_SUM_COLUMNS + ["_target_x_energy"]}
    ).reset_index()
    has_energy = agg["energy_mj"] > 0
    ghgie_actual = (agg["total_gCo2eq"] / agg["energy_mj"]).where(has_energy)
    ghgie_target = (agg["_target_x_energy"] / agg["energy_mj"]).where(has_energy)

    out = pd.DataFrame("", index=agg.index, columns=VOYAGE_COLUMNS)
    out["operatorName"] = company
    out["operatorImo"] = company_imo
    out["shipName"] = agg["shipName"]
    out["shipImo"] = agg["shipImo"]
    out["startDate"] = agg["startDate_min"].dt.strftime("%Y-%m-%d %H:%M:%S")
    out["endDate"] = agg["endDate_max"].dt.strftime("%Y-%m-%d %H:%M:%S")
    # Everything below comes from the grouped frame; reading the per-voyage
    # frame by row position would pair values with the wrong route.
    out["fromPortName"] = "Port - " + agg["fromPortCode"].astype(str)
    out["fromPortCode"] = agg["fromPortCode"]
    out["toPortName"] = "Port - " + agg["toPortCode"].astype(str)
    out["toPortCode"] = agg["toPortCode"]
    out["scope"] = agg["scope"]
    out["timeAtSea"] = agg["timeAtSea"]
    out["anchorHrs"] = agg["anchorHrs"]
    out["distanceTravelled"] = agg["distanceTravelled"]
    out["iceClass"] = "1C"
    out["legMode"] = "Unknown"
    out["iceCondition"] = "No"
    for col in ("mdoMgo", "hfo", "lfo", "lngMainEngine", "lngAuxiliaryEngine", "lngBoiler"):
        out[col] = agg[col].round(3)
    out["lngIgg"] = 0
    for col in _BLANK_COLUMNS + _TRAILING_BLANK_COLUMNS:
        out[col] = None
    out["ghgieActual"] = pd.to_numeric(ghgie_actual, errors="coerce").round(6)
    out["ghgieTarget"] = pd.to_numeric(ghgie_target, errors="coerce").round(6)
    out["total_gCo2eq"] = agg["total_gCo2eq"].round(2)
    out["energy_mj"] = agg["energy_mj"].round(2)
    out["complianceBalance"] = agg["complianceBalance"].round(2)
    if cb_price is not None:
        # Only a deficit (negative balance, in gCO2e) is penalised; a surplus costs nothing.
        deficit_g = out["complianceBalance"].clip(upper=0).abs()
        out["fuelEUPenalty"] = (deficit_g / 1e6 * float(cb_price)).round(2)
    else:
        out["fuelEUPenalty"] = 0
    return out


def run_pipeline(E, B, ship_file_name: str, company: str, company_imo: int,
                 cb_price: Optional[float] = None) -> pd.DataFrame:
    """Turn a log abstract and a bunker summary into one row per route.

    Steps: timestamp and sort the events, segment them into voyages and sea
    legs, compute energy / emissions / compliance balance per leg, build one
    row per voyage, then merge voyages that share ship, ports and scope.

    Args:
        E: LOG_ABSTRACT frame. Needs ``Date_UTC`` (any letter case) and
            ``Event``; ``Time_UTC``, ``IMO``, ``Voyage_From``, ``Voyage_To``,
            ``Distance`` and the consumption/BDN columns are used when present.
            The frame is not modified.
        B: BUNKER_SUMMARY frame, indexed via ``build_bdn_map``.
        ship_file_name: File name from which the ship name is extracted.
        company: Operator name written to every output row.
        company_imo: Operator IMO number written to every output row.
        cb_price: Price per tonne CO2e applied to a negative compliance
            balance; when ``None`` the penalty column is ``0``.

    Returns:
        The grouped frame: ``VOYAGE_COLUMNS`` first, then ``iceClass``,
        ``legMode``, ``iceCondition`` and further empty columns. When no
        voyage qualifies, an empty frame with just ``VOYAGE_COLUMNS``.

    Raises:
        RuntimeError: If the ``Date_UTC`` or ``Event`` column is missing.
    """
    date_col = _find_column(E, "Date_UTC")
    if date_col is None:
        raise RuntimeError("Missing Date_UTC column in LOG_ABSTRACT")
    if "Event" not in E.columns:
        raise RuntimeError("Missing Event column in LOG_ABSTRACT")
    time_col = _find_column(E, "Time_UTC")

    # Build the timestamps on a copy so the caller's frame is left untouched.
    times = E[time_col] if time_col is not None else [None] * len(E)
    stamps = [to_ts(day, clock) for day, clock in zip(E[date_col], times)]
    events = E.assign(
        _ts=pd.to_datetime(pd.Series(stamps, index=E.index, dtype="object"), errors="coerce")
    )
    events = events.sort_values("_ts", kind="mergesort").reset_index(drop=True)

    bdn_map = build_bdn_map(B)
    imo_col = _find_column(events, "IMO")
    shipImo = first_non_null_unique(events[imo_col]) if imo_col else ""
    shipName = extract_ship_name(ship_file_name)

    voyages = _segment_voyages(
        events, bdn_map,
        from_col=_find_column(events, "Voyage_From"),
        to_col=_find_column(events, "Voyage_To"),
    )
    rows = _build_voyage_rows(voyages, bdn_map, shipName, shipImo)
    df = pd.DataFrame(rows, columns=VOYAGE_COLUMNS)
    return _group_routes(df, company, company_imo, cb_price)
def main():
    """Command-line entry point: run the pipeline, write the CSV, validate rows.

    Writes the result to ``--out`` (default ``voyage_template.csv``), then
    validates every row with ``ShipVoyageValidation`` and prints the valid
    records followed by any validation errors.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--log", required=True, help="Path to LOG_ABSTRACT.csv")
    ap.add_argument("--bdn", required=True, help="Path to BUNKER_SUMMARY.csv")
    ap.add_argument("--ship-file-name", required=True, help="Original ship file name (for shipName extraction)")
    ap.add_argument("--cb-price", type=float, default=None, help="FuelEU price in €/tCO2e")
    # Previously hard-coded values, now overridable; the defaults are unchanged.
    ap.add_argument("--company", default="Better sea", help="Operator name written to every row")
    ap.add_argument("--company-imo", type=int, default=0, help="Operator IMO number written to every row")
    ap.add_argument("--out", default="voyage_template.csv", help="Output CSV path")
    args = ap.parse_args()

    df_out = run_pipeline_via_file(
        args.log,
        args.bdn,
        args.ship_file_name,
        cb_price=args.cb_price,
        company=args.company,
        company_imo=args.company_imo,
    )
    print(df_out)
    df_out.to_csv(args.out, index=False, float_format="%.6f")

    # Validate each row
    validated = []
    errors = []
    for i, row in df_out.iterrows():
        try:
            validated.append(ShipVoyageValidation(**row.to_dict()))
        except ValidationError as e:
            errors.append((i, e.errors()))

    print("✅ Valid rows:")
    for record in validated:
        # Prefer model_dump() where the model provides it; fall back to dict().
        dump = getattr(record, "model_dump", None) or record.dict
        print(dump())
    if errors:
        print("\n❌ Errors:")
        for idx, err in errors:
            print(f"Row {idx}: {err}")


if __name__ == "__main__":
    main()
# (End of gist; the GitHub comment-section footer was not part of the script.)