Created
May 8, 2025 03:06
-
-
Save yiliu30/19fa30b1f998ac56329cd41ac2f53b50 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from dataclasses import dataclass | |
| from typing import List, Dict | |
| import json | |
@dataclass
class MoeOpInfo:
    """Arity of a fused MoeOp measurement node: how many input and output
    tensors were recorded for the op."""

    # number of measured input tensors (1 for the hidden-states input)
    num_inputs: int = 0
    # number of measured output tensors (final output plus per-expert
    # intermediates, per Config.get_moe_op_info below)
    num_outputs: int = 0
def get_moe_op_name(layer_index, group_index):
    """Return the measurement-node name of the fused MoeOp for one expert
    group in one decoder layer.

    NOTE(review): this definition is shadowed further down the file by a
    four-argument version of the same name; only later code sees that one.
    """
    template = "model.layers.{}.mlp.experts._temp_expert_group_{}.MoeOp"
    return template.format(layer_index, group_index)
@dataclass
class ModelInfo:
    """Static architecture constants of the MoE model whose measurements are
    being merged.  Defaults (256 routed experts, 61 layers, 3 leading dense
    layers) look like a DeepSeek-V3-style model — TODO confirm."""

    # total number of routed experts across the whole model
    n_routed_experts: int = 256
    # total number of transformer decoder layers
    num_hidden_layers: int = 61
    # the first K layers use dense MLPs, so only layers >= K carry MoE nodes
    first_k_dense_replace: int = 3


# Shared module-level instance read by Config's methods and the merge loop.
model_info = ModelInfo()
@dataclass
class Config:
    """Parallelism configuration of one deployment (tensor/expert parallel
    sizes and worker count), plus derived per-rank expert counts."""

    tp_size: int
    ep_size: int
    worker_size: int
    # experts on a rank are partitioned into this many groups
    num_expert_group: int = 8

    def num_experts_per_rank(self):
        """Routed experts hosted by a single rank (reads global model_info)."""
        return model_info.n_routed_experts // self.ep_size

    def num_experts_per_group(self):
        """Experts inside one expert group on a rank."""
        return self.num_experts_per_rank() // self.num_expert_group

    def get_moe_op_info(self):
        """Arity of the fused MoeOp under this configuration: one input,
        one final output plus one output per expert in a group."""
        per_group = self.num_experts_per_group()
        return MoeOpInfo(num_inputs=1, num_outputs=1 + per_group)


# Target layout (merge destination) and source layout (what was measured).
one_machine_config = Config(tp_size=8, ep_size=8, worker_size=8)
two_machines_config = Config(tp_size=16, ep_size=16, worker_size=16)
@dataclass
class NodeInfo:
    """Coordinates of one expert tensor: layer, rank, expert group, and
    expert index within the group.

    NOTE(review): appears unused in this script — candidate for removal."""

    layer_index: int
    rank: int
    group_index: int
    expert_index: int
def filter_moe_nodes(data):
    """Return only the measurement nodes whose name ends with "MoeOp"
    (the fused op itself, not its per-expert weight-list children)."""
    moe_only = {}
    for name, node in data.items():
        if name.endswith("MoeOp"):
            moe_only[name] = node
    return moe_only
