Skip to content

Instantly share code, notes, and snippets.

@polikutinevgeny
Last active September 13, 2019 02:59
Show Gist options
  • Select an option

  • Save polikutinevgeny/2542eaa86a7164cc430be62ede974025 to your computer and use it in GitHub Desktop.

Select an option

Save polikutinevgeny/2542eaa86a7164cc430be62ede974025 to your computer and use it in GitHub Desktop.
Apache Beam NPE
Exception in thread "main" java.lang.NullPointerException
at main.FieldValueGetter$SchemaCodeGen$EDIMbOOQ.get(Unknown Source)
at main.FieldValueGetter$SchemaCodeGen$EDIMbOOQ.get(Unknown Source)
at org.apache.beam.sdk.values.RowWithGetters.getValue(RowWithGetters.java:63)
at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.scanNullFields(RowCoderGenerator.java:245)
at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:230)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$U2iAPosY.encode(Unknown Source)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$U2iAPosY.encode(Unknown Source)
at org.apache.beam.sdk.coders.RowCoder.encode(RowCoder.java:145)
at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:81)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
at org.apache.beam.sdk.transforms.Create$Values$CreateSource.fromIterable(Create.java:413)
at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:370)
at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:275)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:169)
at main.MainKt.main(main.kt:35)
package main
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.DefaultCoder
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.schemas.JavaBeanSchema
import org.apache.beam.sdk.schemas.SchemaCoder
import org.apache.beam.sdk.schemas.annotations.DefaultSchema
import org.apache.beam.sdk.schemas.annotations.SchemaCreate
import org.apache.beam.sdk.transforms.Create
import org.apache.beam.sdk.transforms.Distinct
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.ParDo
import org.joda.time.DateTime
import javax.annotation.Nullable
/**
 * Element type holding a single optional Joda-Time timestamp.
 *
 * The class is annotated with [DefaultCoder] naming [SchemaCoder] and with [DefaultSchema]
 * naming [JavaBeanSchema]; its primary constructor is marked with [SchemaCreate].
 *
 * NOTE(review): the stack trace published alongside this snippet shows a
 * NullPointerException thrown from a generated `FieldValueGetter` while a row is being
 * encoded — presumably triggered by a `null` [dateTime]; confirm against the Beam version in use.
 *
 * @property dateTime optional timestamp; declared nullable and mutable (`var`).
 */
@DefaultCoder(SchemaCoder::class)
@DefaultSchema(JavaBeanSchema::class)
data class UserData @SchemaCreate constructor(
// Both the generated getter and the setter's parameter carry javax.annotation.Nullable.
@get:Nullable
@setparam:Nullable
var dateTime: DateTime?
)
/**
 * A [DoFn] whose element-processing method has an empty body: it receives each element's
 * context and does nothing with it, so this method never emits any output.
 *
 * @param InputT element type accepted by the function
 * @param OutputT element type the function is declared to produce
 */
class NoOpDoFn<InputT, OutputT> : DoFn<InputT, OutputT>() {

    /**
     * Intentionally does nothing with the supplied context.
     *
     * @param c processing context of the current element (unused)
     */
    @ProcessElement
    fun processElement(c: ProcessContext) = Unit
}
/**
 * Builds a small pipeline: one [UserData] element with a `null` timestamp is created,
 * passed through [Distinct], and then through a [NoOpDoFn].
 *
 * The pipeline is only constructed here; this function never calls a run method on it.
 *
 * NOTE(review): the stack trace published alongside this snippet shows a
 * NullPointerException raised from a `Create` expansion during pipeline construction —
 * presumably the `Create.of(...)` application below; confirm.
 *
 * @param args command-line arguments, forwarded to [PipelineOptionsFactory.fromArgs]
 */
fun main(args: Array<String>) {
    val pipelineOptions = PipelineOptionsFactory
        .fromArgs(*args)
        .withValidation()
        .`as`(PipelineOptions::class.java)
    val pipeline = Pipeline.create(pipelineOptions)

    // Single in-memory element whose only field is null.
    val users = pipeline.apply(Create.of(UserData(null)))

    val distinctUsers = users.apply(Distinct.create())
    distinctUsers.apply(ParDo.of(NoOpDoFn<UserData, UserData>()))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment