update opentelemetry to ai service sbt

This commit is contained in:
PhanThanhTrung 2024-10-29 11:07:30 +07:00
parent 700480788c
commit 1a15907fc0
6 changed files with 90 additions and 261 deletions

View File

@ -7,6 +7,7 @@ from pathlib import Path
import urllib.parse
import uuid
from copy import deepcopy
from opentelemetry import trace
import sys, os
cur_dir = str(Path(__file__).parents[2])
sys.path.append(cur_dir)
@ -22,6 +23,7 @@ from utils.logging.logging import LOGGER_CONFIG
logging.config.dictConfig(LOGGER_CONFIG)
# Get the logger
logger = logging.getLogger(__name__)
tracer = trace.get_tracer()
logger.info("OCR engine configfs: \n", ocr_cfg)
logger.info("KVU configfs: \n", kvu_cfg)
@ -35,7 +37,7 @@ kvu_cfg.pop("option") # pop option
sbt_engine = load_engine(kvu_cfg)
kvu_cfg["option"] = option
@tracer.start_as_current_span("sbt_predict")
def sbt_predict(image_url, engine, metadata={}) -> None:
req = urllib.request.urlopen(image_url)
arr = np.asarray(bytearray(req.read()), dtype=np.uint8)
@ -65,6 +67,7 @@ def sbt_predict(image_url, engine, metadata={}) -> None:
os.remove(tmp_image_path)
return outputs
@tracer.start_as_current_span("predict")
def predict(page_numb, image_url, metadata={}):
"""
module predict function

View File

@ -1,71 +1,22 @@
from celery_worker.worker_fi import app
from celery_worker.client_connector_fi import CeleryConnector
from common.process_pdf import compile_output_sbt
from .task_warpper import VerboseTask
import logging
import logging.config
from opentelemetry import trace
from celery_worker.client_connector_fi import CeleryConnector
from celery_worker.worker_fi import app
from common.process_pdf import compile_output_sbt
from utils.logging.logging import LOGGER_CONFIG
from .task_warpper import VerboseTask
# Load the logging configuration
logging.config.dictConfig(LOGGER_CONFIG)
# Get the logger
logger = logging.getLogger(__name__)
tracer = trace.get_tracer()
@app.task(base=VerboseTask,name="process_fi_invoice")
def process_invoice(rq_id, list_url):
from celery_worker.client_connector_fi import CeleryConnector
from common.process_pdf import compile_output_fi
c_connector = CeleryConnector()
try:
result = compile_output_fi(list_url)
hoadon = {"status": 200, "content": result, "message": "Success"}
logger.info(hoadon)
c_connector.process_fi_invoice_result((rq_id, hoadon))
return {"rq_id": rq_id}
except Exception as e:
logger.info(e)
hoadon = {"status": 404, "content": {}}
c_connector.process_fi_invoice_result((rq_id, hoadon))
return {"rq_id": rq_id}
@app.task(base=VerboseTask,name="process_sap_invoice")
def process_sap_invoice(rq_id, list_url):
from celery_worker.client_connector_fi import CeleryConnector
from common.process_pdf import compile_output
logger.info(list_url)
c_connector = CeleryConnector()
try:
result = compile_output(list_url)
hoadon = {"status": 200, "content": result, "message": "Success"}
c_connector.process_sap_invoice_result((rq_id, hoadon))
return {"rq_id": rq_id}
except Exception as e:
logger.info(e)
hoadon = {"status": 404, "content": {}}
c_connector.process_sap_invoice_result((rq_id, hoadon))
return {"rq_id": rq_id}
@app.task(base=VerboseTask,name="process_manulife_invoice")
def process_manulife_invoice(rq_id, list_url):
from celery_worker.client_connector_fi import CeleryConnector
from common.process_pdf import compile_output_manulife
# TODO: simply returning 200 and 404 doesn't make any sense
c_connector = CeleryConnector()
try:
result = compile_output_manulife(list_url)
hoadon = {"status": 200, "content": result, "message": "Success"}
logger.info(hoadon)
c_connector.process_manulife_invoice_result((rq_id, hoadon))
return {"rq_id": rq_id}
except Exception as e:
logger.info(e)
hoadon = {"status": 404, "content": {}}
c_connector.process_manulife_invoice_result((rq_id, hoadon))
return {"rq_id": rq_id}
@app.task(base=VerboseTask,name="process_sbt_invoice")
@app.task(base=VerboseTask,name="process_sbt_invoice", track_started=True)
def process_sbt_invoice(rq_id, list_url, metadata):
# TODO: simply returning 200 and 404 doesn't make any sense
c_connector = CeleryConnector()

View File

@ -1,11 +1,47 @@
from celery import Celery
from kombu import Queue, Exchange
import environ
from celery import Celery
from celery.signals import worker_process_init
from kombu import Exchange, Queue
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.http.metric_exporter import \
OTLPMetricExporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
env = environ.Env(
DEBUG=(bool, False)
)
debug = env.str("DEBUG", False)
tracer_endpoint = env.str("tracer_endpoint", "http://jaeger_collector:4318")
service_name = "sbt_celery_ai"
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
CeleryInstrumentor().instrument()
span_exporter = OTLPSpanExporter(endpoint=f"{tracer_endpoint}/v1/traces")
processor = BatchSpanProcessor(span_exporter=span_exporter)
attributes = {SERVICE_NAME: service_name}
resource = Resource(attributes=attributes)
trace_provider = TracerProvider(resource=resource)
trace_provider.add_span_processor(span_processor=processor)
trace.set_tracer_provider(tracer_provider=trace_provider)
reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics"))
meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
metrics.set_meter_provider(meter_provider=meter_provider)
app: Celery = Celery(
"postman",
broker= env.str("CELERY_BROKER", "amqp://test:test@rabbitmq:5672"),

View File

@ -1,23 +1,17 @@
import os
import json
import time
from common import json2xml
from common.json2xml import convert_key_names, replace_xml_values
from common.utils_kvu.split_docs import split_docs, merge_sbt_output
# from api.OCRBase.prediction import predict as ocr_predict
# from api.Kie_Invoice_AP.prediction_sap import predict
# from api.Kie_Invoice_AP.prediction_fi import predict_fi
# from api.manulife.predict_manulife import predict as predict_manulife
from api.sdsap_sbt.prediction_sbt import predict as predict_sbt
import logging
import logging.config
import os
import time
from opentelemetry import trace
from api.sdsap_sbt.prediction_sbt import predict as predict_sbt
from common.utils_kvu.split_docs import merge_sbt_output
from utils.logging.logging import LOGGER_CONFIG
# Load the logging configuration
logging.config.dictConfig(LOGGER_CONFIG)
# Get the logger
logger = logging.getLogger(__name__)
tracer = trace.get_tracer()
os.environ['PYTHONPATH'] = '/home/thucpd/thucpd/cope2n-ai/cope2n-ai/'
@ -27,54 +21,6 @@ def check_label_exists(array, target_label):
return True # Label exists in the array
return False # Label does not exist in the array
def compile_output(list_url):
"""_summary_
Args:
pdf_extracted (list): list: [{
"1": url},{"2": url},
...]
Raises:
NotImplementedError: _description_
Returns:
dict: output compiled
"""
results = {
"model":{
"name":"Invoice",
"confidence": 1.0,
"type": "finance/invoice",
"isValid": True,
"shape": "letter",
}
}
compile_outputs = []
compiled = []
for page in list_url:
output_model = predict(page['page_number'], page['file_url'])
for field in output_model['fields']:
if field['value'] != "" and not check_label_exists(compiled, field['label']):
element = {
'label': field['label'],
'value': field['value'],
}
compiled.append(element)
elif field['label'] == 'table' and check_label_exists(compiled, "table"):
for index, obj in enumerate(compiled):
if obj['label'] == 'table':
compiled[index]['value'].append(field['value'])
compile_output = {
'page_index': page['page_number'],
'request_file_id': page['request_file_id'],
'fields': output_model['fields']
}
compile_outputs.append(compile_output)
results['combine_results'] = compiled
results['pages'] = compile_outputs
return results
def update_null_values(kvu_result, next_kvu_result):
for key, value in kvu_result.items():
if value is None and next_kvu_result.get(key) is not None:
@ -86,127 +32,7 @@ def replace_empty_null_values(my_dict):
my_dict[key] = None
return my_dict
def compile_output_fi(list_url):
"""_summary_
Args:
pdf_extracted (list): list: [{
"1": url},{"2": url},
...]
Raises:
NotImplementedError: _description_
Returns:
dict: output compiled
"""
results = {
"model":{
"name":"Invoice",
"confidence": 1.0,
"type": "finance/invoice",
"isValid": True,
"shape": "letter",
}
}
# Loop through the list_url to update kvu_result
for i in range(len(list_url) - 1):
page = list_url[i]
next_page = list_url[i + 1]
kvu_result, output_kie = predict_fi(page['page_number'], page['file_url'])
next_kvu_result, next_output_kie = predict_fi(next_page['page_number'], next_page['file_url'])
update_null_values(kvu_result, next_kvu_result)
output_kie = replace_empty_null_values(output_kie)
next_output_kie = replace_empty_null_values(next_output_kie)
update_null_values(output_kie, next_output_kie)
# Handle the last item in the list_url
if list_url:
page = list_url[-1]
kvu_result, output_kie = predict_fi(page['page_number'], page['file_url'])
converted_dict = convert_key_names(kvu_result)
converted_dict.update(convert_key_names(output_kie))
output_fi = replace_xml_values(json2xml.xml_template3, converted_dict)
field_fi = {
"xml": output_fi,
}
results['combine_results'] = field_fi
# results['combine_results'] = converted_dict
# results['combine_results_kie'] = output_kie
return results
def compile_output_ocr_base(list_url):
"""Compile output of OCRBase
Args:
list_url (list): List string url of image
Returns:
dict: dict of output
"""
results = {
"model":{
"name":"OCRBase",
"confidence": 1.0,
"type": "ocrbase",
"isValid": True,
"shape": "letter",
}
}
compile_outputs = []
for page in list_url:
output_model = ocr_predict(page['page_number'], page['file_url'])
compile_output = {
'page_index': page['page_number'],
'request_file_id': page['request_file_id'],
'fields': output_model['fields']
}
compile_outputs.append(compile_output)
results['pages'] = compile_outputs
return results
def compile_output_manulife(list_url):
"""_summary_
Args:
pdf_extracted (list): list: [{
"1": url},{"2": url},
...]
Raises:
NotImplementedError: _description_
Returns:
dict: output compiled
"""
results = {
"model":{
"name":"Invoice",
"confidence": 1.0,
"type": "finance/invoice",
"isValid": True,
"shape": "letter",
}
}
outputs = []
for page in list_url:
output_model = predict_manulife(page['page_number'], page['file_url']) # gotta be predict_manulife(), for the time being, this function is not avaible, we just leave a dummy function here instead
logger.info("output_model", output_model)
outputs.append(output_model)
logger.info("outputs", outputs)
documents = split_docs(outputs)
logger.info("documents", documents)
results = {
"total_pages": len(list_url),
"ocr_num_pages": len(list_url),
"document": documents
}
return results
@tracer.start_as_current_span("compile_output_sbt")
def compile_output_sbt(list_url, metadata):
"""_summary_
@ -237,6 +63,7 @@ def compile_output_sbt(list_url, metadata):
outputs = []
start = time.time()
pages_predict_time = []
for page in list_url:
output_model = predict_sbt(page['page_number'], page['file_url'], metadata)
pages_predict_time.append(time.time())
@ -244,7 +71,9 @@ def compile_output_sbt(list_url, metadata):
output_model['doc_type'] = page['doc_type']
outputs.append(output_model)
start_postprocess = time.time()
documents = merge_sbt_output(outputs)
inference_profile["postprocess"] = [start_postprocess, time.time()]
inference_profile["inference"] = [start, pages_predict_time]
results = {
@ -254,16 +83,3 @@ def compile_output_sbt(list_url, metadata):
"inference_profile": inference_profile
}
return results
def main():
"""
main function
"""
list_url = [{"file_url": "https://www.irs.gov/pub/irs-pdf/fw9.pdf", "page_number": 1, "request_file_id": 1}, ...]
results = compile_output(list_url)
with open('output.json', 'w', encoding='utf-8') as file:
json.dump(results, file, ensure_ascii=False, indent=4)
if __name__ == "__main__":
main()

