I need to achieve my goal in a single ES query: for a given company, get the number of employees in every department, broken down into active, new, and leave counts per department, and then sum those buckets to get the company-wide active/new/leave counts.
The specific conditions can be described as follows (simplified version):

- Need to search for all employees in the target company (the `c_company_id` field).
- Need to count employees without a department as well (`"missing": "unknown"`).
- Each employee only stores a hire date and a leave date.
- The count of active employees at the beginning of the month (the `employed_origin` field):
  - Those with `c_status` in `[1, 2]` (employees who have not left), with a hire date before `2020-01-01`, contribute to the count of active employees.
  - Those with `c_status` value `3` (employees who have left), with a hire date before `2020-01-01` and a leave date after `2020-01-01`, also contribute to the count of active employees.
- New employees (the `hire` field): hire date between `2020-01-01` and `2020-02-01` (one way to express such a date window is sketched right after this list).
- Leave employees (the `leave` field): `c_status` value `3` and a leave date between `2020-01-01` and `2020-02-01`.
- The count of active employees at the end of the month (the `employed` field, the value I actually need): `employed_origin + hire - leave`.
- Sum all buckets to get the company-wide `employed`, `hire`, and `leave` counts.
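For reference, a date-window condition like the `hire` one above does not strictly need a Painless script; a minimal sketch of an equivalent `filter` sub-aggregation (assuming `c_hire_dt` is mapped as a `date` field, as in the query below) would be:

"hire": {
  "filter": {
    "range": {
      "c_hire_dt": {
        "gte": "2020-01-01",
        "lt": "2020-02-01"
      }
    }
  }
}

With that form, a `bucket_script` would reference the filter's document count as `hire>_count` instead of a sum value. The query below uses `sum` scripts for all three metrics, with the date window passed in as params.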
After hours with a search engine's help, I finally managed to express all of these conditions in a single ES query, and the performance also meets my requirements:
GET my_index/_search
{
"track_total_hits": 100000,
"query": {
"bool": {
"must": [
{
"term": {
"c_company_id": "xxxxxxxxxxxxx"
}
}
]
}
},
"size": 0,
"aggs": {
"department_id": {
"terms": {
"field": "c_department_id",
"size" : 3000,
"missing": "unknown"
},
"aggs": {
"employed_origin": {
"sum": {
"script": {
"lang": "painless",
"source": "def lower_ms = Instant.parse(params.lower_date + \"T00:00:00.00Z\").toEpochMilli(); if (doc.containsKey('c_work_status') && doc['c_work_status'].size() > 0 && doc.containsKey('c_hire_dt') && doc['c_hire_dt'].size() > 0 && doc['c_hire_dt'].value.toInstant().toEpochMilli() < lower_ms) { if (1 <= doc['c_work_status'].value && doc['c_work_status'].value <= 2) { return 1; } else if (doc['c_work_status'].value == 4 && doc.containsKey('c_leave_dt') && doc['c_leave_dt'].size() > 0 && doc['c_leave_dt'].value.toInstant().toEpochMilli() >= lower_ms) { return 1; } else { return 0; } } else { return 0; }",
"params": {
"upper_date": "2020-02-01",
"lower_date": "2020-01-01"
}
}
}
},
"hire": {
"sum": {
"script": {
"lang": "painless",
"source": "def lower_ms = Instant.parse(params.lower_date + \"T00:00:00.00Z\").toEpochMilli(); def upper_ms = Instant.parse(params.upper_date + \"T00:00:00.00Z\").toEpochMilli(); if (doc.containsKey('c_hire_dt') && doc['c_hire_dt'].size() > 0 && doc['c_hire_dt'].value.toInstant().toEpochMilli() >= lower_ms && doc['c_hire_dt'].value.toInstant().toEpochMilli() < upper_ms) { return 1; } else { return 0; }",
"params": {
"upper_date": "2020-02-01",
"lower_date": "2020-01-01"
}
}
}
},
"leave": {
"sum": {
"script": {
"lang": "painless",
"source": "def lower_ms = Instant.parse(params.lower_date + \"T00:00:00.00Z\").toEpochMilli(); def upper_ms = Instant.parse(params.upper_date + \"T00:00:00.00Z\").toEpochMilli(); if (doc.containsKey('c_work_status') && doc['c_work_status'].size() > 0 && doc['c_work_status'].value == 3 && doc.containsKey('c_leave_dt') && doc['c_leave_dt'].size() > 0 && doc['c_leave_dt'].value.toInstant().toEpochMilli() >= lower_ms && doc['c_leave_dt'].value.toInstant().toEpochMilli() < upper_ms) { return 1; } else { return 0; }",
"params": {
"upper_date": "2020-02-01",
"lower_date": "2020-01-01"
}
}
}
},
"employed": {
"bucket_script": {
"buckets_path": {
"employed_origin": "employed_origin",
"hire": "hire",
"leave": "leave"
},
"script": "params.employed_origin + params.hire - params.leave"
}
}
}
},
"employed_origin": {
"sum_bucket": {
"buckets_path": "department_id>employed_origin"
}
},
"employed": {
"sum_bucket": {
"buckets_path": "department_id>employed"
}
},
"hire": {
"sum_bucket": {
"buckets_path": "department_id>hire"
}
},
"leave": {
"sum_bucket": {
"buckets_path": "department_id>leave"
}
}
}
}
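
For completeness, the aggregations section of the response then has roughly this shape (a trimmed sketch with placeholder keys and numbers, only to show where each value lands): every `department_id` bucket carries its own `employed_origin`, `hire`, `leave`, and derived `employed`, and the company-wide totals sit alongside the `department_id` aggregation as `sum_bucket` results.

"aggregations": {
  "department_id": {
    "buckets": [
      {
        "key": "dept_a",
        "doc_count": 42,
        "employed_origin": { "value": 40.0 },
        "hire": { "value": 3.0 },
        "leave": { "value": 1.0 },
        "employed": { "value": 42.0 }
      }
    ]
  },
  "employed_origin": { "value": 40.0 },
  "employed": { "value": 42.0 },
  "hire": { "value": 3.0 },
  "leave": { "value": 1.0 }
}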