日頃の行い

個人的な日頃の行いをつらつら書いてます\\\\ ٩( 'ω' )و ////

GoogleBigQueryとSlackHistoryのpluginを触ってEmbulkにちょっとだけ入門してみた。

Embulk触ってみたいと思いつつ時が流れていたので、Embulkを使って個人で利用しているSlackの過去のチャットをBigQueryにでも流し込んでみようかなとふと思ったのでやってみようとしました。
やってみようと・・・したんです・・・w

※最終的に書き込みは成功してないですw
 最終的なRuntimeで一部がinput(Slack)から読み込んだ後、
 カラム定義が合わず書き込めませんでした。
 Config自体は問題なさそうです。

概要

  1. about Embulk
  2. この後やること概要
  3. BigQuery, SlackHistoryプラグインのインストール
  4. Embulk Configの記述
  5. 実行
  6. 所感

以降は下記のDockerを利用してます。
https://hub.docker.com/r/tarata/centos-with-embulk/

1. about Embulk

そもそもEmbulkとは??
Embulkはデータのバルクローダーです。
たくさんのデータを、例えばどこかのストレージに書き込んで(ロードして)くれるミドルウェアです。
pluginさえあれば、YAML形式のconfigにデータのinput元とoutput先を書けばロードしてくれるという感じです。

github.com

とりあえずやってみないとよくわからないので、githubレポジトリにあるRunning exampleをやってみました。
実行サンプルがあるの、わかりやすくていいですね。

# まずはembulk exampleというコマンドを利用してexampleを用意します。
$embulk example try1
2015-08-15 10:31:42.611 +0000: Embulk v0.6.23
Creating try1 directory...
  Creating try1/
  Creating try1/csv/
  Creating try1/csv/sample_01.csv.gz
  Creating try1/example.yml

Run following subcommands to try embulk:

   1. embulk guess try1/example.yml -o config.yml
   2. embulk preview config.yml
   3. embulk run config.yml
$tree
.
└── try1
    ├── csv
    │   └── sample_01.csv.gz
    └── example.yml

2 directories, 2 files


# guessはpluginに実装されている場合、現在書かれているConfigからConfigを作ってくれるみたいですね。
$embulk guess ./try1/example.yml -o config.yml
2015-08-15 10:34:42.972 +0000: Embulk v0.6.23
2015-08-15 10:34:44.243 +0000 [INFO] (guess): Listing local files at directory '/root/workspace/try1/csv' filtering filename by prefix 'sample_'
2015-08-15 10:34:44.251 +0000 [INFO] (guess): Loading files [/root/workspace/try1/csv/sample_01.csv.gz]
2015-08-15 10:34:44.318 +0000 [INFO] (guess): Loaded plugin embulk/guess/gzip from a load path
2015-08-15 10:34:44.325 +0000 [INFO] (guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: /root/workspace/try1/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}

Created 'config.yml' file.

# ちなみに、実装されていない場合、下記のようなエラーが出ました。
# NotImplementedError: Embulk::Plugin::InputSlackHistory.guess(config) is not implemented. This input plugin does not support guess.
# 2015/08/16現在 InputSlackHistory pluginにはguess機能はないみたいです。
# CSVの場合、カラムの定義などをしてあげる必要ありそうだけど、Slackの場合はSlack側の仕様にデータフォーマットなどは書いてあるので別に必要なさそうな気もしますね。

# previewがあるようです。
# output先に依存せずどういうデータが入るかがわかるの楽でいいですね。
# 毎度stdoutで出さなくて済みます。
$embulk preview config.yml
2015-08-15 10:38:46.926 +0000: Embulk v0.6.23
2015-08-15 10:38:48.236 +0000 [INFO] (preview): Listing local files at directory '/root/workspace/try1/csv' filtering filename by prefix 'sample_'
2015-08-15 10:38:48.243 +0000 [INFO] (preview): Loading files [/root/workspace/try1/csv/sample_01.csv.gz]
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
|       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                       NULL |
+---------+--------------+-------------------------+-------------------------+----------------------------+

# 最後に実行です。
$embulk run config.yml
2015-08-15 10:39:14.175 +0000: Embulk v0.6.23
2015-08-15 10:39:16.195 +0000 [INFO] (transaction): Listing local files at directory '/root/workspace/try1/csv' filtering filename by prefix 'sample_'
2015-08-15 10:39:16.204 +0000 [INFO] (transaction): Loading files [/root/workspace/try1/csv/sample_01.csv.gz]
2015-08-15 10:39:16.259 +0000 [INFO] (transaction): {done:  0 / 1, running: 0}
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin
4,11270,2015-01-29 11:54:36,20150129,NULL
2015-08-15 10:39:16.458 +0000 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-08-15 10:39:16.476 +0000 [INFO] (main): Committed.
2015-08-15 10:39:16.476 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/root/workspace/try1/csv/sample_01.csv.gz"},"out":{}}

Exampleとして用意されたCSVを標準出力にバルクロードしたという例でした。

2. この後やること概要

なにかbigqueryに突っ込んでみたかったので、個人で利用しているslackのヒストリーをembulkで突っ込んでみました。
嘘です。実際に突っ込もうとしたところで失敗しました。
ちょっと力が尽きてしまったので、過程をつらつら書いています。

利用したプラグイン

github.com

github.com

3. BigQuery, SlackHistoryプラグインのインストール

インストール簡単

$embulk gem install embulk-input-slack-history
... #略

$embulk gem install embulk-output-bigquery
... #略

4. Embulk Configの記述

Config書いてはrunしてConfig書いてはrunしてたくさん怒られました。
embulkのConfigではなく、bigqueryの設定にハマりました。
最終的に書いたファイルはこんな感じです。
(slackのtoken, service account emailはごまかしてます。)

in:
    type: slack_history
    token: xxxxxxx
out:
    type: bigquery
    service_account_email: xxxxx@developer.gserviceaccount.com
    p12_keyfile_path: ./key.p12
    project: mysamplebqproject
    dataset: slack
    auto_create_table: true
    table: slack_forme
    schema_path: ./schema.json
    formatter:
        type: csv
        header_line: false
    path_prefix: /tmp
    encoders:
        - {type: gzip}
    file_ext: csv.gz

schema.json

[
    {
        "name": "channelid","mode":"REQUIRED", "type":"STRING"
    },
    {
        "name": "channelname","mode":"REQUIRED", "type":"STRING"
    },
    {
        "name": "private","mode":"NULLABLE", "type":"STRING"
    },
    {
        "name": "datetime","mode":"REQUIRED", "type":"STRING"
    },
    {
        "name": "username","mode":"NULLABLE", "type":"STRING"
    },
    {
        "name": "userid","mode":"NULLABLE", "type":"STRING"
    },
    {
        "name": "message","mode":"NULLABLE", "type":"STRING"
    }
]

[tips]bigqueryでハマった所

「invalid_grant」と言われ続ける

service_account_emailがおかしかったみたいです。
下記のURLから対象のプロジェクトを選択したページのサイドメニューから「APIと認証」 -> 「認証情報」を選択し、そこの右側にある「OAuth」の「サービス アカウント」の「メールアドレス」を選ばないといけなかったです。

https://console.developers.google.com/project/

OAuthを作っていない場合は作らないといけないです。

また、p12_keyfile_pathには上記のOAuthの箇所で、「新しいP12キーを生成」を選択してダウンロードしたものを利用しないといけないです。

5. 実行

実行してみたら、Config自体は通ったぽいのですが途中でこんなエラーが(´・ω・`)w
カラム定義がおかしい・・・?slack historyの方がおかしい・・・?
この時の私はここで力尽きました。
もうちょい調査して、原因特定したいです。

$embulk run slack-to-bq.yml
2015-08-15 20:44:32.968 +0900: Embulk v0.6.23
2015-08-15 20:44:40.761 +0900 [INFO] (transaction): Loaded plugin embulk-input-slack-history (0.1.1)
2015-08-15 20:44:40.822 +0900 [INFO] (transaction): Loaded plugin embulk-output-bigquery (0.1.7)
Slack history input started.
2015-08-15 20:44:42.243 +0900 [INFO] (transaction): {done:  0 / 1, running: 0}
2015-08-15 20:44:42.450 +0900 [INFO] (task-0000): Writing file [/tmp.000.00.csv.gz]
2015-08-15 20:45:08.857 +0900 [INFO] (task-0000): Job preparing... project:mysamplebqproject dataset:slack table:slack_forme
2015-08-15 20:45:08.863 +0900 [INFO] (task-0000): table:[slack_forme] will be create if not exists
2015-08-15 20:45:08.871 +0900 [INFO] (task-0000): Upload start [/tmp.000.00.csv.gz]
2015-08-15 20:45:14.914 +0900 [INFO] (task-0000): Upload completed [/tmp.000.00.csv.gz]
2015-08-15 20:45:14.951 +0900 [INFO] (task-0000): Job executed. job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] file:[/tmp.000.00.csv.gz]
2015-08-15 20:45:15.221 +0900 [INFO] (task-0000): Checking job status... job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] elapsed_time:269ms status:[PENDING]
2015-08-15 20:45:25.475 +0900 [INFO] (task-0000): Checking job status... job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] elapsed_time:10523ms status:[PENDING]
2015-08-15 20:45:35.689 +0900 [INFO] (task-0000): Checking job status... job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] elapsed_time:20737ms status:[PENDING]
2015-08-15 20:45:45.928 +0900 [ERROR] (task-0000): Job failed. job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] reason:[invalid][Too few columns: expected 7 column(s) but got 1 column(s). For additional help: http://goo.gl/RWuPQ] status:[FAILED]
2015-08-15 20:45:45.932 +0900 [INFO] (transaction): {done:  1 / 1, running: 0}
2015-08-15 20:45:45.937 +0900 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable.
org.embulk.exec.PartialExecutionException: org.embulk.output.BigqueryWriter$JobFailedException: Job failed. job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] reason:[invalid][Too few columns: expected 7 column(s) but got 1 column(s). For additional help: http://goo.gl/RWuPQ] status:[FAILED]
    at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:331)
    at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:529)
    at org.embulk.exec.BulkLoader.access$100(org/embulk/exec/BulkLoader.java:36)
    at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:342)
    at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:338)
    at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25)
    at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:338)
    at org.embulk.command.Runner.run(org/embulk/command/Runner.java:166)
    at org.embulk.command.Runner.main(org/embulk/command/Runner.java:117)
    at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:497)
    at RUBY.run(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/command/embulk_run.rb:370)
    at usr.local.rvm.gems.ruby_minus_2_dot_2_dot_2.gems.embulk_minus_0_dot_6_dot_23.lib.embulk.command.embulk.(root)(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/command/embulk.rb:47)
    at usr.local.rvm.gems.ruby_minus_2_dot_2_dot_2.gems.embulk_minus_0_dot_6_dot_23.lib.embulk.command.embulk.(root)(usr/local/rvm/gems/ruby_minus_2_dot_2_dot_2/gems/embulk_minus_0_dot_6_dot_23/lib/embulk/command//usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/command/embulk.rb:47)
Caused by: org.embulk.output.BigqueryWriter$JobFailedException: Job failed. job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] reason:[invalid][Too few columns: expected 7 column(s) but got 1 column(s). For additional help: http://goo.gl/RWuPQ] status:[FAILED]
    at org.embulk.output.BigqueryWriter.getJobStatus(org/embulk/output/org/embulk/output/BigqueryWriter.java:99)
    at org.embulk.output.BigqueryWriter.getJobStatusUntilDone(org/embulk/output/org/embulk/output/BigqueryWriter.java:128)
    at org.embulk.output.BigqueryWriter.executeLoad(org/embulk/output/org/embulk/output/BigqueryWriter.java:209)
    at org.embulk.output.BigqueryOutputPlugin$1.finish(org/embulk/output/org/embulk/output/BigqueryOutputPlugin.java:246)
    at org.embulk.standards.GzipFileEncoderPlugin$1.finish(org/embulk/standards/org/embulk/standards/GzipFileEncoderPlugin.java:62)
    at org.embulk.spi.util.OutputStreamFileOutput.finish(org/embulk/spi/util/org/embulk/spi/util/OutputStreamFileOutput.java:58)
    at org.embulk.spi.util.FileOutputOutputStream.close(org/embulk/spi/util/org/embulk/spi/util/FileOutputOutputStream.java:104)
    at sun.nio.cs.StreamEncoder.implClose(sun/nio/cs/sun/nio/cs/StreamEncoder.java:320)
    at sun.nio.cs.StreamEncoder.close(sun/nio/cs/sun/nio/cs/StreamEncoder.java:149)
    at java.io.OutputStreamWriter.close(java/io/java/io/OutputStreamWriter.java:233)
    at java.io.BufferedWriter.close(java/io/java/io/BufferedWriter.java:266)
    at org.embulk.spi.util.LineEncoder.finish(org/embulk/spi/util/org/embulk/spi/util/LineEncoder.java:100)
    at org.embulk.standards.CsvFormatterPlugin$1.finish(org/embulk/standards/org/embulk/standards/CsvFormatterPlugin.java:210)
    at org.embulk.spi.FileOutputRunner$DelegateTransactionalPageOutput.finish(org/embulk/spi/org/embulk/spi/FileOutputRunner.java:178)
    at org.embulk.spi.PageBuilder.finish(org/embulk/spi/org/embulk/spi/PageBuilder.java:223)
    at org.embulk.spi.util.DynamicPageBuilder.finish(org/embulk/spi/util/org/embulk/spi/util/DynamicPageBuilder.java:153)
    at java.lang.reflect.Method.invoke(java/lang/reflect/java/lang/reflect/Method.java:497)
    at RUBY.finish(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/page_builder.rb:68)
    at RUBY.finish(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/page_builder.rb:68)
    at RUBY.run(/root/.embulk/jruby/1.9/gems/embulk-input-slack-history-0.1.1/lib/embulk/input/slack_history.rb:235)
    at RUBY.run(/root/.embulk/jruby/1.9/gems/embulk-input-slack-history-0.1.1/lib/embulk/input/slack_history.rb:235)
    at RUBY.run(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/input_plugin.rb:101)
    at RUBY.run(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/input_plugin.rb:101)
    at Embulk$$InputPlugin$$JavaAdapter_1388218274.run(Embulk$$InputPlugin$$JavaAdapter_1388218274.gen:13)
    at Embulk$$InputPlugin$$JavaAdapter_1388218274.run(Embulk$$InputPlugin$$JavaAdapter_1388218274.gen:13)
    at Embulk$$InputPlugin$$JavaAdapter_1388218274.run(Embulk$$InputPlugin$$JavaAdapter_1388218274.gen:13)
    at Embulk$$InputPlugin$$JavaAdapter_1388218274.run(Embulk$$InputPlugin$$JavaAdapter_1388218274.gen:13)
    at org.embulk.spi.util.Executors.process(org/embulk/spi/util/org/embulk/spi/util/Executors.java:59)
    at org.embulk.spi.util.Executors.process(org/embulk/spi/util/org/embulk/spi/util/Executors.java:39)
    at org.embulk.exec.LocalExecutorPlugin$2.call(org/embulk/exec/org/embulk/exec/LocalExecutorPlugin.java:105)
    at org.embulk.exec.LocalExecutorPlugin$2.call(org/embulk/exec/org/embulk/exec/LocalExecutorPlugin.java:101)
    at java.util.concurrent.FutureTask.run(java/util/concurrent/java/util/concurrent/FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java/util/concurrent/java/util/concurrent/ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java/util/concurrent/java/util/concurrent/ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(java/lang/java/lang/Thread.java:745)

Error: org.embulk.output.BigqueryWriter$JobFailedException: Job failed. job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] reason:[invalid][Too few columns: expected 7 column(s) but got 1 column(s). For additional help: http://goo.gl/RWuPQ] status:[FAILED]

フェー

6. 所感

結局うまくは行ってないのですが・・・w
こんなエラーがもし自分で組んだプログラムで出続けると悲しみを負いそうです。
Embulkはinputのpluginの責務でデータをembulkが扱えるように変換し、
outputのpluginの責務でembulkが扱えるデータをoutput先が扱えるデータに変換して渡す
ということをしてくれていると思っていて、最近ストレージなどが増えてきていると思うので、pluginが再利用性高くできていることを考えると、こういったミドルウェアはいいなぁと思いました。
これがアダプターパターン・・・?なのかな。