[hornetq-commits] JBoss hornetq SVN: r9856 - in branches/Branch_New_Paging: src/main/org/hornetq/core/paging/impl and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 8 23:14:10 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-08 23:14:09 -0500 (Mon, 08 Nov 2010)
New Revision: 9856

Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
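Route paging per address: PagingStore.page() gains an overload that takes the address's RouteContextList, RoutingContext exposes its per-address listings as a Map via getContexListing(), and RoutingContextImpl.addQueue() now adds non-durable queues to the non-durable list.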

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/PagingStore.java	2010-11-09 04:14:09 UTC (rev 9856)
@@ -16,6 +16,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -60,6 +61,8 @@
 
    boolean page(ServerMessage message, RoutingContext ctx) throws Exception;
 
+   boolean page(ServerMessage message, RoutingContext ctx, RouteContextList listCtx) throws Exception;
+
    Page createPage(final int page) throws Exception;
    
    PagingManager getPagingManager();

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-09 04:14:09 UTC (rev 9856)
@@ -45,6 +45,7 @@
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -303,12 +304,17 @@
    {
       return storeName;
    }
-
+   
    public boolean page(final ServerMessage message, final RoutingContext ctx) throws Exception
    {
+      return page(message, ctx, ctx.getContextListing(storeName));
+   }
+
+   public boolean page(final ServerMessage message, final RoutingContext ctx, RouteContextList listCtx) throws Exception
+   {
       // The sync on transactions is done on commit only
       // TODO: sync on paging
-      return page(message, ctx, false);
+      return page(message, ctx, listCtx, false);
    }
 
    public void sync() throws Exception
@@ -875,7 +881,7 @@
 
    }
 
-   protected boolean page(ServerMessage message, final RoutingContext ctx, final boolean sync) throws Exception
+   protected boolean page(ServerMessage message, final RoutingContext ctx, RouteContextList listCtx, final boolean sync) throws Exception
    {
       if (!running)
       {
@@ -942,7 +948,7 @@
             message.bodyChanged();
          }
 
-         pagedMessage = new PagedMessageImpl(message, getQueueIDs(ctx), getTransactionID(ctx));
+         pagedMessage = new PagedMessageImpl(message, getQueueIDs(listCtx), getTransactionID(ctx));
 
          int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
 
@@ -963,10 +969,10 @@
 
    }
 
-   private long[] getQueueIDs(RoutingContext ctx)
+   private long[] getQueueIDs(RouteContextList ctx)
    {
-      List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues(address);
-      List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getDurableQueues(address);
+      List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues();
+      List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getDurableQueues();
       long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
       int i = 0;
       

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-09 04:14:09 UTC (rev 9856)
@@ -47,6 +47,7 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.RoutingContextImpl;
@@ -840,12 +841,17 @@
 
       Transaction tx = context.getTransaction();
       
-      for (SimpleString add: context.getAddresses())
+      
+      for (Map.Entry<SimpleString, RouteContextList> entry: context.getContexListing().entrySet())
       {
+         PagingStore store = pagingManager.getPageStore(entry.getKey());
          
-         PagingStore store = pagingManager.getPageStore(add);
+         if (store.page(message, context, entry.getValue()))
+         {
+            continue;
+         }
    
-         for (Queue queue : context.getNonDurableQueues(add))
+         for (Queue queue : entry.getValue().getNonDurableQueues())
          {
             MessageReference reference = message.createReference(queue);
    
@@ -861,7 +867,7 @@
             message.incrementRefCount();
          }
    
-         Iterator<Queue> iter = context.getDurableQueues(add).iterator();
+         Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();
    
          while (iter.hasNext())
          {

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java	2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java	2010-11-09 04:14:09 UTC (rev 9856)
@@ -14,6 +14,7 @@
 package org.hornetq.core.server;
 
 import java.util.List;
+import java.util.Map;
 
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
@@ -35,8 +36,10 @@
 
    void addQueue(SimpleString address, Queue queue);
 
-   Pair<SimpleString, RouteContextList> getContexListing();
+   Map<SimpleString, RouteContextList> getContexListing();
    
+   RouteContextList getContextListing(SimpleString address);
+   
    List<Queue> getNonDurableQueues(SimpleString address);
 
    List<Queue> getDurableQueues(SimpleString address);

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2010-11-09 04:14:09 UTC (rev 9856)
@@ -16,6 +16,7 @@
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -199,12 +200,14 @@
       buff.putLong(remoteQueueID);
 
       message.putBytesProperty(idsHeaderName, ids);
+         
+      List<Queue> durableQueuesOnContext = context.getDurableQueues(address);
 
-      if (!context.getDurableQueues().contains(storeAndForwardQueue))
+      if (!durableQueuesOnContext.contains(storeAndForwardQueue))
       {
          // There can be many remote bindings for the same node, we only want to add the message once to
          // the s & f queue for that node
-         context.addQueue(address, storeAndForwardQueue);
+         durableQueuesOnContext.add(storeAndForwardQueue);
       }
    }
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2010-11-09 04:14:09 UTC (rev 9856)
@@ -37,7 +37,7 @@
 {
    
    // The pair here is Durable and NonDurable
-   private Map<SimpleString, ContextListing> map = new HashMap<SimpleString, ContextListing>();
+   private Map<SimpleString, RouteContextList> map = new HashMap<SimpleString, RouteContextList>();
 
    private Transaction transaction;
 
@@ -60,23 +60,23 @@
    public void addQueue(final SimpleString address, final Queue queue)
    {
 
-      ContextListing listing = getContextListing(address);
+      RouteContextList listing = getContextListing(address);
       
       if (queue.isDurable())
       {
-         listing.durableQueues.add(queue);
+         listing.getDurableQueues().add(queue);
       }
       else
       {
-         listing.durableQueues.add(queue);
+         listing.getNonDurableQueues().add(queue);
       }
 
       queueCount++;
    }
    
-   private ContextListing getContextListing(SimpleString address)
+   public RouteContextList getContextListing(SimpleString address)
    {
-      ContextListing listing = map.get(address);
+      RouteContextList listing = map.get(address);
       if (listing == null)
       {
          listing = new ContextListing();
@@ -97,12 +97,12 @@
 
    public List<Queue> getNonDurableQueues(SimpleString address)
    {
-      return getContextListing(address).nonDurableQueues;
+      return getContextListing(address).getNonDurableQueues();
    }
 
    public List<Queue> getDurableQueues(SimpleString address)
    {
-      return getContextListing(address).durableQueues;
+      return getContextListing(address).getDurableQueues();
    }
 
    public int getQueueCount()
@@ -113,14 +113,9 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.server.RoutingContext#getAddresses()
     */
-   public Pair<SimpleString, ContextListing>[] getAddresses()
+   public Map<SimpleString, RouteContextList> getContexListing()
    {
-      Object x = new Pair(a, b);
-      
-      
-      Pair<SimpleString, ContextListing> [] contextListing = new Pair<SimpleString, ContextListing>[1]; 
-      // TODO Auto-generated method stub
-      return null;
+      return this.map;
    }
    
    

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-11-09 04:14:09 UTC (rev 9856)
@@ -17,12 +17,10 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -42,11 +40,9 @@
 import org.hornetq.core.config.DivertConfiguration;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.PagingStoreFactory;
-import org.hornetq.core.paging.impl.PageImpl;
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
@@ -1014,9 +1010,9 @@
                   syncNonTransactional);
          }
 
-         protected boolean page(ServerMessage message, org.hornetq.core.server.RoutingContext ctx, boolean sync) throws Exception
+         protected boolean page(ServerMessage message, org.hornetq.core.server.RoutingContext ctx,  org.hornetq.core.server.RouteContextList listCtx, boolean sync) throws Exception
          {
-            boolean paged = super.page(message, ctx, sync);
+            boolean paged = super.page(message, ctx, listCtx, sync);
 
             if (paged)
             {

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-09 03:04:54 UTC (rev 9855)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-09 04:14:09 UTC (rev 9856)
@@ -515,7 +515,7 @@
 
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-         Assert.assertTrue(pageStore.page(msg, ctx));
+         Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
 
          PagedReference readMessage = iterator.next();
 
@@ -552,7 +552,7 @@
 
             msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-            Assert.assertTrue(pageStore.page(msg, ctx));
+            Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
          }
 
          PagedReference readMessage = iterator.next();
@@ -588,7 +588,7 @@
 
             msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-            Assert.assertTrue(pageStore.page(msg, ctx));
+            Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
          }
 
          PagedReference readMessage = iterator.next();
@@ -1091,7 +1091,7 @@
 
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
 
-         Assert.assertTrue(pageStore.page(msg, ctx));
+         Assert.assertTrue(pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS)));
       }
 
       return pageStore.getNumberOfPages();
@@ -1202,7 +1202,7 @@
          ServerMessage msg = new ServerMessageImpl(storage.generateUniqueID(), buffer.writerIndex());
          msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
          msg.putIntProperty("key", i);
-         pageStore.page(msg, ctx);
+         pageStore.page(msg, ctx, ctx.getContextListing(ADDRESS));
       }
 
    }



More information about the hornetq-commits mailing list