def merge_moe_op(machine_0_data: Dict, machine_1_data: Dict) -> Dict:
    """Merge the fused-MoeOp measurement nodes of two source ranks.

    Inputs are merged pairwise by element-wise max; the first output is
    merged by element-wise max while the remaining outputs (per-expert
    intermediates) from both ranks are concatenated.

    Both dumps must contain the same set of MoeOp node names.
    """
    # Keep only the fused-op nodes (names ending with "MoeOp").
    moe_0 = {k: v for k, v in machine_0_data.items() if k.endswith("MoeOp")}
    moe_1 = {k: v for k, v in machine_1_data.items() if k.endswith("MoeOp")}
    assert set(moe_0.keys()) == set(moe_1.keys()), "Keys are not same"

    merged_data = {}
    for moe_op_name in moe_0.keys():
        node_0 = moe_0.get(moe_op_name, {})
        node_1 = moe_1.get(moe_op_name, {})

        inputs_0 = node_0.get("inputs", [])
        inputs_1 = node_1.get("inputs", [])
        # Pairwise element-wise max over the common prefix of the input lists.
        merged_inputs = [
            [[max(a, b) for a, b in zip(lhs[0], rhs[0])]]
            for lhs, rhs in zip(inputs_0, inputs_1)
        ]
        # The longer list contributes its unmatched tail unchanged.
        if len(inputs_0) > len(inputs_1):
            merged_inputs.extend(inputs_0[len(inputs_1):])
        else:
            merged_inputs.extend(inputs_1[len(inputs_0):])

        outputs_0 = node_0.get("outputs", [])
        outputs_1 = node_1.get("outputs", [])
        if outputs_0 and outputs_1:
            # Element-wise max of the first output of each rank...
            merged_outputs = [
                [[max(a, b) for a, b in zip(outputs_0[0][0], outputs_1[0][0])]]
            ]
            # ...then concatenate the remaining outputs from both ranks.
            merged_outputs += outputs_0[1:]
            merged_outputs += outputs_1[1:]
        else:
            # At most one side has outputs; keep whichever is non-empty.
            merged_outputs = outputs_0 or outputs_1

        merged_data[moe_op_name] = {"inputs": merged_inputs, "outputs": merged_outputs}
    return merged_data
| # Print merged results | |
| # print(json.dumps(merged_info, indent=4)) | |
| ## === | |
| from typing import Dict | |
| import json | |
| import re | |
def get_moe_op_name(layer_index, group_index, expert_index, postfix):
    """Return the name of one per-expert weight node (e.g. "w13_list" or
    "w2_list" entry) under a fused MoeOp group.

    NOTE(review): this intentionally shadows the earlier two-argument
    definition of the same name.
    """
    group_prefix = (
        f"model.layers.{layer_index}.mlp.experts._temp_expert_group_{group_index}"
    )
    return f"{group_prefix}.MoeOp.{postfix}.{expert_index}"
def merge_moe_op_experts_list(
    machine_0_data: Dict, machine_1_data: Dict, model_info: ModelInfo, w13_or_w2=True
) -> Dict:
    """Re-key per-expert weight measurement nodes from the two-machine
    (16-rank) layout into the one-machine (8-rank) layout.

    For every MoE layer and every expert the destination rank will host, the
    function derives the group/expert indices the node has in each layout:
    the two-machine indices are used to look the node up in the source dump,
    the one-machine indices to build its destination key.

    Args:
        machine_0_data: "Nodes" dict of the first source rank's dump.
        machine_1_data: "Nodes" dict of the second source rank's dump.
        model_info: model constants (layer count, leading dense layers).
        w13_or_w2: True selects "w13_list" nodes, False "w2_list" nodes.
            NOTE(review): presumably w13 = fused gate/up projection and
            w2 = down projection — confirm with the measurement producer.

    Returns:
        Dict mapping one-machine node names to the source node payloads.
    """
    merged_data = {}
    if w13_or_w2:
        postfix = "w13_list"
    else:
        postfix = "w2_list"
    # Only layers from first_k_dense_replace onward carry MoE experts.
    for layer_index in range(
        model_info.first_k_dense_replace, model_info.num_hidden_layers
    ):
        # Every expert the destination (8-rank) layout hosts on this rank.
        for expert_index in range(one_machine_config.num_experts_per_rank()):
            # Destination group index in the 8-rank layout.
            group_index_in_one_machine = (
                expert_index // one_machine_config.num_experts_per_group()
            )
            # Source group index: position within the source rank's experts,
            # divided into that layout's groups.
            group_index_in_two_machines = (
                expert_index % two_machines_config.num_experts_per_rank()
            ) // two_machines_config.num_experts_per_group()
            # Expert index within its group, in each layout.
            expert_index_in_one_machine = (
                expert_index % one_machine_config.num_experts_per_group()
            )
            expert_index_in_two_machines = (
                expert_index % two_machines_config.num_experts_per_group()
            )
            # The second half of this rank's experts lived on the second
            # source rank in the 16-rank layout.
            if expert_index >= two_machines_config.num_experts_per_rank():
                data = machine_1_data
            else:
                data = machine_0_data
            # Destination key (one-machine layout).
            key1 = get_moe_op_name(
                layer_index,
                group_index_in_one_machine,
                expert_index_in_one_machine,
                postfix,
            )
            # Source key (two-machine layout).
            key2 = get_moe_op_name(
                layer_index,
                group_index_in_two_machines,
                expert_index_in_two_machines,
                postfix,
            )
            # Nodes absent from the source dump are silently skipped.
            if key2 in data:
                merged_data[key1] = data[key2]
    return merged_data
