|
|
|
|
|
|
|
import io |
|
import sys |
|
import cv2 |
|
import json |
|
import time |
|
import pathlib |
|
import argparse |
|
import tempfile |
|
import itertools |
|
import contextlib |
|
import torch |
|
import torchvision |
|
import numpy as np |
|
import onnxruntime as ort |
|
from tqdm import tqdm |
|
from loguru import logger |
|
from tabulate import tabulate |
|
from collections import defaultdict |
|
from pycocotools.cocoeval import COCOeval |
|
|
|
CURRENT_DIR = pathlib.Path(__file__).parent |
|
sys.path.append(str(CURRENT_DIR)) |
|
|
|
from coco import COCO_CLASSES |
|
|
|
|
|
class COCOEvaluator:
    """
    COCO AP evaluator for a detection model served by an ONNX Runtime session.

    Every batch of the dataloader is run through the session, decoded,
    filtered with NMS, converted to COCO result records and finally scored
    with the COCO API (``COCOeval``).
    """

    def __init__(
        self,
        dataloader,
        img_size: tuple,
        confthre: float,
        nmsthre: float,
        num_classes: int,
        testdev: bool = False,
        per_class_AP: bool = False,
        per_class_AR: bool = False,
    ):
        """
        Args:
            dataloader (Dataloader): evaluate dataloader.
            img_size: network input size as ``(height, width)``. It is indexed
                with ``[0]`` / ``[1]`` against the original image height /
                width when predicted boxes are mapped back to the image.
            confthre: confidence threshold ranging from 0 to 1.
            nmsthre: IoU threshold of non-max suppression ranging from 0 to 1.
            num_classes: number of all classes of interest.
            testdev: whether to run on the testdev set of COCO. When True the
                predictions are written to ``./yolox_testdev_2017.json``.
            per_class_AP: show per class AP during evaluation or not. Default to False.
            per_class_AR: show per class AR during evaluation or not. Default to False.
        """
        self.dataloader = dataloader
        self.img_size = img_size
        self.confthre = confthre
        self.nmsthre = nmsthre
        self.num_classes = num_classes
        self.testdev = testdev
        self.per_class_AP = per_class_AP
        self.per_class_AR = per_class_AR

    def evaluate(self, ort_sess, return_outputs=False):
        """
        COCO average precision (AP) evaluation. Iterates inference over the
        dataloader and scores the collected predictions with the COCO API.

        Args:
            ort_sess (onnxruntime.InferenceSession): onnxruntime session to evaluate.
            return_outputs (bool): whether to also return the image-wise results.

        Returns:
            eval_results (tuple): ``(stats[0], stats[1], info)`` as produced by
                ``evaluate_prediction``.
            output_data (defaultdict): image-wise result, only returned when
                ``return_outputs`` is True.
        """
        data_list = []
        output_data = defaultdict()
        inference_time = 0
        nms_time = 0
        num_batches = len(self.dataloader)
        n_samples = max(num_batches - 1, 1)
        input_name = ort_sess.get_inputs()[0].name
        for cur_iter, (imgs, _, info_imgs, ids) in enumerate(tqdm(self.dataloader)):
            # The last batch is left out of the timing statistics
            # (presumably because it may be smaller than batch_size).
            is_time_record = cur_iter < num_batches - 1
            if is_time_record:
                start = time.time()

            # The session is fed with axis 1 moved to the end, and its outputs
            # are moved back to the layout the decoding helpers index into.
            outputs = ort_sess.run(None, {input_name: np.transpose(imgs.numpy(), (0, 2, 3, 1))})
            outputs = [np.transpose(out, (0, 3, 1, 2)) for out in outputs]
            outputs = [torch.Tensor(out) for out in outputs]
            outputs = head_postprocess(outputs)
            if is_time_record:
                infer_end = time.time()
                inference_time += infer_end - start

            outputs = postprocess(outputs, self.num_classes, self.confthre, self.nmsthre)
            if is_time_record:
                nms_end = time.time()
                nms_time += nms_end - infer_end

            data_list_elem, image_wise_data = self.convert_to_coco_format(
                outputs, info_imgs, ids, return_outputs=True)
            data_list.extend(data_list_elem)
            output_data.update(image_wise_data)

        statistics = [inference_time, nms_time, n_samples]
        eval_results = self.evaluate_prediction(data_list, statistics)
        if return_outputs:
            return eval_results, output_data
        return eval_results

    def convert_to_coco_format(self, outputs, info_imgs, ids, return_outputs=False):
        """
        Convert per-image detections into COCO result records.

        Args:
            outputs: one entry per image; either None (no detection) or a
                tensor whose columns are read as boxes ``[:, 0:4]``, two
                confidences ``[:, 4]`` / ``[:, 5]`` and a class index ``[:, 6]``.
            info_imgs: ``info_imgs[0]`` / ``info_imgs[1]`` hold the original
                image heights / widths.
            ids: image ids, aligned with ``outputs``.
            return_outputs (bool): also return the image-wise dictionary.

        Returns:
            data_list (list[dict]): COCO result records with xywh boxes.
            image_wise_data (defaultdict): ``image_id -> {"bboxes", "scores",
                "categories"}`` with corner-format boxes, only returned when
                ``return_outputs`` is True.
        """
        data_list = []
        image_wise_data = defaultdict(dict)
        for output, img_h, img_w, img_id in zip(outputs, info_imgs[0], info_imgs[1], ids):
            if output is None:
                continue
            output = output.cpu()
            bboxes = output[:, 0:4]

            # Map boxes from network-input coordinates back to the original
            # image using the ratio between input size and image size.
            scale = min(
                self.img_size[0] / float(img_h), self.img_size[1] / float(img_w)
            )
            bboxes /= scale
            cls = output[:, 6]
            scores = output[:, 4] * output[:, 5]

            image_id = int(img_id)
            class_ids = self.dataloader.dataset.class_ids
            labels = [class_ids[int(cls[ind])] for ind in range(bboxes.shape[0])]

            # Image-wise boxes are stored before the xywh conversion below,
            # i.e. still in corner format.
            image_wise_data[image_id] = {
                "bboxes": [box.numpy().tolist() for box in bboxes],
                "scores": [score.numpy().item() for score in scores],
                "categories": labels,
            }

            bboxes = xyxy2xywh(bboxes)
            for ind, label in enumerate(labels):
                data_list.append({
                    "image_id": image_id,
                    "category_id": label,
                    "bbox": bboxes[ind].numpy().tolist(),
                    "score": scores[ind].numpy().item(),
                    "segmentation": [],
                })

        if return_outputs:
            return data_list, image_wise_data
        return data_list

    def evaluate_prediction(self, data_dict, statistics):
        """
        Score the collected predictions with ``COCOeval`` and build the report.

        Args:
            data_dict (list[dict]): COCO result records.
            statistics (list): ``[inference_time, nms_time, n_samples]`` where
                the times are in seconds and ``n_samples`` is the number of
                timed batches.

        Returns:
            tuple: ``(cocoEval.stats[0], cocoEval.stats[1], info)``; the two
                stats are ``0`` when there is no prediction. ``info`` is the
                textual summary (timing, COCO summary, optional per-class tables).
        """
        logger.info("Evaluate in main process...")
        inference_time, nms_time, n_samples = statistics[0], statistics[1], statistics[2]

        timed_images = n_samples * self.dataloader.batch_size
        a_infer_time = 1000 * inference_time / timed_images
        a_nms_time = 1000 * nms_time / timed_images
        time_info = ", ".join(
            "Average {} time: {:.2f} ms".format(k, v)
            for k, v in zip(
                ["forward", "NMS", "inference"],
                [a_infer_time, a_nms_time, (a_infer_time + a_nms_time)],
            )
        )
        info = time_info + "\n"

        if len(data_dict) == 0:
            return 0, 0, info

        cocoGt = self.dataloader.dataset.coco
        if self.testdev:
            # Kept on disk on purpose: this is the file produced for test-dev.
            result_file = "./yolox_testdev_2017.json"
            with open(result_file, "w") as f:
                json.dump(data_dict, f)
            cocoDt = cocoGt.loadRes(result_file)
        else:
            # loadRes parses the file during the call, so the temporary
            # directory (and the file in it) can be removed right afterwards.
            with tempfile.TemporaryDirectory() as tmp_dir:
                result_file = str(pathlib.Path(tmp_dir) / "predictions.json")
                with open(result_file, "w") as f:
                    json.dump(data_dict, f)
                cocoDt = cocoGt.loadRes(result_file)

        logger.info("Use standard COCOeval.")
        cocoEval = COCOeval(cocoGt, cocoDt, "bbox")
        cocoEval.evaluate()
        cocoEval.accumulate()

        # summarize() prints to stdout; capture it so it becomes part of info.
        redirect_string = io.StringIO()
        with contextlib.redirect_stdout(redirect_string):
            cocoEval.summarize()
        info += redirect_string.getvalue()

        cat_ids = list(cocoGt.cats.keys())
        cat_names = [cocoGt.cats[catId]['name'] for catId in sorted(cat_ids)]
        if self.per_class_AP:
            AP_table = per_class_AP_table(cocoEval, class_names=cat_names)
            info += "per class AP:\n" + AP_table + "\n"
        if self.per_class_AR:
            AR_table = per_class_AR_table(cocoEval, class_names=cat_names)
            info += "per class AR:\n" + AR_table + "\n"
        return cocoEval.stats[0], cocoEval.stats[1], info
