Author: timfox
Date: 2010-01-20 10:08:28 -0500 (Wed, 20 Jan 2010)
New Revision: 8813
Modified:
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
Log:
Improved names in server session
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-20 14:53:17 UTC (rev
8812)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-20 15:08:28 UTC (rev
8813)
@@ -43,73 +43,71 @@
void removeConsumer(ServerConsumer consumer) throws Exception;
- void close() throws Exception;
+ void acknowledge(long consumerID, long messageID) throws Exception;
- void handleAcknowledge(long consumerID, long messageID) throws Exception;
+ void expire(long consumerID, long messageID) throws Exception;
- void handleExpired(long consumerID, long messageID) throws Exception;
+ void rollback(boolean considerLastMessageAsDelivered) throws Exception;
- void handleRollback(boolean considerLastMessageAsDelivered) throws Exception;
+ void commit() throws Exception;
- void handleCommit() throws Exception;
+ void xaCommit(Xid xid, boolean onePhase) throws Exception;
- void handleXACommit(Xid xid, boolean onePhase) throws Exception;
+ void xaEnd(Xid xid) throws Exception;
- void handleXAEnd(Xid xid) throws Exception;
+ void xaForget(Xid xid) throws Exception;
- void handleXAForget(Xid xid) throws Exception;
+ void xaJoin(Xid xid) throws Exception;
- void handleXAJoin(Xid xid) throws Exception;
+ void xaPrepare(Xid xid) throws Exception;
- void handleXAPrepare(Xid xid) throws Exception;
+ void xaResume(Xid xid) throws Exception;
- void handleXAResume(Xid xid) throws Exception;
+ void xaRollback(Xid xid) throws Exception;
- void handleXARollback(Xid xid) throws Exception;
+ void xaStart(Xid xid) throws Exception;
- void handleXAStart(Xid xid) throws Exception;
+ void xaSuspend() throws Exception;
- void handleXASuspend() throws Exception;
+ List<Xid> xaGetInDoubtXids();
- List<Xid> handleGetInDoubtXids();
+ int xaGetTimeout();
- int handleGetXATimeout();
+ void xaSetTimeout(int timeout);
- void handleSetXATimeout(int timeout);
+ void start();
- void handleStart();
+ void stop();
- void handleStop();
-
- void handleCreateQueue(SimpleString address,
+ void createQueue(SimpleString address,
SimpleString name,
SimpleString filterString,
boolean temporary,
boolean durable) throws Exception;
- void handleDeleteQueue(SimpleString name) throws Exception;
+ void deleteQueue(SimpleString name) throws Exception;
- void handleCreateConsumer(long consumerID, SimpleString name, SimpleString
filterString, boolean browseOnly) throws Exception;
+ void createConsumer(long consumerID, SimpleString name, SimpleString filterString,
boolean browseOnly) throws Exception;
- QueueQueryResult handleExecuteQueueQuery(SimpleString name) throws Exception;
+ QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
- BindingQueryResult handleExecuteBindingQuery(SimpleString address);
+ BindingQueryResult executeBindingQuery(SimpleString address);
- void handleCloseConsumer(long consumerID) throws Exception;
+ void closeConsumer(long consumerID) throws Exception;
- void handleReceiveConsumerCredits(long consumerID, int credits) throws Exception;
+ void receiveConsumerCredits(long consumerID, int credits) throws Exception;
- void handleSendContinuations(int packetSize, byte[] body, boolean continues) throws
Exception;
+ void sendContinuations(int packetSize, byte[] body, boolean continues) throws
Exception;
- void handleSend(ServerMessage message) throws Exception;
+ void send(ServerMessage message) throws Exception;
- void handleSendLargeMessage(byte[] largeMessageHeader) throws Exception;
+ void sendLarge(byte[] largeMessageHeader) throws Exception;
- void handleForceConsumerDelivery(long consumerID, long sequence) throws Exception;
+ void forceConsumerDelivery(long consumerID, long sequence) throws Exception;
- void handleRequestProducerCredits(SimpleString address, int credits) throws
Exception;
+ void requestProducerCredits(SimpleString address, int credits) throws Exception;
- void handleClose() throws Exception;
+ void close() throws Exception;
int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-20 14:53:17
UTC (rev 8812)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-20 15:08:28
UTC (rev 8813)
@@ -84,7 +84,7 @@
// Attributes
----------------------------------------------------------------------------
- //private final long id;
+ // private final long id;
private final String username;
@@ -101,7 +101,7 @@
private final boolean strictUpdateDeliveryCount;
private RemotingConnection remotingConnection;
-
+
private Channel channel;
private final Map<Long, ServerConsumer> consumers = new
ConcurrentHashMap<Long, ServerConsumer>();
@@ -131,14 +131,14 @@
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
- // private ServerSessionPacketHandler handler;
+ // private ServerSessionPacketHandler handler;
private boolean closed;
private final Map<SimpleString, CreditManagerHolder> creditManagerHolders = new
HashMap<SimpleString, CreditManagerHolder>();
private final RoutingContext routingContext = new RoutingContextImpl(null);
-
+
private SessionCallback callback;
// Constructors
---------------------------------------------------------------------------------
@@ -157,7 +157,7 @@
final StorageManager storageManager,
final PostOffice postOffice,
final ResourceManager resourceManager,
- final SecurityStore securityStore,
+ final SecurityStore securityStore,
final ManagementService managementService,
final HornetQServer server,
final SimpleString managementAddress) throws Exception
@@ -175,7 +175,7 @@
this.preAcknowledge = preAcknowledge;
this.remotingConnection = remotingConnection;
-
+
this.channel = channel;
this.storageManager = storageManager;
@@ -200,7 +200,7 @@
this.server = server;
this.managementAddress = managementAddress;
-
+
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
@@ -212,7 +212,7 @@
{
this.callback = callback;
}
-
+
public String getUsername()
{
return username;
@@ -246,7 +246,7 @@
}
}
- public synchronized void close() throws Exception
+ private synchronized void doClose() throws Exception
{
if (tx != null && tx.getXid() == null)
{
@@ -290,7 +290,7 @@
}
}
- public void handleCreateConsumer(final long consumerID,
+ public void createConsumer(final long consumerID,
final SimpleString name,
final SimpleString filterString,
final boolean browseOnly) throws Exception
@@ -347,7 +347,7 @@
}
}
- public void handleCreateQueue(final SimpleString address,
+ public void createQueue(final SimpleString address,
final SimpleString name,
final SimpleString filterString,
final boolean temporary,
@@ -398,7 +398,7 @@
}
}
- public void handleDeleteQueue(final SimpleString name) throws Exception
+ public void deleteQueue(final SimpleString name) throws Exception
{
Binding binding = postOffice.getBinding(name);
@@ -417,7 +417,7 @@
}
}
- public QueueQueryResult handleExecuteQueueQuery(final SimpleString name) throws
Exception
+ public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception
{
if (name == null)
{
@@ -457,7 +457,7 @@
return response;
}
- public BindingQueryResult handleExecuteBindingQuery(final SimpleString address)
+ public BindingQueryResult executeBindingQuery(final SimpleString address)
{
if (address == null)
{
@@ -479,21 +479,21 @@
return new BindingQueryResult(!names.isEmpty(), names);
}
- public void handleForceConsumerDelivery(final long consumerID, final long sequence)
throws Exception
+ public void forceConsumerDelivery(final long consumerID, final long sequence) throws
Exception
{
ServerConsumer consumer = consumers.get(consumerID);
consumer.forceDelivery(sequence);
}
- public void handleAcknowledge(final long consumerID, final long messageID) throws
Exception
+ public void acknowledge(final long consumerID, final long messageID) throws Exception
{
ServerConsumer consumer = consumers.get(consumerID);
consumer.acknowledge(autoCommitAcks, tx, messageID);
}
- public void handleExpired(final long consumerID, final long messageID) throws
Exception
+ public void expire(final long consumerID, final long messageID) throws Exception
{
MessageReference ref = consumers.get(consumerID).getExpired(messageID);
@@ -503,7 +503,7 @@
}
}
- public void handleCommit() throws Exception
+ public void commit() throws Exception
{
try
{
@@ -515,12 +515,21 @@
}
}
- public void handleRollback(final boolean considerLastMessageAsDelivered) throws
Exception
+ public void rollback(final boolean considerLastMessageAsDelivered) throws Exception
{
- rollback(considerLastMessageAsDelivered);
+ if (tx == null)
+ {
+ // Might be null if XA
+
+ tx = new TransactionImpl(storageManager);
+ }
+
+ doRollback(considerLastMessageAsDelivered, tx);
+
+ tx = new TransactionImpl(storageManager);
}
- public void handleXACommit(final Xid xid, final boolean onePhase) throws Exception
+ public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
{
if (tx != null)
{
@@ -568,7 +577,7 @@
}
}
- public void handleXAEnd(final Xid xid) throws Exception
+ public void xaEnd(final Xid xid) throws Exception
{
if (tx != null && tx.getXid().equals(xid))
{
@@ -613,7 +622,7 @@
}
}
- public void handleXAForget(final Xid xid) throws Exception
+ public void xaForget(final Xid xid) throws Exception
{
long id = resourceManager.removeHeuristicCompletion(xid);
@@ -636,7 +645,7 @@
}
}
- public void handleXAJoin(final Xid xid) throws Exception
+ public void xaJoin(final Xid xid) throws Exception
{
Transaction theTx = resourceManager.getTransaction(xid);
@@ -659,7 +668,7 @@
}
}
- public void handleXAResume(final Xid xid) throws Exception
+ public void xaResume(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -694,7 +703,7 @@
}
}
- public void handleXARollback(final Xid xid) throws Exception
+ public void xaRollback(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -743,7 +752,7 @@
}
}
- public void handleXAStart(final Xid xid) throws Exception
+ public void xaStart(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -766,7 +775,7 @@
}
}
- public void handleXASuspend() throws Exception
+ public void xaSuspend() throws Exception
{
if (tx == null)
{
@@ -791,7 +800,7 @@
}
}
- public void handleXAPrepare(final Xid xid) throws Exception
+ public void xaPrepare(final Xid xid) throws Exception
{
if (tx != null)
{
@@ -824,7 +833,7 @@
}
}
- public List<Xid> handleGetInDoubtXids()
+ public List<Xid> xaGetInDoubtXids()
{
List<Xid> xids = new ArrayList<Xid>();
@@ -835,27 +844,27 @@
return xids;
}
- public int handleGetXATimeout()
+ public int xaGetTimeout()
{
return resourceManager.getTimeoutSeconds();
}
- public void handleSetXATimeout(final int timeout)
+ public void xaSetTimeout(final int timeout)
{
resourceManager.setTimeoutSeconds(timeout);
}
- public void handleStart()
+ public void start()
{
setStarted(true);
}
- public void handleStop()
+ public void stop()
{
setStarted(false);
}
- public void handleClose()
+ public void close()
{
storageManager.afterCompleteOperations(new IOAsyncTask()
{
@@ -867,7 +876,7 @@
{
try
{
- close();
+ doClose();
}
catch (Exception e)
{
@@ -877,7 +886,7 @@
});
}
- public void handleCloseConsumer(final long consumerID) throws Exception
+ public void closeConsumer(final long consumerID) throws Exception
{
final ServerConsumer consumer = consumers.get(consumerID);
@@ -891,7 +900,7 @@
}
}
- public void handleReceiveConsumerCredits(final long consumerID, final int credits)
throws Exception
+ public void receiveConsumerCredits(final long consumerID, final int credits) throws
Exception
{
ServerConsumer consumer = consumers.get(consumerID);
@@ -905,7 +914,7 @@
consumer.receiveCredits(credits);
}
- public void handleSendLargeMessage(final byte[] largeMessageHeader) throws Exception
+ public void sendLarge(final byte[] largeMessageHeader) throws Exception
{
// need to create the LargeMessage before continue
long id = storageManager.generateUniqueID();
@@ -920,7 +929,7 @@
currentLargeMessage = msg;
}
- public void handleSend(final ServerMessage message) throws Exception
+ public void send(final ServerMessage message) throws Exception
{
try
{
@@ -937,7 +946,7 @@
}
else
{
- send(message);
+ doSend(message);
}
}
finally
@@ -953,7 +962,7 @@
}
}
- public void handleSendContinuations(final int packetSize, final byte[] body, final
boolean continues) throws Exception
+ public void sendContinuations(final int packetSize, final byte[] body, final boolean
continues) throws Exception
{
if (currentLargeMessage == null)
{
@@ -971,7 +980,7 @@
{
currentLargeMessage.releaseResources();
- send(currentLargeMessage);
+ doSend(currentLargeMessage);
releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
@@ -979,7 +988,7 @@
}
}
- public void handleRequestProducerCredits(final SimpleString address, final int
credits) throws Exception
+ public void requestProducerCredits(final SimpleString address, final int credits)
throws Exception
{
final CreditManagerHolder holder = getCreditManagerHolder(address);
@@ -1017,7 +1026,7 @@
}
}
}
-
+
public void setTransferring(final boolean transferring)
{
Set<ServerConsumer> consumersClone = new
HashSet<ServerConsumer>(consumers.values());
@@ -1073,7 +1082,7 @@
{
return channel;
}
-
+
public void runConnectionFailureRunners()
{
for (Runnable runner : failureRunners.values())
@@ -1110,7 +1119,7 @@
}
}
- handleClose();
+ close();
ServerSessionImpl.log.warn("Cleared up resources for session " +
name);
}
@@ -1160,7 +1169,7 @@
started = s;
}
-
+
private void handleManagementMessage(final ServerMessage message) throws Exception
{
try
@@ -1184,7 +1193,7 @@
{
reply.setAddress(replyTo);
- send(reply);
+ doSend(reply);
}
}
@@ -1220,20 +1229,6 @@
}
}
- private void rollback(final boolean lastMessageAsDelived) throws Exception
- {
- if (tx == null)
- {
- // Might be null if XA
-
- tx = new TransactionImpl(storageManager);
- }
-
- doRollback(lastMessageAsDelived, tx);
-
- tx = new TransactionImpl(storageManager);
- }
-
/*
* The way flow producer flow control works is as follows:
* The client can only send messages as long as it has credits. It requests credits
from the server
@@ -1282,7 +1277,7 @@
callback.sendProducerCreditsMessage(credits, address, -1);
}
- private void send(final ServerMessage msg) throws Exception
+ private void doSend(final ServerMessage msg) throws Exception
{
// Look up the paging store
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20
14:53:17 UTC (rev 8812)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2010-01-20
15:08:28 UTC (rev 8813)
@@ -157,7 +157,14 @@
session.runConnectionFailureRunners();
- handleCloseSession();
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
log.warn("Cleared up resources for session " + session.getName());
}
@@ -166,7 +173,14 @@
{
channel.flushConfirmations();
- handleCloseSession();
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
}
public void connectionClosed()
@@ -249,15 +263,15 @@
case SESS_CREATECONSUMER:
{
SessionCreateConsumerMessage request =
(SessionCreateConsumerMessage)packet;
- session.handleCreateConsumer(request.getID(),
- request.getQueueName(),
- request.getFilterString(),
- request.isBrowseOnly());
+ session.createConsumer(request.getID(),
+ request.getQueueName(),
+ request.getFilterString(),
+ request.isBrowseOnly());
if (request.isRequiresResponse())
{
// We send back queue information on the queue as a response- this
allows the queue to
// be automaticall recreated on failover
- response = new
SessionQueueQueryResponseMessage(session.handleExecuteQueueQuery(request.getQueueName()));
+ response = new
SessionQueueQueryResponseMessage(session.executeQueueQuery(request.getQueueName()));
}
break;
@@ -265,11 +279,11 @@
case CREATE_QUEUE:
{
CreateQueueMessage request = (CreateQueueMessage)packet;
- session.handleCreateQueue(request.getAddress(),
- request.getQueueName(),
- request.getFilterString(),
- request.isTemporary(),
- request.isDurable());
+ session.createQueue(request.getAddress(),
+ request.getQueueName(),
+ request.getFilterString(),
+ request.isTemporary(),
+ request.isDurable());
if (request.isRequiresResponse())
{
response = new NullResponseMessage();
@@ -279,28 +293,28 @@
case DELETE_QUEUE:
{
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
- session.handleDeleteQueue(request.getQueueName());
+ session.deleteQueue(request.getQueueName());
response = new NullResponseMessage();
break;
}
case SESS_QUEUEQUERY:
{
SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
- QueueQueryResult result =
session.handleExecuteQueueQuery(request.getQueueName());
+ QueueQueryResult result =
session.executeQueueQuery(request.getQueueName());
response = new SessionQueueQueryResponseMessage(result);
break;
}
case SESS_BINDINGQUERY:
{
SessionBindingQueryMessage request =
(SessionBindingQueryMessage)packet;
- BindingQueryResult result =
session.handleExecuteBindingQuery(request.getAddress());
+ BindingQueryResult result =
session.executeBindingQuery(request.getAddress());
response = new SessionBindingQueryResponseMessage(result.isExists(),
result.getQueueNames());
break;
}
case SESS_ACKNOWLEDGE:
{
SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
- session.handleAcknowledge(message.getConsumerID(),
message.getMessageID());
+ session.acknowledge(message.getConsumerID(), message.getMessageID());
if (message.isRequiresResponse())
{
response = new NullResponseMessage();
@@ -310,116 +324,116 @@
case SESS_EXPIRED:
{
SessionExpiredMessage message = (SessionExpiredMessage)packet;
- session.handleExpired(message.getConsumerID(),
message.getMessageID());
+ session.expire(message.getConsumerID(), message.getMessageID());
break;
}
case SESS_COMMIT:
{
- session.handleCommit();
+ session.commit();
response = new NullResponseMessage();
break;
}
case SESS_ROLLBACK:
{
-
session.handleRollback(((RollbackMessage)packet).isConsiderLastMessageAsDelivered());
+
session.rollback(((RollbackMessage)packet).isConsiderLastMessageAsDelivered());
response = new NullResponseMessage();
break;
}
case SESS_XA_COMMIT:
{
SessionXACommitMessage message = (SessionXACommitMessage)packet;
- session.handleXACommit(message.getXid(), message.isOnePhase());
+ session.xaCommit(message.getXid(), message.isOnePhase());
response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
break;
}
case SESS_XA_END:
{
SessionXAEndMessage message = (SessionXAEndMessage)packet;
- session.handleXAEnd(message.getXid());
+ session.xaEnd(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
break;
}
case SESS_XA_FORGET:
{
SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
- session.handleXAForget(message.getXid());
+ session.xaForget(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
break;
}
case SESS_XA_JOIN:
{
SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
- session.handleXAJoin(message.getXid());
+ session.xaJoin(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
break;
}
case SESS_XA_RESUME:
{
SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
- session.handleXAResume(message.getXid());
+ session.xaResume(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
break;
}
case SESS_XA_ROLLBACK:
{
SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
- session.handleXARollback(message.getXid());
+ session.xaRollback(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
break;
}
case SESS_XA_START:
{
SessionXAStartMessage message = (SessionXAStartMessage)packet;
- session.handleXAStart(message.getXid());
+ session.xaStart(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
break;
}
case SESS_XA_SUSPEND:
{
- session.handleXASuspend();
+ session.xaSuspend();
response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
break;
}
case SESS_XA_PREPARE:
{
SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
- session.handleXAPrepare(message.getXid());
+ session.xaPrepare(message.getXid());
response = new SessionXAResponseMessage(false, XAResource.XA_OK,
null);
break;
}
case SESS_XA_INDOUBT_XIDS:
{
- List<Xid> xids = session.handleGetInDoubtXids();
+ List<Xid> xids = session.xaGetInDoubtXids();
response = new SessionXAGetInDoubtXidsResponseMessage(xids);
break;
}
case SESS_XA_GET_TIMEOUT:
{
- int timeout = session.handleGetXATimeout();
+ int timeout = session.xaGetTimeout();
response = new SessionXAGetTimeoutResponseMessage(timeout);
break;
}
case SESS_XA_SET_TIMEOUT:
{
SessionXASetTimeoutMessage message =
(SessionXASetTimeoutMessage)packet;
- session.handleSetXATimeout(message.getTimeoutSeconds());
+ session.xaSetTimeout(message.getTimeoutSeconds());
response = new SessionXASetTimeoutResponseMessage(true);
break;
}
case SESS_START:
{
- session.handleStart();
+ session.start();
break;
}
case SESS_STOP:
{
- session.handleStop();
+ session.stop();
response = new NullResponseMessage();
break;
}
case SESS_CLOSE:
{
- handleCloseSession();
+ session.close();
removeConnectionListeners();
response = new NullResponseMessage();
flush = true;
@@ -429,20 +443,20 @@
case SESS_CONSUMER_CLOSE:
{
SessionConsumerCloseMessage message =
(SessionConsumerCloseMessage)packet;
- session.handleCloseConsumer(message.getConsumerID());
+ session.closeConsumer(message.getConsumerID());
response = new NullResponseMessage();
break;
}
case SESS_FLOWTOKEN:
{
SessionConsumerFlowCreditMessage message =
(SessionConsumerFlowCreditMessage)packet;
- session.handleReceiveConsumerCredits(message.getConsumerID(),
message.getCredits());
+ session.receiveConsumerCredits(message.getConsumerID(),
message.getCredits());
break;
}
case SESS_SEND:
{
SessionSendMessage message = (SessionSendMessage)packet;
- session.handleSend((ServerMessage)message.getMessage());
+ session.send((ServerMessage)message.getMessage());
if (message.isRequiresResponse())
{
response = new NullResponseMessage();
@@ -452,13 +466,13 @@
case SESS_SEND_LARGE:
{
SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
- session.handleSendLargeMessage(message.getLargeMessageHeader());
+ session.sendLarge(message.getLargeMessageHeader());
break;
}
case SESS_SEND_CONTINUATION:
{
SessionSendContinuationMessage message =
(SessionSendContinuationMessage)packet;
- session.handleSendContinuations(message.getPacketSize(),
message.getBody(), message.isContinues());
+ session.sendContinuations(message.getPacketSize(), message.getBody(),
message.isContinues());
if (message.isRequiresResponse())
{
response = new NullResponseMessage();
@@ -468,13 +482,13 @@
case SESS_FORCE_CONSUMER_DELIVERY:
{
SessionForceConsumerDelivery message =
(SessionForceConsumerDelivery)packet;
- session.handleForceConsumerDelivery(message.getConsumerID(),
message.getSequence());
+ session.forceConsumerDelivery(message.getConsumerID(),
message.getSequence());
break;
}
case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS:
{
SessionRequestProducerCreditsMessage message =
(SessionRequestProducerCreditsMessage)packet;
- session.handleRequestProducerCredits(message.getAddress(),
message.getCredits());
+ session.requestProducerCredits(message.getAddress(),
message.getCredits());
break;
}
}
@@ -491,7 +505,7 @@
{
log.error("Caught unexpected exception", t);
}
-
+
sendResponse(packet, response, flush, closeChannel);
}
finally
@@ -551,28 +565,6 @@
}
}
- private void handleCloseSession()
- {
- storageManager.afterCompleteOperations(new IOAsyncTask()
- {
- public void onError(int errorCode, String errorMessage)
- {
- }
-
- public void done()
- {
- try
- {
- session.close();
- }
- catch (Exception e)
- {
- log.error("Failed to close session", e);
- }
- }
- });
- }
-
public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int
deliveryCount)
{
Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize,
deliveryCount);