[jboss-cvs] JBoss Messaging SVN: r6036 - in trunk: src/main/org/jboss/messaging/core/config and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sat Mar 7 10:30:27 EST 2009


Author: timfox
Date: 2009-03-07 10:30:27 -0500 (Sat, 07 Mar 2009)
New Revision: 6036

Modified:
   trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java
Log:
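Various fixes etc., including:
- DiscoveryGroupImpl / BroadcastGroupImpl: connector broadcasts are now encoded and decoded with MessagingBuffer (new TransportConfiguration.encode/decode) instead of Java object serialization.
- PostOfficeImpl: the message expiry scanner is started from start() only when the server is not a backup.
- ClusterConnectionImpl / MessagingServerPacketHandler: connector updates can be replicated via the new REPLICATE_UPDATE_CONNECTORS packet (ReplicateClusterConnectionUpdate).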

Modified: trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/cluster/impl/DiscoveryGroupImpl.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -22,9 +22,7 @@
 
 package org.jboss.messaging.core.cluster.impl;
 
-import java.io.ByteArrayInputStream;
 import java.io.InterruptedIOException;
-import java.io.ObjectInputStream;
 import java.net.DatagramPacket;
 import java.net.InetAddress;
 import java.net.MulticastSocket;
@@ -34,10 +32,12 @@
 import java.util.List;
 import java.util.Map;
 
+import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.cluster.DiscoveryGroup;
 import org.jboss.messaging.core.cluster.DiscoveryListener;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.Pair;
 
 /**
@@ -223,20 +223,18 @@
                   continue;
                }
             }
+            
+            MessagingBuffer buffer = ChannelBuffers.wrappedBuffer(data);
+            
+            String originatingNodeID = buffer.readString();
 
-            ByteArrayInputStream bis = new ByteArrayInputStream(data);
-
-            ObjectInputStream ois = new ObjectInputStream(bis);
-
-            String originatingNodeID = ois.readUTF();
-
             if (nodeID.equals(originatingNodeID))
             {
                // Ignore traffic from own node
                continue;
             }
 
-            int size = ois.readInt();
+            int size = buffer.readInt();
 
             boolean changed = false;
 
@@ -244,15 +242,19 @@
             {
                for (int i = 0; i < size; i++)
                {
-                  TransportConfiguration connector = (TransportConfiguration)ois.readObject();
+                  TransportConfiguration connector = new TransportConfiguration();
+                  
+                  connector.decode(buffer);
 
-                  boolean existsBackup = ois.readBoolean();
+                  boolean existsBackup = buffer.readBoolean();
 
                   TransportConfiguration backupConnector = null;
 
                   if (existsBackup)
                   {
-                     backupConnector = (TransportConfiguration)ois.readObject();
+                     backupConnector = new TransportConfiguration();
+                     
+                     backupConnector.decode(buffer);
                   }
 
                   Pair<TransportConfiguration, TransportConfiguration> connectorPair = new Pair<TransportConfiguration, TransportConfiguration>(connector,

Modified: trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/config/TransportConfiguration.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -22,9 +22,12 @@
 package org.jboss.messaging.core.config;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
 /**
  * A TransportConfiguration
  * 
@@ -35,73 +38,191 @@
 {
    private static final long serialVersionUID = -3994528421527392679L;
 
-   private final String name;
-   
-   private final String factoryClassName;
-   
-   private final Map<String, Object> params;
-   
+   private String name;
+
+   private String factoryClassName;
+
+   private Map<String, Object> params;
+
+   private static final byte TYPE_BOOLEAN = 0;
+
+   private static final byte TYPE_INT = 1;
+
+   private static final byte TYPE_LONG = 2;
+
+   private static final byte TYPE_STRING = 3;
+
+   public void encode(final MessagingBuffer buffer)
+   {
+      buffer.writeString(name);
+      buffer.writeString(factoryClassName);
+
+      buffer.writeInt(params == null ? 0 : params.size());
+
+      if (params != null)
+      {
+         for (Map.Entry<String, Object> entry : params.entrySet())
+         {
+            buffer.writeString(entry.getKey());
+
+            Object val = entry.getValue();
+
+            if (val instanceof Boolean)
+            {
+               buffer.writeByte(TYPE_BOOLEAN);
+               buffer.writeBoolean((Boolean)val);
+            }
+            else if (val instanceof Integer)
+            {
+               buffer.writeByte(TYPE_INT);
+               buffer.writeInt((Integer)val);
+            }
+            else if (val instanceof Long)
+            {
+               buffer.writeByte(TYPE_LONG);
+               buffer.writeLong((Long)val);
+            }
+            else if (val instanceof String)
+            {
+               buffer.writeByte(TYPE_STRING);
+               buffer.writeString((String)val);
+            }
+            else
+            {
+               throw new IllegalArgumentException("Invalid type " + val);
+            }
+         }
+      }
+   }
+
+   public void decode(final MessagingBuffer buffer)
+   {
+      name = buffer.readString();
+      factoryClassName = buffer.readString();
+
+      int num = buffer.readInt();
+
+      if (params == null)
+      {
+         if (num > 0)
+         {
+            params = new HashMap<String, Object>();
+         }
+      }
+      else
+      {
+         params.clear();
+      }
+
+      for (int i = 0; i < num; i++)
+      {
+         String key = buffer.readString();
+
+         byte type = buffer.readByte();
+
+         Object val;
+
+         switch (type)
+         {
+            case TYPE_BOOLEAN:
+            {
+               val = buffer.readBoolean();
+
+               break;
+            }
+            case TYPE_INT:
+            {
+               val = buffer.readInt();
+
+               break;
+            }
+            case TYPE_LONG:
+            {
+               val = buffer.readLong();
+
+               break;
+            }
+            case TYPE_STRING:
+            {
+               val = buffer.readString();
+
+               break;
+            }
+            default:
+            {
+               throw new IllegalArgumentException("Invalid type " + type);
+            }
+         }
+
+         params.put(key, val);
+      }
+   }
+
    public static String[] splitHosts(final String commaSeparatedHosts)
-   {  
+   {
       if (commaSeparatedHosts == null)
       {
          return new String[0];
       }
       String[] hosts = commaSeparatedHosts.split(",");
-      
+
       for (int i = 0; i < hosts.length; i++)
       {
-         hosts[i] = hosts[i].trim();         
+         hosts[i] = hosts[i].trim();
       }
-      return hosts;      
+      return hosts;
    }
-   
+
+   public TransportConfiguration()
+   {
+   }
+
    public TransportConfiguration(final String className, final Map<String, Object> params, final String name)
    {
       this.factoryClassName = className;
-      
+
       this.params = params;
-      
+
       this.name = name;
    }
-   
+
    public TransportConfiguration(final String className, final Map<String, Object> params)
    {
       this(className, params, UUID.randomUUID().toString());
    }
-   
+
    public TransportConfiguration(final String className)
    {
       this(className, null, UUID.randomUUID().toString());
    }
-   
+
    public String getName()
    {
       return name;
    }
-   
+
    public String getFactoryClassName()
    {
       return factoryClassName;
    }
-   
+
    public Map<String, Object> getParams()
    {
       return params;
    }
-   
+
    public int hashCode()
    {
       return factoryClassName.hashCode();
    }
-   
+
    public boolean equals(final Object other)
    {
       if (other instanceof TransportConfiguration == false)
       {
          return false;
       }
-      
+
       TransportConfiguration kother = (TransportConfiguration)other;
 
       if (factoryClassName.equals(kother.factoryClassName))

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -189,7 +189,17 @@
 
       // Injecting the postoffice (itself) on queueFactory for paging-control
       queueFactory.setPostOffice(this);
-
+      
+      if (!backup)
+      {
+         startExpiryScanner();
+      }
+                  
+      started = true;
+   }
+   
+   private void startExpiryScanner()
+   {
       if (messageExpiryScanPeriod > 0)
       {
          MessageExpiryRunner messageExpiryRunner = new MessageExpiryRunner();
@@ -201,7 +211,6 @@
                                                       messageExpiryScanPeriod,
                                                       TimeUnit.MILLISECONDS);
       }
-      started = true;
    }
 
    public void stop() throws Exception
@@ -669,6 +678,8 @@
             }
          }
       }
+      
+      startExpiryScanner();
 
       return queues;
    }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -358,6 +358,7 @@
 
    public String getRemoteAddress()
    {
+      log.info("transport connection is " + transportConnection);
       return transportConnection.getRemoteAddress();
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -147,6 +147,10 @@
 
    public static final byte SESS_REPLICATE_DELIVERY = 91;
    
+   public static final byte REPLICATE_UPDATE_CONNECTORS = 92;
+   
+   public static final byte REPLICATE_NOTIFICATION = 93;
+   
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -16,6 +16,7 @@
 import java.util.Set;
 
 import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.management.ManagementService;
 import org.jboss.messaging.core.management.MessagingServerControlMBean;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -32,6 +33,7 @@
 import org.jboss.messaging.core.settings.impl.AddressSettings;
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.SimpleString;
 import org.jboss.messaging.utils.UUID;
 
@@ -97,6 +99,8 @@
                                                        boolean xa,
                                                        int sendWindowSize) throws Exception;
    
+   void updateClusterConnectionConnectors(SimpleString clusterConnectionName, List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception;
+   
    void removeSession(String name) throws Exception;
    
    ServerSession getSession(String name);

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterConnection.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -23,7 +23,11 @@
 
 package org.jboss.messaging.core.server.cluster;
 
+import java.util.List;
+
+import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.utils.Pair;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
@@ -38,4 +42,6 @@
 public interface ClusterConnection extends MessagingComponent
 {
    SimpleString getName();
+   
+   void handleReplicatedUpdateConnectors(List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/ClusterManager.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -26,6 +26,7 @@
 import java.util.Set;
 
 import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.utils.SimpleString;
 
 /**
  * A ClusterManager
@@ -38,6 +39,8 @@
 public interface ClusterManager extends MessagingComponent
 {
    Map<String, Bridge> getBridges();
-   
+
    Set<ClusterConnection> getClusterConnections();
+
+   ClusterConnection getClusterConnection(SimpleString name);
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BroadcastGroupImpl.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -23,7 +23,6 @@
 package org.jboss.messaging.core.server.cluster.impl;
 
 import java.io.ByteArrayOutputStream;
-import java.io.ObjectOutputStream;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
@@ -31,8 +30,10 @@
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
 
+import org.jboss.messaging.core.buffers.ChannelBuffers;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.core.server.cluster.BroadcastGroup;
 import org.jboss.messaging.utils.Pair;
 
@@ -94,11 +95,12 @@
       if (localPort != -1)
       {
          socket = new DatagramSocket(localPort);
-      } else
+      }
+      else
       {
-         socket = new DatagramSocket();         
+         socket = new DatagramSocket();
       }
-      
+
       started = true;
    }
 
@@ -146,36 +148,30 @@
 
    public synchronized void broadcastConnectors() throws Exception
    {
-      // TODO - for now we just use plain serialization to serialize the transport configs
-      // we should use our own format for a tighter representation
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      MessagingBuffer buff = ChannelBuffers.dynamicBuffer(4096);
+     
+      buff.writeString(nodeID);
 
-      ObjectOutputStream oos = new ObjectOutputStream(bos);
-      
-      oos.writeUTF(nodeID);
+      buff.writeInt(connectorPairs.size());
 
-      oos.writeInt(connectorPairs.size());
-      
       for (Pair<TransportConfiguration, TransportConfiguration> connectorPair : connectorPairs)
       {
-         oos.writeObject(connectorPair.a);
+         connectorPair.a.encode(buff);
 
          if (connectorPair.b != null)
          {
-            oos.writeBoolean(true);
+            buff.writeBoolean(true);
 
-            oos.writeObject(connectorPair.b);
+            connectorPair.b.encode(buff);
          }
          else
          {
-            oos.writeBoolean(false);
+            buff.writeBoolean(false);
          }
       }
-
-      oos.flush();
-
-      byte[] data = bos.toByteArray();
-
+      
+      byte[] data = buff.array();
+            
       DatagramPacket packet = new DatagramPacket(data, data.length, groupAddress, groupPort);
 
       socket.send(packet);
@@ -194,13 +190,13 @@
       }
       catch (Exception e)
       {
-         log.error("Failed to broadcast connector configs");
+         log.error("Failed to broadcast connector configs", e);
       }
    }
 
    public synchronized void setScheduledFuture(final ScheduledFuture<?> future)
    {
       this.future = future;
-   }   
+   }
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -46,6 +46,9 @@
 import org.jboss.messaging.core.postoffice.Bindings;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateClusterConnectionUpdate;
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.cluster.Bridge;
@@ -106,6 +109,10 @@
    private final int maxHops;
 
    private final UUID nodeUUID;
+   
+   private final Channel replicatingChannel;
+   
+   private boolean backup;
 
    private volatile boolean started;
 
@@ -128,7 +135,9 @@
                                 final QueueFactory queueFactory,
                                 final List<Pair<TransportConfiguration, TransportConfiguration>> connectors,
                                 final int maxHops,
-                                final UUID nodeUUID) throws Exception
+                                final UUID nodeUUID,
+                                final Channel replicatingChannel,
+                                final boolean backup) throws Exception
    {
       this.name = name;
 
@@ -163,6 +172,10 @@
       this.maxHops = maxHops;
 
       this.nodeUUID = nodeUUID;
+      
+      this.replicatingChannel = replicatingChannel;
+      
+      this.backup = backup;
 
       this.updateConnectors(connectors);
    }
@@ -186,7 +199,9 @@
                                 final QueueFactory queueFactory,
                                 final DiscoveryGroup discoveryGroup,
                                 final int maxHops,
-                                final UUID nodeUUID) throws Exception
+                                final UUID nodeUUID,
+                                final Channel replicatingChannel,
+                                final boolean backup) throws Exception
    {
       this.name = name;
 
@@ -221,6 +236,10 @@
       this.maxHops = maxHops;
 
       this.nodeUUID = nodeUUID;
+      
+      this.replicatingChannel = replicatingChannel;
+      
+      this.backup = backup;
    }
 
    public synchronized void start() throws Exception
@@ -267,7 +286,17 @@
    {
       return name;
    }
-
+   
+   public synchronized void handleReplicatedUpdateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   {
+      if (!backup)
+      {
+         return;
+      }
+      
+      updateConnectors(connectors);
+   }
+   
    // DiscoveryListener implementation ------------------------------------------------------------------
 
    public synchronized void connectorsChanged()
@@ -283,9 +312,38 @@
          log.error("Failed to update connectors", e);
       }
    }
-
+   
    private void updateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
    {
+      if (replicatingChannel == null)
+      {
+         doUpdateConnectors(connectors);
+      }
+      else
+      {
+         Packet packet = new ReplicateClusterConnectionUpdate(name, connectors);
+         
+         Runnable action = new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  doUpdateConnectors(connectors);
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to update connectors", e);
+               }
+            }
+         };
+         
+         replicatingChannel.replicatePacket(packet, 1, action);
+      }
+   }
+
+   private void doUpdateConnectors(final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   {
       Set<Pair<TransportConfiguration, TransportConfiguration>> connectorSet = new HashSet<Pair<TransportConfiguration, TransportConfiguration>>();
 
       connectorSet.addAll(connectors);

Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -207,6 +207,11 @@
    {
       return new HashSet<ClusterConnection>(clusters.values());
    }
+   
+   public ClusterConnection getClusterConnection(final SimpleString name)
+   {
+      return clusters.get(name.toString()); 
+   }
 
    private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration config) throws Exception
    {
@@ -475,7 +480,9 @@
                                                        queueFactory,
                                                        connectors,
                                                        config.getMaxHops(),
-                                                       nodeUUID);
+                                                       nodeUUID,
+                                                       null,
+                                                       false);
       }
       else
       {
@@ -503,7 +510,9 @@
                                                        queueFactory,
                                                        dg,
                                                        config.getMaxHops(),
-                                                       nodeUUID);
+                                                       nodeUUID,
+                                                       null,
+                                                       false);
       }
 
       managementService.registerCluster(clusterConnection, config);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -67,6 +67,7 @@
 import org.jboss.messaging.core.server.Queue;
 import org.jboss.messaging.core.server.QueueFactory;
 import org.jboss.messaging.core.server.ServerSession;
+import org.jboss.messaging.core.server.cluster.ClusterConnection;
 import org.jboss.messaging.core.server.cluster.ClusterManager;
 import org.jboss.messaging.core.server.cluster.Transformer;
 import org.jboss.messaging.core.server.cluster.impl.ClusterManagerImpl;
@@ -716,6 +717,19 @@
       return sessions.get(name);
    }
 
+   public void updateClusterConnectionConnectors(final SimpleString clusterConnectionName,
+                                                 final List<Pair<TransportConfiguration, TransportConfiguration>> connectors) throws Exception
+   {
+      ClusterConnection cc = clusterManager.getClusterConnection(clusterConnectionName);
+
+      if (cc == null)
+      {
+         throw new IllegalStateException("Cannot find cluster connection with name " + clusterConnectionName);
+      }
+
+      cc.handleReplicatedUpdateConnectors(connectors);
+   }
+
    public List<ServerSession> getSessions(final String connectionID)
    {
       Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
@@ -1023,7 +1037,7 @@
 
       return new CreateSessionResponseMessage(version.getIncrementingVersion());
    }
-   
+
    private Transformer instantiateTransformer(final String transformerClassName)
    {
       Transformer transformer = null;

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -15,6 +15,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_UPDATE_CONNECTORS;
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -26,6 +27,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateClusterConnectionUpdate;
 import org.jboss.messaging.core.server.MessagingServer;
 
 /**
@@ -88,6 +90,15 @@
 
             break;
          }
+         case REPLICATE_UPDATE_CONNECTORS:
+         {
+            ReplicateClusterConnectionUpdate request = (ReplicateClusterConnectionUpdate)packet;
+            
+            handleClusterConnectionUpdate(request);
+            
+            break;
+            
+         }
          default:
          {
             log.error("Invalid packet " + packet);
@@ -227,5 +238,17 @@
       
       channel1.send(response);
    }
+   
+   private void handleClusterConnectionUpdate(final ReplicateClusterConnectionUpdate request)
+   {
+      try
+      {
+         server.updateClusterConnectionConnectors(request.getClusterConnectionName(), request.getConnectors());
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to handle cluster connection update", e);
+      }
+   }
 
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/MessagingChannelHandler.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -81,6 +81,7 @@
          if (active)
          {
             listener.connectionDestroyed(e.getChannel().getId());
+            
             active = false;
          }
       }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -22,6 +22,19 @@
 
 package org.jboss.messaging.integration.transports.netty;
 
+import static org.jboss.netty.channel.Channels.pipeline;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Timer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.ssl.SSLContext;
+
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
@@ -42,30 +55,16 @@
 import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelStateEvent;
-import static org.jboss.netty.channel.Channels.pipeline;
-
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.local.DefaultLocalServerChannelFactory;
 import org.jboss.netty.channel.local.LocalAddress;
-import org.jboss.netty.channel.local.LocalServerChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
 import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
 import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
 import org.jboss.netty.handler.ssl.SslHandler;
 
-import javax.net.ssl.SSLContext;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.Timer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 /**
  * A Netty TCP Acceptor that supports SSL
  *
@@ -271,7 +270,7 @@
             }
 
             ChannelPipelineSupport.addCodecFilter(pipeline, handler);
-            pipeline.addLast("handler", new MessagingServerChannelHandler(channelGroup, handler, listener));
+            pipeline.addLast("handler", new MessagingServerChannelHandler(channelGroup, handler, new Listener()));
             return pipeline;
          }
       };
@@ -355,7 +354,7 @@
       @Override
       public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception
       {
-         final Connection tc = new NettyConnection(e.getChannel(), new Listener());
+         new NettyConnection(e.getChannel(), new Listener());
 
          SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
          if (sslHandler != null)
@@ -366,7 +365,6 @@
                {
                   if (future.isSuccess())
                   {
-                     listener.connectionCreated(tc);
                      active = true;
                   }
                   else
@@ -378,7 +376,6 @@
          }
          else
          {
-            listener.connectionCreated(tc);
             active = true;
          }
       }
@@ -392,13 +389,22 @@
          {
             throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
          }
+         
+         listener.connectionCreated(connection);
       }
 
       public void connectionDestroyed(final Object connectionID)
       {
          if (connections.remove(connectionID) != null)
          {
-            listener.connectionDestroyed(connectionID);
+            //Execute on different thread to avoid deadlocks; reached only when the id was still present in the connections map
+            new Thread()
+            {
+               public void run()
+               {
+                  listener.connectionDestroyed(connectionID);               
+               }
+            }.start(); // NOTE(review): starts one new Thread per notification - a shared executor may be preferable; confirm
          }
       }
 

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnection.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -130,6 +130,8 @@
 
    public String getRemoteAddress()
    {
+      log.info("channel is " + channel); // NOTE(review): info-level trace on every call - looks like debugging output; confirm it is intended
+      log.info("channel remote address " + channel.getRemoteAddress()); // the same value is dereferenced on the next line without a null check
       return channel.getRemoteAddress().toString();
    }
    

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -21,6 +21,24 @@
  */
 package org.jboss.messaging.integration.transports.netty;
 
+import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.write;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
@@ -40,8 +58,6 @@
 import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelStateEvent;
-import static org.jboss.netty.channel.Channels.pipeline;
-import static org.jboss.netty.channel.Channels.write;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.UpstreamMessageEvent;
@@ -65,20 +81,6 @@
 import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.jboss.netty.handler.ssl.SslHandler;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 /**
  * A NettyConnector
  *
@@ -100,7 +102,7 @@
    private ClientSocketChannelFactory channelFactory;
 
    private ClientBootstrap bootstrap;
-   
+
    ChannelGroup channelGroup;
 
    private final BufferHandler handler;
@@ -137,7 +139,7 @@
 
    private ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
 
-   private  final String servletPath;
+   private final String servletPath;
 
    // Static --------------------------------------------------------
 
@@ -227,8 +229,6 @@
       this.tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME,
                                                                      TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
                                                                      configuration);
-
-
    }
 
    public synchronized void start()
@@ -247,8 +247,8 @@
       {
          channelFactory = new OioClientSocketChannelFactory(workerExecutor);
       }
-      //if we are a servlet wrap the socketChannelFactory
-      if(useServlet)
+      // if we are a servlet wrap the socketChannelFactory
+      if (useServlet)
       {
          ClientSocketChannelFactory proxyChannelFactory = channelFactory;
          channelFactory = new HttpTunnelingClientSocketChannelFactory(proxyChannelFactory, workerExecutor);
@@ -305,7 +305,7 @@
                pipeline.addLast("httphandler", new HttpHandler());
             }
             ChannelPipelineSupport.addCodecFilter(pipeline, handler);
-            pipeline.addLast("handler", new MessagingClientChannelHandler(channelGroup, handler, listener));
+            pipeline.addLast("handler", new MessagingClientChannelHandler(channelGroup, handler, new Listener()));
             return pipeline;
          }
       });
@@ -317,12 +317,12 @@
       {
          return;
       }
-      
+
       bootstrap = null;
       channelGroup.close().awaitUninterruptibly();
       channelFactory.releaseExternalResources();
       channelFactory = null;
-      
+
       for (Connection connection : connections.values())
       {
          listener.connectionDestroyed(connection.getID());
@@ -438,8 +438,6 @@
 
       private String url;
 
-      //private String sessionId = null;
-
       private Future handShakeFuture = new Future();
 
       private boolean active = false;
@@ -452,7 +450,6 @@
 
       private CookieEncoder cookieEncoder = new CookieEncoder();
 
-
       public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
       {
          super.channelConnected(ctx, e);
@@ -481,7 +478,7 @@
       @Override
       public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
       {
-         HttpResponse response = (HttpResponse) e.getMessage();
+         HttpResponse response = (HttpResponse)e.getMessage();
          if (httpRequiresSessionId && !active)
          {
             Map<String, Cookie> cookieMap = cookieDecoder.decode(response.getHeader(HttpHeaders.Names.SET_COOKIE));
@@ -494,14 +491,11 @@
             active = true;
             handShakeFuture.run();
          }
-         MessageEvent event = new UpstreamMessageEvent(e.getChannel(),
-                                                       response.getContent(),
-                                                       e.getRemoteAddress());
+         MessageEvent event = new UpstreamMessageEvent(e.getChannel(), response.getContent(), e.getRemoteAddress());
          waitingGet = false;
          ctx.sendUpstream(event);
       }
 
-
       @Override
       public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
       {
@@ -527,7 +521,7 @@
             {
                httpRequest.addHeader(HttpHeaders.Names.COOKIE, cookie);
             }
-            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+            ChannelBuffer buf = (ChannelBuffer)e.getMessage();
             httpRequest.setContent(buf);
             httpRequest.addHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
             write(ctx, e.getFuture(), httpRequest, e.getRemoteAddress());
@@ -573,7 +567,17 @@
 
       public void connectionDestroyed(final Object connectionID)
       {
-         connections.remove(connectionID);
+         if (connections.remove(connectionID) != null) // the listener is notified only when the id was still present in the connections map
+         {
+            // Execute on different thread to avoid deadlocks (the previous revision only removed the entry and did not notify the listener)
+            new Thread()
+            {
+               public void run()
+               {
+                  listener.connectionDestroyed(connectionID);
+               }
+            }.start(); // NOTE(review): starts one new Thread per notification - a shared executor may be preferable; confirm
+         }
       }
 
       public void connectionException(final Object connectionID, final MessagingException me)

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverExpiredMessageTest.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -92,12 +92,8 @@
   
       ClientSession session1 = sf1.createSession(false, true, true);
       
-      log.info("created session");
-
       session1.createQueue(ADDRESS, ADDRESS, null, false, false);
-      
-      log.info("created queue");
-      
+       
       session1.start();
 
       ClientProducer producer = session1.createProducer(ADDRESS);
@@ -107,9 +103,7 @@
       //Set time to live so at least some of them will more than likely expire before they are consumed by the client
       
       long now = System.currentTimeMillis();
-      
-      log.info("sending messages");
-      
+       
       long expire = now + 5000;
 
       for (int i = 0; i < numMessages; i++)
@@ -122,12 +116,8 @@
          message.putIntProperty(new SimpleString("count"), i);         
          message.getBody().writeString("aardvarks");
          producer.send(message);               
-         
-         log.info("sent message " + i);
       }
       
-      log.info("sent messages");
-      
       ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
                  
       final RemotingConnection conn1 = ((ClientSessionImpl)session1).getConnection();
@@ -169,13 +159,10 @@
          }
          else
          {
-            log.info("message was null");
             break;
          }
       }           
       
-      log.info("Got " + count + " messages");
-           
       t.join();
                    
       session1.close();
@@ -194,16 +181,87 @@
       session2.close();      
    }
    
+   public void testExpireViaReaperOnLive() throws Exception // expects no message to be received after the expiry wait and a forced connection failure
+   {            
+      ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                      new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                                 backupParams));
+      
+      sf1.setSendWindowSize(32 * 1024);
+  
+      ClientSession session1 = sf1.createSession(false, true, true);
+      
+      log.info("created session");
+
+      session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+      
+      log.info("created queue");
+      
+      ClientProducer producer = session1.createProducer(ADDRESS);
+
+      final int numMessages = 10000;
+      
+      //Set time to live so messages are expired on the server
+      
+      long now = System.currentTimeMillis();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session1.createClientMessage(JBossTextMessage.TYPE,
+                                                             false,
+                                                             now, // NOTE(review): presumably the expiration argument, i.e. already expired when sent - confirm against createClientMessage
+                                                             System.currentTimeMillis(),
+                                                             (byte)1);
+         message.putIntProperty(new SimpleString("count"), i);         
+         message.getBody().writeString("aardvarks");
+         producer.send(message);               
+      }
+
+      Thread.sleep(4 * expireScanPeriod); // waits four times the expiry scan period before consuming
+      
+      //Messages should all be expired now
+      
+      ClientConsumer consumer1 = session1.createConsumer(ADDRESS);
+      
+      session1.start();
+                 
+      RemotingConnection conn1 = ((ClientSessionImpl)session1).getConnection();
+ 
+      conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED)); // fails the session's connection before the first receive
+                              
+      ClientMessage message = consumer1.receive(1000);
+      
+      assertNull(message);
+                         
+      session1.close();
+      
+      //Make sure no more messages
+      ClientSession session2 = sf1.createSession(false, true, true);
+      
+      session2.start();
+      
+      ClientConsumer consumer2 = session2.createConsumer(ADDRESS);
+      
+      message = consumer2.receive(1000);
+      
+      assertNull(message);
+      
+      session2.close();      
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
 
+   private final long expireScanPeriod = 1000;
+   
    @Override
    protected void setUp() throws Exception
    {
       super.setUp();
       
       Configuration backupConf = new ConfigurationImpl();
+      backupConf.setMessageExpiryScanPeriod(expireScanPeriod);
       backupConf.setSecurityEnabled(false);
       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
       backupConf.getAcceptorConfigurations()
@@ -214,6 +272,7 @@
       backupService.start();
 
       Configuration liveConf = new ConfigurationImpl();
+      liveConf.setMessageExpiryScanPeriod(expireScanPeriod);
       liveConf.setSecurityEnabled(false);
       liveConf.getAcceptorConfigurations()
               .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/discovery/DiscoveryTest.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -913,7 +913,7 @@
       Map<String, Object> params = new HashMap<String, Object>();
       params.put(UUIDGenerator.getInstance().generateStringUUID(), 123);
       params.put(UUIDGenerator.getInstance().generateStringUUID(), UUIDGenerator.getInstance().generateStringUUID());
-      params.put(UUIDGenerator.getInstance().generateStringUUID(), 721633.123d);
+      params.put(UUIDGenerator.getInstance().generateStringUUID(), true);
       TransportConfiguration tc = new TransportConfiguration(className, params, name);
       return tc;
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java	2009-03-06 23:54:20 UTC (rev 6035)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/management/JMSServerControlTest.java	2009-03-07 15:30:27 UTC (rev 6036)
@@ -40,6 +40,7 @@
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
 import org.jboss.messaging.core.server.Messaging;
@@ -63,7 +64,10 @@
 public class JMSServerControlTest extends UnitTestCase
 {
    // Constants -----------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(JMSServerControlTest.class);
 
+
    // Attributes ----------------------------------------------------
 
    private MBeanServer mbeanServer;
@@ -314,6 +318,8 @@
 
          // FIXME: with Netty, the server is not notified immediately that the connection is closed
          Thread.sleep(500);
+         
+         log.info("got here");
 
          assertEquals(0, control.listRemoteAddresses().length);
       }




More information about the jboss-cvs-commits mailing list