java基于kafka实现消息队列

分类:软件编程
阅读:573
作者:majingjing
发布:2017-05-19 16:16

kafka的windows环境搭建请参考文章 Windows上搭建Kafka运行环境

本示例代码都是在kafka版本0.8.0基础上编写

查看官网说明 http://kafka.apache.org/08/documentation.html

由于官方给出的代码示例比较正式,使用到了线程和线程池来演示.乍一看还是很复杂的,对于刚开始接触kafka来说并不是很直观. 我学习的步骤就是看下kafka是如何完成消息队列的推送和接收,这个才是我关注的重点.

官方示例代码

QQ截图20170519171859.png

QQ截图20170519171950.png

下面给出示例代码,在官方示例代码的基础上进行删减

负责发布消息代码
public class TmKafkaProducer {

	public static void main(String[] args) {
		Properties props = new Properties();
		// 此处配置的是kafka的端口
		props.put("metadata.broker.list", "127.0.0.1:9092");

		// 配置value的序列化类
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		// 配置key的序列化类
		props.put("key.serializer.class", "kafka.serializer.StringEncoder");

		// request.required.acks
		props.put("request.required.acks", "-1");

		ProducerConfig config = new ProducerConfig(props);

		Producer<String, String> producer = new Producer<String, String>(config);

		KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "1", "def");
		producer.send(data);
		producer.close();
	}
}
消息接收者
public class TmKafkaConsumer {

	public static void main(String[] args) {
		String topic = "test";

		Properties props = new Properties();
		props.put("zookeeper.connect", "192.168.1.45:2181");
		props.put("group.id", "jd-group");
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");

		ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, new Integer(1));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

		for (final KafkaStream<byte[], byte[]> stream : streams) {
			ConsumerIterator<byte[], byte[]> it = stream.iterator();
			while (it.hasNext()) {
				System.out.println(new String(it.next().message()));
			}
		}

		try {
			Thread.sleep(10000);
		} catch (InterruptedException ie) {

		}
	}
}

示例源代码地址: TM-kafka-demo