|
|
|
|
|
class ValTransform:
    """
    Evaluation-time image transform.

    The image is letterboxed and axis-swapped by ``preproc``; when ``legacy``
    is set, the first axis is additionally reversed and the values are scaled
    to [0, 1] and normalized with a fixed per-channel mean / std
    (presumably ImageNet statistics - confirm against the training recipe).
    """

    def __init__(self, swap=(2, 0, 1), legacy=False):
        """
        Args:
            swap: axis order handed to ``preproc`` for the final transpose.
            legacy: enable the extra axis reversal and normalization.
        """
        self.legacy = legacy
        self.swap = swap

    def __call__(self, img, res, input_size):
        """
        Args:
            img: input image array.
            res: unused; kept for interface compatibility.
            input_size: target size forwarded to ``preproc``.

        Returns:
            tuple: the transformed image and a ``(1, 5)`` array of zeros that
                stands in for the labels.
        """
        transformed, _ratio = preproc(img, input_size, self.swap)
        placeholder_target = np.zeros((1, 5))
        if not self.legacy:
            return transformed, placeholder_target

        # Reverse the first axis, then normalize in place so the array keeps
        # its dtype.
        transformed = transformed[::-1, :, :].copy()
        transformed /= 255.0
        transformed -= np.array([0.485, 0.456, 0.406]).reshape(3, 1, 1)
        transformed /= np.array([0.229, 0.224, 0.225]).reshape(3, 1, 1)
        return transformed, placeholder_target
