Skip to content

Instantly share code, notes, and snippets.

@sunflower2333
Created May 4, 2024 05:46
Show Gist options
  • Select an option

  • Save sunflower2333/5d3013e2a81673a9d99e8cbf12d7a753 to your computer and use it in GitHub Desktop.

Select an option

Save sunflower2333/5d3013e2a81673a9d99e8cbf12d7a753 to your computer and use it in GitHub Desktop.
# Global Settings
#
# Output selection.  GenSpb writes the SPB script (OutputFileNameSpbScript),
# GenC writes C statements (OutputFileNameC).  GEN_COMMENT adds the
# "// returned value ..." comment lines to the SPB script.
DoubleIC = False
GenSpb = True
GenC = False
GEN_COMMENT = False

# Values compared against the number parsed from each record's "a=" token:
# FirstReg selects device 'A'; SecondReg selects device 'B' (C output only,
# and only when DoubleIC is set).
FirstReg = 0x25
SecondReg = 0x4d

# Input trace and output file names.
TraceFileName = "tracefile.txt"
OutputFileNameC = "spb.c"
OutputFileNameSpbScript = "spb.txt"

# SPB-script mode takes priority: it disables C output and two-device mode.
if GenSpb:
    GenC = False
    DoubleIC = False
class I2cAction:
    """One record of an i2c trace log, built from its whitespace-split tokens.

    The record layout is chosen by the number of tokens:

    * 8 tokens  -- result record: task, cpu, flags, time, func, bus, n, ret
    * 10 tokens -- read record: task, cpu, flags, time, func, bus,
      '#N' token, 'a=' token, 'f=' token, 'l=' token
    * 11 tokens -- write/reply record: the read layout plus a '[xx-yy-...]'
      payload token

    After construction:

    * ``cpu`` has its first and last character (the brackets) removed,
    * ``time`` is a float (the trailing ':' is dropped),
    * ``length`` is an int parsed as decimal from the text after the 2-char
      prefix, or '' when the record has no length,
    * ``i2c_reg`` is an int parsed as hex from the text after the 2-char
      prefix, or '' when the record has none,
    * ``data`` is the payload rewritten as '0x'-prefixed bytes, separated by
      ", " when the module-level ``GenC`` flag is set and by " " otherwise.

    Raises:
        SystemExit: if the token count matches none of the layouts.
    """

    # Defaults for fields that a given record layout does not carry.
    taskPid = ''
    cpu = ''
    irq = ''
    time = ''
    i2c_func = ''
    i2c_bus = ''
    i2c_direction = ''
    i2c_reg = ''
    direction = ''
    length = ''
    data = ''
    n = ''
    ret = ''

    def __init__(self, strlist):
        token_count = len(strlist)
        if token_count == 8:  # result
            (self.taskPid, self.cpu, self.irq, self.time, self.i2c_func,
             self.i2c_bus, self.n, self.ret) = strlist
        elif token_count == 10:  # read
            (self.taskPid, self.cpu, self.irq, self.time, self.i2c_func,
             self.i2c_bus, self.i2c_direction, self.i2c_reg, self.direction,
             self.length) = strlist
        elif token_count == 11:  # write/reply
            (self.taskPid, self.cpu, self.irq, self.time, self.i2c_func,
             self.i2c_bus, self.i2c_direction, self.i2c_reg, self.direction,
             self.length, self.data) = strlist
            separator = ", " if GenC else " "
            # Strip the surrounding brackets, then prefix every byte.  Empty
            # pieces are skipped so an empty payload "[]" yields "" rather
            # than a dangling "0x".
            self.data = separator.join(
                "0x" + byte for byte in self.data[1:-1].split('-') if byte
            )
        else:
            print("len: ", token_count)
            raise SystemExit("Error get str from list:\n" + str(strlist))

        self.cpu = self.cpu[1:-1]
        self.length = '' if self.length == '' else int(self.length[2:], 10)
        self.i2c_reg = '' if self.i2c_reg == '' else int(self.i2c_reg[2:], 16)
        self.time = float(self.time.rstrip(':'))

    def print_all(self):
        """Print every field relevant to this record's ``i2c_func``.

        Raises:
            SystemExit: if ``i2c_func`` is not one of the four known values.
        """
        common = (self.taskPid, self.cpu, self.irq, self.time,
                  self.i2c_func, self.i2c_bus)
        message = (self.i2c_direction, self.i2c_reg, self.direction,
                   self.length)
        if self.i2c_func == "i2c_result:":
            print(*common, self.n, self.ret)
        elif self.i2c_func == "i2c_read:":
            print(*common, *message)
        elif self.i2c_func in ("i2c_reply:", "i2c_write:"):
            print(*common, *message, self.data)
        else:
            raise SystemExit("Unknown i2c func.")
# read file
def read_txt(name):
    """Open *name* in text read mode and return the open file object.

    The handle is returned still open; closing it is left to the caller.
    """
    return open(name, 'r')
# _-----=> irqs-off
# / _----=> need-resched
# | / _---=> hardirq/softirq
# || / _--=> preempt-depth
# ||| / delay
# TASK-PID CPU# |||| TIMESTAMP FUNCTION
# | | | |||| | |
# writer-8683 [001] .... 691.788632: i2c_write: i2c-2 #0 a=04c f=0000 l=2 [00-00]
def parse_line(line):
    """Split one trace line into tokens and wrap them in an ``I2cAction``.

    Args:
        line: One line of the trace file, with or without its line
            terminator.

    Returns:
        The ``I2cAction`` built from the line's tokens.
    """
    # str.split() with no argument splits on runs of whitespace and drops the
    # line terminator.  The previous code always chopped the final character
    # of the last token, which corrupted a last line lacking a newline, and
    # left an empty token behind when the line ended in a space.
    return I2cAction(line.split())
# C helper definitions (SpbWriteRead, the WA/WB/WRA/WRB macros and the delay
# macros) used by the statements that code_generator() emits.  Python consumes
# each backslash-newline pair inside this literal, so every macro ends up on a
# single line in the emitted text.
_C_PREAMBLE = """
VOID
SpbWriteRead(
PSPB_CONTEXT SpbContext,
UINT8 *i_data,
UINT8 *r_data,
UINT16 i_len,
UINT16 o_len
){
UINT8 o_data[121] = {0};
SpbDeviceWriteRead(SpbContext, i_data, o_data, i_len, o_len);
if(o_data[0] != r_data[0])
TraceEvents(TRACE_LEVEL_ERROR, TRACE_DEVICE, "%!FUNC! Incorrect Reply Here: %d %d",o_data[0], r_data[0]);
}
#define WA(...) { \
SpbDeviceWrite(&pDevice->SpbContextA, (UINT8[]){ ##__VA_ARGS__ }, sizeof((UINT8[]){ ##__VA_ARGS__ })); \
}
#define WB(...) { \
SpbDeviceWrite(&pDevice->SpbContextB, (UINT8[]){ ##__VA_ARGS__ }, sizeof((UINT8[]){ ##__VA_ARGS__ })); \
}
#define WRA(i_len,...) { \
UINT8 buf[] = { ##__VA_ARGS__ }; \
SpbWriteRead(&pDevice->SpbContextA, &buf[0], &buf[i_len], i_len, sizeof(buf) - i_len); \
}
#define WRB(i_len,...) { \
UINT8 buf[] = { ##__VA_ARGS__ }; \
SpbWriteRead(&pDevice->SpbContextB, &buf[0], &buf[i_len], i_len, sizeof(buf) - i_len); \
}
#define DELAY_ONE_MICROSECOND (-10)
#define DELAY_ONE_MILLISECOND (DELAY_ONE_MICROSECOND*1000)
// Milli Sec
#define DELAY_MS(msec) { \
LARGE_INTEGER interval; \
interval.QuadPart = DELAY_ONE_MILLISECOND; \
interval.QuadPart *= msec; \
KeDelayExecutionThread(KernelMode, 0, &interval); \
}
// Micro Sec
#define DELAY_US(msec) { \
LARGE_INTEGER interval; \
interval.QuadPart = DELAY_ONE_MICROSECOND; \
interval.QuadPart *= msec; \
KeDelayExecutionThread(KernelMode, 0, &interval); \
}
"""


def code_generator(this_ca, file, *, emit_preamble=False):
    """Write C statements replaying the composed actions to *file*.

    Args:
        this_ca: Iterable of composed-action tuples whose first item is the
            kind ('write', 'writeread' or 'read') and whose remaining items
            are the records involved.  ``tmp_ca[1]`` supplies the address,
            timestamp, length and (for writes) the data; the reply data is
            read from ``tmp_ca[3]`` for 'writeread' and ``tmp_ca[2]`` for
            'read'.
        file: Writable text file object.
        emit_preamble: When true, first write the C helper definitions the
            emitted statements rely on.  Off by default, which matches the
            previous behaviour (the preamble was never written).

    Actions whose address is neither ``FirstReg`` nor (with ``DoubleIC`` set)
    ``SecondReg`` are skipped, as are actions of an unknown kind.
    """
    if emit_preamble:
        file.write(_C_PREAMBLE)

    last_time = 0
    for tmp_ca in this_ca:
        kind, first = tmp_ca[0], tmp_ca[1]

        # Judge device channel.
        if first.i2c_reg == FirstReg:
            dev = 'A'
        elif first.i2c_reg == SecondReg and DoubleIC:
            dev = 'B'
        else:
            continue

        # Reproduce gaps between consecutive emitted actions: gaps of
        # 1 ms up to (not including) 20 ms are replayed rounded to the
        # millisecond, gaps above 80 ms are capped at 20 ms.
        # NOTE(review): gaps from 20 ms to 80 ms emit no delay at all --
        # confirm this is intended.
        interval = first.time - last_time
        if interval >= 0.001:
            if interval < 0.02:
                file.write(f"// delay Detected\nDELAY_MS({round(interval * 1000)});\n")
            elif interval > 0.08:
                file.write("// delay Detected\nDELAY_MS(20);\n")
        last_time = first.time

        # Parse composed actions.
        if kind == 'write':
            code = f"W{dev} ({first.data});\n"
        elif kind == 'writeread':
            # The macro receives the written bytes followed by the expected
            # reply bytes; the leading length tells it where to split them.
            code = f"WR{dev} ({first.length}, {first.data},/*|*/ {tmp_ca[3].data});\n"
        elif kind == 'read':
            # NOTE(review): the preamble defines no RA/RB macro -- the
            # consumer of the generated code has to provide one.
            code = f"R{dev} ({first.length});\n"
            code += f"// returned value should be: {tmp_ca[2].data};\n"
        else:
            # Unknown kind: emit nothing rather than repeating the previous
            # statement.
            continue
        file.write(code)
def spbcode_generator(this_ca, file):
    """Write the composed actions to *file* as SPB script commands.

    Args:
        this_ca: Iterable of composed-action tuples: ('write', write, ...),
            ('writeread', write, read, reply) or ('read', read, reply).
        file: Writable text file object.

    Only actions whose address equals ``FirstReg`` are emitted; actions of an
    unknown kind are skipped.  When ``GEN_COMMENT`` is set, a comment line
    with the reply data follows each 'writeread' and 'read' command.
    """
    for tmp_ca in this_ca:
        kind, first = tmp_ca[0], tmp_ca[1]
        if first.i2c_reg != FirstReg:
            continue

        # Parse composed actions.
        if kind == 'write':
            code = f"write {{{first.data}}}\n"
        elif kind == 'writeread':
            # The trailing number is how many bytes to read back, so it comes
            # from the read record (tmp_ca[2]), not from the write record.
            code = f"writeread {{{first.data}}} {tmp_ca[2].length}\n"
            if GEN_COMMENT:
                code += f"// returned value: {{{tmp_ca[3].data}}} \n"
        elif kind == 'read':
            code = f"read {first.length}\n"
            if GEN_COMMENT:
                code += f"// returned value should be: {tmp_ca[2].data};\n"
        else:
            # Unknown kind: emit nothing rather than repeating the previous
            # command.
            continue
        file.write(code)
# (kind, expected i2c_func sequence, number of leading records kept)
_COMPOSITIONS = (
    ("read", ("i2c_read:", "i2c_reply:", "i2c_result:"), 2),
    ("write", ("i2c_write:", "i2c_result:"), 2),
    ("writeread", ("i2c_write:", "i2c_read:", "i2c_reply:", "i2c_result:"), 3),
)


def compose_actions(actions):
    """Group consecutive trace records into composed actions.

    Args:
        actions: Sequence of records exposing ``i2c_func`` and
            ``print_all()``.

    Returns:
        A list of tuples:
        ``("read", read, reply)``, ``("write", write, result)`` or
        ``("writeread", write, read, reply)``.

    A record that does not start a known sequence is reported on stdout and
    skipped.  That includes a sequence cut short by the end of the trace,
    which previously raised ``IndexError``.
    """
    composed = []
    i = 0
    while i < len(actions):
        for kind, pattern, keep in _COMPOSITIONS:
            # Slicing never runs past the end, so a truncated sequence simply
            # fails to match.
            window = actions[i:i + len(pattern)]
            if tuple(action.i2c_func for action in window) == pattern:
                composed.append((kind, *window[:keep]))
                i += len(pattern)
                break
        else:
            print("Unknown Composed Function: ")
            actions[i].print_all()
            i += 1
    return composed


def main():
    """Parse the trace file and write the outputs selected by the settings."""
    # Read and parse the trace file.  Blank lines and trace header lines
    # (which start with '#') carry no record and are skipped.
    with read_txt(TraceFileName) as trace:
        actions = [
            parse_line(line)
            for line in trace
            if line.strip() and not line.lstrip().startswith('#')
        ]

    # Compose each action.
    composed_actions = compose_actions(actions)

    if GenC:
        # Generate C code.
        with open(OutputFileNameC, 'w') as src:
            code_generator(composed_actions, src)
    if GenSpb and not DoubleIC:
        # Generate the SPB script.
        with open(OutputFileNameSpbScript, 'w') as script:
            spbcode_generator(composed_actions, script)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment