[jboss-cvs] JBoss Messaging SVN: r5760 - in trunk: src/main/org/jboss/messaging/core/management and 15 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 29 13:24:44 EST 2009


Author: timfox
Date: 2009-01-29 13:24:44 -0500 (Thu, 29 Jan 2009)
New Revision: 5760

Modified:
   trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
   trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
   trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java
   trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
   trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java
   trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
   trunk/src/main/org/jboss/messaging/core/server/Queue.java
   trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/util/SimpleString.java
   trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
Log:
More clustering


Modified: trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/client/management/impl/ManagementHelper.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -68,6 +68,8 @@
    
    public static final SimpleString HDR_ADDRESS = new SimpleString("_JBM_Address");
    
+   public static final SimpleString HDR_BINDING_ID = new SimpleString("_JBM_Binding_ID");
+   
    public static final SimpleString HDR_FILTERSTRING = new SimpleString("_JBM_FilterString");
 
    // Attributes ----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/management/NotificationType.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/NotificationType.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/management/NotificationType.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -31,5 +31,5 @@
  */
 public enum NotificationType
 {
-   QUEUE_CREATED, QUEUE_DESTROYED, ADDRESS_ADDED, ADDRESS_REMOVED, CONSUMER_CREATED, CONSUMER_CLOSED;
+   BINDING_ADDED, BINDING_REMOVED, ADDRESS_ADDED, ADDRESS_REMOVED, CONSUMER_CREATED, CONSUMER_CLOSED;
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/AddressControl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -84,10 +84,10 @@
       {
          Bindings bindings = postOffice.getBindingsForAddress(address);
          String[] queueNames = new String[bindings.getBindings().size()];
-         for (int i = 0; i < bindings.getBindings().size(); i++)
-         {
-            Binding binding = bindings.getBindings().get(i);
-            queueNames[i] = binding.getUniqueName().toString();
+         int i = 0;
+         for (Binding binding: bindings.getBindings())
+         {           
+            queueNames[i++] = binding.getUniqueName().toString();
          }
          return queueNames;
       }

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -284,29 +284,14 @@
       if (log.isDebugEnabled())
       {
          log.debug("registered queue " + objectName);
-      }
-      
-      TypedProperties props = new TypedProperties();
-      
-      log.info("registering queue with address "+ address);
-      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
-      props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, queue.getName());
-      
-      sendNotification(new Notification(NotificationType.QUEUE_CREATED, props));
+      }            
    }
 
    public void unregisterQueue(final SimpleString name, final SimpleString address) throws Exception
    {
       ObjectName objectName = getQueueObjectName(address, name);
       unregisterResource(objectName);
-      messageCounterManager.unregisterMessageCounter(name.toString());
-      
-      TypedProperties props = new TypedProperties();
-      
-      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
-      props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, name);
-
-      sendNotification(new Notification(NotificationType.QUEUE_DESTROYED, props));
+      messageCounterManager.unregisterMessageCounter(name.toString());     
    }
 
    public void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -67,7 +67,9 @@
    
    public static final SimpleString HDR_DUPLICATE_DETECTION_ID = new SimpleString("_JBM_DUPL_ID");
 
-   public static final SimpleString HDR_EXCLUDED_QUEUES = new SimpleString("_JBM_EXCLUDED_QUEUES");
+   public static final SimpleString HDR_ROUTE_TO_IDS = new SimpleString("_JBM_ROUTE_TO");
+   
+   public static final SimpleString HDR_FROM_CLUSTER = new SimpleString("_JBM_FROM_CLUSTER");
       
    // Attributes ----------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -378,9 +378,7 @@
 
             buff.putLong(msg.getMessageID());
 
-            SimpleString duplID = new SimpleString(bytes);
-
-            msg.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplID);
+            msg.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
          }
 
          int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;

Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -67,9 +67,9 @@
 
    void updateScheduledDeliveryTime(MessageReference ref) throws Exception;
 
-   void storeDuplicateID(SimpleString address, SimpleString duplID, long recordID) throws Exception;
+   void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception;
 
-   void updateDuplicateID(SimpleString address, SimpleString duplID, long recordID) throws Exception;
+   void updateDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception;
 
    void deleteDuplicateID(long recordID) throws Exception;
 
@@ -83,9 +83,9 @@
 
    void deleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
 
-   void storeDuplicateIDTransactional(long txID, SimpleString address, SimpleString duplID, long recordID) throws Exception;
+   void storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception;
 
-   void updateDuplicateIDTransactional(long txID, SimpleString address, SimpleString duplID, long recordID) throws Exception;
+   void updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception;
 
    void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception;
 
@@ -106,7 +106,7 @@
                            HierarchicalRepository<QueueSettings> queueSettingsRepository,
                            Map<Long, Queue> queues,
                            ResourceManager resourceManager,
-                           Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap) throws Exception;
+                           Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
 
    // Bindings related operations
 

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -79,6 +79,7 @@
 import org.jboss.messaging.core.transaction.TransactionPropertyIndexes;
 import org.jboss.messaging.core.transaction.Transaction.State;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.DataConstants;
 import org.jboss.messaging.util.IDGenerator;
 import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.messaging.util.Pair;
@@ -279,14 +280,14 @@
       messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), SET_SCHEDULED_DELIVERY_TIME, encoding);
    }
 
-   public void storeDuplicateID(final SimpleString address, final SimpleString duplID, final long recordID) throws Exception
+   public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
       messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding);
    }
 
-   public void updateDuplicateID(final SimpleString address, final SimpleString duplID, final long recordID) throws Exception
+   public void updateDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
@@ -386,7 +387,7 @@
 
    public void storeDuplicateIDTransactional(final long txID,
                                              final SimpleString address,
-                                             final SimpleString duplID,
+                                             final byte[] duplID,
                                              final long recordID) throws Exception
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
@@ -396,7 +397,7 @@
 
    public void updateDuplicateIDTransactional(final long txID,
                                               final SimpleString address,
-                                              final SimpleString duplID,
+                                              final byte[] duplID,
                                               final long recordID) throws Exception
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
@@ -438,7 +439,7 @@
                                   final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                                   final Map<Long, Queue> queues,
                                   final ResourceManager resourceManager,
-                                  final Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap) throws Exception
+                                  final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
    {
       List<RecordInfo> records = new ArrayList<RecordInfo>();
 
@@ -608,16 +609,16 @@
 
                encoding.decode(buff);
 
-               List<Pair<SimpleString, Long>> ids = duplicateIDMap.get(encoding.address);
+               List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
 
                if (ids == null)
                {
-                  ids = new ArrayList<Pair<SimpleString, Long>>();
+                  ids = new ArrayList<Pair<byte[], Long>>();
 
                   duplicateIDMap.put(encoding.address, ids);
                }
 
-               ids.add(new Pair<SimpleString, Long>(encoding.duplID, record.id));
+               ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
 
                break;
             }
@@ -671,7 +672,7 @@
                                          final Map<Long, Queue> queues,
                                          final ResourceManager resourceManager,
                                          final List<PreparedTransactionInfo> preparedTransactions,
-                                         final Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap) throws Exception
+                                         final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
    {
       final PagingManager pagingManager = postOffice.getPagingManager();
 
@@ -798,16 +799,16 @@
 
                   encoding.decode(buff);
 
-                  List<Pair<SimpleString, Long>> ids = duplicateIDMap.get(encoding.address);
+                  List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
 
                   if (ids == null)
                   {
-                     ids = new ArrayList<Pair<SimpleString, Long>>();
+                     ids = new ArrayList<Pair<byte[], Long>>();
 
                      duplicateIDMap.put(encoding.address, ids);
                   }
 
-                  ids.add(new Pair<SimpleString, Long>(encoding.duplID, record.id));
+                  ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
 
                   break;
                }
@@ -1405,9 +1406,9 @@
    {
       SimpleString address;
 
-      SimpleString duplID;
+      byte[] duplID;
 
-      public DuplicateIDEncoding(final SimpleString address, final SimpleString duplID)
+      public DuplicateIDEncoding(final SimpleString address, final byte[] duplID)
       {
          this.address = address;
 
@@ -1422,19 +1423,25 @@
       {
          address = buffer.getSimpleString();
 
-         duplID = buffer.getSimpleString();
+         int size = buffer.getInt();
+         
+         duplID = new byte[size];
+         
+         buffer.getBytes(duplID);
       }
 
       public void encode(final MessagingBuffer buffer)
       {
          buffer.putSimpleString(address);
 
-         buffer.putSimpleString(duplID);
+         buffer.putInt(duplID.length);
+         
+         buffer.putBytes(duplID);
       }
 
       public int getEncodeSize()
       {
-         return SimpleString.sizeofString(address) + SimpleString.sizeofString(duplID);
+         return SimpleString.sizeofString(address) + DataConstants.SIZE_INT + duplID.length;
       }
    }
    

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -80,8 +80,7 @@
       return true;
    }
 
-   public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,                               
-                                  final List<SimpleString> destinations) throws Exception
+   public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<SimpleString> destinations) throws Exception
    {
 
    }
@@ -93,11 +92,11 @@
    public void rollback(final long txID) throws Exception
    {
    }
-   
+
    public void storeReference(final long queueID, final long messageID) throws Exception
    {
    }
-   
+
    public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
    }
@@ -150,24 +149,24 @@
    {
    }
 
-   public void storeDuplicateID(final SimpleString address, final SimpleString duplID, final long recordID) throws Exception
+   public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
    {
    }
 
    public void storeDuplicateIDTransactional(final long txID,
                                              final SimpleString address,
-                                             final SimpleString duplID,
+                                             final byte[] duplID,
                                              final long recordID) throws Exception
    {
    }
 
-   public void updateDuplicateID(final SimpleString address, final SimpleString duplID, final long recordID) throws Exception
+   public void updateDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
    {
    }
 
    public void updateDuplicateIDTransactional(final long txID,
                                               final SimpleString address,
-                                              final SimpleString duplID,
+                                              final byte[] duplID,
                                               final long recordID) throws Exception
    {
    }
@@ -214,12 +213,12 @@
    {
    }
 
-   public void loadMessageJournal(final PostOffice postOffice,
-                                  final StorageManager storageManager,
-                                  final HierarchicalRepository<QueueSettings> queueSettingsRepository,
-                                  final Map<Long, Queue> queues,
-                                  final ResourceManager resourceManager,
-                                  final Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap) throws Exception
+   public void loadMessageJournal(PostOffice postOffice,
+                                  StorageManager storageManager,
+                                  HierarchicalRepository<QueueSettings> queueSettingsRepository,
+                                  Map<Long, Queue> queues,
+                                  ResourceManager resourceManager,
+                                  Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
    {
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -48,6 +48,13 @@
    boolean filterMatches(ServerMessage message) throws Exception;
    
    boolean isHighAcceptPriority(ServerMessage message);
+   
+   //TODO find a better way
+   void willRoute(ServerMessage message);
 
    boolean isExclusive();
+   
+   int getID();
+   
+   void setID(int id);
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -22,7 +22,7 @@
 
 package org.jboss.messaging.core.postoffice;
 
-import java.util.List;
+import java.util.Collection;
 
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.transaction.Transaction;
@@ -38,10 +38,8 @@
  */
 public interface Bindings
 {
-   List<Binding> getBindings();
+   Collection<Binding> getBindings();
    
-   void route(final ServerMessage message) throws Exception;
-   
    void route(ServerMessage message, Transaction tx) throws Exception;
    
    void addBinding(Binding binding);

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/DuplicateIDCache.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -26,7 +26,6 @@
 
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.util.Pair;
-import org.jboss.messaging.util.SimpleString;
 
 /**
  * A DuplicateIDCache
@@ -39,9 +38,9 @@
  */
 public interface DuplicateIDCache
 {
-   boolean contains(SimpleString duplicateID);
+   boolean contains(byte[] duplicateID);
    
-   void addToCache(SimpleString duplicateID, Transaction tx) throws Exception;  
+   void addToCache(byte[] duplicateID, Transaction tx) throws Exception;  
    
-   void load(List<Pair<SimpleString, Long>> theIds) throws Exception;
+   void load(List<Pair<byte[], Long>> theIds) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -45,14 +45,17 @@
    
    private final SimpleString address;
    
+   private final int id;
+   
    private List<SimpleString> filterStrings;
    
    private int numberOfConsumers;
 
-   public QueueInfo(final SimpleString queueName, final SimpleString address)
+   public QueueInfo(final SimpleString queueName, final SimpleString address, final int id)
    {
       this.queueName = queueName;
       this.address = address;      
+      this.id = id;
    }
 
    public SimpleString getQueueName()
@@ -64,6 +67,11 @@
    {
       return address;
    }
+   
+   public int getID()
+   {
+      return id;
+   }
 
    public List<SimpleString> getFilterStrings()
    {

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.core.postoffice.impl;
 
+import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +33,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.server.Bindable;
@@ -55,13 +58,13 @@
 
    private final Map<SimpleString, Integer> routingNamePositions = new ConcurrentHashMap<SimpleString, Integer>();
 
-   private final List<Binding> bindingsList = new CopyOnWriteArrayList<Binding>();
+   private final Map<Integer, Binding> bindingsMap = new ConcurrentHashMap<Integer, Binding>();
 
    private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
 
-   public List<Binding> getBindings()
+   public Collection<Binding> getBindings()
    {
-      return bindingsList;
+      return bindingsMap.values();
    }
 
    public void addBinding(final Binding binding)
@@ -91,7 +94,7 @@
          bindings.add(binding);
       }
 
-      bindingsList.add(binding);
+      bindingsMap.put(binding.getID(), binding);
    }
 
    public void removeBinding(final Binding binding)
@@ -117,15 +120,48 @@
          }
       }
 
-      bindingsList.remove(binding);
+      bindingsMap.remove(binding.getID());
    }
 
-   public void route(ServerMessage message) throws Exception
+   private void routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
    {
-      route(message, null);
+      byte[] ids = (byte[])message.getProperty(MessageImpl.HDR_ROUTE_TO_IDS);
+      
+      ByteBuffer buff = ByteBuffer.wrap(ids);
+      
+      Set<Bindable> chosen = new HashSet<Bindable>();
+      
+      while (buff.hasRemaining())
+      {
+         int bindingID = buff.getInt();
+         
+         Binding binding = bindingsMap.get(bindingID);
+         
+         if (binding == null)
+         {
+            //The binding has been closed - we need to route the message somewhere else...............
+            throw new IllegalStateException("Binding not found when routing from cluster - it must have closed");
+            
+            //FIXME need to deal with this better            
+         }
+         
+         binding.willRoute(message);
+         
+         chosen.add(binding.getBindable());
+      }
+      
+      for (Bindable bindable : chosen)
+      {
+         bindable.preroute(message, tx);
+      }
+      
+      for (Bindable bindable : chosen)
+      {
+         bindable.route(message, tx);
+      }
    }
 
-   public void route(ServerMessage message, Transaction tx) throws Exception
+   public void route(final ServerMessage message, final Transaction tx) throws Exception
    {
       if (!exclusiveBindings.isEmpty())
       {
@@ -136,130 +172,137 @@
       }
       else
       {
-         Set<Bindable> chosen = new HashSet<Bindable>();
-
-         for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
+         if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
          {
-            SimpleString routingName = entry.getKey();
-
-            List<Binding> bindings = entry.getValue();
-
-            if (bindings == null)
+            routeFromCluster(message, tx);
+         }
+         else
+         {
+            Set<Bindable> chosen = new HashSet<Bindable>();
+   
+            for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
             {
-               // The value can become null if it's concurrently removed while we're iterating - this is expected
-               // ConcurrentHashMap behaviour!
-               continue;
-            }
-
-            Integer ipos = routingNamePositions.get(routingName);
-
-            int pos = ipos != null ? ipos.intValue() : 0;
-
-            int length = bindings.size();
-
-            int startPos = pos;
-
-            Binding theBinding = null;
-
-            int lastNoMatchingConsumerPos = -1;
-
-            while (true)
-            {
-               Binding binding;
-               try
+               SimpleString routingName = entry.getKey();
+   
+               List<Binding> bindings = entry.getValue();
+   
+               if (bindings == null)
                {
-                  binding = bindings.get(pos);
+                  // The value can become null if it's concurrently removed while we're iterating - this is expected
+                  // ConcurrentHashMap behaviour!
+                  continue;
                }
-               catch (IndexOutOfBoundsException e)
+   
+               Integer ipos = routingNamePositions.get(routingName);
+   
+               int pos = ipos != null ? ipos.intValue() : 0;
+   
+               int length = bindings.size();
+   
+               int startPos = pos;
+   
+               Binding theBinding = null;
+   
+               int lastNoMatchingConsumerPos = -1;
+   
+               while (true)
                {
-                  // This can occur if binding is removed while in route
-                  if (!bindings.isEmpty())
+                  Binding binding;
+                  try
                   {
-                     pos = 0;
-
-                     continue;
+                     binding = bindings.get(pos);
                   }
-                  else
+                  catch (IndexOutOfBoundsException e)
                   {
-                     break;
+                     // This can occur if binding is removed while in route
+                     if (!bindings.isEmpty())
+                     {
+                        pos = 0;
+   
+                        continue;
+                     }
+                     else
+                     {
+                        break;
+                     }
                   }
-               }
-
-               if (binding.filterMatches(message))
-               {
-                  // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
-                  // unnecessary overhead)
-                  if (length == 1 || binding.isHighAcceptPriority(message))
+   
+                  if (binding.filterMatches(message))
                   {
-                     theBinding = binding;
-
-                     pos = incrementPos(pos, length);
-
-                     break;
-                  }
-                  else
-                  {
-                     lastNoMatchingConsumerPos = pos;
-                  }
-               }
-
-               pos = incrementPos(pos, length);
-
-               if (pos == startPos)
-               {
-                  if (lastNoMatchingConsumerPos != -1)
-                  {                     
-                     try
+                     // bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
+                     // unnecessary overhead)
+                     if (length == 1 || binding.isHighAcceptPriority(message))
                      {
-                        theBinding = bindings.get(pos);
+                        theBinding = binding;
+   
+                        pos = incrementPos(pos, length);
+   
+                        break;
                      }
-                     catch (IndexOutOfBoundsException e)
+                     else
                      {
-                        // This can occur if binding is removed while in route
-                        if (!bindings.isEmpty())
+                        lastNoMatchingConsumerPos = pos;
+                     }
+                  }
+   
+                  pos = incrementPos(pos, length);
+   
+                  if (pos == startPos)
+                  {
+                     if (lastNoMatchingConsumerPos != -1)
+                     {                     
+                        try
                         {
-                           pos = 0;
-                           
-                           lastNoMatchingConsumerPos = -1;
-
-                           continue;
+                           theBinding = bindings.get(pos);
                         }
-                        else
+                        catch (IndexOutOfBoundsException e)
                         {
-                           break;
+                           // This can occur if binding is removed while in route
+                           if (!bindings.isEmpty())
+                           {
+                              pos = 0;
+                              
+                              lastNoMatchingConsumerPos = -1;
+   
+                              continue;
+                           }
+                           else
+                           {
+                              break;
+                           }
                         }
+                                            
+                        pos = lastNoMatchingConsumerPos;
+   
+                        pos = incrementPos(pos, length);
                      }
-                                         
-                     pos = lastNoMatchingConsumerPos;
-
-                     pos = incrementPos(pos, length);
+                     break;
                   }
-                  break;
                }
+   
+               if (theBinding != null)
+               {
+                  theBinding.willRoute(message);
+                  
+                  chosen.add(theBinding.getBindable());
+               }
+   
+               routingNamePositions.put(routingName, pos);
             }
-
-            if (theBinding != null)
+   
+            //TODO refactor to do this is one iteration
+            
+            for (Bindable bindable : chosen)
             {
-               chosen.add(theBinding.getBindable());
+               bindable.preroute(message, tx);
             }
-
-            routingNamePositions.put(routingName, pos);
-
+            
+            for (Bindable bindable : chosen)
+            {
+               bindable.route(message, tx);
+            }
          }
-
-         //TODO refactor to do this is one iteration
-         
-         for (Bindable bindable : chosen)
-         {
-            bindable.preroute(message, tx);
-         }
-         
-         for (Bindable bindable : chosen)
-         {
-            bindable.route(message, tx);
-         }
       }
-
    }
 
    private final int incrementPos(int pos, int length)

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -53,6 +53,8 @@
    
    private final boolean exclusive;
    
