[hornetq-commits] JBoss hornetq SVN: r9854 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/cursor/impl and 9 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 8 16:21:11 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-08 16:21:08 -0500 (Mon, 08 Nov 2010)
New Revision: 9854

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
changes

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagedMessage.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -30,6 +30,9 @@
 {
    ServerMessage getMessage();
    
+   /** The queues that were routed during paging */
+   long[] getQueueIDs();
+   
    void initMessage(StorageManager storageManager);
 
    long getTransactionID();

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -13,11 +13,10 @@
 
 package org.hornetq.core.paging;
 
-import java.util.List;
-
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 
@@ -59,10 +58,8 @@
 
    void sync() throws Exception;
 
-   boolean page(List<ServerMessage> messages, long transactionId) throws Exception;
+   boolean page(ServerMessage message, RoutingContext ctx) throws Exception;
 
-   boolean page(ServerMessage message) throws Exception;
-
    Page createPage(final int page) throws Exception;
    
    PagingManager getPagingManager();

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -32,6 +32,7 @@
 import org.hornetq.core.paging.cursor.PageSubscription;
 import org.hornetq.core.paging.cursor.PagedReferenceImpl;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.server.ServerMessage;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.Future;
 import org.hornetq.utils.SoftValueHashMap;
@@ -135,6 +136,12 @@
          else if (retPos != null)
          {
             cursorPos = retPos.getPosition();
+            
+            if (!routed(retPos.getPagedMessage(), cursor))
+            {
+               cursor.positionIgnored(cursorPos);
+            }
+            else
             if (retPos.getPagedMessage().getTransactionID() != 0)
             {
                PageTransactionInfo tx = pagingManager.getTransaction(retPos.getPagedMessage().getTransactionID());
@@ -160,6 +167,20 @@
          }
       }
    }
+   
+   private boolean routed(PagedMessage message, PageSubscription subs)
+   {
+      long id = subs.getId();
+      
+      for (long qid : message.getQueueIDs())
+      {
+         if (qid == id)
+         {
+            return true;
+         }
+      }
+      return false;
+   }
 
    private PagedReferenceImpl internalGetNext(final PagePosition pos)
    {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -49,17 +49,20 @@
    private byte[] largeMessageLazyData;
 
    private ServerMessage message;
+   
+   private long queueIDs[];
 
    private long transactionID = 0;
 
-   public PagedMessageImpl(final ServerMessage message, final long transactionID)
+   public PagedMessageImpl(final ServerMessage message, final long[] queueIDs, final long transactionID)
    {
-      this.message = message;
+      this(message, queueIDs);
       this.transactionID = transactionID;
    }
 
-   public PagedMessageImpl(final ServerMessage message)
+   public PagedMessageImpl(final ServerMessage message, final long[] queueIDs)
    {
+      this.queueIDs = queueIDs;
       this.message = message;
    }
 
@@ -87,6 +90,11 @@
    {
       return transactionID;
    }
+   
+   public long[] getQueueIDs()
+   {
+      return queueIDs;
+   }
 
    // EncodingSupport implementation --------------------------------
 
@@ -112,6 +120,15 @@
 
          message.decode(buffer);
       }
+      
+      int queueIDsSize = buffer.readInt();
+      
+      queueIDs = new long[queueIDsSize];
+      
+      for (int i = 0 ; i < queueIDsSize; i++)
+      {
+         queueIDs[i] = buffer.readLong();
+      }
    }
 
    public void encode(final HornetQBuffer buffer)
@@ -123,11 +140,19 @@
       buffer.writeInt(message.getEncodeSize());
 
       message.encode(buffer);
+      
+      buffer.writeInt(queueIDs.length);
+      
+      for (int i = 0 ; i < queueIDs.length; i++)
+      {
+         buffer.writeLong(queueIDs[i]);
+      }
    }
 
    public int getEncodeSize()
    {
-      return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize();
+      return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() + 
+             DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -25,7 +25,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.hornetq.api.core.SimpleString;
@@ -46,6 +45,7 @@
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
@@ -304,19 +304,13 @@
       return storeName;
    }
 
