# Public Network Access Configuration

## Enable Cluster Authentication Configuration

[Cluster Authentication Configuration](/docs/ukafka/guide/cluster/sasl)

## View SASL Access Point

[SASL_PLAINTEXT Protocol Address](/docs/ukafka/guide/node/address#SASL_PLAINTEXT Protocol Address)

## Configure Public Network Forwarding

### 1. Create a Cloud Server in the Same Subnet as the UKafka Cluster

Note: This document is based on CentOS 7.9

### 2. Install nginx on the Cloud Server

```shell
yum install nginx nginx-all-modules.noarch
```

### 3. Configure nginx Proxy

- Edit `/etc/nginx/nginx.conf` to add stream configuration

```conf
stream {
    log_format proxy '$remote_addr [$time_local] '
                 '$protocol $status $bytes_sent $bytes_received '
                 '$session_time "$upstream_addr" '
                 '"$upstream_bytes_sent" "$upstream_bytes_received" "$upstream_connect_time"';

    access_log /var/log/nginx/tcp-access.log proxy;
    open_log_file_cache off;

    # Unified placement, easy to manage
    include /etc/nginx/tcpConf.d/*.conf;
}
```

- Create `/etc/nginx/tcpConf.d/` directory

```shell
mkdir -p /etc/nginx/tcpConf.d/
```

- Edit `/etc/nginx/tcpConf.d/kafka.conf` configuration file

```conf
upstream tcp9093 {
    server 10.23.26.105:9093;
}
upstream tcp9094 {
    server 10.23.180.35:9094;
}
upstream tcp9095 {
    server 10.23.202.164:9095;
}

server {
    listen 9093;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9093;
}
server {
    listen 9094;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9094;
}
server {
    listen 9095;
    proxy_connect_timeout 8s;
    proxy_timeout 24h;
    proxy_pass tcp9095;
}
```

- Start nginx

```shell
systemctl start nginx.service
```

- Verify the status of nginx service

```shell
systemctl status nginx.service
```

### 4. Host Firewall Configuration

The host firewall needs to accept forwarding requests on ports 9093, 9094, and 9095.

## Public Network Access

### 1. Configure Local Hosts

Modify the local `/etc/hosts` file, and direct all addresses listened by UKafka SASL_PLAINTEXT protocol to the public IP of the cloud server that transmits requests:

```conf
113.31.114.125 ukafka-q0j01x5g-kafka1
113.31.114.125 ukafka-q0j01x5g-kafka2
113.31.114.125 ukafka-q0j01x5g-kafka3
```

### 2. Access with Kafka Command Line Tool

#### Download and Configure Kafka Command Line Tool

Taking Kafka 2.6.1 command line tool as an example:

> You can download the command line tool according to the instance version

```shell
# Download the command line tool
wget https://archive.apache.org/dist/kafka/2.6.1/kafka_2.13-2.6.1.tgz

# Extract
tar -zxvf kafka_2.13-2.6.1.tgz

# Enter the command line tool root directory
cd kafka_2.13-2.6.1
```

Configure the `config/kafka_client_jaas.conf` file as follows, where `username` and `password` are the username and password used when enabling cluster authentication configuration in the control panel:

```shell
cat > config/kafka_client_jaas.conf << EOF
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin_pass";
};
EOF
```

#### Send Message

- Modify `bin/kafka-console-producer.sh` to add `java.security.auth.login.config` parameter

```sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=./config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
```

- Modify `config/producer.properties`, specify `security.protocol`, `sasl.mechanism`

```sh
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```

- Run `bin/kafka-console-producer.sh` to send message：

```shell
bin/kafka-console-producer.sh  --producer.config ./config/producer.properties --topic foo --broker-list ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095
>hello
>world
>foo
>bar
>
```

#### Receive Message

- Modify `bin/kafka-console-consumer.sh` to add `java.security.auth.login.config` parameter

``` sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=./config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"
```

- Modify `config/consumer.properties`, specify `security.protocol`, `sasl.mechanism`

``` sh
group.id=test-consumer-group

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```

- Run `bin/kafka-console-consumer.sh` to receive message

```shell
bin/kafka-console-consumer.sh --consumer.config ./config/consumer.properties --topic foo --bootstrap-server ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095 --from-beginning
world
hello
foo
bar
```

### 3. Access with Java SDK

There are several ways to access Java SDK, such as configuring the `kafka_client_jaas.conf` file or directly setting the `sasl.jaas.config` property. For details, please refer to the three ways in the following code examples.

If you use the `kafka_client_jaas.conf` configuration file, fill in the username and password of the username and password when you enable the cluster authentication configuration in the console:

```shell
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin_pass";
};
```

#### Producer Example

``` java
package cn.ucloud.ukafka.Example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class KafkaProducerExample {

    public static void main(String[] args) throws Exception {
        // Method 1: Add environment variable that requires specifying the path of the configuration file
        System.setProperty("java.security.auth.login.config", "./config/kafka_client_jaas.conf");

        // Method 2: Add starting JVM parameter `-Djava.security.auth.login.config=./config/kafka_client_jaas.conf`

        Properties props = new Properties();
        props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");

        // You need to specify security.protocol and sasl.mechanism
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        // Method 3: Set jaas configuration content with Properties
        // props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_pass\";");

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        String topic = "foo";
        String value = "this is the message's value";

        ProducerRecord<String, String>  kafkaMessage =  new ProducerRecord<String, String>(topic, value);
        try {
            Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
            RecordMetadata recordMetadata = metadataFuture.get();
            System.out.println(String.format("Produce ok: topic:%s partition:%d offset:%d value:%s",
                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), value));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

#### Consumer Example

``` java
package cn.ucloud.ukafka.Example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class KafkaConsumerExample {

    public static void main(String[] args) throws Exception {
        // Method 1: Add environment variable that requires specifying the path of the configuration file
        System.setProperty("java.security.auth.login.config", "./config/kafka_client_jaas.conf");

        // Method 2: Add starting JVM parameter `-Djava.security.auth.login.config=./config/kafka_client_jaas.conf`

        Properties props = new Properties();
        props.put("bootstrap.servers", "ukafka-q0j01x5g-kafka1:9093,ukafka-q0j01x5g-kafka2:9094,ukafka-q0j01x5g-kafka3:9095");
        props.put("group.id", "test_group");
        props.put("auto.offset.reset", "earliest");

        // You need to specify security.protocol and sasl.mechanism
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");

        // Method 3: Set jaas configuration content with Properties
        // props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin_pass\";");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        List<String> subscribedTopics = new ArrayList<String>();
        subscribedTopics.add("foo");
        consumer.subscribe(subscribedTopics);

        while (true){
            try {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Consume topic:%s partition:%d offset:%d value:%s",
                            record.topic(), record.partition(), record.offset(), record.value()));
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
```

> The support and setting for authentication by different language Kafka SDKs are different. For specific settings, you need to check the documentation of the relevant SDK.
