Source code for app.celery

import os
from collections.abc import Callable
from collections.abc import Coroutine
from datetime import timedelta
from functools import wraps
from typing import Any
from typing import ParamSpec
from typing import TypeVar

import sentry_sdk
from asgiref import sync
from celery import Celery
from celery import signals
from celery import Task
from sentry_sdk.integrations.celery import CeleryIntegration

P = ParamSpec('P')
R = TypeVar('R')

# https://github.com/sbdchd/celery-types/issues/80
Task.__class_getitem__ = classmethod(  # type: ignore [attr-defined]
    lambda cls, *args, **kwargs: cls,
)


[docs] def async_task(app: Celery, *args: Any, **kwargs: Any) -> Task[Any, Any]: """Decorator to convert an async function into a Celery task. :param app: The Celery app instance. :param args: Positional arguments to pass to the Celery task decorator. :param kwargs: Keyword arguments to pass to the Celery task decorator. :return: A Celery task decorator that wraps the async function. """ # taken from: https://github.com/celery/celery/issues/6552 def _decorator(func: Callable[P, Coroutine[Any, Any, R]]) -> Task[Any, Any]: # if we are running tests, we don't want this to be converted to a sync # function if 'PYTEST_VERSION' in os.environ: # pragma: no branch # give the function it's .s attribute for testing func.s = func # type: ignore[attr-defined] return func # type: ignore[return-value] sync_call = sync.AsyncToSync(func) @app.task(*args, **kwargs) @wraps(func) def _decorated(*args: P.args, **kwargs: P.kwargs) -> R: # pragma: no cover return sync_call(*args, **kwargs) return _decorated # TODO: remove this once we have the types figured out correctly return _decorator # type: ignore[return-value]
celery_app = Celery( 'd2r-api', broker=os.environ['CELERY_BROKER_URL'], backend=os.environ['CELERY_BROKER_URL'], task_soft_time_limit=int(os.environ['QUEUE_SOFT_TIME_LIMIT']), broker_connection_retry_on_startup=True, include=['app.tasks', 'app.tc_ingester'], result_expires=timedelta(seconds=600), # expire after 10 minutes ) celery_app.conf.timezone = 'UTC' celery_app.set_default()
[docs] @signals.celeryd_init.connect def init_sentry(**_kwargs: Any) -> None: # pragma: no cover """Initialize Sentry for Celery tasks.""" sentry_sdk.init( dsn=os.environ.get('MONITOR_SENTRY_DSN'), integrations=[CeleryIntegration()], traces_sample_rate=float(os.environ.get('SENTRY_SAMPLE_RATE', 0.0)), )
if __name__ == '__main__': celery_app.start()