[hornetq-commits] JBoss hornetq SVN: r8813 - in trunk/src/main/org/hornetq/core/server: impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jan 20 10:08:29 EST 2010


Author: timfox
Date: 2010-01-20 10:08:28 -0500 (Wed, 20 Jan 2010)
New Revision: 8813

Modified:
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
Log:
Improved names in server session

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-01-20 14:53:17 UTC (rev 8812)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-01-20 15:08:28 UTC (rev 8813)
@@ -43,73 +43,71 @@
 
    void removeConsumer(ServerConsumer consumer) throws Exception;
 
-   void close() throws Exception;
+   void acknowledge(long consumerID, long messageID) throws Exception;
 
-   void handleAcknowledge(long consumerID, long messageID) throws Exception;
+   void expire(long consumerID, long messageID) throws Exception;
 
-   void handleExpired(long consumerID, long messageID) throws Exception;
+   void rollback(boolean considerLastMessageAsDelivered) throws Exception;
 
-   void handleRollback(boolean considerLastMessageAsDelivered) throws Exception;
+   void commit() throws Exception;
 
-   void handleCommit() throws Exception;
+   void xaCommit(Xid xid, boolean onePhase) throws Exception;
 
-   void handleXACommit(Xid xid, boolean onePhase) throws Exception;
+   void xaEnd(Xid xid) throws Exception;
 
-   void handleXAEnd(Xid xid) throws Exception;
+   void xaForget(Xid xid) throws Exception;
 
-   void handleXAForget(Xid xid) throws Exception;
+   void xaJoin(Xid xid) throws Exception;
 
-   void handleXAJoin(Xid xid) throws Exception;
+   void xaPrepare(Xid xid) throws Exception;
 
-   void handleXAPrepare(Xid xid) throws Exception;
+   void xaResume(Xid xid) throws Exception;
 
-   void handleXAResume(Xid xid) throws Exception;
+   void xaRollback(Xid xid) throws Exception;
 
-   void handleXARollback(Xid xid) throws Exception;
+   void xaStart(Xid xid) throws Exception;
 
-   void handleXAStart(Xid xid) throws Exception;
+   void xaSuspend() throws Exception;
 
-   void handleXASuspend() throws Exception;
+   List<Xid> xaGetInDoubtXids();
 
-   List<Xid> handleGetInDoubtXids();
+   int xaGetTimeout();
 
-   int handleGetXATimeout();
+   void xaSetTimeout(int timeout);
 
-   void handleSetXATimeout(int timeout);
+   void start();
 
-   void handleStart();
+   void stop();
 
-   void handleStop();
-
-   void handleCreateQueue(SimpleString address,
+   void createQueue(SimpleString address,
                           SimpleString name,
                           SimpleString filterString,
                           boolean temporary,
                           boolean durable) throws Exception;
 
-   void handleDeleteQueue(SimpleString name) throws Exception;
+   void deleteQueue(SimpleString name) throws Exception;
 
-   void handleCreateConsumer(long consumerID, SimpleString name, SimpleString filterString, boolean browseOnly) throws Exception;
+   void createConsumer(long consumerID, SimpleString name, SimpleString filterString, boolean browseOnly) throws Exception;
 
-   QueueQueryResult handleExecuteQueueQuery(SimpleString name) throws Exception;
+   QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
 
-   BindingQueryResult handleExecuteBindingQuery(SimpleString address);
+   BindingQueryResult executeBindingQuery(SimpleString address);
 
-   void handleCloseConsumer(long consumerID) throws Exception;
+   void closeConsumer(long consumerID) throws Exception;
 
-   void handleReceiveConsumerCredits(long consumerID, int credits) throws Exception;
+   void receiveConsumerCredits(long consumerID, int credits) throws Exception;
 
-   void handleSendContinuations(int packetSize, byte[] body, boolean continues) throws Exception;
+   void sendContinuations(int packetSize, byte[] body, boolean continues) throws Exception;
 
-   void handleSend(ServerMessage message) throws Exception;
+   void send(ServerMessage message) throws Exception;
 
-   void handleSendLargeMessage(byte[] largeMessageHeader) throws Exception;
+   void sendLarge(byte[] largeMessageHeader) throws Exception;
 
-   void handleForceConsumerDelivery(long consumerID, long sequence) throws Exception;
+   void forceConsumerDelivery(long consumerID, long sequence) throws Exception;
 
-   void handleRequestProducerCredits(SimpleString address, int credits) throws Exception;
+   void requestProducerCredits(SimpleString address, int credits) throws Exception;
 
-   void handleClose() throws Exception;
+   void close() throws Exception;
 
    int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-20 14:53:17 UTC (rev 8812)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-20 15:08:28 UTC (rev 8813)
@@ -84,7 +84,7 @@
 
    // Attributes ----------------------------------------------------------------------------
 
-   //private final long id;
+   // private final long id;
 
    private final String username;
 
@@ -101,7 +101,7 @@
    private final boolean strictUpdateDeliveryCount;
 
    private RemotingConnection remotingConnection;
-   
+
    private Channel channel;
 
    private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
@@ -131,14 +131,14 @@
    // The current currentLargeMessage being processed
    private volatile LargeServerMessage currentLargeMessage;
 
-  // private ServerSessionPacketHandler handler;
+   // private ServerSessionPacketHandler handler;
 
    private boolean closed;
 
    private final Map<SimpleString, CreditManagerHolder> creditManagerHolders = new HashMap<SimpleString, CreditManagerHolder>();
 
    private final RoutingContext routingContext = new RoutingContextImpl(null);
-   
+
    private SessionCallback callback;
 
    // Constructors ---------------------------------------------------------------------------------
@@ -157,7 +157,7 @@
                             final StorageManager storageManager,
                             final PostOffice postOffice,
                             final ResourceManager resourceManager,
-                            final SecurityStore securityStore,                         
+                            final SecurityStore securityStore,
                             final ManagementService managementService,
                             final HornetQServer server,
                             final SimpleString managementAddress) throws Exception
@@ -175,7 +175,7 @@
       this.preAcknowledge = preAcknowledge;
 
       this.remotingConnection = remotingConnection;
-      
+
       this.channel = channel;
 
       this.storageManager = storageManager;
@@ -200,7 +200,7 @@
       this.server = server;
 
       this.managementAddress = managementAddress;
-      
+
       remotingConnection.addFailureListener(this);
 
       remotingConnection.addCloseListener(this);
@@ -212,7 +212,7 @@
    {
       this.callback = callback;
    }
-   
+
    public String getUsername()
    {
       return username;
@@ -246,7 +246,7 @@
       }
    }
 
-   public synchronized void close() throws Exception
+   private synchronized void doClose() throws Exception
    {
       if (tx != null && tx.getXid() == null)
       {
@@ -290,7 +290,7 @@
       }
    }
 
-   public void handleCreateConsumer(final long consumerID,
+   public void createConsumer(final long consumerID,
                                     final SimpleString name,
                                     final SimpleString filterString,
                                     final boolean browseOnly) throws Exception
@@ -347,7 +347,7 @@
       }
    }
 
-   public void handleCreateQueue(final SimpleString address,
+   public void createQueue(final SimpleString address,
                                  final SimpleString name,
                                  final SimpleString filterString,
                                  final boolean temporary,
@@ -398,7 +398,7 @@
       }
    }
 
-   public void handleDeleteQueue(final SimpleString name) throws Exception
+   public void deleteQueue(final SimpleString name) throws Exception
    {
       Binding binding = postOffice.getBinding(name);
 
@@ -417,7 +417,7 @@
       }
    }
 
