[hornetq-commits] JBoss hornetq SVN: r9515 - in trunk: src/main/org/hornetq/core/paging/impl and 11 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Aug 6 12:58:46 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-06 12:58:45 -0400 (Fri, 06 Aug 2010)
New Revision: 9515

Added:
   trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java
Modified:
   trunk/src/main/org/hornetq/core/paging/Page.java
   trunk/src/main/org/hornetq/core/paging/PagingStore.java
   trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java
   trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/src/main/org/hornetq/core/server/ServerMessage.java
   trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
   trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
HORNETQ-472 - Avoid excessive compression on journal after depaging

Modified: trunk/src/main/org/hornetq/core/paging/Page.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/Page.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/Page.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -39,5 +39,5 @@
 
    void close() throws Exception;
 
-   void delete() throws Exception;
+   boolean delete() throws Exception;
 }

Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -49,9 +49,9 @@
 
    void sync() throws Exception;
 
-   boolean page(ServerMessage message, long transactionId, boolean duplicateDetection) throws Exception;
+   boolean page(ServerMessage message, long transactionId) throws Exception;
 
-   boolean page(ServerMessage message, boolean duplicateDetection) throws Exception;
+   boolean page(ServerMessage message) throws Exception;
 
    Page createPage(final int page) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -196,24 +196,34 @@
       file.close();
    }
 
-   public void delete() throws Exception
+   public boolean delete() throws Exception
    {
       if (storageManager != null)
       {
          storageManager.pageDeleted(storeName, pageId);
       }
 
-      if (suspiciousRecords)
+      try
       {
-         PageImpl.log.warn("File " + file.getFileName() +
-                           " being renamed to " +
-                           file.getFileName() +
-                           ".invalidPage as it was loaded partially. Please verify your data.");
-         file.renameTo(file.getFileName() + ".invalidPage");
+         if (suspiciousRecords)
+         {
+            PageImpl.log.warn("File " + file.getFileName() +
+                              " being renamed to " +
+                              file.getFileName() +
+                              ".invalidPage as it was loaded partially. Please verify your data.");
+            file.renameTo(file.getFileName() + ".invalidPage");
+         }
+         else
+         {
+            file.delete();
+         }
+         
+         return true;
       }
-      else
+      catch (Exception e)
       {
-         file.delete();
+         log.warn("Error while deleting page file", e);
+         return false;
       }
    }
 

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.paging.impl;
 
-import java.nio.ByteBuffer;
 import java.text.DecimalFormat;
 import java.util.HashSet;
 import java.util.List;
@@ -29,7 +28,6 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
@@ -41,6 +39,7 @@
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.PagingStoreFactory;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.ServerMessage;
@@ -110,6 +109,9 @@
    private volatile Page currentPage;
 
    private final ReentrantLock writeLock = new ReentrantLock();
