Block Code Example to Read From an external API in Batch mode

This block is sample code that reads data from an external API and pushes it to the Kafka data queue in batch mode.

from pipelineblocksdk.api.Singleton import Singleton
from pipelineblocksdk.construct.base.BatchBlock import BatchBlock

# The following code is the definition of a Batch block
class MyBlock(BatchBlock, Singleton):
    """Sample batch block: read a CSV file from HDFS and push its rows to Kafka.

    The block connects to HDFS over WebHDFS, loads one CSV file into a pandas
    DataFrame, registers a schema for the output topic and then sends the
    DataFrame to the producer one row at a time.
    """

    # HDFS NameNode host. It is empty in this sample and must be filled in
    # before the block can actually connect.
    HDFS_HOST = ""
    # WebHDFS port used by the sample.
    HDFS_PORT = 50070
    # Path of the CSV file on HDFS.
    HDFS_FILE_PATH = '/AutoML/breast_cancer_wisconsin.csv'

    # This is the entrypoint for the block. This is a mandatory function.
    def run(self):
        """Read the configured CSV from HDFS and publish every row to Kafka.

        Returns:
            dict: An empty dict of output parameters.

        Raises:
            Exception: Any error raised while reading from HDFS, parsing the
                CSV or using the producer propagates unchanged.
        """
        # Kept as function-level imports, as in the original example.
        import pandas as pd
        from hdfs import InsecureClient

        print("To read a file using external api")

        # Connect to HDFS and load the whole file; the first CSV column is used
        # as the DataFrame index and is therefore not part of the sent rows.
        client_hdfs = InsecureClient(
            'http://{}:{}'.format(self.HDFS_HOST, self.HDFS_PORT)
        )
        with client_hdfs.read(self.HDFS_FILE_PATH, encoding='utf-8') as reader:
            df = pd.read_csv(reader, index_col=0)

        # To send data to the Kafka queue a schema is needed for the data set.
        # One entry per DataFrame column, numbered from 1 in column order.
        # NOTE(review): the original listing declared every column as
        # "IntegerType()" but its column names were lost, so the keys are taken
        # from the file itself - confirm the nested layout and the types
        # against the SDK and the data.
        schema = {
            str(column): {"type": "IntegerType()", "order": order, "active": True}
            for order, column in enumerate(df.columns, start=1)
        }

        # Create the producer used to push messages to the Kafka topic, then
        # set the schema on the topic it returned.
        self.output_topic, self.producer = self.data_handler.create_producer(
            display_name="Reading data from external api"
        )
        self.set_schema(self.output_topic, schema)

        try:
            # The send method of the producer pushes messages to the Kafka
            # topic; records are pushed row by row.
            # NOTE(review): sending each row as a {column: value} dict is an
            # assumption - confirm the payload shape the producer expects.
            for record in df.to_dict(orient="records"):
                self.producer.send(record)
        finally:
            # Close the data stream even if sending fails part-way through.
            # NOTE(review): assumes the producer exposes close() - confirm.
            self.producer.close()

        print("Read Successful")

        # Set the output parameter
        output_dict = {}
        # Set the status of the block as completed
        self.block_status = "COMPLETED"
        return output_dict