[hornetq-commits] JBoss hornetq SVN: r8053 - in trunk: src/main/org/hornetq/core/management/impl and 12 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 6 05:53:21 EDT 2009


Author: ataylor
Date: 2009-10-06 05:53:21 -0400 (Tue, 06 Oct 2009)
New Revision: 8053

Modified:
   trunk/src/main/org/hornetq/core/management/QueueControl.java
   trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/postoffice/Binding.java
   trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java
   trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
   trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-170 - removed transient id's from being used for binding id. the persistent queue id is now used.

Modified: trunk/src/main/org/hornetq/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/QueueControl.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/management/QueueControl.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -32,7 +32,7 @@
 
    String getAddress();
 
-   long getPersistenceID();
+   long getID();
 
    boolean isTemporary();
 

Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -159,9 +159,9 @@
       return queue.getMessagesAdded();
    }
 
-   public long getPersistenceID()
+   public long getID()
    {
-      return queue.getPersistenceID();
+      return queue.getID();
    }
 
    public long getScheduledCount()

Modified: trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/persistence/QueueBindingInfo.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -27,7 +27,7 @@
  */
 public interface QueueBindingInfo
 {
-   long getPersistenceID();
+   long getId();
    
    SimpleString getAddress();
    

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -310,7 +310,7 @@
    public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
    {
       ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
-                                                                         ref.getQueue().getPersistenceID());
+                                                                         ref.getQueue().getID());
 
       messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
                                         SET_SCHEDULED_DELIVERY_TIME,
@@ -400,7 +400,7 @@
    public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
    {
       ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
-                                                                         ref.getQueue().getPersistenceID());
+                                                                         ref.getQueue().getID());
 
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      ref.getMessage().getMessageID(),
@@ -457,7 +457,7 @@
 
    public void updateDeliveryCount(final MessageReference ref) throws Exception
    {
-      DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getPersistenceID(),
+      DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
                                                                                ref.getDeliveryCount());
 
       messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
@@ -1003,11 +1003,7 @@
                                                                                           binding.getAddress(),
                                                                                           filterString);
 
-      long id = this.generateUniqueID();
-
-      queue.setPersistenceID(id);
-
-      bindingsJournal.appendAddRecord(id, QUEUE_BINDING_RECORD, bindingEncoding, true);
+      bindingsJournal.appendAddRecord(binding.getID(), QUEUE_BINDING_RECORD, bindingEncoding, true);
    }
 
    public void deleteQueueBinding(final long queueBindingID) throws Exception
@@ -1037,7 +1033,7 @@
 
             bindingEncoding.decode(buffer);
 
-            bindingEncoding.setPersistenceID(id);
+            bindingEncoding.setId(id);
 
             queueBindingInfos.add(bindingEncoding);          
          }
@@ -1265,7 +1261,7 @@
 
    private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
    {
-      long persistenceID;
+      long id;
 
       SimpleString name;
 
@@ -1286,14 +1282,14 @@
          this.filterString = filterString;
       }
 
-      public long getPersistenceID()
+      public long getId()
       {
-         return persistenceID;
+         return id;
       }
 
-      public void setPersistenceID(final long id)
+      public void setId(final long id)
       {
-         this.persistenceID = id;
+         this.id = id;
       }
 
       public SimpleString getAddress()

Modified: trunk/src/main/org/hornetq/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/Binding.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/Binding.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -48,9 +48,7 @@
 
    boolean isExclusive();
    
-   int getID();
-   
-   void setID(int id);
+   long getID();
 
    int getDistance();
 }

Modified: trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/QueueInfo.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -40,7 +40,7 @@
    
    private final SimpleString filterString;
    
-   private final int id;
+   private final long id;
    
    private List<SimpleString> filterStrings;
    
@@ -48,7 +48,7 @@
    
    private final int distance;
    
-   public QueueInfo(final SimpleString routingName, final SimpleString clusterName, final SimpleString address, final SimpleString filterString, final int id,
+   public QueueInfo(final SimpleString routingName, final SimpleString clusterName, final SimpleString address, final SimpleString filterString, final Long id,
                     final Integer distance)
    {
       if (routingName == null)
@@ -100,7 +100,7 @@
       return distance;
    }
    
-   public int getID()
+   public long getID()
    {
       return id;
    }

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -51,7 +51,7 @@
 
    private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<SimpleString, Integer>();
 
-   private final Map<Integer, Binding> bindingsMap = new ConcurrentHashMap<Integer, Binding>();
+   private final Map<Long, Binding> bindingsMap = new ConcurrentHashMap<Long, Binding>();
 
    private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
 
@@ -133,7 +133,7 @@
 
       while (buff.hasRemaining())
       {
-         int bindingID = buff.getInt();
+         long bindingID = buff.getLong();
 
          Binding binding = bindingsMap.get(bindingID);
 

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -45,10 +45,12 @@
    
    private final boolean exclusive;
    
-   private int id;
+   private final long id;
    
-   public DivertBinding(final SimpleString address, final Divert divert)
+   public DivertBinding(long id, final SimpleString address, final Divert divert)
    {
+      this.id = id;
+
       this.address = address;
       
       this.divert = divert;
@@ -62,16 +64,11 @@
       this.exclusive = divert.isExclusive();
    }
    
-   public int getID()
+   public long getID()
    {
       return id;
    }
    
-   public void setID(final int id)
-   {
-      this.id = id;
-   }
-   
    public Filter getFilter()
    {
       return filter;

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -48,8 +48,6 @@
    
    private final SimpleString name;
    
-   private int id;
-   
    private SimpleString clusterName;
    
    public LocalQueueBinding(final SimpleString address, final Queue queue, final SimpleString nodeID)
@@ -65,16 +63,11 @@
       this.clusterName = name.concat(nodeID);
    }
    
-   public int getID()
+   public long getID()
    {
-      return id;
+      return queue.getID();
    }
    
-   public void setID(final int id)
-   {
-      this.id = id;
-   }
-   
    public Filter getFilter()
    {
       return filter;

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -100,16 +100,6 @@
 
    private final boolean persistIDCache;
 
-   // Each queue has a transient ID which lasts the lifetime of its binding. This is used in clustering when routing
-   // messages to particular queues on nodes. We could
-   // use the queue name on the node to identify it. But sometimes we need to route to maybe 10s of thousands of queues
-   // on a particular node, and all would
-   // have to be specified in the message. Specify 10000 ints takes up a lot less space than 10000 arbitrary queue names
-   // The drawback of this approach is we only allow up to 2^32 queues in memory at any one time
-   private int transientIDSequence;
-
-   private Set<Integer> transientIDs = new HashSet<Integer>();
-
    private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
 
    private final Object notificationLock = new Object();
@@ -200,9 +190,6 @@
       addressManager.clear();
 
       queueInfos.clear();
-
-      transientIDs.clear();
-
    }
 
    public boolean isStarted()
@@ -243,13 +230,13 @@
 
                SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
 
-               Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
+               Long id = (Long)props.getProperty(ManagementHelper.HDR_BINDING_ID);
 
                SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
 
                Integer distance = (Integer)props.getProperty(ManagementHelper.HDR_DISTANCE);
 
-               QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, transientID, distance);
+               QueueInfo info = new QueueInfo(routingName, clusterName, address, filterString, id, distance);
 
                queueInfos.put(clusterName, info);
 
@@ -435,8 +422,6 @@
    // even though failover is complete
    public synchronized void addBinding(final Binding binding) throws Exception
    {
-      binding.setID(generateTransientID());
-
       addressManager.addBinding(binding);
       
       TypedProperties props = new TypedProperties();
@@ -449,7 +434,7 @@
 
       props.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
 
-      props.putIntProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
+      props.putLongProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
 
       props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
 
@@ -505,8 +490,6 @@
 
       managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
 
-      releaseTransientID(binding.getID());
-
       return binding;
    }
 
@@ -740,7 +723,7 @@
                message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
                message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
                message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
-               message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
+               message.putLongProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
                message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
                message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
 
@@ -822,30 +805,6 @@
       return message;
    }
 
