Author: timfox
Date: 2009-11-28 03:42:53 -0500 (Sat, 28 Nov 2009)
New Revision: 8441
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
Log:
mainly fixed JMSFailoverTest
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-28
03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-28
08:42:53 UTC (rev 8441)
@@ -144,7 +144,7 @@
sessionExecutor = executor;
this.clientWindowSize = clientWindowSize;
-
+
this.ackBatchSize = ackBatchSize;
}
@@ -585,7 +585,7 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
-
+
if (creditsToSend >= clientWindowSize)
{
if (clientWindowSize == 0 && discountSlowConsumer)
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-11-28
03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java 2009-11-28
08:42:53 UTC (rev 8441)
@@ -82,13 +82,13 @@
}
public synchronized void reset()
- {
+ {
// Any arriving credits from before failover won't arrive, so we re-initialise
semaphore.drainPermits();
arriving = 0;
-
+
checkCredits(windowSize * 2);
}
@@ -123,6 +123,7 @@
private void requestCredits(final int credits)
{
+
session.sendProducerCreditsMessage(credits, destination);
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-28 03:29:29
UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-28 08:42:53
UTC (rev 8441)
@@ -843,13 +843,13 @@
conn.write(buffer, false);
- int clientWindowSize =
calcWindowSize(entry.getValue().getClientWindowSize());
-
+ int clientWindowSize = entry.getValue().getClientWindowSize();
+
if (clientWindowSize != 0)
{
SessionConsumerFlowCreditMessage packet = new
SessionConsumerFlowCreditMessage(entry.getKey(),
clientWindowSize);
-
+
packet.setChannelID(channel.getID());
buffer = packet.encode(channel.getConnection());
@@ -902,6 +902,9 @@
if (resetCreditManager)
{
producerCreditManager.reset();
+
+ //Also need to send more credits for consumers, otherwise the system could hang with the server
+ //not having any credits to send
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-28 03:29:29 UTC
(rev 8440)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-28 08:42:53 UTC
(rev 8441)
@@ -99,7 +99,7 @@
// Public --------------------------------------------------------
public TimedBuffer(final int size, final long timeout, final boolean flushOnSync,
final boolean logRates)
- {
+ {
this.bufferSize = size;
this.logRates = logRates;
if (logRates)
@@ -131,7 +131,6 @@
timerThread.start();
- log.info("log rates " + logRates);
if (logRates)
{
logRatesTimerTask = new LogRatesTimerTask();
@@ -263,7 +262,7 @@
flush();
}
}
-
+
if (buffer.writerIndex() == bufferLimit)
{
flush();
@@ -273,47 +272,47 @@
public void flush()
{
ByteBuffer bufferToFlush = null;
-
+
boolean useSync = false;
-
+
List<IOAsyncTask> callbacksToCall = null;
-
+
synchronized (this)
{
if (buffer.writerIndex() > 0)
{
latchTimer.up();
-
+
int pos = buffer.writerIndex();
-
+
if (logRates)
{
bytesFlushed += pos;
}
-
+
bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
-
+
// Putting a byteArray on a native buffer is much faster, since it will do in
a single native call.
// Using bufferToFlush.put(buffer) would make several append calls for each
byte
-
+
bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
callbacksToCall = callbacks;
-
+
callbacks = new LinkedList<IOAsyncTask>();
-
+
useSync = pendingSync;
-
+
active = false;
pendingSync = false;
-
+
buffer.clear();
bufferLimit = 0;
flushesDone++;
}
}
-
+
// Execute the flush outside of the lock
// This is important for NIO performance while we are using NIO Callbacks
if (bufferToFlush != null)
@@ -339,7 +338,7 @@
{
if (bufferObserver != null)
{
- flush();
+ flush();
}
}
finally
@@ -369,7 +368,7 @@
{
double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
log.info("Write rate = " + rate + " bytes / sec or " +
(long)(rate / (1024 * 1024)) + " MiB / sec");
- double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);
+ double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);
log.info("Flush rate = " + flushRate + " flushes /
sec");
}
@@ -377,7 +376,7 @@
bytesFlushed = 0;
- flushesDone = 0;
+ flushesDone = 0;
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-28
03:29:29 UTC (rev 8440)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-28
08:42:53 UTC (rev 8441)
@@ -186,7 +186,7 @@
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (availableCredits != null && availableCredits.get() <= 0)
- {
+ {
return HandleStatus.BUSY;
}
@@ -416,6 +416,8 @@
public void receiveCredits(final int credits) throws Exception
{
+
+
if (credits == -1)
{
// No flow control
@@ -433,7 +435,7 @@
" currentValue = " +
availableCredits.get());
}
-
+
if (previous <= 0 && previous + credits > 0)
{
promptDelivery(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-11-28
03:29:29 UTC (rev 8440)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-11-28
08:42:53 UTC (rev 8441)
@@ -85,7 +85,18 @@
jbcf.setBlockOnPersistentSend(true);
jbcf.setBlockOnNonPersistentSend(true);
+
+ //Note we set consumer window size to a value so we can verify that consumer credit re-sending
+ //works properly on failover
+ //The value is small enough that credits will have to be resent several times
+
+ final int numMessages = 10;
+
+ final int bodySize = 1000;
+
+ jbcf.setConsumerWindowSize(numMessages * bodySize / 10);
+
Connection conn = jbcf.createConnection();
MyExceptionListener listener = new MyExceptionListener();
@@ -104,8 +115,7 @@
Queue queue = sess.createQueue("myqueue");
- final int numMessages = 1000;
-
+
MessageProducer producer = sess.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);