Google Cloud Functions + BigQueryでFastly Next-Gen WAFのログを収集する

  • URLをコピーしました!

こんにちは。PR TIMESでインターンをしている笹山雷雅です。

この記事では、Fastly Next-Gen WAFのログをBigQueryで分析するためにGoogle Cloud Functionsでサーバーレスアプリケーションを実装したので、紹介します。

目次

この記事で話すこと

  • Next-Gen WAFのログを取得する実装(Go)
  • サーバーレスな分析サービスの構築例

背景・実現したいこと

PR TIMESでは、データ分析にBigQueryを用いています。BigQueryを用いた他の活用事例は以下の開発者ブログにてご覧いただけます。

あわせて読みたい
Google AnalyticsとBigQueryを連携して高速柔軟な集計を可能に *この記事はベトナム語と日本語で書かれており、2ページ目にベトナム語があります。*Bài viết được viết bằng tiếng Nhật và tiếng Việt, bản tiếng Việt nằm ở trang...
あわせて読みたい
Mongoに溜まった約1.6億の記事データをBigQueryで高速検索! こんにちは、開発本部の植江田和成です。 今回は、WebClipping で使っている MongoDB に保存されていた約1.6億の記事データを BigQuery で検索できるようにしたことにつ...
あわせて読みたい
AuroraからBigQueryへデータ転送する際のシステム構成 こんにちは、21新卒の岩下です。 今回は私が普段開発を担当しているPR TIMES STORYのDBデータをBigQueryへ転送したので、そちらについての話を書いていきたいと思います...
あわせて読みたい
BigQuery Data Transfersをエンジニア全員に対して実行可能にした話 こんにちは。2024年4月に新卒で入社したバックエンドエンジニアの筒井(@tsuttsun_wind)です。今回は、BigQuery Data Transfers(以下Data Transfers)をエンジニア全員が...

運用・監視において重要なアクセスログはすでにBigQueryで閲覧可能となっていましたが、Fastly 製品の一つである Fastly Next-Gen WAF(以下、NGWAF)のログについては、まだ BigQuery で参照できていませんでした。そのため、今回新たにログ連携の仕組みを実装することとなりました。

Next-Gen WAFの詳細はFastlyの公式ページをご覧ください。

あわせて読みたい
App & API Protection | Fastly Next-Gen WAF | Fastly Fastly provides web app and API protection in a single solution with our next-gen WAF. Protect your services and keep applications secure using Fastly. 

PR TIMES では、NGWAF を導入することで、Web アプリケーションに対する高度なセキュリティを確保し、サービスを安定的に提供することを目的として導入しています。

システム概要

FastlyのアクセスログはAWSのS3やGCSに保存する設定がありますが、NGWAFには存在していません。NGWAFログはAPI経由で取得可能ですが、取得したJSONをそのままBigQueryに投入しても(BigQueryで)閲覧しづらいため、適切な形式に整形する必要があります。

今回実装したシステムは、以下の図のようになっています。BigQuery が Google Cloud のサービスの一つであるため、管理・実装のしやすさを考慮して、Google Cloud のサービスのみで実装することにしました。

Fastly APIを使ってNGWAFログを取得し、Cloud Functions、Pub/Sub、Secret Managerを通じてGCSに保存し、さらにBigQueryにデータ転送するシステムのフロー図
今回実装したシステムの概要図

各サービスとその役割

  • Cloud Functions (第 2 世代):Google Cloud のサーバーレスアプリケーションです。FastlyのAPIからデータを取得して、整形するために利用します。今回はシンプルな構成にしたかったので、Cloud Run ではなく、Cloud Functions を採用しました。
    • 発火方式はイベントドリブンを採用しました。HTTP 関数でも可能ですが、今回の例ではどちらを採用しても大きな違いはないと考えています。
  • Google Cloud Storage (以下、GCS):データをファイルとして保存できるサービスです。今回は Cloud Functions で取得したログの一時保存と、多重実行防止のため、アドバイザリーロック(ロックファイル)の保存にも利用しています。
  • Pub/Sub:メッセージ送受信サービスです。今回は Cloud Functions をイベントドリブン関数として定義し、Pub/Sub メッセージを購読します。
  • Cloud Scheduler:Cron のような定期的なイベントを発火できるサービスです。Pub/Sub メッセージの発行を行います。

実装のポイント・詰まったところとその解決法

実装の際に気をつけたことや、実際に詰まったところとどう解決したかを紹介します。

APIの仕様

今回使用するAPIは、Pythonによる実装例が公式ドキュメントで紹介されています。

あわせて読みたい
Extracting your data | Fastly Documentation IMPORTANT: Next-Gen WAF stores requests that contain attacks and anomalies, with some qualifications. If you would like to extract this…

エンドポイントの仕様は、APIリファレンスとして参照できます。

毎時00分に実行し、00分から1時間分のデータを取得するために、0 * * * * と記述し、直近1時間の範囲でログの収集などの作業を行おうとしました。

しかし、APIのクエリパラメータとしてfromuntil にはUNIX時間を渡しますが、ドキュメントに記載されているとおり、直近5分の時間を渡してはいけません。

そのため、今回のAPIでは直近5分は取得できず、400 Bad Requestエラーが返されてしまったので、5分以上ずらすことでエラーが出ないようにする必要があります。今回は余裕を持って10 * * * * とし毎時10分に実行するようにCronを調整しました。

詳しい時間に関する制約は、以下のリンクを参照してください。

あわせて読みたい
Extracting your data | Fastly Documentation IMPORTANT: Next-Gen WAF stores requests that contain attacks and anomalies, with some qualifications. If you would like to extract this…

受け取ったJSONをBigQuery向けにJSONLオブジェクトに変換する

APIから取得するログデータは、JSON形式で取得できます。

{
	"next": {
		"uri": "/api/v0/corps/testcorp/sites/www.example.com/feed/requests?next=cXVlcnlUaGVuRmV0Y2g7Mjs4NDM6cGhsQU1DdHRUTWEtWTJNdFRucVpDZzs4NDI6cGhsQU1DdHRUTWEtWTJNdFRucVpDZzswOw=="
	 },
	"data": [
		{
			"id": "54871be4f749437f4f00008d",
			other fields...
	
	  },
	  {
		  "id": "54871be4f749437f4f000093",
		  other fields...
	  },
	  ...
	]
}

dataにログデータが配列として返るので、BigQueryに転送するためにJSON配列をJSONL形式に変換する必要があります。

ここではjson.RawMessagejson.Compact を使用して変換しています。

具体的な実装は今回のタスクをきっかけにCTOの金子が下記の記事で紹介しています。

HTTPクライアントをカスタムする

GET /corps/{corpName}/sites/{siteName}/feed/requests は、リクエストヘッダにx-api-userx-api-tokenが必要です。

またレスポンス内のログが一定件数を超えるとnext.uri というカラムに残りのログにアクセスできるエンドポイントが返されます。

基本的な実装でも問題ありませんが、アプリケーションコードの流れとして「設定取得→API取得→GCSに書き込む」以外の処理を記述すると、コードが読みづらくなります。具体的な実装として、以下に示す方法を採用しました。

まず、http.Clientやその他APIを取得する際に必要な情報を持つNGWAFHTTPClient を定義します。

type NGWAFHTTPClient struct {
	client  *http.Client
	baseURL *url.URL
	config  *NGWAFConfig
}

http.Client は、Go 言語の標準ライブラリ net/http で提供される HTTP クライアントの実装です。http.Client を使用することで、HTTP リクエストの送信、レスポンスの受信、接続の管理など、HTTP 通信に必要な処理を抽象化できます。

func (c *NGWAFHTTPClient) newRequest(ctx context.Context, method, path string, body io.Reader) (*http.Request, error) {
	req, err := http.NewRequestWithContext(ctx, method, path, body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	req.Header.Set("x-api-user", c.config.Email)
	req.Header.Set("x-api-token", c.config.Token)

	return req, nil
}

func (c *NGWAFHTTPClient) do(req *http.Request) (*http.Response, error) {
	return c.client.Do(req)
}

newRequest メソッドは、HTTP リクエストを作成し、必要なヘッダーを設定します。do メソッドは、作成されたリクエストを実行し、レスポンスを取得します。

このように、リクエストの作成と実行処理をメソッドに分離することで、主要な処理を行う関数がシンプルになり、コードの見通しが向上します。後述しますが、このコードには冪等性がありません。

func (c *NGWAFHTTPClient) fetchPaginatedData(ctx context.Context, untilTime time.Time) error {
	nextURI := c.baseURL.String()
	for {
		req, err := c.newRequest(
			ctx,
			http.MethodGet,
			nextURI,
			nil,
		)
		if err != nil {
			return err
		}

		resp, err := c.do(req)
		if err != nil {
			return err
		}
		defer resp.Body.Close()

		if resp.StatusCode != http.StatusOK {
		  // エラー時のボディを取得する
			errBodyString, err := io.ReadAll(resp.Body)
			if err != nil {
				return fmt.Errorf("failed to read error response body: %w", err)
			}
			log.Printf("Error response body: %s", errBodyString)
			return fmt.Errorf("failed to fetch paginated data from %s. Status Code: %d", nextURI, resp.StatusCode)
		}

		response := NGWAFResponse{}
		if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
			return err
		}

		var buffer bytes.Buffer
		for _, d := range response.Data {
			json.Compact(&buffer, d)
			buffer.Write([]byte("\\n"))
		}
		
		// 逐次的にStorageにアップロード
		err = writeGCS(ctx, &buffer, untilTime)
		if err != nil {
			return err
		}

		if response.Next.URI == "" {
			break
		}
		nextURI = c.config.Base + response.Next.URI
	}

	return nil
}

重複実行を防ぐ仕組み

アクセスログを取得して保存するときは、ログを重複して挿入しないようにすることが重要です。

Cloud Functionsなどのサーバーレス基盤は、イベントが多重発火したり再試行されることがあるため、明示的な排他制御が必須です。そこでアドバイザリーロック(Advisory Lock)を実装します。

アドバイザリーロックは、共有リソースへのアクセスを調整するために使用される手法の一つです。ここで言うアドバイザリーロックは、同一期間の多重取得を防ぐため、GCSに「ロックファイル」を作成し重複実行を検出する仕組みです。

1時間ごとの実行のため、Cloud Functions実行時、最初に時間単位以下を切り捨てた値を名前の末尾に追加したオブジェクトをGCSに作成します(例 ngwaf-log/lockfiles/ngwaf-log-advisory-lock-202504140200)。

Cloud Functionsの実行で、APIの実行前に同名オブジェクトが存在すると、アプリケーションでエラーを返却して終了させます。

// アドバイザリーロックを取得するための関数
func advisoryLock(ctx context.Context, untilTime time.Time) error {
	lockStorageConfig, err := getLockStorageConfig()
	if err != nil {
		log.Printf("Error getting Lock Storage config: %v", err)
		return fmt.Errorf("error getting Lock Storage config: %w", err)
	}

	// 時間(h)以下を切り捨てたオブジェクト名を作成
	timeStamp := untilTime.Format("200601021504")
	lockName := fmt.Sprintf("%s%s", lockStorageConfig.ObjectName, timeStamp)

	// Storageクライアントの作成
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("failed to create storage client: %w", err)
	}
	defer client.Close()

	// 対象のバケットとオブジェクト情報を取得
	bucket := client.Bucket(lockStorageConfig.BucketName)
	obj := bucket.Object(lockName)
	_, err = obj.Attrs(ctx)

	// オブジェクト取得時に ErrObjectNotExist が返るか (= この時間帯のデータ取得が行われているか) を検証
	if errors.Is(err, storage.ErrObjectNotExist) {
		wc := obj.If(storage.Conditions{DoesNotExist: true}).NewWriter(ctx)

		// ロックファイルを作成
		if _, err := wc.Write([]byte(lockName)); err != nil {
			return fmt.Errorf("failed to write lock data to GCS: %w", err)
		}
		if err := wc.Close(); err != nil {
			return fmt.Errorf("failed to close writer: %w", err)
		}
		log.Printf("Lock object created: %s", lockName)
		return nil
	}

	if err != nil {
		return fmt.Errorf("failed to get object attributes: %w", err)
	}

	return fmt.Errorf("lock object already exists: %s", lockName)
}

冪等性の担保を見送る判断

サーバーレスアプリケーションにおいては、冪等性も重要視されます。

冪等とは、1 回の実行結果と複数回の実行結果が同じになるような操作を指します。

アクセスログの挿入も本来は冪等であるべきですが、そのためには、以下の 2 点を実装する必要があります。

  1. すでに実行されているが、再実行された場合、その実行を中止し、挿入が起こらないようにする。
  2. 1 回目の実行において途中で失敗した場合に、再実行で失敗した段階から処理を復帰させられる。

