教育行业A股IPO第一股(股票代码 003032)

全国咨询/投诉热线:400-618-4000

Kafka怎样创建事务编程?【完整代码编写和测试流程】

更新时间:2022年01月12日15时35分 来源:传智教育 浏览次数:

需求

在Kafka的topic 「ods_user」中有一些用户数据,数据格式如下:

| 姓名,性别,出生日期

| 张三,1,1980-10-09

| 李四,0,1985-11-01

我们需要编写程序,将用户的性别转换为男、女(1-男,0-女),转换后将数据写入到topic 「dwd_user」中。要求使用事务保障,要么消费了数据同时写入数据到 topic,提交offset。要么全部失败。

启动生产者控制台程序模拟数据

# 创建名为ods_user和dwd_user的主题
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic ods_user
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic dwd_user
# 生产数据到 ods_user
bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic ods_user
# 从dwd_user消费数据
bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic dwd_user 
--from-beginning  --isolation-level read_committed

编写创建消费者代码

编写一个方法 createConsumer,该方法中返回一个消费者,订阅「ods_user」主题。注意:需要配置事务隔离级别、关闭自动提交。

实现步骤:

1. 创建Kafka消费者配置

Properties props = new Properties();
 props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
 props.setProperty("group.id", "ods_user");
 props.put("isolation.level","read_committed");
 props.setProperty("enable.auto.commit", "false");
 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

2. 创建消费者,并订阅 ods_user 主题

//1.创建消费者
publicstaticConsumer<String,String>createConsumer(){
//1.创建Kafka消费者配置
Propertiesprops=newProperties();
props.setProperty("bootstrap.servers","node1.itcast.cn:9092");
props.setProperty("group.id","ods_user");
props.put("isolation.level","read_committed");
props.setProperty("enable.auto.commit","false");
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

//2.创建Kafka消费者
KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

//3.订阅要消费的主题
consumer.subscribe(Arrays.asList("ods_user"));

returnconsumer;
}

编写创建生产者代码

编写一个方法 createProducer,返回一个生产者对象。注意:需要配置事务的id,开启了事务会默认开启幂等性。

1. 创建生产者配置

Propertiesprops=newProperties();
props.put("bootstrap.servers","node1.itcast.cn:9092");
props.put("transactional.id","dwd_user");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

2.创建生产者对象

publicstaticProducer < String, String > createProduceer() {
    //1.创建生产者配置
    Propertiesprops = newProperties();
    props.put("bootstrap.servers", "node1.itcast.cn:9092");
    props.put("transactional.id", "dwd_user");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    //2.创建生产者
    Producer < String, String > producer = newKafkaProducer < > (props);
    returnproducer;
}

编写代码消费并生产数据

实现步骤:

1. 调用之前实现的方法,创建消费者、生产者对象

2. 生产者调用initTransactions初始化事务

3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic

(1) 生产者开启事务

(2) 消费者拉取消息

(3) 遍历拉取到的消息,并进行预处理(将1转换为男,0转换为女)

(4) 生产消息到dwd_user topic中

(5) 提交偏移量到事务中

(6) 提交事务

(7) 捕获异常,如果出现异常,则取消事务

publicstaticvoidmain(String[] args) {
    Consumer < String, String > consumer = createConsumer();
    Producer < String, String > producer = createProducer();
    //初始化事务
    producer.initTransactions();
    while (true) {
        try {
            //1.开启事务
            producer.beginTransaction();
            //2.定义Map结构,用于保存分区对应的offset
            Map < TopicPartition, OffsetAndMetadata > offsetCommits = newHashMap < > ();
            //2.拉取消息
            ConsumerRecords < String, String > records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord < String, String > record: records) {
                //3.保存偏移量
                offsetCommits.put(newTopicPartition(record.topic(), record.partition()), newOffsetAndMetadata(record.offset() + 1));
                //4.进行转换处理
                String[] fields = record.value().split(",");
                fields[1] = fields[1].equalsIgnoreCase("1") ? "男" : "女";
                Stringmessage = fields[0] + "," + fields[1] + "," + fields[2];
                //5.生产消息到dwd_user
                producer.send(newProducerRecord < > ("dwd_user", message));
            }
            //6.提交偏移量到事务
            producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
            //7.提交事务
            producer.commitTransaction();
        } catch (Exceptione) {
            //8.放弃事务
            producer.abortTransaction();
        }
    }
}

测试

往之前启动的console-producer中写入消息进行测试,同时检查console-consumer是否能够接收到消息:

kafka 创建事务编程

逐个测试一下消息:

  • 张三,1,1980-10-09
  • 李四,0,1985-11-01
//3.保存偏移量
offsetCommits.put(newTopicPartition(record.topic(),record.partition()),
       newOffsetAndMetadata(record.offset()+1));
//4.进行转换处理
String[]fields=record.value().split(",");
fields[1]=fields[1].equalsIgnoreCase("1")?"男":"女";
Stringmessage=fields[0]+","+fields[1]+","+fields[2];

//模拟异常
inti=1/0;

//5.生产消息到dwd_user
producer.send(newProducerRecord<>("dwd_user",message));

启动程序一次,抛出异常。

再启动程序一次,还是抛出异常。

直到我们处理该异常为止。

我们发现,可以消费到消息,但如果中间出现异常的话,offset是不会被提交的,除非消费、生产消息都成功,才会提交事务。






猜你喜欢:

怎样一键启动或关闭Kafka?有快捷的方法吗?

为什么选择kafka采集数据?

Kafka的常用API介绍[大数据培训]

SparkStreaming连接Kafka两种方式

传智教育python+大数据开发培训

0 分享到:
和我们在线交谈!