| # # Sample input data | |
| # machine_0_data = { | |
| # "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp.w13_list.0": { | |
| # "inputs": [[[0.0]]], | |
| # "params": {"weight": [[0.09375]]}, | |
| # }, | |
| # "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp.w13_list.1": { | |
| # "inputs": [[[0.0]]], | |
| # "params": {"weight": [[0.0830078125]]}, | |
| # }, | |
| # } | |
| # machine_1_data = { | |
| # "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp.w13_list.0": { | |
| # "inputs": [[[0.0]]], | |
| # "params": {"weight": [[0.0751953125]]}, | |
| # }, | |
| # "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp.w13_list.1": { | |
| # "inputs": [[[0.0]]], | |
| # "params": {"weight": [[0.09130859375]]}, | |
| # }, | |
| # } | |
| # Print merged results | |
| # | |
| # print(json.dumps(merged_w13_list, indent=4)) | |
def merge_other(machine_0_data, machine_1_data):
    """Merge all non-MoeOp measurement nodes of two ranks by recursive
    field-wise max.

    Both dumps must contain the same set of node names (asserted); nodes
    whose name contains "MoeOp" are handled by the MoE-specific mergers and
    are excluded here.

    Fixes vs. original: removed an unused local ``import json`` and the
    per-key single-dict fallback branches, which were unreachable because
    the key sets are asserted equal beforehand.
    """

    def _merge_entries(entry1, entry2):
        # Recursively merge two node payloads: lists are merged element-wise
        # by max over the common prefix, dicts recurse, scalars take max.
        merged_entry = {}
        for field in set(entry1.keys()).union(entry2.keys()):
            if field in entry1 and field in entry2:
                v1, v2 = entry1[field], entry2[field]
                if isinstance(v1, list) and isinstance(v2, list):
                    merged_entry[field] = [max(a, b) for a, b in zip(v1, v2)]
                elif isinstance(v1, dict) and isinstance(v2, dict):
                    merged_entry[field] = _merge_entries(v1, v2)
                else:
                    merged_entry[field] = max(v1, v2)
            elif field in entry1:
                merged_entry[field] = entry1[field]
            else:
                merged_entry[field] = entry2[field]
        return merged_entry

    assert set(machine_0_data.keys()) == set(machine_1_data.keys()), "Keys are not same"
    return {
        key: _merge_entries(machine_0_data[key], machine_1_data[key])
        for key in machine_0_data
        if "MoeOp" not in key
    }
| # # Simulated input data from two machines | |
| # machine_0_data = { | |
| # "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp": { | |
| # "inputs": [[[9.875]]], | |
| # "outputs": [[[0.014404296875]], [[0.70703125]], [[0.6328125]]], | |
| # }, | |
| # "model.layers.0.mlp.gate_up_proj": { | |
| # "inputs": [[[30.25]]], | |
| # "params": {"weight": [[0.4453125]]}, | |
| # }, | |
| # "model.layers.0.mlp.down_proj": { | |
| # "inputs": [[[0.06591796875]]], | |
| # "params": {"weight": [[0.4921875]]}, | |
| # }, | |
| # } | |
| # machine_1_data = { | |
| # "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp": { | |
| # "inputs": [[[9.876]]], | |
| # "outputs": [[[0.0169677734375]], [[1.3359375]], [[0.271484375]]], | |
| # }, | |
| # "model.layers.3.mlp.experts._temp_expert_group_0.MoeOp.w13_list.0": { | |
| # "inputs": [[[0.0]]], | |
| # "params": {"weight": [[0.09375]]}, | |
| # }, | |
| # "model.layers.0.mlp.gate_up_proj": { | |
| # "inputs": [[[30.25]]], | |
| # "params": {"weight": [[0.4453125]]}, | |
| # }, | |
| # "model.layers.0.mlp.down_proj": { | |
| # "inputs": [[[0.06591796875]]], | |
| # "params": {"weight": [[0.4921875]]}, | |
| # }, | |
| # } | |
def read_data_from_json(file_path):
    """Load an INC measurement dump from *file_path* and return its
    "Nodes" section (the per-layer measurement dict)."""
    with open(file_path) as fp:
        payload = json.load(fp)
    return payload["Nodes"]
