Created
April 18, 2024 05:51
-
-
Save IntendedConsequence/17ba179e67ae963a4cc125b46ec33d7c to your computer and use it in GitHub Desktop.
Face detection and recognition from insightface in a single file with minimal dependencies, without Cython or compiled-extension nonsense
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import glob | |
| import hashlib | |
| import os.path | |
| import pickle | |
| import sys | |
| import zipfile | |
| import cv2 | |
| import onnx | |
| from onnx import numpy_helper | |
| import onnxruntime | |
| import numpy as np | |
| from numpy.linalg import norm as l2norm | |
| # TODO: SCRFD model | |
| # from insightface.model_zoo.scrfd import SCRFD | |
| from skimage import transform as trans | |
| from pathlib import Path | |
| import requests | |
| from tqdm import tqdm | |
| # NOTE: requirements: | |
| # - opencv-python-headless | |
| # - onnx | |
| # - onnxruntime | |
| # - [optional] onnxruntime-gpu | |
| # - numpy | |
| # - scikit-image (imported as skimage) | |
| # - requests | |
| # - tqdm | |
| # NOTE: other zip names: buffalo_s, buffalo_sc | |
| # NOTE: SCRFD onnx filename: scrfd_person_2.5g.onnx | |
| # Model pack name; not referenced in the visible part of this file (presumably the default pack to load - TODO confirm). | |
| DEFAULT_MP_NAME = 'buffalo_l' | |
| # Release URL prefix: download() appends "/<name>.zip", download_onnx() appends "/<model_file>" or "/<model_file>.zip". | |
| BASE_REPO_URL = 'https://github.com/deepinsight/insightface/releases/download/v0.7' | |
| # NOTE: load only face detection and recognition models by default | |
| # (the entries match the `taskname` values set by RetinaFace and ArcFaceONNX) | |
| ALLOWED_MODULES = ["detection", "recognition"] | |
| # NOTE: uncomment to enable loading all original models by default | |
| # ALLOWED_MODULES = None | |
class Face(dict):
    """Per-face result container whose entries are readable as keys and attributes.

    Every assignment, whether ``face.x = v`` or ``face['x'] = v``, stores the
    value both as an instance attribute and as a dict item.  Plain ``dict``
    values are wrapped in ``Face``; lists and tuples are stored as lists whose
    ``dict`` elements are wrapped.  Reading an attribute that was never set
    yields ``None`` instead of raising ``AttributeError``.
    """

    def __init__(self, d=None, **kwargs):
        """Populate the face from an optional mapping ``d`` plus keyword overrides."""
        # Copy first: merging ``kwargs`` must not mutate the caller's mapping.
        merged = {} if d is None else dict(d)
        merged.update(kwargs)
        for key, value in merged.items():
            setattr(self, key, value)

    def __setattr__(self, name, value):
        if isinstance(value, (list, tuple)):
            value = [self.__class__(x) if isinstance(x, dict) else x
                     for x in value]
        elif isinstance(value, dict) and not isinstance(value, self.__class__):
            value = self.__class__(value)
        # Keep the attribute view and the mapping view in sync.
        super(Face, self).__setattr__(name, value)
        super(Face, self).__setitem__(name, value)

    __setitem__ = __setattr__

    def __getattr__(self, name):
        # Only reached when normal lookup fails: unknown fields read as None.
        return None

    @property
    def embedding_norm(self):
        """L2 norm of ``embedding``, or ``None`` when no embedding is set."""
        if self.embedding is None:
            return None
        return l2norm(self.embedding)

    @property
    def normed_embedding(self):
        """``embedding`` divided by its L2 norm, or ``None`` when unset."""
        if self.embedding is None:
            return None
        return self.embedding / self.embedding_norm

    @property
    def sex(self):
        """``'M'`` when ``gender == 1``, ``'F'`` otherwise; ``None`` when unset."""
        if self.gender is None:
            return None
        return 'M' if self.gender == 1 else 'F'
def check_sha1(filename, sha1_hash):
    """Check whether the sha1 hash of the file content matches the expected hash.

    The comparison covers only the common prefix of the two hex strings, so a
    truncated expected hash is accepted.  Hex digits are compared
    case-insensitively.

    Parameters
    ----------
    filename : str
        Path to the file.
    sha1_hash : str
        Expected sha1 hash in hexadecimal digits.

    Returns
    -------
    bool
        Whether the file content matches the expected hash.
    """
    sha1 = hashlib.sha1()
    with open(filename, 'rb') as f:
        # Read in 1 MiB chunks so large model files are never fully in memory.
        for chunk in iter(lambda: f.read(1048576), b''):
            sha1.update(chunk)
    actual = sha1.hexdigest()
    # hexdigest() is lower-case; normalise the expected value to match.
    expected = sha1_hash.lower()
    prefix_len = min(len(actual), len(expected))
    return actual[:prefix_len] == expected[:prefix_len]
def download_file(url, path=None, overwrite=False, sha1_hash=None):
    """Download a given URL.

    The payload is streamed to ``<destination>.part`` and moved into place only
    after the transfer finished, so an interrupted download never leaves a
    truncated file at the destination path.

    Parameters
    ----------
    url : str
        URL to download
    path : str, optional
        Destination path to store downloaded file. By default stores to the
        current directory with same name as in url.
    overwrite : bool, optional
        Whether to overwrite destination file if already exists.
    sha1_hash : str, optional
        Expected sha1 hash in hexadecimal digits. Will ignore existing file when hash is specified
        but doesn't match.

    Returns
    -------
    str
        The file path of the downloaded file.

    Raises
    ------
    RuntimeError
        If the server does not answer with HTTP 200.
    UserWarning
        If ``sha1_hash`` is given and the downloaded content does not match it.
    """
    if path is None:
        fname = url.split('/')[-1]
    else:
        path = os.path.expanduser(path)
        if os.path.isdir(path):
            fname = os.path.join(path, url.split('/')[-1])
        else:
            fname = path

    needs_download = (overwrite or not os.path.exists(fname)
                      or (sha1_hash and not check_sha1(fname, sha1_hash)))
    if not needs_download:
        return fname

    dirname = os.path.dirname(os.path.abspath(os.path.expanduser(fname)))
    os.makedirs(dirname, exist_ok=True)

    print('Downloading %s from %s...' % (fname, url), file=sys.stderr)
    part_name = fname + '.part'
    try:
        # (connect, read) timeouts: a stalled server must not hang us forever.
        # The ``with`` block returns the connection to the pool in every case.
        with requests.get(url, stream=True, timeout=(10, 60)) as r:
            if r.status_code != 200:
                raise RuntimeError("Failed downloading url %s" % url)
            total_length = r.headers.get('content-length')
            with open(part_name, 'wb') as f:
                if total_length is None:  # no content length header
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:  # filter out keep-alive new chunks
                            f.write(chunk)
                else:
                    total_length = int(total_length)
                    for chunk in tqdm(r.iter_content(chunk_size=1024),
                                      total=int(total_length / 1024. + 0.5),
                                      unit='KB',
                                      unit_scale=False,
                                      dynamic_ncols=True):
                        f.write(chunk)
        os.replace(part_name, fname)
    finally:
        # Only present if the transfer failed part-way.
        if os.path.exists(part_name):
            os.remove(part_name)

    if sha1_hash and not check_sha1(fname, sha1_hash):
        raise UserWarning('File {} is downloaded but the content hash does not match. ' \
                          'The repo may be outdated or download may be incomplete. ' \
                          'If the "repo_url" is overridden, consider switching to ' \
                          'the default repo.'.format(fname))
    return fname
