策略及实现
概述
可以生成用于恢复主数据库的事务日志,也可能不生成。函数 mco_translog_iterate()
负责读取要传递给外部数据存储的数据,并依次调用用户定义的回调函数。传递给此回调函数的数据是已记录事务内创建、删除或修改的对象。这与使用标准的函数 mco_trans_iterate()
遍历事务影响的对象类似,但 mco_trans_iterate()
仅遍历当前事务中的对象,因此提供了同步导出此数据的能力,而使用 mco_translog_iterate()
可以在任何时候读取日志文件并“导出”它。
写入进程:从启动事务日志记录并在数据库内部处理数据的进程。
读取进程:调用用户定义的回调函数将数据传输到外部数据存储的进程。
从写入进程向读取进程提供数据有两种方式。
- 第一种方法是创建一个日志文件。在日志写入器关闭该日志文件之后,可以像在普通 TL 使用中打开日志文件以恢复主数据库那样,打开并读取该日志文件以进行导出。
- 第二种方法是使用由事务日志提供的管道机制。在这种情况下,数据通过内存缓冲区进行传输,该缓冲区要么位于常规内存中,要么位于共享内存中。这种方法比基于文件的方法更快,但可能会造成竞争情况:
- 如果读取进程(调用函数 mco_translog_iterate() 的进程)读取数据的速度不够快,那么写入进程就会阻塞,导致管道缓冲区中没有可用内存。这会使写入进程一直阻塞,直到读取进程释放所需的管道缓冲区大小。可以通过在缓冲区溢出时用临时文件扩展主管道缓冲区来解决此问题;这样写入进程内部就不会出现阻塞情况。
通过设置标志 MCO_TRANSLOG_DUAL_OUT
,这两种方法可以结合起来。
请注意,在使用混合数据库的 TL 时,若关闭磁盘管理器日志记录( mco_db_open_dev()
设置 NO_LOG
日志类型),性能会有所提升。若数据库仅包含持久化类,那么使用 TL 的唯一原因就是将事务导出到外部系统,因为 SmartEDB 本身具有独立于事务日志记录可选模块的内置事务日志记录实现。
异步导出通过调用带有标志 MCO_TRANSLOG_ITERABLE
的函数 mco_translog_start()
来启动。这允许通过调用函数 mco_translog_iterate() 来读取并迭代日志。
无论读取器和写入器进程之间使用的是文件还是管道,标志 MCO_TRANSLOG_ITERABLE
都是正确启动中继进程所必需的。写入器和读取器可以是单个进程中的线程,也可以是在单台计算机上运行的不同进程(应用程序)。当使用基于文件的方法时,这些应用程序甚至可以在不同的计算机上——如果将已记录的事务应用到外部数据存储的过程在不同的计算机上运行,那么尽管基于文件的方法速度较慢,但这是唯一的选择。
普通日志文件传输给读取器的媒介
若要使用普通日志文件将事务导出到外部数据存储,必须通过函数 mco_translog_iterate()
打开该文件。要成为有效的日志文件,它必须在写入方通过 mco_translog_stop()
关闭,或者通过调用带有 MCO_TRANSLOG_RESTART
的 mco_translog_start()
间接关闭。
打开基于简单文件的日志
#define DISK_PAGE_SIZE 4096
#define LOG_FILE "/tmp/mylog"
mco_TL_start_data_t log_parms;
log_parms.flags = MCO_TRANSLOG_ITERABLE;
log_parms.disk_page_size = DISK_PAGE_SIZE;
mco_translog_start (db, LOG_FILE, &log_parms);
日志文件的读取与迭代
#define TEMP_BUFFER_SIZE (32*1024*1024)
#define LOG_FILE "/tmp/mylog"
MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
{
return MCO_S_OK;
}
...
mem_buff = malloc (TEMP_BUFFER_SIZE);
mco_translog_iterate (LOG_FILE, 0, iteration_proc, user_ctx, mydatabase_get_dictionary (),
mem_buff, TEMP_BUFFER_SIZE);
将日志分割成多个文件
为了减少从事务写入日志到传递给用户定义的迭代回调之间的延迟,并且为了减少日志文件占用的磁盘空间,可以限制单个日志文件的最大大小,并在达到该限制时切换到另一个日志文件进行记录。
实现此文件大小限制有两种方法。
- 第一种方法是通过使用标志
MCO_TRANSLOG_SIZE_CLBK
启动日志记录,指定最大日志文件大小以及用户定义的文件大小回调函数的地址。这实现了一种中断策略;当超过指定的日志文件大小时,将调用回调函数。 - 第二种方法是轮询策略。可以通过函数
mco_translog_get_info()
定期检查当前日志文件的大小。
这两种方法都是从写入进程检查日志文件的大小。
当需要将日志文件切换到另一个时,只需再次调用 mco_translog_start()
函数,但这次要加上额外的标志 MCO_TRANSLOG_RESTART
。这会使日志文件切换到另一个,无需调用 mco_translog_stop()
函数,也不必在此期间冻结事务活动,这对于多线程应用程序来说非常有利。
示例
写入多文件日志记录——方法 1:“中断”
#define DISK_PAGE_SIZE 4096
#define MAX_LOG_SIZE 128*1024*1024
#define LOG_FILE1 "/tmp/mylog1"
#define MARK_FILE1 "/tmp/mymark1"
#define LOG_FILE2 "/tmp/mylog2"
#define MARK_FILE2 "/tmp/mymark2"
volatile int size_exceeded = 0;
void warn_sz_proc (mco_size_t log_size)
{
size_exceeded = 1;
}
...
mco_TL_current_info_t log_info;
mco_TL_start_data_t log_parms;
log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_SIZE_CLBK;
log_parms.disk_page_size = DISK_PAGE_SIZE;
log_parms.max_size = MAX_LOG_SIZE;
log_parms.warn_sz_proc = warn_sz_proc;
mco_translog_start (db, LOG_FILE1, &log_parms);
/* Work with database. Current log is the file LOG_FILE1 */
while (!size_exceeded)
{
...
}
log_parms.flags |= MCO_TRANSLOG_RESTART;
mco_translog_start (db, LOG_FILE2, &log_parms);
create_file (MARK_FILE1);
/* Work with database further. Current log is the file LOG_FILE2 */
...
mco_translog_stop (db);
create_file (MARK_FILE2);
写入多文件日志记录——方法 1:“投票”
#define DISK_PAGE_SIZE 4096
#define MAX_LOG_SIZE 128*1024*1024
#define LOG_FILE1 "/tmp/mylog1"
#define MARK_FILE1 "/tmp/mymark1"
#define LOG_FILE2 "/tmp/mylog2"
#define MARK_FILE2 "/tmp/mymark2"
mco_TL_current_info_t log_info;
mco_TL_start_data_t log_parms;
log_parms.flags = MCO_TRANSLOG_ITERABLE;
log_parms.disk_page_size = DISK_PAGE_SIZE;
mco_translog_start (db, LOG_FILE1, &log_parms);
/* Work with database. Current log is the file LOG_FILE1 */
while (mco_translog_get_info (db, &log_info) == MCO_S_OK
&& log_info.log_size < MAX_LOG_SIZE)
{
...
}
log_parms.flags |= MCO_TRANSLOG_RESTART;
mco_translog_start (db, LOG_FILE2, &log_parms);
create_file (MARK_FILE1);
/* Work with database further. Current log is the file LOG_FILE2 */
...
mco_translog_stop (db);
create_file (MARK_FILE2);
读取和遍历多个日志文件
从多个日志文件中读取内容相当简单。应用程序只需通过依次调用函数 mco_translog_iterate()
来切换日志文件即可。
#define TEMP_BUFFER_SIZE (32*1024*1024)
#define LOG_FILE1 "/tmp/mylog1"
#define MARK_FILE1 "/tmp/mymark1"
#define LOG_FILE2 "/tmp/mylog2"
#define MARK_FILE2 "/tmp/mymark2"
MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
{
/* Note "trans" is always 0 in case of asynchronous export */
return MCO_S_OK;
}
...
mem_buff = malloc (TEMP_BUFFER_SIZE);
wait_while_no_file (MARK_FILE1);
mco_translog_iterate (LOG_FILE1, 0, iteration_proc, user_ctx, mydatabase_get_dictionary (),
mem_buff, TEMP_BUFFER_SIZE);
unlink (MARK_FILE1);
wait_while_no_file (MARK_FILE2);
mco_translog_iterate (LOG_FILE2, 0, iteration_proc, user_ctx, mydatabase_get_dictionary (),
mem_buff, TEMP_BUFFER_SIZE);
unlink (MARK_FILE2);
请注意,mco_translog_iterate()
函数只有在完成日志文件的读取、用户定义的回调函数返回值不是 MCO_S_OK
或发生其他内部错误时才会返回。另外,要读取下一个日志文件,必须反复调用 mco_translog_iterate()
函数。
使用管道传输数据
当采用管道模式作为将事务数据导出至外部数据库的传输方式时,会在共享内存或常规内存(取决于用作数据源的主 SmartEDB 数据库所使用的内存设置)中放置一个管道缓冲区。如果读取进程处理数据的速度快于写入进程将其放入管道的速度,那么这种模式显然是最快的。然而,如果读取进程比写入进程慢,管道缓冲区将会溢出,写入进程会在事务提交期间被阻塞,直到读取进程释放出足够的内存来存储事务。
可以通过使用可选的溢出文件来避免这种阻塞。此文件在函数 mco_translog_start()
和 mco_translog_iterate()
的 file_path
参数中指定。如果指定了文件,则在管道缓冲区溢出时使用该文件,因此写入进程永远不会被阻塞。
当然,使用此溢出文件的写入者和读取者的处理速度会慢得多。这是确定管道缓冲区大小时需要考虑的一个因素。另一个需要考虑的因素是,无论如何,管道模式都不会生成可用于数据库恢复的日志文件。
读取进程必须打开要导出部分或全部事务数据的数据库的数据库句柄(通过调用 mco_db_connect()
函数)。然后,必须将此数据库句柄传递给 mco_translog_iterate()
函数。这允许读取器访问管道。
管道缓冲区是 mco_device_t 类型的一个实例,在创建数据库时必须指定。它将被分配 MCO_MEMORY_ASSIGN_PIPE_BUF
,并且类型为 MCO_MEMORY_NAMED
或 MCO_MEMORY_CONV
,必须与主数据库设备使用的内存类型相同。
还可以指定多个管道缓冲设备。在这种情况下,事务将自动广播到所有管道实例,作为单独的管道缓冲区。这意味着调用函数 mco_translog_iterate()
或 mco_translog_play()
的读取器(进程或线程)的数量必须与管道设备的数量相等。检查 mco_TL_current_info 结构中的 pipe_readers_connected
字段以验证所有读取器是否已连接是有用的。由于所有管道设备都同时工作,因此建议为所有管道指定相同的大小。对于传递给 mco_translog_iterate()
的所有临时缓冲区也是如此。(有关多个管道的示例实现,请参阅示例 tlogitermultipipe。)
传递给 mco_translog_iterate()
的临时缓冲区应通过 malloc()
分配,前提是主数据库设备为常规内存类型。但如果主数据库设备使用共享(命名)内存,则传递给 mco_translog_iterate()
的临时缓冲区指针必须为 0。在这种情况下,mco_translog_iterate()
会自动在共享内存中创建临时缓冲区。
请注意,在使用共享内存时,区分 SmartEDB 核心的“偏移量”和“直接指针”性质是很重要的。如果使用“dptr”核心,则所有内存设备(结构体 mco_device_t 的对象)都必须指定某个唯一的“提示”字段,该字段实际上是请求的共享内存块的起始地址。关于函数 mco_translog_play()
和 mco_translog_iterate()
,参数(或结构体 mco_TL_play_params_h 中对应的字段)void *mem_ptr
实际上与内部使用的内存设备相关,该内存设备临时保存从日志流中读取的数据库对象。因此,此处的参数 mem_ptr
是该内存设备的相同“提示”。在“偏移量”核心的情况下,它应为零;在“dptr”核心的情况下,它应为有效的地址值。
预缓冲
在初始化管道时,应用程序可以在 data_flags 结构中指定 MCO_TRANSLOG_SYNC_INSTANTLY
标志。如果指定了该标志,则每个事务都会立即写入管道。否则,事务日志运行时会在其内部中间缓冲区(默认大小为 64K)中缓冲事务。当缓冲区满时,所有事务会一次性写入管道。在使用多个管道和双输出模式时(见下文),不允许使用 MCO_TRANSLOG_SYNC_COUNT
和 MCO_TRANSLOG_SYNC_TIMER
标志值。
用户定义的回调函数 iteration_proc
可通过返回不同于 MCO_S_OK
的值来报告错误。在这种情况下,mco_translog_iterate()
会立即返回与 iteration_proc
刚刚返回的相同的返回码。在写入器端,事务提交(mco_trans_commit()
或 mco_trans_commit_phase2()
)将返回错误码 MCO_E_TL_PIPE_TERM
。这会告知写入器进程读取器已中断其循环。在这种情况下,在执行任何进一步操作之前,应通过调用 mco_translog_stop()
来停止事务日志。
选项标记最后一个对象
运行时标志 MCO_RT_OPTION_MARK_LAST_OBJ
可以通过函数 mco_runtime_setoption()
进行设置,以指定需要“事务结束”的通知。日志文件或管道流中的事务按其原始顺序输出。默认情况下,在读取和遍历日志或管道时,没有简单的方法来了解传递给用户定义的迭代回调的对象是否为当前事务中的最新对象。为此,使用运行时选项 MCO_RT_OPTION_MARK_LAST_OBJ
使运行时能够通知用户定义的回调每个事务的结束。如果设置了该选项,则用户定义的迭代回调将额外被调用一次,传递一个零化的对象句柄,以表明当前事务已结束。
更多实现细节在示例 tlogiterpipe、tlogitermultipipe 和 tlogiterdualout 中有所展示。
打开基于管道的日志文件
#define LOG_PAGE_SIZE 128
#define PIPE_BUFFER_SIZE (16*1024*1024)
mco_device_t dev[];
mco_TL_start_data_t log_parms;
...
/* assign pipe buffer as mco device. It will be automatically
passed to a reader */
dev[x].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
dev[x].size = PIPE_BUFFER_SIZE;
dev[x].type = MCO_MEMORY_NAMED;
sprintf( dev[x].dev.named.name, "%s-pipe", databaseName_ );
dev[x].dev.named.flags = 0;
dev[x].dev.named.hint = 0;
...
mco_db_open_dev (DATABASE_NAME, mydatabase_get_dictionary(), dev, x, &db_params);
mco_db_connect (DATABASE_NAME, &db);
log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE;
log_parms.disk_page_size = LOG_PAGE_SIZE;
mco_translog_start (db, 0, &log_parms);
基于管道的日志的读取与迭代
#define TEMP_BUFFER_SIZE (32*1024*1024)
#define PIPE_BUFFER_SIZE (16*1024*1024)
MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
{
mco_trans_counter_t trans_no;
mco_trans_no (trans, &trans_no);
if (all_ok)
return MCO_S_OK;
else
return MCO_ERR_LAST + 1;
}
...
MCO_RET rc;
void *mem_buff;
mco_db_connect (mydatabase_name, &db);
/* if used memory management library is different from mcomconv */
#ifdef USE_SHARED_MEMORY
mem_buff = 0;
#else
mem_buff = malloc (TEMP_BUFFER_SIZE);
#endif
mco_translog_iterate (0, db, iteration_proc, user_ctx, mydatabase_get_dictionary (), mem_buff, TEMP_BUFFER_SIZE);
打开带有溢出文件的基于管道的日志文件
#define LOG_PAGE_SIZE 128
#define PIPE_BUFFER_SIZE (16*1024*1024)
#define PIPE_OVEFLOW_FILE "/tmp/overflow_file"
mco_device_t dev[];
mco_TL_start_data_t log_parms;
...
/* assign pipe buffer as mco device. It will be automatically
passed to a reader */
dev[x].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
dev[x].size = PIPE_BUFFER_SIZE;
dev[x].type = MCO_MEMORY_NAMED;
sprintf( dev[x].dev.named.name, "%s-pipe", databaseName_ );
dev[x].dev.named.flags = 0;
dev[x].dev.named.hint = 0;
...
mco_db_open_dev (DATABASE_NAME, mydatabase_get_dictionary(), dev, x, &db_params);
mco_db_connect (DATABASE_NAME, &db);
log_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE;
log_parms.disk_page_size = LOG_PAGE_SIZE;
mco_translog_start (db, PIPE_OVEFLOW_FILE, &log_parms);
基于管道的日志文件(含溢出文件)的读取与迭代
#define TEMP_BUFFER_SIZE (32*1024*1024)
#define PIPE_BUFFER_SIZE (16*1024*1024)
#define PIPE_OVEFLOW_FILE "/tmp/overflow_file"
MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
{
mco_trans_counter_t trans_no;
mco_trans_no (trans, &trans_no);
...
if (all_ok)
return MCO_S_OK;
else
return MCO_ERR_LAST + 1;
}
...
MCO_RET rc;
void *mem_buff;
mco_db_connect (mydatabase_name, &db);
/* if used memory management library is different from mcomconv */
#ifdef USE_SHARED_MEMORY
mem_buff = 0;
#else
mem_buff = malloc (TEMP_BUFFER_SIZE);
#endif
mco_translog_iterate (PIPE_OVEFLOW_FILE, db, iteration_proc, user_ctx, mydatabase_get_dictionary (), mem_buff, TEMP_BUFFER_SIZE);
字节序转换
可以使用事务日志记录将小端字节序和大端字节序架构混合使用。换句话说,应用程序可以在小端字节序系统上创建事务日志,并将其重放到大端字节序系统上,反之亦然。具备适当的自动检测和自动转换功能,无需进一步配置或用户干预。但请注意,不同字长平台(例如 x32 与 x64)的混合使用是不允许的。另外,请注意,写入方(日志创建者)和读取方应用程序必须使用相同的事务管理器(MURSIW 或 MVCC)。
双输出日志模式
SmartEDB 事务日志记录允许在使用管道模式作为事务数据导出的传输方式的同时,将事务日志的副本写入文件。此文件可用于例如恢复数据库或以日志形式将其存储在持久介质上。存储的日志文件随后可通过 API 函数 mco_translog_apply() 进行处理。或者,可通过函数 mco_translog_iterate()
和 mco_translog_play()
进行迭代。通过指定标志 MCO_TRANSLOG_DUALOUT
以及标志 MCO_TRANSLOG_PIPE
可启用双输出日志记录。结构 mco_TL_start_data
的字段 dual_log_path
指定目标日志文件。(示例 tlogiterdualout 演示了此功能。)
动态管道
此功能(在 SmartEDB 事务日志版本 6.5 及更高版本中)允许应用程序在运行时创建/删除管道,以及连接到和断开与它们的连接。数据库事件会记录到管道中,连接的应用程序能够根据需要处理这些日志;它们还可以随意停止处理这些日志,并移除相关的资源(管道和连接)。通过动态管道支持以下应用程序场景:
主进程创建一个共享内存数据库并启动事务日志记录。
在任何给定的时间,辅助进程都会连接到数据库,使用此连接创建一个管道,然后处理主应用程序写入该管道的日志。
实际上,辅助进程异步监听写入日志的数据库事件,并且能够访问触发这些事件的已修改数据库对象的内容。辅助进程可以随时断开与数据库的连接,并销毁其管道。
当然,如果主进程通过 mco_translog_stop()
停止事务日志记录,所有已连接的读取器都会收到相应的返回代码。主应用程序不会接收或处理来自辅助应用程序的任何通知,而只是像往常一样提交其事务,并在需要时记录事件。可以同时创建多个管道,并且多个读取器可以连接到这些管道。
实施
如果为
mco_translog_start()
设置了标志MCO_TRANSLOG_DYNAMIC_PIPE
,则读取器可以在任何时候连接和断开管道,而不会影响写入器(提交事务的线程)。每个读取器都必须创建自己的管道——两个或多个读取器不能连接到同一个管道。mco_translog_play_ex()
这个 API 与mco_translog_play()
类似,不同之处在于它仅接受一个 mco_TL_play_params_t 结构体作为参数。该结构体的字段对应于mco_translog_play()
的参数,另外多了一个字段:
mco_device_t *pipe_device;
pipe_device 是读者要使用的管道设备的描述符。如果 pipe_device
的值为 NULL
,则事务日志运行时会选择第一个没有任何连接读者的管道。当使用旧式的 mco_translog_play()
和 mco_translog_iterate()
API 时,也会发生同样的情况。
mco_translog_play_params_init()
函数将 mco_TL_play_params_t 结构体的所有成员清零。运行时选项
MCO_RT_MAX_DYNAMIC_PIPES
的默认值为 0。此值定义了可通过mco_db_extend_dev()
添加的 PIPE_BUF 类型设备的最大数量。此限制不包括通过mco_db_open_dev()
创建的管道设备。换句话说,如果应用程序将MCO_RT_MAX_DYNAMIC_PIPES
设置为 3,并在创建数据库时定义了 5 个管道设备,那么在任何时候它最多可以有 8 个管道设备。
在日志文件或管道的迭代过程中使用事件处理程序
如果数据库模式定义了事件,那么除了迭代对象本身之外,还可以在读取日志文件或管道期间注册并触发这些事件的处理程序。这种技术称为“持久事件队列”,可以作为当前进程或单独进程的异步事件处理方法。
要使用此功能,数据库模式必须定义一个或多个事件,应用程序必须为这些事件定义处理程序,并且应用程序中必须定义一个用于注册处理程序的函数。然后,使用函数 mco_translog_play() 而不是 mco_translog_iterate() 来迭代日志。(示例 tlogiterevents 演示了此功能。)
通过事件筛选日志中存储的数据
可以指定事件掩码来限制写入日志流并传输到日志文件或管道的数据。请注意,由于进行了筛选,此模式不允许将日志用于数据库恢复。但是,对于某些特定目的,它有助于减少传输到日志的数据量。例如,如果在读取器端使用事件处理程序,并且除了事件本身的通知之外,无需将所有数据传输到日志。
标志 MCO_TRANSLOG_EVENT_MASK
用于启用此功能,结构 mco_TL_start_data 的 event_mask
字段用于定义所需的掩码。有两个预定义的掩码值:MCO_LOG_MASK_ALL_CHANGES
会将所有数据存储在日志中,这相当于禁用事件掩码过滤;MCO_LOG_MASK_ALL_EVENTS
仅存储与数据库模式中定义的事件相关的数据。如果未使用这些预定义值,则掩码必须由数据库模式编译器生成的事件处理程序标识符组成。(请注意,事件处理程序标识符不是从零开始的,即它们是从 1 开始的自然数,并且必须减 1 才能在掩码中使用。例如,新、删除和删除所有事件可以在掩码中指定如下:
tl_parms.event_mask = (1 << (MCO_EVENT_newEvent - 1)) |
(1 << (MCO_EVENT_deleteEvent - 1)) |
(1 << (MCO_EVENT_deleteAllEvent - 1));
用户定义的迭代回调
考虑以下模式:
class Record_A
{
uint4 key;
};
class Record_B
{
uint4 key;
};
模式编译器生成的部分代码将是:
typedef struct Record_A_ { MCO_Hf h; } Record_A;
#define Record_A_code 1
typedef struct Record_B_ { MCO_Hf h; } Record_B;
#define Record_B_code 2
用户自定义的回调函数 iteration_proc
由 mco_translog_iterate()
对每个对象依次调用。此 iteration_proc
具有以下定义:
MCO_RET iteration_proc(
mco_trans_h trans,
MCO_Hf* obj,
int cid,
int obj_state,
void* user_ctx
)
trans
mco_trans_h
事务句柄。(通过使用此句柄调用 mco_trans_no()
函数可以获取事务编号。)可能为 0,表示在源数据库中调用了 mco_db_clean()
函数。有关详细信息,请参阅示例 tlogiterpipe。
obj
MCO_Hf*
在事务中实际创建、删除或更新的对象的对象句柄。此句柄可用于通过类似以下调用访问对象的字段:Record_A_key_get((Record_A*)obj, &key); Record_B_key_get((Record_B*)obj, &key)
;
如果事务状态指示为结束,则可能为 0。(有关详细信息,请参阅“选项标记最后一个对象”部分)。
cid
int
此对象的类标识符。其值与定义的 Record_A_code 或 Record_B_code 相同。在尝试访问其字段之前,将此代码与 Record_A_code 或 Record_B_code 进行比较,以确定存在的是哪种类型的对象。
obj_state
int
对对象执行的操作类型:MCO_TRANS_OBJ_ALL_DELETED - 删除此类的所有对象。MCO_TRANS_OBJ_DELETED - 对象已被删除。MCO_TRANS_OBJ_CREATED - 对象已创建。0 - 对象已更新。
user_ctx
void*
传递给函数 mco_trans_iterate()
的指向用户数据的指针。
如果 obj_state
值为 MCO_TRANS_OBJ_ALL_DELETED
,则 obj 是在事务中调用 classname_delete_all()
函数时数据库中该类的第一个对象。例如:Record_A_delete_all()
。如果在调用 classname_delete_all()
时数据库中不存在此类对象,则当前的迭代回调函数将不会被调用。因此,obj
参数不可能为空。
上述模式的用户自定义回调实现示例
MCO_RET iteration_proc (mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx)
{
mco_trans_counter_t trans_no;
if (trans == 0)
{
/* handle call of mco_db_clean called in source DB */
return MCO_S_OK;
}
mco_trans_no (trans, &trans_no);
if (cid == Record_A_code)
{
uint4 key;
if (obj_state == MCO_TRANS_OBJ_ALL_DELETED)
{
/* Delete all objects of class Record_A */
}
else if (obj_state == MCO_TRANS_OBJ_DELETED)
{
/* Delete current object by the key */
Record_A_key_get ((Record_A *)obj, &key);
}
else if (obj_state == MCO_TRANS_OBJ_CREATED)
{
/* Create new object with the key */
Record_A_key_get ((Record_A *)obj, &key);
}
else
{
/* Update object with the key */
Record_A_key_get ((Record_A *)obj, &key);
}
}
else if (cid == Record_B_code)
{
uint4 key;
if (obj_state == MCO_TRANS_OBJ_ALL_DELETED)
{
/* Delete all objects of class Record_B */
}
else if (obj_state == MCO_TRANS_OBJ_DELETED)
{
/* Delete current object by the key */
Record_B_key_get ((Record_B *)obj, &key);
}
else if (obj_state == MCO_TRANS_OBJ_CREATED)
{
/* Create new object with the key */
Record_B_key_get ((Record_B *)obj, &key);
}
else
{
/* Update object with the key */
Record_B_key_get ((Record_B *)obj, &key);
}
}
return MCO_S_OK;
}
用户定义的寄存器事件处理程序回调函数
考虑以下模式:
class Record
{
uint4 key;
event <new> newEvent;
};
模式编译器生成的部分代码将是:
#define MCO_EVENT_newEvent 1
typedef MCO_RET (*mco_newEvent_handler)( mco_trans_h t, Record * obj, MCO_EVENT_TYPE et, /*INOUT*/ void *param);
MCO_RET mco_register_newEvent_handler ( mco_trans_h t, mco_newEvent_handler handler, void * param );
MCO_RET mco_unregister_newEvent_handler ( mco_trans_h t, mco_newEvent_handler handler);
用户自定义的回调函数 register_callback()
会在 mco_translog_play()
开始对象迭代之前被调用。该 register_callback()
函数具有以下定义:
MCO_RET register_callback(
mco_trans_h trans, // 事务句柄。此句柄应传递给事件注册函数。
void* user_ctx // 传递给函数 mco_trans_play() 的参数 regevent_user_ctx 中的用户数据指针。
)
示例
MCO_RET register_callback(mco_trans_h t, void *param)
{
return mco_register_newEvent_handler(t, my_newEvent_handler, param);
}
MCO_RET my_newEvent_handler( mco_trans_h t, Record * obj, MCO_EVENT_TYPE et, /*INOUT*/ void *param)
{
ev_stat_t *s = (ev_stat_t *)param;
printf("my_newEvent_handler %d\n", ++(s->new_cnt));
return MCO_S_OK;
}
潜在系统故障的预防措施
如果在使用管道的多进程场景中启用了事务日志记录,并且其中一个进程崩溃(例如读取器),则写入器进程可能会因等待管道中使用的内部锁而被挂起。为解决这种情况,可以在事务日志记录进程旁边运行一个“嗅探器”任务;该嗅探器任务将清理由崩溃进程留下的死连接所遗留的锁。(有关嗅探器 API 的详细信息,请参阅 SmartEDB 用户指南中的“从失败进程恢复数据库”部分。)
无论使用何种事务日志记录(TL)方法(管道、多管道或动态管道),都建议采取以下步骤:
- 在主应用程序的单独线程中运行嗅探器:
#define SNIFFER_INTERVAL 100
MCO_RET sniffer_callback(mco_db_h db, void* context, mco_trans_counter_t trans_no)
{
SAMPLE_OS_TASK_ID pid = *(SAMPLE_OS_TASK_ID *)context;
if ( sample_os_task_id_check( pid ) == 0 )
{
return MCO_S_OK;
}
printf("Process %d is crashed\n", pid);
return MCO_S_DEAD_CONNECTION;
}
void sniffer_loop( sample_task_t * descriptor )
{
mco_db_h db;
SAMPLE_OS_TASK_ID pid = sample_os_task_id_get();
/* Connect using mco_db_connect_ctx() and pass &pid as parameter */
MCO_RET rc = mco_db_connect_ctx(db_name, &pid, &db);
if ( MCO_S_OK == rc )
{
/* Descriptor->stopped flag is set to 1 by sample_stop_task() in main thread*/
while ( MCO_S_OK == rc && descriptor->stopped == 0 )
{
rc = mco_db_sniffer(db, sniffer_callback, MCO_SNIFFER_INSPECT_ACTIVE_CONNECTIONS);
sample_sleep(SNIFFER_INTERVAL);
}
mco_db_disconnect(db);
}
}
请注意,嗅探器是在包含 sniffer_loop() 函数的任务中执行的。回调函数本身只是确保所有连接到数据库的任务都处于活动状态(例如,请参阅 samples/native/core/19-recovery/sniffer)。
2a) 对于管道情况,主程序将初始化 TL 处理并监视 TL 报告的任何错误。如果报告了任何错误,主应用程序将终止 TL 处理(这将释放 TL 运行时设置的所有锁),并在必要时重新启动 TL,以允许其他 TL 客户端继续处理管道,例如:
...
/* allocate pipe memory device */
dev[1].type = MCO_MEMORY_NAMED;
dev[1].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
sprintf( dev[1].dev.named.name, "%s-pipe", db_name );
dev[1].size = DATABASE_SIZE / 2;
dev[1].dev.named.flags = 0;
dev[1].dev.named.hint = 0;
...
/* Set up and run transaction logging (pipe is created here) */
tl_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE;
tl_parms.disk_page_size = PSTORAGE_PAGE_SIZE;
rc = mco_translog_start( connection, 0, &tl_parms );
/* Wait for TL-client connection */
printf("\n\n\tWaiting for log reader ...\n" );
while(1)
{
mco_TL_current_info_t tl_info;
mco_translog_get_info ( connection, &tl_info );
if (tl_info.pipe_readers_connected)
break;
else
sample_sleep (100);
}
...
while ( data processing )
{
while (1)
{
/* process database transaction */
if (rc == MCO_S_OK)
{
...
}
else if (rc == MCO_E_TL_IO_ERROR || rc == MCO_E_TL_PIPE_TERM )
{
/* TL processing error detected */
/* force logging to stop */
rc = mco_translog_terminate( connection );
/* wait for TL connection */
printf("\n\n\tWaiting for log reader ...\n" );
while(1)
{
mco_TL_current_info_t tl_info;
mco_translog_get_info ( connection, &tl_info );
if (tl_info.pipe_readers_connected)
break;
else
sample_sleep (100);
}
/* restart TL processing */
tl_parms.flags |= MCO_TRANSLOG_RESTART;
rc = mco_translog_start( connection, 0, &tl_parms );
sample_rc_check( "\tRe-start logging", rc );
break;
}
else
...
}
}
2b. 对于多管道的情况,处理方式与 TL 类似,只是需要创建多个管道设备:
...
for ( i=1; i<=PIPES_COUNT; i++ ) {
dev[i].type = MCO_MEMORY_NAMED;
dev[i].assignment = MCO_MEMORY_ASSIGN_PIPE_BUF;
sprintf( dev[i].dev.named.name, "%s-pipe-%u", db_name, i );
dev[i].size = DATABASE_SIZE / 2;
dev[i].dev.named.flags = 0;
dev[i].dev.named.hint = 0;
}
...
rc = mco_translog_start( connection, 0, &tl_parms );
sample_rc_check( "\tStart logging", rc );
/* Wait while iterator thread is alive */
printf("\n\n\tWaiting for log reader ...\n" );
while(1)
{
mco_TL_current_info_t tl_info;
mco_translog_get_info ( connection, &tl_info );
if (tl_info.pipe_readers_connected == PIPES_COUNT)
break;
else
sample_sleep (100);
}
...
while ( data processing )
{
while (1)
{
/* process database transaction */
if (rc == MCO_S_OK)
{
...
}
else if (rc == MCO_E_TL_IO_ERROR || rc == MCO_E_TL_PIPE_TERM )
{
/* TL processing error detected */
/* force logging to stop */
rc = mco_translog_terminate( connection );
/* wait for TL connection */
printf("\n\n\tWaiting for log reader ...\n" );
while(1)
{
mco_TL_current_info_t tl_info;
mco_translog_get_info ( connection, &tl_info );
if (tl_info.pipe_readers_connected == PIPES_COUNT)
break;
else
sample_sleep (100);
}
/* restart TL processing */
tl_parms.flags |= MCO_TRANSLOG_RESTART;
rc = mco_translog_start( connection, 0, &tl_parms );
sample_rc_check( "\tRe-start logging", rc );
break;
} else
...
}
}
2c. 动态管道的情况很简单,例如:
...
/* Set maximum number of dynamic pipe segments */
mco_runtime_setoption(MCO_RT_MAX_DYNAMIC_PIPES, PIPES_COUNT);
...
/* Set default database parameters */
mco_db_params_init ( &db_params );
/* Customize the params according to the application */
db_params.mem_page_size = MEMORY_PAGE_SIZE;
db_params.disk_page_size = 0; /* Pure in-memory database */
db_params.db_max_connections = 10 + PIPES_COUNT;
db_params.connection_context_size = sizeof(SAMPLE_OS_TASK_ID);
...
/* Set up and run transaction logging (pipe is created here) */
tl_parms.flags = MCO_TRANSLOG_ITERABLE | MCO_TRANSLOG_PIPE |
MCO_TRANSLOG_DYNAMIC_PIPE | MCO_TRANSLOG_SYNC_INSTANTLY;
tl_parms.disk_page_size = PSTORAGE_PAGE_SIZE;
rc = mco_translog_start( connection, 0, &tl_parms );
sample_rc_check( "\tStart logging", rc );
/* Wait while iterator thread is alive */
printf("\n\n\tWaiting for log reader ...\n" );
while(1)
{
mco_TL_current_info_t tl_info;
mco_translog_get_info ( connection, &tl_info );
if (tl_info.pipe_readers_connected)
break;
else
sample_sleep (100);
}
...
可能的数据中继死锁
存在一些可能导致数据中继死锁的情况。例如:
当存在两个(或更多)数据库连接时,第一个连接可能会向数据库中大量写入新记录,而第二个连接则用于启动事务日志读取器(例如调用 mco_translog_play() 等)。然后,在用户定义的读取器回调函数内部,可能会有一些数据处理操作,导致从这个第二个连接向同一个数据库写入数据。在高负载情况下,用户定义的读取器回调函数(在第二个连接上)中的写入事务可能无法完成并被锁定,因为管道中没有空间写入数据。但与此同时,管道也无法被读取(以释放一些空间),因为读取过程是在与提交操作已被阻塞的同一个线程中执行的。
在高负载情况下,如果没有管道溢出文件,当响应迭代器读取而提交对象的线程由于管道已满而无法执行提交操作时,它会阻塞等待读取器释放管道中的空间。但读取器线程由于自身被阻塞,等待第一个线程写入并释放同样有限且已满的队列,因而无法做到这一点。实际上,如果管道已满且读取线程因自身处于挂起的提交操作中而无法读取数据,那么任何试图提交事务的线程(连接)都会被阻塞。这是闭环系统(提交 - 日志 - 管道 - 读取器分析器 - 再次提交)中一种不明显但自然的行为。试图将管道读取过程与新数据的提交(例如,提交 - 日志 - 管道 - 读取器分析器 - 某些数据队列 - 另一个应用程序线程和数据库连接 - 再次提交)分离可能会导致死锁。基本上,如果应用程序将从事务日志管道读取的数据的结果提交到数据库,在高负载情况下可能会出现死锁。
这可在应用程序级别通过人为限制初始提交来加以缓和。实际上,最初设计数据中继功能是为了将数据从 SmartEDB 转移到另一个数据库管理系统或某些数据处理器。所以,当时并未假定处理过的数据会再提交回 SmartEDB。幸运的是,管道溢出文件解决了“数据回环”这一问题。重要的是,避免死锁的唯一可靠方法是使用管道溢出文件(这确实会对整体性能造成一定影响)。
MCO_E_TL_PIPE_TERM 错误条件的可能情况
当调用 mco_translog_iterate() 的读取器以及执行事务提交操作的写入器返回错误代码 MCO_E_TL_PIPE_TERM 时,可能会出现两种情况。这两种场景可能如下所示:
案例 A:读取器和写入器中的 MCO_E_TL_PIPE_TERM1. 调用 mco_translog_start() 函数以开始记录日志。
在迭代线程中调用 mco_translog_iterate() 函数以开始读取。
用户定义的迭代过程返回一个错误代码(除 MCO_S_OK 以外的任何值)以终止函数 mco_translog_iterate() 。
当函数 mco_translog_iterate() 以迭代过程指定的代码返回时,管道即被终止。
当作者尝试提交事务时,函数 mco_trans_commit() 将返回 MCO_E_TL_PIPE_TERM,因为管道已终止。
在未先调用函数 mco_translog_stop() 的情况下再次启动 mco_translog_iterate() 。
现在,mco_translog_iterate() 函数返回 MCO_E_TL_PIPE_TERM,因为管道已终止。
案例 B:在调用 mco_translog_terminate() 之后,读取器和写入器中的 MCO_E_TL_PIPE_TERM 状态
1 调用 mco_translog_start() 开始日志记录
2 在迭代线程中调用 mco_translog_iterate() 函数以开始读取
在写入线程(或主实例)中调用 mco_translog_terminate() 函数,管道即被终止。
现在,函数 mco_translog_iterate() 返回 MCO_E_TL_PIPE_TERM 。
尝试提交事务导致函数 mco_trans_commit() 返回 MCO_E_TL_PIPE_TERM,因为管道已终止。
请注意,如果用户定义的迭代回调始终返回 MCO_S_OK,则情况 A 不可能出现;如果在调用 mco_trans_commit() 时返回错误代码 MCO_E_TL_IO_ERROR 时调用函数 mco_translog_terminate(),则情况 B 是可能的。