JBoss hornetq SVN: r10906 - trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-01 08:24:27 -0400 (Fri, 01 Jul 2011)
New Revision: 10906
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
Log:
Reduce visibility: make PacketDecoder a final utility class with a private constructor and a static decode() method.
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-07-01 12:23:52 UTC (rev 10905)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-07-01 12:24:27 UTC (rev 10906)
@@ -85,7 +85,6 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SUBSCRIBE_TOPOLOGY;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
@@ -157,15 +156,17 @@
* A PacketDecoder
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- *
*/
-public class PacketDecoder
+public final class PacketDecoder
{
- private static final Logger log = Logger.getLogger(PacketDecoder.class);
-
- public Packet decode(final HornetQBuffer in)
+
+ private PacketDecoder()
{
+ // Utility
+ }
+
+ public static Packet decode(final HornetQBuffer in)
+ {
final byte packetType = in.readByte();
Packet packet;
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-07-01 12:23:52 UTC (rev 10905)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-07-01 12:24:27 UTC (rev 10906)
@@ -80,8 +80,6 @@
private final Object failLock = new Object();
- private final PacketDecoder decoder = new PacketDecoder();
-
private volatile boolean dataReceived;
private final Executor executor;
@@ -418,7 +416,7 @@
{
try
{
- final Packet packet = decoder.decode(buffer);
+ final Packet packet = PacketDecoder.decode(buffer);
if (packet.isAsyncExec() && executor != null)
{
13 years, 5 months
JBoss hornetq SVN: r10905 - trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-01 08:23:52 -0400 (Fri, 01 Jul 2011)
New Revision: 10905
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
Log:
Place standard channel numbers into constants (better than in a code comment).
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-07-01 12:23:16 UTC (rev 10904)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-07-01 12:23:52 UTC (rev 10905)
@@ -37,9 +37,33 @@
*/
public class ChannelImpl implements Channel
{
+ public enum CHANNEL_ID {
+
+ /**
+ * Used for core protocol management.
+ * @see CoreProtocolManager
+ */
+ PING(0),
+ /** Session creation and attachment. */
+ SESSION(1),
+ /** Replication, i.e. for backups that do not share the journal. */
+ REPLICATION(2),
+ /**
+ * Channels [0-9] are reserved for the system, user channels must be greater than that.
+ */
+ USER(10);
+
+ public final long id;
+
+ CHANNEL_ID(long id)
+ {
+ this.id = id;
+ }
+ }
+
private static final Logger log = Logger.getLogger(ChannelImpl.class);
- private volatile long id;
+ private final long id;
private ChannelHandler handler;
@@ -336,7 +360,7 @@
// And switch it
- final CoreRemotingConnection rnewConnection = (CoreRemotingConnection)newConnection;
+ final CoreRemotingConnection rnewConnection = newConnection;
rnewConnection.putChannel(id, this);
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-07-01 12:23:16 UTC (rev 10904)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-07-01 12:23:52 UTC (rev 10905)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
@@ -71,11 +72,7 @@
private final boolean client;
- // Channels 0-9 are reserved for the system
- // 0 is for pinging
- // 1 is for session creation and attachment
- // 2 is for replication
- private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(10);
+ private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id);
private boolean idGeneratorSynced = false;
@@ -88,13 +85,13 @@
private volatile boolean dataReceived;
private final Executor executor;
-
+
private volatile boolean executing;
-
+
private final SimpleString nodeID;
private final long creationTime;
-
+
private String clientID;
// Constructors
@@ -139,9 +136,9 @@
this.client = client;
this.executor = executor;
-
+
this.nodeID = nodeID;
-
+
this.creationTime = System.currentTimeMillis();
}
@@ -174,7 +171,7 @@
{
return transportConnection.getRemoteAddress();
}
-
+
public long getCreationTime()
{
return creationTime;
@@ -247,26 +244,26 @@
public List<CloseListener> removeCloseListeners()
{
List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
-
+
closeListeners.clear();
-
+
return ret;
}
public List<FailureListener> removeFailureListeners()
{
List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
-
+
failureListeners.clear();
-
- return ret;
+
+ return ret;
}
public void setCloseListeners(List<CloseListener> listeners)
{
closeListeners.clear();
-
- closeListeners.addAll(listeners);
+
+ closeListeners.addAll(listeners);
}
public HornetQBuffer createBuffer(final int size)
@@ -323,7 +320,7 @@
callClosingListeners();
}
-
+
public void disconnect()
{
Channel channel0 = getChannel(0, -1);
@@ -331,13 +328,13 @@
// And we remove all channels from the connection, this ensures no more packets will be processed after this
// method is
// complete
-
+
Set<Channel> allChannels = new HashSet<Channel>(channels.values());
removeAllChannels();
// Now we are 100% sure that no more packets will be processed we can flush then send the disconnect
-
+
for (Channel channel: allChannels)
{
channel.flushConfirmations();
@@ -408,7 +405,7 @@
}
}
}
-
+
public void checkFlushBatchBuffer()
{
transportConnection.checkFlushBatchBuffer();
@@ -422,11 +419,11 @@
try
{
final Packet packet = decoder.decode(buffer);
-
+
if (packet.isAsyncExec() && executor != null)
{
executing = true;
-
+
executor.execute(new Runnable()
{
public void run()
@@ -439,7 +436,7 @@
{
RemotingConnectionImpl.log.error("Unexpected error", t);
}
-
+
executing = false;
}
});
@@ -451,13 +448,13 @@
{
Thread.yield();
}
-
+
// Pings must always be handled out of band so we can send pings back to the client quickly
// otherwise they would get in the queue with everything else which might give an intolerable delay
doBufferReceived(packet);
}
-
- dataReceived = true;
+
+ dataReceived = true;
}
catch (Exception e)
{
@@ -515,8 +512,8 @@
{
channels.clear();
}
- }
-
+ }
+
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
@@ -572,7 +569,7 @@
{
clientID = cID;
}
-
+
public String getClientID()
{
return clientID;
13 years, 5 months
JBoss hornetq SVN: r10904 - in trunk: hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-07-01 08:23:16 -0400 (Fri, 01 Jul 2011)
New Revision: 10904
Removed:
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java
trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
Log:
Remove utility method splitHosts from public API class TransportConfiguration (moved to NettyAcceptor, along with its unit tests).
Modified: trunk/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java 2011-06-30 05:53:02 UTC (rev 10903)
+++ trunk/hornetq-core/src/main/java/org/hornetq/api/core/TransportConfiguration.java 2011-07-01 12:23:16 UTC (rev 10904)
@@ -53,27 +53,6 @@
private static final byte TYPE_STRING = 3;
/**
- * Utility method for splitting a comma separated list of hosts
- *
- * @param commaSeparatedHosts the comma separated host string
- * @return the hosts
- */
- public static String[] splitHosts(final String commaSeparatedHosts)
- {
- if (commaSeparatedHosts == null)
- {
- return new String[0];
- }
- String[] hosts = commaSeparatedHosts.split(",");
-
- for (int i = 0; i < hosts.length; i++)
- {
- hosts[i] = hosts[i].trim();
- }
- return hosts;
- }
-
- /**
* Creates a default TransportConfiguration with no configured transport.
*/
public TransportConfiguration()
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-06-30 05:53:02 UTC (rev 10903)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-07-01 12:23:16 UTC (rev 10904)
@@ -32,7 +32,6 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.WebSocketServerHandler;
@@ -467,7 +466,7 @@
private void startServerChannels()
{
- String[] hosts = TransportConfiguration.splitHosts(host);
+ String[] hosts = NettyAcceptor.splitHosts(host);
for (String h : hosts)
{
SocketAddress address;
@@ -613,6 +612,27 @@
// Inner classes -----------------------------------------------------------------------------
+ /**
+ * Utility method for splitting a comma separated list of hosts
+ *
+ * @param commaSeparatedHosts the comma separated host string
+ * @return the hosts
+ */
+ public static String[] splitHosts(final String commaSeparatedHosts)
+ {
+ if (commaSeparatedHosts == null)
+ {
+ return new String[0];
+ }
+ String[] hosts = commaSeparatedHosts.split(",");
+
+ for (int i = 0; i < hosts.length; i++)
+ {
+ hosts[i] = hosts[i].trim();
+ }
+ return hosts;
+ }
+
private final class HornetQServerChannelHandler extends HornetQChannelHandler
{
HornetQServerChannelHandler(final ChannelGroup group,
Deleted: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java
===================================================================
--- trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java 2011-06-30 05:53:02 UTC (rev 10903)
+++ trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/TransportConfigurationTest.java 2011-07-01 12:23:16 UTC (rev 10904)
@@ -1,79 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.unit.core.config.impl;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * A TransportConfigurationTest
- *
- * @author jmesnil
- *
- * Created 20 janv. 2009 14:46:35
- *
- *
- */
-public class TransportConfigurationTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testSplitNullAddress() throws Exception
- {
- String[] addresses = TransportConfiguration.splitHosts(null);
-
- Assert.assertNotNull(addresses);
- Assert.assertEquals(0, addresses.length);
- }
-
- public void testSplitSingleAddress() throws Exception
- {
- String[] addresses = TransportConfiguration.splitHosts("localhost");
-
- Assert.assertNotNull(addresses);
- Assert.assertEquals(1, addresses.length);
- Assert.assertEquals("localhost", addresses[0]);
- }
-
- public void testSplitManyAddresses() throws Exception
- {
- String[] addresses = TransportConfiguration.splitHosts("localhost, 127.0.0.1, 192.168.0.10");
-
- Assert.assertNotNull(addresses);
- Assert.assertEquals(3, addresses.length);
- Assert.assertEquals("localhost", addresses[0]);
- Assert.assertEquals("127.0.0.1", addresses[1]);
- Assert.assertEquals("192.168.0.10", addresses[2]);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2011-06-30 05:53:02 UTC (rev 10903)
+++ trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2011-07-01 12:23:16 UTC (rev 10904)
@@ -83,7 +83,7 @@
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
-
+
public void connectionReadyForWrites(Object connectionID, boolean ready)
{
}
@@ -108,12 +108,40 @@
acceptor.stop();
Assert.assertFalse(acceptor.isStarted());
UnitTestCase.checkFreePort(TransportConstants.DEFAULT_PORT);
-
+
pool1.shutdown();
pool2.shutdown();
-
+
pool1.awaitTermination(1, TimeUnit.SECONDS);
pool2.awaitTermination(1, TimeUnit.SECONDS);
}
+ public void testSplitNullAddress() throws Exception
+ {
+ String[] addresses = NettyAcceptor.splitHosts(null);
+
+ Assert.assertNotNull(addresses);
+ Assert.assertEquals(0, addresses.length);
+ }
+
+ public void testSplitSingleAddress() throws Exception
+ {
+ String[] addresses = NettyAcceptor.splitHosts("localhost");
+
+ Assert.assertNotNull(addresses);
+ Assert.assertEquals(1, addresses.length);
+ Assert.assertEquals("localhost", addresses[0]);
+ }
+
+ public void testSplitManyAddresses() throws Exception
+ {
+ String[] addresses = NettyAcceptor.splitHosts("localhost, 127.0.0.1, 192.168.0.10");
+
+ Assert.assertNotNull(addresses);
+ Assert.assertEquals(3, addresses.length);
+ Assert.assertEquals("localhost", addresses[0]);
+ Assert.assertEquals("127.0.0.1", addresses[1]);
+ Assert.assertEquals("192.168.0.10", addresses[2]);
+ }
+
}
13 years, 5 months