Split up k_nearest neighbor from webcam file
This commit is contained in:
90
predictions/k_nearest.py
Normal file
90
predictions/k_nearest.py
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
import numpy as np
|
||||||
|
from collections import Counter
|
||||||
|
|
||||||
|
|
||||||
|
def minkowski_distance_p(x, y, p=2):
    """Return the p-th power of the L**p distance between two arrays.

    The element-wise difference ``y - x`` is reduced over the last axis, so
    the inputs may have any shapes that broadcast against each other.  For
    ``p == 1`` and ``p == np.inf`` the value returned is the distance
    itself; for any other ``p`` the final ``1 / p`` root is left out.

    Parameters
    ----------
    x, y : array_like
        Input arrays.
    p : float, optional
        Which Minkowski p-norm to use (default 2).

    Returns
    -------
    numpy scalar or ndarray
        The reduction of ``|y - x|`` over the last axis.
    """
    first = np.asarray(x)
    second = np.asarray(y)

    # Promote to the smallest dtype shared by both inputs and float64
    # instead of casting straight to float64, so complex input survives.
    result_dtype = np.promote_types(
        np.promote_types(first.dtype, second.dtype), 'float64')
    gap = np.abs(second.astype(result_dtype) - first.astype(result_dtype))

    if p == np.inf:
        return np.amax(gap, axis=-1)
    if p == 1:
        return np.sum(gap, axis=-1)
    return np.sum(gap ** p, axis=-1)
|
||||||
|
|
||||||
|
|
||||||
|
def minkowski_distance(x, y, p=2):
    """Return the L**p (Minkowski) distance between two arrays.

    Parameters
    ----------
    x, y : array_like
        Input arrays; the reduction runs over their last axis.
    p : float, optional
        Which Minkowski p-norm to use (default 2).

    Returns
    -------
    numpy scalar or ndarray
        The distance along the last axis.
    """
    powered = minkowski_distance_p(np.asarray(x), np.asarray(y), p)
    # For p == 1 and p == inf the helper already returns the distance
    # itself, so no root is taken.
    if p == 1 or p == np.inf:
        return powered
    return powered ** (1. / p)
|
||||||
|
|
||||||
|
|
||||||
|
class KNearestNeighbours:
    """Nearest-neighbour label lookup over a table of reference embeddings.

    ``set_embeddings`` loads the reference table; ``predict`` then returns
    the majority label among the ``k`` closest reference rows together with
    the mean distance to the rows that carry that label.
    """

    def __init__(self, k=5):
        """Create a classifier that votes over the ``k`` nearest rows."""
        self.k = k
        # Reference table exactly as handed to set_embeddings(); predict()
        # reads its "label_name" column.
        self.embeddings = None
        # One list of floats per reference row, parsed from "embeddings2".
        self.embeddings_list = None

    @staticmethod
    def _parse_vector(text):
        """Parse a bracketed, comma-separated string into a list of floats."""
        # float() ignores surrounding whitespace, so both "a, b" and "a,b"
        # separators are accepted.
        return [float(item) for item in text[1:-1].split(",")]

    def set_embeddings(self, embeddings):
        """Store the reference table and parse its embedding vectors.

        Parameters
        ----------
        embeddings : pandas.DataFrame
            Must provide an ``embeddings2`` column holding each vector as a
            string such as ``"[0.1, 0.2]"``.  ``predict`` additionally
            reads the ``label_name`` column.  The frame is not modified.

        Raises
        ------
        KeyError
            If the ``embeddings2`` column is missing.
        ValueError
            If an entry cannot be parsed as floats.
        """
        self.embeddings = embeddings
        self.embeddings_list = [
            self._parse_vector(text) for text in embeddings["embeddings2"]
        ]

    def distance_matrix(self, keypoints, p=2, threshold=1000000):
        """Compute all distances between query vectors and reference rows.

        Parameters
        ----------
        keypoints : array_like
            Either ``m`` query vectors of length ``k`` (2-D) or a single
            vector of length ``k`` (1-D, treated as one row).
        p : float, optional
            Which Minkowski p-norm to use (default 2).
        threshold : int, optional
            When ``m * n * k`` exceeds this value the result is filled one
            row or column at a time instead of in a single broadcast, which
            avoids building an ``(m, n, k)`` temporary.

        Returns
        -------
        ndarray of shape (m, n)
            Distance from every query vector to every reference row.

        Raises
        ------
        ValueError
            If no reference embeddings were loaded, or if the query and
            reference vectors differ in length.
        """
        if self.embeddings_list is None:
            raise ValueError(
                "no reference embeddings loaded; call set_embeddings() first")

        x = np.atleast_2d(np.array(keypoints))
        m, k = x.shape
        y = np.asarray(self.embeddings_list)
        n, kk = y.shape

        if k != kk:
            raise ValueError(f"x contains {k}-dimensional vectors but y contains "
                             f"{kk}-dimensional vectors")

        if m * n * k <= threshold:
            return minkowski_distance(x[:, np.newaxis, :], y[np.newaxis, :, :], p)

        result = np.empty((m, n), dtype=float)  # FIXME: figure out the best dtype
        # Loop over whichever axis is shorter so fewer Python-level
        # iterations are needed.
        if m < n:
            for i in range(m):
                result[i, :] = minkowski_distance(x[i], y, p)
        else:
            for j in range(n):
                result[:, j] = minkowski_distance(x, y[j], p)
        return result

    def predict(self, key_points_embeddings):
        """Classify an embedding by majority vote of its nearest neighbours.

        Only the first query row is classified.

        Parameters
        ----------
        key_points_embeddings : array_like
            Query embedding(s) as accepted by ``distance_matrix``.

        Returns
        -------
        tuple
            ``(label, score)``: the most common ``label_name`` among the
            ``self.k`` closest reference rows (ties go to the label met
            first, i.e. the closer one) and the mean Euclidean distance to
            those of the ``k`` rows that carry that label.
        """
        dist_matrix = self.distance_matrix(
            key_points_embeddings, p=2, threshold=1000000)
        distances = dist_matrix[0]

        # Positions of the k closest reference rows, nearest first.
        nearest = np.argsort(distances)[:self.k]
        labels = self.embeddings["label_name"].iloc[nearest].tolist()
        winner = Counter(labels).most_common(1)[0][0]

        # Score only against the neighbours that voted for the winner.
        winning_rows = [
            row for row, label in zip(nearest, labels) if label == winner
        ]
        score = np.mean(distances[winning_rows])
        return winner, score