-   private int generateTransientID()
-   {
-      int start = transientIDSequence;
-      do
-      {
-         int id = transientIDSequence++;
-
-         if (!transientIDs.contains(id))
-         {
-            transientIDs.add(id);
-
-            return id;
-         }
-      }
-      while (transientIDSequence != start);
-
-      throw new IllegalStateException("Run out of queue ids!");
-   }
-
-   private void releaseTransientID(final int id)
-   {
-      transientIDs.remove(id);
-   }
-
    private final PageMessageOperation getPageOperation(final Transaction tx)
    {
       // you could have races on the case two sessions using the same XID

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -38,10 +38,8 @@
 
    SimpleString getName();
 
-   long getPersistenceID();
+   long getID();
 
-   void setPersistenceID(long id);
-
    Filter getFilter();
 
    boolean isDurable();

Modified: trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/cluster/ClusterConnection.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -41,7 +41,7 @@
    void handleReplicatedAddBinding(SimpleString address,
                                    SimpleString uniqueName,
                                    SimpleString routingName,
-                                   int queueID,
+                                   long queueID,
                                    SimpleString filterString,
                                    SimpleString queueName,                 
                                    int distance) throws Exception;

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -577,14 +577,15 @@
 
          SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
 
-         Integer queueID = (Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
+         Long queueID = (Long)message.getProperty(ManagementHelper.HDR_BINDING_ID);
 
          if (queueID == null)
          {
             throw new IllegalStateException("queueID is null");
          }
 
-         RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
+         RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(),
+                                                                 queueAddress,
                                                                  clusterName,
                                                                  routingName,
                                                                  queueID,
@@ -721,7 +722,7 @@
    public void handleReplicatedAddBinding(final SimpleString address,
                                           final SimpleString uniqueName,
                                           final SimpleString routingName,
-                                          final int queueID,
+                                          final long queueID,
                                           final SimpleString filterString,
                                           final SimpleString queueName,
                                           final int distance) throws Exception
@@ -735,7 +736,8 @@
 
       Queue queue = (Queue)queueBinding.getBindable();
 
-      RemoteQueueBinding binding = new RemoteQueueBindingImpl(address,
+      RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(),
+                                                              address,
                                                               uniqueName,
                                                               routingName,
                                                               queueID,

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -51,7 +51,7 @@
 
    private final SimpleString routingName;
    
-   private final int remoteQueueID;
+   private final long remoteQueueID;
 
    private final Filter queueFilter;
 
@@ -63,19 +63,22 @@
 
    private final SimpleString idsHeaderName;
    
-   private int id;
+   private final long id;
       
    private final int distance;
    
-   public RemoteQueueBindingImpl(final SimpleString address,
+   public RemoteQueueBindingImpl(final long id,
+                                 final SimpleString address,
                                  final SimpleString uniqueName,
                                  final SimpleString routingName,
-                                 final int remoteQueueID,
+                                 final Long remoteQueueID,
                                  final SimpleString filterString,
-                                 final Queue storeAndForwardQueue,                     
+                                 final Queue storeAndForwardQueue,
                                  final SimpleString bridgeName,
                                  final int distance) throws Exception
    {
+      this.id = id;
+
       this.address = address;
 
       this.storeAndForwardQueue = storeAndForwardQueue;
@@ -100,16 +103,11 @@
       this.distance = distance;
    }
    
-   public int getID()
+   public long getID()
    {
       return id;
    }
    
-   public void setID(final int id)
-   {
-      this.id = id;
-   }
-   
    public SimpleString getAddress()
    {
       return address;
@@ -195,20 +193,20 @@
       
       if (ids == null)
       {
-         ids = new byte[4];
+         ids = new byte[8];
       }
       else
       {
-         byte[] newIds = new byte[ids.length + 4];
+         byte[] newIds = new byte[ids.length + 8];
          
-         System.arraycopy(ids, 0, newIds, 4, ids.length);
+         System.arraycopy(ids, 0, newIds, 8, ids.length);
                           
          ids = newIds;
       }
       
       ByteBuffer buff = ByteBuffer.wrap(ids);
       
-      buff.putInt(remoteQueueID);
+      buff.putLong(remoteQueueID);
       
       message.putBytesProperty(idsHeaderName, ids); 
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -843,7 +843,7 @@
 
       if (queue.isDurable())
       {
-         storageManager.deleteQueueBinding(queue.getPersistenceID());
+         storageManager.deleteQueueBinding(queue.getID());
       }
 
       postOffice.removeBinding(queueName);
@@ -1169,7 +1169,7 @@
             filter = new FilterImpl(queueBindingInfo.getFilterString());
          }
 
-         Queue queue = queueFactory.createQueue(queueBindingInfo.getPersistenceID(),
+         Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
                                                 queueBindingInfo.getAddress(),
                                                 queueBindingInfo.getQueueName(),
                                                 filter,
@@ -1178,7 +1178,7 @@
 
          Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeID);
 
-         queues.put(queueBindingInfo.getPersistenceID(), queue);
+         queues.put(queueBindingInfo.getId(), queue);
 
          postOffice.addBinding(binding);
 
@@ -1267,8 +1267,8 @@
          filter = new FilterImpl(filterString);
       }
 
-      final Queue queue = queueFactory.createQueue(-1, address, queueName, filter, durable, temporary);
-
+      final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(), address, queueName, filter, durable, temporary);
+      
       binding = new LocalQueueBinding(address, queue, nodeID);
 
       if (durable)
@@ -1339,7 +1339,7 @@
                                         pagingManager,
                                         storageManager);
 
-         Binding binding = new DivertBinding(sAddress, divert);
+         Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
 
          postOffice.addBinding(binding);
 

Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -232,7 +232,7 @@
             }
             else
             {
-               storageManager.deleteMessageTransactional(tx.getID(), getPersistenceID(), msg.getMessageID());
+               storageManager.deleteMessageTransactional(tx.getID(), getID(), msg.getMessageID());
             }
          }
       }

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -76,7 +76,7 @@
 
    public static final int NUM_PRIORITIES = 10;
 