View File

@ -2,7 +2,11 @@ import os
import glob
import json
from tqdm import tqdm
from opentelemetry import trace
tracer = trace.get_tracer()
@tracer.start_as_current_span("longestCommonSubsequence")
def longestCommonSubsequence(text1: str, text2: str) -> int:
# https://leetcode.com/problems/longest-common-subsequence/discuss/351689/JavaPython-3-Two-DP-codes-of-O(mn)-and-O(min(m-n))-spaces-w-picture-and-analysis
dp = [[0] * (len(text2) + 1) for _ in range(len(text1) + 1)]
@ -12,21 +16,25 @@ def longestCommonSubsequence(text1: str, text2: str) -> int:
dp[i][j] if c == d else max(dp[i][j + 1], dp[i + 1][j])
return dp[-1][-1]
@tracer.start_as_current_span("write_to_json")
def write_to_json(file_path, content):
with open(file_path, mode="w", encoding="utf8") as f:
json.dump(content, f, ensure_ascii=False)
@tracer.start_as_current_span("read_json")
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
@tracer.start_as_current_span("check_label_exists")
def check_label_exists(array, target_label):
for obj in array:
if obj["label"] == target_label:
return True # Label exists in the array
return False # Label does not exist in the array
@tracer.start_as_current_span("merged_kvu_outputs")
def merged_kvu_outputs(loutputs: list) -> dict:
compiled = []
for output_model in loutputs:
@ -44,6 +52,7 @@ def merged_kvu_outputs(loutputs: list) -> dict:
return compiled
@tracer.start_as_current_span("split_docs")
def split_docs(doc_data: list, threshold: float=0.6) -> list:
num_pages = len(doc_data)
outputs = []
@ -91,8 +100,10 @@ def split_docs(doc_data: list, threshold: float=0.6) -> list:
return outputs
@tracer.start_as_current_span("merge_sbt_output")
def merge_sbt_output(loutputs):
# TODO: This function is too circumlocutory, need to refactor the whole flow
@tracer.start_as_current_span("dict_to_list_of_dict")
def dict_to_list_of_dict(the_dict):
output = []
for k,v in the_dict.items():

View File

@ -12,3 +12,15 @@ imagesize==1.4.1
pdf2image==1.17.0
redis==5.0.1
celery==5.3.6
opentelemetry-api==1.27.0
opentelemetry-distro==0.48b0
opentelemetry-exporter-otlp==1.27.0
opentelemetry-exporter-otlp-proto-common==1.27.0
opentelemetry-exporter-otlp-proto-http==1.27.0
opentelemetry-instrumentation==0.48b0
opentelemetry-instrumentation-celery==0.48b0
opentelemetry-proto==1.27.0
opentelemetry-sdk==1.27.0
opentelemetry-semantic-conventions==0.48b0
opentelemetry-util-http==0.48b0