0%

从零构建物联网平台-构建关键组件

  1. 从零构建物联网平台-给个理由先
  2. 从零构建物联网平台-平台架构
  3. 从零构建物联网平台-需求规划、技术选型和系统设计
  4. 从零构建物联网平台-环境构建
  5. 从零构建物联网平台-MQTT消息代理(Message Broker)
  6. 从零构建物联网平台-构建关键组件
  7. 从零构建物联网平台-实现WebSocket通信

我们开始了物联网平台建设的旅程,并确定了最终目标:也就是我们首先设想了我们自己的物联网平台是什么样的以及它的高级需求内容(参见从零构建物联网平台-平台架构从零构建物联网平台-需求规划、技术选型和系统设计)。到目前为止,我们已经构建了一个功能齐全的服务器及基础软件。

本篇文章我们将:

  • 创建时序数据库
  • 用NODE-RED创建核心业务
  • 创建数据库监听
  • 构建基于REST API的消息发布和检索功能

创建核心时序数据库

我们知道时序数据库是物联网平台的关键模块之一。我们在之前的文章从零构建物联网平台-需求规划、技术选型和系统设计中建立了一个通用的数据存储模型。我们现在就创建表。

第一步是在MySQL中创建一个单独的数据库。我们命名其为tSeriesDB。

1
CREATE DATABASE tSeriesDB;

接下来创建一个数据表结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- 建表SQL
DROP TABLE IF EXISTS `thingData`;
CREATE TABLE `thingData` (
`id` int(11) NOT NULL, -- 主键
`topic` varchar(1024) NOT NULL, --主题名
`payload` varchar(2048) NOT NULL, --消息内容
`timestamp` varchar(15) NOT NULL, --消息发生时间
`deleted` binary(1) NOT NULL DEFAULT '\0' --删除标识
) ENGINE=InnoDB;

-- 为ID创建自增
ALTER TABLE `thingData`
MODIFY `id` int(11) NOT NULL AUTO_INCREMENT, AUTO_INCREMENT=116;

-- 创建主键索引
ALTER TABLE `thingData` ADD PRIMARY KEY (`id`);

安装NODE-RED必要节点

MySQL中准备好所需的数据库和数据结构后,现在应该启用NODE-RED和所需的其他节点,然后将其配置好。

NODE-RED的默认安装没有访问MySQL的节点。因此我们将使用NODE-RED的节点管理器添加此节点。

在浏览器中打开你搭建NODE-RED系统,然后选择从右上角的菜单中打开节点管理。

按照以下步骤搜索mysql节点并进行安装。

安装完成后,在左侧的节点列表中就可以找到MySQL节点了。

这个节点意味着现在我们可以从NODE-RED访问MySQL数据库。在这个阶段,数据库和数据表已经准备好接受和存储输入值。Node-RED函数实例已经为配置和编程做好了准备,MQTT消息代理也已经开始工作。这些已经为创建一个数据库监听器和相关API创造好所有前提条件。

为我们的平台创建第一个流

安装NODE-RED后,我们没有向node环境添加任何东西。因此我们的画布是空的。我们会在一开始创建一个非常基本的流程。为此我们使用注入节点(inject node)。注入节点允许我们将消息注入到流中,可以是默认字符串或当前时间戳。可以通过单击节点上的按钮注入消息,也可以通过在节点的配置中设置时间间隔来设置循环注入。

从左侧节点列表中拖动一个注入节点,并将其放入工作区中。然后将一个调试节点(debug node)拖到工作区,然后通过将注入节点的输出连接到调试节点来将两个节点连接在一起。调试节点将窗口右侧栏上的调试消息区域中给定的任何输入发送给它。

我们刚刚创建的流只存在于编辑器屏幕上,没有激活。要激活此序列,需要点击部署Deploy按钮。

部署成功后,打开右侧调试栏,然后点击时间戳流上的按钮,你将看到一个输出(当前时间戳,UNIX微秒格式),发送到调试输出。这是一个基本而简单的流程。

我们对这个基本流做两个更改。注入节点当前在点击按钮时发送时间戳;我们将其转换为每15秒重复一次的自动注入操作。然后,我们从输出节点中添加另一个节点mqtt out到流中,并将时间戳注入节点的输出连接到其输入。

添加MQTT发布功能

现在让我们在NODE-RED中添加MQTT发布功能。开箱即用的mqtt节点需要进行配置。我们将提供消息代理详细信息和凭据来进行设置。有关设置此序列的四个步骤,请下图。在此配置中,我们使用端口1883与代理进行连接。

在上面的设置之后,流程如下图所示。在我们部署这个流程之后,时间戳注入就开始了,并且每10秒重复一次。时间戳的输出被发送到两个节点:debug和mqtt out。当debug节点在调试侧栏上显示注入的时间戳时,mqtt out将这个时间戳推入消息流,其中包含时间戳主题和当前UNIX时间戳(以微秒为单位)的值,类似于1587886555016这样的数值。

这个流程完成了需求列表中的需求M1(发布当前时间戳)。我们的第一个流程也在做同样的事情,我们以10秒的固定间隔将当前时间戳发布到消息流中。根据需求,你可以修改这个时间间隔以满足业务需求。

使用REST API发布消息

现在,我们已经看到了如何将消息从NODE-RED节点发布到MQTT,下面我们开发一个REST API来用于发布消息。我们的D4需求的一部分是,设备或应用程序应该能够使用HTTP协议发布消息。

首先,将HTTP节点从输入块拖动到工作区。HTTP节点能够处理来自Web的请求。双击节点并修改节点设置,如下图所示。我们创建一个端点/pub,其中包含两个在POST请求中传递的参数:topic和payload。我们可以在以下节点中作为消息对象的一部分访问这些参数:msg. req.params.topicmsg.req.params.payload

添加另一个mqtt输出节点并调整其设置,如下图所示。由于我们在创建时间戳程序时添加了代理配置,所以我们可以简单地使用相同的配置。我们没有在设置中指定主题,因为会在发布之前在函数节点中提供。

现在从功能面板中拖动两个函数节点(function node)并将它们连接起来,如图下图所示。在这个流程的末尾添加http response输出节点,提供有效负载作为API响应。作为REST API,有必要为流程提供一个http输入节点和一个http response输出节点。如果不添加,API请求永远不会结束,并引起超时。

我们没有对http输出节点进行任何配置更改。通常只有在发送和配置了额外的头信息或需要更改HTTP响应代码时,才会配置HTTP输出节点。但是,可以在前一个函数节点中更改http输出的报头和响应代码。

在这个流程中,有两个函数节点在其中编写了代码。下面解释这段代码。

1
2
3
4
5
6
// create message 
msg.topic = msg.req.params.topic;
msg.payload = msg.req.params.payload;
msg.qos = 2;
msg.retain = false;
return msg;

create message函数块中,接收来自http节点的输入。在http请求中传递的两个参数可以在msg消息对象中获取。在前两行中,我们将为msg对象中的topic和payload赋值。还将服务质量(QoS)设置为2以获得更好的可靠性,并将标记保留为false,因为不希望保留每个消息。

这些输入被传递给mqtt输出节点,该节点随后在提供的主题下发布给定的消息负载,并将QoS设置为2,而不将其保留在代理上。与此同时,我们通过创建一个响应并使用一个
http response节点。

1
2
3
4
5
6
7
8
9
// create response 
msg.payload = {
success: true,
message: "published " +
msg.req.params.topic +
"/" +
msg.req.params.payload
};
return msg;

作为http response输出节点,我们使用两个键修改有效负载。设置success = true表示使用有意义的响应消息发布有效负载。

设置并更新后,部署流程以使其生效。我们可以使用postman测试功能,如下面NODE-RED端点上的代码片段所示。

这个功能不能在web浏览器中直接测试,因为我们创建了一个POST请求。如果希望直接在web浏览器中使用它,只需通过更改HTTP输入节点设置将其转换为GET。

现在,我们的物联网平台增加了两个功能:第一,定期向MQTT消息流发布当前时间戳;第二,开发了消息发布REST API。让我们通过添加数据库监听器来进一步增强这个功能。

创建数据库监听

数据库监听本质上是某个程序或函数监听实时消息流并将其监听的所有内容存储在数据库中。在现在的场景中,我们使用MQTT建立了一个实时消息流。现在,我们开发一个功能,程序监听MQTT流,所有消息都保存到时序数据库中。

