from __future__ import annotations
from dataclasses import dataclass, field
from typing import Self
import numpy as np
from antupy.core.units import Unit, _conv_temp, _mul_units, _div_units, _assign_unit
from antupy.core.var import CF, Var
[docs]
@dataclass(frozen=True)
class Array():
"""
A container for homogeneous numerical data with associated physical units.
The Array class represents collections of values (like time series, parametric
variables, or measurement arrays) that share the same physical unit. It provides
automatic unit conversion and arithmetic operations while preserving dimensional
consistency.
Parameters
----------
value_ : np.ndarray, list, or None, optional
Input values as array-like or None. Default is None.
unit_ : str, Unit, or None, optional
Physical unit as string or Unit object. Default is None.
Attributes
----------
value : np.ndarray
The numerical values as a numpy array.
unit : Unit
The physical unit associated with all values.
Raises
------
TypeError
If units are incompatible during arithmetic operations.
ValueError
If requested unit conversion is not dimensionally consistent.
Examples
--------
Basic usage:
>>> from antupy import Array
>>> temperatures = Array([20.0, 25.0, 30.0], "°C")
>>> print(temperatures)
[20. 25. 30.] [°C]
Arithmetic operations with unit conversion:
>>> mass1 = Array([1.0, 2.0, 3.0], "kg")
>>> mass2 = Array([500, 1000, 1500], "g")
>>> total_mass = mass1 + mass2 # Automatic conversion
>>> print(total_mass)
[1.5 2.5 3.5] [kg]
Unit conversion:
>>> print(total_mass.set_unit("g"))
[1500. 2500. 3500.] [g]
>>> print(total_mass.get_value("ton"))
[0.0015 0.0025 0.0035]
Iteration and indexing:
>>> for temp in temperatures[:2]:
... print(f"Temperature: {temp}")
Temperature: 20.0 [°C]
Temperature: 25.0 [°C]
Notes
-----
All arithmetic operations automatically handle unit conversions when
dimensions are compatible. Incompatible operations raise TypeError.
See Also
--------
Var : For handling single values with units
antupy.units.Unit : The underlying unit representation class
"""
_value: np.ndarray|list|Array|None= None
_unit: str|Unit|None = None
value: np.ndarray = field(init=False)
unit: Unit = field(init=False)
def __post_init__(self):
if isinstance(self._value, Array) and self._unit is None:
object.__setattr__(self, "value", self._value.value)
object.__setattr__(self, "unit", self._value.unit)
elif isinstance(self._value, Array) and self._unit is not None:
unit_ = _assign_unit(self._unit)
object.__setattr__(self, "value", self._value.gv(unit_.label_unit))
object.__setattr__(self, "unit", unit_)
else:
object.__setattr__(self, "value", np.array(self._value))
object.__setattr__(self, "unit", _assign_unit(self._unit))
def __add__(self, other: Self|Var):
""" Overloading the addition operator. """
if not isinstance(other, (Array, Var)):
return NotImplemented
if self.unit == other.unit:
return Array(self.value + other.value, self.unit)
elif self.unit.base_units == other.unit.base_units:
return Array(self.value + other.gv(self.unit.label_unit), self.unit)
else:
raise TypeError(f"Cannot add {self.unit} with {other.unit}. Units are not compatible.")
def __radd__(self, other: Self|Var):
""" Overloading the addition operator. """
if not isinstance(other, (Var,Array)):
return NotImplemented
if self.unit == other.unit:
return Array(self.value + other.value, other.unit)
elif self.unit.base_units == other.unit.base_units:
return Array(other.value + self.gv(other.unit.u), other.unit)
else:
raise TypeError(f"Cannot add {self.unit} with {other.unit}. Units are not compatible.")
def __sub__(self, other: Self|Var):
""" Overloading the subtraction operator. """
if not isinstance(other, (Array, Var)):
return NotImplemented
if self.unit == other.unit:
return Array(self.value - other.value, self.unit)
elif self.unit.base_units == other.unit.base_units:
return Array(self.value - other.gv(self.unit.u), self.unit)
else:
raise TypeError(f"Cannot subtract {self.unit} with {other.unit}. Units are not compatible.")
def __rsub__(self, other: Self|Var):
""" Overloading the subtraction operator. """
if not isinstance(other, (Array, Var)):
return NotImplemented
if self.unit == other.unit:
return Array(other.value - self.value, self.unit)
elif self.unit.base_units == other.unit.base_units:
return Array(other.gv(self.unit.u) - self.value, self.unit)
else:
raise TypeError(f"Cannot subtract {self.unit} with {other.unit}. Units are not compatible.")
def __mul__(self, other: Self|Var|float|int):
""" Overloading the multiplication operator. """
if isinstance(other, (Array, Var)):
if self.value is None:
return Array(None, _mul_units(self.unit.u, other.unit.u))
return Array(
self.value * other.value,
_mul_units(self.unit.u, other.unit.u)
)
elif isinstance(other, (int, float)):
return Array(self.value * other, self.unit)
else:
raise TypeError(f"Cannot multiply {type(self)} with {type(other)}")
def __rmul__(self, other: Self|Var|float|int):
""" Overloading the multiplication operator. """
if isinstance(other, (Array, Var)):
if self.value is None:
return Array(None, _mul_units(other.unit.u, self.unit.u))
return Array(
other.value * self.value,
_mul_units(other.unit.u, self.unit.u)
)
elif isinstance(other, (int, float)):
return Array(self.value * other, self.unit)
else:
raise TypeError(f"Cannot multiply {type(self)} with {type(other)}")
def __truediv__(self, other: Self|Var|float|int):
""" Overloading the division operator. """
if isinstance(other, (Array, Var)):
return Array(self.value / other.value, _div_units(self.unit.u, other.unit.u))
elif isinstance(other, (int, float)):
if self.value is None:
return Var(None, self.unit)
return Array(self.value / other, self.unit)
else:
raise TypeError(f"Cannot divide {type(self)} by {type(other)}")
def __rtruediv__(self, other: Self|Var|float|int):
""" Overloading the division operator. """
if isinstance(other, (Array, Var)):
return Array(other.value / self.value, _div_units(other.unit.u, self.unit.u))
elif isinstance(other, (int, float)):
if self.value is None:
return Var(None, _div_units("", self.unit.u))
return Array(other / self.value, _div_units("", self.unit.u))
else:
raise TypeError(f"Cannot divide {type(other)} by {type(self)}")
def __eq__(self, other) -> bool:
""" Overloading the equality operator. """
if not isinstance(other, Array) or other.value is None:
return False
return (
np.allclose(self.value, other.value * CF(other.unit.u, self.unit.u).v)
and self.unit.base_units == other.unit.base_units
)
def __len__(self) -> int:
return len(self.v)
def __getitem__(self, key) -> Var:
return Var(float(self.value[key]), self.u)
def __iter__(self):
return (Var(v, self.u) for v in self.value)
def __repr__(self) -> str:
return f"{self.value:} [{self.unit.u}]"
def get_value(self, unit: str | None = None) -> np.ndarray:
""" Method to obtain the value of the variable in the requested unit.
If the unit is not compatible with the variable unit, an error is raised.
If the unit is None, the value is returned in the variable unit.
"""
if unit is None:
unit = self.unit.u
if self.unit == unit:
return self.value
if self.unit.base_units == Unit(unit).base_units:
if unit in ["°C", "degC","K"]:
return np.array(_conv_temp(self, unit))
return self.value * CF(self.unit.u, unit).v
else:
raise ValueError( f"Var unit ({self.unit}) and wanted unit ({unit}) are not compatible.")
def set_unit(self, unit: str | None = None) -> Array:
""" Set the primary unit of the variable. """
unit = str(unit)
if (self.unit.base_units == Unit(unit).base_units) and (self.value is not None):
return Array(self.value * CF(self.unit, unit).v, Unit(unit))
else:
raise ValueError(
f"unit ({unit}) is not compatible with existing primary unit ({self.unit})."
)
@property
def u(self) -> str:
""" Property to obtain the label unit of the variable"""
return self.unit.label_unit
@property
def v(self) -> np.ndarray:
""" Property to obtain the value of the variable in its label unit. """
return self.value
def gv(self, unit:str|None = None) -> np.ndarray:
"""Alias for self.get_value()"""
return self.get_value(unit)
def su(self, unit: str|None = None) -> Array:
"""Alias of self.set_unit"""
return self.set_unit(unit)
def compatible(self) -> list[str]:
""" Return a list of compatible units for the variable unit. """
return self.unit.compatible()
def mean(self, unit: str | None = None) -> Var:
u = self.u if unit is None else unit
return Var(self.gv(u).mean(), u)
def std(self, unit: str | None = None) -> Var:
u = self.u if unit is None else unit
return Var(self.gv(u).std(), u)
def var(self, unit: str | None = None) -> Var:
u = self.u if unit is None else unit
return Var(self.gv(u).var(), _mul_units(u, u))
def max(self, unit: str | None = None) -> Var:
u = self.u if unit is None else unit
return Var(self.gv(u).max(), u)
def min(self, unit: str | None = None) -> Var:
u = self.u if unit is None else unit
return Var(self.gv(u).min(), u)
def argmax(self) -> int:
return int(np.argmax(self.v))
def argmin(self) -> int:
return int(np.argmin(self.v))
def sum(self, unit: str | None = None) -> Var:
u = self.u if unit is None else unit
return Var(self.gv(u).sum(), u)
def prod(self, unit: str | None = None) -> Var:
u = self.u if unit is None else unit
u_f = ""
for _ in range(len(self)):
u_f = _mul_units(u, u_f)
return Var(self.gv(u).prod(), u_f)
def cumsum(self, unit: str | None = None) -> Array:
u = self.u if unit is None else unit
return Array(self.gv(u).cumsum(), u)
def sort(self, unit: str | None = None) -> Array:
u = self.u if unit is None else unit
return Array(self.gv(u).sort(), u)
def round(self, decimals: int = 0, unit: str | None = None) -> Array:
u = self.u if unit is None else unit
return Array(np.round(self.gv(u), decimals), u)