+   private int id;
+      
    public DivertBinding(final SimpleString address, final Divert divert)
    {
       this.address = address;
@@ -67,6 +69,17 @@
       
       this.exclusive = divert.isExclusive();
    }
+   
+   public int getID()
+   {
+      return id;
+   }
+   
+   public void setID(final int id)
+   {
+      this.id = id;
+   }
+   
       
    public boolean filterMatches(final ServerMessage message) throws Exception
    {
@@ -109,6 +122,10 @@
    {
       return true;
    }
+   
+   public void willRoute(final ServerMessage message)
+   {      
+   }
 
    public boolean isQueueBinding()
    {

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DuplicateIDCacheImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -50,89 +50,57 @@
 public class DuplicateIDCacheImpl implements DuplicateIDCache
 {
    private static final Logger log = Logger.getLogger(DuplicateIDCacheImpl.class);
-   
-   public static volatile boolean debug;
-   
-   private static Set<DuplicateIDCacheImpl> caches = new ConcurrentHashSet<DuplicateIDCacheImpl>();
 
-   public static void dumpCaches()
-   {
-      for (DuplicateIDCacheImpl cache : caches)
-      {
-         log.info("Dumping cache for address: " + cache.address);
-         log.info("First the set:");
-         for (SimpleString duplID : cache.cache)
-         {
-            log.info(duplID);
-         }
-         log.info("End set");
-         log.info("Now the list:");
-         for (Pair<SimpleString, Long> id : cache.ids)
-         {
-            log.info(id.a + ":" + id.b);
-         }
-         log.info("End dump");
-      }
-   }
+   private final Set<ByteArrayHolder> cache = new ConcurrentHashSet<ByteArrayHolder>();
 
-   private final Set<SimpleString> cache = new ConcurrentHashSet<SimpleString>();
-
    private final SimpleString address;
 
    // Note - deliberately typed as ArrayList since we want to ensure fast indexed
    // based array access
-   private final ArrayList<Pair<SimpleString, Long>> ids;
+   private final ArrayList<Pair<ByteArrayHolder, Long>> ids;
 
    private int pos;
 
    private int cacheSize;
 
    private final StorageManager storageManager;
-   
+
    private final boolean persist;
+
    
-   public DuplicateIDCacheImpl(final SimpleString address, final int size, final StorageManager storageManager,
+   public DuplicateIDCacheImpl(final SimpleString address,
+                               final int size,
+                               final StorageManager storageManager,
                                final boolean persist)
    {
       this.address = address;
 
       this.cacheSize = size;
 
-      this.ids = new ArrayList<Pair<SimpleString, Long>>(size);
+      this.ids = new ArrayList<Pair<ByteArrayHolder, Long>>(size);
 
       this.storageManager = storageManager;
-      
+
       this.persist = persist;
-      
-      if (debug)
-      {
-         caches.add(this);
-      }
    }
 
-   protected void finalize() throws Throwable
+   public void load(final List<Pair<byte[], Long>> theIds) throws Exception
    {
-      if (debug)
-      {
-         caches.remove(this);
-      }
-
-      super.finalize();
-   }
-
-   public void load(final List<Pair<SimpleString, Long>> theIds) throws Exception
-   {
       int count = 0;
 
       long txID = -1;
-      
-      for (Pair<SimpleString, Long> id : theIds)
+
+      for (Pair<byte[], Long> id : theIds)
       {
          if (count < cacheSize)
          {
-            cache.add(id.a);
+            ByteArrayHolder bah = new ByteArrayHolder(id.a);
             
-            ids.add(id);
+            Pair<ByteArrayHolder, Long> pair = new Pair<ByteArrayHolder, Long>(bah, id.b);
+            
+            cache.add(bah);
+
+            ids.add(pair);
          }
          else
          {
@@ -156,15 +124,15 @@
       pos = theIds.size();
    }
 
-   public boolean contains(final SimpleString duplID)
+   public boolean contains(final byte[] duplID)
    {
-      return cache.contains(duplID);
+      return cache.contains(new ByteArrayHolder(duplID));
    }
-   
-   public synchronized void addToCache(final SimpleString duplID, final Transaction tx) throws Exception
+
+   public synchronized void addToCache(final byte[] duplID, final Transaction tx) throws Exception
    {
       long recordID = storageManager.generateUniqueID();
-      
+
       if (tx == null)
       {
          if (persist)
@@ -175,25 +143,25 @@
          addToCacheInMemory(duplID, recordID);
       }
       else
-      {             
+      {
          if (persist)
          {
             storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);
-   
+
             tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
          }
-   
+
          // For a tx, it's important that the entry is not added to the cache until commit (or prepare)
          // since if the client fails then resends them tx we don't want it to get rejected
-         tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));      
+         tx.addOperation(new AddDuplicateIDOperation(duplID, recordID));
       }
    }
 
-   private void addToCacheInMemory(final SimpleString duplID, final long recordID) throws Exception
+   private void addToCacheInMemory(final byte[] duplID, final long recordID) throws Exception
    {
-      cache.add(duplID);
+      cache.add(new ByteArrayHolder(duplID));
 
-      Pair<SimpleString, Long> id;
+      Pair<ByteArrayHolder, Long> id;
 
       if (pos < ids.size())
       {
@@ -205,15 +173,15 @@
          // Record already exists - we delete the old one and add the new one
          // Note we can't use update since journal update doesn't let older records get
          // reclaimed
-         id.a = duplID;
-         
+         id.a = new ByteArrayHolder(duplID);
+
          storageManager.deleteDuplicateID(id.b);
 
          id.b = recordID;
       }
       else
       {
-         id = new Pair<SimpleString, Long>(duplID, recordID);
+         id = new Pair<ByteArrayHolder, Long>(new ByteArrayHolder(duplID), recordID);
 
          ids.add(id);
       }
@@ -225,14 +193,14 @@
    }
 
    private class AddDuplicateIDOperation implements TransactionOperation
-   {      
-      final SimpleString duplID;
+   {
+      final byte[] duplID;
 
       final long recordID;
 
       volatile boolean done;
 
-      AddDuplicateIDOperation(final SimpleString duplID, final long recordID)
+      AddDuplicateIDOperation(final byte[] duplID, final long recordID)
       {
          this.duplID = duplID;
 
@@ -248,7 +216,7 @@
             done = true;
          }
       }
-      
+
       public void beforeCommit(final Transaction tx) throws Exception
       {
       }
@@ -276,4 +244,56 @@
       }
 
    }
+   
+   private static final class ByteArrayHolder
+   {
+      ByteArrayHolder(final byte[] bytes)
+      {
+         this.bytes = bytes;
+      }
+
+      final byte[] bytes;
+
+      int hash;
+
+      public boolean equals(Object other)
+      {
+         if (other instanceof ByteArrayHolder)
+         {
+            ByteArrayHolder s = (ByteArrayHolder)other;
+
+            if (bytes.length != s.bytes.length)
+            {
+               return false;
+            }
+
+            for (int i = 0; i < bytes.length; i++)
+            {
+               if (bytes[i] != s.bytes[i])
+               {
+                  return false;
+               }
+            }
+
+            return true;
+         }
+         else
+         {
+            return false;
+         }
+      }
+
+      public int hashCode()
+      {
+         if (hash == 0)
+         {
+            for (int i = 0; i < bytes.length; i++)
+            {
+               hash = 31 * hash + bytes[i];
+            }
+         }
+
+         return hash;
+      }
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -52,6 +52,8 @@
    
    private final SimpleString name;
    
+   private int id;
+      
    public LocalQueueBinding(final SimpleString address, final Queue queue)
    {
       this.address = address;
@@ -62,6 +64,17 @@
       
       this.name = queue.getName();
    }
+   
+   public int getID()
+   {
+      return id;
+   }
+   
+   public void setID(final int id)
+   {
+      this.id = id;
+   }
+   
       
    public boolean filterMatches(final ServerMessage message) throws Exception
    {
@@ -125,6 +138,12 @@
       
       return false;
    }
+   
+   public void willRoute(final ServerMessage message)
+   {      
+   }
+   
+   
 
    public boolean isQueueBinding()
    {

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -114,7 +114,19 @@
    private final int idCacheSize;
 
    private final boolean persistIDCache;
-
+   
+   //Each queue has a transient ID which lasts the lifetime of its binding. This is used in clustering when routing messages to particular queues on nodes. We could
+   //use the queue name on the node to identify it. But sometimes we need to route to maybe 10s of thousands of queues on a particular node, and all would
+   //have to be specified in the message. Specify 10000 ints takes up a lot less space than 10000 arbitrary queue names
+   //The drawback of this approach is we only allow up to 2^32 queues in memory at any one time
+   private int transientIDSequence;
+   
+   private Set<Integer> transientIDs = new HashSet<Integer>();
+   
+   private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
+   
+   private final Object notificationLock = new Object();
+      
    public PostOfficeImpl(final StorageManager storageManager,
                          final PagingManager pagingManager,
                          final QueueFactory bindableFactory,
@@ -215,17 +227,13 @@
    // NotificationListener implementation -------------------------------------
    
    
-   private Map<SimpleString, QueueInfo> queueInfos = new HashMap<SimpleString, QueueInfo>();
-   
-   private final Object notificationLock = new Object();
-   
    public void onNotification(final Notification notification)
    {
       synchronized (notificationLock)
       {
          NotificationType type = notification.getType();
          
-         if (type == NotificationType.QUEUE_CREATED)
+         if (type == NotificationType.BINDING_ADDED)
          {
             TypedProperties props = notification.getProperties();
             
@@ -233,11 +241,13 @@
             
             SimpleString address = (SimpleString)props.getProperty(ManagementHelper.HDR_ADDRESS);
             
-            QueueInfo info = new QueueInfo(queueName, address);
+            Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
             
+            QueueInfo info = new QueueInfo(queueName, address, transientID);
+            
             queueInfos.put(queueName, info);
          }
-         else if (type == NotificationType.QUEUE_DESTROYED)
+         else if (type == NotificationType.BINDING_REMOVED)
          {
             TypedProperties props = notification.getProperties();
             
@@ -294,87 +304,8 @@
          }
       }
    }
-   
-   private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
-   {
-      ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
-      message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
-      
-      message.setDestination(queueName);
-      
-      message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));        
-      message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
-      
-      return message;
-   }
-   
-   public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
-   {
-      //We send direct to the queue so we can send it to the same queue that is bound to the notifications adress - this is crucial for ensuring
-      //that queue infos and notifications are received in a contiguous consistent stream
-      Binding binding = addressManager.getBinding(queueName);
-      
-      if (binding == null)
-      {
-         throw new IllegalStateException("Cannot find queue " + queueName);
-      }
-      
-      Queue queue = (Queue)binding.getBindable();
-      
-      //Need to lock to make sure all queue info and notifications are in the correct order with no gaps
-      synchronized (notificationLock)
-      {
-         //First send a reset message
-         
-         ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID()); 
-         message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
-         message.setDestination(queueName);
-         message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
-         
-         queue.preroute(message, null);            
-         queue.route(message, null);
-                  
-         for (QueueInfo info: queueInfos.values())
-         {
-            log.info("creatign queue created message");
-            message = createQueueInfoMessage(NotificationType.QUEUE_CREATED, queueName);
-            
-            message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
-            message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
-            
-            queue.preroute(message, null);            
-            queue.route(message, null);
-            
-            int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
-            
-            for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
-            {
-               message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
-               
-               message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName()); 
-               
-               queue.preroute(message, null);            
-               queue.route(message, null);
-            }
-            
-            if (info.getFilterStrings() != null)
-            {
-               for (SimpleString filterString: info.getFilterStrings())
-               {
-                  message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
-                  
-                  message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
-                  message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); 
-                  
-                  queue.preroute(message, null);            
-                  queue.route(message, null);
-               }
-            }           
-         }
-      }
-      
-   }
 
+
    // PostOffice implementation -----------------------------------------------
 
    public synchronized boolean addDestination(final SimpleString address, final boolean durable) throws Exception
@@ -429,21 +360,40 @@
    // even though failover is complete
    public synchronized void addBinding(final Binding binding) throws Exception
    {
+      binding.setID(generateTransientID());
+      
       addBindingInMemory(binding);
+           
+      TypedProperties props = new TypedProperties();
+      
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+      props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, binding.getRoutingName());
+      props.putIntProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
+      
+      managementService.sendNotification(new Notification(NotificationType.BINDING_ADDED, props));
    }
 
    public synchronized Binding removeBinding(final SimpleString uniqueName) throws Exception
    {
       Binding binding = removeBindingInMemory(uniqueName);
-
+      
       if (binding.isQueueBinding())
       {
          managementService.unregisterQueue(uniqueName, binding.getAddress());
       }
+                        
+      TypedProperties props = new TypedProperties();
+      
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+      props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, binding.getRoutingName());
 
