kafka集群部署k8s(k8s基于Kuboard部署)

tech2022-10-31  132

1、下载镜像

docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka

2、启动zookeeper

注意事项

访问方式选择Headless, StatefulSet 中每个 Pod 的网络标识,在kafka访问zookeeper的时候可直接用zookeeper的Pod 的网络标识需要创建数据卷 Volume,应为zookeeper集群每个zookeeper需要自己的myid配置zookeeper配置文件,需要注意的是server.1=0.0.0.0:2888:3888配置,本机需要配置成0.0.0.0。不然会有一个报错,然后就是myid每个zookeeper不一样,其余的全一样

zookeeper配置文件

tickTime=2000

# The number of ticks that the initial

# synchronization phase can take

initLimit=10

# The number of ticks that can pass between

# sending a request and getting an acknowledgement

syncLimit=5

# the directory where the snapshot is stored.

# do not use /tmp for storage, /tmp here is just

# example sakes.

dataDir=/opt/zookeeper-3.4.13/data

# the port at which the clients will connect

clientPort=2181

# the maximum number of client connections.

# increase this if you need to handle more clients

#maxClientCnxns=60

#

# Be sure to read the maintenance section of the

# administrator guide before turning on autopurge.

#

# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

#

# The number of snapshots to retain in dataDir

autopurge.snapRetainCount=3

# Purge task interval in hours

# Set to "0" to disable auto purge feature

autopurge.purgeInterval=1

server.1=0.0.0.0:2888:3888

server.2=svc-zk02:2888:3888

server.3=svc-zk03:2888:3888

myid=1  

3、启动kafka

启动kafka需要映射端口服务,便于外网链接

配置文件参数

BROKER_ID每个kafka是不一样的

KAFKA_BROKER_ID=0

链接zookeeper地址

KAFKA_ZOOKEEPER_CONNECT=svc-zk01:2181,svc-zk02:2181,svc-zk03:2181

ADVERTISED_LISTENERS主要注意,这里填写node节点ip和9092默认端口映出来的端口号

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.158.160:31457

LISTENERS也要注意直接:9092即可,我试过填写其他地址不是服务起不来,就是各种报错

KAFKA_LISTENERS=PLAINTEXT://:9092

 

 kafka配置文件

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements.  See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License.  You may obtain a copy of the License at # #    http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker. broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from  # java.net.InetAddress.getCanonicalHostName() if not configured. #   FORMAT: #     listeners = listener_name://host_name:port #   EXAMPLE: #     listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,  # it uses the value for "listeners" if configured.  Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). advertised.listeners=PLAINTEXT://172.17.158.160:31457

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files log.dirs=/kafka/kafka-logs-svc-kafka-645654f984-mxlvl

# The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: #    1. Durability: Unflushed data may be lost if you are not using replication. #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=svc-zk01:2181,svc-zk02:2181,svc-zk03:2181

# Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0

port=9092

 kafka.yaml   可根据自己需求需改,这是一个kafka服务的

