はじめに
こんにちは、エンジニアのクロ(@kro96_xr)です。
最近、5年くらい前にQiitaで書いた記事にちょこちょこいいねをいただくんですよね。
なぜ今更この記事を見てもらえているんだろうと考えていたところ、ChatGPT APIを用いたLineBotの作成に取り組んでいる人がいるのではないかと思いました。というわけで私も試してみることにしました。
二番煎じどころではありませんが、自身の体験を共有することで皆さんの参考になればと思います!
なお、今回検証したコードは、以下のリポジトリにまとめています。
※先にお詫びです
記事を書きながらも検証しているのですが、時間帯によってはタイムアウトでLambdaの処理が終了します。
ログを見ているとAPIレスポンスが時間内に返ってこないようでした。
Lambdaのタイムアウト値はAPI Gatewayの制約内で最大にしているので、安定運用のためには対応を考える必要がありそうです。
ブラウザ版でも重いことがあるため仕方ないのかもしれませんね…
実装結果
まず、この記事で実装したBotのデモになります。以下のスクリーンショットをご覧ください。
ChatGPTのように会話の流れを理解してくれていますね。
構成
当初は、過去に実装したLineBotと同様にHeroku上にFlaskで実装しようかと考えていました。
しかし、ChatGPTとは異なり、API単体では文脈を覚えることができないため、メッセージを保存しておくデータストアが必要でした。
そのため、今回は以下の構成を採用することにしました。
構成の検討にあたり、Classmethodさんの記事を参考にさせていただきました。いつもありがとうございます。
- API Gateway
- Lambda
- DynamoDB
構成を図示すると、以下のようになります。
なお、Lambdaのランタイムは過去と同様Pythonでやっていきます。
実装
ここからは、実際の実装について見ていきます。ちなみに、この辺りもChatGPTと一緒に開発しています。
ディレクトリ構成
ディレクトリ構成は、以下のようになっています。
. ├── .env ├── handler.py ├── requirements.txt └── serverless.yml
環境変数
まず環境変数にAPIキーなどの情報を設定します。
OPENAI_API_KEY= LINE_CHANNEL_SECRET= LINE_CHANNEL_ACCESS_TOKEN= DYNAMODB_TABLE=
インフラ構築
次に、インフラ構築をServerless Frameworkを使用して行います。
serverless-python-requirements プラグインを使っているため、事前にインストールしてください。
sls plugin install -n serverless-python-requirements
serverless.ymlは以下のようになります。
service: openai-line-bot frameworkVersion: '3' useDotenv: true # カスタム変数 custom: defaultStage: dev pythonRequirements: dockerizePip: non-linux # プロバイダ provider: name: aws runtime: python3.9 timeout: 30 stage: ${opt:stage, self:custom.defaultStage} region: ap-northeast-1 # Lambda用の権限 iamRoleStatements: - Effect: Allow Action: - dynamodb:Query - dynamodb:Scan - dynamodb:GetItem - dynamodb:PutItem - dynamodb:UpdateItem - dynamodb:DeleteItem Resource: "arn:aws:dynamodb:${aws:region}:*:table/${env:DYNAMODB_TABLE}" # Lambda functions: webhook: handler: handler.webhook events: - http: path: webhook method: post cors: true environment: OPENAI_API_KEY: ${env:OPENAI_API_KEY} LINE_CHANNEL_SECRET: ${env:LINE_CHANNEL_SECRET} LINE_CHANNEL_ACCESS_TOKEN: ${env:LINE_CHANNEL_ACCESS_TOKEN} DYNAMODB_TABLE: ${env:DYNAMODB_TABLE} # DynamoDB resources: Resources: MessageHistoryTable: Type: AWS::DynamoDB::Table Properties: TableName: ${env:DYNAMODB_TABLE} AttributeDefinitions: - AttributeName: user_id AttributeType: S - AttributeName: timestamp AttributeType: N KeySchema: - AttributeName: user_id KeyType: HASH - AttributeName: timestamp KeyType: RANGE ProvisionedThroughput: ReadCapacityUnits: 1 WriteCapacityUnits: 1 plugins: - serverless-python-requirements
インフラの中で試行錯誤したポイントはLambdaのタイムアウト値とDynamoDBの設計です。
タイムアウト値についてはデフォルトが6秒ですが、そのままでは結構な頻度でタイムアウトが発生したため30秒にしました。
また、DynamoDBについては履歴の管理を想定したテーブル設計が必要でした。
テスト用なので自分しか使わない想定ですが、複数ユーザーに対応できるように、user_id を追加しています。
そして、user_id をパーティションキー、timestamp をソートキーとして設定することで、「時系列でソートされたユーザー単位の履歴」を取得できるようにしています。
属性名 | 型 | キー |
---|---|---|
user_id | string | パーティションキー |
message | string | |
timestamp | number | ソートキー |
問題としては、完全に同じtimestampでデータを保存しようとするとキーが重複してエラーになることですが、現実的にはほぼ発生しないと判断し今回は無視しています。
DynamoDBの設計自体にあまり慣れていないため、より良い方法があればご教示いただけると嬉しいです。
handlerの実装
次に、handlerの実装について説明します。
APIを呼び出す部分は以下のライブラリを使用しています。
github.com
検証中に返信が返ってこないことがあったため、処理ごとにログを出力しています。
また、DynamoDBから取得する履歴は、とりあえず10件としています。
これは、履歴が多すぎると入力トークン数の上限に引っかかるという問題があるためです。
履歴を残さないと文脈を理解できない一方、多すぎると問題が発生するということで塩梅が難しいですね。
import json import os from shutil import ExecError import time import uuid import openai import boto3 from boto3.dynamodb.conditions import Key from linebot import LineBotApi, WebhookHandler from linebot.exceptions import InvalidSignatureError, LineBotApiError from linebot.models import MessageEvent, TextMessage, TextSendMessage # 環境変数から必要な情報を取得 OPENAI_API_KEY = os.environ['OPENAI_API_KEY'] LINE_CHANNEL_SECRET = os.environ['LINE_CHANNEL_SECRET'] LINE_CHANNEL_ACCESS_TOKEN = os.environ['LINE_CHANNEL_ACCESS_TOKEN'] DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE'] openai.api_key = OPENAI_API_KEY line_bot_api = LineBotApi(LINE_CHANNEL_ACCESS_TOKEN) handler = WebhookHandler(LINE_CHANNEL_SECRET) dynamodb = boto3.resource('dynamodb') table = dynamodb.Table(DYNAMODB_TABLE) def webhook(event, context): print("Webhook called") signature = event["headers"].get("x-line-signature") or event["headers"].get("X-Line-Signature") body = event['body'] try: handler.handle(body, signature) except InvalidSignatureError: return { 'statusCode': 400, 'body': json.dumps({'message': 'Invalid signature'}) } return { 'statusCode': 200, 'body': json.dumps({'message': 'OK'}) } @handler.add(MessageEvent, message=TextMessage) def handle_message(event): send_timestamp = int(time.time() * 1000) print("Handling message") user_id = event.source.user_id user_message = event.message.text user_message_obj = {"role": "user", "content": user_message} print(f"User message: {user_message}") # 履歴の取得 message_history = get_message_history(user_id) # 履歴の順序を変えて最新のメッセージを追加 messages = [{"role": item["message"]["role"], "content": item["message"]["content"]} for item in reversed(message_history)] messages.append(user_message_obj) print(f"Message history: {messages}") # API呼び出し try: openai_response = openai.ChatCompletion.create( model="gpt-3.5-turbo", messages=messages, request_timeout=20 ) except Exception as e: print(f"Error generating AI response: {e}") line_bot_api.reply_message(event.reply_token, TextSendMessage(text="エラーが発生しました。AIからの応答の生成に失敗しました。")) return # レスポンスの取得 ai_message = openai_response.choices[0].message.content ai_message_obj = {"role": "assistant", "content": ai_message} receive_timestamp = int(time.time() * 1000) print(f"AI message: {ai_message}") # LINEへの返答 try: line_bot_api.reply_message(event.reply_token, TextSendMessage(text=ai_message)) except LineBotApiError as e: print(f"Error sending AI response: {e}") return # ユーザー発言の保存 try: save_message_to_history(user_id, user_message_obj, send_timestamp) except Exception as e: print(f"Error saving user message: {e}") line_bot_api.reply_message(event.reply_token, TextSendMessage(text="エラーが発生しました。メッセージの保存に失敗しました。")) return # AI発言の保存 try: save_message_to_history(user_id, ai_message_obj , receive_timestamp) except Exception as e: print(f"Error saving AI message: {e}") line_bot_api.reply_message(event.reply_token, TextSendMessage(text="エラーが発生しました。メッセージの保存に失敗しました。")) def get_message_history(user_id, limit=10): print(f"Getting message history for user: {user_id}") response = table.query( KeyConditionExpression=Key('user_id').eq(user_id), Limit=limit, ScanIndexForward=False ) return response['Items'] def save_message_to_history(user_id, message, timestamp): print(f"Saving message to history: {message}") table.put_item( Item={ 'user_id': user_id, 'timestamp': timestamp, 'message': message } )
requirements
そして、最後にrequirements.txtです。執筆時点での最新版になっています。
line-bot-sdk==2.4.2 boto3==1.26.96 openai==0.27.2
デプロイ
ここまでできたらServerlessFrameworkでデプロイします。
sls deploy --aws-profile {profile} --stage dev
作成に成功するとエンドポイントが表示されるので、Messaging APIのWebhook URLに設定します。
説明は過去の自分の記事や他の記事を確認してください。
これで構築と設定は完了です。
LINE Botを追加し、メッセージを送信するとデモのようにやりとりすることができます。
おわりに
以上、簡単にではありますがServerless Frameworkを使ってバックエンドを構築し、LineBotを作ってみました。
ひとまず動くところまでは確認できたのでよかったです。
ただ、はじめにでも書いたようにタイムアウトの問題や、履歴のリセット機能や人格の設定など改善したい点はまだまだありますね。
このような改善ポイントについては引き続き検証、対応していきたいと思います!
以上、拙い記事ですが何かの参考になれば幸いです。