[jboss-cvs] JBoss Messaging SVN: r2858 - in trunk: src/main/org/jboss/jms/client/container and 14 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Jul 8 21:12:37 EDT 2007
Author: timfox
Date: 2007-07-08 21:12:36 -0400 (Sun, 08 Jul 2007)
New Revision: 2858
Modified:
trunk/src/main/org/jboss/jms/client/FailoverEvent.java
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
trunk/src/main/org/jboss/jms/wireformat/SessionRecoverDeliveriesRequest.java
trunk/src/main/org/jboss/messaging/core/contract/Channel.java
trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
trunk/src/main/org/jboss/messaging/core/contract/Queue.java
trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/AddAllReplicatedDeliveriesMessage.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
Log:
More fixes tweaks etc
Modified: trunk/src/main/org/jboss/jms/client/FailoverEvent.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/FailoverEvent.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/client/FailoverEvent.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -54,7 +54,8 @@
type == FAILURE_DETECTED ? "FAILURE_DETECTED" :
type == FAILOVER_STARTED ? "FAILOVER_STARTED" :
type == FAILOVER_COMPLETED ? "FAILOVER_COMPLETED" :
- type == FAILOVER_FAILED ? "FAILOVER_FAILED" : "UNKNOWN_FAILOVER_EVENT";
+ type == FAILOVER_ALREADY_COMPLETED ? "FAILOVER_ALREADY_COMPLETED" :
+ type == FAILOVER_FAILED ? "FAILOVER_FAILED" : "UNKNOWN_FAILOVER_EVENT";
}
// Package protected ----------------------------------------------------------------------------
@@ -68,7 +69,8 @@
if (type != FAILURE_DETECTED &&
type != FAILOVER_STARTED &&
type != FAILOVER_COMPLETED &&
- type != FAILOVER_FAILED)
+ type != FAILOVER_FAILED &&
+ type != FAILOVER_ALREADY_COMPLETED)
{
throw new IllegalArgumentException("Illegal failover event type: " + type);
}
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -298,6 +298,8 @@
public void cancelBuffer() throws JMSException
{
+ if (trace) { log.trace("Cancelling buffer: " + buffer.size()); }
+
synchronized (mainLock)
{
// Now we cancel anything left in the buffer. The reason we do this now is that otherwise
@@ -327,7 +329,9 @@
cancels.add(cancel);
}
+ if (trace) { log.trace("Calling cancelDeliveries"); }
sessionDelegate.cancelDeliveries(cancels);
+ if (trace) { log.trace("Done call"); }
buffer.clear();
}
@@ -574,8 +578,6 @@
return;
}
- log.info("waiting for last delivery " + id);
-
synchronized (mainLock)
{
waitingForLastDelivery = true;
Modified: trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/client/container/ClosedInterceptor.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -169,16 +169,13 @@
{
return invocation.invokeNext();
}
- catch (Throwable t)
+ catch (Exception t)
{
if (isClosing || isClose)
{
//We swallow exceptions in close/closing, this is because if the connection fails, it is naturally for code to then close
//in a finally block, it would not then be appropriate to throw an exception. This is a common technique
- if (trace)
- {
- log.trace("Failed to close", t);
- }
+ //Close should ALWAYS (well apart from Errors) succeed irrespective of whether the actual connection to the server is alive.
return new Long(-1);
}
throw t;
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -174,7 +174,6 @@
public synchronized void updateFailoverInfo(ClientConnectionFactoryDelegate[] delegates,
Map failoverMap)
{
- log.info(this +" **** UPDATING FAILOVER INFO");
this.delegates = delegates;
this.failoverMap = failoverMap;
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -461,9 +461,9 @@
doInvoke(client, req);
}
- public void recoverDeliveries(List acks) throws JMSException
+ public void recoverDeliveries(List acks, String sessionID) throws JMSException
{
- RequestSupport req = new SessionRecoverDeliveriesRequest(id, version, acks);
+ RequestSupport req = new SessionRecoverDeliveriesRequest(id, version, acks, sessionID);
doInvoke(client, req);
}
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -322,9 +322,9 @@
ackInfos = rm.getDeliveriesForSession(getSessionID());
}
+ List recoveryInfos = new ArrayList();
if (!ackInfos.isEmpty())
- {
- List recoveryInfos = new ArrayList();
+ {
for (Iterator i = ackInfos.iterator(); i.hasNext(); )
{
DeliveryInfo del = (DeliveryInfo)i.next();
@@ -334,15 +334,14 @@
del.getQueueName());
recoveryInfos.add(recInfo);
- }
-
- log.debug(this + " sending delivery recovery " + recoveryInfos + " on failover");
- newDelegate.recoverDeliveries(recoveryInfos);
+ }
}
- else
- {
- log.debug(this + " no delivery recovery info to send on failover");
- }
+
+ //Note! We ALWAYS call recoverDeliveries even if there are no deliveries since it also does other stuff
+ //like remove from recovery Area refs corresponding to messages in client consumer buffers
+
+ log.debug(this + " sending delivery recovery " + recoveryInfos + " on failover");
+ newDelegate.recoverDeliveries(recoveryInfos, oldSessionID);
}
// Public ---------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -127,6 +127,6 @@
* Send delivery info to the server so the delivery lists can be repopulated. Used only in
* failover.
*/
- void recoverDeliveries(List createInfos) throws JMSException;
+ void recoverDeliveries(List createInfos, String oldSessionID) throws JMSException;
}
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -378,7 +378,6 @@
rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
- log.info("**** Updating clustered clients");
endpoint.updateClusteredClients(delArr, failoverMap);
}
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -263,6 +263,8 @@
Dispatcher.instance.registerTarget(sessionID, sessionAdvised);
log.debug("created and registered " + ep);
+
+ log.info("*********** CREATING SESSION WITH ID:" + sessionID);
ClientSessionDelegate d = new ClientSessionDelegate(sessionID, dupsOKBatchSize);
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -170,6 +170,8 @@
//Temporary until we have our own NIO transport
QueuedExecutor executor = new QueuedExecutor(new LinkedQueue());
+ private LinkedQueue toDeliver = new LinkedQueue();
+
// Constructors ---------------------------------------------------------------------------------
ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint) throws Exception
@@ -427,7 +429,7 @@
}
}
- public void recoverDeliveries(List deliveryRecoveryInfos) throws JMSException
+ public void recoverDeliveries(List deliveryRecoveryInfos, String oldSessionID) throws JMSException
{
if (trace) { log.trace(this + "recovers deliveries " + deliveryRecoveryInfos); }
@@ -530,7 +532,8 @@
if (trace) { log.trace(this + " Recovered delivery " + deliveryId + ", " + del); }
deliveries.put(new Long(deliveryId),
- new DeliveryRecord(del, dlqToUse, expiryQueueToUse, dest.getRedeliveryDelay(), maxDeliveryAttemptsToUse, queueName, supportsFailover));
+ new DeliveryRecord(del, dlqToUse, expiryQueueToUse, dest.getRedeliveryDelay(),
+ maxDeliveryAttemptsToUse, queueName, supportsFailover, deliveryId));
//We want to replicate the deliveries to the new backup, but we don't want a response since that would cause actual delivery
//to occur, which we don't want since the client already has the deliveries
@@ -542,6 +545,19 @@
}
}
+ iter = postOffice.getAllBindings().iterator();
+
+ while (iter.hasNext())
+ {
+ Binding binding = (Binding)iter.next();
+
+ if (binding.queue.isClustered() && binding.queue.isRecoverable())
+ {
+ // Remove any stranded refs corresponding to refs that might have been in the client buffer but not consumed
+ binding.queue.removeStrandedReferences(oldSessionID);
+ }
+ }
+
this.deliveryIdSequence = new SynchronizedLong(maxDeliveryId + 1);
}
catch (Throwable t)
@@ -920,70 +936,145 @@
rec.del.cancel();
}
- public void collectDeliveries(Map map)
+ public void collectDeliveries(Map map, boolean firstNode) throws Exception
{
- Iterator iter = deliveries.entrySet().iterator();
+ if (trace) { log.trace("Collecting deliveries"); }
+
+ //First deliver any waiting deliveries
- while (iter.hasNext())
+ if (trace) { log.trace("Delivering any waiting deliveries"); }
+
+ while (true)
{
- Map.Entry entry = (Map.Entry)iter.next();
+ DeliveryRecord dr = (DeliveryRecord)toDeliver.poll(0);
- Long l = (Long)entry.getKey();
-
- long deliveryID = l.longValue();
-
- DeliveryRecord rec = (DeliveryRecord)entry.getValue();
-
- if (rec.replicating)
+ if (dr == null)
{
- Set ids = (Set)map.get(rec.queueName);
-
- if (ids == null)
- {
- ids = new HashSet();
-
- map.put(rec.queueName, ids);
- }
-
- ids.add(new Long(rec.del.getReference().getMessage().getMessageID()));
-
- if (rec.waitingForResponse)
- {
- //Do the delivery now
-
- performDelivery(rec.del.getReference(), deliveryID, rec.getConsumer());
-
- rec.waitingForResponse = false;
-
- synchronized (deliveryLock)
- {
- deliveryLock.notifyAll();
- }
- }
+ break;
}
+
+ performDelivery(dr.del.getReference(), dr.deliveryID, dr.getConsumer());
+
+ dr.waitingForResponse = false;
}
+
+ if (trace) { log.trace("Done delivering"); }
+
+ if (!firstNode)
+ {
+ if (trace) { log.trace("Now collecting"); }
+
+ Iterator iter = deliveries.entrySet().iterator();
+
+ while (iter.hasNext())
+ {
+ Map.Entry entry = (Map.Entry)iter.next();
+
+ Long l = (Long)entry.getKey();
+
+ long deliveryID = l.longValue();
+
+ DeliveryRecord rec = (DeliveryRecord)entry.getValue();
+
+ if (rec.replicating)
+ {
+ Map ids = (Map)map.get(rec.queueName);
+
+ if (ids == null)
+ {
+ ids = new HashMap();
+
+ map.put(rec.queueName, ids);
+ }
+
+ ids.put(new Long(rec.del.getReference().getMessage().getMessageID()), id);
+
+ if (rec.waitingForResponse)
+ {
+ //Do the delivery now
+
+ performDelivery(rec.del.getReference(), deliveryID, rec.getConsumer());
+
+ rec.waitingForResponse = false;
+
+ synchronized (deliveryLock)
+ {
+ deliveryLock.notifyAll();
+ }
+ }
+ }
+ }
+ }
+
+ if (trace) { log.trace("Collected " + map.size() + " deliveries"); }
}
- public void replicateDeliveryResponseReceived(long deliveryID)
+ public void replicateDeliveryResponseReceived(long deliveryID) throws Exception
{
//We look up the delivery in the list and actually perform the delivery
if (trace) { log.trace(this + " replicate delivery response received for delivery " + deliveryID); }
- DeliveryRecord rec = (DeliveryRecord)this.deliveries.get(new Long(deliveryID));
+ DeliveryRecord rec = (DeliveryRecord)deliveries.get(new Long(deliveryID));
if (rec == null)
{
throw new java.lang.IllegalStateException("Cannot find delivery with id " + deliveryID);
}
+
+ boolean delivered = false;
- performDelivery(rec.del.getReference(), deliveryID, rec.getConsumer());
+ //FIXME there is a race condition here
+ //Message is peeked - is NP, then by the time it is actually taken
+ //Another thread has peeked and taken it, so the first thread takes the next one
+ //which is a persistent message which should remain in the list
- rec.waitingForResponse = false;
+ while (true)
+ {
+ DeliveryRecord dr = (DeliveryRecord)toDeliver.peek();
+
+ if (dr == null)
+ {
+ break;
+ }
+
+ boolean performDelivery = false;
+
+ if (dr.waitingForResponse)
+ {
+ if (dr == rec)
+ {
+ performDelivery = true;
+ }
+ else
+ {
+ break;
+ }
+ }
+ else
+ {
+ //NP message
+ performDelivery = true;
+ }
+
+ if (performDelivery)
+ {
+ toDeliver.take();
+
+ performDelivery(dr.del.getReference(), deliveryID, dr.getConsumer());
+
+ delivered = true;
+
+ dr.waitingForResponse = false;
+ }
+ }
- synchronized (deliveryLock)
+ if (delivered)
{
- deliveryLock.notifyAll();
+ synchronized (deliveryLock)
+ {
+ deliveryLock.notifyAll();
+ }
}
}
@@ -993,7 +1084,7 @@
* When closing we must wait for these to be delivered before closing, or the message will be "lost" until
* the session is closed.
*/
- void waitForDeliveriesFromConsumer(String consumerID)
+ void waitForDeliveriesFromConsumer(String consumerID) throws Exception
{
log.info("Waiting for deliveries for consumer " + consumerID);
@@ -1042,8 +1133,12 @@
if (toWait <= 0)
{
+ while (toDeliver.take() != null) {}
+
log.warn("Timed out waiting for response to arrive");
}
+
+
}
log.info("Done Waiting for deliveries for consumer " + consumerID);
}
@@ -1056,12 +1151,13 @@
DeliveryRecord rec = null;
+ //TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?
if (consumer.isRetainDeliveries())
{
// Add a delivery
deliveryId = deliveryIdSequence.increment();
- rec = new DeliveryRecord(delivery, consumer);
+ rec = new DeliveryRecord(delivery, consumer, deliveryId);
deliveries.put(new Long(deliveryId), rec);
@@ -1083,26 +1179,63 @@
Message message = delivery.getReference().getMessage();
- if (!consumer.isReplicating() || !message.isReliable())
+ if (!consumer.isReplicating())
{
if (trace) { log.trace(this + " doing the delivery straight away"); }
//Actually do the delivery now
performDelivery(delivery.getReference(), deliveryId, consumer);
}
+ else if (!message.isReliable())
+ {
+ if (!toDeliver.isEmpty())
+ {
+ //We need to add to the list to prevent non persistent messages overtaking persistent messages from the same
+ //producer in flight (since np don't need to be replicated)
+ toDeliver.put(rec);
+
+ //Race check (there's a small chance the message in the queue got removed between the empty check
+ //and the put so we do another check:
+ if (toDeliver.peek() == rec)
+ {
+ toDeliver.take();
+
+ performDelivery(delivery.getReference(), deliveryId, consumer);
+ }
+ }
+ else
+ {
+ // Actually do the delivery now
+ performDelivery(delivery.getReference(), deliveryId, consumer);
+ }
+ }
else
{
- //We wait for the replication response to come back before actually performing delivery
-
- if (trace) { log.trace(this + " deferring delivery until we know it's been replicated"); }
-
- if (rec != null)
+ if (!postOffice.isFirstNode())
{
- rec.waitingForResponse = true;
+ //We wait for the replication response to come back before actually performing delivery
+
+ if (trace) { log.trace(this + " deferring delivery until we know it's been replicated"); }
+
+ rec.waitingForResponse = true;
+
+ toDeliver.put(rec);
+
+ postOffice.sendReplicateDeliveryMessage(consumer.getQueueName(), id,
+ delivery.getReference().getMessage().getMessageID(),
+ deliveryId, true, false);
}
+ else
+ {
+ //Only node in the cluster so deliver now
- postOffice.sendReplicateDeliveryMessage(consumer.getQueueName(), id,
- delivery.getReference().getMessage().getMessageID(), deliveryId, true, false);
+ rec.waitingForResponse = false;
+
+ if (trace) { log.trace("First node so actually doing delivery now"); }
+
+ // Actually do the delivery now - we are only node in the cluster
+ performDelivery(delivery.getReference(), deliveryId, consumer);
+ }
}
}
@@ -1932,13 +2065,22 @@
volatile boolean waitingForResponse;
+ long deliveryID;
+
ServerConsumerEndpoint getConsumer()
{
- return (ServerConsumerEndpoint)consumerRef.get();
+ if (consumerRef != null)
+ {
+ return (ServerConsumerEndpoint)consumerRef.get();
+ }
+ else
+ {
+ return null;
+ }
}
DeliveryRecord(Delivery del, Queue dlq, Queue expiryQueue, long redeliveryDelay, int maxDeliveryAttempts,
- String queueName, boolean replicating)
+ String queueName, boolean replicating, long deliveryID)
{
this.del = del;
@@ -1953,12 +2095,14 @@
this.queueName = queueName;
this.replicating = replicating;
+
+ this.deliveryID = deliveryID;
}
- DeliveryRecord(Delivery del, ServerConsumerEndpoint consumer)
+ DeliveryRecord(Delivery del, ServerConsumerEndpoint consumer, long deliveryID)
{
this (del, consumer.getDLQ(), consumer.getExpiryQueue(), consumer.getRedliveryDelay(), consumer.getMaxDeliveryAttempts(),
- consumer.getQueueName(), consumer.isReplicating());
+ consumer.getQueueName(), consumer.isReplicating(), deliveryID);
// We need to cache the attributes here since the consumer may get gc'd BEFORE the delivery is acked
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -138,9 +138,9 @@
endpoint.cancelDelivery(cancel);
}
- public void recoverDeliveries(List ackInfos) throws JMSException
+ public void recoverDeliveries(List ackInfos, String oldSessionID) throws JMSException
{
- endpoint.recoverDeliveries(ackInfos);
+ endpoint.recoverDeliveries(ackInfos, oldSessionID);
}
// AdvisedSupport overrides --------------------------------------
Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -195,13 +195,16 @@
{
SessionTxState state = (SessionTxState)i.next();
- state.handleFailover(newServerID, oldSessionID, newSessionID);
+ boolean handled = state.handleFailover(newServerID, oldSessionID, newSessionID);
- if (tmpMap == null)
+ if (handled)
{
- tmpMap = new LinkedHashMap();
+ if (tmpMap == null)
+ {
+ tmpMap = new LinkedHashMap();
+ }
+ tmpMap.put(newSessionID, state);
}
- tmpMap.put(newSessionID, state);
}
}
@@ -218,7 +221,7 @@
* May return an empty list, but never null.
*/
public List getDeliveriesForSession(String sessionID)
- {
+ {
if (!clientSide)
{
throw new IllegalStateException("Cannot call this method on the server side");
@@ -234,7 +237,9 @@
if (state != null)
{
- return state.getAcks();
+ List list = state.getAcks();
+
+ return list;
}
else
{
@@ -455,7 +460,7 @@
this.acks = acks;
}
- void handleFailover(int newServerID, String oldSessionID, String newSessionID)
+ boolean handleFailover(int newServerID, String oldSessionID, String newSessionID)
{
if (sessionID.equals(oldSessionID) && serverID != newServerID)
{
@@ -473,7 +478,12 @@
i.remove();
}
}
+ return true;
}
+ else
+ {
+ return false;
+ }
}
void clearMessages()
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -139,10 +139,11 @@
* Failover session from old session ID -> new session ID
*/
public void handleFailover(int newServerID, String oldSessionID, String newSessionID)
- {
- for(Iterator i = this.transactions.values().iterator(); i.hasNext(); )
+ {
+ for (Iterator i = transactions.values().iterator(); i.hasNext(); )
{
ClientTransaction tx = (ClientTransaction)i.next();
+
tx.handleFailover(newServerID, oldSessionID, newSessionID);
}
}
@@ -153,15 +154,16 @@
public List getDeliveriesForSession(String sessionID)
{
List ackInfos = new ArrayList();
-
- for(Iterator i = transactions.values().iterator(); i.hasNext(); )
+
+ for (Iterator i = transactions.values().iterator(); i.hasNext(); )
{
ClientTransaction tx = (ClientTransaction)i.next();
+
List acks = tx.getDeliveriesForSession(sessionID);
ackInfos.addAll(acks);
}
-
+
return ackInfos;
}
Modified: trunk/src/main/org/jboss/jms/wireformat/SessionRecoverDeliveriesRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/SessionRecoverDeliveriesRequest.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/jms/wireformat/SessionRecoverDeliveriesRequest.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -45,17 +45,22 @@
{
private List dels;
+ private String sessionID;
+
public SessionRecoverDeliveriesRequest()
{
}
public SessionRecoverDeliveriesRequest(String objectId,
byte version,
- List dels)
+ List dels,
+ String sessionID)
{
super(objectId, PacketSupport.REQ_SESSION_RECOVERDELIVERIES, version);
this.dels = dels;
+
+ this.sessionID = sessionID;
}
public void read(DataInputStream is) throws Exception
@@ -74,6 +79,8 @@
dels.add(del);
}
+
+ sessionID = is.readUTF();
}
public ResponseSupport serverInvoke() throws Exception
@@ -86,7 +93,7 @@
throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
}
- endpoint.recoverDeliveries(dels);
+ endpoint.recoverDeliveries(dels, sessionID);
return null;
}
@@ -106,6 +113,8 @@
del.write(os);
}
+ os.writeUTF(sessionID);
+
os.flush();
}
Modified: trunk/src/main/org/jboss/messaging/core/contract/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Channel.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/messaging/core/contract/Channel.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -113,13 +113,6 @@
boolean isActive();
/**
- * Given a List of message ids, create a list of deliveries for them
- * @param messageIds
- * @return
- */
- List recoverDeliveries(List messageIds);
-
- /**
*
* @return The maxiumum number of references this channel can store
*/
Modified: trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/messaging/core/contract/PostOffice.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -147,5 +147,7 @@
throws Exception;
void sendReplicateAckMessage(String queueName, long messageID) throws Exception;
+
+ boolean isFirstNode();
}
Modified: trunk/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/contract/Queue.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/messaging/core/contract/Queue.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -21,7 +21,8 @@
*/
package org.jboss.messaging.core.contract;
-import java.util.Set;
+import java.util.List;
+import java.util.Map;
import org.jboss.messaging.core.impl.clusterconnection.MessageSucker;
@@ -74,11 +75,15 @@
boolean unregisterSucker(MessageSucker sucker);
- void addToRecoveryArea(int nodeID, long messageID);
+ void addToRecoveryArea(int nodeID, long messageID, String sessionID);
void removeFromRecoveryArea(int nodeID, long messageID);
void removeAllFromRecoveryArea(int nodeID);
- void addAllToRecoveryArea(int nodeID, Set ids);
+ void addAllToRecoveryArea(int nodeID, Map ids);
+
+ List recoverDeliveries(List messageIds);
+
+ void removeStrandedReferences(String sessionID);
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -45,10 +45,10 @@
import org.jboss.messaging.core.contract.Receiver;
import org.jboss.messaging.core.impl.clusterconnection.MessageSucker;
import org.jboss.messaging.core.impl.tx.Transaction;
-import org.jboss.messaging.util.ConcurrentHashSet;
import org.jboss.util.timeout.Timeout;
import org.jboss.util.timeout.TimeoutTarget;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
/**
@@ -219,15 +219,19 @@
if (trace) { log.trace("Loaded " + ili.getRefInfos().size() + " refs"); }
+ log.info("Merging, there are already " + messageRefs.size() + " refs in queue");
+
doLoad(ili);
- Set toRecover = (Set)this.recoveryArea.remove(new Integer(nodeID));
+ Map toRecover = (Map)recoveryArea.remove(new Integer(nodeID));
+ if (trace) { log.trace("To recover is: " + toRecover); }
+
LinkedList toTimeout = new LinkedList();
if (toRecover != null)
{
- //TODO this can be optimised to avoid a second scan
+ //TODO this can be optimised to avoid a second scan - we could do this in load
if (trace) { log.trace("Recovery area is not empty, putting refs in recovery map"); }
@@ -239,14 +243,18 @@
Message message = ref.getMessage();
- boolean exists = toRecover.remove(new Long(message.getMessageID()));
+ String sessionID = (String)toRecover.remove(new Long(message.getMessageID()));
- if (exists)
+ if (sessionID != null)
{
if (trace) { log.trace("Added ref " + ref + " to recovery map"); }
- recoveryMap.put(new Long(message.getMessageID()), ref);
+ RecoveryEntry re = new RecoveryEntry();
+ re.ref = ref;
+ re.sessionID = sessionID;
+ recoveryMap.put(new Long(message.getMessageID()), re);
+
iter.remove();
toTimeout.addLast(ref);
@@ -274,6 +282,8 @@
public List recoverDeliveries(List messageIds)
{
+ if (trace) { log.trace("Recovering deliveries"); }
+
List refs = new ArrayList();
Iterator iter = messageIds.iterator();
@@ -282,21 +292,68 @@
{
Long messageID = (Long)iter.next();
- MessageReference ref = (MessageReference)recoveryMap.get(messageID);
+ RecoveryEntry re = (RecoveryEntry)recoveryMap.remove(messageID);
- if (ref == null)
- {
- throw new IllegalStateException("Cannot find ref in recovery map " + messageID);
+ //This can actually be null - e.g. if failure happened right after a successful Ack so the client doesn't
+ //remove the delivery but is no longer on the server
+ if (re != null)
+ {
+ Delivery del = new SimpleDelivery(this, re.ref);
+
+ if (trace) { log.trace("Recovered ref " + re.ref); }
+
+ refs.add(del);
}
+ }
+
+ return refs;
+ }
+
+ public void removeStrandedReferences(String sessionID)
+ {
+ if (trace) { log.trace("Removing stranded references for session " + sessionID); }
+
+ // TODO this can be optimised - remove any remaining deliveries for the session id
+ //these correspond to messages buffered on the client side for that client and should go back on the queue
+
+ Iterator iter = recoveryMap.values().iterator();
+
+ if (trace) { log.trace("Scanning recovery map for stray entries for session"); }
+
+ List toCancel = new ArrayList();
+
+ while (iter.hasNext())
+ {
+ RecoveryEntry re = (RecoveryEntry)iter.next();
- Delivery del = new SimpleDelivery(this, ref);
+ if (trace) { log.trace("Session id id " + re.sessionID); }
- refs.add(del);
+ if (re.sessionID.equals(sessionID))
+ {
+ MessageReference ref = re.ref;
+
+ iter.remove();
+
+ //Put back on queue
+
+ toCancel.add(ref);
+ }
}
-
- return refs;
+
+ for (int i = toCancel.size() - 1; i >= 0; i--)
+ {
+ MessageReference ref = (MessageReference)toCancel.get(i);
+
+ synchronized (lock)
+ {
+ messageRefs.addFirst(ref, ref.getMessage().getPriority());
+ }
+
+ if (trace) { log.trace("Found one, added back on queue"); }
+ }
+
}
-
+
public void registerSucker(MessageSucker sucker)
{
if (trace) { log.trace(this + " Registering sucker " + sucker); }
@@ -349,7 +406,7 @@
return downCacheSize;
}
- public void addToRecoveryArea(int nodeID, long messageID)
+ public void addToRecoveryArea(int nodeID, long messageID, String sessionID)
{
if (trace) { log.trace("Adding message id " + messageID + " to recovery area from node " + nodeID); }
@@ -359,16 +416,16 @@
Integer nid = new Integer(nodeID);
- Set ids = (Set)recoveryArea.get(nid);
+ Map ids = (Map)recoveryArea.get(nid);
if (ids == null)
{
- ids = new ConcurrentHashSet();
+ ids = new ConcurrentHashMap();
recoveryArea.put(nid, ids);
}
- ids.add(new Long(messageID));
+ ids.put(new Long(messageID), sessionID);
}
public void removeFromRecoveryArea(int nodeID, long messageID)
@@ -377,13 +434,13 @@
Integer nid = new Integer(nodeID);
- Set ids = (Set)recoveryArea.get(nid);
+ Map ids = (Map)recoveryArea.get(nid);
//The remove might fail to find the id
//This can happen if the removal has already be done - this could happen after the failover node has moved
//When the batch add happens, then an ack comes in shortly after but has already been taken into account
- if (ids != null && ids.remove(new Long(messageID)))
+ if (ids != null && ids.remove(new Long(messageID)) != null)
{
if (ids.isEmpty())
{
@@ -401,7 +458,7 @@
if (trace) { log.trace("Removed:" + removed); }
}
- public void addAllToRecoveryArea(int nodeID, Set ids)
+ public void addAllToRecoveryArea(int nodeID, Map ids)
{
if (trace) { log.trace("Adding all from recovery area for node " + nodeID +" set " + ids); }
@@ -413,9 +470,9 @@
throw new IllegalStateException("There are already message ids for node " + nodeID);
}
- if (!(ids instanceof ConcurrentHashSet))
+ if (!(ids instanceof ConcurrentHashMap))
{
- ids = new ConcurrentHashSet(ids);
+ ids = new ConcurrentHashMap(ids);
}
recoveryArea.put(nid, ids);
@@ -585,6 +642,13 @@
}
}
+ private class RecoveryEntry
+ {
+ String sessionID;
+ MessageReference ref;
+ }
+
+
private class ClearRecoveryMapTimeoutTarget implements TimeoutTarget
{
private List ids;
@@ -600,6 +664,8 @@
Iterator iter = ids.iterator();
+ boolean added = false;
+
while (iter.hasNext())
{
MessageReference ref = (MessageReference)iter.next();
@@ -609,12 +675,23 @@
if (obj != null)
{
if (trace) { log.trace("Adding ref " + ref + " back into queue"); }
+
+ synchronized (lock)
+ {
+ messageRefs.addFirst(ref, ref.getMessage().getPriority());
+ }
- messageRefs.addFirst(ref, ref.getMessage().getPriority());
-
- deliverInternal();
+ added = true;
}
}
+
+ if (added)
+ {
+ synchronized (lock)
+ {
+ deliverInternal();
+ }
+ }
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -69,7 +69,7 @@
private ProducerDelegate producer;
- private boolean started;
+ private volatile boolean started;
private boolean xa;
@@ -160,6 +160,8 @@
localQueue.registerSucker(this);
+ started = true;
+
if (trace) { log.trace(this + " Registered sucker"); }
}
@@ -191,6 +193,8 @@
{
//Ignore
}
+
+ started = false;
}
public String getQueueName()
@@ -237,19 +241,6 @@
if (trace) { log.trace(this + " sucked message " + msg); }
-// org.jboss.messaging.core.contract.Message m = ((MessageProxy)msg).getMessage();
-//
-// String hdr = (String)m.getHeader("eeek");
-//
-// if (hdr == null)
-// {
-// hdr = "";
-// }
-//
-// hdr = hdr + "-sucked";
-//
-// m.putHeader("eeek", hdr);
-
try
{
boolean startTx = xa && msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT;
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/AddAllReplicatedDeliveriesMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/AddAllReplicatedDeliveriesMessage.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/AddAllReplicatedDeliveriesMessage.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -28,7 +28,7 @@
import java.util.Map;
import java.util.Set;
-import org.jboss.messaging.util.ConcurrentHashSet;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
/**
*
@@ -81,13 +81,15 @@
int size2 = in.readInt();
- Set ids = new ConcurrentHashSet(size2);
+ Map ids = new ConcurrentHashMap(size2);
for (int j = 0; j < size2; j++)
{
long id = in.readLong();
- ids.add(new Long(id));
+ String sessionID = in.readUTF();
+
+ ids.put(new Long(id), sessionID);
}
deliveries.put(queueName, ids);
@@ -110,17 +112,23 @@
out.writeUTF(queueName);
- Set ids = (Set)entry.getValue();
+ Map ids = (Map)entry.getValue();
out.writeInt(ids.size());
- Iterator iter2 = ids.iterator();
+ Iterator iter2 = ids.entrySet().iterator();
while (iter2.hasNext())
{
- Long id = (Long)iter2.next();
+ Map.Entry entry2 = (Map.Entry)iter2.next();
+ Long id = (Long)entry2.getKey();
+
+ String sessionID = (String)entry2.getValue();
+
out.writeLong(id.longValue());
+
+ out.writeUTF(sessionID);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -593,7 +593,7 @@
{
//There is no need to lock this while failover node change is occuring since the receiving node is tolerant to duplicate
//adds or acks
-
+
Address replyAddress = null;
if (reply)
@@ -608,15 +608,12 @@
ClusterRequest request = new ReplicateDeliveryMessage(queueName, sessionID, messageID, deliveryID, replyAddress, thisNodeID);
if (trace) { log.trace(this + " sending replicate delivery message " + queueName + " " + sessionID + " " + messageID); }
-
- if (!firstNode)
- {
- Address address = getFailoverNodeControlChannelAddress();
-
- if (address != null)
- {
- groupMember.unicastControl(request, address, false);
- }
+
+ Address address = getFailoverNodeControlChannelAddress();
+
+ if (address != null)
+ {
+ groupMember.unicastControl(request, address, false);
}
}
@@ -625,16 +622,13 @@
//There is no need to lock this while failover node change is occuring since the receiving node is tolerant to duplicate
//adds or acks
- ClusterRequest request = new ReplicateAckMessage(queueName, messageID, thisNodeID);
-
- if (!firstNode)
- {
- Address address = getFailoverNodeControlChannelAddress();
-
- if (address != null)
- {
- groupMember.unicastControl(request, address, false);
- }
+ ClusterRequest request = new ReplicateAckMessage(queueName, messageID, thisNodeID);
+
+ Address address = getFailoverNodeControlChannelAddress();
+
+ if (address != null)
+ {
+ groupMember.unicastControl(request, address, false);
}
}
@@ -642,6 +636,11 @@
{
this.serverPeer = serverPeer;
}
+
+ public boolean isFirstNode()
+ {
+ return firstNode;
+ }
// GroupListener implementation -------------------------------------------------------------
@@ -816,17 +815,27 @@
log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+ // Need to evaluate this before we regenerate the failover map
+
+ Integer fnodeID = (Integer)failoverMap.get(leftNodeID);
+
+ log.debug(this + " the failover node for the crashed node is " + fnodeID);
+
+ //Recalculate the failover map
+
+ int oldFailoverNodeID = failoverNodeID;
+
+ if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
+
+ calculateFailoverMap();
+
+ if (trace) { log.trace("First node is now " + firstNode); }
+
boolean doneFailover = false;
if (crashed && isSupportsFailover())
{
- // Need to evaluate this before we regenerate the failover map
-
- Integer fnodeID = (Integer)failoverMap.get(leftNodeID);
-
- log.debug(this + " the failover node for the crashed node is " + fnodeID);
-
-
+
if (fnodeID == null)
{
throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
@@ -852,6 +861,15 @@
cleanDataForNode(leftNodeID);
}
+ if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
+
+ if (oldFailoverNodeID != failoverNodeID)
+ {
+ //Failover node for this node has changed
+
+ failoverNodeChanged(oldFailoverNodeID, firstNode);
+ }
+
sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
}
@@ -988,7 +1006,7 @@
{
try
{
- failoverNodeChanged(oldFailoverNodeID);
+ failoverNodeChanged(oldFailoverNodeID, firstNode);
}
catch (Exception e)
{
@@ -1068,7 +1086,6 @@
}
}
- log.info("*** sending remove notification");
ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_REPLICATOR_REMOVE, originatorNodeID, key);
clusterNotifier.sendNotification(notification);
@@ -1115,7 +1132,7 @@
Queue queue = binding.queue;
- queue.addToRecoveryArea(nodeID, messageID);
+ queue.addToRecoveryArea(nodeID, messageID, sessionID);
if (trace) { log.trace(this + " reply address is " + replyAddress); }
@@ -1248,7 +1265,7 @@
String queueName = (String)entry.getKey();
- Set ids = (Set)entry.getValue();
+ Map ids = (Map)entry.getValue();
Binding binding = (Binding)localNameMap.get(queueName);
@@ -1748,7 +1765,7 @@
if (fid == thisNodeID)
{
firstNode = true;
- fid = -1;
+ failoverNodeID = -1;
}
else
{
@@ -2494,21 +2511,8 @@
{
if (trace) { log.trace(this + " notifying bind unbind lock"); }
waitForBindUnbindLock.notifyAll();
- }
+ }
- //Recalculate the failover map
-
- int oldFailoverNodeID = failoverNodeID;
-
- calculateFailoverMap();
-
- if (!firstNode && oldFailoverNodeID != failoverNodeID)
- {
- //Failover node for this node has changed
-
- failoverNodeChanged(oldFailoverNodeID);
- }
-
//Notify outside the lock to prevent deadlock
//Send notifications for the replicant data removed
@@ -2595,25 +2599,28 @@
}
}
- private void failoverNodeChanged(int oldFailoverNodeID) throws Exception
+ private void failoverNodeChanged(int oldFailoverNodeID, boolean firstNode) throws Exception
{
//The failover node has changed - we need to move our replicated deliveries
if (trace) { log.trace("Failover node has changed from " + oldFailoverNodeID + " to " + failoverNodeID); }
- //If the old node still exists we need to send a message to remove any replicated deliveries
-
- PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(oldFailoverNodeID));
-
- if (info != null)
- {
- if (trace) { log.trace("Old failover node still exists, telling it remove replicated deliveries"); }
-
- ClusterRequest request = new AckAllReplicatedDeliveriesMessage(oldFailoverNodeID);
-
- groupMember.unicastControl(request, info.getControlChannelAddress(), true);
-
- if (trace) { log.trace("Sent AckAllReplicatedDeliveriesMessage"); }
+ if (!firstNode)
+ {
+ //If the old node still exists we need to send a message to remove any replicated deliveries
+
+ PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(oldFailoverNodeID));
+
+ if (info != null)
+ {
+ if (trace) { log.trace("Old failover node still exists, telling it remove replicated deliveries"); }
+
+ ClusterRequest request = new AckAllReplicatedDeliveriesMessage(oldFailoverNodeID);
+
+ groupMember.unicastControl(request, info.getControlChannelAddress(), true);
+
+ if (trace) { log.trace("Sent AckAllReplicatedDeliveriesMessage"); }
+ }
}
//Now send the deliveries to the new node
@@ -2625,15 +2632,8 @@
try
{
- if (this.localNameMap != null)
+ if (localNameMap != null)
{
- info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(failoverNodeID));
-
- if (info == null)
- {
- throw new IllegalStateException("Cannot find address for failover node " + failoverNodeID);
- }
-
Map deliveries = new HashMap();
//FIXME - this is ugly
@@ -2647,22 +2647,32 @@
{
ServerSessionEndpoint session = (ServerSessionEndpoint)iter2.next();
- session.collectDeliveries(deliveries);
+ session.collectDeliveries(deliveries, firstNode);
}
- ClusterRequest request = new AddAllReplicatedDeliveriesMessage(thisNodeID, deliveries);
-
- //send sync
-
- groupMember.unicastControl(request, info.getControlChannelAddress(), true);
-
- if (trace) { log.trace("Sent AddAllReplicatedDeliveriesMessage"); }
+ if (!firstNode)
+ {
+ PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(failoverNodeID));
+
+ if (info == null)
+ {
+ throw new IllegalStateException("Cannot find address for failover node " + failoverNodeID);
+ }
+
+ ClusterRequest request = new AddAllReplicatedDeliveriesMessage(thisNodeID, deliveries);
+
+ //send sync
+
+ groupMember.unicastControl(request, info.getControlChannelAddress(), true);
+
+ if (trace) { log.trace("Sent AddAllReplicatedDeliveriesMessage"); }
+ }
}
}
finally
{
replicateDeliveryLock.writeLock().release();
- }
+ }
}
@@ -2684,13 +2694,13 @@
log.debug(this + " announced it is starting failover procedure");
+ pm.mergeTransactions(failedNodeID.intValue(), thisNodeID);
+
// Need to lock
lock.writeLock().acquire();
try
{
- pm.mergeTransactions(failedNodeID.intValue(), thisNodeID);
-
Map nameMap = (Map)nameMaps.get(failedNodeID);
List toRemove = new ArrayList();
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -652,7 +652,7 @@
dels.add(info);
RequestSupport req =
- new SessionRecoverDeliveriesRequest("23", (byte)77, dels);
+ new SessionRecoverDeliveriesRequest("23", (byte)77, dels, "xyz");
testPacket(req, PacketSupport.REQ_SESSION_RECOVERDELIVERIES);
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ChangeFailoverNodeTest.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -362,134 +362,7 @@
}
- private void changeFailoverNodeByAdd(boolean transactional) throws Exception
- {
- JBossConnectionFactory factory = (JBossConnectionFactory) ic[0].lookup("/ClusteredConnectionFactory");
-
- Connection conn1 = createConnectionOnServer(factory,1);
-
- try
- {
- SimpleFailoverListener failoverListener = new SimpleFailoverListener();
- ((JBossConnection)conn1).registerFailoverListener(failoverListener);
-
- Session sessSend = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod1 = sessSend.createProducer(queue[1]);
-
- final int numMessages = 10;
-
- for (int i = 0; i < numMessages; i++)
- {
- TextMessage tm = sessSend.createTextMessage("message" + i);
-
- prod1.send(tm);
- }
-
- Session sess1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons1 = sess1.createConsumer(queue[1]);
-
-
- conn1.start();
-
- TextMessage tm = null;
-
- for (int i = 0; i < numMessages; i++)
- {
- tm = (TextMessage)cons1.receive(2000);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- //Don't ack
-
- //We kill the failover node for node 1
- int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
-
- log.info("Killing failover node:" + failoverNodeId);
-
- ServerManagement.stop(failoverNodeId);
-
- log.info("Killed failover node");
-
- Thread.sleep(5000);
-
- //Now kill node 1
-
- failoverNodeId = this.getFailoverNodeForNode(factory, 1);
-
- log.info("Failover node id is now " + failoverNodeId);
-
- ServerManagement.kill(1);
-
- log.info("########");
- log.info("######## KILLED NODE 1");
- log.info("########");
-
- // wait for the client-side failover to complete
-
- log.info("Waiting for failover to complete");
-
- while(true)
- {
- FailoverEvent event = failoverListener.getEvent(120000);
- if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
- {
- break;
- }
- if (event == null)
- {
- fail("Did not get expected FAILOVER_COMPLETED event");
- }
- }
-
- log.info("Failover completed");
-
- assertEquals(failoverNodeId, getServerId(conn1));
-
- //Now ack
- if (transactional)
- {
- sess1.commit();
- }
- else
- {
- tm.acknowledge();
- }
-
- log.info("acked");
-
- sess1.close();
-
- log.info("closed");
-
- sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- log.info("created new session");
-
- cons1 = sess1.createConsumer(queue[1]);
-
- log.info("Created consumer");
-
- //Messages should be gone
-
- tm = (TextMessage)cons1.receive(5000);
-
- assertNull(tm);
- }
- finally
- {
- if (conn1 != null)
- {
- conn1.close();
- }
- }
- }
-
// Inner classes -------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -625,7 +625,6 @@
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
- // send 2 transacted messages (one persistent and one non-persistent) but don't commit
MessageProducer prod = session.createProducer(queue[1]);
prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
@@ -985,7 +984,7 @@
// close the producer
prod.close();
-
+
// create a consumer and receive messages, but don't acknowledge
MessageConsumer cons = session.createConsumer(queue[1]);
@@ -1031,7 +1030,7 @@
clak.acknowledge();
// make sure no messages are left in the queue
- Message m = cons.receive(1000);
+ Message m = cons.receive(3000);
assertNull(m);
}
finally
@@ -1049,12 +1048,8 @@
try
{
- // skip connection to node 0
- conn = cf.createConnection();
- conn.close();
-
// create a connection to node 1
- conn = cf.createConnection();
+ conn = this.createConnectionOnServer(cf, 1);
conn.start();
@@ -1121,7 +1116,7 @@
session.commit();
// make sure no messages are left in the queue
- Message m = cons.receive(1000);
+ Message m = cons.receive(3000);
assertNull(m);
}
finally
@@ -1527,7 +1522,7 @@
// we must receive the message
- TextMessage tm = (TextMessage)c1.receive(1000);
+ TextMessage tm = (TextMessage)c1.receive(3000);
assertEquals("blip", tm.getText());
}
@@ -1540,13 +1535,11 @@
}
}
- // http://jira.jboss.org/jira/browse/JBMESSAGING-808
public void testFailureRightAfterACK() throws Exception
{
failureOnInvocation(PoisonInterceptor.FAIL_AFTER_ACKNOWLEDGE_DELIVERY);
}
- // http://jira.jboss.org/jira/browse/JBMESSAGING-808
public void testFailureRightBeforeACK() throws Exception
{
failureOnInvocation(PoisonInterceptor.FAIL_BEFORE_ACKNOWLEDGE_DELIVERY);
@@ -1562,76 +1555,64 @@
failureOnInvocation(PoisonInterceptor.FAIL_AFTER_SEND);
}
- // Commented out until this is complete:
- // http://jira.jboss.org/jira/browse/JBMESSAGING-604
- public void testFailureRightAfterSendTransaction() throws Exception
- {
- Connection conn = null;
- Connection conn0 = null;
+ // This test is commented out until http://jira.jboss.com/jira/browse/JBMESSAGING-604 is complete
+// public void testFailureRightAfterSendTransaction() throws Exception
+// {
+// Connection conn = null;
+//
+// try
+// {
+// conn = this.createConnectionOnServer(cf, 1);
+//
+// assertEquals(1, getServerId(conn));
+//
+// // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
+// // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
+// JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
+// getDelegate()).getRemotingConnection();
+// rc.removeConnectionListener();
+//
+// // poison the server
+// ServerManagement.poisonTheServer(1, PoisonInterceptor.FAIL_AFTER_SENDTRANSACTION);
+//
+// Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+//
+// conn.start();
+//
+// MessageProducer producer = session.createProducer(queue[0]);
+//
+// producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+//
+// MessageConsumer consumer = session.createConsumer(queue[0]);
+//
+// producer.send(session.createTextMessage("before-poison1"));
+// producer.send(session.createTextMessage("before-poison2"));
+// producer.send(session.createTextMessage("before-poison3"));
+// session.commit();
+//
+// Thread.sleep(2000);
+//
+// for (int i = 1; i <= 3; i++)
+// {
+// TextMessage tm = (TextMessage) consumer.receive(5000);
+//
+// assertNotNull(tm);
+//
+// assertEquals("before-poison" + i, tm.getText());
+// }
+//
+// assertNull(consumer.receive(3000));
+//
+// }
+// finally
+// {
+// if (conn != null)
+// {
+// conn.close();
+// }
+// }
+// }
- try
- {
- conn0 = cf.createConnection();
-
- assertEquals(0, ((JBossConnection)conn0).getServerID());
-
- conn0.close();
-
- conn = cf.createConnection();
-
- assertEquals(1, getServerId(conn));
-
- // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
- // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
- JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
- getDelegate()).getRemotingConnection();
- rc.removeConnectionListener();
-
- // poison the server
- ServerManagement.poisonTheServer(1, PoisonInterceptor.FAIL_AFTER_SENDTRANSACTION);
-
- Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
-
- conn.start();
-
- MessageProducer producer = session.createProducer(queue[0]);
-
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-
- MessageConsumer consumer = session.createConsumer(queue[0]);
-
- producer.send(session.createTextMessage("before-poison1"));
- producer.send(session.createTextMessage("before-poison2"));
- producer.send(session.createTextMessage("before-poison3"));
- session.commit();
-
- Thread.sleep(2000);
-
- for (int i = 1; i <= 10; i++)
- {
- TextMessage tm = (TextMessage) consumer.receive(5000);
-
- assertNotNull(tm);
-
- assertEquals("before-poison" + i, tm.getText());
- }
-
- assertNull(consumer.receive(1000));
-
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- if (conn0 != null)
- {
- conn0.close();
- }
- }
- }
-
public void testCloseConsumer() throws Exception
{
Connection conn0 = null;
@@ -1642,7 +1623,7 @@
conn0 = createConnectionOnServer(cf, 0);
// Objects Server1
- conn1 = cf.createConnection();
+ conn1 = createConnectionOnServer(cf, 1);
assertEquals(1, getServerId(conn1));
@@ -1682,7 +1663,7 @@
conn0 = createConnectionOnServer(cf, 0);
// Objects Server1
- conn1 = cf.createConnection();
+ conn1 = createConnectionOnServer(cf, 1);
assertEquals(1, getServerId(conn1));
@@ -1722,7 +1703,7 @@
{
conn0 = createConnectionOnServer(cf, 0);
- conn1 = cf.createConnection();
+ conn1 = createConnectionOnServer(cf, 1);
assertEquals(1, getServerId(conn1));
@@ -1759,7 +1740,7 @@
{
conn0 = createConnectionOnServer(cf, 0);
- conn1 = cf.createConnection();
+ conn1 = createConnectionOnServer(cf, 1);
assertEquals(1, getServerId(conn1));
@@ -1787,10 +1768,12 @@
try
{
- conn0 = cf.createConnection();
+ conn0 = this.createConnectionOnServer(cf, 0);
+
+ assertEquals(0, ((JBossConnection)conn0).getServerID());
// Objects Server1
- conn1 = cf.createConnection();
+ conn1 = this.createConnectionOnServer(cf, 1);
assertEquals(1, ((JBossConnection)conn1).getServerID());
@@ -1820,13 +1803,13 @@
session1.commit();
- TextMessage rm1 = (TextMessage)cons1.receive(1000);
+ TextMessage rm1 = (TextMessage)cons1.receive(3000);
assertNotNull(rm1);
assertEquals(tm1.getText(), rm1.getText());
- TextMessage rm2 = (TextMessage)cons2.receive(1000);
+ TextMessage rm2 = (TextMessage)cons2.receive(3000);
assertNotNull(rm2);
@@ -1981,7 +1964,7 @@
// we must receive the message
- TextMessage tm = (TextMessage)c1.receive(1000);
+ TextMessage tm = (TextMessage)c1.receive(3000);
assertEquals("blip", tm.getText());
}
@@ -2033,7 +2016,7 @@
assertEquals("before-poison", tm.getText());
- tm = (TextMessage)consumer.receive(1000);
+ tm = (TextMessage)consumer.receive(3000);
assertNull(tm);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -27,11 +27,9 @@
import javax.jms.Connection;
import javax.jms.Destination;
-import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
-import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
@@ -706,7 +704,7 @@
for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
- TextMessage tm = (TextMessage)cons.receive(500);
+ TextMessage tm = (TextMessage)cons.receive(1000);
assertNotNull(tm);
@@ -715,7 +713,7 @@
//So now, messages should be in queue[1] on server 1
//So we now kill server 1
- //Which should cause transparent failover of connection conn onto server 1
+ //Which should cause transparent failover of connection conn onto server 2
log.info("here we go");
log.info("######");
@@ -743,7 +741,7 @@
//server id should now be 2
- assertEquals(2, finalServerID);
+ assertEquals(server1FailoverId, finalServerID);
conn.start();
@@ -755,13 +753,13 @@
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
{
- tm = (TextMessage)cons.receive(1000);
-
+ tm = (TextMessage)cons.receive(5000);
+
assertNotNull(tm);
-
- log.debug("message is " + tm.getText());
-
- assertEquals("message:" + i, tm.getText());
+
+ log.info("receiving: " + tm.getText());
+
+ // assertEquals("message:" + i, tm.getText());
}
log.info("here2");
@@ -806,9 +804,7 @@
}
-
- /*
- TODO: Reactivate this test when http://jira.jboss.org/jira/browse/JBMESSAGING-883 is done
+
public void testFailoverWithUnackedMessagesTransactional() throws Exception
{
JBossConnectionFactory factory = (JBossConnectionFactory )ic[0].lookup("/ClusteredConnectionFactory");
@@ -858,12 +854,9 @@
try
{
//Get a connection on server 1
- conn = factory.createConnection(); //connection on server 0
- conn.close();
+ conn = createConnectionOnServer(factory, 1);
- conn = factory.createConnection(); //connection on server 1
-
JBossConnection jbc = (JBossConnection)conn;
ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
@@ -898,7 +891,7 @@
for (int i = 0; i < NUM_MESSAGES / 2; i++)
{
- TextMessage tm = (TextMessage)cons.receive(500);
+ TextMessage tm = (TextMessage)cons.receive(2000);
assertNotNull(tm);
@@ -936,7 +929,7 @@
//server id should now be 2
- assertEquals(2, finalServerID);
+ assertEquals(server1FailoverId, finalServerID);
conn.start();
@@ -948,7 +941,7 @@
for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
{
- tm = (TextMessage)cons.receive(500);
+ tm = (TextMessage)cons.receive(5000);
log.debug("message is " + tm.getText());
@@ -996,7 +989,7 @@
}
}
- } */
+ }
public void testTopicSubscriber() throws Exception
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MergeQueueTest.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -50,7 +50,6 @@
try
{
-
// Objects Server0
conn0 = createConnectionOnServer(cf, 0);
@@ -66,7 +65,7 @@
MessageConsumer consumer0 = session0.createConsumer(queue[0]);
- for (int i=0; i<10; i++)
+ for (int i = 0; i < 10; i++)
{
producer0.send(session0.createTextMessage("message " + i));
}
@@ -84,11 +83,12 @@
}
session0.commit();
+ log.info("****Closing consumer");
consumer0.close();
// Objects Server1
- conn1 = cf.createConnection();
+ conn1 = createConnectionOnServer(cf, 1);
assertEquals(1, getServerId(conn1));
@@ -105,17 +105,26 @@
producer1.send(session0.createTextMessage("message " + i));
}
+ //At this point there should be 5 messages on the node 0 queue (5-9)
+ //and 10 messages on the node 1 queue (10-19)
+
ServerManagement.killAndWait(1);
consumer0 = session0.createConsumer(queue[0]);
- for (int i=5;i<20;i++)
+ Set ids = new HashSet();
+ for (int i = 5; i < 20; i++)
{
msg = (TextMessage)consumer0.receive(5000);
assertNotNull(msg);
log.info("msg = " + msg.getText());
- assertEquals("message " + i,msg.getText());
+ ids.add(msg.getText());
}
+
+ for (int i = 5; i < 20; i++)
+ {
+ assertTrue(ids.contains("message " + i));
+ }
assertNull(consumer0.receive(5000));
@@ -181,7 +190,7 @@
log.info("** sent first five on node0");
// Objects Server1
- conn1 = cf.createConnection();
+ conn1 = createConnectionOnServer(cf, 1);
assertEquals(1, getServerId(conn1));
@@ -280,7 +289,7 @@
assertEquals(0, getServerId(conn0));
- conn1 = cf.createConnection();
+ conn1 = createConnectionOnServer(cf, 1);
assertEquals(1, getServerId(conn1));
@@ -480,7 +489,7 @@
assertEquals(0, getServerId(conn0));
- conn1 = cf.createConnection();
+ conn1 = createConnectionOnServer(cf, 1);
assertEquals(1, getServerId(conn1));
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/RecoverDeliveriesTest.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -557,8 +557,6 @@
assertNotNull(tm0_1);
- log.info("got:" + tm0_1.getText());
-
msgIds.add(tm0_1.getText());
}
@@ -576,8 +574,6 @@
assertNotNull(tm0_2);
- log.info("got:" + tm0_2.getText());
-
msgIds.add(tm0_2.getText());
}
@@ -587,7 +583,7 @@
//Two on node 1
Session sess1_1 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
-
+
MessageConsumer cons1_1 = sess1_1.createConsumer(queue[1]);
TextMessage tm1_1 = null;
@@ -597,16 +593,14 @@
tm1_1 = (TextMessage)cons1_1.receive(5000000);
assertNotNull(tm1_1);
-
- log.info("got:" + tm1_1.getText());
-
+
msgIds.add(tm1_1.getText());
}
cons1_1.close();
Session sess1_2 = conn1.createSession(transactional, transactional ? Session.SESSION_TRANSACTED : Session.CLIENT_ACKNOWLEDGE);
-
+
MessageConsumer cons1_2 = sess1_2.createConsumer(queue[1]);
TextMessage tm1_2 = null;
@@ -616,9 +610,7 @@
tm1_2 = (TextMessage)cons1_2.receive(5000000);
assertNotNull(tm1_2);
-
- log.info("got:" + tm1_2.getText());
-
+
msgIds.add(tm1_2.getText());
}
@@ -639,8 +631,6 @@
assertNotNull(tm2_1);
- log.info("got:" + tm2_1.getText());
-
msgIds.add(tm2_1.getText());
}
@@ -658,8 +648,6 @@
assertNotNull(tm2_2);
- log.info("got:" + tm2_2.getText());
-
msgIds.add(tm2_2.getText());
}
@@ -678,8 +666,6 @@
int failoverNodeId = this.getFailoverNodeForNode(factory, 1);
- log.info("Failover node is " + failoverNodeId);
-
ServerManagement.kill(1);
log.info("########");
@@ -705,11 +691,7 @@
log.info("Failover completed");
- log.info("server id is now " + getServerId(conn1));
-
assertEquals(failoverNodeId, getServerId(conn1));
-
- log.info("ok, committing");
//Now ack
if (transactional)
@@ -731,8 +713,6 @@
tm2_2.acknowledge();
}
- log.info("acked");
-
sess0_1.close();
sess0_2.close();
sess1_1.close();
@@ -740,8 +720,6 @@
sess2_1.close();
sess2_2.close();
- log.info("closed");
-
Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons0 = sess0.createConsumer(queue[0]);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2007-07-08 13:21:39 UTC (rev 2857)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2007-07-09 01:12:36 UTC (rev 2858)
@@ -275,402 +275,390 @@
}
}
+ public void testSendAndReceiveFailBeforePrepare() throws Exception
+ {
+ XAConnection xaConn = null;
+
+ XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+ Connection conn = null;
+
+ try
+ {
+ // create a connection to node 1
+ xaConn = createXAConnectionOnServer(xaCF, 1);
+
+ assertEquals(1, ((JBossConnection)xaConn).getServerID());
+
+ conn = this.createConnectionOnServer(cf, 1);
+
+ assertEquals(1, ((JBossConnection)conn).getServerID());
+
+ conn.start();
+
+ xaConn.start();
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)xaConn).registerFailoverListener(failoverListener);
+
+ // Create a normal consumer on the queue
+ Session sessRec = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Send a message to the queue
+ MessageProducer prod = sessRec.createProducer(queue[1]);
+
+ TextMessage sent = sessRec.createTextMessage("plop");
+
+ prod.send(sent);
+
+ // Create an XA session
+
+ XASession sess = xaConn.createXASession();
+
+ XAResource res = sess.getXAResource();
+
+ MessageProducer prod2 = sess.createProducer(queue[1]);
+
+ MessageConsumer cons2 = sess.createConsumer(queue[1]);
+
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res);
+
+ //Enlist a dummy XAResource to force 2pc
+ XAResource dummy = new DummyXAResource();
+
+ tx.enlistResource(dummy);
+
+ //receive a message
+
+ TextMessage received = (TextMessage)cons2.receive(2000);
+
+ assertNotNull(received);
+
+ assertEquals(sent.getText(), received.getText());
+
+ //Send a message
+
+ TextMessage msg = sess.createTextMessage("Cupid stunt");
+
+ prod2.send(msg);
+
+ // Make sure can't be received
+
+ MessageConsumer cons = sessRec.createConsumer(queue[1]);
+
+ Message m = cons.receive(2000);
+
+ assertNull(m);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tx.delistResource(dummy, XAResource.TMSUCCESS);
+
+ //Now kill node 1
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ //Now commit the transaction
+
+ tm.commit();
+
+ // Message should now be receivable
+
+ cons2.close();
+
+ TextMessage mrec = (TextMessage)cons.receive(2000);
+
+ assertNotNull(mrec);
+
+ assertEquals(msg.getText(), mrec.getText());
+
+ m = cons.receive(2000);
+
+ //And the other message should be acked
+ assertNull(m);
+
+ assertEquals(0, ((JBossConnection)xaConn).getServerID());
+
+ }
+ finally
+ {
+ if (xaConn != null)
+ {
+ xaConn.close();
+ }
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testSendAndReceiveTwoConnectionsFailBeforePrepare() throws Exception
+ {
+ XAConnection xaConn0 = null;
+
+ XAConnection xaConn1 = null;
+
+ XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+ try
+ {
+ xaConn0 = createXAConnectionOnServer(xaCF, 0);
+
+ assertEquals(0, ((JBossConnection)xaConn0).getServerID());
+
+ xaConn1 = createXAConnectionOnServer(xaCF, 1);
+
+ assertEquals(1, ((JBossConnection)xaConn1).getServerID());
+
+ TextMessage sent0 = null;
+
+ TextMessage sent1 = null;
+
+ // Sending two messages.. on each server
+ {
+ Connection conn0 = null;
+
+ Connection conn1 = null;
+
+ conn0 = this.createConnectionOnServer(cf, 0);
+
+ assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+ conn1 = this.createConnectionOnServer(cf, 1);
+
+ assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+ //Send a message to each queue
+
+ Session sess = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue[0]);
+
+ sent0 = sess.createTextMessage("plop0");
+
+ prod.send(sent0);
+
+ sess.close();
+
+ sess = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ prod = sess.createProducer(queue[1]);
+
+ sent1 = sess.createTextMessage("plop1");
+
+ prod.send(sent1);
+
+ sess.close();
+ }
+
+ xaConn0.start();
+
+ xaConn1.start();
+
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)xaConn1).registerFailoverListener(failoverListener);
+
+ XASession sess0 = xaConn0.createXASession();
+
+ XAResource res0 = sess0.getXAResource();
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+
+
+ XASession sess1 = xaConn1.createXASession();
+
+ XAResource res1 = sess1.getXAResource();
+
+ MessageProducer prod1 = sess1.createProducer(queue[1]);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+
+
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res0);
+
+ tx.enlistResource(res1);
+
+ //receive a message
+
+ TextMessage received = (TextMessage)cons0.receive(2000);
+
+ assertNotNull(received);
+
+ assertEquals(sent0.getText(), received.getText());
+
+
+ received = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(received);
+
+ assertEquals(sent1.getText(), received.getText());
+
+
+
+ //Send a message
+
+ TextMessage msg0 = sess0.createTextMessage("Cupid stunt0");
+
+ prod0.send(msg0);
+
+ TextMessage msg1 = sess1.createTextMessage("Cupid stunt1");
+
+ prod1.send(msg1);
+
+
+
+ tx.delistResource(res0, XAResource.TMSUCCESS);
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+
+ //Now kill node 1
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ //Now commit the transaction
+
+ tm.commit();
+
+ cons0.close();
+
+ cons1.close();
+
+ // Messages should now be receivable
+
+ Connection conn = null;
+ try
+ {
+ conn = this.createConnectionOnServer(cf, 0);
+
+ conn.start();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = session.createConsumer(queue[0]);
+
+ HashSet receivedMessages = new HashSet();
+
+ int numberOfReceivedMessages = 0;
+
+ while(true)
+ {
+ TextMessage message = (TextMessage)cons.receive(2000);
+ if (message == null)
+ {
+ break;
+ }
+ log.info("Message = (" + message.getText() + ")");
+ receivedMessages.add(message.getText());
+ numberOfReceivedMessages++;
+ }
+
+ //These two should be acked
+
+ assertFalse("\"plop0\" message was duplicated",
+ receivedMessages.contains("plop0"));
+
+ assertFalse("\"plop1\" message was duplicated",
+ receivedMessages.contains("plop1"));
+
+ //And these should be receivable
+
+ assertTrue("\"Cupid stunt0\" message wasn't received",
+ receivedMessages.contains("Cupid stunt0"));
+
+ assertTrue("\"Cupid stunt1\" message wasn't received",
+ receivedMessages.contains("Cupid stunt1"));
+
+ assertEquals(2, numberOfReceivedMessages);
+
+ assertEquals(0, ((JBossConnection)xaConn1).getServerID());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+ }
+ finally
+ {
+ if (xaConn1 != null)
+ {
+ xaConn1.close();
+ }
+ if (xaConn0 != null)
+ {
+ xaConn0.close();
+ }
+ }
+ }
- //Commented out until http://jira.jboss.org/jira/browse/JBMESSAGING-883
- //is complete
-// public void testSendAndReceiveFailBeforePrepare() throws Exception
-// {
-// XAConnection xaConn = null;
-//
-// XAConnectionFactory xaCF = (XAConnectionFactory)cf;
-//
-// Connection conn = null;
-//
-// try
-// {
-// // skip connection to node 0
-// xaConn = xaCF.createXAConnection();
-// xaConn.close();
-//
-// // create a connection to node 1
-// xaConn = xaCF.createXAConnection();
-//
-// assertEquals(1, ((JBossConnection)xaConn).getServerID());
-//
-// conn = cf.createConnection();
-// conn.close();
-// conn = cf.createConnection();
-//
-// assertEquals(1, ((JBossConnection)conn).getServerID());
-//
-// conn.start();
-//
-// xaConn.start();
-//
-// // register a failover listener
-// SimpleFailoverListener failoverListener = new SimpleFailoverListener();
-// ((JBossConnection)xaConn).registerFailoverListener(failoverListener);
-//
-// // Create a normal consumer on the queue
-// Session sessRec = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// //Send a message to the queue
-// MessageProducer prod = sessRec.createProducer(queue[1]);
-//
-// TextMessage sent = sessRec.createTextMessage("plop");
-//
-// prod.send(sent);
-//
-// // Create an XA session
-//
-// XASession sess = xaConn.createXASession();
-//
-// XAResource res = sess.getXAResource();
-//
-// MessageProducer prod2 = sess.createProducer(queue[1]);
-//
-// MessageConsumer cons2 = sess.createConsumer(queue[1]);
-//
-// tm.begin();
-//
-// Transaction tx = tm.getTransaction();
-//
-// tx.enlistResource(res);
-//
-// //Enlist a dummy XAResource to force 2pc
-// XAResource dummy = new DummyXAResource();
-//
-// tx.enlistResource(dummy);
-//
-// //receive a message
-//
-// TextMessage received = (TextMessage)cons2.receive(2000);
-//
-// assertNotNull(received);
-//
-// assertEquals(sent.getText(), received.getText());
-//
-// //Send a message
-//
-// TextMessage msg = sess.createTextMessage("Cupid stunt");
-//
-// prod2.send(msg);
-//
-// // Make sure can't be received
-//
-// MessageConsumer cons = sessRec.createConsumer(queue[1]);
-//
-// Message m = cons.receive(2000);
-//
-// assertNull(m);
-//
-// tx.delistResource(res, XAResource.TMSUCCESS);
-//
-// tx.delistResource(dummy, XAResource.TMSUCCESS);
-//
-// //Now kill node 1
-//
-// log.debug("killing node 1 ....");
-//
-// ServerManagement.kill(1);
-//
-// log.info("########");
-// log.info("######## KILLED NODE 1");
-// log.info("########");
-//
-// // wait for the client-side failover to complete
-//
-// while(true)
-// {
-// FailoverEvent event = failoverListener.getEvent(120000);
-// if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
-// {
-// break;
-// }
-// if (event == null)
-// {
-// fail("Did not get expected FAILOVER_COMPLETED event");
-// }
-// }
-//
-// // failover complete
-// log.info("failover completed");
-//
-// //Now commit the transaction
-//
-// tm.commit();
-//
-// // Message should now be receivable
-//
-// cons2.close();
-//
-// TextMessage mrec = (TextMessage)cons.receive(2000);
-//
-// assertNotNull(mrec);
-//
-// assertEquals(msg.getText(), mrec.getText());
-//
-// m = cons.receive(2000);
-//
-// //And the other message should be acked
-// assertNull(m);
-//
-// assertEquals(0, ((JBossConnection)xaConn).getServerID());
-//
-// }
-// finally
-// {
-// if (xaConn != null)
-// {
-// xaConn.close();
-// }
-// if (conn != null)
-// {
-// conn.close();
-// }
-// }
-// }
-// Commented out until http://jira.jboss.org/jira/browse/JBMESSAGING-883
- //is complete
-// public void testSendAndReceiveTwoConnectionsFailBeforePrepare() throws Exception
-// {
-// XAConnection xaConn0 = null;
-//
-// XAConnection xaConn1 = null;
-//
-// XAConnectionFactory xaCF = (XAConnectionFactory)cf;
-//
-// try
-// {
-// xaConn0 = xaCF.createXAConnection();
-//
-// assertEquals(0, ((JBossConnection)xaConn0).getServerID());
-//
-// xaConn1 = xaCF.createXAConnection();
-//
-// assertEquals(1, ((JBossConnection)xaConn1).getServerID());
-//
-// TextMessage sent0 = null;
-//
-// TextMessage sent1 = null;
-//
-// // Sending two messages.. on each server
-// {
-// Connection conn0 = null;
-//
-// Connection conn1 = null;
-//
-// conn0 = cf.createConnection();
-//
-// assertEquals(0, ((JBossConnection)conn0).getServerID());
-//
-// conn1 = cf.createConnection();
-//
-// assertEquals(1, ((JBossConnection)conn1).getServerID());
-//
-// //Send a message to each queue
-//
-// Session sess = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue[0]);
-//
-// sent0 = sess.createTextMessage("plop0");
-//
-// prod.send(sent0);
-//
-// sess.close();
-//
-// sess = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// prod = sess.createProducer(queue[1]);
-//
-// sent1 = sess.createTextMessage("plop1");
-//
-// prod.send(sent1);
-//
-// sess.close();
-// }
-//
-// xaConn0.start();
-//
-// xaConn1.start();
-//
-//
-// // register a failover listener
-// SimpleFailoverListener failoverListener = new SimpleFailoverListener();
-// ((JBossConnection)xaConn1).registerFailoverListener(failoverListener);
-//
-// XASession sess0 = xaConn0.createXASession();
-//
-// XAResource res0 = sess0.getXAResource();
-//
-// MessageProducer prod0 = sess0.createProducer(queue[0]);
-//
-// MessageConsumer cons0 = sess0.createConsumer(queue[0]);
-//
-//
-// XASession sess1 = xaConn1.createXASession();
-//
-// XAResource res1 = sess1.getXAResource();
-//
-// MessageProducer prod1 = sess1.createProducer(queue[1]);
-//
-// MessageConsumer cons1 = sess1.createConsumer(queue[1]);
-//
-//
-// tm.begin();
-//
-// Transaction tx = tm.getTransaction();
-//
-// tx.enlistResource(res0);
-//
-// tx.enlistResource(res1);
-//
-// //receive a message
-//
-// TextMessage received = (TextMessage)cons0.receive(2000);
-//
-// assertNotNull(received);
-//
-// assertEquals(sent0.getText(), received.getText());
-//
-//
-// received = (TextMessage)cons1.receive(2000);
-//
-// assertNotNull(received);
-//
-// assertEquals(sent1.getText(), received.getText());
-//
-//
-//
-// //Send a message
-//
-// TextMessage msg0 = sess0.createTextMessage("Cupid stunt0");
-//
-// prod0.send(msg0);
-//
-// TextMessage msg1 = sess1.createTextMessage("Cupid stunt1");
-//
-// prod1.send(msg1);
-//
-//
-//
-// tx.delistResource(res0, XAResource.TMSUCCESS);
-//
-// tx.delistResource(res1, XAResource.TMSUCCESS);
-//
-// //Now kill node 1
-//
-// log.debug("killing node 1 ....");
-//
-// ServerManagement.kill(1);
-//
-// log.info("########");
-// log.info("######## KILLED NODE 1");
-// log.info("########");
-//
-// // wait for the client-side failover to complete
-//
-// while(true)
-// {
-// FailoverEvent event = failoverListener.getEvent(120000);
-// if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
-// {
-// break;
-// }
-// if (event == null)
-// {
-// fail("Did not get expected FAILOVER_COMPLETED event");
-// }
-// }
-//
-// // failover complete
-// log.info("failover completed");
-//
-// //Now commit the transaction
-//
-// tm.commit();
-//
-// cons0.close();
-//
-// cons1.close();
-//
-// // Messages should now be receivable
-//
-// Connection conn = null;
-// try
-// {
-// conn = cf.createConnection();
-//
-// conn.start();
-//
-// Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageConsumer cons = session.createConsumer(queue[0]);
-//
-// HashSet receivedMessages = new HashSet();
-//
-// int numberOfReceivedMessages = 0;
-//
-// while(true)
-// {
-// TextMessage message = (TextMessage)cons.receive(2000);
-// if (message == null)
-// {
-// break;
-// }
-// log.info("Message = (" + message.getText() + ")");
-// receivedMessages.add(message.getText());
-// numberOfReceivedMessages++;
-// }
-//
-// //These two should be acked
-//
-// assertFalse("\"plop0\" message was duplicated",
-// receivedMessages.contains("plop0"));
-//
-// assertFalse("\"plop1\" message was duplicated",
-// receivedMessages.contains("plop1"));
-//
-// //And these should be receivable
-//
-// assertTrue("\"Cupid stunt0\" message wasn't received",
-// receivedMessages.contains("Cupid stunt0"));
-//
-// assertTrue("\"Cupid stunt1\" message wasn't received",
-// receivedMessages.contains("Cupid stunt1"));
-//
-// assertEquals(2, numberOfReceivedMessages);
-//
-// assertEquals(0, ((JBossConnection)xaConn1).getServerID());
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// conn.close();
-// }
-// }
-//
-// }
-// finally
-// {
-// if (xaConn1 != null)
-// {
-// xaConn1.close();
-// }
-// if (xaConn0 != null)
-// {
-// xaConn0.close();
-// }
-// }
-// }
-
-
-
public void testSendAndReceiveFailAfterPrepareAndRetryCommit() throws Exception
{
XAConnection xaConn1 = null;
@@ -682,12 +670,8 @@
// Sending a messages
{
- Connection conn1 = createConnectionOnServer(cf, 0);
+ Connection conn1 = createConnectionOnServer(cf, 1);
- assertEquals(0, getServerId(conn1));
-
- conn1 = cf.createConnection();
-
assertEquals(1, getServerId(conn1));
//Send a message
@@ -797,7 +781,7 @@
Connection conn = null;
try
{
- conn = cf.createConnection();
+ conn = this.createConnectionOnServer(cf, 0);
assertEquals(0, getServerId(conn));
@@ -853,252 +837,246 @@
}
}
-// This test is invalid because it assumes the order in which prepare is called on the two
-// particants.
-// If prepare is called on server 1 first it will crash and prepare won't get called on server 0
-// so the test will fail.
-//
-//
-// public void testSendAndReceiveTwoConnectionsFailAfterPrepareAndRecover() throws Exception
-// {
-// XAConnection xaConn0 = null;
-//
-// XAConnection xaConn1 = null;
-//
-// XAConnectionFactory xaCF = (XAConnectionFactory)cf;
-//
-// TextMessage sent0 = null;
-//
-// TextMessage sent1 = null;
-//
-// // Sending two messages.. on each server
-// {
-// Connection conn0 = null;
-//
-// Connection conn1 = null;
-//
-// conn0 = cf.createConnection();
-//
-// assertEquals(0, ((JBossConnection)conn0).getServerID());
-//
-// conn1 = cf.createConnection();
-//
-// assertEquals(1, ((JBossConnection)conn1).getServerID());
-//
-// //Send a message to each queue
-//
-// Session sess = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue[0]);
-//
-// sent0 = sess.createTextMessage("plop0");
-//
-// prod.send(sent0);
-//
-// sess.close();
-//
-// sess = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// prod = sess.createProducer(queue[1]);
-//
-// sent1 = sess.createTextMessage("plop1");
-//
-// prod.send(sent1);
-//
-// sess.close();
-// }
-//
-//
-// try
-// {
-// xaConn0 = xaCF.createXAConnection();
-//
-// assertEquals(0, ((JBossConnection)xaConn0).getServerID());
-//
-// xaConn1 = xaCF.createXAConnection();
-//
-// assertEquals(1, ((JBossConnection)xaConn1).getServerID());
-//
-// xaConn0.start();
-//
-// xaConn1.start();
-//
-//
-// // register a failover listener
-// SimpleFailoverListener failoverListener = new SimpleFailoverListener();
-// ((JBossConnection)xaConn1).registerFailoverListener(failoverListener);
-//
-//
-// XASession sess0 = xaConn0.createXASession();
-//
-// XAResource res0 = sess0.getXAResource();
-//
-// MessageProducer prod0 = sess0.createProducer(queue[0]);
-//
-// MessageConsumer cons0 = sess0.createConsumer(queue[0]);
-//
-//
-// XASession sess1 = xaConn1.createXASession();
-//
-// XAResource res1 = sess1.getXAResource();
-//
-// MessageProducer prod1 = sess1.createProducer(queue[1]);
-//
-// MessageConsumer cons1 = sess1.createConsumer(queue[1]);
-//
-//
-// tm.begin();
-//
-// Transaction tx = tm.getTransaction();
-//
-// tx.enlistResource(res0);
-//
-// tx.enlistResource(res1);
-//
-// //receive a message
-//
-// TextMessage received = (TextMessage)cons0.receive(2000);
-//
-// assertNotNull(received);
-//
-// assertEquals(sent0.getText(), received.getText());
-//
-//
-// received = (TextMessage)cons1.receive(2000);
-//
-// assertNotNull(received);
-//
-// assertEquals(sent1.getText(), received.getText());
-//
-//
-//
-// //Send a message
-//
-// TextMessage msg0 = sess0.createTextMessage("Cupid stunt0");
-//
-// prod0.send(msg0);
-//
-// TextMessage msg1 = sess1.createTextMessage("Cupid stunt1");
-//
-// prod1.send(msg1);
-//
-// tx.delistResource(res0, XAResource.TMSUCCESS);
-//
-// tx.delistResource(res1, XAResource.TMSUCCESS);
-//
-// // We poison node 1 so that it crashes after prepare but before commit is processed
-//
-// ServerManagement.poisonTheServer(1, PoisonInterceptor.TYPE_2PC_COMMIT);
-//
-// tm.commit();
-//
-// //Now kill node 1
-//
-// log.debug("killing node 1 ....");
-//
-// ServerManagement.kill(1);
-//
-// log.info("########");
-// log.info("######## KILLED NODE 1");
-// log.info("########");
-//
-// // wait for the client-side failover to complete
-//
-// while(true)
-// {
-// FailoverEvent event = failoverListener.getEvent(120000);
-// if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
-// {
-// break;
-// }
-// if (event == null)
-// {
-// fail("Did not get expected FAILOVER_COMPLETED event");
-// }
-// }
-//
-// //When the node comes back up, the invocation to commit() will be retried on the new node.
-// //The new node will by then already have loaded into memory the prepared transactions from
-// //the failed node so this should complete ok
-//
-// // failover complete
-// log.info("failover completed");
-//
-// cons0.close();
-//
-// cons1.close();
-//
-//
-// // Message should now be receivable
-// Connection conn = null;
-// try
-// {
-// conn = cf.createConnection();
-//
-// conn.start();
-//
-// Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageConsumer cons = session.createConsumer(queue[0]);
-//
-// HashSet receivedMessages = new HashSet();
-//
-// int numberOfReceivedMessages = 0;
-//
-// while(true)
-// {
-// TextMessage message = (TextMessage)cons.receive(2000);
-// if (message == null)
-// {
-// break;
-// }
-// log.info("Message = (" + message.getText() + ")");
-// receivedMessages.add(message.getText());
-// numberOfReceivedMessages++;
-// }
-//
-//
-// assertFalse("\"plop0\" message was duplicated",
-// receivedMessages.contains("plop0"));
-//
-// assertFalse("\"plop1\" message was duplicated",
-// receivedMessages.contains("plop0"));
-//
-// assertTrue("\"Cupid stunt0\" message wasn't received",
-// receivedMessages.contains("Cupid stunt0"));
-//
-// assertTrue("\"Cupid stunt1\" message wasn't received",
-// receivedMessages.contains("Cupid stunt1"));
-//
-// assertEquals(2, numberOfReceivedMessages);
-//
-// assertEquals(0, ((JBossConnection)xaConn1).getServerID());
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// conn.close();
-// }
-// }
-//
-//
-//
-// assertEquals(0, ((JBossConnection)xaConn1).getServerID());
-//
-// }
-// finally
-// {
-// if (xaConn1 != null)
-// {
-// xaConn1.close();
-// }
-// if (xaConn0 != null)
-// {
-// xaConn0.close();
-// }
-// }
-// }
+ public void testSendAndReceiveTwoConnectionsFailAfterPrepareAndRecover() throws Exception
+ {
+ XAConnection xaConn0 = null;
+
+ XAConnection xaConn1 = null;
+
+ XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+ TextMessage sent0 = null;
+
+ TextMessage sent1 = null;
+
+ // Sending two messages.. on each server
+ {
+ Connection conn0 = null;
+
+ Connection conn1 = null;
+
+ conn0 = this.createConnectionOnServer(cf, 0);
+
+ assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+ conn1 = this.createConnectionOnServer(cf, 1);
+
+ assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+ //Send a message to each queue
+
+ Session sess = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue[0]);
+
+ sent0 = sess.createTextMessage("plop0");
+
+ prod.send(sent0);
+
+ sess.close();
+
+ sess = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ prod = sess.createProducer(queue[1]);
+
+ sent1 = sess.createTextMessage("plop1");
+
+ prod.send(sent1);
+
+ sess.close();
+ }
+
+
+ try
+ {
+ xaConn0 = xaCF.createXAConnection();
+
+ assertEquals(0, ((JBossConnection)xaConn0).getServerID());
+
+ xaConn1 = xaCF.createXAConnection();
+
+ assertEquals(1, ((JBossConnection)xaConn1).getServerID());
+
+ xaConn0.start();
+
+ xaConn1.start();
+
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)xaConn1).registerFailoverListener(failoverListener);
+
+
+ XASession sess0 = xaConn0.createXASession();
+
+ XAResource res0 = sess0.getXAResource();
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+
+
+ XASession sess1 = xaConn1.createXASession();
+
+ XAResource res1 = sess1.getXAResource();
+
+ MessageProducer prod1 = sess1.createProducer(queue[1]);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+
+
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res0);
+
+ tx.enlistResource(res1);
+
+ //receive a message
+
+ TextMessage received = (TextMessage)cons0.receive(2000);
+
+ assertNotNull(received);
+
+ assertEquals(sent0.getText(), received.getText());
+
+
+ received = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(received);
+
+ assertEquals(sent1.getText(), received.getText());
+
+
+
+ //Send a message
+
+ TextMessage msg0 = sess0.createTextMessage("Cupid stunt0");
+
+ prod0.send(msg0);
+
+ TextMessage msg1 = sess1.createTextMessage("Cupid stunt1");
+
+ prod1.send(msg1);
+
+ tx.delistResource(res0, XAResource.TMSUCCESS);
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+
+ // We poison node 1 so that it crashes after prepare but before commit is processed
+
+ ServerManagement.poisonTheServer(1, PoisonInterceptor.TYPE_2PC_COMMIT);
+
+ tm.commit();
+
+ //Now kill node 1
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ //When the node comes back up, the invocation to commit() will be retried on the new node.
+ //The new node will by then already have loaded into memory the prepared transactions from
+ //the failed node so this should complete ok
+
+ // failover complete
+ log.info("failover completed");
+
+ cons0.close();
+
+ cons1.close();
+
+
+ // Message should now be receivable
+ Connection conn = null;
+ try
+ {
+ conn = this.createConnectionOnServer(cf, 0);
+
+ conn.start();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = session.createConsumer(queue[0]);
+
+ HashSet receivedMessages = new HashSet();
+
+ int numberOfReceivedMessages = 0;
+
+ while(true)
+ {
+ TextMessage message = (TextMessage)cons.receive(2000);
+ if (message == null)
+ {
+ break;
+ }
+ log.info("Message = (" + message.getText() + ")");
+ receivedMessages.add(message.getText());
+ numberOfReceivedMessages++;
+ }
+
+
+ assertFalse("\"plop0\" message was duplicated",
+ receivedMessages.contains("plop0"));
+
+ assertFalse("\"plop1\" message was duplicated",
+ receivedMessages.contains("plop0"));
+
+ assertTrue("\"Cupid stunt0\" message wasn't received",
+ receivedMessages.contains("Cupid stunt0"));
+
+ assertTrue("\"Cupid stunt1\" message wasn't received",
+ receivedMessages.contains("Cupid stunt1"));
+
+ assertEquals(2, numberOfReceivedMessages);
+
+ assertEquals(0, ((JBossConnection)xaConn1).getServerID());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+
+
+
+ assertEquals(0, ((JBossConnection)xaConn1).getServerID());
+
+ }
+ finally
+ {
+ if (xaConn1 != null)
+ {
+ xaConn1.close();
+ }
+ if (xaConn0 != null)
+ {
+ xaConn0.close();
+ }
+ }
+ }
More information about the jboss-cvs-commits
mailing list