"""k-nearest-neighbour classification over precomputed embedding vectors."""
import numpy as np
from collections import Counter


def minkowski_distance_p(x, y, p=2):
    """Return the p-th power of the Minkowski distance along the last axis.

    For ``p == 1`` and ``p == inf`` the true distance is returned (no power
    is applied); otherwise the result is the distance raised to the p-th
    power, which is cheaper when only the relative ordering matters.
    """
    x = np.asarray(x)
    y = np.asarray(y)

    # Find the smallest common datatype with float64 (return type of this
    # function) - addresses scipy#10262.
    # Don't just cast to float64 for the complex input case.
    common_datatype = np.promote_types(np.promote_types(x.dtype, y.dtype),
                                       'float64')

    # Make sure x and y are NumPy arrays of correct datatype.
    x = x.astype(common_datatype)
    y = y.astype(common_datatype)

    if p == np.inf:
        return np.amax(np.abs(y - x), axis=-1)
    elif p == 1:
        return np.sum(np.abs(y - x), axis=-1)
    else:
        return np.sum(np.abs(y - x) ** p, axis=-1)


def minkowski_distance(x, y, p=2):
    """Return the Minkowski distance between x and y along the last axis."""
    x = np.asarray(x)
    y = np.asarray(y)
    if p == np.inf or p == 1:
        return minkowski_distance_p(x, y, p)
    else:
        return minkowski_distance_p(x, y, p) ** (1. / p)


class KNearestNeighbours:
    """Majority-vote k-NN classifier over reference embedding vectors."""

    def __init__(self, k=5):
        self.k = k
        self.embeddings = None       # full reference DataFrame (has 'label_name')
        self.embeddings_list = None  # parsed embedding vectors, list of list[float]

    def set_embeddings(self, embeddings):
        """Parse and store the reference embeddings.

        `embeddings` is a DataFrame expected to contain the columns
        'labels', 'label_name', 'embeddings' and 'embeddings2', where
        'embeddings2' holds each vector serialized as "[0.1, 0.2, ...]".
        """
        self.embeddings = embeddings
        df = embeddings.drop(columns=['labels', 'label_name', 'embeddings'])
        # Convert each embedding from its string form to a list of floats.
        df["embeddings"] = df["embeddings2"].apply(
            lambda s: [float(v) for v in s[1:-1].split(", ")])
        # The serialized column is no longer needed.
        df = df.drop(columns=['embeddings2'])
        self.embeddings_list = df["embeddings"].tolist()

    def distance_matrix(self, keypoints, p=2, threshold=1000000):
        """Return the (m, n) Minkowski distance matrix between `keypoints`
        and the stored reference embeddings.

        A single 1-D query vector is promoted to shape (1, k) so callers
        may pass one embedding directly (the original `m, k = x.shape`
        raised ValueError in that case).  Below `threshold` elements the
        computation is fully vectorised via broadcasting; above it, the
        shorter axis is looped over to bound peak memory.
        """
        x = np.atleast_2d(np.asarray(keypoints))
        m, k = x.shape
        y = np.asarray(self.embeddings_list)
        n, kk = y.shape

        if k != kk:
            raise ValueError(f"x contains {k}-dimensional vectors but y contains "
                             f"{kk}-dimensional vectors")

        if m * n * k <= threshold:
            # Broadcast to (m, n, k) and reduce over the last axis.
            return minkowski_distance(x[:, np.newaxis, :], y[np.newaxis, :, :], p)
        else:
            result = np.empty((m, n), dtype=float)
            # Loop over whichever side is shorter to keep the per-step
            # temporaries small.
            if m < n:
                for i in range(m):
                    result[i, :] = minkowski_distance(x[i], y, p)
            else:
                for j in range(n):
                    result[:, j] = minkowski_distance(x, y[j], p)
            return result

    def predict(self, key_points_embeddings):
        """Classify one embedding vector.

        Returns ``(label, score)`` where `label` is the most common
        'label_name' among the `self.k` nearest references and `score` is
        the mean distance to the neighbours carrying that label (lower is
        a closer match).
        """
        dist_matrix = self.distance_matrix(key_points_embeddings, p=2,
                                           threshold=1000000)

        # Indices of the k closest reference embeddings for the query.
        indices = np.argsort(dist_matrix)[0][:self.k]
        labels = self.embeddings["label_name"].iloc[indices].tolist()
        # Majority vote among the k neighbours.
        best_label = Counter(labels).most_common()[0][0]

        # Average distance over only the neighbours with the winning label.
        indices = [i for i in indices
                   if self.embeddings["label_name"].iloc[i] == best_label]
        score = np.mean(dist_matrix[0][indices])
        return best_label, score
