Skip to content

Commit

Permalink
CMEMS obs script updates
Browse files (browse the repository at this point in the history)
  • Loading branch information
JustinElms committed Oct 30, 2024
1 parent 7146a20 commit aa5cacb
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 103 deletions.
3 changes: 2 additions & 1 deletion scripts/data_importers/cmems_drifter.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ def main(uri: str, filename: str):
print(fname)
with xr.open_dataset(fname) as ds:

ds = reformat_coordinates(ds)
if ds.LATITUDE.size > 1:
ds = reformat_coordinates(ds)

df = ds.to_dataframe().reset_index().dropna(axis=1, how="all").dropna()

Expand Down
197 changes: 95 additions & 102 deletions scripts/data_importers/cmems_glider.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def reformat_coordinates(ds: xr.Dataset) -> xr.Dataset:

return ds


def main(uri: str, filename: str):

"""Import Glider NetCDF
:param str uri: Database URI
Expand All @@ -73,121 +73,114 @@ def main(uri: str, filename: str):
for fname in filenames:
print(fname)
with xr.open_dataset(fname).drop_duplicates("TIME") as ds:
time_diff = np.diff(ds.TIME.data).astype("timedelta64[D]").astype(int)
breaks = np.argwhere(time_diff > 5).flatten()
deployment_times = np.split(ds.TIME, breaks + 1)

ds = reformat_coordinates(ds)
variables = [v for v in VARIABLES if v in ds.variables]

for deployment in deployment_times:
subset = ds.sel(TIME=deployment)
subset = reformat_coordinates(subset)

dep_date = np.datetime_as_string(deployment, unit="D")[0]
df = (
subset[["TIME", "LATITUDE", "LONGITUDE", *variables]]
.to_dataframe()
.reset_index()
.dropna(axis=1, how="all")
.dropna()
)

# remove missing variables from variables list
variables = [v for v in VARIABLES if v in df.columns]

for variable in variables:
if variable not in datatype_map:
statement = select(DataType).where(
DataType.key == subset[variable].standard_name
df = (
ds[["TIME", "LATITUDE", "LONGITUDE", *variables]]
.to_dataframe()
.reset_index()
.dropna(axis=1, how='all')
.dropna()
)

# remove missing variables from variables list
variables = [v for v in VARIABLES if v in df.columns]

for variable in variables:
if variable not in datatype_map:
statement = select(DataType).where(
DataType.key == ds[variable].standard_name
)
dt = session.execute(statement).all()
if not dt:
dt = DataType(
key=ds[variable].standard_name,
name=ds[variable].long_name,
unit=ds[variable].units,
)
dt = session.execute(statement).all()
if not dt:
dt = DataType(
key=subset[variable].standard_name,
name=subset[variable].long_name,
unit=subset[variable].units,
)
session.add(dt)
else:
dt = dt[0][0]

datatype_map[variable] = dt

session.add(dt)
else:
dt = dt[0][0]

datatype_map[variable] = dt

session.commit()

p = Platform(
type=Platform.Type.glider, unique_id=f"{ds.attrs["platform_code"]}"
)
attrs = {
"Glider Platform": ds.attrs["platform_code"],
"WMO": ds.attrs["wmo_platform_code"],
"Institution": ds.attrs["institution"],
}
p.attrs = attrs

try:
session.add(p)
session.commit()
except IntegrityError:
print("Error committing platform.")
session.rollback()
stmt = select(Platform.id).where(Platform.unique_id == ds.attrs["platform_code"])
p.id = session.execute(stmt).first()[0]

n_chunks = np.ceil(len(df)/1e4)

if n_chunks < 1:
continue

for chunk in np.array_split(df, n_chunks):
chunk["STATION_ID"] = 0

stations = [
Station(
platform_id=p.id,
time=row.TIME,
latitude=row.LATITUDE,
longitude=row.LONGITUDE,
)
for idx, row in chunk[["TIME","LATITUDE", "LONGITUDE"]].drop_duplicates().iterrows()
]

platform_id = subset.attrs["platform_code"]
p = Platform(
type=Platform.Type.glider, unique_id=f"{platform_id}-{dep_date}"
)
attrs = {
"Glider Platform": platform_id,
"WMO": subset.attrs["wmo_platform_code"],
"Institution": subset.attrs["institution"],
}
p.attrs = attrs

# Using return_defaults=True here so that the stations will get
# updated with id's. It's slower, but it means that we can just
# put all the station ids into a pandas series to use when
# constructing the samples.
try:
session.add(p)
session.bulk_save_objects(stations, return_defaults=True)
session.commit()
except IntegrityError:
print("Error committing platform.")
print("Error committing station.")
session.rollback()
stmt = select(Platform.id).where(
Platform.unique_id == f"{platform_id}-{dep_date}"
)
p.id = session.execute(stmt).first()[0]

n_chunks = np.ceil(len(df) / 1e4)
stmt = select(Station).where(Station.platform_id==p.id)
station_data = session.execute(stmt).all()

if n_chunks < 1:
continue
for station in station_data:
chunk.loc[chunk["TIME"]==station[0].time,"STATION_ID"] = station[0].id

for chunk in np.array_split(df, n_chunks):
stations = [
Station(
platform_id=p.id,
time=row.TIME,
latitude=row.LATITUDE,
longitude=row.LONGITUDE,
samples = [
[
Sample(
station_id=row.STATION_ID,
depth=row.DEPTH,
value=row[variable],
datatype_key=datatype_map[variable].key,
)
for idx, row in chunk.iterrows()
for variable in variables
]
for idx, row in chunk.iterrows()
]
try:
session.bulk_save_objects(
[item for sublist in samples for item in sublist]
)
except IntegrityError:
print("Error committing samples.")
session.rollback()

# Using return_defaults=True here so that the stations will get
# updated with id's. It's slower, but it means that we can just
# put all the station ids into a pandas series to use when
# constructing the samples.
try:
session.bulk_save_objects(stations, return_defaults=True)
except IntegrityError:
print("Error committing station.")
session.rollback()
stmt = select(Station).where(Station.platform_id == p.id)
chunk["STATION_ID"] = session.execute(stmt).all()

chunk["STATION_ID"] = [s.id for s in stations]

samples = [
[
Sample(
station_id=row.STATION_ID,
depth=row.DEPTH,
value=row[variable],
datatype_key=datatype_map[variable].key,
)
for variable in variables
]
for idx, row in chunk.iterrows()
]
try:
session.bulk_save_objects(
[item for sublist in samples for item in sublist]
)
except IntegrityError:
print("Error committing samples.")
session.rollback()

session.commit()
session.commit()


if __name__ == "__main__":
Expand Down

0 comments on commit aa5cacb

Please sign in to comment.