[hornetq-commits] JBoss hornetq SVN: r9855 - in branches/Branch_New_Paging: src/main/org/hornetq/core/postoffice/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Nov 8 22:04:55 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-11-08 22:04:54 -0500 (Mon, 08 Nov 2010)
New Revision: 9855

Added:
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
Modified:
   branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
   branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
Log:
backup (it's not compiling)

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-11-09 03:04:54 UTC (rev 9855)
@@ -965,15 +965,18 @@
 
    private long[] getQueueIDs(RoutingContext ctx)
    {
-      long ids[] = new long [ctx.getDurableQueues().size() + ctx.getNonDurableQueues().size()];
+      // IDs are collected for the durable queues first, then for the non-durable queues
+      List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues(address);
+      List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues(address);
+      long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
       int i = 0;
       
-      for (org.hornetq.core.server.Queue q : ctx.getDurableQueues())
+      for (org.hornetq.core.server.Queue q : durableQueues)
       {
          ids[i++] = q.getID();
       }
       
-      for (org.hornetq.core.server.Queue q : ctx.getNonDurableQueues())
+      for (org.hornetq.core.server.Queue q : nonDurableQueues)
       {
          ids[i++] = q.getID();
       }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-11-09 03:04:54 UTC (rev 9855)
@@ -839,83 +839,89 @@
       final List<MessageReference> refs = new ArrayList<MessageReference>();
 
       Transaction tx = context.getTransaction();
-
-      for (Queue queue : context.getNonDurableQueues())
+      
+      for (SimpleString add: context.getAddresses())
       {
-         MessageReference reference = message.createReference(queue);
-
-         refs.add(reference);
-
-         if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+         
+         PagingStore store = pagingManager.getPageStore(add);
+   
+         for (Queue queue : context.getNonDurableQueues(add))
          {
-            Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
-            reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+            MessageReference reference = message.createReference(queue);
+   
+            refs.add(reference);
+   
+            if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+            {
+               Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+   
+               reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+            }
+   
+            message.incrementRefCount();
          }
-
-         message.incrementRefCount();
-      }
-
-      Iterator<Queue> iter = context.getDurableQueues().iterator();
-
-      while (iter.hasNext())
-      {
-         Queue queue = iter.next();
-
-         MessageReference reference = message.createReference(queue);
-
-         refs.add(reference);
-
-         if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
+   
+         Iterator<Queue> iter = context.getDurableQueues(add).iterator();
+   
+         while (iter.hasNext())
          {
-            Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
-
-            reference.setScheduledDeliveryTime(scheduledDeliveryTime);
-         }
-
-         if (message.isDurable())
-         {
-            int durableRefCount = message.incrementDurableRefCount();
-
-            if (durableRefCount == 1)
+            Queue queue = iter.next();
+   
+            MessageReference reference = message.createReference(queue);
+   
+            refs.add(reference);
+   
+            if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
             {
+               Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+   
+               reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+            }
+   
+            if (message.isDurable())
+            {
+               int durableRefCount = message.incrementDurableRefCount();
+   
+               if (durableRefCount == 1)
+               {
+                  if (tx != null)
+                  {
+                     storageManager.storeMessageTransactional(tx.getID(), message);
+                  }
+                  else
+                  {
+                     storageManager.storeMessage(message);
+                  }
+               }
+   
                if (tx != null)
                {
-                  storageManager.storeMessageTransactional(tx.getID(), message);
+                  storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
+   
+                  tx.setContainsPersistent();
                }
                else
                {
-                  storageManager.storeMessage(message);
+                  storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
                }
-            }
-
-            if (tx != null)
-            {
-               storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
-
-               tx.setContainsPersistent();
-            }
-            else
-            {
-               storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
-            }
-
-            if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
-            {
-               if (tx != null)
+   
+               if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME))
                {
-                  storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
+                  if (tx != null)
+                  {
+                     storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
+                  }
+                  else
+                  {
+                     storageManager.updateScheduledDeliveryTime(reference);
+                  }
                }
-               else
-               {
-                  storageManager.updateScheduledDeliveryTime(reference);
-               }
             }
+   
+            message.incrementRefCount();
          }
-
-         message.incrementRefCount();
       }
-
+      
       if (tx != null)
       {
          tx.addOperation(new AddOperation(refs));

Added: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java	                        (rev 0)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/RouteContextList.java	2010-11-09 03:04:54 UTC (rev 9855)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import java.util.List;
+
+/**
+ * A simple data type holding the queue lists of a routing context,
+ * split into durable and non-durable queues.
+ *
+ * @author clebertsuconic
+ *
+ */
+public interface RouteContextList
+{
+   /** @return the durable queues held by this listing */
+   List<Queue> getDurableQueues();
+   /** @return the non-durable queues held by this listing */
+   List<Queue> getNonDurableQueues();
+
+}

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java	2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/RoutingContext.java	2010-11-09 03:04:54 UTC (rev 9855)
@@ -15,12 +15,15 @@
 
 import java.util.List;
 
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.transaction.Transaction;
 
 /**
  * A RoutingContext
  *
  * @author Tim Fox
+ * @author Clebert Suconic
  *
  *
  */
@@ -30,14 +33,18 @@
 
    void setTransaction(Transaction transaction);
 
-   void addQueue(Queue queue);
+   void addQueue(SimpleString address, Queue queue);
 
-   List<Queue> getNonDurableQueues();
+   Pair<SimpleString, RouteContextList> getContexListing();
+   
+   List<Queue> getNonDurableQueues(SimpleString address);
 
-   List<Queue> getDurableQueues();
+   List<Queue> getDurableQueues(SimpleString address);
 
    int getQueueCount();
 
    void clear();
+   
+   
 
 }

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2010-11-09 03:04:54 UTC (rev 9855)
@@ -204,7 +204,7 @@
       {
          // There can be many remote bindings for the same node, we only want to add the message once to
          // the s & f queue for that node
-         context.addQueue(storeAndForwardQueue);
+         context.addQueue(address, storeAndForwardQueue);
       }
    }
 

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-11-09 03:04:54 UTC (rev 9855)
@@ -267,7 +267,7 @@
 
    public void route(final ServerMessage message, final RoutingContext context) throws Exception
    {
-      context.addQueue(this);
+      context.addQueue(address, this);
    }
 
    // Queue implementation ----------------------------------------------------------------------------------------

