Skip to content

Instantly share code, notes, and snippets.

@theJohnnyBrown
Last active August 29, 2015 14:13
Show Gist options
  • Select an option

  • Save theJohnnyBrown/a5a86eaf95302b011f43 to your computer and use it in GitHub Desktop.

Select an option

Save theJohnnyBrown/a5a86eaf95302b011f43 to your computer and use it in GitHub Desktop.
PigPen avro loader
(ns load-avro.core
(:require [clojure.string :as str]
[pigpen.raw :as raw]
[pigpen.core :as pig]
[pigpen.extensions.test :refer [pigsym-zero pigsym-inc]]
[clojure.data.json :refer [read-str]]
[clojure.repl])
(:import [org.apache.avro
Schema
Schema$Type
Schema$Parser
SchemaNormalization]
[org.apache.avro.file DataFileWriter DataFileReader]
[org.apache.avro.specific SpecificDatumReader]
[pigpen.local PigPenLocalLoader]))
(declare field-names)

(defn prefixed-names
  "Returns record-schema's (flattened) field names, each prefixed with
  prefix and a dot, e.g. (prefixed-names address-schema \"address\")
  => (\"address.street\" \"address.city\" ...)."
  [record-schema prefix]
  (map (fn [subfield] (str prefix "." subfield))
       (field-names record-schema)))
(defn field-names
  "Returns a flat vector of field-name strings for an Avro record schema.
  Nested RECORD fields expand to dotted paths via prefixed-names; UNION
  fields contribute their own name plus dotted paths for any RECORD
  branches; all other fields contribute just their name."
  [parsed-schema]
  (letfn [(expand [field]
            (let [schema (.schema field)
                  fname  (.name field)
                  ftype  (.getType schema)]
              (cond
                (= ftype Schema$Type/RECORD)
                (prefixed-names schema fname)

                ;; pig only supports union of null, <type>
                (= ftype Schema$Type/UNION)
                (cons fname
                      (->> (.getTypes schema)
                           (filter (fn [t] (= (.getType t) Schema$Type/RECORD)))
                           (map (fn [rec] (prefixed-names rec fname)))))

                :else fname)))]
    (vec (flatten (map expand (.getFields parsed-schema))))))
(defn parse-schema
  "Parses the Avro schema JSON string s into an org.apache.avro.Schema."
  [s]
  (-> (Schema$Parser.) (.parse s)))
(defn load-avro
  "Builds a PigPen load command that reads Avro data with piggybank's
  AvroStorage.

  location - path of the Avro data (string)
  opts     - options map; :schema must hold the Avro schema as a JSON string

  The schema is flattened to field names via field-names, and the loaded
  fields are re-bound into pigpen's single serialized value."
  ([location] (load-avro location {}))
  ([location opts] ;; you can add any other params here, like the args or field list
   (let [parsed-schema (parse-schema (:schema opts))
         ;; a new storage definition for the generated Pig script; these jars
         ;; must be on $PIG_CLASSPATH (most likely /home/hadoop/pig/lib)
         storage (raw/storage$
                  ["json-simple-1.1.1.jar"
                   "piggybankload.jar"
                   "avro-1.7.4.jar"
                   "snappy-java-1.0.4.1.jar"
                   ]
                  ;; the Pig loader class
                  "org.apache.pig.piggybank.storage.avro.AvroStorage"
                  ;; AvroStorage receives the schema in parsing canonical form
                  ["schema" (SchemaNormalization/toParsingForm parsed-schema)])
         field-symbols (map symbol (field-names parsed-schema))]
     (->
      (raw/load$ ;; this is a raw pig load command
       location ;; the location of the data - this should be a string
       field-symbols
       storage ;; this is the storage we created earlier
       opts) ;; just pass the opts through
      (raw/bind$ ;; all of the pigpen commands work with a single binary field
       ;; called 'value. this changes your fields into a single
       ;; serialized value that works with the rest of pigpen
       '(pigpen.pig/map->bind vector) ;; vector is the function we use to combine
       ;; the fields - you can use anything here.
       ;; map->bind wraps the map-like operation as a mapcat-like operation
       ;; see the Serialization section here for why we do this:
       ;; http://techblog.netflix.com/2014/01/introducing-pigpen-map-reduce-for.html
       {:args field-symbols ;; tell it what input fields to use
        :field-type-in :native ;; tell it that the input isn't serialized
        :field-type-out :native})))))
(defn attrs-lookup
  "Follows attr-names (a sequence of attribute-name strings) through nested
  records by calling .get at each step. Returns the value reached, the
  record itself when attr-names is empty, or nil as soon as a lookup
  dead-ends."
  [record attr-names]
  (loop [current record
         names   (seq attr-names)]
    (if (and (some? current) names)
      (recur (.get current (first names)) (next names))
      current)))
;; Teach pigpen's local mode how to read AvroStorage data, so scripts built
;; with load-avro can run in-process (e.g. from the REPL) without a cluster.
(defmethod pigpen.local/load "org.apache.pig.piggybank.storage.avro.AvroStorage"
  [{:keys [location fields] :as x}]
  (reify PigPenLocalLoader
    ;; expand the load location into the concrete file list
    (locations [_] (pigpen.local/load-list location))
    ;; open an Avro container file for reading
    (init-reader [_ filename]
      (-> filename (java.io.File.) (DataFileReader. (SpecificDatumReader.))))
    ;; one map per datum, keyed by field symbol; dotted field names (from
    ;; nested records) are resolved attribute-by-attribute via attrs-lookup
    (read [_ reader]
      (for [datum reader]
        (zipmap fields
                (map #(attrs-lookup datum (str/split (name %) #"\."))
                     fields))))
    (close-reader [_ reader] (.close reader))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment