顔認識ライブラリ「InsightFace」で顔認証と実用的なAI開発の基礎を学ぶ (2)

前回のおさらい

前回のセッション セッション(1)はこちら

前回はFaceAnalysisクラスを使ったサンプルコードでサクッと顔認証を実装しました。驚くほど簡単に顔認証が行えましたね。それからInsightFaceライブラリのgithubページから、python実装コードがあるページを開きFaceAnalysisクラスのコードを確認しました。

InsightFaceの公式レポジトリ

InsightFaceの公式リポジトリの該当ページはこちら
Tips

このpython実装コードはinsightfaceライブラリをpipインストールされた際に、お使いのpython環境に全く同じものがコピーされています。github上でコードを追いかけるのは大変ですので、お使いのPC内で該当ディレクトリをVSCodeなどの便利なコードエディタで開き、簡単にコードを追えるようにしておきましょう

また、サンプルコードを使用するとonnx拡張子のファイルが自動的にダウンロードされることも確認しました。ダウンロードされるファイルは以下の5つでしたね。

ダウンロードされたonnxファイル

  1. 1k3d69.onnx
  2. 2d106det.onnx
  3. det_10g.onnx
  4. genderage.onnx
  5. w600k_r50.onnx

これらのonnxファイルは、InsightFaceが内部で使用している‘AIモデルです。今回はこのAIモデルを解析していきます。ONNX(Open Neural Network Exchange)ファイルは、機械学習モデルを異なるフレームワーク間(TensorFlow, Pytorch, MxNetなど)で移動させるためのオープンソースフォーマットです。ONNXファイルは、モデルのアーキテクチャと重みが保存されています。

FaceAnalysisクラスの解析

FaceAnalysisクラスのコードを見ていきましょう。FaceAnalysisクラスは、顔認証を行うためのクラスですがどのような内容になっているでしょうか。以下のコードは、FaceAnalysisクラスのコンストラクタです。コンストラクタは、クラスのインスタンスを作成する際に呼び出されるメソッドです。コンストラクタ内のensure_available() (6行目にあります)の中で、AIモデルをダウンロードしています。お使いのVSCodeでensure_available()をctrl+クリックすることで、ensure_available()のコードも確認することができますので、しっかりと確認したい方は、コードを追ってみてください。

face_analysis.py
1
2class FaceAnalysis:
3 def __init__(self, name=DEFAULT_MP_NAME, root='~/.insightface', allowed_modules=None, **kwargs):
4 onnxruntime.set_default_logger_severity(3)
5 self.models = {}
6 self.model_dir = ensure_available('models', name, root=root)
7 onnx_files = glob.glob(osp.join(self.model_dir, '*.onnx'))
8 onnx_files = sorted(onnx_files)
9 for onnx_file in onnx_files:
10 model = model_zoo.get_model(onnx_file, **kwargs)
11 if model is None:
12 print('model not recognized:', onnx_file)
13 elif allowed_modules is not None and model.taskname not in allowed_modules:
14 print('model ignore:', onnx_file, model.taskname)
15 del model
16 elif model.taskname not in self.models and (allowed_modules is None or model.taskname in allowed_modules):
17 print('find model:', onnx_file, model.taskname, model.input_shape, model.input_mean, model.input_std)
18 self.models[model.taskname] = model
19 else:
20 print('duplicated model task type, ignore:', onnx_file, model.taskname)
21 del model
22 assert 'detection' in self.models
23 self.det_model = self.models['detection']
24
25 def prepare(self, ctx_id, det_thresh=0.5, det_size=(640, 640)):
26 self.det_thresh = det_thresh
27 assert det_size is not None
28 print('set det-size:', det_size)
29 self.det_size = det_size
30 for taskname, model in self.models.items():
31 if taskname == 'detection':
32 model.prepare(ctx_id, input_size=det_size, det_thresh=det_thresh)
33 else:
34 model.prepare(ctx_id)
35 // 以下省略
36

ONNXファイルを読み込んでいる部分

