[hornetq-commits] JBoss hornetq SVN: r10907 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/api/core and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 1 08:39:30 EDT 2011


Author: borges
Date: 2011-07-01 08:39:30 -0400 (Fri, 01 Jul 2011)
New Revision: 10907

Removed:
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java
Modified:
   branches/HORNETQ-720_Replication/
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
Log:
merge from trunk


Property changes on: branches/HORNETQ-720_Replication
___________________________________________________________________
Modified: svn:mergeinfo
   - /trunk:10878-10900
   + /trunk:10878-10906

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java	2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java	2011-07-01 12:39:30 UTC (rev 10907)
@@ -53,27 +53,6 @@
    private static final byte TYPE_STRING = 3;
 
    /**
-    * Utility method for splitting a comma separated list of hosts
-    *
-    * @param commaSeparatedHosts the comma separated host string
-    * @return the hosts
-    */
-   public static String[] splitHosts(final String commaSeparatedHosts)
-   {
-      if (commaSeparatedHosts == null)
-      {
-         return new String[0];
-      }
-      String[] hosts = commaSeparatedHosts.split(",");
-
-      for (int i = 0; i < hosts.length; i++)
-      {
-         hosts[i] = hosts[i].trim();
-      }
-      return hosts;
-   }
-
-   /**
     * Creates a default TransportConfiguration with no configured transport.
     */
    public TransportConfiguration()

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2011-07-01 12:39:30 UTC (rev 10907)
@@ -37,9 +37,33 @@
  */
 public class ChannelImpl implements Channel
 {
+   public enum CHANNEL_ID {
+
+      /**
+       * Used for core protocol management.
+       * @see CoreProtocolManager
+       */
+      PING(0),
+      /** Session creation and attachment. */
+      SESSION(1),
+      /** Replication, i.e. for backups that do not share the journal. */
+      REPLICATION(2),
+      /**
+       * Channels [0-9] are reserved for the system, user channels must be greater than that.
+       */
+      USER(10);
+
+      public final long id;
+
+      CHANNEL_ID(long id)
+      {
+         this.id = id;
+      }
+   }
+
    private static final Logger log = Logger.getLogger(ChannelImpl.class);
 
-   private volatile long id;
+   private final long id;
 
    private ChannelHandler handler;
 
@@ -336,7 +360,7 @@
 
          // And switch it
 
-         final CoreRemotingConnection rnewConnection = (CoreRemotingConnection)newConnection;
+         final CoreRemotingConnection rnewConnection = newConnection;
 
          rnewConnection.putChannel(id, this);
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-07-01 12:39:30 UTC (rev 10907)
@@ -85,7 +85,6 @@
 import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
@@ -158,15 +157,17 @@
  * A PacketDecoder
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
  */
-public class PacketDecoder
+public final class PacketDecoder
 {
-   private static final Logger log = Logger.getLogger(PacketDecoder.class);
 
-   public Packet decode(final HornetQBuffer in)
+   private PacketDecoder()
    {
+      // Utility
+   }
+
+   public static Packet decode(final HornetQBuffer in)
+   {
       final byte packetType = in.readByte();
 
       Packet packet;

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java	2011-07-01 12:39:30 UTC (rev 10907)
@@ -30,6 +30,7 @@
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
@@ -71,11 +72,7 @@
 
    private final boolean client;
 
-   // Channels 0-9 are reserved for the system
-   // 0 is for pinging
-   // 1 is for session creation and attachment
-   // 2 is for replication
-   private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(10);
+   private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id);
 
    private boolean idGeneratorSynced = false;
 
@@ -83,18 +80,16 @@
 
    private final Object failLock = new Object();
 
-   private final PacketDecoder decoder = new PacketDecoder();
-
    private volatile boolean dataReceived;
 
    private final Executor executor;
-   
+
    private volatile boolean executing;
-   
+
    private final SimpleString nodeID;
 
    private final long creationTime;
-   
+
    private String clientID;
 
    // Constructors
@@ -139,9 +134,9 @@
       this.client = client;
 
       this.executor = executor;
-      
+
       this.nodeID = nodeID;
-      
+
       this.creationTime = System.currentTimeMillis();
    }
 
@@ -174,7 +169,7 @@
    {
       return transportConnection.getRemoteAddress();
    }
-   
+
    public long getCreationTime()
    {
       return creationTime;
@@ -247,26 +242,26 @@
    public List<CloseListener> removeCloseListeners()
    {
       List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
-      
+
       closeListeners.clear();
-      
+
       return ret;
    }
 
    public List<FailureListener> removeFailureListeners()
    {
       List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
-      
+
       failureListeners.clear();
-      
-      return ret; 
+
+      return ret;
    }
 
    public void setCloseListeners(List<CloseListener> listeners)
    {
       closeListeners.clear();
-      
-      closeListeners.addAll(listeners);      
+
+      closeListeners.addAll(listeners);
    }
 
    public HornetQBuffer createBuffer(final int size)
@@ -323,7 +318,7 @@
 
       callClosingListeners();
    }
-   
+
    public void disconnect()
    {
       Channel channel0 = getChannel(0, -1);
@@ -331,13 +326,13 @@
       // And we remove all channels from the connection, this ensures no more packets will be processed after this
       // method is
       // complete
-      
+
       Set<Channel> allChannels = new HashSet<Channel>(channels.values());
 
       removeAllChannels();
 
       // Now we are 100% sure that no more packets will be processed we can flush then send the disconnect
-      
+
       for (Channel channel: allChannels)
       {
          channel.flushConfirmations();
@@ -408,7 +403,7 @@
          }
       }
    }
