[jboss-cvs] JBoss Messaging SVN: r1430 - in trunk: src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Oct 4 12:07:44 EDT 2006


Author: timfox
Date: 2006-10-04 12:07:34 -0400 (Wed, 04 Oct 2006)
New Revision: 1430

Added:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NodeAddressInfo.java
Modified:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendNodeIdRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SharedState.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-571


Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-10-04 11:59:01 UTC (rev 1429)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-10-04 16:07:34 UTC (rev 1430)
@@ -204,9 +204,7 @@
    public Binding unbindQueue(String queueName) throws Throwable
    {
       if (trace) { log.trace(this + " unbinding queue " + queueName); }
-            
-      log.info("unbinding queue: " + queueName);
-      
+             
       if (queueName == null)
       {
          throw new IllegalArgumentException("Queue name is null");
@@ -223,8 +221,6 @@
             //Need to remove from db too
             
             deleteBinding(binding.getQueue().getName());    
-            
-            log.info("deleting binding from db");
          }
          
          binding.getQueue().removeAllReferences();         
@@ -534,8 +530,6 @@
 
          int rows = ps.executeUpdate();
          
-         log.info("deleted " + rows + " rows");
-         
          return rows == 1;
       }
       finally

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java	2006-10-04 11:59:01 UTC (rev 1429)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java	2006-10-04 16:07:34 UTC (rev 1430)
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.tx.TxCallback;
 
@@ -60,6 +61,10 @@
  */
 class CastMessagesCallback implements TxCallback
 {           
+   private static final Logger log = Logger.getLogger(CastMessagesCallback.class);
+   
+   private boolean trace = log.isTraceEnabled();   
+      
    private List persistent;
    
    private List nonPersistent;
@@ -186,13 +191,13 @@
    {
       if (multicast)
       {
+         if (trace) { log.trace("Multicasting transaction across group"); }
          office.asyncSendRequest(req);
       }
       else
       {
-         //FIXME temp commented out until unicast works
-         //office.asyncSendRequest(req, toNodeId);
-         office.asyncSendRequest(req);
+         if (trace) { log.trace("Unicasting transaction to node"); }
+         office.asyncSendRequest(req, toNodeId);
       }
    }
       

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-04 11:59:01 UTC (rev 1429)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-04 16:07:34 UTC (rev 1430)
@@ -105,8 +105,8 @@
    
    private View currentView;
    
-   //Map < Address, node id>
-   private Map nodeIdAddressMap;
+   //Map < Address, NodeAddressInfo>
+   private Map nodeIdAddressesMap;
    
    private Map holdingArea;
    
@@ -141,7 +141,7 @@
    
    private void init()
    {
-      this.nodeIdAddressMap = new HashMap();
+      this.nodeIdAddressesMap = new HashMap();
       
       this.holdingArea = new HashMap();
    }
@@ -277,17 +277,22 @@
       
       super.start();
                   
-      Address currentAddress = syncChannel.getLocalAddress();
+      Address syncAddress = syncChannel.getLocalAddress();
+      
+      Address asyncAddress = asyncChannel.getLocalAddress();
                      
-      handleAddressNodeMapping(currentAddress, nodeId);
+      NodeAddressInfo info = new NodeAddressInfo(syncAddress, asyncAddress);
       
-      syncSendRequest(new SendNodeIdRequest(currentAddress, nodeId));           
+      handleAddressNodeMapping(info, nodeId);
       
+      syncSendRequest(new SendNodeIdRequest(info, nodeId));           
+      
       statsSender.start();
       
       started = true;   
       
-      if (trace) { log.trace("Started " + this + " with address " + currentAddress); }
+      if (trace) { log.trace("Started " + this + " with sync address " + syncAddress +
+                             " async address " + asyncAddress); }
    }
 
    public synchronized void stop() throws Exception
