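"""Post-processing utilities for multi-page KVU output: split a page-level
prediction list into per-document chunks by title similarity, then merge the
extracted fields of each chunk."""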
import os
import json
def longestCommonSubsequence(text1: str, text2: str) -> int:
    # https://leetcode.com/problems/longest-common-subsequence/discuss/351689/JavaPython-3-Two-DP-codes-of-O(mn)-and-O(min(m-n))-spaces-w-picture-and-analysis
    dp = [[0] * (len(text2) + 1) for _ in range(len(text1) + 1)]
    for i, c in enumerate(text1):
        for j, d in enumerate(text2):
            dp[i + 1][j + 1] = dp[i][j] + 1 if c == d else max(dp[i][j + 1], dp[i + 1][j])
    return dp[-1][-1]
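
# Sanity check (illustrative strings, not from the original script): the LCS of
# "invoice" and "invoice_copy" is "invoice", so the similarity used in
# split_docs below would be 7 / len("invoice") = 1.0, i.e. the two titles
# belong to the same document.
assert longestCommonSubsequence("invoice", "invoice_copy") == 7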


def write_to_json(file_path, content):
    with open(file_path, mode="w", encoding="utf8") as f:
        json.dump(content, f, ensure_ascii=False)


def read_json(file_path):
    # Read with the same encoding used by write_to_json.
    with open(file_path, "r", encoding="utf8") as f:
        return json.load(f)
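
# Round-trip usage (hypothetical path):
#   write_to_json("/tmp/example.json", [{"label": "name", "value": "A"}])
#   read_json("/tmp/example.json")  # -> [{'label': 'name', 'value': 'A'}]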


def check_label_exists(array, target_label):
    for obj in array:
        if obj["label"] == target_label:
            return True  # Label exists in the array
    return False  # Label does not exist in the array
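
# e.g. check_label_exists([{"label": "table", "value": []}], "table") -> True
#      check_label_exists([], "table") -> False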


def merged_kvu_outputs(loutputs: list) -> list:
    compiled = []
    for output_model in loutputs:
        for field in output_model:
            # Keep the first non-empty value seen for each label.
            if field['value'] != "" and not check_label_exists(compiled, field['label']):
                element = {
                    'label': field['label'],
                    'value': field['value'],
                }
                compiled.append(element)
            # 'table' values are accumulated across pages rather than deduplicated.
            elif field['label'] == 'table' and check_label_exists(compiled, "table"):
                for index, obj in enumerate(compiled):
                    if obj['label'] == 'table' and len(field['value']) > 0:
                        compiled[index]['value'].append(field['value'])
    return compiled
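
# Illustrative merge (hypothetical fields): the first non-empty value per label
# wins, and later 'table' values are appended to the compiled 'table' entry:
#   merged_kvu_outputs([
#       [{'label': 'name', 'value': 'A'}, {'label': 'table', 'value': [['r1']]}],
#       [{'label': 'name', 'value': 'B'}, {'label': 'table', 'value': [['r2']]}],
#   ])
#   # -> [{'label': 'name', 'value': 'A'},
#   #     {'label': 'table', 'value': [['r1'], [['r2']]]}]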


def split_docs(doc_data: list, threshold: float = 0.6) -> list:
    num_pages = len(doc_data)
    outputs = []
    kvu_content = []
    doc_data = sorted(doc_data, key=lambda x: int(x['page_number']))
    for idx, data in enumerate(doc_data):
        page_id = int(data['page_number'])
        doc_type = data['document_type']
        doc_class = data['document_class']
        fields = data['fields']
        if idx == 0:  # first page: open the first document
            prev_title = doc_type
            start_page_id = page_id
            prev_class = doc_class
        curr_title = doc_type if doc_type != "unknown" else prev_title
        curr_class = doc_class if doc_class != "unknown" else "other"
        kvu_content.append(fields)
        # A new document starts when the current title diverges from the
        # previous one (LCS length normalized by the previous title's length;
        # the max() guards against an empty previous title).
        similarity_score = longestCommonSubsequence(curr_title, prev_title) / max(len(prev_title), 1)
        if similarity_score < threshold:
            end_page_id = page_id - 1
            outputs.append({
                "doc_type": f"({prev_class}) {prev_title}" if prev_class != "other" else prev_title,
                "start_page": start_page_id,
                "end_page": end_page_id,
                "content": merged_kvu_outputs(kvu_content[:-1])
            })
            prev_title = curr_title
            prev_class = curr_class
            start_page_id = page_id
            kvu_content = kvu_content[-1:]
        if idx == num_pages - 1:  # last page: flush the document in progress
            outputs.append({
                "doc_type": f"({prev_class}) {prev_title}" if prev_class != "other" else prev_title,
                "start_page": start_page_id,
                "end_page": page_id,
                "content": merged_kvu_outputs(kvu_content)
            })
    return outputs
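
# Illustrative result (hypothetical pages, all with document_class "other"):
# three pages titled "claim form", "claim form", "receipt" at threshold 0.6
# split into two chunks:
#   [{'doc_type': 'claim form', 'start_page': 0, 'end_page': 1, 'content': [...]},
#    {'doc_type': 'receipt', 'start_page': 2, 'end_page': 2, 'content': [...]}]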


def merge_sbt_output(loutputs):
    # TODO: This function is too circumlocutory, need to refactor the whole flow
    def dict_to_list_of_dict(the_dict):
        output = []
        for k, v in the_dict.items():
            output.append({
                'label': k,
                'value': v,
            })
        return output

    print("concat outputs: \n", loutputs)

    merged_output = []
    combined_output = {"retailername": None,
                       "sold_to_party": None,
                       "purchase_date": [],
                       "imei_number": [],
                       "invoice_no": None}  # placeholder for the output
    for output in loutputs:
        fields = output['fields']
        if "doc_type" not in output:  # should not contain more than 1 page
            for field in fields:
                combined_output[field["label"]] = field["value"]
            combined_output["imei_number"] = [combined_output["imei_number"]]
            break
        else:
            if output['doc_type'] == "imei":
                for field in fields:
                    if field["label"] == "imei_number":
                        combined_output[field["label"]].append(field["value"])
            if output['doc_type'] == "invoice":
                for field in fields:
                    if field["label"] in ["retailername", "sold_to_party", "purchase_date", "invoice_no"]:
                        # List-valued slots accumulate values; scalar slots are overwritten.
                        if isinstance(combined_output[field["label"]], list):
                            if field["value"] is not None:
                                if isinstance(field["value"], list):
                                    combined_output[field["label"]] += field["value"]
                                else:
                                    combined_output[field["label"]].append(field["value"])
                        else:
                            combined_output[field["label"]] = field["value"]

    merged_output.append({
        "doc_type": "sbt_document",
        "start_page": 1,
        "end_page": len(loutputs),
        "content": dict_to_list_of_dict(combined_output)
    })
    return merged_output
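
# Illustrative call (hypothetical field values):
#   merge_sbt_output([
#       {'doc_type': 'invoice', 'fields': [{'label': 'invoice_no', 'value': 'INV-001'}]},
#       {'doc_type': 'imei', 'fields': [{'label': 'imei_number', 'value': '356789000000000'}]},
#   ])
#   # -> [{'doc_type': 'sbt_document', 'start_page': 1, 'end_page': 2,
#   #      'content': [{'label': 'retailername', 'value': None}, ...]}]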


if __name__ == "__main__":
    threshold = 0.9
    json_path = "/home/sds/tuanlv/02-KVU/02-KVU_test/visualize/manulife_v2/json_outputs/HS_YCBT_No_IP_HMTD.json"
    doc_data = read_json(json_path)

    outputs = split_docs(doc_data, threshold)

    write_to_json(os.path.join(os.path.dirname(json_path), "splited_doc.json"), outputs)