数据库事件
事件声明用于定义应用程序将收到的通知,例如:添加新对象、删除对象、更新对象或其指定字段(数组、向量和二进制大对象字段以及内部结构的元素除外),以及检查点事件。(请注意,检查点事件仅在显式调用检查点 API 时触发。)这些事件是特定于类的,因此只有定义了事件的类才会触发相应的通知。
数据库事件可以由应用程序同步或异步处理。
异步事件处理
在异步事件处理中,应用程序会为每种类型的事件创建一个独立的线程。对于 C 和 C++ 应用程序,事件线程会调用 mco_async_event_wait() 函数来等待指定的事件。而在 Java、Python 和 C# 应用程序中,则会调用 Connection 方法 WaitEvent()。当事件发生时,SmartEDB 运行时会释放该线程,使其能够继续处理其他任务。这样,事件处理线程可以与其他线程并行运行,直到完成处理并再次调用 mco_async_event_wait() 或 WaitEvent() 等待下一个事件。
需要注意的是,在事件处理程序完成任务并重新进入等待状态之前,可能会有新的事件实例发生(事件不会排队)。为了最小化这种风险,您可以将事件处理委托给另一个线程,使当前处理程序线程能够立即重新进入等待状态。如果无法容忍未处理事件的风险,您可以选择使用同步事件处理程序,或者维护一个未处理事件的列表,甚至可以利用 SmartEDB 的事务日志功能中的 mco_trans_play() 函数。异步事件会在事务提交后被激活。如果在一个事务中添加、删除了多个对象,或者更新了多个具有事件处理程序的字段,所有相关的处理程序将同时被激活。
在 C 或 C++ 应用程序中,事件处理程序的定义如下所示:
/* Thread function that handles the <new> event. */
void NewEventHandler( sample_task_t * descriptor )
{
mco_db_h db = descriptor->db_connection;
while ( MCO_S_OK == mco_async_event_wait(db, MCO_EVENT_newEvent) )
{
/* Process the event. */
...
}
}
/* Thread function that handles the <update> event. */
void UpdateEventHandler( sample_task_t * descriptor )
{
mco_db_h db = descriptor->db_connection;
while ( MCO_S_OK == mco_async_event_wait(db, MCO_EVENT_updateEvent) )
{
/* Process the event. */
...
}
}
/* Thread function that handles the <delete> event. */
void DeleteEventHandler( sample_task_t * descriptor )
{
mco_db_h db = descriptor->db_connection;
while ( MCO_S_OK == mco_async_event_wait(db, MCO_EVENT_deleteEvent) )
{
/* Process the event. */
...
}
}
/* Thread function that handles the <update> event. */
void CheckpointEventHandler( sample_task_t * descriptor )
{
mco_db_h db = descriptor->db_connection;
while ( MCO_S_OK == mco_async_event_wait(db, MCO_EVENT_checkpointEvent) )
{
/* Process the event. */
...
}
}
接下来,C/C++ 应用程序会启动事件处理线程,并让主应用程序线程短暂休眠(通常 100 毫秒就足够了),以便给事件处理线程一些时间开始监听。之后,主应用程序线程将继续进行正常的数据库处理。当应用程序结束时,它会调用 mco_async_event_release_all() 来释放所有事件,并停止事件处理线程。(有关实现细节,请参考 SDK 示例 samples/native/core/10-events/asynch。)
同样地,Java 和 C# 应用程序也会为每个事件启动独立的线程来处理,每个线程都使用如下所示的 ThreadProc:
private class ThreadParams
{
public Connection con;
public string event_name;
public ThreadParams(Connection con, string EventName)
{
this.con = con;
this.event_name = EventName;
}
}
private static void ThreadProc(object param)
{
ThreadParams tp = (ThreadParams)param;
try
{
while (!exit)
{
tp.con.WaitEvent(tp.event_name);
}
}
catch (DatabaseError x)
{
if (x.errorCode >= 50)
{
// Errors
}
else
{
// Normal return codes including MCO_S_EVENT_RELEASED
}
}
}
如上文所述,C# 和 Java 应用程序将使主应用程序线程暂停几毫秒,然后继续进行正常的数据库处理。在终止时,应用程序将调用 Connection 方法 ReleaseAllEvents(),然后停止事件处理程序线程。(有关进一步的实现细节,请参阅 SDK 示例 samples/java/events/asyncbasic 或 samples/csharp/AsyncEvent。)C# 应用程序还可以通过委托机制处理异步事件,方法是将处理程序添加到 Connection 属性 AsynchEvent 和 AsyncEventError 中,然后调用方法 StartEventListeners()。(有关进一步的实现细节,请参阅 SDK 示例 samples/csharp/events/AsyncDelegate。)
同步事件处理
C 和 C++ 应用程序不仅可以异步响应 SmartEDB 事件,还可以选择同步处理这些事件。不过,Java 和 C# 应用程序目前不支持同步事件处理功能。同步事件处理程序会在引发事件的同一线程上下文中被调用。因此,在编写事件处理程序时,请务必小心,避免造成过长的延迟,因为处理程序控制着一个读写事务,可能会暂时阻塞对数据库的访问。具体来说,处理程序不应等待不确定的外部事件,例如用户输入。
同步处理程序通过返回 MCO_S_OK 来表示成功完成;其他返回值(如 mco.h 中定义的 MCO_S_* 或 MCO_E_* 常量,或用户自定义的值)则表示成功或失败,并附带额外信息。SmartEDB 会将此返回值传递给应用程序,以便应用程序根据需要采取相应措施(例如在必要时回滚事务)。
要注册同步处理程序,您可以通过调用为每个事件生成的 mco_register_<事件名称>_handler() 函数来实现。对于单个事件类型,您可以注册多个处理程序,但它们被调用的顺序无法预测。在注册时,应用程序可以传递一个用户定义的参数,该参数将在调用事件处理程序时传递给它。这个参数是一个 void 指针,可以根据应用程序的需求引用简单的标量值或复杂的数据结构。
对于新对象事件,同步处理程序会在对象实例化后立即由 classname_new() 或 classname_oid_new() 方法调用(因此对象句柄在此时是有效的)。对于检查点事件,同步处理程序会在对象插入索引之前或之后立即由 classname_checkpoint() 方法调用——应用程序可以在注册处理程序时指定是在插入索引之前还是之后调用。尽管 mco_trans_commit() 函数也会更新索引,但它不会触发检查点事件。对于删除事件,同步处理程序会在对象删除之前由 classname_delete() 方法调用(此时对象句柄仍然有效)。
请注意,delete 事件不会由 classname_delete_all() 方法触发。只有 classname_delete() 方法才会触发这些事件。
更新事件可以为整个类(即该类的所有字段)定义,也可以为类中的特定字段定义,方法是在事件声明中指定字段名称。与检查点事件类似,应用程序必须在注册处理程序时指定是在字段更新之前还是之后调用处理程序。更新处理程序由导致字段内容更改的接口方法激活,例如 classname_fieldname_put()。如果在更新之前调用事件处理程序,并且处理程序对字段调用 classname_fieldname_get(),则它将检索数据库中的当前值。相反,如果在更新之后调用事件,则处理程序将检索应用程序刚刚放入数据库中的值。用户定义的参数可用于向处理程序提供额外信息,例如在更新之前事件处理程序的传入值或更新之后事件处理程序的旧值。
请注意,同步和异步事件均可应用于任何给定的事件。在使用 SmartEDB 的共享内存模式时,同步事件处理程序必须属于引发事件的同一进程。因此,如果有可能进程 B 会插入、更新或删除 Alpha 对象,那么请勿在进程 A 中为类 Alpha 注册同步事件处理程序。建议在这种情况下使用异步事件处理程序。
此外,对于更新事件,同一类的类级事件不能与该类的字段更新事件合并。
示例
以下代码片段说明了事件处理。请考虑以下用于具有事件通知的类的模式 DDL 定义:
class dropped_call
{
uint4 trunk_id;
...
autoid[10000];
event < trunk_id update > upd_trunk; // any name will do
event < new > add_trunk;
event < checkpoint > checkpoint_trunk;
event < delete > del_trunk;
};
模式编译器将在接口头文件中生成以下定义:
#define MCO_EVENT_upd_trunk 15
// 15 is only illustrative; the actual value is not important
typedef MCO_RET (*mco_upd_trunk_handler)( /*IN*/ mco_trans_h *t,
/*IN*/ dropped_call *handle,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *param );
MCO_RET mco_register_upd_trunk_handler( /*IN*/ mco_trans_h *t,
/*IN*/ mco_upd_trunk_handler handler,
/*IN*/ void *param,
/*IN*/ MCO_HANDLING_ORDER when);
MCO_RET mco_unregister_upd_trunk_handler( /*IN*/ mco_trans_h *t,
/*IN*/ mco_upd_trunk_handler handler);
#define MCO_EVENT_add_trunk 16
typedef MCO_RET (*mco_add_trunk_handler)( /*IN*/ mco_trans_h *t,
/*IN*/ dropped_call *handle,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *param );
MCO_RET mco_register_add_trunk_handler( /*IN*/ mco_trans_h *t,
/*IN*/ mco_add_trunk_handler handler,
/*IN*/ void *param);
MCO_RET mco_unregister_add_trunk_handler( /*IN*/ mco_trans_h *t,
/*IN*/ mco_add_trunk_handler handler);
#define MCO_EVENT_checkpoint_trunk 17
typedef MCO_RET (*mco_checkpoint_trunk_handler)( /*IN*/ mco_trans_h *t,
/*IN*/ dropped_call *handle,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *param );
MCO_RET mco_register_checkpoint_trunk_handler( /*IN*/ mco_trans_h *t,
/*IN*/ mco_checkpoint_trunk_handler handler,
/*IN*/ void *param,
/*IN*/ MCO_HANDLING_ORDER when);
MCO_RET mco_unregister_checkpoint_trunk_handler(/*IN*/ mco_trans_h *t,
/*IN*/ mco_checkpoint_trunk_handler handler);
#define MCO_EVENT_del_trunk 18
typedef MCO_RET (*mco_del_trunk_handler)( /*IN*/ mco_trans_h *t,
/*IN*/ dropped_call *handle,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *param );
MCO_RET mco_register_del_trunk_handler( /*IN*/ mco_trans_h *t,
/*IN*/ mco_del_trunk_handler handler,
/*IN*/ void *param);
MCO_RET mco_unregister_del_trunk_handler( /*IN*/ mco_trans_h *t,
/*IN*/ mco_del_trunk_handler handler);
若要为上述事件之一使用异步处理程序,C/C++ 应用程序将启动一个线程,并在该线程函数内调用:
mco_async_event_wait( dbh, MCO_EVENT_upd_trunk );
其中,dbh 是通过 mco_db_connect() 调用获取的数据库句柄,而 upd_trunk 是在生成的接口文件中定义的值,用于引用感兴趣的事件。Java 和 C# 应用程序将启动一个线程并调用 Connection 方法 WaitEvent()。
如前所述,此线程将阻塞(等待),直至被 SmartEDB运行时释放。它可以通过事件的发生而被释放,或者 C/C++ 应用程序也可以通过调用以下任意一个函数强制释放它:
mco_async_event_release( dbh, upd_trunk );
mco_async_event_release_all_( dbh );
C/C++ 事件处理程序将通过 mco_async_event_wait()
的返回值来判断事件是因事件发生而释放,还是由 release()
函数释放。MCO_S_OK
表示事件已发生;MCO_S_EVENT_RELEASED
表示事件已被释放。Java 和 C# 事件处理程序在事件被释放时会捕获带有错误代码 MCO_S_EVENT_RELEASED 的异常。 对于前面的类定义及其生成的接口,以下代码片段说明了同步事件处理。首先,C/C++ 应用程序必须使用如下代码为其同步事件处理函数进行注册:
int register_events(mco_db_h db)
{
MCO_RET rc;
mco_trans_h t;
mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t);
mco_register_add_trunk_handler(t, &new_handler, (void*) 0);
mco_register_checkpoint_trunk_handler(t, &checkpoint_handler,
(void*) 0, MCO_BEFORE_UPDATE );
mco_register_del_trunk_handler(t, &delete_handler, (void *) 0);
mco_register_upd_trunk_handler( t, &update_handler1,
(void *) 0, MCO_BEFORE_UPDATE );
rc = mco_trans_commit(t);
return rc;
}
C/C++ 处理函数的主体将如下所示:
/* Handler for the "<new>" event. Reads the autoid and prints it out */
MCO_RET new_handler( /*IN*/ mco_trans_h t,
/*IN*/ dropped_call * obj,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *param)
{
int8 u8;
param = (int *)1;
dropped_call_autoid_get( obj, &u8 );
printf( "Event \"Object New\" : object (%llu) is created\n", u8 );
return MCO_S_OK;
}
/* Handler for the "<checkpoint>" event. Reads the autoid and prints it out */
MCO_RET checkpoint_handler( /*IN*/ mco_trans_h t,
/*IN*/ dropped_call * obj,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *param)
{
int8 u8;
param = (int *)1;
dropped_call_autoid_get( obj, &u8 );
printf( "Event \"Object Checkpoint\" : object (%ld,%ld) is about to be created\n", u8.lo, u8.hi );
return MCO_S_OK;
}
/* Handler for the "<delete>" event. Note that the handler
* is called before the current transaction is committed.
* Therefore, the object is still valid; the object handle
* is passed to the handler and is used to obtain the
* autoid of the object. The event's handler return value
* is passed into the "delete" function and is later
* examined by the mco_trans_commit(). If the value is
* anything but MCO_S_OK, the transaction is rolled back.
* In this sample every other delete transaction is
* committed.
*/
MCO_RET delete_handler( /*IN*/ mco_trans_h t,
/*IN*/ dropped_call * obj,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *user_param)
{
int8 u8;
dropped_call_autoid_get( obj, &u8);
printf( "Event \"Object Delete\": object (%ld,%ld) is being deleted...", u8.lo, u8.hi );
return (((u8.lo + u8.hi) %2) ? 1: MCO_S_OK);
}
/* Handler for the "update" event. This handler is called
* before the update transaction is committed - hence the
* value of the field being changed is reported unchanged
* yet.
*/
MCO_RET update_handler1( /*IN*/ mco_trans_h t,
/*IN*/ dropped_call * obj,
/*IN*/ MCO_EVENT_TYPE et,
/*INOUT*/ void *param)
{
uint4 u4;
int8 u8;
dropped_call_autoid_get( obj, &u8);
dropped_call_trunk_id_get(obj, &u4);
printf( "Event \"Object Update\" (before commit): object (%ld,%ld) value %d\n", u8.lo, u8.hi, u4 );
return MCO_S_OK;
}
当 C/C++ 应用程序完成事件处理后,会通过如下代码注销事件:
int unregister_events(mco_db_h db)
{
MCO_RET rc;
mco_trans_h t;
mco_trans_start(db, MCO_READ_WRITE, MCO_TRANS_FOREGROUND, &t);
mco_unregister_add_trunk_handler( t, & new_handler);
mco_unregister_del_trunk_handler( t, & delete_handler);
mco_unregister_update_handler( t, & update_handler1);
rc = mco_trans_commit(t);
return rc;
}
有关进一步的实现细节,请参阅 SDK 示例 samples/native/core/10-events/synch。