diff --git a/.env_prod b/.env_prod
new file mode 100644
index 0000000..453c5a3
--- /dev/null
+++ b/.env_prod
@@ -0,0 +1,40 @@
+MEDIA_ROOT=/app/media
+# DATABASE django setup
+DB_ENGINE=django.db.backends.postgresql_psycopg2
+DB_SCHEMA=sbt2
+DB_USER=sbt
+DB_PASSWORD=sbtCH240
+DB_HOST=sbt.cxetpslawu4p.ap-southeast-1.rds.amazonaws.com
+DB_PUBLIC_PORT=5432
+DB_INTERNAL_PORT=5432
+
+DEBUG=TRUE
+CORS_ALLOWED_ORIGINS=*
+CTEL_KEY=fTjWnZr4u7x!A%D*G-KaPdRgUkXp2s5v
+DB_INTERNAL_KEY=7LYk-iaWTFPqsZHIE5GHuv41S0c_Vlb0ZVc-BnsEZqQ=
+ALLOWED_HOSTS='*'
+BROKER_URL=amqp://test:test@rabbitmq-manulife-sbt:5672
+BASE_URL=http://be-ctel-sbt:9000
+BASE_UI_URL=http://fe-sbt:9801
+HOST_MEDIA_FOLDER=./media
+GID=1000
+UID=198
+SECRET_KEY=999999999999999999999999999999999999999999999999999999999999999999
+RABBITMQ_DEFAULT_USER=test
+RABBITMQ_DEFAULT_PASS=test
+BASE_PORT=9000
+S3_ACCESS_KEY=AKIA3AFPFVWZD77UACHE
+S3_SECRET_KEY=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
+S3_BUCKET_NAME=ocr-sds
+
+AUTH_TOKEN_LIFE_TIME=168
+IMAGE_TOKEN_LIFE_TIME=168
+INTERNAL_SDS_KEY=TannedCung
+FI_USER_NAME=sbt
+FI_PASSWORD=7Eg4AbWIXDnufgn
+
+# Front end env variables
+# VITE_PORT=80
+# VITE_PROXY=http://0.0.0.0
+# VITE_API_BASE_URL=http://0.0.0.0:8000
+# PORT=8002
\ No newline at end of file
diff --git a/cope2n-api/fwd/settings.py b/cope2n-api/fwd/settings.py
index 514ff37..13740da 100755
--- a/cope2n-api/fwd/settings.py
+++ b/cope2n-api/fwd/settings.py
@@ -59,6 +59,7 @@ INSTALLED_APPS = [
     'drf_spectacular',
     'drf_spectacular_sidecar',  # required for Django collectstatic discovery
     'corsheaders',
+    "django_celery_beat",
 ]
 
 
@@ -207,6 +208,17 @@ BROKER_URL = env.str("BROKER_URL", default="amqp://test:test@107.120.70.226:5672")
 CELERY_TASK_TRACK_STARTED = True
 CELERY_TASK_TIME_LIMIT = 30 * 60
 
+CELERY_BEAT_SCHEDULE = {
+    'clean_local_file': {
+        'task': 'fwd_api.bg_tasks.clean_local_files.clean_media',
+        'schedule': 3600.0,
+        'args': (),
+        'options': {
+            'expires': 120.0,
+        },
+    },
+}
+
 MAX_UPLOAD_SIZE_OF_A_FILE = 100 * 1024 * 1024  # 100 MB
 MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST = 100 * 1024 * 1024  # 100 MB
 MAX_UPLOAD_FILES_IN_A_REQUEST = 5
diff --git a/cope2n-api/fwd_api/api/ctel_view.py b/cope2n-api/fwd_api/api/ctel_view.py
index 2ef48b6..4de39e3 100755
--- a/cope2n-api/fwd_api/api/ctel_view.py
+++ b/cope2n-api/fwd_api/api/ctel_view.py
@@ -236,7 +236,6 @@ class CtelViewSet(viewsets.ViewSet):
             compact_files.append(this_file)
         c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))
 
-        time_limit = 120
         start_time = time.time()
         while True:
diff --git a/cope2n-api/fwd_api/bg_tasks/__init__.py b/cope2n-api/fwd_api/bg_tasks/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/cope2n-api/fwd_api/bg_tasks/clean_local_files.py b/cope2n-api/fwd_api/bg_tasks/clean_local_files.py
new file mode 100644
index 0000000..5fe41a1
--- /dev/null
+++ b/cope2n-api/fwd_api/bg_tasks/clean_local_files.py
@@ -0,0 +1,19 @@
+from django.conf import settings
+from django.utils import timezone
+
+from fwd_api.models import SubscriptionRequest
+from .cron_job import CronJob
+from fwd_api.celery_worker.worker import app as celery_app
+
+class FileCleaningWorker(CronJob):
+    """Clean old files to save disk space"""
+
+    def run(self):
+        print("-----------")
+
+
+
+@celery_app.task(time_limit=5000)
+def clean_media():
+    worker = FileCleaningWorker("file_cleaning_worker")
+    worker.run()
\ No newline at end of file
diff --git a/cope2n-api/fwd_api/bg_tasks/cron_job.py b/cope2n-api/fwd_api/bg_tasks/cron_job.py
new file mode 100644
index 0000000..639462a
--- /dev/null
+++ b/cope2n-api/fwd_api/bg_tasks/cron_job.py
@@ -0,0 +1,31 @@
+import traceback
+
+from celery.utils.log import get_task_logger
+
+
+class CronJob:
+    def __init__(
+        self,
+        name,
+    ):
+        self.name = name
+        self.logger = self.get_logger()
+
+    def get_logger(self):
+        """Create/Get the logger for this task"""
+        logger = get_task_logger(self.name)
+        return logger
+
+    def safe_run(self):
+        """Create a logger and execute run()
+        in a try/except block to prevent crashing
+        """
+        try:
+            self.run()
+        except Exception as e:
+            self.logger.error("Failed to run cron job in safe mode.")
+            self.logger.error(e)
+            traceback.print_exc()
+
+    def run(self):
+        raise NotImplementedError("Not implemented error")
\ No newline at end of file
diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py
index e3615f5..f9c54cf 100755
--- a/cope2n-api/fwd_api/celery_worker/internal_task.py
+++ b/cope2n-api/fwd_api/celery_worker/internal_task.py
@@ -107,11 +107,15 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
 
 @app.task(name='upload_file_to_s3')
-def upload_file_to_s3(local_file_path, s3_key):
+def upload_file_to_s3(local_file_path, s3_key, request_id):
     if s3_client.s3_client is not None:
-        res = s3_client.upload_file(local_file_path, s3_key)
-        if res != None and res["ResponseMetadata"]["HTTPStatusCode"] == 200:
-            os.remove(local_file_path)
+        try:
+            s3_client.upload_file(local_file_path, s3_key)
+            # filter() returns a QuerySet, not an instance, so update the matching rows directly
+            SubscriptionRequest.objects.filter(request_id=request_id).update(S3_uploaded=True)
+        except Exception as e:
+            logger.error(f"Failed to upload {local_file_path} to S3: {e}")
+            return
     else:
         logger.info(f"S3 is not available, skipping,...")
diff --git a/cope2n-api/fwd_api/celery_worker/worker.py b/cope2n-api/fwd_api/celery_worker/worker.py
index 70d302c..b8530fc 100755
--- a/cope2n-api/fwd_api/celery_worker/worker.py
+++ b/cope2n-api/fwd_api/celery_worker/worker.py
@@ -16,6 +16,17 @@ app: Celery = Celery(
     broker_transport_options={'confirm_publish': False},
 )
 
+app.config_from_object("django.conf:settings", namespace="CELERY")
+app.autodiscover_tasks()
+
+@app.on_after_finalize.connect
+def setup_periodic_tasks(sender, **kwargs):
+    from fwd_api.bg_tasks.clean_local_files import clean_media
+    sender.add_periodic_task(
+        10.0, clean_media.s(), expires=120.0
+    )
+
+
 app.conf.update({
     'task_queues': [
diff --git a/cope2n-api/fwd_api/models/SubscriptionRequest.py b/cope2n-api/fwd_api/models/SubscriptionRequest.py
index ea6d44c..1073cfe 100755
--- a/cope2n-api/fwd_api/models/SubscriptionRequest.py
+++ b/cope2n-api/fwd_api/models/SubscriptionRequest.py
@@ -18,4 +18,5 @@ class SubscriptionRequest(models.Model):
     status = models.IntegerField()  # 1: Processing(Pending) 2: PredictCompleted 3: ReturnCompleted
     subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
     created_at = models.DateTimeField(default=timezone.now, db_index=True)
-    updated_at = models.DateTimeField(auto_now=True)
\ No newline at end of file
+    updated_at = models.DateTimeField(auto_now=True)
+    S3_uploaded = models.BooleanField(default=False)
\ No newline at end of file
diff --git a/cope2n-api/fwd_api/utils/file.py b/cope2n-api/fwd_api/utils/file.py
index 29c15b9..5e83d0a 100644
--- a/cope2n-api/fwd_api/utils/file.py
+++ b/cope2n-api/fwd_api/utils/file.py
@@ -156,9 +156,10 @@ def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: TemporaryUploadedFile):
 def save_to_S3(file_name, rq, local_file_path):
     try:
         file_path = get_folder_path(rq)
+        request_id = rq.request_id
         assert len(file_path.split("/")) >= 2, "file_path must have at least process type and request id"
2, "file_path must have at least process type and request id" s3_key = os.path.join(file_path.split("/")[-2], file_path.split("/")[-1], file_name) - c_connector.upload_file_to_s3((local_file_path, s3_key)) + c_connector.upload_file_to_s3((local_file_path, s3_key, request_id)) return s3_key except Exception as e: print(f"[ERROR]: {e}") diff --git a/cope2n-api/requirements.txt b/cope2n-api/requirements.txt index 327bea3..c204228 100755 --- a/cope2n-api/requirements.txt +++ b/cope2n-api/requirements.txt @@ -49,4 +49,5 @@ djangorestframework-xml==2.0.0 boto3==1.29.7 imagesize==1.4.1 pdf2image==1.16.3 -redis==5.0.1 \ No newline at end of file +redis==5.0.1 +django-celery-beat==2.5.0 \ No newline at end of file diff --git a/deploy_images.sh b/deploy_images.sh index 688822a..9be2183 100755 --- a/deploy_images.sh +++ b/deploy_images.sh @@ -21,4 +21,4 @@ docker push public.ecr.aws/v4n9y6r8/sidp/cope2n-fe-fi-sbt:${tag} cp ./docker-compose.yml ./docker-compose_${tag}.yml sed -i "s/{{tag}}/$tag/g" ./docker-compose_${tag}.yml -cp .env .env_${tag} +cp .env_prod .env_${tag} diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 96e890a..9b0f3e4 100755 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -76,7 +76,7 @@ services: volumes: - ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT} - BE_static:/app/static - # - ./cope2n-api:/app + - ./cope2n-api:/app working_dir: /app depends_on: db-sbt: @@ -162,7 +162,7 @@ services: condition: service_started volumes: - ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT} - # - ./cope2n-api:/app + - ./cope2n-api:/app working_dir: /app command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 5" diff --git a/docker-compose.yml b/docker-compose.yml index 963434c..33afaa1 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -135,8 +135,6 @@ services: networks: - ctel-sbt depends_on: - db-sbt: - condition: service_started rabbitmq-sbt: condition: service_started volumes: diff --git a/speedtest_sync.py b/speedtest_sync.py index 449f451..a24b839 100644 --- a/speedtest_sync.py +++ b/speedtest_sync.py @@ -1,10 +1,12 @@ import requests import time +import random import argparse import multiprocessing import tqdm -import random import traceback +from requests_toolbelt import MultipartEncoderMonitor +import requests parser = argparse.ArgumentParser() @@ -30,23 +32,48 @@ try: except: print("Failed to login") print(response.content) + # After the login, store the token in the memory (RAM) or DB # Re-login to issue a new token after 6 days. 
 # =================================================================
 
 def process_file(data):
-    files, token = data
+    _, token = data
+    r = random.random()  # single draw => 20% / 40% / 40% branch split
+    if r < 0.2:
+        files = [
+            ('invoice_file', ("invoice.jpg", open("test_samples/sbt/invoice.jpg", "rb"), 'application/octet-stream')),
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
+            ('imei_files', ("imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb"), 'application/octet-stream')),
+        ]
+    elif r < 0.6:
+        files = [
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
+        ]
+    else:
+        files = [
+            ('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
+            ('imei_files', ("imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb"), 'application/octet-stream')),
+        ]
     num_files = len(files)
-    files.append(
-        ('processType', (None, 12)),
-    )
+    files.append(('processType', '12'))
 # =================================================================
 # UPLOAD THE FILE
     start_time = time.time()
+    end_of_upload_time = 0
+    def my_callback(monitor):
+        nonlocal end_of_upload_time
+        if monitor.bytes_read == monitor.len:
+            end_of_upload_time = time.time()
+    m = MultipartEncoderMonitor.from_fields(
+        fields=files,
+        callback=my_callback
+    )
     try:
         response = requests.post(f'{args.host}/api/ctel/images/process_sync/', headers={
             'Authorization': token,
-        }, files=files, timeout=300)
+            'Content-Type': m.content_type
+        }, data=m, timeout=300)
     except requests.exceptions.Timeout:
         print("Timeout occurred while uploading")
         return {
@@ -68,11 +94,19 @@ def process_file(data):
             "num_files": 0,
         }
     end_time = time.time()
-    upload_time = end_time - start_time
+    upload_time = end_of_upload_time - start_time
 
 # =================================================================
     try:
         data = response.json()
+        if len(data["files"]) != num_files:
+            return {
+                "success": False,
+                "status": "missing_file",
+                "upload_time": 0,
+                "process_time": 0,
+                "num_files": 0,
+            }
         data.pop("files", None)
         print(data)
     except:
@@ -88,36 +122,14 @@ def process_file(data):
         "success": True,
         "status": 200,
         "upload_time": upload_time,
-        "process_time": upload_time,
+        "process_time": time.time() - start_time - upload_time,
         "num_files": num_files,
     }
 
-
-invoice_files = [
-    ('invoice_file', ('invoice.pdf', open("test_samples/20220303025923NHNE_20220222_Starhub_Order_Confirmation_by_Email.pdf", "rb").read())),
-]
-# invoice_files = [
-#     ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())),
-# ]
-imei_files = [
-    ('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())),
-    ('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())),
-]
-def get_imei_files():
-    num_files = random.randint(1, len(imei_files) + 1)
-    print("Num imeis", num_files)
-    files = imei_files[:num_files]
-    # print("Num of imei files:", len(files))
-    return files
-def get_files():
-    return invoice_files + get_imei_files()
 def gen_input(num_input):
     for _ in range(num_input):
-        yield (get_files(), token)
+        yield (None, token)
 
 pool = multiprocessing.Pool(processes=args.num_workers)
 results = []
 for result in tqdm.tqdm(pool.imap_unordered(process_file, gen_input(num_input=args.num_requests)), total=args.num_requests):
@@ -126,7 +138,6 @@ for result in tqdm.tqdm(pool.imap_unordered(process_file, gen_input(num_input=args.num_requests)), total=args.num_requests):
 print("## TEST REPORT #################################")
 print("Number of requests: {}".format(args.num_requests))
 print("Number of concurrent requests: {}".format(args.num_workers))
-print("Number of files: 1 invoice, 1-5 imei files (random)")
 print("--------------------------------------")
 print("SUCCESS RATE")
 counter = {}
@@ -142,7 +153,8 @@ if len(uploading_time) == 0:
     print("No valid uploading time")
     print("Check the results!")
 processing_time = [x["process_time"] for x in results if x["success"]]
-print("Uploading + Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time)))
+print("Uploading time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(uploading_time) / len(uploading_time), min(uploading_time), max(uploading_time)))
+print("Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time)))
 print("--------------------------------------")
 print("TIME BY IMAGE")
 uploading_time = [x["upload_time"] for x in results if x["success"]]
@@ -151,3 +163,4 @@ num_images = sum(x["num_files"] for x in results if x["success"])
 print("Total images:", num_images)
 print("Uploading + Processing time: {:.3f}s".format(sum(processing_time) / num_images))
 print("--------------------------------------")
+
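
Note: FileCleaningWorker.run() is still a stub (it only prints a separator), so the scheduled clean_media task does not reclaim any disk space yet. A minimal sketch of what run() could look like, assuming a 7-day retention window and one media folder per request named by its request_id under MEDIA_ROOT (both are assumptions, not established by this diff):

    import os
    import shutil
    from datetime import timedelta

    from django.conf import settings
    from django.utils import timezone

    from fwd_api.models import SubscriptionRequest
    from .cron_job import CronJob


    class FileCleaningWorker(CronJob):
        """Clean old files to save disk space"""

        def run(self):
            # Hypothetical retention window; tune to the real requirement.
            cutoff = timezone.now() - timedelta(days=7)
            # Only touch requests whose files are already safe on S3.
            old_requests = SubscriptionRequest.objects.filter(
                S3_uploaded=True, created_at__lt=cutoff)
            for rq in old_requests:
                # Assumed layout: one folder per request under MEDIA_ROOT.
                folder = os.path.join(settings.MEDIA_ROOT, rq.request_id)
                if os.path.isdir(folder):
                    shutil.rmtree(folder, ignore_errors=True)
                    self.logger.info("Removed local media for request %s", rq.request_id)

Having clean_media call worker.safe_run() instead of worker.run() would also pick up the log-and-continue behaviour that CronJob.safe_run() was added for.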