Merge pull request #169 from SDSRV-IDP/dev/status_code_to_SQS

Dev/status code to sqs
This commit is contained in:
Đỗ Xuân Tân 2024-12-25 14:50:47 +07:00 committed by GitHub Enterprise
commit faeb1dfc1d
4 changed files with 47 additions and 3 deletions

View File

@ -14,6 +14,7 @@ from pathlib import Path
import environ import environ
from django.urls import reverse_lazy from django.urls import reverse_lazy
from fwd_api.middleware.logging_request_response_middleware import TraceIDLogFilter from fwd_api.middleware.logging_request_response_middleware import TraceIDLogFilter
from fwd_api.middleware.response_monitor import ResponseMonitorMiddleware
# Build paths inside the project like this: BASE_DIR / 'subdir'. # Build paths inside the project like this: BASE_DIR / 'subdir'.
@ -49,6 +50,9 @@ S3_SECRET_KEY = env.str("S3_SECRET_KEY", "")
S3_BUCKET_NAME = env.str("S3_BUCKET_NAME", "ocr-data") S3_BUCKET_NAME = env.str("S3_BUCKET_NAME", "ocr-data")
REDIS_HOST = env.str("REDIS_HOST", "result-cache") REDIS_HOST = env.str("REDIS_HOST", "result-cache")
REDIS_PORT = env.int("REDIS_PORT", 6379) REDIS_PORT = env.int("REDIS_PORT", 6379)
AWS_REGION = env.str("AWS_REGION", "")
AWS_QUEUE_URL = env.str("AWS_QUEUE_URL", "")
INSTALLED_APPS = [ INSTALLED_APPS = [
@ -77,7 +81,8 @@ MIDDLEWARE = [
'corsheaders.middleware.CorsMiddleware', 'corsheaders.middleware.CorsMiddleware',
"whitenoise.middleware.WhiteNoiseMiddleware", "whitenoise.middleware.WhiteNoiseMiddleware",
"django.middleware.locale.LocaleMiddleware", "django.middleware.locale.LocaleMiddleware",
"fwd_api.middleware.logging_request_response_middleware.LoggingMiddleware" "fwd_api.middleware.logging_request_response_middleware.LoggingMiddleware",
'fwd_api.middleware.response_monitor.ResponseMonitorMiddleware',
] ]
LOCALE_PATHS = [ LOCALE_PATHS = [

View File

@ -20,6 +20,8 @@ from fwd_api.middleware.local_storage import get_current_trace_id
import csv import csv
import json import json
import copy import copy
import boto3
import datetime
from fwd_api.utils.accuracy import predict_result_to_ready from fwd_api.utils.accuracy import predict_result_to_ready
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
@ -33,6 +35,8 @@ s3_client = S3Util.MinioS3Client(
secret_key=settings.S3_SECRET_KEY, secret_key=settings.S3_SECRET_KEY,
bucket_name=settings.S3_BUCKET_NAME bucket_name=settings.S3_BUCKET_NAME
) )
sqs_client = boto3.client('sqs'+ str(uuid.uuid4()), region_name=settings.AWS_REGION) # keys are stored in the cridental
def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: str, index_in_request: int) -> list: def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: str, index_in_request: int) -> list:
try: try:
@ -289,4 +293,23 @@ def upload_obj_to_s3(byte_obj, s3_key):
obj = base64.b64decode(byte_obj) obj = base64.b64decode(byte_obj)
res = s3_client.update_object(s3_key, obj) res = s3_client.update_object(s3_key, obj)
else: else:
logger.info(f"S3 is not available, skipping,...") logger.info(f"S3 is not available, skipping,...")
@app.task(base=VerboseTask, name='send_response_to_sqs')
def send_response_to_sqs(response_data, status_code):
"""Send error responses to SQS for monitoring"""
try:
message_body = {
"status_code": status_code,
"timestamp": int(datetime.datetime.now().timestamp()),
"message": response_data
}
sqs_client.send_message(
QueueUrl=settings.AWS_QUEUE_URL,
MessageBody=json.dumps(message_body)
)
logger.info(f"Error response sent to SQS: {status_code}")
except Exception as e:
logger.error(f"Failed to send to SQS: {str(e)}")

View File

@ -47,6 +47,7 @@ app.conf.update({
Queue('csv_feedback'), Queue('csv_feedback'),
Queue('report'), Queue('report'),
Queue('report_2'), Queue('report_2'),
Queue('error_responses'),
], ],
'task_routes': { 'task_routes': {
'process_sap_invoice_result': {'queue': 'invoice_sap_rs'}, 'process_sap_invoice_result': {'queue': 'invoice_sap_rs'},
@ -65,7 +66,8 @@ app.conf.update({
'remove_local_file': {'queue': "remove_local_file"}, 'remove_local_file': {'queue': "remove_local_file"},
'csv_feedback': {'queue': "csv_feedback"}, 'csv_feedback': {'queue': "csv_feedback"},
'make_a_report': {'queue': "report"}, 'make_a_report': {'queue': "report"},
'make_a_report_2': {'queue': "report_2"}, 'make_a_report_2': {'queue': "report_2"},
'send_response_to_sqs': {'queue': 'error_responses'},
} }
}) })

View File

@ -0,0 +1,14 @@
from functools import partial
from django.utils.deprecation import MiddlewareMixin
from fwd_api.celery_worker.internal_task import send_response_to_sqs
class ResponseMonitorMiddleware(MiddlewareMixin):
def process_response(self, request, response):
"""Monitor responses and send errors to SQS"""
if response and (400 <= response.status_code < 600):
# Send async to avoid blocking response
send_response_to_sqs.delay(
response.data if hasattr(response, 'data') else str(response.content),
response.status_code
)
return response