Azure OpenAI Realtime APIを活用したVoiceRAGを構築してみた

こんにちは。AI Labの浅野晟です。

今回は、Microsoftの提供するAzure OpenAI Realtime APIを核としたVoiceRAG (Retrieval Augmented Generation) 環境の構築についてご紹介します。

このプロジェクトの最大のポイントは、STT(Speech-to-Text:音声認識)機能とTTS(Text-to-Speech:音声合成)機能がAPI内部で自動的に処理されるAzure Open AI Realtime APIを使用している点です。

これにより、複雑な音声処理を意識することなく開発を進められ、ユーザーは音声でRAGを介したAIと直接対話できるようになります。

膨大な量のドキュメントから必要な情報を素早く見つけ出し、自然な会話形式で提供できるこのVoiceRAGは、コールセンター業務や社内ヘルプデスクなど、多岐にわたる分野での応用が期待されます。

目次

構築の流れ

本VoiceRAGのシステムは、GitHubで公開されているMicrosoftのサンプルをベースに開発を進めました。


出典:https://github.com/Azure-Samples/aisearch-openai-rag-audio/blob/main/docs/RTMTPattern.png

目的は、社内文書や製品の取扱説明書等をAIに学習させ、ユーザーが音声で「経費精算について教えて」「コーヒーメーカーの電源の入れ方を教えて」などと質問すると、ユーザーの質問に対して登録された文書を参照しながらAIがを音声で回答するシステムを構築することです。

完成品のイメージは次の動画ご参考にしてください:https://www.youtube.com/watch?v=5dyg-mfP4yE&t=1s
※この動画は当社が制作したものではなく、他社が公開しているものです。

今回はコーヒーメーカーの問い合わせ窓口を想定して作成しました。

システムは、以下の主要なAzureサービス群で構成されています。

  • Azure OpenAIサービス: Realtime APIを用いた対話モデル (gpt-4o-realtime-preview) と、文書を数値ベクトルに変換する埋め込みモデル (text-embedding-3-small/large) をデプロイします。

  • Azure AI Search: アップロードされた文書のインデックス化とベクトル検索を担い、RAGの基盤となります。

  • Azure Container Apps: AIモデルと連携するバックエンドアプリケーションをホストする環境です。

  • Container Apps Environment: コンテナアプリが動作するための基盤環境です.

  • Log Analytics: システムのログを収集・分析し、問題の特定に役立ちます.

  • ストレージアカウント (Azure Blob Storage): 学習させる取扱説明書などのドキュメントを安全に保存します.

  • Container Registry: コンテナアプリのイメージを格納するプライベートレジストリです.

  • マネージドID: 各Azureサービスがセキュアに連携するための認証基盤として利用します.

開発プロセスと課題解決

このVoiceRAGの構築は、単純なサンプルコードの実行にとどまらず、Azure環境における複雑な権限管理やデプロイの課題に直面し、それらを一つ一つ解決していくプロセスでした。読者の皆様も同様の課題に遭遇する可能性があるため、その解決策を具体的なポイントとともにご紹介します。

1. サンプル環境の準備と最初の壁

以下のような流れで準備をしていきます。

// gitのリポジトリをクローン
git clone https://github.com/Azure-Samples/aisearch-openai-rag-audio.git

// プロジェクトに移動し、仮想環境を作成
cd ~/aisearch-openai-rag-audio
python3 -m venv venv
source venv/bin/activate

// 必要なライブラリをインストール(しなくてもOK)
pip install -r app/backend/requirements.txt

// Azure Development CLIにログイン
azd auth login

上記準備が済んだら以下の流れでデプロイしアプリケーションを立ち上げます。

// デプロイコマンド
azd up

// 実行権限を付与 (1度)
chmod +x scripts/start.sh

// スタートコマンド
./scripts/start.sh

上記の流れが実現するため、私が直面した課題・その対処法を記しますので参考になれば幸いです。

MicrosoftのAzureサンプルをクローンし、デプロイを開始する際、まず直面したのは権限の問題でした。

  • 発生しうる問題:
    サンプルプロジェクトは通常、Azure CLIの azd up コマンドを使用してデプロイされますが、初期設定ではサブスクリプション全体にリソースグループを作成する権限が必要となる場合があります。
    しかし、私が持っていたロールは特定のリソースグループ内での共同作成者ロールのみで、新たなリソースグループを作成する権限はありませんでした。

  • 解決策:
    この問題に対処するため、main.bicep ファイルのターゲットスコープをサブスクリプションから既存のリソースグループに変更しました。これにより、限られた権限内でデプロイを進めることが可能になります。
    次に、リソースグループ内でのデプロイを進める中で、さらなる権限関連のエラーに直面しました。
    具体的には、デプロイプロセス中にサービスプリンシパルやマネージドIDにロールを割り当てる部分で失敗が発生しました。

  • 発生しうる問題:
    Azureのサービス間連携には、適切なロールの割り当てが不可欠です。
    しかし、デプロイを実行するユーザー(またはCI/CDパイプラインのサービスプリンシパル)が、必要なロールを自動で割り当てる権限を持っていない場合があります。

  • 一時的な対処法:
    まず、main.bicepファイル内のロール割り当てを行うコードブロックを一時的にコメントアウトしてデプロイを進めました。
    これにより、権限エラーによるデプロイの中断を避けつつ、他の部分の動作を確認できました。

  • 根本的な解決策:
    最終的には、管理者の方に足りないロールを手動で付与していただくことで問題を解決しました。
    具体的には、以下のロールが必要でした:

    ◦ Container AppsのマネージドIDに対して: Storage Blob Data Contributor および AcrPull ロール。

    ◦ AI Searchサービスに対して: Storage Blob Data Reader ロール (AI SearchがBlob Storageからデータを読み取るため)。

    ◦ ローカルでセットアップスクリプトを実行するユーザーに対して: Storage Blob Data Contributor ロール (Blob Storageにファイルをアップロードするため)。

    ◦ Search Serviceに関連する操作を行うために: Search Service Contributor、Search Index Data Reader、Search Index Data Contributor ロール。

2. コンテナイメージとデプロイの課題

ロールの問題を乗り越えても、Container Appのデプロイでエラーが発生し続けました。

  • 発生しうる問題:
    サンプルのデプロイ時に、コンテナイメージがContainer Registry (ACR) に自動でプッシュされないことがあります。
    この場合、Container Appは参照すべきイメージを見つけられず、デプロイに失敗します。
    また、main.bicep内のイメージ参照先が、意図しないデフォルト値になっている場合があります。
    さらに、過去のデプロイ試行により同じ名前のContainer Appが残っていると、競合が発生することがあります。

  • 解決策:
    ◦ イメージの手動プッシュ: az acr build コマンドを使用して、必要なコンテナイメージをACRに手動でプッシュしました。

    ◦ イメージ参照の修正: main.bicep内のContainer Appのイメージ設定を、正しいACRのパス(例: <yourACRName.azurecr>.io/voicerag:latest)に修正しました。

    ◦ 重複リソースの対処: 必要に応じて、重複しているContainer Appの名前を main.bicep で変更し、デプロイ前に古いリソースを削除するか、タグを一時的に外すことで、名前の競合を回避しました。

3. 認証とAPI連携の調整

デプロイが成功しても、今度はAPI接続の認証エラーに悩まされました。

  • 発生しうる問題:
    デプロイされたアプリケーションがAzure OpenAIやAI Searchと通信する際に、必要なAPIキーや接続文字列が正しく参照されないことがあります。特に、azd up実行時に .env ファイルが初期化されてしまい、手動で追加したAPIキーが消えるという問題が発生しました。また、サービスによってはローカル認証(APIキーなど)を無効にする設定がデフォルトで有効になっている場合があり、これが認証エラーの原因となります。さらに、ストレージアカウントが接続文字列によるアクセスを許可していない場合もあります。

  • 解決策:
    ◦ APIキーの管理: azd upによる初期化を防ぐため、APIキーを app/backend/.env.local ファイルに記述し、setup_intvect.pyでこのファイルから環境変数を読み込むように修正しました。

    ◦ 認証設定の変更: main.bicep内のAzure OpenAIおよびAI Searchサービスの設定で、disableLocalAuthプロパティをfalseに設定しました。これにより、APIキーでの認証が可能になります。

    ◦ ストレージアクセス権限: ストレージアカウントのmain.bicep設定で、allowSharedKeyAccessをtrueに設定しました。これにより、接続文字列を使用したBlob Storageへのアクセスが可能になります。

    ◦ 資格情報オブジェクトの修正: setup_intvect.py 内で、Azure SearchやBlob Storageにアクセスするためのazure_credentialオブジェクトを、AzureKeyCredentialやBlobServiceClient.from_connection_stringを使用するように修正しました。

4. リソースの整理と再構築

複数の試行錯誤の結果、リソースが乱立し、命名規則も不統一になったため、一度整理して再構築することにしました。

  • 発生しうる問題:
    試行錯誤の過程で作成されたリソースが複数のリソースグループに分散したり、命名規則が統一されなかったりすると、管理が非常に煩雑になります。

  • 解決策:
    ◦ リソースの一元管理: 管理者の方にUser Access AdministratorとCognitive Services Contributorロールを付与していただき、全てのAzureリソースをゼロから再作成しました。

    ◦ 命名規則の統一: main.bicep内の各リソース名パラメータ(例: backendServiceName, logAnalyticsName, searchServiceName, openAiServiceName, storageAccountName)を統一的な命名規則に従って明示的に設定しました。また、azd up時に設定する環境名も統一しました。

    ◦ Bicepファイルの最適化: main.bicep内で重複していたリソースグループの定義をコメントアウトし、全てのリソースのスコープをresourceGroup()に統一しました。また、.azureディレクトリのキャッシュを削除してからazd upを実行することで、クリーンな状態でのデプロイを確実に行いました。

5. 会話品質とRAG機能の向上

デプロイが成功し、基本機能が動作するようになった後も、VoiceRAGの会話品質を向上させるための調整が必要でした。

  • 発生しうる問題:
    初期段階では、AIが会話履歴を適切に保持できず、文脈に沿わない回答をすることがありました。また、プロンプトの指示が不十分な場合、日本語以外の言語(例: スペイン語)で回答してしまう問題も発生しました。さらに、RAGを介した回答は、RAGプロセスによる検索と生成のオーバーヘッドのため、遅延が発生することが確認されました。

  • 解決策:
    セッション内履歴の保持:
    rtmt.pyスクリプト内で会話履歴をセッション内で保持する機能が実装されていることを確認しました。以前保持できていなかったのは、不具合か別の原因によるものと考えられます。
    システムプロンプトの調整:
    AIがユーザーの意図を正確に理解し、適切な日本語で回答できるよう、システムプロンプトを繰り返し調整しました。例えば、製品型式が不明な場合に「フィルターの掃除方法は、ドリップ式とエスプレッソ式がありますがどちらでしょうか?」と聞き返すような会話フローを指示しました。
    また、「取扱説明書の〇〇ページを参照してください」といった直接的な参照誘導ではなく、具体的な回答を提供するよう指示を加えました。これにより、AIは質問内容が不十分な場合に型式を聞き返すなど、より自然な対話ができるようになりました。下に記載されているのものが実際のプロンプトとなります。
    RAGパフォーマンス:
    回答の遅延については、RAGを介さない会話ではほぼ遅延がないものの、RAGを介すると3秒から長い場合は20秒程度の遅延が発生することが確認されました。セッションが長くなるにつれて遅延が長くなる傾向も確認できました。

あなたは家庭用コーヒーメーカーに関する問い合わせ窓口に対応するスタッフです。回答は日本語のみで、'search'ツールを使用してデータソースに基づいてください。  
ユーザーは日本語の音声で質問し、音声で回答を聞きますので、回答はできるだけ短く、1文で答えてください。  
ファイル名、ソース名、キーを音声で読み上げないでください。  

回答する際は、必ず以下の手順に従ってください:  
1. 質問に答える前に、必ず「search」ツールを使用してデータソースを確認してください。  
2. 決してトップヒットの情報だけで答えず、全ての検索結果を踏まえて、検索結果に複数の型番や機種(エスプレッソ式・ドリップ式など)が含まれる場合、  
   つかった型番をリストアップし、ユーザーに「どの型番についてのマニュアルをご希望か」を確認する質問文を生成してください。  
   ユーザーが選びやすいよう、型番と簡単な説明を並べて提示し、「ご希望の型番を教えてください」と締めくくってください。  

(例)ユーザー:フィルターの掃除方法を教えてください。  
  あなた:ドリップ式とエスプレッソ式がございますが、ご希望の型番を教えてください。  
  ユーザー:ドリップ式になります。  
  あなた:お手入れ方法はこちらになります……。  

3. データソースを参照して回答する場合、情報源を「report_grounding」ツールで報告してください。  
4. データソースに回答がない場合は「知らない」と答えてください。  

注意:「取扱説明書の⚪︎⚪︎ページを参照してください」などとは絶対に言わず、ユーザーの質問にはできるだけあなたが調べて回答するよう努力してください。  

あなたが担当するのは以下の製品です。よく区別してから回答してください。  
・ドリップ式コーヒーメーカー(ペーパーフィルター使用)  
・ドリップ式コーヒーメーカー(金属フィルター内蔵)  
・エスプレッソマシン  
・カプセル式コーヒーメーカー  
・全自動コーヒーメーカー  

まとめ

本プロジェクトでは、Azure OpenAI Realtime APIの音声入出力機能を活用し、取扱説明書の内容に基づいてユーザーと対話できるVoiceRAGの構築に成功しました。

この過程で、Azureの複雑な権限管理、コンテナデプロイの課題、API連携、そしてAIの会話品質向上に至るまで、多岐にわたる技術的な挑戦がありました。

特に、Realtime APIがSTTとTTSを内部で自動処理してくれるため、開発者は音声処理の詳細に踏み込むことなくRAGロジックに集中できるという大きな利点がありました。

権限設定やデプロイプロセスの細かな調整は必要でしたが、それらを乗り越えることで、最終的に期待通りのVoiceRAGシステムを実現することができました。

しかし、現状ではまだ抑揚や発音の不自然さAIの回答の精度にブレがあり不安定な現状です。

今はまだpreview段階のRealtime APIですが、今後オプションで日本語対応になったりモデルの性能が上がれば、違和感なく対話できるVoiceRAGがより簡単に実装できるようになる新たな可能性を感じました。

この経験が、皆様のAI開発の一助となれば幸いです。

最終的なソースコード

上記流れで実装した、既存のリソースグループ内にVoiceRAGを作成する際のソースコードを記載します。

app.py↓

import logging
import os
from pathlib import Path

from aiohttp import web
from azure.core.credentials import AzureKeyCredential
from azure.identity import AzureDeveloperCliCredential, DefaultAzureCredential
from dotenv import load_dotenv

from ragtools import attach_rag_tools
from rtmt import RTMiddleTier

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("voicerag")

async def create_app():
    if not os.environ.get("RUNNING_IN_PRODUCTION"):
        logger.info("Running in development mode, loading from .env file")
        load_dotenv()

    llm_key = os.environ.get("AZURE_OPENAI_API_KEY")
    search_key = os.environ.get("AZURE_SEARCH_API_KEY")

    credential = None
    if not llm_key or not search_key:
        if tenant_id := os.environ.get("AZURE_TENANT_ID"):
            logger.info("Using AzureDeveloperCliCredential with tenant_id %s", tenant_id)
            credential = AzureDeveloperCliCredential(tenant_id=tenant_id, process_timeout=60)
        else:
            logger.info("Using DefaultAzureCredential")
            credential = DefaultAzureCredential()
    llm_credential = AzureKeyCredential(llm_key) if llm_key else credential
    search_credential = AzureKeyCredential(search_key) if search_key else credential

    app = web.Application()

    rtmt = RTMiddleTier(
        credentials=llm_credential,
        endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
        deployment=os.environ["AZURE_OPENAI_REALTIME_DEPLOYMENT"],
        voice_choice=os.environ.get("AZURE_OPENAI_REALTIME_VOICE_CHOICE") or "alloy"
        )
    rtmt.system_message = """
        あなたは家庭用コーヒーメーカーに関する問い合わせ窓口に対応するスタッフです。回答は日本語のみで、'search'ツールを使用してデータソースに基づいてください。  
     ユーザーは日本語の音声で質問し、音声で回答を聞きますので、回答はできるだけ短く、1文で答えてください。  
     ファイル名、ソース名、キーを音声で読み上げないでください。  

     回答する際は、必ず以下の手順に従ってください:  
     1. 質問に答える前に、必ず「search」ツールを使用してデータソースを確認してください。  
     2. 決してトップヒットの情報だけで答えず、全ての検索結果を踏まえて、検索結果に複数の型番や機種(エスプレッソ式・ドリップ式など)が含まれる場合、  
        つかった型番をリストアップし、ユーザーに「どの型番についてのマニュアルをご希望か」を確認する質問文を生成してください。  
        ユーザーが選びやすいよう、型番と簡単な説明を並べて提示し、「ご希望の型番を教えてください」と締めくくってください。  

     (例)ユーザー:フィルターの掃除方法を教えてください。  
       あなた:ドリップ式とエスプレッソ式がございますが、ご希望の型番を教えてください。  
       ユーザー:ドリップ式になります。  
       あなた:お手入れ方法はこちらになります……。  

     3. データソースを参照して回答する場合、情報源を「report_grounding」ツールで報告してください。  
     4. データソースに回答がない場合は「知らない」と答えてください。  

     注意:「取扱説明書の⚪︎⚪︎ページを参照してください」などとは絶対に言わず、ユーザーの質問にはできるだけあなたが調べて回答するよう努力してください。  

     あなたが担当するのは以下の製品です。よく区別してから回答してください。  
     ・ドリップ式コーヒーメーカー(ペーパーフィルター使用)  
     ・ドリップ式コーヒーメーカー(金属フィルター内蔵)  
     ・エスプレッソマシン  
     ・カプセル式コーヒーメーカー  
     ・全自動コーヒーメーカー  
    """.strip()

    attach_rag_tools(rtmt,
        credentials=search_credential,
        search_endpoint=os.environ.get("AZURE_SEARCH_ENDPOINT"),
        search_index=os.environ.get("AZURE_SEARCH_INDEX"),
        semantic_configuration=os.environ.get("AZURE_SEARCH_SEMANTIC_CONFIGURATION") or None,
        identifier_field=os.environ.get("AZURE_SEARCH_IDENTIFIER_FIELD") or "chunk_id",
        content_field=os.environ.get("AZURE_SEARCH_CONTENT_FIELD") or "chunk",
        embedding_field=os.environ.get("AZURE_SEARCH_EMBEDDING_FIELD") or "text_vector",
        title_field=os.environ.get("AZURE_SEARCH_TITLE_FIELD") or "title",
        use_vector_query=(os.getenv("AZURE_SEARCH_USE_VECTOR_QUERY", "true") == "true")
        )

    rtmt.attach_to_app(app, "/realtime")

    current_directory = Path(__file__).parent
    app.add_routes([web.get('/', lambda _: web.FileResponse(current_directory / 'static/index.html'))])
    app.router.add_static('/', path=current_directory / 'static', name='static')

    return app

if __name__ == "__main__":
    host = "localhost"
    port = 8765
    web.run_app(create_app(), host=host, port=port)

setup_intvect.py

import json
import logging
import os
import subprocess
from dotenv import load_dotenv
from azure.core.exceptions import ResourceExistsError
from azure.identity import AzureDeveloperCliCredential
from azure.search.documents.indexes import SearchIndexClient, SearchIndexerClient
from azure.core.credentials import AzureKeyCredential
from azure.search.documents.indexes.models import (
    AzureOpenAIEmbeddingSkill,
    AzureOpenAIParameters,
    AzureOpenAIVectorizer,
    FieldMapping,
    HnswAlgorithmConfiguration,
    HnswParameters,
    IndexProjectionMode,
    InputFieldMappingEntry,
    OutputFieldMappingEntry,
    SearchableField,
    SearchField,
    SearchFieldDataType,
    SearchIndex,
    SearchIndexer,
    SearchIndexerDataContainer,
    SearchIndexerDataSourceConnection,
    SearchIndexerDataSourceType,
    SearchIndexerIndexProjections,
    SearchIndexerIndexProjectionSelector,
    SearchIndexerIndexProjectionsParameters,
    SearchIndexerSkillset,
    SemanticConfiguration,
    SemanticField,
    SemanticPrioritizedFields,
    SemanticSearch,
    SimpleField,
    SplitSkill,
    VectorSearch,
    VectorSearchAlgorithmMetric,
    VectorSearchProfile,
)
from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv
from rich.logging import RichHandler



def load_azd_env():
    """Get path to current azd env file and load file using python-dotenv"""
    result = subprocess.run("azd env list -o json", shell=True, capture_output=True, text=True)
    if result.returncode != 0:
        raise Exception("Error loading azd env")
    env_json = json.loads(result.stdout)
    env_file_path = None
    for entry in env_json:
        if entry["IsDefault"]:
            env_file_path = entry["DotEnvPath"]
    if not env_file_path:
        raise Exception("No default azd env file found")
    logger.info(f"Loading azd env from {env_file_path}")
    load_dotenv(env_file_path, override=True)


def setup_index(azure_credential, index_name, azure_search_endpoint, azure_storage_connection_string, azure_storage_container, azure_openai_embedding_endpoint, azure_openai_embedding_deployment, azure_openai_embedding_model, azure_openai_embeddings_dimensions):
    index_client = SearchIndexClient(azure_search_endpoint, azure_credential)
    indexer_client = SearchIndexerClient(azure_search_endpoint, azure_credential)

    data_source_connections = indexer_client.get_data_source_connections()
    if index_name in [ds.name for ds in data_source_connections]:
        logger.info(f"Data source connection {index_name} already exists, not re-creating")
    else:
        logger.info(f"Creating data source connection: {index_name}")
        indexer_client.create_data_source_connection(
            data_source_connection=SearchIndexerDataSourceConnection(
                name=index_name, 
                type=SearchIndexerDataSourceType.AZURE_BLOB,
                connection_string=azure_storage_connection_string,
                container=SearchIndexerDataContainer(name=azure_storage_container)))

    index_names = [index.name for index in index_client.list_indexes()]
    if index_name in index_names:
        logger.info(f"Index {index_name} already exists, not re-creating")
    else:
        logger.info(f"Creating index: {index_name}")
        index_client.create_index(
            SearchIndex(
                name=index_name,
                fields=[
                    SearchableField(name="chunk_id", key=True, analyzer_name="keyword", sortable=True),
                    SimpleField(name="parent_id", type=SearchFieldDataType.String, filterable=True),
                    SearchableField(name="title"),
                    SearchableField(name="chunk"),
                    SearchField(
                        name="text_vector", 
                        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                        vector_search_dimensions=EMBEDDINGS_DIMENSIONS,
                        vector_search_profile_name="vp",
                        stored=True,
                        hidden=False)
                ],
                vector_search=VectorSearch(
                    algorithms=[
                        HnswAlgorithmConfiguration(name="algo", parameters=HnswParameters(metric=VectorSearchAlgorithmMetric.COSINE))
                    ],
                    vectorizers=[
                        AzureOpenAIVectorizer(
                            name="openai_vectorizer",
                            azure_open_ai_parameters=AzureOpenAIParameters(
                                resource_uri=azure_openai_embedding_endpoint,
                                deployment_id=azure_openai_embedding_deployment,
                                model_name=azure_openai_embedding_model
                            )
                        )
                    ],
                    profiles=[
                        VectorSearchProfile(name="vp", algorithm_configuration_name="algo", vectorizer="openai_vectorizer")
                    ]
                ),
                semantic_search=SemanticSearch(
                    configurations=[
                        SemanticConfiguration(
                            name="default",
                            prioritized_fields=SemanticPrioritizedFields(title_field=SemanticField(field_name="title"), content_fields=[SemanticField(field_name="chunk")])
                        )
                    ],
                    default_configuration_name="default"
                )
            )
        )

    skillsets = indexer_client.get_skillsets()
    if index_name in [skillset.name for skillset in skillsets]:
        logger.info(f"Skillset {index_name} already exists, not re-creating")
    else:
        logger.info(f"Creating skillset: {index_name}")
        indexer_client.create_skillset(
            skillset=SearchIndexerSkillset(
                name=index_name,
                skills=[
                    SplitSkill(
                        text_split_mode="pages",
                        context="/document",
                        maximum_page_length=2000,
                        page_overlap_length=500,
                        inputs=[InputFieldMappingEntry(name="text", source="/document/content")],
                        outputs=[OutputFieldMappingEntry(name="textItems", target_name="pages")]),
                    AzureOpenAIEmbeddingSkill(
                        context="/document/pages/*",
                        resource_uri=azure_openai_embedding_endpoint,
                        api_key=os.environ.get("AZURE_OPENAI_API_KEY"),
                        deployment_id=azure_openai_embedding_deployment,
                        model_name=azure_openai_embedding_model,
                        dimensions=azure_openai_embeddings_dimensions,
                        inputs=[InputFieldMappingEntry(name="text", source="/document/pages/*")],
                        outputs=[OutputFieldMappingEntry(name="embedding", target_name="text_vector")])
                ],
                index_projections=SearchIndexerIndexProjections(
                    selectors=[
                        SearchIndexerIndexProjectionSelector(
                            target_index_name=index_name,
                            parent_key_field_name="parent_id",
                            source_context="/document/pages/*",
                            mappings=[
                                InputFieldMappingEntry(name="chunk", source="/document/pages/*"),
                                InputFieldMappingEntry(name="text_vector", source="/document/pages/*/text_vector"),
                                InputFieldMappingEntry(name="title", source="/document/metadata_storage_name")
                            ]
                        )
                    ],
                    parameters=SearchIndexerIndexProjectionsParameters(
                        projection_mode=IndexProjectionMode.SKIP_INDEXING_PARENT_DOCUMENTS
                    )
                )))

    indexers = indexer_client.get_indexers()
    if index_name in [indexer.name for indexer in indexers]:
        logger.info(f"Indexer {index_name} already exists, not re-creating")
    else:
        indexer_client.create_indexer(
            indexer=SearchIndexer(
                name=index_name,
                data_source_name=index_name,
                skillset_name=index_name,
                target_index_name=index_name,        
                field_mappings=[FieldMapping(source_field_name="metadata_storage_name", target_field_name="title")]
            )
        )

def upload_documents(azure_credential, conn_str,indexer_name, azure_search_endpoint, azure_storage_endpoint, azure_storage_container):
    indexer_client = SearchIndexerClient(azure_search_endpoint, azure_credential)
    # Upload the documents in /data folder to the blob storage container
    # blob_client = BlobServiceClient(
    #     account_url=azure_storage_endpoint, credential=azure_credential,
    #     max_single_put_size=4 * 1024 * 1024
    # )
    blob_client = BlobServiceClient.from_connection_string(
        conn_str=conn_str,
        max_single_put_size=4 * 1024 * 1024
    )
    container_client = blob_client.get_container_client(azure_storage_container)
    if not container_client.exists():
        container_client.create_container()
    existing_blobs = [blob.name for blob in container_client.list_blobs()]

    # Open each file in /data folder
    for file in os.scandir("data"):
        with open(file.path, "rb") as opened_file:
            filename = os.path.basename(file.path)
            # Check if blob already exists
            if filename in existing_blobs:
                logger.info("Blob already exists, skipping file: %s", filename)
            else:
                logger.info("Uploading blob for file: %s", filename)
                blob_client = container_client.upload_blob(filename, opened_file, overwrite=True)

    # Start the indexer
    try:
        indexer_client.run_indexer(indexer_name)
        logger.info("Indexer started. Any unindexed blobs should be indexed in a few minutes, check the Azure Portal for status.")
    except ResourceExistsError:
        logger.info("Indexer already running, not starting again")

if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING, format="%(message)s", datefmt="[%X]", handlers=[RichHandler(rich_tracebacks=True)])
    logger = logging.getLogger("voicerag")
    logger.setLevel(logging.INFO)

    logger = logging.getLogger("voicerag")

    load_azd_env()
    logger.info("Checking if we need to set up Azure AI Search index...")
    if os.environ.get("AZURE_SEARCH_REUSE_EXISTING") == "true":
        logger.info("Since an existing Azure AI Search index is being used, no changes will be made to the index.")
        exit()
    else:
        logger.info("Setting up Azure AI Search index and integrated vectorization...")

    # Used to name index, indexer, data source and skillset
    AZURE_SEARCH_INDEX = os.environ["AZURE_SEARCH_INDEX"]
    AZURE_OPENAI_EMBEDDING_ENDPOINT = os.environ["AZURE_OPENAI_ENDPOINT"]
    AZURE_OPENAI_EMBEDDING_DEPLOYMENT = os.environ["AZURE_OPENAI_EMBEDDING_DEPLOYMENT"]
    AZURE_OPENAI_EMBEDDING_MODEL = os.environ["AZURE_OPENAI_EMBEDDING_MODEL"]
    EMBEDDINGS_DIMENSIONS = 3072
    AZURE_SEARCH_ENDPOINT = os.environ["AZURE_SEARCH_ENDPOINT"]
    AZURE_STORAGE_ENDPOINT = os.environ["AZURE_STORAGE_ENDPOINT"]
    AZURE_STORAGE_CONTAINER = os.environ["AZURE_STORAGE_CONTAINER"]


    load_dotenv(os.path.join(os.path.dirname(__file__),".env.local"), override=True)

    AZURE_STORAGE_CONNECTION_STRING = os.environ.get("AZURE_STORAGE_CONNECTION_STRING")
    azure_credential = AzureKeyCredential(os.environ.get("AZURE_SEARCH_API_KEY"))

    setup_index(azure_credential,
        index_name=AZURE_SEARCH_INDEX, 
        azure_search_endpoint=AZURE_SEARCH_ENDPOINT,
        azure_storage_connection_string=AZURE_STORAGE_CONNECTION_STRING,
        azure_storage_container=AZURE_STORAGE_CONTAINER,
        azure_openai_embedding_endpoint=AZURE_OPENAI_EMBEDDING_ENDPOINT,
        azure_openai_embedding_deployment=AZURE_OPENAI_EMBEDDING_DEPLOYMENT,
        azure_openai_embedding_model=AZURE_OPENAI_EMBEDDING_MODEL,
        azure_openai_embeddings_dimensions=EMBEDDINGS_DIMENSIONS)

    upload_documents(azure_credential,
        conn_str = AZURE_STORAGE_CONNECTION_STRING,
        indexer_name=AZURE_SEARCH_INDEX,
        azure_search_endpoint=AZURE_SEARCH_ENDPOINT,
        azure_storage_endpoint=AZURE_STORAGE_ENDPOINT,
        azure_storage_container=AZURE_STORAGE_CONTAINER)

main.bicep↓

targetScope = 'resourceGroup'

@minLength(1)
@maxLength(64)
@description('Name of the the environment which is used to generate a short unique hash used in all resources.')
param environmentName string

@minLength(1)
@description('Location for AI Search and Storage resources')
// Constrained due to semantic ranker availability: https://learn.microsoft.com/azure/search/search-region-support#americas
@allowed([
  'brazilsouth'
  'canadacentral'
  'canadaeast'
  'eastus2'
  'northcentralus'
  'westus'
  'westus2'
  'westcentralus'
  'northeurope'
  'francecentral'
  'switzerlandnorth'
  'switzerlandwest'
  'uksouth'
  'australiaeast'
  'eastasia'
  'southeastasia'
  'centralindia'
  'jioindiawest'
  'japanwest'
  'koreacentral'
])
@metadata({
  azd: {
    type: 'location'
  }
})
param location string

param backendServiceName string = '<実際のACPのリソース名>'
param resourceGroupName string = ''

param logAnalyticsName string = '<実際のログのリソース名>'

param reuseExistingSearch bool = true
param searchEndpoint string = ''
param searchServiceName string = '<実際のAI Searchのリソース名>'
param searchServiceResourceGroup string = ''
param searchServiceLocation string = ''
// The free tier does not support managed identity (required) or semantic search (optional)
@allowed(['free', 'basic', 'standard', 'standard2', 'standard3', 'storage_optimized_l1', 'storage_optimized_l2'])
param searchServiceSkuName string
param searchIndexName string
param searchSemanticConfiguration string
param searchServiceSemanticRankerLevel string
var actualSearchServiceSemanticRankerLevel = (searchServiceSkuName == 'free')
  ? 'disabled'
  : searchServiceSemanticRankerLevel
param searchIdentifierField string
param searchContentField string
param searchTitleField string
param searchEmbeddingField string
param searchUseVectorQuery bool

param storageAccountName string = '<実際のストレージアカウントのリソース名>'
param storageResourceGroup string = ''
param storageResourceGroupLocation string = location
param storageContainerName string = 'content'
param storageSkuName string

param reuseExistingOpenAi bool = true
param openAiServiceName string = '<実際のAzure Open AIのリソース名>'
param openAiResourceGroup string = ''
param openAiEndpoint string = ''
param openAiRealtimeDeployment string = ''
param openAiRealtimeVoiceChoice string = ''

@description('Location for the OpenAI resource group')
@allowed([
  'eastus2'
  'swedencentral'
])
@metadata({
  azd: {
    type: 'location'
  }
})
param openAiServiceLocation string

param realtimeDeploymentCapacity int
param realtimeDeploymentVersion string
param embeddingDeploymentCapacity int

param tenantId string = tenant().tenantId

@description('Id of the user or app to assign application roles')
param principalId string = ''

var abbrs = loadJsonContent('abbreviations.json')
var resourceToken = toLower(uniqueString(subscription().id, environmentName, location))
var tags = { 'azd-env-name': environmentName }

@description('Whether the deployment is running on GitHub Actions')
param runningOnGh string = ''

@description('Whether the deployment is running on Azure DevOps Pipeline')
param runningOnAdo string = ''

@description('Used by azd for containerapps deployment')
param webAppExists bool

@allowed(['Consumption', 'D4', 'D8', 'D16', 'D32', 'E4', 'E8', 'E16', 'E32', 'NC24-A100', 'NC48-A100', 'NC96-A100'])
param azureContainerAppsWorkloadProfile string

param acaIdentityName string = '${environmentName}-aca-identity'
param containerRegistryName string = '${replace(environmentName, '-', '')}acr'

// Figure out if we're running as a user or service principal
var principalType = empty(runningOnGh) && empty(runningOnAdo) ? 'User' : 'ServicePrincipal'

/*
// Organize resources in a resource group 
resource resourceGroup 'Microsoft.Resources/resourceGroups@2021-04-01' = { name: !empty(resourceGroupName) ? resourceGroupName : '${abbrs.resourcesResourceGroups}${environmentName}' location: location tags: tags }
resource openAiResourceGroup 'Microsoft.Resources/resourceGroups@2021-04-01' existing = if (!empty(openAiResourceGroupName)) { name: !empty(openAiResourceGroupName) ? openAiResourceGroupName : resourceGroup().name } 
resource searchServiceResourceGroup 'Microsoft.Resources/resourceGroups@2021-04-01' existing = if (!empty(searchServiceResourceGroupName)) { name: !empty(searchServiceResourceGroupName) ? searchServiceResourceGroupName : resourceGroup().name } 
resource storageResourceGroup 'Microsoft.Resources/resourceGroups@2021-04-01' existing = if (!empty(storageResourceGroupName)) { name: !empty(storageResourceGroupName) ? storageResourceGroupName : resourceGroup().name }
*/
module logAnalytics 'br/public:avm/res/operational-insights/workspace:0.7.0' = {
  name: 'loganalytics'
  scope: resourceGroup()
  params: {
    name: !empty(logAnalyticsName) ? logAnalyticsName : '${abbrs.operationalInsightsWorkspaces}${resourceToken}'
    location: location
    tags: tags
    skuName: 'PerGB2018'
    dataRetention: 30
    publicNetworkAccessForIngestion: 'Enabled'
    publicNetworkAccessForQuery: 'Enabled'
    useResourcePermissions: true
  }
}

// Azure container apps resources

// User-assigned identity for pulling images from ACR
module acaIdentity 'core/security/aca-identity.bicep' = {
  name: 'aca-identity'
  scope: resourceGroup()
  params: {
    identityName: acaIdentityName
    location: location
  }
}

module containerApps 'core/host/container-apps.bicep' = {
  name: 'container-apps'
  scope: resourceGroup()
  params: {
    name: 'app'
    tags: tags
    location: location
    workloadProfile: azureContainerAppsWorkloadProfile
    containerAppsEnvironmentName: '${environmentName}-aca-env'
    containerRegistryName: '${containerRegistryName}${resourceToken}'
    logAnalyticsWorkspaceResourceId: logAnalytics.outputs.resourceId
  }
}

// Container Apps for the web application (Python Quart app with JS frontend)
module acaBackend 'core/host/container-app-upsert.bicep' = {
  name: 'aca-web'
  scope: resourceGroup()
  dependsOn: [
    containerApps
    acaIdentity
  ]
  params: {
    name: !empty(backendServiceName) ? backendServiceName : '${abbrs.webSitesContainerApps}backend-${resourceToken}'
    location: location
    identityName: acaIdentityName
    exists: webAppExists
    workloadProfile: azureContainerAppsWorkloadProfile
    containerRegistryName: containerApps.outputs.registryName
    containerAppsEnvironmentName: containerApps.outputs.environmentName
    identityType: 'UserAssigned'
    tags: union(tags, { 'azd-service-name': 'backend' })
    targetPort: 8000
    containerCpuCoreCount: '1.0'
    containerMemory: '2Gi'
    env: {
      AZURE_SEARCH_ENDPOINT: reuseExistingSearch
        ? searchEndpoint
        : 'https://${searchService.outputs.name}.search.windows.net'
      AZURE_SEARCH_INDEX: searchIndexName
      AZURE_SEARCH_SEMANTIC_CONFIGURATION: searchSemanticConfiguration
      AZURE_SEARCH_IDENTIFIER_FIELD: searchIdentifierField
      AZURE_SEARCH_CONTENT_FIELD: searchContentField
      AZURE_SEARCH_TITLE_FIELD: searchTitleField
      AZURE_SEARCH_EMBEDDING_FIELD: searchEmbeddingField
      AZURE_SEARCH_USE_VECTOR_QUERY: searchUseVectorQuery
      AZURE_OPENAI_ENDPOINT: reuseExistingOpenAi ? openAiEndpoint : openAi.outputs.endpoint
      AZURE_OPENAI_REALTIME_DEPLOYMENT: reuseExistingOpenAi ? openAiRealtimeDeployment : openAiDeployments[0].name
      AZURE_OPENAI_REALTIME_VOICE_CHOICE: openAiRealtimeVoiceChoice
      // CORS support, for frontends on other hosts
      RUNNING_IN_PRODUCTION: 'true'
      // For using managed identity to access Azure resources. See https://github.com/microsoft/azure-container-apps/issues/442
      AZURE_CLIENT_ID: acaIdentity.outputs.clientId
    }
  }
}

var embedModel = 'text-embedding-3-large'
var openAiDeployments = [
  {
    name: 'gpt-4o-realtime-preview'
    model: {
      format: 'OpenAI'
      name: 'gpt-4o-realtime-preview'
      version: realtimeDeploymentVersion
    }
    sku: {
      name: 'GlobalStandard'
      capacity: realtimeDeploymentCapacity
    }
  }
  {
    name: embedModel
    model: {
      format: 'OpenAI'
      name: embedModel
      version: '1'
    }
    sku: {
      name: 'Standard'
      capacity: embeddingDeploymentCapacity
    }
  }
]

module openAi 'br/public:avm/res/cognitive-services/account:0.8.0' = if (!reuseExistingOpenAi) {
  name: 'openai'
  //scope: openAiResourceGroup
  scope: resourceGroup()
  params: {
    name: !empty(openAiServiceName) ? openAiServiceName : '${abbrs.cognitiveServicesAccounts}${resourceToken}'
    location: openAiServiceLocation
    tags: tags
    kind: 'OpenAI'
    customSubDomainName: !empty(openAiServiceName)
      ? openAiServiceName
      : '${abbrs.cognitiveServicesAccounts}${resourceToken}'
    sku: 'S0'
    deployments: openAiDeployments
    disableLocalAuth: false
    publicNetworkAccess: 'Enabled'
    networkAcls: {}
    roleAssignments: [
      {
        roleDefinitionIdOrName: 'Cognitive Services OpenAI User'
        principalId: principalId
        principalType: principalType
      }
    ]
  }
}

module searchService 'br/public:avm/res/search/search-service:0.7.1' = if (!reuseExistingSearch) {
  name: 'search-service'
  //scope: searchServiceResourceGroup
  scope: resourceGroup()
  params: {
    name: !empty(searchServiceName) ? searchServiceName : 'gptkb-${resourceToken}'
    location: !empty(searchServiceLocation) ? searchServiceLocation : location
    tags: tags
    disableLocalAuth: false
    sku: searchServiceSkuName
    replicaCount: 1
    semanticSearch: actualSearchServiceSemanticRankerLevel
    // An outbound managed identity is required for integrated vectorization to work,
    // and is only supported on non-free tiers:
    managedIdentities: { systemAssigned: true }
    roleAssignments: [
      {
        roleDefinitionIdOrName: 'Search Index Data Reader'
        principalId: principalId
        principalType: principalType
      }
      {
        roleDefinitionIdOrName: 'Search Index Data Contributor'
        principalId: principalId
        principalType: principalType
      }
      {
        roleDefinitionIdOrName: 'Search Service Contributor'
        principalId: principalId
        principalType: principalType
      }
    ]
  }
}

module storage 'br/public:avm/res/storage/storage-account:0.9.1' = {
  name: 'storage'
  //scope: storageResourceGroup
  scope: resourceGroup()
  params: {
    name: !empty(storageAccountName) ? storageAccountName : '${abbrs.storageStorageAccounts}${resourceToken}'
    location: storageResourceGroupLocation
    tags: tags
    kind: 'StorageV2'
    skuName: storageSkuName
    publicNetworkAccess: 'Enabled' // Necessary for uploading documents to storage container
    networkAcls: {
      defaultAction: 'Allow'
      bypass: 'AzureServices'
    }
    allowBlobPublicAccess: false
    allowSharedKeyAccess: true
    blobServices: {
      deleteRetentionPolicyDays: 2
      deleteRetentionPolicyEnabled: true
      containers: [
        {
          name: storageContainerName
          publicAccess: 'None'
        }
      ]
    }
    roleAssignments: [
      {
        roleDefinitionIdOrName: 'Storage Blob Data Reader'
        principalId: principalId
        principalType: principalType
      }
      // For uploading documents to storage container:
      {
        roleDefinitionIdOrName: 'Storage Blob Data Contributor'
        principalId: principalId
        principalType: principalType
      }
    ]
  }
}

