Data APIs

To transfer data from one block to another we use Apache Kafka, where we use a Producer (to write data to topic) and Consumer (to receive data from topic). Apache Kafka is distributed message streaming platform. The messages are sent to specific topic and consumer can read those messages from corresponding topics.

To control the functions like create, delete new topics or send and receive data from Producer/Consumer, we use the MessageHandler class, which accepts block parameters and logger as arguments. Block parameters contains block_id, parent_id, run_id, pipeline_id, etc. Logger is the log API used to store errors, warning, debug messages etc. MessageHandler class can be imported using the given import statement.

from pipelineblocksdk.construct.handlers.MessageHandler.MessageHandler import MessageHandler

Creating Producer:

To create a producer use the below code -

output_topic, producer = self.data_handler.create_producer(display_name=<display_name>)

The parameters display_name is the name to be displayed for the topic to which data will be sent by the producer, and batch_size is the buffer size. Once buffer size is reached the message is pushed to Kafka. The above code return output topic and producer object. The producer objects have methods to send messages, release/close producer etc. output_topic is a UUID string, it is the name with which a topic is created in kafka and the producer created will be posting data to this topic.

Pushing messages to topic:

producer.send(msg, *args, *kwargs)

The send method of the producer is used to push messages to kafka topic. The send method takes in the msg as parameter. After all the messages are pushed to topic the producer is closed using producer.close()

Creating Consumer:

consumer = self.data_handler.create_consumer(topic_name=<topic_name>)

Consumer object is created using the above create_consumer method. The topic_name is the topic from which the messages are to received i.e., for eg., ‘93314209-19be-4f6e-951f-f05332f53a18’. Topic display_name and topic_name are two different things. display_name is the name which will be displayed while previewing the data present in the topic on the platform where as topic_name is UUID and it the name with which the topic is created in kafka. Consumer object also has a method to receive messages.

Pull messages from topic:

consumer.receive()

This method returns one message at a time.

MessageHandler methods:

The MessageHandler class also contains methods to set and get schema, delete topic, update metadata, etc.

Schema - Schema is META information for a topic containing data. For every topic there should be a Schema associated with it. It contains details of each column, like type, size, order. It is represented by a dictionary in Python blocks and Map in Java Blocks.

{
   "column_name":{
                   "type": "FloatType()",
                   "order": 1,
                   "size": 12,
                   "active": true
                  }
}
Key Details:

type: It specifies the data type of the column. It can have following values IntegerType(), FloatType(), StringType(), ArrayType(), BinaryType().

order: It is the sequence in which the column appears in the data set. Let's say, in a data set there are 3 columns (COL_1, COL_2, COL_3), then their order will be (1, 2, 3) respectively.

size: It is the length of the value having MAX length in the data set. It is not mandatory and can be set only if required.

active: This field indicates whether a column is active or not. If False, the values corresponding to the column will be there in the data set, but it won’t be displayed in Preview and its value won’t be written to file using 'Target Connectors'. It can be used for soft deleting a column or storing some computational results which is not required to be previewed or written to Output file using 'Target Connectors'.

{
    “infoKeys”: [“schema”, “total”],
    “schema”: {
        “COL_1: {
            “type”: “StringType()”,
            “order”: 1,
            “size”: 255,
            “active”: true 
        }
    },
    “totalRecordCount”: 500
}

In the above example, the dictionary represents META information for a Topic. The infoKeys is a list containing details of the keys inside the META information. Then other keys like schema and totalRecordsCount contains respective information. These keys are not mandatory and a Block developer can store whatever details they want as META information for a given topic.

get_schema - This method is used to return schema from specific topic.
self.get_schema(topic)

Parameters
topic: string
Name of the topic for which schema needs be returned
Returns
dictionary
Schema represented in the form of a map

set_schema - This method allows to set schema to particular topic.
self.set_schema(topic, schema)

Parameters
topic: string
Name of the topic to which schema needs be set
schema: dictionary

delete_topic - Deletes the given topic.
self.delete_topic(topic)

Parameters
topic: string
Name of the topic
Returns
boolean
True is deleted successfully

get_meta_data - gets metadata for a given topic.
self.get_meta_data(topic)

Parameters
topic: string
Name of the topic for which schema needs be returned
Returns
dictionary
Metadata represented in the form of a map

update_meta_data - Allows users to associate metadata for a topic.
self.update_meta_data(topic, meta)

Parameters
topic: string
Name of the topic for which metadata is to be updated
meta: dictionary - Contains key value pairs of metadata