Kafka
Published: 2019-06-19


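What is Kafka

  • Kafka is a distributed messaging system written in Scala (with parts in Java). It is multi-partition, multi-replica, and coordinated through ZooKeeper.
  • Kafka is a distributed stream-processing platform that offers high throughput, durable storage, horizontal scalability, and support for streaming data.
  • Kafka plays three main roles: messaging system, storage system, and stream-processing platform.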

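Basic concepts

  • Producer: a client that publishes messages to Kafka topics.
  • Consumer: a client that subscribes to topics and reads messages from them.
  • Broker: a Kafka server node; several brokers together form a Kafka cluster.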

Installing Kafka with Docker

  • Install docker-compose
sudo curl -L https://github.com/docker/compose/releases/download/1.16.1/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
# DaoCloud mirror (alternative download source)
# curl -L https://get.daocloud.io/docker/compose/releases/download/1.22.0/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
docker-compose --version

Pull the ZooKeeper and Kafka images

docker pull wurstmeister/kafka:2.12-2.2.0
docker pull wurstmeister/zookeeper:3.4.6

Start the ZooKeeper cluster and the Kafka cluster

The contents of docker-compose.yml are as follows:

version: '3.1'
services:
  zoo1:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo1
    container_name: zoo1
    #domainname:
    ports:
      - 2181:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo1/data:/data
      - /usr/local/docker_app/zookeeper/zoo1/datalog:/datalog
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  zoo2:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo2
    container_name: zoo2
    ports:
      - 2182:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo2/data:/data
      - /usr/local/docker_app/zookeeper/zoo2/datalog:/datalog
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  zoo3:
    image: wurstmeister/zookeeper:3.4.6
    restart: always
    hostname: zoo3
    container_name: zoo3
    ports:
      - 2183:2181
    volumes:
      - /usr/local/docker_app/zookeeper/zoo3/data:/data
      - /usr/local/docker_app/zookeeper/zoo3/datalog:/datalog
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888
  kafka1:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 1
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka1
    hostname: kafka1
  kafka2:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9093:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka2
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 2
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka2
    hostname: kafka2
  kafka3:
    image: wurstmeister/kafka:2.12-2.2.0
    ports:
      - "9094:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka3
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2181,zoo3:2181"
      KAFKA_BROKER_ID: 3
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    container_name: kafka3
    hostname: kafka3

Start the cluster

# Start the cluster
docker-compose -f docker-compose.yml up -d
# Check the startup status
docker-compose -f docker-compose.yml ps

[Image: cluster status after startup]

Test the cluster

# Terminal 1: create the topic, describe it, then start a console consumer
docker exec -it kafka1 /bin/bash
kafka-topics.sh --zookeeper zoo1:2181 --create --topic topic-demo --replication-factor 1 --partitions 2
kafka-topics.sh --zookeeper zoo1:2181 --describe --topic topic-demo
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic topic-demo

# Terminal 2: start a console producer and type a few messages
docker exec -it kafka1 /bin/bash
kafka-console-producer.sh --broker-list kafka1:9092 --topic topic-demo
>hello
>hello kafka

[Images: screenshots of the test session]

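Connecting to Kafka with the Java client

  • Spring Boot project, pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>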
  • Java code
package com.example.demo;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaApplication {

    // Broker address and topic created in the cluster test above
    public static final String brokerList = "kafka1:9092";
    public static final String topic = "topic-demo";

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApplication.class, args);

        // Minimal producer configuration: bootstrap servers plus String serializers
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, renchenglin");
        try {
            // send() is asynchronous; close() below flushes any pending records
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

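Pitfalls encountered

When starting Kafka, I first replaced

KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092

with

KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

With that setting, the tests worked on the virtual machine where Docker is installed (IP 192.168.31.109), but a Java program running on the host (the physical machine that runs the Docker VM, IP 192.168.31.201) could not reach Kafka.

I then changed the settings back to

KAFKA_ADVERTISED_HOST_NAME: kafka3
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092

and the Java program still could not connect. A Kafka client reconnects to whatever address each broker advertises, so the advertised host name has to be resolvable from the client machine. What finally worked was adding name resolution on the physical machine: appending the following entries to C:\Windows\System32\drivers\etc\hosts fixed the problem.

192.168.31.109 kafka1
192.168.31.109 kafka2
192.168.31.109 kafka3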
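
A quick way to confirm this kind of problem from the client machine is to check whether each advertised broker name resolves and whether its port accepts a TCP connection. The following is a minimal sketch using only the JDK; the class name BrokerCheck and its helper methods are hypothetical, and the host names and port 9092 are assumed from the advertised listeners shown earlier.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Optional;

// Hypothetical diagnostic helper, not part of Kafka or Spring.
public class BrokerCheck {

    /** Returns the resolved IP address, or empty if the name does not resolve. */
    public static Optional<String> resolve(String host) {
        try {
            return Optional.of(InetAddress.getByName(host).getHostAddress());
        } catch (UnknownHostException e) {
            return Optional.empty();
        }
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolved names, refused connections, and timeouts
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: names and port taken from the advertised listeners above
        String[] brokers = {"kafka1", "kafka2", "kafka3"};
        int port = 9092;
        for (String broker : brokers) {
            Optional<String> ip = resolve(broker);
            if (!ip.isPresent()) {
                System.out.println(broker + ": does not resolve, check the hosts file");
            } else {
                String state = canConnect(broker, port, 2000) ? "reachable" : "not reachable";
                System.out.println(broker + " -> " + ip.get() + ", port " + port + " " + state);
            }
        }
    }
}
```

If a name does not resolve, the hosts file entry is missing or misspelled. If it resolves but the port is not reachable, the likely suspects are the port mapping or a firewall rather than name resolution.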

Reposted from: http://pittx.baihongyu.com/