--- apiVersion: apps/v1 kind: Deployment metadata:   annotations:     deployment.kubernetes.io/revision: '22'     k8s.eip.work/displayName: kafka     k8s.eip.work/ingress: 'false'     k8s.eip.work/service: NodePort     k8s.eip.work/workload: svc-kafka   creationTimestamp: '2020-08-27T01:58:50Z'   generation: 27   labels:     k8s.eip.work/layer: svc     k8s.eip.work/name: svc-kafka   managedFields:     - apiVersion: apps/v1       fieldsType: FieldsV1       fieldsV1:         'f:metadata': {}         'f:spec':           'f:selector': {}           'f:strategy': {}           'f:template':             'f:metadata': {}             'f:spec':               'f:containers':                 'k:{"name":"kafka"}':                   'f:env': {}               'f:imagePullSecrets': {}       manager: Mozilla       operation: Update       time: '2020-09-03T02:20:02Z'     - apiVersion: apps/v1       fieldsType: FieldsV1       fieldsV1:         'f:metadata': {}         'f:status':           'f:conditions': {}       manager: kube-controller-manager       operation: Update       time: '2020-09-03T02:20:03Z'   name: svc-kafka   namespace: kafka   resourceVersion: '29536513'   selfLink: /apis/apps/v1/namespaces/kafka/deployments/svc-kafka   uid: a32857de-452e-4e80-b6aa-1439addeacfe spec:   progressDeadlineSeconds: 600   replicas: 1   revisionHistoryLimit: 10   selector:     matchLabels:       k8s.eip.work/layer: svc       k8s.eip.work/name: svc-kafka   strategy:     rollingUpdate:       maxSurge: 25%       maxUnavailable: 25%     type: RollingUpdate   template:     metadata:       labels:         k8s.eip.work/layer: svc         k8s.eip.work/name: svc-kafka     spec:       containers:         - env:             - name: KAFKA_BROKER_ID               value: '0'             - name: KAFKA_ZOOKEEPER_CONNECT               value: 'svc-zk01:2181,svc-zk02:2181,svc-zk03:2181'             - name: KAFKA_ADVERTISED_LISTENERS               value: 'PLAINTEXT://172.17.158.160:31457'             - name: KAFKA_LISTENERS               value: 'PLAINTEXT://:9092'           image: '172.17.14.243/kafka/kafka:latest'           imagePullPolicy: Always           name: kafka           terminationMessagePath: /dev/termination-log           terminationMessagePolicy: File       dnsPolicy: ClusterFirst       imagePullSecrets:         - name: haisen       restartPolicy: Always       schedulerName: default-scheduler       terminationGracePeriodSeconds: 30 status:   availableReplicas: 1   conditions:     - lastTransitionTime: '2020-09-02T06:09:59Z'       lastUpdateTime: '2020-09-02T06:09:59Z'       message: Deployment has minimum availability.       reason: MinimumReplicasAvailable       status: 'True'       type: Available     - lastTransitionTime: '2020-08-27T01:58:50Z'       lastUpdateTime: '2020-09-03T02:20:03Z'       message: ReplicaSet "svc-kafka-645654f984" has successfully progressed.       reason: NewReplicaSetAvailable       status: 'True'       type: Progressing   observedGeneration: 27   readyReplicas: 1   replicas: 1   updatedReplicas: 1

--- apiVersion: v1 kind: Service metadata:   annotations:     k8s.eip.work/displayName: kafka     k8s.eip.work/workload: svc-kafka   creationTimestamp: '2020-09-01T08:51:08Z'   labels:     k8s.eip.work/layer: svc     k8s.eip.work/name: svc-kafka   managedFields:     - apiVersion: v1       fieldsType: FieldsV1       fieldsV1:         'f:metadata': {}         'f:spec':           'f:ports': {}       manager: Mozilla       operation: Update       time: '2020-09-01T08:51:08Z'   name: svc-kafka   namespace: kafka   resourceVersion: '28951527'   selfLink: /api/v1/namespaces/kafka/services/svc-kafka   uid: 0cfe9439-15bb-4b4b-b75a-e327f463e6b2 spec:   clusterIP: 10.102.214.178   externalTrafficPolicy: Cluster   ports:     - name: nx562n       nodePort: 31457       port: 9092       protocol: TCP       targetPort: 9092   selector:     k8s.eip.work/layer: svc     k8s.eip.work/name: svc-kafka   sessionAffinity: None   type: NodePort

 zookeeper.yaml   可根据自己需求需改,这是一个zookeeper服务的

