package com.jasongj.kafka.consumer;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;

public class WriteBlock {

    public static void main(String[] args) throws Exception {
        // Hard-coded arguments for local testing; remove this line to use the real command-line arguments.
        args = new String[]{"localhost:9092", "topic1", "test-consumer-group", "consumer3"};
        if (args == null || args.length != 4) {
            System.err.println(
                    "Usage:\n\tjava -jar kafka_consumer.jar ${bootstrap_server} ${topic_name} ${group_name} ${client_id}");
            System.exit(1);
        }
        String bootstrap = args[0];
        String topic = args[1];
        String groupid = args[2];
        String clientid = args[3];

        // Auto-commit is disabled; offsets are committed manually per partition
        // after the records have been written to disk.
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", groupid);
        props.put("client.id", clientid);
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("max.poll.interval.ms", "300000");
        props.put("max.poll.records", "500");
        props.put("auto.offset.reset", "earliest");
        props.put("deserializer.encoding", "UTF-8");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.partitions().forEach(topicPartition -> {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                partitionRecords.forEach(record -> {
                    String temp = record.value() + "\n\r";
                    try {
                        writeFile(temp);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
                // Commit the offset of the next record to consume for this partition.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(topicPartition,
                        new OffsetAndMetadata(lastOffset + 1)));
            });
        }
    }
    public static synchronized void writeFile(String content) throws IOException {
        Date now = new Date();
        SimpleDateFormat dateFormatHour = new SimpleDateFormat("yyyy-MM-dd-HH");
        String hour = dateFormatHour.format(now);
        SimpleDateFormat dateFormatMinute = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
        String minute = dateFormatMinute.format(now);

        // One directory per hour, one file per minute; records are appended to the minute file.
        File dir = new File("d:\\log_dir\\" + hour + "\\");
        File file = new File("d:\\log_dir\\" + hour + "\\" + minute);
        if (!dir.isDirectory()) {
            dir.mkdirs();
        }
        if (!file.exists()) {
            try {
                file.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        BufferedWriter out = null;
        try {
            out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true)));
            out.write(content);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

package com.jasongj.kafka.consumer;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class WriteBlock {

    private static final Logger logger = LogManager.getLogger(WriteBlock.class);

    public static void main(String[] args) throws Exception {
        if (args == null || args.length != 4) {
            System.err.println(
                    "Usage:\n\tjava -jar kafka_consumer.jar ${bootstrap_server} ${topic_name} ${group_name} ${client_id}");
            System.exit(1);
        }
        String bootstrap = args[0];
        String topic = args[1];
        String groupid = args[2];
        String clientid = args[3];

        // Auto-commit is disabled; offsets are committed manually per partition
        // after the records have been written to disk.
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", groupid);
        props.put("client.id", clientid);
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("max.poll.interval.ms", "300000");
        props.put("max.poll.records", "500");
        props.put("auto.offset.reset", "earliest");
        props.put("deserializer.encoding", "UTF-8");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.partitions().forEach(topicPartition -> {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
                partitionRecords.forEach(record -> {
                    String temp = record.value() + "\n\r";
                    try {
                        writeFile(temp);
                    } catch (IOException e) {
                        logger.info(e.getMessage());
                    }
                });
                // Commit the offset of the next record to consume for this partition.
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(topicPartition,
                        new OffsetAndMetadata(lastOffset + 1)));
            });
        }
    }
    public static synchronized void writeFile(String content) throws IOException {
        Date now = new Date();
        SimpleDateFormat dateFormatHour = new SimpleDateFormat("yyyy-MM-dd-HH");
        String hour = dateFormatHour.format(now);
        SimpleDateFormat dateFormatMinute = new SimpleDateFormat("yyyy-MM-dd-HH-mm");
        String minute = dateFormatMinute.format(now);

        // One directory per hour, one file per minute; records are appended to the minute file.
        File dir = new File("/home/ubuntu/writeBlock/" + hour + "/");
        File file = new File("/home/ubuntu/writeBlock/" + hour + "/" + minute);
        if (!dir.isDirectory()) {
            dir.mkdirs();
        }
        if (!file.exists()) {
            file.createNewFile();
        }

        // try-with-resources ensures the writer is closed even if write() throws.
        try (BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true)))) {
            out.write(content);
        }
    }
}
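
/*
 * Example invocation, a minimal sketch: it assumes the consumer has been packaged
 * as kafka_consumer.jar and that the broker, topic and group below exist (the
 * values match the hard-coded test arguments in the first version above).
 *
 *   java -jar kafka_consumer.jar localhost:9092 topic1 test-consumer-group consumer3
 *
 * The four positional arguments map to bootstrap.servers, the subscribed topic,
 * group.id and client.id respectively.
 *
 * The class below is an illustrative test producer, not part of the original code.
 * It is a sketch that assumes the same broker and topic names as the example
 * invocation, and simply publishes a handful of records for the consumer to pick up.
 */
package com.jasongj.kafka.consumer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class WriteBlockTestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker address; change to match the bootstrap server passed to the consumer.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // KafkaProducer is Closeable, so try-with-resources flushes and closes it on exit.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // "topic1" is the assumed topic name from the example invocation above.
                producer.send(new ProducerRecord<>("topic1", "message-" + i));
            }
        }
    }
}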