docker-elk-experiment

fle

// filebeat-->logstash-->elasticsearch

docker network create my-elk

sysctl -w vm.max_map_count=262144


docker pull elasticsearch:7.5.2
docker pull kibana:7.5.2
docker pull logstash:7.5.2
docker pull elastic/filebeat:7.5.2

mkdir -p $(pwd)/elasticsearch
mkdir -p $(pwd)/kibana
mkdir -p $(pwd)/logstash
mkdir -p $(pwd)/filebeat

docker run -itd --rm --name elasticsearch-tmp elasticsearch:7.5.2 sleep 10
docker cp elasticsearch-tmp:/usr/share/elasticsearch/config $(pwd)/elasticsearch

docker run -itd --rm --name kibana-tmp kibana:7.5.2 sleep 10
docker cp kibana-tmp:/usr/share/kibana/config $(pwd)/kibana

docker run -itd --rm --name logstash-tmp logstash:7.5.2 sleep 10
docker cp logstash-tmp:/usr/share/logstash/config $(pwd)/logstash

docker run -itd --rm --name filebeat-tmp elastic/filebeat:7.5.2 sleep 10
docker cp filebeat-tmp:/usr/share/filebeat/filebeat.yml $(pwd)/filebeat/filebeat.yml

## run elasticsearch

sed -i '$abootstrap.memory_lock: true' $(pwd)/elasticsearch/config/elasticsearch.yml
sed -i '$adiscovery.type: single-node' $(pwd)/elasticsearch/config/elasticsearch.yml
sed -i 's@-Xms1g@-Xms2g@' $(pwd)/elasticsearch/config/jvm.options
sed -i 's@-Xmx1g@-Xmx2g@' $(pwd)/elasticsearch/config/jvm.options


docker run -itd -e TZ="Asia/shanghai" --privileged --network my-elk --name elasticsearch -p 9200:9200 -p 9300:9300 -v $(pwd)/elasticsearch/config:/usr/share/elasticsearch/config elasticsearch:7.5.2


## run logstash

sed -i 's@-Xms1g@-Xms2g@' $(pwd)/logstash/config/jvm.options
sed -i 's@-Xmx1g@-Xmx2g@' $(pwd)/logstash/config/jvm.options
cp $(pwd)/logstash/config/logstash-sample.conf $(pwd)/logstash/config/logstash.conf
sed -i 's@http://localhost:9200@http://elasticsearch:9200@' $(pwd)/logstash/config/logstash.conf


sed -i "s@^xpack@#&@" $(pwd)/logstash/config/logstash.yml
sed -i "$alog.level: debug" $(pwd)/logstash/config/logstash.yml

docker run -itd -e TZ="Asia/shanghai" --privileged --network my-elk --name logstash -p 9600:9600 -p 5044:5044 -v $(pwd)/logstash/config:/usr/share/logstash/config logstash:7.5.2 logstash -f /usr/share/logstash/config/logstash.conf

## run kibana

sed -i 's@xpack.monitoring.ui.container.elasticsearch.enabled: true@xpack.monitoring.ui.container.elasticsearch.enabled: false@' $(pwd)/kibana/config/kibana.yml
sed -i '$ai18n.locale: zh-CN' $(pwd)/kibana/config/kibana.yml

docker run -itd -e TZ="Asia/shanghai" --privileged --network my-elk --name kibana -p 5601:5601 -v $(pwd)/kibana/config:/usr/share/kibana/config kibana:7.5.2


## run filebeat

cat >$(pwd)/filebeat/filebeat.yml <<EOL
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log

