Azure Document Intelligenceで面倒な書類比較を自動化してみた

こんにちは。AILabの堂上拓です。

現在AILabでは、社内業務の効率化に貢献するmicroSaaSツールの開発に取り組んでいます。 今回は、2つの書類間の細かな違いを自動検出するツールを開発しました。

書類の比較作業は、非常に集中力を要する上に達成感を得にくく、精神的負担の大きい業務のひとつです。 本ツールを活用いただくことで、この比較作業の負担を大幅に軽減し、業務の一部を効率化することができます。 少しでも皆様のストレスを減らす一助となれば幸いです。

それでは、開発の経緯とツールの使い方についてご説明します。

目次

開発環境

本アプリケーションで使用した技術スタックと構成は以下のとおりです。

Azure Document Intelligence

AzureのDocument Intelligenceを用いて、書類内のテキスト、構造、バウンディングボックス(位置情報)を抽出しています。 テキストの内容だけでなく「どこに書かれているか」といった空間的な情報も取得できるため、視覚的な差分強調(ハイライト)が可能になりました。

Azure OpenAI

新旧対照表とは、2つのファイルの異なる箇所について記述した比較表のことを指します。 本ツールでは、その内容をAzure OpenAIによって自動生成しています。文章ごとの差分を抽出し、要約付きで出力します。

Azure Logic Apps

ファイルがBlob Storageにアップロードされると、自動的にフローが起動し、処理が開始される仕組みです。 ノーコードでAzure Functionsなどのサービスと連携可能で、処理の自動化が簡単に実現できました。

Azure Functions

差分比較の主要ロジックは、Azure Functions上のPythonスクリプトで実装しています。 サーバーレスでスケーラブルに動作し、必要なときだけ起動するため、コスト効率も良好です。

Python

OpenCVやdifflib、numpy、Pillowなどのライブラリを活用することで、OCR結果の差分処理や描画処理の実装を効率化しました。

開発プロセス

今回の開発では、特に工夫が必要だった点や、実装の中で重要なポイントとなった箇所について、以下で詳しく紹介していきます。

① 二つのファイルをアップロードし、pdfを分割

まず、Azure Blob Storageの指定フォルダに2つのPDFファイルを同時にアップロードします。 もしファイルが1つしかなければ、処理は自動的に停止する仕組みです。 PDFが2つそろった場合、それぞれのPDFを1ページずつのJPEG形式に分割します。 これは、ページ単位で差分処理を行うためであり、処理の安定性と精度を高める目的があります。

② OCRを実行し、画像をハイライト画像と新旧対照表作成

次に、PDFから抽出したJPEG画像をOCR処理にかけ、文字情報とその位置を含むJSONファイルを生成します。 左側の分岐では、2つのJSONファイルを比較し、差分のある箇所をハイライトします。 赤と青のマーカーで、それぞれのファイルにおける異なる箇所を視覚的に示します。

Azure Functionsにデプロイしているコードはこちらになります。

import logging
import json
import numpy as np
import difflib
import cv2
import os
import io
from PIL import Image  # Pillow使用
import azure.functions as func
from azure.storage.blob import BlobServiceClient

BLOB_CONNECTION_STRING = os.getenv("BLOB_CONNECTION_STRING")
JPEG_CONTAINER = os.getenv("input_jpeg")
JSON_CONTAINER = os.getenv("input_json")
OUTPUT_CONTAINER = os.getenv("output_document_compare")

def get_image_size_from_bytes(image_bytes):
    img = Image.open(io.BytesIO(image_bytes))
    return img.width, img.height

def convert_polygon_inch_to_pixel(poly_inch, dpi_x, dpi_y):
    # poly_inch は [x0,y0,x1,y1,x2,y2,x3,y3](inch単位)
    return [int(poly_inch[i]*dpi_x) if i%2==0 else int(poly_inch[i]*dpi_y) for i in range(len(poly_inch))]

def extract_words_and_coordinates_by_page(data, images_by_page):
    pages = {}
    for page in data["pages"]:
        page_number = page["pageNumber"]
        page_width_inch = page["width"]
        page_height_inch = page["height"]
        unit = page.get("unit", "inch")
        words = []
        coords = []

        if page_number not in images_by_page or "before" not in images_by_page[page_number]:
            for word in page["words"]:
                words.append(word["content"])
                coords.append(word["polygon"])
            pages[page_number] = (words, coords)
            continue

        img_width_px, img_height_px = get_image_size_from_bytes(images_by_page[page_number]["before"])

        dpi_x = img_width_px / page_width_inch
        dpi_y = img_height_px / page_height_inch

        for word in page["words"]:
            words.append(word["content"])
            poly_inch = word["polygon"]  
            poly_px = convert_polygon_inch_to_pixel(poly_inch, dpi_x, dpi_y)
            coords.append(poly_px)
        pages[page_number] = (words, coords)
    return pages

def normalize_to_pixel(poly):
    return np.array([[poly[i], poly[i+1]] for i in range(0, len(poly), 2)], np.int32).reshape((-1, 1, 2))

def draw_differences_on_image(image_bytes, coordinates, highlight_indices, color):
    nparr = np.frombuffer(image_bytes, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        return np.zeros((100, 100, 3), dtype=np.uint8), ["画像デコード失敗"]

    overlay = img.copy()
    debug_logs = []

    for idx in highlight_indices:
        if idx >= len(coordinates):
            continue

        poly = coordinates[idx]
        if len(poly) != 8:
            continue

        pts = normalize_to_pixel(poly)
        cv2.polylines(overlay, [pts], isClosed=True, color=color, thickness=2)
        cv2.fillPoly(overlay, [pts], color)

    cv2.addWeighted(overlay, 0.4, img, 0.6, 0, img)
    return img, debug_logs

def get_diff_indices(words1, words2):
    sm = difflib.SequenceMatcher(None, words1, words2)
    only_in_1, only_in_2 = [], []
    for tag, i1, i2, j1, j2 in sm.get_opcodes():
        if tag == "insert":
            only_in_2.extend(range(j1, j2))
        elif tag == "delete":
            only_in_1.extend(range(i1, i2))
        elif tag == "replace":
            only_in_1.extend(range(i1, i2))
            only_in_2.extend(range(j1, j2))
    return only_in_1, only_in_2

def main(req: func.HttpRequest) -> func.HttpResponse:
    log_messages = []

    try:
        blob_service = BlobServiceClient.from_connection_string(BLOB_CONNECTION_STRING)
        jpeg_container = blob_service.get_container_client(JPEG_CONTAINER)
        json_container = blob_service.get_container_client(JSON_CONTAINER)
        output_container = blob_service.get_container_client(OUTPUT_CONTAINER)

        json_blobs = sorted(
            [blob for blob in json_container.list_blobs() if blob.name.lower().endswith(".json")],
            key=lambda b: b.last_modified
        )
        if len(json_blobs) < 2:
            return func.HttpResponse("JSONファイルが2つ未満です。", status_code=400)
        before_blob_name = json_blobs[0].name
        after_blob_name = json_blobs[1].name

        before_data = json_container.get_blob_client(before_blob_name).download_blob().readall()
        after_data = json_container.get_blob_client(after_blob_name).download_blob().readall()

        json_before = json.loads(before_data)
        json_after = json.loads(after_data)

        blobs = jpeg_container.list_blobs()
        images_by_page = {}
        for blob in blobs:
            name = blob.name
            if not (name.endswith(".jpg") or name.endswith(".jpeg")):
                continue
            filename = os.path.basename(name)
            parts = filename.split("_")
            if len(parts) >= 3:
                try:
                    page_number = int(parts[1])
                    image_type = parts[2].split(".")[0]
                    image_bytes = jpeg_container.get_blob_client(blob.name).download_blob().readall()
                    if page_number not in images_by_page:
                        images_by_page[page_number] = {}
                    images_by_page[page_number][image_type] = image_bytes
                except Exception as e:
                    logging.warning(f"ファイル名処理エラー: {name} - {e}")

        before_pages = extract_words_and_coordinates_by_page(json_before, images_by_page)
        after_pages = extract_words_and_coordinates_by_page(json_after, images_by_page)

        for page_number in sorted(images_by_page.keys()):
            img_before_bytes = images_by_page[page_number].get("before")
            img_after_bytes = images_by_page[page_number].get("after")

            if img_before_bytes is None or img_after_bytes is None:
                continue

            words1, coords1 = before_pages.get(page_number, ([], []))
            words2, coords2 = after_pages.get(page_number, ([], []))
            only_in_1, only_in_2 = get_diff_indices(words1, words2)
            out1, debug1 = draw_differences_on_image(img_before_bytes, coords1, only_in_1, (255, 0, 0))
            out2, debug2 = draw_differences_on_image(img_after_bytes, coords2, only_in_2, (0, 0, 255))
            _, buffer1 = cv2.imencode(".jpg", out1)
            _, buffer2 = cv2.imencode(".jpg", out2)

            output_container.upload_blob(f"page_{page_number}_before.jpg", buffer1.tobytes(), overwrite=True)
            output_container.upload_blob(f"page_{page_number}_after.jpg", buffer2.tobytes(), overwrite=True)

        return func.HttpResponse("\n".join(log_messages), status_code=200)

    except Exception as e:
        return func.HttpResponse("\n".join(log_messages), status_code=500)

③ JPEGを1枚ずつOCRする

PDF全体でOCRを実行するとAIに渡すテキストが長くなりすぎるため、ここではJPEG画像単位でOCRを再実行します。 これにより、ページごとに分割された適切な粒度でAIに渡すことができ、精度の高い差分要約が可能になります。

④ AIに差分の要約を作成してもらう

JPEGごとに取得したOCR結果(前後)をAIに渡し、差分を自動的に要約してもらいます。 要約は、「改定前/改定後の文」「変更内容の種類」「変更の簡易説明」を含む構造化されたJSON形式で返されます。

⑤ 新旧対照表の作成

④で得られた要約結果をもとに、Google スプレッドシート上に新旧対照表を自動生成します。 今後は、ハイライト画像についても同様にGoogle Driveに保存可能にする予定です。

使用方法

① Azure Portalアカウントにログインする

このツールを利用するには、ご自身でAzure Blob Storageにファイルをアップロードしていただく必要があります。まずは以下のURLからAzure Portalにアクセスし、ログインしてください。アカウントをお持ちでない場合は、この機会に作成をお願いします。

https://azure.microsoft.com/ja-jp/get-started/azure-portal/

② Azure Blob Storageに比較したい書類をアップロードする

ログイン後、ストレージアカウントを開いてください。見つからない場合は、ポータル上部の検索バーをご活用いただくとスムーズです。 ストレージアカウント一覧が表示されますので、その中から自信が作成したコンテナを選択してください。 続いて、左側のメニューバーから「コンテナー」を選択します

このコンテナには、書類比較に必要ないくつかのフォルダがあります。今回は「pdf」コンテナを利用しますので、こちらを開いて、比較したい2つの書類をアップロードしてください。 ファイルサイズや内容によっては、比較結果の出力までに少し時間がかかることがあります。

③ 出力を確認する

現在、差分のハイライトはすべてJPEG画像として出力されます。生成されたファイルは「highlight-jpeg」フォルダにあります。以下のように、異なる箇所がそれぞれ赤と青で示されます。 また、新旧ファイルの異なる部分を要約した「新旧対照表(CSV形式)」も出力されます。こちらも一部サンプルを以下に示しています。

今後の展望

現在はPDFとJPEGの二重OCRで処理を細分化していますが、今後はOCRの回数を減らしつつ精度を高める構成に改善していく予定です。また、全体のフローをGoogle Driveベースに移行することで、より柔軟で統合的な運用を目指します。これにより処理効率の向上と、実用性のさらなる向上が期待できます。