Block Code Example to Generate Structured Data

This is sample code that generates random structured data and pushes it into the Kafka data queue for further consumption. This block runs in batch mode.

from pipelineblocksdk.construct.base.BatchBlock import BatchBlock
from pipelineblocksdk.api.Singleton import Singleton

# Following code is the definition for a batch block
class MyBlock(BatchBlock, Singleton):
    """Sample batch block that publishes randomly generated rows to a datastream.

    Every row has two columns, an integer followed by a string. The layout is
    described by ``get_schema_for_data`` and is registered on the output
    datastream before any row is sent.
    """

    # Number of rows published by a single run.
    _ROW_COUNT = 10

    # Candidate values for the string column of each generated row.
    _STRING_VALUE_OPTIONS = ('John', 'Jamie', 'Jacob', 'Jackson', 'Jeremy')

    # This is the entrypoint for the block. This is a mandatory function.
    def run(self):
        """Create the output datastream, publish the sample rows and finish.

        Returns:
            dict: A mapping with the single key ``"output_datastream"`` whose
            value is the topic returned when the producer was created.
        """
        # Create a datastream (sets self.output_topic and self.producer)
        self.create_sample_datastream()

        try:
            # Send the random data rows to the datastream
            for _ in range(self._ROW_COUNT):
                self.producer.send(self.generate_random_data())
        finally:
            # Close the datastream even when a send fails, so the producer
            # is never left open.
            self.producer.close()

        # Set the output parameter
        output_dict = {"output_datastream": self.output_topic}

        # Set the status of the block as completed
        self.block_status = "COMPLETED"

        return output_dict

    # Following are user's custom functions

    def create_sample_datastream(self):
        """Create a randomly named producer and register the row schema on it.

        The datastream name is ``"datastream-"`` followed by a random integer
        between 0 and 1000 (both inclusive). The topic and producer returned
        by ``self.data_handler.create_producer`` are stored on
        ``self.output_topic`` and ``self.producer``.
        """
        from random import randint

        stream_name = "datastream-" + str(randint(0, 1000))
        self.output_topic, self.producer = self.data_handler.create_producer(stream_name)
        self.set_schema(self.output_topic, self.get_schema_for_data())

    def generate_random_data(self):
        """Return one random data row with 2 columns.

        Returns:
            list: ``[number, name]`` where ``number`` is a random integer
            between 1 and 1000 (both inclusive) and ``name`` is picked at
            random from the fixed list of sample names.
        """
        import random

        return [random.randint(1, 1000), random.choice(self._STRING_VALUE_OPTIONS)]

    def get_schema_for_data(self):
        """Return the schema/definition for the rows built by this block.

        The schema tells that the datastream has data in the given structure,
        so that the downstream consumers can read from the datastream and
        understand the data.

        Returns:
            dict: One entry per column, keyed by column name; each entry
            carries the column's ``"type"``, ``"order"`` and ``"active"``
            settings.
        """
        return {
            "COLUMN_1": {
                "type": "IntegerType()", "order": 1, "active": True
            },
            "COLUMN_2": {
                "type": "StringType()", "order": 2, "active": True
            }
        }