JBoss hornetq SVN: r8221 - in trunk: src/main/org/hornetq/core/client/impl and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-05 03:42:21 -0500 (Thu, 05 Nov 2009)
New Revision: 8221
Modified:
trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-206
Modified: trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2009-11-05 07:56:15 UTC (rev 8220)
+++ trunk/examples/jms/large-message/src/org/hornetq/jms/example/LargeMessageExample.java 2009-11-05 08:42:21 UTC (rev 8221)
@@ -47,7 +47,9 @@
// The message we will send is size 256MB, even though we are only running in 50MB of RAM on both client and server.
// HornetQ will support much larger message sizes, but we use 512MB so the example runs in reasonable time.
- private final long FILE_SIZE = 256 * 1024 * 1024;
+ // private final long FILE_SIZE = 256L * 1024 * 1024;
+
+ private final long FILE_SIZE = 2L * 1024 * 1024 * 1024; // 2 GiB message
public boolean runExample() throws Exception
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-05 07:56:15 UTC (rev 8220)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-05 08:42:21 UTC (rev 8221)
@@ -22,8 +22,8 @@
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.core.message.Message;
-import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
@@ -250,7 +250,7 @@
if (isLarge)
{
- largeMessageSend(sendBlocking, msg);
+ largeMessageSend(sendBlocking, msg, theCredits);
}
else if (sendBlocking)
{
@@ -269,14 +269,8 @@
// Not the continuations, but this is ok since we are only interested in limiting the amount of
// data in *memory* and continuations go straight to the disk
- if (isLarge)
+ if (!isLarge)
{
- // TODO this is pretty hacky - we should define consistent meanings of encode size
-
- theCredits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
- }
- else
- {
theCredits.acquireCredits(msg.getEncodeSize());
}
}
@@ -292,15 +286,14 @@
throw new HornetQException(HornetQException.OBJECT_CLOSED, "Producer is closed");
}
}
-
-
+
// Methods to send Large Messages----------------------------------------------------------------
-
+
/**
* @param msg
* @throws HornetQException
*/
- private void largeMessageSend(final boolean sendBlocking, final Message msg) throws HornetQException
+ private void largeMessageSend(final boolean sendBlocking, final Message msg, final ClientProducerCredits credits) throws HornetQException
{
int headerSize = msg.getHeadersAndPropertiesEncodeSize();
@@ -323,15 +316,23 @@
channel.send(initialChunk);
+ try
+ {
+ credits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
+ }
+ catch (InterruptedException e)
+ {
+ }
+
InputStream input = msg.getBodyInputStream();
if (input != null)
{
- largeMessageSendStreamed(sendBlocking, input);
+ largeMessageSendStreamed(sendBlocking, input, credits);
}
else
{
- largeMessageSendBuffered(sendBlocking, msg);
+ largeMessageSendBuffered(sendBlocking, msg, credits);
}
}
@@ -340,7 +341,9 @@
* @param msg
* @throws HornetQException
*/
- private void largeMessageSendBuffered(final boolean sendBlocking, final Message msg) throws HornetQException
+ private void largeMessageSendBuffered(final boolean sendBlocking,
+ final Message msg,
+ final ClientProducerCredits credits) throws HornetQException
{
final long bodySize = msg.getLargeBodySize();
@@ -373,6 +376,14 @@
{
channel.send(chunk);
}
+
+ try
+ {
+ credits.acquireCredits(chunk.getRequiredBufferSize());
+ }
+ catch (InterruptedException e)
+ {
+ }
}
}
@@ -381,7 +392,9 @@
* @param input
* @throws HornetQException
*/
- private void largeMessageSendStreamed(final boolean sendBlocking, InputStream input) throws HornetQException
+ private void largeMessageSendStreamed(final boolean sendBlocking,
+ final InputStream input,
+ final ClientProducerCredits credits) throws HornetQException
{
boolean lastPacket = false;
@@ -441,6 +454,14 @@
{
channel.send(chunk);
}
+
+ try
+ {
+ credits.acquireCredits(chunk.getRequiredBufferSize());
+ }
+ catch (InterruptedException e)
+ {
+ }
}
try
@@ -455,8 +476,6 @@
}
}
-
-
// Inner Classes --------------------------------------------------------------------------------
class DecodingContext implements LargeMessageEncodingContext
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-05 07:56:15 UTC (rev 8220)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-05 08:42:21 UTC (rev 8221)
@@ -1478,7 +1478,7 @@
{
try
{
- releaseOutStanding(message);
+ releaseOutStanding(message, message.getEncodeSize());
}
catch (Exception e)
{
@@ -1499,20 +1499,23 @@
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "large-message not initialized on server");
}
-
+
+ //Immediately release the credits for the continuations- these don't contrinute to the in-memory size
+ //of the message
+
+ releaseOutStanding(currentLargeMessage, packet.getRequiredBufferSize());
+
currentLargeMessage.addBytes(packet.getBody());
if (!packet.isContinues())
- {
- final LargeServerMessage message = currentLargeMessage;
+ {
+ currentLargeMessage.releaseResources();
+ send(currentLargeMessage);
+
+ releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
+
currentLargeMessage = null;
-
- message.releaseResources();
-
- send(message);
-
- releaseOutStanding(message);
}
if (packet.isRequiresResponse())
@@ -1910,17 +1913,15 @@
* returned. When a session closes any outstanding credits will be returned.
*
*/
- private void releaseOutStanding(final ServerMessage message) throws Exception
+ private void releaseOutStanding(final ServerMessage message, final int credits) throws Exception
{
CreditManagerHolder holder = getCreditManagerHolder(message.getDestination());
- int size = message.getEncodeSize();
+ holder.outstandingCredits -= credits;
- holder.outstandingCredits -= size;
-
- holder.store.returnProducerCredits(size);
+ holder.store.returnProducerCredits(credits);
}
-
+
private void send(final ServerMessage msg) throws Exception
{
// check the user has write access to this address.
16 years, 1 month
JBoss hornetq SVN: r8220 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-05 02:56:15 -0500 (Thu, 05 Nov 2009)
New Revision: 8220
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
minor reformat
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-05 07:52:32 UTC (rev 8219)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-05 07:56:15 UTC (rev 8220)
@@ -93,7 +93,7 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-
+
private boolean largeMessageInDelivery;
/**
@@ -143,7 +143,7 @@
this.binding = binding;
- this.messageQueue = binding.getQueue();
+ messageQueue = binding.getQueue();
this.executor = executor;
@@ -159,7 +159,7 @@
this.managementService = managementService;
- this.minLargeMessageSize = session.getMinLargeMessageSize();
+ minLargeMessageSize = session.getMinLargeMessageSize();
this.updateDeliveries = updateDeliveries;
@@ -349,7 +349,7 @@
forcedDeliveryMessage.setDestination(messageQueue.getName());
final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
-
+
channel.send(packet);
}
});
@@ -515,7 +515,7 @@
// Private --------------------------------------------------------------------------------------
- private void promptDelivery(boolean asyncDelivery)
+ private void promptDelivery(final boolean asyncDelivery)
{
lock.lock();
try
@@ -747,9 +747,6 @@
}
}
- /**
- *
- */
public void finish() throws Exception
{
lock.lock();
@@ -758,7 +755,7 @@
if (largeMessage == null)
{
// handleClose could be calling close while handleDeliver is also calling finish.
- // As a result one of them could get here after the largeMessage is already gone.
+ // As a result one of them could get here after the largeMessage is already gone.
// On that case we just ignore this call
return;
}
@@ -786,7 +783,7 @@
}
}
- private SessionReceiveContinuationMessage createChunkSend(LargeMessageEncodingContext context)
+ private SessionReceiveContinuationMessage createChunkSend(final LargeMessageEncodingContext context)
{
SessionReceiveContinuationMessage chunk;
@@ -844,7 +841,7 @@
while (iterator.hasNext())
{
- MessageReference ref = (MessageReference)iterator.next();
+ MessageReference ref = iterator.next();
try
{
HandleStatus status = handle(ref);
16 years, 1 month
JBoss hornetq SVN: r8219 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-05 02:52:32 -0500 (Thu, 05 Nov 2009)
New Revision: 8219
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
Log:
removed bogus test
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-11-05 07:31:28 UTC (rev 8218)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-11-05 07:52:32 UTC (rev 8219)
@@ -53,7 +53,7 @@
{
return true;
}
-
+
private int getMessageEncodeSize(final SimpleString address) throws Exception
{
ClientSessionFactory cf = createFactory(isNetty());
@@ -214,7 +214,7 @@
internalTestSlowConsumerNoBuffer(false);
}
-// I believe this test became invalid after we started using another thread to deliver the large message
+ // I believe this test became invalid after we started using another thread to deliver the large message
public void disabled_testSlowConsumerNoBufferLargeMessages() throws Exception
{
internalTestSlowConsumerNoBuffer(true);
@@ -863,11 +863,6 @@
testNoWindowRoundRobin(false);
}
- public void testNoWindowRoundRobinLargeMessage() throws Exception
- {
- testNoWindowRoundRobin(true);
- }
-
private void testNoWindowRoundRobin(final boolean largeMessages) throws Exception
{
@@ -926,7 +921,7 @@
{
Thread.sleep(10);
}
-
+
assertNull(consumerImpl.getAvailableCredits());
}
}
16 years, 1 month
JBoss hornetq SVN: r8218 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-05 02:31:28 -0500 (Thu, 05 Nov 2009)
New Revision: 8218
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
removed unnecessary volatile modifiers that were there before
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-05 07:25:01 UTC (rev 8217)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-05 07:31:28 UTC (rev 8218)
@@ -93,8 +93,7 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-
- // Note, this does not need to be volatile since it is only accessed when the lock is held
+
private boolean largeMessageInDelivery;
/**
@@ -624,17 +623,14 @@
{
private final long sizePendingLargeMessage;
- /** The current message being processed
- * Note, this does not need to be volatile since it is only accessed when the lock is held
- */
private LargeServerMessage largeMessage;
private final MessageReference ref;
- private volatile boolean sentInitialPacket = false;
+ private boolean sentInitialPacket = false;
/** The current position on the message being processed */
- private volatile long positionPendingLargeMessage;
+ private long positionPendingLargeMessage;
private LargeMessageEncodingContext context;
16 years, 1 month
JBoss hornetq SVN: r8217 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-11-05 02:25:01 -0500 (Thu, 05 Nov 2009)
New Revision: 8217
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
removed unnecessary volatile modifiers that clebert added
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-04 23:36:19 UTC (rev 8216)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-05 07:25:01 UTC (rev 8217)
@@ -94,7 +94,8 @@
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
- private volatile boolean largeMessageInDelivery;
+ // Note, this does not need to be volatile since it is only accessed when the lock is held
+ private boolean largeMessageInDelivery;
/**
* if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
@@ -349,6 +350,7 @@
forcedDeliveryMessage.setDestination(messageQueue.getName());
final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);
+
channel.send(packet);
}
});
@@ -622,8 +624,10 @@
{
private final long sizePendingLargeMessage;
- /** The current message being processed */
- private volatile LargeServerMessage largeMessage;
+ /** The current message being processed
+ * Note, this does not need to be volatile since it is only accessed when the lock is held
+ */
+ private LargeServerMessage largeMessage;
private final MessageReference ref;
16 years, 1 month
JBoss hornetq SVN: r8216 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-04 18:36:19 -0500 (Wed, 04 Nov 2009)
New Revision: 8216
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
Log:
Disabling test for now until I confirm the test is invalid
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-11-04 23:34:51 UTC (rev 8215)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-11-04 23:36:19 UTC (rev 8216)
@@ -214,7 +214,8 @@
internalTestSlowConsumerNoBuffer(false);
}
- public void testSlowConsumerNoBufferLargeMessages() throws Exception
+// I believe this test became invalid after we started using another thread to deliver the large message
+ public void disabled_testSlowConsumerNoBufferLargeMessages() throws Exception
{
internalTestSlowConsumerNoBuffer(true);
}
16 years, 1 month
JBoss hornetq SVN: r8215 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-11-04 18:34:51 -0500 (Wed, 04 Nov 2009)
New Revision: 8215
Modified:
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-188 - a few tweaks
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-04 23:29:37 UTC (rev 8214)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-04 23:34:51 UTC (rev 8215)
@@ -1401,7 +1401,16 @@
// One decrements the ref count, then the other stores a delete, the delete gets committed, but the first
// ack isn't committed, then the server crashes and on
// recovery the message is deleted even though the other ack never committed
- storageManager.deleteMessage(message.getMessageID());
+
+ //also note then when this happens as part of a trasaction its the tx commt of the ack that is important not this
+ try
+ {
+ storageManager.deleteMessage(message.getMessageID());
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to remove message id = " + message.getMessageID() + " please remove manually");
+ }
}
}
16 years, 1 month
JBoss hornetq SVN: r8214 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-04 18:29:37 -0500 (Wed, 04 Nov 2009)
New Revision: 8214
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
Fixing eventual failure on LargeMessageTest
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-04 16:59:56 UTC (rev 8213)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-04 23:29:37 UTC (rev 8214)
@@ -94,7 +94,7 @@
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
- private boolean largeMessageInDelivery;
+ private volatile boolean largeMessageInDelivery;
/**
* if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
@@ -200,8 +200,6 @@
return HandleStatus.BUSY;
}
- // note: Since we schedule deliveries to start under replication, we use a counter of pendingLargeMessages.
-
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
if (largeMessageInDelivery)
@@ -625,7 +623,7 @@
private final long sizePendingLargeMessage;
/** The current message being processed */
- private LargeServerMessage largeMessage;
+ private volatile LargeServerMessage largeMessage;
private final MessageReference ref;
@@ -757,6 +755,13 @@
lock.lock();
try
{
+ if (largeMessage == null)
+ {
+ // handleClose could be calling close while handleDeliver is also calling finish.
+ // As a result one of them could get here after the largeMessage is already gone.
+ // On that case we just ignore this call
+ return;
+ }
context.close();
largeMessage.releaseResources();
16 years, 1 month
JBoss hornetq SVN: r8213 - trunk/tests/src/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-04 11:59:56 -0500 (Wed, 04 Nov 2009)
New Revision: 8213
Modified:
trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
Log:
tweaks
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-11-04 16:57:35 UTC (rev 8212)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-11-04 16:59:56 UTC (rev 8213)
@@ -33,7 +33,6 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.server.HornetQServer;
@@ -99,10 +98,10 @@
"hornetq-data",
"hq",
100);
-
+
List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
-
+
journal.start();
journal.load(committedRecords, preparedTransactions, null);
@@ -156,7 +155,7 @@
assertEquals(xid, xids[0]);
session.rollback(xid);
-
+
session.close();
sf.close();
}
@@ -243,7 +242,7 @@
setupServer(JournalType.ASYNCIO);
drainQueue(0, QUEUE);
drainQueue(0, new SimpleString("LAZY-QUEUE"));
-
+
checkEmptyXID(xid);
}
@@ -268,7 +267,7 @@
if (i % 100 == 0)
{
- System.out.println("Received #" + i + " on thread after start");
+ // System.out.println("Received #" + i + " on thread after start");
}
msg.acknowledge();
}
@@ -345,11 +344,10 @@
config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
-
+
// This test is supposed to not sync.. All the ACKs are async, and it was supposed to not sync
config.setJournalSyncNonTransactional(false);
-
-
+
// config.setJournalCompactMinFiles(0);
// config.setJournalCompactPercentage(0);
@@ -361,7 +359,7 @@
sf = createNettyFactory();
sf.setBlockOnPersistentSend(false);
sf.setBlockOnAcknowledge(false);
-
+
ClientSession sess = sf.createSession();
try
@@ -446,7 +444,7 @@
}
if (i % 100 == 0)
{
- System.out.println(Thread.currentThread().getName() + "::sent #" + i);
+ // System.out.println(Thread.currentThread().getName() + "::sent #" + i);
}
ClientMessage msg = session.createClientMessage(true);
msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
@@ -514,7 +512,7 @@
}
if (i % 100 == 0)
{
- System.out.println(Thread.currentThread().getName() + "::received #" + i);
+ // System.out.println(Thread.currentThread().getName() + "::received #" + i);
}
}
16 years, 1 month
JBoss hornetq SVN: r8212 - trunk/tests/src/org/hornetq/tests/integration/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-11-04 11:57:35 -0500 (Wed, 04 Nov 2009)
New Revision: 8212
Modified:
trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
Log:
Setting sync to false. This test was not meant to sync on non-transactional.
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-11-04 16:55:22 UTC (rev 8211)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-11-04 16:57:35 UTC (rev 8212)
@@ -345,7 +345,11 @@
config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
-
+
+ // This test is supposed to not sync.. All the ACKs are async, and it was supposed to not sync
+ config.setJournalSyncNonTransactional(false);
+
+
// config.setJournalCompactMinFiles(0);
// config.setJournalCompactPercentage(0);
@@ -442,7 +446,7 @@
}
if (i % 100 == 0)
{
- // System.out.println(Thread.currentThread().getName() + "::sent #" + i);
+ System.out.println(Thread.currentThread().getName() + "::sent #" + i);
}
ClientMessage msg = session.createClientMessage(true);
msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
@@ -472,7 +476,7 @@
}
catch (Throwable e)
{
- e.printStackTrace();
+ this.e = e;
}
}
}
@@ -510,7 +514,7 @@
}
if (i % 100 == 0)
{
- // System.out.println(Thread.currentThread().getName() + "::received #" + i);
+ System.out.println(Thread.currentThread().getName() + "::received #" + i);
}
}
16 years, 1 month