-   public boolean page(final List<ServerMessage> message, final long transactionID) throws Exception
+   public boolean page(final ServerMessage message, final RoutingContext ctx) throws Exception
    {
       // The sync on transactions is done on commit only
-      return page(message, transactionID, false);
+      // TODO: sync on paging
+      return page(message, ctx, false);
    }
 
-   public boolean page(final ServerMessage message) throws Exception
-   {
-      // If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
-      // of crash
-      return page(Arrays.asList(message), -1, syncNonTransactional && message.isDurable());
-   }
-
    public void sync() throws Exception
    {
       lock.readLock().lock();
@@ -881,7 +875,7 @@
 
    }
 
-   protected boolean page(final List<ServerMessage> messages, final long transactionID, final boolean sync) throws Exception
+   protected boolean page(ServerMessage message, final RoutingContext ctx, final boolean sync) throws Exception
    {
       if (!running)
       {
@@ -939,37 +933,27 @@
             return false;
          }
 
-         for (ServerMessage message : messages)
+         PagedMessage pagedMessage;
+
+         if (!message.isDurable())
          {
-            PagedMessage pagedMessage;
+            // The address should never be transient when paging (even for non-persistent messages when paging)
+            // This will force everything to be persisted
+            message.bodyChanged();
+         }
 
-            if (!message.isDurable())
-            {
-               // The address should never be transient when paging (even for non-persistent messages when paging)
-               // This will force everything to be persisted
-               message.bodyChanged();
-            }
+         pagedMessage = new PagedMessageImpl(message, getQueueIDs(ctx), getTransactionID(ctx));
 
-            if (transactionID != -1)
-            {
-               pagedMessage = new PagedMessageImpl(message, transactionID);
-            }
-            else
-            {
-               pagedMessage = new PagedMessageImpl(message);
-            }
+         int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
 
-            int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
+         if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
+         {
+            // Make sure nothing is currently validating or using currentPage
+            openNewPage();
+         }
 
-            if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
-            {
-               // Make sure nothing is currently validating or using currentPage
-               openNewPage();
-            }
+         currentPage.write(pagedMessage);
 
-            currentPage.write(pagedMessage);
-         }
-
          return true;
       }
       finally
@@ -979,6 +963,36 @@
 
    }
 
