Block code example to generate structured data
This is sample code that generates some random structured data and pushes it into the Kafka data queue for consumption by downstream blocks. This block runs in batch mode.
from pipelineblocksdk.construct.base.BatchBlock import BatchBlock
from pipelineblocksdk.api.Singleton import Singleton
# Following code is the definition for a batch block
class MyBlock(BatchBlock, Singleton):
    """Batch block that generates random structured rows and publishes
    them to a newly created Kafka datastream for downstream consumption.
    """

    # This is the entrypoint for the block. This is a mandatory function.
    def run(self):
        """Create a datastream, push 10 random rows into it, and report
        the datastream name as the block's output.

        Returns:
            dict: {"output_datastream": <topic name>} so that the next
            blocks in the pipeline can locate and consume the data.
        """
        # Create a datastream (sets self.output_topic and self.producer)
        self.create_sample_datastream()
        try:
            # Create and send 10 random data rows to the datastream
            for _ in range(10):
                self.producer.send(self.generate_random_data())
        finally:
            # Always close the producer, even if a send fails, so the
            # stream is flushed and the connection is released.
            self.producer.close()
        # Expose the datastream name as the block's output parameter
        output_dict = dict()
        output_dict["output_datastream"] = self.output_topic
        # Set the status of the block as completed
        self.block_status = "COMPLETED"
        return output_dict

    # Following are user's custom functions

    # Randomly generates a datastream to push the data to be consumed by
    # next blocks in the pipeline.
    def create_sample_datastream(self):
        """Create a producer on a randomly named topic and register the
        row schema for that topic.

        Side effects: sets ``self.output_topic`` and ``self.producer``.
        """
        from random import randint
        # Random suffix keeps topic names from colliding between runs
        self.output_topic, self.producer = self.data_handler.create_producer(
            "datastream-" + str(randint(0, 1000)))
        self.set_schema(self.output_topic, self.get_schema_for_data())

    # Randomly generates data row with 2 columns. An integer and a string.
    def generate_random_data(self):
        """Return one random row: [int in 1..1000, random first name]."""
        import random
        string_value_options = ['John', 'Jamie', 'Jacob', 'Jackson', 'Jeremy']
        return [random.randint(1, 1000), random.choice(string_value_options)]

    # Creates a schema/definition for the above defined data.
    # This tells that datastream has data in the given structure,
    # so that the downstream consumers can read from the datastream and
    # understand the data.
    def get_schema_for_data(self):
        """Return the column schema matching generate_random_data's rows.

        Returns:
            dict: column name -> {"type", "order", "active"} description.
        """
        schema = {
            "COLUMN_1": {
                "type": "IntegerType()", "order": 1, "active": True
            },
            "COLUMN_2": {
                "type": "StringType()", "order": 2, "active": True
            }
        }
        return schema