上のコードを詳しく読み進めると、ダウンロードされたONNXファイルをそれぞれ読み込んで切る部分が、9行目からのループ処理で行われていることがわかります。model.taksnameをみればモデルの概要が書いていそうなきがしますね!onnxファイルをonnx_filesという変数に読み込んで、model_zoo.get_model()に渡しています。Model_zooのget_model()を見ればさらに詳しくわかりそうですので、さっそく見てみましょう。ここでもVSCode上でmdoel.get_model()部分をctrl+クリックすることで、model_zoo.pyが開けると思います。

model_zoo.pyの該当部分

model_zoo.py
1
2def get_model(name, **kwargs):
3 root = kwargs.get('root', '~/.insightface')
4 root = os.path.expanduser(root)
5 model_root = osp.join(root, 'models')
6 allow_download = kwargs.get('download', False)
7 download_zip = kwargs.get('download_zip', False)
8 if not name.endswith('.onnx'):
9 model_dir = os.path.join(model_root, name)
10 model_file = find_onnx_file(model_dir)
11 if model_file is None:
12 return None
13 else:
14 model_file = name
15 if not osp.exists(model_file) and allow_download:
16 model_file = download_onnx('models', model_file, root=root, download_zip=download_zip)
17 assert osp.exists(model_file), 'model_file %s should exist'%model_file
18 assert osp.isfile(model_file), 'model_file %s should be a file'%model_file
19 router = ModelRouter(model_file)
20 providers = kwargs.get('providers', get_default_providers())
21 provider_options = kwargs.get('provider_options', get_default_provider_options())
22 model = router.get_model(providers=providers, provider_options=provider_options)
23 return model
24

model_zoo.pyの該当部分を見てみましょう。get_model()の中で、onnxファイルを読み込んでいることがわかります。また、onnxファイルの中身を読み込み、ModelRouterというクラスに格納して返していることがわかります。

ModelRouterクラスの該当部分

model_zoo.py
1
2class ModelRouter:
3 def __init__(self, onnx_file):
4 self.onnx_file = onnx_file
5
6 def get_model(self, **kwargs):
7 session = PickableInferenceSession(self.onnx_file, **kwargs)
8 print(f'Applied providers: {session._providers}, with options: {session._provider_options}')
9 inputs = session.get_inputs()
10 input_cfg = inputs[0]
11 input_shape = input_cfg.shape
12 outputs = session.get_outputs()
13
14 if len(outputs)>=5:
15 return RetinaFace(model_file=self.onnx_file, session=session)
16 elif input_shape[2]==192 and input_shape[3]==192:
17 return Landmark(model_file=self.onnx_file, session=session)
18 elif input_shape[2]==96 and input_shape[3]==96:
19 return Attribute(model_file=self.onnx_file, session=session)
20 elif len(inputs)==2 and input_shape[2]==128 and input_shape[3]==128:
21 return INSwapper(model_file=self.onnx_file, session=session)
22 elif input_shape[2]==input_shape[3] and input_shape[2]>=112 and input_shape[2]%16==0:
23 return ArcFaceONNX(model_file=self.onnx_file, session=session)
24 else:
25 #raise RuntimeError('error on model routing')
26 return None
27

いよいよ、このコードにたどりつきました。上のコードの14行目以下を見てください。InsightFaceでは、AIモデルを5つのタイプに分類していることがわかります。よくよくコードを追っていくと、それぞれがダウンロードされたONNXファイルに対応しており、それぞれのタイプに応じて、適切なAIモデルを返していることがわかります。

それぞれのモデルクラスの役割をまとめてみましょう。

それぞれのモデルクラスの役割

  1. 1k3d69.onnx顔の検出AI
  2. 2d106det.onnx顔のランドマーク検出AI
  3. det_10g.onnx顔の属性検出AI
  4. genderage.onnx顔の交換AI
  5. w600k_r50.onnx顔の認証AI

これらのモデルの間の依存関係

これらのモデルが推論に使われるのはface_analysis.pyのFaceAnalaysisクラスget()メソッド内です。ここを見ればそれぞれの依存関係がわかります。コードを読んでいくと顔の検出(face detection)だけがその他に依存されていることがわかります。つまりこれは与えられた画像の中の顔をまず顔検出AIで検出し、検出した顔に対して、顔のランドマーク検出AIや顔の属性検出AI、顔の認証AIなどがそれぞれ並列に動いているということが読み取れます。コードは以下の部分です。

