[jboss-cvs] JBoss Messaging SVN: r5493 - in trunk/src/main/org/jboss/messaging/core: client/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 9 12:50:53 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-12-09 12:50:53 -0500 (Tue, 09 Dec 2008)
New Revision: 5493
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
Log:
LargeMessage: tweak on flow control
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java 2008-12-09 17:12:44 UTC (rev 5492)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java 2008-12-09 17:50:53 UTC (rev 5493)
@@ -46,5 +46,5 @@
FileChannel getChannel() throws MessagingException;
- void closeChannel() throws MessagingException;
+ void closeChannel() throws MessagingException;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-12-09 17:12:44 UTC (rev 5492)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-12-09 17:50:53 UTC (rev 5493)
@@ -46,4 +46,11 @@
void acknowledge() throws MessagingException;
boolean isLargeMessage();
+
+ /** Size used for FlowControl */
+ int getFlowControlSize();
+
+ /** Size used for FlowControl */
+ void setFlowControlSize(int flowControlSize);
+
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-12-09 17:12:44 UTC (rev 5492)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-12-09 17:50:53 UTC (rev 5493)
@@ -19,8 +19,8 @@
import java.util.Queue;
import java.util.concurrent.Executor;
-import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientFileMessage;
+import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -70,7 +70,7 @@
private final Runner runner = new Runner();
- private File directory;
+ private final File directory;
private ClientMessage currentChunkMessage;
@@ -243,9 +243,9 @@
}
// If no handler before then need to queue them up
- boolean queueUp = this.handler == null;
+ boolean queueUp = handler == null;
- this.handler = theHandler;
+ handler = theHandler;
if (queueUp)
{
@@ -341,14 +341,14 @@
return;
}
+ // Flow control for the first packet, we will have others
+ flowControl(header.length);
+
currentChunkMessage = createFileMessage(header);
-
- // We won't call flow control at this point, as we will only flow control the header right before consumption
-
}
- public synchronized void handleLargeMessageContinuation(SessionReceiveContinuationMessage chunk) throws Exception
+ public synchronized void handleLargeMessageContinuation(final SessionReceiveContinuationMessage chunk) throws Exception
{
if (closing)
{
@@ -357,7 +357,10 @@
ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
- flowControl(chunk.getBody().length);
+ if (chunk.isContinues())
+ {
+ flowControl(chunk.getBody().length);
+ }
if (isFileConsumer())
{
@@ -367,7 +370,7 @@
else
{
MessagingBuffer currentBody = currentChunkMessage.getBody();
-
+
final int currentBodySize = currentBody == null ? 0 : currentBody.limit();
MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBodySize + body.limit()));
@@ -376,7 +379,7 @@
{
newBody.putBytes(currentBody.array());
}
-
+
newBody.putBytes(body.array());
currentChunkMessage.setBody(newBody);
@@ -389,6 +392,8 @@
{
((ClientFileMessage)currentChunkMessage).closeChannel();
}
+
+ currentChunkMessage.setFlowControlSize(chunk.getBody().length);
ClientMessage msgToSend = currentChunkMessage;
currentChunkMessage = null;
@@ -523,7 +528,7 @@
// Must store handler in local variable since might get set to null
// otherwise while this is executing and give NPE when calling onMessage
- MessageHandler theHandler = this.handler;
+ MessageHandler theHandler = handler;
if (theHandler != null)
{
@@ -556,18 +561,10 @@
* @param message
* @throws MessagingException
*/
- private void flowControlBeforeConsumption(ClientMessage message) throws MessagingException
+ private void flowControlBeforeConsumption(final ClientMessage message) throws MessagingException
{
// Chunk messages will execute the flow control while receiving the chunks
- if (!message.isLargeMessage())
- {
- flowControl(message.getEncodeSize());
- }
- else
- {
- // But the header is only flow controlled right before the consumption
- flowControl(message.getPropertiesEncodeSize());
- }
+ flowControl(message.getFlowControlSize());
}
private void doCleanUp(final boolean sendCloseMessage) throws MessagingException
@@ -634,58 +631,54 @@
else
{
int propertiesSize = message.getPropertiesEncodeSize();
-
+
MessagingBuffer bufferProperties = message.getBody().createNewBuffer(propertiesSize);
-
+
// FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the
// MessagingBuffer.
// There is no direct access into the Properties, and I couldn't add a direct cast to this method without loose
// abstraction
message.encodeProperties(bufferProperties);
-
+
bufferProperties.rewind();
-
+
ClientFileMessageImpl cloneMessage = new ClientFileMessageImpl();
-
+
cloneMessage.decodeProperties(bufferProperties);
-
+
cloneMessage.setDeliveryCount(message.getDeliveryCount());
-
+
cloneMessage.setLargeMessage(message.isLargeMessage());
-
- cloneMessage.setFile(new File(this.directory, cloneMessage.getMessageID() + "-" +
- this.session.getName() +
- "-" +
- this.getID() +
- ".jbm"));
-
+
+ cloneMessage.setFile(new File(directory, cloneMessage.getMessageID() + "-" +
+ session.getName() +
+ "-" +
+ getID() +
+ ".jbm"));
+
addBytesBody(cloneMessage, message.getBody().array());
-
+
cloneMessage.closeChannel();
-
+
return cloneMessage;
}
}
private ClientMessage createFileMessage(final byte[] header) throws Exception
{
-
+
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
if (isFileConsumer())
{
- if (!this.directory.exists())
+ if (!directory.exists())
{
directory.mkdirs();
}
ClientFileMessageImpl message = new ClientFileMessageImpl();
message.decodeProperties(headerBuffer);
- message.setFile(new File(this.directory, message.getMessageID() + "-" +
- this.session.getName() +
- "-" +
- this.getID() +
- ".jbm"));
+ message.setFile(new File(directory, message.getMessageID() + "-" + session.getName() + "-" + getID() + ".jbm"));
message.setLargeMessage(true);
return message;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-12-09 17:12:44 UTC (rev 5492)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientMessageImpl.java 2008-12-09 17:50:53 UTC (rev 5493)
@@ -40,13 +40,14 @@
{
// added this constant here so that the client package have no dependency on JMS
public static final SimpleString REPLYTO_HEADER_NAME = new SimpleString("JMSReplyTo");
-
+
private int deliveryCount;
private ClientConsumerInternal consumer;
private boolean largeMessage;
+ private int flowControlSize = -1;
/*
* Constructor for when reading from network
@@ -97,7 +98,7 @@
public int getDeliveryCount()
{
- return this.deliveryCount;
+ return deliveryCount;
}
public void acknowledge() throws MessagingException
@@ -108,6 +109,23 @@
}
}
+ public int getFlowControlSize()
+ {
+ if (flowControlSize > 0)
+ {
+ return flowControlSize;
+ }
+ else
+ {
+ return getEncodeSize();
+ }
+ }
+
+ public void setFlowControlSize(final int flowControlSize)
+ {
+ this.flowControlSize = flowControlSize;
+ }
+
/**
* @return the largeMessage
*/
@@ -119,7 +137,7 @@
/**
* @param largeMessage the largeMessage to set
*/
- public void setLargeMessage(boolean largeMessage)
+ public void setLargeMessage(final boolean largeMessage)
{
this.largeMessage = largeMessage;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-09 17:12:44 UTC (rev 5492)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-09 17:50:53 UTC (rev 5493)
@@ -655,7 +655,7 @@
if (availableCredits != null)
{
// RequiredBufferSize on this case represents the right number of bytes sent
- availableCredits.addAndGet(-pendingLargeMessage.getPropertiesEncodeSize());
+ availableCredits.addAndGet(-headerBuffer.limit());
}
}
More information about the jboss-cvs-commits
mailing list