def download(sub_dir, name, force=False, root='~/.insightface'):
    """Fetch and unpack the model pack ``name`` into ``<root>/<sub_dir>/<name>``.

    An existing target directory is returned as-is unless ``force`` is set.
    Otherwise ``<name>.zip`` is downloaded from ``BASE_REPO_URL`` next to the
    target directory and extracted into it; the zip file is kept.

    Returns the path of the model directory.
    """
    base = os.path.expanduser(root)
    target_dir = os.path.join(base, sub_dir, name)
    if not force and os.path.exists(target_dir):
        return target_dir

    print('download_path:', target_dir, file=sys.stderr)
    archive_path = os.path.join(base, sub_dir, name + '.zip')
    archive_url = "%s/%s.zip" % (BASE_REPO_URL, name)
    download_file(archive_url, path=archive_path, overwrite=True)

    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(target_dir)
    return target_dir
def download_onnx(sub_dir, model_file, force=False, root='~/.insightface', download_zip=False):
    """Fetch a single model file into ``<root>/<sub_dir>/<model_file>``.

    An existing file is returned as-is unless ``force`` is set.  With
    ``download_zip`` the archive ``<model_file>.zip`` is downloaded and
    extracted into the model directory; otherwise the file is downloaded
    directly from ``BASE_REPO_URL``.

    Returns the path of the model file.
    """
    model_root = os.path.join(os.path.expanduser(root), sub_dir)
    new_model_file = os.path.join(model_root, model_file)
    if not force and os.path.exists(new_model_file):
        return new_model_file

    if not os.path.exists(model_root):
        os.makedirs(model_root)
    print('download_path:', new_model_file, file=sys.stderr)

    if download_zip:
        archive_url = "%s/%s.zip" % (BASE_REPO_URL, model_file)
        archive_path = new_model_file + ".zip"
        download_file(archive_url, path=archive_path, overwrite=True)
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(model_root)
    else:
        file_url = "%s/%s" % (BASE_REPO_URL, model_file)
        download_file(file_url, path=new_model_file, overwrite=True)
    return new_model_file
def ensure_available(sub_dir, name, root='~/.insightface'):
    """Return the directory of model pack ``name``, downloading it only if absent."""
    model_dir = download(sub_dir, name, root=root, force=False)
    return model_dir
| # retinaface | |
def distance2bbox(points, distance, max_shape=None):
    """Decode distance prediction to bounding box.

    Args:
        points (ndarray): Shape (n, 2), [x, y].
        distance (ndarray): Shape (n, 4), distance from the given point to 4
            boundaries (left, top, right, bottom).
        max_shape (tuple, optional): (height, width) of the image. When given,
            x coordinates are clipped to [0, width] and y coordinates to
            [0, height].

    Returns:
        ndarray: Decoded bboxes, shape (n, 4) as [x1, y1, x2, y2].
    """
    x1 = points[:, 0] - distance[:, 0]
    y1 = points[:, 1] - distance[:, 1]
    x2 = points[:, 0] + distance[:, 2]
    y2 = points[:, 1] + distance[:, 3]
    if max_shape is not None:
        # np.clip instead of the torch-only ``.clamp``: inputs are NumPy arrays.
        x1 = np.clip(x1, 0, max_shape[1])
        y1 = np.clip(y1, 0, max_shape[0])
        x2 = np.clip(x2, 0, max_shape[1])
        y2 = np.clip(y2, 0, max_shape[0])
    return np.stack([x1, y1, x2, y2], axis=-1)
def distance2kps(points, distance, max_shape=None):
    """Decode offset prediction to keypoint coordinates.

    Args:
        points (ndarray): Shape (n, 2), [x, y].
        distance (ndarray): Shape (n, 2*k), interleaved (dx, dy) offsets of k
            keypoints relative to the given point.
        max_shape (tuple, optional): (height, width) of the image. When given,
            x coordinates are clipped to [0, width] and y coordinates to
            [0, height].

    Returns:
        ndarray: Decoded keypoints, shape (n, 2*k) as [x0, y0, x1, y1, ...].
    """
    preds = []
    for i in range(0, distance.shape[1], 2):
        # ``i`` is always even, so the anchor columns are always x=0 and y=1.
        px = points[:, 0] + distance[:, i]
        py = points[:, 1] + distance[:, i + 1]
        if max_shape is not None:
            # np.clip instead of the torch-only ``.clamp``: inputs are NumPy arrays.
            px = np.clip(px, 0, max_shape[1])
            py = np.clip(py, 0, max_shape[0])
        preds.append(px)
        preds.append(py)
    return np.stack(preds, axis=-1)
class RetinaFace:
    """Face detector wrapping an ONNX Runtime session.

    The number of model outputs selects the head layout: 6 or 9 outputs use
    strides 8/16/32 with two anchors per location, 10 or 15 outputs use
    strides 8/16/32/64/128 with one anchor per location; 9 and 15 additionally
    carry keypoint predictions.
    """

    # number of outputs -> (fmc, feature strides, anchors per location, has keypoints)
    _HEAD_LAYOUTS = {
        6: (3, (8, 16, 32), 2, False),
        9: (3, (8, 16, 32), 2, True),
        10: (5, (8, 16, 32, 64, 128), 1, False),
        15: (5, (8, 16, 32, 64, 128), 1, True),
    }

    def __init__(self, model_file=None, session=None):
        """Wrap ``session``, or create one from ``model_file`` when it is None.

        Raises ``ValueError`` when the model's output count matches no known
        head layout.
        """
        self.model_file = model_file
        self.session = session
        self.taskname = 'detection'
        if self.session is None:
            # Imported only when a session really has to be created.
            import onnxruntime
            assert self.model_file is not None
            assert os.path.exists(self.model_file)
            self.session = onnxruntime.InferenceSession(self.model_file, None)
        self.center_cache = {}
        self.nms_thresh = 0.4
        self.det_thresh = 0.5
        self._init_vars()

    def _init_vars(self):
        """Read input/output metadata from the session and pick the head layout."""
        input_cfg = self.session.get_inputs()[0]
        input_shape = input_cfg.shape
        # A symbolic (string) spatial dimension means the input size is dynamic.
        if isinstance(input_shape[2], str):
            self.input_size = None
        else:
            self.input_size = tuple(input_shape[2:4][::-1])
        self.input_shape = input_shape
        self.input_name = input_cfg.name
        outputs = self.session.get_outputs()
        self.output_names = [o.name for o in outputs]
        self.input_mean = 127.5
        self.input_std = 128.0
        self._anchor_ratio = 1.0

        layout = self._HEAD_LAYOUTS.get(len(outputs))
        if layout is None:
            # Fail here with a clear message instead of an AttributeError on
            # ``self.fmc`` during the first forward pass.
            raise ValueError(
                'unsupported detection model: expected %s outputs, got %d'
                % (sorted(self._HEAD_LAYOUTS), len(outputs)))
        self.fmc, strides, self._num_anchors, self.use_kps = layout
        self._feat_stride_fpn = list(strides)

    def prepare(self, ctx_id, **kwargs):
        """Configure the detector.

        ``ctx_id < 0`` restricts the session to the CPU provider.  Optional
        keyword arguments: ``nms_thresh``, ``det_thresh`` and ``input_size``
        (the latter is ignored with a warning when the model has a fixed
        input size).
        """
        if ctx_id<0:
            self.session.set_providers(['CPUExecutionProvider'])
        nms_thresh = kwargs.get('nms_thresh', None)
        if nms_thresh is not None:
            self.nms_thresh = nms_thresh
        det_thresh = kwargs.get('det_thresh', None)
        if det_thresh is not None:
            self.det_thresh = det_thresh
        input_size = kwargs.get('input_size', None)
        if input_size is not None:
            if self.input_size is not None:
                print('warning: det_size is already set in detection model, ignore', file=sys.stderr)
            else:
                self.input_size = input_size

    def forward(self, img, threshold):
        """Run the network on ``img`` and decode candidates per stride.

        Returns three lists (scores, boxes, keypoints) with one array per
        feature stride, holding only candidates whose score is at least
        ``threshold``.  The keypoint list stays empty when the model has no
        keypoint outputs.
        """
        scores_list = []
        bboxes_list = []
        kpss_list = []
        input_size = tuple(img.shape[0:2][::-1])
        blob = cv2.dnn.blobFromImage(img, 1.0/self.input_std, input_size, (self.input_mean, self.input_mean, self.input_mean), swapRB=True)
        net_outs = self.session.run(self.output_names, {self.input_name : blob})

        input_height = blob.shape[2]
        input_width = blob.shape[3]
        fmc = self.fmc
        for idx, stride in enumerate(self._feat_stride_fpn):
            # Outputs are grouped as: all scores, then all boxes, then all keypoints.
            scores = net_outs[idx]
            bbox_preds = net_outs[idx+fmc] * stride
            if self.use_kps:
                kps_preds = net_outs[idx+fmc*2] * stride
            height = input_height // stride
            width = input_width // stride
            key = (height, width, stride)
            if key in self.center_cache:
                anchor_centers = self.center_cache[key]
            else:
                # Grid of (x, y) cell origins, scaled to input-pixel coordinates.
                anchor_centers = np.stack(np.mgrid[:height, :width][::-1], axis=-1).astype(np.float32)
                anchor_centers = (anchor_centers * stride).reshape( (-1, 2) )
                if self._num_anchors>1:
                    # Repeat each centre once per anchor at that location.
                    anchor_centers = np.stack([anchor_centers]*self._num_anchors, axis=1).reshape( (-1,2) )
                if len(self.center_cache)<100:
                    self.center_cache[key] = anchor_centers

            pos_inds = np.where(scores>=threshold)[0]
            bboxes = distance2bbox(anchor_centers, bbox_preds)
            scores_list.append(scores[pos_inds])
            bboxes_list.append(bboxes[pos_inds])
            if self.use_kps:
                kpss = distance2kps(anchor_centers, kps_preds)
                kpss = kpss.reshape( (kpss.shape[0], -1, 2) )
                kpss_list.append(kpss[pos_inds])
        return scores_list, bboxes_list, kpss_list

    def detect(self, img, input_size = None, max_num=0, metric='default'):
        """Detect faces in ``img``.

        The image is resized with preserved aspect ratio into the top-left
        corner of a zero-filled ``input_size`` canvas, and results are scaled
        back to the original image.

        Returns ``(det, kpss)``: ``det`` has rows ``[x1, y1, x2, y2, score]``
        after NMS; ``kpss`` holds the matching keypoints or is ``None`` when
        the model has no keypoint outputs.  With ``max_num > 0`` only the
        ``max_num`` best boxes are kept, ranked by area (``metric='max'``) or
        by area penalised by squared distance from the image centre.
        """
        assert input_size is not None or self.input_size is not None
        input_size = self.input_size if input_size is None else input_size

        im_ratio = float(img.shape[0]) / img.shape[1]
        model_ratio = float(input_size[1]) / input_size[0]
        if im_ratio>model_ratio:
            new_height = input_size[1]
            new_width = int(new_height / im_ratio)
        else:
            new_width = input_size[0]
            new_height = int(new_width * im_ratio)
        det_scale = float(new_height) / img.shape[0]
        resized_img = cv2.resize(img, (new_width, new_height))
        det_img = np.zeros( (input_size[1], input_size[0], 3), dtype=np.uint8 )
        det_img[:new_height, :new_width, :] = resized_img

        scores_list, bboxes_list, kpss_list = self.forward(det_img, self.det_thresh)

        scores = np.vstack(scores_list)
        scores_ravel = scores.ravel()
        order = scores_ravel.argsort()[::-1]
        bboxes = np.vstack(bboxes_list) / det_scale
        if self.use_kps:
            kpss = np.vstack(kpss_list) / det_scale
        pre_det = np.hstack((bboxes, scores)).astype(np.float32, copy=False)
        pre_det = pre_det[order, :]
        keep = self.nms(pre_det)
        det = pre_det[keep, :]
        if self.use_kps:
            kpss = kpss[order,:,:]
            kpss = kpss[keep,:,:]
        else:
            kpss = None

        if max_num > 0 and det.shape[0] > max_num:
            area = (det[:, 2] - det[:, 0]) * (det[:, 3] - det[:, 1])
            img_center = img.shape[0] // 2, img.shape[1] // 2
            offsets = np.vstack([
                (det[:, 0] + det[:, 2]) / 2 - img_center[1],
                (det[:, 1] + det[:, 3]) / 2 - img_center[0]
            ])
            offset_dist_squared = np.sum(np.power(offsets, 2.0), 0)
            if metric=='max':
                values = area
            else:
                values = area - offset_dist_squared * 2.0  # some extra weight on the centering
            bindex = np.argsort(values)[::-1]
            bindex = bindex[0:max_num]
            det = det[bindex, :]
            if kpss is not None:
                kpss = kpss[bindex, :]
        return det, kpss

    def nms(self, dets):
        """Greedy non-maximum suppression.

        ``dets`` has rows ``[x1, y1, x2, y2, score]``.  Returns the indices of
        the kept rows in descending score order; a box is dropped when its IoU
        with an already kept box exceeds ``self.nms_thresh``.
        """
        thresh = self.nms_thresh
        x1 = dets[:, 0]
        y1 = dets[:, 1]
        x2 = dets[:, 2]
        y2 = dets[:, 3]
        scores = dets[:, 4]

        areas = (x2 - x1 + 1) * (y2 - y1 + 1)
        order = scores.argsort()[::-1]

        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)
            rest = order[1:]
            xx1 = np.maximum(x1[i], x1[rest])
            yy1 = np.maximum(y1[i], y1[rest])
            xx2 = np.minimum(x2[i], x2[rest])
            yy2 = np.minimum(y2[i], y2[rest])

            w = np.maximum(0.0, xx2 - xx1 + 1)
            h = np.maximum(0.0, yy2 - yy1 + 1)
            inter = w * h
            ovr = inter / (areas[i] + areas[rest] - inter)

            inds = np.where(ovr <= thresh)[0]
            # ``inds`` indexes ``rest``; shift by one to index ``order``.
            order = order[inds + 1]
        return keep
