kafka消费者消息写文件

tech2024-05-14  82

package com.jasongj.kafka.consumer; import java.io.*; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.serialization.StringDeserializer; public class WriteBlock { public static void main(String[] args) throws Exception { args = new String[]{"localhost:9092", "topic1", "test-consumer-group", "consumer3"}; if (args == null || args.length != 4) { System.err.println( "Usage:\n\tjava -jar kafka_consumer.jar ${bootstrap_server} ${topic_name} ${group_name} ${client_id}"); System.exit(1); } String bootstrap = args[0]; String topic = args[1]; String groupid = args[2]; String clientid = args[3]; Properties props = new Properties(); props.put("bootstrap.servers", bootstrap); props.put("group.id", groupid); props.put("enable.auto.commit", "false"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("max.poll.interval.ms", "300000"); props.put("max.poll.records", "500"); props.put("auto.offset.reset", "earliest"); props.put("deserializer.encoding", "UTF-8"); org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); records.partitions().forEach(topicPartition -> { List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition); partitionRecords.forEach(record -> { String temp = record.value()+"\n\r"; try { writeFile( temp); } catch (IOException e) { e.printStackTrace(); } temp = ""; }); long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(lastOffset + 1))); }); } } public static synchronized void writeFile( String conent) throws IOException { Date now = new Date(); SimpleDateFormat dateFormatMinute= new SimpleDateFormat("yyyy-MM-dd-HH"); String hour = dateFormatMinute.format( now ); SimpleDateFormat dateFormatSecond= new SimpleDateFormat("yyyy-MM-dd-HH-mm"); String minute = dateFormatSecond.format( now ); File dir = new File("d:\\log_dir\\"+hour+"\\"); File file = new File("d:\\log_dir\\"+hour+"\\"+minute); //判断日期文件夹是否存在,不存在的话创建 if (!dir.isDirectory()) { dir.mkdir(); } //判断文件是否存在,不存在的话创建 if (!file.exists()) { try { file.createNewFile(); } catch (IOException e) { e.printStackTrace(); } } BufferedWriter out = null; try { out = new BufferedWriter(new OutputStreamWriter( new FileOutputStream(file, true))); out.write(conent); } catch (Exception e) { e.printStackTrace(); } finally { try { out.close(); } catch (IOException e) { e.printStackTrace(); } } } } package com.jasongj.kafka.consumer; import java.io.*; import java.text.SimpleDateFormat; import java.util.*; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; public class WriteBlock { private static final Logger logger = LogManager.getLogger(WriteBlock.class); public static void main(String[] args) throws Exception { //args = new String[] { "kafka0:9092", "topic1", "group2", "consumer2" }; //args = new String[]{"localhost:9092", "topic1", "test-consumer-group", "consumer3"}; if (args == null || args.length != 4) { System.err.println( "Usage:\n\tjava -jar kafka_consumer.jar ${bootstrap_server} ${topic_name} ${group_name} ${client_id}"); System.exit(1); } String bootstrap = args[0]; String topic = args[1]; String groupid = args[2]; String clientid = args[3]; Properties props = new Properties(); props.put("bootstrap.servers", bootstrap); props.put("group.id", groupid); props.put("enable.auto.commit", "false"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); props.put("max.poll.interval.ms", "300000"); props.put("max.poll.records", "500"); props.put("auto.offset.reset", "earliest"); props.put("deserializer.encoding", "UTF-8"); org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); records.partitions().forEach(topicPartition -> { List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition); partitionRecords.forEach(record -> { String temp = record.value()+"\n\r"; //System.out.println(temp); try { writeFile( temp); } catch (IOException e) { logger.info(e.getMessage()); } temp = ""; }); long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(lastOffset + 1))); }); } } public static synchronized void writeFile( String content) throws IOException { Date now = new Date(); SimpleDateFormat dateFormatHour= new SimpleDateFormat("yyyy-MM-dd-HH"); String hour = dateFormatHour.format( now ); SimpleDateFormat dateFormatMinute= new SimpleDateFormat("yyyy-MM-dd-HH-mm"); String minute = dateFormatMinute.format( now ); File dir = new File("/home/ubuntu/writeBlock/"+hour+"/"); File file = new File("/home/ubuntu/writeBlock/"+hour+"/"+minute); //File dir = new File("d:\\log_dir\\"+hour+"\\"); //File file = new File("d:\\log_dir\\"+hour+"\\"+minute); //判断日期文件夹是否存在,不存在的话创建 if (!dir.isDirectory()) { dir.mkdir(); } //判断文件是否存在,不存在的话创建 if (!file.exists()) { file.createNewFile(); } BufferedWriter out = new BufferedWriter(new OutputStreamWriter( new FileOutputStream(file, true))); out.write(content); out.close(); } }
最新回复(0)