pyquery (tool for low-code data conversion with PythonScript in Notepad++)
'''
By Mark J. Olson, June 2023
Simple tool for querying CSVs, JSON, and tabular XML files with Python list comprehensions, etc.
USAGE
1. Download the PythonScript plugin (see REFERENCES 1 below)
   and put it in your Notepad++ installation's plugins folder.
2. Download this file and put it in %AppData%/Notepad++/plugins/config/PythonScript/scripts.
3. Open the PythonScript plugin from the main menu, choose the Configuration option,
   and add this script to PythonScript's menu or toolbar.
4. Add a keyboard shortcut for calling the script.
5. Run the script. A dialog will appear, allowing you to choose a query and an output type.
   You may also be prompted for a regex if you're trying to work with a plain text file.
   Numbers are parsed as floats, some strings may be parsed as booleans, and everything else is text.
   For example, from the PythonScript console: pyq("[row for row in x if row['CITY'].startswith('a')]").
   (More example queries are shown in the comment block below this docstring.)
   The query can include:
   * any Python builtins that are always available, like max, min, and sorted
   * the mean function (takes the average of a list of numbers)
   * functions/classes from math, re, and collections
   * a group_by function (see below)
   * an inner_join function for performing a SQL-style inner join (see below)
   * bykey and byind functions for sorting by a key/index in child iterables
     (e.g., sorted(x, key=bykey('a')) will sort by key 'a' in each child)
6. The result of the query will be displayed in a new buffer.
   For best results, move the query results file to the other view
   so that one view has your tabular file and the other view
   has the query results file.
7. Files are only re-parsed as needed (when a file is modified inside or outside of NPP),
   allowing for good performance while still updating results when changes happen.
8. See STUFF_IN_ALL_CAPS to customize settings.
REFERENCES:
1. https://github.com/bruderstein/PythonScript
   (the best documentation is in the doc folder installed with the plugin)
2. https://docs.python.org/3/ (see especially the csv, xml, and json modules)
3. https://npp-user-manual.org/docs/searching/#regular-expressions
TODO:
1. Add unit tests
2. Improve remembering of queries, regexes, and output formats.
'''
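
# ---------------------------------------------------------------------------
# Example queries (illustrative only; 'CITY' and 'population' are hypothetical
# column names -- substitute the columns of your own file). In the query dialog
# you enter just the expression; from the PythonScript console you can also
# call pyq() directly once this script has been run:
#
#   pyq("[row for row in x if row['CITY'].startswith('a')]")        # filter rows
#   pyq("sorted(x, key=bykey('population'), reverse=True)[:10]")    # top 10 rows by a column
#   pyq("group_by(x, 'CITY', mean, 'population')", 'json')          # aggregate, with JSON output
# ---------------------------------------------------------------------------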
| from Npp import * | |
| import csv | |
| import collections | |
| from datetime import datetime | |
| from io import StringIO | |
| import json | |
| import logging | |
| import math | |
| import os | |
| import re | |
| from xml.etree import ElementTree as ET | |
| ### SETTINGS | |
| DIALOG_ON_RUN = True # open a pop-up dialog to run queries when the script is run (alternative: run from PythonScript console) | |
| VALID_DATA_TYPES = {'csv', 'json', 'jsonl', 'xml', 'regex'} | |
| FILE_EXTENSION_TYPE_MAPS = { # keys are file extensions, values are in VALID_DATA_TYPES | |
| 'json': 'json', | |
| 'xml': 'xml', | |
| 'jsonl': 'jsonl', | |
| 'csv': 'csv', | |
| 'tsv': 'csv', | |
| } | |
| TYPES_TO_TEST = ['json', 'jsonl', 'xml'] # try these data types if file extension not helpful | |
| DEFAULT_OUTPUT_TYPE = 'csv' # must be in VALID_DATA_TYPES | |
| CSV_DEFAULT_OUTPUT_DIALECT = 'excel-tab' # tab-separated; use 'excel' for comma-separated | |
| AUTODETECT_BOOLEANS = True | |
| BOOLEAN_PAIRS = [ # case insensitive | |
| {'F': False, 'T': True}, | |
| {'FALSE': False, 'TRUE': True}, | |
| {'NO': False, 'YES': True}, | |
| {'N': False, 'Y': True}, | |
| ] | |
| BOOLEAN_VALS = {k: pair for pair in BOOLEAN_PAIRS for k in pair} | |
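# BOOLEAN_VALS maps each boolean-ish string to the pair it belongs to,
# e.g. BOOLEAN_VALS['Y'] is {'N': False, 'Y': True}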
| DEFAULT_QUERY = 'x' # select all data | |
| DEFAULT_REGEX = '(?-s)^(.*)$' # select entire line | |
| ### END SETTINGS | |
| logging.basicConfig(format='%(levelname)s@%(asctime)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.DEBUG) | |
| def now(): | |
| return datetime.now().timestamp() | |
def getFileExtension(fname):
    # os.path.splitext ignores dots in directory names and handles names with no extension,
    # so paths like 'C:\some.dir\file' correctly yield an empty extension
    return os.path.splitext(fname)[1].lstrip('.')
| PYQ_DIR = os.path.join(os.path.dirname(__file__), 'pyquery') | |
| num_regex = re.compile(r'[+-]?(?:(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][+-]?\d+)?|nan|inf)') | |
| #### QUERY FUNCTIONS #### | |
| def bykey(key): | |
| return lambda x: x[key] | |
| def byind(ind): | |
| return lambda x: x[ind] | |
| def mean(rows): | |
| return sum(rows)/len(rows) | |
| def inner_join(rows1, rows2, var1, var2=None): | |
| ''' | |
| Standard SQL-style inner join. | |
| var1 and var2 are column names/numbers OR tuples of column names/numbers. | |
| var2 is set to var1 if not specified. | |
    If rows1 and rows2 are lists of lists,
    returns a list of dicts
    where each 0-based column index is mapped to 'col{index}' for rows1
    and 'col{index}_1' for rows2.
    Otherwise, if rows2 has column names that duplicate column names in rows1,
    the duplicate names from rows2 are mangled by appending '_1'.
| EXAMPLE: | |
| >>> rows1 = [{'a': 1, 'b': 'foo', 'd': 5.2}, {'a': 3, 'b': 'bar', 'd': -1}] | |
| >>> rows2 = [{'a': 3, 'b': True, 'c': -2}, {'a': 1, 'b': False, 'c': -1}] | |
| >>> inner_join(rows1, rows2, 'a', 'b') # 1 and True are considered equal | |
| [{'a': 1, 'b': 'foo', 'd': 5.2, 'a_1': 3, 'b_1': True, 'c': -2}] | |
| >>> rows3 = [[1, 'a'], [2, 'b']] | |
| >>> rows4 = [[3, False, 'foo'], [2, True, 'bar'], [2, False, 'baz']] | |
| >>> inner_join(rows3, rows4, 0) | |
| [{'col0': 2, 'col1': 'b', 'col0_1': 2, 'col1_1': True, 'col2_1': 'bar'}, | |
| {'col0': 2, 'col1': 'b', 'col0_1': 2, 'col1_1': False, 'col2_1': 'baz'}] | |
| >>> rows5 = [{'a': 1, 'b_1': 2, 'b': 1}] | |
| >>> rows6 = [{'a': 3, 'b': 1, 'c': -2, 'b_1': False}] | |
| >>> inner_join(rows5, rows6, 'a', 'b') # if k + '_1' is still duplicate, append '_1' until not duplicate | |
| [{'a': 1, 'b_1': 2, 'b': 1, 'a_1': 3, 'b_1_1': 1, 'c': -2, 'b_1_1_1': False}] | |
| ''' | |
| if not (rows1 and rows2): | |
| return [] | |
| if var2 is None: | |
| var2 = var1 | |
| rows1_grouped = {} | |
| varlists = is_list_like(var1) | |
| if varlists ^ is_list_like(var2): | |
| raise ValueError("var1 and var2 must both be single column names/numbers or both be tuples of column names/numbers; cannot have one be single and one be tuple") | |
| if is_list_like(var1): | |
| for row in rows1: | |
| val = tuple(row[subvar1] for subvar1 in var1) | |
| rows1_grouped.setdefault(val, []).append(row) | |
| else: | |
| for row in rows1: | |
| val = row[var1] | |
| rows1_grouped.setdefault(val, []).append(row) | |
| rowtype = type(rows1[0]) | |
| if rowtype == list: | |
| def row_combiner(row1, row2): | |
| out = {f'col{ii}': v for ii, v in enumerate(row1)} | |
| for ii, v in enumerate(row2): | |
| out[f'col{ii}_1'] = v | |
| return out | |
| else: | |
| shared_keys = set(rows1[0]) & set(k for k in rows2[0]) | |
| def row_combiner(row1, row2): | |
| out = {**row1} | |
| for k, v in row2.items(): | |
| if k in shared_keys: | |
| mangled = k + '_1' | |
| while mangled in out: | |
| mangled += '_1' | |
| out[mangled] = v | |
| else: | |
| out[k] = v | |
| return out | |
| final = [] | |
| for row2 in rows2: | |
| if varlists: | |
| val = tuple(row2[v] for v in var2) | |
| else: | |
| val = row2[var2] | |
| rows1_related = rows1_grouped.get(val) | |
| if rows1_related: | |
| for row1 in rows1_related: | |
| final.append(row_combiner(row1, row2)) | |
| return final | |
| def group_by(rows, group_var, func, agg_var = None): | |
| ''' | |
| rows: a list of dicts with str keys | |
| group_var: str, the column name to group by | |
| OR a tuple of strings (group by all in tuple) | |
| func: a function that operates on a list | |
| (must operate on list of dicts if agg_var is None) | |
| OR a list of such functions | |
| agg_var: a column name OR a list of column names. | |
| EXAMPLE: | |
| >>> group_by(rows, 'contaminated', max, 'zone') | |
| { | |
| "TRUE": 2.0, | |
| "FALSE": 4.0 | |
| } | |
| >>> group_by(rows, 'contaminated', [mean, min], 'nums') | |
| { | |
| "TRUE": { | |
| "mean": 1.5, | |
| "min": 1.0 | |
| }, | |
| "FALSE": { | |
| "mean": 2.0, | |
| "min": 1.0 | |
| } | |
| } | |
| >>> group_by(rows, 'names', [sum, min], ['nums', 'zone']) | |
| { | |
| "Bluds": { | |
| "nums": {"min": NaN, "sum": NaN}, | |
| "zone": {"min": 1.0, "sum": 3.0} | |
| }, | |
| "dfsd": { | |
| "nums": {"min": 0.5, "sum": 2.0}, | |
| "zone": {"min": 2.0, "sum": 8.0} | |
| }, | |
| "flodt": { | |
| "nums": {"min": 3.4, "sum": 3.4}, | |
| "zone": {"min": 4.0, "sum": 4.0} | |
| } | |
| } | |
| ''' | |
| row_groups = {} | |
| if is_list_like(group_var): | |
| joining_underscore_count = 0 | |
| for row in rows: | |
| val = tuple(row[subvar] for subvar in group_var) | |
| for subval in val: | |
| if isinstance(subval, str): | |
| underscore_list = tuple(m.end() - m.start() | |
| for m in re.finditer('_+', subval)) | |
| if underscore_list: | |
| consec_underscores = max(underscore_list) | |
| joining_underscore_count = max(joining_underscore_count, consec_underscores) | |
| row_groups.setdefault(val, []).append(row) | |
| tuple_row_groups = row_groups | |
| row_groups = {} | |
| # stringified tuples make bad column names in XML and CSV. | |
| # join column names together with more underscores than appear in any value | |
| joining_underscores = '_' * (joining_underscore_count + 1) | |
| for k, v in tuple_row_groups.items(): | |
| row_groups[joining_underscores.join(k)] = v | |
| else: | |
| for row in rows: | |
| val = row[group_var] | |
| row_groups.setdefault(str(val), []).append(row) | |
| if isinstance(func, list): | |
| new_func = lambda x: {subfunc.__name__ : subfunc(x) | |
| for subfunc in func} | |
| else: | |
| new_func = func | |
| if isinstance(agg_var, list): | |
| return {group_val: | |
| {sub_var: new_func([row[sub_var] for row in rows]) | |
| for sub_var in agg_var} | |
| for group_val, rows in row_groups.items()} | |
| elif isinstance(agg_var, str): | |
| return {group_val: new_func([row[agg_var] for row in rows]) | |
| for group_val, rows in row_groups.items()} | |
| return {group_val: new_func(rows) for group_val, rows in row_groups.items()} | |
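
# A small self-contained group_by example (the data below is hypothetical):
#   >>> sales = [
#   ...     {'region': 'north', 'amount': 10.0},
#   ...     {'region': 'north', 'amount': 30.0},
#   ...     {'region': 'south', 'amount': 5.0},
#   ... ]
#   >>> group_by(sales, 'region', [mean, max], 'amount')
#   {'north': {'mean': 20.0, 'max': 30.0}, 'south': {'mean': 5.0, 'max': 5.0}}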
| #### TABULARIZING FUNCTIONS #### | |
| def is_list_like(x): | |
| return isinstance(x, (list, set, frozenset, tuple)) | |
| def convert_to_list_of_dicts(rows): | |
| if is_list_like(rows): | |
| if not isinstance(rows, list): | |
| rows = list(rows) | |
| if not rows: | |
| return rows | |
| firstrow = rows[0] | |
| if isinstance(firstrow, dict): | |
| return rows | |
| elif not is_list_like(firstrow): | |
| return [{'col1': row} for row in rows] | |
| return [dict(flatten_row(row, [])) for row in rows] | |
| elif isinstance(rows, dict): | |
| if not rows: | |
| return rows | |
| lists, dicts, other_vars = [], [], [] | |
| for k, v in rows.items(): | |
| if is_list_like(v): | |
| lists.append((k, v)) | |
| elif isinstance(v, dict): | |
| dicts.append((k, v)) | |
| else: | |
| other_vars.append((k, v)) | |
| if not lists: | |
| return [dict(flatten_row(rows, []))] | |
| out_rows = [] | |
| max_len = max(len(v) for k, v in lists) | |
| base_row = dict(other_vars) | |
| base_row.update(flatten_row(dict(dicts), [])) | |
| for ii in range(max_len): | |
| new_row = base_row.copy() | |
| for list_var, list_ in lists: | |
| new_row[list_var] = None if ii >= len(list_) else list_[ii] | |
| out_rows.append(dict(flatten_row(new_row, []))) | |
| return out_rows | |
| # rows is a scalar | |
| return [{'col1': rows}] | |
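
# Illustrative behavior of convert_to_list_of_dicts (inputs are hypothetical):
#   a list of lists becomes a list of dicts with 'col{index}' keys:
#     >>> convert_to_list_of_dicts([[1, 'a'], [2, 'b']])
#     [{'col0': 1, 'col1': 'a'}, {'col0': 2, 'col1': 'b'}]
#   a dict containing lists is broadcast into one row per list element:
#     >>> convert_to_list_of_dicts({'name': 'a', 'vals': [1, 2]})
#     [{'name': 'a', 'vals': 1}, {'name': 'a', 'vals': 2}]
#   a scalar becomes a single one-column row:
#     >>> convert_to_list_of_dicts(7)
#     [{'col1': 7}]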
| def flatten_row(row, current_key): | |
| if is_list_like(row): | |
| if not isinstance(row, list): | |
| row = list(row) | |
| for ii, v in enumerate(row): | |
| current_key.append(f'col{ii}') | |
| yield from flatten_row(v, current_key) | |
| current_key.pop() | |
| elif isinstance(row, dict): | |
| for k, v in row.items(): | |
| current_key.append(k) | |
| yield from flatten_row(v, current_key) | |
| current_key.pop() | |
| elif not current_key: | |
| yield '_', row | |
| else: | |
| yield '.'.join(current_key), row | |
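
# flatten_row recursively flattens nested containers into ('dotted.key', value) pairs,
# e.g. (hypothetical input):
#   >>> dict(flatten_row({'a': {'b': 1}, 'c': [2, 3]}, []))
#   {'a.b': 1, 'c.col0': 2, 'c.col1': 3}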
| #### PROCESSING FUNCTIONS #### | |
| def csv_to_json(text): | |
    lines = [x.rstrip() for x in text.splitlines()]  # parse the text passed in, not whatever buffer is active
| sniffer = csv.Sniffer() | |
| dialect = sniffer.sniff(text[:4092]) | |
| reader = csv.DictReader(lines, dialect=dialect) | |
| rows = list(reader) | |
| postprocess_json(rows, AUTODETECT_BOOLEANS) | |
| return rows | |
| def xml_to_json(text): | |
| root = ET.fromstring(text) | |
| js = [{e.tag: e.text for e in child} for child in root] | |
| postprocess_json(js, AUTODETECT_BOOLEANS) | |
| return js | |
| def jsonl_to_json(text): | |
| lines = text.splitlines() | |
| return [json.loads(line) for line in lines] | |
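
# Roughly what the parsers above produce (hypothetical inputs; csv_to_json and
# xml_to_json call postprocess_json internally, so number-like strings come back as floats):
#   >>> xml_to_json('<Table><Row><city>Oslo</city><pop>5</pop></Row></Table>')
#   [{'city': 'Oslo', 'pop': 5.0}]
#   >>> jsonl_to_json('{"city": "Oslo"}\n{"city": "Bergen"}')
#   [{'city': 'Oslo'}, {'city': 'Bergen'}]
# csv_to_json likewise returns a list of dicts keyed by the header row, with the
# delimiter sniffed by csv.Sniffer.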
| def postprocess_json(rows, autodetect_booleans): | |
| ''' | |
| rows must be a list of dicts mapping str to str | |
| Performs the following in-place changes: | |
| 1. Converts string representations of numbers (using '.' as decimal sep) | |
| to the corresponding numbers | |
| 2. If autodetect_booleans, converts columns where all values are in the same | |
| boolean pair to the corresponding boolean value | |
| (e.g., columns of all 'T' and 'F' become True and False) | |
| ''' | |
| bad_keys = set() | |
| boolean_pairs = {} | |
| if autodetect_booleans: | |
| for row in rows: | |
| for k, v in row.items(): | |
| if k not in bad_keys and isinstance(v, str): | |
                    # check whether this value belongs to the same boolean pair
                    # as all previous values in this column
| current_boopair = boolean_pairs.get(k) | |
| upv = v.upper() | |
| boopair = BOOLEAN_VALS.get(upv) | |
| if not boopair or (current_boopair and boopair is not current_boopair): | |
| bad_keys.add(k) | |
| if k in boolean_pairs: | |
| del boolean_pairs[k] | |
| else: | |
| boolean_pairs[k] = boopair | |
| for row in rows: | |
| for k, v in row.items(): | |
| boopair = boolean_pairs.get(k) | |
| if boopair: | |
| row[k] = boopair[v.upper()] | |
| elif isinstance(v, str) and num_regex.fullmatch(v): | |
| row[k] = float(v) | |
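
# e.g. (hypothetical data) postprocess_json modifies rows in place:
#   >>> rows = [{'a': '1.5', 'b': 'T'}, {'a': '2', 'b': 'f'}]
#   >>> postprocess_json(rows, True)
#   >>> rows
#   [{'a': 1.5, 'b': True}, {'a': 2.0, 'b': False}]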
| #### OUTPUT FUNCTIONS #### | |
| def dump_json(js_): | |
| return json.dumps(js_, indent=4) | |
| def dump_jsonl(js_): | |
| rows = convert_to_list_of_dicts(js_) | |
| return '\n'.join(json.dumps(row) for row in rows) | |
| def dump_csv(js_): | |
| rows = convert_to_list_of_dicts(js_) | |
| fieldnames = sorted(set(k for row in rows for k in row), | |
| key=lambda x: x.upper()) | |
| strio = StringIO() | |
| writer = csv.DictWriter(strio, fieldnames, dialect=CSV_DEFAULT_OUTPUT_DIALECT) | |
| writer.writeheader() | |
| for row in rows: | |
| writer.writerow(row) | |
| return strio.getvalue() | |
| def dump_xml(js_): | |
| root = ET.Element('Table') | |
| rows = convert_to_list_of_dicts(js_) | |
| for row in rows: | |
| rowelement = ET.Element('Row') | |
| root.append(rowelement) | |
| for k, v in row.items(): | |
| velement = ET.Element(k) | |
| velement.text = '' if v is None else str(v) | |
| rowelement.append(velement) | |
| tree = ET.ElementTree(root) | |
| ET.indent(tree, ' ' * 4) | |
| strio = StringIO() | |
| tree.write(strio, encoding='unicode', xml_declaration=True, short_empty_elements=False) | |
| return strio.getvalue() | |
| def dump_to_regex_replace(js_, regex, bels): | |
| rows = convert_to_list_of_dicts(js_) | |
| # we want to use Boost regex syntax to parse our file | |
| notepad.new() | |
| newtext = '\n'.join(bels.join(str(v) for v in row.values()) for row in rows) | |
| editor.setText(newtext) | |
| search_regex = bels.join(f"(?'{k}'.*)" for k in rows[0]) | |
| logging.debug('search_regex = %s, regex = %s', search_regex, regex) | |
| editor.rereplace(search_regex, regex) | |
| editor.rereplace(bels, '') | |
| text = editor.getText() | |
| editor.setText('') | |
| notepad.close() | |
| return text | |
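
# When the chosen output format is not one of json/jsonl/csv/xml, it is treated as a
# Boost-style replacement template applied once per row, with each column exposed as a
# named group. For hypothetical columns CITY and pop, a template such as
#   $+{CITY} has population $+{pop}
# should produce one formatted line per row ($+{name} is the Boost/Notepad++ named-group
# reference; this example is illustrative, not taken from the script's docs).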
| #### HELPER CLASSES #### | |
| class TabDef: | |
| def __init__(self, rows, data_type): | |
| self.data_type = data_type | |
| self.refresh(rows) | |
| def query(self, query): | |
| return eval(query, { | |
| 'byind': byind, | |
| 'bykey': bykey, | |
| 'collections': collections, | |
| 'group_by': group_by, | |
| 'math': math, | |
| 'avg': mean, | |
| 'mean': mean, | |
| 'groupby': group_by, | |
| 'group_by': group_by, | |
| 'join': inner_join, | |
| 'inner_join': inner_join, | |
| 're': re, | |
| 'x': self.rows, | |
| }) | |
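
    # Illustrative direct use of TabDef.query (normally it is called via PyQuery.pyq):
    #   TabDef([{'a': 1}, {'a': 2}], 'csv').query('sum(row["a"] for row in x)')  # -> 3
    # Python builtins (len, max, sorted, ...) are available because eval() supplies
    # __builtins__ when it is absent from the globals dict.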
| def refresh(self, rows): | |
| self.rows = rows | |
| self.last_refresh = now() | |
| self.get_column_types() | |
| def get_column_types(self): | |
| if (not self.rows) or (isinstance(self.data_type, str) and self.data_type == 'json'): | |
| # json data need not have columns, can be tree-like | |
| self.column_types = None | |
| return | |
| self.column_types = {} | |
| for row in self.rows: | |
| if is_list_like(row): | |
| for ii, v in enumerate(row): | |
| self.column_types.setdefault(ii, set()).add(type(v)) | |
| elif isinstance(row, dict): | |
| for k, v in row.items(): | |
| self.column_types.setdefault(k, set()).add(type(v)) | |
| def column_types_repr(self): | |
| out = ['{'] | |
| for k, types in self.column_types.items(): | |
| tlist = sorted('None' if t == type(None) else t.__name__ | |
| for t in types) | |
| out.append(f"{repr(k)}: {'|'.join(tlist)}") | |
| out.append(', ') | |
| out[-1] = '}' | |
| return ''.join(out) | |
| class PyQuery: | |
| def __init__(self): | |
| self.tabdefs = {} | |
| self.mtimes = {} | |
| self.id_to_be_renamed = None | |
| self.fname_to_be_renamed = None | |
| self.remembered_queries = {} | |
| self.remembered_regexes = {} | |
| self.remembered_output_formats = {} | |
| self.is_writing_to_pyq_buffer = False | |
| self.current_fname = notepad.getCurrentFilename() | |
| self.query_file_path = os.path.join(PYQ_DIR, 'PyQuery query file.pyq') | |
| self.remembered_queries_fname = 'queries.json' | |
| self.remembered_regexes_fname = 'regexes.json' | |
| self.remembered_output_formats_fname = 'output_formats.json' | |
| def dump_if_not_empty(self, dict_, fname): | |
| if not os.path.exists(PYQ_DIR) or not dict_: | |
| return | |
| abspath = os.path.join(PYQ_DIR, fname) | |
| with open(abspath, 'w') as f: | |
| json.dump(dict_, f) | |
| def on_shutdown(self, notif): | |
| self.dump_if_not_empty(self.remembered_queries, self.remembered_queries_fname) | |
| self.dump_if_not_empty(self.remembered_regexes, self.remembered_regexes_fname) | |
| self.dump_if_not_empty(self.remembered_output_formats, self.remembered_output_formats_fname) | |
| def get_remembered_thing(self, attr, fname, default): | |
        '''attr is the name of a dict attribute
        ('remembered_queries' or 'remembered_regexes' or 'remembered_output_formats'),
        lazily loaded from fname (a JSON file in PYQ_DIR) if empty.
        Returns the thing associated with the current file.
        If nothing is associated with the current file, falls back to the thing
        associated with the current file's extension, and finally to default.
        '''
| if not getattr(self, attr): | |
| abspath = os.path.join(PYQ_DIR, fname) | |
| if os.path.exists(abspath): | |
| with open(abspath) as f: | |
| setattr(self, attr, json.load(f)) | |
| memo = getattr(self, attr) | |
| remembered_for_file = memo.get(self.current_fname) | |
| if remembered_for_file: | |
| self.memorize_thing(remembered_for_file, memo, default) | |
| return remembered_for_file | |
| exts = memo.get('extensions') | |
| if exts: | |
| fname_ext = getFileExtension(self.current_fname) | |
| return exts.get(fname_ext, default) | |
| return default | |
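
    # The remembered-thing files (queries.json, regexes.json, output_formats.json) written by
    # on_shutdown are flat JSON objects shaped roughly like (illustrative paths/values):
    #   {"C:\\data\\cities.csv": "sorted(x, key=bykey('CITY'))",
    #    "extensions": {"csv": "sorted(x, key=bykey('CITY'))"}}
    # i.e. per-file entries plus an 'extensions' fallback keyed by file extension.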
| def get_query(self): | |
| return self.get_remembered_thing('remembered_queries', self.remembered_queries_fname, DEFAULT_QUERY) | |
| def get_regex(self): | |
| return self.get_remembered_thing('remembered_regexes', self.remembered_regexes_fname, DEFAULT_REGEX) | |
| def get_output_format(self): | |
| return self.get_remembered_thing('remembered_output_formats', self.remembered_output_formats_fname, DEFAULT_OUTPUT_TYPE) | |
| def memorize_thing(self, thing, dict_, default): | |
| if thing == default: | |
| return | |
| dict_[self.current_fname] = thing | |
| ext = getFileExtension(self.current_fname) | |
| exts = dict_.setdefault('extensions', {}) | |
| exts[ext] = thing | |
| def memorize_query(self, query): | |
| self.memorize_thing(query, self.remembered_queries, DEFAULT_QUERY) | |
| def memorize_output_format(self, output_format): | |
| self.memorize_thing(output_format, self.remembered_output_formats, DEFAULT_OUTPUT_TYPE) | |
| def memorize_regex(self, regex): | |
| self.memorize_thing(regex, self.remembered_regexes, DEFAULT_REGEX) | |
| def on_bufferactivated(self, notif): | |
| fname = notepad.getCurrentFilename() | |
| if fname == self.query_file_path: | |
| self.is_writing_to_pyq_buffer = True | |
| return | |
| self.is_writing_to_pyq_buffer = False | |
| self.current_fname = notepad.getCurrentFilename() | |
| last_mod_outside_npp = now() | |
| try: # check for modifications outside of Notepad++ | |
| last_mod_outside_npp = os.path.getmtime(self.current_fname) | |
| except FileNotFoundError: | |
| return # an in-memory buffer, can't be modified outside of NPP | |
| last_mod_in_buffer = self.mtimes.get(self.current_fname, 0.) | |
| self.mtimes[self.current_fname] = max(last_mod_in_buffer, last_mod_outside_npp) | |
| logging.info('Opened %s (last modified %s)', self.current_fname, datetime.fromtimestamp(self.mtimes[self.current_fname])) | |
| def on_filebeforerename(self, notif): | |
| self.id_to_be_renamed = notif['bufferID'] | |
| self.fname_to_be_renamed = notepad.getBufferFilename(self.id_to_be_renamed) | |
| def on_filerenamecancel(self, notif): | |
| self.id_to_be_renamed = None | |
| self.fname_to_be_renamed = None | |
| def on_filerenamed(self, notif): | |
| if not self.id_to_be_renamed: # was cancelled | |
| return | |
| fname = notepad.getBufferFilename(self.id_to_be_renamed) | |
| logging.info('current_fname = %s, fname = %s, fname_to_be_renamed = %s, id_to_be_renamed = %s', | |
| self.current_fname, fname, self.fname_to_be_renamed, self.id_to_be_renamed) | |
| if self.fname_to_be_renamed == self.current_fname: | |
| self.current_fname = fname | |
| mtime = self.mtimes.get(self.fname_to_be_renamed) | |
| if mtime: | |
| self.mtimes[fname] = self.mtimes[self.fname_to_be_renamed] | |
| del self.mtimes[self.fname_to_be_renamed] | |
| else: | |
| self.mtimes[fname] = now() | |
| tabdef = self.tabdefs.get(self.fname_to_be_renamed) | |
| if tabdef: | |
| self.tabdefs[fname] = tabdef | |
| del self.tabdefs[self.fname_to_be_renamed] | |
| self.id_to_be_renamed = None | |
| self.fname_to_be_renamed = None | |
| def on_modified(self, notif): | |
| if notif['text'] and not self.is_writing_to_pyq_buffer: | |
| self.mtimes[self.current_fname] = now() | |
| def get_tabdef(self): | |
| tabdef = self.tabdefs.get(self.current_fname) | |
| if tabdef: | |
| mtime = self.mtimes[self.current_fname] | |
| if mtime < tabdef.last_refresh: | |
| logging.info('used cached tabdef for %s', self.current_fname) | |
| return tabdef | |
| else: | |
| tabdef = TabDef(None, None) | |
| notepad.open(self.current_fname) | |
| extension = getFileExtension(self.current_fname) | |
| if tabdef.data_type: | |
| data_type = tabdef.data_type | |
| else: | |
| data_type = FILE_EXTENSION_TYPE_MAPS.get(extension) | |
| text = editor.getText() | |
| if not data_type: | |
            for typ in TYPES_TO_TEST: # don't try csv; the Sniffer finds illogical column separators
| proc = self.get_processor(typ) | |
| try: | |
| rows = proc(text) | |
| logging.info('Successfully parsed as %s in trial loop', typ) | |
| return self.bind_tabdef(tabdef, typ, rows) | |
| except Exception as ex: # not the correct way to parse | |
                    logging.warning('While parsing as %s in trial loop, got error:\n%s', typ, ex)
| pass | |
| return None # couldn't parse | |
| else: | |
| processor = self.get_processor(data_type) | |
| try: | |
| rows = processor(text) | |
| except Exception as ex: | |
| logging.error('While parsing as %s, got error:\n%s', data_type, ex) | |
| return None | |
| logging.info('Successfully re-parsed as remembered data type %s', data_type) | |
| return self.bind_tabdef(tabdef, data_type, rows) | |
| def bind_tabdef(self, tabdef, data_type, rows): | |
| tabdef.data_type = data_type | |
| self.tabdefs[self.current_fname] = tabdef | |
| tabdef.refresh(rows) | |
| return tabdef | |
| def get_delimiting_bels(self): | |
        '''
        we use BEL chars to delimit fields in the intermediate file
        when the output mode is a regex-replace
        EXAMPLE:
        If the query result is [{'col1': 0, 'col2': 'a'}, {'col1': 1, 'col2': 'b'}]
        the intermediate file contains
        0{BEL}a
        1{BEL}b
        where {BEL} is the BEL character.
        But if BEL is in the file's text, we need to use more BELs to delimit.
        E.g., if the query result is [{'col1': 0, 'col2': 'a{BEL}'}, {'col1': 1, 'col2': 'b'}]
        the intermediate file will contain
        0{BEL}{BEL}a{BEL}
        1{BEL}{BEL}b
        '''
| self._consecutive_bel_count = 0 | |
| def on_bel_match(m): | |
| bel_ct = m.end() - m.start() | |
| logging.debug('consecutive_bel_count = %d, bel_ct = %d', self._consecutive_bel_count, bel_ct) | |
| self._consecutive_bel_count = max(self._consecutive_bel_count, bel_ct) | |
| logging.debug('After finds, consecutive_bel_count = %d', self._consecutive_bel_count) | |
| editor.research('\x07+', on_bel_match) | |
| delimiting_bels = '\x07' * (self._consecutive_bel_count + 1) | |
| return delimiting_bels | |
| def write_to_pyq_buffer(self, text): | |
| if not os.path.exists(PYQ_DIR): | |
| os.mkdir(PYQ_DIR) | |
| if not os.path.exists(self.query_file_path): | |
| with open(self.query_file_path, 'w') as f: | |
| f.write('') | |
| notepad.open(self.query_file_path) | |
| editor.setText(text) | |
| def regex_matches_to_json(self, regex): | |
| notepad.open(self.current_fname) | |
| matches = [] | |
| editor.research(regex, lambda m: matches.append(m.groups())) | |
| self.memorize_regex(regex) | |
| rows = convert_to_list_of_dicts(matches) | |
| postprocess_json(rows, AUTODETECT_BOOLEANS) | |
| return rows | |
| def get_processor(self, data_type): | |
| return { | |
| 'csv': csv_to_json, | |
| 'json': json.loads, | |
| 'jsonl': jsonl_to_json, | |
| 'xml': xml_to_json, | |
| }.get(data_type, | |
| lambda text: self.regex_matches_to_json(data_type)) | |
| def dump(self, output_type, data=None): | |
| if data is None: | |
| tabdef = self.get_tabdef() | |
| data = tabdef.rows | |
| dumper = { | |
| 'json': dump_json, | |
| 'jsonl': dump_jsonl, | |
| 'csv': dump_csv, | |
| 'xml': dump_xml, | |
| }.get(output_type, | |
| lambda rows: dump_to_regex_replace(rows, output_type, self.get_delimiting_bels())) | |
| outstr = dumper(data) | |
| self.memorize_output_format(output_type) | |
| self.write_to_pyq_buffer(outstr) | |
| def dialog(self): | |
| tabdef = self.get_tabdef() | |
| dtype = None if not tabdef else tabdef.data_type | |
| if not tabdef or dtype not in VALID_DATA_TYPES: | |
| default_regex = self.get_regex() | |
| regex = notepad.prompt( | |
| f"Couldn't parse current file as any of the types {TYPES_TO_TEST}\nEnter csv to parse as csv, or a regex if you wish to query regex matches", | |
| "Couldn't parse file; enter csv or a regex", | |
| default_regex | |
| ) | |
| if not regex: | |
| return | |
| rows = self.regex_matches_to_json(regex) | |
| tabdef = self.bind_tabdef(TabDef(None, None), regex, rows) | |
| msg = f'Enter Python query on {tabdef.data_type} file. x represents data.' | |
| coltypes = '' if not (tabdef and tabdef.column_types) \ | |
| else f'Col types: {tabdef.column_types_repr()}' | |
| default_query = self.get_query() | |
| query = notepad.prompt(coltypes, msg, default_query) | |
| if not query: | |
| return | |
| output_type = self.get_output_format() | |
| data_type = notepad.prompt('Enter an output format (regex or one of json, jsonl, csv, xml)', 'Enter an output format', output_type) | |
| if not data_type: | |
| return | |
| self.pyq(query, data_type) | |
| def pyq(self, query, output_type=None): | |
| tabdef = self.get_tabdef() | |
| if not tabdef: | |
| self.write_to_pyq_buffer(f'{self.current_fname} could not be parsed as a file of any of the types {VALID_DATA_TYPES}') | |
| return | |
| if not output_type: | |
| output_type = DEFAULT_OUTPUT_TYPE | |
| try: | |
| result = tabdef.query(query) | |
| except Exception as ex: | |
| self.write_to_pyq_buffer(f'Failure while parsing query:\n{ex}') | |
| return | |
| self.memorize_query(query) | |
| self.dump(output_type, result) | |
| if __name__ == '__main__': | |
| try: | |
| INITIALIZED | |
| except NameError: | |
| INITIALIZED = True | |
| PYQ = PyQuery() | |
| pyq = PYQ.pyq | |
| dump = PYQ.dump | |
| notepad.callback(PYQ.on_bufferactivated, [NOTIFICATION.BUFFERACTIVATED]) | |
| notepad.callback(PYQ.on_filebeforerename, [NOTIFICATION.FILEBEFORERENAME]) | |
| notepad.callback(PYQ.on_filerenamecancel, [NOTIFICATION.FILERENAMECANCEL]) | |
| notepad.callback(PYQ.on_filerenamed, [NOTIFICATION.FILERENAMED]) | |
| notepad.callback(PYQ.on_shutdown, [NOTIFICATION.BEFORESHUTDOWN]) | |
| editor.callback(PYQ.on_modified, [SCINTILLANOTIFICATION.MODIFIED]) | |
| if DIALOG_ON_RUN: | |
| PYQ.dialog() |