Kafka安装部署及Java实现发送接收消息

 

1、需要先安装jdk,安装kafka

curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz

tar zxvf kafka_2.10-0.9.0.0.tgz

cd kafka_2.10-0.9.0.0.tgz

2、Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台

bin/zookeeper-server-start.sh config/zookeeper.properties &

3、启动Kafka:

bin/kafka-server-start.sh config/server.properties

4、运行producer 模拟消息生产者,此时可以在控制台输入消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

5、运行consumer,模拟消费者,此时可以在控制台查看接收到的消息

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

6、创建一个maven项目,添加依赖

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.1</version> </dependency>
7、创建配置类,主要是一些配置信息
public interface KafkaProperties { /** * 服务器相关信息配置 */ final static String zkConnect = "139.159.225.241:2181"; final static String groupId = "group1"; final static String topic = "topic1"; final static String kafkaServerURL = "kafka.host"; final static int kafkaServerPort = 9092; final static int kafkaProducerBufferSize = 64 * 1024; final static int connectionTimeOut = 20000; final static int reconnectInterval = 10000; final static String topic2 = "topic2"; final static String topic3 = "topic3"; final static String clientId = "SimpleConsumerDemoClient"; }
8、创建消息生产者,产生消息,发送消息
import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import java.util.Properties;/** * 消息生产者 */ public class kafkaProducer extends Thread { private final kafka.javaapi.producer.Producer<Integer, String> producer; private final String topic; private final Properties props = new Properties(); public kafkaProducer(String topic) { props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "139.159.225.241:9092"); //多个分开写 producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props)); this.topic = topic; } @Override public void run() { int messageNo = 1; while (true) { String messageStr = new String("Message_" + messageNo); System.out.println("Send:" + messageStr); producer.send(new KeyedMessage<Integer, String>(topic, messageStr)); messageNo++; try { sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
9、创建消息消费者,处理消息
/** * 消息消费者 */ public class KafkaConsumer extends Thread { private final ConsumerConnector consumer; private final String topic;public KafkaConsumer(String topic) { consumer = kafka.consumer.Consumer.createJavaConsumerConnector( createConsumerConfig()); this.topic = topic; } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); props.put("zookeeper.connect", KafkaProperties.zkConnect); props.put("group.id", KafkaProperties.groupId); props.put("zookeeper.session.timeout.ms", "40000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } @Override public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0); ConsumerIterator<byte[], byte[]> it = stream.iterator(); while (it.hasNext()) { System.out.println("receive:" + new String(it.next().message())); try { sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
10、启动测试kafka
public class KafkaConsumerProducerDemo { public static void main(String[] args) { kafkaProducer producerThread = new kafkaProducer(KafkaProperties.topic); producerThread.start(); KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic); consumerThread.start(); } }
常用命令:

启动Zookeeper server:

bin/zookeeper-server-start.sh config/zookeeper.properties &

&是为了能退出命令行

启动Kafka server:

bin/kafka-server-start.sh config/server.properties &

停止Kafka server

bin/kafka-server-stop.sh

停止Zookeeper server:

bin/zookeeper-server-stop.sh