From 259cbabd881d479b887c6fd6ff1487bfc8b02f09 Mon Sep 17 00:00:00 2001 From: jonnymaserati Date: Sun, 28 Jan 2024 10:23:39 +0100 Subject: [PATCH] Minor enhancements and a couple of handy . --- tests/test_clearance_iscwsa.py | 130 ++++++------- tests/test_connector.py | 240 +++++++++++------------ tests/test_fluid.py | 54 +++--- tests/test_iscwsa_mwd_error.py | 124 ++++++------ tests/test_minimal.py | 29 ++- tests/test_survey_interpolate.py | 77 +++----- tests/test_survey_parameters.py | 175 +++++++---------- tests/test_utils.py | 317 ++++++++++++++++++------------- welleng/connector.py | 39 +++- welleng/survey.py | 26 ++- welleng/utils.py | 29 +++ welleng/version.py | 2 +- welleng/visual.py | 5 +- 13 files changed, 616 insertions(+), 631 deletions(-) diff --git a/tests/test_clearance_iscwsa.py b/tests/test_clearance_iscwsa.py index 94b7035..a67a3a1 100644 --- a/tests/test_clearance_iscwsa.py +++ b/tests/test_clearance_iscwsa.py @@ -1,4 +1,3 @@ -import unittest from welleng.survey import Survey, make_survey_header from welleng.clearance import IscwsaClearance import numpy as np @@ -7,7 +6,7 @@ """ Test that the ISCWSA clearance model is working within a defined tolerance, testing against the ISCWSA standard set of wellpaths for evaluating clearance -scenarios using the MWD Rev4 error model. +scenarios using the MWD Rev4 error model. """ # Read well and validation data @@ -57,74 +56,67 @@ def generate_surveys(self, data=data): return surveys -class TestClearanceIscwsa(unittest.TestCase): - def test_minimize_sf(self, data=data): - surveys = generate_surveys(data) - reference = surveys["Reference well"] - offset = surveys["09 - well"] - - result = IscwsaClearance(reference, offset, minimize_sf=False) - result_min = IscwsaClearance(reference, offset, minimize_sf=True) - - idx = np.where(result_min.ref.interpolated == False) - - # Check that interpolated survey is not corrupted - for attr in [ - 'azi_grid_rad', 'azi_mag_rad', 'azi_true_rad', 'cov_hla', 'cov_nev', - 'pos_nev', 'pos_xyz', 'md', 'radius' - ]: - assert np.allclose( - getattr(result.ref, attr), getattr(result_min.ref, attr)[idx] - ) - - pass - - for attr in [ - 'Rr', 'calc_hole', 'distance_cc', 'eou_boundary', - 'eou_separation', 'hoz_bearing', 'idx', 'masd', 'off_cov_hla', - 'off_cov_nev', 'off_delta_hlas', 'off_delta_nevs', 'off_pcr', - 'ref_cov_hla', 'ref_cov_nev', 'ref_delta_hlas', 'ref_delta_nevs', - 'ref_nevs', 'ref_pcr', 'sf', 'wellbore_separation' - ]: - # `toolface_bearing` and `trav_cyl_azi_deg` are a bit unstable when - # well paths are parallel. - - assert np.allclose( - getattr(result, attr), getattr(result_min, attr)[idx], - rtol=1e-01, atol=1e-02 - ) - - pass - - def test_clearance_iscwsa(self, data=data, rtol=1e-02, atol=1e-03): - surveys = generate_surveys(data) - reference = surveys["Reference well"] - - # Perform clearance checks for each survey - for well in surveys: - if well != "09 - well": - continue - if well == "Reference well": - continue - else: - offset = surveys[well] - # skip well 10 - if well in ["10 - well"]: - continue - else: - for b in [False, True]: - result = IscwsaClearance(reference, offset, minimize_sf=b) - assert np.allclose( - result.sf[np.where(result.ref.interpolated == False)], - np.array(data["wells"][well]["SF"]), - rtol=rtol, atol=atol - ) +def test_minimize_sf(data=data): + surveys = generate_surveys(data) + reference = surveys["Reference well"] + offset = surveys["09 - well"] + + result = IscwsaClearance(reference, offset, minimize_sf=False) + result_min = IscwsaClearance(reference, offset, minimize_sf=True) + + idx = np.where(result_min.ref.interpolated == False) # noqa E712 + + # Check that interpolated survey is not corrupted + for attr in [ + 'azi_grid_rad', 'azi_mag_rad', 'azi_true_rad', 'cov_hla', 'cov_nev', + 'pos_nev', 'pos_xyz', 'md', 'radius' + ]: + assert np.allclose( + getattr(result.ref, attr), getattr(result_min.ref, attr)[idx] + ) + + pass + + for attr in [ + 'Rr', 'calc_hole', 'distance_cc', 'eou_boundary', + 'eou_separation', 'hoz_bearing', 'idx', 'masd', 'off_cov_hla', + 'off_cov_nev', 'off_delta_hlas', 'off_delta_nevs', 'off_pcr', + 'ref_cov_hla', 'ref_cov_nev', 'ref_delta_hlas', 'ref_delta_nevs', + 'ref_nevs', 'ref_pcr', 'sf', 'wellbore_separation' + ]: + # `toolface_bearing` and `trav_cyl_azi_deg` are a bit unstable when + # well paths are parallel. + + assert np.allclose( + getattr(result, attr), getattr(result_min, attr)[idx], + rtol=1e-01, atol=1e-02 + ) pass -# make above test runnanble separately -if __name__ == '__main__': - unittest.main() - # test_minimize_sf(data=data) - # test_clearance_iscwsa(data=data) +def test_clearance_iscwsa(data=data, rtol=1e-02, atol=1e-03): + surveys = generate_surveys(data) + reference = surveys["Reference well"] + + # Perform clearance checks for each survey + for well in surveys: + if well != "09 - well": + continue + if well == "Reference well": + continue + else: + offset = surveys[well] + # skip well 10 + if well in ["10 - well"]: + continue + else: + for b in [False, True]: + result = IscwsaClearance(reference, offset, minimize_sf=b) + assert np.allclose( + result.sf[np.where(result.ref.interpolated == False)], # noqa E712 + np.array(data["wells"][well]["SF"]), + rtol=rtol, atol=atol + ) + + pass diff --git a/tests/test_connector.py b/tests/test_connector.py index b451689..cbadb4b 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -1,139 +1,113 @@ -import inspect -import sys -import unittest - import numpy as np from welleng.connector import Connector from welleng.survey import Survey, from_connections -class ConnectorTest(unittest.TestCase): - def test_md_hold(self): - # test hold with only md provided - c = Connector( - vec1=[0, 0, 1], - md2=500, - ) - assert ( - c.inc_target == c.inc1 - and c.azi_target == c.azi1 - and c.pos_target[2] == c.md_target - ), "Failed c1" - assert c.method == 'hold', "Unexpected method" - - assert isinstance(from_connections(c), Survey) - - def test_md_and_vec(self): - # test with md2 and vec2 provided (minimum curvature) - c = Connector( - vec1=[0, 0, 1], - md2=1000, - vec2=[0, 1, 0] - ) - assert c.method == 'min_curve' - - def test_pos(self): - # test with pos2 provided (minimum distance) - c = Connector( - vec1=[0, 0, 1], - pos2=[100, 100, 1000], - ) - assert c.md_target > c.pos1[2], "Failed c3" - - def test_pos_and_dls(self): - # test with pos2 needing more aggressive dls (minimum curvature) - c = Connector( - vec1=[0, 0, 1], - pos2=[200, 400, 200] - ) - assert c.method == 'min_curve_to_target' - - def test_pos_and_vec(self): - # test with pos2 and vec2 provided - vec1 = [-1, -1, 1] - vec2 = [1, -1, 0] - c = Connector( - pos1=[0., 0., 0], - vec1=vec1 / np.linalg.norm(vec1), - pos2=[0., 1000., 500.], - vec2=vec2 / np.linalg.norm(vec2), - ) - assert c.method == 'curve_hold_curve' - - # test if interpolator and survey functions are working - assert isinstance(from_connections(c, step=30), Survey) - - def test_pos_inc_azi(self): - # test with pos2, inc1 and azi1 provided - c = Connector( - pos1=[0., 0., 0], - inc1=0., - azi1=90, - pos2=[1000., 1000., 1000.], - vec2=[0., 0., 1.], - ) - assert c.method == 'curve_hold_curve' - - def test_dls2(self): - # test with different dls for second curve section - c = Connector( - pos1=[0., 0., 0], - vec1=[0., 0., 1.], - pos2=[0., 100., 1000.], - vec2=[0., 0., 1.], - dls_design2=5 - ) - assert c.radius_design2 < c.radius_design - - def test_radius_critical(self): - # test with dls_critical requirement (actual dls < dls_design) - c = Connector( - pos1=[0., 0., 0], - vec1=[0., 0., 1.], - pos2=[0., 100., 100.], - vec2=[0., 0., 1.], - ) - assert c.radius_critical < c.radius_design - - def test_min_curve(self): - # test min_curve (inc2 provided) - c = Connector( - pos1=[0., 0., 0], - vec1=[0., 0., 1.], - inc2=30, - ) - assert c.method == 'min_curve' - - def test_radius_critical_with_min_curve(self): - # test min_curve with md less than required radius - c = Connector( - pos1=[0., 0., 0], - inc1=0, - azi1=0, - md2=500, - inc2=90, - azi2=0, - ) - assert c.radius_critical < c.radius_design -# def one_function_to_run_them_all(): -# """ -# Function to gather the test functions so that they can be tested by -# running this module. - -# https://stackoverflow.com/questions/18907712/python-get-list-of-all- -# functions-in-current-module-inspecting-current-module -# """ -# test_functions = [ -# obj for name, obj in inspect.getmembers(sys.modules[__name__]) -# if (inspect.isfunction(obj) -# and name.startswith('test') -# and name != 'all') -# ] - -# [f() for f in test_functions] - - -if __name__ == '__main__': - unittest.main() - # one_function_to_run_them_all() +def test_md_hold(): + # test hold with only md provided + c = Connector( + vec1=[0, 0, 1], + md2=500, + ) + assert ( + c.inc_target == c.inc1 + and c.azi_target == c.azi1 + and c.pos_target[2] == c.md_target + ), "Failed c1" + assert c.method == 'hold', "Unexpected method" + + assert isinstance(from_connections(c), Survey) + +def test_md_and_vec(): + # test with md2 and vec2 provided (minimum curvature) + c = Connector( + vec1=[0, 0, 1], + md2=1000, + vec2=[0, 1, 0] + ) + assert c.method == 'min_curve' + +def test_pos(): + # test with pos2 provided (minimum distance) + c = Connector( + vec1=[0, 0, 1], + pos2=[100, 100, 1000], + ) + assert c.md_target > c.pos1[2], "Failed c3" + +def test_pos_and_dls(): + # test with pos2 needing more aggressive dls (minimum curvature) + c = Connector( + vec1=[0, 0, 1], + pos2=[200, 400, 200] + ) + assert c.method == 'min_curve_to_target' + +def test_pos_and_vec(): + # test with pos2 and vec2 provided + vec1 = [-1, -1, 1] + vec2 = [1, -1, 0] + c = Connector( + pos1=[0., 0., 0], + vec1=vec1 / np.linalg.norm(vec1), + pos2=[0., 1000., 500.], + vec2=vec2 / np.linalg.norm(vec2), + ) + assert c.method == 'curve_hold_curve' + + # test if interpolator and survey functions are working + assert isinstance(from_connections(c, step=30), Survey) + +def test_pos_inc_azi(): + # test with pos2, inc1 and azi1 provided + c = Connector( + pos1=[0., 0., 0], + inc1=0., + azi1=90, + pos2=[1000., 1000., 1000.], + vec2=[0., 0., 1.], + ) + assert c.method == 'curve_hold_curve' + +def test_dls2(): + # test with different dls for second curve section + c = Connector( + pos1=[0., 0., 0], + vec1=[0., 0., 1.], + pos2=[0., 100., 1000.], + vec2=[0., 0., 1.], + dls_design2=5 + ) + assert c.radius_design2 < c.radius_design + +def test_radius_critical(): + # test with dls_critical requirement (actual dls < dls_design) + c = Connector( + pos1=[0., 0., 0], + vec1=[0., 0., 1.], + pos2=[0., 100., 100.], + vec2=[0., 0., 1.], + ) + assert c.radius_critical < c.radius_design + +def test_min_curve(): + # test min_curve (inc2 provided) + c = Connector( + pos1=[0., 0., 0], + vec1=[0., 0., 1.], + inc2=30, + ) + assert c.method == 'min_curve' + +def test_radius_critical_with_min_curve(): + # test min_curve with md less than required radius + c = Connector( + pos1=[0., 0., 0], + inc1=0, + azi1=0, + md2=500, + inc2=90, + azi2=0, + ) + assert c.radius_critical < c.radius_design diff --git a/tests/test_fluid.py b/tests/test_fluid.py index 03e581e..dd3ec9a 100644 --- a/tests/test_fluid.py +++ b/tests/test_fluid.py @@ -1,34 +1,26 @@ -import unittest - from welleng.fluid import Fluid -class FluidTest(unittest.TestCase): - def test_fluid_density_profile(self): - """ - Test that the get_density_profile method in the Fluid class returns the - same value as the example in the SPE 11118 paper. - """ - fluid = Fluid( - fluid_density=10., # ppg - reference_temp=120., # Fahrenheit, - weighting_material='SPE_11118', - base_fluid_water_ratio=0.103, - ) - - # Override properties to align with ones provided in the example. - fluid.volume_water_reference_relative = 0.09 - fluid.volume_oil_reference_relative = 0.78 - fluid.volume_weighting_material_relative = 0.11 - - density_profile = fluid.get_density_profile( - depth=10_000., - temperature=250. - ) - - assert round(density_profile[-1], 2) == 9.85 - - -if __name__ == '__main__': - unittest.main() - # test_fluid_density_profile() +def test_fluid_density_profile(): + """ + Test that the get_density_profile method in the Fluid class returns the + same value as the example in the SPE 11118 paper. + """ + fluid = Fluid( + fluid_density=10., # ppg + reference_temp=120., # Fahrenheit, + weighting_material='SPE_11118', + base_fluid_water_ratio=0.103, + ) + + # Override properties to align with ones provided in the example. + fluid.volume_water_reference_relative = 0.09 + fluid.volume_oil_reference_relative = 0.78 + fluid.volume_weighting_material_relative = 0.11 + + density_profile = fluid.get_density_profile( + depth=10_000., + temperature=250. + ) + + assert round(density_profile[-1], 2) == 9.85 diff --git a/tests/test_iscwsa_mwd_error.py b/tests/test_iscwsa_mwd_error.py index 04b5082..47465d8 100644 --- a/tests/test_iscwsa_mwd_error.py +++ b/tests/test_iscwsa_mwd_error.py @@ -58,72 +58,64 @@ def get_err(error_model, wd): return err -# initiate lists -class IscwsaMwdErroroTest(unittest.TestCase): - def test_iscwsa_error_models(self, input_files=input_files): - for error_model, filename in input_files.items(): - df, err = initiate(error_model, filename) - - nn_c, ee_c, vv_c, ne_c, nv_c, ev_c = [], [], [], [], [], [] - data = [ - nn_c, ee_c, vv_c, ne_c, nv_c, ev_c - ] - - # generate error data - for index, row in df.iterrows(): - i = get_md_index(err, row['md']) - s = row['source'] - if s in ["Totals", "TOTAL"]: - source_cov = err.errors.cov_NEVs.T[i] - else: - source_cov = err.errors.errors[s].cov_NEV.T[i] - v = get_sigmas(source_cov, long=True) - for j, d in enumerate(v): - data[j].append(d[0]) - - # convert to dictionary - ed = {} - headers = [ - 'nn_c', 'ee_c', 'vv_c', 'ne_c', 'nv_c', 'ev_c' - ] - for i, h in enumerate(headers): - ed[h] = data[i] - - df_c = pd.DataFrame(ed) - - df_r = df.join(df_c) - - headers = [ - 'nn_d', 'ee_d', 'vv_d', 'ne_d', 'nv_d', 'ev_d' - ] - df_d = pd.DataFrame( - np.around( - np.array(df_c) - np.array(df.iloc[:, 2:]), - decimals=4 - ), - columns=headers +def test_iscwsa_error_models(input_files=input_files): + for error_model, filename in input_files.items(): + df, err = initiate(error_model, filename) + + nn_c, ee_c, vv_c, ne_c, nv_c, ev_c = [], [], [], [], [], [] + data = [ + nn_c, ee_c, vv_c, ne_c, nv_c, ev_c + ] + + # generate error data + for index, row in df.iterrows(): + i = get_md_index(err, row['md']) + s = row['source'] + if s in ["Totals", "TOTAL"]: + source_cov = err.errors.cov_NEVs.T[i] + else: + source_cov = err.errors.errors[s].cov_NEV.T[i] + v = get_sigmas(source_cov, long=True) + for j, d in enumerate(v): + data[j].append(d[0]) + + # convert to dictionary + ed = {} + headers = [ + 'nn_c', 'ee_c', 'vv_c', 'ne_c', 'nv_c', 'ev_c' + ] + for i, h in enumerate(headers): + ed[h] = data[i] + + df_c = pd.DataFrame(ed) + + df_r = df.join(df_c) + + headers = [ + 'nn_d', 'ee_d', 'vv_d', 'ne_d', 'nv_d', 'ev_d' + ] + df_d = pd.DataFrame( + np.around( + np.array(df_c) - np.array(df.iloc[:, 2:]), + decimals=4 + ), + columns=headers + ) + + df_r = df_r.join(df_d) + + with np.errstate(divide='ignore', invalid='ignore'): + error = np.nan_to_num(np.absolute( + np.array(df_d) / np.array(df.iloc[:, 2:]) + ) * 100 ) - df_r = df_r.join(df_d) + assert np.all(error < TOLERANCE), ( + f"failing error {d}" + ) - with np.errstate(divide='ignore', invalid='ignore'): - error = np.nan_to_num(np.absolute( - np.array(df_d) / np.array(df.iloc[:, 2:]) - ) * 100 - ) - - assert np.all(error < TOLERANCE), ( - f"failing error {d}" - ) - - # if you wanted to view the results, this would save then to an Excel - # file. - df_r.to_excel( - "tests/test_data/error_mwdrev5_iscwsa_validation_results.xlsx" - ) - - -# make above test runnanble separately -if __name__ == '__main__': - unittest.main() - # test_iscwsa_error_models(input_files) + # if you wanted to view the results, this would save then to an Excel + # file. + # df_r.to_excel( + # "tests/test_data/error_mwdrev5_iscwsa_validation_results.xlsx" + # ) diff --git a/tests/test_minimal.py b/tests/test_minimal.py index d61a655..f64d92f 100644 --- a/tests/test_minimal.py +++ b/tests/test_minimal.py @@ -4,24 +4,17 @@ -------------- Test things that should work with a *minimal* welleng install. """ -import unittest import welleng as we -class MinimalTest(unittest.TestCase): - - def test_survey(self): - survey = we.survey.interpolate_survey( - survey=we.survey.Survey( - md=[0, 500, 1000, 2000, 3000], - inc=[0, 0, 30, 90, 90], - azi=[90, 90, 90, 135, 180], - error_model='ISCWSA MWD Rev5' - ), - step=30. - ) - return survey - - -if __name__ == '__main__': - unittest.main() +def test_survey(): + survey = we.survey.interpolate_survey( + survey=we.survey.Survey( + md=[0, 500, 1000, 2000, 3000], + inc=[0, 0, 30, 90, 90], + azi=[90, 90, 90, 135, 180], + error_model='ISCWSA MWD Rev5' + ), + step=30. + ) + return survey diff --git a/tests/test_survey_interpolate.py b/tests/test_survey_interpolate.py index a9ceec0..693680e 100644 --- a/tests/test_survey_interpolate.py +++ b/tests/test_survey_interpolate.py @@ -1,8 +1,6 @@ -import unittest - import welleng as we -survey = we.survey.Survey( +SURVEY = we.survey.Survey( md=[0, 500, 1000, 2000, 2500, 3500], inc=[0, 0, 30, 90, 100, 80], azi=[45, 45, 45, 90, 90, 180], @@ -10,58 +8,31 @@ ) -class SurveyInterpolateTest(unittest.TestCase): - def test_survey_interpolate_survey(self, step=30): - global survey - survey_interp = we.survey.interpolate_survey(survey, step=step) - assert isinstance(survey_interp, we.survey.Survey) - - survey_interp = survey.interpolate_survey(step=step) - assert isinstance(survey_interp, we.survey.Survey) - - def test_survey_interpolate_survey_tvd(self, step=10): - global survey - survey_interp = survey.interpolate_survey(step=30) - survey_interp_tvd = we.survey.interpolate_survey_tvd( - survey_interp, step=step - ) - assert isinstance(survey_interp_tvd, we.survey.Survey) - - survey_interp_tvd = survey_interp.interpolate_survey_tvd(step=step) - assert isinstance(survey_interp_tvd, we.survey.Survey) - - def test_interpolate_md(self, md=800): - global survey - node = survey.interpolate_md(md=md) - assert isinstance(node, we.node.Node) - - def test_interpolate_tvd(self, tvd=800): - global survey - node = survey.interpolate_tvd(tvd=tvd) - assert isinstance(node, we.node.Node) - - -# def one_function_to_run_them_all(): -# """ -# Function to gather the test functions so that they can be tested by -# running this module. +def test_survey_interpolate_survey(step=30): + global SURVEY + survey_interp = we.survey.interpolate_survey(SURVEY, step=step) + assert isinstance(survey_interp, we.survey.Survey) -# https://stackoverflow.com/questions/18907712/python-get-list-of-all- -# functions-in-current-module-inspecting-current-module -# """ -# test_functions = [ -# obj for name, obj in inspect.getmembers(sys.modules[__name__]) -# if (inspect.isfunction(obj) -# and name.startswith('test') -# and name != 'all') -# ] + survey_interp = SURVEY.interpolate_survey(step=step) + assert isinstance(survey_interp, we.survey.Survey) -# for f in test_functions: -# f() +def test_survey_interpolate_survey_tvd(step=10): + global SURVEY + survey_interp = SURVEY.interpolate_survey(step=30) + survey_interp_tvd = we.survey.interpolate_survey_tvd( + survey_interp, step=step + ) + assert isinstance(survey_interp_tvd, we.survey.Survey) -# pass + survey_interp_tvd = survey_interp.interpolate_survey_tvd(step=step) + assert isinstance(survey_interp_tvd, we.survey.Survey) +def test_interpolate_md(md=800): + global SURVEY + node = SURVEY.interpolate_md(md=md) + assert isinstance(node, we.node.Node) -if __name__ == '__main__': - unittest.main() - # one_function_to_run_them_all() +def test_interpolate_tvd(tvd=800): + global SURVEY + node = SURVEY.interpolate_tvd(tvd=tvd) + assert isinstance(node, we.node.Node) diff --git a/tests/test_survey_parameters.py b/tests/test_survey_parameters.py index 87b6c0b..c956ded 100644 --- a/tests/test_survey_parameters.py +++ b/tests/test_survey_parameters.py @@ -1,9 +1,7 @@ -import unittest - import welleng as we import numpy as np -reference = { +REFERENCE = { 'x': 588319.02, 'y': 5770571.03, 'northing': 5770571.03, 'easting': 588319.02, 'latitude': 52.077583926214494, 'longitude': 4.288694821453205, 'convergence': 1.0166440347220762, @@ -13,103 +11,74 @@ 'wgs84-utm31': [588225.162, 5770360.512] } -calculator = we.survey.SurveyParameters(reference.get('srs')) - - -class SurveyParamsTest(unittest.TestCase): - def test_known_location(self): - survey_parameters = calculator.get_factors_from_x_y( - x=reference.get('x'), y=reference.get('y'), - date=reference.get('date') - ) - for k, v in survey_parameters.items(): - try: - assert round(v, 3) == round(reference.get(k), 3) - except TypeError: - assert v == reference.get(k) - - pass - - def test_transform_projection_coordinates(self): - # Convert survey coordinates from UTM31_ED50 to UTM31_WGS84 - coords = np.array((reference.get('easting'), reference.get('northing'))) - result = calculator.transform_coordinates(coords, 'EPSG:32631') - assert np.allclose( - result, - np.array(reference.get('wgs84-utm31')) - ) - - # Try as a list - result = calculator.transform_coordinates( - coords.tolist(), 'EPSG:32631' - ) - assert np.allclose( - result, - np.array(reference.get('wgs84-utm31')) - ) - - # Try as a tuple - result = calculator.transform_coordinates( - tuple(coords.tolist()), 'EPSG:32631' - ) - assert np.allclose( - result, - np.array(reference.get('wgs84-utm31')) - ) - - result = calculator.transform_coordinates( - np.array([coords, coords]), - 'EPSG:32631' - ) - assert np.allclose( - result, - np.full_like(result, reference.get('wgs84-utm31')) - ) - - # Try as a list - result = calculator.transform_coordinates( - [coords.tolist(), coords.tolist()], - 'EPSG:32631' - ) - assert np.allclose( - result, - np.full_like(result, reference.get('wgs84-utm31')) - ) - - # Try as a tuple - result = calculator.transform_coordinates( - (tuple(coords.tolist()), tuple(coords.tolist())), - 'EPSG:32631' - ) - assert np.allclose( - result, - np.full_like(result, reference.get('wgs84-utm31')) - ) - - pass - - -# def one_function_to_run_them_all(): -# """ -# Function to gather the test functions so that they can be tested by -# running this module. - -# https://stackoverflow.com/questions/18907712/python-get-list-of-all- -# functions-in-current-module-inspecting-current-module -# """ -# test_functions = [ -# obj for name, obj in inspect.getmembers(sys.modules[__name__]) -# if (inspect.isfunction(obj) -# and name.startswith('test') -# and name != 'all') -# ] - -# for f in test_functions: -# f() - -# pass - - -if __name__ == '__main__': - unittest.main() - # one_function_to_run_them_all() +CALCULATOR = we.survey.SurveyParameters(REFERENCE.get('srs')) + + +def test_known_location(): + survey_parameters = CALCULATOR.get_factors_from_x_y( + x=REFERENCE.get('x'), y=REFERENCE.get('y'), + date=REFERENCE.get('date') + ) + for k, v in survey_parameters.items(): + try: + assert round(v, 3) == round(REFERENCE.get(k), 3) + except TypeError: + assert v == REFERENCE.get(k) + + pass + +def test_transform_projection_coordinates(): + # Convert survey coordinates from UTM31_ED50 to UTM31_WGS84 + coords = np.array((REFERENCE.get('easting'), REFERENCE.get('northing'))) + result = CALCULATOR.transform_coordinates(coords, 'EPSG:32631') + assert np.allclose( + result, + np.array(REFERENCE.get('wgs84-utm31')) + ) + + # Try as a list + result = CALCULATOR.transform_coordinates( + coords.tolist(), 'EPSG:32631' + ) + assert np.allclose( + result, + np.array(REFERENCE.get('wgs84-utm31')) + ) + + # Try as a tuple + result = CALCULATOR.transform_coordinates( + tuple(coords.tolist()), 'EPSG:32631' + ) + assert np.allclose( + result, + np.array(REFERENCE.get('wgs84-utm31')) + ) + + result = CALCULATOR.transform_coordinates( + np.array([coords, coords]), + 'EPSG:32631' + ) + assert np.allclose( + result, + np.full_like(result, REFERENCE.get('wgs84-utm31')) + ) + + # Try as a list + result = CALCULATOR.transform_coordinates( + [coords.tolist(), coords.tolist()], + 'EPSG:32631' + ) + assert np.allclose( + result, + np.full_like(result, REFERENCE.get('wgs84-utm31')) + ) + + # Try as a tuple + result = CALCULATOR.transform_coordinates( + (tuple(coords.tolist()), tuple(coords.tolist())), + 'EPSG:32631' + ) + assert np.allclose( + result, + np.full_like(result, REFERENCE.get('wgs84-utm31')) + ) diff --git a/tests/test_utils.py b/tests/test_utils.py index 976de0e..3c726fd 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,19 +1,22 @@ -import unittest - import numpy as np +import pytest from numpy.typing import NDArray - from welleng.units import ureg from welleng.utils import ( - annular_volume, decimal2dms, dms2decimal, pprint_dms, dms_from_string + annular_volume, + decimal2dms, + dms2decimal, + pprint_dms, + dms_from_string, + radius_from_dls, + get_toolface, + get_arc, ) -LAT, LON = (52, 4, 43.1868, 'N'), (4, 17, 19.6368, 'E') +LAT, LON = (52, 4, 43.1868, "N"), (4, 17, 19.6368, "E") def _generate_random_dms(n: int, ndigits: int = None) -> NDArray: - """Generates a bunch of lat, lon coordinates. - """ assert n % 2 == 0, "n must be an even int" deg = np.random.randint(0, 180, n) min = np.random.randint(0, 60, n) @@ -22,170 +25,210 @@ def _generate_random_dms(n: int, ndigits: int = None) -> NDArray: if ndigits is not None: sec = np.around(sec, ndigits) - data = { - 0: 'N', 1: 'S', 2: 'E', 3: 'W' - } + data = {0: "N", 1: "S", 2: "E", 3: "W"} - direction = np.array([ - data.get(d) - for d in np.ravel(np.stack(( - np.random.randint(0, 2, int(n/2)), - np.random.randint(2, 4, int(n/2)) - ), axis=1 - )) - ]) + direction = np.array( + [ + data.get(d) + for d in np.ravel( + np.stack( + ( + np.random.randint(0, 2, int(n / 2)), + np.random.randint(2, 4, int(n / 2)), + ), + axis=1, + ) + ) + ] + ) - return np.stack( - (deg, min, sec, direction), - axis=-1, - dtype=object - ).reshape((-1, 2, 4)) + return np.stack((deg, min, sec, direction), axis=-1, dtype=object).reshape( + (-1, 2, 4) + ) def are_tuples_identical(tuple1, tuple2): return all(x == y for x, y in zip(tuple1, tuple2)) -class UtilsTest(unittest.TestCase): - def test_annular_volume(self): - av = annular_volume( - od=ureg('12.25 inch').to('meter'), - id=ureg(f'{9+5/8} inch').to('meter'), - length=ureg('1000 meter') +@pytest.mark.parametrize( + "od, id, length, expected", + [ + ( + ureg("12.25 inch").to("meter"), + ureg(f"{9+5/8} inch").to("meter"), + ureg("1000 meter"), + 3.491531223156194, + ), + # Add more test cases as needed + ], +) +def test_annular_volume(od, id, length, expected): + av = annular_volume(od=od, id=id, length=length) + assert av.m == expected + assert str(av.u) == "meter ** 3" + + +def test_decimal2dms(): + degrees, minutes, seconds, direction = decimal2dms( + (LAT[0] + LAT[1] / 60 + LAT[2] / 3600, LAT[3]) + ) + assert (degrees, minutes, round(seconds, 4), direction) == LAT + + dms = decimal2dms((LAT[0] + LAT[1] / 60 + LAT[2] / 3600), ndigits=4) + assert np.all(np.equal(dms, np.array((np.array(LAT[:3]))))) + + dms = decimal2dms( + np.array( + [ + (LAT[0] + LAT[1] / 60 + LAT[2] / 3600), + (LON[0] + LON[1] / 60 + LON[2] / 3600), + ] + ).reshape((-1, 1)), + ndigits=4, + ) + assert np.all(np.equal(dms, np.array((np.array(LAT[:3]), np.array(LON[:3]))))) + + dms = decimal2dms( + np.array( + [ + (LAT[0] + LAT[1] / 60 + LAT[2] / 3600), + (LON[0] + LON[1] / 60 + LON[2] / 3600), + ] + ), + ndigits=4, + ) + assert np.all(np.equal(dms, np.array((np.array(LAT[:3]), np.array(LON[:3]))))) + + dms = decimal2dms( + np.array( + [ + (LAT[0] + LAT[1] / 60 + LAT[2] / 3600, LAT[3]), + (LON[0] + LON[1] / 60 + LON[2] / 3600, LON[3]), + ] + ), + ndigits=4, + ) + assert np.all( + np.equal( + dms, np.array((np.array(LAT, dtype=object), np.array(LON, dtype=object))) ) + ) - assert av.m == 3.491531223156194 - assert str(av.u) == 'meter ** 3' - - pass - def test_decimal2dms(self): - degrees, minutes, seconds, direction = decimal2dms( - (LAT[0] + LAT[1] / 60 + LAT[2] / 3600, LAT[3]) +def test_dms2decimal(): + decimal = dms2decimal( + (-LAT[0], LAT[1], LAT[2], LAT[3]) + ) # check it handles westerly + assert np.all( + np.equal( + decimal, + np.array([-(LAT[0] + LAT[1] / 60 + LAT[2] / 3600), LAT[3]], dtype=object), ) - assert (degrees, minutes, round(seconds, 4), direction) == LAT + ) - dms = decimal2dms( - (LAT[0] + LAT[1] / 60 + LAT[2] / 3600), ndigits=4 - ) - assert np.all(np.equal( - dms, - np.array((np.array(LAT[:3]))) - )) - - dms = decimal2dms(np.array([ - (LAT[0] + LAT[1] / 60 + LAT[2] / 3600), - (LON[0] + LON[1] / 60 + LON[2] / 3600) - ]).reshape((-1, 1)), ndigits=4) - assert np.all(np.equal( - dms, - np.array((np.array(LAT[:3]), np.array(LON[:3]))) - )) - - dms = decimal2dms(np.array([ - (LAT[0] + LAT[1] / 60 + LAT[2] / 3600), - (LON[0] + LON[1] / 60 + LON[2] / 3600) - ]), ndigits=4) - assert np.all(np.equal( - dms, - np.array((np.array(LAT[:3]), np.array(LON[:3]))) - )) - - dms = decimal2dms(np.array([ - (LAT[0] + LAT[1] / 60 + LAT[2] / 3600, LAT[3]), - (LON[0] + LON[1] / 60 + LON[2] / 3600, LON[3]) - ]), ndigits=4) - assert np.all(np.equal( - dms, - np.array((np.array(LAT, dtype=object), np.array(LON, dtype=object))) - )) - - def test_dms2decimal(self): - decimal = dms2decimal((-LAT[0], LAT[1], LAT[2], LAT[3])) # check it handles westerly - assert np.all(np.equal( - decimal, - np.array([ - -(LAT[0] + LAT[1] / 60 + LAT[2] / 3600), - LAT[3] - ], dtype=object) - )) + decimal = dms2decimal((LAT[:3])) + assert decimal == LAT[0] + LAT[1] / 60 + LAT[2] / 3600 - decimal = dms2decimal((LAT[:3])) - assert decimal == LAT[0] + LAT[1] / 60 + LAT[2] / 3600 + decimals = dms2decimal((LAT[:3], LON[:3])) + assert np.all( + np.equal(decimals, np.array((dms2decimal(LAT[:3]), dms2decimal(LON[:3])))) + ) - decimals = dms2decimal((LAT[:3], LON[:3])) - assert np.all(np.equal( + decimals = dms2decimal((LAT, LON)) + assert np.all( + np.equal( decimals, - np.array((dms2decimal(LAT[:3]), dms2decimal(LON[:3]))) - )) + np.array((dms2decimal(LAT), dms2decimal(LON))).reshape(decimals.shape), + ) + ) - decimals = dms2decimal((LAT, LON)) - assert np.all(np.equal( + decimals = dms2decimal(((LAT, LON), (LON, LAT))) + assert np.all( + np.equal( decimals, - np.array((dms2decimal(LAT), dms2decimal(LON))).reshape(decimals.shape) - )) + np.array( + ( + (dms2decimal(LAT), dms2decimal(LON)), + (dms2decimal(LON), dms2decimal(LAT)), + ) + ), + ) + ) + + +def test_dms2decimal2dms(): + _dms = _generate_random_dms(int(1e3), 8) + decimal = dms2decimal(_dms) + dms = decimal2dms(decimal, 8) + assert np.all(np.equal(_dms, dms)) - decimals = dms2decimal(((LAT, LON), (LON, LAT))) - assert np.all(np.equal( - decimals, - np.array(( - (dms2decimal(LAT), dms2decimal(LON)), - (dms2decimal(LON), dms2decimal(LAT)) - )) - )) - def test_dms2decimal2dms(self): - _dms = _generate_random_dms(int(1e3), 8) - decimal = dms2decimal(_dms) - dms = decimal2dms(decimal, 8) +def test_pprint_dms(): + result = pprint_dms(LAT, return_data=True) + data = dms_from_string(result) - assert np.all(np.equal(_dms, dms)) + assert are_tuples_identical(LAT, data) - def test_pprint_dms(self): - result = pprint_dms(LAT, return_data=True) - data = dms_from_string(result) + result = pprint_dms(LAT, return_data=True, symbols=False) + data = dms_from_string(result) - assert are_tuples_identical(LAT, data) + assert are_tuples_identical(LAT, data) - result = pprint_dms(LAT, return_data=True, symbols=False) - data = dms_from_string(result) + result = pprint_dms(LAT[:3], return_data=True) + data = dms_from_string(result) - assert are_tuples_identical(LAT, data) + assert are_tuples_identical(LAT[:3], data) - result = pprint_dms(LAT[:3], return_data=True) - data = dms_from_string(result) + result = pprint_dms(LAT[:3], return_data=True, symbols=False) + data = dms_from_string(result) - assert are_tuples_identical(LAT[:3], data) + assert are_tuples_identical(LAT[:3], data) - result = pprint_dms(LAT[:3], return_data=True, symbols=False) - data = dms_from_string(result) - assert are_tuples_identical(LAT[:3], data) +def test_get_arc(): + pos1 = np.array([0, 0, 0]) + vec1 = np.array([1, 0, 0]) + dls_design = 2.5 + radius = radius_from_dls(dls_design) + toolface = np.pi / 2 + dogleg = np.pi / 2 + + pos2, vec2, arc_length = get_arc(dogleg, radius, toolface, pos1, vec1) + + assert all( + ( + np.allclose(pos2, pos1 + np.array([radius, radius, 0])), + np.allclose(vec2, np.array([0, 1, 0])), + ) + ) - pass +def test_get_toolface(): + pos1 = np.array([0, 0, 0]) + vec1 = np.array([1, 0, 0]) + dls_design = 2.5 + radius = radius_from_dls(dls_design) -# def one_function_to_run_them_all(): -# """ -# Function to gather the test functions so that they can be tested by -# running this module. + pos2 = pos1 + np.array([radius, radius, 0]) + toolface = get_toolface(pos1, vec1, pos2) + assert np.isclose(toolface, np.pi / 2) -# https://stackoverflow.com/questions/18907712/python-get-list-of-all- -# functions-in-current-module-inspecting-current-module -# """ -# test_functions = [ -# obj for name, obj in inspect.getmembers(sys.modules[__name__]) -# if (inspect.isfunction(obj) -# and name.startswith('test') -# and name != 'all') -# ] + pos2 = pos1 + np.array([radius, -radius, 0]) + toolface = get_toolface(pos1, vec1, pos2) + assert np.isclose(toolface, -np.pi / 2) -# for f in test_functions: -# f() + pos2 = pos1 + np.array([radius, 0, radius]) + toolface = get_toolface(pos1, vec1, pos2) + assert np.isclose(toolface, np.pi) -# pass + pos2 = pos1 + np.array([radius, 0, -radius]) + toolface = get_toolface(pos1, vec1, pos2) + assert np.isclose(toolface, 0) + pos1 = np.array([0, 0, 0]) + vec1 = np.array([0, 0, 1]) -if __name__ == '__main__': - unittest.main() - # one_function_to_run_them_all() + pos2 = pos1 + np.array([radius, 0, radius]) + toolface = get_toolface(pos1, vec1, pos2) + assert np.isclose(toolface, 0) diff --git a/welleng/connector.py b/welleng/connector.py index 6275c9d..44c1138 100644 --- a/welleng/connector.py +++ b/welleng/connector.py @@ -367,6 +367,7 @@ def _min_dist_to_target(self): self.md2 = self.md1 + abs(self.dist_curve) self.md_target = self.md2 + self.tangent_length self.vec2 = self.vec_target + self.dls = np.degrees(self.dogleg) / abs(self.dist_curve) * 30 def _min_curve_to_target(self): ( @@ -388,6 +389,7 @@ def _min_curve_to_target(self): ) self._get_angles_target() self._get_md_target() + self.dls = np.degrees(self.dogleg) / self.dist_curve * 30 def _use_method(self): if self.method == 'hold': @@ -695,7 +697,15 @@ def _target_pos_and_vec_defined(self, pos3, vec_old=[0., 0., 0.]): self.dls - self.dls2 ) < self.delta_dls, np.allclose(self.pos1, self.pos2), - np.allclose(self.pos3, self.pos_target) + np.allclose(self.pos3, self.pos_target), + all(( + np.allclose(self.pos3, self.pos3_list[-5:]), + np.allclose(self.pos2, self.pos2_list[-5:]) + )), + all(( + self.dls == self.dls_design, + self.dls2 == self.dls_design2 + )) )): break self._happy_finish() @@ -752,6 +762,8 @@ def _happy_finish(self): self.md_target = self.md3 + abs(self.dist_curve2) + return self + def interpolate(self, step=30): return interpolate_well([self], step) @@ -819,6 +831,8 @@ def minimize_target_pos_and_vec_defined( c.pos3 = pos2_init + ( pos3_init - pos2_init ) / 2 + # else: + # c.pos3 = c.pos2 + (c.pos3 - c.pos2) / 2 c.distances1 = c._get_distances(c.pos1, c.vec1, c.pos3) radius_temp1 = get_radius_critical( @@ -935,12 +949,12 @@ def minimize_target_pos_and_vec_defined( abs(c.radius_critical - c.radius_critical2) ) c.dls = max( - dls_from_radius(c.radius_design), - dls_from_radius(c.radius_critical) + np.radians(dls_from_radius(c.radius_design)), + np.radians(dls_from_radius(c.radius_critical)) ) c.dls2 = max( - dls_from_radius(c.radius_design2), - dls_from_radius(c.radius_critical2) + np.radians(dls_from_radius(c.radius_design2)), + np.radians(dls_from_radius(c.radius_critical2)) ) if c.error: @@ -1111,7 +1125,10 @@ def min_curve_to_target(distances): 2 * dist_norm_to_target ) ) - assert radius_critical > 0 + if np.isnan(radius_critical): + radius_critical = np.nan + else: + assert radius_critical > 0 dogleg = ( 2 * np.arctan2( @@ -1145,7 +1162,10 @@ def get_radius_critical(radius, distances, min_error): ) ) * (1 - min_error) - assert radius_critical > 0 + if np.isnan(radius_critical): + radius_critical = np.nan + else: + assert radius_critical > 0 return radius_critical @@ -1225,7 +1245,7 @@ def interpolate_curve( ): # sometimes the curve section has no length # this if statement handles this event - if dist_curve == 0: + if any((dist_curve == 0, np.isnan(dist_curve))): inc, azi = get_angles(vec1, nev=True).T data = dict( md=np.array([md1]), @@ -1627,6 +1647,7 @@ def numbafy(functions): min_dist_to_target, get_radius_critical, angle, - get_dogleg + get_dogleg, + check_dogleg ) numbafy(NUMBAFY) diff --git a/welleng/survey.py b/welleng/survey.py index 29a8f5e..9de4284 100644 --- a/welleng/survey.py +++ b/welleng/survey.py @@ -185,13 +185,16 @@ def get_factors_from_x_y( convergence=result.meridian_convergence, scale_factor=result.meridional_scale, magnetic_field_intensity=( - result_magnetic.get('field-value').get('total-intensity').get('value') + None if result_magnetic is None + else result_magnetic.get('field-value').get('total-intensity').get('value') ), declination=( - result_magnetic.get('field-value').get('declination').get('value') + None if result_magnetic is None + else result_magnetic.get('field-value').get('declination').get('value') ), dip=( - result_magnetic.get('field-value').get('inclination').get('value') + None if result_magnetic is None + else result_magnetic.get('field-value').get('inclination').get('value') * ( -1 if "down" in result_magnetic.get('field-value').get('inclination').get('units') else 1 @@ -1559,10 +1562,8 @@ def _interpolate_survey(survey, x=0, index=0): 2), with the interpolated station between them (index 1) """ - if isinstance(index, np.ndarray): - index = index[0] - - index = int(index) + index = _ensure_int_or_float(index, int) + x = _ensure_int_or_float(x, float) assert index < len(survey.md) - 1, "Index is out of range" @@ -1605,8 +1606,8 @@ def _interpolate_survey(survey, x=0, index=0): s = Survey( md=np.array( [survey.md[index], survey.md[index] + x], - dtype='object' - ).astype(np.float64), # this is to prevent VisibleDeprecationWarning + dtype='float64' + ), inc=np.array([survey.inc_rad[index], inc]), azi=np.array([survey.azi_grid_rad[index], azi]), cov_nev=( @@ -1835,6 +1836,13 @@ def make_long_cov(arr): return cov +def _ensure_int_or_float(val, required_type) -> int | float: + if isinstance(val, np.ndarray): + val = val[0] + + return required_type(val) + + class SplitSurvey: def __init__( self, diff --git a/welleng/utils.py b/welleng/utils.py index 12c8e90..2cfc196 100644 --- a/welleng/utils.py +++ b/welleng/utils.py @@ -568,6 +568,9 @@ def transform(self, toolface, pos=None, vec=None, target=False): pos_new, vec_new = r.apply(np.vstack((self.pos, self.vec))) + # make sure vec_new is a unit vector: + vec_new = get_unit_vec(vec_new) + if pos is not None: pos_new += pos if target: @@ -843,3 +846,29 @@ def dms_from_string(text): else: return + + +def get_toolface(pos1: NDArray, vec1: NDArray, pos2: NDArray) -> float: + """Returns the toolface of an offset position relative to a reference + position and vector. + + Parameters + ---------- + pos1: ndarray + The reference NEV coordinate, e.g. current location. + vec1: ndarray + The reference NEV unit vector, e.g. current vector heading. + pos2: ndarray + The offset NEV coordinate, e.g. a target position. + + Returns + ------- + toolface: float + The toolface (bearing or required heading) in radians to pos2 from pos1 + with vec1. + """ + inc, azi = get_angles(vec1, nev=True)[0] + r = R.from_euler('zy', [-azi, -inc], degrees=False) + pos = r.apply(pos2 - pos1) + + return np.arctan2(*(np.flip(pos[:2]))) diff --git a/welleng/version.py b/welleng/version.py index ef72cc0..4ca39e7 100644 --- a/welleng/version.py +++ b/welleng/version.py @@ -1 +1 @@ -__version__ = '0.8.1' +__version__ = '0.8.2' diff --git a/welleng/visual.py b/welleng/visual.py index 8b2aa21..f052b42 100644 --- a/welleng/visual.py +++ b/welleng/visual.py @@ -70,7 +70,7 @@ def add(self, obj, *args, **kwargs) -> None: """ if isinstance(obj, mesh.WellMesh): poly = buildPolyData(obj.mesh.vertices, obj.mesh.faces) - vedo_mesh = Mesh(poly, *args, *kwargs) + vedo_mesh = Mesh(poly, *args, **kwargs) setattr(obj, 'vedo_mesh', vedo_mesh) self.wells.append(obj) super().add(obj.vedo_mesh) @@ -150,9 +150,10 @@ def _pointer_callback(self, event): md = survey.md[idx] inc = survey.inc_deg[idx] azi_grid = survey.azi_grid_deg[idx] + dls = survey.dls[idx] self.pointer_text.text(f''' well name: {name}\n - md: {md:.2f}\t inc: {inc:.2f}\t azi: {azi_grid:.2f}\n + md: {md:.2f}\t inc: {inc:.2f}\t azi: {azi_grid:.2f}\t dls: {dls:.2f}\n point coordinates: {np.round(pt3d, 3)} ''') self.render()