Author: timfox
Date: 2010-01-20 14:45:20 -0500 (Wed, 20 Jan 2010)
New Revision: 8818
Added:
trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java
trunk/src/main/org/hornetq/core/remoting/ProtocolType.java
trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/remoting/Channel.java
trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
factoring remoting layer for multi-protocols
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-20 17:11:18
UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-20 19:45:20
UTC (rev 8818)
@@ -35,6 +35,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.CommandConfirmationHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
@@ -117,7 +118,7 @@
private final Executor executor;
- private volatile RemotingConnection remotingConnection;
+ private volatile CoreRemotingConnection remotingConnection;
private final Set<ClientProducerInternal> producers = new
ConcurrentHashSet<ClientProducerInternal>();
@@ -203,7 +204,7 @@
final int minLargeMessageSize,
final int initialMessagePacketSize,
final String groupID,
- final RemotingConnection remotingConnection,
+ final CoreRemotingConnection remotingConnection,
final int version,
final Channel channel,
final Executor executor) throws HornetQException
@@ -834,7 +835,7 @@
// Needs to be synchronized to prevent issues with occurring concurrently with
close()
- public synchronized void handleFailover(final RemotingConnection backupConnection)
+ public synchronized void handleFailover(final CoreRemotingConnection
backupConnection)
{
if (closed)
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -16,6 +16,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
@@ -52,7 +53,7 @@
void handleReceiveContinuation(long consumerID, SessionReceiveContinuationMessage
continuation) throws Exception;
- void handleFailover(RemotingConnection backupConnection);
+ void handleFailover(CoreRemotingConnection backupConnection);
RemotingConnection getConnection();
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-01-20 17:11:18
UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-01-20 19:45:20
UTC (rev 8818)
@@ -27,6 +27,7 @@
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
@@ -364,7 +365,7 @@
return session.getXAResource();
}
- public void handleFailover(final RemotingConnection backupConnection)
+ public void handleFailover(final CoreRemotingConnection backupConnection)
{
session.handleFailover(backupConnection);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-01-20 17:11:18
UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-01-20 19:45:20
UTC (rev 8818)
@@ -16,7 +16,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
/**
* A ConnectionManager
@@ -52,7 +52,7 @@
void removeSession(final ClientSessionInternal session);
- public RemotingConnection getConnection();
+ public CoreRemotingConnection getConnection();
int numConnections();
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -37,9 +37,10 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
@@ -77,7 +78,7 @@
// debug
- private static Map<TransportConfiguration, Set<RemotingConnection>>
debugConns;
+ private static Map<TransportConfiguration, Set<CoreRemotingConnection>>
debugConns;
private static boolean debug = false;
@@ -85,7 +86,7 @@
{
FailoverManagerImpl.debug = true;
- FailoverManagerImpl.debugConns = new ConcurrentHashMap<TransportConfiguration,
Set<RemotingConnection>>();
+ FailoverManagerImpl.debugConns = new ConcurrentHashMap<TransportConfiguration,
Set<CoreRemotingConnection>>();
}
public static void disableDebug()
@@ -96,9 +97,9 @@
FailoverManagerImpl.debugConns = null;
}
- private void checkAddDebug(final RemotingConnection conn)
+ private void checkAddDebug(final CoreRemotingConnection conn)
{
- Set<RemotingConnection> conns;
+ Set<CoreRemotingConnection> conns;
synchronized (FailoverManagerImpl.debugConns)
{
@@ -106,7 +107,7 @@
if (conns == null)
{
- conns = new HashSet<RemotingConnection>();
+ conns = new HashSet<CoreRemotingConnection>();
FailoverManagerImpl.debugConns.put(connectorConfig, conns);
}
@@ -117,7 +118,7 @@
public static void failAllConnectionsForConnector(final TransportConfiguration
config)
{
- Set<RemotingConnection> conns;
+ Set<CoreRemotingConnection> conns;
synchronized (FailoverManagerImpl.debugConns)
{
@@ -125,13 +126,13 @@
if (conns != null)
{
- conns = new
HashSet<RemotingConnection>(FailoverManagerImpl.debugConns.get(config));
+ conns = new
HashSet<CoreRemotingConnection>(FailoverManagerImpl.debugConns.get(config));
}
}
if (conns != null)
{
- for (RemotingConnection conn : conns)
+ for (CoreRemotingConnection conn : conns)
{
conn.fail(new HornetQException(HornetQException.INTERNAL_ERROR,
"simulated connection failure"));
}
@@ -175,7 +176,7 @@
private final Executor closeExecutor;
- private RemotingConnection connection;
+ private CoreRemotingConnection connection;
private final long retryInterval;
@@ -272,7 +273,7 @@
// ConnectionLifeCycleListener implementation
--------------------------------------------------
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
}
@@ -319,7 +320,7 @@
{
Version clientVersion = VersionLoader.getVersion();
- RemotingConnection theConnection = null;
+ CoreRemotingConnection theConnection = null;
Lock lock = null;
@@ -651,7 +652,7 @@
// So.. do failover / reconnection
- RemotingConnection oldConnection = connection;
+ CoreRemotingConnection oldConnection = connection;
connection = null;
@@ -753,9 +754,9 @@
/*
* Re-attach sessions all pre-existing sessions to the new remoting connection
*/
- private void reconnectSessions(final RemotingConnection oldConnection, final int
reconnectAttempts)
+ private void reconnectSessions(final CoreRemotingConnection oldConnection, final int
reconnectAttempts)
{
- RemotingConnection newConnection = getConnectionWithRetry(reconnectAttempts);
+ CoreRemotingConnection newConnection = getConnectionWithRetry(reconnectAttempts);
if (newConnection == null)
{
@@ -786,7 +787,7 @@
}
}
- private RemotingConnection getConnectionWithRetry(final int reconnectAttempts)
+ private CoreRemotingConnection getConnectionWithRetry(final int reconnectAttempts)
{
long interval = retryInterval;
@@ -799,7 +800,7 @@
return null;
}
- RemotingConnection theConnection = getConnection();
+ CoreRemotingConnection theConnection = getConnection();
if (theConnection == null)
{
@@ -897,7 +898,7 @@
}
}
- public RemotingConnection getConnection()
+ public CoreRemotingConnection getConnection()
{
if (connection == null)
{
@@ -1057,9 +1058,9 @@
private class Channel0Handler implements ChannelHandler
{
- private final RemotingConnection conn;
+ private final CoreRemotingConnection conn;
- private Channel0Handler(final RemotingConnection conn)
+ private Channel0Handler(final CoreRemotingConnection conn)
{
this.conn = conn;
}
@@ -1088,7 +1089,7 @@
{
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
- RemotingConnection theConn = connection;
+ CoreRemotingConnection theConn = connection;
if (theConn != null && connectionID == theConn.getID())
{
Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 17:11:18 UTC (rev
8817)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 19:45:20 UTC (rev
8818)
@@ -17,10 +17,10 @@
import org.hornetq.api.core.HornetQException;
/**
- * A channel is a way of interleaving data meant for different endpoints over the same
{@link org.hornetq.core.remoting.RemotingConnection}.
+ * A channel is a way of interleaving data meant for different endpoints over the same
{@link org.hornetq.core.remoting.CoreRemotingConnection}.
* <p/>
* Any packet sent will have its channel id set to the specific channel sending so it can
be routed to its correct channel
- * when received by the {@link org.hornetq.core.remoting.RemotingConnection}. see {@link
org.hornetq.core.remoting.Packet#setChannelID(long)}.
+ * when received by the {@link org.hornetq.core.remoting.CoreRemotingConnection}. see
{@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
* <p/>
* Each Channel should will forward any packets received to its {@link
org.hornetq.core.remoting.ChannelHandler}.
* <p/>
@@ -81,7 +81,7 @@
*
* @param newConnection the new connection
*/
- void transferConnection(RemotingConnection newConnection);
+ void transferConnection(CoreRemotingConnection newConnection);
/**
* resends any packets that have not received confirmations yet.
@@ -126,7 +126,7 @@
/**
* returns the Remoting Connection being used by the channel
*/
- RemotingConnection getConnection();
+ CoreRemotingConnection getConnection();
/**
* sends a confirmation of a packet being received.
@@ -148,7 +148,7 @@
void flushConfirmations();
/**
- * Called by {@link org.hornetq.core.remoting.RemotingConnection} when a packet is
received.
+ * Called by {@link org.hornetq.core.remoting.CoreRemotingConnection} when a packet is
received.
* <p/>
* This method should then call its {@link org.hornetq.core.remoting.ChannelHandler}
after appropriate processing of
* the packet
Added: trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting;
+
+
+/**
+ * Extension of RemotingConnection for the HornetQ core protocol
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface CoreRemotingConnection extends RemotingConnection
+{
+ /**
+ * return the channel with the channel id specified.
+ * <p/>
+ * If it does not exist create it with the confirmation window size.
+ *
+ * @param channelID the channel id
+ * @param confWindowSize the confirmation window size
+ * @return the channel
+ */
+ Channel getChannel(long channelID, int confWindowSize);
+
+ /**
+ * add the channel with the specified channel id
+ *
+ * @param channelID the channel id
+ * @param channel the channel
+ */
+ void putChannel(long channelID, Channel channel);
+
+ /**
+ * remove the channel with the specified channel id
+ *
+ * @param channelID the channel id
+ * @return true if removed
+ */
+ boolean removeChannel(long channelID);
+
+ /**
+ * generate a unique (within this connection) channel id
+ *
+ * @return the id
+ */
+ long generateChannelID();
+
+ /**
+ * resets the id generator used when generating ids
+ *
+ * @param id the first id to set it to
+ */
+ void syncIDGeneratorSequence(long id);
+
+ /**
+ * return the next id that will be chosen.
+ *
+ * @return the id
+ */
+ long getIDGeneratorSequence();
+
+ /**
+ * return the current timeout for blocking calls
+ *
+ * @return the timeout in milliseconds
+ */
+ long getBlockingCallTimeout();
+
+ /**
+ * return the transfer lock used when transferring connections.
+ *
+ * @return the lock
+ */
+ Object getTransferLock();
+}
Added: trunk/src/main/org/hornetq/core/remoting/ProtocolType.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/ProtocolType.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/ProtocolType.java 2010-01-20 19:45:20 UTC
(rev 8818)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting;
+
+/**
+ * A ProtocolType
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public enum ProtocolType
+{
+ CORE, STOMP, AMQP;
+}
Modified: trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2010-01-20 17:11:18
UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2010-01-20 19:45:20
UTC (rev 8818)
@@ -46,40 +46,6 @@
String getRemoteAddress();
/**
- * return the channel with the channel id specified.
- * <p/>
- * If it does not exist create it with the confirmation window size.
- *
- * @param channelID the channel id
- * @param confWindowSize the confirmation window size
- * @return the channel
- */
- Channel getChannel(long channelID, int confWindowSize);
-
- /**
- * add the channel with the specified channel id
- *
- * @param channelID the channel id
- * @param channel the channel
- */
- void putChannel(long channelID, Channel channel);
-
- /**
- * remove the channel with the specified channel id
- *
- * @param channelID the channel id
- * @return true if removed
- */
- boolean removeChannel(long channelID);
-
- /**
- * generate a unique (within this connection) channel id
- *
- * @return the id
- */
- long generateChannelID();
-
- /**
* add a failure listener.
* <p/>
* The listener will be called in the event of connection failure.
@@ -150,20 +116,6 @@
void destroy();
/**
- * resets the id generator used to when generating id's
- *
- * @param id the first id to set it to
- */
- void syncIDGeneratorSequence(long id);
-
- /**
- * return the next id that will be chosen.
- *
- * @return the id
- */
- long getIDGeneratorSequence();
-
- /**
* return the underlying Connection.
*
* @return the connection
@@ -182,36 +134,23 @@
*
* @return true if destroyed, otherwise false
*/
- boolean isDestroyed();
-
+ boolean isDestroyed();
+
/**
- * return the current tomeout for blocking calls
- *
- * @return the timeout in milliseconds
+ * Disconnect the connection, closing all channels
*/
- long getBlockingCallTimeout();
-
+ void disconnect();
+
/**
- * return the transfer lock used when transferring connections.
- *
- * @return the lock
- */
- Object getTransferLock();
-
- /**
* returns true if any data has been received since the last time this method was
called.
*
* @return true if data has been received.
*/
boolean checkDataReceived();
-
+
/**
- * remove all channels from the remoting connection
+ * flush all outstanding data from the connection.
*/
- void removeAllChannels();
+ void flush();
- /**
- * flush all outstanding confirmations onto the connection.
- */
- void flushConfirmations();
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 17:11:18 UTC
(rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 19:45:20 UTC
(rev 8818)
@@ -25,6 +25,7 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.CommandConfirmationHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
@@ -52,7 +53,7 @@
private volatile int lastConfirmedCommandID = -1;
- private volatile RemotingConnection connection;
+ private volatile CoreRemotingConnection connection;
private volatile boolean closed;
@@ -76,7 +77,7 @@
private volatile boolean transferring;
- public ChannelImpl(final RemotingConnection connection, final long id, final int
confWindowSize)
+ public ChannelImpl(final CoreRemotingConnection connection, final long id, final int
confWindowSize)
{
this.connection = connection;
@@ -316,7 +317,7 @@
closed = true;
}
- public void transferConnection(final RemotingConnection newConnection)
+ public void transferConnection(final CoreRemotingConnection newConnection)
{
// Needs to synchronize on the connection to make sure no packets from
// the old connection get processed after transfer has occurred
@@ -326,7 +327,7 @@
// And switch it
- final RemotingConnectionImpl rnewConnection =
(RemotingConnectionImpl)newConnection;
+ final CoreRemotingConnection rnewConnection =
(CoreRemotingConnection)newConnection;
rnewConnection.putChannel(id, this);
@@ -369,7 +370,7 @@
lock.unlock();
}
- public RemotingConnection getConnection()
+ public CoreRemotingConnection getConnection()
{
return connection;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -26,9 +26,9 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -38,7 +38,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class RemotingConnectionImpl extends AbstractBufferHandler implements
RemotingConnection
+public class RemotingConnectionImpl extends AbstractBufferHandler implements
CoreRemotingConnection
{
// Constants
//
------------------------------------------------------------------------------------
@@ -84,6 +84,8 @@
private volatile boolean dataReceived;
private final Executor executor;
+
+ private volatile boolean executing;
// Constructors
// ---------------------------------------------------------------------------------
@@ -275,7 +277,22 @@
callClosingListeners();
}
+
+ public void disconnect()
+ {
+ Channel channel0 = getChannel(0, -1);
+ // And we remove all channels from the connection, this ensures no more packets
will be processed after this
+ // method is
+ // complete
+
+ removeAllChannels();
+
+ // Now we are 100% sure that no more packets will be processed we can send the
disconnect
+
+ channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ }
+
public long generateChannelID()
{
return idGenerator.generateID();
@@ -325,20 +342,12 @@
return res;
}
- public void removeAllChannels()
+ //We flush any confirmations on the connection - this prevents idle bridges for
example
+ //sitting there with many unacked messages
+ public void flush()
{
- // We get the transfer lock first - this ensures no packets are being processed
AND
- // it's guaranteed no more packets will be processed once this method is
complete
synchronized (transferLock)
{
- channels.clear();
- }
- }
-
- public void flushConfirmations()
- {
- synchronized (transferLock)
- {
for (Channel channel : channels.values())
{
channel.flushConfirmations();
@@ -349,8 +358,6 @@
// Buffer Handler implementation
// ----------------------------------------------------
- private volatile boolean executing;
-
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
@@ -434,6 +441,15 @@
// Private
//
--------------------------------------------------------------------------------------
+ private void removeAllChannels()
+ {
+ // We get the transfer lock first - this ensures no packets are being processed
AND
+ // it's guaranteed no more packets will be processed once this method is
complete
+ synchronized (transferLock)
+ {
+ channels.clear();
+ }
+ }
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new
ArrayList<FailureListener>(failureListeners);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.remoting.Acceptor;
@@ -215,14 +216,14 @@
this.connector = connector;
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
if (connections.putIfAbsent((String)connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id
" + connection.getID());
}
- listener.connectionCreated(connection);
+ listener.connectionCreated(connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -17,8 +17,8 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -70,7 +70,7 @@
this.executor = executor;
- listener.connectionCreated(this);
+ listener.connectionCreated(this, ProtocolType.CORE);
}
private volatile boolean closing;
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -174,14 +175,14 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
if (connections.putIfAbsent((String)connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id
" + connection.getID());
}
- listener.connectionCreated(connection);
+ listener.connectionCreated(connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Added: trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.core.remoting.RemotingConnection;
+
+/**
+ * A ConnectionEntry
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class ConnectionEntry
+{
+ public final RemotingConnection connection;
+
+ public volatile long lastCheck;
+
+ public volatile long ttl;
+
+ public ConnectionEntry(final RemotingConnection connection, final long lastCheck,
final long ttl)
+ {
+ this.connection = connection;
+
+ this.lastCheck = lastCheck;
+
+ this.ttl = ttl;
+ }
+}
Added: trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * Creates the {@link ConnectionEntry} for a newly created transport {@link Connection}.
+ * RemotingServiceImpl keeps one implementation per ProtocolType and calls it on connect.
+ *
+ * @author Tim Fox
+ *
+ */
+public interface ProtocolManager
+{
+ ConnectionEntry createConnectionEntry(Connection connection);
+
+}
Added: trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java
(rev 0)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.ConnectionEntry;
+import org.hornetq.core.remoting.server.ProtocolManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQPacketHandler;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * {@link ProtocolManager} for the core protocol; RemotingServiceImpl registers it under
+ * ProtocolType.CORE. Wraps each transport connection in a {@link RemotingConnectionImpl}.
+ *
+ * @author Tim Fox
+ *
+ */
+public class CoreProtocolManager implements ProtocolManager
+{
+ private final HornetQServer server;
+
+ private final List<Interceptor> interceptors;
+
+ public CoreProtocolManager(final HornetQServer server,
+ final List<Interceptor> interceptors)
+ {
+ this.server = server;
+
+ this.interceptors = interceptors;
+ }
+ // Builds the entry: core connection, channel 1 handler, initial TTL, channel 0 ping responder.
+ public ConnectionEntry createConnectionEntry(final Connection connection)
+ {
+ final Configuration config = server.getConfiguration();
+ // An executor is passed only when async connection execution is enabled; otherwise null.
+ CoreRemotingConnection rc = new RemotingConnectionImpl(connection,
+ interceptors,
+
config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory().getExecutor()
+
: null);
+ // Channel 1 is handled by a HornetQPacketHandler.
+ Channel channel1 = rc.getChannel(1, -1);
+
+ ChannelHandler handler = new HornetQPacketHandler(server, channel1, rc);
+
+ channel1.setHandler(handler);
+ // Start from the client default TTL; a configured override (anything but -1) replaces it.
+ long ttl = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+ if (config.getConnectionTTLOverride() != -1)
+ {
+ ttl = config.getConnectionTTLOverride();
+ }
+
+ final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(),
ttl);
+ // Channel 0 only reacts to PING packets; every other packet type is ignored by this handler.
+ final Channel channel0 = rc.getChannel(0, -1);
+
+ channel0.setHandler(new ChannelHandler()
+ {
+ public void handlePacket(final Packet packet)
+ {
+ if (packet.getType() == PacketImpl.PING)
+ {
+ Ping ping = (Ping)packet;
+
+ if (config.getConnectionTTLOverride() == -1)
+ {
+ // Allow clients to specify connection ttl
+ entry.ttl = ping.getConnectionTTL();
+ }
+
+ // Just send a ping back
+ channel0.send(packet);
+ }
+ }
+ });
+
+ return entry;
+ }
+}
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20
17:11:18 UTC (rev 8817)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -26,20 +26,15 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
-import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.ConnectionEntry;
+import org.hornetq.core.remoting.server.ProtocolManager;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQPacketHandler;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
@@ -89,7 +84,10 @@
private final ScheduledExecutorService scheduledThreadPool;
private FailureCheckAndFlushThread failureCheckAndFlushThread;
-
+
+ private Map<ProtocolType, ProtocolManager> protocolMap =
+ new ConcurrentHashMap<ProtocolType, ProtocolManager>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -122,6 +120,8 @@
this.managementService = managementService;
this.threadPool = threadPool;
this.scheduledThreadPool = scheduledThreadPool;
+
+ this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManager(server,
interceptors));
}
// RemotingService implementation -------------------------------
@@ -170,6 +170,7 @@
if (managementService != null)
{
acceptor.setNotificationService(managementService);
+
managementService.registerAcceptor(acceptor, info);
}
}
@@ -235,17 +236,7 @@
{
RemotingConnection conn = entry.connection;
- Channel channel0 = conn.getChannel(0, -1);
-
- // And we remove all channels from the connection, this ensures no more packets
will be processed after this
- // method is
- // complete
-
- conn.removeAllChannels();
-
- // Now we are 100% sure that no more packets will be processed we can send the
disconnect
-
- channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ conn.disconnect();
}
for (Acceptor acceptor : acceptors)
@@ -304,61 +295,32 @@
// ConnectionLifeCycleListener implementation -----------------------------------
- public void connectionCreated(final Connection connection)
+ private ProtocolManager getProtocolManager(ProtocolType protocol)
{
+ return protocolMap.get(protocol);
+ }
+
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ {
if (server == null)
{
throw new IllegalStateException("Unable to create connection, server
hasn't finished starting up");
}
-
- RemotingConnection rc = new RemotingConnectionImpl(connection,
- interceptors,
-
config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
-
.getExecutor()
-
: null);
-
- Channel channel1 = rc.getChannel(1, -1);
-
- ChannelHandler handler = createHandler(rc, channel1);
-
- channel1.setHandler(handler);
-
- long ttl = HornetQClient.DEFAULT_CONNECTION_TTL;
-
- if (config.getConnectionTTLOverride() != -1)
+
+ ProtocolManager pmgr = this.getProtocolManager(protocol);
+
+ if (pmgr == null)
{
- ttl = config.getConnectionTTLOverride();
+ throw new IllegalArgumentException("Unknown protocol " + protocol);
}
- final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(),
ttl);
-
+ ConnectionEntry entry = pmgr.createConnectionEntry(connection);
+
connections.put(connection.getID(), entry);
- final Channel channel0 = rc.getChannel(0, -1);
-
- channel0.setHandler(new ChannelHandler()
- {
- public void handlePacket(final Packet packet)
- {
- if (packet.getType() == PacketImpl.PING)
- {
- Ping ping = (Ping)packet;
-
- if (config.getConnectionTTLOverride() == -1)
- {
- // Allow clients to specify connection ttl
- entry.ttl = ping.getConnectionTTL();
- }
-
- // Just send a ping back
- channel0.send(packet);
- }
- }
- });
-
if (config.isBackup())
{
- serverSideReplicatingConnection = rc;
+ serverSideReplicatingConnection = entry.connection;
}
}
@@ -409,14 +371,6 @@
// Protected -----------------------------------------------------
- /**
- * Subclasses (on tests) may use this to create a different channel.
- */
- protected ChannelHandler createHandler(final RemotingConnection rc, final Channel
channel)
- {
- return new HornetQPacketHandler(server, channel, rc);
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -434,24 +388,6 @@
}
}
- private static final class ConnectionEntry
- {
- final RemotingConnection connection;
-
- volatile long lastCheck;
-
- volatile long ttl;
-
- ConnectionEntry(final RemotingConnection connection, final long lastCheck, final
long ttl)
- {
- this.connection = connection;
-
- this.lastCheck = lastCheck;
-
- this.ttl = ttl;
- }
- }
-
private final class FailureCheckAndFlushThread extends Thread
{
private final long pauseInterval;
@@ -516,11 +452,8 @@
}
if (flush)
- {
- //We flush any confirmations on the connection - this prevents idle
bridges for example
- //sitting there with many unacked messages
-
- conn.flushConfirmations();
+ {
+ conn.flush();
}
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2010-01-20
17:11:18 UTC (rev 8817)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -31,8 +31,8 @@
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
@@ -69,7 +69,7 @@
private final FailoverManager failoverManager;
- private RemotingConnection replicatingConnection;
+ private CoreRemotingConnection replicatingConnection;
private Channel replicatingChannel;
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-20 17:11:18 UTC (rev
8817)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-20 19:45:20 UTC (rev
8818)
@@ -24,7 +24,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.server.RemotingService;
@@ -70,7 +70,7 @@
void unregisterActivateCallback(ActivateCallback callback);
- ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String
name, int lastConfirmedCommandID) throws Exception;
+ ReattachSessionResponseMessage reattachSession(CoreRemotingConnection connection,
String name, int lastConfirmedCommandID) throws Exception;
/** The journal at the backup server has to be equivalent as the journal used on the
live node.
* Or else the backup node is out of sync. */
@@ -82,7 +82,7 @@
String password,
int minLargeMessageSize,
int incrementingVersion,
- RemotingConnection remotingConnection,
+ CoreRemotingConnection remotingConnection,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-20 17:11:18 UTC (rev
8817)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-20 19:45:20 UTC (rev
8818)
@@ -19,7 +19,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
/**
*
@@ -109,7 +109,7 @@
void close() throws Exception;
- int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
+ int transferConnection(CoreRemotingConnection newConnection, int
lastReceivedCommandID);
Channel getChannel();
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -22,8 +22,8 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
@@ -48,9 +48,9 @@
private final Channel channel1;
- private final RemotingConnection connection;
+ private final CoreRemotingConnection connection;
- public HornetQPacketHandler(final HornetQServer server, final Channel channel1, final
RemotingConnection connection)
+ public HornetQPacketHandler(final HornetQServer server, final Channel channel1, final
CoreRemotingConnection connection)
{
this.server = server;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-20 17:11:18
UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-20 19:45:20
UTC (rev 8818)
@@ -69,7 +69,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.server.RemotingService;
@@ -536,7 +536,7 @@
return clusterManager;
}
- public ReattachSessionResponseMessage reattachSession(final RemotingConnection
connection,
+ public ReattachSessionResponseMessage reattachSession(final CoreRemotingConnection
connection,
final String name,
final int
lastConfirmedCommandID) throws Exception
{
@@ -593,7 +593,7 @@
final String password,
final int minLargeMessageSize,
final int incrementingVersion,
- final RemotingConnection
connection,
+ final CoreRemotingConnection
connection,
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-20 17:11:18
UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-20 19:45:20
UTC (rev 8818)
@@ -44,8 +44,8 @@
import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.server.BindingQueryResult;
@@ -100,7 +100,7 @@
private final boolean strictUpdateDeliveryCount;
- private RemotingConnection remotingConnection;
+ private CoreRemotingConnection remotingConnection;
private Channel channel;
@@ -152,7 +152,7 @@
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
final boolean xa,
- final RemotingConnection remotingConnection,
+ final CoreRemotingConnection remotingConnection,
final Channel channel,
final StorageManager storageManager,
final PostOffice postOffice,
@@ -1037,7 +1037,7 @@
}
}
- public int transferConnection(final RemotingConnection newConnection, final int
lastReceivedCommandID)
+ public int transferConnection(final CoreRemotingConnection newConnection, final int
lastReceivedCommandID)
{
// We need to disable delivery on all the consumers while the transfer is
occurring- otherwise packets might get
// delivered
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -59,9 +59,9 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
@@ -126,7 +126,7 @@
private final Channel channel;
- private volatile RemotingConnection remotingConnection;
+ private volatile CoreRemotingConnection remotingConnection;
public ServerSessionPacketHandler(final ServerSession session,
final OperationContext sessionContext,
@@ -205,45 +205,8 @@
return channel;
}
- public int transferConnection(final RemotingConnection newConnection, final int
lastReceivedCommandID)
- {
- // We need to disable delivery on all the consumers while the transfer is
occurring- otherwise packets might get
- // delivered
- // after the channel has transferred but *before* packets have been replayed - this
will give the client the wrong
- // sequence of packets.
- // It is not sufficient to just stop the session, since right after stopping the
session, another session start
- // might be executed
- // before we have transferred the connection, leaving it in a started state
- session.setTransferring(true);
+
- removeConnectionListeners();
-
- // Note. We do not destroy the replicating connection here. In the case the live
server has really crashed
- // then the connection will get cleaned up anyway when the server ping timeout
kicks in.
- // In the case the live server is really still up, i.e. a split brain situation (or
in tests), then closing
- // the replicating connection will cause the outstanding responses to be be
replayed on the live server,
- // if these reach the client who then subsequently fails over, on reconnection to
backup, it will have
- // received responses that the backup did not know about.
-
- channel.transferConnection(newConnection);
-
-
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
- remotingConnection = newConnection;
-
- addConnectionListeners();
-
- int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
-
- channel.replayCommands(lastReceivedCommandID);
-
- channel.setTransferring(false);
-
- session.setTransferring(false);
-
- return serverLastReceivedCommandID;
- }
-
public void handlePacket(final Packet packet)
{
byte type = packet.getType();
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
@@ -513,14 +514,14 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
if (connections.putIfAbsent(connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id
" + connection.getID());
}
- listener.connectionCreated(connection);
+ listener.connectionCreated(connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-20
17:11:18 UTC (rev 8817)
+++
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -55,7 +56,7 @@
this.listener = listener;
- listener.connectionCreated(this);
+ listener.connectionCreated(this, ProtocolType.CORE);
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -30,6 +30,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -598,7 +599,7 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
if (connections.putIfAbsent(connection.getID(), connection) != null)
{
Modified: trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
===================================================================
---
trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2010-01-20
17:11:18 UTC (rev 8817)
+++
trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -13,6 +13,7 @@
package org.hornetq.spi.core.remoting;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.ProtocolType;
/**
* A ConnectionLifeCycleListener is called by the remoting implementation to notify of
connection events.
@@ -26,7 +27,7 @@
*
* @param connection the connection that has been created
*/
- void connectionCreated(Connection connection);
+ void connectionCreated(Connection connection, ProtocolType protocol);
/**
* called when a connection is destroyed.
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2010-01-20
17:11:18 UTC (rev 8817)
+++
trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -25,8 +25,8 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.client.impl.FailoverManagerImpl;
import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
@@ -51,7 +51,7 @@
private HornetQServer server;
- private RemotingConnection connection;
+ private CoreRemotingConnection connection;
// Static --------------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -25,6 +25,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyConnector;
@@ -549,7 +550,7 @@
latch = connCreatedLatch;
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
this.connection = connection;
if (latch != null)
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2010-01-20
17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -22,13 +22,17 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.FailoverManagerImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
@@ -345,14 +349,14 @@
session.addFailureListener(clientListener);
- RemotingConnection serverConn = null;
+ CoreRemotingConnection serverConn = null;
while (serverConn == null)
{
Set<RemotingConnection> conns =
server.getRemotingService().getConnections();
if (!conns.isEmpty())
{
- serverConn = server.getRemotingService().getConnections().iterator().next();
+ serverConn =
(CoreRemotingConnection)server.getRemotingService().getConnections().iterator().next();
}
else
{
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-20
17:11:18 UTC (rev 8817)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
@@ -63,7 +64,7 @@
{
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
}
};
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-20
17:11:18 UTC (rev 8817)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.TransportConstants;
@@ -77,7 +78,7 @@
{
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
}
};
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-01-20
17:11:18 UTC (rev 8817)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.integration.transports.netty.NettyConnection;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -219,7 +220,7 @@
class MyListener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-20
17:11:18 UTC (rev 8817)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-20
19:45:20 UTC (rev 8818)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyConnector;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -66,7 +67,7 @@
{
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
}
};
@@ -103,7 +104,7 @@
{
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType
protocol)
{
}
};