Add: S3 status

This commit is contained in:
dx-tan 2023-12-25 10:32:09 +07:00
parent 9e570b8275
commit a0396c7e7c
15 changed files with 177 additions and 46 deletions

40
.env_prod Normal file
View File

@ -0,0 +1,40 @@
MEDIA_ROOT=/app/media
# DATABASE django setup
DB_ENGINE=django.db.backends.postgresql_psycopg2
DB_SCHEMA=sbt2
DB_USER=sbt
DB_PASSWORD=sbtCH240
DB_HOST=sbt.cxetpslawu4p.ap-southeast-1.rds.amazonaws.com
DB_PUBLIC_PORT=5432
DB_INTERNAL_PORT=5432
DEBUG=TRUE
CORS_ALLOWED_ORIGINS=*
CTEL_KEY=fTjWnZr4u7x!A%D*G-KaPdRgUkXp2s5v
DB_INTERNAL_KEY=7LYk-iaWTFPqsZHIE5GHuv41S0c_Vlb0ZVc-BnsEZqQ=
ALLOWED_HOSTS='*'
BROKER_URL=amqp://test:test@rabbitmq-manulife-sbt:5672
BASE_URL=http://be-ctel-sbt:9000
BASE_UI_URL=http://fe-sbt:9801
HOST_MEDIA_FOLDER=./media
GID=1000
UID=198
SECRET_KEY=999999999999999999999999999999999999999999999999999999999999999999
RABBITMQ_DEFAULT_USER=test
RABBITMQ_DEFAULT_PASS=test
BASE_PORT=9000
S3_ACCESS_KEY=AKIA3AFPFVWZD77UACHE
S3_SECRET_KEY=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
S3_BUCKET_NAME=ocr-sds
AUTH_TOKEN_LIFE_TIME=168
IMAGE_TOKEN_LIFE_TIME=168
INTERNAL_SDS_KEY=TannedCung
FI_USER_NAME=sbt
FI_PASSWORD=7Eg4AbWIXDnufgn
# Front end env variables
# VITE_PORT=80
# VITE_PROXY=http://0.0.0.0
# VITE_API_BASE_URL=http://0.0.0.0:8000
# PORT=8002

View File