+      managementService.sendNotification(new Notification(NotificationType.BINDING_REMOVED, props));
+      
+      releaseTransientID(binding.getID());
+
       return binding;
    }
-
+   
    public Bindings getBindingsForAddress(final SimpleString address)
    {
       Bindings bindings = addressManager.getBindings(address);
@@ -474,7 +424,7 @@
          }
       }
 
-      SimpleString duplicateID = (SimpleString)message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
+      byte[] duplicateID = (byte[])message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
 
       DuplicateIDCache cache = null;
 
@@ -620,8 +570,115 @@
       return cache;
    }
    
+   
+
+   public void sendQueueInfoToQueue(final SimpleString queueName) throws Exception
+   {
+      //We send direct to the queue so we can send it to the same queue that is bound to the notifications adress - this is crucial for ensuring
+      //that queue infos and notifications are received in a contiguous consistent stream
+      Binding binding = addressManager.getBinding(queueName);
+      
+      if (binding == null)
+      {
+         throw new IllegalStateException("Cannot find queue " + queueName);
+      }
+      
+      Queue queue = (Queue)binding.getBindable();
+      
+      //Need to lock to make sure all queue info and notifications are in the correct order with no gaps
+      synchronized (notificationLock)
+      {
+         //First send a reset message
+         
+         ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID()); 
+         message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+         message.setDestination(queueName);
+         message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
+         
+         queue.preroute(message, null);            
+         queue.route(message, null);
+                  
+         for (QueueInfo info: queueInfos.values())
+         {            
+            message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
+            
+            message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
+            message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+            message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
+            
+            queue.preroute(message, null);            
+            queue.route(message, null);
+            
+            int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
+            
+            for (int i = 0; i < info.getNumberOfConsumers() - consumersWithFilters; i++)
+            {
+               message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+               
+               message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName()); 
+               
+               queue.preroute(message, null);            
+               queue.route(message, null);
+            }
+            
+            if (info.getFilterStrings() != null)
+            {
+               for (SimpleString filterString: info.getFilterStrings())
+               {
+                  message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
+                  
+                  message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
+                  message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); 
+                  
+                  queue.preroute(message, null);            
+                  queue.route(message, null);
+               }
+            }           
+         }
+      }
+      
+   }
+   
    // Private -----------------------------------------------------------------
    
+   private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
+   {
+      ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
+      message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
+      
+      message.setDestination(queueName);
+      
+      message.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(type.toString()));        
+      message.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+      
+      return message;
+   }
+   
+   
+   private int generateTransientID()
+   {
+      int start = transientIDSequence;
+      do
+      {
+         int id = transientIDSequence++;
+         
+         if (!transientIDs.contains(id))
+         {
+            transientIDs.add(id);
+            
+            return id;
+         }
+      }
+      while (transientIDSequence != start);
+      
+      throw new IllegalStateException("Run out of queue ids!");
+   }
+   
+   private void releaseTransientID(final int id)
+   {
+      transientIDs.remove(id);
+   }
+   
    private final PageMessageOperation getPageOperation(final Transaction tx)
    {
       PageMessageOperation oper = (PageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_MESSAGES_OPERATION);
@@ -657,7 +714,7 @@
          }
 
          managementService.registerQueue(queue, binding.getAddress(), storageManager);
-      }
+      }            
    }
 
    private Binding removeBindingInMemory(final SimpleString bindingName) throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/WildcardAddressManager.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -21,16 +21,17 @@
  */
 package org.jboss.messaging.core.postoffice.impl;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.postoffice.Address;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.util.SimpleString;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * extends the simple manager to allow wildcard addresses to be used.
  *