|
|
|
|
|
def preproc(img, input_size, swap=(2, 0, 1)):
    """
    Letterbox an image for the network.

    The image is resized with a single ratio so that it fits inside
    ``input_size``, pasted into the top-left corner of a canvas filled with
    the value 114, transposed with ``swap`` and returned as a contiguous
    float32 array.

    Args:
        img: image array; ``img.shape[0]`` / ``img.shape[1]`` are used as
            height / width.
        input_size: target ``(height, width)``.
        swap: axis order applied to the padded image.

    Returns:
        tuple: ``(padded_img, r)`` where ``r`` is the resize ratio.
    """
    if len(img.shape) == 3:
        canvas = np.full((input_size[0], input_size[1], 3), 114, dtype=np.uint8)
    else:
        canvas = np.full(input_size, 114, dtype=np.uint8)

    src_h, src_w = img.shape[0], img.shape[1]
    r = min(input_size[0] / src_h, input_size[1] / src_w)
    new_h = int(src_h * r)
    new_w = int(src_w * r)

    # cv2.resize takes the target size as (width, height).
    resized = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
    canvas[:new_h, :new_w] = resized.astype(np.uint8)

    swapped = canvas.transpose(swap)
    return np.ascontiguousarray(swapped, dtype=np.float32), r
|
|
|
|
|
def postprocess(prediction, num_classes, conf_thre=0.7, nms_thre=0.45, class_agnostic=False):
    """
    Post-processing after the prediction heads: score filtering and NMS.

    Args:
        prediction (torch.Tensor): tensor indexed as ``[image, box, channel]``
            whose channels are read as ``cx, cy, w, h`` (0-3), an objectness
            score (4) and ``num_classes`` class scores (5 onwards).
            NOTE: the first four channels are converted to corner format
            (x1, y1, x2, y2) in place.
        num_classes (int): number of class-score channels to consider.
        conf_thre (float): minimum ``objectness * best class score`` to keep a box.
        nms_thre (float): IoU threshold handed to NMS.
        class_agnostic (bool): run a single NMS over all classes instead of
            one NMS per class.

    Returns:
        list: one entry per image; ``None`` when nothing survives, otherwise a
            tensor with 7 columns: x1, y1, x2, y2, objectness, class
            confidence, class index.
    """
    # Center/size -> corners, written back into `prediction` itself.
    centers = prediction[:, :, :2].clone()
    half_sizes = prediction[:, :, 2:4] / 2
    prediction[:, :, :2] = centers - half_sizes
    prediction[:, :, 2:4] = centers + half_sizes

    output = [None] * len(prediction)
    for i, image_pred in enumerate(prediction):
        # Nothing to do for an image without candidate boxes.
        if not image_pred.size(0):
            continue

        # Best class score and its index for every box, kept as columns.
        class_conf, class_pred = torch.max(image_pred[:, 5: 5 + num_classes], 1, keepdim=True)

        # Select column 0 explicitly instead of calling squeeze(): with a
        # single candidate box squeeze() yields a 0-dim mask, and indexing
        # with a 0-dim mask adds a dimension to `detections`.
        conf_mask = image_pred[:, 4] * class_conf[:, 0] >= conf_thre

        detections = torch.cat((image_pred[:, :5], class_conf, class_pred.float()), 1)
        detections = detections[conf_mask]
        if not detections.size(0):
            continue

        nms_scores = detections[:, 4] * detections[:, 5]
        if class_agnostic:
            nms_out_index = torchvision.ops.nms(
                detections[:, :4],
                nms_scores,
                nms_thre,
            )
        else:
            nms_out_index = torchvision.ops.batched_nms(
                detections[:, :4],
                nms_scores,
                detections[:, 6],
                nms_thre,
            )

        output[i] = detections[nms_out_index]
    return output
