sbt-idp/cope2n-ai-fi/modules/_sdsvkvu/sdsvkvu/utils/query/vat.py
2023-11-30 18:22:16 +07:00

238 lines
9.9 KiB
Python

import re
from sdsvkvu.utils.post_processing import longestCommonSubsequence, preprocessing
from sdsvkvu.utils.dictionary.vat import get_dict
# For FI-VAT project
def vat_key_replacing(vat_outputs: dict) -> dict:
    """Rename the keys of a VAT result through the ``"kvu2xml"`` dictionary.

    Every top-level key except ``"table"`` is replaced by its entry in
    ``get_dict("kvu2xml")``. The ``"table"`` entry is expected to be an
    iterable of dicts (one per row); the keys of each row are renamed the
    same way while the ``"table"`` key itself is kept as is.

    Args:
        vat_outputs: Mapping of field name to extracted value, optionally
            holding a ``"table"`` entry with the list of row dicts.

    Returns:
        A new dict with renamed keys; the input mapping is not modified.

    Raises:
        KeyError: If a key has no entry in the renaming dictionary
            (assuming ``get_dict`` returns a plain mapping).
    """
    key_map = get_dict("kvu2xml")
    renamed = {}
    for field, content in vat_outputs.items():
        if field == "table":
            # Rows are renamed column by column; values are passed through.
            renamed['table'] = [
                {key_map[column]: cell for column, cell in row.items()}
                for row in content
            ]
            continue
        renamed[key_map[field]] = content
    return renamed
def vat_key_matching(text: str, threshold: float, dict_type: str):
    """Map a raw key/header text onto a canonical key of a VAT dictionary.

    Matching runs in two steps:

    1. Exact matching of the preprocessed text against the ``"date"``
       dictionary, then against the candidates of ``get_dict(dict_type)``
       extended with the ``"extra"`` dictionary. Exact hits return the fixed
       pseudo-scores 5 (date part), 8 (the special ``"kyboi"`` seller marker)
       or 10 (regular exact match).
    2. Otherwise every key except the invoice-date key is scored by the best
       ``longestCommonSubsequence(text, candidate) / len(candidate)`` ratio
       over its candidates, and the highest-scoring key wins.

    Args:
        text: Raw key text to classify.
        threshold: Minimum LCS ratio (exclusive) for the best key to be
            accepted in step 2.
        dict_type: Name of the dictionary passed to ``get_dict``.

    Returns:
        A 3-tuple ``(key, score, processed)``. ``key`` is the canonical key,
        or the unchanged ``text`` when the best LCS ratio does not exceed
        ``threshold``. ``processed`` is the preprocessed text, except for
        date matches where it is the matching key of the ``"date"``
        dictionary.
    """
    date_key = "Ngày, tháng, năm lập hóa đơn"

    dictionary = get_dict(dict_type)
    processed_text = preprocessing(text)

    # Step 1: exact matching.
    # Date parts are checked first and reported under the invoice-date key,
    # with the matched date-dictionary key in the third slot.
    for time_key, candidates in get_dict("date").items():
        if any(processed_text == txt for txt in candidates):
            return date_key, 5, time_key

    extra_dict = get_dict("extra")
    for key, candidates in dictionary.items():
        if key in extra_dict:
            candidates = candidates + extra_dict[key]
        # NOTE(review): "kyboi" is presumably the normalized "Ký bởi"
        # (signed by) marker, treated as a slightly weaker seller-name hit.
        if key == 'Tên người bán' and processed_text == "kyboi":
            return key, 8, processed_text
        if any(processed_text == txt for txt in candidates):
            return key, 10, processed_text

    # Step 2: LCS score.
    scores = {k: 0.0 for k in dictionary}
    for k, candidates in dictionary.items():
        # The original test `k in ("...")` had no trailing comma, so it was a
        # substring test against the date key rather than tuple membership;
        # compare for equality so only the date key itself is skipped.
        if k == date_key:
            continue
        scores[k] = max(
            longestCommonSubsequence(processed_text, candidate) / len(candidate)
            for candidate in candidates
        )

    key, score = max(scores.items(), key=lambda x: x[1])
    return key if score > threshold else text, score, processed_text