@@ -72,7 +73,7 @@
                Bindings b = super.getBindings(destAdd.getAddress());
                if (b != null)
                {
-                  List<Binding> theBindings = b.getBindings();
+                  Collection<Binding> theBindings = b.getBindings();
                   for (Binding theBinding : theBindings)
                   {
                      super.addMappingInternal(address, theBinding);

Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -47,7 +47,7 @@
    long getPersistenceID();
 
    void setPersistenceID(long id);
-
+   
    Filter getFilter();
 
    boolean isDurable();

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -48,6 +48,8 @@
    int decrementRefCount();
 
    ServerMessage copy(long newID) throws Exception;
+   
+   ServerMessage copy() throws Exception;
 
    int getMemoryEstimate();
 
@@ -58,4 +60,6 @@
    boolean isStored();
 
    int getRefCount();
+   
+   //TODO - we might be able to put this in a better place
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -24,7 +24,9 @@
 
 import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
 
+import java.util.HashSet;
 import java.util.LinkedList;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -47,6 +49,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.NotificationType;
 import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.remoting.FailureListener;
 import org.jboss.messaging.core.remoting.RemotingConnection;
@@ -135,9 +138,13 @@
    private final int maxRetriesAfterFailover;
 
    private final MessageHandler queueInfoMessageHandler;
-   
+
    private final String queueDataAddress;
 
+   private final SimpleString idsHeaderName;
+
+   private final boolean forClusterConnector;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -161,7 +168,8 @@
                      final int maxRetriesAfterFailover,
                      final boolean useDuplicateDetection,
                      final MessageHandler queueInfoMessageHandler,
-                     final String queueDataAddress) throws Exception
+                     final String queueDataAddress,
+                     final boolean forClusterConnector) throws Exception
    {
       log.info("Creating new bridge " + name + " queue " + queue);
       this.name = name;
@@ -204,11 +212,15 @@
       this.maxRetriesAfterFailover = maxRetriesAfterFailover;
 
       this.queueInfoMessageHandler = queueInfoMessageHandler;
-      
+
       log.info("queue info handler " + this.queueInfoMessageHandler);
-      
+
       this.queueDataAddress = queueDataAddress;
 
+      this.forClusterConnector = forClusterConnector;
+
+      this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
+
       if (maxBatchTime != -1)
       {
          future = scheduledExecutor.scheduleAtFixedRate(new BatchTimeout(),
@@ -258,28 +270,34 @@
 
             session.addFailureListener(BridgeImpl.this);
 
+            // TODO - we should move this code to the ClusterConnectorImpl - and just execute it when the bridge
+            // connection is opened and closed - we can use
+            // a callback to tell us that
             if (queueInfoMessageHandler != null)
             {
                // Get the queue data
 
-               SimpleString notifQueueName = new SimpleString("notif-").concat(UUIDGenerator.getInstance().generateSimpleStringUUID());
+               SimpleString notifQueueName = new SimpleString("notif-").concat(UUIDGenerator.getInstance()
+                                                                                            .generateSimpleStringUUID());
 
-               SimpleString filter = new SimpleString(                                                                                                         
-                                                      ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
+               SimpleString filter = new SimpleString(ManagementHelper.HDR_NOTIFICATION_TYPE + " IN (" +
                                                       "'" +
-                                                      NotificationType.QUEUE_CREATED +
+                                                      NotificationType.BINDING_ADDED +
                                                       "'," +
                                                       "'" +
-                                                      NotificationType.QUEUE_DESTROYED +
+                                                      NotificationType.BINDING_REMOVED +
                                                       "'," +
                                                       "'" +
                                                       NotificationType.CONSUMER_CREATED +
                                                       "'," +
                                                       "'" +
                                                       NotificationType.CONSUMER_CLOSED +
-                                                      "') AND " +                                                     
-                                                      ManagementHelper.HDR_ADDRESS + " LIKE '" + queueDataAddress + "%'");
-               
+                                                      "') AND " +
+                                                      ManagementHelper.HDR_ADDRESS +
+                                                      " LIKE '" +
+                                                      queueDataAddress +
+                                                      "%'");
+
                session.createQueue(DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS, notifQueueName, filter, false, true);
 
                ClientConsumer notifConsumer = session.createConsumer(notifQueueName);
@@ -294,12 +312,12 @@
                                                        ManagementServiceImpl.getMessagingServerObjectName(),
                                                        "sendQueueInfoToQueue",
                                                        notifQueueName.toString());
-               
+
                ClientProducer prod = session.createProducer(ConfigurationImpl.DEFAULT_MANAGEMENT_ADDRESS);
-               
+
                prod.send(message);
             }
-            
+
             log.info("Created objects");
 
             active = true;
@@ -323,7 +341,7 @@
       {
          return;
       }
-      
+
       started = false;
 
       active = false;
@@ -476,10 +494,10 @@
          {
             return;
          }
-         
+
          log.info("sending batch");
 
-         // TODO - if batch size = 1 then don't need tx
+         // TODO - if batch size = 1 then don't need tx - actually we should use asynch send acknowledgement stream - then we don't need a transaction at all
 
          while (true)
          {
@@ -493,6 +511,32 @@
             ref.getQueue().acknowledge(tx, ref);
 
             ServerMessage message = ref.getMessage();
+            
+            if (this.forClusterConnector)
+            {
+               //We make a shallow copy of the message, then we strip out the unwanted routing id headers and leave only
+               //the one pertinent for the destination node - this is important since different queues on different nodes could have same queue ids
+               //Note we must copy since same message may get routed to other nodes which require different headers
+               message = message.copy();
+               
+               //TODO - we can optimise this
+              
+               Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
+               
+               byte[] queueIds = (byte[])message.getProperty(idsHeaderName);
+               
+               for (SimpleString propName: propNames)
+               {
+                  if (propName.startsWith(MessageImpl.HDR_ROUTE_TO_IDS))
+                  {
+                     message.removeProperty(propName);
+                  }
+               }
+               
+               message.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
+               
+               message.putBooleanProperty(MessageImpl.HDR_FROM_CLUSTER, Boolean.TRUE);
+            }
 
             if (transformer != null)
             {

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -310,9 +310,10 @@
                                            bridgeConfig.getRetryIntervalMultiplier(),
                                            bridgeConfig.getMaxRetriesBeforeFailover(),
                                            bridgeConfig.getMaxRetriesAfterFailover(),
-                                           false,
+                                           false, // Duplicate detection is handled in the RemoteQueueBindingImpl
                                            record,
-                                           address.toString());
+                                           address.toString(),
+                                           true);
 
             record.setBridge(bridge);
 
@@ -438,31 +439,34 @@
 
             log.info("Got notification message " + type);
 
-            if (type == NotificationType.QUEUE_CREATED)
+            if (type == NotificationType.BINDING_ADDED)
             {
                log.info("queue created");
-               SimpleString uniqueName = new SimpleString("flow-").concat(UUIDGenerator.getInstance()
-                                                                                       .generateSimpleStringUUID());
+               SimpleString uniqueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
 
                SimpleString queueAddress = (SimpleString)message.getProperty(ManagementHelper.HDR_ADDRESS);
 
                SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);
 
                SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
+               
+               Integer queueID = (Integer)message.getProperty(ManagementHelper.HDR_BINDING_ID);
 
                RemoteQueueBinding binding = new RemoteQueueBindingImpl(queueAddress,
                                                                        uniqueName,
                                                                        queueName,
+                                                                       queueID,
                                                                        filterString,
                                                                        queue,
                                                                        useDuplicateDetection,
-                                                                       forwardWhenNoMatchingConsumers);
+                                                                       forwardWhenNoMatchingConsumers,
+                                                                       bridge.getName());
 
                bindings.put(queueName, binding);
 
                postOffice.addBinding(binding);
             }
-            else if (type == NotificationType.QUEUE_DESTROYED)
+            else if (type == NotificationType.BINDING_REMOVED)
             {
                log.info("queue destroyed");
                SimpleString queueName = (SimpleString)message.getProperty(ManagementHelper.HDR_QUEUE_NAME);

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -399,7 +399,8 @@
                                  config.getMaxRetriesAfterFailover(),
                                  config.isUseDuplicateDetection(),
                                  null,
-                                 null);
+                                 null,
+                                 false);
 
          bridges.put(config.getName(), bridge);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.server.cluster.impl;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -30,11 +31,13 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.server.Bindable;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
 import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.util.UUIDGenerator;
 
 /**
  * A RemoteQueueBindingImpl
@@ -56,6 +59,8 @@
    private final SimpleString uniqueName;
 
    private final SimpleString routingName;
+   
+   private final int remoteQueueID;
 
    private final Filter queueFilter;
 
@@ -68,14 +73,20 @@
    private final boolean duplicateDetection;
    
    private final boolean forwardWhenNoMatchingConsumers;
-
+   
+   private final SimpleString idsHeaderName;
+   
+   private int id;
+   
    public RemoteQueueBindingImpl(final SimpleString address,
                                  final SimpleString uniqueName,
                                  final SimpleString routingName,
+                                 final int remoteQueueID,
                                  final SimpleString filterString,
                                  final Queue storeAndForwardQueue,
                                  final boolean duplicateDetection,
-                                 final boolean forwardWhenNoMatchingConsumers) throws Exception
+                                 final boolean forwardWhenNoMatchingConsumers,
+                                 final SimpleString bridgeName) throws Exception
    {
       this.address = address;
 
@@ -84,6 +95,8 @@
       this.uniqueName = uniqueName;
 
       this.routingName = routingName;
+      
+      this.remoteQueueID = remoteQueueID;
 
       this.duplicateDetection = duplicateDetection;
 
@@ -97,8 +110,20 @@
       }
       
       this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+      
+      this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName);
    }
-
+   
+   public int getID()
+   {
+      return id;
+   }
+   
+   public void setID(final int id)
+   {
+      this.id = id;
+   }
+   
    public SimpleString getAddress()
    {
       return address;
@@ -170,6 +195,43 @@
 
       return false;
    }
+   
+   public void willRoute(final ServerMessage message)
+   {      
+      //We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
+      
+      //TODO - this can be optimised
+      
+      byte[] ids = (byte[])message.getProperty(idsHeaderName);
+      
+      if (ids == null)
+      {
+         ids = new byte[4];
+      }
+      else
+      {
+         byte[] newIds = new byte[ids.length + 4];
+         
+         System.arraycopy(ids, 0, newIds, 4, ids.length);
+                          
+         ids = newIds;
+      }
+      
+      ByteBuffer buff = ByteBuffer.wrap(ids);
+      
+      buff.putInt(remoteQueueID);
+      
+      message.putBytesProperty(idsHeaderName, ids);           
+      
+      //Now add a duplicate detection header, if required.
+      //We use a GUID for this
+      if (duplicateDetection)
+      {
+         byte[] guid = UUIDGenerator.getInstance().generateUUID().asBytes();
+         
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, guid);
+      }
+   }
 
    public synchronized void addConsumer(final SimpleString filterString) throws Exception
    {

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -296,7 +296,7 @@
          postOffice.addBinding(binding);         
       }
 
-      Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
+      Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
 
 
       storageManager.loadMessageJournal(postOffice,
@@ -306,7 +306,7 @@
                                         resourceManager,
                                         duplicateIDMap);
 
-      for (Map.Entry<SimpleString, List<Pair<SimpleString, Long>>> entry : duplicateIDMap.entrySet())
+      for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
       {
          SimpleString address = entry.getKey();
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueFactoryImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -51,7 +51,7 @@
    private PostOffice postOffice;
 
    private final StorageManager storageManager;
-
+      
    public QueueFactoryImpl(final ScheduledExecutorService scheduledExecutor,
                            final HierarchicalRepository<QueueSettings> queueSettingsRepository,
                            final StorageManager storageManager)
@@ -75,7 +75,7 @@
                             final boolean temporary)
    {
       QueueSettings queueSettings = queueSettingsRepository.getMatch(name.toString());
-
+      
       Queue queue = new QueueImpl(persistenceID,
                                   name,
                                   filter,                                  

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -73,7 +73,7 @@
    public static final int NUM_PRIORITIES = 10;
 
    private volatile long persistenceID = -1;
-
+   
    private final SimpleString name;
 
    private volatile Filter filter;
@@ -116,9 +116,7 @@
 
    private int consumersToFailover = -1;
 
-  // private final SimpleString routeToPropertyName;
-
-   public QueueImpl(final long persistenceID,
+   public QueueImpl(final long persistenceID,                   
                     final SimpleString name,
                     final Filter filter,
                     final boolean durable,
@@ -129,7 +127,7 @@
                     final HierarchicalRepository<QueueSettings> queueSettingsRepository)
    {
       this.persistenceID = persistenceID;
-
+      
       this.name = name;
 
       this.filter = filter;
@@ -329,7 +327,7 @@
    {
       persistenceID = id;
    }
-
+   
    public Filter getFilter()
    {
       return filter;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -160,6 +160,13 @@
       
       return m;
    }
+   
+   public ServerMessage copy() throws Exception
+   {
+      ServerMessage m = new ServerMessageImpl(this);
+       
+      return m;
+   }
 
    @Override
    public String toString()

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -1498,7 +1498,7 @@
          {
             storageManager.addQueueBinding(binding);                        
          }
-
+ 
          postOffice.addBinding(binding);
          
          if (temporary)

Modified: trunk/src/main/org/jboss/messaging/util/SimpleString.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/SimpleString.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/src/main/org/jboss/messaging/util/SimpleString.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.util;
 
@@ -30,7 +30,6 @@
 
 import org.jboss.messaging.core.logging.Logger;
 
-
 /**
  * 
  * A SimpleString
@@ -44,220 +43,219 @@
 public class SimpleString implements CharSequence, Serializable, Comparable<SimpleString>
 {
    private static final long serialVersionUID = 4204223851422244307L;
-   
+
    private static final Logger log = Logger.getLogger(SimpleString.class);
-   
 
    // Attributes
-	// ------------------------------------------------------------------------
-	private final byte[] data;
-	
-	private transient int hash;
-	
-	//Cache the string
-	private transient String str;
-	
+   // ------------------------------------------------------------------------
+   private final byte[] data;
+
+   private transient int hash;
+
+   // Cache the string
+   private transient String str;
+
    // Static
-	// ----------------------------------------------------------------------
-	
-	/**
-	 * Returns a SimpleString constructed from the <code>string</code> parameter.
-	 * If <code>string</code> is <code>null</code>, the return value will be <code>null</code> too.
-	 */
-	public static SimpleString toSimpleString(final String string)
-	{
-	   if (string == null)
-	   {
-	      return null;
-	   }
-	   return new SimpleString(string);
-	}
+   // ----------------------------------------------------------------------
 
