[hornetq-commits] JBoss hornetq SVN: r9607 - in branches/Branch_2_1: src/main/org/hornetq/core/paging and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Aug 29 16:01:45 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-29 16:01:42 -0400 (Sun, 29 Aug 2010)
New Revision: 9607

Modified:
   branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java
   branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-497 & https://jira.jboss.org/browse/HORNETQ-485 - Fixes on Paging and Journal

Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -108,7 +108,7 @@
 
    private static final void traceRecord(final String message)
    {
-      System.out.println(message);
+      JournalImpl.log.trace(message);
    }
 
    // The sizes of primitive types
@@ -838,7 +838,11 @@
 
             if (JournalImpl.TRACE_RECORDS)
             {
-               JournalImpl.traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+               JournalImpl.traceRecord("appendAddRecord::id=" + id +
+                                       ", userRecordType=" +
+                                       recordType +
+                                       ", usedFile = " +
+                                       usedFile);
             }
 
             records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
@@ -919,7 +923,11 @@
 
             if (JournalImpl.TRACE_RECORDS)
             {
-               JournalImpl.traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+               JournalImpl.traceRecord("appendUpdateRecord::id=" + id +
+                                       ", userRecordType=" +
+                                       recordType +
+                                       ", usedFile = " +
+                                       usedFile);
             }
 
             // record== null here could only mean there is a compactor, and computing the delete should be done after
@@ -1060,6 +1068,8 @@
                JournalImpl.traceRecord("appendAddRecordTransactional:txID=" + txID +
                                        ",id=" +
                                        id +
+                                       ", userRecordType=" +
+                                       recordType +
                                        ", usedFile = " +
                                        usedFile);
             }
@@ -1113,6 +1123,8 @@
                JournalImpl.traceRecord("appendUpdateRecordTransactional::txID=" + txID +
                                        ",id=" +
                                        id +
+                                       ", userRecordType=" +
+                                       recordType +
                                        ", usedFile = " +
                                        usedFile);
             }
@@ -2105,6 +2117,8 @@
 
       filesRepository.pushOpenedFile();
 
+      state = JournalImpl.STATE_LOADED;
+
       for (TransactionHolder transaction : loadTransactions.values())
       {
          if (!transaction.prepared || transaction.invalid)
@@ -2112,19 +2126,9 @@
             JournalImpl.log.warn("Uncommitted transaction with id " + transaction.transactionID +
                                  " found and discarded");
 
-            JournalTransaction transactionInfo = transactions.get(transaction.transactionID);
+            // I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
+            this.appendRollbackRecord(transaction.transactionID, false);
 
-            if (transactionInfo == null)
-            {
-               throw new IllegalStateException("Cannot find tx " + transaction.transactionID);
-            }
-
-            // Reverse the refs
-            transactionInfo.forget();
-
-            // Remove the transactionInfo
-            transactions.remove(transaction.transactionID);
-
             loadManager.failedTransaction(transaction.transactionID,
                                           transaction.recordInfos,
                                           transaction.recordsToDelete);
@@ -2149,8 +2153,6 @@
          }
       }
 
