"""Block code example: reading from an external API in streaming mode.

This block is sample code that reads data from an external API and pushes
it to the Kafka data queue in streaming mode.
"""
from pipelineblocksdk.api.Singleton import Singleton
from pipelineblocksdk.construct.base.StreamBlock import StreamBlock
# The following code defines a Stream block.
class MyBlock(StreamBlock, Singleton):
    """Sample stream block that reads a CSV file from HDFS and pushes it to Kafka.

    ``run`` is the mandatory entry point of the block: it creates the Kafka
    producer and sets the schema of the output topic. ``stream`` reads the
    sample data set and pushes its rows to that topic through the producer
    created by ``run``.
    """

    # Host and port used to build the HDFS client URL.
    # NOTE(review): the host is blank in this example and must be filled in
    # before the block can actually connect.
    HDFS_HOST = ""
    HDFS_PORT = 50070
    # Location of the sample data set on HDFS.
    HDFS_FILE_PATH = '/AutoML/breast_cancer_wisconsin.csv'
    # Number of columns described by the output schema.
    SCHEMA_COLUMN_COUNT = 10

    # This is the entrypoint for the block. This is a mandatory function.
    def run(self):
        """Create the Kafka producer and set the schema of the output topic.

        Returns:
            dict: The output parameters of the block (empty in this example).

        Raises:
            Exception: Any error raised while creating the producer or setting
                the schema propagates unchanged to the caller.
        """
        # To send data to the Kafka queue a schema describing the data set is
        # required: one entry per column, each an integer column with its
        # 1-based position.
        # NOTE(review): the column names of the original example are not
        # available here, so "column_<n>" placeholders are used -- replace
        # them with the real column names of the data set.
        schema = {
            "column_{}".format(order): {
                "type": "IntegerType()",
                "order": order,
                "active": True,
            }
            for order in range(1, self.SCHEMA_COLUMN_COUNT + 1)
        }

        # Create the producer used to push messages to the Kafka topic.
        self.output_topic, self.producer = self.data_handler.create_producer(
            display_name="Reading data from external api"
        )
        # Attach the schema to the topic that was just created.
        self.set_schema(self.output_topic, schema)

        # Set the output parameter
        output_dict = dict()
        # Set the status of the block as completed
        self.block_status = "COMPLETED"
        return output_dict

    def stream(self):
        """Read the sample CSV file from HDFS and push its rows to Kafka.

        Uses ``self.producer``, which is assigned in ``run``.

        Returns:
            dict: The output parameters of the block (empty in this example).

        Raises:
            Exception: Any error raised while reading from HDFS, parsing the
                CSV or sending to the producer propagates unchanged.
        """
        # Function-level imports, as in the original example.
        import pandas as pd
        from hdfs import InsecureClient

        print("To read a file using external api")

        # Connect to HDFS.
        client_hdfs = InsecureClient(
            'http://' + self.HDFS_HOST + ':' + str(self.HDFS_PORT)
        )

        # Read the whole file into a DataFrame; the first CSV column is used
        # as the index, so it is not part of the rows pushed below.
        with client_hdfs.read(self.HDFS_FILE_PATH, encoding='utf-8') as reader:
            df = pd.read_csv(reader, index_col=0)

        # The send method of the producer is used to push messages to the
        # Kafka topic. The parsed rows are pushed one by one; iterating the
        # raw reader here would yield nothing because read_csv has already
        # consumed it.
        # NOTE(review): the payload shape expected by producer.send is not
        # shown in this example -- a plain list of column values is assumed;
        # confirm against the SDK.
        for record in df.values.tolist():
            self.producer.send(record)
        print("Read Successful")

        # TODO(review): close the data stream here; the closing call is not
        # shown in this example, so none is made.

        # Set the output parameter
        output_dict = dict()
        # Set the status of the block as completed
        self.block_status = "COMPLETED"
        return output_dict