Kinesis Data Firehose+S3を使ったログ基盤をTerraformで構築する

はじめに

こんにちは、エンジニアのクロ(@kro96_xr)です。

今回はサーバレスなログ基盤を構築、検証してみたため、その内容について書きたいと思います。

検証を実施した背景

弊社はSYNMNというアプリを提供していますが、将来的にアプリ内でのユーザー行動ログを収集して分析を行いたいという要望がありました。
イメージとしてはGoogle Analiticsを使ってWebサイトのユーザー動線や滞在時間を行うような感じでしょうか。

元々サーバログをCloudWatch Logsに流していたため、サーバ側で検知できるもの(API実行ログ、エラーログ等)については取得できています。
しかし、アプリ内の行動ログとなるとサーバを経由しない情報もあり、アプリから直接ログをPUTする方法を検討する必要がありました。
AWS SDKを使えばアプリから直接ログをPUTできるということで、Kinesis Firehose(以下Firehose)を検証することになりました。

インフラ構成と検証の流れ

検証した構成と流れは以下のようになります

  • 検証スクリプトからFirehoseにテスト用のログデータをPUT
  • FirehoseからLambda関数を呼び出してデータを変換する
    • 今回はログにタイムスタンプを追加します
  • 変換されたデータをS3に保存
  • (S3に保存したデータをAthenaで取得する)今回の記事では対象外とさせていただきます。

また、データ保存時は動的パーティショニングを使い、データのグループ化を行いたいと思います。

Terraformでの環境構築

それでは各リソースを作成するためのTerraformのコードを見ていきます。
以下の内容は社内検証用に書いたコードに手を加えたものになります。

S3

生データを保存するためのバケットを作成します。特に特殊なことはないかと思います。

# ログ用のバケット
resource "aws_s3_bucket" "main" {
  bucket = "analitics-logs"
}

# バージョニング設定
resource "aws_s3_bucket_versioning" "main" {
  bucket = aws_s3_bucket.main.id
  versioning_configuration {
    status = "Enabled"
  }
}

# サーバサイド暗号化設定
resource "aws_s3_bucket_server_side_encryption_configuration" "main" {
  bucket = aws_s3_bucket.main.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm = "AES256"
    }
  }
}

Lambda

データ変換をかけるためのLambda関数を作成します。

今回はPythonを使ってデータが送信された時間をログに付与する処理を書いています。
Firehoseのメッセージイベントの中にある"approximateArrivalTimestamp"という項目をログに追加しています。
Amazon Kinesis Data Firehose で AWS Lambda を使用する - AWS Lambda

import json
import base64
import datetime

def lambda_handler(event, context):

    results = []
    records = event["records"]
    for record in records:
        recordId = record["recordId"]
        data = record["data"]
        unixtime_micro = record["approximateArrivalTimestamp"]
        unixtime_milli = unixtime_micro//1000
        # Base64からデコード
        decoded_data = base64.b64decode(data).decode("utf-8")

        # JSONの処理
        payload = json.loads(decoded_data)        
        timestamp = datetime.datetime.fromtimestamp(unixtime_milli)
        payload['server_timestamp_utc'] = timestamp
        decoded_data = json.dumps(payload, default=str)
        
        # Base64に再エンコード
        data = base64.b64encode(decoded_data.encode())

        results.append({
            "result":"Ok",
            "recordId":recordId,
            "data":data
        })
        
    return {
        "records":results
    }

Terraformのコードは以下になります。

# ファイルのzip化をplan時に行う
data "archive_file" "add_timestamp_zip" {
  type        = "zip"
  source_dir  = "Zip化するソースディレクトリ"
  output_path = "Zipファイルの出力先"
}

# Function
resource "aws_lambda_function" "add_timestamp" {
  function_name = "firehose_add_timestamp"

  handler                        = "function.lambda_handler" #ファイル名.関数名
  filename                       = "${data.archive_file.add_timestamp_zip.output_path}"
  runtime                        = "python3.9"
  role                              = "${aws_iam_role.lambda_iam_role.arn}"
  source_code_hash        = "${data.archive_file.add_timestamp_zip.output_base64sha256}"
  timeout                        = 60 # 推奨値が60秒以上、サポートは5分未満
}

# IAMロール
resource "aws_iam_role" "lambda_iam_role" {
  name = "lambda_iam_role"

  assume_role_policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow",
      "Sid": ""
    }
  ]
}
POLICY
}

# Policy
resource "aws_iam_role_policy" "lambda_access_policy" {
  name   = "lambda_access_policy"
  role   = "${aws_iam_role.lambda_iam_role.id}"
  policy = <<POLICY
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:CreateLogGroup",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}
POLICY
}

Firehose

続いてFirehose関連のリソースを作成します。
コンソールからFirehoseを作成する場合はIAMロールとポリシーの自動作成ができますが、Terraformから作成する場合は手動作成が必要です。
ポリシーの内容は使用する機能によって異なりますが、今回の構成では以下のようになります。変数は適宜設定してください。

また、ダイナミックパーティショニングの設定は、Athenaでスキャンするときの条件に影響するので分析の粒度に応じて設定した方が良さそうです。S3内をフルスキャンする羽目になります。

# ポリシーの作成とアタッチ
resource "aws_iam_policy" "main" {
  name   = "KinesisFirehoseServiceRole"
  policy = <<-EOT
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:AbortMultipartUpload",
            "s3:GetBucketLocation",
            "s3:GetObject",
            "s3:ListBucket",
            "s3:ListBucketMultipartUploads",
            "s3:PutObject"
          ],
          "Resource": [
            "${var.s3_bucket_arn}",
            "${var.s3_bucket_arn}/*"
          ]
        },
        {
          "Effect": "Allow",
          "Action": [
            "kinesis:DescribeStream",
            "kinesis:GetShardIterator",
            "kinesis:GetRecords",
            "kinesis:ListShards"
          ],
          "Resource": "arn:aws:kinesis:ap-northeast-1:${var.aws_account}:stream/${var.stream_name}"
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": "arn:aws:lambda:ap-northeast-1:${var.aws_account}:function:${var.lambda_function_name}:$LATEST"
        },
        {
          "Effect": "Allow",
          "Action": [
            "logs:PutLogEvents"
          ],
          "Resource": [
            "arn:aws:logs:ap-northeast-1:${var.aws_account}:log-group:/aws/kinesisfirehose/${var.stream_name}:log-stream:*"
          ]
        }
      ]
    }
  EOT
}

# ロール作成
resource "aws_iam_role" "main" {
  name = "FirehosePutLogs"
   managed_policy_arns = [
    aws_iam_policy.main.arn
  ]
 
  assume_role_policy = <<EOF
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Action": "sts:AssumeRole",
          "Principal": {
            "Service": "firehose.amazonaws.com"
          },
          "Effect": "Allow",
          "Sid": ""
        }
      ]
    }
  EOF
}

# Firehoseの作成
resource "aws_kinesis_firehose_delivery_stream" "main" {
  name        = var.stream_name
  destination = "extended_s3"
 
  extended_s3_configuration {
    role_arn            = aws_iam_role.main.arn
    bucket_arn          = var.s3_bucket_arn

    buffer_size = 64 # Dynamic Partitioningの場合は64MB以上
    buffer_interval = 60 # デフォルトは300

    # Dynamic Partitioningを有効化
    dynamic_partitioning_configuration {
      enabled = "true"
    }

    # カテゴリ/時間ごとにグルーピングする
    prefix              = "logs/category=!{partitionKeyFromQuery:category}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
    error_output_prefix = "errors/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}/"

    processing_configuration {
      enabled = "true"

      # Multi-record deaggregation processor example
      processors {
        type = "RecordDeAggregation"
        parameters {
          parameter_name  = "SubRecordType"
          parameter_value = "JSON"
        }
      }
      # New line delimiter processor example
      processors {
        type = "AppendDelimiterToRecord"
      }
      # JQ processor example
      processors {
        type = "MetadataExtraction"
        parameters {
          parameter_name  = "MetadataExtractionQuery"
          parameter_value = "{category:.category}"
        }
        parameters {
          parameter_name  = "JsonParsingEngine"
          parameter_value = "JQ-1.6"
        }
      }
      # Lambda
      processors {
        type = "Lambda"

        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = "${var.lambda_function_arn}:$LATEST"
        }
      }
    }
  }
}

以上でログのPUT先としてのFirehoseと、データ変換処理用のLambda、そしてログ保存用のS3の構築ができました。

テストデータをPUTする

それでは構築した環境に実際にデータをPUTしていきます。

検証用に雑に書いたコードを修正して掲載しています。色々許してください。

  • 環境
    • Python 3.10.0
    • Boto3 1.26.74
    • python-dotenv