-      state = JournalImpl.STATE_LOADED;
-
       checkReclaimStatus();
 
       return new JournalLoadInformation(records.size(), maxID.longValue());

Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -14,6 +14,8 @@
 package org.hornetq.core.paging;
 
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.transaction.Transaction;
 
 /**
  * 
@@ -37,10 +39,15 @@
    void setRecordID(long id);
 
    long getTransactionID();
+   
+   void store(StorageManager storageManager,Transaction tx) throws Exception;
+   
+   void storeUpdate(StorageManager storageManager, Transaction tx, int depages) throws Exception;
 
-   int increment();
+   // To be used after the update was stored or reload
+   void update(int update, StorageManager storageManager);
 
-   int decrement();
+   void increment();
 
    int getNumberOfMessages();
 

Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -18,7 +18,11 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.utils.DataConstants;
 
 /**
@@ -30,11 +34,13 @@
 {
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(PageTransactionInfoImpl.class);
+
    // Attributes ----------------------------------------------------
 
    private long transactionID;
 
-   private volatile long recordID;
+   private volatile long recordID = -1;
 
    private volatile CountDownLatch countDownCompleted;
 
@@ -42,7 +48,7 @@
 
    private volatile boolean rolledback;
 
-   private final AtomicInteger numberOfMessages = new AtomicInteger(0);
+   private AtomicInteger numberOfMessages = new AtomicInteger(0);
 
    // Static --------------------------------------------------------
 
@@ -75,21 +81,25 @@
       return transactionID;
    }
 
-   public int increment()
+   public void update(final int update, final StorageManager storageManager)
    {
-      return numberOfMessages.incrementAndGet();
+      int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
+      if (sizeAfterUpdate == 0 && storageManager != null)
+      {
+         try
+         {
+            storageManager.deletePageTransactional(this.recordID);
+         }
+         catch (Exception e)
+         {
+            log.warn("Can't delete page transaction id=" + this.recordID);
+         }
+      }
    }
 
-   public int decrement()
+   public void increment()
    {
-      final int value = numberOfMessages.decrementAndGet();
-
-      if (value < 0)
-      {
-         throw new IllegalStateException("Internal error Negative value on Paging transactions!");
-      }
-
-      return value;
+      numberOfMessages.incrementAndGet();
    }
 
    public int getNumberOfMessages()
@@ -103,10 +113,8 @@
    {
       transactionID = buffer.readLong();
       numberOfMessages.set(buffer.readInt());
-      countDownCompleted = null; // if it is being readed, probably it was
-      // committed
-      committed = true; // Unless it is a incomplete prepare, which is marked by
-      // markIcomplete
+      countDownCompleted = null;
+      committed = true;
    }
 
    public synchronized void encode(final HornetQBuffer buffer)
@@ -141,6 +149,49 @@
       }
    }
 
+   public void store(final StorageManager storageManager, final Transaction tx) throws Exception
+   {
+      storageManager.storePageTransaction(tx.getID(), this);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
+    */
+   public void storeUpdate(final StorageManager storageManager, final Transaction tx, final int depages) throws Exception
+   {
+      storageManager.updatePageTransaction(tx.getID(), this, depages);
+      
+      final PageTransactionInfo pgToUpdate = this;
+      
+      tx.addOperation(new TransactionOperation()
+      {
+         public void beforeRollback(Transaction tx) throws Exception
+         {
+         }
+         
+         public void beforePrepare(Transaction tx) throws Exception
+         {
+         }
+         
+         public void beforeCommit(Transaction tx) throws Exception
+         {
+         }
+         
+         public void afterRollback(Transaction tx)
+         {
+         }
+         
+         public void afterPrepare(Transaction tx)
+         {
+         }
+         
+         public void afterCommit(Transaction tx)
+         {
+            pgToUpdate.update(depages, storageManager);
+         }
+      });
+   }
+
    public boolean isCommit()
    {
       return committed;
@@ -166,6 +217,16 @@
       countDownCompleted = new CountDownLatch(1);
    }
 
+   public String toString()
+   {
+      return "PageTransactionInfoImpl(transactionID=" + transactionID +
+             ",id=" +
+             recordID +
+             ",numberOfMessages=" +
+             numberOfMessages +
+             ")";
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -14,8 +14,9 @@
 package org.hornetq.core.paging.impl;
 
 import java.text.DecimalFormat;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
@@ -958,7 +959,7 @@
 
       depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
 
-      HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+      HashMap<PageTransactionInfo, AtomicInteger> pageTransactionsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
 
       for (PagedMessage pagedMessage : pagedMessages)
       {
@@ -978,6 +979,7 @@
          final long transactionIdDuringPaging = pagedMessage.getTransactionID();
 
          PageTransactionInfo pageUserTransaction = null;
+         AtomicInteger countPageTX = null;
 
          if (transactionIdDuringPaging >= 0)
          {
@@ -992,6 +994,12 @@
             }
             else
             {
+               countPageTX = pageTransactionsToUpdate.get(pageUserTransaction);
+               if (countPageTX == null)
+               {
+                  countPageTX = new AtomicInteger();
+                  pageTransactionsToUpdate.put(pageUserTransaction, countPageTX);
+               }
 
                // This is to avoid a race condition where messages are depaged
                // before the commit arrived
@@ -1036,8 +1044,7 @@
          // This needs to be done after routing because of duplication detection
          if (pageUserTransaction != null && message.isDurable())
          {
-            pageUserTransaction.decrement();
-            pageTransactionsToUpdate.add(pageUserTransaction);
+            countPageTX.incrementAndGet();
          }
       }
 
@@ -1047,21 +1054,12 @@
          return false;
       }
 
-      for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
+      for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : pageTransactionsToUpdate.entrySet())
       {
          // This will set the journal transaction to commit;
          depageTransaction.setContainsPersistent();
-
-         if (pageWithTransaction.getNumberOfMessages() == 0)
-         {
-            // numberOfReads==numberOfWrites -> We delete the record
-            storageManager.deletePageTransactional(depageTransaction.getID(), pageWithTransaction.getRecordID());
-            pagingManager.removeTransaction(pageWithTransaction.getTransactionID());
-         }
-         else
-         {
-            storageManager.storePageTransaction(depageTransaction.getID(), pageWithTransaction);
-         }
+         
+         entry.getKey().storeUpdate(storageManager, depageTransaction, entry.getValue().intValue());
       }
 
       depageTransaction.commit();

Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -134,8 +134,10 @@
    void rollback(long txID) throws Exception;
 
    void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