+   
+   /** duplicate cache used at this address */
+   private final DuplicateIDCache duplicateCache;
 
    /** 
     * We need to perform checks on currentPage with minimal locking
@@ -183,6 +185,17 @@
       this.storeFactory = storeFactory;
 
       this.syncNonTransactional = syncNonTransactional;
+      
+      // Post office could be null on the backup node
+      if (postOffice == null)
+      {
+         this.duplicateCache = null;
+      }
+      else
+      {
+         this.duplicateCache = postOffice.getDuplicateIDCache(storeName);
+      }
+      
    }
 
    // Public --------------------------------------------------------
@@ -249,17 +262,17 @@
       return storeName;
    }
 
-   public boolean page(final ServerMessage message, final long transactionID, final boolean duplicateDetection) throws Exception
+   public boolean page(final ServerMessage message, final long transactionID) throws Exception
    {
       // The sync on transactions is done on commit only
-      return page(message, transactionID, false, duplicateDetection);
+      return page(message, transactionID, false);
    }
 
-   public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
+   public boolean page(final ServerMessage message) throws Exception
    {
       // If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
       // of crash
-      return page(message, -1, syncNonTransactional && message.isDurable(), duplicateDetection);
+      return page(message, -1, syncNonTransactional && message.isDurable());
    }
 
    public void sync() throws Exception
@@ -635,7 +648,15 @@
 
       if (onDepage(page.getPageId(), storeName, messages))
       {
-         page.delete();
+         if (page.delete())
+         {
+            // DuplicateCache could be null during replication;
+            // however, the deletes on the journal will happen through the replicated journal
+            if (duplicateCache != null)
+            {
+               duplicateCache.deleteFromCache(generateDuplicateID(page.getPageId()));
+            }
+         }
 
          return true;
       }
@@ -777,8 +798,7 @@
 
    private boolean page(final ServerMessage message,
                         final long transactionID,
-                        final boolean sync,
-                        final boolean duplicateDetection) throws Exception
+                        final boolean sync) throws Exception
    {
       if (!running)
       {
@@ -836,20 +856,6 @@
             return false;
          }
 
-         if (duplicateDetection)
-         {
-            // We set the duplicate detection header to prevent the message being depaged more than once in case of
-            // failure during depage
-
-            byte[] bytes = new byte[8];
-
-            ByteBuffer buff = ByteBuffer.wrap(bytes);
-
-            buff.putLong(message.getMessageID());
-
-            message.putBytesProperty(Message.HDR_DUPLICATE_DETECTION_ID, bytes);
-         }
-
          PagedMessage pagedMessage;
          
          if (!message.isDurable())
@@ -933,9 +939,23 @@
 
       // Depage has to be done atomically, in case of failure it should be
       // back to where it was
-
+      
+      byte[] duplicateIdForPage = generateDuplicateID(pageId);
+      
       Transaction depageTransaction = new TransactionImpl(storageManager);
 
+      // DuplicateCache could be null during replication
+      if (duplicateCache != null)
+      {
+         if (duplicateCache.contains(duplicateIdForPage))
+         {
+            log.warn("Page " + pageId + " had been processed already but the file wasn't removed as a crash happened. Ignoring this page");
+            return true;
+         }
+
+         duplicateCache.addToCache(duplicateIdForPage, depageTransaction);
+      }
+
       depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
 
       HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
@@ -1057,8 +1077,18 @@
    }
 
    /**
+    * @param pageId
     * @return
     */
+   private byte[] generateDuplicateID(final int pageId)
+   {
+      byte duplicateIdForPage[] = new SimpleString("page-" + pageId).getData();
+      return duplicateIdForPage;
+   }
+
+   /**
+    * @return
+    */
    private boolean isAddressFull(final long nextPageSize)
    {
       return maxSize > 0 && getAddressSize() + nextPageSize > maxSize;

Modified: trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/postoffice/DuplicateIDCache.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -32,6 +32,8 @@
    boolean contains(byte[] duplicateID);
 
    void addToCache(byte[] duplicateID, Transaction tx) throws Exception;
+   
+   void deleteFromCache(byte [] duplicateID) throws Exception;
 
    void load(List<Pair<byte[], Long>> theIds) throws Exception;
 }

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -17,7 +17,8 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
@@ -43,7 +44,8 @@
 {
    private static final Logger log = Logger.getLogger(DuplicateIDCacheImpl.class);
 
-   private final Set<ByteArrayHolder> cache = new org.hornetq.utils.ConcurrentHashSet<ByteArrayHolder>();
+   // ByteHolder, position
+   private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<ByteArrayHolder, Integer>();
 
    private final SimpleString address;
 
@@ -89,7 +91,7 @@
 
             Pair<ByteArrayHolder, Long> pair = new Pair<ByteArrayHolder, Long>(bah, id.b);
 
-            cache.add(bah);
+            cache.put(bah, ids.size());
 
             ids.add(pair);
          }
@@ -120,20 +122,52 @@
       }
 
    }