@@ -465,16 +470,16 @@
                {
                   if (numberRemote == 1)
                   {
-                  //   log.info("unicast no tx");
+                     if (trace) { log.trace(this.nodeId + " unicasting message to " + lastNodeId); }
                      //Unicast - only one node is interested in the message
+                                        
+                     asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
                      
-                     //TODO - temporarily commented out until can get unicast to work                     
-                     //asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
-                     asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
+                     //syncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId, false);
                   }
                   else
                   {
-                  //   log.info("multicast no tx");
+                     if (trace) { log.trace(this.nodeId + " multicasting message to group"); }
                      //Multicast - more than one node is interested
                      asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
                   }                                 
@@ -545,7 +550,7 @@
       {                     
          //Sanity
 
-         if (!nodeIdAddressMap.containsKey(new Integer(nodeId)))
+         if (!nodeIdAddressesMap.containsKey(new Integer(nodeId)))
          {
             throw new IllegalStateException("Cannot find address for node: " + nodeId);
          }
@@ -590,7 +595,7 @@
       try
       {         
          // Sanity
-         if (!nodeIdAddressMap.containsKey(new Integer(nodeId)))
+         if (!nodeIdAddressesMap.containsKey(new Integer(nodeId)))
          {
             throw new IllegalStateException("Cannot find address for node: " + nodeId);
          }
@@ -603,18 +608,19 @@
       }
    }
    
-   public void handleAddressNodeMapping(Address address, int nodeId) throws Exception
+   public void handleAddressNodeMapping(NodeAddressInfo info, int nodeId) throws Exception
    {
       lock.writeLock().acquire();
       
       if (trace)
       {
-         log.trace(this.nodeId + " Adding address node mapping for " + address + " and " + nodeId);
+         log.trace(this.nodeId + " Adding address node mapping for " + info.getSyncChannelAddress() +
+                   "," + info.getAsyncChannelAddress() + " and " + nodeId);
       }
       
       try
       { 
-         nodeIdAddressMap.put(new Integer(nodeId), address);
+         nodeIdAddressesMap.put(new Integer(nodeId), info);
       }
       finally
       {
@@ -715,13 +721,13 @@
    }
    
    /*
-    * Unicast a message to one members of the group
+    * Unicast a message to one member of the group
     */
    public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
    {               
       if (trace) { log.trace(this.nodeId + " sending asynch request to single node, request: " + request + " node " + nodeId); }
       
-      Address address = this.getAddressForNodeId(nodeId);
+      Address address = this.getAddressForNodeId(nodeId, false);
       
       if (trace) { log.trace(this.nodeId + " sending to address " + address); }
       
@@ -734,7 +740,7 @@
             
       Message m = new Message(address, null, bytes);
       
-      asyncChannel.send(m);
+      asyncChannel.send(m);      
    }
    
    /*
@@ -744,7 +750,7 @@
    {              
       if (trace) { log.trace(this.nodeId + " sending synch request to single node, request: " + request + " node " + nodeId); }
             
-      Address address = this.getAddressForNodeId(nodeId);
+      Address address = this.getAddressForNodeId(nodeId, true);
       
       if (trace) { log.trace(this.nodeId + " sending to address " + address); }      
       
@@ -1022,19 +1028,7 @@
       return dels;
    }
    
-   public Address getAddressForNodeId(int nodeId) throws Exception
-   {
-      lock.readLock().acquire();
-      
-      try
-      {
-         return (Address)nodeIdAddressMap.get(new Integer(nodeId));
-      }
-      finally
-      {
-         lock.readLock().release();      
-      }
-   }
+   
                    
    // Public ------------------------------------------------------------------------------------------
       
@@ -1190,21 +1184,21 @@
    }
    
 
-   private Integer getNodeIdForAddress(Address address) throws Exception
+   private Integer getNodeIdForSyncAddress(Address address) throws Exception
    {
       lock.readLock().acquire();
       try
       { 
-         Iterator iter = nodeIdAddressMap.entrySet().iterator();
+         Iterator iter = nodeIdAddressesMap.entrySet().iterator();
          
          Integer nodeId = null;
          while (iter.hasNext())
          {
             Map.Entry entry = (Map.Entry)iter.next();
             
-            Address adr = (Address)entry.getValue();
+            NodeAddressInfo info = (NodeAddressInfo)entry.getValue();
             
-            if (adr.equals(address))
+            if (info.getSyncChannelAddress().equals(address))
             {
                nodeId = (Integer)entry.getKey();
             }
@@ -1254,7 +1248,7 @@
          }
          
          //Remove the address mapping
-         nodeIdAddressMap.remove(nodeId);
+         nodeIdAddressesMap.remove(nodeId);
       }
       finally
       {
@@ -1289,7 +1283,7 @@
          }
       }
       
-      SharedState state = new SharedState(bindings, nodeIdAddressMap);
+      SharedState state = new SharedState(bindings, nodeIdAddressesMap);
       
       byte[] bytes = StreamUtils.toBytes(state); 
            
@@ -1332,9 +1326,9 @@
          addBinding(binding);         
       }
       
-      this.nodeIdAddressMap.clear();
+      this.nodeIdAddressesMap.clear();
       
-      this.nodeIdAddressMap.putAll(state.getNodeIdAddressMap());
+      this.nodeIdAddressesMap.putAll(state.getNodeIdAddressMap());
    }
    
    
@@ -1365,6 +1359,36 @@
       return request;            
    }
    
+   private Address getAddressForNodeId(int nodeId, boolean sync) throws Exception
+   {
+      lock.readLock().acquire();
+      
+      try
+      {
+         NodeAddressInfo info = (NodeAddressInfo)nodeIdAddressesMap.get(new Integer(nodeId));
+         
+         if (info != null)
+         {
+            if (sync)
+            {
+               return info.getSyncChannelAddress();
+            }
+            else
+            {
+               return info.getAsyncChannelAddress();
+            }
+         }
+         else
+         {
+            return null;
+         }
+      }
+      finally
+      {
+         lock.readLock().release();      
+      }
+   }
+   
    // Inner classes -------------------------------------------------------------------
     
    /*
@@ -1401,7 +1425,6 @@
       
       public void receive(Message message)
       {         
-         //log.info("Received message on control channel: " + message);
       }
       
       public void setState(byte[] bytes)
@@ -1482,7 +1505,7 @@
                   {                  
                      try
                      {
-                        Integer nodeId = getNodeIdForAddress(address);
+                        Integer nodeId = getNodeIdForSyncAddress(address);
                         
                         if (nodeId == null)
                         {
@@ -1523,7 +1546,7 @@
    
    
    /*
-    * This class is used to listen for messages on the data channel
+    * This class is used to listen for messages on the async channel
     */
    private class DataReceiver implements Receiver
    {
@@ -1550,6 +1573,8 @@
       
       public void receive(Message message)
       {
+         if (trace) { log.trace(nodeId + " received message " + message + " on async channel"); }
+         
          try
          {
             byte[] bytes = message.getBuffer();
@@ -1580,6 +1605,7 @@
    {
       public Object handle(Message message)
       {                
+         if (trace) { log.trace(nodeId + " received message " + message + " on sync channel"); }
          try
          {   
             byte[] bytes = message.getBuffer();

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NodeAddressInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NodeAddressInfo.java	2006-10-04 11:59:01 UTC (rev 1429)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/NodeAddressInfo.java	2006-10-04 16:07:34 UTC (rev 1430)
@@ -0,0 +1,95 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.jboss.messaging.util.Streamable;
+import org.jgroups.Address;
+import org.jgroups.stack.IpAddress;
+
+/**
+ * A NodeAddressInfo
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+class NodeAddressInfo implements Streamable
+{
+   private Address syncChannelAddress;
+   
+   private Address asyncChannelAddress;
+   
+   public NodeAddressInfo()
+   {     
+   }
+   
+   NodeAddressInfo(Address syncChannelAddress, Address asyncChannelAddress)
+   {
+      this.syncChannelAddress = syncChannelAddress;
+      
+      this.asyncChannelAddress = asyncChannelAddress;
+   }
+   
+   Address getSyncChannelAddress()
+   {
+      return syncChannelAddress;
+   }
+   
+   Address getAsyncChannelAddress()
+   {
+      return asyncChannelAddress;
+   }
+   
+   public void read(DataInputStream in) throws Exception
+   {
+      syncChannelAddress = new IpAddress();
+      
+      syncChannelAddress.readFrom(in);
+      
+      asyncChannelAddress = new IpAddress();
+      
+      asyncChannelAddress.readFrom(in);
+   }
+
+   public void write(DataOutputStream out) throws Exception
+   {
+      if (!(syncChannelAddress instanceof IpAddress))
+      {
+         throw new IllegalStateException("Address must be IpAddress");
+      }
+      
+      if (!(asyncChannelAddress instanceof IpAddress))
+      {
+         throw new IllegalStateException("Address must be IpAddress");
+      }
+      
+      syncChannelAddress.writeTo(out);
+      
+      asyncChannelAddress.writeTo(out);  
+   }
+
+}

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-10-04 11:59:01 UTC (rev 1429)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-10-04 16:07:34 UTC (rev 1430)
@@ -26,7 +26,6 @@
 
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jgroups.Address;
 
 /**
  * 
@@ -50,13 +49,11 @@
    void removeBindingFromCluster(int nodeId, String queueName)
       throws Exception;
    
-   void handleAddressNodeMapping(Address address, int nodeId)
+   void handleAddressNodeMapping(NodeAddressInfo info, int nodeId)
       throws Exception;
    
    void routeFromCluster(Message message, String routingKey, Map queueNameNodeIdMap) throws Exception;
    
-   //void addToQueue(String queueName, List messages) throws Exception;
-   
    void asyncSendRequest(ClusterRequest request) throws Exception;
    
    void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception;

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-10-04 11:59:01 UTC (rev 1429)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-10-04 16:07:34 UTC (rev 1430)
@@ -213,5 +213,4 @@
       }
    }
 
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendNodeIdRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendNodeIdRequest.java	2006-10-04 11:59:01 UTC (rev 1429)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SendNodeIdRequest.java	2006-10-04 16:07:34 UTC (rev 1430)
@@ -24,9 +24,6 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 
-import org.jgroups.Address;
-import org.jgroups.stack.IpAddress;
-
 /**
  * A SendNodeIdRequest
  * 
@@ -41,7 +38,7 @@
 {
    static final int TYPE = 7;
 
-   private Address address;
+   private NodeAddressInfo info;
    
    private int nodeId;
    
@@ -49,16 +46,16 @@
    {      
    }
    
-   SendNodeIdRequest(Address address, int nodeId)
+   SendNodeIdRequest(NodeAddressInfo info, int nodeId)
    {
-      this.address = address;
+      this.info = info;
       
       this.nodeId = nodeId;      
    }
    
    Object execute(PostOfficeInternal office) throws Exception
    {
-      office.handleAddressNodeMapping(address, nodeId);
+      office.handleAddressNodeMapping(info, nodeId);
       
       return null;
    }
@@ -70,22 +67,17 @@
 
    public void read(DataInputStream in) throws Exception
    {
-      address = new IpAddress();
+      info = new NodeAddressInfo();
       
-      address.readFrom(in);
+      info.read(in);
       
       nodeId = in.readInt();
    }
 
    public void write(DataOutputStream out) throws Exception
    {
-      if (!(address instanceof IpAddress))
-      {
-         throw new IllegalStateException("Address must be IpAddress");
-      }
+      info.write(out);
       
-      address.writeTo(out);
-      
       out.writeInt(nodeId);   
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SharedState.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SharedState.java	2006-10-04 11:59:01 UTC (rev 1429)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/SharedState.java	2006-10-04 16:07:34 UTC (rev 1430)
@@ -24,11 +24,11 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.jboss.messaging.util.StreamUtils;
 import org.jboss.messaging.util.Streamable;
 
 /**
@@ -78,7 +78,20 @@
          bindings.add(bb);
       }
       
-      nodeIdAddressMap = (Map)StreamUtils.readObject(in, false);
+      size = in.readInt();
+      
+      nodeIdAddressMap = new HashMap(size);
+      
+      for (int i = 0; i < size; i++)
+      {
+         int nodeId = in.readInt();
+         
+         NodeAddressInfo info = new NodeAddressInfo();
+         
+         info.read(in);
+         
+         nodeIdAddressMap.put(new Integer(nodeId), info);
+      }
    }
 
    public void write(DataOutputStream out) throws Exception
@@ -92,6 +105,21 @@
          info.write(out);
       }
       
-      StreamUtils.writeObject(out, nodeIdAddressMap, true, false);     
+      out.writeInt(nodeIdAddressMap.size());
+      
+      iter = nodeIdAddressMap.entrySet().iterator();
+      
+      while (iter.hasNext())
+      {
+         Map.Entry entry = (Map.Entry)iter.next();
+         
+         Integer nodeId = (Integer)entry.getKey();
+         
+         out.writeInt(nodeId.intValue());
+         
+         NodeAddressInfo info = (NodeAddressInfo)entry.getValue();
+         
+         info.write(out);
+      }
    }
 }

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-10-04 11:59:01 UTC (rev 1429)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-10-04 16:07:34 UTC (rev 1430)
@@ -550,15 +550,14 @@
           
          office1.unbindClusteredQueue("queue1");
          
-         //It should be possible to bind local queues into a clustered post office
-         
-         PagingFilteredQueue queue7 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);       
+         //It should be possible to bind queues locally into a clustered post office
+         LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding7 = office1.bindQueue("queue1", queue7);
          
-         PagingFilteredQueue queue8 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);       
+         LocalClusteredQueue queue8 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);       
          Binding binding8 = office2.bindQueue("queue1", queue8);
          
-         PagingFilteredQueue queue9 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);    
+         LocalClusteredQueue queue9 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);       
          try
          {
             Binding binding9 = office1.bindQueue("queue1", queue9);
@@ -624,9 +623,13 @@
          queue3.add(receiver3);
          
          Message msg1 = CoreMessageFactory.createCoreMessage(1);      
-         MessageReference ref1 = ms.reference(msg1);         
-         boolean routed = office1.route(ref1, "topic1", null);      
+         MessageReference ref1 = ms.reference(msg1);  
+         log.info("Sending message 1");
+         boolean routed = office1.route(ref1, "topic1", null);   
+         log.info("Sent message 1");
          assertTrue(routed);
+         
+         
          Message msg2 = CoreMessageFactory.createCoreMessage(2);      
          MessageReference ref2 = ms.reference(msg2);         
          routed = office1.route(ref2, "topic1", null);      

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java	2006-10-04 11:59:01 UTC (rev 1429)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/JGroupsUtil.java	2006-10-04 16:07:34 UTC (rev 1430)
@@ -46,16 +46,6 @@
       }
 
       return
-//      "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;bind_addr=" + host + ";up_thread=false;down_thread=false):" +
-//      "PING(timeout=2000;num_initial_members=3;up_thread=false;down_thread=false):"+
-//      "FD(timeout=3000;up_thread=false;down_thread=false):"+
-//      "VERIFY_SUSPECT(timeout=1500;up_thread=false;down_thread=false):"+
-//      "pbcast.NAKACK(gc_lag=10;retransmit_timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
-//      "UNICAST(timeout=600,1200,2400,4800;up_thread=false;down_thread=false):"+
-//      "pbcast.STABLE(desired_avg_gossip=10000;up_thread=false;down_thread=false):"+
-//      "FRAG(up_thread=false;down_thread=false):"+
-//      "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=true;print_local_addr=true;up_thread=false;down_thread=false)";
-
       "UDP(mcast_recv_buf_size=500000;down_thread=false;ip_mcast=true;mcast_send_buf_size=32000;"+
           "mcast_port=45566;ucast_recv_buf_size=500000;use_incoming_packet_handler=false;"+
           "mcast_addr=228.8.8.8;use_outgoing_packet_handler=true;ucast_send_buf_size=32000;ip_ttl=32;"+
@@ -71,7 +61,7 @@
       "pbcast.STABLE(stability_delay=1000;desired_avg_gossip=20000;down_thread=false;max_bytes=0;up_thread=false):"+
       "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+
       "VIEW_SYNC(avg_send_interval=60000;down_thread=false;up_thread=false):"+
-      "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;join_retry_timeout=2000;up_thread=false;shun=true)";      
+      "pbcast.GMS(print_local_addr=true;join_timeout=3000;down_thread=false;join_retry_timeout=2000;up_thread=false;shun=true)";
    }
    
 




More information about the jboss-cvs-commits mailing list