[hornetq-commits] JBoss hornetq SVN: r9515 - in trunk: src/main/org/hornetq/core/paging/impl and 11 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Aug 6 12:58:46 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-06 12:58:45 -0400 (Fri, 06 Aug 2010)
New Revision: 9515
Added:
trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java
Modified:
trunk/src/main/org/hornetq/core/paging/Page.java
trunk/src/main/org/hornetq/core/paging/PagingStore.java
trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java
trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
HORNETQ-472 - Avoid excessive compression on journal after depaging
Modified: trunk/src/main/org/hornetq/core/paging/Page.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/Page.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/Page.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -39,5 +39,5 @@
void close() throws Exception;
- void delete() throws Exception;
+ boolean delete() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -49,9 +49,9 @@
void sync() throws Exception;
- boolean page(ServerMessage message, long transactionId, boolean duplicateDetection) throws Exception;
+ boolean page(ServerMessage message, long transactionId) throws Exception;
- boolean page(ServerMessage message, boolean duplicateDetection) throws Exception;
+ boolean page(ServerMessage message) throws Exception;
Page createPage(final int page) throws Exception;
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -196,24 +196,34 @@
file.close();
}
- public void delete() throws Exception
+ public boolean delete() throws Exception
{
if (storageManager != null)
{
storageManager.pageDeleted(storeName, pageId);
}
- if (suspiciousRecords)
+ try
{
- PageImpl.log.warn("File " + file.getFileName() +
- " being renamed to " +
- file.getFileName() +
- ".invalidPage as it was loaded partially. Please verify your data.");
- file.renameTo(file.getFileName() + ".invalidPage");
+ if (suspiciousRecords)
+ {
+ PageImpl.log.warn("File " + file.getFileName() +
+ " being renamed to " +
+ file.getFileName() +
+ ".invalidPage as it was loaded partially. Please verify your data.");
+ file.renameTo(file.getFileName() + ".invalidPage");
+ }
+ else
+ {
+ file.delete();
+ }
+
+ return true;
}
- else
+ catch (Exception e)
{
- file.delete();
+ log.warn("Error while deleting page file", e);
+ return false;
}
}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -13,7 +13,6 @@
package org.hornetq.core.paging.impl;
-import java.nio.ByteBuffer;
import java.text.DecimalFormat;
import java.util.HashSet;
import java.util.List;
@@ -29,7 +28,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -41,6 +39,7 @@
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
@@ -110,6 +109,9 @@
private volatile Page currentPage;
private final ReentrantLock writeLock = new ReentrantLock();
+
+ /** duplicate cache used at this address */
+ private final DuplicateIDCache duplicateCache;
/**
* We need to perform checks on currentPage with minimal locking
@@ -183,6 +185,17 @@
this.storeFactory = storeFactory;
this.syncNonTransactional = syncNonTransactional;
+
+ // Post office could be null on the backup node
+ if (postOffice == null)
+ {
+ this.duplicateCache = null;
+ }
+ else
+ {
+ this.duplicateCache = postOffice.getDuplicateIDCache(storeName);
+ }
+
}
// Public --------------------------------------------------------
@@ -249,17 +262,17 @@
return storeName;
}
- public boolean page(final ServerMessage message, final long transactionID, final boolean duplicateDetection) throws Exception
+ public boolean page(final ServerMessage message, final long transactionID) throws Exception
{
// The sync on transactions is done on commit only
- return page(message, transactionID, false, duplicateDetection);
+ return page(message, transactionID, false);
}
- public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
+ public boolean page(final ServerMessage message) throws Exception
{
// If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
// of crash
- return page(message, -1, syncNonTransactional && message.isDurable(), duplicateDetection);
+ return page(message, -1, syncNonTransactional && message.isDurable());
}
public void sync() throws Exception
@@ -635,7 +648,15 @@
if (onDepage(page.getPageId(), storeName, messages))
{
- page.delete();
+ if (page.delete())
+ {
+ // DuplicateCache could be null during replication
+ // however the deletes on the journal will happen through replicated journal
+ if (duplicateCache != null)
+ {
+ duplicateCache.deleteFromCache(generateDuplicateID(page.getPageId()));
+ }
+ }
return true;
}
@@ -777,8 +798,7 @@
private boolean page(final ServerMessage message,
final long transactionID,
- final boolean sync,
- final boolean duplicateDetection) throws Exception
+ final boolean sync) throws Exception
{
if (!running)
{
@@ -836,20 +856,6 @@
return false;
}
- if (duplicateDetection)
- {
- // We set the duplicate detection header to prevent the message being depaged more than once in case of
- // failure during depage
-
- byte[] bytes = new byte[8];
-
- ByteBuffer buff = ByteBuffer.wrap(bytes);
-
- buff.putLong(message.getMessageID());
-
- message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, bytes);
- }
-
PagedMessage pagedMessage;
if (!message.isDurable())
@@ -933,9 +939,23 @@
// Depage has to be done atomically, in case of failure it should be
// back to where it was
-
+
+ byte[] duplicateIdForPage = generateDuplicateID(pageId);
+
Transaction depageTransaction = new TransactionImpl(storageManager);
+ // DuplicateCache could be null during replication
+ if (duplicateCache != null)
+ {
+ if (duplicateCache.contains(duplicateIdForPage))
+ {
+ log.warn("Page " + pageId + " had been processed already but the file wasn't removed as a crash happened. Ignoring this page");
+ return true;
+ }
+
+ duplicateCache.addToCache(duplicateIdForPage, depageTransaction);
+ }
+
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
@@ -1057,8 +1077,18 @@
}
/**
+ * @param pageId
* @return
*/
+ private byte[] generateDuplicateID(final int pageId)
+ {
+ byte duplicateIdForPage[] = new SimpleString("page-" + pageId).getData();
+ return duplicateIdForPage;
+ }
+
+ /**
+ * @return
+ */
private boolean isAddressFull(final long nextPageSize)
{
return maxSize > 0 && getAddressSize() + nextPageSize > maxSize;
Modified: trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -32,6 +32,8 @@
boolean contains(byte[] duplicateID);
void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
+
+ void deleteFromCache(byte [] duplicateID) throws Exception;
void load(List<Pair<byte[], Long>> theIds) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -17,7 +17,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
@@ -43,7 +44,8 @@
{
private static final Logger log = Logger.getLogger(DuplicateIDCacheImpl.class);
- private final Set<ByteArrayHolder> cache = new org.hornetq.utils.ConcurrentHashSet<ByteArrayHolder>();
+ // ByteHolder, position
+ private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<ByteArrayHolder, Integer>();
private final SimpleString address;
@@ -89,7 +91,7 @@
Pair<ByteArrayHolder, Long> pair = new Pair<ByteArrayHolder, Long>(bah, id.b);
- cache.add(bah);
+ cache.put(bah, ids.size());
ids.add(pair);
}
@@ -120,20 +122,52 @@
}
}
+
+
+ public void deleteFromCache(byte [] duplicateID) throws Exception
+ {
+ ByteArrayHolder bah = new ByteArrayHolder(duplicateID);
+
+ Integer posUsed = cache.remove(bah);
+
+ if (posUsed != null)
+ {
+ Pair<ByteArrayHolder, Long> id;
+
+ synchronized (this)
+ {
+ id = ids.get(posUsed.intValue());
+
+ if (id.a.equals(bah))
+ {
+ id.a = null;
+ storageManager.deleteDuplicateID(id.b);
+ id.b = null;
+ }
+ else
+ {
+ System.out.println("Can't delete duplicateID");
+ }
+ }
+ }
+
+ }
+
public boolean contains(final byte[] duplID)
{
- return cache.contains(new ByteArrayHolder(duplID));
+ return cache.get(new ByteArrayHolder(duplID)) != null;
}
public synchronized void addToCache(final byte[] duplID, final Transaction tx) throws Exception
{
- long recordID = storageManager.generateUniqueID();
+ long recordID = -1;
if (tx == null)
{
if (persist)
{
+ recordID = storageManager.generateUniqueID();
storageManager.storeDuplicateID(address, duplID, recordID);
}
@@ -143,6 +177,7 @@
{
if (persist)
{
+ recordID = storageManager.generateUniqueID();
storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
tx.setContainsPersistent();
@@ -156,7 +191,9 @@
private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID)
{
- cache.add(new ByteArrayHolder(duplID));
+ ByteArrayHolder holder = new ByteArrayHolder(duplID);
+
+ cache.put(holder, pos);
Pair<ByteArrayHolder, Long> id;
@@ -165,32 +202,43 @@
// Need fast array style access here -hence ArrayList typing
id = ids.get(pos);
- cache.remove(id.a);
-
- // Record already exists - we delete the old one and add the new one
- // Note we can't use update since journal update doesn't let older records get
- // reclaimed
- id.a = new ByteArrayHolder(duplID);
-
- if (persist)
+ // The id here might be null if it was explicit deleted
+ if (id.a != null)
{
- try
+ cache.remove(id.a);
+
+ // Record already exists - we delete the old one and add the new one
+ // Note we can't use update since journal update doesn't let older records get
+ // reclaimed
+
+ if (id.b != null)
{
- storageManager.deleteDuplicateID(id.b);
+ try
+ {
+ storageManager.deleteDuplicateID(id.b);
+ }
+ catch (Exception e)
+ {
+ DuplicateIDCacheImpl.log.warn("Error on deleting duplicate cache", e);
+ }
}
- catch (Exception e)
- {
- DuplicateIDCacheImpl.log.warn("Error on deleting duplicate cache", e);
- }
-
- id.b = recordID;
}
+
+ id.a = holder;
+
+ // The recordID could be negative if the duplicateCache is configured to not persist,
+ // -1 would mean null on this case
+ id.b = recordID >= 0 ? recordID : null;
+
+ holder.pos = pos;
}
else
{
- id = new Pair<ByteArrayHolder, Long>(new ByteArrayHolder(duplID), recordID);
+ id = new Pair<ByteArrayHolder, Long>(holder, recordID >= 0 ? recordID : null);
ids.add(id);
+
+ holder.pos = pos;
}
if (pos++ == cacheSize - 1)
@@ -270,6 +318,8 @@
final byte[] bytes;
int hash;
+
+ int pos;
@Override
public boolean equals(final Object other)
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -604,7 +604,7 @@
if (context.getTransaction() == null)
{
- if (message.page(true))
+ if (message.page())
{
return;
}
@@ -1206,11 +1206,9 @@
Set<PagingStore> pagingStoresToSync = new HashSet<PagingStore>();
- // We only need to add the dupl id header once per transaction
- boolean first = true;
for (ServerMessage message : messagesToPage)
{
- if (message.page(tx.getID(), first))
+ if (message.page(tx.getID()))
{
if (message.isDurable())
{
@@ -1231,7 +1229,6 @@
}
route(message, subTX, false);
}
- first = false;
}
if (pagingPersistent)
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -264,6 +264,8 @@
}
largeMessages.clear();
+
+ pageManager.stop();
}
/* (non-Javadoc)
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -56,9 +56,9 @@
PagingStore getPagingStore();
- boolean page(boolean duplicateDetection) throws Exception;
+ boolean page() throws Exception;
- boolean page(long transactionID, boolean duplicateDetection) throws Exception;
+ boolean page(long transactionID) throws Exception;
boolean storeIsPaging();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -253,11 +253,11 @@
return pagingStore;
}
- public boolean page(final boolean duplicateDetection) throws Exception
+ public boolean page() throws Exception
{
if (pagingStore != null)
{
- return pagingStore.page(this, duplicateDetection);
+ return pagingStore.page(this);
}
else
{
@@ -265,11 +265,11 @@
}
}
- public boolean page(final long transactionID, final boolean duplicateDetection) throws Exception
+ public boolean page(final long transactionID) throws Exception
{
if (pagingStore != null)
{
- return pagingStore.page(this, transactionID, duplicateDetection);
+ return pagingStore.page(this, transactionID);
}
else
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -32,7 +32,6 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -129,14 +128,6 @@
{
failSession(session, latch);
}
- else
- {
- endpoint = (ReplicationEndpointImpl)((HornetQServerImpl)server1Service).getReplicationEndpoint();
- if (endpoint != null)
- {
- endpoint.setDeletePages(false);
- }
- }
session.start();
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -357,9 +357,11 @@
* @throws Exception
* @see org.hornetq.core.paging.Page#delete()
*/
- public void delete() throws Exception
+ public boolean delete() throws Exception
{
- // This will let the file stay, simulating a system failure
+
+ System.out.println("Won't delete");
+ return false;
}
/**
Added: trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.postoffice.DuplicateIDCache;
+import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A DuplicateCacheTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DuplicateCacheTest extends StorageManagerTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testDuplicate() throws Exception
+ {
+ createStorage();
+
+ DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, true);
+
+ TransactionImpl tx = new TransactionImpl(journal);
+
+ for (int i = 0 ; i < 5000; i++)
+ {
+ byte [] bytes = RandomUtil.randomBytes();
+
+ cache.addToCache(bytes, tx);
+ }
+
+ tx.commit();
+
+ tx = new TransactionImpl(journal);
+
+ for (int i = 0 ; i < 5000; i++)
+ {
+ byte [] bytes = RandomUtil.randomBytes();
+
+ cache.addToCache(bytes, tx);
+ }
+
+ tx.commit();
+
+ byte[] id = RandomUtil.randomBytes();
+
+ assertFalse(cache.contains(id));
+
+ cache.addToCache(id, null);
+
+ assertTrue(cache.contains(id));
+
+ cache.deleteFromCache(id);
+
+ assertFalse(cache.contains(id));
+
+ cache.deleteFromCache(id);
+
+ }
+
+
+ public void testDuplicateNonPersistent() throws Exception
+ {
+ createStorage();
+
+ DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, false);
+
+ TransactionImpl tx = new TransactionImpl(journal);
+
+ for (int i = 0 ; i < 5000; i++)
+ {
+ byte [] bytes = RandomUtil.randomBytes();
+
+ cache.addToCache(bytes, tx);
+ }
+
+ tx.commit();
+
+ for (int i = 0 ; i < 5000; i++)
+ {
+ byte [] bytes = RandomUtil.randomBytes();
+
+ cache.addToCache(bytes, null);
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -33,6 +33,7 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.OrderedExecutorFactory;
@@ -62,10 +63,15 @@
AddressSettings settings = new AddressSettings();
settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setDefault(settings);
+
+
+ PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(getPageDir(),
+ new OrderedExecutorFactory(Executors.newCachedThreadPool()),
+ true);
+
+ storeFactory.setPostOffice(new FakePostOffice());
- PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(),
- new OrderedExecutorFactory(Executors.newCachedThreadPool()),
- true),
+ PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory,
new NullStorageManager(),
addressSettings);
@@ -75,11 +81,11 @@
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
- Assert.assertFalse(store.page(msg, true));
+ Assert.assertFalse(store.page(msg));
store.startPaging();
- Assert.assertTrue(store.page(msg, true));
+ Assert.assertTrue(store.page(msg));
Page page = store.depage();
@@ -91,7 +97,7 @@
Assert.assertEquals(1, msgs.size());
- UnitTestCase.assertEqualsByteArrays(msg.getBodyBuffer().toByteBuffer().array(), msgs.get(0)
+ UnitTestCase.assertEqualsByteArrays(msg.getBodyBuffer().writerIndex(), msg.getBodyBuffer().toByteBuffer().array(), msgs.get(0)
.getMessage(null)
.getBodyBuffer()
.toByteBuffer()
@@ -101,7 +107,7 @@
Assert.assertNull(store.depage());
- Assert.assertFalse(store.page(msg, true));
+ Assert.assertFalse(store.page(msg));
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -216,7 +216,7 @@
Assert.assertTrue(storeImpl.isPaging());
- Assert.assertTrue(storeImpl.page(msg, true));
+ Assert.assertTrue(storeImpl.page(msg));
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -279,7 +279,7 @@
ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
- Assert.assertTrue(storeImpl.page(msg, true));
+ Assert.assertTrue(storeImpl.page(msg));
}
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -359,7 +359,7 @@
ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
- Assert.assertTrue(storeImpl.page(msg, true));
+ Assert.assertTrue(storeImpl.page(msg));
}
Assert.assertEquals(2, storeImpl.getNumberOfPages());
@@ -395,7 +395,7 @@
ServerMessage msg = createMessage(1, storeImpl, destination, buffers.get(0));
- Assert.assertTrue(storeImpl.page(msg, true));
+ Assert.assertTrue(storeImpl.page(msg));
Page newPage = storeImpl.depage();
@@ -413,11 +413,11 @@
Assert.assertFalse(storeImpl.isPaging());
- Assert.assertFalse(storeImpl.page(msg, true));
+ Assert.assertFalse(storeImpl.page(msg));
storeImpl.startPaging();
- Assert.assertTrue(storeImpl.page(msg, true));
+ Assert.assertTrue(storeImpl.page(msg));
Page page = storeImpl.depage();
@@ -513,7 +513,7 @@
// This is possible because the depage thread is not actually reading the pages.
// Just using the internal API to remove it from the page file system
ServerMessage msg = createMessage(id, storeImpl, destination, createRandomBuffer(id, 5));
- if (storeImpl.page(msg, false))
+ if (storeImpl.page(msg))
{
buffers.put(id, msg);
}
@@ -658,7 +658,7 @@
long lastMessageId = messageIdGenerator.incrementAndGet();
ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination, createRandomBuffer(lastMessageId, 5));
- storeImpl2.page(lastMsg, true);
+ storeImpl2.page(lastMsg);
buffers2.put(lastMessageId, lastMsg);
Page lastPage = null;
@@ -685,10 +685,9 @@
ServerMessage msgWritten = buffers2.remove(id);
Assert.assertNotNull(msgWritten);
Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
- UnitTestCase.assertEqualsByteArrays(msgWritten.getBodyBuffer().toByteBuffer().array(), msg.getMessage(null)
- .getBodyBuffer()
- .toByteBuffer()
- .array());
+ UnitTestCase.assertEqualsByteArrays(msgWritten.getBodyBuffer().writerIndex(),
+ msgWritten.getBodyBuffer().toByteBuffer().array(),
+ msg.getMessage(null).getBodyBuffer().toByteBuffer().array());
}
}
@@ -814,7 +813,7 @@
{
return null;
}
-
+
public void deletePageStore(SimpleString storeName) throws Exception
{
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -398,13 +398,13 @@
return null;
}
- public boolean page(final boolean duplicateDetection) throws Exception
+ public boolean page() throws Exception
{
// TODO Auto-generated method stub
return false;
}
- public boolean page(final long transactionID, final boolean duplicateDetection) throws Exception
+ public boolean page(final long transactionID) throws Exception
{
// TODO Auto-generated method stub
return false;
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2010-08-06 16:58:45 UTC (rev 9515)
@@ -15,10 +15,12 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.RoutingContext;
@@ -88,8 +90,7 @@
*/
public DuplicateIDCache getDuplicateIDCache(final SimpleString address)
{
- // TODO Auto-generated method stub
- return null;
+ return new DuplicateIDCacheImpl(address, 2000, new NullStorageManager(), false);
}
/* (non-Javadoc)
More information about the hornetq-commits
mailing list