import json
import urllib.parse
import urllib.request
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Annotated
from typing import Any
from typing import Literal
from typing import overload
from typing import TypeVar
from urllib.error import HTTPError
import pandas as pd
from element.schemas import ApiReturn
from element.schemas import Device
from element.schemas import Folder
from element.schemas import Packet
from element.schemas import Reading
@dataclass
class _ValueRange:
    """Pair of numeric bounds attached to parameters via ``Annotated``.

    It is only used as annotation metadata (e.g. for ``limit`` and
    ``timeout``); the code shown in this module does not enforce the bounds
    at runtime.
    """
    # lower bound of the documented value range
    lo: float
    # upper bound of the documented value range
    hi: float
# Type variable used to parametrize ``ApiReturn`` in the return annotation of
# ``ElementApi._make_req``; the public methods narrow it to a concrete type.
T = TypeVar('T')
[docs]
class ElementApi:
"""
Class to interact with the Element API. The instance should be, if
possible, passed to functions so the internal cache can be utilized.
:param api_location: The location where the Element API is hosted
including the version e.g. ``https://dew21.element-iot.com/api/v1``
:param api_key: The API key as provided to you
"""
def __init__(self, api_location: str, api_key: str) -> None:
    """Store the connection settings and start with an empty id cache.

    :param api_location: Base URL of the API; leading and trailing slashes
        are removed before it is stored
    :param api_key: The API key that is sent with the requests
    """
    self.api_location, self.api_key = api_location.strip('/'), api_key
    # cache layout: one dict per folder that maps the decentlab id to the
    # device address, i.e. {'foldername': {decentlab_id: 'address'}, ...}
    self._id_to_address_mapping: dict[str, dict[int, str]] = defaultdict(
        dict,
    )
@property
def _address_to_id_mapping(self) -> dict[str, dict[str, int]]:
    """Inverted view of the cache: ``{folder: {address: decentlab_id}}``.

    The result is rebuilt from ``_id_to_address_mapping`` on every access,
    so it is a snapshot and not a live view of the cache.
    """
    inverted: dict[str, dict[str, int]] = {}
    for folder_slug, id_to_address in self._id_to_address_mapping.items():
        address_to_id: dict[str, int] = {}
        for known_id, known_address in id_to_address.items():
            address_to_id[known_address] = known_id
        inverted[folder_slug] = address_to_id
    return inverted
[docs]
def decentlab_id_from_address(
self,
address: str,
folder: str | None = None,
) -> int:
"""
Get the decentlab id in the format of e.g. ``21680`` from the
hexadecimal device address e.g. ``DEC0054B0``.
:param address: the address of the device in a hexadecimal format as
retrieved from the devices's mac-address e.g. ``DEC0054B0``
:param folder: The folder in the Element IoT system to query for this
this can be e.g. ``'stadt-dortmund-klimasensoren-inaktiv-sht35'``
if not specified, searching the cache may be slower
"""
# try to get the mapping from the cached values of the instance
# check all cache folders. The address is unique compared to the
# decentlab_id, that's why we don't need the folder, but it's faster
decentlab_id = None
if folder is not None:
folder_mapping = self._address_to_id_mapping.get(folder)
decentlab_id = folder_mapping.get(
address,
) if folder_mapping else None
else:
for folder in self._address_to_id_mapping:
folder_mapping = self._address_to_id_mapping[folder]
decentlab_id = folder_mapping.get(
address,
) if folder_mapping else None
# we don't know the id, try retrieving it from the API
if decentlab_id is None:
device = self.get_device(address)['body']
decentlab_id = int(
device['fields']['gerateinformation']['seriennummer'],
)
# we get the folder-slug via this API request so we can also
# populate the cache when no folder was specified
folder = device['tags'][0]['slug']
# we can also populate the cache this way
if folder is not None:
self._id_to_address_mapping[folder][decentlab_id] = address
return decentlab_id
[docs]
def address_from_decentlab_id(
        self,
        decentlab_id: int,
        folder: str,
) -> str:
    """
    Retrieve the address in the hexadecimal format e.g. ``DEC0054B0`` from
    the decentlab id e.g. ``21680``.

    The issue is, that the decentlab serial number/id is not part of the
    regular metadata in the IoT system. As far as we can see, you only get
    this when requesting actual data. Hence this may be really slow since
    in the worst case we have to go through all available stations.
    We try our best to cache this as part of the instance.

    :param decentlab_id: The decentlab serial nr/id in the format of e.g.
        ``21680``
    :param folder: The folder in the Element IoT system to query for this
        this can be e.g. ``'stadt-dortmund-klimasensoren-inaktiv-sht35'``

    :return: the address of the device with this id

    :raises ValueError: if no device in ``folder`` reports this id
    """
    # if we already have the mapping, simply return it without making any
    # requests to the API (we may not even have the folder cached)
    folder_mapping = self._id_to_address_mapping.get(folder)
    if folder_mapping and folder_mapping.get(decentlab_id):
        return folder_mapping[decentlab_id]

    # get all available devices in the folder to potentially check every
    # single one of them manually
    devices = self.get_devices(folder=folder)
    for device in devices['body']:
        curr_device_addr = device['name']
        # we can skip the addresses we have already resolved
        known = self._id_to_address_mapping.get(folder)
        if known and curr_device_addr in known.values():
            continue
        # now request some data from the device to get the device id
        resp = self.get_readings(
            device_name=curr_device_addr,
            limit=1,
            max_pages=1,
        )
        readings = resp['body']
        if not readings:
            # without a single reading the id of this device is unknown;
            # it must not abort the search through the remaining devices
            continue
        curr_decentlab_id = readings[0]['data']['device_id']
        self._id_to_address_mapping[folder][curr_decentlab_id] = curr_device_addr  # noqa: E501
        if curr_decentlab_id == decentlab_id:
            return curr_device_addr

    raise ValueError(
        f'unable to find address for '
        f'station: {decentlab_id!r}',
    )
