[Jboss-cvs] JBoss Messaging SVN: r1295 - in trunk: src/etc/server/default/deploy src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/remoting src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/local src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/etc tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/local tests/src/org/jboss/test/messaging/core/paging tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/tools/jmx

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Sep 15 13:44:31 EDT 2006


Author: timfox
Date: 2006-09-15 13:44:02 -0400 (Fri, 15 Sep 2006)
New Revision: 1295

Removed:
   trunk/tests/src/org/jboss/test/messaging/core/local/QueueWithFilterTest.java
Modified:
   trunk/src/etc/server/default/deploy/remoting-service.xml
   trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java
   trunk/src/main/org/jboss/jms/server/destination/QueueService.java
   trunk/src/main/org/jboss/jms/server/destination/TopicService.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
   trunk/src/main/org/jboss/jms/server/remoting/MessagingMarshallable.java
   trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java
   trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java
   trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/local/Queue.java
   trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/BindingImpl.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingImpl.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java
   trunk/tests/build.xml
   trunk/tests/etc/container.xml
   trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
   trunk/tests/src/org/jboss/test/messaging/core/local/NonRecoverableQueueTest.java
   trunk/tests/src/org/jboss/test/messaging/core/local/RecoverableQueueTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
More clustering work - remoting fix, selectors



Modified: trunk/src/etc/server/default/deploy/remoting-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/remoting-service.xml	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/etc/server/default/deploy/remoting-service.xml	2006-09-15 17:44:02 UTC (rev 1295)
@@ -16,7 +16,8 @@
             <invoker transport="socket">
                <attribute name="marshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
                <attribute name="unmarshaller" isParam="true">org.jboss.jms.server.remoting.JMSWireFormat</attribute>
-               <attribute name="serializationtype" isParam="true">jboss</attribute>
+               <!-- Serialization type must be jms - do not change! -->
+               <attribute name="serializationtype" isParam="true">jms</attribute>
                <attribute name="dataType" isParam="true">jms</attribute>
                <attribute name="socket.check_connection" isParam="true">false</attribute>
                <attribute name="timeout">0</attribute>

Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -170,12 +170,14 @@
          return;
       }
       
-      //We explicitly associate the datatype "jms" with our customer SerializationManager
+      //We explicitly associate the datatype "jms" with our custom SerializationManager
       //This is vital for performance reasons.
       try
       {
+//         SerializationStreamFactory.setManagerClassName(
+//            "jms", "org.jboss.remoting.serialization.impl.jboss.JBossSerializationManager");
          SerializationStreamFactory.setManagerClassName(
-            "jms", "org.jboss.remoting.serialization.impl.jboss.JBossSerializationManager");
+                  "jms", "org.jboss.jms.server.remoting.MessagingSerializationManager");
       }
       catch (Exception e)
       {

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -762,8 +762,11 @@
    {
       // We explicitly associate the datatype "jms" with the java SerializationManager
       // This is vital for performance reasons.
+//      SerializationStreamFactory.setManagerClassName(
+//         "jms", "org.jboss.remoting.serialization.impl.jboss.JBossSerializationManager");
       SerializationStreamFactory.setManagerClassName(
-         "jms", "org.jboss.remoting.serialization.impl.jboss.JBossSerializationManager");
+             "jms", "org.jboss.jms.server.remoting.MessagingSerializationManager");
+      
 
       JMSWireFormat wf = new JMSWireFormat();
 

Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -142,6 +142,8 @@
       while (iter.hasNext())
       {
          Binding binding = (Binding)iter.next();
+         
+         String filterString = binding.getFilter() != null ? binding.getFilter().getFilterString() : null;
                   
          if (durable && binding.isDurable())
          {                      
@@ -154,7 +156,7 @@
             sb.append("\", clientID=\"");
             sb.append(helper.getClientId());
             sb.append("\", selector=\"");
-            sb.append(binding.getSelector());
+            sb.append(filterString);
             sb.append("\"\n");
          }
          else if (!durable && !binding.isDurable())
@@ -162,7 +164,7 @@
             sb.append("Non-durable, subscriptionID=\"");
             sb.append(binding.getChannelId());
             sb.append("\", selector=\"");
-            sb.append(binding.getSelector());
+            sb.append(filterString);
             sb.append("\"\n");
          }
       }

Modified: trunk/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/QueueService.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/destination/QueueService.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -101,7 +101,7 @@
                org.jboss.messaging.core.local.Queue q = 
                   new org.jboss.messaging.core.local.Queue(binding.getChannelId(), ms, pm, true, true,
                            destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize(),
-                           executor, null);
+                           executor);
                q.load();
                binding.setQueue(q);
                binding.activate();
@@ -114,10 +114,10 @@
             org.jboss.messaging.core.local.Queue q = 
                new org.jboss.messaging.core.local.Queue(idm.getId(), ms, pm, true, true,
                         destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize(),
-                        executor, null);
+                        executor);
             
             //Make a binding for this queue
-            postOffice.bindQueue(destination.getName(), destination.getName(), q);
+            postOffice.bindQueue(destination.getName(), destination.getName(), null, q);
          }
          
          //push security update to the server

Modified: trunk/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/TopicService.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/destination/TopicService.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -82,7 +82,7 @@
                org.jboss.messaging.core.local.Queue q = 
                   new org.jboss.messaging.core.local.Queue(binding.getChannelId(), ms, pm, true, true,
                            destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize(),
-                           executor, null);
+                           executor);
                q.load();
                binding.setQueue(q);
                binding.activate();

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -192,10 +192,10 @@
                            mDest.getFullSize(),
                            mDest.getPageSize(),
                            mDest.getDownCacheSize(),
-                           executor, selector);
+                           executor);
                
                //Make a binding for this queue
-               binding = topicPostOffice.bindQueue(new GUID().toString(), jmsDestination.getName(), q);               
+               binding = topicPostOffice.bindQueue(new GUID().toString(), jmsDestination.getName(), selector, q);               
             }
             else
             {
@@ -229,10 +229,10 @@
                               mDest.getFullSize(),
                               mDest.getPageSize(),
                               mDest.getDownCacheSize(),
-                              executor, selector);
+                              executor);
                   
                   //Make a binding for this queue
-                  binding = topicPostOffice.bindQueue(name, jmsDestination.getName(), q);               
+                  binding = topicPostOffice.bindQueue(name, jmsDestination.getName(), selector, q);               
                }
                else
                {
@@ -246,11 +246,13 @@
                   // Changing a durable subscriber is equivalent to unsubscribing (deleting) the old
                   // one and creating a new one.
    
+                  String filterString = binding.getFilter() != null ? binding.getFilter().getFilterString() : null;
+                  
                   boolean selectorChanged =
-                     (selectorString == null && binding.getSelector() != null) ||
-                     (binding.getSelector() == null && selectorString != null) ||
-                     (binding.getSelector() != null && selectorString != null &&
-                     !binding.getSelector().equals(selectorString));
+                     (selectorString == null && filterString != null) ||
+                     (filterString == null && selectorString != null) ||
+                     (filterString != null && selectorString != null &&
+                     !filterString.equals(selectorString));
                   
                   if (trace) { log.trace("selector " + (selectorChanged ? "has" : "has NOT") + " changed"); }
    
@@ -274,10 +276,10 @@
                                  mDest.getFullSize(),
                                  mDest.getPageSize(),
                                  mDest.getDownCacheSize(),
-                                 executor, selector);
+                                 executor);
                      
                      //Make a binding for this queue
-                     binding = topicPostOffice.bindQueue(name, jmsDestination.getName(), q);  
+                     binding = topicPostOffice.bindQueue(name, jmsDestination.getName(), selector, q);  
                   }               
                }
             }
@@ -572,10 +574,10 @@
             QueuedExecutor executor = (QueuedExecutor)pool.get();
             Queue q = 
                new Queue(idm.getId(), ms, pm, true, false, fullSize, pageSize, downCacheSize,
-                         executor, null);
+                         executor);
             
             //Make a binding for this queue
-            queuePostOffice.bindQueue(dest.getName(), dest.getName(), q);  
+            queuePostOffice.bindQueue(dest.getName(), dest.getName(), null, q);  
          }         
       }
       catch (Throwable t)

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -25,6 +25,8 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -48,9 +50,8 @@
 import org.jboss.remoting.InvocationResponse;
 import org.jboss.remoting.marshal.Marshaller;
 import org.jboss.remoting.marshal.UnMarshaller;
-import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
-import org.jboss.remoting.marshal.serializable.SerializableUnMarshaller;
 import org.jboss.serial.io.JBossObjectInputStream;
