"""Doppler and polarimetric spectra.
Evaluate Doppler-velocity-resolved scattering matrices ``S_spec(v)`` and
``Z_spec(v)`` for a hydrometeor PSD (single species) or a
:class:`~rustmatrix.hd_mix.HydroMix`, then derive the spectral radar
observables (sZ_h, sZ_dr, sK_dp, sρ_hv, sδ_hv, sLDR) by applying the
bulk :mod:`rustmatrix.radar` formulas bin-by-bin.
Physics
-------
Line-of-sight velocity for a vertical-pointing radar is
``v = v_t(D) + w``, where ``v_t(D)`` is terminal fall speed and ``w``
is the mean vertical air motion.
Sign convention: **positive velocity = toward a downward-pointing
radar = fall direction**. For an up-looking radar flip the sign of
``w`` and ``v_bins`` (it is a pure sign flip — the kernels are even).
Finite beamwidth combined with horizontal wind broadens the spectrum
by a deterministic Gaussian of width
``σ_beam = |u_h| · θ_b / (2 √(2 ln 2))`` (Doviak & Zrnić 1993, small-θ_b
vertical-pointing limit). This ``σ_beam`` adds in quadrature to the
diameter-dependent turbulence width ``σ_t(D)`` when the spectral
Gaussian kernel is built.
``S_spec`` and ``Z_spec`` are linear in N(D), so a
:class:`~rustmatrix.hd_mix.HydroMix` is handled by summing per-component
spectra on a shared velocity grid.
System noise
------------
Radar receivers have a thermal noise floor that appears in every
velocity bin. The noise is uncorrelated between H and V channels, so
it adds incoherently to ``sZ_h`` and ``sZ_v`` but does **not** feed the
underlying ``S_spec`` / ``Z_spec`` matrices — those remain the
signal-only scattering matrices and continue to round-trip to the bulk
``radar.*`` helpers exactly. Noise biases ``sZ_dr``, ``sρ_hv``,
``sLDR`` through the per-bin SNR.
Pass ``noise=`` to :class:`SpectralIntegrator`:
* ``None`` (default) — no noise, the spectrum is signal-only.
* ``"realistic"`` or ``True`` — use :func:`realistic_noise_floor` for the
scatterer's wavelength (a -20 dBZ total reflectivity by default).
* a ``float`` — total noise reflectivity [mm⁶ m⁻³] distributed uniformly
across the velocity grid and applied equally to H and V.
* a 2-tuple ``(noise_h, noise_v)`` [mm⁶ m⁻³] — separate H and V noise.
References
----------
Doviak, R. J. & Zrnić, D. S. (1993). *Doppler Radar and Weather
Observations*, 2nd ed., Academic Press. (§5.3 beam-broadening.)
Zeng, Y., Janjić, T., de Lozar, A., Blahak, U., Reich, H., Keil, C.,
Seifert, A., Hagen, M., Tridon, F., Kneifel, S. (2023). "Interpreting
Doppler spectra measured by cloud radars using a particle inertia
model." *Atmos. Meas. Tech.*, 16, 3727–3757. doi:10.5194/amt-16-3727-2023
"""
from __future__ import annotations
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Callable, Dict, Mapping, Optional, Tuple, Union
import warnings
import numpy as np
try:
from numpy import trapezoid as _trapezoid
except ImportError: # numpy < 2.0
from numpy import trapz as _trapezoid
from .. import radar as _radar
from .. import scatter as _scatter
from ..hd_mix import HydroMix, MixtureComponent
from ..psd import PSD
from ..scatterer import Scatterer
from ..tmatrix_aux import geom_vert_back
from . import beam # noqa: F401 — exposes spectra.beam
# ---------------------------------------------------------------------------
# Fall-speed presets (spectra.fall_speed.*)
# ---------------------------------------------------------------------------
def _to_array(D):
return np.atleast_1d(np.asarray(D, dtype=float))
[docs]
def atlas_srivastava_sekhon_1973(D, rho_ratio: float = 1.0):
"""Atlas–Srivastava–Sekhon (1973) rain terminal velocity [m/s].
``v_t(D) = (9.65 − 10.3 · exp(−0.6 · D)) · rho_ratio``
Parameters
----------
D : array_like
Equivalent drop diameter [mm].
rho_ratio : float
``(ρ_0 / ρ_air)^0.4`` air-density correction (1.0 at sea level).
"""
Dv = _to_array(D)
return rho_ratio * (9.65 - 10.3 * np.exp(-0.6 * Dv))
[docs]
def brandes_et_al_2002(D):
"""Brandes et al. (2002) 4th-order polynomial rain fall speed [m/s].
``v_t(D) = −0.1021 + 4.932 D − 0.9551 D² + 0.07934 D³ − 0.002362 D⁴``
(valid 0.1 ≤ D ≤ 8 mm; clamped to zero below 0.1 mm).
"""
Dv = _to_array(D)
v = (
-0.1021
+ 4.932 * Dv
- 0.9551 * Dv ** 2
+ 0.07934 * Dv ** 3
- 0.002362 * Dv ** 4
)
v[Dv < 0.1] = 0.0
return np.maximum(v, 0.0)
[docs]
def beard_1976(D, T: float = 293.15, P: float = 101325.0):
"""Beard-style rain terminal velocity with explicit T, P dependence [m/s].
Uses the Brandes et al. (2002) sea-level fit as the reference fall
speed and applies Beard's (1977) density correction
``(ρ₀ / ρ)^0.5`` to shift to arbitrary (T, P). This is the practical
form used in most cloud microphysics codes when "Beard 1976" is
requested with a non-standard ambient state — the explicit
three-regime Beard (1976) formulation is notoriously sensitive to
coefficient transcription and gains little accuracy over the
much-simpler Brandes fit below 7 mm.
Parameters
----------
D : array_like
Equivalent drop diameter [mm].
T : float
Air temperature [K]. Default 293.15 K (20 °C).
P : float
Air pressure [Pa]. Default 101325 Pa.
References
----------
Beard, K. V. (1977). Terminal velocity adjustment for cloud and
precipitation drops aloft. *J. Atmos. Sci.*, 34, 1293–1298.
Brandes, E. A., Zhang, G., & Vivekanandan, J. (2002). Experiments in
rainfall estimation with a polarimetric radar in a subtropical
environment. *J. Appl. Meteor.*, 41, 674–685.
"""
Dv = _to_array(D)
R_d = 287.05
rho0 = 1.2041 # reference dry-air density at 20 °C, 1 atm
rho = P / (R_d * T)
density_correction = np.sqrt(rho0 / rho)
return brandes_et_al_2002(Dv) * density_correction
[docs]
def locatelli_hobbs_1974_aggregates(D):
"""Locatelli & Hobbs (1974) unrimed aggregates (mixed habits) [m/s].
``v_t(D) = 0.69 · D^0.41`` (D in mm, valid 1 ≤ D ≤ 10 mm).
"""
Dv = _to_array(D)
return 0.69 * np.power(np.maximum(Dv, 1e-6), 0.41)
[docs]
def locatelli_hobbs_1974_graupel_hex(D):
"""Locatelli & Hobbs (1974) hexagonal graupel [m/s].
``v_t(D) = 1.1 · D^0.57`` (D in mm, valid 0.5 ≤ D ≤ 2 mm).
"""
Dv = _to_array(D)
return 1.1 * np.power(np.maximum(Dv, 1e-6), 0.57)
[docs]
def power_law(a: float, b: float, D_ref: float = 1.0, c: float = 0.0) -> Callable:
"""Return a user-parametrised power-law fall-speed callable.
``v_t(D) = a · (D / D_ref)^b + c`` [m/s, D in mm]
Parameters
----------
a, b : float
Prefactor and exponent of the power law.
D_ref : float
Reference diameter used to normalise ``D`` (mm). Default 1.0.
c : float
Additive offset [m/s]. Default 0.0.
"""
def _f(D):
Dv = _to_array(D)
return a * np.power(np.maximum(Dv, 1e-12) / D_ref, b) + c
_f.__name__ = f"power_law(a={a}, b={b}, D_ref={D_ref}, c={c})"
return _f
[docs]
class fall_speed: # noqa: N801 — used as a namespace, not a class
"""Namespace of fall-speed presets. Call any as ``fall_speed.preset(D)``."""
atlas_srivastava_sekhon_1973 = staticmethod(atlas_srivastava_sekhon_1973)
brandes_et_al_2002 = staticmethod(brandes_et_al_2002)
beard_1976 = staticmethod(beard_1976)
locatelli_hobbs_1974_aggregates = staticmethod(locatelli_hobbs_1974_aggregates)
locatelli_hobbs_1974_graupel_hex = staticmethod(locatelli_hobbs_1974_graupel_hex)
power_law = staticmethod(power_law)
# ---------------------------------------------------------------------------
# Turbulence models (spectra.turbulence.*)
# ---------------------------------------------------------------------------
class _TurbulenceModel:
"""Base class: ``__call__(D) -> σ_t(D)`` [m/s]."""
def __call__(self, D):
raise NotImplementedError
[docs]
class NoTurbulence(_TurbulenceModel):
"""``σ_t(D) = 0`` everywhere (delta-function binning)."""
def __call__(self, D):
return np.zeros_like(_to_array(D))
def __repr__(self):
return "NoTurbulence()"
[docs]
class GaussianTurbulence(_TurbulenceModel):
"""Constant Gaussian broadening: ``σ_t(D) = σ_t``.
The canonical spectral-polarimetry assumption. Matches classical
papers that parameterise broadening with a single bulk σ_t.
"""
def __init__(self, sigma_t: float):
if sigma_t < 0:
raise ValueError("sigma_t must be non-negative.")
self.sigma_t = float(sigma_t)
def __call__(self, D):
return np.full_like(_to_array(D), self.sigma_t)
def __repr__(self):
return f"GaussianTurbulence(sigma_t={self.sigma_t})"
[docs]
class InertialZeng2023(_TurbulenceModel):
"""Particle-inertia-corrected turbulence response (Zeng et al. 2023).
Heavy drops cannot follow small-scale eddies, so their effective
turbulence broadening is a low-pass-filtered version of the ambient
air turbulence. We use a first-order response:
σ_t(D) = σ_air / √(1 + St(D)²)
with Stokes number ``St(D) = τ_p(D) / τ_eddy``, particle relaxation
time ``τ_p(D) = v_t_ref(D) / g`` (g = 9.80665 m/s²), and eddy
turnover time ``τ_eddy = (L_o² / ε)^(1/3)``.
Limits:
* Small drops (``St → 0``) → ``GaussianTurbulence(σ_air)``.
* Large drops (``St → ∞``) → ``NoTurbulence()``.
Parameters
----------
sigma_air : float
Ambient turbulence intensity [m/s].
epsilon : float
Turbulent kinetic energy dissipation rate [m²/s³].
L_o : float, optional
Outer (integral) length scale [m]. Default 100 m.
v_t_ref : callable, optional
``D -> v_t(D)`` used to estimate ``τ_p``. Defaults to
:func:`atlas_srivastava_sekhon_1973`. Pass the same fall-speed
model used in the integrator for consistency.
References
----------
Zeng et al. (2023), *Atmos. Meas. Tech.*, 16, 3727.
"""
_g = 9.80665 # m/s²
def __init__(
self,
sigma_air: float,
epsilon: float,
L_o: float = 100.0,
v_t_ref: Optional[Callable] = None,
):
if sigma_air < 0:
raise ValueError("sigma_air must be non-negative.")
if epsilon <= 0:
raise ValueError("epsilon must be positive.")
if L_o <= 0:
raise ValueError("L_o must be positive.")
self.sigma_air = float(sigma_air)
self.epsilon = float(epsilon)
self.L_o = float(L_o)
self.v_t_ref = v_t_ref if v_t_ref is not None else atlas_srivastava_sekhon_1973
self.tau_eddy = (self.L_o ** 2 / self.epsilon) ** (1.0 / 3.0)
def __call__(self, D):
Dv = _to_array(D)
tau_p = self.v_t_ref(Dv) / self._g
St = tau_p / self.tau_eddy
return self.sigma_air / np.sqrt(1.0 + St ** 2)
def __repr__(self):
return (
f"InertialZeng2023(sigma_air={self.sigma_air}, "
f"epsilon={self.epsilon}, L_o={self.L_o})"
)
def _normalise_turbulence(t) -> Callable:
"""Accept a _TurbulenceModel, or any `D -> σ_t(D)` callable."""
if t is None:
return NoTurbulence()
if isinstance(t, _TurbulenceModel):
return t
if callable(t):
return t
raise TypeError(
"turbulence must be a TurbulenceModel, a callable D->σ_t, or None."
)
[docs]
class turbulence: # noqa: N801 — namespace
"""Namespace for turbulence models. ``turbulence.from_params(...)``
picks the right concrete model from simple scalar inputs."""
NoTurbulence = NoTurbulence
GaussianTurbulence = GaussianTurbulence
InertialZeng2023 = InertialZeng2023
[docs]
@staticmethod
def from_params(
sigma: Optional[float] = None,
epsilon: Optional[float] = None,
L_o: float = 100.0,
v_t_ref: Optional[Callable] = None,
) -> _TurbulenceModel:
"""Smart constructor.
* ``epsilon`` supplied ⇒ :class:`InertialZeng2023` (with
``sigma_air = sigma`` if given, else a default of 0.3 m/s).
* only ``sigma`` supplied ⇒ :class:`GaussianTurbulence`.
* both ``None`` (or ``sigma == 0`` and no ε) ⇒ :class:`NoTurbulence`.
"""
if epsilon is not None:
return InertialZeng2023(
sigma_air=sigma if sigma is not None else 0.3,
epsilon=epsilon,
L_o=L_o,
v_t_ref=v_t_ref,
)
if sigma is None or sigma == 0:
return NoTurbulence()
return GaussianTurbulence(sigma_t=sigma)
# ---------------------------------------------------------------------------
# System noise
# ---------------------------------------------------------------------------
#: Default "realistic" total noise reflectivity [dBZ] when the user asks
#: for a realistic default. Picked to be a sensible mid-range value that
#: is representative of research radars without being wavelength-specific
#: (operational S-band is quieter, cloud radars noisier in absolute dBZ
#: but equivalent per-range-gate).
REALISTIC_NOISE_DBZ = -20.0
[docs]
def realistic_noise_floor(
wavelength_mm: float = 33.3, Z_dBZ: float = REALISTIC_NOISE_DBZ
) -> float:
"""Return a realistic total noise reflectivity in mm⁶ m⁻³.
Converts a noise-equivalent reflectivity in dBZ to linear units. The
default (-20 dBZ) is a realistic per-range-gate noise floor for a
research radar; operational S-band systems are typically quieter
(-30 to -40 dBZ), while cloud radars at Ka/W with short integration
times can be noisier.
The wavelength argument is accepted for signature-compatibility so
callers can write ``realistic_noise_floor(scatterer.wavelength)``
even though the default is currently wavelength-independent.
Parameters
----------
wavelength_mm : float
Radar wavelength [mm]. Currently unused by the default model but
reserved for future band-dependent presets.
Z_dBZ : float
Total noise reflectivity [dBZ] across the Nyquist range.
Returns
-------
float
Total noise reflectivity in linear mm⁶ m⁻³.
"""
_ = wavelength_mm # reserved; kept for future band-dependent behaviour
return 10.0 ** (Z_dBZ / 10.0)
def _resolve_noise(noise, wavelength_mm: float) -> Tuple[float, float]:
"""Return ``(noise_h, noise_v)`` in mm⁶ m⁻³ from the user spec.
``None`` or 0 ⇒ ``(0, 0)`` (no noise).
``"realistic"`` / ``"default"`` / ``True`` ⇒ realistic default on both channels.
``float`` ⇒ same value on H and V.
``(nh, nv)`` ⇒ separate H and V values.
"""
if noise is None or noise is False:
return 0.0, 0.0
if noise is True or (isinstance(noise, str) and noise.lower() in ("realistic", "default")):
n = realistic_noise_floor(wavelength_mm)
return n, n
if isinstance(noise, (tuple, list)):
if len(noise) != 2:
raise ValueError("`noise` tuple must be (noise_h, noise_v).")
nh, nv = float(noise[0]), float(noise[1])
if nh < 0 or nv < 0:
raise ValueError("noise values must be non-negative.")
return nh, nv
try:
n = float(noise)
except (TypeError, ValueError) as exc:
raise TypeError(
"`noise` must be None, True, 'realistic', a float, or a (noise_h, noise_v) tuple."
) from exc
if n < 0:
raise ValueError("noise value must be non-negative.")
return n, n
# ---------------------------------------------------------------------------
# Spectral result container
# ---------------------------------------------------------------------------
[docs]
@dataclass
class SpectralResult:
"""Doppler-resolved scattering matrices and derived observables.
Attributes
----------
v : ndarray (M,)
Doppler-velocity bin centres [m/s].
S_spec : ndarray (2, 2, M) complex
Spectral amplitude matrix.
Z_spec : ndarray (4, 4, M) float
Spectral phase matrix.
sZ_h, sZ_v : ndarray (M,)
Spectral linear reflectivity [mm⁶ m⁻³ / (m/s)], horizontal /
vertical polarisation.
sZ_dr : ndarray (M,)
Spectral differential reflectivity (linear ratio).
sKdp : ndarray (M,) or None
Spectral specific differential phase [° / km]. ``None`` when no
forward-scatter geometry was supplied.
srho_hv : ndarray (M,)
Spectral copolar correlation coefficient (0–1).
sdelta_hv : ndarray (M,)
Spectral backscatter differential phase [rad].
sLDR : ndarray (M,)
Spectral linear depolarisation ratio (linear).
wavelength : float
Wavelength [mm] (copied from the scatterer for `collapse_to_bulk`).
Kw_sqr : float
|K_w|² (copied for `collapse_to_bulk`).
"""
v: np.ndarray
S_spec: np.ndarray
Z_spec: np.ndarray
sZ_h: np.ndarray
sZ_v: np.ndarray
sZ_dr: np.ndarray
sKdp: Optional[np.ndarray]
srho_hv: np.ndarray
sdelta_hv: np.ndarray
sLDR: np.ndarray
wavelength: float
Kw_sqr: float
noise_h: float = 0.0
noise_v: float = 0.0
_S_spec_forward: Optional[np.ndarray] = None # per-bin forward S for sKdp re-derivation
[docs]
def collapse_to_bulk(self) -> SimpleNamespace:
"""Integrate S_spec / Z_spec over v and return a Scatterer-shaped
shim so bulk :mod:`radar` helpers can be called on the integrated
matrices.
The returned object has ``.get_S()``, ``.get_Z()``,
``.wavelength``, ``.Kw_sqr``, and a backscatter geometry — enough
for ``radar.refl``, ``radar.Zdr``, ``radar.rho_hv``,
``radar.delta_hv`` to work. If the spectrum was built with a
forward geometry, ``.get_S_forward()`` returns the forward-summed
S so you can hand-construct ``radar.Kdp``-style quantities.
"""
Z_sum = _trapezoid(self.Z_spec, self.v, axis=-1)
S_sum_back = _trapezoid(self.S_spec, self.v, axis=-1)
if self._S_spec_forward is not None:
S_sum_forward = _trapezoid(self._S_spec_forward, self.v, axis=-1)
else:
S_sum_forward = None
shim = SimpleNamespace()
shim.wavelength = self.wavelength
shim.Kw_sqr = self.Kw_sqr
shim.thet0, shim.thet, shim.phi0, shim.phi = 0.0, 180.0, 0.0, 0.0
shim.alpha, shim.beta = 0.0, 0.0
shim.psd_integrator = None
shim.get_S = lambda _S=S_sum_back: _S
shim.get_Z = lambda _Z=Z_sum: _Z
shim.get_SZ = lambda _S=S_sum_back, _Z=Z_sum: (_S, _Z)
if S_sum_forward is not None:
shim.get_S_forward = lambda _S=S_sum_forward: _S
return shim
# ---------------------------------------------------------------------------
# SpectralIntegrator
# ---------------------------------------------------------------------------
[docs]
class SpectralIntegrator:
"""Build spectral S and Z matrices on a Doppler-velocity grid.
Parameters
----------
source : Scatterer or HydroMix
The scattering target. A :class:`~rustmatrix.Scatterer` must have
an initialised ``psd_integrator`` and a ``psd`` attached. A
:class:`~rustmatrix.HydroMix` carries its own components (each
with ``scatterer.psd``).
fall_speed : callable, optional
``D_mm -> v_t_m_per_s`` for the **single-species** case. Ignored
when ``source`` is a ``HydroMix``.
turbulence : _TurbulenceModel or callable or None, optional
Turbulence model for the single-species case. Ignored when
``source`` is a ``HydroMix``.
component_kinematics : mapping, optional
For ``HydroMix`` inputs only. Maps component label (or index)
to ``(fall_speed, turbulence)``. Every component in the mixture
must be represented.
v_bins : array_like, optional
Explicit 1-D velocity grid [m/s]. Mutually exclusive with
``(v_min, v_max, n_bins)``.
v_min, v_max, n_bins : float, float, int, optional
Convenience triple that builds ``v_bins = linspace(v_min, v_max, n_bins)``.
w : float
Mean vertical air motion [m/s], positive-downward. Default 0.
u_h : float
Horizontal wind speed magnitude [m/s]. Drives beam broadening.
Default 0.
beamwidth : float
One-way half-power beamwidth θ_b [rad]. Default 0 (pencil beam).
geometry_backscatter : 6-tuple
Backscatter geometry. Default :data:`geom_vert_back`.
geometry_forward : 6-tuple, optional
Forward-scatter geometry. Required if you want sK_dp.
noise : None / "realistic" / True / float / (float, float), optional
System noise floor. ``None`` (default) disables noise and the
spectrum is signal-only — :meth:`SpectralResult.collapse_to_bulk`
then round-trips to the bulk radar observables exactly. Pass
``"realistic"`` or ``True`` to use :func:`realistic_noise_floor`
for the scatterer's wavelength; a scalar for equal H/V noise in
mm⁶ m⁻³; or a 2-tuple ``(noise_h, noise_v)``.
Noise is distributed uniformly across ``v_bins``, added to
``sZ_h`` / ``sZ_v``, and biases ``sZ_dr`` / ``sρ_hv`` / ``sLDR``
through per-bin SNR. The underlying ``S_spec`` and ``Z_spec``
stay signal-only.
"""
_sigma_beam_fwhm = 2.0 * np.sqrt(2.0 * np.log(2.0))
def __init__(
self,
source: Union[Scatterer, HydroMix],
fall_speed: Optional[Callable] = None,
turbulence=None,
*,
component_kinematics: Optional[Mapping] = None,
v_bins: Optional[np.ndarray] = None,
v_min: Optional[float] = None,
v_max: Optional[float] = None,
n_bins: Optional[int] = None,
w: float = 0.0,
u_h: float = 0.0,
beamwidth: float = 0.0,
geometry_backscatter: Tuple = geom_vert_back,
geometry_forward: Optional[Tuple] = None,
noise=None,
):
self.source = source
self.w = float(w)
self.u_h = float(abs(u_h))
self.beamwidth = float(beamwidth)
self.geometry_backscatter = tuple(geometry_backscatter)
self.geometry_forward = (
tuple(geometry_forward) if geometry_forward is not None else None
)
self._noise_spec = noise # resolved lazily in run() once λ is known
# --- velocity grid ---
v_triple = (v_min, v_max, n_bins)
triple_given = any(x is not None for x in v_triple)
if v_bins is None and not triple_given:
raise ValueError(
"Pass either `v_bins` or all three of `v_min`, `v_max`, `n_bins`."
)
if v_bins is not None and triple_given:
raise ValueError(
"Pass only one of `v_bins` or `(v_min, v_max, n_bins)`."
)
if v_bins is not None:
self.v_bins = np.asarray(v_bins, dtype=float).ravel()
else:
if None in v_triple:
raise ValueError(
"`v_min`, `v_max`, `n_bins` must all be supplied together."
)
self.v_bins = np.linspace(float(v_min), float(v_max), int(n_bins))
if self.v_bins.size < 2:
raise ValueError("v_bins must have at least 2 points.")
# --- beam-broadening variance ---
self.sigma_beam = self.u_h * self.beamwidth / self._sigma_beam_fwhm
# --- resolve per-component kinematics ---
if isinstance(source, HydroMix):
if component_kinematics is None:
raise ValueError(
"HydroMix sources require `component_kinematics` mapping "
"component label -> (fall_speed, turbulence)."
)
if fall_speed is not None or turbulence is not None:
raise ValueError(
"Pass `component_kinematics` for HydroMix; do not also "
"pass single-species `fall_speed`/`turbulence`."
)
self._component_kin = self._resolve_component_kinematics(
source, component_kinematics
)
self._single_kin = None
else:
if fall_speed is None:
raise ValueError("Single-species source requires `fall_speed`.")
self._single_kin = (fall_speed, _normalise_turbulence(turbulence))
self._component_kin = None
# ------------------------------------------------------------------
@staticmethod
def _resolve_component_kinematics(mix: HydroMix, mapping: Mapping):
by_key: Dict[int, Tuple[Callable, Callable]] = {}
labels = [c.label for c in mix.components]
# Accept either label-keyed or index-keyed mappings.
for key, val in mapping.items():
if not (isinstance(val, tuple) and len(val) == 2):
raise ValueError(
f"component_kinematics[{key!r}] must be (fall_speed, turbulence)."
)
fs, turb = val
turb = _normalise_turbulence(turb)
if isinstance(key, int):
idx = key
else:
if key not in labels:
raise ValueError(
f"component_kinematics refers to label {key!r} not "
f"in mixture labels {labels!r}."
)
idx = labels.index(key)
by_key[idx] = (fs, turb)
missing = set(range(len(mix.components))) - set(by_key)
if missing:
missing_labels = [labels[i] for i in sorted(missing)]
raise ValueError(
"component_kinematics missing entries for: "
f"{missing_labels!r}."
)
return [by_key[i] for i in range(len(mix.components))]
# ------------------------------------------------------------------
def _spectra_for_component(
self,
scatterer: Scatterer,
psd: PSD,
fall_speed_fn: Callable,
turb_fn: Callable,
geom: Tuple,
table_key: str,
) -> np.ndarray:
"""Build the spectral matrix (complex or real) for one component.
Returns an array of shape ``(P, Q, M)`` where ``(P, Q)`` is
``(2, 2)`` for the S table and ``(4, 4)`` for Z.
"""
integ = scatterer.psd_integrator
if integ is None or getattr(integ, "_S_table", None) is None:
raise ValueError(
"Component scatterer lacks an initialised PSDIntegrator."
)
if geom not in integ.geometries:
raise ValueError(
f"Geometry {geom!r} is not registered on this component. "
"Include it in PSDIntegrator.geometries before "
"init_scatter_table."
)
D = integ._psd_D # shape (N_D,)
table = getattr(integ, f"_{table_key}_table")[geom] # (P, Q, N_D)
N_D = np.atleast_1d(psd(D)) # shape (N_D,)
v_t = np.atleast_1d(np.asarray(fall_speed_fn(D), dtype=float))
sigma_t = np.atleast_1d(np.asarray(turb_fn(D), dtype=float))
sigma_eff = np.sqrt(sigma_t ** 2 + self.sigma_beam ** 2) # (N_D,)
# Expected velocity for each diameter (positive-downward).
v_exp = v_t + self.w # (N_D,)
v = self.v_bins # (M,)
# Per-bin kernel K(v_k, D_i), shape (M, N_D).
diff = v[:, None] - v_exp[None, :]
# Compute per-bin local widths once — used both for the delta-bin
# path and for the narrow-Gaussian threshold.
dv_edge = np.diff(v)
local_width = np.empty_like(v)
local_width[1:-1] = 0.5 * (dv_edge[:-1] + dv_edge[1:])
local_width[0] = dv_edge[0]
local_width[-1] = dv_edge[-1]
# Route to the integral-preserving delta path whenever the
# Gaussian kernel is too narrow to be resolved by the grid
# (σ below half the median bin width). Evaluating a needle-like
# Gaussian at bin centers undersamples it; binning the whole
# power into the nearest bin gives the correct ∫sZ_h dv.
dv_median = float(np.median(local_width))
delta_sigma = sigma_eff < 0.5 * dv_median
K = np.zeros((v.size, D.size), dtype=float)
if np.any(~delta_sigma):
s = sigma_eff[~delta_sigma]
K[:, ~delta_sigma] = (
1.0 / (np.sqrt(2.0 * np.pi) * s)
) * np.exp(-0.5 * (diff[:, ~delta_sigma] / s) ** 2)
if np.any(delta_sigma):
for idx_D in np.where(delta_sigma)[0]:
k = int(np.argmin(np.abs(v - v_exp[idx_D])))
if local_width[k] > 0:
K[k, idx_D] = 1.0 / local_width[k]
# Trapezoidal weights along D.
w_D = np.empty_like(D)
dD = np.diff(D)
w_D[1:-1] = 0.5 * (dD[:-1] + dD[1:])
w_D[0] = 0.5 * dD[0]
w_D[-1] = 0.5 * dD[-1]
# Build combined weight[k, D] = N(D) * K(k, D) * trap_w(D).
weight = K * (N_D * w_D)[None, :] # (M, N_D)
# Contract over the diameter axis.
# table has shape (P, Q, N_D); weight has shape (M, N_D).
# Result shape (P, Q, M).
return np.einsum("pqd,md->pqm", table, weight)
# ------------------------------------------------------------------
def _range_warning(self):
"""Emit UserWarning if the v_bins grid is too narrow to hold the
bulk of the spectral power."""
# Determine expected min/max of v_exp ± 3 σ_eff across all components.
lo_all, hi_all = [], []
if self._single_kin is not None:
sc = self.source
integ = sc.psd_integrator
D = integ._psd_D if integ is not None else np.array([1.0])
fs, turb = self._single_kin
v_exp = fs(D) + self.w
sigma_eff = np.sqrt(turb(D) ** 2 + self.sigma_beam ** 2)
lo_all.append(np.min(v_exp - 3 * sigma_eff))
hi_all.append(np.max(v_exp + 3 * sigma_eff))
else:
for comp, (fs, turb) in zip(self.source.components, self._component_kin):
integ = comp.scatterer.psd_integrator
D = integ._psd_D
v_exp = fs(D) + self.w
sigma_eff = np.sqrt(turb(D) ** 2 + self.sigma_beam ** 2)
lo_all.append(np.min(v_exp - 3 * sigma_eff))
hi_all.append(np.max(v_exp + 3 * sigma_eff))
lo, hi = min(lo_all), max(hi_all)
v_lo, v_hi = self.v_bins[0], self.v_bins[-1]
if lo < v_lo or hi > v_hi:
warnings.warn(
"Spectral power extends beyond v_bins: expected range "
f"[{lo:.3g}, {hi:.3g}] m/s, v_bins covers "
f"[{v_lo:.3g}, {v_hi:.3g}] m/s. Bulk-sum identity will be "
"degraded by the leakage.",
UserWarning,
stacklevel=3,
)
# ------------------------------------------------------------------
[docs]
def run(self) -> SpectralResult:
"""Evaluate the spectrum and derived observables."""
self._range_warning()
geom_b = self.geometry_backscatter
geom_f = self.geometry_forward
M = self.v_bins.size
if self._single_kin is not None:
sc = self.source
fs, turb = self._single_kin
S_spec = self._spectra_for_component(
sc, sc.psd, fs, turb, geom_b, "S"
)
Z_spec = self._spectra_for_component(
sc, sc.psd, fs, turb, geom_b, "Z"
)
if geom_f is not None:
S_spec_f = self._spectra_for_component(
sc, sc.psd, fs, turb, geom_f, "S"
)
else:
S_spec_f = None
wavelength = sc.wavelength
Kw_sqr = sc.Kw_sqr
else:
mix: HydroMix = self.source
S_spec = np.zeros((2, 2, M), dtype=complex)
Z_spec = np.zeros((4, 4, M), dtype=float)
S_spec_f = (
np.zeros((2, 2, M), dtype=complex) if geom_f is not None else None
)
for comp, (fs, turb) in zip(mix.components, self._component_kin):
S_spec += self._spectra_for_component(
comp.scatterer, comp.psd, fs, turb, geom_b, "S"
)
Z_spec += self._spectra_for_component(
comp.scatterer, comp.psd, fs, turb, geom_b, "Z"
)
if geom_f is not None:
S_spec_f += self._spectra_for_component(
comp.scatterer, comp.psd, fs, turb, geom_f, "S"
)
wavelength = mix.wavelength
Kw_sqr = mix.Kw_sqr
# Derive per-bin observables.
sZ_h = np.empty(M)
sZ_v = np.empty(M)
sZ_dr = np.empty(M)
srho_hv = np.empty(M)
sdelta_hv = np.empty(M)
sLDR = np.empty(M)
sKdp = np.empty(M) if geom_f is not None else None
pref = wavelength ** 4 / (np.pi ** 5 * Kw_sqr)
# Resolve system-noise spec now that λ is known, and spread it
# uniformly across the velocity grid (units: mm⁶ m⁻³ / (m/s)).
noise_h_total, noise_v_total = _resolve_noise(self._noise_spec, wavelength)
v_span = float(self.v_bins[-1] - self.v_bins[0])
noise_psd_h = noise_h_total / v_span if v_span > 0 else 0.0
noise_psd_v = noise_v_total / v_span if v_span > 0 else 0.0
for k in range(M):
Z = Z_spec[:, :, k]
sigma_h = 2.0 * np.pi * (Z[0, 0] - Z[0, 1] - Z[1, 0] + Z[1, 1])
sigma_v = 2.0 * np.pi * (Z[0, 0] + Z[0, 1] + Z[1, 0] + Z[1, 1])
signal_h = pref * sigma_h
signal_v = pref * sigma_v
sZ_h[k] = signal_h + noise_psd_h
sZ_v[k] = signal_v + noise_psd_v
# Z_dr including noise bias (symmetric if noise_h == noise_v).
if sZ_v[k] != 0:
sZ_dr[k] = sZ_h[k] / sZ_v[k]
else:
sZ_dr[k] = np.nan
a = (Z[2, 2] + Z[3, 3]) ** 2 + (Z[3, 2] - Z[2, 3]) ** 2
b = Z[0, 0] - Z[0, 1] - Z[1, 0] + Z[1, 1]
c = Z[0, 0] + Z[0, 1] + Z[1, 0] + Z[1, 1]
if (b * c) > 0:
rho_signal = np.sqrt(a / (b * c))
else:
rho_signal = np.nan
# Noise biases ρ_hv by the per-bin SNR: ρ_obs = ρ_sig /
# sqrt((1+1/SNR_h)(1+1/SNR_v)). With zero noise this reduces
# to ρ_signal exactly.
if noise_psd_h > 0 or noise_psd_v > 0:
# Guard against 0/0: bins with no signal collapse ρ → 0
# (the noise-only limit) rather than overflowing.
if signal_h <= 0 or signal_v <= 0:
srho_hv[k] = 0.0
else:
# Bias factor in the noise/signal form; overflow
# here means SNR≈0, so ρ→0 is the correct physical
# answer — silence the warning and clamp.
with np.errstate(over="ignore"):
n_over_s_h = noise_psd_h / signal_h if noise_psd_h > 0 else 0.0
n_over_s_v = noise_psd_v / signal_v if noise_psd_v > 0 else 0.0
bias = np.sqrt((1.0 + n_over_s_h) * (1.0 + n_over_s_v))
srho_hv[k] = rho_signal / bias if np.isfinite(bias) else 0.0
else:
srho_hv[k] = rho_signal
sdelta_hv[k] = np.arctan2(Z[2, 3] - Z[3, 2], -Z[2, 2] - Z[3, 3])
num_ldr = Z[0, 0] - Z[0, 1] + Z[1, 0] - Z[1, 1]
den_ldr = Z[0, 0] - Z[0, 1] - Z[1, 0] + Z[1, 1]
# LDR: noise adds to the cross-pol (num) and co-pol (den) in
# proportion to each channel's noise — same formula as sZ_dr.
ldr_num = pref * 2.0 * np.pi * num_ldr + noise_psd_v
ldr_den = pref * 2.0 * np.pi * den_ldr + noise_psd_h
sLDR[k] = ldr_num / ldr_den if ldr_den != 0 else np.nan
if sKdp is not None:
Sf = S_spec_f[:, :, k]
sKdp[k] = 1e-3 * (180.0 / np.pi) * wavelength * (Sf[1, 1] - Sf[0, 0]).real
return SpectralResult(
v=self.v_bins.copy(),
S_spec=S_spec,
Z_spec=Z_spec,
sZ_h=sZ_h,
sZ_v=sZ_v,
sZ_dr=sZ_dr,
sKdp=sKdp,
srho_hv=srho_hv,
sdelta_hv=sdelta_hv,
sLDR=sLDR,
wavelength=wavelength,
Kw_sqr=Kw_sqr,
noise_h=noise_h_total,
noise_v=noise_v_total,
_S_spec_forward=S_spec_f,
)
__all__ = [
"fall_speed",
"turbulence",
"beam",
"SpectralIntegrator",
"SpectralResult",
"NoTurbulence",
"GaussianTurbulence",
"InertialZeng2023",
"atlas_srivastava_sekhon_1973",
"brandes_et_al_2002",
"beard_1976",
"locatelli_hobbs_1974_aggregates",
"locatelli_hobbs_1974_graupel_hex",
"power_law",
"realistic_noise_floor",
"REALISTIC_NOISE_DBZ",
]