Wednesday, February 4, 2026

Use Amazon MSK Connect and Iceberg Kafka Connect to build a real-time data lake


As analytical workloads increasingly demand real-time insights, organizations want business data to land in the data lake immediately after it is generated. While various methods exist for real-time CDC data ingestion (such as AWS Glue and Amazon EMR Serverless), Amazon MSK Connect with Iceberg Kafka Connect provides a fully managed, streamlined approach that reduces operational complexity and enables continuous data synchronization.

In this post, we demonstrate how to use Iceberg Kafka Connect with Amazon Managed Streaming for Apache Kafka (Amazon MSK) Connect to accelerate real-time data ingestion into data lakes, simplifying the synchronization process from transactional databases to Apache Iceberg tables.

Solution overview

In this post, we show you how to capture transaction log data from Amazon Relational Database Service (Amazon RDS) for MySQL and write it to Amazon Simple Storage Service (Amazon S3) in Iceberg table format using append mode, covering both single-table and multi-table synchronization, as shown in the following figure.

Downstream consumers then process these change records to reconstruct the data state before writing to Iceberg tables.

In this solution, you use the Iceberg Kafka Sink Connector to implement the sink side of the pipeline. The Iceberg Kafka Sink Connector has the following features:

  • Supports exactly-once delivery
  • Supports multi-table synchronization
  • Supports schema changes
  • Field name mapping through Iceberg's column mapping feature

Prerequisites

Before beginning the deployment, ensure you have the following components in place:

Amazon RDS for MySQL: This solution assumes you already have an Amazon RDS for MySQL database instance running with the data you want to synchronize to your Iceberg data lake. Make sure binary logging is enabled on your RDS instance to support change data capture (CDC) operations.
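
If you want to confirm the binlog settings before you begin, a quick check with the MySQL client is a minimal sketch; the endpoint and user name are placeholders, and the expectation of ROW format reflects general Debezium guidance rather than a requirement stated in this post:

# Placeholders: replace <rds-endpoint> and <admin-user> with your instance values.
mysql -h <rds-endpoint> -u <admin-user> -p \
  -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"
# Debezium generally expects log_bin=ON and binlog_format=ROW.
# On RDS, binary logging also requires automated backups (backup retention > 0).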

Amazon MSK cluster: You need an Amazon MSK cluster provisioned in your target AWS Region. This cluster serves as the streaming platform between your MySQL database and the Iceberg data lake. Make sure the cluster is properly configured with appropriate security groups and network access.

Amazon S3 bucket: Make sure you have an Amazon S3 bucket available to host the custom Kafka Connect plugins. This bucket serves as the storage location from which MSK Connect retrieves and installs your plugins. The bucket must exist in your target AWS Region, and you must have appropriate permissions to upload objects to it.

Custom Kafka Connect plugins: To enable real-time data synchronization with MSK Connect, you must create two custom plugins. The first plugin uses the Debezium MySQL Connector to read transaction logs and produce change data capture (CDC) events. The second plugin uses Iceberg Kafka Connect to synchronize data from Amazon MSK to Apache Iceberg tables.

Build environment: To build the Iceberg Kafka Connect plugin, you need a build environment with Java and Gradle installed. You can either launch an Amazon EC2 instance (recommended: Amazon Linux 2023 or Ubuntu) or use your local machine if it meets the requirements. Make sure you have sufficient disk space (at least 20 GB) and network connectivity to clone the repository and download dependencies.
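
As a rough sketch of preparing such an environment on Amazon Linux 2023 (the package names are assumptions and differ on other distributions; the Iceberg repository ships the Gradle wrapper, so a separate Gradle install is usually unnecessary):

# Install Git and a JDK on Amazon Linux 2023; adjust the Java version to what the Iceberg build requires.
sudo dnf install -y git java-17-amazon-corretto-devel
java -version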

Build Iceberg Kafka Connect from open source

The connector ZIP archive is created as part of the Iceberg build. You can run the build using the following code:

git clone https://github.com/apache/iceberg.git
cd iceberg/
./gradlew -x test -x integrationTest clean build
The ZIP archive will be saved in ./kafka-connect/kafka-connect-runtime/build/distributions.
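
For example, you can locate the archive and copy it to the S3 bucket from the prerequisites; the version in the file name and the bucket and key are placeholders:

# List the build output to find the exact file name, then upload it to S3.
ls kafka-connect/kafka-connect-runtime/build/distributions/
aws s3 cp kafka-connect/kafka-connect-runtime/build/distributions/iceberg-kafka-connect-runtime-<version>.zip \
  s3://<your-plugin-bucket>/plugins/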

Create custom plugins

The next step is to create custom plugins to read and synchronize the data.

  1. Upload the custom plugin ZIP file you compiled in the previous step to your designated Amazon S3 bucket.
  2. Go to the AWS Management Console, navigate to Amazon MSK, and choose Connect in the navigation pane.
  3. Choose Custom plugins, then select the plugin file you uploaded to S3 by browsing or entering its S3 URI.
  4. Specify a unique, descriptive name for your custom plugin (such as my-connector-v1).
  5. Choose Create custom plugin. If you prefer the AWS CLI, see the sketch after this list.
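
The following sketch registers the uploaded archive as a custom plugin from the AWS CLI; the bucket ARN, object key, and plugin name are placeholders:

# Register the ZIP archive in S3 as an MSK Connect custom plugin.
aws kafkaconnect create-custom-plugin \
  --name my-connector-v1 \
  --content-type ZIP \
  --location '{"s3Location":{"bucketArn":"arn:aws:s3:::<your-plugin-bucket>","fileKey":"plugins/iceberg-kafka-connect-runtime-<version>.zip"}}'
# Repeat the command for the Debezium MySQL Connector archive.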

Configure MSK Connect

With the plugins in place, you're ready to configure MSK Connect.

Configure data source access

Start by configuring data source access.

  1. To create a worker configuration, choose Worker configurations in the MSK Connect console.
  2. Choose Create worker configuration and copy and paste the following configuration.
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Enable topic creation by the worker
    topic.creation.enable=true
    # Default topic creation settings for the Debezium connector
    topic.creation.default.replication.factor=3
    topic.creation.default.partitions=1
    topic.creation.default.cleanup.policy=delete

  3. In the Amazon MSK console, choose Connectors under MSK Connect and choose Create connector.
  4. In the setup wizard, select the Debezium MySQL Connector plugin created in the previous step, enter the connector name, and select the MSK cluster of the synchronization target. Copy and paste the following content into the configuration:
    
    connector.class=io.debezium.connector.mysql.MySqlConnector
    tasks.max=1
    include.schema.changes=false
    database.server.id=100000
    database.server.name=
    database.port=3306
    database.hostname=
    database.password=
    database.user=
    
    topic.creation.default.partitions=1
    topic.creation.default.replication.factor=3
    
    topic.prefix=mysqlserver
    database.include.list=
    
    ## route
    transforms=Reroute
    transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
    transforms.Reroute.topic.regex=(.*)(.*)
    transforms.Reroute.topic.replacement=$1all_records
    
    # schema.history
    schema.history.internal.kafka.topic=
    schema.history.internal.kafka.bootstrap.servers=
    # IAM/SASL
    schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
    schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    schema.history.internal.consumer.security.protocol=SASL_SSL
    schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    schema.history.internal.producer.security.protocol=SASL_SSL
    schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM
    schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

    Note that in this configuration, the reroute transform is used to write multiple tables to the same topic. In the parameter transforms.Reroute.topic.regex, the regular expression filters the table names that need to be written to the same topic. In the following example, records from tables whose names match the regular expression are written to the same topic.

    ## route
    transforms=Reroute
    transforms.Reroute.type=io.debezium.transforms.ByLogicalTableRouter
    transforms.Reroute.topic.regex=(.*)(.*)
    transforms.Reroute.topic.replacement=$1all_records

    For example, after transforms.Reroute.topic.replacement is specified as $1all_records, the topic name created in MSK is <database.server.name>.all_records.

  5. After you choose Create, MSK Connect creates a synchronization job for you. You can verify that the connector reaches the Running state, as shown in the sketch that follows.
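
    As a minimal sketch, you can confirm the connector state and the rerouted topic from the command line; the client.properties file with MSK IAM authentication settings is an assumption on your side:

    # Check connector states; the new connector should reach RUNNING.
    aws kafkaconnect list-connectors \
      --query 'connectors[].{name:connectorName,state:connectorState}'

    # List topics on the MSK cluster to confirm the rerouted topic was created.
    $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $MYBROKERS \
      --command-config client.properties --list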

Data synchronization (single-table mode)

Now you can create a real-time synchronization job for the Iceberg table. Start by creating a real-time synchronization job for a single table.

  1. In the Amazon MSK console, choose Connectors under MSK Connect.
  2. Choose Create connector.
  3. On the next page, select the previously created Iceberg Kafka Connect plugin.
  4. Enter the connector name and select the MSK cluster of the synchronization target.
  5. Paste the following code into the configuration.
    
    connector.class=org.apache.iceberg.connect.IcebergSinkConnector
    tasks.max=1
    topics=
    iceberg.tables=
    iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
    iceberg.catalog.warehouse=
    iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
    iceberg.catalog.client.region=
    iceberg.tables.auto-create-enabled=true
    iceberg.tables.evolve-schema-enabled=true
    iceberg.control.commit.interval-ms=120000
    transforms=debezium
    transforms.debezium.type=org.apache.iceberg.connect.transforms.DebeziumTransform
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    key.converter.schemas.enable=false
    iceberg.control.topic=control-iceberg

    The Iceberg connector creates a topic named control-iceberg by default to record offsets. Select the previously created worker configuration that includes topic.creation.enable=true. If you use the default worker configuration and auto topic creation isn't enabled at the MSK broker level, the connector won't be able to automatically create topics.

    You can also specify this topic name by setting the parameter iceberg.control.topic. If you want to use a custom topic, you can create it with the following code.

    $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $MYBROKERS --create --topic  --partitions 3 --replication-factor 2 --config cleanup.policy=compact

  6. Query the synchronized data results through Amazon Athena. From the table synchronized to Athena, you can see that, in addition to the source table fields, an additional _cdc field has been added to store the CDC metadata content. A query sketch follows this step.
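
    As a sketch, you can run such a query from the AWS CLI as well; the database, table, workgroup, and results location are placeholders:

    # Start an Athena query against the synchronized Iceberg table and fetch the results.
    QUERY_ID=$(aws athena start-query-execution \
      --query-string 'SELECT * FROM "<glue_database>"."<iceberg_table>" LIMIT 10' \
      --work-group primary \
      --result-configuration OutputLocation=s3://<athena-results-bucket>/ \
      --query QueryExecutionId --output text)
    # Wait for the query to finish, then retrieve the rows.
    aws athena get-query-results --query-execution-id "$QUERY_ID"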

Compaction

Compaction is an important maintenance operation for Iceberg tables. Frequent ingestion of small files can negatively affect query performance; regular compaction mitigates this issue by consolidating small files, minimizing metadata overhead, and significantly improving query efficiency. To maintain optimal table performance, you should implement dedicated compaction workflows. AWS Glue offers a good solution for this purpose, providing automated compaction capabilities that merge small files and restructure table layouts for better query performance.
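
One way to set this up, assuming the AWS Glue table optimizer feature is available in your Region, is a sketch like the following; the account ID, database, table, and role are placeholders, and the role must be able to read and write the table's S3 location:

# Enable Glue automatic compaction for the Iceberg table.
aws glue create-table-optimizer \
  --catalog-id <aws-account-id> \
  --database-name <glue_database> \
  --table-name <iceberg_table> \
  --type compaction \
  --table-optimizer-configuration '{"roleArn":"arn:aws:iam::<aws-account-id>:role/<glue-optimizer-role>","enabled":true}'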

Schema Evolution Demonstration

To demonstrate the schema evolution capabilities of this solution, we conducted a test to show how field changes on the source database are automatically synchronized to the Iceberg tables through MSK Connect and Iceberg Kafka Connect.

Initial setup:

First, we created an RDS for MySQL database with a customer information table (tb_customer_info) containing the following schema:

+----------------+--------------+------+-----+-------------------+-----------------------------------------------+
| Field          | Type         | Null | Key | Default           | Extra                                         |
+----------------+--------------+------+-----+-------------------+-----------------------------------------------+
| id             | int unsigned | NO   | PRI | NULL              | auto_increment                                |
| user_name      | varchar(64)  | YES  |     | NULL              |                                               |
| country        | varchar(64)  | YES  |     | NULL              |                                               |
| province       | mediumtext   | NO   |     | NULL              |                                               |
| city           | int          | NO   |     | NULL              |                                               |
| street_address | varchar(20)  | NO   |     | NULL              |                                               |
| street_name    | varchar(20)  | NO   |     | NULL              |                                               |
| created_at     | timestamp    | NO   |     | CURRENT_TIMESTAMP | DEFAULT_GENERATED                             |
| updated_at     | timestamp    | YES  |     | CURRENT_TIMESTAMP | DEFAULT_GENERATED on update CURRENT_TIMESTAMP |
+----------------+--------------+------+-----+-------------------+-----------------------------------------------+

We then configured MSK Connect using the Debezium MySQL Connector to capture changes from this table and stream them to Amazon MSK in real time. Following that, we set up Iceberg Kafka Connect to consume the data from MSK and write it to Iceberg tables.

Schema modification test:

To test the schema evolution capability, we added a new field named phone to the source table:

ALTER TABLE tb_customer_info ADD COLUMN phone VARCHAR(20) NULL;

We then inserted a new record with the phone field populated:

INSERT INTO tb_customer_info (user_name,country,province,city,street_address,street_name,phone) VALUES ('user_demo','China','Guangdong',755,'Street1 No.369','Street1','13099990001');

Results:

When we queried the Iceberg table in Amazon Athena, we observed that the phone field had been automatically added as the last column, and the new record was successfully synchronized with all field values intact. This demonstrates that Iceberg Kafka Connect's self-adaptive schema capability seamlessly handles DDL changes at the source, eliminating the need for manual schema updates in the data lake.

Data synchronization (multi-table mode)

It's common for data admins to want to use a single connector for moving data in multiple tables. For example, you can use the CDC collection tool to write data from multiple tables to one topic and then write data from that topic to multiple Iceberg tables on the consumer side. In Configure data source access, you configured a MySQL synchronization connector to synchronize tables matching specified rules to a topic using the reroute transform. Now let's review how to distribute data from this topic to multiple Iceberg tables.

  1. When using Iceberg Kafka Connect to synchronize multiple tables to Iceberg tables using the AWS Glue Data Catalog, you must pre-create a database in the Data Catalog before starting the synchronization process. The database name in AWS Glue must exactly match the source database name, because the Iceberg Kafka Connect connector automatically uses the source database name as the target database name during multi-table synchronization. This naming consistency is required because the connector doesn't provide an option to map source database names to different target database names in multi-table scenarios. A CLI sketch for creating the database follows this list.
  2. If you want to use a custom topic name, you can create a new topic to store the MSK Connect record offsets; see Data synchronization (single-table mode).
  3. In the Amazon MSK console, create another connector using the following configuration.
    connector.class=org.apache.iceberg.connect.IcebergSinkConnector
    tasks.max=2
    topics=
    iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
    iceberg.catalog.warehouse=
    iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
    iceberg.catalog.client.region=
    iceberg.tables.auto-create-enabled=true
    iceberg.tables.evolve-schema-enabled=true
    iceberg.control.commit.interval-ms=120000
    transforms=debezium
    transforms.debezium.type=org.apache.iceberg.connect.transforms.DebeziumTransform
    iceberg.tables.route-field=_cdc.source
    iceberg.tables.dynamic-enabled=true
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    key.converter.schemas.enable=false
    iceberg.control.topic=control-iceberg

    In this configuration, two parameters have been added:

    • iceberg.tables.route-field: Specifies the routing field that distinguishes between different tables, set to _cdc.source for CDC data parsed by Debezium
    • iceberg.tables.dynamic-enabled: Must be set to true if the iceberg.tables parameter isn't set
  4. After completion, MSK Connect creates a sink connector for you.
  5. After the process is complete, you can view the newly created tables through Athena.
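
A minimal sketch of pre-creating the Glue database from step 1; the database name is a placeholder and must match the source MySQL database name:

# Pre-create the Glue database used as the multi-table synchronization target.
aws glue create-database --database-input '{"Name":"<source_mysql_database>"}'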

Other considerations

In this section, we share some additional considerations that you can use to customize your deployment to fit your use case.

  • Specified table synchronization: In the Data synchronization (multi-table mode) section, you specified iceberg.tables.route-field=_cdc.source and iceberg.tables.dynamic-enabled=true; these two settings write multiple tables into Iceberg tables. If you want to synchronize only specific tables, set iceberg.tables.dynamic-enabled=false and then set the iceberg.tables parameter to the tables you want to synchronize. For example:
    iceberg.tables.dynamic-enabled=false
    iceberg.tables=default.tablename1,default.tablename2
     
    iceberg.table.default.tablename1.route-regex=tablename1
    iceberg.table.default.tablename2.route-regex=tablename2

  • Performance testing results

    We conducted a performance test using sysbench to evaluate the data synchronization capabilities of this solution. The test simulated a high-volume write scenario to demonstrate the system's throughput and scalability. Test configuration:

    1. Database setup: Created 25 tables in the MySQL database using sysbench
    2. Data loading: Wrote 20 million records to each table (500 million records in total)
    3. Real-time streaming: Configured MSK Connect to stream data from MySQL to Amazon MSK in real time during the write process
    4. Kafka Connect configuration:
      • Started the Iceberg Kafka Connect sink connector
      • Minimum workers: 1
      • Maximum workers: 8
      • Allocated two MCUs per worker

    Performance results:

    In our test using the preceding configuration, each MCU achieved peak write performance of approximately 10,000 records per second, as shown in the following figure. This demonstrates the solution's ability to handle high-throughput data synchronization workloads effectively. A sysbench sketch for the data-loading step follows.
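
    For reference, the data-loading step of this test can be reproduced with a sysbench command roughly like the following; the host, credentials, database, and thread count are placeholders, and oltp_insert is one of several suitable built-in workloads:

    # Create 25 tables and load 20 million rows into each one.
    sysbench oltp_insert \
      --mysql-host=<rds-endpoint> --mysql-user=<user> --mysql-password=<password> \
      --mysql-db=<database> --tables=25 --table-size=20000000 --threads=32 \
      prepare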

Clean up

To clean up your resources, complete the following steps (a CLI sketch follows the list):

  1. Delete the MSK Connect connectors: Remove both the Debezium MySQL Connector and the Iceberg Kafka Connect connector created for this solution.
  2. Delete the Amazon MSK cluster: If you created a new MSK cluster specifically for this demonstration, delete it to stop incurring charges.
  3. Delete the S3 buckets: Remove the S3 buckets used to store the custom Kafka Connect plugins and Iceberg table data. Make sure you have backed up any data you need before deletion.
  4. Delete the EC2 instance: If you launched an EC2 instance to build the Iceberg Kafka Connect plugin, terminate it.
  5. Delete the RDS for MySQL instance (optional): If you created a new RDS instance specifically for this demonstration, delete it. If you're using an existing production database, skip this step.
  6. Remove IAM roles and policies (if created): Delete any IAM roles and policies that were created specifically for this solution to maintain security best practices.
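
If you prefer to script the cleanup, the following is a hedged AWS CLI sketch; every ARN and identifier is a placeholder, and you should verify each resource before deleting it:

# Delete the connectors, plugin, cluster, bucket, and database instance created for this walkthrough.
aws kafkaconnect delete-connector --connector-arn <debezium-connector-arn>
aws kafkaconnect delete-connector --connector-arn <iceberg-sink-connector-arn>
aws kafkaconnect delete-custom-plugin --custom-plugin-arn <custom-plugin-arn>
aws kafka delete-cluster --cluster-arn <msk-cluster-arn>
aws s3 rb s3://<plugin-bucket> --force
aws rds delete-db-instance --db-instance-identifier <rds-instance-id> --skip-final-snapshot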

Conclusion

In this post, we presented a solution to achieve real-time, efficient data synchronization from transactional databases to data lakes using Amazon MSK Connect and Iceberg Kafka Connect. This solution provides a low-cost and efficient data synchronization paradigm for enterprise-level big data analytics. Whether you're working with ecommerce transactions, financial transactions, or IoT device logs, this solution can help you achieve rapid ingestion into a data lake, enabling analytics services to quickly obtain the latest business data. We encourage you to try this solution in your own environment and share your experiences in the comments section. For more information, visit Amazon MSK Connect.


About the author

Huang Xiao

Huang is a Senior Specialist Solutions Architect, Analytics at AWS. He focuses on big data solution architecture design, with years of experience in development and architectural design in the big data field.
