Gson(de) 序列化器

此示例使用 gson 庫將 java 物件對映到 json 字串。 (de)序列化器是通用的,但它們並不總是需要!

序列

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

用法

序列化程式通過所需的 key.serializervalue.serializer 生產者屬性定義。

假設我們有一個名為 SensorValue 的 POJO 類,我們想要生成沒有任何鍵的訊息(鍵設定為 null):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

key.serializer 是必需的配置。由於我們沒有指定訊息金鑰,我們保留 StringSerializer 隨附 kafka,它能夠處理 null)。

解串器

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }

    @Override
    public void close() {}
}

用法

反序列化器通過所需的 key.deserializervalue.deserializer 消費者屬性來定義。

假設我們有一個名為 SensorValue 的 POJO 類,我們想要生成沒有任何鍵的訊息(鍵設定為 null):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

在這裡,我們為消費者配置新增一個自定義屬性,即 CONFIG_VALUE_CLASSGsonDeserializer 將在 configure() 方法中使用它來確定它應該處理的 POJO 類(新增到 props 的所有屬性將以地圖的形式傳遞給 configure 方法)。