Author: timfox
Date: 2009-11-05 09:23:20 -0500 (Thu, 05 Nov 2009)
New Revision: 8223
Modified:
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/paging/PagingManager.java
trunk/src/main/org/hornetq/core/paging/PagingStore.java
trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/impl/DeliveryImpl.java
trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java
trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java
trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerInfo.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java
trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
refactor and cleanup of paging code
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-05 13:30:49 UTC
(rev 8222)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-05 14:23:20 UTC
(rev 8223)
@@ -73,7 +73,7 @@
protected long messageID;
- private SimpleString destination;
+ protected SimpleString destination;
protected byte type;
Modified: trunk/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingManager.java 2009-11-05 13:30:49 UTC (rev
8222)
+++ trunk/src/main/org/hornetq/core/paging/PagingManager.java 2009-11-05 14:23:20 UTC (rev
8223)
@@ -13,12 +13,9 @@
package org.hornetq.core.paging;
-import java.util.Collection;
-
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.SimpleString;
/**
@@ -57,35 +54,6 @@
void resumeDepages() throws Exception;
/**
- * To be used by transactions only.
- * If you're sure you will page if isPaging, just call the method page and look at
its return.
- * @param destination
- * @return
- */
- boolean isPaging(SimpleString destination) throws Exception;
-
- /**
- * Page, only if destination is in page mode.
- * @param message
- * @param sync - Sync should be called right after the write
- * @return false if destination is not on page mode
- */
-
- // FIXME - why are these methods still on PagingManager???
- // The current code is doing a lookup every time through this class just to call page
store!!
- boolean page(ServerMessage message, boolean duplicateDetection) throws Exception;
-
- /**
- * Page, only if destination is in page mode.
- * @param message
- * @return false if destination is not on page mode
- */
-
- // FIXME - why are these methods still on PagingManager???
- // The current code is doing a lookup every time through this class just to call page
store!!
- boolean page(ServerMessage message, long transactionId, boolean duplicateDetection)
throws Exception;
-
- /**
* Point to inform/restoring Transactions used when the messages were added into
paging
* */
void addTransaction(PageTransactionInfo pageTransaction);
@@ -95,9 +63,6 @@
* */
PageTransactionInfo getTransaction(long transactionID);
- /** Sync current-pages on disk for these destinations */
- void sync(Collection<SimpleString> destinationsToSync) throws Exception;
-
/**
* @param transactionID
*/
Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java 2009-11-05 13:30:49 UTC (rev
8222)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java 2009-11-05 14:23:20 UTC (rev
8223)
@@ -33,32 +33,26 @@
*/
public interface PagingStore extends HornetQComponent
{
+ SimpleString getAddress();
+
int getNumberOfPages();
SimpleString getStoreName();
- /** Maximum number of bytes allowed in memory */
- long getMaxSizeBytes();
-
AddressFullMessagePolicy getAddressFullMessagePolicy();
long getPageSizeBytes();
long getAddressSize();
- /** @return true if paging was started, or false if paging was already started before
this call */
- boolean startPaging() throws Exception;
-
boolean isPaging();
void sync() throws Exception;
- boolean page(PagedMessage message, boolean sync, boolean duplicateDetection) throws
Exception;
+ boolean page(ServerMessage message, long transactionId, boolean duplicateDetection)
throws Exception;
- public boolean readPage() throws Exception;
+ boolean page(ServerMessage message, boolean duplicateDetection) throws Exception;
- Page getCurrentPage();
-
Page createPage(final int page) throws Exception;
/**
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2009-11-05 13:30:49
UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2009-11-05 14:23:20
UTC (rev 8223)
@@ -13,7 +13,6 @@
package org.hornetq.core.paging.impl;
-import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,7 +26,6 @@
import org.hornetq.core.paging.PagingStoreFactory;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.utils.SimpleString;
@@ -47,8 +45,6 @@
private volatile boolean started = false;
- // private volatile boolean backup;
-
private final AtomicLong totalMemoryBytes = new AtomicLong(0);
private final ConcurrentMap<SimpleString, PagingStore> stores = new
ConcurrentHashMap<SimpleString, PagingStore>();
@@ -59,8 +55,6 @@
private final StorageManager storageManager;
- private final boolean syncNonTransactional;
-
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo>
transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
// Static
@@ -73,13 +67,11 @@
public PagingManagerImpl(final PagingStoreFactory pagingSPI,
final StorageManager storageManager,
- final HierarchicalRepository<AddressSettings>
addressSettingsRepository,
- final boolean syncNonTransactional)
+ final HierarchicalRepository<AddressSettings>
addressSettingsRepository)
{
pagingStoreFactory = pagingSPI;
this.addressSettingsRepository = addressSettingsRepository;
this.storageManager = storageManager;
- this.syncNonTransactional = syncNonTransactional;
}
// Public
@@ -150,28 +142,6 @@
pagingStoreFactory.setPostOffice(postOffice);
}
- public boolean isPaging(final SimpleString destination) throws Exception
- {
- return getPageStore(destination).isPaging();
- }
-
- public boolean page(final ServerMessage message, final long transactionId, final
boolean duplicateDetection) throws Exception
- {
- // The sync on transactions is done on commit only
- return getPageStore(message.getDestination()).page(new PagedMessageImpl(message,
transactionId),
- false,
- duplicateDetection);
- }
-
- public boolean page(final ServerMessage message, final boolean duplicateDetection)
throws Exception
- {
- // If non Durable, there is no need to sync as there is no requirement for
persistence for those messages in case
- // of crash
- return getPageStore(message.getDestination()).page(new PagedMessageImpl(message),
- syncNonTransactional &&
message.isDurable(),
- duplicateDetection);
- }
-
public void addTransaction(final PageTransactionInfo pageTransaction)
{
transactions.put(pageTransaction.getTransactionID(), pageTransaction);
@@ -187,14 +157,6 @@
return transactions.get(id);
}
- public void sync(final Collection<SimpleString> destinationsToSync) throws
Exception
- {
- for (SimpleString destination : destinationsToSync)
- {
- getPageStore(destination).sync();
- }
- }
-
// HornetQComponent implementation
//
------------------------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2009-11-05
13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -57,6 +57,8 @@
private final String directory;
private final ExecutorFactory executorFactory;
+
+ protected final boolean syncNonTransactional;
private PagingManager pagingManager;
@@ -68,11 +70,14 @@
// Constructors --------------------------------------------------
- public PagingStoreFactoryNIO(final String directory, final ExecutorFactory
executorFactory)
+ public PagingStoreFactoryNIO(final String directory, final ExecutorFactory
executorFactory,
+ final boolean syncNonTransactional)
{
this.directory = directory;
this.executorFactory = executorFactory;
+
+ this.syncNonTransactional = syncNonTransactional;
}
// Public --------------------------------------------------------
@@ -81,17 +86,19 @@
{
}
- public synchronized PagingStore newStore(final SimpleString destinationName, final
AddressSettings settings) throws Exception
+ public synchronized PagingStore newStore(final SimpleString address, final
AddressSettings settings) throws Exception
{
- return new PagingStoreImpl(pagingManager,
+ return new PagingStoreImpl(address,
+ pagingManager,
storageManager,
postOffice,
null,
this,
- destinationName,
+ address,
settings,
- executorFactory.getExecutor());
+ executorFactory.getExecutor(),
+ syncNonTransactional);
}
/**
@@ -168,31 +175,33 @@
BufferedReader reader = new BufferedReader(new InputStreamReader(new
FileInputStream(addressFile)));
- String destination;
+ String addressString;
try
{
- destination = reader.readLine();
+ addressString = reader.readLine();
}
finally
{
reader.close();
}
- SimpleString destinationName = new SimpleString(destination);
+ SimpleString address = new SimpleString(addressString);
SequentialFileFactory factory = newFileFactory(guid);
- AddressSettings settings =
addressSettingsRepository.getMatch(destinationName.toString());
+ AddressSettings settings =
addressSettingsRepository.getMatch(address.toString());
- PagingStore store = new PagingStoreImpl(pagingManager,
+ PagingStore store = new PagingStoreImpl(address,
+ pagingManager,
storageManager,
postOffice,
factory,
this,
- destinationName,
+ address,
settings,
- executorFactory.getExecutor());
+ executorFactory.getExecutor(),
+ syncNonTransactional);
storesReturn.add(store);
}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-05 13:30:49
UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-05 14:23:20
UTC (rev 8223)
@@ -64,6 +64,8 @@
// Attributes ----------------------------------------------------
+ private final SimpleString address;
+
private final StorageManager storageManager;
private final PostOffice postOffice;
@@ -110,15 +112,17 @@
* We need to perform checks on currentPage with minimal locking
* */
private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
-
+
private final ServerProducerCreditManager creditManager;
-
+
private boolean exceededAvailableCredits;
-
+
private volatile boolean running = false;
-
+
private final AtomicLong availableProducerCredits = new AtomicLong(0);
-
+
+ protected final boolean syncNonTransactional;
+
// Static --------------------------------------------------------
private static final boolean isTrace = log.isTraceEnabled();
@@ -133,20 +137,24 @@
// Constructors --------------------------------------------------
- public PagingStoreImpl(final PagingManager pagingManager,
+ public PagingStoreImpl(final SimpleString address,
+ final PagingManager pagingManager,
final StorageManager storageManager,
final PostOffice postOffice,
final SequentialFileFactory fileFactory,
final PagingStoreFactory storeFactory,
final SimpleString storeName,
final AddressSettings addressSettings,
- final Executor executor)
+ final Executor executor,
+ final boolean syncNonTransactional)
{
if (pagingManager == null)
{
throw new IllegalStateException("Paging Manager can't be null");
}
+ this.address = address;
+
this.storageManager = storageManager;
this.postOffice = postOffice;
@@ -166,25 +174,26 @@
this.fileFactory = fileFactory;
this.storeFactory = storeFactory;
-
+
this.availableProducerCredits.set(maxSize);
-
- this.creditManager = new ServerProducerCreditManagerImpl(this);
+
+ this.creditManager = new ServerProducerCreditManagerImpl(this);
+
+ this.syncNonTransactional = syncNonTransactional;
}
// Public --------------------------------------------------------
// PagingStore implementation ------------------------------------
- public long getAddressSize()
+ public SimpleString getAddress()
{
- return sizeInBytes.get();
+ return address;
}
- /** Maximum number of bytes allowed in memory */
- public long getMaxSizeBytes()
+ public long getAddressSize()
{
- return maxSize;
+ return sizeInBytes.get();
}
public AddressFullMessagePolicy getAddressFullMessagePolicy()
@@ -200,6 +209,7 @@
public boolean isPaging()
{
currentPageLock.readLock().lock();
+
try
{
if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
@@ -226,27 +236,27 @@
{
return storeName;
}
-
+
public boolean isExceededAvailableCredits()
{
return exceededAvailableCredits;
}
-
+
public synchronized int getAvailableProducerCredits(final int credits)
- {
+ {
if (maxSize != -1 && addressFullMessagePolicy ==
AddressFullMessagePolicy.BLOCK)
- {
+ {
long avail = availableProducerCredits.get();
-
+
if (avail > 0)
{
long take = Math.min(avail, credits);
-
+
availableProducerCredits.addAndGet(-take);
-
+
return (int)take;
}
-
+
return 0;
}
else
@@ -254,243 +264,62 @@
return credits;
}
}
-
+
public void returnProducerCredits(final int credits)
{
checkReleaseProducerFlowControlCredits(-credits);
}
+
- private synchronized void checkReleaseProducerFlowControlCredits(final long size)
+ public void addSize(final ServerMessage message, final boolean add) throws Exception
{
- if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize
!= -1)
- {
- long avail = availableProducerCredits.addAndGet(-size);
-
- if (avail > 0)
- {
- int used = creditManager.creditsReleased((int)avail);
-
- long num = availableProducerCredits.addAndGet(-used);
-
- if (num < 0)
- {
- log.warn("Available credits has gone negative");
-
- exceededAvailableCredits = true;
- }
- }
- }
- }
-
- public void addSize(final ServerMessage message, final boolean add) throws Exception
- {
long size = message.getMemoryEstimate();
-
+
if (add)
{
checkReleaseProducerFlowControlCredits(size);
-
+
addSize(size);
}
else
{
checkReleaseProducerFlowControlCredits(-size);
-
+
addSize(-size);
}
}
-
+
public void addSize(final MessageReference reference, final boolean add) throws
Exception
{
long size = reference.getMemoryEstimate();
-
+
if (add)
{
checkReleaseProducerFlowControlCredits(size);
-
+
addSize(size);
}
else
{
checkReleaseProducerFlowControlCredits(-size);
-
+
addSize(-size);
}
}
- private void addSize(final long size) throws Exception
- {
- if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
- {
- addAddressSize(size);
-
- pagingManager.addSize(size);
-
- return;
- }
- else
- {
- pagingManager.addSize(size);
-
- final long addressSize = addAddressSize(size);
-
- if (size > 0)
- {
- if (maxSize > 0 && addressSize > maxSize)
- {
- if (startPaging())
- {
- if (isTrace)
- {
- trace("Starting paging on " + getStoreName() + ",
size = " + addressSize + ", maxSize=" + maxSize);
- }
- }
- }
- }
- else
- {
- // When in Global mode, we use the default page size as the mark to start
depage
- if (maxSize > 0 && currentPage != null && addressSize
<= maxSize - pageSize && !depaging.get())
- {
- if (startDepaging())
- {
- if (isTrace)
- {
- trace("Starting depaging Thread, size = " + addressSize);
- }
- }
- }
- }
-
- return;
- }
+ public boolean page(final ServerMessage message, final long transactionID, final
boolean duplicateDetection) throws Exception
+ {
+ // The sync on transactions is done on commit only
+ return page(message, transactionID, false, duplicateDetection);
}
- // TODO all of this can be simplified
- public boolean page(final PagedMessage message, final boolean sync, final boolean
duplicateDetection) throws Exception
+ public boolean page(final ServerMessage message, final boolean duplicateDetection)
throws Exception
{
- if (!running)
- {
- throw new IllegalStateException("PagingStore(" + getStoreName() +
") not initialized");
- }
-
- boolean full = isFull();
-
- if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP)
- {
- if (full)
- {
- if (!printedDropMessagesWarning)
- {
- printedDropMessagesWarning = true;
-
- log.warn("Messages are being dropped on address " +
getStoreName());
- }
-
- // Address is full, we just pretend we are paging, and drop the data
- return true;
- }
- else
- {
- return false;
- }
- }
- else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK)
- {
- return false;
- }
-
- // We need to ensure a read lock, as depage could change the paging state
- currentPageLock.readLock().lock();
-
- try
- {
- // First check done concurrently, to avoid synchronization and increase
throughput
- if (currentPage == null)
- {
- return false;
- }
- }
- finally
- {
- currentPageLock.readLock().unlock();
- }
-
- writeLock.lock();
-
- try
- {
- if (currentPage == null)
- {
- return false;
- }
-
- if (duplicateDetection)
- {
- // We set the duplicate detection header to prevent the message being depaged
more than once in case of
- // failure during depage
-
- ServerMessage msg = message.getMessage(storageManager);
-
- byte[] bytes = new byte[8];
-
- ByteBuffer buff = ByteBuffer.wrap(bytes);
-
- buff.putLong(msg.getMessageID());
-
- msg.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
- }
-
- int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
-
- if (currentPageSize.addAndGet(bytesToWrite) > pageSize &&
currentPage.getNumberOfMessages() > 0)
- {
- // Make sure nothing is currently validating or using currentPage
- currentPageLock.writeLock().lock();
- try
- {
- openNewPage();
-
- // openNewPage will set currentPageSize to zero, we need to set it again
- currentPageSize.addAndGet(bytesToWrite);
- }
- finally
- {
- currentPageLock.writeLock().unlock();
- }
- }
-
- currentPageLock.readLock().lock();
-
- try
- {
- if (currentPage != null)
- {
- currentPage.write(message);
-
- if (sync)
- {
- currentPage.sync();
- }
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- currentPageLock.readLock().unlock();
- }
- }
- finally
- {
- writeLock.unlock();
- }
-
+ // If non Durable, there is no need to sync as there is no requirement for
persistence for those messages in case
+ // of crash
+ return page(message, -1, syncNonTransactional && message.isDurable(),
duplicateDetection);
}
-
+
public void sync() throws Exception
{
currentPageLock.readLock().lock();
@@ -685,43 +514,12 @@
}
}
- /**
- * Depage one page-file, read it and send it to the pagingManager / postoffice
- * @return
- * @throws Exception
- */
- public boolean readPage() throws Exception
- {
- Page page = depage();
-
- if (page == null)
- {
- return false;
- }
-
- page.open();
-
- List<PagedMessage> messages = page.read();
-
- if (onDepage(page.getPageId(), storeName, messages))
- {
- page.delete();
-
- return true;
- }
- else
- {
- return false;
- }
-
- }
-
+
public Page getCurrentPage()
{
return currentPage;
}
-
public Page createPage(final int page) throws Exception
{
String fileName = createFileName(page);
@@ -748,13 +546,12 @@
return new PageImpl(this.storeName, storageManager, fileFactory, file, page);
}
-
+
public ServerProducerCreditManager getProducerCreditManager()
{
return creditManager;
}
-
// TestSupportPageStore ------------------------------------------
public void forceAnotherPage() throws Exception
@@ -851,6 +648,242 @@
// Private -------------------------------------------------------
/**
+ * Depage one page-file, read it and send it to the pagingManager / postoffice
+ * @return
+ * @throws Exception
+ */
+ private boolean readPage() throws Exception
+ {
+ Page page = depage();
+
+ if (page == null)
+ {
+ return false;
+ }
+
+ page.open();
+
+ List<PagedMessage> messages = page.read();
+
+ if (onDepage(page.getPageId(), storeName, messages))
+ {
+ page.delete();
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+
+ }
+
+
+ private synchronized void checkReleaseProducerFlowControlCredits(final long size)
+ {
+ if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize
!= -1)
+ {
+ long avail = availableProducerCredits.addAndGet(-size);
+
+ if (avail > 0)
+ {
+ int used = creditManager.creditsReleased((int)avail);
+
+ long num = availableProducerCredits.addAndGet(-used);
+
+ if (num < 0)
+ {
+ log.warn("Available credits has gone negative");
+
+ exceededAvailableCredits = true;
+ }
+ }
+ }
+ }
+
+
+ private void addSize(final long size) throws Exception
+ {
+ if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
+ {
+ addAddressSize(size);
+
+ pagingManager.addSize(size);
+
+ return;
+ }
+ else
+ {
+ pagingManager.addSize(size);
+
+ final long addressSize = addAddressSize(size);
+
+ if (size > 0)
+ {
+ if (maxSize > 0 && addressSize > maxSize)
+ {
+ if (startPaging())
+ {
+ if (isTrace)
+ {
+ trace("Starting paging on " + getStoreName() + ",
size = " + addressSize + ", maxSize=" + maxSize);
+ }
+ }
+ }
+ }
+ else
+ {
+ // When in Global mode, we use the default page size as the mark to start
depage
+ if (maxSize > 0 && currentPage != null && addressSize
<= maxSize - pageSize && !depaging.get())
+ {
+ if (startDepaging())
+ {
+ if (isTrace)
+ {
+ trace("Starting depaging Thread, size = " + addressSize);
+ }
+ }
+ }
+ }
+
+ return;
+ }
+ }
+
+ private boolean page(final ServerMessage message, final long transactionID, final
boolean sync, final boolean duplicateDetection) throws Exception
+ {
+ if (!running)
+ {
+ throw new IllegalStateException("PagingStore(" + getStoreName() +
") not initialized");
+ }
+
+ boolean full = isFull();
+
+ if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP)
+ {
+ if (full)
+ {
+ if (!printedDropMessagesWarning)
+ {
+ printedDropMessagesWarning = true;
+
+ log.warn("Messages are being dropped on address " +
getStoreName());
+ }
+
+ // Address is full, we just pretend we are paging, and drop the data
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK)
+ {
+ return false;
+ }
+
+ // We need to ensure a read lock, as depage could change the paging state
+ currentPageLock.readLock().lock();
+
+ try
+ {
+ // First check done concurrently, to avoid synchronization and increase
throughput
+ if (currentPage == null)
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ currentPageLock.readLock().unlock();
+ }
+
+ writeLock.lock();
+
+ try
+ {
+ if (currentPage == null)
+ {
+ return false;
+ }
+
+ if (duplicateDetection)
+ {
+ // We set the duplicate detection header to prevent the message being depaged
more than once in case of
+ // failure during depage
+
+ byte[] bytes = new byte[8];
+
+ ByteBuffer buff = ByteBuffer.wrap(bytes);
+
+ buff.putLong(message.getMessageID());
+
+ message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
+ }
+
+ int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
+
+ if (currentPageSize.addAndGet(bytesToWrite) > pageSize &&
currentPage.getNumberOfMessages() > 0)
+ {
+ // Make sure nothing is currently validating or using currentPage
+ currentPageLock.writeLock().lock();
+ try
+ {
+ openNewPage();
+
+ // openNewPage will set currentPageSize to zero, we need to set it again
+ currentPageSize.addAndGet(bytesToWrite);
+ }
+ finally
+ {
+ currentPageLock.writeLock().unlock();
+ }
+ }
+
+ currentPageLock.readLock().lock();
+
+ try
+ {
+ if (currentPage != null)
+ {
+ PagedMessage pagedMessage;
+
+ if (transactionID != -1)
+ {
+ pagedMessage = new PagedMessageImpl(message, transactionID);
+ }
+ else
+ {
+ pagedMessage = new PagedMessageImpl(message);
+ }
+
+ currentPage.write(pagedMessage);
+
+ if (sync)
+ {
+ currentPage.sync();
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ currentPageLock.readLock().unlock();
+ }
+ }
+ finally
+ {
+ writeLock.unlock();
+ }
+
+ }
+
+ /**
* This method will remove files from the page system and and route them, doing it
transactionally
*
* If persistent messages are also used, it will update eventual PageTransactions
@@ -868,13 +901,12 @@
// nothing to be done on this case.
return true;
}
-
// Depage has to be done atomically, in case of failure it should be
// back to where it was
Transaction depageTransaction = new TransactionImpl(storageManager);
-
+
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE,
Boolean.valueOf(true));
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new
HashSet<PageTransactionInfo>();
@@ -882,13 +914,14 @@
for (PagedMessage pagedMessage : pagedMessages)
{
ServerMessage message = pagedMessage.getMessage(storageManager);
-
+
if (message.isLargeMessage())
{
LargeServerMessage largeMsg = (LargeServerMessage)message;
if (!largeMsg.isFileExists())
{
- log.warn("File for large message " + largeMsg.getMessageID() +
" doesn't exist, so ignoring depage for this large message");
+ log.warn("File for large message " + largeMsg.getMessageID() +
+ " doesn't exist, so ignoring depage for this large
message");
continue;
}
}
@@ -931,7 +964,7 @@
if (isTrace)
{
trace("Rollback was called after prepare, ignoring message "
+ message);
- }
+ }
continue;
}
@@ -970,8 +1003,8 @@
}
depageTransaction.commit();
-
- // StorageManager does the check: if (replicated) -> do the proper cleanup
already
+
+ // StorageManager does the check: if (replicated) -> do the proper cleanup
already
storageManager.completeReplication();
if (isTrace)
@@ -987,7 +1020,7 @@
*/
private boolean isAddressFull(final long nextPageSize)
{
- return getMaxSizeBytes() > 0 && getAddressSize() + nextPageSize >
getMaxSizeBytes();
+ return maxSize > 0 && getAddressSize() + nextPageSize > maxSize;
}
private long addAddressSize(final long delta)
@@ -1012,7 +1045,7 @@
" addressSize = " +
this.getAddressSize() +
" addressMax " +
- this.getMaxSizeBytes() +
+ maxSize +
" isPaging = " +
isPaging() +
" addressFull = " +
@@ -1050,9 +1083,8 @@
{
currentPage.close();
}
-
+
currentPage = createPage(currentPageId);
-
currentPageSize.set(0);
@@ -1084,7 +1116,7 @@
// To be used on isDropMessagesWhenFull
private boolean isFull()
{
- return getMaxSizeBytes() > 0 && getAddressSize() >
getMaxSizeBytes();
+ return maxSize > 0 && getAddressSize() > maxSize;
}
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java 2009-11-05
13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -37,4 +37,9 @@
void forceAnotherPage() throws Exception;
boolean isExceededAvailableCredits();
+
+ /** @return true if paging was started, or false if paging was already started before
this call */
+ boolean startPaging() throws Exception;
+
+ Page getCurrentPage();
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-05
13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -20,6 +20,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -598,18 +599,16 @@
if (context.getTransaction() == null)
{
- if (pagingManager.page(message, true))
+ if (message.page(true))
{
return;
}
}
else
{
- SimpleString destination = message.getDestination();
-
boolean depage =
context.getTransaction().getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
- if (!depage && pagingManager.isPaging(destination))
+ if (!depage && message.storeIsPaging())
{
getPageOperation(context.getTransaction()).addMessageToPage(message);
@@ -1149,20 +1148,20 @@
boolean pagingPersistent = false;
- HashSet<SimpleString> pagedDestinationsToSync = new
HashSet<SimpleString>();
+ Set<PagingStore> pagingStoresToSync = new
HashSet<PagingStore>();
// We only need to add the dupl id header once per transaction
boolean first = true;
for (ServerMessage message : messagesToPage)
{
- if (pagingManager.page(message, tx.getID(), first))
+ if (message.page(tx.getID(), first))
{
if (message.isDurable())
{
// We only create pageTransactions if using persistent messages
pageTransaction.increment();
pagingPersistent = true;
- pagedDestinationsToSync.add(message.getDestination());
+ pagingStoresToSync.add(message.getPagingStore());
}
}
else
@@ -1179,9 +1178,13 @@
{
tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
- if (!pagedDestinationsToSync.isEmpty())
+ if (!pagingStoresToSync.isEmpty())
{
- pagingManager.sync(pagedDestinationsToSync);
+ for (PagingStore store: pagingStoresToSync)
+ {
+ store.sync();
+ }
+
storageManager.storePageTransaction(tx.getID(), pageTransaction);
}
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-05
13:30:49 UTC (rev 8222)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -200,10 +200,10 @@
journalLoadInformation = storage.loadInternalOnly();
pageManager = new PagingManagerImpl(new
PagingStoreFactoryNIO(config.getPagingDirectory(),
-
server.getExecutorFactory()),
+
server.getExecutorFactory(),
+
config.isJournalSyncNonTransactional()),
storage,
- server.getAddressSettingsRepository(),
- false);
+ server.getAddressSettingsRepository());
pageManager.start();
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-05 13:30:49 UTC (rev
8222)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-05 14:23:20 UTC (rev
8223)
@@ -52,4 +52,14 @@
ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception;
void setOriginalHeaders(ServerMessage other, boolean expiry);
+
+ void setPagingStore(PagingStore store);
+
+ PagingStore getPagingStore();
+
+ boolean page(boolean duplicateDetection) throws Exception;
+
+ boolean page(long transactionID, boolean duplicateDetection) throws Exception;
+
+ boolean storeIsPaging();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/DeliveryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DeliveryImpl.java 2009-11-05 13:30:49 UTC
(rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/DeliveryImpl.java 2009-11-05 14:23:20 UTC
(rev 8223)
@@ -9,7 +9,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
- */
+ */
package org.hornetq.core.server.impl;
@@ -24,25 +24,24 @@
*
*/
public class DeliveryImpl implements Delivery
-{
+{
private final long consumerID;
-
+
private final MessageReference reference;
-
- public DeliveryImpl(final long consumerID,
- final MessageReference reference)
- {
+
+ public DeliveryImpl(final long consumerID, final MessageReference reference)
+ {
this.consumerID = consumerID;
- this.reference = reference;
+ this.reference = reference;
}
public long getConsumerID()
{
return consumerID;
}
-
+
public MessageReference getReference()
{
return reference;
- }
+ }
}
Modified: trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java 2009-11-05 13:30:49
UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java 2009-11-05 14:23:20
UTC (rev 8223)
@@ -28,12 +28,12 @@
protected final List<Consumer> consumers = new ArrayList<Consumer>();
- public void addConsumer(Consumer consumer)
+ public void addConsumer(final Consumer consumer)
{
consumers.add(consumer);
}
- public boolean removeConsumer(Consumer consumer)
+ public boolean removeConsumer(final Consumer consumer)
{
return consumers.remove(consumer);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-05 13:30:49 UTC
(rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-11-05 14:23:20 UTC
(rev 8223)
@@ -50,7 +50,7 @@
private final Filter filter;
private final Transformer transformer;
-
+
public DivertImpl(final SimpleString forwardAddress,
final SimpleString uniqueName,
final SimpleString routingName,
@@ -74,19 +74,19 @@
this.postOffice = postOffice;
}
- public void route(ServerMessage message, final RoutingContext context) throws
Exception
- {
+ public void route(final ServerMessage message, final RoutingContext context) throws
Exception
+ {
SimpleString originalDestination = message.getDestination();
-
- //We must make a copy of the message, otherwise things like returning credits to
the page won't work
- //properly on ack, since the original destination will be overwritten
-
- //TODO we can optimise this so it doesn't copy if it's not routed anywhere
else
-
+
+ // We must make a copy of the message, otherwise things like returning credits to
the page won't work
+ // properly on ack, since the original destination will be overwritten
+
+ // TODO we can optimise this so it doesn't copy if it's not routed anywhere
else
+
ServerMessage copy = message.copy();
-
+
copy.setDestination(forwardAddress);
-
+
copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalDestination);
if (transformer != null)
@@ -94,7 +94,7 @@
copy = transformer.transform(copy);
}
- postOffice.route(copy, context.getTransaction());
+ postOffice.route(copy, context.getTransaction());
}
public SimpleString getRoutingName()
@@ -111,7 +111,7 @@
{
return exclusive;
}
-
+
public Filter getFilter()
{
return filter;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-05 13:30:49
UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-05 14:23:20
UTC (rev 8223)
@@ -60,9 +60,9 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -93,14 +93,14 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.cluster.Transformer;
+import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.core.server.group.impl.LocalGroupingHandler;
import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
-import org.hornetq.core.server.group.impl.GroupBinding;
-import org.hornetq.core.server.cluster.ClusterManager;
-import org.hornetq.core.server.cluster.Transformer;
-import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
@@ -221,7 +221,7 @@
this(configuration, null, null);
}
- public HornetQServerImpl(final Configuration configuration, MBeanServer mbeanServer)
+ public HornetQServerImpl(final Configuration configuration, final MBeanServer
mbeanServer)
{
this(configuration, mbeanServer, null);
}
@@ -256,7 +256,7 @@
this.securityManager = securityManager;
- this.addressSettingsRepository = new
HierarchicalObjectRepository<AddressSettings>();
+ addressSettingsRepository = new
HierarchicalObjectRepository<AddressSettings>();
addressSettingsRepository.setDefault(new AddressSettings());
}
@@ -289,8 +289,8 @@
{
if (!configuration.isSharedStore())
{
- this.replicationEndpoint = new ReplicationEndpointImpl(this);
- this.replicationEndpoint.start();
+ replicationEndpoint = new ReplicationEndpointImpl(this);
+ replicationEndpoint.start();
}
// We defer actually initialisation until the live node has contacted the
backup
log.info("Backup server initialised");
@@ -529,8 +529,8 @@
}
public ReattachSessionResponseMessage reattachSession(final RemotingConnection
connection,
- final String name,
- final int
lastReceivedCommandID) throws Exception
+ final String name,
+ final int lastReceivedCommandID)
throws Exception
{
if (!started)
{
@@ -580,17 +580,17 @@
}
public CreateSessionResponseMessage createSession(final String name,
- final long channelID,
- final String username,
- final String password,
- final int
minLargeMessageSize,
- final int
incrementingVersion,
- final
RemotingConnection connection,
- final boolean
autoCommitSends,
- final boolean
autoCommitAcks,
- final boolean
preAcknowledge,
- final boolean xa,
- final int
sendWindowSize) throws Exception
+ final long channelID,
+ final String username,
+ final String password,
+ final int minLargeMessageSize,
+ final int incrementingVersion,
+ final RemotingConnection
connection,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean preAcknowledge,
+ final boolean xa,
+ final int sendWindowSize) throws
Exception
{
if (!started)
{
@@ -607,14 +607,16 @@
". " +
"Please ensure all clients and servers are upgraded to the same
version for them to " +
"interoperate properly");
- throw new HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
"Server and client versions incompatible");
+ throw new
HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
+ "Server and client versions
incompatible");
}
if (!checkActivate())
{
// Backup server is not ready to accept connections
- throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED,
"Server will not accept create session requests");
+ throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED,
+ "Server will not accept create session
requests");
}
if (securityStore != null)
@@ -674,11 +676,11 @@
if (replicationEndpoint.getChannel() != null)
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup
replication server is already connected to another server");
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "Backup replication server is already connected
to another server");
}
-
+
replicationEndpoint.setChannel(channel);
-
return replicationEndpoint;
}
@@ -713,7 +715,7 @@
return new HashSet<ServerSession>(sessions.values());
}
- //TODO - should this really be here?? It's only used in tests
+ // TODO - should this really be here?? It's only used in tests
public boolean isInitialised()
{
synchronized (initialiseLock)
@@ -748,19 +750,19 @@
}
public Queue createQueue(final SimpleString address,
- final SimpleString queueName,
- final SimpleString filterString,
- final boolean durable,
- final boolean temporary) throws Exception
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final boolean durable,
+ final boolean temporary) throws Exception
{
return createQueue(address, queueName, filterString, durable, temporary, false);
}
public Queue deployQueue(final SimpleString address,
- final SimpleString queueName,
- final SimpleString filterString,
- final boolean durable,
- final boolean temporary) throws Exception
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final boolean durable,
+ final boolean temporary) throws Exception
{
return createQueue(address, queueName, filterString, durable, temporary, true);
}
@@ -864,10 +866,11 @@
protected PagingManager createPagingManager()
{
- return new PagingManagerImpl(new
PagingStoreFactoryNIO(configuration.getPagingDirectory(), executorFactory),
+ return new PagingManagerImpl(new
PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+ executorFactory,
+
configuration.isJournalSyncNonTransactional()),
storageManager,
- addressSettingsRepository,
- configuration.isJournalSyncNonTransactional());
+ addressSettingsRepository);
}
/** for use on sub-classes */
@@ -912,7 +915,8 @@
replicationFailoverManager = createBackupConnection(backupConnector,
threadPool, scheduledPool);
- this.replicationManager = new
ReplicationManagerImpl(replicationFailoverManager, configuration.getBackupWindowSize());
+ replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
+
configuration.getBackupWindowSize());
replicationManager.start();
}
}
@@ -1008,7 +1012,7 @@
startReplication();
- this.storageManager = createStorageManager();
+ storageManager = createStorageManager();
securityRepository = new HierarchicalObjectRepository<Set<Role>>();
securityRepository.setDefault(new HashSet<Role>());
@@ -1093,13 +1097,12 @@
}
}
deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
-
+
// Load the journal and populate queues, transactions and caches in memory
JournalLoadInformation[] journalInfo = loadJournals();
-
+
compareJournals(journalInfo);
-
// Deploy any queues in the Configuration class - if there's no file deployment
we still need
// to load those
deployQueuesFromConfiguration();
@@ -1163,7 +1166,7 @@
/**
* @param journalInfo
*/
- private void compareJournals(JournalLoadInformation[] journalInfo) throws Exception
+ private void compareJournals(final JournalLoadInformation[] journalInfo) throws
Exception
{
if (replicationManager != null)
{
@@ -1185,7 +1188,7 @@
private JournalLoadInformation[] loadJournals() throws Exception
{
JournalLoadInformation[] journalInfo = new JournalLoadInformation[2];
-
+
List<QueueBindingInfo> queueBindingInfos = new
ArrayList<QueueBindingInfo>();
List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
@@ -1230,7 +1233,11 @@
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new
HashMap<SimpleString, List<Pair<byte[], Long>>>();
- journalInfo[1] = storageManager.loadMessageJournal(postOffice, pagingManager,
resourceManager, queues, duplicateIDMap);
+ journalInfo[1] = storageManager.loadMessageJournal(postOffice,
+ pagingManager,
+ resourceManager,
+ queues,
+ duplicateIDMap);
for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry :
duplicateIDMap.entrySet())
{
@@ -1243,7 +1250,7 @@
cache.load(entry.getValue());
}
}
-
+
return journalInfo;
}
@@ -1405,10 +1412,10 @@
config.getName(),
config.getAddress(),
config.getTimeout());
- }
-
+ }
+
this.groupingHandler = groupingHandler;
-
+
managementService.addNotificationListener(groupingHandler);
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-05 13:30:49
UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-05 14:23:20
UTC (rev 8223)
@@ -67,6 +67,7 @@
addressSettingsRepository);
}
+ @Override
public synchronized void add(final MessageReference ref, final boolean first)
{
SimpleString prop =
(SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
@@ -76,7 +77,7 @@
HolderReference hr = map.get(prop);
if (!first)
- {
+ {
if (hr != null)
{
// We need to overwrite the old ref with the new one and ack the old one
@@ -169,7 +170,7 @@
this.ref = ref;
}
- public MessageReference copy(Queue queue)
+ public MessageReference copy(final Queue queue)
{
return ref.copy(queue);
}
@@ -209,12 +210,12 @@
ref.incrementDeliveryCount();
}
- public void setDeliveryCount(int deliveryCount)
+ public void setDeliveryCount(final int deliveryCount)
{
ref.setDeliveryCount(deliveryCount);
}
- public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+ public void setScheduledDeliveryTime(final long scheduledDeliveryTime)
{
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java 2009-11-05 13:30:49
UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java 2009-11-05 14:23:20
UTC (rev 8223)
@@ -28,79 +28,80 @@
public class MemoryManagerImpl implements MemoryManager
{
private static final Logger log = Logger.getLogger(MemoryManagerImpl.class);
-
- private Runtime runtime;
-
- private long measureInterval;
-
- private int memoryWarningThreshold;
-
+
+ private final Runtime runtime;
+
+ private final long measureInterval;
+
+ private final int memoryWarningThreshold;
+
private volatile boolean started;
-
+
private Thread thread;
-
+
private volatile boolean low;
-
- public MemoryManagerImpl(int memoryWarningThreshold, long measureInterval)
+
+ public MemoryManagerImpl(final int memoryWarningThreshold, final long
measureInterval)
{
runtime = Runtime.getRuntime();
-
+
this.measureInterval = measureInterval;
-
- this.memoryWarningThreshold = memoryWarningThreshold;
+
+ this.memoryWarningThreshold = memoryWarningThreshold;
}
-
+
public boolean isMemoryLow()
{
return low;
}
-
+
public synchronized boolean isStarted()
{
return started;
}
-
+
public synchronized void start()
{
- log.debug("Starting MemoryManager with MEASURE_INTERVAL: " +
measureInterval
- + " FREE_MEMORY_PERCENT: " + memoryWarningThreshold);
-
+ log.debug("Starting MemoryManager with MEASURE_INTERVAL: " +
measureInterval +
+ " FREE_MEMORY_PERCENT: " +
+ memoryWarningThreshold);
+
if (started)
{
- //Already started
+ // Already started
return;
}
-
+
started = true;
-
+
thread = new Thread(new MemoryRunnable());
-
+
thread.setDaemon(true);
-
+
thread.start();
}
-
+
public synchronized void stop()
- {
+ {
if (!started)
{
- //Already stopped
+ // Already stopped
return;
}
-
+
started = false;
-
+
thread.interrupt();
-
+
try
{
thread.join();
}
catch (InterruptedException ignore)
- {
+ {
}
}
-
+
private class MemoryRunnable implements Runnable
{
public void run()
@@ -113,7 +114,7 @@
{
break;
}
-
+
Thread.sleep(measureInterval);
}
catch (InterruptedException ignore)
@@ -123,17 +124,17 @@
break;
}
}
-
+
long maxMemory = runtime.maxMemory();
-
+
long totalMemory = runtime.totalMemory();
-
+
long freeMemory = runtime.freeMemory();
-
- long availableMemory = freeMemory + (maxMemory - totalMemory);
-
- double availableMemoryPercent = 100.0 * (double)availableMemory / maxMemory;
-
+
+ long availableMemory = freeMemory + maxMemory - totalMemory;
+
+ double availableMemoryPercent = 100.0 * availableMemory / maxMemory;
+
String info = "";
info += String.format("free memory: %s\n",
SizeFormatterUtil.sizeof(freeMemory));
info += String.format("max memory: %s\n",
SizeFormatterUtil.sizeof(maxMemory));
@@ -144,21 +145,22 @@
{
log.debug(info);
}
-
+
if (availableMemoryPercent <= memoryWarningThreshold)
{
- log.warn("Less than " + memoryWarningThreshold + "%\n"
- + info +
+ log.warn("Less than " + memoryWarningThreshold +
+ "%\n" +
+ info +
"\nYou are in danger of running out of RAM. Have you set
paging parameters " +
"on your addresses? (See user manual \"Paging\"
chapter)");
-
+
low = true;
}
else
{
low = false;
}
-
+
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2009-11-05
13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -47,11 +47,11 @@
public MessageReferenceImpl(final MessageReferenceImpl other, final Queue queue)
{
- this.deliveryCount = other.deliveryCount;
+ deliveryCount = other.deliveryCount;
- this.scheduledDeliveryTime = other.scheduledDeliveryTime;
+ scheduledDeliveryTime = other.scheduledDeliveryTime;
- this.message = other.message;
+ message = other.message;
this.queue = queue;
}
@@ -94,7 +94,7 @@
{
deliveryCount++;
}
-
+
public void decrementDeliveryCount()
{
deliveryCount--;
@@ -119,7 +119,7 @@
{
return queue;
}
-
+
public void handled()
{
queue.referenceHandled();
@@ -127,6 +127,7 @@
// Public --------------------------------------------------------
+ @Override
public String toString()
{
return "Reference[" + getMessage().getMessageID() +
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-05 13:30:49
UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-05 14:23:20
UTC (rev 8223)
@@ -72,15 +72,15 @@
if (addressSettings.isLastValueQueue())
{
queue = new LastValueQueue(persistenceID,
- address,
- name,
- filter,
- durable,
- temporary,
- scheduledExecutor,
- postOffice,
- storageManager,
- addressSettingsRepository);
+ address,
+ name,
+ filter,
+ durable,
+ temporary,
+ scheduledExecutor,
+ postOffice,
+ storageManager,
+ addressSettingsRepository);
}
else
{
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-05 13:30:49 UTC
(rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-05 14:23:20 UTC
(rev 8223)
@@ -123,7 +123,7 @@
private final ScheduledExecutorService scheduledExecutor;
- private SimpleString address;
+ private final SimpleString address;
private Redistributor redistributor;
@@ -137,7 +137,7 @@
private final Map<Consumer, Iterator<MessageReference>> iterators = new
HashMap<Consumer, Iterator<MessageReference>>();
- private ConcurrentMap<SimpleString, Consumer> groups = new
ConcurrentHashMap<SimpleString, Consumer>();
+ private final ConcurrentMap<SimpleString, Consumer> groups = new
ConcurrentHashMap<SimpleString, Consumer>();
private volatile SimpleString expiryAddress;
@@ -877,7 +877,7 @@
@Override
public String toString()
{
- return "QueueImpl(name=" + this.name.toString() + ")";
+ return "QueueImpl(name=" + name.toString() + ")";
}
// Private
@@ -1402,7 +1402,8 @@
// ack isn't committed, then the server crashes and on
// recovery the message is deleted even though the other ack never committed
- //also note then when this happens as part of a trasaction its the tx commt
of the ack that is important not this
+ // also note then when this happens as part of a trasaction its the tx commt
of the ack that is important
+ // not this
try
{
storageManager.deleteMessage(message.getMessageID());
@@ -1434,7 +1435,7 @@
}
- void postRollback(LinkedList<MessageReference> refs) throws Exception
+ void postRollback(final LinkedList<MessageReference> refs) throws Exception
{
synchronized (this)
{
@@ -1449,7 +1450,7 @@
}
}
- private synchronized void initPagingStore(SimpleString destination)
+ private synchronized void initPagingStore(final SimpleString destination)
{
// PagingManager would be null only on testcases
if (pagingStore == null && pagingManager != null)
@@ -1496,14 +1497,14 @@
// Must be set to false *before* executing to avoid race
waitingToDeliver.set(false);
- QueueImpl.this.lockDelivery();
+ lockDelivery();
try
{
deliver();
}
finally
{
- QueueImpl.this.unlockDelivery();
+ unlockDelivery();
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java 2009-11-05
13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -43,6 +43,7 @@
return super.removeConsumer(consumer);
}
+ @Override
public synchronized int getConsumerCount()
{
return super.getConsumerCount();
@@ -54,11 +55,11 @@
incrementPosition();
return consumer;
}
-
+
private synchronized void incrementPosition()
{
pos++;
-
+
if (pos == consumers.size())
{
pos = 0;
Modified: trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2009-11-05
13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -29,19 +29,19 @@
*/
public class RoutingContextImpl implements RoutingContext
{
- private List<Queue> queues = new ArrayList<Queue>();
-
+ private final List<Queue> queues = new ArrayList<Queue>();
+
private Transaction transaction;
-
+
private int depth;
-
+
public RoutingContextImpl(final Transaction transaction)
{
this.transaction = transaction;
}
public void addQueue(final Queue queue)
- {
+ {
queues.add(queue);
}
@@ -49,12 +49,12 @@
{
return transaction;
}
-
+
public void setTransaction(final Transaction tx)
{
- this.transaction = tx;
+ transaction = tx;
}
-
+
public List<Queue> getQueues()
{
return queues;
Modified: trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2009-11-05
13:30:49 UTC (rev 8222)
+++
trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -41,7 +41,7 @@
private final ScheduledExecutorService scheduledExecutor;
private final Map<Long, ScheduledDeliveryRunnable> scheduledRunnables = new
LinkedHashMap<Long, ScheduledDeliveryRunnable>();
-
+
private boolean rescheduled;
public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
@@ -67,13 +67,13 @@
scheduledRunnables.put(ref.getMessage().getMessageID(), runnable);
}
- scheduleDelivery(runnable, deliveryTime);
+ scheduleDelivery(runnable, deliveryTime);
return true;
}
return false;
}
-
+
public void reSchedule()
{
synchronized (scheduledRunnables)
@@ -84,7 +84,7 @@
{
scheduleDelivery(runnable,
runnable.getReference().getScheduledDeliveryTime());
}
-
+
rescheduled = true;
}
}
@@ -98,7 +98,7 @@
public List<MessageReference> getScheduledReferences()
{
List<MessageReference> refs = new ArrayList<MessageReference>();
-
+
synchronized (scheduledRunnables)
{
for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables.values())
@@ -112,13 +112,13 @@
public List<MessageReference> cancel()
{
List<MessageReference> refs = new ArrayList<MessageReference>();
-
+
synchronized (scheduledRunnables)
{
for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
{
runnable.cancel();
-
+
refs.add(runnable.getReference());
}
@@ -126,8 +126,8 @@
}
return refs;
}
-
- public MessageReference removeReferenceWithID(long id)
+
+ public MessageReference removeReferenceWithID(final long id)
{
synchronized (scheduledRunnables)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerInfo.java 2009-11-05 13:30:49 UTC
(rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerInfo.java 2009-11-05 14:23:20 UTC
(rev 8223)
@@ -34,7 +34,7 @@
{
private final HornetQServer server;
- private PagingManager pagingManager;
+ private final PagingManager pagingManager;
// Constants -----------------------------------------------------
@@ -57,8 +57,8 @@
long maxMemory = Runtime.getRuntime().maxMemory();
long totalMemory = Runtime.getRuntime().totalMemory();
long freeMemory = Runtime.getRuntime().freeMemory();
- long availableMemory = freeMemory + (maxMemory - totalMemory);
- double availableMemoryPercent = 100.0 * (double)availableMemory / maxMemory;
+ long availableMemory = freeMemory + maxMemory - totalMemory;
+ double availableMemoryPercent = 100.0 * availableMemory / maxMemory;
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
String info = "\n**** Server Dump ****\n";
@@ -90,7 +90,9 @@
try
{
pageStore = pagingManager.getPageStore(storeName);
- info += String.format("\t%s: %s\n", storeName,
SizeFormatterUtil.sizeof(pageStore.getPageSizeBytes() * pageStore.getNumberOfPages()));
+ info += String.format("\t%s: %s\n",
+ storeName,
+ SizeFormatterUtil.sizeof(pageStore.getPageSizeBytes() *
pageStore.getNumberOfPages()));
}
catch (Exception e)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-05 13:30:49
UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-05 14:23:20
UTC (rev 8223)
@@ -45,6 +45,8 @@
// We cache this
private volatile int memoryEstimate = -1;
+ private PagingStore pagingStore;
+
/*
* Constructor for when reading from network
*/
@@ -88,7 +90,7 @@
messageID = id;
}
- public void setType(byte type)
+ public void setType(final byte type)
{
this.type = type;
}
@@ -103,34 +105,34 @@
public int incrementRefCount(final PagingStore pagingStore, final MessageReference
reference) throws Exception
{
int count = refCount.incrementAndGet();
-
+
if (pagingStore != null)
{
if (count == 1)
{
pagingStore.addSize(this, true);
}
-
+
pagingStore.addSize(reference, true);
}
-
+
return count;
}
-
+
public int decrementRefCount(final PagingStore pagingStore, final MessageReference
reference) throws Exception
{
int count = refCount.decrementAndGet();
-
+
if (pagingStore != null)
{
if (count == 0)
{
pagingStore.addSize(this, false);
}
-
+
pagingStore.addSize(reference, false);
}
-
+
return count;
}
@@ -144,8 +146,6 @@
return durableRefCount.decrementAndGet();
}
-
-
public int getRefCount()
{
return refCount.get();
@@ -158,7 +158,7 @@
public long getLargeBodySize()
{
- return (long)getBodySize();
+ return getBodySize();
}
public int getMemoryEstimate()
@@ -200,18 +200,18 @@
*/
ServerMessage copy = copy(newID);
-
+
copy.setOriginalHeaders(this, expiry);
return copy;
}
-
+
public void setOriginalHeaders(final ServerMessage other, final boolean expiry)
{
if (other.getProperty(HDR_ORIG_MESSAGE_ID) != null)
{
putStringProperty(HDR_ORIGINAL_DESTINATION,
(SimpleString)other.getProperty(HDR_ORIGINAL_DESTINATION));
-
+
putLongProperty(HDR_ORIG_MESSAGE_ID,
(Long)other.getProperty(HDR_ORIG_MESSAGE_ID));
}
else
@@ -222,18 +222,68 @@
putLongProperty(HDR_ORIG_MESSAGE_ID, other.getMessageID());
}
-
+
// reset expiry
setExpiration(0);
-
+
if (expiry)
- {
+ {
long actualExpiryTime = System.currentTimeMillis();
putLongProperty(HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
}
}
+ public void setPagingStore(final PagingStore pagingStore)
+ {
+ this.pagingStore = pagingStore;
+
+ // On the server side, we reset the address to point to the instance of address in
the paging store
+ // Otherwise each message would have its own copy of the address String which would
take up more memory
+ destination = pagingStore.getAddress();
+ }
+
+ public PagingStore getPagingStore()
+ {
+ return pagingStore;
+ }
+
+ public boolean page(final boolean duplicateDetection) throws Exception
+ {
+ if (pagingStore != null)
+ {
+ return pagingStore.page(this, duplicateDetection);
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public boolean page(final long transactionID, final boolean duplicateDetection) throws
Exception
+ {
+ if (pagingStore != null)
+ {
+ return pagingStore.page(this, transactionID, duplicateDetection);
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public boolean storeIsPaging()
+ {
+ if (pagingStore != null)
+ {
+ return pagingStore.isPaging();
+ }
+ else
+ {
+ return false;
+ }
+ }
+
@Override
public String toString()
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java 2009-11-05
13:30:49 UTC (rev 8222)
+++
trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -15,7 +15,6 @@
import org.hornetq.core.paging.PagingStore;
-
/**
* A ServerProducerCreditManager
*
@@ -26,10 +25,10 @@
public interface ServerProducerCreditManager
{
int creditsReleased(int credits);
-
+
int acquireCredits(int credits, CreditsAvailableRunnable runnable);
-
+
int waitingEntries();
-
+
PagingStore getStore();
}
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManagerImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManagerImpl.java 2009-11-05
13:30:49 UTC (rev 8222)
+++
trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManagerImpl.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -91,7 +91,7 @@
entry.credits -= credits;
boolean sent = sendCredits(entry.waiting, credits);
-
+
if (sent)
{
credits = 0;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-05 13:30:49
UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-05 14:23:20
UTC (rev 8223)
@@ -168,6 +168,8 @@
private boolean closed;
+ private final Map<SimpleString, CreditManagerHolder> creditManagerHolders = new
HashMap<SimpleString, CreditManagerHolder>();
+
// Constructors
---------------------------------------------------------------------------------
public ServerSessionImpl(final String name,
@@ -190,7 +192,7 @@
final HornetQServer server,
final SimpleString managementAddress) throws Exception
{
- this.id = channel.getID();
+ id = channel.getID();
this.username = username;
@@ -332,7 +334,7 @@
}
}
- public void promptDelivery(final Queue queue, boolean async)
+ public void promptDelivery(final Queue queue, final boolean async)
{
if (async)
{
@@ -368,17 +370,17 @@
Filter filter = FilterImpl.createFilter(filterString);;
ServerConsumer consumer = new ServerConsumerImpl(packet.getID(),
- this,
- (QueueBinding)binding,
- filter,
- started,
- browseOnly,
- storageManager,
- channel,
- preAcknowledge,
- updateDeliveries,
- executor,
- managementService);
+ this,
+ (QueueBinding)binding,
+ filter,
+ started,
+ browseOnly,
+ storageManager,
+ channel,
+ preAcknowledge,
+ updateDeliveries,
+ executor,
+ managementService);
consumers.put(consumer.getID(), consumer);
@@ -635,7 +637,7 @@
sendResponse(packet, response, false, false);
}
- public void handleForceConsumerDelivery(SessionForceConsumerDelivery message)
+ public void handleForceConsumerDelivery(final SessionForceConsumerDelivery message)
{
try
{
@@ -1401,7 +1403,7 @@
}
catch (Exception e)
{
- log.error("Failed to receive credits " +
this.server.getConfiguration().isBackup(), e);
+ log.error("Failed to receive credits " +
server.getConfiguration().isBackup(), e);
}
sendResponse(packet, null, false, false);
@@ -1412,13 +1414,10 @@
// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();
- final LargeServerMessage msg = doCreateLargeMessage(id, packet);
+ LargeServerMessage msg = doCreateLargeMessage(id, packet);
if (msg != null)
{
- // With a send we must make sure it is replicated to backup before being
processed on live
- // or can end up with delivery being processed on backup before original send
-
if (currentLargeMessage != null)
{
log.warn("Replacing incomplete LargeMessage with ID=" +
currentLargeMessage.getMessageID());
@@ -1438,6 +1437,8 @@
try
{
+ setPagingStore(message);
+
long id = storageManager.generateUniqueID();
message.setMessageID(id);
@@ -1499,22 +1500,22 @@
{
throw new HornetQException(HornetQException.ILLEGAL_STATE,
"large-message not initialized on server");
}
-
- //Immediately release the credits for the continuations- these don't
contrinute to the in-memory size
- //of the message
-
+
+ // Immediately release the credits for the continuations- these don't
contrinute to the in-memory size
+ // of the message
+
releaseOutStanding(currentLargeMessage, packet.getRequiredBufferSize());
-
+
currentLargeMessage.addBytes(packet.getBody());
if (!packet.isContinues())
- {
+ {
currentLargeMessage.releaseResources();
send(currentLargeMessage);
releaseOutStanding(currentLargeMessage,
currentLargeMessage.getEncodeSize());
-
+
currentLargeMessage = null;
}
@@ -1543,40 +1544,6 @@
sendResponse(packet, response, false, false);
}
- private static final class CreditManagerHolder
- {
- CreditManagerHolder(final PagingStore store)
- {
- this.store = store;
-
- this.manager = store.getProducerCreditManager();
- }
-
- final PagingStore store;
-
- final ServerProducerCreditManager manager;
-
- volatile int outstandingCredits;
- }
-
- private Map<SimpleString, CreditManagerHolder> creditManagerHolders = new
HashMap<SimpleString, CreditManagerHolder>();
-
- private CreditManagerHolder getCreditManagerHolder(final SimpleString address) throws
Exception
- {
- CreditManagerHolder holder = creditManagerHolders.get(address);
-
- if (holder == null)
- {
- PagingStore store = postOffice.getPagingManager().getPageStore(address);
-
- holder = new CreditManagerHolder(store);
-
- creditManagerHolders.put(address, holder);
- }
-
- return holder;
- }
-
public void handleRequestProducerCredits(final SessionRequestProducerCreditsMessage
packet) throws Exception
{
final SimpleString address = packet.getAddress();
@@ -1587,7 +1554,7 @@
int gotCredits = holder.manager.acquireCredits(credits, new
CreditsAvailableRunnable()
{
- public boolean run(int credits)
+ public boolean run(final int credits)
{
synchronized (ServerSessionImpl.this)
{
@@ -1613,22 +1580,13 @@
sendResponse(packet, null, false, false);
}
- private void sendProducerCredits(final CreditManagerHolder holder, final int credits,
final SimpleString address)
- {
- holder.outstandingCredits += credits;
-
- Packet packet = new SessionProducerCreditsMessage(credits, address);
-
- channel.send(packet);
- }
-
public int transferConnection(final RemotingConnection newConnection, final int
lastReceivedCommandID)
{
- boolean wasStarted = this.started;
+ boolean wasStarted = started;
if (wasStarted)
{
- this.setStarted(false);
+ setStarted(false);
}
remotingConnection.removeFailureListener(this);
@@ -1652,11 +1610,11 @@
int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
- channel.replayCommands(lastReceivedCommandID, this.id);
+ channel.replayCommands(lastReceivedCommandID, id);
if (wasStarted)
{
- this.setStarted(true);
+ setStarted(true);
}
return serverLastReceivedCommandID;
@@ -1804,11 +1762,15 @@
* @param packet
* @throws Exception
*/
- private LargeServerMessage doCreateLargeMessage(long id, final SessionSendLargeMessage
packet)
+ private LargeServerMessage doCreateLargeMessage(final long id, final
SessionSendLargeMessage packet)
{
try
{
- return createLargeMessageStorage(id, packet.getLargeMessageHeader());
+ LargeServerMessage msg = createLargeMessageStorage(id,
packet.getLargeMessageHeader());
+
+ setPagingStore(msg);
+
+ return msg;
}
catch (Exception e)
{
@@ -1888,7 +1850,7 @@
}
}
- private void rollback(boolean lastMessageAsDelived) throws Exception
+ private void rollback(final boolean lastMessageAsDelived) throws Exception
{
if (tx == null)
{
@@ -1915,15 +1877,66 @@
*/
private void releaseOutStanding(final ServerMessage message, final int credits) throws
Exception
{
- CreditManagerHolder holder = getCreditManagerHolder(message.getDestination());
+ CreditManagerHolder holder = getCreditManagerHolder(message);
holder.outstandingCredits -= credits;
holder.store.returnProducerCredits(credits);
}
-
+
+ // TODO can we combine these two methods....
+ private CreditManagerHolder getCreditManagerHolder(final SimpleString address) throws
Exception
+ {
+ CreditManagerHolder holder = creditManagerHolders.get(address);
+
+ if (holder == null)
+ {
+ PagingStore store = postOffice.getPagingManager().getPageStore(address);
+
+ holder = new CreditManagerHolder(store);
+
+ creditManagerHolders.put(address, holder);
+ }
+
+ return holder;
+ }
+
+ private CreditManagerHolder getCreditManagerHolder(final ServerMessage message) throws
Exception
+ {
+ SimpleString address = message.getDestination();
+
+ CreditManagerHolder holder = creditManagerHolders.get(address);
+
+ if (holder == null)
+ {
+ holder = new CreditManagerHolder(message.getPagingStore());
+
+ creditManagerHolders.put(address, holder);
+ }
+
+ return holder;
+ }
+
+ private void setPagingStore(final ServerMessage message) throws Exception
+ {
+ PagingStore store =
postOffice.getPagingManager().getPageStore(message.getDestination());
+
+ message.setPagingStore(store);
+ }
+
+ private void sendProducerCredits(final CreditManagerHolder holder, final int credits,
final SimpleString address)
+ {
+ holder.outstandingCredits += credits;
+
+ Packet packet = new SessionProducerCreditsMessage(credits, address);
+
+ channel.send(packet);
+ }
+
private void send(final ServerMessage msg) throws Exception
{
+ // Look up the paging store
+
// check the user has write access to this address.
try
{
@@ -1947,4 +1960,20 @@
postOffice.route(msg, tx);
}
}
+
+ private static final class CreditManagerHolder
+ {
+ CreditManagerHolder(final PagingStore store)
+ {
+ this.store = store;
+
+ manager = store.getProducerCreditManager();
+ }
+
+ final PagingStore store;
+
+ final ServerProducerCreditManager manager;
+
+ volatile int outstandingCredits;
+ }
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-11-05
13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -20,10 +20,10 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
@@ -53,11 +53,11 @@
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
@@ -275,12 +275,11 @@
{
SessionForceConsumerDelivery message =
(SessionForceConsumerDelivery)packet;
session.handleForceConsumerDelivery(message);
- break;
+ break;
}
case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS:
{
- SessionRequestProducerCreditsMessage message =
(SessionRequestProducerCreditsMessage)
- packet;
+ SessionRequestProducerCreditsMessage message =
(SessionRequestProducerCreditsMessage)packet;
session.handleRequestProducerCredits(message);
break;
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-05
13:30:49 UTC (rev 8222)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -27,6 +27,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -47,17 +48,17 @@
*/
public class PagingTest extends ServiceTestBase
{
-
+
public PagingTest(String name)
{
super(name);
}
-
+
public PagingTest()
{
super();
}
-
+
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(PagingTest.class);
@@ -160,7 +161,7 @@
ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
assertNotNull(message2);
-
+
assertEquals(i, ((Integer)message2.getProperty(new
SimpleString("id"))).intValue());
message2.acknowledge();
@@ -244,7 +245,7 @@
message.setBody(bodyLocal);
// Stop sending message as soon as we start paging
- if (server.getPostOffice().getPagingManager().isPaging(ADDRESS))
+ if
(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging())
{
break;
}
@@ -253,7 +254,7 @@
producer.send(message);
}
- assertTrue(server.getPostOffice().getPagingManager().isPaging(ADDRESS));
+
assertTrue(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
session.start();
@@ -389,8 +390,12 @@
message.setBody(ChannelBuffers.wrappedBuffer(body));
message.putIntProperty(new SimpleString("id"), i);
+ TestSupportPageStore store = (TestSupportPageStore)server.getPostOffice()
+ .getPagingManager()
+
.getPageStore(ADDRESS);
+
// Worse scenario possible... only schedule what's on pages
- if
(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).getCurrentPage() !=
null)
+ if (store.getCurrentPage() != null)
{
message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME,
scheduledTime);
}
@@ -1112,7 +1117,7 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
protected Configuration createDefaultConfig()
{
Configuration config = super.createDefaultConfig();
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2009-11-05
13:30:49 UTC (rev 8222)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -242,10 +242,11 @@
@Override
protected PagingManager createPagingManager()
{
- return new PagingManagerImpl(new
FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory()),
+ return new PagingManagerImpl(new
FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory(),
+
super.getConfiguration()
+
.isJournalSyncNonTransactional()),
super.getStorageManager(),
- super.getAddressSettingsRepository(),
-
super.getConfiguration().isJournalSyncNonTransactional());
+ super.getAddressSettingsRepository());
}
class FailurePagingStoreFactoryNIO extends PagingStoreFactoryNIO
@@ -255,9 +256,9 @@
* @param directory
* @param maxThreads
*/
- public FailurePagingStoreFactoryNIO(final String directory)
+ public FailurePagingStoreFactoryNIO(final String directory, final boolean
syncNonTransactional)
{
- super(directory, new
OrderedExecutorFactory(Executors.newCachedThreadPool()));
+ super(directory, new OrderedExecutorFactory(Executors.newCachedThreadPool()),
syncNonTransactional);
}
// Constants -----------------------------------------------------
@@ -277,7 +278,7 @@
factoryField.setAccessible(true);
OrderedExecutorFactory factory =
(org.hornetq.utils.OrderedExecutorFactory)factoryField.get(this);
- return new FailingPagingStore(destinationName, settings,
factory.getExecutor());
+ return new FailingPagingStore(destinationName, settings,
factory.getExecutor(), syncNonTransactional);
}
// Package protected ---------------------------------------------
@@ -297,16 +298,19 @@
*/
public FailingPagingStore(final SimpleString storeName,
final AddressSettings addressSettings,
- final Executor executor)
+ final Executor executor,
+ final boolean syncNonTransactional)
{
- super(getPostOffice().getPagingManager(),
+ super(storeName,
+ getPostOffice().getPagingManager(),
getStorageManager(),
getPostOffice(),
null,
FailurePagingStoreFactoryNIO.this,
storeName,
addressSettings,
- executor);
+ executor,
+ syncNonTransactional);
}
@Override
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-05
13:30:49 UTC (rev 8222)
+++
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -109,7 +109,8 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
manager.stop();
}
@@ -134,11 +135,13 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
try
{
- manager.compareJournals(new JournalLoadInformation[]{new
JournalLoadInformation(2,2), new JournalLoadInformation(2,2)});
+ manager.compareJournals(new JournalLoadInformation[] { new
JournalLoadInformation(2, 2),
+ new
JournalLoadInformation(2, 2) });
fail("Exception was expected");
}
catch (HornetQException e)
@@ -147,7 +150,8 @@
assertEquals(HornetQException.ILLEGAL_STATE, e.getCode());
}
- manager.compareJournals(new JournalLoadInformation[]{new
JournalLoadInformation(), new JournalLoadInformation()});
+ manager.compareJournals(new JournalLoadInformation[] { new
JournalLoadInformation(),
+ new
JournalLoadInformation() });
manager.stop();
}
@@ -173,13 +177,15 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
try
{
- ReplicationManagerImpl manager2 = new ReplicationManagerImpl(failoverManager,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ ReplicationManagerImpl manager2 = new
ReplicationManagerImpl(failoverManager,
+
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager2.start();
fail("Exception was expected");
@@ -212,7 +218,8 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
try
{
@@ -246,7 +253,8 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
@@ -363,7 +371,8 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
@@ -425,7 +434,8 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
fail("Exception expected");
}
@@ -450,7 +460,8 @@
try
{
- ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
@@ -580,10 +591,10 @@
{
PagingManager paging = new PagingManagerImpl(new
PagingStoreFactoryNIO(configuration.getPagingDirectory(),
-
executorFactory),
+
executorFactory,
+ false),
storageManager,
- addressSettingsRepository,
- false);
+ addressSettingsRepository);
paging.start();
return paging;
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2009-11-05
13:30:49 UTC (rev 8222)
+++
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -64,10 +64,9 @@
addressSettings.setDefault(settings);
- PagingManagerImpl managerImpl = new PagingManagerImpl(new
PagingStoreFactoryNIO(getPageDir(), new
OrderedExecutorFactory(Executors.newCachedThreadPool())),
+ PagingManagerImpl managerImpl = new PagingManagerImpl(new
PagingStoreFactoryNIO(getPageDir(), new
OrderedExecutorFactory(Executors.newCachedThreadPool()), true),
new NullStorageManager(),
- addressSettings,
- true);
+ addressSettings);
managerImpl.start();
@@ -75,11 +74,11 @@
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"),
createRandomBuffer(10));
- assertFalse(store.page(new PagedMessageImpl(msg), true, true));
+ assertFalse(store.page(msg, true));
store.startPaging();
- assertTrue(store.page(new PagedMessageImpl(msg), true, true));
+ assertTrue(store.page(msg, true));
Page page = store.depage();
@@ -97,7 +96,7 @@
assertNull(store.depage());
- assertFalse(store.page(new PagedMessageImpl(msg), true, true));
+ assertFalse(store.page(msg, true));
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-05
13:30:49 UTC (rev 8222)
+++
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -39,12 +40,11 @@
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
-import org.hornetq.core.paging.impl.PagedMessageImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
+import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -56,8 +56,8 @@
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
@@ -142,14 +142,17 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- PagingStore storeImpl = new PagingStoreImpl(createMockManager(),
+
+ PagingStore storeImpl = new PagingStoreImpl(destinationTestName,
+ createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
factory,
null,
destinationTestName,
addressSettings,
- executor);
+ executor,
+ true);
storeImpl.start();
@@ -180,14 +183,16 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- PagingStore storeImpl = new PagingStoreImpl(createMockManager(),
- createStorageManagerMock(),
- createPostOfficeMock(),
- factory,
- storeFactory,
- destinationTestName,
- addressSettings,
- executor);
+ TestSupportPageStore storeImpl = new PagingStoreImpl(destinationTestName,
+ createMockManager(),
+ createStorageManagerMock(),
+ createPostOfficeMock(),
+ factory,
+ storeFactory,
+ destinationTestName,
+ addressSettings,
+ executor,
+ true);
storeImpl.start();
@@ -204,24 +209,26 @@
buffers.add(buffer);
SimpleString destination = new SimpleString("test");
- PagedMessageImpl msg = createMessage(destination, buffer);
+ ServerMessage msg = createMessage(storeImpl, destination, buffer);
assertTrue(storeImpl.isPaging());
- assertTrue(storeImpl.page(msg, true, true));
+ assertTrue(storeImpl.page(msg, true));
assertEquals(1, storeImpl.getNumberOfPages());
storeImpl.sync();
- storeImpl = new PagingStoreImpl(createMockManager(),
+ storeImpl = new PagingStoreImpl(destinationTestName,
+ createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
factory,
null,
destinationTestName,
addressSettings,
- executor);
+ executor,
+ true);
storeImpl.start();
@@ -239,14 +246,16 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+ TestSupportPageStore storeImpl = new PagingStoreImpl(destinationTestName,
+ createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
factory,
storeFactory,
destinationTestName,
addressSettings,
- executor);
+ executor,
+ true);
storeImpl.start();
@@ -263,9 +272,9 @@
buffers.add(buffer);
- PagedMessageImpl msg = createMessage(destination, buffer);
+ ServerMessage msg = createMessage(storeImpl, destination, buffer);
- assertTrue(storeImpl.page(msg, true, true));
+ assertTrue(storeImpl.page(msg, true));
}
assertEquals(1, storeImpl.getNumberOfPages());
@@ -304,14 +313,16 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+ TestSupportPageStore storeImpl = new PagingStoreImpl(destinationTestName,
+ createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
factory,
storeFactory,
destinationTestName,
addressSettings,
- executor);
+ executor,
+ true);
storeImpl.start();
@@ -335,9 +346,9 @@
storeImpl.forceAnotherPage();
}
- PagedMessageImpl msg = createMessage(destination, buffer);
+ ServerMessage msg = createMessage(storeImpl, destination, buffer);
- assertTrue(storeImpl.page(msg, true, true));
+ assertTrue(storeImpl.page(msg, true));
}
assertEquals(2, storeImpl.getNumberOfPages());
@@ -367,9 +378,9 @@
assertTrue(storeImpl.isPaging());
- PagedMessageImpl msg = createMessage(destination, buffers.get(0));
+ ServerMessage msg = createMessage(storeImpl, destination, buffers.get(0));
- assertTrue(storeImpl.page(msg, true, true));
+ assertTrue(storeImpl.page(msg, true));
Page newPage = storeImpl.depage();
@@ -387,11 +398,11 @@
assertFalse(storeImpl.isPaging());
- assertFalse(storeImpl.page(msg, true, true));
+ assertFalse(storeImpl.page(msg, true));
storeImpl.startPaging();
- assertTrue(storeImpl.page(msg, true, true));
+ assertTrue(storeImpl.page(msg, true));
Page page = storeImpl.depage();
@@ -427,7 +438,6 @@
protected void testConcurrentPaging(final SequentialFileFactory factory, final int
numberOfThreads) throws Exception,
InterruptedException
{
-
PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
final int MAX_SIZE = 1024 * 10;
@@ -438,7 +448,7 @@
final CountDownLatch latchStart = new CountDownLatch(numberOfThreads);
- final ConcurrentHashMap<Long, PagedMessageImpl> buffers = new
ConcurrentHashMap<Long, PagedMessageImpl>();
+ final ConcurrentHashMap<Long, ServerMessage> buffers = new
ConcurrentHashMap<Long, ServerMessage>();
final ArrayList<Page> readPages = new ArrayList<Page>();
@@ -446,14 +456,16 @@
settings.setPageSizeBytes(MAX_SIZE);
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- final TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+ final TestSupportPageStore storeImpl = new PagingStoreImpl(destinationTestName,
+ createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
factory,
storeFactory,
new
SimpleString("test"),
settings,
- executor);
+ executor,
+ true);
storeImpl.start();
@@ -480,8 +492,8 @@
while (true)
{
long id = messageIdGenerator.incrementAndGet();
- PagedMessageImpl msg = createMessage(destination,
createRandomBuffer(id, 5));
- if (storeImpl.page(msg, false, true))
+ ServerMessage msg = createMessage(storeImpl, destination,
createRandomBuffer(id, 5));
+ if (storeImpl.page(msg, true))
{
buffers.put(id, msg);
}
@@ -564,7 +576,7 @@
throw consumer.e;
}
- final ConcurrentHashMap<Long, PagedMessage> buffers2 = new
ConcurrentHashMap<Long, PagedMessage>();
+ final ConcurrentMap<Long, ServerMessage> buffers2 = new
ConcurrentHashMap<Long, ServerMessage>();
for (Page page : readPages)
{
@@ -577,13 +589,11 @@
long id = msg.getMessage(null).getBody().readLong();
msg.getMessage(null).getBody().resetReaderIndex();
- PagedMessageImpl msgWritten = buffers.remove(id);
- buffers2.put(id, msg);
+ ServerMessage msgWritten = buffers.remove(id);
+ buffers2.put(id, msg.getMessage(null));
assertNotNull(msgWritten);
- assertEquals(msg.getMessage(null).getDestination(),
msgWritten.getMessage(null).getDestination());
- assertEqualsByteArrays(msgWritten.getMessage(null).getBody().array(),
msg.getMessage(null)
-
.getBody()
-
.array());
+ assertEquals(msg.getMessage(null).getDestination(),
msgWritten.getDestination());
+ assertEqualsByteArrays(msgWritten.getBody().array(),
msg.getMessage(null).getBody().array());
}
}
@@ -601,14 +611,16 @@
fileTmp.close();
}
- TestSupportPageStore storeImpl2 = new PagingStoreImpl(createMockManager(),
+ TestSupportPageStore storeImpl2 = new PagingStoreImpl(destinationTestName,
+ createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
factory,
storeFactory,
new
SimpleString("test"),
settings,
- executor);
+ executor,
+ true);
storeImpl2.start();
int numberOfPages = storeImpl2.getNumberOfPages();
@@ -621,9 +633,9 @@
assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
long lastMessageId = messageIdGenerator.incrementAndGet();
- PagedMessage lastMsg = createMessage(destination, createRandomBuffer(lastMessageId,
5));
+ ServerMessage lastMsg = createMessage(storeImpl, destination,
createRandomBuffer(lastMessageId, 5));
- storeImpl2.page(lastMsg, false, true);
+ storeImpl2.page(lastMsg, true);
buffers2.put(lastMessageId, lastMsg);
Page lastPage = null;
@@ -647,12 +659,10 @@
{
long id = msg.getMessage(null).getBody().readLong();
- PagedMessage msgWritten = buffers2.remove(id);
+ ServerMessage msgWritten = buffers2.remove(id);
assertNotNull(msgWritten);
- assertEquals(msg.getMessage(null).getDestination(),
msgWritten.getMessage(null).getDestination());
- assertEqualsByteArrays(msgWritten.getMessage(null).getBody().array(),
msg.getMessage(null)
-
.getBody()
-
.array());
+ assertEquals(msg.getMessage(null).getDestination(),
msgWritten.getDestination());
+ assertEqualsByteArrays(msgWritten.getBody().array(),
msg.getMessage(null).getBody().array());
}
}
@@ -663,14 +673,11 @@
lastMessages.get(0).getMessage(null).getBody().resetReaderIndex();
assertEquals(lastMessages.get(0).getMessage(null).getBody().readLong(),
lastMessageId);
- assertEqualsByteArrays(lastMessages.get(0).getMessage(null).getBody().array(),
lastMsg.getMessage(null)
-
.getBody()
-
.array());
+ assertEqualsByteArrays(lastMessages.get(0).getMessage(null).getBody().array(),
lastMsg.getBody().array());
assertEquals(0, buffers2.size());
assertEquals(0, storeImpl.getAddressSize());
-
}
/**
@@ -691,12 +698,15 @@
return new FakePostOffice();
}
- private PagedMessageImpl createMessage(final SimpleString destination, final
HornetQBuffer buffer)
+ private ServerMessage createMessage(final PagingStore store, final SimpleString
destination, final HornetQBuffer buffer)
{
ServerMessage msg = new ServerMessageImpl((byte)1, true, 0,
System.currentTimeMillis(), (byte)0, buffer);
msg.setDestination(destination);
- return new PagedMessageImpl(msg);
+
+ msg.setPagingStore(store);
+
+ return msg;
}
private HornetQBuffer createRandomBuffer(final long id, final int size)
@@ -947,29 +957,30 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
*/
- public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo>
queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
+ public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo>
queueBindingInfos,
+ List<GroupingInfo>
groupingInfos) throws Exception
{
return new JournalLoadInformation();
}
public void addGrouping(GroupBinding groupBinding) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // To change body of implemented methods use File | Settings | File Templates.
}
public void deleteGrouping(GroupBinding groupBinding) throws Exception
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // To change body of implemented methods use File | Settings | File Templates.
}
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager,
java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
*/
public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
- PagingManager pagingManager,
- ResourceManager resourceManager,
- Map<Long, Queue> queues,
- Map<SimpleString, List<Pair<byte[],
Long>>> duplicateIDMap) throws Exception
+ PagingManager pagingManager,
+ ResourceManager resourceManager,
+ Map<Long, Queue> queues,
+ Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
{
return new JournalLoadInformation();
}
@@ -1136,8 +1147,7 @@
*/
public void afterReplicated(Runnable run)
{
-
-
+
}
/* (non-Javadoc)
@@ -1145,8 +1155,7 @@
*/
public void completeReplication()
{
-
-
+
}
/* (non-Javadoc)
@@ -1154,7 +1163,7 @@
*/
public LargeServerMessage createLargeMessage(long messageId, byte[] header)
{
-
+
return null;
}
@@ -1163,7 +1172,7 @@
*/
public boolean isReplicated()
{
-
+
return false;
}
@@ -1172,7 +1181,7 @@
*/
public JournalLoadInformation[] loadInternalOnly() throws Exception
{
- return null;
+ return null;
}
/* (non-Javadoc)
@@ -1180,8 +1189,7 @@
*/
public void pageClosed(SimpleString storeName, int pageNumber)
{
-
-
+
}
/* (non-Javadoc)
@@ -1189,8 +1197,7 @@
*/
public void pageDeleted(SimpleString storeName, int pageNumber)
{
-
-
+
}
/* (non-Javadoc)
@@ -1198,8 +1205,7 @@
*/
public void pageWrite(PagedMessage message, int pageNumber)
{
-
-
+
}
/* (non-Javadoc)
@@ -1293,8 +1299,6 @@
{
}
-
-
}
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-05
13:30:49 UTC (rev 8222)
+++
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-05
14:23:20 UTC (rev 8223)
@@ -895,6 +895,36 @@
return 0;
}
+ public PagingStore getPagingStore()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void setPagingStore(PagingStore store)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public boolean page(boolean duplicateDetection) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public boolean page(long transactionID, boolean duplicateDetection) throws
Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public boolean storeIsPaging()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}
class FakeFilter implements Filter