Python 中应用
Python 集群支持是在 SmartEDB 集群之上的一层薄薄的封装。要使用 Python 实现集群应用程序,首先初始化一组集群节点,然后调用 exdb.open_database() 方法,并为每个节点创建一个监听线程。 在创建集群之前,Python 运行时必须加载支持集群的 SmartEDB 运行时。这是通过向 exdb.init_runtime() 方法传递参数 cluster = True 来实现的。(请注意,仅支持 MVCC 事务管理器用于集群。)
exdb.init_runtime(is_disk, 'mvcc', is_shm, is_debug, cluster = True)
然后通过调用 exdb.open_database() 来创建每个节点。例如:
db = exdb.open_database(
dbname='clusterdb_%s' % node_id,
dictionary=dict,
clusterParams=cluster_params,
is_disk=is_disk,
db_segment_size=128*1024*1024
);
这里通过将唯一的 node_id 连接起来使数据库名称 dbname 唯一化。参数 clusterParams 是 Python 类 exdb.ClusterParams 的一个实例,它与 C API 结构体 mco_cluster_params_t 相对应。可以按如下方式初始化 clusterParams,以便在本地主机上使用不同的端口:
nodes = [exdb.ClusterNodeParams("127.0.0.1:2001"),
exdb.ClusterNodeParams("127.0.0.1:2002"),
exdb.ClusterNodeParams("127.0.0.1:2003")]
cluster_params = exdb.ClusterParams(nwtype = 'TCP', nodeId = int(node_id),
nodes = nodes, notifyCallback=onNotify)
ClusterNodeParams 类具有以下构造函数:
def __init__(self, addr, qrank = 1):
self.addr = addr
self.qrank = qrank
其中 addr 是节点的 TCP 地址,qrank 是此节点的仲裁等级。
ClusterParams
exdb.ClusterParams 类的构造函数定义为:
def __init__(self, nodeId, nwtype, nodes = [], nwparams = None,
MPICluster = False, window = None, conn_pool_factor = 50,
sync_msg_objects = 100, sync_msg_size = 0,
notifyCallback = None, quorumCallback = None,
clusterSendBuf = 0, clusterRecvBuf = 0, mode_mask = 0
):
其中各元素如下:
nodeId
此集群节点的整数标识符。转换为 node_id 集群参数。
nwtype
集群类型。可以是“TCP”或“MPI”。
nodes
集群中的节点列表。每个节点都表示为 ClusterNodeParams 类的一个实例。
nwparams
可选的网络参数。一个字典,用于设置集群的网络参数。TCP 集群字典的键:
- "connectTimeout"
- "connectInterval"
- "socketSendBuf"
- "socketRecvBuf"
- "socketDomain"
- "keepAliveTime"
- "keepAliveProbes"
- "ssl_params"
如果指定了 ssl_params,则会使用 SSL 层;用于设置 SSL 加密连接的参数是一个字典,包含以下内容:
- "cipher_list" - string
- "max_cert_list" - long
- "verify_mode" - int
- "verify_depth" - long
- "cert_file_pem" - string
- "pkey_file_pem" - string
nwparams 参数使用示例:
nwparams = {"connectTimeout" : 15000, "keepAliveTime" : 600}
MPICluster
如果使用 MPI 通道,请将其设置为 True。
window
一个声明如下所示的类型为 ClusterWindow 的对象:
class ClusterWindow(object):
def __init__(self, bsize = 0, length = 0, timeout = 1):
self.bsize = bsize
self.length = length
self.timeout = timeout
conn_pool_factor
一个整数值,用于指定连接池的大小(以数据库最大连接数的百分比表示)。
sync_msg_objects
一个整数值,用于指定同步期间每条消息中的对象最大数量。
sync_msg_size
一个整数值,用于指定同步期间消息的最大字节大小。
notifyCallback
用于接收集群通知的 Python 回调函数。例如:
notifications = ["connect", "disconnect"]
def onNotify(notification_code, node_info):
print 'Cluster notification: code %s info %s' % (notifications[notification_code], node_info)
quorumCallback
当未达到集群仲裁时要调用的回调函数
clusterSendBuf
一个整数值,指定内部发送缓冲区的大小(以字节为单位)。
clusterRecvBuf
一个整数值,指定内部接收缓冲区的大小(以字节为单位)。
mode_mask
一个集群模式掩码(mco_cluster_params_t::mode_mask);调试输出(MCO_CLUSTER_MODE_DEBUG_OUTPUT)和早期数据发送(MCO_CLUSTER_MODE_EARLY_DATA_SEND)的组合。
示例
以下代码片段(来自示例 samples/python/cluster)展示了简单的 Python 集群应用程序如何执行这些步骤:
class ListenThread(threading.Thread):
def __init__(self, db):
self.db = db
super(ListenThread, self).__init__()
def run(self):
print "Listen thread started"
con = self.db.connect()
print "Listen thread connected:", con
con.listen()
con.close()
}
...
notifications = ["connect", "disconnect"]
def onNotify(notification_code, node_info):
print 'Cluster notification: code %s info %s' % (notifications[notification_code], node_info)
...
def start_node(n_nodes, node_id, q = None):
...
exdb.init_runtime(is_disk, 'mvcc', is_shm, is_debug, cluster = True)
db = None
try:
# use different ports on the localhost
nodes = [exdb.ClusterNodeParams("127.0.0.1:200%s" % (10 + i * 10)) for i in xrange(n_nodes)]
cluster_params = exdb.ClusterParams(nwtype = 'TCP', nodeId = int(node_id),
nodes = nodes, notifyCallback=onNotify)
dict = exdb.load_dictionary(schema, persistent=is_disk, debug=is_debug)
db = exdb.open_database(dbname='clusterdb_%s' % node_id, dictionary=dict,
clusterParams=cluster_params,
is_disk=is_disk, db_segment_size=128*1024*1024);
print 'Node %s: Database opened' % node_id
listenThread = ListenThread(db)
listenThread.start()
print 'Node %s: ListenThread started' % node_id
with db.connect() as con:
print 'Node %s: Database connected' % node_id
startTime = time.time()
doWork(con, n_nodes, node_id)
stopTime = time.time()
print 'Node %s: Work finished' % node_id
con.stop() # stop cluster runtime. Returns after ALL nodes call stop()
stopAllTime = time.time()
print 'Node %s: Connection stopped' % node_id
listenThread.join()
...
finally:
if not db is None:
db.close()
return