|
|
|
|
|
def head_postprocess(outputs, strides=[8, 16, 32]):
    """
    Merge and decode the raw tensors of the detection heads.

    Each head tensor is flattened from dimension 2 onwards, the heads are
    concatenated along that flattened dimension and the result is permuted so
    the flattened positions come second. A sigmoid is applied to channels 4
    and up before the boxes are decoded by ``decode_outputs``.

    Args:
        outputs: sequence of head tensors; their last two dimensions are
            taken as the feature-map height and width.
        strides: stride associated with each head, in the same order.

    Returns:
        The decoded tensor returned by ``decode_outputs``.
    """
    feature_sizes = [head.shape[-2:] for head in outputs]
    flattened_heads = [head.flatten(start_dim=2) for head in outputs]
    merged = torch.cat(flattened_heads, dim=2).permute(0, 2, 1)
    merged[..., 4:] = merged[..., 4:].sigmoid()
    tensor_type = merged[0].type()
    return decode_outputs(merged, tensor_type, feature_sizes, strides)
|
|
|
|
|
def decode_outputs(outputs, dtype, ori_hw, ori_strides):
    """
    Turn grid-relative box predictions into absolute coordinates, in place.

    For every feature map a grid of (x, y) cell indices and a matching column
    of stride values is built. Channels 0-1 become ``(value + cell) * stride``
    and channels 2-3 become ``exp(value) * stride``.

    Args:
        outputs: tensor whose last dimension starts with the four box channels.
        dtype: tensor type the grids and strides are cast to.
        ori_hw: ``(height, width)`` of each feature map.
        ori_strides: stride of each feature map, aligned with ``ori_hw``.

    Returns:
        The same ``outputs`` tensor after the in-place update.
    """
    grid_parts = []
    stride_parts = []
    for (hsize, wsize), stride in zip(ori_hw, ori_strides):
        yv, xv = meshgrid([torch.arange(hsize), torch.arange(wsize)])
        cell_xy = torch.stack((xv, yv), 2).view(1, -1, 2)
        grid_parts.append(cell_xy)
        # One stride value per grid cell, shaped to broadcast with the grid.
        stride_parts.append(torch.full((*cell_xy.shape[:2], 1), stride))

    all_grids = torch.cat(grid_parts, dim=1).type(dtype)
    all_strides = torch.cat(stride_parts, dim=1).type(dtype)

    outputs[..., :2] = (outputs[..., :2] + all_grids) * all_strides
    outputs[..., 2:4] = torch.exp(outputs[..., 2:4]) * all_strides
    return outputs
