From 1a15907fc03c0832cec6adb180b61b522ca6a3d8 Mon Sep 17 00:00:00 2001 From: PhanThanhTrung <pt.trung2@samsung.com> Date: Tue, 29 Oct 2024 11:07:30 +0700 Subject: [PATCH] update opentelemetry to ai service sbt --- cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py | 5 +- .../celery_worker/mock_process_tasks_fi.py | 71 +----- cope2n-ai-fi/celery_worker/worker_fi.py | 40 +++- cope2n-ai-fi/common/process_pdf.py | 210 ++---------------- cope2n-ai-fi/common/utils_kvu/split_docs.py | 11 + cope2n-ai-fi/requirements.txt | 14 +- 6 files changed, 90 insertions(+), 261 deletions(-) diff --git a/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py b/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py index ca1de3b..896e292 100755 --- a/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py +++ b/cope2n-ai-fi/api/sdsap_sbt/prediction_sbt.py @@ -7,6 +7,7 @@ from pathlib import Path import urllib.parse import uuid from copy import deepcopy +from opentelemetry import trace import sys, os cur_dir = str(Path(__file__).parents[2]) sys.path.append(cur_dir) @@ -22,6 +23,7 @@ from utils.logging.logging import LOGGER_CONFIG logging.config.dictConfig(LOGGER_CONFIG) # Get the logger logger = logging.getLogger(__name__) +tracer = trace.get_tracer() logger.info("OCR engine configfs: \n", ocr_cfg) logger.info("KVU configfs: \n", kvu_cfg) @@ -35,7 +37,7 @@ kvu_cfg.pop("option") # pop option sbt_engine = load_engine(kvu_cfg) kvu_cfg["option"] = option - +@tracer.start_as_current_span("sbt_predict") def sbt_predict(image_url, engine, metadata={}) -> None: req = urllib.request.urlopen(image_url) arr = np.asarray(bytearray(req.read()), dtype=np.uint8) @@ -65,6 +67,7 @@ def sbt_predict(image_url, engine, metadata={}) -> None: os.remove(tmp_image_path) return outputs +@tracer.start_as_current_span("predict") def predict(page_numb, image_url, metadata={}): """ module predict function diff --git a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py index 3e6d61d..2a0521b 100755 --- a/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py +++ b/cope2n-ai-fi/celery_worker/mock_process_tasks_fi.py @@ -1,71 +1,22 @@ -from celery_worker.worker_fi import app -from celery_worker.client_connector_fi import CeleryConnector -from common.process_pdf import compile_output_sbt -from .task_warpper import VerboseTask import logging import logging.config + +from opentelemetry import trace + +from celery_worker.client_connector_fi import CeleryConnector +from celery_worker.worker_fi import app +from common.process_pdf import compile_output_sbt from utils.logging.logging import LOGGER_CONFIG + +from .task_warpper import VerboseTask + # Load the logging configuration logging.config.dictConfig(LOGGER_CONFIG) # Get the logger logger = logging.getLogger(__name__) +tracer = trace.get_tracer() -@app.task(base=VerboseTask,name="process_fi_invoice") -def process_invoice(rq_id, list_url): - from celery_worker.client_connector_fi import CeleryConnector - from common.process_pdf import compile_output_fi - - c_connector = CeleryConnector() - try: - result = compile_output_fi(list_url) - hoadon = {"status": 200, "content": result, "message": "Success"} - logger.info(hoadon) - c_connector.process_fi_invoice_result((rq_id, hoadon)) - return {"rq_id": rq_id} - except Exception as e: - logger.info(e) - hoadon = {"status": 404, "content": {}} - c_connector.process_fi_invoice_result((rq_id, hoadon)) - return {"rq_id": rq_id} - - -@app.task(base=VerboseTask,name="process_sap_invoice") -def process_sap_invoice(rq_id, list_url): - from celery_worker.client_connector_fi import CeleryConnector - from common.process_pdf import compile_output - - logger.info(list_url) - c_connector = CeleryConnector() - try: - result = compile_output(list_url) - hoadon = {"status": 200, "content": result, "message": "Success"} - c_connector.process_sap_invoice_result((rq_id, hoadon)) - return {"rq_id": rq_id} - except Exception as e: - logger.info(e) - hoadon = {"status": 404, "content": {}} - c_connector.process_sap_invoice_result((rq_id, hoadon)) - return {"rq_id": rq_id} - -@app.task(base=VerboseTask,name="process_manulife_invoice") -def process_manulife_invoice(rq_id, list_url): - from celery_worker.client_connector_fi import CeleryConnector - from common.process_pdf import compile_output_manulife - # TODO: simply returning 200 and 404 doesn't make any sense - c_connector = CeleryConnector() - try: - result = compile_output_manulife(list_url) - hoadon = {"status": 200, "content": result, "message": "Success"} - logger.info(hoadon) - c_connector.process_manulife_invoice_result((rq_id, hoadon)) - return {"rq_id": rq_id} - except Exception as e: - logger.info(e) - hoadon = {"status": 404, "content": {}} - c_connector.process_manulife_invoice_result((rq_id, hoadon)) - return {"rq_id": rq_id} - -@app.task(base=VerboseTask,name="process_sbt_invoice") +@app.task(base=VerboseTask,name="process_sbt_invoice", track_started=True) def process_sbt_invoice(rq_id, list_url, metadata): # TODO: simply returning 200 and 404 doesn't make any sense c_connector = CeleryConnector() diff --git a/cope2n-ai-fi/celery_worker/worker_fi.py b/cope2n-ai-fi/celery_worker/worker_fi.py index f36ffd1..6cd7265 100755 --- a/cope2n-ai-fi/celery_worker/worker_fi.py +++ b/cope2n-ai-fi/celery_worker/worker_fi.py @@ -1,11 +1,47 @@ -from celery import Celery -from kombu import Queue, Exchange import environ +from celery import Celery +from celery.signals import worker_process_init +from kombu import Exchange, Queue +from opentelemetry import metrics, trace +from opentelemetry.exporter.otlp.proto.http.metric_exporter import \ + OTLPMetricExporter +from opentelemetry.exporter.otlp.proto.http.trace_exporter import \ + OTLPSpanExporter +from opentelemetry.instrumentation.celery import CeleryInstrumentor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor env = environ.Env( DEBUG=(bool, False) ) +debug = env.str("DEBUG", False) + + +tracer_endpoint = env.str("tracer_endpoint", "http://jaeger_collector:4318") +service_name = "sbt_celery_ai" + +@worker_process_init.connect(weak=False) +def init_celery_tracing(*args, **kwargs): + CeleryInstrumentor().instrument() + span_exporter = OTLPSpanExporter(endpoint=f"{tracer_endpoint}/v1/traces") + processor = BatchSpanProcessor(span_exporter=span_exporter) + + attributes = {SERVICE_NAME: service_name} + resource = Resource(attributes=attributes) + trace_provider = TracerProvider(resource=resource) + trace_provider.add_span_processor(span_processor=processor) + trace.set_tracer_provider(tracer_provider=trace_provider) + + reader = PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics")) + meter_provider = MeterProvider(resource=resource, metric_readers=[reader]) + metrics.set_meter_provider(meter_provider=meter_provider) + + app: Celery = Celery( "postman", broker= env.str("CELERY_BROKER", "amqp://test:test@rabbitmq:5672"), diff --git a/cope2n-ai-fi/common/process_pdf.py b/cope2n-ai-fi/common/process_pdf.py index fb161ad..2cde1e0 100755 --- a/cope2n-ai-fi/common/process_pdf.py +++ b/cope2n-ai-fi/common/process_pdf.py @@ -1,23 +1,17 @@ -import os -import json -import time - -from common import json2xml -from common.json2xml import convert_key_names, replace_xml_values -from common.utils_kvu.split_docs import split_docs, merge_sbt_output - -# from api.OCRBase.prediction import predict as ocr_predict -# from api.Kie_Invoice_AP.prediction_sap import predict -# from api.Kie_Invoice_AP.prediction_fi import predict_fi -# from api.manulife.predict_manulife import predict as predict_manulife -from api.sdsap_sbt.prediction_sbt import predict as predict_sbt import logging import logging.config +import os +import time + +from opentelemetry import trace + +from api.sdsap_sbt.prediction_sbt import predict as predict_sbt +from common.utils_kvu.split_docs import merge_sbt_output from utils.logging.logging import LOGGER_CONFIG -# Load the logging configuration + logging.config.dictConfig(LOGGER_CONFIG) -# Get the logger logger = logging.getLogger(__name__) +tracer = trace.get_tracer() os.environ['PYTHONPATH'] = '/home/thucpd/thucpd/cope2n-ai/cope2n-ai/' @@ -27,54 +21,6 @@ def check_label_exists(array, target_label): return True # Label exists in the array return False # Label does not exist in the array -def compile_output(list_url): - """_summary_ - - Args: - pdf_extracted (list): list: [{ - "1": url},{"2": url}, - ...] - Raises: - NotImplementedError: _description_ - - Returns: - dict: output compiled - """ - - results = { - "model":{ - "name":"Invoice", - "confidence": 1.0, - "type": "finance/invoice", - "isValid": True, - "shape": "letter", - } - } - compile_outputs = [] - compiled = [] - for page in list_url: - output_model = predict(page['page_number'], page['file_url']) - for field in output_model['fields']: - if field['value'] != "" and not check_label_exists(compiled, field['label']): - element = { - 'label': field['label'], - 'value': field['value'], - } - compiled.append(element) - elif field['label'] == 'table' and check_label_exists(compiled, "table"): - for index, obj in enumerate(compiled): - if obj['label'] == 'table': - compiled[index]['value'].append(field['value']) - compile_output = { - 'page_index': page['page_number'], - 'request_file_id': page['request_file_id'], - 'fields': output_model['fields'] - } - compile_outputs.append(compile_output) - results['combine_results'] = compiled - results['pages'] = compile_outputs - return results - def update_null_values(kvu_result, next_kvu_result): for key, value in kvu_result.items(): if value is None and next_kvu_result.get(key) is not None: @@ -86,127 +32,7 @@ def replace_empty_null_values(my_dict): my_dict[key] = None return my_dict -def compile_output_fi(list_url): - """_summary_ - - Args: - pdf_extracted (list): list: [{ - "1": url},{"2": url}, - ...] - Raises: - NotImplementedError: _description_ - - Returns: - dict: output compiled - """ - - results = { - "model":{ - "name":"Invoice", - "confidence": 1.0, - "type": "finance/invoice", - "isValid": True, - "shape": "letter", - } - } - # Loop through the list_url to update kvu_result - for i in range(len(list_url) - 1): - page = list_url[i] - next_page = list_url[i + 1] - kvu_result, output_kie = predict_fi(page['page_number'], page['file_url']) - next_kvu_result, next_output_kie = predict_fi(next_page['page_number'], next_page['file_url']) - - update_null_values(kvu_result, next_kvu_result) - output_kie = replace_empty_null_values(output_kie) - next_output_kie = replace_empty_null_values(next_output_kie) - update_null_values(output_kie, next_output_kie) - - # Handle the last item in the list_url - if list_url: - page = list_url[-1] - kvu_result, output_kie = predict_fi(page['page_number'], page['file_url']) - - converted_dict = convert_key_names(kvu_result) - converted_dict.update(convert_key_names(output_kie)) - output_fi = replace_xml_values(json2xml.xml_template3, converted_dict) - field_fi = { - "xml": output_fi, - } - results['combine_results'] = field_fi - # results['combine_results'] = converted_dict - # results['combine_results_kie'] = output_kie - return results - -def compile_output_ocr_base(list_url): - """Compile output of OCRBase - - Args: - list_url (list): List string url of image - - Returns: - dict: dict of output - """ - - results = { - "model":{ - "name":"OCRBase", - "confidence": 1.0, - "type": "ocrbase", - "isValid": True, - "shape": "letter", - } - } - compile_outputs = [] - for page in list_url: - output_model = ocr_predict(page['page_number'], page['file_url']) - compile_output = { - 'page_index': page['page_number'], - 'request_file_id': page['request_file_id'], - 'fields': output_model['fields'] - } - compile_outputs.append(compile_output) - results['pages'] = compile_outputs - return results - -def compile_output_manulife(list_url): - """_summary_ - - Args: - pdf_extracted (list): list: [{ - "1": url},{"2": url}, - ...] - Raises: - NotImplementedError: _description_ - - Returns: - dict: output compiled - """ - - results = { - "model":{ - "name":"Invoice", - "confidence": 1.0, - "type": "finance/invoice", - "isValid": True, - "shape": "letter", - } - } - - outputs = [] - for page in list_url: - output_model = predict_manulife(page['page_number'], page['file_url']) # gotta be predict_manulife(), for the time being, this function is not avaible, we just leave a dummy function here instead - logger.info("output_model", output_model) - outputs.append(output_model) - logger.info("outputs", outputs) - documents = split_docs(outputs) - logger.info("documents", documents) - results = { - "total_pages": len(list_url), - "ocr_num_pages": len(list_url), - "document": documents - } - return results - +@tracer.start_as_current_span("compile_output_sbt") def compile_output_sbt(list_url, metadata): """_summary_ @@ -237,6 +63,7 @@ def compile_output_sbt(list_url, metadata): outputs = [] start = time.time() pages_predict_time = [] + for page in list_url: output_model = predict_sbt(page['page_number'], page['file_url'], metadata) pages_predict_time.append(time.time()) @@ -244,7 +71,9 @@ def compile_output_sbt(list_url, metadata): output_model['doc_type'] = page['doc_type'] outputs.append(output_model) start_postprocess = time.time() + documents = merge_sbt_output(outputs) + inference_profile["postprocess"] = [start_postprocess, time.time()] inference_profile["inference"] = [start, pages_predict_time] results = { @@ -254,16 +83,3 @@ def compile_output_sbt(list_url, metadata): "inference_profile": inference_profile } return results - - -def main(): - """ - main function - """ - list_url = [{"file_url": "https://www.irs.gov/pub/irs-pdf/fw9.pdf", "page_number": 1, "request_file_id": 1}, ...] - results = compile_output(list_url) - with open('output.json', 'w', encoding='utf-8') as file: - json.dump(results, file, ensure_ascii=False, indent=4) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/cope2n-ai-fi/common/utils_kvu/split_docs.py b/cope2n-ai-fi/common/utils_kvu/split_docs.py index 172d63b..157bdb1 100755 --- a/cope2n-ai-fi/common/utils_kvu/split_docs.py +++ b/cope2n-ai-fi/common/utils_kvu/split_docs.py @@ -2,7 +2,11 @@ import os import glob import json from tqdm import tqdm +from opentelemetry import trace +tracer = trace.get_tracer() + +@tracer.start_as_current_span("longestCommonSubsequence") def longestCommonSubsequence(text1: str, text2: str) -> int: # https://leetcode.com/problems/longest-common-subsequence/discuss/351689/JavaPython-3-Two-DP-codes-of-O(mn)-and-O(min(m-n))-spaces-w-picture-and-analysis dp = [[0] * (len(text2) + 1) for _ in range(len(text1) + 1)] @@ -12,21 +16,25 @@ def longestCommonSubsequence(text1: str, text2: str) -> int: dp[i][j] if c == d else max(dp[i][j + 1], dp[i + 1][j]) return dp[-1][-1] +@tracer.start_as_current_span("write_to_json") def write_to_json(file_path, content): with open(file_path, mode="w", encoding="utf8") as f: json.dump(content, f, ensure_ascii=False) +@tracer.start_as_current_span("read_json") def read_json(file_path): with open(file_path, "r") as f: return json.load(f) +@tracer.start_as_current_span("check_label_exists") def check_label_exists(array, target_label): for obj in array: if obj["label"] == target_label: return True # Label exists in the array return False # Label does not exist in the array +@tracer.start_as_current_span("merged_kvu_outputs") def merged_kvu_outputs(loutputs: list) -> dict: compiled = [] for output_model in loutputs: @@ -44,6 +52,7 @@ def merged_kvu_outputs(loutputs: list) -> dict: return compiled +@tracer.start_as_current_span("split_docs") def split_docs(doc_data: list, threshold: float=0.6) -> list: num_pages = len(doc_data) outputs = [] @@ -91,8 +100,10 @@ def split_docs(doc_data: list, threshold: float=0.6) -> list: return outputs +@tracer.start_as_current_span("merge_sbt_output") def merge_sbt_output(loutputs): # TODO: This function is too circumlocutory, need to refactor the whole flow + @tracer.start_as_current_span("dict_to_list_of_dict") def dict_to_list_of_dict(the_dict): output = [] for k,v in the_dict.items(): diff --git a/cope2n-ai-fi/requirements.txt b/cope2n-ai-fi/requirements.txt index fe251fb..f3fde7e 100755 --- a/cope2n-ai-fi/requirements.txt +++ b/cope2n-ai-fi/requirements.txt @@ -11,4 +11,16 @@ easydict imagesize==1.4.1 pdf2image==1.17.0 redis==5.0.1 -celery==5.3.6 \ No newline at end of file +celery==5.3.6 + +opentelemetry-api==1.27.0 +opentelemetry-distro==0.48b0 +opentelemetry-exporter-otlp==1.27.0 +opentelemetry-exporter-otlp-proto-common==1.27.0 +opentelemetry-exporter-otlp-proto-http==1.27.0 +opentelemetry-instrumentation==0.48b0 +opentelemetry-instrumentation-celery==0.48b0 +opentelemetry-proto==1.27.0 +opentelemetry-sdk==1.27.0 +opentelemetry-semantic-conventions==0.48b0 +opentelemetry-util-http==0.48b0 \ No newline at end of file