def put_together(merged_moe, merged_w13_list, merged_w2_list, merged_other, rank):
    """Assemble the final per-rank measurement file from the four merged
    node families, with node names sorted alphabetically."""
    combined = {
        **merged_moe,
        **merged_w13_list,
        **merged_w2_list,
        **merged_other,
    }
    return {
        "GlobalRank": None,
        "LocalRank": rank,
        "Mode": "DynamicRange",
        "Nodes": dict(sorted(combined.items())),
    }
| import json | |
| # Function to compare the structure (keys) of two dictionaries (JSON objects) | |
| def compare_structure(json1, json2, path=""): | |
| # # Compare if the types of json1 and json2 match (both should be dict or list) | |
| # if type(json1) != type(json2): | |
| # print(f"Type mismatch at {path}") | |
| # print(f"File 1 type: {json1}") | |
| # print(f"File 2 type: {json2}") | |
| # return | |
| # If both are dictionaries, compare the keys | |
| if isinstance(json1, dict) and isinstance(json2, dict): | |
| # Compare the keys in both dictionaries | |
| keys1 = set(json1.keys()) | |
| keys2 = set(json2.keys()) | |
| # Find missing or extra keys | |
| missing_in_json2 = keys1 - keys2 | |
| missing_in_json1 = keys2 - keys1 | |
| if missing_in_json2: | |
| print(f"Keys missing in file 2 at {path}: {missing_in_json2}") | |
| if missing_in_json1: | |
| print(f"Keys missing in file 1 at {path}: {missing_in_json1}") | |
| # Recurse for common keys | |
| for key in keys1.intersection(keys2): | |
| compare_structure( | |
| json1[key], json2[key], path=f"{path}.{key}" if path else key | |
| ) | |
| # If both are lists, compare their lengths | |
| elif isinstance(json1, list) and isinstance(json2, list): | |
| if len(json1) != len(json2): | |
| print(f"List lengths differ at {path}") | |
| else: | |
| for i, (item1, item2) in enumerate(zip(json1, json2)): | |
| compare_structure(item1, item2, path=f"{path}[{i}]") | |
def load_json_file(file_path):
    """Read the JSON document at *file_path* and return the parsed object."""
    with open(file_path) as fh:
        return json.load(fh)
def compare_json_structures(file1, file2):
    """Load two JSON files and print their structural differences
    (delegates to compare_structure)."""
    compare_structure(load_json_file(file1), load_json_file(file2))
| # # Provide paths to your JSON files | |
| # file_path_1 = dump_file | |
| # file_path_2 = "/home/yliu7/workspace/inc/3rd-party/vllm/scripts/nc_workspace_tmp_4l_ep8_tp8/inc_measure_output_hooks_maxabs_0_8_mod_list.json" | |
| # compare_json_structures(file_path_1, file_path_2) | |
def dump_all_nodes_name(merged_json):
    """Return the sorted list of node names in a merged measurement dict.

    Fix vs. original: dropped the redundant ``list(...)`` wrapper —
    ``sorted`` already returns a new list.
    """
    return sorted(merged_json["Nodes"].keys())
# Directory holding the 16-rank measurement dumps to be merged down to 8 ranks.
measure_result_path = "/home/yliu7/.cache/huggingface/hub/models--Yi30--nc_workspace_tmp_pile_512_mla/snapshots/e07d0d9184d1e2ebc4dd418a4c7c7db692f136bd/"

