From d56aab422737b4d4f22f5291677de52afb7ea31a Mon Sep 17 00:00:00 2001 From: TannedCung Date: Mon, 23 Dec 2024 11:12:03 +0700 Subject: [PATCH 1/2] Add: Bg task for sending failed response --- cope2n-api/fwd/settings.py | 7 ++++- .../fwd_api/celery_worker/internal_task.py | 26 ++++++++++++++++++- cope2n-api/fwd_api/celery_worker/worker.py | 4 ++- .../fwd_api/middleware/response_monitor.py | 14 ++++++++++ 4 files changed, 48 insertions(+), 3 deletions(-) create mode 100644 cope2n-api/fwd_api/middleware/response_monitor.py diff --git a/cope2n-api/fwd/settings.py b/cope2n-api/fwd/settings.py index 8f05d50..097a31c 100755 --- a/cope2n-api/fwd/settings.py +++ b/cope2n-api/fwd/settings.py @@ -14,6 +14,7 @@ from pathlib import Path import environ from django.urls import reverse_lazy from fwd_api.middleware.logging_request_response_middleware import TraceIDLogFilter +from fwd_api.middleware.response_monitor import ResponseMonitorMiddleware # Build paths inside the project like this: BASE_DIR / 'subdir'. @@ -49,6 +50,9 @@ S3_SECRET_KEY = env.str("S3_SECRET_KEY", "") S3_BUCKET_NAME = env.str("S3_BUCKET_NAME", "ocr-data") REDIS_HOST = env.str("REDIS_HOST", "result-cache") REDIS_PORT = env.int("REDIS_PORT", 6379) +AWS_REGION = env.str("AWS_REGION", "") +AWS_QUEUE_URL = env.str("AWS_QUEUE_URL", "") + INSTALLED_APPS = [ @@ -77,7 +81,8 @@ MIDDLEWARE = [ 'corsheaders.middleware.CorsMiddleware', "whitenoise.middleware.WhiteNoiseMiddleware", "django.middleware.locale.LocaleMiddleware", - "fwd_api.middleware.logging_request_response_middleware.LoggingMiddleware" + "fwd_api.middleware.logging_request_response_middleware.LoggingMiddleware", + 'fwd_api.middleware.response_monitor.ResponseMonitorMiddleware', ] LOCALE_PATHS = [ diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py index 4ef99ba..84678fe 100755 --- a/cope2n-api/fwd_api/celery_worker/internal_task.py +++ b/cope2n-api/fwd_api/celery_worker/internal_task.py @@ -289,4 +289,28 @@ def upload_obj_to_s3(byte_obj, s3_key): obj = base64.b64decode(byte_obj) res = s3_client.update_object(s3_key, obj) else: - logger.info(f"S3 is not available, skipping,...") \ No newline at end of file + logger.info(f"S3 is not available, skipping,...") + +@app.task(base=VerboseTask, name='send_response_to_sqs') +def send_response_to_sqs(response_data, status_code): + """Send error responses to SQS for monitoring""" + import boto3 + import json + import datetime + + try: + sqs_client = boto3.client('sqs', region_name=settings.AWS_REGION) # keys are stored in the cridental + + message_body = { + "status_code": status_code, + "timestamp": int(datetime.datetime.now().timestamp()), + "message": response_data + } + + sqs_client.send_message( + QueueUrl=settings.AWS_QUEUE_URL, + MessageBody=json.dumps(message_body) + ) + logger.info(f"Error response sent to SQS: {status_code}") + except Exception as e: + logger.error(f"Failed to send to SQS: {str(e)}") \ No newline at end of file diff --git a/cope2n-api/fwd_api/celery_worker/worker.py b/cope2n-api/fwd_api/celery_worker/worker.py index 816a073..35b327f 100755 --- a/cope2n-api/fwd_api/celery_worker/worker.py +++ b/cope2n-api/fwd_api/celery_worker/worker.py @@ -47,6 +47,7 @@ app.conf.update({ Queue('csv_feedback'), Queue('report'), Queue('report_2'), + Queue('error_responses'), ], 'task_routes': { 'process_sap_invoice_result': {'queue': 'invoice_sap_rs'}, @@ -65,7 +66,8 @@ app.conf.update({ 'remove_local_file': {'queue': "remove_local_file"}, 'csv_feedback': {'queue': "csv_feedback"}, 'make_a_report': {'queue': "report"}, - 'make_a_report_2': {'queue': "report_2"}, + 'make_a_report_2': {'queue': "report_2"}, + 'send_response_to_sqs': {'queue': 'error_responses'}, } }) diff --git a/cope2n-api/fwd_api/middleware/response_monitor.py b/cope2n-api/fwd_api/middleware/response_monitor.py new file mode 100644 index 0000000..9bbcf8a --- /dev/null +++ b/cope2n-api/fwd_api/middleware/response_monitor.py @@ -0,0 +1,14 @@ +from functools import partial +from django.utils.deprecation import MiddlewareMixin +from fwd_api.celery_worker.internal_task import send_response_to_sqs + +class ResponseMonitorMiddleware(MiddlewareMixin): + def process_response(self, request, response): + """Monitor responses and send errors to SQS""" + if response and (400 <= response.status_code < 600): + # Send async to avoid blocking response + send_response_to_sqs.delay( + response.data if hasattr(response, 'data') else str(response.content), + response.status_code + ) + return response \ No newline at end of file From c3f5fde43eb611863ba94f0d063ab4e6f35d38d1 Mon Sep 17 00:00:00 2001 From: TannedCung Date: Mon, 23 Dec 2024 13:02:35 +0700 Subject: [PATCH 2/2] Fix: share client name --- cope2n-api/fwd_api/celery_worker/internal_task.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py index 84678fe..fc9491a 100755 --- a/cope2n-api/fwd_api/celery_worker/internal_task.py +++ b/cope2n-api/fwd_api/celery_worker/internal_task.py @@ -20,6 +20,8 @@ from fwd_api.middleware.local_storage import get_current_trace_id import csv import json import copy +import boto3 +import datetime from fwd_api.utils.accuracy import predict_result_to_ready from celery.utils.log import get_task_logger @@ -33,6 +35,8 @@ s3_client = S3Util.MinioS3Client( secret_key=settings.S3_SECRET_KEY, bucket_name=settings.S3_BUCKET_NAME ) +sqs_client = boto3.client('sqs'+ str(uuid.uuid4()), region_name=settings.AWS_REGION) # keys are stored in the cridental + def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: str, index_in_request: int) -> list: try: @@ -294,13 +298,8 @@ def upload_obj_to_s3(byte_obj, s3_key): @app.task(base=VerboseTask, name='send_response_to_sqs') def send_response_to_sqs(response_data, status_code): """Send error responses to SQS for monitoring""" - import boto3 - import json - import datetime - try: - sqs_client = boto3.client('sqs', region_name=settings.AWS_REGION) # keys are stored in the cridental - + try: message_body = { "status_code": status_code, "timestamp": int(datetime.datetime.now().timestamp()),