[jboss-cvs] JBoss Messaging SVN: r5711 - in trunk/src/main/org/jboss/messaging/core: server/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 23 15:47:51 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-23 15:47:51 -0500 (Fri, 23 Jan 2009)
New Revision: 5711
Modified:
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
Log:
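As the header could change during the send, I changed preCalculateFlowControl to take the initial message and pre-calculate the credits from that message's actual required buffer size.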
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-01-23 20:47:51 UTC (rev 5711)
@@ -31,7 +31,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.server.Bindable;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-23 18:18:27 UTC (rev 5710)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-23 20:47:51 UTC (rev 5711)
@@ -790,63 +790,48 @@
{
return false;
}
-
- int precalculateAvailableCredits;
-
- if (availableCredits != null)
+ SessionReceiveMessage initialMessage;
+
+ if (sentFirstMessage)
{
- // Flow control needs to be done in advance.
- precalculateAvailableCredits = preCalculateFlowControl();
+ initialMessage = null;
}
else
{
- precalculateAvailableCredits = 0;
- }
-
- if (!sentFirstMessage)
- {
- if (trace)
- {
- trace("deliverLargeMessage:: sending initialMessage, backup = " + messageQueue.isBackup());
- }
sentFirstMessage = true;
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
pendingLargeMessage.encodeProperties(headerBuffer);
- SessionReceiveMessage initialMessage = new SessionReceiveMessage(id,
+ initialMessage = new SessionReceiveMessage(id,
headerBuffer.array(),
ref.getDeliveryCount() + 1);
+ }
- channel.send(initialMessage);
+ int precalculateAvailableCredits;
- if (availableCredits != null)
- {
- if ((precalculateAvailableCredits -= initialMessage.getRequiredBufferSize()) < 0)
- {
- log.warn("Credit logic is not working properly, too many credits were taken");
- }
-
- if (trace)
- {
- trace("deliverLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
- " credits, current = " +
- precalculateAvailableCredits +
- " isBackup = " +
- messageQueue.isBackup());
-
- }
- }
+ if (availableCredits != null)
+ {
+ // Flow control needs to be done in advance.
+ precalculateAvailableCredits = preCalculateFlowControl(initialMessage);
}
else
{
- if (trace)
+ precalculateAvailableCredits = 0;
+ }
+
+
+ if (initialMessage != null)
+ {
+ channel.send(initialMessage);
+
+ if (availableCredits != null)
{
- trace("deliverLargeMessage: Resuming deliverLargeMessage, currentPosition = " + positionPendingLargeMessage);
+ precalculateAvailableCredits -= initialMessage.getRequiredBufferSize();
}
}
-
+
while (positionPendingLargeMessage < sizePendingLargeMessage)
{
if (precalculateAvailableCredits <= 0)
@@ -912,16 +897,16 @@
* Credits flow control are calculated in advance.
* @return
*/
- private int preCalculateFlowControl()
+ private int preCalculateFlowControl(SessionReceiveMessage firstPacket)
{
while (true)
{
final int currentCredit = availableCredits.get();
int precalculatedCredits = 0;
- if (!sentFirstMessage)
+ if (firstPacket != null)
{
- precalculatedCredits = SessionReceiveMessage.SESSION_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + pendingLargeMessage.getPropertiesEncodeSize();
+ precalculatedCredits = firstPacket.getRequiredBufferSize();
}
long chunkLen = 0;
More information about the jboss-cvs-commits
mailing list