[jboss-cvs] JBoss Messaging SVN: r6322 - in trunk/src/main/org/jboss/messaging/core: remoting/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Apr 6 12:45:23 EDT 2009


Author: timfox
Date: 2009-04-06 12:45:23 -0400 (Mon, 06 Apr 2009)
New Revision: 6322

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendLargeMessage.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
Log:
Large messages now use their own packet

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2009-04-06 16:45:23 UTC (rev 6322)
@@ -31,6 +31,7 @@
 import org.jboss.messaging.core.message.impl.MessageImpl;
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
 import org.jboss.messaging.utils.SimpleString;
@@ -253,7 +254,7 @@
 
       final int bodySize = msg.getBodySize();
 
-      SessionSendMessage initialChunk = new SessionSendMessage(headerBuffer.array(), false);
+      SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.array(), false);
 
       channel.send(initialChunk);
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-04-06 16:45:23 UTC (rev 6322)
@@ -52,6 +52,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
@@ -123,6 +124,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -823,6 +825,11 @@
             packet = new SessionSendMessage();
             break;
          }
+         case SESS_SEND_LARGE:
+         {
+            packet = new SessionSendLargeMessage();
+            break;
+         }
          case SESS_RECEIVE_MSG:
          {
             packet = new SessionReceiveMessage();

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java	2009-04-06 16:45:23 UTC (rev 6322)
@@ -133,16 +133,18 @@
    public static final byte SESS_FLOWTOKEN = 70;
 
    public static final byte SESS_SEND = 71;
+   
+   public static final byte SESS_SEND_LARGE = 72;
 
-   public static final byte SESS_SEND_CONTINUATION = 72;
+   public static final byte SESS_SEND_CONTINUATION = 73;
 
-   public static final byte SESS_CONSUMER_CLOSE = 73;
+   public static final byte SESS_CONSUMER_CLOSE = 74;
 
-   public static final byte SESS_RECEIVE_MSG = 74;
+   public static final byte SESS_RECEIVE_MSG = 75;
 
-   public static final byte SESS_RECEIVE_CONTINUATION = 75;
+   public static final byte SESS_RECEIVE_CONTINUATION = 76;
 
-   public static final byte SESS_FAILOVER_COMPLETE = 76;
+   public static final byte SESS_FAILOVER_COMPLETE = 77;
    
    //Replication
 

Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendLargeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendLargeMessage.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendLargeMessage.java	2009-04-06 16:45:23 UTC (rev 6322)
@@ -0,0 +1,135 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.DataConstants;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:csuconic at redhat.com">Clebert Suconic</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionSendLargeMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /** Used only if largeMessage */
+   private byte[] largeMessageHeader;
+
+   /** We need to set the MessageID when replicating this on the server */
+   private long largeMessageId = -1;
+
+   private boolean requiresResponse;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public SessionSendLargeMessage(final byte[] largeMessageHeader, final boolean requiresResponse)
+   {
+      super(SESS_SEND_LARGE);
+
+      this.largeMessageHeader = largeMessageHeader;
+
+      this.requiresResponse = requiresResponse;
+   }
+
+   public SessionSendLargeMessage()
+   {
+      super(SESS_SEND_LARGE);
+   }
+
+   // Public --------------------------------------------------------
+
+   public byte[] getLargeMessageHeader()
+   {
+      return largeMessageHeader;
+   }
+
+   public boolean isRequiresResponse()
+   {
+      return requiresResponse;
+   }
+
+   /**
+    * @return the largeMessageId
+    */
+   public long getLargeMessageID()
+   {
+      return largeMessageId;
+   }
+
+   /**
+    * @param largeMessageId the largeMessageId to set
+    */
+   public void setLargeMessageID(long id)
+   {
+      this.largeMessageId = id;
+   }
+
+   @Override
+   public void encodeBody(final MessagingBuffer buffer)
+   {
+      buffer.writeInt(largeMessageHeader.length);
+      buffer.writeBytes(largeMessageHeader);
+      buffer.writeLong(largeMessageId);
+      buffer.writeBoolean(requiresResponse);
+   }
+
+   @Override
+   public void decodeBody(final MessagingBuffer buffer)
+   {
+      int largeMessageLength = buffer.readInt();
+
+      largeMessageHeader = new byte[largeMessageLength];
+
+      buffer.readBytes(largeMessageHeader);
+
+      largeMessageId = buffer.readLong();
+
+      requiresResponse = buffer.readBoolean();
+   }
+
+   public int getRequiredBufferSize()
+   {
+      int size = BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+                 largeMessageHeader.length +
+                 DataConstants.SIZE_LONG +
+                 DataConstants.SIZE_BOOLEAN;
+
+      return size;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java	2009-04-06 16:45:23 UTC (rev 6322)
@@ -43,18 +43,10 @@
 
    private Message clientMessage;
 
-   private boolean largeMessage;
-
-   /** Used only if largeMessage */
-   private byte[] largeMessageHeader;
-   
-   /** We need to set the MessageID when replicating this on the server */
-   private long largeMessageId = -1;
-
    private ServerMessage serverMessage;
 
    private boolean requiresResponse;
-   
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -66,21 +58,8 @@
       clientMessage = message;
 
       this.requiresResponse = requiresResponse;
-
-      largeMessage = false;
    }
 
-   public SessionSendMessage(final byte[] largeMessageHeader, final boolean requiresResponse)
-   {
-      super(SESS_SEND);
-
-      this.largeMessageHeader = largeMessageHeader;
-
-      this.requiresResponse = requiresResponse;
-
-      largeMessage = true;
-   }
-
    public SessionSendMessage()
    {
       super(SESS_SEND);
@@ -88,12 +67,6 @@
 
    // Public --------------------------------------------------------
 
-   
-   public boolean isLargeMessage()
-   {
-      return largeMessage;
-   }
-   
    public Message getClientMessage()
    {
       return clientMessage;
@@ -104,68 +77,16 @@
       return serverMessage;
    }
 
-   public byte[] getLargeMessageHeader()
-   {
-      return largeMessageHeader;
-   }
-
    public boolean isRequiresResponse()
    {
       return requiresResponse;
    }
-   
-   /**
-    * @return the largeMessageId
-    */
-   public long getMessageID()
-   {
-      if (largeMessage)
-      {
-         return largeMessageId;
-      }
-      else
-      {
-         return serverMessage.getMessageID();
-      }
-   }
 
-   /**
-    * @param largeMessageId the largeMessageId to set
-    */
-   public void setMessageID(long id)
-   {
-      if (largeMessage)
-      {
-         this.largeMessageId = id;
-      }
-      else
-      {
-         serverMessage.setMessageID(id);
-      }
-   }
-
    @Override
    public void encodeBody(final MessagingBuffer buffer)
    {
-      buffer.writeBoolean(largeMessage);
-
-      if (largeMessage)
+      if (clientMessage != null)
       {
-         buffer.writeInt(largeMessageHeader.length);
-         buffer.writeBytes(largeMessageHeader);
-         
-         if (largeMessageId > 0)
-         {
-            buffer.writeBoolean(true);
-            buffer.writeLong(largeMessageId);
-         }
-         else
-         {
-            buffer.writeBoolean(false);
-         }                  
-      }
-      else if (clientMessage != null)
-      {
          clientMessage.encode(buffer);
       }
       else
@@ -180,66 +101,22 @@
    @Override
    public void decodeBody(final MessagingBuffer buffer)
    {
-      largeMessage = buffer.readBoolean();
+      // TODO can be optimised
 
-      if (largeMessage)
-      {
-         int largeMessageLength = buffer.readInt();
+      serverMessage = new ServerMessageImpl();
 
-         largeMessageHeader = new byte[largeMessageLength];
+      clientMessage = serverMessage;
 
-         buffer.readBytes(largeMessageHeader);
-         
-         final boolean largeMessageIDFilled = buffer.readBoolean();
-         
-         if (largeMessageIDFilled)
-         {
-            this.largeMessageId = buffer.readLong();
-         }
-         else
-         {
-            this.largeMessageId = -1;
-         }
-                                       
-      }
-      else
-      {
-         // TODO can be optimised
+      serverMessage.decode(buffer);
 
-         serverMessage = new ServerMessageImpl();
-         
-         clientMessage = serverMessage;
+      serverMessage.getBody().resetReaderIndex();
 
-         serverMessage.decode(buffer);
-
-         serverMessage.getBody().resetReaderIndex();
-
-         requiresResponse = buffer.readBoolean();
-      }
+      requiresResponse = buffer.readBoolean();
    }
 
    public int getRequiredBufferSize()
    {
-      int size;
-      if (largeMessage)
-      {
-         size =  BASIC_PACKET_SIZE +
-                // IsLargeMessage
-                DataConstants.SIZE_BOOLEAN +
-                // BufferSize
-                DataConstants.SIZE_INT +
-                // Bytes sent
-                largeMessageHeader.length +
-                // LargeMessageID (if > 0) and a boolean statying if the largeMessageID is set
-                DataConstants.SIZE_BOOLEAN + (largeMessageId >= 0 ? DataConstants.SIZE_LONG : 0) + 
-                DataConstants.SIZE_BOOLEAN;
-      }
-      else
-      {
-         size =  DataConstants.SIZE_BOOLEAN + BASIC_PACKET_SIZE +
-                clientMessage.getEncodeSize() +
-                DataConstants.SIZE_BOOLEAN;
-      }
+      int size = BASIC_PACKET_SIZE + clientMessage.getEncodeSize() + DataConstants.SIZE_BOOLEAN;
 
       return size;
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2009-04-06 16:45:23 UTC (rev 6322)
@@ -25,6 +25,7 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
@@ -32,11 +33,11 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -132,7 +133,7 @@
 
    void handleSend(SessionSendMessage packet);
 
-   void handleSendLargeMessage(SessionSendMessage packet);
+   void handleSendLargeMessage(SessionSendLargeMessage packet);
 
    void handleFailedOver(Packet packet);
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-04-06 16:45:23 UTC (rev 6322)
@@ -1579,13 +1579,13 @@
    private class DelayedAddRedistributor implements Runnable
    {
       private final Executor executor;
-      
+
       private final Channel replicatingChannel;
 
       DelayedAddRedistributor(final Executor executor, final Channel replicatingChannel)
       {
          this.executor = executor;
-         
+
          this.replicatingChannel = replicatingChannel;
       }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-04-06 16:45:23 UTC (rev 6322)
@@ -62,6 +62,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -914,7 +915,7 @@
       }
    }
 
-   public void handleSendLargeMessage(final SessionSendMessage packet)
+   public void handleSendLargeMessage(final SessionSendLargeMessage packet)
    {
       // need to create the LargeMessage before continue
       final LargeServerMessage msg = doCreateLargeMessage(packet);
@@ -2185,16 +2186,16 @@
    }
 
    /**
-    * We need to create the LargeMessage before replicating the packe, or else we won't know how to extract the destination,
+    * We need to create the LargeMessage before replicating the packet, or else we won't know how to extract the destination,
     * which is stored on the header
     * @param packet
     * @throws Exception
     */
-   private LargeServerMessage doCreateLargeMessage(final SessionSendMessage packet)
+   private LargeServerMessage doCreateLargeMessage(final SessionSendLargeMessage packet)
    {
       try
       {
-         return createLargeMessageStorage(packet.getMessageID(), packet.getLargeMessageHeader());
+         return createLargeMessageStorage(packet.getLargeMessageHeader());
       }
       catch (Exception e)
       {
@@ -2233,7 +2234,7 @@
       channel.confirm(packet);
    }
 
-   private void doSendLargeMessage(final SessionSendMessage packet)
+   private void doSendLargeMessage(final SessionSendLargeMessage packet)
    {
       Packet response = null;
 
@@ -2410,7 +2411,7 @@
       }
    }
 
-   private LargeServerMessage createLargeMessageStorage(final long messageID, final byte[] header) throws Exception
+   private LargeServerMessage createLargeMessageStorage(final byte[] header) throws Exception
    {
       LargeServerMessage largeMessage = storageManager.createLargeMessage();
 
@@ -2418,9 +2419,6 @@
 
       largeMessage.decodeProperties(headerBuffer);
 
-      // client didn't send the ID originally
-      largeMessage.setMessageID(messageID);
-
       return largeMessage;
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java	2009-04-06 16:45:23 UTC (rev 6322)
@@ -12,14 +12,14 @@
 
 package org.jboss.messaging.core.server.impl;
 
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -28,6 +28,7 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
@@ -44,9 +45,9 @@
 import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
 
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.ChannelHandler;
 import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
@@ -55,11 +56,11 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -265,16 +266,15 @@
             case SESS_SEND:
             {
                SessionSendMessage message = (SessionSendMessage)packet;
-               if (message.isLargeMessage())
-               {
-                  session.handleSendLargeMessage(message);
-               }
-               else
-               {
-                  session.handleSend(message);
-               }
+               session.handleSend(message);               
                break;              
             }
+            case SESS_SEND_LARGE:
+            {
+               SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
+               session.handleSendLargeMessage(message);     
+               break;              
+            }
             case SESS_SEND_CONTINUATION:
             {
                SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;




More information about the jboss-cvs-commits mailing list