import os
import glob
import json

from tqdm import tqdm  # NOTE(review): glob/tqdm appear unused here; kept for downstream compat


def longestCommonSubsequence(text1: str, text2: str) -> int:
    """Return the length of the longest common subsequence of *text1* and *text2*.

    Bottom-up dynamic programming over a (m+1) x (n+1) table:
    O(m*n) time and space.
    """
    # https://leetcode.com/problems/longest-common-subsequence/discuss/351689/JavaPython-3-Two-DP-codes-of-O(mn)-and-O(min(m-n))-spaces-w-picture-and-analysis
    dp = [[0] * (len(text2) + 1) for _ in range(len(text1) + 1)]
    for i, c in enumerate(text1):
        for j, d in enumerate(text2):
            # Matching chars extend the diagonal; otherwise keep the best of
            # dropping one character from either string.
            dp[i + 1][j + 1] = (1 + dp[i][j]) if c == d else max(dp[i][j + 1], dp[i + 1][j])
    return dp[-1][-1]


def write_to_json(file_path, content):
    """Serialize *content* as UTF-8 JSON to *file_path* (non-ASCII kept verbatim)."""
    with open(file_path, mode="w", encoding="utf8") as f:
        json.dump(content, f, ensure_ascii=False)


def read_json(file_path):
    """Load and return the JSON document stored at *file_path*."""
    with open(file_path, "r") as f:
        return json.load(f)


def check_label_exists(array, target_label):
    """Return True if any dict in *array* carries ``obj["label"] == target_label``."""
    return any(obj["label"] == target_label for obj in array)


def merged_kvu_outputs(loutputs: list) -> list:
    """Merge per-page KVU field lists into one de-duplicated field list.

    The first non-empty value seen for each label wins; subsequent non-empty
    ``table`` fields are appended to the already-collected table entry.

    NOTE(review): the return annotation previously said ``dict`` but the
    function has always returned a list of ``{'label', 'value'}`` dicts;
    the annotation is corrected here (no behavior change).
    """
    compiled = []
    for output_model in loutputs:
        for field in output_model:
            if field['value'] != "" and not check_label_exists(compiled, field['label']):
                compiled.append({
                    'label': field['label'],
                    'value': field['value'],
                })
            elif field['label'] == 'table' and check_label_exists(compiled, "table"):
                # Later table pages extend the existing table entry's value.
                for index, obj in enumerate(compiled):
                    if obj['label'] == 'table' and len(field['value']) > 0:
                        compiled[index]['value'].append(field['value'])
    return compiled


def split_docs(doc_data: list, threshold: float = 0.6) -> list:
    """Split a multi-page KVU result stream into per-document chunks.

    Pages (sorted by ``page_number``; assumed 0-based and contiguous — TODO
    confirm against caller) are grouped while the LCS similarity between the
    current title and the running document title stays >= *threshold*; a drop
    below the threshold closes the running document and starts a new one.

    Returns a list of ``{"doc_type", "start_page", "end_page", "content"}``
    dicts, with ``content`` produced by :func:`merged_kvu_outputs`.
    """
    def make_entry(title, klass, start_page, end_page, content):
        # One output chunk; the "(class) " prefix is omitted for class "other".
        return {
            "doc_type": f"({klass}) {title}" if klass != "other" else title,
            "start_page": start_page,
            "end_page": end_page,
            "content": merged_kvu_outputs(content),
        }

    num_pages = len(doc_data)
    outputs = []
    kvu_content = []
    doc_data = sorted(doc_data, key=lambda x: int(x['page_number']))
    for data in doc_data:
        page_id = int(data['page_number'])
        doc_type = data['document_type']
        doc_class = data['document_class']
        fields = data['fields']
        if page_id == 0:
            # First page seeds the running document state.
            prev_title = doc_type
            start_page_id = page_id
            prev_class = doc_class
        # "unknown" pages inherit the running title; unknown class maps to "other".
        curr_title = doc_type if doc_type != "unknown" else prev_title
        curr_class = doc_class if doc_class != "unknown" else "other"
        kvu_content.append(fields)
        # max(..., 1) guards against an empty previous title (the original
        # raised ZeroDivisionError); LCS against "" is 0 either way, so the
        # score is unchanged for all non-empty titles.
        similarity_score = (longestCommonSubsequence(curr_title, prev_title)
                            / max(len(prev_title), 1))
        if similarity_score < threshold:
            # Title diverged: close the previous document at the prior page
            # (its pages are everything collected except the current one).
            outputs.append(make_entry(prev_title, prev_class, start_page_id,
                                      page_id - 1, kvu_content[:-1]))
            prev_title = curr_title
            prev_class = curr_class
            start_page_id = page_id
            kvu_content = kvu_content[-1:]
            if page_id == num_pages - 1:  # last page also closes the new document
                outputs.append(make_entry(prev_title, prev_class, start_page_id,
                                          page_id, kvu_content))
        elif page_id == num_pages - 1:  # last page closes the running document
            outputs.append(make_entry(prev_title, prev_class, start_page_id,
                                      page_id, kvu_content))
    return outputs


def merge_sbt_output(loutputs):
    """Combine per-page SBT outputs into a single ``sbt_document`` record.

    ``imei`` pages contribute IMEI numbers; ``invoice`` pages contribute
    retailer / sold-to-party / purchase-date / invoice-no fields. A page
    without a ``doc_type`` key is taken wholesale and ends the merge.
    """
    # TODO: This function is too circumlocutory, need to refactor the whole flow
    def dict_to_list_of_dict(the_dict):
        # Flatten {label: value} into [{'label': ..., 'value': ...}, ...].
        return [{'label': k, 'value': v} for k, v in the_dict.items()]

    merged_output = []
    # Place holder for the output.
    combined_output = {"retailername": None,
                       "sold_to_party": None,
                       "purchase_date": [],
                       "imei_number": [],
                       "invoice_no": None}
    for output in loutputs:
        fields = output['fields']
        if "doc_type" not in output:  # Should not contain more than 1 page
            for field in fields:
                combined_output[field["label"]] = field["value"]
            combined_output["imei_number"] = [combined_output["imei_number"]]
            break
        else:
            if output['doc_type'] == "imei":
                for field in fields:
                    if field["label"] == "imei_number":
                        combined_output[field["label"]].append(field["value"])
            if output['doc_type'] == "invoice":
                for field in fields:
                    if field["label"] in ["retailername", "sold_to_party", "purchase_date", "invoice_no"]:
                        if isinstance(combined_output[field["label"]], list):
                            # List-valued slots accumulate; scalar values are appended,
                            # list values are concatenated. None is skipped.
                            if field["value"] is not None:
                                if isinstance(field["value"], list):
                                    combined_output[field["label"]] += field["value"]
                                else:
                                    combined_output[field["label"]].append(field["value"])
                        else:
                            combined_output[field["label"]] = field["value"]
    merged_output.append({
        "doc_type": "sbt_document",
        "start_page": 1,
        "end_page": len(loutputs),
        "content": dict_to_list_of_dict(combined_output),
    })
    return merged_output


if __name__ == "__main__":
    threshold = 0.9
    json_path = "/home/sds/tuanlv/02-KVU/02-KVU_test/visualize/manulife_v2/json_outputs/HS_YCBT_No_IP_HMTD.json"
    doc_data = read_json(json_path)
    outputs = split_docs(doc_data, threshold)
    write_to_json(os.path.join(os.path.dirname(json_path), "splited_doc.json"), outputs)