# For each of the 8 destination ranks, merge a pair of 16-rank dumps.
for i in range(8):
    # NOTE(review): machine 0 reads source rank `i` while machine 1 reads
    # rank `i*2 + 1` — the pairing (i, 2i+1) looks inconsistent; verify
    # against how the 16 ranks were distributed (e.g. (i, i+8) or (2i, 2i+1)).
    fimachine_0 = f"{measure_result_path}/inc_measure_output_hooks_maxabs_{i}_16.json"
    fimachine_1 = (
        f"{measure_result_path}/inc_measure_output_hooks_maxabs_{i*2 + 1}_16.json"
    )
    machine_0_data = read_data_from_json(fimachine_0)
    machine_1_data = read_data_from_json(fimachine_1)
    # Merge each node family separately, then stitch the results together.
    merged_moe = merge_moe_op(machine_0_data, machine_1_data)
    merged_w13_list = merge_moe_op_experts_list(
        machine_0_data, machine_1_data, model_info, True
    )
    merged_w2_list = merge_moe_op_experts_list(
        machine_0_data, machine_1_data, model_info, False
    )
    merged_other = merge_other(machine_0_data, machine_1_data)
    merged_json = put_together(
        merged_moe, merged_w13_list, merged_w2_list, merged_other, i
    )
    merge_path = "./nc_workspace_tmp_mla"
    # create dir
    import os

    os.makedirs(merge_path, exist_ok=True)
    # Write the merged 8-rank measurement file plus its module-name list.
    merged_file = f"{merge_path}/inc_measure_output_hooks_maxabs_{i}_8.json"
    with open(merged_file, "w") as f:
        json.dump(merged_json, f, indent=4)
    # dump mod list
    mod_list = f"{merge_path}/inc_measure_output_hooks_maxabs_{i}_8_mod_list.json"
    with open(mod_list, "w") as f:
        json.dump(dump_all_nodes_name(merged_json), f, indent=4)
    print(f"Dumped {merged_file} and {mod_list}")
    global_rank = None
    local_rank = i
    mode = ""
    layers = {}
    mode = merged_json["Mode"]
    nodes = merged_json["Nodes"]
    output_path = merge_path
    unified_json_name = merged_file
    import numpy as np

    # create unified npz file from the unified json
    unified_npz_path = unified_json_name.replace(".json", ".npz")
    # Rebuild every leaf as a numpy array, mirroring the JSON structure.
    for layer, dlayer in nodes.items():
        layers[layer] = {}
        layers[layer]["inputs"] = [np.array(x) for x in dlayer["inputs"]]
        if dlayer.get("outputs") is not None:
            layers[layer]["outputs"] = [np.array(x) for x in dlayer["outputs"]]
        if (
            dlayer.get("params") is not None
            and dlayer["params"].get("weight") is not None
        ):
            layers[layer]["params"] = {}
            layers[layer]["params"]["weight"] = np.array(dlayer["params"]["weight"])
    df = {
        "GlobalRank": global_rank,
        "LocalRank": local_rank,
        "Mode": mode,
        "Nodes": layers,
    }
    # NOTE(review): the text-mode handle opened here is never used — np.savez
    # reopens the path itself, so this `with open` only truncates the file
    # first.  Also, savez stores `df` as a single object array, which will
    # need allow_pickle=True to load back — TODO confirm downstream loader.
    with open(unified_npz_path, "w"):
        np.savez(unified_npz_path, df)
| # # dump to local | |
| # dump_file = "/home/yliu7/workspace/inc/3rd-party/vllm/scripts/merged.json" | |
| # with open(dump_file, "w") as f: | |
| # json.dump(merged_json, f, indent=4) | |
| # mod_list = "/home/yliu7/workspace/inc/3rd-party/vllm/scripts/merged_mod_list.json" | |
| # with open(mod_list, "w") as f: | |
| # json.dump(dump_all_nodes_name(merged_json), f, indent=4) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment