diff --git a/krippendorff/krippendorff.py b/krippendorff/krippendorff.py index 7ce91cd..0f9b46b 100644 --- a/krippendorff/krippendorff.py +++ b/krippendorff/krippendorff.py @@ -8,25 +8,66 @@ """ from __future__ import annotations -from typing import Callable, Union +from typing import Literal, Protocol, TypeVar, Union import numpy as np import numpy.typing as npt -DEFAULT_DTYPE = np.float64 +DEFAULT_DTYPE = np.float64 -LevelOfMeasurementCallable = Callable[..., np.ndarray] -LevelOfMeasurement = Union[str, LevelOfMeasurementCallable] # See https://github.com/python/cpython/issues/86399 +ValueScalarType = TypeVar("ValueScalarType", bound=np.generic) +MetricResultScalarType = TypeVar("MetricResultScalarType", bound=np.inexact) -def _nominal_metric(v1: np.ndarray, v2: np.ndarray, dtype: npt.DTypeLike = DEFAULT_DTYPE, **kwargs) -> np.ndarray: # noqa + +class DistanceMetric(Protocol): + def __call__(self, v1: npt.NDArray[ValueScalarType], v2: npt.NDArray[ValueScalarType], i1: npt.NDArray[np.int_], + i2: npt.NDArray[np.int_], n_v: npt.NDArray[MetricResultScalarType], + dtype: np.dtype[MetricResultScalarType] = DEFAULT_DTYPE) -> npt.NDArray[MetricResultScalarType]: + """Computes the distance for two arrays element-wise. + + Parameters + ---------- + v1 : ndarray + First array of values. + + v2 : ndarray + Second array of values. If `v1.shape != v2.shape`, they must be broadcastable to a common shape (which + becomes the shape of the output). + + i1 : ndarray + Ordinal indices of the first array (with the same shape). + + i2 : ndarray + Ordinal indices of the second array (with the same shape). + + n_v : ndarray, with shape (V,) + Number of pairable elements for each value. + + dtype : data-type + Result and computation data-type. + + Returns + ------- + d : ndarray, with shape (A, B) + Distance between `v1` and `v2`, element-wise. 
+ """ + + +LevelOfMeasurement = Union[Literal["nominal", "ordinal", "interval", "ratio"], DistanceMetric] + + +def _nominal_metric(v1: npt.NDArray[ValueScalarType], v2: npt.NDArray[ValueScalarType], + i1: npt.NDArray[np.int_], i2: npt.NDArray[np.int_], # noqa + n_v: npt.NDArray[MetricResultScalarType], # noqa + dtype: np.dtype[MetricResultScalarType] = DEFAULT_DTYPE) -> npt.NDArray[MetricResultScalarType]: """Metric for nominal data.""" return (v1 != v2).astype(dtype) -def _ordinal_metric(v1: np.ndarray, v2: np.ndarray, i1: np.ndarray, i2: np.ndarray, # noqa - n_v: np.ndarray, dtype: npt.DTypeLike = DEFAULT_DTYPE, **kwargs) -> np.ndarray: # noqa +def _ordinal_metric(v1: npt.NDArray[ValueScalarType], v2: npt.NDArray[ValueScalarType], # noqa + i1: npt.NDArray[np.int_], i2: npt.NDArray[np.int_], n_v: npt.NDArray[np.number], + dtype: np.dtype[MetricResultScalarType] = DEFAULT_DTYPE) -> npt.NDArray[MetricResultScalarType]: """Metric for ordinal data.""" i1, i2 = np.minimum(i1, i2), np.maximum(i1, i2) @@ -36,20 +77,24 @@ def _ordinal_metric(v1: np.ndarray, v2: np.ndarray, i1: np.ndarray, i2: np.ndarr return (sums_between_indices - np.divide(n_v[i1] + n_v[i2], 2, dtype=dtype)) ** 2 -def _interval_metric(v1: np.ndarray, v2: np.ndarray, dtype: npt.DTypeLike = DEFAULT_DTYPE, - **kwargs) -> np.ndarray: # noqa +def _interval_metric(v1: npt.NDArray[ValueScalarType], v2: npt.NDArray[ValueScalarType], + i1: npt.NDArray[np.int_], i2: npt.NDArray[np.int_], n_v: npt.NDArray[np.number], # noqa + dtype: np.dtype[MetricResultScalarType] = DEFAULT_DTYPE) -> npt.NDArray[MetricResultScalarType]: """Metric for interval data.""" return (v1 - v2).astype(dtype) ** 2 -def _ratio_metric(v1: np.ndarray, v2: np.ndarray, dtype: npt.DTypeLike = DEFAULT_DTYPE, **kwargs) -> np.ndarray: # noqa +def _ratio_metric(v1: npt.NDArray[ValueScalarType], v2: npt.NDArray[ValueScalarType], i1: npt.NDArray[np.int_], # noqa + i2: npt.NDArray[np.int_], n_v: npt.NDArray[np.number], # noqa + dtype: 
np.dtype[MetricResultScalarType] = DEFAULT_DTYPE) -> npt.NDArray[MetricResultScalarType]: """Metric for ratio data.""" v1_plus_v2 = v1 + v2 return np.divide(v1 - v2, v1_plus_v2, out=np.zeros(np.broadcast(v1, v2).shape), where=v1_plus_v2 != 0, dtype=dtype) ** 2 -def _coincidences(value_counts: np.ndarray, dtype: npt.DTypeLike = DEFAULT_DTYPE) -> np.ndarray: +def _coincidences(value_counts: npt.NDArray[np.int_], + dtype: np.dtype[MetricResultScalarType] = DEFAULT_DTYPE) -> npt.NDArray[MetricResultScalarType]: """Coincidence matrix. Parameters @@ -73,10 +118,13 @@ def _coincidences(value_counts: np.ndarray, dtype: npt.DTypeLike = DEFAULT_DTYPE return np.divide(unnormalized_coincidences, (pairable - 1).reshape((-1, 1, 1)), dtype=dtype).sum(axis=0) -def _random_coincidences(n_v: np.ndarray, dtype: npt.DTypeLike = DEFAULT_DTYPE) -> np.ndarray: +def _random_coincidences( + n_v: npt.NDArray[MetricResultScalarType], + dtype: np.dtype[MetricResultScalarType] = DEFAULT_DTYPE) -> npt.NDArray[MetricResultScalarType]: """Random coincidence matrix. Parameters + ---------- n_v : ndarray, with shape (V,) Number of pairable elements for each value. @@ -91,8 +139,8 @@ def _random_coincidences(n_v: np.ndarray, dtype: npt.DTypeLike = DEFAULT_DTYPE) return np.divide(np.outer(n_v, n_v) - np.diagflat(n_v), n_v.sum() - 1, dtype=dtype) -def _distances(value_domain: np.ndarray, distance_metric: LevelOfMeasurementCallable, n_v: np.ndarray, - dtype: npt.DTypeLike = DEFAULT_DTYPE) -> np.ndarray: +def _distances(value_domain: npt.NDArray[ValueScalarType], distance_metric: DistanceMetric, n_v: npt.NDArray[np.int_], + dtype: np.dtype[MetricResultScalarType] = DEFAULT_DTYPE) -> npt.NDArray[MetricResultScalarType]: """Distances of the different possible values. 
Parameters @@ -120,7 +168,7 @@ def _distances(value_domain: np.ndarray, distance_metric: LevelOfMeasurementCall i2=indices[np.newaxis, :], n_v=n_v, dtype=dtype) -def _distance_metric(level_of_measurement: LevelOfMeasurement) -> LevelOfMeasurementCallable: +def _distance_metric(level_of_measurement: LevelOfMeasurement) -> DistanceMetric: """Distance metric callable of the level of measurement. Parameters @@ -142,7 +190,8 @@ def _distance_metric(level_of_measurement: LevelOfMeasurement) -> LevelOfMeasure }.get(level_of_measurement, level_of_measurement) -def _reliability_data_to_value_counts(reliability_data: np.ndarray, value_domain: np.ndarray) -> np.ndarray: +def _reliability_data_to_value_counts(reliability_data: npt.NDArray[ValueScalarType], + value_domain: npt.NDArray[ValueScalarType]) -> npt.NDArray[np.int_]: """Return the value counts given the reliability data. Parameters @@ -302,6 +351,10 @@ def alpha(reliability_data: npt.ArrayLike | None = None, value_counts: npt.Array if (value_counts.sum(axis=-1) <= 1).all(): raise ValueError("There has to be at least one unit with values assigned by at least two coders.") + dtype = np.dtype(dtype) + if not np.issubdtype(dtype, np.inexact): + raise ValueError("`dtype` must be an inexact type.") + distance_metric = _distance_metric(level_of_measurement) o = _coincidences(value_counts, dtype=dtype) diff --git a/pyproject.toml b/pyproject.toml index 8213e73..6892918 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,7 @@ classifiers = [ [tool.poetry.dependencies] python = "^3.8" -numpy = "^1.20" +numpy = "^1.21" # For NumPy Typing. [tool.poetry.group.test] optional = true