pyquery (tool for low-code data conversion with PythonScript in Notepad++)
'''
By Mark J. Olson, June 2023
Simple tool for querying CSVs, JSON, and tabular XML files with Python list comprehensions, etc.
USAGE
1. Download PythonScript (see REFERENCES 1 below).
Put it in your Notepad++ installation's plugins folder.
2. Download this file and put it in %AppData%/Notepad++/plugins/config/PythonScript/scripts
(%AppData% already expands to the Roaming folder, so don't add /Roaming again)
3. Go to PythonScript plugin from the main menu, choose the Configuration option,
and add this script to PythonScript's main toolbar.
4. Add a keyboard shortcut for calling the script.
5. Run the script. A dialog will appear, allowing you to choose a query and output type.
You may also be prompted for a regex if you're trying to work with a plain text file.
Numbers are parsed as floats, some strings may be parsed as booleans, and all else stays text.
For example, pyq("[row for row in x if row['CITY'].startswith('a')]").
The query can include:
* any functions that are always available, like max, min, and sorted
* the mean function (takes average of a list of numbers)
* functions/classes from math, re, and collections.
* a group_by function (see below)
* an inner_join function for performing a SQL-style inner join (see below)
* bykey and byind functions for sorting by a key/index in child iterables
(e.g., sorted(x, key=bykey('a')) will sort by key 'a' in each child)
6. The result of the query will be displayed in a new window.
For best results, move the query results file to the other view
so that one view has your tabular file and the other view
has the query results file.
7. Files are only re-parsed as needed (i.e., when a file is modified inside or outside of NPP),
allowing for good performance while still updating results when changes happen.
8. See STUFF_IN_ALL_CAPS to customize settings.
REFERENCES:
1. https://github.com/bruderstein/PythonScript
(the best documentation is in the doc folder once you install it)
2. https://docs.python.org/3/ (see especially the csv, xml, and json modules)
3. https://npp-user-manual.org/docs/searching/#regular-expressions
TODO:
1. Add unit tests
2. Improve remembering of queries, regexes, and output formats.
'''
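# Example calls from the PythonScript console (hedged sketches; the 'CITY' and
# 'POP' column names below are hypothetical):
#   pyq("[row for row in x if row['CITY'].startswith('a')]")  # filter rows
#   pyq("sorted(x, key=bykey('POP'))[:10]", 'json')           # top 10 rows by a column, output as JSON
#   pyq("group_by(x, 'CITY', mean, 'POP')")                   # average POP per CITY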
from Npp import *
import csv
import collections
from datetime import datetime
from io import StringIO
import json
import logging
import math
import os
import re
from xml.etree import ElementTree as ET
### SETTINGS
DIALOG_ON_RUN = True # open a pop-up dialog to run queries when the script is run (alternative: run from PythonScript console)
VALID_DATA_TYPES = {'csv', 'json', 'jsonl', 'xml', 'regex'}
FILE_EXTENSION_TYPE_MAPS = { # keys are file extensions, values are in VALID_DATA_TYPES
'json': 'json',
'xml': 'xml',
'jsonl': 'jsonl',
'csv': 'csv',
'tsv': 'csv',
}
TYPES_TO_TEST = ['json', 'jsonl', 'xml'] # try these data types if file extension not helpful
DEFAULT_OUTPUT_TYPE = 'csv' # must be in VALID_DATA_TYPES
CSV_DEFAULT_OUTPUT_DIALECT = 'excel-tab' # tab-separated; use 'excel' for comma-separated
AUTODETECT_BOOLEANS = True
BOOLEAN_PAIRS = [ # case insensitive
{'F': False, 'T': True},
{'FALSE': False, 'TRUE': True},
{'NO': False, 'YES': True},
{'N': False, 'Y': True},
]
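# map each token (e.g. 'T', 'YES') to the pair dict it belongs to,
# so every value in a column can be checked against one consistent pair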
BOOLEAN_VALS = {k: pair for pair in BOOLEAN_PAIRS for k in pair}
DEFAULT_QUERY = 'x' # select all data
DEFAULT_REGEX = '(?-s)^(.*)$' # select entire line
### END SETTINGS
logging.basicConfig(format='%(levelname)s@%(asctime)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.DEBUG)
def now():
return datetime.now().timestamp()
def getFileExtension(fname):
    # return fname's extension without the leading '.' ('' if there is none);
    # os.path.splitext ignores dots inside directory names, unlike a raw scan for '.'
    ext = os.path.splitext(fname)[1]
    return ext[1:] if ext else ''
PYQ_DIR = os.path.join(os.path.dirname(__file__), 'pyquery')
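# matches optionally signed ints, decimals, and scientific notation, plus lowercase nan/inf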
num_regex = re.compile(r'[+-]?(?:(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][+-]?\d+)?|nan|inf)')
#### QUERY FUNCTIONS ####
def bykey(key):
return lambda x: x[key]
def byind(ind):
return lambda x: x[ind]
def mean(rows):
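    # arithmetic mean of a non-empty sequence; len() means a generator won't work here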
return sum(rows)/len(rows)
def inner_join(rows1, rows2, var1, var2=None):
'''
Standard SQL-style inner join.
var1 and var2 are column names/numbers OR tuples of column names/numbers.
var2 is set to var1 if not specified.
If rows1 and rows2 are lists of lists,
returns a list of dicts
where the 0-based index of each column is mapped to 'col{index}' for rows1
or 'col{index}_1' for rows2.
Otherwise, if rows2 has column names that duplicate column names in rows1,
the duplicate names in rows2 are mangled by appending '_1'.
EXAMPLE:
>>> rows1 = [{'a': 1, 'b': 'foo', 'd': 5.2}, {'a': 3, 'b': 'bar', 'd': -1}]
>>> rows2 = [{'a': 3, 'b': True, 'c': -2}, {'a': 1, 'b': False, 'c': -1}]
>>> inner_join(rows1, rows2, 'a', 'b') # 1 and True are considered equal
[{'a': 1, 'b': 'foo', 'd': 5.2, 'a_1': 3, 'b_1': True, 'c': -2}]
>>> rows3 = [[1, 'a'], [2, 'b']]
>>> rows4 = [[3, False, 'foo'], [2, True, 'bar'], [2, False, 'baz']]
>>> inner_join(rows3, rows4, 0)
[{'col0': 2, 'col1': 'b', 'col0_1': 2, 'col1_1': True, 'col2_1': 'bar'},
{'col0': 2, 'col1': 'b', 'col0_1': 2, 'col1_1': False, 'col2_1': 'baz'}]
>>> rows5 = [{'a': 1, 'b_1': 2, 'b': 1}]
>>> rows6 = [{'a': 3, 'b': 1, 'c': -2, 'b_1': False}]
>>> inner_join(rows5, rows6, 'a', 'b') # if k + '_1' is still duplicate, append '_1' until not duplicate
[{'a': 1, 'b_1': 2, 'b': 1, 'a_1': 3, 'b_1_1': 1, 'c': -2, 'b_1_1_1': False}]
'''
if not (rows1 and rows2):
return []
if var2 is None:
var2 = var1
rows1_grouped = {}
varlists = is_list_like(var1)
if varlists ^ is_list_like(var2):
raise ValueError("var1 and var2 must both be single column names/numbers or both be tuples of column names/numbers; cannot have one be single and one be tuple")
if varlists:
for row in rows1:
val = tuple(row[subvar1] for subvar1 in var1)
rows1_grouped.setdefault(val, []).append(row)
else:
for row in rows1:
val = row[var1]
rows1_grouped.setdefault(val, []).append(row)
rowtype = type(rows1[0])
if rowtype == list:
def row_combiner(row1, row2):
out = {f'col{ii}': v for ii, v in enumerate(row1)}
for ii, v in enumerate(row2):
out[f'col{ii}_1'] = v
return out
else:
shared_keys = set(rows1[0]) & set(rows2[0])
def row_combiner(row1, row2):
out = {**row1}
for k, v in row2.items():
if k in shared_keys:
mangled = k + '_1'
while mangled in out:
mangled += '_1'
out[mangled] = v
else:
out[k] = v
return out
final = []
for row2 in rows2:
if varlists:
val = tuple(row2[v] for v in var2)
else:
val = row2[var2]
rows1_related = rows1_grouped.get(val)
if rows1_related:
for row1 in rows1_related:
final.append(row_combiner(row1, row2))
return final
def group_by(rows, group_var, func, agg_var=None):
'''
rows: a list of dicts with str keys
group_var: str, the column name to group by
OR a tuple of strings (group by all in tuple)
func: a function that operates on a list
(must operate on list of dicts if agg_var is None)
OR a list of such functions
agg_var: a column name OR a list of column names.
EXAMPLE:
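(in the examples below, rows is assumed to be a list of dicts parsed from a
table with 'contaminated', 'zone', 'nums', and 'names' columns)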
>>> group_by(rows, 'contaminated', max, 'zone')
{
"TRUE": 2.0,
"FALSE": 4.0
}
>>> group_by(rows, 'contaminated', [mean, min], 'nums')
{
"TRUE": {
"mean": 1.5,
"min": 1.0
},
"FALSE": {
"mean": 2.0,
"min": 1.0
}
}
>>> group_by(rows, 'names', [sum, min], ['nums', 'zone'])
{
"Bluds": {
"nums": {"min": NaN, "sum": NaN},
"zone": {"min": 1.0, "sum": 3.0}
},
"dfsd": {
"nums": {"min": 0.5, "sum": 2.0},
"zone": {"min": 2.0, "sum": 8.0}
},
"flodt": {
"nums": {"min": 3.4, "sum": 3.4},
"zone": {"min": 4.0, "sum": 4.0}
}
}
'''
row_groups = {}
if is_list_like(group_var):
joining_underscore_count = 0
for row in rows:
val = tuple(row[subvar] for subvar in group_var)
for subval in val:
if isinstance(subval, str):
underscore_list = tuple(m.end() - m.start()
for m in re.finditer('_+', subval))
if underscore_list:
consec_underscores = max(underscore_list)
joining_underscore_count = max(joining_underscore_count, consec_underscores)
row_groups.setdefault(val, []).append(row)
tuple_row_groups = row_groups
row_groups = {}
# stringified tuples make bad column names in XML and CSV.
# join column names together with more underscores than appear in any value
joining_underscores = '_' * (joining_underscore_count + 1)
for k, v in tuple_row_groups.items():
row_groups[joining_underscores.join(k)] = v
else:
for row in rows:
val = row[group_var]
row_groups.setdefault(str(val), []).append(row)
if isinstance(func, list):
new_func = lambda x: {subfunc.__name__ : subfunc(x)
for subfunc in func}
else:
new_func = func
if isinstance(agg_var, list):
return {group_val:
{sub_var: new_func([row[sub_var] for row in rows])
for sub_var in agg_var}
for group_val, rows in row_groups.items()}
elif isinstance(agg_var, str):
return {group_val: new_func([row[agg_var] for row in rows])
for group_val, rows in row_groups.items()}
return {group_val: new_func(rows) for group_val, rows in row_groups.items()}
#### TABULARIZING FUNCTIONS ####
def is_list_like(x):
return isinstance(x, (list, set, frozenset, tuple))
def convert_to_list_of_dicts(rows):
if is_list_like(rows):
if not isinstance(rows, list):
rows = list(rows)
if not rows:
return rows
firstrow = rows[0]
if isinstance(firstrow, dict):
return rows
elif not is_list_like(firstrow):
return [{'col1': row} for row in rows]
return [dict(flatten_row(row, [])) for row in rows]
elif isinstance(rows, dict):
if not rows:
return rows
lists, dicts, other_vars = [], [], []
for k, v in rows.items():
if is_list_like(v):
lists.append((k, v))
elif isinstance(v, dict):
dicts.append((k, v))
else:
other_vars.append((k, v))
if not lists:
return [dict(flatten_row(rows, []))]
out_rows = []
max_len = max(len(v) for k, v in lists)
base_row = dict(other_vars)
base_row.update(flatten_row(dict(dicts), []))
for ii in range(max_len):
new_row = base_row.copy()
for list_var, list_ in lists:
new_row[list_var] = None if ii >= len(list_) else list_[ii]
out_rows.append(dict(flatten_row(new_row, [])))
return out_rows
# rows is a scalar
return [{'col1': rows}]
def flatten_row(row, current_key):
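'''Recursively flatten a nested row into (dotted_key, scalar_value) pairs.
A hedged sketch of the behavior:
>>> dict(flatten_row({'a': {'b': 1}, 'c': [2, 3]}, []))
{'a.b': 1, 'c.col0': 2, 'c.col1': 3}
'''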
if is_list_like(row):
if not isinstance(row, list):
row = list(row)
for ii, v in enumerate(row):
current_key.append(f'col{ii}')
yield from flatten_row(v, current_key)
current_key.pop()
elif isinstance(row, dict):
for k, v in row.items():
current_key.append(k)
yield from flatten_row(v, current_key)
current_key.pop()
elif not current_key:
yield '_', row
else:
yield '.'.join(current_key), row
#### PROCESSING FUNCTIONS ####
def csv_to_json(text):
    # parse the text argument (not editor.getText()), so any text can be converted
    lines = [x.rstrip() for x in text.splitlines()]
sniffer = csv.Sniffer()
dialect = sniffer.sniff(text[:4092])
reader = csv.DictReader(lines, dialect=dialect)
rows = list(reader)
postprocess_json(rows, AUTODETECT_BOOLEANS)
return rows
def xml_to_json(text):
root = ET.fromstring(text)
js = [{e.tag: e.text for e in child} for child in root]
postprocess_json(js, AUTODETECT_BOOLEANS)
return js
def jsonl_to_json(text):
lines = text.splitlines()
return [json.loads(line) for line in lines]
def postprocess_json(rows, autodetect_booleans):
'''
rows must be a list of dicts mapping str to str
Performs the following in-place changes:
1. Converts string representations of numbers (using '.' as decimal sep)
to the corresponding numbers
2. If autodetect_booleans, converts columns where all values are in the same
boolean pair to the corresponding boolean value
(e.g., columns of all 'T' and 'F' become True and False)
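EXAMPLE (a minimal sketch; the data below are hypothetical):
>>> rows = [{'a': '1.5', 'b': 'T'}, {'a': '2', 'b': 'F'}]
>>> postprocess_json(rows, True)
>>> rows
[{'a': 1.5, 'b': True}, {'a': 2.0, 'b': False}]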
'''
bad_keys = set()
boolean_pairs = {}
if autodetect_booleans:
for row in rows:
for k, v in row.items():
if k not in bad_keys and isinstance(v, str):
# check if value is in the same boolean pair
# as all previous values in this row
current_boopair = boolean_pairs.get(k)
upv = v.upper()
boopair = BOOLEAN_VALS.get(upv)
if not boopair or (current_boopair and boopair is not current_boopair):
bad_keys.add(k)
if k in boolean_pairs:
del boolean_pairs[k]
else:
boolean_pairs[k] = boopair
for row in rows:
for k, v in row.items():
boopair = boolean_pairs.get(k)
if boopair and isinstance(v, str): # guard against non-str values (e.g. None from empty XML elements)
    row[k] = boopair[v.upper()]
elif isinstance(v, str) and num_regex.fullmatch(v):
row[k] = float(v)
#### OUTPUT FUNCTIONS ####
def dump_json(js_):
return json.dumps(js_, indent=4)
def dump_jsonl(js_):
rows = convert_to_list_of_dicts(js_)
return '\n'.join(json.dumps(row) for row in rows)
def dump_csv(js_):
rows = convert_to_list_of_dicts(js_)
fieldnames = sorted(set(k for row in rows for k in row),
key=lambda x: x.upper())
strio = StringIO()
writer = csv.DictWriter(strio, fieldnames, dialect=CSV_DEFAULT_OUTPUT_DIALECT)
writer.writeheader()
for row in rows:
writer.writerow(row)
return strio.getvalue()
def dump_xml(js_):
root = ET.Element('Table')
rows = convert_to_list_of_dicts(js_)
for row in rows:
rowelement = ET.Element('Row')
root.append(rowelement)
for k, v in row.items():
velement = ET.Element(k)
velement.text = '' if v is None else str(v)
rowelement.append(velement)
tree = ET.ElementTree(root)
ET.indent(tree, ' ' * 4)
strio = StringIO()
tree.write(strio, encoding='unicode', xml_declaration=True, short_empty_elements=False)
return strio.getvalue()
def dump_to_regex_replace(js_, regex, bels):
rows = convert_to_list_of_dicts(js_)
# we want to use Boost regex syntax to parse our file
notepad.new()
newtext = '\n'.join(bels.join(str(v) for v in row.values()) for row in rows)
editor.setText(newtext)
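# (?'name'...) is Boost's named-group syntax; Notepad++'s search engine uses Boost regex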
search_regex = bels.join(f"(?'{k}'.*)" for k in rows[0])
logging.debug('search_regex = %s, regex = %s', search_regex, regex)
editor.rereplace(search_regex, regex)
editor.rereplace(bels, '')
text = editor.getText()
editor.setText('')
notepad.close()
return text
#### HELPER CLASSES ####
class TabDef:
def __init__(self, rows, data_type):
self.data_type = data_type
self.refresh(rows)
def query(self, query):
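# note: eval() automatically adds __builtins__ to this globals dict,
# so built-ins like max, min, and sorted stay available to queries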
return eval(query, {
'byind': byind,
'bykey': bykey,
'collections': collections,
'group_by': group_by,
'math': math,
'avg': mean,
'mean': mean,
'groupby': group_by,
'join': inner_join,
'inner_join': inner_join,
're': re,
'x': self.rows,
})
def refresh(self, rows):
self.rows = rows
self.last_refresh = now()
self.get_column_types()
def get_column_types(self):
if (not self.rows) or self.data_type == 'json':
# json data need not have columns, can be tree-like
self.column_types = None
return
self.column_types = {}
for row in self.rows:
if is_list_like(row):
for ii, v in enumerate(row):
self.column_types.setdefault(ii, set()).add(type(v))
elif isinstance(row, dict):
for k, v in row.items():
self.column_types.setdefault(k, set()).add(type(v))
def column_types_repr(self):
out = ['{']
for k, types in self.column_types.items():
tlist = sorted('None' if t == type(None) else t.__name__
for t in types)
out.append(f"{repr(k)}: {'|'.join(tlist)}")
out.append(', ')
out[-1] = '}'
return ''.join(out)
class PyQuery:
def __init__(self):
self.tabdefs = {}
self.mtimes = {}
self.id_to_be_renamed = None
self.fname_to_be_renamed = None
self.remembered_queries = {}
self.remembered_regexes = {}
self.remembered_output_formats = {}
self.is_writing_to_pyq_buffer = False
self.current_fname = notepad.getCurrentFilename()
self.query_file_path = os.path.join(PYQ_DIR, 'PyQuery query file.pyq')
self.remembered_queries_fname = 'queries.json'
self.remembered_regexes_fname = 'regexes.json'
self.remembered_output_formats_fname = 'output_formats.json'
def dump_if_not_empty(self, dict_, fname):
if not os.path.exists(PYQ_DIR) or not dict_:
return
abspath = os.path.join(PYQ_DIR, fname)
with open(abspath, 'w') as f:
json.dump(dict_, f)
def on_shutdown(self, notif):
self.dump_if_not_empty(self.remembered_queries, self.remembered_queries_fname)
self.dump_if_not_empty(self.remembered_regexes, self.remembered_regexes_fname)
self.dump_if_not_empty(self.remembered_output_formats, self.remembered_output_formats_fname)
def get_remembered_thing(self, attr, fname, default):
'''attr is the name of a dict attribute
('remembered_queries' or 'remembered_regexes' or 'remembered_output_formats')
Returns the thing associated with the current file.
If it can't find a thing associated with the current file, it falls back to
the thing associated with the current file's extension.
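Per memorize_thing, the persisted JSON has the shape
{'<absolute file path>': <thing>, ..., 'extensions': {'<ext>': <thing>, ...}}.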
'''
if not getattr(self, attr):
abspath = os.path.join(PYQ_DIR, fname)
if os.path.exists(abspath):
with open(abspath) as f:
setattr(self, attr, json.load(f))
memo = getattr(self, attr)
remembered_for_file = memo.get(self.current_fname)
if remembered_for_file:
self.memorize_thing(remembered_for_file, memo, default)
return remembered_for_file
exts = memo.get('extensions')
if exts:
fname_ext = getFileExtension(self.current_fname)
return exts.get(fname_ext, default)
return default
def get_query(self):
return self.get_remembered_thing('remembered_queries', self.remembered_queries_fname, DEFAULT_QUERY)
def get_regex(self):
return self.get_remembered_thing('remembered_regexes', self.remembered_regexes_fname, DEFAULT_REGEX)
def get_output_format(self):
return self.get_remembered_thing('remembered_output_formats', self.remembered_output_formats_fname, DEFAULT_OUTPUT_TYPE)
def memorize_thing(self, thing, dict_, default):
if thing == default:
return
dict_[self.current_fname] = thing
ext = getFileExtension(self.current_fname)
exts = dict_.setdefault('extensions', {})
exts[ext] = thing
def memorize_query(self, query):
self.memorize_thing(query, self.remembered_queries, DEFAULT_QUERY)
def memorize_output_format(self, output_format):
self.memorize_thing(output_format, self.remembered_output_formats, DEFAULT_OUTPUT_TYPE)
def memorize_regex(self, regex):
self.memorize_thing(regex, self.remembered_regexes, DEFAULT_REGEX)
def on_bufferactivated(self, notif):
fname = notepad.getCurrentFilename()
if fname == self.query_file_path:
self.is_writing_to_pyq_buffer = True
return
self.is_writing_to_pyq_buffer = False
self.current_fname = notepad.getCurrentFilename()
last_mod_outside_npp = now()
try: # check for modifications outside of Notepad++
last_mod_outside_npp = os.path.getmtime(self.current_fname)
except FileNotFoundError:
return # an in-memory buffer, can't be modified outside of NPP
last_mod_in_buffer = self.mtimes.get(self.current_fname, 0.)
self.mtimes[self.current_fname] = max(last_mod_in_buffer, last_mod_outside_npp)
logging.info('Opened %s (last modified %s)', self.current_fname, datetime.fromtimestamp(self.mtimes[self.current_fname]))
def on_filebeforerename(self, notif):
self.id_to_be_renamed = notif['bufferID']
self.fname_to_be_renamed = notepad.getBufferFilename(self.id_to_be_renamed)
def on_filerenamecancel(self, notif):
self.id_to_be_renamed = None
self.fname_to_be_renamed = None
def on_filerenamed(self, notif):
if not self.id_to_be_renamed: # was cancelled
return
fname = notepad.getBufferFilename(self.id_to_be_renamed)
logging.info('current_fname = %s, fname = %s, fname_to_be_renamed = %s, id_to_be_renamed = %s',
self.current_fname, fname, self.fname_to_be_renamed, self.id_to_be_renamed)
if self.fname_to_be_renamed == self.current_fname:
self.current_fname = fname
mtime = self.mtimes.get(self.fname_to_be_renamed)
if mtime:
self.mtimes[fname] = self.mtimes[self.fname_to_be_renamed]
del self.mtimes[self.fname_to_be_renamed]
else:
self.mtimes[fname] = now()
tabdef = self.tabdefs.get(self.fname_to_be_renamed)
if tabdef:
self.tabdefs[fname] = tabdef
del self.tabdefs[self.fname_to_be_renamed]
self.id_to_be_renamed = None
self.fname_to_be_renamed = None
def on_modified(self, notif):
if notif['text'] and not self.is_writing_to_pyq_buffer:
self.mtimes[self.current_fname] = now()
def get_tabdef(self):
tabdef = self.tabdefs.get(self.current_fname)
if tabdef:
mtime = self.mtimes[self.current_fname]
if mtime < tabdef.last_refresh:
logging.info('used cached tabdef for %s', self.current_fname)
return tabdef
else:
tabdef = TabDef(None, None)
notepad.open(self.current_fname)
extension = getFileExtension(self.current_fname)
if tabdef.data_type:
data_type = tabdef.data_type
else:
data_type = FILE_EXTENSION_TYPE_MAPS.get(extension)
text = editor.getText()
if not data_type:
for typ in TYPES_TO_TEST: # don't try csv; the sniffer finds illogical column separators
proc = self.get_processor(typ)
try:
rows = proc(text)
logging.info('Successfully parsed as %s in trial loop', typ)
return self.bind_tabdef(tabdef, typ, rows)
except Exception as ex: # not the correct way to parse
    logging.warning('While parsing as %s in trial loop, got error:\n%s', typ, ex)
return None # couldn't parse
else:
processor = self.get_processor(data_type)
try:
rows = processor(text)
except Exception as ex:
logging.error('While parsing as %s, got error:\n%s', data_type, ex)
return None
logging.info('Successfully re-parsed as remembered data type %s', data_type)
return self.bind_tabdef(tabdef, data_type, rows)
def bind_tabdef(self, tabdef, data_type, rows):
tabdef.data_type = data_type
self.tabdefs[self.current_fname] = tabdef
tabdef.refresh(rows)
return tabdef
def get_delimiting_bels(self):
'''
we use BEL chars to delimit fields in the intermediate file
when the output mode is a regex-replace
EXAMPLE:
If the query result is [{'col1': 0, 'col2': 'a'}, {'col1': 1, 'col2': 'b'}]
the intermediate file contains
0{BEL}a
1{BEL}b
where {BEL} is the BEL character.
But if BEL is already in the file's text, we need more BELs to delimit.
E.g., if the query result is [{'col1': 0, 'col2': 'a{BEL}'}, {'col1': 1, 'col2': 'b'}]
the intermediate file will contain
0{BEL}{BEL}a{BEL}
1{BEL}{BEL}b
'''
self._consecutive_bel_count = 0
def on_bel_match(m):
bel_ct = m.end() - m.start()
logging.debug('consecutive_bel_count = %d, bel_ct = %d', self._consecutive_bel_count, bel_ct)
self._consecutive_bel_count = max(self._consecutive_bel_count, bel_ct)
logging.debug('After finds, consecutive_bel_count = %d', self._consecutive_bel_count)
editor.research('\x07+', on_bel_match)
delimiting_bels = '\x07' * (self._consecutive_bel_count + 1)
return delimiting_bels
def write_to_pyq_buffer(self, text):
if not os.path.exists(PYQ_DIR):
os.mkdir(PYQ_DIR)
if not os.path.exists(self.query_file_path):
with open(self.query_file_path, 'w') as f:
f.write('')
notepad.open(self.query_file_path)
editor.setText(text)
def regex_matches_to_json(self, regex):
notepad.open(self.current_fname)
matches = []
editor.research(regex, lambda m: matches.append(m.groups()))
self.memorize_regex(regex)
rows = convert_to_list_of_dicts(matches)
postprocess_json(rows, AUTODETECT_BOOLEANS)
return rows
def get_processor(self, data_type):
return {
'csv': csv_to_json,
'json': json.loads,
'jsonl': jsonl_to_json,
'xml': xml_to_json,
}.get(data_type,
lambda text: self.regex_matches_to_json(data_type))
def dump(self, output_type, data=None):
if data is None:
tabdef = self.get_tabdef()
data = tabdef.rows
dumper = {
'json': dump_json,
'jsonl': dump_jsonl,
'csv': dump_csv,
'xml': dump_xml,
}.get(output_type,
lambda rows: dump_to_regex_replace(rows, output_type, self.get_delimiting_bels()))
outstr = dumper(data)
self.memorize_output_format(output_type)
self.write_to_pyq_buffer(outstr)
def dialog(self):
tabdef = self.get_tabdef()
dtype = None if not tabdef else tabdef.data_type
if not tabdef or dtype not in VALID_DATA_TYPES:
default_regex = self.get_regex()
regex = notepad.prompt(
f"Couldn't parse current file as any of the types {TYPES_TO_TEST}\nEnter csv to parse as csv, or a regex if you wish to query regex matches",
"Couldn't parse file; enter csv or a regex",
default_regex
)
if not regex:
    return
if regex == 'csv': # honor the prompt's offer to parse as csv, rather than treating 'csv' as a regex
    rows = csv_to_json(editor.getText())
else:
    rows = self.regex_matches_to_json(regex)
tabdef = self.bind_tabdef(TabDef(None, None), regex, rows)
msg = f'Enter Python query on {tabdef.data_type} file. x represents data.'
coltypes = '' if not (tabdef and tabdef.column_types) \
else f'Col types: {tabdef.column_types_repr()}'
default_query = self.get_query()
query = notepad.prompt(coltypes, msg, default_query)
if not query:
return
output_type = self.get_output_format()
data_type = notepad.prompt('Enter an output format (regex or one of json, jsonl, csv, xml)', 'Enter an output format', output_type)
if not data_type:
return
self.pyq(query, data_type)
def pyq(self, query, output_type=None):
tabdef = self.get_tabdef()
if not tabdef:
self.write_to_pyq_buffer(f'{self.current_fname} could not be parsed as a file of any of the types {VALID_DATA_TYPES}')
return
if not output_type:
output_type = DEFAULT_OUTPUT_TYPE
try:
result = tabdef.query(query)
except Exception as ex:
self.write_to_pyq_buffer(f'Failure while evaluating query:\n{ex}')
return
self.memorize_query(query)
self.dump(output_type, result)
if __name__ == '__main__':
try:
INITIALIZED
except NameError:
INITIALIZED = True
PYQ = PyQuery()
pyq = PYQ.pyq
dump = PYQ.dump
notepad.callback(PYQ.on_bufferactivated, [NOTIFICATION.BUFFERACTIVATED])
notepad.callback(PYQ.on_filebeforerename, [NOTIFICATION.FILEBEFORERENAME])
notepad.callback(PYQ.on_filerenamecancel, [NOTIFICATION.FILERENAMECANCEL])
notepad.callback(PYQ.on_filerenamed, [NOTIFICATION.FILERENAMED])
notepad.callback(PYQ.on_shutdown, [NOTIFICATION.BEFORESHUTDOWN])
editor.callback(PYQ.on_modified, [SCINTILLANOTIFICATION.MODIFIED])
if DIALOG_ON_RUN:
PYQ.dialog()