Mirror of https://github.com/20kaushik02/CSE546_Cloud_Computing_Projects.git
Commit message: did not do part 2, but it must be completed even if it is of no use.
Tooling/model/facenet_pytorch/models/inception_resnet_v1.py (new file, 340 lines)
@@ -0,0 +1,340 @@
import os

import requests
from requests.adapters import HTTPAdapter

import torch
from torch import nn
from torch.nn import functional as F

from .utils.download import download_url_to_file

class BasicConv2d(nn.Module):

    def __init__(self, in_planes, out_planes, kernel_size, stride, padding=0):
        super().__init__()
        self.conv = nn.Conv2d(
            in_planes, out_planes,
            kernel_size=kernel_size, stride=stride,
            padding=padding, bias=False
        )  # bias=False: the batch norm below supplies the affine shift
        self.bn = nn.BatchNorm2d(
            out_planes,
            eps=0.001,  # value found in tensorflow
            momentum=0.1,  # default pytorch value
            affine=True
        )
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        x = self.relu(x)
        return x

class Block35(nn.Module):

    def __init__(self, scale=1.0):
        super().__init__()

        self.scale = scale

        self.branch0 = BasicConv2d(256, 32, kernel_size=1, stride=1)

        self.branch1 = nn.Sequential(
            BasicConv2d(256, 32, kernel_size=1, stride=1),
            BasicConv2d(32, 32, kernel_size=3, stride=1, padding=1)
        )

        self.branch2 = nn.Sequential(
            BasicConv2d(256, 32, kernel_size=1, stride=1),
            BasicConv2d(32, 32, kernel_size=3, stride=1, padding=1),
            BasicConv2d(32, 32, kernel_size=3, stride=1, padding=1)
        )

        self.conv2d = nn.Conv2d(96, 256, kernel_size=1, stride=1)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        out = torch.cat((x0, x1, x2), 1)
        out = self.conv2d(out)
        out = out * self.scale + x
        out = self.relu(out)
        return out

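# Editor's note (not in the upstream file): Block35, Block17 and Block8 all
# follow the same residual pattern -- branch outputs are concatenated,
# projected back to the input channel count by a 1x1 conv, scaled down, and
# added to the identity path:
#
#     out = conv2d(cat(branches)) * scale + x
#
# The small scale values used below (0.17, 0.10, 0.20) damp the residual
# branch, which stabilizes training of deep Inception-ResNet stacks.
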
class Block17(nn.Module):

    def __init__(self, scale=1.0):
        super().__init__()

        self.scale = scale

        self.branch0 = BasicConv2d(896, 128, kernel_size=1, stride=1)

        self.branch1 = nn.Sequential(
            BasicConv2d(896, 128, kernel_size=1, stride=1),
            BasicConv2d(128, 128, kernel_size=(1, 7), stride=1, padding=(0, 3)),
            BasicConv2d(128, 128, kernel_size=(7, 1), stride=1, padding=(3, 0))
        )

        self.conv2d = nn.Conv2d(256, 896, kernel_size=1, stride=1)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        out = torch.cat((x0, x1), 1)
        out = self.conv2d(out)
        out = out * self.scale + x
        out = self.relu(out)
        return out

class Block8(nn.Module):

    def __init__(self, scale=1.0, noReLU=False):
        super().__init__()

        self.scale = scale
        self.noReLU = noReLU

        self.branch0 = BasicConv2d(1792, 192, kernel_size=1, stride=1)

        self.branch1 = nn.Sequential(
            BasicConv2d(1792, 192, kernel_size=1, stride=1),
            BasicConv2d(192, 192, kernel_size=(1, 3), stride=1, padding=(0, 1)),
            BasicConv2d(192, 192, kernel_size=(3, 1), stride=1, padding=(1, 0))
        )

        self.conv2d = nn.Conv2d(384, 1792, kernel_size=1, stride=1)
        if not self.noReLU:
            self.relu = nn.ReLU(inplace=False)

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        out = torch.cat((x0, x1), 1)
        out = self.conv2d(out)
        out = out * self.scale + x
        if not self.noReLU:
            out = self.relu(out)
        return out

class Mixed_6a(nn.Module):

    def __init__(self):
        super().__init__()

        self.branch0 = BasicConv2d(256, 384, kernel_size=3, stride=2)

        self.branch1 = nn.Sequential(
            BasicConv2d(256, 192, kernel_size=1, stride=1),
            BasicConv2d(192, 192, kernel_size=3, stride=1, padding=1),
            BasicConv2d(192, 256, kernel_size=3, stride=2)
        )

        self.branch2 = nn.MaxPool2d(3, stride=2)

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        out = torch.cat((x0, x1, x2), 1)
        return out

class Mixed_7a(nn.Module):

    def __init__(self):
        super().__init__()

        self.branch0 = nn.Sequential(
            BasicConv2d(896, 256, kernel_size=1, stride=1),
            BasicConv2d(256, 384, kernel_size=3, stride=2)
        )

        self.branch1 = nn.Sequential(
            BasicConv2d(896, 256, kernel_size=1, stride=1),
            BasicConv2d(256, 256, kernel_size=3, stride=2)
        )

        self.branch2 = nn.Sequential(
            BasicConv2d(896, 256, kernel_size=1, stride=1),
            BasicConv2d(256, 256, kernel_size=3, stride=1, padding=1),
            BasicConv2d(256, 256, kernel_size=3, stride=2)
        )

        self.branch3 = nn.MaxPool2d(3, stride=2)

    def forward(self, x):
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        x3 = self.branch3(x)
        out = torch.cat((x0, x1, x2, x3), 1)
        return out

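# Channel bookkeeping (editor's comment, added for clarity): Mixed_6a
# concatenates branches of 384 + 256 + 256 = 896 channels, which is exactly
# the input width Block17 expects; Mixed_7a concatenates 384 + 256 + 256
# plus the 896-channel max-pool passthrough = 1792 channels, matching
# Block8's input width.
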
class InceptionResnetV1(nn.Module):
    """Inception Resnet V1 model with optional loading of pretrained weights.

    Model parameters can be loaded based on pretraining on the VGGFace2 or CASIA-Webface
    datasets. Pretrained state_dicts are automatically downloaded on model instantiation if
    requested and cached in the torch cache. Subsequent instantiations use the cache rather than
    redownloading.

    Keyword Arguments:
        pretrained {str} -- Optional pretraining dataset. Either 'vggface2' or 'casia-webface'.
            (default: {None})
        classify {bool} -- Whether the model should output classification probabilities or feature
            embeddings. (default: {False})
        num_classes {int} -- Number of output classes. If 'pretrained' is set and num_classes not
            equal to that used for the pretrained model, the final linear layer will be randomly
            initialized. (default: {None})
        dropout_prob {float} -- Dropout probability. (default: {0.6})
    """
    def __init__(self, pretrained=None, classify=False, num_classes=None, dropout_prob=0.6, device=None):
        super().__init__()

        # Set simple attributes
        self.pretrained = pretrained
        self.classify = classify
        self.num_classes = num_classes

        if pretrained == 'vggface2':
            tmp_classes = 8631
        elif pretrained == 'casia-webface':
            tmp_classes = 10575
        elif pretrained is None and self.classify and self.num_classes is None:
            raise Exception('If "pretrained" is not specified and "classify" is True, "num_classes" must be specified')

        # Define layers
        self.conv2d_1a = BasicConv2d(3, 32, kernel_size=3, stride=2)
        self.conv2d_2a = BasicConv2d(32, 32, kernel_size=3, stride=1)
        self.conv2d_2b = BasicConv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.maxpool_3a = nn.MaxPool2d(3, stride=2)
        self.conv2d_3b = BasicConv2d(64, 80, kernel_size=1, stride=1)
        self.conv2d_4a = BasicConv2d(80, 192, kernel_size=3, stride=1)
        self.conv2d_4b = BasicConv2d(192, 256, kernel_size=3, stride=2)
        self.repeat_1 = nn.Sequential(
            Block35(scale=0.17),
            Block35(scale=0.17),
            Block35(scale=0.17),
            Block35(scale=0.17),
            Block35(scale=0.17),
        )
        self.mixed_6a = Mixed_6a()
        self.repeat_2 = nn.Sequential(
            Block17(scale=0.10),
            Block17(scale=0.10),
            Block17(scale=0.10),
            Block17(scale=0.10),
            Block17(scale=0.10),
            Block17(scale=0.10),
            Block17(scale=0.10),
            Block17(scale=0.10),
            Block17(scale=0.10),
            Block17(scale=0.10),
        )
        self.mixed_7a = Mixed_7a()
        self.repeat_3 = nn.Sequential(
            Block8(scale=0.20),
            Block8(scale=0.20),
            Block8(scale=0.20),
            Block8(scale=0.20),
            Block8(scale=0.20),
        )
        self.block8 = Block8(noReLU=True)
        self.avgpool_1a = nn.AdaptiveAvgPool2d(1)
        self.dropout = nn.Dropout(dropout_prob)
        self.last_linear = nn.Linear(1792, 512, bias=False)
        self.last_bn = nn.BatchNorm1d(512, eps=0.001, momentum=0.1, affine=True)

        if pretrained is not None:
            self.logits = nn.Linear(512, tmp_classes)
            load_weights(self, pretrained)

        if self.classify and self.num_classes is not None:
            self.logits = nn.Linear(512, self.num_classes)

        self.device = torch.device('cpu')
        if device is not None:
            self.device = device
            self.to(device)

    def forward(self, x):
        """Calculate embeddings or logits given a batch of input image tensors.

        Arguments:
            x {torch.tensor} -- Batch of image tensors representing faces.

        Returns:
            torch.tensor -- Batch of embedding vectors or multinomial logits.
        """
        x = self.conv2d_1a(x)
        x = self.conv2d_2a(x)
        x = self.conv2d_2b(x)
        x = self.maxpool_3a(x)
        x = self.conv2d_3b(x)
        x = self.conv2d_4a(x)
        x = self.conv2d_4b(x)
        x = self.repeat_1(x)
        x = self.mixed_6a(x)
        x = self.repeat_2(x)
        x = self.mixed_7a(x)
        x = self.repeat_3(x)
        x = self.block8(x)
        x = self.avgpool_1a(x)
        x = self.dropout(x)
        x = self.last_linear(x.view(x.shape[0], -1))
        x = self.last_bn(x)
        if self.classify:
            x = self.logits(x)
        else:
            x = F.normalize(x, p=2, dim=1)
        return x

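# Usage sketch (editor's addition; illustrative, consistent with the class
# docstring above):
#
#     model = InceptionResnetV1(pretrained='vggface2').eval()
#     faces = torch.randn(2, 3, 160, 160)   # two aligned RGB face crops
#     with torch.no_grad():
#         embeddings = model(faces)         # shape (2, 512), L2-normalized
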
def load_weights(mdl, name):
    """Download pretrained state_dict and load into model.

    Arguments:
        mdl {torch.nn.Module} -- Pytorch model.
        name {str} -- Name of dataset that was used to generate pretrained state_dict.

    Raises:
        ValueError: If 'pretrained' not equal to 'vggface2' or 'casia-webface'.
    """
    if name == 'vggface2':
        path = 'https://github.com/timesler/facenet-pytorch/releases/download/v2.2.9/20180402-114759-vggface2.pt'
    elif name == 'casia-webface':
        path = 'https://github.com/timesler/facenet-pytorch/releases/download/v2.2.9/20180408-102900-casia-webface.pt'
    else:
        raise ValueError('Pretrained models only exist for "vggface2" and "casia-webface"')

    model_dir = os.path.join(get_torch_home(), 'checkpoints')
    os.makedirs(model_dir, exist_ok=True)

    cached_file = os.path.join(model_dir, os.path.basename(path))
    if not os.path.exists(cached_file):
        download_url_to_file(path, cached_file)

    state_dict = torch.load(cached_file)
    mdl.load_state_dict(state_dict)

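# Editor's note: with default environment variables, get_torch_home() below
# resolves to ~/.cache/torch, so the weights above land at e.g.
# ~/.cache/torch/checkpoints/20180402-114759-vggface2.pt and are reused on
# subsequent instantiations (the exact path depends on TORCH_HOME and
# XDG_CACHE_HOME).
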
def get_torch_home():
    torch_home = os.path.expanduser(
        os.getenv(
            'TORCH_HOME',
            os.path.join(os.getenv('XDG_CACHE_HOME', '~/.cache'), 'torch')
        )
    )
    return torch_home
Tooling/model/facenet_pytorch/models/mtcnn.py (new file, 519 lines)
@@ -0,0 +1,519 @@
import torch
from torch import nn
import numpy as np
import os

from .utils.detect_face import detect_face, extract_face

class PNet(nn.Module):
    """MTCNN PNet.

    Keyword Arguments:
        pretrained {bool} -- Whether or not to load saved pretrained weights (default: {True})
    """

    def __init__(self, pretrained=True):
        super().__init__()

        self.conv1 = nn.Conv2d(3, 10, kernel_size=3)
        self.prelu1 = nn.PReLU(10)
        self.pool1 = nn.MaxPool2d(2, 2, ceil_mode=True)
        self.conv2 = nn.Conv2d(10, 16, kernel_size=3)
        self.prelu2 = nn.PReLU(16)
        self.conv3 = nn.Conv2d(16, 32, kernel_size=3)
        self.prelu3 = nn.PReLU(32)
        self.conv4_1 = nn.Conv2d(32, 2, kernel_size=1)
        self.softmax4_1 = nn.Softmax(dim=1)
        self.conv4_2 = nn.Conv2d(32, 4, kernel_size=1)

        self.training = False

        if pretrained:
            state_dict_path = os.path.join(os.path.dirname(__file__), '../data/pnet.pt')
            state_dict = torch.load(state_dict_path)
            self.load_state_dict(state_dict)

    def forward(self, x):
        x = self.conv1(x)
        x = self.prelu1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.prelu2(x)
        x = self.conv3(x)
        x = self.prelu3(x)
        a = self.conv4_1(x)
        a = self.softmax4_1(a)
        b = self.conv4_2(x)
        return b, a

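# Editor's note: PNet is fully convolutional, so it runs on whole images at
# any resolution and emits a dense map of proposals. A 12x12 input collapses
# to a single 1x1 prediction (three valid 3x3 convs plus one stride-2 pool),
# which is why detect_face() builds its scale pyramid around a 12 px cell.
# The return order is (b, a): box regressions first, face probabilities second.
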
class RNet(nn.Module):
    """MTCNN RNet.

    Keyword Arguments:
        pretrained {bool} -- Whether or not to load saved pretrained weights (default: {True})
    """

    def __init__(self, pretrained=True):
        super().__init__()

        self.conv1 = nn.Conv2d(3, 28, kernel_size=3)
        self.prelu1 = nn.PReLU(28)
        self.pool1 = nn.MaxPool2d(3, 2, ceil_mode=True)
        self.conv2 = nn.Conv2d(28, 48, kernel_size=3)
        self.prelu2 = nn.PReLU(48)
        self.pool2 = nn.MaxPool2d(3, 2, ceil_mode=True)
        self.conv3 = nn.Conv2d(48, 64, kernel_size=2)
        self.prelu3 = nn.PReLU(64)
        self.dense4 = nn.Linear(576, 128)
        self.prelu4 = nn.PReLU(128)
        self.dense5_1 = nn.Linear(128, 2)
        self.softmax5_1 = nn.Softmax(dim=1)
        self.dense5_2 = nn.Linear(128, 4)

        self.training = False

        if pretrained:
            state_dict_path = os.path.join(os.path.dirname(__file__), '../data/rnet.pt')
            state_dict = torch.load(state_dict_path)
            self.load_state_dict(state_dict)

    def forward(self, x):
        x = self.conv1(x)
        x = self.prelu1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.prelu2(x)
        x = self.pool2(x)
        x = self.conv3(x)
        x = self.prelu3(x)
        x = x.permute(0, 3, 2, 1).contiguous()
        x = self.dense4(x.view(x.shape[0], -1))
        x = self.prelu4(x)
        a = self.dense5_1(x)
        a = self.softmax5_1(a)
        b = self.dense5_2(x)
        return b, a

class ONet(nn.Module):
    """MTCNN ONet.

    Keyword Arguments:
        pretrained {bool} -- Whether or not to load saved pretrained weights (default: {True})
    """

    def __init__(self, pretrained=True):
        super().__init__()

        self.conv1 = nn.Conv2d(3, 32, kernel_size=3)
        self.prelu1 = nn.PReLU(32)
        self.pool1 = nn.MaxPool2d(3, 2, ceil_mode=True)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3)
        self.prelu2 = nn.PReLU(64)
        self.pool2 = nn.MaxPool2d(3, 2, ceil_mode=True)
        self.conv3 = nn.Conv2d(64, 64, kernel_size=3)
        self.prelu3 = nn.PReLU(64)
        self.pool3 = nn.MaxPool2d(2, 2, ceil_mode=True)
        self.conv4 = nn.Conv2d(64, 128, kernel_size=2)
        self.prelu4 = nn.PReLU(128)
        self.dense5 = nn.Linear(1152, 256)
        self.prelu5 = nn.PReLU(256)
        self.dense6_1 = nn.Linear(256, 2)
        self.softmax6_1 = nn.Softmax(dim=1)
        self.dense6_2 = nn.Linear(256, 4)
        self.dense6_3 = nn.Linear(256, 10)

        self.training = False

        if pretrained:
            state_dict_path = os.path.join(os.path.dirname(__file__), '../data/onet.pt')
            state_dict = torch.load(state_dict_path)
            self.load_state_dict(state_dict)

    def forward(self, x):
        x = self.conv1(x)
        x = self.prelu1(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = self.prelu2(x)
        x = self.pool2(x)
        x = self.conv3(x)
        x = self.prelu3(x)
        x = self.pool3(x)
        x = self.conv4(x)
        x = self.prelu4(x)
        x = x.permute(0, 3, 2, 1).contiguous()
        x = self.dense5(x.view(x.shape[0], -1))
        x = self.prelu5(x)
        a = self.dense6_1(x)
        a = self.softmax6_1(a)
        b = self.dense6_2(x)
        c = self.dense6_3(x)
        return b, c, a

class MTCNN(nn.Module):
    """MTCNN face detection module.

    This class loads pretrained P-, R-, and O-nets and returns images cropped to include the face
    only, given raw input images of one of the following types:
    - PIL image or list of PIL images
    - numpy.ndarray (uint8) representing either a single image (3D) or a batch of images (4D).
    Cropped faces can optionally be saved to file as well.

    Keyword Arguments:
        image_size {int} -- Output image size in pixels. The image will be square. (default: {160})
        margin {int} -- Margin to add to bounding box, in terms of pixels in the final image.
            Note that the application of the margin differs slightly from the davidsandberg/facenet
            repo, which applies the margin to the original image before resizing, making the margin
            dependent on the original image size (this is a bug in davidsandberg/facenet).
            (default: {0})
        min_face_size {int} -- Minimum face size to search for. (default: {20})
        thresholds {list} -- MTCNN face detection thresholds (default: {[0.6, 0.7, 0.7]})
        factor {float} -- Factor used to create a scaling pyramid of face sizes. (default: {0.709})
        post_process {bool} -- Whether or not to post process image tensors before returning.
            (default: {True})
        select_largest {bool} -- If True, if multiple faces are detected, the largest is returned.
            If False, the face with the highest detection probability is returned.
            (default: {True})
        selection_method {string} -- Which heuristic to use for selection. Default None. If
            specified, will override select_largest:
                "probability": highest probability selected
                "largest": largest box selected
                "largest_over_threshold": largest box over a certain probability selected
                "center_weighted_size": box size minus weighted squared offset from image center
            (default: {None})
        keep_all {bool} -- If True, all detected faces are returned, in the order dictated by the
            select_largest parameter. If a save_path is specified, the first face is saved to that
            path and the remaining faces are saved to <save_path>1, <save_path>2 etc.
            (default: {False})
        device {torch.device} -- The device on which to run neural net passes. Image tensors and
            models are copied to this device before running forward passes. (default: {None})
    """

    def __init__(
        self, image_size=160, margin=0, min_face_size=20,
        thresholds=[0.6, 0.7, 0.7], factor=0.709, post_process=True,
        select_largest=True, selection_method=None, keep_all=False, device=None
    ):
        super().__init__()

        self.image_size = image_size
        self.margin = margin
        self.min_face_size = min_face_size
        self.thresholds = thresholds
        self.factor = factor
        self.post_process = post_process
        self.select_largest = select_largest
        self.keep_all = keep_all
        self.selection_method = selection_method

        self.pnet = PNet()
        self.rnet = RNet()
        self.onet = ONet()

        self.device = torch.device('cpu')
        if device is not None:
            self.device = device
            self.to(device)

        if not self.selection_method:
            self.selection_method = 'largest' if self.select_largest else 'probability'

    def forward(self, img, save_path=None, return_prob=False):
        """Run MTCNN face detection on a PIL image or numpy array. This method performs both
        detection and extraction of faces, returning tensors representing detected faces rather
        than the bounding boxes. To access bounding boxes, see the MTCNN.detect() method below.

        Arguments:
            img {PIL.Image, np.ndarray, or list} -- A PIL image, np.ndarray, torch.Tensor, or list.

        Keyword Arguments:
            save_path {str} -- An optional save path for the cropped image. Note that when
                self.post_process=True, although the returned tensor is post processed, the saved
                face image is not, so it is a true representation of the face in the input image.
                If `img` is a list of images, `save_path` should be a list of equal length.
                (default: {None})
            return_prob {bool} -- Whether or not to return the detection probability.
                (default: {False})

        Returns:
            Union[torch.Tensor, tuple(torch.tensor, float)] -- If detected, cropped image of a face
                with dimensions 3 x image_size x image_size. Optionally, the probability that a
                face was detected. If self.keep_all is True, n detected faces are returned in an
                n x 3 x image_size x image_size tensor with an optional list of detection
                probabilities. If `img` is a list of images, the item(s) returned have an extra
                dimension (batch) as the first dimension.

        Example:
            >>> from facenet_pytorch import MTCNN
            >>> mtcnn = MTCNN()
            >>> face_tensor, prob = mtcnn(img, save_path='face.png', return_prob=True)
        """

        # Detect faces
        batch_boxes, batch_probs, batch_points = self.detect(img, landmarks=True)
        # Select faces
        if not self.keep_all:
            batch_boxes, batch_probs, batch_points = self.select_boxes(
                batch_boxes, batch_probs, batch_points, img, method=self.selection_method
            )
        # Extract faces
        faces = self.extract(img, batch_boxes, save_path)

        if return_prob:
            return faces, batch_probs
        else:
            return faces

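    # End-to-end sketch (editor's addition; illustrative example combining
    # this module with InceptionResnetV1 from this package):
    #
    #     from PIL import Image
    #     mtcnn = MTCNN(image_size=160, margin=0)
    #     img = Image.open('person.jpg')
    #     face = mtcnn(img)                        # (3, 160, 160) or None
    #     if face is not None:
    #         emb = resnet(face.unsqueeze(0))      # resnet: InceptionResnetV1
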
    def detect(self, img, landmarks=False):
        """Detect all faces in PIL image and return bounding boxes and optional facial landmarks.

        This method is used by the forward method and is also useful for face detection tasks
        that require lower-level handling of bounding boxes and facial landmarks (e.g., face
        tracking). The functionality of the forward function can be emulated by using this method
        followed by the extract_face() function.

        Arguments:
            img {PIL.Image, np.ndarray, or list} -- A PIL image, np.ndarray, torch.Tensor, or list.

        Keyword Arguments:
            landmarks {bool} -- Whether to return facial landmarks in addition to bounding boxes.
                (default: {False})

        Returns:
            tuple(numpy.ndarray, list) -- For N detected faces, a tuple containing an
                Nx4 array of bounding boxes and a length N list of detection probabilities.
                Returned boxes will be sorted in descending order by detection probability if
                self.select_largest=False, otherwise the largest face will be returned first.
                If `img` is a list of images, the items returned have an extra dimension
                (batch) as the first dimension. Optionally, a third item, the facial landmarks,
                is returned if `landmarks=True`.

        Example:
            >>> from PIL import Image, ImageDraw
            >>> from facenet_pytorch import MTCNN, extract_face
            >>> mtcnn = MTCNN(keep_all=True)
            >>> boxes, probs, points = mtcnn.detect(img, landmarks=True)
            >>> # Draw boxes and save faces
            >>> img_draw = img.copy()
            >>> draw = ImageDraw.Draw(img_draw)
            >>> for i, (box, point) in enumerate(zip(boxes, points)):
            ...     draw.rectangle(box.tolist(), width=5)
            ...     for p in point:
            ...         draw.rectangle((p - 10).tolist() + (p + 10).tolist(), width=10)
            ...     extract_face(img, box, save_path='detected_face_{}.png'.format(i))
            >>> img_draw.save('annotated_faces.png')
        """

        with torch.no_grad():
            batch_boxes, batch_points = detect_face(
                img, self.min_face_size,
                self.pnet, self.rnet, self.onet,
                self.thresholds, self.factor,
                self.device
            )

        boxes, probs, points = [], [], []
        for box, point in zip(batch_boxes, batch_points):
            box = np.array(box)
            point = np.array(point)
            if len(box) == 0:
                boxes.append(None)
                probs.append([None])
                points.append(None)
            elif self.select_largest:
                box_order = np.argsort((box[:, 2] - box[:, 0]) * (box[:, 3] - box[:, 1]))[::-1]
                box = box[box_order]
                point = point[box_order]
                boxes.append(box[:, :4])
                probs.append(box[:, 4])
                points.append(point)
            else:
                boxes.append(box[:, :4])
                probs.append(box[:, 4])
                points.append(point)
        boxes = np.array(boxes)
        probs = np.array(probs)
        points = np.array(points)

        if (
            not isinstance(img, (list, tuple)) and
            not (isinstance(img, np.ndarray) and len(img.shape) == 4) and
            not (isinstance(img, torch.Tensor) and len(img.shape) == 4)
        ):
            boxes = boxes[0]
            probs = probs[0]
            points = points[0]

        if landmarks:
            return boxes, probs, points

        return boxes, probs

    def select_boxes(
        self, all_boxes, all_probs, all_points, imgs, method='probability', threshold=0.9,
        center_weight=2.0
    ):
        """Selects a single box from multiple for a given image using one of multiple heuristics.

        Arguments:
            all_boxes {np.ndarray} -- Ix0 ndarray where each element is a Nx4 ndarray of
                bounding boxes for N detected faces in I images (output from self.detect).
            all_probs {np.ndarray} -- Ix0 ndarray where each element is a Nx0 ndarray of
                probabilities for N detected faces in I images (output from self.detect).
            all_points {np.ndarray} -- Ix0 ndarray where each element is a Nx5x2 array of
                points for N detected faces (output from self.detect).
            imgs {PIL.Image, np.ndarray, or list} -- A PIL image, np.ndarray, torch.Tensor, or list.

        Keyword Arguments:
            method {str} -- Which heuristic to use for selection:
                "probability": highest probability selected
                "largest": largest box selected
                "largest_over_threshold": largest box over a certain probability selected
                "center_weighted_size": box size minus weighted squared offset from image center
                (default: {'probability'})
            threshold {float} -- threshold for "largest_over_threshold" method. (default: {0.9})
            center_weight {float} -- weight for squared offset in center weighted size method.
                (default: {2.0})

        Returns:
            tuple(numpy.ndarray, numpy.ndarray, numpy.ndarray) -- Ix4 ndarray of selected boxes
                (one per image), Ix0 array of probabilities for each box, and an array of
                landmark points.
        """

        # Copying batch detection from extract, but it would be easier to ensure detect
        # creates consistent output.
        batch_mode = True
        if (
            not isinstance(imgs, (list, tuple)) and
            not (isinstance(imgs, np.ndarray) and len(imgs.shape) == 4) and
            not (isinstance(imgs, torch.Tensor) and len(imgs.shape) == 4)
        ):
            imgs = [imgs]
            all_boxes = [all_boxes]
            all_probs = [all_probs]
            all_points = [all_points]
            batch_mode = False

        selected_boxes, selected_probs, selected_points = [], [], []
        for boxes, points, probs, img in zip(all_boxes, all_points, all_probs, imgs):

            if boxes is None:
                selected_boxes.append(None)
                selected_probs.append([None])
                selected_points.append(None)
                continue

            # If at least 1 box found
            boxes = np.array(boxes)
            probs = np.array(probs)
            points = np.array(points)

            if method == 'largest':
                box_order = np.argsort((boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1]))[::-1]
            elif method == 'probability':
                box_order = np.argsort(probs)[::-1]
            elif method == 'center_weighted_size':
                box_sizes = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
                img_center = (img.width / 2, img.height / 2)
                box_centers = np.array(list(zip((boxes[:, 0] + boxes[:, 2]) / 2, (boxes[:, 1] + boxes[:, 3]) / 2)))
                offsets = box_centers - img_center
                offset_dist_squared = np.sum(np.power(offsets, 2.0), 1)
                box_order = np.argsort(box_sizes - offset_dist_squared * center_weight)[::-1]
            elif method == 'largest_over_threshold':
                box_mask = probs > threshold
                boxes = boxes[box_mask]
                box_order = np.argsort((boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1]))[::-1]
                if sum(box_mask) == 0:
                    selected_boxes.append(None)
                    selected_probs.append([None])
                    selected_points.append(None)
                    continue

            box = boxes[box_order][[0]]
            prob = probs[box_order][[0]]
            point = points[box_order][[0]]
            selected_boxes.append(box)
            selected_probs.append(prob)
            selected_points.append(point)

        if batch_mode:
            selected_boxes = np.array(selected_boxes)
            selected_probs = np.array(selected_probs)
            selected_points = np.array(selected_points)
        else:
            selected_boxes = selected_boxes[0]
            selected_probs = selected_probs[0][0]
            selected_points = selected_points[0]

        return selected_boxes, selected_probs, selected_points

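    # Worked example for the 'center_weighted_size' heuristic (editor's note,
    # illustrative numbers): for a 1000x1000 image with center (500, 500), a
    # 200x200 box centered at (500, 500) scores 200*200 - 0*2.0 = 40000, while
    # a 300x300 box centered at (800, 500) scores 300*300 - 300**2 * 2.0 =
    # -90000, so the smaller, centered face wins despite the larger box's area.
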
    def extract(self, img, batch_boxes, save_path):
        # Determine if a batch or single image was passed
        batch_mode = True
        if (
            not isinstance(img, (list, tuple)) and
            not (isinstance(img, np.ndarray) and len(img.shape) == 4) and
            not (isinstance(img, torch.Tensor) and len(img.shape) == 4)
        ):
            img = [img]
            batch_boxes = [batch_boxes]
            batch_mode = False

        # Parse save path(s)
        if save_path is not None:
            if isinstance(save_path, str):
                save_path = [save_path]
        else:
            save_path = [None for _ in range(len(img))]

        # Process all bounding boxes
        faces = []
        for im, box_im, path_im in zip(img, batch_boxes, save_path):
            if box_im is None:
                faces.append(None)
                continue

            if not self.keep_all:
                box_im = box_im[[0]]

            faces_im = []
            for i, box in enumerate(box_im):
                face_path = path_im
                if path_im is not None and i > 0:
                    save_name, ext = os.path.splitext(path_im)
                    face_path = save_name + '_' + str(i + 1) + ext

                face = extract_face(im, box, self.image_size, self.margin, face_path)
                if self.post_process:
                    face = fixed_image_standardization(face)
                faces_im.append(face)

            if self.keep_all:
                faces_im = torch.stack(faces_im)
            else:
                faces_im = faces_im[0]

            faces.append(faces_im)

        if not batch_mode:
            faces = faces[0]

        return faces

def fixed_image_standardization(image_tensor):
    processed_tensor = (image_tensor - 127.5) / 128.0
    return processed_tensor


def prewhiten(x):
    mean = x.mean()
    std = x.std()
    std_adj = std.clamp(min=1.0 / (float(x.numel()) ** 0.5))
    y = (x - mean) / std_adj
    return y
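
# Editor's note on the two normalizations above: fixed_image_standardization
# maps uint8 pixel values into roughly [-1, 1] with fixed constants
# ((0 - 127.5) / 128 = -0.996, (255 - 127.5) / 128 = 0.996), matching the
# statistics the pretrained weights were trained with, whereas prewhiten()
# standardizes each tensor by its own mean and (clamped) standard deviation.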
Tooling/model/facenet_pytorch/models/utils/detect_face.py (new file, 378 lines)
@@ -0,0 +1,378 @@
import torch
from torch.nn.functional import interpolate
from torchvision.transforms import functional as F
from torchvision.ops.boxes import batched_nms
from PIL import Image
import numpy as np
import os
import math

# OpenCV is optional, but required if using numpy arrays instead of PIL
try:
    import cv2
except ImportError:
    pass

def fixed_batch_process(im_data, model):
    batch_size = 512
    out = []
    for i in range(0, len(im_data), batch_size):
        batch = im_data[i:(i + batch_size)]
        out.append(model(batch))

    return tuple(torch.cat(v, dim=0) for v in zip(*out))

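# Editor's note: fixed_batch_process chunks inference into batches of 512
# crops to bound GPU memory. Each model call returns a tuple of outputs, so
# zip(*out) regroups the per-output lists across chunks and torch.cat then
# restores one tensor per output, as if the model had run on im_data at once.
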
def detect_face(imgs, minsize, pnet, rnet, onet, threshold, factor, device):
    if isinstance(imgs, (np.ndarray, torch.Tensor)):
        if isinstance(imgs, np.ndarray):
            imgs = torch.as_tensor(imgs.copy(), device=device)

        if isinstance(imgs, torch.Tensor):
            imgs = torch.as_tensor(imgs, device=device)

        if len(imgs.shape) == 3:
            imgs = imgs.unsqueeze(0)
    else:
        if not isinstance(imgs, (list, tuple)):
            imgs = [imgs]
        if any(img.size != imgs[0].size for img in imgs):
            raise Exception("MTCNN batch processing only compatible with equal-dimension images.")
        imgs = np.stack([np.uint8(img) for img in imgs])
        imgs = torch.as_tensor(imgs.copy(), device=device)

    model_dtype = next(pnet.parameters()).dtype
    imgs = imgs.permute(0, 3, 1, 2).type(model_dtype)

    batch_size = len(imgs)
    h, w = imgs.shape[2:4]
    m = 12.0 / minsize
    minl = min(h, w)
    minl = minl * m

    # Create scale pyramid
    scale_i = m
    scales = []
    while minl >= 12:
        scales.append(scale_i)
        scale_i = scale_i * factor
        minl = minl * factor

    # First stage
    boxes = []
    image_inds = []

    scale_picks = []

    all_i = 0
    offset = 0
    for scale in scales:
        im_data = imresample(imgs, (int(h * scale + 1), int(w * scale + 1)))
        im_data = (im_data - 127.5) * 0.0078125
        reg, probs = pnet(im_data)

        boxes_scale, image_inds_scale = generateBoundingBox(reg, probs[:, 1], scale, threshold[0])
        boxes.append(boxes_scale)
        image_inds.append(image_inds_scale)

        pick = batched_nms(boxes_scale[:, :4], boxes_scale[:, 4], image_inds_scale, 0.5)
        scale_picks.append(pick + offset)
        offset += boxes_scale.shape[0]

    boxes = torch.cat(boxes, dim=0)
    image_inds = torch.cat(image_inds, dim=0)

    scale_picks = torch.cat(scale_picks, dim=0)

    # NMS within each scale + image
    boxes, image_inds = boxes[scale_picks], image_inds[scale_picks]

    # NMS within each image
    pick = batched_nms(boxes[:, :4], boxes[:, 4], image_inds, 0.7)
    boxes, image_inds = boxes[pick], image_inds[pick]

    regw = boxes[:, 2] - boxes[:, 0]
    regh = boxes[:, 3] - boxes[:, 1]
    qq1 = boxes[:, 0] + boxes[:, 5] * regw
    qq2 = boxes[:, 1] + boxes[:, 6] * regh
    qq3 = boxes[:, 2] + boxes[:, 7] * regw
    qq4 = boxes[:, 3] + boxes[:, 8] * regh
    boxes = torch.stack([qq1, qq2, qq3, qq4, boxes[:, 4]]).permute(1, 0)
    boxes = rerec(boxes)
    y, ey, x, ex = pad(boxes, w, h)

    # Second stage
    if len(boxes) > 0:
        im_data = []
        for k in range(len(y)):
            if ey[k] > (y[k] - 1) and ex[k] > (x[k] - 1):
                img_k = imgs[image_inds[k], :, (y[k] - 1):ey[k], (x[k] - 1):ex[k]].unsqueeze(0)
                im_data.append(imresample(img_k, (24, 24)))
        im_data = torch.cat(im_data, dim=0)
        im_data = (im_data - 127.5) * 0.0078125

        # This is equivalent to out = rnet(im_data) to avoid GPU out of memory.
        out = fixed_batch_process(im_data, rnet)

        out0 = out[0].permute(1, 0)
        out1 = out[1].permute(1, 0)
        score = out1[1, :]
        ipass = score > threshold[1]
        boxes = torch.cat((boxes[ipass, :4], score[ipass].unsqueeze(1)), dim=1)
        image_inds = image_inds[ipass]
        mv = out0[:, ipass].permute(1, 0)

        # NMS within each image
        pick = batched_nms(boxes[:, :4], boxes[:, 4], image_inds, 0.7)
        boxes, image_inds, mv = boxes[pick], image_inds[pick], mv[pick]
        boxes = bbreg(boxes, mv)
        boxes = rerec(boxes)

    # Third stage
    points = torch.zeros(0, 5, 2, device=device)
    if len(boxes) > 0:
        y, ey, x, ex = pad(boxes, w, h)
        im_data = []
        for k in range(len(y)):
            if ey[k] > (y[k] - 1) and ex[k] > (x[k] - 1):
                img_k = imgs[image_inds[k], :, (y[k] - 1):ey[k], (x[k] - 1):ex[k]].unsqueeze(0)
                im_data.append(imresample(img_k, (48, 48)))
        im_data = torch.cat(im_data, dim=0)
        im_data = (im_data - 127.5) * 0.0078125

        # This is equivalent to out = onet(im_data) to avoid GPU out of memory.
        out = fixed_batch_process(im_data, onet)

        out0 = out[0].permute(1, 0)
        out1 = out[1].permute(1, 0)
        out2 = out[2].permute(1, 0)
        score = out2[1, :]
        points = out1
        ipass = score > threshold[2]
        points = points[:, ipass]
        boxes = torch.cat((boxes[ipass, :4], score[ipass].unsqueeze(1)), dim=1)
        image_inds = image_inds[ipass]
        mv = out0[:, ipass].permute(1, 0)

        w_i = boxes[:, 2] - boxes[:, 0] + 1
        h_i = boxes[:, 3] - boxes[:, 1] + 1
        points_x = w_i.repeat(5, 1) * points[:5, :] + boxes[:, 0].repeat(5, 1) - 1
        points_y = h_i.repeat(5, 1) * points[5:10, :] + boxes[:, 1].repeat(5, 1) - 1
        points = torch.stack((points_x, points_y)).permute(2, 1, 0)
        boxes = bbreg(boxes, mv)

        # NMS within each image using "Min" strategy
        # pick = batched_nms(boxes[:, :4], boxes[:, 4], image_inds, 0.7)
        pick = batched_nms_numpy(boxes[:, :4], boxes[:, 4], image_inds, 0.7, 'Min')
        boxes, image_inds, points = boxes[pick], image_inds[pick], points[pick]

    boxes = boxes.cpu().numpy()
    points = points.cpu().numpy()

    image_inds = image_inds.cpu()

    batch_boxes = []
    batch_points = []
    for b_i in range(batch_size):
        b_i_inds = np.where(image_inds == b_i)
        batch_boxes.append(boxes[b_i_inds].copy())
        batch_points.append(points[b_i_inds].copy())

    batch_boxes, batch_points = np.array(batch_boxes), np.array(batch_points)

    return batch_boxes, batch_points

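# Scale-pyramid arithmetic (editor's worked example, illustrative numbers):
# with minsize=20 and a 160x160 image, m = 12/20 = 0.6 and minl = 160*0.6 = 96;
# scales shrink by factor=0.709 until the rescaled short side drops below
# 12 px, i.e. 96 * 0.709**k >= 12 holds for k = 0..6, giving 7 pyramid levels.
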
def bbreg(boundingbox, reg):
    if reg.shape[1] == 1:
        reg = torch.reshape(reg, (reg.shape[2], reg.shape[3]))

    w = boundingbox[:, 2] - boundingbox[:, 0] + 1
    h = boundingbox[:, 3] - boundingbox[:, 1] + 1
    b1 = boundingbox[:, 0] + reg[:, 0] * w
    b2 = boundingbox[:, 1] + reg[:, 1] * h
    b3 = boundingbox[:, 2] + reg[:, 2] * w
    b4 = boundingbox[:, 3] + reg[:, 3] * h
    boundingbox[:, :4] = torch.stack([b1, b2, b3, b4]).permute(1, 0)

    return boundingbox

def generateBoundingBox(reg, probs, scale, thresh):
    stride = 2
    cellsize = 12

    reg = reg.permute(1, 0, 2, 3)

    mask = probs >= thresh
    mask_inds = mask.nonzero()
    image_inds = mask_inds[:, 0]
    score = probs[mask]
    reg = reg[:, mask].permute(1, 0)
    bb = mask_inds[:, 1:].type(reg.dtype).flip(1)
    q1 = ((stride * bb + 1) / scale).floor()
    q2 = ((stride * bb + cellsize - 1 + 1) / scale).floor()
    boundingbox = torch.cat([q1, q2, score.unsqueeze(1), reg], dim=1)
    return boundingbox, image_inds

def nms_numpy(boxes, scores, threshold, method):
    if boxes.size == 0:
        return np.empty((0, 3))

    x1 = boxes[:, 0].copy()
    y1 = boxes[:, 1].copy()
    x2 = boxes[:, 2].copy()
    y2 = boxes[:, 3].copy()
    s = scores
    area = (x2 - x1 + 1) * (y2 - y1 + 1)

    I = np.argsort(s)
    pick = np.zeros_like(s, dtype=np.int16)
    counter = 0
    while I.size > 0:
        i = I[-1]
        pick[counter] = i
        counter += 1
        idx = I[0:-1]

        xx1 = np.maximum(x1[i], x1[idx]).copy()
        yy1 = np.maximum(y1[i], y1[idx]).copy()
        xx2 = np.minimum(x2[i], x2[idx]).copy()
        yy2 = np.minimum(y2[i], y2[idx]).copy()

        w = np.maximum(0.0, xx2 - xx1 + 1).copy()
        h = np.maximum(0.0, yy2 - yy1 + 1).copy()

        inter = w * h
        if method == 'Min':
            o = inter / np.minimum(area[i], area[idx])
        else:
            o = inter / (area[i] + area[idx] - inter)
        I = I[np.where(o <= threshold)]

    pick = pick[:counter].copy()
    return pick

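# Editor's note: with method='Min', overlap is inter / min(area_i, area_j)
# rather than IoU. A box fully contained in a larger one then scores overlap
# 1.0 regardless of the size difference, so the 'Min' strategy is stricter at
# suppressing nested detections than standard IoU NMS.
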
def batched_nms_numpy(boxes, scores, idxs, threshold, method):
    device = boxes.device
    if boxes.numel() == 0:
        return torch.empty((0,), dtype=torch.int64, device=device)
    # Strategy: in order to perform NMS independently per class,
    # we add an offset to all the boxes. The offset is dependent
    # only on the class idx, and is large enough so that boxes
    # from different classes do not overlap.
    max_coordinate = boxes.max()
    offsets = idxs.to(boxes) * (max_coordinate + 1)
    boxes_for_nms = boxes + offsets[:, None]
    boxes_for_nms = boxes_for_nms.cpu().numpy()
    scores = scores.cpu().numpy()
    keep = nms_numpy(boxes_for_nms, scores, threshold, method)
    return torch.as_tensor(keep, dtype=torch.long, device=device)

def pad(boxes, w, h):
    boxes = boxes.trunc().int().cpu().numpy()
    x = boxes[:, 0]
    y = boxes[:, 1]
    ex = boxes[:, 2]
    ey = boxes[:, 3]

    x[x < 1] = 1
    y[y < 1] = 1
    ex[ex > w] = w
    ey[ey > h] = h

    return y, ey, x, ex

def rerec(bboxA):
    h = bboxA[:, 3] - bboxA[:, 1]
    w = bboxA[:, 2] - bboxA[:, 0]

    l = torch.max(w, h)
    bboxA[:, 0] = bboxA[:, 0] + w * 0.5 - l * 0.5
    bboxA[:, 1] = bboxA[:, 1] + h * 0.5 - l * 0.5
    bboxA[:, 2:4] = bboxA[:, :2] + l.repeat(2, 1).permute(1, 0)

    return bboxA

def imresample(img, sz):
    im_data = interpolate(img, size=sz, mode="area")
    return im_data

def crop_resize(img, box, image_size):
    if isinstance(img, np.ndarray):
        img = img[box[1]:box[3], box[0]:box[2]]
        out = cv2.resize(
            img,
            (image_size, image_size),
            interpolation=cv2.INTER_AREA
        ).copy()
    elif isinstance(img, torch.Tensor):
        img = img[box[1]:box[3], box[0]:box[2]]
        out = imresample(
            img.permute(2, 0, 1).unsqueeze(0).float(),
            (image_size, image_size)
        ).byte().squeeze(0).permute(1, 2, 0)
    else:
        out = img.crop(box).copy().resize((image_size, image_size), Image.BILINEAR)
    return out

def save_img(img, path):
    if isinstance(img, np.ndarray):
        cv2.imwrite(path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
    else:
        img.save(path)

def get_size(img):
    if isinstance(img, (np.ndarray, torch.Tensor)):
        return img.shape[1::-1]
    else:
        return img.size

def extract_face(img, box, image_size=160, margin=0, save_path=None):
    """Extract face + margin from PIL Image given bounding box.

    Arguments:
        img {PIL.Image} -- A PIL Image.
        box {numpy.ndarray} -- Four-element bounding box.
        image_size {int} -- Output image size in pixels. The image will be square.
        margin {int} -- Margin to add to bounding box, in terms of pixels in the final image.
            Note that the application of the margin differs slightly from the davidsandberg/facenet
            repo, which applies the margin to the original image before resizing, making the margin
            dependent on the original image size.
        save_path {str} -- Save path for extracted face image. (default: {None})

    Returns:
        torch.tensor -- tensor representing the extracted face.
    """
    margin = [
        margin * (box[2] - box[0]) / (image_size - margin),
        margin * (box[3] - box[1]) / (image_size - margin),
    ]
    raw_image_size = get_size(img)
    box = [
        int(max(box[0] - margin[0] / 2, 0)),
        int(max(box[1] - margin[1] / 2, 0)),
        int(min(box[2] + margin[0] / 2, raw_image_size[0])),
        int(min(box[3] + margin[1] / 2, raw_image_size[1])),
    ]

    face = crop_resize(img, box, image_size)

    if save_path is not None:
        os.makedirs(os.path.dirname(save_path) + "/", exist_ok=True)
        save_img(face, save_path)

    face = F.to_tensor(np.float32(face))

    return face
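
# Margin arithmetic (editor's note): a margin of m output pixels corresponds
# to m * box_width / (image_size - m) pixels in the original image, so the
# crop is padded such that, after resizing to image_size, the face occupies
# image_size - m pixels and the margin occupies the remaining m.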
Tooling/model/facenet_pytorch/models/utils/download.py (new file, 102 lines)
@@ -0,0 +1,102 @@
import hashlib
import os
import shutil
import sys
import tempfile

from urllib.request import urlopen, Request

try:
    from tqdm.auto import tqdm  # automatically select proper tqdm submodule if available
except ImportError:
    try:
        from tqdm import tqdm
    except ImportError:
        # fake tqdm if it's not installed
        class tqdm(object):  # type: ignore

            def __init__(self, total=None, disable=False,
                         unit=None, unit_scale=None, unit_divisor=None):
                self.total = total
                self.disable = disable
                self.n = 0
                # ignore unit, unit_scale, unit_divisor; they're just for real tqdm

            def update(self, n):
                if self.disable:
                    return

                self.n += n
                if self.total is None:
                    sys.stderr.write("\r{0:.1f} bytes".format(self.n))
                else:
                    sys.stderr.write("\r{0:.1f}%".format(100 * self.n / float(self.total)))
                sys.stderr.flush()

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc_val, exc_tb):
                if self.disable:
                    return

                sys.stderr.write('\n')

def download_url_to_file(url, dst, hash_prefix=None, progress=True):
    r"""Download object at the given URL to a local path.

    Args:
        url (string): URL of the object to download
        dst (string): Full path where object will be saved, e.g. `/tmp/temporary_file`
        hash_prefix (string, optional): If not None, the SHA256 of the downloaded file should start
            with `hash_prefix`. Default: None
        progress (bool, optional): whether or not to display a progress bar to stderr
            Default: True

    Example:
        >>> torch.hub.download_url_to_file('https://s3.amazonaws.com/pytorch/models/resnet18-5c106cde.pth', '/tmp/temporary_file')
    """
    file_size = None
    # We use a different API for python2 since urllib(2) doesn't recognize the CA
    # certificates in older Python
    req = Request(url, headers={"User-Agent": "torch.hub"})
    u = urlopen(req)
    meta = u.info()
    if hasattr(meta, 'getheaders'):
        content_length = meta.getheaders("Content-Length")
    else:
        content_length = meta.get_all("Content-Length")
    if content_length is not None and len(content_length) > 0:
        file_size = int(content_length[0])

    # We deliberately save it in a temp file and move it after
    # download is complete. This prevents a local working checkpoint
    # being overridden by a broken download.
    dst = os.path.expanduser(dst)
    dst_dir = os.path.dirname(dst)
    f = tempfile.NamedTemporaryFile(delete=False, dir=dst_dir)

    try:
        if hash_prefix is not None:
            sha256 = hashlib.sha256()
        with tqdm(total=file_size, disable=not progress,
                  unit='B', unit_scale=True, unit_divisor=1024) as pbar:
            while True:
                buffer = u.read(8192)
                if len(buffer) == 0:
                    break
                f.write(buffer)
                if hash_prefix is not None:
                    sha256.update(buffer)
                pbar.update(len(buffer))

        f.close()
        if hash_prefix is not None:
            digest = sha256.hexdigest()
            if digest[:len(hash_prefix)] != hash_prefix:
                raise RuntimeError('invalid hash value (expected "{}", got "{}")'
                                   .format(hash_prefix, digest))
        shutil.move(f.name, dst)
    finally:
        f.close()
        if os.path.exists(f.name):
            os.remove(f.name)
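
# Usage sketch (editor's addition, mirroring the docstring example; the URL
# and hash prefix are illustrative):
#
#     download_url_to_file(
#         'https://s3.amazonaws.com/pytorch/models/resnet18-5c106cde.pth',
#         '/tmp/resnet18.pth',
#         hash_prefix='5c106cde',   # first hex chars of the expected SHA256
#     )
#
# Passing hash_prefix makes the download fail loudly on corruption instead of
# silently caching a broken checkpoint.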
Tooling/model/facenet_pytorch/models/utils/tensorflow2pytorch.py (new file, 416 lines; diff truncated below)
@@ -0,0 +1,416 @@
import tensorflow as tf
import torch
import json
import os, sys

from dependencies.facenet.src import facenet
from dependencies.facenet.src.models import inception_resnet_v1 as tf_mdl
from dependencies.facenet.src.align import detect_face

from models.inception_resnet_v1 import InceptionResnetV1
from models.mtcnn import PNet, RNet, ONet

def import_tf_params(tf_mdl_dir, sess):
    """Import tensorflow model from save directory.

    Arguments:
        tf_mdl_dir {str} -- Location of protobuf, checkpoint, meta files.
        sess {tensorflow.Session} -- Tensorflow session object.

    Returns:
        (list, list, list) -- Tuple of lists containing the layer names,
            parameter arrays as numpy ndarrays, parameter shapes.
    """
    print('\nLoading tensorflow model\n')
    if callable(tf_mdl_dir):
        tf_mdl_dir(sess)
    else:
        facenet.load_model(tf_mdl_dir)

    print('\nGetting model weights\n')
    tf_layers = tf.trainable_variables()
    tf_params = sess.run(tf_layers)

    tf_shapes = [p.shape for p in tf_params]
    tf_layers = [l.name for l in tf_layers]

    if not callable(tf_mdl_dir):
        path = os.path.join(tf_mdl_dir, 'layer_description.json')
    else:
        path = 'data/layer_description.json'
    with open(path, 'w') as f:
        json.dump({l: s for l, s in zip(tf_layers, tf_shapes)}, f)

    return tf_layers, tf_params, tf_shapes

def get_layer_indices(layer_lookup, tf_layers):
    """Given a lookup of model layer attribute names and tensorflow variable names,
    find matching parameters.

    Arguments:
        layer_lookup {dict} -- Dictionary mapping pytorch attribute names to (partial)
            tensorflow variable names. Expects dict of the form {'attr': ['tf_name', ...]}
            where the '...'s are ignored.
        tf_layers {list} -- List of tensorflow variable names.

    Returns:
        list -- The input dictionary with the list of matching inds appended to each item.
    """
    layer_inds = {}
    for name, value in layer_lookup.items():
        layer_inds[name] = value + [[i for i, n in enumerate(tf_layers) if value[0] in n]]
    return layer_inds

def load_tf_batchNorm(weights, layer):
    """Load tensorflow weights into nn.BatchNorm object.

    Arguments:
        weights {list} -- Tensorflow parameters.
        layer {torch.nn.Module} -- nn.BatchNorm.
    """
    layer.bias.data = torch.tensor(weights[0]).view(layer.bias.data.shape)
    layer.weight.data = torch.ones_like(layer.weight.data)
    layer.running_mean = torch.tensor(weights[1]).view(layer.running_mean.shape)
    layer.running_var = torch.tensor(weights[2]).view(layer.running_var.shape)

def load_tf_conv2d(weights, layer, transpose=False):
    """Load tensorflow weights into nn.Conv2d object.

    Arguments:
        weights {list} -- Tensorflow parameters.
        layer {torch.nn.Module} -- nn.Conv2d.
    """
    if isinstance(weights, list):
        if len(weights) == 2:
            layer.bias.data = (
                torch.tensor(weights[1])
                .view(layer.bias.data.shape)
            )
        weights = weights[0]

    if transpose:
        dim_order = (3, 2, 1, 0)
    else:
        dim_order = (3, 2, 0, 1)

    layer.weight.data = (
        torch.tensor(weights)
        .permute(dim_order)
        .view(layer.weight.data.shape)
    )

def load_tf_conv2d_trans(weights, layer):
    return load_tf_conv2d(weights, layer, transpose=True)

def load_tf_basicConv2d(weights, layer):
    """Load tensorflow weights into grouped Conv2d+BatchNorm object.

    Arguments:
        weights {list} -- Tensorflow parameters.
        layer {torch.nn.Module} -- Object containing Conv2d+BatchNorm.
    """
    load_tf_conv2d(weights[0], layer.conv)
    load_tf_batchNorm(weights[1:], layer.bn)

def load_tf_linear(weights, layer):
    """Load tensorflow weights into nn.Linear object.

    Arguments:
        weights {list} -- Tensorflow parameters.
        layer {torch.nn.Module} -- nn.Linear.
    """
    if isinstance(weights, list):
        if len(weights) == 2:
            layer.bias.data = (
                torch.tensor(weights[1])
                .view(layer.bias.data.shape)
            )
        weights = weights[0]
    layer.weight.data = (
        torch.tensor(weights)
        .transpose(-1, 0)
        .view(layer.weight.data.shape)
    )

# High-level parameter-loading functions:

def load_tf_block35(weights, layer):
    load_tf_basicConv2d(weights[:4], layer.branch0)
    load_tf_basicConv2d(weights[4:8], layer.branch1[0])
    load_tf_basicConv2d(weights[8:12], layer.branch1[1])
    load_tf_basicConv2d(weights[12:16], layer.branch2[0])
    load_tf_basicConv2d(weights[16:20], layer.branch2[1])
    load_tf_basicConv2d(weights[20:24], layer.branch2[2])
    load_tf_conv2d(weights[24:26], layer.conv2d)


def load_tf_block17_8(weights, layer):
    load_tf_basicConv2d(weights[:4], layer.branch0)
    load_tf_basicConv2d(weights[4:8], layer.branch1[0])
    load_tf_basicConv2d(weights[8:12], layer.branch1[1])
    load_tf_basicConv2d(weights[12:16], layer.branch1[2])
    load_tf_conv2d(weights[16:18], layer.conv2d)


def load_tf_mixed6a(weights, layer):
    if len(weights) != 16:
        raise ValueError(f'Number of weight arrays ({len(weights)}) not equal to 16')
    load_tf_basicConv2d(weights[:4], layer.branch0)
    load_tf_basicConv2d(weights[4:8], layer.branch1[0])
    load_tf_basicConv2d(weights[8:12], layer.branch1[1])
    load_tf_basicConv2d(weights[12:16], layer.branch1[2])


def load_tf_mixed7a(weights, layer):
    if len(weights) != 28:
        raise ValueError(f'Number of weight arrays ({len(weights)}) not equal to 28')
    load_tf_basicConv2d(weights[:4], layer.branch0[0])
    load_tf_basicConv2d(weights[4:8], layer.branch0[1])
    load_tf_basicConv2d(weights[8:12], layer.branch1[0])
    load_tf_basicConv2d(weights[12:16], layer.branch1[1])
    load_tf_basicConv2d(weights[16:20], layer.branch2[0])
    load_tf_basicConv2d(weights[20:24], layer.branch2[1])
    load_tf_basicConv2d(weights[24:28], layer.branch2[2])


def load_tf_repeats(weights, layer, rptlen, subfun):
    if len(weights) % rptlen != 0:
        raise ValueError(f'Number of weight arrays ({len(weights)}) not divisible by {rptlen}')
    weights_split = [weights[i:i + rptlen] for i in range(0, len(weights), rptlen)]
    for i, w in enumerate(weights_split):
        subfun(w, getattr(layer, str(i)))


def load_tf_repeat_1(weights, layer):
    load_tf_repeats(weights, layer, 26, load_tf_block35)


def load_tf_repeat_2(weights, layer):
    load_tf_repeats(weights, layer, 18, load_tf_block17_8)


def load_tf_repeat_3(weights, layer):
    load_tf_repeats(weights, layer, 18, load_tf_block17_8)

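# Parameter-count check for the repeat loaders (editor's note): each
# BasicConv2d consumes 4 tensorflow arrays (conv kernel plus 3 batch-norm
# arrays) and each plain conv2d consumes 2 (kernel plus bias). Block35 has
# six BasicConv2d layers plus one conv2d: 6*4 + 2 = 26, matching rptlen=26;
# Block17 and Block8 have four plus one: 4*4 + 2 = 18, matching rptlen=18.
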
def test_loaded_params(mdl, tf_params, tf_layers):
    """Check each parameter in a pytorch model for an equivalent parameter
    in a list of tensorflow variables.

    Arguments:
        mdl {torch.nn.Module} -- Pytorch model.
        tf_params {list} -- List of ndarrays representing tensorflow variables.
        tf_layers {list} -- Corresponding list of tensorflow variable names.
    """
    tf_means = torch.stack([torch.tensor(p).mean() for p in tf_params])
    for name, param in mdl.named_parameters():
        pt_mean = param.data.mean()
        matching_inds = ((tf_means - pt_mean).abs() < 1e-8).nonzero()
        print(f'{name} equivalent to {[tf_layers[i] for i in matching_inds]}')


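# The check above is a heuristic: parameters are matched by comparing
# their means to within 1e-8 rather than elementwise, so near-constant
# tensors could in principle collide. A stricter (hypothetical) variant
# would compare full tensors after applying the layout transposes, e.g.:
#
#   >>> torch.allclose(param.data, converted_tf_tensor)  # converted_tf_tensor assumed
#
# at the cost of needing the exact per-layer conversions applied first.
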
def compare_model_outputs(pt_mdl, sess, test_data):
    """Given some testing data, compare the output of pytorch and tensorflow models.

    Arguments:
        pt_mdl {torch.nn.Module} -- Pytorch model.
        sess {tensorflow.Session} -- Tensorflow session object.
        test_data {torch.Tensor} -- Pytorch tensor.
    """
    print('\nPassing test data through TF model\n')
    if isinstance(sess, tf.Session):
        images_placeholder = tf.get_default_graph().get_tensor_by_name("input:0")
        phase_train_placeholder = tf.get_default_graph().get_tensor_by_name("phase_train:0")
        embeddings = tf.get_default_graph().get_tensor_by_name("embeddings:0")
        feed_dict = {images_placeholder: test_data.numpy(), phase_train_placeholder: False}
        tf_output = torch.tensor(sess.run(embeddings, feed_dict=feed_dict))
    else:
        tf_output = sess(test_data)

    print(tf_output)

    print('\nPassing test data through PT model\n')
    pt_output = pt_mdl(test_data.permute(0, 3, 1, 2))
    print(pt_output)

    distance = (tf_output - pt_output).norm()
    print(f'\nDistance {distance}\n')


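# compare_model_outputs feeds one batch to both frameworks: the TF graph
# consumes NHWC input directly, while the PyTorch model expects NCHW,
# hence the permute(0, 3, 1, 2). Sketch of the layout change (shapes
# illustrative):
#
#   >>> x = torch.randn(5, 160, 160, 3)   # NHWC, as fed to TF
#   >>> x.permute(0, 3, 1, 2).shape       # NCHW, as fed to PyTorch
#   torch.Size([5, 3, 160, 160])
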
def compare_mtcnn(pt_mdl, tf_fun, sess, ind, test_data):
    tf_mdls = tf_fun(sess)
    tf_mdl = tf_mdls[ind]

    print('\nPassing test data through TF model\n')
    tf_output = tf_mdl(test_data.numpy())
    tf_output = [torch.tensor(out) for out in tf_output]
    print('\n'.join([str(o.view(-1)[:10]) for o in tf_output]))

    print('\nPassing test data through PT model\n')
    with torch.no_grad():
        pt_output = pt_mdl(test_data.permute(0, 3, 2, 1))
    pt_output = [torch.tensor(out) for out in pt_output]
    for i in range(len(pt_output)):
        if len(pt_output[i].shape) == 4:
            pt_output[i] = pt_output[i].permute(0, 3, 2, 1).contiguous()
    print('\n'.join([str(o.view(-1)[:10]) for o in pt_output]))

    distance = [(tf_o - pt_o).norm() for tf_o, pt_o in zip(tf_output, pt_output)]
    print(f'\nDistance {distance}\n')


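# Note that compare_mtcnn permutes with (0, 3, 2, 1) rather than the usual
# (0, 3, 1, 2): the reference MTCNN implementation works on images that
# are transposed in height/width relative to the standard NCHW convention,
# so H and W are swapped on the way in and swapped back on each 4-D output
# before the per-output distances are computed.
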
def load_tf_model_weights(mdl, layer_lookup, tf_mdl_dir, is_resnet=True, arg_num=None):
    """Load tensorflow parameters into a pytorch model.

    Arguments:
        mdl {torch.nn.Module} -- Pytorch model.
        layer_lookup {dict} -- Dictionary mapping pytorch attribute names to (partial)
            tensorflow variable names, and a function suitable for loading weights.
            Expects dict of the form {'attr': ['tf_name', function]}.
        tf_mdl_dir {str} -- Location of protobuf, checkpoint, meta files.

    Keyword Arguments:
        is_resnet {bool} -- Whether to compare resnet outputs across frameworks
            after loading. (default: {True})
        arg_num {int} -- Unused in this function body. (default: {None})
    """
    tf.reset_default_graph()
    with tf.Session() as sess:
        tf_layers, tf_params, tf_shapes = import_tf_params(tf_mdl_dir, sess)
        layer_info = get_layer_indices(layer_lookup, tf_layers)

        for layer_name, info in layer_info.items():
            print(f'Loading {info[0]}/* into {layer_name}')
            weights = [tf_params[i] for i in info[2]]
            layer = getattr(mdl, layer_name)
            info[1](weights, layer)

        test_loaded_params(mdl, tf_params, tf_layers)

        if is_resnet:
            compare_model_outputs(mdl, sess, torch.randn(5, 160, 160, 3).detach())


def tensorflow2pytorch():
    lookup_inception_resnet_v1 = {
        'conv2d_1a': ['InceptionResnetV1/Conv2d_1a_3x3', load_tf_basicConv2d],
        'conv2d_2a': ['InceptionResnetV1/Conv2d_2a_3x3', load_tf_basicConv2d],
        'conv2d_2b': ['InceptionResnetV1/Conv2d_2b_3x3', load_tf_basicConv2d],
        'conv2d_3b': ['InceptionResnetV1/Conv2d_3b_1x1', load_tf_basicConv2d],
        'conv2d_4a': ['InceptionResnetV1/Conv2d_4a_3x3', load_tf_basicConv2d],
        'conv2d_4b': ['InceptionResnetV1/Conv2d_4b_3x3', load_tf_basicConv2d],
        'repeat_1': ['InceptionResnetV1/Repeat/block35', load_tf_repeat_1],
        'mixed_6a': ['InceptionResnetV1/Mixed_6a', load_tf_mixed6a],
        'repeat_2': ['InceptionResnetV1/Repeat_1/block17', load_tf_repeat_2],
        'mixed_7a': ['InceptionResnetV1/Mixed_7a', load_tf_mixed7a],
        'repeat_3': ['InceptionResnetV1/Repeat_2/block8', load_tf_repeat_3],
        'block8': ['InceptionResnetV1/Block8', load_tf_block17_8],
        'last_linear': ['InceptionResnetV1/Bottleneck/weights', load_tf_linear],
        'last_bn': ['InceptionResnetV1/Bottleneck/BatchNorm', load_tf_batchNorm],
        'logits': ['Logits', load_tf_linear],
    }

    print('\nLoad VGGFace2-trained weights and save\n')
    mdl = InceptionResnetV1(num_classes=8631).eval()
    tf_mdl_dir = 'data/20180402-114759'
    data_name = 'vggface2'
    load_tf_model_weights(mdl, lookup_inception_resnet_v1, tf_mdl_dir)
    state_dict = mdl.state_dict()
    torch.save(state_dict, f'{tf_mdl_dir}-{data_name}.pt')
    torch.save(
        {
            'logits.weight': state_dict['logits.weight'],
            'logits.bias': state_dict['logits.bias'],
        },
        f'{tf_mdl_dir}-{data_name}-logits.pt'
    )
    state_dict.pop('logits.weight')
    state_dict.pop('logits.bias')
    torch.save(state_dict, f'{tf_mdl_dir}-{data_name}-features.pt')

    print('\nLoad CASIA-Webface-trained weights and save\n')
    mdl = InceptionResnetV1(num_classes=10575).eval()
    tf_mdl_dir = 'data/20180408-102900'
    data_name = 'casia-webface'
    load_tf_model_weights(mdl, lookup_inception_resnet_v1, tf_mdl_dir)
    state_dict = mdl.state_dict()
    torch.save(state_dict, f'{tf_mdl_dir}-{data_name}.pt')
    torch.save(
        {
            'logits.weight': state_dict['logits.weight'],
            'logits.bias': state_dict['logits.bias'],
        },
        f'{tf_mdl_dir}-{data_name}-logits.pt'
    )
    state_dict.pop('logits.weight')
    state_dict.pop('logits.bias')
    torch.save(state_dict, f'{tf_mdl_dir}-{data_name}-features.pt')

    lookup_pnet = {
        'conv1': ['pnet/conv1', load_tf_conv2d_trans],
        'prelu1': ['pnet/PReLU1', load_tf_linear],
        'conv2': ['pnet/conv2', load_tf_conv2d_trans],
        'prelu2': ['pnet/PReLU2', load_tf_linear],
        'conv3': ['pnet/conv3', load_tf_conv2d_trans],
        'prelu3': ['pnet/PReLU3', load_tf_linear],
        'conv4_1': ['pnet/conv4-1', load_tf_conv2d_trans],
        'conv4_2': ['pnet/conv4-2', load_tf_conv2d_trans],
    }
    lookup_rnet = {
        'conv1': ['rnet/conv1', load_tf_conv2d_trans],
        'prelu1': ['rnet/prelu1', load_tf_linear],
        'conv2': ['rnet/conv2', load_tf_conv2d_trans],
        'prelu2': ['rnet/prelu2', load_tf_linear],
        'conv3': ['rnet/conv3', load_tf_conv2d_trans],
        'prelu3': ['rnet/prelu3', load_tf_linear],
        'dense4': ['rnet/conv4', load_tf_linear],
        'prelu4': ['rnet/prelu4', load_tf_linear],
        'dense5_1': ['rnet/conv5-1', load_tf_linear],
        'dense5_2': ['rnet/conv5-2', load_tf_linear],
    }
    lookup_onet = {
        'conv1': ['onet/conv1', load_tf_conv2d_trans],
        'prelu1': ['onet/prelu1', load_tf_linear],
        'conv2': ['onet/conv2', load_tf_conv2d_trans],
        'prelu2': ['onet/prelu2', load_tf_linear],
        'conv3': ['onet/conv3', load_tf_conv2d_trans],
        'prelu3': ['onet/prelu3', load_tf_linear],
        'conv4': ['onet/conv4', load_tf_conv2d_trans],
        'prelu4': ['onet/prelu4', load_tf_linear],
        'dense5': ['onet/conv5', load_tf_linear],
        'prelu5': ['onet/prelu5', load_tf_linear],
        'dense6_1': ['onet/conv6-1', load_tf_linear],
        'dense6_2': ['onet/conv6-2', load_tf_linear],
        'dense6_3': ['onet/conv6-3', load_tf_linear],
    }

    print('\nLoad PNet weights and save\n')
    tf_mdl_dir = lambda sess: detect_face.create_mtcnn(sess, None)
    mdl = PNet()
    data_name = 'pnet'
    load_tf_model_weights(mdl, lookup_pnet, tf_mdl_dir, is_resnet=False, arg_num=0)
    torch.save(mdl.state_dict(), f'data/{data_name}.pt')
    tf.reset_default_graph()
    with tf.Session() as sess:
        compare_mtcnn(mdl, tf_mdl_dir, sess, 0, torch.randn(1, 256, 256, 3).detach())

    print('\nLoad RNet weights and save\n')
    mdl = RNet()
    data_name = 'rnet'
    load_tf_model_weights(mdl, lookup_rnet, tf_mdl_dir, is_resnet=False, arg_num=1)
    torch.save(mdl.state_dict(), f'data/{data_name}.pt')
    tf.reset_default_graph()
    with tf.Session() as sess:
        compare_mtcnn(mdl, tf_mdl_dir, sess, 1, torch.randn(1, 24, 24, 3).detach())

    print('\nLoad ONet weights and save\n')
    mdl = ONet()
    data_name = 'onet'
    load_tf_model_weights(mdl, lookup_onet, tf_mdl_dir, is_resnet=False, arg_num=2)
    torch.save(mdl.state_dict(), f'data/{data_name}.pt')
    tf.reset_default_graph()
    with tf.Session() as sess:
        compare_mtcnn(mdl, tf_mdl_dir, sess, 2, torch.randn(1, 48, 48, 3).detach())
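
# A hedged note on running the conversion above: it assumes the TF1-style
# facenet checkpoints have been downloaded to data/20180402-114759 and
# data/20180408-102900, and that `tf`, `detect_face`, `InceptionResnetV1`,
# `PNet`, `RNet`, and `ONet` are importable in this module. A typical
# invocation is then simply:
#
#   >>> tensorflow2pytorch()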
144
Tooling/model/facenet_pytorch/models/utils/training.py
Normal file
@@ -0,0 +1,144 @@
import torch
import numpy as np
import time


class Logger(object):

    def __init__(self, mode, length, calculate_mean=False):
        self.mode = mode
        self.length = length
        self.calculate_mean = calculate_mean
        if self.calculate_mean:
            self.fn = lambda x, i: x / (i + 1)
        else:
            self.fn = lambda x, i: x

    def __call__(self, loss, metrics, i):
        track_str = '\r{} | {:5d}/{:<5d}| '.format(self.mode, i + 1, self.length)
        loss_str = 'loss: {:9.4f} | '.format(self.fn(loss, i))
        metric_str = ' | '.join('{}: {:9.4f}'.format(k, self.fn(v, i)) for k, v in metrics.items())
        print(track_str + loss_str + metric_str + ' ', end='')
        if i + 1 == self.length:
            print('')


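# Illustrative Logger usage (numbers hypothetical): with calculate_mean
# enabled, the running totals passed in are divided by (i + 1) before
# printing, so the line shows averages rather than raw sums.
#
#   >>> logger = Logger('Train', length=100, calculate_mean=True)
#   >>> logger(12.5, {'acc': 45.0}, 4)   # prints loss 2.5000, acc 9.0000
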
class BatchTimer(object):
    """Batch timing class.
    Use this class for tracking training and testing time/rate per batch or per sample.

    Keyword Arguments:
        rate {bool} -- Whether to report a rate (batches or samples per second) or a time (seconds
            per batch or sample). (default: {True})
        per_sample {bool} -- Whether to report times or rates per sample or per batch.
            (default: {True})
    """

    def __init__(self, rate=True, per_sample=True):
        self.start = time.time()
        self.end = None
        self.rate = rate
        self.per_sample = per_sample

    def __call__(self, y_pred, y):
        self.end = time.time()
        elapsed = self.end - self.start
        self.start = self.end
        self.end = None

        if self.per_sample:
            elapsed /= len(y_pred)
        if self.rate:
            elapsed = 1 / elapsed

        return torch.tensor(elapsed)


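# Sketch of BatchTimer as a batch metric (per-sample rate, the defaults):
#
#   >>> timer = BatchTimer(rate=True, per_sample=True)
#   >>> # ... run a batch, producing y_pred and y ...
#   >>> rate = timer(y_pred, y)   # samples/second since the previous call
#
# The timer is stateful (each call resets the clock), so a single instance
# should be shared across the batches it is meant to time.
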
def accuracy(logits, y):
    _, preds = torch.max(logits, 1)
    return (preds == y).float().mean()


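# Quick illustration of accuracy on toy values:
#
#   >>> logits = torch.tensor([[2.0, 0.1], [0.2, 1.5]])
#   >>> y = torch.tensor([0, 1])
#   >>> accuracy(logits, y)
#   tensor(1.)
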
def pass_epoch(
    model, loss_fn, loader, optimizer=None, scheduler=None,
    batch_metrics={'time': BatchTimer()}, show_running=True,
    device='cpu', writer=None
):
    """Train or evaluate over a data epoch.

    Arguments:
        model {torch.nn.Module} -- Pytorch model.
        loss_fn {callable} -- A function to compute (scalar) loss.
        loader {torch.utils.data.DataLoader} -- A pytorch data loader.

    Keyword Arguments:
        optimizer {torch.optim.Optimizer} -- A pytorch optimizer. (default: {None})
        scheduler {torch.optim.lr_scheduler._LRScheduler} -- LR scheduler (default: {None})
        batch_metrics {dict} -- Dictionary of metric functions to call on each batch. The default
            is a simple timer. A progressive average of these metrics, along with the average
            loss, is printed every batch. (default: {{'time': BatchTimer()}})
        show_running {bool} -- Whether to print losses and metrics as rolling averages (True)
            or for the current batch only (False). (default: {True})
        device {str or torch.device} -- Device for pytorch to use. (default: {'cpu'})
        writer {torch.utils.tensorboard.SummaryWriter} -- Tensorboard SummaryWriter. (default: {None})

    Returns:
        tuple(torch.Tensor, dict) -- A tuple of the average loss and a dictionary of average
            metric values across the epoch.
    """
    mode = 'Train' if model.training else 'Valid'
    logger = Logger(mode, length=len(loader), calculate_mean=show_running)
    loss = 0
    metrics = {}

    for i_batch, (x, y) in enumerate(loader):
        x = x.to(device)
        y = y.to(device)
        y_pred = model(x)
        loss_batch = loss_fn(y_pred, y)

        if model.training:
            loss_batch.backward()
            optimizer.step()
            optimizer.zero_grad()

        metrics_batch = {}
        for metric_name, metric_fn in batch_metrics.items():
            metrics_batch[metric_name] = metric_fn(y_pred, y).detach().cpu()
            metrics[metric_name] = metrics.get(metric_name, 0) + metrics_batch[metric_name]

        if writer is not None and model.training:
            if writer.iteration % writer.interval == 0:
                writer.add_scalars('loss', {mode: loss_batch.detach().cpu()}, writer.iteration)
                for metric_name, metric_batch in metrics_batch.items():
                    writer.add_scalars(metric_name, {mode: metric_batch}, writer.iteration)
            writer.iteration += 1

        loss_batch = loss_batch.detach().cpu()
        loss += loss_batch
        if show_running:
            logger(loss, metrics, i_batch)
        else:
            logger(loss_batch, metrics_batch, i_batch)

    if model.training and scheduler is not None:
        scheduler.step()

    loss = loss / (i_batch + 1)
    metrics = {k: v / (i_batch + 1) for k, v in metrics.items()}

    if writer is not None and not model.training:
        writer.add_scalars('loss', {mode: loss.detach()}, writer.iteration)
        for metric_name, metric in metrics.items():
            writer.add_scalars(metric_name, {mode: metric})

    return loss, metrics


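# Hedged usage sketch for pass_epoch (model, loaders, optimizer, scheduler,
# and writer are assumed to exist; they are not defined in this file):
#
#   >>> model.train()
#   >>> loss, metrics = pass_epoch(
#   ...     model, torch.nn.CrossEntropyLoss(), train_loader,
#   ...     optimizer=optimizer, scheduler=scheduler,
#   ...     batch_metrics={'acc': accuracy, 'fps': BatchTimer()},
#   ...     device='cuda', writer=writer,
#   ... )
#   >>> model.eval()
#   >>> with torch.no_grad():
#   ...     val_loss, val_metrics = pass_epoch(
#   ...         model, torch.nn.CrossEntropyLoss(), val_loader, device='cuda')
#
# If a SummaryWriter is passed, it must already carry `iteration` and
# `interval` attributes (the body above reads both); a plain SummaryWriter
# needs them set manually before the first call.
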
def collate_pil(x):
    out_x, out_y = [], []
    for xx, yy in x:
        out_x.append(xx)
        out_y.append(yy)
    return out_x, out_y

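# collate_pil exists because the default DataLoader collate tries to stack
# samples into tensors, which fails for variable-size PIL images; this one
# simply unzips the batch into two lists. Hypothetical usage:
#
#   >>> loader = torch.utils.data.DataLoader(
#   ...     dataset, batch_size=8, collate_fn=collate_pil)
#   >>> for imgs, targets in loader:   # lists of PIL images and labels
#   ...     pass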