| # arcface | |
| # Five reference (x, y) landmark positions used by estimate_norm() as the | |
| # destination points of the alignment transform. Coordinates are for a | |
| # 112x112 crop; estimate_norm() rescales them for other output sizes. | |
| arcface_dst = np.array( | |
| [[38.2946, 51.6963], [73.5318, 51.5014], [56.0252, 71.7366], | |
| [41.5493, 92.3655], [70.7299, 92.2041]], | |
| dtype=np.float32) | |
def estimate_norm(lmk, image_size=112,mode='arcface'):
    """Estimate the 2x3 similarity matrix mapping ``lmk`` onto the reference landmarks.

    ``lmk`` must have shape (5, 2) and ``image_size`` must be a multiple of
    112 or of 128.  For multiples of 128 the reference x coordinates are
    shifted by ``8 * ratio``.  ``mode`` is accepted but not used.
    """
    assert lmk.shape == (5, 2)
    assert image_size%112==0 or image_size%128==0
    multiple_of_112 = image_size % 112 == 0
    base = 112.0 if multiple_of_112 else 128.0
    ratio = float(image_size) / base
    x_shift = 0 if multiple_of_112 else 8.0 * ratio

    # Multiplication creates a new array, so the module-level template is untouched.
    reference = arcface_dst * ratio
    reference[:, 0] += x_shift

    similarity = trans.SimilarityTransform()
    similarity.estimate(lmk, reference)
    return similarity.params[0:2, :]
def norm_crop(img, landmark, image_size=112, mode='arcface'):
    """Return the ``image_size`` x ``image_size`` crop of ``img`` aligned by ``landmark``."""
    affine = estimate_norm(landmark, image_size, mode)
    out_size = (image_size, image_size)
    return cv2.warpAffine(img, affine, out_size, borderValue=0.0)
def norm_crop2(img, landmark, image_size=112, mode='arcface'):
    """Like ``norm_crop`` but also return the 2x3 matrix used for the warp."""
    affine = estimate_norm(landmark, image_size, mode)
    out_size = (image_size, image_size)
    aligned = cv2.warpAffine(img, affine, out_size, borderValue=0.0)
    return aligned, affine
class ArcFaceONNX:
    """Face embedding model wrapping an ONNX Runtime session.

    Input normalisation is chosen by inspecting the names of the first eight
    graph nodes: when both a subtract-like and a multiply-like node are found
    the image is fed unnormalised (mean 0, std 1), otherwise mean and std are
    both 127.5.
    """

    def __init__(self, model_file=None, session=None):
        assert model_file is not None
        self.model_file = model_file
        self.session = session
        self.taskname = 'recognition'

        leading_names = [node.name for node in onnx.load(self.model_file).graph.node[:8]]
        has_sub = any(n.startswith('Sub') or n.startswith('_minus') for n in leading_names)
        has_mul = any(n.startswith('Mul') or n.startswith('_mul') for n in leading_names)
        if has_sub and has_mul:
            # mxnet arcface model
            self.input_mean, self.input_std = 0.0, 1.0
        else:
            self.input_mean, self.input_std = 127.5, 127.5

        if self.session is None:
            self.session = onnxruntime.InferenceSession(self.model_file, None)

        first_input = self.session.get_inputs()[0]
        self.input_shape = first_input.shape
        self.input_size = tuple(self.input_shape[2:4][::-1])
        self.input_name = first_input.name

        model_outputs = self.session.get_outputs()
        self.output_names = [out.name for out in model_outputs]
        assert len(self.output_names)==1
        self.output_shape = model_outputs[0].shape

    def prepare(self, ctx_id, **kwargs):
        """Restrict the session to the CPU provider when ``ctx_id`` is negative."""
        if ctx_id >= 0:
            return
        self.session.set_providers(['CPUExecutionProvider'])

    def get(self, img, face):
        """Align ``face`` in ``img`` by ``face.kps``, store and return its embedding."""
        aligned = norm_crop(img, landmark=face.kps, image_size=self.input_size[0])
        face.embedding = self.get_feat(aligned).flatten()
        return face.embedding

    def compute_sim(self, feat1, feat2):
        """Cosine similarity of two embeddings (flattened before comparison)."""
        a = feat1.ravel()
        b = feat2.ravel()
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    def get_feat(self, imgs):
        """Return the network output for one aligned image or a list of them."""
        batch = imgs if isinstance(imgs, list) else [imgs]
        mean = (self.input_mean, self.input_mean, self.input_mean)
        blob = cv2.dnn.blobFromImages(batch, 1.0 / self.input_std, self.input_size,
                                      mean, swapRB=True)
        return self.session.run(self.output_names, {self.input_name: blob})[0]

    def forward(self, batch_data):
        """Normalise an already prepared batch and return the network output."""
        normalised = (batch_data - self.input_mean) / self.input_std
        return self.session.run(self.output_names, {self.input_name: normalised})[0]
| # genderage | |
def transform(data, center, output_size, scale, rotation):
    """Crop a square ``output_size`` patch of ``data`` around ``center``.

    The image is scaled by ``scale``, shifted so that ``center`` lands on the
    origin, rotated by ``rotation`` degrees and finally shifted to the middle
    of the output patch.

    Returns ``(cropped, M)`` where ``M`` is the 2x3 matrix passed to
    ``cv2.warpAffine``.
    """
    angle = float(rotation) * np.pi / 180.0
    half = output_size / 2
    scaled_cx = center[0] * scale
    scaled_cy = center[1] * scale

    to_scale = trans.SimilarityTransform(scale=scale)
    to_origin = trans.SimilarityTransform(translation=(-1 * scaled_cx, -1 * scaled_cy))
    to_rotated = trans.SimilarityTransform(rotation=angle)
    to_patch_center = trans.SimilarityTransform(translation=(half, half))
    combined = to_scale + to_origin + to_rotated + to_patch_center

    M = combined.params[0:2]
    cropped = cv2.warpAffine(data, M, (output_size, output_size), borderValue=0.0)
    return cropped, M
