涉及 HTTP & MQ 结合的无非就两大应用,通过 HTTP 对外提供接口服务和基于 HTTP 的数据上报,这篇文章将实现这两个功能
一、HTTP 服务接口
需求:客户需要数据库某张表的今日实时数据,但是客户很菜不会写SQL,想要给一个 HTTP 接口查看即可,不要求什么花里胡哨的报表展示。
1.1 准备数据
create table TEST_HTTP_MQ
(
FFID VARCHAR2(10),
ACID VARCHAR2(10),
FLNO VARCHAR2(10),
HBDATE DATE
);
INSERT INTO HCSS_AIRPORT_ODS.TEST_HTTP_MQ (FFID, ACID, FLNO, HBDATE) VALUES ('114628', 'CA', '3350', TO_DATE('2021-11-01 16:11:28', 'YYYY-MM-DD HH24:MI:SS'));
INSERT INTO HCSS_AIRPORT_ODS.TEST_HTTP_MQ (FFID, ACID, FLNO, HBDATE) VALUES ('114629', 'SC', '8686', TO_DATE('2021-11-01 14:11:47', 'YYYY-MM-DD HH24:MI:SS'));
INSERT INTO HCSS_AIRPORT_ODS.TEST_HTTP_MQ (FFID, ACID, FLNO, HBDATE) VALUES ('114630', '3U', '8633', TO_DATE('2021-10-01 16:12:14', 'YYYY-MM-DD HH24:MI:SS'));
1.2 flow 设计
拓扑接口如下
逻辑为:起一个 HTTP 服务检测一个地址,当有请求时触发 flow,Compute 里去数据库中查询数据并封装成 JSON 一份返回作为 HTTP 的 Response,另一份加入一个标签写入 MQ 做备份,起到日志的作用。
1.2.1 HTTP Input
配置 HTTP 请求地址:/api/getAFDS,最终的 HTTP 请求地址为:http://ip:port/api/getAFDS
1.2.2 Compute
这里封装 SQL,封装查询结果
CREATE COMPUTE MODULE HTTP_Compute
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
-- CALL CopyMessageHeaders();
-- CALL CopyEntireMessage();
DECLARE SQL CHARACTER;
CREATE FIELD OutputLocalEnvironment.TEMP.PMS;
CREATE FIELD OutputRoot.JSON.Data.data IDENTITY(JSON.Array);
DECLARE XSJC REFERENCE TO OutputLocalEnvironment.TEMP.PMS;
DECLARE I INTEGER 1;
DECLARE SYSDATE CHARACTER;
-- 获取当前处理时间
SET SYSDATE = CAST(CURRENT_TIMESTAMP AS CHARACTER FORMAT 'yyyyMMddHHmmss');
-- 待执行的SQL
SET SQL = 'SELECT FFID, ACID, FLNO, HBDATE
FROM HCSS_AIRPORT_ODS.TEST_HTTP_MQ
WHERE TRUNC(HBDATE) = TRUNC(SYSDATE)';
-- 将 SQL 放入数据库中执行,并获取查询结果
SET XSJC.Row[I] =PASSTHRU (SQL TO Database.orcl);
-- 封装 SQL 查询结果
SET OutputRoot.JSON.Data.Updatetime = SYSDATE;
SET OutputRoot.JSON.Data.msg = '成功';
SET OutputRoot.JSON.Data.success = 'true';
WHILE XSJC.Row[I].FFID IS NOT NULL DO
SET OutputRoot.JSON.Data.data.Item[I].FFID = XSJC.Row[I].FFID;
SET OutputRoot.JSON.Data.data.Item[I].ACID = XSJC.Row[I].ACID;
SET OutputRoot.JSON.Data.data.Item[I].FLNO = XSJC.Row[I].FLNO;
SET OutputRoot.JSON.Data.data.Item[I].HBDATE = XSJC.Row[I].HBDATE;
SET I = I+1;
END WHILE;
RETURN TRUE;
END;
CREATE PROCEDURE CopyMessageHeaders() BEGIN
DECLARE I INTEGER 1;
DECLARE J INTEGER;
SET J = CARDINALITY(InputRoot.*[]);
WHILE I < J DO
SET OutputRoot.*[I] = InputRoot.*[I];
SET I = I + 1;
END WHILE;
END;
CREATE PROCEDURE CopyEntireMessage() BEGIN
SET OutputRoot = InputRoot;
END;
END MODULE;
1.2.3 Compute1
用来确认报文日志中的报文来源于哪个接口,起到日志作用
CREATE COMPUTE MODULE HTTP_Compute1
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
-- CALL CopyMessageHeaders();
-- CALL CopyEntireMessage();
DECLARE STYP CHARACTER;
SET OutputRoot.JSON.* = InputRoot.JSON.*;
-- AFDS 为标识符,用来确认报文日志中的报文来源于哪个接口,可自行命名。
SET STYP= 'AFDS' ;
SET OutputRoot.JSON.Data.styp=STYP;
RETURN TRUE;
END;
CREATE PROCEDURE CopyMessageHeaders() BEGIN
DECLARE I INTEGER 1;
DECLARE J INTEGER;
SET J = CARDINALITY(InputRoot.*[]);
WHILE I < J DO
SET OutputRoot.*[I] = InputRoot.*[I];
SET I = I + 1;
END WHILE;
END;
CREATE PROCEDURE CopyEntireMessage() BEGIN
SET OutputRoot = InputRoot;
END;
END MODULE;
1.3 测试
部署 flow 即可测试 [OK]
二、HTTP 数据上报
需求:根据数据协议开发数据上报服务
2.1 数据协议
接口说明:xxx 数据上报
请求地址:http://ip:port/scimsservice/FoUnpackResult
请求参数:xml 格式
基础数据项 | 数据名称 | 数据类型 | 说明 |
---|---|---|---|
消息名称 | MessageName | 字符型,最大50个字符 | FoUnpackResult |
消息生成时间 | MessageTime | 日期型 | 参见信息定义中的日期说明 |
行李条码号 | CheckID | 字符型,最大50个字符 | IATA或RFID等行李编号 |
扫描图像的Guid | ImageGuid | 字符型,最大100个字符 | 扫描图像的唯一标识 |
开包位置 | CheckPos | 字符型,最大50个字符 | 比如:A01、YZ01,如无位置信息传送空值 |
操作员ID | OptId | 字符型,最大50个字符 | 00000001 |
开包时间 | UnpackTime | 日期型 | 参见信息定义中的日期说明 |
违禁品信息 | ContrabandInfo | ---- | 违禁品信息,可没有,可多个 |
违禁品类型 | Type | 字符型,最大50个字符 | 违禁品类型名称 |
数量 | Count | 整型 | 数量 |
处理方式 | Handle | 字符型,最大50个字符 | 处理方式描述 |
违禁品拍照图像下载地址 | PhotoURL | 字符型,最大200个字符 | 违禁品拍照图像URL,可没有,可多个 |
数据样例:
<Envelope>
<Header>
<MessageName>FoUnpackResult</MessageName>
<MessageTime>2012-02-07T14:30:35+08:00</MessageTime>
</Header>
<Body>
<Record>
<CheckId>38291821342</CheckId>
<ImageGuid>ImageGuid1</ImageGuid>
<CheckPos>开包间</CheckPos>
<OptId/>
<UnpackTime>2012-02-07T14:30:35+08:00</UnpackTime>
<ContrabandInfo>
<Type>锂电池</Type>
<Count>1</Count>
<Handle>旅客自行处理</Handle>
</ContrabandInfo>
<ContrabandInfo>
<Type>危险物品</Type>
<Count>1</Count>
<Handle>移交</Handle>
</ContrabandInfo>
<PhotoURL>http://192.168.101.1:8000/1.jpg</PhotoURL>
<PhotoURL>http://192.168.101.1:8000/2.jpg</PhotoURL>
</Record>
</Body>
</Envelope>
2.2 数据库准备
根据数据协议创建对应的表存放将来上报的数据
create table SCIMS_FLEXONE_FoUnpackResult
(
MessageName varchar2(50),
MessageTime varchar2(50),
CheckType varchar2(50),
CheckID varchar2(100),
ImageGuid varchar2(100),
CheckPos varchar2(50),
OptId varchar2(50),
UnpackTime varchar2(50),
Type varchar2(50),
Count number,
Handle varchar2(50),
PhotoURL varchar2(200)
);
2.3 flow 设计
为了提高系统的稳定性,减少因数据解析错误导致数据上报失败,因此将数据上报和解析数据独立出来,数据上报后直接入队列,之后从队列取数据解析入库;这样可以保证数据上报一定是成功的。
2.3.1 上报 flow
Computer2
这里不做任何处理,将数据原封不动的写入队列
CREATE COMPUTE MODULE SCIMS_HTTP_Compute
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
-- CALL CopyMessageHeaders();
-- CALL CopyEntireMessage();
SET OutputRoot.XMLNSC.* = InputRoot.XMLNSC.*;
RETURN TRUE;
END;
CREATE PROCEDURE CopyMessageHeaders() BEGIN
DECLARE I INTEGER 1;
DECLARE J INTEGER;
SET J = CARDINALITY(InputRoot.*[]);
WHILE I < J DO
SET OutputRoot.*[I] = InputRoot.*[I];
SET I = I + 1;
END WHILE;
END;
CREATE PROCEDURE CopyEntireMessage() BEGIN
SET OutputRoot = InputRoot;
END;
END MODULE;
Computer3
封装 Response 给上报方一定的反馈
CREATE COMPUTE MODULE SCIMS_HTTP_Compute1
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
-- CALL CopyMessageHeaders();
-- CALL CopyEntireMessage();
SET OutputRoot.JSON.Data.resCode = '0';
SET OutputRoot.JSON.Data.resMsg = '成功';
RETURN TRUE;
END;
CREATE PROCEDURE CopyMessageHeaders() BEGIN
DECLARE I INTEGER 1;
DECLARE J INTEGER;
SET J = CARDINALITY(InputRoot.*[]);
WHILE I < J DO
SET OutputRoot.*[I] = InputRoot.*[I];
SET I = I + 1;
END WHILE;
END;
CREATE PROCEDURE CopyEntireMessage() BEGIN
SET OutputRoot = InputRoot;
END;
END MODULE;
2.3.2 解析 flow
这里的逻辑就是简单的取数据解析,同时写入另一个队列做一个备份(MQ Output3),解析失败入失败队列(MQ Output2)定期检查数据。
数据解析逻辑如下:
CREATE COMPUTE MODULE SCIMS_EXEC_Compute2
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
-- CALL CopyMessageHeaders();
-- CALL CopyEntireMessage();
DECLARE I INTEGER 1;
insert into Database.orcl.HCSS_AIRPORT_ODS.SCIMS_FLEXONE_FoUnpackResult(
messagename,
messagetime,
checktype,
checkid,
imageguid,
checkpos,
optid,
unpacktime,
type,
count,
handle,
photourl
) values (
InputRoot.XMLNSC.Envelope.Header.MessageName,
InputRoot.XMLNSC.Envelope.Header.MessageTime,
InputRoot.XMLNSC.Envelope.Body.Record.CheckType,
InputRoot.XMLNSC.Envelope.Body.Record.CheckID,
InputRoot.XMLNSC.Envelope.Body.Record.ImageGuid,
InputRoot.XMLNSC.Envelope.Body.Record.CheckPos,
InputRoot.XMLNSC.Envelope.Body.Record.OptId,
InputRoot.XMLNSC.Envelope.Body.Record.UnpackTime,
InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Type,
InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Count,
InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Handle,
InputRoot.XMLNSC.Envelope.Body.Record.PhotoURL[I]
);
SET I = I + 1;
WHILE InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Type IS NOT NULL DO
insert into Database.orcl.HCSS_AIRPORT_ODS.SCIMS_FLEXONE_FoUnpackResult(
messagename,
messagetime,
checktype,
checkid,
imageguid,
checkpos,
optid,
unpacktime,
type,
count,
handle,
photourl
) values (
InputRoot.XMLNSC.Envelope.Header.MessageName,
InputRoot.XMLNSC.Envelope.Header.MessageTime,
InputRoot.XMLNSC.Envelope.Body.Record.CheckType,
InputRoot.XMLNSC.Envelope.Body.Record.CheckID,
InputRoot.XMLNSC.Envelope.Body.Record.ImageGuid,
InputRoot.XMLNSC.Envelope.Body.Record.CheckPos,
InputRoot.XMLNSC.Envelope.Body.Record.OptId,
InputRoot.XMLNSC.Envelope.Body.Record.UnpackTime,
InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Type,
InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Count,
InputRoot.XMLNSC.Envelope.Body.Record.ContrabandInfo[I].Handle,
InputRoot.XMLNSC.Envelope.Body.Record.PhotoURL[I]
);
SET I = I + 1;
END WHILE;
RETURN TRUE;
END;
CREATE PROCEDURE CopyMessageHeaders() BEGIN
DECLARE I INTEGER 1;
DECLARE J INTEGER;
SET J = CARDINALITY(InputRoot.*[]);
WHILE I < J DO
SET OutputRoot.*[I] = InputRoot.*[I];
SET I = I + 1;
END WHILE;
END;
CREATE PROCEDURE CopyEntireMessage() BEGIN
SET OutputRoot = InputRoot;
END;
END MODULE;
2.4 测试
postman 模拟发送一个上报请求,查看数据库有无数据插入即可
评论区