[jboss-cvs] JBoss Messaging SVN: r5566 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/journal and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 2 07:11:25 EST 2009
Author: timfox
Date: 2009-01-02 07:11:25 -0500 (Fri, 02 Jan 2009)
New Revision: 5566
Modified:
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/BindableFactory.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/BindableFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
Log:
Paging, routing, tx refactoring part II
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -108,7 +108,7 @@
// Bindings related operations
- void addBinding(Binding binding) throws Exception;
+ void addBinding(Binding binding, boolean duplicateDetection) throws Exception;
void deleteBinding(Binding binding) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -605,7 +605,7 @@
// Bindings operations
- public void addBinding(final Binding binding) throws Exception
+ public void addBinding(final Binding binding, final boolean duplicateDetection) throws Exception
{
// We generate the queue id here
@@ -644,7 +644,8 @@
binding.getAddress(),
filterString,
binding.isExclusive(),
- linkAddress);
+ linkAddress,
+ duplicateDetection);
bindingsJournal.appendAddRecord(bindingID, BINDING_RECORD, bindingEncoding);
}
@@ -741,7 +742,8 @@
filter,
true,
false,
- bindingEncoding.linkAddress);
+ bindingEncoding.linkAddress,
+ bindingEncoding.duplicateDetection);
}
Binding binding = new BindingImpl(bindingEncoding.type,
@@ -1113,6 +1115,8 @@
boolean exclusive;
SimpleString linkAddress;
+
+ boolean duplicateDetection;
public BindingEncoding()
{
@@ -1123,7 +1127,8 @@
final SimpleString address,
final SimpleString filter,
final boolean exclusive,
- final SimpleString linkAddress)
+ final SimpleString linkAddress,
+ final boolean duplicateDetection)
{
super();
this.type = type;
@@ -1132,6 +1137,7 @@
this.filter = filter;
this.exclusive = exclusive;
this.linkAddress = linkAddress;
+ this.duplicateDetection = duplicateDetection;
}
public void decode(final MessagingBuffer buffer)
@@ -1159,6 +1165,7 @@
filter = buffer.getNullableSimpleString();
exclusive = buffer.getBoolean();
linkAddress = buffer.getNullableSimpleString();
+ duplicateDetection = buffer.getBoolean();
}
public void encode(final MessagingBuffer buffer)
@@ -1176,6 +1183,7 @@
buffer.putNullableSimpleString(filter);
buffer.putBoolean(exclusive);
buffer.putNullableSimpleString(linkAddress);
+ buffer.putBoolean(duplicateDetection);
}
public int getEncodeSize()
@@ -1185,7 +1193,8 @@
SimpleString.sizeofString(address) +
SimpleString.sizeofNullableString(filter) +
SIZE_BOOLEAN +
- SimpleString.sizeofNullableString(linkAddress);
+ SimpleString.sizeofNullableString(linkAddress) +
+ SIZE_BOOLEAN;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -57,7 +57,7 @@
private volatile boolean started;
- public void addBinding(final Binding binding) throws Exception
+ public void addBinding(final Binding binding, final boolean duplicateDetection) throws Exception
{
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -66,7 +66,8 @@
boolean durable,
boolean temporary,
boolean exclusive,
- SimpleString linkAddress) throws Exception;
+ SimpleString linkAddress,
+ boolean duplicateDetection) throws Exception;
Binding addQueueBinding(SimpleString name,
SimpleString address,
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -278,7 +278,7 @@
if (durable)
{
- storageManager.addBinding(binding);
+ storageManager.addBinding(binding, false);
}
return binding;
@@ -290,15 +290,16 @@
final boolean durable,
final boolean temporary,
final boolean exclusive,
- final SimpleString linkAddress) throws Exception
+ final SimpleString linkAddress,
+ final boolean duplicateDetection) throws Exception
{
- Binding binding = createLinkBinding(name, address, filter, durable, temporary, exclusive, linkAddress);
+ Binding binding = createLinkBinding(name, address, filter, durable, temporary, exclusive, linkAddress, duplicateDetection);
addBindingInMemory(binding);
if (durable)
{
- storageManager.addBinding(binding);
+ storageManager.addBinding(binding, duplicateDetection);
}
return binding;
@@ -481,9 +482,10 @@
final boolean durable,
final boolean temporary,
final boolean exclusive,
- final SimpleString linkAddress) throws Exception
+ final SimpleString linkAddress,
+ final boolean duplicateDetection) throws Exception
{
- Bindable bindable = bindableFactory.createLink(-1, name, filter, durable, temporary, linkAddress);
+ Bindable bindable = bindableFactory.createLink(-1, name, filter, durable, temporary, linkAddress, duplicateDetection);
Binding binding = new BindingImpl(BindingType.LINK, address, bindable, exclusive);
Modified: trunk/src/main/org/jboss/messaging/core/server/BindableFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/BindableFactory.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/BindableFactory.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -45,7 +45,8 @@
Filter filter,
boolean durable,
boolean temporary,
- SimpleString linkAddress);
+ SimpleString linkAddress,
+ boolean duplicateDetection);
// TODO - these injectors should not be here!!
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ForwarderImpl.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -205,7 +205,10 @@
log.warn("Timed out waiting for batch to be sent");
}
- session.close();
+ if (session != null)
+ {
+ session.close();
+ }
started = false;
}
@@ -225,6 +228,7 @@
public HandleStatus handle(final MessageReference reference) throws Exception
{
+ log.info("Got message, busy: " + busy);
if (busy)
{
return HandleStatus.BUSY;
@@ -236,7 +240,9 @@
{
return HandleStatus.BUSY;
}
+
reference.getQueue().referenceHandled();
+
refs.add(reference);
if (maxBatchTime != -1)
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/MessageFlowImpl.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -342,7 +342,7 @@
//Create the link
- postOffice.addLinkBinding(linkName, address, filter, true, false, exclusive, queueName);
+ postOffice.addLinkBinding(linkName, address, filter, true, false, exclusive, queueName, useDuplicateDetection);
}
Queue queue = (Queue)binding.getBindable();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/BindableFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/BindableFactoryImpl.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/BindableFactoryImpl.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -99,10 +99,11 @@
final Filter filter,
final boolean durable,
final boolean temporary,
- final SimpleString linkAddress)
+ final SimpleString linkAddress,
+ final boolean duplicateDetection)
{
- Link link = new LinkImpl(name, durable, filter, linkAddress, postOffice, storageManager);
+ Link link = new LinkImpl(name, durable, filter, linkAddress, duplicateDetection, postOffice, storageManager);
link.setPersistenceID(persistenceID);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/LinkImpl.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.server.impl;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_DUPLICATE_DETECTION_ID;
import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIGIN_QUEUE;
import org.jboss.messaging.core.filter.Filter;
@@ -52,6 +53,8 @@
private final PostOffice postOffice;
private final SimpleString address;
+
+ private final boolean duplicateDetection;
private final StorageManager storageManager;
@@ -67,6 +70,7 @@
final boolean durable,
final Filter filter,
final SimpleString address,
+ final boolean useDuplicateDetection,
final PostOffice postOffice,
final StorageManager storageManager)
{
@@ -77,6 +81,8 @@
this.filter = filter;
this.address = address;
+
+ this.duplicateDetection = useDuplicateDetection;
this.postOffice = postOffice;
@@ -95,6 +101,13 @@
copy.putStringProperty(HDR_ORIGIN_QUEUE, originalDestination);
+ if (duplicateDetection)
+ {
+ SimpleString duplID = new SimpleString(String.valueOf(copy.getMessageID())).concat(name);
+
+ copy.putStringProperty(HDR_DUPLICATE_DETECTION_ID, duplID);
+ }
+
postOffice.route(copy, tx);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -30,6 +30,7 @@
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.DuplicateIDCache;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.Distributor;
@@ -147,8 +148,37 @@
// Bindable implementation -------------------------------------------------------------------------------------
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public void route(final ServerMessage message, Transaction tx) throws Exception
{
+ SimpleString duplicateID = (SimpleString)message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
+
+ DuplicateIDCache cache = null;
+
+ if (duplicateID != null)
+ {
+ cache = postOffice.getDuplicateIDCache(message.getDestination());
+
+ if (cache.contains(duplicateID))
+ {
+ log.warn("Duplicate message detected - message will not be routed");
+
+ return;
+ }
+ }
+
+ boolean durableRef = message.isDurable() && durable;
+
+ boolean startedTx = false;
+
+ if (cache != null && tx == null && durableRef)
+ {
+ //We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
+
+ tx = new TransactionImpl(storageManager, postOffice);
+
+ startedTx = true;
+ }
+
// TODO we can avoid these lookups in the Queue since all messsages in the Queue will be for the same store
PagingStore store = pagingManager.getPageStore(message.getDestination());
@@ -159,9 +189,17 @@
if (!message.isReload())
{
- if (message.getDurableRefCount() == 1)
+ if (message.getRefCount() == 1)
{
- storageManager.storeMessage(message);
+ if (durableRef)
+ {
+ storageManager.storeMessage(message);
+ }
+
+ if (cache != null)
+ {
+ cache.addToCache(duplicateID);
+ }
}
Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
@@ -170,7 +208,7 @@
{
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
- if (ref.getMessage().isDurable() && durable)
+ if (durableRef)
{
storageManager.updateScheduledDeliveryTime(ref);
}
@@ -204,17 +242,23 @@
{
MessageReference ref = message.createReference(this);
- boolean first = message.getDurableRefCount() == 1;
+ boolean first = message.getRefCount() == 1;
- if (first)
+ if (message.getRefCount() == 1)
{
- storageManager.storeMessageTransactional(tx.getID(), message);
+ if (durableRef)
+ {
+ storageManager.storeMessageTransactional(tx.getID(), message);
+ }
+
+ if (cache != null)
+ {
+ cache.addToCache(duplicateID, tx);
+ }
}
Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
- boolean durableRef = ref.getMessage().isDurable() && durable;
-
+
if (scheduledDeliveryTime != null)
{
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
@@ -240,6 +284,11 @@
}
}
}
+
+ if (startedTx)
+ {
+ tx.commit();
+ }
}
private class AddMessageOperation implements TransactionOperation
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -313,8 +313,8 @@
{
if (tx != null && tx.getXid() == null)
{
- //We only rollback local txs on close, not XA tx branches
-
+ // We only rollback local txs on close, not XA tx branches
+
rollback();
}
@@ -348,8 +348,939 @@
queue.deliverAsync(executor);
}
- public void doHandleCreateConsumer(final SessionCreateConsumerMessage packet)
+ public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
{
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleCreateConsumer(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleCreateConsumer(packet);
+ }
+ });
+ }
+ }
+
+ public void handleCreateQueue(final SessionCreateQueueMessage packet)
+ {
+ final SendLock lock;
+
+ if (channel.getReplicatingChannel() != null)
+ {
+ lock = postOffice.getAddressLock(packet.getAddress());
+
+ lock.lock();
+ }
+ else
+ {
+ lock = null;
+ }
+
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleCreateQueue(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleCreateQueue(packet);
+
+ lock.unlock();
+ }
+ });
+ }
+ }
+
+ public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
+ {
+ final SendLock lock;
+ if (channel.getReplicatingChannel() != null)
+ {
+ Binding binding = postOffice.getBinding(packet.getQueueName());
+ lock = postOffice.getAddressLock(binding.getAddress());
+
+ lock.lock();
+ }
+ else
+ {
+ lock = null;
+ }
+
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleDeleteQueue(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleDeleteQueue(packet);
+
+ lock.unlock();
+ }
+ });
+ }
+ }
+
+ public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleExecuteQueueQuery(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleExecuteQueueQuery(packet);
+ }
+ });
+ }
+ }
+
+ public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleExecuteBindingQuery(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleExecuteBindingQuery(packet);
+ }
+ });
+ }
+ }
+
+ public void handleAcknowledge(final SessionAcknowledgeMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleAcknowledge(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleAcknowledge(packet);
+ }
+ });
+ }
+ }
+
+ public void handleExpired(final SessionExpiredMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleExpired(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleExpired(packet);
+ }
+ });
+ }
+ }
+
+ public void handleCommit(final Packet packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleCommit(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleCommit(packet);
+ }
+ });
+ }
+ }
+
+ public void handleRollback(final Packet packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleRollback(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleRollback(packet);
+ }
+ });
+ }
+ }
+
+ public void handleXACommit(final SessionXACommitMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleXACommit(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleXACommit(packet);
+ }
+ });
+ }
+ }
+
+ public void handleXAEnd(final SessionXAEndMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleXAEnd(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleXAEnd(packet);
+ }
+ });
+ }
+ }
+
+ public void handleXAForget(final SessionXAForgetMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleXAForget(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleXAForget(packet);
+ }
+ });
+ }
+ }
+
+ public void handleXAJoin(final SessionXAJoinMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleXAJoin(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleXAJoin(packet);
+ }
+ });
+ }
+ }
+
+ public void handleXAResume(final SessionXAResumeMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleXAResume(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleXAResume(packet);
+ }
+ });
+ }
+ }
+
+ public void handleXARollback(final SessionXARollbackMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleXARollback(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleXARollback(packet);
+ }
+ });
+ }
+ }
+
+ public void handleXAStart(final SessionXAStartMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleXAStart(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleXAStart(packet);
+ }
+ });
+ }
+ }
+
+ public void handleXASuspend(final Packet packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleXASuspend(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleXASuspend(packet);
+ }
+ });
+ }
+ }
+
+ public void handleXAPrepare(final SessionXAPrepareMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleXAPrepare(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleXAPrepare(packet);
+ }
+ });
+ }
+ }
+
+ public void handleGetInDoubtXids(final Packet packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleGetInDoubtXids(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleGetInDoubtXids(packet);
+ }
+ });
+ }
+ }
+
+ public void handleGetXATimeout(final Packet packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleGetXATimeout(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleGetXATimeout(packet);
+ }
+ });
+ }
+ }
+
+ public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleSetXATimeout(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleSetXATimeout(packet);
+ }
+ });
+ }
+ }
+
+ public void handleAddDestination(final SessionAddDestinationMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleAddDestination(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleAddDestination(packet);
+ }
+ });
+ }
+ }
+
+ public void handleRemoveDestination(final SessionRemoveDestinationMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleRemoveDestination(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleRemoveDestination(packet);
+ }
+ });
+ }
+ }
+
+ private void lockConsumers()
+ {
+ for (ServerConsumer consumer : consumers.values())
+ {
+ consumer.lock();
+ }
+ }
+
+ private void unlockConsumers()
+ {
+ for (ServerConsumer consumer : consumers.values())
+ {
+ consumer.unlock();
+ }
+ }
+
+ public void handleStart(final Packet packet)
+ {
+ boolean lock = channel.getReplicatingChannel() != null;
+
+ if (lock)
+ {
+ lockConsumers();
+ }
+
+ // We need to prevent any delivery and replication of delivery occurring while the start/stop
+ // is being processed.
+ // Otherwise we can end up with start/stop being processed in different order on backup to live.
+ // Which can result in, say, a delivery arriving at backup, but it's still not started!
+ DelayedResult result = null;
+ try
+ {
+ result = channel.replicatePacket(packet);
+
+ // note we process start before response is back from the backup
+
+ setStarted(true);
+ }
+ finally
+ {
+ if (lock)
+ {
+ unlockConsumers();
+ }
+ }
+
+ if (result == null)
+ {
+ channel.confirm(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ channel.confirm(packet);
+ }
+ });
+ }
+ }
+
+ // TODO try removing the lock consumers and see what happens!!
+ public void handleStop(final Packet packet)
+ {
+ boolean lock = channel.getReplicatingChannel() != null;
+
+ if (lock)
+ {
+ lockConsumers();
+ }
+
+ try
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ // note we process stop before response is back from the backup
+
+ final Packet response = new NullResponseMessage();
+
+ setStarted(false);
+
+ if (result == null)
+ {
+ channel.confirm(packet);
+ // Not clustered - just send now
+ channel.send(response);
+ }
+ else
+ {
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ channel.confirm(packet);
+
+ channel.send(response);
+ }
+ });
+ }
+ }
+ finally
+ {
+ if (lock)
+ {
+ unlockConsumers();
+ }
+ }
+ }
+
+ public void handleFailedOver(final Packet packet)
+ {
+ Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+
+ for (ServerConsumer consumer : consumersClone)
+ {
+ consumer.failedOver();
+ }
+ }
+
+ public void handleClose(final Packet packet)
+ {
+ // We need to stop the consumers first before replicating, to ensure no deliveries occur after this,
+ // but we need to process the actual close() when the replication response returns, otherwise things
+ // can happen like acks can come in after close
+
+ for (ServerConsumer consumer : consumers.values())
+ {
+ consumer.setStarted(false);
+ }
+
+ DelayedResult result = channel.replicatePacket(packet);
+
+ if (result == null)
+ {
+ doHandleClose(packet);
+ }
+ else
+ {
+ // Don't process until result has come back from backup
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doHandleClose(packet);
+ }
+ });
+ }
+ }
+
+ public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
+ {
+ // We need to stop the consumer first before replicating, to ensure no deliveries occur after this,
+ // but we need to process the actual close() when the replication response returns, otherwise things
+ // can happen like acks can come in after close
+
+ ServerConsumer consumer = consumers.get(packet.getConsumerID());
+
+ consumer.handleClose(packet);
+ }
+
+ public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ try
+ {
+ // Note we don't wait for response before handling this
+
+ consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to receive credits", e);
+ }
+
+ if (result == null)
+ {
+ channel.confirm(packet);
+ }
+ else
+ {
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ channel.confirm(packet);
+ }
+ });
+ }
+ }
+
+ public void handleSendLargeMessage(final SessionSendMessage packet)
+ {
+ if (packet.getMessageID() <= 0L)
+ {
+ // must generate message id here, so we know they are in sync on live and backup
+ long id = storageManager.generateUniqueID();
+
+ packet.setMessageID(id);
+ }
+
+ DelayedResult result = channel.replicatePacket(packet);
+
+ // With a send we must make sure it is replicated to backup before being processed on live
+ // or can end up with delivery being processed on backup before original send
+
+ if (result == null)
+ {
+ doSendLargeMessage(packet);
+ }
+ else
+ {
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doSendLargeMessage(packet);
+ }
+ });
+ }
+
+ }
+
+ public void handleSend(final SessionSendMessage packet)
+ {
+ // With a send we must make sure it is replicated to backup before being processed on live
+ // or can end up with delivery being processed on backup before original send
+
+ ServerMessage msg = packet.getServerMessage();
+
+ final SendLock lock;
+
+ if (channel.getReplicatingChannel() != null)
+ {
+ lock = postOffice.getAddressLock(msg.getDestination());
+
+ lock.beforeSend();
+ }
+ else
+ {
+ lock = null;
+ }
+
+ if (packet.getMessageID() <= 0L)
+ {
+ // must generate message id here, so we know they are in sync on live and backup
+ long id = storageManager.generateUniqueID();
+
+ packet.setMessageID(id);
+ }
+
+ if (channel.getReplicatingChannel() != null)
+ {
+ msg.putBooleanProperty(new SimpleString("clustered"), true);
+ }
+
+ DelayedResult result = channel.replicatePacket(packet);
+
+ // With a send we must make sure it is replicated to backup before being processed on live
+ // or can end up with delivery being processed on backup before original send
+
+ if (result == null)
+ {
+ doSend(packet);
+ }
+ else
+ {
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doSend(packet);
+
+ lock.afterSend();
+ }
+ });
+ }
+ }
+
+ public void handleSendContinuations(final SessionSendContinuationMessage packet)
+ {
+ DelayedResult result = channel.replicatePacket(packet);
+
+ // With a send we must make sure it is replicated to backup before being processed on live
+ // or can end up with delivery being processed on backup before original send
+
+ if (result == null)
+ {
+ doSendContinuations(packet);
+ }
+ else
+ {
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ doSendContinuations(packet);
+ }
+ });
+ }
+ }
+
+ public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
+ {
+ ServerConsumer consumer = consumers.get(packet.getConsumerID());
+
+ if (consumer == null)
+ {
+ throw new IllegalStateException("Cannot handle replicated delivery, consumer is closed");
+ }
+
+ try
+ {
+ consumer.deliverReplicated(packet.getMessageID());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle replicated delivery", e);
+ }
+ }
+
+ public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
+ {
+ boolean wasStarted = this.started;
+
+ if (wasStarted)
+ {
+ this.setStarted(false);
+ }
+
+ remotingConnection.removeFailureListener(this);
+
+ channel.transferConnection(newConnection);
+
+ RemotingConnection oldReplicatingConnection = newConnection.getReplicatingConnection();
+
+ if (oldReplicatingConnection != null)
+ {
+ oldReplicatingConnection.destroy();
+ }
+
+ newConnection.setReplicatingConnection(remotingConnection.getReplicatingConnection());
+
+ remotingConnection.setReplicatingConnection(null);
+
+ newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
+
+ // Destroy the old connection
+ remotingConnection.destroy();
+
+ remotingConnection = newConnection;
+
+ remotingConnection.addFailureListener(this);
+
+ int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
+
+ channel.replayCommands(lastReceivedCommandID);
+
+ if (wasStarted)
+ {
+ this.setStarted(true);
+ }
+
+ return serverLastReceivedCommandID;
+ }
+
+ public Channel getChannel()
+ {
+ return channel;
+ }
+
+ // FailureListener implementation
+ // --------------------------------------------------------------------
+
+ public boolean connectionFailed(final MessagingException me)
+ {
+ try
+ {
+ log.info("Connection timed out, so clearing up resources for session " + name);
+
+ for (Runnable runner : failureRunners)
+ {
+ try
+ {
+ runner.run();
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to execute failure runner", t);
+ }
+ }
+
+ // We call handleClose() since we need to replicate the close too, if there is a backup
+ handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
+
+ log.info("Cleared up resources for session " + name);
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to close connection " + this);
+ }
+
+ return true;
+ }
+
+ // Public
+ // ----------------------------------------------------------------------------
+
+ public Transaction getTransaction()
+ {
+ return tx;
+ }
+
+ // Private
+ // ----------------------------------------------------------------------------
+
+ private void doHandleCreateConsumer(final SessionCreateConsumerMessage packet)
+ {
SimpleString queueName = packet.getQueueName();
SimpleString filterString = packet.getFilterString();
@@ -432,29 +1363,8 @@
channel.send(response);
}
- public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
+ private void doHandleCreateQueue(final SessionCreateQueueMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleCreateConsumer(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleCreateConsumer(packet);
- }
- });
- }
- }
-
- public void doHandleCreateQueue(final SessionCreateQueueMessage packet)
- {
SimpleString address = packet.getAddress();
SimpleString queueName = packet.getQueueName();
@@ -538,80 +1448,8 @@
channel.send(response);
}
- public void handleCreateQueue(final SessionCreateQueueMessage packet)
+ private void doHandleDeleteQueue(final SessionDeleteQueueMessage packet)
{
- final SendLock lock;
-
- if (channel.getReplicatingChannel() != null)
- {
- lock = postOffice.getAddressLock(packet.getAddress());
-
- lock.lock();
- }
- else
- {
- lock = null;
- }
-
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleCreateQueue(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleCreateQueue(packet);
-
- lock.unlock();
- }
- });
- }
- }
-
- public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
- {
- final SendLock lock;
- if (channel.getReplicatingChannel() != null)
- {
- Binding binding = postOffice.getBinding(packet.getQueueName());
- lock = postOffice.getAddressLock(binding.getAddress());
-
- lock.lock();
- }
- else
- {
- lock = null;
- }
-
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleDeleteQueue(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleDeleteQueue(packet);
-
- lock.unlock();
- }
- });
- }
- }
-
- public void doHandleDeleteQueue(final SessionDeleteQueueMessage packet)
- {
SimpleString queueName = packet.getQueueName();
Packet response = null;
@@ -658,29 +1496,8 @@
channel.send(response);
}
- public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
+ private void doHandleExecuteQueueQuery(final SessionQueueQueryMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleExecuteQueueQuery(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleExecuteQueueQuery(packet);
- }
- });
- }
- }
-
- public void doHandleExecuteQueueQuery(final SessionQueueQueryMessage packet)
- {
SimpleString queueName = packet.getQueueName();
Packet response = null;
@@ -732,29 +1549,8 @@
channel.send(response);
}
- public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
+ private void doHandleExecuteBindingQuery(final SessionBindingQueryMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleExecuteBindingQuery(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleExecuteBindingQuery(packet);
- }
- });
- }
- }
-
- public void doHandleExecuteBindingQuery(final SessionBindingQueryMessage packet)
- {
SimpleString address = packet.getAddress();
Packet response = null;
@@ -804,29 +1600,8 @@
channel.send(response);
}
- public void handleAcknowledge(final SessionAcknowledgeMessage packet)
+ private void doHandleAcknowledge(final SessionAcknowledgeMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleAcknowledge(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleAcknowledge(packet);
- }
- });
- }
- }
-
- public void doHandleAcknowledge(final SessionAcknowledgeMessage packet)
- {
Packet response = null;
try
@@ -865,29 +1640,8 @@
}
}
- public void handleExpired(final SessionExpiredMessage packet)
+ private void doHandleExpired(final SessionExpiredMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleExpired(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleExpired(packet);
- }
- });
- }
- }
-
- public void doHandleExpired(final SessionExpiredMessage packet)
- {
try
{
MessageReference ref = consumers.get(packet.getConsumerID()).getExpired(packet.getMessageID());
@@ -906,29 +1660,8 @@
channel.confirm(packet);
}
- public void handleCommit(final Packet packet)
+ private void doHandleCommit(final Packet packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleCommit(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleCommit(packet);
- }
- });
- }
- }
-
- public void doHandleCommit(final Packet packet)
- {
Packet response = null;
try
@@ -960,29 +1693,8 @@
channel.send(response);
}
- public void handleRollback(final Packet packet)
+ private void doHandleRollback(final Packet packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleRollback(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleRollback(packet);
- }
- });
- }
- }
-
- public void doHandleRollback(final Packet packet)
- {
Packet response = null;
try
@@ -1010,29 +1722,8 @@
channel.send(response);
}
- public void handleXACommit(final SessionXACommitMessage packet)
+ private void doHandleXACommit(final SessionXACommitMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleXACommit(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleXACommit(packet);
- }
- });
- }
- }
-
- public void doHandleXACommit(final SessionXACommitMessage packet)
- {
Packet response = null;
Xid xid = packet.getXid();
@@ -1094,29 +1785,8 @@
channel.send(response);
}
- public void handleXAEnd(final SessionXAEndMessage packet)
+ private void doHandleXAEnd(final SessionXAEndMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleXAEnd(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleXAEnd(packet);
- }
- });
- }
- }
-
- public void doHandleXAEnd(final SessionXAEndMessage packet)
- {
Packet response = null;
Xid xid = packet.getXid();
@@ -1189,29 +1859,8 @@
channel.send(response);
}
- public void handleXAForget(final SessionXAForgetMessage packet)
+ private void doHandleXAForget(final SessionXAForgetMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleXAForget(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleXAForget(packet);
- }
- });
- }
- }
-
- public void doHandleXAForget(final SessionXAForgetMessage packet)
- {
// Do nothing since we don't support heuristic commits / rollback from the
// resource manager
@@ -1222,29 +1871,8 @@
channel.send(response);
}
- public void handleXAJoin(final SessionXAJoinMessage packet)
+ private void doHandleXAJoin(final SessionXAJoinMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleXAJoin(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleXAJoin(packet);
- }
- });
- }
- }
-
- public void doHandleXAJoin(final SessionXAJoinMessage packet)
- {
Packet response = null;
Xid xid = packet.getXid();
@@ -1294,29 +1922,8 @@
channel.send(response);
}
- public void handleXAResume(final SessionXAResumeMessage packet)
+ private void doHandleXAResume(final SessionXAResumeMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleXAResume(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleXAResume(packet);
- }
- });
- }
- }
-
- public void doHandleXAResume(final SessionXAResumeMessage packet)
- {
Packet response = null;
Xid xid = packet.getXid();
@@ -1377,29 +1984,8 @@
channel.send(response);
}
- public void handleXARollback(final SessionXARollbackMessage packet)
+ private void doHandleXARollback(final SessionXARollbackMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleXARollback(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleXARollback(packet);
- }
- });
- }
- }
-
- public void doHandleXARollback(final SessionXARollbackMessage packet)
- {
Packet response = null;
Xid xid = packet.getXid();
@@ -1461,29 +2047,8 @@
channel.send(response);
}
- public void handleXAStart(final SessionXAStartMessage packet)
+ private void doHandleXAStart(final SessionXAStartMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleXAStart(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleXAStart(packet);
- }
- });
- }
- }
-
- public void doHandleXAStart(final SessionXAStartMessage packet)
- {
Packet response = null;
Xid xid = packet.getXid();
@@ -1533,29 +2098,8 @@
channel.send(response);
}
- public void handleXASuspend(final Packet packet)
+ private void doHandleXASuspend(final Packet packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleXASuspend(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleXASuspend(packet);
- }
- });
- }
- }
-
- public void doHandleXASuspend(final Packet packet)
- {
Packet response = null;
try
@@ -1603,29 +2147,8 @@
channel.send(response);
}
- public void handleXAPrepare(final SessionXAPrepareMessage packet)
+ private void doHandleXAPrepare(final SessionXAPrepareMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleXAPrepare(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleXAPrepare(packet);
- }
- });
- }
- }
-
- public void doHandleXAPrepare(final SessionXAPrepareMessage packet)
- {
Packet response = null;
Xid xid = packet.getXid();
@@ -1684,29 +2207,8 @@
channel.send(response);
}
- public void handleGetInDoubtXids(final Packet packet)
+ private void doHandleGetInDoubtXids(final Packet packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleGetInDoubtXids(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleGetInDoubtXids(packet);
- }
- });
- }
- }
-
- public void doHandleGetInDoubtXids(final Packet packet)
- {
Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
channel.confirm(packet);
@@ -1714,29 +2216,8 @@
channel.send(response);
}
- public void handleGetXATimeout(final Packet packet)
+ private void doHandleGetXATimeout(final Packet packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleGetXATimeout(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleGetXATimeout(packet);
- }
- });
- }
- }
-
- public void doHandleGetXATimeout(final Packet packet)
- {
Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
channel.confirm(packet);
@@ -1744,29 +2225,8 @@
channel.send(response);
}
- public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
+ private void doHandleSetXATimeout(final SessionXASetTimeoutMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleSetXATimeout(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleSetXATimeout(packet);
- }
- });
- }
- }
-
- public void doHandleSetXATimeout(final SessionXASetTimeoutMessage packet)
- {
Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
channel.confirm(packet);
@@ -1774,29 +2234,8 @@
channel.send(response);
}
- public void handleAddDestination(final SessionAddDestinationMessage packet)
+ private void doHandleAddDestination(final SessionAddDestinationMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleAddDestination(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleAddDestination(packet);
- }
- });
- }
- }
-
- public void doHandleAddDestination(final SessionAddDestinationMessage packet)
- {
Packet response = null;
final SimpleString address = packet.getAddress();
@@ -1859,29 +2298,8 @@
channel.send(response);
}
- public void handleRemoveDestination(final SessionRemoveDestinationMessage packet)
+ private void doHandleRemoveDestination(final SessionRemoveDestinationMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleRemoveDestination(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleRemoveDestination(packet);
- }
- });
- }
- }
-
- public void doHandleRemoveDestination(final SessionRemoveDestinationMessage packet)
- {
Packet response = null;
final SimpleString address = packet.getAddress();
@@ -1919,159 +2337,8 @@
channel.send(response);
}
- private void lockConsumers()
+ private void doHandleClose(final Packet packet)
{
- for (ServerConsumer consumer : consumers.values())
- {
- consumer.lock();
- }
- }
-
- private void unlockConsumers()
- {
- for (ServerConsumer consumer : consumers.values())
- {
- consumer.unlock();
- }
- }
-
- public void handleStart(final Packet packet)
- {
- boolean lock = channel.getReplicatingChannel() != null;
-
- if (lock)
- {
- lockConsumers();
- }
-
- // We need to prevent any delivery and replication of delivery occurring while the start/stop
- // is being processed.
- // Otherwise we can end up with start/stop being processed in different order on backup to live.
- // Which can result in, say, a delivery arriving at backup, but it's still not started!
- DelayedResult result = null;
- try
- {
- result = channel.replicatePacket(packet);
-
- // note we process start before response is back from the backup
-
- setStarted(true);
- }
- finally
- {
- if (lock)
- {
- unlockConsumers();
- }
- }
-
- if (result == null)
- {
- channel.confirm(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- channel.confirm(packet);
- }
- });
- }
- }
-
- // TODO try removing the lock consumers and see what happens!!
- public void handleStop(final Packet packet)
- {
- boolean lock = channel.getReplicatingChannel() != null;
-
- if (lock)
- {
- lockConsumers();
- }
-
- try
- {
- DelayedResult result = channel.replicatePacket(packet);
-
- // note we process stop before response is back from the backup
-
- final Packet response = new NullResponseMessage();
-
- setStarted(false);
-
- if (result == null)
- {
- channel.confirm(packet);
- // Not clustered - just send now
- channel.send(response);
- }
- else
- {
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- channel.confirm(packet);
-
- channel.send(response);
- }
- });
- }
- }
- finally
- {
- if (lock)
- {
- unlockConsumers();
- }
- }
- }
-
- public void handleFailedOver(final Packet packet)
- {
- Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
-
- for (ServerConsumer consumer : consumersClone)
- {
- consumer.failedOver();
- }
- }
-
- public void handleClose(final Packet packet)
- {
- // We need to stop the consumers first before replicating, to ensure no deliveries occur after this,
- // but we need to process the actual close() when the replication response returns, otherwise things
- // can happen like acks can come in after close
-
- for (ServerConsumer consumer : consumers.values())
- {
- consumer.setStarted(false);
- }
-
- DelayedResult result = channel.replicatePacket(packet);
-
- if (result == null)
- {
- doHandleClose(packet);
- }
- else
- {
- // Don't process until result has come back from backup
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doHandleClose(packet);
- }
- });
- }
- }
-
- public void doHandleClose(final Packet packet)
- {
Packet response = null;
try
@@ -2113,274 +2380,6 @@
started = s;
}
- public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
- {
- // We need to stop the consumer first before replicating, to ensure no deliveries occur after this,
- // but we need to process the actual close() when the replication response returns, otherwise things
- // can happen like acks can come in after close
-
- ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
- consumer.handleClose(packet);
- }
-
- public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
- {
- DelayedResult result = channel.replicatePacket(packet);
-
- try
- {
- // Note we don't wait for response before handling this
-
- consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
- }
- catch (Exception e)
- {
- log.error("Failed to receive credits", e);
- }
-
- if (result == null)
- {
- channel.confirm(packet);
- }
- else
- {
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- channel.confirm(packet);
- }
- });
- }
- }
-
- public void handleSendLargeMessage(final SessionSendMessage packet)
- {
-
- if (packet.getMessageID() <= 0L)
- {
- // must generate message id here, so we know they are in sync on live and backup
- long id = storageManager.generateUniqueID();
-
- packet.setMessageID(id);
- }
-
- DelayedResult result = channel.replicatePacket(packet);
-
- // With a send we must make sure it is replicated to backup before being processed on live
- // or can end up with delivery being processed on backup before original send
-
- if (result == null)
- {
- doSendLargeMessage(packet);
- }
- else
- {
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doSendLargeMessage(packet);
- }
- });
- }
-
- }
-
- public void handleSend(final SessionSendMessage packet)
- {
- // With a send we must make sure it is replicated to backup before being processed on live
- // or can end up with delivery being processed on backup before original send
-
- ServerMessage msg = packet.getServerMessage();
-
- final SendLock lock;
-
- if (channel.getReplicatingChannel() != null)
- {
- lock = postOffice.getAddressLock(msg.getDestination());
-
- lock.beforeSend();
- }
- else
- {
- lock = null;
- }
-
- if (packet.getMessageID() <= 0L)
- {
- // must generate message id here, so we know they are in sync on live and backup
- long id = storageManager.generateUniqueID();
-
- packet.setMessageID(id);
- }
-
- if (channel.getReplicatingChannel() != null)
- {
- msg.putBooleanProperty(new SimpleString("clustered"), true);
- }
-
- DelayedResult result = channel.replicatePacket(packet);
-
- // With a send we must make sure it is replicated to backup before being processed on live
- // or can end up with delivery being processed on backup before original send
-
- if (result == null)
- {
- doSend(packet);
- }
- else
- {
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doSend(packet);
-
- lock.afterSend();
- }
- });
- }
- }
-
- public void handleSendContinuations(final SessionSendContinuationMessage packet)
- {
- DelayedResult result = channel.replicatePacket(packet);
-
- // With a send we must make sure it is replicated to backup before being processed on live
- // or can end up with delivery being processed on backup before original send
-
- if (result == null)
- {
- doSendContinuations(packet);
- }
- else
- {
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- doSendContinuations(packet);
- }
- });
- }
- }
-
- public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
- {
- ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
- if (consumer == null)
- {
- throw new IllegalStateException("Cannot handle replicated delivery, consumer is closed");
- }
-
- try
- {
- consumer.deliverReplicated(packet.getMessageID());
- }
- catch (Exception e)
- {
- log.error("Failed to handle replicated delivery", e);
- }
- }
-
- public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
- {
- boolean wasStarted = this.started;
-
- if (wasStarted)
- {
- this.setStarted(false);
- }
-
- remotingConnection.removeFailureListener(this);
-
- channel.transferConnection(newConnection);
-
- RemotingConnection oldReplicatingConnection = newConnection.getReplicatingConnection();
-
- if (oldReplicatingConnection != null)
- {
- oldReplicatingConnection.destroy();
- }
-
- newConnection.setReplicatingConnection(remotingConnection.getReplicatingConnection());
-
- remotingConnection.setReplicatingConnection(null);
-
- newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
- // Destroy the old connection
- remotingConnection.destroy();
-
- remotingConnection = newConnection;
-
- remotingConnection.addFailureListener(this);
-
- int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
-
- channel.replayCommands(lastReceivedCommandID);
-
- if (wasStarted)
- {
- this.setStarted(true);
- }
-
- return serverLastReceivedCommandID;
- }
-
- public Channel getChannel()
- {
- return channel;
- }
-
- // FailureListener implementation
- // --------------------------------------------------------------------
-
- public boolean connectionFailed(final MessagingException me)
- {
- try
- {
- log.info("Connection timed out, so clearing up resources for session " + name);
-
- for (Runnable runner : failureRunners)
- {
- try
- {
- runner.run();
- }
- catch (Throwable t)
- {
- log.error("Failed to execute failure runner", t);
- }
- }
-
- // We call handleClose() since we need to replicate the close too, if there is a backup
- handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
-
- log.info("Cleared up resources for session " + name);
- }
- catch (Throwable t)
- {
- log.error("Failed to close connection " + this);
- }
-
- return true;
- }
-
- // Public
- // ----------------------------------------------------------------------------
-
- public Transaction getTransaction()
- {
- return tx;
- }
-
- // Private
- // ----------------------------------------------------------------------------
-
private void doSendLargeMessage(final SessionSendMessage packet)
{
Packet response = null;
@@ -2607,55 +2606,14 @@
// check the user has write access to this address.
doSecurity(msg);
- SimpleString duplicateID = (SimpleString)msg.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
-
- DuplicateIDCache cache = null;
-
- if (duplicateID != null)
+ if (tx == null || autoCommitSends)
{
- cache = postOffice.getDuplicateIDCache(msg.getDestination());
-
- if (cache.contains(duplicateID))
- {
- log.warn("Duplicate message detected - message will not be routed");
-
- return;
- }
+ postOffice.route(msg, null);
}
-
- Transaction theTx = null;
- boolean startedTx = false;
-
- if (!autoCommitSends)
+ else
{
- theTx = tx;
+ postOffice.route(msg, tx);
}
- else if (cache != null)
- {
- theTx = new TransactionImpl(storageManager, postOffice);
-
- startedTx = true;
- }
-
- if (theTx == null)
- {
- postOffice.route(msg, null);
- }
- else
- {
- postOffice.route(msg, theTx);
-
- // Add to cache in same transaction
- if (cache != null)
- {
- cache.addToCache(duplicateID, theTx);
- }
-
- if (startedTx)
- {
- theTx.commit();
- }
- }
}
private void doSecurity(final ServerMessage msg) throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -365,8 +365,6 @@
storageManager.commit(id);
}
- //postOffice.deliver(refsToAdd);
-
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
// transaction until all the messages were added to the queue
// or else we could deliver the messages out of order
@@ -390,8 +388,7 @@
{
operation.afterCommit();
}
- }
-
+ }
}
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DeliveryOrderTest.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -146,9 +146,19 @@
c++;
+ //log.info("Got message " + c);
+
if (c == num)
{
latch.countDown();
+
+ try
+ {
+ Thread.sleep(2000);
+ }
+ catch (Exception e)
+ {
+ }
}
}
catch (JMSException e)
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -80,9 +80,10 @@
boolean durable,
boolean temporary,
boolean exclusive,
- SimpleString linkAddress) throws Exception
+ SimpleString linkAddress,
+ boolean duplicateDetection) throws Exception
{
- Link link = queueFactory.createLink(-1, queueName, filter, durable, false, linkAddress);
+ Link link = queueFactory.createLink(-1, queueName, filter, durable, false, linkAddress, duplicateDetection);
Binding binding = new FakeBinding(address, link);
bindings.put(address, binding);
return binding;
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2009-01-02 10:53:23 UTC (rev 5565)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2009-01-02 12:11:25 UTC (rev 5566)
@@ -54,9 +54,9 @@
}
public Link createLink(long persistenceID, SimpleString name, Filter filter,
- boolean durable, boolean temporary, SimpleString linkAddress)
+ boolean durable, boolean temporary, SimpleString linkAddress, boolean duplicateDetection)
{
- Link link = new LinkImpl(name, durable, filter, linkAddress, postOffice, null);
+ Link link = new LinkImpl(name, durable, filter, linkAddress, duplicateDetection, postOffice, null);
link.setPersistenceID(persistenceID);
More information about the jboss-cvs-commits
mailing list