Using Conduit to Generate Fake Data for Streaming Systems

By Haris Osmanagić

2 Aug 2022

Testing streaming systems and architectures can be difficult because you need to mock data and have an upstream system continuously push that mock data. This post shows how to set up Conduit’s data generator connector for that purpose.

The generator connector is built into Conduit, so you don’t need to download an external connector to get started. It lets you control the content it generates (a struct or a file), the format (structured or raw payloads), and the amount and frequency of the generated data. With this connector, you’ll be able to test the flow of data through your streaming systems.

The example

Our example will be a simple pipeline, with a generator source and a file destination. The generator source will be generating records, which will then be written to a file.

Setting up Conduit

We will use the Docker image in this example (you can also download a binary or you can build the code yourself). Open up your terminal and run:

docker run -p 8080:8080 --rm ghcr.io/conduitio/conduit:latest

That’s it, Conduit is up and running!

Creating the pipeline

We will use Conduit’s HTTP API to create the pipeline:

curl -Ss -X POST 'http://localhost:8080/v1/pipelines' -d '
{
  "config": {
    "name": "my-pipeline",
    "description": "My pipeline"
  }
}' | jq .

We use jq here to pretty-print the output and more easily spot the pipeline ID, which we will use in the next steps. You’ll get something like this:

{
  "id": "93d11532-504f-4591-b7b6-c130a54043ac",
  "state": {
    "status": "STATUS_STOPPED",
    "error": ""
  },
  "config": {
    "name": "my-pipeline",
    "description": "My pipeline"
  },
  "connectorIds": [],
  "processorIds": [],
  "createdAt": "2022-07-12T18:54:33.778965128Z",
  "updatedAt": "2022-07-12T18:54:33.778965128Z"
}
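If you would rather capture the ID in a script than copy it by hand, the response can be parsed programmatically. The following is a minimal Python sketch, not part of Conduit: the response body is the example output above, trimmed to a few fields, and extract_pipeline_id is a hypothetical helper name.

```python
import json

# Example response from the pipeline-creation call, trimmed to a few fields.
response_body = """
{
  "id": "93d11532-504f-4591-b7b6-c130a54043ac",
  "state": {"status": "STATUS_STOPPED", "error": ""},
  "config": {"name": "my-pipeline", "description": "My pipeline"},
  "connectorIds": [],
  "processorIds": []
}
"""


def extract_pipeline_id(body: str) -> str:
    """Return the "id" field of a pipeline-creation response (hypothetical helper)."""
    return json.loads(body)["id"]


pipeline_id = extract_pipeline_id(response_body)
print(pipeline_id)
```

The printed value is the ID that the connector and start requests in the following steps refer to.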


Creating the generator source

Run the following command to add a generator source to the pipeline.

curl -X POST 'http://localhost:8080/v1/connectors' -d '
{
  "type": "TYPE_SOURCE",
  "plugin": "builtin:generator",
  "pipeline_id": "93d11532-504f-4591-b7b6-c130a54043ac",
  "config": {
    "name": "my-generator-source",
    "settings": {
      "format.type": "structured",
      "format.options": "id:int,name:string,company:string,trial:bool",
      "readTime": "10ms",
      "recordCount": "5"
    }
  }
}'

Let’s go over the configuration options for the generator source in this example (also described in the README):

format.type and format.options

These two parameters are both required and specify the contents of generated records. format.options has different meanings depending on format.type.

format.type can be structured, raw or file. If structured is used, records with structured payloads will be generated. In that case, format.options needs to be a list of name-type pairs, where type can be one of int, string, time, bool. The generator above will create records with structured payloads, where we will have an ID integer field, a name field (of type string), a company field (of type string as well) and a trial field (of type boolean).

The same applies when format.type is raw. The only difference is that the structs will be serialized as JSON strings, and then converted to bytes.

To use a file as the payload, we need to set format.type to file. format.options is then expected to be a file path.
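To make the name-type pairs and the structured/raw distinction concrete, here is a small Python sketch. It is an illustration only and does not reproduce Conduit's implementation: parse_format_options, fake_value and fake_record are hypothetical helpers, and the way fake values are produced is an assumption.

```python
import json
import random
from datetime import datetime, timezone

# Field types listed in this post for format.options.
SUPPORTED_TYPES = {"int", "string", "time", "bool"}


def parse_format_options(options):
    """Split "name:type,name:type" into a list of (name, type) pairs."""
    fields = []
    for pair in options.split(","):
        name, sep, type_name = pair.strip().partition(":")
        if not sep or not name or type_name not in SUPPORTED_TYPES:
            raise ValueError(f"invalid field spec: {pair!r}")
        fields.append((name, type_name))
    return fields


def fake_value(type_name, n):
    """Produce a placeholder value for one field (assumed behavior, for illustration)."""
    if type_name == "int":
        return random.randint(0, 2**31 - 1)
    if type_name == "string":
        return f"string {n}"
    if type_name == "time":
        return datetime.now(timezone.utc).isoformat()
    return random.choice([True, False])  # bool


def fake_record(fields, n):
    """Build the n-th record as a dict keyed by field name."""
    return {name: fake_value(type_name, n) for name, type_name in fields}


fields = parse_format_options("id:int,name:string,company:string,trial:bool")
structured = fake_record(fields, 1)              # "structured": a struct
raw = json.dumps(structured).encode("utf-8")     # "raw": the same struct as JSON bytes
print(structured)
print(raw)
```

The last two assignments mirror the distinction described above: the raw payload carries the same fields as the structured one, only serialized to JSON and converted to bytes.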

readTime

Simulates the time needed to read a record. In this example, records will be read every 10 milliseconds.

recordCount

The number of records which the generator will generate, or -1 for no limit. In our example, 5 records will be generated.

burst.sleepTime and burst.generateTime

These two options make it possible to simulate bursts. With this, the connector can sleep for burst.sleepTime (not generating any records), then generate records for burst.generateTime, and then it will repeat the same cycle. The connector always starts with the sleeping phase. The cycles will end when recordCount has been reached, or never (if recordCount is set to -1).

Example:

"readTime": "1ms",
"burst.sleepTime": "15s",
"burst.generateTime": "30s",
"recordCount": "2000"


Here, the connector will sleep for 15s. Then it will be generating records for the next 30s. Every record will take 1ms to be generated. Once 30s are over, the same cycle will be repeated. recordCount is set to 2000, meaning that the cycles will stop after 2000 records have been generated.
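The arithmetic behind such a configuration can be sketched in a few lines of Python. This is a simplified model, not Conduit code: it assumes every record takes exactly readTime, that records are produced only during the generate phase, and that there is no other overhead. millis_until_done is a hypothetical helper, and all durations are given in whole milliseconds.

```python
def millis_until_done(record_count, read_ms, sleep_ms, generate_ms):
    """Estimate the milliseconds until record_count records have been generated.

    Simplified model: each cycle is a sleep phase followed by a generate
    phase, and each record takes exactly read_ms during the generate phase.
    """
    if record_count < 0:
        raise ValueError("a negative recordCount means no limit; the cycles never end")
    if record_count == 0:
        return 0
    per_window = generate_ms // read_ms  # records that fit into one generate phase
    if per_window == 0:
        raise ValueError("generate phase is shorter than a single read")
    full_cycles, remainder = divmod(record_count, per_window)
    if remainder == 0:
        # The last generate phase is used completely; do not count an extra cycle.
        full_cycles -= 1
        remainder = per_window
    return full_cycles * (sleep_ms + generate_ms) + sleep_ms + remainder * read_ms


# The example above: readTime 1ms, sleepTime 15s, generateTime 30s, 2000 records.
print(millis_until_done(2000, read_ms=1, sleep_ms=15_000, generate_ms=30_000))  # 17000
```

Under these assumptions, all 2000 records fit into the first 30-second generate phase, so the pipeline would be done about 17 seconds after starting (15 seconds of sleep plus 2 seconds of generating). A recordCount above 30000 would be needed to see a second cycle.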

Creating the file destination

Now let’s create a place for all the generated records to be written to. We’ll configure a file destination:

curl -X POST 'http://localhost:8080/v1/connectors' -d '
{
  "type": "TYPE_DESTINATION",
  "plugin": "builtin:file",
  "pipeline_id": "93d11532-504f-4591-b7b6-c130a54043ac",
  "config": {
    "name": "my-file-destination",
    "settings": {
      "path": "/home/conduitdev/projects/conduit/file-destination.txt"
    }
  }
}'


Starting the pipeline

Finally, let’s start the pipeline by executing the following command:

curl -X POST http://localhost:8080/v1/pipelines/93d11532-504f-4591-b7b6-c130a54043ac/start
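The curl commands in this post hardcode the pipeline ID. If you want to script the same calls, the requests could be built from a variable instead. The Python sketch below uses only the endpoints and JSON fields shown in this post; the helper names (post, connector_request, start_request, send) are hypothetical, and the example only builds the requests. Nothing is sent unless you call send against a running Conduit instance.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080/v1"  # address used throughout this post


def post(path, body=None):
    """Build (but do not send) a POST request, with an optional JSON body."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    return urllib.request.Request(BASE_URL + path, data=data, method="POST")


def connector_request(pipeline_id, conn_type, plugin, name, settings):
    """Request that adds a connector to the given pipeline."""
    return post("/connectors", {
        "type": conn_type,
        "plugin": plugin,
        "pipeline_id": pipeline_id,
        "config": {"name": name, "settings": settings},
    })


def start_request(pipeline_id):
    """Request that starts the given pipeline."""
    return post(f"/pipelines/{pipeline_id}/start")


def send(request):
    """Send a request and decode the JSON response (needs a running Conduit)."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read() or b"{}")


pipeline_id = "93d11532-504f-4591-b7b6-c130a54043ac"  # ID from the example response
source = connector_request(
    pipeline_id, "TYPE_SOURCE", "builtin:generator", "my-generator-source",
    {
        "format.type": "structured",
        "format.options": "id:int,name:string,company:string,trial:bool",
        "readTime": "10ms",
        "recordCount": "5",
    },
)
start = start_request(pipeline_id)
print(source.get_method(), source.full_url)
print(start.get_method(), start.full_url)
```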


Checking the results

Since we’re generating only 5 records, and are simulating a 10-millisecond read time, we should be able to see the records in the destination pretty much instantaneously. If you check the contents of /home/conduitdev/projects/conduit/file-destination.txt, you should see something like this:

{"company":"string 1","id":1562668947,"name":"string 1","trial":true}
{"company":"string 2","id":554929334,"name":"string 2","trial":false}
{"company":"string 3","id":691297882,"name":"string 3","trial":false}
{"company":"string 4","id":234317840,"name":"string 4","trial":false}
{"company":"string 5","id":1564914498,"name":"string 5","trial":true}
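For anything beyond a quick look, the output can also be checked programmatically. The Python sketch below is an illustration under the assumption that the destination file holds one JSON object per line, as in the sample above. EXPECTED_TYPES and check_line are hypothetical names, and the two sample lines are copied from the output above.

```python
import json

# Field names and Python types matching the format.options used in this post.
EXPECTED_TYPES = {"id": int, "name": str, "company": str, "trial": bool}


def check_line(line):
    """Return True if a line is a JSON object with exactly the expected fields and types."""
    record = json.loads(line)
    if set(record) != set(EXPECTED_TYPES):
        return False
    return all(type(record[name]) is expected for name, expected in EXPECTED_TYPES.items())


sample_lines = [
    '{"company":"string 1","id":1562668947,"name":"string 1","trial":true}',
    '{"company":"string 2","id":554929334,"name":"string 2","trial":false}',
]
print(all(check_line(line) for line in sample_lines))
```

To check a real run, read the lines from the destination file instead of sample_lines and compare the number of lines with recordCount.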

That’s all it takes! If you have any questions or suggestions, or just want to talk about streaming data, feel free to start a GitHub discussion or chat with us on Discord. And don’t forget to follow us on Twitter if you aren’t already.

Haris Osmanagić

Senior Software Engineer working on Conduit