+   private long[] getQueueIDs(RoutingContext ctx)
+   {
+      long ids[] = new long [ctx.getDurableQueues().size() + ctx.getNonDurableQueues().size()];
+      int i = 0;
+      
+      for (org.hornetq.core.server.Queue q : ctx.getDurableQueues())
+      {
+         ids[i++] = q.getID();
+      }
+      
+      for (org.hornetq.core.server.Queue q : ctx.getNonDurableQueues())
+      {
+         ids[i++] = q.getID();
+      }
+      return ids;
+   }
+   
+   private long getTransactionID(RoutingContext ctx)
+   {
+      Transaction tx = ctx.getTransaction();
+      if (tx == null)
+      {
+         return 0l;
+      }
+      else
+      {
+         return tx.getID();
+      }
+   }
+
    /**
     * This method will remove files from the page system and and route them, doing it transactionally
     *     

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -227,16 +227,6 @@
    {
       return pageStore;
    }
-
-   public void paged(final ServerMessage message)
-   {
-      
-   }
-
-   public boolean page(final ServerMessage message) throws Exception
-   {
-      return pageStore.page(message);
-   }
    
    public void route(final ServerMessage message, final RoutingContext context) throws Exception
    {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/ServerMessage.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -56,10 +56,6 @@
 
    PagingStore getPagingStore();
 
-   boolean page() throws Exception;
-
-   boolean page(long transactionID) throws Exception;
-
    boolean storeIsPaging();
 
    void encodeMessageIDToBuffer();

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -249,30 +249,6 @@
       return pagingStore;
    }
 
-   public boolean page() throws Exception
-   {
-      if (pagingStore != null)
-      {
-         return pagingStore.page(this);
-      }
-      else
-      {
-         return false;
-      }
-   }
-
-   public boolean page(final long transactionID) throws Exception
-   {
-      if (pagingStore != null)
-      {
-         return pagingStore.page(Arrays.asList((ServerMessage)this), transactionID);
-      }
-      else
-      {
-         return false;
-      }
-   }
-
    public boolean storeIsPaging()
    {
       if (pagingStore != null)

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -1014,9 +1014,9 @@
                   syncNonTransactional);
          }
 
-         protected boolean page(final List<ServerMessage> messages, final long transactionID, final boolean sync) throws Exception
+         protected boolean page(ServerMessage message, org.hornetq.core.server.RoutingContext ctx, boolean sync) throws Exception
          {
-            boolean paged = super.page(messages, transactionID, sync);
+            boolean paged = super.page(message, ctx, sync);
 
             if (paged)
             {

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -42,11 +42,14 @@
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.LinkedListIterator;
@@ -70,6 +73,8 @@
    private HornetQServer server;
 
    private Queue queue;
+   
+   private List<Queue> queueList;
 
    private static final int PAGE_MAX = -1;
 
@@ -155,10 +160,6 @@
 
       final int NUM_MESSAGES = 100;
 
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
       PageSubscription cursorEven = createNonPersistentCursor(new Filter()
       {
 
@@ -205,6 +206,10 @@
 
       });
 
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
       queue.getPageSubscription().close();
 
       PagedReference msg;
@@ -493,6 +498,8 @@
                                            .getSubscription(queue.getID());
 
       System.out.println("Cursor: " + cursor);
+      
+      RoutingContextImpl ctx = generateCTX();
 
       LinkedListIterator<PagedReference> iterator = cursor.iterator();
 
@@ -508,7 +515,7 @@
 
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-         Assert.assertTrue(pageStore.page(msg));
+         Assert.assertTrue(pageStore.page(msg, ctx));
 
          PagedReference readMessage = iterator.next();
 
@@ -545,7 +552,7 @@
 
             msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-            Assert.assertTrue(pageStore.page(msg));
+            Assert.assertTrue(pageStore.page(msg, ctx));
          }
 
          PagedReference readMessage = iterator.next();
@@ -581,7 +588,7 @@
 
             msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-            Assert.assertTrue(pageStore.page(msg));
+            Assert.assertTrue(pageStore.page(msg, ctx));
          }
 
          PagedReference readMessage = iterator.next();
@@ -615,6 +622,24 @@
       assertFalse(lookupPageStore(ADDRESS).isPaging());
 
    }
+   
+   private RoutingContextImpl generateCTX()
+   {
+      return generateCTX(null);
+   }
+   
+   private RoutingContextImpl generateCTX(Transaction tx)
+   {
+      RoutingContextImpl ctx = new RoutingContextImpl(tx);
+      ctx.addDurableQueue(queue);
+      
+      for (Queue q : this.queueList)
+      {
+         ctx.addQueue(q);
+      }
+      
+      return ctx;
+   }
 
    /**
     * @throws Exception
@@ -783,15 +808,19 @@
 
       final int NUM_MESSAGES = 100;
 
-      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
-
-      System.out.println("NumberOfPages = " + numberOfPages);
-
       PageCursorProvider cursorProvider = lookupCursorProvider();
 
       PageSubscription cursor = cursorProvider.createSubscription(11, null, false);
       PageSubscriptionImpl cursor2 = (PageSubscriptionImpl)cursorProvider.createSubscription(12, null, false);
+      
+      this.queueList.add(new FakeQueue(new SimpleString("a"), 11));
+      
+      this.queueList.add(new FakeQueue(new SimpleString("b"), 12));
 
+      int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
+
+      System.out.println("NumberOfPages = " + numberOfPages);
+
       queue.getPageSubscription().close();
 
       PagedReference msg;
@@ -856,16 +885,18 @@
 
       final int NUM_MESSAGES = 100;
 
+      PageCursorProvider cursorProvider = lookupCursorProvider();
+
+      PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
+      
+      queueList.add(new FakeQueue(new SimpleString("tmp"), 2));
+
       int numberOfPages = addMessages(NUM_MESSAGES, 1024 * 1024);
 
       System.out.println("NumberOfPages = " + numberOfPages);
 
-      PageCursorProvider cursorProvider = lookupCursorProvider();
-
       PageCache cache = cursorProvider.getPageCache(new PagePositionImpl(5, 0));
 
-      PageSubscription cursor = cursorProvider.createSubscription(2, null, false);
-
       queue.getPageSubscription().close();
 
       PagePosition startingPos = new PagePositionImpl(5, cache.getNumberOfMessages() / 2);
@@ -1044,6 +1075,8 @@
       PagingStoreImpl pageStore = lookupPageStore(ADDRESS);
 
       pageStore.startPaging();
+      
+      RoutingContext ctx = generateCTX();
 
       for (int i = start; i < start + numMessages; i++)
       {
@@ -1058,7 +1091,7 @@
 
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-         Assert.assertTrue(pageStore.page(msg));
+         Assert.assertTrue(pageStore.page(msg, ctx));
       }
 
       return pageStore.getNumberOfPages();
@@ -1077,12 +1110,23 @@
 
    // Protected -----------------------------------------------------
 
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+      server = null;
+      queue = null;
+      queueList = null;
+      super.tearDown();
+   }
+
    protected void setUp() throws Exception
    {
       super.setUp();
       OperationContextImpl.clearContext();
       System.out.println("Tmp:" + getTemporaryDir());
 
+      queueList = new ArrayList<Queue>();
+      
       createServer();
    }
 
@@ -1117,7 +1161,9 @@
     */
    private PageSubscription createNonPersistentCursor(Filter filter) throws Exception
    {
-      return lookupCursorProvider().createSubscription(server.getStorageManager().generateUniqueID(), filter, false);
+      long id = server.getStorageManager().generateUniqueID();
+      queueList.add(new FakeQueue(new SimpleString(filter.toString()), id));
+      return lookupCursorProvider().createSubscription(id, filter, false);
    }
 
    /**
@@ -1145,7 +1191,10 @@
                            final int NUM_MESSAGES,
                            final int messageSize) throws Exception
    {
-      List<ServerMessage> messages = new ArrayList<ServerMessage>();
+      
+      TransactionImpl txImpl = new TransactionImpl(pgParameter.getTransactionID(), null, null);
+      
+      RoutingContext ctx = generateCTX(txImpl);
 
       for (int i = start; i < start + NUM_MESSAGES; i++)
       {
@@ -1153,20 +1202,11 @@
          ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
          msg.putIntProperty("key", i);
-         messages.add(msg);
+         pageStore.page(msg, ctx);
       }
 
-      pageStore.page(messages, pgParameter.getTransactionID());
    }
 
-   protected void tearDown() throws Exception
-   {
-      server.stop();
-      server = null;
-      queue = null;
-      super.tearDown();
-   }
-
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -288,7 +288,7 @@
 
          replicatedJournal.appendAddRecordTransactional(23, 24, (byte)1, new FakeData());
 
-         PagedMessage pgmsg = new PagedMessageImpl(msg, -1);
+         PagedMessage pgmsg = new PagedMessageImpl(msg, new long[0]);
          manager.pageWrite(pgmsg, 1);
          manager.pageWrite(pgmsg, 2);
          manager.pageWrite(pgmsg, 3);

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -223,7 +223,7 @@
 
          msg.setAddress(simpleDestination);
 
-         page.write(new PagedMessageImpl(msg));
+         page.write(new PagedMessageImpl(msg, new long [0]));
 
          Assert.assertEquals(initialNumberOfMessages + i + 1, page.getNumberOfMessages());
       }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -28,6 +28,7 @@
 import org.hornetq.core.paging.impl.TestSupportPageStore;
 import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -81,11 +82,11 @@
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
 
-      Assert.assertFalse(store.page(msg));
+      Assert.assertFalse(store.page(msg, new RoutingContextImpl(null)));
 
       store.startPaging();
 
-      Assert.assertTrue(store.page(msg));
+      Assert.assertTrue(store.page(msg, new RoutingContextImpl(null)));
 
       Page page = store.depage();
 
@@ -107,7 +108,7 @@
 
       Assert.assertNull(store.depage());
 
-      Assert.assertFalse(store.page(msg));
+      Assert.assertFalse(store.page(msg, new RoutingContextImpl(null)));
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -64,6 +64,7 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.group.impl.GroupBinding;
+import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -202,7 +203,7 @@
 
       Assert.assertTrue(storeImpl.isPaging());
 
-      Assert.assertTrue(storeImpl.page(msg));
+      Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
 
       Assert.assertEquals(1, storeImpl.getNumberOfPages());
 
@@ -265,7 +266,7 @@
 
          ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
 
-         Assert.assertTrue(storeImpl.page(msg));
+         Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
       }
 
       Assert.assertEquals(1, storeImpl.getNumberOfPages());
@@ -345,7 +346,7 @@
 
          ServerMessage msg = createMessage(i, storeImpl, destination, buffer);
 
-         Assert.assertTrue(storeImpl.page(msg));
+         Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
       }
 
       Assert.assertEquals(2, storeImpl.getNumberOfPages());
@@ -381,7 +382,7 @@
 
       ServerMessage msg = createMessage(1, storeImpl, destination, buffers.get(0));
 
-      Assert.assertTrue(storeImpl.page(msg));
+      Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
 
       Page newPage = storeImpl.depage();
 
@@ -399,11 +400,11 @@
 
       Assert.assertFalse(storeImpl.isPaging());
 
-      Assert.assertFalse(storeImpl.page(msg));
+      Assert.assertFalse(storeImpl.page(msg, new RoutingContextImpl(null)));
 
       storeImpl.startPaging();
 
-      Assert.assertTrue(storeImpl.page(msg));
+      Assert.assertTrue(storeImpl.page(msg, new RoutingContextImpl(null)));
 
       Page page = storeImpl.depage();
 
@@ -499,7 +500,7 @@
                   // This is possible because the depage thread is not actually reading the pages.
                   // Just using the internal API to remove it from the page file system
                   ServerMessage msg = createMessage(id, storeImpl, destination, createRandomBuffer(id, 5));
-                  if (storeImpl.page(msg))
+                  if (storeImpl.page(msg, new RoutingContextImpl(null)))
                   {
                      buffers.put(id, msg);
                   }
@@ -644,7 +645,7 @@
       long lastMessageId = messageIdGenerator.incrementAndGet();
       ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination, createRandomBuffer(lastMessageId, 5));
 
-      storeImpl2.page(lastMsg);
+      storeImpl2.page(lastMsg, new RoutingContextImpl(null));
       buffers2.put(lastMessageId, lastMsg);
 
       Page lastPage = null;
@@ -752,7 +753,7 @@
                   // Just using the internal API to remove it from the page file system
                   ServerMessage msg = createMessage(i, storeImpl, destination, createRandomBuffer(i, 1024));
                   msg.putLongProperty("count", i);
-                  while (!storeImpl.page(msg))
+                  while (!storeImpl.page(msg, new RoutingContextImpl(null)))
                   {
                      storeImpl.startPaging();
                   }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-11-08 18:03:29 UTC (rev 9853)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-11-08 21:21:08 UTC (rev 9854)
@@ -92,10 +92,18 @@
    }
 
    private final SimpleString name;
+   
+   private final long id;
 
    public FakeQueue(final SimpleString name)
    {
+      this(name, 0);
+   }
+   
+   public FakeQueue(final SimpleString name, final long id)
+   {
       this.name = name;
+      this.id = id;
    }
 
    /* (non-Javadoc)
@@ -354,8 +362,7 @@
     */
    public long getID()
    {
-      // TODO Auto-generated method stub
-      return 0;
+      return id;
    }
 
    /* (non-Javadoc)



More information about the hornetq-commits mailing list