- Overview
- TypeInformation: The Foundation
- TypeSerializer: The Workhorse
- The Complete Flow
- Detailed Example Walkthrough
- State Evolution and Migration
- Architecture Insights
Flink's state management relies on a sophisticated type system that bridges Java's object model with efficient serialization. The two core abstractions are:
- TypeInformation: Type metadata + serializer factory
- TypeSerializer: Actual serialization/deserialization logic
Flink needs to:
- Serialize state for checkpointing and savepoints
- Support arbitrary user-defined types
- Enable state schema evolution across job versions
- Work unchanged across state backend implementations (Heap, RocksDB)
- Optimize serialization performance
The TypeInformation → TypeSerializer design achieves all of this through a clean separation of concerns.
Location: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
TypeInformation is an abstract class that describes the type of data Flink processes. It serves as a factory for TypeSerializers.
public abstract class TypeInformation<T> implements Serializable {
/**
* Creates a serializer for this type.
* This is the key method that bridges TypeInformation to TypeSerializer.
*/
public abstract TypeSerializer<T> createSerializer(SerializerConfig config);
// Type metadata
public abstract Class<T> getTypeClass();
public abstract int getArity();
public abstract int getTotalFields();
public abstract boolean isBasicType();
public abstract boolean isKeyType();
}
Location: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
public class BasicTypeInfo<T> extends TypeInformation<T> {
private final TypeSerializer<T> serializer;
// Singleton instances for common types
public static final BasicTypeInfo<Integer> INT_TYPE_INFO =
new BasicTypeInfo<>(
Integer.class,
new Class<?>[] {Long.class, Float.class, Double.class},
IntSerializer.INSTANCE, // Pre-created serializer
IntComparator.class);
public static final BasicTypeInfo<String> STRING_TYPE_INFO =
new BasicTypeInfo<>(
String.class,
new Class<?>[] {},
StringSerializer.INSTANCE,
StringComparator.class);
@Override
public TypeSerializer<T> createSerializer(SerializerConfig config) {
return this.serializer; // Returns the singleton serializer
}
}
Key Point: For basic types, the serializer is pre-created and reused (singleton pattern).
Location: flink-core/src/main/java/org/apache/flink/api/java/typeutils/ListTypeInfo.java
public class ListTypeInfo<T> extends TypeInformation<List<T>> {
private final TypeInformation<T> elementTypeInfo;
public ListTypeInfo(TypeInformation<T> elementTypeInfo) {
this.elementTypeInfo = elementTypeInfo;
}
@Override
public TypeSerializer<List<T>> createSerializer(SerializerConfig config) {
// Recursively create serializer for element type
TypeSerializer<T> elementSerializer = elementTypeInfo.createSerializer(config);
// Compose element serializer into list serializer
return new ListSerializer<>(elementSerializer);
}
}
Key Point: Composite types create serializers by composing serializers of their constituent types.
Other common TypeInformation implementations:
- PojoTypeInfo: For Plain Old Java Objects (POJOs)
- TupleTypeInfo: For Flink Tuple types
- MapTypeInfo: For Map types
- GenericTypeInfo: Fallback using Kryo serialization
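The factory relationship can be illustrated without Flink at all. In this JDK-only sketch, `Info`, `Ser`, `IntInfo`, `ListInfo`, `IntSer` and `ListSer` are hypothetical stand-ins for TypeInformation, TypeSerializer, BasicTypeInfo, ListTypeInfo and their serializers: the basic type hands out one shared serializer, while the composite type asks its element type for a serializer and wraps it.

```java
import java.util.List;

/** Usage demo: builds the serializer tree for List<List<Integer>>. */
class FactoryDemo {
    public static void main(String[] args) {
        Info<List<Integer>> inner = new ListInfo<>(IntInfo.INSTANCE);
        Info<List<List<Integer>>> outer = new ListInfo<>(inner);
        System.out.println(outer.createSerializer()); // ListSer(ListSer(IntSer))
    }
}

/** Hypothetical stand-in for TypeSerializer (no methods needed for this demo). */
interface Ser<T> {}

/** Hypothetical stand-in for TypeInformation: type metadata plus a serializer factory. */
interface Info<T> {
    Ser<T> createSerializer();
}

/** Stateless serializer for ints: one shared instance is enough. */
final class IntSer implements Ser<Integer> {
    static final IntSer INSTANCE = new IntSer();
    private IntSer() {}
    @Override public String toString() { return "IntSer"; }
}

/** List serializer built around whatever element serializer it is given. */
final class ListSer<T> implements Ser<List<T>> {
    private final Ser<T> element;
    ListSer(Ser<T> element) { this.element = element; }
    @Override public String toString() { return "ListSer(" + element + ")"; }
}

/** Basic type: returns the pre-built singleton, like BasicTypeInfo. */
final class IntInfo implements Info<Integer> {
    static final IntInfo INSTANCE = new IntInfo();
    private IntInfo() {}
    @Override public Ser<Integer> createSerializer() { return IntSer.INSTANCE; }
}

/** Composite type: asks its element type for a serializer, then wraps it, like ListTypeInfo. */
final class ListInfo<T> implements Info<List<T>> {
    private final Info<T> elementInfo;
    ListInfo(Info<T> elementInfo) { this.elementInfo = elementInfo; }
    @Override public Ser<List<T>> createSerializer() {
        return new ListSer<>(elementInfo.createSerializer()); // recursion ends at basic types
    }
}
```

Because the recursion bottoms out at basic types, arbitrarily nested types get a complete serializer tree without any extra wiring.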
Location: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
TypeSerializer is the abstract class that performs the actual serialization and deserialization of data.
public abstract class TypeSerializer<T> implements Serializable {
// ========== Serialization/Deserialization ==========
/**
* Serialize a record to the given target output view.
*/
public abstract void serialize(T record, DataOutputView target) throws IOException;
/**
* Deserialize a record from the given source input view.
*/
public abstract T deserialize(DataInputView source) throws IOException;
/**
* Deserialize a record into a reusable object instance.
* This enables object reuse to reduce GC pressure.
*/
public abstract T deserialize(T reuse, DataInputView source) throws IOException;
// ========== Copying ==========
/**
* Create a deep copy of the given element.
*/
public abstract T copy(T from);
/**
* Copy into a reusable object instance.
*/
public abstract T copy(T from, T reuse);
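/**
* Copy exactly one record from source to target. Implementations may copy
* the bytes directly instead of materializing an object (binary copies are typically faster).
*/
public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
/**
* Create a new instance of the type (for example, as a reuse object).
*/
public abstract T createInstance();
// ========== Metadata ==========
/**
* Whether the type is immutable.
*/
public abstract boolean isImmutableType();
/**
* Create a deep copy of this serializer if it is stateful; a stateless
* serializer may return itself. Needed because serializers are not thread-safe.
*/
public abstract TypeSerializer<T> duplicate();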
/**
* Get the length of the serialized form (-1 if variable length).
*/
public abstract int getLength();
// ========== State Evolution Support ==========
/**
* Create a snapshot of this serializer's configuration.
* This is used for state schema evolution.
*/
public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
}
Location: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
public final class IntSerializer extends TypeSerializerSingleton<Integer> {
public static final IntSerializer INSTANCE = new IntSerializer();
@Override
public void serialize(Integer record, DataOutputView target) throws IOException {
target.writeInt(record); // Write 4 bytes
}
@Override
public Integer deserialize(DataInputView source) throws IOException {
return source.readInt(); // Read 4 bytes
}
@Override
public Integer copy(Integer from) {
return from; // Integers are immutable
}
@Override
public boolean isImmutableType() {
return true;
}
@Override
public int getLength() {
return 4; // Fixed length: 4 bytes
}
@Override
public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
return new IntSerializerSnapshot();
}
}
Serialization Format:
Bytes: [00 00 00 2A] // Integer 42 in big-endian
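Flink's DataOutputView extends java.io.DataOutput, so a JDK DataOutputStream should produce the same bytes for writeInt. This JDK-only sketch (class and method names are hypothetical, not Flink API) reproduces the fixed 4-byte, big-endian layout and its inverse:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class IntWireFormat {
    public static void main(String[] args) {
        System.out.println(hex(encode(42)));    // 00 00 00 2A
        System.out.println(hex(encode(-1)));    // FF FF FF FF
        System.out.println(decode(encode(42))); // 42
    }

    /** Same call IntSerializer makes: DataOutput.writeInt, always 4 bytes, big-endian. */
    static byte[] encode(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Mirror of IntSerializer.deserialize: DataInput.readInt. */
    static int decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Formats bytes like the diagrams in this document, e.g. "00 00 00 2A". */
    static String hex(byte[] data) {
        StringBuilder sb = new StringBuilder();
        for (byte b : data) {
            if (sb.length() > 0) sb.append(' ');
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }
}
```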
Location: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java
public final class StringSerializer extends TypeSerializerSingleton<String> {
public static final StringSerializer INSTANCE = new StringSerializer();
@Override
public void serialize(String record, DataOutputView target) throws IOException {
// (length + 1) as a 7-bit varint (0 encodes null), then each char as a 7-bit varint
StringValue.writeString(record, target);
}
@Override
public String deserialize(DataInputView source) throws IOException {
return StringValue.readString(source);
}
@Override
public int getLength() {
return -1; // Variable length
}
}
Serialization Format:
For "hello":
Bytes: [06 68 65 6C 6C 6F]
|  |
|  └─ one byte per char for ASCII text: 68 65 6C 6C 6F
└──── length + 1 = 6 (a 0 here would mean null)
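In Flink, StringSerializer delegates to StringValue.writeString and StringValue.readString rather than DataOutput.writeUTF. The JDK-only sketch below re-implements that encoding for illustration (the class name is hypothetical): the length is written as length + 1 so that 0 can mean null, and both the length and each UTF-16 char use 7 bits per byte, with the high bit set on every byte except the last.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class VarStringCodec {
    private static final int HIGH_BIT = 0x80;

    public static void main(String[] args) {
        byte[] hello = encode("hello");
        StringBuilder sb = new StringBuilder();
        for (byte b : hello) sb.append(String.format("%02X ", b & 0xFF));
        System.out.println(sb.toString().trim()); // 06 68 65 6C 6C 6F
        System.out.println(decode(hello));        // hello
    }

    /** Writes (length + 1), then each char, all as 7-bit varints; null becomes a single 0. */
    static void writeString(String s, DataOutput out) throws IOException {
        if (s == null) {
            out.write(0);
            return;
        }
        writeVarInt(s.length() + 1, out);
        for (int i = 0; i < s.length(); i++) {
            writeVarInt(s.charAt(i), out);
        }
    }

    static String readString(DataInput in) throws IOException {
        int lenPlusOne = readVarInt(in);
        if (lenPlusOne == 0) {
            return null;
        }
        char[] chars = new char[lenPlusOne - 1];
        for (int i = 0; i < chars.length; i++) {
            chars[i] = (char) readVarInt(in);
        }
        return new String(chars);
    }

    /** Low 7 bits per byte, least significant group first; high bit set = more bytes follow. */
    private static void writeVarInt(int value, DataOutput out) throws IOException {
        while (value >= HIGH_BIT) {
            out.write(value | HIGH_BIT); // write() keeps only the low 8 bits
            value >>>= 7;
        }
        out.write(value);
    }

    private static int readVarInt(DataInput in) throws IOException {
        int result = 0;
        int shift = 0;
        int b;
        while ((b = in.readUnsignedByte()) >= HIGH_BIT) {
            result |= (b & 0x7F) << shift;
            shift += 7;
        }
        return result | (b << shift);
    }

    static byte[] encode(String s) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeString(s, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static String decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return readString(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

ASCII text costs one byte per character; a char such as 'é' (0xE9) needs two, and strings of 127 or more characters need a two-byte length.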
Location: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
public final class ListSerializer<T> extends TypeSerializer<List<T>> {
private final TypeSerializer<T> elementSerializer;
public ListSerializer(TypeSerializer<T> elementSerializer) {
this.elementSerializer = elementSerializer;
}
@Override
public void serialize(List<T> list, DataOutputView target) throws IOException {
final int size = list.size();
target.writeInt(size); // Write length prefix
// Serialize each element
for (T element : list) {
elementSerializer.serialize(element, target);
}
}
@Override
public List<T> deserialize(DataInputView source) throws IOException {
final int size = source.readInt(); // Read length prefix
final List<T> list = new ArrayList<>(size + 1);
// Deserialize each element
for (int i = 0; i < size; i++) {
list.add(elementSerializer.deserialize(source));
}
return list;
}
@Override
public TypeSerializer<List<T>> duplicate() {
TypeSerializer<T> duplicateElement = elementSerializer.duplicate();
return new ListSerializer<>(duplicateElement);
}
}
Serialization Format:
For List.of(42, 100, 200) with IntSerializer:
Bytes: [00 00 00 03] [00 00 00 2A] [00 00 00 64] [00 00 00 C8]
| | | |
└─ size (3) └─ 42 └─ 100 └─ 200
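The size-prefix-plus-elements layout, and the record copy that TypeSerializer.copy(DataInputView, DataOutputView) allows, can be reproduced with JDK streams. `MiniSerializer`, `IntElementSerializer` and `SimpleListSerializer` are hypothetical trimmed-down stand-ins for TypeSerializer, IntSerializer and ListSerializer; the copy method reads the size prefix and lets each element copy its own bytes, without ever building a List.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ListWireFormat {
    static final SimpleListSerializer<Integer> INT_LIST =
            new SimpleListSerializer<>(new IntElementSerializer());

    public static void main(String[] args) {
        byte[] bytes = toBytes(Arrays.asList(42, 100, 200));
        System.out.println(bytes.length);                          // 16
        System.out.println(fromBytes(bytes));                      // [42, 100, 200]
        System.out.println(Arrays.equals(bytes, copyBytes(bytes))); // true
    }

    static byte[] toBytes(List<Integer> list) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            INT_LIST.serialize(list, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static List<Integer> fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return INT_LIST.deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Copies one serialized list from one buffer to another without building a List. */
    static byte[] copyBytes(byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
             DataOutputStream out = new DataOutputStream(bytes)) {
            INT_LIST.copy(in, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}

/** Hypothetical trimmed-down TypeSerializer: serialize, deserialize, and record copy. */
interface MiniSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
    void copy(DataInput in, DataOutput out) throws IOException;
}

final class IntElementSerializer implements MiniSerializer<Integer> {
    @Override public void serialize(Integer value, DataOutput out) throws IOException { out.writeInt(value); }
    @Override public Integer deserialize(DataInput in) throws IOException { return in.readInt(); }
    @Override public void copy(DataInput in, DataOutput out) throws IOException { out.writeInt(in.readInt()); }
}

/** Size prefix, then each element through the element serializer (composition). */
final class SimpleListSerializer<T> implements MiniSerializer<List<T>> {
    private final MiniSerializer<T> element;
    SimpleListSerializer(MiniSerializer<T> element) { this.element = element; }

    @Override public void serialize(List<T> list, DataOutput out) throws IOException {
        out.writeInt(list.size());
        for (T e : list) element.serialize(e, out);
    }

    @Override public List<T> deserialize(DataInput in) throws IOException {
        int size = in.readInt();
        List<T> list = new ArrayList<>(size + 1);
        for (int i = 0; i < size; i++) list.add(element.deserialize(in));
        return list;
    }

    @Override public void copy(DataInput in, DataOutput out) throws IOException {
        int size = in.readInt();
        out.writeInt(size);
        for (int i = 0; i < size; i++) element.copy(in, out); // element decides how to copy its bytes
    }
}
```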
Let's trace how TypeInformation and TypeSerializer work together in the state management lifecycle.
Location: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
StateDescriptor is the user-facing configuration for state. It holds either TypeInformation or a TypeSerializer directly.
public abstract class StateDescriptor<S extends State, T> {
protected final String name;
private final AtomicReference<TypeSerializer<T>> serializerAtomicReference;
@Nullable private TypeInformation<T> typeInfo;
@Nullable protected transient T defaultValue;
// Constructor with TypeInformation
protected StateDescriptor(String name, TypeInformation<T> typeInfo, T defaultValue) {
this.name = name;
this.typeInfo = typeInfo;
this.serializerAtomicReference = new AtomicReference<>();
this.defaultValue = defaultValue;
}
// Constructor with TypeSerializer (advanced use)
protected StateDescriptor(String name, TypeSerializer<T> serializer, T defaultValue) {
this.name = name;
this.typeInfo = null;
this.serializerAtomicReference = new AtomicReference<>(serializer);
this.defaultValue = defaultValue;
}
/**
* Lazy initialization of serializer from TypeInformation.
* Called by the state backend when the state is first registered (the first getState() call).
*/
public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
if (serializerAtomicReference.get() == null) {
if (typeInfo == null) {
throw new IllegalStateException("No type information and no serializer");
}
// CREATE SERIALIZER FROM TYPE INFORMATION
TypeSerializer<T> serializer = typeInfo.createSerializer(
executionConfig.getSerializerConfig());
serializerAtomicReference.compareAndSet(null, serializer);
}
}
/**
* Get the serializer (duplicated for thread-safety).
*/
public TypeSerializer<T> getSerializer() {
TypeSerializer<T> serializer = serializerAtomicReference.get();
if (serializer != null) {
return serializer.duplicate();
}
throw new IllegalStateException("Serializer not initialized");
}
}
Example - ValueStateDescriptor:
public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
// User typically uses this constructor
public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) {
super(name, typeInfo, null);
}
// Advanced: direct serializer specification
public ValueStateDescriptor(String name, TypeSerializer<T> typeSerializer) {
super(name, typeSerializer, null);
}
}
Location: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
When user code calls getRuntimeContext().getState(descriptor), the backend creates or retrieves the state.
public <N, S extends State, V> S getOrCreateKeyedState(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {
// Check if state already exists
InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
if (kvState == null) {
// INITIALIZE SERIALIZER FROM TYPE INFORMATION
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
// Create state with TTL wrapping if configured
kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
// Register state
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
}
return (S) kvState;
}
Location: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
The heap backend maintains a StateTable for each state, which stores the actual data.
private <N, V> StateTable<K, N, V> tryRegisterStateTable(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<?, V> stateDesc,
StateSnapshotTransformFactory<V> snapshotTransformFactory) throws Exception {
StateTable<K, N, V> stateTable = registeredKVStates.get(stateDesc.getName());
TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();
if (stateTable != null) {
// State exists from restore - check serializer compatibility
RegisteredKeyValueStateBackendMetaInfo<N, V> restoredKvMetaInfo =
stateTable.getMetaInfo();
TypeSerializerSchemaCompatibility<V> stateCompatibility =
restoredKvMetaInfo.updateStateSerializer(newStateSerializer);
if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException(
"The new state serializer is incompatible to read previous state");
}
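// No explicit migration pass here: on restore, the heap backend already
// deserialized every value into an object using the previous serializer,
// and the next checkpoint writes those objects with the new serializer.
// (The RocksDB backend, which stores bytes, rewrites its values eagerly instead.)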
} else {
// Create new state table with metadata
RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo =
new RegisteredKeyValueStateBackendMetaInfo<>(
stateDesc.getType(),
stateDesc.getName(),
namespaceSerializer,
newStateSerializer, // SERIALIZER IS ATTACHED HERE
snapshotTransformFactory);
stateTable = stateTableFactory.newStateTable(keyContext, newMetaInfo, keySerializer);
registeredKVStates.put(stateDesc.getName(), stateTable);
}
return stateTable;
}
Location: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyValueStateBackendMetaInfo.java
This class holds the serializers and creates snapshots for checkpointing.
public class RegisteredKeyValueStateBackendMetaInfo<N, S> {
private final StateDescriptor.Type stateType;
private final String name;
private final StateSerializerProvider<N> namespaceSerializerProvider;
private final StateSerializerProvider<S> stateSerializerProvider;
// (An overload additionally takes a StateSnapshotTransformFactory, as used by the heap backend.)
public RegisteredKeyValueStateBackendMetaInfo(
StateDescriptor.Type stateType,
String name,
TypeSerializer<N> namespaceSerializer,
TypeSerializer<S> stateSerializer) {
this.stateType = stateType;
this.name = name;
this.namespaceSerializerProvider =
StateSerializerProvider.fromNewRegisteredSerializer(namespaceSerializer);
this.stateSerializerProvider =
StateSerializerProvider.fromNewRegisteredSerializer(stateSerializer);
}
/**
* Create a snapshot for checkpointing.
* This captures the serializer configuration at checkpoint time.
*/
public StateMetaInfoSnapshot snapshot() {
TypeSerializer<N> namespaceSerializer = getNamespaceSerializer();
TypeSerializer<S> stateSerializer = getStateSerializer();
Map<String, String> optionsMap = Map.of(
"state-type", stateType.toString(),
"state-name", name
);
Map<String, TypeSerializerSnapshot<?>> serializerSnapshots = Map.of(
"namespace-serializer", namespaceSerializer.snapshotConfiguration(),
"value-serializer", stateSerializer.snapshotConfiguration()
);
return new StateMetaInfoSnapshot(
name, StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, optionsMap, serializerSnapshots);
}
}
Location: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
The actual state implementation that users interact with.
class HeapValueState<K, N, V> extends AbstractHeapState<K, N, V>
implements InternalValueState<K, N, V> {
private HeapValueState(
StateTable<K, N, V> stateTable,
TypeSerializer<K> keySerializer,
TypeSerializer<V> valueSerializer,
TypeSerializer<N> namespaceSerializer,
V defaultValue) {
super(stateTable, keySerializer, valueSerializer, namespaceSerializer, defaultValue);
}
@Override
public V value() {
// In heap backend, values are stored deserialized
// No serialization overhead on reads!
final V result = stateTable.get(currentNamespace);
return result == null ? getDefaultValue() : result;
}
@Override
public void update(V value) {
if (value == null) {
clear();
return;
}
// Store as object in heap
// Serialization only happens during checkpointing
stateTable.put(currentNamespace, value);
}
}
Location: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
When a checkpoint is triggered, the serializer finally performs actual serialization.
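// In CopyOnWriteStateTableSnapshot (simplified; getEntriesForKeyGroup and
// countEntries are illustrative helpers, not actual method names)
public void writeStateInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException {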
TypeSerializer<K> keySerializer = stateTable.keySerializer;
TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
// Get all entries for this key group
Iterable<StateEntry<K, N, S>> entries = getEntriesForKeyGroup(keyGroupId);
int numEntries = countEntries(entries);
dov.writeInt(numEntries); // Write count
// Serialize each entry: namespace, key, value
for (StateEntry<K, N, S> entry : entries) {
namespaceSerializer.serialize(entry.getNamespace(), dov);
keySerializer.serialize(entry.getKey(), dov);
stateSerializer.serialize(entry.getValue(), dov); // HERE IS THE SERIALIZATION
}
}
// During restore
private static <K, N, S> StateSnapshotKeyGroupReader createV2PlusReader(
StateTable<K, N, S> stateTable) {
TypeSerializer<N> namespaceSerializer = stateTable.getNamespaceSerializer();
TypeSerializer<S> stateSerializer = stateTable.getStateSerializer();
TypeSerializer<K> keySerializer = stateTable.keySerializer;
return KeyGroupPartitioner.createKeyGroupPartitionReader(
(DataInputView in) -> {
// Deserialize each entry
N namespace = namespaceSerializer.deserialize(in);
K key = keySerializer.deserialize(in);
S state = stateSerializer.deserialize(in); // HERE IS THE DESERIALIZATION
return Tuple3.of(namespace, key, state);
},
(Tuple3<N, K, S> element, int keyGroupId) -> {
// Put into state table
stateTable.put(element.f1, keyGroupId, element.f0, element.f2);
}
);
}
Let's walk through a complete example of using state with custom types.
public class WordCountFunction extends RichFlatMapFunction<String, Tuple2<String, Long>> {
// State handle, obtained in open() (the descriptor itself is created there)
private transient ValueState<Long> countState;
@Override
public void open(Configuration parameters) {
// Step 1: Create TypeInformation
TypeInformation<Long> typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
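// Step 2: Create StateDescriptor with TypeInformation
// (This 3-argument constructor with a default value is deprecated; the recommended
// form omits the default and treats a null value() as "no value yet".)
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>(
"word-count", // State name
typeInfo, // TypeInformation
0L); // Default value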
// Step 3: Get state from runtime context
// Serializer will be created lazily inside the backend
countState = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(String word, Collector<Tuple2<String, Long>> out) throws Exception {
// Step 4: Use state
Long currentCount = countState.value(); // Read from state (no serialization in heap backend)
currentCount += 1;
countState.update(currentCount); // Update state (no serialization yet)
out.collect(new Tuple2<>(word, currentCount));
}
}
When getRuntimeContext().getState(descriptor) is called:
// In AbstractKeyedStateBackend.getOrCreateKeyedState()
if (!descriptor.isSerializerInitialized()) {
descriptor.initializeSerializerUnlessSet(executionConfig);
// This calls: BasicTypeInfo.LONG_TYPE_INFO.createSerializer(config)
// Which returns: LongSerializer.INSTANCE
}
// In HeapKeyedStateBackend.tryRegisterStateTable()
RegisteredKeyValueStateBackendMetaInfo<VoidNamespace, Long> metaInfo =
new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE,
"word-count",
VoidNamespaceSerializer.INSTANCE, // Namespace serializer
LongSerializer.INSTANCE); // Value serializer (from TypeInformation)
StateTable<String, VoidNamespace, Long> stateTable =
stateTableFactory.newStateTable(keyContext, metaInfo, keySerializer);
// In HeapValueState.value()
Long currentCount = stateTable.get(VoidNamespace.INSTANCE);
// Returns null on first access; value() then falls back to the descriptor's default (0L)
// No serialization - the value is stored as a Java object
// In HeapValueState.update(1L)
stateTable.put(VoidNamespace.INSTANCE, 1L);
// Stores the Long object directly in the backend's in-memory state map
// Still no serialization!
When checkpoint is triggered:
// For key "hello" with count 42L
// CopyOnWriteStateTableSnapshot.writeStateInKeyGroup()
DataOutputView output = ...;
// Serialize namespace (VoidNamespace: a single placeholder byte)
VoidNamespaceSerializer.INSTANCE.serialize(VoidNamespace.INSTANCE, output);
// Writes: [00]
// Serialize key ("hello")
StringSerializer.INSTANCE.serialize("hello", output);
// Writes: [06 68 65 6C 6C 6F]
// Serialize value (42L)
LongSerializer.INSTANCE.serialize(42L, output);
// Writes: [00 00 00 00 00 00 00 2A]
Complete checkpoint format for one entry:
[00] [06 68 65 6C 6C 6F] [00 00 00 00 00 00 00 2A]
|    |                   |
|    └─ "hello"          └─ 42L
└─ namespace
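The entry layout can be checked end to end with plain JDK streams. This sketch (class name hypothetical) assumes Flink's formats: VoidNamespaceSerializer writes one 0x00 byte, StringSerializer uses the StringValue encoding ((length + 1) and each char as 7-bit varints), and LongSerializer writes 8 big-endian bytes. It writes a key group as an entry count followed by namespace, key, value per entry, then reads it back the way the restore path does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

class KeyGroupSnapshotSketch {
    public static void main(String[] args) {
        Map<String, Long> counts = new LinkedHashMap<>();
        counts.put("hello", 42L);
        byte[] snapshot = write(counts);
        System.out.println(snapshot.length); // 19 = 4 (count) + 1 (namespace) + 6 (key) + 8 (value)
        System.out.println(read(snapshot));  // {hello=42}
    }

    /** Entry count, then for each entry: namespace, key, value. */
    static byte[] write(Map<String, Long> entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(entries.size());
            for (Map.Entry<String, Long> e : entries.entrySet()) {
                out.write(0);                 // void namespace: one placeholder byte
                writeString(e.getKey(), out); // key
                out.writeLong(e.getValue());  // value: 8 bytes, big-endian
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    /** Reads the same layout back into a fresh map, as a restore would. */
    static Map<String, Long> read(byte[] data) {
        Map<String, Long> restored = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                in.readUnsignedByte();        // namespace byte (void, discarded)
                String key = readString(in);
                restored.put(key, in.readLong());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return restored;
    }

    // Non-null strings only: (length + 1), then each char, as 7-bit varints.
    private static void writeString(String s, DataOutput out) throws IOException {
        writeVarInt(s.length() + 1, out);
        for (int i = 0; i < s.length(); i++) writeVarInt(s.charAt(i), out);
    }

    private static String readString(DataInput in) throws IOException {
        char[] chars = new char[readVarInt(in) - 1];
        for (int i = 0; i < chars.length; i++) chars[i] = (char) readVarInt(in);
        return new String(chars);
    }

    private static void writeVarInt(int v, DataOutput out) throws IOException {
        while (v >= 0x80) {
            out.write(v | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    private static int readVarInt(DataInput in) throws IOException {
        int result = 0;
        int shift = 0;
        int b;
        while ((b = in.readUnsignedByte()) >= 0x80) {
            result |= (b & 0x7F) << shift;
            shift += 7;
        }
        return result | (b << shift);
    }
}
```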
When restoring from checkpoint:
DataInputView input = ...;
// Deserialize namespace
VoidNamespace namespace = VoidNamespaceSerializer.INSTANCE.deserialize(input);
// Deserialize key
String key = StringSerializer.INSTANCE.deserialize(input);
// Reads: "hello"
// Deserialize value
Long count = LongSerializer.INSTANCE.deserialize(input);
// Reads: 42L
// Put into state table
stateTable.put(key, keyGroupId, namespace, count);
// Now stored as Java objects in heap
public class SensorReadingFunction extends KeyedProcessFunction<String, SensorReading, Alert> {
private transient ListState<SensorReading> readingsState;
// Custom POJO
public static class SensorReading {
public String sensorId;
public long timestamp;
public double temperature;
// A public no-arg constructor is required; getters/setters are optional since the fields are public
}
@Override
public void open(Configuration parameters) {
// Step 1: Create TypeInformation for custom POJO
// Flink automatically infers POJO type information
TypeInformation<SensorReading> typeInfo =
TypeInformation.of(SensorReading.class);
// Creates: PojoTypeInfo holding one TypeInformation per field
// Step 2: Create ListStateDescriptor
ListStateDescriptor<SensorReading> descriptor =
new ListStateDescriptor<>(
"sensor-readings",
typeInfo);
// Step 3: Get state
readingsState = getRuntimeContext().getListState(descriptor);
}
@Override
public void processElement(
SensorReading reading,
Context ctx,
Collector<Alert> out) throws Exception {
// Add to list state
readingsState.add(reading);
// Get all readings
Iterable<SensorReading> allReadings = readingsState.get();
// Process...
}
}
// TypeInformation.of(SensorReading.class) creates a PojoTypeInfo (simplified)
PojoTypeInfo<SensorReading> pojoTypeInfo = new PojoTypeInfo<>(
SensorReading.class,
Arrays.asList(
new PojoField(
SensorReading.class.getField("sensorId"),
BasicTypeInfo.STRING_TYPE_INFO), // Field TypeInformation
new PojoField(
SensorReading.class.getField("timestamp"),
BasicTypeInfo.LONG_TYPE_INFO),
new PojoField(
SensorReading.class.getField("temperature"),
BasicTypeInfo.DOUBLE_TYPE_INFO)));
// PojoTypeInfo sorts its fields by name: sensorId, temperature, timestamp
// PojoTypeInfo.createSerializer() (simplified; fields follow the sorted order)
TypeSerializer<SensorReading> serializer = new PojoSerializer<>(
SensorReading.class,
new TypeSerializer<?>[] {
StringSerializer.INSTANCE, // For sensorId field
DoubleSerializer.INSTANCE, // For temperature field
LongSerializer.INSTANCE // For timestamp field
},
new Field[] {
SensorReading.class.getField("sensorId"),
SensorReading.class.getField("temperature"),
SensorReading.class.getField("timestamp")
},
serializerConfig); // the serializer config is passed through as well
// Then wrapped in ListSerializer
TypeSerializer<List<SensorReading>> listSerializer =
new ListSerializer<>(serializer);
For a list with 2 readings:
// ListSerializer.serialize()
output.writeInt(2); // List size
// First reading
pojoSerializer.serialize(reading1, output);
// Writes: a header flags byte, then for each field (in name order) a null marker
// followed by the value: sensorId (String), temperature (Double), timestamp (Long)
// Second reading
pojoSerializer.serialize(reading2, output);
Checkpoint format:
[00 00 00 02] // List size: 2 elements
// First SensorReading
[02] // Header flags: exact POJO class, no subclass
[00] [08 73 65 6E 73 6F 72 31] // sensorId: not null, "sensor1"
[00] [40 5E 66 66 66 66 66 66] // temperature: not null, 121.6
[00] [00 00 01 7F 12 34 56 78] // timestamp: not null
// Second SensorReading
[02] // Header flags
[00] [08 73 65 6E 73 6F 72 31] // sensorId: "sensor1"
[00] [40 5F 33 33 33 33 33 33] // temperature: 124.8
[00] [00 00 01 7F 12 34 56 90] // timestamp
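PojoSerializer's on-the-wire layout is shaped by two properties: fields are visited in name order (PojoTypeInfo sorts them), and each record starts with a header flags byte followed, per field, by a null marker and then the value. The sketch below mimics this with JDK reflection only; the flag value 0x02 for "exact class, no subclass" is an assumption about PojoSerializer internals, the class name is hypothetical, and only String, long and double fields are handled.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class PojoLayoutSketch {
    /** Same shape as SensorReading: public fields, public no-arg constructor. */
    public static class SensorReading {
        public String sensorId;
        public long timestamp;
        public double temperature;
        public SensorReading() {}
    }

    public static void main(String[] args) {
        System.out.println(fieldNames(SensorReading.class)); // [sensorId, temperature, timestamp]
        SensorReading r = new SensorReading();
        r.sensorId = "sensor1";
        r.timestamp = 0x0000017F12345678L;
        r.temperature = 121.6;
        System.out.println(serialize(r).length); // 28
    }

    /** Public instance fields sorted by name, independent of declaration order. */
    static List<Field> sortedFields(Class<?> clazz) {
        List<Field> fields = new ArrayList<>();
        for (Field f : clazz.getFields()) {
            if (!Modifier.isStatic(f.getModifiers())) fields.add(f);
        }
        fields.sort(Comparator.comparing(Field::getName));
        return fields;
    }

    static List<String> fieldNames(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Field f : sortedFields(clazz)) names.add(f.getName());
        return names;
    }

    /** Header byte, then per field: a null marker and, if non-null, the value. */
    static byte[] serialize(Object pojo) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(0x02); // assumed flag: exact POJO class, no subclass
            for (Field f : sortedFields(pojo.getClass())) {
                Object v = f.get(pojo);
                out.writeBoolean(v == null); // 0x01 = field is null
                if (v == null) {
                    continue;
                }
                if (v instanceof String) {
                    writeString((String) v, out);
                } else if (v instanceof Long) {
                    out.writeLong((Long) v);
                } else if (v instanceof Double) {
                    out.writeDouble((Double) v);
                } else {
                    throw new IllegalArgumentException("unsupported field type: " + f.getType());
                }
            }
        } catch (IOException | IllegalAccessException e) {
            throw new RuntimeException(e);
        }
        return bytes.toByteArray();
    }

    // StringValue-style encoding: (length + 1), then each char, as 7-bit varints.
    private static void writeString(String s, DataOutput out) throws IOException {
        writeVarInt(s.length() + 1, out);
        for (int i = 0; i < s.length(); i++) writeVarInt(s.charAt(i), out);
    }

    private static void writeVarInt(int v, DataOutput out) throws IOException {
        while (v >= 0x80) {
            out.write(v | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }
}
```

Reordering the field declarations in SensorReading would not change these bytes, because the order comes from the field names.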
One of the most powerful features of Flink's type system is state schema evolution - the ability to change state types across job versions while maintaining savepoint compatibility.
Every TypeSerializer must provide a snapshot of its configuration:
public interface TypeSerializerSnapshot<T> {
/**
* Get the version of this snapshot format.
*/
int getCurrentVersion();
/**
* Write the serializer configuration to output.
*/
void writeSnapshot(DataOutputView out) throws IOException;
/**
* Read the serializer configuration from input.
*/
void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
throws IOException;
/**
* Restore the serializer from this snapshot.
*/
TypeSerializer<T> restoreSerializer();
/**
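* Check compatibility with a new serializer.
* (Deprecated since Flink 1.19 in favor of resolveSchemaCompatibility(
* TypeSerializerSnapshot<T> oldSerializerSnapshot), called on the new serializer's snapshot.)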
*/
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer);
}
public class TypeSerializerSchemaCompatibility<T> {
// Static factory methods (signatures only)
/**
* Serializers are identical - no migration needed.
*/
public static <T> TypeSerializerSchemaCompatibility<T> compatibleAsIs();
/**
* Serializers are different but compatible - migration needed.
*/
public static <T> TypeSerializerSchemaCompatibility<T> compatibleAfterMigration();
/**
* Serializers are compatible with a reconfigured new serializer.
*/
public static <T> TypeSerializerSchemaCompatibility<T> compatibleWithReconfiguredSerializer(
TypeSerializer<T> reconfiguredSerializer);
/**
* Serializers are incompatible - cannot restore state.
*/
public static <T> TypeSerializerSchemaCompatibility<T> incompatible();
}
// Version 1
public class User {
public String name;
public int age;
}
// TypeInformation creates PojoTypeInfo with 2 fields
// Checkpoint created with this schema
// Version 2
public class User {
public String name;
public int age;
public String email; // NEW FIELD ADDED
}
// Same state name, but different POJO structure
// On restore (simplified sketch, not literal HeapKeyedStateBackend source;
// accessor names are illustrative)
// 1. Restore old serializer from snapshot
TypeSerializerSnapshot<User> oldSnapshot = restoredMetaInfo.getSerializerSnapshot();
TypeSerializer<User> oldSerializer = oldSnapshot.restoreSerializer();
// 2. Create new serializer from new TypeInformation
TypeSerializer<User> newSerializer = newTypeInfo.createSerializer(config);
// This creates PojoSerializer with 3 fields
// 3. Check compatibility
TypeSerializerSchemaCompatibility<User> compatibility =
oldSnapshot.resolveSchemaCompatibility(newSerializer);
if (compatibility.isCompatibleAfterMigration()) {
// Migration: each value is read with oldSerializer and written with newSerializer.
// The old bytes carry no "email", so migrated objects have email == null.
// Heap backend: values were already read with oldSerializer during restore
// and are written with newSerializer at the next checkpoint.
// RocksDB backend: stored bytes are rewritten value by value.
} else if (compatibility.isIncompatible()) {
throw new StateMigrationException("Cannot restore - incompatible schema");
}
// Original job
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("counter", BasicTypeInfo.LONG_TYPE_INFO);
// Checkpoint created with LongSerializer
// Modified job - change to String
ValueStateDescriptor<String> descriptor =
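new ValueStateDescriptor<>("counter", BasicTypeInfo.STRING_TYPE_INFO);
Result: Incompatible - the job fails to restore from the savepoint.
Why? The restored LongSerializerSnapshot only accepts a LongSerializer as the new serializer; StringSerializer is a different serializer class, so the check returns incompatible. The checkpointed values are 8-byte longs that no String serializer can read.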
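For the POJO case above, migration reduces to "read each stored value with the previous serializer, write it with the new one". A JDK-only sketch with two hypothetical hand-written layouts for User (v1: name, age; v2 adds a nullable email), using DataOutput.writeUTF for brevity rather than Flink's string encoding:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class MigrationSketch {
    /** Current (v2) class; email did not exist when the old bytes were written. */
    public static class User {
        public String name;
        public int age;
        public String email;
    }

    public static void main(String[] args) {
        byte[] oldBytes = writeV1("Ada", 36);
        User migrated = readV2(migrate(oldBytes));
        System.out.println(migrated.name + " " + migrated.age + " " + migrated.email); // Ada 36 null
    }

    /** Bytes as the v1 job wrote them: name, age. */
    static byte[] writeV1(String name, int age) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);
            out.writeInt(age);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** "Old serializer": reads the v1 layout into the current class. */
    static User readV1(DataInput in) throws IOException {
        User u = new User();
        u.name = in.readUTF();
        u.age = in.readInt();
        return u; // email is never touched and keeps its Java default, null
    }

    /** "New serializer": v2 layout with a null marker for email. */
    static void writeV2(User u, DataOutput out) throws IOException {
        out.writeUTF(u.name);
        out.writeInt(u.age);
        out.writeBoolean(u.email == null);
        if (u.email != null) {
            out.writeUTF(u.email);
        }
    }

    static User readV2(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            User u = new User();
            u.name = in.readUTF();
            u.age = in.readInt();
            u.email = in.readBoolean() ? null : in.readUTF();
            return u;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Migration of one stored value: deserialize with the old serializer, serialize with the new. */
    static byte[] migrate(byte[] v1Bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(v1Bytes))) {
            User value = readV1(in);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                writeV2(value, out);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The old reader never sets email, so the field keeps its Java default. No such bridge exists for Long → String, which is why that change is reported as incompatible rather than migratable.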
Compatible Changes (No Migration):
- Same TypeInformation
- Same serializer implementation
- Same type class
- Reordering POJO field declarations (PojoTypeInfo orders fields by name)
Compatible with Migration:
- Adding fields to a POJO (restored values get the Java default for the new field: null, 0, false)
- Removing fields from a POJO (the removed field's data is dropped)
- Upgrading an Avro schema (following Avro's schema evolution rules)
Incompatible Changes:
- Changing the type class (Long → String)
- Changing the collection element type (List<Long> → List<String>)
- Changing the declared type of an existing POJO field
- Renaming the POJO class or moving it to another package
- Breaking Avro schema compatibility
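The POJO rules in these lists can be condensed into a toy resolver. In this sketch a hypothetical `PojoSnapshot` records only the class name and each field's declared type (the class name `com.example.User` is made up); the real PojoSerializerSnapshot stores much more, such as nested serializer snapshots and registered subclasses, so treat this as a model of the rules rather than of Flink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CompatibilitySketch {
    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    /** Hypothetical snapshot content: POJO class name plus field name -> declared type. */
    static final class PojoSnapshot {
        final String className;
        final Map<String, String> fieldTypes;

        PojoSnapshot(String className, Map<String, String> fieldTypes) {
            this.className = className;
            this.fieldTypes = fieldTypes;
        }
    }

    /** Compares the snapshot from the checkpoint with the snapshot of the new serializer. */
    static Compatibility resolve(PojoSnapshot previous, PojoSnapshot current) {
        if (!previous.className.equals(current.className)) {
            return Compatibility.INCOMPATIBLE; // class renamed or moved
        }
        for (Map.Entry<String, String> field : previous.fieldTypes.entrySet()) {
            String newType = current.fieldTypes.get(field.getKey());
            if (newType != null && !newType.equals(field.getValue())) {
                return Compatibility.INCOMPATIBLE; // declared type of a kept field changed
            }
        }
        if (previous.fieldTypes.keySet().equals(current.fieldTypes.keySet())) {
            return Compatibility.COMPATIBLE_AS_IS; // same fields; declaration order is irrelevant
        }
        return Compatibility.COMPATIBLE_AFTER_MIGRATION; // fields added and/or removed
    }

    static PojoSnapshot snapshot(String className, String... nameTypePairs) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < nameTypePairs.length; i += 2) {
            fields.put(nameTypePairs[i], nameTypePairs[i + 1]);
        }
        return new PojoSnapshot(className, fields);
    }

    public static void main(String[] args) {
        PojoSnapshot v1 = snapshot("com.example.User", "name", "String", "age", "int");
        PojoSnapshot v2 = snapshot("com.example.User", "name", "String", "age", "int", "email", "String");
        System.out.println(resolve(v1, v2)); // COMPATIBLE_AFTER_MIGRATION
    }
}
```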
The TypeInformation → TypeSerializer design achieves clean separation:
- TypeInformation:
  - Type metadata and validation
  - Serializer factory
  - Type inference and propagation
  - Owned by API layer (user-facing)
- TypeSerializer:
  - Actual serialization logic
  - Performance-critical code
  - State evolution logic
  - Owned by runtime layer (internal)
Serializers are created lazily, when the state is first registered with the backend (the first getState() call for the descriptor):
// Descriptor created in open()
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("counter", typeInfo);
// No serializer created yet!
// Serializer created when state is accessed
ValueState<Long> state = getRuntimeContext().getState(descriptor);
// NOW: typeInfo.createSerializer() is called
Why?
- ExecutionConfig may not be available at descriptor creation time
- Allows configuration to be applied before serializer creation
- Defers expensive initialization
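The same lazy pattern in a JDK-only sketch: `LazyDescriptor`, `StateRegistry` and `ConfiguredSerializer` are hypothetical stand-ins for StateDescriptor, the keyed backend and a serializer, and the "config" is a plain String that only exists once the runtime side is created. The registry stores the serializer as a stand-in for the state object it would be attached to.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class LazySerializerSketch {
    public static void main(String[] args) {
        LazyDescriptor counter = new LazyDescriptor("counter"); // created before any config exists
        System.out.println(counter.isSerializerInitialized());  // false
        StateRegistry backend = new StateRegistry("runtime-config");
        backend.getOrCreate(counter);
        backend.getOrCreate(counter);                            // second call hits the cache
        System.out.println(counter.createCalls.get());           // 1
    }
}

/** Hypothetical serializer that remembers the config it was created with. */
final class ConfiguredSerializer {
    final String config;
    ConfiguredSerializer(String config) { this.config = config; }
    ConfiguredSerializer duplicate() { return new ConfiguredSerializer(config); }
}

/** Mirrors StateDescriptor: no serializer until the backend supplies a config. */
final class LazyDescriptor {
    final String name;
    final AtomicInteger createCalls = new AtomicInteger(); // counts factory invocations
    private final AtomicReference<ConfiguredSerializer> serializerRef = new AtomicReference<>();

    LazyDescriptor(String name) { this.name = name; }

    boolean isSerializerInitialized() { return serializerRef.get() != null; }

    void initializeSerializerUnlessSet(String config) {
        if (serializerRef.get() == null) {
            createCalls.incrementAndGet(); // stands in for typeInfo.createSerializer(config)
            // If two threads race here, only the first compareAndSet publishes its serializer
            serializerRef.compareAndSet(null, new ConfiguredSerializer(config));
        }
    }

    ConfiguredSerializer getSerializer() {
        ConfiguredSerializer s = serializerRef.get();
        if (s == null) throw new IllegalStateException("Serializer not initialized");
        return s.duplicate(); // every caller gets its own copy
    }
}

/** Mirrors getOrCreateKeyedState: one state per name, serializer initialized on first registration. */
final class StateRegistry {
    private final String config;
    private final Map<String, ConfiguredSerializer> statesByName = new HashMap<>();

    StateRegistry(String config) { this.config = config; }

    ConfiguredSerializer getOrCreate(LazyDescriptor descriptor) {
        return statesByName.computeIfAbsent(descriptor.name, n -> {
            descriptor.initializeSerializerUnlessSet(config);
            return descriptor.getSerializer();
        });
    }
}
```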
Both TypeInformation and TypeSerializer use the Composite Pattern:
ListTypeInfo<String>
└─ elementTypeInfo: BasicTypeInfo<String>
ListSerializer<String>
└─ elementSerializer: StringSerializer
PojoTypeInfo<User> (fields sorted by name)
├─ fieldTypeInfo[0]: BasicTypeInfo<Integer> (age)
├─ fieldTypeInfo[1]: BasicTypeInfo<String> (email)
└─ fieldTypeInfo[2]: BasicTypeInfo<String> (name)
PojoSerializer<User>
├─ fieldSerializer[0]: IntSerializer
├─ fieldSerializer[1]: StringSerializer
└─ fieldSerializer[2]: StringSerializer
This enables:
- Recursive serializer creation
- Type-safe composition
- Reusable building blocks
The same TypeInformation/TypeSerializer works across all backends:
Heap backend (HashMapStateBackend):
- Stores deserialized objects
- Serialization only during checkpoints and savepoints
- Fast read/write, high memory usage
// HeapValueState
stateTable.put(namespace, value); // Stores Java object
return stateTable.get(namespace); // Returns Java object
RocksDB backend (EmbeddedRocksDBStateBackend):
- Stores serialized bytes
- Serialization on every access
- Slower access, lower memory usage (state can exceed the JVM heap)
// RocksDBValueState (simplified)
dataOutputView.clear();
valueSerializer.serialize(value, dataOutputView); // Object -> bytes
db.put(columnFamily, writeOptions, keyBytes, dataOutputView.getCopyOfBuffer()); // Stores bytes
byte[] valueBytes = db.get(columnFamily, keyBytes);
dataInputView.setBuffer(valueBytes);
return valueSerializer.deserialize(dataInputView); // Deserializes on read
Same serializer, different usage pattern!
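A JDK-only sketch of the two storage strategies, counting how often one hypothetical `CountingLongSerializer` is invoked. `ObjectStore` and `ByteStore` are simplified stand-ins backed by java.util.HashMap, not Flink's heap state map or RocksDB; the point is only where serialization happens.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

class StorageStrategySketch {
    public static void main(String[] args) {
        CountingLongSerializer heapSer = new CountingLongSerializer();
        ObjectStore heap = new ObjectStore();
        heap.put("hello", 41L);
        heap.put("hello", heap.get("hello") + 1);
        System.out.println(heapSer.calls()); // 0: no serialization on access
        heap.snapshot(heapSer);
        System.out.println(heapSer.calls()); // 1: one value serialized at checkpoint time

        CountingLongSerializer bytesSer = new CountingLongSerializer();
        ByteStore rocks = new ByteStore(bytesSer);
        rocks.put("hello", 41L);
        rocks.put("hello", rocks.get("hello") + 1);
        System.out.println(bytesSer.calls()); // 3: put, get, put
    }
}

/** Hypothetical long serializer that counts its invocations. */
final class CountingLongSerializer {
    int serializeCalls;
    int deserializeCalls;

    int calls() { return serializeCalls + deserializeCalls; }

    byte[] serialize(long value) {
        serializeCalls++;
        return ByteBuffer.allocate(8).putLong(value).array(); // big-endian, 8 bytes
    }

    long deserialize(byte[] bytes) {
        deserializeCalls++;
        return ByteBuffer.wrap(bytes).getLong();
    }
}

/** Heap-style: live objects in a map; the serializer is only used when snapshotting. */
final class ObjectStore {
    private final Map<String, Long> values = new HashMap<>();

    void put(String key, Long value) { values.put(key, value); }

    Long get(String key) { return values.get(key); }

    Map<String, byte[]> snapshot(CountingLongSerializer ser) {
        Map<String, byte[]> out = new HashMap<>();
        values.forEach((k, v) -> out.put(k, ser.serialize(v)));
        return out;
    }
}

/** RocksDB-style: only bytes are kept; every put serializes and every get deserializes. */
final class ByteStore {
    private final Map<String, byte[]> values = new HashMap<>();
    private final CountingLongSerializer ser;

    ByteStore(CountingLongSerializer ser) { this.ser = ser; }

    void put(String key, long value) { values.put(key, ser.serialize(value)); }

    Long get(String key) {
        byte[] bytes = values.get(key);
        return bytes == null ? null : ser.deserialize(bytes);
    }
}
```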
Every serializer must implement snapshotConfiguration():
@Override
public TypeSerializerSnapshot<T> snapshotConfiguration() {
return new MySerializerSnapshot();
}
This snapshot is stored in checkpoint metadata and enables:
- Restore old serializer on job restart
- Compare old vs new serializer for compatibility
- Perform state migration if needed
- Validate savepoint compatibility
TypeSerializer supports object reuse to reduce GC pressure:
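// Create a reusable instance once
User reuse = serializer.createInstance();
// Reuse in a tight loop
for (int i = 0; i < numRecords; i++) {
reuse = serializer.deserialize(reuse, input);
// Mutable types are typically refilled in place and returned; immutable types
// ignore the reuse object and return a new one, so always use the return value
process(reuse);
}
This is critical for high-throughput scenarios where millions of objects are deserialized per second. Because the same instance is overwritten on every iteration, code must not hold on to it across iterations.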
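A JDK-only sketch of the reuse contract with a hypothetical mutable `Point` type and `PointSerializer`: the loop allocates a single object for all records, and the second helper shows the pitfall of keeping references to the reused instance.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class ObjectReuseSketch {
    /** Mutable record type, so a serializer can refill an existing instance. */
    static final class Point {
        int x;
        int y;
    }

    /** Hypothetical serializer offering both deserialize variants. */
    static final class PointSerializer {
        Point createInstance() { return new Point(); }

        void serialize(Point p, DataOutput out) throws IOException {
            out.writeInt(p.x);
            out.writeInt(p.y);
        }

        Point deserialize(DataInput in) throws IOException {
            return deserialize(createInstance(), in); // allocates one object per record
        }

        Point deserialize(Point reuse, DataInput in) throws IOException {
            reuse.x = in.readInt(); // overwrite fields in place, no allocation
            reuse.y = in.readInt();
            return reuse;
        }
    }

    /** Serializes n points (i, 2 * i). */
    static byte[] encode(int n) {
        PointSerializer ser = new PointSerializer();
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Point p = new Point();
            for (int i = 0; i < n; i++) {
                p.x = i;
                p.y = 2 * i;
                ser.serialize(p, out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Deserializes n records into one reused Point and returns the sum of x + y. */
    static long sumWithReuse(byte[] data, int n) {
        PointSerializer ser = new PointSerializer();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            Point reuse = ser.createInstance();
            long sum = 0;
            for (int i = 0; i < n; i++) {
                reuse = ser.deserialize(reuse, in); // always continue with the returned object
                sum += reuse.x + reuse.y;
            }
            return sum;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Pitfall: storing the reused instance keeps n references to one object. */
    static List<Point> collectWithReuse(byte[] data, int n) {
        PointSerializer ser = new PointSerializer();
        List<Point> kept = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            Point reuse = ser.createInstance();
            for (int i = 0; i < n; i++) {
                reuse = ser.deserialize(reuse, in);
                kept.add(reuse); // every entry aliases the same object
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return kept;
    }

    public static void main(String[] args) {
        byte[] data = encode(4);                                // (0,0) (1,2) (2,4) (3,6)
        System.out.println(sumWithReuse(data, 4));              // 18
        System.out.println(collectWithReuse(data, 4).get(0).x); // 3, not 0
    }
}
```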
Serializers are duplicated for thread safety:
public TypeSerializer<T> getSerializer() {
return serializerAtomicReference.get().duplicate();
}
Each caller gets its own serializer copy (stateless serializers may simply return themselves from duplicate()), so serializers can be used without synchronization.
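A JDK-only sketch of the duplicate() contract with hypothetical serializers: a stateless one returns itself, while one that keeps a reusable output buffer returns a fresh instance, so each concurrent task works on its own buffer.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DuplicateSketch {
    /** Hypothetical serializer contract reduced to serialize + duplicate. */
    interface Ser<T> {
        byte[] serialize(T value);

        Ser<T> duplicate();
    }

    /** Stateless: no mutable fields, so sharing is safe and duplicate() returns this. */
    static final class IntSer implements Ser<Integer> {
        @Override
        public byte[] serialize(Integer value) {
            return ByteBuffer.allocate(4).putInt(value).array();
        }

        @Override
        public Ser<Integer> duplicate() {
            return this;
        }
    }

    /** Stateful: reuses one output buffer, so two threads sharing it could interleave writes. */
    static final class BufferedStringSer implements Ser<String> {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        @Override
        public byte[] serialize(String value) {
            buffer.reset(); // state carried between calls
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
            buffer.write(utf8, 0, utf8.length);
            return buffer.toByteArray();
        }

        @Override
        public Ser<String> duplicate() {
            return new BufferedStringSer(); // fresh buffer for the new owner
        }
    }

    public static void main(String[] args) throws Exception {
        Ser<String> template = new BufferedStringSer();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Boolean>> results = new ArrayList<>();
        for (int t = 0; t < 4; t++) {
            Ser<String> own = template.duplicate(); // one copy per task, never shared
            String word = "task-" + t;
            results.add(pool.submit(() -> {
                for (int i = 0; i < 10_000; i++) {
                    if (!new String(own.serialize(word), StandardCharsets.UTF_8).equals(word)) {
                        return false;
                    }
                }
                return true;
            }));
        }
        for (Future<Boolean> r : results) {
            System.out.println(r.get()); // true for every task
        }
        pool.shutdown();
    }
}
```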
- TypeInformation is the type metadata + serializer factory
  - User-facing API
  - Creates TypeSerializer instances
  - Supports type inference
- TypeSerializer performs actual serialization
  - Runtime component
  - Performance-critical
  - Supports state evolution
- Flow: TypeInformation → StateDescriptor → TypeSerializer → StateBackend
  - Lazy initialization
  - Serializer attached to state metadata
  - Used during checkpointing
- State Evolution is built into the design
  - TypeSerializerSnapshot captures configuration
  - Schema compatibility checks
  - Automatic state migration (for serializers that support it)
- Backend Independence via abstraction
  - Same serializers work for Heap and RocksDB
  - Different storage strategies (objects vs bytes)
  - Same user API
// config: a SerializerConfig, e.g. executionConfig.getSerializerConfig()
// Pattern 1: Basic type
TypeInformation<Integer> intInfo = BasicTypeInfo.INT_TYPE_INFO;
TypeSerializer<Integer> intSerializer = intInfo.createSerializer(config);
// Result: IntSerializer.INSTANCE
// Pattern 2: Collection type
TypeInformation<List<String>> listInfo = new ListTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO);
TypeSerializer<List<String>> listSerializer = listInfo.createSerializer(config);
// Result: ListSerializer(StringSerializer)
// Pattern 3: POJO type
TypeInformation<User> userInfo = TypeInformation.of(User.class);
TypeSerializer<User> userSerializer = userInfo.createSerializer(config);
// Result: PojoSerializer with field serializers
// (if User does not satisfy the POJO rules, Flink falls back to GenericTypeInfo and Kryo)
// Pattern 4: State descriptor
ValueStateDescriptor<Long> descriptor =
new ValueStateDescriptor<>("my-state", BasicTypeInfo.LONG_TYPE_INFO);
// Serializer created lazily when the state is first registered
- Use TypeInformation for state descriptors
  - Don't create serializers manually unless necessary
  - Let Flink manage serializer lifecycle
- Prefer built-in types
  - BasicTypeInfo for primitives and common types
  - ListTypeInfo, MapTypeInfo for collections
  - POJOs for custom types (if possible)
- Design POJOs carefully
  - Public class; top-level or static nested (not a non-static inner class)
  - Public no-arg constructor
  - Fields public or accessible through getters/setters
  - Avoid generic type parameters where possible (erased types can force a Kryo fallback)
  - Think about schema evolution
- Test state migration
  - Create savepoint with v1
  - Restore with v2
  - Verify compatibility
- Avoid breaking changes
  - Don't change type classes or the declared types of existing POJO fields
  - Don't rename or move POJO classes
  - When adding fields, expect restored records to hold the Java default (null, 0, false)
Core Classes:
- TypeInformation: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
- TypeSerializer: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
- StateDescriptor: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
Implementations:
- BasicTypeInfo: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
- IntSerializer: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java
- ListSerializer: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
State Backend:
- AbstractKeyedStateBackend: flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
- HeapKeyedStateBackend: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
- HeapValueState: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java
This document provides an in-depth understanding of how Flink's state serialization and deserialization mechanism works through the TypeInformation and TypeSerializer abstractions. The design enables efficient state management, type safety, and schema evolution while maintaining clean separation between API and runtime layers.