Last active
August 29, 2015 14:13
-
-
Save theJohnnyBrown/a5a86eaf95302b011f43 to your computer and use it in GitHub Desktop.
PigPen avro loader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| (ns load-avro.core | |
| (:require [clojure.string :as str] | |
| [pigpen.raw :as raw] | |
| [pigpen.core :as pig] | |
| [pigpen.extensions.test :refer [pigsym-zero pigsym-inc]] | |
| [clojure.data.json :refer [read-str]] | |
| [clojure.repl]) | |
| (:import [org.apache.avro | |
| Schema | |
| Schema$Type | |
| Schema$Parser | |
| SchemaNormalization] | |
| [org.apache.avro.file DataFileWriter DataFileReader] | |
| [org.apache.avro.specific SpecificDatumReader] | |
| [pigpen.local PigPenLocalLoader])) | |
;; field-names and prefixed-names are mutually recursive; declare the one
;; defined later so prefixed-names can reference it.
(declare field-names)

(defn prefixed-names
  "Returns the names of record-schema's (possibly nested) fields, each
  prefixed with `prefix` and a dot, e.g. \"address.zip\"."
  [record-schema prefix]
  (map #(str prefix "." %) (field-names record-schema)))
(defn field-names
  "Returns a flat vector of field-name strings for `parsed-schema` (an
  org.apache.avro.Schema of a record).

  - RECORD-typed fields expand into dotted sub-field names via
    `prefixed-names` (e.g. \"outer.inner\").
  - UNION-typed fields contribute the field's own name plus dotted names
    for any RECORD members of the union.
  - All other fields contribute just their name."
  [parsed-schema]
  (->> (.getFields parsed-schema)
       (map (fn [field]
              (let [type (.getType (.schema field))]
                (cond
                  ;; Nested record: recurse, prefixing sub-fields with this field's name.
                  (= type Schema$Type/RECORD) (prefixed-names (.schema field) (.name field))
                  ;; pig only supports union of null, <type>
                  ;; Keep this field's name and add dotted names for any record
                  ;; branches of the union.
                  (= type Schema$Type/UNION) (conj (->> (.getTypes (.schema field))
                                                        (filter #(= (.getType %) Schema$Type/RECORD))
                                                        (map (fn [rec] (prefixed-names rec (.name field)))))
                                                   (.name field))
                  ;; Primitive or other simple type: just the field name.
                  :else (.name field)))))
       flatten ;; record/union branches yield nested seqs; collapse to one level
       vec))
(defn parse-schema
  "Parses the Avro schema string `s` into an org.apache.avro.Schema
  using a fresh Schema$Parser."
  [s]
  (-> (Schema$Parser.)
      (.parse s)))
(defn load-avro
  "Builds a pigpen load command that reads Avro data from `location` with
  Pig's piggybank AvroStorage.

  opts must contain :schema — the Avro schema as a JSON string; it is parsed
  to derive the Pig field list and normalized (SchemaNormalization) for the
  AvroStorage constructor argument. Remaining opts are passed through to
  raw/load$. Returns the bound pigpen command."
  ([location] (load-avro location {}))
  ([location opts] ;; you can add any other params here, like the args or field list
   (let [parsed-schema (parse-schema (:schema opts))
         storage (raw/storage$ ;; this creates a new storage definition
                  ;; add these jars to $PIG_CLASSPATH (most likely /home/hadoop/pig/lib)
                  ["json-simple-1.1.1.jar"
                   "piggybankload.jar"
                   "avro-1.7.4.jar"
                   "snappy-java-1.0.4.1.jar"
                   ]
                  "org.apache.pig.piggybank.storage.avro.AvroStorage" ;; your loader class
                  ["schema" (SchemaNormalization/toParsingForm parsed-schema)])
         ;; Symbols for every (flattened, dotted) field in the schema.
         field-symbols (map symbol (field-names parsed-schema))]
     (->
      (raw/load$ ;; this is a raw pig load command
       location ;; the location of the data - this should be a string
       field-symbols
       storage ;; this is the storage we created earlier
       opts) ;; just pass the opts through
      (raw/bind$ ;; all of the pigpen commands work with a single binary field
       ;; called 'value. this changes your fields into a single
       ;; serialized value that works with the rest of pigpen
       '(pigpen.pig/map->bind vector) ;; vector is the function we use to combine
       ;; the fields - you can use anything here.
       ;; map->bind wraps the map-like operation as a mapcat-like operation
       ;; see the Serialization section here for why we do this:
       ;; http://techblog.netflix.com/2014/01/introducing-pigpen-map-reduce-for.html
       {:args field-symbols ;; tell it what input fields to use
        :field-type-in :native
        :field-type-out :native}))))) ;; tell it that the input isn't serialized
(defn attrs-lookup
  "Walks `record` through the chain of attribute names in `attr-names` by
  calling `.get` at each step (works on Avro generic records and anything
  implementing java.util.Map).

  Returns nil as soon as a lookup misses (record becomes nil), or the value
  reached once `attr-names` is exhausted."
  [record attr-names]
  (if (or (nil? record) (empty? attr-names))
    record ;; nil when we fell off the chain, otherwise the resolved value
    ;; recur instead of a named self-call: Clojure does not optimize tail
    ;; calls, so recur keeps the stack flat for long attribute chains.
    (recur (.get record (first attr-names)) (rest attr-names))))
;; Local-mode loader for AvroStorage, so pipelines built with load-avro can
;; also run under pigpen's local executor by reading Avro files directly.
(defmethod pigpen.local/load "org.apache.pig.piggybank.storage.avro.AvroStorage" [{:keys [location fields] :as x}]
  (reify PigPenLocalLoader
    ;; Expand the location into the list of concrete files to read.
    (locations [_] (pigpen.local/load-list location))
    ;; One Avro DataFileReader per file; SpecificDatumReader deserializes records.
    (init-reader [_ filename]
      (-> filename (java.io.File.) (DataFileReader. (SpecificDatumReader.))))
    ;; For each datum, build a map from each (possibly dotted) field name to the
    ;; value reached by attrs-lookup on the dot-split path.
    ;; NOTE(review): `for` is lazy — this presumably relies on the caller
    ;; realizing the seq before close-reader runs; verify against pigpen.local.
    (read [_ reader] (for [datum reader] (zipmap fields (map #(attrs-lookup datum (str/split (name %) #"\.")) fields))))
    (close-reader [_ reader] (.close reader))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment