ServerlessFrameworkを使ってChatGPT APIを使ったLineBotを作る

はじめに

こんにちは、エンジニアのクロ(@kro96_xr)です。

最近、5年くらい前にQiitaで書いた記事にちょこちょこいいねをいただくんですよね。

qiita.com

なぜ今更この記事を見てもらえているんだろうと考えていたところ、ChatGPT APIを用いたLineBotの作成に取り組んでいる人がいるのではないかと思いました。というわけで私も試してみることにしました。

二番煎じどころではありませんが、自身の体験を共有することで皆さんの参考になればと思います!

なお、今回検証したコードは、以下のリポジトリにまとめています。

github.com

※先にお詫びです
記事を書きながらも検証しているのですが、時間帯によってはタイムアウトでLambdaの処理が終了します。
ログを見ているとAPIレスポンスが時間内に返ってこないようでした。
Lambdaのタイムアウト値はAPI Gatewayの制約内で最大にしているので、安定運用のためには対応を考える必要がありそうです。
ブラウザ版でも重いことがあるため仕方ないのかもしれませんね…

実装結果

まず、この記事で実装したBotのデモになります。以下のスクリーンショットをご覧ください。

ChatGPTのように会話の流れを理解してくれていますね。

構成

当初は、過去に実装したLineBotと同様にHeroku上にFlaskで実装しようかと考えていました。
しかし、ChatGPTとは異なり、API単体では文脈を覚えることができないため、メッセージを保存しておくデータストアが必要でした。

そのため、今回は以下の構成を採用することにしました。
構成の検討にあたり、Classmethodさんの記事を参考にさせていただきました。いつもありがとうございます。

  • API Gateway
  • Lambda
  • DynamoDB

構成を図示すると、以下のようになります。

なお、Lambdaのランタイムは過去と同様Pythonでやっていきます。

実装

ここからは、実際の実装について見ていきます。ちなみに、この辺りもChatGPTと一緒に開発しています。

ディレクトリ構成

ディレクトリ構成は、以下のようになっています。

.
├── .env
├── handler.py
├── requirements.txt
└── serverless.yml

環境変数

まず環境変数にAPIキーなどの情報を設定します。

OPENAI_API_KEY=
LINE_CHANNEL_SECRET=
LINE_CHANNEL_ACCESS_TOKEN=
DYNAMODB_TABLE=

インフラ構築

次に、インフラ構築をServerless Frameworkを使用して行います。
serverless-python-requirements プラグインを使っているため、事前にインストールしてください。

sls plugin install -n serverless-python-requirements

serverless.ymlは以下のようになります。

service: openai-line-bot
frameworkVersion: '3'
useDotenv: true
# カスタム変数
custom:
  defaultStage: dev
  pythonRequirements:
    dockerizePip: non-linux
# プロバイダ
provider:
  name: aws
  runtime: python3.9
  timeout: 30
  stage: ${opt:stage, self:custom.defaultStage}
  region: ap-northeast-1
# Lambda用の権限
  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Query
        - dynamodb:Scan
        - dynamodb:GetItem
        - dynamodb:PutItem
        - dynamodb:UpdateItem
        - dynamodb:DeleteItem
      Resource: "arn:aws:dynamodb:${aws:region}:*:table/${env:DYNAMODB_TABLE}"
# Lambda
functions:
  webhook:
    handler: handler.webhook
    events:
      - http:
          path: webhook
          method: post
          cors: true
    environment:
      OPENAI_API_KEY: ${env:OPENAI_API_KEY}
      LINE_CHANNEL_SECRET: ${env:LINE_CHANNEL_SECRET}
      LINE_CHANNEL_ACCESS_TOKEN: ${env:LINE_CHANNEL_ACCESS_TOKEN}
      DYNAMODB_TABLE: ${env:DYNAMODB_TABLE}
# DynamoDB
resources:
  Resources:
    MessageHistoryTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${env:DYNAMODB_TABLE}
        AttributeDefinitions:
          - AttributeName: user_id
            AttributeType: S
          - AttributeName: timestamp
            AttributeType: N
        KeySchema:
          - AttributeName: user_id
            KeyType: HASH
          - AttributeName: timestamp
            KeyType: RANGE
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1


plugins:
  - serverless-python-requirements

インフラの中で試行錯誤したポイントはLambdaのタイムアウト値とDynamoDBの設計です。
タイムアウト値についてはデフォルトが6秒ですが、そのままでは結構な頻度でタイムアウトが発生したため30秒にしました。

また、DynamoDBについては履歴の管理を想定したテーブル設計が必要でした。
テスト用なので自分しか使わない想定ですが、複数ユーザーに対応できるように、user_id を追加しています。
そして、user_id をパーティションキー、timestamp をソートキーとして設定することで、「時系列でソートされたユーザー単位の履歴」を取得できるようにしています。

属性名 キー
user_id string パーティションキー
message string
timestamp number ソートキー

問題としては、完全に同じtimestampでデータを保存しようとするとキーが重複してエラーになることですが、現実的にはほぼ発生しないと判断し今回は無視しています。
DynamoDBの設計自体にあまり慣れていないため、より良い方法があればご教示いただけると嬉しいです。

handlerの実装

