Stream Processing Model


The Streaming Service uses a stream processing model as its service source. The model specifies the information required to run the service.

The stream data processing workflow includes:

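Based on this workflow, the stream processing model has four parts: Receiver, Filter, Mapper, and Sender. Each part is a node. Nodes can be connected and merged to build a real-time data processing flow, the Stream. In addition to the Stream, several auxiliary parameters define the runtime conditions of the whole service. They are stored together with the Stream in the startup parameter type, Startup. The processing model is shown in the figure below: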

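Configuration Parameters

The stream processing model is defined in JSON. You can write a stream processing model file by following the parameters and JSON examples described below, then publish it as a streaming service. You can also build the model with the stream processing model editor, in which case you only need to consult the parameter descriptions.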

SparkParameter

Sets the runtime parameters of Spark Streaming. The examples in this document use checkPointDir and interval.

Stream

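Stream contains the parameters of the running real-time data processing flow. In the examples in this document, the nodes are stored under nodeDic, keyed by node name.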

Receiver

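Inherits from StreamNode. As the entry point of stream processing, a Receiver accepts data from various sources, including Socket, WebSocket, HTTP, and file systems. A Receiver must define the metadata of the messages it receives. A Receiver node consists of three parts: its own descriptive information, such as name and source; the message metadata (metadata); and the message read format (reader).

The streaming service supports the following receivers:

SocketReceiver: inherits from Receiver; a node that receives Socket messages. It requires the following parameters:

Example: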

{
   "ipAddress" : "127.0.0.1",
   "port" : 9527,
   "name" : "socketReceiver",
   "source" : "Socket Receiver",
   "description" : "Receive some message from socketServer",
   "prevNodes" : [],
   "nextNodes" : [],
   "className": "com.supermap.bdt.streaming.receiver.SocketReceiver"
}
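
To try the SocketReceiver configuration above, something has to be listening on the configured address. The following Python sketch is a minimal test feed. It assumes the receiver connects as a TCP client and reads newline-delimited text messages; that framing, the serve_lines helper, and the sample rows are assumptions for illustration and are not part of the product.

```python
import socket
import threading


def serve_lines(lines, host="127.0.0.1", port=9527):
    """Start a one-shot TCP server that sends each line to the first client.

    Returns (thread, bound_port). Pass port=0 to let the OS pick a free port.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)  # connections queue here until the worker accepts them
    bound_port = server.getsockname()[1]

    def run():
        conn, _ = server.accept()
        with conn:
            for line in lines:
                # Newline-delimited framing is an assumption of this sketch.
                conn.sendall((line + "\n").encode("utf-8"))
        server.close()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread, bound_port
```

For example, calling serve_lines(["1,116.39,39.91", "2,116.40,39.92"]) and then joining the returned thread serves two hypothetical CSV rows on 127.0.0.1:9527 to the first client that connects.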

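MultiSocketReceiver: inherits from Receiver; a node that receives messages from several Sockets at the same time. The messages from all Sockets must have the same content format. It requires the following parameters:

Example: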

{
  "servers": [
    "192.168.1.1:9527",
    "192.168.1.1:9528",
    "192.168.1.2:9527"
  ],
  "name": "multiSocketReceiver",
  "source": "MultiSource Socket Receiver",
  "description": "Receive message from multi socket server",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.MultiSocketReceiver"
}

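SocketServerReceiver: inherits from Receiver; a Socket server-side receiving node that acts as a server and receives messages sent by Socket clients. It requires the following parameters: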

{
  "port": 9527,
  "name": "socketServerReceiver",
  "source": "SocketServer Receiver",
  "description": "Receive message from socket client",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.SocketServerReceiver"
}

WebSocketReceiver: inherits from Receiver; a node that receives WebSocket messages. It requires the following parameters:

{
  "url": "ws://192.168.1.1:9527/websocket",
  "name": "webSocketReceiver",
  "source": "WebSocket Receiver",
  "description": "Receive message from websocket server",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.WebSocketReceiver"
}

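TextFileReceiver: inherits from Receiver; monitors the specified directory and reads the content of newly added files. It requires the following parameters:

{
  "directoryPath": "hdfs:///data/",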
  "name": "textFileReceiver",
  "source": "Text File Receiver",
  "description": "Listen new file in folder",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.TextFileReceiver"
}

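SingleTextFileReceiver: a single text file receiver that inherits from Receiver. It reads the content of the monitored file according to its settings and supports JSON, GeoJSON, and CSV files. It requires the following parameters: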

{
  "version": 9000,
  "sparkParameter": {
    "checkPointDir": "tmp",
    "interval": 5000
  },
  "stream": {
    "nodeDic": {
      "TextFileReceiver": {
        "filePath": "G:\\QQRev\\test.json",
        "readInterval": 1000,
        "rowsOneTime": 100,
        "reader": {
          "isJsonArray": false,
          "arrayExpression": "",
          "className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
        },
        "metadata": {
          "title": "",
          "epsg": 3857,
          "fieldInfos": [
            {
              "name": "X",
              "source": "lon",
              "nType": "DOUBLE"
            },
            {
              "name": "Y",
              "source": "lat",
              "nType": "DOUBLE"
            },
            {
              "name": "mbbh",
              "source": "mbbh",
              "nType": "TEXT"
            }
          ],
          "featureType": "POINT",
          "idFieldName": "mbbh",
          "dateTimeFormat": "yyyy-MM-dd HH:mm:ss"
        },
        "name": "TextFileReceiver",
        "caption": "",
        "description": "",
        "prevNodes": [],
        "nextNodes": [
          "ConsoleSender"
        ],
        "className": "com.supermap.bdt.streaming.receiver.SingleTextFileReceiver"
      },
      "ConsoleSender": {
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
        },
        "name": "ConsoleSender",
        "caption": "",
        "description": "",
        "prevNodes": [
          "TextFileReceiver"
        ],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.ConsoleSender"
      }
    }
  }
}
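
In the model above, nodes are linked by name through prevNodes and nextNodes. The Python sketch below checks that every referenced node exists in nodeDic and that each link is declared at both ends. Whether the service itself enforces this symmetry is not stated here, so treat check_links as a hypothetical pre-publish sanity check. It also assumes that links refer to nodeDic keys, which match the node names in the examples.

```python
def check_links(model):
    """Return a list of problems found in the node links of a model dict."""
    nodes = model["stream"]["nodeDic"]
    problems = []
    for key, node in nodes.items():
        for nxt in node.get("nextNodes", []):
            if nxt not in nodes:
                problems.append(f"{key}: nextNodes references unknown node {nxt!r}")
            elif key not in nodes[nxt].get("prevNodes", []):
                problems.append(f"{nxt}: prevNodes is missing {key!r}")
        for prev in node.get("prevNodes", []):
            if prev not in nodes:
                problems.append(f"{key}: prevNodes references unknown node {prev!r}")
            elif key not in nodes[prev].get("nextNodes", []):
                problems.append(f"{prev}: nextNodes is missing {key!r}")
    return problems


# Trimmed copy of the model above: only the link-related keys are kept.
model = {
    "stream": {
        "nodeDic": {
            "TextFileReceiver": {"prevNodes": [], "nextNodes": ["ConsoleSender"]},
            "ConsoleSender": {"prevNodes": ["TextFileReceiver"], "nextNodes": []},
        }
    }
}
print(check_links(model))  # prints []
```

An empty list means the links are consistent; each string in a non-empty list names the node and the missing or unknown reference.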

KafkaReceiver: inherits from Receiver; a node that receives Kafka messages. It requires the following parameters:

{
  "servers": "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092,192.168.1.4:9092",
  "topics": [
    "topic1",
    "topic2"
  ],
  "groupid": "groupId",
  "offset": "latest",
  "name": "kafkaReceiver",
  "source": "Kafka Receiver",
  "description": "Receive message from Kafka",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.KafkaReceiver"
}

HttpReceiver: inherits from Receiver; a node that receives HTTP messages. Currently only the HTTP GET method is supported.

{
  "url": "https://api.wheretheiss.at/v1/satellites/25544",
  "name": "httpReceiver",
  "source": "HTTP Receiver",
  "description": "Get message from web",
  "prevNodes": [],
  "nextNodes": [],
  "className": "com.supermap.bdt.streaming.receiver.HttpReceiver"
}

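JMSReceiver: inherits from Receiver; a node that receives messages over the JMS standard protocol. It is used to receive messages from message middleware such as ActiveMQ and RabbitMQ.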

{
  "url": "192.168.1.1",
  "port": 9527,
  "queueName": "data",
  "jndiName": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
  "userName": "user",
  "password": "password",
  "name": "jmsReceiver",
  "source": "JMS Receiver",
  "description": "Receive message from JMS(Java Message Service) for ActiveMQ",
  "prevNodes": [],
  "nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.JMSReceiver"
}

metadata

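metadata is written inside the Receiver parameters. It is the metadata of the received messages and describes how a message is structured. The examples in this document set title, epsg, fieldInfos (each entry with name, source, and nType), and featureType, and in one case also idFieldName and dateTimeFormat.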

reader

The content format of the received messages: CSV (CSVFormatter), JSON (JsonFormatter), or GeoJSON (GeoJsonFormatter).

CSVFormatter: the received message content is in CSV format. Specify:

"reader": {
    "separator": ",",
    "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
  }

JsonFormatter: the received message content is in JSON format. Example:

"reader": {
    "className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
  }

GeoJsonFormatter: the received message content is in GeoJSON format. Example:

"reader": {
    "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
  }
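
To make the relationship between reader and metadata more concrete, the Python sketch below turns one JSON message into flat records. It is only an interpretation of the examples in this document: it assumes that arrayExpression names the key holding the array when isJsonArray is true, and that a dotted source such as location.x walks nested objects. The read_json_message helper and the sample payload are hypothetical.

```python
import json


def read_json_message(text, reader, field_infos):
    """Turn one JSON message into a list of records keyed by field name."""
    doc = json.loads(text)
    if reader.get("isJsonArray"):
        expr = reader.get("arrayExpression", "")
        # Assumption: arrayExpression is the key of the array inside the message.
        items = doc[expr] if expr else doc
    else:
        items = [doc]
    records = []
    for item in items:
        record = {}
        for info in field_infos:
            value = item
            # Assumption: a dotted source walks nested JSON objects.
            for part in info["source"].split("."):
                value = value[part]
            if info["nType"] == "DOUBLE":
                record[info["name"]] = float(value)
            else:
                record[info["name"]] = str(value)
        records.append(record)
    return records


reader = {"isJsonArray": True, "arrayExpression": "airQualityList"}
field_infos = [
    {"name": "X", "source": "location.x", "nType": "DOUBLE"},
    {"name": "aqi", "source": "aqi", "nType": "DOUBLE"},
    {"name": "positionName", "source": "positionName", "nType": "TEXT"},
]
# Hypothetical payload shaped like the metadata used in this document.
message = '{"airQualityList": [{"location": {"x": 116.4, "y": 39.9}, "aqi": 57, "positionName": "A"}]}'
records = read_json_message(message, reader, field_infos)
```

Each record maps the name of a fieldInfos entry to the value found at its source path, converted according to nType (only DOUBLE and TEXT are handled in this sketch).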

Filter

Inherits from StreamNode. A Filter filters the current data in order to clean and organize it. Specify:

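Table 1. Logical operators supported by the filter parameter

Operator   Description
==         Equal to. Keeps objects whose attribute value equals the specified value. Example: [ID] == 3. Note: use with caution on double values; the comparison precision is 10E-10.
!=         Not equal to. Keeps objects whose attribute value does not equal the specified value. Example: [Name] != "A". Note: use with caution on double values; the comparison precision is 10E-10.
>          Greater than. Keeps objects whose attribute value is greater than the specified value. Example: [Speed] > 50
>=         Greater than or equal to. Keeps objects whose attribute value is greater than or equal to the specified value. Example: [Speed] >= 50
<          Less than. Keeps objects whose attribute value is less than the specified value. Example: [X] < 10.231
<=         Less than or equal to. Keeps objects whose attribute value is less than or equal to the specified value. Example: [Y] <= 40
IN         In the specified list. Keeps objects whose value for the specified field appears in a comma-separated list of values. Example: [Code] IN HK1,HK3,HK5
MATCHES    Regular expression match. Keeps objects whose value for the specified field matches the regular expression. Example: [Code] MATCHES "^HK[135]". Note: the regular expression must be enclosed in double quotes.
EXISTS     Field exists. Keeps objects when the specified field exists in the schema of the received event. Example: EXISTS [X]
ISNULL     Is null. Keeps objects when the specified field contains a null value. Example: [X] ISNULL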
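
The operator semantics in Table 1 can be illustrated with a short Python sketch. This is not the product's expression parser: the keep helper is hypothetical and takes an already split field, operator, and value. It further assumes that MATCHES accepts a partial match (re.search), that Python regular expression syntax is close enough to the product's, and that a null value fails every comparison except !=.

```python
import math
import operator
import re

TOLERANCE = 10e-10  # comparison precision quoted in Table 1

ORDERING = {">": operator.gt, ">=": operator.ge, "<": operator.lt, "<=": operator.le}


def keep(record, field, op, value=None):
    """Return True if the record passes a single-condition filter."""
    if op == "EXISTS":
        return field in record
    actual = record.get(field)
    if op == "ISNULL":
        return actual is None
    if actual is None:
        # Assumption: a missing or null value passes only the != comparison.
        return op == "!="
    if op in ("==", "!="):
        if isinstance(actual, float) or isinstance(value, float):
            # Doubles are compared with an absolute tolerance, not exactly.
            equal = math.isclose(actual, value, rel_tol=0.0, abs_tol=TOLERANCE)
        else:
            equal = actual == value
        return equal if op == "==" else not equal
    if op in ORDERING:
        return ORDERING[op](actual, value)
    if op == "IN":
        return str(actual) in [item.strip() for item in value.split(",")]
    if op == "MATCHES":
        return re.search(value, str(actual)) is not None
    raise ValueError(f"unsupported operator: {op}")


record = {"ID": 3, "Speed": 62.5, "Code": "HK3", "X": None}
print(keep(record, "ID", "==", 3))                  # True
print(keep(record, "Speed", ">", 50))               # True
print(keep(record, "Code", "IN", "HK1,HK3,HK5"))    # True
print(keep(record, "Code", "MATCHES", "^HK[135]"))  # True
print(keep(record, "X", "ISNULL"))                  # True
print(keep(record, "Y", "EXISTS"))                  # False
```

The tolerance-based branch shows why Table 1 advises caution with double values: two values closer together than the comparison precision are treated as equal.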

Mapper

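Inherits from StreamNode. A Mapper builds field mappings and manages fields. Its main operations are field mapping, adding fields, deleting fields, field calculation, and geofencing.

Add field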

"insertMapper": {
        "insertIndex": 1,
        "fieldName": "XX",
        "nType": "DOUBLE",
        "expression": "[X] * 2",
        "name": "insertMapper",
        "source": "Insert Field",
        "description": "Insert Field by X * 2",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureInsertMapper"
      }

Delete field

"deleteMapper": {
        "deleteFieldNames": [
          "F1",
          "F2"
        ],
        "name": "deleteMapper",
        "source": "delete Field",
        "description": "delete Field F1 and F2",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureDeleteMapper"
      }

Field mapping

"mapMaper": {
        "srcToDesNamePair": {
          "ID": "newID_Name",
          "Y": "newY_Name",
          "X": "newX_Name"
        },
        "srcToDesIndexPair": {
          "ID": 0,
          "Y": 2,
          "X": 1
        },
        "name": "mapMaper",
        "source": "Map Fields",
        "description": "Map Fields with new name and index",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureMapMapper"
      }

Field calculation

"calculateMapper": {
        "fieldName": "Fcal",
        "expression": "[X] * 2",
        "name": "calculateMapper",
        "source": "calculate Field",
        "description": "calculate Field by X * 2",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.map.FeatureCalculateMapper"
      }
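
The effect of the four field mappers configured above can be sketched in Python on a single record. This is an illustration under stated assumptions, not the product's implementation: the helper names are hypothetical, expressions such as [X] * 2 are replaced by plain Python callables instead of being parsed, calculate_field is assumed to create or overwrite the named field, and map_fields is assumed to drop fields that are not listed in the pairs.

```python
def insert_field(record, index, name, compute):
    """Insert a computed field at the given position (cf. insertIndex)."""
    items = list(record.items())
    items.insert(index, (name, compute(record)))
    return dict(items)


def delete_fields(record, names):
    """Remove the listed fields (cf. deleteFieldNames)."""
    return {key: value for key, value in record.items() if key not in names}


def map_fields(record, name_pair, index_pair):
    """Rename and reorder fields (cf. srcToDesNamePair, srcToDesIndexPair)."""
    ordered = sorted(name_pair, key=lambda src: index_pair[src])
    return {name_pair[src]: record[src] for src in ordered}


def calculate_field(record, name, compute):
    """Store a computed value under the given field name (cf. fieldName)."""
    result = dict(record)
    result[name] = compute(record)
    return result


record = {"ID": 7, "X": 1.5, "Y": 2.0, "F1": "a", "F2": "b"}
step1 = insert_field(record, 1, "XX", lambda r: r["X"] * 2)
step2 = delete_fields(step1, ["F1", "F2"])
step3 = calculate_field(step2, "Fcal", lambda r: r["X"] * 2)
step4 = map_fields(
    step3,
    {"ID": "newID_Name", "Y": "newY_Name", "X": "newX_Name"},
    {"ID": 0, "Y": 2, "X": 1},
)
print(step2)  # {'ID': 7, 'XX': 3.0, 'X': 1.5, 'Y': 2.0}
print(step4)  # {'newID_Name': 7, 'newX_Name': 1.5, 'newY_Name': 2.0}
```

Each helper returns a new record, which mirrors how mapper nodes are chained through prevNodes and nextNodes: the output of one node becomes the input of the next.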

Geofence

"GeoFenceMapper": {
        "connection": {
          "type": "udb",
          "info": [
            {
              "server": "Z:\\airport.udb",
              "datasetNames": [
                "airports_40"
              ]
            }
          ]
        },
        "fenceName": "NAME",
        "fenceID": "SmID",
        "withinFieldName": "geoWithin",
        "statusFieldName": "geoStatus",
        "name": "GeoFenceMapper",
        "source": "地理标记转换",
        "description": "",
        "prevNodes": [
          "SocketReceiver"
        ],
        "nextNodes": [
          "GeoJsonSocketSender",
          "FenceWithinFilterOut",
          "FenceWithinFilterIn"
        ],
        "className": "com.supermap.bdt.streaming.map.GeoTaggerMapper"
      }

 

Sender

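Inherits from StreamNode. As the exit point of stream processing, a Sender sends data to external targets. It includes:

Taking the commonly used WebSocket protocol as an example, the send configuration is as follows: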

"webSocketSender": {
        "path": "ws://127.0.0.1/data",
        "name": "webSocketSender",
        "source": "WebSocket Sender",
        "description": "Send message to WebSocket Server",
        "prevNodes": [],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.WebSocketClientSender"
      }

Taking output to Elasticsearch as an example:

      "ESSender": {
        "url": "192.168.168.33",
        "queueName": "aircondition",
        "directoryPath": "test1",
        "bitmap$0": false,
        "formatter": {
          "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
        },
        "name": "ESSender",
        "source": "ES发送",
        "description": "",
        "prevNodes": [
          "TextFileReceiverJson"
        ],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.EsAppendSender"
      }

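With settings like those in the example above, the results of data analysis and processing can be written to a spatiotemporal database created by iServer DataStore.

Writing a Stream Processing Model File

Based on the parameter descriptions above, write a complete stream processing model file. You can save it with the .streaming extension to publish a streaming service quickly, or paste the file content directly into "Configuration Information" when publishing. Example: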

{
  "version": 9000,
  "sparkParameter": {
    "checkPointDir": "tmp",
    "interval": 10000
  },
  "stream": {
    "nodeDic": {
      "AQIReceiver": {
        "url": "http://www.supermapol.com/iserver/services/aqi/restjsr/aqi/pm2_5.json?bounds=-113.90625001585,-52.029966847235,113.90625001585,69.175579762077&to=910111",
        "reader": {
          "isJsonArray": true,
          "arrayExpression": "airQualityList",
          "className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
        },
        "metadata": {
          "title": "",
          "epsg": 3857,
          "fieldInfos": [
            {
              "name": "X",
              "source": "location.x",
              "nType": "DOUBLE"
            },
            {
              "name": "Y",
              "source": "location.y",
              "nType": "DOUBLE"
            },
            {
              "name": "positionName",
              "source": "positionName",
              "nType": "TEXT"
            },
            {
              "name": "aqi",
              "source": "aqi",
              "nType": "DOUBLE"
            }
          ],
          "featureType": "POINT"
        },
        "name": "AQIReceiver",
        "caption": "",
        "description": "",
        "prevNodes": [],
        "nextNodes": [
          "WebSocketClientSender"
        ],
        "className": "com.supermap.bdt.streaming.receiver.HttpReceiver"
      },
      "WebSocketClientSender": {
        "path": "ws://127.0.0.1:8800/iserver/services/dataflow/dataflow/broadcast",
        "formatter": {
          "separator": ",",
          "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
        },
        "name": "WebSocketClientSender",
        "caption": "",
        "description": "",
        "prevNodes": [
          "AQIReceiver"
        ],
        "nextNodes": [],
        "className": "com.supermap.bdt.streaming.sender.WebSocketClientSender"
      }
    }
  }
}