[jboss-cvs] JBoss Messaging SVN: r5612 - in branches/Branch_Failover_Page: src/main/org/jboss/messaging/core/persistence/impl/nullpm and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Jan 10 18:07:50 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-10 18:07:50 -0500 (Sat, 10 Jan 2009)
New Revision: 5612
Modified:
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerMessage.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
Log:
Backup
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-01-10 23:07:50 UTC (rev 5612)
@@ -182,6 +182,11 @@
return currentRefCount;
}
+ public boolean isLargeMessage()
+ {
+ return true;
+ }
+
public void deleteFile() throws MessagingException
{
if (file != null)
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-01-10 23:07:50 UTC (rev 5612)
@@ -103,6 +103,11 @@
}
+ public boolean isLargeMessage()
+ {
+ return true;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-01-10 23:07:50 UTC (rev 5612)
@@ -54,6 +54,8 @@
*/
public interface PostOffice extends MessagingComponent
{
+ boolean isBackup(); // Remove-me... debug for now only
+
boolean addDestination(SimpleString address, boolean durable) throws Exception;
boolean removeDestination(SimpleString address, boolean durable) throws Exception;
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-10 23:07:50 UTC (rev 5612)
@@ -263,6 +263,11 @@
{
return addressManager.getDestinations();
}
+
+ public boolean isBackup() // Remove-me... debug for now only
+ {
+ return backup;
+ }
// TODO - needs to be synchronized to prevent happening concurrently with activate().
// (and possible removeBinding and other methods)
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerMessage.java 2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerMessage.java 2009-01-10 23:07:50 UTC (rev 5612)
@@ -53,6 +53,8 @@
int getMemoryEstimate();
+ boolean isLargeMessage();
+
void setReload();
boolean isReload();
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-10 23:07:50 UTC (rev 5612)
@@ -309,8 +309,18 @@
{
int previous = availableCredits.getAndAdd(credits);
+ log.info("Had " + previous +
+ ", received " +
+ credits +
+ " = " +
+ availableCredits +
+ " backup = " +
+ messageQueue.isBackup() +
+ Thread.currentThread().getName()); // remove me
+
if (previous <= 0 && previous + credits > 0)
{
+// log.info("********************* Calling promptDelivery, backup = " + messageQueue.isBackup()); -- remove me
promptDelivery();
}
}
@@ -422,7 +432,11 @@
if (handled != HandleStatus.HANDLED)
{
- throw new IllegalStateException("Reference was not handled " + ref + " " + handled);
+ throw new IllegalStateException("Reference was not handled " + ref +
+ " " +
+ handled +
+ " backup = " +
+ messageQueue.isBackup());
}
}
@@ -496,28 +510,37 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
+ log.info("No Credits " + availableCredits + " backup=" + this.messageQueue.isBackup() + " ref = " + ref,
+ new Exception("trace"));
return HandleStatus.BUSY;
}
+ else
+ {
+ log.info("Accepted ref = " + ref + " backup = " + messageQueue.isBackup(), new Exception("trace"));
+ }
lock.lock();
try
{
- // If there is a pendingLargeMessage we can't take another message
- // This has to be checked inside the lock as the set to null is done inside the lock
- if (largeMessageSender != null)
- {
- return HandleStatus.BUSY;
- }
// If the consumer is stopped then we don't accept the message, it
// should go back into the
// queue for delivery later.
if (!started)
{
+ log.info("! started");
return HandleStatus.BUSY;
}
+ // If there is a pendingLargeMessage we can't take another message
+ // This has to be checked inside the lock as the set to null is done inside the lock
+ if (largeMessageSender != null)
+ {
+ log.info("LargeMessageSender != null, backup = " + messageQueue.isBackup());
+ return HandleStatus.BUSY;
+ }
+
final ServerMessage message = ref.getMessage();
if (filter != null && !filter.match(message))
@@ -541,31 +564,9 @@
doAck(ref);
}
- // TODO: get rid of the instanceof by something like message.isLargeMessage()
- if (message instanceof LargeServerMessage)
+ if (message.isLargeMessage())
{
- // FIXME - please put the replication logic in the sendLargeMessage method
-
- DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
- message.getMessageID(),
- message.getDestination()));
-
- if (result == null)
- {
- sendLargeMessage(ref, message);
- }
- else
- {
- // Send when replicate delivery response comes back
- result.setResultRunner(new Runnable()
- {
- public void run()
- {
- sendLargeMessage(ref, message);
- }
- });
- }
-
+ sendLargeMessage(ref, message);
}
else
{
@@ -582,9 +583,31 @@
private void sendLargeMessage(final MessageReference ref, final ServerMessage message)
{
- largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
- largeMessageSender.sendLargeMessage();
+ DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
+ message.getMessageID(),
+ message.getDestination()));
+
+ if (result == null)
+ {
+ largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
+
+ largeMessageSender.sendLargeMessage();
+ }
+ else
+ {
+ // Send when replicate delivery response comes back
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
+
+ largeMessageSender.sendLargeMessage();
+ }
+ });
+ }
+
}
/**
@@ -642,8 +665,6 @@
/** The current position on the message being processed */
private volatile long positionPendingLargeMessage;
- private volatile SessionReceiveContinuationMessage readAheadChunk;
-
public LargeMessageSender(final LargeServerMessage message, final MessageReference ref)
{
pendingLargeMessage = message;
@@ -657,6 +678,8 @@
{
lock.lock();
+ log.info("Entering SendLargeMessage (backup = " + messageQueue.isBackup() + ")");
+
try
{
if (pendingLargeMessage == null)
@@ -666,11 +689,13 @@
if (availableCredits != null && availableCredits.get() <= 0)
{
+ log.info("Leaving send LargeMessage because of credits, backup = " + messageQueue.isBackup() + " even before it started sending messages");
return false;
}
if (!sentFirstMessage)
{
+ log.info("Sending first message = " + messageQueue.isBackup());
sentFirstMessage = true;
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
@@ -687,33 +712,19 @@
{
// RequiredBufferSize on this case represents the right number of bytes sent
availableCredits.addAndGet(-initialMessage.getRequiredBufferSize());
+ log.info("Initial send, taking out " + initialMessage.getRequiredBufferSize() +
+ " credits, current = " +
+ availableCredits +
+ " isBackup = " +
+ messageQueue.isBackup());
}
}
- if (readAheadChunk != null)
- {
- int chunkLen = readAheadChunk.getBody().length;
-
- positionPendingLargeMessage += chunkLen;
-
- if (availableCredits != null)
- {
- availableCredits.addAndGet(-readAheadChunk.getRequiredBufferSize());
- }
-
- channel.send(readAheadChunk);
-
- readAheadChunk = null;
- }
-
while (positionPendingLargeMessage < sizePendingLargeMessage)
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (readAheadChunk == null)
- {
- readAheadChunk = createChunkSend();
- }
+ log.info("Leaving loop of send LargeMessage because of credits, backup = " + messageQueue.isBackup());
return false;
}
@@ -726,11 +737,15 @@
availableCredits.addAndGet(-chunk.getRequiredBufferSize());
}
+ log.info("Sending " + chunk.getRequiredBufferSize() + " availableCredits now is " + availableCredits + " isBackup = " + messageQueue.isBackup());
+
channel.send(chunk);
positionPendingLargeMessage += chunkLen;
}
+ log.info("Finished sendLargeMessage isBackup = " + messageQueue.isBackup());
+
pendingLargeMessage.releaseResources();
largeMessageSender = null;
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2009-01-10 23:07:50 UTC (rev 5612)
@@ -138,6 +138,11 @@
return reload;
}
+ public boolean isLargeMessage()
+ {
+ return false;
+ }
+
public void setReload()
{
this.reload = true;
@@ -158,4 +163,5 @@
getDestination() +
"]";
}
+
}
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-10 23:07:50 UTC (rev 5612)
@@ -371,7 +371,7 @@
public void handleDeleteQueue(final SessionDeleteQueueMessage packet)
{
final SendLock lock;
-
+
if (channel.getReplicatingChannel() != null)
{
Binding binding = postOffice.getBinding(packet.getQueueName());
@@ -991,20 +991,22 @@
{
DelayedResult result = channel.replicatePacket(packet);
- try
+ if (result == null)
{
- // Note we don't wait for response before handling this
+ try
+ {
+ log.info("Receiving credits... first option.. backup = " + postOffice.isBackup() +
+ " and it is receiving " +
+ packet.getCredits());
+ consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to receive credits", e);
+ }
- consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
- }
- catch (Exception e)
- {
- log.error("Failed to receive credits", e);
- }
+ channel.confirm(packet);
- if (result == null)
- {
- channel.confirm(packet);
}
else
{
@@ -1012,7 +1014,21 @@
{
public void run()
{
+ log.info("Receiving credits... second option.. backup = " + postOffice.isBackup() +
+ " and it is receiving " +
+ packet.getCredits() +
+ " thread = " +
+ Thread.currentThread().getName());
+ try
+ {
+ consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to receive credits", e);
+ }
channel.confirm(packet);
+
}
});
}
@@ -1433,7 +1449,7 @@
{
throw new MessagingException(MessagingException.ILLEGAL_STATE, "Cannot delete queue - it has consumers");
}
-
+
if (queue.isDurable())
{
queue.deleteAllReferences(storageManager, postOffice, queueSettingsRepository);
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2009-01-10 23:07:50 UTC (rev 5612)
@@ -62,7 +62,7 @@
public void testLargeMessageReplicatedNoFailover() throws Exception
{
- testLargeMessage(-1, 500);
+ testLargeMessage(-1, 2);
}
public void testLargeMessageFailOnProducing() throws Exception
@@ -84,7 +84,7 @@
//
public void testLargeMessageFailOnConsume() throws Exception
{
- testLargeMessage(2, 500);
+ testLargeMessage(2, 2);
}
private void testLargeMessage(final int placeToFail, final int numberOfMessages) throws Exception
@@ -93,7 +93,7 @@
factory.setMinLargeMessageSize(10 * 1024);
- final int messageSize = 25000;
+ final int messageSize = 1024*1024;
try
{
@@ -105,7 +105,6 @@
}
finally
{
- System.out.println("Giving up!!!!!!");
}
}
@@ -143,7 +142,7 @@
}
}
- ClientMessage message = consumer.receive(5000);
+ ClientMessage message = consumer.receive(10000);
assertNotNull(message);
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2009-01-10 18:03:56 UTC (rev 5611)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2009-01-10 23:07:50 UTC (rev 5612)
@@ -209,4 +209,12 @@
{
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.postoffice.PostOffice#isBackup()
+ */
+ public boolean isBackup()
+ {
+ return false;
+ }
+
}
More information about the jboss-cvs-commits mailing list