-   public QueueQueryResult handleExecuteQueueQuery(final SimpleString name) throws Exception
+   public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception
    {
       if (name == null)
       {
@@ -457,7 +457,7 @@
       return response;
    }
 
-   public BindingQueryResult handleExecuteBindingQuery(final SimpleString address)
+   public BindingQueryResult executeBindingQuery(final SimpleString address)
    {
       if (address == null)
       {
@@ -479,21 +479,21 @@
       return new BindingQueryResult(!names.isEmpty(), names);
    }
 
-   public void handleForceConsumerDelivery(final long consumerID, final long sequence) throws Exception
+   public void forceConsumerDelivery(final long consumerID, final long sequence) throws Exception
    {
       ServerConsumer consumer = consumers.get(consumerID);
 
       consumer.forceDelivery(sequence);
    }
 
-   public void handleAcknowledge(final long consumerID, final long messageID) throws Exception
+   public void acknowledge(final long consumerID, final long messageID) throws Exception
    {
       ServerConsumer consumer = consumers.get(consumerID);
 
       consumer.acknowledge(autoCommitAcks, tx, messageID);
    }
 
-   public void handleExpired(final long consumerID, final long messageID) throws Exception
+   public void expire(final long consumerID, final long messageID) throws Exception
    {
       MessageReference ref = consumers.get(consumerID).getExpired(messageID);
 
@@ -503,7 +503,7 @@
       }
    }
 
-   public void handleCommit() throws Exception
+   public void commit() throws Exception
    {
       try
       {
@@ -515,12 +515,21 @@
       }
    }
 
-   public void handleRollback(final boolean considerLastMessageAsDelivered) throws Exception
+   public void rollback(final boolean considerLastMessageAsDelivered) throws Exception
    {
-      rollback(considerLastMessageAsDelivered);
+      if (tx == null)
+      {
+         // Might be null if XA
+
+         tx = new TransactionImpl(storageManager);
+      }
+
+      doRollback(considerLastMessageAsDelivered, tx);
+
+      tx = new TransactionImpl(storageManager);
    }
 
-   public void handleXACommit(final Xid xid, final boolean onePhase) throws Exception
+   public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
    {
       if (tx != null)
       {
@@ -568,7 +577,7 @@
       }
    }
 
-   public void handleXAEnd(final Xid xid) throws Exception
+   public void xaEnd(final Xid xid) throws Exception
    {
       if (tx != null && tx.getXid().equals(xid))
       {
@@ -613,7 +622,7 @@
       }
    }
 
-   public void handleXAForget(final Xid xid) throws Exception
+   public void xaForget(final Xid xid) throws Exception
    {
       long id = resourceManager.removeHeuristicCompletion(xid);
 
@@ -636,7 +645,7 @@
       }
    }
 
-   public void handleXAJoin(final Xid xid) throws Exception
+   public void xaJoin(final Xid xid) throws Exception
    {
       Transaction theTx = resourceManager.getTransaction(xid);
 
@@ -659,7 +668,7 @@
       }
    }
 
-   public void handleXAResume(final Xid xid) throws Exception
+   public void xaResume(final Xid xid) throws Exception
    {
       if (tx != null)
       {
@@ -694,7 +703,7 @@
       }
    }
 
-   public void handleXARollback(final Xid xid) throws Exception
+   public void xaRollback(final Xid xid) throws Exception
    {
       if (tx != null)
       {
@@ -743,7 +752,7 @@
       }
    }
 
-   public void handleXAStart(final Xid xid) throws Exception
+   public void xaStart(final Xid xid) throws Exception
    {
       if (tx != null)
       {
@@ -766,7 +775,7 @@
       }
    }
 
-   public void handleXASuspend() throws Exception
+   public void xaSuspend() throws Exception
    {
       if (tx == null)
       {
@@ -791,7 +800,7 @@
       }
    }
 
-   public void handleXAPrepare(final Xid xid) throws Exception
+   public void xaPrepare(final Xid xid) throws Exception
    {
       if (tx != null)
       {
@@ -824,7 +833,7 @@
       }
    }
 
-   public List<Xid> handleGetInDoubtXids()
+   public List<Xid> xaGetInDoubtXids()
    {
       List<Xid> xids = new ArrayList<Xid>();
 
@@ -835,27 +844,27 @@
       return xids;
    }
 
-   public int handleGetXATimeout()
+   public int xaGetTimeout()
    {
       return resourceManager.getTimeoutSeconds();
    }
 
-   public void handleSetXATimeout(final int timeout)
+   public void xaSetTimeout(final int timeout)
    {
       resourceManager.setTimeoutSeconds(timeout);
    }
 
-   public void handleStart()
+   public void start()
    {
       setStarted(true);
    }
 
-   public void handleStop()
+   public void stop()
    {
       setStarted(false);
    }
 
-   public void handleClose()
+   public void close()
    {
       storageManager.afterCompleteOperations(new IOAsyncTask()
       {
@@ -867,7 +876,7 @@
          {
             try
             {
-               close();
+               doClose();
             }
             catch (Exception e)
             {
@@ -877,7 +886,7 @@
       });
    }
 
-   public void handleCloseConsumer(final long consumerID) throws Exception
+   public void closeConsumer(final long consumerID) throws Exception
    {
       final ServerConsumer consumer = consumers.get(consumerID);
 
@@ -891,7 +900,7 @@
       }
    }
 
-   public void handleReceiveConsumerCredits(final long consumerID, final int credits) throws Exception
+   public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception
    {
       ServerConsumer consumer = consumers.get(consumerID);
 
@@ -905,7 +914,7 @@
       consumer.receiveCredits(credits);
    }
 
-   public void handleSendLargeMessage(final byte[] largeMessageHeader) throws Exception
+   public void sendLarge(final byte[] largeMessageHeader) throws Exception
    {
       // need to create the LargeMessage before continue
       long id = storageManager.generateUniqueID();
@@ -920,7 +929,7 @@
       currentLargeMessage = msg;
    }
 
-   public void handleSend(final ServerMessage message) throws Exception
+   public void send(final ServerMessage message) throws Exception
    {
       try
       {
@@ -937,7 +946,7 @@
          }
          else
          {
-            send(message);
+            doSend(message);
          }
       }
       finally
@@ -953,7 +962,7 @@
       }
    }
 
