Project

General

Profile

Task #10

Updated by Fernando Jose Capeletto Neto over 1 year ago

Migrate Kafka Broker to Local Server : 
 * to enhance Performance (since CMS components are in the same network) (to be verified) 
 * The trial period for Kafka broker at Confluence will end in Apr 10th. 

 == Testing ==  

 * Worked with docker compose configuration below. 

 <pre> 
 version: '2' 

 services: 
   zookeeper: 
     image: wurstmeister/zookeeper 
     container_name: zookeeper 
     expose: 
       - "2181" 

   kafka: 
     image: wurstmeister/kafka 
     container_name: kafka 
     ports: 
       - "9092:9092" 
     expose: 
       - "9093" 
     environment: 
       KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,EXTERNAL://lab.fernando.engineer:9092 
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT 
       KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092 
       KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE 
       KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" 
       KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf" 
       KAFKA_SASL_ENABLED_MECHANISMS: PLAIN 
       KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN 
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" 
       KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 1000 
       LOG4J_LOGGER_KAFKA: WARN LOG4J_LOGGER_KAFKA=WARN 
       LOG4J_LOGGER_ORG_APACHE_KAFKA: WARN LOG4J_LOGGER_ORG_APACHE_KAFKA=WARN 
     depends_on: 
       - zookeeper 
     volumes: 
       - /var/www/vhosts/kafka:/etc/kafka 
 </pre> 

 <pre> 
 [root@node1 kafka]# cat kafka_server_jaas.conf  
 KafkaServer { 
   org.apache.kafka.common.security.plain.PlainLoginModule required 
   username="admin" 
   password="admin-secret" 
   user_admin="admin-secret"; 
 }; 
 Client {}; 
 </pre> 

 * Auth worked (above logs error when not using correct user/pass) 
 * Topic configuration added as Beans direct on application. 
 * Had to set allow topic creation (bad point but with working authentication is safe) 
 * Reference for configuring Auth: https://habr.com/en/articles/529222/ 

 == Example of Bad User/Pass Error =  

 On the Kafka Server: 
 <pre> 
 kafka          | [2023-04-06 05:56:13,788] INFO [GroupCoordinator 1001]: Group trackhandler-group with generation 2 is now empty (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator) 
 kafka          | [2023-04-06 05:57:40,541] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /45.29.40.136 (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector) 
 kafka          | [2023-04-06 05:57:41,486] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /45.29.40.136 (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector) 
 kafka          | [2023-04-06 05:57:42,414] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /45.29.40.136 (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector) 
 kafka          | [2023-04-06 05:57:43,823] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1001] Failed authentication with /45.29.40.136 (Authentication failed: Invalid username or password) (org.apache.kafka.common.network.Selector) 
 </pre>  

 On the app: 
 <pre> 
 [2023-04-06 00:57:59.718] ERROR [o.a.k.c.NetworkClient] [663]: [Producer clientId=TrackHandler] Connection to node -1 failed authentication due to: Authentication failed: Invalid username or password 
 [2023-04-06 00:58:01.045] ERROR [o.a.k.c.NetworkClient] [663]: [Producer clientId=TrackHandler] Connection to node -1 failed authentication due to: Authentication failed: Invalid username or passwor 
 </pre> 

 === Other config tested === 

 * Reference: https://github.com/conduktor/kafka-stack-docker-compose (Plan B, topic configuration in docker-compose file but missing auth) 
 * Missing fix 'host.docker.internal issue', for now workaround using 'internal' listener. 
 ** See (https://stackoverflow.com/questions/48546124/what-is-linux-equivalent-of-host-docker-internal/48547074#48547074) 
 * https://www.baeldung.com/spring-boot-kafka-ssl. (not implemented) 
 ** (Pending investigate to create need keystore and mentioned cert / truststore and credential files) 
 ** https://docs.oracle.com/cd/E19509-01/820-3503/ggsxx/index.html 

 == Plan B Docker Compose File also tested == 
 <pre> 
 version: '2.1' 

 services: 
   zoo1: 
     image: confluentinc/cp-zookeeper:7.3.2 
     hostname: zoo1 
     container_name: zoo1 
     ports: 
       - "2181:2181" 
     environment: 
       ZOOKEEPER_CLIENT_PORT: 2181 
       ZOOKEEPER_SERVER_ID: 1 
       ZOOKEEPER_SERVERS: zoo1:2888:3888 

   kafka1: 
     image: confluentinc/cp-kafka:7.3.2 
     hostname: kafka1 
     container_name: kafka1 
     ports: 
       - "9092:9092" 
       - "9999:9999" 
       - "29092:29092" 
     expose: 
       - "29092" 
     environment: 
       KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://lab.fernando.engineer:9092,DOCKER://host.docker.internal:29092 
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT 
       KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL 
       KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" 
       KAFKA_BROKER_ID: 1 
       KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" 
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 
       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 
       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 
       KAFKA_JMX_PORT: 9999 
       KAFKA_JMX_HOSTNAME: lab.fernando.engineer 
       KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer 
       KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" 
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" 
       KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 1000 
     depends_on: 
       - zoo1 

   init-kafka: 
       image: confluentinc/cp-kafka:7.3.2 
       depends_on: 
           - kafka1 
       volumes: 
         - /var/www/vhosts/kafka/log:/log 
       entrypoint: [ '/bin/sh', '-c' ] 
       command: | 
           " 
           # blocks until kafka is reachable 
           kafka-topics --bootstrap-server kafka1:19092 --list 
           echo -e 'Creating kafka topics for lab.fernando.engineer' 
           kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic AISMESSAGE_TOPIC --replication-factor 1 --partitions 1 --config retention.ms=3000 
           kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic TACTICAL_ALERT_TOPIC --replication-factor 1 --partitions 1 --config retention.ms=5000 
           kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic SYSTEM_ALERT_TOPIC --replication-factor 1 --partitions 1 --config retention.ms=3600000 
           kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic TRACKDAO_PERFORMANCE_TOPIC --replication-factor 1 --partitions 1 --config retention.ms=1250 
           kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic TRACKDAO_TOPIC --replication-factor 1 --partitions 1 --config retention.ms=1250 
           kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic AISMESSAGE_TOPICTEST --replication-factor 1 --partitions 1 --config retention.ms=3000 
           kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic TACTICAL_ALERT_TOPICTEST --replication-factor 1 --partitions 1 --config retention.ms=5000 
           kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic SYSTEM_ALERT_TOPICTEST --replication-factor 1 --partitions 1 --config retention.ms=3600000 
           kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic TRACKDAO_PERFORMANCE_TOPICTEST --replication-factor 1 --partitions 1 --config retention.m=1250 
           kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic TRACKDAO_TOPICTEST --replication-factor 1 --partitions 1 --config retention.ms=1250 

          #Log each topic configuration during creation for further check 
           kafka-configs --bootstrap-server kafka1:19092 --entity-type topics --entity-name AISMESSAGE_TOPIC --describe --all > /log/AISMESSAGE_TOPIC.conf 
           echo -e 'Configuration for topic AISMESSAGE_TOPIC saved' 
           kafka-configs --bootstrap-server kafka1:19092 --entity-type topics --entity-name TACTICAL_ALERT_TOPIC --describe --all > /log/TACTICAL_ALERT_TOPIC.conf 
           echo -e 'Configuration for topic TACTICAL_ALERT_TOPIC saved' 
           kafka-configs --bootstrap-server kafka1:19092 --entity-type topics --entity-name SYSTEM_ALERT_TOPIC --describe --all > /log/SYSTEM_ALERT_TOPIC.conf 
           echo -e 'Configuration for topic SYSTEM_ALERT_TOPIC saved' 
           kafka-configs --bootstrap-server kafka1:19092 --entity-type topics --entity-name TRACKDAO_PERFORMANCE_TOPIC --describe --all > /log/TRACKDAO_PERFORMANCE_TOPIC.conf 
           echo -e 'Configuration for topic TRACKDAO_PERFORMANCE_TOPIC saved' 
           kafka-configs --bootstrap-server kafka1:19092 --entity-type topics --entity-name TRACKDAO_TOPIC --describe --all > /log/TRACKDAO_TOPIC.conf 
           echo -e 'Configuration for topic TRACKDAO_TOPIC saved' 
           kafka-configs --bootstrap-server kafka1:19092 --entity-type topics --entity-name AISMESSAGE_TOPICTEST --describe --all > /log/AISMESSAGE_TOPICTEST.conf 
           echo -e 'Configuration for topic AISMESSAGE_TOPICTEST saved' 
           kafka-configs --bootstrap-server kafka1:19092 --entity-type topics --entity-name TACTICAL_ALERT_TOPICTEST --describe --all > /log/TACTICAL_ALERT_TOPICTEST.conf 
           echo -e 'Configuration for topic TACTICAL_ALERT_TOPICTEST saved' 
           kafka-configs --bootstrap-server kafka1:19092 --entity-type topics --entity-name SYSTEM_ALERT_TOPICTEST --describe --all > /log/SYSTEM_ALERT_TOPICTEST.conf 
           echo -e 'Configuration for topic SYSTEM_ALERT_TOPICTEST saved' 
           kafka-configs --bootstrap-server kafka1:19092 --entity-type topics --entity-name TRACKDAO_PERFORMANCE_TOPICTEST --describe --all > /log/TRACKDAO_PERFORMANCE_TOPICTEST.conf 
           echo -e 'Configuration for topic TRACKDAO_PERFORMANCE_TOPICTEST saved' 
           kafka-configs --bootstrap-server kafka1:19092 --entity-type topics --entity-name TRACKDAO_TOPICTEST --describe --all > /log/TRACKDAO_TOPICTEST.conf 
           echo -e 'Configuration for topic TRACKDAO_TOPICTEST saved' 
 </pre>

Back