python-dotenvでアクセスキーやシークレットキーを設定しておいてください。

import os
from boto3.session import Session
import json
from dotenv import load_dotenv
import random, string

class Logger:
    file = None
    def __init__(self, file_name):
        self.f = open(file_name, 'w')
    def __del__(self):
        self.f.close()
    def write(self, log):
        self.f.write(log.decode())

class AWSLogger:
    client = None
    stream_name = None
    def __init__(self):
        # AWS SDK情報の設定
        session = Session(
        aws_access_key_id=os.environ['ACCESS_KEY'],
        aws_secret_access_key=os.environ['SECRET_ACCESS_KEY'],
        region_name='ap-northeast-1')

        self.client = session.client('firehose')
        self.stream_name = os.environ['STREAM_NAME']
    def write(self, log):
        response = self.client.put_record(
        DeliveryStreamName = self.stream_name,
        Record={'Data': log})
        print(response)

def randomstring(n):
   return ''.join(random.choices(string.ascii_letters + string.digits, k=n))

def encode_to_json(data) -> str:
  json_str = json.dumps(data, default=str)
  return json_str

# ここから処理開始
# 環境変数の読込
load_dotenv()

# ロガー
# logger = Logger('random_log.csv')
logger = AWSLogger()

# ログを繰り返しPUTする
for i in range(50):
    log = {"id": i, "category": "test_blog","message": randomstring(32)}
    logger.write((encode_to_json(log) + "\n").encode())

上記を実行するとダイナミックパーティショニングによりS3のファイルが作成され

ログファイルが出力されます。 以下一部抜粋です。Lambdaの処理によりタイムスタンプが付与できていることが確認できます。(処理が早すぎて同時刻ですが)

{"id": 0, "category": "test_blog", "message": "gYS4lmMDij0OuO9S8gAcVCtwe3g85YEn", "server_timestamp_utc": "2023-02-24 03:25:57"}
{"id": 1, "category": "test_blog", "message": "VKqyuY28vG5VujDG9xRfNBSGI7p76cyo", "server_timestamp_utc": "2023-02-24 03:25:57"}
{"id": 2, "category": "test_blog", "message": "hWYxVdGUelprtcZLYcE8Ych85XC9ubvA", "server_timestamp_utc": "2023-02-24 03:25:57"}
{"id": 3, "category": "test_blog", "message": "kuwI9MtC62zFoj5fyldz92rMfT9M0VlH", "server_timestamp_utc": "2023-02-24 03:25:57"}
{"id": 4, "category": "test_blog", "message": "dNiVp1Ig2ay91E71VlSQkUceBlfqKxP9", "server_timestamp_utc": "2023-02-24 03:25:57"}
{"id": 5, "category": "test_blog", "message": "EMQZY0BaM5qwihLdc4qzJ3kaI7CsGaLj", "server_timestamp_utc": "2023-02-24 03:25:57"}
{"id": 6, "category": "test_blog", "message": "GzrIBKRXWrp6Gpau3PtscUXwukz1TBSQ", "server_timestamp_utc": "2023-02-24 03:25:57"}
{"id": 7, "category": "test_blog", "message": "HULj9qpgKPjxZ4nxOCD1KOnYORJ77WCU", "server_timestamp_utc": "2023-02-24 03:25:57"}
{"id": 8, "category": "test_blog", "message": "BgCwK4V0YUb772Eijx8ifln8uWo3hjdB", "server_timestamp_utc": "2023-02-24 03:25:57"}
{"id": 9, "category": "test_blog", "message": "tf79TKZJoYjKz8S9dpxxsfTWhevX6jMQ", "server_timestamp_utc": "2023-02-24 03:25:57"}
{"id": 10, "category": "test_blog", "message": "gfiuuWZYj3ZArADrNqqWz6yS2gL66twu", "server_timestamp_utc": "2023-02-24 03:25:57"}

おわりに

以上、比較的簡単にログを収集かつグルーピングまで行うことが出来ました。
このあとは、Athenaを使って必要なデータのみ取得し、BIツールに突っ込むような動きを想定しています。

しかし、実際にログを収集するにはプライバシーポリシーの訂正や同意の管理、GDPR対応の検討などシステム外でも色々やることがありますし、有用な分析のためにはログの内容の設計も必要になってきます。
また、アプリ側でのログ送信についてもバッチでまとめて送ったり、エラーハンドリングなど色々考慮することが多そうです。
まだまだやることはありそうですが引き続き色々検証していきます。