-   public void handleSendContinuations(final int packetSize, final byte[] body, final boolean continues) throws Exception
+   public void sendContinuations(final int packetSize, final byte[] body, final boolean continues) throws Exception
    {
       if (currentLargeMessage == null)
       {
@@ -971,7 +980,7 @@
       {
          currentLargeMessage.releaseResources();
 
-         send(currentLargeMessage);
+         doSend(currentLargeMessage);
 
          releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
 
@@ -979,7 +988,7 @@
       }
    }
 
-   public void handleRequestProducerCredits(final SimpleString address, final int credits) throws Exception
+   public void requestProducerCredits(final SimpleString address, final int credits) throws Exception
    {
       final CreditManagerHolder holder = getCreditManagerHolder(address);
 
@@ -1017,7 +1026,7 @@
          }
       }
    }
-   
+
    public void setTransferring(final boolean transferring)
    {
       Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
@@ -1073,7 +1082,7 @@
    {
       return channel;
    }
-   
+
    public void runConnectionFailureRunners()
    {
       for (Runnable runner : failureRunners.values())
@@ -1110,7 +1119,7 @@
             }
          }
 
-         handleClose();
+         close();
 
          ServerSessionImpl.log.warn("Cleared up resources for session " + name);
       }
@@ -1160,7 +1169,7 @@
 
       started = s;
    }
-   
+
    private void handleManagementMessage(final ServerMessage message) throws Exception
    {
       try
@@ -1184,7 +1193,7 @@
       {
          reply.setAddress(replyTo);
 
-         send(reply);
+         doSend(reply);
       }
    }
 
@@ -1220,20 +1229,6 @@
       }
    }
 
