[jboss-cvs] JBoss Messaging SVN: r5605 - in branches/Branch_Failover_Page: src/main/org/jboss/messaging/core/persistence/impl/journal and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 8 22:25:05 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-08 22:25:04 -0500 (Thu, 08 Jan 2009)
New Revision: 5605
Added:
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Removed:
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/remote/
Modified:
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
Log:
Part I
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -365,17 +365,19 @@
// We set the duplicate detection header to prevent the message being depaged more than once in case of
// failure during depage
+ ServerMessage msg = message.getMessage(storageManager);
+
+ // FIXME: This code is duplicated on QueueImpl::removeOrCacheReference
+
byte[] bytes = new byte[8];
ByteBuffer buff = ByteBuffer.wrap(bytes);
- ServerMessage msg = message.getMessage(storageManager);
-
buff.putLong(msg.getMessageID());
SimpleString duplID = new SimpleString(bytes);
- message.getMessage(storageManager).putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplID);
+ msg.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplID);
}
int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
@@ -611,6 +613,8 @@
currentPageLock.readLock().unlock();
}
+ System.out.println("Paging " + this.getStoreName());
+
// if the first check failed, we do it again under a global currentPageLock
// (writeLock) this time
writeLock.lock();
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -474,7 +474,7 @@
throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
}
- MessageReference removed = queue.removeReferenceWithID(messageID);
+ MessageReference removed = queue.removeReferenceWithID(messageID, true);
if (removed == null)
{
@@ -894,7 +894,7 @@
throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
}
- MessageReference removed = queue.removeReferenceWithID(messageID);
+ MessageReference removed = queue.removeReferenceWithID(messageID, true);
referencesToAck.add(removed);
@@ -976,7 +976,7 @@
throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
}
- MessageReference removed = queue.removeReferenceWithID(messageID);
+ MessageReference removed = queue.removeReferenceWithID(messageID, true);
referencesToAck.add(removed);
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -26,7 +26,9 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.AddressManager;
import org.jboss.messaging.core.postoffice.Binding;
@@ -45,6 +47,8 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.TransactionOperation;
+import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.SimpleString;
@@ -52,6 +56,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -293,7 +298,14 @@
final SimpleString linkAddress,
final boolean duplicateDetection) throws Exception
{
- Binding binding = createLinkBinding(name, address, filter, durable, temporary, exclusive, linkAddress, duplicateDetection);
+ Binding binding = createLinkBinding(name,
+ address,
+ filter,
+ durable,
+ temporary,
+ exclusive,
+ linkAddress,
+ duplicateDetection);
addBindingInMemory(binding);
@@ -356,7 +368,50 @@
if (bindings != null)
{
- bindings.route(message, tx);
+ // TODO: Why this method is being called with tx == null?
+ if (tx == null)
+ {
+ bindings.route(message, null);
+ }
+ else
+ {
+ SimpleString destination = message.getDestination();
+
+ // TODO - this can all be optimised
+ Set<SimpleString> pagingAddresses = (Set<SimpleString>)tx.getProperty(TransactionPropertyIndexes.DESTINATIONS_IN_PAGE_MODE);
+
+ if (pagingAddresses == null)
+ {
+ pagingAddresses = new HashSet<SimpleString>();
+
+ tx.putProperty(TransactionPropertyIndexes.DESTINATIONS_IN_PAGE_MODE, pagingAddresses);
+ }
+
+ boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
+
+ if (!depage && !message.isReload() &&
+ (pagingAddresses.contains(destination) || pagingManager.isPaging(destination)))
+ {
+ pagingAddresses.add(destination);
+
+ List<ServerMessage> messages = (List<ServerMessage>)tx.getProperty(TransactionPropertyIndexes.PAGED_MESSAGES);
+
+ if (messages == null)
+ {
+ messages = new ArrayList<ServerMessage>();
+
+ tx.putProperty(TransactionPropertyIndexes.PAGED_MESSAGES, messages);
+
+ tx.addOperation(new PageMessageOperation());
+ }
+
+ messages.add(message);
+ }
+ else
+ {
+ bindings.route(message, tx);
+ }
+ }
}
}
@@ -480,7 +535,13 @@
final SimpleString linkAddress,
final boolean duplicateDetection) throws Exception
{
- Bindable bindable = bindableFactory.createLink(-1, name, filter, durable, temporary, linkAddress, duplicateDetection);
+ Bindable bindable = bindableFactory.createLink(-1,
+ name,
+ filter,
+ durable,
+ temporary,
+ linkAddress,
+ duplicateDetection);
Binding binding = new BindingImpl(BindingType.LINK, address, bindable, exclusive);
@@ -544,10 +605,15 @@
}
pagingManager.reloadStores();
-
+
Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
- storageManager.loadMessageJournal(this, storageManager, queueSettingsRepository, queues, resourceManager, duplicateIDMap);
+ storageManager.loadMessageJournal(this,
+ storageManager,
+ queueSettingsRepository,
+ queues,
+ resourceManager,
+ duplicateIDMap);
for (Map.Entry<SimpleString, List<Pair<SimpleString, Long>>> entry : duplicateIDMap.entrySet())
{
@@ -562,7 +628,7 @@
}
// This is necessary as if the server was previously stopped while a depage was being executed,
- // it needs to resume the depage process on those destinations
+ // it needs to resume the depage process on those destinations
pagingManager.startGlobalDepage();
}
@@ -598,4 +664,106 @@
}
}
}
+
+ // TODO - this can be further optimised to have one PageMessageOperation per message, NOT one which uses a shared
+ // list
+ private class PageMessageOperation implements TransactionOperation
+ {
+
+ public void afterCommit(final Transaction tx) throws Exception
+ {
+ }
+
+ public void afterPrepare(final Transaction tx) throws Exception
+ {
+ }
+
+ public void afterRollback(final Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeCommit(final Transaction tx) throws Exception
+ {
+ if (tx.getState() != Transaction.State.PREPARED)
+ {
+ pageMessages(tx);
+ }
+ }
+
+ public void beforePrepare(final Transaction tx) throws Exception
+ {
+ pageMessages(tx);
+ }
+
+ public void beforeRollback(final Transaction tx) throws Exception
+ {
+ }
+
+ private void pageMessages(final Transaction tx) throws Exception
+ {
+ List<ServerMessage> messages = (List<ServerMessage>)tx.getProperty(TransactionPropertyIndexes.PAGED_MESSAGES);
+
+ if (messages != null && !messages.isEmpty())
+ {
+ PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+
+ if (pageTransaction == null)
+ {
+ pageTransaction = new PageTransactionInfoImpl(tx.getID());
+
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransaction);
+
+ // To avoid a race condition where depage happens before the transaction is completed, we need to inform
+ // the
+ // pager about this transaction is being processed
+ pagingManager.addTransaction(pageTransaction);
+ }
+
+ boolean pagingPersistent = false;
+
+ HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
+
+ // We only need to add the dupl id header once per transaction
+ boolean first = true;
+ for (ServerMessage message : messages)
+ {
+ // http://wiki.jboss.org/wiki/JBossMessaging2Paging
+ // Explained under Transaction On Paging. (This is the item B)
+ if (pagingManager.page(message, tx.getID(), first))
+ {
+ if (message.isDurable())
+ {
+ // We only create pageTransactions if using persistent messages
+ pageTransaction.increment();
+ pagingPersistent = true;
+ pagedDestinationsToSync.add(message.getDestination());
+ }
+ }
+ else
+ {
+ // This could happen when the PageStore left the pageState
+
+ // TODO is this correct - don't we lose transactionality here???
+ route(message, null);
+ }
+ first = false;
+ }
+
+ if (pagingPersistent)
+ {
+ tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+
+ if (!pagedDestinationsToSync.isEmpty())
+ {
+ pagingManager.sync(pagedDestinationsToSync);
+ storageManager.storePageTransaction(tx.getID(), pageTransaction);
+ }
+ }
+
+ messages.clear();
+ }
+ }
+
+ }
+
}
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -93,8 +93,26 @@
int getMessagesAdded();
- MessageReference removeReferenceWithID(long id) throws Exception;
+ /**
+ * When replicating deliveries, we are just replaying removing the messages from the Queue,
+ * and on that case removeSizes are coming from acks, on that case acknowledged should be passed as false
+ *
+ * @param id
+ * @param affectPaging
+ * @return
+ * @throws Exception
+ */
+ MessageReference removeReferenceWithID(long id, boolean acknowledged) throws Exception;
+
+ /**
+ * Remove the reference or add it on the DuplicateIDCache case not found
+ * @param id
+ * @return
+ * @throws Exception
+ */
+ MessageReference removeOrCacheReference(long id) throws Exception;
+
/** Remove message from queue, add it to the scheduled delivery list without affect reference counting */
void rescheduleDelivery(long id, long scheduledDeliveryTime);
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -12,27 +12,26 @@
package org.jboss.messaging.core.server.impl;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
-import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.list.PriorityLinkedList;
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.DuplicateIDCache;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -81,8 +80,6 @@
private final boolean temporary;
- private final PostOffice postOffice;
-
private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
private final ConcurrentSet<MessageReference> expiringMessageReferences = new ConcurrentHashSet<MessageReference>();
@@ -91,6 +88,13 @@
private volatile Distributor distributionPolicy = new RoundRobinDistributor();
+ private final DuplicateIDCache duplicateIDCache;
+
+ /**
+ * We need to Lock the duplicateIDCache
+ * */
+ private final Lock duplicateIDLock = new ReentrantLock();
+
private boolean direct;
private boolean promptDelivery;
@@ -133,7 +137,7 @@
this.temporary = temporary;
- this.postOffice = postOffice;
+ this.duplicateIDCache = postOffice.getDuplicateIDCache(this.name);
this.storageManager = storageManager;
@@ -154,154 +158,129 @@
// Bindable implementation -------------------------------------------------------------------------------------
public boolean route(final ServerMessage message, Transaction tx) throws Exception
- {
+ {
if (filter != null && !filter.match(message))
{
return false;
}
-
+
SimpleString duplicateID = (SimpleString)message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
- DuplicateIDCache cache = null;
+ boolean startedTx = false;
- if (!message.isReload() && duplicateID != null)
- {
- cache = postOffice.getDuplicateIDCache(message.getDestination());
+ boolean durableRef = message.isDurable() && durable;
- if (cache.contains(duplicateID))
- {
- if (tx == null)
- {
- log.warn("Duplicate message detected - message will not be routed");
- }
- else
- {
- log.warn("Duplicate message detected - transaction will be rejected");
-
- tx.markAsRollbackOnly(null);
- }
+ boolean startLock = false;
- return true;
- }
- }
-
- boolean durableRef = message.isDurable() && durable;
-
- boolean startedTx = false;
-
- if (cache != null && tx == null && durableRef)
+ try
{
- //We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
-
- tx = new TransactionImpl(storageManager);
-
- startedTx = true;
- }
-
- // There is no way to cache the Store, since a Queue may belong to multiple addresses,
- // so we aways need this lookup
- PagingStore store = pagingManager.getPageStore(message.getDestination());
- if (tx == null)
- {
- // If durable, must be persisted before anything is routed
- MessageReference ref = message.createReference(this);
-
- if (!message.isReload())
+ if (!message.isReload() && duplicateID != null)
{
- if (message.getRefCount() == 1)
+ if (duplicateIDCache.contains(duplicateID))
{
- if (durableRef)
+ if (tx == null)
{
- storageManager.storeMessage(message);
+ log.warn("Duplicate message detected - message will not be routed");
}
-
- if (cache != null)
+ else
{
- cache.addToCache(duplicateID);
+ log.warn("Duplicate message detected - transaction will be rejected");
+
+ //tx.markAsRollbackOnly(null);
}
+
+ return true;
}
- Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
- if (scheduledDeliveryTime != null)
+ if (tx == null && durableRef)
{
- ref.setScheduledDeliveryTime(scheduledDeliveryTime);
+ tx = new TransactionImpl(storageManager);
+ startedTx = true;
- if (durableRef)
- {
- storageManager.updateScheduledDeliveryTime(ref);
- }
}
+
+ if (isBackup())
+ {
+ // We need to lock access to route when dealing with backupNodes
+ // Paging will add IDs for records not found during replicateACK,
+ // and that needs to be atomic between route and addID
+ duplicateIDLock.lock();
+ startLock = true;
+ }
}
- if (message.getRefCount() == 1)
+ // There is no way to cache the Store, since a Queue may belong to multiple addresses,
+ // so we always need this lookup
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ if (tx == null)
{
- store.addSize(message.getMemoryEstimate());
- }
+ // If durable, must be persisted before anything is routed
+ MessageReference ref = message.createReference(this);
- store.addSize(ref.getMemoryEstimate());
+ if (!message.isReload())
+ {
+ if (message.getRefCount() == 1)
+ {
+ if (durableRef)
+ {
+ storageManager.storeMessage(message);
+ }
- // TODO addLast never currently returns anything other than STATUS_HANDLED
+ if (duplicateID != null)
+ {
+ duplicateIDCache.addToCache(duplicateID);
+ }
+ }
- addLast(ref);
- }
- else
- {
- // TODO combine this similar logic with the non transactional case
+ Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
- SimpleString destination = message.getDestination();
+ if (scheduledDeliveryTime != null)
+ {
+ ref.setScheduledDeliveryTime(scheduledDeliveryTime);
- //TODO - this can all be optimised
- Set<SimpleString> pagingAddresses = (Set<SimpleString>)tx.getProperty(TransactionPropertyIndexes.DESTINATIONS_IN_PAGE_MODE);
-
- if (pagingAddresses == null)
- {
- pagingAddresses = new HashSet<SimpleString>();
-
- tx.putProperty(TransactionPropertyIndexes.DESTINATIONS_IN_PAGE_MODE, pagingAddresses);
- }
-
- boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
-
- if (!depage && !message.isReload() && (pagingAddresses.contains(destination) || pagingManager.isPaging(destination)))
- {
- pagingAddresses.add(destination);
-
- List<ServerMessage> messages = (List<ServerMessage>)tx.getProperty(TransactionPropertyIndexes.PAGED_MESSAGES);
-
- if (messages == null)
+ if (durableRef)
+ {
+ storageManager.updateScheduledDeliveryTime(ref);
+ }
+ }
+ }
+
+ if (message.getRefCount() == 1)
{
- messages = new ArrayList<ServerMessage>();
-
- tx.putProperty(TransactionPropertyIndexes.PAGED_MESSAGES, messages);
-
- tx.addOperation(new PageMessageOperation());
+ store.addSize(message.getMemoryEstimate());
}
-
- messages.add(message);
+
+ store.addSize(ref.getMemoryEstimate());
+
+ // TODO addLast never currently returns anything other than STATUS_HANDLED
+
+ addLast(ref);
}
else
{
+ // TODO combine this similar logic with the non transactional case
+
MessageReference ref = message.createReference(this);
boolean first = message.getRefCount() == 1;
-
- if (!message.isReload() && message.getRefCount() == 1)
+
+ if (!message.isReload() && message.getRefCount() == 1)
{
if (durableRef)
{
storageManager.storeMessageTransactional(tx.getID(), message);
}
-
- if (cache != null)
+
+ if (duplicateID != null)
{
- cache.addToCache(duplicateID, tx);
+ duplicateIDCache.addToCache(duplicateID, tx);
}
}
Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
+
if (scheduledDeliveryTime != null)
{
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
@@ -318,25 +297,38 @@
}
store.addSize(ref.getMemoryEstimate());
-
+
tx.addOperation(new AddMessageOperation(ref, first));
if (durableRef)
- {
+ {
tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
}
}
+
+ if (startedTx)
+ {
+ tx.commit();
+ tx = null;
+ }
}
-
- if (startedTx)
+ finally
{
- tx.commit();
+ if (startedTx && tx != null)
+ {
+ // some exception happened during routing, the tx needs to be rolled back
+ tx.rollback();
+ }
+
+ if (startLock)
+ {
+ duplicateIDLock.unlock();
+ }
}
-
+
return true;
}
-
-
+
// Queue implementation ----------------------------------------------------------------------------------------
public boolean isClustered()
@@ -360,7 +352,7 @@
}
public void addLast(final MessageReference ref)
- {
+ {
add(ref, false);
}
@@ -448,8 +440,66 @@
}
}
- public synchronized MessageReference removeReferenceWithID(final long id) throws Exception
+ public MessageReference removeOrCacheReference(long id) throws Exception
{
+ // most of the times, the remove will work ok, so we first try it without any locks
+ MessageReference ref = removeReferenceWithID(id, false);
+
+ if (ref == null)
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ System.out.println("Retry " + i);
+ Thread.sleep(100);
+
+ ref = removeReferenceWithID(id, false);
+
+ if (ref != null)
+ {
+ System.out.println("Finally found it:");
+ break;
+ }
+ }
+// duplicateIDLock.lock();
+// try
+// {
+// // we need to do it again under a lock.
+// // Depage could still be routing the Message, so we need to lock the duplicateIDLock
+// // to avoid a race with route
+//
+// ref = removeReferenceWithID(id, false);
+//
+// if (ref == null)
+// {
+//
+// System.out.println("Didn't find the reference " + id + " on backup. Storing it!");
+//
+//
+// // FIXME: This code is duplicated on QueueImpl::removeOrCacheReference
+//
+// byte[] bytes = new byte[8];
+//
+// ByteBuffer buff = ByteBuffer.wrap(bytes);
+//
+// buff.putLong(id);
+//
+// SimpleString duplID = new SimpleString(bytes);
+//
+// duplicateIDCache.addToCache(duplID);
+// }
+// }
+// finally
+// {
+// duplicateIDLock.unlock();
+// }
+ }
+
+ return ref;
+
+ }
+
+ public synchronized MessageReference removeReferenceWithID(final long id, final boolean acknowledged) throws Exception
+ {
Iterator<MessageReference> iterator = messageReferences.iterator();
MessageReference removed = null;
@@ -464,7 +514,10 @@
removed = ref;
- referenceRemoved(removed);
+ if (acknowledged)
+ {
+ referenceRemoved(removed);
+ }
break;
}
@@ -519,14 +572,14 @@
return null;
}
-
+
public long getPersistenceID()
{
return persistenceID;
}
public void setPersistenceID(final long id)
- {
+ {
this.persistenceID = id;
}
@@ -588,11 +641,12 @@
public synchronized int deleteAllReferences(final StorageManager storageManager,
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
- {
+ {
return deleteMatchingReferences(null, storageManager, postOffice, queueSettingsRepository);
}
- public synchronized int deleteMatchingReferences(final Filter filter, final StorageManager storageManager,
+ public synchronized int deleteMatchingReferences(final Filter filter,
+ final StorageManager storageManager,
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
@@ -631,7 +685,8 @@
return count;
}
- public synchronized boolean deleteReference(final long messageID, final StorageManager storageManager,
+ public synchronized boolean deleteReference(final long messageID,
+ final StorageManager storageManager,
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
@@ -1132,115 +1187,17 @@
}
}
}
-
- //TODO - this can be further optimised to have one PageMessageOperation per message, NOT one which uses a shared list
- private class PageMessageOperation implements TransactionOperation
- {
- public void afterCommit(final Transaction tx) throws Exception
- {
- }
- public void afterPrepare(final Transaction tx) throws Exception
- {
- }
-
- public void afterRollback(final Transaction tx) throws Exception
- {
- }
-
- public void beforeCommit(final Transaction tx) throws Exception
- {
- if (tx.getState() != Transaction.State.PREPARED)
- {
- pageMessages(tx);
- }
- }
-
- public void beforePrepare(final Transaction tx) throws Exception
- {
- pageMessages(tx);
- }
-
- public void beforeRollback(final Transaction tx) throws Exception
- {
- }
-
- private void pageMessages(final Transaction tx) throws Exception
- {
- List<ServerMessage> messages = (List<ServerMessage>)tx.getProperty(TransactionPropertyIndexes.PAGED_MESSAGES);
-
- if (messages != null && !messages.isEmpty())
- {
- PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-
- if (pageTransaction == null)
- {
- pageTransaction = new PageTransactionInfoImpl(tx.getID());
-
- tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransaction);
-
- // To avoid a race condition where depage happens before the transaction is completed, we need to inform the
- // pager about this transaction is being processed
- pagingManager.addTransaction(pageTransaction);
- }
-
- boolean pagingPersistent = false;
-
- HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
-
- // We only need to add the dupl id header once per transaction
- boolean first = true;
- for (ServerMessage message : messages)
- {
- // http://wiki.jboss.org/wiki/JBossMessaging2Paging
- // Explained under Transaction On Paging. (This is the item B)
- if (pagingManager.page(message, tx.getID(), first))
- {
- if (message.isDurable())
- {
- // We only create pageTransactions if using persistent messages
- pageTransaction.increment();
- pagingPersistent = true;
- pagedDestinationsToSync.add(message.getDestination());
- }
- }
- else
- {
- // This could happen when the PageStore left the pageState
-
- //TODO is this correct - don't we lose transactionality here???
- postOffice.route(message, null);
- }
- first = false;
- }
-
- if (pagingPersistent)
- {
- tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
- if (!pagedDestinationsToSync.isEmpty())
- {
- pagingManager.sync(pagedDestinationsToSync);
- storageManager.storePageTransaction(tx.getID(), pageTransaction);
- }
- }
-
- messages.clear();
- }
- }
-
- }
-
private class AddMessageOperation implements TransactionOperation
{
private final MessageReference ref;
-
+
private final boolean first;
AddMessageOperation(final MessageReference ref, final boolean first)
{
this.ref = ref;
-
+
this.first = first;
}
@@ -1258,7 +1215,7 @@
}
public void beforeCommit(final Transaction tx) throws Exception
- {
+ {
}
public void beforePrepare(final Transaction tx) throws Exception
@@ -1268,11 +1225,11 @@
public void beforeRollback(final Transaction tx) throws Exception
{
ServerMessage msg = ref.getMessage();
-
+
PagingStore store = pagingManager.getPageStore(msg.getDestination());
-
+
store.addSize(-ref.getMemoryEstimate());
-
+
if (first)
{
store.addSize(-msg.getMemoryEstimate());
@@ -1281,5 +1238,4 @@
}
-
}
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -406,23 +406,22 @@
{
// It may not be the first in the queue - since there may be multiple producers
// sending to the queue
- MessageReference ref = messageQueue.removeReferenceWithID(messageID);
+ MessageReference ref = messageQueue.removeOrCacheReference(messageID);
- if (ref == null)
+ if (ref != null)
{
- throw new IllegalStateException("Cannot find ref when replicating delivery " + messageID);
- }
+ // We call doHandle rather than handle, since we don't want to check available credits
+ // This is because delivery and receive credits can be processed in different order on live
+ // and backup, and otherwise we could have a situation where the delivery is replicated
+ // but the credits haven't arrived yet, so the delivery gets rejected on backup
+ HandleStatus handled = doHandle(ref);
- // We call doHandle rather than handle, since we don't want to check available credits
- // This is because delivery and receive credits can be processed in different order on live
- // and backup, and otherwise we could have a situation where the delivery is replicated
- // but the credits haven't arrived yet, so the delivery gets rejected on backup
- HandleStatus handled = doHandle(ref);
-
- if (handled != HandleStatus.HANDLED)
- {
- throw new IllegalStateException("Reference was not handled " + ref + " " + handled);
+ if (handled != HandleStatus.HANDLED)
+ {
+ throw new IllegalStateException("Reference was not handled " + ref + " " + handled);
+ }
}
+
}
public void failedOver()
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -56,9 +56,9 @@
protected final Map<String, Object> backupParams = new HashMap<String, Object>();
- private MessagingService liveService;
+ protected MessagingService liveService;
- private MessagingService backupService;
+ protected MessagingService backupService;
// Static --------------------------------------------------------
@@ -73,7 +73,7 @@
backupParams));
}
- protected void setUpFileBased() throws Exception
+ protected void setUpFileBased(long maxGlobalSize) throws Exception
{
deleteDirectory(new File(getTestDir()));
@@ -86,8 +86,8 @@
backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
backupConf.setJournalFileSize(100 * 1024);
- backupConf.setPagingMaxGlobalSizeBytes(100 * 1024 * 1024);
- backupConf.setPagingDefaultSize(10 * 1024);
+ backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+ backupConf.setPagingDefaultSize(20 * 1024);
backupConf.setSecurityEnabled(false);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
@@ -108,8 +108,8 @@
liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
- liveConf.setPagingMaxGlobalSizeBytes(100 * 1024 * 1024);
- liveConf.setPagingDefaultSize(10 * 1024);
+ liveConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+ liveConf.setPagingDefaultSize(20 * 1024);
liveConf.setJournalFileSize(100 * 1024);
liveConf.setSecurityEnabled(false);
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -68,12 +68,11 @@
factory.setBlockOnNonPersistentSend(true);
factory.setBlockOnPersistentSend(true);
- // Enable this and the test will fail
- //factory.setMinLargeMessageSize(10 * 1024);
+ factory.setMinLargeMessageSize(10 * 1024);
ClientSession session = factory.createSession(null, null, false, true, true, false, 0);
- final int numberOfMessages = 500;
+ final int numberOfMessages = 1;
final int numberOfBytes = 15000;
@@ -145,7 +144,7 @@
@Override
protected void setUp() throws Exception
{
- setUpFileBased();
+ setUpFileBased(100*1024*1024);
}
protected void tearDown() throws Exception
Added: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java (rev 0)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -0,0 +1,264 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A LargeMessageFailoverTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Dec 8, 2008 7:09:38 PM
+ *
+ *
+ */
+public class LargeMessageFailoverTest extends FailoverTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testLargeMessageReplicatedNoFailover() throws Exception
+ {
+ testLargeMessage(-1, 500);
+ }
+
+ public void testLargeMessageFailOnProducing() throws Exception
+ {
+ testLargeMessage(1, 500);
+ }
+
+
+// public void testFail() throws Exception
+// {
+// for (int i = 0; i < 100; i++)
+// {
+// System.out.println ("****************** " + i);
+// testLargeMessageFailOnConsume();
+// tearDown();
+// setUp();
+// }
+// }
+//
+ public void testLargeMessageFailOnConsume() throws Exception
+ {
+ testLargeMessage(2, 500);
+ }
+
+ private void testLargeMessage(final int placeToFail, final int numberOfMessages) throws Exception
+ {
+ ClientSessionFactory factory = createFailoverFactory();
+
+ // factory.setMinLargeMessageSize(10 * 1024);
+
+ final int messageSize = 25000;
+
+ try
+ {
+
+ sendMessages(factory, placeToFail == 1, numberOfMessages, messageSize);
+
+ receiveMessages(factory, placeToFail == 2, numberOfMessages, messageSize);
+
+ }
+ finally
+ {
+ System.out.println("Giving up!!!!!!");
+ }
+
+ }
+
+ /**
+ * @param factory
+ * @param placeToFail
+ * @param numberOfMessages
+ * @param messageSize
+ * @throws MessagingException
+ */
+ private void receiveMessages(final ClientSessionFactory factory,
+ final boolean fail,
+ final int numberOfMessages,
+ final int messageSize) throws MessagingException
+ {
+ ClientSession session = factory.createSession(null, null, false, true, true, false, 0);
+
+ try
+ {
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ if (fail)
+ {
+ if (i == numberOfMessages / 3)
+ {
+ forceFailure(conn);
+ }
+ }
+
+ ClientMessage message = consumer.receive(5000);
+
+ assertNotNull(message);
+
+ message.acknowledge();
+
+ MessagingBuffer buffer = message.getBody();
+
+ buffer.rewind();
+
+ assertEquals(messageSize, buffer.limit());
+
+ assertEquals(i, buffer.getInt());
+ }
+ assertNull(consumer.receive(500));
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+
+ /**
+ * @param conn
+ */
+ private void forceFailure(final RemotingConnection conn)
+ {
+ new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+
+ System.out.println("***************************************** Forcing failure");
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+ }
+ }.start();
+ }
+
+ /**
+ * @param factory
+ * @param placeToFail
+ * @param numberOfMessages
+ * @param messageSize
+ * @throws MessagingException
+ */
+ private void sendMessages(final ClientSessionFactory factory,
+ final boolean fail,
+ final int numberOfMessages,
+ final int messageSize) throws MessagingException
+ {
+ ClientSession session = factory.createSession(null, null, false, true, true, false, 0);
+
+ try
+ {
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ if (fail)
+ {
+ if (i == numberOfMessages / 3)
+ {
+ forceFailure(conn);
+ }
+ }
+
+ ClientMessage message = session.createClientMessage(true);
+
+ ByteBuffer buffer = ByteBuffer.allocate(messageSize);
+
+ buffer.putInt(i);
+
+ buffer.rewind();
+
+ message.setBody(new ByteBufferWrapper(buffer));
+
+ producer.send(message);
+
+ }
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ setUpFileBased(100 * 1024 * 1024);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java (rev 0)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -0,0 +1,186 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A PagingFailoverTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Dec 8, 2008 10:53:16 AM
+ *
+ *
+ */
+public class PagingFailoverTest extends FailoverTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ public void testPaging() throws Exception
+ {
+ ClientSession session = null;
+ try
+ {
+ ClientSessionFactory sf1 = createFailoverFactory();
+
+ sf1.setBlockOnAcknowledge(true);
+ sf1.setBlockOnNonPersistentSend(true);
+ sf1.setBlockOnPersistentSend(true);
+
+ //sf1.setMinLargeMessageSize(10 * 1024);
+
+ session = sf1.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ session.start();
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 5000;
+
+ PagingManager pmLive = liveService.getServer().getPostOffice().getPagingManager();
+ PagingStore storeLive = pmLive.getPageStore(ADDRESS);
+
+ PagingManager pmBackup = backupService.getServer().getPostOffice().getPagingManager();
+ PagingStore storeBackup = pmBackup.getPageStore(ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+ ByteBuffer buffer = ByteBuffer.allocate(1000);
+
+ buffer.putInt(i);
+
+ buffer.rewind();
+
+ message.setBody(new ByteBufferWrapper(buffer));
+
+ producer.send(message);
+
+ if (storeLive.isPaging())
+ {
+ assertTrue(storeBackup.isPaging());
+ }
+ }
+
+ final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ assertEquals("GloblSize", pmLive.getGlobalSize(), pmBackup.getGlobalSize());
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+
+ if (i == numMessages / 2)
+ {
+// assertEquals("GlobalSize", pmLive.getGlobalSize(), pmBackup.getGlobalSize());
+ System.out.println("Failing");
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+ }
+
+ ClientMessage message = consumer.receive(1000);
+
+ //System.out.println("message = " + message);
+
+ assertNotNull(message);
+
+ message.acknowledge();
+ //session.commit();
+
+ message.getBody().rewind();
+
+ assertEquals(i, message.getBody().getInt());
+
+ }
+
+ assertNull(consumer.receive(100));
+
+
+// assertEquals(0, pmLive.getGlobalSize());
+ assertEquals(0, pmBackup.getGlobalSize());
+
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ // eat it
+ }
+ }
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void tearDown() throws Exception
+ {
+ //super.tearDown();
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ setUpFileBased(100 * 1024);
+
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PageCrashTest.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -23,17 +23,44 @@
package org.jboss.messaging.tests.integration.paging;
import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executor;
import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.management.ManagementService;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PagedMessage;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
+import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
+import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
+import org.jboss.messaging.core.remoting.RemotingService;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingServiceImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.security.JBMSecurityManager;
+import org.jboss.messaging.core.security.impl.JBMSecurityManagerImpl;
+import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServerImpl;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.tests.integration.paging.remote.RemotePageCrashExecution;
import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.tests.util.SpawnedVMSupport;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.messaging.util.SimpleString;
/**
* This test will make sure that a failing depage won't cause duplicated messages
@@ -49,6 +76,8 @@
// Constants -----------------------------------------------------
+ public static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -59,12 +88,9 @@
public void testCrashDuringDeleteFile() throws Exception
{
- clearData();
+
+ pageAndFail();
- Process process = SpawnedVMSupport.spawnVM(RemotePageCrashExecution.class.getCanonicalName());
- process.waitFor();
- assertEquals("The remote process failed, test is invalid", RemotePageCrashExecution.OK, process.exitValue());
-
File pageDir = new File(getPageDir());
File directories[] = pageDir.listFiles();
@@ -97,7 +123,7 @@
session.start();
- ClientConsumer consumer = session.createConsumer(RemotePageCrashExecution.ADDRESS);
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
assertNull(consumer.receive(200));
@@ -120,8 +146,330 @@
super.tearDown();
}
+ // Privte -------------------------------------------------------
+
+
+ /** This method will leave garbage on paging.
+ * It will not delete page files as if the server crashed right after commit,
+ * and before removing the file*/
+ private void pageAndFail() throws Exception
+ {
+ clearData();
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(100 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ MessagingService service = newMessagingService(config);
+
+ service.start();
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ByteBuffer ioBuffer = ByteBuffer.allocate(1024);
+
+ ClientMessage message = null;
+
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+
+ PagingStore store = service.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+ int messages = 0;
+ while (!store.isPaging())
+ {
+ producer.send(message);
+ messages++;
+ }
+
+ for (int i = 0; i < 2; i++)
+ {
+ messages++;
+ producer.send(message);
+ }
+
+ session.close();
+
+ assertTrue(service.getServer().getPostOffice().getPagingManager().getGlobalSize() > 0);
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < messages; i++)
+ {
+ ClientMessage message2 = consumer.receive(10000);
+
+ assertNotNull(message2);
+
+ message2.acknowledge();
+ }
+
+ consumer.close();
+
+ session.close();
+
+ assertEquals(0, service.getServer().getPostOffice().getPagingManager().getGlobalSize());
+
+ }
+ finally
+ {
+ try
+ {
+ service.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
// Private -------------------------------------------------------
+ private MessagingServiceImpl newMessagingService(final Configuration configuration)
+ {
+
+ StorageManager storageManager = new JournalStorageManager(configuration);
+
+ RemotingService remotingService = new RemotingServiceImpl(configuration);
+
+ JBMSecurityManager securityManager = new JBMSecurityManagerImpl(true);
+
+ ManagementService managementService = new ManagementServiceImpl(ManagementFactory.getPlatformMBeanServer(), false);
+
+ remotingService.setManagementService(managementService);
+
+ MessagingServer server = new FailingMessagingServiceImpl();
+
+ server.setConfiguration(configuration);
+
+ server.setStorageManager(storageManager);
+
+ server.setRemotingService(remotingService);
+
+ server.setSecurityManager(securityManager);
+
+ server.setManagementService(managementService);
+
+ return new MessagingServiceImpl(server, storageManager, remotingService);
+ }
+
// Inner classes -------------------------------------------------
+ /** This is hacking MessagingServerImpl,
+ * to make sure the server will fail right
+ * after before the page-file was removed */
+ class FailingMessagingServiceImpl extends MessagingServerImpl
+ {
+ /**
+ * Method could be replaced for test purposes
+ */
+ @Override
+ protected PagingManager createPagingManager()
+ {
+ return new PagingManagerImpl(new FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory(),
+ super.getConfiguration().getPagingMaxThreads()),
+ super.getStorageManager(),
+ super.getQueueSettingsRepository(),
+ super.getConfiguration().getPagingMaxGlobalSizeBytes(),
+ super.getConfiguration().getPagingDefaultSize(),
+ super.getConfiguration().isJournalSyncNonTransactional());
+ }
+
+ class FailurePagingStoreFactoryNIO extends PagingStoreFactoryNIO
+
+ {
+ /**
+ * @param directory
+ * @param maxThreads
+ */
+ public FailurePagingStoreFactoryNIO(final String directory, final int maxThreads)
+ {
+ super(directory, maxThreads);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public synchronized PagingStore newStore(final SimpleString destinationName, final QueueSettings settings) throws Exception
+ {
+ Field factoryField = PagingStoreFactoryNIO.class.getDeclaredField("executorFactory");
+ factoryField.setAccessible(true);
+
+ OrderedExecutorFactory factory = (OrderedExecutorFactory)factoryField.get(this);
+ return new FailingPagingStore(destinationName, settings, factory.getExecutor());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+ class FailingPagingStore extends PagingStoreImpl
+ {
+
+ /**
+ * @param pagingManager
+ * @param storageManager
+ * @param postOffice
+ * @param fileFactory
+ * @param storeFactory
+ * @param storeName
+ * @param queueSettings
+ * @param executor
+ */
+ public FailingPagingStore(final SimpleString storeName,
+ final QueueSettings queueSettings,
+ final Executor executor)
+ {
+ super(getPostOffice().getPagingManager(),
+ getStorageManager(),
+ getPostOffice(),
+ null,
+ FailurePagingStoreFactoryNIO.this,
+ storeName,
+ queueSettings,
+ executor);
+ }
+
+ @Override
+ protected Page createPage(final int page) throws Exception
+ {
+
+ Page originalPage = super.createPage(page);
+
+ return new FailingPage(originalPage);
+ }
+
+ }
+
+ }
+
+ class FailingPage implements Page
+ {
+ Page delegatedPage;
+
+ /**
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#close()
+ */
+ public void close() throws Exception
+ {
+ delegatedPage.close();
+ }
+
+ /**
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#delete()
+ */
+ public void delete() throws Exception
+ {
+ // This will let the file stay, simulating a system failure
+ }
+
+ /**
+ * @return
+ * @see org.jboss.messaging.core.paging.Page#getNumberOfMessages()
+ */
+ public int getNumberOfMessages()
+ {
+ return delegatedPage.getNumberOfMessages();
+ }
+
+ /**
+ * @return
+ * @see org.jboss.messaging.core.paging.Page#getPageId()
+ */
+ public int getPageId()
+ {
+ return delegatedPage.getPageId();
+ }
+
+ /**
+ * @return
+ * @see org.jboss.messaging.core.paging.Page#getSize()
+ */
+ public int getSize()
+ {
+ return delegatedPage.getSize();
+ }
+
+ /**
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#open()
+ */
+ public void open() throws Exception
+ {
+ delegatedPage.open();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#read()
+ */
+ public List<PagedMessage> read() throws Exception
+ {
+ return delegatedPage.read();
+ }
+
+ /**
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#sync()
+ */
+ public void sync() throws Exception
+ {
+ delegatedPage.sync();
+ }
+
+ /**
+ * @param message
+ * @throws Exception
+ * @see org.jboss.messaging.core.paging.Page#write(org.jboss.messaging.core.paging.PagedMessage)
+ */
+ public void write(final PagedMessage message) throws Exception
+ {
+ delegatedPage.write(message);
+ }
+
+ public FailingPage(final Page delegatePage)
+ {
+ delegatedPage = delegatePage;
+ }
+ }
+
+ }
+
+ // Inner classes -------------------------------------------------
+
}
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -637,12 +637,21 @@
}
}
+
+ public void testPageMultipleDestinations() throws Exception
+ {
+ internalTestPageMultipleDestinations(false);
+ }
+
+ public void testPageMultipleDestinationsTransacted() throws Exception
+ {
+ internalTestPageMultipleDestinations(true);
+ }
- public void testPageMultipleDestinations() throws Exception
+
+ private void internalTestPageMultipleDestinations(boolean transacted) throws Exception
{
- clearData();
-
Configuration config = createDefaultConfig();
final int MAX_SIZE = 90 * 1024; // this must be lower than minlargeMessageSize on the SessionFactory
@@ -666,7 +675,7 @@
sf.setBlockOnPersistentSend(true);
sf.setBlockOnAcknowledge(true);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ ClientSession session = sf.createSession(null, null, false, !transacted, true, false, 0);
for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
{
@@ -687,6 +696,11 @@
for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
{
producer.send(message);
+
+ if (transacted)
+ {
+ session.commit();
+ }
}
session.close();
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2009-01-09 03:19:05 UTC (rev 5604)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2009-01-09 03:25:04 UTC (rev 5605)
@@ -1164,7 +1164,7 @@
EasyMock.expect(consumer.handle(messageReference3)).andReturn(HandleStatus.HANDLED);
EasyMock.replay(consumer);
queue.addListFirst(messageReferences);
- queue.removeReferenceWithID(2);
+ queue.removeReferenceWithID(2, true);
queue.addConsumer(consumer);
queue.deliverNow();
EasyMock.verify(consumer);
More information about the jboss-cvs-commits
mailing list