-	// Constructors
-	// ----------------------------------------------------------------------
-		
-	public SimpleString(final String string)
-	{
-		int len = string.length();
-		
-		data = new byte[len << 1];
-		
-		int j = 0;
-		
-		for (int i = 0; i < len; i++)
-		{
-			char c = string.charAt(i);
-			
-			byte low = (byte)(c & 0xFF);  // low byte
-			
-			data[j++] = low;
-			
-			byte high = (byte)(c >> 8 & 0xFF);  // high byte
-			
-			data[j++] = high;
-		}
-		
-		str = string;
-	}
-	
-	public SimpleString(final byte[] data)
-	{
-		this.data = data;
-	}
-	
-	// CharSequence implementation
-	// ---------------------------------------------------------------------------
-	
-	public int length()
-	{
-		return data.length >> 1;
-	}
-	
-	public char charAt(int pos)
-	{
-		if (pos < 0 || pos >= data.length >> 1)
-		{
-			throw new IndexOutOfBoundsException();
-		}
-		pos <<= 1;
-		
-		return (char)(data[pos] | data[pos + 1] << 8);
-	}
-	
-	public CharSequence subSequence(final int start, final int end)
-	{
-		int len = data.length >> 1;
+   /**
+    * Returns a SimpleString constructed from the <code>string</code> parameter.
+    * If <code>string</code> is <code>null</code>, the return value will be <code>null</code> too.
+    */
+   public static SimpleString toSimpleString(final String string)
+   {
+      if (string == null)
+      {
+         return null;
+      }
+      return new SimpleString(string);
+   }
 
-		if (end < start || start < 0 || end > len)
-		{
-			throw new IndexOutOfBoundsException();
-		}
-		else
-		{
-			int newlen = (end - start) << 1;
-			byte[] bytes = new byte[newlen];
-			
-			System.arraycopy(data, start << 1, bytes, 0, newlen);
-			
-			return new SimpleString(bytes);
-		}
-	}
-	
-	// Comparable implementation -------------------------------------
-	
+   // Constructors
+   // ----------------------------------------------------------------------
+
+   public SimpleString(final String string)
+   {
+      int len = string.length();
+
+      data = new byte[len << 1];
+
+      int j = 0;
+
+      for (int i = 0; i < len; i++)
+      {
+         char c = string.charAt(i);
+
+         byte low = (byte)(c & 0xFF); // low byte
+
+         data[j++] = low;
+
+         byte high = (byte)(c >> 8 & 0xFF); // high byte
+
+         data[j++] = high;
+      }
+
+      str = string;
+   }
+
+   public SimpleString(final byte[] data)
+   {
+      this.data = data;
+   }
+
+   // CharSequence implementation
+   // ---------------------------------------------------------------------------
+
+   public int length()
+   {
+      return data.length >> 1;
+   }
+
+   public char charAt(int pos)
+   {
+      if (pos < 0 || pos >= data.length >> 1)
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      pos <<= 1;
+
+      return (char)(data[pos] | data[pos + 1] << 8);
+   }
+
+   public CharSequence subSequence(final int start, final int end)
+   {
+      int len = data.length >> 1;
+
+      if (end < start || start < 0 || end > len)
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      else
+      {
+         int newlen = (end - start) << 1;
+         byte[] bytes = new byte[newlen];
+
+         System.arraycopy(data, start << 1, bytes, 0, newlen);
+
+         return new SimpleString(bytes);
+      }
+   }
+
+   // Comparable implementation -------------------------------------
+
    public int compareTo(SimpleString o)
    {
       return toString().compareTo(o.toString());
    }
