For setting up a Kafka environment on Windows, see the article "Windows上搭建Kafka运行环境".
All sample code in this post is written against Kafka 0.8.0.
See the official documentation: http://kafka.apache.org/08/documentation.html
The official example is fairly formal and uses threads and a thread pool to demonstrate usage, which looks complicated at first glance and is not very intuitive for someone new to Kafka. My approach when learning was to focus on the essentials: how Kafka publishes messages to a queue and receives them. That is what this post concentrates on.
Official example code
The sample code below is trimmed down from the official example.
Producer (publishes messages):
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TmKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list: host:port of the Kafka broker(s)
        props.put("metadata.broker.list", "127.0.0.1:9092");
        // Serializer class for message values
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Serializer class for message keys
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // -1: wait until the write is acknowledged by all in-sync replicas
        props.put("request.required.acks", "-1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        // topic "test", key "1", value "def"
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "1", "def");
        producer.send(data);
        producer.close();
    }
}
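The message above carries a key ("1"), so the producer's default partitioner decides which partition of the topic it lands on. As a rough sketch (the class name and methods here are our own; in Kafka 0.8 the actual logic lives in kafka.producer.DefaultPartitioner), the default mapping is abs(key.hashCode()) % numPartitions:

```java
// Sketch of the default key-to-partition mapping in Kafka 0.8.
// PartitionSketch and its methods are illustrative names, not Kafka API.
public class PartitionSketch {

    // Mirrors Kafka's Utils.abs: clears the sign bit instead of negating,
    // so Integer.MIN_VALUE does not overflow to a negative value.
    static int abs(int n) {
        return n & 0x7fffffff;
    }

    // Default partitioner behavior: hash the key, take it modulo
    // the number of partitions of the topic.
    static int partitionFor(String key, int numPartitions) {
        return abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // With e.g. 2 partitions, the key "1" always maps to the same partition.
        System.out.println(partitionFor("1", 2));
    }
}
```

Because the mapping is deterministic, messages with the same key always go to the same partition, which preserves their relative order.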
Consumer (receives messages):
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TmKafkaConsumer {
    public static void main(String[] args) {
        String topic = "test";
        Properties props = new Properties();
        // ZooKeeper address used for consumer coordination
        props.put("zookeeper.connect", "192.168.1.45:2181");
        // Consumer group id
        props.put("group.id", "jd-group");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConnector consumer =
                kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Request one stream for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            // hasNext() blocks until a new message arrives,
            // so this loop keeps printing messages indefinitely
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        }
    }
}
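The official example mentioned in the introduction wraps each KafkaStream in its own thread via a thread pool, which is what makes it look complex. That pattern can be sketched without any Kafka dependency, using a plain Runnable as a stand-in for the per-stream consume loop (the class name, runStreams method, and placeholder worker are our own):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the thread-per-stream pattern from the official example.
public class ThreadedConsumerSketch {

    // Launch one worker per stream, as the official example does with
    // KafkaStream; here each worker just records that it ran.
    static List<String> runStreams(int numStreams) {
        final List<String> started = new CopyOnWriteArrayList<String>();
        ExecutorService executor = Executors.newFixedThreadPool(numStreams);
        for (int i = 0; i < numStreams; i++) {
            final int streamId = i;
            executor.submit(new Runnable() {
                public void run() {
                    // a real worker would loop over stream.iterator() here
                    started.add("stream " + streamId);
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return started;
    }

    public static void main(String[] args) {
        // numStreams would match the value put into topicCountMap
        System.out.println(runStreams(2));
    }
}
```

The pool size matches the stream count requested in topicCountMap, so each stream gets a dedicated thread and the blocking hasNext() loops do not starve one another.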
Full example source code: TM-kafka-demo