我们将mqtt输入节点添加到工作区,然后添加debug节点并将其连接到mqtt输入节点。这是最简单的流程,因为我们只需要配置一件事。注意,MQTT代理的详细信息已经添加了。

在mqtt输入节点中,我们需要提供订阅信息并设置代理。双击mqtt输入节点并配置,如下图所示。我们使用#订阅订阅所有消息,且QoS = 2为可靠订阅。配置完成后,部署流程并在侧栏中监视调试消息。

我们已经有了一个活动的时间戳发布器,它每10秒发布一次当前的时间戳,这些消息应该每10秒出现在调试窗口上。

现在,如果我们使用/pub API发布任何新消息,该消息也显示在调试窗口中。验证之后,让我们修改流程,从函数面板中添加一个函数节点,从存储面板中添加一个MySQL节点。连接这些节点并配置MySQL节点的设置,如下图所示。

下面的代码片段解释了在create query函数中编写的代码。

1
2
3
4
5
6
7
8
9
10
// Create query
// get microtime
var timestamp = new Date().getTime()/1000;
// pad it with trailing zeroes
timestamp = timestamp.toString() + "000";
// trim to exact length 10 + 1 + 3
timestamp = timestamp.substring(0, 14);
var strQuery = "INSERT INTO thingData (topic, payload,timestamp, deleted) VALUES ('" + escape(msg.topic) + "','" + escape(msg.payload) + "','" + timestamp + "', 0);";
msg.topic = strQuery;
return msg;

在代码的前三行中,我们从date对象获取最新的时间戳,并将其转换为零填充的字符串进行存储。第4行代码是插入SQL语句。

NODE-RED中的MySQL节点要求在msg对象中以msg.topic的形式传递查询。倒数第二行执行该赋值,然后函数将修改后的对象传递给MySQL节点执行该语句并将数据插入到数据库中。

部署之后,我们可以使用postman发布任何消息,或者只需等待10秒来发布时间戳。然后登录到MySQL并在数据库中验证是否添加了新记录。

REST API实现消息检索

现在,让我们创建一个API来检索存储在数据库中的消息。在我们的平台需求列表中,我们列出了两个需求。

  • D1:获取单条数据记录。允许应用系统和设备根据指定的主题从数据库中查询单条数据记录。
  • D2:获取多条数据记录。允许应用系统和设备根据指定的主题查询多条数据记录。

这两个API将以与之前相同的方式构建。但是这一次,我们使用MySQL节点的SELECT SQL访问和检索数据库。有关设置配置和流程请参见下图。

我们已经将两个http输入节点的输出绑定到相同的流程中。这样是在适应两种变化/get/:topic和/get/:topic/last/:count。第一个接口仅从时序数据库中检索一条消息,而第二个接口指定要检索的最新消息的数量。

下面的代码片段显示了为create query函数块编写的代码。

1
2
3
4
5
6
7
8
9
10
11
// Create query
// if required record count is not specified
// set default to 1
if(!msg.req.params.count)
msg.req.params.count = 1;
// build the sql query
msg.topic = "SELECT id,topic,payload,timestamp " +"FROM thingData " +
"WHERE topic='" + escape(msg.req.params.topic) + "' " +
"AND deleted=0 " +"ORDER BY id DESC " +"LIMIT " +
msg.req.params.count + ";";
return msg;

在这段代码中,前两行检查是否存在count参数,只有在请求多条最新数据时才需要此参数。因此,在单条查询中不存在此参数。如果不存在,我们将该参数设置为默认值1。

然后,我们使用SELECT查询来检索数据库。在这个查询中,我们使用WHERE来搜索指定的主题,而deleted=0只选择未删除的记录。另外,我们使用ORDER BY id DESC来检索最新的值,并通过使用count参数来限制输出。

由于它是一个时序数据库,并且由于构建数据库监听器的方式,所有的值都是在一个时序数据库中,最新的值在顶部(如果按ID降序排序)。让我们测试这两个API。

因为这些是GET请求,可以在浏览器中直接测试。

总结

从最初的构想来看,我们的功能已经完成了一半。我们构建了一个时序数据库,并将数据库监听器添加到平台中。我们还实现了REST API模块中中的两个关键组件。

坚持原创技术分享,您的支持将鼓励我继续创作!