Add: delayed task for local file removal

This commit is contained in:
dx-tan 2023-12-25 18:48:50 +07:00
parent 3db520d477
commit 4a2e73c3c5
12 changed files with 60 additions and 73 deletions

1
.gitignore vendored
View File

@ -32,3 +32,4 @@ demo-ocr/
logs/
docker-compose_.yml
cope2n-ai-fi/Dockerfile_old_work
*.sql

View File

@ -59,7 +59,6 @@ INSTALLED_APPS = [
'drf_spectacular',
'drf_spectacular_sidecar', # required for Django collectstatic discovery
'corsheaders',
"django_celery_beat",
]
@ -208,17 +207,6 @@ BROKER_URL = env.str("BROKER_URL", default="amqp://test:test@107.120.70.226:5672
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
CELERY_BEAT_SCHEDULE = {
'clean_local_file': {
'task': 'fwd_api.celery_worker.internal_task.clean_local_files',
'schedule': 3600.0,
'args': (),
'options': {
'expires': 120.0,
},
},
}
MAX_UPLOAD_SIZE_OF_A_FILE = 100 * 1024 * 1024 # 100 MB
MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST = 100 * 1024 * 1024 # 100 MB
MAX_UPLOAD_FILES_IN_A_REQUEST = 5

View File

@ -1,19 +0,0 @@
from django.conf import settings
from django.utils import timezone
from fwd_api.models import SubscriptionRequest
from .cron_job import CronJob
from fwd_api.celery_worker.worker import app as celery_app
class FileCleaningWoker(CronJob):
    """Cron job meant to clean old files to save disk space.

    As written, ``run`` only prints a separator line; no file is touched.
    """

    def run(self):
        """Emit a separator line on stdout."""
        print("-----------")
@celery_app.task(time_limit=5000)
def clean_media():
    """Celery task: build the "file_cleaning_worker" job and run it once."""
    FileCleaningWoker("file_cleaning_worker").run()

View File

@ -1,31 +0,0 @@
import traceback
from celery.utils.log import get_task_logger
class CronJob:
    """Base class for a named job; subclasses provide the work in ``run``.

    Attributes are set in ``__init__``:
      * ``name``   - the name passed by the caller, also used as logger name.
      * ``logger`` - the task logger returned by ``get_logger``.
    """

    def __init__(
        self,
        name,
    ):
        self.name = name
        self.logger = self.get_logger()

    def get_logger(self):
        """Create/Get the logger for this task"""
        return get_task_logger(self.name)

    def safe_run(self):
        """Execute ``run()`` and log any exception instead of propagating it.

        Returns None in every case; a failure in ``run()`` never reaches the
        caller.
        """
        try:
            self.run()
        except Exception:
            # logger.exception records the message together with the active
            # traceback, so the failure ends up in the task log rather than
            # being split between the log and stderr.
            self.logger.exception("Failed to run cron job in safe mode.")

    def run(self):
        """Do the job's work. Must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError("Not implemented error")

View File

@ -29,9 +29,9 @@ class CeleryConnector:
'process_manulife_invoice': {'queue': "invoice_manulife"},
'process_sbt_invoice': {'queue': "invoice_sbt"},
'do_pdf': {'queue': "do_pdf"},
'upload_file_to_s3': {'queue': "upload_file_to_s3"},
'upload_file_to_s3': {'queue': "upload_file_to_s3"},
'upload_obj_to_s3': {'queue': "upload_obj_to_s3"},
'remove_local_file': {'queue': "remove_local_file"},
}
app = Celery(
@ -45,6 +45,8 @@ class CeleryConnector:
return self.send_task('upload_file_to_s3', args)
def upload_obj_to_s3(self, args):
return self.send_task('upload_obj_to_s3', args)
def remove_local_file(self, args):
    """Queue the 'remove_local_file' task with a 600-second countdown.

    ``args`` is forwarded unchanged to ``send_task``; the return value is
    whatever ``send_task`` returns.
    """
    delay_seconds = 600  # earliest execution is 10 minutes after queuing
    return self.send_task('remove_local_file', args, countdown=delay_seconds)
def process_fi(self, args):
return self.send_task('process_fi_invoice', args)
def process_fi_result(self, args):
@ -86,10 +88,9 @@ class CeleryConnector:
def process_invoice_sbt(self, args):
return self.send_task('process_sbt_invoice', args)
def send_task(self, name=None, args=None, countdown=None):
    """Publish a task by name to the queue configured for it in ``task_routes``.

    Args:
        name: Task name; must be a key of ``self.task_routes`` whose entry
            has a 'queue' value.
        args: Positional arguments forwarded to the task.
        countdown: Optional delay in seconds before the task may start.

    Returns:
        The value returned by ``self.app.send_task``.

    Raises:
        GeneralException: if ``name`` has no queue configured.
    """
    if name not in self.task_routes or 'queue' not in self.task_routes[name]:
        raise GeneralException("System")
    # Celery measures a numeric ``expires`` from publish time, not from the
    # ETA. With a fixed 300s expiry, any task delayed by more than 300s
    # (e.g. countdown=600) is already expired when it becomes due and is
    # discarded by the worker. Keep the 300s window, but start it at the ETA.
    expires = 300 + (countdown or 0)
    return self.app.send_task(
        name,
        args,
        queue=self.task_routes[name]['queue'],
        expires=expires,
        countdown=countdown,
    )
c_connector = CeleryConnector()

View File

@ -123,14 +123,24 @@ def upload_file_to_s3(local_file_path, s3_key, request_id):
if s3_client.s3_client is not None:
try:
s3_client.upload_file(local_file_path, s3_key)
sub_request = SubscriptionRequest.objects.filter(request_id=request_id)
sub_request = SubscriptionRequest.objects.filter(request_id=request_id)[0]
sub_request.S3_uploaded = True
sub_request.save()
except Exception as e:
logger.error(f"Unable to set S3: {e}")
print(f"Unable to set S3: {e}")
return
else:
logger.info(f"S3 is not available, skipping,...")
@app.task(name='remove_local_file')
def remove_local_file(local_file_path, request_id):
    """Delete a file from local disk on a best-effort basis.

    Args:
        local_file_path: Path of the file to delete.
        request_id: Identifier of the related request; used only to make the
            log lines traceable.

    Never raises: every failure is logged and swallowed so the task itself
    does not fail.
    """
    logger.info(f"Removing local file: {local_file_path} (request {request_id}), ...")
    try:
        os.remove(local_file_path)
    except FileNotFoundError:
        # Nothing left to delete; not an error for a cleanup task.
        logger.info(f"Local file already removed: {local_file_path} (request {request_id})")
    except Exception as e:
        # A real failure (permissions, bad path type, ...) must be visible at
        # error level, not hidden among info messages.
        logger.error(f"Unable to remove local file: {e}")
@app.task(name='upload_obj_to_s3')
def upload_obj_to_s3(byte_obj, s3_key):
if s3_client.s3_client is not None:

View File

@ -37,6 +37,8 @@ app.conf.update({
Queue('do_pdf'),
Queue('upload_file_to_s3'),
Queue('upload_obj_to_s3'),
Queue('remove_local_file'),
],
'task_routes': {
@ -51,6 +53,8 @@ app.conf.update({
'do_pdf': {'queue': "do_pdf"},
'upload_file_to_s3': {'queue': "upload_file_to_s3"},
'upload_obj_to_s3': {'queue': "upload_obj_to_s3"},
'upload_file_to_s3': {'queue': "upload_file_to_s3"},
'remove_local_file': {'queue': "remove_local_file"},
}
})

View File

@ -0,0 +1,18 @@
# Generated by Django 4.1.3 on 2023-12-22 10:10
from django.db import migrations, models
class Migration(migrations.Migration):
    """Add the boolean ``S3_uploaded`` field (default False) to ``subscriptionrequest``."""

    dependencies = [('fwd_api', '0157_alter_subscriptionrequest_created_at')]

    operations = [
        migrations.AddField(
            model_name='subscriptionrequest',
            name='S3_uploaded',
            field=models.BooleanField(default=False),
        ),
    ]

View File

@ -0,0 +1,14 @@
# Generated by Django 4.1.3 on 2023-12-25 07:39
from django.db import migrations
class Migration(migrations.Migration):
    """Merge point for migrations 0158 and 0161; it performs no operations."""

    dependencies = [
        ('fwd_api', '0158_subscriptionrequest_s3_uploaded'),
        ('fwd_api', '0161_alter_subscriptionrequest_ai_inference_start_time_and_more'),
    ]

    operations = []

View File

@ -160,6 +160,7 @@ def save_to_S3(file_name, rq, local_file_path):
assert len(file_path.split("/")) >= 2, "file_path must have at least process type and request id"
s3_key = os.path.join(file_path.split("/")[-2], file_path.split("/")[-1], file_name)
c_connector.upload_file_to_s3((local_file_path, s3_key, request_id))
c_connector.remove_local_file((local_file_path, request_id))
return s3_key
except Exception as e:
print(f"[ERROR]: {e}")

View File

@ -81,11 +81,11 @@ services:
depends_on:
db-sbt:
condition: service_started
# command: sh -c "chmod -R 777 /app/static; sleep 5; python manage.py collectstatic --no-input &&
# python manage.py migrate &&
# python manage.py compilemessages &&
# gunicorn fwd.asgi:application -k uvicorn.workers.UvicornWorker --timeout 300 -b 0.0.0.0:9000" # pre-makemigrations on prod
command: bash -c "tail -f > /dev/null"
command: sh -c "chmod -R 777 /app/static; sleep 5; python manage.py collectstatic --no-input &&
python manage.py migrate &&
python manage.py compilemessages &&
gunicorn fwd.asgi:application -k uvicorn.workers.UvicornWorker --timeout 300 -b 0.0.0.0:9000" # pre-makemigrations on prod
# command: bash -c "tail -f > /dev/null"
minio:
image: minio/minio