[docs]
def _make_req(
        self,
        route: str,
        params: dict[str, str | None | int] | None = None,
        max_pages: int | None = None,
) -> ApiReturn[T]:
    """
    Send a GET request to ``route`` and return the decoded answer.

    Routes containing ``/stream`` are read line by line, every line being
    parsed as a JSON document. The parsed lines become the ``body`` of a
    return value that is assembled here with ``ok=True`` and
    ``status=200``. All other routes are decoded as a single JSON document
    and, as long as the answer contains a ``retrieve_after_id``, the next
    page is requested and its ``body`` appended.

    :param route: The route relative to ``api_location``
    :param params: Query parameters appended to the URL after the ``auth``
        parameter. The values are percent-encoded.
    :param max_pages: After how many pages we stop following the
        pagination. ``None`` (or ``0``) follows it until the end.

    :raises HTTPError: (code 408) if the last line of a streamed answer
        contains an ``error`` key
    :raises TypeError: if a paginated answer has a ``body`` that is not a
        list
    """
    # values need to be percent-encoded, otherwise e.g. the ``+`` of a UTC
    # offset in an ISO timestamp is decoded as a space on the other side.
    # ``:`` is kept as is, it is allowed in a query string.
    param_str = ''.join(
        f'&{k}={urllib.parse.quote(str(v), safe=":")}'
        for k, v in (params or {}).items()
    )
    req = f'{self.api_location}/{route}?&auth={self.api_key}{param_str}'

    output_data: ApiReturn[T]
    if '/stream' in route:
        body = []
        # the context manager closes the connection even if a line cannot
        # be parsed
        with urllib.request.urlopen(req, timeout=5) as ret:
            while chunk := ret.readline():
                body.append(json.loads(chunk))

        # after retrieval check if it was successful
        if body and 'error' in body[-1]:
            raise HTTPError(
                url=req,
                code=408,
                msg=body[-1]['error'],
                hdrs={},  # type: ignore[arg-type]
                fp=None,
            )
        output_data = {
            'body': body,  # type: ignore[typeddict-item]
            'ok': True,
            'status': 200,
        }
    else:
        with urllib.request.urlopen(req, timeout=5) as ret:
            output_data = json.load(ret)

    # check if the request is paginated
    retrieve_after_id = output_data.get('retrieve_after_id')
    pages_fetched = 1
    while retrieve_after_id:
        if max_pages and pages_fetched >= max_pages:
            break
        next_req = f'{req}&retrieve_after={retrieve_after_id}'
        with urllib.request.urlopen(next_req, timeout=5) as ret:
            data = json.load(ret)
        retrieve_after_id = data.get('retrieve_after_id')
        if isinstance(output_data['body'], list):
            output_data['body'].extend(data['body'])
        else:
            raise TypeError(
                'cannot handle pagination when `body` is not an array',
            )
        pages_fetched += 1
    return output_data
[docs]
def get_folders(self) -> ApiReturn[list[Folder]]:
    """Request the ``tags`` route and hand back the raw API return value.

    If only the slugs (names) of the folders are needed, use
    :meth:`get_folder_slugs` instead.
    """
    route = 'tags'
    return self._make_req(route)
[docs]
def get_folder_slugs(self) -> list[str]:
    """Collect the ``slug`` of every folder returned by :meth:`get_folders`.

    A slug can be e.g. ``'stadt-dortmund-klimasensoren-inaktiv-sht35'``
    """
    slugs = []
    for folder_entry in self.get_folders()['body']:
        slugs.append(folder_entry['slug'])
    return slugs