-   private volatile long persistenceID = -1;
+   private final long id;
 
    private final SimpleString name;
 
@@ -140,7 +140,7 @@
 
    private volatile SimpleString expiryAddress;
 
-   public QueueImpl(final long persistenceID,
+   public QueueImpl(final long id,
                     final SimpleString address,
                     final SimpleString name,
                     final Filter filter,
@@ -151,7 +151,7 @@
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository)
    {
-      this.persistenceID = persistenceID;
+      this.id = id;
 
       this.address = address;
 
@@ -259,7 +259,7 @@
                message.setStored();
             }
 
-            storageManager.storeReference(ref.getQueue().getPersistenceID(), message.getMessageID());
+            storageManager.storeReference(ref.getQueue().getID(), message.getMessageID());
          }
 
          if (scheduledDeliveryTime != null && durableRef)
@@ -283,7 +283,7 @@
             tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
 
             storageManager.storeReferenceTransactional(tx.getID(),
-                                                       ref.getQueue().getPersistenceID(),
+                                                       ref.getQueue().getID(),
                                                        message.getMessageID());
          }
 
@@ -373,16 +373,11 @@
       return name;
    }
 
-   public long getPersistenceID()
+   public long getID()
    {
-      return persistenceID;
+      return id;
    }
 
-   public void setPersistenceID(final long id)
-   {
-      persistenceID = id;
-   }
-
    public Filter getFilter()
    {
       return filter;
@@ -662,7 +657,7 @@
 
       if (durableRef)
       {
-         storageManager.storeAcknowledge(persistenceID, message.getMessageID());
+         storageManager.storeAcknowledge(id, message.getMessageID());
       }
 
       postAcknowledge(ref);
@@ -676,7 +671,7 @@
 
       if (durableRef)
       {
-         storageManager.storeAcknowledgeTransactional(tx.getID(), persistenceID, message.getMessageID());
+         storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
 
          tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
       }

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -25,72 +25,149 @@
  */
 public class ClusterRestartTest extends ClusterTestBase
 {
-   public void testRestartWithDurableQueues() throws Exception
+   public void testRestartWithQueuesCreateInDiffOrder() throws Exception
    {
-      /*setupServer(0, isFileStorage(), isNetty());
+      setupServer(0, isFileStorage(), isNetty());
       setupServer(1, isFileStorage(), isNetty());
-      setupServer(2, isFileStorage(), isNetty());
 
-      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
 
-      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
 
-      setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
 
-      startServers(0, 1, 2);
+      startServers(0, 1);
 
+
+      System.out.println("server 0 = " + getServer(0).getNodeID());
+      System.out.println("server 1 = " + getServer(1).getNodeID());
+
       try
       {
 
          setupSessionFactory(0, isNetty());
          setupSessionFactory(1, isNetty());
-         setupSessionFactory(2, isNetty());
 
 
+         //create some dummy queues to ensure that the test queue has a high numbered binding
+         createQueue(0, "queues.testaddress2", "queue0", null, false);
+         createQueue(0, "queues.testaddress2", "queue1", null, false);
+         createQueue(0, "queues.testaddress2", "queue2", null, false);
+         createQueue(0, "queues.testaddress2", "queue3", null, false);
+         createQueue(0, "queues.testaddress2", "queue4", null, false);
+         createQueue(0, "queues.testaddress2", "queue5", null, false);
+         createQueue(0, "queues.testaddress2", "queue6", null, false);
+         createQueue(0, "queues.testaddress2", "queue7", null, false);
+         createQueue(0, "queues.testaddress2", "queue8", null, false);
+         createQueue(0, "queues.testaddress2", "queue9", null, false);
+         //now create the 2 queues and make sure they are durable
+         createQueue(0, "queues.testaddress", "queue10", null, true);
+         createQueue(1, "queues.testaddress", "queue10", null, true);
 
-         createQueue(0, "queues.testaddress", "queue0", null, true);
-         createQueue(1, "queues.testaddress", "queue0", null, true);
-         createQueue(2, "queues.testaddress", "queue0", null, true);
+         addConsumer(0, 0, "queue10", null);
 
-         addConsumer(1, 1, "queue0", null);
+         waitForBindings(0, "queues.testaddress", 1, 1, true);
+         waitForBindings(1, "queues.testaddress", 1, 0, true);
 
+         waitForBindings(0, "queues.testaddress", 1, 0, false);
+         waitForBindings(1, "queues.testaddress", 1, 1, false);
+
+         printBindings(2);
+
+         sendInRange(1, "queues.testaddress", 0, 10, false, null);
+
+         System.out.println("stopping******************************************************");
+         stopServers(0);
+         System.out.println("stopped******************************************************");
+         startServers(0);
+
          waitForBindings(0, "queues.testaddress", 1, 0, true);
-         waitForBindings(1, "queues.testaddress", 1, 1, true);
-         waitForBindings(2, "queues.testaddress", 1, 0, true);
 
-         waitForBindings(0, "queues.testaddress", 2, 1, false);
-         waitForBindings(1, "queues.testaddress", 2, 0, false);
-         waitForBindings(2, "queues.testaddress", 2, 1, false);
+         addConsumer(1, 0, "queue10", null);
 
-         printBindings();
+         waitForBindings(0, "queues.testaddress", 1, 0, false);
+         waitForBindings(1, "queues.testaddress", 1, 1, false);
+         printBindings(2);
+         
+         sendInRange(1, "queues.testaddress", 10, 20, false, null);
 
-         sendInRange(1, "queues.testaddress", 0, 10, false, null);
 
+         verifyReceiveAllInRange(10, 20, 1);
+         System.out.println("*****************************************************************************");
+      }
+      finally
+      {
+         //closeAllConsumers();
 
-         sendInRange(2, "queues.testaddress", 10, 20, false, null);
+         closeAllSessionFactories();
 
+         stopServers(0, 1);
+      }
+   }
 
-         sendInRange(0, "queues.testaddress", 20, 30, false, null);
+   public void testRestartWithQueuesCreateInDiffOrder2() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
 
+      setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0);
+
+
+      startServers(0, 1);
+
+
+      System.out.println("server 0 = " + getServer(0).getNodeID());
+      System.out.println("server 1 = " + getServer(1).getNodeID());
+
+      try
+      {
+
+         setupSessionFactory(0, isNetty());
+         setupSessionFactory(1, isNetty());
+
+
+         //create some dummy queues to ensure that the test queue has a high numbered binding
+         createQueue(0, "queues.testaddress2", "queue0", null, false);
+         createQueue(0, "queues.testaddress2", "queue1", null, false);
+         createQueue(0, "queues.testaddress2", "queue2", null, false);
+         createQueue(0, "queues.testaddress2", "queue3", null, false);
+         createQueue(0, "queues.testaddress2", "queue4", null, false);
+         createQueue(0, "queues.testaddress2", "queue5", null, false);
+         createQueue(0, "queues.testaddress2", "queue6", null, false);
+         createQueue(0, "queues.testaddress2", "queue7", null, false);
+         createQueue(0, "queues.testaddress2", "queue8", null, false);
+         createQueue(0, "queues.testaddress2", "queue9", null, false);
+         //now create the 2 queues and make sure they are durable
+         createQueue(0, "queues.testaddress", "queue10", null, true);
+         createQueue(1, "queues.testaddress", "queue10", null, true);
+
+         waitForBindings(0, "queues.testaddress", 1, 0, true);
+         waitForBindings(1, "queues.testaddress", 1, 0, true);
+         waitForBindings(0, "queues.testaddress", 1, 0, false);
+         waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+         printBindings(2);
+
+         sendInRange(1, "queues.testaddress", 0, 10, true, null);
+
          System.out.println("stopping******************************************************");
-         stopServers(1);
+         stopServers(0);
+
+         sendInRange(1, "queues.testaddress", 10, 20, true, null);
          System.out.println("stopped******************************************************");
-         startServers(1);
+         startServers(0);
 
          waitForBindings(0, "queues.testaddress", 1, 0, true);
-         waitForBindings(1, "queues.testaddress", 1, 0, true);
-         waitForBindings(2, "queues.testaddress", 1, 0, true);
 
-         addConsumer(4, 1, "queue0", null);
-         waitForBindings(0, "queues.testaddress", 2, 1, false);
-         waitForBindings(1, "queues.testaddress", 2, 0, false);
-         waitForBindings(2, "queues.testaddress", 2, 1, false);
-         printBindings();
-         sendInRange(2, "queues.testaddress", 30, 40, false, null);
 
-         sendInRange(0, "queues.testaddress", 40, 50, false, null);
+         waitForBindings(0, "queues.testaddress", 1, 0, false);
+         waitForBindings(1, "queues.testaddress", 1, 0, false);
+         printBindings(2);
+         addConsumer(0, 1, "queue10", null);
+         addConsumer(1, 0, "queue10", null);
 
-         verifyReceiveAllInRange(0, 50, 1);
+         verifyReceiveRoundRobin(0, 20, 0, 1);
          System.out.println("*****************************************************************************");
       }
       finally
@@ -99,30 +176,22 @@
 
          closeAllSessionFactories();
 
-         stopServers(0, 1, 2);
-      }*/
+         stopServers(0, 1);
+      }
    }
 
-   private void printBindings()
+
+   private void printBindings(int num)
          throws Exception
    {
-      Collection<Binding> bindings0 = getServer(0).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
-      Collection<Binding> bindings1 = getServer(1).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
-      Collection<Binding> bindings2 = getServer(2).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
-      for (Binding binding : bindings0)
+      for(int i  = 0; i < num; i++)
       {
-         System.out.println(binding + " on node 0 at " + binding.getID());
+         Collection<Binding> bindings0 = getServer(i).getPostOffice().getBindingsForAddress(new SimpleString("queues.testaddress")).getBindings();
+         for (Binding binding : bindings0)
+         {
+            System.out.println(binding + " on node " + i + " at " + binding.getID());
+         }
       }
-
-      for (Binding binding : bindings1)
-      {
-         System.out.println(binding + " on node 1 at " + binding.getID());
-      }
-
-      for (Binding binding : bindings2)
-      {
-         System.out.println(binding + " on node 2 at " + binding.getID());
-      }
    }
 
    public boolean isNetty()

Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -130,9 +130,9 @@
             return (String)proxy.retrieveAttributeValue("name");
          }
 