+import org.jboss.serial.io.JBossObjectOutputStream;
 
 /**
  * 
@@ -77,6 +78,8 @@
    private static final long serialVersionUID = -7646123424863782043L;
 
    private static final Logger log = Logger.getLogger(JMSWireFormat.class);
+   
+   private static boolean usingJBossSerialization;
 
    // The request codes  - start from zero
 
@@ -99,46 +102,41 @@
 
 
    // Static --------------------------------------------------------
+   
+   public static void setUsingJBossSerialization(boolean b)
+   {
+      usingJBossSerialization = b;
+   }
 
    // Attributes ----------------------------------------------------
 
-   protected Marshaller serializableMarshaller;
-   protected UnMarshaller serializableUnMarshaller;
-
    protected boolean trace;
 
    // Constructors --------------------------------------------------
 
    public JMSWireFormat()
    {
-      serializableMarshaller = new SerializableMarshaller();
-      serializableUnMarshaller = new SerializableUnMarshaller();
-
       trace = log.isTraceEnabled();
    }
+   
+   
 
    // Marshaller implementation -------------------------------------
 
    public void write(Object obj, OutputStream out) throws IOException
    {          
-      DataOutputStream dos;
-      
-      if (out instanceof DataOutputStream)
+      //Sanity check
+      if (!(out instanceof MessagingObjectOutputStream))
       {
-         dos = (DataOutputStream)out;
+         throw new IllegalStateException("Must be MessagingObjectOutputStream");
       }
-      else
-      {
-         //TODO - We should get remoting to pass in a DataOutputStream
-         //So we don't have to wrap it every time
-         dos = new DataOutputStream(out);
-      }
       
+      DataOutputStream dos = (DataOutputStream)(((MessagingObjectOutputStream)out).getUnderlyingStream());
+            
       handleVersion(obj, dos);
 
       try
-      {
-         
+      {         
          if (obj instanceof InvocationRequest)
          {
             if (trace) { log.trace("writing InvocationRequest"); }
@@ -283,7 +281,7 @@
                   dos.write(SERIALIZED);
    
                   // Delegate to serialization to handle the wire format
-                  serializableMarshaller.write(obj, dos);
+                  serialize(dos, obj);
    
                   if (trace) { log.trace("wrote using standard serialization"); }
                }
@@ -311,7 +309,7 @@
                dos.write(SERIALIZED);
    
                //Delegate to serialization to handle the wire format
-               serializableMarshaller.write(obj, dos);
+               serialize(dos, obj);
    
                if (trace) { log.trace("wrote using standard serialization"); }
             }
@@ -374,7 +372,7 @@
                dos.write(SERIALIZED);
    
                //Delegate to serialization to handle the wire format
-               serializableMarshaller.write(obj, out);
+               serialize(dos, obj);
    
                if (trace) { log.trace("wrote using standard serialization"); }
             }
@@ -401,25 +399,13 @@
 
    public Object read(InputStream in, Map map) throws IOException, ClassNotFoundException
    {      
-      if (in instanceof JBossObjectInputStream)
+      // Sanity check
+      if (!(in instanceof MessagingObjectInputStream))
       {
-         // Need to explicitly set the classloader
-         ((JBossObjectInputStream)in).
-            setClassLoader(Thread.currentThread().getContextClassLoader());
+         throw new IllegalStateException("Must be MessagingObjectInputStream");
       }
       
-      DataInputStream dis;
-      
-      //TODO We should ensure that remoting always passes in a DataInputStream
-      //This saves us wrapping it each time
-      if (in instanceof DataInputStream)
-      {
-         dis = (DataInputStream)in;
-      }
-      else
-      {
-         dis = new DataInputStream(in);
-      }
+      DataInputStream dis = (DataInputStream)(((MessagingObjectInputStream)in).getUnderlyingStream());
 
       // First byte read is always version
 
@@ -437,7 +423,7 @@
             case SERIALIZED:
             {
                // Delegate to serialization
-               Object ret = serializableUnMarshaller.read(dis, map);
+               Object ret = deserialize(dis);
    
                if (trace) { log.trace("read using standard serialization"); }
    
@@ -726,6 +712,42 @@
 
       return mi;
    }
+   
+   private void serialize(OutputStream os, Object obj) throws Exception
+   { 
+      ObjectOutputStream oos;
+      
+      if (usingJBossSerialization)
+      {
+         oos = new JBossObjectOutputStream(os);
+      }
+      else
+      {
+         oos = new ObjectOutputStream(os);
+      }
+      
+      oos.writeObject(obj);
+      
+      oos.flush();      
+   }
+   
+   private Object deserialize(InputStream is) throws Exception
+   {
+      ObjectInputStream ois;
+      
+      if (usingJBossSerialization)
+      {
+         ois = new JBossObjectInputStream(is);
+      }
+      else
+      {
+         ois = new ObjectInputStream(is);
+      }
+      
+      Object obj = ois.readObject();
+      
+      return obj;
+   }
 
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/jms/server/remoting/MessagingMarshallable.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingMarshallable.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingMarshallable.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -21,6 +21,10 @@
   */
 package org.jboss.jms.server.remoting;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 
 /**
  * 
@@ -31,19 +35,25 @@
  *
  * $Id$
  */
-public class MessagingMarshallable
+public class MessagingMarshallable implements Externalizable
 {
    // Constants -----------------------------------------------------
 
+   private static final long serialVersionUID = 4715063783844562048L;
+   
    // Static --------------------------------------------------------
 
    // Attributes ----------------------------------------------------
-
+   
    protected byte version;
    protected Object load;
 
    // Constructors --------------------------------------------------
 
+   public MessagingMarshallable()
+   {     
+   }
+   
    public MessagingMarshallable(byte version, Object load)
    {
       this.version = version;
@@ -67,6 +77,20 @@
       return "MessagingMarshallable[" + version + ", " + load + "]";
    }
 
+   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+   {
+      version = in.readByte();
+      
+      load = in.readObject();
+   }
+
+   public void writeExternal(ObjectOutput out) throws IOException
+   {
+      out.writeByte(version);
+      
+      out.writeObject(load);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectInputStream.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -23,10 +23,15 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InvalidObjectException;
+import java.io.NotActiveException;
 import java.io.ObjectInputStream;
+import java.io.ObjectInputValidation;
 
 /**
  * A MessagingObjectInputStream
+ * 
+ * See comment in MessagingSerializationManager
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
@@ -51,4 +56,149 @@
       return in;
    }
 
+   public int available() throws IOException
+   {
+      return in.available();
+   }
+
+   public void close() throws IOException
+   {
+      in.close();
+   }
+
+   public boolean equals(Object obj)
+   {
+      return in.equals(obj);
+   }
+
+   public int hashCode()
+   {
+      return in.hashCode();
+   }
+
+   public void mark(int readlimit)
+   {
+      in.mark(readlimit);
+   }
+
+   public boolean markSupported()
+   {
+      return in.markSupported();
+   }
+
+   public int read() throws IOException
+   {
+      return in.read();
+   }
+
+   public int read(byte[] b, int off, int len) throws IOException
+   {
+      return in.read(b, off, len);
+   }
+
+   public int read(byte[] b) throws IOException
+   {
+      return in.read(b);
+   }
+
+   public void reset() throws IOException
+   {
+      in.reset();
+   }
+
+   public long skip(long n) throws IOException
+   {
+      return in.skip(n);
+   }
+
+   public String toString()
+   {
+      return in.toString();
+   }
+
+   public void defaultReadObject() throws IOException, ClassNotFoundException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public boolean readBoolean() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public byte readByte() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public char readChar() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public double readDouble() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public GetField readFields() throws IOException, ClassNotFoundException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public float readFloat() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void readFully(byte[] buf, int off, int len) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void readFully(byte[] buf) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public int readInt() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public long readLong() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public short readShort() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public Object readUnshared() throws IOException, ClassNotFoundException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public int readUnsignedByte() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public int readUnsignedShort() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public String readUTF() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void registerValidation(ObjectInputValidation obj, int prio) throws NotActiveException, InvalidObjectException
+   {
+      throw new UnsupportedOperationException();
+   }
+
 }

Modified: trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingObjectOutputStream.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -27,6 +27,8 @@
 
 /**
  * A MessagingObjectOutputStream
+ * 
+ * See comment in MessagingSerializationManager
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
@@ -49,4 +51,119 @@
       return out;
    }
 
+   public void close() throws IOException
+   {
+      out.close();
+   }
+
+   public boolean equals(Object obj)
+   {
+      return out.equals(obj);
+   }
+
+   public void flush() throws IOException
+   {
+      out.flush();
+   }
+
+   public void write(byte[] b, int off, int len) throws IOException
+   {
+      out.write(b, off, len);
+   }
+
+   public void write(byte[] b) throws IOException
+   {
+      out.write(b);
+   }
+
+   public void write(int b) throws IOException
+   {
+      out.write(b);
+   }
+
+   public void defaultWriteObject() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public PutField putFields() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void reset() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void useProtocolVersion(int version) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeBoolean(boolean val) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeByte(int val) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeBytes(String str) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeChar(int val) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeChars(String str) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeDouble(double val) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeFields() throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeFloat(float val) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeInt(int val) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeLong(long val) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeShort(int val) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeUnshared(Object obj) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public void writeUTF(String str) throws IOException
+   {
+      throw new UnsupportedOperationException();
+   }
+
 }

Modified: trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/jms/server/remoting/MessagingSerializationManager.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -21,17 +21,28 @@
  */
 package org.jboss.jms.server.remoting;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 
+import org.jboss.logging.Logger;
 import org.jboss.remoting.serialization.IMarshalledValue;
 import org.jboss.remoting.serialization.SerializationManager;
 
 /**
  * A MessagingSerializationManager
+ * 
+ * This class and the related ObjectInputStream and ObjectOutputStream classes
+ * are a hack to work around a limitation of JBoss remoting, whereby it assumes that
+ * the createInput and createOutput methods always return an ObjectInput/OutputStream.
+ * For the purposes of messaging we want the marshaller and the server and client invokers
+ * to use the underlying stream instead, since we do not want all the extra crap that the object input/output
+ * streams add to the stream (headers).
+ * This should really be fixed properly in remoting.
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
@@ -41,6 +52,9 @@
  */
 public class MessagingSerializationManager extends SerializationManager
 {
+   private static final Logger log = Logger.getLogger(MessagingSerializationManager.class);
+
+   
    public IMarshalledValue createdMarshalledValue(Object arg0) throws IOException
    {
       throw new UnsupportedOperationException();
@@ -48,12 +62,12 @@
 
    public ObjectInputStream createInput(InputStream in, ClassLoader cl) throws IOException
    {
-      return new MessagingObjectInputStream(in);
+      return new MessagingObjectInputStream(new DataInputStream(in));
    }
    
    public ObjectOutputStream createOutput(OutputStream out) throws IOException
    {
-      return new MessagingObjectOutputStream(out);
+      return new MessagingObjectOutputStream(new DataOutputStream(out));
    }
 
    public IMarshalledValue createMarshalledValueForClone(Object arg0) throws IOException

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -167,37 +167,8 @@
       {
          return handleInternal(sender, ref, tx, true);
       }
-   }
-   
-   public Delivery handleDontPersist(DeliveryObserver sender, MessageReference ref, Transaction tx)
-   {
-      checkClosed();
+   }  
       
-      Future result = new Future();
-
-      if (tx == null)
-      {         
-         try
-         {
-            // Instead of executing directly, we add the handle request to the event queue.
-            // Since remoting doesn't currently handle non blocking IO, we still have to wait for the
-            // result, but when remoting does, we can use a full SEDA approach and get even better
-            // throughput.
-            this.executor.execute(new HandleRunnable(result, sender, ref, false));
-         }
-         catch (InterruptedException e)
-         {
-            log.warn("Thread interrupted", e);
-         }
-   
-         return (Delivery)result.getResult();
-      }
-      else
-      {
-         return handleInternal(sender, ref, tx, false);
-      }
-   }
-      
    // DeliveryObserver implementation --------------------------
 
    public void acknowledge(Delivery d, Transaction tx) throws Throwable
@@ -1037,24 +1008,26 @@
    {
       // by default a noop
    }
-
-   // Private -------------------------------------------------------
- 
-   private void checkClosed()
+   
+   protected void checkClosed()
    {
       if (router == null)
       {
          throw new IllegalStateException(this + " closed");
       }
    }
+
+   // Private -------------------------------------------------------
+ 
+
   
    // Inner classes -------------------------------------------------
 
-   private class DeliveryRunnable implements Runnable
+   protected class DeliveryRunnable implements Runnable
    {
       Future result;
       
-      DeliveryRunnable(Future result)
+      public DeliveryRunnable(Future result)
       {
          this.result = result;
       }
@@ -1070,7 +1043,7 @@
       }
    }
 
-   private class HandleRunnable implements Runnable
+   protected class HandleRunnable implements Runnable
    {
       Future result;
 
@@ -1080,7 +1053,7 @@
       
       boolean persist;
 
-      HandleRunnable(Future result, DeliveryObserver sender, MessageReference ref, boolean persist)
+      public HandleRunnable(Future result, DeliveryObserver sender, MessageReference ref, boolean persist)
       {
          this.result = result;
          this.sender = sender;

Modified: trunk/src/main/org/jboss/messaging/core/local/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/Queue.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/local/Queue.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -22,14 +22,9 @@
 package org.jboss.messaging.core.local;
 
 import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.PagingChannel;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.tx.Transaction;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -65,38 +60,19 @@
    // Static --------------------------------------------------------
    
    // Attributes ----------------------------------------------------
-   
-   protected Filter filter;
-   
-   
+    
    // Constructors --------------------------------------------------
 
    public Queue(long id, MessageStore ms, PersistenceManager pm,             
                 boolean acceptReliableMessages, boolean recoverable,
-                int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor,
-                Filter filter)
+                int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor)
    {
       super(id, ms, pm, acceptReliableMessages, recoverable, fullSize, pageSize, downCacheSize, executor);
       
       router = new RoundRobinPointToPointRouter();
-      
-      this.filter = filter;
    }
     
-   // Channel implementation ----------------------------------------
-   
-   public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)
-   { 
-      //If the queue has a Filter we do not accept any Message references that do not
-      //match the Filter
-      if (filter != null && !filter.accept(ref))
-      {
-         if (trace) { log.trace(this + " not accepting " + ref); }
-         return null;
-      }
-      
-      return super.handle(sender, ref, tx);
-   }
+   // Channel implementation ----------------------------------------   
 
    // Public --------------------------------------------------------
 
@@ -105,11 +81,6 @@
       return "Queue[" + getChannelID() + "]";
    }
    
