Block Code Example: Reading From an External API in Streaming Mode
This block is sample code that reads data from an external API (here, a CSV file stored on HDFS) and pushes it to a Kafka topic in streaming mode.
from pipelineblocksdk.api.Singleton import Singleton
from pipelineblocksdk.construct.base.StreamBlock import StreamBlock
# Definition of a Stream block that publishes the rows of a CSV file to Kafka.
class MyBlock(StreamBlock, Singleton):
    """Sample Stream block: read a CSV file from HDFS and stream it to Kafka.

    ``run`` creates a producer, registers the dataset schema on the output
    topic and launches ``stream``. ``stream`` starts a background thread that
    reads the file and pushes it to the topic one record at a time.
    """

    # HDFS endpoint (host 63.34.35.24, port 50070) and the file to stream.
    # Kept as class attributes so a subclass can point at another source.
    HDFS_URL = "http://63.34.35.24:50070"
    HDFS_PATH = "/AutoML/breast_cancer_wisconsin.csv"

    # Columns of the dataset, in output order. Every column is an integer.
    COLUMNS = (
        "clump_thickness",
        "cell_size_uniformity",
        "cell_shape_uniformity",
        "marginal_adhesion",
        "epithelial_cell_size",
        "bare_nuclei",
        "bland_chromatin",
        "normal_nucleoli",
        "mitoses",
        "malignant",
    )

    def run(self):
        """Entry point of the block (mandatory).

        Returns:
            dict: The block's output parameters (empty for this sample).
        """
        # To send data to the Kafka queue we need a schema for the dataset:
        # one entry per column with its type, 1-based order and active flag.
        schema = {
            column: {"type": "IntegerType()", "order": order, "active": True}
            for order, column in enumerate(self.COLUMNS, start=1)
        }

        # The producer is what pushes messages to the Kafka topic.
        self.output_topic, self.producer = self.data_handler.create_producer(
            display_name="Reading data from external api"
        )
        # Once the topic exists, attach the schema to it.
        self.set_schema(self.output_topic, schema)

        # Launch the streaming work; this call does not wait for it to finish.
        self.stream()

        # NOTE(review): the status is set as soon as the stream is launched,
        # not when it finishes (the worker sets it again when it is done).
        # This mirrors the original flow -- confirm the platform expects it.
        self.block_status = "COMPLETED"
        return dict()

    def stream(self):
        """Start pushing the file's records to Kafka in a background thread.

        The original code used an ``@async`` decorator here, but ``async``
        is a reserved keyword since Python 3.7 and cannot be used as a name.
        The background execution is therefore started explicitly.
        NOTE(review): assumes the decorator only meant "run in a background
        thread" -- confirm against the SDK helper it came from.

        Returns:
            threading.Thread: The started worker, so callers may ``join`` it.
        """
        import threading

        worker = threading.Thread(
            target=self._stream_records, name="hdfs-to-kafka-stream"
        )
        worker.start()
        return worker

    def _stream_records(self):
        """Read the CSV file from HDFS and send each row to the Kafka topic.

        The producer is closed even when reading or sending fails, so the
        data stream is never left open.

        Returns:
            dict: The output parameters (empty for this sample).
        """
        print("To read a file using external api")
        try:
            import pandas as pd
            from hdfs import InsecureClient

            # Connect to HDFS and parse the whole file into a DataFrame.
            client_hdfs = InsecureClient(self.HDFS_URL)
            with client_hdfs.read(self.HDFS_PATH, encoding="utf-8") as reader:
                df = pd.read_csv(reader, index_col=0)

            # Push the records row by row. The rows must come from the parsed
            # DataFrame: read_csv has already consumed the reader, so
            # iterating the reader again would yield nothing.
            # NOTE(review): assumes producer.send accepts one record as a
            # dict keyed by column name -- confirm against the SDK.
            for record in df.to_dict(orient="records"):
                self.producer.send(record)
            print("Read Successful")
        finally:
            # Close the data stream on success and on failure alike.
            self.producer.close()

        # Mark the block as completed once every record has been sent.
        self.block_status = "COMPLETED"
        return dict()