[jboss-cvs] JBoss Messaging SVN: r6322 - in trunk/src/main/org/jboss/messaging/core: remoting/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Apr 6 12:45:23 EDT 2009
Author: timfox
Date: 2009-04-06 12:45:23 -0400 (Mon, 06 Apr 2009)
New Revision: 6322
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendLargeMessage.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
Log:
Large messages now use their own packet
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2009-04-06 16:45:23 UTC (rev 6322)
@@ -31,6 +31,7 @@
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.SimpleString;
@@ -253,7 +254,7 @@
final int bodySize = msg.getBodySize();
- SessionSendMessage initialChunk = new SessionSendMessage(headerBuffer.array(), false);
+ SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.array(), false);
channel.send(initialChunk);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-04-06 16:45:23 UTC (rev 6322)
@@ -52,6 +52,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
@@ -123,6 +124,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -823,6 +825,11 @@
packet = new SessionSendMessage();
break;
}
+ case SESS_SEND_LARGE:
+ {
+ packet = new SessionSendLargeMessage();
+ break;
+ }
case SESS_RECEIVE_MSG:
{
packet = new SessionReceiveMessage();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-04-06 16:45:23 UTC (rev 6322)
@@ -133,16 +133,18 @@
public static final byte SESS_FLOWTOKEN = 70;
public static final byte SESS_SEND = 71;
+
+ public static final byte SESS_SEND_LARGE = 72;
- public static final byte SESS_SEND_CONTINUATION = 72;
+ public static final byte SESS_SEND_CONTINUATION = 73;
- public static final byte SESS_CONSUMER_CLOSE = 73;
+ public static final byte SESS_CONSUMER_CLOSE = 74;
- public static final byte SESS_RECEIVE_MSG = 74;
+ public static final byte SESS_RECEIVE_MSG = 75;
- public static final byte SESS_RECEIVE_CONTINUATION = 75;
+ public static final byte SESS_RECEIVE_CONTINUATION = 76;
- public static final byte SESS_FAILOVER_COMPLETE = 76;
+ public static final byte SESS_FAILOVER_COMPLETE = 77;
//Replication
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendLargeMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendLargeMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendLargeMessage.java 2009-04-06 16:45:23 UTC (rev 6322)
@@ -0,0 +1,135 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.DataConstants;
+
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:csuconic at redhat.com">Clebert Suconic</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class SessionSendLargeMessage extends PacketImpl
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ /** Used only if largeMessage */
+ private byte[] largeMessageHeader;
+
+ /** We need to set the MessageID when replicating this on the server */
+ private long largeMessageId = -1;
+
+ private boolean requiresResponse;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionSendLargeMessage(final byte[] largeMessageHeader, final boolean requiresResponse)
+ {
+ super(SESS_SEND_LARGE);
+
+ this.largeMessageHeader = largeMessageHeader;
+
+ this.requiresResponse = requiresResponse;
+ }
+
+ public SessionSendLargeMessage()
+ {
+ super(SESS_SEND_LARGE);
+ }
+
+ // Public --------------------------------------------------------
+
+ public byte[] getLargeMessageHeader()
+ {
+ return largeMessageHeader;
+ }
+
+ public boolean isRequiresResponse()
+ {
+ return requiresResponse;
+ }
+
+ /**
+ * @return the largeMessageId
+ */
+ public long getLargeMessageID()
+ {
+ return largeMessageId;
+ }
+
+ /**
+ * @param largeMessageId the largeMessageId to set
+ */
+ public void setLargeMessageID(long id)
+ {
+ this.largeMessageId = id;
+ }
+
+ @Override
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.writeInt(largeMessageHeader.length);
+ buffer.writeBytes(largeMessageHeader);
+ buffer.writeLong(largeMessageId);
+ buffer.writeBoolean(requiresResponse);
+ }
+
+ @Override
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ int largeMessageLength = buffer.readInt();
+
+ largeMessageHeader = new byte[largeMessageLength];
+
+ buffer.readBytes(largeMessageHeader);
+
+ largeMessageId = buffer.readLong();
+
+ requiresResponse = buffer.readBoolean();
+ }
+
+ public int getRequiredBufferSize()
+ {
+ int size = BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+ largeMessageHeader.length +
+ DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BOOLEAN;
+
+ return size;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2009-04-06 16:45:23 UTC (rev 6322)
@@ -43,18 +43,10 @@
private Message clientMessage;
- private boolean largeMessage;
-
- /** Used only if largeMessage */
- private byte[] largeMessageHeader;
-
- /** We need to set the MessageID when replicating this on the server */
- private long largeMessageId = -1;
-
private ServerMessage serverMessage;
private boolean requiresResponse;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -66,21 +58,8 @@
clientMessage = message;
this.requiresResponse = requiresResponse;
-
- largeMessage = false;
}
- public SessionSendMessage(final byte[] largeMessageHeader, final boolean requiresResponse)
- {
- super(SESS_SEND);
-
- this.largeMessageHeader = largeMessageHeader;
-
- this.requiresResponse = requiresResponse;
-
- largeMessage = true;
- }
-
public SessionSendMessage()
{
super(SESS_SEND);
@@ -88,12 +67,6 @@
// Public --------------------------------------------------------
-
- public boolean isLargeMessage()
- {
- return largeMessage;
- }
-
public Message getClientMessage()
{
return clientMessage;
@@ -104,68 +77,16 @@
return serverMessage;
}
- public byte[] getLargeMessageHeader()
- {
- return largeMessageHeader;
- }
-
public boolean isRequiresResponse()
{
return requiresResponse;
}
-
- /**
- * @return the largeMessageId
- */
- public long getMessageID()
- {
- if (largeMessage)
- {
- return largeMessageId;
- }
- else
- {
- return serverMessage.getMessageID();
- }
- }
- /**
- * @param largeMessageId the largeMessageId to set
- */
- public void setMessageID(long id)
- {
- if (largeMessage)
- {
- this.largeMessageId = id;
- }
- else
- {
- serverMessage.setMessageID(id);
- }
- }
-
@Override
public void encodeBody(final MessagingBuffer buffer)
{
- buffer.writeBoolean(largeMessage);
-
- if (largeMessage)
+ if (clientMessage != null)
{
- buffer.writeInt(largeMessageHeader.length);
- buffer.writeBytes(largeMessageHeader);
-
- if (largeMessageId > 0)
- {
- buffer.writeBoolean(true);
- buffer.writeLong(largeMessageId);
- }
- else
- {
- buffer.writeBoolean(false);
- }
- }
- else if (clientMessage != null)
- {
clientMessage.encode(buffer);
}
else
@@ -180,66 +101,22 @@
@Override
public void decodeBody(final MessagingBuffer buffer)
{
- largeMessage = buffer.readBoolean();
+ // TODO can be optimised
- if (largeMessage)
- {
- int largeMessageLength = buffer.readInt();
+ serverMessage = new ServerMessageImpl();
- largeMessageHeader = new byte[largeMessageLength];
+ clientMessage = serverMessage;
- buffer.readBytes(largeMessageHeader);
-
- final boolean largeMessageIDFilled = buffer.readBoolean();
-
- if (largeMessageIDFilled)
- {
- this.largeMessageId = buffer.readLong();
- }
- else
- {
- this.largeMessageId = -1;
- }
-
- }
- else
- {
- // TODO can be optimised
+ serverMessage.decode(buffer);
- serverMessage = new ServerMessageImpl();
-
- clientMessage = serverMessage;
+ serverMessage.getBody().resetReaderIndex();
- serverMessage.decode(buffer);
-
- serverMessage.getBody().resetReaderIndex();
-
- requiresResponse = buffer.readBoolean();
- }
+ requiresResponse = buffer.readBoolean();
}
public int getRequiredBufferSize()
{
- int size;
- if (largeMessage)
- {
- size = BASIC_PACKET_SIZE +
- // IsLargeMessage
- DataConstants.SIZE_BOOLEAN +
- // BufferSize
- DataConstants.SIZE_INT +
- // Bytes sent
- largeMessageHeader.length +
- // LargeMessageID (if > 0) and a boolean statying if the largeMessageID is set
- DataConstants.SIZE_BOOLEAN + (largeMessageId >= 0 ? DataConstants.SIZE_LONG : 0) +
- DataConstants.SIZE_BOOLEAN;
- }
- else
- {
- size = DataConstants.SIZE_BOOLEAN + BASIC_PACKET_SIZE +
- clientMessage.getEncodeSize() +
- DataConstants.SIZE_BOOLEAN;
- }
+ int size = BASIC_PACKET_SIZE + clientMessage.getEncodeSize() + DataConstants.SIZE_BOOLEAN;
return size;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2009-04-06 16:45:23 UTC (rev 6322)
@@ -25,6 +25,7 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
@@ -32,11 +33,11 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -132,7 +133,7 @@
void handleSend(SessionSendMessage packet);
- void handleSendLargeMessage(SessionSendMessage packet);
+ void handleSendLargeMessage(SessionSendLargeMessage packet);
void handleFailedOver(Packet packet);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-04-06 16:45:23 UTC (rev 6322)
@@ -1579,13 +1579,13 @@
private class DelayedAddRedistributor implements Runnable
{
private final Executor executor;
-
+
private final Channel replicatingChannel;
DelayedAddRedistributor(final Executor executor, final Channel replicatingChannel)
{
this.executor = executor;
-
+
this.replicatingChannel = replicatingChannel;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-04-06 16:45:23 UTC (rev 6322)
@@ -62,6 +62,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -914,7 +915,7 @@
}
}
- public void handleSendLargeMessage(final SessionSendMessage packet)
+ public void handleSendLargeMessage(final SessionSendLargeMessage packet)
{
// need to create the LargeMessage before continue
final LargeServerMessage msg = doCreateLargeMessage(packet);
@@ -2185,16 +2186,16 @@
}
/**
- * We need to create the LargeMessage before replicating the packe, or else we won't know how to extract the destination,
+ * We need to create the LargeMessage before replicating the packet, or else we won't know how to extract the destination,
* which is stored on the header
* @param packet
* @throws Exception
*/
- private LargeServerMessage doCreateLargeMessage(final SessionSendMessage packet)
+ private LargeServerMessage doCreateLargeMessage(final SessionSendLargeMessage packet)
{
try
{
- return createLargeMessageStorage(packet.getMessageID(), packet.getLargeMessageHeader());
+ return createLargeMessageStorage(packet.getLargeMessageHeader());
}
catch (Exception e)
{
@@ -2233,7 +2234,7 @@
channel.confirm(packet);
}
- private void doSendLargeMessage(final SessionSendMessage packet)
+ private void doSendLargeMessage(final SessionSendLargeMessage packet)
{
Packet response = null;
@@ -2410,7 +2411,7 @@
}
}
- private LargeServerMessage createLargeMessageStorage(final long messageID, final byte[] header) throws Exception
+ private LargeServerMessage createLargeMessageStorage(final byte[] header) throws Exception
{
LargeServerMessage largeMessage = storageManager.createLargeMessage();
@@ -2418,9 +2419,6 @@
largeMessage.decodeProperties(headerBuffer);
- // client didn't send the ID originally
- largeMessage.setMessageID(messageID);
-
return largeMessage;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-04-06 13:30:45 UTC (rev 6321)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-04-06 16:45:23 UTC (rev 6322)
@@ -12,14 +12,14 @@
package org.jboss.messaging.core.server.impl;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FAILOVER_COMPLETE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
@@ -28,6 +28,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
@@ -44,9 +45,9 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.RollbackMessage;
@@ -55,11 +56,11 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.CreateQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendLargeMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -265,16 +266,15 @@
case SESS_SEND:
{
SessionSendMessage message = (SessionSendMessage)packet;
- if (message.isLargeMessage())
- {
- session.handleSendLargeMessage(message);
- }
- else
- {
- session.handleSend(message);
- }
+ session.handleSend(message);
break;
}
+ case SESS_SEND_LARGE:
+ {
+ SessionSendLargeMessage message = (SessionSendLargeMessage)packet;
+ session.handleSendLargeMessage(message);
+ break;
+ }
case SESS_SEND_CONTINUATION:
{
SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
More information about the jboss-cvs-commits
mailing list