186 lines
7.9 KiB
Python
186 lines
7.9 KiB
Python
|
from sdsvkvu.utils.post_processing import longestCommonSubsequence, preprocessing, is_string_in_range
|
||
|
from sdsvkvu.utils.dictionary.sbt import get_dict
|
||
|
|
||
|
# For SBT project
|
||
|
def sbt_key_matching(text: str, threshold: float, dict_type: str):
    """Map a raw key/header string onto a canonical SBT dictionary key.

    The text is normalised with ``preprocessing`` and then matched in two
    steps:

    1. Exact match against each key's candidate spellings, extended with the
       spellings the ``"extra"`` dictionary lists for the same key (if any).
    2. Otherwise the best ratio ``longestCommonSubsequence(text, candidate) /
       len(candidate)`` over the candidate spellings of the ``dict_type``
       dictionary (the ``"extra"`` spellings are not used in this step).

    Args:
        text: Raw text of the key or header to classify.
        threshold: Minimum ratio required to accept a fuzzy match.
        dict_type: Name of the dictionary to load through ``get_dict``.

    Returns:
        A ``(name, score, processed_text)`` tuple. ``name`` is the matched
        dictionary key, or the original ``text`` when the best ratio is below
        ``threshold`` (or when the dictionary is empty). ``score`` is ``10``
        for an exact match, otherwise the best ratio. ``processed_text`` is
        the normalised form of ``text``.
    """
    dictionary = get_dict(type=dict_type)
    processed_text = preprocessing(text)

    # Step 1: exact matching.
    extra_dict = get_dict("extra")
    for key, candidates in dictionary.items():
        if key in extra_dict:
            candidates = candidates + extra_dict[key]
        if any(processed_text == candidate for candidate in candidates):
            # 10 is a sentinel score for exact matches; presumably it is meant
            # to outrank every fuzzy ratio computed in step 2.
            return key, 10, processed_text

    # Step 2: LCS score.
    scores = {}
    for key, candidates in dictionary.items():
        ratios = [
            longestCommonSubsequence(processed_text, candidate) / len(candidate)
            for candidate in candidates
            if candidate  # an empty spelling would divide by zero
        ]
        # A key without usable spellings scores 0.0 instead of raising.
        scores[key] = max(ratios, default=0.0)

    if not scores:
        # Empty dictionary: nothing can match, so hand the raw text back.
        return text, 0.0, processed_text

    # max() keeps the first key (in dictionary order) among equal scores.
    best_key, best_score = max(scores.items(), key=lambda entry: entry[1])
    matched_name = best_key if best_score >= threshold else text
    return matched_name, best_score, processed_text
|
||
|
|
||
|
def get_sbt_table_info(outputs):
    """Convert KVU table rows into dicts keyed by canonical header names.

    Every cell of every row in ``outputs['table']`` has its ``'header'`` text
    matched against the ``"header"`` dictionary (threshold 0.8). Cells whose
    header resolves to a known name become candidates for that column. For the
    ``"productname"`` column a cell is accepted only when its ``'class'`` is
    ``'key'`` and its normalised text contains none of the rejected marker
    strings.

    Args:
        outputs: Mapping with a ``'table'`` entry: an iterable of rows, each
            an iterable of cell dicts providing ``'header'``, ``'text'``,
            ``'class'`` and ``'id'``.

    Returns:
        A list with one dict per row. Each dict has every key of the
        ``"header"`` dictionary, holding the text of the candidate with the
        highest matching score, or ``None`` when the row has no candidate for
        that column.
    """
    # Read once up front; assumes get_dict("header") yields the same keys on
    # every call, so re-reading it per row is unnecessary.
    header_names = list(get_dict("header").keys())
    # Normalised product-name texts containing any of these are rejected.
    rejected_markers = ("originalreceipt", "homeclubvoucher", "ippuob")

    table = []
    for single_item in outputs['table']:
        candidates = {name: [] for name in header_names}
        for cell in single_item:
            header_name, score, processed_header = sbt_key_matching(
                cell['header'], threshold=0.8, dict_type="header"
            )
            if header_name not in candidates:
                continue

            if header_name == "productname":
                # A product-name header only counts when the cell is a key.
                if cell['class'] != 'key':
                    continue
                # Only the normalised text is needed here, so normalise it
                # directly instead of running a full dictionary match.
                processed_cell_text = preprocessing(cell['text'])
                if any(marker in processed_cell_text for marker in rejected_markers):
                    continue

            candidates[header_name].append({
                'content': cell['text'],
                'processed_key_name': processed_header,
                'lcs_score': score,
                'token_id': cell['id'],
            })

        # Keep the best-scoring candidate per column (first one wins on ties).
        table.append({
            name: (
                max(options, key=lambda option: option['lcs_score'])['content']
                if options else None
            )
            for name, options in candidates.items()
        })
    return table
