动态模式修改
从 SmartEDB 7.1 版本开始,可以通过标准的 SQL 数据定义语言(DDL)语句(如 create table/index、drop table/index 和 alter table/index)来修改 SQL 数据库模式。(有关 SQL DDL 的更多详细信息,请参阅 SmartESQL 用户指南。)
动态架构修改的副本端重放
此增强功能源于一项请求,即当主数据库架构动态修改时,将主端的 CRUD 事件注册并传播到副本端的应用程序。通过实现应用程序端的迭代器回调来实现这一点,这能让 SmartEDB 高可用性数据库运行时以 JSON 格式向应用程序发布有关数据库修改的信息。此迭代器适用于所有受支持的主机语言,并且也可以在 xSQL 配置文件中启用。
C API
副本端的 mco_HA_replica_params_t 结构体新增了两个字段:
mco_trans_iterator_callback_t iterator;
void *iterator_context;
其中:
迭代器是应用程序的回调函数,当发生 CRUD 操作及其他一些事件时,数据库运行时会调用此函数。该回调函数的原型在 mco.h 中定义如下:
typedef MCO_RET (*mco_trans_iterator_callback_t)(mco_trans_h trans, MCO_Hf* obj, int cid, int obj_state, void* user_ctx);
iterator_context
是应用程序的上下文,通过回调函数的最后一个参数user_ctx
传递进去。
JSON output
尽管应用程序可以使用通用格式来实现回调函数并实现自定义输出格式,但已实现并包含在 SmartEDB 高可用性发行版中的迭代器会以 JSON 格式(RFC 7159)表示数据库内容。JSON 转换器概要如下:
mco_trans_iterator_h mco_create_json_converter(mco_stream_h stream,
mco_json_converter_params_t *params);
回调函数的参数如下:
- stream 是用于写入 JSON 值的流。不得为 NULL。
- params 是可选的参数结构体,如下定义(它可以为 NULL,在这种情况下将使用下面描述的默认值):
参数结构体定义如下:
typedef struct {
mco_bool compact;
mco_bool ignore_stream_errors;
MCO_RET last_error;
int last_errno;
} mco_json_converter_params_t;
其中:
- 紧凑 如果设置为 true(默认值),则生成紧凑的 JSON(无空格)。设置为 false 时生成人类可读的 JSON 格式。
- 忽略流错误 表示在将 JSON 写入流时是否忽略错误。如果设置为 false(默认值),任何流错误都会导致高可用性复制停止——mco_HA_attach_master() 函数返回 MCO_E_WRITE_STREAM 错误,并将 stopReason 设置为 MCO_HA_REPLICA_ITERATOR_ERROR。如果设置为 true,则忽略错误并继续复制。
- 最后错误 输出:用于从 mco_create_json_converter() 函数返回 MCO_E_... 错误。
- 最后错误号 输出:用于从 mco_create_json_converter() 函数返回操作系统级别的错误(errno)。
通过调用以下内容,可将可选参数初始化为默认值:
void mco_json_converter_params_init(mco_json_converter_params_t *params);
如果成功,该函数已分配并返回指向 mco_trans_iterator_t 结构(这是所有未来迭代器回调实现的通用结构)的指针,其定义如下:
typedef struct mco_trans_iterator_t {
mco_trans_iterator_callback_t callback;
MCO_RET last_error;
int last_errno;
} mco_trans_iterator_t, *mco_trans_iterator_h;
请注意,mco_trans_iterator_t
表示一个头文件,类似于 C++ 术语中的基类。任何自定义迭代器都会在其头文件之后分配自己的自定义字段。 以下是一个示例:
mco_trans_iterator_h json;
mco_stream_h stream;
mco_json_converter_params_t json_params; // declare parameters
stream = ... <create stream, see below> ...
void mco_json_converter_params_init(&json_params); // initialize parameters
json_params.compact = true; // change default parameter value
json = mco_create_json_converter(stream, &json_params); // create JSON iterator
if (! json)
{
// Handle errors
}
mco_HA_replica_params_t replica_params;
mco_HA_replica_params_init(&replica_params);
replica_params.iterator = json->callback; // set callback function
replica_params.iterator_context = json; // set context
mco_HA_attach_master(db, conn_string, &replica_params, stop_reason, timeout);
数据库对象被渲染为 JSON 对象流,这些对象之间由 \0 分隔(零不会出现在 JSON 中)。因此,可以像以下代码片段那样处理该流(为简单起见,此处使用 Java):
InputStream fis = (new java.net.Socket("127.0.0.1", 5566)).getInputStream();
java.util.Scanner s = new java.util.Scanner(fis).useDelimiter("\\\0");
while (s.hasNext())
{
JSONObject obj = new JSONObject(s.next());
.....
}
每个 JSON 对象包含一个“操作”字段,其值可以是以下之一:
- TRANS_BEGIN:事务开始
- TRANS_END:事务结束(通过回滚或提交)
- OBJ_NEW:创建新对象
- OBJ_UPDATE:更新现有对象
- OBJ_DELETE:删除对象
- DELETE_ALL:删除类中的所有对象
- DB_CLEAN:主应用在事务外调用 mco_db_clean()
- SCHEMA_CHANGED:在事务外修改数据库模式
对于 OBJ_NEW、OBJ_UPDATE、OBJ_DELETE 和 DELETE_ALL 操作的 JSON 对象,包含一个表字段,表示受影响对象所属的表(类名)。对于 OBJ_NEW、OBJ_UPDATE 和 OBJ_DELETE 操作,JSON 对象还包含一个对象字段,其结构和数据字段类型与数据库对象布局一致。请注意以下格式规则:
- 数据库数组和向量表示为 JSON 数组。
- 嵌套结构表示为单独的 JSON 对象。
- 字符和字符串字段为零结尾的 UTF-8 格式。
- nchar 和 nstrings 字段为零结尾的 UTF-16 字符串。
- wchar 和 wstring 字段为零结尾的 UTF-32 字符串。
- BLOB 以 base64 编码的 JSON 字符串表示。
- NULL 值和缺失的可选结构值表示为 JSON 的 null。
例如,对于以下 SQL 布局和内容:
create table a (i int, j int, s string);
insert into a values (1,2,'test');
以下将生成 JSON 数据:
{
"operation":"SCHEMA_CHANGED"
}
{
"operation":"TRANS_BEGIN"
}
{
"operation":"OBJ_NEW",
"table":"a",
"object":{
"auto_oid@":1,
"i":1,
"j":2,
"s":"test"
}
}
{
"operation":"TRANS_END"
}
扩展的 SmartEDB 流
SmartEDB 运行时通常使用流将数据库内容输出到外部媒体,并从外部媒体或存储设备导入数据到数据库。SmartEDB 的导出/导入 API,即 mco_db_save() 和 mco_db_load() 以及一些其他函数使用读取流和写入流来导入/导出数据。迭代器扩展了流函数的功能,使其能够保留并随后将流中的错误信息返回给调用的应用程序或数据库运行时。流通过以下定义的 mco_stream_h 结构来表示:
typedef struct mco_stream_t {
mco_stream_write writer;
MCO_RET (*close) (struct mco_stream_t *self);
MCO_RET last_error;
int last_errno;
} mco_stream_t, *mco_stream_h;
其中:
writer 是一个标准的 SmartEDB 流写入器:
typedef mco_size_sig_t( *mco_stream_write)(void* stream_handle, const void* from, mco_size_t nbytes);
- close 表示流析构函数
last_error 是 MCO_E_... 错误
last_errno 是操作系统级别的 errno 值
当前已实现的是基于文件、基于服务器套接字、基于客户端套接字和基于 tee 流的流函数。
基于文件的流
文件流可以通过以下任一函数创建:
mco_stream_h mco_create_mcofile_stream(const char *filename,
mco_file_stream_params_t *params);
或
mco_stream_h mco_create_stdfile_stream(const char *filename,
mco_file_stream_params_t *params);
第一个函数通过 mco_file_h 句柄使用 SmartEDB fs 库,而第二个函数则使用标准 I/O(stdio.h)的 FILE* 句柄。目前唯一必需的参数是文件名,它表示要写入的文件名,且不能为 NULL。目前没有最优参数,流参数结构体仅包含用于返回从流中接收到的错误的字段:
typedef struct {
MCO_RET last_error;
int last_errno;
} mco_file_stream_params_t;
不过在未来,可能会有一些额外的可选参数(例如加密),所以会调用以下函数来初始化它们:
void mco_file_stream_params_init(mco_file_stream_params_t *params);
以下是一个使用示例:
mco_stream_h stream;
mco_file_stream_params_t file_params;
mco_file_stream_params_init(&file_params);
stream = mco_create_stdfile_stream("myfile.txt", &file_params);
或
mco_stream_h stream = mco_create_stdfile_stream("myfile.txt", 0);
服务器套接字流
以下函数用于创建服务器套接字流:
mco_stream_h mco_create_server_socket_stream(int port, mco_server_socket_stream_params_t *params);
唯一必需的参数是监听的端口号。可选参数由以下结构定义:
typedef struct {
timer_unit write_timeout;
mco_size_t buffer_size;
mco_size_t max_clients;
const char *net_interface;
mco_sock_params_t sock_params;
MCO_RET last_error;
int last_errno;
} mco_server_socket_stream_params_t;
其中:
- write_timeout 是写入套接字超时时间(以毫秒为单位)。默认值为 1000(1 秒)。
- buffer_size 表示用于累积要传递给套接字 send() 函数的数据的缓冲区大小。以字节为单位,默认值为 16*1024。
- max_clients - 最大并发连接客户端数。默认值为 16。
- net_interface - 监听的网络接口地址。默认值为 0.0.0.0(所有接口)。
- sock_params - 套接字参数:类型、域、SSL 等。
- last_error - 输出:用于从 mco_create_server_socket_stream() 函数返回 MCO_E_... 错误。
- last_errno - 输出:用于从 mco_create_server_socket_stream() 函数返回操作系统级别的错误(errno)。
客户端套接字流
以下函数用于创建客户端套接字流:
mco_stream_h mco_create_client_socket_stream(const char *hostname, int port,
mco_client_socket_stream_params_t *params);
所需参数为主机名(名称和 IP 地址)和端口(端口号)。可选参数由以下结构定义:
typedef struct {
timer_unit write_timeout;
timer_unit connect_timeout;
int connect_attempts;
timer_unit connect_interval;
mco_bool auto_reconnect;
mco_size_t buffer_size;
mco_sock_params_t sock_params;
MCO_RET last_error;
int last_errno;
} mco_client_socket_stream_params_t;
其中:
- write_timeout - 写入套接字超时时间(以毫秒为单位)。默认值为 1000(1 秒)
- connect_timeout - connect() 函数的超时时间(以毫秒为单位)。默认值为 2000(2 秒)
- connect_attempts - 在成功建立连接之前调用 connect() 的次数。默认值为 3
- connect_interval - 在连接尝试失败后再次尝试连接() 之前的等待时间。默认值为 0(不等待)
- buffer_size - 表示用于传递给套接字 send() 函数的数据缓冲区大小(以字节为单位)。默认值为 16*1024
- sock_params - 套接字参数:类型、域、SSL 等等
- last_error - 输出:用于从 mco_create_client_socket_stream() 函数返回 MCO_E_... 错误
- last_errno - 输出:用于从 mco_create_client_socket_stream() 函数返回操作系统级别的错误(errno)
默认值通过调用以下方式初始化:
void mco_client_socket_stream_params_init(mco_client_socket_stream_params_t *params);
以下是一个使用示例:
mco_stream_h stream;
mco_client_socket_stream_params_t stream_params;
mco_client_socket_stream_params_init(&stream_params);
stream_params.connect_timeout = 3000; // change connect timeout
// connect to myhost.com:10023
stream = mco_create_client_socket_stream("myhost.com", 10023, &stream_params);
或
// connect to myhost.com:10023
mco_stream_h stream = mco_create_client_socket_stream("myhost.com", 10023, 0);
T 型流
T 型流允许迭代器同时写入两个独立的输出流。例如,其中一个输出流可以是套接字,而另一个可以是文件。T 型流可以嵌套。
以下函数用于创建一个 T 型流:
mco_stream_h mco_create_tee_stream(mco_stream_h stream1,
mco_stream_h stream2, mco_bool any_ok);
其中:
- 流 1 - 第一个输出流
- 流 2 - 第二个输出流
- any_ok - 如果设置为 true,则只要数据成功写入任一输出流,就视为写入成功。如果设置为 false,则必须两个写入操作都成功。
以下是一个使用示例:
// create file stream
mco_stream_h file_stream = mco_create_stdfile_stream("file.txt", 0);
// create socket stream
mco_stream_h socket_stream = mco_create_client_socket_stream("myhost.com",
10023, 0);
// join file and socket stream
mco_stream_h tee_stream = mco_create_tee_stream(file_stream, socket_stream, false);
// create JSON iterator writing to both file and socket
mco_trans_iterator_h json = mco_create_json_converter(tee_stream, 0);
自定义流
可以继承现有流实现的功能,并基于现有的 stream_handle 和 stream_writer 创建自定义的流句柄 mco_stream_h:
mco_stream_h mco_create_custom_stream(void* stream_handle,
mco_stream_write output_stream_writer,
MCO_RET(*close)(void *));
其中:
- - stream_handle - 流上下文
- stream_writer - 流函数 -
- close - 流析构函数,可以为 NULL
以下是一个使用示例:
static mco_size_sig_t file_writer(void* stream_handle, const void* from, mco_size_t nbytes)
{
FILE *f = (FILE *) stream_handle;
return fwrite(from, 1, nbytes, f);
}
FILE *f = fopen(filename, "w");
mco_stream_h stream = mco_create_custom_stream(f, file_writer,(MCO_RET (*)(void*)) fclose);
说明
如果出现错误,上述所有的 mco_create_XXX_stream() 函数都将返回 NULL,并在 params->last_error 和 params_last_errno(如果已定义)中设置相应的错误代码。
如果一个流被销毁,所有嵌套的流也会被销毁。例如对于 tee 流:
MCO_RET mco_destroy_stream(mco_stream_h s);
When the iterator is destroyed, all dependent resources (streams) are also destroyed:
MCO_RET mco_destroy_trans_iterator(mco_trans_iterator_h t);
例如:
// create file stream
mco_stream_h file_stream = mco_create_stdfile_stream("file.txt", 0);
// create socket stream
mco_stream_h socket_stream = mco_create_client_socket_stream("myhost.com", 10023, 0);
// join file and socket stream
mco_stream_h tee_stream = mco_create_tee_stream(file_stream,
socket_stream, false);
// create JSON iterator writing to both file and socket
mco_trans_iterator_h json = mco_create_json_converter(tee_stream, 0);
//... Use iterator ...
mco_destroy_trans_iterator(json);
// Don't call mco_destroy_stream() for file_stream, socket_stream, tee_stream
迭代器和 mco_stream_h 在 target/mcoiter 库中实现。此库仅使用 SmartEDB 公共 API(mco.h、mcouda.h 和 mconet.h)。此外,标准 I/O(stdio.h)用于 mco_create_stdfile_stream()。通过在 target/mcoiter/mcoiter.c 文件中定义 MCO_STDIO_FILE_STREAM 可以关闭 stdio 的使用:
#define MCO_STDIO_FILE_STREAM 0
最后,“新风格”的流也可以与为“旧风格”流编写的 SmartEDB API 函数一起使用。以下代码片段说明了如何使用 mco_db_save() API 和“新风格”的套接字流连接到服务器并导出数据库内容:
mco_stream_h socket_stream = mco_create_client_socket_stream("myhost.com", 10023, 0); if (socket_stream) { mco_db_save(socket_stream, socket_stream->writer, connection); mco_destroy_stream(socket_stream); }
Java API
基于 Java 的迭代器利用标准的 Java 输出流来输出数据库内容。迭代器本身在 TransIterator 类中定义:
public class TransIterator {
public abstract static class Iterator
{
public void close();
}
public static class JsonConverter extends Iterator
{
public JsonConverter(java.io.OutputStream os, boolean compact,
boolean ignoreStreamErrors);
public JsonConverter(java.io.OutputStream os);
}
}
这里,TransIterator.Iterator 表示所有内部类型的基类,而 TransIterator.JsonConverter 实现了基于 JSON 的迭代器。以下是一个将 JSON 迭代器写入文件的示例:
ReplicaConnection con = new ReplicaConnection(db);
ReplicaConnection.Parameters replParams = new ReplicaConnection.Parameters();
replParams.iterator = new TransIterator.JsonConverter(new java.io.FileOutputStream("myfile.txt"), false, false);
con.attachMaster(connectionString, replParams, CONNECT_TIMEOUT);
replParams.iterator.close();
C# API
与 Java 类似,标准的 .NET Framework 输入/输出流被使用:
public class TransIterator
{
public abstract class Iterator
{
public Iterator(Connection con);
public void Close();
}
public class JsonIterator : Iterator
{
public JsonIterator(Connection con, System.IO.Stream s, bool compact,
bool ignore_stream_errors);
public JsonIterator(Connection con, System.IO.Stream s);
}
}
以下是一个将 JSON 迭代器写入文件的示例:
ReplicaConnection con = new ReplicaConnection(db);
ReplicaConnection.Parameters replParams = new ReplicaConnection.Parameters();
replParams.Iterator = new TransIterator.JsonIterator(con, new System.IO.FileStream("myfile.txt", System.IO.FileMode.Create));
con.AttachMaster(connectionString, replParams, CONNECT_TIMEOUT);
replParams.Iterator.Close();
Python API
Python API 是围绕 C 语言的 mco_stream_h 构建的封装。因此,大多数 Python 方法及其参数与相应的 C 函数相同。要创建一个文件流:
def create_file_stream(filename)
要创建一个服务器流(listen=True)和客户端流(listen=False):
def create_socket_stream(hostname, port, listen = False,
write_timeout = -1, buffer_size = -1, max_clients = -1,
connect_timeout = -1, connect_attempts = -1, c
onnect_interval = -1, reconnect = -1, socket_domain = -1 )
请注意,对于服务器类型套接字,主机名可以为None。如果主机名不等于None,那么它对应于mco_server_socket_stream_params_t::net_interface。buffer_size、write_timeout、socket_domain参数对于客户端和服务器套接字都是通用的,而max_clients参数仅为服务器定义,connect_timeout、connect_attempts、connect_interval、reconnect仅为客户端套接字定义。
以下是一个关于 JSON/Tee 流使用的示例:
db = exdb.open_database(dbname="rpldb", dictionary=dict, is_disk=is_disk,
log_params=log_params)
sock_stream = exdb.create_socket_stream("myhost.com", 10023, listen=False,
write_timeout=200);
file_stream = exdb.create_file_stream("out1.txt");
tee_stream = exdb.create_tee_stream(sock_stream, file_stream);
json_ierator = exdb.create_json_iterator(tee_stream);
replcon = exdb.ReplicaConnection(db)
params = exdb.ReplicaConnectionParameters()
params.iterator = json_iterator;
replcon.attachMaster(ha_replica_connstr, params, CONNECT_TIMEOUT):
exdb.destroy_trans_iterator(params.iterator);
xSQL API
对于 xSQL,迭代器在配置文件中以 JSON 对象的形式进行描述。每个迭代器都包含一个 type 字段,目前必须将其设置为 json。其他字段及其值取决于迭代器类型。对于 json 类型,字段包括 stream、compact(可选,布尔值)和 ignore_stream_errors(可选,布尔值)。
流也在配置文件中定义。流的类型可以是文件、套接字或 tee。其他字段及其值取决于流的类型。
对于类型文件:
{
type : "file",
name : "myfile" # file name
}
*type: "socket", listen: true (server-side socket:
{
type : "socket",
listen : true,
port : 10000, # listen port
buffer_size : 1k, # optional, mco_server_socket_stream_params_t::buffer_size
max_clients : 3, # optional, mco_server_socket_stream_params_t::max_clients
net_interface : 192.168.0.3, # optional, mco_server_socket_stream_params_t::net_interface
write_timeout : 1000, # optional, mco_server_socket_stream_params_t::write_timeout
sock_params : { # optional
domain : "inet" # other options are "local" or "sdp",
# mco_server_socket_stream_params_t::sock_params::domain
mode : ["nodelay"], # combination of "nodelay", "do_not_reuse_address",
# "non_blocking", "keep_alive" or
# "do_not_cloexec", mco_server_socket stream_params_t::sock_params::mode
sndbuf : 16k, # mco_server_socket_stream_params_t::sock_params::sndbuf
use_ssl : false, # apply "ssl_params" for this socket or not
}
}
类型 socket
, listen: false (client socket)
:
{
type : "socket",
listen : false,
hostname : "192.168.0.1" # host to connect to,
port : 10023 # port to connect
auto_reconnect : true, # optional, mco_client_socket_stream_params_t::auto_reconnect
buffer_size : 1k, # optional, mco_client_socket_stream_params_t::buffer_size
connect_attempts : 3, # optional, mco_client_socket_stream_params_t::connect_attempts
connect_interval : 1000, # optional, mco_client_socket_stream_params_t::connect_attempts
connect_timeout : 2000, # optional, mco_client_socket_stream_params_t::connect_timeout
write_timeout : 500, # optional, mco_client_socket_stream_params_t::write_timeout
sock_params : { optional,
domain : "inet" # other options are "local" or "sdp",
# mco_server_socket_stream_params_t::sock_params::domain
mode : ["nodelay"], # combination of "nodelay", "do_not_reuse_address",
# "non_blocking", "keep_alive" or
# "do_not_cloexec", mco_server_socket_stream_params_t::sock_params::mode
sndbuf : 16k, # mco_server_socket_stream_params_t::sock_params::sndbuf
use_ssl : false, # apply "ssl_params" for this socket or not
}
}
类型 tee
:
{
type : "tee",
stream1 : {
...
}
stream2 : {
...
}
}
例如,以下配置定义了一个副本端的 JSON 迭代器,它将名为 myfile.txt 的文件写入到套接字 127.0.0.1:10023 中。
ha_params : {
connection_strings : "127.0.0.1:10000",
replica_params : {
iterator : {
type : "json",
compact : false,
stream : {
type : "tee",
stream1 : {
type : "socket",
listen : false,
hostname : "127.0.0.1",
port : 10023,
},
stream2 : {
type : "file",
name : "myfile.txt"
}
}
}
}
}