sbt-idp/cope2n-ai-fi/modules/_sdsvkvu/sdsvkvu/utils/query/sbt_v2.py
2023-11-30 18:22:16 +07:00

320 lines
11 KiB
Python

import re
from sdsvkvu.utils.post_processing import (
longestCommonSubsequence,
longest_common_substring,
preprocessing,
parse_date,
split_key_value_by_colon,
normalize_imei,
normalize_website,
normalize_hotline,
normalize_seller,
normalize_voucher
)
from sdsvkvu.utils.dictionary.sbt_v2 import get_dict
def post_process_date(list_dates):
    """Pick the best-scoring date candidate and return its parsed form.

    Args:
        list_dates: Candidate dicts, each carrying a ``"content"`` entry and
            a comparable ``"lcs_score"`` entry.

    Returns:
        The first element of the pair returned by ``parse_date`` for the
        winning candidate, or ``None`` when there are no candidates or the
        winning candidate's content is not a string.
    """
    if len(list_dates) == 0:
        return None

    # The candidate with the highest LCS score wins.
    best = max(list_dates, key=lambda candidate: candidate["lcs_score"])
    raw_date = best["content"]
    if isinstance(raw_date, str):
        # parse_date also reports a parser-error flag; it is not used here.
        parsed_date, _parser_error = parse_date(raw_date)
        return parsed_date
    return None
def post_process_serial(list_serials):
    """Return the serial number of the best-scoring usable candidate.

    Candidates whose ``"content"`` is not a string are ignored, so a
    key-only detection with no value can no longer make the ``.strip()``
    call fail with ``AttributeError``.

    Args:
        list_serials: Candidate dicts, each carrying a ``"content"`` entry
            and a comparable ``"lcs_score"`` entry.

    Returns:
        The stripped content of the string candidate with the highest
        ``"lcs_score"``, or ``None`` when no string candidate exists.
    """
    usable = [
        candidate
        for candidate in list_serials
        if isinstance(candidate["content"], str)
    ]
    if not usable:
        return None
    # The candidate with the highest LCS score wins.
    best = max(usable, key=lambda candidate: candidate["lcs_score"])
    return best["content"].strip()
def post_process_imei(list_imeis):
    """Return the earliest plausible IMEI among the candidates.

    A candidate is kept when its content is a string that, once spaces are
    removed, consists only of digits and is longer than five characters.

    Args:
        list_imeis: Candidate dicts, each carrying a ``"content"`` entry and
            a ``"token_id"`` entry convertible with ``int``.

    Returns:
        The space-free content of the kept candidate with the smallest
        ``int(token_id)`` (the first one on ties), or ``None`` when no
        candidate qualifies.
    """
    kept = []
    for candidate in list_imeis:
        raw = candidate["content"]
        if not isinstance(raw, str):
            continue
        digits = raw.replace(" ", "")
        # Only all-digit strings with more than five characters qualify.
        if len(digits) > 5 and digits.isdigit():
            kept.append({"content": digits, "token_id": candidate["token_id"]})

    if len(kept) == 0:
        return None
    earliest = min(kept, key=lambda entry: int(entry["token_id"]))
    return earliest["content"].strip()
def post_process_qty(inp_str: str) -> str:
    """Extract the quantity number from a raw quantity string.

    The first run of consecutive digits is returned, so multi-digit
    quantities such as ``"12 pcs"`` yield ``"12"`` rather than only their
    first digit.

    Args:
        inp_str: Raw quantity text.

    Returns:
        The first run of digits found in ``inp_str``, or ``inp_str``
        unchanged when it contains no digit.
    """
    match = re.search(r"\d+", inp_str)
    return match.group() if match else inp_str
def post_process_seller(list_sellers):
    """Choose a retailer name by score-weighted voting and normalise it.

    Each candidate adds its ``"lcs_score"`` to the tally of its
    ``"content"``; the content with the largest total is selected. The
    selection is then compared against the ``seller_mapping`` dictionary
    (canonical name -> list of known spellings) to rewrite it towards a
    canonical name.

    Args:
        list_sellers: Candidate dicts, each carrying a ``"content"`` entry
            and a numeric ``"lcs_score"`` entry.

    Returns:
        The selected name, stripped and title-cased, or ``None`` when
        ``list_sellers`` contributes no candidate.
    """
    seller_mapping = get_dict(type="seller_mapping")
    # Tally: seller name -> sum of the LCS scores of all candidates with that name.
    vote_list = {}
    for seller in list_sellers:
        seller_name = seller['content']
        if seller_name not in vote_list:
            vote_list[seller_name] = 0
        vote_list[seller_name] += seller['lcs_score']
    if len(vote_list) > 0:
        selected_value = max(
            vote_list, key=lambda x: vote_list[x]
        )  # Name with the highest accumulated score wins the vote
        for norm_seller, candidates in seller_mapping.items():
            # A full match (after preprocessing) with any known spelling
            # replaces the selection with the canonical name and stops.
            if any(preprocessing(txt) == preprocessing(selected_value) for txt in candidates):
                selected_value = norm_seller
                break
            # Otherwise, substitute any known spelling that occurs inside
            # the lower-cased selection with the canonical name.
            # NOTE(review): selected_value is rewritten on every
            # non-matching mapping entry, so later entries are compared
            # against the already lower-cased/substituted text - confirm
            # this is intended.
            selected_value = selected_value.lower()
            for txt in candidates:
                txt = txt.lower()
                if txt in selected_value:
                    selected_value = selected_value.replace(txt, norm_seller)
        return selected_value.strip().title()
    return None
def post_process_subsidiary(list_subsidiaries):
    """Return the content of the best-scoring subsidiary candidate.

    Args:
        list_subsidiaries: Candidate dicts, each carrying a ``"content"``
            entry and a comparable ``"lcs_score"`` entry.

    Returns:
        The ``"content"`` of the candidate with the highest
        ``"lcs_score"`` (unmodified), or ``None`` when there are no
        candidates.
    """
    if len(list_subsidiaries) == 0:
        return None
    best = max(list_subsidiaries, key=lambda candidate: candidate["lcs_score"])
    return best["content"]
def sbt_key_matching(text: str, threshold: float, dict_type: str):
    """Match ``text`` against the entries of a key dictionary.

    For every dictionary entry the preprocessed text is compared with each
    of the entry's spellings using both ``longestCommonSubsequence`` and
    ``longest_common_substring``; each result is divided by the longer of
    the two string lengths. The entry's score is the larger of the two
    best ratios.

    Args:
        text: Raw key text to match.
        threshold: Minimum score for the best entry to be accepted.
        dict_type: Name of the dictionary passed to ``get_dict``.

    Returns:
        A tuple ``(match, score, processed_text)`` where ``match`` is the
        best entry's name when its score reaches ``threshold`` and the
        original ``text`` otherwise.
    """
    dictionary = get_dict(type=dict_type)
    processed_text = preprocessing(text)

    def _best_ratio(similarity, spellings):
        # Highest length-normalised similarity over all spellings of one entry.
        return max([
            similarity(processed_text, spelling)
            / max(len(spelling), len(processed_text))
            for spelling in spellings
        ])

    scores = {name: 0.0 for name in dictionary}
    for name, spellings in dictionary.items():
        subsequence_ratio = _best_ratio(longestCommonSubsequence, spellings)
        substring_ratio = _best_ratio(longest_common_substring, spellings)
        if subsequence_ratio > substring_ratio:
            scores[name] = subsequence_ratio
        else:
            scores[name] = substring_ratio

    best_name, best_score = max(scores.items(), key=lambda pair: pair[1])
    matched = best_name if best_score >= threshold else text
    return matched, best_score, processed_text
def get_date_value(list_dates):
    """Turn raw date detections into scored date candidates.

    Each detection is split into a key part and a value part with
    ``split_key_value_by_colon``. A value-only detection (no key, class
    ``"date_value"``) is scored by the length of its text; any other
    detection is scored by matching its key against the ``"date"``
    dictionary.

    Args:
        list_dates: Detection dicts with ``"text"``, ``"class"`` and
            ``"id"`` entries and optionally ``"raw_key_name"``.

    Returns:
        A list of dicts with ``"content"``, ``"processed_key_name"``,
        ``"lcs_score"`` and ``"token_id"`` entries, one per detection.
    """
    candidates = []
    for entry in list_dates:
        if "raw_key_name" in entry:
            raw_key, raw_value = entry["raw_key_name"], entry["text"]
        else:
            # Without a paired key, the text is either the key or the value.
            raw_key = entry["text"] if entry["class"] == "date_key" else None
            raw_value = entry["text"] if entry["class"] == "date_value" else None
        key_text, value_text = split_key_value_by_colon(raw_key, raw_value)

        if key_text is None and entry["class"] == "date_value":
            value_text = entry["text"]
            processed_key = ""
            score = len(value_text) if isinstance(value_text, str) else 0
        else:
            _matched_key, score, processed_key = sbt_key_matching(
                key_text, threshold=0.8, dict_type="date"
            )

        candidates.append({
            "content": value_text,
            "processed_key_name": processed_key,
            "lcs_score": score,
            "token_id": entry["id"],
        })
    return candidates
