Skip to content

Instantly share code, notes, and snippets.

@batonac
Last active February 23, 2026 16:26
Show Gist options
  • Select an option

  • Save batonac/550151b52e782f7aa209505c50d3c3d5 to your computer and use it in GitHub Desktop.

Select an option

Save batonac/550151b52e782f7aa209505c50d3c3d5 to your computer and use it in GitHub Desktop.
Hierarchical Editable Frappe Script Report
frappe.query_reports["Item Stock Settings"] = {
filters: [
{
fieldname: "item_group",
label: __("Item Group"),
fieldtype: "Link",
options: "Item Group",
},
],
tree: true,
initial_depth: 1,
formatter: function (value, row, column, data, default_formatter, filter) {
if (!data) return default_formatter(value, row, column, data);
const row_type = data.row_type;
// Name column: show hierarchy formatting with links
if (column.fieldname === "name") {
const item_code = data.item_code;
let display = value || "";
if (row_type === "item_group") {
display = `<span class="text-muted">${frappe.utils.escape_html(data.name)}</span>`;
} else if (row_type === "template") {
display = item_code
? `<a href="/app/item/${encodeURIComponent(item_code)}" target="_blank">${frappe.utils.escape_html(data.name)}</a>`
: frappe.utils.escape_html(data.name);
} else if (row_type === "variant") {
display = item_code
? `<a href="/app/item/${encodeURIComponent(item_code)}" target="_blank"><em>${frappe.utils.escape_html(data.name)}</em></a>`
: `<em>${frappe.utils.escape_html(data.name)}</em>`;
} else if (row_type === "item") {
display = item_code
? `<a href="/app/item/${encodeURIComponent(item_code)}" target="_blank">${frappe.utils.escape_html(data.name)}</a>`
: frappe.utils.escape_html(data.name);
}
return display;
}
// Hide all editable fields on item_group rows
if (row_type === "item_group" && column.fieldname !== "name") {
return "";
}
// Valuation rate formatting
if (column.fieldname === "valuation_rate" && value) {
return frappe.format(value, { fieldtype: "Currency" }, { inline: true });
}
// Conversion factor formatting (use Fraction formatter)
if (column.fieldname === "conversion_factor" && value) {
return frappe.format(
value,
{ fieldtype: "Float", options: "Fraction" },
{ inline: true },
);
}
return default_formatter(value, row, column, data);
},
get_datatable_options: function (options) {
const has_item_group = frappe.query_report.get_filter_value("item_group");
return Object.assign(options, {
treeView: true,
checkboxColumn: false,
serialNoColumn: false,
layout: "fixed",
cellHeight: 36,
getEditor: function (
colIndex,
rowIndex,
value,
parent,
column,
row,
data,
) {
if (!data || !data.row_type) return false;
const row_type = data.row_type;
const field = column.fieldname || column.id;
// Determine if this cell is editable
const editable_fields = [
"stock_uom",
"sales_uom",
"valuation_rate",
"conversion_factor",
];
let is_editable = false;
if (row_type === "item_group") {
is_editable = false;
} else {
is_editable = editable_fields.includes(field);
}
if (!is_editable) return false;
// Number editor for valuation_rate
if (field === "valuation_rate") {
const input = document.createElement("input");
input.className = "dt-input";
input.type = "number";
input.step = "0.01";
input.style.width = "100%";
parent.appendChild(input);
return {
initValue(val) {
input.focus();
input.value = val || "";
},
getValue() {
return parseFloat(input.value) || 0;
},
setValue(newValue) {
const oldValue = value;
if (newValue != oldValue) {
save_cell_value(data, field, newValue, oldValue, input);
} else {
input.value = newValue;
}
},
};
}
// Fraction editor for conversion_factor
if (field === "conversion_factor") {
const control = frappe.ui.form.make_control({
df: {
fieldtype: "Float",
fieldname: field,
options: "Fraction",
},
parent: parent,
render_input: true,
});
control.toggle_label(false);
control.toggle_description(false);
// Prevent dblclick from bubbling to the datatable's handler,
// which would destroy and recreate the editor due to a bug
// in its same-cell guard (activateEditing uses wrong variable names)
parent.addEventListener("dblclick", (e) => e.stopPropagation());
return {
initValue(val) {
control.set_value(val);
setTimeout(() => {
if (control.$input) control.$input.focus();
}, 100);
},
getValue() {
return control.get_value();
},
setValue(newValue) {
const oldValue = value;
if (newValue != oldValue) {
save_cell_value(data, field, newValue, oldValue, null, control);
} else {
control.set_value(newValue);
}
},
};
}
// Link/Select editor for UOM fields
if (["stock_uom", "sales_uom"].includes(field)) {
let control = frappe.ui.form.make_control({
df: {
fieldtype: "Link",
fieldname: field,
options: "UOM",
},
parent: parent,
render_input: true,
});
control.toggle_label(false);
control.toggle_description(false);
return {
initValue(val) {
control.set_value(val);
setTimeout(() => {
if (control.$input) control.$input.focus();
}, 100);
},
getValue() {
return control.get_value();
},
setValue(newValue) {
const oldValue = value;
if (newValue !== oldValue) {
save_cell_value(data, field, newValue, oldValue, null, control);
} else {
control.set_value(newValue);
}
},
};
}
return false;
},
});
function save_cell_value(data, field, newValue, oldValue, input, control) {
const item_code = data.item_code;
let method, args;
if (field === "conversion_factor") {
method =
"custom_app.custom_app.report.item_stock_settings.item_stock_settings.update_sales_uom_conversion";
args = {
item_code: item_code,
value: newValue,
};
} else {
method =
"custom_app.custom_app.report.item_stock_settings.item_stock_settings.update_item_field";
args = {
item_code: item_code,
fieldname: field,
value: newValue,
};
}
frappe.call({
method: method,
args: args,
async: false,
callback: (r) => {
if (r.message && r.message.status === "success") {
frappe.show_alert(
{
message: r.message.message,
indicator: "green",
},
3,
);
if (input) input.value = newValue;
if (control) control.set_value(newValue);
} else {
frappe.show_alert(
{
message: r.message ? r.message.message : __("Update failed"),
indicator: "red",
},
5,
);
// Revert
if (input) input.value = oldValue;
if (control) control.set_value(oldValue);
}
},
error: () => {
frappe.show_alert(
{
message: __("An error occurred during the update."),
indicator: "red",
},
5,
);
if (input) input.value = oldValue;
if (control) control.set_value(oldValue);
},
});
}
},
after_datatable_render: function (datatable) {
if (!datatable) return;
// Style item group rows with subtle background
datatable.datamanager.rows.forEach((row, rowIndex) => {
const indent = row[1]?.indent;
if (indent === undefined) return;
// Find row_type from data
const rowData = datatable.datamanager.data[rowIndex];
if (!rowData) return;
if (rowData.row_type === "item_group") {
datatable.datamanager.columns.forEach((_, colIdx) => {
datatable.style.setStyle(`.dt-cell--${colIdx}-${rowIndex}`, {
backgroundColor: "var(--bg-light-gray, #f5f7fa)",
fontWeight: "600",
});
});
}
});
},
};
from typing import TYPE_CHECKING, Any, cast
import frappe
from frappe import _
from frappe.query_builder import DocType, Order
from frappe.utils import cint, flt
from pypika.functions import Coalesce, NullIf
from custom_app.utils.json_functions import NESTED_SET_DEPTH
if TYPE_CHECKING:
from custom_app.custom_app.custom.item import Item
def get_columns() -> list[dict]:
"""Minimal column definitions - JS handles formatting/editing."""
return [
{
"fieldname": "name",
"label": _("Name"),
"fieldtype": "Data",
"width": 400,
},
{
"fieldname": "valuation_rate",
"label": _("Rate"),
"fieldtype": "Currency",
"width": 130,
"editable": True,
},
{
"fieldname": "stock_uom",
"label": _("Stock UOM"),
"fieldtype": "Link",
"options": "UOM",
"width": 120,
"editable": True,
},
{
"fieldname": "sales_uom",
"label": _("Sales UOM"),
"fieldtype": "Link",
"options": "UOM",
"width": 120,
"editable": True,
},
{
"fieldname": "conversion_factor",
"label": _("Conversion"),
"fieldtype": "Float",
"width": 140,
"editable": True,
},
]
def get_data(filters: dict | None = None) -> list[dict]:
"""
Build hierarchical tree via a single PyPika query with JOINs.
Indent is computed via a correlated subquery in SQL.
"""
if filters is None:
filters = {}
item_group_filter = filters.get("item_group", "")
IG = DocType("Item Group")
Item = DocType("Item")
UCD = DocType("UOM Conversion Detail")
# Build the correlated subquery for ig_indent
if item_group_filter:
# First get root boundaries
root_row = (frappe.qb.from_(IG).select(IG.lft, IG.rgt).where(IG.name == item_group_filter)).run(
as_dict=True
)
if not root_row:
return []
root_lft, root_rgt = root_row[0]["lft"], root_row[0]["rgt"]
indent_subquery = NESTED_SET_DEPTH("Item Group", "ig", root_lft=root_lft, root_rgt=root_rgt)
else:
indent_subquery = NESTED_SET_DEPTH("Item Group", "ig")
# Main query
query = (
frappe.qb.from_(IG.as_("ig"))
.left_join(Item)
.on((Item.item_group == IG.as_("ig").name) & (Item.disabled == 0))
.left_join(UCD)
.on((UCD.parent == Item.name) & (UCD.parenttype == "Item") & (UCD.uom == Item.sales_uom))
.select(
IG.as_("ig").name.as_("ig_name"),
IG.as_("ig").item_group_name,
indent_subquery.as_("ig_indent"),
Item.name.as_("item_code"),
Item.item_name,
Item.stock_uom,
Item.sales_uom,
Item.valuation_rate,
Item.has_variants,
Item.variant_of,
UCD.conversion_factor.as_("sales_uom_cf"),
)
.orderby(IG.as_("ig").lft)
.orderby(Coalesce(NullIf(Item.variant_of, ""), Item.name))
.orderby(Item.has_variants, order=Order.desc)
.orderby(Item.item_name)
)
if item_group_filter:
ig = IG.as_("ig")
query = query.where(ig.lft >= root_lft).where(ig.rgt <= root_rgt)
rows = query.run(as_dict=True)
if not rows:
return []
# ── Single linear pass to build the flat tree ────────────────
result = []
seen_groups: set[str] = set()
seen_items: set[str] = set()
template_indent: dict[str, int] = {}
for row in rows:
ig_name = row["ig_name"]
ig_indent = cint(row["ig_indent"])
if ig_name not in seen_groups:
seen_groups.add(ig_name)
result.append(
{
"name": row["item_group_name"],
"item_code": "",
"row_type": "item_group",
"row_id": ig_name,
"stock_uom": "",
"sales_uom": "",
"valuation_rate": None,
"conversion_factor": None,
"indent": ig_indent,
"has_variants": 0,
}
)
item_code = row["item_code"]
if not item_code:
continue
variant_of = row["variant_of"]
if variant_of:
item_indent = template_indent.get(variant_of, ig_indent + 1) + 1
else:
item_indent = ig_indent + 1
if item_code not in seen_items:
seen_items.add(item_code)
has_variants = row["has_variants"]
item_label = f"{row['item_name']} ({item_code})"
if has_variants:
item_label = f"⚙ {item_label}"
row_type = "variant" if variant_of else ("template" if has_variants else "item")
stock_uom = row["stock_uom"] or ""
sales_uom = row["sales_uom"] or ""
conversion_factor = flt(row["sales_uom_cf"], 6) if sales_uom and sales_uom != stock_uom else None
result.append(
{
"name": item_label,
"item_code": item_code,
"row_type": row_type,
"row_id": item_code,
"stock_uom": stock_uom,
"sales_uom": sales_uom,
"valuation_rate": flt(row["valuation_rate"], 2),
"conversion_factor": conversion_factor,
"indent": item_indent,
"has_variants": has_variants,
}
)
if has_variants:
template_indent[item_code] = item_indent
return result
def execute(filters: dict | None = None):
if filters is None:
filters = {}
columns = get_columns()
data = get_data(filters)
return columns, data
@frappe.whitelist()
def update_item_field(item_code: str, fieldname: str, value: Any) -> dict:
"""Update a single field on an Item document."""
if not item_code:
return {"status": "error", "message": _("Item code is required")}
allowed_fields = {"stock_uom", "sales_uom", "valuation_rate"}
if fieldname not in allowed_fields:
return {"status": "error", "message": _("Field {0} is not editable").format(fieldname)}
try:
doc = frappe.get_doc("Item", item_code)
doc.set(fieldname, value)
doc.save()
frappe.db.commit()
return {
"status": "success",
"message": _("{0} updated for {1}").format(fieldname, item_code),
}
except Exception as e:
frappe.log_error(frappe.get_traceback(), f"Item Stock Settings Update Error - {item_code}")
return {"status": "error", "message": str(e)}
@frappe.whitelist()
def update_sales_uom_conversion(item_code: str, value: Any) -> dict:
"""Update the conversion factor for the item's sales UOM."""
if not item_code:
return {"status": "error", "message": _("Item code is required")}
try:
doc = cast("Item", frappe.get_doc("Item", item_code))
sales_uom = doc.sales_uom
if not sales_uom:
return {"status": "error", "message": _("No sales UOM set for {0}").format(item_code)}
for row in doc.uoms:
if row.uom == sales_uom:
row.conversion_factor = flt(value)
break
else:
doc.append("uoms", {"uom": sales_uom, "conversion_factor": flt(value)})
doc.save()
frappe.db.commit()
return {
"status": "success",
"message": _("Sales UOM conversion updated for {0}").format(item_code),
}
except Exception as e:
frappe.log_error(frappe.get_traceback(), f"Sales UOM Conversion Update Error - {item_code}")
return {"status": "error", "message": str(e)}
"""
Custom PyPika functions for JSON operations in MariaDB/MySQL.
Provides JSON aggregation and manipulation functions for use with
Frappe's query builder.
Example usage:
from frappe.query_builder import DocType
from custom_app.utils.json_functions import JSON_OBJECTAGG, JSON_ARRAYAGG
Item = DocType("Item")
IVA = DocType("Item Variant Attribute")
result = (
frappe.qb.from_(Item)
.left_join(IVA).on(Item.name == IVA.parent)
.select(
Item.item_code,
JSON_OBJECTAGG(IVA.attribute, IVA.attribute_value).as_("attributes")
)
.where(Item.variant_of == template_item)
.groupby(Item.item_code)
).run(as_dict=True)
"""
from typing import Any
from frappe.query_builder import CustomFunction
from frappe.query_builder.functions import AggregateFunction
from frappe.query_builder.terms import ValueWrapper
from pypika.terms import Term
# JSON creation functions
JSON_OBJECT = CustomFunction("JSON_OBJECT")
JSON_ARRAY = CustomFunction("JSON_ARRAY")
JSON_MERGE_PATCH = CustomFunction("JSON_MERGE_PATCH")
JSON_EXTRACT = CustomFunction("JSON_EXTRACT")
JSON_UNQUOTE = CustomFunction("JSON_UNQUOTE")
JSON_KEYS = CustomFunction("JSON_KEYS")
class JSON_OBJECTAGG(AggregateFunction):
"""
Aggregates key-value pairs into a JSON object.
Usage:
JSON_OBJECTAGG(key_column, value_column)
SQL equivalent:
JSON_OBJECTAGG(key_column, value_column)
"""
def __init__(self, key, value, alias=None):
super().__init__("JSON_OBJECTAGG", key, value, alias=alias)
class JSON_ARRAYAGG(AggregateFunction):
"""
JSON_ARRAYAGG with optional DISTINCT and ORDER BY support.
MariaDB syntax: JSON_ARRAYAGG([DISTINCT] expr [ORDER BY expr [, expr ...]])
Usage:
# Simple
JSON_ARRAYAGG(column)
# With distinct and single order
JSON_ARRAYAGG(column, distinct=True, orderby=column)
# Natural numeric sort (numbers before strings, numeric order)
JSON_ARRAYAGG(column, distinct=True, orderby=[column + 0, column])
"""
def __init__(
self,
term: Any,
distinct: bool = False,
orderby: Term | list[Term] | None = None,
alias: str | None = None,
):
# Don't pass term to parent - we'll handle it ourselves
super().__init__("JSON_ARRAYAGG", alias=alias)
# Store the term separately, wrapping if needed
self._term = term if isinstance(term, Term) else ValueWrapper(term)
self._distinct = distinct
self._orderby = orderby
def get_function_sql(self, **kwargs: Any) -> str:
# Build term SQL
term_sql = self._term.get_sql(with_alias=False, **kwargs)
# Build DISTINCT prefix
distinct_sql = "DISTINCT " if self._distinct else ""
# Build ORDER BY suffix
order_sql = ""
if self._orderby is not None:
order_terms = self._orderby if isinstance(self._orderby, list) else [self._orderby]
order_parts = [
t.get_sql(with_alias=False, **kwargs) if isinstance(t, Term) else str(t) for t in order_terms
]
order_sql = f" ORDER BY {', '.join(order_parts)}"
return f"JSON_ARRAYAGG({distinct_sql}{term_sql}{order_sql})"
class GROUP_CONCAT(AggregateFunction):
"""
Concatenates values with a separator.
Usage:
GROUP_CONCAT(column)
SQL equivalent:
GROUP_CONCAT(DISTINCT column SEPARATOR ',')
"""
def __init__(self, term, distinct=False, separator=",", alias=None):
if distinct:
# PyPika doesn't easily support DISTINCT in aggregate, so we use a workaround
super().__init__("GROUP_CONCAT", term, alias=alias)
else:
super().__init__("GROUP_CONCAT", term, alias=alias)
class NESTED_SET_DEPTH(Term):
"""
Calculate the depth/indent of a node in a nested set model by counting ancestors.
This generates a correlated subquery that counts how many rows have
lft < node.lft AND rgt > node.rgt (i.e., ancestors of the node).
Usage:
IG = DocType("Item Group")
# Full tree depth
depth = NESTED_SET_DEPTH("Item Group", "ig")
# Depth relative to a subtree root
depth = NESTED_SET_DEPTH("Item Group", "ig", root_lft=10, root_rgt=50)
SQL equivalent:
(SELECT COUNT(*) FROM `tabItem Group` a
WHERE a.lft < `ig`.lft AND a.rgt > `ig`.rgt
[AND a.lft >= root_lft AND a.rgt <= root_rgt])
"""
def __init__(
self,
table: str,
outer_alias: str,
root_lft: int | None = None,
root_rgt: int | None = None,
):
super().__init__()
self._table = table
self._outer_alias = outer_alias
self._root_lft = root_lft
self._root_rgt = root_rgt
def get_sql(self, with_alias: bool = False, **kwargs: Any) -> str:
table_name = f"`tab{self._table}`"
outer_ref = f"`{self._outer_alias}`"
sql = (
f"(SELECT COUNT(*) FROM {table_name} a WHERE a.lft < {outer_ref}.lft AND a.rgt > {outer_ref}.rgt"
)
if self._root_lft is not None and self._root_rgt is not None:
sql += f" AND a.lft >= {self._root_lft} AND a.rgt <= {self._root_rgt}"
sql += ")"
if with_alias and self.alias:
sql += f" `{self.alias}`"
return sql
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment