Conduit 0.5 is Now Available

By Uchenna Anyanwu

28 Feb 2023

Conduit 0.5 is out! Conduit is a tool that helps developers build streaming data pipelines between production data stores and messaging systems. For example, if you’ve ever used tools like Kafka Connect, Conduit can serve as a drop-in replacement to help stream data to Apache Kafka. The goal of this release was to make Conduit easier to operate as a service: Dead Letter Queues (DLQs) are now easy to configure through HTTP and gRPC, health checking has been extended, and we added more capabilities for working with Debezium records. Here’s a look at some of the key enhancements in Conduit 0.5.0 and Conduit 0.5.1.

Stream Inspector

In the Conduit 0.4 release, developers could peek at data as it enters Conduit via source connectors and as it leaves through destination connectors. In this release, we made the stream inspector more complete: by adding new methods to the processor interface and new API endpoints, you can now also peek at data as it enters or leaves processors.

Processor inspection is available via the API.

Dead Letter Queues

In Conduit 0.4 we added Dead Letter Queues (DLQs) that can be configured through pipeline configuration files. In 0.5 we extended this feature by exposing the DLQ configuration through the HTTP and gRPC APIs. Additionally, we added two new metrics that help you keep an eye on the behavior of your DLQ: `conduit_dlq_execution_duration_seconds` is a histogram tracking how long it takes to insert records into the DLQ, and `conduit_dlq_bytes` gives you insight into the size of the records sent to the DLQ.
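As a refresher, the file-based configuration from 0.4 still works alongside the new APIs. The snippet below is a sketch based on the documented `dead-letter-queue` settings; exact field names may differ between versions, so double-check against the DLQ documentation:

```yaml
version: 1.1
pipelines:
  pipeline1:
    status: running
    name: pipeline1
    dead-letter-queue:
      plugin: builtin:log          # where nacked records are routed
      settings:
        level: error
      window-size: 4               # inspect the last 4 delivery attempts...
      window-nack-threshold: 2     # ...and stop the pipeline after 2 nacks
```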

Check out more information about Dead Letter Queues in our documentation.

Unwrap a Debezium record into an OpenCDC record

Two main processors were added to Conduit in this release:

1.) Parse JSON Processor: some source connectors create records whose key, payload, or both contain raw data (an array of bytes that is not human readable). If we know these values are JSON formatted, this processor can convert them into structured data (a map of string keys to values).

  • To parse the key, use the `parsejsonkey` processor.
  • To parse the payload, use the `parsejsonpayload` processor.

    Ex: using the `parsejsonkey` processor, the key can go from looking like this:

record.RawData{
    Raw: []uint8{
        0x7b, 0x22, 0x61, 0x66, 0x74, 0x65, 0x72, 0x22, 0x3a, 0x7b, 0x22, 0x64,
        0x61, 0x74, 0x61, 0x22, 0x3a, 0x34, 0x2c, 0x22, 0x69, 0x64, 0x22, 0x3a,
        0x33, 0x7d, 0x7d,
    },
}

to this:

record.StructuredData{
    "after": map[string]interface{}{
        "data": 4,
        "id": 3,
    },
}

2.) Unwrap Processor: some source connectors create a record with another record wrapped inside the payload, so we added a processor that unwraps the inner record from the payload and creates a new OpenCDC record from it. This processor can unwrap two formats:

  • Debezium: if the payload is a Debezium record, create a processor with the name “unwrap” and add the configuration “format: debezium” to it. Ex: the record can go from looking like (1) to (2)

(1)
record.Record{
    Metadata: map[string]string{
        "conduit.version": "v0.4.0",
    },
    Payload: record.Change{
        Before: nil,
        After: record.StructuredData{
            "payload": map[string]interface{}{
                "after": map[string]interface{}{
                    "description": "test1",
                    "id": 27,
                },
                "before": interface{}(nil),
                "op": "u",
                "source": map[string]interface{}{
                    "opencdc.version": "v1",
                },
                "transaction": interface{}(nil),
                "ts_ms": float64(1674061777225),
            },
            "schema": map[string]interface{}{},
        },
    },
    Key: record.StructuredData{
        "payload": 27,
        "schema": map[string]interface{}{},
    },
}

(2)

record.Record{
    Operation: record.OperationUpdate,
    Metadata: map[string]string{
        "opencdc.readAt": "1674061777225000000",
        "opencdc.version": "v1",
        "conduit.version": "v0.4.0",
    },
    Payload: record.Change{
        Before: record.StructuredData(nil),
        After: record.StructuredData{
            "description": "test1",
            "id": 27,
        },
    },
    Key: record.RawData{
        Raw: []byte("27"),
    },
}
  • Kafka Connect: if the payload is a Kafka Connect record, create a processor with the name “unwrap” and add the configuration “format: kafka-connect” to it. Ex: the record can go from looking like (1) to (2)

(1)

record.Record{
    Payload: record.Change{
        Before: record.StructuredData(nil),
        After: record.StructuredData{
            "payload": map[string]interface{}{
                "description": "test2",
                "id": 27,
            },
            "schema": map[string]interface{}{},
        },
    },
    Key: record.StructuredData{
        "payload": map[string]interface{}{
            "id": 27,
        },
        "schema": map[string]interface{}{},
    },
}

(2)

record.Record{
    Operation: record.OperationSnapshot,
    Payload: record.Change{
        After: record.StructuredData{
            "description": "test2",
            "id": 27,
        },
    },
    Key: record.StructuredData{"id": 27},
}

Note that the inner `payload` field of `Payload.After` is unwrapped to become the new record's payload, and the `payload` inside the `Key` is unwrapped too.

Health Check

The Conduit Health Check can be used to determine whether Conduit is running correctly. It checks whether Conduit can successfully connect to the database it was set up with (BadgerDB, PostgreSQL, or the in-memory store). Here’s an example:

$ curl "http://localhost:8080/healthz"

{"status":"SERVING"}

You can also check individual services within Conduit. The following example checks if the PipelineService is running:

$ curl "http://localhost:8080/healthz?service=PipelineService"

{"status":"SERVING"}

 

And the rest

If you want to see the full list of what was included in this release, check out the Conduit Changelog and the documentation. Also, feel free to join us on Discord or Twitter.
