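Streaming Processing Model
A Streaming Service uses a streaming processing model as its service source. The model specifies the information needed to run the service.
The stream data processing flow consists of receiving data, filtering it, transforming it, and sending it out.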

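Following this flow, the streaming processing model has four parts: Receiver, Filter, Mapper (converter), and Sender. Each part is a node, and nodes can be connected and merged to build the real-time processing flow, Stream. Besides Stream, some auxiliary parameters define the runtime conditions of the whole service; they are stored together in the startup parameter type, Startup. The processing model is shown in the figure below: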

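The streaming processing model is defined in JSON. You can write a model file by referring to the parameters and JSON examples below and publish it as a streaming service. Alternatively, you can build the model with the streaming model editor, referring to the parameter descriptions as needed.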
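sparkParameter: sets the runtime parameters of Spark Streaming. The examples in this document set:
checkPointDir: the checkpoint directory, e.g. tmp.
interval: the batch interval, e.g. 5000 or 10000.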
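stream: contains the parameters of the real-time processing flow. Its nodeDic holds every node keyed by node name, and each node lists its upstream and downstream nodes in prevNodes and nextNodes.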
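Because every link is recorded twice (in the upstream node's nextNodes and in the downstream node's prevNodes), it can help to generate a model from code. The Python sketch below assembles a skeleton model with the version, sparkParameter, and stream.nodeDic layout of the examples in this document. The helpers make_node, link, and make_model are hypothetical, and a real Receiver would also need reader and metadata.

```python
import json


def make_node(name, class_name, **params):
    """Create a node with the common StreamNode fields used in this document's examples."""
    node = {"name": name, "description": "", "prevNodes": [], "nextNodes": [],
            "className": class_name}
    node.update(params)  # node-specific parameters such as ipAddress or formatter
    return node


def link(node_dic, src, dst):
    """Connect src -> dst, recording the link on both nodes."""
    if dst not in node_dic[src]["nextNodes"]:
        node_dic[src]["nextNodes"].append(dst)
    if src not in node_dic[dst]["prevNodes"]:
        node_dic[dst]["prevNodes"].append(src)


def make_model(node_dic, check_point_dir="tmp", interval=5000):
    """Wrap the nodes in the top-level layout used by the complete examples."""
    return {
        "version": 9000,
        "sparkParameter": {"checkPointDir": check_point_dir, "interval": interval},
        "stream": {"nodeDic": node_dic},
    }


nodes = {
    "socketReceiver": make_node(
        "socketReceiver", "com.supermap.bdt.streaming.receiver.SocketReceiver",
        ipAddress="127.0.0.1", port=9527),  # reader and metadata omitted in this skeleton
    "ConsoleSender": make_node(
        "ConsoleSender", "com.supermap.bdt.streaming.sender.ConsoleSender",
        formatter={"separator": ",",
                   "className": "com.supermap.bdt.streaming.formatter.CSVFormatter"}),
}
link(nodes, "socketReceiver", "ConsoleSender")
model = make_model(nodes)
print(json.dumps(model, indent=2))
```

Calling link twice for the same pair is harmless, so links can be declared without tracking which ones already exist.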
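Receiver: inherits from StreamNode and is the entry point of stream processing, receiving data from various sources, including Socket, WebSocket, HTTP, and file systems. A Receiver must define the metadata of the messages it receives. A Receiver node consists of three parts: its own descriptive information, such as name and source; the message metadata (metadata); and the message reading format (reader).
Streaming services support the following receivers:
SocketReceiver: inherits from Receiver; a node that receives Socket messages. Required parameters:
ipAddress: String. IP address of the Socket service to receive from.
port: int. Port number of the Socket service to receive from.
Example: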
{
"ipAddress" : "127.0.0.1",
"port" : 9527,
"name" : "socketReceiver",
"source" : "Socket Receiver",
"description" : "Receive some message from socketServer",
"prevNodes" : [],
"nextNodes" : [],
"className": "com.supermap.bdt.streaming.receiver.SocketReceiver "
}
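To try SocketReceiver locally you need a Socket service for it to connect to. The Python sketch below is a hypothetical test feed: it listens on a port, accepts one client, writes each record as one line, and closes. It assumes the receiver reads newline-delimited UTF-8 text; serve_lines_once is an illustrative name, not part of the product.

```python
import socket
import threading


def serve_lines_once(lines, host="127.0.0.1", port=0):
    """Hypothetical test feed: accept one client, push newline-delimited lines, close.

    Newline framing is an assumption about SocketReceiver. port=0 lets the OS pick a
    free port; pass 9527 to match the example configuration.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    bound_port = server.getsockname()[1]

    def run():
        with server:  # close the listening socket once the client is served
            conn, _ = server.accept()
            with conn:
                for line in lines:
                    conn.sendall((line + "\n").encode("utf-8"))

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    return bound_port, worker
```

For example, `serve_lines_once(["116.39,39.90,A01"], port=9527)` would serve one CSV record to the first client that connects to 127.0.0.1:9527.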
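MultiSocketReceiver: inherits from Receiver; a node that receives Socket messages from several servers at once. The messages from all servers must have the same content format. Required parameters:
servers: Array[String]. Addresses of the servers to receive from, one per array element, with host and port separated by a colon.
Example: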
{
"servers": [
"192.168.1.1:9527",
"192.168.1.1:9528",
"192.168.1.2:9527"
],
"name": "multiSocketReceiver",
"source": "MultiSource Socket Receiver",
"description": "Receive message from multi socket server",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.MultiSocketReceiver"
}
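Checking the servers array before publishing can catch an entry without a port. The Python sketch below splits each entry at its last colon; parse_servers is a hypothetical helper, and it only checks the host:port shape described above, not whether the servers are reachable.

```python
def parse_servers(servers):
    """Split "host:port" entries from the servers array into (host, port) tuples."""
    parsed = []
    for entry in servers:
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        port_num = int(port)
        if not 0 < port_num < 65536:
            raise ValueError(f"port out of range in {entry!r}")
        parsed.append((host, port_num))
    return parsed
```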
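SocketServerReceiver: inherits from Receiver; a Socket server node that listens for and receives messages sent by other Socket clients. Required parameters:
port: int. Listening port of the Socket server that the node starts.
Example: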
{
"port": 9527,
"name": "socketServerReceiver",
"source": "SocketServer Receiver",
"description": "Receive message from socket client",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.SocketServerReceiver"
}
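Unlike SocketReceiver, SocketServerReceiver is itself the server, so a data source connects to it as a client. The Python sketch below sends records to the configured port (9527 in the example). send_lines is a hypothetical helper, and newline-delimited UTF-8 framing is an assumption about what the receiver's reader expects.

```python
import socket


def send_lines(lines, host="127.0.0.1", port=9527, timeout=5.0):
    """Connect to a SocketServerReceiver and send newline-delimited UTF-8 lines.

    Returns the number of bytes sent. The framing is an assumption; check what
    the receiver's reader expects.
    """
    payload = "".join(line + "\n" for line in lines).encode("utf-8")
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(payload)
    return len(payload)
```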
WebSocketReceiver: inherits from Receiver; a node that receives WebSocket messages. Required parameters:
url: String. WebSocket service address.
Example:
{
"url": "ws://192.168.1.1:9527/websocket ",
"name": "webSocketReceiver",
"source": "WebSocket Receiver",
"description": "Receive message from websocket server",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.WebSocketReceiver"
}
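TextFileReceiver: inherits from Receiver; monitors the specified directory and reads the content of newly added files. Required parameters:
directoryPath: String. The directory to monitor, such as the HDFS directory hdfs:///data/, the Linux directory /user/share/data, or the Windows directory C:/data.
Example: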
{
"directoryPath": "'hdfs:///data/'",
"name": "textFileReceiver",
"source": "Text File Receiver",
"description": "Listen new file in folder",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.TextFileReceiver"
}
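If TextFileReceiver is built on Spark Streaming's file stream (an assumption; this document does not say so), new files should appear in the monitored directory atomically, and Spark's file stream ignores names that start with a dot. The Python sketch below writes to a hidden temporary file in the same local directory and then renames it into place. drop_file is a hypothetical helper; an HDFS directory would need the HDFS client's own rename instead.

```python
import os
import tempfile


def drop_file(directory, filename, text):
    """Write text to a hidden temp file in `directory`, then rename it into place.

    Assumes the receiver skips dot-prefixed files and picks up a file once it
    appears under its final name, as Spark's textFileStream does.
    """
    fd, tmp_path = tempfile.mkstemp(prefix=".", suffix=".tmp", dir=directory)
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(text)
        final_path = os.path.join(directory, filename)
        os.replace(tmp_path, final_path)  # atomic within one local filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return final_path
```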
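SingleTextFileReceiver: single text file receiver; inherits from Receiver and reads the content of the monitored file according to its settings. It supports JSON, GeoJSON, and CSV files. Required parameters:
filePath: String. Path of the file to read.
readInterva: the interval between reads.
rowsOneTime: the number of rows read each time.
The following complete model reads a JSON file with SingleTextFileReceiver and prints the records through ConsoleSender: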
{
"version": 9000,
"sparkParameter": {
"checkPointDir": "tmp",
"interval": 5000
},
"stream": {
"nodeDic": {
"TextFileReceiver": {
"filePath": "G:\\QQRev\\test.json",
"readInterva": 1000,
"rowsOneTime": 100,
"reader": {
"isJsonArray": false,
"arrayExpression": "",
"className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
},
"metadata": {
"title": "",
"epsg": 3857,
"fieldInfos": [
{
"name": "X",
"source": "lon",
"nType": "DOUBLE"
},
{
"name": "Y",
"source": "lat",
"nType": "DOUBLE"
},
{
"name": "mbbh",
"source": "mbbh",
"nType": "TEXT"
}
],
"featureType": "POINT",
"idFieldName": "mbbh",
"dateTimeFormat": "yyyy-MM-dd HH:mm:ss"
},
"name": "TextFileReceiver",
"caption": "",
"description": "",
"prevNodes": [],
"nextNodes": [
"ConsoleSender"
],
"className": "com.supermap.bdt.streaming.receiver.SingleTextFileReceiver"
},
"ConsoleSender": {
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
},
"name": "ConsoleSender",
"caption": "",
"description": "",
"prevNodes": [
"TextFileReceiver"
],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.ConsoleSender"
}
}
}
}
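KafkaReceiver: inherits from Receiver; a node that receives Kafka messages. Parameters, as used in the example:
servers: String. Comma-separated list of Kafka broker addresses (host:port).
topics: Array[String]. Topics to subscribe to.
groupid: String. Consumer group ID.
offset: String. Offset from which to start consuming, such as latest.
Example:
{
"servers": "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092,192.168.1.4:9092",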
"topics": [
"topic1",
"topic2"
],
"groupid": "groupId",
"offset": "latest",
"name": "kafkaReceiver",
"source": "Kafka Receiver",
"description": "Receive message from Kafka",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.KafkaReceiver"
}
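HttpReceiver: inherits from Receiver; a node that receives HTTP messages. Only the HTTP GET method is currently supported. Required parameters:
url: String. HTTP service address.
Example: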
{
"url": "https://api.wheretheiss.at/v1/satellites/25544",
"name": "httpReceiver",
"source": "HTTP Receiver",
"description": "Get message from web",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.HttpReceiver"
}
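Before writing metadata for an HttpReceiver, it helps to look at what the URL actually returns. The Python sketch below issues a single GET request, the only method HttpReceiver supports, and parses the body as JSON. fetch_json is a hypothetical helper, and it assumes the service returns JSON.

```python
import json
import urllib.request


def fetch_json(url, timeout=10.0):
    """Issue one HTTP GET and parse the response body as JSON."""
    request = urllib.request.Request(url, method="GET",
                                     headers={"Accept": "application/json"})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        charset = response.headers.get_content_charset() or "utf-8"
        return json.loads(response.read().decode(charset))
```

For example, `print(fetch_json("https://api.wheretheiss.at/v1/satellites/25544"))` shows the field names that fieldInfos would then refer to.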
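JMSReceiver: inherits from Receiver; a node that receives messages over the JMS (Java Message Service) standard, used with message middleware such as ActiveMQ and RabbitMQ. Required parameters:
url: String. Address of the JMS message service.
port: int. Port of the message service.
queueName: String. Name of the message queue.
jdniName: String. JNDI name of the message middleware (for ActiveMQ, the initial context factory class shown in the example); look it up in the middleware's official documentation.
userName: String. User name.
password: String. Password.
Example: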
{
"url": "192.168.1.1",
"port": 9527,
"queueName": "data",
"jdniName": "org.apache.activemq.jndi.ActiveMQInitialContextFactory",
"userName": "user",
"password": "password",
"name": "jmsReceiver",
"source": "JMS Receiver",
"description": "Receive message from JMS(Java Message Service) for ActiveMQ",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.receiver.JMSReceiver"
}
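metadata: written in the Receiver parameters; the metadata of the received messages, describing their format. The examples in this document set the following fields:
title: title of the metadata (empty in the examples).
epsg: EPSG code of the coordinate system, e.g. 3857.
fieldInfos: the field definitions. Each entry has name (the field name in the stream), source (where the value comes from in the message, e.g. lon or location.x), and nType (the field type, e.g. DOUBLE or TEXT).
featureType: geometry type of the features, e.g. POINT.
idFieldName: name of the ID field, e.g. mbbh.
dateTimeFormat: date-time format, e.g. yyyy-MM-dd HH:mm:ss.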
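The following Python sketch shows one way to read fieldInfos: each source is treated as a dotted path into the JSON message (as location.x in the complete example suggests), and each value is converted according to nType. The path syntax and the type mapping are assumptions, and only DOUBLE and TEXT, the two types used in this document, are handled.

```python
# nType -> Python converter; only the two types used in this document's examples.
_CONVERTERS = {"DOUBLE": float, "TEXT": str}


def lookup(message, path):
    """Follow a dotted source path such as "location.x" into a nested dict."""
    value = message
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None  # missing fields become None
        value = value[key]
    return value


def apply_field_infos(message, field_infos):
    """Build a flat record {name: converted value} as described by fieldInfos."""
    record = {}
    for info in field_infos:
        raw = lookup(message, info["source"])
        convert = _CONVERTERS[info["nType"]]
        record[info["name"]] = None if raw is None else convert(raw)
    return record
```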
reader: the content format of the received messages: CSV (CSVFormatter), JSON (JsonFormatter), or GeoJSON (GeoJsonFormatter).
CSVFormatter: the received messages are in CSV format. Parameters:
separator: the delimiter; defaults to a comma.
"reader": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.CSVFormatter"
}
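JsonFormatter: the received messages are in JSON format. When the records arrive inside a JSON array, the complete examples also set isJsonArray to true and arrayExpression to the array's location, such as airQualityList. Example: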
"reader": {
"className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
}
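The Python sketch below illustrates one reading of isJsonArray and arrayExpression: when isJsonArray is true, arrayExpression names the array inside the response (airQualityList in the complete example), and each element becomes one message. This is an assumed interpretation, including the dotted-path support, not the JsonFormatter implementation; split_messages is a hypothetical helper.

```python
import json


def split_messages(body, is_json_array=False, array_expression=""):
    """Turn one received JSON body into a list of messages.

    Assumed semantics: with isJsonArray true, arrayExpression is a (possibly
    dotted) path to the array; an empty expression means the body is the array.
    """
    data = json.loads(body)
    if not is_json_array:
        return [data]
    if array_expression:
        for key in array_expression.split("."):
            data = data[key]
    if not isinstance(data, list):
        raise ValueError("arrayExpression did not resolve to a JSON array")
    return data
```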
GeoJsonFormatter: the received messages are in GeoJSON format. Example:
"reader": {
"className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
}
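Filter: inherits from StreamNode; filters the current data to clean and organize it. The filter condition is an expression built from the logical operators listed in Table 1.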
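Table 1 Logical operators supported by the filter parameter
| Operator | Description |
| --- | --- |
| == | Equal to. Keeps objects whose attribute value equals the specified value, e.g. [ID] == 3. Note: use with caution on double fields; the comparison precision is 10E-10. |
| != | Not equal to. Keeps objects whose attribute value does not equal the specified value, e.g. [Name] != "A". Note: use with caution on double fields; the comparison precision is 10E-10. |
| > | Greater than. Keeps objects whose attribute value is greater than the specified value, e.g. [Speed] > 50. |
| >= | Greater than or equal to. Keeps objects whose attribute value is greater than or equal to the specified value, e.g. [Speed] >= 50. |
| < | Less than. Keeps objects whose attribute value is less than the specified value, e.g. [X] < 10.231. |
| <= | Less than or equal to. Keeps objects whose attribute value is less than or equal to the specified value, e.g. [Y] <= 40. |
| IN | In a list. Keeps objects whose value of the specified field appears in the comma-separated list of values, e.g. [Code] IN HK1,HK3,HK5. |
| MATCHES | Regular expression match. Keeps objects whose value of the specified field matches the regular expression, e.g. [Code] MATCHES "^HK[135]". Note: the regular expression must be enclosed in double quotes (""). |
| EXISTS | Field exists. Keeps objects when the specified field exists in the schema of the received events, e.g. EXISTS [X]. |
| ISNULL | Is null. Keeps objects whose specified field contains a null value, e.g. [X] ISNULL. |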
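To make Table 1 concrete, the Python sketch below evaluates one condition against a record stored as a dict. It is an illustrative re-implementation, not the service's parser: combining several conditions is not covered, MATCHES uses a search rather than a full match, and the 10E-10 tolerance is taken literally. keep and the other names are hypothetical.

```python
import math
import operator
import re

EPS = 10e-10  # the table's "10E-10" taken literally; the service's tolerance may differ

_EXISTS = re.compile(r'^EXISTS\s+\[(\w+)\]$')
_ISNULL = re.compile(r'^\[(\w+)\]\s+ISNULL$')
_IN = re.compile(r'^\[(\w+)\]\s+IN\s+(.+)$')
_MATCHES = re.compile(r'^\[(\w+)\]\s+MATCHES\s+"(.*)"$')
_COMPARE = re.compile(r'^\[(\w+)\]\s*(==|!=|>=|<=|>|<)\s*(.+)$')
_ORDER = {">": operator.gt, ">=": operator.ge, "<": operator.lt, "<=": operator.le}


def _literal(text):
    """Quoted text stays a string; anything else is read as a number if possible."""
    text = text.strip()
    if len(text) >= 2 and text[0] == text[-1] == '"':
        return text[1:-1]
    try:
        return float(text)
    except ValueError:
        return text


def keep(record, expression):
    """Return True if `record` passes a single-condition filter expression."""
    expression = expression.strip()
    if m := _EXISTS.match(expression):
        return m.group(1) in record
    if m := _ISNULL.match(expression):
        return record.get(m.group(1)) is None
    if m := _IN.match(expression):
        options = [item.strip() for item in m.group(2).split(",")]
        return str(record.get(m.group(1))) in options
    if m := _MATCHES.match(expression):
        value = record.get(m.group(1))
        # re.search is an assumption; the service may require a full match.
        return value is not None and re.search(m.group(2), str(value)) is not None
    if m := _COMPARE.match(expression):
        field, op, raw = m.groups()
        left, right = record.get(field), _literal(raw)
        if left is None:
            return False
        if isinstance(right, float):  # numeric comparison
            left = float(left)
            if op in ("==", "!="):
                equal = math.isclose(left, right, rel_tol=0.0, abs_tol=EPS)
                return equal if op == "==" else not equal
        else:  # string comparison
            left = str(left)
            if op == "==":
                return left == right
            if op == "!=":
                return left != right
        return _ORDER[op](left, right)
    raise ValueError(f"unsupported filter expression: {expression!r}")
```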
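Mapper: inherits from StreamNode; sets up field mappings and manages fields. It covers field mapping, adding fields, deleting fields, field calculation, and geofencing. Each example below is a single nodeDic entry, keyed by node name.
Add field: insertIndex is the position of the new field, fieldName and nType are its name and type, and expression computes its value.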
"insertMapper": {
"insertIndex": 1,
"fieldName": "XX",
"nType": "DOUBLE",
"expression": "[X] * 2",
"name": "insertMapper",
"source": "Insert Field",
"description": "Insert Field by X * 2",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.map.FeatureInsertMapper"
}
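The insertMapper example adds field XX, computed as [X] * 2, at position 1. The Python sketch below mimics that behavior on a record stored as an ordered list of (name, value) pairs. evaluate and insert_field are hypothetical helpers that accept only + - * / and unary minus, and they require field names that are valid Python identifiers.

```python
import ast
import operator
import re

_BINOPS = {ast.Add: operator.add, ast.Sub: operator.sub,
           ast.Mult: operator.mul, ast.Div: operator.truediv}
_FIELD = re.compile(r"\[(\w+)\]")


def evaluate(expression, record):
    """Evaluate simple arithmetic such as "[X] * 2" against a record (dict)."""
    # Rewrite [Field] references as plain identifiers so Python's parser accepts them.
    tree = ast.parse(_FIELD.sub(r"\1", expression), mode="eval")

    def walk(node):
        if isinstance(node, ast.Expression):
            return walk(node.body)
        if isinstance(node, ast.BinOp) and type(node.op) in _BINOPS:
            return _BINOPS[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
            return -walk(node.operand)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.Name):
            return float(record[node.id])  # field values are treated as DOUBLE
        raise ValueError(f"unsupported syntax in {expression!r}")

    return walk(tree)


def insert_field(fields, insert_index, field_name, expression):
    """Return a new list of (name, value) pairs with the computed field inserted."""
    value = evaluate(expression, dict(fields))
    result = list(fields)
    result.insert(insert_index, (field_name, value))
    return result
```

Walking the syntax tree instead of calling eval keeps arbitrary Python out of the expression.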
Delete field: deleteFieldNames lists the fields to remove.
"deleteMapper": {
"deleteFieldNames": [
"F1",
"F2"
],
"name": "deleteMapper",
"source": "delete Field",
"description": "delete Field F1和F2 ,
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.map.FeatureDeleteMapper"
}
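Field mapping
srcToDesNamePair: maps each field name to its new name.
srcToDesIndexPair: maps each field name to its new position (index).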
"mapMaper": {
"srcToDesNamePair": {
"ID": "newID_Name",
"Y": "newY_Name",
"X": "newX_Name"
},
"srcToDesIndexPair": {
"ID": 0,
"Y": 2,
"X": 1
},
"name": "mapMaper",
"source": "Map Fields",
"description": "Map Fields with new name and index",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.map.FeatureMapMapper"
}
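A sketch of the mapMaper behavior in Python, under two assumptions: both pairs are keyed by the source field name, and fields without an entry keep their names and follow the indexed fields in their original order. map_fields is a hypothetical helper.

```python
def map_fields(fields, name_pairs, index_pairs):
    """Reorder (name, value) pairs by index_pairs, then rename them by name_pairs."""
    # Indexed fields first, sorted by target index; sorted() is stable, so
    # unindexed fields keep their relative order at the end.
    ordered = sorted(
        fields,
        key=lambda item: (item[0] not in index_pairs, index_pairs.get(item[0], 0)),
    )
    return [(name_pairs.get(name, name), value) for name, value in ordered]
```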
Field calculation: fieldName is the field that receives the result of expression.
"calculateMapper": {
"fieldName ": Fcal,
"expression": "[X] * 2",
"name": "calculateMapper",
"source": "calculate Field",
"description": "calculate Field by X * 2",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.map.FeatureCalculateMapper "
}
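Geofence
connection: connection to the data source that holds the fence data, with:
type: String. Data source type, e.g. udb.
info: Array[DsInfo]. Data source connection information (server and datasetNames in the example).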
"GeoFenceMapper": {
"connection": {
"type": "udb",
"info": [
{
"server": "Z: \\airport.udb",
"datasetNames": [
"airports_40"
]
}
]
},
"fenceName": "NAME",
"fenceID": "SmID",
"withinFieldName": "geoWithin",
"statusFieldName": "geoStatus",
"name": "GeoFenceMapper",
"source": "地理标记转换",
"description": "",
"prevNodes": [
"SocketReceiver"
],
"nextNodes": [
"GeoJsonSocketSender",
"FenceWithinFilterOut",
"FenceWithinFilterIn"
],
"className": "com.supermap.bdt.streaming.map.GeoTaggerMapper"
}
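The core question a geofence answers is whether a point lies inside a fence polygon. The Python sketch below uses the standard even-odd ray-casting test. It is not necessarily the algorithm GeoTaggerMapper uses, and what tag_within writes into the within field (the name of the first containing fence) is a hypothetical choice, because this document does not define the output of withinFieldName or statusFieldName.

```python
def point_in_polygon(x, y, ring):
    """Even-odd ray casting: is (x, y) inside the closed ring [(x0, y0), ...]?"""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        if (y1 > y) != (y2 > y):  # edge straddles the horizontal ray through y
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def tag_within(record, fences, within_field="geoWithin"):
    """Hypothetical tagging step: store the name of the first fence containing the point."""
    tagged = dict(record)
    tagged[within_field] = next(
        (name for name, ring in fences.items()
         if point_in_polygon(record["X"], record["Y"], ring)),
        None,
    )
    return tagged
```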
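Sender: inherits from StreamNode; the exit point of stream processing, which sends data out. A sender's output format is set with formatter (the examples use CSVFormatter and GeoJsonFormatter).
Taking the commonly used WebSocket protocol as an example, the sender is configured as follows: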
"webSocketSender": {
"path": "ws://127.0.0.1/data",
"name": "webSocketSender",
"source": "WebSocket Sender",
"description": "Send message to WebSocket Server",
"prevNodes": [],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.WebSocketClientSender"
}
Output to Elasticsearch, for example, is configured as follows:
"ESSender": {
"url": "192.168.168.33",
"queueName": "aircondition",
"directoryPath": "test1",
"bitmap$0": false,
"formatter": {
"className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
},
"name": "ESSender",
"source": "ES发送",
"description": "",
"prevNodes": [
"TextFileReceiverJson"
],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.EsAppendSender"
}
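The ES sender above formats its output with GeoJsonFormatter. For reference, the Python sketch below builds a point Feature in the RFC 7946 layout from a record with X and Y fields; the exact output of GeoJsonFormatter, such as which fields go into properties, may differ. Note that RFC 7946 expects WGS 84 longitude and latitude, whereas the examples here declare EPSG:3857. to_geojson_point is a hypothetical helper.

```python
import json


def to_geojson_point(record, x_field="X", y_field="Y"):
    """Serialize a point record as a GeoJSON Feature (RFC 7946 layout)."""
    properties = {k: v for k, v in record.items() if k not in (x_field, y_field)}
    feature = {
        "type": "Feature",
        "geometry": {"type": "Point",
                     "coordinates": [record[x_field], record[y_field]]},
        "properties": properties,
    }
    return json.dumps(feature, ensure_ascii=False)
```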
With settings like those in the example above, you can write the analysis results to the spatiotemporal database created by iServer DataStore.
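Following the parameter descriptions above, write a complete streaming processing model file. You can save it with the .streaming extension to publish a streaming service quickly, or paste the file content directly into "Configuration Information" when publishing. Example: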
{
"version": 9000,
"sparkParameter": {
"checkPointDir": "tmp",
"interval": 10000
},
"stream": {
"nodeDic": {
"AQIReceiver": {
"url": "http://www.supermapol.com/iserver/services/aqi/restjsr/aqi/pm2_5.json?bounds=-113.90625001585,-52.029966847235,113.90625001585,69.175579762077&to=910111",
"reader": {
"isJsonArray": true,
"arrayExpression": "airQualityList",
"className": "com.supermap.bdt.streaming.formatter.JsonFormatter"
},
"metadata": {
"title": "",
"epsg": 3857,
"fieldInfos": [
{
"name": "X",
"source": "location.x",
"nType": "DOUBLE"
},
{
"name": "Y",
"source": "location.y",
"nType": "DOUBLE"
},
{
"name": "positionName",
"source": "positionName",
"nType": "TEXT"
},
{
"name": "aqi",
"source": "aqi",
"nType": "DOUBLE"
}
],
"featureType": "POINT"
},
"name": "AQIReceiver",
"caption": "",
"description": "",
"prevNodes": [],
"nextNodes": [
"WebSocketClientSender"
],
"className": "com.supermap.bdt.streaming.receiver.HttpReceiver"
},
"WebSocketClientSender": {
"path": "ws://127.0.0.1:8800/iserver/services/dataflow/dataflow/broadcast",
"formatter": {
"separator": ",",
"className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
},
"name": "WebSocketClientSender",
"caption": "",
"description": "",
"prevNodes": [
"AQIReceiver"
],
"nextNodes": [],
"className": "com.supermap.bdt.streaming.sender.WebSocketClientSender"
}
}
}
}
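A malformed model, such as one with a className carrying trailing whitespace or a link recorded on only one side, is easier to catch before publishing. The Python sketch below loads a .streaming file and reports such problems. check_model and check_file are hypothetical helpers, and the checks reflect conventions in this document's examples rather than documented validation rules.

```python
import json


def check_model(model):
    """Return a list of problems found in a streaming model's node graph."""
    problems = [f"missing top-level key {key!r}"
                for key in ("sparkParameter", "stream") if key not in model]
    nodes = model.get("stream", {}).get("nodeDic", {})
    for name, node in nodes.items():
        class_name = node.get("className")
        if not isinstance(class_name, str) or not class_name.strip():
            problems.append(f"{name}: missing className")
        elif class_name != class_name.strip():
            problems.append(f"{name}: className has surrounding whitespace")
        for nxt in node.get("nextNodes", []):
            if nxt not in nodes:
                problems.append(f"{name}: unknown next node {nxt!r}")
            elif name not in nodes[nxt].get("prevNodes", []):
                problems.append(f"{name} -> {nxt}: missing from {nxt}.prevNodes")
        for prev in node.get("prevNodes", []):
            if prev not in nodes:
                problems.append(f"{name}: unknown previous node {prev!r}")
            elif name not in nodes[prev].get("nextNodes", []):
                problems.append(f"{prev} -> {name}: missing from {prev}.nextNodes")
    return problems


def check_file(path):
    """Load a .streaming file (plain JSON) and check it."""
    with open(path, encoding="utf-8") as handle:
        return check_model(json.load(handle))
```

For example, `for problem in check_file("aqi.streaming"): print(problem)` (the file name is hypothetical). json.load also rejects syntax errors such as an unquoted value before any graph check runs.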