filebeat.config:
  modules:
    path: \${path.config}/modules.d/*.yml
    reload.enabled: false

output.logstash:
  bulk_max_size: 1024
  hosts: ["logstash:5044"]

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
EOL

docker run -itd -e TZ="Asia/shanghai" --privileged --user root --network my-elk --name filebeat \
-v /var/log:/var/log \
-v $(pwd)/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml elastic/filebeat:7.5.2 -e

## 

docker stop kibana
docker stop logstash
docker stop elasticsearch
docker stop filebeat

docker rm kibana
docker rm logstash
docker rm elasticsearch
docker rm filebeat

docker start kibana
docker start logstash
docker start elasticsearch
docker start filebeat

fle

// filebeat--(ssl)-->logstash-->elasticsearch


mkdir $(pwd)/pki
cd $(pwd)/pki

openssl genrsa -out root-ca.key 4096
openssl req -x509 -new -nodes -sha512 -days 18250 -subj "/C=CN/ST=Beijing/L=Beijing/O=example/OU=ELK/CN=rootCA" -key root-ca.key -out root-ca.crt


openssl genrsa -out logstash.key 4096
openssl req -sha512 -new -subj "/C=CN/ST=Beijing/L=Beijing/O=example/OU=ELK/CN=logstash" -key logstash.key -out logstash.csr

cat > v3.ext <<-EOF
authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment
extendedKeyUsage = serverAuth,clientAuth
subjectAltName = @alt_names

[alt_names]
DNS.1=logstash
DNS.2=logstash.local
DNS.3=elasticsearch
DNS.4=elasticsearch.local
EOF

openssl x509 -req -sha512 -days 18250 -extfile v3.ext -CA root-ca.crt -CAkey root-ca.key -CAcreateserial -in logstash.csr -out logstash.crt
openssl genrsa -out filebeat.key 4096
openssl req -sha512 -new -subj "/C=CN/ST=Beijing/L=Beijing/O=example/OU=ELK/CN=logstash" -key filebeat.key -out filebeat.csr
openssl x509 -req -sha512 -days 18250 -extfile v3.ext -CA root-ca.crt -CAkey root-ca.key -CAcreateserial -in filebeat.csr -out filebeat.crt

openssl pkcs8 -topk8 -inform pem -in logstash.key -outform pem -nocrypt -out logstash-pkcs8.key


chown -R root:docker *.key
chown -R root:docker *.crt

cd ..


cat >$(pwd)/logstash/config/logstash.conf <<EOL
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate_authorities => ["/etc/pki/certs/root-ca.crt"]
    ssl_certificate => "/etc/pki/certs/logstash.crt"
    ssl_key => "/etc/pki/certs/logstash-pkcs8.key"
    ssl_verify_mode => "force_peer"
    client_inactivity_timeout => 36000
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
EOL





cat >$(pwd)/filebeat/filebeat.yml <<EOL
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log

filebeat.config:
  modules:
    path: \${path.config}/modules.d/*.yml
    reload.enabled: false

output.logstash:
  bulk_max_size: 1024
  hosts: ["logstash:5044"]
  ssl.certificate_authorities: ["/etc/pki/certs/root-ca.crt"]
  ssl.certificate: "/etc/pki/certs/filebeat.crt"
  ssl.key: "/etc/pki/certs/filebeat.key"

logging.level: debug

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
EOL


docker stop logstash && docker rm logstash
docker stop filebeat && docker rm filebeat


docker run -itd -e TZ="Asia/shanghai" --privileged --network my-elk --name logstash \
-p 9600:9600 -p 5044:5044 \
-v $(pwd)/logstash/config:/usr/share/logstash/config \
-v $(pwd)/pki:/etc/pki/certs \
logstash:7.5.2 logstash -f /usr/share/logstash/config/logstash.conf


docker run -itd -e TZ="Asia/shanghai" --privileged --user root --network my-elk --name filebeat \
-v /var/log:/var/log \
-v $(pwd)/pki:/etc/pki/certs \
-v $(pwd)/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml \
elastic/filebeat:7.5.2 -e



docker exec -it filebeat filebeat test output
logstash: logstash:5044...
  connection...
    parse host... OK
    dns lookup... OK
    addresses: 172.19.0.2
    dial up... OK
  TLS...
    security: server's certificate chain verification is enabled
    handshake... OK
    TLS version: TLSv1.2
    dial up... OK
  talk to server... OK


docker exec -it logstash logstash-plugin list --verbose beats
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.headius.backport9.modules.Modules to method java.lang.Object.finalize()
WARNING: Please consider reporting this to the maintainers of com.headius.backport9.modules.Modules
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
logstash-input-beats (6.0.5)


docker exec -it logstash java --version
openjdk 11.0.5 2019-10-15 LTS
OpenJDK Runtime Environment 18.9 (build 11.0.5+10-LTS)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.5+10-LTS, mixed mode, sharing)


java.lang.IllegalArgumentException: File does not contain valid private key: /etc/pki/certs/logstash.key
	at io.netty.handler.ssl.SslContextBuilder.keyManager(SslContextBuilder.java:350) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslContextBuilder.forServer(SslContextBuilder.java:107) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	at org.logstash.netty.SslSimpleBuilder.build(SslSimpleBuilder.java:105) ~[logstash-input-beats-6.0.5.jar:?]
	at org.logstash.beats.Server$BeatsInitializer.initChannel(Server.java:131) ~[logstash-input-beats-6.0.5.jar:?]
	at org.logstash.beats.Server$BeatsInitializer.initChannel(Server.java:101) [logstash-input-beats-6.0.5.jar:?]
	at io.netty.channel.ChannelInitializer.initChannel(ChannelInitializer.java:129) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.ChannelInitializer.handlerAdded(ChannelInitializer.java:112) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannelHandlerContext.callHandlerAdded(AbstractChannelHandlerContext.java:956) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:609) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.DefaultChannelPipeline.access$100(DefaultChannelPipeline.java:46) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.DefaultChannelPipeline$PendingHandlerAddedTask.execute(DefaultChannelPipeline.java:1463) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.DefaultChannelPipeline.callHandlerAddedForAllHandlers(DefaultChannelPipeline.java:1115) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.DefaultChannelPipeline.invokeHandlerAddedIfNeeded(DefaultChannelPipeline.java:650) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:502) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:417) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:474) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.44.Final.jar:4.1.44.Final]
	at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.security.spec.InvalidKeySpecException: Neither RSA, DSA nor EC worked
	at io.netty.handler.ssl.SslContext.getPrivateKeyFromByteBuffer(SslContext.java:1144) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslContext.toPrivateKey(SslContext.java:1113) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslContextBuilder.keyManager(SslContextBuilder.java:348) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	... 22 more
Caused by: java.security.spec.InvalidKeySpecException: java.security.InvalidKeyException: IOException : algid parse error, not a sequence
	at sun.security.ec.ECKeyFactory.engineGeneratePrivate(ECKeyFactory.java:169) ~[jdk.crypto.ec:?]
	at java.security.KeyFactory.generatePrivate(KeyFactory.java:390) ~[?:?]
	at io.netty.handler.ssl.SslContext.getPrivateKeyFromByteBuffer(SslContext.java:1142) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslContext.toPrivateKey(SslContext.java:1113) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslContextBuilder.keyManager(SslContextBuilder.java:348) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	... 22 more
Caused by: java.security.InvalidKeyException: IOException : algid parse error, not a sequence
	at sun.security.pkcs.PKCS8Key.decode(PKCS8Key.java:350) ~[?:?]
	at sun.security.pkcs.PKCS8Key.decode(PKCS8Key.java:355) ~[?:?]
	at sun.security.ec.ECPrivateKeyImpl.<init>(ECPrivateKeyImpl.java:74) ~[jdk.crypto.ec:?]
	at sun.security.ec.ECKeyFactory.implGeneratePrivate(ECKeyFactory.java:237) ~[jdk.crypto.ec:?]
	at sun.security.ec.ECKeyFactory.engineGeneratePrivate(ECKeyFactory.java:165) ~[jdk.crypto.ec:?]
	at java.security.KeyFactory.generatePrivate(KeyFactory.java:390) ~[?:?]
	at io.netty.handler.ssl.SslContext.getPrivateKeyFromByteBuffer(SslContext.java:1142) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslContext.toPrivateKey(SslContext.java:1113) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	at io.netty.handler.ssl.SslContextBuilder.keyManager(SslContextBuilder.java:348) ~[netty-all-4.1.44.Final.jar:4.1.44.Final]
	... 22 more

fkle

// filebeat-->kafka-->logstash-->elasticsearch

mkdir -p  $(pwd)/zoo{1,2,3}/data

cat >$(pwd)/zoo1/zoo.cfg <<EOL
tickTime=2000
dataDir=/data
clientPort=2181
initLimit=10
syncLimit=5
server.1=0.0.0.0:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888
EOL

cat >$(pwd)/zoo2/zoo.cfg <<EOL
tickTime=2000
dataDir=/data
clientPort=2181
initLimit=10
syncLimit=5
server.1=zoo1:2888:3888
server.2=0.0.0.0:2888:3888
server.3=zoo3:2888:3888
EOL

cat >$(pwd)/zoo3/zoo.cfg <<EOL
tickTime=2000
dataDir=/data
clientPort=2181
initLimit=10
syncLimit=5
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=0.0.0.0:2888:3888
EOL


# TODO -v $(pwd)/zoo3/data:/data

docker run --network my-elk --name zoo1 -e ZOO_MY_ID=1 -e ZOO_LOG4J_PROP="INFO,CONSOLE" --restart always -d -v $(pwd)/zoo1/zoo.cfg:/conf/zoo.cfg zookeeper:3.4.14
docker run --network my-elk --name zoo2 -e ZOO_MY_ID=2 -e ZOO_LOG4J_PROP="INFO,CONSOLE" --restart always -d -v $(pwd)/zoo2/zoo.cfg:/conf/zoo.cfg zookeeper:3.4.14
docker run --network my-elk --name zoo3 -e ZOO_MY_ID=3 -e ZOO_LOG4J_PROP="INFO,CONSOLE" --restart always -d -v $(pwd)/zoo3/zoo.cfg:/conf/zoo.cfg zookeeper:3.4.14


docker exec -it zoo1 bash -c "zkCli.sh -server 127.0.0.1:2181 create /kafka null"
docker exec -it zoo1 bash -c "zkCli.sh -server 127.0.0.1:2181 ls /"
docker exec -it zoo2 bash -c "zkCli.sh -server 127.0.0.1:2181 ls /"
docker exec -it zoo3 bash -c "zkCli.sh -server 127.0.0.1:2181 ls /"
docker exec -it zoo3 bash -c "zkCli.sh -server 127.0.0.1:2181 ls /kafka"

[cluster, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

docker stop zoo1 zoo2 zoo3
docker rm zoo1 zoo2 zoo3

docker run -it --rm --network my-elk praqma/network-multitool bash -c "echo mntr |nc zoo2 2181"
docker run -it --rm --network my-elk praqma/network-multitool bash -c "echo stat |nc zoo1 2181"

$ docker pull wurstmeister/kafka:2.12-2.4.0

$ docker run -it --rm wurstmeister/kafka:2.12-2.4.0 bash -c "java -version && find / -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*' "

openjdk version "1.8.0_212"
OpenJDK Runtime Environment (IcedTea 3.12.0) (Alpine 8.212.04-r0)
OpenJDK 64-Bit Server VM (build 25.212-b04, mixed mode)
kafka_2.12-2.4.0

$ docker pull confluentinc/cp-kafka:5.4.0

$ docker run -it --rm confluentinc/cp-kafka:5.4.0 bash -c "java -version && find / -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*' "
openjdk version "1.8.0_212"
OpenJDK Runtime Environment (Zulu 8.38.0.13-CA-linux64) (build 1.8.0_212-b04)
OpenJDK 64-Bit Server VM (Zulu 8.38.0.13-CA-linux64) (build 25.212-b04, mixed mode)
kafka/kafka_2.12-5.4.0-ccs-javadoc.jar

docker run -it --network my-elk \
--name kafka-tmp \
-e KAFKA_LISTENERS=INSIDE://:9092,OUTSIDE://:9094 \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT \
-e KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE \
-e KAFKA_ZOOKEEPER_CONNECT=zoo1:2181,zoo2:2181,zoo3:2181 \
-v /var/run/docker.sock:/var/run/docker.sock \
wurstmeister/kafka:2.12-2.4.0


docker exec -it kafka-tmp bash -c "cat /opt/kafka/config/server.properties" | grep -v -e '^\s*#' -e '^\s*$'
broker.id=-1
listeners=INSIDE://:9092,OUTSIDE://:9094
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-3448717d3a95
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181/kafka
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
inter.broker.listener.name=INSIDE
port=9092




docker exec -it kafka-tmp bash -c "netstat -tunlp"
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 0.0.0.0:37567           0.0.0.0:*               LISTEN      1/java
tcp        0      0 0.0.0.0:9092            0.0.0.0:*               LISTEN      1/java
tcp        0      0 0.0.0.0:9094            0.0.0.0:*               LISTEN      1/java
tcp        0      0 127.0.0.11:37451        0.0.0.0:*               LISTEN      -
udp        0      0 127.0.0.11:35996        0.0.0.0:*                           -
[Configuring] 'inter.broker.listener.name' in '/opt/kafka/config/server.properties'
Excluding KAFKA_HOME from broker config
[Configuring] 'port' in '/opt/kafka/config/server.properties'
[Configuring] 'listener.security.protocol.map' in '/opt/kafka/config/server.properties'
[Configuring] 'broker.id' in '/opt/kafka/config/server.properties'
Excluding KAFKA_VERSION from broker config
[Configuring] 'listeners' in '/opt/kafka/config/server.properties'
[Configuring] 'zookeeper.connect' in '/opt/kafka/config/server.properties'
[Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'
[2020-02-08 05:20:38,752] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-02-08 05:20:39,281] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2020-02-08 05:20:39,282] INFO starting (kafka.server.KafkaServer)
[2020-02-08 05:20:39,283] INFO Connecting to zookeeper on zoo1:2181,zoo2:2181,zoo3:2181 (kafka.server.KafkaServer)
[2020-02-08 05:20:39,301] INFO [ZooKeeperClient Kafka server] Initializing a new session to zoo1:2181,zoo2:2181,zoo3:2181. (kafka.zookeeper.ZooKeeperClient)
[2020-02-08 05:20:39,306] INFO Client environment:zookeeper.version=3.5.6-c11b7e26bc554b8523dc929761dd28808913f091, built on 10/08/2019 20:18 GMT (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:host.name=3448717d3a95 (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:java.version=1.8.0_212 (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:java.vendor=IcedTea (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:java.home=/usr/lib/jvm/java-1.8-openjdk/jre (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:java.class.path=/opt/kafka/bin/../libs/activation-1.1.1.jar:/opt/kafka/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/bin/../libs/commons-cli-1.4.jar:/opt/kafka/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/bin/../libs/connect-api-2.4.0.jar:/opt/kafka/bin/../libs/connect-basic-auth-extension-2.4.0.jar:/opt/kafka/bin/../libs/connect-file-2.4.0.jar:/opt/kafka/bin/../libs/connect-json-2.4.0.jar:/opt/kafka/bin/../libs/connect-mirror-2.4.0.jar:/opt/kafka/bin/../libs/connect-mirror-client-2.4.0.jar:/opt/kafka/bin/../libs/connect-runtime-2.4.0.jar:/opt/kafka/bin/../libs/connect-transforms-2.4.0.jar:/opt/kafka/bin/../libs/guava-20.0.jar:/opt/kafka/bin/../libs/hk2-api-2.5.0.jar:/opt/kafka/bin/../libs/hk2-locator-2.5.0.jar:/opt/kafka/bin/../libs/hk2-utils-2.5.0.jar:/opt/kafka/bin/../libs/jackson-annotations-2.10.0.jar:/opt/kafka/bin/../libs/jackson-core-2.10.0.jar:/opt/kafka/bin/../libs/jackson-databind-2.10.0.jar:/opt/kafka/bin/../libs/jackson-dataformat-csv-2.10.0.jar:/opt/kafka/bin/../libs/jackson-datatype-jdk8-2.10.0.jar:/opt/kafka/bin/../libs/jackson-jaxrs-base-2.10.0.jar:/opt/kafka/bin/../libs/jackson-jaxrs-json-provider-2.10.0.jar:/opt/kafka/bin/../libs/jackson-module-jaxb-annotations-2.10.0.jar:/opt/kafka/bin/../libs/jackson-module-paranamer-2.10.0.jar:/opt/kafka/bin/../libs/jackson-module-scala_2.12-2.10.0.jar:/opt/kafka/bin/../libs/jakarta.activation-api-1.2.1.jar:/opt/kafka/bin/../libs/jakarta.annotation-api-1.3.4.jar:/opt/kafka/bin/../libs/jakarta.inject-2.5.0.jar:/opt/kafka/bin/../libs/jakarta.ws.rs-api-2.1.5.jar:/opt/kafka/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/opt/kafka/bin/../libs/javassist-3.22.0-CR2.jar:/opt/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/opt/kafka/bin/../libs/jaxb-api-2.3.0.jar:/opt/kafka/bin/../libs/jersey-client-2.28.jar:/opt/kafka/bin/../libs/jersey-common-2.28.jar:/opt/kafka/bin/../libs/jersey-container-servlet-2.28.jar:/opt/kafka/bin/../libs/jersey-container-servlet-core-2.28.jar:/opt/kafka/bin/../libs/jersey-hk2-2.28.jar:/opt/kafka/bin/../libs/jersey-media-jaxb-2.28.jar:/opt/kafka/bin/../libs/jersey-server-2.28.jar:/opt/kafka/bin/../libs/jetty-client-9.4.20.v20190813.jar:/opt/kafka/bin/../libs/jetty-continuation-9.4.20.v20190813.jar:/opt/kafka/bin/../libs/jetty-http-9.4.20.v20190813.jar:/opt/kafka/bin/../libs/jetty-io-9.4.20.v20190813.jar:/opt/kafka/bin/../libs/jetty-security-9.4.20.v20190813.jar:/opt/kafka/bin/../libs/jetty-server-9.4.20.v20190813.jar:/opt/kafka/bin/../libs/jetty-servlet-9.4.20.v20190813.jar:/opt/kafka/bin/../libs/jetty-servlets-9.4.20.v20190813.jar:/opt/kafka/bin/../libs/jetty-util-9.4.20.v20190813.jar:/opt/kafka/bin/../libs/jopt-simple-5.0.4.jar:/opt/kafka/bin/../libs/kafka-clients-2.4.0.jar:/opt/kafka/bin/../libs/kafka-log4j-appender-2.4.0.jar:/opt/kafka/bin/../libs/kafka-streams-2.4.0.jar:/opt/kafka/bin/../libs/kafka-streams-examples-2.4.0.jar:/opt/kafka/bin/../libs/kafka-streams-scala_2.12-2.4.0.jar:/opt/kafka/bin/../libs/kafka-streams-test-utils-2.4.0.jar:/opt/kafka/bin/../libs/kafka-tools-2.4.0.jar:/opt/kafka/bin/../libs/kafka_2.12-2.4.0-sources.jar:/opt/kafka/bin/../libs/kafka_2.12-2.4.0.jar:/opt/kafka/bin/../libs/log4j-1.2.17.jar:/opt/kafka/bin/../libs/lz4-java-1.6.0.jar:/opt/kafka/bin/../libs/maven-artifact-3.6.1.jar:/opt/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka/bin/../libs/netty-buffer-4.1.42.Final.jar:/opt/kafka/bin/../libs/netty-codec-4.1.42.Final.jar:/opt/kafka/bin/../libs/netty-common-4.1.42.Final.jar:/opt/kafka/bin/../libs/netty-handler-4.1.42.Final.jar:/opt/kafka/bin/../libs/netty-resolver-4.1.42.Final.jar:/opt/kafka/bin/../libs/netty-transport-4.1.42.Final.jar:/opt/kafka/bin/../libs/netty-transport-native-epoll-4.1.42.Final.jar:/opt/kafka/bin/../libs/netty-transport-native-unix-common-4.1.42.Final.jar:/opt/kafka/bin/../libs/osgi-resource-locator-1.0.1.jar:/opt/kafka/bin/../libs/paranamer-2.8.jar:/opt/kafka/bin/../libs/plexus-utils-3.2.0.jar:/opt/kafka/bin/../libs/reflections-0.9.11.jar:/opt/kafka/bin/../libs/rocksdbjni-5.18.3.jar:/opt/kafka/bin/../libs/scala-collection-compat_2.12-2.1.2.jar:/opt/kafka/bin/../libs/scala-java8-compat_2.12-0.9.0.jar:/opt/kafka/bin/../libs/scala-library-2.12.10.jar:/opt/kafka/bin/../libs/scala-logging_2.12-3.9.2.jar:/opt/kafka/bin/../libs/scala-reflect-2.12.10.jar:/opt/kafka/bin/../libs/slf4j-api-1.7.28.jar:/opt/kafka/bin/../libs/slf4j-log4j12-1.7.28.jar:/opt/kafka/bin/../libs/snappy-java-1.1.7.3.jar:/opt/kafka/bin/../libs/validation-api-2.0.1.Final.jar:/opt/kafka/bin/../libs/zookeeper-3.5.6.jar:/opt/kafka/bin/../libs/zookeeper-jute-3.5.6.jar:/opt/kafka/bin/../libs/zstd-jni-1.4.3-1.jar (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:java.library.path=/usr/lib/jvm/java-1.8-openjdk/jre/lib/amd64/server:/usr/lib/jvm/java-1.8-openjdk/jre/lib/amd64:/usr/lib/jvm/java-1.8-openjdk/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:os.version=4.19.95 (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,306] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,307] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,307] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,307] INFO Client environment:os.memory.free=979MB (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,307] INFO Client environment:os.memory.max=1024MB (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,307] INFO Client environment:os.memory.total=1024MB (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,309] INFO Initiating client connection, connectString=zoo1:2181,zoo2:2181,zoo3:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@3427b02d (org.apache.zookeeper.ZooKeeper)
[2020-02-08 05:20:39,314] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util)
[2020-02-08 05:20:39,319] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
[2020-02-08 05:20:39,325] INFO zookeeper.request.timeout value is 0. feature enabled= (org.apache.zookeeper.ClientCnxn)
[2020-02-08 05:20:39,327] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2020-02-08 05:20:39,330] INFO Opening socket connection to server zoo3/172.19.0.8:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-02-08 05:20:39,335] INFO Socket connection established, initiating session, client: /172.19.0.9:34738, server: zoo3/172.19.0.8:2181 (org.apache.zookeeper.ClientCnxn)
[2020-02-08 05:20:39,372] INFO Session establishment complete on server zoo3/172.19.0.8:2181, sessionid = 0x300039611ff0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2020-02-08 05:20:39,376] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2020-02-08 05:20:39,743] INFO Cluster ID = PxzeWiM2Q2ewvIap-LRVqQ (kafka.server.KafkaServer)
[2020-02-08 05:20:39,746] WARN No meta.properties file under dir /kafka/kafka-logs-3448717d3a95/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2020-02-08 05:20:39,810] INFO KafkaConfig values:
	advertised.host.name = null
	advertised.listeners = null
	advertised.port = null
	alter.config.policy.class.name = null
	alter.log.dirs.replication.quota.window.num = 11
	alter.log.dirs.replication.quota.window.size.seconds = 1
	authorizer.class.name =
	auto.create.topics.enable = true
	auto.leader.rebalance.enable = true
	background.threads = 10
	broker.id = -1
	broker.id.generation.enable = true
	broker.rack = null
	client.quota.callback.class = null
	compression.type = producer
	connection.failed.authentication.delay.ms = 100
	connections.max.idle.ms = 600000
	connections.max.reauth.ms = 0
	control.plane.listener.name = null
	controlled.shutdown.enable = true
	controlled.shutdown.max.retries = 3
	controlled.shutdown.retry.backoff.ms = 5000
	controller.socket.timeout.ms = 30000
	create.topic.policy.class.name = null
	default.replication.factor = 1
	delegation.token.expiry.check.interval.ms = 3600000
	delegation.token.expiry.time.ms = 86400000
	delegation.token.master.key = null
	delegation.token.max.lifetime.ms = 604800000
	delete.records.purgatory.purge.interval.requests = 1
	delete.topic.enable = true
	fetch.purgatory.purge.interval.requests = 1000
	group.initial.rebalance.delay.ms = 0
	group.max.session.timeout.ms = 1800000
	group.max.size = 2147483647
	group.min.session.timeout.ms = 6000
	host.name =
	inter.broker.listener.name = INSIDE
	inter.broker.protocol.version = 2.4-IV1
	kafka.metrics.polling.interval.secs = 10
	kafka.metrics.reporters = []
	leader.imbalance.check.interval.seconds = 300
	leader.imbalance.per.broker.percentage = 10
	listener.security.protocol.map = INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
	listeners = INSIDE://:9092,OUTSIDE://:9094
	log.cleaner.backoff.ms = 15000
	log.cleaner.dedupe.buffer.size = 134217728
	log.cleaner.delete.retention.ms = 86400000
	log.cleaner.enable = true
	log.cleaner.io.buffer.load.factor = 0.9
	log.cleaner.io.buffer.size = 524288
	log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
	log.cleaner.max.compaction.lag.ms = 9223372036854775807
	log.cleaner.min.cleanable.ratio = 0.5
	log.cleaner.min.compaction.lag.ms = 0
	log.cleaner.threads = 1
	log.cleanup.policy = [delete]
	log.dir = /tmp/kafka-logs
	log.dirs = /kafka/kafka-logs-3448717d3a95
	log.flush.interval.messages = 9223372036854775807
	log.flush.interval.ms = null
	log.flush.offset.checkpoint.interval.ms = 60000
	log.flush.scheduler.interval.ms = 9223372036854775807
	log.flush.start.offset.checkpoint.interval.ms = 60000
	log.index.interval.bytes = 4096
	log.index.size.max.bytes = 10485760
	log.message.downconversion.enable = true
	log.message.format.version = 2.4-IV1
	log.message.timestamp.difference.max.ms = 9223372036854775807
	log.message.timestamp.type = CreateTime
	log.preallocate = false
	log.retention.bytes = -1
	log.retention.check.interval.ms = 300000
	log.retention.hours = 168
	log.retention.minutes = null
	log.retention.ms = null
	log.roll.hours = 168
	log.roll.jitter.hours = 0
	log.roll.jitter.ms = null
	log.roll.ms = null
	log.segment.bytes = 1073741824
	log.segment.delete.delay.ms = 60000
	max.connections = 2147483647
	max.connections.per.ip = 2147483647
	max.connections.per.ip.overrides =
	max.incremental.fetch.session.cache.slots = 1000
	message.max.bytes = 1000012
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	min.insync.replicas = 1
	num.io.threads = 8
	num.network.threads = 3
	num.partitions = 1
	num.recovery.threads.per.data.dir = 1
	num.replica.alter.log.dirs.threads = null
	num.replica.fetchers = 1
	offset.metadata.max.bytes = 4096
	offsets.commit.required.acks = -1
	offsets.commit.timeout.ms = 5000
	offsets.load.buffer.size = 5242880
	offsets.retention.check.interval.ms = 600000
	offsets.retention.minutes = 10080
	offsets.topic.compression.codec = 0
	offsets.topic.num.partitions = 50
	offsets.topic.replication.factor = 1
	offsets.topic.segment.bytes = 104857600
	password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
	password.encoder.iterations = 4096
	password.encoder.key.length = 128
	password.encoder.keyfactory.algorithm = null
	password.encoder.old.secret = null
	password.encoder.secret = null
	port = 9092
	principal.builder.class = null
	producer.purgatory.purge.interval.requests = 1000
	queued.max.request.bytes = -1
	queued.max.requests = 500
	quota.consumer.default = 9223372036854775807
	quota.producer.default = 9223372036854775807
	quota.window.num = 11
	quota.window.size.seconds = 1
	replica.fetch.backoff.ms = 1000
	replica.fetch.max.bytes = 1048576
	replica.fetch.min.bytes = 1
	replica.fetch.response.max.bytes = 10485760
	replica.fetch.wait.max.ms = 500
	replica.high.watermark.checkpoint.interval.ms = 5000
	replica.lag.time.max.ms = 10000
	replica.selector.class = null
	replica.socket.receive.buffer.bytes = 65536
	replica.socket.timeout.ms = 30000
	replication.quota.window.num = 11
	replication.quota.window.size.seconds = 1
	request.timeout.ms = 30000
	reserved.broker.max.id = 1000
	sasl.client.callback.handler.class = null
	sasl.enabled.mechanisms = [GSSAPI]
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.principal.to.local.rules = [DEFAULT]
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism.inter.broker.protocol = GSSAPI
	sasl.server.callback.handler.class = null
	security.inter.broker.protocol = PLAINTEXT
	security.providers = null
	socket.receive.buffer.bytes = 102400
	socket.request.max.bytes = 104857600
	socket.send.buffer.bytes = 102400
	ssl.cipher.suites = []
	ssl.client.auth = none
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.principal.mapping.rules = DEFAULT
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
	transaction.max.timeout.ms = 900000
	transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
	transaction.state.log.load.buffer.size = 5242880
	transaction.state.log.min.isr = 1
	transaction.state.log.num.partitions = 50
	transaction.state.log.replication.factor = 1
	transaction.state.log.segment.bytes = 104857600
	transactional.id.expiration.ms = 604800000
	unclean.leader.election.enable = false
	zookeeper.connect = zoo1:2181,zoo2:2181,zoo3:2181
	zookeeper.connection.timeout.ms = 6000
	zookeeper.max.in.flight.requests = 10
	zookeeper.session.timeout.ms = 6000
	zookeeper.set.acl = false
	zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2020-02-08 05:20:39,824] INFO KafkaConfig values:
	advertised.host.name = null
	advertised.listeners = null
	advertised.port = null
	alter.config.policy.class.name = null
	alter.log.dirs.replication.quota.window.num = 11
	alter.log.dirs.replication.quota.window.size.seconds = 1
	authorizer.class.name =
	auto.create.topics.enable = true
	auto.leader.rebalance.enable = true
	background.threads = 10
	broker.id = -1
	broker.id.generation.enable = true
	broker.rack = null
	client.quota.callback.class = null
	compression.type = producer
	connection.failed.authentication.delay.ms = 100
	connections.max.idle.ms = 600000
	connections.max.reauth.ms = 0
	control.plane.listener.name = null
	controlled.shutdown.enable = true
	controlled.shutdown.max.retries = 3
	controlled.shutdown.retry.backoff.ms = 5000
	controller.socket.timeout.ms = 30000
	create.topic.policy.class.name = null
	default.replication.factor = 1
	delegation.token.expiry.check.interval.ms = 3600000
	delegation.token.expiry.time.ms = 86400000
	delegation.token.master.key = null
	delegation.token.max.lifetime.ms = 604800000
	delete.records.purgatory.purge.interval.requests = 1
	delete.topic.enable = true
	fetch.purgatory.purge.interval.requests = 1000
	group.initial.rebalance.delay.ms = 0
	group.max.session.timeout.ms = 1800000
	group.max.size = 2147483647
	group.min.session.timeout.ms = 6000
	host.name =
	inter.broker.listener.name = INSIDE
	inter.broker.protocol.version = 2.4-IV1
	kafka.metrics.polling.interval.secs = 10
	kafka.metrics.reporters = []
	leader.imbalance.check.interval.seconds = 300
	leader.imbalance.per.broker.percentage = 10
	listener.security.protocol.map = INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
	listeners = INSIDE://:9092,OUTSIDE://:9094
	log.cleaner.backoff.ms = 15000
	log.cleaner.dedupe.buffer.size = 134217728
	log.cleaner.delete.retention.ms = 86400000
	log.cleaner.enable = true
	log.cleaner.io.buffer.load.factor = 0.9
	log.cleaner.io.buffer.size = 524288
	log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
	log.cleaner.max.compaction.lag.ms = 9223372036854775807
	log.cleaner.min.cleanable.ratio = 0.5
	log.cleaner.min.compaction.lag.ms = 0
	log.cleaner.threads = 1
	log.cleanup.policy = [delete]
	log.dir = /tmp/kafka-logs
	log.dirs = /kafka/kafka-logs-3448717d3a95
	log.flush.interval.messages = 9223372036854775807
	log.flush.interval.ms = null
	log.flush.offset.checkpoint.interval.ms = 60000
	log.flush.scheduler.interval.ms = 9223372036854775807
	log.flush.start.offset.checkpoint.interval.ms = 60000
	log.index.interval.bytes = 4096
	log.index.size.max.bytes = 10485760
	log.message.downconversion.enable = true
	log.message.format.version = 2.4-IV1
	log.message.timestamp.difference.max.ms = 9223372036854775807
	log.message.timestamp.type = CreateTime
	log.preallocate = false
	log.retention.bytes = -1
	log.retention.check.interval.ms = 300000
	log.retention.hours = 168
	log.retention.minutes = null
	log.retention.ms = null
	log.roll.hours = 168
	log.roll.jitter.hours = 0
	log.roll.jitter.ms = null
	log.roll.ms = null
	log.segment.bytes = 1073741824
	log.segment.delete.delay.ms = 60000
	max.connections = 2147483647
	max.connections.per.ip = 2147483647
	max.connections.per.ip.overrides =
	max.incremental.fetch.session.cache.slots = 1000
	message.max.bytes = 1000012
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	min.insync.replicas = 1
	num.io.threads = 8
	num.network.threads = 3
	num.partitions = 1
	num.recovery.threads.per.data.dir = 1
	num.replica.alter.log.dirs.threads = null
	num.replica.fetchers = 1
	offset.metadata.max.bytes = 4096
	offsets.commit.required.acks = -1
	offsets.commit.timeout.ms = 5000
	offsets.load.buffer.size = 5242880
	offsets.retention.check.interval.ms = 600000
	offsets.retention.minutes = 10080
	offsets.topic.compression.codec = 0
	offsets.topic.num.partitions = 50
	offsets.topic.replication.factor = 1
	offsets.topic.segment.bytes = 104857600
	password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
	password.encoder.iterations = 4096
	password.encoder.key.length = 128
	password.encoder.keyfactory.algorithm = null
	password.encoder.old.secret = null
	password.encoder.secret = null
	port = 9092
	principal.builder.class = null
	producer.purgatory.purge.interval.requests = 1000
	queued.max.request.bytes = -1
	queued.max.requests = 500
	quota.consumer.default = 9223372036854775807
	quota.producer.default = 9223372036854775807
	quota.window.num = 11
	quota.window.size.seconds = 1
	replica.fetch.backoff.ms = 1000
	replica.fetch.max.bytes = 1048576
	replica.fetch.min.bytes = 1
	replica.fetch.response.max.bytes = 10485760
	replica.fetch.wait.max.ms = 500
	replica.high.watermark.checkpoint.interval.ms = 5000
	replica.lag.time.max.ms = 10000
	replica.selector.class = null
	replica.socket.receive.buffer.bytes = 65536
	replica.socket.timeout.ms = 30000
	replication.quota.window.num = 11
	replication.quota.window.size.seconds = 1
	request.timeout.ms = 30000
	reserved.broker.max.id = 1000
	sasl.client.callback.handler.class = null
	sasl.enabled.mechanisms = [GSSAPI]
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.principal.to.local.rules = [DEFAULT]
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism.inter.broker.protocol = GSSAPI
	sasl.server.callback.handler.class = null
	security.inter.broker.protocol = PLAINTEXT
	security.providers = null
	socket.receive.buffer.bytes = 102400
	socket.request.max.bytes = 104857600
	socket.send.buffer.bytes = 102400
	ssl.cipher.suites = []
	ssl.client.auth = none
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.principal.mapping.rules = DEFAULT
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
	transaction.max.timeout.ms = 900000
	transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
	transaction.state.log.load.buffer.size = 5242880
	transaction.state.log.min.isr = 1
	transaction.state.log.num.partitions = 50
	transaction.state.log.replication.factor = 1
	transaction.state.log.segment.bytes = 104857600
	transactional.id.expiration.ms = 604800000
	unclean.leader.election.enable = false
	zookeeper.connect = zoo1:2181,zoo2:2181,zoo3:2181
	zookeeper.connection.timeout.ms = 6000
	zookeeper.max.in.flight.requests = 10
	zookeeper.session.timeout.ms = 6000
	zookeeper.set.acl = false
	zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2020-02-08 05:20:39,846] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-02-08 05:20:39,847] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-02-08 05:20:39,847] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2020-02-08 05:20:39,868] INFO Log directory /kafka/kafka-logs-3448717d3a95 not found, creating it. (kafka.log.LogManager)
[2020-02-08 05:20:39,874] INFO Loading logs. (kafka.log.LogManager)
[2020-02-08 05:20:39,880] INFO Logs loading complete in 6 ms. (kafka.log.LogManager)
[2020-02-08 05:20:39,893] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2020-02-08 05:20:39,913] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2020-02-08 05:20:40,271] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2020-02-08 05:20:40,305] INFO [SocketServer brokerId=1001] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(INSIDE),PLAINTEXT) (kafka.network.SocketServer)
[2020-02-08 05:20:40,306] INFO Awaiting socket connections on 0.0.0.0:9094. (kafka.network.Acceptor)
[2020-02-08 05:20:40,314] INFO [SocketServer brokerId=1001] Created data-plane acceptor and processors for endpoint : EndPoint(null,9094,ListenerName(OUTSIDE),PLAINTEXT) (kafka.network.SocketServer)
[2020-02-08 05:20:40,315] INFO [SocketServer brokerId=1001] Started 2 acceptor threads for data-plane (kafka.network.SocketServer)
[2020-02-08 05:20:40,332] INFO [ExpirationReaper-1001-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-02-08 05:20:40,333] INFO [ExpirationReaper-1001-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-02-08 05:20:40,334] INFO [ExpirationReaper-1001-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-02-08 05:20:40,334] INFO [ExpirationReaper-1001-ElectLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-02-08 05:20:40,347] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2020-02-08 05:20:40,366] INFO Creating /brokers/ids/1001 (is it secure? false) (kafka.zk.KafkaZkClient)
[2020-02-08 05:20:40,394] INFO Stat of the created znode at /brokers/ids/1001 is: 4294967321,4294967321,1581139240379,1581139240379,1,0,0,216176725195685888,240,0,4294967321
 (kafka.zk.KafkaZkClient)
[2020-02-08 05:20:40,394] INFO Registered broker 1001 at path /brokers/ids/1001 with addresses: ArrayBuffer(EndPoint(3448717d3a95,9092,ListenerName(INSIDE),PLAINTEXT), EndPoint(3448717d3a95,9094,ListenerName(OUTSIDE),PLAINTEXT)), czxid (broker epoch): 4294967321 (kafka.zk.KafkaZkClient)
[2020-02-08 05:20:40,456] INFO [ExpirationReaper-1001-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-02-08 05:20:40,463] INFO [ExpirationReaper-1001-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-02-08 05:20:40,464] INFO [ExpirationReaper-1001-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-02-08 05:20:40,492] INFO [GroupCoordinator 1001]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2020-02-08 05:20:40,493] INFO [GroupCoordinator 1001]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2020-02-08 05:20:40,495] INFO Successfully created /controller_epoch with initial epoch 0 (kafka.zk.KafkaZkClient)
[2020-02-08 05:20:40,499] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 4 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-02-08 05:20:40,512] INFO [ProducerId Manager 1001]: Acquired new producerId block (brokerId:1001,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2020-02-08 05:20:40,532] INFO [TransactionCoordinator id=1001] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-02-08 05:20:40,534] INFO [TransactionCoordinator id=1001] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-02-08 05:20:40,536] INFO [Transaction Marker Channel Manager 1001]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2020-02-08 05:20:40,558] INFO [ExpirationReaper-1001-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-02-08 05:20:40,575] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2020-02-08 05:20:40,649] INFO [SocketServer brokerId=1001] Started data-plane processors for 2 acceptors (kafka.network.SocketServer)
[2020-02-08 05:20:40,652] INFO Kafka version: 2.4.0 (org.apache.kafka.common.utils.AppInfoParser)
[2020-02-08 05:20:40,652] INFO Kafka commitId: 77a89fcf8d7fa018 (org.apache.kafka.common.utils.AppInfoParser)
[2020-02-08 05:20:40,653] INFO Kafka startTimeMs: 1581139240649 (org.apache.kafka.common.utils.AppInfoParser)
[2020-02-08 05:20:40,653] INFO [KafkaServer id=1001] started (kafka.server.KafkaServer)
# filebeat.yml
#output.logstash:
output.kafka:
  enabled: true 
  hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"]
  topic: beattest


cat >$(pwd)/logstash/config/logstash-kafka.conf <<EOL
input {
  kafka{
    bootstrap_servers => ["kafka1:9092,kafka2:9092,kafka3:9092"]
    group_id => "baicai"
    auto_offset_reset => "earliest"
    consumer_threads => "5"
    decorate_events => "false"
    topics => ["beattest"]
    codec => json
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "logstash-kafka-%{+YYYY.MM.dd}"
  }
}
EOL



docker run -itd -e TZ="Asia/shanghai" --privileged --network my-elk --name logstash -p 9600:9600 -p 5044:5044 -v $(pwd)/logstash/config:/usr/share/logstash/config logstash:7.5.2 logstash -f /usr/share/logstash/config/logstash-kafka.conf

## run  another logstash instance
docker run -itd -e TZ="Asia/shanghai" --privileged --network my-elk --name logstash1 -p 19600:9600 -p 15044:5044 -v $(pwd)/logstash/config:/usr/share/logstash/config logstash:7.5.2 logstash -f /usr/share/logstash/config/logstash-kafka.conf


mkdir -p $(pwd)/kafka{1,2,3}/log
docker exec -it kafka-tmp bash -c "cat /opt/kafka/config/server.properties" | grep -v -e '^\s*#' -e '^\s*$' > $(pwd)/kafka1/server.properties && docker exec -it kafka-tmp bash -c "cat /opt/kafka/config/server.properties" | grep -v -e '^\s*#' -e '^\s*$' > $(pwd)/kafka2/server.properties && docker exec -it kafka-tmp bash -c "cat /opt/kafka/config/server.properties" | grep -v -e '^\s*#' -e '^\s*$' > $(pwd)/kafka3/server.properties


sed -i 's@broker.id=-1@broker.id=101@' $(pwd)/kafka1/server.properties
sed -i 's@broker.id=-1@broker.id=102@' $(pwd)/kafka2/server.properties
sed -i 's@broker.id=-1@broker.id=103@' $(pwd)/kafka3/server.properties


docker run -itd --network my-elk --name kafka1 -p 10192:9092 -v $(pwd)/kafka1/server.properties:/opt/kafka/config/server.properties -v /var/run/docker.sock:/var/run/docker.sock --entrypoint "" wurstmeister/kafka:2.12-2.4.0 bash -e -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties'
docker run -itd --network my-elk --name kafka2 -p 10292:9092 -v $(pwd)/kafka2/server.properties:/opt/kafka/config/server.properties -v /var/run/docker.sock:/var/run/docker.sock --entrypoint "" wurstmeister/kafka:2.12-2.4.0 bash -e -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties'
docker run -itd --network my-elk --name kafka3 -p 10392:9092 -v $(pwd)/kafka3/server.properties:/opt/kafka/config/server.properties -v /var/run/docker.sock:/var/run/docker.sock --entrypoint "" wurstmeister/kafka:2.12-2.4.0 bash -e -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties'



# kafka创建主题
docker exec -it kafka1 bash -c "kafka-topics.sh --bootstrap-server kafka1:9092 --create --partitions 3 --replication-factor 3 --topic test"
# kafka查看主题
docker exec -it kafka1 bash -c "kafka-topics.sh --bootstrap-server kafka1:9092 --list"
# kafka查看主题详情
docker exec -it kafka1 bash -c "kafka-topics.sh --bootstrap-server kafka1:9092 --describe --topic test"
# kafka修改主题
docker exec -it kafka1 bash -c "kafka-topics.sh --bootstrap-server kafka1:9092 --alter --topic test --partitions 5"
# kafka生产消息
docker run -it --rm --network my-elk wurstmeister/kafka:2.12-2.4.0 kafka-console-producer.sh --broker-list kafka1:9092 --topic test
# kafka消费消息
docker run -it --rm --network my-elk wurstmeister/kafka:2.12-2.4.0 kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server kafka1:9092 --group console-group

kafka-broker

  • broker.id: broker的唯一标识符,集群环境该值不可重复
  • log.dirs: 一个用逗号分隔的目录列表,可以有多个,用来为Kafka存储数据
  • zookeeper.connect:zookeeper访问地址,多个地址用’,’隔开
  • message.max.bytes: server能接收的一条消息的最大的大小
  • https://kafka.apache.org/documentation/#brokerconfigs

kafka-producer

kafka-consumer

[2020-02-11 10:36:21,340] INFO Cluster ID = mJ_AYihBTnyQkXlrslbEJw (kafka.server.KafkaServer)
[2020-02-11 10:36:21,351] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka.common.InconsistentClusterIdException: The Cluster ID mJ_AYihBTnyQkXlrslbEJw doesn't match stored clusterId Some(PxzeWiM2Q2ewvIap-LRVqQ) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
	at kafka.server.KafkaServer.startup(KafkaServer.scala:220)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
	at kafka.Kafka$.main(Kafka.scala:84)
	at kafka.Kafka.main(Kafka.scala)


docker exec -it zoo1 bash -c "zkCli.sh get /kafka/cluster/id"


{"version":"1","id":"mJ_AYihBTnyQkXlrslbEJw"}
cZxid = 0x10000005b
ctime = Tue Feb 11 10:36:21 UTC 2020
mZxid = 0x10000005b
mtime = Tue Feb 11 10:36:21 UTC 2020
pZxid = 0x10000005b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 45
numChildren = 0



~/kafka-logs/meta.properties
#
#Sat Feb 08 05:20:40 GMT 2020
cluster.id=PxzeWiM2Q2ewvIap-LRVqQ
version=0
broker.id=1001

ref