|
||
|
|
||
|
def get_sbt_triplet_info(outputs):
    """Turn KVU triplets into table-style rows keyed by canonical header names.

    Each element of ``outputs['triplet']`` is a mapping from a key text to a
    list of linked value dicts (``'header'``, ``'text'``, ``'id'``). Values
    whose header is the literal ``"non-header"`` are ignored; the remaining
    headers are matched against the ``"header"`` dictionary (threshold 0.8).

    Args:
        outputs: Mapping with a ``'triplet'`` entry as described above.

    Returns:
        A list of row dicts, one per triplet that produced at least one
        recognised header. Each row has every key of the ``"header"``
        dictionary (best-scoring text or ``None``), and ``'productname'`` is
        set to the triplet's key text (the last key iterated when a triplet
        carries several).
    """
    rows = []
    for triplet in outputs['triplet']:
        collected = {name: [] for name in get_dict("header").keys()}
        found_header = False

        for product_key, linked_values in triplet.items():
            for entry in linked_values:
                if entry['header'] == "non-header":
                    continue
                matched, lcs, normalised = sbt_key_matching(
                    entry['header'], threshold=0.8, dict_type="header"
                )
                if matched not in collected:
                    continue
                found_header = True
                collected[matched].append({
                    'content': entry['text'],
                    'processed_key_name': normalised,
                    'lcs_score': lcs,
                    'token_id': entry['id'],
                })

        # NOTE: triplets without any recognised header are dropped. The
        # original carried a disabled branch that would have kept them with
        # the key as product name and a quantity taken from the values.
        if not found_header:
            continue

        row = {}
        for name, options in collected.items():
            if options:
                # Highest score wins; the first candidate wins on ties.
                row[name] = max(options, key=lambda opt: opt['lcs_score'])['content']
            else:
                row[name] = None
        row['productname'] = product_key
        rows.append(row)

    return rows
|
||
|
|
||
|
|
||
|
def get_sbt_info(outputs):
    """Collect the single-value (non-table) fields of an SBT document.

    Candidates come from two places:

    * every key/value pair in ``outputs['single']``;
    * rows of ``outputs['table']`` that contain both a ``'key'`` and a
      ``'value'`` cell, once a cell header containing ``"product"`` or
      ``"information"`` (after normalisation) has been seen.

    Keys are matched against the ``"key"`` dictionary (threshold 0.8) and only
    recognised names are kept.

    Args:
        outputs: Mapping with ``'single'`` (iterable of ``{key_text: value}``
            dicts whose values provide ``'text'`` and ``'id'``) and ``'table'``
            (iterable of rows of cell dicts providing ``'header'``, ``'class'``,
            ``'text'`` and ``'id'``).

    Returns:
        A dict with every key of the ``"key"`` dictionary. A field without
        candidates is ``None``. ``"imei_number"`` is a list of the candidate
        texts that, with spaces removed, are all digits and longer than five
        characters. Every other field is the text of its best-scoring
        candidate.
    """
    def make_candidate(content, normalised, lcs, token_id):
        # Shape shared by every candidate record gathered below.
        return {
            'content': content,
            'processed_key_name': normalised,
            'lcs_score': lcs,
            'token_id': token_id,
        }

    candidates = {name: [] for name in get_dict("key").keys()}

    for single in outputs['single']:
        for raw_key, payload in single.items():
            field, lcs, normalised = sbt_key_matching(raw_key, threshold=0.8, dict_type="key")
            if field in candidates:
                candidates[field].append(
                    make_candidate(payload['text'], normalised, lcs, payload['id'])
                )

    # Key/value pairs that were predicted as a table (Product Information).
    # NOTE(review): the flag is never reset, so once one product-information
    # header is seen every later row is treated as such. Confirm this is intended.
    seen_product_info = False
    product_markers = ['product', 'information', 'productinformation']
    for row in outputs['table']:
        key_cell = None
        value_cell = None
        for cell in row:
            _, _, normalised_header = sbt_key_matching(
                cell['header'], threshold=0.8, dict_type="key"
            )
            if any(marker in normalised_header for marker in product_markers):
                seen_product_info = True
            # The last cell of each class in the row wins.
            cell_class = cell['class']
            if cell_class == "key":
                key_cell = cell
            elif cell_class == 'value':
                value_cell = cell

        if key_cell is None or value_cell is None or not seen_product_info:
            continue

        field, lcs, normalised = sbt_key_matching(key_cell['text'], threshold=0.8, dict_type="key")
        if field in candidates:
            candidates[field].append(
                make_candidate(value_cell['text'], normalised, lcs, value_cell['id'])
            )

    ap_outputs = {}
    for field, options in candidates.items():
        if not options:
            ap_outputs[field] = None
        elif field == "imei_number":
            # Keep every plausible IMEI: digits only, more than 5 of them.
            stripped = [option['content'].replace(' ', '') for option in options]
            ap_outputs[field] = [
                imei for imei in stripped if imei.isdigit() and len(imei) > 5
            ]
        else:
            # Highest score wins; the first candidate wins on ties.
            best = max(options, key=lambda option: option['lcs_score'])
            ap_outputs[field] = best['content']

    return ap_outputs
|
||
|
|
||
|
def export_kvu_for_SDSAP(outputs):
    """Build the SDSAP export for one page of KVU outputs.

    Args:
        outputs: KVU outputs passed unchanged to ``get_sbt_table_info``,
            ``get_sbt_triplet_info`` and ``get_sbt_info``.

    Returns:
        The dict produced by ``get_sbt_info`` with an added ``'table'`` entry:
        the rows from ``get_sbt_table_info`` followed by the rows from
        ``get_sbt_triplet_info``.
    """
    table_rows = get_sbt_table_info(outputs)
    triplet_rows = get_sbt_triplet_info(outputs)

    export = get_sbt_info(outputs)
    # Table rows first, then the rows recovered from triplets.
    export['table'] = table_rows + triplet_rows
    return export
|
||
|
|
||
|
def merged_kvu_for_SDSAP_for_multi_pages(lvat_outputs: list):
    """Merge per-page SDSAP exports into a single document-level result.

    Args:
        lvat_outputs: List of per-page dicts whose keys are ``'table'`` or
            names from the ``"key"`` dictionary (any other key raises
            ``KeyError``).

    Returns:
        A dict with every key of the ``"key"`` dictionary plus ``'table'``.
        ``'table'`` is the concatenation of all pages' tables in page order.
        Every other field is the value from the first page that carried that
        key, or ``None`` when no page carried it.
    """
    per_field = {name: [] for name in get_dict("key").keys()}
    per_field['table'] = []

    for page in lvat_outputs:
        for name, page_value in page.items():
            if name == "table":
                per_field[name].extend(page_value)
            else:
                per_field[name].append(page_value)

    merged = {}
    for name, values in per_field.items():
        if name == "table":
            merged[name] = values
        elif values:
            # NOTE(review): the first page's value is used even when it is
            # None, so a value found only on a later page is discarded.
            # Confirm this is the intended merge policy.
            merged[name] = values[0]
        else:
            merged[name] = None

    return merged
|