Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ml pipeline rewrite #40

Merged
merged 11 commits into from
Jul 30, 2023
2 changes: 1 addition & 1 deletion clusterer/datapoint_clusterer.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line 90 still throws an error when only point.data is written. code runs perfectly with point.data.numpy()

Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def create_clusters(_datapoints: list[Datapoint], _epsilon:float, _min_samples:i
Returns:
list[Cluster] : a list of clusters found
"""
points = np.array([point.data[0].numpy() for point in _datapoints])
points = np.array([point.data.numpy() for point in _datapoints])
scaler = StandardScaler()
scaler.fit(points)
points = scaler.transform(points)
Expand Down
121 changes: 71 additions & 50 deletions clusterer/inference.py
Original file line number Diff line number Diff line change
@@ -1,81 +1,102 @@
import torchvision
from exception import FaceNotFoundError
from model import FaceNet
from facenet_pytorch import MTCNN
from PIL import Image
from triplet_dataset import create_transform
import torch
from facenet_pytorch import MTCNN, InceptionResnetV1
import numpy as np
from exception import FaceNotFoundError


class EmbeddingPipeline:
"""
Pipeline class for detecting faces from a single image and converting to embedding vectors

Attributes:
detector (MTCNN): MTCNN detector object. Defaults to MTCNN(keep_all=True, device=device).eval().
resnet (InceptionResnetV1): InceptionResnetV1 object. Defaults to InceptionResnetV1(pretrained=pretrained, device=device).eval().
device (str): device to run the model on. Defaults to 'cpu'.
pretrained (str): pretrained model to use. Defaults to 'vggface2'.
resize (float): resize factor for the image. Defaults to None.
resnet (InceptionResnetV1): InceptionResnetV1 object. Defaults to InceptionResnetV1(pretrained='vggface2').eval().

"""
def __init__(self, detector: MTCNN = None, resnet: InceptionResnetV1 = None, device: str = 'cpu', pretrained: str = 'vggface2', resize: float = None):

def __init__(self, detector: MTCNN = None, default_resnet: bool = True, embedding_size: int = 512):
"""
Constructor for EmbeddingPipeline class

Args:
detector (MTCNN): MTCNN detector object. Defaults to MTCNN(keep_all=True, device=device).eval().
detector (MTCNN): MTCNN detector object. Defaults to MTCNN(keep_all=True, device=device).eval().
resnet (InceptionResnetV1): InceptionResnetV1 object. Defaults to InceptionResnetV1(pretrained=pretrained, device=device).eval().
device (str): device to run the model on. Defaults to 'cpu'.
pretrained (str): pretrained model to use. Defaults to 'vggface2'.
resize (float): resize factor for the image. Defaults to None.
Constructor for EmbeddingPipeline class

Args:
detector (MTCNN): MTCNN detector object. Defaults to MTCNN(keep_all=True, device=device).eval().
default_resnet (bool): boolean. If True defaults resnet used to InceptionResnetV1(pretrained='vggface2').eval().
embedding_size (int): size of embedding vector. Defaults to 512.

"""

DETECTOR = MTCNN(keep_all=True, device=device).eval()
RESNET = InceptionResnetV1(pretrained=pretrained, device=device).eval()
DETECTOR = MTCNN(keep_all=True).eval()
RESNET = FaceNet(embedding_size=embedding_size, use_default=default_resnet)

self.detector = DETECTOR if detector is None else detector
self.resize = resize
self.resnet = RESNET if resnet is None else resnet

self.resnet = RESNET

def __call__(self, filepath: str):
def __call__(self, image_path: str):
"""
Reads the image, processes it, optionally resizes it and detects faces
Reads the image, processes it and detects faces

Args:
filepath (str): path to the image file
image_path (str): path to the image file

Returns:
numpy array of embedding vectors of size {torch.Size([1, 512])}

"""
faces = self._detect_faces(image_path)
embeddings = self._create_embeddings(faces)
return embeddings

def _detect_faces(self, path: str):
""" detects faces from image in provided path

Args:
path (str): path to image

Raises:
FaceNotFoundError: in case of no face found in image

Returns:
list[PIL.Image.Image]: list of faces detected as PIL images
"""

img = Image.open(filepath)
images = []

if self.resize is not None:
img = img.resize([int(d*self.resize) for d in img.size])
transform_to_image = torchvision.transforms.ToPILImage()
image = Image.open(path)
faces = self.detector(image)
if faces is None:
raise FaceNotFoundError(path)
else:
for face in faces:
images.append(transform_to_image(face))
return images

def _create_embeddings(self, faces: list[Image.Image], transform_height: int = 224, transform_width: int = 224):
"""transforms and coverts faces into embedding vectors

Args:
faces (list[Image.Image]): list of PIL Images of faces
transform_height (int, optional): height of transformed image. Defaults to 224.
transform_width (int, optional): width of transformed image. Defaults to 224.

Returns:
torch.Tensor: tensor of embedding vectors of faces
"""

transform = create_transform(transform_height, transform_width)

transformed_faces = []

for face in faces:
converted = face.convert('RGB')
transformed_face = transform(converted)
transformed_faces.append(transformed_face)

detected_faces = self.detector(img)
transformed_faces = torch.stack(transformed_faces)

embeddings = self._create_embeddings(detected_faces, filepath)
embeddings = self.resnet.embed(transformed_faces)

return embeddings
return embeddings

def _create_embeddings(self, faces: list[torch.tensor], filepath: str):
"""
Converts array of faces to embedding vectors

Args:
faces (list[torch.tensor]): list of tensors of detected faces
filepath (str): path of image

Returns:
numpy array of embedding vectors of size {torch.Size([1, 512])}

"""
if faces is not None:
embeddings = np.array([self.resnet(torch.unsqueeze(face, 0)).detach().numpy() for face in faces])
return embeddings
else:
raise FaceNotFoundError(filepath)

2 changes: 1 addition & 1 deletion clusterer/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def embed(self, images: torch.Tensor):
Returns:
embeddings (np.ndarray) : numpy array of embeddings
"""
images = self.preprocess(images)
# images = self.preprocess(images)
embeddings: torch.Tensor = self.forward(images)
return embeddings.detach().numpy()

Expand Down
44 changes: 13 additions & 31 deletions clusterer/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def cluster_from_file(source_path: str, dest_path: str, epsilon: float):

def write_datapoints(source_path: str, dest_path: str):
'''
reads images from file or folder, creates datapoints and writes them into .pkl file
reads images from folder, creates datapoints and writes them into .pkl file

Args:
source_path (str): path of file or folder containing images
Expand All @@ -36,29 +36,17 @@ def write_datapoints(source_path: str, dest_path: str):
'''
points = []
pipeline = EmbeddingPipeline()

if os.path.isfile(source_path):
embeddings = pipeline(source_path)
if embeddings is not None:
for embedding in embeddings:
embedding = torch.from_numpy(embedding)
point = clusterer.Datapoint(embedding, None, source_path)
points.append(point)
else:
raise FaceNotFoundError(source_path)

else:
for root, _, filenames in os.walk(source_path):
for filename in filenames:
imgpath = os.path.join(root, filename)
embeddings = pipeline(imgpath)
if embeddings is not None:
for embedding in embeddings:
embedding = torch.from_numpy(embedding)
point = clusterer.Datapoint(embedding, None, imgpath)
points.append(point)
else:
raise FaceNotFoundError(imgpath)
for root, _, filenames in os.walk(source_path):
for filename in filenames:
imgpath = os.path.join(root, filename)
embeddings = pipeline(imgpath)
if embeddings is not None:
for embedding in embeddings:
embedding = torch.from_numpy(embedding)
point = clusterer.Datapoint(embedding, None, imgpath)
points.append(point)
else:
raise FaceNotFoundError(imgpath)

with open(dest_path, 'wb') as f:
pickle.dump(points, f, protocol=pickle.HIGHEST_PROTOCOL)
Expand Down Expand Up @@ -91,10 +79,4 @@ def get_cluster_ids(clusters: list[clusterer.Cluster], image_path: str):

else:
raise FaceNotFoundError(image_path)








Loading