-   public Filter getFilter()
-   {
-      return filter;
-   }
-
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -21,6 +21,7 @@
  */
 package org.jboss.messaging.core.plugin.contract;
 
+import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredBinding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
 
@@ -46,7 +47,7 @@
     * @return
     * @throws Exception
     */
-   ClusteredBinding bindClusteredQueue(String queueName, String condition, ClusteredQueue queue) throws Exception;
+   ClusteredBinding bindClusteredQueue(String queueName, String condition, Filter filter, ClusteredQueue queue) throws Exception;
    
    /**
     * Unbind a clustered queue from the post office

Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -23,6 +23,7 @@
 
 import java.util.List;
 
+import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.local.Queue;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
@@ -53,11 +54,12 @@
     * @param queueName The unique name of the queue
     * @param condition The condition to be used when routing references
     * @param noLocal
+    * @param filter The filter to use
     * @param queue
     * @return
     * @throws Exception
     */
-   Binding bindQueue(String queueName, String condition, Queue queue) throws Exception;
+   Binding bindQueue(String queueName, String condition, Filter filter, Queue queue) throws Exception;
    
    /**
     * Unbind a queue from the post office

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/Binding.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -21,6 +21,7 @@
  */
 package org.jboss.messaging.core.plugin.postoffice;
 
+import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.local.Queue;
 
 /**
@@ -42,7 +43,7 @@
    
    Queue getQueue();
    
-   String getSelector();
+   Filter getFilter();
      
    long getChannelId();
    

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/BindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/BindingImpl.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/BindingImpl.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -24,6 +24,8 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 
+import org.jboss.jms.selector.Selector;
+import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.local.Queue;
 import org.jboss.messaging.util.StreamUtils;
 
@@ -49,7 +51,7 @@
     
    private boolean active;
    
-   private String selector;
+   private Filter filter;
    
    private long channelId;
      
@@ -59,7 +61,7 @@
    {      
    }
 
-   public BindingImpl(String nodeId, String queueName, String condition, String selector,
+   public BindingImpl(String nodeId, String queueName, String condition, Filter filter,
                       long channelId, boolean durable)
    {
       this.nodeId = nodeId;
@@ -68,7 +70,7 @@
       
       this.condition = condition;      
       
-      this.selector = selector;
+      this.filter = filter;
       
       this.channelId = channelId;
          
@@ -113,9 +115,9 @@
       return active;
    }
    
-   public String getSelector()
+   public Filter getFilter()
    {
-      return selector;
+      return filter;
    }
    
    public long getChannelId()
@@ -148,8 +150,11 @@
       
       active = in.readBoolean();
       
-      selector = (String)StreamUtils.readObject(in, false);
+      String filterString = (String)StreamUtils.readObject(in, false);
       
+      //TODO we need to generalise this
+      filter = new Selector(filterString);
+      
       channelId = in.readLong();
       
       durable = in.readBoolean();
@@ -165,7 +170,7 @@
       
       out.writeBoolean(active);
       
-      StreamUtils.writeObject(out, selector, false, false);
+      StreamUtils.writeObject(out, filter.getFilterString(), false, false);
       
       out.writeLong(channelId);
       

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/PostOfficeImpl.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -24,6 +24,7 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,8 +37,10 @@
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
 
+import org.jboss.jms.selector.Selector;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.local.Queue;
 import org.jboss.messaging.core.plugin.JDBCSupport;
@@ -121,7 +124,7 @@
      
    // PostOffice implementation ---------------------------------------        
          
-   public Binding bindQueue(String queueName, String condition, Queue queue) throws Exception
+   public Binding bindQueue(String queueName, String condition, Filter filter, Queue queue) throws Exception
    {
       if (queueName == null)
       {
@@ -153,11 +156,9 @@
          }
          
          boolean durable = queue.isRecoverable();
-         
-         String filter = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
                     
          binding = createBinding(nodeId, queueName, condition, filter,
-                                   queue.getChannelID(), durable);         
+                                 queue.getChannelID(), durable);         
          
          binding.setQueue(queue);
          
@@ -327,13 +328,18 @@
                if (binding.isActive() && binding.getNodeId().equals(this.nodeId))
                {
                   //It's a local binding so we pass the message on to the subscription
-                  Queue subscription = binding.getQueue();
-               
-                  Delivery del = subscription.handle(null, ref, tx);
+                  Queue queue = binding.getQueue();
                   
-                  if (del != null && del.isSelectorAccepted())
+                  Filter filter = binding.getFilter();
+                  
+                  if (filter != null && filter.accept(ref))
                   {
-                     routed = true;
+                     Delivery del = queue.handle(null, ref, tx);
+                     
+                     if (del != null && del.isSelectorAccepted())
+                     {
+                        routed = true;
+                     }      
                   }                  
                }               
             } 
@@ -355,9 +361,9 @@
      
    // Protected -----------------------------------------------------
    
-   protected Binding createBinding(String nodeId, String queueName, String condition, String filter,
-                                   long channelId, boolean durable)
-   {
+   protected Binding createBinding(String nodeId, String queueName, String condition, Filter filter,
+                                   long channelId, boolean durable) throws Exception
+   {           
       return new BindingImpl(nodeId, queueName, condition, filter,
                              channelId, durable);   
    }
@@ -396,12 +402,21 @@
          conn = ds.getConnection();
          
          ps = conn.prepareStatement(getSQLStatement("INSERT_BINDING"));
+         
+         String filterString = binding.getFilter() == null ? null : binding.getFilter().getFilterString();
                   
          ps.setString(1, this.officeName);
          ps.setString(2, this.nodeId);
          ps.setString(3, binding.getQueueName());
          ps.setString(4, binding.getCondition());         
-         ps.setString(5, binding.getSelector());
+         if (filterString != null)
+         {
+            ps.setString(5, filterString);
+         }
+         else
+         {
+            ps.setNull(5, Types.VARCHAR);
+         }
          ps.setLong(6, binding.getChannelId());
 
          ps.executeUpdate();;
@@ -488,13 +503,25 @@
             
             String selector = rs.getString(4);
             
+            if (rs.wasNull())
+            {
+               selector = null;
+            }
+            
             long channelId = rs.getLong(5);
               
             //We don't load the actual queue - this is because we don't know the paging params until
             //activation time
                     
-            Binding binding = createBinding(nodeId, queueName, condition, selector, channelId, true);
+            //TODO we need to generalise selector creation
+            Selector filter = null;
+            if (selector != null)
+            {
+               filter = new Selector(selector);
+            }
             
+            Binding binding = createBinding(nodeId, queueName, condition, filter, channelId, true);
+            
             list.add(binding);
          }
          

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRequest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -24,7 +24,6 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 
-import org.jboss.logging.Logger;
 import org.jboss.messaging.util.Streamable;
 
 /**
@@ -38,9 +37,7 @@
  *
  */
 abstract class ClusterRequest implements Streamable
