# sbt-idp/cope2n-ai-fi/api/Kie_Invoice_AP/AnyKey_Value/utils/utils.py

import os
import cv2
import json
import random
import glob
import re
import numpy as np
from tqdm import tqdm
from pdf2image import convert_from_path
from dicttoxml import dicttoxml
from word_preprocess import (
vat_standardizer,
ap_standardizer,
get_string_with_word2line,
split_key_value_by_colon,
normalize_kvu_output,
normalize_kvu_output_for_manulife,
manulife_standardizer
)
from utils.kvu_dictionary import (
vat_dictionary,
ap_dictionary,
manulife_dictionary
)
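
# ---------------------------------------------------------------------------
# Post-processing utilities for key-value understanding (KVU) predictions:
# PDF-to-image conversion, OCR-result I/O, visualization of predicted word
# groups and relations, and per-project exporters (generic, Manulife, FI-VAT,
# SBT/AP).
# ---------------------------------------------------------------------------
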
def create_dir(save_dir=''):
    # exist_ok=True already makes this a no-op for an existing directory
    os.makedirs(save_dir, exist_ok=True)
def convert_pdf2img(pdf_dir, save_dir):
pdf_files = glob.glob(f'{pdf_dir}/*.pdf')
print('No. pdf files:', len(pdf_files))
print(pdf_files)
for file in tqdm(pdf_files):
pdf2img(file, save_dir, n_pages=-1, return_fname=False)
print('Done!!!')
def pdf2img(pdf_path, save_dir, n_pages=-1, return_fname=False):
file_names = []
pages = convert_from_path(pdf_path)
if n_pages != -1:
pages = pages[:n_pages]
for i, page in enumerate(pages):
_save_path = os.path.join(save_dir, os.path.basename(pdf_path).replace('.pdf', f'_{i}.jpg'))
page.save(_save_path, 'JPEG')
file_names.append(_save_path)
if return_fname:
return file_names
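
# A minimal usage sketch for the converters above (hypothetical paths):
#   create_dir('./pages')
#   fnames = pdf2img('./docs/invoice.pdf', './pages', n_pages=1, return_fname=True)
#   # -> ['./pages/invoice_0.jpg']
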
def xyxy2xywh(bbox):
return [
float(bbox[0]),
float(bbox[1]),
float(bbox[2]) - float(bbox[0]),
float(bbox[3]) - float(bbox[1]),
]
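
# e.g. xyxy2xywh([10, 20, 110, 220]) -> [10.0, 20.0, 100.0, 200.0]
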
def write_to_json(file_path, content):
with open(file_path, mode='w', encoding='utf8') as f:
json.dump(content, f, ensure_ascii=False)
def read_json(file_path):
with open(file_path, 'r') as f:
return json.load(f)
def read_xml(file_path):
with open(file_path, 'r') as xml_file:
return xml_file.read()
def write_to_xml(file_path, content):
with open(file_path, mode="w", encoding='utf8') as f:
f.write(content)
def write_to_xml_from_dict(file_path, content):
    xml = dicttoxml(content)  # dicttoxml returns bytes
    xml_decode = xml.decode()
    with open(file_path, mode="w") as f:
        f.write(xml_decode)
def load_ocr_result(ocr_path):
with open(ocr_path, 'r') as f:
lines = f.read().splitlines()
preds = []
for line in lines:
preds.append(line.split('\t'))
return preds
def post_process_basic_ocr(lwords: list) -> list:
    # FIXME: the placeholder character being mapped to a space was lost from this
    # copy of the file; replace("", " ") as written pads every character with spaces.
    pp_lwords = []
    for word in lwords:
        pp_lwords.append(word.replace("", " "))
    return pp_lwords
def read_ocr_result_from_txt(file_path: str):
'''
return list of bounding boxes, list of words
'''
with open(file_path, 'r') as f:
lines = f.read().splitlines()
boxes, words = [], []
for line in lines:
if line == "":
continue
word_info = line.split("\t")
if len(word_info) == 6:
x1, y1, x2, y2, text, _ = word_info
        elif len(word_info) == 5:
            x1, y1, x2, y2, text = word_info
        else:
            continue  # skip malformed lines instead of raising on undefined coordinates
x1, y1, x2, y2 = int(float(x1)), int(float(y1)), int(float(x2)), int(float(y2))
if text and text != " ":
words.append(text)
boxes.append((x1, y1, x2, y2))
return boxes, words
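
# Expected file format: one word per line, tab-separated,
#   x1<TAB>y1<TAB>x2<TAB>y2<TAB>text[<TAB>extra]
# e.g. "12\t34\t56\t78\tTotal" -> box (12, 34, 56, 78), word "Total".
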
def get_colormap():
return {
'others': (0, 0, 255), # others: red
'title': (0, 255, 255), # title: yellow
'key': (255, 0, 0), # key: blue
'value': (0, 255, 0), # value: green
'header': (233, 197, 15), # header
'group': (0, 128, 128), # group
        'relation': (0, 0, 255)   # relation: red
}
def convert_image(image):
exif = image._getexif()
orientation = None
if exif is not None:
orientation = exif.get(0x0112)
# Convert the PIL image to OpenCV format
image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
# Rotate the image in OpenCV if necessary
if orientation == 3:
image = cv2.rotate(image, cv2.ROTATE_180)
elif orientation == 6:
image = cv2.rotate(image, cv2.ROTATE_90_CLOCKWISE)
elif orientation == 8:
image = cv2.rotate(image, cv2.ROTATE_90_COUNTERCLOCKWISE)
else:
image = np.asarray(image)
if len(image.shape) == 2:
image = np.repeat(image[:, :, np.newaxis], 3, axis=2)
assert len(image.shape) == 3
return image, orientation
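
# EXIF orientation (tag 0x0112) values handled above: 3 = rotated 180°,
# 6 = needs a 90° clockwise rotation, 8 = needs a 90° counter-clockwise one.
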
def visualize(image, bbox, pr_class_words, pr_relations, color_map, labels=['others', 'title', 'key', 'value', 'header'], thickness=1):
image, orientation = convert_image(image)
if len(pr_class_words) > 0:
id2label = {k: labels[k] for k in range(len(labels))}
for lb, groups in enumerate(pr_class_words):
if lb == 0:
continue
for group_id, group in enumerate(groups):
for i, word_id in enumerate(group):
x0, y0, x1, y1 = bbox[word_id]
cv2.rectangle(image, (x0, y0), (x1, y1), color=color_map[id2label[lb]], thickness=thickness)
if i == 0:
x_center0, y_center0 = int((x0+x1)/2), int((y0+y1)/2)
else:
x_center1, y_center1 = int((x0+x1)/2), int((y0+y1)/2)
cv2.line(image, (x_center0, y_center0), (x_center1, y_center1), color=color_map['group'], thickness=thickness)
x_center0, y_center0 = x_center1, y_center1
if len(pr_relations) > 0:
for pair in pr_relations:
xyxy0 = bbox[pair[0]]
xyxy1 = bbox[pair[1]]
x_center0, y_center0 = int((xyxy0[0] + xyxy0[2])/2), int((xyxy0[1] + xyxy0[3])/2)
x_center1, y_center1 = int((xyxy1[0] + xyxy1[2])/2), int((xyxy1[1] + xyxy1[3])/2)
cv2.line(image, (x_center0, y_center0), (x_center1, y_center1), color=color_map['relation'], thickness=thickness)
return image
def revert_box(box, width, height):
return [
int((box[0] / 1000) * width),
int((box[1] / 1000) * height),
int((box[2] / 1000) * width),
int((box[3] / 1000) * height)
]
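
# revert_box assumes boxes normalized to a 0-1000 grid (the LayoutLM-style
# convention) and maps them back to absolute pixel coordinates.
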
def get_wordgroup_bbox(lbbox: list, lword_ids: list) -> list:
points = [lbbox[i] for i in lword_ids]
x_min, y_min = min(points, key=lambda x: x[0])[0], min(points, key=lambda x: x[1])[1]
x_max, y_max = max(points, key=lambda x: x[2])[2], max(points, key=lambda x: x[3])[3]
return [x_min, y_min, x_max, y_max]
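
# e.g. get_wordgroup_bbox([(0, 0, 5, 5), (10, 10, 20, 20), (2, 1, 8, 9)], [0, 2])
#      -> [0, 0, 8, 9]  (the union of boxes 0 and 2)
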
def get_pairs(lpairs: list, rel_from: str, rel_to: str) -> dict:
    # lpairs: list of linked word-group pairs (named to avoid shadowing the json module)
    outputs = {}
    for pair in lpairs:
is_rel = {rel_from: {'status': 0}, rel_to: {'status': 0}}
for element in pair:
if element['class'] in (rel_from, rel_to):
is_rel[element['class']]['status'] = 1
is_rel[element['class']]['value'] = element
if all([v['status'] == 1 for _, v in is_rel.items()]):
outputs[is_rel[rel_to]['value']['group_id']] = [is_rel[rel_from]['value']['group_id'], is_rel[rel_to]['value']['group_id']]
return outputs
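
# get_pairs consumes [word_group_from, word_group_to] pairs (dicts with at least
# 'class' and 'group_id') and returns {rel_to_gid: [rel_from_gid, rel_to_gid]},
# e.g. rel_from='key', rel_to='value' -> {value_gid: [key_gid, value_gid]}.
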
def get_table_relations(lpairs: list, header_key_pairs: dict, rel_from="key", rel_to="value") -> dict:
    list_keys = list(header_key_pairs.keys())
    relations = {k: [] for k in list_keys}
    for pair in lpairs:
is_rel = {rel_from: {'status': 0}, rel_to: {'status': 0}}
for element in pair:
if element['class'] == rel_from and element['group_id'] in list_keys:
is_rel[rel_from]['status'] = 1
is_rel[rel_from]['value'] = element
if element['class'] == rel_to:
is_rel[rel_to]['status'] = 1
is_rel[rel_to]['value'] = element
if all([v['status'] == 1 for _, v in is_rel.items()]):
relations[is_rel[rel_from]['value']['group_id']].append(is_rel[rel_to]['value']['group_id'])
return relations
def get_key2values_relations(key_value_pairs: dict):
triple_linkings = {}
for value_group_id, key_value_pair in key_value_pairs.items():
key_group_id = key_value_pair[0]
if key_group_id not in list(triple_linkings.keys()):
triple_linkings[key_group_id] = []
triple_linkings[key_group_id].append(value_group_id)
return triple_linkings
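
# Inverts {value_gid: [key_gid, value_gid]} into {key_gid: [value_gid, ...]},
# so a key linked to several values collects all of them in one list.
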
def merged_token_to_wordgroup(class_words: list, lwords: list, lbboxes: list, labels: list) -> dict:
word_groups = {}
id2class = {i: labels[i] for i in range(len(labels))}
for class_id, lwgroups_in_class in enumerate(class_words):
for ltokens_in_wgroup in lwgroups_in_class:
group_id = ltokens_in_wgroup[0]
ltokens_to_ltexts = [lwords[token] for token in ltokens_in_wgroup]
ltokens_to_lbboxes = [lbboxes[token] for token in ltokens_in_wgroup]
# text_string = get_string(ltokens_to_ltexts)
# text_string= get_string_by_deduplicate_bbox(ltokens_to_ltexts, ltokens_to_lbboxes)
text_string = get_string_with_word2line(ltokens_to_ltexts, ltokens_to_lbboxes)
group_bbox = get_wordgroup_bbox(lbboxes, ltokens_in_wgroup)
word_groups[group_id] = {
'group_id': group_id,
'text': text_string,
'class': id2class[class_id],
'tokens': ltokens_in_wgroup,
'bbox': group_bbox
}
return word_groups
def verify_linking_id(word_groups: dict, linking_id: int) -> int:
if linking_id not in list(word_groups):
for wg_id, _word_group in word_groups.items():
if linking_id in _word_group['tokens']:
return wg_id
return linking_id
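
# Relations may be predicted at token level rather than group level;
# verify_linking_id resolves a token id to the word group containing it,
# falling back to the id itself when no group matches.
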
def matched_wordgroup_relations(word_groups:dict, lrelations: list) -> list:
outputs = []
for pair in lrelations:
wg_from = verify_linking_id(word_groups, pair[0])
wg_to = verify_linking_id(word_groups, pair[1])
        try:
            outputs.append([word_groups[wg_from], word_groups[wg_to]])
        except KeyError:
            print('Invalid pair:', wg_from, wg_to)
return outputs
def get_single_entity(word_groups: dict, lrelations: list) -> list:
single_entity = {'title': [], 'key': [], 'value': [], 'header': []}
list_linked_ids = []
for pair in lrelations:
list_linked_ids.extend(pair)
for word_group_id, word_group in word_groups.items():
if word_group_id not in list_linked_ids:
single_entity[word_group['class']].append(word_group)
return single_entity
def export_kvu_outputs(file_path, lwords, lbboxes, class_words, lrelations, labels=['others', 'title', 'key', 'value', 'header']):
word_groups = merged_token_to_wordgroup(class_words, lwords, lbboxes, labels)
linking_pairs = matched_wordgroup_relations(word_groups, lrelations)
header_key = get_pairs(linking_pairs, rel_from='header', rel_to='key') # => {key_group_id: [header_group_id, key_group_id]}
header_value = get_pairs(linking_pairs, rel_from='header', rel_to='value') # => {value_group_id: [header_group_id, value_group_id]}
key_value = get_pairs(linking_pairs, rel_from='key', rel_to='value') # => {value_group_id: [key_group_id, value_group_id]}
single_entity = get_single_entity(word_groups, lrelations)
# table_relations = get_table_relations(linking_pairs, header_key) # => {key_group_id: [value_group_id1, value_groupid2, ...]}
key2values_relations = get_key2values_relations(key_value) # => {key_group_id: [value_group_id1, value_groupid2, ...]}
triplet_pairs = []
single_pairs = []
table = []
# print('key2values_relations', key2values_relations)
for key_group_id, list_value_group_ids in key2values_relations.items():
if len(list_value_group_ids) == 0: continue
elif (len(list_value_group_ids) == 1) and (list_value_group_ids[0] not in list(header_value.keys())) and (key_group_id not in list(header_key.keys())):
value_group_id = list_value_group_ids[0]
single_pairs.append({word_groups[key_group_id]['text']: {
'text': word_groups[value_group_id]['text'],
'id': value_group_id,
'class': "value",
'bbox': word_groups[value_group_id]['bbox'],
'key_bbox': word_groups[key_group_id]['bbox']
}})
else:
item = []
for value_group_id in list_value_group_ids:
if value_group_id not in header_value.keys():
header_group_id = -1 # temp
header_name_for_value = "non-header"
else:
header_group_id = header_value[value_group_id][0]
header_name_for_value = word_groups[header_group_id]['text']
item.append({
'text': word_groups[value_group_id]['text'],
'header': header_name_for_value,
'id': value_group_id,
"key_id": key_group_id,
"header_id": header_group_id,
'class': 'value',
'bbox': word_groups[value_group_id]['bbox'],
'key_bbox': word_groups[key_group_id]['bbox'],
'header_bbox': word_groups[header_group_id]['bbox'] if header_group_id != -1 else [0, 0, 0, 0],
})
if key_group_id not in list(header_key.keys()):
triplet_pairs.append({
word_groups[key_group_id]['text']: item
})
else:
header_group_id = header_key[key_group_id][0]
header_name_for_key = word_groups[header_group_id]['text']
item.append({
'text': word_groups[key_group_id]['text'],
'header': header_name_for_key,
'id': key_group_id,
"key_id": key_group_id,
"header_id": header_group_id,
'class': 'key',
                    'bbox': word_groups[key_group_id]['bbox'],  # this entry describes the key itself
'key_bbox': word_groups[key_group_id]['bbox'],
'header_bbox': word_groups[header_group_id]['bbox'],
})
table.append({key_group_id: item})
single_entity_dict = {}
for class_name, single_items in single_entity.items():
single_entity_dict[class_name] = []
for single_item in single_items:
single_entity_dict[class_name].append({
'text': single_item['text'],
'id': single_item['group_id'],
'class': class_name,
'bbox': single_item['bbox']
})
if len(table) > 0:
table = sorted(table, key=lambda x: list(x.keys())[0])
table = [v for item in table for k, v in item.items()]
outputs = {}
outputs['title'] = single_entity_dict['title']
outputs['key'] = single_entity_dict['key']
outputs['value'] = single_entity_dict['value']
outputs['single'] = sorted(single_pairs, key=lambda x: int(float(list(x.values())[0]['id'])))
outputs['triplet'] = triplet_pairs
outputs['table'] = table
create_dir(os.path.join(os.path.dirname(file_path), 'kvu_results'))
file_path = os.path.join(os.path.dirname(file_path), 'kvu_results', os.path.basename(file_path))
write_to_json(file_path, outputs)
return outputs
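
# Sketch of the exported structure (fields as built above):
#   {
#     'title': [...], 'key': [...], 'value': [...],        # unlinked entities
#     'single':  [{key_text: {'text', 'id', 'class', 'bbox', 'key_bbox'}}, ...],
#     'triplet': [{key_text: [value_cell, ...]}, ...],     # one key, many values, no header
#     'table':   [[cell, ...], ...],                       # rows of header-linked cells
#   }
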
def export_kvu_for_all(file_path, lwords, lbboxes, class_words, lrelations, labels=['others', 'title', 'key', 'value', 'header']) -> dict:
raw_outputs = export_kvu_outputs(
file_path, lwords, lbboxes, class_words, lrelations, labels
)
outputs = {}
# Title
outputs["title"] = (
raw_outputs["title"][0]["text"] if len(raw_outputs["title"]) > 0 else None
)
# Pairs of key-value
for pair in raw_outputs["single"]:
for key, values in pair.items():
# outputs[key] = values["text"]
elements = split_key_value_by_colon(key, values["text"])
outputs[elements[0]] = elements[1]
# Only key fields
for key in raw_outputs["key"]:
# outputs[key["text"]] = None
elements = split_key_value_by_colon(key["text"], None)
outputs[elements[0]] = elements[1]
# Triplet data
for triplet in raw_outputs["triplet"]:
for key, list_value in triplet.items():
outputs[key] = [value["text"] for value in list_value]
# Table data
table = []
header_list = {cell['header']: cell['header_bbox'] for row in raw_outputs['table'] for cell in row}
if header_list:
header_list = dict(sorted(header_list.items(), key=lambda x: int(x[1][0])))
print("Header_list:", header_list.keys())
for row in raw_outputs["table"]:
item = {header: None for header in list(header_list.keys())}
for cell in row:
item[cell["header"]] = cell["text"]
table.append(item)
outputs["tables"] = [{"headers": list(header_list.keys()), "data": table}]
else:
outputs["tables"] = []
outputs = normalize_kvu_output(outputs)
# write_to_json(file_path, outputs)
return outputs
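
# The result is a flat dict; illustrative values only:
#   {"title": "...", "<key text>": "<value text>", ...,
#    "tables": [{"headers": [...], "data": [{header: cell_text, ...}, ...]}]}
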
def export_kvu_for_manulife(
file_path,
lwords,
lbboxes,
class_words,
lrelations,
labels=["others", "title", "key", "value", "header"],
) -> dict:
raw_outputs = export_kvu_outputs(
file_path, lwords, lbboxes, class_words, lrelations, labels
)
outputs = {}
# Title
title_list = []
for title in raw_outputs["title"]:
is_match, title_name, score, proceessed_text = manulife_standardizer(title["text"], threshold=0.6, type_dict="title")
title_list.append({
'documment_type': title_name if is_match else None,
'content': title['text'],
'processed_key_name': proceessed_text,
'lcs_score': score,
'token_id': title['id']
})
if len(title_list) > 0:
selected_element = max(title_list, key=lambda x: x['lcs_score'])
outputs["title"] = selected_element['content'].upper()
outputs["class_doc"] = selected_element['documment_type']
outputs["Loại chứng từ"] = selected_element['documment_type']
outputs["Tên chứng từ"] = selected_element['content']
else:
outputs["title"] = None
outputs["class_doc"] = None
outputs["Loại chứng từ"] = None
outputs["Tên chứng từ"] = None
# Pairs of key-value
for pair in raw_outputs["single"]:
for key, values in pair.items():
# outputs[key] = values["text"]
elements = split_key_value_by_colon(key, values["text"])
outputs[elements[0]] = elements[1]
# Only key fields
for key in raw_outputs["key"]:
# outputs[key["text"]] = None
elements = split_key_value_by_colon(key["text"], None)
outputs[elements[0]] = elements[1]
# Triplet data
for triplet in raw_outputs["triplet"]:
for key, list_value in triplet.items():
outputs[key] = [value["text"] for value in list_value]
# Table data
table = []
header_list = {cell['header']: cell['header_bbox'] for row in raw_outputs['table'] for cell in row}
if header_list:
header_list = dict(sorted(header_list.items(), key=lambda x: int(x[1][0])))
# print("Header_list:", header_list.keys())
for row in raw_outputs["table"]:
item = {header: None for header in list(header_list.keys())}
for cell in row:
item[cell["header"]] = cell["text"]
table.append(item)
outputs["tables"] = [{"headers": list(header_list.keys()), "data": table}]
else:
outputs["tables"] = []
outputs = normalize_kvu_output_for_manulife(outputs)
# write_to_json(file_path, outputs)
return outputs
# For FI-VAT project
def get_vat_table_information(outputs):
table = []
    # collect the full header set once; every row sees the same header list
    headers = [cell['header'] for row in outputs['table'] for cell in row if 'header' in cell]
    for single_item in outputs['table']:
        item = {k: [] for k in headers}
        print(item)
for cell in single_item:
# header_name, score, proceessed_text = vat_standardizer(cell['header'], threshold=0.75, header=True)
# if header_name in list(item.keys()):
# item[header_name] = value['text']
item[cell['header']].append({
'content': cell['text'],
'processed_key_name': cell['header'],
                'lcs_score': random.uniform(0.75, 1.0),  # placeholder score; the header standardization below is commented out
'token_id': cell['id']
})
# for header_name, value in item.items():
# if len(value) == 0:
# if header_name in ("Số lượng", "Doanh số mua chưa có thuế"):
# item[header_name] = '0'
# else:
# item[header_name] = None
# continue
# item[header_name] = max(value, key=lambda x: x['lcs_score'])['content'] # Get max lsc score
# item = post_process_for_item(item)
# if item["Mặt hàng"] == None:
# continue
table.append(item)
print(table)
return table
def get_vat_information(outputs):
# VAT Information
single_pairs = {k: [] for k in list(vat_dictionary(header=False).keys())}
for pair in outputs['single']:
for raw_key_name, value in pair.items():
            key_name, score, processed_text = vat_standardizer(raw_key_name, threshold=0.8, header=False)
            if key_name in list(single_pairs.keys()):
                single_pairs[key_name].append({
                    'content': value['text'],
                    'processed_key_name': processed_text,
'lcs_score': score,
'token_id': value['id'],
})
for triplet in outputs['triplet']:
for key, value_list in triplet.items():
if len(value_list) == 1:
                key_name, score, processed_text = vat_standardizer(key, threshold=0.8, header=False)
                if key_name in list(single_pairs.keys()):
                    single_pairs[key_name].append({
                        'content': value_list[0]['text'],
                        'processed_key_name': processed_text,
'lcs_score': score,
'token_id': value_list[0]['id']
})
for pair in value_list:
                    key_name, score, processed_text = vat_standardizer(pair['header'], threshold=0.8, header=False)
                    if key_name in list(single_pairs.keys()):
                        single_pairs[key_name].append({
                            'content': pair['text'],
                            'processed_key_name': processed_text,
'lcs_score': score,
'token_id': pair['id']
})
for table_row in outputs['table']:
for pair in table_row:
            key_name, score, processed_text = vat_standardizer(pair['header'], threshold=0.8, header=False)
            if key_name in list(single_pairs.keys()):
                single_pairs[key_name].append({
                    'content': pair['text'],
                    'processed_key_name': processed_text,
'lcs_score': score,
'token_id': pair['id']
})
return single_pairs
def post_process_vat_information(single_pairs):
vat_outputs = {k: None for k in list(single_pairs)}
for key_name, list_potential_value in single_pairs.items():
if key_name in ("Ngày, tháng, năm lập hóa đơn"):
if len(list_potential_value) == 1:
vat_outputs[key_name] = list_potential_value[0]['content']
else:
date_time = {'day': 'dd', 'month': 'mm', 'year': 'yyyy'}
for value in list_potential_value:
date_time[value['processed_key_name']] = re.sub("[^0-9]", "", value['content'])
vat_outputs[key_name] = f"{date_time['day']}/{date_time['month']}/{date_time['year']}"
else:
if len(list_potential_value) == 0: continue
if key_name in ("Mã số thuế người bán"):
selected_value = min(list_potential_value, key=lambda x: x['token_id']) # Get first tax code
# tax_code_raw = selected_value['content'].replace(' ', '')
tax_code_raw = selected_value['content']
if len(tax_code_raw.replace(' ', '')) not in (10, 13): # to remove the first number dupicated
tax_code_raw = tax_code_raw.split(' ')
tax_code_raw = sorted(tax_code_raw, key=lambda x: len(x), reverse=True)[0]
vat_outputs[key_name] = tax_code_raw.replace(' ', '')
else:
                selected_value = max(list_potential_value, key=lambda x: x['lcs_score'])  # get max LCS score
vat_outputs[key_name] = selected_value['content']
return vat_outputs
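
# The 10/13-digit check above matches the standard Vietnamese tax-code formats
# (10 digits for an enterprise, 13 for a branch).
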
def export_kvu_for_VAT_invoice(file_path, lwords, lbboxes, class_words, lrelations, labels=['others', 'title', 'key', 'value', 'header']):
    vat_outputs = {}
    # export_kvu_outputs expects lbboxes between lwords and class_words
    outputs = export_kvu_outputs(file_path, lwords, lbboxes, class_words, lrelations, labels)
# List of items in table
table = get_vat_table_information(outputs)
# table = outputs["table"]
for pair in outputs['single']:
for raw_key_name, value in pair.items():
vat_outputs[raw_key_name] = value['text']
# VAT Information
# single_pairs = get_vat_information(outputs)
# vat_outputs = post_process_vat_information(single_pairs)
# Combine VAT information and table
vat_outputs['table'] = table
write_to_json(file_path, vat_outputs)
print(vat_outputs)
return vat_outputs
# For SBT project
def get_ap_table_information(outputs):
table = []
for single_item in outputs['table']:
item = {k: [] for k in list(ap_dictionary(header=True).keys())}
for cell in single_item:
            header_name, score, processed_text = ap_standardizer(cell['header'], threshold=0.8, header=True)
            if header_name in list(item.keys()):
                item[header_name].append({
                    'content': cell['text'],
                    'processed_key_name': processed_text,
'lcs_score': score,
'token_id': cell['id']
})
for header_name, value in item.items():
if len(value) == 0:
item[header_name] = None
continue
            item[header_name] = max(value, key=lambda x: x['lcs_score'])['content']  # get max LCS score
table.append(item)
return table
def get_ap_triplet_information(outputs):
triplet_pairs = []
for single_item in outputs['triplet']:
item = {k: [] for k in list(ap_dictionary(header=True).keys())}
is_item_valid = 0
for key_name, list_value in single_item.items():
for value in list_value:
if value['header'] == "non-header":
continue
                header_name, score, processed_text = ap_standardizer(value['header'], threshold=0.8, header=True)
                if header_name in list(item.keys()):
                    is_item_valid = 1
                    item[header_name].append({
                        'content': value['text'],
                        'processed_key_name': processed_text,
'lcs_score': score,
'token_id': value['id']
})
if is_item_valid == 1:
for header_name, value in item.items():
if len(value) == 0:
item[header_name] = None
continue
                item[header_name] = max(value, key=lambda x: x['lcs_score'])['content']  # get max LCS score
            item['productname'] = key_name  # each triplet dict holds exactly one key: the product name
# triplet_pairs.append({key_name: new_item})
triplet_pairs.append(item)
return triplet_pairs
def get_ap_information(outputs):
single_pairs = {k: [] for k in list(ap_dictionary(header=False).keys())}
for pair in outputs['single']:
for raw_key_name, value in pair.items():
            key_name, score, processed_text = ap_standardizer(raw_key_name, threshold=0.8, header=False)
            if key_name in list(single_pairs):
                single_pairs[key_name].append({
                    'content': value['text'],
                    'processed_key_name': processed_text,
'lcs_score': score,
'token_id': value['id']
})
## Get single_pair if it in a table (Product Information)
is_product_info = False
for table_row in outputs['table']:
pair = {"key": None, 'value': None}
for cell in table_row:
            _, _, processed_text = ap_standardizer(cell['header'], threshold=0.8, header=False)
            if any(txt in processed_text for txt in ['product', 'information', 'productinformation']):
is_product_info = True
if cell['class'] in pair:
pair[cell['class']] = cell
        if all(v is not None for v in pair.values()) and is_product_info:
            key_name, score, processed_text = ap_standardizer(pair['key']['text'], threshold=0.8, header=False)
            if key_name in list(single_pairs):
                single_pairs[key_name].append({
                    'content': pair['value']['text'],
                    'processed_key_name': processed_text,
'lcs_score': score,
'token_id': pair['value']['id']
})
## end_block
ap_outputs = {k: None for k in list(single_pairs)}
for key_name, list_potential_value in single_pairs.items():
if len(list_potential_value) == 0: continue
if key_name == "imei_number":
# print('list_potential_value', list_potential_value)
# ap_outputs[key_name] = [v['content'] for v in list_potential_value if v['content'].replace(' ', '').isdigit() and len(v['content'].replace(' ', '')) > 5]
ap_outputs[key_name] = []
for v in list_potential_value:
imei = v['content'].replace(' ', '')
                if imei.isdigit() and len(imei) > 5:  # an IMEI is numeric and longer than 5 digits
ap_outputs[key_name].append(imei)
else:
            selected_value = max(list_potential_value, key=lambda x: x['lcs_score'])  # get max LCS score
ap_outputs[key_name] = selected_value['content']
return ap_outputs
def export_kvu_for_SDSAP(file_path, lwords, lbboxes, class_words, lrelations, labels=['others', 'title', 'key', 'value', 'header']):
    outputs = export_kvu_outputs(file_path, lwords, lbboxes, class_words, lrelations, labels)
# List of items in table
table = get_ap_table_information(outputs)
triplet_pairs = get_ap_triplet_information(outputs)
table = table + triplet_pairs
ap_outputs = get_ap_information(outputs)
ap_outputs['table'] = table
# ap_outputs['triplet'] = triplet_pairs
write_to_json(file_path, ap_outputs)
return ap_outputs