sbt-idp/cope2n-api/fwd_api/utils/s3.py
2024-02-21 18:20:35 +07:00

88 lines
3.3 KiB
Python

import boto3
class MinioS3Client:
    """Small best-effort wrapper around a boto3 S3 client bound to one bucket.

    When ``endpoint`` is given, the client talks to that S3-compatible
    endpoint; otherwise boto3's default endpoint resolution is used.

    Every operation is best-effort: on failure the error is printed and the
    method returns ``None`` instead of raising.
    """

    def __init__(self, access_key, secret_key, bucket_name, endpoint=""):
        """Store the connection settings and try to create the boto3 client.

        Args:
            access_key: Access key id passed to boto3.
            secret_key: Secret access key passed to boto3.
            bucket_name: Bucket used by every operation of this instance.
            endpoint: Optional endpoint URL. An empty string or ``None``
                means "use the default endpoint".

        If the client cannot be created, a warning is printed and
        ``self.s3_client`` is set to ``None``.
        """
        self.endpoint = endpoint
        self.access_key = access_key
        self.secret_key = secret_key
        self.bucket_name = bucket_name
        try:
            self.s3_client = boto3.client(
                's3',
                **self._client_kwargs(endpoint, access_key, secret_key)
            )
        except Exception as e:
            print(f"[WARN] Unable to create an s3 client, {e}")
            self.s3_client = None

    @staticmethod
    def _client_kwargs(endpoint, access_key, secret_key):
        """Build the keyword arguments handed to ``boto3.client``."""
        kwargs = {
            "aws_access_key_id": access_key,
            "aws_secret_access_key": secret_key,
        }
        # Truthiness (rather than len()) so that endpoint=None is treated
        # like "" instead of raising TypeError and disabling the client.
        if endpoint:
            kwargs["endpoint_url"] = endpoint
        return kwargs

    def _require_client(self):
        """Return the boto3 client, or raise if it failed to initialize."""
        if self.s3_client is None:
            raise RuntimeError("S3 client is not initialized")
        return self.s3_client

    def update_object(self, s3_key, content):
        """Write ``content`` to ``s3_key`` in the bucket via ``put_object``.

        Returns:
            The ``put_object`` response, or ``None`` if the call failed.
        """
        try:
            return self._require_client().put_object(
                Bucket=self.bucket_name,
                Key=s3_key,
                Body=content
            )
        except Exception as e:
            print(f"Error updating object in S3: {str(e)}")
            return None

    def upload_file(self, local_file_path, s3_key):
        """Upload the file at ``local_file_path`` to ``s3_key`` in the bucket.

        Returns:
            Whatever the client's ``upload_file`` returns, or ``None`` on
            failure. NOTE(review): boto3 documents ``upload_file`` as
            returning ``None``, so the return value alone may not
            distinguish success from failure -- confirm before relying on it.
        """
        try:
            return self._require_client().upload_file(
                local_file_path, self.bucket_name, s3_key
            )
        except Exception as e:
            print(f"Error uploading file to S3: {str(e)}")
            return None

    def download_file(self, s3_key, local_file_path):
        """Download ``s3_key`` from the bucket to ``local_file_path``.

        Returns:
            Whatever the client's ``download_file`` returns, or ``None`` on
            failure (see the note on ``upload_file`` about ``None`` results).
        """
        try:
            return self._require_client().download_file(
                self.bucket_name, s3_key, local_file_path
            )
        except Exception as e:
            print(f"Error downloading file from S3: {str(e)}")
            return None

    def create_url_with_expiration(self, s3_key, expiration_time):
        """Generate a presigned ``get_object`` URL for ``s3_key``.

        Args:
            s3_key: Key of the object inside the bucket.
            expiration_time: Passed to boto3 as ``ExpiresIn``.

        Returns:
            The presigned URL, or ``None`` if generation failed.
        """
        try:
            return self._require_client().generate_presigned_url(
                ClientMethod="get_object",
                ExpiresIn=expiration_time,
                Params={
                    "Bucket": self.bucket_name,
                    "Key": s3_key,
                },
            )
        except Exception as e:
            print(f"Error generating URL for file '{s3_key}': {str(e)}")
            return None
if __name__ == "__main__":
    # Manual smoke test: upload one local file to the configured bucket.
    import os

    FILE = "/app/media/users/1/subscriptions/33/requests/sbt_invoice/SAP00c6c229c2954e498b119968a318d366/temp_SAP00c6c229c2954e498b119968a318d366.jpg"

    # Credentials are read from the environment so that no secret lives in
    # source control. A missing variable raises KeyError immediately.
    # NOTE(review): the key pair that used to be hard-coded here should be
    # treated as compromised and rotated.
    s3_client = MinioS3Client(
        # Set S3_ENDPOINT (e.g. 'http://minio:9884') to target a custom
        # endpoint; leave it unset to use the default one.
        endpoint=os.environ.get("S3_ENDPOINT", ""),
        access_key=os.environ["AWS_ACCESS_KEY_ID"],
        secret_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        bucket_name=os.environ.get("S3_BUCKET_NAME", "ocr-sds"),
    )

    res = s3_client.upload_file(FILE, 'invoic1e/viettinbank/image.jpg')
    print(f"[DEBUG]: res: {res}")