class Attribute:
    """Face attribute model wrapping an ONNX Runtime session.

    A model with three output values is treated as the gender/age model
    (``taskname == 'genderage'``); any other output width ``n`` gets the task
    name ``'attribute_<n>'``.
    """

    def __init__(self, model_file=None, session=None):
        assert model_file is not None
        self.model_file = model_file
        self.session = session

        # Decide the input normalisation from the names of the first graph nodes.
        saw_sub = False
        saw_mul = False
        leading_nodes = onnx.load(self.model_file).graph.node[:8]
        for position, node in enumerate(leading_nodes):
            node_name = node.name
            if node_name.startswith('Sub') or node_name.startswith('_minus'):
                saw_sub = True
            if node_name.startswith('Mul') or node_name.startswith('_mul'):
                saw_mul = True
            if position < 3 and node_name == 'bn_data':
                saw_sub = True
                saw_mul = True
        if saw_sub and saw_mul:
            # mxnet arcface model
            self.input_mean, self.input_std = 0.0, 1.0
        else:
            self.input_mean, self.input_std = 127.5, 128.0

        if self.session is None:
            self.session = onnxruntime.InferenceSession(self.model_file, None)

        first_input = self.session.get_inputs()[0]
        self.input_shape = first_input.shape
        self.input_size = tuple(self.input_shape[2:4][::-1])
        self.input_name = first_input.name

        model_outputs = self.session.get_outputs()
        self.output_names = [out.name for out in model_outputs]
        assert len(self.output_names)==1

        output_width = model_outputs[0].shape[1]
        if output_width == 3:
            self.taskname = 'genderage'
        else:
            self.taskname = 'attribute_%d'%output_width

    def prepare(self, ctx_id, **kwargs):
        """Restrict the session to the CPU provider when ``ctx_id`` is negative."""
        if ctx_id >= 0:
            return
        self.session.set_providers(['CPUExecutionProvider'])

    def get(self, img, face):
        """Run the model on a crop around ``face.bbox``.

        The crop is centred on the box and sized so that 1.5x the longer box
        side fills the model input.  For the gender/age model the results are
        stored in ``face['gender']`` / ``face['age']`` and returned as
        ``(gender, age)``; otherwise the raw prediction vector is returned.
        """
        x1, y1, x2, y2 = face.bbox[0], face.bbox[1], face.bbox[2], face.bbox[3]
        box_w = x2 - x1
        box_h = y2 - y1
        center = (x2 + x1) / 2, (y2 + y1) / 2
        side = self.input_size[0]
        crop_scale = side / (max(box_w, box_h)*1.5)

        aimg, M = transform(img, center, side, crop_scale, 0)
        crop_size = tuple(aimg.shape[0:2][::-1])
        mean = (self.input_mean, self.input_mean, self.input_mean)
        blob = cv2.dnn.blobFromImage(aimg, 1.0/self.input_std, crop_size, mean, swapRB=True)
        pred = self.session.run(self.output_names, {self.input_name : blob})[0][0]

        if self.taskname != 'genderage':
            return pred
        assert len(pred)==3
        gender = np.argmax(pred[:2])
        age = int(np.round(pred[2]*100))
        face['gender'] = gender
        face['age'] = age
        return gender, age
| # landmark | |
def trans_points2d(pts, M):
    """Apply the affine matrix ``M`` to 2-D points.

    Args:
        pts (ndarray): Shape (n, 2), [x, y].
        M (array-like): Affine matrix; its first two rows are used, each as
            [a, b, t] so that ``x' = a*x + b*y + t``.

    Returns:
        ndarray: float32 array of shape (n, 2) with the transformed points.
    """
    # Homogeneous coordinates [x, y, 1] for all points at once; one matrix
    # product replaces the per-point Python loop.
    ones = np.ones(pts.shape[0], dtype=np.float32)
    homogeneous = np.column_stack((pts[:, 0], pts[:, 1], ones)).astype(np.float32)
    transformed = homogeneous @ np.asarray(M).T
    return transformed[:, 0:2].astype(np.float32)
def trans_points3d(pts, M):
    """Apply the affine matrix ``M`` to the x/y part of 3-D points.

    The z coordinate is not moved by ``M``; it is multiplied by the scale
    factor taken from the first row of ``M``
    (``sqrt(M[0][0]**2 + M[0][1]**2)``).

    Args:
        pts (ndarray): Shape (n, 3), [x, y, z].
        M (array-like): Affine matrix; its first two rows are used.

    Returns:
        ndarray: float32 array of shape (n, 3) with the transformed points.
    """
    M = np.asarray(M)
    scale = np.sqrt(M[0][0] * M[0][0] + M[0][1] * M[0][1])
    # One matrix product for all points instead of a per-point Python loop.
    ones = np.ones(pts.shape[0], dtype=np.float32)
    homogeneous = np.column_stack((pts[:, 0], pts[:, 1], ones)).astype(np.float32)
    new_pts = np.zeros(shape=pts.shape, dtype=np.float32)
    new_pts[:, 0:2] = (homogeneous @ M.T)[:, 0:2]
    new_pts[:, 2] = pts[:, 2] * scale
    return new_pts
def trans_points(pts, M):
    """Transform points with ``M``, treating two-column input as 2-D and anything else as 3-D."""
    handler = trans_points2d if pts.shape[1] == 2 else trans_points3d
    return handler(pts, M)
def get_object(name):
    """Load a pickled object from the ``objects`` directory next to this file.

    ``.pkl`` is appended to ``name`` when missing.  Returns ``None`` when the
    file does not exist.
    """
    filename = name if name.endswith('.pkl') else name + ".pkl"
    candidate = Path(__file__).parent.absolute() / 'objects' / filename
    if not candidate.exists():
        return None
    # NOTE: pickle executes code on load; only ship trusted files in ``objects``.
    with open(candidate, 'rb') as handle:
        return pickle.load(handle)
