Skip to content

Instantly share code, notes, and snippets.

@yiliu30
Created May 8, 2025 03:06
Show Gist options
  • Select an option

  • Save yiliu30/19fa30b1f998ac56329cd41ac2f53b50 to your computer and use it in GitHub Desktop.

Select an option

Save yiliu30/19fa30b1f998ac56329cd41ac2f53b50 to your computer and use it in GitHub Desktop.
from dataclasses import dataclass
from typing import List, Dict
import json
@dataclass
class MoeOpInfo:
    """Expected tensor counts for one fused MoeOp node in the measurement dump."""

    # Number of input tensors the fused op exposes.
    num_inputs: int = 0
    # Number of output tensors (routed output plus per-expert outputs — see
    # Config.get_moe_op_info below).
    num_outputs: int = 0
def get_moe_op_name(layer_index, group_index):
    """Build the dump key of the fused MoeOp node for one expert group.

    NOTE(review): a four-argument ``get_moe_op_name`` defined later in this
    file shadows this one; only the later definition is visible to the code
    at the bottom of the script.
    """
    prefix = f"model.layers.{layer_index}.mlp.experts"
    return f"{prefix}._temp_expert_group_{group_index}.MoeOp"
@dataclass
class ModelInfo:
    """Static model topology used to derive per-rank expert counts.

    Defaults look like a DeepSeek-V3-style MoE config (256 routed experts,
    61 layers, first 3 layers dense) — presumably; confirm against the
    actual model config.
    """

    # Total number of routed experts in the model.
    n_routed_experts: int = 256
    # Total number of transformer layers.
    num_hidden_layers: int = 61
    # The first `first_k_dense_replace` layers use dense MLPs, not MoE.
    first_k_dense_replace: int = 3


# Module-level singleton read by Config's helper methods.
model_info = ModelInfo()
@dataclass
class Config:
    """Parallelism configuration for one deployment (tensor/expert/worker sizes)."""

    tp_size: int
    ep_size: int
    worker_size: int
    # Experts on each rank are grouped into this many fused groups.
    num_expert_group: int = 8

    def num_experts_per_rank(self) -> int:
        # Reads the module-level `model_info` singleton, not a field of self.
        return model_info.n_routed_experts // self.ep_size

    def num_experts_per_group(self) -> int:
        return self.num_experts_per_rank() // self.num_expert_group

    def get_moe_op_info(self) -> MoeOpInfo:
        # One fused input; one routed output plus one output per expert in
        # the group.
        num_inputs = 1
        num_outputs = 1 + self.num_experts_per_group()
        return MoeOpInfo(num_inputs=num_inputs, num_outputs=num_outputs)


# 8 ranks on a single machine vs. 16 ranks spread across two machines.
one_machine_config = Config(tp_size=8, ep_size=8, worker_size=8)
two_machines_config = Config(tp_size=16, ep_size=16, worker_size=16)
@dataclass
class NodeInfo:
    """Location of a single expert: layer, rank, expert group, and in-group index.

    NOTE(review): not referenced anywhere in the visible portion of this
    script — possibly leftover from an earlier iteration.
    """

    layer_index: int
    rank: int
    group_index: int
    expert_index: int
def filter_moe_nodes(data):
    """Return only entries whose key names a fused MoeOp node.

    A fused node's key ends with "MoeOp"; per-expert keys such as
    "...MoeOp.w13_list.0" do not match and are dropped.
    """
    selected = {}
    for name, record in data.items():
        if name.endswith("MoeOp"):
            selected[name] = record
    return selected
def merge_moe_op(machine_0_data: Dict, machine_1_data: Dict) -> Dict:
    """Merge fused-MoeOp measurement records from two machines.

    For every node whose key ends with "MoeOp":
      * inputs  — element-wise max over the paired first channel; if one
        machine recorded more inputs, its tail is kept unchanged;
      * outputs — element-wise max of the FIRST (routed) output only, then
        the remaining outputs of machine 0 followed by the remaining outputs
        of machine 1, so both machines' per-expert outputs are kept.

    Raises AssertionError if the two dumps do not contain the same set of
    MoeOp keys.
    """
    # Keep only fused MoeOp nodes; per-expert ".w13_list"/".w2_list" keys are
    # handled by merge_moe_op_experts_list instead.
    machine_0_data = {k: v for k, v in machine_0_data.items() if k.endswith("MoeOp")}
    machine_1_data = {k: v for k, v in machine_1_data.items() if k.endswith("MoeOp")}
    assert set(machine_0_data.keys()) == set(
        machine_1_data.keys()
    ), "MoeOp keys differ between the two machines"
    merged_data = {}
    for moe_op_name in machine_0_data:
        inputs_0 = machine_0_data[moe_op_name].get("inputs", [])
        inputs_1 = machine_1_data[moe_op_name].get("inputs", [])
        # Merge paired inputs: element-wise max of each entry's first channel.
        merged_inputs = [
            [[max(a, b) for a, b in zip(inp_0[0], inp_1[0])]]
            for inp_0, inp_1 in zip(inputs_0, inputs_1)
        ]
        # If one input list is longer, keep the unpaired tail as-is.
        if len(inputs_0) > len(inputs_1):
            merged_inputs.extend(inputs_0[len(inputs_1):])
        else:
            merged_inputs.extend(inputs_1[len(inputs_0):])
        outputs_0 = machine_0_data[moe_op_name].get("outputs", [])
        outputs_1 = machine_1_data[moe_op_name].get("outputs", [])
        if outputs_0 and outputs_1:
            # Max of the first (routed) output...
            merged_outputs = [
                [[max(a, b) for a, b in zip(outputs_0[0][0], outputs_1[0][0])]]
            ]
            # ...then the per-expert tails of machine 0 and machine 1 in turn.
            merged_outputs.extend(outputs_0[1:])
            merged_outputs.extend(outputs_1[1:])
        else:
            # Only one machine has outputs (or neither): take whichever exists.
            merged_outputs = outputs_0 or outputs_1
        merged_data[moe_op_name] = {"inputs": merged_inputs, "outputs": merged_outputs}
    return merged_data
# Print merged results
# print(json.dumps(merged_info, indent=4))
## ===
from typing import Dict
import json
import re
def get_moe_op_name(layer_index, group_index, expert_index, postfix):
    """Build the dump key of one expert's weight list inside a fused MoeOp node.

    ``postfix`` selects the weight list ("w13_list" or "w2_list") and
    ``expert_index`` is the expert's index within its group.

    NOTE(review): this shadows the two-argument get_moe_op_name defined
    earlier in the file; merge_moe_op_experts_list uses this 4-argument form.
    """
    base = f"model.layers.{layer_index}.mlp.experts"
    return f"{base}._temp_expert_group_{group_index}.MoeOp.{postfix}.{expert_index}"
def merge_moe_op_experts_list(
    machine_0_data: Dict, machine_1_data: Dict, model_info: ModelInfo, w13_or_w2=True
) -> Dict:
    """Remap per-expert weight-list entries from the 16-rank layout to the 8-rank layout.

    For every MoE layer and every expert a single 8-rank worker would own,
    compute the expert's (group, in-group index) under BOTH shardings, then
    copy the record found under its two-machine key into its one-machine key.
    ``w13_or_w2`` selects the "w13_list" (True) or "w2_list" (False) entries.

    NOTE(review): relies on the module-level ``one_machine_config`` /
    ``two_machines_config`` globals and the 4-argument ``get_moe_op_name``;
    the ``model_info`` parameter shadows the module-level singleton.
    """
    merged_data = {}
    if w13_or_w2:
        postfix = "w13_list"
    else:
        postfix = "w2_list"
    # Only layers at/after first_k_dense_replace are MoE layers.
    for layer_index in range(
        model_info.first_k_dense_replace, model_info.num_hidden_layers
    ):
        for expert_index in range(one_machine_config.num_experts_per_rank()):
            # Group/in-group position under the one-machine (8-rank) sharding.
            group_index_in_one_machine = (
                expert_index // one_machine_config.num_experts_per_group()
            )
            # Group/in-group position under the two-machine (16-rank) sharding;
            # the modulo folds the expert back into a single 16-rank shard.
            group_index_in_two_machines = (
                expert_index % two_machines_config.num_experts_per_rank()
            ) // two_machines_config.num_experts_per_group()
            expert_index_in_one_machine = (
                expert_index % one_machine_config.num_experts_per_group()
            )
            expert_index_in_two_machines = (
                expert_index % two_machines_config.num_experts_per_group()
            )
            # Experts beyond the two-machine per-rank count were measured on
            # the second machine's dump.
            if expert_index >= two_machines_config.num_experts_per_rank():
                data = machine_1_data
            else:
                data = machine_0_data
            # Destination key (8-rank layout)...
            key1 = get_moe_op_name(
                layer_index,
                group_index_in_one_machine,
                expert_index_in_one_machine,
                postfix,
            )
            # ...and source key (16-rank layout).
            key2 = get_moe_op_name(
                layer_index,
                group_index_in_two_machines,
                expert_index_in_two_machines,
                postfix,
            )
            # Missing entries are silently skipped (best-effort copy).
            if key2 in data:
                merged_data[key1] = data[key2]
    return merged_data
