MasterConnection
MasterConnection 类扩展了 Connection 类，用于 SmartEDB 高可用性（HA）主应用程序。执行复制的主应用程序必须使用此类的实例，而不是 Connection 类的实例。
类定义
/**
* Connection subclass used by the master side of a High Availability (HA) setup.
* Every public operation in this class forwards to a same-named private native
* method, passing the connection handle {@code id}.
* NOTE(review): {@code id} is not declared in this class; presumably it is inherited
* from Connection - confirm.
*/
public class MasterConnection extends Connection
{
/**
* Enable HA commit sharing.
*/
public static final int MCO_HAMODE_MULTIPROCESS_COMMIT = 0x2;
/**
* Activate HA asynchronous replication mode. If used, the notification flag
* and the notification callback routine address MUST also be set.
*/
public static final int MCO_HAMODE_ASYNCH = 0x4;
/**
* Enable multicast replication mode.
*/
public static final int MCO_HAMODE_MCAST = 0x8;
/**
* Enable multicast reliable mode.
*/
public static final int MCO_HAMODE_MCAST_RELIABLE = 0x10;
/**
* Enable hot synchronization mode.
*/
public static final int MCO_HAMODE_HOTSYNCH = 0x20;
/**
* Activate HA stateful replication mode. This mode allows acceleration of the restart process
* after a short-term disconnect by the replica. The master saves the last trans_log_length
* transactions in its database. If a replica re-attaches quickly, it will have missed only
* a few of the master's transactions. So if the replica missed fewer than trans_log_length
* transactions, the master can send only these skipped transactions to the replica instead
* of a full initial synchronization.
* NOTE(review): trans_log_length presumably corresponds to Parameters.transLogLength - confirm.
*/
public static final int MCO_HAMODE_STATEFUL_REPLICATION = 0x40;
/**
* Enable binary evolution mode.
*/
public static final int MCO_HAMODE_BINEVOLUTION = 0x80;
/**
* Callback interface for the handler stored in Parameters.errorHandler
* (documented there as the detach-replica callback).
*/
public interface ErrorHandler
{
/**
* Invoked with an error code and a replica number.
* @param errCode error code reported to the callback (values are not defined in this file).
* @param replicaNo number of the replica concerned; presumably the same numbering
* accepted by getReplicaAddress(int) - TODO confirm.
*/
public void handler(int errCode, int replicaNo);
};
/**
* The replication connection parameters.
* Time units of the timeout fields are not specified in this file.
*/
public static class Parameters
{
/**
* The timeout for reading transaction data.
*/
public int commitTimeout;
/**
* The timeout (wait limit) for initial synchronization.
*/
public int initialTimeout;
/**
* The amount of time a replica waits for the next commit.
*/
public int synchTimeout;
/**
* NOTE(review): the original comment here was a verbatim copy of the one on
* synchTimeout. Judging by the name, this is presumably the timeout applied
* when a replica is detached - confirm against the native implementation.
*/
public int detachTimeout;
/**
* A combination of MCO_HAMODE_* bit flags.
*/
public int modeFlags;
/**
* The maximum number of replicas for a master.
*/
public int maxReplicas;
/**
* NOTE(review): originally undocumented. Presumably the trans_log_length described
* on MCO_HAMODE_STATEFUL_REPLICATION, i.e. how many of the most recent transactions
* the master keeps for fast re-synchronization - confirm.
*/
public int transLogLength;
/**
* Device representing the asynchronous buffer. Reset to null by the no-argument constructor.
*/
public Database.Device asyncBuf;
/**
* Detach replica callback.
*/
public ErrorHandler errorHandler;
/**
* Multicast support: port.
*/
public int mcastPort;
/**
* Multicast support: address.
*/
public String mcastAddr;
/**
* Number of objects to send at once during hot-synchronization.
*/
public int hotsyncMsgObjects;
/**
* Maximum message size during hot-synchronization.
*/
public int hotsyncMsgSize;
/**
* Minimum number of connected replicas required for the master database to accept write transactions.
*/
public int quorum;
/**
* SSL parameters, or null if SSL is not used.
*/
public Database.SSLParameters sslParameters = null;
/**
* Compression level: 0..9, 0 - no compression.
*/
public int compressionLevel;
/**
* Creates a parameter set by passing this object to the native initParameters routine
* and then setting asyncBuf to null. The values assigned by initParameters are not
* visible in this file.
*/
public Parameters()
{
initParameters(this);
asyncBuf = null;
}
/**
* Creates a parameter set exactly like the no-argument constructor and then
* stores the given value in modeFlags.
* @param mode a combination of MCO_HAMODE_* bit flags.
*/
public Parameters(int mode)
{
this();
modeFlags = mode;
}
}
/**
* Creates a master connection by passing the database to the Connection constructor.
* @param db the database handed to the superclass constructor.
*/
public MasterConnection(Database db)
{
super(db);
}
/**
* Set the replication mode.
* @param params Replication parameters.
*/
public void setReplicationMode(MasterConnection.Parameters params)
{
setReplicationMode(id, params);
}
/**
* This method is called by the master application (primary master process for a shared memory application)
* to wait for a connection request from a replica.
* @param masterPort the transport-layer-dependent address of the master.
* @param timeout the connection timeout period.
* @return true if the new replica is successfully attached, false if no replicas connect within this period.
*/
public boolean attachReplica(String masterPort, int timeout)
{
return attachReplica(id, masterPort, timeout);
}
/**
* This method detaches (disconnects) the replica identified by the index.
* @param index the index of the replica (valid indexes range from 0 to getNumberOfReplicas()).
* NOTE(review): the upper bound is presumably exclusive - confirm.
*/
public void detachReplica(int index)
{
detachReplica(id, index);
}
/**
* Get the mode flags.
* @return the value reported by the native layer; presumably a combination of
* MCO_HAMODE_* bit flags - confirm.
*/
public int getMode()
{
return getMode(id);
}
/**
* This method detaches (disconnects) all connected replicas.
*/
public void detachAllReplicas()
{
detachAllReplicas(id);
}
/**
* Close this connection: calls the superclass disconnect() first and then hands
* buf to the native releaseResources routine.
* NOTE(review): if super.disconnect() throws, releaseResources(buf) is skipped;
* consider whether a try/finally is wanted here.
*/
public void disconnect()
{
super.disconnect();
releaseResources(buf);
}
/**
* Close this connection. The same as disconnect.
*/
public void close()
{
disconnect();
}
/**
* This method is called by the master to set the transaction packet size. With synchronous replication
* (default mode), a replica does not confirm each of the master's transactions, but a transaction packet
* as a whole. Thus, the larger the window size, the fewer confirmations the replica will send to the
* master. This can improve the speed of replication, but the replica can "lag" behind the master by up
* to window-size transactions.
* @param size the transaction packet size.
*/
public void setTransWindowSize(int size)
{
setTransWindowSize(id, size);
}
/**
* This method is called by the master to force replicas to confirm a transaction package.
* It is useful when the transaction window size is more than 1 (see setTransWindowSize).
*/
public void commitWindow()
{
commitWindow(id);
}
/**
* This method is called by the master to set the quorum - the minimum number of connected replicas
* required for the master database to accept write transactions.
* @param quorum new quorum value.
*/
public void setQuorum(int quorum)
{
setQuorum(id, quorum);
}
/**
* This method returns the current number of replicas attached to the master.
* @return the number of attached replicas.
*/
public int getNumberOfReplicas()
{
return getNumberOfReplicas(id);
}
/**
* This method is used by the master to send the "KEEP_ALIVE" signal to replicas.
* @param timeout timeout value forwarded to the native call (units not specified in this file).
*/
public void keepAlive(int timeout)
{
keepAlive(id, timeout);
}
/**
* This method shuts down the HA subsystem.
*/
public void stopReplication()
{
stopReplication(id);
}
/**
* This method is called by the master if asynchronous replication mode is enabled to send transaction
* data from its buffer to replicas.
* @return true if the replication was successfully performed, false if replication was stopped by method stopReplication.
*/
public boolean asyncSendDataToReplicas()
{
return asyncSendDataToReplicas(id);
}
/**
* This method can be called by a separate thread of the primary master process in order to implement
* synchronization of committed transactions completed by other (secondary) masters attached to a
* shared memory database.
*/
public void transCommitSync()
{
transCommitSync(id);
}
/**
* Simple holder for a replica's IP address and port, as returned by getReplicaAddress.
*/
public static class ReplicaAddress
{
/** Replica IP address. */
public String ip;
/** Replica port. */
public int port;
/**
* Stores the given address components unchanged.
* @param ip replica IP address.
* @param port replica port.
*/
public ReplicaAddress(String ip, int port)
{
this.ip = ip;
this.port = port;
}
}
/**
* Get replica IP address and port.
* @param no replica number
* @return replica connection address
*/
public ReplicaAddress getReplicaAddress(int no)
{
return getReplicaAddress(id, no);
}
/**
* Enable or disable replication filter for "local" classes.
* @param enabled true to enable the filter, false to disable it.
*/
public void enableFilter(boolean enabled)
{
enableFilter(id, enabled);
}
/**
* Get the HA channel information for a replica.
* @param no replica number
* @return the Database.HAChannelInfo produced by the native layer for that replica.
*/
public Database.HAChannelInfo getReplicaInfo(int no)
{
return getReplicaInfo(id, no);
}
// Value handed to the native releaseResources routine on disconnect. It is never
// assigned by Java code in this class; presumably native code sets it - confirm.
long buf;
// Native implementations backing the public methods above. Their names and signatures
// are presumably resolved by name from the native library, so they must not be renamed.
private native boolean attachReplica(long id, String masterPort, int timeout);
private native void detachReplica(long id, int index);
private native void detachAllReplicas(long id);
private native void setReplicationMode(long id, MasterConnection.Parameters params);
private native void releaseResources(long buf);
private native void setTransWindowSize(long id, int size);
private native void setQuorum(long id, int quorum);
private native void commitWindow(long id);
private native int getNumberOfReplicas(long id);
private native void keepAlive(long id, int timeout);
private native void stopReplication(long id);
private native boolean asyncSendDataToReplicas(long id);
private native void transCommitSync(long id);
private native ReplicaAddress getReplicaAddress(long id, int no);
private native void enableFilter(long id, boolean enabled);
private static native void initParameters(MasterConnection.Parameters params);
private native int getMode(long id);
private native Database.HAChannelInfo getReplicaInfo(long id, int no);
}
方法
setQuorum
此方法由主节点调用，用于在运行时设置高可用性（HA）仲裁数。请注意，只有在同步复制模式下，才允许设置大于 0 的仲裁数。
public void setQuorum(int quorum);
参数
- quorum：新的仲裁数，即主数据库接受写事务所需的最少已连接副本数量（可以为 0）
示例
MasterConnection con = new MasterConnection(db);
MasterConnection.Parameters ha_params = new MasterConnection.Parameters();
ha_params.quorum = 1; /* set initial value for HA quorum */
con.setReplicationMode(ha_params);
...
con.setQuorum(0); /* change HA quorum at runtime */