face_analysis.py
1
2 def get(self, img, max_num=0):
3 bboxes, kpss = self.det_model.detect(img,
4 max_num=max_num,
5 metric='default')
6 if bboxes.shape[0] == 0:
7 return []
8 ret = []
9 for i in range(bboxes.shape[0]):
10 bbox = bboxes[i, 0:4]
11 det_score = bboxes[i, 4]
12 kps = None
13 if kpss is not None:
14 kps = kpss[i]
15 face = Face(bbox=bbox, kps=kps, det_score=det_score)
16 for taskname, model in self.models.items():
17 if taskname == 'detection':
18 continue
19 model.get(img, face)
20 ret.append(face)
21 return ret
22

それでは、それぞれのモデルを簡単に見てゆきたいと思います。それぞれのモデルのコードは上で既に確認したmodel_zoo.py内のModelRouterクラス内のget_modelメソッドから追えます。

RetinaFaceの解析 (顔検出のコードあり)

それでは顔認識やその他の顔分析プロセスの最初に行われている顔検出から見ていきましょう。顔検出は1k3d69.onnxが読み込まれ、RetinaFaceというクラスとして実装されています。RetinaFaceのコードはこちら。

retinaface.py
1
2class RetinaFace:
3 def __init__(self, model_file=None, session=None):
4 import onnxruntime
5 self.model_file = model_file
6 self.session = session
7 self.taskname = 'detection'
8 if self.session is None:
9 assert self.model_file is not None
10 assert osp.exists(self.model_file)
11 self.session = onnxruntime.InferenceSession(self.model_file, None)
12 self.center_cache = {}
13 self.nms_thresh = 0.4
14 self.det_thresh = 0.5
15 self._init_vars()
16
17 def _init_vars(self):
18 input_cfg = self.session.get_inputs()[0]
19 input_shape = input_cfg.shape
20 # print(input_shape)
21 if isinstance(input_shape[2], str):
22 self.input_size = None
23 else:
24 self.input_size = tuple(input_shape[2:4][::-1])
25 #print('image_size:', self.image_size)
26 input_name = input_cfg.name
27 self.input_shape = input_shape
28 outputs = self.session.get_outputs()
29 output_names = []
30 for o in outputs:
31 output_names.append(o.name)
32 self.input_name = input_name
33 self.output_names = output_names
34 self.input_mean = 127.5
35 self.input_std = 128.0
36 # print(self.output_names)
37 #assert len(outputs)==10 or len(outputs)==15
38 self.use_kps = False
39 self._anchor_ratio = 1.0
40 self._num_anchors = 1
41 if len(outputs) == 6:
42 self.fmc = 3
43 self._feat_stride_fpn = [8, 16, 32]
44 self._num_anchors = 2
45 elif len(outputs) == 9:
46 self.fmc = 3
47 self._feat_stride_fpn = [8, 16, 32]
48 self._num_anchors = 2
49 self.use_kps = True
50 elif len(outputs) == 10:
51 self.fmc = 5
52 self._feat_stride_fpn = [8, 16, 32, 64, 128]
53 self._num_anchors = 1
54 elif len(outputs) == 15:
55 self.fmc = 5
56 self._feat_stride_fpn = [8, 16, 32, 64, 128]
57 self._num_anchors = 1
58 self.use_kps = True
59
60 def prepare(self, ctx_id, **kwargs):
61 if ctx_id < 0:
62 self.session.set_providers(['CPUExecutionProvider'])
63 nms_thresh = kwargs.get('nms_thresh', None)
64 if nms_thresh is not None:
65 self.nms_thresh = nms_thresh
66 det_thresh = kwargs.get('det_thresh', None)
67 if det_thresh is not None:
68 self.det_thresh = det_thresh
69 input_size = kwargs.get('input_size', None)
70 if input_size is not None:
71 if self.input_size is not None:
72 print('warning: det_size is already set in detection model, ignore')
73 else:
74 self.input_size = input_size
75
76 def forward(self, img, threshold):
77 scores_list = []
78 bboxes_list = []
79 kpss_list = []
80 input_size = tuple(img.shape[0:2][::-1])
81 blob = cv2.dnn.blobFromImage(img, 1.0/self.input_std, input_size, (self.input_mean, self.input_mean, self.input_mean), swapRB=True)
82 net_outs = self.session.run(self.output_names, {self.input_name: blob})
83
84 input_height = blob.shape[2]
85 input_width = blob.shape[3]
86 fmc = self.fmc
87 for idx, stride in enumerate(self._feat_stride_fpn):
88 scores = net_outs[idx]
89 bbox_preds = net_outs[idx+fmc]
90 bbox_preds = bbox_preds * stride
91 if self.use_kps:
92 kps_preds = net_outs[idx+fmc*2] * stride
93 height = input_height // stride
94 width = input_width // stride
95 K = height * width
96 key = (height, width, stride)
97 if key in self.center_cache:
98 anchor_centers = self.center_cache[key]
99 else:
100 # solution-1, c style:
101 #anchor_centers = np.zeros( (height, width, 2), dtype=np.float32 )
102 # for i in range(height):
103 # anchor_centers[i, :, 1] = i
104 # for i in range(width):
105 # anchor_centers[:, i, 0] = i
106
107 # solution-2:
108 #ax = np.arange(width, dtype=np.float32)
109 #ay = np.arange(height, dtype=np.float32)
110 #xv, yv = np.meshgrid(np.arange(width), np.arange(height))
111 #anchor_centers = np.stack([xv, yv], axis=-1).astype(np.float32)
112
113 # solution-3:
114 anchor_centers = np.stack(np.mgrid[:height, :width][::-1], axis=-1).astype(np.float32)
115 # print(anchor_centers.shape)
116
117 anchor_centers = (anchor_centers * stride).reshape((-1, 2))
118 if self._num_anchors > 1:
119 anchor_centers = np.stack([anchor_centers]*self._num_anchors, axis=1).reshape((-1, 2))
120 if len(self.center_cache) < 100:
121 self.center_cache[key] = anchor_centers
122
123 pos_inds = np.where(scores >= threshold)[0]
124 bboxes = distance2bbox(anchor_centers, bbox_preds)
125 pos_scores = scores[pos_inds]
126 pos_bboxes = bboxes[pos_inds]
127 scores_list.append(pos_scores)
128 bboxes_list.append(pos_bboxes)
129 if self.use_kps:
130 kpss = distance2kps(anchor_centers, kps_preds)
131 #kpss = kps_preds
132 kpss = kpss.reshape((kpss.shape[0], -1, 2))
133 pos_kpss = kpss[pos_inds]
134 kpss_list.append(pos_kpss)
135 return scores_list, bboxes_list, kpss_list
136
137 def detect(self, img, input_size=None, max_num=0, metric='default'):
138 assert input_size is not None or self.input_size is not None
139 input_size = self.input_size if input_size is None else input_size
140
141 im_ratio = float(img.shape[0]) / img.shape[1]
142 model_ratio = float(input_size[1]) / input_size[0]
143 if im_ratio > model_ratio:
144 new_height = input_size[1]
145 new_width = int(new_height / im_ratio)
146 else:
147 new_width = input_size[0]
148 new_height = int(new_width * im_ratio)
149 det_scale = float(new_height) / img.shape[0]
150 resized_img = cv2.resize(img, (new_width, new_height))
151 det_img = np.zeros((input_size[1], input_size[0], 3), dtype=np.uint8)
152 det_img[:new_height, :new_width, :] = resized_img
153
154 scores_list, bboxes_list, kpss_list = self.forward(det_img, self.det_thresh)
155
156 scores = np.vstack(scores_list)
157 scores_ravel = scores.ravel()
158 order = scores_ravel.argsort()[::-1]
159 bboxes = np.vstack(bboxes_list) / det_scale
160 if self.use_kps:
161 kpss = np.vstack(kpss_list) / det_scale
162 pre_det = np.hstack((bboxes, scores)).astype(np.float32, copy=False)
163 pre_det = pre_det[order, :]
164 keep = self.nms(pre_det)
165 det = pre_det[keep, :]
166 if self.use_kps:
167 kpss = kpss[order, :, :]
168 kpss = kpss[keep, :, :]
169 else:
170 kpss = None
171 if max_num > 0 and det.shape[0] > max_num:
172 area = (det[:, 2] - det[:, 0]) * (det[:, 3] -
173 det[:, 1])
174 img_center = img.shape[0] // 2, img.shape[1] // 2
175 offsets = np.vstack([
176 (det[:, 0] + det[:, 2]) / 2 - img_center[1],
177 (det[:, 1] + det[:, 3]) / 2 - img_center[0]
178 ])
179 offset_dist_squared = np.sum(np.power(offsets, 2.0), 0)
180 if metric == 'max':
181 values = area
182 else:
183 values = area - offset_dist_squared * 2.0 # some extra weight on the centering
184 bindex = np.argsort(
185 values)[::-1] # some extra weight on the centering
186 bindex = bindex[0:max_num]
187 det = det[bindex, :]
188 if kpss is not None:
189 kpss = kpss[bindex, :]
190 return det, kpss
191
192 def nms(self, dets):
193 thresh = self.nms_thresh
194 x1 = dets[:, 0]
195 y1 = dets[:, 1]
196 x2 = dets[:, 2]
197 y2 = dets[:, 3]
198 scores = dets[:, 4]
199
200 areas = (x2 - x1 + 1) * (y2 - y1 + 1)
201 order = scores.argsort()[::-1]
202
203 keep = []
204 while order.size > 0:
205 i = order[0]
206 keep.append(i)
207 xx1 = np.maximum(x1[i], x1[order[1:]])
208 yy1 = np.maximum(y1[i], y1[order[1:]])
209 xx2 = np.minimum(x2[i], x2[order[1:]])
210 yy2 = np.minimum(y2[i], y2[order[1:]])
211
212 w = np.maximum(0.0, xx2 - xx1 + 1)
213 h = np.maximum(0.0, yy2 - yy1 + 1)
214 inter = w * h
215 ovr = inter / (areas[i] + areas[order[1:]] - inter)
216

これはなかなか難解なコードです。RetinaFaceでは顔の領域矩形と、キーポイントを検出しています。RetinaFaceはバックボーンとしてResNetのようなもの、ヘッドにSSDを拡張したようなものを利用しています。これらを自作するのはかなり難易度が高いです、顔検出は今回そのまま使いましょう。物体検出のしくみに興味がある方はまずSSDの仕組みを勉強してみることをお勧めします。

RetinaFaceをつかって画像の中から顔を検出してみる

せっかくですので、ここでRetinaFaceを使ってサクッと顔検出だけを行うコードを作ってみましょう。自信がある方は下の回答を見ずに、まず自分で作ってみてください。

face detection code
1
2import cv2
3import numpy as np
4import insightface
5
6# load detection model
7detector = insightface.model_zoo.get_model("models/buffalo_l/det_10g.onnx") # 任意のパスに変更してください
8detector.prepare(ctx_id=-1, input_size=(640, 640))
9
10# 入力画像を準備
11rgb_img = cv2.cvtColor(cv2.imread("data/images/path_to_image/some_face_1.jpg"), cv2.COLOR_BGR2RGB) # 任意のパスに変更してください
12
13# 検出
14bboxes, kpss = detector.detect(rgb_img)
15

いや~、ライブラリ有難いですね。こんな簡単に顔検出ができてしまいました。またRetinaFaceでは顔の部分のバウンディングボックスだけでなく、目、鼻、口のキーポイントを同時に検出してくれます。これは便利ですね。上記コードを実装したらbboxesとkpssをそれぞれ出力して中身を見てみるとよいでしょう。

