Block Code Example to Read from an External API in Batch Mode

This block is sample code that reads data from an external API and pushes it to a Kafka queue in batch mode.

from pipelineblocksdk.api.Singleton import Singleton
from pipelineblocksdk.construct.base.BatchBlock import BatchBlock

# The following class defines a Batch block
class MyBlock(BatchBlock, Singleton):

    # run() is the entry point for the block and is a mandatory method.
    def run(self):
        try:
            # To send data to a Kafka queue we first have to create a schema
            # for our dataset. The schema below describes the breast cancer
            # Wisconsin dataset that this block reads.
            
            schema = {
                "clump_thickness": {
                    "type": "IntegerType()", "order": 1, "active": True
                },
                "cell_size_uniformity": {
                    "type": "IntegerType()", "order": 2, "active": True
                },
                "cell_shape_uniformity": {
                    "type": "IntegerType()", "order": 3, "active": True
                },
                "marginal_adhesion": {
                    "type": "IntegerType()", "order": 4, "active": True
                },
                "epithelial_cell_size": {
                    "type": "IntegerType()", "order": 5, "active": True
                },
                "bare_nuclei": {
                    "type": "IntegerType()", "order": 6, "active": True
                },
                "bland_chromatin": {
                    "type": "IntegerType()", "order": 7, "active": True
                },
                "normal_nucleoli": {
                    "type": "IntegerType()", "order": 8, "active": True
                },
                "mitoses": {
                    "type": "IntegerType()", "order": 9, "active": True
                },
                "malignant": {
                    "type": "IntegerType()", "order": 10, "active": True
                }
            }
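
            # For illustration only (hypothetical values, not taken from the
            # dataset): a record conforming to this schema is a dict of
            # integer values keyed by the field names above, e.g.
            # {"clump_thickness": 5, "cell_size_uniformity": 1, ..., "malignant": 0}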
            
            
            # After creating the schema, create the producer that pushes
            # messages to the Kafka output topic
            self.output_topic, self.producer = self.data_handler.create_producer(display_name="Reading data from external api")
            
            # Attach the schema to the output topic before sending any records
            self.set_schema(self.output_topic, schema)
                     
            print("To read a file using external api")
            
            # Imports used for the HDFS read
            import pandas as pd
            from hdfs import InsecureClient
            
            # Connect to HDFS over WebHDFS (host 63.34.35.24, port 50070)
            client_hdfs = InsecureClient('http://63.34.35.24:50070')
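
            # Optional sanity check (an assumption, not part of the original
            # sample): the hdfs client exposes status(), which can confirm the
            # WebHDFS endpoint is reachable before reading.
            # print(client_hdfs.status('/AutoML/breast_cancer_wisconsin.csv'))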
            
            # Read the CSV file from HDFS into a pandas DataFrame
            with client_hdfs.read('/AutoML/breast_cancer_wisconsin.csv', encoding='utf-8') as reader:
                df = pd.read_csv(reader, index_col=0)

            # The send method of the producer pushes messages to the Kafka topic.
            # Iterate over the DataFrame to push the records to the Kafka queue
            # row by row, each record as a dict keyed by column name.
            for _, row in df.iterrows():
                self.producer.send(row.to_dict())
            
            print("Read Successful")
    
            # Close the producer's data stream
            self.producer.close()
    
            # Set the output parameters (this block produces none, so the
            # dict is empty)
            output_dict = dict()
    
            # Set the status of the block as completed
            self.block_status = "COMPLETED"
    
            return output_dict
        except Exception:
            # Re-raise so the caller can handle the failure; a bare raise
            # preserves the original traceback
            raise
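
To test the read logic outside the pipeline, the same pattern can be run standalone. This is a minimal sketch under the assumption that the WebHDFS endpoint and file path above are reachable; print() stands in for the SDK's producer.send().

import pandas as pd
from hdfs import InsecureClient

# Connect to the same WebHDFS endpoint used by the block
client_hdfs = InsecureClient('http://63.34.35.24:50070')

# Read the CSV into a DataFrame, then emit each record as a dict
with client_hdfs.read('/AutoML/breast_cancer_wisconsin.csv', encoding='utf-8') as reader:
    df = pd.read_csv(reader, index_col=0)

for _, row in df.iterrows():
    print(row.to_dict())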