[jboss-cvs] JBoss Messaging SVN: r5653 - in branches/Branch_Failover_Page: src/main/org/jboss/messaging/core/persistence/impl/journal and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 16 19:18:23 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-16 19:18:22 -0500 (Fri, 16 Jan 2009)
New Revision: 5653
Modified:
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
Log:
Backup of my current work
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -277,7 +277,7 @@
lastChunk = pos >= bodySize;
- final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(), !lastChunk, lastChunk && sendBlocking);
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(), !lastChunk, lastChunk && sendBlocking, this.address);
if (sendBlocking && lastChunk)
{
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -187,7 +187,7 @@
return true;
}
- public void deleteFile() throws MessagingException
+ public synchronized void deleteFile() throws MessagingException
{
if (file != null)
{
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -39,19 +39,20 @@
// Constants -----------------------------------------------------
+ public static final int SESSION_CONTINUATION_BASE_SIZE = BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+ DataConstants.SIZE_BOOLEAN;
+
// Attributes ----------------------------------------------------
- private byte[] body;
+ protected byte[] body;
- private boolean continues;
+ protected boolean continues;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionContinuationMessage(byte type,
- final byte[] body,
- final boolean continues)
+ public SessionContinuationMessage(byte type, final byte[] body, final boolean continues)
{
super(type);
this.body = body;
@@ -84,9 +85,7 @@
@Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
- body.length +
- DataConstants.SIZE_BOOLEAN;
+ return SESSION_CONTINUATION_BASE_SIZE + body.length;
}
@Override
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -39,6 +39,8 @@
// Constants -----------------------------------------------------
+ public static final int SESSION_RECEIVE_CONTINUATION_BASE_SIZE = SESSION_CONTINUATION_BASE_SIZE + DataConstants.SIZE_LONG;
+
// Attributes ----------------------------------------------------
private long consumerID;
@@ -83,7 +85,7 @@
@Override
public int getRequiredBufferSize()
{
- return super.getRequiredBufferSize() + DataConstants.SIZE_LONG;
+ return SESSION_RECEIVE_CONTINUATION_BASE_SIZE + body.length;
}
@Override
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -22,7 +22,6 @@
package org.jboss.messaging.core.remoting.impl.wireformat;
-import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.impl.ClientMessageImpl;
import org.jboss.messaging.core.client.impl.ClientMessageInternal;
import org.jboss.messaging.core.logging.Logger;
@@ -41,6 +40,11 @@
{
// Constants -----------------------------------------------------
+ public static final int SESSIO_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE = BASIC_PACKET_SIZE + DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT +
+ DataConstants.SIZE_BOOLEAN +
+ DataConstants.SIZE_INT;
+
private static final Logger log = Logger.getLogger(SessionReceiveMessage.class);
// Attributes ----------------------------------------------------
@@ -115,7 +119,7 @@
{
return largeMessageHeader;
}
-
+
/**
* @return the largeMessage
*/
@@ -133,11 +137,7 @@
{
if (largeMessage)
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG +
- DataConstants.SIZE_INT +
- DataConstants.SIZE_BOOLEAN +
- DataConstants.SIZE_INT +
- largeMessageHeader.length;
+ return SESSIO_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + largeMessageHeader.length;
}
else
{
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
/**
@@ -43,7 +44,9 @@
// Attributes ----------------------------------------------------
private boolean requiresResponse;
+ private SimpleString address;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -64,10 +67,12 @@
*/
public SessionSendContinuationMessage(final byte[] body,
final boolean continues,
- final boolean requiresResponse)
+ final boolean requiresResponse,
+ final SimpleString address)
{
super(SESS_SEND_CONTINUATION, body, continues);
this.requiresResponse = requiresResponse;
+ this.address = address;
}
@@ -80,11 +85,16 @@
{
return requiresResponse;
}
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
@Override
public int getRequiredBufferSize()
{
- return super.getRequiredBufferSize() + DataConstants.SIZE_BOOLEAN;
+ return super.getRequiredBufferSize() + DataConstants.SIZE_BOOLEAN + SimpleString.sizeofString(address);
}
@Override
@@ -92,6 +102,7 @@
{
super.encodeBody(buffer);
buffer.putBoolean(requiresResponse);
+ buffer.putSimpleString(address);
}
@Override
@@ -99,6 +110,7 @@
{
super.decodeBody(buffer);
requiresResponse = buffer.getBoolean();
+ address = buffer.getSimpleString();
}
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -103,6 +103,13 @@
{
return serverMessage;
}
+
+ /** This should be only used to set the serverLargeMessage,
+ * which needs to be initialised on the storage */
+ public void setServerLargeMessage(ServerMessage serverLargeMessage)
+ {
+ this.serverMessage = serverLargeMessage;
+ }
public byte[] getLargeMessageHeader()
{
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -21,8 +21,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.list.PriorityLinkedList;
@@ -66,8 +64,6 @@
private static final boolean trace = log.isTraceEnabled();
public static final int NUM_PRIORITIES = 10;
-
- private static final int MAX_NUMBER_OF_DEPAGES_ON_BACKUP = 10;
private volatile long persistenceID = -1;
@@ -433,8 +429,9 @@
PagingStore store = pagingManager.getPageStore(address);
if (ref == null)
- {
- for (int i = 0; i < MAX_NUMBER_OF_DEPAGES_ON_BACKUP; i++)
+ {
+ // TODO: should we impose a MAX?
+ for (;;)
{
// Can't have the same store being depaged in more than one thread
synchronized (store)
@@ -444,7 +441,10 @@
if (ref == null)
{
// force a depage
- store.readPage();
+ if (!store.readPage())
+ {
+ break;
+ }
}
else
{
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -76,7 +76,8 @@
// Static ---------------------------------------------------------------------------------------
- private static final boolean trace = log.isTraceEnabled();
+ // private static final boolean trace = log.isTraceEnabled();
+ private static final boolean trace = false;
private static void trace(final String message)
{
@@ -105,6 +106,10 @@
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
+ // We will only be sending one largeMessage at any time, however during replication you may have
+ // more than one LargeMessage pending on the replicationBuffer
+ private final AtomicInteger pendingLargeMessagesCounter = new AtomicInteger(0);
+
/**
* if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
*/
@@ -314,12 +319,27 @@
}
else
{
+ if (trace)
+ {
+ trace("Receiving " + credits + " credits");
+ }
int previous = availableCredits.getAndAdd(credits);
if (previous <= 0 && previous + credits > 0)
{
+ if (trace)
+ {
+ trace("promptDelivery being resumed by receiveCredits");
+ }
promptDelivery();
}
+ else
+ {
+ if (trace)
+ {
+ trace("PromptDelivery wasn't negative");
+ }
+ }
}
}
@@ -489,13 +509,24 @@
private void promptDelivery()
{
- if (largeMessageDeliverer != null)
+ lock.lock();
+ try
{
- resumeLargeMessage();
+ // largeMessageDeliverer is aways set inside a lock
+ // if we don't acquire a lock, we will have NPE eventually
+ if (largeMessageDeliverer != null)
+ {
+ resumeLargeMessage();
+ }
+ else
+ {
+ session.promptDelivery(messageQueue);
+
+ }
}
- else
+ finally
{
- session.promptDelivery(messageQueue);
+ lock.unlock();
}
}
@@ -506,15 +537,16 @@
{
if (messageQueue.isBackup())
{
- // We are supposed to finish largeMessageDeliverer, or use all the possible credits before we return this method.
- // If we play the commands on a different order than how they were generated on the live node, we will
+ // We are supposed to finish largeMessageDeliverer, or use all the possible credits before we return this
+ // method.
+ // If we play the commands on a different order than how they were generated on the live node, we will
// eventually still be running this largeMessage before the next message come, what would reject messages
// from the cluster
largeMessageDeliverer.deliver();
}
else
{
- executor.execute(largeMessageDeliverer.resumeLargeMessageRunnable);
+ executor.execute(resumeLargeMessageRunnable);
}
}
@@ -559,12 +591,15 @@
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
- if (largeMessageDeliverer != null)
+ if (pendingLargeMessagesCounter.get() > 0)
{
- if (trace)
+ if (messageQueue.isBackup())
{
- trace("doHandle: LargeMessageDeliverer != null, can't send another message while send is pending");
+ log.warn("doHandle: rejecting message while send is pending, ignoring reference = " + ref +
+ " backup = " +
+ messageQueue.isBackup());
}
+
return HandleStatus.BUSY;
}
@@ -610,36 +645,46 @@
private void deliverLargeMessage(final MessageReference ref, final ServerMessage message)
{
- largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
+ pendingLargeMessagesCounter.incrementAndGet();
- channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
- // TODO: Should we block until the replication is done?
-// DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
-// if (result != null)
-// {
-// final CountDownLatch latch = new CountDownLatch(1);
-// result.setResultRunner(new Runnable()
-// {
-// public void run()
-// {
-// latch.countDown();
-// }
-//
-// });
-// try
-// {
-// latch.await();
-// }
-// catch (InterruptedException ignored)
-// {
-// }
-// }
+ final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
- // deliverLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with flow control credits
- // credits would arrive while deliver still being done, what would cause interruption on the flowControl
- largeMessageDeliverer.deliver();
-
+ DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
+ message.getMessageID(),
+ message.getDestination()));
+
+ if (result == null)
+ {
+ // it doesn't need lock because deliverLargeMesasge is already inside the lock.lock()
+ largeMessageDeliverer = localDeliverer;
+ largeMessageDeliverer.deliver();
+ }
+ else
+ {
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ // setting & unsetting largeMessageDeliver is done inside the lock,
+ // so this needs to be locked
+ lock.lock();
+ try
+ {
+ largeMessageDeliverer = localDeliverer;
+ if (largeMessageDeliverer.deliver())
+ {
+ promptDelivery();
+ }
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ });
+ }
+
}
/**
@@ -681,6 +726,26 @@
// Inner classes
// ------------------------------------------------------------------------
+ final Runnable resumeLargeMessageRunnable = new Runnable()
+ {
+ public void run()
+ {
+ lock.lock();
+ try
+ {
+ if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
+ {
+ // prompt Delivery only if chunk was finished
+ session.promptDelivery(messageQueue);
+ }
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ };
+
/** Internal encapsulation of the logic on sending LargeMessages.
* This Inner class was created to avoid a bunch of loose properties about the current LargeMessage being sent*/
private class LargeMessageDeliverer
@@ -697,18 +762,6 @@
/** The current position on the message being processed */
private volatile long positionPendingLargeMessage;
- final Runnable resumeLargeMessageRunnable = new Runnable()
- {
- public void run()
- {
- if (deliver())
- {
- // prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue);
- }
- }
- };
-
public LargeMessageDeliverer(final LargeServerMessage message, final MessageReference ref)
{
pendingLargeMessage = message;
@@ -728,12 +781,25 @@
{
return true;
}
-
+
+
+
if (availableCredits != null && availableCredits.get() <= 0)
{
return false;
}
+ int creditsUsed;
+
+ if (availableCredits != null)
+ {
+ creditsUsed = preCalculateFlowControl();
+ }
+ else
+ {
+ creditsUsed = 0;
+ }
+
if (!sentFirstMessage)
{
if (trace)
@@ -754,13 +820,15 @@
if (availableCredits != null)
{
- // RequiredBufferSize on this case represents the right number of bytes sent
- availableCredits.addAndGet(-initialMessage.getRequiredBufferSize());
+ if ((creditsUsed -= initialMessage.getRequiredBufferSize()) < 0)
+ {
+ log.warn("Credit logic is not working properly, too many credits were taken");
+ }
if (trace)
{
trace("deliverLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
" credits, current = " +
- availableCredits +
+ creditsUsed +
" isBackup = " +
messageQueue.isBackup());
@@ -777,7 +845,7 @@
while (positionPendingLargeMessage < sizePendingLargeMessage)
{
- if (availableCredits != null && availableCredits.get() <= 0)
+ if (creditsUsed <= 0)
{
if (trace)
{
@@ -792,7 +860,10 @@
if (availableCredits != null)
{
- availableCredits.addAndGet(-chunk.getRequiredBufferSize());
+ if ((creditsUsed -= chunk.getRequiredBufferSize()) < 0)
+ {
+ log.warn("Flowcontrol logic is not working properly, too many credits were taken");
+ }
}
if (trace)
@@ -809,6 +880,11 @@
positionPendingLargeMessage += chunkLen;
}
+ if (creditsUsed != 0)
+ {
+ log.warn("Flowcontrol logic is not working properly... creidts = " + creditsUsed);
+ }
+
if (trace)
{
trace("Finished deliverLargeMessage isBackup = " + messageQueue.isBackup());
@@ -818,6 +894,8 @@
largeMessageDeliverer = null;
+ pendingLargeMessagesCounter.decrementAndGet();
+
return true;
}
finally
@@ -826,6 +904,39 @@
}
}
+ /**
+ * Credits flow control are calculated in advance.
+ * @return
+ */
+ private int preCalculateFlowControl()
+ {
+ for (;;)
+ {
+ final int currentCredit;
+ int creditsUsed = 0;
+ currentCredit = availableCredits.get();
+
+ if (!sentFirstMessage)
+ {
+ creditsUsed = SessionReceiveMessage.SESSIO_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + pendingLargeMessage.getPropertiesEncodeSize();
+ }
+
+ long chunkLen = 0;
+ for (long i = positionPendingLargeMessage; creditsUsed < currentCredit && i < sizePendingLargeMessage; i += chunkLen)
+ {
+ chunkLen = (int)Math.min(sizePendingLargeMessage - i, minLargeMessageSize);
+ creditsUsed += chunkLen + SessionReceiveContinuationMessage.SESSION_RECEIVE_CONTINUATION_BASE_SIZE;
+ }
+
+ // The calculation of credits and taking credits out has to be taken atomically.
+ // This is being calculated before the packets are sent, so the Consumer shouldn't be receiving credits during this calculation
+ if (availableCredits.compareAndSet(currentCredit, currentCredit - creditsUsed))
+ {
+ return creditsUsed;
+ }
+ }
+ }
+
private SessionReceiveContinuationMessage createChunkSend()
{
SessionReceiveContinuationMessage chunk;
Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -990,19 +990,16 @@
{
DelayedResult result = channel.replicatePacket(packet);
- try
- {
- // Note we don't wait for response before handling this
-
- consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
- }
- catch (Exception e)
- {
- log.error("Failed to receive credits", e);
- }
-
if (result == null)
{
+ try
+ {
+ consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to receive credits", e);
+ }
channel.confirm(packet);
}
else
@@ -1011,6 +1008,14 @@
{
public void run()
{
+ try
+ {
+ consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to receive credits", e);
+ }
channel.confirm(packet);
}
});
@@ -1027,6 +1032,28 @@
packet.setMessageID(id);
}
+ // need to create the LargeMessage before continue
+ if (!doCreateLargeMessage(packet))
+ {
+ // packet logged an error, and played with channel.returns... and nothing needs to be done now
+ return;
+ }
+
+
+ final SendLock lock;
+
+ if (channel.getReplicatingChannel() != null)
+ {
+ lock = postOffice.getAddressLock(packet.getServerMessage().getDestination());
+
+ lock.beforeSend();
+ }
+ else
+ {
+ lock = null;
+ }
+
+
DelayedResult result = channel.replicatePacket(packet);
// With a send we must make sure it is replicated to backup before being processed on live
@@ -1043,8 +1070,10 @@
public void run()
{
doSendLargeMessage(packet);
+ lock.afterSend();
}
});
+
}
}
@@ -1109,9 +1138,22 @@
{
DelayedResult result = channel.replicatePacket(packet);
- // With a send we must make sure it is replicated to backup before being processed on live
- // or can end up with delivery being processed on backup before original send
+ final SendLock lock;
+ if (channel.getReplicatingChannel() != null)
+ {
+ // We can't use currentLargeMessage to get the address, as it may be pending on the replication
+ // We would need to require blocked replication for large-messages to avoid getting this address
+ lock = postOffice.getAddressLock(packet.getAddress());
+
+ lock.beforeSend();
+ }
+ else
+ {
+ lock = null;
+ }
+
+
if (result == null)
{
doSendContinuations(packet);
@@ -1123,6 +1165,7 @@
public void run()
{
doSendContinuations(packet);
+ lock.afterSend();
}
});
}
@@ -2343,6 +2386,44 @@
started = s;
}
+
+
+ /**
+ * We need to create the LargeMessage before replicating the packe, or else we won't know how to extract the destination,
+ * which is stored on the header
+ * @param packet
+ * @throws Exception
+ */
+ private boolean doCreateLargeMessage(final SessionSendMessage packet)
+ {
+ try
+ {
+ packet.setServerLargeMessage(createLargeMessageStorage(packet.getMessageID(), packet.getLargeMessageHeader()));
+ return true;
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to create large message", e);
+ Packet response = null;
+ if (packet.isRequiresResponse())
+ {
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+ channel.confirm(packet);
+ if (response != null)
+ {
+ channel.send(response);
+ }
+ return false;
+ }
+ }
private void doSendLargeMessage(final SessionSendMessage packet)
{
@@ -2350,8 +2431,7 @@
try
{
- largeMessage = createLargeMessageStorage(packet.getMessageID(), packet.getLargeMessageHeader());
-
+ this.largeMessage = (LargeServerMessage)packet.getServerMessage();
if (packet.isRequiresResponse())
{
response = new NullResponseMessage();
@@ -2511,7 +2591,7 @@
largeMessage.decodeProperties(headerBuffer);
- // client didn send the ID originally
+ // client didn't send the ID originally
largeMessage.setMessageID(messageID);
return largeMessage;
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -67,7 +67,7 @@
// Constants -----------------------------------------------------
final static int RECEIVE_WAIT_TIME = 10000;
-
+
// Attributes ----------------------------------------------------
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
@@ -160,7 +160,7 @@
mockFactory.setBlockOnPersistentSend(false);
mockFactory.setBlockOnAcknowledge(false);
- session = mockFactory.createSession(null, null, false, true, true, false, 0);
+ session = mockFactory.createSession(null, null, false, true, true, false, 0);
callback.session = session;
@@ -205,7 +205,7 @@
checkFileRead(file, 13333);
}
-
+
public void testClearOnClientBuffer() throws Exception
{
clearData();
@@ -215,7 +215,7 @@
final int numberOfIntegers = 10;
final int numberOfMessages = 100;
-
+
try
{
ClientSessionFactory sf = createInVMFactory();
@@ -227,7 +227,7 @@
ClientSession session = sf.createSession(null, null, false, true, false, false, 0);
session.createQueue(ADDRESS, ADDRESS, null, true, false);
-
+
messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
ClientProducer producer = session.createProducer(ADDRESS);
@@ -243,16 +243,15 @@
producer.send(message);
}
-
ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), ADDRESS);;
File clientfiles = new File(getClientLargeMessagesDir());
-
+
session.start();
-
+
ClientMessage msg = consumer.receive(1000);
msg.acknowledge();
-
+
for (int i = 0; i < 100; i++)
{
if (clientfiles.listFiles().length > 0)
@@ -261,16 +260,14 @@
}
Thread.sleep(100);
}
-
+
assertTrue(clientfiles.listFiles().length > 0);
session.close();
-
-
+
assertEquals(1, clientfiles.list().length); // 1 message was received, that should be kept
validateNoFilesOnLargeDir();
-
}
finally
@@ -283,7 +280,7 @@
{
}
}
-
+
}
public void testMessageChunkFilePersistence() throws Exception
@@ -296,11 +293,11 @@
testChunks(true, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
}
-//Uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1472 is complete
-// public void testMessageChunkFilePersistenceBlockedPreCommit() throws Exception
-// {
-// testChunks(true, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
-// }
+ // Uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1472 is complete
+ // public void testMessageChunkFilePersistenceBlockedPreCommit() throws Exception
+ // {
+ // testChunks(true, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ // }
public void testMessageChunkFilePersistenceDelayed() throws Exception
{
@@ -393,46 +390,41 @@
session.createQueue(ADDRESS, queue[0], null, true, false);
session.createQueue(ADDRESS, queue[1], null, true, false);
-
int numberOfIntegers = 100000;
Message clientFile = createLargeClientMessage(session, numberOfIntegers);
- //Message clientFile = createLargeClientMessage(session, numberOfIntegers);
+ // Message clientFile = createLargeClientMessage(session, numberOfIntegers);
ClientProducer producer = session.createProducer(ADDRESS);
-
-
session.start();
-
+
producer.send(clientFile);
producer.close();
-
ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), queue[1]);
ClientMessage msg = consumer.receive(RECEIVE_WAIT_TIME);
- assertNull(consumer.receive(1000));
+ assertNull(consumer.receive(1000));
assertNotNull(msg);
-
+
msg.acknowledge();
consumer.close();
-
+
System.out.println("Stopping");
session.stop();
-
+
ClientConsumer consumer1 = session.createFileConsumer(new File(getClientLargeMessagesDir()), queue[0]);
session.start();
-
msg = consumer1.receive(RECEIVE_WAIT_TIME);
assertNotNull(msg);
msg.acknowledge();
consumer1.close();
-
+
session.commit();
session.close();
@@ -484,12 +476,11 @@
session.createQueue(ADDRESS, queue[0], null, true, false);
session.createQueue(ADDRESS, queue[1], null, true, false);
-
int numberOfIntegers = 100000;
Message clientFile = createLargeClientMessage(session, numberOfIntegers);
- //Message clientFile = createLargeClientMessage(session, numberOfIntegers);
+ // Message clientFile = createLargeClientMessage(session, numberOfIntegers);
ClientProducer producer = session.createProducer(ADDRESS);
producer.send(clientFile);
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -158,6 +158,8 @@
ClientFileMessage message = (ClientFileMessage)consumer.receive(20000);
assertNotNull("Message i=" + i + " wasn't received", message);
+
+// System.out.println("Received message " + i);
message.acknowledge();
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java 2009-01-16 15:50:08 UTC (rev 5652)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java 2009-01-17 00:18:22 UTC (rev 5653)
@@ -1226,13 +1226,13 @@
{
long start = System.currentTimeMillis();
- sf.setMinLargeMessageSize(10 * 1024);
+ sf.setMinLargeMessageSize(1024);
sf.setSendWindowSize(1024*1024);
ClientSession s = sf.createSession(false, false, false);
- final int messageSize = 40 * 1024;
+ final int messageSize = 4 * 1024;
final int numMessages = 100;
@@ -1277,7 +1277,7 @@
for (MyHandler handler : handlers)
{
- boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+ boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
if (!ok)
{
@@ -1422,7 +1422,10 @@
{
thread.join();
- assertNull(thread.throwable);
+ if (thread.throwable != null)
+ {
+ throw new Exception ("Exception on thread " + thread, thread.throwable);
+ }
}
runnable.checkFail();
More information about the jboss-cvs-commits mailing list