@ -59,6 +59,7 @@ INSTALLED_APPS = [
'drf_spectacular',
'drf_spectacular_sidecar', # required for Django collectstatic discovery
'corsheaders',
"django_celery_beat",
]
@ -207,6 +208,17 @@ BROKER_URL = env.str("BROKER_URL", default="amqp://test:test@107.120.70.226:5672
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60
# Periodic-task table for Celery beat: one entry per scheduled job, keyed by
# an arbitrary entry name.
# NOTE(review): the 'task' value must match a registered task name — confirm
# that fwd_api.celery_worker.internal_task.clean_local_files actually exists.
CELERY_BEAT_SCHEDULE = {
    'clean_local_file': {
        'task': 'fwd_api.celery_worker.internal_task.clean_local_files',
        'schedule': 3600.0,  # numeric schedule = interval in seconds (hourly)
        'args': (),
        'options': {
            # Seconds after which a queued-but-unstarted run is discarded.
            'expires': 120.0,
        },
    },
}
MAX_UPLOAD_SIZE_OF_A_FILE = 100 * 1024 * 1024 # 100 MB
MAX_UPLOAD_FILE_SIZE_OF_A_REQUEST = 100 * 1024 * 1024 # 100 MB
MAX_UPLOAD_FILES_IN_A_REQUEST = 5

View File

@ -236,7 +236,6 @@ class CtelViewSet(viewsets.ViewSet):
compact_files.append(this_file)
c_connector.do_pdf((rq_id, sub.id, p_type, user.id, compact_files))
time_limit = 120
start_time = time.time()
while True:

View File

View File

@ -0,0 +1,19 @@
from django.conf import settings
from django.utils import timezone
from fwd_api.models import SubscriptionRequest
from .cron_job import CronJob
from fwd_api.celery_worker.worker import app as celery_app
class FileCleaningWoker(CronJob):
    """Cron job meant to clean old files to save disk space.

    The cleaning logic is not implemented yet: ``run`` is a placeholder.
    """

    def run(self):
        """Placeholder body: only prints a separator line to stdout."""
        print("-----------")
@celery_app.task(time_limit=5000)
def clean_media():
    """Celery task: build the file-cleaning job and execute it once.

    ``run()`` is called directly (not ``safe_run()``), so any exception raised
    by the job propagates to Celery.
    """
    FileCleaningWoker("file_cleaning_worker").run()

View File

@ -0,0 +1,31 @@
import traceback
from celery.utils.log import get_task_logger
class CronJob:
    """Base class for named background jobs.

    Subclasses implement ``run()``. Callers may use ``safe_run()`` to execute
    the job without letting an exception escape.
    """

    def __init__(
        self,
        name,
    ):
        """Store the job name and attach a logger named after it."""
        self.name = name
        self.logger = self.get_logger()

    def get_logger(self):
        """Create/Get the Celery task logger for this job."""
        return get_task_logger(self.name)

    def safe_run(self):
        """Execute ``run()`` and log, instead of propagating, any exception.

        Only ``Exception`` subclasses are caught; ``KeyboardInterrupt`` and
        ``SystemExit`` still propagate.
        """
        try:
            self.run()
        except Exception:
            # logger.exception emits the message and the full traceback as a
            # single record on the task logger, so the failure is not split
            # between the logger and stderr.
            self.logger.exception("Failed to run cron job in safe mode.")

    def run(self):
        """Do the job's work; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Not implemented error")

View File

@ -107,11 +107,15 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
@app.task(name='upload_file_to_s3')
def upload_file_to_s3(local_file_path, s3_key, request_id):
    """Upload a local file to S3 and flag the owning request as uploaded.

    Args:
        local_file_path: Path of the file on local disk.
        s3_key: Destination object key in the bucket.
        request_id: ``request_id`` of the SubscriptionRequest the file
            belongs to; its ``S3_uploaded`` flag is set after the upload.

    Returns:
        None. Failures are logged and swallowed (best effort); in that case
        ``S3_uploaded`` keeps its previous value.
    """
    if s3_client.s3_client is None:
        logger.info(f"S3 is not available, skipping,...")
        return

    try:
        # NOTE(review): the earlier version checked the returned
        # ResponseMetadata status; confirm upload_file raises on failure,
        # otherwise a failed upload would still be flagged as uploaded.
        s3_client.upload_file(local_file_path, s3_key)
        # filter() returns a QuerySet, which has no save(); update() writes
        # only the flag, in one query, without overwriting other columns.
        # It does not refresh auto_now fields such as updated_at.
        SubscriptionRequest.objects.filter(request_id=request_id).update(S3_uploaded=True)
    except Exception as e:
        logger.error(f"Failed to upload {local_file_path} to S3 as {s3_key}: {e}")
        return

View File

@ -16,6 +16,17 @@ app: Celery = Celery(
broker_transport_options={'confirm_publish': False},
)
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register the media-cleaning task as a periodic task on ``sender``.

    Connected to the app's ``on_after_finalize`` signal. The task is scheduled
    with an interval of 10.0 and ``expires=120.0``.
    """
    # Imported here rather than at module level, as in the original.
    # NOTE(review): presumably to avoid a circular import with this module.
    from fwd_api.bg_tasks.clean_local_files import clean_media

    cleanup_signature = clean_media.s()
    sender.add_periodic_task(10.0, cleanup_signature, expires=120.0)
app.conf.update({
'task_queues':
[

View File

@ -18,4 +18,5 @@ class SubscriptionRequest(models.Model):
status = models.IntegerField() # 1: Processing(Pending) 2: PredictCompleted 3: ReturnCompleted
subscription = models.ForeignKey(Subscription, on_delete=models.CASCADE)
created_at = models.DateTimeField(default=timezone.now, db_index=True)
updated_at = models.DateTimeField(auto_now=True)
updated_at = models.DateTimeField(auto_now=True)
S3_uploaded = models.BooleanField(default=False)

View File

@ -156,9 +156,10 @@ def resize_and_save_file(file_name: str, rq: SubscriptionRequest, file: Temporar
def save_to_S3(file_name, rq, local_file_path):
try:
file_path = get_folder_path(rq)
request_id = rq.request_id
assert len(file_path.split("/")) >= 2, "file_path must have at least process type and request id"
s3_key = os.path.join(file_path.split("/")[-2], file_path.split("/")[-1], file_name)
c_connector.upload_file_to_s3((local_file_path, s3_key))
c_connector.upload_file_to_s3((local_file_path, s3_key, request_id))
return s3_key
except Exception as e:
print(f"[ERROR]: {e}")

View File

@ -49,4 +49,5 @@ djangorestframework-xml==2.0.0
boto3==1.29.7
imagesize==1.4.1
pdf2image==1.16.3
redis==5.0.1
redis==5.0.1
django-celery-beat==2.5.0

View File

@ -21,4 +21,4 @@ docker push public.ecr.aws/v4n9y6r8/sidp/cope2n-fe-fi-sbt:${tag}
cp ./docker-compose.yml ./docker-compose_${tag}.yml
sed -i "s/{{tag}}/$tag/g" ./docker-compose_${tag}.yml
cp .env .env_${tag}
cp .env_prod .env_${tag}

View File

@ -76,7 +76,7 @@ services:
volumes:
- ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT}
- BE_static:/app/static
# - ./cope2n-api:/app
- ./cope2n-api:/app
working_dir: /app
depends_on:
db-sbt:
@ -162,7 +162,7 @@ services:
condition: service_started
volumes:
- ${HOST_MEDIA_FOLDER}:${MEDIA_ROOT}
# - ./cope2n-api:/app
- ./cope2n-api:/app
working_dir: /app
command: sh -c "celery -A fwd_api.celery_worker.worker worker -l INFO -c 5"

View File

@ -135,8 +135,6 @@ services:
networks:
- ctel-sbt
depends_on:
db-sbt:
condition: service_started
rabbitmq-sbt:
condition: service_started
volumes:

View File

@ -1,10 +1,12 @@
import requests
import time
import random
import argparse
import multiprocessing
import tqdm
import random
import traceback
from requests_toolbelt import MultipartEncoderMonitor
import requests
parser = argparse.ArgumentParser()
@ -30,23 +32,48 @@ try:
except:
print("Failed to login")
print(response.content)
# After login, store the token in memory (RAM) or in a database.
# Log in again to obtain a new token after 6 days.
# =================================================================
def process_file(data):
files, token = data
_, token = data
files = []
if random.random() < 0.2:
files = [
('invoice_file', ("invoice.jpg", open("test_samples/sbt/invoice.jpg", "rb"), 'application/octet-stream')),
('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
('imei_files', ("imei1.jpg", open("test_samples/sbt/imei2.jpg", "rb"), 'application/octet-stream')),
]
elif random.random() < 0.6:
files = [
('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
]
else:
files = [
('imei_files', ("imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb"), 'application/octet-stream')),
('imei_files', ("imei1.jpg", open("test_samples/sbt/imei2.jpg", "rb"), 'application/octet-stream')),
]
num_files = len(files)
files.append(
('processType', (None, 12)),
)
files.append(('processType', '12'))
# =================================================================
# UPLOAD THE FILE
start_time = time.time()
end_of_upload_time = 0
def my_callback(monitor):
nonlocal end_of_upload_time
if monitor.bytes_read == monitor.len:
end_of_upload_time = time.time()
m = MultipartEncoderMonitor.from_fields(
fields=files,
callback=my_callback
)
try:
response = requests.post(f'{args.host}/api/ctel/images/process_sync/', headers={
'Authorization': token,
}, files=files, timeout=300)
'Content-Type': m.content_type
}, data=m, timeout=300)
except requests.exceptions.Timeout:
print("Timeout occurred while uploading")
return {
@ -68,11 +95,19 @@ def process_file(data):
"num_files": 0,
}
end_time = time.time()
upload_time = end_time - start_time
upload_time = end_of_upload_time - start_time
# =================================================================
try:
data = response.json()
if len(data["files"]) != num_files:
return {
"success": False,
"status": "missing_file",
"upload_time": 0,
"process_time": 0,
"num_files": 0,
}
data.pop("files", None)
print(data)
except:
@ -88,36 +123,14 @@ def process_file(data):
"success": True,
"status": 200,
"upload_time": upload_time,
"process_time": upload_time,
"process_time": time.time() - start_time - upload_time,
"num_files": num_files,
}
invoice_files = [
('invoice_file', ('invoice.pdf', open("test_samples/20220303025923NHNE_20220222_Starhub_Order_Confirmation_by_Email.pdf", "rb").read())),
]
# invoice_files = [
# ('invoice_file', ('invoice.jpg', open("test_samples/sbt/invoice.jpg", "rb").read())),
# ]
imei_files = [
('imei_files', ("test_samples/sbt/imei1.jpg", open("test_samples/sbt/imei1.jpg", "rb").read())),
('imei_files', ("test_samples/sbt/imei2.jpg", open("test_samples/sbt/imei2.jpg", "rb").read())),
('imei_files', ("test_samples/sbt/imei3.jpg", open("test_samples/sbt/imei3.jpg", "rb").read())),
('imei_files', ("test_samples/sbt/imei4.jpeg", open("test_samples/sbt/imei4.jpeg", "rb").read())),
('imei_files', ("test_samples/sbt/imei5.jpg", open("test_samples/sbt/imei5.jpg", "rb").read())),
]
def get_imei_files():
num_files = random.randint(1, len(imei_files) + 1)
print("Num imeis", num_files)
files = imei_files[:num_files]
# print("Num of imei files:", len(files))
return files
def get_files():
return invoice_files + get_imei_files()
def gen_input(num_input):
for _ in range(num_input):
yield (get_files(), token)
yield (None, token)
pool = multiprocessing.Pool(processes=args.num_workers)
results = []
for result in tqdm.tqdm(pool.imap_unordered(process_file, gen_input(num_input=args.num_requests)), total=args.num_requests):
@ -126,7 +139,6 @@ for result in tqdm.tqdm(pool.imap_unordered(process_file, gen_input(num_input=ar
print("## TEST REPORT #################################")
print("Number of requests: {}".format(args.num_requests))
print("Number of concurrent requests: {}".format(args.num_workers))
print("Number of files: 1 invoice, 1-5 imei files (random)")
print("--------------------------------------")
print("SUCCESS RATE")
counter = {}
@ -142,7 +154,8 @@ if len(uploading_time) == 0:
print("No valid uploading time")
print("Check the results!")
processing_time = [x["process_time"] for x in results if x["success"]]
print("Uploading + Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time)))
print("Uploading time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(uploading_time) / len(uploading_time), min(uploading_time), max(uploading_time)))
print("Processing time (Avg / Min / Max): {:.3f}s {:.3f}s {:.3f}s".format(sum(processing_time) / len(processing_time), min(processing_time), max(processing_time)))
print("--------------------------------------")
print("TIME BY IMAGE")
uploading_time = [x["upload_time"] for x in results if x["success"]]
@ -151,3 +164,4 @@ num_images = sum(x["num_files"] for x in results if x["success"])
print("Total images:", num_images)
print("Uploading + Processing time: {:.3f}s".format(sum(processing_time) / num_images))
print("--------------------------------------")