[docs]defasync_task(app:Celery,*args:Any,**kwargs:Any)->Task[Any,Any]:"""Decorator to convert an async function into a Celery task. :param app: The Celery app instance. :param args: Positional arguments to pass to the Celery task decorator. :param kwargs: Keyword arguments to pass to the Celery task decorator. :return: A Celery task decorator that wraps the async function. """# taken from: https://github.com/celery/celery/issues/6552def_decorator(func:Callable[P,Coroutine[Any,Any,R]])->Task[Any,Any]:# if we are running tests, we don't want this to be converted to a sync# functionif'PYTEST_VERSION'inos.environ:# pragma: no branch# give the function it's .s attribute for testingfunc.s=func# type: ignore[attr-defined]returnfunc# type: ignore[return-value]sync_call=sync.AsyncToSync(func)@app.task(*args,**kwargs)@wraps(func)def_decorated(*args:P.args,**kwargs:P.kwargs)->R:# pragma: no coverreturnsync_call(*args,**kwargs)return_decorated# TODO: remove this once we have the types figured out correctlyreturn_decorator# type: ignore[return-value]
celery_app=Celery('d2r-api',broker=os.environ['CELERY_BROKER_URL'],backend=os.environ['CELERY_BROKER_URL'],task_soft_time_limit=int(os.environ['QUEUE_SOFT_TIME_LIMIT']),broker_connection_retry_on_startup=True,include=['app.tasks','app.tc_ingester'],result_expires=timedelta(seconds=600),# expire after 10 minutes)celery_app.conf.timezone='UTC'celery_app.set_default()
[docs]@signals.celeryd_init.connectdefinit_sentry(**_kwargs:Any)->None:# pragma: no cover"""Initialize Sentry for Celery tasks."""sentry_sdk.init(dsn=os.environ.get('MONITOR_SENTRY_DSN'),integrations=[CeleryIntegration()],traces_sample_rate=float(os.environ.get('SENTRY_SAMPLE_RATE',0.0)),)