import logging
import os
import time
from typing import Dict, List, Optional, Tuple, Union

import cv2
import mediapipe as mp
import numpy as np
import pandas as pd


class KeypointExtractor:
    """Extract and normalize MediaPipe Holistic keypoints from videos.

    Every processed frame becomes one row of 150 values: the x, y
    coordinates of 33 pose keypoints (columns 0-65), 21 left hand keypoints
    (columns 66-107) and 21 right hand keypoints (columns 108-149).
    """

    _POSE_KEYPOINTS = 33
    _HAND_KEYPOINTS = 21
    _BBOX_COLUMNS = ['starting_x', 'starting_y', 'ending_x', 'ending_y']

    def __init__(self, cache_folder: str = "cache"):
        """__init__
        this function sets up the MediaPipe Holistic extractor and the column layout

        :param cache_folder: folder in which extracted keypoints are cached as .npy files, defaults to "cache"
        :type cache_folder: str, optional
        """
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_holistic = mp.solutions.holistic
        self.cache_folder = cache_folder

        # we store the keypoints of each frame as a row in the dataframe.
        # The columns are the keypoints: Pose (33), Left Hand (21), Right Hand (21).
        # Each keypoint has 2 values: x, y
        total_keypoints = self._POSE_KEYPOINTS + 2 * self._HAND_KEYPOINTS
        self.columns = [f"{i}_{j}" for i in range(total_keypoints) for j in ["x", "y"]]

        # holistic extractor
        self.holistic = mp.solutions.holistic.Holistic(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )

    def extract_keypoints_from_video(
        self,
        video: str,
        normalize: Optional[str] = None,
        draw: bool = False,
    ) -> Optional[Union[pd.DataFrame, Tuple[pd.DataFrame, List[np.ndarray]]]]:
        """extract_keypoints_from_video
        this function extracts keypoints from a video and stores them in a dataframe

        Only frames in which the pose and at least one hand are detected are kept;
        a missing hand is filled with zeros. The un-normalized keypoints (in pixels)
        are written to the cache folder. When draw is False, an existing cache file
        is used instead of processing the video again.

        :param video: the path of the video to extract keypoints from
        :type video: str
        :param normalize: the hand normalization algorithm to use ("minmax" or "bohacek"); any truthy value also enables the pose normalization, defaults to None
        :type normalize: str, optional
        :param draw: indicates if the annotated frames must be returned as well (bypasses the cache lookup), defaults to False
        :type draw: bool, optional
        :return: dataframe with keypoints in absolute pixels (normalized if requested); a tuple (dataframe, annotated frames) when draw is True; None when draw is False and the video does not exist
        :rtype: pd.DataFrame
        """
        # NOTE(review): only the text before the first "." of the file name is used,
        # so "a.1.mp4" and "a.2.mp4" share one cache entry. Kept as is because
        # existing cache files are named this way.
        video_name = video.split("/")[-1].split(".")[0]
        cache_path = os.path.join(self.cache_folder, video_name + ".npy")

        if not draw:
            # check if video exists
            if not os.path.exists(video):
                logging.error("Video does not exist at path: %s", video)
                return None

            # check if cache file exists and return
            if os.path.exists(cache_path):
                # NOTE(review): allow_pickle is kept so that caches written as
                # object arrays can still be read; only load cache files you trust.
                cached = np.load(cache_path, allow_pickle=True)
                df = pd.DataFrame(np.asarray(cached, dtype=np.float64), columns=self.columns)
                return self._normalize_keypoints(df, normalize)

        rows = []
        output_frames = []

        # open video
        cap = cv2.VideoCapture(video)
        try:
            # keep one frame out of every (frame_rate // 10), i.e. roughly 10 frames per second
            frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
            frame_skip = max(frame_rate // 10 - 1, 0)

            # get frame width and height
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            while cap.isOpened():
                # skip frames; grab() advances the stream without decoding the image
                if not all(cap.grab() for _ in range(frame_skip)):
                    break

                success, image = cap.read()
                if not success:
                    break

                # extract keypoints of frame
                if draw:
                    results, draw_image = self.extract_keypoints_from_frame(image, draw=True)
                    output_frames.append(draw_image)
                else:
                    results = self.extract_keypoints_from_frame(image)

                pose = self._flatten_landmarks(results.pose_landmarks)
                left_hand = self._flatten_landmarks(results.left_hand_landmarks)
                right_hand = self._flatten_landmarks(results.right_hand_landmarks)

                # a frame is only useful when the pose and at least one hand are found
                if pose and (left_hand or right_hand):
                    missing_hand = [0.0] * (2 * self._HAND_KEYPOINTS)
                    rows.append(pose + (left_hand or missing_hand) + (right_hand or missing_hand))
        finally:
            # close video, also when processing a frame raised
            cap.release()

        # build the whole table at once instead of concatenating a dataframe per frame
        keypoints = np.asarray(rows, dtype=np.float64).reshape(-1, len(self.columns))

        # convert to pixels (even columns are x, odd columns are y)
        keypoints[:, ::2] *= frame_width
        keypoints[:, 1::2] *= frame_height

        # save keypoints to cache
        os.makedirs(self.cache_folder, exist_ok=True)
        np.save(cache_path, keypoints)

        # normalize hands and pose keypoints
        keypoints_df = pd.DataFrame(keypoints, columns=self.columns)
        keypoints_df = self._normalize_keypoints(keypoints_df, normalize)

        if draw:
            return keypoints_df, output_frames
        return keypoints_df

    def _normalize_keypoints(self, dataframe: pd.DataFrame, normalize: Optional[str]) -> pd.DataFrame:
        """Apply the hand and pose normalization when normalize is truthy."""
        if normalize:
            dataframe = self.normalize_hands(dataframe, norm_algorithm=normalize)
            dataframe, _ = self.normalize_pose_bohacek(dataframe)
        return dataframe

    @staticmethod
    def _flatten_landmarks(landmarks) -> List[float]:
        """Return [x0, y0, x1, y1, ...] for a landmark list, or [] when nothing was detected."""
        if not landmarks:
            return []
        return [value for landmark in landmarks.landmark for value in (landmark.x, landmark.y)]

    def extract_keypoints_from_frame(self, image: np.ndarray, draw: bool = False):
        """extract_keypoints_from_frame
        this function extracts keypoints from a frame and draws them on the frame if draw is set to True

        :param image: the frame (in BGR channel order) to extract keypoints from
        :type image: np.ndarray
        :param draw: indicates if frame with keypoints on must be returned, defaults to False
        :type draw: bool, optional
        :return: the MediaPipe results, or a tuple (results, RGB frame with keypoints on) if draw is set to True
        """
        # Convert the BGR image to RGB and process it with MediaPipe Holistic.
        rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = self.holistic.process(rgb_image)

        if not draw:
            return results

        # The annotations are drawn on the RGB frame, so colors below are RGB.
        draw_image = rgb_image
        self.mp_drawing.draw_landmarks(draw_image, results.left_hand_landmarks, self.mp_holistic.HAND_CONNECTIONS)
        self.mp_drawing.draw_landmarks(draw_image, results.right_hand_landmarks, self.mp_holistic.HAND_CONNECTIONS)

        # create bounding box around hands
        draw_image = self._draw_hand_bbox(draw_image, results.left_hand_landmarks, (0, 255, 0))
        draw_image = self._draw_hand_bbox(draw_image, results.right_hand_landmarks, (255, 0, 0))

        self.mp_drawing.draw_landmarks(draw_image, results.pose_landmarks, self.mp_holistic.POSE_CONNECTIONS)
        return results, draw_image

    @staticmethod
    def _draw_hand_bbox(draw_image: np.ndarray, hand_landmarks, color: Tuple[int, int, int]) -> np.ndarray:
        """Draw the tight bounding box of a detected hand; return the image untouched if there is no hand."""
        if not hand_landmarks:
            return draw_image

        img_height, img_width = draw_image.shape[0], draw_image.shape[1]
        xs = [landmark.x for landmark in hand_landmarks.landmark]
        ys = [landmark.y for landmark in hand_landmarks.landmark]
        top_left = (int(min(xs) * img_width), int(min(ys) * img_height))
        bottom_right = (int(max(xs) * img_width), int(max(ys) * img_height))
        return cv2.rectangle(draw_image, top_left, bottom_right, color, 2)

    def normalize_hands(self, dataframe: pd.DataFrame, norm_algorithm: str = "minmax") -> pd.DataFrame:
        """normalize_hands
        this function normalizes the hand keypoints of a dataframe

        :param dataframe: the dataframe to normalize (modified in place)
        :type dataframe: pd.DataFrame
        :param norm_algorithm: the normalization algorithm to use, pick from "minmax" and "bohacek"; any other value leaves the hands untouched
        :type norm_algorithm: str
        :return: the normalized dataframe
        :rtype: pd.DataFrame
        """
        if norm_algorithm == "minmax":
            normalize_hand = self.normalize_hand_minmax
        elif norm_algorithm == "bohacek":
            normalize_hand = self.normalize_hand_bohacek
        else:
            return dataframe

        for hand in ("left_hand", "right_hand"):
            dataframe, _ = normalize_hand(dataframe, hand)
        return dataframe

    def _hand_column_indices(self, hand: str) -> np.ndarray:
        """Return the column positions of a hand (left: 66-107, right: 108-149)."""
        start = 2 * self._POSE_KEYPOINTS
        if hand == "right_hand":
            start += 2 * self._HAND_KEYPOINTS
        return np.arange(start, start + 2 * self._HAND_KEYPOINTS)

    @staticmethod
    def _scale_to_bbox(coords: np.ndarray, starting_x: np.ndarray, starting_y: np.ndarray,
                       ending_x: np.ndarray, ending_y: np.ndarray) -> Optional[np.ndarray]:
        """Express (frames, keypoints, 2) coords relative to per-frame boxes.

        Each coordinate becomes (coordinate - box center) / box size.
        Returns None when any box has a zero width or height.
        """
        start = np.stack((starting_x, starting_y), axis=1)
        end = np.stack((ending_x, ending_y), axis=1)
        dims = end - start
        if np.any(dims == 0):
            return None
        center = (start + end) / 2
        # broadcast the per-frame center and size over the keypoint axis
        return (coords - center[:, np.newaxis, :]) / dims[:, np.newaxis, :]

    def _bbox_frame(self, starting_x: np.ndarray, starting_y: np.ndarray,
                    ending_x: np.ndarray, ending_y: np.ndarray) -> pd.DataFrame:
        """Merge the starting and ending points of the bounding boxes in a dataframe."""
        bbox_array = np.column_stack((starting_x, starting_y, ending_x, ending_y))
        return pd.DataFrame(bbox_array, columns=self._BBOX_COLUMNS)

    def normalize_hand_minmax(self, dataframe: pd.DataFrame, hand: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """normalize_hand_minmax
        this function normalizes the hand keypoints of a dataframe with respect to the minimum and maximum coordinates

        :param dataframe: the dataframe to normalize (modified in place)
        :type dataframe: pd.DataFrame
        :param hand: the hand to normalize, "right_hand" selects the right hand, anything else the left hand
        :type hand: str
        :return: the normalized dataframe and the bounding boxes dataframe; (unchanged dataframe, None) when any frame has a zero-sized bounding box
        :rtype: Tuple[pd.DataFrame, pd.DataFrame]
        """
        # get the x, y coordinates of the hand keypoints
        hand_columns = self._hand_column_indices(hand)
        hand_coords = dataframe.iloc[:, hand_columns].to_numpy(dtype=np.float64).reshape(-1, self._HAND_KEYPOINTS, 2)

        # the bounding box is the tight box around the hand keypoints
        min_x, min_y = hand_coords[:, :, 0].min(axis=1), hand_coords[:, :, 1].min(axis=1)
        max_x, max_y = hand_coords[:, :, 0].max(axis=1), hand_coords[:, :, 1].max(axis=1)

        # NOTE(review): a hand that is missing in a single frame (all zeros) makes
        # this return early for the whole video - confirm this is intended.
        norm_hand_coords = self._scale_to_bbox(hand_coords, min_x, min_y, max_x, max_y)
        if norm_hand_coords is None:
            return dataframe, None

        # replace the original hand keypoints with the normalized hand keypoints
        dataframe.iloc[:, hand_columns] = norm_hand_coords.reshape(-1, 2 * self._HAND_KEYPOINTS)
        return dataframe, self._bbox_frame(min_x, min_y, max_x, max_y)

    def normalize_hand_bohacek(self, dataframe: pd.DataFrame, hand: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """normalize_hand_bohacek
        this function normalizes the hand keypoints of a dataframe using the bohacek normalization algorithm

        The tight box around the hand is padded to a square: the longer side gets a
        margin of 10% on both ends and the shorter side is extended to the same length.

        :param dataframe: the dataframe to normalize (modified in place)
        :type dataframe: pd.DataFrame
        :param hand: the hand to normalize, "right_hand" selects the right hand, anything else the left hand
        :type hand: str
        :return: the normalized dataframe and the bounding boxes dataframe; (unchanged dataframe, None) when any frame has a zero-sized bounding box
        :rtype: Tuple[pd.DataFrame, pd.DataFrame]
        """
        # get the x, y coordinates of the hand keypoints
        hand_columns = self._hand_column_indices(hand)
        hand_coords = dataframe.iloc[:, hand_columns].to_numpy(dtype=np.float64).reshape(-1, self._HAND_KEYPOINTS, 2)

        min_x, min_y = hand_coords[:, :, 0].min(axis=1), hand_coords[:, :, 1].min(axis=1)
        max_x, max_y = hand_coords[:, :, 0].max(axis=1), hand_coords[:, :, 1].max(axis=1)

        # the hand keypoint width and height (NOT the bounding box width and height!)
        width, height = max_x - min_x, max_y - min_y

        # calculate the deltas: 10% of the longer side, plus half the difference on the shorter side
        wider = width > height
        margin = 0.1 * np.where(wider, width, height)
        delta_x = np.where(wider, margin, margin + (height - width) / 2)
        delta_y = np.where(wider, margin + (width - height) / 2, margin)

        # set the starting and ending point of the normalization bounding box
        starting_x, starting_y = min_x - delta_x, min_y - delta_y
        ending_x, ending_y = max_x + delta_x, max_y + delta_y

        # NOTE(review): a hand that is missing in a single frame (all zeros) makes
        # this return early for the whole video - confirm this is intended.
        norm_hand_coords = self._scale_to_bbox(hand_coords, starting_x, starting_y, ending_x, ending_y)
        if norm_hand_coords is None:
            return dataframe, None

        # replace the original hand keypoints with the normalized hand keypoints
        dataframe.iloc[:, hand_columns] = norm_hand_coords.reshape(-1, 2 * self._HAND_KEYPOINTS)
        return dataframe, self._bbox_frame(starting_x, starting_y, ending_x, ending_y)

    def normalize_pose_bohacek(self, dataframe: pd.DataFrame, bbox_size: float = 4) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """normalize_pose_bohacek
        this function normalizes the pose keypoints of a dataframe using the Bohacek-normalization algorithm

        Only frames in which both shoulders are present (non-zero x) are normalized;
        the other frames keep their original pose keypoints.

        :param dataframe: the dataframe to normalize (modified in place)
        :type dataframe: pd.DataFrame
        :param bbox_size: the width and height of the normalization bounding box expressed in head metrics, defaults to 4
        :type bbox_size: float, optional
        :return: the normalized dataframe and the bounding boxes dataframe (one row per normalized frame); (unchanged dataframe, None) when any bounding box has a zero size
        :rtype: Tuple[pd.DataFrame, pd.DataFrame]
        """
        # get the x, y coordinates of the pose keypoints
        pose_columns = np.arange(2 * self._POSE_KEYPOINTS)
        pose_coords = dataframe.iloc[:, pose_columns].to_numpy(dtype=np.float64).reshape(-1, self._POSE_KEYPOINTS, 2)

        # check in what frames shoulders are visible (landmark 11 and 12)
        shoulders_present_mask = np.logical_and(pose_coords[:, 11, 0] != 0, pose_coords[:, 12, 0] != 0)
        if not shoulders_present_mask.any():
            # nothing to normalize: hand back the dataframe with an empty bounding box table
            empty = np.empty(0, dtype=np.float64)
            return dataframe, self._bbox_frame(empty, empty, empty, empty)

        # everything below works on the frames with visible shoulders only
        visible_coords = pose_coords[shoulders_present_mask]
        left_shoulder, right_shoulder = visible_coords[:, 11], visible_coords[:, 12]

        # the shoulder distance is used as head metric
        head_metric = ((left_shoulder[:, 0] - right_shoulder[:, 0])**2 + (left_shoulder[:, 1] - right_shoulder[:, 1])**2)**0.5

        # center of shoulders and left eye (landmark 2) are necessary to construct bounding box
        center_shoulders = right_shoulder + (left_shoulder - right_shoulder) / 2
        left_eye = visible_coords[:, 2]

        # set the starting and ending point of the normalization bounding box
        starting_x = center_shoulders[:, 0] - (bbox_size / 2) * head_metric
        starting_y = left_eye[:, 1] - 0.5 * head_metric
        ending_x = center_shoulders[:, 0] + (bbox_size / 2) * head_metric
        ending_y = starting_y + (bbox_size - 0.5) * head_metric

        # normalize the visible frames against their own bounding box; the boxes have
        # one row per visible frame, so they must be matched with visible_coords
        norm_pose_coords = self._scale_to_bbox(visible_coords, starting_x, starting_y, ending_x, ending_y)
        if norm_pose_coords is None:
            return dataframe, None

        # replace the original pose keypoints with the normalized pose keypoints
        dataframe.iloc[shoulders_present_mask, pose_columns] = norm_pose_coords.reshape(-1, 2 * self._POSE_KEYPOINTS)
        return dataframe, self._bbox_frame(starting_x, starting_y, ending_x, ending_y)