|
||||||
232
predictions/predictor.py
Normal file
232
predictions/predictor.py
Normal file
@@ -0,0 +1,232 @@
|
|||||||
|
import cv2
|
||||||
|
import mediapipe as mp
|
||||||
|
import numpy as np
|
||||||
|
import pandas as pd
|
||||||
|
import torch
|
||||||
|
|
||||||
|
from predictions.k_nearest import KNearestNeighbours
|
||||||
|
|
||||||
|
# Run on the GPU when CUDA is available, otherwise stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||||
|
from models import SPOTER_EMBEDDINGS
|
||||||
|
|
||||||
|
# Rows selected from the normalized pose keypoint array, in output order.
# Index 33 is the neck point that Predictor.extract_keypoints appends after
# the detected pose landmarks.
BODY_IDENTIFIERS = [0, 33, 5, 2, 8, 7, 12, 11, 14, 13, 16, 15]
|
||||||
|
|
||||||
|
# Row order applied to each 21-point hand keypoint array in
# Predictor.extract_keypoints.
# NOTE(review): presumably wrist first, then one group of four per finger
# with the thumb last - confirm against the training data layout.
HAND_IDENTIFIERS = [
    0,
    8, 7, 6, 5,
    12, 11, 10, 9,
    16, 15, 14, 13,
    20, 19, 18, 17,
    4, 3, 2, 1,
]
|
||||||
|
|
||||||
|
# Model checkpoint file that Predictor.__init__ loads with torch.load.
CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"
|
||||||
|
|
||||||
|
|
||||||
|
class Predictor:
    """Turn video frames into sign predictions.

    A frame is run through MediaPipe Holistic, the pose and hand keypoints
    are normalized and stacked (``extract_keypoints``), and a sequence of
    such keypoint arrays is embedded with the SPOTER embedding model and
    classified by the configured predictor (``make_prediction``).
    """

    def __init__(self, embeddings_path, predictor_type):
        """Load the landmark detector, the embedding model and the classifier.

        Parameters
        ----------
        embeddings_path : str
            CSV file with the reference embeddings; it is read with
            ``pandas.read_csv`` and handed to the predictor's
            ``set_embeddings``.
        predictor_type : object or None
            A classifier instance offering ``set_embeddings(df)`` and
            ``predict(embedding)``.  ``None`` selects
            ``KNearestNeighbours(1)``.
        """
        # MediaPipe Holistic yields pose and both hand landmarks per frame.
        self.holistic = mp.solutions.holistic.Holistic(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
            model_complexity=2,
        )
        self.mp_holistic = mp.solutions.holistic
        self.mp_drawing = mp.solutions.drawing_utils

        # Row positions inside the pose keypoint array; 33 is the neck row
        # appended by extract_keypoints.
        self.left_shoulder_index = 11
        self.right_shoulder_index = 12
        self.neck_index = 33
        self.nose_index = 0
        self.left_eye_index = 2

        # Reference embeddings used by the classifier.
        self.embeddings = pd.read_csv(embeddings_path)

        checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)
        config = checkpoint["config_args"]
        self.model = SPOTER_EMBEDDINGS(
            features=config.vector_length,
            hidden_dim=config.hidden_dim,
            norm_emb=config.normalize_embeddings,
        ).to(device)
        self.model.load_state_dict(checkpoint["state_dict"])

        if predictor_type is None:
            self.predictor = KNearestNeighbours(1)
        else:
            self.predictor = predictor_type
        self.predictor.set_embeddings(self.embeddings)

    @staticmethod
    def _landmarks_to_array(lmks):
        """Return the (x, y) pairs of a landmark list as an array, or None."""
        if not lmks:
            return None
        return np.array([[float(lmk.x), float(lmk.y)] for lmk in lmks.landmark])

    @staticmethod
    def _append_neck(keypoints):
        """Append the midpoint of rows 11 and 12 (the shoulders) as a new row."""
        left_shoulder = keypoints[11]
        right_shoulder = keypoints[12]
        neck = [(float(left_shoulder[0]) + float(right_shoulder[0])) / 2,
                (float(left_shoulder[1]) + float(right_shoulder[1])) / 2]
        return np.append(keypoints, [neck], axis=0)

    def extract_keypoints(self, image_orig):
        """Extract the normalized pose and hand keypoints of one frame.

        Parameters
        ----------
        image_orig : ndarray or None
            BGR image, e.g. a frame returned by ``cv2.VideoCapture.read``.

        Returns
        -------
        ndarray or None
            The selected pose rows followed by the left-hand and right-hand
            rows, each row an ``(x, y)`` pair shifted by -0.5.  ``None``
            when there is no frame, no pose was detected, or neither hand
            was detected.
        """
        # A failed capture yields no frame; report "nothing detected"
        # instead of crashing inside cv2.cvtColor.
        if image_orig is None:
            return None

        image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
        results = self.holistic.process(image)

        pose = self._landmarks_to_array(results.pose_landmarks)
        if pose is None:
            return None
        pose = self._append_neck(pose)
        # Keep only the rows in BODY_IDENTIFIERS, in that order.
        pose_norm = self.normalize_pose(pose)[BODY_IDENTIFIERS]

        left_hand = self._landmarks_to_array(results.left_hand_landmarks)
        right_hand = self._landmarks_to_array(results.right_hand_landmarks)
        if left_hand is None and right_hand is None:
            return None

        parts = [pose_norm]
        for hand in (left_hand, right_hand):
            # An undetected hand is represented by all-zero keypoints.
            if hand is None:
                hand = np.zeros((21, 2))
            else:
                hand = self.normalize_hand(hand)
            parts.append(hand[HAND_IDENTIFIERS])

        combined = np.concatenate(parts, axis=0)
        # Move the interval: shift every coordinate by -0.5.
        combined -= 0.5
        return combined

    def normalize_pose(self, keypoints):
        """Normalize body keypoints to a box sized from the shoulder span.

        The box is centred horizontally on the neck and scaled by the
        shoulder distance, or by the neck-nose distance when the shoulders
        are unusable.  ``keypoints`` is modified in place.

        Parameters
        ----------
        keypoints : ndarray
            Array of ``(x, y)`` rows that includes the neck row.

        Returns
        -------
        ndarray
            The same ``keypoints`` object.  It is returned unchanged when
            neither reference distance is usable or when the bounding box
            has zero width or height.
        """
        left_shoulder = keypoints[self.left_shoulder_index]
        right_shoulder = keypoints[self.right_shoulder_index]
        neck = keypoints[self.neck_index]
        nose = keypoints[self.nose_index]

        shoulders_usable = (
            left_shoulder[0] != 0 and right_shoulder[0] != 0
            and (left_shoulder[0] != right_shoulder[0]
                 or left_shoulder[1] != right_shoulder[1])
        )
        head_usable = (
            neck[0] != 0 and nose[0] != 0
            and (neck[0] != nose[0] or neck[1] != nose[1])
        )
        # Nothing to scale by: skip the normalization altogether.
        if not shoulders_usable and not head_usable:
            return keypoints

        if shoulders_usable:
            head_metric = (((left_shoulder[0] - right_shoulder[0]) ** 2)
                           + ((left_shoulder[1] - right_shoulder[1]) ** 2)) ** 0.5
        else:
            head_metric = (((neck[0] - nose[0]) ** 2)
                           + ((neck[1] - nose[1]) ** 2)) ** 0.5

        # Corners of the normalization bounding box.  The ending y is
        # derived from the unclamped starting y.
        starting_point = [neck[0] - 3 * head_metric,
                          keypoints[self.left_eye_index][1] + head_metric]
        ending_point = [neck[0] + 3 * head_metric,
                        starting_point[1] - 6 * head_metric]

        # Negative corner coordinates are clamped to 0.
        starting_point = [max(value, 0) for value in starting_point]
        ending_point = [max(value, 0) for value in ending_point]

        width = ending_point[0] - starting_point[0]
        height = starting_point[1] - ending_point[1]
        # Clamping can collapse the box (e.g. a body entirely at negative
        # x); dividing by zero would fill the keypoints with inf/NaN.
        if width == 0 or height == 0:
            return keypoints

        for point in keypoints:
            point[0] = (point[0] - starting_point[0]) / width
            point[1] = (point[1] - ending_point[1]) / height

        return keypoints

    def normalize_hand(self, keypoints):
        """Normalize hand keypoints to a padded square bounding box.

        The box spans the non-zero coordinates, is padded by 10 % of its
        longer side and widened on the shorter side so that it becomes
        square.  ``keypoints`` is modified in place.

        Parameters
        ----------
        keypoints : ndarray
            Array of ``(x, y)`` rows for one hand.

        Returns
        -------
        ndarray
            The same ``keypoints`` object.  It is returned unchanged when
            there are no non-zero coordinates or the box has zero size.
        """
        x_values = [point[0] for point in keypoints if point[0] != 0]
        y_values = [point[1] for point in keypoints if point[1] != 0]
        if not x_values or not y_values:
            return keypoints

        min_x, max_x = min(x_values), max(x_values)
        min_y, max_y = min(y_values), max(y_values)
        width, height = max_x - min_x, max_y - min_y

        if width > height:
            delta_x = 0.1 * width
            delta_y = delta_x + ((width - height) / 2)
        else:
            delta_y = 0.1 * height
            delta_x = delta_y + ((height - width) / 2)

        start_x, start_y = min_x - delta_x, min_y - delta_y
        span_x = (max_x + delta_x) - start_x
        span_y = (max_y + delta_y) - start_y
        if span_x == 0 or span_y == 0:
            return keypoints

        for point in keypoints:
            point[0] = (point[0] - start_x) / span_x
            point[1] = (point[1] - start_y) / span_y

        return keypoints

    def make_prediction(self, keypoints):
        """Embed a keypoint sequence and classify the embedding.

        Parameters
        ----------
        keypoints : sequence of ndarray
            Keypoint arrays as returned by ``extract_keypoints``, one per
            frame.

        Returns
        -------
        Whatever ``self.predictor.predict`` returns for the embedding.
        """
        self.model.eval()
        with torch.no_grad():
            # Wrap the sequence in a list to add a leading batch axis.
            batch = torch.from_numpy(np.array([keypoints])).float().to(device)
            new_embeddings = self.model(batch).cpu().numpy().tolist()[0]

        return self.predictor.predict(new_embeddings)
|
||||||
355
webcam.py
355
webcam.py
@@ -1,339 +1,46 @@
|
|||||||
|
|
||||||
from collections import Counter
|
|
||||||
|
|
||||||
import cv2
|
import cv2
|
||||||
import mediapipe as mp
|
|
||||||
import numpy as np
|
|
||||||
import pandas as pd
|
|
||||||
import torch
|
|
||||||
|
|
||||||
device = torch.device("cpu")
|
from predictions.k_nearest import KNearestNeighbours
|
||||||
if torch.cuda.is_available():
|
from predictions.predictor import Predictor
|
||||||
device = torch.device("cuda")
|
|
||||||
from models import SPOTER_EMBEDDINGS
|
|
||||||
|
|
||||||
# Initialize MediaPipe Hands model
|
if __name__ == '__main__':
|
||||||
holistic = mp.solutions.holistic.Holistic(
|
buffer = []
|
||||||
min_detection_confidence=0.5,
|
|
||||||
min_tracking_confidence=0.5,
|
|
||||||
model_complexity=2
|
|
||||||
)
|
|
||||||
mp_holistic = mp.solutions.holistic
|
|
||||||
mp_drawing = mp.solutions.drawing_utils
|
|
||||||
|
|
||||||
BODY_IDENTIFIERS = [
|
# open webcam stream
|
||||||
0,
|
cap = cv2.VideoCapture(0)
|
||||||
33,
|
|
||||||
5,
|
|
||||||
2,
|
|
||||||
8,
|
|
||||||
7,
|
|
||||||
12,
|
|
||||||
11,
|
|
||||||
14,
|
|
||||||
13,
|
|
||||||
16,
|
|
||||||
15,
|
|
||||||
]
|
|
||||||
|
|
||||||
HAND_IDENTIFIERS = [
|
k = 3
|
||||||
0,
|
predictor_type = KNearestNeighbours(k)
|
||||||
8,
|
|
||||||
7,
|
|
||||||
6,
|
|
||||||
5,
|
|
||||||
12,
|
|
||||||
11,
|
|
||||||
10,
|
|
||||||
9,
|
|
||||||
16,
|
|
||||||
15,
|
|
||||||
14,
|
|
||||||
13,
|
|
||||||
20,
|
|
||||||
19,
|
|
||||||
18,
|
|
||||||
17,
|
|
||||||
4,
|
|
||||||
3,
|
|
||||||
2,
|
|
||||||
1,
|
|
||||||
]
|
|
||||||
|
|
||||||
def extract_keypoints(image_orig):
|
# embeddings_path = 'embeddings/basic-signs/embeddings.csv'
|
||||||
image = cv2.cvtColor(image_orig, cv2.COLOR_BGR2RGB)
|
embeddings_path = 'embeddings/fingerspelling/embeddings.csv'
|
||||||
results = holistic.process(image)
|
|
||||||
|
|
||||||
def extract_keypoints(lmks):
|
predictor = Predictor(embeddings_path, predictor_type)
|
||||||
if lmks:
|
|
||||||
a = np.array([[float(lmk.x), float(lmk.y)] for lmk in lmks.landmark])
|
|
||||||
return a
|
|
||||||
return None
|
|
||||||
|
|
||||||
def calculate_neck(keypoints):
|
|
||||||
left_shoulder = keypoints[11]
|
|
||||||
right_shoulder = keypoints[12]
|
|
||||||
|
|
||||||
neck = [(float(left_shoulder[0]) + float(right_shoulder[0])) / 2, (float(left_shoulder[1]) + float(right_shoulder[1])) / 2]
|
index = 0
|
||||||
# add neck to keypoints
|
|
||||||
keypoints = np.append(keypoints, [neck], axis=0)
|
|
||||||
return keypoints
|
|
||||||
|
|
||||||
pose = extract_keypoints(results.pose_landmarks)
|
while cap.isOpened():
|
||||||
pose = calculate_neck(pose)
|
# Wait for key press to exit
|
||||||
pose_norm = normalize_pose(pose)
|
if cv2.waitKey(5) & 0xFF == 27:
|
||||||
# filter out keypoints that are not in BODY_IDENTIFIERS and make sure they are in the correct order
|
break
|
||||||
pose_norm = pose_norm[BODY_IDENTIFIERS]
|
|
||||||
|
|
||||||
left_hand = extract_keypoints(results.left_hand_landmarks)
|
ret, frame = cap.read()
|
||||||
right_hand = extract_keypoints(results.right_hand_landmarks)
|
pose = predictor.extract_keypoints(frame)
|
||||||
|
|
||||||
if left_hand is None and right_hand is None:
|
if pose is None:
|
||||||
return None
|
cv2.imshow('MediaPipe Hands', frame)
|
||||||
|
continue
|
||||||
|
|
||||||
# normalize hands
|
buffer.append(pose)
|
||||||
if left_hand is not None:
|
if len(buffer) > 15:
|
||||||
left_hand = normalize_hand(left_hand)
|
buffer.pop(0)
|
||||||
else:
|
|
||||||
left_hand = np.zeros((21, 2))
|
|
||||||
if right_hand is not None:
|
|
||||||
right_hand = normalize_hand(right_hand)
|
|
||||||
else:
|
|
||||||
right_hand = np.zeros((21, 2))
|
|
||||||
|
|
||||||
left_hand = left_hand[HAND_IDENTIFIERS]
|
if len(buffer) == 15:
|
||||||
|
label, score = predictor.make_prediction(buffer)
|
||||||
|
|
||||||
right_hand = right_hand[HAND_IDENTIFIERS]
|
# draw label
|
||||||
|
cv2.putText(frame, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
|
||||||
|
cv2.putText(frame, str(score), (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
|
||||||
|
|
||||||
# combine pose and hands
|
# Show the frame
|
||||||
pose_norm = np.append(pose_norm, left_hand, axis=0)
|
cv2.imshow('MediaPipe Hands', frame)
|
||||||
pose_norm = np.append(pose_norm, right_hand, axis=0)
|
|
||||||
|
|
||||||
# move interval
|
|
||||||
pose_norm -= 0.5
|
|
||||||
|
|
||||||
return pose_norm
|
|
||||||
|
|
||||||
|
|
||||||
buffer = []
|
|
||||||
|
|
||||||
left_shoulder_index = 11
|
|
||||||
right_shoulder_index = 12
|
|
||||||
neck_index = 33
|
|
||||||
nose_index = 0
|
|
||||||
left_eye_index = 2
|
|
||||||
|
|
||||||
# if we have the keypoints, normalize single body, keypoints is numpy array of (identifiers, 2)
|
|
||||||
def normalize_pose(keypoints):
|
|
||||||
left_shoulder = keypoints[left_shoulder_index]
|
|
||||||
right_shoulder = keypoints[right_shoulder_index]
|
|
||||||
|
|
||||||
neck = keypoints[neck_index]
|
|
||||||
nose = keypoints[nose_index]
|
|
||||||
|
|
||||||
# Prevent from even starting the analysis if some necessary elements are not present
|
|
||||||
if (left_shoulder[0] == 0 or right_shoulder[0] == 0
|
|
||||||
or (left_shoulder[0] == right_shoulder[0] and left_shoulder[1] == right_shoulder[1])) and (
|
|
||||||
neck[0] == 0 or nose[0] == 0 or (neck[0] == nose[0] and neck[1] == nose[1])):
|
|
||||||
return keypoints
|
|
||||||
|
|
||||||
if left_shoulder[0] != 0 and right_shoulder[0] != 0 and (left_shoulder[0] != right_shoulder[0] or left_shoulder[1] != right_shoulder[1]):
|
|
||||||
shoulder_distance = ((((left_shoulder[0] - right_shoulder[0]) ** 2) + ((left_shoulder[1] - right_shoulder[1]) ** 2)) ** 0.5)
|
|
||||||
head_metric = shoulder_distance
|
|
||||||
else:
|
|
||||||
neck_nose_distance = ((((neck[0] - nose[0]) ** 2) + ((neck[1] - nose[1]) ** 2)) ** 0.5)
|
|
||||||
head_metric = neck_nose_distance
|
|
||||||
|
|
||||||
# Set the starting and ending point of the normalization bounding box
|
|
||||||
starting_point = [keypoints[neck_index][0] - 3 * head_metric, keypoints[left_eye_index][1] + head_metric]
|
|
||||||
ending_point = [keypoints[neck_index][0] + 3 * head_metric, starting_point[1] - 6 * head_metric]
|
|
||||||
|
|
||||||
if starting_point[0] < 0:
|
|
||||||
starting_point[0] = 0
|
|
||||||
if starting_point[1] < 0:
|
|
||||||
starting_point[1] = 0
|
|
||||||
if ending_point[0] < 0:
|
|
||||||
ending_point[0] = 0
|
|
||||||
if ending_point[1] < 0:
|
|
||||||
ending_point[1] = 0
|
|
||||||
|
|
||||||
# Normalize the keypoints
|
|
||||||
for i in range(len(keypoints)):
|
|
||||||
keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
|
|
||||||
keypoints[i][1] = (keypoints[i][1] - ending_point[1]) / (starting_point[1] - ending_point[1])
|
|
||||||
|
|
||||||
return keypoints
|
|
||||||
|
|
||||||
def normalize_hand(keypoints):
|
|
||||||
x_values = [keypoints[i][0] for i in range(len(keypoints)) if keypoints[i][0] != 0]
|
|
||||||
y_values = [keypoints[i][1] for i in range(len(keypoints)) if keypoints[i][1] != 0]
|
|
||||||
|
|
||||||
if not x_values or not y_values:
|
|
||||||
return keypoints
|
|
||||||
|
|
||||||
width, height = max(x_values) - min(x_values), max(y_values) - min(y_values)
|
|
||||||
if width > height:
|
|
||||||
delta_x = 0.1 * width
|
|
||||||
delta_y = delta_x + ((width - height) / 2)
|
|
||||||
else:
|
|
||||||
delta_y = 0.1 * height
|
|
||||||
delta_x = delta_y + ((height - width) / 2)
|
|
||||||
|
|
||||||
starting_point = (min(x_values) - delta_x, min(y_values) - delta_y)
|
|
||||||
ending_point = (max(x_values) + delta_x, max(y_values) + delta_y)
|
|
||||||
|
|
||||||
if ending_point[0] - starting_point[0] == 0 or ending_point[1] - starting_point[1] == 0:
|
|
||||||
return keypoints
|
|
||||||
|
|
||||||
# normalize keypoints
|
|
||||||
for i in range(len(keypoints)):
|
|
||||||
keypoints[i][0] = (keypoints[i][0] - starting_point[0]) / (ending_point[0] - starting_point[0])
|
|
||||||
keypoints[i][1] = (keypoints[i][1] - starting_point[1]) / (ending_point[1] - starting_point[1])
|
|
||||||
|
|
||||||
return keypoints
|
|
||||||
|
|
||||||
|
|
||||||
# load training embedding csv
|
|
||||||
df = pd.read_csv('embeddings/basic-signs/embeddings.csv')
|
|
||||||
|
|
||||||
def minkowski_distance_p(x, y, p=2):
|
|
||||||
x = np.asarray(x)
|
|
||||||
y = np.asarray(y)
|
|
||||||
|
|
||||||
# Find smallest common datatype with float64 (return type of this
|
|
||||||
# function) - addresses #10262.
|
|
||||||
# Don't just cast to float64 for complex input case.
|
|
||||||
common_datatype = np.promote_types(np.promote_types(x.dtype, y.dtype),
|
|
||||||
'float64')
|
|
||||||
|
|
||||||
# Make sure x and y are NumPy arrays of correct datatype.
|
|
||||||
x = x.astype(common_datatype)
|
|
||||||
y = y.astype(common_datatype)
|
|
||||||
|
|
||||||
if p == np.inf:
|
|
||||||
return np.amax(np.abs(y-x), axis=-1)
|
|
||||||
elif p == 1:
|
|
||||||
return np.sum(np.abs(y-x), axis=-1)
|
|
||||||
else:
|
|
||||||
return np.sum(np.abs(y-x)**p, axis=-1)
|
|
||||||
|
|
||||||
def minkowski_distance(x, y, p=2):
|
|
||||||
x = np.asarray(x)
|
|
||||||
y = np.asarray(y)
|
|
||||||
if p == np.inf or p == 1:
|
|
||||||
return minkowski_distance_p(x, y, p)
|
|
||||||
else:
|
|
||||||
return minkowski_distance_p(x, y, p)**(1./p)
|
|
||||||
|
|
||||||
|
|
||||||
def distance_matrix(keypoints, embeddings, p=2, threshold=1000000):
|
|
||||||
|
|
||||||
x = np.array(keypoints)
|
|
||||||
m, k = x.shape
|
|
||||||
y = np.asarray(embeddings)
|
|
||||||
n, kk = y.shape
|
|
||||||
|
|
||||||
if k != kk:
|
|
||||||
raise ValueError(f"x contains {k}-dimensional vectors but y contains "
|
|
||||||
f"{kk}-dimensional vectors")
|
|
||||||
|
|
||||||
if m*n*k <= threshold:
|
|
||||||
print("Using minkowski_distance")
|
|
||||||
return minkowski_distance(x[:,np.newaxis,:],y[np.newaxis,:,:],p)
|
|
||||||
else:
|
|
||||||
result = np.empty((m,n),dtype=float) # FIXME: figure out the best dtype
|
|
||||||
if m < n:
|
|
||||||
for i in range(m):
|
|
||||||
result[i,:] = minkowski_distance(x[i],y,p)
|
|
||||||
else:
|
|
||||||
for j in range(n):
|
|
||||||
result[:,j] = minkowski_distance(x,y[j],p)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
CHECKPOINT_PATH = "checkpoints/checkpoint_embed_1105.pth"
|
|
||||||
checkpoint = torch.load(CHECKPOINT_PATH, map_location=device)
|
|
||||||
|
|
||||||
model = SPOTER_EMBEDDINGS(
|
|
||||||
features=checkpoint["config_args"].vector_length,
|
|
||||||
hidden_dim=checkpoint["config_args"].hidden_dim,
|
|
||||||
norm_emb=checkpoint["config_args"].normalize_embeddings,
|
|
||||||
).to(device)
|
|
||||||
|
|
||||||
model.load_state_dict(checkpoint["state_dict"])
|
|
||||||
embeddings = df.drop(columns=['labels', 'label_name', 'embeddings'])
|
|
||||||
|
|
||||||
# convert embedding from string to list of floats
|
|
||||||
embeddings["embeddings"] = embeddings["embeddings2"].apply(lambda x: [float(i) for i in x[1:-1].split(", ")])
|
|
||||||
# drop embeddings2
|
|
||||||
embeddings = embeddings.drop(columns=['embeddings2'])
|
|
||||||
# to list
|
|
||||||
embeddings = embeddings["embeddings"].tolist()
|
|
||||||
|
|
||||||
def make_prediction(keypoints):
|
|
||||||
# run model on frame
|
|
||||||
model.eval()
|
|
||||||
with torch.no_grad():
|
|
||||||
keypoints = torch.from_numpy(np.array([keypoints])).float().to(device)
|
|
||||||
new_embeddings = model(keypoints).cpu().numpy().tolist()[0]
|
|
||||||
|
|
||||||
# calculate distance matrix
|
|
||||||
dist_matrix = distance_matrix(new_embeddings, embeddings, p=2, threshold=1000000)
|
|
||||||
|
|
||||||
# get the 5 closest matches and select the class that is most common and use the average distance as the score
|
|
||||||
# get the 5 closest matches
|
|
||||||
indeces = np.argsort(dist_matrix)[0][:5]
|
|
||||||
# get the labels
|
|
||||||
labels = df["label_name"].iloc[indeces].tolist()
|
|
||||||
c = Counter(labels).most_common()[0][0]
|
|
||||||
|
|
||||||
# filter indeces to only include the most common label
|
|
||||||
indeces = [i for i in indeces if df["label_name"].iloc[i] == c]
|
|
||||||
# get the average distance
|
|
||||||
score = np.mean(dist_matrix[0][indeces])
|
|
||||||
|
|
||||||
return c, score
|
|
||||||
|
|
||||||
# open webcam stream
|
|
||||||
cap = cv2.VideoCapture(0)
|
|
||||||
|
|
||||||
while cap.isOpened():
|
|
||||||
# read frame
|
|
||||||
ret, frame = cap.read()
|
|
||||||
pose = extract_keypoints(frame)
|
|
||||||
|
|
||||||
if pose is None:
|
|
||||||
cv2.imshow('MediaPipe Hands', frame)
|
|
||||||
continue
|
|
||||||
|
|
||||||
buffer.append(pose)
|
|
||||||
if len(buffer) > 15:
|
|
||||||
buffer.pop(0)
|
|
||||||
|
|
||||||
if len(buffer) == 15:
|
|
||||||
label, score = make_prediction(buffer)
|
|
||||||
|
|
||||||
# draw label
|
|
||||||
cv2.putText(frame, label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
|
|
||||||
cv2.putText(frame, str(score), (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
|
|
||||||
|
|
||||||
# Show the frame
|
|
||||||
cv2.imshow('MediaPipe Hands', frame)
|
|
||||||
|
|
||||||
# Wait for key press to exit
|
|
||||||
if cv2.waitKey(5) & 0xFF == 27:
|
|
||||||
break
|
|
||||||
|
|
||||||
# open video A.mp4
|
|
||||||
# cap = cv2.VideoCapture('E.mp4')
|
|
||||||
# while cap.isOpened():
|
|
||||||
# # read frame
|
|
||||||
# ret, frame = cap.read()
|
|
||||||
# if frame is None:
|
|
||||||
# break
|
|
||||||
# pose = extract_keypoints(frame)
|
|
||||||
|
|
||||||
# buffer.append(pose)
|
|
||||||
|
|
||||||
# label, score = make_prediction(buffer)
|
|
||||||
# print(label, score)
|
|
||||||
Reference in New Issue
Block a user