-	
-	// Public
-	// ---------------------------------------------------------------------------
-	
-	public byte[] getData()
-	{
-		return data;
-	}
-		
-	public boolean startsWith(final SimpleString other)
-	{
-		byte[] otherdata = other.data;
-		
-		if (otherdata.length > this.data.length)
-		{
-			return false;
-		}
-		
-		for (int i = 0; i < otherdata.length; i++)
-		{
-			if (this.data[i] != otherdata[i])
-			{
-				return false;
-			}
-		}
-		
-		return true;
-	}
-		
-	public String toString()
-	{
-		if (str == null)
-		{
-   		int len = data.length >> 1;
-   		
-   		char[] chars = new char[len];
-   		
-   		int j = 0;
-   		
-   		for (int i = 0; i < len; i++)
-   		{
-   		   int low = data[j++] & 0xFF;
-   		   
-   		   int high = (data[j++] << 8) & 0xFF00 ;
-   		   
-   			chars[i] = (char)(low | high);
-   		}
-   		
-   		str =  new String(chars);
-		}
-		
-		return str;
-	}
-	
-	public boolean equals(Object other)
-	{		
-		if (other instanceof SimpleString)
-		{
-   		SimpleString s = (SimpleString)other;
-   		
-   		if (data.length != s.data.length)
-   		{
-   			return false;
-   		}
-   		
-   		for (int i = 0; i < data.length; i++)
-   		{
-   			if (data[i] != s.data[i])
-   			{
-   				return false;
-   			}
-   		}
-   		
-   		return true;
-		}
-		else
-		{
-			return false;
-		}
-	}
-	
-	public int hashCode()
-	{
-		if (hash == 0)
-		{
-			for (int i = 0; i < data.length; i++)
-			{
+
+   // Public
+   // ---------------------------------------------------------------------------
+
+   public byte[] getData()
+   {
+      return data;
+   }
+
+   public boolean startsWith(final SimpleString other)
+   {
+      byte[] otherdata = other.data;
+
+      if (otherdata.length > this.data.length)
+      {
+         return false;
+      }
+
+      for (int i = 0; i < otherdata.length; i++)
+      {
+         if (this.data[i] != otherdata[i])
+         {
+            return false;
+         }
+      }
+
+      return true;
+   }
+
+   public String toString()
+   {
+      if (str == null)
+      {
+         int len = data.length >> 1;
+
+         char[] chars = new char[len];
+
+         int j = 0;
+
+         for (int i = 0; i < len; i++)
+         {
+            int low = data[j++] & 0xFF;
+
+            int high = (data[j++] << 8) & 0xFF00;
+
+            chars[i] = (char)(low | high);
+         }
+
+         str = new String(chars);
+      }
+
+      return str;
+   }
+
+   public boolean equals(Object other)
+   {
+      if (other instanceof SimpleString)
+      {
+         SimpleString s = (SimpleString)other;
+
+         if (data.length != s.data.length)
+         {
+            return false;
+         }
+
+         for (int i = 0; i < data.length; i++)
+         {
+            if (data[i] != s.data[i])
+            {
+               return false;
+            }
+         }
+
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
+
+   public int hashCode()
+   {
+      if (hash == 0)
+      {
+         for (int i = 0; i < data.length; i++)
+         {
             hash = 31 * hash + data[i];
-        }
-		}
-		
-		return hash;
-	}
+         }
+      }
 
+      return hash;
+   }
+
    public SimpleString[] split(char delim)
    {
-      if(!contains(delim))
+      if (!contains(delim))
       {
-         return new SimpleString[]{this};
+         return new SimpleString[] { this };
       }
       else
       {
          List<SimpleString> all = new ArrayList<SimpleString>();
          int lasPos = 0;
-         for (int i = 0; i < data.length; i+=2)
+         for (int i = 0; i < data.length; i += 2)
          {
-            byte low = (byte)(delim & 0xFF);  // low byte
-            byte high = (byte)(delim >> 8 & 0xFF);  // high byte
-            if (data[i] == low && data[i+1] == high)
+            byte low = (byte)(delim & 0xFF); // low byte
+            byte high = (byte)(delim >> 8 & 0xFF); // high byte
+            if (data[i] == low && data[i + 1] == high)
             {
                byte[] bytes = new byte[i - lasPos];
                System.arraycopy(data, lasPos, bytes, 0, bytes.length);
@@ -273,14 +271,13 @@
       }
    }
 
-
    public boolean contains(char c)
    {
-      for (int i = 0; i < data.length; i+=2)
+      for (int i = 0; i < data.length; i += 2)
       {
-         byte low = (byte)(c & 0xFF);  // low byte
-			byte high = (byte)(c >> 8 & 0xFF);  // high byte
-         if (data[i] == low && data[i+1] == high)
+         byte low = (byte)(c & 0xFF); // low byte
+         byte high = (byte)(c >> 8 & 0xFF); // high byte
+         if (data[i] == low && data[i + 1] == high)
          {
             return true;
          }
@@ -288,7 +285,7 @@
       return false;
    }
 
-   public SimpleString concat(SimpleString toAdd)
+   public SimpleString concat(final SimpleString toAdd)
    {
       byte[] bytes = new byte[data.length + toAdd.getData().length];
       System.arraycopy(data, 0, bytes, 0, data.length);
@@ -296,20 +293,20 @@
       return new SimpleString(bytes);
    }
 
-   public SimpleString concat(char c)
+   public SimpleString concat(final char c)
    {
       byte[] bytes = new byte[data.length + 2];
       System.arraycopy(data, 0, bytes, 0, data.length);
       bytes[data.length] = (byte)(c & 0xFF);
-      bytes[data.length+1] = (byte)(c >> 8 & 0xFF);
+      bytes[data.length + 1] = (byte)(c >> 8 & 0xFF);
       return new SimpleString(bytes);
    }
 
    public static int sizeofString(final SimpleString str)
-	{
-		return SIZE_INT + str.data.length;
-	}
-   
+   {
+      return SIZE_INT + str.data.length;
+   }
+
    public static int sizeofNullableString(final SimpleString str)
    {
       if (str == null)
@@ -322,28 +319,31 @@
       }
    }
 
-   public void getChars(int srcBegin, int srcEnd, char dst[], int dstBegin) {
-      if (srcBegin < 0) {
-          throw new StringIndexOutOfBoundsException(srcBegin);
+   public void getChars(int srcBegin, int srcEnd, char dst[], int dstBegin)
+   {
+      if (srcBegin < 0)
+      {
+         throw new StringIndexOutOfBoundsException(srcBegin);
       }
-      if (srcEnd > length()) {
-          throw new StringIndexOutOfBoundsException(srcEnd);
+      if (srcEnd > length())
+      {
+         throw new StringIndexOutOfBoundsException(srcEnd);
       }
-      if (srcBegin > srcEnd) {
-          throw new StringIndexOutOfBoundsException(srcEnd - srcBegin);
+      if (srcBegin > srcEnd)
+      {
+         throw new StringIndexOutOfBoundsException(srcEnd - srcBegin);
       }
-      
+
       int j = 0;
 
-      for (int i = srcBegin; i <  srcEnd - srcBegin; i++)
+      for (int i = srcBegin; i < srcEnd - srcBegin; i++)
       {
          int low = data[j++] & 0xFF;
-         
-         int high = (data[j++] << 8) & 0xFF00 ;
-         
+
+         int high = (data[j++] << 8) & 0xFF00;
+
          dst[i] = (char)(low | high);
       }
-  }
+   }
 
-
 }
\ No newline at end of file

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/DuplicateDetectionTest.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -85,19 +85,19 @@
 
       message = createMessage(session, 1);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       message2 = consumer.receive(1000);
       assertEquals(1, message2.getProperty(propKey));
 
       message = createMessage(session, 2);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       message2 = consumer.receive(250);
       assertNull(message2);
 
       message = createMessage(session, 3);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       message2 = consumer.receive(250);
       assertNull(message2);
@@ -106,19 +106,19 @@
 
       message = createMessage(session, 4);
       SimpleString dupID2 = new SimpleString("hijklmnop");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       message2 = consumer.receive(1000);
       assertEquals(4, message2.getProperty(propKey));
 
       message = createMessage(session, 5);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       message2 = consumer.receive(1000);
       assertNull(message2);
 
       message = createMessage(session, 6);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       message2 = consumer.receive(250);
       assertNull(message2);
@@ -163,7 +163,7 @@
 
          ClientMessage message = createMessage(session, i);
 
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
 
          producer1.send(message);
          producer2.send(message);
@@ -183,8 +183,6 @@
          assertEquals(i, message.getProperty(propKey));
       }
 
-      DuplicateIDCacheImpl.dumpCaches();
-
       log.info("Now sending more");
       for (int i = 0; i < cacheSize; i++)
       {
@@ -192,7 +190,7 @@
 
          ClientMessage message = createMessage(session, i);
 
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
 
          producer1.send(message);
          producer2.send(message);
@@ -212,7 +210,7 @@
 
          message = createMessage(session, i);
 
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
 
          producer1.send(message);
          producer2.send(message);
@@ -238,7 +236,7 @@
 
          message = createMessage(session, i);
 
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
 
          producer1.send(message);
          producer2.send(message);
@@ -260,7 +258,7 @@
 
          message = createMessage(session, i);
 
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
 
          producer1.send(message);
          producer2.send(message);
@@ -301,7 +299,7 @@
 
       ClientMessage message = createMessage(session, 0);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
 
       session.close();
@@ -317,7 +315,7 @@
       // Should be able to resend it and not get rejected since transaction didn't commit
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
 
       session.commit();
@@ -351,7 +349,7 @@
 
       ClientMessage message = createMessage(session, 0);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
 
       session.rollback();
@@ -359,7 +357,7 @@
       // Should be able to resend it and not get rejected since transaction didn't commit
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
 
       session.commit();
@@ -393,12 +391,12 @@
 
       ClientMessage message = createMessage(session, 0);
       SimpleString dupID1 = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1.getData());
       producer.send(message);
 
       message = createMessage(session, 1);
       SimpleString dupID2 = new SimpleString("hijklmno");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
 
       session.commit();
@@ -406,11 +404,11 @@
       // These next two should get rejected
 
       message = createMessage(session, 2);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID1.getData());
       producer.send(message);
 
       message = createMessage(session, 3);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
 
       session.commit();
@@ -449,7 +447,7 @@
 
       ClientMessage message = createMessage(session, 0);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
             
       session.commit();
@@ -463,7 +461,7 @@
       producer = session.createProducer(queueName);
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       
       message = createMessage(session, 2);
@@ -510,7 +508,7 @@
 
       ClientMessage message = createMessage(session, 0);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       
       session.end(xid, XAResource.TMSUCCESS);
@@ -532,7 +530,7 @@
       // Should be able to resend it and not get rejected since transaction didn't commit
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       
       session.end(xid2, XAResource.TMSUCCESS);
@@ -585,7 +583,7 @@
 
       ClientMessage message = createMessage(session, 0);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       
       session.end(xid, XAResource.TMSUCCESS);
@@ -609,7 +607,7 @@
       // Should be able to resend it and not get rejected since transaction didn't commit
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       
       session.end(xid2, XAResource.TMSUCCESS);
@@ -662,7 +660,7 @@
 
       ClientMessage message = createMessage(session, 0);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       
       session.end(xid, XAResource.TMSUCCESS);
@@ -686,7 +684,7 @@
       // Should NOT be able to resend it 
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       
       session.end(xid2, XAResource.TMSUCCESS);
@@ -738,7 +736,7 @@
 
       ClientMessage message = createMessage(session, 0);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       
       session.end(xid, XAResource.TMSUCCESS);
@@ -764,7 +762,7 @@
       // Should NOT be able to resend it 
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       
       session.end(xid2, XAResource.TMSUCCESS);
@@ -833,14 +831,14 @@
 
       ClientMessage message = createMessage(session, 1);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       ClientMessage message2 = consumer.receive(1000);
       assertEquals(1, message2.getProperty(propKey));
       
       message = createMessage(session, 2);
       SimpleString dupID2 = new SimpleString("hijklmnopqr");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       message2 = consumer.receive(1000);
       assertEquals(2, message2.getProperty(propKey));
@@ -868,13 +866,13 @@
       consumer = session.createConsumer(queueName);
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       message2 = consumer.receive(200);
       assertNull(message2);
       
       message = createMessage(session, 2);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       message2 = consumer.receive(200);
       assertNull(message2);
@@ -918,7 +916,7 @@
       {
          ClientMessage message = createMessage(session, i);
          SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
          producer.send(message);
          ClientMessage message2 = consumer.receive(1000);
          assertEquals(i, message2.getProperty(propKey));
@@ -950,7 +948,7 @@
       {
          ClientMessage message = createMessage(session, i);
          SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
          producer.send(message);
          ClientMessage message2 = consumer.receive(100);
          assertNull(message2);
@@ -996,7 +994,7 @@
       {
          ClientMessage message = createMessage(session, i);
          SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
          producer.send(message);
          ClientMessage message2 = consumer.receive(1000);
          assertEquals(i, message2.getProperty(propKey));
@@ -1030,7 +1028,7 @@
       {
          ClientMessage message = createMessage(session, i);
          SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
          producer.send(message);
          if (i >= subsequentCacheSize)
          {
@@ -1085,7 +1083,7 @@
       {
          ClientMessage message = createMessage(session, i);
          SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
          producer.send(message);
          ClientMessage message2 = consumer.receive(1000);
          assertEquals(i, message2.getProperty(propKey));
@@ -1130,7 +1128,7 @@
       {
          ClientMessage message = createMessage(session, i);
          SimpleString dupID = new SimpleString("abcdefg" + i);
-         message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+         message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
          producer.send(message);
          if (i >= subsequentCacheSize)
          {
@@ -1182,14 +1180,14 @@
 
       ClientMessage message = createMessage(session, 1);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       ClientMessage message2 = consumer.receive(1000);
       assertEquals(1, message2.getProperty(propKey));
       
       message = createMessage(session, 2);
       SimpleString dupID2 = new SimpleString("hijklmnopqr");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       message2 = consumer.receive(1000);
       assertEquals(2, message2.getProperty(propKey));
@@ -1217,13 +1215,13 @@
       consumer = session.createConsumer(queueName);
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       message2 = consumer.receive(200);
       assertEquals(1, message2.getProperty(propKey));
       
       message = createMessage(session, 2);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       message2 = consumer.receive(200);
       assertEquals(2, message2.getProperty(propKey));
@@ -1265,7 +1263,7 @@
 
       ClientMessage message = createMessage(session, 1);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       session.commit();
       ClientMessage message2 = consumer.receive(1000);
@@ -1273,7 +1271,7 @@
       
       message = createMessage(session, 2);
       SimpleString dupID2 = new SimpleString("hijklmnopqr");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       session.commit();
       message2 = consumer.receive(1000);
@@ -1302,14 +1300,14 @@
       consumer = session.createConsumer(queueName);
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       session.commit();
       message2 = consumer.receive(200);
       assertEquals(1, message2.getProperty(propKey));
       
       message = createMessage(session, 2);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       session.commit();
       message2 = consumer.receive(200);
@@ -1350,7 +1348,7 @@
 
       ClientMessage message = createMessage(session, 1);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       session.commit();
       ClientMessage message2 = consumer.receive(1000);
@@ -1360,7 +1358,7 @@
       
       message = createMessage(session, 2);
       SimpleString dupID2 = new SimpleString("hijklmnopqr");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       session.commit();
       message2 = consumer.receive(1000);
@@ -1391,14 +1389,14 @@
       consumer = session.createConsumer(queueName);
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
       session.commit();
       message2 = consumer.receive(200);
       assertNull(message2);
       
       message = createMessage(session, 2);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       session.commit();
       message2 = consumer.receive(200);
@@ -1445,12 +1443,12 @@
 
       ClientMessage message = createMessage(session, 1);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
                 
       message = createMessage(session, 2);
       SimpleString dupID2 = new SimpleString("hijklmnopqr");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       
       session.end(xid, XAResource.TMSUCCESS);
@@ -1484,11 +1482,11 @@
       consumer = session.createConsumer(queueName);
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
             
       message = createMessage(session, 2);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       
       session.end(xid2, XAResource.TMSUCCESS);
@@ -1544,12 +1542,12 @@
 
       ClientMessage message = createMessage(session, 1);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
                 
       message = createMessage(session, 2);
       SimpleString dupID2 = new SimpleString("hijklmnopqr");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       
       session.end(xid, XAResource.TMSUCCESS);
@@ -1581,11 +1579,11 @@
       consumer = session.createConsumer(queueName);
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
             
       message = createMessage(session, 2);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       
       session.end(xid2, XAResource.TMSUCCESS);
@@ -1641,12 +1639,12 @@
 
       ClientMessage message = createMessage(session, 1);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
                 
       message = createMessage(session, 2);
       SimpleString dupID2 = new SimpleString("hijklmnopqr");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       
       session.end(xid, XAResource.TMSUCCESS);
@@ -1680,11 +1678,11 @@
       consumer = session.createConsumer(queueName);
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
             
       message = createMessage(session, 2);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       
       session.end(xid2, XAResource.TMSUCCESS);
@@ -1740,12 +1738,12 @@
 
       ClientMessage message = createMessage(session, 1);
       SimpleString dupID = new SimpleString("abcdefg");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
                 
       message = createMessage(session, 2);
       SimpleString dupID2 = new SimpleString("hijklmnopqr");
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       
       session.end(xid, XAResource.TMSUCCESS);
@@ -1778,11 +1776,11 @@
       consumer = session.createConsumer(queueName);
 
       message = createMessage(session, 1);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID.getData());
       producer.send(message);
             
       message = createMessage(session, 2);
-      message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2);
+      message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, dupID2.getData());
       producer.send(message);
       
       session.end(xid2, XAResource.TMSUCCESS);

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -35,6 +35,24 @@
 public class FakeBinding implements Binding
 {
    
+   public int getID()
+   {
+      // TODO Auto-generated method stub
+      return 0;
+   }
+
+   public void setID(int id)
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   public void willRoute(ServerMessage message)
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
    public boolean filterMatches(ServerMessage message) throws Exception
    {
       // TODO Auto-generated method stub

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java	2009-01-29 16:47:21 UTC (rev 5759)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java	2009-01-29 18:24:44 UTC (rev 5760)
@@ -250,53 +250,53 @@
       verify(mbeanServer);
    }
 
-   public void testRegisterQueue() throws Exception
-   {
-      SimpleString address = randomSimpleString();
-      SimpleString name = randomSimpleString();
-      ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
-      ObjectInstance objectInstance = new ObjectInstance(objectName, QueueControl.class.getName());
+//   public void testRegisterQueue() throws Exception
+//   {
+//      SimpleString address = randomSimpleString();
+//      SimpleString name = randomSimpleString();
+//      ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
+//      ObjectInstance objectInstance = new ObjectInstance(objectName, QueueControl.class.getName());
+//
+//      MBeanServer mbeanServer = createMock(MBeanServer.class);
+//      Queue queue = createMock(Queue.class);
+//      expect(queue.getName()).andStubReturn(name);
+//      expect(queue.isDurable()).andReturn(true);
+//      StorageManager storageManager = createMock(StorageManager.class);
+//      expect(mbeanServer.isRegistered(objectName)).andReturn(false);
+//      expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
+//
+//      replay(mbeanServer, queue, storageManager);
+//
+//      ManagementService service = new ManagementServiceImpl(mbeanServer, true);
+//      service.registerQueue(queue, address, storageManager);
+//
+//      verify(mbeanServer, queue, storageManager);
+//   }
+//
+//   public void testRegisterAlreadyRegisteredQueue() throws Exception
+//   {
+//      SimpleString address = randomSimpleString();
+//      SimpleString name = randomSimpleString();
+//      ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
+//      ObjectInstance objectInstance = new ObjectInstance(objectName, QueueControl.class.getName());
+//
+//      MBeanServer mbeanServer = createMock(MBeanServer.class);
+//      Queue queue = createMock(Queue.class);
+//      expect(queue.getName()).andStubReturn(name);
+//      expect(queue.isDurable()).andReturn(true);
+//      StorageManager storageManager = createMock(StorageManager.class);
+//      expect(mbeanServer.isRegistered(objectName)).andReturn(true);
+//      mbeanServer.unregisterMBean(objectName);
+//      expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
+//
+//      replay(mbeanServer, queue, storageManager);
+//
+//      ManagementService service = new ManagementServiceImpl(mbeanServer, true);
+//      service.registerQueue(queue, address, storageManager);
+//
+//      verify(mbeanServer, queue, storageManager);
+//   }
 
-      MBeanServer mbeanServer = createMock(MBeanServer.class);
-      Queue queue = createMock(Queue.class);
-      expect(queue.getName()).andStubReturn(name);
-      expect(queue.isDurable()).andReturn(true);
-      StorageManager storageManager = createMock(StorageManager.class);
-      expect(mbeanServer.isRegistered(objectName)).andReturn(false);
-      expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
-
-      replay(mbeanServer, queue, storageManager);
-
-      ManagementService service = new ManagementServiceImpl(mbeanServer, true);
-      service.registerQueue(queue, address, storageManager);
-
-      verify(mbeanServer, queue, storageManager);
-   }
-
-   public void testRegisterAlreadyRegisteredQueue() throws Exception
-   {
-      SimpleString address = randomSimpleString();
-      SimpleString name = randomSimpleString();
-      ObjectName objectName = ManagementServiceImpl.getQueueObjectName(address, name);
-      ObjectInstance objectInstance = new ObjectInstance(objectName, QueueControl.class.getName());
-
-      MBeanServer mbeanServer = createMock(MBeanServer.class);
-      Queue queue = createMock(Queue.class);
-      expect(queue.getName()).andStubReturn(name);
-      expect(queue.isDurable()).andReturn(true);
-      StorageManager storageManager = createMock(StorageManager.class);
-      expect(mbeanServer.isRegistered(objectName)).andReturn(true);
-      mbeanServer.unregisterMBean(objectName);
-      expect(mbeanServer.registerMBean(isA(StandardMBean.class), eq(objectName))).andReturn(objectInstance);
-
-      replay(mbeanServer, queue, storageManager);
-
-      ManagementService service = new ManagementServiceImpl(mbeanServer, true);
-      service.registerQueue(queue, address, storageManager);
-
-      verify(mbeanServer, queue, storageManager);
-   }
-
    public void testUnregisterQueue() throws Exception
    {
       SimpleString address = randomSimpleString();




More information about the jboss-cvs-commits mailing list