class Landmark:
    """Facial landmark model wrapping an ONNX Runtime session.

    A model whose output width is 3309 is treated as the 68-point 3-D model,
    for which a head pose is additionally estimated; every other model is
    treated as a 2-D landmark model with ``output_width // 2`` points.
    """

    def __init__(self, model_file=None, session=None):
        assert model_file is not None
        self.model_file = model_file
        self.session = session
        find_sub = False
        find_mul = False
        model = onnx.load(self.model_file)
        graph = model.graph
        # Decide the input normalisation from the names of the first graph nodes.
        for nid, node in enumerate(graph.node[:8]):
            if node.name.startswith('Sub') or node.name.startswith('_minus'):
                find_sub = True
            if node.name.startswith('Mul') or node.name.startswith('_mul'):
                find_mul = True
            if nid<3 and node.name=='bn_data':
                find_sub = True
                find_mul = True
        if find_sub and find_mul:
            # mxnet arcface model
            input_mean = 0.0
            input_std = 1.0
        else:
            input_mean = 127.5
            input_std = 128.0
        self.input_mean = input_mean
        self.input_std = input_std
        if self.session is None:
            self.session = onnxruntime.InferenceSession(self.model_file, None)
        input_cfg = self.session.get_inputs()[0]
        input_shape = input_cfg.shape
        self.input_size = tuple(input_shape[2:4][::-1])
        self.input_shape = input_shape
        outputs = self.session.get_outputs()
        self.input_name = input_cfg.name
        self.output_names = [out.name for out in outputs]
        assert len(self.output_names)==1
        output_shape = outputs[0].shape
        self.require_pose = False
        if output_shape[1]==3309:
            self.lmk_dim = 3
            self.lmk_num = 68
            self.mean_lmk = get_object('meanshape_68.pkl')
            # Pose needs the mean shape; without it, skip pose instead of
            # crashing on the first call to get().
            self.require_pose = self.mean_lmk is not None
            if not self.require_pose:
                print('warning: meanshape_68.pkl not found, pose estimation disabled', file=sys.stderr)
        else:
            self.lmk_dim = 2
            self.lmk_num = output_shape[1]//self.lmk_dim
        self.taskname = 'landmark_%dd_%d'%(self.lmk_dim, self.lmk_num)

    # The three helpers below replace the ``transform.<name>`` lookups of the
    # original code, which resolved to the module-level ``transform`` function
    # in this file and therefore raised AttributeError.
    # NOTE(review): intended to match the helpers of the same names in
    # insightface's utils.transform module - confirm against upstream.

    @staticmethod
    def _estimate_affine_matrix_3d23d(X, Y):
        """Least-squares 3x4 affine matrix ``P`` with ``Y ~= P @ [X; 1]``.

        ``X`` and ``Y`` are (n, 3) arrays of corresponding 3-D points.
        """
        X_homo = np.hstack((X, np.ones([X.shape[0], 1])))  # n x 4
        return np.linalg.lstsq(X_homo, Y, rcond=None)[0].T  # 3 x 4

    @staticmethod
    def _p2srt(P):
        """Split a 3x4 affine matrix into scale, 3x3 rotation and translation."""
        t = P[:, 3]
        R1 = P[0:1, :3]
        R2 = P[1:2, :3]
        s = (np.linalg.norm(R1) + np.linalg.norm(R2)) / 2.0
        r1 = R1 / np.linalg.norm(R1)
        r2 = R2 / np.linalg.norm(R2)
        # Third axis from the cross product of the two normalised rows.
        r3 = np.cross(r1, r2)
        R = np.concatenate((r1, r2, r3), 0)
        return s, R, t

    @staticmethod
    def _matrix2angle(R):
        """Euler angles in degrees from a 3x3 rotation matrix, as (x, y, z)."""
        sy = np.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
        if sy >= 1e-6:
            x = np.arctan2(R[2, 1], R[2, 2])
            y = np.arctan2(-R[2, 0], sy)
            z = np.arctan2(R[1, 0], R[0, 0])
        else:
            # Degenerate case: fix z and recover x from the middle rows.
            x = np.arctan2(-R[1, 2], R[1, 1])
            y = np.arctan2(-R[2, 0], sy)
            z = 0
        return x * 180 / np.pi, y * 180 / np.pi, z * 180 / np.pi

    def prepare(self, ctx_id, **kwargs):
        """Restrict the session to the CPU provider when ``ctx_id`` is negative."""
        if ctx_id<0:
            self.session.set_providers(['CPUExecutionProvider'])

    def get(self, img, face):
        """Predict landmarks for ``face`` and store them in ``face[self.taskname]``.

        The model runs on a crop centred on ``face.bbox`` (1.5x the longer box
        side fills the model input); predictions are mapped back to ``img``
        coordinates.  When pose estimation is enabled, ``face['pose']`` is set
        to the float32 array ``[rx, ry, rz]``.

        Returns the landmark array.
        """
        bbox = face.bbox
        w, h = (bbox[2] - bbox[0]), (bbox[3] - bbox[1])
        center = (bbox[2] + bbox[0]) / 2, (bbox[3] + bbox[1]) / 2
        rotate = 0
        _scale = self.input_size[0] / (max(w, h)*1.5)
        aimg, M = transform(img, center, self.input_size[0], _scale, rotate)
        input_size = tuple(aimg.shape[0:2][::-1])
        blob = cv2.dnn.blobFromImage(aimg, 1.0/self.input_std, input_size, (self.input_mean, self.input_mean, self.input_mean), swapRB=True)
        pred = self.session.run(self.output_names, {self.input_name : blob})[0][0]
        if pred.shape[0] >= 3000:
            pred = pred.reshape((-1, 3))
        else:
            pred = pred.reshape((-1, 2))
        if self.lmk_num < pred.shape[0]:
            # Only the trailing ``lmk_num`` rows are landmarks.
            pred = pred[self.lmk_num*-1:,:]
        # Map predictions to crop pixels: shift by +1, then scale by half the input size.
        pred[:, 0:2] += 1
        pred[:, 0:2] *= (self.input_size[0] // 2)
        if pred.shape[1] == 3:
            pred[:, 2] *= (self.input_size[0] // 2)

        # Undo the crop transform to get coordinates in the original image.
        IM = cv2.invertAffineTransform(M)
        pred = trans_points(pred, IM)
        face[self.taskname] = pred
        if self.require_pose:
            P = self._estimate_affine_matrix_3d23d(self.mean_lmk, pred)
            s, R, t = self._p2srt(P)
            rx, ry, rz = self._matrix2angle(R)
            pose = np.array( [rx, ry, rz], dtype=np.float32 )
            face['pose'] = pose #pitch, yaw, roll
        return pred
| # inswapper | |
class INSwapper():
    """Face swapping model wrapping an ONNX Runtime session.

    The last initializer of the ONNX graph is loaded as ``emap``, the matrix
    applied to the source embedding before it is fed to the network.
    """

    def __init__(self, model_file=None, session=None):
        self.model_file = model_file
        self.session = session
        model = onnx.load(self.model_file)
        graph = model.graph
        self.emap = numpy_helper.to_array(graph.initializer[-1])
        self.input_mean = 0.0
        self.input_std = 255.0
        if self.session is None:
            self.session = onnxruntime.InferenceSession(self.model_file, None)
        inputs = self.session.get_inputs()
        self.input_names = [inp.name for inp in inputs]
        outputs = self.session.get_outputs()
        self.output_names = [out.name for out in outputs]
        assert len(self.output_names)==1
        input_shape = inputs[0].shape
        self.input_shape = input_shape
        print('inswapper-shape:', self.input_shape, file=sys.stderr)
        self.input_size = tuple(input_shape[2:4][::-1])

    def forward(self, img, latent):
        """Normalise an already prepared image batch and run it with ``latent``."""
        img = (img - self.input_mean) / self.input_std
        pred = self.session.run(self.output_names, {self.input_names[0]: img, self.input_names[1]: latent})[0]
        return pred

    def get(self, img, target_face, source_face, paste_back=True):
        """Swap the identity of ``source_face`` onto ``target_face`` in ``img``.

        With ``paste_back=False`` returns ``(bgr_fake, M)``: the generated
        aligned face crop and the 2x3 alignment matrix.  Otherwise returns a
        uint8 copy of ``img`` with the generated face blended in through an
        eroded, blurred mask of the crop region.
        """
        aimg, M = norm_crop2(img, target_face.kps, self.input_size[0])
        blob = cv2.dnn.blobFromImage(aimg, 1.0 / self.input_std, self.input_size,
                                     (self.input_mean, self.input_mean, self.input_mean), swapRB=True)
        latent = source_face.normed_embedding.reshape((1,-1))
        latent = np.dot(latent, self.emap)
        latent /= np.linalg.norm(latent)
        pred = self.session.run(self.output_names, {self.input_names[0]: blob, self.input_names[1]: latent})[0]
        img_fake = pred.transpose((0,2,3,1))[0]
        bgr_fake = np.clip(255 * img_fake, 0, 255).astype(np.uint8)[:,:,::-1]
        if not paste_back:
            return bgr_fake, M

        # NOTE: the original also built a ``fake_diff`` mask (abs difference,
        # warp, dilate, blur) that never reached the result; it is omitted.
        target_img = img
        full_size = (target_img.shape[1], target_img.shape[0])
        IM = cv2.invertAffineTransform(M)
        img_white = np.full((aimg.shape[0],aimg.shape[1]), 255, dtype=np.float32)
        bgr_fake = cv2.warpAffine(bgr_fake, IM, full_size, borderValue=0.0)
        img_white = cv2.warpAffine(img_white, IM, full_size, borderValue=0.0)
        # Binarise the warped crop footprint.
        img_white[img_white>20] = 255
        img_mask = img_white

        # Size erosion and blur relative to the footprint's extent.
        mask_h_inds, mask_w_inds = np.where(img_mask==255)
        mask_h = np.max(mask_h_inds) - np.min(mask_h_inds)
        mask_w = np.max(mask_w_inds) - np.min(mask_w_inds)
        mask_size = int(np.sqrt(mask_h*mask_w))

        k = max(mask_size//10, 10)
        kernel = np.ones((k,k),np.uint8)
        img_mask = cv2.erode(img_mask,kernel,iterations = 1)

        k = max(mask_size//20, 5)
        blur_size = (2*k+1, 2*k+1)
        img_mask = cv2.GaussianBlur(img_mask, blur_size, 0)
        img_mask /= 255

        img_mask = np.reshape(img_mask, [img_mask.shape[0],img_mask.shape[1],1])
        fake_merged = img_mask * bgr_fake + (1-img_mask) * target_img.astype(np.float32)
        fake_merged = fake_merged.astype(np.uint8)
        return fake_merged
| # model_zoo | |
class PickableInferenceSession(onnxruntime.InferenceSession):
    """``onnxruntime.InferenceSession`` wrapper that can be pickled.

    The native session itself is not serialised. The pickled state holds the
    model path plus the execution-provider configuration, and the session is
    rebuilt from them when the object is unpickled.
    """

    # Constructor keyword arguments that are plain data and therefore safe to
    # carry through pickle; anything else (e.g. session option objects) is
    # deliberately left out of the pickled state.
    _PICKLED_KWARGS = ('providers', 'provider_options')

    def __init__(self, model_path, **kwargs):
        """Create the session and remember how to recreate it.

        Args:
            model_path: Path of the ONNX model, forwarded to the base class.
            **kwargs: Forwarded unchanged to ``onnxruntime.InferenceSession``.
        """
        super().__init__(model_path, **kwargs)
        self.model_path = model_path
        self._init_kwargs = {key: kwargs[key] for key in self._PICKLED_KWARGS if key in kwargs}

    def __getstate__(self):
        """Return the minimal state needed to rebuild the session."""
        return {
            'model_path': self.model_path,
            'init_kwargs': dict(getattr(self, '_init_kwargs', {})),
        }

    def __setstate__(self, values):
        """Rebuild the session from pickled state.

        States written before ``init_kwargs`` existed only contain
        ``model_path``; they are still accepted and rebuilt without extra
        keyword arguments, exactly as before.
        """
        self.__init__(values['model_path'], **values.get('init_kwargs', {}))
class ModelRouter:
    """Pick the wrapper class for an ONNX file from its input/output signature."""

    def __init__(self, onnx_file):
        """Remember the path of the ONNX file to be routed."""
        self.onnx_file = onnx_file

    @staticmethod
    def _classify(num_inputs, num_outputs, input_shape):
        """Return the routing key for a model signature, or ``None``.

        Args:
            num_inputs: Number of model inputs.
            num_outputs: Number of model outputs.
            input_shape: Shape of the first input; entries may be ints, or
                strings/``None`` for symbolic (dynamic) dimensions.

        Returns:
            One of ``'detection'``, ``'landmark'``, ``'attribute'``,
            ``'swapper'``, ``'recognition'``, or ``None`` when nothing matches.
        """
        # Models with at least five outputs are routed before the input
        # shape is inspected at all, so dynamic spatial dims are fine here.
        if num_outputs >= 5:
            return 'detection'
        if len(input_shape) < 4:
            return None
        height, width = input_shape[2], input_shape[3]
        # Symbolic dims cannot match any fixed-size rule, and comparing them
        # with ``>=`` would raise TypeError.
        if not (isinstance(height, int) and isinstance(width, int)):
            return None
        if height == 192 and width == 192:
            return 'landmark'
        if height == 96 and width == 96:
            return 'attribute'
        if num_inputs == 2 and height == 128 and width == 128:
            return 'swapper'
        if height == width and height >= 112 and height % 16 == 0:
            return 'recognition'
        return None

    def get_model(self, **kwargs):
        """Open an inference session and wrap it in the matching model class.

        Args:
            **kwargs: Forwarded to ``PickableInferenceSession``.

        Returns:
            An instance of ``RetinaFace``, ``Landmark``, ``Attribute``,
            ``INSwapper`` or ``ArcFaceONNX``, or ``None`` when the model
            signature matches none of the routing rules.
        """
        session = PickableInferenceSession(self.onnx_file, **kwargs)
        print(f'Applied providers: {session.get_providers()}, with options: {session.get_provider_options()}', file=sys.stderr)
        inputs = session.get_inputs()
        outputs = session.get_outputs()
        kind = self._classify(len(inputs), len(outputs), inputs[0].shape)
        # Class names are looked up lazily, branch by branch, so only the
        # wrapper that is actually selected needs to exist.
        if kind == 'detection':
            return RetinaFace(model_file=self.onnx_file, session=session)
        if kind == 'landmark':
            return Landmark(model_file=self.onnx_file, session=session)
        if kind == 'attribute':
            return Attribute(model_file=self.onnx_file, session=session)
        if kind == 'swapper':
            return INSwapper(model_file=self.onnx_file, session=session)
        if kind == 'recognition':
            return ArcFaceONNX(model_file=self.onnx_file, session=session)
        return None
def find_onnx_file(dir_path):
    """Return the last ``*.onnx`` path (in sorted order) directly inside ``dir_path``.

    Args:
        dir_path: Directory to search; sub-directories are not visited.

    Returns:
        The matching path that sorts last, or ``None`` when the directory
        does not exist or holds no ``.onnx`` file.
    """
    if not os.path.exists(dir_path):
        return None
    # Escape the directory part so characters such as "[" or "*" in its name
    # are matched literally instead of being read as glob syntax.
    pattern = "%s/*.onnx" % glob.escape(str(dir_path))
    paths = sorted(glob.glob(pattern))
    if not paths:
        return None
    return paths[-1]
def get_default_providers():
    """Return a new list with the default execution provider names.

    The CUDA provider is listed before the CPU provider.
    """
    default_names = ('CUDAExecutionProvider', 'CPUExecutionProvider')
    return list(default_names)
def get_default_provider_options():
    """Return the default provider options, which is ``None`` (no options)."""
    default_options = None
    return default_options
def get_model(name, **kwargs):
    """Load a single model by pack name or by ``.onnx`` path.

    Args:
        name: Either a path ending in ``.onnx``, or a directory name that is
            looked up under ``<root>/models``.
        **kwargs: Recognised keys are ``root`` (default ``'~/.insightface'``),
            ``download`` and ``download_zip`` (both default ``False``),
            ``providers`` and ``provider_options`` (defaulting to
            ``get_default_providers()`` / ``get_default_provider_options()``).

    Returns:
        Whatever ``ModelRouter.get_model`` returns for the resolved file, or
        ``None`` when ``name`` is a directory name without any ``.onnx`` file.

    Raises:
        AssertionError: If the resolved model file does not exist or is not a
            regular file.
    """
    root = os.path.expanduser(kwargs.get('root', '~/.insightface'))
    model_root = os.path.join(root, 'models')
    allow_download = kwargs.get('download', False)
    download_zip = kwargs.get('download_zip', False)

    if name.endswith('.onnx'):
        model_file = name
    else:
        model_file = find_onnx_file(os.path.join(model_root, name))
        if model_file is None:
            return None

    # Fetch the file only when the caller opted in and it is actually missing.
    if allow_download and not os.path.exists(model_file):
        model_file = download_onnx('models', model_file, root=root, download_zip=download_zip)
    assert os.path.exists(model_file), 'model_file %s should exist'%model_file
    assert os.path.isfile(model_file), 'model_file %s should be a file'%model_file

    session_kwargs = {
        'providers': kwargs.get('providers', get_default_providers()),
        'provider_options': kwargs.get('provider_options', get_default_provider_options()),
    }
    return ModelRouter(model_file).get_model(**session_kwargs)
class FaceAnalysis:
    """Load the ONNX models of a model pack and run them on images.

    Models are stored in ``self.models`` keyed by their ``taskname``; the
    ``'detection'`` entry is mandatory and is also exposed as
    ``self.det_model``.
    """

    def __init__(self, name=DEFAULT_MP_NAME, root='~/.insightface', allowed_modules=ALLOWED_MODULES, **kwargs):
        """Discover and load every usable ``*.onnx`` file of the model pack.

        Args:
            name: Model pack name, resolved through ``ensure_available``.
            root: Root directory handed to ``ensure_available``.
            allowed_modules: Task names to keep; ``None`` keeps every task.
            **kwargs: Forwarded to ``get_model`` for each file.

        Raises:
            AssertionError: If no detection model ends up loaded.
        """
        # TODO: make optional
        onnxruntime.set_default_logger_severity(3)
        self.models = {}
        self.model_dir = ensure_available('models', name, root=root)
        for onnx_file in sorted(glob.glob(os.path.join(self.model_dir, '*.onnx'))):
            model = get_model(onnx_file, **kwargs)
            if model is None:
                print('model not recognized:', onnx_file, file=sys.stderr)
                continue
            if isinstance(model, INSwapper):
                # NOTE: face_swap model is not supposed to be autoloaded in FaceAnalysis.
                print('model ignore:', onnx_file, "face_swap", file=sys.stderr)
                continue
            taskname = model.taskname
            if allowed_modules is not None and taskname not in allowed_modules:
                print('model ignore:', onnx_file, taskname, file=sys.stderr)
                continue
            if taskname in self.models:
                # Files are visited in sorted order; the first model seen for
                # a task wins and later ones are dropped.
                print('duplicated model task type, ignore:', onnx_file, taskname, file=sys.stderr)
                continue
            print('find model:', onnx_file, taskname, model.input_shape, model.input_mean, model.input_std, file=sys.stderr)
            self.models[taskname] = model
        assert 'detection' in self.models
        self.det_model = self.models['detection']

    def prepare(self, ctx_id, det_thresh=0.5, det_size=(640, 640)):
        """Call ``prepare`` on every loaded model.

        The detection model additionally receives ``input_size=det_size`` and
        ``det_thresh``; all other models only receive ``ctx_id``.
        """
        self.det_thresh = det_thresh
        assert det_size is not None
        print('set det-size:', det_size, file=sys.stderr)
        self.det_size = det_size
        for taskname, model in self.models.items():
            if taskname != 'detection':
                model.prepare(ctx_id)
            else:
                model.prepare(ctx_id, input_size=det_size, det_thresh=det_thresh)

    def get(self, img, max_num=0):
        """Detect faces in ``img`` and run every non-detection model on each.

        Returns:
            A list of ``Face`` objects, one per detection, each created with
            ``bbox``, ``kps`` and ``det_score`` and then handed to every
            non-detection model's ``get``. Empty when nothing is detected.
        """
        bboxes, kpss = self.det_model.detect(img, max_num=max_num, metric='default')
        if bboxes.shape[0] == 0:
            return []
        faces = []
        for idx, row in enumerate(bboxes):
            # Each detection row carries the box in columns 0..3 and the
            # score in column 4.
            keypoints = kpss[idx] if kpss is not None else None
            face = Face(bbox=row[0:4], kps=keypoints, det_score=row[4])
            for taskname, model in self.models.items():
                if taskname != 'detection':
                    model.get(img, face)
            faces.append(face)
        return faces

    def draw_on(self, img, faces):
        """Return a copy of ``img`` with boxes, keypoints and labels drawn on it.

        Keypoints 0 and 3 are drawn in green and the others in red; a
        ``sex,age`` label is added when both ``gender`` and ``age`` are set.
        """
        red, green = (0, 0, 255), (0, 255, 0)
        dimg = img.copy()
        for face in faces:
            box = face.bbox.astype(int)
            cv2.rectangle(dimg, (box[0], box[1]), (box[2], box[3]), red, 2)
            if face.kps is not None:
                for idx, point in enumerate(face.kps.astype(int)):
                    color = green if idx in (0, 3) else red
                    cv2.circle(dimg, (point[0], point[1]), 1, color, 2)
            if face.gender is not None and face.age is not None:
                label = '%s,%d'%(face.sex,face.age)
                cv2.putText(dimg, label, (box[0]-1, box[1]-4), cv2.FONT_HERSHEY_COMPLEX, 0.7, green, 1)
        return dimg
class FaceAnalysis2(FaceAnalysis):
    """FaceAnalysis variant whose detector input size can be set on every call."""

    def get(self, img, max_num=0, det_size=(640, 640)):
        """Detect and analyse faces, optionally overriding the detector input size.

        When ``det_size`` is not ``None`` it is assigned to
        ``self.det_model.input_size`` before detection; the assignment stays
        in effect for later calls. Passing ``None`` leaves the current size
        untouched.
        """
        override_requested = det_size is not None
        if override_requested:
            self.det_model.input_size = det_size
        faces = super().get(img, max_num)
        return faces
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment