Embulk触ってみたいと思いつつ時が流れていたので、Embulkを使って個人で利用しているSlackの過去のチャットをBigQueryにでも流し込んでみようかなとふと思ったのでやってみようとしました。
やってみようと・・・したんです・・・w
※最終的に書き込みは成功してないですw
最終的なRuntimeで一部がinput(Slack)から読み込んだ後、
カラム定義が合わず書き込めませんでした。
Config自体は問題なさそうです。
概要
- about Embulk
- この後やること概要
- BigQuery, SlackHistoryプラグインのインストール
- Embulk Configの記述
- 実行
- 所感
以降は下記のDockerを利用してます。
https://hub.docker.com/r/tarata/centos-with-embulk/
1. about Embulk
そもそもEmbulkとは??
Embulkはデータのバルクローダーです。
たくさんのデータを、例えばどこかのストレージに書き込んで(ロードして)くれるミドルウェアです。
pluginさえあれば、YAML形式のconfigにデータのinput元とoutput先を書けばロードしてくれるという感じです。
とりあえずやってみないとよくわからないので、githubレポジトリにあるRunning exampleをやってみました。
実行サンプルがあるの、わかりやすくていいですね。
# まずはembulk exampleというコマンドを利用してexampleを用意します。 $embulk example try1 2015-08-15 10:31:42.611 +0000: Embulk v0.6.23 Creating try1 directory... Creating try1/ Creating try1/csv/ Creating try1/csv/sample_01.csv.gz Creating try1/example.yml Run following subcommands to try embulk: 1. embulk guess try1/example.yml -o config.yml 2. embulk preview config.yml 3. embulk run config.yml $tree . └── try1 ├── csv │ └── sample_01.csv.gz └── example.yml 2 directories, 2 files # guessはpluginに実装されている場合、現在書かれているConfigからConfigを作ってくれるみたいですね。 $embulk guess ./try1/example.yml -o config.yml 2015-08-15 10:34:42.972 +0000: Embulk v0.6.23 2015-08-15 10:34:44.243 +0000 [INFO] (guess): Listing local files at directory '/root/workspace/try1/csv' filtering filename by prefix 'sample_' 2015-08-15 10:34:44.251 +0000 [INFO] (guess): Loading files [/root/workspace/try1/csv/sample_01.csv.gz] 2015-08-15 10:34:44.318 +0000 [INFO] (guess): Loaded plugin embulk/guess/gzip from a load path 2015-08-15 10:34:44.325 +0000 [INFO] (guess): Loaded plugin embulk/guess/csv from a load path in: type: file path_prefix: /root/workspace/try1/csv/sample_ decoders: - {type: gzip} parser: charset: UTF-8 newline: CRLF type: csv delimiter: ',' quote: '"' trim_if_not_quoted: false skip_header_lines: 1 allow_extra_columns: false allow_optional_columns: false columns: - {name: id, type: long} - {name: account, type: long} - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: purchase, type: timestamp, format: '%Y%m%d'} - {name: comment, type: string} out: {type: stdout} Created 'config.yml' file. # ちなみに、実装されていない場合、下記のようなエラーが出ました。 # NotImplementedError: Embulk::Plugin::InputSlackHistory.guess(config) is not implemented. This input plugin does not support guess. # 2015/08/16現在 InputSlackHistory pluginにはguess機能はないみたいです。 # CSVの場合、カラムの定義などをしてあげる必要ありそうだけど、Slackの場合はSlack側の仕様にデータフォーマットなどは書いてあるので別に必要なさそうな気もしますね。 # previewがあるようです。 # output先に依存せずどういうデータが入るかがわかるの楽でいいですね。 # 毎度stdoutで出さなくて済みます。 $embulk preview config.yml 2015-08-15 10:38:46.926 +0000: Embulk v0.6.23 2015-08-15 10:38:48.236 +0000 [INFO] (preview): Listing local files at directory '/root/workspace/try1/csv' filtering filename by prefix 'sample_' 2015-08-15 10:38:48.243 +0000 [INFO] (preview): Loading files [/root/workspace/try1/csv/sample_01.csv.gz] +---------+--------------+-------------------------+-------------------------+----------------------------+ | id:long | account:long | time:timestamp | purchase:timestamp | comment:string | +---------+--------------+-------------------------+-------------------------+----------------------------+ | 1 | 32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk | | 2 | 14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby | | 3 | 27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin | | 4 | 11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | NULL | +---------+--------------+-------------------------+-------------------------+----------------------------+ # 最後に実行です。 $embulk run config.yml 2015-08-15 10:39:14.175 +0000: Embulk v0.6.23 2015-08-15 10:39:16.195 +0000 [INFO] (transaction): Listing local files at directory '/root/workspace/try1/csv' filtering filename by prefix 'sample_' 2015-08-15 10:39:16.204 +0000 [INFO] (transaction): Loading files [/root/workspace/try1/csv/sample_01.csv.gz] 2015-08-15 10:39:16.259 +0000 [INFO] (transaction): {done: 0 / 1, running: 0} 1,32864,2015-01-27 19:23:49,20150127,embulk 2,14824,2015-01-27 19:01:23,20150127,embulk jruby 3,27559,2015-01-28 02:20:02,20150128,Embulk "csv" parser plugin 4,11270,2015-01-29 11:54:36,20150129,NULL 2015-08-15 10:39:16.458 +0000 [INFO] (transaction): {done: 1 / 1, running: 0} 2015-08-15 10:39:16.476 +0000 [INFO] (main): Committed. 2015-08-15 10:39:16.476 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"/root/workspace/try1/csv/sample_01.csv.gz"},"out":{}}
Exampleとして用意されたCSVを標準出力にバルクロードしたという例でした。
2. この後やること概要
なにかbigqueryに突っ込んでみたかったので、個人で利用しているslackのヒストリーをembulkで突っ込んでみました。
嘘です。実際に突っ込もうとしたところで失敗しました。
ちょっと力が尽きてしまったので、過程をつらつら書いています。
利用したプラグイン
3. BigQuery, SlackHistoryプラグインのインストール
インストール簡単
$embulk gem install embulk-input-slack-history ... #略 $embulk gem install embulk-output-bigquery ... #略
4. Embulk Configの記述
Config書いてはrunしてConfig書いてはrunしてたくさん怒られました。
embulkのConfigではなく、bigqueryの設定にハマりました。
最終的に書いたファイルはこんな感じです。
(slackのtoken, service account emailはごまかしてます。)
in: type: slack_history token: xxxxxxx out: type: bigquery service_account_email: xxxxx@developer.gserviceaccount.com p12_keyfile_path: ./key.p12 project: mysamplebqproject dataset: slack auto_create_table: true table: slack_forme schema_path: ./schema.json formatter: type: csv header_line: false path_prefix: /tmp encoders: - {type: gzip} file_ext: csv.gz
schema.json
[ { "name": "channelid","mode":"REQUIRED", "type":"STRING" }, { "name": "channelname","mode":"REQUIRED", "type":"STRING" }, { "name": "private","mode":"NULLABLE", "type":"STRING" }, { "name": "datetime","mode":"REQUIRED", "type":"STRING" }, { "name": "username","mode":"NULLABLE", "type":"STRING" }, { "name": "userid","mode":"NULLABLE", "type":"STRING" }, { "name": "message","mode":"NULLABLE", "type":"STRING" } ]
[tips]bigqueryでハマった所
「invalid_grant」と言われ続ける
service_account_emailがおかしかったみたいです。
下記のURLから対象のプロジェクトを選択したページのサイドメニューから「APIと認証」 -> 「認証情報」を選択し、そこの右側にある「OAuth」の「サービス アカウント」の「メールアドレス」を選ばないといけなかったです。
https://console.developers.google.com/project/
OAuthを作っていない場合は作らないといけないです。
また、p12_keyfile_pathには上記のOAuthの箇所で、「新しいP12キーを生成」を選択してダウンロードしたものを利用しないといけないです。
5. 実行
実行してみたら、Config自体は通ったぽいのですが途中でこんなエラーが(´・ω・`)w
カラム定義がおかしい・・・?slack historyの方がおかしい・・・?
この時の私はここで力尽きました。
もうちょい調査して、原因特定したいです。
$embulk run slack-to-bq.yml 2015-08-15 20:44:32.968 +0900: Embulk v0.6.23 2015-08-15 20:44:40.761 +0900 [INFO] (transaction): Loaded plugin embulk-input-slack-history (0.1.1) 2015-08-15 20:44:40.822 +0900 [INFO] (transaction): Loaded plugin embulk-output-bigquery (0.1.7) Slack history input started. 2015-08-15 20:44:42.243 +0900 [INFO] (transaction): {done: 0 / 1, running: 0} 2015-08-15 20:44:42.450 +0900 [INFO] (task-0000): Writing file [/tmp.000.00.csv.gz] 2015-08-15 20:45:08.857 +0900 [INFO] (task-0000): Job preparing... project:mysamplebqproject dataset:slack table:slack_forme 2015-08-15 20:45:08.863 +0900 [INFO] (task-0000): table:[slack_forme] will be create if not exists 2015-08-15 20:45:08.871 +0900 [INFO] (task-0000): Upload start [/tmp.000.00.csv.gz] 2015-08-15 20:45:14.914 +0900 [INFO] (task-0000): Upload completed [/tmp.000.00.csv.gz] 2015-08-15 20:45:14.951 +0900 [INFO] (task-0000): Job executed. job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] file:[/tmp.000.00.csv.gz] 2015-08-15 20:45:15.221 +0900 [INFO] (task-0000): Checking job status... job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] elapsed_time:269ms status:[PENDING] 2015-08-15 20:45:25.475 +0900 [INFO] (task-0000): Checking job status... job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] elapsed_time:10523ms status:[PENDING] 2015-08-15 20:45:35.689 +0900 [INFO] (task-0000): Checking job status... job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] elapsed_time:20737ms status:[PENDING] 2015-08-15 20:45:45.928 +0900 [ERROR] (task-0000): Job failed. job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] reason:[invalid][Too few columns: expected 7 column(s) but got 1 column(s). For additional help: http://goo.gl/RWuPQ] status:[FAILED] 2015-08-15 20:45:45.932 +0900 [INFO] (transaction): {done: 1 / 1, running: 0} 2015-08-15 20:45:45.937 +0900 [INFO] (main): Transaction partially failed. Cleaning up the intermediate data. Use -r option to make it resumable. org.embulk.exec.PartialExecutionException: org.embulk.output.BigqueryWriter$JobFailedException: Job failed. job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] reason:[invalid][Too few columns: expected 7 column(s) but got 1 column(s). For additional help: http://goo.gl/RWuPQ] status:[FAILED] at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(org/embulk/exec/BulkLoader.java:331) at org.embulk.exec.BulkLoader.doRun(org/embulk/exec/BulkLoader.java:529) at org.embulk.exec.BulkLoader.access$100(org/embulk/exec/BulkLoader.java:36) at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:342) at org.embulk.exec.BulkLoader$1.run(org/embulk/exec/BulkLoader.java:338) at org.embulk.spi.Exec.doWith(org/embulk/spi/Exec.java:25) at org.embulk.exec.BulkLoader.run(org/embulk/exec/BulkLoader.java:338) at org.embulk.command.Runner.run(org/embulk/command/Runner.java:166) at org.embulk.command.Runner.main(org/embulk/command/Runner.java:117) at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:497) at RUBY.run(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/command/embulk_run.rb:370) at usr.local.rvm.gems.ruby_minus_2_dot_2_dot_2.gems.embulk_minus_0_dot_6_dot_23.lib.embulk.command.embulk.(root)(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/command/embulk.rb:47) at usr.local.rvm.gems.ruby_minus_2_dot_2_dot_2.gems.embulk_minus_0_dot_6_dot_23.lib.embulk.command.embulk.(root)(usr/local/rvm/gems/ruby_minus_2_dot_2_dot_2/gems/embulk_minus_0_dot_6_dot_23/lib/embulk/command//usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/command/embulk.rb:47) Caused by: org.embulk.output.BigqueryWriter$JobFailedException: Job failed. job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] reason:[invalid][Too few columns: expected 7 column(s) but got 1 column(s). For additional help: http://goo.gl/RWuPQ] status:[FAILED] at org.embulk.output.BigqueryWriter.getJobStatus(org/embulk/output/org/embulk/output/BigqueryWriter.java:99) at org.embulk.output.BigqueryWriter.getJobStatusUntilDone(org/embulk/output/org/embulk/output/BigqueryWriter.java:128) at org.embulk.output.BigqueryWriter.executeLoad(org/embulk/output/org/embulk/output/BigqueryWriter.java:209) at org.embulk.output.BigqueryOutputPlugin$1.finish(org/embulk/output/org/embulk/output/BigqueryOutputPlugin.java:246) at org.embulk.standards.GzipFileEncoderPlugin$1.finish(org/embulk/standards/org/embulk/standards/GzipFileEncoderPlugin.java:62) at org.embulk.spi.util.OutputStreamFileOutput.finish(org/embulk/spi/util/org/embulk/spi/util/OutputStreamFileOutput.java:58) at org.embulk.spi.util.FileOutputOutputStream.close(org/embulk/spi/util/org/embulk/spi/util/FileOutputOutputStream.java:104) at sun.nio.cs.StreamEncoder.implClose(sun/nio/cs/sun/nio/cs/StreamEncoder.java:320) at sun.nio.cs.StreamEncoder.close(sun/nio/cs/sun/nio/cs/StreamEncoder.java:149) at java.io.OutputStreamWriter.close(java/io/java/io/OutputStreamWriter.java:233) at java.io.BufferedWriter.close(java/io/java/io/BufferedWriter.java:266) at org.embulk.spi.util.LineEncoder.finish(org/embulk/spi/util/org/embulk/spi/util/LineEncoder.java:100) at org.embulk.standards.CsvFormatterPlugin$1.finish(org/embulk/standards/org/embulk/standards/CsvFormatterPlugin.java:210) at org.embulk.spi.FileOutputRunner$DelegateTransactionalPageOutput.finish(org/embulk/spi/org/embulk/spi/FileOutputRunner.java:178) at org.embulk.spi.PageBuilder.finish(org/embulk/spi/org/embulk/spi/PageBuilder.java:223) at org.embulk.spi.util.DynamicPageBuilder.finish(org/embulk/spi/util/org/embulk/spi/util/DynamicPageBuilder.java:153) at java.lang.reflect.Method.invoke(java/lang/reflect/java/lang/reflect/Method.java:497) at RUBY.finish(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/page_builder.rb:68) at RUBY.finish(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/page_builder.rb:68) at RUBY.run(/root/.embulk/jruby/1.9/gems/embulk-input-slack-history-0.1.1/lib/embulk/input/slack_history.rb:235) at RUBY.run(/root/.embulk/jruby/1.9/gems/embulk-input-slack-history-0.1.1/lib/embulk/input/slack_history.rb:235) at RUBY.run(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/input_plugin.rb:101) at RUBY.run(/usr/local/rvm/gems/ruby-2.2.2/gems/embulk-0.6.23/lib/embulk/input_plugin.rb:101) at Embulk$$InputPlugin$$JavaAdapter_1388218274.run(Embulk$$InputPlugin$$JavaAdapter_1388218274.gen:13) at Embulk$$InputPlugin$$JavaAdapter_1388218274.run(Embulk$$InputPlugin$$JavaAdapter_1388218274.gen:13) at Embulk$$InputPlugin$$JavaAdapter_1388218274.run(Embulk$$InputPlugin$$JavaAdapter_1388218274.gen:13) at Embulk$$InputPlugin$$JavaAdapter_1388218274.run(Embulk$$InputPlugin$$JavaAdapter_1388218274.gen:13) at org.embulk.spi.util.Executors.process(org/embulk/spi/util/org/embulk/spi/util/Executors.java:59) at org.embulk.spi.util.Executors.process(org/embulk/spi/util/org/embulk/spi/util/Executors.java:39) at org.embulk.exec.LocalExecutorPlugin$2.call(org/embulk/exec/org/embulk/exec/LocalExecutorPlugin.java:105) at org.embulk.exec.LocalExecutorPlugin$2.call(org/embulk/exec/org/embulk/exec/LocalExecutorPlugin.java:101) at java.util.concurrent.FutureTask.run(java/util/concurrent/java/util/concurrent/FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(java/util/concurrent/java/util/concurrent/ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java/util/concurrent/java/util/concurrent/ThreadPoolExecutor.java:617) at java.lang.Thread.run(java/lang/java/lang/Thread.java:745) Error: org.embulk.output.BigqueryWriter$JobFailedException: Job failed. job id:[job_Ym0YzqoqKq5YzN7M3d_95vRTeqI] reason:[invalid][Too few columns: expected 7 column(s) but got 1 column(s). For additional help: http://goo.gl/RWuPQ] status:[FAILED]
フェー
6. 所感
結局うまくは行ってないのですが・・・w
こんなエラーがもし自分で組んだプログラムで出続けると悲しみを負いそうです。
Embulkはinputのpluginの責務でデータをembulkが扱えるように変換し、
outputのpluginの責務でembulkが扱えるデータをoutput先が扱えるデータに変換して渡す
ということをしてくれていると思っていて、最近ストレージなどが増えてきていると思うので、pluginが再利用性高くできていることを考えると、こういったミドルウェアはいいなぁと思いました。
これがアダプターパターン・・・?なのかな。