こんにちは、 開発本部のドゥックです。
先月末に当社のデータベースPostgreSQL on EC2からBigQueryへのデータ転送を実行しました。この記事では、ETL処理をサポートするサーバーレスツールであるAWSGlueについて説明させていただきたいです。
背景
現在、オンプレミスデータベースは1日1回バックアップされ、バックアップを元にEC2上のPostgreSQLをデータ分析作業のために起動しています。これらをデータ分析作業のためにBigQueryに転送します。
ただし、そのままBigQueryに転送することだけではなく、事前に処理する必要があります。 たとえば、すべてのリリースを含むデータテーブルがあります。各リリースには、企業、タイトル、コンテンツ、発行日などが含まれます。
しかし、公開前のプレスリリースはBigQueryに転送したくないです。そして、ユーザーの個人データを含む情報もマスキングか削除したいです。その際、公開されたリリースをフィルタリングする必要があります。
こういう状況ではAWS Glueを活用できます。

ETLとは?
ETLは、データパイプライニングタイプの1つです。
パイプライニングは簡単に言えば、ある場所から別の場所にデータを取得することを意味します。
Extract-Transform-Loadの頭字語で、毎日データがソースから抽出され(Extract)、変換(Transform)、ターゲットソースに書き出します(Load)。
データは所在もフォーマットもバラバラなままです。この状態では分析に利用できません。ETL処理を行ってから、データウェアハウス(DWH)にデータを統合する必要があります。
Glue Data Catalogでメータデータ管理
Glueは、データカタログとETLの2つの主要セッションとそれらを取り巻く機能に分けられます。データカタログには、次のコンポーネントが含まれています。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/populate-data-catalog.html
Connections
今回はEC2上のPostgreSQLに接続できるようにします。JDBCを使用してそのインスタンスへの接続を確立します。
ファイアウォールルールを設定する
接続したいDBインスタンスにアタッチされているセキュリティグループに移動し、defaultのグループを選択します。
All TCPインバウンドファイアウォールルールが必要ですが、All Trafficもう設定されたので、大丈夫です。
JDBC接続を作成する
Amazon Glueで、JDBC接続を作成します。 次のようになります。
AWS Glue管理画面でconnectionをテストします。
Crawlers
Connectionsを介してアクセスされるデータの場合、Crawlerを起動して実行する必要があります。「PostgreSQL on EC2に接続し、一連のClassifierを通過してスキーマデータを決定し、データカタログのDatabaseにメタデータtableを生成する」という役割を担当します。
また、Crawlerは、unstructured dataやsemi-structured dataなどの複雑なデータ型のスキーマを定義することもできるため、時間を大幅に節約できます。 たとえば、ParquetやAVROの場合、人間が読める形式にすることは困難です。クローラーを使用するとそれを行うのに数分しかかかりません。
Tables
Crawlerを実行した後、tablesを取得します。
一般的なリレーショナルデータベーステーブルではありません。 代わりに、データソースのメタデータテーブル定義です。 プレビュー付きのリンクに少し似ていて、データの場所と、そこにあるデータフィールドとタイプを示します。
Parquet、CSV、JSONなどのS3に保存されているファイルベースのデータと、RDSテーブルなどの従来のデータストアのデータを移すことができます。
Glue ETLでETL周りを管理、実行
Glue Data Catalogで各テーブルのメタデータを収集して決定した後。次のステップは、ETLを実行することです。
次のコンポーネントが含まれています。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/author-job.html
Job
Jobは Glue ETLの中心です。
データカタログに格納されているソースからデータを読み込み、そのデータの変換を実行、ターゲットにロードします。
Jobはコードを自動的に生成することも、PythonまたはScalaでコーディングすることもできます。
また、Glue Studioで直感的な方法でコンポーネントごとにスクリプトを開発できます。
以下はマスキングするため、カスタムコードです。
def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
comp_user_df = dfc.select(list(dfc.keys())[0]).toDF()
#Mask column data using encryption technique
#Create encryption func
import hashlib
#Creating a function to encrypt column
def encrypt_fun(col):
enc_value = hashlib.sha256(col.encode()).hexdigest()
return enc_value
#Create UDF for encryption or in other words convert python function to pyspark UDF
from pyspark.sql.functions import udf
encrypt_udf = udf(encrypt_fun)
#Add a new derived column for encrypted columns
enc_comp_user_df = comp_user_df.withColumn('enc_user_id', encrypt_udf('user_id'))
# 削除したいカラムをここで追加する
transformed_df = ...
masked_staff_dyc = DynamicFrame.fromDF(transformed_df, glueContext, "newcustomerdata")
return (DynamicFrameCollection({"CustomTransform0": masked_staff_dyc}, glueContext))
Glue DynamicFrameを理解する
Glue DynamicFrameは、ネイティブのSpark DataFrameをAWSで抽象化したものです。 一言で言えば、DynamicFrameはその場でスキーマを計算し、スキーマの不整合がある場合は、フィールドに複数のタイプを持つことができます。 それらの違いの詳細については、こちらをご覧ください。
継続的なロギングとジョブメトリックを有効にする
ジョブ構成で継続的なロギングとジョブメトリックをオンにします。CloudWatchでログを記述します。
AWS GlueStudioを使用してログを監視できます。
また、CloudWatchログに移動し、ロググループを探します。/aws-glue/jobs/logs-v2:
Trigger
次に、ジョブの実行頻度をスケジュールする必要があります。Triggerはこの役割を果たします。
それらは、スケジューリング、コマンド、または別のジョブからのイベントにによって実行できます。
現在、以下のように1日1回08:05 JPT(23:05 UTC)に実行することを設定しました。
Workflow
ワークフローは、トリガーとジョブの組み合わせです。たとえば、ジョブBは、実行する前にジョブAが完了するのを待つ必要があります。ジョブAを実行するトリガーを作成して、ジョブAが完了してジョブBを実行するまで待機するトリガーを作成するワークフローを作成できます。
まとめ
AWS Glueは、データの移動または変換を必要とする多くのユースケースに最適な効率的で便利なツールです。 開発の過程で、まだ触れていない機能がたくさんあります。 将来的にはそれらを使用できることを願っています。