# # Sample input data
# machine_0_data = {
# "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp.w13_list.0": {
# "inputs": [[[0.0]]],
# "params": {"weight": [[0.09375]]},
# },
# "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp.w13_list.1": {
# "inputs": [[[0.0]]],
# "params": {"weight": [[0.0830078125]]},
# },
# }
# machine_1_data = {
# "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp.w13_list.0": {
# "inputs": [[[0.0]]],
# "params": {"weight": [[0.0751953125]]},
# },
# "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp.w13_list.1": {
# "inputs": [[[0.0]]],
# "params": {"weight": [[0.09130859375]]},
# },
# }
# Print merged results
#
# print(json.dumps(merged_w13_list, indent=4))
def merge_other(machine_0_data, machine_1_data):
    """Merge all non-MoeOp node records from two machines by element-wise max.

    List fields are merged pairwise with ``max`` (for nested lists this falls
    back to Python's lexicographic list comparison), dict fields are merged
    recursively, and scalar fields take the larger value. Any key containing
    "MoeOp" is skipped here — those are handled by the dedicated MoeOp merge
    helpers.

    Raises AssertionError if the two dumps do not share the same key set.
    """

    def merge_entries(entry1, entry2):
        # Recursively merge two node records field by field.
        merged_entry = {}
        for field in set(entry1.keys()).union(entry2.keys()):
            if field in entry1 and field in entry2:
                if isinstance(entry1[field], list) and isinstance(entry2[field], list):
                    merged_entry[field] = [
                        max(e1, e2) for e1, e2 in zip(entry1[field], entry2[field])
                    ]
                elif isinstance(entry1[field], dict) and isinstance(
                    entry2[field], dict
                ):
                    merged_entry[field] = merge_entries(entry1[field], entry2[field])
                else:
                    merged_entry[field] = max(entry1[field], entry2[field])
            elif field in entry1:
                merged_entry[field] = entry1[field]
            else:
                merged_entry[field] = entry2[field]
        return merged_entry

    def is_other_keys(key):
        # "Other" nodes are everything not belonging to a fused MoeOp.
        return "MoeOp" not in key

    def merge_dicts(dict1, dict2):
        merged = {}
        assert set(dict1.keys()) == set(dict2.keys()), "Keys are not same"
        for key in filter(is_other_keys, dict1.keys()):
            # Membership branches kept defensively: the assert above may be
            # stripped under `python -O`.
            if key in dict1 and key in dict2:
                merged[key] = merge_entries(dict1[key], dict2[key])
            elif key in dict1:
                merged[key] = dict1[key]
            else:
                merged[key] = dict2[key]
        return merged

    return merge_dicts(machine_0_data, machine_1_data)