def normalize_number_format(s: str) -> float:
    """Parse a number string as printed on an invoice into a number.

    Spaces are removed and the letters ``O``/``o`` are read as the digit
    zero. A trailing ``",00"`` or ``".00"`` is dropped. Then:

    * if the string contains both ``","`` and ``"."``, dots are treated as
      thousands separators and the first comma as the decimal separator
      (``"1.234,50"`` -> ``1234.5``);
    * otherwise every ``","`` and ``"."`` is treated as a thousands
      separator and removed (``"1.234"`` -> ``1234``, ``"1,5"`` -> ``15``).

    Args:
        s: Number text.

    Returns:
        An ``int`` when the value has no fractional part, otherwise a
        ``float``.

    Raises:
        ValueError: If the remaining text is not made of digits.
    """
    s = s.replace(' ', '').replace('O', '0').replace('o', '0')
    if s.endswith((",00", ".00")):
        s = s[:-3]

    if ',' in s and '.' in s:
        parts = s.replace('.', '').split(',')
        # Strip only *trailing* zeros. Cutting at the first zero (as
        # `split('0')[0]` did) turned ",05" into an empty string (ValueError)
        # and ",305" into ",3".
        fraction = parts[1].rstrip('0')
        if not fraction:
            return int(parts[0])
        return int(parts[0]) + int(fraction) / (10 ** len(fraction))

    return int(s.replace(',', '').replace('.', ''))
def post_process_item(item: dict) -> dict:
    """Fill in one missing figure of a table row from the other two.

    The row is expected to hold quantity ('Số lượng'), unit price
    ('Đơn giá') and pre-tax amount ('Doanh số mua chưa có thuế'). A figure
    counts as missing when its value is ``None`` or ``'0'``. When exactly one
    of the three is missing it is derived from the others and stored as a
    string:

    * quantity = round(amount / price), only if price is non-zero
    * price    = amount / quantity, only if quantity is non-zero
    * amount   = quantity * price

    Any error during the computation is printed and the row is left as is.

    Args:
        item: Row dict; it is modified in place.

    Returns:
        The same ``item`` object.
    """
    qty_key, price_key, amount_key = 'Số lượng', 'Đơn giá', 'Doanh số mua chưa có thuế'

    absent = [name for name in (qty_key, price_key, amount_key) if item[name] in (None, '0')]
    if len(absent) != 1:
        # Nothing to infer, or too little information to infer from.
        return item

    target = absent[0]
    try:
        if target == qty_key:
            if normalize_number_format(item[price_key]) != 0:
                quantity = normalize_number_format(item[amount_key]) / normalize_number_format(item[price_key])
                item[target] = str(round(quantity))
        elif target == price_key:
            if normalize_number_format(item[qty_key]) != 0:
                price = normalize_number_format(item[amount_key]) / normalize_number_format(item[qty_key])
                item[target] = str(price)
        else:
            amount = normalize_number_format(item[qty_key]) * normalize_number_format(item[price_key])
            item[target] = str(amount)
    except Exception as e:
        # Best effort: an unparsable figure must not break the whole row.
        print("Cannot post process this item with error:", e)
    return item
def get_vat_table_info(outputs):
    """Build the list of invoice line items from the raw table cells.

    ``outputs['table']`` is iterated as rows, each row being an iterable of
    cell dicts with ``'header'``, ``'text'`` and ``'id'`` entries. Each cell
    header is matched against the ``"header"`` dictionary (threshold 0.75).
    For every canonical header the content of the best-scoring cell is kept.
    Headers with no cell get ``'0'`` for quantity and pre-tax amount, and
    ``None`` otherwise. Each row is then passed through
    ``post_process_item``, and rows without an item name ('Mặt hàng') are
    dropped.

    Args:
        outputs: Raw extraction result holding a ``'table'`` entry.

    Returns:
        List of row dicts keyed by canonical header name.
    """
    zero_when_missing = ("Số lượng", "Doanh số mua chưa có thuế")

    rows = []
    for raw_row in outputs['table']:
        # Collect every cell that maps to a known header.
        candidates = {name: [] for name in get_dict("header").keys()}
        for cell in raw_row:
            matched, lcs, cleaned = vat_key_matching(cell['header'], threshold=0.75, dict_type="header")
            if matched not in candidates:
                continue
            candidates[matched].append({
                'content': cell['text'],
                'processed_key_name': cleaned,
                'lcs_score': lcs,
                'token_id': cell['id']
            })

        # Reduce each header to a single value.
        row = {}
        for name, found in candidates.items():
            if found:
                row[name] = max(found, key=lambda c: c['lcs_score'])['content']
            elif name in zero_when_missing:
                row[name] = '0'
            else:
                row[name] = None

        row = post_process_item(row)
        if row["Mặt hàng"] is not None:
            rows.append(row)
    return rows
def get_vat_info(outputs):
    """Collect candidate values for every invoice-level VAT key.

    Raw keys and headers found in ``outputs['single']``,
    ``outputs['triplet']`` and ``outputs['table']`` are matched against the
    ``"key"`` dictionary (threshold 0.8). Every value whose key matches is
    recorded as a candidate under the canonical key.

    Input shapes, as read by this function:

    * ``outputs['single']``: iterable of dicts mapping a raw key to a dict
      with ``'text'`` and ``'id'``.
    * ``outputs['triplet']``: iterable of dicts mapping a raw key to a list
      of dicts with ``'header'``, ``'text'`` and ``'id'``.
    * ``outputs['table']``: iterable of rows, each an iterable of dicts with
      ``'header'``, ``'text'`` and ``'id'``.

    Returns:
        Dict mapping each canonical key to a list of candidate dicts with
        ``'content'``, ``'processed_key_name'``, ``'lcs_score'`` and
        ``'token_id'``.
    """
    single_pairs = {name: [] for name in get_dict("key").keys()}

    def collect(raw_key, token):
        # Match one raw key and, on success, record the token as a candidate.
        matched, lcs, cleaned = vat_key_matching(raw_key, threshold=0.8, dict_type="key")
        if matched in single_pairs:
            single_pairs[matched].append({
                'content': token['text'],
                'processed_key_name': cleaned,
                'lcs_score': lcs,
                'token_id': token['id'],
            })

    for pair in outputs['single']:
        for raw_key_name, value in pair.items():
            collect(raw_key_name, value)

    for triplet in outputs['triplet']:
        for key, value_list in triplet.items():
            # A lone value is tried under the triplet key first and then,
            # like every other value, under its own header.
            if len(value_list) == 1:
                collect(key, value_list[0])
            for cell in value_list:
                collect(cell['header'], cell)

    for table_row in outputs['table']:
        for cell in table_row:
            collect(cell['header'], cell)

    return single_pairs
def post_process_tax_code(tax_code_raw: str):
    """Clean up a raw tax code string.

    If the code without spaces is 10 or 13 characters long, the spaces are
    simply removed. Otherwise the text is split on single spaces and only the
    longest fragment is kept (the first one on ties), which discards a stray
    duplicated character at the start or end.

    Args:
        tax_code_raw: Tax code text as extracted.

    Returns:
        The cleaned tax code without spaces.
    """
    compact = tax_code_raw.replace(' ', '')
    if len(compact) in (10, 13):
        return compact
    # `max` returns the first of several equally long fragments.
    return max(tax_code_raw.split(' '), key=len)
def merge_vat_info(single_pairs):
    """Reduce the candidate lists of every VAT key to a single value.

    * Invoice date ('Ngày, tháng, năm lập hóa đơn'): a single candidate is
      used verbatim. Otherwise the candidates are treated as date parts: the
      digits of each candidate's content are stored under its
      ``'processed_key_name'`` and the result is formatted as
      ``day/month/year``, with ``dd``/``mm``/``yyyy`` standing in for parts
      that were not found.
    * Seller tax code ('Mã số thuế người bán'): the candidate with the
      smallest ``'token_id'`` is taken and cleaned with
      ``post_process_tax_code``.
    * Any other key: the candidate with the highest ``'lcs_score'`` wins.

    Keys without candidates stay ``None`` (except the date key, which
    becomes ``"dd/mm/yyyy"``).

    Args:
        single_pairs: Dict mapping a canonical key to a list of candidate
            dicts with ``'content'``, ``'processed_key_name'``,
            ``'lcs_score'`` and ``'token_id'``.

    Returns:
        Dict mapping every key of ``single_pairs`` to its selected value.
    """
    date_key = "Ngày, tháng, năm lập hóa đơn"
    seller_tax_code_key = "Mã số thuế người bán"

    vat_outputs = {k: None for k in single_pairs}
    for key_name, list_potential_value in single_pairs.items():
        # The original tests `key_name in ("...")` had no trailing comma and
        # were therefore substring tests against the key string; compare for
        # equality so that only the intended key takes each special path.
        if key_name == date_key:
            if len(list_potential_value) == 1:
                vat_outputs[key_name] = list_potential_value[0]['content']
            else:
                date_time = {'day': 'dd', 'month': 'mm', 'year': 'yyyy'}
                for value in list_potential_value:
                    date_time[value['processed_key_name']] = re.sub("[^0-9]", "", value['content'])
                vat_outputs[key_name] = f"{date_time['day']}/{date_time['month']}/{date_time['year']}"
            continue

        if len(list_potential_value) == 0:
            continue

        if key_name == seller_tax_code_key:
            # Get first tax code
            selected_value = min(list_potential_value, key=lambda x: x['token_id'])
            vat_outputs[key_name] = post_process_tax_code(selected_value['content'])
        else:
            # Get max LCS score
            selected_value = max(list_potential_value, key=lambda x: x['lcs_score'])
            vat_outputs[key_name] = selected_value['content']
    return vat_outputs
def export_kvu_for_VAT_invoice(outputs):
    """Produce the final VAT invoice result for one set of raw outputs.

    The line items are extracted with ``get_vat_table_info``; the
    invoice-level fields are collected with ``get_vat_info`` and reduced with
    ``merge_vat_info``. The line items are then attached to the merged fields
    under the ``'table'`` key.

    Args:
        outputs: Raw extraction result, passed unchanged to both extractors.

    Returns:
        Dict of invoice-level fields plus ``'table'`` with the list of rows.
    """
    # List of items in table
    line_items = get_vat_table_info(outputs)

    # Invoice-level VAT information
    invoice_fields = merge_vat_info(get_vat_info(outputs))

    # Combine VAT information and table
    invoice_fields['table'] = line_items
    return invoice_fields
def merged_kvu_for_VAT_invoice_for_multi_pages(lvat_outputs: list):
    """Merge per-page VAT results into a single result.

    Table rows of all pages are concatenated in page order. For every other
    key of the ``"key"`` dictionary, the first value that is neither ``None``
    nor the empty date placeholder ``"dd/mm/yyyy"`` is kept; keys with no
    such value become ``None``.

    Args:
        lvat_outputs: List of per-page result dicts, each optionally holding
            a ``'table'`` list.

    Returns:
        Dict with one value per key and the combined ``'table'`` list.
    """
    collected = {name: [] for name in get_dict("key").keys()}
    collected['table'] = []

    # Gather every usable value, page by page.
    for page in lvat_outputs:
        for name, content in page.items():
            if name == "table":
                collected[name].extend(content)
            elif content is not None and content != "dd/mm/yyyy":
                collected[name].append(content)

    # Keep the first usable value of each key; the table stays a list.
    for name in collected:
        if name == "table":
            continue
        found = collected[name]
        collected[name] = found[0] if found else None
    return collected