"""Frame-to-label prediction pipeline.

Pipeline: BGR frame -> MediaPipe Holistic keypoints -> normalization ->
SPOTER embedding model -> nearest-neighbour label prediction.
"""
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import torch

from predictions.k_nearest import KNearestNeighbours

# Pick the computation device once at import time; the model and all input
# tensors are moved to it below.
device = torch.device("cpu")
if torch.cuda.is_available():
    device = torch.device("cuda")
from models import SPOTER_EMBEDDINGS

# MediaPipe pose landmark indices kept for the model input, in the exact
# order the model expects.  Index 33 is the synthetic "neck" landmark that
# calculate_neck() appends (MediaPipe pose provides indices 0-32).
BODY_IDENTIFIERS = [
    0,
    33,
    5,
    2,
    8,
    7,
    12,
    11,
    14,
    13,
    16,
    15,
]

# MediaPipe hand landmark indices, reordered to the layout the model expects.
HAND_IDENTIFIERS = [
    0,
    8,
    7,
    6,
    5,
    12,
    11,
    10,
    9,
    16,
    15,
    14,
    13,
    20,
    19,
    18,
    17,
    4,
    3,
    2,
    1,
]

# Pretrained SPOTER embedding checkpoint, relative to the working directory.
CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"


class Predictor:
    """Extracts normalized keypoints from frames and classifies sequences.

    Owns a MediaPipe Holistic instance, the SPOTER embedding model and a
    nearest-neighbour predictor fed with reference embeddings from a CSV.
    """

    def __init__(self, embeddings_path, predictor_type):
        """Load MediaPipe, the reference embeddings CSV and the model.

        embeddings_path: path to a CSV with the columns expected by
            KNearestNeighbours.set_embeddings.
        predictor_type: object exposing set_embeddings()/predict();
            when None a KNearestNeighbours(1) is used.
        """

        # Initialize MediaPipe Holistic (pose + hands) model.
        self.holistic = mp.solutions.holistic.Holistic(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=2
        )

        self.mp_holistic = mp.solutions.holistic
        self.mp_drawing = mp.solutions.drawing_utils
        # buffer = []
        # Landmark indices used by normalize_pose; 33 is the appended neck.
        self.left_shoulder_index = 11
        self.right_shoulder_index = 12
        self.neck_index = 33
        self.nose_index = 0
        self.left_eye_index = 2

        # load training embedding csv
        self.embeddings = pd.read_csv(embeddings_path)

        checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)

        self.model = SPOTER_EMBEDDINGS(
            features=checkpoint["config_args"].vector_length,
            hidden_dim=checkpoint["config_args"].hidden_dim,
            norm_emb=checkpoint["config_args"].normalize_embeddings,
        ).to(device)

        self.model.load_state_dict(checkpoint["state_dict"])

        if predictor_type is None:
            self.predictor = KNearestNeighbours(1)
        else:
            self.predictor = predictor_type
        self.predictor.set_embeddings(self.embeddings)

    def extract_keypoints(self, image_orig):
        """Return the normalized (54, 2) keypoint array for one BGR frame.

        54 = 12 body points (BODY_IDENTIFIERS) + 21 per hand, each (x, y),
        shifted into the interval around 0 (see the -0.5 at the end).
        Returns None when no pose is detected or when neither hand is
        visible.
        """
        image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
        results = self.holistic.process(image)

        # NOTE(review): this inner helper shadows the enclosing method name
        # `extract_keypoints` — consider renaming (behavior is unaffected).
        def extract_keypoints(lmks):
            # Landmark list -> (n, 2) array of (x, y); None when absent.
            if lmks:
                a = np.array([[float(lmk.x), float(lmk.y)] for lmk in lmks.landmark])
                return a
            return None

        def calculate_neck(keypoints):
            # Append the shoulder midpoint as a synthetic "neck" landmark
            # (becomes index 33).  NOTE(review): indices 11/12 are hardcoded
            # here instead of using self.left/right_shoulder_index.
            if keypoints is not None:
                left_shoulder = keypoints[11]
                right_shoulder = keypoints[12]

                neck = [(float(left_shoulder[0]) + float(right_shoulder[0])) / 2,
                        (float(left_shoulder[1]) + float(right_shoulder[1])) / 2]
                # add neck to keypoints
                keypoints = np.append(keypoints, [neck], axis=0)
                return keypoints
            return None

        pose = extract_keypoints(results.pose_landmarks)
        pose = calculate_neck(pose)
        if pose is None:
            return None
        pose_norm = self.normalize_pose(pose)
        # filter out keypoints that are not in BODY_IDENTIFIERS and make sure they are in the correct order
        pose_norm = pose_norm[BODY_IDENTIFIERS]

        left_hand = extract_keypoints(results.left_hand_landmarks)
        right_hand = extract_keypoints(results.right_hand_landmarks)

        if left_hand is None and right_hand is None:
            return None

        # normalize hands; a missing hand is represented by 21 zero points
        if left_hand is not None:
            left_hand = self.normalize_hand(left_hand)
        else:
            left_hand = np.zeros((21, 2))
        if right_hand is not None:
            right_hand = self.normalize_hand(right_hand)
        else:
            right_hand = np.zeros((21, 2))

        left_hand = left_hand[HAND_IDENTIFIERS]

        right_hand = right_hand[HAND_IDENTIFIERS]

        # combine pose and hands into one (54, 2) array
        pose_norm = np.append(pose_norm, left_hand, axis=0)
        pose_norm = np.append(pose_norm, right_hand, axis=0)

        # move interval
        pose_norm -= 0.5

        return pose_norm

    # if we have the keypoints, normalize single body, keypoints is numpy array of (identifiers, 2)
    def normalize_pose(self, keypoints):
        """Scale/translate pose keypoints into a head-metric bounding box.

        The box is derived from the shoulder distance (or the neck-nose
        distance as a fallback).  Mutates and returns `keypoints`; returns
        it unchanged when the required reference landmarks are missing or
        degenerate.  NOTE(review): the y normalization divides by
        (starting - ending), i.e. the y axis is flipped relative to the x
        axis — presumably intentional for image coordinates; confirm
        against the training-time preprocessing.
        """
        left_shoulder = keypoints[self.left_shoulder_index]
        right_shoulder = keypoints[self.right_shoulder_index]

        neck = keypoints[self.neck_index]
        nose = keypoints[self.nose_index]

        # Prevent from even starting the analysis if some necessary elements are not present
        if (left_shoulder[0] == 0 or right_shoulder[0] == 0
                or (left_shoulder[0] == right_shoulder[0]
                    and left_shoulder[1] == right_shoulder[1])) and (
                neck[0] == 0 or nose[0] == 0 or (neck[0] == nose[0] and neck[1] == nose[1])):
            return keypoints

        # Prefer shoulder width as the "head metric"; fall back to the
        # neck-nose distance when the shoulders are missing/coincident.
        if left_shoulder[0] != 0 and right_shoulder[0] != 0 and (
                left_shoulder[0] != right_shoulder[0] or left_shoulder[1] != right_shoulder[1]):
            shoulder_distance = ((((left_shoulder[0] - right_shoulder[0]) ** 2) + (
                (left_shoulder[1] - right_shoulder[1]) ** 2)) ** 0.5)
            head_metric = shoulder_distance
        else:
            neck_nose_distance = ((((neck[0] - nose[0]) ** 2) + ((neck[1] - nose[1]) ** 2)) ** 0.5)
            head_metric = neck_nose_distance

        # Set the starting and ending point of the normalization bounding box
        # (6 head-metrics wide, 6 tall, centered on the neck x).
        starting_point = [keypoints[self.neck_index][0] - 3 * head_metric,
                          keypoints[self.left_eye_index][1] + head_metric]
        ending_point = [keypoints[self.neck_index][0] + 3 * head_metric, starting_point[1] - 6 * head_metric]

        # Clamp the box to non-negative coordinates.
        if starting_point[0] < 0:
            starting_point[0] = 0
        if starting_point[1] < 0:
            starting_point[1] = 0
        if ending_point[0] < 0:
            ending_point[0] = 0
        if ending_point[1] < 0:
            ending_point[1] = 0

        # Normalize the keypoints (in place).
        for i in range(len(keypoints)):
            keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
            keypoints[i][1] = (keypoints[i][1] - ending_point[1]) / (starting_point[1] - ending_point[1])

        return keypoints

    def normalize_hand(self, keypoints):
        """Normalize hand keypoints into a padded square-ish bounding box.

        The box tightly encloses the nonzero points, expanded by a 10%
        margin on the larger dimension and padded on the smaller one so
        both spans match.  Mutates and returns `keypoints`; returns it
        unchanged when all points are zero or the box is degenerate.
        """
        # Zero coordinates are treated as "missing" and excluded from the box.
        x_values = [keypoints[i][0] for i in range(len(keypoints)) if keypoints[i][0] != 0]
        y_values = [keypoints[i][1] for i in range(len(keypoints)) if keypoints[i][1] != 0]

        if not x_values or not y_values:
            return keypoints

        width, height = max(x_values) - min(x_values), max(y_values) - min(y_values)
        if width > height:
            delta_x = 0.1 * width
            delta_y = delta_x + ((width - height) / 2)
        else:
            delta_y = 0.1 * height
            delta_x = delta_y + ((height - width) / 2)

        starting_point = (min(x_values) - delta_x, min(y_values) - delta_y)
        ending_point = (max(x_values) + delta_x, max(y_values) + delta_y)

        # Guard against a zero-area box (division by zero below).
        if ending_point[0] - starting_point[0] == 0 or ending_point[1] - starting_point[1] == 0:
            return keypoints

        # normalize keypoints (in place; y is NOT flipped here, unlike
        # normalize_pose — NOTE(review): confirm this asymmetry is intended)
        for i in range(len(keypoints)):
            keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
            keypoints[i][1] = (keypoints[i][1] - starting_point[1]) / (ending_point[1] - starting_point[1])

        return keypoints

    def make_prediction(self, keypoints):
        """Embed a keypoint sequence and classify it.

        keypoints: sequence of per-frame arrays as produced by
            extract_keypoints (stacked into a batch of 1).
        Returns the (label, score) pair from the configured predictor.
        """
        # run model on frame
        self.model.eval()
        with torch.no_grad():
            keypoints = torch.from_numpy(np.array([keypoints])).float().to(device)
            # tolist()[0] strips the batch dimension from the model output.
            new_embeddings = self.model(keypoints).cpu().numpy().tolist()[0]

        return self.predictor.predict(new_embeddings)