+   
+   void updatePageTransaction(long txID, PageTransactionInfo pageTransaction,  int depage) throws Exception;
 
-   void deletePageTransactional(long txID, long recordID) throws Exception;
+   void deletePageTransactional(long recordID) throws Exception;
 
    /** This method is only useful at the backup side. We only load internal structures making the journals ready for
     *  append mode on the backup side. */

Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -575,13 +575,6 @@
 
    public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
    {
-      if (pageTransaction.getRecordID() != 0)
-      {
-         // Instead of updating the record, we delete the old one as that is
-         // better for reclaiming
-         messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
-      }
-
       pageTransaction.setRecordID(generateUniqueID());
 
       messageJournal.appendAddRecordTransactional(txID,
@@ -590,6 +583,11 @@
                                                   pageTransaction);
    }
 
+   public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
+   {
+      messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalStorageManager.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages));
+   }
+
    public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
       messageJournal.appendUpdateRecordTransactional(txID,
@@ -623,9 +621,9 @@
       messageJournal.appendDeleteRecord(id, true, getContext(true));
    }
 
-   public void deletePageTransactional(final long txID, final long recordID) throws Exception
+   public void deletePageTransactional(final long recordID) throws Exception
    {
-      messageJournal.appendDeleteRecordTransactional(txID, recordID);
+      messageJournal.appendDeleteRecord(recordID, false);
    }
 
    public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
@@ -907,14 +905,27 @@
             }
             case PAGE_TRANSACTION:
             {
-               PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+               if (record.isUpdate)
+               {
+                  PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+                  
+                  pageUpdate.decode(buff);
+                  
+                  PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
+                  
+                  pageTX.update(pageUpdate.recods, null);
+               }
+               else
+               {
+                  PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+   
+                  pageTransactionInfo.decode(buff);
+   
+                  pageTransactionInfo.setRecordID(record.id);
+   
+                  pagingManager.addTransaction(pageTransactionInfo);
+               }
 
-               pageTransactionInfo.decode(buff);
-
-               pageTransactionInfo.setRecordID(record.id);
-
-               pagingManager.addTransaction(pageTransactionInfo);
-
                break;
             }
             case SET_SCHEDULED_DELIVERY_TIME:
@@ -2007,7 +2018,49 @@
          super(queueID);
       }
    }
+   
+   private static class PageUpdateTXEncoding implements EncodingSupport
+   {
+      
+      public long pageTX;
+      
+      public int recods;
+      
+      public PageUpdateTXEncoding()
+      {
+      }
+      
+      public PageUpdateTXEncoding(final long pageTX, final int records)
+      {
+         this.pageTX = pageTX;
+         this.recods = records;
+      }
+      
+      public void decode(HornetQBuffer buffer)
+      {
+         this.pageTX = buffer.readLong();
+         this.recods = buffer.readInt();
+      }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+       */
+      public void encode(HornetQBuffer buffer)
+      {
+         buffer.writeLong(pageTX);
+         buffer.writeInt(recods);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+       */
+      public int getEncodeSize()
+      {
+         return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+      }
+      
+   }
+
    private static class ScheduledDeliveryEncoding extends QueueEncoding
    {
       long scheduledDeliveryTime;

Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -436,4 +436,18 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
+    */
+   public void deletePageTransactional(long recordID) throws Exception
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
+    */
+   public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
+   {
+   }
+
 }

Modified: branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -58,9 +58,9 @@
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.UUIDGenerator;
@@ -1242,7 +1242,7 @@
                      store.sync();
                   }
 