-   private void rollback(final boolean lastMessageAsDelived) throws Exception
-   {
-      if (tx == null)
-      {
-         // Might be null if XA
-
-         tx = new TransactionImpl(storageManager);
-      }
-
-      doRollback(lastMessageAsDelived, tx);
-
-      tx = new TransactionImpl(storageManager);
-   }
-
    /*
     * The way flow producer flow control works is as follows:
     * The client can only send messages as long as it has credits. It requests credits from the server
@@ -1282,7 +1277,7 @@
       callback.sendProducerCreditsMessage(credits, address, -1);
    }
 
-   private void send(final ServerMessage msg) throws Exception
+   private void doSend(final ServerMessage msg) throws Exception
    {
       // Look up the paging store
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2010-01-20 14:53:17 UTC (rev 8812)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2010-01-20 15:08:28 UTC (rev 8813)
@@ -157,7 +157,14 @@
 
       session.runConnectionFailureRunners();
 
-      handleCloseSession();
+      try
+      {
+         session.close();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to close session", e);
+      }
 
       log.warn("Cleared up resources for session " + session.getName());
    }
@@ -166,7 +173,14 @@
    {
       channel.flushConfirmations();
 
-      handleCloseSession();
+      try
+      {
+         session.close();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to close session", e);
+      }
    }
 
    public void connectionClosed()
@@ -249,15 +263,15 @@
                case SESS_CREATECONSUMER:
                {
                   SessionCreateConsumerMessage request = (SessionCreateConsumerMessage)packet;
-                  session.handleCreateConsumer(request.getID(),
-                                               request.getQueueName(),
-                                               request.getFilterString(),
-                                               request.isBrowseOnly());
+                  session.createConsumer(request.getID(),
+                                         request.getQueueName(),
+                                         request.getFilterString(),
+                                         request.isBrowseOnly());
                   if (request.isRequiresResponse())
                   {
                      // We send back queue information on the queue as a response- this allows the queue to
                      // be automaticall recreated on failover
-                     response = new SessionQueueQueryResponseMessage(session.handleExecuteQueueQuery(request.getQueueName()));
+                     response = new SessionQueueQueryResponseMessage(session.executeQueueQuery(request.getQueueName()));
                   }
 
                   break;
@@ -265,11 +279,11 @@
                case CREATE_QUEUE:
                {
                   CreateQueueMessage request = (CreateQueueMessage)packet;
-                  session.handleCreateQueue(request.getAddress(),
-                                            request.getQueueName(),
-                                            request.getFilterString(),
-                                            request.isTemporary(),
-                                            request.isDurable());
+                  session.createQueue(request.getAddress(),
+                                      request.getQueueName(),
+                                      request.getFilterString(),
+                                      request.isTemporary(),
+                                      request.isDurable());
                   if (request.isRequiresResponse())
                   {
                      response = new NullResponseMessage();
@@ -279,28 +293,28 @@
                case DELETE_QUEUE:
                {
                   SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
-                  session.handleDeleteQueue(request.getQueueName());
+                  session.deleteQueue(request.getQueueName());
                   response = new NullResponseMessage();
                   break;
                }
                case SESS_QUEUEQUERY:
                {
                   SessionQueueQueryMessage request = (SessionQueueQueryMessage)packet;
-                  QueueQueryResult result = session.handleExecuteQueueQuery(request.getQueueName());
+                  QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
                   response = new SessionQueueQueryResponseMessage(result);
                   break;
                }
                case SESS_BINDINGQUERY:
                {
                   SessionBindingQueryMessage request = (SessionBindingQueryMessage)packet;
-                  BindingQueryResult result = session.handleExecuteBindingQuery(request.getAddress());
+                  BindingQueryResult result = session.executeBindingQuery(request.getAddress());
                   response = new SessionBindingQueryResponseMessage(result.isExists(), result.getQueueNames());
                   break;
                }
                case SESS_ACKNOWLEDGE:
                {
                   SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
-                  session.handleAcknowledge(message.getConsumerID(), message.getMessageID());
+                  session.acknowledge(message.getConsumerID(), message.getMessageID());
                   if (message.isRequiresResponse())
                   {
                      response = new NullResponseMessage();
@@ -310,116 +324,116 @@
                case SESS_EXPIRED:
                {
                   SessionExpiredMessage message = (SessionExpiredMessage)packet;
-                  session.handleExpired(message.getConsumerID(), message.getMessageID());
+                  session.expire(message.getConsumerID(), message.getMessageID());
                   break;
                }
                case SESS_COMMIT:
                {
-                  session.handleCommit();
+                  session.commit();
                   response = new NullResponseMessage();
                   break;
                }
                case SESS_ROLLBACK:
                {
-                  session.handleRollback(((RollbackMessage)packet).isConsiderLastMessageAsDelivered());
+                  session.rollback(((RollbackMessage)packet).isConsiderLastMessageAsDelivered());
                   response = new NullResponseMessage();
                   break;
                }
                case SESS_XA_COMMIT:
                {
                   SessionXACommitMessage message = (SessionXACommitMessage)packet;
-                  session.handleXACommit(message.getXid(), message.isOnePhase());
+                  session.xaCommit(message.getXid(), message.isOnePhase());
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                   break;
                }
                case SESS_XA_END:
                {
                   SessionXAEndMessage message = (SessionXAEndMessage)packet;
-                  session.handleXAEnd(message.getXid());
+                  session.xaEnd(message.getXid());
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                   break;
                }
                case SESS_XA_FORGET:
                {
                   SessionXAForgetMessage message = (SessionXAForgetMessage)packet;
-                  session.handleXAForget(message.getXid());
+                  session.xaForget(message.getXid());
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                   break;
                }
                case SESS_XA_JOIN:
                {
                   SessionXAJoinMessage message = (SessionXAJoinMessage)packet;
-                  session.handleXAJoin(message.getXid());
+                  session.xaJoin(message.getXid());
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                   break;
                }
                case SESS_XA_RESUME:
                {
                   SessionXAResumeMessage message = (SessionXAResumeMessage)packet;
-                  session.handleXAResume(message.getXid());
+                  session.xaResume(message.getXid());
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                   break;
                }
                case SESS_XA_ROLLBACK:
                {
                   SessionXARollbackMessage message = (SessionXARollbackMessage)packet;
-                  session.handleXARollback(message.getXid());
+                  session.xaRollback(message.getXid());
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                   break;
                }
                case SESS_XA_START:
                {
                   SessionXAStartMessage message = (SessionXAStartMessage)packet;
-                  session.handleXAStart(message.getXid());
+                  session.xaStart(message.getXid());
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                   break;
                }
                case SESS_XA_SUSPEND:
                {
-                  session.handleXASuspend();
+                  session.xaSuspend();
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                   break;
                }
                case SESS_XA_PREPARE:
                {
                   SessionXAPrepareMessage message = (SessionXAPrepareMessage)packet;
-                  session.handleXAPrepare(message.getXid());
+                  session.xaPrepare(message.getXid());
                   response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
                   break;
                }
                case SESS_XA_INDOUBT_XIDS:
                {
-                  List<Xid> xids = session.handleGetInDoubtXids();
+                  List<Xid> xids = session.xaGetInDoubtXids();
                   response = new SessionXAGetInDoubtXidsResponseMessage(xids);
                   break;
                }
                case SESS_XA_GET_TIMEOUT:
                {
-                  int timeout = session.handleGetXATimeout();
+                  int timeout = session.xaGetTimeout();
                   response = new SessionXAGetTimeoutResponseMessage(timeout);
                   break;
                }
                case SESS_XA_SET_TIMEOUT:
                {
                   SessionXASetTimeoutMessage message = (SessionXASetTimeoutMessage)packet;
-                  session.handleSetXATimeout(message.getTimeoutSeconds());
+                  session.xaSetTimeout(message.getTimeoutSeconds());
                   response = new SessionXASetTimeoutResponseMessage(true);
                   break;
                }
                case SESS_START:
                {
-                  session.handleStart();
+                  session.start();
                   break;
                }
                case SESS_STOP:
                {
-                  session.handleStop();
+                  session.stop();
                   response = new NullResponseMessage();
                   break;
                }
                case SESS_CLOSE:
                {
-                  handleCloseSession();
+                  session.close();
                   removeConnectionListeners();
                   response = new NullResponseMessage();
                   flush = true;
@@ -429,20 +443,20 @@
                case SESS_CONSUMER_CLOSE:
                {
                   SessionConsumerCloseMessage message = (SessionConsumerCloseMessage)packet;
-                  session.handleCloseConsumer(message.getConsumerID());
+                  session.closeConsumer(message.getConsumerID());
                   response = new NullResponseMessage();
                   break;
                }
                case SESS_FLOWTOKEN:
                {
                   SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage)packet;
-                  session.handleReceiveConsumerCredits(message.getConsumerID(), message.getCredits());
+                  session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
                   break;
                }
                case SESS_SEND:
                {
                   SessionSendMessage message = (SessionSendMessage)packet;
-                  session.handleSend((ServerMessage)message.getMessage());
+                  session.send((ServerMessage)message.getMessage());
                   if (message.isRequiresResponse())
                   {
                      response = new NullResponseMessage();
@@ -452,13 +466,13 @@
                case SESS_SEND_LARGE:
                {
                   SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
-                  session.handleSendLargeMessage(message.getLargeMessageHeader());
+                  session.sendLarge(message.getLargeMessageHeader());
                   break;
                }
                case SESS_SEND_CONTINUATION:
                {
                   SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
-                  session.handleSendContinuations(message.getPacketSize(), message.getBody(), message.isContinues());
+                  session.sendContinuations(message.getPacketSize(), message.getBody(), message.isContinues());
                   if (message.isRequiresResponse())
                   {
                      response = new NullResponseMessage();
@@ -468,13 +482,13 @@
                case SESS_FORCE_CONSUMER_DELIVERY:
                {
                   SessionForceConsumerDelivery message = (SessionForceConsumerDelivery)packet;
-                  session.handleForceConsumerDelivery(message.getConsumerID(), message.getSequence());
+                  session.forceConsumerDelivery(message.getConsumerID(), message.getSequence());
                   break;
                }
                case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS:
                {
                   SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage)packet;
-                  session.handleRequestProducerCredits(message.getAddress(), message.getCredits());
+                  session.requestProducerCredits(message.getAddress(), message.getCredits());
                   break;
                }
             }
@@ -491,7 +505,7 @@
          {
             log.error("Caught unexpected exception", t);
          }
-         
+
          sendResponse(packet, response, flush, closeChannel);
       }
       finally
@@ -551,28 +565,6 @@
       }
    }
 
-   private void handleCloseSession()
-   {
-      storageManager.afterCompleteOperations(new IOAsyncTask()
-      {
-         public void onError(int errorCode, String errorMessage)
-         {
-         }
-
-         public void done()
-         {
-            try
-            {
-               session.close();
-            }
-            catch (Exception e)
-            {
-               log.error("Failed to close session", e);
-            }
-         }
-      });
-   }
-
    public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
    {
       Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);



More information about the hornetq-commits mailing list