C/C++ 中应用
要实现一个 C/C++ 集群应用程序,至少需要调用以下函数:
MCO_RET mco_cluster_init();
MCO_RET mco_cluster_params_init();
MCO_RET mco_cluster_db_open();
MCO_RET mco_cluster_listen();
MCO_RET mco_cluster_stop();
集群应用程序可以注册一个“仲裁检查回调”函数,以在数据库处理开始之前验证集群中是否存在必要的最小节点数。如果未注册仲裁回调函数,则集群运行时将使用 qrank 机制来验证仲裁。此外,还可以注册一个“通知回调”函数,以检测节点何时加入或离开集群。 以下代码片段演示了一个具有仲裁检查和连接通知回调函数的 C/C++ 集群应用程序:
const char dbName [] = "clusterdb";
// Listen-thread
THREAD_PROC_DEFINE (ClusterListen, p)
{
mco_db_h db;
mco_db_connect (dbName, & db);
mco_cluster_listen (db);
mco_db_disconnect (db);
return 0;
}
// Quorum check callback function. It must return MCO_YES if the node can start
// (or continue) to work:
// parameters:
// uint2 *neighbor_ids - array of active neighbors
// uint2 n_neighbors - number of active neighbors in the neighbor_ids array
// void *param - user-defined context
mco_bool my_check_quorum(uint2 *neighbor_ids, uint2 n_neighbors, void *param)
{
mco_bool result = MCO_YES;
// <some logic>
return result;
}
// Notification callback function.
void cluster_notifying(mco_db_h db, mco_cluster_notification_t notification_code,
mco_cluster_node_info_t *node_info,
void *param, void *user_context)
{
mco_cluster_node_info_t nodes_info[MCO_MAX_CLUSTER_SIZE];
uint2 nd, n_nodes_info = MCO_MAX_CLUSTER_SIZE;
printf("***NOTIFICATION*** Node %d (%s, qrank %d) %s.\n", node_info->node_id,
node_info->addr, node_info->qrank,
notification_code == MCO_CLUSTER_NOTIFY_NODE_CONNECT ?
"CONNECTED" : "DISCONNECTED");
mco_cluster_get_active_nodes(db, nodes_info, &n_nodes_info);
printf("List of active nodes: \n --------------------------\n");
for (nd = 0; nd < n_nodes_info; ++nd)
{
printf("ID %d, Addr %s, Qrank %d\n", nodes_info[nd].node_id,
nodes_info[nd].addr, nodes_info[nd].qrank);
}
}
int main ()
{
mco_cluster_params_t cl_params;
mco_cluster_node_params_t nodes [2];
mco_device_t dev [4];
unsigned int n_dev;
mco_db_params_t db_params;
// Initialize cluster runtime
mco_cluster_init ();
// Initialize cluster parameters
mco_cluster_params_init(&cl_params);
// Prepare dev, n_dev, db_params, as usual for mco_db_open_dev()
// Fill the information about nodes
strcpy (nodes[0].addr, "node1:10000")
strcpy (nodes[1].addr, "node2:10000");
nodes[0].qrank = nodes [1].qrank = 1;
cl_params.nodes = nodes;
cl_params.n_nodes = 2
cl_params.node_id = local_id; // on node1 this value should be 0
// on node2 this value should be 1, and so on
// set Quorum callback function
cl_params.check_quorum_func = my_check_quorum;
cl_params.check_quorum_param = 0; // don't use the context
// set Notification callback function
cl_params.notifying_callback = cluster_notifying;
cl_params.notifying_context = 0; // don't use the context
/ * create cluster database * /
mco_cluster_db_open (dbName,
clusterdb_get_dictionary(),
dev,
n_dev,
&db_params,
&cl_params);
/ * Start mco_cluster_listen() thread * /
createThread (ClusterListen, 0, & hListenThread);
/ * Work with the database as usual * /
/ * Stop the cluster * /
mco_cluster_stop ();
/ * Wait for the listen-thread to stop * /
THREAD_JOIN (hListenThread, res);
/ * Close the database * /
mco_db_close ();
}
在使用 mco_cluster_db_open() 函数时,传递的 mco_cluster_params_t 结构体(cl_params)包含了集群的配置信息。通常,这个结构体会定义节点的数量(n_nodes)、当前节点的整数标识符(node_id),以及一个包含其他节点信息的数组(nodes)。每个 mco_cluster_node_params_t 结构体中记录了节点 ID(例如,nodes[0] 对应 node_id=0,nodes[7] 对应 node_id=7 等等)、地址和 qrank 值。此外,还可以指定用于仲裁检查和节点通知的回调函数。
在这个示例中,我们使用了一个名为 my_check_quorum() 的仲裁检查回调函数。通过这个函数,应用程序可以验证在线节点的数量,确保集群有足够的节点来正常工作。
请注意,在处理集群数据库时,mco_trans_commit() 函数可能会返回 MCO_E_CLUSTER_NOQUORUM 错误代码。这表示当一个或多个节点出现故障时,剩余的节点无法构成法定人数,导致事务无法继续。
另外,我们还使用了通知回调函数(notifying_callback)。当有节点连接到或从集群断开连接时,应用程序会收到一个连接事件(notification_code = connect / disconnect),并附带有关节点的信息(node_info)和应用程序上下文(user_context)参数。
需要注意的是,通知回调函数是在监听器任务的上下文中调用的。因此,它必须快速完成其任务,并且不能执行任何事务性操作或主动的集群管理活动。在这种情况下,只有 mco_cluster_info() 和 mco_cluster_get_active_nodes() 是合适的选择。
向正在运行的集群添加节点
新节点可以通过简单地调用 mco_cluster_db_open() 函数加入现有的集群,传入唯一的节点 ID 以及正在运行的节点之一的地址:端口和节点 ID。例如,假设我们有节点 A 和节点 B 作为集群运行,其中节点 A 的节点 ID 为 0,节点 B 的节点 ID 为 1。新节点 N 知道节点 A 但不知道节点 B。它选择节点 ID 为 2,并按如下方式填充 node_params 结构:
strcpy(node_params[0].addr, "nodeA:10000");
strcpy(node_params[1].addr, ""); // set empty address, because don't know about nodeB
strcpy(node_params[2].addr, "<my address and port>");
要将节点 N 加入集群,通常我们会调用 mco_cluster_db_open() 函数。请注意,如果节点 N 不小心选择了已被节点 B 占用的节点 ID(例如 node_id = 1),mco_cluster_db_open() 将返回 MCO_E_CLUSTER_DUPLICATE_NODEID 错误代码。为了确定集群中的成员及其节点 ID,您可以使用 mco_cluster_discover() 函数来获取相关信息。
此外,节点还可以通过调用 mco_cluster_detach() 和 mco_cluster_attach() 函数来脱离或重新加入集群。当一个节点加入集群时,它只需要知道其中一个相邻节点的地址即可。然后,它会从集群运行时获取最新的节点列表信息。
例如,考虑一个由四个节点 A、B、C 和 D 组成的集群。它们的节点列表、node_id 和 qrank 设置如下:
- - 节点 A:{"A", "B"},node_id = 0,qrank = 1 —— 节点 A 知道自己的地址以及节点 B 的存在,但不知道 C 和 D。
- - 节点 B:{"A", "B"},node_id = 1,qrank = 1 —— 节点 B 知道自己的地址以及节点 A 的存在,但不知道 C 和 D。
- - 节点 C:{"A", "B", "C"},node_id = 2,qrank = 0 —— 节点 C 知道节点 A 和 B 的存在。qrank = 0 表示断开 C 不会影响仲裁。
- - 节点 D:{"A", "B", "", "D"},node_id = 3,qrank = 0 —— 节点 D 知道节点 A 和 B 的存在,但不确定 C 是否连接。qrank = 0 表示断开 D 不会影响仲裁。
首先启动的是节点 A 和 B,它们互相知道对方的地址,因此可以相互通信。接下来,节点 C 加入集群并连接到 A 和 B,A 和 B 也会将 C 添加到它们的集群列表中。然后,节点 D 加入集群,它首先连接到已知的 A 和 B,并从 A 和 B 获取更新后的集群列表(此时已包含 C)。现在,D 也可以连接到 C。
mco_cluster_detach() API 用于将数据库从集群中分离。分离后,数据库立即可以进行读写访问。请注意,mco_cluster_detach() 会减少集群的总 qrank 值。例如,假设集群中有两个工作节点 A 和 B,它们的 qrank 都为 1。如果节点 B 发生故障,集群(即节点 A)将停止运行,因为此时没有足够的仲裁(活动节点的 qrank 总和必须大于所有节点 qrank 总和的一半,而 1 并不大于 2 / 2 = 1)。然而,如果节点 B 主动调用 mco_cluster_detach(),节点 A 可以继续作为集群运行,因为这是“自愿脱离”,而不是网络或节点故障。节点 B 的 qrank 将变为 0,仲裁仍然有效(1 > 1/2)。
要确定是否有“动态”节点(即脱离或加入集群的节点),您可以随时调用 mco_cluster_get_active_nodes() API。在调用此函数之前,请先调用 mco_cluster_info(db, &cl_info),以获取包括活动节点数量(cl_info.n_active_nodes)在内的信息。然后,根据这个参数分配适当的 nodes_info 数组空间,再调用 mco_cluster_get_active_nodes()。
限制复制
有时,将自动化复制限制在有限的一组类中是有益的。这是通过将某些类声明为分布式类来实现的。分布式类在模式中声明,这些类的对象不会自动分发到集群。相反,每个节点都会缓冲(保留一个内部列表)对其分布式类的所有插入、修改和删除操作。更新本地对象的事务不会触发任何与网络相关的活动。
应用程序会定期通过调用以下函数之一来触发分布式对象到集群的复制:
MCO_RET mco_cluster_scatter(mco_db_h db, uint2 *class_codes, uint2 n_class_codes,
uint2 *node_ids, uint2 n_node_ids);
MCO_RET mco_cluster_gather(mco_db_h db, uint2 *class_codes, uint2 n_class_codes,
uint2 *node_ids, uint2 n_node_ids);
应用程序可以将对分布式对象的更新推送到一个或多个集群节点,或者从一个或多个集群节点拉取更新到一组远程分布式对象。这两种操作都是原子性的,确保了数据的一致性。如果检测到冲突,整个推送或拉取操作都会回滚,以保证数据的完整性。mco_cluster_scatter() 函数会在上次成功复制数据之后推送对分布式表的更新。同样,mco_cluster_gather() 函数会在上次成功复制之后拉取对感兴趣的远程“分布式”对象的更新。
例如,考虑一个由三个节点 A、B 和 C 以及两个分布式类 X 和 Y 组成的场景:
- 节点 A 创建了对象 x1。
- 节点 B 创建了对象 x2。
- 节点 A 将 {X} 分散到 {B} [A 向 B 发送 x1]。
- 节点 B 从 {A, C} 收集 {X} [无需发送,因为 x1 已经在 B 上]。
- 节点 A 创建了对象 y1。
- 节点 A 将 {X, Y} 分散到 {B, C} [A 将 y1 发送到 B,并将 (x1, y1) 发送到 C]。
- 节点 C 从 {A, B} 收集 {X, Y} [C 从 B 获取 x2(x1 和 y1 已经在 C 上)]。
通过这种方式,集群(尤其是分布式类)允许在网络中添加硬件(独立节点)以提高整体吞吐量。如果所有数据都自动分布,由于网络相关的开销,可扩展性会受到限制。使用分布式特性,您可以分割数据并在本地快速处理这些数据段。然后,您可以通过 mco_cluster_scatter() 和 mco_cluster_gather() 函数自动共享本地处理的结果。
另一种限制被复制的数据的方法是在模式中使用本地声明。被声明为本地的类的对象永远不会被复制——也就是说,它们仅在声明它们的单个节点的本地数据库中维护,而不会分发到其他节点。