|
import numpy as np |
|
import cv2 |
|
from pathlib import Path |
|
import torch |
|
import time |
|
import torchvision |
|
import re |
|
import glob |
|
from torch.utils.data import Dataset |
|
import yaml |
|
import os |
|
from multiprocessing.pool import ThreadPool, Pool |
|
from tqdm import tqdm |
|
from itertools import repeat |
|
import logging |
|
from PIL import Image, ExifTags |
|
import hashlib |
|
import sys |
|
import pathlib |
|
CURRENT_DIR = pathlib.Path(__file__).parent |
|
sys.path.append(str(CURRENT_DIR)) |
|
|
|
IMG_FORMATS = ['bmp', 'jpg', 'jpeg', 'png', 'tif', 'tiff', 'dng', 'webp', 'mpo'] |
|
NUM_THREADS = min(8, os.cpu_count()) |
|
img_formats = IMG_FORMATS |
|
vid_formats = ['mov', 'avi', 'mp4', 'mpg', 'mpeg', 'm4v', 'wmv', 'mkv'] |
|
|
|
|
|
for orientation in ExifTags.TAGS.keys(): |
|
if ExifTags.TAGS[orientation] == 'Orientation': |
|
break |
|
|
|
|
|
def make_dirs(dir='./datasets/coco'): |
|
|
|
dir = Path(dir) |
|
for p in [dir / 'labels']: |
|
p.mkdir(parents=True, exist_ok=True) |
|
return dir |
|
|
|
|
|
def coco91_to_coco80_class(): |
|
|
|
x = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, None, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, None, 24, 25, None, |
|
None, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, None, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, |
|
51, 52, 53, 54, 55, 56, 57, 58, 59, None, 60, None, None, 61, None, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, |
|
None, 73, 74, 75, 76, 77, 78, 79, None] |
|
return x |
|
|
|
|
|
def is_ascii(s=""): |
|
|
|
s = str(s) |
|
return len(s.encode().decode("ascii", "ignore")) == len(s) |
|
|
|
|
|
def is_chinese(s="人工智能"): |
|
|
|
return re.search("[\u4e00-\u9fff]", s) |
|
|
|
|
|
def letterbox( |
|
im, |
|
new_shape=(640, 640), |
|
color=(114, 114, 114), |
|
auto=True, |
|
scaleFill=False, |
|
scaleup=True, |
|
stride=32, |
|
): |
|
|
|
shape = im.shape[:2] |
|
if isinstance(new_shape, int): |
|
new_shape = (new_shape, new_shape) |
|
|
|
|
|
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) |
|
if not scaleup: |
|
r = min(r, 1.0) |
|
|
|
|
|
ratio = r, r |
|
new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)) |
|
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] |
|
if auto: |
|
dw, dh = np.mod(dw, stride), np.mod(dh, stride) |
|
elif scaleFill: |
|
dw, dh = 0.0, 0.0 |
|
new_unpad = (new_shape[1], new_shape[0]) |
|
ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] |
|
|
|
dw /= 2 |
|
dh /= 2 |
|
|
|
if shape[::-1] != new_unpad: |
|
im = cv2.resize(im, new_unpad, interpolation=cv2.INTER_LINEAR) |
|
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1)) |
|
left, right = int(round(dw - 0.1)), int(round(dw + 0.1)) |
|
im = cv2.copyMakeBorder( |
|
im, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color |
|
) |
|
return im, ratio, (dw, dh) |
|
|
|
|
|
def xyxy2xywh(x): |
|
|
|
y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) |
|
y[:, 0] = (x[:, 0] + x[:, 2]) / 2 |
|
y[:, 1] = (x[:, 1] + x[:, 3]) / 2 |
|
y[:, 2] = x[:, 2] - x[:, 0] |
|
y[:, 3] = x[:, 3] - x[:, 1] |
|
return y |
|
|
|
|
|
def xywh2xyxy(x): |
|
|
|
y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) |
|
y[:, 0] = x[:, 0] - x[:, 2] / 2 |
|
y[:, 1] = x[:, 1] - x[:, 3] / 2 |
|
y[:, 2] = x[:, 0] + x[:, 2] / 2 |
|
y[:, 3] = x[:, 1] + x[:, 3] / 2 |
|
return y |
|
|
|
|
|
def non_max_suppression( |
|
prediction, |
|
conf_thres=0.25, |
|
iou_thres=0.45, |
|
classes=None, |
|
agnostic=False, |
|
multi_label=False, |
|
labels=(), |
|
max_det=300, |
|
): |
|
"""Runs Non-Maximum Suppression (NMS) on inference results |
|
|
|
Returns: |
|
list of detections, on (n,6) tensor per image [xyxy, conf, cls] |
|
""" |
|
|
|
nc = prediction.shape[2] - 5 |
|
xc = prediction[..., 4] > conf_thres |
|
|
|
|
|
assert ( |
|
0 <= conf_thres <= 1 |
|
), f"Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0" |
|
assert ( |
|
0 <= iou_thres <= 1 |
|
), f"Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0" |
|
|
|
|
|
min_wh, max_wh = 2, 4096 |
|
max_nms = 30000 |
|
time_limit = 10.0 |
|
redundant = True |
|
multi_label &= nc > 1 |
|
merge = False |
|
|
|
t = time.time() |
|
output = [torch.zeros((0, 6), device=prediction.device)] * prediction.shape[0] |
|
for xi, x in enumerate(prediction): |
|
|
|
|
|
x = x[xc[xi]] |
|
|
|
|
|
if labels and len(labels[xi]): |
|
l = labels[xi] |
|
v = torch.zeros((len(l), nc + 5), device=x.device) |
|
v[:, :4] = l[:, 1:5] |
|
v[:, 4] = 1.0 |
|
v[range(len(l)), l[:, 0].long() + 5] = 1.0 |
|
x = torch.cat((x, v), 0) |
|
|
|
|
|
if not x.shape[0]: |
|
continue |
|
|
|
|
|
x[:, 5:] *= x[:, 4:5] |
|
|
|
|
|
box = xywh2xyxy(x[:, :4]) |
|
|
|
|
|
if multi_label: |
|
i, j = (x[:, 5:] > conf_thres).nonzero(as_tuple=False).T |
|
x = torch.cat((box[i], x[i, j + 5, None], j[:, None].float()), 1) |
|
else: |
|
conf, j = x[:, 5:].max(1, keepdim=True) |
|
x = torch.cat((box, conf, j.float()), 1)[conf.view(-1) > conf_thres] |
|
|
|
|
|
if classes is not None: |
|
x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)] |
|
|
|
|
|
|
|
|
|
|
|
|
|
n = x.shape[0] |
|
if not n: |
|
continue |
|
elif n > max_nms: |
|
x = x[x[:, 4].argsort(descending=True)[:max_nms]] |
|
|
|
|
|
c = x[:, 5:6] * (0 if agnostic else max_wh) |
|
boxes, scores = x[:, :4] + c, x[:, 4] |
|
i = torchvision.ops.nms(boxes, scores, iou_thres) |
|
if i.shape[0] > max_det: |
|
i = i[:max_det] |
|
if merge and (1 < n < 3e3): |
|
|
|
iou = box_iou(boxes[i], boxes) > iou_thres |
|
weights = iou * scores[None] |
|
x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum( |
|
1, keepdim=True |
|
) |
|
if redundant: |
|
i = i[iou.sum(1) > 1] |
|
|
|
output[xi] = x[i] |
|
if (time.time() - t) > time_limit: |
|
print(f"WARNING: NMS time limit {time_limit}s exceeded") |
|
break |
|
|
|
return output |
|
|
|
|
|
def clip_coords(boxes, shape): |
|
|
|
if isinstance(boxes, torch.Tensor): |
|
boxes[:, 0].clamp_(0, shape[1]) |
|
boxes[:, 1].clamp_(0, shape[0]) |
|
boxes[:, 2].clamp_(0, shape[1]) |
|
boxes[:, 3].clamp_(0, shape[0]) |
|
else: |
|
boxes[:, [0, 2]] = boxes[:, [0, 2]].clip(0, shape[1]) |
|
boxes[:, [1, 3]] = boxes[:, [1, 3]].clip(0, shape[0]) |
|
|
|
|
|
def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None): |
|
|
|
if ratio_pad is None: |
|
gain = min( |
|
img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1] |
|
) |
|
pad = (img1_shape[1] - img0_shape[1] * gain) / 2, ( |
|
img1_shape[0] - img0_shape[0] * gain |
|
) / 2 |
|
else: |
|
gain = ratio_pad[0][0] |
|
pad = ratio_pad[1] |
|
|
|
coords[:, [0, 2]] -= pad[0] |
|
coords[:, [1, 3]] -= pad[1] |
|
coords[:, :4] /= gain |
|
clip_coords(coords, img0_shape) |
|
return coords |
|
|
|
|
|
class Annotator: |
|
|
|
def __init__( |
|
self, |
|
im, |
|
line_width=None, |
|
font_size=None, |
|
font="Arial.ttf", |
|
pil=False, |
|
example="abc", |
|
): |
|
assert ( |
|
im.data.contiguous |
|
), "Image not contiguous. Apply np.ascontiguousarray(im) to Annotator() input images." |
|
self.pil = pil or not is_ascii(example) or is_chinese(example) |
|
self.im = im |
|
self.lw = line_width or max(round(sum(im.shape) / 2 * 0.003), 2) |
|
|
|
def box_label( |
|
self, box, label="", color=(128, 128, 128), txt_color=(255, 255, 255) |
|
): |
|
|
|
p1, p2 = (int(box[0]), int(box[1])), (int(box[2]), int(box[3])) |
|
cv2.rectangle( |
|
self.im, p1, p2, color, thickness=self.lw, lineType=cv2.LINE_AA |
|
) |
|
if label: |
|
tf = max(self.lw - 1, 1) |
|
w, h = cv2.getTextSize(label, 0, fontScale=self.lw / 3, thickness=tf)[ |
|
0 |
|
] |
|
outside = p1[1] - h - 3 >= 0 |
|
p2 = p1[0] + w, p1[1] - h - 3 if outside else p1[1] + h + 3 |
|
cv2.rectangle(self.im, p1, p2, color, -1, cv2.LINE_AA) |
|
cv2.putText( |
|
self.im, |
|
label, |
|
(p1[0], p1[1] - 2 if outside else p1[1] + h + 2), |
|
0, |
|
self.lw / 3, |
|
txt_color, |
|
thickness=tf, |
|
lineType=cv2.LINE_AA, |
|
) |
|
|
|
def rectangle(self, xy, fill=None, outline=None, width=1): |
|
|
|
self.draw.rectangle(xy, fill, outline, width) |
|
|
|
def result(self): |
|
|
|
return np.asarray(self.im) |
|
|
|
|
|
class Colors: |
|
|
|
def __init__(self): |
|
|
|
hex = ( |
|
"FF3838", |
|
"FF9D97", |
|
"FF701F", |
|
"FFB21D", |
|
"CFD231", |
|
"48F90A", |
|
"92CC17", |
|
"3DDB86", |
|
"1A9334", |
|
"00D4BB", |
|
"2C99A8", |
|
"00C2FF", |
|
"344593", |
|
"6473FF", |
|
"0018EC", |
|
"8438FF", |
|
"520085", |
|
"CB38FF", |
|
"FF95C8", |
|
"FF37C7", |
|
) |
|
self.palette = [self.hex2rgb("#" + c) for c in hex] |
|
self.n = len(self.palette) |
|
|
|
def __call__(self, i, bgr=False): |
|
c = self.palette[int(i) % self.n] |
|
return (c[2], c[1], c[0]) if bgr else c |
|
|
|
@staticmethod |
|
def hex2rgb(h): |
|
return tuple(int(h[1 + i : 1 + i + 2], 16) for i in (0, 2, 4)) |
|
|
|
|
|
def create_dataloader(path, imgsz, batch_size, stride, single_cls=False, hyp=None, augment=False, cache=False, pad=0.0, |
|
rect=False, rank=-1, workers=8, image_weights=False, quad=False, prefix=''): |
|
|
|
dataset = LoadImagesAndLabels(path, imgsz, batch_size, |
|
augment=augment, |
|
hyp=hyp, |
|
rect=rect, |
|
cache_images=cache, |
|
single_cls=single_cls, |
|
stride=int(stride), |
|
pad=pad, |
|
image_weights=image_weights, |
|
prefix=prefix) |
|
|
|
batch_size = min(batch_size, len(dataset)) |
|
nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, workers]) |
|
sampler = torch.utils.data.distributed.DistributedSampler(dataset) if rank != -1 else None |
|
loader = torch.utils.data.DataLoader if image_weights else InfiniteDataLoader |
|
|
|
dataloader = loader(dataset, |
|
batch_size=batch_size, |
|
num_workers=nw, |
|
sampler=sampler, |
|
pin_memory=True, |
|
collate_fn=LoadImagesAndLabels.collate_fn) |
|
return dataloader, dataset |
|
|
|
|
|
class LoadImagesAndLabels(Dataset): |
|
|
|
cache_version = 0.5 |
|
|
|
def __init__(self, path, img_size=640, batch_size=16, augment=False, hyp=None, rect=False, image_weights=False, |
|
cache_images=False, single_cls=False, stride=32, pad=0.0, prefix=''): |
|
self.img_size = img_size |
|
self.augment = augment |
|
self.hyp = hyp |
|
self.image_weights = image_weights |
|
self.rect = False if image_weights else rect |
|
self.mosaic = False |
|
self.mosaic_border = [-img_size // 2, -img_size // 2] |
|
self.stride = stride |
|
self.path = path |
|
self.albumentations = None |
|
|
|
f = [] |
|
for p in path if isinstance(path, list) else [path]: |
|
p = Path(p) |
|
if p.is_dir(): |
|
f += glob.glob(str(p / '**' / '*.*'), recursive=True) |
|
|
|
elif p.is_file(): |
|
with open(p, 'r') as t: |
|
t = t.read().strip().splitlines() |
|
parent = str(p.parent) + os.sep |
|
f += [x.replace('./', parent) if x.startswith('./') else x for x in t] |
|
|
|
else: |
|
raise Exception(f'{prefix}{p} does not exist') |
|
self.img_files = sorted([x.replace('/', os.sep) for x in f if x.split('.')[-1].lower() in IMG_FORMATS]) |
|
|
|
assert self.img_files, f'{prefix}No images found' |
|
|
|
|
|
self.label_files = img2label_paths(self.img_files) |
|
cache_path = (p if p.is_file() else Path(self.label_files[0]).parent).with_suffix('.cache') |
|
try: |
|
cache, exists = np.load(cache_path, allow_pickle=True).item(), True |
|
assert cache['version'] == self.cache_version |
|
assert cache['hash'] == get_hash(self.label_files + self.img_files) |
|
except: |
|
cache, exists = self.cache_labels(cache_path, prefix), False |
|
|
|
|
|
nf, nm, ne, nc, n = cache.pop('results') |
|
if exists: |
|
d = f"Scanning '{cache_path}' images and labels... {nf} found, {nm} missing, {ne} empty, {nc} corrupted" |
|
tqdm(None, desc=prefix + d, total=n, initial=n) |
|
if cache['msgs']: |
|
logging.info('\n'.join(cache['msgs'])) |
|
|
|
|
|
[cache.pop(k) for k in ('hash', 'version', 'msgs')] |
|
labels, shapes, self.segments = zip(*cache.values()) |
|
self.labels = list(labels) |
|
self.shapes = np.array(shapes, dtype=np.float64) |
|
self.img_files = list(cache.keys()) |
|
self.label_files = img2label_paths(cache.keys()) |
|
if single_cls: |
|
for x in self.labels: |
|
x[:, 0] = 0 |
|
|
|
n = len(shapes) |
|
bi = np.floor(np.arange(n) / batch_size).astype(int) |
|
nb = bi[-1] + 1 |
|
self.batch = bi |
|
self.n = n |
|
self.indices = range(n) |
|
|
|
|
|
if self.rect: |
|
|
|
s = self.shapes |
|
ar = s[:, 1] / s[:, 0] |
|
irect = ar.argsort() |
|
self.img_files = [self.img_files[i] for i in irect] |
|
self.label_files = [self.label_files[i] for i in irect] |
|
self.labels = [self.labels[i] for i in irect] |
|
self.shapes = s[irect] |
|
ar = ar[irect] |
|
|
|
|
|
shapes = [[1, 1]] * nb |
|
for i in range(nb): |
|
ari = ar[bi == i] |
|
mini, maxi = ari.min(), ari.max() |
|
if maxi < 1: |
|
shapes[i] = [maxi, 1] |
|
elif mini > 1: |
|
shapes[i] = [1, 1 / mini] |
|
|
|
self.batch_shapes = np.ceil(np.array(shapes) * img_size / stride + pad).astype(int) * stride |
|
|
|
|
|
self.imgs, self.img_npy = [None] * n, [None] * n |
|
if cache_images: |
|
if cache_images == 'disk': |
|
self.im_cache_dir = Path(Path(self.img_files[0]).parent.as_posix() + '_npy') |
|
self.img_npy = [self.im_cache_dir / Path(f).with_suffix('.npy').name for f in self.img_files] |
|
self.im_cache_dir.mkdir(parents=True, exist_ok=True) |
|
gb = 0 |
|
self.img_hw0, self.img_hw = [None] * n, [None] * n |
|
results = ThreadPool(NUM_THREADS).imap(lambda x: load_image(*x), zip(repeat(self), range(n))) |
|
pbar = tqdm(enumerate(results), total=n) |
|
for i, x in pbar: |
|
if cache_images == 'disk': |
|
if not self.img_npy[i].exists(): |
|
np.save(self.img_npy[i].as_posix(), x[0]) |
|
gb += self.img_npy[i].stat().st_size |
|
else: |
|
self.imgs[i], self.img_hw0[i], self.img_hw[i] = x |
|
gb += self.imgs[i].nbytes |
|
pbar.desc = f'{prefix}Caching images ({gb / 1E9:.1f}GB {cache_images})' |
|
pbar.close() |
|
|
|
def cache_labels(self, path=Path('./labels.cache'), prefix=''): |
|
|
|
x = {} |
|
nm, nf, ne, nc, msgs = 0, 0, 0, 0, [] |
|
desc = f"{prefix}Scanning '{path.parent / path.stem}' images and labels..." |
|
with Pool(NUM_THREADS) as pool: |
|
pbar = tqdm(pool.imap(verify_image_label, zip(self.img_files, self.label_files, repeat(prefix))), desc=desc, total=len(self.img_files)) |
|
for im_file, l, shape, segments, nm_f, nf_f, ne_f, nc_f, msg in pbar: |
|
nm += nm_f |
|
nf += nf_f |
|
ne += ne_f |
|
nc += nc_f |
|
if im_file: |
|
x[im_file] = [l, shape, segments] |
|
if msg: |
|
msgs.append(msg) |
|
pbar.desc = f"{desc}{nf} found, {nm} missing, {ne} empty, {nc} corrupted" |
|
|
|
pbar.close() |
|
if msgs: |
|
logging.info('\n'.join(msgs)) |
|
x['hash'] = get_hash(self.label_files + self.img_files) |
|
x['results'] = nf, nm, ne, nc, len(self.img_files) |
|
x['msgs'] = msgs |
|
x['version'] = self.cache_version |
|
try: |
|
np.save(path, x) |
|
path.with_suffix('.cache.npy').rename(path) |
|
logging.info(f'{prefix}New cache created: {path}') |
|
except Exception as e: |
|
logging.info(f'{prefix}WARNING: Cache directory {path.parent} is not writeable: {e}') |
|
return x |
|
|
|
def __len__(self): |
|
return len(self.img_files) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __getitem__(self, index): |
|
index = self.indices[index] |
|
|
|
hyp = self.hyp |
|
mosaic = self.mosaic |
|
|
|
|
|
img, (h0, w0), (h, w) = load_image(self, index) |
|
|
|
|
|
shape = self.batch_shapes[self.batch[index]] if self.rect else self.img_size |
|
img, ratio, pad = letterbox(img, shape, auto=False, scaleup=self.augment) |
|
shapes = (h0, w0), ((h / h0, w / w0), pad) |
|
|
|
labels = self.labels[index].copy() |
|
if labels.size: |
|
labels[:, 1:] = xywhn2xyxy(labels[:, 1:], ratio[0] * w, ratio[1] * h, padw=pad[0], padh=pad[1]) |
|
|
|
nl = len(labels) |
|
if nl: |
|
labels[:, 1:5] = xyxy2xywhn(labels[:, 1:5], w=img.shape[1], h=img.shape[0], clip=True, eps=1E-3) |
|
|
|
labels_out = torch.zeros((nl, 6)) |
|
if nl: |
|
labels_out[:, 1:] = torch.from_numpy(labels) |
|
|
|
|
|
img = img.transpose((2, 0, 1))[::-1] |
|
img = np.ascontiguousarray(img) |
|
|
|
return torch.from_numpy(img), labels_out, self.img_files[index], shapes |
|
|
|
@staticmethod |
|
def collate_fn(batch): |
|
img, label, path, shapes = zip(*batch) |
|
for i, l in enumerate(label): |
|
l[:, 0] = i |
|
return torch.stack(img, 0), torch.cat(label, 0), path, shapes |
|
|
|
|
|
def coco80_to_coco91_class(): |
|
|
|
|
|
|
|
|
|
|
|
x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 27, 28, 31, 32, 33, 34, |
|
35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, |
|
64, 65, 67, 70, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 84, 85, 86, 87, 88, 89, 90] |
|
return x |
|
|
|
|
|
def check_dataset(data, autodownload=True): |
|
|
|
|
|
|
|
|
|
extract_dir = '' |
|
|
|
|
|
if isinstance(data, (str, Path)): |
|
with open(data, errors='ignore') as f: |
|
data = yaml.safe_load(f) |
|
|
|
|
|
path = extract_dir or Path(data.get('path') or '') |
|
for k in 'train', 'val', 'test': |
|
if data.get(k): |
|
data[k] = str(path / data[k]) if isinstance(data[k], str) else [str(path / x) for x in data[k]] |
|
|
|
assert 'nc' in data, "Dataset 'nc' key missing." |
|
if 'names' not in data: |
|
data['names'] = [f'class{i}' for i in range(data['nc'])] |
|
train, val, test, s = [data.get(x) for x in ('train', 'val', 'test', 'download')] |
|
if val: |
|
val = [Path(x).resolve() for x in (val if isinstance(val, list) else [val])] |
|
if not all(x.exists() for x in val): |
|
print('\nWARNING: Dataset not found, nonexistent paths: %s' % [str(x) for x in val if not x.exists()]) |
|
|
|
return data |
|
|
|
|
|
def box_iou(box1, box2): |
|
|
|
""" |
|
Return intersection-over-union (Jaccard index) of boxes. |
|
Both sets of boxes are expected to be in (x1, y1, x2, y2) format. |
|
Arguments: |
|
box1 (Tensor[N, 4]) |
|
box2 (Tensor[M, 4]) |
|
Returns: |
|
iou (Tensor[N, M]): the NxM matrix containing the pairwise |
|
IoU values for every element in boxes1 and boxes2 |
|
""" |
|
|
|
def box_area(box): |
|
|
|
return (box[2] - box[0]) * (box[3] - box[1]) |
|
|
|
area1 = box_area(box1.T) |
|
area2 = box_area(box2.T) |
|
|
|
|
|
inter = (torch.min(box1[:, None, 2:], box2[:, 2:]) - torch.max(box1[:, None, :2], box2[:, :2])).clamp(0).prod(2) |
|
return inter / (area1[:, None] + area2 - inter) |
|
|
|
|
|
def increment_path(path, exist_ok=False, sep='', mkdir=False): |
|
|
|
path = Path(path) |
|
if path.exists() and not exist_ok: |
|
suffix = path.suffix |
|
path = path.with_suffix('') |
|
dirs = glob.glob(f"{path}{sep}*") |
|
matches = [re.search(rf"%s{sep}(\d+)" % path.stem, d) for d in dirs] |
|
i = [int(m.groups()[0]) for m in matches if m] |
|
n = max(i) + 1 if i else 2 |
|
path = Path(f"{path}{sep}{n}{suffix}") |
|
dir = path if path.suffix == '' else path.parent |
|
if not dir.exists() and mkdir: |
|
dir.mkdir(parents=True, exist_ok=True) |
|
return path |
|
|
|
|
|
def colorstr(*input): |
|
|
|
*args, string = input if len(input) > 1 else ('blue', 'bold', input[0]) |
|
colors = {'black': '\033[30m', |
|
'red': '\033[31m', |
|
'green': '\033[32m', |
|
'yellow': '\033[33m', |
|
'blue': '\033[34m', |
|
'magenta': '\033[35m', |
|
'cyan': '\033[36m', |
|
'white': '\033[37m', |
|
'bright_black': '\033[90m', |
|
'bright_red': '\033[91m', |
|
'bright_green': '\033[92m', |
|
'bright_yellow': '\033[93m', |
|
'bright_blue': '\033[94m', |
|
'bright_magenta': '\033[95m', |
|
'bright_cyan': '\033[96m', |
|
'bright_white': '\033[97m', |
|
'end': '\033[0m', |
|
'bold': '\033[1m', |
|
'underline': '\033[4m'} |
|
return ''.join(colors[x] for x in args) + f'{string}' + colors['end'] |
|
|
|
|
|
def ap_per_class(tp, conf, pred_cls, target_cls, plot=False, save_dir='.', names=()): |
|
""" Compute the average precision, given the recall and precision curves. |
|
Source: https://github.com/rafaelpadilla/Object-Detection-Metrics. |
|
# Arguments |
|
tp: True positives (nparray, nx1 or nx10). |
|
conf: Objectness value from 0-1 (nparray). |
|
pred_cls: Predicted object classes (nparray). |
|
target_cls: True object classes (nparray). |
|
plot: Plot precision-recall curve at [email protected] |
|
save_dir: Plot save directory |
|
# Returns |
|
The average precision as computed in py-faster-rcnn. |
|
""" |
|
|
|
|
|
i = np.argsort(-conf) |
|
tp, conf, pred_cls = tp[i], conf[i], pred_cls[i] |
|
|
|
|
|
unique_classes = np.unique(target_cls) |
|
nc = unique_classes.shape[0] |
|
|
|
|
|
px, py = np.linspace(0, 1, 1000), [] |
|
ap, p, r = np.zeros((nc, tp.shape[1])), np.zeros((nc, 1000)), np.zeros((nc, 1000)) |
|
for ci, c in enumerate(unique_classes): |
|
i = pred_cls == c |
|
n_l = (target_cls == c).sum() |
|
n_p = i.sum() |
|
|
|
if n_p == 0 or n_l == 0: |
|
continue |
|
else: |
|
|
|
fpc = (1 - tp[i]).cumsum(0) |
|
tpc = tp[i].cumsum(0) |
|
|
|
|
|
recall = tpc / (n_l + 1e-16) |
|
r[ci] = np.interp(-px, -conf[i], recall[:, 0], left=0) |
|
|
|
|
|
precision = tpc / (tpc + fpc) |
|
p[ci] = np.interp(-px, -conf[i], precision[:, 0], left=1) |
|
|
|
|
|
for j in range(tp.shape[1]): |
|
ap[ci, j], mpre, mrec = compute_ap(recall[:, j], precision[:, j]) |
|
if plot and j == 0: |
|
py.append(np.interp(px, mrec, mpre)) |
|
|
|
|
|
f1 = 2 * p * r / (p + r + 1e-16) |
|
|
|
i = f1.mean(0).argmax() |
|
return p[:, i], r[:, i], ap, f1[:, i], unique_classes.astype('int32') |
|
|
|
|
|
def compute_ap(recall, precision): |
|
""" Compute the average precision, given the recall and precision curves |
|
# Arguments |
|
recall: The recall curve (list) |
|
precision: The precision curve (list) |
|
# Returns |
|
Average precision, precision curve, recall curve |
|
""" |
|
|
|
|
|
mrec = np.concatenate(([0.0], recall, [1.0])) |
|
mpre = np.concatenate(([1.0], precision, [0.0])) |
|
|
|
|
|
mpre = np.flip(np.maximum.accumulate(np.flip(mpre))) |
|
|
|
|
|
method = 'interp' |
|
if method == 'interp': |
|
x = np.linspace(0, 1, 101) |
|
ap = np.trapz(np.interp(x, mrec, mpre), x) |
|
else: |
|
i = np.where(mrec[1:] != mrec[:-1])[0] |
|
ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1]) |
|
|
|
return ap, mpre, mrec |
|
|
|
|
|
def output_to_target(output): |
|
|
|
targets = [] |
|
for i, o in enumerate(output): |
|
for *box, conf, cls in o.cpu().numpy(): |
|
targets.append([i, cls, *list(*xyxy2xywh(np.array(box)[None])), conf]) |
|
return np.array(targets) |
|
|
|
|
|
def check_yaml(file, suffix=('.yaml', '.yml')): |
|
|
|
return check_file(file, suffix) |
|
|
|
|
|
def check_file(file, suffix=''): |
|
|
|
check_suffix(file, suffix) |
|
file = str(file) |
|
return file |
|
|
|
|
|
def check_suffix(file='yolov5s.pt', suffix=('.pt',), msg=''): |
|
|
|
if file and suffix: |
|
if isinstance(suffix, str): |
|
suffix = [suffix] |
|
for f in file if isinstance(file, (list, tuple)) else [file]: |
|
assert Path(f).suffix.lower() in suffix, f"{msg}{f} acceptable suffix is {suffix}" |
|
|
|
|
|
def img2label_paths(img_paths): |
|
|
|
sa, sb = os.sep + 'images' + os.sep, os.sep + 'labels' + os.sep |
|
return [sb.join(x.rsplit(sa, 1)).rsplit('.', 1)[0] + '.txt' for x in img_paths] |
|
|
|
|
|
def exif_size(img): |
|
|
|
s = img.size |
|
try: |
|
rotation = dict(img._getexif().items())[orientation] |
|
if rotation == 6: |
|
s = (s[1], s[0]) |
|
elif rotation == 8: |
|
s = (s[1], s[0]) |
|
except: |
|
pass |
|
|
|
return s |
|
|
|
|
|
def verify_image_label(args): |
|
|
|
im_file, lb_file, prefix = args |
|
nm, nf, ne, nc, msg, segments = 0, 0, 0, 0, '', [] |
|
try: |
|
|
|
im = Image.open(im_file) |
|
im.verify() |
|
shape = exif_size(im) |
|
assert (shape[0] > 9) & (shape[1] > 9), f'image size {shape} <10 pixels' |
|
assert im.format.lower() in IMG_FORMATS, f'invalid image format {im.format}' |
|
if im.format.lower() in ('jpg', 'jpeg'): |
|
with open(im_file, 'rb') as f: |
|
f.seek(-2, 2) |
|
if f.read() != b'\xff\xd9': |
|
Image.open(im_file).save(im_file, format='JPEG', subsampling=0, quality=100) |
|
msg = f'{prefix}WARNING: corrupt JPEG restored and saved {im_file}' |
|
|
|
|
|
if os.path.isfile(lb_file): |
|
nf = 1 |
|
with open(lb_file, 'r') as f: |
|
l = [x.split() for x in f.read().strip().splitlines() if len(x)] |
|
if any([len(x) > 8 for x in l]): |
|
classes = np.array([x[0] for x in l], dtype=np.float32) |
|
segments = [np.array(x[1:], dtype=np.float32).reshape(-1, 2) for x in l] |
|
l = np.concatenate((classes.reshape(-1, 1), segments2boxes(segments)), 1) |
|
l = np.array(l, dtype=np.float32) |
|
if len(l): |
|
assert l.shape[1] == 5, 'labels require 5 columns each' |
|
assert (l >= 0).all(), 'negative labels' |
|
assert (l[:, 1:] <= 1).all(), 'non-normalized or out of bounds coordinate labels' |
|
assert np.unique(l, axis=0).shape[0] == l.shape[0], 'duplicate labels' |
|
else: |
|
ne = 1 |
|
l = np.zeros((0, 5), dtype=np.float32) |
|
else: |
|
nm = 1 |
|
l = np.zeros((0, 5), dtype=np.float32) |
|
return im_file, l, shape, segments, nm, nf, ne, nc, msg |
|
except Exception as e: |
|
nc = 1 |
|
msg = f'{prefix}WARNING: Ignoring corrupted image and/or label {im_file}: {e}' |
|
return [None, None, None, None, nm, nf, ne, nc, msg] |
|
|
|
|
|
def segments2boxes(segments): |
|
|
|
boxes = [] |
|
for s in segments: |
|
x, y = s.T |
|
boxes.append([x.min(), y.min(), x.max(), y.max()]) |
|
return xyxy2xywh(np.array(boxes)) |
|
|
|
|
|
def get_hash(paths): |
|
|
|
size = sum(os.path.getsize(p) for p in paths if os.path.exists(p)) |
|
h = hashlib.md5(str(size).encode()) |
|
h.update(''.join(paths).encode()) |
|
return h.hexdigest() |
|
|
|
|
|
class InfiniteDataLoader(torch.utils.data.dataloader.DataLoader): |
|
""" Dataloader that reuses workers |
|
|
|
Uses same syntax as vanilla DataLoader |
|
""" |
|
|
|
def __init__(self, *args, **kwargs): |
|
super().__init__(*args, **kwargs) |
|
object.__setattr__(self, 'batch_sampler', _RepeatSampler(self.batch_sampler)) |
|
self.iterator = super().__iter__() |
|
|
|
def __len__(self): |
|
return len(self.batch_sampler.sampler) |
|
|
|
def __iter__(self): |
|
for i in range(len(self)): |
|
yield next(self.iterator) |
|
|
|
|
|
class _RepeatSampler(object): |
|
""" Sampler that repeats forever |
|
|
|
Args: |
|
sampler (Sampler) |
|
""" |
|
|
|
def __init__(self, sampler): |
|
self.sampler = sampler |
|
|
|
def __iter__(self): |
|
while True: |
|
yield from iter(self.sampler) |
|
|
|
|
|
def load_image(self, i): |
|
|
|
im = self.imgs[i] |
|
if im is None: |
|
npy = self.img_npy[i] |
|
if npy and npy.exists(): |
|
im = np.load(npy) |
|
else: |
|
path = self.img_files[i] |
|
im = cv2.imread(path) |
|
assert im is not None, 'Image Not Found ' + path |
|
h0, w0 = im.shape[:2] |
|
r = self.img_size / max(h0, w0) |
|
if r != 1: |
|
im = cv2.resize(im, (int(w0 * r), int(h0 * r)), |
|
interpolation=cv2.INTER_AREA if r < 1 and not self.augment else cv2.INTER_LINEAR) |
|
return im, (h0, w0), im.shape[:2] |
|
else: |
|
return self.imgs[i], self.img_hw0[i], self.img_hw[i] |
|
|
|
|
|
def xywhn2xyxy(x, w=640, h=640, padw=0, padh=0): |
|
|
|
y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) |
|
y[:, 0] = w * (x[:, 0] - x[:, 2] / 2) + padw |
|
y[:, 1] = h * (x[:, 1] - x[:, 3] / 2) + padh |
|
y[:, 2] = w * (x[:, 0] + x[:, 2] / 2) + padw |
|
y[:, 3] = h * (x[:, 1] + x[:, 3] / 2) + padh |
|
return y |
|
|
|
|
|
def xyxy2xywhn(x, w=640, h=640, clip=False, eps=0.0): |
|
|
|
if clip: |
|
clip_coords(x, (h - eps, w - eps)) |
|
y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) |
|
y[:, 0] = ((x[:, 0] + x[:, 2]) / 2) / w |
|
y[:, 1] = ((x[:, 1] + x[:, 3]) / 2) / h |
|
y[:, 2] = (x[:, 2] - x[:, 0]) / w |
|
y[:, 3] = (x[:, 3] - x[:, 1]) / h |
|
return y |
|
|
|
|
|
def post_process(x): |
|
grid = np.load("./grid.npy", allow_pickle=True) |
|
anchor_grid = np.load("./anchor_grid.npy", allow_pickle=True) |
|
x = list(x) |
|
z = [] |
|
stride = [8, 16, 32] |
|
for i in range(3): |
|
bs, _, ny, nx = x[i].shape |
|
x[i] = ( |
|
torch.tensor(x[i]) |
|
.view(bs, 3, 85, ny, nx) |
|
.permute(0, 1, 3, 4, 2) |
|
.contiguous() |
|
) |
|
y = x[i].sigmoid() |
|
xy = (y[..., 0:2] * 2.0 - 0.5 + grid[i]) * stride[i] |
|
wh = (y[..., 2:4] * 2) ** 2 * anchor_grid[i] |
|
y = torch.cat((xy, wh, y[..., 4:]), -1) |
|
z.append(y.view(bs, -1, 85)) |
|
|
|
return (torch.cat(z, 1), x) |