[jboss-cvs] JBoss Messaging SVN: r5615 - in branches/Branch_Failover_Page: src/main/org/jboss/messaging/core/postoffice/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 12 22:52:51 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-12 22:52:51 -0500 (Mon, 12 Jan 2009)
New Revision: 5615
Modified:
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
Log:
LargeMessage & Failover fixes
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-01-13 03:52:51 UTC (rev 5615)
@@ -54,8 +54,6 @@
*/
public interface PostOffice extends MessagingComponent
{
- boolean isBackup(); // Remove-me... debug for now only
-
boolean addDestination(SimpleString address, boolean durable) throws Exception;
boolean removeDestination(SimpleString address, boolean durable) throws Exception;
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-13 03:52:51 UTC (rev 5615)
@@ -264,11 +264,6 @@
return addressManager.getDestinations();
}
- public boolean isBackup() // Remove-me... debug for now only
- {
- return backup;
- }
-
// TODO - needs to be synchronized to prevent happening concurrently with activate().
// (and possible removeBinding and other methods)
// Otherwise can have situation where createQueue comes in before failover, then failover occurs
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-13 03:52:51 UTC (rev 5615)
@@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -69,19 +70,21 @@
*/
public class ServerConsumerImpl implements ServerConsumer
{
- // Constants
- // ------------------------------------------------------------------------------------
+ // Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
- // Static
- // ---------------------------------------------------------------------------------------
+ // Static ---------------------------------------------------------------------------------------
- // Attributes
- // -----------------------------------------------------------------------------------
+ private static final boolean trace = log.isTraceEnabled();
- private final boolean trace = log.isTraceEnabled();
+ private static void trace(final String message)
+ {
+ log.trace(message);
+ }
+ // Attributes -----------------------------------------------------------------------------------
+
private final long id;
private final Queue messageQueue;
@@ -119,8 +122,7 @@
private final boolean preAcknowledge;
- // Constructors
- // ---------------------------------------------------------------------------------
+ // Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id,
final ServerSession session,
@@ -309,18 +311,8 @@
{
int previous = availableCredits.getAndAdd(credits);
- log.info("Had " + previous +
- ", received " +
- credits +
- " = " +
- availableCredits +
- " backup = " +
- messageQueue.isBackup() +
- Thread.currentThread().getName()); // remove me
-
if (previous <= 0 && previous + credits > 0)
{
-// log.info("********************* Calling promptDelivery, backup = " + messageQueue.isBackup()); -- remove me
promptDelivery();
}
}
@@ -510,13 +502,22 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- log.info("No Credits " + availableCredits + " backup=" + this.messageQueue.isBackup() + " ref = " + ref,
- new Exception("trace"));
+ if (trace)
+ {
+ trace("doHandle: No Credits " + availableCredits + " backup=" + messageQueue.isBackup() + " ref = " + ref);
+ }
return HandleStatus.BUSY;
}
else
{
- log.info("Accepted ref = " + ref + " backup = " + messageQueue.isBackup(), new Exception("trace"));
+ if (trace)
+ {
+ trace("doHandle: Accepted ref = " + ref +
+ " backup = " +
+ messageQueue.isBackup() +
+ " Thread = " +
+ Thread.currentThread().getName());
+ }
}
lock.lock();
@@ -529,7 +530,10 @@
// queue for delivery later.
if (!started)
{
- log.info("! started");
+ if (trace)
+ {
+ trace("doHandle: ignore reference " + ref + " as consumer is not started!");
+ }
return HandleStatus.BUSY;
}
@@ -537,7 +541,10 @@
// This has to be checked inside the lock as the set to null is done inside the lock
if (largeMessageSender != null)
{
- log.info("LargeMessageSender != null, backup = " + messageQueue.isBackup());
+ if (trace)
+ {
+ trace("doHandle: LargeMessageSender != null, can't send another message while send is pending");
+ }
return HandleStatus.BUSY;
}
@@ -583,29 +590,30 @@
private void sendLargeMessage(final MessageReference ref, final ServerMessage message)
{
-
DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
message.getMessageID(),
message.getDestination()));
+ largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
+
if (result == null)
{
- largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
-
largeMessageSender.sendLargeMessage();
}
else
{
+
+ final CountDownLatch latchDone = new CountDownLatch(1);
// Send when replicate delivery response comes back
result.setResultRunner(new Runnable()
{
public void run()
{
- largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
-
- largeMessageSender.sendLargeMessage();
+ latchDone.countDown();
}
});
+
+ largeMessageSender.sendLargeMessage();
}
}
@@ -678,8 +686,6 @@
{
lock.lock();
- log.info("Entering SendLargeMessage (backup = " + messageQueue.isBackup() + ")");
-
try
{
if (pendingLargeMessage == null)
@@ -689,13 +695,15 @@
if (availableCredits != null && availableCredits.get() <= 0)
{
- log.info("Leaving send LargeMessage because of credits, backup = " + messageQueue.isBackup() + " even before it started sending messages");
return false;
}
if (!sentFirstMessage)
{
- log.info("Sending first message = " + messageQueue.isBackup());
+ if (trace)
+ {
+ trace("sendLargeMessage:: sending initialMessage, backup = " + messageQueue.isBackup());
+ }
sentFirstMessage = true;
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
@@ -712,19 +720,33 @@
{
// RequiredBufferSize on this case represents the right number of bytes sent
availableCredits.addAndGet(-initialMessage.getRequiredBufferSize());
- log.info("Initial send, taking out " + initialMessage.getRequiredBufferSize() +
+ if (trace)
+ {
+ trace("sendLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
" credits, current = " +
availableCredits +
" isBackup = " +
messageQueue.isBackup());
+
+ }
}
}
+ else
+ {
+ if (trace)
+ {
+ trace("sendLargeMessage: Summarizing sendLargeMessage, currentPosition = " + positionPendingLargeMessage);
+ }
+ }
while (positionPendingLargeMessage < sizePendingLargeMessage)
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- log.info("Leaving loop of send LargeMessage because of credits, backup = " + messageQueue.isBackup());
+ if (trace)
+ {
+ trace("sendLargeMessage: Leaving loop of send LargeMessage because of credits, backup = " + messageQueue.isBackup());
+ }
return false;
}
@@ -737,14 +759,24 @@
availableCredits.addAndGet(-chunk.getRequiredBufferSize());
}
- log.info("Sending " + chunk.getRequiredBufferSize() + " availableCredits now is " + availableCredits + " isBackup = " + messageQueue.isBackup());
+ if (trace)
+ {
+ trace("sendLargeMessage: Sending " + chunk.getRequiredBufferSize() +
+ " availableCredits now is " +
+ availableCredits +
+ " isBackup = " +
+ messageQueue.isBackup());
+ }
channel.send(chunk);
positionPendingLargeMessage += chunkLen;
}
- log.info("Finished sendLargeMessage isBackup = " + messageQueue.isBackup());
+ if (trace)
+ {
+ trace("Finished sendLargeMessage isBackup = " + messageQueue.isBackup());
+ }
pendingLargeMessage.releaseResources();
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-13 03:52:51 UTC (rev 5615)
@@ -89,7 +89,6 @@
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.transaction.Transaction;
-import org.jboss.messaging.core.transaction.TransactionOperation;
import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.util.IDGenerator;
import org.jboss.messaging.util.SimpleIDGenerator;
@@ -995,9 +994,6 @@
{
try
{
- log.info("Receiving credits... first option.. backup = " + postOffice.isBackup() +
- " and it is receiving " +
- packet.getCredits());
consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
}
catch (Exception e)
@@ -1014,11 +1010,6 @@
{
public void run()
{
- log.info("Receiving credits... second option.. backup = " + postOffice.isBackup() +
- " and it is receiving " +
- packet.getCredits() +
- " thread = " +
- Thread.currentThread().getName());
try
{
consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2009-01-13 03:52:51 UTC (rev 5615)
@@ -22,9 +22,11 @@
package org.jboss.messaging.tests.integration.cluster.failover;
+import java.io.File;
import java.nio.ByteBuffer;
import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientFileMessage;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
@@ -33,7 +35,6 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
/**
@@ -62,39 +63,42 @@
public void testLargeMessageReplicatedNoFailover() throws Exception
{
- testLargeMessage(-1, 2);
+ testLargeMessage(-1, 500, 1024 * 1024);
}
+
+ public void testLargeMessageReplicatedNoFailoverSmallMessageSize() throws Exception
+ {
+ testLargeMessage(-1, 500, 50 * 1024);
+ }
+
public void testLargeMessageFailOnProducing() throws Exception
{
- testLargeMessage(1, 500);
+ testLargeMessage(1, 500, 1024 * 1024);
}
-
-// public void testFail() throws Exception
-// {
-// for (int i = 0; i < 100; i++)
-// {
-// System.out.println ("****************** " + i);
-// testLargeMessageFailOnConsume();
-// tearDown();
-// setUp();
-// }
-// }
-//
+ // public void testFail() throws Exception
+ // {
+ // for (int i = 0; i < 100; i++)
+ // {
+ // System.out.println ("****************** " + i);
+ // testLargeMessageFailOnConsume();
+ // tearDown();
+ // setUp();
+ // }
+ // }
+ //
public void testLargeMessageFailOnConsume() throws Exception
{
- testLargeMessage(2, 2);
+ testLargeMessage(2, 10, 1024 * 1024);
}
- private void testLargeMessage(final int placeToFail, final int numberOfMessages) throws Exception
+ private void testLargeMessage(final int placeToFail, final int numberOfMessages, final int messageSize) throws Exception
{
ClientSessionFactory factory = createFailoverFactory();
factory.setMinLargeMessageSize(10 * 1024);
- final int messageSize = 1024*1024;
-
try
{
@@ -126,7 +130,8 @@
try
{
- ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientConsumer consumer = session.createFileConsumer(new File(getTestDir() + File.separator +
+ getClientLargeMessagesDir("live")), ADDRESS);
session.start();
@@ -142,20 +147,17 @@
}
}
- ClientMessage message = consumer.receive(10000);
+ ClientFileMessage message = (ClientFileMessage)consumer.receive(20000);
- assertNotNull(message);
+ assertNotNull("Message i=" + i + " wasn't received", message);
message.acknowledge();
+
+ File file = message.getFile();
- MessagingBuffer buffer = message.getBody();
+ assertEquals(messageSize, file.length());
+ }
- buffer.rewind();
-
- assertEquals(messageSize, buffer.limit());
-
- assertEquals(i, buffer.getInt());
- }
assertNull(consumer.receive(500));
}
finally
@@ -181,7 +183,7 @@
{
e.printStackTrace();
}
-
+
System.out.println("***************************************** Forcing failure");
conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
}
@@ -218,7 +220,7 @@
{
forceFailure(conn);
}
- }
+ }
ClientMessage message = session.createClientMessage(true);
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2009-01-12 16:45:17 UTC (rev 5614)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2009-01-13 03:52:51 UTC (rev 5615)
@@ -209,12 +209,5 @@
{
}
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.postoffice.PostOffice#isBackup()
- */
- public boolean isBackup()
- {
- return false;
- }
}
More information about the jboss-cvs-commits mailing list