Modified: branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2010-11-09 03:04:54 UTC (rev 9855)
@@ -14,9 +14,15 @@
 package org.hornetq.core.server.impl;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.transaction.Transaction;
 
@@ -29,10 +35,10 @@
  */
 public class RoutingContextImpl implements RoutingContext
 {
-   private final List<Queue> nonDurableQueues = new ArrayList<Queue>(1);
+   
+   // Maps each address to its listing of durable and non-durable queues
+   private Map<SimpleString, ContextListing> map = new HashMap<SimpleString, ContextListing>();
 
-   private final List<Queue> durableQueues = new ArrayList<Queue>(1);
-
    private Transaction transaction;
 
    private int queueCount;
@@ -41,35 +47,42 @@
    {
       this.transaction = transaction;
    }
-
+   
    public void clear()
    {
       transaction = null;
 
-      nonDurableQueues.clear();
-
-      durableQueues.clear();
-
+      map.clear();
+      
       queueCount = 0;
    }
 
-   public void addQueue(final Queue queue)
+   public void addQueue(final SimpleString address, final Queue queue)
    {
+      // Queues are grouped per address: fetch that address's listing, creating it on first use
+      ContextListing listing = getContextListing(address);
+      
       if (queue.isDurable())
       {
-         durableQueues.add(queue);
+         listing.durableQueues.add(queue);
       }
       else
       {
-         nonDurableQueues.add(queue);
+         listing.nonDurableQueues.add(queue);
       }
 
       queueCount++;
    }
-
-   public void addDurableQueue(final Queue queue)
+   /** Returns the listing stored for the address, creating and registering an empty one if none exists yet. */
+   private ContextListing getContextListing(SimpleString address)
    {
-      durableQueues.add(queue);
+      ContextListing listing = map.get(address);
+      if (listing == null)
+      {
+         listing = new ContextListing();
+         map.put(address, listing);
+      }
+      return listing;
    }
 
    public Transaction getTransaction()
@@ -82,14 +95,14 @@
       transaction = tx;
    }
 
-   public List<Queue> getNonDurableQueues()
+   public List<Queue> getNonDurableQueues(SimpleString address)
    {
-      return nonDurableQueues;
+      return getContextListing(address).nonDurableQueues;
    }
 
-   public List<Queue> getDurableQueues()
+   public List<Queue> getDurableQueues(SimpleString address)
    {
-      return durableQueues;
+      return getContextListing(address).durableQueues;
    }
 
    public int getQueueCount()
@@ -97,4 +110,41 @@
       return queueCount;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.RoutingContext#getAddresses()
+    */
+   public Pair<SimpleString, ContextListing>[] getAddresses()
+   {
+      Object x = new Pair(a, b);
+      
+      
+      Pair<SimpleString, ContextListing> [] contextListing = new Pair<SimpleString, ContextListing>[1]; 
+      // TODO: unfinished stub -- no result is assembled yet; this always returns null
+      return null;
+   }
+   
+   /** Durable and non-durable queue lists gathered for a single address. */
+   private class ContextListing implements RouteContextList
+   {
+      private final List<Queue> durableQueues = new ArrayList<Queue>(1);
+      
+      private final List<Queue> nonDurableQueues = new ArrayList<Queue>(1);
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.RouteContextList#getDurableQueues()
+       */
+      public List<Queue> getDurableQueues()
+      {
+         return durableQueues;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.RouteContextList#getNonDurableQueues()
+       */
+      public List<Queue> getNonDurableQueues()
+      {
+         return nonDurableQueues;
+      }
+   }
+
 }

Modified: branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java
===================================================================
--- branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-08 21:21:08 UTC (rev 9854)
+++ branches/Branch_New_Paging/tests/src/org/hornetq/tests/integration/paging/PageCursorTest.java	2010-11-09 03:04:54 UTC (rev 9855)
@@ -631,11 +631,11 @@
    private RoutingContextImpl generateCTX(Transaction tx)
    {
       RoutingContextImpl ctx = new RoutingContextImpl(tx);
-      ctx.addDurableQueue(queue);
+      ctx.addQueue(ADDRESS, queue);
       
       for (Queue q : this.queueList)
       {
-         ctx.addQueue(q);
+         ctx.addQueue(ADDRESS, q);
       }
       
       return ctx;



More information about the hornetq-commits mailing list