-                  storageManager.storePageTransaction(tx.getID(), pageTransaction);
+                  pageTransaction.store(storageManager, tx);
                }
             }
          }

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -14,8 +14,10 @@
 package org.hornetq.tests.integration.client;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 import junit.framework.AssertionFailedError;
@@ -30,6 +32,7 @@
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.DivertConfiguration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
 import org.hornetq.core.server.HornetQServer;
@@ -38,7 +41,6 @@
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.DataConstants;
 
 /**
  * A PagingTest
@@ -86,12 +88,192 @@
       internaltestSendReceivePaging(true);
    }
 
-
    public void testSendReceivePagingNonPersistent() throws Exception
    {
       internaltestSendReceivePaging(false);
    }
 
+   public void testWithDiverts() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      DivertConfiguration divert1 = new DivertConfiguration("dv1",
+                                                            "nm1",
+                                                            PagingTest.ADDRESS.toString(),
+                                                            PagingTest.ADDRESS.toString() + "-1",
+                                                            true,
+                                                            null,
+                                                            null);
+
+      DivertConfiguration divert2 = new DivertConfiguration("dv2",
+                                                            "nm2",
+                                                            PagingTest.ADDRESS.toString(),
+                                                            PagingTest.ADDRESS.toString() + "-2",
+                                                            true,
+                                                            null,
+                                                            null);
+
+      ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
+      divertList.add(divert1);
+      divertList.add(divert2);
+
+      config.setDivertConfigurations(divertList);
+
+      server.start();
+
+      final int numberOfIntegers = 256;
+
+      final int numberOfMessages = 30000;
+
+      final byte[] body = new byte[numberOfIntegers * 4];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= numberOfIntegers; j++)
+      {
+         bb.putInt(j);
+      }
+
+      try
+      {
+         {
+            ClientSessionFactory sf = createInVMFactory();
+
+            sf.setBlockOnNonDurableSend(true);
+            sf.setBlockOnDurableSend(true);
+            sf.setBlockOnAcknowledge(true);
+
+            ClientSession session = sf.createSession(false, false, false);
+
+            session.createQueue(PagingTest.ADDRESS + "-1", PagingTest.ADDRESS + "-1", null, true);
+
+            session.createQueue(PagingTest.ADDRESS + "-2", PagingTest.ADDRESS + "-2", null, true);
+
+            ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+            ClientMessage message = null;
+
+            for (int i = 0; i < numberOfMessages; i++)
+            {
+               message = session.createMessage(true);
+
+               HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+               bodyLocal.writeBytes(body);
+
+               message.putIntProperty(new SimpleString("id"), i);
+
+               producer.send(message);
+            }
+
+            session.commit();
+
+            session.close();
+
+            server.stop();
+         }
+
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         final ClientSessionFactory sf2 = createInVMFactory();
+
+         final AtomicInteger errors = new AtomicInteger(0);
+
+         Thread threads[] = new Thread[2];
+
+         for (int start = 1; start <= 2; start++)
+         {
+
+            final String addressToSubscribe = PagingTest.ADDRESS + "-" + start;
+
+            threads[start - 1] = new Thread()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     ClientSession session = sf2.createSession(null, null, false, true, true, false, 0);
+
+                     ClientConsumer consumer = session.createConsumer(addressToSubscribe);
+
+                     session.start();
+
+                     for (int i = 0; i < numberOfMessages; i++)
+                     {
+                        ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+                        Assert.assertNotNull(message2);
+
+                        Assert.assertEquals(i, message2.getIntProperty("id").intValue());
+
+                        message2.acknowledge();
+
+                        Assert.assertNotNull(message2);
+
+                        session.commit();
+
+                        try
+                        {
+                           assertBodiesEqual(body, message2.getBodyBuffer());
+                        }
+                        catch (AssertionFailedError e)
+                        {
+                           PagingTest.log.info("Expected buffer:" + UnitTestCase.dumbBytesHex(body, 40));
+                           PagingTest.log.info("Arriving buffer:" + UnitTestCase.dumbBytesHex(message2.getBodyBuffer()
+                                                                                                      .toByteBuffer()
+                                                                                                      .array(), 40));
+                           throw e;
+                        }
+                     }
+
+                     consumer.close();
+                  }
+                  catch (Throwable e)
+                  {
+                     e.printStackTrace();
+                     errors.incrementAndGet();
+                  }
+
+               }
+            };
+         }
+
+         for (int i = 0; i < 2; i++)
+         {
+            threads[i].start();
+         }
+
+         for (int i = 0; i < 2; i++)
+         {
+            threads[i].join();
+         }
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
    {
       clearData();
@@ -150,11 +332,10 @@
 
          session.close();
 
-
          if (persistentMessages)
          {
             server.stop();
-   
+
             server = createServer(true,
                                   config,
                                   PagingTest.PAGE_SIZE,
@@ -360,8 +541,7 @@
       }
 
    }
-   
-   
+
    /**
     * - Make a destination in page mode
     * - Add stuff to a transaction
@@ -396,7 +576,7 @@
          sf.setBlockOnNonDurableSend(true);
          sf.setBlockOnDurableSend(true);
          sf.setBlockOnAcknowledge(true);
-         
+
          byte[] body = new byte[messageSize];
 
          ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
@@ -408,7 +588,7 @@
          ClientMessage firstMessage = sessionTransacted.createMessage(IS_DURABLE_MESSAGE);
          firstMessage.getBodyBuffer().writeBytes(body);
          firstMessage.putIntProperty(new SimpleString("id"), 0);
-         
+
          producerTransacted.send(firstMessage);
 
          ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -479,7 +659,7 @@
 
             Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
 
-//             System.out.println(messageID);
+            // System.out.println(messageID);
             Assert.assertNotNull(messageID);
             Assert.assertEquals("message received out of order", i, messageID.intValue());
 
@@ -505,8 +685,6 @@
 
    }
 
-  
-
    public void testPageOnSchedulingNoRestart() throws Exception
    {
       internalTestPageOnScheduling(false);
@@ -756,8 +934,6 @@
          ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
 
          ClientMessage message = null;
-         
-         
 
          for (int i = 0; i < numberOfMessages; i++)
          {
@@ -955,7 +1131,7 @@
       }
 
    }
-   
+
    public void testDropMessagesExpiring() throws Exception
    {
       clearData();
@@ -969,7 +1145,7 @@
 
       settings.put(PagingTest.ADDRESS.toString(), set);
 
-      HornetQServer server = createServer(true, config, 1024,  1024 * 1024, settings);
+      HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
 
       server.start();
 
@@ -978,7 +1154,7 @@
       try
       {
          ClientSessionFactory sf = createInVMFactory();
-         
+
          sf.setAckBatchSize(0);
 
          ClientSession session = sf.createSession();
@@ -988,7 +1164,7 @@
          ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
 
          ClientMessage message = null;
-         
+
          class MyHandler implements MessageHandler
          {
             int count;
@@ -1001,16 +1177,16 @@
                }
                catch (Exception e)
                {
-                  
+
                }
-               
+
                count++;
-               
+
                if (count % 1000 == 0)
                {
                   log.info("received " + count);
                }
-               
+
                try
                {
                   message.acknowledge();
@@ -1019,13 +1195,13 @@
                {
                   e.printStackTrace();
                }
-            }           
+            }
          }
-         
+
          ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
 
          session.start();
-         
+
          consumer.setMessageHandler(new MyHandler());
 
          for (int i = 0; i < numberOfMessages; i++)
@@ -1034,12 +1210,12 @@
 
             message = session.createMessage(false);
             message.getBodyBuffer().writeBytes(body);
-            
+
             message.setExpiration(System.currentTimeMillis() + 100);
 
             producer.send(message);
          }
-         
+
          session.close();
       }
       finally

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -463,8 +463,12 @@
       {
          AlignedJournalImplTest.log.debug("Expected exception " + e, e);
       }
+      
 
       setupAndLoadJournal(JOURNAL_SIZE, 100);
+      
+      journalImpl.forceMoveNextFile();
+      journalImpl.checkReclaimStatus();
 
       Assert.assertEquals(0, records.size());
       Assert.assertEquals(0, transactions.size());

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-08-29 20:01:42 UTC (rev 9607)
@@ -125,22 +125,6 @@
 
       Assert.assertEquals(nr1, trans2.getNumberOfMessages());
 
-      for (int i = 0; i < nr1; i++)
-      {
-         trans.decrement();
-      }
-
-      Assert.assertEquals(0, trans.getNumberOfMessages());
-
-      try
-      {
-         trans.decrement();
-         Assert.fail("Exception expected!");
-      }
-      catch (Throwable ignored)
-      {
-      }
-
    }
 
    public void testDoubleStart() throws Exception
@@ -1355,6 +1339,20 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
+       */
+      public void deletePageTransactional(long recordID) throws Exception
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
+       */
+      public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
+      {
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory



More information about the hornetq-commits mailing list