-         public long getPersistenceID()
+         public long getID()
          {
-            return (Long)proxy.retrieveAttributeValue("persistenceID");
+            return (Long)proxy.retrieveAttributeValue("ID");
          }
 
          public long getScheduledCount()

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -957,7 +957,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.postoffice.Binding#getID()
        */
-      public int getID()
+      public long getID()
       {
 
          return 0;
@@ -1008,14 +1008,6 @@
       }
 
       /* (non-Javadoc)
-       * @see org.hornetq.core.postoffice.Binding#setID(int)
-       */
-      public void setID(final int id)
-      {
-
-      }
-
-      /* (non-Javadoc)
        * @see org.hornetq.core.postoffice.Binding#willRoute(org.hornetq.core.server.ServerMessage)
        */
       public void willRoute(final ServerMessage message)

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -19,7 +19,6 @@
 import java.util.concurrent.Executor;
 
 import org.hornetq.core.filter.Filter;
-import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.server.Consumer;
 import org.hornetq.core.server.Distributor;
 import org.hornetq.core.server.MessageReference;
@@ -311,9 +310,9 @@
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#getPersistenceID()
+    * @see org.hornetq.core.server.Queue#getID()
     */
-   public long getPersistenceID()
+   public long getID()
    {
 
       return 0;
@@ -482,14 +481,7 @@
 
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#setPersistenceID(long)
-    */
-   public void setPersistenceID(long id)
-   {
 
-   }
-
    /* (non-Javadoc)
     * @see org.hornetq.core.server.Bindable#preroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
     */

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-10-06 09:35:13 UTC (rev 8052)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-10-06 09:53:21 UTC (rev 8053)
@@ -51,21 +51,6 @@
 
    private static final SimpleString address1 = new SimpleString("address1");
 
-   public void testID()
-   {
-      final long id = 123;
-
-      Queue queue = new QueueImpl(id, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
-
-      assertEquals(id, queue.getPersistenceID());
-
-      final long id2 = 456;
-
-      queue.setPersistenceID(id2);
-
-      assertEquals(id2, queue.getPersistenceID());
-   }
-
    public void testName()
    {
       final SimpleString name = new SimpleString("oobblle");



More information about the hornetq-commits mailing list