from celery import Task
from celery.utils.log import get_task_logger

from utils.logging.local_storage import get_current_trace_id, set_current_trace_id

logger = get_task_logger(__name__)


class VerboseTask(Task):
    """Abstract Celery task base that logs task start, success and failure.

    ``before_start`` additionally treats the last positional argument of the
    task as a trace id: it removes that value from ``args`` and hands it to
    ``set_current_trace_id``.
    """

    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the exception raised by a failed task at ERROR level.

        Only ``exc`` and ``task_id`` are used; ``args``, ``kwargs`` and
        ``einfo`` are accepted to match the handler signature.
        """
        logger.error(
            "FAILURE: Task: %s - %s | Task raised an exception: %s",
            self.name,
            task_id,
            exc,
        )

    def on_success(self, retval, task_id, args, kwargs):
        """Log the return value and call arguments of a successful task."""
        logger.info(
            "SUCCESS: Task: %s - %s | retval: %s | args: %s | kwargs: %s",
            self.name,
            task_id,
            retval,
            args,
            kwargs,
        )

    def before_start(self, task_id, args, kwargs):
        """Extract the trailing trace id from ``args`` and log the task start.

        The last element of ``args`` is removed in place and passed to
        ``set_current_trace_id``. When ``args`` is empty there is nothing to
        extract, so a warning is logged and the trace id is left untouched
        instead of raising ``IndexError``.

        Raises:
            AttributeError: if ``args`` is non-empty but has no ``pop``
                method (e.g. a tuple).
        """
        if args:
            # In-place removal is deliberate: presumably the caller appends
            # the trace id as an extra positional argument and the task body
            # must not receive it -- this relies on ``args`` being the same
            # mutable list that is later used to call the task; TODO confirm.
            set_current_trace_id(args.pop())
        else:
            # NOTE(review): any trace id set earlier is left as-is here;
            # confirm whether it should be cleared instead.
            logger.warning(
                "BEFORE_START: Task: %s - %s | no positional args, trace id not set",
                self.name,
                task_id,
            )
        logger.info(
            "BEFORE_START: Task: %s - %s | args: %s | kwargs: %s",
            self.name,
            task_id,
            args,
            kwargs,
        )