Created
August 13, 2020 14:34
-
-
Save stephenpascoe/bd224c3a970d1cbe2f9d62bbd83eaec2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Convert a Dynamics365 input CSV (separated by "|") into an EDI record format. | |
| usage: dynamics_edi_940.py --input <filename> --output <filename> | |
| """ | |
| import re | |
| import datetime as dt | |
| import sys | |
| import argparse | |
# Maximum number of characters kept for each street line; parse_street()
# truncates every "^"-separated part of the Street field to this length.
MAX_FIELD_SIZE = 40

# ----------------------------------------------------------------------------
# EDI Output Templates and constants

# Line-item template: an LX line followed by a W01 line, ending with a
# trailing newline. Not referenced by any function visible in this file;
# make_rows() formats ROW_TMPL instead.
EDI_ROW = '''LX*{RowNum}~
W01*{qty}*EA**PL*{inventTransId}*ND*{itemId}******{InventBatchId}~
'''
# Presumably the column order of the AX2012 export (going by the name).
# Not referenced by any function visible in this file. Note the spelling
# differences from the D365 list: 'customer', 'VATNum' and
# 'ShipCarrierDeliveryContact'.
FIELD_LIST_AX2012 = [
    'CurrencyCode', 'Address', 'City', 'CountryRegionId', 'County', 'customer',
    'DlvDate', 'DlvTerm', 'HeaderBoxText', 'InventLocationId', 'inventTransId',
    'itemId', 'ZipCode', 'LineBoxText', 'Organisation', 'pickingRouteID', 'qty',
    'SalesId', 'SalesPrice', 'ShipCarrierDeliveryContact', 'Street', 'TelePhone',
    'InventBatchId', 'VATNum'
]

# Column names for the D365 input. iter_rows() zips this list positionally
# with the "|"-separated values of each input line, so the order here must
# match the column order of the file.
FIELD_LIST_D365 = [
    'Address', 'Address2', 'City', 'CountryRegionId', 'County', 'CurrencyCode', 'Customer',
    'DlvDate', 'DlvTerm', 'HeaderBoxText', 'InventBatchId', 'InventLocationId', 'inventTransId',
    'itemId', 'LineBoxText', 'Name2', 'Organisation', 'pickingRouteID', 'qty', 'SalesId',
    'SalesPrice', 'ServiceLevel', 'ShippingCarrierDeliveryContract', 'Street', 'Street2', 'TelePhone',
    'TransportService', 'TransportServiceCode', 'VatNum', 'ZipCode'
]
# Document header: ISA/GS/ST envelope lines followed by the W05, N1, N3, N4
# and G62 lines. make_header() fills the placeholders from the first input
# row plus three timestamp fields derived from the current time.
# NOTE(review): the ISA line contains space-padded elements; X12 ISA elements
# are normally fixed-width, so confirm the padding below is exactly what the
# receiving system expects.
HEADER_TMPL = """ISA*00* *00* *ZZ*OXFORDNANOPORE *ZZ*DLSS01 *{fileProcessYYMMDD}*{fileProcessHHMM}*U*00401*{interchangeControlNumber}*0*P*`~
GS*OW*OXFORDNANOPORE*DLSS01*{fileProcessYYYYMMDD}*{fileProcessHHMM}*{interchangeControlNumber}*X*004010~
ST*940*0001~
W05*N*{pickingRouteID}*{SalesId}***30~
N1*ST*{Organisation}*91*ONTSHIPTO~
N3*{Street}~
N4*{City}*{County}*{ZipCode}*{CountryRegionId}~
G62*04*{DlvDate}~"""

# NTE line template. Not formatted by any function visible in this file.
# Its placeholder matches the FIELD_LIST_AX2012 name
# 'ShipCarrierDeliveryContact'; FIELD_LIST_D365 spells the column
# 'ShippingCarrierDeliveryContract', so rows read with the D365 list would
# not supply this key.
NTE_TMPL = "NTE*WHI*{ShipCarrierDeliveryContact}~"

# Closing SE/GE/IEA lines; filled in by make_footer().
FOOTER_TMPL = """SE*{numIncludedSegments}*0001~
GE*1*{interchangeControlNumber}~
IEA*1*{interchangeControlNumber}~"""

# One line item (LX + W01 lines, no trailing newline); make_rows() formats
# this once per input row.
ROW_TMPL = """LX*{RowNum}~
W01*{qty}*EA**PL*{inventTransId}*ND*{itemId}******{InventBatchId}~"""
def main():
    """Command-line entry point: convert a "|"-separated D365 CSV to EDI.

    Parses ``--input``/``--output`` from the command line, reads and cleans
    every row of the input file with ``read_csv`` and writes the resulting
    document to the output file with ``write_edi``.

    Exits with an argparse usage error (status 2) when an option is missing
    or when the input file contains no rows.
    """
    parser = argparse.ArgumentParser(description="Convert D365 CSV to EDI")
    # Both paths are mandatory. Without required=True a missing option
    # reaches open() as None and fails with an unhelpful TypeError.
    parser.add_argument('--input', '-i', required=True, help='Input filename')
    parser.add_argument('--output', '-o', required=True, help='Output filename')
    args = parser.parse_args()

    with open(args.input) as fh:
        rows = read_csv(fh)

    # write_edi() builds the header from the first row, so bail out before
    # the output file is created/truncated when there is nothing to write.
    if not rows:
        parser.error('no rows found in {}'.format(args.input))

    with open(args.output, 'w') as fh:
        write_edi(rows, fh)
| # ---------------------------------------------------------------------------- | |
| # Input functions | |
def read_csv(fh):
    """Read all input lines from *fh* and return the cleaned rows as a list.

    Every row dict yielded by ``iter_rows`` has each value normalised with
    ``clean`` and gains two extra keys:

    * ``interchangeControlNumber`` -- derived from the cleaned
      ``pickingRouteID`` via ``pr_to_icn``.
    * ``RowNum`` -- the 1-based position of the row in the input.
    """
    cleaned_rows = []
    for position, raw in enumerate(iter_rows(fh), start=1):
        record = {}
        for field in raw:
            record[field] = clean(field, raw[field])
        record['interchangeControlNumber'] = pr_to_icn(record['pickingRouteID'])
        record['RowNum'] = position
        cleaned_rows.append(record)
    return cleaned_rows
def clean(key, value):
    """Normalise one raw CSV field.

    Args:
        key: Column name the value belongs to.
        value: Raw text of the field (may carry surrounding whitespace or the
            line's trailing newline).

    Returns:
        * ``qty`` -> int, via ``parse_qty``.
        * ``Street`` -> "*"-joined street lines, via ``parse_street``.
        * anything else -> ``int`` if the stripped text parses as one, else
          ``float`` if it parses as one, else the stripped text.  Text with
          a leading zero followed by another digit (e.g. zip code "02139")
          is kept as text so the zeros are not lost when the value is later
          formatted into the EDI templates.
        * an empty value -> the (empty) stripped text.

    Raises:
        ValueError: propagated from ``parse_qty`` for an unparseable quantity.
    """
    if not value:
        return value.strip()
    if key == 'qty':
        return parse_qty(value)
    if key == 'Street':
        return parse_street(value)

    text = value.strip()

    # Identifier-like values such as "02139" or "007" would silently lose
    # their leading zeros through int()/float(); keep them verbatim.
    unsigned = text.lstrip('+-')
    if len(unsigned) > 1 and unsigned[0] == '0' and unsigned[1].isdigit():
        return text

    try:
        return int(text)
    except ValueError:
        pass
    try:
        return float(text)
    except ValueError:
        return text
def iter_rows(fh, fields=None):
    """Yield one dict per non-blank input line, keyed by column name.

    Args:
        fh: Iterable of text lines (typically an open file).
        fields: Column names, in file order. Defaults to ``FIELD_LIST_D365``.

    Yields:
        dict mapping each column name to the raw (unstripped) text of the
        corresponding "|"-separated value. Names and values are paired
        positionally with ``zip``, so surplus names or surplus values are
        dropped silently.

    Blank or whitespace-only lines (e.g. a trailing empty line at the end of
    the file) are skipped; they would otherwise produce a row without a
    ``pickingRouteID`` and make ``read_csv`` fail with ``KeyError``.
    """
    if fields is None:
        fields = FIELD_LIST_D365
    for line in fh:
        if not line.strip():
            continue
        yield dict(zip(fields, line.split('|')))
def pr_to_icn(pr):
    """Derive the interchange control number from a picking route id.

    The id must start with ``PR`` or ``OXUS``; the first run of digits after
    that prefix is taken as the number. The result is a string made of a
    one-digit source prefix (1 for ``PR``, 2 for ``OXUS``) followed by the
    number zero-padded to at least 8 digits.

    Args:
        pr: Picking route id as text, e.g. ``'PR000123'``.

    Returns:
        The control number as a string, e.g. ``'100000123'``.

    Raises:
        ValueError: if *pr* is not a string (``clean`` turns purely numeric
            ids into numbers) or does not match the expected pattern.
    """
    if not isinstance(pr, str):
        raise ValueError('Unexpected value of pickingRouteID: {}'.format(repr(pr)))

    mo = re.match(r'(PR|OXUS).*?(\d+)', pr)
    if not mo:
        raise ValueError('Cannot parse {}'.format(pr))

    # The pattern only ever captures 'PR' or 'OXUS', so a plain lookup is
    # enough; no fallback branch is reachable.
    prefix = {'PR': 1, 'OXUS': 2}[mo.group(1)]
    # NOTE(review): numbers longer than 8 digits yield a result longer than
    # 9 characters -- confirm whether that can occur in practice.
    return '{}{:08d}'.format(prefix, int(mo.group(2)))
def parse_qty(qty):
    """Return the leading whole number of *qty* as an int.

    Leading whitespace is ignored and anything after the first run of digits
    (such as a decimal part) is discarded.

    Raises:
        ValueError: if *qty* does not start with optional whitespace
            followed by at least one digit.
    """
    found = re.match(r'\s*(\d+)', qty)
    if found is not None:
        return int(found.group(1))
    raise ValueError('Cannot parse "{}"'.format(qty))
def parse_street(street):
    """Convert a raw Street value into "*"-separated street lines.

    Double quotes are removed, the text is split on "^", each part is
    stripped of surrounding whitespace and truncated to ``MAX_FIELD_SIZE``
    characters, and the parts are joined with "*".
    """
    unquoted = street.replace('"', '')
    clipped = []
    for part in unquoted.split('^'):
        clipped.append(part.strip()[:MAX_FIELD_SIZE])
    return '*'.join(clipped)
| # ---------------------------------------------------------------------------- | |
| # Output functions | |
def write_edi(in_rows, fh):
    """Write the complete EDI document for *in_rows* to the file *fh*.

    The header and the interchange control number are taken from the first
    row; every row contributes its line-item lines. The document is written
    as newline-joined lines followed by a final newline.

    Raises:
        IndexError: if *in_rows* is empty.
    """
    first = in_rows[0]
    header_lines = make_header(first)
    item_lines = make_rows(in_rows)
    # Segment count passed to the footer's SE line: header lines plus item
    # lines, minus one. Presumably this discounts the two envelope lines
    # (ISA, GS) and counts the SE line itself -- confirm against the spec.
    segment_count = len(header_lines) + len(item_lines) - 1
    footer_lines = make_footer(segment_count, first['interchangeControlNumber'])
    document = header_lines + item_lines + footer_lines
    print('\n'.join(document), file=fh)
def make_header(header):
    """Render ``HEADER_TMPL`` for one row and return it as a list of lines.

    Args:
        header: Mapping that supplies the template's row placeholders
            (``write_edi`` passes the first cleaned input row).

    The three ``fileProcess*`` placeholders are filled from the current
    local time.
    """
    stamp = dt.datetime.now()
    timestamps = {
        'fileProcessYYYYMMDD': stamp.strftime('%Y%m%d'),
        'fileProcessHHMM': stamp.strftime('%H%M'),
        'fileProcessYYMMDD': stamp.strftime('%y%m%d'),
    }
    rendered = HEADER_TMPL.format(**timestamps, **header)
    return rendered.split('\n')
def make_rows(rows):
    """Render ``ROW_TMPL`` for every row and return all lines as one flat list."""
    return [
        line
        for row in rows
        for line in ROW_TMPL.format(**row).split('\n')
    ]
def make_footer(num_segments, interchange_control_number):
    """Render ``FOOTER_TMPL`` and return it as a list of lines.

    Args:
        num_segments: Value for the ``numIncludedSegments`` placeholder.
        interchange_control_number: Value for the
            ``interchangeControlNumber`` placeholders.
    """
    values = {
        'numIncludedSegments': num_segments,
        'interchangeControlNumber': interchange_control_number,
    }
    footer_text = FOOTER_TMPL.format(**values)
    return footer_text.split('\n')
# Run the converter only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment