Created
May 4, 2024 05:46
-
-
Save sunflower2333/5d3013e2a81673a9d99e8cbf12d7a753 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Global settings
DoubleIC = False  # let the C generator also emit commands for the second device
GenSpb = True  # write the SPB script output file
GenC = False  # write the C source output file
GEN_COMMENT = False  # add "returned value" comment lines to the SPB script
FirstReg = 0x25  # compared with an action's i2c_reg to select device A
SecondReg = 0x4d  # compared with an action's i2c_reg to select device B
TraceFileName = "tracefile.txt"
OutputFileNameC = "spb.c"
OutputFileNameSpbScript = "spb.txt"

# SPB script mode takes precedence: it switches off C generation and the
# second-device handling in one go.
if GenSpb:
    GenC = DoubleIC = False
class I2cAction:
    """One parsed line of an I2C trace.

    The constructor receives the whitespace-separated fields of a line such as

        writer-8683 [001] .... 691.788632: i2c_write: i2c-2 #0 a=04c f=0000 l=2 [00-00]

    and stores them, in order, as ``taskPid``, ``cpu``, ``irq``, ``time``,
    ``i2c_func``, ``i2c_bus``, ``i2c_direction`` (the ``#0`` field),
    ``i2c_reg`` (the ``a=`` field), ``direction`` (the ``f=`` field),
    ``length`` (the ``l=`` field) and ``data`` (the bracketed payload).

    Three layouts are accepted, told apart by the number of fields:

    * 8 fields  -- result line; the last two fields are stored as ``n`` and
      ``ret`` (presumably ``n=...`` / ``ret=...`` -- confirm against a trace).
    * 10 fields -- read line (no payload).
    * 11 fields -- write or reply line (with payload).

    After unpacking, ``cpu`` loses its surrounding brackets, ``length`` is
    parsed as a decimal int, ``i2c_reg`` as a hexadecimal int and ``time`` as
    a float.  Fields that a layout does not provide keep the ``''`` default.
    """

    # Defaults for fields that a given line layout does not carry.
    taskPid = ''
    cpu = ''
    irq = ''
    time = ''
    i2c_func = ''
    i2c_bus = ''
    i2c_direction = ''
    i2c_reg = ''
    direction = ''
    length = ''
    data = ''
    n = ''
    ret = ''

    @staticmethod
    def _format_data(raw, as_c):
        """Turn a payload like ``[00-ff]`` into ``0x``-prefixed byte tokens.

        The first and last characters (the brackets) are dropped and the rest
        is split on ``-``.  Tokens are joined with ``", "`` when *as_c* is true
        and with a single space otherwise.  An empty payload (``[]``) yields
        an empty string.
        """
        tokens = ["0x" + byte for byte in raw[1:-1].split('-') if byte]
        return (", " if as_c else " ").join(tokens)

    def __init__(self, strlist):
        """Populate the action from the list of string fields *strlist*.

        Raises:
            SystemExit: if *strlist* has none of the supported lengths.
        """
        count = len(strlist)
        if count == 8:  # result
            (self.taskPid, self.cpu, self.irq, self.time, self.i2c_func,
             self.i2c_bus, self.n, self.ret) = strlist
        elif count == 10:  # read
            (self.taskPid, self.cpu, self.irq, self.time, self.i2c_func,
             self.i2c_bus, self.i2c_direction, self.i2c_reg, self.direction,
             self.length) = strlist
        elif count == 11:  # write/reply
            (self.taskPid, self.cpu, self.irq, self.time, self.i2c_func,
             self.i2c_bus, self.i2c_direction, self.i2c_reg, self.direction,
             self.length, self.data) = strlist
            # The separator depends on the output mode selected by the
            # module-level GenC switch.
            self.data = self._format_data(self.data, GenC)
        else:
            print("len: ", count)
            # SystemExit is what the ``exit()`` helper raises, but it does not
            # depend on the ``site`` module being loaded.
            raise SystemExit("Error get str from list:\n" + str(strlist))

        # "[001]" -> "001"
        self.cpu = self.cpu[1:-1]
        # "l=2" -> 2 (decimal); stays '' when the layout has no length field.
        self.length = '' if self.length == '' else int(self.length[2:], 10)
        # "a=04c" -> 0x4c (hexadecimal); stays '' when there is no such field.
        self.i2c_reg = '' if self.i2c_reg == '' else int(self.i2c_reg[2:], 16)
        # "691.788632:" -> 691.788632; a literal -1 is passed through as-is.
        self.time = -1 if self.time == -1 else float(self.time[:-1])

    def print_all(self):
        """Print every field that the action's line layout provides.

        Raises:
            SystemExit: if ``i2c_func`` is not one of the four known names.
        """
        common = (self.taskPid, self.cpu, self.irq, self.time,
                  self.i2c_func, self.i2c_bus)
        message = (self.i2c_direction, self.i2c_reg, self.direction,
                   self.length)
        if self.i2c_func == "i2c_result:":
            print(*common, self.n, self.ret)
        elif self.i2c_func == "i2c_read:":
            print(*common, *message)
        elif self.i2c_func in ("i2c_reply:", "i2c_write:"):
            print(*common, *message, self.data)
        else:
            raise SystemExit("Unknown i2c func.")
def read_txt(name):
    """Open the file *name* in text read mode and return the open handle.

    The handle is returned still open; closing it is left to the caller.
    """
    return open(name, 'r')
| # _-----=> irqs-off | |
| # / _----=> need-resched | |
| # | / _---=> hardirq/softirq | |
| # || / _--=> preempt-depth | |
| # ||| / delay | |
| # TASK-PID CPU# |||| TIMESTAMP FUNCTION | |
| # | | | |||| | | | |
| # writer-8683 [001] .... 691.788632: i2c_write: i2c-2 #0 a=04c f=0000 l=2 [00-00] | |
def parse_line(line):
    """Split one trace line into its fields and wrap them in an I2cAction.

    Example input:

        writer-8683 [001] .... 691.788632: i2c_write: i2c-2 #0 a=04c f=0000 l=2 [00-00]

    ``str.split()`` without an argument splits on any run of whitespace and
    ignores leading/trailing whitespace, so the line terminator is dropped
    only when it is actually there.  (Blindly cutting the last character off
    the last field corrupts the final line of a file that lacks a trailing
    newline.)

    Returns:
        The I2cAction built from the fields.  I2cAction itself rejects lines
        with an unsupported number of fields.
    """
    return I2cAction(line.split())
def code_generator(this_ca, file, *, first_reg=None, second_reg=None,
                   double_ic=None, emit_prelude=False):
    """Write C statements that replay the composed I2C actions to *file*.

    Args:
        this_ca: iterable of composed actions.  Each one is a tuple whose
            first item is the kind (``'write'``, ``'writeread'`` or
            ``'read'``) followed by the trace actions it was built from;
            item 1 supplies the device address, timestamp, length and data.
        file: writable text file object.
        first_reg: address mapped to device ``A``; defaults to the
            module-level ``FirstReg``.
        second_reg: address mapped to device ``B``; defaults to the
            module-level ``SecondReg``.
        double_ic: when true, actions for *second_reg* are emitted as device
            ``B``; otherwise they are skipped.  Defaults to the module-level
            ``DoubleIC``.
        emit_prelude: when true, the C helper definitions (``SpbWriteRead``,
            the ``WA``/``WB``/``WRA``/``WRB`` and ``DELAY_*`` macros) are
            written first.  Off by default, so only the statements are
            written.

    Actions for any other address, and actions of an unrecognised kind, are
    skipped.

    NOTE(review): a gap of 1-20 ms between consecutive emitted actions is
    reproduced as ``DELAY_MS(<ms>)`` and a gap above 80 ms as
    ``DELAY_MS(20)``; gaps from 20 ms to 80 ms emit nothing.  The first
    action is measured against time 0.  Both look unintentional but are kept
    as they were -- confirm the intended behaviour.
    NOTE(review): ``'read'`` actions emit ``RA``/``RB``, which the prelude
    below does not define.
    """
    if first_reg is None:
        first_reg = FirstReg
    if second_reg is None:
        second_reg = SecondReg
    if double_ic is None:
        double_ic = DoubleIC

    # Helper definitions the generated statements rely on.  This is a normal
    # (non-raw) string, so each trailing backslash joins the macro body onto
    # a single output line.
    prelude = """
VOID
SpbWriteRead(
PSPB_CONTEXT SpbContext,
UINT8 *i_data,
UINT8 *r_data,
UINT16 i_len,
UINT16 o_len
){
UINT8 o_data[121] = {0};
SpbDeviceWriteRead(SpbContext, i_data, o_data, i_len, o_len);
if(o_data[0] != r_data[0])
TraceEvents(TRACE_LEVEL_ERROR, TRACE_DEVICE, "%!FUNC! Incorrect Reply Here: %d %d",o_data[0], r_data[0]);
}
#define WA(...) { \
SpbDeviceWrite(&pDevice->SpbContextA, (UINT8[]){ ##__VA_ARGS__ }, sizeof((UINT8[]){ ##__VA_ARGS__ })); \
}
#define WB(...) { \
SpbDeviceWrite(&pDevice->SpbContextB, (UINT8[]){ ##__VA_ARGS__ }, sizeof((UINT8[]){ ##__VA_ARGS__ })); \
}
#define WRA(i_len,...) { \
UINT8 buf[] = { ##__VA_ARGS__ }; \
SpbWriteRead(&pDevice->SpbContextA, &buf[0], &buf[i_len], i_len, sizeof(buf) - i_len); \
}
#define WRB(i_len,...) { \
UINT8 buf[] = { ##__VA_ARGS__ }; \
SpbWriteRead(&pDevice->SpbContextB, &buf[0], &buf[i_len], i_len, sizeof(buf) - i_len); \
}
#define DELAY_ONE_MICROSECOND (-10)
#define DELAY_ONE_MILLISECOND (DELAY_ONE_MICROSECOND*1000)
// Milli Sec
#define DELAY_MS(msec) { \
LARGE_INTEGER interval; \
interval.QuadPart = DELAY_ONE_MILLISECOND; \
interval.QuadPart *= msec; \
KeDelayExecutionThread(KernelMode, 0, &interval); \
}
// Micro Sec
#define DELAY_US(msec) { \
LARGE_INTEGER interval; \
interval.QuadPart = DELAY_ONE_MICROSECOND; \
interval.QuadPart *= msec; \
KeDelayExecutionThread(KernelMode, 0, &interval); \
}
"""
    if emit_prelude:
        file.write(prelude)

    last_time = 0
    for tmp_ca in this_ca:
        kind, first = tmp_ca[0], tmp_ca[1]

        # Judge device channel.
        if first.i2c_reg == first_reg:
            dev = 'A'
        elif first.i2c_reg == second_reg and double_ic:
            dev = 'B'
        else:
            continue

        # Build the statement for this composed action.
        if kind == 'write':
            code = f"W{dev} ({first.data});\n"
        elif kind == 'writeread':
            # Write length, the bytes written, then the bytes the trace
            # recorded as the reply.
            code = f"WR{dev} ({first.length}, {first.data},/*|*/ {tmp_ca[3].data});\n"
        elif kind == 'read':
            code = f"R{dev} ({first.length});\n"
            code += f"// returned value should be: {tmp_ca[2].data};\n"
        else:
            # Unknown kind: skip it instead of re-emitting the previous
            # statement.
            continue

        # Reproduce the pause that preceded this action in the trace.
        interval = first.time - last_time
        if interval >= 0.001:
            if interval < 0.02:
                file.write(f"// delay Detected\nDELAY_MS({round(interval * 1000)});\n")
            elif interval > 0.08:
                file.write("// delay Detected\nDELAY_MS(20);\n")
        last_time = first.time

        file.write(code)
def spbcode_generator(this_ca, file, *, first_reg=None, gen_comment=None):
    """Write an SPB script that replays the composed I2C actions to *file*.

    One command line is written per action addressed to *first_reg*:

    * ``write {<bytes>}``
    * ``writeread {<bytes>} <read length>``
    * ``read <length>``

    Args:
        this_ca: iterable of composed actions.  Each one is a tuple whose
            first item is the kind (``'write'``, ``'writeread'`` or
            ``'read'``) followed by the trace actions it was built from.
            For ``'writeread'`` items 1, 2 and 3 are the write, read and
            reply actions; for ``'read'`` items 1 and 2 are the read and
            reply actions.
        file: writable text file object.
        first_reg: device address to emit commands for; defaults to the
            module-level ``FirstReg``.  Actions for other addresses are
            skipped.
        gen_comment: when true, a ``// returned value ...`` line with the
            traced reply follows each ``writeread``/``read`` command;
            defaults to the module-level ``GEN_COMMENT``.

    Actions of an unrecognised kind are skipped.
    """
    if first_reg is None:
        first_reg = FirstReg
    if gen_comment is None:
        gen_comment = GEN_COMMENT

    for tmp_ca in this_ca:
        kind, first = tmp_ca[0], tmp_ca[1]
        if first.i2c_reg != first_reg:
            continue

        if kind == 'write':
            code = f"write {{{first.data}}}\n"
        elif kind == 'writeread':
            # The trailing number is how many bytes to read back, so it comes
            # from the read action (item 2), not from the write action.
            code = f"writeread {{{first.data}}} {tmp_ca[2].length}\n"
            if gen_comment:
                code += f"// returned value: {{{tmp_ca[3].data}}} \n"
        elif kind == 'read':
            code = f"read {first.length}\n"
            if gen_comment:
                code += f"// returned value should be: {tmp_ca[2].data};\n"
        else:
            # Unknown kind: skip it instead of re-emitting the previous
            # command.
            continue

        file.write(code)
