diff --git a/.gitignore b/.gitignore index de72e23..d1da6cb 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,4 @@ demo-ocr/ logs/ docker-compose_.yml cope2n-ai-fi/Dockerfile_old_work +*.sql diff --git a/cope2n-api/fwd/settings.py b/cope2n-api/fwd/settings.py index 13740da..514ff37 100755 --- a/cope2n-api/fwd/settings.py +++ b/cope2n-api/fwd/settings.py @@ -59,7 +59,6 @@ INSTALLED_APPS = [ 'drf_spectacular', 'drf_spectacular_sidecar', # required for Django collectstatic discovery 'corsheaders', - "django_celery_beat", ] @@ -208,17 +207,6 @@ BROKER_URL = env.str("BROKER_URL", default="amqp://test:test@107.120.70.226:5672 CELERY_TASK_TRACK_STARTED = True CELERY_TASK_TIME_LIMIT = 30 * 60 -CELERY_BEAT_SCHEDULE = { - 'clean_local_file': { - 'task': 'fwd_api.celery_worker.internal_task.clean_local_files', - 'schedule': 3600.0, - 'args': (), - 'options': { - 'expires': 120.0, - }, - }, -} - MAX_UPLOAD_SIZE_OF_A_FILE = 100 * 1024 * 1024 # 100 MB MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST = 100 * 1024 * 1024 # 100 MB MAX_UPLOAD_FILES_IN_A_REQUEST = 5 diff --git a/cope2n-api/fwd_api/bg_tasks/__init__.py b/cope2n-api/fwd_api/bg_tasks/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/cope2n-api/fwd_api/bg_tasks/clean_local_files.py b/cope2n-api/fwd_api/bg_tasks/clean_local_files.py deleted file mode 100644 index 5fe41a1..0000000 --- a/cope2n-api/fwd_api/bg_tasks/clean_local_files.py +++ /dev/null @@ -1,19 +0,0 @@ -from django.conf import settings -from django.utils import timezone - -from fwd_api.models import SubscriptionRequest -from .cron_job import CronJob -from fwd_api.celery_worker.worker import app as celery_app - -class FileCleaningWoker(CronJob): - """Clean old files to save disk space""" - - def run(self): - print("-----------") - - - -@celery_app.task(time_limit=5000) -def clean_media(): - worker = FileCleaningWoker("file_cleaning_worker") - worker.run() \ No newline at end of file diff --git a/cope2n-api/fwd_api/bg_tasks/cron_job.py 
b/cope2n-api/fwd_api/bg_tasks/cron_job.py deleted file mode 100644 index 639462a..0000000 --- a/cope2n-api/fwd_api/bg_tasks/cron_job.py +++ /dev/null @@ -1,31 +0,0 @@ -import traceback - -from celery.utils.log import get_task_logger - - -class CronJob: - def __init__( - self, - name, - ): - self.name = name - self.logger = self.get_logger() - - def get_logger(self): - """Create/Get the logger for this task""" - logger = get_task_logger(self.name) - return logger - - def safe_run(self): - """Create a logger and execute run() - in a try/except block to prevent crashing - """ - try: - self.run() - except Exception as e: - self.logger.error("Failed to run cron job in safe mode.") - self.logger.error(e) - traceback.print_exc() - - def run(self): - raise NotImplementedError("Not implemented error") \ No newline at end of file diff --git a/cope2n-api/fwd_api/celery_worker/client_connector.py b/cope2n-api/fwd_api/celery_worker/client_connector.py index 10557ed..db411a6 100755 --- a/cope2n-api/fwd_api/celery_worker/client_connector.py +++ b/cope2n-api/fwd_api/celery_worker/client_connector.py @@ -29,9 +29,9 @@ class CeleryConnector: 'process_manulife_invoice': {'queue': "invoice_manulife"}, 'process_sbt_invoice': {'queue': "invoice_sbt"}, 'do_pdf': {'queue': "do_pdf"}, - 'upload_file_to_s3': {'queue': "upload_file_to_s3"}, + 'upload_file_to_s3': {'queue': "upload_file_to_s3"}, 'upload_obj_to_s3': {'queue': "upload_obj_to_s3"}, - + 'remove_local_file': {'queue': "remove_local_file"}, } app = Celery( @@ -45,6 +45,8 @@ class CeleryConnector: return self.send_task('upload_file_to_s3', args) def upload_obj_to_s3(self, args): return self.send_task('upload_obj_to_s3', args) + def remove_local_file(self, args): + return self.send_task('remove_local_file', args, countdown=600) # nearest execution of this task in 10 minutes def process_fi(self, args): return self.send_task('process_fi_invoice', args) def process_fi_result(self, args): @@ -86,10 +88,9 @@ class CeleryConnector: def 
process_invoice_sbt(self, args): return self.send_task('process_sbt_invoice', args) - def send_task(self, name=None, args=None): + def send_task(self, name=None, args=None, countdown=None): if name not in self.task_routes or 'queue' not in self.task_routes[name]: raise GeneralException("System") - return self.app.send_task(name, args, queue=self.task_routes[name]['queue'], expires=300) - + return self.app.send_task(name, args, queue=self.task_routes[name]['queue'], expires=300 if countdown is None else countdown + 300, countdown=countdown) c_connector = CeleryConnector() diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py index 7ae29ac..d1376d5 100755 --- a/cope2n-api/fwd_api/celery_worker/internal_task.py +++ b/cope2n-api/fwd_api/celery_worker/internal_task.py @@ -123,14 +123,24 @@ def upload_file_to_s3(local_file_path, s3_key, request_id): if s3_client.s3_client is not None: try: s3_client.upload_file(local_file_path, s3_key) - sub_request = SubscriptionRequest.objects.filter(request_id=request_id) + sub_request = SubscriptionRequest.objects.filter(request_id=request_id)[0] sub_request.S3_uploaded = True sub_request.save() except Exception as e: + logger.error(f"Unable to set S3: {e}") + print(f"Unable to set S3: {e}") return else: logger.info(f"S3 is not available, skipping,...") +@app.task(name='remove_local_file') +def remove_local_file(local_file_path, request_id): + print(f"[INFO] Removing local file: {local_file_path}, ...") + try: + os.remove(local_file_path) + except Exception as e: + logger.info(f"Unable to remove local file: {e}") + @app.task(name='upload_obj_to_s3') def upload_obj_to_s3(byte_obj, s3_key): if s3_client.s3_client is not None: diff --git a/cope2n-api/fwd_api/celery_worker/worker.py b/cope2n-api/fwd_api/celery_worker/worker.py index b8530fc..ee497cd 100755 --- a/cope2n-api/fwd_api/celery_worker/worker.py +++ b/cope2n-api/fwd_api/celery_worker/worker.py @@ -37,6 +37,8 @@ app.conf.update({ Queue('do_pdf'), 
Queue('upload_file_to_s3'), Queue('upload_obj_to_s3'), + Queue('remove_local_file'), + ], 'task_routes': { @@ -51,6 +53,7 @@ app.conf.update({ 'do_pdf': {'queue': "do_pdf"}, 'upload_file_to_s3': {'queue': "upload_file_to_s3"}, 'upload_obj_to_s3': {'queue': "upload_obj_to_s3"}, + 'remove_local_file': {'queue': "remove_local_file"}, } }) diff --git a/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_s3_uploaded.py b/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_s3_uploaded.py new file mode 100644 index 0000000..97b3810 --- /dev/null +++ b/cope2n-api/fwd_api/migrations/0158_subscriptionrequest_s3_uploaded.py @@ -0,0 +1,18 @@ +# Generated by Django 4.1.3 on 2023-12-22 10:10 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('fwd_api', '0157_alter_subscriptionrequest_created_at'), + ] + + operations = [ + migrations.AddField( + model_name='subscriptionrequest', + name='S3_uploaded', + field=models.BooleanField(default=False), + ), + ] diff --git a/cope2n-api/fwd_api/migrations/0162_merge_20231225_1439.py b/cope2n-api/fwd_api/migrations/0162_merge_20231225_1439.py new file mode 100644 index 0000000..b8d5f4b --- /dev/null +++ b/cope2n-api/fwd_api/migrations/0162_merge_20231225_1439.py @@ -0,0 +1,14 @@ +# Generated by Django 4.1.3 on 2023-12-25 07:39 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('fwd_api', '0158_subscriptionrequest_s3_uploaded'), + ('fwd_api', '0161_alter_subscriptionrequest_ai_inference_start_time_and_more'), + ] + + operations = [ + ] diff --git a/cope2n-api/fwd_api/utils/file.py b/cope2n-api/fwd_api/utils/file.py index 5e83d0a..4a94b39 100644 --- a/cope2n-api/fwd_api/utils/file.py +++ b/cope2n-api/fwd_api/utils/file.py @@ -160,6 +160,7 @@ def save_to_S3(file_name, rq, local_file_path): assert len(file_path.split("/")) >= 2, "file_path must have at least process type 
and request id" s3_key = os.path.join(file_path.split("/")[-2], file_path.split("/")[-1], file_name) c_connector.upload_file_to_s3((local_file_path, s3_key, request_id)) + c_connector.remove_local_file((local_file_path, request_id)) return s3_key except Exception as e: print(f"[ERROR]: {e}") diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index db1120c..45edf08 100755 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -81,11 +81,11 @@ services: depends_on: db-sbt: condition: service_started - # command: sh -c "chmod -R 777 /app/static; sleep 5; python manage.py collectstatic --no-input && - # python manage.py migrate && - # python manage.py compilemessages && - # gunicorn fwd.asgi:application -k uvicorn.workers.UvicornWorker --timeout 300 -b 0.0.0.0:9000" # pre-makemigrations on prod - command: bash -c "tail -f > /dev/null" + command: sh -c "chmod -R 777 /app/static; sleep 5; python manage.py collectstatic --no-input && + python manage.py migrate && + python manage.py compilemessages && + gunicorn fwd.asgi:application -k uvicorn.workers.UvicornWorker --timeout 300 -b 0.0.0.0:9000" # pre-makemigrations on prod + # command: bash -c "tail -f > /dev/null" minio: image: minio/minio