事件管理
正如介绍页面所述,SmartEDB 提供了通过生存时间(TTL)特性自动删除过时数据库对象的能力,并且能够管理数据库对象的新增、删除、更新和检查点事件。
生存时间机制
生存时间机制(TTL)支持根据 TTL 策略自动删除对象。支持两种 TTL 策略:
- TTL_count:设置对象数量阈值;
- TTL_clock_time:设置对象生存长度阈值。
这两种策略可以同时为单个类以秒(sec)、毫秒(ms)和微秒(us)为单位。语法为:
TTL_count <max_object_count>;
TTL_clock_time <max_object_age> <sec | ms | us>;
例如,下面的模式定义为类 A 设置TTL策略:
class A
{
unsigned<4> id;
char<256> name;
tree<id> idx_id;
TTL_count 10;
TTL_clock_time 5 sec;
};
在此模式下,当创建的 A 类对象数量超过10个时,系统将删除超出限制的对象(即之前创建的对象)。同时,任何存储时间超过5秒的对象也将被自动清除。
为了实现TTL(Time-to-Live)机制,系统引入了两个隐藏字段:ttl_count@
和ttl_timestamp@
,以及相应的索引:ttl_count_idx@
和ttl_timestamp_idx@
。这些字段和索引仅在为特定类配置了TTL策略时才会添加。每当创建新对象时,会初始化这些字段的值。
TTL策略的检查与执行仅在READ_WRITE
事务提交的第一阶段前进行,这意味着在当前事务中新创建的对象由于其索引尚未更新,因此不会受到TTL策略的影响。为了解决这一问题,可以在事务提交前设置一个检查点。
分布式环境中的副作用
在使用SmartEDB集群实现分布式数据库时,若采用TTL_clock_time策略,则必须确保各参与节点的时钟精确同步。
请注意以下关键点:
- 在集群环境中,TTL时钟仅在事务发起方于提交阶段1开始时进行验证。远程节点不验证时钟时间,无论其本地时钟状态如何,均会应用该事务。一旦事务在远程节点成功应用,将向发起方发送确认通知,以完成事务提交。尽管各节点上的时钟可能存在差异,但只要事务成功提交,所有节点的数据内容将保持一致。
- 某节点可能会存在违反其TTL要求的数据。例如,在上述场景中,远程节点的时钟可能显著快于发起事务的本地节点的时钟。根据远程节点的时钟,某些记录应已被删除,但实际上仍保留在该节点的数据库中。
- 即使某节点上的TTL未过期,对象也可能因在其他节点上已过期而被删除。例如,假设节点1的时钟设置为下午1点,而节点2的时钟设置为下午2点。当事务从节点1发起并成功提交到两个节点后,尽管该记录在节点2上的TTL已过期,但在事务提交后不久,节点2发起另一事务,并根据其自身时钟验证TTL条件后,删除了刚刚插入的对象,并将此删除操作传播至整个集群。因此,该记录的实际生存期可能比预期要短。
事件接口
SmartEDB 应用程序能够对数据事件(如创建、更新或删除数据库对象)做出响应。对于 C 应用程序,DDL 事件声明会导致应用程序接收以下数据库事件类型的通知:添加新对象、删除对象或某类的所有对象、检查点事件以及更新对象或对象的指定字段。事件特定于类。换句话说,当向数据库中添加 B 类的对象时,A 类的新事件处理程序不会收到通知。模式中的事件声明定义了哪些事件将触发应用程序通知。应用程序如何处理这些事件由事件处理程序在运行时决定。在 C 应用程序中,事件可以同步处理或异步处理。
异步事件处理
在异步事件处理中,应用程序会生成一个单独的线程来处理每种类型的事件。
- 此事件线程调用函数
mco_async_event_wait()
来等待指定事件。 - 当事件发生时,运行时会释放该线程。
- 释放线程后,运行时继续正常处理,处理程序线程与其他线程并行运行,直到完成处理并再次调用
mco_async_event_wait()
。 - 在事件处理程序完成任务并再次调用
mco_async_event_wait()
等待事件之前(事件未排队),可能会有一个很小的窗口,在此期间另一个事件实例可能会发生。 - 如果处理程序将事件的处理委托给另一个线程,允许处理程序线程立即再次等待事件,则可以将此窗口最小化。
- 如果不接受未处理事件风险,可以使用同步事件处理程序,应用程序可以维护一个单独的未处理事件表,或者可以使用 SmartEDB 事务日志的
mco_translog_play()
函数。
异步事件在事务提交后激活。如果在单个事务的范围内,添加、删除了多个对象,或者更新了多个具有等待事件处理程序的字段,则所有处理程序将同时激活。C 应用程序将在如下代码中定义异步事件处理程序:处理 <new>
事件的线程函数。
/* 处理 <new> 事件的线程函数。 */
void NewEventHandler( sample_task_t * descriptor )
{
mco_db_h db = descriptor->db_connection;
while ( MCO_S_OK == mco_async_event_wait(db, MCO_EVENT_newEvent) )
{
/* 处理事件 */
...
}
}
/* 处理 <update> 事件的线程函数。 */
void UpdateEventHandler( sample_task_t * descriptor )
{
mco_db_h db = descriptor->db_connection;
while ( MCO_S_OK == mco_async_event_wait(db, MCO_EVENT_updateEvent) )
{
/* 处理事件 */
...
}
}
/* 处理 <delete> 事件的线程函数。 */
void DeleteEventHandler( sample_task_t * descriptor )
{
mco_db_h db = descriptor->db_connection;
while ( MCO_S_OK == mco_async_event_wait(db, MCO_EVENT_deleteEvent) )
{
/* 处理事件 */
...
}
}
/* 处理 <update> 事件的线程函数。 */
void CheckpointEventHandler( sample_task_t * descriptor )
{
mco_db_h db = descriptor->db_connection;
while ( MCO_S_OK == mco_async_event_wait(db, MCO_EVENT_checkpointEvent) )
{
/* 处理事件 */
...
}
}
然后,C 应用程序将启动事件处理线程,并使主应用程序线程休眠若干毫秒(通常 100 毫秒就足够了),以便事件处理线程开始监听(等待)。接着,主应用程序线程将继续进行正常的数据库处理。在终止时,应用程序将调用 mco_async_event_release_all()
来释放所有事件,然后停止事件处理线程。
有关实现细节,请参阅 SDK 示例 samples/core/10_events_asynch。
同步事件处理
同步事件处理机制使C应用程序能即时响应事件,避免长时间延迟以防数据库访问被阻塞。
处理程序在同一线程中执行,控制READ_WRITE
事务,返回MCO_S_OK
表示成功。返回mco.h
中定义的MCO_S
或MCO_E
常量或用户自定义值用于指示成功或失败,并提供额外信息。运行时会将此返回值传递给应用程序以根据需要采取相应措施(例如必要时回滚事务)。
同步事件处理程序通过调用生成的mco_register_event_handler()
函数进行注册。可以为单个事件类型注册多个处理程序,但无法预测它们的调用顺序。
注册时,应用程序可以传递一个用户定义的参数,该参数将在事件处理程序调用时传递给它。该参数是一个void指针,可以引用简单的标量值或复杂的数据结构,具体取决于应用程序需求。
新对象创建事件:同步处理程序由
classname_new()
或classname_oid_new()
函数在对象实例化后立即调用。检查点事件:同步处理程序由
classname_checkpoint()
方法在对象插入索引之前或之后立即调用。检查点事件不由mco_trans_commit()
触发。删除事件:同步处理程序由
classname_delete()
方法在对象删除前调用。classname_delete_all()
方法不会触发删除事件。只有classname_delete()
方法会触发这些事件。更新事件:可以针对整个类或特定字段定义。与检查点事件类似,应用程序必须通过处理程序注册接口指定是在字段更新之前还是之后调用处理程序。更新处理程序可由任何导致字段内容更改的接口方法激活,例如
classname_fieldname_put()
、classname_fieldname_erase()
。- 如果在更新前调用事件处理程序并调用
classname_fieldname_get()
,则会检索当前数据库中的值; - 如果在更新后调用,则会检索应用程序刚刚写入数据库的值。用户定义的参数可用于向处理程序提供附加信息,例如事件前处理程序的传入值、事件后处理程序的旧值或擦除操作的矢量偏移量。
对于更新事件,不能同时为同一类定义类范围的事件和字段更新事件。
- 如果在更新前调用事件处理程序并调用
请注意
同步和异步事件都可以应用于任何给定的事件。在使用共享内存时,同步事件处理程序必须属于同一进程,否则结果将是不可预测的。特别是,如果进程B可能会插入、更新或删除Alpha对象,则不要在进程A中为类Alpha注册同步事件处理程序。应使用异步事件处理程序。
下面的代码片段说明了事件处理机制。考虑以下带有事件通知的类的模式DDL定义:
class dropped_call
{
uint4 trunk_id;
...
autoid;
event < trunk_id update > upd_trunk; // any name will do
event < new > add_trunk;
event < checkpoint > checkpoint_trunk;
event < delete > del_trunk;
};
模式编译器将在接口头文件中生成以下定义:
#define upd_trunk 15
// 15 只是举例说明;实际数值并不重要。
#define add_trunk 16
#define checkpoint_trunk 17
#define del_trunk 18
typedef MCO_RET (*mco_upd_trunk_handler)( /*IN*/ mco_trans_h *t,
/*IN*/ dropped_call *handle,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *param );
typedef MCO_RET (*mco_add_trunk_handler)( /*IN*/ mco_trans_h *t,
/*IN*/ dropped_call *handle,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *param );
typedef MCO_RET (*mco_del_trunk_handler)( /*IN*/ mco_trans_h *t,
/*IN*/ dropped_call *handle,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *param );
MCO_RET mco_register_upd_trunk_handler( /*IN*/ mco_upd_trunk_handler,
/*IN*/ void *param,
/*IN*/ MCO_HANDLING_ORDER when);
MCO_RET mco_register_add_trunk_handler( /*IN*/ mco_add_trunk_handler,
/*IN*/ void *param,
/*IN*/ MCO_HANDLING_ORDER when);
MCO_RET mco_register_checkpoint_trunk_handler( /*IN*/ mco_checkpoint_trunk_handler,
/*IN*/ void *param);
MCO_RET mco_register_del_trunk_handler( /*IN*/ mco_del_trunk_handler,
/*IN*/ void *param);
MCO_RET mco_unregister_upd_trunk_handler( /*IN*/ mco_upd_trunk_handler);
MCO_RET mco_unregister_add_trunk_handler( /*IN*/ mco_add_trunk_handler);
MCO_RET mco_unregister_checkpoint_trunk_handler( /*IN*/ mco_checkpoint_trunk_handler);
MCO_RET mco_unregister_del_trunk_handler( /*IN*/ mco_del_trunk_handler);
若要为上述事件之一使用异步处理程序,应用程序将启动一个线程,并在该线程函数内调用:
mco_async_event_wait( dbh, upd_trunk );
其中,dbh 是 mco_db_connect()
方法返回的数据库句柄,而 upd_trunk
是在生成的接口文件中定义的值,用于引用感兴趣的事件。 如前所述,此线程将阻塞(等待),直到由运行时释放。可以通过事件的发生而被释放,或者应用程序可以通过调用以下方法之一强制释放它::
mco_async_event_release( dbh, upd_trunk );
mco_async_event_release_all_( dbh );
C 事件处理程序将通过 mco_async_event_wait()
的返回值来知晓自身是因事件发生而被释放,还是因 release()
函数而被释放。
MCO_S_OK
表示事件已发生;MCO_S_EVENT_RELEASED
表示事件已被释放。
对于前面的类定义及其生成的接口,以下代码片段展示了同步事件处理。
首先,应用程序必须使用如下代码为其同步事件处理程序函数进行注册:
int register_events(mco_db_h db)
{
MCO_RET rc;
mco_trans_h t;
mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t);
mco_register_add_trunk_handler(t, &new_handler, (void*) 0);
mco_register_checkpoint_trunk_handler(t, &checkpoint_handler,
(void*) 0, MCO_BEFORE_UPDATE );
mco_register_del_trunk_handler(t, &delete_handler, (void *) 0);
mco_register_upd_trunk_handler( t, &update_handler1,
(void *) 0, MCO_BEFORE_UPDATE );
rc = mco_trans_commit(t);
return rc;
}
处理程序函数的主体如下所示:
/* “<new>”事件的处理程序。读取自动 ID 并将其打印出来。 */
MCO_RET new_handler( /*IN*/ mco_trans_h t, /*IN*/ dropped_call * obj,
/*IN*/ MCO_EVENT_TYPE et, /*INOUT*/ void *param)
{
int8 u8;
param = (int *)1;
dropped_call_autoid_get( obj, &u8 );
printf( "Event \"Object New\" : object (%ld,%ld) is created\n", u8.lo, u8.hi );
return MCO_S_OK;
}
/* Handler for the "<checkpoint>" event. Reads the autoid and prints it out */
MCO_RET checkpoint_handler( /*IN*/ mco_trans_h t, /*IN*/ dropped_call * obj,
/*IN*/ MCO_EVENT_TYPE et, /*INOUT*/ void *param)
{
int8 u8;
param = (int *)1;
dropped_call_autoid_get( obj, &u8 );
printf( "Event \"Object Checkpoint\" : object (%ld,%ld) is about to be created\n", u8.lo, u8.hi );
return MCO_S_OK;
}
/* “<delete>”事件的处理程序。请注意,该处理程序在当前事务提交之前被调用。因此,对象仍然有效;对象句柄传递给处理程序,并用于获取对象的自增 ID。事件处理程序的返回值传递给“delete”函数,稍后由 mco_trans_commit() 进行检查。如果返回值不是 MCO_S_OK,则事务将回滚。在此示例中,每隔一个删除事务提交一次。
*/
MCO_RET delete_handler( /*IN*/ mco_trans_h t, /*IN*/ dropped_call * obj,
/*IN*/ MCO_EVENT_TYPE et, /*INOUT*/ void *user_param)
{
int8 u8;
dropped_call_autoid_get( obj, &u8);
printf( "Event \"Object Delete\": object (%ld,%ld) is being deleted...", u8.lo, u8.hi );
return (((u8.lo + u8.hi) %2) ? 1: MCO_S_OK);
}
/* “<update>”事件的处理程序。此处理程序在更新事务提交之前被调用,因此被更改字段的值仍会报告为未更改。 */
MCO_RET update_handler1( /*IN*/ mco_trans_h t, /*IN*/ dropped_call * obj,
/*IN*/ MCO_EVENT_TYPE et, /*INOUT*/ void *param)
{
uint4 u4;
int8 u8;
dropped_call_autoid_get( obj, &u8);
dropped_call_trunk_id_get(obj, &u4);
printf( "Event \"Object Update\" (before commit): object (%ld,%ld) value %d\n",
u8.lo, u8.hi, u4 );
return MCO_S_OK;
}
当应用程序完成处理事件时,事件将由以下代码取消注册:
int unregister_events(mco_db_h db)
{
MCO_RET rc;
mco_trans_h t;
mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t);
mco_unregister_add_trunk_handler( t, & new_handler);
mco_unregister_del_trunk_handler( t, & delete_handler);
mco_unregister_update_handler( t, & update_handler1);
rc = mco_trans_commit(t);
return rc;
}
请参阅SDK示例 samples/core/10_events_sync了解更多实现细节。