[docs]
def get_devices(self, folder: str) -> ApiReturn[list[Device]]:
    """Request the devices of ``folder`` and return the raw API return.

    :param folder: The folder(-slug) to get the devices from
    """
    route = '/'.join(('tags', folder, 'devices'))
    return self._make_req(route)
[docs]
def get_device_addresses(self, folder: str) -> list[str]:
    """Collect the ``name`` (the hexadecimal address e.g. ``DEC0054B0``) of
    every device returned by :meth:`get_devices` for the folder(-slug).

    :param folder: The folder(-slug) to get the devices from
    """
    addresses = []
    for device in self.get_devices(folder=folder)['body']:
        addresses.append(device['name'])
    return addresses
[docs]
def get_device(self, address: str) -> ApiReturn[Device]:
    """Request a single device; the address is lower-cased for the route.

    :param address: the address of the device in a hexadecimal format as
        retrieved from the device's mac-address e.g. ``DEC0054B0``. If
        only the ``decentlab_id`` is present, this may be retrieved using
        :meth:`address_from_decentlab_id`.
    """
    route = 'devices/' + address.lower()
    return self._make_req(route)
@overload
def get_readings(
        self,
        device_name: str,
        *,
        sort: Literal['measured_at', 'inserted_at'] = 'measured_at',
        sort_direction: Literal['asc', 'desc'] = 'asc',
        start: datetime | None = None,
        end: datetime | None = None,
        limit: Annotated[int, _ValueRange(1, 100)] = 100,
        max_pages: int | None = None,
        stream: bool = False,
        timeout: Annotated[int | None, _ValueRange(250, 180000)] = None,
        as_dataframe: Literal[True],
) -> pd.DataFrame:
    ...


@overload
def get_readings(
        self,
        device_name: str,
        *,
        sort: Literal['measured_at', 'inserted_at'] = 'measured_at',
        sort_direction: Literal['asc', 'desc'] = 'asc',
        start: datetime | None = None,
        end: datetime | None = None,
        limit: Annotated[int, _ValueRange(1, 100)] = 100,
        max_pages: int | None = None,
        stream: bool = False,
        timeout: Annotated[int | None, _ValueRange(250, 180000)] = None,
        as_dataframe: Literal[False] = False,
) -> ApiReturn[list[Reading]]:
    ...


def get_readings(
        self,
        device_name: str,
        *,
        sort: Literal['measured_at', 'inserted_at'] = 'measured_at',
        sort_direction: Literal['asc', 'desc'] = 'asc',
        start: datetime | None = None,
        end: datetime | None = None,
        limit: Annotated[int, _ValueRange(1, 100)] = 100,
        max_pages: int | None = None,
        stream: bool = False,
        timeout: Annotated[int | None, _ValueRange(250, 180000)] = None,
        as_dataframe: bool = False,
) -> ApiReturn[list[Reading]] | pd.DataFrame:
    """Get actual readings of one device from the API, either as the raw
    API-return-value or converted to a :class:`pandas.DataFrame`.

    :param device_name: The name of the device as the hexadecimal address
        e.g. ``DEC0054B0``. If only the ``decentlab_id`` is present, this
        may be retrieved using :meth:`address_from_decentlab_id`.
    :param sort: The field the values are sorted by, either
        ``measured_at`` or ``inserted_at``.
    :param sort_direction: ``asc`` for ascending or ``desc`` for
        descending order.
    :param start: Sent as ``after`` to the API. If ``None``, no lower time
        limit is sent.
    :param end: Sent as ``before`` to the API. If ``None``, no upper time
        limit is sent.
    :param limit: How many values to fetch per API request (must be between
        1 and 100). It is not sent when streaming.
    :param max_pages: After how many pages of pagination we stop, to avoid
        infinitely requesting data from the API.
    :param stream: Whether to use the ``stream`` route. This is useful for
        very large datasets. Use ``start`` and ``end`` to limit the data in
        this case.
    :param timeout: The timeout for the request in milliseconds that is
        sent to the server. This sometimes needs to be increased for very
        large datasets.
    :param as_dataframe: If ``True`` a :class:`pandas.DataFrame` indexed by
        ``measured_at`` is returned (an empty, un-indexed one if there is
        no data), otherwise the raw API return (which is the default).
    """
    streaming = stream is not False

    query: dict[str, Any] = {'sort': sort, 'sort_direction': sort_direction}
    route_parts = ['devices', 'by-name', device_name, 'readings']
    if streaming:
        route_parts.append('stream')
    else:
        # ``limit`` is only part of the non-streaming requests
        query['limit'] = limit
    if start:
        query['after'] = start.isoformat().replace('+00:00', 'Z')
    if end:
        query['before'] = end.isoformat().replace('+00:00', 'Z')
    if timeout is not None:
        query['timeout'] = timeout

    data: ApiReturn[list[Reading]] = self._make_req(
        '/'.join(route_parts),
        params=query,
        max_pages=max_pages,
    )
    if not as_dataframe:
        return data

    # every row consists of the timestamp plus the flattened ``data`` dict
    rows = [
        {'measured_at': reading['measured_at'], **reading['data']}
        for reading in data['body']
    ]
    df = pd.DataFrame(rows)
    if df.empty:
        print(f'no data for {device_name!r}')
        return df

    df['measured_at'] = pd.to_datetime(df['measured_at'], format='mixed')
    return df.set_index('measured_at')