-   
+
    public void checkFlushBatchBuffer()
    {
       transportConnection.checkFlushBatchBuffer();
@@ -421,12 +416,12 @@
    {
       try
       {
-         final Packet packet = decoder.decode(buffer);
-            
+         final Packet packet = PacketDecoder.decode(buffer);
+
          if (packet.isAsyncExec() && executor != null)
          {
             executing = true;
-   
+
             executor.execute(new Runnable()
             {
                public void run()
@@ -439,7 +434,7 @@
                   {
                      RemotingConnectionImpl.log.error("Unexpected error", t);
                   }
-   
+
                   executing = false;
                }
             });
@@ -451,13 +446,13 @@
             {
                Thread.yield();
             }
-            
+
             // Pings must always be handled out of band so we can send pings back to the client quickly
             // otherwise they would get in the queue with everything else which might give an intolerable delay
             doBufferReceived(packet);
          }
-        
-         dataReceived = true;  
+
+         dataReceived = true;
       }
       catch (Exception e)
       {
@@ -515,8 +510,8 @@
       {
          channels.clear();
       }
-   }  
-   
+   }
+
    private void callFailureListeners(final HornetQException me)
    {
       final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
@@ -572,7 +567,7 @@
    {
       clientID = cID;
    }
-   
+
    public String getClientID()
    {
       return clientID;

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2011-07-01 12:39:30 UTC (rev 10907)
@@ -32,7 +32,6 @@
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.stomp.WebSocketServerHandler;
@@ -467,7 +466,7 @@
 
    private void startServerChannels()
    {
-      String[] hosts = TransportConfiguration.splitHosts(host);
+      String[] hosts = NettyAcceptor.splitHosts(host);
       for (String h : hosts)
       {
          SocketAddress address;
@@ -613,6 +612,27 @@
 
    // Inner classes -----------------------------------------------------------------------------
 
+   /**
+    * Utility method for splitting a comma separated list of hosts
+    *
+    * @param commaSeparatedHosts the comma separated host string
+    * @return the hosts
+    */
+   public static String[] splitHosts(final String commaSeparatedHosts)
+   {
+      if (commaSeparatedHosts == null)
+      {
+         return new String[0];
+      }
+      String[] hosts = commaSeparatedHosts.split(",");
+   
+      for (int i = 0; i < hosts.length; i++)
+      {
+         hosts[i] = hosts[i].trim();
+      }
+      return hosts;
+   }
+
    private final class HornetQServerChannelHandler extends HornetQChannelHandler
    {
       HornetQServerChannelHandler(final ChannelGroup group,

Deleted: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java	2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java	2011-07-01 12:39:30 UTC (rev 10907)
@@ -1,79 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.core.config.impl;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * A TransportConfigurationTest
- *
- * @author jmesnil
- * 
- * Created 20 janv. 2009 14:46:35
- *
- *
- */
-public class TransportConfigurationTest extends UnitTestCase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public void testSplitNullAddress() throws Exception
-   {
-      String[] addresses = TransportConfiguration.splitHosts(null);
-
-      Assert.assertNotNull(addresses);
-      Assert.assertEquals(0, addresses.length);
-   }
-
-   public void testSplitSingleAddress() throws Exception
-   {
-      String[] addresses = TransportConfiguration.splitHosts("localhost");
-
-      Assert.assertNotNull(addresses);
-      Assert.assertEquals(1, addresses.length);
-      Assert.assertEquals("localhost", addresses[0]);
-   }
-
-   public void testSplitManyAddresses() throws Exception
-   {
-      String[] addresses = TransportConfiguration.splitHosts("localhost, 127.0.0.1, 192.168.0.10");
-
-      Assert.assertNotNull(addresses);
-      Assert.assertEquals(3, addresses.length);
-      Assert.assertEquals("localhost", addresses[0]);
-      Assert.assertEquals("127.0.0.1", addresses[1]);
-      Assert.assertEquals("192.168.0.10", addresses[2]);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2011-07-01 12:24:27 UTC (rev 10906)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2011-07-01 12:39:30 UTC (rev 10907)
@@ -83,7 +83,7 @@
          public void connectionCreated(final Connection connection, final ProtocolType protocol)
          {
          }
-         
+
          public void connectionReadyForWrites(Object connectionID, boolean ready)
          {
          }
@@ -108,12 +108,40 @@
       acceptor.stop();
       Assert.assertFalse(acceptor.isStarted());
       UnitTestCase.checkFreePort(TransportConstants.DEFAULT_PORT);
-      
+
       pool1.shutdown();
       pool2.shutdown();
-      
+
       pool1.awaitTermination(1, TimeUnit.SECONDS);
       pool2.awaitTermination(1, TimeUnit.SECONDS);
    }
 
+   public void testSplitNullAddress() throws Exception
+   {
+      String[] addresses = NettyAcceptor.splitHosts(null);
+
+      Assert.assertNotNull(addresses);
+      Assert.assertEquals(0, addresses.length);
+   }
+
+   public void testSplitSingleAddress() throws Exception
+   {
+      String[] addresses = NettyAcceptor.splitHosts("localhost");
+
+      Assert.assertNotNull(addresses);
+      Assert.assertEquals(1, addresses.length);
+      Assert.assertEquals("localhost", addresses[0]);
+   }
+
+   public void testSplitManyAddresses() throws Exception
+   {
+      String[] addresses = NettyAcceptor.splitHosts("localhost, 127.0.0.1, 192.168.0.10");
+
+      Assert.assertNotNull(addresses);
+      Assert.assertEquals(3, addresses.length);
+      Assert.assertEquals("localhost", addresses[0]);
+      Assert.assertEquals("127.0.0.1", addresses[1]);
+      Assert.assertEquals("192.168.0.10", addresses[2]);
+   }
+
 }



More information about the hornetq-commits mailing list