```python
import sys
import csv
import json


# Converts the JSON output of a PowerBI query to a CSV file
def extract(input_file, output_file):
    input_json = read_json(input_file)
    data = input_json["results"][0]["result"]["data"]
    dm0 = data["dsr"]["DS"][0]["PH"][0]["DM0"]
    # The first row carries the column descriptors under "S"
    columns_types = dm0[0]["S"]
    # Column names come from the query descriptor
    columns = [
        item["GroupKeys"][0]["Source"]["Property"] if item["Kind"] == 1 else item["Value"]
        for item in data["descriptor"]["Select"]
    ]
    # Dictionaries used to compress repeated string values
    value_dicts = data["dsr"]["DS"][0].get("ValueDicts", {})
    reconstruct_arrays(columns_types, dm0)
    expand_values(columns_types, dm0, value_dicts)
    replace_newlines_with(dm0, "")
    write_csv(output_file, columns, dm0)


def read_json(file_name):
    with open(file_name) as json_file:
        return json.load(json_file)


def write_csv(output_file, columns, dm0):
    with open(output_file, "w", newline="") as csvfile:
        wrt = csv.writer(csvfile)
        wrt.writerow(columns)
        for item in dm0:
            wrt.writerow(item["C"])


def reconstruct_arrays(columns_types, dm0):
    # Restores the full value array of each row by applying the
    # "R" bitset (copy the value from the previous row) and the
    # "Ø" bitset (the value is null)
    length = len(columns_types)
    prev_item = []
    for item in dm0:
        current_item = item["C"]
        if "R" in item or "Ø" in item:
            copy_bitset = item.get("R", 0)
            delete_bitset = item.get("Ø", 0)
            for i in range(length):
                if is_bit_set_for_index(i, copy_bitset):
                    current_item.insert(i, prev_item[i])
                elif is_bit_set_for_index(i, delete_bitset):
                    current_item.insert(i, None)
        prev_item = current_item


def is_bit_set_for_index(index, bitset):
    return (bitset >> index) & 1 == 1


# Substitutes dictionary indexes with their actual values
def expand_values(columns_types, dm0, value_dicts):
    for idx, col in enumerate(columns_types):
        if "DN" in col:
            for item in dm0:
                data_item = item["C"]
                if isinstance(data_item[idx], int):
                    value_dict = value_dicts[col["DN"]]
                    data_item[idx] = value_dict[data_item[idx]]


def replace_newlines_with(dm0, replacement):
    for item in dm0:
        elem = item["C"]
        for i in range(len(elem)):
            if isinstance(elem[i], str):
                elem[i] = elem[i].replace("\n", replacement)


def main():
    if len(sys.argv) == 3:
        extract(sys.argv[1], sys.argv[2])
    else:
        # sys.exit() already prints a string argument to stderr
        sys.exit("Usage: python3 " + sys.argv[0] + " input_file output_file")


if __name__ == "__main__":
    main()
```
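To see what the script expects, here is a minimal, fabricated input that exercises the three mechanisms it handles: the column descriptors under `descriptor.Select`, the `R`/`Ø` row-compression bitsets, and the `ValueDicts` string dictionaries. Every field value is invented for illustration, and the import assumes the script above was saved as `powerbi_to_csv.py` (the file name is arbitrary):

```python
# Fabricated sample for illustration only: real PowerBI responses
# carry many more fields. Assumes the script above was saved as
# powerbi_to_csv.py (the file name is arbitrary).
import json
from powerbi_to_csv import extract

sample = {
    "results": [{"result": {"data": {
        "descriptor": {"Select": [
            {"Kind": 1, "GroupKeys": [{"Source": {"Property": "Country"}}]},
            {"Kind": 2, "Value": "Sales"},
        ]},
        "dsr": {"DS": [{
            "PH": [{"DM0": [
                # Full row; "S" holds the column descriptors and
                # "DN": "D0" links column 0 to ValueDicts["D0"]
                {"S": [{"DN": "D0"}, {}], "C": [0, 100]},
                # "R" = 0b01: copy column 0 from the previous row
                {"C": [250], "R": 1},
                # "Ø" = 0b10: column 1 is null
                {"C": [1], "Ø": 2},
            ]}],
            "ValueDicts": {"D0": ["Italy", "Spain"]},
        }]},
    }}}]
}

with open("sample_input.json", "w") as f:
    json.dump(sample, f)

extract("sample_input.json", "sample_output.csv")
# sample_output.csv now contains:
#   Country,Sales
#   Italy,100
#   Italy,250
#   Spain,
```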
Thanks!
I'm glad it was useful for you too.
I'm struggling to understand the use case - how would I produce an input_file?
The content of input_file is the JSON response of an HTTP query to PowerBI; you can grab it from the Network tab of the browser's dev tools or with an external tool like cURL or Postman.
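If it helps, here is a rough sketch of fetching it programmatically with Python's `requests`. Both the URL and the request body below are placeholders, not a documented public API: copy the real ones from the request shown in the Network tab.

```python
# Sketch only: URL and payload are placeholders copied from the
# browser's Network tab, not a documented public API.
import json
import requests

url = "https://wabi-<region>.analysis.windows.net/public/reports/querydata?synchronous=true"
with open("query.json") as f:   # request body copied from dev tools
    payload = json.load(f)

resp = requests.post(url, json=payload)
resp.raise_for_status()
with open("input_file.json", "w") as f:
    f.write(resp.text)
```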
got it - thanks!
What would happen if you have both a DM0 and a DM1? I don't understand much of this DSR language tbh
The script doesn't look at DM1, so the result is no different from having DM0 only.
Alright, so for anyone reading this in the future: DM, I believe, is just the result of a query. In my scenario I had a DM0 which held high-level aggregates and a DM1 which contained the detailed information I was interested in. I was able to point the script at DM1 and it worked flawlessly.
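For anyone who wants to do the same, a small sketch of the change in `extract()`, assuming the detailed rows live under a "DM1" key somewhere in the "PH" list and that their first row also carries the "S" column descriptors like DM0's does:

```python
# Hypothetical tweak to extract(): select DM1 instead of DM0.
# Searching the PH list is safer than hard-coding an index.
ph = data["dsr"]["DS"][0]["PH"]
dm0 = next(p["DM1"] for p in ph if "DM1" in p)
```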
Thanks @svavassori for spending time to understand this strange data format!
Thanks for this script. It was hard to understand the JSON structure but your script made it super easy.