Stream processing model


A streaming service uses a stream processing model as its service provider; the model specifies the information the service needs when it runs.

The stream processing model consists of four parts: Receiver, Filter, Mapper, and Sender. Each part is a node, and nodes can be connected and merged to build a real-time data processing stream, called a Stream. In addition to the Stream, some auxiliary parameters define the operating conditions of the entire service; they are stored together in the startup parameter type, Startup.

Configuration Parameters

The stream processing model is defined in JSON. Refer to the parameters described below and the corresponding examples to write a stream processing model file, and then publish it as a streaming service. You can also build the model and set its parameters in the Streaming Model Editor.
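Because a model file is plain JSON, broken node links can be caught before publishing. The following Python sketch is a hypothetical helper, not part of the product. It assumes only the layout used by the complete examples on this page (a top-level stream.nodeDic object whose entries carry className, prevNodes, and nextNodes) and reports links that point to missing nodes or are not mirrored on the other side.

```python
import json
import sys


def check_model(model):
    """Return a list of problems found in a stream processing model.

    Assumes the layout of the examples on this page: model["stream"]["nodeDic"]
    maps node names to objects with "className", "prevNodes" and "nextNodes".
    """
    problems = []
    nodes = model.get("stream", {}).get("nodeDic", {})
    if not nodes:
        problems.append("stream.nodeDic is missing or empty")
    for name, node in nodes.items():
        if "className" not in node:
            problems.append(f"{name}: missing className")
        # Every downstream link should exist and be mirrored in prevNodes.
        for nxt in node.get("nextNodes", []):
            if nxt not in nodes:
                problems.append(f"{name}: nextNodes refers to unknown node {nxt}")
            elif name not in nodes[nxt].get("prevNodes", []):
                problems.append(f"{nxt}: prevNodes does not list {name}")
        # Every upstream link should exist and be mirrored in nextNodes.
        for prev in node.get("prevNodes", []):
            if prev not in nodes:
                problems.append(f"{name}: prevNodes refers to unknown node {prev}")
            elif name not in nodes[prev].get("nextNodes", []):
                problems.append(f"{prev}: nextNodes does not list {name}")
    return problems


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python check_model.py my_model.streaming
    with open(sys.argv[1], encoding="utf-8") as f:
        issues = check_model(json.load(f))
    for issue in issues:
        print(issue)
    sys.exit(1 if issues else 0)
```

Single-node fragments, such as the receiver examples below with empty prevNodes and nextNodes, are not complete models; the check is meant for a full model file like the one at the end of this page.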

SparkParameter

Used to set the Spark Streaming runtime parameters. The examples on this page set checkPointDir and interval.

Stream

Receiver

Inherits from StreamNode. A Receiver is the entry point of stream data processing and receives data from a variety of sources, including Socket, WebSocket, HTTP, and the file system. A Receiver must set the metadata parameter to describe the messages it receives. A Receiver node consists of three parts: its own descriptive information, such as name and source; the message metadata, metadata; and the message read format, reader.

Streaming services support the following receive methods:

SocketReceiver: inherits from Receiver and receives Socket messages from the socket server at ipAddress and port. Example:

{
  "ipAddress": "127.0.0.1",
  "port": 9527,
  "name": "socketReceiver",
  "source": "Socket Receiver",
  "description": "Receive some message from socketServer",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.SocketReceiver"
}
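To try a SocketReceiver locally, some process must listen on ipAddress:port and write messages. The sketch below is a hypothetical test feeder, not part of the product: the helper names send_lines and serve_lines are illustrative, and it assumes the receiver treats each newline-terminated line of text (for example a CSV line read by a CSVFormatter) as one message, which you should confirm for your deployment.

```python
import socket
import time


def send_lines(conn, lines, delay=1.0):
    """Send each line followed by a newline; assumes one message per line."""
    for line in lines:
        conn.sendall((line + "\n").encode("utf-8"))
        time.sleep(delay)  # pace the messages like a live feed


def serve_lines(lines, host="127.0.0.1", port=9527, delay=1.0):
    """Listen on host:port, accept one client (the receiver), then send lines."""
    with socket.create_server((host, port)) as server:
        conn, _addr = server.accept()
        with conn:
            send_lines(conn, lines, delay)
```

For example, serve_lines(["1,116.39,39.91"]) listens on 127.0.0.1:9527, matching the example configuration above, and sends one line after the receiver connects. The sample values are made up.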

MultiSocketReceiver: inherits from Receiver and receives Socket messages from multiple servers at the same time; the messages received from all of the servers must have the same content format. The parameters to be specified are:

Example:

{
  "servers": [
    "192.168.1.1:9527",
    "192.168.1.1:9528",
    "192.168.1.2:9527"
  ],
  "name": "multiSocketReceiver",
  "source": "MultiSource Socket Receiver",
  "description": "Receive message from multi socket server",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.MultiSocketReceiver"
}

SocketServerReceiver: inherits from Receiver and acts as a Socket server, receiving messages sent by other Socket clients. The parameters to be specified are:

{
  "port": 9527,
  "name": "socketServerReceiver",
  "source": "SocketServer Receiver",
  "description": "Receive message from socket client",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.SocketServerReceiver"
}
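Because a SocketServerReceiver listens rather than connects, a test client only has to connect to its port and write messages. The sketch below is hypothetical (the function name send_to_receiver is illustrative) and, as with the SocketReceiver sketch, assumes one message per newline-terminated line; host should be the machine running the streaming service and port the "port" value from the model.

```python
import socket


def send_to_receiver(lines, host="127.0.0.1", port=9527):
    """Connect to a SocketServerReceiver and send newline-terminated lines.

    Assumption: the receiver treats each line as one message.
    """
    with socket.create_connection((host, port), timeout=10) as conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))
```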

WebSocketReceiver: inherits from Receiver and receives WebSocket messages. The parameters to be specified are:

{
  "url": "ws://192.168.1.1:9527/websocket",
  "name": "webSocketReceiver",
  "source": "WebSocket Receiver",
  "description": "Receive message from websocket server",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.WebSocketReceiver"
}

TextFileReceiver: inherits from Receiver, monitors the specified directory, and reads the contents of new files. The parameters to be specified are:

{
  "directoryPath": "hdfs:///data/",
  "name": "textFileReceiver",
  "source": "Text File Receiver",
  "description": "Listen new file in folder",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.TextFileReceiver"
}
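A directory monitor may read a file that is still being written. Assuming TextFileReceiver behaves like Spark Streaming's textFileStream (an assumption: Spark expects files to be moved atomically into the monitored directory and skips names that start with a dot), a safe way to drop test data into a local monitored directory is to write a hidden temporary file and then rename it. The function name drop_file is hypothetical; for an HDFS directory such as hdfs:///data/, the same write-then-rename pattern would be done with HDFS tools instead.

```python
import os
import tempfile


def drop_file(directory, filename, text):
    """Make directory/filename appear in one step with the given content.

    The text is first written to a hidden temporary file in the same
    directory, then renamed; os.replace is atomic within one file system.
    """
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
        final_path = os.path.join(directory, filename)
        os.replace(tmp_path, final_path)
    except BaseException:
        os.remove(tmp_path)  # do not leave a half-written temp file behind
        raise
    return final_path
```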

SingleTextFileReceiver: inherits from Receiver, monitors the specified file, and reads its contents according to the settings. Supported file formats are JSON, GeoJSON, and CSV. The following complete model file reads a JSON file with a SingleTextFileReceiver and passes the records to a ConsoleSender:

{
  "version": 9000,
  "sparkParameter": {
    "checkPointDir": "tmp",
    "interval": 5000
  },
  "stream": {
    "nodeDic": {
      "TextFileReceiver": {
        "filePath": "G:\\QQRev\\test.json",
        "readInterval": 1000,
        "rowsOneTime": 100,
        "reader": {
          "isJsonArray": false,
          "arrayExpression": "",
          "className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
        },
        "metadata": {
          "title": "",
          "epsg": 3857,
          "fieldInfos": [
            {
              "name": "X",
              "source": "lon",
              "nType": "DOUBLE"
            },
            {
              "name": "Y",
              "source": "lat",
              "nType": "DOUBLE"
            },
            {
              "name": "mbbh",
              "source": "mbbh",
              "nType": "TEXT"
            }
          ],
          "featureType": "POINT",
          "idFieldName": "mbbh",
          "dateTimeFormat": "yyyy-MM-dd HH:mm:ss"
        },
        "name": "TextFileReceiver",
        "caption": "",
        "description": "",
        "prevNodes": [],
        "nextNodes": [
          "ConsoleSender"
        ],
        "className": "com.supermap.bdt.streaming.receiver.SingleTextFileReceiver"
      },
      "ConsoleSender": {
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
        },
        "name": "ConsoleSender",
        "caption": "",
        "description": "",
        "prevNodes": [
          "TextFileReceiver"
        ],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.ConsoleSender"
      }
    }
  }
}

KafkaReceiver: inherits from Receiver and receives Kafka messages. The parameters to be specified are:

{
  "servers": "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092,192.168.1.4:9092",
  "topics": [
    "topic1",
    "topic2"
  ],
  "groupid": "groupId",
  "offset": "latest",
  "name": "kafkaReceiver",
  "source": "Kafka Receiver",
  "description": "Receive message from Kafka",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.KafkaReceiver"
}

HttpReceiver: inherits from Receiver and receives messages over HTTP. Currently only the HTTP GET method is supported.

{
  "url": "https://api.wheretheiss.at/v1/satellites/25544",
  "name": "httpReceiver",
  "source": "HTTP Receiver",
  "description": "Get message from web",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.HttpReceiver"
}
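Before writing the metadata for an HttpReceiver, it helps to see what a GET request to the url returns. The following standard-library sketch is only a preview tool, not how HttpReceiver is implemented: it sends one GET request and decodes the JSON body, whose keys are candidates for the source values in fieldInfos. The function name preview_json is hypothetical.

```python
import json
import urllib.request


def preview_json(url, timeout=10):
    """Send one HTTP GET request and return the decoded JSON body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, sorted(preview_json("https://api.wheretheiss.at/v1/satellites/25544")) lists the top-level keys returned by the URL in the example above (this requires network access).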

JMSReceiver: inherits from Receiver and receives messages over the JMS (Java Message Service) standard protocol, for example from ActiveMQ, RabbitMQ, and other message middleware.

{
  "url": "192.168.1.1",
  "port": 9527,
  "queueName": "data",
  "jdniName": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
  "userName": "user",
  "password": "password",
  "name": "jmsReceiver",
  "source": "JMS Receiver",
  "description": "Receive message from JMS(Java Message Service) for ActiveMQ",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.JMSReceiver"
}

metadata

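metadata is set in the Receiver parameters. It is the metadata of the received messages and describes how each message is structured. The examples on this page specify title, epsg, fieldInfos (each entry with name, source, and nType), and featureType, plus idFieldName and dateTimeFormat where needed.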
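Each fieldInfos entry maps a source key in the incoming message to a field name and a type (nType). The complete example at the end of this page uses dotted sources such as location.x, which suggests a path into nested JSON objects. The Python sketch below illustrates that mapping under two assumptions: a dotted source is such a path, and DOUBLE and TEXT convert to float and str. It is not the product's parser, and extract_fields is a hypothetical name.

```python
def extract_fields(message, field_infos):
    """Build a flat record from a decoded JSON message using fieldInfos.

    Assumptions: a dotted "source" walks nested objects, and nType
    DOUBLE/TEXT convert to float/str; other types are passed through.
    Missing sources yield None.
    """
    converters = {"DOUBLE": float, "TEXT": str}
    record = {}
    for info in field_infos:
        value = message
        for key in info["source"].split("."):
            value = value.get(key) if isinstance(value, dict) else None
        convert = converters.get(info["nType"])
        record[info["name"]] = convert(value) if convert and value is not None else value
    return record
```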

reader

The content format of the received messages: CSV, JSON, or GeoJSON.

CSVFormatter: the content of the received messages is in CSV format. You need to specify the separator, for example:

"reader": {
    "separator": ",",
    "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
  }

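JsonFormatter: the content of the received messages is JSON. When the data is wrapped in a JSON array, the examples on this page also set isJsonArray to true and give the array field in arrayExpression. Here is a minimal example: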

"reader": {
    "className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
  }

GeoJsonFormatter: the content of the received messages is GeoJSON. Here is an example:

"reader": {
    "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
  }

Filter

Inherits from StreamNode. A Filter filters the current data, cleaning and organizing it. You need to specify the filter condition; Table 1 lists the logical operators it supports.

Table 1 Logical operators supported by the filter parameter

Operator | Name | Description
== | Is equal to | Retains objects whose attribute value is equal to the specified value. For example, [ID] == 3. Note: for double values the comparison precision is 10E-10, so use this operator with care.
!= | Is not equal to | Retains objects whose attribute value is not equal to the specified value. For example, [Name] != "A". Note: for double values the comparison precision is 10E-10, so use this operator with care.
> | Greater than | Retains objects whose attribute value is greater than the specified value. For example, [Speed] > 50.
>= | Greater than or equal to | Retains objects whose attribute value is greater than or equal to the specified value. For example, [Speed] >= 50.
< | Less than | Retains objects whose attribute value is less than the specified value. For example, [X] < 10.231.
<= | Less than or equal to | Retains objects whose attribute value is less than or equal to the specified value. For example, [Y] <= 40.
IN | In the specified list | Retains objects whose value in the specified field appears in the comma-separated list of values. For example, [Code] IN HK1,HK3,HK5.
MATCHES | Regular expression match | Retains objects whose value in the specified field matches the regular expression. For example, [Code] MATCHES "^HK[135]". Note: the regular expression must be enclosed in quotation marks.
EXISTS | Field exists | Retains objects when the specified field exists in the received event. For example, EXISTS [X].
ISNULL | Is null | Retains objects whose specified field contains a null value. For example, [X] ISNULL.
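The operator semantics in Table 1 can be sketched as a Python predicate over one record. This is a hypothetical illustration, not the Filter implementation. It assumes records are dicts, that MATCHES succeeds when the pattern matches anywhere in the value (re.search), and it reads the table's double precision "10E-10" as an absolute tolerance of 10^-10, which is an assumption.

```python
import math
import operator
import re

TOLERANCE = 1e-10  # assumption: "10E-10" in Table 1 read as 10^-10
COMPARISONS = {">": operator.gt, ">=": operator.ge,
               "<": operator.lt, "<=": operator.le}


def keep(record, field, op, value=None):
    """Return True if the record passes the condition `[field] op value`."""
    if op == "EXISTS":
        return field in record
    actual = record.get(field)
    if op == "ISNULL":
        return actual is None
    if actual is None:
        return False
    if op in ("==", "!="):
        # Doubles are compared with a tolerance instead of exact equality.
        if isinstance(actual, float) or isinstance(value, float):
            equal = math.isclose(actual, value, rel_tol=0.0, abs_tol=TOLERANCE)
        else:
            equal = actual == value
        return equal if op == "==" else not equal
    if op == "IN":
        return str(actual) in [v.strip() for v in value.split(",")]
    if op == "MATCHES":
        return re.search(value, str(actual)) is not None
    if op in COMPARISONS:
        return COMPARISONS[op](actual, value)
    raise ValueError(f"unsupported operator: {op}")
```

For example, keep({"Code": "HK3"}, "Code", "MATCHES", "^HK[135]") returns True; the quotation marks that Table 1 requires around the pattern belong to the filter syntax and are not part of the pattern passed here.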

Mapper

Inherits from StreamNode. A Mapper creates field mappings and manages fields, including field mapping, adding fields, deleting fields, field operations, and geofencing.

Add a field

"insertMapper": {
        "insertIndex": 1,
        "fieldName": "XX",
        "nType": "DOUBLE",
        "expression": "[X] * 2",
        "name": "insertMapper",
        "source": "Insert Field",
        "description": "Insert Field by X * 2",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureInsertMapper"
      }

Delete field

"deleteMapper": {
        "deleteFieldNames": [
          "F1",
          "F2"
        ],
        "name": "deleteMapper",
        "source": "delete Field",
        "description": "delete Field F1 and F2",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureDeleteMapper"
      }

Field mapping

"mapMapper": {
        "srcToDesNamePair": {
          "ID": "newID_Name",
          "Y": "newY_Name",
          "X": "newX_Name"
        },
        "srcToDesIndexPair": {
          "ID": 0,
          "Y": 2,
          "X": 1
        },
        "name": "mapMapper",
        "source": "Map Fields",
        "description": "Map Fields with new name and index",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureMapMapper"
      }
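One plausible reading of srcToDesNamePair and srcToDesIndexPair is that each source field is renamed and placed at the given output position. The Python sketch below applies that reading to a single record. How FeatureMapMapper treats fields that are not listed is not stated here, so the sketch simply drops them; that choice, and the name map_fields, are assumptions.

```python
def map_fields(record, name_pairs, index_pairs):
    """Rename fields and order them by their target index.

    record: dict of source field name -> value.
    Fields missing from name_pairs are dropped (assumption).
    """
    ordered = sorted(name_pairs, key=lambda src: index_pairs[src])
    return {name_pairs[src]: record[src] for src in ordered}
```

With the example configuration, a record with fields X, Y, and ID would come out as newID_Name, newX_Name, newY_Name, in that order.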

Field operation

"calculateMapper": {
        "fieldName": "Fcal",
        "expression": "[X] * 2",
        "name": "calculateMapper",
        "source": "calculate Field",
        "description": "calculate Field by X * 2",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureCalculateMapper"
      }
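Both insertMapper and calculateMapper take an expression in which field names appear in square brackets, such as [X] * 2. The Python sketch below evaluates such an expression against one record, supporting only the four arithmetic operators and unary minus. It is a hypothetical illustration built on the standard ast module, not the product's expression engine, whose full syntax is not described here; it also assumes field names are valid Python identifiers.

```python
import ast
import operator
import re

OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
       ast.Mult: operator.mul, ast.Div: operator.truediv}


def evaluate(expression, record):
    """Evaluate an arithmetic expression containing [Field] references."""
    # "[X] * 2" -> "X * 2" so that Python's parser can read it.
    tree = ast.parse(re.sub(r"\[(\w+)\]", r"\1", expression), mode="eval")

    def walk(node):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.BinOp) and type(node.op) in OPS:
            return OPS[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
            return -walk(node.operand)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.Name):
            return record[node.id]  # field reference
        # Anything else (calls, attributes, ...) is rejected.
        raise ValueError(f"unsupported syntax: {ast.dump(node)}")

    return walk(tree)
```

In the spirit of the two examples, record["XX"] = evaluate("[X] * 2", record) would add a new field, while assigning to an existing name such as "Fcal" would overwrite it.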

Geofence

"GeoFenceMapper": {
        "connection": {
          "type": "udb",
          "info": [
            {
              "server": "Z:\\airport.udb",
              "datasetNames": [
                "airports_40"
              ]
            }
          ]
        },
        "fenceName": "NAME",
        "fenceID": "SmID",
        "withinFieldName": "geoWithin",
        "statusFieldName": "geoStatus",
        "name": "GeoFenceMapper",
        "source": "Geomark Transformation",
        "description": "",
        "prevNodes": [
          "SocketReceiver"
        ],
        "nextNodes": [
          "GeoJsonSocketSender",
          "FenceWithinFilterOut",
          "FenceWithinFilterIn"
        ],
        "className": "com.supermap.bdt.streaming.map.GeoTaggerMapper"
      }
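The parameter names withinFieldName and statusFieldName suggest that the GeoFenceMapper records, for each feature, whether it lies within a fence polygon from the configured dataset (airports_40 in the example). The core of such a check is a point-in-polygon test. The Python sketch below shows the standard ray-casting test as an illustration only; it is not taken from the product, ignores holes and points lying exactly on an edge, and assumes the point and polygon share one coordinate system.

```python
def point_in_polygon(x, y, ring):
    """Ray casting: count the polygon edges crossed by a ray toward +x.

    ring is a list of (x, y) vertices; the closing edge back to the first
    vertex is implied. An odd number of crossings means the point is inside.
    """
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        if (y1 > y) != (y2 > y):  # edge spans the horizontal line through y
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside
```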

 

Sender

Inherits from StreamNode. A Sender is the exit of stream data processing and sends data out. The examples on this page use WebSocketClientSender, EsAppendSender, and ConsoleSender.

For example, the following configuration sends data over the commonly used WebSocket protocol:

"webSocketSender": {
        "path": "ws://127.0.0.1/data",
        "name": "webSocketSender",
        "source": "WebSocket Sender",
        "description": "Send message to WebSocket Server",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.WebSocketClientSender"
      }

The following example outputs data to Elasticsearch:

"ESSender": {
        "url": "192.168.168.33",
        "queueName": "aircondition",
        "directoryPath": "test1",
        "bitmap$0": false,
        "formatter": {
          "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
        },
        "name": "ESSender",
        "source": "ES Sender",
        "description": "",
        "prevNodes": [
          "TextFileReceiverJson"
        ],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.EsAppendSender"
      }

With the settings above, the analysis results are written to the spatiotemporal database created by iServer DataStore.

Write a stream processing model file

Following the parameter descriptions above, write a complete stream processing model file like the example below and save it with the .streaming suffix. You can then quickly publish it as a streaming service: in the configuration information option, either specify the path of the .streaming file, or paste the content of the file into the "Configuration information" option, and then finish publishing.

{
  "version": 9000,
  "sparkParameter": {
    "checkPointDir": "tmp",
    "interval": 10000
  },
  "stream": {
    "nodeDic": {
      "AQIReceiver": {
        "url": "http://www.supermapol.com/iserver/services/aqi/restjsr/aqi/pm2_5.json?bounds=-113.90625001585,-52.029966847235,113.90625001585,69.175579762077&to=910111",
        "reader": {
          "isJsonArray": true,
          "arrayExpression": "airQualityList",
          "className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
        },
        "metadata": {
          "title": "",
          "epsg": 3857,
          "fieldInfos": [
            {
              "name": "X",
              "source": "location.x",
              "nType": "DOUBLE"
            },
            {
              "name": "Y",
              "source": "location.y",
              "nType": "DOUBLE"
            },
            {
              "name": "positionName",
              "source": "positionName",
              "nType": "TEXT"
            },
            {
              "name": "aqi",
              "source": "aqi",
              "nType": "DOUBLE"
            }
          ],
          "featureType": "POINT"
        },
        "name": "AQIReceiver",
        "caption": "",
        "description": "",
        "prevNodes": [],
        "nextNodes": [
          "WebSocketClientSender"
        ],
        "className": "com.supermap.bdt.streaming.receiver.HttpReceiver"
      },
      "WebSocketClientSender": {
        "path": "ws://127.0.0.1:8800/iserver/services/dataflow/dataflow/broadcast",
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
        },
        "name": "WebSocketClientSender",
        "caption": "",
        "description": "",
        "prevNodes": [
          "AQIReceiver"
        ],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.WebSocketClientSender"
      }
    }
  }
}