+   
+   
+   public void deleteFromCache(byte [] duplicateID) throws Exception
+   {
+      ByteArrayHolder bah = new ByteArrayHolder(duplicateID);
+      
+      Integer posUsed = cache.remove(bah);
+      
+      if (posUsed != null)
+      {
+         Pair<ByteArrayHolder, Long> id;
+   
+         synchronized (this)
+         {
+            id = ids.get(posUsed.intValue());
+            
+            if (id.a.equals(bah))
+            {
+               id.a = null;
+               storageManager.deleteDuplicateID(id.b);
+               id.b = null;
+            }
+            else
+            {
+               System.out.println("Can't delete duplicateID");
+            }
+         }
+      }
+      
+   }
+   
 
    public boolean contains(final byte[] duplID)
    {
-      return cache.contains(new ByteArrayHolder(duplID));
+      return cache.get(new ByteArrayHolder(duplID)) != null;
    }
 
    public synchronized void addToCache(final byte[] duplID, final Transaction tx) throws Exception
    {
-      long recordID = storageManager.generateUniqueID();
+      long recordID = -1;
 
       if (tx == null)
       {
          if (persist)
          {
+            recordID = storageManager.generateUniqueID();
             storageManager.storeDuplicateID(address, duplID, recordID);
          }
 
@@ -143,6 +177,7 @@
       {
          if (persist)
          {
+            recordID = storageManager.generateUniqueID();
             storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
 
             tx.setContainsPersistent();
@@ -156,7 +191,9 @@
 
    private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID)
    {
-      cache.add(new ByteArrayHolder(duplID));
+      ByteArrayHolder holder = new ByteArrayHolder(duplID);
+      
+      cache.put(holder, pos);
 
       Pair<ByteArrayHolder, Long> id;
 
@@ -165,32 +202,43 @@
          // Need fast array style access here -hence ArrayList typing
          id = ids.get(pos);
 
-         cache.remove(id.a);
-
-         // Record already exists - we delete the old one and add the new one
-         // Note we can't use update since journal update doesn't let older records get
-         // reclaimed
-         id.a = new ByteArrayHolder(duplID);
-
-         if (persist)
+         // The id here might be null if it was explicitly deleted
+         if (id.a != null)
          {
-            try
+            cache.remove(id.a);
+   
+            // Record already exists - we delete the old one and add the new one
+            // Note we can't use update since journal update doesn't let older records get
+            // reclaimed
+   
+            if (id.b != null)
             {
-               storageManager.deleteDuplicateID(id.b);
+               try
+               {
+                  storageManager.deleteDuplicateID(id.b);
+               }
+               catch (Exception e)
+               {
+                  DuplicateIDCacheImpl.log.warn("Error on deleting duplicate cache", e);
+               }
             }
-            catch (Exception e)
-            {
-               DuplicateIDCacheImpl.log.warn("Error on deleting duplicate cache", e);
-            }
-   
-            id.b = recordID;
          }
+
+         id.a = holder;
+
+         // The recordID could be negative if the duplicateCache is configured to not persist, 
+         // -1 would mean null in this case
+         id.b = recordID >= 0 ? recordID : null;
+         
+         holder.pos = pos;
       }
       else
       {
-         id = new Pair<ByteArrayHolder, Long>(new ByteArrayHolder(duplID), recordID);
+         id = new Pair<ByteArrayHolder, Long>(holder, recordID >= 0 ? recordID : null);
 
          ids.add(id);
+         
+         holder.pos = pos;
       }
 
       if (pos++ == cacheSize - 1)
@@ -270,6 +318,8 @@
       final byte[] bytes;
 
       int hash;
+      
+      int pos;
 
       @Override
       public boolean equals(final Object other)

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -604,7 +604,7 @@
 
       if (context.getTransaction() == null)
       {
-         if (message.page(true))
+         if (message.page())
          {
             return;
          }
@@ -1206,11 +1206,9 @@
 
             Set<PagingStore> pagingStoresToSync = new HashSet<PagingStore>();
 
-            // We only need to add the dupl id header once per transaction
-            boolean first = true;
             for (ServerMessage message : messagesToPage)
             {
-               if (message.page(tx.getID(), first))
+               if (message.page(tx.getID()))
                {
                   if (message.isDurable())
                   {
@@ -1231,7 +1229,6 @@
                   }
                   route(message, subTX, false);
                }
-               first = false;
             }
 
             if (pagingPersistent)

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -264,6 +264,8 @@
       }
 
       largeMessages.clear();
+      
+      pageManager.stop();
    }
 
    /* (non-Javadoc)

Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -56,9 +56,9 @@
 
    PagingStore getPagingStore();
 
-   boolean page(boolean duplicateDetection) throws Exception;
+   boolean page() throws Exception;
 
-   boolean page(long transactionID, boolean duplicateDetection) throws Exception;
+   boolean page(long transactionID) throws Exception;
 
    boolean storeIsPaging();
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -253,11 +253,11 @@
       return pagingStore;
    }
 
-   public boolean page(final boolean duplicateDetection) throws Exception
+   public boolean page() throws Exception
    {
       if (pagingStore != null)
       {
-         return pagingStore.page(this, duplicateDetection);
+         return pagingStore.page(this);
       }
       else
       {
@@ -265,11 +265,11 @@
       }
    }
 
-   public boolean page(final long transactionID, final boolean duplicateDetection) throws Exception
+   public boolean page(final long transactionID) throws Exception
    {
       if (pagingStore != null)
       {
-         return pagingStore.page(this, transactionID, duplicateDetection);
+         return pagingStore.page(this, transactionID);
       }
       else
       {

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -32,7 +32,6 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
 import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 
@@ -129,14 +128,6 @@
          {
             failSession(session, latch);
          }
-         else
-         {
-            endpoint = (ReplicationEndpointImpl)((HornetQServerImpl)server1Service).getReplicationEndpoint();
-            if (endpoint != null)
-            {
-               endpoint.setDeletePages(false);
-            }
-         }
 
          session.start();
 

Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -357,9 +357,11 @@
           * @throws Exception
           * @see org.hornetq.core.paging.Page#delete()
           */
-         public void delete() throws Exception
+         public boolean delete() throws Exception
          {
-            // This will let the file stay, simulating a system failure
+            
+            System.out.println("Won't delete");
+            return false;
          }
 
          /**

Added: trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/DuplicateCacheTest.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.persistence;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.postoffice.DuplicateIDCache;
+import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
+import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.util.RandomUtil;
+
+/**
+ * A DuplicateCacheTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class DuplicateCacheTest extends StorageManagerTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   public void testDuplicate() throws Exception
+   {
+      createStorage();
+      
+      DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, true);
+      
+      TransactionImpl tx = new TransactionImpl(journal);
+      
+      for (int i = 0 ; i < 5000; i++)
+      {
+         byte [] bytes = RandomUtil.randomBytes();
+         
+         cache.addToCache(bytes, tx);
+      }
+      
+      tx.commit();
+      
+      tx = new TransactionImpl(journal);
+      
+      for (int i = 0 ; i < 5000; i++)
+      {
+         byte [] bytes = RandomUtil.randomBytes();
+         
+         cache.addToCache(bytes, tx);
+      }
+      
+      tx.commit();
+      
+      byte[] id = RandomUtil.randomBytes();
+      
+      assertFalse(cache.contains(id));
+      
+      cache.addToCache(id, null);
+      
+      assertTrue(cache.contains(id));
+      
+      cache.deleteFromCache(id);
+      
+      assertFalse(cache.contains(id));
+      
+      cache.deleteFromCache(id);
+      
+   }
+
+   
+   public void testDuplicateNonPersistent() throws Exception
+   {
+      createStorage();
+      
+      DuplicateIDCache cache = new DuplicateIDCacheImpl(new SimpleString("test"), 2000, journal, false);
+      
+      TransactionImpl tx = new TransactionImpl(journal);
+      
+      for (int i = 0 ; i < 5000; i++)
+      {
+         byte [] bytes = RandomUtil.randomBytes();
+         
+         cache.addToCache(bytes, tx);
+      }
+      
+      tx.commit();
+      
+      for (int i = 0 ; i < 5000; i++)
+      {
+         byte [] bytes = RandomUtil.randomBytes();
+         
+         cache.addToCache(bytes, null);
+      }
+      
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -33,6 +33,7 @@
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.UnitTestCase;
 import org.hornetq.utils.OrderedExecutorFactory;
@@ -62,10 +63,15 @@
       AddressSettings settings = new AddressSettings();
       settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
       addressSettings.setDefault(settings);
+      
+      
+      PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(getPageDir(),
+                                new OrderedExecutorFactory(Executors.newCachedThreadPool()),
+                                true);
+      
+      storeFactory.setPostOffice(new FakePostOffice());
 
-      PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(),
-                                                                                      new OrderedExecutorFactory(Executors.newCachedThreadPool()),
-                                                                                      true),
+      PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory,
                                                             new NullStorageManager(),
                                                             addressSettings);
 
@@ -75,11 +81,11 @@
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
 
-      Assert.assertFalse(store.page(msg, true));
+      Assert.assertFalse(store.page(msg));
 
       store.startPaging();
 
-      Assert.assertTrue(store.page(msg, true));
+      Assert.assertTrue(store.page(msg));
 
       Page page = store.depage();
 
@@ -91,7 +97,7 @@
 
       Assert.assertEquals(1, msgs.size());
 
-      UnitTestCase.assertEqualsByteArrays(msg.getBodyBuffer().toByteBuffer().array(), msgs.get(0)
+      UnitTestCase.assertEqualsByteArrays(msg.getBodyBuffer().writerIndex(), msg.getBodyBuffer().toByteBuffer().array(), msgs.get(0)
                                                                                           .getMessage(null)
                                                                                           .getBodyBuffer()
                                                                                           .toByteBuffer()
@@ -101,7 +107,7 @@
 
       Assert.assertNull(store.depage());
 
-      Assert.assertFalse(store.page(msg, true));
+      Assert.assertFalse(store.page(msg));
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -216,7 +216,7 @@
 
       Assert.assertTrue(storeImpl.isPaging());
 
-      Assert.assertTrue(storeImpl.page(msg, true));
+      Assert.assertTrue(storeImpl.page(msg));
 
       Assert.assertEquals(1, storeImpl.getNumberOfPages());
 
@@ -279,7 +279,7 @@
 
          ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
 
-         Assert.assertTrue(storeImpl.page(msg, true));
+         Assert.assertTrue(storeImpl.page(msg));
       }
 
       Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -359,7 +359,7 @@
 
          ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
 
-         Assert.assertTrue(storeImpl.page(msg, true));
+         Assert.assertTrue(storeImpl.page(msg));
       }
 
       Assert.assertEquals(2, storeImpl.getNumberOfPages());
@@ -395,7 +395,7 @@
 
       ServerMessage msg = createMessage(1, storeImpl, destination, buffers.get(0));
 
-      Assert.assertTrue(storeImpl.page(msg, true));
+      Assert.assertTrue(storeImpl.page(msg));
 
       Page newPage = storeImpl.depage();
 
@@ -413,11 +413,11 @@
 
       Assert.assertFalse(storeImpl.isPaging());
 
-      Assert.assertFalse(storeImpl.page(msg, true));
+      Assert.assertFalse(storeImpl.page(msg));
 
       storeImpl.startPaging();
 
-      Assert.assertTrue(storeImpl.page(msg, true));
+      Assert.assertTrue(storeImpl.page(msg));
 
       Page page = storeImpl.depage();
 
@@ -513,7 +513,7 @@
                   // This is possible because the depage thread is not actually reading the pages.
                   // Just using the internal API to remove it from the page file system
                   ServerMessage msg = createMessage(id, storeImpl, destination, createRandomBuffer(id, 5));
-                  if (storeImpl.page(msg, false))
+                  if (storeImpl.page(msg))
                   {
                      buffers.put(id, msg);
                   }
@@ -658,7 +658,7 @@
       long lastMessageId = messageIdGenerator.incrementAndGet();
       ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination, createRandomBuffer(lastMessageId, 5));
 
-      storeImpl2.page(lastMsg, true);
+      storeImpl2.page(lastMsg);
       buffers2.put(lastMessageId, lastMsg);
 
       Page lastPage = null;
@@ -685,10 +685,9 @@
             ServerMessage msgWritten = buffers2.remove(id);
             Assert.assertNotNull(msgWritten);
             Assert.assertEquals(msg.getMessage(null).getAddress(), msgWritten.getAddress());
-            UnitTestCase.assertEqualsByteArrays(msgWritten.getBodyBuffer().toByteBuffer().array(), msg.getMessage(null)
-                                                                                                      .getBodyBuffer()
-                                                                                                      .toByteBuffer()
-                                                                                                      .array());
+            UnitTestCase.assertEqualsByteArrays(msgWritten.getBodyBuffer().writerIndex(),
+                                                msgWritten.getBodyBuffer().toByteBuffer().array(),
+                                                msg.getMessage(null).getBodyBuffer().toByteBuffer().array());
          }
       }
 
@@ -814,7 +813,7 @@
       {
          return null;
       }
-      
+
       public void deletePageStore(SimpleString storeName) throws Exception
       {
       }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -398,13 +398,13 @@
          return null;
       }
 
-      public boolean page(final boolean duplicateDetection) throws Exception
+      public boolean page() throws Exception
       {
          // TODO Auto-generated method stub
          return false;
       }
 
-      public boolean page(final long transactionID, final boolean duplicateDetection) throws Exception
+      public boolean page(final long transactionID) throws Exception
       {
          // TODO Auto-generated method stub
          return false;

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2010-08-06 12:52:15 UTC (rev 9514)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java	2010-08-06 16:58:45 UTC (rev 9515)
@@ -15,10 +15,12 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.RoutingContext;
@@ -88,8 +90,7 @@
     */
    public DuplicateIDCache getDuplicateIDCache(final SimpleString address)
    {
-      // TODO Auto-generated method stub
-      return null;
+      return new DuplicateIDCacheImpl(address, 2000, new NullStorageManager(), false);
    }
 
    /* (non-Javadoc)



More information about the hornetq-commits mailing list