Author: borges
Date: 2011-09-07 09:38:25 -0400 (Wed, 07 Sep 2011)
New Revision: 11299
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-720 Synchronization of Pages for replication
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -15,11 +15,12 @@
import java.util.List;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.paging.cursor.LivePageCache;
import org.hornetq.core.persistence.StorageManager;
/**
- *
+ *
* @see PagingManager
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
@@ -31,7 +32,7 @@
void write(PagedMessage message) throws Exception;
List<PagedMessage> read(StorageManager storage) throws Exception;
-
+
void setLiveCache(LivePageCache pageCache);
int getSize();
@@ -45,4 +46,6 @@
void close() throws Exception;
boolean delete(PagedMessage[] messages) throws Exception;
+
+ SequentialFile getFile();
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -17,37 +17,32 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.postoffice.Address;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.settings.HierarchicalRepositoryChangeListener;
/**
- *
- *
-<PRE>
-
-+------------+ 1 +-------------+ N +------------+ N +-------+ 1
+----------------+
-| {@link PostOffice} |-------> |PagingManager|-------> |{@link PagingStore}
| ------> | {@link Page} | ------> | {@link SequentialFile} |
-+------------+ +-------------+ +------------+ +-------+
+----------------+
- | 1 ^
- | |
- | |
- | | 1
- | N +---------+
- +--------> | Address |
- +---------+
-
-</PRE>
-
- *
+ * <PRE>
+ *
+ * +------------+ 1 +-------------+ N +------------+ N +-------+
1 +----------------+
+ * | {@link PostOffice} |-------> |{@link PagingManager}|-------> |{@link
PagingStore} | ------> | {@link Page} | ------> | {@link SequentialFile} |
+ * +------------+ +-------------+ +------------+ +-------+
+----------------+
+ * | 1 ^
+ * | |
+ * | |
+ * | | 1
+ * | N +---------+ /
+ * +--------> | {@link Address} |
+ * +---------+
+ * </PRE>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
- *
*/
public interface PagingManager extends HornetQComponent,
HierarchicalRepositoryChangeListener
{
- /** To return the PageStore associated with the address */
+ /** Returns the PageStore associated with the address. A new page store is created if
necessary. */
PagingStore getPageStore(SimpleString address) throws Exception;
/** An injection point for the PostOffice to inject itself */
@@ -67,18 +62,30 @@
* @param transactionID
*/
void removeTransaction(long transactionID);
-
+
Map<Long, PageTransactionInfo> getTransactions();
/**
* Reload previously created PagingStores into memory
- * @throws Exception
+ * @throws Exception
*/
void reloadStores() throws Exception;
SimpleString[] getStoreNames();
void deletePageStore(SimpleString storeName) throws Exception;
-
+
void processReload() throws Exception;
+
+ /**
+ * Lock the manager, and all its {@link PagingStore}s. This method should not be
called during
+ * normal PagingManager usage.
+ */
+ void lockAll();
+
+ /**
+ * Unlocks all the {@link PagingStore}s and then the manager itself.
+ * @see #lockAll()
+ */
+ void unlockAll();
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -13,8 +13,12 @@
package org.hornetq.core.paging;
+import java.util.Collection;
+
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.RouteContextList;
import org.hornetq.core.server.RoutingContext;
@@ -23,46 +27,46 @@
import org.hornetq.core.settings.impl.AddressSettings;
/**
- *
- * <p>The implementation will take care of details such as PageSize.</p>
- * <p>The producers will write directly to PagingStore and that will decide what
- * Page file should be used based on configured size</p>
- *
+ * <p>
+ * The implementation will take care of details such as PageSize.
+ * </p>
+ * <p>
+ * The producers will write directly to PagingStore, and the store will decide what Page
file should
+ * be used based on configured size.
+ * </p>
* @see PagingManager
-
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
- *
*/
public interface PagingStore extends HornetQComponent
{
SimpleString getAddress();
int getNumberOfPages();
-
+
// The current page in which the system is writing files
int getCurrentWritingPage();
SimpleString getStoreName();
AddressFullMessagePolicy getAddressFullMessagePolicy();
-
+
long getFirstPage();
-
+
long getTopPage();
long getPageSizeBytes();
long getAddressSize();
-
+
long getMaxSize();
-
+
void applySetting(AddressSettings addressSettings);
boolean isPaging();
// It will schedule sync to the file storage
void sync() throws Exception;
-
+
// It will perform a real sync on the current IO file
void ioSync() throws Exception;
@@ -71,23 +75,23 @@
boolean page(ServerMessage message, RoutingContext ctx, RouteContextList listCtx)
throws Exception;
Page createPage(final int page) throws Exception;
-
- boolean checkPage(final int page) throws Exception;
-
+
+ boolean checkPageFileExists(final int page) throws Exception;
+
PagingManager getPagingManager();
-
+
PageCursorProvider getCursorProvier();
-
+
void processReload() throws Exception;
-
- /**
+
+ /**
* Remove the first page from the Writing Queue.
- * The file will still exist until Page.delete is called,
+ * The file will still exist until Page.delete is called,
* So, case the system is reloaded the same Page will be loaded back if delete is not
called.
*
* @throws Exception
- *
- * Note: This should still be part of the interface, even though HornetQ only uses
through the
+ *
+ * Note: This should still be part of the interface, even though HornetQ only uses
through the
*/
Page depage() throws Exception;
@@ -103,20 +107,37 @@
void stopPaging() throws Exception;
void addSize(int size);
-
+
void executeRunnableWhenMemoryAvailable(Runnable runnable);
-
+
/** This method will hold and producer, but it wait operations to finish before
locking (write lock) */
void lock();
-
- /**
- *
+
+ /**
+ *
* Call this method using the same thread used by the last call of {@link
PagingStore#lock()}
- *
+ *
*/
void unlock();
/** This is used mostly by tests.
* We will wait any pending runnable to finish its execution */
void flushExecutors();
+
+ /**
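+ * Returns the ids of the page files that need to be synchronized with the backup.
+ * @return ids of the page files currently present for this store
+ * @throws Exception if the page files cannot be listed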
+ */
+ Collection<Integer> getCurrentIds() throws Exception;
+
+ /**
+ * Sends the pages with given ids to the replicator.
+ * <p>
+ * Sending is done here to avoid exposing the internal {@link SequentialFile}s.
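+ * @param replicator replication manager the page files are handed to
+ * @param pageIds ids of the pages to send; ids whose page file does not exist are skipped
+ * @throws Exception if a page file cannot be sent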
+ */
+ void sendPages(ReplicationManager replicator, Collection<Integer> pageIds)
throws Exception;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -150,7 +150,7 @@
cache = softCache.get(pageId);
if (cache == null)
{
- if (!pagingStore.checkPage((int)pageId))
+ if (!pagingStore.checkPageFileExists((int)pageId))
{
return null;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -32,7 +32,7 @@
import org.hornetq.utils.DataConstants;
/**
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
*/
@@ -59,7 +59,7 @@
private final SequentialFile file;
private final SequentialFileFactory fileFactory;
-
+
/**
* The page cache that will be filled with data as we write more data
*/
@@ -96,7 +96,7 @@
{
return pageId;
}
-
+
public void setLiveCache(LivePageCache pageCache)
{
this.pageCache = pageCache;
@@ -109,7 +109,7 @@
size.set((int)file.size());
// Using direct buffer, as described on
https://jira.jboss.org/browse/HORNETQ-467
ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
-
+
file.position(0);
file.read(buffer2);
@@ -181,7 +181,7 @@
buffer.rewind();
file.writeDirect(buffer, false);
-
+
if (pageCache != null)
{
pageCache.addLiveMessage(message);
@@ -234,7 +234,7 @@
if (msg.getMessage().isLargeMessage())
{
LargeServerMessage lmsg = (LargeServerMessage)msg.getMessage();
-
+
// Remember, cannot call delete directly here
// Because the large-message may be linked to another message
// or it may still being delivered even though it has been acked already
@@ -257,7 +257,7 @@
{
file.delete();
}
-
+
return true;
}
catch (Exception e)
@@ -276,13 +276,14 @@
{
return size.intValue();
}
-
+
+ @Override
public String toString()
{
return "PageImpl::pageID=" + this.pageId + ", file=" +
this.file;
}
-
+
/* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
@@ -339,5 +340,11 @@
suspiciousRecords = true;
}
+ @Override
+ public SequentialFile getFile()
+ {
+ return file;
+ }
+
// Inner classes -------------------------------------------------
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -24,9 +24,9 @@
import org.hornetq.utils.DataConstants;
/**
- *
+ *
* This class represents a paged message
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
*
@@ -45,11 +45,14 @@
// Public --------------------------------------------------------
- /** Large messages will need to be instatiated lazily during getMessage when the
StorageManager is available */
+ /**
+ * Encoded headers and properties of a large message, kept until initMessage(StorageManager)
can
+ * instantiate the message lazily, i.e. once a StorageManager is available
+ */
private byte[] largeMessageLazyData;
private ServerMessage message;
-
+
private long queueIDs[];
private long transactionID = 0;
@@ -74,17 +77,17 @@
{
return message;
}
-
+
public void initMessage(StorageManager storage)
{
if (largeMessageLazyData != null)
{
LargeServerMessage lgMessage = storage.createLargeMessage();
- message = lgMessage;
HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
- message.decodeHeadersAndProperties(buffer);
+ lgMessage.decodeHeadersAndProperties(buffer);
lgMessage.incrementDelayDeletionCount();
lgMessage.setPaged();
+ message = lgMessage;
largeMessageLazyData = null;
}
}
@@ -93,7 +96,7 @@
{
return transactionID;
}
-
+
public long[] getQueueIDs()
{
return queueIDs;
@@ -123,11 +126,11 @@
message.decode(buffer);
}
-
+
int queueIDsSize = buffer.readInt();
-
+
queueIDs = new long[queueIDsSize];
-
+
for (int i = 0 ; i < queueIDsSize; i++)
{
queueIDs[i] = buffer.readLong();
@@ -143,18 +146,18 @@
buffer.writeInt(message.getEncodeSize());
message.encode(buffer);
-
+
buffer.writeInt(queueIDs.length);
-
- for (int i = 0 ; i < queueIDs.length; i++)
+
+ for (long queueID : queueIDs)
{
- buffer.writeLong(queueIDs[i]);
+ buffer.writeLong(queueID);
}
}
public int getEncodeSize()
{
- return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT +
message.getEncodeSize() +
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT +
message.getEncodeSize() +
DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -18,6 +18,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
@@ -31,7 +32,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
/**
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
@@ -45,6 +46,13 @@
private volatile boolean started = false;
+ /**
+ * Lock used at the start of synchronization between a live server and its backup.
+ * Synchronization will lock all {@link PagingStore} instances, and so any operation
here that
+ * requires a lock on a {@link PagingStore} instance needs to take a read-lock on
+ * {@link #syncLock} to avoid dead-locks.
+ */
+ private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
private final ConcurrentMap<SimpleString, PagingStore> stores = new
ConcurrentHashMap<SimpleString, PagingStore>();
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
@@ -53,7 +61,8 @@
private final StorageManager storageManager;
- private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo>
transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
+ private final ConcurrentMap<Long, PageTransactionInfo> transactions =
+ new ConcurrentHashMap<Long, PageTransactionInfo>();
// Static
//
--------------------------------------------------------------------------------------------------------------------------
@@ -76,9 +85,9 @@
// Public
//
---------------------------------------------------------------------------------------------------------------------------
-
+
// Hierarchical changes listener
-
+
/* (non-Javadoc)
* @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
*/
@@ -87,60 +96,58 @@
reaplySettings();
}
-
-
// PagingManager implementation
//
-----------------------------------------------------------------------------------------------------
public void reaplySettings()
{
- for (PagingStore store : stores.values())
+ for (PagingStore store : stores.values())
{
AddressSettings settings =
this.addressSettingsRepository.getMatch(store.getAddress().toString());
store.applySetting(settings);
}
}
-
+
public SimpleString[] getStoreNames()
{
Set<SimpleString> names = stores.keySet();
return names.toArray(new SimpleString[names.size()]);
}
- public synchronized void reloadStores() throws Exception
+ public void reloadStores() throws Exception
{
- List<PagingStore> reloadedStores =
pagingStoreFactory.reloadStores(addressSettingsRepository);
+ lock();
+ try
+ {
+ List<PagingStore> reloadedStores =
pagingStoreFactory.reloadStores(addressSettingsRepository);
- for (PagingStore store : reloadedStores)
+ for (PagingStore store : reloadedStores)
+ {
+ store.start();
+ stores.put(store.getStoreName(), store);
+ }
+ }
+ finally
{
- store.start();
- stores.put(store.getStoreName(), store);
+ unlock();
}
}
- private synchronized PagingStore createPageStore(final SimpleString storeName) throws
Exception
+ public void deletePageStore(final SimpleString storeName) throws Exception
{
- PagingStore store = stores.get(storeName);
-
- if (store == null)
+ syncLock.readLock().lock();
+ try
{
- store = newStore(storeName);
-
- store.start();
-
- stores.put(storeName, store);
+ PagingStore store = stores.remove(storeName);
+ if (store != null)
+ {
+ store.stop();
+ }
}
-
- return store;
- }
-
- public void deletePageStore(final SimpleString storeName) throws Exception
- {
- PagingStore store = stores.remove(storeName);
- if (store != null)
+ finally
{
- store.stop();
+ syncLock.readLock().unlock();
}
}
@@ -149,12 +156,11 @@
{
PagingStore store = stores.get(storeName);
- if (store == null)
+ if (store != null)
{
- store = createPageStore(storeName);
+ return store;
}
-
- return store;
+ return newStore(storeName);
}
/** this will be set by the postOffice itself.
@@ -179,7 +185,7 @@
{
return transactions.get(id);
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.PagingManager#getTransactions()
*/
@@ -197,37 +203,53 @@
return started;
}
- public synchronized void start() throws Exception
+ public void start() throws Exception
{
- if (started)
+ lock();
+ try
{
- return;
- }
+ if (started)
+ {
+ return;
+ }
- pagingStoreFactory.setPagingManager(this);
+ pagingStoreFactory.setPagingManager(this);
- pagingStoreFactory.setStorageManager(storageManager);
+ pagingStoreFactory.setStorageManager(storageManager);
- reloadStores();
+ reloadStores();
- started = true;
+ started = true;
+ }
+ finally
+ {
+ unlock();
+ }
}
- public synchronized void stop() throws Exception
+ public void stop() throws Exception
{
- if (!started)
+ lock();
+ try
{
- return;
- }
+ if (!started)
+ {
+ return;
+ }
- started = false;
+ started = false;
- for (PagingStore store : stores.values())
+ for (PagingStore store : stores.values())
+ {
+ store.stop();
+ }
+
+ pagingStoreFactory.stop();
+ }
+ finally
{
- store.stop();
+ unlock();
}
-
- pagingStoreFactory.stop();
}
public void processReload() throws Exception
@@ -245,17 +267,52 @@
// Private -------------------------------------------------------
- protected PagingStore newStore(final SimpleString address)
+ private PagingStore newStore(final SimpleString address) throws Exception
{
- return pagingStoreFactory.newStore(address,
-
addressSettingsRepository.getMatch(address.toString()));
+ lock();
+ try {
+ PagingStore store = stores.get(address);
+ if (store == null)
+ {
+ store = pagingStoreFactory.newStore(address,
addressSettingsRepository.getMatch(address.toString()));
+ store.start();
+ stores.put(address, store);
+ }
+ return store;
+ }
+ finally
+ {
+ unlock();
+ }
}
-
- protected PagingStoreFactory getStoreFactory()
+
+ private void unlock()
{
- return pagingStoreFactory;
+ syncLock.writeLock().unlock();
}
- // Inner classes -------------------------------------------------
+ private void lock()
+ {
+ syncLock.writeLock().lock();
+ }
+ @Override
+ public synchronized void lockAll()
+ {
+ syncLock.writeLock().lock();
+ for (PagingStore store : stores.values())
+ {
+ store.lock();
+ }
+ }
+
+ @Override
+ public void unlockAll()
+ {
+ for (PagingStore store : stores.values())
+ {
+ store.unlock();
+ }
+ syncLock.writeLock().unlock();
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -40,7 +40,7 @@
import org.hornetq.utils.UUIDGenerator;
/**
- *
+ *
* Integration point between Paging and NIO
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
@@ -62,9 +62,9 @@
protected final boolean syncNonTransactional;
private PagingManager pagingManager;
-
+
private final ScheduledExecutorService scheduledExecutor;
-
+
private final long syncTimeout;
private StorageManager storageManager;
@@ -86,9 +86,9 @@
this.executorFactory = executorFactory;
this.syncNonTransactional = syncNonTransactional;
-
+
this.scheduledExecutor = scheduledExecutor;
-
+
this.syncTimeout = syncTimeout;
}
@@ -155,6 +155,7 @@
public void setPostOffice(final PostOffice postOffice)
{
+ assert this.postOffice == null;
this.postOffice = postOffice;
}
@@ -233,22 +234,22 @@
{
return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName,
false);
}
-
+
protected PagingManager getPagingManager()
{
return pagingManager;
}
-
+
protected StorageManager getStorageManager()
{
return storageManager;
}
-
+
protected PostOffice getPostOffice()
{
return postOffice;
}
-
+
protected ExecutorFactory getExecutorFactory()
{
return executorFactory;
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -13,8 +13,9 @@
package org.hornetq.core.paging.impl;
-import java.io.File;
import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -42,10 +43,8 @@
import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.impl.LivePageCacheImpl;
import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
-import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.RouteContextList;
@@ -56,15 +55,13 @@
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
-import org.hornetq.core.transaction.TransactionOperationAbstract;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Future;
/**
- *
+ *
* @see PagingStore
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
@@ -210,7 +207,7 @@
pageSize = addressSettings.getPageSizeBytes();
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
-
+
if (cursorProvider != null)
{
cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
@@ -219,6 +216,7 @@
// Public --------------------------------------------------------
+ @Override
public String toString()
{
return "PagingStoreImpl(" + this.address + ")";
@@ -375,21 +373,28 @@
public synchronized void stop() throws Exception
{
- if (running)
+ lock();
+ try
{
+ if (running)
+ {
+ cursorProvider.stop();
- cursorProvider.stop();
+ running = false;
- running = false;
+ flushExecutors();
- flushExecutors();
-
- if (currentPage != null)
- {
- currentPage.close();
- currentPage = null;
+ if (currentPage != null)
+ {
+ currentPage.close();
+ currentPage = null;
+ }
}
}
+ finally
+ {
+ unlock();
+ }
}
public void flushExecutors()
@@ -556,8 +561,8 @@
{
return currentPage;
}
-
- public boolean checkPage(final int pageNumber)
+
+ public boolean checkPageFileExists(final int pageNumber)
{
String fileName = createFileName(pageNumber);
SequentialFile file = fileFactory.createSequentialFile(fileName, 1);
@@ -566,7 +571,10 @@
public Page createPage(final int pageNumber) throws Exception
{
- String fileName = createFileName(pageNumber);
+ lock();
+ try
+ {
+ String fileName = createFileName(pageNumber);
if (fileFactory == null)
{
@@ -585,6 +593,12 @@
file.close();
return page;
+ }
+
+ finally
+ {
+ unlock();
+ }
}
public void forceAnotherPage() throws Exception
@@ -592,13 +606,13 @@
openNewPage();
}
- /**
- * It returns a Page out of the Page System without reading it.
+ /**
+ * It returns a Page out of the Page System without reading it.
* The method calling this method will remove the page and will start reading it
outside of any locks.
* This method could also replace the current file by a new file, and that process is
done through acquiring a writeLock on currentPageLock
- *
- * Observation: This method is used internally as part of the regular depage process,
but externally is used only on tests,
- * and that's why this method is part of the Testable Interface
+ *
+ * Observation: This method is used internally as part of the regular depage process,
but externally is used only on tests,
+ * and that's why this method is part of the Testable Interface
* */
public Page depage() throws Exception
{
@@ -681,7 +695,7 @@
* @return
* @throws Exception
*/
- private Queue<OurRunnable> onMemoryFreedRunnables = new
ConcurrentLinkedQueue<OurRunnable>();
+ private final Queue<OurRunnable> onMemoryFreedRunnables = new
ConcurrentLinkedQueue<OurRunnable>();
private class MemoryFreedRunnablesExecutor implements Runnable
{
@@ -866,7 +880,7 @@
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx,
listCtx), tx == null ? -1 : tx.getID());
-
+
if (message.isLargeMessage())
{
((LargeServerMessage)message).setPaged();
@@ -880,9 +894,9 @@
openNewPage();
currentPageSize.addAndGet(bytesToWrite);
}
-
+
currentPage.write(pagedMessage);
-
+
if (tx != null)
{
installPageTransaction(tx, listCtx);
@@ -945,15 +959,15 @@
private static class FinishPageMessageOperation implements TransactionOperation
{
public final PageTransactionInfo pageTransaction;
-
+
private final StorageManager storageManager;
-
+
private final PagingManager pagingManager;
-
+
private final Set<PagingStore> usedStores = new
HashSet<PagingStore>();
private boolean stored = false;
-
+
public void addStore(PagingStore store)
{
this.usedStores.add(store);
@@ -1078,9 +1092,9 @@
}
/**
- *
+ *
* Note: Decimalformat is not thread safe, Use synchronization before calling this
method
- *
+ *
* @param pageID
* @return
*/
@@ -1100,5 +1114,41 @@
return maxSize > 0 && getAddressSize() > maxSize;
}
+ @Override
+ public Collection<Integer> getCurrentIds() throws Exception
+ {
+ List<Integer> ids = new ArrayList<Integer>();
+ if (fileFactory != null)
+ {
+ for (String fileName : fileFactory.listFiles("page"))
+ {
+ ids.add(getPageIdFromFileName(fileName));
+ }
+ }
+ return ids;
+ }
+
+ @Override
+ public void sendPages(ReplicationManager replicator, Collection<Integer>
pageIds) throws Exception
+ {
+ lock();
+ try
+ {
+ for (Integer id : pageIds)
+ {
+ SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id),
1);
+ if (!sFile.exists())
+ {
+ continue;
+ }
+ replicator.syncPages(sFile, id, getAddress());
+ }
+ }
+ finally
+ {
+ unlock();
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -14,7 +14,6 @@
package org.hornetq.core.persistence.impl.journal;
import java.io.File;
-import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.security.AccessController;
@@ -350,9 +349,10 @@
/**
* XXX FIXME HORNETQ-720 Method ignores the synchronization of Paging.
* @param replicationManager
+ * @param pagingManager
* @throws HornetQException
*/
- public void startReplication(ReplicationManager replicationManager) throws Exception
+ public void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager) throws Exception
{
if (!started)
{
@@ -374,6 +374,7 @@
final boolean messageJournalAutoReclaim = localMessageJournal.getAutoReclaim();
final boolean bindingsJournalAutoReclaim = localBindingsJournal.getAutoReclaim();
Map<String, Long> largeMessageFilesToSync;
+ Map<SimpleString, Collection<Integer>> pageFilesToSync;
try
{
storageManagerLock.writeLock().lock();
@@ -385,9 +386,18 @@
localBindingsJournal.writeLock();
try
{
- messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
- largeMessageFilesToSync = getLargeMessageInformation();
+ pagingManager.lockAll();
+ try
+ {
+ messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
+ pageFilesToSync = getPageInformationForSync(pagingManager);
+ largeMessageFilesToSync = getLargeMessageInformation();
+ }
+ finally
+ {
+ pagingManager.unlockAll();
+ }
}
finally
{
@@ -405,6 +415,7 @@
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
sendLargeMessageFiles(largeMessageFilesToSync);
+ sendPagesToBackup(pageFilesToSync, pagingManager);
storageManagerLock.writeLock().lock();
try
@@ -424,6 +435,42 @@
}
}
+ /**
+ * @param pageFilesToSync page ids to send, keyed by the name of their paging store
+ * @throws Exception
+ */
+ private void sendPagesToBackup(Map<SimpleString, Collection<Integer>>
pageFilesToSync, PagingManager manager)
+ throws Exception
+ {
+ for (Entry<SimpleString, Collection<Integer>> entry :
pageFilesToSync.entrySet())
+ {
+ PagingStore store = manager.getPageStore(entry.getKey());
+ store.sendPages(replicator, entry.getValue());
+ }
+
+ }
+
+ /**
+ * @param pagingManager
+ * @return the ids of the existing page files of each paging store, keyed by store name
+ * @throws Exception
+ */
+ private Map<SimpleString, Collection<Integer>>
getPageInformationForSync(PagingManager pagingManager)
+ throws Exception
+ {
+ Map<SimpleString, Collection<Integer>> info = new
HashMap<SimpleString, Collection<Integer>>();
+ for (SimpleString storeName : pagingManager.getStoreNames())
+ {
+ PagingStore store = pagingManager.getPageStore(storeName);
+ List<Integer> ids = new ArrayList<Integer>();
+ info.put(storeName, store.getCurrentIds());
+ // XXX should this happen before collecting the ids? Is it needed at all?
+ store.forceAnotherPage();
+ }
+ replicator.sendPagingInfo(info);
+ return info;
+ }
+
private void sendLargeMessageFiles(Map<String, Long> largeMessageFilesToSync)
throws Exception
{
for (Entry<String, Long> entry : largeMessageFilesToSync.entrySet())
@@ -464,11 +511,7 @@
}
/**
- * Send an entire journal file to a replicating server (a backup server that is).
- * @param jf
- * @param replicator2
- * @throws IOException
- * @throws HornetQException
+ * Send an entire journal file to a replicating backup server.
*/
private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws
Exception
{
@@ -809,7 +852,7 @@
readLock();
try
{
- messageJournal.appendDeleteRecord(recordID, syncNonTransactional,
getContext(syncNonTransactional));
+ messageJournal.appendDeleteRecord(recordID, syncNonTransactional,
getContext(syncNonTransactional));
}
finally
{
@@ -855,9 +898,9 @@
readLock();
try
{
- pageTransaction.setRecordID(generateUniqueID());
+ pageTransaction.setRecordID(generateUniqueID());
- messageJournal.appendAddRecordTransactional(txID,
+ messageJournal.appendAddRecordTransactional(txID,
pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
pageTransaction);
@@ -873,11 +916,10 @@
readLock();
try
{
- messageJournal.appendUpdateRecordTransactional(txID,
- pageTransaction.getRecordID(),
-
JournalStorageManager.PAGE_TRANSACTION,
- new
PageUpdateTXEncoding(pageTransaction.getTransactionID(),
- depages));
+ messageJournal.appendUpdateRecordTransactional(txID,
pageTransaction.getRecordID(),
+
JournalStorageManager.PAGE_TRANSACTION,
+ new
PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+
depages));
}
finally
{
@@ -890,7 +932,7 @@
readLock();
try
{
- messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
+ messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
new
PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
syncNonTransactional,
@@ -907,7 +949,7 @@
readLock();
try
{
- messageJournal.appendUpdateRecordTransactional(txID,
+ messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ADD_REF,
new RefEncoding(queueID));
@@ -923,7 +965,7 @@
readLock();
try
{
- messageJournal.appendUpdateRecordTransactional(txID,
+ messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
new RefEncoding(queueID));
@@ -1042,7 +1084,7 @@
readLock();
try
{
- messageJournal.appendDeleteRecordTransactional(txID, messageID, new
DeleteEncoding(queueID));
+ messageJournal.appendDeleteRecordTransactional(txID, messageID, new
DeleteEncoding(queueID));
}
finally
{
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -102,6 +102,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -536,6 +537,11 @@
packet = new ReplicationSyncFileMessage();
break;
}
+ case PacketImpl.REPLICATION_CURRENT_PAGES_INFO:
+ {
+ packet = new ReplicationCurrentPagesMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -196,6 +196,7 @@
public static final byte HA_BACKUP_REGISTRATION = 113;
public static final byte REPLICATION_START_STOP_SYNC = 120;
+ public static final byte REPLICATION_CURRENT_PAGES_INFO = 121;
// Static --------------------------------------------------------
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java
(rev 0)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -0,0 +1,77 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+public final class ReplicationCurrentPagesMessage extends PacketImpl
+{
+
+ private Map<SimpleString, Collection<Integer>> info;
+
+ /**
+ * Creates an empty message; {@link #decodeRest} fills in its contents.
+ */
+ public ReplicationCurrentPagesMessage()
+ {
+ super(REPLICATION_CURRENT_PAGES_INFO);
+ }
+
+ /**
+ * @param info page ids of each paging store, keyed by store name
+ */
+ public ReplicationCurrentPagesMessage(Map<SimpleString,
Collection<Integer>> info)
+ {
+ this();
+ this.info = info;
+ }
+
+ @Override
+ public void decodeRest(HornetQBuffer buffer)
+ {
+ info = new HashMap<SimpleString, Collection<Integer>>();
+ int entries = buffer.readInt();
+ for (int i = 0; i < entries; i++)
+ {
+ SimpleString name = buffer.readSimpleString();
+ int nPages = buffer.readInt();
+ List<Integer> ids = new ArrayList<Integer>(nPages);
+ for (int j = 0; j < nPages; j++)
+ {
+ ids.add(Integer.valueOf(buffer.readInt()));
+ }
+ info.put(name, ids);
+ }
+ }
+
+ @Override
+ public void encodeRest(HornetQBuffer buffer)
+ {
+ buffer.writeInt(info.size());
+ for (Entry<SimpleString, Collection<Integer>> entry : info.entrySet())
+ {
+ buffer.writeSimpleString(entry.getKey());
+ Collection<Integer> value = entry.getValue();
+ buffer.writeInt(value.size());
+ for (Integer id : value)
+ {
+ buffer.writeInt(id);
+ }
+ }
+ }
+
+ public Map<SimpleString, Collection<Integer>> getInfo()
+ {
+ return info;
+ }
+}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -1,17 +1,17 @@
package org.hornetq.core.protocol.core.impl.wireformat;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
- * Message is used to:
- * <ol>
- * <li>copy JournalFile data over to the backup during synchronization;
- * <li>send a up-to-date signal to backup;
- * </ol>
+ * Message is used to sync {@link SequentialFile}s to a backup server. The {@link
FileType} controls
+ * which extra information is sent.
*/
public final class ReplicationSyncFileMessage extends PacketImpl
{
@@ -28,31 +28,89 @@
private int dataSize;
private ByteBuffer byteBuffer;
private byte[] byteArray;
+ private SimpleString pageStoreName;
+ private FileType fileType;
+ public enum FileType
+ {
+ JOURNAL(0), PAGE(1), LARGE_MESSAGE(2);
+
+ private byte code;
+
+ private FileType(int code)
+ {
+ this.code = (byte)code;
+ }
+
+ /**
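+ * @param readByte code of a file type, as written by {@code encodeRest}
+ * @return the matching {@code FileType}; throws {@link InternalError} if no type has that code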
+ */
+ public static FileType getFileType(byte readByte)
+ {
+ for (FileType type : EnumSet.allOf(FileType.class))
+ {
+ if (type.code == readByte)
+ return type;
+ }
+ throw new InternalError("Unsupported byte value for " +
FileType.class);
+ }
+ }
+
public ReplicationSyncFileMessage()
{
super(REPLICATION_SYNC_FILE);
}
- public ReplicationSyncFileMessage(JournalContent content, long id, int size,
ByteBuffer buffer)
+ public ReplicationSyncFileMessage(JournalContent content, SimpleString storeName, long
id, int size,
+ ByteBuffer buffer)
{
this();
this.byteBuffer = buffer;
+ this.pageStoreName = storeName;
this.dataSize = size;
this.fileId = id;
this.journalType = content;
+ determineType();
}
+ private void determineType()
+ {
+ if (journalType != null)
+ {
+ fileType = FileType.JOURNAL;
+ }
+ else if (pageStoreName != null)
+ {
+ fileType = FileType.PAGE;
+ }
+ else
+ {
+ fileType = FileType.LARGE_MESSAGE;
+ }
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeLong(fileId);
if (fileId == -1)
return;
- boolean isJournal = journalType != null;
- buffer.writeBoolean(isJournal);
- if (isJournal)
- buffer.writeByte(journalType.typeByte);
+ buffer.writeByte(fileType.code);
+ switch (fileType)
+ {
+ case JOURNAL:
+ {
+ buffer.writeByte(journalType.typeByte);
+ break;
+ }
+ case PAGE:
+ {
+ buffer.writeSimpleString(pageStoreName);
+ break;
+ }
+ }
+
buffer.writeInt(dataSize);
/*
* sending -1 will close the file in case of a journal, but not in case of a
largeMessage
@@ -68,9 +126,25 @@
public void decodeRest(final HornetQBuffer buffer)
{
fileId = buffer.readLong();
- if (buffer.readBoolean())
+ switch (FileType.getFileType(buffer.readByte()))
{
- journalType = JournalContent.getType(buffer.readByte());
+ case JOURNAL:
+ {
+ journalType = JournalContent.getType(buffer.readByte());
+ fileType = FileType.JOURNAL;
+ break;
+ }
+ case PAGE:
+ {
+ pageStoreName = buffer.readSimpleString();
+ fileType = FileType.PAGE;
+ break;
+ }
+ case LARGE_MESSAGE:
+ {
+ fileType = FileType.LARGE_MESSAGE;
+ break;
+ }
}
int size = buffer.readInt();
if (size > 0)
@@ -90,19 +164,18 @@
return journalType;
}
- /**
- * @return
- */
public byte[] getData()
{
return byteArray;
}
- /**
- * @return
- */
- public boolean isLargeMessage()
+ public FileType getFileType()
{
- return journalType == null;
+ return fileType;
}
+
+ public SimpleString getPageStore()
+ {
+ return pageStoreName;
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -13,6 +13,8 @@
package org.hornetq.core.replication;
+import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import org.hornetq.api.core.HornetQException;
@@ -114,4 +116,14 @@
* @throws Exception
*/
void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws
Exception;
+
+ void sendPagingInfo(Map<SimpleString, Collection<Integer>> info);
+
+ /**
+ * @param file the page file to send
+ * @param id the id of the page
+ * @param pageStore name of the paging store the page belongs to
+ * @throws Exception
+ */
+ void syncPages(SequentialFile file, long id, SimpleString pageStore) throws
Exception;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -14,6 +14,7 @@
package org.hornetq.core.replication.impl;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -47,6 +48,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -205,6 +207,10 @@
{
handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
}
+ else if (type == PacketImpl.REPLICATION_CURRENT_PAGES_INFO)
+ {
+ handleCurrentPagesInfo((ReplicationCurrentPagesMessage)packet);
+ }
else
{
log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
@@ -224,17 +230,22 @@
channel.send(response);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ /**
+ * @param packet lists the current page ids of each paging store; ignored for now
*/
+ private void handleCurrentPagesInfo(ReplicationCurrentPagesMessage packet)
+ {
+ for (Entry<SimpleString, Collection<Integer>> entry :
packet.getInfo().entrySet())
+ {
+ // ignore the actual file list for the moment...
+ }
+ }
+
public boolean isStarted()
{
return started;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#start()
- */
public void start() throws Exception
{
Configuration config = server.getConfiguration();
@@ -429,7 +440,6 @@
SequentialFile sq = lm.getFile();
LargeServerMessage mainLM = largeMessagesOnSync.get(id);
SequentialFile mainSeqFile = mainLM.getFile();
- System.out.println(mainSeqFile);
for (;;)
{
buffer.rewind();
@@ -463,27 +473,41 @@
Long id = Long.valueOf(msg.getId());
byte[] data = msg.getData();
SequentialFile sf;
- if (msg.isLargeMessage())
+ switch (msg.getFileType())
{
- synchronized (largeMessagesOnSync)
+ case LARGE_MESSAGE:
{
- LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
- if (largeMessage == null)
+ synchronized (largeMessagesOnSync)
{
- largeMessage = storage.createLargeMessage();
- largeMessage.setDurable(true);
- largeMessage.setMessageID(id);
- largeMessagesOnSync.put(id, largeMessage);
+ LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
+ if (largeMessage == null)
+ {
+ largeMessage = storage.createLargeMessage();
+ largeMessage.setDurable(true);
+ largeMessage.setMessageID(id);
+ largeMessagesOnSync.put(id, largeMessage);
+ }
+ sf = largeMessage.getFile();
}
- sf = largeMessage.getFile();
+ break;
}
+ case JOURNAL:
+ {
+ JournalFile journalFile =
filesReservedForSync.get(msg.getJournalContent()).get(id);
+ sf = journalFile.getFile();
+ break;
+ }
+ case PAGE:
+ {
+ Page page = getPage(msg.getPageStore(), (int)msg.getId());
+
+ sf = page.getFile();
+ break;
+ }
+ default:
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Unhandled
file type " + msg.getFileType());
}
- else
- {
- JournalFile journalFile =
filesReservedForSync.get(msg.getJournalContent()).get(id);
- sf = journalFile.getFile();
- }
if (data == null)
{
sf.close();
@@ -751,7 +775,6 @@
page.close();
}
}
-
}
/**
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -14,7 +14,9 @@
package org.hornetq.core.replication.impl;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.LinkedHashSet;
+import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -42,6 +44,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -508,24 +511,32 @@
{
SequentialFile file = jf.getFile().copy();
log.info("Replication: sending " + jf + " (size=" + file.size()
+ ") to backup. " + file);
- sendLargeFile(content, jf.getFileID(), file, Long.MAX_VALUE);
+ sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
}
@Override
public void syncLargeMessageFile(SequentialFile file, long size, long id) throws
Exception
{
- sendLargeFile(null, id, file, size);
+ sendLargeFile(null, null, id, file, size);
}
+ @Override
+ public void syncPages(SequentialFile file, long id, SimpleString queueName) throws
Exception
+ {
+ sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
+ }
+
/**
* Sends large files in reasonably sized chunks to the backup during replication
synchronization.
- * @param content journal type or {@code null} for large-messages
+ * @param content journal type or {@code null} for large-messages and pages
+ * @param pageStore page store name for pages, or {@code null} otherwise
* @param id journal file id or (large) message id
* @param file
* @param maxBytesToSend maximum number of bytes to read and send from the file
* @throws Exception
*/
- private void sendLargeFile(JournalContent content, final long id, SequentialFile file,
long maxBytesToSend)
+ private void sendLargeFile(JournalContent content, SimpleString pageStore, final long
id, SequentialFile file,
+ long maxBytesToSend)
throws Exception
{
if (!file.isOpen())
@@ -554,7 +565,7 @@
buffer.rewind();
// sending -1 or 0 bytes will close the file at the backup
- sendReplicatePacket(new ReplicationSyncFileMessage(content, id, bytesRead,
buffer));
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id,
bytesRead, buffer));
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
}
@@ -572,4 +583,10 @@
ReplicationStartSyncMessage msg = new ReplicationStartSyncMessage(null, null);
sendReplicatePacket(msg);
}
+
+ @Override
+ public void sendPagingInfo(Map<SimpleString, Collection<Integer>> info)
+ {
+ sendReplicatePacket(new ReplicationCurrentPagesMessage(info));
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -58,7 +58,6 @@
import org.hornetq.core.deployers.impl.SecurityDeployer;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
-import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.impl.SyncSpeedTest;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
@@ -1564,7 +1563,7 @@
pagingManager.reloadStores();
- JournalLoadInformation[] journalInfo = loadJournals();
+ loadJournals();
final ServerInfo dumper = new ServerInfo(this, pagingManager);
@@ -1646,15 +1645,13 @@
}
}
- private JournalLoadInformation[] loadJournals() throws Exception
+ private void loadJournals() throws Exception
{
- JournalLoadInformation[] journalInfo = new JournalLoadInformation[2];
-
List<QueueBindingInfo> queueBindingInfos = new
ArrayList<QueueBindingInfo>();
List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
- journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos,
groupingInfos);
+ storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
recoverStoredConfigs();
@@ -1685,8 +1682,6 @@
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(),
storageManager);
-
-
}
for (GroupingInfo groupingInfo : groupingInfos)
@@ -1701,7 +1696,7 @@
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new
HashMap<SimpleString, List<Pair<byte[], Long>>>();
- journalInfo[1] = storageManager.loadMessageJournal(postOffice,
+ storageManager.loadMessageJournal(postOffice,
pagingManager,
resourceManager,
queues,
@@ -1720,7 +1715,6 @@
}
}
- return journalInfo;
}
/**
@@ -2012,11 +2006,10 @@
}
JournalStorageManager journalStorageManager =
(JournalStorageManager)storageManager;
-
replicationManager = new ReplicationManagerImpl(rc, executorFactory);
replicationManager.start();
- journalStorageManager.startReplication(replicationManager);
+ journalStorageManager.startReplication(replicationManager, pagingManager);
}
/**
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -19,6 +19,7 @@
import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
/**
@@ -118,7 +119,8 @@
@Override
public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport
record) throws Exception
{
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id,
record);
+ writeRecord(deleteRecordTX, false, null);
}
@Override
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -13,6 +13,7 @@
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -27,7 +28,7 @@
private ClientSession session;
private ClientProducer producer;
private BackupSyncDelay syncDelay;
- protected static final int N_MSGS = 10;
+ protected int n_msgs = 10;
@Override
protected void setUp() throws Exception
@@ -59,17 +60,22 @@
for (int i = 0; i < totalRounds; i++)
{
messageJournal.forceMoveNextFile();
- sendMessages(session, producer, N_MSGS);
+ sendMessages(session, producer, n_msgs);
}
backupServer.start();
-
+ syncDelay.deliverUpToDateMsg();
waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
// SEND more messages, now with the backup replicating
- sendMessages(session, producer, N_MSGS);
+ sendMessages(session, producer, n_msgs);
Set<Long> liveIds = getFileIds(messageJournal);
-
+ PagingStore ps = liveServer.getServer().getPagingManager().getPageStore(ADDRESS);
+ if (ps.getPageSizeBytes() == PAGE_SIZE)
+ {
+ assertTrue("isStarted", ps.isStarted());
+ assertFalse("start paging should return false, because we expect paging to
be running", ps.startPaging());
+ }
finishSyncAndFailover();
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
@@ -79,7 +85,7 @@
// "+ 2": there two other calls that send N_MSGS.
for (int i = 0; i < totalRounds + 2; i++)
{
- receiveMsgsInRange(0, N_MSGS);
+ receiveMsgsInRange(0, n_msgs);
}
assertNoMoreMessages();
}
@@ -108,13 +114,13 @@
backupServer.start();
waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
- sendMessages(session, producer, N_MSGS);
+ sendMessages(session, producer, n_msgs);
session.commit();
- receiveMsgsInRange(0, N_MSGS);
+ receiveMsgsInRange(0, n_msgs);
finishSyncAndFailover();
- receiveMsgsInRange(0, N_MSGS);
+ receiveMsgsInRange(0, n_msgs);
assertNoMoreMessages();
}
@@ -131,16 +137,16 @@
{
createProducerSendSomeMessages();
startBackupCrashLive();
- receiveMsgsInRange(0, N_MSGS);
+ receiveMsgsInRange(0, n_msgs);
assertNoMoreMessages();
}
public void testMessageSync() throws Exception
{
createProducerSendSomeMessages();
- receiveMsgsInRange(0, N_MSGS / 2);
+ receiveMsgsInRange(0, n_msgs / 2);
startBackupCrashLive();
- receiveMsgsInRange(N_MSGS / 2, N_MSGS);
+ receiveMsgsInRange(n_msgs / 2, n_msgs);
assertNoMoreMessages();
}
@@ -159,7 +165,7 @@
session = sessionFactory.createSession(true, true);
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer, N_MSGS);
+ sendMessages(session, producer, n_msgs);
session.commit();
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-07
12:45:04 UTC (rev 11298)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -40,8 +40,8 @@
File dir = new
File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
System.out.println("Dir " + dir.getAbsolutePath() + " " +
dir.exists());
// Set<Long> idsOnBkp = getAllMessageFileIds(dir);
- receiveMsgsInRange(0, N_MSGS / 2);
- assertEquals("we really ought to delete these after delivery", N_MSGS /
2, getAllMessageFileIds(dir).size());
+ receiveMsgsInRange(0, n_msgs / 2);
+ assertEquals("we really ought to delete these after delivery", n_msgs /
2, getAllMessageFileIds(dir).size());
}
private Set<Long> getAllMessageFileIds(File dir)
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java
(rev 0)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java 2011-09-07
13:38:25 UTC (rev 11299)
@@ -0,0 +1,34 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+
+public class BackupSyncPagingTest extends BackupSyncJournalTest
+{
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ n_msgs = 100;
+ super.setUp();
+ }
+
+ @Override
+ protected HornetQServer createInVMFailoverServer(final boolean realFiles, final Configuration configuration,
+ final NodeManager nodeManager)
+ {
+ Map<String, AddressSettings> conf = new HashMap<String, AddressSettings>();
+ AddressSettings as = new AddressSettings();
+ as.setMaxSizeBytes(PAGE_MAX);
+ as.setPageSizeBytes(PAGE_SIZE);
+ as.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+ conf.put(ADDRESS.toString(), as);
+ return createInVMFailoverServer(realFiles, configuration, PAGE_SIZE, PAGE_MAX, conf, nodeManager);
+ }
+}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -68,6 +68,9 @@
protected static final int MIN_LARGE_MESSAGE = 1024;
private static final int LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE * 3;
+ protected static final int PAGE_MAX = 2 * 1024;
+ protected static final int PAGE_SIZE = 1024;
+
// Attributes ----------------------------------------------------
protected TestableServer liveServer;
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -46,12 +46,8 @@
// Constants -----------------------------------------------------
- private static final int PAGE_MAX = 100 * 1024;
+ private static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
- private static final int PAGE_SIZE = 10 * 1024;
-
- static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
// Attributes ----------------------------------------------------
private ServerLocator locator;
@@ -91,7 +87,7 @@
internalTestPage(true, false);
}
- public void testPageTransactionedFailBeforeconsume() throws Exception
+ public void testPageTransactionedFailBeforeConsume() throws Exception
{
internalTestPage(true, true);
}
@@ -130,9 +126,12 @@
if (failBeforeConsume)
{
crash(session);
+ waitForBackup(sf, 60);
}
+
+
session.close();
session = sf.createSession(!transacted, !transacted, 0);
@@ -217,11 +216,7 @@
@Override
protected HornetQServer createServer(final boolean realFiles, final Configuration configuration)
{
- return createInVMFailoverServer(true,
- configuration,
- PagingFailoverTest.PAGE_SIZE,
- PagingFailoverTest.PAGE_MAX,
- new HashMap<String, AddressSettings>(),
+ return createInVMFailoverServer(true, configuration, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>(),
nodeManager);
}
Modified:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,9 +14,7 @@
package org.hornetq.tests.unit.core.paging.impl;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -53,6 +51,7 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.hornetq.tests.unit.util.FakePagingManager;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.ExecutorFactory;
@@ -939,155 +938,6 @@
// Inner classes -------------------------------------------------
- class FakePagingManager implements PagingManager
- {
-
- public void activate()
- {
- }
-
- public long addSize(final long size)
- {
- return 0;
- }
-
- public void addTransaction(final PageTransactionInfo pageTransaction)
- {
- }
-
- public PagingStore createPageStore(final SimpleString destination) throws Exception
- {
- return null;
- }
-
- public long getTotalMemory()
- {
- return 0;
- }
-
- public SimpleString[] getStoreNames()
- {
- return null;
- }
-
- public long getMaxMemory()
- {
- return 0;
- }
-
- public PagingStore getPageStore(final SimpleString address) throws Exception
- {
- return null;
- }
-
- public void deletePageStore(SimpleString storeName) throws Exception
- {
- }
-
- public PageTransactionInfo getTransaction(final long transactionID)
- {
- return null;
- }
-
- public boolean isBackup()
- {
- return false;
- }
-
- public boolean isGlobalPageMode()
- {
- return false;
- }
-
- public boolean isPaging(final SimpleString destination) throws Exception
- {
- return false;
- }
-
- public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
- {
- return false;
- }
-
- public boolean page(final ServerMessage message, final long transactionId, final boolean duplicateDetection) throws Exception
- {
- return false;
- }
-
- public void reloadStores() throws Exception
- {
- }
-
- public void removeTransaction(final long transactionID)
- {
-
- }
-
- public void setGlobalPageMode(final boolean globalMode)
- {
- }
-
- public void setPostOffice(final PostOffice postOffice)
- {
- }
-
- public void resumeDepages()
- {
- }
-
- public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
- {
- }
-
- public boolean isStarted()
- {
- return false;
- }
-
- public void start() throws Exception
- {
- }
-
- public void stop() throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#isGlobalFull()
- */
- public boolean isGlobalFull()
- {
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#getTransactions()
- */
- public Map<Long, PageTransactionInfo> getTransactions()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#processReload()
- */
- public void processReload()
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
- */
- public void onChange()
- {
- }
-
- }
-
-
class FakeStoreFactory implements PagingStoreFactory
{
Modified:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,10 +14,8 @@
package org.hornetq.tests.unit.core.postoffice.impl;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -28,18 +26,15 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.paging.PageTransactionInfo;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.GroupingInfo;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.impl.ResourceManagerImpl;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
+import org.hornetq.tests.unit.util.FakePagingManager;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.ExecutorFactory;
@@ -186,160 +181,4 @@
}
}
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
- static class FakePagingManager implements PagingManager
- {
-
- public void activate()
- {
- }
-
- public long addSize(final long size)
- {
- return 0;
- }
-
- public void addTransaction(final PageTransactionInfo pageTransaction)
- {
- }
-
- public PagingStore createPageStore(final SimpleString destination) throws Exception
- {
- return null;
- }
-
- public long getTotalMemory()
- {
- return 0;
- }
-
- public SimpleString[] getStoreNames()
- {
- return null;
- }
-
- public long getMaxMemory()
- {
- return 0;
- }
-
- public PagingStore getPageStore(final SimpleString address) throws Exception
- {
- return null;
- }
-
- public void deletePageStore(SimpleString storeName) throws Exception
- {
- }
-
- public PageTransactionInfo getTransaction(final long transactionID)
- {
- return null;
- }
-
- public boolean isBackup()
- {
- return false;
- }
-
- public boolean isGlobalPageMode()
- {
- return false;
- }
-
- public boolean isPaging(final SimpleString destination) throws Exception
- {
- return false;
- }
-
- public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
- {
- return false;
- }
-
- public boolean page(final ServerMessage message, final long transactionId, final boolean duplicateDetection) throws Exception
- {
- return false;
- }
-
- public void reloadStores() throws Exception
- {
- }
-
- public void removeTransaction(final long transactionID)
- {
-
- }
-
- public void setGlobalPageMode(final boolean globalMode)
- {
- }
-
- public void setPostOffice(final PostOffice postOffice)
- {
- }
-
- public void resumeDepages()
- {
- }
-
- public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
- {
- }
-
- public boolean isStarted()
- {
- return false;
- }
-
- public void start() throws Exception
- {
- }
-
- public void stop() throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#isGlobalFull()
- */
- public boolean isGlobalFull()
- {
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#getTransactions()
- */
- public Map<Long, PageTransactionInfo> getTransactions()
- {
- return null;
- }
-
-
-
-
- /* (non-Javadoc)
- * @see org.hornetq.core.paging.PagingManager#processReload()
- */
- public void processReload()
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
- */
- public void onChange()
- {
- }
-
- }
-
}
Added:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -0,0 +1,173 @@
+package org.hornetq.tests.unit.util;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.ServerMessage;
+
+public final class FakePagingManager implements PagingManager
+{
+
+ public void activate()
+ {
+ }
+
+ public long addSize(final long size)
+ {
+ return 0;
+ }
+
+ public void addTransaction(final PageTransactionInfo pageTransaction)
+ {
+ }
+
+ public PagingStore createPageStore(final SimpleString destination) throws Exception
+ {
+ return null;
+ }
+
+ public long getTotalMemory()
+ {
+ return 0;
+ }
+
+ public SimpleString[] getStoreNames()
+ {
+ return null;
+ }
+
+ public long getMaxMemory()
+ {
+ return 0;
+ }
+
+ public PagingStore getPageStore(final SimpleString address) throws Exception
+ {
+ return null;
+ }
+
+ public void deletePageStore(SimpleString storeName) throws Exception
+ {
+ }
+
+ public PageTransactionInfo getTransaction(final long transactionID)
+ {
+ return null;
+ }
+
+ public boolean isBackup()
+ {
+ return false;
+ }
+
+ public boolean isGlobalPageMode()
+ {
+ return false;
+ }
+
+ public boolean isPaging(final SimpleString destination) throws Exception
+ {
+ return false;
+ }
+
+ public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
+ {
+ return false;
+ }
+
+ public boolean page(final ServerMessage message, final long transactionId, final boolean duplicateDetection)
+ throws Exception
+ {
+ return false;
+ }
+
+ public void reloadStores() throws Exception
+ {
+ }
+
+ public void removeTransaction(final long transactionID)
+ {
+
+ }
+
+ public void setGlobalPageMode(final boolean globalMode)
+ {
+ }
+
+ public void setPostOffice(final PostOffice postOffice)
+ {
+ }
+
+ public void resumeDepages()
+ {
+ }
+
+ public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
+ {
+ }
+
+ public boolean isStarted()
+ {
+ return false;
+ }
+
+ public void start() throws Exception
+ {
+ }
+
+ public void stop() throws Exception
+ {
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#isGlobalFull()
+ */
+ public boolean isGlobalFull()
+ {
+ return false;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#getTransactions()
+ */
+ public Map<Long, PageTransactionInfo> getTransactions()
+ {
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.paging.PagingManager#processReload()
+ */
+ public void processReload()
+ {
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
+ */
+ public void onChange()
+ {
+ }
+
+ @Override
+ public void lockAll()
+ {
+ // no-op
+ }
+
+ @Override
+ public void unlockAll()
+ {
+ // no-op
+ }
+
+}
Modified:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-09-07 13:38:25 UTC (rev 11299)
@@ -509,7 +509,7 @@
*/
protected void assertMessageBody(final int i, final ClientMessage message)
{
- Assert.assertEquals("message" + i, message.getBodyBuffer().readString());
+ Assert.assertEquals(message.toString(), "message" + i, message.getBodyBuffer().readString());
}
/**