LandMarkモデルの確認

LandMarkモデルは顔の目鼻口などのキーポイントだけでなく、もっと詳細な顔のランドマークを検出してくれます。顔のランドマークは、顔の加工や効果を付加したりする画像処理を行うときなどに便利です。今回の顔認証では、このLandMarkモデルは利用しませんので、内部コードには触れません。コードはmodel_zoo.pyのModelRouterクラスから追えます。landmark.pyというファイルにありますので興味がある方は使ってみてください。

Attributeモデルの確認

Attributeモデルは検出した顔の性別や年齢などの属性を推論します。顔の属性情報は小売店やイベント等の来客分析、マーケテイング分析などに活用できます。Attributeモデルも今回の顔認証では利用しませんので、内部コードには触れません。コードはmodel_zoo.pyのModelRouterクラスから追えます。attribute.pyというファイルにありますので、来客分析などに興味がある方は利用してみるとよいでしょう。

ArchFaceONNXモデルの解析

いよいよ顔認証モデルであるArchFaceONNXを見ていきましょう。以下がArchFaceONNXのコードです。

arcface_onnx.py
1
2class ArcFaceONNX:
3 def __init__(self, model_file=None, session=None):
4 assert model_file is not None
5 self.model_file = model_file
6 self.session = session
7 self.taskname = 'recognition'
8 find_sub = False
9 find_mul = False
10 model = onnx.load(self.model_file)
11 graph = model.graph
12 for nid, node in enumerate(graph.node[:8]):
13 #print(nid, node.name)
14 if node.name.startswith('Sub') or node.name.startswith('_minus'):
15 find_sub = True
16 if node.name.startswith('Mul') or node.name.startswith('_mul'):
17 find_mul = True
18 if find_sub and find_mul:
19 #mxnet arcface model
20 input_mean = 0.0
21 input_std = 1.0
22 else:
23 input_mean = 127.5
24 input_std = 127.5
25 self.input_mean = input_mean
26 self.input_std = input_std
27 #print('input mean and std:', self.input_mean, self.input_std)
28 if self.session is None:
29 self.session = onnxruntime.InferenceSession(self.model_file, None)
30 input_cfg = self.session.get_inputs()[0]
31 input_shape = input_cfg.shape
32 input_name = input_cfg.name
33 self.input_size = tuple(input_shape[2:4][::-1])
34 self.input_shape = input_shape
35 outputs = self.session.get_outputs()
36 output_names = []
37 for out in outputs:
38 output_names.append(out.name)
39 self.input_name = input_name
40 self.output_names = output_names
41 assert len(self.output_names)==1
42 self.output_shape = outputs[0].shape
43
44 def prepare(self, ctx_id, **kwargs):
45 if ctx_id<0:
46 self.session.set_providers(['CPUExecutionProvider'])
47
48 def get(self, img, face):
49 aimg = face_align.norm_crop(img, landmark=face.kps, image_size=self.input_size[0])
50 face.embedding = self.get_feat(aimg).flatten()
51 return face.embedding
52
53 def compute_sim(self, feat1, feat2):
54 from numpy.linalg import norm
55 feat1 = feat1.ravel()
56 feat2 = feat2.ravel()
57 sim = np.dot(feat1, feat2) / (norm(feat1) * norm(feat2))
58 return sim
59
60 def get_feat(self, imgs):
61 if not isinstance(imgs, list):
62 imgs = [imgs]
63 input_size = self.input_size
64
65 blob = cv2.dnn.blobFromImages(imgs, 1.0 / self.input_std, input_size,
66 (self.input_mean, self.input_mean, self.input_mean), swapRB=True)
67 net_out = self.session.run(self.output_names, {self.input_name: blob})[0]
68 return net_out
69
70 def forward(self, batch_data):
71 blob = (batch_data - self.input_mean) / self.input_std
72 net_out = self.session.run(self.output_names, {self.input_name: blob})[0]
73 return net_out
74

さきほどRetinaFaceを利用して顔の検出だけを行いました。今度はその検出した顔情報をAcchFaceONNXモデルに入力して顔認識を行っています。コードを読み進めるとArcFaceONNXクラス内のgetメソッドまたはforwardメソッドでモデルの推論が行われるようです。またgetメソッド出力はInsightFaceのfaceオブジェクトのembeddingに格納しているものですので、これが顔認識に使われるembeddingであることは間違いないでしょう。次回セッションではこのArcFaceONNXクラスとRetinaFaceを組み合わせて顔認証のコードを作っていきます。またArcFaceONNXについては理解を深めるためpytorchでモデルを実装することも学習します。実装した結果はセッション1で示したinsightfaceのappを使ってサクッと顔認証を行う結果と同じことになりますが、何がおこなわれてるのか分からなかったセッション1のサンプルコードより、より理解が深まりモジュールレベルでの活用も可能なコードになっていることでしょう。