# # Simulated input data from two machines
# machine_0_data = {
# "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp": {
# "inputs": [[[9.875]]],
# "outputs": [[[0.014404296875]], [[0.70703125]], [[0.6328125]]],
# },
# "model.layers.0.mlp.gate_up_proj": {
# "inputs": [[[30.25]]],
# "params": {"weight": [[0.4453125]]},
# },
# "model.layers.0.mlp.down_proj": {
# "inputs": [[[0.06591796875]]],
# "params": {"weight": [[0.4921875]]},
# },
# }
# machine_1_data = {
# "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp": {
# "inputs": [[[9.876]]],
# "outputs": [[[0.0169677734375]], [[1.3359375]], [[0.271484375]]],
# },
# "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp.w13_list.0": {
# "inputs": [[[0.0]]],
# "params": {"weight": [[0.09375]]},
# },
# "model.layers.0.mlp.gate_up_proj": {
# "inputs": [[[30.25]]],
# "params": {"weight": [[0.4453125]]},
# },
# "model.layers.0.mlp.down_proj": {
# "inputs": [[[0.06591796875]]],
# "params": {"weight": [[0.4921875]]},
# },
# }
def read_data_from_json(file_path):
    """Load a measurement dump file and return its "Nodes" section."""
    with open(file_path, "r") as fp:
        payload = json.load(fp)
    return payload["Nodes"]
def put_together(merged_moe, merged_w13_list, merged_w2_list, merged_other, rank):
    """Assemble the final per-rank dump dict from the four merged sections.

    Node entries from all sections are combined (later sections win on
    duplicate keys) and sorted by name under "Nodes".
    """
    all_nodes = {
        **merged_moe,
        **merged_w13_list,
        **merged_w2_list,
        **merged_other,
    }
    return {
        "GlobalRank": None,
        "LocalRank": rank,
        "Mode": "DynamicRange",
        "Nodes": dict(sorted(all_nodes.items())),
    }
import json
# Function to compare the structure (keys) of two dictionaries (JSON objects)
def compare_structure(json1, json2, path=""):
# # Compare if the types of json1 and json2 match (both should be dict or list)
# if type(json1) != type(json2):
# print(f"Type mismatch at {path}")
# print(f"File 1 type: {json1}")
# print(f"File 2 type: {json2}")
# return
# If both are dictionaries, compare the keys
if isinstance(json1, dict) and isinstance(json2, dict):
# Compare the keys in both dictionaries
keys1 = set(json1.keys())
keys2 = set(json2.keys())
# Find missing or extra keys
missing_in_json2 = keys1 - keys2
missing_in_json1 = keys2 - keys1
if missing_in_json2:
print(f"Keys missing in file 2 at {path}: {missing_in_json2}")
if missing_in_json1:
print(f"Keys missing in file 1 at {path}: {missing_in_json1}")
# Recurse for common keys
for key in keys1.intersection(keys2):
compare_structure(
json1[key], json2[key], path=f"{path}.{key}" if path else key
)
# If both are lists, compare their lengths
elif isinstance(json1, list) and isinstance(json2, list):
if len(json1) != len(json2):
print(f"List lengths differ at {path}")
else:
for i, (item1, item2) in enumerate(zip(json1, json2)):
compare_structure(item1, item2, path=f"{path}[{i}]")
# Load JSON files
def load_json_file(file_path):
    """Read and parse a JSON file, returning the decoded object."""
    with open(file_path, "r") as handle:
        return json.load(handle)
# Main comparison function
def compare_json_structures(file1, file2):
    """Load two JSON files and print any structural differences between them.

    Thin wrapper: parses both files with load_json_file and delegates the
    key/length comparison (and all printing) to compare_structure.
    """
    json_data1 = load_json_file(file1)
    json_data2 = load_json_file(file2)
    compare_structure(json_data1, json_data2)
# # Provide paths to your JSON files
# file_path_1 = dump_file
# file_path_2 = "/home/yliu7/workspace/inc/3rd-party/vllm/scripts/nc_workspace_tmp_4l_ep8_tp8/inc_measure_output_hooks_maxabs_0_8_mod_list.json"
# compare_json_structures(file_path_1, file_path_2)
def dump_all_nodes_name(merged_json):
    """Return the sorted list of node names present in a merged dump."""
    return sorted(merged_json["Nodes"].keys())