[docs]
def get_packets(
        self,
        *,
        device_name: str | None = None,
        folder: str | None = None,
        packet_type: Literal['up', 'down'] | None = None,
        start: datetime | None = None,
        end: datetime | None = None,
        limit: Annotated[int, _ValueRange(1, 100)] = 100,
        stream: bool = False,
        timeout: Annotated[int | None, _ValueRange(250, 180000)] = None,
        max_pages: int | None = None,
) -> ApiReturn[list[Packet]]:
    """Get the original packets of either a single device or a whole folder
    as the raw API-return-value. The sorting is fixed to
    ``transceived_at``.

    :param device_name: The name of the device as the hexadecimal address
        e.g. ``DEC0054B0``. If only the ``decentlab_id`` is present, this
        may be retrieved using :meth:`address_from_decentlab_id`. This is
        mutually exclusive with ``folder``
    :param folder: The folder in the Element IoT system to query for this
        this can be e.g. ``'stadt-dortmund-klimasensoren-inaktiv-sht35'``.
        This is mutually exclusive with ``device_name``
    :param packet_type: Filter for packet_types (either ``up`` or ``down``)
        if ``None`` no filter is sent
    :param start: Sent as ``after`` to the API. If ``None``, no lower time
        limit is sent.
    :param end: Sent as ``before`` to the API. If ``None``, no upper time
        limit is sent.
    :param limit: How many values to fetch per API request (must be between
        1 and 100). It is not sent when streaming.
    :param stream: Whether to use the ``stream`` route. This is useful for
        very large datasets. Use ``start`` and ``end`` to limit the data in
        this case.
    :param timeout: The timeout for the request in milliseconds that is
        sent to the server. This sometimes needs to be increased for very
        large datasets. It must be at least 250 ms and at most 180000 ms.
    :param max_pages: After how many pages of pagination we stop, to avoid
        infinitely requesting data from the API.

    :raises TypeError: if neither or both of ``device_name`` and ``folder``
        are specified
    """
    has_device = device_name is not None
    has_folder = folder is not None
    if not has_device and not has_folder:
        raise TypeError(
            'one of device_name or folder needs to be specified',
        )
    if has_device and has_folder:
        raise TypeError(
            'only one of device_name or folder must be specified',
        )

    streaming = stream is True
    # ``limit`` is only part of the non-streaming requests
    query: dict[str, Any] = {} if streaming else {'limit': limit}
    if packet_type:
        query['packet_type'] = packet_type
    if start:
        query['after'] = start.isoformat().replace('+00:00', 'Z')
    if end:
        query['before'] = end.isoformat().replace('+00:00', 'Z')
    if timeout is not None:
        query['timeout'] = timeout

    # exactly one of the two is set at this point
    if has_device:
        segments = ['devices', 'by-name', device_name, 'packets']
    else:
        segments = ['tags', folder, 'packets']
    if streaming:
        segments.append('stream')

    packets: ApiReturn[list[Packet]] = self._make_req(
        route='/'.join(segments),
        params=query,
        max_pages=max_pages,
    )
    return packets
def __repr__(self) -> str:
    """Representation showing the location and the API key, where all but
    the last three characters of the key are replaced by ``*``.
    """
    cls_name = type(self).__name__
    key = self.api_key
    masked_key = '*' * (len(key) - 3) + key[-3:]
    return (
        f'{cls_name}(api_location={self.api_location!r}, '
        f'api_key={masked_key})'
    )
def __eq__(self, value: object) -> bool:
    """Two instances are equal if they share API key and API location.

    Objects that are not an instance of this instance's type never compare
    equal.
    """
    if not isinstance(value, type(self)):
        return False
    # we intentionally ignore the caches
    same_key = self.api_key == value.api_key
    return same_key and self.api_location == value.api_location