if __name__ == "__main__":
    # Read and parse the trace file.
    actions = []
    with read_txt(TraceFileName) as trace_file:
        for this_line in trace_file:
            stripped = this_line.strip()
            # Blank lines and '#' header lines carry no action; handing them
            # to parse_line would abort the whole run.
            if not stripped or stripped.startswith('#'):
                continue
            actions.append(parse_line(this_line))

    # Compose each action: group consecutive trace lines into one logical
    # transfer.  Every entry is (kind, expected i2c_func sequence, number of
    # leading actions stored alongside the kind).
    patterns = (
        ("read", ("i2c_read:", "i2c_reply:", "i2c_result:"), 2),
        ("write", ("i2c_write:", "i2c_result:"), 2),
        ("writeread",
         ("i2c_write:", "i2c_read:", "i2c_reply:", "i2c_result:"), 3),
    )
    composed_actions = []
    i = 0
    while i < len(actions):
        for kind, funcs, keep in patterns:
            # Slicing never raises, so a trace that stops in the middle of a
            # transfer simply fails to match.
            window = actions[i:i + len(funcs)]
            if tuple(action.i2c_func for action in window) == funcs:
                composed_actions.append((kind, *window[:keep]))
                i += len(funcs)
                break
        else:
            print("Unknown Composed Function: ")
            actions[i].print_all()
            i += 1

    if GenC:
        # Generate C code.
        with open(OutputFileNameC, 'w') as pSrc:
            code_generator(composed_actions, pSrc)
    if GenSpb and not DoubleIC:
        # Generate the SPB script.
        with open(OutputFileNameSpbScript, 'w') as pSrc:
            spbcode_generator(composed_actions, pSrc)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment