# sbt-idp/cope2n-ai-fi/common/utils_kvu/split_docs.py
import os
import glob
import json
from tqdm import tqdm
def longestCommonSubsequence(text1: str, text2: str) -> int:
    """Return the length of the longest common subsequence of two strings.

    Classic O(m*n) dynamic-programming table: cell [r][s] holds the LCS
    length of text1[:r] and text2[:s].
    """
    rows, cols = len(text1), len(text2)
    table = [[0] * (cols + 1) for _ in range(rows + 1)]
    for r in range(1, rows + 1):
        for s in range(1, cols + 1):
            if text1[r - 1] == text2[s - 1]:
                # characters match: extend the diagonal subsequence
                table[r][s] = table[r - 1][s - 1] + 1
            else:
                # otherwise keep the best of dropping one char from either side
                table[r][s] = max(table[r - 1][s], table[r][s - 1])
    return table[rows][cols]
def write_to_json(file_path, content):
    """Serialize `content` as JSON to `file_path` (UTF-8, non-ASCII kept verbatim)."""
    serialized = json.dumps(content, ensure_ascii=False)
    with open(file_path, mode="w", encoding="utf8") as fp:
        fp.write(serialized)
def read_json(file_path):
    """Load and return the JSON document stored at `file_path`.

    Reads with an explicit UTF-8 encoding so behaviour matches
    `write_to_json` (which writes UTF-8) instead of depending on the
    platform default encoding (e.g. cp1252 on Windows, which would
    fail on non-ASCII content).
    """
    with open(file_path, "r", encoding="utf8") as f:
        return json.load(f)
def check_label_exists(array, target_label):
    """Return True if any element of `array` carries 'label' == `target_label`."""
    return any(entry["label"] == target_label for entry in array)
def merged_kvu_outputs(loutputs: list) -> dict:
    """Merge per-model KVU field lists into a single list of {label, value}.

    The first non-empty value seen for each label wins; subsequent
    non-empty 'table' values are appended onto the existing table entry.
    (Despite the annotation, the result is a list of dicts.)
    """
    compiled = []
    for model_fields in loutputs:
        for field in model_fields:
            seen_labels = {entry['label'] for entry in compiled}
            if field['value'] != "" and field['label'] not in seen_labels:
                # first occurrence of this label with a non-empty value
                compiled.append({
                    'label': field['label'],
                    'value': field['value'],
                })
            elif field['label'] == 'table' and 'table' in seen_labels:
                # extra table output: append its rows onto the existing entry
                for pos, entry in enumerate(compiled):
                    if entry['label'] == 'table' and len(field['value']) > 0:
                        compiled[pos]['value'].append(field['value'])
    return compiled
def split_docs(doc_data: list, threshold: float=0.6) -> list:
    """Split page-level KVU results into document sections by title similarity.

    Pages are sorted by 'page_number'; a new section starts whenever the
    normalized LCS similarity between the current page title and the running
    section title drops below `threshold`. Each output item holds the
    section's doc_type, start/end page ids, and its merged KVU content.

    NOTE(review): assumes page numbers start at 0 — if they don't,
    prev_title/start_page_id/prev_class are referenced before assignment.
    """
    num_pages = len(doc_data)
    outputs = []
    kvu_content = []  # accumulated 'fields' of pages in the current section
    doc_data = sorted(doc_data, key=lambda x: int(x['page_number']))
    for data in doc_data:
        page_id = int(data['page_number'])
        doc_type = data['document_type']
        doc_class = data['document_class']
        fields = data['fields']
        if page_id == 0:
            # first page bootstraps the running section state
            prev_title = doc_type
            start_page_id = page_id
            prev_class = doc_class
        # carry the previous title/class forward for unlabeled pages
        curr_title = doc_type if doc_type != "unknown" else prev_title
        curr_class = doc_class if doc_class != "unknown" else "other"
        kvu_content.append(fields)
        # LCS length normalized by the previous title's length;
        # NOTE(review): ZeroDivisionError if prev_title is "" — confirm titles are non-empty
        similarity_score = longestCommonSubsequence(curr_title, prev_title) / len(prev_title)
        if similarity_score < threshold:
            # titles diverged: close the running section at the previous page
            end_page_id = page_id - 1
            outputs.append({
                "doc_type": f"({prev_class}) {prev_title}" if prev_class != "other" else prev_title,
                "start_page": start_page_id,
                "end_page": end_page_id,
                "content": merged_kvu_outputs(kvu_content[:-1])  # all but the current page
            })
            # start a fresh section at the current page
            prev_title = curr_title
            prev_class = curr_class
            start_page_id = page_id
            kvu_content = kvu_content[-1:]
            if page_id == num_pages - 1:  # end_page: last page also closes its own one-page section
                outputs.append({
                    "doc_type": f"({prev_class}) {prev_title}" if prev_class != "other" else prev_title,
                    "start_page": start_page_id,
                    "end_page": page_id,
                    "content": merged_kvu_outputs(kvu_content)
                })
        elif page_id == num_pages - 1:  # end_page: flush the final still-open section
            outputs.append({
                "doc_type": f"({prev_class}) {prev_title}" if prev_class != "other" else prev_title,
                "start_page": start_page_id,
                "end_page": page_id,
                "content": merged_kvu_outputs(kvu_content)
            })
    return outputs
def merge_sbt_output(loutputs):
    """Collapse per-page SBT outputs into a single 'sbt_document' record.

    'imei' pages append their imei_number values onto a list; 'invoice'
    pages fill retailername/sold_to_party/purchase_date/invoice_no (list
    fields accumulate, scalar fields take the latest value). An entry
    without 'doc_type' is treated as the whole result: its fields are
    copied verbatim and processing stops.
    """
    # TODO: This function is too circumlocutory, need to refactor the whole flow
    def dict_to_list_of_dict(the_dict):
        # Convert {label: value} into the [{'label': ..., 'value': ...}] shape
        # the callers expect.
        output = []
        for k,v in the_dict.items():
            output.append({
                'label': k,
                'value': v,
            })
        return output
    merged_output = []
    combined_output = {"retailername": None,
                       "sold_to_party": None,
                       "purchase_date": [],
                       "imei_number": [],
                       "invoice_no": None} # place holder for the output
    for output in loutputs:
        fields = output['fields']
        if "doc_type" not in output: # Should not contain more than 1 page
            for field in fields:
                combined_output[field["label"]] = field["value"]
            # wrap the scalar imei into a list to match the multi-page shape
            combined_output["imei_number"] = [combined_output["imei_number"]]
            break
        else:
            if output['doc_type'] == "imei":
                for field in fields:
                    if field["label"] == "imei_number":
                        combined_output[field["label"]].append(field["value"])
            if output['doc_type'] == "invoice":
                for field in fields:
                    if field["label"] in ["retailername", "sold_to_party", "purchase_date", "invoice_no"] :
                        if isinstance(combined_output[field["label"]], list):
                            # list-typed slot (e.g. purchase_date): accumulate values
                            if field["value"] is not None:
                                if isinstance(field["value"], list):
                                    combined_output[field["label"]] += field["value"]
                                else:
                                    combined_output[field["label"]].append(field["value"])
                        else:
                            # scalar slot: last page's value wins (even if None)
                            combined_output[field["label"]] = field["value"]
    merged_output.append({
        "doc_type": "sbt_document",
        "start_page": 1,
        "end_page": len(loutputs),
        "content": dict_to_list_of_dict(combined_output)
    })
    return merged_output
if __name__ == "__main__":
    # Ad-hoc driver: split one KVU result file and write the sections
    # next to it as "splited_doc.json".
    threshold = 0.9
    json_path = "/home/sds/tuanlv/02-KVU/02-KVU_test/visualize/manulife_v2/json_outputs/HS_YCBT_No_IP_HMTD.json"
    pages = read_json(json_path)
    sections = split_docs(pages, threshold)
    out_path = os.path.join(os.path.dirname(json_path), "splited_doc.json")
    write_to_json(out_path, sections)