Author: borges
Date: 2011-07-01 08:23:52 -0400 (Fri, 01 Jul 2011)
New Revision: 10905
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
Log:
Place standard channel numbers into constants (better than in a code comment)
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-07-01 12:23:16 UTC (rev 10904)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-07-01 12:23:52 UTC (rev 10905)
@@ -37,9 +37,33 @@
*/
public class ChannelImpl implements Channel
{
+ public enum CHANNEL_ID {
+
+ /**
+ * Used for core protocol management.
+ * @see CoreProtocolManager
+ */
+ PING(0),
+ /** Session creation and attachment. */
+ SESSION(1),
+ /** Replication, i.e. for backups that do not share the journal. */
+ REPLICATION(2),
+ /**
+ * Channels [0-9] are reserved for the system, user channels must be greater than that.
+ */
+ USER(10);
+
+ public final long id;
+
+ CHANNEL_ID(long id)
+ {
+ this.id = id;
+ }
+ }
+
private static final Logger log = Logger.getLogger(ChannelImpl.class);
- private volatile long id;
+ private final long id;
private ChannelHandler handler;
@@ -336,7 +360,7 @@
// And switch it
- final CoreRemotingConnection rnewConnection = (CoreRemotingConnection)newConnection;
+ final CoreRemotingConnection rnewConnection = newConnection;
rnewConnection.putChannel(id, this);
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-07-01 12:23:16 UTC (rev 10904)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-07-01 12:23:52 UTC (rev 10905)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
@@ -71,11 +72,7 @@
private final boolean client;
- // Channels 0-9 are reserved for the system
- // 0 is for pinging
- // 1 is for session creation and attachment
- // 2 is for replication
- private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(10);
+ private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(CHANNEL_ID.USER.id);
private boolean idGeneratorSynced = false;
@@ -88,13 +85,13 @@
private volatile boolean dataReceived;
private final Executor executor;
-
+
private volatile boolean executing;
-
+
private final SimpleString nodeID;
private final long creationTime;
-
+
private String clientID;
// Constructors
@@ -139,9 +136,9 @@
this.client = client;
this.executor = executor;
-
+
this.nodeID = nodeID;
-
+
this.creationTime = System.currentTimeMillis();
}
@@ -174,7 +171,7 @@
{
return transportConnection.getRemoteAddress();
}
-
+
public long getCreationTime()
{
return creationTime;
@@ -247,26 +244,26 @@
public List<CloseListener> removeCloseListeners()
{
List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners);
-
+
closeListeners.clear();
-
+
return ret;
}
public List<FailureListener> removeFailureListeners()
{
List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners);
-
+
failureListeners.clear();
-
- return ret;
+
+ return ret;
}
public void setCloseListeners(List<CloseListener> listeners)
{
closeListeners.clear();
-
- closeListeners.addAll(listeners);
+
+ closeListeners.addAll(listeners);
}
public HornetQBuffer createBuffer(final int size)
@@ -323,7 +320,7 @@
callClosingListeners();
}
-
+
public void disconnect()
{
Channel channel0 = getChannel(0, -1);
@@ -331,13 +328,13 @@
// And we remove all channels from the connection, this ensures no more packets will be processed after this
// method is
// complete
-
+
Set<Channel> allChannels = new HashSet<Channel>(channels.values());
removeAllChannels();
// Now we are 100% sure that no more packets will be processed we can flush then send the disconnect
-
+
for (Channel channel: allChannels)
{
channel.flushConfirmations();
@@ -408,7 +405,7 @@
}
}
}
-
+
public void checkFlushBatchBuffer()
{
transportConnection.checkFlushBatchBuffer();
@@ -422,11 +419,11 @@
try
{
final Packet packet = decoder.decode(buffer);
-
+
if (packet.isAsyncExec() && executor != null)
{
executing = true;
-
+
executor.execute(new Runnable()
{
public void run()
@@ -439,7 +436,7 @@
{
RemotingConnectionImpl.log.error("Unexpected error", t);
}
-
+
executing = false;
}
});
@@ -451,13 +448,13 @@
{
Thread.yield();
}
-
+
// Pings must always be handled out of band so we can send pings back to the client quickly
// otherwise they would get in the queue with everything else which might give an intolerable delay
doBufferReceived(packet);
}
-
- dataReceived = true;
+
+ dataReceived = true;
}
catch (Exception e)
{
@@ -515,8 +512,8 @@
{
channels.clear();
}
- }
-
+ }
+
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
@@ -572,7 +569,7 @@
{
clientID = cID;
}
-
+
public String getClientID()
{
return clientID;