-{
-   private static final Logger log = Logger.getLogger(ClusterRequest.class);
-      
+{    
    /*
     * Factory method
     */

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingImpl.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredBindingImpl.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -24,6 +24,7 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 
+import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.plugin.postoffice.BindingImpl;
 
 /**
@@ -46,9 +47,9 @@
    {
    }
 
-   public ClusteredBindingImpl(String nodeId, String queueName, String condition, String selector, long channelId, boolean durable)
+   public ClusteredBindingImpl(String nodeId, String queueName, String condition, Filter filter, long channelId, boolean durable)
    {
-      super(nodeId, queueName, condition, selector, channelId, durable);
+      super(nodeId, queueName, condition, filter, channelId, durable);
    }
 
    public double getConsumptionRate()

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredPostOfficeImpl.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -36,8 +36,10 @@
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
 
+import org.jboss.jms.selector.Selector;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.local.Queue;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
@@ -277,16 +279,14 @@
    
    // PostOffice implementation ---------------------------------------        
    
-   public ClusteredBinding bindClusteredQueue(String queueName, String condition, ClusteredQueue queue) throws Exception
+   public ClusteredBinding bindClusteredQueue(String queueName, String condition, Filter filter, ClusteredQueue queue) throws Exception
    {           
-      ClusteredBinding binding = (ClusteredBinding)super.bindQueue(queueName, condition, queue);
+      ClusteredBinding binding = (ClusteredBinding)super.bindQueue(queueName, condition, filter, queue);
       
       boolean durable = queue.isRecoverable();
       
-      String filter = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
-      
       BindRequest request =
-         new BindRequest(nodeId, queueName, condition, filter,
+         new BindRequest(nodeId, queueName, condition, filter == null ? null : filter.getFilterString(),
                          binding.getChannelId(), durable);
       
       syncSendRequest(request);
@@ -410,33 +410,38 @@
                   throw new IllegalStateException("No bindings in list!");
                }
                
-               if (binding.getNodeId().equals(this.nodeId))
+               Filter filter = binding.getFilter();
+                               
+               if (filter != null && filter.accept(ref))
                {
-                  //It's a local binding so we pass the message on to the queue
-                  Queue queue = binding.getQueue();
-               
-                  Delivery del = queue.handle(null, ref, tx);    
+                  if (binding.getNodeId().equals(this.nodeId))
+                  {
+                     //It's a local binding so we pass the message on to the queue
+                     Queue queue = binding.getQueue();
                   
-                  if (del != null && del.isSelectorAccepted())
+                     Delivery del = queue.handle(null, ref, tx);    
+                     
+                     if (del != null && del.isSelectorAccepted())
+                     {
+                        routed = true;
+                     }  
+                  }
+                  else
                   {
+                     //It's a binding on a different office instance on the cluster
+                     numberRemote++;                     
+                     
+                     if (ref.isReliable() && binding.isDurable())
+                     {
+                        //Insert the reference into the database
+                        
+                        //TODO perhaps we should do this via a stub queue class
+                        pm.addReference(binding.getChannelId(), ref, tx);
+                     }
+                     
                      routed = true;
                   }  
-               }
-               else
-               {
-                  //It's a binding on a different office instance on the cluster
-                  numberRemote++;                     
-                  
-                  if (ref.isReliable() && binding.isDurable())
-                  {
-                     //Insert the reference into the database
-                     
-                     //TODO perhaps we should do this via a stub queue class
-                     pm.addReference(binding.getChannelId(), ref, tx);
-                  }
-                  
-                  routed = true;
-               }                                                
+               }                                                              
             }
                                     
             //Now we've sent the message to any local queues, we might also need
@@ -522,7 +527,7 @@
             throw new IllegalArgumentException(this.nodeId + "Binding already exists for node Id " + nodeId + " queue name " + queueName);
          }
          
-         binding = new ClusteredBindingImpl(nodeId, queueName, condition, filterString,
+         binding = new ClusteredBindingImpl(nodeId, queueName, condition, new Selector(filterString),
                                            channelID, durable); 
          
          binding.activate();
@@ -585,7 +590,7 @@
             throw new IllegalStateException("Cannot find binding for queue name " + queueName);
          }
          
-         Queue queue = binding.getQueue();
+         ClusteredQueue queue = (ClusteredQueue)binding.getQueue();
          
          Iterator iter = messages.iterator();
          
@@ -599,7 +604,7 @@
                
                ref = ms.reference(msg);
                
-               queue.handleDontPersist(null, ref, null);
+               queue.handleFromCluster(null, ref, null);
             }
             finally
             {
@@ -663,11 +668,11 @@
                      if (handle)
                      {
                         //It's a local binding so we pass the message on to the subscription
-                        Queue subscription = binding.getQueue();
+                        ClusteredQueue queue = (ClusteredQueue)binding.getQueue();
                      
                         //TODO instead of adding a new method on the channel
                         //we should set a header and use the same method
-                        subscription.handleDontPersist(null, ref, null);
+                        queue.handleFromCluster(null, ref, null);
                      }
                   }                               
                }
@@ -849,34 +854,37 @@
             {
                ClusteredBinding bb = (ClusteredBinding)iter.next();
                
-               ClusteredQueue q = (ClusteredQueue)bb.getQueue();
-               
-               //We don't bother sending the stat if there is less than STATS_DIFFERENCE_MARGIN_PERCENT % difference
-               
-               double newRate = q.getGrowthRate();
-               
-               int newMessageCount = q.messageCount();
-               
-               boolean sendStats = decideToSendStats(bb.getConsumptionRate(), newRate);
-               
-               if (!sendStats)
-               {
-                  sendStats = decideToSendStats(bb.getMessageCount(), newMessageCount);
-               }
-               
-               if (sendStats)
-               {
-                  bb.setConsumptionRate(newRate);
-                  bb.setMessageCount(newMessageCount);
+               if (bb.isActive())
+               {                  
+                  ClusteredQueue q = (ClusteredQueue)bb.getQueue();
                   
-                  if (stats == null)
+                  //We don't bother sending the stat if there is less than STATS_DIFFERENCE_MARGIN_PERCENT % difference
+                  
+                  double newRate = q.getGrowthRate();
+                  
+                  int newMessageCount = q.messageCount();
+                  
+                  boolean sendStats = decideToSendStats(bb.getConsumptionRate(), newRate);
+                  
+                  if (!sendStats)
                   {
-                     stats = new ArrayList();
+                     sendStats = decideToSendStats(bb.getMessageCount(), newMessageCount);
                   }
-                  QueueStats qs = new QueueStats(bb.getQueueName(), newRate, newMessageCount);
                   
-                  stats.add(qs);
-               }                  
+                  if (sendStats)
+                  {
+                     bb.setConsumptionRate(newRate);
+                     bb.setMessageCount(newMessageCount);
+                     
+                     if (stats == null)
+                     {
+                        stats = new ArrayList();
+                     }
+                     QueueStats qs = new QueueStats(bb.getQueueName(), newRate, newMessageCount);
+                     
+                     stats.add(qs);
+                  } 
+               }
             }
          }
       }
@@ -961,11 +969,11 @@
       
    // Protected ---------------------------------------------------------------------------------------
    
-   protected Binding createBinding(String nodeId, String queueName, String condition, String filter,
-            long channelId, boolean durable)
+   protected Binding createBinding(String nodeId, String queueName, String condition, Filter filter,
+            long channelId, boolean durable) throws Exception
    {
       return new ClusteredBindingImpl(nodeId, queueName, condition, filter,
-                                     channelId, durable);   
+                                      channelId, durable);   
    }
            
    protected void loadBindings() throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusteredQueue.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -25,12 +25,14 @@
 import java.util.List;
 
 import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.Filter;
+import org.jboss.messaging.core.DeliveryObserver;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.SimpleDelivery;
 import org.jboss.messaging.core.local.Queue;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.util.Future;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -56,10 +58,11 @@
    
    private volatile int numberConsumed;
  
-   public ClusteredQueue(long id, MessageStore ms, PersistenceManager pm, boolean acceptReliableMessages, boolean recoverable, int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor, Filter filter)
+   public ClusteredQueue(long id, MessageStore ms, PersistenceManager pm, boolean acceptReliableMessages,
+                         boolean recoverable, int fullSize, int pageSize, int downCacheSize, QueuedExecutor executor)
    {
       super(id, ms, pm, acceptReliableMessages, recoverable, fullSize, pageSize,
-            downCacheSize, executor, filter);
+            downCacheSize, executor);
       
       lastTime = System.currentTimeMillis();      
       
@@ -115,7 +118,35 @@
          }
       }          
    }
+   
+   /*
+    * This is the same as the normal handle() method on the Channel except it doesn't
+    * persist the message even if it is persistent - this is because persistent messages
+    * are always persisted on the sending node before sending.
+    */
+   public Delivery handleFromCluster(DeliveryObserver sender, MessageReference ref, Transaction tx)
+      throws Exception
+   {
+      checkClosed();
+      
+      Future result = new Future();
 
+      if (tx == null)
+      {         
+         // Instead of executing directly, we add the handle request to the event queue.
+         // Since remoting doesn't currently handle non blocking IO, we still have to wait for the
+         // result, but when remoting does, we can use a full SEDA approach and get even better
+         // throughput.
+         this.executor.execute(new HandleRunnable(result, sender, ref, false));
+   
+         return (Delivery)result.getResult();
+      }
+      else
+      {
+         return handleInternal(sender, ref, tx, false);
+      }
+   }
+
    protected void addReferenceInMemory(MessageReference ref) throws Exception
    {
       super.addReferenceInMemory(ref);

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/MessageHolder.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -25,7 +25,6 @@
 import java.io.DataOutputStream;
 import java.util.Map;
 
-import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.message.MessageFactory;
 import org.jboss.messaging.util.StreamUtils;
@@ -42,8 +41,6 @@
  */
 class MessageHolder implements Streamable
 {
-   private static final Logger log = Logger.getLogger(MessageHolder.class);
-      
    private String routingKey;
    
    private Message message;

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/build.xml	2006-09-15 17:44:02 UTC (rev 1295)
@@ -58,14 +58,14 @@
    -->
 
    <property name="functional.tests.database" value="hsqldb"/>
-   <property name="functional.tests.serialization" value="jboss"/>
+   <property name="functional.tests.serialization" value="jms"/>
 
    <!--
         Stress tests.
    -->
 
    <property name="stress.tests.database" value="mysql"/>
-   <property name="stress.tests.serialization" value="jboss"/>
+   <property name="stress.tests.serialization" value="jms"/>
 
    <!--
         Project paths.
@@ -880,7 +880,7 @@
    <property name="test.execution.classpath.file" value=".test.execution.classpath"/>
 
    <target name="get-test-execution-classpath" depends="init">
-      <pathconvert refid="test.execution.classpath" 
+      <pathconvert refid="test.execution.classpath"
                    property="test.execution.classpath.unix"/>
       <echo message="${test.execution.classpath.unix}" file="${test.execution.classpath.file}"/>
    </target>

Modified: trunk/tests/etc/container.xml
===================================================================
--- trunk/tests/etc/container.xml	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/etc/container.xml	2006-09-15 17:44:02 UTC (rev 1295)
@@ -55,8 +55,10 @@
       precedence. Supported values: "jboss" and "java".
    -->
 
-   <serialization-type>jboss</serialization-type>
+   <!-- Must ALWAYS be jms -->
 
+   <serialization-type>jms</serialization-type>
+
 </container>
 
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -120,26 +120,26 @@
 
    public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
    {
-      log.info(this + " got routable:" + ref);
+      log.trace(this + " got routable:" + ref);
       
       try
       {
          if (ref == null)
          {
-            log.info("Receiver [" + name + "] is rejecting a null reference");
+            log.trace("Receiver [" + name + "] is rejecting a null reference");
             return null;
          }
          
          if (SELECTOR_REJECTING.equals(state))
          {
-            log.info(this + " is rejecting message since doesn't match selector");
+            log.trace(this + " is rejecting message since doesn't match selector");
             return new SimpleDelivery(null, null, true, false);
          }
 
          if (REJECTING.equals(state))
          {
 
-            log.info(this + " is rejecting reference " + ref);
+            log.trace(this + " is rejecting reference " + ref);
             return null;
          }
 
@@ -150,10 +150,10 @@
                                        "THE BEHAVIOUR OF A BROKEN RECEIVER");
          }
 
-        log.info("State is:" + state);
+         log.trace("State is:" + state);
          
          boolean done = ACKING.equals(state) ? true : false;
-         log.info(this + " is " + (done ? "ACKing" : "NACKing") +  " message " + ref);
+         log.trace(this + " is " + (done ? "ACKing" : "NACKing") +  " message " + ref);
          
          Message m = ref.getMessage();
          
@@ -162,7 +162,7 @@
 
          if (immediateAsynchronousAcknowledgment)
          {
-            log.info("simulating an asynchronous ACK that arrives before we return the delivery to channel");
+            log.trace("simulating an asynchronous ACK that arrives before we return the delivery to channel");
             try
             {
                delivery.acknowledge(null);
@@ -210,7 +210,7 @@
          log.error("No channel, cannot request messages");
          return;
       }
-      log.info("receiver explicitely requesting message from the channel");
+      log.trace("receiver explicitely requesting message from the channel");
       channel.deliver(true);
    }
 
@@ -292,7 +292,7 @@
          Message m = (Message)o[0];
          if (m == r)
          {
-            log.info("*** found it");
+            log.trace("*** found it");
             d = (Delivery)o[1];
             touple = o;
             break;
@@ -311,7 +311,7 @@
 
       d.acknowledge(tx);
 
-      log.info(this + " acknowledged "  + r);
+      log.trace(this + " acknowledged "  + r);
 
       // make sure I get rid of message if the transaction is rolled back
       if (tx != null)
@@ -349,7 +349,7 @@
 
       d.cancel();
 
-      log.info(this + " cancelled "  + r);
+      log.trace(this + " cancelled "  + r);
    }
 
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/local/NonRecoverableQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/NonRecoverableQueueTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/NonRecoverableQueueTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -55,8 +55,7 @@
    {
       super.setUp();
       
-      queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor(),
-                                      null);
+      queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor());
    }
 
    public void tearDown() throws Exception
@@ -72,8 +71,7 @@
 
    public void recoverChannel() throws Exception
    {
-      queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor(),
-                                      null);
+      queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor());
    }
 
    // Public --------------------------------------------------------

