[jboss-cvs] JBoss Messaging SVN: r5640 - in branches/Branch_Failover_Page/src/main/org/jboss/messaging/core: server/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jan 14 17:38:58 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-14 17:38:58 -0500 (Wed, 14 Jan 2009)
New Revision: 5640
Modified:
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
Log:
Tweaks & comments
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-01-14 15:05:45 UTC (rev 5639)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-01-14 22:38:58 UTC (rev 5640)
@@ -476,24 +476,26 @@
if (creditsToSend >= clientWindowSize)
{
- final int credits = creditsToSend;
- creditsToSend = 0;
-
if (useExecutor)
{
- sessionExecutor.execute(new Runnable(){
-
+ final int credits = creditsToSend;
+
+ creditsToSend = 0;
+ sessionExecutor.execute(new Runnable()
+ {
+
public void run()
{
channel.send(new SessionConsumerFlowCreditMessage(id, credits));
}
-
+
});
}
else
{
- channel.send(new SessionConsumerFlowCreditMessage(id, credits));
+ channel.send(new SessionConsumerFlowCreditMessage(id, creditsToSend));
+ creditsToSend = 0;
}
}
}
@@ -624,7 +626,7 @@
{
channel.sendBlocking(new SessionConsumerCloseMessage(id));
}
-
+
clearBuffer();
}
finally
@@ -632,16 +634,16 @@
session.removeConsumer(this);
}
}
-
+
private void clearBuffer()
{
if (isFileConsumer())
{
- for (ClientMessage message: buffer)
+ for (ClientMessage message : buffer)
{
if (message instanceof ClientFileMessage)
{
- ((ClientFileMessage) message).getFile().delete();
+ ((ClientFileMessage)message).getFile().delete();
}
}
}
@@ -691,7 +693,7 @@
"-" +
getID() +
".jbm"));
-
+
cloneMessage.setFlowControlSize(message.getFlowControlSize());
addBytesBody(cloneMessage, message.getBody().array());
@@ -712,7 +714,7 @@
if (!directory.exists())
{
boolean ok = directory.mkdirs();
-
+
if (!ok)
{
throw new IOException("Failed to create directory " + directory.getCanonicalPath());
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-14 15:05:45 UTC (rev 5639)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-14 22:38:58 UTC (rev 5640)
@@ -103,7 +103,7 @@
private boolean started;
- private volatile LargeMessageSender largeMessageSender = null;
+ private volatile LargeMessageDeliverer largeMessageDeliverer = null;
/**
* if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
@@ -489,7 +489,7 @@
private void promptDelivery()
{
- if (largeMessageSender != null)
+ if (largeMessageDeliverer != null)
{
resumeLargeMessage();
}
@@ -506,15 +506,15 @@
{
if (messageQueue.isBackup())
{
- // We are supposed to finish largeMessageSender, or use all the possible credits before we return this method.
+ // We are supposed to finish largeMessageDeliverer, or use all the possible credits before we return this method.
// If we play the commands on a different order than how they were generated on the live node, we will
// eventually still be running this largeMessage before the next message come, what would reject messages
// from the cluster
- largeMessageSender.resumeLargeMessageRunnable.run();
+ largeMessageDeliverer.deliver();
}
else
{
- executor.execute(largeMessageSender.resumeLargeMessageRunnable);
+ executor.execute(largeMessageDeliverer.resumeLargeMessageRunnable);
}
}
@@ -559,11 +559,11 @@
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
- if (largeMessageSender != null)
+ if (largeMessageDeliverer != null)
{
if (trace)
{
- trace("doHandle: LargeMessageSender != null, can't send another message while send is pending");
+ trace("doHandle: LargeMessageDeliverer != null, can't send another message while send is pending");
}
return HandleStatus.BUSY;
}
@@ -593,11 +593,11 @@
if (message.isLargeMessage())
{
- sendLargeMessage(ref, message);
+ deliverLargeMessage(ref, message);
}
else
{
- sendStandardMessage(ref, message);
+ deliverStandardMessage(ref, message);
}
return HandleStatus.HANDLED;
@@ -608,23 +608,45 @@
}
}
- private void sendLargeMessage(final MessageReference ref, final ServerMessage message)
+ private void deliverLargeMessage(final MessageReference ref, final ServerMessage message)
{
- // TODO: Should we block until the replication is done?
+ largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
+
channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
- // SendLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with
- // ordering and flow control
- largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
- largeMessageSender.sendLargeMessage();
+ // TODO: Should we block until the replication is done?
+// DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
+// if (result != null)
+// {
+// final CountDownLatch latch = new CountDownLatch(1);
+// result.setResultRunner(new Runnable()
+// {
+// public void run()
+// {
+// latch.countDown();
+// }
+//
+// });
+// try
+// {
+// latch.await();
+// }
+// catch (InterruptedException ignored)
+// {
+// }
+// }
+ // deliverLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with flow control credits
+ // credits would arrive while deliver still being done, what would cause interruption on the flowControl
+ largeMessageDeliverer.deliver();
+
}
/**
* @param ref
* @param message
*/
- private void sendStandardMessage(final MessageReference ref, final ServerMessage message)
+ private void deliverStandardMessage(final MessageReference ref, final ServerMessage message)
{
if (availableCredits != null)
{
@@ -661,7 +683,7 @@
/** Internal encapsulation of the logic on sending LargeMessages.
* This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
- private class LargeMessageSender
+ private class LargeMessageDeliverer
{
private final long sizePendingLargeMessage;
@@ -679,7 +701,7 @@
{
public void run()
{
- if (sendLargeMessage())
+ if (deliver())
{
// prompt Delivery only if chunk was finished
session.promptDelivery(messageQueue);
@@ -687,7 +709,7 @@
}
};
- public LargeMessageSender(final LargeServerMessage message, final MessageReference ref)
+ public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref)
{
pendingLargeMessage = message;
@@ -696,7 +718,7 @@
this.ref = ref;
}
- public boolean sendLargeMessage()
+ public boolean deliver()
{
lock.lock();
@@ -716,7 +738,7 @@
{
if (trace)
{
- trace("sendLargeMessage:: sending initialMessage, backup = " + messageQueue.isBackup());
+ trace("deliverLargeMessage:: sending initialMessage, backup = " + messageQueue.isBackup());
}
sentFirstMessage = true;
@@ -736,7 +758,7 @@
availableCredits.addAndGet(-initialMessage.getRequiredBufferSize());
if (trace)
{
- trace("sendLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
+ trace("deliverLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
" credits, current = " +
availableCredits +
" isBackup = " +
@@ -749,7 +771,7 @@
{
if (trace)
{
- trace("sendLargeMessage: Summarizing sendLargeMessage, currentPosition = " + positionPendingLargeMessage);
+ trace("deliverLargeMessage: Summarizing deliverLargeMessage, currentPosition = " + positionPendingLargeMessage);
}
}
@@ -759,7 +781,7 @@
{
if (trace)
{
- trace("sendLargeMessage: Leaving loop of send LargeMessage because of credits, backup = " + messageQueue.isBackup());
+ trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits, backup = " + messageQueue.isBackup());
}
return false;
}
@@ -775,7 +797,7 @@
if (trace)
{
- trace("sendLargeMessage: Sending " + chunk.getRequiredBufferSize() +
+ trace("deliverLargeMessage: Sending " + chunk.getRequiredBufferSize() +
" availableCredits now is " +
availableCredits +
" isBackup = " +
@@ -789,12 +811,12 @@
if (trace)
{
- trace("Finished sendLargeMessage isBackup = " + messageQueue.isBackup());
+ trace("Finished deliverLargeMessage isBackup = " + messageQueue.isBackup());
}
pendingLargeMessage.releaseResources();
- largeMessageSender = null;
+ largeMessageDeliverer = null;
return true;
}
More information about the jboss-cvs-commits
mailing list