Skip to content

Commit

Permalink
add pcloud_utils by alberto (PR #464)
Browse files Browse the repository at this point in the history
  • Loading branch information
han16nah committed Aug 28, 2024
1 parent a0a0d0b commit 1939f67
Showing 1 changed file with 281 additions and 0 deletions.
281 changes: 281 additions & 0 deletions python/pyhelios/util/pcloud_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
#!/usr/bin/env python
# -- coding: utf-8 --

"""
:author: Alberto M. Esmoris Pena
Utils for point cloud operations, e.g., point cloud comparison.
"""

import laspy
import pandas as pd
import numpy as np
from scipy.spatial import KDTree as KDT

# --- CONSTANTS --- #
# --------------------- #
# Expected names for the features
PCLOUD_FNAME_GPS_TIME = 'gps_time'
PCLOUD_FNAME_REFLECTANCE = 'reflectance'
PCLOUD_FNAME_NIR = 'nir'


# --- POINT CLOUD --- #
# ----------------------- #
class PointCloud:
"""
Class representing a point cloud.
:ivar X: The structure space matrix, i.e., the matrix of point-wise
coordinates.
:vartype X: :class:`np.ndarray`
:ivar fnames: The names for each feature.
:vartype fnames: list of str
:ivar F: The feature space matrix, i.e., the matrix of point-wise features.
:vartype F: :class:`np.ndarray` or None
:ivar y: The vector of classes (represented by integers), if any.
:vartype: :class:`np.ndarray` or None
"""
# --- CONSTRUCTION --- #
# ------------------------ #
def __init__(self, X, fnames=None, F=None, y=None):
self.X = X
self.F = F
self.y = y
self.fnames = fnames

@staticmethod
def from_las(las, fnames=None, include_classes=False):
"""
Build a point cloud object from the given LAS.
:param las: The LAS object as generated by laspy.
:param fnames: The names of the features that must be considered.
:param include_classes: Whether to consider the classification (True)
or not (False).
:return: Built point cloud from given LAS.
:rtype: :class:`.PointCloud`
"""
# Extract structure space
scales, offsets = las.header.scales, las.header.offsets
X = np.array([
las.X * scales[0] + offsets[0],
las.Y * scales[1] + offsets[1],
las.Z * scales[2] + offsets[2],
]).T
# Extract features
F = None
if fnames is not None:
F = np.array([las[fname] for fname in fnames]).T
# Extract classes
y = None
if include_classes:
y = np.array(las.classification)
# Build point cloud
return PointCloud(X, fnames=fnames, F=F, y=y)

@staticmethod
def from_las_file(path):
"""
Build a point cloud object from the LAS file at the given path.
:param path: Path to the LAS file.
"""
return PointCloud.from_las(laspy.read(path))

@staticmethod
def from_xyz_file(path, cols, names, sep=' '):
"""
Build a point cloud object from the XYZ/CSV file at the given path.
:param path: Path to the XYZ/CSV file.
:type path: str
:param cols: The indices of the columns to be selected.
:type cols: list of int
:param names: The name for each selected column. Note that "x", "y",
"z" must be given as the coordinates are necessary to build the
point cloud (features and classes are optional). The name
"classification" is reserved to the classes. Any name that has not
been mentioned before will be understood as a feature.
:type names: list of str
"""
# Read data
P = pd.read_csv(path, usecols=cols, names=names, header=None, sep=sep)
# Extract structure space
X = np.array([P['x'], P['y'], P['z']]).T
# Extract classes
y = P['classification'] if 'classification' in names else None
# Extract features
fnames = [
name
for name in names if name not in ['x', 'y', 'z', 'classification']
]
F = None
if len(fnames) > 0:
F = np.vstack([P[fname] for fname in fnames]).T
# Build point cloud
return PointCloud(X, fnames=fnames, F=F, y=y)

# --- ASSERT --- #
# ------------------ #
def assert_equals(self, pcloud, eps=1e-5, k=16):
"""
Assert whether two point clouds are equal.
:param pcloud: The point cloud to compare with.
:param eps: The numeric tolerance threshold.
:param k: How many nearest neighbors must be considered. A high enough
value of k implies that points with the same (x, y, z) but acquired
at different times (i.e., ti != tj) will be properly handled. If
GPSTime.
"""
# Check number of points
assert self.X.shape[0] == pcloud.X.shape[0]
# Check feature names (feature order must also be the same)
check = int(self.fnames is None) + int(pcloud.fnames is None)
assert check != 1 # One has fnames, other does not
if check == 0: # Both have fnames
assert len(self.fnames) == len(pcloud.fnames)
for i, fname in enumerate(self.fnames):
assert fname == pcloud.fnames[i]
# Get neighborhoods
N = self.find_neighborhoods(pcloud, k)
# Compare coordinates
NX = pcloud.X[N]
assert np.allclose(self.X, NX, atol=eps, rtol=0)
# Compare features
if self.F is not None:
# Check number of features
assert np.all(np.array(self.F.shape) == np.array(pcloud.F.shape))
# Check numerical differences in the features
NF = pcloud.F[N]
assert np.allclose(self.F, NF, atol=eps, rtol=0)
# Compare classes
check = int(self.y is None) + int(pcloud.y is None)
assert check != 1 # One has classes, other does not
if check == 0: # Both have classes
Ny = pcloud.y[N]
assert np.any(self.y == Ny)

# --- UTILS --- #
# ----------------- #
def find_neighborhoods(self, pcloud, k):
"""
Find the nearest neighbor for each points in the self point cloud wrt
the points in the given input point cloud.
"""
# Find GPS time, if available
gps_time_idx = None
if self.fnames is not None:
for i, fname in enumerate(self.fnames):
if fname == PCLOUD_FNAME_GPS_TIME:
gps_time_idx = i
break
# Build KDTree
kdt = KDT(pcloud.X)
# If no GPS time, take closest neighbor
if gps_time_idx is None:
return kdt.query(self.X, 1)[1].tolist()
# If GPS time, untie min distance neighbors with time
else:
t = self.F[:, gps_time_idx]
pcloud_t = pcloud.F[:, gps_time_idx]
D, N = kdt.query(self.X, k)
# NOTE that == is used, abs diff wrt eps could also be used
min_distance_mask = (D.T == np.min(D, axis=1)).T
N = [ni[min_distance_mask[i]] for i, ni in enumerate(N)]
for i, ni in enumerate(N):
jmin = np.argmin(np.abs(pcloud_t[ni]-t[i]))
N[i] = ni[jmin]
return N

def shuffle(self):
"""
Randomly shuffle the point cloud, i.e., random permutations on the
points.
"""
# Random shuffle on the structure space
indices = np.arange(0, self.X.shape[0])
np.random.shuffle(indices)
self.X = self.X[indices]
# Random shuffle any other array-like member attribute
for attr in ['F', 'y']:
if getattr(self, attr, None) is not None:
setattr(self, attr, getattr(self, attr)[indices])
return self # Return the object itself, because fluent :)


# --- M A I N --- #
# ------------------- #
# Check the logic when called as an executable script
if __name__ == '__main__':
def clipped_normal(mu, sigma, shape, xmin, xmax):
return np.clip( # Force values to be inside [xmin, xmax]
np.random.normal( # Normal distribution
mu, # Mean
sigma, # Standard deviation
shape # Output dimensionality
),
xmin, # Min value for clipping
xmax # Max value for clipping
)
# Test data with no repeated positions and no GPS time
X1 = np.unique(np.random.normal(0, 1, (1024, 3)), axis=0)
fnames1 = [PCLOUD_FNAME_REFLECTANCE, PCLOUD_FNAME_NIR]
F1 = clipped_normal(0, 1, (1024, 2), -3, 3)
y1 = np.random.randint(0, 5, 1024)
pcloud1 = PointCloud(X1, fnames=fnames1, F=F1, y=y1)

# Test data with no repeated positions and GPS time
fnames2 = fnames1 + [PCLOUD_FNAME_GPS_TIME]
F2 = np.hstack([F1, np.linspace(0, 60, 1024).reshape(-1, 1)])
pcloud2 = PointCloud(X1, fnames=fnames2, F=F2, y=y1)

# Test data with repeated positions and no GPS time
X3 = np.random.normal(0, 1, (768, 3))
X3 = np.vstack([X3, X3[::100], X3[::300]])
F3 = clipped_normal(0, 1, (X3.shape[0], 1), -3, 3)
y3 = np.random.randint(0, 2, X3.shape[0])
fnames3 = [PCLOUD_FNAME_REFLECTANCE]
# Ignore F3 in pcloud3 because repeated positions without time will fail
pcloud3 = PointCloud(X3, fnames=fnames3, F=None, y=y3)

# Test data with repeated positions and GPS time
fnames4 = fnames3 + [PCLOUD_FNAME_GPS_TIME]
F4 = np.hstack([F3, np.linspace(0, 71.3, X3.shape[0]).reshape(-1, 1)])
pcloud4 = PointCloud(X3, fnames=fnames4, F=F4, y=y3)

# Test data with no features and no classess at all
X5 = np.random.normal(0, 1, (999, 3))
pcloud5 = PointCloud(X5, fnames=None, F=None, y=None)

# Test data in pcloud1 with negligible noise
X6 = X1 + clipped_normal(0, 1e-8, X1.shape, -3e-8, 3e-8)
F6 = F1 + clipped_normal(0, 1e-8, F1.shape, -3e-8, 3e-8)
pcloud6 = PointCloud(X6, fnames=fnames1, F=F6, y=y1).shuffle()

# Test data in pcloud2 with negligible noise
F7 = np.hstack([F6, np.linspace(0, 60, 1024).reshape(-1, 1)])
pcloud7 = PointCloud(X6, fnames=fnames2, F=F7, y=y1).shuffle()

# Run asserts that must be passed
pcloud1.assert_equals(pcloud1)
print('Asserted PointCloud 1')
pcloud2.assert_equals(pcloud2)
print('Asserted PointCloud 2')
pcloud3.assert_equals(pcloud3)
print('Asserted PointCloud 3')
pcloud4.assert_equals(pcloud4)
print('Asserted PointCloud 4')
pcloud5.assert_equals(pcloud5)
print('Asserted PointCloud 5')
pcloud6.assert_equals(pcloud6)
pcloud1.assert_equals(pcloud6)
pcloud6.assert_equals(pcloud1)
print('Asserted PointCloud 6')
pcloud7.assert_equals(pcloud7)
pcloud2.assert_equals(pcloud7)
pcloud7.assert_equals(pcloud2)
print('Asserted PointCloud 7')
# All asserts passed
print('\nAll asserts passed! :)')

0 comments on commit 1939f67

Please sign in to comment.