[jboss-cvs] JBoss Messaging SVN: r1764 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/jms/crash tests/src/org/jboss/test/messaging/tools tests/src/org/jboss/test/messaging/tools/jmx tests/src/org/jboss/test/messaging/tools/jmx/rmi
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Dec 11 11:56:16 EST 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-12-11 11:56:06 -0500 (Mon, 11 Dec 2006)
New Revision: 1764
Added:
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/NotificationListenerID.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/ProxyNotificationListener.java
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/StopRMIServer.java
Log:
added a cluster event notification mechanism, various other tests and tweaks
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -22,6 +22,10 @@
package org.jboss.messaging.core.plugin;
import javax.management.ObjectName;
+import javax.management.NotificationListener;
+import javax.management.NotificationFilter;
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanNotificationInfo;
import javax.transaction.TransactionManager;
import org.jboss.jms.selector.SelectorFactory;
import org.jboss.jms.server.JMSConditionFactory;
@@ -103,6 +107,27 @@
return postOffice.getNodeIDView();
}
+ // NotificationBroadcaster implementation ------------------------
+
+ public void addNotificationListener(NotificationListener listener,
+ NotificationFilter filter,
+ Object object) throws IllegalArgumentException
+ {
+ postOffice.addNotificationListener(listener, filter, object);
+ }
+
+ public void removeNotificationListener(NotificationListener listener)
+ throws ListenerNotFoundException
+ {
+ postOffice.removeNotificationListener(listener);
+ }
+
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ return postOffice.getNotificationInfo();
+ }
+
+
// MBean attributes ----------------------------------------------
public synchronized ObjectName getServerPeer()
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -40,6 +40,8 @@
*/
public interface ClusteredPostOffice extends PostOffice, Peer
{
+ public static final String VIEW_CHANGED_NOTIFICATION = "VIEW_CHANGED";
+
/**
* Bind a queue to the post office under a specific condition
* such that it is available across the cluster
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -44,6 +44,12 @@
import javax.jms.TextMessage;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.NotificationListener;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotificationFilter;
+import javax.management.ListenerNotFoundException;
+import javax.management.Notification;
import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.logging.Logger;
@@ -82,7 +88,7 @@
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
- *
+ *
* A DefaultClusteredPostOffice
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -96,6 +102,8 @@
public class DefaultClusteredPostOffice extends DefaultPostOffice
implements ClusteredPostOffice, PostOfficeInternal, Replicator
{
+ // Constants -----------------------------------------------------
+
private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
// Key for looking up node id -> address info mapping from replicated data
@@ -104,22 +112,32 @@
// Key for looking up node id -> failed over for node id mapping from replicated data
public static final String FAILED_OVER_FOR_KEY = "failed_over_for";
- //Used for failure testing
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Used for failure testing
+
private boolean failBeforeCommit;
-
private boolean failAfterCommit;
-
private boolean failHandleResult;
- //End of failure testing attributes
+ // End of failure testing attributes
+
private boolean trace = log.isTraceEnabled();
+ private String groupName;
+
+ private boolean started;
+
+ private Element syncChannelConfigElement;
+ private String syncChannelConfig;
private Channel syncChannel;
+ private Element asyncChannelConfigElement;
+ private String asyncChannelConfig;
private Channel asyncChannel;
- private String groupName;
-
private MessageDispatcher controlMessageDispatcher;
private Object setStateLock = new Object();
@@ -139,14 +157,6 @@
private Set leftSet;
- private Element syncChannelConfigElement;
-
- private Element asyncChannelConfigElement;
-
- private String syncChannelConfig;
-
- private String asyncChannelConfig;
-
private long stateTimeout;
private long castTimeout;
@@ -167,10 +177,13 @@
private ReplicationListener nodeAddressMapListener;
- private boolean started;
-
- private QueuedExecutor viewExecutor;
+ private NotificationBroadcasterSupport nbSupport;
+ private QueuedExecutor viewExecutor;
+
+
+ // Constructors --------------------------------------------------
+
/*
* Constructor using Element for configuration
*/
@@ -197,8 +210,8 @@
throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
- pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
- rf, failoverMapper, statsSendPeriod);
+ pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout,
+ redistributionPolicy, rf, failoverMapper, statsSendPeriod);
this.syncChannelConfigElement = syncChannelConfig;
this.asyncChannelConfigElement = asyncChannelConfig;
@@ -229,8 +242,8 @@
long statsSendPeriod) throws Exception
{
this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
- pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
- rf, failoverMapper, statsSendPeriod);
+ pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout,
+ redistributionPolicy, rf, failoverMapper, statsSendPeriod);
this.syncChannelConfig = syncChannelConfig;
this.asyncChannelConfig = asyncChannelConfig;
@@ -285,12 +298,13 @@
failoverMap = new LinkedHashMap();
leftSet = new HashSet();
-
+
+ nbSupport = new NotificationBroadcasterSupport();
+
viewExecutor = new QueuedExecutor();
}
- // MessagingComponent overrides
- // --------------------------------------------------------------
+ // MessagingComponent overrides ----------------------------------
public synchronized void start() throws Exception
{
@@ -379,6 +393,26 @@
}
}
+ // NotificationBroadcaster implementation ------------------------
+
+ public void addNotificationListener(NotificationListener listener,
+ NotificationFilter filter,
+ Object object) throws IllegalArgumentException
+ {
+ nbSupport.addNotificationListener(listener, filter, object);
+ }
+
+ public void removeNotificationListener(NotificationListener listener)
+ throws ListenerNotFoundException
+ {
+ nbSupport.removeNotificationListener(listener);
+ }
+
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ return new MBeanNotificationInfo[0];
+ }
+
// Peer implementation -------------------------------------------
public Set getNodeIDView()
@@ -424,10 +458,7 @@
public Binding bindClusteredQueue(Condition condition, LocalClusteredQueue queue) throws Exception
{
- if (trace)
- {
- log.trace(this.currentNodeId + " binding clustered queue: " + queue + " with condition: " + condition);
- }
+ if (trace) { log.trace(this.currentNodeId + " binding clustered queue: " + queue + " with condition: " + condition); }
if (queue.getNodeId() != this.currentNodeId)
{
@@ -443,22 +474,9 @@
return binding;
}
- private void sendBindRequest(Condition condition, LocalClusteredQueue queue, Binding binding)
- throws Exception
- {
- BindRequest request =
- new BindRequest(this.currentNodeId, queue.getName(), condition.toText(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
- binding.getQueue().getChannelID(), queue.isRecoverable(), binding.isFailed());
-
- syncSendRequest(request);
- }
-
public Binding unbindClusteredQueue(String queueName) throws Throwable
{
- if (trace)
- {
- log.trace(this.currentNodeId + " unbind clustered queue: " + queueName);
- }
+ if (trace) { log.trace(this.currentNodeId + " unbind clustered queue: " + queueName); }
Binding binding = (Binding)super.unbindQueue(queueName);
@@ -469,274 +487,140 @@
return binding;
}
- public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
+ public Collection listAllBindingsForCondition(Condition condition) throws Exception
{
- if (trace)
- {
- log.trace(this.currentNodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx);
- }
+ return listBindingsForConditionInternal(condition, false);
+ }
- //debug
- try
- {
- TextMessage tm = (TextMessage)ref.getMessage();
-
- log.info(this.currentNodeId + " *********** Routing ref: " + tm.getText() + " with condition " + condition + " and transaction " + tx);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
-
- if (ref == null)
- {
- throw new IllegalArgumentException("Message reference is null");
- }
-
- if (condition == null)
- {
- throw new IllegalArgumentException("Condition is null");
- }
-
- boolean routed = false;
-
+ public Binding getBindingforChannelId(long channelId) throws Exception
+ {
lock.readLock().acquire();
try
{
- ClusteredBindings cb = (ClusteredBindings)conditionMap.get(condition);
+ //First look in the failed map
+ //Failed bindings are stored in the failed map by channel id
+ Map channelMap = (Map)failedBindings.get(new Integer(currentNodeId));
+ Binding binding = null;
+ if (channelMap != null)
+ {
+ binding = (Binding)channelMap.get(new Long(channelId));
+ }
- boolean startInternalTx = false;
-
- int lastNodeId = -1;
-
- if (cb != null)
+ if (binding == null)
{
- if (tx == null && ref.isReliable())
- {
- if (!(cb.getDurableCount() == 0 || (cb.getDurableCount() == 1 && cb.getLocalDurableCount() == 1)))
- {
- // When routing a persistent message without a transaction then we may need to start an
- // internal transaction in order to route it.
- // This is so we can guarantee the message is delivered to all or none of the subscriptions.
- // We need to do this if there is anything other than
- // No durable subs or exactly one local durable sub
- startInternalTx = true;
- if (trace)
- {
- log.trace(this.currentNodeId + " Starting internal transaction since more than one durable sub or remote durable subs");
- }
- }
- }
+ //Not found in the failed map - look in the name map
+ Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
- if (startInternalTx)
+ if (nameMap != null)
{
- tx = tr.createTransaction();
- }
-
- int numberRemote = 0;
-
- Map queueNameNodeIdMap = null;
-
- long lastChannelId = -1;
-
- Collection routers = cb.getRouters();
-
- Iterator iter = routers.iterator();
-
- while (iter.hasNext())
- {
- ClusterRouter router = (ClusterRouter)iter.next();
-
- Delivery del = router.handle(null, ref, tx);
-
- if (del != null && del.isSelectorAccepted())
+ for (Iterator iterbindings = nameMap.values().iterator(); iterbindings.hasNext();)
{
- routed = true;
-
- ClusteredQueue queue = (ClusteredQueue)del.getObserver();
-
- if (trace)
+ Binding itemBinding = (Binding)iterbindings.next();
+ if (itemBinding.getQueue().getChannelID() == channelId)
{
- log.trace(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
- queue.getNodeId() + " local:" + queue.isLocal());
-
+ binding = itemBinding;
+ break;
}
-
- log.info(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
- queue.getNodeId() + " local:" + queue.isLocal());
-
- if (router.numberOfReceivers() > 1)
- {
- //We have now chosen which one will receive the message so we need to add this
- //information to a map which will get sent when casting - so the the queue
- //on the receiving node knows whether to receive the message
- if (queueNameNodeIdMap == null)
- {
- queueNameNodeIdMap = new HashMap();
- }
-
- queueNameNodeIdMap.put(queue.getName(), new Integer(queue.getNodeId()));
- }
-
- if (!queue.isLocal())
- {
- //We need to send the message remotely
- numberRemote++;
-
- lastNodeId = queue.getNodeId();
-
- lastChannelId = queue.getChannelID();
- }
}
}
-
- //Now we've sent the message to any local queues, we might also need
- //to send the message to the other office instances on the cluster if there are
- //queues on those nodes that need to receive the message
-
- //TODO - there is an innefficiency here, numberRemote does not take into account that more than one
- //of the number remote may be on the same node, so we could end up multicasting
- //when unicast would do
- if (numberRemote > 0)
+ else
{
- if (tx == null)
- {
- if (numberRemote == 1)
- {
- if (trace) { log.trace(this.currentNodeId + " unicasting message to " + lastNodeId); }
-
- //Unicast - only one node is interested in the message
- asyncSendRequest(new MessageRequest(condition.toText(), ref.getMessage(), null), lastNodeId);
- }
- else
- {
- if (trace) { log.trace(this.currentNodeId + " multicasting message to group"); }
-
- //Multicast - more than one node is interested
- asyncSendRequest(new MessageRequest(condition.toText(), ref.getMessage(), queueNameNodeIdMap));
- }
- }
- else
- {
- CastMessagesCallback callback = (CastMessagesCallback)tx.getCallback(this);
-
- if (callback == null)
- {
- callback = new CastMessagesCallback(currentNodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
-
- //This callback MUST be executed first
-
- //Execution order is as follows:
- //Before commit:
- //1. Cast messages across network - get added to holding area (if persistent) on receiving
- //nodes
- //2. Persist messages in persistent store
- //After commit
- //1. Cast commit message across network
- tx.addFirstCallback(callback, this);
- }
-
- callback.addMessage(condition, ref.getMessage(), queueNameNodeIdMap,
- numberRemote == 1 ? lastNodeId : -1,
- lastChannelId);
- }
+ log.info("nameMap is null");
}
-
- if (startInternalTx)
- {
- tx.commit();
- if (trace) { log.trace("Committed internal transaction"); }
- }
}
+ log.info("Returned " + binding);
+ return binding;
}
finally
{
lock.readLock().release();
}
-
- return routed;
}
- public boolean isLocal()
- {
- return false;
- }
+ // PostOfficeInternal implementation -----------------------------
- public Collection listAllBindingsForCondition(Condition condition) throws Exception
+ /*
+ * Called when another node adds a binding
+ */
+ public void addBindingFromCluster(int nodeId, String queueName, String conditionText,
+ String filterString, long channelID, boolean durable,
+ boolean failed)
+ throws Exception
{
- return listBindingsForConditionInternal(condition, false);
- }
+ lock.writeLock().acquire();
- public FailoverMapper getFailoverMapper()
- {
- return failoverMapper;
- }
+ if (trace)
+ {
+ log.info(this.currentNodeId + " adding binding from node: " + nodeId +
+ " queue: " + queueName + " with condition: " + conditionText);
+ }
- // Replicator implementation --------------------------------------------------------------------------
+ Condition condition = conditionFactory.createCondition(conditionText);
- public Map get(Serializable key) throws Exception
- {
- synchronized (replicatedData)
+ try
{
- Map m = (Map)replicatedData.get(key);
+ //Sanity
- return m == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(m);
- }
- }
+ if (!knowAboutNodeId(nodeId))
+ {
+ throw new IllegalStateException("Don't know about node id: " + nodeId);
+ }
- public void put(Serializable key, Serializable replicant) throws Exception
- {
- putReplicantLocally(currentNodeId, key, replicant);
+ // We currently only allow one binding per name per node
+ Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
- PutReplicantRequest request = new PutReplicantRequest(currentNodeId, key, replicant);
+ Binding binding = null;
- syncSendRequest(request);
- }
+ if (nameMap != null)
+ {
+ binding = (Binding)nameMap.get(queueName);
+ }
- public boolean remove(Serializable key) throws Exception
- {
- if (removeReplicantLocally(this.currentNodeId, key))
- {
- RemoveReplicantRequest request = new RemoveReplicantRequest(this.currentNodeId, key);
+ if (binding != null && failed)
+ {
+ throw new IllegalArgumentException(this.currentNodeId +
+ " Binding already exists for node Id " + nodeId + " queue name " + queueName);
+ }
- syncSendRequest(request);
+ binding = this.createBinding(nodeId, condition, queueName, channelID, filterString,
+ durable, failed);
- return true;
+ addBinding(binding);
}
- else
+ finally
{
- return false;
+ lock.writeLock().release();
}
+
+ log.info("****** binding added");
}
- public void registerListener(ReplicationListener listener)
+ /*
+ * Called when another node removes a binding
+ */
+ public void removeBindingFromCluster(int nodeId, String queueName) throws Exception
{
- synchronized (replicationListeners)
+ lock.writeLock().acquire();
+
+ if (trace) { log.trace(this.currentNodeId + " removing binding from node: " + nodeId + " queue: " + queueName); }
+
+ try
{
- if (replicationListeners.contains(listener))
+ // Sanity
+ if (!knowAboutNodeId(nodeId))
{
- throw new IllegalArgumentException("Listener " + listener + " is already registered");
+ throw new IllegalStateException("Don't know about node id: " + nodeId);
}
- replicationListeners.add(listener);
+
+ removeBinding(nodeId, queueName);
}
- }
-
- public void unregisterListener(ReplicationListener listener)
- {
- synchronized (replicationListeners)
+ finally
{
- boolean removed = replicationListeners.remove(listener);
-
- if (!removed)
- {
- throw new IllegalArgumentException("Cannot find listener " + listener + " to remove");
- }
+ lock.writeLock().release();
}
}
- // PostOfficeInternal implementation ------------------------------------------------------------------
-
public void handleNodeLeft(int nodeId) throws Exception
{
synchronized (leftSet)
@@ -752,7 +636,7 @@
throws Exception
{
log.info("##########putReplicantLocally received, before lock");
-
+
synchronized (replicatedData)
{
log.info("putReplicantLocally received, after lock");
@@ -769,7 +653,7 @@
notifyListeners(key, m, true, originatorNodeID);
}
-
+
log.info("putReplicantLocally, completed");
}
@@ -806,88 +690,6 @@
}
}
- /*
- * Called when another node adds a binding
- */
- public void addBindingFromCluster(int nodeId, String queueName, String conditionText,
- String filterString, long channelID, boolean durable, boolean failed)
- throws Exception
- {
- lock.writeLock().acquire();
-
- if (trace)
- {
- log.info(this.currentNodeId + " adding binding from node: " + nodeId +
- " queue: " + queueName + " with condition: " + conditionText);
- }
-
- Condition condition = conditionFactory.createCondition(conditionText);
-
- try
- {
- //Sanity
-
- if (!knowAboutNodeId(nodeId))
- {
- throw new IllegalStateException("Don't know about node id: " + nodeId);
- }
-
- // We currently only allow one binding per name per node
- Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
-
- Binding binding = null;
-
- if (nameMap != null)
- {
- binding = (Binding)nameMap.get(queueName);
- }
-
- if (binding != null && failed)
- {
- throw new IllegalArgumentException(this.currentNodeId +
- " Binding already exists for node Id " + nodeId + " queue name " + queueName);
- }
-
- binding = this.createBinding(nodeId, condition, queueName, channelID, filterString, durable, failed);
-
- addBinding(binding);
- }
- finally
- {
- lock.writeLock().release();
- }
-
- log.info("****** binding added");
- }
-
- /*
- * Called when another node removes a binding
- */
- public void removeBindingFromCluster(int nodeId, String queueName) throws Exception
- {
- lock.writeLock().acquire();
-
- if (trace)
- {
- log.trace(this.currentNodeId + " removing binding from node: " + nodeId + " queue: " + queueName);
- }
-
- try
- {
- // Sanity
- if (!knowAboutNodeId(nodeId))
- {
- throw new IllegalStateException("Don't know about node id: " + nodeId);
- }
-
- removeBinding(nodeId, queueName);
- }
- finally
- {
- lock.writeLock().release();
- }
- }
-
public void routeFromCluster(org.jboss.messaging.core.Message message, String routingKeyText,
Map queueNameNodeIdMap) throws Exception
{
@@ -901,9 +703,9 @@
routingKeyText + " map " + queueNameNodeIdMap);
Condition routingKey = conditionFactory.createCondition(routingKeyText);
-
- lock.readLock().acquire();
+ lock.readLock().acquire();
+
// Need to reference the message
MessageReference ref = null;
try
@@ -1063,62 +865,83 @@
if (trace) { log.trace(this.currentNodeId + " committed transaction " + id ); }
}
- /**
- * Check for any transactions that need to be committed or rolled back
- */
- public void check(Integer nodeId) throws Throwable
+ public void updateQueueStats(int nodeId, List statsList) throws Exception
{
- if (trace) { log.trace(this.currentNodeId + " checking for any stranded transactions for node " + nodeId); }
+ lock.readLock().acquire();
- synchronized (holdingArea)
+ if (trace) { log.trace(this.currentNodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
+
+ try
{
- Iterator iter = holdingArea.entrySet().iterator();
+ if (nodeId == this.currentNodeId)
+ {
+ //Sanity check
+ throw new IllegalStateException("Received stats from node with id that matches this nodes id. You may have started two or more nodes with the same node id!");
+ }
- List toRemove = new ArrayList();
+ Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
- while (iter.hasNext())
+ if (nameMap == null)
{
- Map.Entry entry = (Map.Entry)iter.next();
+ //This is ok, the node might have left
+ if (trace) { log.trace(this.currentNodeId + " cannot find node in name map, i guess the node might have left?"); }
+ }
+ else
+ {
+ Iterator iter = statsList.iterator();
- TransactionId id = (TransactionId)entry.getKey();
-
- if (id.getNodeId() == nodeId.intValue())
+ while (iter.hasNext())
{
- ClusterTransaction tx = (ClusterTransaction)entry.getValue();
+ QueueStats st = (QueueStats)iter.next();
- if (trace) { log.trace("Found transaction " + tx + " in holding area"); }
+ Binding bb = (Binding)nameMap.get(st.getQueueName());
- boolean commit = tx.check(this);
-
- if (trace) { log.trace(this.currentNodeId + " transaction " + tx + " will be committed?: " + commit); }
-
- if (commit)
+ if (bb == null)
{
- tx.commit(this);
+ //I guess this is possible if the queue was unbound
+ if (trace) { log.trace(this.currentNodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
}
else
{
- tx.rollback(this);
- }
+ RemoteQueueStub stub = (RemoteQueueStub)bb.getQueue();
- toRemove.add(id);
+ stub.setStats(st);
- if (trace) { log.trace(this.currentNodeId + " resolved " + tx); }
- }
- }
+ if (trace) { log.trace(this.currentNodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
- //Remove the transactions from the holding area
+ ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
- iter = toRemove.iterator();
+ //Maybe the local queue now wants to pull message(s) from the remote queue given that the
+ //stats for the remote queue have changed
+ LocalClusteredQueue localQueue = (LocalClusteredQueue)router.getLocalQueue();
- while (iter.hasNext())
- {
- TransactionId id = (TransactionId)iter.next();
+ if (localQueue!=null)
+ {
+ //TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
+ RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
- holdingArea.remove(id);
+ if (trace) { log.trace(this.currentNodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+
+ localQueue.setPullQueue(toQueue);
+
+ if (toQueue != null && localQueue.getRefCount() == 0)
+ {
+ //We now trigger delivery - this may cause a pull event
+ //We only do this if there are no refs in the local queue
+
+ localQueue.deliver(false);
+
+ if (trace) { log.trace(this.currentNodeId + " triggered delivery for " + localQueue.getName()); }
+ }
+ }
+ }
+ }
}
}
- if (trace) { log.trace(this.currentNodeId + " check complete"); }
+ finally
+ {
+ lock.readLock().release();
+ }
}
public void sendQueueStats() throws Exception
@@ -1180,91 +1003,11 @@
}
}
- public void updateQueueStats(int nodeId, List statsList) throws Exception
- {
- lock.readLock().acquire();
-
- if (trace) { log.trace(this.currentNodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
-
- try
- {
- if (nodeId == this.currentNodeId)
- {
- //Sanity check
- throw new IllegalStateException("Received stats from node with id that matches this nodes id. You may have started two or more nodes with the same node id!");
- }
-
- Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
-
- if (nameMap == null)
- {
- //This is ok, the node might have left
- if (trace) { log.trace(this.currentNodeId + " cannot find node in name map, i guess the node might have left?"); }
- }
- else
- {
- Iterator iter = statsList.iterator();
-
- while (iter.hasNext())
- {
- QueueStats st = (QueueStats)iter.next();
-
- Binding bb = (Binding)nameMap.get(st.getQueueName());
-
- if (bb == null)
- {
- //I guess this is possible if the queue was unbound
- if (trace) { log.trace(this.currentNodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
- }
- else
- {
- RemoteQueueStub stub = (RemoteQueueStub)bb.getQueue();
-
- stub.setStats(st);
-
- if (trace) { log.trace(this.currentNodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
-
- ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
-
- //Maybe the local queue now wants to pull message(s) from the remote queue given that the
- //stats for the remote queue have changed
- LocalClusteredQueue localQueue = (LocalClusteredQueue)router.getLocalQueue();
-
- if (localQueue!=null)
- {
- //TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
- RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
-
- if (trace) { log.trace(this.currentNodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
-
- localQueue.setPullQueue(toQueue);
-
- if (toQueue != null && localQueue.getRefCount() == 0)
- {
- //We now trigger delivery - this may cause a pull event
- //We only do this if there are no refs in the local queue
-
- localQueue.deliver(false);
-
- if (trace) { log.trace(this.currentNodeId + " triggered delivery for " + localQueue.getName()); }
- }
- }
- }
- }
- }
- }
- finally
- {
- lock.readLock().release();
- }
- }
-
public boolean referenceExistsInStorage(long channelID, long messageID) throws Exception
{
return pm.referenceExists(channelID, messageID);
}
-
public void handleMessagePullResult(int remoteNodeId, long holdingTxId,
String queueName, org.jboss.messaging.core.Message message) throws Throwable
{
@@ -1322,320 +1065,348 @@
}
}
- public int getNodeId()
+ // Replicator implementation -------------------------------------
+
+ public void put(Serializable key, Serializable replicant) throws Exception
{
- return currentNodeId;
+ putReplicantLocally(currentNodeId, key, replicant);
+
+ PutReplicantRequest request = new PutReplicantRequest(currentNodeId, key, replicant);
+
+ syncSendRequest(request);
}
- public String toString()
+ public Map get(Serializable key) throws Exception
{
- StringBuffer sb = new StringBuffer("ClusteredPostOffice[");
- sb.append(currentNodeId).append(":").append(getOfficeName()).append(":");
+ synchronized (replicatedData)
+ {
+ Map m = (Map)replicatedData.get(key);
- if (syncChannel == null)
- {
- sb.append("UNINITIALIZED");
+ return m == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(m);
}
- else
- {
- Address addr = syncChannel.getLocalAddress();
- if (addr == null)
- {
- sb.append("UNCONNECTED");
- }
- else
- {
- sb.append(addr);
- }
- }
-
- sb.append("]");
- return sb.toString();
}
- // Public ------------------------------------------------------------------------------------------
-
- //MUST ONLY be used for testing
- public int getNumberOfNodesInCluster()
+ public boolean remove(Serializable key) throws Exception
{
- if (currentView != null)
+ if (removeReplicantLocally(this.currentNodeId, key))
{
- return currentView.size();
+ RemoveReplicantRequest request = new RemoveReplicantRequest(this.currentNodeId, key);
+
+ syncSendRequest(request);
+
+ return true;
}
else
{
- return 0;
+ return false;
}
}
- //MUST ONLY be used for testing
- public void setFail(boolean beforeCommit, boolean afterCommit, boolean handleResult)
+ public void registerListener(ReplicationListener listener)
{
- this.failBeforeCommit = beforeCommit;
- this.failAfterCommit = afterCommit;
- this.failHandleResult = handleResult;
- }
-
- //MUST ONLY be used for testing
- public Collection getHoldingTransactions()
- {
- return holdingArea.values();
- }
-
-
- /**
- * Verifies changes on the View deciding if a node joined or left the cluster
- *
- * */
- private void verifyMembership(View oldView, View newView) throws Throwable
- {
- if (oldView != null)
+ synchronized (replicationListeners)
{
- for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+ if (replicationListeners.contains(listener))
{
- Address address = (Address)i.next();
- if (!newView.containsMember(address))
- {
- nodeLeft(address);
- }
+ throw new IllegalArgumentException("Listener " + listener + " is already registered");
}
+ replicationListeners.add(listener);
}
+ }
- for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+ public void unregisterListener(ReplicationListener listener)
+ {
+ synchronized (replicationListeners)
{
- Address address = (Address)i.next();
- if (oldView == null || !oldView.containsMember(address))
+ boolean removed = replicationListeners.remove(listener);
+
+ if (!removed)
{
- nodeJoined(address);
+ throw new IllegalArgumentException("Cannot find listener " + listener + " to remove");
}
}
}
- /**
- * This method fails over all the queues from node <failedNodeId> onto this node
- * It is triggered when a JGroups view change occurs due to a member leaving and
- * it's determined the member didn't leave cleanly
- *
- * @param failedNodeId
- * @throws Exception
- */
- private void failOver(int failedNodeId) throws Exception
+ public FailoverMapper getFailoverMapper()
{
- //Need to lock
- lock.writeLock().acquire();
+ return failoverMapper;
+ }
+ // Public --------------------------------------------------------
+
+ public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
+ {
+ if (trace) { log.trace(this.currentNodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx); }
+
+ //debug
try
{
- log.info(this.currentNodeId + " is performing failover for node " + failedNodeId);
+ TextMessage tm = (TextMessage)ref.getMessage();
- /*
- We make sure a FailoverStatus object is put in the replicated data for the node
- The real failover node will always add this in.
- This means that each node knows which node has really started the failover for another node, and
- which node did failover for other nodes in the past
- We cannot rely on the failoverMap for this, since that will regenerated once failover is done,
- because of the change in membership.
- And clients may failover after that and need to know if they have the correct node.
- Since this is the first thing we do after detecting failover, it should be very quick that
- all nodes know, however there is still a chance that a client tries to failover before
- the information is replicated.
- */
+ log.info(this.currentNodeId + " *********** Routing ref: " + tm.getText() + " with condition " + condition + " and transaction " + tx);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
- Map replicants = (Map)get(FAILED_OVER_FOR_KEY);
+ if (ref == null)
+ {
+ throw new IllegalArgumentException("Message reference is null");
+ }
- FailoverStatus status = (FailoverStatus)replicants.get(new Integer(currentNodeId));
+ if (condition == null)
+ {
+ throw new IllegalArgumentException("Condition is null");
+ }
- if (status == null)
- {
- status = new FailoverStatus();
- }
+ boolean routed = false;
- status.startFailingOverForNode(failedNodeId);
+ lock.readLock().acquire();
- log.info("Putting state that failover is starting");
-
- put(FAILED_OVER_FOR_KEY, status);
+ try
+ {
+ ClusteredBindings cb = (ClusteredBindings)conditionMap.get(condition);
- log.info("Put state that failover is starting");
+ boolean startInternalTx = false;
- //Get the map of queues for the failed node
+ int lastNodeId = -1;
- Map subMaps = (Map)nameMaps.get(new Integer(failedNodeId));
- if (subMaps==null || subMaps.size()==0)
+ if (cb != null)
{
- log.warn("Couldn't find any binding to failOver from serverId=" +failedNodeId);
- return;
- }
+ if (tx == null && ref.isReliable())
+ {
+ if (!(cb.getDurableCount() == 0 || (cb.getDurableCount() == 1 && cb.getLocalDurableCount() == 1)))
+ {
+ // When routing a persistent message without a transaction then we may need to
+ // start an internal transaction in order to route it. This is so we can guarantee
+ // the message is delivered to all or none of the subscriptions. We need to do
+ // this if there is anything other than. No durable subs or exactly one local
+ // durable sub.
- //Compile a list of the queue names to remove
- //Note that any non durable bindings will already have been removed (in removeDataForNode()) when the
- //node leave was detected, so if there are any non durable bindings left here then
- //this is an error
+ startInternalTx = true;
- //We iterate through twice to avoid ConcurrentModificationException
- ArrayList namesToRemove = new ArrayList();
- for (Iterator iterNames = subMaps.entrySet().iterator(); iterNames.hasNext();)
- {
- Map.Entry entry = (Map.Entry)iterNames.next();
-
- Binding binding = (Binding )entry.getValue();
-
- //Sanity check
- if (!binding.getQueue().isRecoverable())
- {
- throw new IllegalStateException("Find non recoverable queue in map, these should have been removed!");
+ if (trace) { log.trace(this.currentNodeId + " Starting internal transaction since more than one durable sub or remote durable subs"); }
+ }
}
- //Sanity check
- if (!binding.getQueue().isClustered())
+ if (startInternalTx)
{
- throw new IllegalStateException("Queue is not clustered!: " + binding.getQueue().getName());
+ tx = tr.createTransaction();
}
- ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
+ int numberRemote = 0;
- //Sanity check
- if (queue.isLocal())
+ Map queueNameNodeIdMap = null;
+
+ long lastChannelId = -1;
+
+ Collection routers = cb.getRouters();
+
+ Iterator iter = routers.iterator();
+
+ while (iter.hasNext())
{
- throw new IllegalStateException("Queue is local!: " + binding.getQueue().getName());
- }
- namesToRemove.add(entry);
- }
+ ClusterRouter router = (ClusterRouter)iter.next();
- log.info("Deleting " + namesToRemove.size() + " bindings from old node");
+ Delivery del = router.handle(null, ref, tx);
- for (Iterator iterNames = namesToRemove.iterator(); iterNames.hasNext();)
- {
- Map.Entry entry = (Map.Entry)iterNames.next();
+ if (del != null && del.isSelectorAccepted())
+ {
+ routed = true;
- Binding binding = (Binding)entry.getValue();
+ ClusteredQueue queue = (ClusteredQueue)del.getObserver();
- RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
+ if (trace) { log.trace(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " + queue.getNodeId() + " local:" + queue.isLocal()); }
- String queueName = (String)entry.getKey();
+ log.info(this.currentNodeId + " Routing message to queue or stub:" +
+ queue.getName() + " on node " + queue.getNodeId() + " local:" +
+ queue.isLocal());
- //First the binding is removed from the in memory condition and name maps
- this.removeBinding(failedNodeId, queueName);
+ if (router.numberOfReceivers() > 1)
+ {
+ //We have now chosen which one will receive the message so we need to add this
+ //information to a map which will get sent when casting - so the the queue
+ //on the receiving node knows whether to receive the message
+ if (queueNameNodeIdMap == null)
+ {
+ queueNameNodeIdMap = new HashMap();
+ }
- //Then deleted from the database
- this.deleteBinding(failedNodeId, queueName);
+ queueNameNodeIdMap.put(queue.getName(), new Integer(queue.getNodeId()));
+ }
- log.info("deleted binding for " + queueName);
-
- //Note we do not need to send an unbind request across the cluster - this is because
- //when the node crashes a view change will hit the other nodes and that will cause
- //all binding data for that node to be removed anyway
+ if (!queue.isLocal())
+ {
+ //We need to send the message remotely
+ numberRemote++;
- //If there is already a queue registered with the same name, then we set a flag "failed" on the
- //binding and then the queue will go into a special list of failed bindings
- //otherwise we treat at as a normal queue
- //This is because we cannot deal with more than one queue with the same name
- //Any new consumers will always only connect to queues in the main name map
- //This may mean that queues in the failed map have messages stranded in them if consumers
- //disconnect (since no more can reconnect)
- //However we message redistribution activated other queues will be able to consume from them.
- //TODO allow message redistribution for queues in the failed list
- boolean failed = this.internalGetBindingForQueueName(queueName) != null;
+ lastNodeId = queue.getNodeId();
- if (!failed)
- {
- log.info("The current node didn't have a queue " + queueName + " so it's assuming the queue as a regular queue");
+ lastChannelId = queue.getChannelID();
+ }
+ }
}
- else
- {
- log.info("There is already a queue with that name so adding to failed map");
- }
- //Create a new binding
- Binding newBinding = this.createBinding(this.currentNodeId, binding.getCondition(),
- stub.getName(), stub.getChannelID(),
- stub.getFilter(), stub.isRecoverable(), failed);
+ //Now we've sent the message to any local queues, we might also need
+ //to send the message to the other office instances on the cluster if there are
+ //queues on those nodes that need to receive the message
- log.info("Created new binding");
+ //TODO - there is an innefficiency here, numberRemote does not take into account that more than one
+ //of the number remote may be on the same node, so we could end up multicasting
+ //when unicast would do
+ if (numberRemote > 0)
+ {
+ if (tx == null)
+ {
+ if (numberRemote == 1)
+ {
+ if (trace) { log.trace(this.currentNodeId + " unicasting message to " + lastNodeId); }
- //Insert it into the database
- insertBinding(newBinding);
+ //Unicast - only one node is interested in the message
+ asyncSendRequest(new MessageRequest(condition.toText(), ref.getMessage(), null), lastNodeId);
+ }
+ else
+ {
+ if (trace) { log.trace(this.currentNodeId + " multicasting message to group"); }
- LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
+ //Multicast - more than one node is interested
+ asyncSendRequest(new MessageRequest(condition.toText(), ref.getMessage(), queueNameNodeIdMap));
+ }
+ }
+ else
+ {
+ CastMessagesCallback callback = (CastMessagesCallback)tx.getCallback(this);
- clusteredQueue.deactivate();
- clusteredQueue.load();
- clusteredQueue.activate();
+ if (callback == null)
+ {
+ callback = new CastMessagesCallback(currentNodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
- log.info("Loaded queue");
+ //This callback MUST be executed first
- //Add the new binding in memory
- addBinding(newBinding);
+ //Execution order is as follows:
+ //Before commit:
+ //1. Cast messages across network - get added to holding area (if persistent) on receiving
+ //nodes
+ //2. Persist messages in persistent store
+ //After commit
+ //1. Cast commit message across network
+ tx.addFirstCallback(callback, this);
+ }
- //Send a bind request so other nodes add it too
- sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
+ callback.addMessage(condition, ref.getMessage(), queueNameNodeIdMap,
+ numberRemote == 1 ? lastNodeId : -1,
+ lastChannelId);
+ }
+ }
- //FIXME there is a problem in the above code.
- //If the server crashes between deleting the binding from the database
- //and creating the new binding in the database, then the binding will be completely
- //lost from the database when the server is resurrected.
- //To remedy, both db operations need to be done in the same JBDC tx
+ if (startInternalTx)
+ {
+ tx.commit();
+ if (trace) { log.trace("Committed internal transaction"); }
+ }
}
-
- log.info("Server side fail over is now complete");
-
- //TODO - should this be in a finally? I'm not sure
- status.finishFailingOver();
-
- log.info("Putting state that failover has completed");
- put(FAILED_OVER_FOR_KEY, status);
- log.info("Put state that failover has completed");
}
finally
{
- lock.writeLock().release();
+ lock.readLock().release();
}
+
+ return routed;
}
- public Binding getBindingforChannelId(long channelId) throws Exception
+ public boolean isLocal()
{
- lock.readLock().acquire();
+ return false;
+ }
- try
+ /**
+ * Check for any transactions that need to be committed or rolled back
+ */
+ public void check(Integer nodeId) throws Throwable
+ {
+ if (trace) { log.trace(this.currentNodeId + " checking for any stranded transactions for node " + nodeId); }
+
+ synchronized (holdingArea)
{
- //First look in the failed map
- //Failed bindings are stored in the failed map by channel id
- Map channelMap = (Map)failedBindings.get(new Integer(currentNodeId));
- Binding binding = null;
- if (channelMap != null)
- {
- binding = (Binding)channelMap.get(new Long(channelId));
- }
+ Iterator iter = holdingArea.entrySet().iterator();
- if (binding == null)
+ List toRemove = new ArrayList();
+
+ while (iter.hasNext())
{
- //Not found in the failed map - look in the name map
- Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
+ Map.Entry entry = (Map.Entry)iter.next();
- if (nameMap != null)
+ TransactionId id = (TransactionId)entry.getKey();
+
+ if (id.getNodeId() == nodeId.intValue())
{
- for (Iterator iterbindings = nameMap.values().iterator(); iterbindings.hasNext();)
+ ClusterTransaction tx = (ClusterTransaction)entry.getValue();
+
+ if (trace) { log.trace("Found transaction " + tx + " in holding area"); }
+
+ boolean commit = tx.check(this);
+
+ if (trace) { log.trace(this.currentNodeId + " transaction " + tx + " will be committed?: " + commit); }
+
+ if (commit)
{
- Binding itemBinding = (Binding)iterbindings.next();
- if (itemBinding.getQueue().getChannelID() == channelId)
- {
- binding = itemBinding;
- break;
- }
+ tx.commit(this);
}
+ else
+ {
+ tx.rollback(this);
+ }
+
+ toRemove.add(id);
+
+ if (trace) { log.trace(this.currentNodeId + " resolved " + tx); }
}
- else
- {
- log.info("nameMap is null");
- }
}
- log.info("Returned " + binding);
- return binding;
+
+ //Remove the transactions from the holding area
+
+ iter = toRemove.iterator();
+
+ while (iter.hasNext())
+ {
+ TransactionId id = (TransactionId)iter.next();
+
+ holdingArea.remove(id);
+ }
}
- finally
+ if (trace) { log.trace(this.currentNodeId + " check complete"); }
+ }
+
+ public int getNodeId()
+ {
+ return currentNodeId;
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer("ClusteredPostOffice[");
+ sb.append(currentNodeId).append(":").append(getOfficeName()).append(":");
+
+ if (syncChannel == null)
{
- lock.readLock().release();
+ sb.append("UNINITIALIZED");
}
+ else
+ {
+ Address addr = syncChannel.getLocalAddress();
+ if (addr == null)
+ {
+ sb.append("UNCONNECTED");
+ }
+ else
+ {
+ sb.append(addr);
+ }
+ }
+
+ sb.append("]");
+ return sb.toString();
}
public String printBindingInformation()
@@ -1747,10 +1518,27 @@
return buffer.toString();
}
+ /**
+ * MUST ONLY be used for testing!
+ */
+ public void setFail(boolean beforeCommit, boolean afterCommit, boolean handleResult)
+ {
+ this.failBeforeCommit = beforeCommit;
+ this.failAfterCommit = afterCommit;
+ this.failHandleResult = handleResult;
+ }
+ /**
+ * MUST ONLY be used for testing!
+ */
+ public Collection getHoldingTransactions()
+ {
+ return holdingArea.values();
+ }
- // Protected ---------------------------------------------------------------------------------------
+ // Package protected ---------------------------------------------
+ // Protected -----------------------------------------------------
protected void addToNameMap(Binding binding)
{
@@ -1764,21 +1552,6 @@
}
}
- private void addIntoFailedMaps(Binding binding)
- {
- Map channelMap = (Map)failedBindings.get(new Integer(binding.getNodeId()));
-
- if (channelMap == null)
- {
- channelMap = new LinkedHashMap();
-
- failedBindings.put(new Integer(binding.getNodeId()), channelMap);
- }
-
- channelMap.put(new Long(binding.getQueue().getChannelID()), binding);
- }
-
-
protected void addToConditionMap(Binding binding)
{
Condition condition = binding.getCondition();
@@ -1905,8 +1678,18 @@
return new DefaultBinding(nodeId, condition, queue, failed);
}
- // Private ------------------------------------------------------------------------------------------
+ // Private -------------------------------------------------------
+ private void sendBindRequest(Condition condition, LocalClusteredQueue queue, Binding binding)
+ throws Exception
+ {
+ BindRequest request =
+ new BindRequest(this.currentNodeId, queue.getName(), condition.toText(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
+ binding.getQueue().getChannelID(), queue.isRecoverable(), binding.isFailed());
+
+ syncSendRequest(request);
+ }
+
private boolean leaveMessageReceived(Integer nodeId) throws Exception
{
synchronized (leftSet)
@@ -2168,7 +1951,7 @@
BindingInfo info = (BindingInfo)iter.next();
Condition condition = conditionFactory.createCondition(info.getConditionText());
-
+
Binding binding = this.createBinding(info.getNodeId(), condition, info.getQueueName(), info.getChannelId(),
info.getFilterString(), info.isDurable(),info.isFailed());
@@ -2273,9 +2056,6 @@
}
}
}
-
-
-
/*
* A new node has joined the group
*/
@@ -2312,58 +2092,311 @@
check(theNodeId);
synchronized (failoverMap)
- {
+ {
//Need to evaluate this before we regenerate the failover map
- Integer failoverNode = (Integer)failoverMap.get(theNodeId);
-
+ Integer failoverNode = (Integer)failoverMap.get(theNodeId);
+
if (failoverNode == null)
{
throw new IllegalStateException("Cannot find failover node for node " + theNodeId);
}
-
+
//debug dump failover map
-
+
Iterator iter = failoverMap.entrySet().iterator();
-
+
log.info("Dumping failover map");
while (iter.hasNext())
{
Map.Entry entry = (Map.Entry)iter.next();
-
+
Integer nodeId = (Integer)entry.getKey();
-
+
Integer failoverNodeId = (Integer)entry.getValue();
-
+
log.info("node->failover node: " + nodeId + " --> " + failoverNodeId);
}
log.info("end dump");
-
+
//end debug
-
+
boolean isFailover = failoverNode.intValue() == this.currentNodeId;
-
+
log.info("Am I failover node for node " + theNodeId + "? " + isFailover);
-
+
log.info("Crashed: " + crashed);
-
+
//Remove any replicant data and non durable bindings for the node - again we need to do this
//irrespective of whether we crashed
//This will notify any listeners which will recalculate the connection factory delegates and failover delegates
removeDataForNode(theNodeId);
-
+
if (crashed && isFailover)
{
//The node crashed and we are the failover node
//so let's perform failover
-
+
//TODO server side valve
-
+
failOver(theNodeId.intValue());
}
}
}
+ /**
+ * Verifies changes on the View deciding if a node joined or left the cluster.
+ */
+ private void verifyMembership(View oldView, View newView) throws Throwable
+ {
+ if (oldView != null)
+ {
+ for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (!newView.containsMember(address))
+ {
+ nodeLeft(address);
+ }
+ }
+ }
+ for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (oldView == null || !oldView.containsMember(address))
+ {
+ nodeJoined(address);
+ }
+ }
+ }
+
+ /**
+ * This method fails over all the queues from node <failedNodeId> onto this node
+ * It is triggered when a JGroups view change occurs due to a member leaving and
+ * it's determined the member didn't leave cleanly
+ *
+ * @param failedNodeId
+ * @throws Exception
+ */
+ private void failOver(int failedNodeId) throws Exception
+ {
+ //Need to lock
+ lock.writeLock().acquire();
+
+ try
+ {
+ log.info(this.currentNodeId + " is performing failover for node " + failedNodeId);
+
+ /*
+ We make sure a FailoverStatus object is put in the replicated data for the node
+ The real failover node will always add this in.
+ This means that each node knows which node has really started the failover for another node, and
+ which node did failover for other nodes in the past
+ We cannot rely on the failoverMap for this, since that will regenerated once failover is done,
+ because of the change in membership.
+ And clients may failover after that and need to know if they have the correct node.
+ Since this is the first thing we do after detecting failover, it should be very quick that
+ all nodes know, however there is still a chance that a client tries to failover before
+ the information is replicated.
+ */
+
+ Map replicants = (Map)get(FAILED_OVER_FOR_KEY);
+
+ FailoverStatus status = (FailoverStatus)replicants.get(new Integer(currentNodeId));
+
+ if (status == null)
+ {
+ status = new FailoverStatus();
+ }
+
+ status.startFailingOverForNode(failedNodeId);
+
+ log.info("Putting state that failover is starting");
+
+ put(FAILED_OVER_FOR_KEY, status);
+
+ log.info("Put state that failover is starting");
+
+ //Get the map of queues for the failed node
+
+ Map subMaps = (Map)nameMaps.get(new Integer(failedNodeId));
+ if (subMaps==null || subMaps.size()==0)
+ {
+ log.warn("Couldn't find any binding to failOver from serverId=" +failedNodeId);
+ return;
+ }
+
+ //Compile a list of the queue names to remove
+ //Note that any non durable bindings will already have been removed (in removeDataForNode()) when the
+ //node leave was detected, so if there are any non durable bindings left here then
+ //this is an error
+
+ //We iterate through twice to avoid ConcurrentModificationException
+ ArrayList namesToRemove = new ArrayList();
+ for (Iterator iterNames = subMaps.entrySet().iterator(); iterNames.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry)iterNames.next();
+
+ Binding binding = (Binding )entry.getValue();
+
+ //Sanity check
+ if (!binding.getQueue().isRecoverable())
+ {
+ throw new IllegalStateException("Find non recoverable queue in map, these should have been removed!");
+ }
+
+ //Sanity check
+ if (!binding.getQueue().isClustered())
+ {
+ throw new IllegalStateException("Queue is not clustered!: " + binding.getQueue().getName());
+ }
+
+ ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
+
+ //Sanity check
+ if (queue.isLocal())
+ {
+ throw new IllegalStateException("Queue is local!: " + binding.getQueue().getName());
+ }
+ namesToRemove.add(entry);
+ }
+
+ log.info("Deleting " + namesToRemove.size() + " bindings from old node");
+
+ for (Iterator iterNames = namesToRemove.iterator(); iterNames.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry)iterNames.next();
+
+ Binding binding = (Binding)entry.getValue();
+
+ RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
+
+ String queueName = (String)entry.getKey();
+
+ //First the binding is removed from the in memory condition and name maps
+ this.removeBinding(failedNodeId, queueName);
+
+ //Then deleted from the database
+ this.deleteBinding(failedNodeId, queueName);
+
+ log.info("deleted binding for " + queueName);
+
+ //Note we do not need to send an unbind request across the cluster - this is because
+ //when the node crashes a view change will hit the other nodes and that will cause
+ //all binding data for that node to be removed anyway
+
+ //If there is already a queue registered with the same name, then we set a flag "failed" on the
+ //binding and then the queue will go into a special list of failed bindings
+ //otherwise we treat at as a normal queue
+ //This is because we cannot deal with more than one queue with the same name
+ //Any new consumers will always only connect to queues in the main name map
+ //This may mean that queues in the failed map have messages stranded in them if consumers
+ //disconnect (since no more can reconnect)
+ //However we message redistribution activated other queues will be able to consume from them.
+ //TODO allow message redistribution for queues in the failed list
+ boolean failed = this.internalGetBindingForQueueName(queueName) != null;
+
+ if (!failed)
+ {
+ log.info("The current node didn't have a queue " + queueName + " so it's assuming the queue as a regular queue");
+ }
+ else
+ {
+ log.info("There is already a queue with that name so adding to failed map");
+ }
+
+ //Create a new binding
+ Binding newBinding = this.createBinding(this.currentNodeId, binding.getCondition(),
+ stub.getName(), stub.getChannelID(),
+ stub.getFilter(), stub.isRecoverable(), failed);
+
+ log.info("Created new binding");
+
+ //Insert it into the database
+ insertBinding(newBinding);
+
+ LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
+
+ clusteredQueue.deactivate();
+ clusteredQueue.load();
+ clusteredQueue.activate();
+
+ log.info("Loaded queue");
+
+ //Add the new binding in memory
+ addBinding(newBinding);
+
+ //Send a bind request so other nodes add it too
+ sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
+
+ //FIXME there is a problem in the above code.
+ //If the server crashes between deleting the binding from the database
+ //and creating the new binding in the database, then the binding will be completely
+ //lost from the database when the server is resurrected.
+ //To remedy, both db operations need to be done in the same JBDC tx
+ }
+
+ log.info("Server side fail over is now complete");
+
+ //TODO - should this be in a finally? I'm not sure
+ status.finishFailingOver();
+
+ log.info("Putting state that failover has completed");
+ put(FAILED_OVER_FOR_KEY, status);
+ log.info("Put state that failover has completed");
+ }
+ finally
+ {
+ lock.writeLock().release();
+ }
+ }
+
+ private void addIntoFailedMaps(Binding binding)
+ {
+ Map channelMap = (Map)failedBindings.get(new Integer(binding.getNodeId()));
+
+ if (channelMap == null)
+ {
+ channelMap = new LinkedHashMap();
+
+ failedBindings.put(new Integer(binding.getNodeId()), channelMap);
+ }
+
+ channelMap.put(new Long(binding.getQueue().getChannelID()), binding);
+ }
+
+ private void sendJMXNotification(String notificationType)
+ {
+ Notification n = new Notification(notificationType, "", 0l);
+ nbSupport.sendNotification(n);
+ }
+
+ private void handleViewAccepted(View newView)
+ {
+ //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
+ //TODO: can't we do the same since this is pretty useful?
+ log.info(currentNodeId + " got new view: " + newView + " postOffice:"
+ + DefaultClusteredPostOffice.this.getOfficeName());
+
+ // JGroups will make sure this method is never called by more than one thread concurrently
+
+ View oldView = currentView;
+ currentView = newView;
+
+ try
+ {
+ verifyMembership(oldView, newView);
+ sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+ }
+ catch (Throwable e)
+ {
+ log.error("Caught Exception in MembershipListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+ }
+
// Inner classes -------------------------------------------------------------------
/*
@@ -2462,10 +2495,11 @@
{
try
{
- //We queue up changes and execute them asynchronously.
- //This is because JGroups will not let us do stuff like send synch messages
- //using the same thread that delivered the view change and this is what we need to
- //do in failover, for example.
+ // We queue up changes and execute them asynchronously.
+ // This is because JGroups will not let us do stuff like send synch messages using the
+ // same thread that delivered the view change and this is what we need to do in
+ // failover, for example.
+
viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
}
catch (InterruptedException e)
@@ -2480,41 +2514,16 @@
return null;
}
}
-
- private void handleViewAccepted(View newView)
- {
- //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
- //TODO: can't we do the same since this is pretty useful?
- log.info(currentNodeId + " got new view: " + newView + " postOffice:"
- + DefaultClusteredPostOffice.this.getOfficeName());
- // JGroups will make sure this method is never called by more than one thread concurrently
-
- View oldView = currentView;
- currentView = newView;
-
- try
- {
- verifyMembership(oldView, newView);
- }
- catch (Throwable e)
- {
- log.error("Caught Exception in MembershipListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
- }
-
private class HandleViewAcceptedRunnable implements Runnable
{
private View newView;
-
+
HandleViewAcceptedRunnable(View newView)
{
this.newView = newView;
}
-
+
public void run()
{
handleViewAccepted(newView);
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -6,8 +6,10 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
+import javax.management.NotificationBroadcaster;
import java.util.Set;
+
/**
* Group management interface.
*
@@ -15,10 +17,12 @@
* @version <tt>$Revision$</tt>
* $Id$
*/
-public interface Peer
+public interface Peer extends NotificationBroadcaster
{
/**
* Returns a set of nodeIDs (integers) representing the IDs of cluster's nodes.
*/
Set getNodeIDView();
+
+
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -8,9 +8,15 @@
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import javax.management.NotificationListener;
+import javax.management.Notification;
+import javax.management.ObjectName;
import java.util.Set;
+import EDU.oswego.cs.dl.util.concurrent.Slot;
+
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @version <tt>$Revision$</tt>
@@ -51,6 +57,44 @@
}
}
+ public void testJoinNotification() throws Exception
+ {
+ ViewChangeNotificationListener listener = new ViewChangeNotificationListener();
+ ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
+
+ try
+ {
+ ServerManagement.start("all", 0);
+
+ log.info("Server 0 started");
+
+ ServerManagement.addNotificationListener(0, postOfficeObjectName, listener);
+
+ log.info("NotificationListener added to server 0");
+
+ ServerManagement.start("all", 1);
+
+ log.info("Blocking to receive notification ...");
+
+ if (!listener.viewChanged(120000))
+ {
+ fail("Did not receive view change!");
+ }
+
+ Set view = ServerManagement.getServer(1).getNodeIDView();
+
+ assertEquals(2, view.size());
+ assertTrue(view.contains(new Integer(0)));
+ assertTrue(view.contains(new Integer(1)));
+ }
+ finally
+ {
+ ServerManagement.removeNotificationListener(0, postOfficeObjectName, listener);
+ ServerManagement.stop(1);
+ ServerManagement.stop(0);
+ }
+ }
+
public void testTwoNodesCluster() throws Exception
{
try
@@ -146,11 +190,196 @@
public void testCleanLeave() throws Exception
{
+ try
+ {
+ // Start with a 3 node cluster
+ ServerManagement.start("all", 0);
+ ServerManagement.start("all", 1);
+ ServerManagement.start("all", 2);
+
+ Set view = ServerManagement.getServer(0).getNodeIDView();
+
+ assertEquals(3, view.size());
+ assertTrue(view.contains(new Integer(0)));
+ assertTrue(view.contains(new Integer(1)));
+ assertTrue(view.contains(new Integer(2)));
+
+ // Make node 0 to "cleanly" leave the cluster
+
+ ServerManagement.stop(0);
+
+ view = ServerManagement.getServer(1).getNodeIDView();
+
+ assertEquals(2, view.size());
+ assertTrue(view.contains(new Integer(1)));
+ assertTrue(view.contains(new Integer(2)));
+
+ // Make node 2 to "cleanly" leave the cluster
+
+ ServerManagement.stop(2);
+
+ view = ServerManagement.getServer(1).getNodeIDView();
+
+ assertEquals(1, view.size());
+ assertTrue(view.contains(new Integer(1)));
+
+ // Reuse the "hollow" RMI server 0 to start another cluster node
+
+ ServerManagement.start("all", 0);
+
+ view = ServerManagement.getServer(0).getNodeIDView();
+
+ assertEquals(2, view.size());
+ assertTrue(view.contains(new Integer(0)));
+ assertTrue(view.contains(new Integer(1)));
+
+
+ // Reuse the "hollow" RMI server 2 to start another cluster node
+
+ ServerManagement.start("all", 2);
+
+ view = ServerManagement.getServer(2).getNodeIDView();
+
+ assertEquals(3, view.size());
+ assertTrue(view.contains(new Integer(0)));
+ assertTrue(view.contains(new Integer(1)));
+ assertTrue(view.contains(new Integer(2)));
+
+ }
+ finally
+ {
+ ServerManagement.stop(2);
+ ServerManagement.stop(1);
+ ServerManagement.stop(0);
+ }
}
+ public void testDirtyLeaveOneNode() throws Exception
+ {
+ ViewChangeNotificationListener viewChange = new ViewChangeNotificationListener();
+ ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
+ try
+ {
+ // Start with a 2 node cluster
+ ServerManagement.start("all", 0);
+ ServerManagement.start("all", 1);
+
+ Set view = ServerManagement.getServer(0).getNodeIDView();
+
+ assertEquals(2, view.size());
+ assertTrue(view.contains(new Integer(0)));
+ assertTrue(view.contains(new Integer(1)));
+
+ ServerManagement.addNotificationListener(0, postOfficeObjectName, viewChange);
+
+ // Make node 1 to "dirty" leave the cluster, by killing the VM running it.
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED 1");
+ log.info("########");
+
+ // Wait for membership change notification
+
+ if (!viewChange.viewChanged(120000))
+ {
+ fail("Did not receive view change after killing server 2!");
+ }
+
+ view = ServerManagement.getServer(0).getNodeIDView();
+
+ assertEquals(1, view.size());
+ assertTrue(view.contains(new Integer(0)));
+ }
+ finally
+ {
+ ServerManagement.removeNotificationListener(0, postOfficeObjectName, viewChange);
+
+ ServerManagement.stop(1);
+ ServerManagement.stop(0);
+ }
+ }
+
+ public void testDirtyLeaveTwoNodes() throws Exception
+ {
+ ViewChangeNotificationListener viewChange = new ViewChangeNotificationListener();
+ ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
+
+ try
+ {
+ // Start with a 3 node cluster
+
+ ServerManagement.start("all", 0);
+ ServerManagement.start("all", 1);
+ ServerManagement.start("all", 2);
+
+ Set view = ServerManagement.getServer(0).getNodeIDView();
+
+ assertEquals(3, view.size());
+ assertTrue(view.contains(new Integer(0)));
+ assertTrue(view.contains(new Integer(1)));
+ assertTrue(view.contains(new Integer(2)));
+
+ ServerManagement.addNotificationListener(0, postOfficeObjectName, viewChange);
+
+ // Make node 2 to "dirty" leave the cluster, by killing the VM running it.
+
+ ServerManagement.kill(2);
+
+ log.info("########");
+ log.info("######## KILLED 2");
+ log.info("########");
+
+ // Wait for membership change notification
+
+ if (!viewChange.viewChanged(120000))
+ {
+ fail("Did not receive view change after killing server 2!");
+ }
+
+ view = ServerManagement.getServer(1).getNodeIDView();
+
+ assertEquals(2, view.size());
+ assertTrue(view.contains(new Integer(0)));
+ assertTrue(view.contains(new Integer(1)));
+
+ // Make node 1 to "dirty" leave the cluster, by killing the VM running it.
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED 1");
+ log.info("########");
+
+ // Wait for membership change notification
+
+ if (!viewChange.viewChanged(120000))
+ {
+ fail("Did not receive view change after killing server 1!");
+ }
+
+ view = ServerManagement.getServer(0).getNodeIDView();
+
+ assertEquals(1, view.size());
+ assertTrue(view.contains(new Integer(0)));
+
+ }
+ finally
+ {
+ ServerManagement.removeNotificationListener(0, postOfficeObjectName, viewChange);
+
+ ServerManagement.stop(2);
+ ServerManagement.stop(1);
+ ServerManagement.stop(0);
+ }
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -169,4 +398,45 @@
// Inner classes -------------------------------------------------
+ private class ViewChangeNotificationListener implements NotificationListener
+ {
+ private Slot slot;
+
+ ViewChangeNotificationListener()
+ {
+ slot = new Slot();
+ }
+
+ public void handleNotification(Notification notification, Object object)
+ {
+
+ if (!ClusteredPostOffice.VIEW_CHANGED_NOTIFICATION.equals(notification.getType()))
+ {
+ // ignore it
+ return;
+ }
+
+ log.info("received VIEW_CHANGED notification");
+
+ try
+ {
+ slot.put(Boolean.TRUE);
+ }
+ catch(InterruptedException e)
+ {
+ log.error(e);
+ }
+ }
+
+ public boolean viewChanged(long timeout) throws InterruptedException
+ {
+ Boolean result = (Boolean)slot.poll(timeout);
+ if (result == null)
+ {
+ return false;
+ }
+ return result.booleanValue();
+ }
+ }
+
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -38,7 +38,6 @@
import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.message.MessageProxy;
import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
import org.jboss.test.messaging.tools.ServerManagement;
@@ -503,8 +502,8 @@
log.info("************ KILLING (CRASHING) SERVER 1");
- ServerManagement.getServer(1).destroy();
-
+ ServerManagement.getServer(1).kill();
+
log.info("killed server, now waiting");
Thread.sleep(5000);
@@ -837,28 +836,28 @@
// Private -------------------------------------------------------
- private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
- {
- MessageProxy message = (MessageProxy) consumer.receive(3000);
- TextMessage txtMessage = (TextMessage) message;
- if (message != null)
- {
- log.info(text + ": messageID from messageReceived=" + message.getMessage().getMessageID() + " message = " + message + " content=" + txtMessage.getText());
- } else
- {
- log.info(text + ": Message received was null");
- }
- if (shouldAssert)
- {
- if (shouldBeNull)
- {
- assertNull(message);
- } else
- {
- assertNotNull(message);
- }
- }
- }
+// private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
+// {
+// MessageProxy message = (MessageProxy) consumer.receive(3000);
+// TextMessage txtMessage = (TextMessage) message;
+// if (message != null)
+// {
+// log.info(text + ": messageID from messageReceived=" + message.getMessage().getMessageID() + " message = " + message + " content=" + txtMessage.getText());
+// } else
+// {
+// log.info(text + ": Message received was null");
+// }
+// if (shouldAssert)
+// {
+// if (shouldBeNull)
+// {
+// assertNull(message);
+// } else
+// {
+// assertNotNull(message);
+// }
+// }
+// }
// Inner classes -------------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -103,7 +103,7 @@
// public void testKill() throws Exception
// {
-// ServerManagement.getServer(0).destroy();
+// ServerManagement.getServer(0).kill();
// }
public void testDistributedTopic() throws Exception
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -116,7 +116,7 @@
String remotingSessionId = (String)remoteServer.executeCommand(command);
- remoteServer.destroy();
+ remoteServer.kill();
//we have removed the exception listener so the server side resouces shouldn't be cleared up
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -115,7 +115,7 @@
// Now we should have a client connection from the remote server to the local server
- remoteServer.destroy();
+ remoteServer.kill();
log.trace("killed remote server");
// Wait for connection resources to be cleared up
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -117,7 +117,7 @@
// Now we should have a client connection from the remote server to the local server
- remoteServer.destroy();
+ remoteServer.kill();
log.trace("killed remote server");
// Wait for connection resources to be cleared up
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -119,7 +119,7 @@
// Now we should have a client connection from the remote server to the local server
- remoteServer.destroy();
+ remoteServer.kill();
log.trace("killed remote server");
// Wait for connection resources to be cleared up
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -121,7 +121,7 @@
log.info("we have = " + ((SimpleConnectionManager)cm).getClients().size() + " clients registered on SimpleconnectionManager");
// Now we should have a client connection from the remote server to the local server
- remoteServer.destroy();
+ remoteServer.kill();
log.info("killed remote server");
// Wait for connection resources to be cleared up
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -117,7 +117,7 @@
// Now we should have a client connection from the remote server to the local server
- remoteServer.destroy();
+ remoteServer.kill();
log.trace("killed remote server");
// Wait for connection resources to be cleared up
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -24,7 +24,13 @@
import java.rmi.Naming;
import java.util.Hashtable;
import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.Iterator;
import javax.management.ObjectName;
+import javax.management.NotificationListener;
+import javax.management.Notification;
import javax.transaction.UserTransaction;
import org.jboss.jms.message.MessageIdGeneratorFactory;
import org.jboss.jms.server.DestinationManager;
@@ -35,6 +41,7 @@
import org.jboss.test.messaging.tools.jmx.rmi.LocalTestServer;
import org.jboss.test.messaging.tools.jmx.rmi.RMITestServer;
import org.jboss.test.messaging.tools.jmx.rmi.Server;
+import org.jboss.test.messaging.tools.jmx.rmi.NotificationListenerID;
import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactory;
import org.jboss.test.messaging.tools.jndi.RemoteInitialContextFactory;
@@ -73,6 +80,9 @@
private static Server[] servers = new Server[MAX_SERVER_COUNT];
+ // Map<NotificationListener - NotificationListenerPoller>
+ private static Map notificationListenerPollers = new HashMap();
+
public static boolean isLocal()
{
return !"true".equals(System.getProperty("remote"));
@@ -171,17 +181,45 @@
public static synchronized void stop(int index) throws Exception
{
- insureStarted(index);
+ if (servers[index] == null)
+ {
+ log.warn("Server " + index + " has not been created, so it cannot be stopped");
+ return;
+ }
+
+ if (!servers[index].isStarted())
+ {
+ log.warn("Server " + index + " either has not been started, or it is stopped already");
+ return;
+ }
+
servers[index].stop();
}
+ /**
+ * TODO - this methods should be removed, to not be confused with kill(index)
+ * @deprecated
+ */
public static synchronized void destroy() throws Exception
{
stop();
+ servers[0].kill();
+ servers[0] = null;
+ }
- servers[0].destroy();
+ /**
+ * Abruptly kills the VM running the specified server.
+ */
+ public static synchronized void kill(int index) throws Exception
+ {
+ if (servers[index] == null)
+ {
+ log.warn("Server " + index + " has not been created, so it cannot be killed");
+ return;
+ }
- servers[0] = null;
+ servers[index].kill();
+ servers[index] = null;
}
public static void disconnect() throws Exception
@@ -224,6 +262,59 @@
return servers[0].invoke(on, operationName, params, signature);
}
+ public static void addNotificationListener(int serverIndex, ObjectName on,
+ NotificationListener listener) throws Exception
+ {
+ insureStarted(serverIndex);
+
+ if (isLocal())
+ {
+ // add the listener directly to the server
+ servers[serverIndex].addNotificationListener(on, listener);
+ }
+ else
+ {
+ // is remote, need to poll
+ NotificationListenerPoller p =
+ new NotificationListenerPoller((Server)servers[serverIndex], on, listener);
+
+ synchronized(notificationListenerPollers)
+ {
+ notificationListenerPollers.put(listener, p);
+ }
+
+ new Thread(p, "Poller for " + Integer.toHexString(p.hashCode())).start();
+ }
+ }
+
+ public static void removeNotificationListener(int serverIndex, ObjectName on,
+ NotificationListener listener) throws Exception
+ {
+ insureStarted(serverIndex);
+
+ if (isLocal())
+ {
+ // remove the listener directly
+ servers[serverIndex].removeNotificationListener(on, listener);
+ }
+ else
+ {
+ // is remote
+
+ NotificationListenerPoller p = null;
+ synchronized(notificationListenerPollers)
+ {
+ p = (NotificationListenerPoller)notificationListenerPollers.remove(listener);
+ }
+
+ if (p != null)
+ {
+ // stop the polling thread
+ p.stop();
+ }
+ }
+ }
+
public static Set query(ObjectName pattern) throws Exception
{
insureStarted();
@@ -645,7 +736,9 @@
{
log.info("trying to connect to the remote RMI server " + index +
(attempt == 1 ? "" : ", attempt " + attempt));
+
s = (Server)Naming.lookup(name);
+
log.info("connected to the remote server");
}
catch(Exception e)
@@ -805,4 +898,63 @@
// }
// }
// }
+
+ private static long listenerIDCounter = 0;
+
+ static class NotificationListenerPoller implements Runnable
+ {
+ public static final int POLL_INTERVAL = 500;
+
+ private long id;
+ private Server server;
+ private NotificationListener listener;
+ private volatile boolean running;
+
+ private synchronized static long generateID()
+ {
+ return listenerIDCounter++;
+ }
+
+ NotificationListenerPoller(Server server, ObjectName on, NotificationListener listener)
+ throws Exception
+ {
+ id = generateID();
+ this.server = server;
+
+ server.addNotificationListener(on, new NotificationListenerID(id));
+
+ this.listener = listener;
+ this.running = true;
+ }
+
+ public void run()
+ {
+ while(running)
+ {
+ try
+ {
+ List notifications = server.pollNotificationListener(id);
+
+ for(Iterator i = notifications.iterator(); i.hasNext(); )
+ {
+ Notification n = (Notification)i.next();
+ listener.handleNotification(n, null);
+ }
+
+ Thread.sleep(POLL_INTERVAL);
+ }
+ catch(Exception e)
+ {
+ log.error(e);
+ stop();
+ }
+ }
+ }
+
+ public void stop()
+ {
+ running = false;
+ }
+ }
+
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -45,6 +45,7 @@
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
+import javax.management.NotificationListener;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NameNotFoundException;
@@ -704,6 +705,18 @@
return mbeanServer.getAttribute(on, name);
}
+ public void addNotificationListener(ObjectName on, NotificationListener listener)
+ throws Exception
+ {
+ mbeanServer.addNotificationListener(on, listener, null, null);
+ }
+
+ public void removeNotificationListener(ObjectName on, NotificationListener listener)
+ throws Exception
+ {
+ mbeanServer.removeNotificationListener(on, listener);
+ }
+
public void bindDefaultJMSProvider() throws Exception
{
JNDIProviderAdapter pa = new JNDIProviderAdapter();
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -30,6 +30,7 @@
import javax.jms.Queue;
import javax.jms.Topic;
import javax.management.ObjectName;
+import javax.management.NotificationListener;
import javax.transaction.UserTransaction;
import org.jboss.jms.server.DestinationManager;
import org.jboss.jms.server.ServerPeer;
@@ -170,9 +171,9 @@
}
}
- public synchronized void destroy() throws Exception
+ public synchronized void kill() throws Exception
{
- stop();
+ throw new IllegalStateException("Cannot KILL a local server. Consider using stop() instead.");
}
public ObjectName deploy(String mbeanConfiguration) throws Exception
@@ -203,6 +204,18 @@
return sc.invoke(on, operationName, params, signature);
}
+ public void addNotificationListener(ObjectName on, NotificationListener listener)
+ throws Exception
+ {
+ sc.addNotificationListener(on, listener);
+ }
+
+ public void removeNotificationListener(ObjectName on, NotificationListener listener)
+ throws Exception
+ {
+ sc.removeNotificationListener(on, listener);
+ }
+
public Set query(ObjectName pattern) throws Exception
{
return sc.query(pattern);
@@ -808,6 +821,12 @@
return (Set)sc.getAttribute(postOfficeObjectName, "NodeIDView");
}
+ public List pollNotificationListener(long listenerID) throws Exception
+ {
+ throw new IllegalStateException("Poll doesn't make sense on a local server. " +
+ "Register listeners directly instead.");
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/NotificationListenerID.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/NotificationListenerID.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/NotificationListenerID.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -0,0 +1,58 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.tools.jmx.rmi;
+
+import javax.management.NotificationListener;
+import javax.management.Notification;
+import java.io.Serializable;
+
+/**
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class NotificationListenerID implements Serializable, NotificationListener
+{
+ // Constants -----------------------------------------------------
+
+ private static final long serialVersionUID = -39839086486546L;
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long id;
+
+ // Constructors --------------------------------------------------
+
+ public NotificationListenerID(long id)
+ {
+ this.id = id;
+ }
+
+ // NotificationListener implementation ---------------------------
+
+ public void handleNotification(Notification notification, Object object)
+ {
+ throw new IllegalStateException("Do not use this method directly!");
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getID()
+ {
+ return id;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/ProxyNotificationListener.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/ProxyNotificationListener.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/ProxyNotificationListener.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -0,0 +1,67 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.tools.jmx.rmi;
+
+import javax.management.NotificationListener;
+import javax.management.Notification;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Stores notifications until they're transferred to the remote client.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class ProxyNotificationListener implements NotificationListener
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private List notifications;
+
+ // Constructors --------------------------------------------------
+
+ ProxyNotificationListener()
+ {
+ notifications = new ArrayList();
+ }
+
+ // NotificationListener implementation ---------------------------
+
+ public synchronized void handleNotification(Notification notification, Object object)
+ {
+ notifications.add(notification);
+ }
+
+ // Public --------------------------------------------------------
+
+ public synchronized List drain()
+ {
+ if (notifications.size() == 0)
+ {
+ return Collections.EMPTY_LIST;
+ }
+
+ List old = notifications;
+ notifications = new ArrayList();
+ return old;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -25,7 +25,12 @@
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
import javax.management.ObjectName;
+import javax.management.NotificationListener;
import javax.transaction.UserTransaction;
import org.jboss.jms.server.DestinationManager;
import org.jboss.jms.server.ServerPeer;
@@ -46,24 +51,20 @@
*/
public class RMITestServer extends UnicastRemoteObject implements Server
{
- private static final long serialVersionUID = -368445344011004778L;
+ // Constants -----------------------------------------------------
+ public static final String RMI_SERVER_PREFIX = "messaging_rmi_server_";
+ public static final String NAMING_SERVER_PREFIX = "naming_rmi_server_";
+
public static final int DEFAULT_REGISTRY_PORT = 22555;
public static final int DEFAULT_SERVER_INDEX = 0;
public static final String DEFAULT_SERVER_HOST = "localhost";
+ private static final long serialVersionUID = -368445344011004778L;
private static final Logger log = Logger.getLogger(RMITestServer.class);
- protected RemoteTestServer server;
+ // Static --------------------------------------------------------
- private RMINamingDelegate namingDelegate;
-
-
- public static final String RMI_SERVER_PREFIX = "messaging_rmi_server_";
- public static final String NAMING_SERVER_PREFIX = "naming_rmi_server_";
-
- private static Registry registry;
-
public static void main(String[] args) throws Exception
{
log.debug("initializing RMI runtime");
@@ -92,6 +93,7 @@
// let RMI know the bind address
System.setProperty("java.rmi.server.hostname", host);
+ Registry registry;
// try to bind first
try
@@ -115,123 +117,150 @@
log.info("RMI server " + serverIndex + " bound");
}
- public class VMKiller implements Runnable
- {
- public void run()
- {
- log.info("shutting down the VM");
+ // Attributes ----------------------------------------------------
- try
- {
- Thread.sleep(250);
- }
- catch(Exception e)
- {
- log.warn("interrupted while sleeping", e);
- }
+ protected RemoteTestServer server;
+ private RMINamingDelegate namingDelegate;
+ // Map<Long-ProxyNotificationListener>
+ private Map proxyListeners;
- System.exit(0);
- }
- }
+ // Constructors --------------------------------------------------
public RMITestServer(int index) throws Exception
{
namingDelegate = new RMINamingDelegate(index);
server = new RemoteTestServer(index);
+ proxyListeners = new HashMap();
}
- public void configureSecurityForDestination(String destName, String config) throws Exception
+ // Server implementation -----------------------------------------
+
+ public void start(String containerConfig) throws Exception
{
- server.configureSecurityForDestination(destName, config);
+ server.start(containerConfig);
}
- public ObjectName deploy(String mbeanConfiguration) throws Exception
+ public void stop() throws Exception
{
- return server.deploy(mbeanConfiguration);
+ server.stop();
+ namingDelegate.reset();
}
- public void deployQueue(String name, String jndiName, boolean clustered) throws Exception
+ public synchronized void kill() throws Exception
{
- server.deployQueue(name, jndiName, clustered);
+ // Kills the server without doing any graceful shutdown. For graceful shutdown use stop().
+ new Thread(new VMKiller(), "VM Killer").start();
}
- public void deployTopic(String name, String jndiName, boolean clustered) throws Exception
+ public ObjectName deploy(String mbeanConfiguration) throws Exception
{
- server.deployTopic(name, jndiName, clustered);
+ return server.deploy(mbeanConfiguration);
}
-
- public void deployQueue(String name,
- String jndiName,
- int fullSize,
- int pageSize,
- int downCacheSize,
- boolean clustered) throws Exception
+
+ public void undeploy(ObjectName on) throws Exception
{
- server.deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
+ server.undeploy(on);
}
- public void createQueue(String name, String jndiName) throws Exception
+ public Object getAttribute(ObjectName on, String attribute) throws Exception
{
- server.createQueue(name, jndiName);
+ return server.getAttribute(on, attribute);
}
- public void deployTopic(String name,
- String jndiName,
- int fullSize,
- int pageSize,
- int downCacheSize,
- boolean clustered) throws Exception
+ public void setAttribute(ObjectName on, String name, String valueAsString) throws Exception
{
- server.deployTopic(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
+ server.setAttribute(on, name, valueAsString);
}
- public void createTopic(String name, String jndiName) throws Exception
+ public Object invoke(ObjectName on, String operationName, Object[] params, String[] signature)
+ throws Exception
{
- server.createTopic(name, jndiName);
+ return server.invoke(on, operationName, params, signature);
}
- public void deployConnectionFactory(String objectName, String[] jndiBindings)
+ public void addNotificationListener(ObjectName on, NotificationListener listener)
throws Exception
{
- server.deployConnectionFactory(objectName, jndiBindings);
+ if (!(listener instanceof NotificationListenerID))
+ {
+ throw new IllegalArgumentException("A RMITestServer can only handle NotificationListenerIDs!");
+ }
+
+ long id = ((NotificationListenerID)listener).getID();
+
+ ProxyNotificationListener pl = new ProxyNotificationListener();
+
+ synchronized(proxyListeners)
+ {
+ proxyListeners.put(new Long(id), pl);
+ }
+
+ server.addNotificationListener(on, pl);
}
-
- public void deployConnectionFactory(String objectName, String[] jndiBindings, int prefetchSize)
+
+ public void removeNotificationListener(ObjectName on, NotificationListener listener)
throws Exception
{
- server.deployConnectionFactory(objectName, jndiBindings, prefetchSize);
+
+ if (!(listener instanceof NotificationListenerID))
+ {
+ throw new IllegalArgumentException("A RMITestServer can only handle NotificationListenerIDs!");
+ }
+
+ long id = ((NotificationListenerID)listener).getID();
+
+ ProxyNotificationListener pl = null;
+
+ synchronized(proxyListeners)
+ {
+ pl = (ProxyNotificationListener)proxyListeners.remove(new Long(id));
+ }
+
+ server.removeNotificationListener(on, pl);
}
- public void deployConnectionFactory(String objectName,
- String[] jndiBindings,
- int prefetchSize,
- int defaultTempQueueFullSize,
- int defaultTempQueuePageSize,
- int defaultTempQueueDownCacheSize) throws Exception
+ public Set query(ObjectName pattern) throws Exception
{
- server.deployConnectionFactory(objectName, jndiBindings, prefetchSize,
- defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
+ return server.query(pattern);
}
- public void undeployConnectionFactory(ObjectName objectName) throws Exception
+ public String getDatabaseType()
{
- server.undeployConnectionFactory(objectName);
+ return server.getDatabaseType();
}
- public synchronized void destroy() throws Exception
+ public void log(int level, String text) throws Exception
{
- //Kill the server without doing any graceful shutdown
-
- //For graceful shutdown use stop()
-
- new Thread(new VMKiller(), "VM Killer").start();
+ server.log(level, text);
}
- public Object getAttribute(ObjectName on, String attribute) throws Exception
+ public void startServerPeer(int serverPeerID, String defaultQueueJNDIContext,
+ String defaultTopicJNDIContext, boolean clustered) throws Exception
{
- return server.getAttribute(on, attribute);
+ server.
+ startServerPeer(serverPeerID, defaultQueueJNDIContext, defaultTopicJNDIContext, clustered);
}
+ public void stopServerPeer() throws Exception
+ {
+ server.stopServerPeer();
+ }
+
+ public boolean isServerPeerStarted() throws Exception
+ {
+ return server.isServerPeerStarted();
+ }
+
+ public ObjectName getServerPeerObjectName() throws Exception
+ {
+ return server.getServerPeerObjectName();
+ }
+
+ public boolean isStarted() throws Exception
+ {
+ return server.isStarted();
+ }
+
public Set getConnectorSubsystems() throws Exception
{
return server.getConnectorSubsystems();
@@ -248,9 +277,9 @@
server.removeServerInvocationHandler(subsystem);
}
- public String getDefaultSecurityConfig() throws Exception
+ public MessageStore getMessageStore() throws Exception
{
- return server.getDefaultSecurityConfig();
+ return server.getMessageStore();
}
public DestinationManager getDestinationManager() throws Exception
@@ -258,106 +287,122 @@
return server.getDestinationManager();
}
- public MessageStore getMessageStore() throws Exception
+ public PersistenceManager getPersistenceManager() throws Exception
{
- return server.getMessageStore();
+ return server.getPersistenceManager();
}
- public PersistenceManager getPersistenceManager() throws Exception
+ public PostOffice getQueuePostOffice() throws Exception
{
- return server.getPersistenceManager();
+ return server.getQueuePostOffice();
}
-
- public ObjectName getServerPeerObjectName() throws Exception
+
+ public PostOffice getTopicPostOffice() throws Exception
{
- return server.getServerPeerObjectName();
+ return server.getTopicPostOffice();
}
- public Object invoke(ObjectName on, String operationName, Object[] params, String[] signature) throws Exception
+ public ServerPeer getServerPeer() throws Exception
{
- return server.invoke(on, operationName, params, signature);
+ return server.getServerPeer();
}
- public boolean isServerPeerStarted() throws Exception
+ public void deployTopic(String name, String jndiName, boolean clustered) throws Exception
{
- return server.isServerPeerStarted();
+ server.deployTopic(name, jndiName, clustered);
}
- public boolean isStarted() throws Exception
+ public void deployTopic(String name,
+ String jndiName,
+ int fullSize,
+ int pageSize,
+ int downCacheSize,
+ boolean clustered) throws Exception
{
- return server.isStarted();
+ server.deployTopic(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
}
- public void log(int level, String text) throws Exception
+ public void createTopic(String name, String jndiName) throws Exception
{
- server.log(level, text);
+ server.createTopic(name, jndiName);
}
- public Set query(ObjectName pattern) throws Exception
+ public void deployQueue(String name, String jndiName, boolean clustered) throws Exception
{
- return server.query(pattern);
+ server.deployQueue(name, jndiName, clustered);
}
- public String getDatabaseType()
+ public void deployQueue(String name,
+ String jndiName,
+ int fullSize,
+ int pageSize,
+ int downCacheSize,
+ boolean clustered) throws Exception
{
- return server.getDatabaseType();
+ server.deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
}
- public void setAttribute(ObjectName on, String name, String valueAsString) throws Exception
+ public void createQueue(String name, String jndiName) throws Exception
{
- server.setAttribute(on, name, valueAsString);
+ server.createQueue(name, jndiName);
}
- public void setDefaultSecurityConfig(String config) throws Exception
+ public void undeployDestination(boolean isQueue, String name) throws Exception
{
- server.setDefaultSecurityConfig(config);
+ server.undeployDestination(isQueue, name);
}
- public void start(String containerConfig) throws Exception
+ public boolean destroyDestination(boolean isQueue, String name) throws Exception
{
- server.start(containerConfig);
+ return server.destroyDestination(isQueue, name);
}
- public void startServerPeer(int serverPeerID, String defaultQueueJNDIContext,
- String defaultTopicJNDIContext, boolean clustered) throws Exception
+ public void deployConnectionFactory(String objectName, String[] jndiBindings)
+ throws Exception
{
- server.startServerPeer(serverPeerID, defaultQueueJNDIContext, defaultTopicJNDIContext, clustered);
+ server.deployConnectionFactory(objectName, jndiBindings);
}
- public void stop() throws Exception
+ public void deployConnectionFactory(String objectName, String[] jndiBindings, int prefetchSize)
+ throws Exception
{
- server.stop();
- namingDelegate.reset();
+ server.deployConnectionFactory(objectName, jndiBindings, prefetchSize);
}
- public void stopServerPeer() throws Exception
+ public void deployConnectionFactory(String objectName,
+ String[] jndiBindings,
+ int prefetchSize,
+ int defaultTempQueueFullSize,
+ int defaultTempQueuePageSize,
+ int defaultTempQueueDownCacheSize) throws Exception
{
- server.stopServerPeer();
+ server.deployConnectionFactory(objectName, jndiBindings, prefetchSize,
+ defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
}
- public void undeploy(ObjectName on) throws Exception
+ public void undeployConnectionFactory(ObjectName objectName) throws Exception
{
- server.undeploy(on);
+ server.undeployConnectionFactory(objectName);
}
- public void undeployDestination(boolean isQueue, String name) throws Exception
+ public void configureSecurityForDestination(String destName, String config) throws Exception
{
- server.undeployDestination(isQueue, name);
+ server.configureSecurityForDestination(destName, config);
}
- public boolean destroyDestination(boolean isQueue, String name) throws Exception
+ public void setDefaultSecurityConfig(String config) throws Exception
{
- return server.destroyDestination(isQueue, name);
+ server.setDefaultSecurityConfig(config);
}
- public Object executeCommand(Command command) throws Exception
+ public String getDefaultSecurityConfig() throws Exception
{
- return server.executeCommand(command);
+ return server.getDefaultSecurityConfig();
}
- public ServerPeer getServerPeer() throws Exception
+ public Object executeCommand(Command command) throws Exception
{
- return server.getServerPeer();
+ return server.executeCommand(command);
}
public UserTransaction getUserTransaction() throws Exception
@@ -370,9 +415,54 @@
return server.getNodeIDView();
}
+ public List pollNotificationListener(long listenerID) throws Exception
+ {
+ ProxyNotificationListener pl = null;
+
+ synchronized(proxyListeners)
+ {
+ pl = (ProxyNotificationListener)proxyListeners.get(new Long(listenerID));
+ }
+
+ if (pl == null)
+ {
+ return Collections.EMPTY_LIST;
+ }
+
+ return pl.drain();
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
private RMINamingDelegate getNamingDelegate()
{
return namingDelegate;
}
+ // Inner classes -------------------------------------------------
+
+ public class VMKiller implements Runnable
+ {
+ public void run()
+ {
+ log.info("shutting down the VM");
+
+ try
+ {
+ Thread.sleep(250);
+ }
+ catch(Exception e)
+ {
+ log.warn("interrupted while sleeping", e);
+ }
+
+ System.exit(0);
+ }
+ }
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -23,7 +23,9 @@
import java.rmi.Remote;
import java.util.Set;
+import java.util.List;
import javax.management.ObjectName;
+import javax.management.NotificationListener;
import javax.transaction.UserTransaction;
import org.jboss.jms.server.DestinationManager;
import org.jboss.jms.server.ServerPeer;
@@ -43,21 +45,36 @@
public interface Server extends Remote
{
void start(String containerConfig) throws Exception;
+
void stop() throws Exception;
- void destroy() throws Exception;
/**
+ * For a remote server, it "abruptly" kills the VM running the server. For a local server
+ * it just stops the server.
+ */
+ void kill() throws Exception;
+
+ /**
* Deploys and registers a service based on the MBean service descriptor element, specified as
* a String. Supports XMBeans. The implementing class and the ObjectName are inferred from the
* mbean element. If there are configuration attributed specified in the deployment descriptor,
* they are applied to the service instance.
*/
ObjectName deploy(String mbeanConfiguration) throws Exception;
+
void undeploy(ObjectName on) throws Exception;
+
Object getAttribute(ObjectName on, String attribute) throws Exception;
+
void setAttribute(ObjectName on, String name, String valueAsString) throws Exception;
+
Object invoke(ObjectName on, String operationName, Object[] params, String[] signature)
throws Exception;
+
+ void addNotificationListener(ObjectName on, NotificationListener listener) throws Exception;
+
+ void removeNotificationListener(ObjectName on, NotificationListener listener) throws Exception;
+
/**
* Returns a set of ObjectNames corresponding to installed services.
*/
@@ -218,4 +235,9 @@
*/
Set getNodeIDView() throws Exception;
+ /**
+ * @return List<Notification>
+ */
+ List pollNotificationListener(long listenerID) throws Exception;
+
}
Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/StopRMIServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/StopRMIServer.java 2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/StopRMIServer.java 2006-12-11 16:56:06 UTC (rev 1764)
@@ -81,7 +81,7 @@
// We should shut down cleanly - not kill the process like we are currently doing
- server.destroy();
+ server.kill();
// The last RMI server will take with it the registry too
More information about the jboss-cvs-commits
mailing list