From 7c2558b4411640c5f3e6091e8affa576faf17211 Mon Sep 17 00:00:00 2001 From: Jonny Corcutt Date: Sat, 16 Dec 2023 14:36:12 +0100 Subject: [PATCH 1/7] Added SurveyParameters class to survey for determining input parameters for initiating a SurveyHeader --- setup.py | 1 + tests/test_data/test_survey_parameters.py | 50 ++++++++ welleng/error.py | 0 welleng/survey.py | 134 ++++++++++++++++++++++ welleng/version.py | 2 +- 5 files changed, 186 insertions(+), 1 deletion(-) create mode 100644 tests/test_data/test_survey_parameters.py mode change 100755 => 100644 welleng/error.py diff --git a/setup.py b/setup.py index cb52d9d..7eac3af 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ 'trimesh', 'utm', 'vedo', + 'pyproj' # required for getting survey parameters ]) # this is the troublesome requirement that needs C dependencies diff --git a/tests/test_data/test_survey_parameters.py b/tests/test_data/test_survey_parameters.py new file mode 100644 index 0000000..68c1e4f --- /dev/null +++ b/tests/test_data/test_survey_parameters.py @@ -0,0 +1,50 @@ +import welleng as we +import inspect +import sys + + +reference = { + 'x': 588319.02, 'y': 5770571.03, 'northing': 5770571.03, + 'easting': 588319.02, 'latitude': 52.077583926214494, + 'longitude': 4.288694821453205, 'convergence': 1.0166440347220762, + 'scale_factor': 0.9996957469340422, 'magnetic_field_intensity': 49381, + 'declination': 2.213, 'dip': -67.199, 'date': '2023-12-16', + 'srs': 'EPSG:23031' +} + + +def test_known_location(): + calculator = we.survey.SurveyParameters(reference.get('srs')) + survey_parameters = calculator.get_factors_from_x_y( + reference.get('x'), reference.get('y'), date=reference.get('date') + ) + + for k, v in survey_parameters.items(): + assert v == reference.get(k) + + pass + + +def one_function_to_run_them_all(): + """ + Function to gather the test functions so that they can be tested by + running this module. + + https://stackoverflow.com/questions/18907712/python-get-list-of-all- + functions-in-current-module-inspecting-current-module + """ + test_functions = [ + obj for name, obj in inspect.getmembers(sys.modules[__name__]) + if (inspect.isfunction(obj) + and name.startswith('test') + and name != 'all') + ] + + for f in test_functions: + f() + + pass + + +if __name__ == '__main__': + one_function_to_run_them_all() diff --git a/welleng/error.py b/welleng/error.py old mode 100755 new mode 100644 diff --git a/welleng/survey.py b/welleng/survey.py index 42b2091..ec377ec 100644 --- a/welleng/survey.py +++ b/welleng/survey.py @@ -7,6 +7,11 @@ except ImportError: MAG_CALC = False from datetime import datetime +try: + from pyproj import CRS, Proj + GRID_CALC = True +except ImportError: + GRID_CALC = False from scipy.optimize import minimize from scipy.spatial.transform import Rotation as R @@ -35,6 +40,135 @@ AZI_REF = ["true", "magnetic", "grid"] +class SurveyParameters(Proj): + """Class for calculating survey parameters for input to a Survey Header. + + This is a wrapper of pyproj that tries to simplify the process of getting + convergence, declination and dip values for a survey header. + + Notes + ----- + Requires ``pyproj`` and ``magnetic_field_calculator`` to be installed and + access to the internet. + + References + ---------- + For more info on transformations between maps, refer to the pyproj project + [here](https://pypi.org/project/pyproj/). + """ + def __init__(self, projection: str = "EPSG:23031") -> None: + """Initiates a SurveyParameters object for conversion of map + coordinates to WGS84 lat/lon for calculating magnetic field properties. + + Parameters + ---------- + projection: str (default: "EPSG:23031") + The EPSG code of the map of interest. The default represents + ED50/UTM zone 31N. + + Reference + --------- + For codes refer to [EPSG](https://epsg.io). + """ + if not GRID_CALC: + raise ImportError('pyproj dependency required, try ``pip install pyproj``.') + self.crs = CRS(projection) + super().__init__(self.crs) + + def get_factors_from_x_y( + self, x: float, y: float, altitude: float = None, + date: str = None + ) -> dict: + """Calculates the survey header parameters for a given map coordinate. + + Parameters + ---------- + x: float + The x or East/West coordinate. + y: float + The y or North/South coordinate. + altitude: float (default: None) + The altitude or z value coordinate. If none is provided this will + default to zero (sea level). + date: str (default: None) + The date of the survey, used when calculating the magnetic + parameters. Will default to the current date. + + Returns + ------- + data: dict + x: float + The x coordinate. + y: float + The y coordinate. + northing: float + The Northing (negative values are South). + easting: float + The Easting (negative values are West). + latitude: float + The WGS84 latitude. + longitude: float + The WGS84 longitude. + convergence: float + Te grid convergence for the provided coordinates. + scale_factor: float + The scale factor for the provided coordinates. + magnetic_field_intensity: float + The total field intensity for the provided coordinates and + time. + declination: float + The declination at the provided coordinates and time. + dip: float + The dip angle at the provided coordinates and time. + date: + The date used for determining the magnetic parameters. + """ + longitude, latitude = self(x, y, inverse=True) + result = self.get_factors(longitude, latitude) + + date = ( + datetime.today().strftime('%Y-%m-%d') if date is None + else date + ) + if MAG_CALC: + magnetic_calculator = MagneticFieldCalculator() + result_magnetic = magnetic_calculator.calculate( + latitude=latitude, longitude=longitude, + altitude=0 if altitude is None else altitude, + date=date + ) + else: + result_magnetic = None + + data = dict( + x=x, + y=y, + northing=y, + easting=x, + latitude=latitude, + longitude=longitude, + convergence=result.meridian_convergence, + scale_factor=result.meridional_scale, + magnetic_field_intensity=( + result_magnetic.get('field-value').get('total-intensity').get('value') + ), + declination=( + result_magnetic.get('field-value').get('declination').get('value') + ), + dip=( + result_magnetic.get('field-value').get('inclination').get('value') + * ( + -1 if "down" in result_magnetic.get('field-value').get('inclination').get('units') + else 1 + ) + ), + date=date, + srs=self.crs.srs + ) + + return data + + class SurveyHeader: """ A class for storing header information about a well. diff --git a/welleng/version.py b/welleng/version.py index c9880e6..7dfe66c 100644 --- a/welleng/version.py +++ b/welleng/version.py @@ -1 +1 @@ -__version__ = '0.7.4' +__version__ = '0.7.5' From 33f0b559289181be5000a9cf05c92a1b2e69d306 Mon Sep 17 00:00:00 2001 From: Jonny Corcutt Date: Sat, 16 Dec 2023 14:43:24 +0100 Subject: [PATCH 2/7] Added annular_volume function to utils --- welleng/utils.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/welleng/utils.py b/welleng/utils.py index eff08dd..2ecf3ab 100644 --- a/welleng/utils.py +++ b/welleng/utils.py @@ -638,3 +638,36 @@ def get_arc(dogleg, radius, toolface, pos=None, vec=None, target=False): pos_new, vec_new = arc.transform(toolface, pos, vec, target) return (pos_new, vec_new, arc.delta_md) + + +def annular_volume(od: float, id: float = None, length: float = None): + """ + Calculate an annular volume. + + If no ``id`` is provided then circular volume is calculated. If no + ``length`` is provided, then the unit volume is calculated (i.e. the + area). + + Units are assumed consistent across input parameters, i.e. the + calculation is dimensionless. + + Parameters + ---------- + od: float + The outer diameter. + id: float | None, optional + The inner diameter, default is 0. + length : float | None, optional + The length of the annulus. + + Returns + ------- + annular_volume: float + The (unit) volume of the annulus or cylinder. + """ + length = 1 if length is None else length + id = 0 if id is None else id + annular_unit_volume = (np.pi * (od - id)**2) / 4 + annular_volume = annular_unit_volume * length + + return annular_volume From 6d5cd47a3957444b4949e75fd437937e411d742e Mon Sep 17 00:00:00 2001 From: Jonny Corcutt Date: Sat, 16 Dec 2023 14:45:18 +0100 Subject: [PATCH 3/7] Remove ureg definition of Nm to prevent warning when importing welleng. --- welleng/units.py | 1 - 1 file changed, 1 deletion(-) diff --git a/welleng/units.py b/welleng/units.py index 1ab7a7f..3d2091f 100644 --- a/welleng/units.py +++ b/welleng/units.py @@ -4,4 +4,3 @@ # TODO import custom units from file instead of defining here ureg.define('ft_lbf = ft * lbf') -ureg.define('Nm = N * m') From 6d3121805ad41d7a1220e7b5baea2b0bc7f24d54 Mon Sep 17 00:00:00 2001 From: jonnymaserati Date: Sat, 16 Dec 2023 15:01:04 +0100 Subject: [PATCH 4/7] Comply with Python guidelines --- tests/test_data/test_survey_parameters.py | 100 +- welleng/connector.py | 3257 ++++++++++----------- welleng/utils.py | 1346 ++++----- 3 files changed, 2351 insertions(+), 2352 deletions(-) diff --git a/tests/test_data/test_survey_parameters.py b/tests/test_data/test_survey_parameters.py index 68c1e4f..69ce643 100644 --- a/tests/test_data/test_survey_parameters.py +++ b/tests/test_data/test_survey_parameters.py @@ -1,50 +1,50 @@ -import welleng as we -import inspect -import sys - - -reference = { - 'x': 588319.02, 'y': 5770571.03, 'northing': 5770571.03, - 'easting': 588319.02, 'latitude': 52.077583926214494, - 'longitude': 4.288694821453205, 'convergence': 1.0166440347220762, - 'scale_factor': 0.9996957469340422, 'magnetic_field_intensity': 49381, - 'declination': 2.213, 'dip': -67.199, 'date': '2023-12-16', - 'srs': 'EPSG:23031' -} - - -def test_known_location(): - calculator = we.survey.SurveyParameters(reference.get('srs')) - survey_parameters = calculator.get_factors_from_x_y( - reference.get('x'), reference.get('y'), date=reference.get('date') - ) - - for k, v in survey_parameters.items(): - assert v == reference.get(k) - - pass - - -def one_function_to_run_them_all(): - """ - Function to gather the test functions so that they can be tested by - running this module. - - https://stackoverflow.com/questions/18907712/python-get-list-of-all- - functions-in-current-module-inspecting-current-module - """ - test_functions = [ - obj for name, obj in inspect.getmembers(sys.modules[__name__]) - if (inspect.isfunction(obj) - and name.startswith('test') - and name != 'all') - ] - - for f in test_functions: - f() - - pass - - -if __name__ == '__main__': - one_function_to_run_them_all() +import inspect +import sys + +import welleng as we + +reference = { + 'x': 588319.02, 'y': 5770571.03, 'northing': 5770571.03, + 'easting': 588319.02, 'latitude': 52.077583926214494, + 'longitude': 4.288694821453205, 'convergence': 1.0166440347220762, + 'scale_factor': 0.9996957469340422, 'magnetic_field_intensity': 49381, + 'declination': 2.213, 'dip': -67.199, 'date': '2023-12-16', + 'srs': 'EPSG:23031' +} + + +def test_known_location(): + calculator = we.survey.SurveyParameters(reference.get('srs')) + survey_parameters = calculator.get_factors_from_x_y( + reference.get('x'), reference.get('y'), date=reference.get('date') + ) + + for k, v in survey_parameters.items(): + assert v == reference.get(k) + + pass + + +def one_function_to_run_them_all(): + """ + Function to gather the test functions so that they can be tested by + running this module. + + https://stackoverflow.com/questions/18907712/python-get-list-of-all- + functions-in-current-module-inspecting-current-module + """ + test_functions = [ + obj for name, obj in inspect.getmembers(sys.modules[__name__]) + if (inspect.isfunction(obj) + and name.startswith('test') + and name != 'all') + ] + + for f in test_functions: + f() + + pass + + +if __name__ == '__main__': + one_function_to_run_them_all() diff --git a/welleng/connector.py b/welleng/connector.py index 764fe48..d591f9a 100644 --- a/welleng/connector.py +++ b/welleng/connector.py @@ -1,1629 +1,1628 @@ -import numpy as np -from copy import copy, deepcopy -from scipy.spatial import distance -from scipy.optimize import minimize -try: - from numba import njit - NUMBA = True -except ImportError: - NUMBA = False - -from .utils import ( - get_vec, _get_angles, get_angles, get_nev, get_xyz, get_unit_vec, - NEV_to_HLA, dls_from_radius -) -from .node import Node, get_node_params - - -class Connector: - def __init__( - self, - node1=None, - node2=None, - pos1=[0., 0., 0.], - vec1=None, - inc1=None, - azi1=None, - md1=0, - dls_design=3.0, - dls_design2=None, - md2=None, - pos2=None, - vec2=None, - inc2=None, - azi2=None, - degrees=True, - unit='meters', - min_error=1e-5, - delta_dls=0.1, - min_tangent=0., - max_iterations=1_000, - force_min_curve=False, - closest_approach=False - ): - - """ - A class to provide a fast, efficient method for determining well - bore section geometry using the appropriate method based upon the - provided parameters, with the intent of assisting machine learning - fitness functions. Interpolation between the returned control points - can be performed posthumously - attempts are made to keep processing to - a minimum in the event that the Connector is being used for machine - learning. - - Only specific combinations of input data are permitted, e.g. if you - input both a start vector AND a start inc and azi you'll get an error - (which one is correct after all?). Think about what you're trying to - achieve and provide the data that will facilitate that outcome. - - Parameters - ---------- - pos1: (3) list or array of floats (default: [0,0,0]) - Start position in NEV coordinates. - vec1: (3) list or array of floats or None (default: None) - Start position unit vector in NEV coordinates. - inc1: float or None (default: None) - Start position inclination. - azi2: float or None (default: None) - Start position azimuth. - md1: float or None (default: None) - Start position measured depth. - dls_design: float (default: 3.0) - The desired Dog Leg Severity (DLS) for the (first) curved - section in degrees per 30 meters or 100 feet. - dls_design2: float or None (default: None) - The desired DLS for the second curve section in degrees per - 30 meters or 100 feet. If set to None then `dls_design` will - be the default value. - md2: float or None (default: None) - The measured depth of the target position. - pos2: (3) list or array of floats or None (default: None) - The position of the target in NEV coordinates. - vec2: (3) list or array of floats or None (default: None) - The target unit vector in NEV coordinates. - inc1: float or None (default: None) - The inclination at the target position. - azi2: float or None (default: None) - The azimuth at the target position. - degrees: boolean (default: True) - Indicates whether the input angles (inc, azi) are in degrees - (True) or radians (False). - unit: string (default: 'meters') - Indicates the distance unit, either 'meters' or 'feet'. - min_error: float (default: 1e-5): - Infers the error tolerance of the results and is used to set - iteration stops when the desired error tolerance is met. Value - must be less than 1. Use with caution as the code may - become unstable if this value is changed. - delta_radius: float (default: 20) - The delta radius (first curve and second curve sections) used - as an iteration stop when balancing radii. If the resulting - delta radius yielded from `dls_design` and `dls_design2` is - larger than `delta_radius`, then `delta_radius` defaults to - the former. - delta_dls: float (default: 0.1) - The delta dls (first curve and second curve sections) used as an - iteration stop when balancing radii, i.e. if the dls of the second - section is within 0.1 deg/30m of the first curve section then the - section is considered balanced and no further iterations are - performed. Setting this value too low will likely result in hitting - the recursion limit. - min_tangent: float (default: 10) - The minimum tangent length in the `curve_hold_curve` method - used to mitigate instability during iterations (where the - tangent section approaches or equals 0). - max_iterations: int (default: 1000) - The maximum number of iterations before giving up trying to - fit a `curve_hold_curve`. This number is limited by Python's - depth of recursion, but if you're hitting the default stop - then consider changing `delta_radius` and `min_tangent` as - your expectations may be unrealistic (this is open source - software after all!) - - Results - ------- - connector: welleng.connector.Connector object - """ - if node1 is not None: - pos1, vec1, md1 = get_node_params( - node1 - ) - if node2 is not None: - pos2, vec2, md2 = get_node_params( - node2 - ) - - # Set up a lookup dictionary to use with the logic to determine - # what connector method to deploy. Use a binary string to - # represent the inputs in the order: - # (md2, inc2, azi2, pos2, vec2) - # Initially I used boolean logic, but it quickly became non- - # transparent and difficult to debug. - self._get_initial_methods() - - # METHODS = [ - # 'hold', - # 'curve_hold', - # 'min_dist_to_target', - # 'min_curve_to_target', - # 'curve_hold_curve', - # 'min_curve' - # ] - - # quick check that inputs are workable and if not some steer to - # the user. - assert vec1 is not None or (inc1 is not None and azi1 is not None), ( - "Require either vec1 or (inc1 and azi1)" - ) - if vec1 is not None: - assert inc1 is None and azi1 is None, ( - "Either vec1 or (inc1 and azi1)" - ) - if (inc1 is not None or azi1 is not None): - assert vec1 is None, "Either vec1 or (inc1 and azi1)" - - assert ( - md2 is not None - or pos2 is not None - or vec2 is not None - or inc2 is not None - or azi2 is not None - ), "Missing target parameters" - - if vec2 is not None: - assert not (inc2 or azi2), "Either vec2 or (inc2 and azi2)" - if (inc2 is not None or azi2 is not None): - assert vec2 is None, "Either vec2 or (inc2 and azi2)" - if md2 is not None: - assert pos2 is None, "Either md2 or pos2" - assert md2 >= md1, "md2 must be larger than md1" - - if dls_design is None: - dls_design = 3.0 - else: - assert dls_design > 0, "dls_design must be greater than zero" - assert min_error < 1, "min_error must be less than 1.0" - - # figure out what method is required to connect the points - target_input = convert_target_input_to_booleans( - md2, inc2, azi2, pos2, vec2 - ) - - self.force_min_curve = force_min_curve - if self.force_min_curve: - self.initial_method = 'min_curve_or_hold' - else: - self.initial_method = self.initial_methods[target_input] - - # do some more initialization stuff - self.min_error = min_error - self.min_tangent = min_tangent - self.iterations = 0 - self.max_iterations = max_iterations - self.errors = [] - self.radii = [] - self.dogleg_old, self.dogleg2_old = 0, 0 - self.dist_curve2 = 0 - self.pos1 = np.array(pos1) - - self.pos2, self.vec2, self.inc2, self.azi2, self.md2 = ( - None, None, None, None, None - ) - - # fill in the input data gaps - if (inc1 is not None and azi1 is not None): - if degrees: - self.inc1 = np.radians(inc1) - self.azi1 = np.radians(azi1) - else: - self.inc1 = inc1 - self.azi1 = azi1 - self.vec1 = np.array(get_vec( - self.inc1, self.azi1, nev=True, deg=False - )).reshape(3) - else: - self.vec1 = np.array(vec1).reshape(3) - self.inc1, self.azi1 = get_angles(self.vec1, nev=True).reshape(2) - - self.md1 = md1 - self.pos_target = None if pos2 is None else np.array(pos2).reshape(3) - self.md_target = md2 - - if vec2 is not None: - self.vec_target = np.array(vec2).reshape(3) - self.inc_target, self.azi_target = get_angles( - self.vec_target, - nev=True - ).reshape(2) - elif (inc2 is not None and azi2 is not None): - if degrees: - self.inc_target = np.radians(inc2) - self.azi_target = np.radians(azi2) - else: - self.inc_target = inc2 - self.azi_target = azi2 - self.vec_target = get_vec( - self.inc_target, self.azi_target, nev=True, deg=False - ).reshape(3) - elif inc2 is None and azi2 is None: - self.inc_target, self.azi_target, self.vec_target = ( - self.inc1, self.azi1, self.vec1 - ) - elif inc2 is None: - self.inc_target = self.inc1 - if degrees: - self.azi_target = np.radians(azi2) - else: - self.azi_target = azi2 - self.vec_target = get_vec( - self.inc_target, self.azi_target, nev=True, deg=False - ).reshape(3) - elif azi2 is None: - self.azi_target = self.azi1 - if degrees: - self.inc_target = np.radians(inc2) - else: - self.inc_target = inc2 - self.vec_target = get_vec( - self.inc_target, self.azi_target, nev=True, deg=False - ).reshape(3) - else: - self.vec_target = vec2 - self.inc_target = inc2 - self.azi_target = azi2 - - self.unit = unit - if self.unit == 'meters': - self.denom = 30 - else: - self.denom = 100 - - if degrees: - self.dls_design = np.radians(dls_design) - if dls_design2: - self.dls_design2 = np.radians(dls_design2) - else: - self.dls_design = dls_design - if dls_design2: - self.dls_design2 = dls_design2 - if not dls_design2: - self.dls_design2 = self.dls_design - - self.radius_design = self.denom / self.dls_design - self.radius_design2 = self.denom / self.dls_design2 - - self.delta_dls = delta_dls - - # some more initialization stuff - self.tangent_length = None - self.dogleg2 = None - - self.pos3, self.vec3, self.inc3, self.azi3, self.md3 = ( - None, None, None, None, None - ) - self.radius_critical, self.radius_critical2 = np.inf, np.inf - self.closest_approach = closest_approach - - # Things fall apart if the start and end vectors exactly equal - # one another, so need to check for this and if this is the - # case, modify the end vector slightly. This is a lazy way of - # doing this, but it's fast. Probably a more precise way would - # be to split the dogleg in two, but that's more hassle than - # it's worth. - if ( - self.vec_target is not None - and np.array_equal(self.vec_target, self.vec1 * -1) - ): - ( - self.vec_target, - self.inc_target, - self.azi_target - ) = mod_vec(self.vec_target, self.min_error) - - # properly figure out the method - self._get_method() - - # and finally, actually do something... - self._use_method() - - self._get_nodes() - - def _get_nodes(self): - self.node_start = Node( - pos=self.pos1.reshape(3), - vec=self.vec1.reshape(3), - md=self.md1 - ) - self.node_end = Node( - pos=self.pos_target.reshape(3), - vec=self.vec_target.reshape(3), - md=self.md_target - ) - - def _min_dist_to_target(self): - ( - self.tangent_length, - self.dogleg - ) = min_dist_to_target(self.radius_design, self.distances) - self.dogleg = check_dogleg(self.dogleg) - self.dist_curve, self.func_dogleg = get_curve_hold_data( - self.radius_design, self.dogleg - ) - self.vec_target = get_vec_target( - self.pos1, - self.vec1, - self.pos_target, - self.tangent_length, - self.dist_curve, - self.func_dogleg - ) - self._get_angles_target() - self._get_md_target() - self.pos2 = ( - self.pos_target - ( - self.tangent_length * self.vec_target - ) - ) - self.md2 = self.md1 + abs(self.dist_curve) - self.md_target = self.md2 + self.tangent_length - self.vec2 = self.vec_target - - def _min_curve_to_target(self): - ( - self.tangent_length, - self.radius_critical, - self.dogleg - ) = min_curve_to_target(self.distances) - self.dogleg = check_dogleg(self.dogleg) - self.dist_curve, self.func_dogleg = get_curve_hold_data( - min(self.radius_design, self.radius_critical), self.dogleg - ) - self.vec_target = get_vec_target( - self.pos1, - self.vec1, - self.pos_target, - self.tangent_length, - self.dist_curve, - self.func_dogleg - ) - self._get_angles_target() - self._get_md_target() - - def _use_method(self): - if self.method == 'hold': - self._hold() - elif self.method == 'min_curve': - self._min_curve() - elif self.method == 'curve_hold_curve': - self.pos2_list, self.pos3_list = [], [deepcopy(self.pos_target)] - self.vec23 = [np.array([0., 0., 0.])] - self.delta_radius_list = [] - # self._target_pos_and_vec_defined(deepcopy(self.pos_target)) - self._target_pos_and_vec_defined( - self.pos1 + (self.pos_target - self.pos1) / 2 - ) - else: - self.distances = self._get_distances( - self.pos1, self.vec1, self.pos_target - ) - if self.radius_design <= get_radius_critical( - self.radius_design, self.distances, self.min_error - ): - self.method = 'min_dist_to_target' - self._min_dist_to_target() - else: - if self.closest_approach: - self.method = 'min_curve_to_target' - self._closest_approach() - else: - self.method = 'min_curve_to_target' - self._min_curve_to_target() - - def _get_method(self): - assert self.initial_method not in [ - 'no_input', - 'vec_and_inc_azi', - 'md_and_pos' - ], f"{self.initial_method}" - if self.initial_method == 'hold': - self.method = 'hold' - elif self.initial_method[-8:] == '_or_hold': - if np.array_equal(self.vec_target, self.vec1): - if self.pos_target is None: - self.method = 'hold' - elif np.allclose( - self.vec_target, - (self.pos_target - self.pos1) - / np.linalg.norm(self.pos_target - self.pos1) - ): - self.method = 'hold' - else: - self.method = self.initial_method[:-8] - else: - self.method = self.initial_method[:-8] - else: - self.method = self.initial_method - - def _get_initial_methods(self): - # TODO: probably better to load this in from a yaml file - # [md2, inc2, azi2, pos2, vec2] forms the booleans - self.initial_methods = { - '00000': 'no_input', - '00001': 'min_curve_or_hold', - '00010': 'curve_hold_or_hold', - '00011': 'curve_hold_curve_or_hold', - '00100': 'min_curve_or_hold', - '00101': 'vec_and_inc_azi', - '00110': 'curve_hold', - '00111': 'vec_and_inc_azi', - '01000': 'min_curve_or_hold', - '01001': 'vec_and_inc_azi', - '01010': 'curve_hold_or_hold', - '01011': 'vec_and_inc_azi', - '01100': 'min_curve_or_hold', - '01101': 'vec_and_inc_azi', - '01110': 'curve_hold_curve_or_hold', - '01111': 'vec_and_inc_azi', - '10000': 'hold', - '10001': 'min_curve_or_hold', - '10010': 'md_and_pos', - '10011': 'md_and_pos', - '10100': 'min_curve_or_hold', - '10101': 'vec_and_inc_azi', - '10110': 'md_and_pos', - '10111': 'md_and_pos', - '11000': 'min_curve_or_hold', - '11001': 'vec_and_inc_azi', - '11010': 'md_and_pos', - '11011': 'md_and_pos', - '11100': 'min_curve_or_hold', - '11101': 'vec_and_inc_azi', - '11110': 'md_and_pos', - '11111': 'md_and_pos' - } - - def _closest_approach(self): - vec_pos1_pos_target = self.pos_target - self.pos1 - vec_pos1_pos_target /= np.linalg.norm(vec_pos1_pos_target) - - cross_product = np.cross(vec_pos1_pos_target, self.vec1) - cross_product /= np.linalg.norm(cross_product) - - factor = cross_product / vec_pos1_pos_target - factor /= abs(factor) - - cc = ( - self.pos1 + cross_product * factor * self.radius_design - ) - - cc_pos_target = self.pos_target - cc - cc_pos_target /= np.linalg.norm(cc_pos_target) - - self.pos_target_original = copy(self.pos_target) - - self.pos_target = cc + cc_pos_target * self.radius_design - - # recalculate self.distances with new self.pos_target - self.distances = self._get_distances( - self.pos1, self.vec1, self.pos_target - ) - - self._min_curve_to_target() - - def _min_curve(self): - self.dogleg = get_dogleg( - self.inc1, self.azi1, self.inc_target, self.azi_target - ) - - self.dogleg = check_dogleg(self.dogleg) - if self.md_target is None: - self.md2 = None - self.dist_curve, self.func_dogleg = get_curve_hold_data( - self.radius_design, self.dogleg - ) - self.md_target = self.md1 + abs(self.dist_curve) - self.pos_target = get_pos( - self.pos1, - self.vec1, - self.vec_target, - self.dist_curve, - self.func_dogleg - ).reshape(3) - else: - with np.errstate(divide='ignore'): - self.radius_critical = np.nan_to_num(abs( - (self.md_target - self.md1) / self.dogleg - ), nan=np.inf) - if ( - self.radius_critical > self.radius_design - or ( - np.around(self.dogleg, decimals=5) - == np.around(np.pi, decimals=5) - ) - ): - self.md2 = ( - self.md1 - + min(self.radius_design, self.radius_critical) - * self.dogleg - ) - ( - self.inc2, self.azi2, self.vec2 - ) = self.inc_target, self.azi_target, self.vec_target - self.dist_curve, self.func_dogleg = get_curve_hold_data( - min(self.radius_design, self.radius_critical), - self.dogleg - ) - self.pos2 = get_pos( - self.pos1, - self.vec1, - self.vec2, - self.dist_curve, - self.func_dogleg - ).reshape(3) - self.pos_target = self.pos2 + ( - self.vec2 * (self.md_target - self.md2) - ) - else: - self.dist_curve, self.func_dogleg = get_curve_hold_data( - self.radius_critical, self.dogleg - ) - self.md2 = None - self.pos_target = get_pos( - self.pos1, - self.vec1, - self.vec_target, - self.dist_curve, - self.func_dogleg - ).reshape(3) - - def _hold(self): - if self.pos_target is None: - self.pos_target = ( - self.pos1 + self.vec1 * (self.md_target - self.md1) - ) - if self.md_target is None: - self.md_target = ( - np.linalg.norm(self.pos_target - self.pos1) - + self.md1 - ) - self.dls, self.dls2 = 0.0, 0.0 - - def _get_angles_target(self): - self.inc_target, self.azi_target = get_angles( - self.vec_target, nev=True - ).reshape(2) - - def _get_md_target(self): - self.md_target = ( - self.dist_curve - + self.tangent_length - + self.dist_curve2 - + self.md1 - ) - - def _get_pos2(self, pos1, vec1, pos_target, radius): - distances = self._get_distances(pos1, vec1, pos_target) - - radius_temp = get_radius_critical( - self.radius_design, - distances, - self.min_error - ) - if radius_temp > radius: - radius_temp = radius - assert self.radius_critical > 0 - - ( - tangent_length, - dogleg - ) = min_dist_to_target( - radius_temp, - distances - ) - - dogleg = check_dogleg(dogleg) - - dist_curve, func_dogleg = get_curve_hold_data( - radius_temp, - dogleg - ) - vec3 = get_vec_target( - pos1, - vec1, - pos_target, - tangent_length, - dist_curve, - func_dogleg - ) - - tangent_temp = self._get_tangent_temp(tangent_length) - - pos2 = ( - pos_target - ( - tangent_temp * vec3 - ) - ) - return pos2 - - def _target_pos_and_vec_defined(self, pos3, vec_old=[0., 0., 0.]): - """ - Function for fitting a curve, hold, curve path between a pair of - points with vectors. - - It's written in this odd way to allow a solver function like scipy's - optimize to solve it, but so far I've not had much success using - these. - - If the curve sections can't be achieved with the design DLS values, - this function will iterate until the two curve section DLSs are - approximately balances (in DLS terms) within the prescribed delta_dls - parameter. - """ - args = (self, None, [0., 0., 0.]) - minimize_target_pos_and_vec_defined( - *( - ([ - self.radius_design, - self.radius_design2 - ],) - + args + (True,) - ) - ) - if not all(( - self.radius_critical > self.radius_design, - self.radius_critical2 > self.radius_design2 - )): - while True and self.iterations <= self.max_iterations: - self.radius_critical = ( - (self.md_target - self.md1) - / (abs(self.dogleg) + abs(self.dogleg2)) - ) - assert self.radius_critical >= 0 - self.radius_critical2 = self.radius_critical - args = (self, deepcopy(self.pos3), deepcopy(self.vec23[-1])) - minimize_target_pos_and_vec_defined( - *( - ([ - self.radius_design, - self.radius_design2 - ],) - + args + (True,) - ) - ) - if abs( - self.dls - self.dls2 - ) < self.delta_dls: - break - self._happy_finish() - return - - self._happy_finish() - - def _happy_finish(self): - # get pos1 to pos2 curve data - self.dist_curve, self.func_dogleg = get_curve_hold_data( - min(self.radius_critical, self.radius_design), - self.dogleg - ) - - self.vec3 = get_vec_target( - self.pos1, - self.vec1, - self.pos3, - self.tangent_length, - self.dist_curve, - self.func_dogleg - ) - - self.vec2 = self.vec3 - - self.pos2 = get_pos( - self.pos1, - self.vec1, - self.vec3, - self.dist_curve, - self.func_dogleg - ).reshape(3) - - self.md2 = self.md1 + abs(self.dist_curve) - - self.dist_curve2, self.func_dogleg2 = get_curve_hold_data( - min(self.radius_critical2, self.radius_design2), - self.dogleg2 - ) - - self.pos3 = get_pos( - self.pos_target, - self.vec_target * -1, - self.vec3 * -1, - self.dist_curve2, - self.func_dogleg2 - ).reshape(3) - - tangent_length = np.linalg.norm( - self.pos3 - self.pos2 - ) - - self.md3 = self.md2 + tangent_length - - self.md_target = self.md3 + abs(self.dist_curve2) - - def interpolate(self, step=30): - return interpolate_well([self], step) - - - def _get_tangent_temp(self, tangent_length): - if np.isnan(tangent_length): - tangent_temp = self.min_tangent - else: - tangent_temp = max(tangent_length, self.min_tangent) - - return tangent_temp - - def _mod_pos(self, pos): - pos_rand = np.random.random(3) # * self.delta_radius - pos += pos_rand - - def _get_distances(self, pos1, vec1, pos_target): - # When initializing a `curve_hold_curve` and pos_target is directly - # ahead it can cause issues (it's a hold and not a curve). So this code - # checks for that condition and if it's the case, will move the - # target_pos a sufficient amount to prevent issues. - vec_temp = np.array(pos_target - pos1) - with np.errstate(divide='ignore', invalid='ignore'): - vec_temp = vec_temp / np.linalg.norm(vec_temp) - if np.array_equal(vec1, vec_temp): - self._mod_pos(pos_target) - if np.allclose(pos1, pos_target): - return (0, 0, 0) - - else: - dist_to_target = ( - np.linalg.norm((pos_target - pos1)) - ) - - dist_perp_to_target = ( - np.dot((pos_target - pos1), vec1) - ) - if dist_perp_to_target > dist_to_target: - # since a tolerance is being used, occasionally things can go - # wrong and need to be caught. - dist_perp_to_target = dist_to_target - - dist_norm_to_target = ( - ( - dist_to_target ** 2 - - dist_perp_to_target ** 2 - ) ** 0.5 - ) - - return ( - dist_to_target, - dist_perp_to_target, - dist_norm_to_target - ) - - -def minimize_target_pos_and_vec_defined( - x, c, pos3=None, vec_old=[0., 0., 0.], result=False -): - radius1, radius2 = x - if pos3 is None: - pos2_init = c._get_pos2(c.pos1, c.vec1, c.pos_target, radius1) - pos3_init = c._get_pos2( - c.pos_target, c.vec_target * -1, c.pos1, radius2 - ) - c.pos3 = pos2_init + ( - pos3_init - pos2_init - ) / 2 - c.distances1 = c._get_distances(c.pos1, c.vec1, c.pos3) - - radius_temp1 = get_radius_critical( - radius1, - c.distances1, - c.min_error - ) - if radius_temp1 < c.radius_critical: - c.radius_critical = radius_temp1 - assert c.radius_critical >= 0 - - radius_effective1 = min(radius1, c.radius_critical) - - ( - c.tangent_length, - c.dogleg - ) = min_dist_to_target( - radius_effective1, - c.distances1 - ) - - c.dogleg = check_dogleg(c.dogleg) - - c.dist_curve, c.func_dogleg = get_curve_hold_data( - radius_effective1, - c.dogleg - ) - c.vec3 = get_vec_target( - c.pos1, - c.vec1, - c.pos3, - c.tangent_length, - c.dist_curve, - c.func_dogleg - ) - - tangent_temp1 = c._get_tangent_temp(c.tangent_length) - - c.pos2 = ( - c.pos3 - ( - tangent_temp1 * c.vec3 - ) - ) - - c.distances2 = c._get_distances( - c.pos_target, - c.vec_target * -1, - c.pos2 - ) - - radius_temp2 = get_radius_critical( - radius2, - c.distances2, - c.min_error - ) - if radius_temp2 < c.radius_critical2: - c.radius_critical2 = radius_temp2 - assert c.radius_critical2 >= 0 - - radius_effective2 = min(radius2, c.radius_critical2) - - ( - c.tangent_length2, - c.dogleg2 - ) = min_dist_to_target( - radius_effective2, - c.distances2 - ) - - c.dogleg2 = check_dogleg(c.dogleg2) - - c.dist_curve2, c.func_dogleg2 = get_curve_hold_data( - radius_effective2, - c.dogleg2 - ) - c.vec2 = get_vec_target( - c.pos_target, - c.vec_target * -1, - c.pos2, - c.tangent_length2, - c.dist_curve2, - c.func_dogleg2 - ) - - tangent_temp2 = c._get_tangent_temp(c.tangent_length2) - - c.pos3 = ( - c.pos2 - ( - tangent_temp2 * c.vec2 - ) - ) - - vec23_denom = np.linalg.norm(c.pos3 - c.pos2) - if vec23_denom == 0: - c.vec23.append(np.array([0., 0., 0.])) - else: - c.vec23.append((c.pos3 - c.pos2) / vec23_denom) - - c.error = np.allclose( - c.vec23[-1], - vec_old, - equal_nan=True, - rtol=c.min_error * 10, - atol=c.min_error * 0.1 - ) - - c.errors.append(c.error) - c.pos3_list.append(c.pos3) - c.pos2_list.append(c.pos2) - c.md_target = ( - c.md1 + c.dist_curve + tangent_temp2 + c.dist_curve2 - ) - c.delta_radius_list.append( - abs(c.radius_critical - c.radius_critical2) - ) - c.dls = max( - dls_from_radius(c.radius_design), - dls_from_radius(c.radius_critical) - ) - c.dls2 = max( - dls_from_radius(c.radius_design2), - dls_from_radius(c.radius_critical2) - ) - - if c.error: - if result: - return c - result = 0. - if radius_temp2 < c.radius_design2: - result += c.radius_design2 - radius_temp2 - if radius_temp1 < c.radius_design: - result += c.radius_design - radius_temp1 - return result - else: - c.iterations += 1 - return minimize_target_pos_and_vec_defined( - x, c, c.pos3, c.vec23[-1], result - ) - - -def check_dogleg(dogleg): - # the code assumes angles are positive and clockwise - if dogleg < 0: - dogleg_new = dogleg + 2 * np.pi - return dogleg_new - else: - return dogleg - - -def mod_vec(vec, error=1e-5): - # if it's not working then twat it with a hammer - vec_mod = vec * np.array([1, 1, 1 - error]) - vec_mod /= np.linalg.norm(vec_mod) - inc_mod, azi_mod = get_angles(vec_mod, nev=True).T - - return vec_mod, inc_mod, azi_mod - - -def _get_xyz(pos): - n, e, v = pos - return np.array([e, n, v]).reshape(-1, 3) - - -def get_pos(pos1, vec1, vec2, dist_curve, func_dogleg): - inc1, azi1 = _get_angles(_get_xyz(vec1)).reshape(2) - inc2, azi2 = _get_angles(_get_xyz(vec2)).reshape(2) - - pos2 = pos1 + ( - ( - dist_curve * func_dogleg / 2 - ) - * - np.array([ - np.sin(inc1) * np.cos(azi1) + np.sin(inc2) * np.cos(azi2), - np.sin(inc1) * np.sin(azi1) + np.sin(inc2) * np.sin(azi2), - np.cos(inc1) + np.cos(inc2) - ]).T - ) - - return pos2 - - -def get_vec_target( - pos1, - vec1, - pos_target, - tangent_length, - dist_curve, - func_dogleg -): - if dist_curve == 0: - return vec1 - - vec_target = ( - ( - pos_target - pos1 - ( - dist_curve - * func_dogleg - ) / 2 * vec1 - ) - / - ( - ( - dist_curve * func_dogleg / 2 - ) + tangent_length - ) - ) - - vec_target /= np.linalg.norm(vec_target) - - return vec_target - - -def get_curve_hold_data(radius, dogleg): - dist_curve = radius * dogleg - func_dogleg = shape_factor(dogleg) - - return ( - dist_curve, - func_dogleg - ) - - -def shape_factor(dogleg): - """ - Function to determine the shape factor of a dogleg. - - Parameters - ---------- - dogleg: float - The dogleg angle in radians of a curve section. - """ - if dogleg == 0: - return 0 - else: - return np.tan(dogleg / 2) / (dogleg / 2) - - -def min_dist_to_target(radius, distances): - """ - Calculates the control points for a curve and hold section from the - start position and vector to the target position. - """ - ( - dist_to_target, - dist_perp_to_target, - dist_norm_to_target - ) = distances - - tangent_length = ( - dist_to_target ** 2 - - 2 * radius * dist_norm_to_target - ) ** 0.5 - - # determine the dogleg angle of the curve section - dogleg = 2 * np.arctan2( - (dist_perp_to_target - tangent_length), - ( - 2 * radius - dist_norm_to_target - ) - ) - - return tangent_length, dogleg - - -def min_curve_to_target(distances): - """ - Calculates the control points for a curve section from the start - position and vector to the target position which is not achievable with - the provided dls_design. - """ - if distances == (0., 0., 0,): - return ( - 0., - np.inf, - 0. - ) - - ( - dist_to_target, - dist_perp_to_target, - dist_norm_to_target - ) = distances - - if dist_norm_to_target == 0.: - radius_critical = np.inf - else: - radius_critical = ( - dist_to_target ** 2 / ( - 2 * dist_norm_to_target - ) - ) - assert radius_critical > 0 - - dogleg = ( - 2 * np.arctan2( - dist_norm_to_target, - dist_perp_to_target - ) - ) - - tangent_length = 0 - - return ( - tangent_length, - radius_critical, - dogleg - ) - - -def get_radius_critical(radius, distances, min_error): - ( - dist_to_target, - dist_perp_to_target, - dist_norm_to_target - ) = distances - - if dist_norm_to_target == 0: - return 0 - - radius_critical = ( - dist_to_target ** 2 / ( - 2 * dist_norm_to_target - ) - ) * (1 - min_error) - - assert radius_critical > 0 - - return radius_critical - - -def angle(vec1, vec2, acute=True): - angle = np.arccos( - np.dot(vec1, vec2) - / - ( - np.linalg.norm(vec1) * np.linalg.norm(vec2) - ) - ) - - if acute: - return angle - - else: - return 2 * np.pi - angle - - -def get_dogleg(inc1, azi1, inc2, azi2): - dogleg = ( - 2 * np.arcsin( - ( - np.sin((inc2 - inc1) / 2) ** 2 - + np.sin(inc1) * np.sin(inc2) - * np.sin((azi2 - azi1) / 2) ** 2 - ) ** 0.5 - ) - ) - - return dogleg - - -def interpolate_well(sections, step=30): - """ - Constructs a well survey from a list of sections of control points. - - Parameters - ---------- - sections: list of welleng.connector.Connector objects - step: float (default: 30) - The desired delta measured depth between survey points, in - addition to the control points. - - Returns - ------- - data: list of welleng.connector.Connector objects - """ - method = { - 'hold': get_interpolate_hold, - 'min_dist_to_target': get_interpolate_min_dist_to_target, - 'min_curve_to_target': get_interpolate_min_curve_to_target, - 'curve_hold_curve': get_interpololate_curve_hold_curve, - 'min_curve': get_min_curve - } - - data = [] - if type(sections) is not list: - sections = [sections] - for s in sections: - data.extend(method[s.method](s, step)) - - return data - - -def interpolate_curve( - md1, - pos1, - vec1, - vec2, - dist_curve, - dogleg, - func_dogleg, - step, - endpoint=False -): - # sometimes the curve section has no length - # this if statement handles this event - if dist_curve == 0: - inc, azi = get_angles(vec1, nev=True).T - data = dict( - md=np.array([md1]), - vec=np.array([vec1]), - inc=inc, - azi=azi, - dogleg=np.array([dogleg]) - ) - - return data - - end_md = abs(dist_curve) - if step is None: - md = np.array([0]) - else: - start_md = step - (md1 % step) - md = np.arange(start_md, end_md, step) - md = np.concatenate(([0.], md)) - if endpoint: - md = np.concatenate((md, [end_md])) - dogleg_interp = (dogleg / dist_curve * md).reshape(-1, 1) - - vec = ( - ( - np.sin(dogleg - dogleg_interp) / np.sin(dogleg) * vec1 - ) - + - ( - np.sin(dogleg_interp) / np.sin(dogleg) * vec2 - ) - ) - vec = vec / np.linalg.norm(vec, axis=1).reshape(-1, 1) - inc, azi = get_angles(vec, nev=True).T - - data = dict( - md=md + md1, - vec=vec, - inc=inc, - azi=azi, - dogleg=np.concatenate(( - np.array([0.]), np.diff(dogleg_interp.reshape(-1)) - )), - ) - - return data - - -def interpolate_hold(md1, pos1, vec1, md2, step, endpoint=False): - end_md = md2 - md1 - if step is None: - md = np.array([0]) - else: - start_md = step - (md1 % step) - md = np.arange(start_md, end_md, step) - md = np.concatenate(([0.], md)) - if endpoint: - md = np.concatenate((md, [end_md])) - vec = np.full((len(md), 3), vec1) - inc, azi = get_angles(vec, nev=True).T - dogleg = np.full_like(md, 0.) - - data = dict( - md=md + md1, - vec=vec, - inc=inc, - azi=azi, - dogleg=dogleg, - ) - - return data - - -def get_min_curve(section, step=30, data=None): - if section.md2 is None: - result = ( - get_interpolate_min_curve_to_target( - section, step, data - ) - ) - else: - result = ( - get_interpolate_min_dist_to_target( - section, step, data - ) - ) - return result - - -def get_interpolate_hold(section, step=30, data=None): - if data is None: - data = [] - - data.append(interpolate_hold( - md1=section.md1, - pos1=section.pos1, - vec1=section.vec1, - md2=section.md_target, - step=step, - endpoint=True - )) - - return data - - -def get_interpolate_min_curve_to_target(section, step=30, data=None): - if data is None: - data = [] - - data.append(interpolate_curve( - md1=section.md1, - pos1=section.pos1, - vec1=section.vec1, - vec2=section.vec_target, - dist_curve=section.dist_curve, - dogleg=section.dogleg, - func_dogleg=section.func_dogleg, - step=step, - endpoint=True - )) - - return data - - -def get_interpolate_min_dist_to_target(section, step=30, data=None): - if data is None: - data = [] - - # the first curve section - data.append(interpolate_curve( - md1=section.md1, - pos1=section.pos1, - vec1=section.vec1, - vec2=section.vec2, - dist_curve=section.dist_curve, - dogleg=section.dogleg, - func_dogleg=section.func_dogleg, - step=step - )) - - # the hold section - data.append(interpolate_hold( - md1=section.md2, - pos1=section.pos2, - vec1=section.vec2, - md2=section.md_target, - step=step, - endpoint=True - )) - - return data - - -def get_interpololate_curve_hold_curve(section, step=30, data=None): - if data is None: - data = [] - - # the first curve section - data.append(interpolate_curve( - md1=section.md1, - pos1=section.pos1, - vec1=section.vec1, - vec2=section.vec2, - dist_curve=section.dist_curve, - dogleg=section.dogleg, - func_dogleg=section.func_dogleg, - step=step - )) - - # the hold section - data.append(interpolate_hold( - md1=section.md2, - pos1=section.pos2, - vec1=section.vec2, - md2=section.md3, - step=step - )) - - # the second curve section - data.append(interpolate_curve( - md1=section.md3, - pos1=section.pos3, - vec1=section.vec3, - vec2=section.vec_target, - dist_curve=section.dist_curve2, - dogleg=section.dogleg2, - func_dogleg=section.func_dogleg2, - step=step, - endpoint=True - )) - - return data - - -def convert_target_input_to_booleans(*inputs): - input = [ - "0" if i is None else "1" for i in inputs - ] - - return ''.join(input) - - -def connect_points( - cartesians, vec_start=[0., 0., 1.], dls_design=3.0, nev=True, - # step=30, - md_start=0. -): - """ - Function for connecting a list or array of only Cartesian points. - - Parameters - ---------- - cartesians: (n, 3) list or array of floats - Either [n, e, tvd] (default) or [x, y, z] - vec_start: (3) list or array of floats (default: [0., 0., 1.]) - Unit start vector (default is pointing down) in the nev or xyz - coordinate system. - dls_design: float or (n, 1) list or array of floats (default: 3.0) - The minimum Dog Leg Severity used when attempting to connect the - points (a high DLS will be used if necessary). - nev: bool (default: True) - Indicates whether the cartesians are referencing the [nev] - (default) or [xyz] coordinate system. - step: float (default: 30) - The desired step interval for the returned Survey object. - md_start: float (default: 0) - The md at the first cartesian point (in the event of a tie-on). - - Returns - ------- - data: list of welleng.connector.Connector objects - """ - if nev: - pos_nev = np.array(cartesians).reshape(-1, 3) - vec_nev = np.zeros_like(pos_nev) - vec_nev[0] = np.array(vec_start).reshape(-1, 3) - else: - e, n, tvd = np.array(cartesians).reshape(-1, 3).T - pos_nev = np.array([n, e, tvd]).T - vec_nev = np.zeros_like(pos_nev) - e, n, tvd = np.array(vec_start).reshape(-1, 3).T - vec_nev[0] = np.array([n, e, tvd]).T - - if type(dls_design) is float: - dls = np.full(len(pos_nev), dls_design) - else: - dls = np.array(dls_design).reshape(-1, 1) - - connections = [] - for i, (p, v, d) in enumerate(zip(pos_nev, vec_nev, dls)): - if i == 0: - node_1 = Node( - pos=p, - vec=v, - md=md_start - ) - continue - if i > 1: - node_1 = connections[-1].node_end - node_2 = Node( - pos=p - ) - c = Connector( - node1=node_1, - node2=node_2, - dls_design=d - ) - assert np.allclose(c.pos_target, p) - connections.append(c) - - return connections - - -def survey_to_plan(survey, tolerance=0.2, dls_design=1., step=30.): - """ - Prototype function for extracting a plan from a drilled well survey - a - minimal number of control points (begining/end points of either hold or - build/turn sections) required to express the well given the provided input - parameters. - - Parameters - ---------- - survey: welleng.survey.Survey object - tolerance: float (default=0.2) - Defines how tight to fit the planned well to the survey - a higher - number will results in less control points but likely poorer fit. - dls_design: float (default=1.0) - The minimum DLS used to fit the planned trajectory. - step: float (default=30) - The desired md step in the plan survey. - - Returns - ------- - sections: list of welleng.connector.Connection objects - """ - assert dls_design > 0., "dls_design must be greater than 0" - - idx = [0] - end = len(survey.md) - 1 - md = survey.md[0] - node = None - sections = [] - - while True: - section, i = _get_section( - survey=survey, - start=idx[-1], - md=md, - node=node, - tolerance=tolerance, - dls_design=dls_design - ) - sections.append(section) - idx.append(i) - if idx[-1] >= end: - break - node = section.node_end - - data = interpolate_well(sections, step=30) - - return sections - - -def _get_section( - survey, start, tolerance, dls_design=1., md=0., node=None -): - idx = start + 2 - nev = survey.get_nev_arr() - - if idx > len(nev): - idx = len(nev) - 1 - - if node is None: - node_1 = Node( - pos=nev[start], - vec=survey.vec_nev[start], - md=md - ) - else: - node_1 = node - - scores = [0.] - delta_scores = [] - c_old = None - - for i, (p, v) in enumerate(zip(nev[idx:], survey.vec_nev[idx:])): - node_2 = Node( - pos=p, - vec=v, - ) - c = Connector( - node1=node_1, - node2=node_2, - dls_design=dls_design - ) - s = c.survey(step=1.) - if c_old is None: - c_old = deepcopy(c) - nev_new = s.get_nev_arr() - - distances = distance.cdist( - nev[start: idx + i], - nev_new - ) - - score = np.sum(np.amin(distances, axis=1)) / (s.md[-1] - s.md[0]) - - delta_scores.append(score - scores[-1]) - scores.append(score) - - if all(( - abs(delta_scores[-1]) >= tolerance, - idx + i < len(nev) - 2 - )): - break - elif idx + i == len(nev) - 1: - c_old = deepcopy(c) - break - else: - c_old = deepcopy(c) - - return ( - c_old, - idx + i - 1 if idx + i != len(nev) - 1 else idx + i - ) - - -def numbafy(functions): - for f in functions: - f = njit(f) - - -if NUMBA: - NUMBAFY = ( - _get_xyz, - get_pos, - get_vec_target, - get_curve_hold_data, - shape_factor, - min_dist_to_target, - get_radius_critical, - angle, - get_dogleg - ) - numbafy(NUMBAFY) +from copy import copy, deepcopy + +import numpy as np +# from scipy.optimize import minimize +from scipy.spatial import distance + +try: + from numba import njit + NUMBA = True +except ImportError: + NUMBA = False + +from .node import Node, get_node_params +from .utils import (NEV_to_HLA, _get_angles, dls_from_radius, get_angles, + get_nev, get_unit_vec, get_vec, get_xyz) + + +class Connector: + def __init__( + self, + node1=None, + node2=None, + pos1=[0., 0., 0.], + vec1=None, + inc1=None, + azi1=None, + md1=0, + dls_design=3.0, + dls_design2=None, + md2=None, + pos2=None, + vec2=None, + inc2=None, + azi2=None, + degrees=True, + unit='meters', + min_error=1e-5, + delta_dls=0.1, + min_tangent=0., + max_iterations=1_000, + force_min_curve=False, + closest_approach=False + ): + + """ + A class to provide a fast, efficient method for determining well + bore section geometry using the appropriate method based upon the + provided parameters, with the intent of assisting machine learning + fitness functions. Interpolation between the returned control points + can be performed posthumously - attempts are made to keep processing to + a minimum in the event that the Connector is being used for machine + learning. + + Only specific combinations of input data are permitted, e.g. if you + input both a start vector AND a start inc and azi you'll get an error + (which one is correct after all?). Think about what you're trying to + achieve and provide the data that will facilitate that outcome. + + Parameters + ---------- + pos1: (3) list or array of floats (default: [0,0,0]) + Start position in NEV coordinates. + vec1: (3) list or array of floats or None (default: None) + Start position unit vector in NEV coordinates. + inc1: float or None (default: None) + Start position inclination. + azi2: float or None (default: None) + Start position azimuth. + md1: float or None (default: None) + Start position measured depth. + dls_design: float (default: 3.0) + The desired Dog Leg Severity (DLS) for the (first) curved + section in degrees per 30 meters or 100 feet. + dls_design2: float or None (default: None) + The desired DLS for the second curve section in degrees per + 30 meters or 100 feet. If set to None then `dls_design` will + be the default value. + md2: float or None (default: None) + The measured depth of the target position. + pos2: (3) list or array of floats or None (default: None) + The position of the target in NEV coordinates. + vec2: (3) list or array of floats or None (default: None) + The target unit vector in NEV coordinates. + inc1: float or None (default: None) + The inclination at the target position. + azi2: float or None (default: None) + The azimuth at the target position. + degrees: boolean (default: True) + Indicates whether the input angles (inc, azi) are in degrees + (True) or radians (False). + unit: string (default: 'meters') + Indicates the distance unit, either 'meters' or 'feet'. + min_error: float (default: 1e-5): + Infers the error tolerance of the results and is used to set + iteration stops when the desired error tolerance is met. Value + must be less than 1. Use with caution as the code may + become unstable if this value is changed. + delta_radius: float (default: 20) + The delta radius (first curve and second curve sections) used + as an iteration stop when balancing radii. If the resulting + delta radius yielded from `dls_design` and `dls_design2` is + larger than `delta_radius`, then `delta_radius` defaults to + the former. + delta_dls: float (default: 0.1) + The delta dls (first curve and second curve sections) used as an + iteration stop when balancing radii, i.e. if the dls of the second + section is within 0.1 deg/30m of the first curve section then the + section is considered balanced and no further iterations are + performed. Setting this value too low will likely result in hitting + the recursion limit. + min_tangent: float (default: 10) + The minimum tangent length in the `curve_hold_curve` method + used to mitigate instability during iterations (where the + tangent section approaches or equals 0). + max_iterations: int (default: 1000) + The maximum number of iterations before giving up trying to + fit a `curve_hold_curve`. This number is limited by Python's + depth of recursion, but if you're hitting the default stop + then consider changing `delta_radius` and `min_tangent` as + your expectations may be unrealistic (this is open source + software after all!) + + Results + ------- + connector: welleng.connector.Connector object + """ + if node1 is not None: + pos1, vec1, md1 = get_node_params( + node1 + ) + if node2 is not None: + pos2, vec2, md2 = get_node_params( + node2 + ) + + # Set up a lookup dictionary to use with the logic to determine + # what connector method to deploy. Use a binary string to + # represent the inputs in the order: + # (md2, inc2, azi2, pos2, vec2) + # Initially I used boolean logic, but it quickly became non- + # transparent and difficult to debug. + self._get_initial_methods() + + # METHODS = [ + # 'hold', + # 'curve_hold', + # 'min_dist_to_target', + # 'min_curve_to_target', + # 'curve_hold_curve', + # 'min_curve' + # ] + + # quick check that inputs are workable and if not some steer to + # the user. + assert vec1 is not None or (inc1 is not None and azi1 is not None), ( + "Require either vec1 or (inc1 and azi1)" + ) + if vec1 is not None: + assert inc1 is None and azi1 is None, ( + "Either vec1 or (inc1 and azi1)" + ) + if (inc1 is not None or azi1 is not None): + assert vec1 is None, "Either vec1 or (inc1 and azi1)" + + assert ( + md2 is not None + or pos2 is not None + or vec2 is not None + or inc2 is not None + or azi2 is not None + ), "Missing target parameters" + + if vec2 is not None: + assert not (inc2 or azi2), "Either vec2 or (inc2 and azi2)" + if (inc2 is not None or azi2 is not None): + assert vec2 is None, "Either vec2 or (inc2 and azi2)" + if md2 is not None: + assert pos2 is None, "Either md2 or pos2" + assert md2 >= md1, "md2 must be larger than md1" + + if dls_design is None: + dls_design = 3.0 + else: + assert dls_design > 0, "dls_design must be greater than zero" + assert min_error < 1, "min_error must be less than 1.0" + + # figure out what method is required to connect the points + target_input = convert_target_input_to_booleans( + md2, inc2, azi2, pos2, vec2 + ) + + self.force_min_curve = force_min_curve + if self.force_min_curve: + self.initial_method = 'min_curve_or_hold' + else: + self.initial_method = self.initial_methods[target_input] + + # do some more initialization stuff + self.min_error = min_error + self.min_tangent = min_tangent + self.iterations = 0 + self.max_iterations = max_iterations + self.errors = [] + self.radii = [] + self.dogleg_old, self.dogleg2_old = 0, 0 + self.dist_curve2 = 0 + self.pos1 = np.array(pos1) + + self.pos2, self.vec2, self.inc2, self.azi2, self.md2 = ( + None, None, None, None, None + ) + + # fill in the input data gaps + if (inc1 is not None and azi1 is not None): + if degrees: + self.inc1 = np.radians(inc1) + self.azi1 = np.radians(azi1) + else: + self.inc1 = inc1 + self.azi1 = azi1 + self.vec1 = np.array(get_vec( + self.inc1, self.azi1, nev=True, deg=False + )).reshape(3) + else: + self.vec1 = np.array(vec1).reshape(3) + self.inc1, self.azi1 = get_angles(self.vec1, nev=True).reshape(2) + + self.md1 = md1 + self.pos_target = None if pos2 is None else np.array(pos2).reshape(3) + self.md_target = md2 + + if vec2 is not None: + self.vec_target = np.array(vec2).reshape(3) + self.inc_target, self.azi_target = get_angles( + self.vec_target, + nev=True + ).reshape(2) + elif (inc2 is not None and azi2 is not None): + if degrees: + self.inc_target = np.radians(inc2) + self.azi_target = np.radians(azi2) + else: + self.inc_target = inc2 + self.azi_target = azi2 + self.vec_target = get_vec( + self.inc_target, self.azi_target, nev=True, deg=False + ).reshape(3) + elif inc2 is None and azi2 is None: + self.inc_target, self.azi_target, self.vec_target = ( + self.inc1, self.azi1, self.vec1 + ) + elif inc2 is None: + self.inc_target = self.inc1 + if degrees: + self.azi_target = np.radians(azi2) + else: + self.azi_target = azi2 + self.vec_target = get_vec( + self.inc_target, self.azi_target, nev=True, deg=False + ).reshape(3) + elif azi2 is None: + self.azi_target = self.azi1 + if degrees: + self.inc_target = np.radians(inc2) + else: + self.inc_target = inc2 + self.vec_target = get_vec( + self.inc_target, self.azi_target, nev=True, deg=False + ).reshape(3) + else: + self.vec_target = vec2 + self.inc_target = inc2 + self.azi_target = azi2 + + self.unit = unit + if self.unit == 'meters': + self.denom = 30 + else: + self.denom = 100 + + if degrees: + self.dls_design = np.radians(dls_design) + if dls_design2: + self.dls_design2 = np.radians(dls_design2) + else: + self.dls_design = dls_design + if dls_design2: + self.dls_design2 = dls_design2 + if not dls_design2: + self.dls_design2 = self.dls_design + + self.radius_design = self.denom / self.dls_design + self.radius_design2 = self.denom / self.dls_design2 + + self.delta_dls = delta_dls + + # some more initialization stuff + self.tangent_length = None + self.dogleg2 = None + + self.pos3, self.vec3, self.inc3, self.azi3, self.md3 = ( + None, None, None, None, None + ) + self.radius_critical, self.radius_critical2 = np.inf, np.inf + self.closest_approach = closest_approach + + # Things fall apart if the start and end vectors exactly equal + # one another, so need to check for this and if this is the + # case, modify the end vector slightly. This is a lazy way of + # doing this, but it's fast. Probably a more precise way would + # be to split the dogleg in two, but that's more hassle than + # it's worth. + if ( + self.vec_target is not None + and np.array_equal(self.vec_target, self.vec1 * -1) + ): + ( + self.vec_target, + self.inc_target, + self.azi_target + ) = mod_vec(self.vec_target, self.min_error) + + # properly figure out the method + self._get_method() + + # and finally, actually do something... + self._use_method() + + self._get_nodes() + + def _get_nodes(self): + self.node_start = Node( + pos=self.pos1.reshape(3), + vec=self.vec1.reshape(3), + md=self.md1 + ) + self.node_end = Node( + pos=self.pos_target.reshape(3), + vec=self.vec_target.reshape(3), + md=self.md_target + ) + + def _min_dist_to_target(self): + ( + self.tangent_length, + self.dogleg + ) = min_dist_to_target(self.radius_design, self.distances) + self.dogleg = check_dogleg(self.dogleg) + self.dist_curve, self.func_dogleg = get_curve_hold_data( + self.radius_design, self.dogleg + ) + self.vec_target = get_vec_target( + self.pos1, + self.vec1, + self.pos_target, + self.tangent_length, + self.dist_curve, + self.func_dogleg + ) + self._get_angles_target() + self._get_md_target() + self.pos2 = ( + self.pos_target - ( + self.tangent_length * self.vec_target + ) + ) + self.md2 = self.md1 + abs(self.dist_curve) + self.md_target = self.md2 + self.tangent_length + self.vec2 = self.vec_target + + def _min_curve_to_target(self): + ( + self.tangent_length, + self.radius_critical, + self.dogleg + ) = min_curve_to_target(self.distances) + self.dogleg = check_dogleg(self.dogleg) + self.dist_curve, self.func_dogleg = get_curve_hold_data( + min(self.radius_design, self.radius_critical), self.dogleg + ) + self.vec_target = get_vec_target( + self.pos1, + self.vec1, + self.pos_target, + self.tangent_length, + self.dist_curve, + self.func_dogleg + ) + self._get_angles_target() + self._get_md_target() + + def _use_method(self): + if self.method == 'hold': + self._hold() + elif self.method == 'min_curve': + self._min_curve() + elif self.method == 'curve_hold_curve': + self.pos2_list, self.pos3_list = [], [deepcopy(self.pos_target)] + self.vec23 = [np.array([0., 0., 0.])] + self.delta_radius_list = [] + # self._target_pos_and_vec_defined(deepcopy(self.pos_target)) + self._target_pos_and_vec_defined( + self.pos1 + (self.pos_target - self.pos1) / 2 + ) + else: + self.distances = self._get_distances( + self.pos1, self.vec1, self.pos_target + ) + if self.radius_design <= get_radius_critical( + self.radius_design, self.distances, self.min_error + ): + self.method = 'min_dist_to_target' + self._min_dist_to_target() + else: + if self.closest_approach: + self.method = 'min_curve_to_target' + self._closest_approach() + else: + self.method = 'min_curve_to_target' + self._min_curve_to_target() + + def _get_method(self): + assert self.initial_method not in [ + 'no_input', + 'vec_and_inc_azi', + 'md_and_pos' + ], f"{self.initial_method}" + if self.initial_method == 'hold': + self.method = 'hold' + elif self.initial_method[-8:] == '_or_hold': + if np.array_equal(self.vec_target, self.vec1): + if self.pos_target is None: + self.method = 'hold' + elif np.allclose( + self.vec_target, + (self.pos_target - self.pos1) + / np.linalg.norm(self.pos_target - self.pos1) + ): + self.method = 'hold' + else: + self.method = self.initial_method[:-8] + else: + self.method = self.initial_method[:-8] + else: + self.method = self.initial_method + + def _get_initial_methods(self): + # TODO: probably better to load this in from a yaml file + # [md2, inc2, azi2, pos2, vec2] forms the booleans + self.initial_methods = { + '00000': 'no_input', + '00001': 'min_curve_or_hold', + '00010': 'curve_hold_or_hold', + '00011': 'curve_hold_curve_or_hold', + '00100': 'min_curve_or_hold', + '00101': 'vec_and_inc_azi', + '00110': 'curve_hold', + '00111': 'vec_and_inc_azi', + '01000': 'min_curve_or_hold', + '01001': 'vec_and_inc_azi', + '01010': 'curve_hold_or_hold', + '01011': 'vec_and_inc_azi', + '01100': 'min_curve_or_hold', + '01101': 'vec_and_inc_azi', + '01110': 'curve_hold_curve_or_hold', + '01111': 'vec_and_inc_azi', + '10000': 'hold', + '10001': 'min_curve_or_hold', + '10010': 'md_and_pos', + '10011': 'md_and_pos', + '10100': 'min_curve_or_hold', + '10101': 'vec_and_inc_azi', + '10110': 'md_and_pos', + '10111': 'md_and_pos', + '11000': 'min_curve_or_hold', + '11001': 'vec_and_inc_azi', + '11010': 'md_and_pos', + '11011': 'md_and_pos', + '11100': 'min_curve_or_hold', + '11101': 'vec_and_inc_azi', + '11110': 'md_and_pos', + '11111': 'md_and_pos' + } + + def _closest_approach(self): + vec_pos1_pos_target = self.pos_target - self.pos1 + vec_pos1_pos_target /= np.linalg.norm(vec_pos1_pos_target) + + cross_product = np.cross(vec_pos1_pos_target, self.vec1) + cross_product /= np.linalg.norm(cross_product) + + factor = cross_product / vec_pos1_pos_target + factor /= abs(factor) + + cc = ( + self.pos1 + cross_product * factor * self.radius_design + ) + + cc_pos_target = self.pos_target - cc + cc_pos_target /= np.linalg.norm(cc_pos_target) + + self.pos_target_original = copy(self.pos_target) + + self.pos_target = cc + cc_pos_target * self.radius_design + + # recalculate self.distances with new self.pos_target + self.distances = self._get_distances( + self.pos1, self.vec1, self.pos_target + ) + + self._min_curve_to_target() + + def _min_curve(self): + self.dogleg = get_dogleg( + self.inc1, self.azi1, self.inc_target, self.azi_target + ) + + self.dogleg = check_dogleg(self.dogleg) + if self.md_target is None: + self.md2 = None + self.dist_curve, self.func_dogleg = get_curve_hold_data( + self.radius_design, self.dogleg + ) + self.md_target = self.md1 + abs(self.dist_curve) + self.pos_target = get_pos( + self.pos1, + self.vec1, + self.vec_target, + self.dist_curve, + self.func_dogleg + ).reshape(3) + else: + with np.errstate(divide='ignore'): + self.radius_critical = np.nan_to_num(abs( + (self.md_target - self.md1) / self.dogleg + ), nan=np.inf) + if ( + self.radius_critical > self.radius_design + or ( + np.around(self.dogleg, decimals=5) + == np.around(np.pi, decimals=5) + ) + ): + self.md2 = ( + self.md1 + + min(self.radius_design, self.radius_critical) + * self.dogleg + ) + ( + self.inc2, self.azi2, self.vec2 + ) = self.inc_target, self.azi_target, self.vec_target + self.dist_curve, self.func_dogleg = get_curve_hold_data( + min(self.radius_design, self.radius_critical), + self.dogleg + ) + self.pos2 = get_pos( + self.pos1, + self.vec1, + self.vec2, + self.dist_curve, + self.func_dogleg + ).reshape(3) + self.pos_target = self.pos2 + ( + self.vec2 * (self.md_target - self.md2) + ) + else: + self.dist_curve, self.func_dogleg = get_curve_hold_data( + self.radius_critical, self.dogleg + ) + self.md2 = None + self.pos_target = get_pos( + self.pos1, + self.vec1, + self.vec_target, + self.dist_curve, + self.func_dogleg + ).reshape(3) + + def _hold(self): + if self.pos_target is None: + self.pos_target = ( + self.pos1 + self.vec1 * (self.md_target - self.md1) + ) + if self.md_target is None: + self.md_target = ( + np.linalg.norm(self.pos_target - self.pos1) + + self.md1 + ) + self.dls, self.dls2 = 0.0, 0.0 + + def _get_angles_target(self): + self.inc_target, self.azi_target = get_angles( + self.vec_target, nev=True + ).reshape(2) + + def _get_md_target(self): + self.md_target = ( + self.dist_curve + + self.tangent_length + + self.dist_curve2 + + self.md1 + ) + + def _get_pos2(self, pos1, vec1, pos_target, radius): + distances = self._get_distances(pos1, vec1, pos_target) + + radius_temp = get_radius_critical( + self.radius_design, + distances, + self.min_error + ) + if radius_temp > radius: + radius_temp = radius + assert self.radius_critical > 0 + + ( + tangent_length, + dogleg + ) = min_dist_to_target( + radius_temp, + distances + ) + + dogleg = check_dogleg(dogleg) + + dist_curve, func_dogleg = get_curve_hold_data( + radius_temp, + dogleg + ) + vec3 = get_vec_target( + pos1, + vec1, + pos_target, + tangent_length, + dist_curve, + func_dogleg + ) + + tangent_temp = self._get_tangent_temp(tangent_length) + + pos2 = ( + pos_target - ( + tangent_temp * vec3 + ) + ) + return pos2 + + def _target_pos_and_vec_defined(self, pos3, vec_old=[0., 0., 0.]): + """ + Function for fitting a curve, hold, curve path between a pair of + points with vectors. + + It's written in this odd way to allow a solver function like scipy's + optimize to solve it, but so far I've not had much success using + these. + + If the curve sections can't be achieved with the design DLS values, + this function will iterate until the two curve section DLSs are + approximately balances (in DLS terms) within the prescribed delta_dls + parameter. + """ + args = (self, None, [0., 0., 0.]) + minimize_target_pos_and_vec_defined( + *( + ([ + self.radius_design, + self.radius_design2 + ],) + + args + (True,) + ) + ) + if not all(( + self.radius_critical > self.radius_design, + self.radius_critical2 > self.radius_design2 + )): + while True and self.iterations <= self.max_iterations: + self.radius_critical = ( + (self.md_target - self.md1) + / (abs(self.dogleg) + abs(self.dogleg2)) + ) + assert self.radius_critical >= 0 + self.radius_critical2 = self.radius_critical + args = (self, deepcopy(self.pos3), deepcopy(self.vec23[-1])) + minimize_target_pos_and_vec_defined( + *( + ([ + self.radius_design, + self.radius_design2 + ],) + + args + (True,) + ) + ) + if abs( + self.dls - self.dls2 + ) < self.delta_dls: + break + self._happy_finish() + return + + self._happy_finish() + + def _happy_finish(self): + # get pos1 to pos2 curve data + self.dist_curve, self.func_dogleg = get_curve_hold_data( + min(self.radius_critical, self.radius_design), + self.dogleg + ) + + self.vec3 = get_vec_target( + self.pos1, + self.vec1, + self.pos3, + self.tangent_length, + self.dist_curve, + self.func_dogleg + ) + + self.vec2 = self.vec3 + + self.pos2 = get_pos( + self.pos1, + self.vec1, + self.vec3, + self.dist_curve, + self.func_dogleg + ).reshape(3) + + self.md2 = self.md1 + abs(self.dist_curve) + + self.dist_curve2, self.func_dogleg2 = get_curve_hold_data( + min(self.radius_critical2, self.radius_design2), + self.dogleg2 + ) + + self.pos3 = get_pos( + self.pos_target, + self.vec_target * -1, + self.vec3 * -1, + self.dist_curve2, + self.func_dogleg2 + ).reshape(3) + + tangent_length = np.linalg.norm( + self.pos3 - self.pos2 + ) + + self.md3 = self.md2 + tangent_length + + self.md_target = self.md3 + abs(self.dist_curve2) + + def interpolate(self, step=30): + return interpolate_well([self], step) + + def _get_tangent_temp(self, tangent_length): + if np.isnan(tangent_length): + tangent_temp = self.min_tangent + else: + tangent_temp = max(tangent_length, self.min_tangent) + + return tangent_temp + + def _mod_pos(self, pos): + pos_rand = np.random.random(3) # * self.delta_radius + pos += pos_rand + + def _get_distances(self, pos1, vec1, pos_target): + # When initializing a `curve_hold_curve` and pos_target is directly + # ahead it can cause issues (it's a hold and not a curve). So this code + # checks for that condition and if it's the case, will move the + # target_pos a sufficient amount to prevent issues. + vec_temp = np.array(pos_target - pos1) + with np.errstate(divide='ignore', invalid='ignore'): + vec_temp = vec_temp / np.linalg.norm(vec_temp) + if np.array_equal(vec1, vec_temp): + self._mod_pos(pos_target) + if np.allclose(pos1, pos_target): + return (0, 0, 0) + + else: + dist_to_target = ( + np.linalg.norm((pos_target - pos1)) + ) + + dist_perp_to_target = ( + np.dot((pos_target - pos1), vec1) + ) + if dist_perp_to_target > dist_to_target: + # since a tolerance is being used, occasionally things can go + # wrong and need to be caught. + dist_perp_to_target = dist_to_target + + dist_norm_to_target = ( + ( + dist_to_target ** 2 + - dist_perp_to_target ** 2 + ) ** 0.5 + ) + + return ( + dist_to_target, + dist_perp_to_target, + dist_norm_to_target + ) + + +def minimize_target_pos_and_vec_defined( + x, c, pos3=None, vec_old=[0., 0., 0.], result=False +): + radius1, radius2 = x + if pos3 is None: + pos2_init = c._get_pos2(c.pos1, c.vec1, c.pos_target, radius1) + pos3_init = c._get_pos2( + c.pos_target, c.vec_target * -1, c.pos1, radius2 + ) + c.pos3 = pos2_init + ( + pos3_init - pos2_init + ) / 2 + c.distances1 = c._get_distances(c.pos1, c.vec1, c.pos3) + + radius_temp1 = get_radius_critical( + radius1, + c.distances1, + c.min_error + ) + if radius_temp1 < c.radius_critical: + c.radius_critical = radius_temp1 + assert c.radius_critical >= 0 + + radius_effective1 = min(radius1, c.radius_critical) + + ( + c.tangent_length, + c.dogleg + ) = min_dist_to_target( + radius_effective1, + c.distances1 + ) + + c.dogleg = check_dogleg(c.dogleg) + + c.dist_curve, c.func_dogleg = get_curve_hold_data( + radius_effective1, + c.dogleg + ) + c.vec3 = get_vec_target( + c.pos1, + c.vec1, + c.pos3, + c.tangent_length, + c.dist_curve, + c.func_dogleg + ) + + tangent_temp1 = c._get_tangent_temp(c.tangent_length) + + c.pos2 = ( + c.pos3 - ( + tangent_temp1 * c.vec3 + ) + ) + + c.distances2 = c._get_distances( + c.pos_target, + c.vec_target * -1, + c.pos2 + ) + + radius_temp2 = get_radius_critical( + radius2, + c.distances2, + c.min_error + ) + if radius_temp2 < c.radius_critical2: + c.radius_critical2 = radius_temp2 + assert c.radius_critical2 >= 0 + + radius_effective2 = min(radius2, c.radius_critical2) + + ( + c.tangent_length2, + c.dogleg2 + ) = min_dist_to_target( + radius_effective2, + c.distances2 + ) + + c.dogleg2 = check_dogleg(c.dogleg2) + + c.dist_curve2, c.func_dogleg2 = get_curve_hold_data( + radius_effective2, + c.dogleg2 + ) + c.vec2 = get_vec_target( + c.pos_target, + c.vec_target * -1, + c.pos2, + c.tangent_length2, + c.dist_curve2, + c.func_dogleg2 + ) + + tangent_temp2 = c._get_tangent_temp(c.tangent_length2) + + c.pos3 = ( + c.pos2 - ( + tangent_temp2 * c.vec2 + ) + ) + + vec23_denom = np.linalg.norm(c.pos3 - c.pos2) + if vec23_denom == 0: + c.vec23.append(np.array([0., 0., 0.])) + else: + c.vec23.append((c.pos3 - c.pos2) / vec23_denom) + + c.error = np.allclose( + c.vec23[-1], + vec_old, + equal_nan=True, + rtol=c.min_error * 10, + atol=c.min_error * 0.1 + ) + + c.errors.append(c.error) + c.pos3_list.append(c.pos3) + c.pos2_list.append(c.pos2) + c.md_target = ( + c.md1 + c.dist_curve + tangent_temp2 + c.dist_curve2 + ) + c.delta_radius_list.append( + abs(c.radius_critical - c.radius_critical2) + ) + c.dls = max( + dls_from_radius(c.radius_design), + dls_from_radius(c.radius_critical) + ) + c.dls2 = max( + dls_from_radius(c.radius_design2), + dls_from_radius(c.radius_critical2) + ) + + if c.error: + if result: + return c + result = 0. + if radius_temp2 < c.radius_design2: + result += c.radius_design2 - radius_temp2 + if radius_temp1 < c.radius_design: + result += c.radius_design - radius_temp1 + return result + else: + c.iterations += 1 + return minimize_target_pos_and_vec_defined( + x, c, c.pos3, c.vec23[-1], result + ) + + +def check_dogleg(dogleg): + # the code assumes angles are positive and clockwise + if dogleg < 0: + dogleg_new = dogleg + 2 * np.pi + return dogleg_new + else: + return dogleg + + +def mod_vec(vec, error=1e-5): + # if it's not working then twat it with a hammer + vec_mod = vec * np.array([1, 1, 1 - error]) + vec_mod /= np.linalg.norm(vec_mod) + inc_mod, azi_mod = get_angles(vec_mod, nev=True).T + + return vec_mod, inc_mod, azi_mod + + +def _get_xyz(pos): + n, e, v = pos + return np.array([e, n, v]).reshape(-1, 3) + + +def get_pos(pos1, vec1, vec2, dist_curve, func_dogleg): + inc1, azi1 = _get_angles(_get_xyz(vec1)).reshape(2) + inc2, azi2 = _get_angles(_get_xyz(vec2)).reshape(2) + + pos2 = pos1 + ( + ( + dist_curve * func_dogleg / 2 + ) + * + np.array([ + np.sin(inc1) * np.cos(azi1) + np.sin(inc2) * np.cos(azi2), + np.sin(inc1) * np.sin(azi1) + np.sin(inc2) * np.sin(azi2), + np.cos(inc1) + np.cos(inc2) + ]).T + ) + + return pos2 + + +def get_vec_target( + pos1, + vec1, + pos_target, + tangent_length, + dist_curve, + func_dogleg +): + if dist_curve == 0: + return vec1 + + vec_target = ( + ( + pos_target - pos1 - ( + dist_curve + * func_dogleg + ) / 2 * vec1 + ) + / + ( + ( + dist_curve * func_dogleg / 2 + ) + tangent_length + ) + ) + + vec_target /= np.linalg.norm(vec_target) + + return vec_target + + +def get_curve_hold_data(radius, dogleg): + dist_curve = radius * dogleg + func_dogleg = shape_factor(dogleg) + + return ( + dist_curve, + func_dogleg + ) + + +def shape_factor(dogleg): + """ + Function to determine the shape factor of a dogleg. + + Parameters + ---------- + dogleg: float + The dogleg angle in radians of a curve section. + """ + if dogleg == 0: + return 0 + else: + return np.tan(dogleg / 2) / (dogleg / 2) + + +def min_dist_to_target(radius, distances): + """ + Calculates the control points for a curve and hold section from the + start position and vector to the target position. + """ + ( + dist_to_target, + dist_perp_to_target, + dist_norm_to_target + ) = distances + + tangent_length = ( + dist_to_target ** 2 + - 2 * radius * dist_norm_to_target + ) ** 0.5 + + # determine the dogleg angle of the curve section + dogleg = 2 * np.arctan2( + (dist_perp_to_target - tangent_length), + ( + 2 * radius - dist_norm_to_target + ) + ) + + return tangent_length, dogleg + + +def min_curve_to_target(distances): + """ + Calculates the control points for a curve section from the start + position and vector to the target position which is not achievable with + the provided dls_design. + """ + if distances == (0., 0., 0,): + return ( + 0., + np.inf, + 0. + ) + + ( + dist_to_target, + dist_perp_to_target, + dist_norm_to_target + ) = distances + + if dist_norm_to_target == 0.: + radius_critical = np.inf + else: + radius_critical = ( + dist_to_target ** 2 / ( + 2 * dist_norm_to_target + ) + ) + assert radius_critical > 0 + + dogleg = ( + 2 * np.arctan2( + dist_norm_to_target, + dist_perp_to_target + ) + ) + + tangent_length = 0 + + return ( + tangent_length, + radius_critical, + dogleg + ) + + +def get_radius_critical(radius, distances, min_error): + ( + dist_to_target, + dist_perp_to_target, + dist_norm_to_target + ) = distances + + if dist_norm_to_target == 0: + return 0 + + radius_critical = ( + dist_to_target ** 2 / ( + 2 * dist_norm_to_target + ) + ) * (1 - min_error) + + assert radius_critical > 0 + + return radius_critical + + +def angle(vec1, vec2, acute=True): + angle = np.arccos( + np.dot(vec1, vec2) + / + ( + np.linalg.norm(vec1) * np.linalg.norm(vec2) + ) + ) + + if acute: + return angle + + else: + return 2 * np.pi - angle + + +def get_dogleg(inc1, azi1, inc2, azi2): + dogleg = ( + 2 * np.arcsin( + ( + np.sin((inc2 - inc1) / 2) ** 2 + + np.sin(inc1) * np.sin(inc2) + * np.sin((azi2 - azi1) / 2) ** 2 + ) ** 0.5 + ) + ) + + return dogleg + + +def interpolate_well(sections, step=30): + """ + Constructs a well survey from a list of sections of control points. + + Parameters + ---------- + sections: list of welleng.connector.Connector objects + step: float (default: 30) + The desired delta measured depth between survey points, in + addition to the control points. + + Returns + ------- + data: list of welleng.connector.Connector objects + """ + method = { + 'hold': get_interpolate_hold, + 'min_dist_to_target': get_interpolate_min_dist_to_target, + 'min_curve_to_target': get_interpolate_min_curve_to_target, + 'curve_hold_curve': get_interpololate_curve_hold_curve, + 'min_curve': get_min_curve + } + + data = [] + if type(sections) is not list: + sections = [sections] + for s in sections: + data.extend(method[s.method](s, step)) + + return data + + +def interpolate_curve( + md1, + pos1, + vec1, + vec2, + dist_curve, + dogleg, + func_dogleg, + step, + endpoint=False +): + # sometimes the curve section has no length + # this if statement handles this event + if dist_curve == 0: + inc, azi = get_angles(vec1, nev=True).T + data = dict( + md=np.array([md1]), + vec=np.array([vec1]), + inc=inc, + azi=azi, + dogleg=np.array([dogleg]) + ) + + return data + + end_md = abs(dist_curve) + if step is None: + md = np.array([0]) + else: + start_md = step - (md1 % step) + md = np.arange(start_md, end_md, step) + md = np.concatenate(([0.], md)) + if endpoint: + md = np.concatenate((md, [end_md])) + dogleg_interp = (dogleg / dist_curve * md).reshape(-1, 1) + + vec = ( + ( + np.sin(dogleg - dogleg_interp) / np.sin(dogleg) * vec1 + ) + + + ( + np.sin(dogleg_interp) / np.sin(dogleg) * vec2 + ) + ) + vec = vec / np.linalg.norm(vec, axis=1).reshape(-1, 1) + inc, azi = get_angles(vec, nev=True).T + + data = dict( + md=md + md1, + vec=vec, + inc=inc, + azi=azi, + dogleg=np.concatenate(( + np.array([0.]), np.diff(dogleg_interp.reshape(-1)) + )), + ) + + return data + + +def interpolate_hold(md1, pos1, vec1, md2, step, endpoint=False): + end_md = md2 - md1 + if step is None: + md = np.array([0]) + else: + start_md = step - (md1 % step) + md = np.arange(start_md, end_md, step) + md = np.concatenate(([0.], md)) + if endpoint: + md = np.concatenate((md, [end_md])) + vec = np.full((len(md), 3), vec1) + inc, azi = get_angles(vec, nev=True).T + dogleg = np.full_like(md, 0.) + + data = dict( + md=md + md1, + vec=vec, + inc=inc, + azi=azi, + dogleg=dogleg, + ) + + return data + + +def get_min_curve(section, step=30, data=None): + if section.md2 is None: + result = ( + get_interpolate_min_curve_to_target( + section, step, data + ) + ) + else: + result = ( + get_interpolate_min_dist_to_target( + section, step, data + ) + ) + return result + + +def get_interpolate_hold(section, step=30, data=None): + if data is None: + data = [] + + data.append(interpolate_hold( + md1=section.md1, + pos1=section.pos1, + vec1=section.vec1, + md2=section.md_target, + step=step, + endpoint=True + )) + + return data + + +def get_interpolate_min_curve_to_target(section, step=30, data=None): + if data is None: + data = [] + + data.append(interpolate_curve( + md1=section.md1, + pos1=section.pos1, + vec1=section.vec1, + vec2=section.vec_target, + dist_curve=section.dist_curve, + dogleg=section.dogleg, + func_dogleg=section.func_dogleg, + step=step, + endpoint=True + )) + + return data + + +def get_interpolate_min_dist_to_target(section, step=30, data=None): + if data is None: + data = [] + + # the first curve section + data.append(interpolate_curve( + md1=section.md1, + pos1=section.pos1, + vec1=section.vec1, + vec2=section.vec2, + dist_curve=section.dist_curve, + dogleg=section.dogleg, + func_dogleg=section.func_dogleg, + step=step + )) + + # the hold section + data.append(interpolate_hold( + md1=section.md2, + pos1=section.pos2, + vec1=section.vec2, + md2=section.md_target, + step=step, + endpoint=True + )) + + return data + + +def get_interpololate_curve_hold_curve(section, step=30, data=None): + if data is None: + data = [] + + # the first curve section + data.append(interpolate_curve( + md1=section.md1, + pos1=section.pos1, + vec1=section.vec1, + vec2=section.vec2, + dist_curve=section.dist_curve, + dogleg=section.dogleg, + func_dogleg=section.func_dogleg, + step=step + )) + + # the hold section + data.append(interpolate_hold( + md1=section.md2, + pos1=section.pos2, + vec1=section.vec2, + md2=section.md3, + step=step + )) + + # the second curve section + data.append(interpolate_curve( + md1=section.md3, + pos1=section.pos3, + vec1=section.vec3, + vec2=section.vec_target, + dist_curve=section.dist_curve2, + dogleg=section.dogleg2, + func_dogleg=section.func_dogleg2, + step=step, + endpoint=True + )) + + return data + + +def convert_target_input_to_booleans(*inputs): + input = [ + "0" if i is None else "1" for i in inputs + ] + + return ''.join(input) + + +def connect_points( + cartesians, vec_start=[0., 0., 1.], dls_design=3.0, nev=True, + # step=30, + md_start=0. +): + """ + Function for connecting a list or array of only Cartesian points. + + Parameters + ---------- + cartesians: (n, 3) list or array of floats + Either [n, e, tvd] (default) or [x, y, z] + vec_start: (3) list or array of floats (default: [0., 0., 1.]) + Unit start vector (default is pointing down) in the nev or xyz + coordinate system. + dls_design: float or (n, 1) list or array of floats (default: 3.0) + The minimum Dog Leg Severity used when attempting to connect the + points (a high DLS will be used if necessary). + nev: bool (default: True) + Indicates whether the cartesians are referencing the [nev] + (default) or [xyz] coordinate system. + step: float (default: 30) + The desired step interval for the returned Survey object. + md_start: float (default: 0) + The md at the first cartesian point (in the event of a tie-on). + + Returns + ------- + data: list of welleng.connector.Connector objects + """ + if nev: + pos_nev = np.array(cartesians).reshape(-1, 3) + vec_nev = np.zeros_like(pos_nev) + vec_nev[0] = np.array(vec_start).reshape(-1, 3) + else: + e, n, tvd = np.array(cartesians).reshape(-1, 3).T + pos_nev = np.array([n, e, tvd]).T + vec_nev = np.zeros_like(pos_nev) + e, n, tvd = np.array(vec_start).reshape(-1, 3).T + vec_nev[0] = np.array([n, e, tvd]).T + + if type(dls_design) is float: + dls = np.full(len(pos_nev), dls_design) + else: + dls = np.array(dls_design).reshape(-1, 1) + + connections = [] + for i, (p, v, d) in enumerate(zip(pos_nev, vec_nev, dls)): + if i == 0: + node_1 = Node( + pos=p, + vec=v, + md=md_start + ) + continue + if i > 1: + node_1 = connections[-1].node_end + node_2 = Node( + pos=p + ) + c = Connector( + node1=node_1, + node2=node_2, + dls_design=d + ) + assert np.allclose(c.pos_target, p) + connections.append(c) + + return connections + + +def survey_to_plan(survey, tolerance=0.2, dls_design=1., step=30.): + """ + Prototype function for extracting a plan from a drilled well survey - a + minimal number of control points (begining/end points of either hold or + build/turn sections) required to express the well given the provided input + parameters. + + Parameters + ---------- + survey: welleng.survey.Survey object + tolerance: float (default=0.2) + Defines how tight to fit the planned well to the survey - a higher + number will results in less control points but likely poorer fit. + dls_design: float (default=1.0) + The minimum DLS used to fit the planned trajectory. + step: float (default=30) + The desired md step in the plan survey. + + Returns + ------- + sections: list of welleng.connector.Connection objects + """ + assert dls_design > 0., "dls_design must be greater than 0" + + idx = [0] + end = len(survey.md) - 1 + md = survey.md[0] + node = None + sections = [] + + while True: + section, i = _get_section( + survey=survey, + start=idx[-1], + md=md, + node=node, + tolerance=tolerance, + dls_design=dls_design + ) + sections.append(section) + idx.append(i) + if idx[-1] >= end: + break + node = section.node_end + + # data = interpolate_well(sections, step=step) + + return sections + + +def _get_section( + survey, start, tolerance, dls_design=1., md=0., node=None +): + idx = start + 2 + nev = survey.get_nev_arr() + + if idx > len(nev): + idx = len(nev) - 1 + + if node is None: + node_1 = Node( + pos=nev[start], + vec=survey.vec_nev[start], + md=md + ) + else: + node_1 = node + + scores = [0.] + delta_scores = [] + c_old = None + + for i, (p, v) in enumerate(zip(nev[idx:], survey.vec_nev[idx:])): + node_2 = Node( + pos=p, + vec=v, + ) + c = Connector( + node1=node_1, + node2=node_2, + dls_design=dls_design + ) + s = c.survey(step=1.) + if c_old is None: + c_old = deepcopy(c) + nev_new = s.get_nev_arr() + + distances = distance.cdist( + nev[start: idx + i], + nev_new + ) + + score = np.sum(np.amin(distances, axis=1)) / (s.md[-1] - s.md[0]) + + delta_scores.append(score - scores[-1]) + scores.append(score) + + if all(( + abs(delta_scores[-1]) >= tolerance, + idx + i < len(nev) - 2 + )): + break + elif idx + i == len(nev) - 1: + c_old = deepcopy(c) + break + else: + c_old = deepcopy(c) + + return ( + c_old, + idx + i - 1 if idx + i != len(nev) - 1 else idx + i + ) + + +def numbafy(functions): + for f in functions: + f = njit(f) + + +if NUMBA: + NUMBAFY = ( + _get_xyz, + get_pos, + get_vec_target, + get_curve_hold_data, + shape_factor, + min_dist_to_target, + get_radius_critical, + angle, + get_dogleg + ) + numbafy(NUMBAFY) diff --git a/welleng/utils.py b/welleng/utils.py index 2ecf3ab..ca7956d 100644 --- a/welleng/utils.py +++ b/welleng/utils.py @@ -1,673 +1,673 @@ -import numpy as np -from scipy.spatial.transform import Rotation as R - -from numpy.typing import ArrayLike, NDArray -from typing import Any, Annotated, Literal, Union, Tuple - -try: - from numba import njit - NUMBA = True -except ImportError: - NUMBA = False - - -class MinCurve: - def __init__( - self, - md, - inc, - azi, - start_xyz=[0., 0., 0.], - unit="meters" - ): - """ - Generate geometric data from a well bore survey. - - Parameters - ---------- - md: list or 1d array of floats - Measured depth along well path from a datum. - inc: list or 1d array of floats - Well path inclincation (relative to z/tvd axis where 0 - indicates down), in radians. - azi: list or 1d array of floats - Well path azimuth (relative to y/North axis), - in radians. - unit: str - Either "meters" or "feet" to determine the unit of the dogleg - severity. - - """ - assert unit == "meters" or unit == "feet", ( - 'Unknown unit, please select "meters" of "feet"' - ) - - self.md = md - survey_length = len(self.md) - assert survey_length > 1, "Survey must have at least two rows" - - self.inc = inc - self.azi = azi - self.start_xyz = start_xyz - self.unit = unit - - # make two slices with a difference or 1 index to enable array - # calculations - md_1 = md[:-1] - md_2 = md[1:] - - inc_1 = inc[:-1] - inc_2 = inc[1:] - - azi_1 = azi[:-1] - azi_2 = azi[1:] - - # calculate the dogleg - # temp = np.arccos( - # np.cos(inc_2 - inc_1) - # - (np.sin(inc_1) * np.sin(inc_2)) - # * (1 - np.cos(azi_2 - azi_1)) - # ) - - self.dogleg = np.zeros(survey_length) - self.dogleg[1:] = get_dogleg(inc_1, azi_1, inc_2, azi_2) - - # calculate rf and assume rf is 1 where dogleg is 0 - self.rf = np.ones(survey_length) - idx = np.where(self.dogleg != 0) - self.rf[idx] = 2 / self.dogleg[idx] * np.tan(self.dogleg[idx] / 2) - - # calculate the change in md between survey stations - temp = np.array(md_2) - np.array(md_1) - self.delta_md = np.zeros(survey_length) - self.delta_md[1:] = temp - - # calculate change in y direction (north) - temp = ( - self.delta_md[1:] - / 2 - * ( - np.sin(inc_1) * np.cos(azi_1) - + np.sin(inc_2) * np.cos(azi_2) - ) - * self.rf[1:] - ) - self.delta_y = np.zeros(survey_length) - self.delta_y[1:] = temp - - # calculate change in x direction (east) - temp = ( - self.delta_md[1:] - / 2 - * ( - np.sin(inc_1) * np.sin(azi_1) - + np.sin(inc_2) * np.sin(azi_2) - ) - * self.rf[1:] - ) - self.delta_x = np.zeros(survey_length) - self.delta_x[1:] = temp - - # calculate change in z direction - temp = ( - self.delta_md[1:] - / 2 - * (np.cos(inc_1) + np.cos(inc_2)) - * self.rf[1:] - ) - self.delta_z = np.zeros(survey_length) - self.delta_z[1:] = temp - - # calculate the dog leg severity - with np.errstate(divide='ignore', invalid='ignore'): - temp = np.degrees(self.dogleg[1:]) / self.delta_md[1:] - self.dls = np.zeros(survey_length) - mask = np.where(temp != np.nan) - self.dls[1:][mask] = temp[mask] - - if unit == "meters": - self.dls *= 30 - else: - self.dls *= 100 - - # cumulate the coordinates and add surface coordinates - self.poss = np.vstack( - np.cumsum( - np.array([self.delta_x, self.delta_y, self.delta_z]).T, axis=0 - ) + self.start_xyz - ) - - -def get_dogleg(inc1, azi1, inc2, azi2): - dogleg = np.arccos( - np.cos(inc2 - inc1) - - (np.sin(inc1) * np.sin(inc2)) - * (1 - np.cos(azi2 - azi1)) - ) - return dogleg - - -def get_vec(inc, azi, nev=False, r=1, deg=True): - """ - Convert inc and azi into a vector. - - Params: - inc: array of n floats - Inclination relative to the z-axis (up) - azi: array of n floats - Azimuth relative to the y-axis - r: float or array of n floats - Scalar to return a scaled vector - - Returns: - An (n,3) array of vectors - """ - if deg: - inc_rad, azi_rad = np.radians(np.array([inc, azi])) - else: - inc_rad = inc - azi_rad = azi - y = r * np.sin(inc_rad) * np.cos(azi_rad) - x = r * np.sin(inc_rad) * np.sin(azi_rad) - z = r * np.cos(inc_rad) - - if nev: - vec = np.array([y, x, z]).T - else: - vec = np.array([x, y, z]).T - - return vec / np.linalg.norm(vec, axis=-1).reshape(-1, 1) - - -def get_nev( - pos, start_xyz=np.array([0., 0., 0.]), start_nev=np.array([0., 0., 0.]) -): - """ - Convert [x, y, z] coordinates to [n, e, tvd] coordinates. - - Params: - pos: (n,3) array of floats - Array of [x, y, z] coordinates - start_xyz: (,3) array of floats - The datum of the [x, y, z] cooardinates - start_nev: (,3) array of floats - The datum of the [n, e, tvd] coordinates - - Returns: - An (n,3) array of [n, e, tvd] coordinates. - """ - # e, n, v = ( - # np.array([pos]).reshape(-1,3) - np.array([start_xyz]) - # ).T - e, n, v = ( - np.array([pos]).reshape(-1, 3) - np.array([start_xyz]) - ).T - - return (np.array([n, e, v]).T + np.array([start_nev])) - - -def get_xyz(pos, start_xyz=[0., 0., 0.], start_nev=[0., 0., 0.]): - y, x, z = ( - np.array([pos]).reshape(-1, 3) - np.array([start_nev]) - ).T - - return (np.array([x, y, z]).T + np.array([start_xyz])) - - -def _get_angles(vec): - xy = vec[:, 0] ** 2 + vec[:, 1] ** 2 - inc = np.arctan2(np.sqrt(xy), vec[:, 2]) # for elevation angle defined from Z-axis down - azi = (np.arctan2(vec[:, 0], vec[:, 1]) + (2 * np.pi)) % (2 * np.pi) - - return np.stack((inc, azi), axis=1) - - -if NUMBA: - _get_angles = njit(_get_angles) - - -def get_angles( - vec: Annotated[NDArray, Literal["N", 3]], nev: bool = False -): - ''' - Determines the inclination and azimuth from a vector. - - Parameters - ---------- - vec: (n,3) array of floats - nev: boolean (default: False) - Indicates if the vector is in (x,y,z) or (n,e,v) coordinates. - - Returns - ------- - [inc, azi]: (n,2) array of floats - A numpy array of incs and axis in radians - - ''' - # make sure it's a unit vector - vec = vec / np.linalg.norm(vec, axis=-1).reshape(-1, 1) - vec = vec.reshape(-1, 3) - - # if it's nev then need to do the shuffle - if nev: - y, x, z = vec.T - vec = np.array([x, y, z]).T - - return _get_angles(vec) - - -def _get_transform(inc, azi): - trans = np.array([ - [np.cos(inc) * np.cos(azi), -np.sin(azi), np.sin(inc) * np.cos(azi)], - [np.cos(inc) * np.sin(azi), np.cos(azi), np.sin(inc) * np.sin(azi)], - [-np.sin(inc), np.zeros_like(inc), np.cos(inc)] - ]).T - - return trans - - -def get_transform( - survey -): - """ - Determine the transform for transforming between NEV and HLA coordinate - systems. - - Params: - survey: (n,3) array of floats - The [md, inc, azi] survey listing array. - - Returns: - transform: (n,3,3) array of floats - """ - survey = survey.reshape(-1, 3) - inc = np.array(survey[:, 1]) - azi = np.array(survey[:, 2]) - - return _get_transform(inc, azi) - - # trans = np.array([ - # [np.cos(inc) * np.cos(azi), -np.sin(azi), np.sin(inc) * np.cos(azi)], - # [np.cos(inc) * np.sin(azi), np.cos(azi), np.sin(inc) * np.sin(azi)], - # [-np.sin(inc), np.zeros_like(inc), np.cos(inc)] - # ]).T - - # return trans - - -def NEV_to_HLA( - survey: Annotated[NDArray, Literal["N", 3]], - NEV: Union[ - Annotated[NDArray, Literal["N", 3]], - Annotated[NDArray, Literal[3, 3, "N"]] - ], - cov: bool = True -) -> Union[ - Annotated[NDArray, Literal['..., 3']], - Annotated[NDArray, Literal['3, 3, ...']] -]: - """ - Transform from NEV to HLA coordinate system. - - Parameters: - ----------- - survey: (n,3) array of floats - The [md, inc, azi] survey listing array. - NEV: (d,3) or (3,3,d) array of floats - The NEV coordinates or covariance matrices. - cov: boolean - If cov is True then a (3,3,d) array of covariance matrices - is expected, else a (d,3) array of coordinates. - - Returns: - -------- - HLAs: NDArray - Either a transformed (n,3) array of HLA coordinates or an - (3,3,n) array of HLA covariance matrices. - """ - - trans = get_transform(survey) - - if cov: - # HLAs = [ - # np.dot(np.dot(t, NEV.T[i]), t.T) for i, t in enumerate(trans) - # ] - # HLAs = np.vstack(HLAs).reshape(-1, 3, 3).T - - HLAs = np.einsum( - '...ik,...jk', - np.einsum( - '...ik,...jk', trans, NEV.T - ), - trans - ).T - - else: - NEV = NEV.reshape(-1, 3) - # HLAs = [ - # np.dot(NEV[i], t.T) for i, t in enumerate(trans) - # ] - - HLAs = np.einsum( - '...k,...jk', NEV, trans - ) - - return HLAs - - -def HLA_to_NEV(survey, HLA, cov=True, trans=None): - if trans is None: - trans = get_transform(survey) - - if cov: - # NEVs = [ - # np.dot(np.dot(t.T, HLA.T[i]), t) for i, t in enumerate(trans) - # ] - # NEVs = np.vstack(NEVs).reshape(-1, 3, 3).T - - NEVs = np.einsum( - '...ik,jk...', - np.einsum( - '...ki,...jk', trans, HLA.T - ), - trans.T - ).T - - else: - # NEVs = [ - # np.dot(hla, t) for hla, t in zip(HLA, trans) - # ] - - NEVs = np.einsum( - 'k...,jk...', HLA.T, trans.T - ) - - # return np.vstack(NEVs).reshape(HLA.shape) - - return np.swapaxes(NEVs, 0, 1) - - -def get_sigmas(cov, long=False): - """ - Extracts the sigma values of a covariance matrix along the principle axii. - - Parameters - ---------- - cov: (n,3,3) array of floats - - Returns - ------- - arr: (n,3) array of floats - """ - - assert cov.shape[-2:] == (3, 3), "Cov is the wrong shape" - - cov = cov.reshape(-1, 3, 3) - - aa, ab, ac = cov[:, :, 0].T - ab, bb, bc = cov[:, :, 1].T - ac, bc, cc = cov[:, :, 2].T - - if long: - return (aa, bb, cc, ab, ac, bc) - else: - return (np.sqrt(aa), np.sqrt(bb), np.sqrt(cc)) - - # a, b, c = np.array([ - # np.sqrt(cov[:, 0, 0]), - # np.sqrt(cov[:, 1, 1]), - # np.sqrt(cov[:, 2, 2]) - # ]) - - # return (a, b, c) - - -def get_unit_vec(vec): - vec = vec / np.linalg.norm(vec) - - return vec - - -def linear_convert(data, factor): - flag = False - if type(data) != list: - flag = True - data = [data] - converted = [d * factor if d is not None else None for d in data] - if flag: - return converted[0] - else: - return converted - - -def make_cov(a, b, c, long=False): - # a, b, c = np.sqrt(np.array([a, b, c])) - if long: - cov = np.array([ - [a * a, a * b, a * c], - [a * b, b * b, b * c], - [a * c, b * c, c * c] - ]) - - else: - cov = np.array([ - [a * a, np.zeros_like(a), np.zeros_like(a)], - [np.zeros_like(a), b * b, np.zeros_like(a)], - [np.zeros_like(a), np.zeros_like(a), c * c] - ]) - - return cov.T - - -def dls_from_radius(radius): - """ - Returns the dls in degrees from a radius. - """ - if isinstance(radius, np.ndarray): - circumference = np.full_like(radius, np.inf) - circumference = np.where( - radius != 0, - 2 * np.pi * radius, - circumference - ) - else: - if radius == 0: - return np.inf - circumference = 2 * np.pi * radius - dls = 360 / circumference * 30 - - return dls - - -def radius_from_dls(dls): - """ - Returns the radius in meters from a DLS in deg/30m. - """ - if isinstance(dls, np.ndarray): - circumference = np.full_like(dls, np.inf) - circumference = np.where( - dls != 0, - (30 / dls) * 360, - circumference - ) - else: - if dls == 0: - return np.inf - circumference = (30 / dls) * 360 - radius = circumference / (2 * np.pi) - - return radius - - -def errors_from_cov(cov, data=False): - """ - Parameters - ---------- - cov: (n, 3, 3) array - The error covariance matrices. - data: bool (default: False) - If True returns a dictionary, else returns a list. - """ - nn, ne, nv, _, ee, ev, _, _, vv = ( - cov.reshape(-1, 9).T - ) - - if data: - return { - i: { - 'nn': _nn, 'ne': _ne, 'nv': _nv, 'ee': _ee, 'ev': _ev, 'vv': _vv - } - for i, (_nn, _ne, _nv, _ee, _ev, _vv) - in enumerate(zip(nn, ne, nv, ee, ev, vv)) - } - - return np.array([nn, ne, nv, ee, ev, vv]).T - - -class Arc: - def __init__(self, dogleg, radius): - """ - Generates a generic arc that can be transformed with a specific pos - and vec via a transform method. The arc is initialized at a local - origin and kicks off down and to the north (assuming an NEV coordinate - system). - - Parameters - ---------- - dogleg: float - The sweep angle of the arc in radians. - radius: float - The radius of the arc in meters. - - Returns - ------- - arc: Arc object - """ - self.dogleg = dogleg - self.radius = radius - self.delta_md = dogleg * radius - - self.pos = np.array([ - np.cos(dogleg), - 0., - np.sin(dogleg) - ]) * radius - self.pos[0] = radius - self.pos[0] - - self.vec = np.array([ - np.sin(dogleg), - 0., - np.cos(dogleg) - ]) - - def transform(self, toolface, pos=None, vec=None, target=False): - """ - Transforms an Arc to a position and orientation. - - Parameters - ---------- - pos: (,3) array - The desired position to transform the arc. - vec: (,3) array - The orientation unit vector to transform the arc. - target: bool - If true, returned arc vector is reversed. - - Returns - ------- - tuple (pos_new, vec_new) - pos_new: (,3) array - The position at the end of the arc post transform. - vec_new: (,3) array - The unit vector at the end of the arc post transform. - """ - if vec is None: - vec = np.array([0., 0., 1.]) - if target: - vec *= -1 - inc, azi = get_angles(vec, nev=True).reshape(2) - angles = [ - toolface, - inc, - azi - ] - r = R.from_euler('zyz', angles, degrees=False) - - pos_new, vec_new = r.apply(np.vstack((self.pos, self.vec))) - - if pos is not None: - pos_new += pos - if target: - vec_new *= -1 - - return (pos_new, vec_new) - - -def get_arc(dogleg, radius, toolface, pos=None, vec=None, target=False): - """ - Creates an Arc instance and transforms it to the desired position and - orientation. - - Parameters - ---------- - dogleg: float - The swept angle of the arc (arc angle) in radians. - radius: float - The radius of the arc (in meters). - toolface: float - The toolface angle in radians (relative to the high side) to rotate the - arc at the desired position and orientation. - pos: (,3) array - The desired position to transform the arc. - vec: (,3) array - The orientation unit vector to transform the arc. - target: bool - If true, returned arc vector is reversed. - - Returns - ------- - tuple of (pos_new, vec_new, arc.delta_md) - pos_new: (,3) array - The position at the end of the arc post transform. - vec_new: (,3) array - The unit vector at the end of the arc post transform. - arc.delta_md: int - The arc length of the arc. - """ - arc = Arc(dogleg, radius) - pos_new, vec_new = arc.transform(toolface, pos, vec, target) - - return (pos_new, vec_new, arc.delta_md) - - -def annular_volume(od: float, id: float = None, length: float = None): - """ - Calculate an annular volume. - - If no ``id`` is provided then circular volume is calculated. If no - ``length`` is provided, then the unit volume is calculated (i.e. the - area). - - Units are assumed consistent across input parameters, i.e. the - calculation is dimensionless. - - Parameters - ---------- - od: float - The outer diameter. - id: float | None, optional - The inner diameter, default is 0. - length : float | None, optional - The length of the annulus. - - Returns - ------- - annular_volume: float - The (unit) volume of the annulus or cylinder. - """ - length = 1 if length is None else length - id = 0 if id is None else id - annular_unit_volume = (np.pi * (od - id)**2) / 4 - annular_volume = annular_unit_volume * length - - return annular_volume +from typing import Annotated, Literal, Union + +import numpy as np +from numpy.typing import NDArray +from scipy.spatial.transform import Rotation as R + +try: + from numba import njit + NUMBA = True +except ImportError: + NUMBA = False + + +class MinCurve: + def __init__( + self, + md, + inc, + azi, + start_xyz=[0., 0., 0.], + unit="meters" + ): + """ + Generate geometric data from a well bore survey. + + Parameters + ---------- + md: list or 1d array of floats + Measured depth along well path from a datum. + inc: list or 1d array of floats + Well path inclincation (relative to z/tvd axis where 0 + indicates down), in radians. + azi: list or 1d array of floats + Well path azimuth (relative to y/North axis), + in radians. + unit: str + Either "meters" or "feet" to determine the unit of the dogleg + severity. + + """ + assert unit == "meters" or unit == "feet", ( + 'Unknown unit, please select "meters" of "feet"' + ) + + self.md = md + survey_length = len(self.md) + assert survey_length > 1, "Survey must have at least two rows" + + self.inc = inc + self.azi = azi + self.start_xyz = start_xyz + self.unit = unit + + # make two slices with a difference or 1 index to enable array + # calculations + md_1 = md[:-1] + md_2 = md[1:] + + inc_1 = inc[:-1] + inc_2 = inc[1:] + + azi_1 = azi[:-1] + azi_2 = azi[1:] + + # calculate the dogleg + # temp = np.arccos( + # np.cos(inc_2 - inc_1) + # - (np.sin(inc_1) * np.sin(inc_2)) + # * (1 - np.cos(azi_2 - azi_1)) + # ) + + self.dogleg = np.zeros(survey_length) + self.dogleg[1:] = get_dogleg(inc_1, azi_1, inc_2, azi_2) + + # calculate rf and assume rf is 1 where dogleg is 0 + self.rf = np.ones(survey_length) + idx = np.where(self.dogleg != 0) + self.rf[idx] = 2 / self.dogleg[idx] * np.tan(self.dogleg[idx] / 2) + + # calculate the change in md between survey stations + temp = np.array(md_2) - np.array(md_1) + self.delta_md = np.zeros(survey_length) + self.delta_md[1:] = temp + + # calculate change in y direction (north) + temp = ( + self.delta_md[1:] + / 2 + * ( + np.sin(inc_1) * np.cos(azi_1) + + np.sin(inc_2) * np.cos(azi_2) + ) + * self.rf[1:] + ) + self.delta_y = np.zeros(survey_length) + self.delta_y[1:] = temp + + # calculate change in x direction (east) + temp = ( + self.delta_md[1:] + / 2 + * ( + np.sin(inc_1) * np.sin(azi_1) + + np.sin(inc_2) * np.sin(azi_2) + ) + * self.rf[1:] + ) + self.delta_x = np.zeros(survey_length) + self.delta_x[1:] = temp + + # calculate change in z direction + temp = ( + self.delta_md[1:] + / 2 + * (np.cos(inc_1) + np.cos(inc_2)) + * self.rf[1:] + ) + self.delta_z = np.zeros(survey_length) + self.delta_z[1:] = temp + + # calculate the dog leg severity + with np.errstate(divide='ignore', invalid='ignore'): + temp = np.degrees(self.dogleg[1:]) / self.delta_md[1:] + self.dls = np.zeros(survey_length) + mask = np.where(temp != np.nan) + self.dls[1:][mask] = temp[mask] + + if unit == "meters": + self.dls *= 30 + else: + self.dls *= 100 + + # cumulate the coordinates and add surface coordinates + self.poss = np.vstack( + np.cumsum( + np.array([self.delta_x, self.delta_y, self.delta_z]).T, axis=0 + ) + self.start_xyz + ) + + +def get_dogleg(inc1, azi1, inc2, azi2): + dogleg = np.arccos( + np.cos(inc2 - inc1) + - (np.sin(inc1) * np.sin(inc2)) + * (1 - np.cos(azi2 - azi1)) + ) + return dogleg + + +def get_vec(inc, azi, nev=False, r=1, deg=True): + """ + Convert inc and azi into a vector. + + Params: + inc: array of n floats + Inclination relative to the z-axis (up) + azi: array of n floats + Azimuth relative to the y-axis + r: float or array of n floats + Scalar to return a scaled vector + + Returns: + An (n,3) array of vectors + """ + if deg: + inc_rad, azi_rad = np.radians(np.array([inc, azi])) + else: + inc_rad = inc + azi_rad = azi + y = r * np.sin(inc_rad) * np.cos(azi_rad) + x = r * np.sin(inc_rad) * np.sin(azi_rad) + z = r * np.cos(inc_rad) + + if nev: + vec = np.array([y, x, z]).T + else: + vec = np.array([x, y, z]).T + + return vec / np.linalg.norm(vec, axis=-1).reshape(-1, 1) + + +def get_nev( + pos, start_xyz=np.array([0., 0., 0.]), start_nev=np.array([0., 0., 0.]) +): + """ + Convert [x, y, z] coordinates to [n, e, tvd] coordinates. + + Params: + pos: (n,3) array of floats + Array of [x, y, z] coordinates + start_xyz: (,3) array of floats + The datum of the [x, y, z] cooardinates + start_nev: (,3) array of floats + The datum of the [n, e, tvd] coordinates + + Returns: + An (n,3) array of [n, e, tvd] coordinates. + """ + # e, n, v = ( + # np.array([pos]).reshape(-1,3) - np.array([start_xyz]) + # ).T + e, n, v = ( + np.array([pos]).reshape(-1, 3) - np.array([start_xyz]) + ).T + + return (np.array([n, e, v]).T + np.array([start_nev])) + + +def get_xyz(pos, start_xyz=[0., 0., 0.], start_nev=[0., 0., 0.]): + y, x, z = ( + np.array([pos]).reshape(-1, 3) - np.array([start_nev]) + ).T + + return (np.array([x, y, z]).T + np.array([start_xyz])) + + +def _get_angles(vec): + xy = vec[:, 0] ** 2 + vec[:, 1] ** 2 + inc = np.arctan2(np.sqrt(xy), vec[:, 2]) # for elevation angle defined from Z-axis down + azi = (np.arctan2(vec[:, 0], vec[:, 1]) + (2 * np.pi)) % (2 * np.pi) + + return np.stack((inc, azi), axis=1) + + +if NUMBA: + _get_angles = njit(_get_angles) + + +def get_angles( + vec: Annotated[NDArray, Literal["N", 3]], nev: bool = False +): + ''' + Determines the inclination and azimuth from a vector. + + Parameters + ---------- + vec: (n,3) array of floats + nev: boolean (default: False) + Indicates if the vector is in (x,y,z) or (n,e,v) coordinates. + + Returns + ------- + [inc, azi]: (n,2) array of floats + A numpy array of incs and axis in radians + + ''' + # make sure it's a unit vector + vec = vec / np.linalg.norm(vec, axis=-1).reshape(-1, 1) + vec = vec.reshape(-1, 3) + + # if it's nev then need to do the shuffle + if nev: + y, x, z = vec.T + vec = np.array([x, y, z]).T + + return _get_angles(vec) + + +def _get_transform(inc, azi): + trans = np.array([ + [np.cos(inc) * np.cos(azi), -np.sin(azi), np.sin(inc) * np.cos(azi)], + [np.cos(inc) * np.sin(azi), np.cos(azi), np.sin(inc) * np.sin(azi)], + [-np.sin(inc), np.zeros_like(inc), np.cos(inc)] + ]).T + + return trans + + +def get_transform( + survey +): + """ + Determine the transform for transforming between NEV and HLA coordinate + systems. + + Params: + survey: (n,3) array of floats + The [md, inc, azi] survey listing array. + + Returns: + transform: (n,3,3) array of floats + """ + survey = survey.reshape(-1, 3) + inc = np.array(survey[:, 1]) + azi = np.array(survey[:, 2]) + + return _get_transform(inc, azi) + + # trans = np.array([ + # [np.cos(inc) * np.cos(azi), -np.sin(azi), np.sin(inc) * np.cos(azi)], + # [np.cos(inc) * np.sin(azi), np.cos(azi), np.sin(inc) * np.sin(azi)], + # [-np.sin(inc), np.zeros_like(inc), np.cos(inc)] + # ]).T + + # return trans + + +def NEV_to_HLA( + survey: Annotated[NDArray, Literal["N", 3]], + NEV: Union[ + Annotated[NDArray, Literal["N", 3]], + Annotated[NDArray, Literal[3, 3, "N"]] + ], + cov: bool = True +) -> Union[ + Annotated[NDArray, Literal['..., 3']], + Annotated[NDArray, Literal['3, 3, ...']] +]: + """ + Transform from NEV to HLA coordinate system. + + Parameters: + ----------- + survey: (n,3) array of floats + The [md, inc, azi] survey listing array. + NEV: (d,3) or (3,3,d) array of floats + The NEV coordinates or covariance matrices. + cov: boolean + If cov is True then a (3,3,d) array of covariance matrices + is expected, else a (d,3) array of coordinates. + + Returns: + -------- + HLAs: NDArray + Either a transformed (n,3) array of HLA coordinates or an + (3,3,n) array of HLA covariance matrices. + """ + + trans = get_transform(survey) + + if cov: + # HLAs = [ + # np.dot(np.dot(t, NEV.T[i]), t.T) for i, t in enumerate(trans) + # ] + # HLAs = np.vstack(HLAs).reshape(-1, 3, 3).T + + HLAs = np.einsum( + '...ik,...jk', + np.einsum( + '...ik,...jk', trans, NEV.T + ), + trans + ).T + + else: + NEV = NEV.reshape(-1, 3) + # HLAs = [ + # np.dot(NEV[i], t.T) for i, t in enumerate(trans) + # ] + + HLAs = np.einsum( + '...k,...jk', NEV, trans + ) + + return HLAs + + +def HLA_to_NEV(survey, HLA, cov=True, trans=None): + if trans is None: + trans = get_transform(survey) + + if cov: + # NEVs = [ + # np.dot(np.dot(t.T, HLA.T[i]), t) for i, t in enumerate(trans) + # ] + # NEVs = np.vstack(NEVs).reshape(-1, 3, 3).T + + NEVs = np.einsum( + '...ik,jk...', + np.einsum( + '...ki,...jk', trans, HLA.T + ), + trans.T + ).T + + else: + # NEVs = [ + # np.dot(hla, t) for hla, t in zip(HLA, trans) + # ] + + NEVs = np.einsum( + 'k...,jk...', HLA.T, trans.T + ) + + # return np.vstack(NEVs).reshape(HLA.shape) + + return np.swapaxes(NEVs, 0, 1) + + +def get_sigmas(cov, long=False): + """ + Extracts the sigma values of a covariance matrix along the principle axii. + + Parameters + ---------- + cov: (n,3,3) array of floats + + Returns + ------- + arr: (n,3) array of floats + """ + + assert cov.shape[-2:] == (3, 3), "Cov is the wrong shape" + + cov = cov.reshape(-1, 3, 3) + + aa, ab, ac = cov[:, :, 0].T + ab, bb, bc = cov[:, :, 1].T + ac, bc, cc = cov[:, :, 2].T + + if long: + return (aa, bb, cc, ab, ac, bc) + else: + return (np.sqrt(aa), np.sqrt(bb), np.sqrt(cc)) + + # a, b, c = np.array([ + # np.sqrt(cov[:, 0, 0]), + # np.sqrt(cov[:, 1, 1]), + # np.sqrt(cov[:, 2, 2]) + # ]) + + # return (a, b, c) + + +def get_unit_vec(vec): + vec = vec / np.linalg.norm(vec) + + return vec + + +def linear_convert(data, factor): + flag = False + if type(data) != list: + flag = True + data = [data] + converted = [d * factor if d is not None else None for d in data] + if flag: + return converted[0] + else: + return converted + + +def make_cov(a, b, c, long=False): + # a, b, c = np.sqrt(np.array([a, b, c])) + if long: + cov = np.array([ + [a * a, a * b, a * c], + [a * b, b * b, b * c], + [a * c, b * c, c * c] + ]) + + else: + cov = np.array([ + [a * a, np.zeros_like(a), np.zeros_like(a)], + [np.zeros_like(a), b * b, np.zeros_like(a)], + [np.zeros_like(a), np.zeros_like(a), c * c] + ]) + + return cov.T + + +def dls_from_radius(radius): + """ + Returns the dls in degrees from a radius. + """ + if isinstance(radius, np.ndarray): + circumference = np.full_like(radius, np.inf) + circumference = np.where( + radius != 0, + 2 * np.pi * radius, + circumference + ) + else: + if radius == 0: + return np.inf + circumference = 2 * np.pi * radius + dls = 360 / circumference * 30 + + return dls + + +def radius_from_dls(dls): + """ + Returns the radius in meters from a DLS in deg/30m. + """ + if isinstance(dls, np.ndarray): + circumference = np.full_like(dls, np.inf) + circumference = np.where( + dls != 0, + (30 / dls) * 360, + circumference + ) + else: + if dls == 0: + return np.inf + circumference = (30 / dls) * 360 + radius = circumference / (2 * np.pi) + + return radius + + +def errors_from_cov(cov, data=False): + """ + Parameters + ---------- + cov: (n, 3, 3) array + The error covariance matrices. + data: bool (default: False) + If True returns a dictionary, else returns a list. + """ + nn, ne, nv, _, ee, ev, _, _, vv = ( + cov.reshape(-1, 9).T + ) + + if data: + return { + i: { + 'nn': _nn, 'ne': _ne, 'nv': _nv, 'ee': _ee, 'ev': _ev, 'vv': _vv + } + for i, (_nn, _ne, _nv, _ee, _ev, _vv) + in enumerate(zip(nn, ne, nv, ee, ev, vv)) + } + + return np.array([nn, ne, nv, ee, ev, vv]).T + + +class Arc: + def __init__(self, dogleg, radius): + """ + Generates a generic arc that can be transformed with a specific pos + and vec via a transform method. The arc is initialized at a local + origin and kicks off down and to the north (assuming an NEV coordinate + system). + + Parameters + ---------- + dogleg: float + The sweep angle of the arc in radians. + radius: float + The radius of the arc in meters. + + Returns + ------- + arc: Arc object + """ + self.dogleg = dogleg + self.radius = radius + self.delta_md = dogleg * radius + + self.pos = np.array([ + np.cos(dogleg), + 0., + np.sin(dogleg) + ]) * radius + self.pos[0] = radius - self.pos[0] + + self.vec = np.array([ + np.sin(dogleg), + 0., + np.cos(dogleg) + ]) + + def transform(self, toolface, pos=None, vec=None, target=False): + """ + Transforms an Arc to a position and orientation. + + Parameters + ---------- + pos: (,3) array + The desired position to transform the arc. + vec: (,3) array + The orientation unit vector to transform the arc. + target: bool + If true, returned arc vector is reversed. + + Returns + ------- + tuple (pos_new, vec_new) + pos_new: (,3) array + The position at the end of the arc post transform. + vec_new: (,3) array + The unit vector at the end of the arc post transform. + """ + if vec is None: + vec = np.array([0., 0., 1.]) + if target: + vec *= -1 + inc, azi = get_angles(vec, nev=True).reshape(2) + angles = [ + toolface, + inc, + azi + ] + r = R.from_euler('zyz', angles, degrees=False) + + pos_new, vec_new = r.apply(np.vstack((self.pos, self.vec))) + + if pos is not None: + pos_new += pos + if target: + vec_new *= -1 + + return (pos_new, vec_new) + + +def get_arc(dogleg, radius, toolface, pos=None, vec=None, target=False): + """ + Creates an Arc instance and transforms it to the desired position and + orientation. + + Parameters + ---------- + dogleg: float + The swept angle of the arc (arc angle) in radians. + radius: float + The radius of the arc (in meters). + toolface: float + The toolface angle in radians (relative to the high side) to rotate the + arc at the desired position and orientation. + pos: (,3) array + The desired position to transform the arc. + vec: (,3) array + The orientation unit vector to transform the arc. + target: bool + If true, returned arc vector is reversed. + + Returns + ------- + tuple of (pos_new, vec_new, arc.delta_md) + pos_new: (,3) array + The position at the end of the arc post transform. + vec_new: (,3) array + The unit vector at the end of the arc post transform. + arc.delta_md: int + The arc length of the arc. + """ + arc = Arc(dogleg, radius) + pos_new, vec_new = arc.transform(toolface, pos, vec, target) + + return (pos_new, vec_new, arc.delta_md) + + +def annular_volume(od: float, id: float = None, length: float = None): + """ + Calculate an annular volume. + + If no ``id`` is provided then circular volume is calculated. If no + ``length`` is provided, then the unit volume is calculated (i.e. the + area). + + Units are assumed consistent across input parameters, i.e. the + calculation is dimensionless. + + Parameters + ---------- + od: float + The outer diameter. + id: float | None, optional + The inner diameter, default is 0. + length : float | None, optional + The length of the annulus. + + Returns + ------- + annular_volume: float + The (unit) volume of the annulus or cylinder. + """ + length = 1 if length is None else length + id = 0 if id is None else id + annular_unit_volume = (np.pi * (od - id)**2) / 4 + annular_volume = annular_unit_volume * length + + return annular_volume From 374a0569ff378f194e0e961c6d2ad7fc8e99e4ed Mon Sep 17 00:00:00 2001 From: jonnymaserati Date: Sat, 16 Dec 2023 15:31:34 +0100 Subject: [PATCH 5/7] Add example to and unit test. --- tests/test_utils.py | 43 +++++++++++++++++++++++++++++++++++++++++++ welleng/survey.py | 8 +------- welleng/utils.py | 13 +++++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) create mode 100644 tests/test_utils.py diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..abd758c --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,43 @@ +import inspect +import sys + +from welleng.units import ureg +from welleng.utils import annular_volume + + +def test_annular_volume(): + av = annular_volume( + od=ureg('12.25 inch').to('meter'), + id=ureg(f'{9+5/8} inch').to('meter'), + length=ureg('1000 meter') + ) + + assert av.m == 3.491531223156194 + assert str(av.u) == 'meter ** 3' + + pass + + +def one_function_to_run_them_all(): + """ + Function to gather the test functions so that they can be tested by + running this module. + + https://stackoverflow.com/questions/18907712/python-get-list-of-all- + functions-in-current-module-inspecting-current-module + """ + test_functions = [ + obj for name, obj in inspect.getmembers(sys.modules[__name__]) + if (inspect.isfunction(obj) + and name.startswith('test') + and name != 'all') + ] + + for f in test_functions: + f() + + pass + + +if __name__ == '__main__': + one_function_to_run_them_all() diff --git a/welleng/survey.py b/welleng/survey.py index ec377ec..575b6bd 100644 --- a/welleng/survey.py +++ b/welleng/survey.py @@ -7,11 +7,7 @@ except ImportError: MAG_CALC = False from datetime import datetime -try: - from pyproj import CRS, Proj - GRID_CALC = True -except ImportError: - GRID_CALC = False +from pyproj import CRS, Proj from scipy.optimize import minimize from scipy.spatial.transform import Rotation as R @@ -70,8 +66,6 @@ def __init__(self, projection: str = "EPSG:23031") -> None: --------- For codes refer to [EPSG](https://epsg.io). """ - if not GRID_CALC: - raise ImportError('pyproj dependency required, try ``pip install pyproj``.') self.crs = CRS(projection) super().__init__(self.crs) diff --git a/welleng/utils.py b/welleng/utils.py index ca7956d..f88856e 100644 --- a/welleng/utils.py +++ b/welleng/utils.py @@ -664,6 +664,19 @@ def annular_volume(od: float, id: float = None, length: float = None): ------- annular_volume: float The (unit) volume of the annulus or cylinder. + + Example + ------- + ```python + >>> from welleng.utils import annular_volume + >>> from welleng.units import ureg + >>> av = annular_volume( + ... od=ureg('12.25 inch').to('meters), + ... id=ureg(f'{9+5/8} inch').to('meter'), + ... length=ureg('1000 meter') + ... ) + >>> print(av) + ... 3.491531223156194 meter ** 3 """ length = 1 if length is None else length id = 0 if id is None else id From d484bc4e62bdf28b8ac296a296d812fe8cce1350 Mon Sep 17 00:00:00 2001 From: jonnymaserati Date: Sat, 16 Dec 2023 16:01:58 +0100 Subject: [PATCH 6/7] Added example to survey.SurveyParameters and a unit test. --- .../{test_data => }/test_survey_parameters.py | 7 +++-- welleng/survey.py | 28 +++++++++++++++++++ welleng/utils.py | 4 +++ 3 files changed, 37 insertions(+), 2 deletions(-) rename tests/{test_data => }/test_survey_parameters.py (81%) diff --git a/tests/test_data/test_survey_parameters.py b/tests/test_survey_parameters.py similarity index 81% rename from tests/test_data/test_survey_parameters.py rename to tests/test_survey_parameters.py index 69ce643..0d4fc45 100644 --- a/tests/test_data/test_survey_parameters.py +++ b/tests/test_survey_parameters.py @@ -16,11 +16,14 @@ def test_known_location(): calculator = we.survey.SurveyParameters(reference.get('srs')) survey_parameters = calculator.get_factors_from_x_y( - reference.get('x'), reference.get('y'), date=reference.get('date') + x=reference.get('x'), y=reference.get('y'), date=reference.get('date') ) for k, v in survey_parameters.items(): - assert v == reference.get(k) + try: + assert round(v, 3) == round(reference.get(k), 3) + except TypeError: + assert v == reference.get(k) pass diff --git a/welleng/survey.py b/welleng/survey.py index 575b6bd..946bb83 100644 --- a/welleng/survey.py +++ b/welleng/survey.py @@ -116,6 +116,34 @@ def get_factors_from_x_y( The dip angle at the provided coordinates and time. date: The date used for determining the magnetic parameters. + + Example + ------- + In the following example, the parameters for Den Haag in The + Netherlands are looked up with the reference map ED50 UTM Zone 31N. + + ```python + >>> import pprint + >>> from welleng.survey import SurveyParameters + >>> calculator = SurveyParameters('EPSG:23031') + >>> survey_parameters = calculator.get_factors_from_x_y( + ... x=588319.02, y=5770571.03 + ... ) + >>> pprint(survey_parameters) + ... {'convergence': 1.01664403471959, + ... 'date': '2023-12-16', + ... 'declination': 2.213, + ... 'dip': -67.199, + ... 'easting': 588319.02, + ... 'latitude': 52.077583926214494, + ... 'longitude': 4.288694821453205, + ... 'magnetic_field_intensity': 49381, + ... 'northing': 5770571.03, + ... 'scale_factor': 0.9996957469340414, + ... 'srs': 'EPSG:23031', + ... 'x': 588319.02, + ... 'y': 5770571.03} + ``` """ longitude, latitude = self(x, y, inverse=True) result = self.get_factors(longitude, latitude) diff --git a/welleng/utils.py b/welleng/utils.py index f88856e..9954ce9 100644 --- a/welleng/utils.py +++ b/welleng/utils.py @@ -667,6 +667,9 @@ def annular_volume(od: float, id: float = None, length: float = None): Example ------- + In the following example we calculate annular volume along a 1,000 meter + section length of 9 5/8" casing inside 12 1/4" hole. + ```python >>> from welleng.utils import annular_volume >>> from welleng.units import ureg @@ -677,6 +680,7 @@ def annular_volume(od: float, id: float = None, length: float = None): ... ) >>> print(av) ... 3.491531223156194 meter ** 3 + ``` """ length = 1 if length is None else length id = 0 if id is None else id From b823ce91d53160476b2474be77ba558cc93443a1 Mon Sep 17 00:00:00 2001 From: jonnymaserati Date: Sat, 16 Dec 2023 16:04:47 +0100 Subject: [PATCH 7/7] Update to include usage: pyproj [-h] [-v] {sync} ... pyproj version: 3.6.1 [PROJ version: 9.3.0] options: -h, --help show this help message and exit -v, --verbose Show verbose debugging version information. commands: {sync} in default dependencies --- setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 7eac3af..5659a13 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ 'numpy', 'scipy', 'pint', + 'pyproj', # required for getting survey parameters 'PyYAML', 'setuptools', 'vtk' @@ -43,8 +44,7 @@ 'tabulate', 'trimesh', 'utm', - 'vedo', - 'pyproj' # required for getting survey parameters + 'vedo' ]) # this is the troublesome requirement that needs C dependencies