import numpy as np
import cv2
import pandas as pd
import operator
import matplotlib.pyplot as plt
import os
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import Sequence
from config import yolo_config


def load_weights(model, weights_file_path):
    conv_layer_size = 110
    conv_output_idxs = [93, 101, 109]
    with open(weights_file_path, 'rb') as file:
        major, minor, revision, seen, _ = np.fromfile(file, dtype=np.int32, count=5)

        bn_idx = 0
        for conv_idx in range(conv_layer_size):
            conv_layer_name = f'conv2d_{conv_idx}' if conv_idx > 0 else 'conv2d'
            bn_layer_name = f'batch_normalization_{bn_idx}' if bn_idx > 0 else 'batch_normalization'

            conv_layer = model.get_layer(conv_layer_name)
            filters = conv_layer.filters
            kernel_size = conv_layer.kernel_size[0]
            input_dims = conv_layer.input_shape[-1]

            if conv_idx not in conv_output_idxs:
                # darknet bn layer weights: [beta, gamma, mean, variance]
                bn_weights = np.fromfile(file, dtype=np.float32, count=4 * filters)
                # tf bn layer weights: [gamma, beta, mean, variance]
                bn_weights = bn_weights.reshape((4, filters))[[1, 0, 2, 3]]
                bn_layer = model.get_layer(bn_layer_name)
                bn_idx += 1
            else:
                conv_bias = np.fromfile(file, dtype=np.float32, count=filters)

            # darknet conv shape: (out_dim, input_dims, height, width)
            # tf conv shape: (height, width, input_dims, out_dim)
            conv_shape = (filters, input_dims, kernel_size, kernel_size)
            conv_weights = np.fromfile(file, dtype=np.float32, count=np.prod(conv_shape))
            conv_weights = conv_weights.reshape(conv_shape).transpose([2, 3, 1, 0])

            if conv_idx not in conv_output_idxs:
                conv_layer.set_weights([conv_weights])
                bn_layer.set_weights(bn_weights)
            else:
                conv_layer.set_weights([conv_weights, conv_bias])

        # file.read() consumes the stream, so read the remainder once and reuse it
        remaining = file.read()
        if len(remaining) == 0:
            print('all weights read')
        else:
            print(f'failed to read all weights, # of unread bytes: {len(remaining)}')

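# Usage sketch (assumptions: `build_yolov4_model` is a hypothetical builder that
# returns the 110-conv-layer Keras model this loader expects, and the darknet
# weights path is illustrative):
# model = build_yolov4_model()
# load_weights(model, 'yolov4.weights')
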
def get_detection_data(img, model_outputs, class_names):
    """
    :param img: target raw image
    :param model_outputs: outputs from inference_model
    :param class_names: list of object class names
    :return: DataFrame of detections with columns
             ['x1', 'y1', 'x2', 'y2', 'class_name', 'score', 'w', 'h']
    """
    num_bboxes = model_outputs[-1][0]
    boxes, scores, classes = [output[0][:num_bboxes] for output in model_outputs[:-1]]

    h, w = img.shape[:2]
    df = pd.DataFrame(boxes, columns=['x1', 'y1', 'x2', 'y2'])
    df[['x1', 'x2']] = (df[['x1', 'x2']] * w).astype('int64')
    df[['y1', 'y2']] = (df[['y1', 'y2']] * h).astype('int64')
    df['class_name'] = np.array(class_names)[classes.astype('int64')]
    df['score'] = scores
    df['w'] = df['x2'] - df['x1']
    df['h'] = df['y2'] - df['y1']

    print(f'# of bboxes: {num_bboxes}')
    return df

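# Usage sketch (assumptions: `inference_model` is a hypothetical YOLOv4 inference
# model whose outputs are ordered [boxes, scores, classes, num_detections], which
# is what the unpacking above expects; preprocessing of the batch is elided):
# model_outputs = inference_model.predict(preprocessed_batch)
# detections = get_detection_data(raw_img, model_outputs, class_names)
# print(detections[['class_name', 'score', 'x1', 'y1', 'x2', 'y2']])
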
def read_annotation_lines(annotation_path, test_size=None, random_seed=5566):
    with open(annotation_path) as f:
        lines = f.readlines()
    if test_size:
        return train_test_split(lines, test_size=test_size, random_state=random_seed)
    else:
        return lines

def draw_bbox(img, detections, cmap, random_color=True, figsize=(10, 10), show_img=True, show_text=True):
    """
    Draw bounding boxes on the img.
    :param img: BGR img.
    :param detections: pandas DataFrame containing detections
    :param cmap: object colormap
    :param random_color: assign a random color to each object if True, else use cmap
    :param show_img: whether to plot the img with bboxes
    :param show_text: whether to draw the class name and score above each bbox
    :return: img with bboxes drawn
    """
    img = np.array(img)
    scale = max(img.shape[0:2]) / 416
    line_width = int(2 * scale)

    for _, row in detections.iterrows():
        x1, y1, x2, y2, cls, score, w, h = row.values
        color = list(np.random.random(size=3) * 255) if random_color else cmap[cls]
        cv2.rectangle(img, (x1, y1), (x2, y2), color, line_width)
        if show_text:
            text = f'{cls} {score:.2f}'
            font = cv2.FONT_HERSHEY_DUPLEX
            font_scale = max(0.3 * scale, 0.3)
            thickness = max(int(1 * scale), 1)
            (text_width, text_height) = cv2.getTextSize(text, font, fontScale=font_scale, thickness=thickness)[0]
            cv2.rectangle(img, (x1 - line_width // 2, y1 - text_height), (x1 + text_width, y1), color, cv2.FILLED)
            cv2.putText(img, text, (x1, y1), font, font_scale, (255, 255, 255), thickness, cv2.LINE_AA)
    if show_img:
        plt.figure(figsize=figsize)
        plt.imshow(img)
        plt.show()
    return img

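# Usage sketch (continues the hypothetical `detections` DataFrame above;
# `class_cmap` is an assumed dict mapping class names to colors, only used
# when random_color=False):
# drawn = draw_bbox(raw_img, detections, cmap=class_cmap, random_color=False)
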
class DataGenerator(Sequence):
    """
    Generates data for Keras
    ref: https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly
    """
    def __init__(self,
                 annotation_lines,
                 class_name_path,
                 folder_path,
                 max_boxes=100,
                 shuffle=True):
        self.annotation_lines = annotation_lines
        self.class_name_path = class_name_path
        with open(class_name_path) as f:
            self.num_classes = len([line.strip() for line in f.readlines()])
        self.num_gpu = yolo_config['num_gpu']
        self.batch_size = yolo_config['batch_size'] * self.num_gpu
        self.target_img_size = yolo_config['img_size']
        self.anchors = np.array(yolo_config['anchors']).reshape((9, 2))
        self.shuffle = shuffle
        self.indexes = np.arange(len(self.annotation_lines))
        self.folder_path = folder_path
        self.max_boxes = max_boxes
        self.on_epoch_end()
    def __len__(self):
        'number of batches per epoch'
        return int(np.ceil(len(self.annotation_lines) / self.batch_size))

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        idxs = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        # Find list of IDs
        lines = [self.annotation_lines[i] for i in idxs]
        # Generate data
        X, y_tensor, y_bbox = self.__data_generation(lines)
        return [X, *y_tensor, y_bbox], np.zeros(len(lines))

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        if self.shuffle:
            np.random.shuffle(self.indexes)
    def __data_generation(self, annotation_lines):
        """
        Generates data containing batch_size samples
        :param annotation_lines: list of annotation lines for this batch
        :return: image batch X, per-stage y_true tensors, and true boxes in xywh
        """
        X = np.empty((len(annotation_lines), *self.target_img_size), dtype=np.float32)
        y_bbox = np.empty((len(annotation_lines), self.max_boxes, 5), dtype=np.float32)  # x1y1x2y2

        for i, line in enumerate(annotation_lines):
            img_data, box_data = self.get_data(line)
            X[i] = img_data
            y_bbox[i] = box_data

        y_tensor, y_true_boxes_xywh = preprocess_true_boxes(y_bbox, self.target_img_size[:2], self.anchors, self.num_classes)
        return X, y_tensor, y_true_boxes_xywh
    def get_data(self, annotation_line):
        line = annotation_line.split()
        img_path = line[0]

        img = cv2.imread(os.path.join(self.folder_path, img_path))[:, :, ::-1]
        ih, iw = img.shape[:2]
        h, w, c = self.target_img_size
        boxes = np.array([np.array(list(map(float, box.split(',')))) for box in line[1:]], dtype=np.float32)  # x1y1x2y2
        scale_w, scale_h = w / iw, h / ih
        img = cv2.resize(img, (w, h))
        image_data = np.array(img) / 255.

        # correct box coordinates for the resized image
        box_data = np.zeros((self.max_boxes, 5))
        if len(boxes) > 0:
            np.random.shuffle(boxes)
            boxes = boxes[:self.max_boxes]
            boxes[:, [0, 2]] = boxes[:, [0, 2]] * scale_w  # + dx
            boxes[:, [1, 3]] = boxes[:, [1, 3]] * scale_h  # + dy
            box_data[:len(boxes)] = boxes

        return image_data, box_data

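# Usage sketch (assumptions: file and folder names are illustrative; each annotation
# line is parsed by get_data above as
# "<img_name> x1,y1,x2,y2,class_id x1,y1,x2,y2,class_id ..."):
# train_lines, val_lines = read_annotation_lines('anno.txt', test_size=0.1)
# train_gen = DataGenerator(train_lines, 'classes.txt', 'imgs/')
# batch_inputs, dummy_targets = train_gen[0]  # [X, *y_tensor, y_bbox], zeros
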
def preprocess_true_boxes(true_boxes, input_shape, anchors, num_classes):
    '''Preprocess true boxes to training input format

    Parameters
    ----------
    true_boxes: array, shape=(bs, max boxes per img, 5)
        Absolute x_min, y_min, x_max, y_max, class_id relative to input_shape.
    input_shape: array-like, hw, multiples of 32
    anchors: array, shape=(N, 2), (9, wh)
    num_classes: int

    Returns
    -------
    y_true: list of arrays, shaped like yolo_outputs; xywh are relative values
    '''
    num_stages = 3  # default setting for yolo; tiny yolo will be 2
    anchor_mask = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    bbox_per_grid = 3
    true_boxes = np.array(true_boxes, dtype='float32')
    true_boxes_abs = np.array(true_boxes, dtype='float32')
    input_shape = np.array(input_shape, dtype='int32')
    true_boxes_xy = (true_boxes_abs[..., 0:2] + true_boxes_abs[..., 2:4]) // 2  # (bs, 100, 2)
    true_boxes_wh = true_boxes_abs[..., 2:4] - true_boxes_abs[..., 0:2]  # (bs, 100, 2)

    # Normalize x, y, w, h relative to the img size -> (0~1)
    true_boxes[..., 0:2] = true_boxes_xy / input_shape[::-1]  # xy
    true_boxes[..., 2:4] = true_boxes_wh / input_shape[::-1]  # wh

    bs = true_boxes.shape[0]
    grid_sizes = [input_shape // {0: 8, 1: 16, 2: 32}[stage] for stage in range(num_stages)]
    y_true = [np.zeros((bs,
                        grid_sizes[s][0],
                        grid_sizes[s][1],
                        bbox_per_grid,
                        5 + num_classes), dtype='float32')
              for s in range(num_stages)]
    # [(?, 52, 52, 3, 5+num_classes), (?, 26, 26, 3, 5+num_classes), (?, 13, 13, 3, 5+num_classes)]
    y_true_boxes_xywh = np.concatenate((true_boxes_xy, true_boxes_wh), axis=-1)
    # Expand dim to apply broadcasting.
    anchors = np.expand_dims(anchors, 0)  # (1, 9, 2)
    anchor_maxes = anchors / 2.  # (1, 9, 2)
    anchor_mins = -anchor_maxes  # (1, 9, 2)
    valid_mask = true_boxes_wh[..., 0] > 0  # (bs, 100)

    for batch_idx in range(bs):
        # Discard zero rows.
        wh = true_boxes_wh[batch_idx, valid_mask[batch_idx]]  # (# of bbox, 2)
        num_boxes = len(wh)
        if num_boxes == 0:
            continue
        wh = np.expand_dims(wh, -2)  # (# of bbox, 1, 2)
        box_maxes = wh / 2.  # (# of bbox, 1, 2)
        box_mins = -box_maxes  # (# of bbox, 1, 2)

        # Compute IoU between each anchor and the true boxes for responsibility assignment
        intersect_mins = np.maximum(box_mins, anchor_mins)  # (# of bbox, 9, 2)
        intersect_maxes = np.minimum(box_maxes, anchor_maxes)
        intersect_wh = np.maximum(intersect_maxes - intersect_mins, 0.)
        intersect_area = np.prod(intersect_wh, axis=-1)  # (# of bbox, 9)
        box_area = wh[..., 0] * wh[..., 1]  # (# of bbox, 1)
        anchor_area = anchors[..., 0] * anchors[..., 1]  # (1, 9)
        iou = intersect_area / (box_area + anchor_area - intersect_area)  # (# of bbox, 9)
        # Find the best anchor for each true box
        best_anchors = np.argmax(iou, axis=-1)  # (# of bbox,)

        for box_idx in range(num_boxes):
            best_anchor = best_anchors[box_idx]
            for stage in range(num_stages):
                if best_anchor in anchor_mask[stage]:
                    x_offset = true_boxes[batch_idx, box_idx, 0] * grid_sizes[stage][1]
                    y_offset = true_boxes[batch_idx, box_idx, 1] * grid_sizes[stage][0]
                    # Grid index
                    grid_col = np.floor(x_offset).astype('int32')
                    grid_row = np.floor(y_offset).astype('int32')
                    anchor_idx = anchor_mask[stage].index(best_anchor)
                    class_idx = true_boxes[batch_idx, box_idx, 4].astype('int32')
                    # y_true[stage][batch_idx, grid_row, grid_col, anchor_idx, 0] = x_offset - grid_col  # x
                    # y_true[stage][batch_idx, grid_row, grid_col, anchor_idx, 1] = y_offset - grid_row  # y
                    # y_true[stage][batch_idx, grid_row, grid_col, anchor_idx, :4] = true_boxes_abs[batch_idx, box_idx, :4]  # abs xywh
                    y_true[stage][batch_idx, grid_row, grid_col, anchor_idx, :2] = true_boxes_xy[batch_idx, box_idx, :]  # abs xy
                    y_true[stage][batch_idx, grid_row, grid_col, anchor_idx, 2:4] = true_boxes_wh[batch_idx, box_idx, :]  # abs wh
                    y_true[stage][batch_idx, grid_row, grid_col, anchor_idx, 4] = 1  # confidence
                    y_true[stage][batch_idx, grid_row, grid_col, anchor_idx, 5 + class_idx] = 1  # one-hot class encoding
                    # label smoothing (disabled):
                    # onehot = np.zeros(num_classes, dtype=np.float32)
                    # onehot[class_idx] = 1.0
                    # uniform_distribution = np.full(num_classes, 1.0 / num_classes)
                    # delta = 0.01
                    # smooth_onehot = onehot * (1 - delta) + delta * uniform_distribution
                    # y_true[stage][batch_idx, grid_row, grid_col, anchor_idx, 5:] = smooth_onehot

    return y_true, y_true_boxes_xywh

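# Shape sketch (assumptions: a 416x416 input, 80 classes, the 9 anchors from
# yolo_config, and a batch of 2 images with up to 100 boxes each):
# y_true, y_true_xywh = preprocess_true_boxes(
#     np.zeros((2, 100, 5), dtype=np.float32), (416, 416),
#     np.array(yolo_config['anchors']).reshape((9, 2)), num_classes=80)
# # y_true shapes: (2, 52, 52, 3, 85), (2, 26, 26, 3, 85), (2, 13, 13, 3, 85)
# # y_true_xywh shape: (2, 100, 4)
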
| """ | |
| Calculate the AP given the recall and precision array | |
| 1st) We compute a version of the measured precision/recall curve with | |
| precision monotonically decreasing | |
| 2nd) We compute the AP as the area under this curve by numerical integration. | |
| """ | |
def voc_ap(rec, prec):
    """
    --- Official matlab code VOC2012 ---
    mrec=[0 ; rec ; 1];
    mpre=[0 ; prec ; 0];
    for i=numel(mpre)-1:-1:1
        mpre(i)=max(mpre(i),mpre(i+1));
    end
    i=find(mrec(2:end)~=mrec(1:end-1))+1;
    ap=sum((mrec(i)-mrec(i-1)).*mpre(i));
    """
    rec.insert(0, 0.0)  # insert 0.0 at beginning of list
    rec.append(1.0)  # insert 1.0 at end of list
    mrec = rec[:]
    prec.insert(0, 0.0)  # insert 0.0 at beginning of list
    prec.append(0.0)  # insert 0.0 at end of list
    mpre = prec[:]
    """
    This part makes the precision monotonically decreasing
        (goes from the end to the beginning)
        matlab: for i=numel(mpre)-1:-1:1
                    mpre(i)=max(mpre(i),mpre(i+1));
    """
    # matlab indexes start at 1 but python's at 0, so I have to do:
    #     range(start=(len(mpre) - 2), end=0, step=-1)
    # also the python function range excludes the end, resulting in:
    #     range(start=(len(mpre) - 2), end=-1, step=-1)
    for i in range(len(mpre) - 2, -1, -1):
        mpre[i] = max(mpre[i], mpre[i + 1])
    """
    This part creates a list of indexes where the recall changes
        matlab: i=find(mrec(2:end)~=mrec(1:end-1))+1;
    """
    i_list = []
    for i in range(1, len(mrec)):
        if mrec[i] != mrec[i - 1]:
            i_list.append(i)  # in matlab this would be i + 1
    """
    The Average Precision (AP) is the area under the curve
        (numerical integration)
        matlab: ap=sum((mrec(i)-mrec(i-1)).*mpre(i));
    """
    ap = 0.0
    for i in i_list:
        ap += ((mrec[i] - mrec[i - 1]) * mpre[i])
    return ap, mrec, mpre

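# Worked example: two detections with recall [0.5, 1.0] and precision [1.0, 0.5]
# give AP = (0.5 - 0.0) * 1.0 + (1.0 - 0.5) * 0.5 = 0.75:
# ap, mrec, mpre = voc_ap([0.5, 1.0], [1.0, 0.5])
# assert abs(ap - 0.75) < 1e-9
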
| """ | |
| Draw plot using Matplotlib | |
| """ | |
def draw_plot_func(dictionary, n_classes, window_title, plot_title, x_label, output_path, to_show, plot_color, true_p_bar):
    # sort the dictionary by increasing value, into a list of tuples
    sorted_dic_by_value = sorted(dictionary.items(), key=operator.itemgetter(1))
    print(sorted_dic_by_value)
    # unpacking the list of tuples into two lists
    sorted_keys, sorted_values = zip(*sorted_dic_by_value)
    #
    if true_p_bar != "":
        """
        Special case to draw in:
            - green -> TP: True Positives (object detected and matches ground-truth)
            - red -> FP: False Positives (object detected but does not match ground-truth)
            - pink -> FN: False Negatives (object not detected but present in the ground-truth)
        """
        fp_sorted = []
        tp_sorted = []
        for key in sorted_keys:
            fp_sorted.append(dictionary[key] - true_p_bar[key])
            tp_sorted.append(true_p_bar[key])
        plt.barh(range(n_classes), fp_sorted, align='center', color='crimson', label='False Positive')
        plt.barh(range(n_classes), tp_sorted, align='center', color='forestgreen', label='True Positive', left=fp_sorted)
        # add legend
        plt.legend(loc='lower right')
        """
        Write number on side of bar
        """
        fig = plt.gcf()  # gcf - get current figure
        axes = plt.gca()
        r = fig.canvas.get_renderer()
        for i, val in enumerate(sorted_values):
            fp_val = fp_sorted[i]
            tp_val = tp_sorted[i]
            fp_str_val = " " + str(fp_val)
            tp_str_val = fp_str_val + " " + str(tp_val)
            # trick to paint multicolor with offset:
            # first paint everything and then repaint the first number
            t = plt.text(val, i, tp_str_val, color='forestgreen', va='center', fontweight='bold')
            plt.text(val, i, fp_str_val, color='crimson', va='center', fontweight='bold')
            if i == (len(sorted_values) - 1):  # largest bar
                adjust_axes(r, t, fig, axes)
    else:
        plt.barh(range(n_classes), sorted_values, color=plot_color)
        """
        Write number on side of bar
        """
        fig = plt.gcf()  # gcf - get current figure
        axes = plt.gca()
        r = fig.canvas.get_renderer()
        for i, val in enumerate(sorted_values):
            str_val = " " + str(val)  # add a space before
            if val < 1.0:
                str_val = " {0:.2f}".format(val)
            t = plt.text(val, i, str_val, color=plot_color, va='center', fontweight='bold')
            # re-set axes to show number inside the figure
            if i == (len(sorted_values) - 1):  # largest bar
                adjust_axes(r, t, fig, axes)
    # set window title (canvas.set_window_title was removed in newer Matplotlib)
    fig.canvas.manager.set_window_title(window_title)
    # write classes on the y axis
    tick_font_size = 12
    plt.yticks(range(n_classes), sorted_keys, fontsize=tick_font_size)
    """
    Re-scale height accordingly
    """
    init_height = fig.get_figheight()
    # compute the matrix height in points and inches
    dpi = fig.dpi
    height_pt = n_classes * (tick_font_size * 1.4)  # 1.4 (some spacing)
    height_in = height_pt / dpi
    # compute the required figure height
    top_margin = 0.15  # in percentage of the figure height
    bottom_margin = 0.05  # in percentage of the figure height
    figure_height = height_in / (1 - top_margin - bottom_margin)
    # set new height
    if figure_height > init_height:
        fig.set_figheight(figure_height)

    # set plot title
    plt.title(plot_title, fontsize=14)
    # set axis titles
    # plt.xlabel('classes')
    plt.xlabel(x_label, fontsize='large')
    # adjust size of window
    fig.tight_layout()
    # save the plot
    fig.savefig(output_path)
    # show image
    if to_show:
        plt.show()
    # close the plot
    plt.close()

| """ | |
| Plot - adjust axes | |
| """ | |
| def adjust_axes(r, t, fig, axes): | |
| # get text width for re-scaling | |
| bb = t.get_window_extent(renderer=r) | |
| text_width_inches = bb.width / fig.dpi | |
| # get axis width in inches | |
| current_fig_width = fig.get_figwidth() | |
| new_fig_width = current_fig_width + text_width_inches | |
| propotion = new_fig_width / current_fig_width | |
| # get axis limit | |
| x_lim = axes.get_xlim() | |
| axes.set_xlim([x_lim[0], x_lim[1]*propotion]) | |
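# Usage sketch (assumptions: the per-class counts and file names are illustrative;
# pass true_p_bar="" for a single-color bar chart, or a dict of per-class true
# positives for the stacked TP/FP variant handled above):
# gt_counter_per_class = {'car': 120, 'person': 80, 'dog': 15}
# draw_plot_func(gt_counter_per_class, n_classes=3,
#                window_title='Ground-Truth Info',
#                plot_title='Ground-Truth Objects',
#                x_label='Number of objects per class',
#                output_path='ground_truth_info.png',
#                to_show=False, plot_color='forestgreen', true_p_bar="")
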
def read_txt_to_list(path):
    # open txt file lines to a list
    with open(path) as f:
        content = f.readlines()
    # remove whitespace characters like `\n` at the end of each line
    content = [x.strip() for x in content]
    return content