实现细节
同步复制与异步复制
SmartEDB 高可用性支持主节点与副本之间的同步和异步复制。同步复制需要副本确认每个事务,因此事务处理率主要受网络往返延迟(RTT)影响。对于某些应用,异步复制可能更合适。
异步复制使用缓冲区传递事务数据;主节点将事务数据放入缓冲区,由单独线程发送到副本(如 C API mco_HA_async_send_data_to_replicas())。在异步模式下,副本不确认事务,因此复制速度更快,仅依赖带宽而非网络延迟。但副本会滞后于主节点,滞后时间取决于缓冲区大小。
事务窗口
同步复制可设置事务窗口(如通过 C API 的 mco_HA_set_trans_window_size() 和 mco_HA_commit_window())。事务窗口每 N 个事务发送一次确认,减少了 RTT 影响,但副本最多可能滞后 N 个事务。注意,事务窗口对异步复制无效;异步缓冲区由应用程序分配,其大小用于优化吞吐量。
混合同步和异步复制
单个主应用程序可以与一个或多个副本实现同步复制,与其他副本实现异步复制。
当标志 MCO_HAMODE_FORCE_SYNC 被设置时,它会为给定的副本开启同步复制。通常该标志的设置方式如下:
mco_HA_replica_params_t replica_p;
...
mco_HA_replica_params_init(&replica_p);
...
replica_p.mode_flags = MCO_HAMODE_REPLICA_NOTIFICATION | MCO_HAMODE_FORCE_SYNC;
请注意,如果主服务器以同步模式运行,此设置将不起作用。(有关实现细节,请参阅示例 haasync 组件 rplsync。)
使用持久化数据库的复制
当副本需要将数据存储在持久化数据库中时,需考虑性能问题。同步复制会导致副本上的写入操作阻塞主节点。为优化性能,应根据应用需求调整副本的事务提交策略和超时参数。(有关详细信息,请参阅“持久化数据库 I/O”页面。)
使用 MCO_COMMIT_DELAYED 策略时,mco_trans_commit() 不会立即将事务写入磁盘,而是当以下任一阈值达到时才写入:
- log_params.delayed_commit_threshold:未提交数据大小(未写入磁盘)
- log_params.max_delayed_transactions:未写入事务数量
- log_params.max_commit_delay:提交与写入磁盘的最大延迟(毫秒)
db_params.log_params.delayed_commit_threshold = 64*1024;
db_params.log_params.max_delayed_transactions = 10;
db_params.log_params.max_commit_delay = 1000;
例如,设置为每 1 秒、每 10 笔交易或每 64K 更改的数据,以先到者为准。这些阈值仅在 mco_trans_commit() 调用时检查。因此,即使设置了 max_commit_delay 为 1 秒,执行小事务后,数据也不会立即写入磁盘,直到下一次 mco_trans_commit() 调用。
要确保每次事务提交时都将数据写入磁盘,可以使用 MCO_COMMIT_SYNC_FLUSH 策略。这是最持久的提交策略,但会导致 mco_trans_commit() 在主节点上等待数据写入副本的持久介质,可能非常慢。建议在这种情况下使用长事务(如在一个事务中插入 1000 个对象)。如果主数据库位于持久介质上,应用程序可以调用 mco_disk_flush() 刷新已提交的更改,但目前无法强制刷新副本数据库。
异步复制结合 MCO_COMMIT_SYNC_FLUSH 可避免阻塞主节点,但数据不会立即发送到副本,而是放入异步缓冲区并在 mco_trans_commit() 完成后发送。如果副本提交时间较长且主服务器的异步缓冲区较大,主服务器崩溃时这些数据可能会丢失。
多种通信通道
高可用性应用程序可以在多个通道上进行通信。函数 mco_HA_attach_master() 和 mco_HA_attach_replica() 根据连接字符串的内容选择通道实现。连接字符串传递给第一个注册的通道。如果通道实现识别该字符串(即字符串具有该通道实现的正确格式),则会使用该字符串。否则,字符串将传递给下一个实现,依此类推。每个通道都有其自己的唯一前缀,用于标识该通道的字符串(标准通道具有 tcp、udp 和 pipe 前缀)。当连接字符串中存在前缀时,字符串解析将暂停,并使用相应的通道。
例如:
:tcp:20000 - 主节点端,TCP 通道,监听端口 - 2000
:udp:127.0.0.1:30000 - 副本或主节点,UDP 通道,主机 - 127.0.0.1,端口 - 3000
每个通信通道都必须在主节点和副本节点中进行注册,这需要在调用 mco_HA_start() 之后以及调用任何其他高可用性(HA)函数之前,通过调用 mco_HA_channel_implementation_add() 来完成。例如:
mco_HA_start();
…
mco_HA_channel_implementation_add( mco_nw_tcpip_vt() );
mco_HA_channel_implementation_add( mco_nw_udpip_vt() );
mco_HA_channel_implementation_add( mco_nw_pipe_vt() );
并且主程序必须为每个已注册的通道实现创建监听线程(调用 mco_HA_attach_replica() 的线程)。这些监听线程仅在传递给 mco_HA_attach_replica() 的连接字符串上有所不同。 (有关实现细节,请参阅示例 hamultichan。)
副本接管为主节点
在大多数关键任务的高可用性(HA)应用中,副本应当能够在主节点出现故障时接管。在这种情况下,主节点和副本将是同一应用的副本,它们都清楚自己是以主节点还是副本的身份运行,副本在必要时能够切换角色。
这类应用程序的关键特性在于实现代码以检测主节点是否在运行,如果在运行则连接到它并以副本模式运行。否则,主节点由于某种原因已失效,应用程序必须接管成为主节点。
以下示例代码说明了一个 C/C++ 应用程序可能如何实现从副本到主节点的切换:
for (i = 0; i < max_retries; ++i)
{
printf("Try to attach master...");
master_mode = 0;
ReplicaParams.mode_flags = MCO_HAMODE_REPLICA_NOTIFICATION;
rc = mco_HA_attach_master(&attach_p.db, &ReplicaParams);
if (rc != MCO_S_OK)
{
printf("attempt #%d failed (rc=%d)\n", i + 1, rc);
sample_sleep(500);
}
else
{
break;
}
}
/* switch to MASTER mode */
/******* setup HA instance *********/
master_mode = 1;
stop_flag = exit_flag;
init_db = (attach_p.db == 0);
if (init_db) /* attach_master not detect "old" master */
{
/* create the master database */
printf("Create database for the first time\n");
rc = sample_open_database( db_name, switchdb_get_dictionary(),
DATABASE_SIZE, CACHE_SIZE,
MEMORY_PAGE_SIZE, PSTORAGE_PAGE_SIZE, 5,
&attach_p.memory);
if (rc != MCO_S_OK)
{
printf("Can't open database, error %d\n", rc);
return 1;
}
/* connect to the database, obtain a database handle */
rc = mco_db_connect(db_name, &attach_p.db);
if (rc != MCO_S_OK)
{
printf("Can't connect to database, error %d\n", rc);
return 1;
}
}
MasterParams.mode_flags = MCO_MASTER_MODE;
mco_HA_set_master_params(attach_p.db, &MasterParams); // set MASTER mode
sample_start_connected_task(&listen_task, ListenToReplicas, db_name,
&ha ); /* start the ListenToReplicas thread*/
sample_start_connected_task(&master_task, Master, db_name, (void*)
init_db); /* start the Master thread*/
sample_join_task (&listen_task);
sample_join_task (&master_task);
/****************** master clean up *****************/
mco_HA_stop(attach_p.db); /* detach all replicas */
}
同样,以下示例代码说明了一个 Java 应用程序可能如何实现从副本到主节点的切换:
HASwitch() throws Exception
{
…
// prepare replica parameters
ReplicaConnection rpl_con = new ReplicaConnection(db);
ReplicaConnection.Parameters replParams = new
ReplicaConnection.Parameters();
// attach to master. Analog of mco_nw_attach_master
try
{
System.out.println("Try to connect master...");
if (!rpl_con.attachMaster("localhost:" + PORT, replParams,
CONNECT_TIMEOUT))
{
System.err.println("Failed to connect to master : timeout");
}
}
catch (DatabaseError de)
{
System.err.println("Failed to connect to master : error " +
de.errorCode);
}
//stop & wait working thread
running = false;
inspectThread.join();
System.out.println("Replica is terminated, switch to MASTER mode");
rpl_con.disconnect();
/************* Master mode **************/
MasterConnection mst_con = new MasterConnection(db);
MasterConnection.Parameters mst_params = new
MasterConnection.Parameters( MasterConnection.MCO_HAMODE_ASYNCH );
mst_params.maxReplicas = MAX_REPLICAS;
mst_params.asyncBuf = new
Database.PrivateMemoryDevice( Database.Device.Kind.AsyncBuffer,
ASUNC_BUF_SIZE );
mst_con.setReplicationMode(mst_params);
listening = true;
// start listen and async. commit threads
Thread listenThread = new Thread( new Runnable() {
public void run() {
listen(); } } );
Thread replicateThread = new Thread( new Runnable() {
public void run() {
replicate(); } });
listenThread.start();
replicateThread.start();
//////////////////////////////////////////////////////
// Insert some data in the database if needed
//////////////////////////////////////////////////////
mst_con.startTransaction(Database.TransactionType.ReadWrite);
…
mst_con.commitTransaction();
// stop & wait threads
listening = false;
listenThread.join();
mst_con.stopReplication();
replicateThread.join();
mst_con.disconnect();
db.close();
System.out.println("Master completes it work");
}
分布式数据库的高可用性
复制构成了分片式 SmartEDB 数据库高可用性的基础。通过 SmartESQL SQL 由分布式 SQL 引擎支持分片。每个分片由一个高可用性主节点和一定数量的高可用性副本组成。(有关更多详细信息,请参阅 SmartESQL 用户指南。)
部分复制
对于高可用性(HA)应用程序而言,可能需要复制数据库中的一部分对象而非全部对象。为此,在 C/C++ 应用程序模式中,关键字 local 用于指示哪些类受部分复制的影响。主数据库中的本地类对象不会复制到副本节点;并且从主数据库接收到的副本数据库中的本地类对象的内容也不会写入数据库。主数据库和副本数据库上的本地类可以不同,并且无需启用二进制演化来强制执行部分复制。
C/C++ APIs
函数 mco_HA_enable_filter(mco_db_h db, mco_bool enabled) 可在运行时开启和关闭过滤功能。应在调用 mco_db_connect() 之后,但在调用 mco_HA_attach_replica() 或 mco_HA_attach_master() 之前调用此函数。如果定义了本地表,则默认开启过滤功能。因此,mco_HA_enable_filter(db, MCO_NO) 会关闭过滤功能——实际上会忽略模式中的任何本地类定义。如果模式未定义本地表,则此函数不会产生任何效果。 (有关部分复制的 C/C++ 示例,请参阅 samples/native/ha/hafilter。)
Java and C#
使用 C# 和 Java API 时,通过属性 local 来指定本地类,并且在运行时可以通过调用 MasterConnection 或 ReplicaConnection 的 enableFilter() 方法来开启/关闭部分复制。
MasterConnection::enableFilter(boolean enabled);
ReplicaConnection::enableFilter(boolean enabled);
Python
Python 包装器支持部分复制,方法是在模式定义中将类声明为本地类,就像 C/C++ 应用程序那样,然后调用 load_dictionary() 方法。例如:
schema = '''
#define uint4 unsigned<4>
#define uint4 unsigned<4>
declare database filtermstdb;
declare auto_oid [2000];
class T1
{
uint4 key;
unique tree<key> tkey;
};
class T2
{
uint4 key;
unique tree<key> tkey;
};
class T3
{
uint4 key;
unique tree<key> tkey;
};
local class T4
{
uint4 key;
unique tree<key> tkey;
};
'''
dict = exdb.load_dictionary(schema)
此外,与 Java 和 C# API 一样,在运行时可以通过调用 MasterConnection 或 ReplicaConnection 的 enableFilter() 方法来开启/关闭部分复制。
MasterConnection::enableFilter(boolean enabled);
ReplicaConnection::enableFilter(boolean enabled);
设置法定人数
在某些情况下,主节点仅在达到一定数量的副本(法定人数)连接时才对数据库执行更新操作。如果活动副本的数量少于法定人数,mco_trans_start() 或 mco_trans_commit() 将返回错误代码 MCO_E_HA_NO_QUORUM。此最小副本数量可通过 mco_HA_master_params::quorum 参数进行设置(默认值为 0)。
在某些情况下(例如,在提交期间网络中断),无法确定最后一个事务是否已被副本接收。在这种情况下,commit() 会将更改应用到主数据库,并返回状态码 MCO_S_HA_REPLICA_DETACH。此代码意味着,如果应用程序切换到副本,此事务可能会丢失。(请注意,只有当 mco_HA_master_params::quorum 大于 0 时,才可能出现 MCO_S_HA_REPLICA_DETACH。)
主参数 mco_HA_master_params::quorum 设置高可用性(HA)仲裁的初始值,而 mco_HA_set_quorum() C API 或其等效的 Java setQuorum()、C# SetQuorum() 或 Python setQuorum() API 用于在运行时更改 HA 仲裁。默认的仲裁值为 0,这意味着任何数量的活动副本都是可接受的(包括 0 个)。
请注意,只有在同步复制模式(即未设置 MCO_HAMODE_ASYNCH 标志)且事务窗口大小为 1(请参阅 mco_HA_set_trans_window_size())的情况下,才允许设置大于 0 的法定人数值。
扩展数据库内存
扩展 SmartEDB 数据库的内存大小是通过调用核心 API 函数 mco_db_extend() 来实现的。然而,由于此调用会引发内部写入事务,因此不能从副本中调用。所以扩展数据库内存的步骤是:首先从主节点调用 mco_db_extend(),然后在通知回调函数中,副本将响应 MCO_REPL_NOTIFY_MASTER_DB_EXTENDED 通知代码,通过调用 mco_db_extend_dev() 来进行处理。以下代码片段展示了在副本中如何实现这样的通知回调函数:
/* replica notification callback function */
void replica_notifying( uint2 notification_code, /* notification code */
uint4 param1, /* reserved for special cases */
void* param2, /* reserved for special cases */
void* context) /* pointer to the user-defined context */
{
char *context_str = (char*)context; /* get context */
switch (notification_code)
{
case MCO_REPL_NOTIFY_CONNECTED:
printf("\n** Notification ** Replica's been connected, “
“context = %s\n", context_str);
break;
case MCO_REPL_NOTIFY_DB_LOAD_OK:
printf("\n** Notification ** Database's been loaded “
“successfully, context = %s\n", context_str);
break;
case MCO_REPL_NOTIFY_MASTER_DB_EXTENDED:
{
MCO_RET rc;
/* Get the device size passed in param2 */
mco_size_t size = *((mco_size_t*) param2);
printf("\n** Notification ** Master's database was extended, “
“extend size %d bytes, context = %s\n",
(int)size, context_str);
extend_dev.type = MCO_MEMORY_CONV;
extend_dev.assignment = MCO_MEMORY_ASSIGN_DATABASE; extend_dev.size = size;
/* allocate memory and set device pointer */
extend_dev.dev.conv.ptr = (void*)malloc( extend_dev.size );
if (extend_dev.dev.conv.ptr)
{
rc = mco_db_extend_dev(db_name, &extend_dev);
printf("\nmco_db_extend_dev(), size %d : %s\n",
DATABASE_SIZE, (rc == MCO_S_OK) ? "OK" : "FAILED" );
}
break;
}
case MCO_REPL_NOTIFY_REPLICA_STOPPED:
{
const char* reason = "";
if (param1 == MCO_HA_REPLICA_MASTER_REQUESTED_DISCONNECT) {
reason = "MCO_HA_REPLICA_MASTER_REQUESTED_DISCONNECT";
}
printf("\n** Notification ** Replica stopped with the reason: “
“%d (%s), context = %s\n", param1, reason, context_str);
break;
}
default:
printf("\n** Notification ** Replica's been notified code = “
“%d, param1 = %u, param2 = %p, context = %s\n",
notification_code, param1, param2, context_str);
break;
}
}
请注意,设备的大小是通过第三个参数(param2)以指向 mco_size_t 的指针形式传递到通知回调函数中的,并且副本通过调用 mco_db_extend_dev() 来扩展其存储大小。
高可用性(HA)序列器 API
有时可能需要确定副本数据库中哪一个是最相关的。假设我们有一个配置,包含一个主节点和两个副本 R1 和 R2。然后在某个时刻 T1,R1 崩溃。稍后在时刻 T2(大于 T1),主节点和 R2 由于电源故障停止处理。在 R1 和 R2 应用程序冷启动后,必须确定哪个节点将成为新的主节点。为了解决这个问题,可以调用 mco_HA_get_sequencer() API 来返回“数据库版本号”(或从高可用性的角度来看“当前性”)。此数字可用于确定哪个数据库是最新的。
关于设置 detach_timeout 的说明
当主节点分离副本(显式分离或在 mco_HA_stop() 内部)时,它会发送 DETACH 消息以表明复制结束。detach_timeout 用于此 send() 操作。发送消息后(无论结果如何),主节点都会关闭与副本的通道。更改超时时间不会影响副本断开连接的速度;它会影响 mco_HA_detach() 调用内部的最大时间。将 detach_timeout 设置为 0 可能会导致 DETACH 消息根本不会发送,因此副本将不会收到分离通知。因此副本会遇到通道意外关闭的情况(并返回错误代码 MCO_E_NW_RECVERR)。所以不建议将 detach_timeout 设置为 0。