--- apiVersion: apps/v1 kind: Deployment metadata:   annotations:     deployment.kubernetes.io/revision: '5'     k8s.eip.work/displayName: zk01     k8s.eip.work/ingress: 'false'     k8s.eip.work/service: None     k8s.eip.work/workload: svc-zk01   creationTimestamp: '2020-08-28T09:02:09Z'   generation: 14   labels:     k8s.eip.work/layer: svc     k8s.eip.work/name: svc-zk01   managedFields:     - apiVersion: apps/v1       fieldsType: FieldsV1       fieldsV1:         'f:metadata': {}         'f:spec':           'f:selector': {}           'f:strategy': {}           'f:template':             'f:metadata': {}             'f:spec':               'f:containers':                 'k:{"name":"zk01"}':                   'f:volumeMounts': {}               'f:imagePullSecrets': {}               'f:volumes':                 'k:{"name":"zk01"}': {}       manager: Mozilla       operation: Update       time: '2020-09-02T06:51:14Z'     - apiVersion: apps/v1       fieldsType: FieldsV1       fieldsV1:         'f:metadata': {}         'f:status':           'f:conditions': {}       manager: kube-controller-manager       operation: Update       time: '2020-09-02T08:41:15Z'   name: svc-zk01   namespace: kafka   resourceVersion: '29288035'   selfLink: /apis/apps/v1/namespaces/kafka/deployments/svc-zk01   uid: cb0ddd2b-79b7-49ad-a78f-0f7800a03f26 spec:   progressDeadlineSeconds: 600   replicas: 1   revisionHistoryLimit: 10   selector:     matchLabels:       k8s.eip.work/layer: svc       k8s.eip.work/name: svc-zk01   strategy:     rollingUpdate:       maxSurge: 25%       maxUnavailable: 25%     type: RollingUpdate   template:     metadata:       labels:         k8s.eip.work/layer: svc         k8s.eip.work/name: svc-zk01     spec:       containers:         - image: '172.17.14.243/kafka/zookeeper:latest'           imagePullPolicy: Always           name: zk01           terminationMessagePath: /dev/termination-log           terminationMessagePolicy: File           volumeMounts:             - mountPath: /opt/zookeeper-3.4.13/data               name: zk01             - mountPath: /opt/zookeeper-3.4.13/conf               name: zk01               subPath: conf       dnsPolicy: ClusterFirst       imagePullSecrets:         - name: haisen       restartPolicy: Always       schedulerName: default-scheduler       terminationGracePeriodSeconds: 30       volumes:         - name: zk01           persistentVolumeClaim:             claimName: zookeeper status:   availableReplicas: 1   conditions:     - lastTransitionTime: '2020-08-28T09:02:09Z'       lastUpdateTime: '2020-09-02T07:02:28Z'       message: ReplicaSet "svc-zk01-6fc6754575" has successfully progressed.       reason: NewReplicaSetAvailable       status: 'True'       type: Progressing     - lastTransitionTime: '2020-09-02T08:41:15Z'       lastUpdateTime: '2020-09-02T08:41:15Z'       message: Deployment has minimum availability.       reason: MinimumReplicasAvailable       status: 'True'       type: Available   observedGeneration: 14   readyReplicas: 1   replicas: 1   updatedReplicas: 1

--- apiVersion: v1 kind: Service metadata:   annotations:     k8s.eip.work/displayName: zk01     k8s.eip.work/workload: svc-zk01   creationTimestamp: '2020-08-28T09:02:10Z'   labels:     k8s.eip.work/layer: svc     k8s.eip.work/name: svc-zk01   managedFields:     - apiVersion: v1       fieldsType: FieldsV1       fieldsV1:         'f:metadata': {}         'f:spec': {}       manager: Mozilla       operation: Update       time: '2020-08-28T09:02:10Z'   name: svc-zk01   namespace: kafka   resourceVersion: '27605474'   selfLink: /api/v1/namespaces/kafka/services/svc-zk01   uid: b77a548c-dd40-4879-a7de-9c88a714ffb9 spec:   clusterIP: None   selector:     k8s.eip.work/layer: svc     k8s.eip.work/name: svc-zk01   sessionAffinity: None   type: ClusterIP

最新回复(0)