主动复制结构
主动复制结构(ARF)页面详细描述了ARF通信器和复制器对象以及实现基于物联网(IoT)的ARF应用程序的基本步骤。
以下将介绍用于在数据收集点与服务器之间实现自动或按需数据交换的SmartEDB Active Replication Fabric C API。
ARF运行时支持由两个库构成:
- 通信库(
mcoiotcomm
):通信库负责处理网络连接(如IP套接字),并向协议库提供一组函数和结构体。其架构类似于SmartEDB集群(mcocltcp
+mcocluster
)和SmartEDB高可用性(mconwtcp
+mcoha
)的架构。 - 协议库(
mcoiotrepl
)。
应用程序可以利用这两个库提供的API进行开发。ARF API 对于设备数据库和服务器数据库来说几乎是相同的,并且所有功能均可被设备和服务器应用程序使用。
ARF通信器和复制器对象属于创建它们的进程,且永远不会在多个进程之间共享。默认情况下,这两个对象都是通过 mco_sys_malloc()
由动态分配器分配的。
另一种可能的分配器是静态分配器,它具有有限的连接数和有限的字典大小。
有关概述,请参阅主动复制结构页面。
示例
为了说明 ARF C API 的用法,以下代码片段展示了一个简单的 ARF 服务器和设备应用程序。
物联网设备端应用
考虑以下针对边缘设备的简单模式定义:
declare database iotdevice;
declare auto_oid[100];
iot device 1000;
uptable class Sensor
{
datetime ts;
unsigned<4> sensorId;
double value;
hash<ts, sensorId> idx[100];
};
downtable class Config
{
unsigned<4> param_id;
double value;
tree<param_id> idx;
};
以下代码片段将初始化 ARF 运行时,并为设备创建通信器和复制器:
int main(int argc, char *argv[])
{
...
mco_iot_replicator_params_t repl_params;
mco_iot_comm_params_t comm_params;
mco_iot_replicator_h repl;
mco_iot_comm_h comm;
const char *conn_string "127.0.0.1:15000";
...
mco_iot_comm_params_init(&comm_params);
CHECK(mco_iot_comm_create(&comm_params, &comm));
mco_iot_replicator_params_init(&repl_params);
CHECK(mco_iot_replicator_create(db, comm, &repl_params, &repl));
CHECK(mco_iot_replicator_connect(repl, conn_string, 2*1000, 0));
...
然后,在插入和/或修改传感器数据之后,设备可以与服务器数据库同步,然后使用如下代码关闭:
CHECK(mco_iot_replicator_sync(repl, MCO_IOT_SERVER_AGENT_ID, MCO_IOT_SYNC_PULL | MCO_IOT_SYNC_PUSH | MCO_IOT_SYNC_WAIT));
PrintConfig(db);
CHECK(mco_iot_replicator_stop(repl));
CHECK(mco_iot_comm_stop(comm));
CHECK(mco_iot_replicator_destroy(repl));
CHECK(mco_iot_comm_destroy(comm));
CHECK(mco_db_disconnect(db));
CHECK(mco_db_close(db_name));
free(dev.dev.conv.ptr);
mco_runtime_stop();
return 0;
}
PrintConfig()
函数只是打印出由 mco_iot_replicator_sync()
从服务器数据库下载的配置参数。
物联网服务端应用
请考虑以下用于与上述设备应用程序进行交互的服务器数据库的简单模式定义:
declare database iotserver;
declare auto_oid[100];
iot server 100;
uptable class Sensor
{
datetime ts;
unsigned<4> sensorId;
double value;
tree<ts, sensorId> idx;
};
downtable class Config
{
unsigned<4> param_id;
double value;
hash<param_id> idx[100];
};
以下代码片段将初始化 ARF 运行时环境,创建通信器和复制器,并开始监听设备连接:
int main(int argc, char *argv[])
{
...
mco_iot_replicator_params_t repl_params;
mco_iot_comm_params_t comm_params;
mco_iot_replicator_h repl;
mco_iot_comm_h comm;
...
mco_iot_comm_params_init(&comm_params);
CHECK(mco_iot_comm_create(&comm_params, &comm));
mco_iot_replicator_params_init(&repl_params);
CHECK(mco_iot_replicator_create(db, comm, &repl_params, &repl));
CHECK(mco_iot_comm_listen(comm, "15000", 0));
...
以下代码可用于修改配置数据并将更改传播到设备:
void InsertConfig(mco_db_h db)
{
Config obj;
mco_trans_h t;
CHECK(mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t));
CHECK(Config_new(t, &obj));
CHECK(Config_agent_id_put(&obj, 1000)); /* This record will be replicated to device with ID 1000 */
CHECK(Config_param_id_put(&obj, 1));
CHECK(Config_value_put(&obj, 3.1415));
CHECK(Config_new(t, &obj));
CHECK(Config_agent_id_put(&obj, MCO_IOT_ALL_AGENTS)); /* This record will be replicated to all devices */
CHECK(Config_param_id_put(&obj, 2));
CHECK(Config_value_put(&obj, 2.7182));
CHECK(mco_trans_commit(t));
}
使用常量 MCO_IOT_ALL_AGENTS
以使此记录复制到所有设备。 然后,服务器可以使用如下代码更新配置参数、打印当前传感器数据并关闭:
InsertConfig(db);
PrintSensors(db);
CHECK(mco_iot_replicator_stop(repl));
CHECK(mco_iot_comm_stop(comm));
CHECK(mco_iot_replicator_destroy(repl));
CHECK(mco_iot_comm_destroy(comm));
CHECK(mco_db_disconnect(db));
CHECK(mco_db_close(db_name));
free(dev.dev.conv.ptr);
mco_runtime_stop();
return 0;
}
PrintSensors()
函数只是简单地打印出当前活动设备的传感器(Sensor)数据。
API函数
请使用下面的链接查看 API 函数的参考指南页面:
函数 | 说明 |
---|---|
mco_iot_init() | 初始化 ARF(物联网)运行时环境 |
连接接口
函数 | 说明 |
---|---|
mco_iot_conn_get_address() | 返回连接地址 |
mco_iot_conn_get_agent_id() | 返回远程节点agentId |
mco_iot_conn_get_stat() | 接收连接统计信息 |
mco_iot_conn_get_error() | 接收最后一个错误描述 |
mco_iot_conn_get_comm() | 返回当前连接所属的通讯器 |
mco_iot_conn_release() | 释放“固定”连接 |
mco_iot_conn_disconnect() | 断开连接 |
通信器接口
函数 | 说明 |
---|---|
mco_iot_comm_params_init() | 设置通讯器的默认值 |
mco_iot_comm_create() | 创建通讯器对象 |
mco_iot_comm_register_callback() | 注册回调函数 |
mco_iot_comm_unregister_callback() | 从指定的上下文中删除回调函数 |
mco_iot_comm_listen() | 创建套接字以侦听指定的 IP 地址 |
mco_iot_comm_stop_listen() | 关闭 listen 套接字 |
mco_iot_comm_connect_async() | 异步调用指定地址connect() |
mco_iot_comm_stop_connect() | 中断通过mco_iot_comm_connect_async() |
mco_iot_comm_find_conn() | 通过以下方式查找连接agentId |
mco_iot_comm_next_conn() | 获取下一个活动连接 |
mco_iot_comm_stop() | 停止通信器 |
mco_iot_comm_destroy() | 销毁通讯器 |
复制器接口
函数 | 说明 |
---|---|
mco_iot_replicator_params_init() | 设置复制器的默认值 |
mco_iot_replicator_create() | 创建复制器对象 |
mco_iot_replicator_connect() | 同步调用connect() |
mco_iot_replicator_get_comm() | 返回指定复制器的基础通讯器 |
mco_iot_replicator_get_agent_id() | 返回指定节点的agent_id |
mco_iot_replicator_get_level() | 返回指定节点的级别 |
mco_iot_replicator_set_level() | 在运行时更改路由器的级别 |
mco_iot_replicator_sync() | 启动节点的同步 |
mco_iot_replicator_clear() | 删除对等体已传输和确认的数据 |
mco_iot_replicator_clear_class() | 仅从已由对等方传输并确认的指定类中删除数据 |
mco_iot_replicator_enum_agents() | 枚举活动连接 |
mco_iot_replicator_stop() | 停止复制器 |
mco_iot_replicator_destroy() | 销毁复制器 |