デバイス別アクセスログの集計処理をLambdaに置き換えました

  • URLをコピーしました!

こんにちは、PR TIMESでバックエンドエンジニアをしている宮崎(@sucalul)です。

今回はPR TIMESのデバイス別アクセスログの集計処理をLambdaに置き換えた取り組みを紹介します。

目次

はじめに

PR TIMESではアクセスログを複数の機能で利用していますが、

  • アクセスログの量が多く処理が重い
  • bot判定が処理によってバラバラ

などの問題がありました。

これらを解決するための第一弾としてデバイス別アクセスログの改善を行いました。

デバイス別アクセスログ集計の機能は元々、td-agentのプラグイン(Ruby実装)で処理してRedisに書き込んでいましたが、CDNをCloudFrontからFastlyに移行したことに合わせてLambdaに置き換えました。

あわせて読みたい
PR TIMESのCDNをCloudFrontからFastlyに移行しました こんにちは、インフラチームテックリードの櫻井です。 今回はプレスリリース配信サービスの prtimes.jp で使用しているCDNをCloudFrontからFastlyに移行したことについ...

方針

FastlyとS3とLambdaとDynamoDBを活用します。流れは以下のようになります。

  1. FastlyでアクセスログをS3にPUTする
  2. S3へのアップロードをトリガーにLambdaを呼び出す
  3. Lambdaではbot判定を行いつつ、集計処理を行い、集計結果をDynamoDBに書き込む

構成をまとめると以下のようになります。

また、デバイス別アクセスはPCとスマートフォンなどのアクセス比率が分かればいいので、要件的に多重実行や実行できなかった時のことを考えなくてもよいと判断しました。

FastlyではS3 へのログストリーミングには multipart uploadを使用しており、こちらで指定したPeriod の周期でファイルをクローズするような動作になっているようです。(2023年5月現在)

そのため、Lambdaで設定するトリガーはs3:ObjectCreated:CompleteMultipartUploadにする必要がありました。

参考

Fastly は Amazon S3 にログを継続的にストリームするものの、Amazon S3 のウェブサイトと API では、アップロードが完了するまでファイルにアクセスすることはできません。

https://docs.fastly.com/ja/guides/log-streaming-amazon-s3 より引用

Fastlyの設定

主に以下のことを行いました。

  1. それぞれのログをjson形式にする
  2. 一定時間経過したらgzip形式でS3にアップロードする

詳細はこちらの「アクセスログ出力設定」をご参照ください。

あわせて読みたい
PR TIMESのCDNをCloudFrontからFastlyに移行しました こんにちは、インフラチームテックリードの櫻井です。 今回はプレスリリース配信サービスの prtimes.jp で使用しているCDNをCloudFrontからFastlyに移行したことについ...

DynamoDBの項目

DynamoDBの項目と入る値の例は以下の通りです。

iddatecompanyIdreleaseIdpcsptb
112:22023-02-21112220105

同じ企業ID、同じリリースID、同じ日付へのアクセスがあれば該当デバイスのカウントを加算し、なければ新しいレコードを書き込む仕組みになっています。

Lambdaの実装

Lambdaでは、以下を行います。

  1. S3からアクセスログファイルを取得する
  2. それぞれのログを集計する
  3. DynamoDBに書き込む

S3からアクセスログファイルを取得する

例 Amazon S3 の通知イベントを見ると、Recordsの配列になっていて、ファイルが複数渡されるように見えますが、イベントはファイル毎にトリガーされるため、for文でループはせず、event.Recordsの0番目を取得する実装にしています。

また、gzip.NewReader を使ってgzipファイルを展開しています。

func lambdaHandler(event events.S3Event) error {
	region := os.Getenv("S3_REGION")
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String(region)},
	)
	if err != nil {
		log.Println(err)
		return err
	}
	client := s3.New(sess)

	config, err := aggregate.NewConfig()
	if err != nil {
		log.Println(err)
		return err
	}

	var accessLog aggregate.AccessLog
	outputPlugin := aggregate.NewOutputPlugin(config)

	// オブジェクト取得
	// Lambdaはファイル毎にイベントがtriggerされるため、forでループはせず、event.Recordsの0番目を取得する
	record := event.Records[0]
	decodedKey, err := url.QueryUnescape(record.S3.Object.Key)
	if err != nil {
		log.Println("Failed to decodeKey(). Key: ", record.S3.Object.Key, ". err: ", err)
		return err
	}
	obj, err := client.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(record.S3.Bucket.Name),
		Key:    aws.String(decodedKey),
	})
	if err != nil {
		log.Println("Failed to client.GetObject(). BucketName: ", record.S3.Bucket.Name, ". Key: ", record.S3.Object.Key, ". err: ", err)
		return err
	}

	rc := obj.Body
	defer rc.Close()

	gz, err := gzip.NewReader(rc)
	if err != nil {
		log.Println("Failed to gzip.NewReader(). BucketName: ", record.S3.Bucket.Name, ". Key: ", record.S3.Object.Key, ". err: ", err)
		return err
	}
	defer gz.Close()
}

それぞれのログを集計する

ここではbufio を使い一行ずつ取得します。AddEvent関数で集計処理を行います。

// defer gz.Close()の続き
scanner := bufio.NewScanner(gz)
// ファイルの中身を一行ずつ取得し集計する
for scanner.Scan() {
    data := scanner.Text()
	  err = json.Unmarshal([]byte(data), &accessLog)
if err != nil {
		log.Println("Failed to json.Unmarshal(). data: ", data, ". err: ", err)
		return err
	}
	err = outputPlugin.AddEvent(&accessLog)
	if err != nil {
		// エラーになったわけではなく、単にそのレコードを無視しただけなのでログをはくだけで、returnしない
		log.Println(err)
	}
}

AddEventではNewEgressEvent関数で集計処理を行い、その結果をappendします。

type OutputPlugin struct {
	accessCountRepo *AccessCountRepo
	egressEvents    []*EgressEvent
}

func (p *OutputPlugin) AddEvent(accessLog *AccessLog) error {
	egressEvent, err := NewEgressEvent(accessLog)
	if err != nil {
		// ignore: malformed ingress record
		return err
	}
	p.egressEvents = append(p.egressEvents, egressEvent)
	return nil
}

NewEgressEvent関数内で行う集計処理の一例を紹介します。useragentを使いデバイス、botの判定を行います。

func analyzeAgent(agent string) DeviceType {
	ua := useragent.Parse(agent)
	if ua.Bot {
		return Undefined
	}
	if ua.Mobile {
		return Smartphone
	}
	if ua.Tablet {
		return Tablet
	}
	return PC
}

DynamoDBに書き込む

ファイルの行数分の集計が終われば、DynamoDBに書き込みを行います。

context.WithTimeoutでtimeoutを設定し、Lambdaのtimeoutはこれより長くしています。

// 上のmain.goの続き
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, Timeout)
defer cancel()
err = outputPlugin.Flush(ctx)
if err != nil {
	log.Println(err)
	return err
}

return nil

Flushでは、集計したEgressEventから同じページへのアクセスはグループ化し、dynamo を使い書き込み処理を行いました。

デプロイ

Lambdaが完成したので、デプロイを行います。PR TIMESでは普段からLambdaのデプロイには、lambrollを使っています。Lambdaの設定が書かれたjsonファイルを用意し、makeコマンドを実行することでデプロイできるので非常に便利です。makeコマンドを用意しておく理由は、Makefileで依存関係を明記できるので、Lambdaにデプロイする前に go build でバイナリを生成できます。また、json側で設定する値を環境変数にしておいて、makeコマンドで適切な環境変数を指定したいためです。これによって、ステージング、本番用にmakeコマンドを用意して使い分けることができます。

例えば、以下のようにjsonファイルとMakefileを用意し、make deploy_stg_device_access_log を実行するだけでデプロイが完了します。

{
  "Architectures": [
    "x86_64"
  ],
  "Description": "{{ must_env `DESCRIPTON` }}",
  "Environment": {
    "Variables": {
      "DYNAMODB_REGION": "{{ must_env `DYNAMODB_REGION` }}",
      "DYNAMODB_TABLE_NAME": "{{ must_env `DYNAMODB_TABLE_NAME`}}",
      "S3_REGION": "{{ must_env `S3_REGION` }}"
    }
  },
  "EphemeralStorage": {
    "Size": 512
  },
  "FunctionName": "{{ must_env `FUNCTION_NAME` }}",
  "Handler": "{{ must_env `HANDLER_NAME` }}",
  "MemorySize": 2048,
  "Role": "{{ must_env `FUNCTION_ROLE` }}",
  "Runtime": "go1.x",
  "SnapStart": {
    "ApplyOn": "None"
  },
  "Tags": {
    "Project": "{{ must_env `FUNCTION_NAME` }}"
  },
  "Timeout": 900,
  "TracingConfig": {
    "Mode": "PassThrough"
  }
}
bin/device_access_log: cmd/device_access_log/main.go 
	go build -o bin/device_access_log cmd/device_access_log/main.go

# ステージング環境
.PHONY: deploy_stg_device_access_log
deploy_stg_device_access_log: bin/device_access_log
	DESCRIPTON='stg-description' \
	FUNCTION_NAME=stg-device-access-log \
	HANDLER_NAME='bin/device_access_log' \
	DYNAMODB_REGION='stg-dynamodb-region' \
	DYNAMODB_TABLE_NAME='stg-dynamodb-table-name' \
	S3_REGION='stg-s3-region' \
	FUNCTION_ROLE='stg-function-roll' \
	lambroll deploy --function="function.json"

lambrollはトリガーの設定は行わないので、コンソールからトリガーを設定しデプロイ完了になります。

最後に

今回はデバイス別アクセスログのみの変更でしたが、これをベースに他のアクセスログの集計処理も改善していきます。

会社の雰囲気や技術スタックにご興味がある方は、ぜひ一度カジュアル面談に応募してくださると幸いです。

株式会社PR TIMES
カジュアル面談申し込みフォーム - 株式会社PR TIMES 株式会社PR TIMESでは現在00-0. カジュアル面談(全職種OK)を募集しています。
  • URLをコピーしました!

この記事を書いた人

2022新卒で PR TIMES に入社し、開発本部でバックエンドエンドエンジニアをしています。

目次