"""Post-processing helpers that turn raw KVU outputs into the SBT result fields."""

import logging
import re

from sdsvkvu.utils.post_processing import (
    longestCommonSubsequence,
    longest_common_substring,
    preprocessing,
    parse_date,
    split_key_value_by_colon,
    normalize_imei,
    normalize_website,
    normalize_hotline,
    normalize_seller,
    normalize_voucher,
)
from sdsvkvu.utils.dictionary.sbt_v2 import get_dict

logger = logging.getLogger(__name__)

# Normalizer applied to the raw text of each field that can carry seller
# information.  The iteration order of this table is the order in which
# ``get_seller`` processes the fields.
_SELLER_FIELD_NORMALIZERS = {
    "seller": normalize_seller,
    "website": normalize_website,
    "hotline": normalize_hotline,
    "voucher": normalize_voucher,
    # NOTE(review): this module imports no ``normalize_sold_by``; the previous
    # name-based lookup raised NameError for every "sold_by" entry.  Falling
    # back to ``normalize_seller`` is an assumption -- confirm the intended
    # normalizer.
    "sold_by": normalize_seller,
}


def _split_key_value(obj, key_class, value_class):
    """Split a detected object into ``(key, value)`` texts.

    When the object carries a ``raw_key_name`` it is paired with the object's
    text; otherwise the text is used as the key or as the value depending on
    the object's ``class``.
    """
    if "raw_key_name" in obj:
        return split_key_value_by_colon(obj["raw_key_name"], obj["text"])
    return split_key_value_by_colon(
        obj["text"] if obj["class"] == key_class else None,
        obj["text"] if obj["class"] == value_class else None,
    )


def _similarity(processed_text, candidate):
    """Return the best LCS ratio (subsequence or substring) of two strings.

    The ratio is the longer of the two common lengths divided by the length of
    the longer input; two empty inputs score 0.0 instead of dividing by zero.
    """
    longest = max(len(candidate), len(processed_text))
    if longest == 0:
        return 0.0
    subsequence = longestCommonSubsequence(processed_text, candidate)
    substring = longest_common_substring(processed_text, candidate)
    return max(subsequence, substring) / longest


def post_process_date(list_dates):
    """Return the parsed date of the candidate with the highest ``lcs_score``.

    Returns None when there is no candidate or when the best candidate's
    content is not a string.
    """
    if not list_dates:
        return None
    best = max(list_dates, key=lambda x: x["lcs_score"])
    content = best["content"]
    if not isinstance(content, str):
        return None
    # parse_date also reports a parser-error flag, which is not used here.
    date_formatted, _is_parser_error = parse_date(content)
    return date_formatted


def post_process_serial(list_serials):
    """Return the stripped content of the serial candidate with the best score.

    Candidates whose content is not a string are ignored (consistent with
    ``post_process_imei``); returns None when no usable candidate remains.
    """
    candidates = [
        item for item in (list_serials or ()) if isinstance(item["content"], str)
    ]
    if not candidates:
        return None
    best = max(candidates, key=lambda x: x["lcs_score"])
    return best["content"].strip()


def post_process_imei(list_imeis):
    """Return the IMEI candidate with the lowest ``token_id``.

    A candidate is kept only when its content, with spaces removed, consists
    solely of digits and is longer than 5 characters.  Returns None when no
    candidate qualifies.
    """
    imeis = []
    for candidate in list_imeis:
        content = candidate["content"]
        if not isinstance(content, str):
            continue
        imei = content.replace(" ", "")
        if imei.isdigit() and len(imei) > 5:
            imeis.append({"content": imei, "token_id": candidate["token_id"]})
    if not imeis:
        return None
    # min() returns the first minimal element, like a stable sort followed by [0].
    return min(imeis, key=lambda x: int(x["token_id"]))["content"]


def post_process_qty(inp_str: str) -> str:
    """Return the first run of digits found in ``inp_str``.

    The input is returned unchanged when it contains no digit or is not a
    string.
    """
    if not isinstance(inp_str, str):
        return inp_str
    # ``\d+`` keeps multi-digit quantities intact ("12 pcs" -> "12").
    match = re.search(r"\d+", inp_str)
    return match.group() if match else inp_str


def post_process_seller(list_sellers):
    """Pick the seller by score-weighted voting and map it to a canonical name.

    Each candidate's ``lcs_score`` is added to the vote of its ``content``.
    The winner is compared with the ``seller_mapping`` dictionary: an exact
    match (after ``preprocessing``) replaces it with the normalized name;
    otherwise known aliases contained in it are substituted.  The result is
    stripped and title-cased.  Returns None when there are no candidates.
    """
    seller_mapping = get_dict(type="seller_mapping")

    vote_list = {}
    for seller in list_sellers:
        seller_name = seller["content"]
        vote_list[seller_name] = vote_list.get(seller_name, 0) + seller["lcs_score"]
    if not vote_list:
        return None

    selected_value = max(vote_list, key=lambda x: vote_list[x])  # majority vote
    for norm_seller, candidates in seller_mapping.items():
        processed_selected = preprocessing(selected_value)
        if any(preprocessing(txt) == processed_selected for txt in candidates):
            selected_value = norm_seller
            break

        # NOTE(review): the nesting of this substitution step inside the
        # mapping loop was reconstructed from flattened source -- confirm.
        selected_value = selected_value.lower()
        for txt in candidates:
            txt = txt.lower()
            if txt in selected_value:
                selected_value = selected_value.replace(txt, norm_seller)
    return selected_value.strip().title()


def post_process_subsidiary(list_subsidiaries):
    """Return the content of the subsidiary candidate with the best score.

    Returns None when there are no candidates.
    """
    if not list_subsidiaries:
        return None
    best = max(list_subsidiaries, key=lambda x: x["lcs_score"])
    return best["content"]


def sbt_key_matching(text: str, threshold: float, dict_type: str):
    """Match ``text`` against the keys of the dictionary named ``dict_type``.

    Every dictionary key is scored with the best similarity between the
    preprocessed text and any of the key's candidate strings.

    Returns:
        A tuple ``(key_or_text, score, processed_text)`` where ``key_or_text``
        is the best key when its score reaches ``threshold`` and the original
        ``text`` otherwise.
    """
    dictionary = get_dict(type=dict_type)
    processed_text = preprocessing(text)
    if not dictionary:
        return text, 0.0, processed_text

    scores = {
        key_name: max(
            (_similarity(processed_text, candidate) for candidate in candidates),
            default=0.0,
        )
        for key_name, candidates in dictionary.items()
    }
    best_key, best_score = max(scores.items(), key=lambda x: x[1])
    matched = best_key if best_score >= threshold else text
    return matched, best_score, processed_text


def get_date_value(list_dates):
    """Build scored date candidates from the detected date objects.

    A value without a key is scored with the length of its text; otherwise the
    score is the key-matching score against the "date" dictionary.
    """
    date_outputs = []
    for date_obj in list_dates:
        date_key, date_value = _split_key_value(date_obj, "date_key", "date_value")

        if date_key is None and date_obj["class"] == "date_value":
            date_value = date_obj["text"]
            processed_text = ""
            score = len(date_value) if isinstance(date_value, str) else 0
        else:
            _key_name, score, processed_text = sbt_key_matching(
                date_key, threshold=0.8, dict_type="date"
            )

        date_outputs.append(
            {
                "content": date_value,
                "processed_key_name": processed_text,
                "lcs_score": score,
                "token_id": date_obj["id"],
            }
        )
    return date_outputs


def get_serial_imei(list_sn):
    """Sort serial/IMEI objects into ``serial_number`` and ``imei_number`` lists.

    A value without a key gets a fixed score of 0.8 and is classified by its
    content: IMEI when ``normalize_imei`` yields only digits, serial number
    otherwise.  A value with a key is classified by the matched key of the
    "imei" dictionary and dropped when the key matches neither category.
    """
    sn_outputs = {"serial_number": [], "imei_number": []}
    for sn_obj in list_sn:
        sn_key, sn_value = _split_key_value(sn_obj, "sn_key", "sn_value")

        if sn_key is None and sn_obj["class"] == "sn_value":
            sn_value = sn_obj["text"]
            key_name, processed_text, score = None, "", 0.8
        else:
            key_name, score, processed_text = sbt_key_matching(
                sn_key, threshold=0.8, dict_type="imei"
            )

        value = {
            "content": sn_value,
            "processed_key_name": processed_text,
            "lcs_score": score,
            "token_id": sn_obj["id"],
        }
        if key_name is None:
            if normalize_imei(sn_value).isdigit():
                sn_outputs["imei_number"].append(value)
            else:
                sn_outputs["serial_number"].append(value)
        elif key_name in ("imei_number", "serial_number"):
            sn_outputs[key_name].append(value)
    return sn_outputs


def get_product_info(list_items):
    """Flatten table rows into dicts holding the first text of each column.

    Empty columns map to None; the ``qty`` column is reduced to its digits
    with ``post_process_qty``.
    """
    table = []
    for row in list_items:
        item = {}
        for key, value in row.items():
            if not value:
                item[key] = None
                continue
            first_text = value[0]["text"]
            item[key] = post_process_qty(first_text) if key == "qty" else first_text
        table.append(item)
    return table


def get_seller(outputs):
    """Collect seller candidates from the seller-related fields of ``outputs``.

    The text of every entry in the seller, website, hotline, voucher and
    sold_by fields is normalized and matched against the "seller" dictionary.
    Afterwards every voucher text is prepended to each candidate whose content
    does not already contain it.
    """
    threshold = 0.7
    seller_outputs = []
    voucher_info = []
    for key_field, normalize in _SELLER_FIELD_NORMALIZERS.items():
        for potential_seller in outputs[key_field]:
            raw_text = potential_seller["text"]
            seller_name, score, processed_text = sbt_key_matching(
                normalize(raw_text), threshold=threshold, dict_type="seller"
            )
            logger.debug(
                "%s ==> %s ==> %s : %s", raw_text, processed_text, seller_name, score
            )

            if key_field == "voucher":
                voucher_info.append(raw_text)
            seller_outputs.append(
                {
                    "content": seller_name,
                    "raw_seller_name": raw_text,
                    "processed_seller_name": processed_text,
                    "lcs_score": score,
                    "info": key_field,
                }
            )

    for voucher in voucher_info:
        voucher_lower = voucher.lower()
        for seller_output in seller_outputs:
            if voucher_lower not in seller_output["content"].lower():
                seller_output["content"] = f"{voucher} {seller_output['content']}"
    return seller_outputs


def get_subsidiary(list_subsidiaries):
    """Split sold-to objects into subsidiary candidates and sold-by objects.

    Returns:
        A tuple ``(subsidiary_outputs, sold_by_info)``: scored subsidiary
        candidates, and the raw objects whose key matched ``sold_by``.
    """
    subsidiary_outputs = []
    sold_by_info = []
    for sold_obj in list_subsidiaries:
        sold_key, sold_value = _split_key_value(sold_obj, "sold_key", "sold_value")

        if sold_key is None and sold_obj["class"] == "sold_value":
            sold_value = sold_obj["text"]
            key_name, processed_text, score = "unknown", "", 0.8
        else:
            key_name, score, processed_text = sbt_key_matching(
                sold_key, threshold=0.8, dict_type="sold_by"
            )

        if key_name == "sold_by":
            sold_by_info.append(sold_obj)
        else:
            subsidiary_outputs.append(
                {
                    "content": sold_value,
                    "processed_key_name": processed_text,
                    "lcs_score": score,
                    "token_id": sold_obj["id"],
                }
            )
    return subsidiary_outputs, sold_by_info


def export_kvu_for_SBT(outputs):
    """Build the final SBT fields from the raw KVU ``outputs`` of one page.

    Note that ``outputs['sold_by']`` is overwritten in place with the sold-by
    objects found among the sold-to values.

    Returns:
        A dict with ``retailername``, ``sold_to_party``, ``purchase_date`` and
        ``imei_number`` (the serial number is used when no IMEI is found).
    """
    # Sold-to party; objects keyed as "sold by" feed the seller voting instead.
    list_subsidiaries, sold_by_info = get_subsidiary(outputs["sold_value"])
    outputs["sold_by"] = sold_by_info
    list_sellers = get_seller(outputs)

    list_dates = get_date_value(outputs["date_value"])

    list_serial_imei = get_serial_imei(outputs["serial_imei"])
    serial_number = post_process_serial(list_serial_imei["serial_number"])
    imei_number = post_process_imei(list_serial_imei["imei_number"])

    # NOTE: the product table (see get_product_info) is currently not exported.
    return {
        "retailername": post_process_seller(list_sellers),
        "sold_to_party": post_process_subsidiary(list_subsidiaries),
        "purchase_date": post_process_date(list_dates),
        "imei_number": imei_number if imei_number is not None else serial_number,
    }


def merged_kvu_for_SBT_for_multi_pages(lvat_outputs: list):
    """Merge per-page SBT outputs into a single result.

    Table rows of all pages are concatenated.  Every other field takes the
    value of the first page that provides the field, or None when no page
    provides it.
    """
    merged_outputs = {k: [] for k in get_dict("key").keys()}
    merged_outputs["table"] = []
    for outputs in lvat_outputs:
        for key_name, value in outputs.items():
            if key_name == "table":
                merged_outputs["table"].extend(value or [])
            else:
                # setdefault tolerates fields missing from the key dictionary.
                merged_outputs.setdefault(key_name, []).append(value)

    # NOTE(review): the first page wins even when its value is None -- confirm
    # whether the first non-None value is what callers expect.
    for key, values in merged_outputs.items():
        if key != "table":
            merged_outputs[key] = values[0] if values else None
    return merged_outputs