# Driver: merge 16-rank measurement dumps down to 8 per-rank files, then
# additionally write each merged JSON as an .npz archive.
measure_result_path = "/home/yliu7/.cache/huggingface/hub/models--Yi30--nc_workspace_tmp_pile_512_mla/snapshots/e07d0d9184d1e2ebc4dd418a4c7c7db692f136bd/"
for i in range(8):
    # NOTE(review): machine 0 reads rank `i` while machine 1 reads rank
    # `i*2 + 1` — the pairing looks inconsistent (expected e.g. 2i/2i+1 or
    # i/i+8); confirm against the 16-rank dump's naming before trusting this.
    fimachine_0 = f"{measure_result_path}/inc_measure_output_hooks_maxabs_{i}_16.json"
    fimachine_1 = (
        f"{measure_result_path}/inc_measure_output_hooks_maxabs_{i*2 + 1}_16.json"
    )
    machine_0_data = read_data_from_json(fimachine_0)
    machine_1_data = read_data_from_json(fimachine_1)
    # Merge the three key families separately: fused MoeOp nodes, per-expert
    # weight lists (w13 then w2), and everything else.
    merged_moe = merge_moe_op(machine_0_data, machine_1_data)
    merged_w13_list = merge_moe_op_experts_list(
        machine_0_data, machine_1_data, model_info, True
    )
    merged_w2_list = merge_moe_op_experts_list(
        machine_0_data, machine_1_data, model_info, False
    )
    merged_other = merge_other(machine_0_data, machine_1_data)
    merged_json = put_together(
        merged_moe, merged_w13_list, merged_w2_list, merged_other, i
    )
    merge_path = "./nc_workspace_tmp_mla"
    # create dir
    import os

    os.makedirs(merge_path, exist_ok=True)
    # Write the merged 8-rank dump plus its sorted module-name list.
    merged_file = f"{merge_path}/inc_measure_output_hooks_maxabs_{i}_8.json"
    with open(merged_file, "w") as f:
        json.dump(merged_json, f, indent=4)
    # dump mod list
    mod_list = f"{merge_path}/inc_measure_output_hooks_maxabs_{i}_8_mod_list.json"
    with open(mod_list, "w") as f:
        json.dump(dump_all_nodes_name(merged_json), f, indent=4)
    print(f"Dumped {merged_file} and {mod_list}")
    global_rank = None
    local_rank = i
    mode = ""
    layers = {}
    mode = merged_json["Mode"]
    nodes = merged_json["Nodes"]
    output_path = merge_path
    unified_json_name = merged_file
    import numpy as np

    # create unified npz file from the unified json
    unified_npz_path = unified_json_name.replace(".json", ".npz")
    # Convert each node's lists to numpy arrays; "outputs" and
    # "params.weight" are optional per node.
    for layer, dlayer in nodes.items():
        layers[layer] = {}
        layers[layer]["inputs"] = [np.array(x) for x in dlayer["inputs"]]
        if dlayer.get("outputs") is not None:
            layers[layer]["outputs"] = [np.array(x) for x in dlayer["outputs"]]
        if (
            dlayer.get("params") is not None
            and dlayer["params"].get("weight") is not None
        ):
            layers[layer]["params"] = {}
            layers[layer]["params"]["weight"] = np.array(dlayer["params"]["weight"])
    df = {
        "GlobalRank": global_rank,
        "LocalRank": local_rank,
        "Mode": mode,
        "Nodes": layers,
    }
    # NOTE(review): the `with open(..., "w")` only truncates the file;
    # np.savez re-opens the path itself, so the wrapper appears redundant —
    # confirm it isn't load-bearing for some tooling.
    with open(unified_npz_path, "w"):
        np.savez(unified_npz_path, df)
# # dump to local
# dump_file = "/home/yliu7/workspace/inc/3rd-party/vllm/scripts/merged.json"
# with open(dump_file, "w") as f:
# json.dump(merged_json, f, indent=4)
# mod_list = "/home/yliu7/workspace/inc/3rd-party/vllm/scripts/merged_mod_list.json"
# with open(mod_list, "w") as f:
# json.dump(dump_all_nodes_name(merged_json), f, indent=4)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment