Azure AI SearchとAzure Open AIを組み合わせてVoice RAG環境を構築してみた

こんにちは、AI Labの神﨑睦人です。

今回は、私たちが開発に取り組んできた音声RAGシステムの開発プロセスから課題解決までご紹介します。

このプロジェクトは、Retrieval-Augmented Generation(RAG)をベースとした対話型AIに音声インターフェースを統合することで、より自然で直感的なユーザーエクスペリエンスを実現することを目的としています。

このシステムが目指すのは、単にテキストで情報を提供するだけでなく、ユーザーが声で質問し、声で回答を得られる次世代のAIアシスタントです。開発の主要なポイントは、Azure AI Searchによる高精度な情報検索、Azure OpenAIによる自然な対話生成、Azure Bot Serviceを介した柔軟なインターフェース提供、そしてAzure Speech Servicesによる音声入出力機能の実装にありました。

さらに、Azure Cosmos DBを用いたセッション管理と会話履歴の保持により、パーソナライズされた対話体験を提供します。 このブログ記事では、開発環境、環境構築の具体的な流れ、そしてPhase 1からPhase 7までの各開発フェーズで直面した課題とその解決策、最終的なシステムの動作原理について解説します。

目次

開発環境

本音声RAGシステムの開発では、Microsoft Azureの多岐にわたるサービスとPythonエコシステムを組み合わせることで、堅牢かつスケーラブルなインフラを構築しました。 使用した主要なリソースは以下の通りです。

Azure AI Search

大量のドキュメントデータから関連性の高い情報を効率的に検索するために利用しました。特に、PDFやテキストデータのインデックス作成とベクトル検索を可能にし、RAGの精度向上に貢献しています。

Azure OpenAI Service

自然言語処理(NLP)の中核を担い、ユーザーの質問に対する理解と、適切な回答の生成を行いました。GPT-4oなどのモデルをデプロイし、会話の品質を最大化しています。

Azure Blob Storage

RAGに利用するPDFファイルや、音声入出力で生成されるWAVファイルなどのバイナリデータを効率的に格納するために使用しました。特に、大規模な音声データを直接APIで送受信する際の容量制限問題の解決に不可欠でした。

Azure Bot Service

ユーザーとの対話インターフェースとして機能し、Webチャットや将来的にはMicrosoft Teamsなどの様々なチャネルへの拡張性を考慮しました。Direct Line APIを介してクライアントアプリケーションとの連携を実現しています。

Azure Speech Services

音声入力(Speech to Text: STT)と音声出力(Text to Speech: TTS)の両方を提供します。これにより、ユーザーは声で質問し、AIも声で回答を返すというフルボイス対話が可能になりました。

Azure Cosmos DB

ユーザーごとの会話履歴やセッションデータを格納するためのNoSQLデータベースとして採用しました。これにより、パーソナライズされた対話や、複数回のやり取りを通じた文脈維持が可能になります。

Azure Functions

PDFデータからテキスト、画像、JSONを抽出するなどのデータ変換処理をサーバーレスで実行するために利用しました。これにより、必要な時に必要な処理を柔軟にスケールさせることができます。

Python

バックエンドロジック、AIモデルとの連携、データ処理スクリプトなど、システム全体の開発言語としてPythonを使用しました。

これらのサービスを組み合わせることで、高い機能性と安定性を備えた音声RAGシステムとなりました。

環境構築

開発した音声RAGシステムをAzure上で動作させるための環境構築とデプロイは、以下の流れで進めました。

1. 必要なリソースの作成と設定

Azure AI Search: RAGデータのインデックスを格納するために作成しました。

Azure OpenAI Service: LLM(例: GPT-4o)をデプロイし、APIキーとエンドポイントを設定しました。

Azure Blob Storage: RAGデータのオリジナルPDF、変換データ、音声ファイルなどを格納するコンテナを作成しました。

Azure Bot Service: ボットの登録とDirect Lineシークレットキーの取得を行いました。このキーはクライアントアプリケーションからのアクセス認証に不可欠です。

Azure Speech Services: Speech to Text (STT) と Text to Speech (TTS) のAPIキーとリージョンを設定しました。

Azure Cosmos DB: 会話履歴を格納するためのデータベースとコンテナを作成し、エンドポイントとキーを設定しました。

2. ローカル環境の準備

まず、Pythonのインストールと仮想環境のセットアップを行います。

requirements.txtに記載された必要なPythonパッケージをインストールします。

環境変数の設定: Azureサービスから取得した各種APIキー、エンドポイント、シークレットなどを.envファイルに記述し、コードから安全にアクセスできるようにしました。特に、DIRECT_LINE_SECRET、AZURE_SPEECH_API_KEY、AZURE_OPENAI_API_KEYなどの変数を設定しました。

3. Bot Framework Emulatorでのローカルテスト

開発初期段階では、Bot Framework Emulatorを使用してローカルでボットの動作検証を行いました。特に、MICROSOFT_APP_IDとMICROSOFT_APP_PASSWORDを.envファイルで空欄に設定することで、エミュレータとの連携がスムーズに行われました。これにより、Azureにデプロイする前に、基本的な会話フローやロジックのデバッグが可能でした。

4. Azure App Serviceへのデプロイ

Azure CLIを使用し、アプリケーションコードをAzure App Serviceにデプロイしました。

Azureにログインします。

az login

プロジェクトディレクトリに移動し、deploy.zipというデプロイパッケージを作成しました。

例:

zip -r deploy.zip . -x "venv/*" "*.pyc" "__pycache__/*" "*.sh"

デプロイには

az webapp deploy -g <リソースグループ名> -n <App Service名> --type zip --src-path deploy.zip

を実行しました。

デプロイ後、

curl -i https://<App Service URL>/health

でHTTP 200 OKが返ることを確認し、アプリケーションが正常に動作していることを検証しました。

デプロイプロセスでは、az webapp upコマンドの使用時にModuleNotFoundErrorが発生したり、デプロイされたファイルがwwwrootディレクトリに正しく配置されない問題に直面しました。これは、Pythonのランタイムバージョンが予期せず変更されたり、Oryxビルドエンジンがファイルを正しく展開しないことが原因でした。

解決策として、az webapp deployで明示的にZIPをデプロイし、必要に応じてKudu Bashを使って手動でファイルを解凍するなどの手順を踏みました。

これらの手順を通じて、音声RAGシステムがAzure上で安定して稼働する環境を構築しました。

開発プロセスと課題解決の流れ

本システムの開発は、フェーズ1からフェーズ7に分けて段階的に進められました。各フェーズでは、特定の機能の実装と検証、そして発生した課題の解決に焦点を当てました。

Phase 1: Azure AI Searchの構築とチャットプレイグラウンド連携

目的

RAGの基盤となる情報検索機能を構築し、Azure AI SearchとAzure OpenAI Serviceのチャットプレイグラウンドを連携させること。

実施内容

RAGで利用するPDFやテキストデータをAzure Blob Storageに格納しました。

格納されたデータに基づいてAzure AI Searchにインデックスを作成しました。

Azure OpenAI ServiceのチャットプレイグラウンドからAI Searchへの接続を確立し、ドキュメントの読み込みと参照が可能であることを確認しました。

このフェーズでは、主にテキストベースのRAGの基礎が確立されました。データインポートとインデックス作成のプロセスを確立し、チャットプレイグラウンドでの応答がRAGデータに基づいていることを確認しました。この段階では音声機能は含まれていませんでしたが、後のフェーズで音声I/Oを統合するための基盤となりました。

Phase 2: Azure Bot Serviceのロジック構築

目的

ユーザーとの対話インターフェースとなるAzure Bot Serviceを構築し、基本的なボットロジックを実装すること。さらに、Azure OpenAIとの連携を確立すること。

実施内容

Azure Bot Service上にEchoBotのテンプレートをデプロイし、基本的な動作を確認しました。

ボットがAzure OpenAIのLLMから応答を受け取り、ユーザーに返すロジックを実装し、Azure App Serviceへのデプロイを行いました。

課題と解決

Azure上の認証エラー: Bot Frameworkの認証設定が不十分で、Webチャット経由でボットにメッセージを送ると認証エラー(BotFrameworkの認証エラー)が発生しました。

BotFrameworkAdapterSettingsにchannel_auth_tenantパラメータを追加し、MICROSOFT_APP_TENANT_IDを設定することで、認証問題を解決しました。

Phase 3: データ変換スクリプトの作成

目的

RAGシステムに統合するためのPDFデータを、AI Searchで利用しやすい形式(テキスト、画像、JSON)に変換し、Azure Blob Storageに格納するスクリプトを開発すること。

実施内容

Azure Functionsを用いて、PDFファイルを処理するAzure Functions用スクリプトを開発しました。

このスクリプトは、Blob Storageの「input-data」コンテナに新しいPDFがアップロードされるとトリガーされ、Document Intelligence(旧: Form Recognizer)APIを利用してPDFの内容を構造化されたJSON、抽出されたテキスト(txt)、および各ページのスナップショット画像(PNG)に変換します。

変換されたデータは「converted-data」コンテナに格納され、AI Searchのインデックス作成の準備を整えました。

課題と解決

データ構造の最適化: AI Searchのインデックス作成に適したデータ形式を検討し、PDFのページごとに画像、テキスト、JSON形式で保存することで、検索時の柔軟性を高めました。

認証情報の管理: Azure Functions内でCognitive ServicesのAPIキーなどの機密情報を安全に管理するため、環境変数として設定しました。これにより、コード内でのハードコーディングを避け、セキュリティを確保しました。

Logic Appsとの連携: Azure Logic Appsを用いてBlob Storageへのファイル更新をトリガーとし、Azure Functionsを呼び出す自動化ワークフローを構築しました。これにより、RAGデータの取り込みプロセスが自動化されました。

Phase 4: Cosmos DBを活用したユーザーデータ・会話履歴の読み込み

目的

ユーザーごとの会話履歴を永続化し、セッション間で文脈を維持するためにAzure Cosmos DBをRAGシステムに統合すること。

実施内容

CosmosDBChatHistoryクラスを新しく作成し、ユーザーIDに基づいて会話履歴を読み込み、新しいターン(ユーザーの発言とAIの応答)を保存する機能を持たせました。

ボットのon_message_activityハンドラ内で、ユーザーからのメッセージが来るたびにCosmos DBから過去のメッセージを取得し、LLMへのプロンプトに含めるようにしました。

課題と解決

セッション更新のタイミング: Azure Bot Servicesに接続するUIの選定次第でセッション更新のタイミングを変更する必要がありました。

LLMに渡す会話履歴は、直近のメッセージ(最新30件)に限定し、コストと応答速度のバランスを取りました。

セッション管理の明確化: ユーザーIDをキーとして会話履歴を管理することで、異なるユーザー間での会話履歴が混在しないようにしました。Bot Services上のUIでのテスト時には最初のメッセージを表示するタイミングで過去の履歴をリセットするよう設定しました。

Phase 5: 全システム連携とプロンプト最適化

目的

これまでのフェーズで個別に構築してきたAzure AI Search、Azure OpenAI、Azure Bot Service、Azure Cosmos DBの各コンポーネントを統合し、テキスト入出力による完全なRAGシステムとして動作させること。

また、応答の質を高めるためのプロンプトエンジニアリングを行うこと。

実施内容

Azure OpenAIがAI Searchと連携し、取り込まれたドキュメントから情報を正確に取得できることをチャットプレイグラウンドで確認しました。

Cosmos DBとBot Service、App Service間の接続を確立し、ユーザーの会話履歴が正しく保存・ロードされることを検証しました。

ボットのsystem_promptを調整し、AIアシスタントの役割(例: 「家庭用洗濯機に関するAIアシスタント」)を明確にし、ユーザーにどのような情報を提供できるかをガイドするようにしました。

ユーザーに選択肢を提示し、より具体的な情報を引き出すための対話プロンプトを組み込みました。検索を絞り込むための質問をAIが行うようにしました。

課題と解決

会話履歴の文脈維持: Cosmos DBに保存された履歴が、新しい会話開始時に不要な過去の情報を参照してしまう問題がありました。

Cosmos DBからの履歴読み込みロジックを改善し、セッションIDや最新の会話のみを対象とするクエリを導入することで、文脈の適切な維持とリセットを可能にしました。また、Pythonスクリプトによるターミナル上での対話ではスクリプト実行時にセッション更新処理を行うよう変更しました。これにより、user_idに基づく履歴の分離と、新規セッションでの履歴クリアが期待通りに動作することを確認しました。

Phase 6: 音声入力(Speech to Text: STT)の実装

目的

ユーザーの音声入力をテキストに変換するSpeech to Text(STT)機能を実装し、ボットが音声でユーザーの意図を理解できるようにすること。

実施内容

Azure Speech ServicesのSTT APIを利用して、クライアントアプリケーションから録音された音声ファイルをテキストに変換するコードを作成しました。 ローカル環境でrecorded_to_text.pyスクリプトを開発し、音声ファイルが正確にテキストに変換されることを確認しました。

課題と解決

Direct Line APIのメッセージサイズ制限: 音声データを直接Direct Line API経由でボットに送信しようとした際に、「502 Server Error: Bad Gateway」ペイロードサイズに256KBという制限があるため、比較的サイズの大きな音声ファイルを送ると制限を超過してしまうことが原因でした。MessageSizeTooBigエラー (request content length exceeded limit of 262144 bytes) がログに記録されました。

この問題への根本的な解決策は、音声データを直接Direct Line API経由で送信しないという方針に転換することでした。音声ファイルはBlob Storageにアップロードし、そのURLをボットに渡すというアプローチが後のフェーズで検討されることになります。

エラーハンドリングの強化: 502エラーが発生した場合に、クライアント側でリトライを試みる、あるいはテキスト入力にフォールバックするなどの基本的なエラーハンドリングを導入しました。

Phase 7: 音声出力(Text to Speech: TTS)とBlob Storage連携

目的

ボットのテキスト応答を音声に変換するText to Speech(TTS)機能を実装し、生成された音声ファイルを効率的にユーザーに届けるためにAzure Blob Storageと連携し、Phase 6の音声入力も統合すること。

実施内容

Azure Speech ServicesのTTS APIを利用して、ボットのテキスト応答をWAV形式の音声データに変換する機能を実装しました。 Phase 6での課題(Direct Line APIのサイズ制限)Azure Blob Storageに保存するように設計しました。 Blob Storageに保存された音声ファイルへのShared Access Signature(SAS)付きのURLを生成し、このURLをボットのテキスト応答とともにクライアントに返却するようにしました。クライアントは、このURLを使用して音声ファイルをストリーミング再生します。

クライアントサイドのinteraction_client.pyを更新し、返却されたAudio URLをsd.play()で再生するロジックを組み込みました。

課題と解決

「MessageSizeTooBig」エラーの根本的解決: Phase 6で直面したDirect Line APIのペイロードサイズ制限の問題を、Blob StorageとSAS URLを活用することで解決しました。これにより、大きな音声ファイルでも効率的に配信できるようになりました。 クライアントでの音声再生エラー: クライアント側のPythonスクリプトで音声再生ライブラリ(sounddeviceなど)を使用する際に、segmentation faultなどのエラーが発生したり、ノイズが混入する問題が発生しました。

音声再生ライブラリの適切な使用法を確認し、特にnumpyで音声データを扱う際の型変換やバッファ処理を最適化しました。また、クライアント側でのエラーが発生しても全体処理が停止しないよう、try-exceptブロックで堅牢化しました。

ボットがAudio URLを取得するまでに最大5秒間の待機時間(タイムアウト)を設定することで、クライアント側での音声再生がよりスムーズに開始されるように調整しました。

ファイル名管理: Blob Storageに保存される音声ファイルの命名規則を、セッションやタイムスタンプに基づいて一意になるようにしました。

502 Server Error (Bad Gateway) および 504 (Bot timed out) の発生と対応: Direct Line APIを介してボットにメッセージを送信する際や、ボットからの音声応答(Audio URL)を取得する際に、502 Server Error: Bad Gateway や 504 (Bot timed out) といったHTTPエラーが見られました。

ボットへメッセージを送信した際にHTTP 502エラーが発生し、ボットからは”Failed to send activity: bot timed out”というメッセージとともにHTTP 504が返される事象が確認されています。また、音声ファイルをテキストに変換するスクリプト(recorded_to_text.py)を実行した際に、Direct Line APIへのリクエストで502 Server Errorが発生しました。

また、レスポンスの取得タイミング次第ではLLMからの応答が格納されたJSONではなく直前に自身が送ったテキストのJSONを取得しようとしてしまう場合がありました。

メッセージ送信時の502エラーに対しては、tryブロックでエラーを検知した場合にpassするよう実装しました。 ボットからの応答(audio_url)を取得する際にそれより前の自分のメッセージが取得されてしまう問題に対しては、Audio URLが取得できるまで最大5秒間リトライするよう設定することで、処理自体は問題なく動作するようになりました。

※エラーを見るとタイムアウトが発生している場合がありますが、確認した限り、Pythonのスクリプトから送信したメッセージに対してBotとApp Serviceとの間での送信処理でApp Serviceからの応答がタイムアウトしている形になります。送信時の処理なのでレスポンスを使用しないためここではpassしても問題なく動作しています。

これらのエラーの原因として、主にクライアントからボットサービスへのアクセスがApp Serviceのタイムアウト設定を超過したこと、またはボットからの応答取得のタイミングに問題がある可能性(特にデプロイがテスト実行中に発生した場合)が考えられます。

これらのフェーズを経て、ユーザーが音声で質問し、AIが音声で回答を返す、一連のフルボイスRAGシステムが完成しました。

最終的なスクリプトと動作例

以下に、今回使用した主なスクリプトと.envファイルの例を示します。

クライアント側

.env

AZURE_SPEECH_API_KEY=
AZURE_SPEECH_API_REGION=
DIRECT_LINE_SECRET=

interaction_client.py

import os
import base64
import uuid
import requests
from dotenv import load_dotenv
from time import sleep
import wave
import simpleaudio as sa
import io
import numpy as np
import sounddevice as sd
load_dotenv()

DL_SECRET = os.getenv("DIRECT_LINE_SECRET")
USER_ID = "user1"

###################################
## 入力形式の選択: "text" or "audio"
###################################
input_type = "audio"

# 1. Direct Line トークン取得
def get_directline_token(user_id=USER_ID):
    url = "https://directline.botframework.com/v3/directline/tokens/generate"
    headers = {
        "Authorization": f"Bearer {DL_SECRET}",
        "Content-Type": "application/json"
    }
    payload = {"User": {"Id": user_id}}
    res = requests.post(url, headers=headers, json=payload)
    res.raise_for_status()
    return res.json()["token"]

# 2. 会話作成
def create_conversation(dl_token):
    url = "https://directline.botframework.com/v3/directline/conversations"
    headers = {"Authorization": f"Bearer {dl_token}"}
    res = requests.post(url, headers=headers)
    res.raise_for_status()
    return res.json()["conversationId"]

# 3. Bot にメッセージ送信
def send_message(dl_token, conv_id, text, session_id, user_id=USER_ID):
    url = f"https://directline.botframework.com/v3/directline/conversations/{conv_id}/activities"
    headers = {
        "Authorization": f"Bearer {dl_token}",
        "Content-Type": "application/json"
    }
    payload = {
        "type": "message",
        "from": {"id": user_id},
        "text": text,
        "channelData": {"session_id": session_id}
        }
    res = requests.post(url, headers=headers, json=payload)
    print(f"res: {res}")
    res.raise_for_status()
    return res.json()["id"]

# 録音開始
def record_wav(filename="input.wav", sec=10, rate=16000):
    print("録音開始...")
    audio = sd.rec(int(sec * rate), samplerate=rate, channels=1, dtype="int16")
    sd.wait()
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 16bit
        wf.setframerate(rate)
        wf.writeframes(audio.tobytes())
    print(f"録音終了: {filename}")

def send_audio_message(dl_token, conv_id, audio_path, user_id=USER_ID):
    url = f"https://directline.botframework.com/v3/directline/conversations/{conv_id}/activities"
    headers = {
        "Authorization": f"Bearer {dl_token}",
        "Content-Type": "application/json"
    }
    # 音声ファイルをbase64で送信
    with open(audio_path, "rb") as f:
        audio_b64 = base64.b64encode(f.read()).decode("utf-8")
    payload = {
        "type": "message",
        "from": {"id": user_id},
        "attachments": [
            {
                "contentType": "audio/wav",
                "content": audio_b64,
                "name": os.path.basename(audio_path)
            }
        ]
    }
    res = requests.post(url, headers=headers, json=payload)
    res.raise_for_status()
    return res.json()["id"]

# 4. Bot の返信取得
def get_reply(dl_token, conv_id, watermark=None):
    url = f"https://directline.botframework.com/v3/directline/conversations/{conv_id}/activities"
    headers = {"Authorization": f"Bearer {dl_token}"}
    params = {"watermark": watermark} if watermark else {}
    res = requests.get(url, headers=headers, params=params)
    res.raise_for_status()
    data = res.json()
    activities = data["activities"]
    last_activity = activities[-1] if activities else None
    watermark = data.get("watermark")
    return last_activity, watermark

def save_audio_attachment(activity, filename="output.wav"):
    attachments = activity.get("attachments", [])
    for att in attachments:
        if att.get("contentType") == "audio/wav" and "content" in att:
            audio_base64 = att["content"]
            with open(filename, "wb") as f:
                f.write(base64.b64decode(audio_base64))
            print(f"Saved speech to {filename}")
            return True
    print("No audio attachment found!")
    return False

def play_audio_from_url(url):
    response = requests.get(url)
    response.raise_for_status()
    wav_bytes = io.BytesIO(response.content)
    with wave.open(wav_bytes, 'rb') as wf:
        rate = wf.getframerate()
        frames = wf.readframes(wf.getnframes())
        dtype = np.int16 if wf.getsampwidth()==2 else np.uint8
        audio = np.frombuffer(frames, dtype=dtype)
        sd.play(audio, samplerate=rate)
        sd.wait()

def main():
    print("Getting Direct Line token...")
    dl_token = get_directline_token()

    print("Creating conversation...")
    conv_id = create_conversation(dl_token)

    watermark = None
    session_id = str(uuid.uuid4())

    while True:
        print("1: botにメッセージを送信")
        print("2: 会話を終了")
        cmd = input("番号を入力してください: ").strip()

        if cmd == "1":
            if input_type == "text":
                user_text = input("送信テキストを入力してください: ").strip()
                if not user_text:
                    print("空メッセージは送信されません。")
                    continue

                try:
                    send_message(dl_token, conv_id, user_text, session_id)
                except Exception as e:
                    pass

            elif input_type == "audio":
                record_wav("input.wav")
                audio_path = "input.wav"
                send_audio_message(dl_token, conv_id, audio_path, session_id)

            MAX_RETRY = 5
            reply = None
            audio_url = None
            print(f"Bot応答待機中...")
            for i in range(MAX_RETRY):
                sleep(2)
                reply, watermark = get_reply(dl_token, conv_id, watermark)

                if reply:
                    channel_data = reply.get("channelData", {})
                    audio_url = channel_data.get("audio_url") if channel_data else None
                    if audio_url:
                        break

            if reply:
                print("Bot reply:", reply.get("text", ""))
                if audio_url:
                    play_audio_from_url(audio_url)
                    sleep(2)

            else:
                print("Botから返信がありません。")

        elif cmd == "2":
            print("会話を終了します")
            break

        else:
            print("正しい番号を入力してください")
            continue

if __name__ == "__main__":
    main()

Azure側

app.py

import os
from aiohttp import web
from botbuilder.core import BotFrameworkAdapter, BotFrameworkAdapterSettings, TurnContext, MemoryStorage, ConversationState
from botbuilder.schema import Activity
from bot import EchoBot
from dotenv import load_dotenv

load_dotenv()

APP_ID = os.getenv("MICROSOFT_APP_ID", "")
APP_PASSWORD = os.getenv("MICROSOFT_APP_PASSWORD", "")
TENANT_ID = os.getenv("MICROSOFT_APP_TENANT_ID", "")
adapter_settings = BotFrameworkAdapterSettings(APP_ID, APP_PASSWORD,channel_auth_tenant=TENANT_ID)
adapter = BotFrameworkAdapter(adapter_settings)

memory = MemoryStorage()
conversation_state = ConversationState(memory)
bot = EchoBot(conversation_state)

async def messages(req: web.Request) -> web.Response:
    body = await req.json()
    activity = Activity().deserialize(body)
    auth_header = req.headers.get("Authorization", "")
    await adapter.process_activity(activity, auth_header, bot.on_turn)
    return web.Response(status=200)

# ここから health 追加
async def health(req: web.Request) -> web.Response:
    return web.json_response({"status": "ok"})
# ここまで

app = web.Application()
app.router.add_post("/api/messages", messages)
app.router.add_get("/health", health)  # ← 追加

if __name__ == "__main__":
    port = int(os.environ.get("PORT", 8000))
    web.run_app(app, host="0.0.0.0", port=port)

bot.py

from botbuilder.core import ActivityHandler, TurnContext, ConversationState
from botbuilder.schema import Attachment, Activity, ActivityTypes
from connector.connect_openai import ask_openai
from connector.connect_cosmos_db import CosmosDBChatHistory
from connector.connect_blob_storage import ConnectBlobStorage
from connector.connect_speech_service import ConnectSpeechService
import uuid


from dotenv import load_dotenv
from datetime import datetime, timedelta

load_dotenv()
class EchoBot(ActivityHandler):
    def __init__(self, conversation_state: ConversationState):
        self.conversation_state = conversation_state
        self.session_id_property = self.conversation_state.create_property("session_id")

    async def on_message_activity(self, turn_context: TurnContext):

        activity = turn_context.activity

        session_id = None
        if hasattr(activity, 'channel_data') and isinstance(activity.channel_data, dict):
            session_id = activity.channel_data.get("session_id")

        if not session_id:
            session_id = str(uuid.uuid4())

        user_input = None
        speech_service = ConnectSpeechService()

        if activity.attachments and len(activity.attachments) > 0:
            for att in activity.attachments:
                if att.content_type == "audio/wav":
                    import base64
                    audio_bin = base64.b64decode(att.content)
                    user_input = speech_service.convert_audio_to_text(audio_bin)
                    print("STT(音声→テキスト)結果:", user_input)
                    break
        else:
            user_input = activity.text


        #user_input = turn_context.activity.text
        user_id = turn_context.activity.from_property.id
        username = turn_context.activity.from_property.name
        system_prompt = {"role": "system", "content": "あなたは○○についての質問に答えるAI アシスタントです。"
        }
        history_manager = CosmosDBChatHistory(user_id, session_id)
        previous_messages = history_manager.load_messages()
        messages = [system_prompt] + previous_messages
        print("テキスト起こし結果",user_input)

        messages.append({"role": "user", "content": user_input})
        response = ask_openai(messages)
        ai_reply = response.choices[0].message.content
        print("REPLY:",ai_reply )


        audio_bin=speech_service.convert_text_to_audio(ai_reply)

        blob=ConnectBlobStorage()
        sas_url=blob.save_audio_data(audio_bin)
        print("URL",sas_url)

        history_manager.add_turn(user_input, ai_reply, username)
        await self.conversation_state.save_changes(turn_context)

        reply_activity = Activity(
            type=ActivityTypes.message,
            text=ai_reply,
            channel_data={
                "audio_url": sas_url
            }
        )
        await turn_context.send_activity(reply_activity)

    async def on_members_added_activity(self, members_added, turn_context: TurnContext):
        session_id = str(uuid.uuid4())
        await self.session_id_property.set(turn_context, session_id)
        await self.conversation_state.save_changes(turn_context)

        for member in members_added:
            if member.id != turn_context.activity.recipient.id:
                await turn_context.send_activity("こんにちは!ご質問をどうぞ。")

動作例

python interaction_client.py
Getting Direct Line token...
Creating conversation...
1: botにメッセージを送信
2: 会話を終了
番号を入力してください: 1
録音開始...

Botからの返信...
Bot reply: (botからの応答が表示されます)

Audio URL: https://example.com/...

1: botにメッセージを送信
2: 会話を終了

今後の展望

今回のプロジェクトでは応答速度を重視していませんが、エラーハンドリングの強化や音声応答の取得処理の最適化、クライアント側の処理の改善によって処理速度の向上も期待できます。