-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdistance.py
251 lines (196 loc) · 7.52 KB
/
distance.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import numpy as np
from scipy.spatial.distance import pdist, squareform
from joblib import Parallel, delayed
from tqdm import tqdm
from matminer.featurizers.site.fingerprint import CrystalNNFingerprint
from ElM2D import ElM2D
from ElMD import ElMD
import gridrdf
from matminer.featurizers.site import CrystalNNFingerprint
from matminer.featurizers.structure import SiteStatsFingerprint
# LoStOP Distance ######################################################################
import numpy as np
from scipy.spatial.distance import pdist, squareform
from joblib import Parallel, delayed
from tqdm import tqdm
from matminer.featurizers.site.fingerprint import CrystalNNFingerprint
from ElM2D import ElM2D
from ElMD import ElMD
import gridrdf
from matminer.featurizers.site import CrystalNNFingerprint
from matminer.featurizers.structure import SiteStatsFingerprint
import warnings
# LoStOP Distance ######################################################################
def process_structure(idx, structure, preset="ops", verbose=True):
"""Process a single structure to generate LoStOP fingerprints.
Args:
idx (int): Index of the structure being processed
structure (Structure): Pymatgen structure object
preset (str): Preset configuration for CrystalNNFingerprint
verbose (bool): Whether to print processing messages
Returns:
tuple: (Fingerprint array for the structure, Feature labels)
"""
cnnf = CrystalNNFingerprint.from_preset(preset)
site_feats = []
site_fails = 0
n_sites = len(structure)
for site in range(n_sites):
try:
feat = cnnf.featurize(structure, site)
site_feats.append(feat)
except Exception as e:
if verbose:
print(
f"Failed on site {site} of structure {idx} ({structure.formula}): {str(e)}"
)
site_fails += 1
if site_fails > 0 and verbose:
print(
f"Failed to featurize {site_fails}/{n_sites} sites for structure {idx} ({structure.formula})"
)
if site_feats:
site_feats_array = np.array(site_feats)
stats = {
"mean": np.mean(site_feats_array, axis=0),
"std_dev": np.std(site_feats_array, axis=0),
"minimum": np.min(site_feats_array, axis=0),
"maximum": np.max(site_feats_array, axis=0),
}
structure_feat = np.concatenate(list(stats.values()))
labels = [
f"{stat}_{label}"
for stat in stats.keys()
for label in cnnf.feature_labels()
]
return structure_feat, labels
else:
if verbose:
print(f"No features calculated for structure {idx} ({structure.formula})")
return None, None
def calculate_fingerprints(structures, preset="ops", n_jobs=-1):
"""
Calculates structure fingerprints in parallel using joblib.
Args:
structures: List of pymatgen Structure objects
preset: Preset to use for CrystalNNFingerprint
n_jobs: Number of parallel jobs (-1 for all cores)
Returns:
tuple: (list of fingerprints, feature labels)
"""
indices = list(range(len(structures)))
iterator = tqdm(
zip(indices, structures),
total=len(structures),
desc="Generating LoStOP DM",
)
# Process structures in parallel
results = Parallel(n_jobs=n_jobs, backend="loky")(
delayed(process_structure)(idx, structure, preset=preset, verbose=False)
for idx, structure in iterator
)
# Separate fingerprints and labels
structure_feats, labels_list = zip(*results)
# Filter out None values from failed calculations
structure_feats = [feat for feat in structure_feats if feat is not None]
# Use labels from first successful calculation
labels = next((lab for lab in labels_list if lab is not None), None)
return structure_feats, labels
def get_lostop_dm(structures: list, precomputed: str = None):
"""
Calculate LoStOP distance matrix with expanded statistics.
Args:
structures: List of pymatgen Structure objects
precomputed: Path to precomputed features file (optional)
Returns:
numpy.ndarray: Distance matrix
"""
print("\nCalculating LoStOP distance matrix...")
if precomputed is None:
structure_feats, labels = calculate_fingerprints(structures)
structure_feats = np.array(structure_feats)
else:
structure_feats = np.loadtxt(precomputed, delimiter=",", skiprows=1)
labels = None
try:
distance_matrix = squareform(pdist(structure_feats, metric="euclidean"))
except ValueError as e:
print(f"Error calculating distance matrix: {e}")
distance_matrix = structure_feats
return distance_matrix
# ElMD Distance ########################################################################
def get_elmd_dm(formulas: list):
"""
Calculate ElMD distance matrix for chemical formulas.
Args:
formulas: List of chemical formula strings
Returns:
numpy.ndarray: Distance matrix
"""
print("\nCalculating ElMD distance matrix...")
mapper = ElM2D(verbose=False)
mapper.fit(formulas)
return mapper.dm
# GRID-RDF Distance ####################################################################
def calculate_grid_representations(
structures: list,
maximum_grid_distance=10,
bin_size=0.1,
broadening=0.1,
number_of_shells=100,
):
"""
Calculate grid representations for a list of structures using RDF analysis.
Args:
structures: List of structure objects to analyze
maximum_grid_distance: Maximum distance for the grid calculation (default: 10)
bin_size: Size of bins for discretization (default: 0.1)
broadening: Broadening parameter for RDF calculation (default: 0.1)
number_of_shells: Number of neighbor shells to consider (default: 100)
Returns:
list: List of grid representations for each structure
"""
# Find all neighbours and cutoffs
neighbours, cutoffs = gridrdf.extendRDF.find_all_neighbours(
structures,
num_neighbours=number_of_shells,
cutoff=None,
return_limits=True,
dryrun=False,
)
# Adjust the cutoff to accommodate all nearest neighbours
max_dist = max(maximum_grid_distance, np.round(cutoffs[1], 1))
if max_dist != maximum_grid_distance:
print(
f"Maximum distance has been updated to {max_dist} to account for {number_of_shells} neighbours"
)
# Calculate grid representations for each structure
grid_representations = []
for i, struct in enumerate(structures):
grid_rep = gridrdf.extendRDF.calculate_rdf(
struct,
neighbours[i],
rdf_type="grid",
max_dist=max_dist,
bin_width=bin_size,
smearing=broadening,
normed=True,
broadening_method="convolve",
return_sparse=False,
)
grid_representations.append(grid_rep)
return grid_representations
def get_gridrdf_dm(structures: list):
"""
Calculate the grid-RDF distance matrix for a list of structures.
Args:
structures: List of structure objects
Returns:
numpy.ndarray: Distance matrix
"""
print("\nCalculating GRID-RDF distance matrix...")
grid_representations = calculate_grid_representations(structures)
distance_matrix = gridrdf.earth_mover_distance.super_fast_EMD_matrix(
grid_representations, bin_width=0.1
)
return distance_matrix