# Synchronize from Kafka

Log in to your SCloud account and open the [User Console](https://console.scloudsg.com). Search for "Kafka Message Queue UKafka" under All Products or under Big Data to enter the [Kafka Message Queue UKafka Console](https://console.scloudsg.com/ukafka/ukafka). Click **Create Cluster**, choose the configuration you need on the creation page, and place the order.

<blockquote>
  Friendly reminder:
   <ol>
     <li>UKafka is a paid product. See the pricing shown on the creation page.</li>
     <li>Make sure the UKafka cluster and the UClickhouse cloud data warehouse cluster are in the same region and the same VPC. If they are in different VPCs, connect them through Private Network VPC > Network Interconnection.</li>
   </ol>
</blockquote>

## Prerequisites

A UKafka cluster has been created, along with a Topic and a consumer Group. See [UKafka Creation](/docs/ukafka/kafkasinkerintro/quickstart).

## Steps

### Connect to the Cluster

- Set up a cloud host in the same region and subnet as the cluster, and install clickhouse-client on it. Packages for older versions are available at [repo.yandex.ru](https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/); packages for newer versions are available at [packages.clickhouse.com](https://packages.clickhouse.com/rpm/lts/).

  Choose the clickhouse-client version that matches the kernel version of your cluster. For example, if the cluster kernel version is 21.8.14.5, download the following RPM packages:

  ```shell
  wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-21.8.14.5-2.noarch.rpm
  wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-21.8.14.5-2.x86_64.rpm
  ```

- Install the packages, starting with clickhouse-common-static because clickhouse-client depends on it:

  ```shell
  rpm -ivh clickhouse-common-static-21.8.14.5-2.x86_64.rpm
  rpm -ivh clickhouse-client-21.8.14.5-2.noarch.rpm
  ```

- Connect to the cluster with clickhouse-client

  ```shell
  clickhouse-client --host=<node-ip> --port=9000 --user=admin --password='<password>'
  ```


  This command starts an interactive session. Set the host to the IP address of any node (shown in the cluster details) and the password to the one set when the cluster was created. The default user is admin and the default port is 9000.
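
The download and installation steps above can be scripted. The following is a minimal bash sketch, assuming the target version is published in the older repo.yandex.ru stable repository with the `-2` release suffix used in the example; packages on packages.clickhouse.com use different file names, so adjust the pattern for newer versions. The function names `ck_rpm_urls` and `ck_install_client` are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: derive the clickhouse-client RPM URLs from the cluster kernel version.
# Assumption: the version exists in the older repo.yandex.ru stable repository
# with the "-2" release suffix; newer repositories name their files differently.
CK_REPO="https://repo.yandex.ru/clickhouse/rpm/stable/x86_64"

ck_rpm_urls() {
  # $1: kernel version, for example 21.8.14.5
  local version="$1"
  echo "${CK_REPO}/clickhouse-common-static-${version}-2.x86_64.rpm"
  echo "${CK_REPO}/clickhouse-client-${version}-2.noarch.rpm"
}

ck_install_client() {
  # Download both packages into the current directory, then install them in one
  # rpm transaction so the client's dependency on common-static is satisfied.
  local version="$1" url
  for url in $(ck_rpm_urls "$version"); do
    wget -q "$url" || return 1
  done
  rpm -ivh "clickhouse-common-static-${version}-2.x86_64.rpm" \
           "clickhouse-client-${version}-2.noarch.rpm"
}
```

For example, run `ck_install_client 21.8.14.5` as root on the cloud host.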

### Create Database

```sql
CREATE DATABASE IF NOT EXISTS ck_test ON CLUSTER ck_cluster;
```

### Create Kafka Consumer Table

```sql
CREATE TABLE ck_test.lineorder_kafka ON CLUSTER ck_cluster 
(
    LO_ORDERKEY             UInt32,
    LO_LINENUMBER           UInt8,
    LO_CUSTKEY              UInt32,
    LO_PARTKEY              UInt32,
    LO_SUPPKEY              UInt32,
    LO_ORDERDATE            Date,
    LO_ORDERPRIORITY        LowCardinality(String),
    LO_SHIPPRIORITY         UInt8,
    LO_QUANTITY             UInt8,
    LO_EXTENDEDPRICE        UInt32,
    LO_ORDTOTALPRICE        UInt32,
    LO_DISCOUNT             UInt8,
    LO_REVENUE              UInt32,
    LO_SUPPLYCOST           UInt32,
    LO_TAX                  UInt8,
    LO_COMMITDATE           Date,
    LO_SHIPMODE             LowCardinality(String)
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'broker list',
    kafka_topic_list = 'topic list',
    kafka_group_name = 'consumer group name',
    kafka_format = 'CSV',
    kafka_num_consumers = 1,
    kafka_max_block_size = 65536,
    kafka_skip_broken_messages = 0,
    -- Not supported on 21.8 clusters (see the parameter table below); remove this setting there.
    kafka_auto_offset_reset = 'latest';
```

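<blockquote>
  Note: The Kafka consumer table cannot be used directly as a result table. It only consumes data from Kafka and does not store it. Each message can be read from it only once, so persist the data into a MergeTree table through a materialized view, as described below.
</blockquote>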

Parameter description:

| Name                       | Required | Description                                                    |
| -------------------------- | -------- | -------------------------------------------------------------- |
| kafka_broker_list          | Yes      | A comma-separated list of Kafka broker addresses.              |
| kafka_topic_list           | Yes      | A comma-separated list of Topic names.                         |
| kafka_group_name           | Yes      | The consumer group name for Kafka.                             |
| kafka_format               | Yes      | Message format. It must be one of the input formats supported by ClickHouse, such as CSV or JSONEachRow. <br /> See [Input and Output Formats](https://clickhouse.com/docs/en/interfaces/formats/) for the full list. |
| kafka_row_delimiter        | No       | Delimiter between rows. <br /> Default is `\n`; set it to match the delimiter used when the data was written. |
| kafka_num_consumers        | No       | Number of consumers for a single table. Default value is 1. <br /> If one consumer's throughput is insufficient, specify more consumers. <br /> The total number of consumers must not exceed the number of partitions in the Topic, because within a consumer group each partition can be assigned to only one consumer. |
| kafka_max_block_size       | No       | Maximum number of messages in a batch polled from Kafka (a message count, not bytes). The example above uses 65536. |
| kafka_skip_broken_messages | No       | Tolerance of the Kafka message parser for malformed data, per block. Default value is 0. <br /> If `kafka_skip_broken_messages=N`, the engine skips up to N Kafka messages that cannot be parsed (one message equals one row of data). |
| kafka_commit_every_batch   | No       | When offsets are committed to Kafka. <br /> 0 (default): commit once after a whole block has been written. <br /> 1: commit after each batch has been processed. |
| kafka_auto_offset_reset    | No       | Where to start reading Kafka data. Options:<br /> earliest: start reading from the earliest offset.<br /> latest: start reading from the latest offset.<br /> Note: ClickHouse cloud database clusters on version 21.8 do not support this parameter. |

For more parameters, refer to the [ClickHouse Kafka table engine documentation](https://clickhouse.tech/docs/zh/engines/table-engines/integrations/kafka/).
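
Because the consumer table above is created `ON CLUSTER`, each node in `ck_cluster` runs its own copy of it and all copies join the same consumer group, so the total number of consumers is `kafka_num_consumers` multiplied by the number of nodes. The bash sketch below, with a hypothetical function name, computes the largest per-table value that keeps this total within the Topic's partition count.

```shell
#!/usr/bin/env bash
# Sketch: choose kafka_num_consumers so that kafka_num_consumers x nodes
# does not exceed the number of partitions in the Topic.
# Assumption: the Kafka table exists on every node (created ON CLUSTER) and
# all copies use the same kafka_group_name.
max_consumers_per_table() {
  # $1: partitions in the Topic, $2: nodes running the Kafka table
  local partitions="$1" nodes="$2"
  local per_table=$(( partitions / nodes ))
  # Each table needs at least one consumer; consumers beyond the partition
  # count receive no partitions and stay idle.
  if (( per_table < 1 )); then
    per_table=1
  fi
  echo "$per_table"
}
```

For example, a Topic with 6 partitions consumed by a 2-node cluster allows `kafka_num_consumers = 3` per table.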

### Create UClickhouse Local Table

```sql
CREATE TABLE ck_test.lineorder_local ON CLUSTER ck_cluster
(
    LO_ORDERKEY             UInt32,
    LO_LINENUMBER           UInt8,
    LO_CUSTKEY              UInt32,
    LO_PARTKEY              UInt32,
    LO_SUPPKEY              UInt32,
    LO_ORDERDATE            Date,
    LO_ORDERPRIORITY        LowCardinality(String),
    LO_SHIPPRIORITY         UInt8,
    LO_QUANTITY             UInt8,
    LO_EXTENDEDPRICE        UInt32,
    LO_ORDTOTALPRICE        UInt32,
    LO_DISCOUNT             UInt8,
    LO_REVENUE              UInt32,
    LO_SUPPLYCOST           UInt32,
    LO_TAX                  UInt8,
    LO_COMMITDATE           Date,
    LO_SHIPMODE             LowCardinality(String)
)
ENGINE = MergeTree ORDER BY (LO_ORDERKEY);
```

### Create Distributed Table

If you only need to sync data to the local table, you can skip this step.

```sql
CREATE TABLE IF NOT EXISTS ck_test.lineorder ON CLUSTER ck_cluster
AS ck_test.lineorder_local
ENGINE = Distributed(ck_cluster, ck_test, lineorder_local, rand());
```

### Create Materialized View

Create a materialized view that writes the data consumed by the Kafka consumer table into the UClickHouse target table. If the target is the local table, replace the distributed table name (`lineorder`) with the local table name (`lineorder_local`).

```sql
CREATE MATERIALIZED VIEW ck_test.consumer ON CLUSTER ck_cluster TO ck_test.lineorder AS SELECT * FROM ck_test.lineorder_kafka;
```
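
To pause consumption, for example while changing the target table, the ClickHouse Kafka engine documentation recommends detaching the materialized view and attaching it again afterwards. The following is a minimal bash sketch; the `CK_CLIENT`, `CK_HOST`, and `CK_PASSWORD` variables and the function names are hypothetical, and `CK_CLIENT` only exists so the client command can be swapped out.

```shell
#!/usr/bin/env bash
# Sketch: pause and resume Kafka consumption by detaching/attaching the view.
# CK_CLIENT, CK_HOST and CK_PASSWORD are hypothetical names for this sketch.
CK_CLIENT="${CK_CLIENT:-clickhouse-client}"

ck_query() {
  # Run one SQL statement ($1) against the node in CK_HOST.
  $CK_CLIENT --host="${CK_HOST}" --port=9000 --user=admin \
    --password="${CK_PASSWORD}" --query="$1"
}

pause_consumer() {
  # $1: fully qualified materialized view name; detaching stops reading from Kafka.
  ck_query "DETACH TABLE $1 ON CLUSTER ck_cluster"
}

resume_consumer() {
  # $1: fully qualified materialized view name; attaching restarts consumption.
  ck_query "ATTACH TABLE $1 ON CLUSTER ck_cluster"
}
```

Pass the view's fully qualified name, for example `pause_consumer ck_test.consumer` if the view was created in the `ck_test` database. Messages that arrive while the view is detached stay in Kafka (subject to its retention) and are read from the group's committed offsets after the view is reattached.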

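### Write Messages to UKafka

Write messages to the Topic in the format set by `kafka_format` (CSV in this example), with fields in the same order as the columns of the Kafka consumer table. For how to produce messages, see the [UKafka quick start](/docs/ukafka/kafkasinkerintro/quickstart).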
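
For a quick end-to-end test you need CSV messages whose 17 fields follow the column order of the consumer table. The bash sketch below prints N such rows; the function name and all field values are hypothetical test data, not a real dataset.

```shell
#!/usr/bin/env bash
# Sketch: print N CSV rows in the column order of ck_test.lineorder_kafka.
# All values are hypothetical; only LO_ORDERKEY and three key columns vary.
gen_lineorder_rows() {
  local n="$1" i
  for (( i = 1; i <= n; i++ )); do
    # LO_ORDERKEY,LO_LINENUMBER,LO_CUSTKEY,LO_PARTKEY,LO_SUPPKEY,LO_ORDERDATE,
    # LO_ORDERPRIORITY,LO_SHIPPRIORITY,LO_QUANTITY,LO_EXTENDEDPRICE,
    # LO_ORDTOTALPRICE,LO_DISCOUNT,LO_REVENUE,LO_SUPPLYCOST,LO_TAX,
    # LO_COMMITDATE,LO_SHIPMODE
    printf '%d,1,%d,%d,%d,1996-01-02,5-LOW,0,17,2116823,17366547,4,2032150,74711,2,1996-02-12,TRUCK\n' \
      "$i" $(( 1000 + i )) $(( 2000 + i )) $(( 3000 + i ))
  done
}
```

One way to send them is Kafka's console producer, for example `gen_lineorder_rows 1000 | kafka-console-producer.sh --bootstrap-server <broker> --topic <topic>` (older Kafka clients take `--broker-list` instead of `--bootstrap-server`). With `kafka_auto_offset_reset = 'latest'`, a new consumer group only reads messages produced after it starts, so produce the test messages after the materialized view exists.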

### Query Verification

```shell
clickhouse-client --host=<node-ip> --port=9000 --user=admin --password='<password>' --query="SELECT count() FROM ck_test.lineorder"
```
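
Rather than rerunning the query by hand, the count can be polled until it reaches the number of messages produced. The Kafka engine flushes rows in batches, so a short delay before they appear is expected. The following is a minimal bash sketch; `CK_CLIENT`, `CK_HOST`, `CK_PASSWORD`, and the function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: poll a table's row count until it reaches an expected value.
# CK_CLIENT, CK_HOST and CK_PASSWORD are hypothetical names for this sketch.
CK_CLIENT="${CK_CLIENT:-clickhouse-client}"

ck_count() {
  # Print the row count of table $1, for example ck_test.lineorder.
  $CK_CLIENT --host="${CK_HOST}" --port=9000 --user=admin \
    --password="${CK_PASSWORD}" --query="SELECT count() FROM $1"
}

wait_for_rows() {
  # $1: table, $2: minimum expected rows, $3: timeout in seconds (default 60)
  local table="$1" expected="$2" timeout="${3:-60}" count=0
  local deadline=$(( SECONDS + timeout ))
  while (( SECONDS < deadline )); do
    count="$(ck_count "$table")" || return 2   # client error
    if (( count >= expected )); then
      echo "${table}: ${count} rows"
      return 0
    fi
    sleep 1
  done
  echo "timed out: ${table} has ${count} rows, expected ${expected}" >&2
  return 1
}
```

For example, after producing 1000 test messages, export `CK_HOST` and `CK_PASSWORD` and run `wait_for_rows ck_test.lineorder 1000 120`.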