Morning Girl

Web API, Windows, C#, .NET, Dynamics 365/CRM etc..

Embulk を使ってCSVからMySQLにデータをロード:Windows 環境

最近 Embulk という良さげなETLツールを見つけました。

JDBC DriverもPluginで追加できるという、個人的に大ヒットなすぐれもの。基本的にはLinuxベースで動かす人が多そうですが、クロスプラットフォーム対応しているので Windows でも動かせます。

なので、とりあえずさくっと Windows で動かしてみました。

Embulkとは?

ログファイルやCSVファイルなどの大量データをBlukで各種データストア、DWHなどのサービスへロードできるオープンソースのETLツールです。

f:id:sugimomoto:20190828193500p:plain

特徴的なのはPluginモデルを採用している点で、このPluginを取り込むことで様々なデータソースにInputとOutput処理が対応できます。

例えばCSVからRedisにいれたり、MySQLからCassandraに入れたりと。Pluginの一覧は以下で公開されていますし、JDBCなどの汎用の口を使えば、CData で提供している 150種類以上の JDBC Driver をデータソースにして取り込むこともできます。

Embulk plugin list

ざっくり Embulkを理解したい場合は、以下のスライドと開発者の方のリリースBlogがおすすめです。

www.slideshare.net

frsyuki.hatenablog.com

上記の通り、EmbulkはPluginモデルを採用しています。以下のページから Embluk の Plugin一覧が確認できます。

Embulk のインストール

以下のGithubのQuick Startに詳しく書かれています。

github.com

まず以下のPowerShellコマンドでbatファイルをインストールします。私は「C:\Work\Software\Embulk」に保存。

PowerShell -Command "& {[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::TLS12; Invoke-WebRequest http://dl.embulk.org/embulk-latest.jar -OutFile embulk.bat}"

このままだと使いにくいので環境変数に保存した場所を追加。

f:id:sugimomoto:20190828193509p:plain

とりあえず Quick Start

Embulk はコマンドラインで実行しますが、そこまでコマンドは多くありません。使いたいプラグインをインストールして、どのような処理を行うのか? をYAMLの設定ファイルに記述、そしてそれを実行する、というのが大まかな流れです。

   mkbundle   <directory>                             # create a new plugin bundle environment.
   bundle     [directory]                             # update a plugin bundle environment.
   run        <config.yml>                            # run a bulk load transaction.
   cleanup    <config.yml>                            # cleanup resume state.
   preview    <config.yml>                            # dry-run the bulk load without output and show preview.
   guess      <partial-config.yml> -o <output.yml>    # guess missing parameters to create a complete configuration file.
   gem        <install | list | help>                 # install a plugin or show installed plugins.
   new        <category> <name>                       # generates new plugin template
   migrate    <path>                                  # modify plugin code to use the latest Embulk plugin API
   example    [path]                                  # creates an example config file and csv file to try embulk.
   selfupdate [version]                               # upgrades embulk to the latest released version or to the specified version.

Quick Startに乗っているもので簡単に試すことができるので、それを見てみましょう。

embulk example ./try1 

exampleを実行すると、gz圧縮されたサンプルのCSVファイル(sample_01.csv.gz)と、対象のフォルダに実行用のconfigファイルを生成するための元ファイル(seed.yml)が生成されます。

seed.ymlは以下のようになっていて、InputするファイルとOutput先がざっくり書かれているだけのものです。

in:
  type: file
  path_prefix: 'C:\Work\Blog\20190826_EmbulkTrial\Trial\.\try1\csv\sample_'
out:
  type: stdout

このファイルを元にguess(推測処理)を実施し、設定ファイル(config.yml)を生成します。Embulkの特徴の一つだと思います。これによって取得したいCSVファイルの構成を読み取り、おおまかな設定ファイルが生成できます。

embulk guess   ./try1/seed.yml -o config.yml

生成された結果が以下の通りです。

in:
  type: file
  path_prefix: C:\Work\Blog\20190826_EmbulkTrial\Trial\.\try1\csv\sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: 'NULL'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

ポイントはdecoders、charaset、newlineなどCSVファイルを読み取って、フォーマット情報を選別し、設定ファイルにピクアップしてくれているところと、CSVのカラム情報などもあらかじめ構成してくれるところでしょうか。このあたりを一つ一つ書くのは大変だと思うので、CSV取り込み時には大変ありがたい機能だと思います。

この設定ファイルを元にOutput先に定義したところへ出力するわけですが、その前にpreviewコマンドで実行結果の想定を確認できます。ここでInputした定義がどのようになっているかが確認できますね。

embulk preview config.yml

f:id:sugimomoto:20190828193522p:plain

最後に run コマンドで、設定ファイルを実行できます。デフォルトのサンプルはコマンドラインに出力するだけですが、以下のような感じです。

embulk run     config.yml

f:id:sugimomoto:20190828193533p:plain

MySQL Output を試す

これだけだとつまらないので MySQL への Outputを試します。

github.com

前述の通り、Embulk は Pluginモデルなので、公開されている・開発したPluginを自由に取り込んで接続先を拡張することができます。

以下のコマンドでMySQLの Output Pluginがインストールできます。

embulk gem install embulk-output-mysql

インストールしているPluginは 以下のコマンドで一覧確認ができます。

embulk gem list

f:id:sugimomoto:20190828193540p:plain

次にconfigファイルをいじります。Outの部分にMySQLの設定を加えました。Modeは色々とありますが、今回はシンプルにInsertだけです。テーブル名に指定した部分は自動的に生成されます。

in:
  type: file
  path_prefix: C:\Work\Blog\20190826_EmbulkTrial\sample_
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: 'NULL'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: 
    type: mysql
    host: localhost
    database: DatabaseName
    user: UserId
    password: UserPassword
    table: example
    mode: insert

これで embulk run を実行。

f:id:sugimomoto:20190828193550p:plain

こんな感じでデータが生成されました!

f:id:sugimomoto:20190828193557p:plain

次回は JDBC Plugin を使って、 CData JDBC Driver と組み合わせた処理を実行してみたいと思います。

kageura.hatenadiary.jp

Reference

github.com

qiita.com

dev.classmethod.jp

christina04.hatenablog.com