// Roles for the backend to access other services
module openAiRoleBackend 'core/security/role.bicep' = {
  //scope: openAiResourceGroup
  scope: resourceGroup()
  name: 'openai-role-backend'
  params: {
    principalId: acaBackend.outputs.identityPrincipalId
    roleDefinitionId: '5e0bd9bd-7b93-4f28-af87-19fc36ad61bd'
    principalType: 'ServicePrincipal'
  }
}

// Used to issue search queries
// https://learn.microsoft.com/azure/search/search-security-rbac
module searchRoleBackend 'core/security/role.bicep' = {
  //scope: searchServiceResourceGroup
  scope: resourceGroup()
  name: 'search-role-backend'
  params: {
    principalId: acaBackend.outputs.identityPrincipalId
    roleDefinitionId: '1407120a-92aa-4202-b7e9-c0e197c71c8f'
    principalType: 'ServicePrincipal'
  }
}

// Necessary for integrated vectorization, for search service to access storage
module storageRoleSearchService 'core/security/role.bicep' = if (!reuseExistingSearch) {
  //scope: storageResourceGroup
  scope: resourceGroup()
  name: 'storage-role-searchservice'
  params: {
    principalId: !reuseExistingSearch ? searchService.outputs.systemAssignedMIPrincipalId : ''
    roleDefinitionId: '2a2b9908-6ea1-4ae2-8e65-a410df84e7d1' // Storage Blob Data Reader
    principalType: 'ServicePrincipal'
  }
}

// Necessary for integrated vectorization, for search service to access OpenAI embeddings
module openAiRoleSearchService 'core/security/role.bicep' = if (!reuseExistingSearch) {
  //scope: openAiResourceGroup
  scope: resourceGroup()
  name: 'openai-role-searchservice'
  params: {
    principalId: !reuseExistingSearch ? searchService.outputs.systemAssignedMIPrincipalId : ''
    roleDefinitionId: '5e0bd9bd-7b93-4f28-af87-19fc36ad61bd'
    principalType: 'ServicePrincipal'
  }
}

output AZURE_LOCATION string = location
output AZURE_TENANT_ID string = tenantId
output AZURE_RESOURCE_GROUP string = resourceGroup().name

output AZURE_OPENAI_ENDPOINT string = reuseExistingOpenAi ? openAiEndpoint : openAi.outputs.endpoint
output AZURE_OPENAI_REALTIME_DEPLOYMENT string = reuseExistingOpenAi
  ? openAiRealtimeDeployment
  : openAiDeployments[0].name
output AZURE_OPENAI_REALTIME_VOICE_CHOICE string = openAiRealtimeVoiceChoice
output AZURE_OPENAI_EMBEDDING_DEPLOYMENT string = embedModel
output AZURE_OPENAI_EMBEDDING_MODEL string = embedModel

output AZURE_SEARCH_ENDPOINT string = reuseExistingSearch
  ? searchEndpoint
  : 'https://${searchService.outputs.name}.search.windows.net'
output AZURE_SEARCH_INDEX string = searchIndexName
output AZURE_SEARCH_SEMANTIC_CONFIGURATION string = searchSemanticConfiguration
output AZURE_SEARCH_IDENTIFIER_FIELD string = searchIdentifierField
output AZURE_SEARCH_CONTENT_FIELD string = searchContentField
output AZURE_SEARCH_TITLE_FIELD string = searchTitleField
output AZURE_SEARCH_EMBEDDING_FIELD string = searchEmbeddingField
output AZURE_SEARCH_USE_VECTOR_QUERY bool = searchUseVectorQuery

output AZURE_STORAGE_ENDPOINT string = 'https://${storage.outputs.name}.blob.core.windows.net'
output AZURE_STORAGE_ACCOUNT string = storage.outputs.name
//output AZURE_STORAGE_CONNECTION_STRING string = 'ResourceId=/subscriptions/${subscription().subscriptionId}/resourceGroups/${storageResourceGroup.name}/providers/Microsoft.Storage/storageAccounts/${storage.outputs.name}'
output AZURE_STORAGE_CONNECTION_STRING string = 'ResourceId=/subscriptions/${subscription().subscriptionId}/resourceGroups/${resourceGroup().name}/providers/Microsoft.Storage/storageAccounts/${storage.outputs.name}'
output AZURE_STORAGE_CONTAINER string = storageContainerName
//output AZURE_STORAGE_RESOURCE_GROUP string = storageResourceGroup().name
output AZURE_STORAGE_RESOURCE_GROUP string = resourceGroup().name
output BACKEND_URI string = acaBackend.outputs.uri
output AZURE_CONTAINER_REGISTRY_ENDPOINT string = containerApps.outputs.registryLoginServer

.env↓

AZURE_OPENAI_ENDPOINT=
AZURE_OPENAI_REALTIME_DEPLOYMENT=gpt-4o-realtime-preview
AZURE_OPENAI_REALTIME_VOICE_CHOICE=alloy
AZURE_SEARCH_ENDPOINT=
AZURE_SEARCH_INDEX=voicerag-intvect
AZURE_TENANT_ID=
AZURE_SEARCH_SEMANTIC_CONFIGURATION=default
AZURE_SEARCH_IDENTIFIER_FIELD=chunk_id
AZURE_SEARCH_CONTENT_FIELD=chunk
AZURE_SEARCH_TITLE_FIELD=title
AZURE_SEARCH_EMBEDDING_FIELD=text_vector
AZURE_SEARCH_USE_VECTOR_QUERY=true

AZURE_OPENAI_API_KEY=
AZURE_SEARCH_API_KEY=

.env.local↓

AZURE_OPENAI_API_KEY=
AZURE_SEARCH_API_KEY=
AZURE_STORAGE_CONNECTION_STRING=