Introduction to Meroxa's New Conduit Connector for Apache Flink

By Haris Osmanagić

17 Jun 2024

At Meroxa, we're excited to introduce the Conduit connector for Apache Flink. Apache Flink is known for its robust stream processing, while Conduit is a lightweight, fast data streaming tool that simplifies the creation of connectors. Combining the two lets a Flink job use Conduit connectors as its sources and sinks, which broadens the options available for real-time data processing.

How It Works

The Conduit connector builds on Apache Flink’s robust Kafka connector: Kafka topics act as the bridge between Conduit pipelines and the Flink job. Here’s a breakdown of the process:

  1. Flink Source: Backed by a Conduit pipeline that reads from a data source and writes to a Kafka topic.
  2. Flink Job: Processes data from that Kafka topic, transforming it as needed, and writes the processed data to a second Kafka topic.
  3. Flink Sink: Backed by a Conduit pipeline that reads data from the second Kafka topic and writes it to the final destination.
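
The three stages above can be modeled without any infrastructure. The following is a minimal sketch, not the connector's implementation: in-memory queues stand in for the two Kafka topics, plain strings stand in for records, and every class and method name here is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.UnaryOperator;

// Illustrative only: queues play the role of Kafka topics,
// strings play the role of OpenCDC records.
class ThreeStageFlowSketch {

    // Stage 1: the "source pipeline" copies records from a data source into the first topic.
    static void runSourcePipeline(List<String> dataSource, Queue<String> sourceTopic) {
        sourceTopic.addAll(dataSource);
    }

    // Stage 2: the "job" drains the first topic, transforms each record,
    // and writes the result to the second topic.
    static void runJob(Queue<String> sourceTopic,
                       UnaryOperator<String> transform,
                       Queue<String> sinkTopic) {
        String record;
        while ((record = sourceTopic.poll()) != null) {
            sinkTopic.add(transform.apply(record));
        }
    }

    // Stage 3: the "sink pipeline" drains the second topic into the final destination.
    static List<String> runSinkPipeline(Queue<String> sinkTopic) {
        List<String> destination = new ArrayList<>();
        String record;
        while ((record = sinkTopic.poll()) != null) {
            destination.add(record);
        }
        return destination;
    }

    // Wires the three stages together in order.
    static List<String> run(List<String> dataSource, UnaryOperator<String> transform) {
        Queue<String> sourceTopic = new ArrayDeque<>();
        Queue<String> sinkTopic = new ArrayDeque<>();
        runSourcePipeline(dataSource, sourceTopic);
        runJob(sourceTopic, transform, sinkTopic);
        return runSinkPipeline(sinkTopic);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b"), String::toUpperCase)); // prints [A, B]
    }
}
```

In the real setup the stages run concurrently and the topics decouple them, so the Conduit pipelines and the Flink job can be restarted independently.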

Our Goal

To illustrate, we'll build a job that reads data from Conduit’s generator connector, adds metadata to each record, and writes the records to a file.

Requirements

To get started, you’ll need the following:

  • Java 11 or higher
  • Maven
  • Conduit (refer to our documentation)
  • Kafka (ensure auto.create.topics.enable is set to true)
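
If you are unsure how your broker is configured, a quick check of its properties file can help. This is a small sketch using only the Java standard library; the class name and the sample configuration text are hypothetical, and it only inspects properties text rather than a live broker. It relies on the fact that Kafka's broker default for auto.create.topics.enable is true when the entry is absent.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class TopicAutoCreateCheck {

    // Returns true when the given broker properties allow automatic topic creation.
    // A missing entry is treated as true, which matches Kafka's broker default.
    static boolean autoCreateEnabled(String brokerProperties) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(brokerProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("auto.create.topics.enable", "true");
        return Boolean.parseBoolean(value.trim());
    }

    public static void main(String[] args) {
        // Hypothetical excerpt of a broker configuration file.
        String config = "broker.id=0\nauto.create.topics.enable=true\n";
        System.out.println(autoCreateEnabled(config)); // prints true
    }
}
```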

Setup

First, create a new Maven project and include the necessary dependencies:

<dependencies>
   <dependency>
     <groupId>com.meroxa</groupId>
     <artifactId>conduit-flink-connector</artifactId>
     <version>0.0.1-SNAPSHOT</version>
   </dependency>
   <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-streaming-java</artifactId>
     <version>1.17.2</version>
   </dependency>
   <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-kafka</artifactId>
     <version>1.17.2</version>
   </dependency>
</dependencies>

Next, write the main class and get a new execution environment for your Flink job:

var env = StreamExecutionEnvironment.getExecutionEnvironment();

Adding a Source

Each Conduit source in an Apache Flink job maps to a connector on a running Conduit instance. In the conduit-flink-connector, this is represented with io.conduit.flink.ConduitSource:

// (1) Used to correlate all the pipelines which are part of this app
String appId = "conduit-flink-demo";
// (2) Create a new Conduit source
KafkaSource<Record> source = new ConduitSource(
   appId,
   // (3) Specify the plugin
   "generator",
   // (4) Configure the plugin
   Map.of(
     "recordCount", "1",
     "format.type", "structured",
     "format.options.id", "int",
     "format.options.name", "string"
   )
   // (5) Build a KafkaSource instance
).buildKafkaSource();

Breaking It Down

  1. Application ID: Specifies an ID for the Flink job. The Conduit connector uses this ID as part of the Conduit pipeline IDs it creates.
  2. Create ConduitSource: Instantiates a new ConduitSource.
  3. Specify Connector: Chooses the Conduit connector to use. Conduit comes with a few built-in connectors, and additional ones can be installed. Here we use the built-in generator connector.
  4. Configure Connector: A connector’s configuration options are usually documented in its README. The configuration above makes the connector produce one record with a structured payload containing two fields: an ID and a name.
  5. Build KafkaSource: Builds the KafkaSource instance.
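
To make the flat configuration keys easier to read, here is a small sketch that groups the format.options.* entries into a field-name-to-type map. It is only an illustration of how to read the keys, not how the generator connector parses its configuration; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

class GeneratorConfigSketch {
    static final String FIELD_PREFIX = "format.options.";

    // Collects the "format.options.<field>" entries into a sorted field-name -> type map.
    static Map<String, String> structuredFields(Map<String, String> config) {
        Map<String, String> fields = new TreeMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith(FIELD_PREFIX)) {
                String fieldName = entry.getKey().substring(FIELD_PREFIX.length());
                fields.put(fieldName, entry.getValue());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // The same configuration that is passed to ConduitSource above.
        Map<String, String> config = Map.of(
            "recordCount", "1",
            "format.type", "structured",
            "format.options.id", "int",
            "format.options.name", "string"
        );
        System.out.println(structuredFields(config)); // prints {id=int, name=string}
    }
}
```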

Writing a Map Transformation

Create a DataStream and add a map transformation to it. The transformation accepts an io.conduit.opencdc.Record and returns an io.conduit.opencdc.Record. Here, we add metadata to each record:

DataStream<Record> in = env.fromSource(
   source,
   WatermarkStrategy.noWatermarks(),
   "generator-source"
).map((MapFunction<Record, Record>) value -> {
   value.getMetadata().put("processed-by", "flink");
   return value;
});
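
The transformation logic itself is plain Java, so it can be tried out without a Flink cluster. The sketch below assumes a hypothetical DemoRecord class as a stand-in for io.conduit.opencdc.Record (only the metadata accessor is modeled) and uses java.util.function.UnaryOperator in place of Flink's MapFunction.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

class MetadataMapSketch {

    // Hypothetical stand-in for io.conduit.opencdc.Record; only metadata is modeled.
    static final class DemoRecord {
        private final Map<String, String> metadata = new HashMap<>();

        Map<String, String> getMetadata() {
            return metadata;
        }
    }

    // Same logic as the map function above: tag the record and pass it on.
    static final UnaryOperator<DemoRecord> TAG_PROCESSED = value -> {
        value.getMetadata().put("processed-by", "flink");
        return value;
    };

    public static void main(String[] args) {
        DemoRecord out = TAG_PROCESSED.apply(new DemoRecord());
        System.out.println(out.getMetadata()); // prints {processed-by=flink}
    }
}
```

Keeping the transformation in a named function like this makes it straightforward to unit test before wiring it into the job.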

Adding a Sink

Now, write the data into a file:

var sink = new ConduitSink(
   appId,
   "file",
   Map.of("path", "/tmp/file-destination.txt")
).buildKafkaSink();

Connect and Execute

Connect the stream and trigger program execution:

in.sinkTo(sink);
env.execute("Conduit + Apache Flink demo");

Putting It All Together

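import java.util.Map;

import io.conduit.flink.ConduitSink; // assumption: same package as ConduitSource
import io.conduit.flink.ConduitSource;
import io.conduit.opencdc.Record;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The class name is arbitrary.
public class ConduitFlinkDemo {
   public static void main(String[] args) throws Exception {
      var env = StreamExecutionEnvironment.getExecutionEnvironment();
      String appId = "conduit-flink-demo";

      // Source: Conduit's built-in generator connector, producing one structured record
      KafkaSource<Record> source = new ConduitSource(
         appId,
         "generator",
         Map.of(
            "recordCount", "1",
            "format.type", "structured",
            "format.options.id", "int",
            "format.options.name", "string"
         )
      ).buildKafkaSource();

      // Transformation: add metadata to each record
      DataStream<Record> in = env.fromSource(
         source,
         WatermarkStrategy.noWatermarks(),
         "generator-source"
      ).map((MapFunction<Record, Record>) value -> {
         value.getMetadata().put("processed-by", "flink");
         return value;
      });

      // Sink: Conduit's file connector
      var sink = new ConduitSink(
         appId,
         "file",
         Map.of("path", "/tmp/file-destination.txt")
      ).buildKafkaSink();

      in.sinkTo(sink);
      env.execute("Conduit + Apache Flink demo");
   }
}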

Ensure that Conduit and Kafka are running before executing the job. Running the application writes a record like the following to /tmp/file-destination.txt:

{
  "position": "eyJHcm91cElEIjoiNTU0MTU0NTktOTQ5Ny00OWYyLTgzMGUtMjUyY2EwOTE4YTY5IiwiVG9waWMiOiJmbGluay10b3BpYy1zaW5rIiwiUGFydGl0aW9uIjowLCJPZmZzZXQiOjB9",
  "operation": "create",
  "metadata": {},
  "key": {},
  "payload": {
    "id": 3758801242992936400,
    "name": "petrifier"
  }
}

What we see is a typical OpenCDC record. The .Payload.After field contains an id and a name that were created in the generator connector. Looking at the metadata, you’ll notice "processed-by": "flink" that comes from the map function.
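
The position field in the record above is Base64-encoded text, so it can be inspected with the standard library. The following sketch decodes it; the class and method names are hypothetical, and the interpretation of the decoded fields is our reading of the output rather than a documented contract.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class PositionDecoder {

    // Decodes a Base64-encoded position into its UTF-8 text form.
    static String decode(String position) {
        return new String(Base64.getDecoder().decode(position), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The position value copied from the record shown above.
        String position = "eyJHcm91cElEIjoiNTU0MTU0NTktOTQ5Ny00OWYyLTgzMGUtMjUyY2EwOTE4YTY5"
            + "IiwiVG9waWMiOiJmbGluay10b3BpYy1zaW5rIiwiUGFydGl0aW9uIjowLCJPZmZzZXQiOjB9";
        System.out.println(decode(position));
    }
}
```

For this record, the decoded text is a small JSON object with GroupID, Topic, Partition, and Offset fields. The Topic is flink-topic-sink, with Partition 0 and Offset 0, which suggests the position tracks where the sink pipeline is in the Kafka topic it reads from.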

Next Steps

Examine the topics used (flink-topic-source and flink-topic-sink), modify the map transformation, and observe the updated results. For more examples, including PostgreSQL to Snowflake, visit our GitHub repository.

We'd love to hear your feedback on the connector. Join us on Discord, GitHub Discussions, or Twitter/X for more conversations! Also, don't forget to request a demo to learn about our new Conduit Platform!

Haris Osmanagić

Senior Software Engineer working on Conduit