次に、handlerの実装について説明します。
APIを呼び出す部分は以下のライブラリを使用しています。
github.com

検証中に返信が返ってこないことがあったため、処理ごとにログを出力しています。

また、DynamoDBから取得する履歴は、とりあえず10件としています。
これは、履歴が多すぎると入力トークン数の上限に引っかかるという問題があるためです。
履歴を残さないと文脈を理解できない一方、多すぎると問題が発生するということで塩梅が難しいですね。

import json
import os
from shutil import ExecError
import time
import uuid
import openai
import boto3
from boto3.dynamodb.conditions import Key
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError, LineBotApiError
from linebot.models import MessageEvent, TextMessage, TextSendMessage

# 環境変数から必要な情報を取得
OPENAI_API_KEY = os.environ['OPENAI_API_KEY']
LINE_CHANNEL_SECRET = os.environ['LINE_CHANNEL_SECRET']
LINE_CHANNEL_ACCESS_TOKEN = os.environ['LINE_CHANNEL_ACCESS_TOKEN']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE']

openai.api_key = OPENAI_API_KEY
line_bot_api = LineBotApi(LINE_CHANNEL_ACCESS_TOKEN)
handler = WebhookHandler(LINE_CHANNEL_SECRET)
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE)

def webhook(event, context):
    print("Webhook called")
    signature = event["headers"].get("x-line-signature") or event["headers"].get("X-Line-Signature")
    body = event['body']

    try:
        handler.handle(body, signature)
    except InvalidSignatureError:
        return {
            'statusCode': 400,
            'body': json.dumps({'message': 'Invalid signature'})
        }

    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'OK'})
    }

@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
    send_timestamp = int(time.time() * 1000)
    print("Handling message")
    user_id = event.source.user_id
    user_message = event.message.text
    user_message_obj = {"role": "user", "content": user_message}
    print(f"User message: {user_message}")
    # 履歴の取得
    message_history = get_message_history(user_id)
    # 履歴の順序を変えて最新のメッセージを追加
    messages = [{"role": item["message"]["role"], "content": item["message"]["content"]} for item in reversed(message_history)]
    messages.append(user_message_obj)
    print(f"Message history: {messages}")
    # API呼び出し
    try:
        openai_response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
            request_timeout=20
        )
    except Exception as e:
        print(f"Error generating AI response: {e}")
        line_bot_api.reply_message(event.reply_token, TextSendMessage(text="エラーが発生しました。AIからの応答の生成に失敗しました。"))
        return
    # レスポンスの取得
    ai_message = openai_response.choices[0].message.content
    ai_message_obj = {"role": "assistant", "content": ai_message}
    receive_timestamp = int(time.time() * 1000)
    print(f"AI message: {ai_message}")
    # LINEへの返答
    try:
        line_bot_api.reply_message(event.reply_token, TextSendMessage(text=ai_message))
    except LineBotApiError as e:
        print(f"Error sending AI response: {e}")
        return
    # ユーザー発言の保存
    try:
        save_message_to_history(user_id, user_message_obj, send_timestamp)
    except Exception as e:
        print(f"Error saving user message: {e}")
        line_bot_api.reply_message(event.reply_token, TextSendMessage(text="エラーが発生しました。メッセージの保存に失敗しました。"))
        return
    # AI発言の保存
    try:
        save_message_to_history(user_id, ai_message_obj , receive_timestamp)
    except Exception as e:
        print(f"Error saving AI message: {e}")
        line_bot_api.reply_message(event.reply_token, TextSendMessage(text="エラーが発生しました。メッセージの保存に失敗しました。"))

def get_message_history(user_id, limit=10):
    print(f"Getting message history for user: {user_id}")
    response = table.query(
        KeyConditionExpression=Key('user_id').eq(user_id),
        Limit=limit,
        ScanIndexForward=False
    )
    return response['Items']

def save_message_to_history(user_id, message, timestamp):
    print(f"Saving message to history: {message}")
    table.put_item(
        Item={
            'user_id': user_id,
            'timestamp': timestamp,
            'message': message
        }
    )

requirements

そして、最後にrequirements.txtです。執筆時点での最新版になっています。

line-bot-sdk==2.4.2
boto3==1.26.96
openai==0.27.2

デプロイ

ここまでできたらServerlessFrameworkでデプロイします。

sls deploy --aws-profile {profile} --stage dev

作成に成功するとエンドポイントが表示されるので、Messaging APIのWebhook URLに設定します。
説明は過去の自分の記事や他の記事を確認してください。

これで構築と設定は完了です。

LINE Botを追加し、メッセージを送信するとデモのようにやりとりすることができます。

おわりに

以上、簡単にではありますがServerless Frameworkを使ってバックエンドを構築し、LineBotを作ってみました。
ひとまず動くところまでは確認できたのでよかったです。

ただ、はじめにでも書いたようにタイムアウトの問題や、履歴のリセット機能や人格の設定など改善したい点はまだまだありますね。
このような改善ポイントについては引き続き検証、対応していきたいと思います!

以上、拙い記事ですが何かの参考になれば幸いです。

参考にさせていただいた記事

dev.classmethod.jp

eng-blog.iij.ad.jp

chatgpt-lab.com

おわりに