Deleted: trunk/tests/src/org/jboss/test/messaging/core/local/QueueWithFilterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/QueueWithFilterTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/QueueWithFilterTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -1,158 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.test.messaging.core.local;
-
-import org.jboss.messaging.core.Filter;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Routable;
-import org.jboss.messaging.core.local.Queue;
-import org.jboss.messaging.core.message.CoreMessage;
-import org.jboss.messaging.core.plugin.JDBCPersistenceManager;
-import org.jboss.messaging.core.plugin.SimpleMessageStore;
-import org.jboss.messaging.core.plugin.contract.MessageStore;
-import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.test.messaging.MessagingTestCase;
-import org.jboss.test.messaging.tools.jmx.ServiceContainer;
-
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
-/**
- * 
- * A QueueWithFilterTest.
- * 
- * @author <a href="tim.fox at jboss.com">Tim Fox</a>
- * @version $Revision: 1237 $
- *
- * $Id: queueWithFilterTest.java 1237 2006-08-30 18:36:36Z timfox $
- */
-public class QueueWithFilterTest extends MessagingTestCase
-{
-   public QueueWithFilterTest(String name)
-   {
-      super(name);
-   }
-
-   protected PersistenceManager pm;
-   
-   protected MessageStore ms;
-   
-   protected ServiceContainer sc;
-    
-   // ChannelTestBase overrides  ------------------------------------
-
-   public void setUp() throws Exception
-   {
-      super.setUp();
-      
-      sc = new ServiceContainer("all,-remoting,-security");
-      sc.start();
-      
-      pm =
-         new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
-                                    true, true, true, 100);      
-      pm.start();
-      
-      ms = new SimpleMessageStore();
-      ms.start();
-            
-      log.debug("setup done");
-   }
-
-   public void tearDown() throws Exception
-   {
-      pm.stop();
-      
-      ms.stop();
-      
-      sc.stop();
-      
-      super.tearDown();
-   }
-   
-
-   public void testWithFilter()
-   {
-      Filter f = new SimpleFilter(3);
-      
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(),
-                                                   f);
-       
-      Message m1 = new CoreMessage(1, false, 0, 0, (byte)0, null, null, 0);
-      Message m2 = new CoreMessage(2, false, 0, 0, (byte)0, null, null, 0);
-      Message m3 = new CoreMessage(3, false, 0, 0, (byte)0, null, null, 0);
-      Message m4 = new CoreMessage(4, false, 0, 0, (byte)0, null, null, 0);
-      Message m5 = new CoreMessage(5, false, 0, 0, (byte)0, null, null, 0);
-      
-      assertNull(queue.handle(null, ms.reference(m1), null));
-      assertNull(queue.handle(null, ms.reference(m2), null));
-      assertNotNull(queue.handle(null, ms.reference(m3), null));
-      assertNull(queue.handle(null, ms.reference(m4), null));
-      assertNull(queue.handle(null, ms.reference(m5), null));
-      
-   }
-   
-   public void testWithoutFilter()
-   {
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(),
-                                                   null);     
-      
-      Message m1 = new CoreMessage(1, false, 0, 0, (byte)0, null, null, 0);
-      Message m2 = new CoreMessage(2, false, 0, 0, (byte)0, null, null, 0);
-      Message m3 = new CoreMessage(3, false, 0, 0, (byte)0, null, null, 0);
-      Message m4 = new CoreMessage(4, false, 0, 0, (byte)0, null, null, 0);
-      Message m5 = new CoreMessage(5, false, 0, 0, (byte)0, null, null, 0);
-      
-      assertNotNull(queue.handle(null, ms.reference(m1), null));
-      assertNotNull(queue.handle(null, ms.reference(m2), null));
-      assertNotNull(queue.handle(null, ms.reference(m3), null));
-      assertNotNull(queue.handle(null, ms.reference(m4), null));
-      assertNotNull(queue.handle(null, ms.reference(m5), null));
-      
-   }
-   
-   public void crashChannel() throws Exception
-   {
-   }
-
-   public void recoverChannel() throws Exception
-   {
-   }
-   
-   class SimpleFilter implements Filter
-   {
-      private long value;
-      SimpleFilter(long value)
-      {
-         this.value = value;
-      }
-      public boolean accept(Routable routable)
-      {
-         MessageReference ref = (MessageReference)routable;
-         return ref.getMessageID() == value;
-      }
-      public String getFilterString()
-      {
-         return null;
-      }
-   }
-}