|
|
|
|
|
def xyxy2xywh(bboxes):
    """
    Convert boxes from (x1, y1, x2, y2) to (x1, y1, width, height) in place.

    Columns 2 and 3 are replaced by their difference to columns 0 and 1; the
    same array/tensor object is returned.
    """
    bboxes[:, 2:4] -= bboxes[:, 0:2]
    return bboxes
|
|
|
|
|
def meshgrid(*tensors):
    """
    Version-aware wrapper around ``torch.meshgrid``.

    The ``indexing="ij"`` keyword is only passed when the installed torch
    version is 1.10 or newer; older versions are called without it.
    """
    major_minor = tuple(int(part) for part in torch.__version__.split(".")[:2])
    if major_minor < (1, 10):
        return torch.meshgrid(*tensors)
    return torch.meshgrid(*tensors, indexing="ij")
|
|
|
|
|
def per_class_AR_table(coco_eval, class_names=COCO_CLASSES, headers=["class", "AR"], colums=6):
    """
    Format the recall of each class as a pipe-style text table.

    Args:
        coco_eval: evaluated and accumulated ``COCOeval`` object.
        class_names: class names; their count must match axis 1 of the recall array.
        headers: header pair repeated across the table.
        colums: maximum number of table columns (name and value each count as one).

    Returns:
        str: the table rendered by ``tabulate``; values are percentages.
    """
    recalls = coco_eval.eval["recall"]
    # Axis 1 is the class axis (checked below). pycocotools documents this
    # array as [T, K, A, M] (IoU thresholds, classes, area ranges, max dets);
    # verify against the installed version.
    assert len(class_names) == recalls.shape[1]

    per_class_AR = {}
    for class_index, class_name in enumerate(class_names):
        # Index 0 on axis 2 and the last entry on axis 3; values of -1 and
        # below are filtered out.
        class_recall = recalls[:, class_index, 0, -1]
        class_recall = class_recall[class_recall > -1]
        if class_recall.size:
            ar = np.mean(class_recall)
        else:
            ar = float("nan")
        per_class_AR[class_name] = float(ar * 100)

    num_cols = min(colums, len(per_class_AR) * len(headers))
    # name, value, name, value, ... laid out row by row over num_cols columns.
    flat_cells = list(itertools.chain.from_iterable(per_class_AR.items()))
    column_slices = [flat_cells[offset::num_cols] for offset in range(num_cols)]
    table_rows = itertools.zip_longest(*column_slices)
    repeated_headers = headers * (num_cols // len(headers))
    return tabulate(
        table_rows,
        tablefmt="pipe",
        floatfmt=".3f",
        headers=repeated_headers,
        numalign="left",
    )
|
|
|
|
|
def per_class_AP_table(coco_eval, class_names=COCO_CLASSES, headers=["class", "AP"], colums=6):
    """
    Format the precision of each class as a pipe-style text table.

    Args:
        coco_eval: evaluated and accumulated ``COCOeval`` object.
        class_names: class names; their count must match axis 2 of the precision array.
        headers: header pair repeated across the table.
        colums: maximum number of table columns (name and value each count as one).

    Returns:
        str: the table rendered by ``tabulate``; values are percentages.
    """
    precisions = coco_eval.eval["precision"]
    # Axis 2 is the class axis (checked below). pycocotools documents this
    # array as [T, R, K, A, M] (IoU thresholds, recall thresholds, classes,
    # area ranges, max dets); verify against the installed version.
    assert len(class_names) == precisions.shape[2]

    per_class_AP = {}
    for class_index, class_name in enumerate(class_names):
        # Index 0 on axis 3 and the last entry on axis 4; values of -1 and
        # below are filtered out.
        class_precision = precisions[:, :, class_index, 0, -1]
        class_precision = class_precision[class_precision > -1]
        if class_precision.size:
            ap = np.mean(class_precision)
        else:
            ap = float("nan")
        per_class_AP[class_name] = float(ap * 100)

    num_cols = min(colums, len(per_class_AP) * len(headers))
    # name, value, name, value, ... laid out row by row over num_cols columns.
    flat_cells = list(itertools.chain.from_iterable(per_class_AP.items()))
    column_slices = [flat_cells[offset::num_cols] for offset in range(num_cols)]
    table_rows = itertools.zip_longest(*column_slices)
    repeated_headers = headers * (num_cols // len(headers))
    return tabulate(
        table_rows,
        tablefmt="pipe",
        floatfmt=".3f",
        headers=repeated_headers,
        numalign="left",
    )
|
|
|
|
|
def get_eval_loader(batch_size, test_size=(640, 640), data_dir='data/COCO', data_num_workers=0, testdev=False, legacy=False):
    """
    Build the sequential evaluation dataloader over the COCO dataset.

    Args:
        batch_size: number of images per batch.
        test_size: image size handed to the dataset as ``img_size``.
        data_dir: root directory of the COCO data.
        data_num_workers: number of dataloader worker processes.
        testdev: use the test2017 annotation file / image set instead of val2017.
        legacy: forwarded to ``ValTransform``.

    Returns:
        torch.utils.data.DataLoader: loader that iterates the dataset in order
            with pinned memory enabled.
    """
    from coco import COCODataset

    if testdev:
        annotation_file, split_name = 'instances_test2017.json', "test2017"
    else:
        annotation_file, split_name = 'instances_val2017.json', "val2017"

    dataset = COCODataset(
        data_dir=data_dir,
        json_file=annotation_file,
        name=split_name,
        img_size=test_size,
        preproc=ValTransform(legacy=legacy),
    )
    return torch.utils.data.DataLoader(
        dataset,
        num_workers=data_num_workers,
        pin_memory=True,
        sampler=torch.utils.data.SequentialSampler(dataset),
        batch_size=batch_size,
    )
|
|
|
|
|
def make_parser():
    """
    Build the command-line parser of the evaluation script.

    Options: ``-m/--model``, ``-b/--batch_size``, ``--input_shape``,
    ``--ipu`` and ``--provider_config``.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    cli = argparse.ArgumentParser(prog="onnxruntime inference sample")
    cli.add_argument("-m", "--model", type=str, default="yolox-s-int8.onnx",
                     help="Input your onnx model.")
    cli.add_argument("-b", "--batch_size", type=int, default=1,
                     help="Batch size for inference..")
    cli.add_argument("--input_shape", type=str, default="640,640",
                     help="Specify an input shape for inference.")
    cli.add_argument("--ipu", action="store_true",
                     help="Use IPU for inference.")
    cli.add_argument("--provider_config", type=str, default="vaip_config.json",
                     help="Path of the config file for setting provider_options.")
    return cli
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: evaluate an ONNX model on COCO val2017 and log the summary.
    args = make_parser().parse_args()

    # "H,W" command-line string -> tuple of ints.
    input_shape = tuple(map(int, args.input_shape.split(',')))

    if args.ipu:
        # Vitis AI execution provider, configured through the given config file.
        providers = ["VitisAIExecutionProvider"]
        provider_options = [{"config_file": args.provider_config}]
    else:
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        provider_options = None
    session = ort.InferenceSession(args.model, providers=providers, provider_options=provider_options)

    # The loader must preprocess images to the same size the evaluator uses
    # to map boxes back to the original images; otherwise a non-default
    # --input_shape would silently produce wrongly scaled boxes.
    val_loader = get_eval_loader(args.batch_size, test_size=input_shape)
    evaluator = COCOEvaluator(dataloader=val_loader, img_size=input_shape, confthre=0.01, nmsthre=0.65, num_classes=80, testdev=False)

    # evaluate() returns (stats[0], stats[1], info); only the text summary is logged.
    *_, summary = evaluator.evaluate(session)
    logger.info("\n" + summary)
|
|