[jboss-cvs] JBoss Messaging SVN: r5608 - in branches/Branch_Failover_Page: src/main/org/jboss/messaging/core/paging/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 9 01:24:32 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-09 01:24:32 -0500 (Fri, 09 Jan 2009)
New Revision: 5608
Modified:
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingManager.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerConsumer.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
Implementing backupMode on PagingManager
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingManager.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingManager.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -58,6 +58,14 @@
*/
public interface PagingManager extends MessagingComponent
{
+ void setBackup();
+
+
+ void activate();
+
+
+ boolean isBackup();
+
/** The system is paging because of global-page-mode */
boolean isGlobalPageMode();
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingStore.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingStore.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -62,6 +62,8 @@
void sync() throws Exception;
boolean page(PagedMessage message, boolean sync, boolean duplicateDetection) throws Exception;
+
+ public void readPage() throws Exception;
/**
*
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -59,6 +59,8 @@
private volatile boolean started = false;
private final long maxGlobalSize;
+
+ private volatile boolean backup;
private final AtomicLong globalSize = new AtomicLong(0);
@@ -107,6 +109,25 @@
// PagingManager implementation
// -----------------------------------------------------------------------------------------------------
+ public void setBackup()
+ {
+ this.backup = true;
+ }
+
+
+ public void activate()
+ {
+ this.backup = false;
+
+ startGlobalDepage();
+ }
+
+
+ public boolean isBackup()
+ {
+ return this.backup;
+ }
+
public boolean isGlobalPageMode()
{
return globalMode.get();
@@ -285,10 +306,13 @@
public synchronized void startGlobalDepage()
{
- setGlobalPageMode(true);
- for (PagingStore store : stores.values())
+ if (!isBackup())
{
- store.startDepaging(pagingStoreFactory.getGlobalDepagerExecutor());
+ setGlobalPageMode(true);
+ for (PagingStore store : stores.values())
+ {
+ store.startDepaging(pagingStoreFactory.getGlobalDepagerExecutor());
+ }
}
}
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -368,7 +368,7 @@
ServerMessage msg = message.getMessage(storageManager);
// FIXME: This code is duplicated on QueueImpl::removeOrCacheReference
-
+
byte[] bytes = new byte[8];
ByteBuffer buff = ByteBuffer.wrap(bytes);
@@ -456,6 +456,11 @@
public boolean startDepaging(final Executor executor)
{
+ if (!pagingManager.isBackup())
+ {
+ return false;
+ }
+
currentPageLock.readLock().lock();
try
{
@@ -614,7 +619,7 @@
}
System.out.println("Paging " + this.getStoreName());
-
+
// if the first check failed, we do it again under a global currentPageLock
// (writeLock) this time
writeLock.lock();
@@ -638,6 +643,30 @@
}
}
+ /**
+ * Depage one page-file, read it and send it to the pagingManager / postoffice
+ * @return
+ * @throws Exception
+ */
+ public void readPage() throws Exception
+ {
+ Page page = depage();
+
+ if (page == null)
+ {
+ return;
+ }
+
+ page.open();
+
+ List<PagedMessage> messages = page.read();
+
+ onDepage(page.getPageId(), storeName, messages);
+
+ page.delete();
+
+ }
+
// TestSupportPageStore ------------------------------------------
public void forceAnotherPage() throws Exception
@@ -648,6 +677,7 @@
/**
* It returns a Page out of the Page System without reading it.
* The method calling this method will remove the page and will start reading it outside of any locks.
+ * This method could also replace the current file by a new file, and that process is done through acquiring a writeLock on currentPageLock
*
* Observation: This method is used internally as part of the regular depage process, but externally is used only on tests,
* and that's why this method is part of the Testable Interface
@@ -726,7 +756,7 @@
// Protected -----------------------------------------------------
- // In order to test failures, we need to be able to extend this class
+ // In order to test failures, we need to be able to extend this class
// and replace the Page for another Page that will fail before the file is removed
// That's why createPage is not a private method
protected Page createPage(final int page) throws Exception
@@ -756,7 +786,6 @@
return new PageImpl(fileFactory, file, page);
}
-
// Private -------------------------------------------------------
/**
@@ -949,30 +978,6 @@
return Integer.parseInt(fileName.substring(0, fileName.indexOf('.')));
}
- /**
- * Depage one page-file, read it and send it to the pagingManager / postoffice
- * @return
- * @throws Exception
- */
- private void readPage() throws Exception
- {
- Page page = depage();
-
- if (page == null)
- {
- return;
- }
-
- page.open();
-
- List<PagedMessage> messages = page.read();
-
- onDepage(page.getPageId(), storeName, messages);
-
- page.delete();
-
- }
-
// Inner classes -------------------------------------------------
private class DepageRunnable implements Runnable
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -466,6 +466,8 @@
}
}
}
+
+ pagingManager.activate();
return queues;
}
@@ -627,9 +629,17 @@
}
}
- // This is necessary as if the server was previously stopped while a depage was being executed,
- // it needs to resume the depage process on those destinations
- pagingManager.startGlobalDepage();
+
+ if (backup)
+ {
+ pagingManager.setBackup();
+ }
+ else
+ {
+ // This is necessary as if the server was previously stopped while a depage was being executed,
+ // it needs to resume the depage process on those destinations
+ pagingManager.startGlobalDepage();
+ }
}
private class MessageExpiryRunner extends Thread
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -13,6 +13,8 @@
package org.jboss.messaging.core.remoting.impl.wireformat;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -28,17 +30,21 @@
private long messageID;
+ private SimpleString address;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionReplicateDeliveryMessage(final long consumerID, final long messageID)
+ public SessionReplicateDeliveryMessage(final long consumerID, final long messageID, final SimpleString address)
{
super(SESS_REPLICATE_DELIVERY);
this.consumerID = consumerID;
this.messageID = messageID;
+
+ this.address = address;
}
public SessionReplicateDeliveryMessage()
@@ -58,11 +64,18 @@
return messageID;
}
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putLong(consumerID);
buffer.putLong(messageID);
+
+ buffer.putSimpleString(address);
}
public void decodeBody(final MessagingBuffer buffer)
@@ -70,13 +83,20 @@
consumerID = buffer.getLong();
messageID = buffer.getLong();
+
+ address = buffer.getSimpleString();
}
-
+
public boolean isRequiresConfirmations()
- {
+ {
return false;
}
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofString(address);
+ }
+
public boolean equals(Object other)
{
if (other instanceof SessionReplicateDeliveryMessage == false)
@@ -88,7 +108,7 @@
return super.equals(other) && this.consumerID == r.consumerID && this.messageID == r.messageID;
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -106,12 +106,12 @@
/**
- * Remove the reference or add it on the DuplicateIDCache case not found
- * @param id
+ * Remove the reference on backup nodes.
+ * This may lead to depage operations
* @return
* @throws Exception
*/
- MessageReference removeOrCacheReference(long id) throws Exception;
+ MessageReference removeReferenceOnBackup(SimpleString address, long id) throws Exception;
/** Remove message from queue, add it to the scheduled delivery list without affect reference counting */
void rescheduleDelivery(long id, long scheduledDeliveryTime);
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -26,6 +26,7 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.util.SimpleString;
/**
*
@@ -56,7 +57,7 @@
void failedOver();
- void deliverReplicated(final long messageID) throws Exception;
+ void deliverReplicated(SimpleString address, long messageID) throws Exception;
void lock();
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -12,7 +12,6 @@
package org.jboss.messaging.core.server.impl;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@@ -67,6 +66,8 @@
private static final boolean trace = log.isTraceEnabled();
public static final int NUM_PRIORITIES = 10;
+
+ private static final int MAX_NUMBER_OF_DEPAGES_ON_BACKUP = 10;
private volatile long persistenceID = -1;
@@ -90,11 +91,6 @@
private final DuplicateIDCache duplicateIDCache;
- /**
- * We need to Lock the duplicateIDCache
- * */
- private final Lock duplicateIDLock = new ReentrantLock();
-
private boolean direct;
private boolean promptDelivery;
@@ -170,8 +166,6 @@
boolean durableRef = message.isDurable() && durable;
- boolean startLock = false;
-
try
{
@@ -199,15 +193,6 @@
startedTx = true;
}
-
- if (isBackup())
- {
- // We need to lock access to route when dealing with backupNodes
- // Paging will add IDs for records not found during replicateACK,
- // and that needs to be atomic between route and addID
- duplicateIDLock.lock();
- startLock = true;
- }
}
// There is no way to cache the Store, since a Queue may belong to multiple addresses,
@@ -319,11 +304,6 @@
// some exception happened during routing, the tx needs to be rolled back
tx.rollback();
}
-
- if (startLock)
- {
- duplicateIDLock.unlock();
- }
}
return true;
@@ -440,59 +420,38 @@
}
}
- public MessageReference removeOrCacheReference(long id) throws Exception
+ public MessageReference removeReferenceOnBackup(SimpleString address, long id) throws Exception
{
+ if (!isBackup())
+ {
+ throw new IllegalStateException("removeReferenceOnBackup method could only be called on backup nodes");
+ }
+
// most of the times, the remove will work ok, so we first try it without any locks
MessageReference ref = removeReferenceWithID(id, false);
+ PagingStore store = pagingManager.getPageStore(address);
+
if (ref == null)
{
- // This is a temporary hack, for the temporary branch only
- for (int i = 0; i < 10; i++)
+ for (int i = 0; i < MAX_NUMBER_OF_DEPAGES_ON_BACKUP; i++)
{
- System.out.println("Retry " + i);
- Thread.sleep(100);
-
- ref = removeReferenceWithID(id, false);
-
- if (ref != null)
+ // Can't have the same store being depaged in more than one thread
+ synchronized (store)
{
- System.out.println("Finally found it:");
- break;
+ // as soon as it gets the lock, it needs to check again, since another thread may have depaged the reference in the meantime
+ ref = removeReferenceWithID(id, false);
+ if (ref == null)
+ {
+ // force a depage
+ store.readPage();
+ }
+ else
+ {
+ break;
+ }
}
}
-// duplicateIDLock.lock();
-// try
-// {
-// // we need to do it again under a lock.
-// // Depage could still be routing the Message, so we need to lock the duplicateIDLock
-// // to avoid a race with route
-//
-// ref = removeReferenceWithID(id, false);
-//
-// if (ref == null)
-// {
-//
-// System.out.println("Didn't find the reference " + id + " on backup. Storing it!");
-//
-//
-// // FIXME: This code is duplicated on QueueImpl::removeOrCacheReference
-//
-// byte[] bytes = new byte[8];
-//
-// ByteBuffer buff = ByteBuffer.wrap(bytes);
-//
-// buff.putLong(id);
-//
-// SimpleString duplID = new SimpleString(bytes);
-//
-// duplicateIDCache.addToCache(duplID);
-// }
-// }
-// finally
-// {
-// duplicateIDLock.unlock();
-// }
}
return ref;
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -56,6 +56,7 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.SimpleString;
/**
* Concrete implementation of a ClientConsumer.
@@ -248,16 +249,16 @@
Iterator<MessageReference> iter = refs.iterator();
closed = true;
-
+
Transaction tx = new TransactionImpl(storageManager);
while (iter.hasNext())
{
MessageReference ref = iter.next();
-
- ref.cancel(tx, storageManager, postOffice, queueSettingsRepository);
+
+ ref.cancel(tx, storageManager, postOffice, queueSettingsRepository);
}
-
+
tx.rollback();
}
@@ -402,26 +403,27 @@
return ref;
}
- public void deliverReplicated(final long messageID) throws Exception
+ public void deliverReplicated(final SimpleString address, final long messageID) throws Exception
{
// It may not be the first in the queue - since there may be multiple producers
// sending to the queue
- MessageReference ref = messageQueue.removeOrCacheReference(messageID);
+ MessageReference ref = messageQueue.removeReferenceOnBackup(address, messageID);
- if (ref != null)
+ if (ref == null)
{
- // We call doHandle rather than handle, since we don't want to check available credits
- // This is because delivery and receive credits can be processed in different order on live
- // and backup, and otherwise we could have a situation where the delivery is replicated
- // but the credits haven't arrived yet, so the delivery gets rejected on backup
- HandleStatus handled = doHandle(ref);
+ throw new IllegalStateException("Cannot find ref when replicating delivery " + messageID);
+ }
- if (handled != HandleStatus.HANDLED)
- {
- throw new IllegalStateException("Reference was not handled " + ref + " " + handled);
- }
+ // We call doHandle rather than handle, since we don't want to check available credits
+ // This is because delivery and receive credits can be processed in different order on live
+ // and backup, and otherwise we could have a situation where the delivery is replicated
+ // but the credits haven't arrived yet, so the delivery gets rejected on backup
+ HandleStatus handled = doHandle(ref);
+
+ if (handled != HandleStatus.HANDLED)
+ {
+ throw new IllegalStateException("Reference was not handled " + ref + " " + handled);
}
-
}
public void failedOver()
@@ -529,23 +531,24 @@
{
deliveringRefs.add(ref);
}
-
+
ref.getQueue().referenceHandled();
}
-
+
if (preAcknowledge)
{
- //With pre-ack, we ack *before* sending to the client
+ // With pre-ack, we ack *before* sending to the client
doAck(ref);
}
-
+
// TODO: get rid of the instanceof by something like message.isLargeMessage()
if (message instanceof LargeServerMessage)
- {
- //FIXME - please put the replication logic in the sendLargeMessage method
-
+ {
+ // FIXME - please put the replication logic in the sendLargeMessage method
+
DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
- message.getMessageID()));
+ message.getMessageID(),
+ message.getDestination()));
if (result == null)
{
@@ -568,7 +571,7 @@
{
sendStandardMessage(ref, message);
}
-
+
return HandleStatus.HANDLED;
}
finally
@@ -597,7 +600,9 @@
final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
- DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()));
+ DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
+ message.getMessageID(),
+ message.getDestination()));
if (result == null)
{
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -1140,7 +1140,7 @@
try
{
- consumer.deliverReplicated(packet.getMessageID());
+ consumer.deliverReplicated(packet.getAddress(), packet.getMessageID());
}
catch (Exception e)
{
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-01-09 06:24:32 UTC (rev 5608)
@@ -81,7 +81,7 @@
ClientProducer producer = session.createProducer(ADDRESS);
- final int numMessages = 5000;
+ final int numMessages = 50000;
PagingManager pmLive = liveService.getServer().getPostOffice().getPagingManager();
PagingStore storeLive = pmLive.getPageStore(ADDRESS);
@@ -119,7 +119,6 @@
if (i == numMessages / 2)
{
-// assertEquals("GlobalSize", pmLive.getGlobalSize(), pmBackup.getGlobalSize());
System.out.println("Failing");
conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
}
More information about the jboss-cvs-commits
mailing list