Hadoop Streaming での外部ファイルの扱いもだいぶ固まってきました。発表資料のスライドではこの辺の話を書いたことがあるんですが、ブログには書いてなかったので一度きっちりまとめておこうかなーっと。というわけで今回は Hadoop Streaming での外部ファイルの読み込みについてまとめますよ!(*゚Д゚)=3 ムハー

そもそも外部ファイルの読み込みと言っても、この二つのパターンがあります。

1) 外部ファイルが master 上にある(つまりローカルディスクにある)場合
2) 外部ファイルが別ファイルシステム(S3 だったり HDFS だったり)にある場合

外部ファイルが master 上にある場合


まず、この場合はとても簡単です。例えば hoge.txt という外部ファイルを mapper もしくは reducer から扱いたい場合、Hadoop 実行時のコマンドラインの引数に -file hoge.txt と指定してあげれば OK です。こんな感じ。

# コマンドラインにて
hadoop jar xxx.jar \
  -mapper xxx.rb -reducer xxx.rb \
  -file hoge.txt

-file オプションを指定することで、この hoge.txt が master から各 slave へ実行時にコピーされるので、あとは mapper なり reducer なりで普通に File.open してあげるだけ。以前は勉強不足で Hadoop からローカルファイルは扱えないと勘違いしていたんですが、全くそんなことは無いようです。。お恥ずかしい ///

# mapper もしくは reducer にて
out_data = ''
File.open("hoge.txt") {|f|
  out_data = f.read
}

ARGF.each do |line| # ログデータ等が1行ずつ渡ってくる
  line.chomp!

  # out_data を使ったりとかごにょごにょ

end

外部ファイルが別ファイルシステムにある場合


次にこの場合ですが、この場合はちょっと複雑です。このように指定します。先ほどは -file オプションでしたが、この場合は -cacheFile オプションなので注意してください。

# コマンドラインにて
hadoop jar xxx.jar \
  -mapper xxx.rb -reducer xxx.rb \
  -cacheFile s3n://path/to/hoge.txt#hogehoge

# mapper もしくは reducer にて
out_data = ''
File.open("hogehoge") {|f| # hoge.txt じゃないよ!!
  out_data = f.read
}
 
ARGF.each do |line| # ログデータ等が1行ずつ渡ってくる
  line.chomp!
  
  # out_data を使ったりとかごにょごにょ
   
end

ここで注意しないといけないところがあります! -cacheFile オプションで外部ファイルの URI を指定するのですが、その後ろに #hogehoge という謎の文字がくっついています。これは一体何でしょうか?しかもその後 mapper もしくは reducer ではその hogehoge の部分を使っています( # の後ろの部分)。

実はこの -cacheFile オプションを使うと、実行時にまず別ファイルシステムにあるファイルをローカルに保存します。その後はローカルにファイルがある状態なので、外部ファイルが master 上にある場合と同じように File.open するだけでアクセスすることが出来ます。なお、ローカルに持ってくるときにそのファイルはあるパスに置かれるんですが、そのファイルに対するシンボリックリンクが # 以下の名前で 各 mapper, reducer タスクのカレントディレクトリに作成されます。なのでそのシンボリックリンク名でアクセス出来るわけです。

もうちょっと突っ込んだ話


もうちょっと細かく説明すると、-file オプションで指定されたファイルや -cacheFile オプションでコピーされたファイルは slave の以下のパスに置かれます。

# -file でコピーされたファイル
${mapred.local.dir}/taskTracker/jobcache/$jobid/jar 以下

# -cacheFile でコピーされたファイル
${mapred.local.dir}/taskTracker/archive 以下

# 各タスクのカレントディレクトリ
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work

カレントディレクトリの中には mapper とか reducer とか -file オプションでコピーされたファイルとか -cacheFile オプションでコピーされたファイルとかのシンボリックリンクがあって、それぞれ実体ファイルを参照しています。今回の場合だと hogehoge というファイルがカレントディレクトリに置かれているわけです。hoge.txt では無くて。で、hogehoge が実体ファイルである hoge.txt に対してシンボリックリンクを張っています。カレントディレクトリ (work) はタスク開始に伴い作成され、タスクが終了すると削除されます。

決して mapper や reducer のファイルと同じディレクトリに hogehoge があるわけではありません(最初はそう思ってました)。

別ファイルシステムにあるファイルを参照するもう一つの方法


以前は別ファイルシステムにあるデータを mapper, reducer から参照する場合、このように参照していました。

out_data = `hadoop dfs -cat s3://path/to/hoge.txt`

これでももちろん参照することは可能なんですが、毎回別ファイルシステム(別サーバ)との通信が発生してしまうため、slave 毎に最初の一度しか通信の発生しない -cacheFile オプションを使う方が良いでしょう。

というわけで、これだけわかってれば Hadoop Streaming での開発も怖いもんなしですよ!!
このエントリーをはてなブックマークに追加