238 lines
9.9 KiB
Python
238 lines
9.9 KiB
Python
|
import re
|
||
|
from sdsvkvu.utils.post_processing import longestCommonSubsequence, preprocessing
|
||
|
from sdsvkvu.utils.dictionary.vat import get_dict
|
||
|
|
||
|
|
||
|
# For FI-VAT project
|
||
|
def vat_key_replacing(vat_outputs: dict) -> dict:
    """Rename the keys of a VAT result using the "kvu2xml" dictionary.

    Every top-level key except ``"table"`` is replaced by its mapped name.
    The ``"table"`` entry keeps its name, but the keys of each row dict in
    it are replaced through the same mapping.

    Args:
        vat_outputs: Mapping of field name to value; the ``"table"`` value
            is iterated as a sequence of row dicts.

    Returns:
        A new dict carrying the renamed keys.

    Raises:
        KeyError: If a key has no entry in the "kvu2xml" dictionary.
    """
    key_map = get_dict("kvu2xml")
    renamed = {}
    for field, content in vat_outputs.items():
        if field == "table":
            renamed['table'] = [
                {key_map[column]: cell for column, cell in row.items()}
                for row in content
            ]
            continue
        renamed[key_map[field]] = content
    return renamed
|
||
|
|
||
|
|
||
|
def vat_key_matching(text: str, threshold: float, dict_type: str):
    """Match a raw key/header string against the known VAT field names.

    Matching happens in two steps on the ``preprocessing``-normalised text:

    1. Exact matching. The "date" dictionary is checked first, then every
       entry of the ``dict_type`` dictionary (extended by the "extra"
       dictionary candidates when it has the same key).
    2. LCS scoring. For each field the best ratio
       ``longestCommonSubsequence(text, candidate) / len(candidate)`` is
       kept and the highest-scoring field wins.

    Args:
        text: Raw key text to classify.
        threshold: Minimum LCS ratio (exclusive) for accepting the best
            field in step 2.
        dict_type: Name of the dictionary passed to ``get_dict``.

    Returns:
        A ``(name, score, processed)`` tuple:

        * exact date match: the invoice-date field name, ``5`` and the
          matching key of the "date" dictionary;
        * the literal ``"kyboi"`` on the seller-name field: that field, ``8``
          and the processed text;
        * other exact match: the field, ``10`` and the processed text;
        * otherwise: the best field if its score exceeds ``threshold``, else
          the unchanged ``text``, plus the best score and processed text.
    """
    date_field = "Ngày, tháng, năm lập hóa đơn"

    dictionary = get_dict(dict_type)
    processed_text = preprocessing(text)

    # Step 1: exact matching, date parts first.
    for time_key, candidates in get_dict("date").items():
        if any(processed_text == txt for txt in candidates):
            return date_field, 5, time_key

    extra_dict = get_dict("extra")
    for key, candidates in dictionary.items():
        if key in extra_dict:
            candidates = candidates + extra_dict[key]

        # NOTE(review): "kyboi" is presumably the normalised form of a
        # signature label that stands in for the seller name - confirm.
        if key == 'Tên người bán' and processed_text == "kyboi":
            return key, 8, processed_text

        if any(processed_text == txt for txt in candidates):
            return key, 10, processed_text

    # Step 2: LCS score.
    scores = {}
    for key, candidates in dictionary.items():
        # Compare with ==: `key in ("...")` is a substring test on a plain
        # string, which would also skip any field whose name happens to be
        # contained in the date field name.
        if key == date_field:
            scores[key] = 0.0
            continue
        # Empty candidates are skipped (they would divide by zero) and an
        # empty candidate list scores 0.0 instead of making max() raise.
        scores[key] = max(
            (
                longestCommonSubsequence(processed_text, candidate) / len(candidate)
                for candidate in candidates
                if candidate
            ),
            default=0.0,
        )

    if not scores:
        # Nothing to compare against: report the text as unmatched.
        return text, 0.0, processed_text

    best_key, best_score = max(scores.items(), key=lambda entry: entry[1])
    return (best_key if best_score > threshold else text), best_score, processed_text
|
||
|
|
||
|
|
||
|
def normalize_number_format(s: str) -> float:
    """Parse a number written with ``,`` / ``.`` separators.

    Spaces are removed and the letters ``O`` / ``o`` are read as the digit
    zero. A trailing ``,00`` or ``.00`` is dropped. After that:

    * if both ``,`` and ``.`` are present, the one that occurs last is the
      decimal separator and the other one is a grouping separator;
    * otherwise every ``,`` and ``.`` is treated as a grouping separator.

    Args:
        s: Text of the number, e.g. ``"1.234.567,50"``.

    Returns:
        An ``int`` when only grouping separators were found, otherwise a
        ``float``.

    Raises:
        ValueError: If the remaining text is not a valid integer literal.
    """
    s = s.replace(' ', '').replace('O', '0').replace('o', '0')
    if s.endswith((",00", ".00")):
        s = s[:-3]

    if ',' in s and '.' in s:
        if s.rfind(',') > s.rfind('.'):
            grouping, decimal = '.', ','
        else:
            grouping, decimal = ',', '.'
        parts = s.replace(grouping, '').split(decimal)
        # Strip only *trailing* zeros: cutting at the first zero would turn
        # "05" into "" and "105" into "1".
        fraction = parts[1].rstrip('0')
        if not fraction:
            return float(int(parts[0]))
        return int(parts[0]) + int(fraction) / (10 ** len(fraction))

    return int(s.replace(',', '').replace('.', ''))
|
||
|
|
||
|
|
||
|
def post_process_item(item: dict) -> dict:
    """Fill in the one missing value among quantity, unit price and amount.

    A value counts as missing when it is ``None`` or ``'0'``. When exactly
    one of the three is missing it is derived from the other two (parsed
    with ``normalize_number_format``) and stored back as a string:

    * quantity = round(amount / unit price), when the unit price is non-zero
    * unit price = amount / quantity, when the quantity is non-zero
    * amount = quantity * unit price

    Any exception raised while deriving the value is printed and the item
    is returned as it is at that point.

    Args:
        item: Row dict; it must contain the three checked keys. It is
            modified in place.

    Returns:
        The same ``item`` object.
    """
    qty_key, price_key, amount_key = 'Số lượng', 'Đơn giá', 'Doanh số mua chưa có thuế'

    absent = [
        name for name in (qty_key, price_key, amount_key)
        if item[name] in (None, '0')
    ]
    if len(absent) != 1:
        return item

    target = absent[0]
    try:
        if target == qty_key:
            if normalize_number_format(item[price_key]) != 0:
                ratio = normalize_number_format(item[amount_key]) / normalize_number_format(item[price_key])
                item[target] = str(round(ratio))
        elif target == price_key:
            if normalize_number_format(item[qty_key]) != 0:
                unit_price = normalize_number_format(item[amount_key]) / normalize_number_format(item[qty_key])
                item[target] = str(unit_price)
        else:
            amount = normalize_number_format(item[qty_key]) * normalize_number_format(item[price_key])
            item[target] = str(amount)
    except Exception as e:
        print("Cannot post process this item with error:", e)
    return item
|
||
|
|
||
|
|
||
|
def get_vat_table_info(outputs):
    """Build the list of invoice line items from ``outputs['table']``.

    Each row is a sequence of cells with ``'header'``, ``'text'`` and
    ``'id'`` entries. Every cell header is classified with
    ``vat_key_matching`` (threshold 0.75, "header" dictionary); for each
    known header the content of the best-scoring cell is kept. Headers with
    no cell become ``'0'`` for the quantity and amount columns and ``None``
    otherwise. The row is then passed through ``post_process_item`` and is
    dropped when its item-name column is still ``None``.

    Args:
        outputs: Mapping with a ``'table'`` entry holding the rows.

    Returns:
        A list of row dicts keyed by the "header" dictionary names.
    """
    zero_default_headers = ("Số lượng", "Doanh số mua chưa có thuế")
    rows = []
    for raw_row in outputs['table']:
        # Gather every candidate cell per known header.
        candidates = {header: [] for header in get_dict("header")}
        for cell in raw_row:
            matched, lcs, cleaned = vat_key_matching(cell['header'], threshold=0.75, dict_type="header")
            if matched not in candidates:
                continue
            candidates[matched].append({
                'content': cell['text'],
                'processed_key_name': cleaned,
                'lcs_score': lcs,
                'token_id': cell['id'],
            })

        # Reduce each header to a single value.
        row = {}
        for header, found in candidates.items():
            if found:
                row[header] = max(found, key=lambda entry: entry['lcs_score'])['content']
            elif header in zero_default_headers:
                row[header] = '0'
            else:
                row[header] = None

        row = post_process_item(row)
        if row["Mặt hàng"] is None:
            continue
        rows.append(row)
    return rows
|
||
|
|
||
|
def _collect_key_candidate(single_pairs: dict, raw_key: str, cell: dict) -> None:
    """Classify ``raw_key`` and, if it is a known field, record ``cell`` for it."""
    key_name, score, processed_text = vat_key_matching(raw_key, threshold=0.8, dict_type="key")
    if key_name in single_pairs:
        single_pairs[key_name].append({
            'content': cell['text'],
            'processed_key_name': processed_text,
            'lcs_score': score,
            'token_id': cell['id'],
        })


def get_vat_info(outputs):
    """Collect candidate values for every VAT field from the KVU outputs.

    Keys and headers found in ``outputs['single']``, ``outputs['triplet']``
    and ``outputs['table']`` are classified with ``vat_key_matching``
    (threshold 0.8, "key" dictionary). Each value whose key maps to a known
    field is recorded with its content, processed key name, score and token
    id.

    Args:
        outputs: Mapping with ``'single'`` (list of ``{key: cell}`` dicts),
            ``'triplet'`` (list of ``{key: [cell, ...]}`` dicts) and
            ``'table'`` (list of rows of cells). Cells carry ``'text'`` and
            ``'id'``; triplet and table cells also carry ``'header'``.

    Returns:
        A dict mapping every name of the "key" dictionary to its list of
        candidate dicts (possibly empty).
    """
    single_pairs = {k: [] for k in get_dict("key")}

    for pair in outputs['single']:
        for raw_key_name, value in pair.items():
            _collect_key_candidate(single_pairs, raw_key_name, value)

    for triplet in outputs['triplet']:
        for key, value_list in triplet.items():
            # A triplet with a single value is also treated as a plain
            # key/value pair under the triplet key.
            if len(value_list) == 1:
                _collect_key_candidate(single_pairs, key, value_list[0])

            # NOTE(review): this per-cell loop is assumed to run for every
            # triplet entry, not only the single-value ones - confirm the
            # intended nesting.
            for pair in value_list:
                _collect_key_candidate(single_pairs, pair['header'], pair)

    for table_row in outputs['table']:
        for pair in table_row:
            _collect_key_candidate(single_pairs, pair['header'], pair)

    return single_pairs
|
||
|
|
||
|
|
||
|
def post_process_tax_code(tax_code_raw: str):
    """Return the tax code with spaces removed.

    When the code without spaces is not 10 or 13 characters long, only the
    longest space-separated token is kept (the first one on ties). This
    drops a stray leading or trailing fragment.

    Args:
        tax_code_raw: Tax code text, possibly containing spaces.

    Returns:
        The cleaned tax code string.
    """
    compact = tax_code_raw.replace(' ', '')
    if len(compact) in (10, 13):
        return compact
    # max() returns the first longest token, like a stable descending sort.
    longest = max(tax_code_raw.split(' '), key=len)
    return longest.replace(' ', '')
|
||
|
|
||
|
|
||
|
def merge_vat_info(single_pairs):
    """Reduce the candidate lists of every VAT field to a single value.

    * Invoice date: a single candidate is used as is. Otherwise the digits
      of each candidate are stored under its ``'processed_key_name'`` and
      the result is formatted as ``day/month/year``, with the placeholders
      ``dd``, ``mm`` and ``yyyy`` for parts that were not found.
    * Seller tax code: the candidate with the smallest ``'token_id'`` is
      cleaned with ``post_process_tax_code``.
    * Any other field: the content of the candidate with the highest
      ``'lcs_score'``.

    Fields without candidates stay ``None``, except the date, which
    becomes ``"dd/mm/yyyy"``.

    Args:
        single_pairs: Mapping of field name to a list of candidate dicts
            with ``'content'``, ``'processed_key_name'``, ``'lcs_score'``
            and ``'token_id'`` entries.

    Returns:
        A dict mapping every field name to its selected value.
    """
    date_field = "Ngày, tháng, năm lập hóa đơn"
    seller_tax_code_field = "Mã số thuế người bán"

    vat_outputs = {k: None for k in single_pairs}
    for key_name, candidates in single_pairs.items():
        # Compare with ==: `key_name in ("...")` is a substring test on a
        # plain string and would also match shorter field names.
        if key_name == date_field:
            if len(candidates) == 1:
                vat_outputs[key_name] = candidates[0]['content']
            else:
                date_time = {'day': 'dd', 'month': 'mm', 'year': 'yyyy'}
                for candidate in candidates:
                    date_time[candidate['processed_key_name']] = re.sub(r"[^0-9]", "", candidate['content'])
                vat_outputs[key_name] = f"{date_time['day']}/{date_time['month']}/{date_time['year']}"
            continue

        if not candidates:
            continue

        if key_name == seller_tax_code_field:
            # Get first tax code.
            first = min(candidates, key=lambda c: c['token_id'])
            vat_outputs[key_name] = post_process_tax_code(first['content'])
        else:
            # Get max LCS score.
            best = max(candidates, key=lambda c: c['lcs_score'])
            vat_outputs[key_name] = best['content']
    return vat_outputs
|
||
|
|
||
|
|
||
|
def export_kvu_for_VAT_invoice(outputs):
    """Convert raw KVU outputs of one page into the VAT invoice result.

    The line items are extracted with ``get_vat_table_info``, the field
    candidates with ``get_vat_info``, and the candidates are reduced with
    ``merge_vat_info``.

    Args:
        outputs: Raw KVU outputs, passed unchanged to both extractors.

    Returns:
        The merged field dict with the line items stored under ``'table'``.
    """
    # List of items in table (extracted before the field candidates).
    line_items = get_vat_table_info(outputs)
    # VAT information, with the table attached to it.
    merged = merge_vat_info(get_vat_info(outputs))
    merged['table'] = line_items
    return merged
|
||
|
|
||
|
|
||
|
def merged_kvu_for_VAT_invoice_for_multi_pages(lvat_outputs: list):
    """Merge the per-page VAT results of a multi-page invoice.

    The ``'table'`` rows of all pages are concatenated in page order. For
    every other field the first value that is neither ``None`` nor the
    ``"dd/mm/yyyy"`` placeholder is kept; fields with no such value are
    ``None``.

    Args:
        lvat_outputs: List of per-page result dicts whose non-table keys
            are names of the "key" dictionary.

    Returns:
        A dict with every name of the "key" dictionary plus ``'table'``.
    """
    collected = {field: [] for field in get_dict("key")}
    collected['table'] = []

    for page in lvat_outputs:
        for field, content in page.items():
            if field == "table":
                collected[field].extend(content)
            elif content is not None and content != "dd/mm/yyyy":
                collected[field].append(content)

    # Keep only the first usable value of each non-table field.
    for field, values in collected.items():
        if field != "table":
            collected[field] = values[0] if values else None

    return collected
|
||
|
|