Modified: trunk/tests/src/org/jboss/test/messaging/core/local/RecoverableQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/RecoverableQueueTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/RecoverableQueueTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -55,8 +55,7 @@
    {
       super.setUp();
       
-      queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(),
-                                      null);
+      queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
    }
 
    public void tearDown() throws Exception
@@ -72,8 +71,7 @@
 
    public void recoverChannel() throws Exception
    {
-      queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(),
-                                      null);
+      queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
    }
 
    // Public --------------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_2PCTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -64,9 +64,9 @@
    
    public void testChannelShareNP_2PC() throws Throwable
    {
-      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
       
-      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
                       
       Message[] msgs = new Message[150];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -63,9 +63,9 @@
    
    public void test1() throws Throwable
    {
-      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
       
-      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
                   
       Message[] msgs = new Message[150];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_TTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -64,9 +64,9 @@
    
    public void testChannelShareNP_Transactional() throws Throwable
    {
-      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
       
-      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
                                
       Message[] msgs = new Message[150];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_2PCTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -59,9 +59,9 @@
    
    public void test1() throws Throwable
    {
-      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
       
-      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
                             
       Message[] msgs = new Message[150];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_NTTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -63,9 +63,9 @@
    
    public void test1() throws Throwable
    {
-      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
       
-      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
                                 
       Message[] msgs = new Message[150];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_P_TTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -63,9 +63,9 @@
    
    public void test1() throws Throwable
    {
-      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue1 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
       
-      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor(), null);
+      Queue queue2 = new Queue(2, ms, pm, true, true, 50, 10, 5, new QueuedExecutor());
                             
       Message[] msgs = new Message[150];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/PagingTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -63,7 +63,7 @@
 
    public void testPaging() throws Exception
    {
-      Queue p = new Queue(0, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue p = new Queue(0, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
 
       CoreMessage m = null;
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_2PCTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -66,7 +66,7 @@
    
    public void test1() throws Throwable
    {
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
                
       Message[] msgs = new Message[241];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_NTTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -65,7 +65,7 @@
    
    public void test1() throws Throwable
    {
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
                            
       Message[] msgs = new Message[241];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_NP_TTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -66,7 +66,7 @@
  
    public void test1() throws Throwable
    {
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
                     
       Message[] msgs = new Message[241];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_2PCTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -66,7 +66,7 @@
    
    public void test1() throws Throwable
    {
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
                     
       Message[] msgs = new Message[241];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_NTTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -65,7 +65,7 @@
    
    public void test1() throws Throwable
    {
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
                         
       Message[] msgs = new Message[241];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_P_TTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -66,7 +66,7 @@
    
    public void test1() throws Throwable
    {
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
                   
       Message[] msgs = new Message[241];
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -66,7 +66,7 @@
    
    public void testRecoverableQueueCrash() throws Throwable
    {
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
       
       Message[] msgs = new Message[200];
       
@@ -126,7 +126,7 @@
       ms = new SimpleMessageStore();
       ms.start();
        
-      Queue queue2 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue2 = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
       
       queue2.load();
       
@@ -155,7 +155,7 @@
    {
       //Non recoverable queue - eg temporary queue
       
-      Queue queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor());
 
       Message[] msgs = new Message[200];
       
@@ -215,7 +215,7 @@
       ms = new SimpleMessageStore();
       ms.start();
       
-      Queue queue2 = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue2 = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor());
       
       queue2.load();
       
@@ -242,7 +242,7 @@
    {
       //Non recoverable queue - eg temporary queue
       
-      Queue queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, false, 100, 20, 10, new QueuedExecutor());
       
       Message[] msgs = new Message[200];
       
@@ -306,7 +306,7 @@
    
    public void testQueueReloadWithSmallerFullSize() throws Throwable
    {
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
 
       Message[] msgs = new Message[150];
       
@@ -360,7 +360,7 @@
       
       //Reload the queue with a smaller fullSize
       
-      Queue queue2 = new Queue(1, ms, pm, true, false, 50, 20, 10, new QueuedExecutor(), null);
+      Queue queue2 = new Queue(1, ms, pm, true, false, 50, 20, 10, new QueuedExecutor());
       
       queue2.load();
       
@@ -404,7 +404,7 @@
    
    public void testReloadWithLargerFullSize() throws Throwable
    {
-      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor(), null);
+      Queue queue = new Queue(1, ms, pm, true, true, 100, 20, 10, new QueuedExecutor());
 
       Message[] msgs = new Message[150];
       
@@ -458,7 +458,7 @@
       
       //Reload the queue with a smaller fullSize
       
-      Queue queue2 = new Queue(1, ms, pm, true, false, 130, 20, 10, new QueuedExecutor(), null);
+      Queue queue2 = new Queue(1, ms, pm, true, false, 130, 20, 10, new QueuedExecutor());
       
       queue2.load();
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredPostOfficeTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -28,6 +28,7 @@
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.BasicRedistributionPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredBinding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredPostOfficeImpl;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.FavourLocalRoutingPolicy;
@@ -93,12 +94,12 @@
          
          //Add a couple of bindings
          
-         ClusteredQueue queue1 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         ClusteredQueue queue1 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding1 =
-            office1.bindClusteredQueue("sub1", "topic1", queue1);
-         ClusteredQueue queue2 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+            office1.bindClusteredQueue("sub1", "topic1", null, queue1);
+         ClusteredQueue queue2 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding2 =
-            office1.bindClusteredQueue("sub2", "topic1", queue2);
+            office1.bindClusteredQueue("sub2", "topic1", null, queue2);
          
          //Start another office - make sure it picks up the bindings from the first node
          
@@ -113,9 +114,9 @@
          
          //Add another binding on node 2
          
-         ClusteredQueue queue3 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         ClusteredQueue queue3 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding3 =
-            office2.bindClusteredQueue("sub3", "topic1", queue3);
+            office2.bindClusteredQueue("sub3", "topic1", null, queue3);
   
          //Make sure both nodes pick it up
          
@@ -137,9 +138,9 @@
 
          //Add another binding on node 1
          
-         ClusteredQueue queue4 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         ClusteredQueue queue4 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding4 =
-            office2.bindClusteredQueue("sub4", "topic1", queue4);
+            office2.bindClusteredQueue("sub4", "topic1", null, queue4);
          
          // Make sure both nodes pick it up
          
@@ -196,9 +197,9 @@
          
          //Add another binding on node 3
                   
-         ClusteredQueue queue5 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         ClusteredQueue queue5 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding5 =
-            office3.bindClusteredQueue("sub5", "topic1", queue5);
+            office3.bindClusteredQueue("sub5", "topic1", null, queue5);
          
          // Make sure all nodes pick it up
          
@@ -228,13 +229,13 @@
          
          //Add a durable and a non durable binding on node 1
          
-         ClusteredQueue queue6 = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         ClusteredQueue queue6 = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding6 =
-            office1.bindClusteredQueue("sub6", "topic1", queue6);
+            office1.bindClusteredQueue("sub6", "topic1", null, queue6);
          
-         ClusteredQueue queue7 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         ClusteredQueue queue7 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding7 =
-            office1.bindClusteredQueue("sub7", "topic1", queue7);
+            office1.bindClusteredQueue("sub7", "topic1", null, queue7);
          
          // Make sure all nodes pick them up
          
@@ -406,6 +407,53 @@
       clusteredTransactionalRoute(false);
    }
    
+   /**
+    * Binds the queue name "queue1" on two clustered nodes, routes messages through node1 and
+    * checks that all of them sit in node1's queue while node2's own queue holds none.
+    */
+   public final void testRedistribute() throws Exception
+   {
+      ClusteredPostOffice office1 = null;
+      ClusteredPostOffice office2 = null;
+          
+      try
+      {   
+         office1 = createClusteredPostOffice("node1", "testgroup");
+         office2 = createClusteredPostOffice("node2", "testgroup");
+         
+         ClusteredQueue queue1 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         ClusteredBinding binding1 = office1.bindClusteredQueue("queue1", "queue1", null, queue1);
+         // node2 must bind its own queue instance; binding queue1 again left queue2 unbound
+         ClusteredQueue queue2 = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         ClusteredBinding binding2 = office2.bindClusteredQueue("queue1", "queue1", null, queue2);
+         
+         final int NUM_MESSAGES = 10;
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            Message msg = CoreMessageFactory.createCoreMessage(i, false, null);      
+            MessageReference ref = ms.reference(msg);         
+            boolean routed = office1.route(ref, "queue1", null);
+         }
+         // Right after routing via node1: everything is in queue1, nothing in queue2
+         List msgs = queue1.browse();
+         assertEquals(NUM_MESSAGES, msgs.size());
+         msgs = queue2.browse();
+         assertEquals(0, msgs.size());
+      }
+      finally
+      {
+         if (office1 != null)
+         {            
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {
+            office2.stop();
+         }
+      }
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -427,53 +475,53 @@
          ClusteredQueue[] queues = new ClusteredQueue[16];
          Binding[] bindings = new Binding[16];
          
-         queues[0] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[0] = office1.bindClusteredQueue("sub1", "topic1", queues[0]);
+         queues[0] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[0] = office1.bindClusteredQueue("sub1", "topic1", null, queues[0]);
          
-         queues[1] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[1] = office1.bindClusteredQueue("sub2", "topic1", queues[1]);
+         queues[1] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[1] = office1.bindClusteredQueue("sub2", "topic1", null, queues[1]);
          
-         queues[2] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[2] = office2.bindClusteredQueue("sub3", "topic1", queues[2]);
+         queues[2] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[2] = office2.bindClusteredQueue("sub3", "topic1", null, queues[2]);
          
-         queues[3] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[3] = office2.bindClusteredQueue("sub4", "topic1", queues[3]);
+         queues[3] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[3] = office2.bindClusteredQueue("sub4", "topic1", null, queues[3]);
          
-         queues[4] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[4] = office2.bindClusteredQueue("sub5", "topic1", queues[4]);
+         queues[4] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[4] = office2.bindClusteredQueue("sub5", "topic1", null, queues[4]);
          
-         queues[5] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[5] = office1.bindClusteredQueue("sub6", "topic1", queues[5]);
+         queues[5] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[5] = office1.bindClusteredQueue("sub6", "topic1", null, queues[5]);
          
-         queues[6] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[6] = office1.bindClusteredQueue("sub7", "topic1", queues[6]);
+         queues[6] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[6] = office1.bindClusteredQueue("sub7", "topic1", null, queues[6]);
          
-         queues[7] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[7] = office1.bindClusteredQueue("sub8", "topic1", queues[7]);
+         queues[7] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[7] = office1.bindClusteredQueue("sub8", "topic1", null, queues[7]);
          
-         queues[8] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[8] = office1.bindClusteredQueue("sub9", "topic2", queues[8]);
+         queues[8] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[8] = office1.bindClusteredQueue("sub9", "topic2", null, queues[8]);
          
-         queues[9] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[9] = office1.bindClusteredQueue("sub10", "topic2", queues[9]);
+         queues[9] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[9] = office1.bindClusteredQueue("sub10", "topic2", null, queues[9]);
          
-         queues[10] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[10] = office2.bindClusteredQueue("sub11", "topic2", queues[10]);
+         queues[10] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[10] = office2.bindClusteredQueue("sub11", "topic2", null, queues[10]);
          
-         queues[11] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[11] = office2.bindClusteredQueue("sub12", "topic2", queues[11]);
+         queues[11] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[11] = office2.bindClusteredQueue("sub12", "topic2", null, queues[11]);
          
-         queues[12] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[12] = office2.bindClusteredQueue("sub13", "topic2", queues[12]);
+         queues[12] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[12] = office2.bindClusteredQueue("sub13", "topic2", null, queues[12]);
          
-         queues[13] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[13] = office1.bindClusteredQueue("sub14", "topic2", queues[13]);
+         queues[13] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[13] = office1.bindClusteredQueue("sub14", "topic2", null, queues[13]);
          
-         queues[14] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[14] = office1.bindClusteredQueue("sub15", "topic2", queues[14]);
+         queues[14] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[14] = office1.bindClusteredQueue("sub15", "topic2", null, queues[14]);
          
-         queues[15] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[15] = office1.bindClusteredQueue("sub16", "topic2", queues[15]);
+         queues[15] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[15] = office1.bindClusteredQueue("sub16", "topic2", null, queues[15]);
        
          SimpleReceiver[] receivers = new SimpleReceiver[16];
          
@@ -589,53 +637,53 @@
          ClusteredQueue[] queues = new ClusteredQueue[16];
          Binding[] bindings = new Binding[16];
          
-         queues[0] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[0] = office1.bindClusteredQueue("sub1", "topic1", queues[0]);
+         queues[0] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[0] = office1.bindClusteredQueue("sub1", "topic1", null, queues[0]);
          
-         queues[1] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[1] = office1.bindClusteredQueue("sub2", "topic1", queues[1]);
+         queues[1] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[1] = office1.bindClusteredQueue("sub2", "topic1", null, queues[1]);
          
-         queues[2] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[2] = office2.bindClusteredQueue("sub3", "topic1", queues[2]);
+         queues[2] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[2] = office2.bindClusteredQueue("sub3", "topic1", null, queues[2]);
          
-         queues[3] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[3] = office2.bindClusteredQueue("sub4", "topic1", queues[3]);
+         queues[3] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[3] = office2.bindClusteredQueue("sub4", "topic1", null, queues[3]);
          
-         queues[4] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[4] = office2.bindClusteredQueue("sub5", "topic1", queues[4]);
+         queues[4] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[4] = office2.bindClusteredQueue("sub5", "topic1", null, queues[4]);
          
-         queues[5] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[5] = office1.bindClusteredQueue("sub6", "topic1", queues[5]);
+         queues[5] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[5] = office1.bindClusteredQueue("sub6", "topic1", null, queues[5]);
          
-         queues[6] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[6] = office1.bindClusteredQueue("sub7", "topic1", queues[6]);
+         queues[6] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[6] = office1.bindClusteredQueue("sub7", "topic1", null, queues[6]);
          
-         queues[7] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[7] = office1.bindClusteredQueue("sub8", "topic1", queues[7]);
+         queues[7] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[7] = office1.bindClusteredQueue("sub8", "topic1", null, queues[7]);
          
-         queues[8] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[8] = office1.bindClusteredQueue("sub9", "topic2", queues[8]);
+         queues[8] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[8] = office1.bindClusteredQueue("sub9", "topic2", null, queues[8]);
          
-         queues[9] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[9] = office1.bindClusteredQueue("sub10", "topic2", queues[9]);
+         queues[9] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[9] = office1.bindClusteredQueue("sub10", "topic2", null, queues[9]);
          
-         queues[10] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[10] = office2.bindClusteredQueue("sub11", "topic2", queues[10]);
+         queues[10] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[10] = office2.bindClusteredQueue("sub11", "topic2", null, queues[10]);
          
-         queues[11] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[11] = office2.bindClusteredQueue("sub12", "topic2", queues[11]);
+         queues[11] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[11] = office2.bindClusteredQueue("sub12", "topic2", null, queues[11]);
          
-         queues[12] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[12] = office2.bindClusteredQueue("sub13", "topic2", queues[12]);
+         queues[12] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[12] = office2.bindClusteredQueue("sub13", "topic2", null, queues[12]);
          
-         queues[13] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[13] = office1.bindClusteredQueue("sub14", "topic2", queues[13]);
+         queues[13] = new ClusteredQueue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[13] = office1.bindClusteredQueue("sub14", "topic2", null, queues[13]);
          
-         queues[14] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[14] = office1.bindClusteredQueue("sub15", "topic2", queues[14]);
+         queues[14] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[14] = office1.bindClusteredQueue("sub15", "topic2", null, queues[14]);
          
-         queues[15] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
-         bindings[15] = office1.bindClusteredQueue("sub16", "topic2", queues[15]);
+         queues[15] = new ClusteredQueue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
+         bindings[15] = office1.bindClusteredQueue("sub16", "topic2", null, queues[15]);
 
          SimpleReceiver[] receivers = new SimpleReceiver[16];
          
@@ -1183,7 +1231,7 @@
                                  null, true, nodeId, "Clustered", ms, groupName,
                                  JGroupsUtil.getControlStackProperties(50, 1),
                                  JGroupsUtil.getDataStackProperties(50, 1),
-                                 tr, pm, 5000, 5000, routingPolicy, redistPolicy, 5000);
+                                 tr, pm, 5000, 5000, routingPolicy, redistPolicy, 1000);
       
       postOffice.start();      
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/SimplePostOfficeTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -146,15 +146,15 @@
          Filter filter1 = new Selector("x = 'cheese'");
          Filter filter2 = new Selector("y = 'bread'");
          
-         Queue queue1 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+         Queue queue1 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());
          
          Binding binding1 =
-            office1.bindQueue("durableQueue", "condition1", queue1);
+            office1.bindQueue("durableQueue", "condition1", null, queue1);
          
          //Binding twice with the same name should fail      
          try
          {
-            Binding bindFail = office1.bindQueue("durableQueue", "condition1", queue1);
+            Binding bindFail = office1.bindQueue("durableQueue", "condition1", null, queue1);
             fail();
          }
          catch (IllegalArgumentException e)
@@ -163,9 +163,9 @@
          }
                
          //Bind one non durable
-         Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);
+         Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());
          Binding binding2 =
-            office1.bindQueue("nonDurableQueue", "condition2", queue2);
+            office1.bindQueue("nonDurableQueue", "condition2", null, queue2);
          
          //Check they're there
          
@@ -247,37 +247,37 @@
       {      
          office = createPostOffice();
          
-         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding1 =
-            office.bindQueue("queue1", "condition1", queue1);
+            office.bindQueue("queue1", "condition1", null, queue1);
          
-         Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding2 =
-            office.bindQueue("queue2", "condition1", queue2);
+            office.bindQueue("queue2", "condition1", null, queue2);
          
-         Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding3 =
-            office.bindQueue("queue3", "condition1", queue3);
+            office.bindQueue("queue3", "condition1", null, queue3);
          
-         Queue queue4 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue4 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding4 =
-            office.bindQueue("queue4", "condition1", queue4);
+            office.bindQueue("queue4", "condition1", null, queue4);
          
-         Queue queue5 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue5 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding5 =
-            office.bindQueue("queue5", "condition2", queue5);
+            office.bindQueue("queue5", "condition2", null, queue5);
          
-         Queue queue6 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue6 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding6 =
-            office.bindQueue("queue6", "condition2", queue6);
+            office.bindQueue("queue6", "condition2", null, queue6);
          
-         Queue queue7 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue7 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding7 =
-            office.bindQueue("queue7", "condition2", queue7);
+            office.bindQueue("queue7", "condition2", null, queue7);
          
-         Queue queue8 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue8 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding8 =
-            office.bindQueue("queue8", "condition2", queue8);
+            office.bindQueue("queue8", "condition2", null, queue8);
          
          
          List bindings = office.listBindingsForCondition("dummy");
@@ -353,29 +353,29 @@
       
          postOffice = createPostOffice();
          
-         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding1 =
-            postOffice.bindQueue("queue1", "topic1", queue1);
+            postOffice.bindQueue("queue1", "topic1", null, queue1);
          
-         Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding2 =
-            postOffice.bindQueue("queue2", "topic1", queue2);
+            postOffice.bindQueue("queue2", "topic1", null, queue2);
          
-         Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding3 =
-            postOffice.bindQueue("queue3", "topic1", queue3);
+            postOffice.bindQueue("queue3", "topic1", null, queue3);
          
-         Queue queue4 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue4 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding4 =
-            postOffice.bindQueue("queue4", "topic2", queue4);
+            postOffice.bindQueue("queue4", "topic2", null, queue4);
          
-         Queue queue5 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue5 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding5 =
-            postOffice.bindQueue("queue5", "topic2", queue5);
+            postOffice.bindQueue("queue5", "topic2", null, queue5);
          
-         Queue queue6 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue6 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding6 =
-            postOffice.bindQueue("queue6", "topic2", queue6);
+            postOffice.bindQueue("queue6", "topic2", null, queue6);
       
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
@@ -494,9 +494,9 @@
       {      
          postOffice = createPostOffice();
          
-         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding1 =
-            postOffice.bindQueue("queue1", "condition1", queue1);
+            postOffice.bindQueue("queue1", "condition1", null, queue1);
               
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
          queue1.add(receiver1);
@@ -538,29 +538,29 @@
       {      
          postOffice = createPostOffice();
       
-         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding1 =
-            postOffice.bindQueue("queue1", "topic1", queue1);
+            postOffice.bindQueue("queue1", "topic1", null, queue1);
          
-         Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue2 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding2 =
-            postOffice.bindQueue("queue2", "topic1", queue2);
+            postOffice.bindQueue("queue2", "topic1", null, queue2);
          
-         Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue3 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding3 =
-            postOffice.bindQueue("queue3", "topic1", queue3);
+            postOffice.bindQueue("queue3", "topic1", null, queue3);
          
-         Queue queue4 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue4 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding4 =
-            postOffice.bindQueue("queue4", "topic2", queue4);
+            postOffice.bindQueue("queue4", "topic2", null, queue4);
          
-         Queue queue5 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue5 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding5 =
-            postOffice.bindQueue("queue5", "topic2", queue5);
+            postOffice.bindQueue("queue5", "topic2", null, queue5);
          
-         Queue queue6 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue6 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding6 =
-            postOffice.bindQueue("queue6", "topic2", queue6);
+            postOffice.bindQueue("queue6", "topic2", null, queue6);
       
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
          queue1.add(receiver1);
@@ -701,13 +701,13 @@
       {      
          postOffice = createPostOffice();
       
-         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue1 = new Queue(im.getId(), ms, pm, true, false, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding1 =
-            postOffice.bindQueue("queue1", "topic1", queue1);
+            postOffice.bindQueue("queue1", "topic1", null, queue1);
          
-         Queue queue2 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get(), null);         
+         Queue queue2 = new Queue(im.getId(), ms, pm, true, true, 2000, 100, 100, (QueuedExecutor)pool.get());         
          Binding binding2 =
-            postOffice.bindQueue("queue2", "topic1", queue2);
+            postOffice.bindQueue("queue2", "topic1", null, queue2);
           
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);;
          queue1.add(receiver1);
@@ -962,8 +962,10 @@
    protected void assertEquivalent(Binding binding1, Binding binding2)
    {
       assertEquals(binding1.getNodeId(), binding2.getNodeId());
-      assertEquals(binding1.getQueueName(), binding2.getQueueName());      
-      assertEquals(binding1.getSelector(), binding2.getSelector());
+      assertEquals(binding1.getQueueName(), binding2.getQueueName()); 
+      String selector1 = binding1.getFilter() != null ? binding1.getFilter().getFilterString() : null; // null when binding1 has no filter
+      String selector2 = binding2.getFilter() != null ? binding2.getFilter().getFilterString() : null; // null when binding2 has no filter
+      assertEquals(selector1, selector2); // filters are compared by their filter strings; two absent filters count as equal
       assertEquals(binding1.getChannelId(), binding2.getChannelId());
       assertEquals(binding1.isDurable(), binding2.isDurable());
    }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -272,32 +272,26 @@
       final int NUM_MESSAGES = 20;
 
       //Send some messages
-      log.info("********* SENDING MSGS");
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
          Message m = producerSess.createMessage();
          producer.send(m);
       }
-      log.info("*********** SENT MSGSSGS");
-      
+
       assertRemainingMessages(0);
       
       producerSess.rollback();
       
       //Send some messages
-      log.info("********* SENDING MOREMSGS");
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
          Message m = producerSess.createMessage();
          producer.send(m);
       }
-      log.info("********* SENT MORE MSGS");
       assertRemainingMessages(0);
       
-      log.info("COMMITTING");
       producerSess.commit();
-      log.info("COMMITTED");
-      
+
       assertRemainingMessages(NUM_MESSAGES);
 
       log.trace("Sent messages");

Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -259,7 +259,7 @@
             "<invoker transport=\"socket\">\n" +              
                "<attribute name=\"marshaller\" isParam=\"true\">org.jboss.jms.server.remoting.JMSWireFormat</attribute>\n" +
                "<attribute name=\"unmarshaller\" isParam=\"true\">org.jboss.jms.server.remoting.JMSWireFormat</attribute>\n" +
-               "<attribute name=\"serializationtype\" isParam=\"true\">jboss</attribute>\n" +
+               "<attribute name=\"serializationtype\" isParam=\"true\">jms</attribute>\n" +
                "<attribute name=\"dataType\" isParam=\"true\">jms</attribute>\n" +
                "<attribute name=\"serverBindPort\">" + port +"</attribute>\n" +
                "<attribute name=\"socket.check_connection\" isParam=\"true\">false</attribute>\n" +

Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2006-09-15 15:52:42 UTC (rev 1294)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2006-09-15 17:44:02 UTC (rev 1295)
@@ -79,6 +79,7 @@
 import org.jboss.tm.usertx.client.ServerVMClientUserTransaction;
 import org.jboss.remoting.InvokerLocator;
 import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.serialization.SerializationStreamFactory;
 
 
 /**
@@ -972,10 +973,13 @@
 
    private void startRemoting(boolean multiplex) throws Exception
    {
+      SerializationStreamFactory.setManagerClassName(
+               "jms", "org.jboss.jms.server.remoting.MessagingSerializationManager");
+              
       RemotingJMXWrapper mbean;
 
       String serializationType = config.getSerializationType();
-
+      
       String params = "/?marshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
                       "unmarshaller=org.jboss.jms.server.remoting.JMSWireFormat&" +
                       "serializationtype=" + serializationType + "&" +




More information about the jboss-cvs-commits mailing list