def get_serial_imei(list_sn):
    """Sort raw serial/IMEI detections into serial and IMEI candidates.

    Each detection is split into a key part and a value part with
    ``split_key_value_by_colon``. A value-only detection (no key, class
    ``"sn_value"``) gets a fixed score of 0.8 and is classified by whether
    ``normalize_imei`` of its text is all digits. Any other detection is
    classified by matching its key against the ``"imei"`` dictionary and
    is dropped when the match is neither ``"imei_number"`` nor
    ``"serial_number"``.

    Args:
        list_sn: Detection dicts with ``"text"``, ``"class"`` and ``"id"``
            entries and optionally ``"raw_key_name"``.

    Returns:
        A dict with ``"serial_number"`` and ``"imei_number"`` lists of
        candidate dicts (``"content"``, ``"processed_key_name"``,
        ``"lcs_score"``, ``"token_id"``).
    """
    grouped = {"serial_number": [], "imei_number": []}
    for entry in list_sn:
        if "raw_key_name" in entry:
            raw_key, raw_value = entry["raw_key_name"], entry["text"]
        else:
            # Without a paired key, the text is either the key or the value.
            raw_key = entry["text"] if entry["class"] == "sn_key" else None
            raw_value = entry["text"] if entry["class"] == "sn_value" else None
        key_text, value_text = split_key_value_by_colon(raw_key, raw_value)

        if key_text is None and entry["class"] == "sn_value":
            value_text = entry["text"]
            matched_key = None
            processed_key = ""
            score = 0.8
        else:
            matched_key, score, processed_key = sbt_key_matching(
                key_text, threshold=0.8, dict_type="imei"
            )

        candidate = {
            "content": value_text,
            "processed_key_name": processed_key,
            "lcs_score": score,
            "token_id": entry["id"],
        }

        if matched_key is None:
            # No key available: decide from the shape of the value itself.
            if normalize_imei(value_text).isdigit():
                bucket = "imei_number"
            else:
                bucket = "serial_number"
        elif matched_key in ["imei_number", "serial_number"]:
            bucket = matched_key
        else:
            continue
        grouped[bucket].append(candidate)
    return grouped
def get_product_info(list_items):
    """Flatten table rows to the text of the first detection per column.

    Args:
        list_items: Rows, each a dict mapping a column name to a list of
            detection dicts that carry a ``"text"`` entry.

    Returns:
        A list with one dict per row mapping every column name to the
        first detection's text (passed through ``post_process_qty`` for
        the ``"qty"`` column), or to ``None`` when the column has no
        detections.
    """
    table = []
    for row in list_items:
        item = {}
        for column, detections in row.items():
            if len(detections) == 0:
                item[column] = None
                continue
            first_text = detections[0]["text"]
            if column == "qty":
                item[column] = post_process_qty(first_text)
            else:
                item[column] = first_text
        table.append(item)
    return table
def get_seller(outputs):
    """Build scored seller candidates from the seller-related fields.

    The ``"seller"``, ``"website"``, ``"hotline"``, ``"voucher"`` and
    ``"sold_by"`` lists of ``outputs`` are read in that order. Each
    detection's text is normalised with the field's normaliser and matched
    against the ``"seller"`` dictionary with a threshold of 0.7. Finally,
    every voucher text is prefixed to each candidate whose content does
    not already contain it (case-insensitively).

    Args:
        outputs: Dict providing, for each of the five fields above, a list
            of detection dicts that carry a ``"text"`` entry.

    Returns:
        A list of dicts with ``"content"``, ``"raw_seller_name"``,
        ``"processed_seller_name"``, ``"lcs_score"`` and ``"info"`` (the
        originating field name) entries.
    """
    # Explicit dispatch table instead of eval() on a constructed name.
    # NOTE(review): this module imports no normalize_sold_by, so the former
    # eval("normalize_sold_by") raised NameError for any "sold_by" entry.
    # A module-level normalize_sold_by is still honoured if one exists;
    # otherwise the seller normaliser is used, since these entries are
    # matched as seller candidates - confirm this fallback.
    normalizers = {
        "seller": normalize_seller,
        "website": normalize_website,
        "hotline": normalize_hotline,
        "voucher": normalize_voucher,
        "sold_by": globals().get("normalize_sold_by", normalize_seller),
    }
    threshold = 0.7

    seller_outputs = []
    voucher_info = []
    for key_field, normalize in normalizers.items():
        for potential_seller in outputs[key_field]:
            raw_text = potential_seller["text"]
            seller_name, score, processed_text = sbt_key_matching(
                normalize(raw_text), threshold=threshold, dict_type="seller"
            )
            # Diagnostic output kept as-is so the emitted log lines do not change.
            print(f"{potential_seller['text']} ==> {processed_text} ==> {seller_name} : {score}")
            # Equality test: `in ("voucher")` was a substring check on a
            # plain string, not membership in a tuple.
            if key_field == "voucher":
                voucher_info.append(raw_text)
            seller_outputs.append(
                {
                    "content": seller_name,
                    "raw_seller_name": raw_text,
                    "processed_seller_name": processed_text,
                    "lcs_score": score,
                    "info": key_field,
                }
            )

    # Prefix each voucher text to every candidate that lacks it.
    for voucher in voucher_info:
        for candidate in seller_outputs:
            if voucher.lower() not in candidate["content"].lower():
                candidate["content"] = f"{voucher} {candidate['content']}"
    return seller_outputs
def get_subsidiary(list_subsidiaries):
    """Separate "sold by" detections from subsidiary candidates.

    Each detection is split into a key part and a value part with
    ``split_key_value_by_colon``. A value-only detection (no key, class
    ``"sold_value"``) gets a fixed score of 0.8; any other detection is
    matched against the ``"sold_by"`` dictionary. Detections whose key
    matches ``"sold_by"`` are returned untouched in the second list; all
    others become subsidiary candidates.

    Args:
        list_subsidiaries: Detection dicts with ``"text"``, ``"class"``
            and ``"id"`` entries and optionally ``"raw_key_name"``.

    Returns:
        A tuple ``(subsidiary_candidates, sold_by_detections)``. The
        candidates are dicts with ``"content"``, ``"processed_key_name"``,
        ``"lcs_score"`` and ``"token_id"`` entries.
    """
    subsidiary_candidates = []
    sold_by_detections = []
    for entry in list_subsidiaries:
        if "raw_key_name" in entry:
            raw_key, raw_value = entry["raw_key_name"], entry["text"]
        else:
            # Without a paired key, the text is either the key or the value.
            raw_key = entry["text"] if entry["class"] == "sold_key" else None
            raw_value = entry["text"] if entry["class"] == "sold_value" else None
        key_text, value_text = split_key_value_by_colon(raw_key, raw_value)

        if key_text is None and entry["class"] == "sold_value":
            value_text = entry["text"]
            matched_key = "unknown"
            processed_key = ""
            score = 0.8
        else:
            matched_key, score, processed_key = sbt_key_matching(
                key_text, threshold=0.8, dict_type="sold_by"
            )

        if matched_key == "sold_by":
            sold_by_detections.append(entry)
            continue
        subsidiary_candidates.append({
            "content": value_text,
            "processed_key_name": processed_key,
            "lcs_score": score,
            "token_id": entry["id"],
        })
    return subsidiary_candidates, sold_by_detections
def export_kvu_for_SBT(outputs):
    """Assemble the final SBT fields from the raw KVU outputs of one page.

    Note that ``outputs["sold_by"]`` is overwritten in place with the
    "sold by" detections found among ``outputs["sold_value"]`` before the
    seller candidates are collected.

    Args:
        outputs: Dict providing the ``"sold_value"``, ``"date_value"`` and
            ``"serial_imei"`` detection lists plus the fields read by
            ``get_seller``.

    Returns:
        A dict with ``"retailername"``, ``"sold_to_party"``,
        ``"purchase_date"`` and ``"imei_number"`` entries;
        ``"imei_number"`` falls back to the serial number when no IMEI
        was found. The product table is not exported.
    """
    # Sold-to party; "sold by" detections are handed on as seller evidence.
    subsidiaries, sold_by_detections = get_subsidiary(outputs["sold_value"])
    outputs["sold_by"] = sold_by_detections

    sellers = get_seller(outputs)
    dates = get_date_value(outputs["date_value"])

    serial_imei = get_serial_imei(outputs["serial_imei"])
    serial_number = post_process_serial(serial_imei["serial_number"])
    imei_number = post_process_imei(serial_imei["imei_number"])

    retailer_name = post_process_seller(sellers)
    sold_to_party = post_process_subsidiary(subsidiaries)
    purchase_date = post_process_date(dates)

    return {
        "retailername": retailer_name,
        "sold_to_party": sold_to_party,
        "purchase_date": purchase_date,
        "imei_number": serial_number if imei_number is None else imei_number,
    }
def merged_kvu_for_SBT_for_multi_pages(lvat_outputs: list):
    """Merge per-page SBT outputs into a single result.

    Args:
        lvat_outputs: Per-page result dicts. Every key other than
            ``"table"`` must be one of the keys of ``get_dict("key")``.

    Returns:
        A dict with one entry per key of ``get_dict("key")`` plus
        ``"table"``. ``"table"`` holds the concatenation of all pages'
        tables; every other key holds the value from the first page that
        provided that key, or ``None`` when no page did.
    """
    collected = {field: [] for field in get_dict("key").keys()}
    collected["table"] = []

    for page in lvat_outputs:
        for field, value in page.items():
            if field == "table":
                collected[field].extend(value)
            else:
                collected[field].append(value)

    # NOTE(review): the first page's value is taken even when it is None,
    # so a later page's non-None value is ignored - confirm intended.
    merged = {}
    for field, values in collected.items():
        if field == "table":
            merged[field] = values
        elif len(values) == 0:
            merged[field] = None
        else:
            merged[field] = values[0]
    return merged