消息要在网络上进行传输,必须进行序列化,序列化器的作用就是如此。 Kafka默认提供了多种种序列化器,包括字符串序列化器(org.apache.kafka.common.serialization.StringSerializer)、数值序列化器(org.apache.kafka.common.serialization.ShortSerializer、org.apache.kafka.common.serialization.IntegerSerializer、org.apache.kafka.common.serialization.LongSerializer、org.apache.kafka.common.serialization.FloatSerializer、org.apache.kafka.common.serialization.DoubleSerializer)和字节序列化器(ByteArraySerializer、ByteBufferSerializer)等。 在日常工作中,以上序列化器满足大多场景的使用,如果要满足特殊需求,可能通过实现Serializable接口来自定义序列化器。
Java代码实现 Person.class
package com.example.kafkaproducer; import lombok.Value; /** * @author Admin */ // Lombok注解 @Value public class Person { String name; String address; }PersonSerializer.class
package com.example.kafkaproducer; import org.apache.kafka.common.serialization.Serializer; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.Map; /** * @author Admin */ public class PersonSerializer implements Serializer<Person> { @Override public void configure(Map<String, ?> map, boolean b) { } @Override public byte[] serialize(String s, Person person) { if (person == null) { return null; } // 定义name和address的数组 byte[] name, address; try { // 将Person的name属性转换为数组 if (person.getName()!= null) { name = person.getName().getBytes("UTF-8"); } else { name = new byte[0]; } // 将Person的address属性转换为数组 if (person.getAddress()!= null) { address = person.getAddress().getBytes("UTF-8"); } else { address = new byte[0]; } //使用ByteBuffer拼接数组 ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length); buffer.putInt(name.length); buffer.put(name); buffer.putInt(address.length); buffer.put(address); // 返回buffer的数组对象 return buffer.array(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return new byte[0]; } @Override public void close() { } }