DevDevデブ!!

プログラミングのこととか書きます。多分。。。

aws GlueのETL jobでLTSVをパースするの巻

こんにちは、私です。

最近近所のドラッグストア、ファミマ、西友ICE BOXが品切れ起こしてて困ってます。

ICE BOX難民です。

みんなサクレとかパルム食べててくださいよ。

今回の内容は、まあ表題の通りです。

athenaがLTSVに対応してません

nginxのログをLTSVで出力してて、日次バッチでs3にバックアップしてるんですけど、調査時に過去ログの参照が必要になったときに、いちいちダウンロードしてくるのめんどくさいですよね??

なので、aws athenaでいい感じにログを検索できるようにしたいと思ったわけなんですが、athenaはLTSVに対応してないんですよね。

(LTSVってもしかして日本でしか流行ってないフォーマットなの。。。?)

正確には、lazy.Serdeで正規表現によるパースが可能なんだけど、それでやっちゃうとスキーマが単一のmapになるので、避けたかった

GlueのETL Job(spark)でparquetに変換してしまおう

Embulkでやったほうが多分簡単なんだけど、サーバレスでやりたかったので、GlueのETL Jobでやりました。

大体、以下のような感じ

val ss = glueContext.getSparkSession
val src = sc.textFile("s3://hogehoge/fugafuga/*.gz")
val jsonTextRDD = src.map { (row) =>
    val tupleList = row.split("\t").map { (element) => 
        val splitted = element.split(":")
        val label = splitted(0)
        val value = splitted.tail.mkString(":")
        (label, value)
    }
    JSONObject(tupleList.toMap).toString()
}
val jsonDS = ss.read.json(jsonTextRDD.toDS)
jsonDS.write.parquet("s3://hogehoge/parquet")

JSONObrjectはscala.util.parsing.jsonのやつ scala、sparkともに初心者なので、もっとベターな方法があるのかもしらんが、とりあえず上記のような感じでいけました。

ポイントはいったんjson文字列のRDDを作ってDataSetに変換してから、SparkSessionに読み込ませてるとこですね。

RDD[(String, String)]的なやつからいきなりparquetに出力できるかと思ったけど、できなかったので、いったんjsonに変換してる

GlueContextを使ってないのは、Glue meta catalogに登録しないとデータ参照できないのがめんどくさかったためです。

感想

athenaのLTSV対応、お待ちしています!!!!