今回使用した Fastly API のレスポンスデータは、時系列順などの順番の一貫性や、同じ API を呼び出すことによるデータの同一性が保証されていないため、上記の 2 つ目の要件である「失敗した段階から処理を復帰」することが非常に困難です。

また、今回のアプリケーションはすべての NGWAF のログを確実に取得する必要があるわけではなく、仮に冪等性を確保しようとすると実装がかなり複雑になることが予想されたため、「まず一部でもいいからデータを BigQuery で分析できるようにする」ことを目標とし、冪等性の確保を優先しない判断をしました。

具体的には、データの欠損が発生しないようにするには、取得済みのログをデータベースなどで管理し、再実行時に欠損しているデータを特定する処理が必要になります。しかし、Fastly API のレスポンスデータの特性上、この処理を実装することが非常に困難であり、開発コストも高くなることが予想されました。

さらに、取得したデータ全体をメモリ上に展開して処理しようとすると、事前にどれだけのデータ量が取得されるか予測できないため、メモリ不足によるシステム障害のリスクがあります。

そのため、今回は、ログを取得した直後にGCSに書き込む実装にすることで、早期にデータ分析を開始することを優先しました。

環境変数をYAMLで管理する

Cloud Functionsは環境変数に対応しています。

公式ドキュメントで最初に紹介されている設定方法は、deployコマンド時に--set-env-vars として渡す方法です。

しかし、この場合、登録する環境変数が多い場合に記述が煩雑になり、デプロイコマンドのオプションなのでコードのバージョン管理にも適していません。

そこで、別に用意されている--env-vars-file オプションでYAMLファイルを渡す方法を採用しました。

デプロイコマンド

gcloud functions deploy fastly-ngwaf-log-stg \
--gen2 \
--runtime=go121 \
--timeout=540 \
--trigger-topic=ngwaf_log_stg \
--trigger-service-account=<acccount mail address> \
--source=. \
--entry-point=fetchNGWAFData \
--env-vars-file=.env.stg.vars.yaml # 環境変数をまとめたファイルを渡すオプション

YAMLファイル例

NGWAF_EMAIL: "abcde@example.com"
NGWAF_CORP: ""
NGWAF_SITE: ""
NGWAF_BASE: ""
GCS_BUCKET_NAME: ""
GCS_OBJECT_NAME_PREFIX: "logs/"
GCS_LOCK_OBJECT_NAME_PREFIX: "lockfiles/"

ファイルにすることでGitでの管理が容易になりました。

機密情報にはSecret Managerを使う

Fastly APIにアクセスするにはアクセストークンとメールアドレスによる認証情報を渡す必要があります。

トークンが公開されてしまうと、そのトークンを用いて秘匿情報にアクセスするなど悪用される可能性があるため、前の節で述べたようなYAMLなどファイルに配置して管理はできません。

このようなケースにおいて、Google CloudではSecret Managerを使用できます。

Secret Managerで作成した値は、projects/314159265/secrets/HOGE のような形式のキー(Secret Managerのトップページから作成したシークレットの詳細を開くとコピーできます)を用いて以下のようにアクセスします。

func getSecret(ctx context.Context, tokenName string) (string, error) {
	client, err := secretmanager.NewClient(ctx)
	if err != nil {
		return "", fmt.Errorf("failed to create secret manager client: %w", err)
	}
	defer client.Close()

	req := &secretmanagerpb.AccessSecretVersionRequest{
		// シークレット名に "/versions/latest" を追加
		Name: fmt.Sprintf("%s/versions/latest", tokenName),
	}

	result, err := client.AccessSecretVersion(ctx, req)
	if err != nil {
		return "", fmt.Errorf("failed to access secret version: %w", err)
	}

	return string(result.Payload.Data), nil
}

Secret ManagerをCloud Functionsで使う場合、デプロイ時に渡すサービスアカウントがSecret Managerへのアクセス権限があるかを確認しましょう。

「IAM」にアクセスして設定したサービスアカウントの行のロールに「Secret Manager のシークレットアクセサー」が存在していれば問題ありません。

Google CloudのSecret Managerでのロール設定を示す画面。'Secret Manager のシークレット アクセサー'と'編集者'の役割が表示されている。
サービスアカウントに必要なロール

BigQueryへの転送を設定する+データに合わせたテーブルを作成

取得したログをGCSからBigQueryにData Transferを使って転送します。

BigQueryのデータ転送オプションを示す画面のスクリーンショット、左側のメニューにデータ転送セクションが強調表示されている。
BigQueryのコンソールのペイン

BigQuery Studioの左のペインから「データ転送」を押すと転送のダッシュボードが表示されるので、そこで作成できます。

転送プランの作成前に、データ転送先のデータセットおよびテーブルを作成する必要がありますが、BigQueryのテーブルではパーティションを設定することをPR TIMESでは必須にしています。特に指定しない場合、転送時のタイムスタンプが新しいカラムとして追加され、パーティションキーとして割り当てられます。

パーティションキーを設定すると、クエリの条件に応じてアクセスする範囲を絞ってアクセスできるため、パフォーマンスが高くなります。また、BigQueryはクエリ実行時にスキャンしたデータに応じた料金が発生する形態のため、使用料金を抑える点においてもパーティション設定が重要です。

転送時間がパーティションキーの場合、同一データが挿入されても転送時間が異なる場合に違うレコードとして挿入されてしまうので、アクセスログ分析のためのテーブルとしては望ましくありません。

BigQueryのパーティションに関する詳しい説明は以下のドキュメントを参照してください。

Google Cloud Documentation
パーティション分割テーブルの概要  |  BigQuery  |  Google Cloud Documentation BigQuery のパーティション分割テーブルとそのタイプ、制限事項、割り当て、料金、セキュリティについて説明します。

そのため、ログ自体が持つタイムスタンプなど(アクセスログの場合、アクセス日時など)をパーティションキーに設定したいです。そのためには転送元にあるデータの構造に対応できるようなスキーマを作成時に定義する必要があります。

BigQuery へのデータ転送の設定は、以下の手順で行います。

  1. BigQuery コンソールで、データを格納するデータセットを作成します。
  2. データセット内に、転送先のテーブルを作成します。テーブルのスキーマは、転送元データの構造に合わせて定義します。
  3. BigQuery Data Transfer Service の画面で、新しい転送構成を作成します。
  4. 転送元として Google Cloud Storage を選択し、GCS バケットのパスを指定します。(hoge/*と設定するとディレクトリ以下の任意のファイルを転送できます)。
  5. 転送先のデータセットとテーブルを指定します。
  6. 必要に応じて、転送スケジュールやパーティション設定などを指定します。

データ転送時のデバッグ

テーブルのスキーマを定義し転送を行った時に発生したエラーとデバッグについて紹介します。

発生したエラーは以下のとおりです。

  • エラー 1: 「failed with error INVALID_ARGUMENT: Error while reading data, error message: JSON table encountered too many errors, giving up.」
  • エラー 2: 「Error while reading data, error message: JSON processing encountered too many errors, giving up. Rows: 1; errors: 1; max bad: 0; error percent: 0 gs://prtimes-ngwaf-log/ngwaf-log: Error while reading data, error message: JSON parsing error in row starting at position 0: Nested」

これらのエラーは、テーブルの定義と GCS のオブジェクトの形式が一致しないために発生していると考えられます。

ローカル環境でデバッグできる bq load コマンドを使用し、最小構成のデータを挿入しながら、問題を引き起こすカラムを検証しました。

テーブルは削除して作り直したり、仮のテーブルで試すなどして対応しました。

bq load \
--source_format=NEWLINE_DELIMITED_JSON \
--schema=./schema.json \
ngwaf_logs.stgngwaflog \
temp.data.json #temp.data.jsonの怪しいカラムを消したり増やしたりして結果を確かめる

エラーが出ていた原因は、ログごとに格納されるデータが異なり、事前にスキーマで定義できないカラムでした。そこで、カラムの型をJSON に指定することで解決しました。

まとめ

本記事では、Fastly Next-Gen WAF のログをGoogle Cloud Functionsを活用してサーバーレスかつ安全にBigQueryへ連携する仕組みを構築した事例を紹介しました。

APIポリシーやサーバーレスの性質上冪等性の確保、排他制御、データスキーマとの突き合わせによる落とし穴など、様々な課題に直面しました。本記事が同様の構成を考えている方や、Google Cloud×Fastly連携のTipsとなれば幸いです。

  • URLをコピーしました!

この記事を書いた人

インターンとしてバックエンドを担当しています

目次