"""Live webcam demo: sliding-window sign prediction drawn onto the feed."""
import cv2

from predictions.k_nearest import KNearestNeighbours
from predictions.predictor import Predictor

if __name__ == '__main__':
    # Sliding window of the most recent per-frame keypoint arrays.
    buffer = []
    WINDOW = 15  # frames fed to the model per prediction

    # Open the default webcam stream.
    cap = cv2.VideoCapture(0)

    k = 3
    predictor_type = KNearestNeighbours(k)

    # embeddings_path = 'embeddings/basic-signs/embeddings.csv'
    embeddings_path = 'embeddings/fingerspelling/embeddings.csv'

    predictor = Predictor(embeddings_path, predictor_type)

    try:
        while cap.isOpened():
            # ESC exits the loop.
            if cv2.waitKey(5) & 0xFF == 27:
                break

            ret, frame = cap.read()
            if not ret or frame is None:
                # Failed/empty camera read: skip this iteration instead of
                # crashing inside extract_keypoints (cv2.cvtColor on None).
                continue

            pose = predictor.extract_keypoints(frame)

            if pose is None:
                # Nothing detected; still show the raw frame.
                cv2.imshow('MediaPipe Hands', frame)
                continue

            buffer.append(pose)
            if len(buffer) > WINDOW:
                buffer.pop(0)

            # Only predict once a full window of frames is available.
            if len(buffer) == WINDOW:
                label, score = predictor.make_prediction(buffer)

                # Draw the predicted label and its mean-distance score.
                cv2.putText(frame, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
                cv2.putText(frame, str(score), (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)

            # Show the frame
            cv2.imshow('MediaPipe Hands', frame)
    finally:
        # Release the camera and close windows even on error or Ctrl+C.
        cap.release()
        cv2.destroyAllWindows()