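Stream processing model
The streaming data service (Streaming Service) uses a stream processing model as its service source. The model specifies the information required to run the service.
The streaming data processing workflow includes:
Following this workflow, the stream processing model has four parts: Receiver, Filter, Mapper, and Sender. Each part is a node, and nodes can be connected and merged to build a real-time data processing flow, the Stream. Besides the Stream, several auxiliary parameters act as runtime conditions for the whole service; they are stored together with it in the startup parameter type, Startup. The processing model is shown in the following figure:
The stream processing model is defined in JSON. Using the parameters and JSON examples described below, you can write a stream processing model file and publish it as a streaming data service. You can also build the model in the stream processing model editor, which shows the parameter descriptions.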
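sparkParameter sets the runtime parameters for Spark Streaming. The examples on this page set checkPointDir and interval.
Stream contains the parameters of the real-time data processing flow.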
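Receiver inherits from StreamNode and is the entry point of stream processing. It receives data from various sources, including Socket, WebSocket, HTTP, and file systems. A Receiver must define the metadata of the messages it receives. A Receiver node consists of three parts: its own descriptive properties such as name and source; the message metadata (metadata); and the message reading format (reader).
The streaming data service supports the following receivers:
SocketReceiver: inherits from Receiver; a node that receives Socket messages. Parameters:
ipAddress: String. IP address of the Socket service to receive from.
port: int. Port number of the Socket service to receive from.
Example:
{ "ipAddress" : "127.0.0.1", "port" : 9527, "name" : "socketReceiver", "source" : "Socket Receiver", "description" : "Receive some message from socketServer", "prevNodes" : [], "nextNodes" : [], "className": "com.supermap.bdt.streaming.receiver.SocketReceiver" }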
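MultiSocketReceiver: inherits from Receiver; a node that receives messages from several Socket services at the same time. The received messages must all have the same content format. Parameters:
servers: Array[String]. The service addresses to receive from. Each array element is one address, with the host and port separated by a colon.
Example: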
{ "servers": [ "192.168.1.1:9527", "192.168.1.1:9528", "192.168.1.2:9527" ], "name": "multiSocketReceiver", "source": "MultiSource Socket Receiver", "description": "Receive message from multi socket server", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.MultiSocketReceiver" }
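SocketServerReceiver: inherits from Receiver; a Socket server receiving node that acts as the server and receives messages sent by Socket clients. Parameters:
port: int. The port on which the started Socket server listens.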
{ "port": 9527, "name": "socketServerReceiver", "source": "SocketServer Receiver", "description": "Receive message from socket client", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.SocketServerReceiver" }
WebSocketReceiver: inherits from Receiver; a node that receives WebSocket messages. Parameters:
url: String. Address of the WebSocket service.
{ "url": "ws://192.168.1.1:9527/websocket", "name": "webSocketReceiver", "source": "WebSocket Receiver", "description": "Receive message from websocket server", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.WebSocketReceiver" }
TextFileReceiver: inherits from Receiver; monitors a specified directory and reads the content of newly added files. Parameters:
directoryPath: the directory to monitor, for example the HDFS directory hdfs:///data/, the Linux directory /user/share/data, or the Windows directory C:/data.
{ "directoryPath": "hdfs:///data/", "name": "textFileReceiver", "source": "Text File Receiver", "description": "Listen new file in folder", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.TextFileReceiver" }
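SingleTextFileReceiver: single text file receiver. It inherits from Receiver and reads the content of the monitored file according to its settings. JSON, GeoJSON, and CSV files are supported. Parameters:
readInterva: the interval between reads.
rowsOneTime: the number of rows read each time.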
{ "version": 9000, "sparkParameter": { "checkPointDir": "tmp", "interval": 5000 }, "stream": { "nodeDic": { "TextFileReceiver": { "filePath": "G:\\QQRev\\test.json", "readInterva": 1000, "rowsOneTime": 100, "reader": { "isJsonArray": false, "arrayExpression": "", "className": "com.supermap.bdt.streaming.formatter.JsonFormatter" }, "metadata": { "title": "", "epsg": 3857, "fieldInfos": [ { "name": "X", "source": "lon", "nType": "DOUBLE" }, { "name": "Y", "source": "lat", "nType": "DOUBLE" }, { "name": "mbbh", "source": "mbbh", "nType": "TEXT" } ], "featureType": "POINT", "idFieldName": "mbbh", "dateTimeFormat": "yyyy-MM-dd HH:mm:ss" }, "name": "TextFileReceiver", "caption": "", "description": "", "prevNodes": [], "nextNodes": [ "ConsoleSender" ], "className": "com.supermap.bdt.streaming.receiver.SingleTextFileReceiver" }, "ConsoleSender": { "formatter": { "separator": ",", "className": "com.supermap.bdt.streaming.formatter.CSVFormatter" }, "name": "ConsoleSender", "caption": "", "description": "", "prevNodes": [ "TextFileReceiver" ], "nextNodes": [], "className": "com.supermap.bdt.streaming.sender.ConsoleSender" } } } }
KafkaReceiver: inherits from Receiver; a node that receives Kafka messages. The example below sets servers, topics, groupid, and offset:
{ "servers": "192.168.1.1:9092, 192.168.1.2:9092, 192.168.1.3:9092 ,192.168.1.4:9092", "topics": [ "topic1", "topic2" ], "groupid": "groupId", "offset": "latest", "name": "kafkaReceiver", "source": "Kafka Receiver", "description": "Receive message from Kafka", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.KafkaReceiver" }
HttpReceiver: inherits from Receiver; a node that receives HTTP messages. Currently only the HTTP GET method is supported. Parameters:
url: String. Address of the HTTP service.
{ "url": "https://api.wheretheiss.at/v1/satellites/25544", "name": "httpReceiver", "source": "HTTP Receiver", "description": "Get message from web", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.HttpReceiver" }
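JMSReceiver: inherits from Receiver; a node that receives messages over the standard JMS protocol, used for message middleware such as ActiveMQ and RabbitMQ. Parameters:
url: address of the JMS message service.
port: int. Port of the message service.
queueName: String. Name of the message queue.
jdniName: String. The JNDI name for the message middleware in use; look it up in the middleware's official documentation.
username: String. User name.
password: String. Password.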
{ "url": "192.168.1.1", "port": 9527, "queueName": "data", "jdniName": "org.apache.activemq.jndi.ActiveMQInitialContextFactory", "userName": "user", "password": "password", "name": "jmsReceiver", "source": "JMS Receiver", "description": "Receive message from JMS(Java Message Service) for ActiveMQ", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.receiver.JMSReceiver" }
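metadata is written inside the Receiver parameters. It is the metadata of the received messages and describes their format. The examples on this page set title, epsg, fieldInfos (with name, source, and nType for each field), featureType, idFieldName, and dateTimeFormat.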
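reader specifies the content format of the received messages: CSV (CSVFormatter), JSON (JsonFormatter), or GeoJSON (GeoJsonFormatter).
CSVFormatter: the received messages are in CSV format. Parameters:
separator: the field separator. The default is a comma.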
"reader": { "separator": ",", "className": "com.supermap.bdt.streaming.formatter.CSVFormatter" }
JsonFormatter: the received messages are in JSON format. Example:
"reader": { "className": "com.supermap.bdt.streaming.formatter.JsonFormatter" }
GeoJsonFormatter: the received messages are in GeoJSON format. Example:
"reader": { "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter" }
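Filter inherits from StreamNode and filters the current data to clean and organize it. The following must be specified:
Table 1. Logical operators supported by the filter parameter
Operator | Description
== | Equal to. Keeps objects whose attribute value equals the specified value. For example, [ID] == 3. Note: use with caution on double values; the comparison precision is 10E-10.
!= | Not equal to. Keeps objects whose attribute value does not equal the specified value. For example, [Name] != "A". Note: use with caution on double values; the comparison precision is 10E-10.
> | Greater than. Keeps objects whose attribute value is greater than the specified value. For example, [Speed] > 50
>= | Greater than or equal to. Keeps objects whose attribute value is greater than or equal to the specified value. For example, [Speed] >= 50
< | Less than. Keeps objects whose attribute value is less than the specified value. For example, [X] < 10.231
<= | Less than or equal to. Keeps objects whose attribute value is less than or equal to the specified value. For example, [Y] <= 40
IN | In the specified list. Keeps objects whose value for the specified field appears in a comma-separated list of values. For example, [Code] IN HK1,HK3,HK5
MATCHES | Regular expression match. Keeps objects whose value for the specified field matches the regular expression. For example, [Code] MATCHES "^HK[135]". Note: the regular expression must be enclosed in quotation marks.
EXISTS | Field exists. Keeps objects when the specified field exists in the schema of the received event. For example, EXISTS [X].
ISNULL | Is null. Keeps objects whose specified field contains a null value. For example, [X] ISNULL.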
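The operator semantics in Table 1 can be illustrated with a small Python sketch. This is only an approximation for reasoning about filter expressions, not the service's filter engine: the function keep and its argument layout are hypothetical, MATCHES is assumed to search from the start of the value as in the ^HK[135] example, and the tolerance applied to numeric equality is the 10E-10 precision quoted in the table.

```python
import re

EPS = 10e-10  # comparison precision quoted in Table 1

def keep(record, field, op, value=None):
    """Return True if the record passes the filter (illustrative semantics only)."""
    if op == "EXISTS":
        return field in record
    actual = record.get(field)
    if op == "ISNULL":
        return actual is None
    if op == "IN":
        # value is a comma-separated list such as "HK1,HK3,HK5"
        return str(actual) in [v.strip() for v in value.split(",")]
    if op == "MATCHES":
        return re.search(value, str(actual)) is not None
    if op in ("==", "!="):
        numeric = isinstance(actual, (int, float)) and isinstance(value, (int, float))
        equal = abs(actual - value) < EPS if numeric else actual == value
        return equal if op == "==" else not equal
    compare = {
        ">": lambda a, b: a > b,
        ">=": lambda a, b: a >= b,
        "<": lambda a, b: a < b,
        "<=": lambda a, b: a <= b,
    }
    return compare[op](actual, value)

# Invented sample records.
records = [{"Code": "HK3", "Speed": 62.0}, {"Code": "SZ1", "Speed": 40.0}]
print([r["Code"] for r in records if keep(r, "Speed", ">", 50)])
print([r["Code"] for r in records if keep(r, "Code", "MATCHES", "^HK[135]")])
```

The tolerance explains the caution about double values: two numbers that differ by less than the precision are treated as equal, so == and != on computed doubles may not behave like an exact comparison.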
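Mapper inherits from StreamNode. It establishes field mappings and manages fields. Its main operations are field mapping, adding fields, deleting fields, field calculation, and geofencing.
Add a field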
"insertMapper": { "insertIndex": 1, "fieldName": "XX", "nType": "DOUBLE", "expression": "[X] * 2", "name": "insertMapper", "source": "Insert Field", "description": "Insert Field by X * 2", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.map.FeatureInsertMapper" }
Delete fields
"deleteMapper": { "deleteFieldNames": [ "F1", "F2" ], "name": "deleteMapper", "source": "delete Field", "description": "delete Field F1 and F2", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.map.FeatureDeleteMapper" }
Field mapping
srcToDesNamePair: the mapping from existing field names to new field names.
"mapMaper": { "srcToDesNamePair": { "ID": "newID_Name", "Y": "newY_Name", "X": "newX_Name" }, "srcToDesIndexPair": { "ID": 0, "Y": 2, "X": 1 }, "name": "mapMaper", "source": "Map Fields", "description": "Map Fields with new name and index", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.map.FeatureMapMapper" }
Field calculation
"calculateMapper": { "fieldName": "Fcal", "expression": "[X] * 2", "name": "calculateMapper", "source": "calculate Field", "description": "calculate Field by X * 2", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.map.FeatureCalculateMapper" }
Geofence
type: String. Data source type.
info: Array[DsInfo]. Data source connection information.
"GeoFenceMapper": { "connection": { "type": "udb", "info": [ { "server": "Z:\\airport.udb", "datasetNames": [ "airports_40" ] } ] }, "fenceName": "NAME", "fenceID": "SmID", "withinFieldName": "geoWithin", "statusFieldName": "geoStatus", "name": "GeoFenceMapper", "source": "地理标记转换", "description": "", "prevNodes": [ "SocketReceiver" ], "nextNodes": [ "GeoJsonSocketSender", "FenceWithinFilterOut", "FenceWithinFilterIn" ], "className": "com.supermap.bdt.streaming.map.GeoTaggerMapper" }
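Sender inherits from StreamNode and is the exit point of stream processing, sending data outward. It includes:
Taking the commonly used WebSocket protocol as an example, a sender is configured as follows: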
"webSocketSender": { "path": "ws://127.0.0.1/data", "name": "webSocketSender", "source": "WebSocket Sender", "description": "Send message to WebSocket Server", "prevNodes": [], "nextNodes": [], "className": "com.supermap.bdt.streaming.sender.WebSocketClientSender" }
Taking output to Elasticsearch as an example:
"ESSender": { "url": "192.168.168.33", "queueName": "aircondition", "directoryPath": "test1", "bitmap$0": false, "formatter": { "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter" }, "name": "ESSender", "source": "ES发送", "description": "", "prevNodes": [ "TextFileReceiverJson" ], "nextNodes": [], "className": "com.supermap.bdt.streaming.sender.EsAppendSender" }
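Following the example settings above, you can output the results of data analysis and processing to a spatiotemporal database created by iServer DataStore.
Based on the parameter descriptions above, you can write a complete stream processing model file. Save the file with the .streaming extension to publish a streaming data service quickly, or write the content of the model directly into "Configuration information" when publishing. For example: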
{ "version": 9000, "sparkParameter": { "checkPointDir": "tmp", "interval": 10000 }, "stream": { "nodeDic": { "AQIReceiver": { "url": "http://www.supermapol.com/iserver/services/aqi/restjsr/aqi/pm2_5.json?bounds=-113.90625001585,-52.029966847235,113.90625001585,69.175579762077&to=910111", "reader": { "isJsonArray": true, "arrayExpression": "airQualityList",
"className": "com.supermap.bdt.streaming.formatter.JsonFormatter" }, "metadata": { "title": "", "epsg": 3857, "fieldInfos": [ { "name": "X", "source": "location.x", "nType": "DOUBLE" }, { "name": "Y", "source": "location.y", "nType": "DOUBLE" }, { "name": "positionName", "source": "positionName", "nType": "TEXT" }, { "name": "aqi", "source": "aqi", "nType": "DOUBLE" } ], "featureType": "POINT" }, "name": "AQIReceiver", "caption": "", "description": "", "prevNodes": [], "nextNodes": [ "WebSocketClientSender" ], "className": "com.supermap.bdt.streaming.receiver.HttpReceiver" }, "WebSocketClientSender": { "path": "ws://127.0.0.1:8800/iserver/services/dataflow/dataflow/broadcast", "formatter": { "separator": ",", "className": "com.supermap.bdt.streaming.formatter.GeoJsonFormatter" }, "name": "WebSocketClientSender", "caption": "", "description": "", "prevNodes": [ "AQIReceiver" ], "nextNodes": [], "className": "com.supermap.bdt.streaming.sender.WebSocketClientSender" } } } }