JBoss hornetq SVN: r8820 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-21 05:29:24 -0500 (Thu, 21 Jan 2010)
New Revision: 8820
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
Log:
removed unused import that prevented the project from building
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-21 09:02:52 UTC (rev 8819)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-21 10:29:24 UTC (rev 8820)
@@ -46,7 +46,6 @@
import java.util.List;
-import javax.security.auth.message.callback.PrivateKeyCallback.Request;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
15 years, 11 months
JBoss hornetq SVN: r8819 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-21 04:02:52 -0500 (Thu, 21 Jan 2010)
New Revision: 8819
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
Log:
fixed security test
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20 19:45:20 UTC (rev 8818)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-21 09:02:52 UTC (rev 8819)
@@ -46,6 +46,7 @@
import java.util.List;
+import javax.security.auth.message.callback.PrivateKeyCallback.Request;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -204,9 +205,7 @@
{
return channel;
}
-
-
public void handlePacket(final Packet packet)
{
byte type = packet.getType();
@@ -216,7 +215,8 @@
Packet response = null;
boolean flush = false;
boolean closeChannel = false;
-
+ boolean requiresResponse = false;
+
try
{
try
@@ -224,13 +224,14 @@
switch (type)
{
case SESS_CREATECONSUMER:
- {
- SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
+ {
+ SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
+ requiresResponse = request.isRequiresResponse();
session.createConsumer(request.getID(),
request.getQueueName(),
request.getFilterString(),
request.isBrowseOnly());
- if (request.isRequiresResponse())
+ if (requiresResponse)
{
// We send back queue information on the queue as a response- this allows the queue to
// be automatically recreated on failover
@@ -242,19 +243,21 @@
case CREATE_QUEUE:
{
CreateQueueMessage request = (CreateQueueMessage)packet;
+ requiresResponse = request.isRequiresResponse();
session.createQueue(request.getAddress(),
request.getQueueName(),
request.getFilterString(),
request.isTemporary(),
request.isDurable());
- if (request.isRequiresResponse())
+ if (requiresResponse)
{
response = new NullResponseMessage();
}
break;
}
case DELETE_QUEUE:
- {
+ {
+ requiresResponse = true;
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
session.deleteQueue(request.getQueueName());
response = new NullResponseMessage();
@@ -262,6 +265,7 @@
}
case SESS_QUEUEQUERY:
{
+ requiresResponse = true;
SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
response = new SessionQueueQueryResponseMessage(result);
@@ -269,6 +273,7 @@
}
case SESS_BINDINGQUERY:
{
+ requiresResponse = true;
SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
BindingQueryResult result = session.executeBindingQuery(request.getAddress());
response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
@@ -277,8 +282,9 @@
case SESS_ACKNOWLEDGE:
{
SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
+ requiresResponse = message.isRequiresResponse();
session.acknowledge(message.getConsumerID(), message.getMessageID());
- if (message.isRequiresResponse())
+ if (requiresResponse)
{
response = new NullResponseMessage();
}
@@ -292,18 +298,21 @@
}
case SESS_COMMIT:
{
+ requiresResponse = true;
session.commit();
response = new NullResponseMessage();
break;
}
case SESS_ROLLBACK:
{
+ requiresResponse = true;
session.rollback(((RollbackMessage)packet).isConsiderLastMessageAsDelivered());
response = new NullResponseMessage();
break;
}
case SESS_XA_COMMIT:
{
+ requiresResponse = true;
SessionXACommitMessage message = (SessionXACommitMessage)packet;
session.xaCommit(message.getXid(), message.isOnePhase());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
@@ -311,6 +320,7 @@
}
case SESS_XA_END:
{
+ requiresResponse = true;
SessionXAEndMessage message = (SessionXAEndMessage)packet;
session.xaEnd(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
@@ -318,6 +328,7 @@
}
case SESS_XA_FORGET:
{
+ requiresResponse = true;
SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
session.xaForget(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
@@ -325,6 +336,7 @@
}
case SESS_XA_JOIN:
{
+ requiresResponse = true;
SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
session.xaJoin(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
@@ -332,6 +344,7 @@
}
case SESS_XA_RESUME:
{
+ requiresResponse = true;
SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
session.xaResume(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
@@ -339,6 +352,7 @@
}
case SESS_XA_ROLLBACK:
{
+ requiresResponse = true;
SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
session.xaRollback(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
@@ -346,6 +360,7 @@
}
case SESS_XA_START:
{
+ requiresResponse = true;
SessionXAStartMessage message = (SessionXAStartMessage)packet;
session.xaStart(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
@@ -353,12 +368,14 @@
}
case SESS_XA_SUSPEND:
{
+ requiresResponse = true;
session.xaSuspend();
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
break;
}
case SESS_XA_PREPARE:
{
+ requiresResponse = true;
SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
session.xaPrepare(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
@@ -366,18 +383,21 @@
}
case SESS_XA_INDOUBT_XIDS:
{
+ requiresResponse = true;
List<Xid> xids = session.xaGetInDoubtXids();
response = new SessionXAGetInDoubtXidsResponseMessage(xids);
break;
}
case SESS_XA_GET_TIMEOUT:
{
+ requiresResponse = true;
int timeout = session.xaGetTimeout();
response = new SessionXAGetTimeoutResponseMessage(timeout);
break;
}
case SESS_XA_SET_TIMEOUT:
{
+ requiresResponse = true;
SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
session.xaSetTimeout(message.getTimeoutSeconds());
response = new SessionXASetTimeoutResponseMessage(true);
@@ -390,12 +410,14 @@
}
case SESS_STOP:
{
+ requiresResponse = true;
session.stop();
response = new NullResponseMessage();
break;
}
case SESS_CLOSE:
{
+ requiresResponse = true;
session.close();
removeConnectionListeners();
response = new NullResponseMessage();
@@ -405,6 +427,7 @@
}
case SESS_CONSUMER_CLOSE:
{
+ requiresResponse = true;
SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
session.closeConsumer(message.getConsumerID());
response = new NullResponseMessage();
@@ -419,8 +442,9 @@
case SESS_SEND:
{
SessionSendMessage message = (SessionSendMessage)packet;
+ requiresResponse = message.isRequiresResponse();
session.send((ServerMessage)message.getMessage());
- if (message.isRequiresResponse())
+ if (requiresResponse)
{
response = new NullResponseMessage();
}
@@ -435,8 +459,9 @@
case SESS_SEND_CONTINUATION:
{
SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
+ requiresResponse = message.isRequiresResponse();
session.sendContinuations(message.getPacketSize(), message.getBody(), message.isContinues());
- if (message.isRequiresResponse())
+ if (requiresResponse)
{
response = new NullResponseMessage();
}
@@ -458,11 +483,25 @@
}
catch (HornetQXAException e)
{
- response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
+ if (requiresResponse)
+ {
+ response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
+ }
+ else
+ {
+ log.error("Caught XA exception", e);
+ }
}
catch (HornetQException e)
{
- response = new HornetQExceptionMessage((HornetQException)e);
+ if (requiresResponse)
+ {
+ response = new HornetQExceptionMessage((HornetQException)e);
+ }
+ else
+ {
+ log.error("Caught exception", e);
+ }
}
catch (Throwable t)
{
15 years, 11 months
JBoss hornetq SVN: r8818 - in trunk: src/main/org/hornetq/core/remoting and 13 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-20 14:45:20 -0500 (Wed, 20 Jan 2010)
New Revision: 8818
Added:
trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java
trunk/src/main/org/hornetq/core/remoting/ProtocolType.java
trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/remoting/Channel.java
trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
factoring remoting layer for multi-protocols
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -35,6 +35,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.CommandConfirmationHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
@@ -117,7 +118,7 @@
private final Executor executor;
- private volatile RemotingConnection remotingConnection;
+ private volatile CoreRemotingConnection remotingConnection;
private final Set<ClientProducerInternal> producers = new ConcurrentHashSet<ClientProducerInternal>();
@@ -203,7 +204,7 @@
final int minLargeMessageSize,
final int initialMessagePacketSize,
final String groupID,
- final RemotingConnection remotingConnection,
+ final CoreRemotingConnection remotingConnection,
final int version,
final Channel channel,
final Executor executor) throws HornetQException
@@ -834,7 +835,7 @@
// Needs to be synchronized to prevent issues with occurring concurrently with close()
- public synchronized void handleFailover(final RemotingConnection backupConnection)
+ public synchronized void handleFailover(final CoreRemotingConnection backupConnection)
{
if (closed)
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -16,6 +16,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
@@ -52,7 +53,7 @@
void handleReceiveContinuation(long consumerID, SessionReceiveContinuationMessage continuation) throws Exception;
- void handleFailover(RemotingConnection backupConnection);
+ void handleFailover(CoreRemotingConnection backupConnection);
RemotingConnection getConnection();
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -27,6 +27,7 @@
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
@@ -364,7 +365,7 @@
return session.getXAResource();
}
- public void handleFailover(final RemotingConnection backupConnection)
+ public void handleFailover(final CoreRemotingConnection backupConnection)
{
session.handleFailover(backupConnection);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -16,7 +16,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.SessionFailureListener;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
/**
* A ConnectionManager
@@ -52,7 +52,7 @@
void removeSession(final ClientSessionInternal session);
- public RemotingConnection getConnection();
+ public CoreRemotingConnection getConnection();
int numConnections();
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -37,9 +37,10 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
@@ -77,7 +78,7 @@
// debug
- private static Map<TransportConfiguration, Set<RemotingConnection>> debugConns;
+ private static Map<TransportConfiguration, Set<CoreRemotingConnection>> debugConns;
private static boolean debug = false;
@@ -85,7 +86,7 @@
{
FailoverManagerImpl.debug = true;
- FailoverManagerImpl.debugConns = new ConcurrentHashMap<TransportConfiguration, Set<RemotingConnection>>();
+ FailoverManagerImpl.debugConns = new ConcurrentHashMap<TransportConfiguration, Set<CoreRemotingConnection>>();
}
public static void disableDebug()
@@ -96,9 +97,9 @@
FailoverManagerImpl.debugConns = null;
}
- private void checkAddDebug(final RemotingConnection conn)
+ private void checkAddDebug(final CoreRemotingConnection conn)
{
- Set<RemotingConnection> conns;
+ Set<CoreRemotingConnection> conns;
synchronized (FailoverManagerImpl.debugConns)
{
@@ -106,7 +107,7 @@
if (conns == null)
{
- conns = new HashSet<RemotingConnection>();
+ conns = new HashSet<CoreRemotingConnection>();
FailoverManagerImpl.debugConns.put(connectorConfig, conns);
}
@@ -117,7 +118,7 @@
public static void failAllConnectionsForConnector(final TransportConfiguration config)
{
- Set<RemotingConnection> conns;
+ Set<CoreRemotingConnection> conns;
synchronized (FailoverManagerImpl.debugConns)
{
@@ -125,13 +126,13 @@
if (conns != null)
{
- conns = new HashSet<RemotingConnection>(FailoverManagerImpl.debugConns.get(config));
+ conns = new HashSet<CoreRemotingConnection>(FailoverManagerImpl.debugConns.get(config));
}
}
if (conns != null)
{
- for (RemotingConnection conn : conns)
+ for (CoreRemotingConnection conn : conns)
{
conn.fail(new HornetQException(HornetQException.INTERNAL_ERROR, "simulated connection failure"));
}
@@ -175,7 +176,7 @@
private final Executor closeExecutor;
- private RemotingConnection connection;
+ private CoreRemotingConnection connection;
private final long retryInterval;
@@ -272,7 +273,7 @@
// ConnectionLifeCycleListener implementation --------------------------------------------------
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
@@ -319,7 +320,7 @@
{
Version clientVersion = VersionLoader.getVersion();
- RemotingConnection theConnection = null;
+ CoreRemotingConnection theConnection = null;
Lock lock = null;
@@ -651,7 +652,7 @@
// So.. do failover / reconnection
- RemotingConnection oldConnection = connection;
+ CoreRemotingConnection oldConnection = connection;
connection = null;
@@ -753,9 +754,9 @@
/*
* Re-attach sessions all pre-existing sessions to the new remoting connection
*/
- private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts)
+ private void reconnectSessions(final CoreRemotingConnection oldConnection, final int reconnectAttempts)
{
- RemotingConnection newConnection = getConnectionWithRetry(reconnectAttempts);
+ CoreRemotingConnection newConnection = getConnectionWithRetry(reconnectAttempts);
if (newConnection == null)
{
@@ -786,7 +787,7 @@
}
}
- private RemotingConnection getConnectionWithRetry(final int reconnectAttempts)
+ private CoreRemotingConnection getConnectionWithRetry(final int reconnectAttempts)
{
long interval = retryInterval;
@@ -799,7 +800,7 @@
return null;
}
- RemotingConnection theConnection = getConnection();
+ CoreRemotingConnection theConnection = getConnection();
if (theConnection == null)
{
@@ -897,7 +898,7 @@
}
}
- public RemotingConnection getConnection()
+ public CoreRemotingConnection getConnection()
{
if (connection == null)
{
@@ -1057,9 +1058,9 @@
private class Channel0Handler implements ChannelHandler
{
- private final RemotingConnection conn;
+ private final CoreRemotingConnection conn;
- private Channel0Handler(final RemotingConnection conn)
+ private Channel0Handler(final CoreRemotingConnection conn)
{
this.conn = conn;
}
@@ -1088,7 +1089,7 @@
{
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
- RemotingConnection theConn = connection;
+ CoreRemotingConnection theConn = connection;
if (theConn != null && connectionID == theConn.getID())
{
Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -17,10 +17,10 @@
import org.hornetq.api.core.HornetQException;
/**
- * A channel is a way of interleaving data meant for different endpoints over the same {@link org.hornetq.core.remoting.RemotingConnection}.
+ * A channel is a way of interleaving data meant for different endpoints over the same {@link org.hornetq.core.remoting.CoreRemotingConnection}.
* <p/>
* Any packet sent will have its channel id set to the specific channel sending so it can be routed to its correct channel
- * when received by the {@link org.hornetq.core.remoting.RemotingConnection}. see {@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
+ * when received by the {@link org.hornetq.core.remoting.CoreRemotingConnection}. see {@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
* <p/>
* Each Channel will forward any packets received to its {@link org.hornetq.core.remoting.ChannelHandler}.
* <p/>
@@ -81,7 +81,7 @@
*
* @param newConnection the new connection
*/
- void transferConnection(RemotingConnection newConnection);
+ void transferConnection(CoreRemotingConnection newConnection);
/**
* resends any packets that have not received confirmations yet.
@@ -126,7 +126,7 @@
/**
* returns the Remoting Connection being used by the channel
*/
- RemotingConnection getConnection();
+ CoreRemotingConnection getConnection();
/**
* sends a confirmation of a packet being received.
@@ -148,7 +148,7 @@
void flushConfirmations();
/**
- * Called by {@link org.hornetq.core.remoting.RemotingConnection} when a packet is received.
+ * Called by {@link org.hornetq.core.remoting.CoreRemotingConnection} when a packet is received.
* <p/>
* This method should then call its {@link org.hornetq.core.remoting.ChannelHandler} after appropriate processing of
* the packet
Added: trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/CoreRemotingConnection.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting;
+
+
+/**
+ * Extension of {@link RemotingConnection} for the HornetQ core protocol.
+ * <p/>
+ * Declares the channel, id-generator, blocking-call timeout and transfer-lock operations.
+ *
+ * @author Tim Fox
+ */
+public interface CoreRemotingConnection extends RemotingConnection
+{
+ /**
+ * Returns the channel with the specified channel id.
+ * <p/>
+ * If it does not exist, it is created with the given confirmation window size.
+ *
+ * @param channelID the channel id
+ * @param confWindowSize the confirmation window size
+ * @return the channel
+ */
+ Channel getChannel(long channelID, int confWindowSize);
+
+ /**
+ * Adds the channel under the specified channel id.
+ *
+ * @param channelID the channel id
+ * @param channel the channel
+ */
+ void putChannel(long channelID, Channel channel);
+
+ /**
+ * Removes the channel with the specified channel id.
+ *
+ * @param channelID the channel id
+ * @return true if removed
+ */
+ boolean removeChannel(long channelID);
+
+ /**
+ * Generates a unique (within this connection) channel id.
+ *
+ * @return the id
+ */
+ long generateChannelID();
+
+ /**
+ * Resets the id generator that is used when generating ids.
+ *
+ * @param id the first id to set it to
+ */
+ void syncIDGeneratorSequence(long id);
+
+ /**
+ * Returns the next id that will be chosen.
+ *
+ * @return the id
+ */
+ long getIDGeneratorSequence();
+
+ /**
+ * Returns the current timeout for blocking calls.
+ *
+ * @return the timeout in milliseconds
+ */
+ long getBlockingCallTimeout();
+
+ /**
+ * Returns the transfer lock used when transferring connections.
+ *
+ * @return the lock
+ */
+ Object getTransferLock();
+}
Added: trunk/src/main/org/hornetq/core/remoting/ProtocolType.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/ProtocolType.java (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/ProtocolType.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting;
+
+/**
+ * Enumerates the remoting protocol types: CORE, STOMP and AMQP.
+ * <p/>
+ * NOTE(review): presumably CORE is the native HornetQ core protocol - confirm.
+ *
+ * @author Tim Fox
+ */
+public enum ProtocolType
+{
+ CORE, STOMP, AMQP;
+}
Modified: trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -46,40 +46,6 @@
String getRemoteAddress();
/**
- * return the channel with the channel id specified.
- * <p/>
- * If it does not exist create it with the confirmation window size.
- *
- * @param channelID the channel id
- * @param confWindowSize the confirmation window size
- * @return the channel
- */
- Channel getChannel(long channelID, int confWindowSize);
-
- /**
- * add the channel with the specified channel id
- *
- * @param channelID the channel id
- * @param channel the channel
- */
- void putChannel(long channelID, Channel channel);
-
- /**
- * remove the channel with the specified channel id
- *
- * @param channelID the channel id
- * @return true if removed
- */
- boolean removeChannel(long channelID);
-
- /**
- * generate a unique (within this connection) channel id
- *
- * @return the id
- */
- long generateChannelID();
-
- /**
* add a failure listener.
* <p/>
* The listener will be called in the event of connection failure.
@@ -150,20 +116,6 @@
void destroy();
/**
- * resets the id generator used to when generating id's
- *
- * @param id the first id to set it to
- */
- void syncIDGeneratorSequence(long id);
-
- /**
- * return the next id that will be chosen.
- *
- * @return the id
- */
- long getIDGeneratorSequence();
-
- /**
* return the underlying Connection.
*
* @return the connection
@@ -182,36 +134,23 @@
*
* @return true if destroyed, otherwise false
*/
- boolean isDestroyed();
-
+ boolean isDestroyed();
+
/**
- * return the current tomeout for blocking calls
- *
- * @return the timeout in milliseconds
+ * Disconnect the connection, closing all channels
*/
- long getBlockingCallTimeout();
-
+ void disconnect();
+
/**
- * return the transfer lock used when transferring connections.
- *
- * @return the lock
- */
- Object getTransferLock();
-
- /**
* returns true if any data has been received since the last time this method was called.
*
* @return true if data has been received.
*/
boolean checkDataReceived();
-
+
/**
- * remove all channels from the remoting connection
+ * flush all outstanding data from the connection.
*/
- void removeAllChannels();
+ void flush();
- /**
- * flush all outstanding confirmations onto the connection.
- */
- void flushConfirmations();
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -25,6 +25,7 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.CommandConfirmationHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
@@ -52,7 +53,7 @@
private volatile int lastConfirmedCommandID = -1;
- private volatile RemotingConnection connection;
+ private volatile CoreRemotingConnection connection;
private volatile boolean closed;
@@ -76,7 +77,7 @@
private volatile boolean transferring;
- public ChannelImpl(final RemotingConnection connection, final long id, final int confWindowSize)
+ public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize)
{
this.connection = connection;
@@ -316,7 +317,7 @@
closed = true;
}
- public void transferConnection(final RemotingConnection newConnection)
+ public void transferConnection(final CoreRemotingConnection newConnection)
{
// Needs to synchronize on the connection to make sure no packets from
// the old connection get processed after transfer has occurred
@@ -326,7 +327,7 @@
// And switch it
- final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
+ final CoreRemotingConnection rnewConnection = (CoreRemotingConnection)newConnection;
rnewConnection.putChannel(id, this);
@@ -369,7 +370,7 @@
lock.unlock();
}
- public RemotingConnection getConnection()
+ public CoreRemotingConnection getConnection()
{
return connection;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -26,9 +26,9 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -38,7 +38,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class RemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
+public class RemotingConnectionImpl extends AbstractBufferHandler implements CoreRemotingConnection
{
// Constants
// ------------------------------------------------------------------------------------
@@ -84,6 +84,8 @@
private volatile boolean dataReceived;
private final Executor executor;
+
+ private volatile boolean executing;
// Constructors
// ---------------------------------------------------------------------------------
@@ -275,7 +277,22 @@
callClosingListeners();
}
+
+ public void disconnect()
+ {
+ Channel channel0 = getChannel(0, -1);
+ // Disconnect sequence: channel 0 is obtained above, then all channels are removed from the connection,
+ // which ensures no more packets will be processed once removeAllChannels() is complete, and finally
+ // the DISCONNECT packet is sent on channel 0
+
+ removeAllChannels();
+
+ // Now we are 100% sure that no more packets will be processed, so we can send the disconnect
+
+ channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ }
+
public long generateChannelID()
{
return idGenerator.generateID();
@@ -325,20 +342,12 @@
return res;
}
- public void removeAllChannels()
+ //We flush any confirmations on the connection - this prevents idle bridges for example
+ //sitting there with many unacked messages
+ public void flush()
{
- // We get the transfer lock first - this ensures no packets are being processed AND
- // it's guaranteed no more packets will be processed once this method is complete
synchronized (transferLock)
{
- channels.clear();
- }
- }
-
- public void flushConfirmations()
- {
- synchronized (transferLock)
- {
for (Channel channel : channels.values())
{
channel.flushConfirmations();
@@ -349,8 +358,6 @@
// Buffer Handler implementation
// ----------------------------------------------------
- private volatile boolean executing;
-
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
@@ -434,6 +441,15 @@
// Private
// --------------------------------------------------------------------------------------
+ private void removeAllChannels()
+ {
+ // Clears every channel held by this connection. We get the transfer lock first - this ensures no packets
+ // are being processed AND it's guaranteed no more packets will be processed once this method is complete
+ synchronized (transferLock)
+ {
+ channels.clear();
+ }
+ }
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.remoting.Acceptor;
@@ -215,14 +216,14 @@
this.connector = connector;
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent((String)connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
- listener.connectionCreated(connection);
+ listener.connectionCreated(connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -17,8 +17,8 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -70,7 +70,7 @@
this.executor = executor;
- listener.connectionCreated(this);
+ listener.connectionCreated(this, ProtocolType.CORE);
}
private volatile boolean closing;
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -19,6 +19,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -174,14 +175,14 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent((String)connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
- listener.connectionCreated(connection);
+ listener.connectionCreated(connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Added: trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/server/ConnectionEntry.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.core.remoting.RemotingConnection;
+
+/**
+ * A ConnectionEntry holds a RemotingConnection together with two mutable
+ * bookkeeping values for it: a last-check timestamp and a connection ttl.
+ *
+ * @author Tim Fox
+ *
+ */
+public class ConnectionEntry
+{
+ public final RemotingConnection connection;
+ // Volatile and non-final, so it can be updated after construction; CoreProtocolManager seeds it with System.currentTimeMillis()
+ public volatile long lastCheck;
+ // Volatile and non-final, so it can be updated after construction (units presumably milliseconds - TODO confirm)
+ public volatile long ttl;
+ // Stores the three supplied values as they are; no validation is performed
+ public ConnectionEntry(final RemotingConnection connection, final long lastCheck, final long ttl)
+ {
+ this.connection = connection;
+
+ this.lastCheck = lastCheck;
+
+ this.ttl = ttl;
+ }
+}
Added: trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/server/ProtocolManager.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server;
+
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * A ProtocolManager creates the ConnectionEntry for a newly created transport
+ * Connection. RemotingServiceImpl looks up one manager per ProtocolType.
+ *
+ * @author Tim Fox
+ *
+ */
+public interface ProtocolManager
+{
+ ConnectionEntry createConnectionEntry(Connection connection);
+
+}
Added: trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/CoreProtocolManager.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.server.impl;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.ConnectionEntry;
+import org.hornetq.core.remoting.server.ProtocolManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQPacketHandler;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * A CoreProtocolManager wraps a new transport Connection in a RemotingConnectionImpl,
+ * installs the handlers for channels 0 and 1 and returns the resulting ConnectionEntry.
+ *
+ * @author Tim Fox
+ *
+ */
+public class CoreProtocolManager implements ProtocolManager
+{
+ private final HornetQServer server;
+
+ private final List<Interceptor> interceptors;
+
+ public CoreProtocolManager(final HornetQServer server,
+ final List<Interceptor> interceptors)
+ {
+ this.server = server;
+
+ this.interceptors = interceptors;
+ }
+ // Builds the remoting connection, its channel handlers and the entry carrying its initial ttl
+ public ConnectionEntry createConnectionEntry(final Connection connection)
+ {
+ final Configuration config = server.getConfiguration();
+ // An executor is only passed when async connection execution is enabled; otherwise null is passed
+ CoreRemotingConnection rc = new RemotingConnectionImpl(connection,
+ interceptors,
+ config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory().getExecutor()
+ : null);
+ // Channel 1 is handled by a HornetQPacketHandler
+ Channel channel1 = rc.getChannel(1, -1);
+
+ ChannelHandler handler = new HornetQPacketHandler(server, channel1, rc);
+
+ channel1.setHandler(handler);
+ // Start from the default ttl; a configured override (anything other than -1) replaces it
+ long ttl = HornetQClient.DEFAULT_CONNECTION_TTL;
+
+ if (config.getConnectionTTLOverride() != -1)
+ {
+ ttl = config.getConnectionTTLOverride();
+ }
+ // lastCheck starts at the current time
+ final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
+ // Channel 0 only reacts to PING packets; this handler ignores every other packet type
+ final Channel channel0 = rc.getChannel(0, -1);
+
+ channel0.setHandler(new ChannelHandler()
+ {
+ public void handlePacket(final Packet packet)
+ {
+ if (packet.getType() == PacketImpl.PING)
+ {
+ Ping ping = (Ping)packet;
+
+ if (config.getConnectionTTLOverride() == -1)
+ {
+ // No override is configured (-1), so allow clients to specify connection ttl
+ entry.ttl = ping.getConnectionTTL();
+ }
+
+ // Just send the same ping packet back
+ channel0.send(packet);
+ }
+ }
+ });
+
+ return entry;
+ }
+}
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -26,20 +26,15 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.ChannelHandler;
-import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
-import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.server.ConnectionEntry;
+import org.hornetq.core.remoting.server.ProtocolManager;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.impl.HornetQPacketHandler;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
@@ -89,7 +84,10 @@
private final ScheduledExecutorService scheduledThreadPool;
private FailureCheckAndFlushThread failureCheckAndFlushThread;
-
+
+ private Map<ProtocolType, ProtocolManager> protocolMap =
+ new ConcurrentHashMap<ProtocolType, ProtocolManager>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -122,6 +120,8 @@
this.managementService = managementService;
this.threadPool = threadPool;
this.scheduledThreadPool = scheduledThreadPool;
+
+ this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManager(server, interceptors));
}
// RemotingService implementation -------------------------------
@@ -170,6 +170,7 @@
if (managementService != null)
{
acceptor.setNotificationService(managementService);
+
managementService.registerAcceptor(acceptor, info);
}
}
@@ -235,17 +236,7 @@
{
RemotingConnection conn = entry.connection;
- Channel channel0 = conn.getChannel(0, -1);
-
- // And we remove all channels from the connection, this ensures no more packets will be processed after this
- // method is
- // complete
-
- conn.removeAllChannels();
-
- // Now we are 100% sure that no more packets will be processed we can send the disconnect
-
- channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ conn.disconnect();
}
for (Acceptor acceptor : acceptors)
@@ -304,61 +295,32 @@
// ConnectionLifeCycleListener implementation -----------------------------------
- public void connectionCreated(final Connection connection)
+ private ProtocolManager getProtocolManager(ProtocolType protocol)
{
+ return protocolMap.get(protocol);
+ }
+
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
+ {
if (server == null)
{
throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
}
-
- RemotingConnection rc = new RemotingConnectionImpl(connection,
- interceptors,
- config.isAsyncConnectionExecutionEnabled() ? server.getExecutorFactory()
- .getExecutor()
- : null);
-
- Channel channel1 = rc.getChannel(1, -1);
-
- ChannelHandler handler = createHandler(rc, channel1);
-
- channel1.setHandler(handler);
-
- long ttl = HornetQClient.DEFAULT_CONNECTION_TTL;
-
- if (config.getConnectionTTLOverride() != -1)
+
+ ProtocolManager pmgr = this.getProtocolManager(protocol);
+
+ if (pmgr == null)
{
- ttl = config.getConnectionTTLOverride();
+ throw new IllegalArgumentException("Unknown protocol " + protocol);
}
- final ConnectionEntry entry = new ConnectionEntry(rc, System.currentTimeMillis(), ttl);
-
+ ConnectionEntry entry = pmgr.createConnectionEntry(connection);
+
connections.put(connection.getID(), entry);
- final Channel channel0 = rc.getChannel(0, -1);
-
- channel0.setHandler(new ChannelHandler()
- {
- public void handlePacket(final Packet packet)
- {
- if (packet.getType() == PacketImpl.PING)
- {
- Ping ping = (Ping)packet;
-
- if (config.getConnectionTTLOverride() == -1)
- {
- // Allow clients to specify connection ttl
- entry.ttl = ping.getConnectionTTL();
- }
-
- // Just send a ping back
- channel0.send(packet);
- }
- }
- });
-
if (config.isBackup())
{
- serverSideReplicatingConnection = rc;
+ serverSideReplicatingConnection = entry.connection;
}
}
@@ -409,14 +371,6 @@
// Protected -----------------------------------------------------
- /**
- * Subclasses (on tests) may use this to create a different channel.
- */
- protected ChannelHandler createHandler(final RemotingConnection rc, final Channel channel)
- {
- return new HornetQPacketHandler(server, channel, rc);
- }
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -434,24 +388,6 @@
}
}
- private static final class ConnectionEntry
- {
- final RemotingConnection connection;
-
- volatile long lastCheck;
-
- volatile long ttl;
-
- ConnectionEntry(final RemotingConnection connection, final long lastCheck, final long ttl)
- {
- this.connection = connection;
-
- this.lastCheck = lastCheck;
-
- this.ttl = ttl;
- }
- }
-
private final class FailureCheckAndFlushThread extends Thread
{
private final long pauseInterval;
@@ -516,11 +452,8 @@
}
if (flush)
- {
- //We flush any confirmations on the connection - this prevents idle bridges for example
- //sitting there with many unacked messages
-
- conn.flushConfirmations();
+ {
+ conn.flush();
}
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -31,8 +31,8 @@
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
@@ -69,7 +69,7 @@
private final FailoverManager failoverManager;
- private RemotingConnection replicatingConnection;
+ private CoreRemotingConnection replicatingConnection;
private Channel replicatingChannel;
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -24,7 +24,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.server.RemotingService;
@@ -70,7 +70,7 @@
void unregisterActivateCallback(ActivateCallback callback);
- ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
+ ReattachSessionResponseMessage reattachSession(CoreRemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
/** The journal at the backup server has to be equivalent as the journal used on the live node.
* Or else the backup node is out of sync. */
@@ -82,7 +82,7 @@
String password,
int minLargeMessageSize,
int incrementingVersion,
- RemotingConnection remotingConnection,
+ CoreRemotingConnection remotingConnection,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -19,7 +19,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
/**
*
@@ -109,7 +109,7 @@
void close() throws Exception;
- int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
+ int transferConnection(CoreRemotingConnection newConnection, int lastReceivedCommandID);
Channel getChannel();
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -22,8 +22,8 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
@@ -48,9 +48,9 @@
private final Channel channel1;
- private final RemotingConnection connection;
+ private final CoreRemotingConnection connection;
- public HornetQPacketHandler(final HornetQServer server, final Channel channel1, final RemotingConnection connection)
+ public HornetQPacketHandler(final HornetQServer server, final Channel channel1, final CoreRemotingConnection connection)
{
this.server = server;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -69,7 +69,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.remoting.server.RemotingService;
@@ -536,7 +536,7 @@
return clusterManager;
}
- public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
+ public ReattachSessionResponseMessage reattachSession(final CoreRemotingConnection connection,
final String name,
final int lastConfirmedCommandID) throws Exception
{
@@ -593,7 +593,7 @@
final String password,
final int minLargeMessageSize,
final int incrementingVersion,
- final RemotingConnection connection,
+ final CoreRemotingConnection connection,
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -44,8 +44,8 @@
import org.hornetq.core.postoffice.QueueBinding;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.server.BindingQueryResult;
@@ -100,7 +100,7 @@
private final boolean strictUpdateDeliveryCount;
- private RemotingConnection remotingConnection;
+ private CoreRemotingConnection remotingConnection;
private Channel channel;
@@ -152,7 +152,7 @@
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
final boolean xa,
- final RemotingConnection remotingConnection,
+ final CoreRemotingConnection remotingConnection,
final Channel channel,
final StorageManager storageManager,
final PostOffice postOffice,
@@ -1037,7 +1037,7 @@
}
}
- public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
+ public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID)
{
// We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
// delivered
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -59,9 +59,9 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
@@ -126,7 +126,7 @@
private final Channel channel;
- private volatile RemotingConnection remotingConnection;
+ private volatile CoreRemotingConnection remotingConnection;
public ServerSessionPacketHandler(final ServerSession session,
final OperationContext sessionContext,
@@ -205,45 +205,8 @@
return channel;
}
- public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
- {
- // We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
- // delivered
- // after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
- // sequence of packets.
- // It is not sufficient to just stop the session, since right after stopping the session, another session start
- // might be executed
- // before we have transferred the connection, leaving it in a started state
- session.setTransferring(true);
+
- removeConnectionListeners();
-
- // Note. We do not destroy the replicating connection here. In the case the live server has really crashed
- // then the connection will get cleaned up anyway when the server ping timeout kicks in.
- // In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
- // the replicating connection will cause the outstanding responses to be be replayed on the live server,
- // if these reach the client who then subsequently fails over, on reconnection to backup, it will have
- // received responses that the backup did not know about.
-
- channel.transferConnection(newConnection);
-
- newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
- remotingConnection = newConnection;
-
- addConnectionListeners();
-
- int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
-
- channel.replayCommands(lastReceivedCommandID);
-
- channel.setTransferring(false);
-
- session.setTransferring(false);
-
- return serverLastReceivedCommandID;
- }
-
public void handlePacket(final Packet packet)
{
byte type = packet.getType();
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
@@ -513,14 +514,14 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent(connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id " + connection.getID());
}
- listener.connectionCreated(connection);
+ listener.connectionCreated(connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -17,6 +17,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -55,7 +56,7 @@
this.listener = listener;
- listener.connectionCreated(this);
+ listener.connectionCreated(this, ProtocolType.CORE);
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -30,6 +30,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -598,7 +599,7 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent(connection.getID(), connection) != null)
{
Modified: trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -13,6 +13,7 @@
package org.hornetq.spi.core.remoting;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.ProtocolType;
/**
* A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events.
@@ -26,7 +27,7 @@
*
* @param connection the connection that has been created
*/
- void connectionCreated(Connection connection);
+ void connectionCreated(Connection connection, ProtocolType protocol);
/**
* called when a connection is destroyed.
Modified: trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -25,8 +25,8 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.client.impl.FailoverManagerImpl;
import org.hornetq.core.remoting.Channel;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
@@ -51,7 +51,7 @@
private HornetQServer server;
- private RemotingConnection connection;
+ private CoreRemotingConnection connection;
// Static --------------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -25,6 +25,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyConnector;
@@ -549,7 +550,7 @@
latch = connCreatedLatch;
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
this.connection = connection;
if (latch != null)
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -22,13 +22,17 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.FailoverManagerImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
@@ -345,14 +349,14 @@
session.addFailureListener(clientListener);
- RemotingConnection serverConn = null;
+ CoreRemotingConnection serverConn = null;
while (serverConn == null)
{
Set<RemotingConnection> conns = server.getRemotingService().getConnections();
if (!conns.isEmpty())
{
- serverConn = server.getRemotingService().getConnections().iterator().next();
+ serverConn = (CoreRemotingConnection)server.getRemotingService().getConnections().iterator().next();
}
else
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
@@ -63,7 +64,7 @@
{
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
};
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.TransportConstants;
@@ -77,7 +78,7 @@
{
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
};
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.integration.transports.netty.NettyConnection;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -219,7 +220,7 @@
class MyListener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-20 17:11:18 UTC (rev 8817)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-20 19:45:20 UTC (rev 8818)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyConnector;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -66,7 +67,7 @@
{
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
};
@@ -103,7 +104,7 @@
{
}
- public void connectionCreated(final Connection connection)
+ public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
}
};
15 years, 11 months
JBoss hornetq SVN: r8817 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/integration/transports/netty and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-20 12:11:18 -0500 (Wed, 20 Jan 2010)
New Revision: 8817
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added SUBSCRIBE command
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java 2010-01-20 17:07:39 UTC (rev 8816)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java 2010-01-20 17:11:18 UTC (rev 8817)
@@ -31,8 +31,8 @@
String CONNECT = "CONNECT";
String SEND = "SEND";
String DISCONNECT = "DISCONNECT";
- String SUBSCRIBE = "SUB";
- String UNSUBSCRIBE = "UNSUB";
+ String SUBSCRIBE = "SUBSCRIBE";
+ String UNSUBSCRIBE = "UNSUBSCRIBE";
String BEGIN_TRANSACTION = "BEGIN";
String COMMIT_TRANSACTION = "COMMIT";
String ABORT_TRANSACTION = "ABORT";
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 17:07:39 UTC (rev 8816)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 17:11:18 UTC (rev 8817)
@@ -43,6 +43,9 @@
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.ServerMessage;
@@ -587,7 +590,7 @@
{
response = onConnect(frame, server, connection);
}
- if (Stomp.Commands.DISCONNECT.equals(command))
+ else if (Stomp.Commands.DISCONNECT.equals(command))
{
response = onDisconnect(frame, server, connection);
}
@@ -595,10 +598,16 @@
{
response = onSend(frame, server, connection);
}
+ else if (Stomp.Commands.SUBSCRIBE.equals(command))
+ {
+ response = onSubscribe(frame, server, connection);
+ }
else
{
log.error("Unsupported Stomp frame: " + frame);
+ response = new StompFrame(Stomp.Responses.ERROR, new HashMap<String, Object>(), ("Unsupported frame: " + command).getBytes());
}
+
if (response != null)
{
System.out.println(">>> will reply " + response);
@@ -638,19 +647,44 @@
}
}
- private void checkConnected(RemotingConnection connection) throws StompException
+ /**
+ * @param frame
+ * @param server
+ * @param connection
+ * @return
+ * @throws StompException
+ * @throws HornetQException
+ */
+ private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException, HornetQException
{
+ Map<String, Object> headers = frame.getHeaders();
+ String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+ SimpleString queueName = StompDestinationConverter.convertDestination(queue);
+
+ ServerSession session = checkAndGetSession(connection);
+ long id = server.getStorageManager().generateUniqueID();
+ SessionCreateConsumerMessage packet = new SessionCreateConsumerMessage(id , queueName, null, false, false);
+ session.handleCreateConsumer(packet);
+ SessionConsumerFlowCreditMessage credits = new SessionConsumerFlowCreditMessage(id, -1);
+ session.handleReceiveConsumerCredits(credits );
+ session.handleStart(new PacketImpl(PacketImpl.SESS_START));
+
+ return null;
+ }
+
+ private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
+ {
ServerSession session = sessions.get(connection);
if (session == null)
{
throw new StompException("Not connected");
}
+ return session;
}
+
private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
{
- checkConnected(connection);
-
- ServerSession session = sessions.get(connection);
+ ServerSession session = checkAndGetSession(connection);
if (session != null)
{
try
@@ -668,7 +702,7 @@
private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws HornetQException, StompException
{
- checkConnected(connection);
+ ServerSession session = checkAndGetSession(connection);
Map<String, Object> headers = frame.getHeaders();
String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
@@ -703,7 +737,6 @@
message.getBodyBuffer().writeBytes(content);
}
- ServerSession session = sessions.get(connection);
SessionSendMessage packet = new SessionSendMessage(message, false);
session.handleSend(packet);
if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20 17:07:39 UTC (rev 8816)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20 17:11:18 UTC (rev 8817)
@@ -728,6 +728,10 @@
Stomp.NULL;
sendFrame(frame);
+ waitForFrameToTakeEffect();
+ // check the message is not committed
+ assertNull(consumer.receive(100));
+
frame =
"COMMIT\n" +
"transaction: tx1\n" +
@@ -899,7 +903,6 @@
int c = 0;
for (; ;) {
c = is.read();
- System.out.println(c);
if (c < 0) {
throw new IOException("socket closed.");
}
15 years, 11 months
JBoss hornetq SVN: r8816 - in trunk/src/main/org/hornetq/core: exception and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-20 12:07:39 -0500 (Wed, 20 Jan 2010)
New Revision: 8816
Added:
trunk/src/main/org/hornetq/core/exception/
trunk/src/main/org/hornetq/core/exception/HornetQXAException.java
Log:
missing file
Added: trunk/src/main/org/hornetq/core/exception/HornetQXAException.java
===================================================================
--- trunk/src/main/org/hornetq/core/exception/HornetQXAException.java (rev 0)
+++ trunk/src/main/org/hornetq/core/exception/HornetQXAException.java 2010-01-20 17:07:39 UTC (rev 8816)
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.exception;
+
+import javax.transaction.xa.XAException;
+
+/**
+ * A HornetQXAException
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class HornetQXAException extends XAException
+{
+ private static final long serialVersionUID = 6535914602965015803L;
+
+ public HornetQXAException(final int errorCode, final String message)
+ {
+ super(message);
+
+ this.errorCode = errorCode;
+ }
+
+ public HornetQXAException(final int errorCode)
+ {
+ super(errorCode);
+ }
+}
15 years, 11 months
JBoss hornetq SVN: r8815 - in trunk/src/main/org/hornetq/core: remoting and 2 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-20 12:00:28 -0500 (Wed, 20 Jan 2010)
New Revision: 8815
Removed:
trunk/src/main/org/hornetq/core/exception/
Modified:
trunk/src/main/org/hornetq/core/remoting/Channel.java
trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
Log:
reverted last commit
Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 16:51:03 UTC (rev 8814)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 17:00:28 UTC (rev 8815)
@@ -17,10 +17,10 @@
import org.hornetq.api.core.HornetQException;
/**
- * A channel is a way of interleaving data meant for different endpoints over the same {@link org.hornetq.core.remoting.CoreRemotingConnection}.
+ * A channel is a way of interleaving data meant for different endpoints over the same {@link org.hornetq.core.remoting.RemotingConnection}.
* <p/>
* Any packet sent will have its channel id set to the specific channel sending so it can be routed to its correct channel
- * when received by the {@link org.hornetq.core.remoting.CoreRemotingConnection}. see {@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
+ * when received by the {@link org.hornetq.core.remoting.RemotingConnection}. see {@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
* <p/>
* Each Channel should will forward any packets received to its {@link org.hornetq.core.remoting.ChannelHandler}.
* <p/>
@@ -81,7 +81,7 @@
*
* @param newConnection the new connection
*/
- void transferConnection(CoreRemotingConnection newConnection);
+ void transferConnection(RemotingConnection newConnection);
/**
* resends any packets that have not received confirmations yet.
@@ -126,7 +126,7 @@
/**
* returns the Remoting Connection being used by the channel
*/
- CoreRemotingConnection getConnection();
+ RemotingConnection getConnection();
/**
* sends a confirmation of a packet being received.
@@ -148,7 +148,7 @@
void flushConfirmations();
/**
- * Called by {@link org.hornetq.core.remoting.CoreRemotingConnection} when a packet is received.
+ * Called by {@link org.hornetq.core.remoting.RemotingConnection} when a packet is received.
* <p/>
* This method should then call its {@link org.hornetq.core.remoting.ChannelHandler} after appropriate processing of
* the packet
Modified: trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2010-01-20 16:51:03 UTC (rev 8814)
+++ trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2010-01-20 17:00:28 UTC (rev 8815)
@@ -46,6 +46,40 @@
String getRemoteAddress();
/**
+ * return the channel with the channel id specified.
+ * <p/>
+ * If it does not exist create it with the confirmation window size.
+ *
+ * @param channelID the channel id
+ * @param confWindowSize the confirmation window size
+ * @return the channel
+ */
+ Channel getChannel(long channelID, int confWindowSize);
+
+ /**
+ * add the channel with the specified channel id
+ *
+ * @param channelID the channel id
+ * @param channel the channel
+ */
+ void putChannel(long channelID, Channel channel);
+
+ /**
+ * remove the channel with the specified channel id
+ *
+ * @param channelID the channel id
+ * @return true if removed
+ */
+ boolean removeChannel(long channelID);
+
+ /**
+ * generate a unique (within this connection) channel id
+ *
+ * @return the id
+ */
+ long generateChannelID();
+
+ /**
* add a failure listener.
* <p/>
* The listener will be called in the event of connection failure.
@@ -116,6 +150,20 @@
void destroy();
/**
+ * resets the id generator used when generating ids
+ *
+ * @param id the first id to set it to
+ */
+ void syncIDGeneratorSequence(long id);
+
+ /**
+ * return the next id that will be chosen.
+ *
+ * @return the id
+ */
+ long getIDGeneratorSequence();
+
+ /**
* return the underlying Connection.
*
* @return the connection
@@ -134,7 +182,36 @@
*
* @return true if destroyed, otherwise false
*/
- boolean isDestroyed();
-
- void disconnect();
+ boolean isDestroyed();
+
+ /**
+ * return the current timeout for blocking calls
+ *
+ * @return the timeout in milliseconds
+ */
+ long getBlockingCallTimeout();
+
+ /**
+ * return the transfer lock used when transferring connections.
+ *
+ * @return the lock
+ */
+ Object getTransferLock();
+
+ /**
+ * returns true if any data has been received since the last time this method was called.
+ *
+ * @return true if data has been received.
+ */
+ boolean checkDataReceived();
+
+ /**
+ * remove all channels from the remoting connection
+ */
+ void removeAllChannels();
+
+ /**
+ * flush all outstanding confirmations onto the connection.
+ */
+ void flushConfirmations();
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 16:51:03 UTC (rev 8814)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 17:00:28 UTC (rev 8815)
@@ -25,7 +25,6 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.CommandConfirmationHandler;
-import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
@@ -53,7 +52,7 @@
private volatile int lastConfirmedCommandID = -1;
- private volatile CoreRemotingConnection connection;
+ private volatile RemotingConnection connection;
private volatile boolean closed;
@@ -77,7 +76,7 @@
private volatile boolean transferring;
- public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize)
+ public ChannelImpl(final RemotingConnection connection, final long id, final int confWindowSize)
{
this.connection = connection;
@@ -317,7 +316,7 @@
closed = true;
}
- public void transferConnection(final CoreRemotingConnection newConnection)
+ public void transferConnection(final RemotingConnection newConnection)
{
// Needs to synchronize on the connection to make sure no packets from
// the old connection get processed after transfer has occurred
@@ -327,7 +326,7 @@
// And switch it
- final CoreRemotingConnection rnewConnection = (CoreRemotingConnection)newConnection;
+ final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
rnewConnection.putChannel(id, this);
@@ -370,7 +369,7 @@
lock.unlock();
}
- public CoreRemotingConnection getConnection()
+ public RemotingConnection getConnection()
{
return connection;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20 16:51:03 UTC (rev 8814)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20 17:00:28 UTC (rev 8815)
@@ -26,9 +26,9 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.CloseListener;
-import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -38,7 +38,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class RemotingConnectionImpl extends AbstractBufferHandler implements CoreRemotingConnection
+public class RemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
{
// Constants
// ------------------------------------------------------------------------------------
@@ -84,8 +84,6 @@
private volatile boolean dataReceived;
private final Executor executor;
-
- private volatile boolean executing;
// Constructors
// ---------------------------------------------------------------------------------
@@ -277,22 +275,7 @@
callClosingListeners();
}
-
- public void disconnect()
- {
- Channel channel0 = getChannel(0, -1);
- // And we remove all channels from the connection, this ensures no more packets will be processed after this
- // method is
- // complete
-
- removeAllChannels();
-
- // Now we are 100% sure that no more packets will be processed we can send the disconnect
-
- channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
- }
-
public long generateChannelID()
{
return idGenerator.generateID();
@@ -342,6 +325,16 @@
return res;
}
+ public void removeAllChannels()
+ {
+ // We get the transfer lock first - this ensures no packets are being processed AND
+ // it's guaranteed no more packets will be processed once this method is complete
+ synchronized (transferLock)
+ {
+ channels.clear();
+ }
+ }
+
public void flushConfirmations()
{
synchronized (transferLock)
@@ -356,6 +349,8 @@
// Buffer Handler implementation
// ----------------------------------------------------
+ private volatile boolean executing;
+
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
@@ -439,15 +434,6 @@
// Private
// --------------------------------------------------------------------------------------
- private void removeAllChannels()
- {
- // We get the transfer lock first - this ensures no packets are being processed AND
- // it's guaranteed no more packets will be processed once this method is complete
- synchronized (transferLock)
- {
- channels.clear();
- }
- }
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 16:51:03 UTC (rev 8814)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 17:00:28 UTC (rev 8815)
@@ -170,7 +170,6 @@
if (managementService != null)
{
acceptor.setNotificationService(managementService);
-
managementService.registerAcceptor(acceptor, info);
}
}
@@ -236,7 +235,17 @@
{
RemotingConnection conn = entry.connection;
- conn.disconnect();
+ Channel channel0 = conn.getChannel(0, -1);
+
+ // And we remove all channels from the connection, this ensures no more packets will be processed after this
+ // method is
+ // complete
+
+ conn.removeAllChannels();
+
+ // Now we are 100% sure that no more packets will be processed we can send the disconnect
+
+ channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
}
for (Acceptor acceptor : acceptors)
15 years, 11 months
JBoss hornetq SVN: r8814 - in trunk/src/main/org/hornetq/core: exception and 3 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-20 11:51:03 -0500 (Wed, 20 Jan 2010)
New Revision: 8814
Added:
trunk/src/main/org/hornetq/core/exception/
trunk/src/main/org/hornetq/core/exception/HornetQXAException.java
Modified:
trunk/src/main/org/hornetq/core/remoting/Channel.java
trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
Log:
added missing class
Added: trunk/src/main/org/hornetq/core/exception/HornetQXAException.java
===================================================================
--- trunk/src/main/org/hornetq/core/exception/HornetQXAException.java (rev 0)
+++ trunk/src/main/org/hornetq/core/exception/HornetQXAException.java 2010-01-20 16:51:03 UTC (rev 8814)
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.exception;
+
+import javax.transaction.xa.XAException;
+
+/**
+ * A HornetQXAException
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class HornetQXAException extends XAException
+{
+ private static final long serialVersionUID = 6535914602965015803L;
+
+ public HornetQXAException(final int errorCode, final String message)
+ {
+ super(message);
+
+ this.errorCode = errorCode;
+ }
+
+ public HornetQXAException(final int errorCode)
+ {
+ super(errorCode);
+ }
+}
Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 15:08:28 UTC (rev 8813)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 16:51:03 UTC (rev 8814)
@@ -17,10 +17,10 @@
import org.hornetq.api.core.HornetQException;
/**
- * A channel is a way of interleaving data meant for different endpoints over the same {@link org.hornetq.core.remoting.RemotingConnection}.
+ * A channel is a way of interleaving data meant for different endpoints over the same {@link org.hornetq.core.remoting.CoreRemotingConnection}.
* <p/>
* Any packet sent will have its channel id set to the specific channel sending so it can be routed to its correct channel
- * when received by the {@link org.hornetq.core.remoting.RemotingConnection}. see {@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
+ * when received by the {@link org.hornetq.core.remoting.CoreRemotingConnection}. see {@link org.hornetq.core.remoting.Packet#setChannelID(long)}.
* <p/>
* Each Channel should will forward any packets received to its {@link org.hornetq.core.remoting.ChannelHandler}.
* <p/>
@@ -81,7 +81,7 @@
*
* @param newConnection the new connection
*/
- void transferConnection(RemotingConnection newConnection);
+ void transferConnection(CoreRemotingConnection newConnection);
/**
* resends any packets that have not received confirmations yet.
@@ -126,7 +126,7 @@
/**
* returns the Remoting Connection being used by the channel
*/
- RemotingConnection getConnection();
+ CoreRemotingConnection getConnection();
/**
* sends a confirmation of a packet being received.
@@ -148,7 +148,7 @@
void flushConfirmations();
/**
- * Called by {@link org.hornetq.core.remoting.RemotingConnection} when a packet is received.
+ * Called by {@link org.hornetq.core.remoting.CoreRemotingConnection} when a packet is received.
* <p/>
* This method should then call its {@link org.hornetq.core.remoting.ChannelHandler} after appropriate processing of
* the packet
Modified: trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2010-01-20 15:08:28 UTC (rev 8813)
+++ trunk/src/main/org/hornetq/core/remoting/RemotingConnection.java 2010-01-20 16:51:03 UTC (rev 8814)
@@ -46,40 +46,6 @@
String getRemoteAddress();
/**
- * return the channel with the channel id specified.
- * <p/>
- * If it does not exist create it with the confirmation window size.
- *
- * @param channelID the channel id
- * @param confWindowSize the confirmation window size
- * @return the channel
- */
- Channel getChannel(long channelID, int confWindowSize);
-
- /**
- * add the channel with the specified channel id
- *
- * @param channelID the channel id
- * @param channel the channel
- */
- void putChannel(long channelID, Channel channel);
-
- /**
- * remove the channel with the specified channel id
- *
- * @param channelID the channel id
- * @return true if removed
- */
- boolean removeChannel(long channelID);
-
- /**
- * generate a unique (within this connection) channel id
- *
- * @return the id
- */
- long generateChannelID();
-
- /**
* add a failure listener.
* <p/>
* The listener will be called in the event of connection failure.
@@ -150,20 +116,6 @@
void destroy();
/**
- * resets the id generator used to when generating id's
- *
- * @param id the first id to set it to
- */
- void syncIDGeneratorSequence(long id);
-
- /**
- * return the next id that will be chosen.
- *
- * @return the id
- */
- long getIDGeneratorSequence();
-
- /**
* return the underlying Connection.
*
* @return the connection
@@ -182,36 +134,7 @@
*
* @return true if destroyed, otherwise false
*/
- boolean isDestroyed();
-
- /**
- * return the current tomeout for blocking calls
- *
- * @return the timeout in milliseconds
- */
- long getBlockingCallTimeout();
-
- /**
- * return the transfer lock used when transferring connections.
- *
- * @return the lock
- */
- Object getTransferLock();
-
- /**
- * returns true if any data has been received since the last time this method was called.
- *
- * @return true if data has been received.
- */
- boolean checkDataReceived();
-
- /**
- * remove all channels from the remoting connection
- */
- void removeAllChannels();
-
- /**
- * flush all outstanding confirmations onto the connection.
- */
- void flushConfirmations();
+ boolean isDestroyed();
+
+ void disconnect();
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 15:08:28 UTC (rev 8813)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 16:51:03 UTC (rev 8814)
@@ -25,6 +25,7 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.CommandConfirmationHandler;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
@@ -52,7 +53,7 @@
private volatile int lastConfirmedCommandID = -1;
- private volatile RemotingConnection connection;
+ private volatile CoreRemotingConnection connection;
private volatile boolean closed;
@@ -76,7 +77,7 @@
private volatile boolean transferring;
- public ChannelImpl(final RemotingConnection connection, final long id, final int confWindowSize)
+ public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize)
{
this.connection = connection;
@@ -316,7 +317,7 @@
closed = true;
}
- public void transferConnection(final RemotingConnection newConnection)
+ public void transferConnection(final CoreRemotingConnection newConnection)
{
// Needs to synchronize on the connection to make sure no packets from
// the old connection get processed after transfer has occurred
@@ -326,7 +327,7 @@
// And switch it
- final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
+ final CoreRemotingConnection rnewConnection = (CoreRemotingConnection)newConnection;
rnewConnection.putChannel(id, this);
@@ -369,7 +370,7 @@
lock.unlock();
}
- public RemotingConnection getConnection()
+ public CoreRemotingConnection getConnection()
{
return connection;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20 15:08:28 UTC (rev 8813)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-20 16:51:03 UTC (rev 8814)
@@ -26,9 +26,9 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.CoreRemotingConnection;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -38,7 +38,7 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @version <tt>$Revision$</tt> $Id$
*/
-public class RemotingConnectionImpl extends AbstractBufferHandler implements RemotingConnection
+public class RemotingConnectionImpl extends AbstractBufferHandler implements CoreRemotingConnection
{
// Constants
// ------------------------------------------------------------------------------------
@@ -84,6 +84,8 @@
private volatile boolean dataReceived;
private final Executor executor;
+
+ private volatile boolean executing;
// Constructors
// ---------------------------------------------------------------------------------
@@ -275,7 +277,22 @@
callClosingListeners();
}
+
+ public void disconnect()
+ {
+ Channel channel0 = getChannel(0, -1);
+ // And we remove all channels from the connection, this ensures no more packets will be processed after this
+ // method is
+ // complete
+
+ removeAllChannels();
+
+ // Now we are 100% sure that no more packets will be processed we can send the disconnect
+
+ channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ }
+
public long generateChannelID()
{
return idGenerator.generateID();
@@ -325,16 +342,6 @@
return res;
}
- public void removeAllChannels()
- {
- // We get the transfer lock first - this ensures no packets are being processed AND
- // it's guaranteed no more packets will be processed once this method is complete
- synchronized (transferLock)
- {
- channels.clear();
- }
- }
-
public void flushConfirmations()
{
synchronized (transferLock)
@@ -349,8 +356,6 @@
// Buffer Handler implementation
// ----------------------------------------------------
- private volatile boolean executing;
-
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
@@ -434,6 +439,15 @@
// Private
// --------------------------------------------------------------------------------------
+ private void removeAllChannels()
+ {
+ // We get the transfer lock first - this ensures no packets are being processed AND
+ // it's guaranteed no more packets will be processed once this method is complete
+ synchronized (transferLock)
+ {
+ channels.clear();
+ }
+ }
private void callFailureListeners(final HornetQException me)
{
final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 15:08:28 UTC (rev 8813)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 16:51:03 UTC (rev 8814)
@@ -170,6 +170,7 @@
if (managementService != null)
{
acceptor.setNotificationService(managementService);
+
managementService.registerAcceptor(acceptor, info);
}
}
@@ -235,17 +236,7 @@
{
RemotingConnection conn = entry.connection;
- Channel channel0 = conn.getChannel(0, -1);
-
- // And we remove all channels from the connection, this ensures no more packets will be processed after this
- // method is
- // complete
-
- conn.removeAllChannels();
-
- // Now we are 100% sure that no more packets will be processed we can send the disconnect
-
- channel0.sendAndFlush(new PacketImpl(PacketImpl.DISCONNECT));
+ conn.disconnect();
}
for (Acceptor acceptor : acceptors)
15 years, 11 months
JBoss hornetq SVN: r8813 - in trunk/src/main/org/hornetq/core/server: impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-20 10:08:28 -0500 (Wed, 20 Jan 2010)
New Revision: 8813
Modified:
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
Log:
Improved names in server session
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-20 14:53:17 UTC (rev 8812)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-20 15:08:28 UTC (rev 8813)
@@ -43,73 +43,71 @@
void removeConsumer(ServerConsumer consumer) throws Exception;
- void close() throws Exception;
+ void acknowledge(long consumerID, long messageID) throws Exception;
- void handleAcknowledge(long consumerID, long messageID) throws Exception;
+ void expire(long consumerID, long messageID) throws Exception;
- void handleExpired(long consumerID, long messageID) throws Exception;
+ void rollback(boolean considerLastMessageAsDelivered) throws Exception;
- void handleRollback(boolean considerLastMessageAsDelivered) throws Exception;
+ void commit() throws Exception;
- void handleCommit() throws Exception;
+ void xaCommit(Xid xid, boolean onePhase) throws Exception;
- void handleXACommit(Xid xid, boolean onePhase) throws Exception;
+ void xaEnd(Xid xid) throws Exception;
- void handleXAEnd(Xid xid) throws Exception;
+ void xaForget(Xid xid) throws Exception;
- void handleXAForget(Xid xid) throws Exception;
+ void xaJoin(Xid xid) throws Exception;
- void handleXAJoin(Xid xid) throws Exception;
+ void xaPrepare(Xid xid) throws Exception;
- void handleXAPrepare(Xid xid) throws Exception;
+ void xaResume(Xid xid) throws Exception;
- void handleXAResume(Xid xid) throws Exception;
+ void xaRollback(Xid xid) throws Exception;
- void handleXARollback(Xid xid) throws Exception;
+ void xaStart(Xid xid) throws Exception;
- void handleXAStart(Xid xid) throws Exception;
+ void xaSuspend() throws Exception;
- void handleXASuspend() throws Exception;
+ List<Xid> xaGetInDoubtXids();
- List<Xid> handleGetInDoubtXids();
+ int xaGetTimeout();
- int handleGetXATimeout();
+ void xaSetTimeout(int timeout);
- void handleSetXATimeout(int timeout);
+ void start();
- void handleStart();
+ void stop();
- void handleStop();
-
- void handleCreateQueue(SimpleString address,
+ void createQueue(SimpleString address,
SimpleString name,
SimpleString filterString,
boolean temporary,
boolean durable) throws Exception;
- void handleDeleteQueue(SimpleString name) throws Exception;
+ void deleteQueue(SimpleString name) throws Exception;
- void handleCreateConsumer(long consumerID, SimpleString name, SimpleString filterString, boolean browseOnly) throws Exception;
+ void createConsumer(long consumerID, SimpleString name, SimpleString filterString, boolean browseOnly) throws Exception;
- QueueQueryResult handleExecuteQueueQuery(SimpleString name) throws Exception;
+ QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
- BindingQueryResult handleExecuteBindingQuery(SimpleString address);
+ BindingQueryResult executeBindingQuery(SimpleString address);
- void handleCloseConsumer(long consumerID) throws Exception;
+ void closeConsumer(long consumerID) throws Exception;
- void handleReceiveConsumerCredits(long consumerID, int credits) throws Exception;
+ void receiveConsumerCredits(long consumerID, int credits) throws Exception;
- void handleSendContinuations(int packetSize, byte[] body, boolean continues) throws Exception;
+ void sendContinuations(int packetSize, byte[] body, boolean continues) throws Exception;
- void handleSend(ServerMessage message) throws Exception;
+ void send(ServerMessage message) throws Exception;
- void handleSendLargeMessage(byte[] largeMessageHeader) throws Exception;
+ void sendLarge(byte[] largeMessageHeader) throws Exception;
- void handleForceConsumerDelivery(long consumerID, long sequence) throws Exception;
+ void forceConsumerDelivery(long consumerID, long sequence) throws Exception;
- void handleRequestProducerCredits(SimpleString address, int credits) throws Exception;
+ void requestProducerCredits(SimpleString address, int credits) throws Exception;
- void handleClose() throws Exception;
+ void close() throws Exception;
int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-20 14:53:17 UTC (rev 8812)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-20 15:08:28 UTC (rev 8813)
@@ -84,7 +84,7 @@
// Attributes ----------------------------------------------------------------------------
- //private final long id;
+ // private final long id;
private final String username;
@@ -101,7 +101,7 @@
private final boolean strictUpdateDeliveryCount;
private RemotingConnection remotingConnection;
-
+
private Channel channel;
private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
@@ -131,14 +131,14 @@
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
- // private ServerSessionPacketHandler handler;
+ // private ServerSessionPacketHandler handler;
private boolean closed;
private final Map<SimpleString, CreditManagerHolder> creditManagerHolders = new HashMap<SimpleString, CreditManagerHolder>();
private final RoutingContext routingContext = new RoutingContextImpl(null);
-
+
private SessionCallback callback;
// Constructors ---------------------------------------------------------------------------------
@@ -157,7 +157,7 @@
final StorageManager storageManager,
final PostOffice postOffice,
final ResourceManager resourceManager,
- final SecurityStore securityStore,
+ final SecurityStore securityStore,
final ManagementService managementService,
final HornetQServer server,
final SimpleString managementAddress) throws Exception
@@ -175,7 +175,7 @@
this.preAcknowledge = preAcknowledge;
this.remotingConnection = remotingConnection;
-
+
this.channel = channel;
this.storageManager = storageManager;
@@ -200,7 +200,7 @@
this.server = server;
this.managementAddress = managementAddress;
-
+
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
@@ -212,7 +212,7 @@
{
this.callback = callback;
}
-
+
public String getUsername()
{
return username;
@@ -246,7 +246,7 @@
}
}
- public synchronized void close() throws Exception
+ private synchronized void doClose() throws Exception
{
if (tx != null && tx.getXid() == null)
{
@@ -290,7 +290,7 @@
}
}
- public void handleCreateConsumer(final long consumerID,
+ public void createConsumer(final long consumerID,
final SimpleString name,
final SimpleString filterString,
final boolean browseOnly) throws Exception
@@ -347,7 +347,7 @@
}
}
- public void handleCreateQueue(final SimpleString address,
+ public void createQueue(final SimpleString address,
final SimpleString name,
final SimpleString filterString,
final boolean temporary,
@@ -398,7 +398,7 @@
}
}
- public void handleDeleteQueue(final SimpleString name) throws Exception
+ public void deleteQueue(final SimpleString name) throws Exception
{
Binding binding = postOffice.getBinding(name);
@@ -417,7 +417,7 @@
}
}
- public QueueQueryResult handleExecuteQueueQuery(final SimpleString name) throws Exception
+ public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception
{
if (name == null)
{
@@ -457,7 +457,7 @@
return response;
}
- public BindingQueryResult handleExecuteBindingQuery(final SimpleString address)
+ public BindingQueryResult executeBindingQuery(final SimpleString address)
{
if (address == null)
{
@@ -479,21 +479,21 @@
return new BindingQueryResult(!names.isEmpty(), names);
}
- public void handleForceConsumerDelivery(final long consumerID, final long sequence) throws Exception
+ public void forceConsumerDelivery(final long consumerID, final long sequence) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
consumer.forceDelivery(sequence);
}
- public void handleAcknowledge(final long consumerID, final long messageID) throws Exception
+ public void acknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
- public void handleExpired(final long consumerID, final long messageID) throws Exception
+ public void expire(final long consumerID, final long messageID) throws Exception
{
MessageReference ref = consumers.get(consumerID).getExpired(messageID);
@@ -503,7 +503,7 @@
}
}
- public void handleCommit() throws Exception
+ public void commit() throws Exception
{
try
{
@@ -515,12 +515,21 @@
}
}
- public void handleRollback(final boolean considerLastMessageAsDelivered) throws Exception
+ public void rollback(final boolean considerLastMessageAsDelivered) throws Exception
{
- rollback(considerLastMessageAsDelivered);
+ if (tx == null)
+ {
+ // Might be null if XA
+
+ tx = new TransactionImpl(storageManager);
+ }
+
+ doRollback(considerLastMessageAsDelivered, tx);
+
+ tx = new TransactionImpl(storageManager);
}
- public void handleXACommit(final Xid xid, final boolean onePhase) throws Exception
+ public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
{
if (tx != null)
{
@@ -568,7 +577,7 @@
}
}
- public void handleXAEnd(final Xid xid) throws Exception
+ public void xaEnd(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -613,7 +622,7 @@
}
}
- public void handleXAForget(final Xid xid) throws Exception
+ public void xaForget(final Xid xid) throws Exception
{
long id = resourceManager.removeHeuristicCompletion(xid);
@@ -636,7 +645,7 @@
}
}
- public void handleXAJoin(final Xid xid) throws Exception
+ public void xaJoin(final Xid xid) throws Exception
{
Transaction theTx = resourceManager.getTransaction(xid);
@@ -659,7 +668,7 @@
}
}
- public void handleXAResume(final Xid xid) throws Exception
+ public void xaResume(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -694,7 +703,7 @@
}
}
- public void handleXARollback(final Xid xid) throws Exception
+ public void xaRollback(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -743,7 +752,7 @@
}
}
- public void handleXAStart(final Xid xid) throws Exception
+ public void xaStart(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -766,7 +775,7 @@
}
}
- public void handleXASuspend() throws Exception
+ public void xaSuspend() throws Exception
{
if (tx == null)
{
@@ -791,7 +800,7 @@
}
}
- public void handleXAPrepare(final Xid xid) throws Exception
+ public void xaPrepare(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -824,7 +833,7 @@
}
}
- public List<Xid> handleGetInDoubtXids()
+ public List<Xid> xaGetInDoubtXids()
{
List<Xid> xids = new ArrayList<Xid>();
@@ -835,27 +844,27 @@
return xids;
}
- public int handleGetXATimeout()
+ public int xaGetTimeout()
{
return resourceManager.getTimeoutSeconds();
}
- public void handleSetXATimeout(final int timeout)
+ public void xaSetTimeout(final int timeout)
{
resourceManager.setTimeoutSeconds(timeout);
}
- public void handleStart()
+ public void start()
{
setStarted(true);
}
- public void handleStop()
+ public void stop()
{
setStarted(false);
}
- public void handleClose()
+ public void close()
{
storageManager.afterCompleteOperations(new IOAsyncTask()
{
@@ -867,7 +876,7 @@
{
try
{
- close();
+ doClose();
}
catch (Exception e)
{
@@ -877,7 +886,7 @@
});
}
- public void handleCloseConsumer(final long consumerID) throws Exception
+ public void closeConsumer(final long consumerID) throws Exception
{
final ServerConsumer consumer = consumers.get(consumerID);
@@ -891,7 +900,7 @@
}
}
- public void handleReceiveConsumerCredits(final long consumerID, final int credits) throws Exception
+ public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
@@ -905,7 +914,7 @@
consumer.receiveCredits(credits);
}
- public void handleSendLargeMessage(final byte[] largeMessageHeader) throws Exception
+ public void sendLarge(final byte[] largeMessageHeader) throws Exception
{
// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();
@@ -920,7 +929,7 @@
currentLargeMessage = msg;
}
- public void handleSend(final ServerMessage message) throws Exception
+ public void send(final ServerMessage message) throws Exception
{
try
{
@@ -937,7 +946,7 @@
}
else
{
- send(message);
+ doSend(message);
}
}
finally
@@ -953,7 +962,7 @@
}
}
- public void handleSendContinuations(final int packetSize, final byte[] body, final boolean continues) throws Exception
+ public void sendContinuations(final int packetSize, final byte[] body, final boolean continues) throws Exception
{
if (currentLargeMessage == null)
{
@@ -971,7 +980,7 @@
{
currentLargeMessage.releaseResources();
- send(currentLargeMessage);
+ doSend(currentLargeMessage);
releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
@@ -979,7 +988,7 @@
}
}
- public void handleRequestProducerCredits(final SimpleString address, final int credits) throws Exception
+ public void requestProducerCredits(final SimpleString address, final int credits) throws Exception
{
final CreditManagerHolder holder = getCreditManagerHolder(address);
@@ -1017,7 +1026,7 @@
}
}
}
-
+
public void setTransferring(final boolean transferring)
{
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
@@ -1073,7 +1082,7 @@
{
return channel;
}
-
+
public void runConnectionFailureRunners()
{
for (Runnable runner : failureRunners.values())
@@ -1110,7 +1119,7 @@
}
}
- handleClose();
+ close();
ServerSessionImpl.log.warn("Cleared up resources for session " + name);
}
@@ -1160,7 +1169,7 @@
started = s;
}
-
+
private void handleManagementMessage(final ServerMessage message) throws Exception
{
try
@@ -1184,7 +1193,7 @@
{
reply.setAddress(replyTo);
- send(reply);
+ doSend(reply);
}
}
@@ -1220,20 +1229,6 @@
}
}
- private void rollback(final boolean lastMessageAsDelived) throws Exception
- {
- if (tx == null)
- {
- // Might be null if XA
-
- tx = new TransactionImpl(storageManager);
- }
-
- doRollback(lastMessageAsDelived, tx);
-
- tx = new TransactionImpl(storageManager);
- }
-
/*
* The way flow producer flow control works is as follows:
* The client can only send messages as long as it has credits. It requests credits from the server
@@ -1282,7 +1277,7 @@
callback.sendProducerCreditsMessage(credits, address, -1);
}
- private void send(final ServerMessage msg) throws Exception
+ private void doSend(final ServerMessage msg) throws Exception
{
// Look up the paging store
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20 14:53:17 UTC (rev 8812)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20 15:08:28 UTC (rev 8813)
@@ -157,7 +157,14 @@
session.runConnectionFailureRunners();
- handleCloseSession();
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
log.warn("Cleared up resources for session " + session.getName());
}
@@ -166,7 +173,14 @@
{
channel.flushConfirmations();
- handleCloseSession();
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
}
public void connectionClosed()
@@ -249,15 +263,15 @@
case SESS_CREATECONSUMER:
{
SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
- session.handleCreateConsumer(request.getID(),
- request.getQueueName(),
- request.getFilterString(),
- request.isBrowseOnly());
+ session.createConsumer(request.getID(),
+ request.getQueueName(),
+ request.getFilterString(),
+ request.isBrowseOnly());
if (request.isRequiresResponse())
{
// We send back queue information on the queue as a response- this allows the queue to
// be automaticall recreated on failover
- response = new SessionQueueQueryResponseMessage(session.handleExecuteQueueQuery(request.getQueueName()));
+ response = new SessionQueueQueryResponseMessage(session.executeQueueQuery(request.getQueueName()));
}
break;
@@ -265,11 +279,11 @@
case CREATE_QUEUE:
{
CreateQueueMessage request = (CreateQueueMessage)packet;
- session.handleCreateQueue(request.getAddress(),
- request.getQueueName(),
- request.getFilterString(),
- request.isTemporary(),
- request.isDurable());
+ session.createQueue(request.getAddress(),
+ request.getQueueName(),
+ request.getFilterString(),
+ request.isTemporary(),
+ request.isDurable());
if (request.isRequiresResponse())
{
response = new NullResponseMessage();
@@ -279,28 +293,28 @@
case DELETE_QUEUE:
{
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
- session.handleDeleteQueue(request.getQueueName());
+ session.deleteQueue(request.getQueueName());
response = new NullResponseMessage();
break;
}
case SESS_QUEUEQUERY:
{
SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
- QueueQueryResult result = session.handleExecuteQueueQuery(request.getQueueName());
+ QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
response = new SessionQueueQueryResponseMessage(result);
break;
}
case SESS_BINDINGQUERY:
{
SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
- BindingQueryResult result = session.handleExecuteBindingQuery(request.getAddress());
+ BindingQueryResult result = session.executeBindingQuery(request.getAddress());
response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
break;
}
case SESS_ACKNOWLEDGE:
{
SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
- session.handleAcknowledge(message.getConsumerID(), message.getMessageID());
+ session.acknowledge(message.getConsumerID(), message.getMessageID());
if (message.isRequiresResponse())
{
response = new NullResponseMessage();
@@ -310,116 +324,116 @@
case SESS_EXPIRED:
{
SessionExpiredMessage message = (SessionExpiredMessage)packet;
- session.handleExpired(message.getConsumerID(), message.getMessageID());
+ session.expire(message.getConsumerID(), message.getMessageID());
break;
}
case SESS_COMMIT:
{
- session.handleCommit();
+ session.commit();
response = new NullResponseMessage();
break;
}
case SESS_ROLLBACK:
{
- session.handleRollback(((RollbackMessage)packet).isConsiderLastMessageAsDelivered());
+ session.rollback(((RollbackMessage)packet).isConsiderLastMessageAsDelivered());
response = new NullResponseMessage();
break;
}
case SESS_XA_COMMIT:
{
SessionXACommitMessage message = (SessionXACommitMessage)packet;
- session.handleXACommit(message.getXid(), message.isOnePhase());
+ session.xaCommit(message.getXid(), message.isOnePhase());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
break;
}
case SESS_XA_END:
{
SessionXAEndMessage message = (SessionXAEndMessage)packet;
- session.handleXAEnd(message.getXid());
+ session.xaEnd(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
break;
}
case SESS_XA_FORGET:
{
SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
- session.handleXAForget(message.getXid());
+ session.xaForget(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
break;
}
case SESS_XA_JOIN:
{
SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
- session.handleXAJoin(message.getXid());
+ session.xaJoin(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
break;
}
case SESS_XA_RESUME:
{
SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
- session.handleXAResume(message.getXid());
+ session.xaResume(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
break;
}
case SESS_XA_ROLLBACK:
{
SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
- session.handleXARollback(message.getXid());
+ session.xaRollback(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
break;
}
case SESS_XA_START:
{
SessionXAStartMessage message = (SessionXAStartMessage)packet;
- session.handleXAStart(message.getXid());
+ session.xaStart(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
break;
}
case SESS_XA_SUSPEND:
{
- session.handleXASuspend();
+ session.xaSuspend();
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
break;
}
case SESS_XA_PREPARE:
{
SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
- session.handleXAPrepare(message.getXid());
+ session.xaPrepare(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
break;
}
case SESS_XA_INDOUBT_XIDS:
{
- List<Xid> xids = session.handleGetInDoubtXids();
+ List<Xid> xids = session.xaGetInDoubtXids();
response = new SessionXAGetInDoubtXidsResponseMessage(xids);
break;
}
case SESS_XA_GET_TIMEOUT:
{
- int timeout = session.handleGetXATimeout();
+ int timeout = session.xaGetTimeout();
response = new SessionXAGetTimeoutResponseMessage(timeout);
break;
}
case SESS_XA_SET_TIMEOUT:
{
SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
- session.handleSetXATimeout(message.getTimeoutSeconds());
+ session.xaSetTimeout(message.getTimeoutSeconds());
response = new SessionXASetTimeoutResponseMessage(true);
break;
}
case SESS_START:
{
- session.handleStart();
+ session.start();
break;
}
case SESS_STOP:
{
- session.handleStop();
+ session.stop();
response = new NullResponseMessage();
break;
}
case SESS_CLOSE:
{
- handleCloseSession();
+ session.close();
removeConnectionListeners();
response = new NullResponseMessage();
flush = true;
@@ -429,20 +443,20 @@
case SESS_CONSUMER_CLOSE:
{
SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
- session.handleCloseConsumer(message.getConsumerID());
+ session.closeConsumer(message.getConsumerID());
response = new NullResponseMessage();
break;
}
case SESS_FLOWTOKEN:
{
SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)packet;
- session.handleReceiveConsumerCredits(message.getConsumerID(), message.getCredits());
+ session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
break;
}
case SESS_SEND:
{
SessionSendMessage message = (SessionSendMessage)packet;
- session.handleSend((ServerMessage)message.getMessage());
+ session.send((ServerMessage)message.getMessage());
if (message.isRequiresResponse())
{
response = new NullResponseMessage();
@@ -452,13 +466,13 @@
case SESS_SEND_LARGE:
{
SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
- session.handleSendLargeMessage(message.getLargeMessageHeader());
+ session.sendLarge(message.getLargeMessageHeader());
break;
}
case SESS_SEND_CONTINUATION:
{
SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
- session.handleSendContinuations(message.getPacketSize(), message.getBody(), message.isContinues());
+ session.sendContinuations(message.getPacketSize(), message.getBody(), message.isContinues());
if (message.isRequiresResponse())
{
response = new NullResponseMessage();
@@ -468,13 +482,13 @@
case SESS_FORCE_CONSUMER_DELIVERY:
{
SessionForceConsumerDelivery message = (SessionForceConsumerDelivery)packet;
- session.handleForceConsumerDelivery(message.getConsumerID(), message.getSequence());
+ session.forceConsumerDelivery(message.getConsumerID(), message.getSequence());
break;
}
case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS:
{
SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage)packet;
- session.handleRequestProducerCredits(message.getAddress(), message.getCredits());
+ session.requestProducerCredits(message.getAddress(), message.getCredits());
break;
}
}
@@ -491,7 +505,7 @@
{
log.error("Caught unexpected exception", t);
}
-
+
sendResponse(packet, response, flush, closeChannel);
}
finally
@@ -551,28 +565,6 @@
}
}
- private void handleCloseSession()
- {
- storageManager.afterCompleteOperations(new IOAsyncTask()
- {
- public void onError(int errorCode, String errorMessage)
- {
- }
-
- public void done()
- {
- try
- {
- session.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close session", e);
- }
- }
- });
- }
-
public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
{
Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);
15 years, 11 months
JBoss hornetq SVN: r8812 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/integration/transports/netty and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-01-20 09:53:17 -0500 (Wed, 20 Jan 2010)
New Revision: 8812
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java
Removed:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added code to have a complete Stomp CONNECT + SEND + DISCONNECT use case
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java 2010-01-20 14:53:17 UTC (rev 8812)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+/**
+ * A StompException
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class StompException extends Exception
+{
+
+ /**
+ * @param string
+ */
+ public StompException(String string)
+ {
+ super(string);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java 2010-01-20 14:38:53 UTC (rev 8811)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java 2010-01-20 14:53:17 UTC (rev 8812)
@@ -28,6 +28,6 @@
public StompFrameDelimiter()
{
- super(MAX_DATA_LENGTH, true, Delimiters.nulDelimiter());
+ super(MAX_DATA_LENGTH, false, Delimiters.nulDelimiter());
}
}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 14:38:53 UTC (rev 8811)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 14:53:17 UTC (rev 8812)
@@ -13,6 +13,9 @@
package org.hornetq.integration.transports.netty;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
@@ -49,8 +52,10 @@
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.integration.stomp.Stomp;
import org.hornetq.integration.stomp.StompDestinationConverter;
+import org.hornetq.integration.stomp.StompException;
import org.hornetq.integration.stomp.StompFrame;
import org.hornetq.integration.stomp.StompMarshaller;
+import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -565,15 +570,16 @@
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
{
+ StompFrame frame = (StompFrame)e.getMessage();
+ System.out.println(">>> got frame " + frame);
+
+ // need to interact with HornetQ server & session
+ HornetQServer server = serverHandler.getServer();
+ RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
+
try
{
- StompFrame frame = (StompFrame)e.getMessage();
- System.out.println(">>> got frame " + frame);
- // need to interact with HornetQ server & session
- HornetQServer server = serverHandler.getServer();
- RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
-
String command = frame.getCommand();
StompFrame response = null;
@@ -581,6 +587,10 @@
{
response = onConnect(frame, server, connection);
}
+ if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ response = onDisconnect(frame, server, connection);
+ }
else if (Stomp.Commands.SEND.equals(command))
{
response = onSend(frame, server, connection);
@@ -598,14 +608,68 @@
connection.getTransportConnection().write(buffer, true);
}
}
+ catch (StompException ex)
+ {
+ // Let the stomp client know about any protocol errors.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+ ex.printStackTrace(stream);
+ stream.append(Stomp.NULL + Stomp.NEWLINE);
+ stream.close();
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+
+ final String receiptId = (String) frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null) {
+ headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ }
+
+ StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+ byte[] bytes = marshaller.marshal(errorMessage);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+
+ }
catch (Exception ex)
{
ex.printStackTrace();
}
}
- private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws HornetQException
+ private void checkConnected(RemotingConnection connection) throws StompException
{
+ ServerSession session = sessions.get(connection);
+ if (session == null)
+ {
+ throw new StompException("Not connected");
+ }
+ }
+ private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
+ {
+ checkConnected(connection);
+
+ ServerSession session = sessions.get(connection);
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ throw new StompException(e.getMessage());
+ }
+ sessions.remove(connection);
+ }
+ return null;
+ }
+
+ private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws HornetQException, StompException
+ {
+ checkConnected(connection);
+
Map<String, Object> headers = frame.getHeaders();
String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
/*
@@ -615,6 +679,10 @@
boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
*/
byte type = HornetQTextMessage.TYPE;
+ if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+ {
+ type = HornetQBytesMessage.TYPE;
+ }
long timestamp = System.currentTimeMillis();
boolean durable = false;
long expiration = -1;
@@ -625,9 +693,15 @@
message.setType(type);
message.setTimestamp(timestamp);
message.setAddress(address);
- String content = new String(frame.getContent());
- System.out.println(">>> got: " + content);
- message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(content));
+ byte[] content = frame.getContent();
+ if (type == HornetQTextMessage.TYPE)
+ {
+ message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content)));
+ }
+ else
+ {
+ message.getBodyBuffer().writeBytes(content);
+ }
ServerSession session = sessions.get(connection);
SessionSendMessage packet = new SessionSendMessage(message, false);
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20 14:38:53 UTC (rev 8811)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20 14:53:17 UTC (rev 8812)
@@ -79,7 +79,32 @@
Assert.assertTrue(f.startsWith("CONNECTED"));
Assert.assertTrue(f.indexOf("response-id:1") >= 0);
}
+
+ public void testDisconnectAndError() throws Exception {
+ String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+
+ connect_frame = "DISCONNECT\n\n" + Stomp.NULL;
+ sendFrame(connect_frame);
+
+ // sending a message will result in an error
+ String frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("ERROR"));
+ }
+
+
public void testSendMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -150,7 +175,47 @@
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
+
+ public void testSendMessageWithContentLength() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ byte[] data = new byte[] {1, 2, 3, 4};
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "content-length:" + data.length + "\n\n" +
+ new String(data) +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ BytesMessage message = (BytesMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ assertEquals(data.length, message.getBodyLength());
+ assertEquals(data[0], message.readByte());
+ assertEquals(data[1], message.readByte());
+ assertEquals(data[2], message.readByte());
+ assertEquals(data[3], message.readByte());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
public void _testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
Deleted: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java 2010-01-20 14:38:53 UTC (rev 8811)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java 2010-01-20 14:53:17 UTC (rev 8812)
@@ -1,90 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.stomp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.integration.transports.netty.NettyAcceptor;
-import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
-import org.hornetq.integration.transports.netty.TransportConstants;
-
-import junit.framework.TestCase;
-
-/**
- * A StompTest
- *
- * @author jmesnil
- *
- *
- */
-public class StompTest2 extends TestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private HornetQServer server;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testFoo() throws Exception
- {
- Thread.sleep(10);
- }
-
- // Package protected ---------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- Configuration config = new ConfigurationImpl();
- config.setSecurityEnabled(false);
-
- Map<String, Object> params = new HashMap<String, Object>();
- params.put(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.STOMP_PROTOCOL);
- params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
- TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
- config.getAcceptorConfigurations().add(stompTransport);
-
- server = HornetQServers.newHornetQServer(config);
- server.start();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- server.stop();
-
- super.tearDown();
- }
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
15 years, 11 months
JBoss hornetq SVN: r8811 - in trunk: src/main/org/hornetq/core/client/impl and 7 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-01-20 09:38:53 -0500 (Wed, 20 Jan 2010)
New Revision: 8811
Added:
trunk/src/main/org/hornetq/core/server/BindingQueryResult.java
trunk/src/main/org/hornetq/core/server/QueueQueryResult.java
trunk/src/main/org/hornetq/core/server/SessionCallback.java
Modified:
trunk/src/main/org/hornetq/api/core/client/ClientSession.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/remoting/Channel.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
Log:
partial refactoring to remove remoting knowledge from session
Modified: trunk/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSession.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSession.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -455,6 +455,7 @@
/**
* Creates a ClientMessage.
*
+ * @param type type of the message
* @param durable whether the created message is durable or not
* @param expiration the message expiration
* @param timestamp the message timestamp
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -866,7 +866,7 @@
{
// The session was found on the server - we reattached transparently ok
- channel.replayCommands(response.getLastConfirmedCommandID(), channel.getID());
+ channel.replayCommands(response.getLastConfirmedCommandID());
}
else
{
Modified: trunk/src/main/org/hornetq/core/remoting/Channel.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/core/remoting/Channel.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -89,9 +89,8 @@
* Typically called after a connection has been transferred.
*
* @param lastConfirmedCommandID the last confirmed packet
- * @param newID the new id to use
*/
- void replayCommands(int lastConfirmedCommandID, final long newID);
+ void replayCommands(int lastConfirmedCommandID);
/**
* returns the last confirmed packet command id
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -336,7 +336,7 @@
}
}
- public void replayCommands(final int otherLastConfirmedCommandID, final long newChannelID)
+ public void replayCommands(final int otherLastConfirmedCommandID)
{
if (resendCache != null)
{
@@ -344,8 +344,6 @@
for (final Packet packet : resendCache)
{
- packet.setChannelID(newChannelID);
-
doWrite(packet);
}
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.QueueQueryResult;
/**
*
@@ -41,15 +42,10 @@
private boolean temporary;
- public SessionQueueQueryResponseMessage(final SimpleString name,
- final SimpleString address,
- final boolean durable,
- final boolean temporary,
- final SimpleString filterString,
- final int consumerCount,
- final int messageCount)
+ public SessionQueueQueryResponseMessage(final QueueQueryResult result)
{
- this(name, address, durable, temporary, filterString, consumerCount, messageCount, true);
+ this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(),
+ result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists());
}
public SessionQueueQueryResponseMessage()
Added: trunk/src/main/org/hornetq/core/server/BindingQueryResult.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/BindingQueryResult.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/BindingQueryResult.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import java.util.List;
+
+import org.hornetq.api.core.SimpleString;
+
+/**
+ *
+ * A BindingQueryResult
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class BindingQueryResult
+{
+ private boolean exists;
+
+ private List<SimpleString> queueNames;
+
+ public BindingQueryResult(final boolean exists, final List<SimpleString> queueNames)
+ {
+ this.exists = exists;
+
+ this.queueNames = queueNames;
+ }
+
+ public boolean isExists()
+ {
+ return exists;
+ }
+
+ public List<SimpleString> getQueueNames()
+ {
+ return queueNames;
+ }
+}
Added: trunk/src/main/org/hornetq/core/server/QueueQueryResult.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/QueueQueryResult.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/QueueQueryResult.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import org.hornetq.api.core.SimpleString;
+
+/**
+ *
+ * A QueueQueryResult
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ */
+public class QueueQueryResult
+{
+ private SimpleString name;
+
+ private boolean exists;
+
+ private boolean durable;
+
+ private int consumerCount;
+
+ private int messageCount;
+
+ private SimpleString filterString;
+
+ private SimpleString address;
+
+ private boolean temporary;
+
+ public QueueQueryResult(final SimpleString name,
+ final SimpleString address,
+ final boolean durable,
+ final boolean temporary,
+ final SimpleString filterString,
+ final int consumerCount,
+ final int messageCount)
+ {
+ this(name, address, durable, temporary, filterString, consumerCount, messageCount, true);
+ }
+
+ public QueueQueryResult()
+ {
+ this(null, null, false, false, null, 0, 0, false);
+ }
+
+ private QueueQueryResult(final SimpleString name,
+ final SimpleString address,
+ final boolean durable,
+ final boolean temporary,
+ final SimpleString filterString,
+ final int consumerCount,
+ final int messageCount,
+ final boolean exists)
+ {
+ this.durable = durable;
+
+ this.temporary = temporary;
+
+ this.consumerCount = consumerCount;
+
+ this.messageCount = messageCount;
+
+ this.filterString = filterString;
+
+ this.address = address;
+
+ this.name = name;
+
+ this.exists = exists;
+ }
+
+ public boolean isExists()
+ {
+ return exists;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public int getConsumerCount()
+ {
+ return consumerCount;
+ }
+
+ public int getMessageCount()
+ {
+ return messageCount;
+ }
+
+ public SimpleString getFilterString()
+ {
+ return filterString;
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+
+ public SimpleString getName()
+ {
+ return name;
+ }
+
+ public boolean isTemporary()
+ {
+ return temporary;
+ }
+
+}
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -13,34 +13,13 @@
package org.hornetq.core.server;
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.SimpleString;
import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.hornetq.core.server.impl.ServerSessionPacketHandler;
/**
*
@@ -54,8 +33,6 @@
{
String getName();
- long getID();
-
String getUsername();
String getPassword();
@@ -68,73 +45,79 @@
void close() throws Exception;
- void handleAcknowledge(final SessionAcknowledgeMessage packet);
+ void handleAcknowledge(long consumerID, long messageID) throws Exception;
- void handleExpired(final SessionExpiredMessage packet);
+ void handleExpired(long consumerID, long messageID) throws Exception;
- void handleRollback(RollbackMessage packet);
+ void handleRollback(boolean considerLastMessageAsDelivered) throws Exception;
- void handleCommit(Packet packet);
+ void handleCommit() throws Exception;
- void handleXACommit(SessionXACommitMessage packet);
+ void handleXACommit(Xid xid, boolean onePhase) throws Exception;
- void handleXAEnd(SessionXAEndMessage packet);
+ void handleXAEnd(Xid xid) throws Exception;
- void handleXAForget(SessionXAForgetMessage packet);
+ void handleXAForget(Xid xid) throws Exception;
- void handleXAJoin(SessionXAJoinMessage packet);
+ void handleXAJoin(Xid xid) throws Exception;
- void handleXAPrepare(SessionXAPrepareMessage packet);
+ void handleXAPrepare(Xid xid) throws Exception;
- void handleXAResume(SessionXAResumeMessage packet);
+ void handleXAResume(Xid xid) throws Exception;
- void handleXARollback(SessionXARollbackMessage packet);
+ void handleXARollback(Xid xid) throws Exception;
- void handleXAStart(SessionXAStartMessage packet);
+ void handleXAStart(Xid xid) throws Exception;
- void handleXASuspend(Packet packet);
+ void handleXASuspend() throws Exception;
- void handleGetInDoubtXids(Packet packet);
+ List<Xid> handleGetInDoubtXids();
- void handleGetXATimeout(Packet packet);
+ int handleGetXATimeout();
- void handleSetXATimeout(SessionXASetTimeoutMessage packet);
+ void handleSetXATimeout(int timeout);
- void handleStart(Packet packet);
+ void handleStart();
- void handleStop(Packet packet);
+ void handleStop();
- void handleCreateQueue(CreateQueueMessage packet);
+ void handleCreateQueue(SimpleString address,
+ SimpleString name,
+ SimpleString filterString,
+ boolean temporary,
+ boolean durable) throws Exception;
- void handleDeleteQueue(SessionDeleteQueueMessage packet);
+ void handleDeleteQueue(SimpleString name) throws Exception;
- void handleCreateConsumer(SessionCreateConsumerMessage packet);
+ void handleCreateConsumer(long consumerID, SimpleString name, SimpleString filterString, boolean browseOnly) throws Exception;
- void handleExecuteQueueQuery(SessionQueueQueryMessage packet);
+ QueueQueryResult handleExecuteQueueQuery(SimpleString name) throws Exception;
- void handleExecuteBindingQuery(SessionBindingQueryMessage packet);
+ BindingQueryResult handleExecuteBindingQuery(SimpleString address);
- void handleCloseConsumer(SessionConsumerCloseMessage packet);
+ void handleCloseConsumer(long consumerID) throws Exception;
- void handleReceiveConsumerCredits(SessionConsumerFlowCreditMessage packet);
+ void handleReceiveConsumerCredits(long consumerID, int credits) throws Exception;
- void handleSendContinuations(SessionSendContinuationMessage packet);
+ void handleSendContinuations(int packetSize, byte[] body, boolean continues) throws Exception;
- void handleSend(SessionSendMessage packet);
+ void handleSend(ServerMessage message) throws Exception;
- void handleSendLargeMessage(SessionSendLargeMessage packet);
+ void handleSendLargeMessage(byte[] largeMessageHeader) throws Exception;
- void handleForceConsumerDelivery(SessionForceConsumerDelivery message);
+ void handleForceConsumerDelivery(long consumerID, long sequence) throws Exception;
- void handleRequestProducerCredits(SessionRequestProducerCreditsMessage message) throws Exception;
+ void handleRequestProducerCredits(SimpleString address, int credits) throws Exception;
- void handleClose(Packet packet);
+ void handleClose() throws Exception;
int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
Channel getChannel();
- ServerSessionPacketHandler getHandler();
-
- void setHandler(ServerSessionPacketHandler handler);
+ void setTransferring(boolean transferring);
+
+ void runConnectionFailureRunners();
+
+ void setCallback(SessionCallback callback);
}
Added: trunk/src/main/org/hornetq/core/server/SessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/SessionCallback.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/SessionCallback.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import org.hornetq.api.core.SimpleString;
+
+/**
+ * A SessionCallback
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface SessionCallback
+{
+ public void sendProducerCreditsMessage(int credits, SimpleString address, int offset);
+
+ public int sendMessage(ServerMessage message, long consumerID, int deliveryCount);
+
+ public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount);
+
+ public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse);
+
+}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -22,7 +22,6 @@
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -643,11 +642,11 @@
configuration.isPersistDeliveryCountBeforeDelivery(),
xa,
connection,
+ channel,
storageManager,
postOffice,
resourceManager,
- securityStore,
- channel,
+ securityStore,
managementService,
this,
configuration.getManagementAddress());
@@ -656,10 +655,11 @@
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
storageManager.newContext(executorFactory.getExecutor()),
- storageManager);
+ storageManager,
+ channel);
+
+ session.setCallback(handler);
- session.setHandler(handler);
-
channel.setHandler(handler);
return new CreateSessionResponseMessage(version.getIncrementingVersion());
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -245,12 +245,12 @@
executor.execute(deliverRunner);
}
}
-
+
public Executor getExecutor()
{
return executor;
}
-
+
public synchronized void deliverNow()
{
deliverRunner.run();
@@ -743,7 +743,7 @@
}
return false;
}
-
+
public int sendMessagesToDeadLetterAddress(Filter filter) throws Exception
{
int count = 0;
@@ -757,7 +757,7 @@
deliveringCount.incrementAndGet();
sendToDeadLetterAddress(ref);
iter.remove();
- count ++;
+ count++;
}
}
return count;
@@ -832,7 +832,7 @@
return false;
}
-
+
public int changeReferencesPriority(final Filter filter, final byte newPriority) throws Exception
{
Iterator<MessageReference> iter = messageReferences.iterator();
@@ -843,7 +843,7 @@
MessageReference ref = iter.next();
if (filter == null || filter.match(ref.getMessage()))
{
- count ++;
+ count++;
iter.remove();
ref.getMessage().setPriority(newPriority);
addLast(ref);
@@ -1404,7 +1404,8 @@
}
catch (Exception e)
{
- QueueImpl.log.warn("Unable to remove message id = " + message.getMessageID() + " please remove manually");
+ QueueImpl.log.warn("Unable to remove message id = " + message.getMessageID() + " please remove manually",
+ e);
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -23,7 +23,6 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ClientConsumerImpl;
@@ -33,10 +32,6 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.QueueBinding;
-import org.hornetq.core.remoting.Channel;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
@@ -44,6 +39,7 @@
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.transaction.Transaction;
@@ -112,7 +108,7 @@
private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
- private final Channel channel;
+ private final SessionCallback callback;
private volatile boolean closed;
@@ -133,12 +129,11 @@
final boolean started,
final boolean browseOnly,
final StorageManager storageManager,
- final Channel channel,
+ final SessionCallback callback,
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
final ManagementService managementService) throws Exception
{
-
this.id = id;
this.filter = filter;
@@ -157,7 +152,7 @@
this.storageManager = storageManager;
- this.channel = channel;
+ this.callback = callback;
this.preAcknowledge = preAcknowledge;
@@ -361,9 +356,7 @@
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
forcedDeliveryMessage.setAddress(messageQueue.getName());
- final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
-
- channel.send(packet);
+ callback.sendMessage(forcedDeliveryMessage, id, 0);
}
catch (Exception e)
{
@@ -634,15 +627,12 @@
*/
private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
{
- final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount());
+ int packetSize = callback.sendMessage(message, id, ref.getDeliveryCount());
- channel.send(packet);
-
if (availableCredits != null)
{
- availableCredits.addAndGet(-packet.getPacketSize());
+ availableCredits.addAndGet(-packetSize);
}
-
}
// Inner classes
@@ -732,21 +722,18 @@
sizePendingLargeMessage = context.getLargeBodySize();
- SessionReceiveLargeMessage initialPacket = new SessionReceiveLargeMessage(id,
- headerBuffer.toByteBuffer()
- .array(),
- context.getLargeBodySize(),
- ref.getDeliveryCount());
-
context.open();
sentInitialPacket = true;
- channel.send(initialPacket);
+ int packetSize = callback.sendLargeMessage(id,
+ headerBuffer.toByteBuffer().array(),
+ context.getLargeBodySize(),
+ ref.getDeliveryCount());
if (availableCredits != null)
{
- availableCredits.addAndGet(-initialPacket.getPacketSize());
+ availableCredits.addAndGet(-packetSize);
}
// Execute the rest of the large message on a different thread so as not to tie up the delivery thread
@@ -768,22 +755,33 @@
return false;
}
- SessionReceiveContinuationMessage chunk = createChunkSend(context);
+ int localChunkLen = 0;
- int chunkLen = chunk.getBody().length;
+ localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
- channel.send(chunk);
+ HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
+ context.encode(bodyBuffer, localChunkLen);
+
+ byte[] body = bodyBuffer.toByteBuffer().array();
+
+ int packetSize = callback.sendLargeMessageContinuation(id,
+ body,
+ positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
+ false);
+
+ int chunkLen = body.length;
+
if (ServerConsumerImpl.trace)
{
- ServerConsumerImpl.trace("deliverLargeMessage: Sending " + chunk.getPacketSize() +
+ ServerConsumerImpl.trace("deliverLargeMessage: Sending " + packetSize +
" availableCredits now is " +
availableCredits);
}
if (availableCredits != null)
{
- availableCredits.addAndGet(-chunk.getPacketSize());
+ availableCredits.addAndGet(-packetSize);
}
positionPendingLargeMessage += chunkLen;
@@ -846,26 +844,6 @@
lock.unlock();
}
}
-
- private SessionReceiveContinuationMessage createChunkSend(final BodyEncoder context) throws HornetQException
- {
- SessionReceiveContinuationMessage chunk;
-
- int localChunkLen = 0;
-
- localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
-
- HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
-
- context.encode(bodyBuffer, localChunkLen);
-
- chunk = new SessionReceiveContinuationMessage(id,
- bodyBuffer.toByteBuffer().array(),
- positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
- false);
-
- return chunk;
- }
}
private class BrowserDeliverer implements Runnable
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -24,13 +24,13 @@
import java.util.concurrent.ConcurrentHashMap;
import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.exception.HornetQXAException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.journal.IOAsyncTask;
@@ -45,52 +45,20 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
-import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.SecurityStore;
+import org.hornetq.core.server.BindingQueryResult;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.transaction.ResourceManager;
@@ -113,10 +81,10 @@
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
// Static -------------------------------------------------------------------------------
-
+
// Attributes ----------------------------------------------------------------------------
- private final long id;
+ //private final long id;
private final String username;
@@ -133,6 +101,8 @@
private final boolean strictUpdateDeliveryCount;
private RemotingConnection remotingConnection;
+
+ private Channel channel;
private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
@@ -146,8 +116,6 @@
private final SecurityStore securityStore;
- private final Channel channel;
-
private final ManagementService managementService;
private volatile boolean started = false;
@@ -163,13 +131,15 @@
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
- private ServerSessionPacketHandler handler;
+ // private ServerSessionPacketHandler handler;
private boolean closed;
private final Map<SimpleString, CreditManagerHolder> creditManagerHolders = new HashMap<SimpleString, CreditManagerHolder>();
private final RoutingContext routingContext = new RoutingContextImpl(null);
+
+ private SessionCallback callback;
// Constructors ---------------------------------------------------------------------------------
@@ -183,17 +153,15 @@
final boolean strictUpdateDeliveryCount,
final boolean xa,
final RemotingConnection remotingConnection,
+ final Channel channel,
final StorageManager storageManager,
final PostOffice postOffice,
final ResourceManager resourceManager,
- final SecurityStore securityStore,
- final Channel channel,
+ final SecurityStore securityStore,
final ManagementService managementService,
final HornetQServer server,
final SimpleString managementAddress) throws Exception
{
- id = channel.getID();
-
this.username = username;
this.password = password;
@@ -207,6 +175,8 @@
this.preAcknowledge = preAcknowledge;
this.remotingConnection = remotingConnection;
+
+ this.channel = channel;
this.storageManager = storageManager;
@@ -223,8 +193,6 @@
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
- this.channel = channel;
-
this.managementService = managementService;
this.name = name;
@@ -232,7 +200,7 @@
this.server = server;
this.managementAddress = managementAddress;
-
+
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
@@ -240,16 +208,11 @@
// ServerSession implementation ----------------------------------------------------------------------------
- public ServerSessionPacketHandler getHandler()
+ public void setCallback(final SessionCallback callback)
{
- return handler;
+ this.callback = callback;
}
-
- public void setHandler(final ServerSessionPacketHandler handler)
- {
- this.handler = handler;
- }
-
+
public String getUsername()
{
return username;
@@ -265,11 +228,6 @@
return minLargeMessageSize;
}
- public long getID()
- {
- return id;
- }
-
public String getName()
{
return name;
@@ -332,577 +290,333 @@
}
}
- public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
+ public void handleCreateConsumer(final long consumerID,
+ final SimpleString name,
+ final SimpleString filterString,
+ final boolean browseOnly) throws Exception
{
- SimpleString name = packet.getQueueName();
-
- SimpleString filterString = packet.getFilterString();
+ Binding binding = postOffice.getBinding(name);
- boolean browseOnly = packet.isBrowseOnly();
-
- Packet response = null;
-
- try
+ if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
{
- Binding binding = postOffice.getBinding(name);
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Queue " + name + " does not exist");
+ }
- if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
- {
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST, "Queue " + name + " does not exist");
- }
+ securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
- securityStore.check(binding.getAddress(), CheckType.CONSUME, this);
+ Filter filter = FilterImpl.createFilter(filterString);
- Filter filter = FilterImpl.createFilter(filterString);;
+ ServerConsumer consumer = new ServerConsumerImpl(consumerID,
+ this,
+ (QueueBinding)binding,
+ filter,
+ started,
+ browseOnly,
+ storageManager,
+ callback,
+ preAcknowledge,
+ strictUpdateDeliveryCount,
+ managementService);
- ServerConsumer consumer = new ServerConsumerImpl(packet.getID(),
- this,
- (QueueBinding)binding,
- filter,
- started,
- browseOnly,
- storageManager,
- channel,
- preAcknowledge,
- strictUpdateDeliveryCount,
- managementService);
+ consumers.put(consumer.getID(), consumer);
- consumers.put(consumer.getID(), consumer);
+ if (!browseOnly)
+ {
+ TypedProperties props = new TypedProperties();
- if (!browseOnly)
- {
- TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
- props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+ props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
- props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
+ props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
- props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
+ Queue theQueue = (Queue)binding.getBindable();
- Queue theQueue = (Queue)binding.getBindable();
+ props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
- props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
+ if (filterString != null)
+ {
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+ }
- if (filterString != null)
- {
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
- }
+ Notification notification = new Notification(null, CONSUMER_CREATED, props);
- Notification notification = new Notification(null, CONSUMER_CREATED, props);
+ managementService.sendNotification(notification);
+ }
+ }
- managementService.sendNotification(notification);
- }
-
- //We send back queue information on the queue as a response- this allows the queue to
- //be automaticall recreated on failover
-
- if (packet.isRequiresResponse())
- {
- response = doExecuteQueueQuery(name);
- }
- else
- {
- response = null;
- }
+ public void handleCreateQueue(final SimpleString address,
+ final SimpleString name,
+ final SimpleString filterString,
+ final boolean temporary,
+ final boolean durable) throws Exception
+ {
+ if (durable)
+ {
+ // make sure the user has privileges to create this queue
+ securityStore.check(address, CheckType.CREATE_DURABLE_QUEUE, this);
}
- catch (Exception e)
+ else
{
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- ServerSessionImpl.log.error("Failed to create consumer", e);
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
+ securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
}
-
- sendResponse(packet, response, false, false);
- }
- public void handleCreateQueue(final CreateQueueMessage packet)
- {
- SimpleString address = packet.getAddress();
+ server.createQueue(address, name, filterString, durable, temporary);
- final SimpleString name = packet.getQueueName();
-
- SimpleString filterString = packet.getFilterString();
-
- boolean temporary = packet.isTemporary();
-
- boolean durable = packet.isDurable();
-
- Packet response = null;
-
- try
+ if (temporary)
{
- if (durable)
- {
- // make sure the user has privileges to create this queue
- securityStore.check(address, CheckType.CREATE_DURABLE_QUEUE, this);
- }
- else
- {
- securityStore.check(address, CheckType.CREATE_NON_DURABLE_QUEUE, this);
- }
+ // Temporary queue in core simply means the queue will be deleted if
+ // the remoting connection
+ // dies. It does not mean it will get deleted automatically when the
+ // session is closed.
+ // It is up to the user to delete the queue when finished with it
- server.createQueue(address, name, filterString, durable, temporary);
-
- if (temporary)
+ failureRunners.put(name, new Runnable()
{
- // Temporary queue in core simply means the queue will be deleted if
- // the remoting connection
- // dies. It does not mean it will get deleted automatically when the
- // session is closed.
- // It is up to the user to delete the queue when finished with it
-
- failureRunners.put(name, new Runnable()
+ public void run()
{
- public void run()
+ try
{
- try
+ if (postOffice.getBinding(name) != null)
{
- if (postOffice.getBinding(name) != null)
+ postOffice.removeBinding(name);
+
+ if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
{
- postOffice.removeBinding(name);
-
- if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
- {
- creditManagerHolders.remove(name);
- }
+ creditManagerHolders.remove(name);
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to remove temporary queue " + name);
- }
}
- });
- }
-
- if (packet.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
- else
- {
- response = null;
- }
+ catch (Exception e)
+ {
+ ServerSessionImpl.log.error("Failed to remove temporary queue " + name);
+ }
+ }
+ });
}
- catch (Exception e)
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- ServerSessionImpl.log.error("Failed to create queue", e);
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
+ public void handleDeleteQueue(final SimpleString name) throws Exception
{
- SimpleString name = packet.getQueueName();
+ Binding binding = postOffice.getBinding(name);
- Packet response = null;
-
- try
+ if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
{
- Binding binding = postOffice.getBinding(name);
+ throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST);
+ }
- if (binding == null || binding.getType() != BindingType.LOCAL_QUEUE)
- {
- throw new HornetQException(HornetQException.QUEUE_DOES_NOT_EXIST);
- }
+ server.destroyQueue(name, this);
- server.destroyQueue(name, this);
+ failureRunners.remove(name);
- failureRunners.remove(name);
-
- if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
- {
- creditManagerHolders.remove(name);
- }
-
- response = new NullResponseMessage();
- }
- catch (Exception e)
+ if (postOffice.getBindingsForAddress(name).getBindings().size() == 0)
{
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- ServerSessionImpl.log.error("Failed to delete queue", e);
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
+ creditManagerHolders.remove(name);
}
-
- sendResponse(packet, response, false, false);
}
-
- public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
- {
- SimpleString name = packet.getQueueName();
- Packet response = null;
-
- try
+ public QueueQueryResult handleExecuteQueueQuery(final SimpleString name) throws Exception
+ {
+ if (name == null)
{
- response = doExecuteQueueQuery(name);
+ throw new IllegalArgumentException("Queue name is null");
}
- catch (Exception e)
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- ServerSessionImpl.log.error("Failed to execute queue query", e);
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
+ QueueQueryResult response;
- sendResponse(packet, response, false, false);
- }
+ Binding binding = postOffice.getBinding(name);
- public void handleExecuteBindingQuery(final SessionBindingQueryMessage packet)
- {
- SimpleString address = packet.getAddress();
-
- Packet response = null;
-
- try
+ if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
{
- if (address == null)
- {
- throw new IllegalArgumentException("Address is null");
- }
+ Queue queue = (Queue)binding.getBindable();
- List<SimpleString> names = new ArrayList<SimpleString>();
+ Filter filter = queue.getFilter();
- Bindings bindings = postOffice.getMatchingBindings(address);
+ SimpleString filterString = filter == null ? null : filter.getFilterString();
- for (Binding binding : bindings.getBindings())
- {
- if (binding.getType() == BindingType.LOCAL_QUEUE)
- {
- names.add(binding.getUniqueName());
- }
- }
-
- response = new SessionBindingQueryResponseMessage(!names.isEmpty(), names);
+ response = new QueueQueryResult(name,
+ binding.getAddress(),
+ queue.isDurable(),
+ queue.isTemporary(),
+ filterString,
+ queue.getConsumerCount(),
+ queue.getMessageCount());
}
- catch (Exception e)
+ // make an exception for the management address (see HORNETQ-29)
+ else if (name.equals(managementAddress))
{
- ServerSessionImpl.log.error("Failed to execute binding query", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
+ response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1);
}
+ else
+ {
+ response = new QueueQueryResult();
+ }
- sendResponse(packet, response, false, false);
+ return response;
}
- public void handleForceConsumerDelivery(final SessionForceConsumerDelivery message)
+ public BindingQueryResult handleExecuteBindingQuery(final SimpleString address)
{
- try
+ if (address == null)
{
- ServerConsumer consumer = consumers.get(message.getConsumerID());
-
- consumer.forceDelivery(message.getSequence());
+ throw new IllegalArgumentException("Address is null");
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to query consumer deliveries", e);
- }
- sendResponse(message, null, false, false);
- }
+ List<SimpleString> names = new ArrayList<SimpleString>();
- public void handleAcknowledge(final SessionAcknowledgeMessage packet)
- {
- Packet response = null;
+ Bindings bindings = postOffice.getMatchingBindings(address);
- try
+ for (Binding binding : bindings.getBindings())
{
- ServerConsumer consumer = consumers.get(packet.getConsumerID());
-
- consumer.acknowledge(autoCommitAcks, tx, packet.getMessageID());
-
- if (packet.isRequiresResponse())
+ if (binding.getType() == BindingType.LOCAL_QUEUE)
{
- response = new NullResponseMessage();
+ names.add(binding.getUniqueName());
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to acknowledge", e);
- if (packet.isRequiresResponse())
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
- }
+ return new BindingQueryResult(!names.isEmpty(), names);
+ }
- sendResponse(packet, response, false, false);
+ public void handleForceConsumerDelivery(final long consumerID, final long sequence) throws Exception
+ {
+ ServerConsumer consumer = consumers.get(consumerID);
+
+ consumer.forceDelivery(sequence);
}
- public void handleExpired(final SessionExpiredMessage packet)
+ public void handleAcknowledge(final long consumerID, final long messageID) throws Exception
{
- try
- {
- MessageReference ref = consumers.get(packet.getConsumerID()).getExpired(packet.getMessageID());
+ ServerConsumer consumer = consumers.get(consumerID);
- if (ref != null)
- {
- ref.getQueue().expire(ref);
- }
- }
- catch (Exception e)
+ consumer.acknowledge(autoCommitAcks, tx, messageID);
+ }
+
+ public void handleExpired(final long consumerID, final long messageID) throws Exception
+ {
+ MessageReference ref = consumers.get(consumerID).getExpired(messageID);
+
+ if (ref != null)
{
- ServerSessionImpl.log.error("Failed to acknowledge", e);
+ ref.getQueue().expire(ref);
}
-
- sendResponse(packet, null, false, false);
}
- public void handleCommit(final Packet packet)
+ public void handleCommit() throws Exception
{
- Packet response = null;
-
try
{
tx.commit();
-
- response = new NullResponseMessage();
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to commit", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
finally
{
tx = new TransactionImpl(storageManager);
}
-
- sendResponse(packet, response, false, false);
}
- public void handleRollback(final RollbackMessage packet)
+ public void handleRollback(final boolean considerLastMessageAsDelivered) throws Exception
{
- Packet response = null;
+ rollback(considerLastMessageAsDelivered);
+ }
- try
+ public void handleXACommit(final Xid xid, final boolean onePhase) throws Exception
+ {
+ if (tx != null)
{
- rollback(packet.isConsiderLastMessageAsDelivered());
+ final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
- response = new NullResponseMessage();
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
}
- catch (Exception e)
+ else
{
- ServerSessionImpl.log.error("Failed to rollback", e);
+ Transaction theTx = resourceManager.removeTransaction(xid);
- if (e instanceof HornetQException)
+ if (theTx == null)
{
- response = new HornetQExceptionMessage((HornetQException)e);
+ // checked heuristic committed transactions
+ if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
+ {
+ throw new HornetQXAException(XAException.XA_HEURCOM,
+ "transaction has been heuristically committed: " + xid);
+ }
+ // checked heuristic rolled back transactions
+ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+ {
+ throw new HornetQXAException(XAException.XA_HEURRB,
+ "transaction has been heuristically rolled back: " + xid);
+ }
+ else
+ {
+ throw new HornetQXAException(XAException.XAER_NOTA, "Cannot find xid in resource manager: " + xid);
+ }
}
else
{
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
+ if (theTx.getState() == Transaction.State.SUSPENDED)
+ {
+ // Put it back
+ resourceManager.putTransaction(xid, tx);
+
+ throw new HornetQXAException(XAException.XAER_PROTO, "Cannot commit transaction, it is suspended " + xid);
+ }
+ else
+ {
+ theTx.commit(onePhase);
+ }
}
}
-
- sendResponse(packet, response, false, false);
}
- public void handleXACommit(final SessionXACommitMessage packet)
+ public void handleXAEnd(final Xid xid) throws Exception
{
- Packet response = null;
-
- Xid xid = packet.getXid();
-
- try
+ if (tx != null && tx.getXid().equals(xid))
{
- if (tx != null)
+ if (tx.getState() == Transaction.State.SUSPENDED)
{
- final String msg = "Cannot commit, session is currently doing work in transaction " + tx.getXid();
+ final String msg = "Cannot end, transaction is suspended";
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
}
else
{
- Transaction theTx = resourceManager.removeTransaction(xid);
-
- if (theTx == null)
- {
- // checked heuristic committed transactions
- if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
- {
- response = new SessionXAResponseMessage(true,
- XAException.XA_HEURCOM,
- "transaction has been heuristically committed: " + xid);
- }
- // checked heuristic rolled back transactions
- else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
- {
- response = new SessionXAResponseMessage(true,
- XAException.XA_HEURRB,
- "transaction has been heuristically rolled back: " + xid);
- }
- else
- {
- response = new SessionXAResponseMessage(true,
- XAException.XAER_NOTA,
- "Cannot find xid in resource manager: " + xid);
- }
- }
- else
- {
- if (theTx.getState() == Transaction.State.SUSPENDED)
- {
- // Put it back
- resourceManager.putTransaction(xid, tx);
-
- response = new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot commit transaction, it is suspended " + xid);
- }
- else
- {
- theTx.commit(packet.isOnePhase());
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- }
- }
+ tx = null;
}
}
- catch (Exception e)
+ else
{
- ServerSessionImpl.log.error("Failed to xa commit", e);
+ // It's also legal for the TM to call end for a Xid in the suspended
+ // state
+ // See JTA 1.1 spec 3.4.4 - state diagram
+ // Although in practice TMs rarely do this.
+ Transaction theTx = resourceManager.getTransaction(xid);
- if (e instanceof HornetQException)
+ if (theTx == null)
{
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
+ final String msg = "Cannot find suspended transaction to end " + xid;
- sendResponse(packet, response, false, false);
- }
-
- public void handleXAEnd(final SessionXAEndMessage packet)
- {
- Packet response = null;
-
- Xid xid = packet.getXid();
-
- try
- {
- if (tx != null && tx.getXid().equals(xid))
- {
- if (tx.getState() == Transaction.State.SUSPENDED)
- {
- final String msg = "Cannot end, transaction is suspended";
-
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
- else
- {
- tx = null;
- }
+ throw new HornetQXAException(XAException.XAER_NOTA, msg);
}
else
{
- // It's also legal for the TM to call end for a Xid in the suspended
- // state
- // See JTA 1.1 spec 3.4.4 - state diagram
- // Although in practice TMs rarely do this.
- Transaction theTx = resourceManager.getTransaction(xid);
-
- if (theTx == null)
+ if (theTx.getState() != Transaction.State.SUSPENDED)
{
- final String msg = "Cannot find suspended transaction to end " + xid;
+ final String msg = "Transaction is not suspended " + xid;
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
}
else
{
- if (theTx.getState() != Transaction.State.SUSPENDED)
- {
- final String msg = "Transaction is not suspended " + xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
- else
- {
- theTx.resume();
- }
+ theTx.resume();
}
}
-
- if (response == null)
- {
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- }
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa end", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleXAForget(final SessionXAForgetMessage packet)
+ public void handleXAForget(final Xid xid) throws Exception
{
- long id = resourceManager.removeHeuristicCompletion(packet.getXid());
- int code = XAResource.XA_OK;
+ long id = resourceManager.removeHeuristicCompletion(xid);
+
if (id != -1)
{
try
@@ -912,402 +626,236 @@
catch (Exception e)
{
e.printStackTrace();
- code = XAException.XAER_RMERR;
+
+ throw new HornetQXAException(XAException.XAER_RMERR);
}
}
else
{
- code = XAException.XAER_NOTA;
+ throw new HornetQXAException(XAException.XAER_NOTA);
}
-
- Packet response = new SessionXAResponseMessage((code != XAResource.XA_OK), code, null);
-
- sendResponse(packet, response, false, false);
}
- public void handleXAJoin(final SessionXAJoinMessage packet)
+ public void handleXAJoin(final Xid xid) throws Exception
{
- Packet response = null;
+ Transaction theTx = resourceManager.getTransaction(xid);
- Xid xid = packet.getXid();
-
- try
+ if (theTx == null)
{
- Transaction theTx = resourceManager.getTransaction(xid);
+ final String msg = "Cannot find xid in resource manager: " + xid;
- if (theTx == null)
- {
- final String msg = "Cannot find xid in resource manager: " + xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
- }
- else
- {
- if (theTx.getState() == Transaction.State.SUSPENDED)
- {
- response = new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot join tx, it is suspended " + xid);
- }
- else
- {
- tx = theTx;
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- }
- }
+ throw new HornetQXAException(XAException.XAER_NOTA, msg);
}
- catch (Exception e)
+ else
{
- ServerSessionImpl.log.error("Failed to xa join", e);
-
- if (e instanceof HornetQException)
+ if (theTx.getState() == Transaction.State.SUSPENDED)
{
- response = new HornetQExceptionMessage((HornetQException)e);
+ throw new HornetQXAException(XAException.XAER_PROTO, "Cannot join tx, it is suspended " + xid);
}
else
{
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
+ tx = theTx;
}
}
-
- sendResponse(packet, response, false, false);
}
- public void handleXAResume(final SessionXAResumeMessage packet)
+ public void handleXAResume(final Xid xid) throws Exception
{
- Packet response = null;
+ if (tx != null)
+ {
+ final String msg = "Cannot resume, session is currently doing work in a transaction " + tx.getXid();
- Xid xid = packet.getXid();
-
- try
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
{
- if (tx != null)
+ Transaction theTx = resourceManager.getTransaction(xid);
+
+ if (theTx == null)
{
- final String msg = "Cannot resume, session is currently doing work in a transaction " + tx.getXid();
+ final String msg = "Cannot find xid in resource manager: " + xid;
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ throw new HornetQXAException(XAException.XAER_NOTA, msg);
}
else
{
- Transaction theTx = resourceManager.getTransaction(xid);
-
- if (theTx == null)
+ if (theTx.getState() != Transaction.State.SUSPENDED)
{
- final String msg = "Cannot find xid in resource manager: " + xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ throw new HornetQXAException(XAException.XAER_PROTO,
+ "Cannot resume transaction, it is not suspended " + xid);
}
else
{
- if (theTx.getState() != Transaction.State.SUSPENDED)
- {
- response = new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot resume transaction, it is not suspended " + xid);
- }
- else
- {
- tx = theTx;
+ tx = theTx;
- tx.resume();
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- }
+ tx.resume();
}
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa resume", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleXARollback(final SessionXARollbackMessage packet)
+ public void handleXARollback(final Xid xid) throws Exception
{
- Packet response = null;
+ if (tx != null)
+ {
+ final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();
- Xid xid = packet.getXid();
-
- try
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
{
- if (tx != null)
- {
- final String msg = "Cannot roll back, session is currently doing work in a transaction " + tx.getXid();
+ Transaction theTx = resourceManager.removeTransaction(xid);
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
- else
+ if (theTx == null)
{
- Transaction theTx = resourceManager.removeTransaction(xid);
-
- if (theTx == null)
+ // checked heuristic committed transactions
+ if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
{
- // checked heuristic committed transactions
- if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
- {
- response = new SessionXAResponseMessage(true,
- XAException.XA_HEURCOM,
- "transaction has ben heuristically committed: " + xid);
- }
- // checked heuristic rolled back transactions
- else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
- {
- response = new SessionXAResponseMessage(true,
- XAException.XA_HEURRB,
- "transaction has ben heuristically rolled back: " + xid);
- }
- else
- {
- response = new SessionXAResponseMessage(true,
- XAException.XAER_NOTA,
- "Cannot find xid in resource manager: " + xid);
- }
+ throw new HornetQXAException(XAException.XA_HEURCOM,
+ "transaction has ben heuristically committed: " + xid);
}
+ // checked heuristic rolled back transactions
+ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+ {
+ throw new HornetQXAException(XAException.XA_HEURRB,
+ "transaction has ben heuristically rolled back: " + xid);
+ }
else
{
- if (theTx.getState() == Transaction.State.SUSPENDED)
- {
- // Put it back
- resourceManager.putTransaction(xid, tx);
-
- response = new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot rollback transaction, it is suspended " + xid);
- }
- else
- {
- doRollback(false, theTx);
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- }
+ throw new HornetQXAException(XAException.XAER_NOTA, "Cannot find xid in resource manager: " + xid);
}
}
- }
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa rollback", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
else
{
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
- }
-
- public void handleXAStart(final SessionXAStartMessage packet)
- {
- Packet response = null;
-
- Xid xid = packet.getXid();
-
- try
- {
- if (tx != null)
- {
- final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
-
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
- else
- {
- tx = new TransactionImpl(xid, storageManager, postOffice);
-
- boolean added = resourceManager.putTransaction(xid, tx);
-
- if (!added)
+ if (theTx.getState() == Transaction.State.SUSPENDED)
{
- final String msg = "Cannot start, there is already a xid " + tx.getXid();
+ // Put it back
+ resourceManager.putTransaction(xid, tx);
- response = new SessionXAResponseMessage(true, XAException.XAER_DUPID, msg);
+ throw new HornetQXAException(XAException.XAER_PROTO,
+ "Cannot rollback transaction, it is suspended " + xid);
}
else
{
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ doRollback(false, theTx);
}
}
}
- catch (Exception e)
+ }
+
+ public void handleXAStart(final Xid xid) throws Exception
+ {
+ if (tx != null)
{
- ServerSessionImpl.log.error("Failed to xa start", e);
+ final String msg = "Cannot start, session is already doing work in a transaction " + tx.getXid();
- if (e instanceof HornetQException)
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
+ {
+ tx = new TransactionImpl(xid, storageManager, postOffice);
+
+ boolean added = resourceManager.putTransaction(xid, tx);
+
+ if (!added)
{
- response = new HornetQExceptionMessage((HornetQException)e);
+ final String msg = "Cannot start, there is already a xid " + tx.getXid();
+
+ throw new HornetQXAException(XAException.XAER_DUPID, msg);
}
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
}
-
- sendResponse(packet, response, false, false);
}
- public void handleXASuspend(final Packet packet)
+ public void handleXASuspend() throws Exception
{
- Packet response = null;
+ if (tx == null)
+ {
+ final String msg = "Cannot suspend, session is not doing work in a transaction ";
- try
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
{
- if (tx == null)
+ if (tx.getState() == Transaction.State.SUSPENDED)
{
- final String msg = "Cannot suspend, session is not doing work in a transaction ";
+ final String msg = "Cannot suspend, transaction is already suspended " + tx.getXid();
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
}
else
{
- if (tx.getState() == Transaction.State.SUSPENDED)
- {
- final String msg = "Cannot suspend, transaction is already suspended " + tx.getXid();
+ tx.suspend();
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
- }
- else
- {
- tx.suspend();
-
- tx = null;
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- }
+ tx = null;
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa suspend", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleXAPrepare(final SessionXAPrepareMessage packet)
+ public void handleXAPrepare(final Xid xid) throws Exception
{
- Packet response = null;
+ if (tx != null)
+ {
+ final String msg = "Cannot commit, session is currently doing work in a transaction " + tx.getXid();
- Xid xid = packet.getXid();
-
- try
+ throw new HornetQXAException(XAException.XAER_PROTO, msg);
+ }
+ else
{
- if (tx != null)
+ Transaction theTx = resourceManager.getTransaction(xid);
+
+ if (theTx == null)
{
- final String msg = "Cannot commit, session is currently doing work in a transaction " + tx.getXid();
+ final String msg = "Cannot find xid in resource manager: " + xid;
- response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
+ throw new HornetQXAException(XAException.XAER_NOTA, msg);
}
else
{
- Transaction theTx = resourceManager.getTransaction(xid);
-
- if (theTx == null)
+ if (theTx.getState() == Transaction.State.SUSPENDED)
{
- final String msg = "Cannot find xid in resource manager: " + xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ throw new HornetQXAException(XAException.XAER_PROTO,
+ "Cannot prepare transaction, it is suspended " + xid);
}
else
{
- if (theTx.getState() == Transaction.State.SUSPENDED)
- {
- response = new SessionXAResponseMessage(true,
- XAException.XAER_PROTO,
- "Cannot prepare transaction, it is suspended " + xid);
- }
- else
- {
- theTx.prepare();
-
- response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
- }
+ theTx.prepare();
}
}
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to xa prepare", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleGetInDoubtXids(final Packet packet)
+ public List<Xid> handleGetInDoubtXids()
{
- List<Xid> indoubtsXids = new ArrayList<Xid>();
- indoubtsXids.addAll(resourceManager.getPreparedTransactions());
- indoubtsXids.addAll(resourceManager.getHeuristicCommittedTransactions());
- indoubtsXids.addAll(resourceManager.getHeuristicRolledbackTransactions());
- Packet response = new SessionXAGetInDoubtXidsResponseMessage(indoubtsXids);
+ List<Xid> xids = new ArrayList<Xid>();
- sendResponse(packet, response, false, false);
+ xids.addAll(resourceManager.getPreparedTransactions());
+ xids.addAll(resourceManager.getHeuristicCommittedTransactions());
+ xids.addAll(resourceManager.getHeuristicRolledbackTransactions());
+
+ return xids;
}
- public void handleGetXATimeout(final Packet packet)
+ public int handleGetXATimeout()
{
- Packet response = new SessionXAGetTimeoutResponseMessage(resourceManager.getTimeoutSeconds());
-
- sendResponse(packet, response, false, false);
+ return resourceManager.getTimeoutSeconds();
}
- public void handleSetXATimeout(final SessionXASetTimeoutMessage packet)
+ public void handleSetXATimeout(final int timeout)
{
- Packet response = new SessionXASetTimeoutResponseMessage(resourceManager.setTimeoutSeconds(packet.getTimeoutSeconds()));
-
- sendResponse(packet, response, false, false);
+ resourceManager.setTimeoutSeconds(timeout);
}
- public void handleStart(final Packet packet)
+ public void handleStart()
{
setStarted(true);
-
- sendResponse(packet, null, false, false);
}
- public void handleStop(final Packet packet)
+ public void handleStop()
{
- final Packet response = new NullResponseMessage();
-
setStarted(false);
-
- sendResponse(packet, response, false, false);
}
- public void handleClose(final Packet packet)
+ public void handleClose()
{
storageManager.afterCompleteOperations(new IOAsyncTask()
{
@@ -1317,97 +865,63 @@
public void done()
{
- doClose(packet);
+ try
+ {
+ close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
}
});
}
- public void handleCloseConsumer(final SessionConsumerCloseMessage packet)
+ public void handleCloseConsumer(final long consumerID) throws Exception
{
- final ServerConsumer consumer = consumers.get(packet.getConsumerID());
+ final ServerConsumer consumer = consumers.get(consumerID);
- Packet response;
-
- try
+ if (consumer != null)
{
- if (consumer != null)
- {
- consumer.close();
- }
- else
- {
- ServerSessionImpl.log.error("Cannot find consumer with id " + packet.getConsumerID());
- }
-
- response = new NullResponseMessage();
+ consumer.close();
}
- catch (Exception e)
+ else
{
- ServerSessionImpl.log.error("Failed to close consumer", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
+ ServerSessionImpl.log.error("Cannot find consumer with id " + consumerID);
}
-
- sendResponse(packet, response, false, false);
}
- public void handleReceiveConsumerCredits(final SessionConsumerFlowCreditMessage packet)
+ public void handleReceiveConsumerCredits(final long consumerID, final int credits) throws Exception
{
- ServerConsumer consumer = consumers.get(packet.getConsumerID());
+ ServerConsumer consumer = consumers.get(consumerID);
if (consumer == null)
{
- ServerSessionImpl.log.error("There is no consumer with id " + packet.getConsumerID());
-
+ ServerSessionImpl.log.error("There is no consumer with id " + consumerID);
+
return;
}
- try
- {
- consumer.receiveCredits(packet.getCredits());
- }
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to receive credits " + server.getConfiguration().isBackup(), e);
- }
-
- sendResponse(packet, null, false, false);
+ consumer.receiveCredits(credits);
}
- public void handleSendLargeMessage(final SessionSendLargeMessage packet)
+ public void handleSendLargeMessage(final byte[] largeMessageHeader) throws Exception
{
-
// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();
- LargeServerMessage msg = doCreateLargeMessage(id, packet);
+ LargeServerMessage msg = storageManager.createLargeMessage(id, largeMessageHeader);
- if (msg != null)
+ if (currentLargeMessage != null)
{
- if (currentLargeMessage != null)
- {
- ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());
- }
-
- currentLargeMessage = msg;
-
- sendResponse(packet, null, false, false);
+ ServerSessionImpl.log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());
}
+
+ currentLargeMessage = msg;
}
- public void handleSend(final SessionSendMessage packet)
+ public void handleSend(final ServerMessage message) throws Exception
{
- Packet response = null;
-
- ServerMessage message = (ServerMessage)packet.getMessage();
-
try
{
long id = storageManager.generateUniqueID();
@@ -1425,28 +939,7 @@
{
send(message);
}
-
- if (packet.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
}
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to send message", e);
-
- if (packet.isRequiresResponse())
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
- }
finally
{
try
@@ -1458,74 +951,40 @@
ServerSessionImpl.log.error("Failed to release outstanding credits", e);
}
}
-
- sendResponse(packet, response, false, false);
}
- public void handleSendContinuations(final SessionSendContinuationMessage packet)
+ public void handleSendContinuations(final int packetSize, final byte[] body, final boolean continues) throws Exception
{
- Packet response = null;
-
- try
+ if (currentLargeMessage == null)
{
- if (currentLargeMessage == null)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "large-message not initialized on server");
- }
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "large-message not initialized on server");
+ }
- // Immediately release the credits for the continuations- these don't contribute to the in-memory size
- // of the message
+ // Immediately release the credits for the continuations- these don't contribute to the in-memory size
+ // of the message
- releaseOutStanding(currentLargeMessage, packet.getPacketSize());
+ releaseOutStanding(currentLargeMessage, packetSize);
- currentLargeMessage.addBytes(packet.getBody());
+ currentLargeMessage.addBytes(body);
- if (!packet.isContinues())
- {
- currentLargeMessage.releaseResources();
+ if (!continues)
+ {
+ currentLargeMessage.releaseResources();
- send(currentLargeMessage);
+ send(currentLargeMessage);
- releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
+ releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
- currentLargeMessage = null;
- }
-
- if (packet.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
+ currentLargeMessage = null;
}
- catch (Exception e)
- {
- if (packet.isRequiresResponse())
- {
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- ServerSessionImpl.log.error("Failed to send message", e);
-
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
- }
-
- sendResponse(packet, response, false, false);
}
- public void handleRequestProducerCredits(final SessionRequestProducerCreditsMessage packet) throws Exception
+ public void handleRequestProducerCredits(final SimpleString address, final int credits) throws Exception
{
- final SimpleString address = packet.getAddress();
-
final CreditManagerHolder holder = getCreditManagerHolder(address);
- int credits = packet.getCredits();
-
- //Requesting -ve credits means returning them
-
+ // Requesting -ve credits means returning them
+
if (credits < 0)
{
releaseOutStanding(address, -credits);
@@ -1541,7 +1000,7 @@
if (!closed)
{
sendProducerCredits(holder, credits, address);
-
+
return true;
}
else
@@ -1551,23 +1010,33 @@
}
}
});
-
+
if (gotCredits > 0)
{
sendProducerCredits(holder, gotCredits, address);
}
}
+ }
+
+ public void setTransferring(final boolean transferring)
+ {
+ Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
- sendResponse(packet, null, false, false);
+ for (ServerConsumer consumer : consumersClone)
+ {
+ consumer.setTransferring(transferring);
+ }
}
public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
- {
- //We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get delivered
- //after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
- //sequence of packets.
- //It is not sufficient to just stop the session, since right after stopping the session, another session start might be executed
- //before we have transferred the connection, leaving it in a started state
+ {
+ // We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
+ // delivered
+ // after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
+ // sequence of packets.
+ // It is not sufficient to just stop the session, since right after stopping the session, another session start
+ // might be executed
+ // before we have transferred the connection, leaving it in a started state
setTransferring(true);
remotingConnection.removeFailureListener(this);
@@ -1579,9 +1048,9 @@
// the replicating connection will cause the outstanding responses to be be replayed on the live server,
// if these reach the client who then subsequently fails over, on reconnection to backup, it will have
// received responses that the backup did not know about.
-
+
channel.transferConnection(newConnection);
-
+
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
remotingConnection = newConnection;
@@ -1591,10 +1060,10 @@
int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
- channel.replayCommands(lastReceivedCommandID, id);
-
+ channel.replayCommands(lastReceivedCommandID);
+
channel.setTransferring(false);
-
+
setTransferring(false);
return serverLastReceivedCommandID;
@@ -1604,6 +1073,21 @@
{
return channel;
}
+
+ public void runConnectionFailureRunners()
+ {
+ for (Runnable runner : failureRunners.values())
+ {
+ try
+ {
+ runner.run();
+ }
+ catch (Throwable t)
+ {
+ ServerSessionImpl.log.error("Failed to execute failure runner", t);
+ }
+ }
+ }
// FailureListener implementation
// --------------------------------------------------------------------
@@ -1626,7 +1110,7 @@
}
}
- handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
+ handleClose();
ServerSessionImpl.log.warn("Cleared up resources for session " + name);
}
@@ -1654,7 +1138,7 @@
}
catch (Throwable t)
{
- ServerSessionImpl.log.error("Failed fire listeners " + this);
+ ServerSessionImpl.log.error("Failed to fire listeners " + this);
}
}
@@ -1665,129 +1149,6 @@
// Private
// ----------------------------------------------------------------------------
- private SessionQueueQueryResponseMessage doExecuteQueueQuery(final SimpleString name) throws Exception
- {
- if (name == null)
- {
- throw new IllegalArgumentException("Queue name is null");
- }
-
- SessionQueueQueryResponseMessage response;
-
- Binding binding = postOffice.getBinding(name);
-
- if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
- {
- Queue queue = (Queue)binding.getBindable();
-
- Filter filter = queue.getFilter();
-
- SimpleString filterString = filter == null ? null : filter.getFilterString();
-
- response = new SessionQueueQueryResponseMessage(name,
- binding.getAddress(),
- queue.isDurable(),
- queue.isTemporary(),
- filterString,
- queue.getConsumerCount(),
- queue.getMessageCount());
- }
- // make an exception for the management address (see HORNETQ-29)
- else if (name.equals(managementAddress))
- {
- response = new SessionQueueQueryResponseMessage(name, managementAddress, true, false, null, -1, -1);
- }
- else
- {
- response = new SessionQueueQueryResponseMessage();
- }
-
- return response;
- }
-
- private void sendResponse(final Packet confirmPacket,
- final Packet response,
- final boolean flush,
- final boolean closeChannel)
- {
- storageManager.afterCompleteOperations(new IOAsyncTask()
- {
- public void onError(final int errorCode, final String errorMessage)
- {
- ServerSessionImpl.log.warn("Error processing IOCallback code = " + errorCode + " message = " + errorMessage);
-
- HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new HornetQException(errorCode,
- errorMessage));
-
- doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
- }
-
- public void done()
- {
- doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
- }
- });
- }
-
- /**
- * @param confirmPacket
- * @param response
- * @param flush
- * @param closeChannel
- */
- private void doConfirmAndResponse(final Packet confirmPacket,
- final Packet response,
- final boolean flush,
- final boolean closeChannel)
- {
- if (confirmPacket != null)
- {
- channel.confirm(confirmPacket);
-
- if (flush)
- {
- channel.flushConfirmations();
- }
- }
-
- if (response != null)
- {
- channel.send(response);
- }
-
- if (closeChannel)
- {
- channel.close();
- }
- }
-
- private void doClose(final Packet packet)
- {
- Packet response = null;
-
- try
- {
- close();
-
- response = new NullResponseMessage();
- }
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to close", e);
-
- if (e instanceof HornetQException)
- {
- response = new HornetQExceptionMessage((HornetQException)e);
- }
- else
- {
- response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
- }
- }
-
- sendResponse(packet, response, true, true);
- }
-
private void setStarted(final boolean s)
{
Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
@@ -1800,44 +1161,6 @@
started = s;
}
- private void setTransferring(final boolean transferring)
- {
- Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
-
- for (ServerConsumer consumer : consumersClone)
- {
- consumer.setTransferring(transferring);
- }
- }
-
- /**
- * We need to create the LargeMessage before replicating the packet, or else we won't know how to extract the destination,
- * which is stored on the header
- * @param packet
- * @throws Exception
- */
- private LargeServerMessage doCreateLargeMessage(final long id, final SessionSendLargeMessage packet)
- {
- try
- {
- LargeServerMessage msg = createLargeMessageStorage(id, packet.getLargeMessageHeader());
-
- return msg;
- }
- catch (Exception e)
- {
- ServerSessionImpl.log.error("Failed to create large message", e);
- Packet response = null;
-
- channel.confirm(packet);
- if (response != null)
- {
- channel.send(response);
- }
- return null;
- }
- }
-
private void handleManagementMessage(final ServerMessage message) throws Exception
{
try
@@ -1865,11 +1188,6 @@
}
}
- private LargeServerMessage createLargeMessageStorage(final long id, final byte[] header) throws Exception
- {
- return storageManager.createLargeMessage(id, header);
- }
-
private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx) throws Exception
{
boolean wasStarted = started;
@@ -1931,7 +1249,7 @@
{
releaseOutStanding(message.getAddress(), credits);
}
-
+
private void releaseOutStanding(final SimpleString address, final int credits) throws Exception
{
CreditManagerHolder holder = getCreditManagerHolder(address);
@@ -1961,9 +1279,7 @@
{
holder.outstandingCredits += credits;
- Packet packet = new SessionProducerCreditsMessage(credits, address, -1);
-
- channel.send(packet);
+ callback.sendProducerCreditsMessage(credits, address, -1);
}
private void send(final ServerMessage msg) throws Exception
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -44,23 +44,44 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+import java.util.List;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.exception.HornetQXAException;
+import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
+import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.remoting.impl.wireformat.SessionProducerCreditsMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
@@ -68,13 +89,21 @@
import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
+import org.hornetq.core.server.BindingQueryResult;
+import org.hornetq.core.server.QueueQueryResult;
+import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
/**
* A ServerSessionPacketHandler
@@ -84,7 +113,7 @@
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
* @author <a href="mailto:clebert.suconic@jboss.org>Clebert Suconic</a>
*/
-public class ServerSessionPacketHandler implements ChannelHandler
+public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener, SessionCallback
{
private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
@@ -95,220 +124,486 @@
// Storagemanager here is used to set the Context
private final StorageManager storageManager;
+ private final Channel channel;
+
+ private volatile RemotingConnection remotingConnection;
+
public ServerSessionPacketHandler(final ServerSession session,
final OperationContext sessionContext,
- final StorageManager storageManager)
+ final StorageManager storageManager,
+ final Channel channel)
{
this.session = session;
this.storageManager = storageManager;
this.sessionContext = sessionContext;
+
+ this.channel = channel;
+
+ this.remotingConnection = channel.getConnection();
+
+ addConnectionListeners();
}
public long getID()
{
- return session.getID();
+ return channel.getID();
}
+ public void connectionFailed(final HornetQException exception)
+ {
+ log.warn("Client connection failed, clearing up resources for session " + session.getName());
+
+ session.runConnectionFailureRunners();
+
+ handleCloseSession();
+
+ log.warn("Cleared up resources for session " + session.getName());
+ }
+
+ public void close()
+ {
+ channel.flushConfirmations();
+
+ handleCloseSession();
+ }
+
+ public void connectionClosed()
+ {
+ session.runConnectionFailureRunners();
+ }
+
+ private void addConnectionListeners()
+ {
+ remotingConnection.addFailureListener(this);
+ remotingConnection.addCloseListener(this);
+ }
+
+ private void removeConnectionListeners()
+ {
+ remotingConnection.removeFailureListener(this);
+ remotingConnection.removeCloseListener(this);
+ }
+
+ public Channel getChannel()
+ {
+ return channel;
+ }
+
+ public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
+ {
+ // We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
+ // delivered
+ // after the channel has transferred but *before* packets have been replayed - this will give the client the wrong
+ // sequence of packets.
+ // It is not sufficient to just stop the session, since right after stopping the session, another session start
+ // might be executed
+ // before we have transferred the connection, leaving it in a started state
+ session.setTransferring(true);
+
+ removeConnectionListeners();
+
+ // Note. We do not destroy the replicating connection here. In the case the live server has really crashed
+ // then the connection will get cleaned up anyway when the server ping timeout kicks in.
+ // In the case the live server is really still up, i.e. a split brain situation (or in tests), then closing
+ // the replicating connection will cause the outstanding responses to be replayed on the live server,
+ // if these reach the client who then subsequently fails over, on reconnection to backup, it will have
+ // received responses that the backup did not know about.
+
+ channel.transferConnection(newConnection);
+
+ newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
+
+ remotingConnection = newConnection;
+
+ addConnectionListeners();
+
+ int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
+
+ channel.replayCommands(lastReceivedCommandID);
+
+ channel.setTransferring(false);
+
+ session.setTransferring(false);
+
+ return serverLastReceivedCommandID;
+ }
+
public void handlePacket(final Packet packet)
{
byte type = packet.getType();
storageManager.setContext(sessionContext);
+ Packet response = null;
+ boolean flush = false;
+ boolean closeChannel = false;
+
try
{
- switch (type)
+ try
{
- case SESS_CREATECONSUMER:
+ switch (type)
{
- SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
- session.handleCreateConsumer(request);
- break;
+ case SESS_CREATECONSUMER:
+ {
+ SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
+ session.handleCreateConsumer(request.getID(),
+ request.getQueueName(),
+ request.getFilterString(),
+ request.isBrowseOnly());
+ if (request.isRequiresResponse())
+ {
+ // We send back queue information on the queue as a response- this allows the queue to
+ // be automatically recreated on failover
+ response = new SessionQueueQueryResponseMessage(session.handleExecuteQueueQuery(request.getQueueName()));
+ }
+
+ break;
+ }
+ case CREATE_QUEUE:
+ {
+ CreateQueueMessage request = (CreateQueueMessage)packet;
+ session.handleCreateQueue(request.getAddress(),
+ request.getQueueName(),
+ request.getFilterString(),
+ request.isTemporary(),
+ request.isDurable());
+ if (request.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
+ case DELETE_QUEUE:
+ {
+ SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
+ session.handleDeleteQueue(request.getQueueName());
+ response = new NullResponseMessage();
+ break;
+ }
+ case SESS_QUEUEQUERY:
+ {
+ SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
+ QueueQueryResult result = session.handleExecuteQueueQuery(request.getQueueName());
+ response = new SessionQueueQueryResponseMessage(result);
+ break;
+ }
+ case SESS_BINDINGQUERY:
+ {
+ SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
+ BindingQueryResult result = session.handleExecuteBindingQuery(request.getAddress());
+ response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
+ break;
+ }
+ case SESS_ACKNOWLEDGE:
+ {
+ SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
+ session.handleAcknowledge(message.getConsumerID(), message.getMessageID());
+ if (message.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
+ case SESS_EXPIRED:
+ {
+ SessionExpiredMessage message = (SessionExpiredMessage)packet;
+ session.handleExpired(message.getConsumerID(), message.getMessageID());
+ break;
+ }
+ case SESS_COMMIT:
+ {
+ session.handleCommit();
+ response = new NullResponseMessage();
+ break;
+ }
+ case SESS_ROLLBACK:
+ {
+ session.handleRollback(((RollbackMessage)packet).isConsiderLastMessageAsDelivered());
+ response = new NullResponseMessage();
+ break;
+ }
+ case SESS_XA_COMMIT:
+ {
+ SessionXACommitMessage message = (SessionXACommitMessage)packet;
+ session.handleXACommit(message.getXid(), message.isOnePhase());
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ break;
+ }
+ case SESS_XA_END:
+ {
+ SessionXAEndMessage message = (SessionXAEndMessage)packet;
+ session.handleXAEnd(message.getXid());
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ break;
+ }
+ case SESS_XA_FORGET:
+ {
+ SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
+ session.handleXAForget(message.getXid());
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ break;
+ }
+ case SESS_XA_JOIN:
+ {
+ SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
+ session.handleXAJoin(message.getXid());
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ break;
+ }
+ case SESS_XA_RESUME:
+ {
+ SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
+ session.handleXAResume(message.getXid());
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ break;
+ }
+ case SESS_XA_ROLLBACK:
+ {
+ SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
+ session.handleXARollback(message.getXid());
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ break;
+ }
+ case SESS_XA_START:
+ {
+ SessionXAStartMessage message = (SessionXAStartMessage)packet;
+ session.handleXAStart(message.getXid());
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ break;
+ }
+ case SESS_XA_SUSPEND:
+ {
+ session.handleXASuspend();
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ break;
+ }
+ case SESS_XA_PREPARE:
+ {
+ SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
+ session.handleXAPrepare(message.getXid());
+ response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
+ break;
+ }
+ case SESS_XA_INDOUBT_XIDS:
+ {
+ List<Xid> xids = session.handleGetInDoubtXids();
+ response = new SessionXAGetInDoubtXidsResponseMessage(xids);
+ break;
+ }
+ case SESS_XA_GET_TIMEOUT:
+ {
+ int timeout = session.handleGetXATimeout();
+ response = new SessionXAGetTimeoutResponseMessage(timeout);
+ break;
+ }
+ case SESS_XA_SET_TIMEOUT:
+ {
+ SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
+ session.handleSetXATimeout(message.getTimeoutSeconds());
+ response = new SessionXASetTimeoutResponseMessage(true);
+ break;
+ }
+ case SESS_START:
+ {
+ session.handleStart();
+ break;
+ }
+ case SESS_STOP:
+ {
+ session.handleStop();
+ response = new NullResponseMessage();
+ break;
+ }
+ case SESS_CLOSE:
+ {
+ handleCloseSession();
+ removeConnectionListeners();
+ response = new NullResponseMessage();
+ flush = true;
+ closeChannel = true;
+ break;
+ }
+ case SESS_CONSUMER_CLOSE:
+ {
+ SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
+ session.handleCloseConsumer(message.getConsumerID());
+ response = new NullResponseMessage();
+ break;
+ }
+ case SESS_FLOWTOKEN:
+ {
+ SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)packet;
+ session.handleReceiveConsumerCredits(message.getConsumerID(), message.getCredits());
+ break;
+ }
+ case SESS_SEND:
+ {
+ SessionSendMessage message = (SessionSendMessage)packet;
+ session.handleSend((ServerMessage)message.getMessage());
+ if (message.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
+ case SESS_SEND_LARGE:
+ {
+ SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
+ session.handleSendLargeMessage(message.getLargeMessageHeader());
+ break;
+ }
+ case SESS_SEND_CONTINUATION:
+ {
+ SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
+ session.handleSendContinuations(message.getPacketSize(), message.getBody(), message.isContinues());
+ if (message.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
+ break;
+ }
+ case SESS_FORCE_CONSUMER_DELIVERY:
+ {
+ SessionForceConsumerDelivery message = (SessionForceConsumerDelivery)packet;
+ session.handleForceConsumerDelivery(message.getConsumerID(), message.getSequence());
+ break;
+ }
+ case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS:
+ {
+ SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage)packet;
+ session.handleRequestProducerCredits(message.getAddress(), message.getCredits());
+ break;
+ }
}
- case CREATE_QUEUE:
- {
- CreateQueueMessage request = (CreateQueueMessage)packet;
- session.handleCreateQueue(request);
- break;
- }
- case DELETE_QUEUE:
- {
- SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
- session.handleDeleteQueue(request);
- break;
- }
- case SESS_QUEUEQUERY:
- {
- SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
- session.handleExecuteQueueQuery(request);
- break;
- }
- case SESS_BINDINGQUERY:
- {
- SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
- session.handleExecuteBindingQuery(request);
- break;
- }
- case SESS_ACKNOWLEDGE:
- {
- SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
- session.handleAcknowledge(message);
- break;
- }
- case SESS_EXPIRED:
- {
- SessionExpiredMessage message = (SessionExpiredMessage)packet;
- session.handleExpired(message);
- break;
- }
- case SESS_COMMIT:
- {
- session.handleCommit(packet);
- break;
- }
- case SESS_ROLLBACK:
- {
- session.handleRollback((RollbackMessage)packet);
- break;
- }
- case SESS_XA_COMMIT:
- {
- SessionXACommitMessage message = (SessionXACommitMessage)packet;
- session.handleXACommit(message);
- break;
- }
- case SESS_XA_END:
- {
- SessionXAEndMessage message = (SessionXAEndMessage)packet;
- session.handleXAEnd(message);
- break;
- }
- case SESS_XA_FORGET:
- {
- SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
- session.handleXAForget(message);
- break;
- }
- case SESS_XA_JOIN:
- {
- SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
- session.handleXAJoin(message);
- break;
- }
- case SESS_XA_RESUME:
- {
- SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
- session.handleXAResume(message);
- break;
- }
- case SESS_XA_ROLLBACK:
- {
- SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
- session.handleXARollback(message);
- break;
- }
- case SESS_XA_START:
- {
- SessionXAStartMessage message = (SessionXAStartMessage)packet;
- session.handleXAStart(message);
- break;
- }
- case SESS_XA_SUSPEND:
- {
- session.handleXASuspend(packet);
- break;
- }
- case SESS_XA_PREPARE:
- {
- SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
- session.handleXAPrepare(message);
- break;
- }
- case SESS_XA_INDOUBT_XIDS:
- {
- session.handleGetInDoubtXids(packet);
- break;
- }
- case SESS_XA_GET_TIMEOUT:
- {
- session.handleGetXATimeout(packet);
- break;
- }
- case SESS_XA_SET_TIMEOUT:
- {
- SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
- session.handleSetXATimeout(message);
- break;
- }
- case SESS_START:
- {
- session.handleStart(packet);
- break;
- }
- case SESS_STOP:
- {
- session.handleStop(packet);
- break;
- }
- case SESS_CLOSE:
- {
- session.handleClose(packet);
- break;
- }
- case SESS_CONSUMER_CLOSE:
- {
- SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
- session.handleCloseConsumer(message);
- break;
- }
- case SESS_FLOWTOKEN:
- {
- SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)packet;
- session.handleReceiveConsumerCredits(message);
- break;
- }
- case SESS_SEND:
- {
- SessionSendMessage message = (SessionSendMessage)packet;
- session.handleSend(message);
- break;
- }
- case SESS_SEND_LARGE:
- {
- SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
- session.handleSendLargeMessage(message);
- break;
- }
- case SESS_SEND_CONTINUATION:
- {
- SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
- session.handleSendContinuations(message);
- break;
- }
- case SESS_FORCE_CONSUMER_DELIVERY:
- {
- SessionForceConsumerDelivery message = (SessionForceConsumerDelivery)packet;
- session.handleForceConsumerDelivery(message);
- break;
- }
- case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS:
- {
- SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage)packet;
- session.handleRequestProducerCredits(message);
- break;
- }
}
+ catch (HornetQXAException e)
+ {
+ response = new SessionXAResponseMessage(true, e.errorCode, e.getMessage());
+ }
+ catch (HornetQException e)
+ {
+ response = new HornetQExceptionMessage((HornetQException)e);
+ }
+ catch (Throwable t)
+ {
+ log.error("Caught unexpected exception", t);
+ }
+
+ sendResponse(packet, response, flush, closeChannel);
}
- catch (Throwable t)
- {
- ServerSessionPacketHandler.log.error("Caught unexpected exception", t);
- }
finally
{
storageManager.completeOperations();
storageManager.clearContext();
}
}
+
+ private void sendResponse(final Packet confirmPacket,
+ final Packet response,
+ final boolean flush,
+ final boolean closeChannel)
+ {
+ storageManager.afterCompleteOperations(new IOAsyncTask()
+ {
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ log.warn("Error processing IOCallback code = " + errorCode + " message = " + errorMessage);
+
+ HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new HornetQException(errorCode,
+ errorMessage));
+
+ doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
+ }
+
+ public void done()
+ {
+ doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
+ }
+ });
+ }
+
+ private void doConfirmAndResponse(final Packet confirmPacket,
+ final Packet response,
+ final boolean flush,
+ final boolean closeChannel)
+ {
+ if (confirmPacket != null)
+ {
+ channel.confirm(confirmPacket);
+
+ if (flush)
+ {
+ channel.flushConfirmations();
+ }
+ }
+
+ if (response != null)
+ {
+ channel.send(response);
+ }
+
+ if (closeChannel)
+ {
+ channel.close();
+ }
+ }
+
+ private void handleCloseSession()
+ {
+ storageManager.afterCompleteOperations(new IOAsyncTask()
+ {
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ public void done()
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
+ }
+ });
+ }
+
+ public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+ {
+ Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);
+
+ channel.send(packet);
+
+ return packet.getPacketSize();
+ }
+
+ public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+ {
+ Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse);
+
+ channel.send(packet);
+
+ return packet.getPacketSize();
+ }
+
+ public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
+ {
+ Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
+
+ channel.send(packet);
+
+ return packet.getPacketSize();
+ }
+
+ public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+ {
+ Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
+
+ channel.send(packet);
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -152,8 +152,7 @@
ClientSessionFactory sf = createInVMFactory();
if (sendingBlocking)
- {
- sf.setBlockOnNonDurableSend(true);
+ { sf.setBlockOnNonDurableSend(true);
sf.setBlockOnDurableSend(true);
sf.setBlockOnAcknowledge(true);
}
@@ -523,7 +522,7 @@
}
session.close();
-
+
long globalSize = server.getPostOffice().getPagingManager().getTotalMemory();
Assert.assertEquals(0l, globalSize);
Assert.assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2010-01-20 13:24:55 UTC (rev 8810)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2010-01-20 14:38:53 UTC (rev 8811)
@@ -160,6 +160,8 @@
clientSession = sessionFactory.createSession(true, false, false);
+ log.info("committing");
+
clientSession.commit(xid, false);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
15 years, 11 months