Source code for app.qc

from collections.abc import Callable
from collections.abc import Sequence
from datetime import timedelta
from functools import partial
from typing import Any
from typing import TypedDict

import numpy as np
import numpy.typing as npt
import pandas as pd
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncConnection
from titanlib import buddy_check
from titanlib import isolation_check
from titanlib import Points

from app.database import sessionmanager
from app.models import Station
from app.routers.v1 import TABLE_MAPPING


[docs] async def range_check( s: 'pd.Series[float]', *, lower_bound: float, upper_bound: float, **kwargs: dict[str, Any], ) -> 'pd.Series[bool]': """Check if the values in the series are within the specified range. :param s: The pandas Series to check, which must have a :class:`pd.DateTimeIndex`. :param lower_bound: The lower bound of the range. :param upper_bound: The upper bound of the range. :return: A boolean Series indicating whether each value is within the range. """ return ~((s >= lower_bound) & (s <= upper_bound))
[docs] async def persistence_check( s: 'pd.Series[float]', *, window: timedelta, excludes: Sequence[float] = [], station: Station, con: AsyncConnection, **kwargs: dict[str, Any], ) -> 'pd.Series[bool]': """Check if the values in the series are persistent. For this we need to get more data from the database so that we can check if the values are the same for ``window`` minutes. :param s: The pandas Series to check, which must have a :class:`pd.DateTimeIndex`. :return: A boolean Series indicating whether each value is persistent. """ # get some additional data from the database so we can perform the check on # enough data to cover at least one window min_data_date = s.index.min() additional_data_start = min_data_date - window table = TABLE_MAPPING[station.station_type]['max']['table'] query = ( select(table).where( table.station_id == station.station_id, table.measured_at >= additional_data_start, table.measured_at < min_data_date, ) ) db_data = await con.run_sync( lambda con: pd.read_sql(sql=query, con=con, index_col=['measured_at']), ) # now find values that are the same if not db_data.empty: all_data = pd.concat([db_data[s.name], s]).sort_index() else: all_data = s.sort_index() # we shift the data by one so we get the minimum of consecutive values changes = all_data != all_data.shift() all_data = all_data.to_frame() all_data['changes'] = changes # now we create groups where persistent values are the same all_data['groups'] = changes.cumsum() # finally per group calculate the duration of the persistent values duration: 'pd.Series[timedelta]' = all_data.groupby( 'groups', ).apply(_t_delta, include_groups=False) duration.name = 'duration' all_data = all_data.merge(duration, left_on='groups', right_index=True) all_data['flags'] = ( all_data['duration'] >= window ) & ~all_data[s.name].isin(excludes) # TODO: there might be inconsistencies if we flag the first value of a persistent # series, or not. If the first value comes from the old data we fetched, we cannot # flag it, only the following values. For a series of persistent values this means # that the first one won't be flagged, but the rest will be. # we must only return the flags for the data we got passed in the first place return all_data['flags'].loc[s.index]
def _t_delta(x: 'pd.Series[float]') -> timedelta: """Calculate the time difference between the first and last value in a series.""" max_date = x.index.max() min_date = x.index.min() return max_date - min_date
[docs] async def spike_dip_check( s: 'pd.Series[float]', *, delta: float, station: Station, con: AsyncConnection, **kwargs: dict[str, Any], ) -> 'pd.Series[bool]': """check if there are spikes or dips in the data. For this we need to get more data from the database so that we can check if the values spike more than ``delta``. :param s: The pandas Series to check, which must have a :class:`pd.DateTimeIndex`. :param delta: The threshold for the spike/dip check per minute. :param station: The station to check. :param con: The database connection to use. :return: A boolean Series indicating whether each value is a spike or dip. """ # we have to get the previous value from the database to check if the value is # a spike or dip table = TABLE_MAPPING[station.station_type]['max']['table'] query = ( select(table).where( table.station_id == station.station_id, table.measured_at < s.index.min(), ).order_by(table.measured_at.desc()).limit(1) ) db_data = await con.run_sync( lambda con: pd.read_sql(sql=query, con=con, index_col=['measured_at']), ) # in case this is the very first time the qc runs if not db_data.empty: all_data = pd.concat([db_data[s.name], s]).sort_index() else: all_data = s.sort_index() all_data = all_data.to_frame() # now find the value difference between the current and the previous value all_data['shifted'] = all_data.shift() df = all_data.reset_index() # there are jumps (a value suddenly jumps but remains at the level) and spikes # (a single values jumps for a single time step). for jumps only the first values # of the jumps is marked and for spikes the first spiked values and the flowed value # is marked since it's a dip after a spike. df['time_diff'] = abs(df['measured_at'] - df['measured_at'].shift()) df['time_diff'] = df['time_diff'].dt.total_seconds() / 60 all_data = df.set_index('measured_at') # normalize the difference by the time that passed between the two values all_data['value_diff'] = ( abs(all_data[s.name] - all_data['shifted']) / all_data['time_diff'] ) all_data['flags'] = all_data['value_diff'] > delta return all_data['flags'].loc[s.index]
[docs] class BuddyCheckConfig(TypedDict): """Configuration for the buddy check and isolation check implemented via :func:`titanlib.buddy_check` and :func:`titanlib.isolation_check`. """ callable: Callable[..., npt.NDArray[np.integer]] radius: float num_min: int threshold: float max_elev_diff: float elev_gradient: float min_std: float num_iterations: int
[docs] async def apply_buddy_check( data: pd.DataFrame, config: dict[str, BuddyCheckConfig], ) -> pd.DataFrame: """Apply the buddy check to the data for the given time period. :param data: The data to apply the buddy check to. It must have a :return: A DataFrame with the buddy check results as flags. """ data = data.sort_values('measured_at') # create a new regular 5-minute index for the data data['measured_at_rounded'] = data['measured_at'].dt.round('5min') # sometimes the above creates duplicates when we have measurements that are # more often than every 5 minutes, so we need to drop them. We keep the last data = data.drop_duplicates( subset=['measured_at_rounded', 'station_id'], keep='last', ).set_index(['measured_at_rounded', 'station_id']) dfs: list[pd.DataFrame] = [] # step through the time steps for d in data.index.get_level_values(0).unique(): df_current: pd.DataFrame = data.loc[d].copy() # prepare an initial Points object for the isolation check longitude = df_current['longitude'].to_numpy() latitude = df_current['latitude'].to_numpy() altitude = df_current['altitude'].to_numpy() points = Points(longitude, latitude, altitude) # step through the parameters we have a config for for param in config: param_config = config[param] # detect isolated stations isolation_flags = isolation_check( points, param_config['num_min'], param_config['radius'], ) isolated_col = f'{param}_qc_isolated_check' df_current.loc[:, isolated_col] = isolation_flags.astype(bool) # select only stations that are not isolated db_data_non_isolated = df_current.loc[ df_current[isolated_col] == False, # noqa: E712 param, ] non_iso_mask = ~df_current[isolated_col].to_numpy() # we need to recreate the points for only the non-isolated stations points_non_isolated = Points( longitude[non_iso_mask], latitude[non_iso_mask], altitude[non_iso_mask], ) # get the correct parameter configuration and we can only qc stations # that are not isolated size = points_non_isolated.size() flags = buddy_check( points_non_isolated, db_data_non_isolated.to_numpy(), np.full(size, param_config['radius']), np.full(size, param_config['num_min']), param_config['threshold'], param_config['max_elev_diff'], param_config['elev_gradient'], param_config['min_std'], param_config['num_iterations'], ) # store the flags in the DataFrame df_current.loc[ db_data_non_isolated.index, f'{param}_qc_buddy_check', ] = flags.astype(bool) # we need to replace the NaN values with None for the database df_current[f'{param}_qc_buddy_check'] = df_current[ f'{param}_qc_buddy_check' ].replace({np.nan: None}) # TODO: if the value was nan, it is also flagged as True dfs.append(df_current) data = pd.concat(dfs) data = data.reset_index().set_index(['measured_at', 'station_id']) return data.filter(like='_check')
BUDDY_CHECK_COLUMNS: dict[str, BuddyCheckConfig] = { # TODO: all these values need calibration and adjustment 'air_temperature': { 'callable': buddy_check, 'radius': 5500, 'num_min': 3, 'threshold': 2.7, 'max_elev_diff': 100, 'elev_gradient': -0.0065, 'min_std': 2, 'num_iterations': 5, }, 'relative_humidity': { 'callable': buddy_check, 'radius': 6000, 'num_min': 3, 'threshold': 7, 'max_elev_diff': -1, # do not check elevation difference 'elev_gradient': 0, 'min_std': 3, 'num_iterations': 5, }, 'atmospheric_pressure': { 'callable': buddy_check, 'radius': 10000, 'num_min': 3, 'threshold': 3, 'max_elev_diff': 100, 'elev_gradient': 0.125, # lapse rate at sea level 'min_std': 1.5, 'num_iterations': 5, }, # TODO: at five minutes precipitation will likely give a lot of false positives and # the qc will be useless } # The values are based on the QC-procedure used at RUB which was derived and adapted # from the Guide to the Global Observing System by WMO # https://library.wmo.int/viewer/35699/download?file=488-2017_en.pdf&type=pdf # Plausible Value Range: The measured value has to be in this range # Plausible Rate of Change: The measured value must not change more than this # per minute. # Minimum required Variability: The measured value must change after this time # (excluding some values e.g. 0 for precipitation or 0 for solar radiation during night) COLUMNS = { 'air_temperature': [ partial(range_check, lower_bound=-40, upper_bound=50), partial(persistence_check, window=timedelta(hours=3)), partial(spike_dip_check, delta=0.3), ], 'relative_humidity': [ partial(range_check, lower_bound=10, upper_bound=100), partial(persistence_check, window=timedelta(hours=5)), partial(spike_dip_check, delta=4), ], 'atmospheric_pressure': [ partial(range_check, lower_bound=860, upper_bound=1055), partial(persistence_check, window=timedelta(hours=6)), partial(spike_dip_check, delta=0.3), ], 'wind_speed': [ partial(range_check, lower_bound=0, upper_bound=30), partial(persistence_check, window=timedelta(hours=5)), partial(spike_dip_check, delta=20), ], 'u_wind': [ partial(range_check, lower_bound=-30, upper_bound=30), partial(persistence_check, window=timedelta(hours=5)), partial(spike_dip_check, delta=20), ], 'v_wind': [ partial(range_check, lower_bound=-30, upper_bound=30), partial(persistence_check, window=timedelta(hours=5)), partial(spike_dip_check, delta=20), ], 'maximum_wind_speed': [ partial(range_check, lower_bound=0, upper_bound=30), partial(persistence_check, window=timedelta(hours=1)), ], 'wind_direction': [ partial(range_check, lower_bound=0, upper_bound=360), partial(persistence_check, window=timedelta(hours=1), excludes=[0, 360]), ], 'precipitation_sum': [ partial(range_check, lower_bound=0, upper_bound=50), partial(persistence_check, window=timedelta(hours=2), excludes=[0]), partial(spike_dip_check, delta=20), ], 'solar_radiation': [ partial(range_check, lower_bound=0, upper_bound=1400), # TODO: this is dependent on the time of the day and somehow needs handling partial(persistence_check, window=timedelta(hours=3), excludes=[0, 1, 2, 3]), partial(spike_dip_check, delta=800), ], 'lightning_average_distance': [ partial(range_check, lower_bound=0, upper_bound=40), partial(persistence_check, window=timedelta(hours=1), excludes=[0]), ], 'lightning_strike_count': [ partial(range_check, lower_bound=0, upper_bound=65535), partial(persistence_check, window=timedelta(hours=1), excludes=[0]), ], 'x_orientation_angle': [ partial(range_check, lower_bound=-3, upper_bound=3), partial(spike_dip_check, delta=1), ], 'y_orientation_angle': [ partial(range_check, lower_bound=-3, upper_bound=3), partial(spike_dip_check, delta=1), ], 'black_globe_temperature': [ partial(range_check, lower_bound=-40, upper_bound=90), partial(persistence_check, window=timedelta(hours=3)), partial(spike_dip_check, delta=40), ], } # failing range checks are severe # persistence checks are less severe # spike/dip checks are even less severe QC_SCORE_WEIGHTS = pd.Series({ 'air_temperature_qc_range_check': 100, 'air_temperature_qc_persistence_check': 50, 'air_temperature_qc_spike_dip_check': 30, 'relative_humidity_qc_range_check': 100, 'relative_humidity_qc_persistence_check': 50, 'relative_humidity_qc_spike_dip_check': 30, 'atmospheric_pressure_qc_range_check': 100, 'atmospheric_pressure_qc_persistence_check': 50, 'atmospheric_pressure_qc_spike_dip_check': 30, 'wind_speed_qc_range_check': 100, 'wind_speed_qc_persistence_check': 50, 'wind_speed_qc_spike_dip_check': 30, 'wind_direction_qc_range_check': 100, 'wind_direction_qc_persistence_check': 50, 'u_wind_qc_range_check': 100, 'u_wind_qc_persistence_check': 50, 'u_wind_qc_spike_dip_check': 30, 'v_wind_qc_range_check': 100, 'v_wind_qc_persistence_check': 50, 'v_wind_qc_spike_dip_check': 30, 'maximum_wind_speed_qc_range_check': 100, 'maximum_wind_speed_qc_persistence_check': 50, 'precipitation_sum_qc_range_check': 100, 'precipitation_sum_qc_persistence_check': 50, 'precipitation_sum_qc_spike_dip_check': 30, 'solar_radiation_qc_range_check': 100, 'solar_radiation_qc_persistence_check': 50, 'solar_radiation_qc_spike_dip_check': 30, 'lightning_average_distance_qc_range_check': 100, 'lightning_average_distance_qc_persistence_check': 50, 'lightning_strike_count_qc_range_check': 100, 'lightning_strike_count_qc_persistence_check': 50, # a slightly tilted sensors is less of a problem 'x_orientation_angle_qc_range_check': 50, 'x_orientation_angle_qc_spike_dip_check': 20, 'y_orientation_angle_qc_range_check': 50, 'y_orientation_angle_qc_spike_dip_check': 20, # buddy checks are generally considered less severe # isolated checks are not severe at all 'air_temperature_qc_isolated_check': 0, 'air_temperature_qc_buddy_check': 20, 'relative_humidity_qc_isolated_check': 0, 'relative_humidity_qc_buddy_check': 20, 'atmospheric_pressure_qc_isolated_check': 0, 'atmospheric_pressure_qc_buddy_check': 20, }) def _score_qc(x: 'pd.Series[float]') -> float: # check what flags are relevant to the current selection relevant_flags = QC_SCORE_WEIGHTS.index.intersection(x.index) # only select the relevant flags weights = QC_SCORE_WEIGHTS[relevant_flags] flags = x[relevant_flags] # we can only consider valid flags, so we filter out NaN values valid = flags.notna() & weights.notna() weights = weights[valid] flags = flags[valid] if weights.empty or flags.empty or weights.sum() == 0: return float('nan') score = (weights * ~(flags).astype(bool)).sum() / weights.sum() return score
[docs] async def calculate_qc_score(data: pd.DataFrame) -> 'pd.Series[float]': """Calculate the quality control score for the data. :param data: The data to calculate the quality control score for. :return: A Series with the quality control score for each row. """ return data.apply(_score_qc, axis=1)
[docs] async def apply_qc(data: pd.DataFrame, station_id: str) -> pd.DataFrame: """Apply the quality control to the data for a given station and time period. This function applies various quality control checks to the data, such as :func:`range_check`, :func:`persistence_check`, and :func:`spike_dip_check`. :param data: The data to apply quality control to. :param station: The station to apply quality control for. :return: The data with the quality control applied as flags. """ data = data.sort_index() async with sessionmanager.session() as sess: station = ( await sess.execute(select(Station).where(Station.station_id == station_id)) ).scalar_one() con = await sess.connection() for column in data.columns: qc_functions = COLUMNS.get(column) if qc_functions: for qc_function in qc_functions: res = await qc_function(s=data[column], station=station, con=con) data[f'{column}_qc_{qc_function.func.__name__}'] = res return data