from __future__ import annotations
import argparse
import configparser
import json
import logging
import os
import time
import urllib.error
import urllib.request
from datetime import datetime
from datetime import timezone
from typing import NamedTuple
from typing import TypedDict
from vpf_730.utils import connect
logger = logging.getLogger(__name__)
[docs]
class SenderConfig(NamedTuple):
"""A class representing the configuration of sender.
:param local_db: path to the local sqlite database, where the measurements
are stored
:param send_interval: interval in minutes to send data to the endpoint
minimum 1, maximum 30 (every 30 minutes)
:param max_req_len: maximum number of measurements to send in one request
:param get_endpoint: http endpoint to get the status from (latest data)
:param post_endpoint: http endpoint where the data should be posted to
"""
local_db: str
send_interval: int
get_endpoint: str
post_endpoint: str
max_req_len: int
api_key: str
[docs]
@classmethod
def from_env(cls) -> SenderConfig:
"""Constructs a new :func:`LoggerConfig` from environment variables.
* ``VPF730_LOCAL_DB`` - path to the sqlite database which is used to store data locally
* ``VPF730_SEND_INTERVAL`` - interval in minutes to send data to the endpoint
* ``VPF730_GET_ENDPOINT`` - http endpoint to get the status from (latest data)
* ``VPF730_POST_ENDPOINT`` - http endpoint where the data should be posted to
* ``VPF730_API_KEY`` - the API-key used to authenticate when sending the requests in
:return: a new instance of :func:`SenderConfig` created from
environment variables.
""" # noqa: E501
return cls(
local_db=os.environ['VPF730_LOCAL_DB'],
send_interval=int(os.environ['VPF730_SEND_INTERVAL']),
max_req_len=int(os.environ['VPF730_MAX_REQ_LEN']),
get_endpoint=os.environ['VPF730_GET_ENDPOINT'],
post_endpoint=os.environ['VPF730_POST_ENDPOINT'],
api_key=os.environ['VPF730_API_KEY'],
)
[docs]
@classmethod
def from_file(cls, path: str) -> SenderConfig:
"""Constructs a new :func:`SenderConfig` from a provided ``.ini``
config file with this format:
.. code-block:: ini
[vpf_730]
local_db=local.db
send_interval=5
get_endpoint=https://api.example/com/vpf-730/status
post_endpoint=https://api.example/com/vpf-730/data
max_req_len=512
api_key=deadbeef
:param path: path to the ``.ini`` config file with the structure above
:return: a new instance of :func:`LoggerConfig` created from a config
file
"""
config = configparser.ConfigParser()
config.read(path)
return cls(
config['vpf_730']['local_db'],
int(config['vpf_730']['send_interval']),
config['vpf_730']['get_endpoint'],
config['vpf_730']['post_endpoint'],
int(config['vpf_730']['max_req_len']),
config['vpf_730']['api_key'],
)
[docs]
@classmethod
def from_argparse(cls, args: argparse.Namespace) -> SenderConfig:
"""Constructs a new :func:`SenderConfig` from a
:func:`argparse.Namespace`, created by the argument parser returned by
:func:`vpf_730.main.build_parser`.
:param args: arguments returned from the argument parser created by
:func:`vpf_730.main.build_parser`
:return: a new instance of :func:`SenderConfig` created from CLI
arguments
"""
return cls(
local_db=args.local_db,
send_interval=args.send_interval,
get_endpoint=args.get_endpoint,
post_endpoint=args.post_endpoint,
max_req_len=args.max_req_len,
api_key=os.environ['VPF730_API_KEY'],
)
def __repr__(self) -> str:
return (
f'{type(self).__name__}('
f'local_db={self.local_db!r}, '
f'send_interval={self.send_interval!r}, '
f'get_endpoint={self.get_endpoint!r}, '
f'post_endpoint={self.post_endpoint!r}, '
f'max_req_len={self.max_req_len!r}, '
f'api_key=***)'
)
class MeasurementDict(TypedDict):
timestamp: int
sensor_id: int
last_measurement_period: int
time_since_report: int
optical_range: float
precipitation_type_msg: str
obstruction_to_vision: str
receiver_bg_illumination: float
water_in_precip: float
temp: float
nr_precip_particles: int
transmission_eq: float
exco_less_precip_particle: float
backscatter_exco: float
self_test: str
total_exco: float
class Sender:
def __init__(self, cfg: SenderConfig) -> None:
self.cfg = cfg
self.sending = True
@property
def _sending(self) -> bool:
# this is a hack for being able to test this
return self.sending # pragma: no cover
def run(self) -> None:
prev_minute = -1
while self._sending is True:
time.sleep(.1)
now = datetime.now(timezone.utc)
# don't accidentally log the same timestamp twice
if (
now.minute % self.cfg.send_interval == 0 and
now.second == 0 and
now.minute != prev_minute
):
last_date = self.get_remote_timestamp()
data = self.get_data_from_db(start=last_date)
if not data:
continue
# we don't exceed the max_req_len and can send the data as one
if len(data) <= self.cfg.max_req_len:
self.post_data_to_remote(data=data)
continue
counter = 0
post_stack: list[MeasurementDict] = []
for row in data:
post_stack.append(row)
if counter == self.cfg.max_req_len - 1:
self.post_data_to_remote(data=post_stack)
post_stack = []
counter = 0
else:
counter += 1
# if we have data left which does not fill a max size request
else:
if post_stack:
self.post_data_to_remote(data=post_stack)
def get_remote_timestamp(self) -> int:
status_req = urllib.request.Request(
url=self.cfg.get_endpoint,
headers={
'Authorization': self.cfg.api_key,
'Content-type': 'application/json',
},
)
try:
status_resp = urllib.request.urlopen(status_req)
status_resp_str = status_resp.read().decode()
return json.loads(status_resp_str)['latest_date']
except urllib.error.HTTPError as e:
msg = json.loads(e.read().decode())
logger.exception('http error getting latest date: %s', msg)
raise
def post_data_to_remote(self, data: list[MeasurementDict]) -> None:
post_data = json.dumps({'data': data}).encode()
req = urllib.request.Request(
url=self.cfg.post_endpoint,
data=post_data,
headers={
'Authorization': self.cfg.api_key,
'Content-type': 'application/json',
},
)
try:
urllib.request.urlopen(req)
except urllib.error.HTTPError as e:
msg = json.loads(e.read().decode())
logger.exception('http error sending date: %s', msg)
raise
def get_data_from_db(self, start: int) -> list[MeasurementDict]:
"""Get data from the db starting after ``start``
:param start: unix timestamp (UTC) after which to get data
:return: data
"""
with connect(self.cfg.local_db) as db:
query = '''\
SELECT
timestamp,
sensor_id,
last_measurement_period,
time_since_report,
optical_range,
precipitation_type_msg,
obstruction_to_vision,
receiver_bg_illumination,
water_in_precip,
temp,
nr_precip_particles,
transmission_eq,
exco_less_precip_particle,
backscatter_exco,
self_test,
total_exco
FROM measurements
WHERE timestamp > ?
ORDER BY timestamp
'''
ret = db.execute(query, (start,))
val = ret.fetchall()
# https://github.com/python/mypy/issues/8890
md = MeasurementDict
return [md(i) for i in val] # type: ignore[call-arg, misc]