Embulk を使ってCSVからMySQLにデータをロード:Windows 環境
最近 Embulk という良さげなETLツールを見つけました。
JDBC DriverもPluginで追加できるという、個人的に大ヒットなすぐれもの。基本的にはLinuxベースで動かす人が多そうですが、クロスプラットフォーム対応しているので Windows でも動かせます。
なので、とりあえずさくっと Windows で動かしてみました。
Embulkとは?
ログファイルやCSVファイルなどの大量データをBlukで各種データストア、DWHなどのサービスへロードできるオープンソースのETLツールです。
特徴的なのはPluginモデルを採用している点で、このPluginを取り込むことで様々なデータソースにInputとOutput処理が対応できます。
例えばCSVからRedisにいれたり、MySQLからCassandraに入れたりと。Pluginの一覧は以下で公開されていますし、JDBCなどの汎用の口を使えば、CData で提供している 150種類以上の JDBC Driver をデータソースにして取り込むこともできます。
ざっくり Embulkを理解したい場合は、以下のスライドと開発者の方のリリースBlogがおすすめです。
www.slideshare.net
上記の通り、EmbulkはPluginモデルを採用しています。以下のページから Embluk の Plugin一覧が確認できます。
Embulk のインストール
以下のGithubのQuick Startに詳しく書かれています。
まず以下のPowerShellコマンドでbatファイルをインストールします。私は「C:\Work\Software\Embulk」に保存。
PowerShell -Command "& {[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::TLS12; Invoke-WebRequest http://dl.embulk.org/embulk-latest.jar -OutFile embulk.bat}"
このままだと使いにくいので環境変数に保存した場所を追加。
とりあえず Quick Start
Embulk はコマンドラインで実行しますが、そこまでコマンドは多くありません。使いたいプラグインをインストールして、どのような処理を行うのか? をYAMLの設定ファイルに記述、そしてそれを実行する、というのが大まかな流れです。
mkbundle <directory> # create a new plugin bundle environment. bundle [directory] # update a plugin bundle environment. run <config.yml> # run a bulk load transaction. cleanup <config.yml> # cleanup resume state. preview <config.yml> # dry-run the bulk load without output and show preview. guess <partial-config.yml> -o <output.yml> # guess missing parameters to create a complete configuration file. gem <install | list | help> # install a plugin or show installed plugins. new <category> <name> # generates new plugin template migrate <path> # modify plugin code to use the latest Embulk plugin API example [path] # creates an example config file and csv file to try embulk. selfupdate [version] # upgrades embulk to the latest released version or to the specified version.
Quick Startに乗っているもので簡単に試すことができるので、それを見てみましょう。
embulk example ./try1
exampleを実行すると、gz圧縮されたサンプルのCSVファイル(sample_01.csv.gz)と、対象のフォルダに実行用のconfigファイルを生成するための元ファイル(seed.yml)が生成されます。
seed.ymlは以下のようになっていて、InputするファイルとOutput先がざっくり書かれているだけのものです。
in: type: file path_prefix: 'C:\Work\Blog\20190826_EmbulkTrial\Trial\.\try1\csv\sample_' out: type: stdout
このファイルを元にguess(推測処理)を実施し、設定ファイル(config.yml)を生成します。Embulkの特徴の一つだと思います。これによって取得したいCSVファイルの構成を読み取り、おおまかな設定ファイルが生成できます。
embulk guess ./try1/seed.yml -o config.yml
生成された結果が以下の通りです。
in: type: file path_prefix: C:\Work\Blog\20190826_EmbulkTrial\Trial\.\try1\csv\sample_ decoders: - {type: gzip} parser: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' null_string: 'NULL' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: id, type: long} - {name: account, type: long} - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: purchase, type: timestamp, format: '%Y%m%d'} - {name: comment, type: string} out: {type: stdout}
ポイントはdecoders、charaset、newlineなどCSVファイルを読み取って、フォーマット情報を選別し、設定ファイルにピクアップしてくれているところと、CSVのカラム情報などもあらかじめ構成してくれるところでしょうか。このあたりを一つ一つ書くのは大変だと思うので、CSV取り込み時には大変ありがたい機能だと思います。
この設定ファイルを元にOutput先に定義したところへ出力するわけですが、その前にpreviewコマンドで実行結果の想定を確認できます。ここでInputした定義がどのようになっているかが確認できますね。
embulk preview config.yml
最後に run コマンドで、設定ファイルを実行できます。デフォルトのサンプルはコマンドラインに出力するだけですが、以下のような感じです。
embulk run config.yml
MySQL Output を試す
これだけだとつまらないので MySQL への Outputを試します。
前述の通り、Embulk は Pluginモデルなので、公開されている・開発したPluginを自由に取り込んで接続先を拡張することができます。
以下のコマンドでMySQLの Output Pluginがインストールできます。
embulk gem install embulk-output-mysql
インストールしているPluginは 以下のコマンドで一覧確認ができます。
embulk gem list
次にconfigファイルをいじります。Outの部分にMySQLの設定を加えました。Modeは色々とありますが、今回はシンプルにInsertだけです。テーブル名に指定した部分は自動的に生成されます。
in: type: file path_prefix: C:\Work\Blog\20190826_EmbulkTrial\sample_ parser: charset: UTF-8 newline: LF type: csv delimiter: ',' quote: '"' escape: '"' null_string: 'NULL' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: id, type: long} - {name: account, type: long} - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: purchase, type: timestamp, format: '%Y%m%d'} - {name: comment, type: string} out: type: mysql host: localhost database: DatabaseName user: UserId password: UserPassword table: example mode: insert
これで embulk run を実行。
こんな感じでデータが生成されました!
次回は JDBC Plugin を使って、 CData JDBC Driver と組み合わせた処理を実行してみたいと思います。