Commit f350b742 authored by Quan's avatar Quan

init

parent 33d07985
Pipeline #490 failed with stages
# README
* This project implements the head detection and person detection models. More details about each model can be found at [head detection](http://192.168.0.232:8929/tienln4/ai_camera_detector/-/blob/master/docs/head.md) and [person detection](http://192.168.0.232:8929/tienln4/ai_camera_detector/-/blob/master/docs/person.md)
# Version
- mb2-ssd-lite_f19: original ssd-lite model
- mb2-ssd-lite_f38: for head detection
- mb2-ssd-lite_f38_person: for person detection (small objects)
- rfb_tiny_mb2_ssd: for person detection (there are two sub-versions: c32, fast, and c64, slow)
```
* config(c32 and c64): at line 76 (/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/module/rfb_tiny_mobilenet_v2.py)
```
ai_camera_detector @ 7defbe39
Subproject commit 7defbe39bdff3b81a02cec330119d2cef4459cf1
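# Preprocess calibration images into .raw tensors and write their paths to a list file
# (assumption: this list feeds the quantized-dlc step mentioned in the docs; paths are machine-specific).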
import sys
sys.path.append('/media/ducanh/DATA/tienln/ai_camera/detector/')
import cv2
# from PIL import Image
import numpy as np
import os
from datasets.data_preprocessing import PredictionTransform
size = 300
transform = PredictionTransform(size)
def get_input_FD(img_raw):
img = transform(img_raw)
img = np.expand_dims(img, axis=0)
return img
image_path = '/media/ducanh/DATA/tienln/ai_camera/app/app_head_detection/imgs'
images = os.listdir(image_path)
paths = []
for image_name in images:
print(image_name)
img = cv2.imread(os.path.join(image_path, image_name))
img_processed = get_input_FD(img)
img_processed.tofile(os.path.join('/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data', image_name).replace('.jpg', ".raw"))
paths.append(os.path.join('/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data', image_name).replace('.jpg', ".raw"))
print("test")
with open('/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/data_quantize_person.txt', 'w') as f:
for item in paths:
f.write("%s\n" % item)
raw_data/66063.raw
raw_data/10118.raw
raw_data/52309.raw
raw_data/17374.raw
raw_data/55038.raw
raw_data/21594.raw
raw_data/46879.raw
raw_data/72073.raw
raw_data/34577.raw
raw_data/22842.raw
raw_data/19030.raw
raw_data/22111.raw
raw_data/72055.raw
raw_data/50707.raw
raw_data/29345.raw
raw_data/14942.raw
raw_data/49723.raw
raw_data/61017.raw
raw_data/51772.raw
raw_data/42987.raw
raw_data/51994.raw
raw_data/86928.raw
raw_data/59368.raw
raw_data/42194.raw
raw_data/47692.raw
raw_data/12384.raw
raw_data/25929.raw
raw_data/61103.raw
raw_data/53785.raw
raw_data/23324.raw
raw_data/72880.raw
raw_data/63513.raw
raw_data/68275.raw
raw_data/75792.raw
raw_data/85954.raw
raw_data/46905.raw
raw_data/69667.raw
raw_data/47457.raw
raw_data/69533.raw
raw_data/15378.raw
raw_data/37103.raw
raw_data/63752.raw
raw_data/51264.raw
raw_data/44684.raw
raw_data/70829.raw
raw_data/25730.raw
raw_data/58802.raw
raw_data/37585.raw
raw_data/12155.raw
raw_data/39834.raw
raw_data/79228.raw
raw_data/27457.raw
raw_data/59429.raw
raw_data/32689.raw
raw_data/82567.raw
raw_data/78851.raw
raw_data/70275.raw
raw_data/37352.raw
raw_data/58027.raw
raw_data/53293.raw
raw_data/77275.raw
raw_data/68371.raw
raw_data/56403.raw
raw_data/84186.raw
raw_data/25011.raw
raw_data/29547.raw
raw_data/43464.raw
raw_data/78356.raw
raw_data/18074.raw
raw_data/25941.raw
raw_data/26285.raw
raw_data/52162.raw
raw_data/44600.raw
raw_data/21081.raw
raw_data/72377.raw
raw_data/86938.raw
raw_data/28992.raw
raw_data/16366.raw
raw_data/21919.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/29.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/21.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/5.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/3.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/24.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/16.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/22.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/13.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/26.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/17.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/35.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/31.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/25.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/Screenshot from 2020-08-24 09-04-49.png
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/6.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/30.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/32.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/37.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/2.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/36.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/28.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/1.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/15.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/crop_26.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/8.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/27.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/9.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/23.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/11.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/34.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/33.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/19.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/10.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/18.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/12.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/20.raw
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/Screenshot from 2020-08-24 09-03-46.png
/media/ducanh/DATA/tienln/ai_camera/detector/app/quantize/raw_data/processed.jpeg
import numpy as np
import logging
import pathlib
import xml.etree.ElementTree as ET
import cv2
import os
import json
class _DataLoader:
def __init__(self, root, transform=None, target_transform=None):
self.anno_path = root[0]
self.img_path = root[1]
self.transform = transform
self.target_transform = target_transform
self.ids = []
self.class_names = ('BACKGROUND', 'person')
self.class_dict = {class_name: i for i, class_name in enumerate(self.class_names)}
self._annopath = os.path.join('%s', 'json_annotations', '%s.json')
for file in os.listdir(self.anno_path):
with open(os.path.join(self.anno_path,file), 'r') as f:
data = json.load(f)
objects = data["objects"]
for sub_object in data["objects"]:
if sub_object["label"]=="person":
self.ids.append(file.split(".json")[0])
break
def __getitem__(self, index):
image_id = self.ids[index]
boxes, labels= self._get_annotation(image_id)
image = self._read_image(image_id)
if self.transform:
image, boxes, labels = self.transform(image, boxes, labels)
print(image)
if self.target_transform:
boxes, labels = self.target_transform(boxes, labels)
return image, boxes, labels
def __len__(self):
return len(self.ids)
def _get_annotation(self, image_id):
annotation_file = os.path.join(self.anno_path,image_id+".json")
# print(annotation_file)
with open(annotation_file, 'r') as f:
data = json.load(f)
objects = data["objects"]
boxes = []
labels = []
for sub_object in objects:
class_name = sub_object["label"]
if class_name in self.class_dict:
bbox = sub_object["bbox"]
x1 = float(bbox["x_topleft"])
y1 = float(bbox["y_topleft"])
x2 = x1 + float(bbox["w"])
y2 = y1 + float(bbox["h"])
boxes.append([x1, y1, x2, y2])
labels.append(self.class_dict[class_name])
return (np.array(boxes, dtype=np.float32),
np.array(labels, dtype=np.int64))
def _read_image(self, image_id):
if os.path.isfile(os.path.join(self.img_path,image_id+".jpg")):
image_file = os.path.join(self.img_path,image_id+".jpg")
elif os.path.isfile(os.path.join(self.img_path,image_id+".jpeg")):
image_file = os.path.join(self.img_path,image_id+".jpeg")
else :
image_file = os.path.join(self.img_path,image_id+".png")
image = cv2.imread(str(image_file))
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
return image
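# Usage sketch (illustrative; the paths below are placeholders, not paths from this
# repository): the loader expects root = (annotation_dir, image_dir), where annotation_dir
# holds one .json file per image with an "objects" list.
if __name__ == '__main__':
    loader = _DataLoader(('/path/to/json_annotations', '/path/to/images'))
    print(len(loader))
    image, boxes, labels = loader[0]
    print(image.shape, boxes, labels)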
# from ..transforms.transforms import *
from datasets.data_transform import *
class TrainAugmentation:
def __init__(self, size, mean=0, std=1.0):
"""
Args:
size: the size the of final image.
mean: mean pixel value per channel.
"""
self.mean = mean
self.size = size
self.augment = Compose([
# ConvertFromInts(),
# PhotometricDistort(),
# Expand(self.mean),
# RandomSampleCrop(),
# RandomMirror(),
ToPercentCoords(),
Resize(self.size),
SubtractMeans(self.mean),
lambda img, boxes=None, labels=None: (img / std, boxes, labels),
ToTensor(),
])
def __call__(self, img, boxes, labels):
"""
Args:
img: the output of cv.imread in RGB layout.
boxes: bounding boxes in the form of (x1, y1, x2, y2).
labels: labels of boxes.
"""
return self.augment(img, boxes, labels)
class TestTransform:
def __init__(self, size, mean=0.0, std=1.0):
self.transform = Compose([
ToPercentCoords(),
Resize(size),
SubtractMeans(mean),
lambda img, boxes=None, labels=None: (img / std, boxes, labels),
ToTensor(),
])
def __call__(self, image, boxes, labels):
return self.transform(image, boxes, labels)
class PredictionTransform:
def __init__(self, size, mean=0.0, std=1.0):
self.transform = Compose([
Resize(size),
SubtractMeans(mean),
lambda img, boxes=None, labels=None: (img / std, boxes, labels),
ToTensor()
])
def __call__(self, image):
image, _, _ = self.transform(image)
return image
# from https://github.com/amdegroot/ssd.pytorch
import torch
from torchvision import transforms
import cv2
import numpy as np
import types
from numpy import random
def intersect(box_a, box_b):
max_xy = np.minimum(box_a[:, 2:], box_b[2:])
min_xy = np.maximum(box_a[:, :2], box_b[:2])
inter = np.clip((max_xy - min_xy), a_min=0, a_max=np.inf)
return inter[:, 0] * inter[:, 1]
def jaccard_numpy(box_a, box_b):
"""Compute the jaccard overlap of two sets of boxes. The jaccard overlap
is simply the intersection over union of two boxes.
E.g.:
A ∩ B / A ∪ B = A ∩ B / (area(A) + area(B) - A ∩ B)
Args:
box_a: Multiple bounding boxes, Shape: [num_boxes,4]
box_b: Single bounding box, Shape: [4]
Return:
jaccard overlap: Shape: [box_a.shape[0], box_a.shape[1]]
"""
inter = intersect(box_a, box_b)
area_a = ((box_a[:, 2]-box_a[:, 0]) *
(box_a[:, 3]-box_a[:, 1])) # [A,B]
area_b = ((box_b[2]-box_b[0]) *
(box_b[3]-box_b[1])) # [A,B]
union = area_a + area_b - inter
return inter / union # [A,B]
class Compose(object):
"""Composes several augmentations together.
Args:
transforms (List[Transform]): list of transforms to compose.
Example:
>>> augmentations.Compose([
>>> transforms.CenterCrop(10),
>>> transforms.ToTensor(),
>>> ])
"""
def __init__(self, transforms):
self.transforms = transforms
def __call__(self, img, boxes=None, labels=None):
for t in self.transforms:
img, boxes, labels = t(img, boxes, labels)
return img, boxes, labels
class Lambda(object):
"""Applies a lambda as a transform."""
def __init__(self, lambd):
assert isinstance(lambd, types.LambdaType)
self.lambd = lambd
def __call__(self, img, boxes=None, labels=None):
return self.lambd(img, boxes, labels)
class ConvertFromInts(object):
def __call__(self, image, boxes=None, labels=None):
return image.astype(np.float32), boxes, labels
class SubtractMeans(object):
def __init__(self, mean):
self.mean = np.array(mean, dtype=np.float32)
def __call__(self, image, boxes=None, labels=None):
image = image.astype(np.float32)
image -= self.mean
return image.astype(np.float32), boxes, labels
class ToAbsoluteCoords(object):
def __call__(self, image, boxes=None, labels=None):
height, width, channels = image.shape
boxes[:, 0] *= width
boxes[:, 2] *= width
boxes[:, 1] *= height
boxes[:, 3] *= height
return image, boxes, labels
class ToPercentCoords(object):
def __call__(self, image, boxes=None, labels=None):
height, width, channels = image.shape
boxes[:, 0] /= width
boxes[:, 2] /= width
boxes[:, 1] /= height
boxes[:, 3] /= height
return image, boxes, labels
class Resize(object):
def __init__(self, size=300):
self.size = size
def __call__(self, image, boxes=None, labels=None):
if type(self.size) is list :
image = cv2.resize(image, (self.size[0],
self.size[1]))
else:
image = cv2.resize(image, (self.size,
self.size))
return image, boxes, labels
class RandomSaturation(object):
def __init__(self, lower=0.5, upper=1.5):
self.lower = lower
self.upper = upper
assert self.upper >= self.lower, "contrast upper must be >= lower."
assert self.lower >= 0, "contrast lower must be non-negative."
def __call__(self, image, boxes=None, labels=None):
if random.randint(2):
image[:, :, 1] *= random.uniform(self.lower, self.upper)
return image, boxes, labels
class RandomHue(object):
def __init__(self, delta=18.0):
assert delta >= 0.0 and delta <= 360.0
self.delta = delta
def __call__(self, image, boxes=None, labels=None):
if random.randint(2):
image[:, :, 0] += random.uniform(-self.delta, self.delta)
image[:, :, 0][image[:, :, 0] > 360.0] -= 360.0
image[:, :, 0][image[:, :, 0] < 0.0] += 360.0
return image, boxes, labels
class RandomLightingNoise(object):
def __init__(self):
self.perms = ((0, 1, 2), (0, 2, 1),
(1, 0, 2), (1, 2, 0),
(2, 0, 1), (2, 1, 0))
def __call__(self, image, boxes=None, labels=None):
if random.randint(2):
swap = self.perms[random.randint(len(self.perms))]
shuffle = SwapChannels(swap) # shuffle channels
image = shuffle(image)
return image, boxes, labels
class ConvertColor(object):
def __init__(self, current, transform):
self.transform = transform
self.current = current
def __call__(self, image, boxes=None, labels=None):
if self.current == 'BGR' and self.transform == 'HSV':
image = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
elif self.current == 'RGB' and self.transform == 'HSV':
image = cv2.cvtColor(image, cv2.COLOR_RGB2HSV)
elif self.current == 'BGR' and self.transform == 'RGB':
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
elif self.current == 'HSV' and self.transform == 'BGR':
image = cv2.cvtColor(image, cv2.COLOR_HSV2BGR)
elif self.current == 'HSV' and self.transform == "RGB":
image = cv2.cvtColor(image, cv2.COLOR_HSV2RGB)
else:
raise NotImplementedError
return image, boxes, labels
class RandomContrast(object):
def __init__(self, lower=0.5, upper=1.5):
self.lower = lower
self.upper = upper
assert self.upper >= self.lower, "contrast upper must be >= lower."
assert self.lower >= 0, "contrast lower must be non-negative."
# expects float image
def __call__(self, image, boxes=None, labels=None):
if random.randint(2):
alpha = random.uniform(self.lower, self.upper)
image *= alpha
return image, boxes, labels
class RandomBrightness(object):
def __init__(self, delta=32):
assert delta >= 0.0
assert delta <= 255.0
self.delta = delta
def __call__(self, image, boxes=None, labels=None):
if random.randint(2):
delta = random.uniform(-self.delta, self.delta)
image += delta
return image, boxes, labels
class ToCV2Image(object):
def __call__(self, tensor, boxes=None, labels=None):
return tensor.cpu().numpy().astype(np.float32).transpose((1, 2, 0)), boxes, labels
class ToTensor(object):
def __call__(self, cvimage, boxes=None, labels=None):
return torch.from_numpy(cvimage.astype(np.float32)).permute(2, 0, 1), boxes, labels
class RandomSampleCrop(object):
"""Crop
Arguments:
img (Image): the image being input during training
boxes (Tensor): the original bounding boxes in pt form
labels (Tensor): the class labels for each bbox
mode (float tuple): the min and max jaccard overlaps
Return:
(img, boxes, classes)
img (Image): the cropped image
boxes (Tensor): the adjusted bounding boxes in pt form
labels (Tensor): the class labels for each bbox
"""
def __init__(self):
self.sample_options = (
# using entire original input image
None,
# sample a patch s.t. MIN jaccard w/ obj in .1,.3,.4,.7,.9
(0.1, None),
(0.3, None),
(0.7, None),
(0.9, None),
# randomly sample a patch
(None, None),
)
def __call__(self, image, boxes=None, labels=None):
height, width, _ = image.shape
while True:
# randomly choose a mode
mode = random.choice(self.sample_options)
if mode is None:
return image, boxes, labels
min_iou, max_iou = mode
if min_iou is None:
min_iou = float('-inf')
if max_iou is None:
max_iou = float('inf')
# max trials (50)
for _ in range(50):
current_image = image
w = random.uniform(0.3 * width, width)
h = random.uniform(0.3 * height, height)
# aspect ratio constraint b/t .5 & 2
if h / w < 0.5 or h / w > 2:
continue
left = random.uniform(width - w)
top = random.uniform(height - h)
# convert to integer rect x1,y1,x2,y2
rect = np.array([int(left), int(top), int(left+w), int(top+h)])
# calculate IoU (jaccard overlap) b/t the cropped and gt boxes
overlap = jaccard_numpy(boxes, rect)
# is min and max overlap constraint satisfied? if not try again
if overlap.min() < min_iou and max_iou < overlap.max():
continue
# cut the crop from the image
current_image = current_image[rect[1]:rect[3], rect[0]:rect[2],
:]
# keep overlap with gt box IF center in sampled patch
centers = (boxes[:, :2] + boxes[:, 2:]) / 2.0
# mask in all gt boxes that are above and to the left of centers
m1 = (rect[0] < centers[:, 0]) * (rect[1] < centers[:, 1])
# mask in all gt boxes that are under and to the right of centers
m2 = (rect[2] > centers[:, 0]) * (rect[3] > centers[:, 1])
# mask in that both m1 and m2 are true
mask = m1 * m2
# have any valid boxes? try again if not
if not mask.any():
continue
# take only matching gt boxes
current_boxes = boxes[mask, :].copy()
# take only matching gt labels
current_labels = labels[mask]
# should we use the box left and top corner or the crop's
current_boxes[:, :2] = np.maximum(current_boxes[:, :2],
rect[:2])
# adjust to crop (by subtracting crop's left,top)
current_boxes[:, :2] -= rect[:2]
current_boxes[:, 2:] = np.minimum(current_boxes[:, 2:],
rect[2:])
# adjust to crop (by subtracting crop's left,top)
current_boxes[:, 2:] -= rect[:2]
return current_image, current_boxes, current_labels
class Expand(object):
def __init__(self, mean):
self.mean = mean
def __call__(self, image, boxes, labels):
if random.randint(2):
return image, boxes, labels
height, width, depth = image.shape
ratio = random.uniform(1, 4)
left = random.uniform(0, width*ratio - width)
top = random.uniform(0, height*ratio - height)
expand_image = np.zeros(
(int(height*ratio), int(width*ratio), depth),
dtype=image.dtype)
expand_image[:, :, :] = self.mean
expand_image[int(top):int(top + height),
int(left):int(left + width)] = image
image = expand_image
boxes = boxes.copy()
boxes[:, :2] += (int(left), int(top))
boxes[:, 2:] += (int(left), int(top))
return image, boxes, labels
class RandomMirror(object):
def __call__(self, image, boxes, classes):
_, width, _ = image.shape
if random.randint(2):
image = image[:, ::-1]
boxes = boxes.copy()
boxes[:, 0::2] = width - boxes[:, 2::-2]
return image, boxes, classes
class SwapChannels(object):
"""Transforms a tensorized image by swapping the channels in the order
specified in the swap tuple.
Args:
swaps (int triple): final order of channels
eg: (2, 1, 0)
"""
def __init__(self, swaps):
self.swaps = swaps
def __call__(self, image):
"""
Args:
image (Tensor): image tensor to be transformed
Return:
a tensor with channels swapped according to swap
"""
# if torch.is_tensor(image):
# image = image.data.cpu().numpy()
# else:
# image = np.array(image)
image = image[:, :, self.swaps]
return image
class PhotometricDistort(object):
def __init__(self):
self.pd = [
RandomContrast(), # RGB
ConvertColor(current="RGB", transform='HSV'), # HSV
RandomSaturation(), # HSV
RandomHue(), # HSV
ConvertColor(current='HSV', transform='RGB'), # RGB
RandomContrast() # RGB
]
self.rand_brightness = RandomBrightness()
self.rand_light_noise = RandomLightingNoise()
def __call__(self, image, boxes, labels):
im = image.copy()
im, boxes, labels = self.rand_brightness(im, boxes, labels)
if random.randint(2):
distort = Compose(self.pd[:-1])
else:
distort = Compose(self.pd[1:])
im, boxes, labels = distort(im, boxes, labels)
return self.rand_light_noise(im, boxes, labels)
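# Minimal usage sketch of the transforms above (dummy data, illustrative values): every
# transform takes and returns an (image, boxes, labels) triple.
if __name__ == '__main__':
    demo = Compose([
        ConvertFromInts(),
        ToPercentCoords(),
        Resize(300),
        SubtractMeans(np.array([127, 127, 127])),
        ToTensor(),
    ])
    img = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    boxes = np.array([[100., 120., 200., 300.]], dtype=np.float32)
    labels = np.array([1], dtype=np.int64)
    img_t, boxes_t, labels_t = demo(img, boxes, labels)
    print(img_t.shape)  # torch.Size([3, 300, 300])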
/media/ducanh/DATA/tienln/data/human/use/wider_person/annotations+/media/ducanh/DATA/tienln/data/human/use/wider_person/images
/media/ducanh/DATA/tienln/data/human/use/crowd_human/origin_annotations/train+/media/ducanh/DATA/tienln/data/human/use/crowd_human/images
/media/ducanh/DATA/tienln/data/MSCOCO/2017/annotations/train+/media/ducanh/DATA/tienln/data/MSCOCO/2017/train2017
/media/ducanh/DATA/tienln/data/VOC2012/person_json_annotations+/media/ducanh/DATA/tienln/data/VOC2012/JPEGImages
/media/ducanh/DATA/tienln/data/human/No/cityperson/cityperson/cleaned_json_annotations+/media/ducanh/DATA/tienln/data/human/No/cityperson/cityperson/images
/media/ducanh/DATA/tienln/data/human/No/ECP/ecp/json_annotations+/media/ducanh/DATA/tienln/data/human/No/ECP/ecp/images
/media/ducanh/DATA/tienln/data/human/use/crowd_human/origin_annotations/val1+/media/ducanh/DATA/tienln/data/human/use/crowd_human/images
```
mb2-ssd-lite_f38: version for head detection
```
# Model
* The head detection model is based on the SSD architecture
* There are 3 blocks: Backbone, Extra layers, Detection head
## Backbone
* The feature extractor used: MobileNet
* The original Mobilenet-V2 can achieve high accuracy (using the [pre-trained](https://storage.googleapis.com/models-hao/mb2-ssd-lite-mp-0_686.pth) weights from the VOC dataset). However, its running time is also high.
* The tiny Mobilenet-V2 is a customized Mobilenet-V2 (customized vertically and horizontally).
```
The backbone can be customized at: ./module/mobilent_v2.py
```
## Extra layers
* Multi-scale feature maps for detection
* The SSD architecture uses multiple layers (multi-scale feature maps) to detect objects independently. As the CNN gradually reduces the spatial dimensions, the resolution of the feature maps also decreases. SSD uses lower-resolution layers to detect larger-scale objects and vice versa. For example, the 4x4 feature maps are used for larger-scale objects.
* Because the detection model is used for surveillance cameras (detecting small and medium objects), the smallest feature maps do not contain much useful information. Thus, the last 2 feature maps are removed from this model.
* Head detection uses feature maps of size 38, 19, 8, 4. The resolution of each feature map depends on the network input size (a small sketch of this arithmetic follows the path note below).
* Note: a higher input resolution is not synonymous with better results (see Experiments)
```
The Extra Layers can be customized at: ./model/mb_ssd_lite_f38.py and ./module/ssd.py
```
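The feature map resolution follows directly from the input size and the stride (shrinkage) of each selected layer; a minimal sketch of that arithmetic, using the 300x300 input and the 8/16/32/64 strides listed under "Model parameters and results":
```
import math
input_size = 300
strides = [8, 16, 32, 64]
print([math.ceil(input_size / s) for s in strides])  # [38, 19, 10, 5]
```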
**Change feature map**
Step_1: In 'mb_ssd_lite_f38_config.py' at line 27, select which feature maps are taken from the backbone.
Step_2: To match the depth of the Extra layers with the detection head, change the input channels in 'mb_ssd_lite_f38_config.py' (regression_headers, classification_headers).
For instance:
- a feature map of size 38 has an output depth of 192, so at line 33 you have to set in_channels=192
**Change number of anchor boxes**
Step_1: In 'mb_ssd_lite_f38_config.py', change the number of anchor boxes for each grid cell on the feature map.
Step_2: To match the number of anchor boxes, change the 'anchors' list at line 23, as sketched below.
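A minimal consistency check for these two steps, assuming the anchors = [2, 2, 2, 2] layout used in mb_ssd_lite_f38.py (illustrative numbers):
```
# Each detection-head branch must emit anchors_per_cell * 4 regression channels and
# anchors_per_cell * num_classes classification channels for its feature map.
anchors = [2, 2, 2, 2]       # anchor boxes per grid cell, one entry per feature map
num_classes = 2              # BACKGROUND + object
print([a * 4 for a in anchors])            # regression out_channels: [8, 8, 8, 8]
print([a * num_classes for a in anchors])  # classification out_channels: [4, 4, 4, 4]
```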
## Detection head
* Regression head (localization) and classification head (classification)
* The most important factor in this component is the anchor boxes (with 3 parameters that can be optimized: scale, ratio, and the number of anchors per grid cell)
* The anchor boxes are defined in [] based on the COCO and VOC datasets, which contain many object categories, ratios and sizes
* In [], the authors propose a formula to generate anchor boxes. However, that formula is designed for many object categories and sizes. For instance, the scale range proposed in [] is [0.2, 0.9], which can suit the COCO dataset but does not give good results here. As mentioned above, objects in surveillance applications are mostly small or medium and rarely large, so the scales should be concentrated below 0.5 (a worked example follows the path note below)
* Ratio: the height/width ratio of the objects. Run statistics on the data to get this factor. Code is available [here]()
```
Anchor boxes: ./model/config/mb_ssd_lite_f38_config.py and ./module/ssd.py
```
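A worked sketch of the scale reasoning above (illustrative only): with a 300x300 input and a smallest target object of about 15x15 pixels, the lower bound of the scale range is 15/300 = 0.05.
```
input_size = 300          # network input resolution
min_object_px = 15        # smallest object the application must detect (pixels)
min_scale = min_object_px / input_size
print(min_scale)                                  # 0.05 -> concentrate scales between ~0.05 and 0.5
print(min_scale * input_size, 0.5 * input_size)   # 15.0 150.0 -> corresponding box sizes in pixels
```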
## Loss
* Localization loss: smooth L1 loss
* Classification loss: focal loss instead of CE loss, to address the class imbalance problem (person/background); a minimal sketch is given below
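A minimal focal-loss sketch in PyTorch, for illustration only (the repository's own loss implementation is not part of this commit; alpha and gamma here are common defaults, not values taken from this project):
```
import torch
import torch.nn.functional as F

def focal_loss(logits, targets, alpha=0.25, gamma=2.0):
    """Per-box focal loss on raw class logits (N, num_classes) and integer labels (N,)."""
    ce = F.cross_entropy(logits, targets, reduction='none')  # standard CE per sample
    pt = torch.exp(-ce)                                       # probability of the true class
    loss = alpha * (1 - pt) ** gamma * ce                     # down-weight easy examples
    return loss.mean()

# usage on dummy data
logits = torch.randn(8, 2)
targets = torch.randint(0, 2, (8,))
print(focal_loss(logits, targets))
```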
# Dataset
* There are 3 main datasets: Crowd-Human (15k), Wider-Face (16k) and Brainwash (11k)
* SCUTB (validation)
# Requirements
* anaconda
* Python-3.6
* Pytorch-1.2
* Torchvision-0.4
* opencv-python
* pandas
# Training
* Optimizer: SGD with weight decay 5e-4, batch size 32, number of epochs: 150
* Data augmentation:
```
python train.py with type_network 'mb2-ssd-lite_f38'
```
# Testing
```
Folder of images: python detect_imgs.py --net_type <model_path> --test_path <path_dir_image>
Video: python live_demo.py --model_path <path_network>
```
# Model parameters and results
* Input: (300, 300)
* Feature maps: 38-38, 19-19, 10-10, 5-5
* Step (shrinkage): 8, 16, 32, 64
* Scale (box size): (16, 32), (32, 64), (64, 128), (128, 256)
* Ratios: 1.7
## Pytorch model (mb_ssd_lite_f38_150_193_14)
| input network | parameter | FLOPs | Miss rate | mAP | Running time | Model weight |
| :-----------: | :-------: | :---: | :-------: | :----: | :----------: | :-------------------------------------------------------------------------------------: |
| 300x300 | 2.7 M | 1.2 G | 7% | 90.02% | 39ms | [Weight](http://192.168.0.232:8929/tienln4/ai_camera_detector/-/tree/master/app%2Fhead) |
## dlc model:
* dlc and quantized dlc
* Total parameters: 2750864
* Memory Needed to Run: 345.9 MiB
* Total MACs per inference: 618M (100%)
## Issues encountered
- A larger input image does not necessarily mean the model gives better results; increasing the model's input size makes the feature maps larger, which helps detect smaller objects.
- Shrinking the model by reducing its depth (channel depth) costs a lot of accuracy. To balance model size instead, change the feature maps in the Extra layers and the number of anchor boxes.
- When designing anchor boxes, applying the scale formula (called box_size in the code) from the original SSD paper does not give good results in some specific cases (e.g. detecting heads or people as small objects). Specifically, the paper proposes scales in the range 0.2 to 0.9 with an arbitrary number of values (using more of them improves accuracy at the cost of speed), but that range is generic, because COCO contains very few small objects (around 15x15 pixels). For small objects a different scale distribution is needed, and no study proposes a method for choosing a suitable scale (it is chosen from practical experience). To pick the scale range: choose the object size range the application targets (e.g. 15x15-pixel heads), so the minimum scale is 15/300 (300 being the input image size). Note that the range should not be spread evenly over 0-1 but concentrated mainly in 0-0.5.
- Another important anchor-box parameter is the ratio: for heads, whatever angle the camera is mounted at, the head height/width ratio does not vary much (about 1.3-1.7). For person detection, however, the ratio depends heavily on the camera mounting angle and on whether people are sitting, standing or lying down, so it varies a lot (configuring many ratios makes the model work better but costs speed; as a trade-off, 3 ratios are used for person and 1 for head). To pick suitable ratios, run statistics on the data by clustering the ratios (code: ).
- When a change leads to a different number of anchors, the model parameters must also be changed to match. Specifically:
### Changes on the Extra layer side
- Selecting the feature map sizes taken from the backbone:
```
source_layer_indexes = [GraphPath(7, 'conv', 3),GraphPath(14, 'conv', 3),19,] line 27 at /media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/model/mb_ssd_lite_f38.py
7 or 14: the 7th or 14th block of the backbone
'conv', 3: take the 3rd layer (conv) inside that block
In short: GraphPath(7, 'conv', 3) means the feature map is taken from the 7th block, 3rd layer of that block; GraphPath(14, 'conv', 3) means the feature map is taken from the 14th block, 3rd layer of that block; 19 means taking the last conv layer of that block. To see the layers inside a block, just print(backbone).
```
- Changing the feature maps causes a mismatch in the detection head (regression_headers, classification_headers), specifically the 'in_channels' value in both regression_headers and classification_headers. To fill it in correctly, print the backbone to see the exact output channels and set them accordingly in regression_headers and classification_headers.
### Changing the number of anchors
- Changing the number of anchors per grid cell on a feature map (in the config file) requires matching changes in the model: just change the values of the 'anchor' list at line 23 of /media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/model/mb_ssd_lite_f38.py. Each value in the list is the number of anchor boxes per grid cell of the corresponding feature map.
- Example: if the config file uses 2 anchors per grid cell, the corresponding value in the 'anchor' list at line 23 must also be 2.
### Data
- For head detection: the data focuses on faces, so the model works best on faces and worse on the back of the head and on cameras mounted at a negative angle.
- For person detection: the data contains mostly full-body boxes and lacks half-body ones.
- Note: for the model to learn well during training, filter out boxes that are too small (under 10 pixels for heads and 15 pixels for persons); a minimal filtering sketch follows.
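A minimal sketch of the size filter mentioned in the last note, assuming boxes in (x1, y1, x2, y2) pixel coordinates as produced by the data loader; the 10/15-pixel thresholds come from the text:
```
import numpy as np

def filter_small_boxes(boxes, labels, min_side=10):
    """Drop ground-truth boxes whose shorter side is below min_side pixels
    (10 px for heads, 15 px for persons, per the note above)."""
    if len(boxes) == 0:
        return boxes, labels
    w = boxes[:, 2] - boxes[:, 0]
    h = boxes[:, 3] - boxes[:, 1]
    keep = np.minimum(w, h) >= min_side
    return boxes[keep], labels[keep]

# usage on dummy annotations
boxes = np.array([[0, 0, 8, 8], [10, 10, 60, 120]], dtype=np.float32)
labels = np.array([1, 1], dtype=np.int64)
print(filter_small_boxes(boxes, labels, min_side=10))
```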
File added
```
rfb_tiny_mb2_ssd: ver_c32 fast; ver_c64 slow (change 'self.base_channel' in /media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/module/rfb_tiny_mobilenet_v2.py)
mb2-ssd-lite_f38_person: for small objects
```
# Model
* The person detection model is based on the SSD architecture
* There are 3 blocks: Backbone, Extra layers, Detection head
## Backbone
* The feature extractor used: tiny [Mobilenet-V2]() + [RFB]() components
* The original Mobilenet-V2 can achieve high accuracy (using the [pre-trained](https://storage.googleapis.com/models-hao/mb2-ssd-lite-mp-0_686.pth) weights from the VOC dataset). However, its running time is also high.
* The tiny Mobilenet-V2 is a customized Mobilenet-V2 (customized vertically and horizontally).
```
The backbone can be customized at: ./module/rfb_tiny_mobilenet_v2.py
```
## Extra layers
* Multi-scale feature maps for detection
* The SSD architecture uses multiple layers (multi-scale feature maps) to detect objects independently. As the CNN gradually reduces the spatial dimensions, the resolution of the feature maps also decreases. SSD uses lower-resolution layers to detect larger-scale objects and vice versa. For example, the 4x4 feature maps are used for larger-scale objects.
* Because the person detection model is used for surveillance cameras (detecting small and medium objects), the smallest feature maps do not contain much useful information. Thus, the last 2 feature maps are removed from this model.
* Person detection uses feature maps of size 40-30, 20-15, 10-8, 5-4 (width-height). The resolution of each feature map depends on the network input size.
* Note: a higher input resolution is not synonymous with better results
```
The Extra Layers can be customized at: ./model/rfb_tiny_mb_ssd.py and ./module/ssd.py
```
## Detection head
* Regression head (localization) and classification head (classification)
* The most important factor in this component is the anchor boxes (with 3 parameters that can be optimized: scale, ratio, and the number of anchors per grid cell)
* The anchor boxes are defined in [] based on the COCO and VOC datasets, which contain many object categories, ratios and sizes
* In [], the authors propose a formula to generate anchor boxes. However, that formula is designed for many object categories and sizes. For instance, the scale range proposed in [] is [0.2, 0.9], which can suit the COCO dataset but does not give good results here. As mentioned above, objects in surveillance applications are mostly small or medium and rarely large, so the scales should be concentrated below 0.5
* Ratio: the height/width ratio of the objects. Run statistics on the data to get this factor (a clustering sketch follows the path note below). Code is available [here]()
```
Anchor boxes: ./model/config/rfb_tiny_mb_ssd_config.py and ./model/rfb_tiny_mb_ssd.py
```
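A minimal sketch of the ratio statistics mentioned above, clustering the height/width ratios of ground-truth boxes with k-means (scikit-learn is an assumption here; the project's own statistics code is not included in this commit):
```
import numpy as np
from sklearn.cluster import KMeans

def cluster_box_ratios(boxes, k=3):
    """Cluster height/width ratios of (x1, y1, x2, y2) boxes into k representative ratios."""
    w = boxes[:, 2] - boxes[:, 0]
    h = boxes[:, 3] - boxes[:, 1]
    ratios = (h / w).reshape(-1, 1)
    centers = KMeans(n_clusters=k, n_init=10).fit(ratios).cluster_centers_
    return sorted(float(c) for c in centers.ravel())

# usage on dummy person boxes
boxes = np.array([[0, 0, 40, 90], [10, 10, 50, 130], [5, 5, 30, 60]], dtype=np.float32)
print(cluster_box_ratios(boxes, k=3))
```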
## Loss
* Localization loss: smooth L1 loss
* Classification loss: focal loss instead of CE loss, to address the class imbalance problem (person/background)
# Dataset
* There are 2 main datasets: Crowd-Human (15k) and Wider-Person (8k), both with full-body boxes.
* COCO person (contains noise and faulty annotations)
* Cleaned CityPersons, EuroCity Persons (ECP): remove boxes whose (box area)/(image area) >
# Requirements
* anaconda
* Python-3.6
* Pytorch-1.2
* Torchvision-0.4
* opencv-python
* pandas
# Training
* Optimizer: SGD with weight decay 5e-4, batch size 32, number of epochs:
* Data augmentation:
```
python train.py with type_network rfb_tiny_mb2_ssd, setting base_channel = 64 for ver-1 or base_channel = 32 for ver-2
```
# Model parameters and results
* Input: (320, 240)
* Feature maps: 40-30, 20-15, 10-8, 5-4
* Step (shrinkage): 8-8, 16-16, 32-30, 64-60
* Scale (box size): (10, 16, 24), (32, 48), (64, 96), (128, 192, 256)
* Ratios: 2.21, 2.47, 2.73
* base_channel: Ver1 (rfb_tiny_mb2_ssd_c64): 64, Ver2 (rfb_tiny_mb2_ssd_c32): 32 (see the consistency check below)
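A quick consistency check between the prior configuration and the detection-head widths, assuming the min_boxes/ratios layout used in rfb_tiny_mb_ssd_config.py and the 9/6/6/9 head widths in rfb_tiny_mb_ssd.py:
```
# Anchors per grid cell on each feature map = number of box sizes * number of ratios.
min_boxes = [[10, 16, 24], [32, 48], [64, 96], [128, 192, 256]]
num_ratios = 3
print([len(sizes) * num_ratios for sizes in min_boxes])
# [9, 6, 6, 9] -> must match the 9*4 / 6*4 / 6*4 / 9*4 regression heads
```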
## Pytorch model
| input network | parameter | FLOPs | Miss rate | AP | Running time | Model weight |
| :-----------: | :-------: | :---: | :-------: | :---: | :----------: | :-------------------------------------------------------------------------------------------------------------------: |
| 320x240 (v1) | 3.9 M | 2.7 G | 7.7% | 88% | 48ms | [Ver1 weight](http://192.168.0.232:8929/tienln4/ai_camera_detector/-/tree/master/app%2Fperson%2Frfb_tiny_mb2_ssd_c64) |
| 320x240 (v2) | 1.2 M | 0.8 G | 12.5% | 84% | 24ms | [Ver2 weight](http://192.168.0.232:8929/tienln4/ai_camera_detector/-/tree/master/app%2Fperson%2Frfb_tiny_mb2_ssd_c32) |
## dlc model:
### Ver1
* dlc and quantized dlc
* Total parameters: 3937476
* Total MACs per inference: 1365M
* Memory Needed to Run: 332.0
### Ver2
* dlc and quantized dlc
* Total parameters: 1147452
* Total MACs per inference: 369M
* Memory Needed to Run: 171
## Issues encountered
- A larger input image does not necessarily mean the model gives better results; increasing the model's input size makes the feature maps larger, which helps detect smaller objects.
- Shrinking the model by reducing its depth (channel depth) costs a lot of accuracy. To balance model size instead, change the feature maps in the Extra layers and the number of anchor boxes.
- When designing anchor boxes, applying the scale formula (called box_size in the code) from the original SSD paper does not give good results in some specific cases (e.g. detecting heads or people as small objects). Specifically, the paper proposes scales in the range 0.2 to 0.9 with an arbitrary number of values (using more of them improves accuracy at the cost of speed), but that range is generic, because COCO contains very few small objects (around 15x15 pixels). For small objects a different scale distribution is needed, and no study proposes a method for choosing a suitable scale (it is chosen from practical experience). To pick the scale range: choose the object size range the application targets (e.g. 15x15-pixel heads), so the minimum scale is 15/300 (300 being the input image size). Note that the range should not be spread evenly over 0-1 but concentrated mainly in 0-0.5.
- Another important anchor-box parameter is the ratio: for heads, whatever angle the camera is mounted at, the head height/width ratio does not vary much (about 1.3-1.7). For person detection, however, the ratio depends heavily on the camera mounting angle and on whether people are sitting, standing or lying down, so it varies a lot (configuring many ratios makes the model work better but costs speed; as a trade-off, 3 ratios are used for person and 1 for head). To pick suitable ratios, run statistics on the data by clustering the ratios (code: ).
- When a change leads to a different number of anchors, the model parameters must also be changed to match. Specifically:
### Changes on the Extra layer side
- Selecting the feature map sizes taken from the backbone:
```
source_layer_indexes = [GraphPath(7, 'conv', 3),GraphPath(14, 'conv', 3),19,] line 27 at /media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/model/mb_ssd_lite_f38.py
7 or 14: the 7th or 14th block of the backbone
'conv', 3: take the 3rd layer (conv) inside that block
In short: GraphPath(7, 'conv', 3) means the feature map is taken from the 7th block, 3rd layer of that block; GraphPath(14, 'conv', 3) means the feature map is taken from the 14th block, 3rd layer of that block; 19 means taking the last conv layer of that block. To see the layers inside a block, just print(backbone).
```
- Changing the feature maps causes a mismatch in the detection head (regression_headers, classification_headers), specifically the 'in_channels' value in both regression_headers and classification_headers. To fill it in correctly, print the backbone to see the exact output channels and set them accordingly in regression_headers and classification_headers.
### Changing the number of anchors
- Changing the number of anchors per grid cell on a feature map (in the config file) requires matching changes in the model: just change the values of the 'anchor' list at line 23 of /media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/model/mb_ssd_lite_f38.py. Each value in the list is the number of anchor boxes per grid cell of the corresponding feature map.
- Example: if the config file uses 2 anchors per grid cell, the corresponding value in the 'anchor' list at line 23 must also be 2.
### Data
- For head detection: the data focuses on faces, so the model works best on faces and worse on the back of the head and on cameras mounted at a negative angle.
- For person detection: the data contains mostly full-body boxes and lacks half-body ones.
- Note: for the model to learn well during training, filter out boxes that are too small (under 10 pixels for heads and 15 pixels for persons).
File added
# from typing import List
# import numpy as np
# import torch
# import itertools
# import math
# import collections
# image_size = 300
# image_mean = np.array([127, 127, 127]) # RGB layout
# image_std = 128.0
# iou_threshold = 0.3
# center_variance = 0.1
# size_variance = 0.2
# SSDBoxSizes = collections.namedtuple('SSDBoxSizes', ['min', 'max'])
# SSDSpec = collections.namedtuple('SSDSpec', ['feature_map_size', 'shrinkage', 'box_sizes', 'aspect_ratios'])
# #COCO
# specs = [
# SSDSpec(19, 16, SSDBoxSizes(21, 45), [5.5, 6.7, 7.8]),
# SSDSpec(10, 32, SSDBoxSizes(45, 99), [5.5, 6.7, 7.8]),
# SSDSpec(5, 64, SSDBoxSizes(99, 153), [5.5, 6.7, 7.8]),
# SSDSpec(3, 100, SSDBoxSizes(153, 207), [5.5, 6.7, 7.8]),
# SSDSpec(2, 150, SSDBoxSizes(207, 261), [5.5, 6.7, 7.8]),
# SSDSpec(1, 300, SSDBoxSizes(261, 315), [5.5, 6.7, 7.8])
# ]
# #VOC
# # specs = [
# # SSDSpec(19, 16, SSDBoxSizes(30, 60), [5.5, 6.7, 7.8]),
# # SSDSpec(10, 32, SSDBoxSizes(60, 111), [5.5, 6.7, 7.8]),
# # SSDSpec(5, 64, SSDBoxSizes(111, 162), [5.5, 6.7, 7.8]),
# # SSDSpec(3, 100, SSDBoxSizes(162, 213), [5.5, 6.7, 7.8]),
# # SSDSpec(2, 150, SSDBoxSizes(213, 264), [5.5, 6.7, 7.8]),
# # SSDSpec(1, 300, SSDBoxSizes(264, 315), [5.5, 6.7, 7.8])
# # ]
# def generate_ssd_priors(specs: List[SSDSpec], image_size, clamp=True) -> torch.Tensor:
# priors = []
# for spec in specs:
# scale = image_size / spec.shrinkage
# for j, i in itertools.product(range(spec.feature_map_size), repeat=2):
# x_center = (i + 0.5) / scale
# y_center = (j + 0.5) / scale
# for ratio in spec.aspect_ratios:
# ratio = math.sqrt(ratio)
# # small sized square box
# size = spec.box_sizes.min
# h = w = size / image_size
# priors.append([
# x_center,
# y_center,
# w,
# h*ratio
# ])
# # big sized square box
# size = math.sqrt(spec.box_sizes.max * spec.box_sizes.min)
# h = w = size / image_size
# priors.append([
# x_center,
# y_center,
# w,
# h*ratio
# ])
# priors = torch.tensor(priors)
# # print('Number of: ',priors.shape)
# if clamp:
# torch.clamp(priors, 0.0, 1.0, out=priors)
# return priors
# priors = generate_ssd_priors(specs, image_size)
from typing import List
import numpy as np
import torch
import itertools
import math
import collections
image_size = 300
image_mean = np.array([127, 127, 127]) # RGB layout
image_std = 128.0
iou_threshold = 0.6
center_variance = 0.1
size_variance = 0.2
SSDBoxSizes = collections.namedtuple('SSDBoxSizes', ['min', 'max'])
SSDSpec = collections.namedtuple('SSDSpec', ['feature_map_size', 'shrinkage', 'box_sizes', 'aspect_ratios'])
specs = [
SSDSpec(38, 8, SSDBoxSizes(16, 32), [1.7]),
SSDSpec(19, 16, SSDBoxSizes(32, 64), [1.7]),
SSDSpec(10, 32, SSDBoxSizes(64, 128), [1.7]),
SSDSpec(5, 64, SSDBoxSizes(128, 256), [1.7]),
]
def generate_ssd_priors(specs: List[SSDSpec], image_size, clamp=True) -> torch.Tensor:
priors = []
for spec in specs:
scale = image_size / spec.shrinkage
for j, i in itertools.product(range(spec.feature_map_size), repeat=2):
x_center = (i + 0.5) / scale
y_center = (j + 0.5) / scale
for ratio in spec.aspect_ratios:
ratio = math.sqrt(ratio)
# small sized square box
size = spec.box_sizes.min
h = w = size / image_size
priors.append([
x_center,
y_center,
w,
h*ratio
])
# big sized square box
size = math.sqrt(spec.box_sizes.max * spec.box_sizes.min)
h = w = size / image_size
priors.append([
x_center,
y_center,
w,
h*ratio
])
priors = torch.tensor(priors)
if clamp:
torch.clamp(priors, 0.0, 1.0, out=priors)
return priors
priors = generate_ssd_priors(specs, image_size)
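# Sanity check (illustrative): with a single aspect ratio per spec, each grid cell gets
# 2 priors (one small and one big square box), so the total number of priors is
# 38*38*2 + 19*19*2 + 10*10*2 + 5*5*2 = 3860.
if __name__ == '__main__':
    print(priors.shape)  # torch.Size([3860, 4])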
from typing import List
import numpy as np
import torch
import itertools
import math
import collections
image_size = 300
image_mean = np.array([127, 127, 127]) # RGB layout
image_std = 128.0
iou_threshold = 0.3
center_variance = 0.1
size_variance = 0.2
SSDBoxSizes = collections.namedtuple('SSDBoxSizes', ['min', 'max'])
SSDSpec = collections.namedtuple('SSDSpec', ['feature_map_size', 'shrinkage', 'box_sizes', 'aspect_ratios'])
specs = [
SSDSpec(38, 8, SSDBoxSizes(10, 16), [5.5, 6.7, 7.8]),
SSDSpec(19, 16, SSDBoxSizes(24, 34), [5.5, 6.7, 7.8]),
SSDSpec(10, 30, SSDBoxSizes(46, 60), [5.5, 6.7, 7.8]),
SSDSpec(5, 60, SSDBoxSizes(76, 92), [5.5, 6.7, 7.8]),
SSDSpec(3, 100, SSDBoxSizes(110, 128), [5.5, 6.7, 7.8]),
SSDSpec(2, 150, SSDBoxSizes(156, 194), [5.5, 6.7, 7.8]),
SSDSpec(1, 300, SSDBoxSizes(248, 300), [5.5, 6.7, 7.8]),
]
def generate_ssd_priors(specs: List[SSDSpec], image_size, clamp=True) -> torch.Tensor:
priors = []
for spec in specs:
scale = image_size / spec.shrinkage
for j, i in itertools.product(range(spec.feature_map_size), repeat=2):
x_center = (i + 0.5) / scale
y_center = (j + 0.5) / scale
for ratio in spec.aspect_ratios:
ratio = math.sqrt(ratio)
# small sized square box
size = spec.box_sizes.min
h = w = size / image_size
priors.append([
x_center,
y_center,
w,
h*ratio
])
# big sized square box
# size = math.sqrt(spec.box_sizes.max * spec.box_sizes.min)
size = spec.box_sizes.max
h = w = size / image_size
priors.append([
x_center,
y_center,
w,
h*ratio
])
priors = torch.tensor(priors)
print('number', priors.shape)
if clamp:
torch.clamp(priors, 0.0, 1.0, out=priors)
return priors
priors = generate_ssd_priors(specs, image_size)
import numpy as np
import torch
image_size = [320, 240]
image_mean_test = image_mean = np.array([127, 127, 127])
image_std = 128.0
iou_threshold = 0.3
center_variance = 0.1
size_variance = 0.2
def generate_priors(size):
shrinkage_list = []
priors = []
min_boxes = [[10, 16, 24], [32, 48], [64, 96], [128, 192, 256]]
feature_map_list = [[40, 20, 10, 5], [30, 15, 8, 4]]
ratios = [1.7, 1.9, 2.1]
for i in range(0, len(image_size)):
item_list = []
for k in range(0, len(feature_map_list[i])):
item_list.append(image_size[i] / feature_map_list[i][k])
shrinkage_list.append(item_list)
for index in range(0, len(feature_map_list[0])):
scale_w = image_size[0] / shrinkage_list[0][index]
scale_h = image_size[1] / shrinkage_list[1][index]
for j in range(0, feature_map_list[1][index]):
for i in range(0, feature_map_list[0][index]):
x_center = (i + 0.5) / scale_w
y_center = (j + 0.5) / scale_h
for min_box in min_boxes[index]:
for ratio in ratios:
w = min_box / image_size[0]
h = min_box / image_size[1]
priors.append([
x_center,
y_center,
w,
h*ratio
])
# print("priors nums:{}".format(len(priors)))
priors = torch.tensor(priors)
torch.clamp(priors, 0.0, 1.0, out=priors)
return priors
priors = generate_priors(320)
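# Sanity check (illustrative): 3, 2, 2 and 3 box sizes per feature map times 3 ratios give
# 9, 6, 6 and 9 priors per cell: 40*30*9 + 20*15*6 + 10*8*6 + 5*4*9 = 13260 priors.
if __name__ == '__main__':
    print(priors.shape)  # torch.Size([13260, 4])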
import torch
from torch.nn import Conv2d, Sequential, ModuleList, BatchNorm2d
from torch import nn
from module.mobilent_v2 import MobileNetV2, InvertedResidual
from module.ssd import SSD, GraphPath
from utils.predictor import Predictor
from utils.argument import _argument
from model.config import mb_ssd_lite_f19_config as config
def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, onnx_compatible=False):
"""Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.
"""
ReLU = nn.ReLU if onnx_compatible else nn.ReLU6
return Sequential(
Conv2d(in_channels=in_channels, out_channels=in_channels, kernel_size=kernel_size,
groups=in_channels, stride=stride, padding=padding),
BatchNorm2d(in_channels),
ReLU(),
Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1),
)
def create_mb_ssd_lite_f19(num_classes, width_mult=1.0, use_batch_norm=True, onnx_compatible=False, is_test=False):
anchors = [6,6,6,6,6,6]
base_net = MobileNetV2(width_mult=width_mult, use_batch_norm=use_batch_norm,
onnx_compatible=onnx_compatible).features
source_layer_indexes = [GraphPath(14, 'conv', 3),19,]
extras = ModuleList([
InvertedResidual(1280, 512, stride=2, expand_ratio=0.2),
InvertedResidual(512, 256, stride=2, expand_ratio=0.25),
InvertedResidual(256, 256, stride=2, expand_ratio=0.5),
InvertedResidual(256, 64, stride=2, expand_ratio=0.25)
])
regression_headers = ModuleList([
SeperableConv2d(in_channels=round(576 * width_mult), out_channels=anchors[0] * 4,
kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=1280, out_channels=anchors[1] * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=512, out_channels=anchors[2] * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=256, out_channels=anchors[3] * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=256, out_channels=anchors[4] * 4, kernel_size=3, padding=1, onnx_compatible=False),
Conv2d(in_channels=64, out_channels=anchors[5] * 4, kernel_size=1),
])
classification_headers = ModuleList([
SeperableConv2d(in_channels=round(576 * width_mult), out_channels=anchors[0] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=1280, out_channels=anchors[1] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=512, out_channels=anchors[2] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=anchors[3] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=anchors[4] * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=64, out_channels=anchors[5] * num_classes, kernel_size=1),
])
return SSD(num_classes, base_net, source_layer_indexes,
extras, classification_headers, regression_headers, is_test=is_test, config=config)
def create_mb_ssd_lite_f19_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=torch.device('cpu')):
predictor = Predictor(net, config.image_size, config.image_mean,
config.image_std,
nms_method=nms_method,
iou_threshold=config.iou_threshold,
candidate_size=candidate_size,
sigma=sigma,
device=device)
return predictor
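# Usage sketch (illustrative): build the network for 2 classes (BACKGROUND + object) and
# count its parameters; trained weights are loaded elsewhere in the repository.
if __name__ == '__main__':
    net = create_mb_ssd_lite_f19(num_classes=2)
    print(sum(p.numel() for p in net.parameters()))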
import torch
from torch.nn import Conv2d, Sequential, ModuleList, BatchNorm2d
from torch import nn
from module.mobilent_v2 import MobileNetV2, InvertedResidual
from module.ssd import SSD, GraphPath
from utils.predictor import Predictor
from utils.argument import _argument
from model.config import mb_ssd_lite_f38_config as config
def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, onnx_compatible=False):
"""Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.
"""
ReLU = nn.ReLU if onnx_compatible else nn.ReLU6
return Sequential(
Conv2d(in_channels=in_channels, out_channels=in_channels, kernel_size=kernel_size,
groups=in_channels, stride=stride, padding=padding),
BatchNorm2d(in_channels),
ReLU(),
Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1),
)
def create_mb_ssd_lite_f38(num_classes, width_mult=1.0, use_batch_norm=True, onnx_compatible=False, is_test=False):
anchors = [2,2,2,2]
base_net = MobileNetV2(width_mult=width_mult, use_batch_norm=use_batch_norm,
onnx_compatible=onnx_compatible).features
source_layer_indexes = [GraphPath(7, 'conv', 3),GraphPath(14, 'conv', 3),19,]
extras = ModuleList([
InvertedResidual(1280, 512, stride=2, expand_ratio=0.2),
])
regression_headers = ModuleList([
SeperableConv2d(in_channels=round(192 * width_mult), out_channels=anchors[0] * 4,kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=576, out_channels=anchors[1] * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=1280, out_channels=anchors[2] * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=512, out_channels=anchors[3] * 4, kernel_size=3, padding=1, onnx_compatible=False),
])
classification_headers = ModuleList([
SeperableConv2d(in_channels=round(192 * width_mult), out_channels=anchors[0] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=576, out_channels=anchors[1] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=1280, out_channels=anchors[2] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=512, out_channels=anchors[3] * num_classes, kernel_size=3, padding=1),
])
return SSD(num_classes, base_net, source_layer_indexes,
extras, classification_headers, regression_headers, is_test=is_test, config=config)
def create_mb_ssd_lite_f38_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=torch.device('cpu')):
predictor = Predictor(net, config.image_size, config.image_mean,
config.image_std,
nms_method=nms_method,
iou_threshold=config.iou_threshold,
candidate_size=candidate_size,
sigma=sigma,
device=device)
return predictor
import torch
from torch.nn import Conv2d, Sequential, ModuleList, BatchNorm2d
from torch import nn
from module.mobilent_v2 import MobileNetV2, InvertedResidual
from module.ssd import SSD, GraphPath
from utils.predictor import Predictor
from utils.argument import _argument
from model.config import mb_ssd_lite_f38_person_config as config
def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0, onnx_compatible=False):
"""Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.
"""
ReLU = nn.ReLU if onnx_compatible else nn.ReLU6
return Sequential(
Conv2d(in_channels=in_channels, out_channels=in_channels, kernel_size=kernel_size,
groups=in_channels, stride=stride, padding=padding),
BatchNorm2d(in_channels),
ReLU(),
Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1),
)
def create_mb_ssd_lite_f38_person(num_classes, width_mult=1.0, use_batch_norm=True, onnx_compatible=False, is_test=False):
anchors = [6,6,6,6,6,6,6]
base_net = MobileNetV2(width_mult=width_mult, use_batch_norm=use_batch_norm,
onnx_compatible=onnx_compatible).features
source_layer_indexes = [GraphPath(7, 'conv', 3),GraphPath(14, 'conv', 3),19]
extras = ModuleList([
InvertedResidual(1280, 512, stride=2, expand_ratio=0.2),
InvertedResidual(512, 256, stride=2, expand_ratio=0.25),
InvertedResidual(256, 256, stride=2, expand_ratio=0.5),
InvertedResidual(256, 64, stride=2, expand_ratio=0.25)
])
regression_headers = ModuleList([
SeperableConv2d(in_channels=round(192 * width_mult), out_channels=anchors[0] * 4,kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=576, out_channels=anchors[1] * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=1280, out_channels=anchors[2] * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=512, out_channels=anchors[3] * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=256, out_channels=anchors[3] * 4, kernel_size=3, padding=1, onnx_compatible=False),
SeperableConv2d(in_channels=256, out_channels=anchors[4] * 4, kernel_size=3, padding=1, onnx_compatible=False),
Conv2d(in_channels=64, out_channels=anchors[5] * 4, kernel_size=1),
])
classification_headers = ModuleList([
SeperableConv2d(in_channels=round(192 * width_mult), out_channels=anchors[0] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=576, out_channels=anchors[1] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=1280, out_channels=anchors[2] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=512, out_channels=anchors[3] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=anchors[3] * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=256, out_channels=anchors[4] * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=64, out_channels=anchors[5] * num_classes, kernel_size=1),
])
return SSD(num_classes, base_net, source_layer_indexes,
extras, classification_headers, regression_headers, is_test=is_test, config=config)
def create_mb_ssd_lite_f38_person_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=torch.device('cpu')):
predictor = Predictor(net, config.image_size, config.image_mean,
config.image_std,
nms_method=nms_method,
iou_threshold=config.iou_threshold,
candidate_size=candidate_size,
sigma=sigma,
device=device)
return predictor
from torch.nn import Conv2d, Sequential, ModuleList, ReLU, BatchNorm2d
from module.rfb_tiny_mobilenet_v2 import Mb_Tiny_RFB
from model.config import rfb_tiny_mb_ssd_config as config
from utils.predictor import Predictor
from module.ssd import SSD
def SeperableConv2d(in_channels, out_channels, kernel_size=1, stride=1, padding=0):
"""Replace Conv2d with a depthwise Conv2d and Pointwise Conv2d.
"""
return Sequential(
Conv2d(in_channels=in_channels, out_channels=in_channels, kernel_size=kernel_size,
groups=in_channels, stride=stride, padding=padding),
BatchNorm2d(in_channels),
ReLU(),
Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=1),
)
def create_rfb_tiny_mb_ssd(num_classes, is_test=False, device="cuda"):
base_net = Mb_Tiny_RFB(2)
base_net_model = base_net.model # disable dropout layer
source_layer_indexes = [8,11,13]
extras = ModuleList([
Sequential(
Conv2d(in_channels=base_net.base_channel * 16, out_channels=base_net.base_channel * 4, kernel_size=1),
ReLU(),
SeperableConv2d(in_channels=base_net.base_channel * 4, out_channels=base_net.base_channel * 16, kernel_size=3, stride=2, padding=1),
ReLU()
)
])
regression_headers = ModuleList([
SeperableConv2d(in_channels=base_net.base_channel * 4, out_channels=9 * 4, kernel_size=3, padding=1),
SeperableConv2d(in_channels=base_net.base_channel * 8, out_channels=6 * 4, kernel_size=3, padding=1),
SeperableConv2d(in_channels=base_net.base_channel * 16, out_channels=6 * 4, kernel_size=3, padding=1),
Conv2d(in_channels=base_net.base_channel * 16, out_channels=9 * 4, kernel_size=3, padding=1)
])
classification_headers = ModuleList([
SeperableConv2d(in_channels=base_net.base_channel * 4, out_channels=9 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=base_net.base_channel * 8, out_channels=6 * num_classes, kernel_size=3, padding=1),
SeperableConv2d(in_channels=base_net.base_channel * 16, out_channels=6 * num_classes, kernel_size=3, padding=1),
Conv2d(in_channels=base_net.base_channel * 16, out_channels=9 * num_classes, kernel_size=3, padding=1)
])
return SSD(num_classes, base_net_model, source_layer_indexes,
extras, classification_headers, regression_headers, is_test=is_test, config=config, device=device)
def create_rfb_tiny_mb_ssd_predictor(net, candidate_size=200, nms_method=None, sigma=0.5, device=None):
predictor = Predictor(net, config.image_size, config.image_mean_test,
config.image_std,
nms_method=nms_method,
iou_threshold=config.iou_threshold,
candidate_size=candidate_size,
sigma=sigma,
device=device)
return predictor
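# A minimal smoke-test sketch for the builders above, assuming the repo modules
# are importable and the 240x320 input size used by the ONNX export script.
if __name__ == "__main__":
    import torch
    net = create_rfb_tiny_mb_ssd(num_classes=2, is_test=False, device="cpu")
    dummy = torch.randn(1, 3, 240, 320)
    confidences, locations = net(dummy)
    # confidences: (1, num_priors, num_classes), locations: (1, num_priors, 4)
    print(confidences.shape, locations.shape)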
import torch.nn as nn
import math
# Modified from https://github.com/tonylins/pytorch-mobilenet-v2/blob/master/MobileNetV2.py.
# In this version, ReLU6 is replaced with ReLU to make it ONNX compatible.
# The BatchNorm layer is optional, to make batch-norm fusion easier.
def conv_bn(inp, oup, stride, use_batch_norm=True, onnx_compatible=False):
ReLU = nn.ReLU if onnx_compatible else nn.ReLU6
if use_batch_norm:
return nn.Sequential(
nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
nn.BatchNorm2d(oup),
ReLU(inplace=True)
)
else:
return nn.Sequential(
nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
ReLU(inplace=True)
)
def conv_1x1_bn(inp, oup, use_batch_norm=True, onnx_compatible=False):
ReLU = nn.ReLU if onnx_compatible else nn.ReLU6
if use_batch_norm:
return nn.Sequential(
nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
ReLU(inplace=True)
)
else:
return nn.Sequential(
nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
ReLU(inplace=True)
)
class InvertedResidual(nn.Module):
def __init__(self, inp, oup, stride, expand_ratio, use_batch_norm=True, onnx_compatible=False):
super(InvertedResidual, self).__init__()
ReLU = nn.ReLU if onnx_compatible else nn.ReLU6
self.stride = stride
assert stride in [1, 2]
hidden_dim = round(inp * expand_ratio)
self.use_res_connect = self.stride == 1 and inp == oup
if expand_ratio == 1:
if use_batch_norm:
self.conv = nn.Sequential(
# dw
nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
nn.BatchNorm2d(hidden_dim),
ReLU(inplace=True),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
)
else:
self.conv = nn.Sequential(
# dw
nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
ReLU(inplace=True),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
)
else:
if use_batch_norm:
self.conv = nn.Sequential(
# pw
nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
nn.BatchNorm2d(hidden_dim),
ReLU(inplace=True),
# dw
nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
nn.BatchNorm2d(hidden_dim),
ReLU(inplace=True),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
)
else:
self.conv = nn.Sequential(
# pw
nn.Conv2d(inp, hidden_dim, 1, 1, 0, bias=False),
ReLU(inplace=True),
# dw
nn.Conv2d(hidden_dim, hidden_dim, 3, stride, 1, groups=hidden_dim, bias=False),
ReLU(inplace=True),
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
)
def forward(self, x):
if self.use_res_connect:
return x + self.conv(x)
else:
return self.conv(x)
class MobileNetV2(nn.Module):
def __init__(self, n_class=1000, input_size=224, width_mult=1., dropout_ratio=0.2,
use_batch_norm=True, onnx_compatible=False):
super(MobileNetV2, self).__init__()
block = InvertedResidual
input_channel = 32
last_channel = 1280
interverted_residual_setting = [
# t, c, n, s
[1, 16, 1, 1],
[6, 24, 2, 2],
[6, 32, 3, 2],
[6, 64, 4, 2],
[6, 96, 3, 1],
[6, 160, 3, 2],
[6, 320, 1, 1],
]
# building first layer
assert input_size % 32 == 0
input_channel = int(input_channel * width_mult)
self.last_channel = int(last_channel * width_mult) if width_mult > 1.0 else last_channel
self.features = [conv_bn(3, input_channel, 2, onnx_compatible=onnx_compatible)]
# building inverted residual blocks
for t, c, n, s in interverted_residual_setting:
output_channel = int(c * width_mult)
for i in range(n):
if i == 0:
self.features.append(block(input_channel, output_channel, s,
expand_ratio=t, use_batch_norm=use_batch_norm,
onnx_compatible=onnx_compatible))
else:
self.features.append(block(input_channel, output_channel, 1,
expand_ratio=t, use_batch_norm=use_batch_norm,
onnx_compatible=onnx_compatible))
input_channel = output_channel
# building last several layers
self.features.append(conv_1x1_bn(input_channel, self.last_channel,
use_batch_norm=use_batch_norm, onnx_compatible=onnx_compatible))
# make it nn.Sequential
self.features = nn.Sequential(*self.features)
# building classifier
self.classifier = nn.Sequential(
nn.Dropout(dropout_ratio),
nn.Linear(self.last_channel, n_class),
)
self._initialize_weights()
def forward(self, x):
x = self.features(x)
x = x.mean(3).mean(2)
x = self.classifier(x)
return x
def _initialize_weights(self):
for m in self.modules():
if isinstance(m, nn.Conv2d):
n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
m.weight.data.normal_(0, math.sqrt(2. / n))
if m.bias is not None:
m.bias.data.zero_()
elif isinstance(m, nn.BatchNorm2d):
m.weight.data.fill_(1)
m.bias.data.zero_()
elif isinstance(m, nn.Linear):
n = m.weight.size(1)
m.weight.data.normal_(0, 0.01)
m.bias.data.zero_()
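# A minimal sanity-check sketch for the backbone above, assuming a 224x224 input
# (input_size must be divisible by 32); the classifier head outputs n_class logits.
if __name__ == "__main__":
    import torch
    model = MobileNetV2(n_class=1000, input_size=224, onnx_compatible=True)
    logits = model(torch.randn(1, 3, 224, 224))
    print(logits.shape)  # expected: torch.Size([1, 1000])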
import torch
import torch.nn as nn
import torch.nn.functional as F
class BasicConv(nn.Module):
def __init__(self, in_planes, out_planes, kernel_size, stride=1, padding=0, dilation=1, groups=1, relu=True, bn=True):
super(BasicConv, self).__init__()
self.out_channels = out_planes
if bn:
self.conv = nn.Conv2d(in_planes, out_planes, kernel_size=kernel_size, stride=stride, padding=padding, dilation=dilation, groups=groups, bias=False)
self.bn = nn.BatchNorm2d(out_planes, eps=1e-5, momentum=0.01, affine=True)
self.relu = nn.ReLU(inplace=True) if relu else None
else:
self.conv = nn.Conv2d(in_planes, out_planes, kernel_size=kernel_size, stride=stride, padding=padding, dilation=dilation, groups=groups, bias=True)
self.bn = None
self.relu = nn.ReLU(inplace=True) if relu else None
def forward(self, x):
x = self.conv(x)
if self.bn is not None:
x = self.bn(x)
if self.relu is not None:
x = self.relu(x)
return x
class BasicRFB(nn.Module):
def __init__(self, in_planes, out_planes, stride=1, scale=0.1, map_reduce=8, vision=1, groups=1):
super(BasicRFB, self).__init__()
self.scale = scale
self.out_channels = out_planes
inter_planes = in_planes // map_reduce
self.branch0 = nn.Sequential(
BasicConv(in_planes, inter_planes, kernel_size=1, stride=1, groups=groups, relu=False),
BasicConv(inter_planes, 2 * inter_planes, kernel_size=(3, 3), stride=stride, padding=(1, 1), groups=groups),
BasicConv(2 * inter_planes, 2 * inter_planes, kernel_size=3, stride=1, padding=vision + 1, dilation=vision + 1, relu=False, groups=groups)
)
self.branch1 = nn.Sequential(
BasicConv(in_planes, inter_planes, kernel_size=1, stride=1, groups=groups, relu=False),
BasicConv(inter_planes, 2 * inter_planes, kernel_size=(3, 3), stride=stride, padding=(1, 1), groups=groups),
BasicConv(2 * inter_planes, 2 * inter_planes, kernel_size=3, stride=1, padding=vision + 2, dilation=vision + 2, relu=False, groups=groups)
)
self.branch2 = nn.Sequential(
BasicConv(in_planes, inter_planes, kernel_size=1, stride=1, groups=groups, relu=False),
BasicConv(inter_planes, (inter_planes // 2) * 3, kernel_size=3, stride=1, padding=1, groups=groups),
BasicConv((inter_planes // 2) * 3, 2 * inter_planes, kernel_size=3, stride=stride, padding=1, groups=groups),
BasicConv(2 * inter_planes, 2 * inter_planes, kernel_size=3, stride=1, padding=vision + 4, dilation=vision + 4, relu=False, groups=groups)
)
self.ConvLinear = BasicConv(6 * inter_planes, out_planes, kernel_size=1, stride=1, relu=False)
self.shortcut = BasicConv(in_planes, out_planes, kernel_size=1, stride=stride, relu=False)
self.relu = nn.ReLU(inplace=False)
def forward(self, x):
x0 = self.branch0(x)
x1 = self.branch1(x)
x2 = self.branch2(x)
out = torch.cat((x0, x1, x2), 1)
out = self.ConvLinear(out)
short = self.shortcut(x)
out = out * self.scale + short
out = self.relu(out)
return out
class Mb_Tiny_RFB(nn.Module):
def __init__(self, num_classes=2):
super(Mb_Tiny_RFB, self).__init__()
self.base_channel = 64
def conv_bn(inp, oup, stride):
return nn.Sequential(
nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
nn.BatchNorm2d(oup),
nn.ReLU(inplace=True)
)
def conv_dw(inp, oup, stride):
return nn.Sequential(
nn.Conv2d(inp, inp, 3, stride, 1, groups=inp, bias=False),
nn.BatchNorm2d(inp),
nn.ReLU(inplace=True),
nn.Conv2d(inp, oup, 1, 1, 0, bias=False),
nn.BatchNorm2d(oup),
nn.ReLU(inplace=True),
)
self.model = nn.Sequential(
conv_bn(3, self.base_channel, 2), # 160*120
conv_dw(self.base_channel, self.base_channel * 2, 1),
conv_dw(self.base_channel * 2, self.base_channel * 2, 2), # 80*60
conv_dw(self.base_channel * 2, self.base_channel * 2, 1),
conv_dw(self.base_channel * 2, self.base_channel * 4, 2), # 40*30
conv_dw(self.base_channel * 4, self.base_channel * 4, 1),
conv_dw(self.base_channel * 4, self.base_channel * 4, 1),
BasicRFB(self.base_channel * 4, self.base_channel * 4, stride=1, scale=1.0),
conv_dw(self.base_channel * 4, self.base_channel * 8, 2), # 20*15
conv_dw(self.base_channel * 8, self.base_channel * 8, 1),
conv_dw(self.base_channel * 8, self.base_channel * 8, 1),
conv_dw(self.base_channel * 8, self.base_channel * 16, 2), # 10*8
conv_dw(self.base_channel * 16, self.base_channel * 16, 1)
)
self.fc = nn.Linear(1024, num_classes)
def forward(self, x):
x = self.model(x)
x = F.avg_pool2d(x, 7)
x = x.view(-1, 1024)
x = self.fc(x)
return x
import torch.nn as nn
import torch
import numpy as np
from typing import List, Tuple
import torch.nn.functional as F
# from ..utils import box_utils
from utils import box_processing as box_utils
from collections import namedtuple
GraphPath = namedtuple("GraphPath", ['s0', 'name', 's1']) #
class SSD(nn.Module):
def __init__(self, num_classes: int, base_net: nn.ModuleList, source_layer_indexes: List[int],
extras: nn.ModuleList, classification_headers: nn.ModuleList,
regression_headers: nn.ModuleList, is_test=False, config=None, device=None):
"""Compose a SSD model using the given components.
"""
super(SSD, self).__init__()
self.num_classes = num_classes
self.base_net = base_net
self.source_layer_indexes = source_layer_indexes
self.extras = extras
self.classification_headers = classification_headers
self.regression_headers = regression_headers
self.is_test = is_test
self.config = config
# register layers in source_layer_indexes by adding them to a module list
self.source_layer_add_ons = nn.ModuleList([t[1] for t in source_layer_indexes
if isinstance(t, tuple) and not isinstance(t, GraphPath)])
if device:
self.device = device
else:
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
if is_test:
self.config = config
self.priors = config.priors.to(self.device)
def forward(self, x: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
confidences = []
locations = []
start_layer_index = 0
header_index = 0
for end_layer_index in self.source_layer_indexes:
if isinstance(end_layer_index, GraphPath):
path = end_layer_index
end_layer_index = end_layer_index.s0
added_layer = None
elif isinstance(end_layer_index, tuple):
added_layer = end_layer_index[1]
end_layer_index = end_layer_index[0]
path = None
else:
added_layer = None
path = None
for layer in self.base_net[start_layer_index: end_layer_index]:
x = layer(x)
if added_layer:
y = added_layer(x)
else:
y = x
if path:
sub = getattr(self.base_net[end_layer_index], path.name)
for layer in sub[:path.s1]:
x = layer(x)
y = x
for layer in sub[path.s1:]:
x = layer(x)
end_layer_index += 1
start_layer_index = end_layer_index
confidence, location = self.compute_header(header_index, y)
header_index += 1
confidences.append(confidence)
locations.append(location)
for layer in self.base_net[end_layer_index:]:
x = layer(x)
for layer in self.extras:
x = layer(x)
confidence, location = self.compute_header(header_index, x)
header_index += 1
confidences.append(confidence)
locations.append(location)
confidences = torch.cat(confidences, 1)
locations = torch.cat(locations, 1)
if self.is_test:
confidences = F.softmax(confidences, dim=2)
boxes = box_utils.convert_locations_to_boxes(
locations, self.priors, self.config.center_variance, self.config.size_variance
)
boxes = box_utils.center_form_to_corner_form(boxes)
return confidences, boxes
else:
return confidences, locations
def compute_header(self, i, x):
confidence = self.classification_headers[i](x)
confidence = confidence.permute(0, 2, 3, 1).contiguous()
confidence = confidence.view(confidence.size(0), -1, self.num_classes)
location = self.regression_headers[i](x)
location = location.permute(0, 2, 3, 1).contiguous()
location = location.view(location.size(0), -1, 4)
return confidence, location
def init_from_base_net(self, model):
self.base_net.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage), strict=True)
self.source_layer_add_ons.apply(_xavier_init_)
self.extras.apply(_xavier_init_)
self.classification_headers.apply(_xavier_init_)
self.regression_headers.apply(_xavier_init_)
def init_from_pretrained_ssd(self, model):
state_dict = torch.load(model, map_location=lambda storage, loc: storage)
state_dict = {k: v for k, v in state_dict.items() if not (k.startswith("classification_headers") or k.startswith("regression_headers"))}
model_dict = self.state_dict()
model_dict.update(state_dict)
self.load_state_dict(model_dict)
self.classification_headers.apply(_xavier_init_)
self.regression_headers.apply(_xavier_init_)
def init(self):
self.base_net.apply(_xavier_init_)
self.source_layer_add_ons.apply(_xavier_init_)
self.extras.apply(_xavier_init_)
self.classification_headers.apply(_xavier_init_)
self.regression_headers.apply(_xavier_init_)
def load(self, model):
self.load_state_dict(torch.load(model, map_location=lambda storage, loc: storage))
def save(self, model_path):
torch.save(self.state_dict(), model_path)
class MatchPrior(object):
def __init__(self, center_form_priors, center_variance, size_variance, iou_threshold):
self.center_form_priors = center_form_priors
self.corner_form_priors = box_utils.center_form_to_corner_form(center_form_priors)
self.center_variance = center_variance
self.size_variance = size_variance
self.iou_threshold = iou_threshold
def __call__(self, gt_boxes, gt_labels):
if type(gt_boxes) is np.ndarray:
gt_boxes = torch.from_numpy(gt_boxes)
if type(gt_labels) is np.ndarray:
gt_labels = torch.from_numpy(gt_labels)
boxes, labels = box_utils.assign_priors(gt_boxes, gt_labels,
self.corner_form_priors, self.iou_threshold)
boxes = box_utils.corner_form_to_center_form(boxes)
locations = box_utils.convert_boxes_to_locations(boxes, self.center_form_priors, self.center_variance, self.size_variance)
return locations, labels
def _xavier_init_(m: nn.Module):
if isinstance(m, nn.Conv2d):
nn.init.xavier_uniform_(m.weight)
import sys
sys.path.append('/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector')
from model.mb_ssd_lite_f19 import create_mb_ssd_lite_f19, create_mb_ssd_lite_f19_predictor
from model.mb_ssd_lite_f38 import create_mb_ssd_lite_f38, create_mb_ssd_lite_f38_predictor
from model.mb_ssd_lite_f38_person import create_mb_ssd_lite_f38_person, create_mb_ssd_lite_f38_person_predictor
from model.rfb_tiny_mb_ssd import create_rfb_tiny_mb_ssd, create_rfb_tiny_mb_ssd_predictor
from utils.misc import Timer
from torchscope import scope
import argparse
import cv2
import os
import time
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
parser = argparse.ArgumentParser(description='Single Shot MultiBox Detector predictor With Pytorch')
parser.add_argument("--net_type", default="rfb_tiny_mb2_ssd", type=str,help='mb2-ssd-lite_f19, mb2-ssd-lite_f38, rfb_tiny_mb2_ssd')
parser.add_argument('--model_path', default = '/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/app/person/rfb_tiny_mb2_ssd_c32/rfb_tiny_mb2_ssd_c32_63_208_222.pth',
help='model weight')
parser.add_argument('--label_path', default = '/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/utils/labels/person.txt', help='class names label')
parser.add_argument('--result_path', default = 'detect_results', help='result path to save')
parser.add_argument('--test_path', default = "/media/ducanh/DATA/tienln/data/test_data/mall", help='path of folder test')
parser.add_argument('--test_device', default="cuda:0", type=str,help='cuda:0 or cpu')
args = parser.parse_args()
def load_model():
class_names = [name.strip() for name in open(args.label_path).readlines()]
if args.net_type == 'mb2-ssd-lite_f19':
net = create_mb_ssd_lite_f19(len(class_names), is_test=True)
net.load(args.model_path)
predictor = create_mb_ssd_lite_f19_predictor(net, candidate_size=200)
elif args.net_type == 'mb2-ssd-lite_f38':
net = create_mb_ssd_lite_f38(len(class_names), is_test=True, )
predictor = create_mb_ssd_lite_f38_predictor(net, candidate_size=2000)
net.load(args.model_path)
elif args.net_type == 'mb2-ssd-lite_f38_person':
net = create_mb_ssd_lite_f38_person(len(class_names), is_test=True, )
predictor = create_mb_ssd_lite_f38_person_predictor(net, candidate_size=2000)
net.load(args.model_path)
elif args.net_type == 'rfb_tiny_mb2_ssd':
net = create_rfb_tiny_mb_ssd(len(class_names), is_test=True, device=args.test_device)
net.load(args.model_path)
predictor = create_rfb_tiny_mb_ssd_predictor(net, candidate_size=5000, device=args.test_device)
else:
print("The net type is wrong. It should be one of vgg16-ssd, mb1-ssd and mb1-ssd-lite.")
sys.exit(1)
scope(net, (3, 300, 300))
return predictor
if __name__ == "__main__":
tt_time = 0
predictor = load_model()
if not os.path.exists(args.result_path):
os.makedirs(args.result_path)
listdir = os.listdir(args.test_path)
sum = 0
for image_path in listdir:
orig_image = cv2.imread(os.path.join(args.test_path, image_path))
# orig_image = cv2.resize(orig_image, (640,480))
image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB)
t1 = time.time()
boxes, labels, probs = predictor.predict(image, 2000,0.5)
tt_time += (time.time()-t1)
probs = probs.numpy()
sum += boxes.size(0)
for i in range(boxes.size(0)):
box = boxes[i, :]
cv2.rectangle(orig_image, (box[0], box[1]), (box[2], box[3]), (0,0,255), 2)
cv2.putText(orig_image, str(probs[i]), (box[0], box[1]+20),cv2.FONT_HERSHEY_DUPLEX, 0.3, (255, 255, 255))
cv2.putText(orig_image, str(boxes.size(0)), (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
cv2.imwrite(os.path.join(args.result_path, image_path), orig_image)
print(f"Found {len(probs)} object. The output image is {args.result_path}")
print(sum, tt_time/36) #101002540945
import sys
sys.path.append('/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector')
from model.mb_ssd_lite_f19 import create_mb_ssd_lite_f19, create_mb_ssd_lite_f19_predictor
from model.mb_ssd_lite_f38 import create_mb_ssd_lite_f38, create_mb_ssd_lite_f38_predictor
from model.rfb_tiny_mb_ssd import create_rfb_tiny_mb_ssd, create_rfb_tiny_mb_ssd_predictor
from utils.misc import Timer
import time
import argparse
import cv2
import sys
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
parser = argparse.ArgumentParser(description='Single Shot MultiBox Detector predictor With Pytorch')
parser.add_argument("--net_type", default="rfb_tiny_mb2_ssd", type=str,help='mb2-ssd-lite_f19, mb2-ssd-lite_f38, rfb_tiny_mb2_ssd')
parser.add_argument('--model_path', default = 'app/person/rfb_tiny_mb2_ssd_c32/rfb_tiny_mb2_ssd_c32_63_208_222.pth',
help='model weight')
parser.add_argument('--label_path', default = '/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/utils/labels/person.txt', help='class names label')
parser.add_argument('--result_path', default = 'results', help='result path to save')
parser.add_argument('--test_path', default = "mall", help='path of folder test')
parser.add_argument('--test_device', default="cpu", type=str,help='cuda:0 or cpu')
args = parser.parse_args()
# mb2-ssd-lite_FPN38-epoch-150-train_loss-1.93-val_loss-1.4
# 'rtsp://root:123456@192.168.0.241/axis-media/media.3gp'
capture = cv2.VideoCapture(0)
class_names = [name.strip() for name in open(args.label_path).readlines()]
num_classes = len(class_names)
def load_model():
class_names = [name.strip() for name in open(args.label_path).readlines()]
if args.net_type == 'mb2-ssd-lite_f19':
net = create_mb_ssd_lite_f19(len(class_names), is_test=True)
net.load(args.model_path)
predictor = create_mb_ssd_lite_f19_predictor(net, candidate_size=200)
elif args.net_type == 'mb2-ssd-lite_f38':
net = create_mb_ssd_lite_f38(len(class_names), is_test=True, )
predictor = create_mb_ssd_lite_f38_predictor(net, candidate_size=200)
net.load(args.model_path)
elif args.net_type == 'rfb_tiny_mb2_ssd':
net = create_rfb_tiny_mb_ssd(len(class_names), is_test=True, device=args.test_device)
net.load(args.model_path)
predictor = create_rfb_tiny_mb_ssd_predictor(net, candidate_size=5000, device=args.test_device)
else:
print("The net type is wrong. It should be one of vgg16-ssd, mb1-ssd and mb1-ssd-lite.")
sys.exit(1)
return predictor
def live_demo():
predictor = load_model()
timer = Timer()
count = 0
while True:
ret, orig_image = capture.read()
count += 1
if count ==15:
name = int(time.time())
cv2.imwrite('/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/image_test_size/'+ str(name)+'.jpg', orig_image)
count = 0
print(name)
orig_image = cv2.resize(orig_image, (480,360))
image = cv2.cvtColor(orig_image, cv2.COLOR_BGR2RGB)
timer.start()
boxes, labels, probs = predictor.predict(image, 2000, 0.5)
interval = timer.end()
probs = probs.numpy()
# print('Time: {:.2f}s, Detect Objects: {:d}.'.format(interval, labels.size(0)))
for i in range(boxes.size(0)):
box = boxes[i, :]
label = f"{class_names[labels[i]]}: {probs[i]:.2f}"
# cv2.rectangle(orig_image, (box[0], box[1]), (box[2], box[3]), (0, 0, 255), 2)
# cv2.putText(orig_image, str(probs[i]), (box[0], box[1]+20),cv2.FONT_HERSHEY_DUPLEX, 0.3, (255, 255, 255))
# cv2.putText(orig_image,"number of people: " + str(boxes.size(0)), (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
cv2.imshow('annotated', orig_image)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
capture.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
live_demo()
import sys
sys.path.append('/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/')
from utils.misc import str2bool, Timer, freeze_net_layers, store_labels
from torch.optim.lr_scheduler import CosineAnnealingLR, MultiStepLR
import os
import logging
import sys
import itertools
import torch
from torchscope import scope
from torchsummary import summary
from utils.loss import MultiboxLoss, FocalLoss
from utils.argument import _argument
from train import train, test, data_loader, create_network
from model.mb_ssd_lite_f38 import create_mb_ssd_lite_f38
from model.config import mb_ssd_lite_f38_config
from model.mb_ssd_lite_f38_person import create_mb_ssd_lite_f38_person
from model.config import mb_ssd_lite_f38_person_config
from model.mb_ssd_lite_f19 import create_mb_ssd_lite_f19
from model.config import mb_ssd_lite_f19_config
from model.rfb_tiny_mb_ssd import create_rfb_tiny_mb_ssd
from model.config import rfb_tiny_mb_ssd_config
# os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
class Train():
'''
Wrapper class that builds the network, data loaders and optimizer, and runs the training loop.
'''
def __init__(self):
self.args = _argument()
self.device = torch.device("cuda:0" if torch.cuda.is_available() and self.args.use_cuda else "cpu")
self.net, self.criterion, self.optimizer, self.scheduler, self.train_loader, self.val_loader = self.get_model()
self.dir_path = os.path.join(self.args.checkpoint_folder,self.args.net)
if not os.path.exists(self.dir_path):
os.makedirs(self.dir_path)
def get_model(self):
timer = Timer()
logging.info(self.args)
if self.args.net == 'mb2-ssd-lite_f19':
create_net = create_mb_ssd_lite_f19
config = mb_ssd_lite_f19_config
elif self.args.net == 'mb2-ssd-lite_f38':
create_net = create_mb_ssd_lite_f38
config = mb_ssd_lite_f38_config
elif self.args.net == 'mb2-ssd-lite_f38_person':
create_net = create_mb_ssd_lite_f38_person
config = mb_ssd_lite_f38_person_config
elif self.args.net == 'rfb_tiny_mb2_ssd':
create_net = create_rfb_tiny_mb_ssd
config = rfb_tiny_mb_ssd_config
else:
logging.fatal("The net type is wrong.")
parser.print_help(sys.stderr)
sys.exit(1)
train_loader,val_loader, num_classes = data_loader(config)
net, criterion, optimizer, scheduler = create_network(create_net, num_classes, self.device)
return net, criterion, optimizer, scheduler, train_loader, val_loader
def training(self):
print(self.dir_path)
for epoch in range(0, self.args.num_epochs):
self.scheduler.step()
training_loss = train(self.train_loader, self.net, self.criterion, self.optimizer, device=self.device, debug_steps=self.args.debug_steps, epoch=epoch)
if epoch % self.args.validation_epochs == 0 or epoch == self.args.num_epochs - 1:
if self.args.valid:
val_running_loss, val_running_regression_loss, val_running_classification_loss = test(self.val_loader,self.net,self.criterion,device=self.device)
logging.info(
f"Epoch: {epoch}, " +
f"val_avg_loss: {val_running_loss:.4f}, " +
f"val_reg_loss {val_running_regression_loss:.4f}, " +
f"val_cls_loss: {val_running_classification_loss:.4f}")
model_path = os.path.join(self.dir_path, f"{self.args.net}-epoch-{epoch}-train_loss-{round(training_loss,2)}-val_loss-{round(val_running_loss,2)}.pth")
else :
model_path = os.path.join(self.dir_path, f"{self.args.net}-epoch-{epoch}-train_loss-{round(training_loss,2)}.pth")
self.net.save(model_path)
logging.info(f"Saved model {self.dir_path}")
if __name__ == '__main__':
Train().training()
from utils.argument import _argument
import logging
import sys
import itertools
from utils.misc import str2bool, Timer, freeze_net_layers, store_labels
from torch.optim.lr_scheduler import CosineAnnealingLR, MultiStepLR
from datasets.data_loader import _DataLoader
from module.ssd import MatchPrior
from datasets.data_preprocessing import TrainAugmentation, TestTransform
from torch.utils.data import DataLoader, ConcatDataset
from utils.loss import MultiboxLoss, FocalLoss
from torchsummary import summary
import torch
from torchscope import scope
sys.path.append('/media/ducanh/DATA/tienln/ai_camera/detector/')
timer = Timer()
args = _argument()
def train(loader, net, criterion, optimizer, device, debug_steps=100, epoch=-1):
net.train(True)
running_loss = 0.0
running_regression_loss = 0.0
running_classification_loss = 0.0
training_loss = 0.0
for i, data in enumerate(loader):
print(".", end="", flush=True)
images, boxes, labels = data
images = images.to(device)
boxes = boxes.to(device)
labels = labels.to(device)
optimizer.zero_grad()
confidence, locations = net(images)
regression_loss, classification_loss = criterion(confidence, locations, labels, boxes) # TODO CHANGE BOXES
loss = regression_loss + classification_loss
loss.backward()
optimizer.step()
running_loss += loss.item()
running_regression_loss += regression_loss.item()
running_classification_loss += classification_loss.item()
if i and i % debug_steps == 0:
avg_loss = running_loss / debug_steps
avg_reg_loss = running_regression_loss / debug_steps
avg_clf_loss = running_classification_loss / debug_steps
logging.info(
f"Epoch: {epoch}, Step: {i}, " +
f"train_avg_loss: {avg_loss:.4f}, " +
f"train_reg_loss: {avg_reg_loss:.4f}, " +
f"train_cls_loss: {avg_clf_loss:.4f}"
)
running_loss = 0.0
running_regression_loss = 0.0
running_classification_loss = 0.0
training_loss = avg_loss
return training_loss
def test(loader, net, criterion, device):
net.eval()
running_loss = 0.0
running_regression_loss = 0.0
running_classification_loss = 0.0
num = 0
for _, data in enumerate(loader):
images, boxes, labels = data
images = images.to(device)
boxes = boxes.to(device)
labels = labels.to(device)
num += 1
with torch.no_grad():
confidence, locations = net(images)
regression_loss, classification_loss = criterion(confidence, locations, labels, boxes)
loss = regression_loss + classification_loss
running_loss += loss.item()
running_regression_loss += regression_loss.item()
running_classification_loss += classification_loss.item()
return running_loss / num, running_regression_loss / num, running_classification_loss / num
def data_loader(config):
train_transform = TrainAugmentation(config.image_size, config.image_mean, config.image_std)
target_transform = MatchPrior(config.priors, config.center_variance,config.size_variance, config.iou_threshold)
test_transform = TestTransform(config.image_size, config.image_mean, config.image_std)
logging.info("Prepare training datasets.")
Data_Train = []
Data_Valid = []
datasets = []
path_dataset = open("/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/datasets/train_dataset.txt", "r")
for line in path_dataset:
data = line.split('+')
Data_Train.append([data[0],data[1][:-1]])
# training datasets
# dataset_paths = [Data_Train[0],Data_Train[1],Data_Train[2],Data_Train[3],Data_Train[4],Data_Train[5]]
dataset_paths = [Data_Train[3]]
for dataset_path in dataset_paths:
print(dataset_path)
dataset = _DataLoader(dataset_path, transform=train_transform,target_transform=target_transform)
print(len(dataset.ids))
datasets.append(dataset)
num_classes = len(dataset.class_names)
train_dataset = ConcatDataset(datasets)
logging.info("Train dataset size: {}".format(len(train_dataset)))
train_loader = DataLoader(train_dataset, args.batch_size,num_workers=args.num_workers,shuffle=True)
if args.valid:
# Validation datasets
path_dataset = open("/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/datasets/valid_dataset.txt", "r")
for line in path_dataset:
data = line.split('+')
Data_Valid.append([data[0],data[1][:-1]])
# print(Data_Valid)
logging.info("Prepare Validation datasets.")
valid_dataset_paths = [Data_Valid[0]]
for dataset_path in valid_dataset_paths:
val_dataset = _DataLoader(dataset_path, transform=test_transform,target_transform=target_transform)
val_loader = DataLoader(val_dataset, args.batch_size,num_workers=args.num_workers,shuffle=True)
return train_loader, val_loader, num_classes
else:
return train_loader, num_classes
def create_network(create_net, num_classes, DEVICE):
logging.info("Build network.")
net = create_net(num_classes)
# print(net)
min_loss = -10000.0
last_epoch = -1
base_net_lr = args.base_net_lr if args.base_net_lr is not None else args.lr
extra_layers_lr = args.extra_layers_lr if args.extra_layers_lr is not None else args.lr
if args.freeze_base_net:
logging.info("Freeze base net.")
freeze_net_layers(net.base_net)
params = itertools.chain(net.source_layer_add_ons.parameters(), net.extras.parameters(),
net.regression_headers.parameters(), net.classification_headers.parameters())
params = [
{'params': itertools.chain(
net.source_layer_add_ons.parameters(),
net.extras.parameters()
), 'lr': extra_layers_lr},
{'params': itertools.chain(
net.regression_headers.parameters(),
net.classification_headers.parameters()
)}
]
elif args.freeze_net:
freeze_net_layers(net.base_net)
freeze_net_layers(net.source_layer_add_ons)
freeze_net_layers(net.extras)
params = itertools.chain(net.regression_headers.parameters(), net.classification_headers.parameters())
logging.info("Freeze all the layers except prediction heads.")
else:
params = [
{'params': net.base_net.parameters(), 'lr': base_net_lr},
{'params': itertools.chain(
net.source_layer_add_ons.parameters(),
net.extras.parameters()
), 'lr': extra_layers_lr},
{'params': itertools.chain(
net.regression_headers.parameters(),
net.classification_headers.parameters()
)}
]
timer.start("Load Model")
if args.resume:
logging.info(f"Resume from the model {args.resume}")
net.load(args.resume)
elif args.base_net:
logging.info(f"Init from base net {args.base_net}")
net.init_from_base_net(args.base_net)
elif args.pretrained_ssd:
logging.info(f"Init from pretrained ssd {args.pretrained_ssd}")
net.init_from_pretrained_ssd(args.pretrained_ssd)
logging.info(f'Took {timer.end("Load Model"):.2f} seconds to load the model.')
net.to(DEVICE)
# criterion = MultiboxLoss(config.priors, iou_threshold=0.5, neg_pos_ratio=3,
# center_variance=0.1, size_variance=0.2, device=DEVICE)
criterion = FocalLoss()
optimizer = torch.optim.SGD(params, lr=args.lr, momentum=args.momentum,
weight_decay=args.weight_decay)
logging.info(f"Learning rate: {args.lr}, Base net learning rate: {base_net_lr}, "
+ f"Extra Layers learning rate: {extra_layers_lr}.")
if args.scheduler == 'multi-step':
logging.info("Uses MultiStepLR scheduler.")
milestones = [int(v.strip()) for v in args.milestones.split(",")]
scheduler = MultiStepLR(optimizer, milestones=milestones,
gamma=0.1, last_epoch=last_epoch)
elif args.scheduler == 'cosine':
logging.info("Uses CosineAnnealingLR scheduler.")
scheduler = CosineAnnealingLR(optimizer, args.t_max, last_epoch=last_epoch)
else:
logging.fatal(f"Unsupported Scheduler: {args.scheduler}.")
sys.exit(1)
return net, criterion, optimizer, scheduler
import argparse
from utils.misc import str2bool
import logging
import sys
def _argument():
parser = argparse.ArgumentParser(
description='Single Shot MultiBox Detector Training With Pytorch')
parser.add_argument("--dataset_type", default="voc", type=str,
help='Specify dataset type. Currently supports voc and open_images.')
parser.add_argument('--datasets', nargs='+', help='Dataset directory path')
parser.add_argument('--validation_dataset', help='Dataset directory path')
parser.add_argument('--balance_data', action='store_true',
help="Balance training data by down-sampling more frequent labels.")
parser.add_argument('--net', default="mb2-ssd-lite_f38_person",
help="It can be mb2-ssd-lite_f19, mb2-ssd-lite_f38,mb2-ssd-lite_f38_person, rfb_tiny_mb2_ssd")
parser.add_argument('--freeze_base_net', action='store_true',
help="Freeze base net layers.")
parser.add_argument('--freeze_net', action='store_true',
help="Freeze all the layers except the prediction head.")
parser.add_argument('--mb2_width_mult', default=1, type=float,
help='Width multiplier for MobileNetV2')
# Params for SGD
parser.add_argument('--lr', '--learning-rate', default=1e-2, type=float,
help='initial learning rate')
parser.add_argument('--momentum', default=0.9, type=float,
help='Momentum value for optim')
parser.add_argument('--weight_decay', default=5e-4, type=float,
help='Weight decay for SGD')
parser.add_argument('--gamma', default=0.1, type=float,
help='Gamma update for SGD')
parser.add_argument('--base_net_lr', default=None, type=float,
help='initial learning rate for base net.')
parser.add_argument('--extra_layers_lr', default=None, type=float,
help='initial learning rate for the layers not in base net and prediction heads.')
# Params for loading pretrained basenet or checkpoints.
parser.add_argument('--base_net',
help='Pretrained base model')
parser.add_argument('--pretrained_ssd', help='Pre-trained SSD model')
parser.add_argument('--resume', default=None, type=str,
help='Checkpoint state_dict file to resume training from')
# Scheduler
parser.add_argument('--scheduler', default="multi-step", type=str,
help="Scheduler for SGD. It can one of multi-step and cosine")
# Params for Multi-step Scheduler
parser.add_argument('--milestones', default="80,100", type=str,
help="milestones for MultiStepLR")
# Params for Cosine Annealing
parser.add_argument('--t_max', default=120, type=float,
help='T_max value for Cosine Annealing Scheduler.')
# Train params
parser.add_argument('--batch_size', default=32, type=int,
help='Batch size for training')
parser.add_argument('--num_epochs', default=500, type=int,
help='the number of training epochs')
parser.add_argument('--num_workers', default=4, type=int,
help='Number of workers used in dataloading')
parser.add_argument('--validation_epochs', default=1, type=int,
help='validate every this many epochs')
parser.add_argument('--debug_steps', default=20, type=int,
help='Set the debug log output frequency.')
parser.add_argument('--use_cuda', default=True, type=str2bool,
help='Use CUDA to train model')
parser.add_argument('--valid', default=True, type=str2bool,
help='valid when training')
parser.add_argument('--checkpoint_folder', default='/media/ducanh/DATA/tienln/ai_camera/ai_camera_detector/weight/training',
help='Directory for saving checkpoint models')
logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
args = parser.parse_args()
return args
import collections
import torch
import itertools
from typing import List
import math
import collections
# SSDBoxSizes = collections.namedtuple('SSDBoxSizes', ['min', 'max'])
# SSDSpec = collections.namedtuple('SSDSpec', ['feature_map_size', 'shrinkage', 'box_sizes', 'aspect_ratios'])
# def generate_ssd_priors(specs: List[SSDSpec], image_size, clamp=True) -> torch.Tensor:
# """Generate SSD Prior Boxes.
# It returns the center, height and width of the priors. The values are relative to the image size
# Args:
# specs: SSDSpecs about the shapes of sizes of prior boxes. i.e.
# specs = [
# SSDSpec(38, 8, SSDBoxSizes(30, 60), [2]),
# SSDSpec(19, 16, SSDBoxSizes(60, 111), [2, 3]),
# SSDSpec(10, 32, SSDBoxSizes(111, 162), [2, 3]),
# SSDSpec(5, 64, SSDBoxSizes(162, 213), [2, 3]),
# SSDSpec(3, 100, SSDBoxSizes(213, 264), [2]),
# SSDSpec(1, 300, SSDBoxSizes(264, 315), [2])
# ]
# image_size: image size.
# clamp: if true, clamp the values to make fall between [0.0, 1.0]
# Returns:
# priors (num_priors, 4): The prior boxes represented as [[center_x, center_y, w, h]]. All the values
# are relative to the image size.
# """
# priors = []
# for spec in specs:
# scale = image_size / spec.shrinkage
# for j, i in itertools.product(range(spec.feature_map_size), repeat=2):
# x_center = (i + 0.5) / scale
# y_center = (j + 0.5) / scale
# # small sized square box
# size = spec.box_sizes.min
# h = w = size / image_size
# priors.append([
# x_center,
# y_center,
# w,
# h
# ])
# # big sized square box
# size = math.sqrt(spec.box_sizes.max * spec.box_sizes.min)
# h = w = size / image_size
# priors.append([
# x_center,
# y_center,
# w,
# h
# ])
# # change h/w ratio of the small sized box
# size = spec.box_sizes.min
# h = w = size / image_size
# for ratio in spec.aspect_ratios:
# ratio = math.sqrt(ratio)
# priors.append([
# x_center,
# y_center,
# w * ratio,
# h / ratio
# ])
# priors.append([
# x_center,
# y_center,
# w / ratio,
# h * ratio
# ])
# priors = torch.tensor(priors)
# print(priors.shape)
# if clamp:
# torch.clamp(priors, 0.0, 1.0, out=priors)
# return priors
def convert_locations_to_boxes(locations, priors, center_variance,
size_variance):
"""Convert regressional location results of SSD into boxes in the form of (center_x, center_y, h, w).
The conversion:
$$predicted\_center \times center\_variance = \frac{real\_center - prior\_center}{prior\_hw}$$
$$\exp(predicted\_hw \times size\_variance) = \frac{real\_hw}{prior\_hw}$$
We do it in the inverse direction here.
Args:
locations (batch_size, num_priors, 4): the regression output of SSD, i.e. the encoded offsets.
priors (num_priors, 4) or (batch_size/1, num_priors, 4): prior boxes.
center_variance: a float used to change the scale of center.
size_variance: a float used to change the scale of size.
Returns:
boxes: the converted boxes in center form [[center_x, center_y, h, w]]. All the values
are relative to the image size.
"""
# priors can have one dimension less.
if priors.dim() + 1 == locations.dim():
priors = priors.unsqueeze(0)
return torch.cat([
locations[..., :2] * center_variance * priors[..., 2:] + priors[..., :2],
torch.exp(locations[..., 2:] * size_variance) * priors[..., 2:]
], dim=locations.dim() - 1)
def convert_boxes_to_locations(center_form_boxes, center_form_priors, center_variance, size_variance):
# priors can have one dimension less
if center_form_priors.dim() + 1 == center_form_boxes.dim():
center_form_priors = center_form_priors.unsqueeze(0)
return torch.cat([
(center_form_boxes[..., :2] - center_form_priors[..., :2]) / center_form_priors[..., 2:] / center_variance,
torch.log(center_form_boxes[..., 2:] / center_form_priors[..., 2:]) / size_variance
], dim=center_form_boxes.dim() - 1)
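# A small round-trip sketch of the two conversions above, assuming the 0.1/0.2
# center/size variances used elsewhere in this repo: encoding a center-form box
# against a prior and decoding it again recovers the original box.
if __name__ == "__main__":
    _priors = torch.tensor([[0.5, 0.5, 0.2, 0.2]])
    _boxes = torch.tensor([[0.55, 0.45, 0.25, 0.15]])
    _locations = convert_boxes_to_locations(_boxes, _priors, 0.1, 0.2)
    _decoded = convert_locations_to_boxes(_locations, _priors, 0.1, 0.2)
    print(torch.allclose(_decoded, _boxes))  # True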
def area_of(left_top, right_bottom) -> torch.Tensor:
"""Compute the areas of rectangles given two corners.
Args:
left_top (N, 2): left top corner.
right_bottom (N, 2): right bottom corner.
Returns:
area (N): return the area.
"""
hw = torch.clamp(right_bottom - left_top, min=0.0)
return hw[..., 0] * hw[..., 1]
def iou_of(boxes0, boxes1, eps=1e-5):
"""Return intersection-over-union (Jaccard index) of boxes.
Args:
boxes0 (N, 4): ground truth boxes.
boxes1 (N or 1, 4): predicted boxes.
eps: a small number to avoid 0 as denominator.
Returns:
iou (N): IoU values.
"""
overlap_left_top = torch.max(boxes0[..., :2], boxes1[..., :2])
overlap_right_bottom = torch.min(boxes0[..., 2:], boxes1[..., 2:])
overlap_area = area_of(overlap_left_top, overlap_right_bottom)
area0 = area_of(boxes0[..., :2], boxes0[..., 2:])
area1 = area_of(boxes1[..., :2], boxes1[..., 2:])
return overlap_area / (area0 + area1 - overlap_area + eps)
def assign_priors(gt_boxes, gt_labels, corner_form_priors,
iou_threshold):
"""Assign ground truth boxes and targets to priors.
Args:
gt_boxes (num_targets, 4): ground truth boxes.
gt_labels (num_targets): labels of targets.
priors (num_priors, 4): corner form priors
Returns:
boxes (num_priors, 4): real values for priors.
labels (num_priors): labels for priors.
"""
# size: num_priors x num_targets
ious = iou_of(gt_boxes.unsqueeze(0), corner_form_priors.unsqueeze(1))
# size: num_priors
best_target_per_prior, best_target_per_prior_index = ious.max(1)
# size: num_targets
best_prior_per_target, best_prior_per_target_index = ious.max(0)
for target_index, prior_index in enumerate(best_prior_per_target_index):
best_target_per_prior_index[prior_index] = target_index
# 2.0 is used to make sure every target has a prior assigned
best_target_per_prior.index_fill_(0, best_prior_per_target_index, 2)
# size: num_priors
labels = gt_labels[best_target_per_prior_index]
labels[best_target_per_prior < iou_threshold] = 0 # the background id
boxes = gt_boxes[best_target_per_prior_index]
return boxes, labels
def hard_negative_mining(loss, labels, neg_pos_ratio):
"""
It is used to suppress the presence of a large number of negative predictions.
It works on image level, not batch level.
For any example/image, it keeps all the positive predictions and
cuts the number of negative predictions to make sure the ratio
between the negative examples and positive examples is no more
than the given ratio for an image.
Args:
loss (N, num_priors): the loss for each example.
labels (N, num_priors): the labels.
neg_pos_ratio: the ratio between the negative examples and positive examples.
"""
pos_mask = labels > 0
num_pos = pos_mask.long().sum(dim=1, keepdim=True)
num_neg = num_pos * neg_pos_ratio
loss[pos_mask] = -math.inf
_, indexes = loss.sort(dim=1, descending=True)
_, orders = indexes.sort(dim=1)
neg_mask = orders < num_neg
return pos_mask | neg_mask
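# A toy sketch of the mining above: with one positive anchor and neg_pos_ratio=3,
# the mask keeps the positive plus the three highest-loss negatives and drops the
# easiest negative (loss 0.1 at index 1).
if __name__ == "__main__":
    _loss = torch.tensor([[0.9, 0.1, 0.8, 0.2, 0.5]])
    _labels = torch.tensor([[1, 0, 0, 0, 0]])
    # clone() because hard_negative_mining overwrites the positive losses in place
    _mask = hard_negative_mining(_loss.clone(), _labels, neg_pos_ratio=3)
    print(_mask)  # tensor([[ True, False,  True,  True,  True]])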
def center_form_to_corner_form(locations):
return torch.cat([locations[..., :2] - locations[..., 2:]/2,
locations[..., :2] + locations[..., 2:]/2], locations.dim() - 1)
def corner_form_to_center_form(boxes):
return torch.cat([
(boxes[..., :2] + boxes[..., 2:]) / 2,
boxes[..., 2:] - boxes[..., :2]
], boxes.dim() - 1)
def hard_nms(box_scores, iou_threshold, top_k=-1, candidate_size=200):
"""
Args:
box_scores (N, 5): boxes in corner-form and probabilities.
iou_threshold: intersection over union threshold.
top_k: keep top_k results. If k <= 0, keep all the results.
candidate_size: only consider the candidates with the highest scores.
Returns:
picked: a list of indexes of the kept boxes
"""
scores = box_scores[:, -1]
boxes = box_scores[:, :-1]
picked = []
_, indexes = scores.sort(descending=True)
indexes = indexes[:candidate_size]
while len(indexes) > 0:
current = indexes[0]
picked.append(current.item())
if 0 < top_k == len(picked) or len(indexes) == 1:
break
current_box = boxes[current, :]
indexes = indexes[1:]
rest_boxes = boxes[indexes, :]
iou = iou_of(
rest_boxes,
current_box.unsqueeze(0),
)
indexes = indexes[iou <= iou_threshold]
return box_scores[picked, :]
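# A toy sketch of hard_nms above: the second box overlaps the top-scoring box with
# an IoU of roughly 0.82 and is suppressed at iou_threshold=0.5, while the distant
# third box is kept.
if __name__ == "__main__":
    _box_scores = torch.tensor([
        [0.00, 0.00, 1.00, 1.00, 0.90],
        [0.05, 0.05, 1.05, 1.05, 0.80],
        [2.00, 2.00, 3.00, 3.00, 0.70],
    ])
    print(hard_nms(_box_scores, iou_threshold=0.5))  # keeps rows 0 and 2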
def nms(box_scores, nms_method=None, score_threshold=None, iou_threshold=None,
sigma=0.5, top_k=-1, candidate_size=200):
if nms_method == "soft":
return soft_nms(box_scores, score_threshold, sigma, top_k)
else:
return hard_nms(box_scores, iou_threshold, top_k, candidate_size=candidate_size)
def soft_nms(box_scores, score_threshold, sigma=0.5, top_k=-1):
"""Soft NMS implementation.
References:
https://arxiv.org/abs/1704.04503
https://github.com/facebookresearch/Detectron/blob/master/detectron/utils/cython_nms.pyx
Args:
box_scores (N, 5): boxes in corner-form and probabilities.
score_threshold: boxes with scores less than value are not considered.
sigma: the parameter in score re-computation.
scores[i] = scores[i] * exp(-(iou_i)^2 / sigma)
top_k: keep top_k results. If k <= 0, keep all the results.
Returns:
picked_box_scores (K, 5): results of NMS.
"""
picked_box_scores = []
while box_scores.size(0) > 0:
max_score_index = torch.argmax(box_scores[:, 4])
cur_box_prob = torch.tensor(box_scores[max_score_index, :])
picked_box_scores.append(cur_box_prob)
if len(picked_box_scores) == top_k > 0 or box_scores.size(0) == 1:
break
cur_box = cur_box_prob[:-1]
box_scores[max_score_index, :] = box_scores[-1, :]
box_scores = box_scores[:-1, :]
ious = iou_of(cur_box.unsqueeze(0), box_scores[:, :-1])
box_scores[:, -1] = box_scores[:, -1] * torch.exp(-(ious * ious) / sigma)
box_scores = box_scores[box_scores[:, -1] > score_threshold, :]
if len(picked_box_scores) > 0:
return torch.stack(picked_box_scores)
else:
return torch.tensor([])
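# A toy sketch of soft_nms above: instead of discarding the overlapping second box
# outright, its score is decayed by exp(-iou^2 / sigma); with this score_threshold
# all three boxes are returned, with the overlapping box's score lowered.
if __name__ == "__main__":
    _box_scores = torch.tensor([
        [0.00, 0.00, 1.00, 1.00, 0.90],
        [0.05, 0.05, 1.05, 1.05, 0.80],
        [2.00, 2.00, 3.00, 3.00, 0.70],
    ])
    # clone() because soft_nms modifies box_scores in place
    print(soft_nms(_box_scores.clone(), score_threshold=0.1, sigma=0.5))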
import torch.nn as nn
import torch
import torch.nn.functional as F
class ScaledL2Norm(nn.Module):
def __init__(self, in_channels, initial_scale):
super(ScaledL2Norm, self).__init__()
self.in_channels = in_channels
self.scale = nn.Parameter(torch.Tensor(in_channels))
self.initial_scale = initial_scale
self.reset_parameters()
def forward(self, x):
return (F.normalize(x, p=2, dim=1)
* self.scale.unsqueeze(0).unsqueeze(2).unsqueeze(3))
def reset_parameters(self):
self.scale.data.fill_(self.initial_scale)
BACKGROUND
head
BACKGROUND
person
import torch.nn as nn
import torch.nn.functional as F
import torch
import numpy as np
from utils import box_processing as box_utils
class MultiboxLoss(nn.Module):
def __init__(self, priors, iou_threshold, neg_pos_ratio,
center_variance, size_variance, device):
"""Implement SSD Multibox Loss.
Basically, Multibox loss combines classification loss
and Smooth L1 regression loss.
"""
super(MultiboxLoss, self).__init__()
self.iou_threshold = iou_threshold
self.neg_pos_ratio = neg_pos_ratio
self.center_variance = center_variance
self.size_variance = size_variance
self.priors = priors
self.priors = self.priors.to(device)
def forward(self, confidence, predicted_locations, labels, gt_locations):
"""Compute classification loss and smooth l1 loss.
Args:
confidence (batch_size, num_priors, num_classes): class predictions.
predicted_locations (batch_size, num_priors, 4): predicted locations.
labels (batch_size, num_priors): real labels of all the priors.
gt_locations (batch_size, num_priors, 4): encoded ground-truth locations for all the priors.
"""
num_classes = confidence.size(2)
with torch.no_grad():
# derived from cross_entropy = -sum(log(p))
loss = -F.log_softmax(confidence, dim=2)[:, :, 0]
mask = box_utils.hard_negative_mining(loss, labels, self.neg_pos_ratio)
confidence = confidence[mask, :]
classification_loss = F.cross_entropy(confidence.reshape(-1, num_classes), labels[mask], reduction='sum')
pos_mask = labels > 0
predicted_locations = predicted_locations[pos_mask, :].reshape(-1, 4)
gt_locations = gt_locations[pos_mask, :].reshape(-1, 4)
smooth_l1_loss = F.smooth_l1_loss(predicted_locations, gt_locations, reduction='sum')
num_pos = gt_locations.size(0)
return smooth_l1_loss/num_pos, classification_loss/num_pos
class FocalLoss(nn.Module):
def __init__(self, gamma = 2, alpha = 0.25):
"""
focusing is parameter that can adjust the rate at which easy
examples are down-weighted.
alpha may be set by inverse class frequency or treated as a hyper-param
If you don't want to balance factor, set alpha to 1
If you don't want to focusing factor, set gamma to 1
which is same as normal cross entropy loss
"""
super(FocalLoss, self).__init__()
self.gamma = gamma
self.alpha = alpha
def forward(self, conf_preds, loc_preds, conf_targets, loc_targets):
"""
Args:
predictions (tuple): (conf_preds, loc_preds)
conf_preds shape: [batch, n_anchors, num_cls]
loc_preds shape: [batch, n_anchors, 4]
targets (tensor): (conf_targets, loc_targets)
conf_targets shape: [batch, n_anchors]
loc_targets shape: [batch, n_anchors, 4]
"""
############### Confidence Loss part ###############
"""
#focal loss implementation(1)
pos_cls = conf_targets > -1 # exclude ignored anchors
mask = pos_cls.unsqueeze(2).expand_as(conf_preds)
conf_p = conf_preds[mask].view(-1, conf_preds.size(2)).clone()
conf_t = conf_targets[pos_cls].view(-1).clone()
p = F.softmax(conf_p, 1)
p = p.clamp(1e-7, 1. - 1e-7) # to avoid loss going to inf
c_mask = conf_p.data.new(conf_p.size(0), conf_p.size(1)).fill_(0)
c_mask = Variable(c_mask)
ids = conf_t.view(-1, 1)
c_mask.scatter_(1, ids, 1.)
p_t = (p*c_mask).sum(1).view(-1, 1)
p_t_log = p_t.log()
# This is focal loss presented in the paper eq(5)
conf_loss = -self.alpha * ((1 - p_t)**self.gamma * p_t_log)
conf_loss = conf_loss.sum()
"""
# focal loss implementation(2)
pos_cls = conf_targets >-1
mask = pos_cls.unsqueeze(2).expand_as(conf_preds)
conf_p = conf_preds[mask].view(-1, conf_preds.size(2)).clone()
p_t_log = -F.cross_entropy(conf_p, conf_targets[pos_cls], reduction='sum')
p_t = torch.exp(p_t_log)
# This is focal loss presented in the paper eq(5)
conf_loss = -self.alpha * ((1 - p_t)**self.gamma * p_t_log)
############# Localization Loss part ##############
pos = conf_targets > 0 # ignore background
pos_idx = pos.unsqueeze(pos.dim()).expand_as(loc_preds)
loc_p = loc_preds[pos_idx].view(-1, 4)
loc_t = loc_targets[pos_idx].view(-1, 4)
loc_loss = F.smooth_l1_loss(loc_p, loc_t, reduction='sum')
num_pos = pos.long().sum(1, keepdim = True)
N = max(num_pos.data.sum(), 1) # to avoid divide by 0. It is caused by data augmentation when crop the images. The cropping can distort the boxes
conf_loss /= N # exclude number of background?
loc_loss /= N
return loc_loss, conf_loss
def one_hot(self, x, n):
y = torch.eye(n)
return y[x]
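# A toy forward-pass sketch for FocalLoss with the shapes documented above,
# assuming 2 images, 4 anchors and 2 classes (background + one object class);
# class 0 marks background anchors and class 1 the positive anchors.
if __name__ == "__main__":
    torch.manual_seed(0)
    criterion = FocalLoss(gamma=2, alpha=0.25)
    conf_preds = torch.randn(2, 4, 2)            # [batch, n_anchors, num_cls]
    loc_preds = torch.randn(2, 4, 4)             # [batch, n_anchors, 4]
    conf_targets = torch.tensor([[0, 1, 0, 0],
                                 [1, 0, 0, 0]])  # one positive anchor per image
    loc_targets = torch.randn(2, 4, 4)
    loc_loss, conf_loss = criterion(conf_preds, loc_preds, conf_targets, loc_targets)
    print(loc_loss.item(), conf_loss.item())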
import time
import torch
def str2bool(s):
return s.lower() in ('true', '1')
class Timer:
def __init__(self):
self.clock = {}
def start(self, key="default"):
self.clock[key] = time.time()
def end(self, key="default"):
if key not in self.clock:
raise Exception(f"{key} is not in the clock.")
interval = time.time() - self.clock[key]
del self.clock[key]
return interval
def save_checkpoint(epoch, net_state_dict, optimizer_state_dict, best_score, checkpoint_path, model_path):
torch.save({
'epoch': epoch,
'model': net_state_dict,
'optimizer': optimizer_state_dict,
'best_score': best_score
}, checkpoint_path)
torch.save(net_state_dict, model_path)
def load_checkpoint(checkpoint_path):
return torch.load(checkpoint_path)
def freeze_net_layers(net):
for param in net.parameters():
param.requires_grad = False
def store_labels(path, labels):
with open(path, "w") as f:
f.write("\n".join(labels))
import torch
# from ..utils import box_utils
from utils import box_processing as box_utils
# from .data_preprocessing import PredictionTransform
from datasets.data_preprocessing import PredictionTransform
# from ..utils.misc import Timer
from utils.misc import Timer
class Predictor:
def __init__(self, net, size, mean=0.0, std=1.0, nms_method=None,
iou_threshold=0.3, filter_threshold=0.01, candidate_size=200, sigma=0.5, device=None):
self.net = net
self.transform = PredictionTransform(size, mean, std)
self.iou_threshold = iou_threshold
self.filter_threshold = filter_threshold
self.candidate_size = candidate_size
self.nms_method = nms_method
self.sigma = sigma
if device:
self.device = device
else:
self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
self.net.to(self.device)
self.net.eval()
self.timer = Timer()
def predict(self, image, top_k=-1, prob_threshold=None):
cpu_device = torch.device("cpu")
height, width, _ = image.shape
image = self.transform(image)
images = image.unsqueeze(0)
images = images.to(self.device)
with torch.no_grad():
self.timer.start()
scores, boxes = self.net.forward(images)
# print("Inference time: ", self.timer.end())
boxes = boxes[0]
scores = scores[0]
if not prob_threshold:
prob_threshold = self.filter_threshold
# this version of nms is slower on GPU, so we move data to CPU.
boxes = boxes.to(cpu_device)
scores = scores.to(cpu_device)
picked_box_probs = []
picked_labels = []
for class_index in range(1, scores.size(1)):
probs = scores[:, class_index]
mask = probs > prob_threshold
probs = probs[mask]
if probs.size(0) == 0:
continue
subset_boxes = boxes[mask, :]
box_probs = torch.cat([subset_boxes, probs.reshape(-1, 1)], dim=1)
box_probs = box_utils.nms(box_probs, self.nms_method,
score_threshold=prob_threshold,
iou_threshold=self.iou_threshold,
sigma=self.sigma,
top_k=top_k,
candidate_size=self.candidate_size)
picked_box_probs.append(box_probs)
picked_labels.extend([class_index] * box_probs.size(0))
if not picked_box_probs:
return torch.tensor([]), torch.tensor([]), torch.tensor([])
picked_box_probs = torch.cat(picked_box_probs)
picked_box_probs[:, 0] *= width
picked_box_probs[:, 1] *= height
picked_box_probs[:, 2] *= width
picked_box_probs[:, 3] *= height
return picked_box_probs[:, :4], torch.tensor(picked_labels), picked_box_probs[:, 4]
import sys
sys.path.append('/media/ducanh/DATA/tienln/ai_camera/detector/')
from model.mb_ssd_lite_f19 import create_mb_ssd_lite_f19, create_mb_ssd_lite_f19_predictor
from model.mb_ssd_lite_f38 import create_mb_ssd_lite_f38, create_mb_ssd_lite_f38_predictor
from model.rfb_tiny_mb_ssd import create_rfb_tiny_mb_ssd, create_rfb_tiny_mb_ssd_predictor
import argparse
import torch
parser = argparse.ArgumentParser(description='Single Shot MultiBox Detector predictor With Pytorch')
parser.add_argument("--net_type", default="rfb_tiny_mb2_ssd", type=str,help='mb2-ssd-lite_f19, mb2-ssd-lite_f38, rfb_tiny_mb2_ssd')
parser.add_argument('--model_path', default = '/media/ducanh/DATA/tienln/ai_camera/tiny_ssd/models/train_model/Epoch-146-loss-1.42-val-1.9.pth',help='model weight')
parser.add_argument('--label_path', default = '/media/ducanh/DATA/tienln/ai_camera/detector/utils/labels/person.txt', help='class names label')
args = parser.parse_args()
num_classes = len([name.strip() for name in open(args.label_path).readlines()])
if args.net_type == 'mb2-ssd-lite_f19':
net = create_mb_ssd_lite_f19(num_classes)
elif args.net_type == 'mb2-ssd-lite_f38':
net = create_mb_ssd_lite_f38(num_classes)
elif args.net_type == 'rfb_tiny_mb2_ssd':
net = create_rfb_tiny_mb_ssd(num_classes)
else:
print("unsupport network type.")
sys.exit(1)
net.load(args.model_path)
net.eval()
net.to("cuda")
model_name = args.model_path.split("/")[-1].split(".")[0]
model_path = f"app/person/{model_name}.onnx"
dummy_input = torch.randn(1, 3, 240, 320).to("cuda")
torch.onnx.export(net, dummy_input, model_path, verbose=False, input_names=['input'], output_names=['scores', 'boxes'])