Conduit v0.11 Unveils Powerful Schema Support for Enhanced Data Integration

By Lovro Mažgon

19 Aug 2024

We made it: Conduit v0.11 is here! In this latest release, we’ve focused on adding schema support, enabling you to detect schema changes and retain type information end-to-end. Our commitment is to make data integration more efficient and user-friendly, helping you optimize your data streaming workflows.

Schema Support

With the release of Conduit v0.11, one of the most significant enhancements is support for schemas.

Key Highlights of Conduit v0.11

  • Schema Support: Manage and detect schema changes seamlessly. Conduit now preserves type information end-to-end, ensuring data integrity and type safety throughout the pipeline.
  • Schema Registry: Integrated schema registry within Conduit, with compatibility for Confluent Schema Registry. Easily manage and fetch schemas without deploying separate services.
  • Connector Enhancements: New and improved connector SDK for working with schemas, simplifying the process of data encoding, decoding, and transformation.
  • Processor Improvements: Enhanced processor SDK with schema support, allowing for more accurate and reliable data processing.
  • Documentation Search: Quickly find the information you need with our new search feature in the Conduit documentation.

The primary benefits of schema support include:

  • Data Integrity: Ensures that data adheres to the expected structure, reducing the risk of errors and inconsistencies.
  • Type Safety: Retains type information throughout the data pipeline, allowing for safe and accurate data processing.
  • Future-Proofing: Prepares the system to handle evolving data structures, making it easier to adapt to changes without significant disruptions.

In the following sections, we will delve into the specifics of how schema support is implemented in Conduit, including the schema registry, connectors, processors, and additions to the OpenCDC record format.

Schema Registry

The Schema Registry is now a built-in component of Conduit, enabling the use of schemas in Conduit pipelines out of the box, without deploying a separate service.

Check out the source of the Conduit Schema Registry. It is written in Go, meaning it can be compiled into Conduit, where it is used internally as the default schema registry. We have also written a test suite that runs against both our schema registry and the Confluent Schema Registry, ensuring their compatibility. The Conduit Schema Registry currently supports only a subset of the Confluent Schema Registry’s features; however, the long-term goal is to make it fully compatible and to allow it to run as a standalone service.

Conduit also allows you to configure an external schema registry that’s compatible with the Confluent Schema Registry API.

schema-registry:
  type: "confluent"
  confluent:
    connection-string: "http://localhost:8085"

This snippet of the conduit.yaml file shows how to configure Conduit to connect to a Confluent Schema Registry instance. Check out the documentation for more information.

Schemas and OpenCDC records

We have added support for attaching schemas to OpenCDC records by introducing four standard metadata fields. These fields provide the information required to identify and fetch a specific schema from a schema registry (see the sketch after the list).

  1. opencdc.key.schema.subject and opencdc.key.schema.version

    These fields contain the schema subject and version for the data in the .Key field of the OpenCDC record.

  2. opencdc.payload.schema.subject and opencdc.payload.schema.version

    These fields contain the schema subject and version for the data in the .Payload.Before and .Payload.After fields.
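Since the metadata of an OpenCDC record is a plain string map, these fields can be read and written directly by key. Here’s a minimal sketch, assuming the opencdc.Metadata map type from conduit-commons; the subject and version values are illustrative:

package main

import (
	"fmt"

	"github.com/conduitio/conduit-commons/opencdc"
)

func main() {
	// Build a record whose key and payload reference registered schemas.
	r := opencdc.Record{
		Metadata: opencdc.Metadata{
			"opencdc.key.schema.subject":     "employees.key",
			"opencdc.key.schema.version":     "1",
			"opencdc.payload.schema.subject": "employees.payload",
			"opencdc.payload.schema.version": "1",
		},
	}

	// The SDKs also expose typed accessors for these fields, as shown
	// in the connector example below.
	fmt.Println(r.Metadata["opencdc.payload.schema.subject"]) // employees.payload
}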

Connectors

The latest Connector SDK includes several enhancements to simplify working with schemas.

First, we introduced the schema package, which contains utilities for retrieving and creating schemas in connectors. These utilities interact with Conduit’s Schema Registry. The returned schema can be used to encode and decode data, as well as traverse the schema and apply it to the destination resource (e.g. creating a destination table with the correct types).

Here’s an example:

package myconnector

import (
	"context"

	"github.com/conduitio/conduit-commons/opencdc"
	"github.com/conduitio/conduit-connector-sdk/schema"
)

/* ... */

func (d *Destination) Write(ctx context.Context, records []opencdc.Record) (int, error) {
	for _, r := range records {
		// Fetch the key schema referenced by the record's metadata.
		keySubject, _ := r.Metadata.GetKeySchemaSubject()
		keyVersion, _ := r.Metadata.GetKeySchemaVersion()
		keySchema, _ := schema.Get(ctx, keySubject, keyVersion)

		// Fetch the payload schema (note the payload getters, not the key getters).
		payloadSubject, _ := r.Metadata.GetPayloadSchemaSubject()
		payloadVersion, _ := r.Metadata.GetPayloadSchemaVersion()
		payloadSchema, _ := schema.Get(ctx, payloadSubject, payloadVersion)

		// use keySchema and payloadSchema ...
		_, _ = keySchema, payloadSchema
	}
	return len(records), nil
}

We also introduced source middleware that extracts an Avro schema from structured data and encodes the value into Avro raw data. This alleviates the issue of losing type information, which previously affected standalone connectors. The source middleware is enabled by default in all connectors using the latest connector SDK, meaning that connectors don’t need any specific code to benefit from schema support.

Additionally, all destination connectors benefit from another middleware, which works in the opposite manner to the source middleware. If a record contains the new metadata fields with a subject and version, it will fetch the schema and decode the data into structured data. This ensures that both the destination and source connectors can work with structured data while preserving the correct type information end-to-end.

To find out more about the source and destination middleware, check out the middleware documentation.

Processors

The Processor SDK now includes schema support, similar to the Connector SDK, making it easier to work with structured data in processors.

We have introduced a schema package in the processor SDK, which can be used to interact with Conduit’s Schema Registry. This package allows processors to retrieve and create schemas, ensuring that type information is preserved throughout data processing.

Here’s a snippet of how you could interact with the new schema package:

package myprocessor

import (
	"context"

	"github.com/conduitio/conduit-commons/opencdc"
	sdk "github.com/conduitio/conduit-processor-sdk"
	"github.com/conduitio/conduit-processor-sdk/schema"
)

/* ... */

func (p *Processor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
	out := make([]sdk.ProcessedRecord, 0, len(records))
	for _, r := range records {
		// Fetch the key schema referenced by the record's metadata.
		keySubject, _ := r.Metadata.GetKeySchemaSubject()
		keyVersion, _ := r.Metadata.GetKeySchemaVersion()
		keySchema, _ := schema.Get(ctx, keySubject, keyVersion)

		// Fetch the payload schema (note the payload getters, not the key getters).
		payloadSubject, _ := r.Metadata.GetPayloadSchemaSubject()
		payloadVersion, _ := r.Metadata.GetPayloadSchemaVersion()
		payloadSchema, _ := schema.Get(ctx, payloadSubject, payloadVersion)

		// use keySchema and payloadSchema ...
		_, _ = keySchema, payloadSchema

		out = append(out, sdk.SingleRecord(r))
	}
	return out
}

Additionally, processors are equipped with new middleware that automatically handles the encoding and decoding of data in records that have an attached schema. The middleware detects changes in data (e.g. new fields, deleted fields, changed field types) and updates the schema, bumping its version according to the applied changes. This middleware is enabled by default for all processors, ensuring seamless schema management without requiring any additional code in the processor implementation.

Other improvements

Apart from the schema support, we have added several other improvements in v0.11.

Documentation search

One of the most significant additions to our documentation is the introduction of a search bar. The search bar allows users to quickly locate the content they are looking for. This feature is especially useful for newcomers who are getting acquainted with Conduit, as it reduces the time spent navigating the documentation.

Connector improvements

Postgres connector

The latest release of the Postgres connector includes support for incremental snapshots in logical replication mode. This feature allows for safely executing snapshots of the current state before starting to stream changes. It is especially important for large tables, which can take hours or even days to snapshot. With this enhancement, an interrupted snapshot can be resumed from the last successfully synced position.

We also improved the management of logical replication slots, ensuring that slots created by Conduit are cleaned up when the pipeline is deleted.

These changes are included in the built-in Postgres connector, but feel free to check out the source for the connector here.
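As a rough sketch, here’s what a pipeline configuration file using the built-in Postgres source in logical replication mode could look like. The connection details are placeholders, and parameter names may vary between connector versions, so consult the connector documentation:

version: "2.2"
pipelines:
  - id: postgres-to-file
    status: running
    connectors:
      - id: pg-source
        type: source
        plugin: builtin:postgres
        settings:
          url: "postgres://user:password@localhost:5432/exampledb"
          tables: "orders"
          cdcMode: "logrepl"
      - id: file-destination
        type: destination
        plugin: builtin:file
        settings:
          path: "/tmp/orders.out"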

HTTP connector

The HTTP source connector is now more flexible, allowing you to use JavaScript to specify how the request data is built and how the response is parsed.

In the destination connector, we have added the ability to build the request URL using data from the incoming records.

Check out the HTTP connector source here.

Processor improvements

error

We introduced a new processor called error, which can be used to send a record to the DLQ (Dead Letter Queue) or fail the pipeline. It should always be used together with a condition; otherwise, all records reaching this processor will produce an error.
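As an illustrative sketch, a pipeline configuration could pair the processor with a Go-template condition like this (the metadata key and error message are assumptions made for the example):

processors:
  - id: reject-invalid
    plugin: "error"
    condition: '{{ eq .Metadata.valid "false" }}'
    settings:
      message: "record failed validation"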

Read more about the error processor here.

webhook.http

We added the ability to specify headers for the webhook.http processor.

Read more about the webhook.http processor here.

field.convert

The field.convert processor can now convert data into a Go time.Time value. It supports converting Unix nanosecond timestamps and RFC 3339 formatted dates.
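For instance, a processor configuration along these lines could convert a timestamp field into a time value (the field name is an illustrative assumption):

processors:
  - id: convert-created-at
    plugin: "field.convert"
    settings:
      field: ".Payload.After.created_at"
      type: "time"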

Read more about the field.convert processor here.

avro.encode and avro.decode

These processors previously required users to run an external schema registry and configure the connection string for each processor. Now, they have been updated to use Conduit’s schema registry, eliminating the need for an external service.
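For example, a minimal decoding configuration could now look like the following, with no registry connection string required (the field value is an assumption based on the processor’s defaults):

processors:
  - id: decode-payload
    plugin: "avro.decode"
    settings:
      field: ".Payload.After"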

Read more about the avro.encode and avro.decode processors here and here.

What’s next?

With the release of Conduit v0.11, we have reached an important milestone. However, there are still exciting features on the horizon. Here’s a glimpse of what’s coming next:

  • We plan to add more robust pipeline lifecycle management functionality directly into Conduit. Specifically, we will introduce the ability to configure a restart policy at the pipeline level in case of failures. This will enable recovery from transient errors, such as an external service being unreachable, even if the connector itself cannot handle such failures.
  • We acknowledge that the Conduit UI has lagged behind the features we’ve added over the past two years, limiting access to Conduit’s full potential. Instead, we have focused on improving the internal capabilities and configuring them through configuration files. We think Conduit is most useful as a tool that can be automated and configured programmatically; therefore, we plan to remove the UI from Conduit entirely. In its place, we will add powerful CLI commands to simplify tasks such as bootstrapping new pipelines, exploring the contents of a running Conduit instance, and creating your own processors or connectors.
  • We plan to refactor the API and introduce the ability to export and import pipelines into configuration files. This will enhance the integration between the API and configuration management, making it easier to manage and deploy pipelines.

We invite you to participate in shaping the Conduit roadmap by joining our GitHub discussions or starting a new discussion yourself. Your feedback and ideas are crucial in helping us prioritize features that meet your needs.

Conclusion

Conduit v0.11 brings a host of new features and improvements that enhance the flexibility, usability, and performance of our data streaming platform. From comprehensive schema support to robust connector and processor enhancements, this release is designed to make data integration more seamless and efficient. We encourage you to upgrade to the latest version and explore these new capabilities. As always, we welcome your feedback and contributions to help shape the future of Conduit. Get involved by joining our Discord server and saying hello to the team behind Conduit!


Lovro Mažgon

Staff Software Engineer at Meroxa working on conduit.io