[jboss-cvs] JBoss Messaging SVN: r6460 - in trunk: examples/jms/large-message and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Apr 16 19:06:20 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-04-16 19:06:20 -0400 (Thu, 16 Apr 2009)
New Revision: 6460
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkCleanupTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageChunkTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
Modified:
trunk/examples/jms/common/build.xml
trunk/examples/jms/large-message/build.xml
trunk/examples/jms/large-message/readme.html
trunk/examples/jms/large-message/server0/jbm-jms.xml
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java
Log:
tweaks
Modified: trunk/examples/jms/common/build.xml
===================================================================
--- trunk/examples/jms/common/build.xml 2009-04-16 20:33:04 UTC (rev 6459)
+++ trunk/examples/jms/common/build.xml 2009-04-16 23:06:20 UTC (rev 6460)
@@ -103,10 +103,12 @@
<target name="runExample" depends="compile">
+ <property name="java-min-memory" value="512M"/>
+ <property name="java-max-memory" value="2048M"/>
<java classname="${example.classname}" fork="true" resultproperty="example-result">
<jvmarg value="-XX:+UseParallelGC"/>
- <jvmarg value="-Xms512M"/>
- <jvmarg value="-Xmx2048M"/>
+ <jvmarg value="-Xms${java-min-memory}"/>
+ <jvmarg value="-Xmx${java-max-memory}"/>
<jvmarg value="-XX:+AggressiveOpts"/>
<jvmarg value="-XX:+UseFastAccessorMethods"/>
<jvmarg value="-Dcom.sun.management.jmxremote"/>
Modified: trunk/examples/jms/large-message/build.xml
===================================================================
--- trunk/examples/jms/large-message/build.xml 2009-04-16 20:33:04 UTC (rev 6459)
+++ trunk/examples/jms/large-message/build.xml 2009-04-16 23:06:20 UTC (rev 6460)
@@ -33,8 +33,11 @@
<import file="../common/build.xml"/>
<target name="run">
+
<antcall target="runExample">
<param name="example.classname" value="org.jboss.jms.example.LargeMessageExample"/>
+ <param name="java-min-memory" value="128M"/>
+ <param name="java-max-memory" value="128M"/>
</antcall>
</target>
Modified: trunk/examples/jms/large-message/readme.html
===================================================================
--- trunk/examples/jms/large-message/readme.html 2009-04-16 20:33:04 UTC (rev 6459)
+++ trunk/examples/jms/large-message/readme.html 2009-04-16 23:06:20 UTC (rev 6460)
@@ -1,14 +1,14 @@
<html>
<head>
- <title>JBoss Messaging JMS Queue Example</title>
+ <title>JBoss Messaging Large Message Example</title>
<link rel="stylesheet" type="text/css" href="../common/common.css">
</head>
<body>
- <h1>JMS Queue Example</h1>
+ <h1>Large Message Example</h1>
<br>
- <p>This example shows you how to send and receive a message to a JMS Queue with JBoss Messaging.</p>
- <p>Queues are a standard part of JMS, please consult the JMS 1.1 specification for full details.</p>
- <p>A Queue is used to send messages point to point, from a producer to a consumer. The queue guarantees message ordering between these 2 points.</p>
+ <p>This example shows you how to send and receive very large messages with JBoss Messaging.</p>
+ <p>JBoss Messaging provides an extension to JMS that lets you use an InputStream as the source and an OutputStream as the destination for your messages. You can send messages as large as will fit on your disk.</p>
+ <p>You may also choose to read large messages using the regular ByteStream or BytesMessage methods, but using an OutputStream will give you much better performance.</p>
<br>
<h2>Example step-by-step</h2>
<p><i>To run the example, simply type <code>ant</code> from this directory</i></p>
Modified: trunk/examples/jms/large-message/server0/jbm-jms.xml
===================================================================
--- trunk/examples/jms/large-message/server0/jbm-jms.xml 2009-04-16 20:33:04 UTC (rev 6459)
+++ trunk/examples/jms/large-message/server0/jbm-jms.xml 2009-04-16 23:06:20 UTC (rev 6460)
@@ -10,9 +10,6 @@
<entry name="java:/ConnectionFactory"/>
<entry name="java:/XAConnectionFactory"/>
</entries>
- <!-- When using this ConnectionFactory, messages beyond this limit are considered largeMessages and will be sent using smaller packets -->
- <min-large-message-size>10240</min-large-message-size>
-
</connection-factory>
<!--the queue used by the example-->
@@ -20,4 +17,4 @@
<entry name="/queue/exampleQueue"/>
</queue>
-</deployment>
\ No newline at end of file
+</deployment>
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java 2009-04-16 20:33:04 UTC (rev 6459)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/LargeMessageTest.java 2009-04-16 23:06:20 UTC (rev 6460)
@@ -22,6 +22,7 @@
package org.jboss.test.messaging.jms.message;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,7 +36,6 @@
import javax.jms.Session;
import org.jboss.messaging.jms.client.JBossMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.test.messaging.jms.JMSTestCase;
/**
@@ -72,7 +72,7 @@
BytesMessage m = session.createBytesMessage();
- ((JBossMessage)m).setInputStream(UnitTestCase.createFakeLargeStream(1024 * 1024));
+ ((JBossMessage)m).setInputStream(createFakeLargeStream(1024 * 1024));
prod.send(m);
@@ -99,7 +99,7 @@
assertEquals(1024, numberOfBytes);
for (int j = 0; j < 1024; j++)
{
- assertEquals(UnitTestCase.getSamplebyte(i + j), data[j]);
+ assertEquals(getSamplebyte(i + j), data[j]);
}
}
@@ -131,7 +131,7 @@
BytesMessage m = session.createBytesMessage();
- ((JBossMessage)m).setInputStream(UnitTestCase.createFakeLargeStream(10));
+ ((JBossMessage)m).setInputStream(createFakeLargeStream(10));
prod.send(m);
@@ -155,7 +155,7 @@
assertEquals(10, numberOfBytes);
for (int j = 0; j < numberOfBytes; j++)
{
- assertEquals(UnitTestCase.getSamplebyte(j), data[j]);
+ assertEquals(getSamplebyte(j), data[j]);
}
assertNotNull(rm);
@@ -189,7 +189,7 @@
BytesMessage m = session.createBytesMessage();
- ((JBossMessage)m).setInputStream(UnitTestCase.createFakeLargeStream(msgSize));
+ ((JBossMessage)m).setInputStream(createFakeLargeStream(msgSize));
prod.send(m);
@@ -218,7 +218,7 @@
public void write(int b) throws IOException
{
numberOfBytes.incrementAndGet();
- if (UnitTestCase.getSamplebyte(position++) != b)
+ if (getSamplebyte(position++) != b)
{
System.out.println("Wrong byte at position " + position);
numberOfErrors.incrementAndGet();
@@ -251,7 +251,51 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+
+ protected byte getSamplebyte(final long position)
+ {
+ return (byte)('a' + (position) % ('z' - 'a' + 1));
+ }
+ // Creates a Fake LargeStream without using a real file
+ protected InputStream createFakeLargeStream(final long size) throws Exception
+ {
+ return new InputStream()
+ {
+ private long count;
+
+ private boolean closed = false;
+
+ @Override
+ public void close() throws IOException
+ {
+ super.close();
+ System.out.println("Sent " + count + " bytes over fakeOutputStream, while size = " + size);
+ closed = true;
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ if (closed)
+ {
+ throw new IOException("Stream was closed");
+ }
+ if (count++ < size)
+ {
+ return getSamplebyte(count - 1);
+ }
+ else
+ {
+ return -1;
+ }
+ }
+ };
+
+ }
+
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkCleanupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkCleanupTest.java 2009-04-16 20:33:04 UTC (rev 6459)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkCleanupTest.java 2009-04-16 23:06:20 UTC (rev 6460)
@@ -1,207 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.chunkmessage;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnector;
-import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnectorFactory;
-
-/**
- * A ChunkCleanupTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ChunkCleanupTest extends ChunkTestBase
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ChunkCleanupTest.class);
-
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
-
- public void testCleanup() throws Exception
- {
- clearData();
-
- FileOutputStream fileOut = new FileOutputStream(new File(getLargeMessagesDir(), "1234.tmp"));
-
- fileOut.write(new byte[1024]); // anything
-
- fileOut.close();
-
- Configuration config = createDefaultConfig();
-
- server = createServer(true, config, new HashMap<String, AddressSettings>());
-
- server.start();
-
- try
- {
-
- File directoryLarge = new File(getLargeMessagesDir());
-
- assertEquals("The startup should have been deleted 1234.tmp", 0, directoryLarge.list().length);
- }
- finally
- {
- server.stop();
- }
- }
-
- public void testFailureOnSendingFile() throws Exception
- {
- clearData();
-
- Configuration config = createDefaultConfig();
-
- config.setPagingMaxGlobalSizeBytes(20 * 1024);
- config.setPagingGlobalWatermarkSize(10 * 1024);
-
- server = createServer(true, config, new HashMap<String, AddressSettings>());
-
- server.start();
-
- final int numberOfBytes = 2 * 1024 * 1024;
-
- ClientSession session = null;
-
- class LocalCallback implements MockConnector.MockCallback
- {
- AtomicInteger counter = new AtomicInteger(0);
-
- ClientSession session;
-
- public void onWrite(final MessagingBuffer buffer)
- {
- log.info("calling cb onwrite** ");
- if (counter.incrementAndGet() == 5)
- {
- RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
- RemotingServiceImpl remotingServiceImpl = (RemotingServiceImpl)server.getRemotingService();
- remotingServiceImpl.connectionException(conn.getID(),
- new MessagingException(MessagingException.NOT_CONNECTED, "blah!"));
- conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
- throw new IllegalStateException("blah");
- }
- }
- }
-
- LocalCallback callback = new LocalCallback();
-
- try
- {
- HashMap<String, Object> parameters = new HashMap<String, Object>();
- parameters.put("callback", callback);
-
- TransportConfiguration transport = new TransportConfiguration(MockConnectorFactory.class.getCanonicalName(),
- parameters);
-
- ClientSessionFactory mockFactory = new ClientSessionFactoryImpl(transport);
-
- mockFactory.setBlockOnNonPersistentSend(false);
- mockFactory.setBlockOnPersistentSend(false);
- mockFactory.setBlockOnAcknowledge(false);
-
- session = mockFactory.createSession(null, null, false, true, true, false, 0);
-
- callback.session = session;
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- ClientMessage clientLarge = createLargeClientMessage(session, numberOfBytes);
-
- try
- {
- producer.send(clientLarge);
-
- fail("Exception was expected!");
- }
- catch (Exception e)
- {
- }
-
- validateNoFilesOnLargeDir();
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Exception ignored)
- {
- ignored.printStackTrace();
- }
- }
-
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2009-04-16 20:33:04 UTC (rev 6459)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2009-04-16 23:06:20 UTC (rev 6460)
@@ -1,667 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.chunkmessage;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.buffers.ChannelBuffers;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * A ChunkTestBase
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created Oct 29, 2008 11:43:52 AM
- *
- *
- */
-public class ChunkTestBase extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(ChunkTestBase.class);
-
- protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
- protected MessagingServer server;
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void testChunks(final boolean isXA,
- final boolean rollbackFirstSend,
- final boolean useStreamOnConsume,
- final boolean realFiles,
- final boolean preAck,
- final boolean sendingBlocking,
- final boolean testBrowser,
- final boolean useMessageConsumer,
- final int numberOfMessages,
- final long numberOfBytes,
- final int waitOnConsumer,
- final long delayDelivery) throws Exception
- {
- testChunks(isXA,
- rollbackFirstSend,
- useStreamOnConsume,
- realFiles,
- preAck,
- sendingBlocking,
- testBrowser,
- useMessageConsumer,
- numberOfMessages,
- numberOfBytes,
- waitOnConsumer,
- delayDelivery,
- -1,
- 10 * 1024);
- }
-
- protected void testChunks(final boolean isXA,
- final boolean rollbackFirstSend,
- final boolean useStreamOnConsume,
- final boolean realFiles,
- final boolean preAck,
- final boolean sendingBlocking,
- final boolean testBrowser,
- final boolean useMessageConsumer,
- final int numberOfMessages,
- final long numberOfBytes,
- final int waitOnConsumer,
- final long delayDelivery,
- final int producerWindow,
- final int minSize) throws Exception
- {
- clearData();
-
- server = createServer(realFiles);
- server.start();
-
- try
- {
- ClientSessionFactory sf = createInVMFactory();
-
- if (sendingBlocking)
- {
- sf.setBlockOnNonPersistentSend(true);
- sf.setBlockOnPersistentSend(true);
- sf.setBlockOnAcknowledge(true);
- }
-
- if (producerWindow > 0)
- {
- sf.setSendWindowSize(producerWindow);
- }
-
- sf.setMinLargeMessageSize(minSize);
-
- ClientSession session;
-
- Xid xid = null;
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- if (rollbackFirstSend)
- {
- sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
-
- validateNoFilesOnLargeDir();
- }
-
- sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
-
- session.close();
-
- if (realFiles)
- {
- server.stop();
-
- server = createServer(realFiles);
- server.start();
-
- sf = createInVMFactory();
- }
-
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- ClientConsumer consumer = null;
-
- for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
- {
-
- System.out.println("Iteration: " + iteration);
-
- session.stop();
-
- // first time with a browser
- consumer = session.createConsumer(ADDRESS, null, iteration == 0);
-
- if (useMessageConsumer)
- {
- final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
- final AtomicInteger errors = new AtomicInteger(0);
-
- MessageHandler handler = new MessageHandler()
- {
- int msgCounter;
-
- public void onMessage(final ClientMessage message)
- {
-
- try
- {
- System.out.println("Message on consumer: " + msgCounter);
-
- if (delayDelivery > 0)
- {
- long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
- assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
- System.currentTimeMillis() - originalTime >= delayDelivery);
- }
-
- if (!preAck)
- {
- message.acknowledge();
- }
-
- assertNotNull(message);
-
- if (delayDelivery <= 0)
- {
- // right now there is no guarantee of ordered delivered on multiple scheduledMessages with
- // the same
- // scheduled delivery time
- assertEquals(msgCounter,
- ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
- }
-
- if (useStreamOnConsume)
- {
- final AtomicLong bytesRead = new AtomicLong(0);
- message.saveToOutputStream(new OutputStream()
- {
-
- public void write(byte b[]) throws IOException
- {
- if (b[0] == getSamplebyte(bytesRead.get()))
- {
- bytesRead.addAndGet(b.length);
- System.out.println("Read position " + bytesRead.get() + " on consumer");
- }
- else
- {
- System.out.println("Received invalid packet at position " + bytesRead.get());
- }
- }
-
- @Override
- public void write(int b) throws IOException
- {
- bytesRead.incrementAndGet();
- if (b == (byte)'a')
- {
- bytesRead.incrementAndGet();
- }
- else
- {
- System.out.println("byte not as expected!");
- }
- }
- });
-
- assertEquals(numberOfBytes, bytesRead.get());
- }
- else
- {
-
- MessagingBuffer buffer = message.getBody();
- buffer.resetReaderIndex();
- assertEquals(numberOfBytes, buffer.writerIndex());
- for (long b = 0; b < numberOfBytes; b++)
- {
- if (b % (1024l * 1024l) == 0)
- {
- System.out.println("Read " + b + " bytes");
- }
-
- assertEquals(getSamplebyte(b), buffer.readByte());
- }
- }
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- System.out.println("Got an error");
- errors.incrementAndGet();
- }
- finally
- {
- latchDone.countDown();
- msgCounter++;
- }
- }
- };
-
- session.start();
-
- consumer.setMessageHandler(handler);
-
- assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
- assertEquals(0, errors.get());
-
- }
- else
- {
-
- session.start();
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- System.currentTimeMillis();
-
- ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
-
- assertNotNull(message);
-
- System.out.println("Message: " + i);
-
- System.currentTimeMillis();
-
- if (delayDelivery > 0)
- {
- long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
- assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
- System.currentTimeMillis() - originalTime >= delayDelivery);
- }
-
- if (!preAck)
- {
- message.acknowledge();
- }
-
- assertNotNull(message);
-
- if (delayDelivery <= 0)
- {
- // right now there is no guarantee of ordered delivered on multiple scheduledMessages with the same
- // scheduled delivery time
- assertEquals(i, ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
- }
-
- MessagingBuffer buffer = message.getBody();
- buffer.resetReaderIndex();
-
- if (useStreamOnConsume)
- {
- final AtomicLong bytesRead = new AtomicLong(0);
- message.saveToOutputStream(new OutputStream()
- {
-
- public void write(byte b[]) throws IOException
- {
- if (b[0] == getSamplebyte(bytesRead.get()))
- {
- bytesRead.addAndGet(b.length);
- }
- else
- {
- System.out.println("Received invalid packet at position " + bytesRead.get());
- }
-
- }
-
- @Override
- public void write(int b) throws IOException
- {
- if (bytesRead.get() % (1024l * 1024l) == 0)
- {
- System.out.println("Read " + bytesRead.get() + " bytes");
- }
- if (b == (byte)'a')
- {
- bytesRead.incrementAndGet();
- }
- else
- {
- System.out.println("byte not as expected!");
- }
- }
- });
-
- assertEquals(numberOfBytes, bytesRead.get());
- }
- else
- {
- for (long b = 0; b < numberOfBytes; b++)
- {
- if (b % (1024l * 1024l) == 0l)
- {
- System.out.println("Read " + b + " bytes");
- }
- assertEquals(getSamplebyte(b), buffer.readByte());
- }
- }
-
- }
-
- }
- consumer.close();
-
- if (iteration == 0)
- {
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
- }
- else
- {
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
- }
- }
-
- session.close();
-
- long globalSize = server.getPostOffice().getPagingManager().getGlobalSize();
- assertEquals(0l, globalSize);
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
-
- validateNoFilesOnLargeDir();
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- /**
- * @param useFile
- * @param numberOfMessages
- * @param numberOfIntegers
- * @param delayDelivery
- * @param testTime
- * @param session
- * @param producer
- * @throws FileNotFoundException
- * @throws IOException
- * @throws MessagingException
- */
- private void sendMessages(final int numberOfMessages,
- final long numberOfBytes,
- final long delayDelivery,
- final ClientSession session,
- final ClientProducer producer) throws Exception
- {
- System.out.println("NumberOfBytes = " + numberOfBytes);
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage message = session.createClientMessage(true);
-
- // If the test is using more than 1M, we will only use the Streaming, as it require too much memory from the
- // test
- if (numberOfBytes > 1024 * 1024 || i % 2 == 0)
- {
- System.out.println("Sending message (stream)" + i);
- message.setBodyInputStream(createFakeLargeStream(numberOfBytes));
- }
- else
- {
- System.out.println("Sending message (array)" + i);
- byte[] bytes = new byte[(int)numberOfBytes];
- for (int j = 0; j < bytes.length; j++)
- {
- bytes[j] = getSamplebyte(j);
- }
- message.getBody().writeBytes(bytes);
- }
- message.putIntProperty(new SimpleString("counter-message"), i);
- if (delayDelivery > 0)
- {
- long time = System.currentTimeMillis();
- message.putLongProperty(new SimpleString("original-time"), time);
- message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
-
- producer.send(message);
- }
- else
- {
- producer.send(message);
- }
- }
- }
-
- protected MessagingBuffer createLargeBuffer(final int numberOfIntegers)
- {
- MessagingBuffer body = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
-
- for (int i = 0; i < numberOfIntegers; i++)
- {
- body.writeInt(i);
- }
-
- return body;
-
- }
-
- protected ClientMessage createLargeClientMessage(final ClientSession session, final int numberOfBytes) throws Exception
- {
- return createLargeClientMessage(session, numberOfBytes, true);
- }
-
- protected ClientMessage createLargeClientMessage(final ClientSession session,
- final long numberOfBytes,
- final boolean persistent) throws Exception
- {
-
- ClientMessage clientMessage = session.createClientMessage(persistent);
-
- clientMessage.setBodyInputStream(createFakeLargeStream(numberOfBytes));
-
- return clientMessage;
- }
-
- /**
- * @param session
- * @param queueToRead
- * @param numberOfIntegers
- * @throws MessagingException
- * @throws FileNotFoundException
- * @throws IOException
- */
- protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfBytes) throws MessagingException,
- FileNotFoundException,
- IOException
- {
- session.start();
-
- ClientConsumer consumer = session.createConsumer(queueToRead);
-
- ClientMessage clientMessage = consumer.receive(5000);
-
- assertNotNull(clientMessage);
-
- clientMessage.acknowledge();
-
- session.commit();
-
- consumer.close();
- }
-
- /**
- * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
- */
- protected void validateNoFilesOnLargeDir() throws Exception
- {
- File largeMessagesFileDir = new File(getLargeMessagesDir());
-
- // Deleting the file is async... we keep looking for a period of the time until the file is really gone
- for (int i = 0; i < 100; i++)
- {
- if (largeMessagesFileDir.listFiles().length > 0)
- {
- Thread.sleep(10);
- }
- else
- {
- break;
- }
- }
-
- assertEquals(0, largeMessagesFileDir.listFiles().length);
- }
-
- protected OutputStream createFakeOutputStream() throws Exception
- {
-
- return new OutputStream()
- {
- private boolean closed = false;
-
- private int count;
-
- @Override
- public void close() throws IOException
- {
- super.close();
- closed = true;
- }
-
- @Override
- public void write(final int b) throws IOException
- {
- if (count++ % 1024 * 1024 == 0)
- {
- System.out.println("OutputStream received " + count + " bytes");
- }
- if (closed)
- {
- throw new IOException("Stream was closed");
- }
- }
-
- };
-
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java (from rev 6459, trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkCleanupTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java 2009-04-16 23:06:20 UTC (rev 6460)
@@ -0,0 +1,207 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.chunkmessage;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnector;
+import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnectorFactory;
+
+/**
+ * A LargeMessageCleanupTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class LargeMessageCleanupTest extends LargeMessageTestBase
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(LargeMessageCleanupTest.class);
+
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+   /**
+    * Plants a stray temporary file ("1234.tmp") in the large-messages directory, starts the server and
+    * asserts that the directory is empty afterwards.
+    */
+   public void testCleanup() throws Exception
+   {
+      clearData();
+
+      FileOutputStream fileOut = new FileOutputStream(new File(getLargeMessagesDir(), "1234.tmp"));
+
+      try
+      {
+         fileOut.write(new byte[1024]); // anything
+      }
+      finally
+      {
+         // Always release the file handle, even when the write fails
+         fileOut.close();
+      }
+
+      Configuration config = createDefaultConfig();
+
+      server = createServer(true, config, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      try
+      {
+         File directoryLarge = new File(getLargeMessagesDir());
+
+         assertEquals("The startup should have been deleted 1234.tmp", 0, directoryLarge.list().length);
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
+ /**
+ * Starts a server, sends a 2 MiB streamed message through a mock connector whose callback kills the
+ * connection on its fifth write, expects the send to throw, and then checks that the large-messages
+ * directory ends up empty.
+ */
+ public void testFailureOnSendingFile() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(20 * 1024);
+ config.setPagingGlobalWatermarkSize(10 * 1024);
+
+ server = createServer(true, config, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int numberOfBytes = 2 * 1024 * 1024;
+
+ ClientSession session = null;
+
+ // Callback invoked by the mock connector on every write; it injects the failure
+ class LocalCallback implements MockConnector.MockCallback
+ {
+ AtomicInteger counter = new AtomicInteger(0);
+
+ // Set by the test once the session exists; inside this class "session" refers to this field
+ ClientSession session;
+
+ public void onWrite(final MessagingBuffer buffer)
+ {
+ log.info("calling cb onwrite** ");
+ // On the fifth write: report the failure to the server side, fail the client connection, then abort the write
+ if (counter.incrementAndGet() == 5)
+ {
+ RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+ RemotingServiceImpl remotingServiceImpl = (RemotingServiceImpl)server.getRemotingService();
+ remotingServiceImpl.connectionException(conn.getID(),
+ new MessagingException(MessagingException.NOT_CONNECTED, "blah!"));
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+ throw new IllegalStateException("blah");
+ }
+ }
+ }
+
+ LocalCallback callback = new LocalCallback();
+
+ try
+ {
+ // The callback is handed to the mock connector through the transport parameters
+ HashMap<String, Object> parameters = new HashMap<String, Object>();
+ parameters.put("callback", callback);
+
+ TransportConfiguration transport = new TransportConfiguration(MockConnectorFactory.class.getCanonicalName(),
+ parameters);
+
+ ClientSessionFactory mockFactory = new ClientSessionFactoryImpl(transport);
+
+ mockFactory.setBlockOnNonPersistentSend(false);
+ mockFactory.setBlockOnPersistentSend(false);
+ mockFactory.setBlockOnAcknowledge(false);
+
+ session = mockFactory.createSession(null, null, false, true, true, false, 0);
+
+ callback.session = session;
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage clientLarge = createLargeClientMessage(session, numberOfBytes);
+
+ try
+ {
+ producer.send(clientLarge);
+
+ fail("Exception was expected!");
+ }
+ // Only Exception is caught, so an assertion error raised by fail() is presumably not swallowed here (JUnit 3 throws an Error) - confirm
+ catch (Exception e)
+ {
+ }
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ // NOTE(review): the session is never closed explicitly; only the server is stopped
+ try
+ {
+ server.stop();
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+ }
+
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+   /** Delegates to the superclass fixture set-up; no extra state is created here. */
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   /** Delegates to the superclass fixture tear-down; no extra state is released here. */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java (from rev 6459, trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java 2009-04-16 23:06:20 UTC (rev 6460)
@@ -0,0 +1,670 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.chunkmessage;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A LargeMessageTestBase
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 29, 2008 11:43:52 AM
+ *
+ *
+ */
+public class LargeMessageTestBase extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(LargeMessageTestBase.class);
+
+ protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ protected MessagingServer server;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void testChunks(final boolean isXA,
+ final boolean rollbackFirstSend,
+ final boolean useStreamOnConsume,
+ final boolean realFiles,
+ final boolean preAck,
+ final boolean sendingBlocking,
+ final boolean testBrowser,
+ final boolean useMessageConsumer,
+ final int numberOfMessages,
+ final long numberOfBytes,
+ final int waitOnConsumer,
+ final long delayDelivery) throws Exception
+ {
+ testChunks(isXA,
+ rollbackFirstSend,
+ useStreamOnConsume,
+ realFiles,
+ preAck,
+ sendingBlocking,
+ testBrowser,
+ useMessageConsumer,
+ numberOfMessages,
+ numberOfBytes,
+ waitOnConsumer,
+ delayDelivery,
+ -1,
+ 10 * 1024);
+ }
+
+ /**
+ * Full send/consume scenario: starts a server, sends the messages (optionally sending and rolling back
+ * a first batch), commits, optionally restarts the server, consumes and verifies every message, and
+ * finally asserts that no paging size, queue content or large-message file is left behind.
+ *
+ * @param isXA create XA sessions and drive the work through start/end/commit/rollback on an Xid
+ * @param rollbackFirstSend send one batch, roll it back and check the large-messages dir before the real send
+ * @param useStreamOnConsume read bodies through saveToOutputStream instead of the message buffer
+ * @param realFiles passed to createServer; when true the server is also restarted between send and consume
+ * @param preAck passed to createSession; when true the test does not acknowledge messages itself
+ * @param sendingBlocking when true, sets the three blockOn... flags of the session factory to true
+ * @param testBrowser when true, an extra first pass consumes with a browse-only consumer and is rolled back
+ * @param useMessageConsumer when true messages are consumed through a MessageHandler, otherwise through receive()
+ * @param numberOfMessages number of messages per batch
+ * @param numberOfBytes body size of every message
+ * @param waitOnConsumer wait budget for consumption (see the review note on its unit below)
+ * @param delayDelivery when &gt; 0, messages are scheduled this far after their send time
+ * @param producerWindow send window size applied to the factory when &gt; 0
+ * @param minSize minimum large-message size applied to the factory
+ */
+ protected void testChunks(final boolean isXA,
+ final boolean rollbackFirstSend,
+ final boolean useStreamOnConsume,
+ final boolean realFiles,
+ final boolean preAck,
+ final boolean sendingBlocking,
+ final boolean testBrowser,
+ final boolean useMessageConsumer,
+ final int numberOfMessages,
+ final long numberOfBytes,
+ final int waitOnConsumer,
+ final long delayDelivery,
+ final int producerWindow,
+ final int minSize) throws Exception
+ {
+ clearData();
+
+ server = createServer(realFiles);
+ server.start();
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ if (sendingBlocking)
+ {
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ }
+
+ if (producerWindow > 0)
+ {
+ sf.setSendWindowSize(producerWindow);
+ }
+
+ sf.setMinLargeMessageSize(minSize);
+
+ ClientSession session;
+
+ Xid xid = null;
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ // Optional first batch that is rolled back; nothing may remain in the large-messages dir afterwards
+ if (rollbackFirstSend)
+ {
+ sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+
+ validateNoFilesOnLargeDir();
+ }
+
+ // The batch that is actually committed and later consumed
+ sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+
+ session.close();
+
+ // With realFiles the server is stopped and re-created before consuming
+ // (presumably to prove the messages survive a restart - confirm)
+ if (realFiles)
+ {
+ server.stop();
+
+ server = createServer(realFiles);
+ server.start();
+
+ sf = createInVMFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ ClientConsumer consumer = null;
+
+ // Iteration 0 (only when testBrowser) uses a browse-only consumer and is rolled back;
+ // iteration 1 is the real consumption and is committed
+ for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
+ {
+
+ System.out.println("Iteration: " + iteration);
+
+ session.stop();
+
+ // first time with a browser
+ consumer = session.createConsumer(ADDRESS, null, iteration == 0);
+
+ if (useMessageConsumer)
+ {
+ // Handler-based consumption: the latch counts handled messages, errors counts failed verifications
+ final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ MessageHandler handler = new MessageHandler()
+ {
+ int msgCounter;
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ System.out.println("Message on consumer: " + msgCounter);
+
+ if (delayDelivery > 0)
+ {
+ long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
+ assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
+ System.currentTimeMillis() - originalTime >= delayDelivery);
+ }
+
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
+
+ // NOTE(review): this null check comes after the message has already been dereferenced above
+ assertNotNull(message);
+
+ if (delayDelivery <= 0)
+ {
+ // right now there is no guarantee of ordered delivered on multiple scheduledMessages with
+ // the same
+ // scheduled delivery time
+ assertEquals(msgCounter,
+ ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
+ }
+
+ if (useStreamOnConsume)
+ {
+ final AtomicLong bytesRead = new AtomicLong(0);
+ message.saveToOutputStream(new OutputStream()
+ {
+
+ // Only the first byte of each chunk is compared against the expected sample byte
+ public void write(byte b[]) throws IOException
+ {
+ if (b[0] == getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.addAndGet(b.length);
+ System.out.println("Read position " + bytesRead.get() + " on consumer");
+ if (bytesRead.get() == 1126400l)
+ {
+ System.out.println("I'm here");
+ }
+ }
+ else
+ {
+ System.out.println("Received invalid packet at position " + bytesRead.get());
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ if (b == getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.incrementAndGet();
+ }
+ else
+ {
+ System.out.println("byte not as expected!");
+ }
+ }
+ });
+
+ // A mismatching chunk/byte is not counted, so it shows up here as a size difference
+ assertEquals(numberOfBytes, bytesRead.get());
+ }
+ else
+ {
+
+ MessagingBuffer buffer = message.getBody();
+ buffer.resetReaderIndex();
+ assertEquals(numberOfBytes, buffer.writerIndex());
+ for (long b = 0; b < numberOfBytes; b++)
+ {
+ if (b % (1024l * 1024l) == 0)
+ {
+ System.out.println("Read " + b + " bytes");
+ }
+
+ assertEquals(getSamplebyte(b), buffer.readByte());
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ System.out.println("Got an error");
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ latchDone.countDown();
+ msgCounter++;
+ }
+ }
+ };
+
+ session.start();
+
+ consumer.setMessageHandler(handler);
+
+ // NOTE(review): waitOnConsumer is interpreted as SECONDS here, while the receive() branch below
+ // passes waitOnConsumer + delayDelivery as the receive timeout - confirm the intended unit
+ assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
+ assertEquals(0, errors.get());
+
+ }
+ else
+ {
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ // NOTE(review): the result of this call is discarded
+ System.currentTimeMillis();
+
+ ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
+
+ assertNotNull(message);
+
+ System.out.println("Message: " + i);
+
+ // NOTE(review): the result of this call is discarded
+ System.currentTimeMillis();
+
+ if (delayDelivery > 0)
+ {
+ long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
+ assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
+ System.currentTimeMillis() - originalTime >= delayDelivery);
+ }
+
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
+
+ assertNotNull(message);
+
+ if (delayDelivery <= 0)
+ {
+ // right now there is no guarantee of ordered delivered on multiple scheduledMessages with the same
+ // scheduled delivery time
+ assertEquals(i, ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
+ }
+
+ MessagingBuffer buffer = message.getBody();
+ buffer.resetReaderIndex();
+
+ if (useStreamOnConsume)
+ {
+ final AtomicLong bytesRead = new AtomicLong(0);
+ message.saveToOutputStream(new OutputStream()
+ {
+
+ // Only the first byte of each chunk is compared against the expected sample byte
+ public void write(byte b[]) throws IOException
+ {
+ if (b[0] == getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.addAndGet(b.length);
+ }
+ else
+ {
+ System.out.println("Received invalid packet at position " + bytesRead.get());
+ }
+
+ }
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ if (bytesRead.get() % (1024l * 1024l) == 0)
+ {
+ System.out.println("Read " + bytesRead.get() + " bytes");
+ }
+ // NOTE(review): compares against a fixed 'a' whereas the handler branch above compares
+ // against getSamplebyte(bytesRead.get()) - looks inconsistent, confirm
+ if (b == (byte)'a')
+ {
+ bytesRead.incrementAndGet();
+ }
+ else
+ {
+ System.out.println("byte not as expected!");
+ }
+ }
+ });
+
+ assertEquals(numberOfBytes, bytesRead.get());
+ }
+ else
+ {
+ for (long b = 0; b < numberOfBytes; b++)
+ {
+ if (b % (1024l * 1024l) == 0l)
+ {
+ System.out.println("Read " + b + " bytes");
+ }
+ assertEquals(getSamplebyte(b), buffer.readByte());
+ }
+ }
+
+ }
+
+ }
+ consumer.close();
+
+ // The browser pass is rolled back; the real pass is committed
+ if (iteration == 0)
+ {
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+ }
+ else
+ {
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ }
+
+ session.close();
+
+ // Nothing may be left: no paged bytes, no delivering or queued messages, no large-message files
+ long globalSize = server.getPostOffice().getPagingManager().getGlobalSize();
+ assertEquals(0l, globalSize);
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ // Best-effort shutdown: a failure to stop must not hide the test outcome
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ /**
+ * @param useFile
+ * @param numberOfMessages
+ * @param numberOfIntegers
+ * @param delayDelivery
+ * @param testTime
+ * @param session
+ * @param producer
+ * @throws FileNotFoundException
+ * @throws IOException
+ * @throws MessagingException
+ */
+ private void sendMessages(final int numberOfMessages,
+ final long numberOfBytes,
+ final long delayDelivery,
+ final ClientSession session,
+ final ClientProducer producer) throws Exception
+ {
+ System.out.println("NumberOfBytes = " + numberOfBytes);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+
+ // If the test is using more than 1M, we will only use the Streaming, as it require too much memory from the
+ // test
+ if (numberOfBytes > 1024 * 1024 || i % 2 == 0)
+ {
+ System.out.println("Sending message (stream)" + i);
+ message.setBodyInputStream(createFakeLargeStream(numberOfBytes));
+ }
+ else
+ {
+ System.out.println("Sending message (array)" + i);
+ byte[] bytes = new byte[(int)numberOfBytes];
+ for (int j = 0; j < bytes.length; j++)
+ {
+ bytes[j] = getSamplebyte(j);
+ }
+ message.getBody().writeBytes(bytes);
+ }
+ message.putIntProperty(new SimpleString("counter-message"), i);
+ if (delayDelivery > 0)
+ {
+ long time = System.currentTimeMillis();
+ message.putLongProperty(new SimpleString("original-time"), time);
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+
+ producer.send(message);
+ }
+ else
+ {
+ producer.send(message);
+ }
+ }
+ }
+
+ protected MessagingBuffer createLargeBuffer(final int numberOfIntegers)
+ {
+ MessagingBuffer body = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
+
+ for (int i = 0; i < numberOfIntegers; i++)
+ {
+ body.writeInt(i);
+ }
+
+ return body;
+
+ }
+
+ protected ClientMessage createLargeClientMessage(final ClientSession session, final int numberOfBytes) throws Exception
+ {
+ return createLargeClientMessage(session, numberOfBytes, true);
+ }
+
+ protected ClientMessage createLargeClientMessage(final ClientSession session,
+ final long numberOfBytes,
+ final boolean persistent) throws Exception
+ {
+
+ ClientMessage clientMessage = session.createClientMessage(persistent);
+
+ clientMessage.setBodyInputStream(createFakeLargeStream(numberOfBytes));
+
+ return clientMessage;
+ }
+
+   /**
+    * Starts the session, receives one message from the given queue (waiting up to 5000 on receive),
+    * asserts it is present, acknowledges it and commits the session.
+    *
+    * @param session session to consume with
+    * @param queueToRead queue the consumer is created on
+    * @param numberOfBytes not used by this method
+    * @throws MessagingException
+    * @throws FileNotFoundException
+    * @throws IOException
+    */
+   protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfBytes) throws MessagingException,
+                                                                                                                   FileNotFoundException,
+                                                                                                                   IOException
+   {
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(queueToRead);
+
+      try
+      {
+         ClientMessage clientMessage = consumer.receive(5000);
+
+         assertNotNull(clientMessage);
+
+         clientMessage.acknowledge();
+
+         session.commit();
+      }
+      finally
+      {
+         // Close the consumer even when the receive, the assertion or the commit fails
+         consumer.close();
+      }
+   }
+
+   /**
+    * Asserts that the large-messages directory contains no files.
+    * Deleting a file from that directory is an asynchronous process, so the directory is polled
+    * (up to 100 times, 10 ms apart) before the final assertion is made.
+    *
+    * @throws Exception if the polling sleep is interrupted
+    */
+   protected void validateNoFilesOnLargeDir() throws Exception
+   {
+      File largeMessagesFileDir = new File(getLargeMessagesDir());
+
+      // Deleting the file is async... we keep looking for a period of the time until the file is really gone
+      for (int i = 0; i < 100 && countFiles(largeMessagesFileDir) > 0; i++)
+      {
+         Thread.sleep(10);
+      }
+
+      assertEquals(0, countFiles(largeMessagesFileDir));
+   }
+
+   /**
+    * Counts the entries of a directory; File.listFiles() returns null when the path is not a readable
+    * directory, which is treated here as "no files" instead of failing with a NullPointerException.
+    */
+   private static int countFiles(final File dir)
+   {
+      File[] entries = dir.listFiles();
+
+      return entries == null ? 0 : entries.length;
+   }
+
+   /**
+    * Creates an output stream that discards everything written to it, prints a progress line once
+    * per mebibyte of single-byte writes, and rejects writes after it has been closed.
+    *
+    * @return the discarding stream
+    */
+   protected OutputStream createFakeOutputStream() throws Exception
+   {
+
+      return new OutputStream()
+      {
+         private boolean closed = false;
+
+         // long, so the counter cannot overflow on streams larger than 2 GiB
+         private long count;
+
+         @Override
+         public void close() throws IOException
+         {
+            super.close();
+            closed = true;
+         }
+
+         @Override
+         public void write(final int b) throws IOException
+         {
+            // Parenthesised: "%" and "*" share precedence, so "count++ % 1024 * 1024" was
+            // evaluated as "(count++ % 1024) * 1024" and logged every 1024 bytes
+            if (count++ % (1024 * 1024) == 0)
+            {
+               System.out.println("OutputStream received " + count + " bytes");
+            }
+            if (closed)
+            {
+               throw new IOException("Stream was closed");
+            }
+         }
+
+      };
+
+   }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java (from rev 6459, trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageChunkTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java 2009-04-16 23:06:20 UTC (rev 6460)
@@ -0,0 +1,1209 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.client;
+
+import java.util.HashMap;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import junit.framework.AssertionFailedError;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.Message;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.integration.chunkmessage.LargeMessageTestBase;
+import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A LargeMessageTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created 29-Sep-08 4:04:10 PM
+ *
+ *
+ */
+public class LargeMessageTest extends LargeMessageTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ final static int RECEIVE_WAIT_TIME = 60000;
+
+ private final int LARGE_MESSAGE_SIZE = 20 * 1024;
+
+ // Attributes ----------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ // Static --------------------------------------------------------
+ private static final Logger log = Logger.getLogger(LargeMessageTest.class);
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+   /** Runs the resend scenario with a 50000-byte streamed message. */
+   public void testResendSmallStreamMessage() throws Exception
+   {
+      final long messageSize = 50000;
+
+      internalTestResendMessage(messageSize);
+   }
+
+   /** Runs the resend scenario with a 150 * 1024-byte streamed message. */
+   public void testResendLargeStreamMessage() throws Exception
+   {
+      final long messageSize = 150 * 1024;
+
+      internalTestResendMessage(messageSize);
+   }
+
+   /**
+    * Sends one streamed message (created with persistent = false) of {@code messageSize} bytes to ADDRESS,
+    * receives it, forwards the received message to a second queue, and verifies size and content of
+    * the forwarded copy. Finally checks that no large-message file is left on disk.
+    *
+    * @param messageSize body size of the message to send
+    */
+   public void internalTestResendMessage(long messageSize) throws Exception
+   {
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true);
+
+         server.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         SimpleString ADDRESS2 = ADDRESS.concat("-2");
+
+         session.createQueue(ADDRESS2, ADDRESS2, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ClientProducer producer2 = session.createProducer(ADDRESS2);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, false);
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+         ClientConsumer consumer2 = session.createConsumer(ADDRESS2);
+
+         ClientMessage msg1 = consumer.receive(10000);
+
+         // Fail with a clear assertion instead of a NullPointerException when the receive times out
+         assertNotNull(msg1);
+
+         msg1.acknowledge();
+
+         producer2.send(msg1);
+
+         // NOTE(review): fail() raises a Throwable as well, so this catch also swallows the
+         // "Expected Exception" failure and the check can never fail the test. Left unchanged
+         // because the intended outcome of the second send cannot be confirmed from here.
+         try
+         {
+            producer2.send(msg1);
+            fail("Expected Exception");
+         }
+         catch (Throwable e)
+         {
+         }
+
+         session.commit();
+
+         ClientMessage msg2 = consumer2.receive(10000);
+
+         assertNotNull(msg2);
+
+         msg2.acknowledge();
+
+         session.commit();
+
+         assertEquals(messageSize, msg2.getBodySize());
+
+         for (int i = 0; i < messageSize; i++)
+         {
+            assertEquals(getSamplebyte(i), msg2.getBody().readByte());
+         }
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         // Best-effort cleanup: neither step may hide the outcome of the test itself
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+ /** Non-XA, realFiles, streamed on consume via receive(): one 100 * 1024 * 1024-byte message, producerWindow 10 * 1024 * 1024, minSize 1024 * 1024. */
+ public void testFilePersistenceOneHugeMessage() throws Exception
+ {
+ testChunks(false, false, true, true, false, false, false, false, 1, 100 * 1024l * 1024l, RECEIVE_WAIT_TIME, 0, 10 * 1024 * 1024, 1024 * 1024);
+ }
+
+ /** Non-XA, realFiles, streamed on consume via receive(): one 100 * 1024 * 1024-byte message with the default window and minimum size. */
+ public void testFilePersistenceOneMessageStreaming() throws Exception
+ {
+ testChunks(false, false, true, true, false, false, false, false, 1, 100 * 1024l * 1024l, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Non-XA, realFiles, streamed on consume via receive(): 100 messages of 1024 bytes. */
+ public void testFilePersistenceSmallMessageStreaming() throws Exception
+ {
+ testChunks(false, false, true, true, false, false, false, false, 100, 1024, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Non-XA, realFiles, streamed on consume via a MessageHandler: one 100 * 1024 * 1024-byte message, waitOnConsumer 120000, producerWindow 10 * 1024 * 1024, minSize 1024 * 1024. */
+ public void testFilePersistenceOneHugeMessageConsumer() throws Exception
+ {
+ testChunks(false, false, true, true, false, false, false, true, 1, 100 * 1024 * 1024, 120000, 0, 10 * 1024 * 1024, 1024 * 1024);
+ }
+
+ /** Non-XA, rollbackFirstSend, realFiles, browser pass first, consumed via receive(): 100 messages of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistence() throws Exception
+ {
+ testChunks(false, true, false, true, false, false, true, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Non-XA, rollbackFirstSend, realFiles, browser pass first, consumed via a MessageHandler: 100 messages of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceConsumer() throws Exception
+ {
+ testChunks(false, true, false, true, false, false, true, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** XA, rollbackFirstSend, realFiles, browser pass first, consumed via receive(): 100 messages of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceXA() throws Exception
+ {
+ testChunks(true, true, false, true, false, false, true, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** XA, realFiles, streamed on consume via receive(), no rollback and no browser pass: one message of 1024 * 1024 bytes. */
+ public void testFilePersistenceXAStream() throws Exception
+ {
+ testChunks(true, false, true, true, false, false, false, false, 1, 1024 * 1024, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** XA, rollbackFirstSend, realFiles, browser pass first, consumed via a MessageHandler: 100 messages of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceXAConsumer() throws Exception
+ {
+ testChunks(true, true, false, true, false, false, true, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Non-XA, blocking sends, rollbackFirstSend, realFiles, browser pass first, consumed via receive(): 100 messages of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceBlocked() throws Exception
+ {
+ testChunks(false, true, false, true, false, true, true, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Non-XA, blocking sends, rollbackFirstSend, realFiles, browser pass first, consumed via a MessageHandler: 100 messages of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceBlockedConsumer() throws Exception
+ {
+ testChunks(false, true, false, true, false, true, true, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** XA, blocking sends, rollbackFirstSend, realFiles, browser pass first, consumed via receive(): 100 messages of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceBlockedXA() throws Exception
+ {
+ testChunks(true, true, false, true, false, true, true, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** XA, blocking sends, rollbackFirstSend, realFiles, browser pass first, consumed via a MessageHandler: 100 messages of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceBlockedXAConsumer() throws Exception
+ {
+ testChunks(true, true, false, true, false, true, true, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Non-XA, preAck, blocking sends, rollbackFirstSend, realFiles, browser pass first, consumed via receive(): one message of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceBlockedPreACK() throws Exception
+ {
+ testChunks(false, true, false, true, true, true, true, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Non-XA, preAck, blocking sends, rollbackFirstSend, realFiles, browser pass first, consumed via a MessageHandler: one message of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceBlockedPreACKConsumer() throws Exception
+ {
+ testChunks(false, true, false, true, true, true, true, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** XA, preAck, blocking sends, rollbackFirstSend, realFiles, browser pass first, consumed via receive(): 100 messages of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceBlockedPreACKXA() throws Exception
+ {
+ testChunks(true, true, false, true, true, true, true, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** XA, preAck, blocking sends, rollbackFirstSend, realFiles, browser pass first, consumed via a MessageHandler: 100 messages of LARGE_MESSAGE_SIZE bytes. */
+ public void testFilePersistenceBlockedPreACKXAConsumer() throws Exception
+ {
+ testChunks(true, true, false, true, true, true, true, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Non-XA, rollbackFirstSend, realFiles, no browser pass, consumed via receive(): one message of LARGE_MESSAGE_SIZE bytes with delayDelivery 2000. */
+ public void testFilePersistenceDelayed() throws Exception
+ {
+ testChunks(false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+ }
+
+ /** Non-XA, rollbackFirstSend, realFiles, no browser pass, consumed via a MessageHandler: one message of LARGE_MESSAGE_SIZE bytes with delayDelivery 2000. */
+ public void testFilePersistenceDelayedConsumer() throws Exception
+ {
+ testChunks(false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+ }
+
+ /** XA, rollbackFirstSend, realFiles, no browser pass, consumed via receive(): one message of LARGE_MESSAGE_SIZE bytes with delayDelivery 2000. */
+ public void testFilePersistenceDelayedXA() throws Exception
+ {
+ testChunks(true, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+ }
+
+ /** XA, rollbackFirstSend, realFiles, no browser pass, consumed via a MessageHandler: one message of LARGE_MESSAGE_SIZE bytes with delayDelivery 2000. */
+ public void testFilePersistenceDelayedXAConsumer() throws Exception
+ {
+ testChunks(true, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+ }
+
+   /**
+    * Non-XA, rollbackFirstSend, realFiles = false, browser pass first, consumed via receive():
+    * one message of LARGE_MESSAGE_SIZE bytes.
+    */
+   public void testNullPersistence() throws Exception
+   {
+      // useMessageConsumer is false here: the MessageHandler variant is covered by
+      // testNullPersistenceConsumer, which previously had exactly the same arguments as this test
+      testChunks(false, true, false, false, false, false, true, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+   }
+
+ /** Null persistence, consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testNullPersistenceConsumer() throws Exception
+ {
+    testChunks(false, true, false, false, false, false, true, true,
+               1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Null persistence, XA. See {@code testChunks} for the meaning of each positional argument. */
+ public void testNullPersistenceXA() throws Exception
+ {
+    testChunks(true, true, false, false, false, false, true, false,
+               1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Null persistence, XA, consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testNullPersistenceXAConsumer() throws Exception
+ {
+    testChunks(true, true, false, false, false, false, true, true,
+               1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Null persistence, delayed (last argument 100). See {@code testChunks} for the meaning of each positional argument. */
+ public void testNullPersistenceDelayed() throws Exception
+ {
+    testChunks(false, true, false, false, false, false, false, false,
+               100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+ }
+
+ /** Null persistence, delayed (last argument 100), consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testNullPersistenceDelayedConsumer() throws Exception
+ {
+    testChunks(false, true, false, false, false, false, false, true,
+               100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+ }
+
+ /** Null persistence, delayed (last argument 100), XA. See {@code testChunks} for the meaning of each positional argument. */
+ public void testNullPersistenceDelayedXA() throws Exception
+ {
+    testChunks(true, true, false, false, false, false, false, false,
+               100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+ }
+
+ /** Null persistence, delayed (last argument 100), XA, consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testNullPersistenceDelayedXAConsumer() throws Exception
+ {
+    testChunks(true, true, false, false, false, false, false, true,
+               100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+ }
+
+ /** Paging scenario with real files and non-blocking sends. */
+ public void testPageOnLargeMessage() throws Exception
+ {
+    final boolean realFiles = true;
+    final boolean sendBlocking = false;
+
+    testPageOnLargeMessage(realFiles, sendBlocking);
+ }
+
+ /** Paging scenario without real files and with non-blocking sends. */
+ public void testPageOnLargeMessageNullPersistence() throws Exception
+ {
+    final boolean realFiles = false;
+    final boolean sendBlocking = false;
+
+    testPageOnLargeMessage(realFiles, sendBlocking);
+ }
+
+ /** Small message (size argument 4), file persistence, XA. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendSmallMessageXA() throws Exception
+ {
+    testChunks(true, true, false, true, false, false, true, false,
+               100, 4, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Small message (size argument 4), file persistence, XA, consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendSmallMessageXAConsumer() throws Exception
+ {
+    testChunks(true, true, false, true, false, false, true, true,
+               100, 4, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Small message (size argument 100), null persistence, XA. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendSmallMessageNullPersistenceXA() throws Exception
+ {
+    testChunks(true, true, false, false, false, false, true, false,
+               100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Small message (size argument 100), null persistence, XA, consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendSmallMessageNullPersistenceXAConsumer() throws Exception
+ {
+    testChunks(true, true, false, false, false, false, true, true,
+               100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Regular message, null persistence, delayed (last argument 1000). See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendRegularMessageNullPersistenceDelayed() throws Exception
+ {
+    testChunks(false, true, false, false, false, false, false, false,
+               100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ /** Regular message, null persistence, delayed (last argument 1000), consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendRegularMessageNullPersistenceDelayedConsumer() throws Exception
+ {
+    testChunks(false, true, false, false, false, false, false, true,
+               100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ /** Regular message, null persistence, delayed (last argument 1000), XA. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception
+ {
+    testChunks(true, true, false, false, false, false, false, false,
+               100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ /** Regular message, null persistence, delayed (last argument 1000), XA, consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendRegularMessageNullPersistenceDelayedXAConsumer() throws Exception
+ {
+    testChunks(true, true, false, false, false, false, false, true,
+               100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ /** Regular message, file persistence. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendRegularMessagePersistence() throws Exception
+ {
+    testChunks(false, true, false, true, false, false, true, false,
+               100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Regular message, file persistence, consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendRegularMessagePersistenceConsumer() throws Exception
+ {
+    testChunks(false, true, false, true, false, false, true, true,
+               100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Regular message, file persistence, XA. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendRegularMessagePersistenceXA() throws Exception
+ {
+    testChunks(true, true, false, true, false, false, true, false,
+               100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Regular message, file persistence, XA, consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendRegularMessagePersistenceXAConsumer() throws Exception
+ {
+    testChunks(true, true, false, true, false, false, true, true,
+               100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ /** Regular message, file persistence, delayed (last argument 1000). See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendRegularMessagePersistenceDelayed() throws Exception
+ {
+    testChunks(false, true, false, true, false, false, false, false,
+               100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ /** Regular message, file persistence, delayed (last argument 1000), consumer variant. See {@code testChunks} for the meaning of each positional argument. */
+ public void testSendRegularMessagePersistenceDelayedConsumer() throws Exception
+ {
+    testChunks(false, true, false, true, false, false, false, true,
+               100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ /**
+  * Regular message, file persistence, delayed (last argument 1000), XA.
+  * <p>
+  * The first flag is {@code true} here, as in every other XA-named wrapper in this class;
+  * with {@code false} this test was an exact duplicate of
+  * {@link #testSendRegularMessagePersistenceDelayed()} and the XA path was never exercised.
+  * See {@code testChunks} for the meaning of each positional argument.
+  */
+ public void testSendRegularMessagePersistenceDelayedXA() throws Exception
+ {
+    testChunks(true, true, false, true, false, false, false, false,
+               100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ /**
+  * Regular message, file persistence, delayed (last argument 1000), XA, consumer variant.
+  * <p>
+  * The first flag is {@code true} here, as in every other XA-named wrapper in this class;
+  * with {@code false} this test was an exact duplicate of
+  * {@link #testSendRegularMessagePersistenceDelayedConsumer()}. See {@code testChunks} for
+  * the meaning of each positional argument.
+  */
+ public void testSendRegularMessagePersistenceDelayedXAConsumer() throws Exception
+ {
+    testChunks(true, true, false, true, false, false, false, true,
+               100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ /**
+  * Sends one message built by createLargeClientMessage to an address that has two queues
+  * bound to it, then consumes and acknowledges it from each queue in turn through the same
+  * session (delivery is stopped and started again between the two consumers; the server is
+  * NOT restarted). After the commit, validateNoFilesOnLargeDir() is run.
+  */
+ public void testTwoBindingsTwoStartedConsumers() throws Exception
+ {
+    ClientSession session = null;
+
+    try
+    {
+       server = createServer(true);
+
+       server.start();
+
+       SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
+
+       ClientSessionFactory sf = createInVMFactory();
+
+       session = sf.createSession(null, null, false, true, true, false, 0);
+
+       session.createQueue(ADDRESS, queue[0], null, true);
+       session.createQueue(ADDRESS, queue[1], null, true);
+
+       int numberOfBytes = 400000;
+
+       Message clientFile = createLargeClientMessage(session, numberOfBytes);
+
+       ClientProducer producer = session.createProducer(ADDRESS);
+
+       session.start();
+
+       producer.send(clientFile);
+
+       producer.close();
+
+       // First binding: exactly one copy must arrive
+       ClientConsumer consumer = session.createConsumer(queue[1]);
+       ClientMessage msg = consumer.receive(RECEIVE_WAIT_TIME);
+       // Check the delivery before the one-second "nothing else" probe so that a missing
+       // message is reported immediately and as what it is
+       assertNotNull(msg);
+       assertNull(consumer.receive(1000));
+
+       msg.acknowledge();
+       consumer.close();
+
+       log.info("Stopping");
+
+       session.stop();
+
+       // Second binding: created while delivery is stopped, then delivery is resumed
+       ClientConsumer consumer1 = session.createConsumer(queue[0]);
+
+       session.start();
+
+       msg = consumer1.receive(RECEIVE_WAIT_TIME);
+       assertNotNull(msg);
+       msg.acknowledge();
+       consumer1.close();
+
+       session.commit();
+
+       session.close();
+
+       validateNoFilesOnLargeDir();
+    }
+    finally
+    {
+       // Best-effort cleanup: an error here must not mask the real test failure
+       if (session != null)
+       {
+          try
+          {
+             session.close();
+          }
+          catch (Throwable ignored)
+          {
+          }
+       }
+
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+       }
+    }
+ }
+
+ /** Two-bindings scenario with a server restart between the two reads. */
+ public void testTwoBindingsAndRestart() throws Exception
+ {
+    final boolean restart = true;
+
+    testTwoBindings(restart);
+ }
+
+ /** Two-bindings scenario without restarting the server between the two reads. */
+ public void testTwoBindingsNoRestart() throws Exception
+ {
+    final boolean restart = false;
+
+    testTwoBindings(restart);
+ }
+
+ /**
+  * Sends one message built by createLargeClientMessage to an address that has two queues
+  * bound to it, reads it from the second queue, optionally restarts the server, reads it
+  * from the first queue and finally runs validateNoFilesOnLargeDir().
+  *
+  * @param restart when true the session is closed and the server is stopped and recreated
+  *                between the two reads
+  */
+ public void testTwoBindings(final boolean restart) throws Exception
+ {
+    ClientSession session = null;
+
+    try
+    {
+       server = createServer(true);
+
+       server.start();
+
+       SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
+
+       ClientSessionFactory sf = createInVMFactory();
+
+       session = sf.createSession(null, null, false, true, true, false, 0);
+
+       session.createQueue(ADDRESS, queue[0], null, true);
+       session.createQueue(ADDRESS, queue[1], null, true);
+
+       int numberOfBytes = 400000;
+
+       Message clientFile = createLargeClientMessage(session, numberOfBytes);
+
+       ClientProducer producer = session.createProducer(ADDRESS);
+       producer.send(clientFile);
+
+       producer.close();
+
+       // One binding is consumed now; the other one is still pending
+       readMessage(session, queue[1], numberOfBytes);
+
+       if (restart)
+       {
+          session.close();
+
+          server.stop();
+
+          log.info("Restartning");
+
+          server = createServer(true);
+
+          server.start();
+
+          sf = createInVMFactory();
+
+          session = sf.createSession(null, null, false, true, true, false, 0);
+       }
+
+       // Consuming the remaining binding leaves nothing referencing the message
+       readMessage(session, queue[0], numberOfBytes);
+
+       session.close();
+
+       validateNoFilesOnLargeDir();
+    }
+    finally
+    {
+       // Best-effort cleanup: an error here must not mask the real test failure
+       if (session != null)
+       {
+          try
+          {
+             session.close();
+          }
+          catch (Throwable ignored)
+          {
+          }
+       }
+
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+       }
+    }
+ }
+
+ /** Send-then-rollback scenario using an XA transaction. */
+ public void testSendRollbackXA() throws Exception
+ {
+    final boolean isXA = true;
+
+    internalTestSendRollback(isXA);
+ }
+
+ /** Send-then-rollback scenario using a local (non-XA) transaction. */
+ public void testSendRollback() throws Exception
+ {
+    final boolean isXA = false;
+
+    internalTestSendRollback(isXA);
+ }
+
+ /**
+  * Sends a single message built by createLargeClientMessage inside a transaction, rolls the
+  * transaction back, closes the session and runs validateNoFilesOnLargeDir().
+  *
+  * @param isXA when true the send happens in an XA branch that is ended, prepared and then
+  *             rolled back; when false session.rollback() is used
+  */
+ private void internalTestSendRollback(final boolean isXA) throws Exception
+ {
+    ClientSession session = null;
+
+    try
+    {
+       server = createServer(true);
+
+       server.start();
+
+       ClientSessionFactory sf = createInVMFactory();
+
+       session = sf.createSession(isXA, false, false);
+
+       session.createQueue(ADDRESS, ADDRESS, true);
+
+       Xid xid = null;
+
+       if (isXA)
+       {
+          xid = newXID();
+          session.start(xid, XAResource.TMNOFLAGS);
+       }
+
+       ClientProducer producer = session.createProducer(ADDRESS);
+
+       Message clientFile = createLargeClientMessage(session, 50000, false);
+
+       producer.send(clientFile);
+
+       if (isXA)
+       {
+          session.end(xid, XAResource.TMSUCCESS);
+          session.prepare(xid);
+          session.rollback(xid);
+       }
+       else
+       {
+          session.rollback();
+       }
+
+       session.close();
+
+       validateNoFilesOnLargeDir();
+    }
+    finally
+    {
+       // Best-effort cleanup; the session goes first, while the server is still up
+       if (session != null)
+       {
+          try
+          {
+             session.close();
+          }
+          catch (Throwable ignored)
+          {
+          }
+       }
+
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+       }
+    }
+ }
+
+ /** Rollback/commit round-trip scenario using local (non-XA) transactions. */
+ public void testSimpleRollback() throws Exception
+ {
+    final boolean isXA = false;
+
+    simpleRollbackInternalTest(isXA);
+ }
+
+ /** Rollback/commit round-trip scenario using XA transactions. */
+ public void testSimpleRollbackXA() throws Exception
+ {
+    final boolean isXA = true;
+
+    simpleRollbackInternalTest(isXA);
+ }
+
+ /**
+  * Runs ten rounds of the following on a single queue and then calls
+  * validateNoFilesOnLargeDir():
+  * <ol>
+  * <li>send a message and roll back - nothing may be receivable;</li>
+  * <li>send a message again - still nothing receivable before the commit - then commit;</li>
+  * <li>receive and acknowledge the message, then roll back (after a prepare in XA mode);</li>
+  * <li>receive and acknowledge it again, then commit.</li>
+  * </ol>
+  *
+  * @param isXA when true every unit of work is an XA branch and a fresh Xid is started after
+  *             each commit or rollback; when false session.commit()/session.rollback() are used
+  */
+ public void simpleRollbackInternalTest(final boolean isXA) throws Exception
+ {
+    ClientSession session = null;
+
+    try
+    {
+       server = createServer(true);
+
+       server.start();
+
+       ClientSessionFactory sf = createInVMFactory();
+
+       session = sf.createSession(isXA, false, false);
+
+       Xid xid = null;
+
+       if (isXA)
+       {
+          xid = newXID();
+          session.start(xid, XAResource.TMNOFLAGS);
+       }
+
+       session.createQueue(ADDRESS, ADDRESS, null, true);
+
+       int numberOfBytes = 200000;
+
+       session.start();
+
+       log.info("Session started");
+
+       ClientProducer producer = session.createProducer(ADDRESS);
+
+       ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+       for (int n = 0; n < 10; n++)
+       {
+          // Step 1: a rolled-back send must never become visible
+          Message clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
+
+          producer.send(clientFile);
+
+          assertNull(consumer.receiveImmediate());
+
+          if (isXA)
+          {
+             session.end(xid, XAResource.TMSUCCESS);
+             session.rollback(xid);
+             xid = newXID();
+             session.start(xid, XAResource.TMNOFLAGS);
+          }
+          else
+          {
+             session.rollback();
+          }
+
+          // Step 2: a second send is invisible until its transaction commits
+          clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
+
+          producer.send(clientFile);
+
+          assertNull(consumer.receiveImmediate());
+
+          if (isXA)
+          {
+             session.end(xid, XAResource.TMSUCCESS);
+             session.commit(xid, true);
+             xid = newXID();
+             session.start(xid, XAResource.TMNOFLAGS);
+          }
+          else
+          {
+             session.commit();
+          }
+
+          // Steps 3 and 4: the first receive is rolled back, so the same message is
+          // received a second time and only then committed
+          for (int i = 0; i < 2; i++)
+          {
+             ClientMessage clientMessage = consumer.receive(5000);
+
+             assertNotNull(clientMessage);
+
+             assertEquals(numberOfBytes, clientMessage.getBody().writerIndex());
+
+             clientMessage.acknowledge();
+
+             final boolean rollbackThisReceive = i == 0;
+
+             if (isXA)
+             {
+                session.end(xid, XAResource.TMSUCCESS);
+
+                if (rollbackThisReceive)
+                {
+                   session.prepare(xid);
+                   session.rollback(xid);
+                }
+                else
+                {
+                   session.commit(xid, true);
+                }
+
+                xid = newXID();
+                session.start(xid, XAResource.TMNOFLAGS);
+             }
+             else if (rollbackThisReceive)
+             {
+                session.rollback();
+             }
+             else
+             {
+                session.commit();
+             }
+          }
+       }
+
+       session.close();
+
+       validateNoFilesOnLargeDir();
+    }
+    finally
+    {
+       // Best-effort cleanup: an error here must not mask the real test failure
+       if (session != null)
+       {
+          try
+          {
+             session.close();
+          }
+          catch (Throwable ignored)
+          {
+          }
+       }
+
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+       }
+    }
+ }
+
+ /**
+  * Streams 30 messages of 10 KiB each (minimum large-message size lowered to 1 KiB), waits
+  * until the consumer reports all of them in its buffer, then consumes them twice: the first
+  * pass checks every body byte and is rolled back, the second pass only acknowledges and is
+  * committed. Afterwards the paging manager's global size and the queue's delivering and
+  * message counts must all be zero.
+  */
+ public void testBufferMultipleLargeMessages() throws Exception
+ {
+    final int messageSize = 10 * 1024;
+    final int messageCount = 30;
+
+    ClientSession session = null;
+    MessagingServer server = null;
+
+    try
+    {
+       server = createServer(true);
+       server.start();
+
+       final ClientSessionFactory factory = createInVMFactory();
+       factory.setMinLargeMessageSize(1024);
+       factory.setConsumerWindowSize(1024 * 1024);
+
+       session = factory.createSession(null, null, false, false, false, false, 0);
+       session.createQueue(ADDRESS, ADDRESS, null, true);
+
+       final ClientProducer producer = session.createProducer(ADDRESS);
+
+       int sent = 0;
+       while (sent < messageCount)
+       {
+          final Message streamed = session.createClientMessage(true);
+          streamed.setBodyInputStream(createFakeLargeStream(messageSize));
+          producer.send(streamed);
+          sent++;
+       }
+
+       session.commit();
+       producer.close();
+
+       session.start();
+
+       final ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
+
+       // Give the consumer up to ten seconds to buffer every message before reading any
+       final long deadline = System.currentTimeMillis() + 10000;
+       while (consumer.getBufferSize() < messageCount && System.currentTimeMillis() < deadline)
+       {
+          Thread.sleep(10);
+       }
+       assertEquals(messageCount, consumer.getBufferSize());
+
+       // Two passes over the same messages: verify + rollback, then ack-only + commit
+       for (int pass = 0; pass < 2; pass++)
+       {
+          final boolean firstPass = pass == 0;
+
+          for (int received = 0; received < messageCount; received++)
+          {
+             final ClientMessage msg = consumer.receive(10000);
+             assertNotNull(msg);
+
+             // The body is only read on the first pass; the second pass leaves it untouched
+             if (firstPass)
+             {
+                for (int offset = 0; offset < messageSize; offset++)
+                {
+                   assertEquals(getSamplebyte(offset), msg.getBody().readByte());
+                }
+             }
+
+             msg.acknowledge();
+          }
+
+          if (firstPass)
+          {
+             session.rollback();
+          }
+          else
+          {
+             session.commit();
+          }
+       }
+
+       assertEquals(0L, server.getPostOffice().getPagingManager().getGlobalSize());
+       assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+       assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+    }
+    finally
+    {
+       try
+       {
+          session.close();
+       }
+       catch (Throwable ignored)
+       {
+       }
+
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+       }
+    }
+ }
+
+ /**
+  * Streams a single 10 MiB message (minimum large-message size set to 100 KiB), receives it,
+  * redirects its body into the stream returned by createFakeOutputStream() and waits up to
+  * 60 seconds for that transfer to complete. Afterwards the paging manager's global size and
+  * the queue's delivering and message counts must all be zero.
+  */
+ public void testSendStreamingSingleMessage() throws Exception
+ {
+    ClientSession session = null;
+    MessagingServer server = null;
+
+    final int SIZE = 10 * 1024 * 1024;
+    try
+    {
+       server = createServer(true);
+
+       server.start();
+
+       ClientSessionFactory sf = createInVMFactory();
+
+       sf.setMinLargeMessageSize(100 * 1024);
+
+       session = sf.createSession(null, null, false, true, true, false, 0);
+
+       session.createQueue(ADDRESS, ADDRESS, null, true);
+
+       Message clientFile = session.createClientMessage(true);
+       clientFile.setBodyInputStream(createFakeLargeStream(SIZE));
+
+       ClientProducer producer = session.createProducer(ADDRESS);
+
+       session.start();
+
+       log.info("Sending");
+       producer.send(clientFile);
+
+       producer.close();
+
+       log.info("Waiting");
+
+       ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+       ClientMessage msg2 = consumer.receive(10000);
+
+       // Fail with a clear assertion (not an NPE) when nothing arrives within the timeout
+       assertNotNull(msg2);
+
+       msg2.acknowledge();
+
+       // The body is drained through the output stream instead of being read byte by byte
+       msg2.setOutputStream(createFakeOutputStream());
+       assertTrue(msg2.waitOutputStreamCompletion(60000));
+
+       session.commit();
+
+       assertEquals(0L, server.getPostOffice().getPagingManager().getGlobalSize());
+       assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+       assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+    }
+    finally
+    {
+       // Best-effort cleanup: an error here must not mask the real test failure
+       if (session != null)
+       {
+          try
+          {
+             session.close();
+          }
+          catch (Throwable ignored)
+          {
+          }
+       }
+
+       if (server != null)
+       {
+          try
+          {
+             server.stop();
+          }
+          catch (Throwable ignored)
+          {
+          }
+       }
+    }
+ }
+
+ /**
+  * Receives and acknowledges streamed messages without ever touching their bodies, so the
+  * body data is left pending. Only the int property "key" is checked on each message.
+  */
+ public void testIgnoreStreaming() throws Exception
+ {
+    final int bodySize = 10 * 1024;
+    final int messageCount = 1;
+    final SimpleString key = new SimpleString("key");
+
+    ClientSession session = null;
+    MessagingServer server = null;
+
+    try
+    {
+       server = createServer(true);
+       server.start();
+
+       final ClientSessionFactory factory = createInVMFactory();
+       factory.setMinLargeMessageSize(1024);
+
+       session = factory.createSession(null, null, false, true, true, false, 0);
+       session.createQueue(ADDRESS, ADDRESS, null, true);
+
+       final ClientProducer producer = session.createProducer(ADDRESS);
+
+       int index = 0;
+       while (index < messageCount)
+       {
+          final Message outgoing = session.createClientMessage(true);
+          outgoing.setBodyInputStream(createFakeLargeStream(bodySize));
+          outgoing.putIntProperty(key, index);
+          producer.send(outgoing);
+
+          System.out.println("Sent msg " + index);
+          index++;
+       }
+
+       session.start();
+
+       System.out.println("Sending");
+
+       producer.close();
+
+       System.out.println("Waiting");
+
+       final ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+       for (int expected = 0; expected < messageCount; expected++)
+       {
+          final ClientMessage incoming = consumer.receive(50000);
+          assertNotNull(incoming);
+
+          // Only the header property is inspected; the body is deliberately not read
+          assertEquals(expected, incoming.getProperty(key));
+
+          incoming.acknowledge();
+       }
+
+       consumer.close();
+
+       session.commit();
+
+       assertEquals(0L, server.getPostOffice().getPagingManager().getGlobalSize());
+       assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+       assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+       System.out.println("Thread done");
+    }
+    finally
+    {
+       try
+       {
+          session.close();
+       }
+       catch (Throwable ignored)
+       {
+       }
+
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+       }
+    }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ clearData();
+ log.info("\n*********************************************************************************\n Starting " + getName() +
+ "\n*********************************************************************************");
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ log.info("\n*********************************************************************************\nDone with " + getName() +
+ "\n*********************************************************************************");
+ super.tearDown();
+ }
+
+ /**
+  * Sends 100 regular messages (each carrying the ints 1..1024) followed by one message built
+  * by createLargeClientMessage to a server whose paging limits are set to 20 KiB (max global
+  * size) and 10 KiB (global watermark), then reads everything back and compares each regular
+  * body with the one that was sent.
+  *
+  * @param realFiles passed to createServer; when true the server is additionally stopped and
+  *                  recreated between sending and receiving
+  * @param sendBlocking when true the session factory is set to block on persistent sends,
+  *                     non-persistent sends and acknowledgements
+  */
+ protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception
+ {
+    Configuration config = createDefaultConfig();
+
+    config.setPagingMaxGlobalSizeBytes(20 * 1024);
+    config.setPagingGlobalWatermarkSize(10 * 1024);
+
+    final int numberOfBytes = 1024;
+
+    final int numberOfBytesBigMessage = 400000;
+
+    ClientSession session = null;
+
+    try
+    {
+       // Created inside the try block so that a failed start is still cleaned up below
+       server = createServer(realFiles, config, new HashMap<String, AddressSettings>());
+
+       server.start();
+
+       ClientSessionFactory sf = createInVMFactory();
+
+       if (sendBlocking)
+       {
+          sf.setBlockOnNonPersistentSend(true);
+          sf.setBlockOnPersistentSend(true);
+          sf.setBlockOnAcknowledge(true);
+       }
+
+       session = sf.createSession(null, null, false, true, true, false, 0);
+
+       session.createQueue(ADDRESS, ADDRESS, null, true);
+
+       ClientProducer producer = session.createProducer(ADDRESS);
+
+       // All regular messages carry the same content, so the first body is kept as the
+       // reference every received message is compared against
+       MessagingBuffer body = null;
+
+       for (int i = 0; i < 100; i++)
+       {
+          MessagingBuffer bodyLocal = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfBytes);
+
+          for (int j = 1; j <= numberOfBytes; j++)
+          {
+             bodyLocal.writeInt(j);
+          }
+
+          if (i == 0)
+          {
+             body = bodyLocal;
+          }
+
+          ClientMessage message = session.createClientMessage(true);
+          message.setBody(bodyLocal);
+
+          producer.send(message);
+       }
+
+       ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
+
+       producer.send(clientFile);
+
+       session.close();
+
+       if (realFiles)
+       {
+          server.stop();
+
+          server = createServer(true, config, new HashMap<String, AddressSettings>());
+          server.start();
+
+          sf = createInVMFactory();
+       }
+
+       session = sf.createSession(null, null, false, true, true, false, 0);
+
+       ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+       session.start();
+
+       for (int i = 0; i < 100; i++)
+       {
+          ClientMessage message2 = consumer.receive(RECEIVE_WAIT_TIME);
+
+          log.info("got message " + i);
+
+          assertNotNull(message2);
+
+          message2.acknowledge();
+
+          try
+          {
+             assertEqualsByteArrays(body.writerIndex(), body.array(), message2.getBody().array());
+          }
+          catch (AssertionFailedError e)
+          {
+             // Dump both buffers before propagating so the mismatch can be diagnosed
+             log.info("Expected buffer:" + dumbBytesHex(body.array(), 40));
+             log.info("Arriving buffer:" + dumbBytesHex(message2.getBody().array(), 40));
+             throw e;
+          }
+       }
+
+       consumer.close();
+
+       session.close();
+
+       session = sf.createSession(null, null, false, true, true, false, 0);
+
+       readMessage(session, ADDRESS, numberOfBytesBigMessage);
+
+       session.close();
+    }
+    finally
+    {
+       // Best-effort cleanup: an error here must not mask the real test failure
+       if (session != null)
+       {
+          try
+          {
+             session.close();
+          }
+          catch (Throwable ignored)
+          {
+          }
+       }
+
+       try
+       {
+          server.stop();
+       }
+       catch (Throwable ignored)
+       {
+       }
+    }
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageChunkTest.java 2009-04-16 20:33:04 UTC (rev 6459)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageChunkTest.java 2009-04-16 23:06:20 UTC (rev 6460)
@@ -1,1209 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.client;
-
-import java.util.HashMap;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import junit.framework.AssertionFailedError;
-
-import org.jboss.messaging.core.buffers.ChannelBuffers;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientConsumerInternal;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.integration.chunkmessage.ChunkTestBase;
-import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * A TestMessageChunk
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created 29-Sep-08 4:04:10 PM
- *
- *
- */
-public class MessageChunkTest extends ChunkTestBase
-{
-
- // Constants -----------------------------------------------------
-
- final static int RECEIVE_WAIT_TIME = 60000;
-
- private final int LARGE_MESSAGE_SIZE = 20 * 1024;
-
- // Attributes ----------------------------------------------------
-
- static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
- // Static --------------------------------------------------------
- private static final Logger log = Logger.getLogger(MessageChunkTest.class);
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testResendSmallStreamMessage() throws Exception
- {
- internalTestResendMessage(50000);
- }
-
- public void testResendLargeStreamMessage() throws Exception
- {
- internalTestResendMessage(150 * 1024);
- }
-
- public void internalTestResendMessage(long messageSize) throws Exception
- {
- ClientSession session = null;
-
- try
- {
- server = createServer(true);
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
-
- session = sf.createSession(false, false, false);
-
- session.createQueue(ADDRESS, ADDRESS, true);
-
- SimpleString ADDRESS2 = ADDRESS.concat("-2");
-
- session.createQueue(ADDRESS2, ADDRESS2, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- ClientProducer producer2 = session.createProducer(ADDRESS2);
-
- Message clientFile = createLargeClientMessage(session, messageSize, false);
-
- producer.send(clientFile);
-
- session.commit();
-
- session.start();
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
- ClientConsumer consumer2 = session.createConsumer(ADDRESS2);
-
- ClientMessage msg1 = consumer.receive(10000);
- msg1.acknowledge();
-
- producer2.send(msg1);
-
-
- try
- {
- producer2.send(msg1);
- fail("Expected Exception");
- }
- catch (Throwable e)
- {
- }
-
- session.commit();
-
- ClientMessage msg2 = consumer2.receive(10000);
-
- assertNotNull(msg2);
-
- msg2.acknowledge();
-
- session.commit();
-
- assertEquals(messageSize, msg2.getBodySize());
-
-
- for (int i = 0 ; i < messageSize; i++)
- {
- assertEquals(getSamplebyte(i), msg2.getBody().readByte());
- }
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
- public void testMessageChunkFilePersistenceOneHugeMessage() throws Exception
- {
- testChunks(false, false, true, true, false, false, false, false, 1, 100 * 1024l * 1024l, RECEIVE_WAIT_TIME, 0, 10 * 1024 * 1024, 1024 * 1024);
- }
-
- public void testMessageChunkFilePersistenceOneMessageStreaming() throws Exception
- {
- testChunks(false, false, true, true, false, false, false, false, 1, 100 * 1024l * 1024l, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceSmallMessageStreaming() throws Exception
- {
- testChunks(false, false, true, true, false, false, false, false, 100, 1024, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceOneHugeMessageConsumer() throws Exception
- {
- testChunks(false, false, true, true, false, false, false, true, 1, 100 * 1024 * 1024, 120000, 0, 10 * 1024 * 1024, 1024 * 1024);
- }
-
- public void testMessageChunkFilePersistence() throws Exception
- {
- testChunks(false, true, false, true, false, false, true, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceConsumer() throws Exception
- {
- testChunks(false, true, false, true, false, false, true, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceXA() throws Exception
- {
- testChunks(true, true, false, true, false, false, true, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceXAStream() throws Exception
- {
- testChunks(true, false, true, true, false, false, false, false, 1, 1024 * 1024, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceXAConsumer() throws Exception
- {
- testChunks(true, true, false, true, false, false, true, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlocked() throws Exception
- {
- testChunks(false, true, false, true, false, true, true, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlockedConsumer() throws Exception
- {
- testChunks(false, true, false, true, false, true, true, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlockedXA() throws Exception
- {
- testChunks(true, true, false, true, false, true, true, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlockedXAConsumer() throws Exception
- {
- testChunks(true, true, false, true, false, true, true, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlockedPreACK() throws Exception
- {
- testChunks(false, true, false, true, true, true, true, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlockedPreACKConsumer() throws Exception
- {
- testChunks(false, true, false, true, true, true, true, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlockedPreACKXA() throws Exception
- {
- testChunks(true, true, false, true, true, true, true, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlockedPreACKXAConsumer() throws Exception
- {
- testChunks(true, true, false, true, true, true, true, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceDelayed() throws Exception
- {
- testChunks(false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
- }
-
- public void testMessageChunkFilePersistenceDelayedConsumer() throws Exception
- {
- testChunks(false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
- }
-
- public void testMessageChunkFilePersistenceDelayedXA() throws Exception
- {
- testChunks(true, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
- }
-
- public void testMessageChunkFilePersistenceDelayedXAConsumer() throws Exception
- {
- testChunks(true, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
- }
-
- public void testMessageChunkNullPersistence() throws Exception
- {
- testChunks(false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkNullPersistenceConsumer() throws Exception
- {
- testChunks(false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkNullPersistenceXA() throws Exception
- {
- testChunks(true, true, false, false, false, false, true, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkNullPersistenceXAConsumer() throws Exception
- {
- testChunks(true, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkNullPersistenceDelayed() throws Exception
- {
- testChunks(false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
- }
-
- public void testMessageChunkNullPersistenceDelayedConsumer() throws Exception
- {
- testChunks(false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
- }
-
- public void testMessageChunkNullPersistenceDelayedXA() throws Exception
- {
- testChunks(true, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
- }
-
- public void testMessageChunkNullPersistenceDelayedXAConsumer() throws Exception
- {
- testChunks(true, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
- }
-
- public void testPageOnLargeMessage() throws Exception
- {
- testPageOnLargeMessage(true, false);
- }
-
- public void testPageOnLargeMessageNullPersistence() throws Exception
- {
- testPageOnLargeMessage(false, false);
-
- }
-
- public void testSendSmallMessageXA() throws Exception
- {
- testChunks(true, true, false, true, false, false, true, false, 100, 4, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendSmallMessageXAConsumer() throws Exception
- {
- testChunks(true, true, false, true, false, false, true, true, 100, 4, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendSmallMessageNullPersistenceXA() throws Exception
- {
- testChunks(true, true, false, false, false, false, true, false, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendSmallMessageNullPersistenceXAConsumer() throws Exception
- {
- testChunks(true, true, false, false, false, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendRegularMessageNullPersistenceDelayed() throws Exception
- {
- testChunks(false, true, false, false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testSendRegularMessageNullPersistenceDelayedConsumer() throws Exception
- {
- testChunks(false, true, false, false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception
- {
- testChunks(true, true, false, false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testSendRegularMessageNullPersistenceDelayedXAConsumer() throws Exception
- {
- testChunks(true, true, false, false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testSendRegularMessagePersistence() throws Exception
- {
- testChunks(false, true, false, true, false, false, true, false, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendRegularMessagePersistenceConsumer() throws Exception
- {
- testChunks(false, true, false, true, false, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendRegularMessagePersistenceXA() throws Exception
- {
- testChunks(true, true, false, true, false, false, true, false, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendRegularMessagePersistenceXAConsumer() throws Exception
- {
- testChunks(true, true, false, true, false, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendRegularMessagePersistenceDelayed() throws Exception
- {
- testChunks(false, true, false, true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testSendRegularMessagePersistenceDelayedConsumer() throws Exception
- {
- testChunks(false, true, false, true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testSendRegularMessagePersistenceDelayedXA() throws Exception
- {
- testChunks(false, true, false, true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testSendRegularMessagePersistenceDelayedXAConsumer() throws Exception
- {
- testChunks(false, true, false, true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testTwoBindingsTwoStartedConsumers() throws Exception
- {
- // there are two bindings.. one is ACKed, the other is not, the server is restarted
- // The other binding is acked... The file must be deleted
-
- try
- {
-
- server = createServer(true);
-
- server.start();
-
- SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(ADDRESS, queue[0], null, true);
- session.createQueue(ADDRESS, queue[1], null, true);
-
- int numberOfBytes = 400000;
-
- Message clientFile = createLargeClientMessage(session, numberOfBytes);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- session.start();
-
- producer.send(clientFile);
-
- producer.close();
-
- ClientConsumer consumer = session.createConsumer(queue[1]);
- ClientMessage msg = consumer.receive(RECEIVE_WAIT_TIME);
- assertNull(consumer.receive(1000));
- assertNotNull(msg);
-
- msg.acknowledge();
- consumer.close();
-
- System.out.println("Stopping");
-
- session.stop();
-
- ClientConsumer consumer1 = session.createConsumer(queue[0]);
-
- session.start();
-
- msg = consumer1.receive(RECEIVE_WAIT_TIME);
- assertNotNull(msg);
- msg.acknowledge();
- consumer1.close();
-
- session.commit();
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
- public void testTwoBindingsAndRestart() throws Exception
- {
- testTwoBindings(true);
- }
-
- public void testTwoBindingsNoRestart() throws Exception
- {
- testTwoBindings(false);
- }
-
- public void testTwoBindings(final boolean restart) throws Exception
- {
- // there are two bindings.. one is ACKed, the other is not, the server is restarted
- // The other binding is acked... The file must be deleted
-
- try
- {
-
- server = createServer(true);
-
- server.start();
-
- SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(ADDRESS, queue[0], null, true);
- session.createQueue(ADDRESS, queue[1], null, true);
-
- int numberOfBytes = 400000;
-
- Message clientFile = createLargeClientMessage(session, numberOfBytes);
-
- ClientProducer producer = session.createProducer(ADDRESS);
- producer.send(clientFile);
-
- producer.close();
-
- readMessage(session, queue[1], numberOfBytes);
-
- if (restart)
- {
- session.close();
-
- server.stop();
-
- log.info("Restartning");
-
- server = createServer(true);
-
- server.start();
-
- sf = createInVMFactory();
-
- session = sf.createSession(null, null, false, true, true, false, 0);
- }
-
- readMessage(session, queue[0], numberOfBytes);
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
- public void testSendRollbackXA() throws Exception
- {
- internalTestSendRollback(true);
- }
-
- public void testSendRollback() throws Exception
- {
- internalTestSendRollback(false);
- }
-
- private void internalTestSendRollback(final boolean isXA) throws Exception
- {
-
- ClientSession session = null;
-
- try
- {
- server = createServer(true);
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
-
- session = sf.createSession(isXA, false, false);
-
- session.createQueue(ADDRESS, ADDRESS, true);
-
- Xid xid = null;
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- Message clientFile = createLargeClientMessage(session, 50000, false);
-
- for (int i = 0; i < 1; i++)
- {
- producer.send(clientFile);
- }
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- session.rollback(xid);
- }
- else
- {
- session.rollback();
- }
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
- public void testSimpleRollback() throws Exception
- {
- simpleRollbackInternalTest(false);
- }
-
- public void testSimpleRollbackXA() throws Exception
- {
- simpleRollbackInternalTest(true);
- }
-
- public void simpleRollbackInternalTest(final boolean isXA) throws Exception
- {
- // there are two bindings.. one is ACKed, the other is not, the server is restarted
- // The other binding is acked... The file must be deleted
-
- try
- {
-
- server = createServer(true);
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(isXA, false, false);
-
- Xid xid = null;
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- int numberOfBytes = 200000;
-
- session.start();
-
- log.info("Session started");
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- for (int n = 0; n < 10; n++)
- {
- Message clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
-
- producer.send(clientFile);
-
- assertNull(consumer.receiveImmediate());
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
-
- clientFile = createLargeClientMessage(session, numberOfBytes, n % 2 == 0);
-
- producer.send(clientFile);
-
- assertNull(consumer.receiveImmediate());
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
-
- for (int i = 0; i < 2; i++)
- {
-
- ClientMessage clientMessage = consumer.receive(5000);
-
- assertNotNull(clientMessage);
-
- assertEquals(numberOfBytes, clientMessage.getBody().writerIndex());
-
- clientMessage.acknowledge();
-
- if (isXA)
- {
- if (i == 0)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- }
- else
- {
- if (i == 0)
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
- }
- }
- }
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
- public void testBufferMultipleLargeMessages() throws Exception
- {
- ClientSession session = null;
- MessagingServer server = null;
-
- final int SIZE = 10 * 1024;
- final int NUMBER_OF_MESSAGES = 30;
- try
- {
-
- server = createServer(true);
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
-
- sf.setMinLargeMessageSize(1024);
- sf.setConsumerWindowSize(1024 * 1024);
-
- session = sf.createSession(null, null, false, false, false, false, 0);
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
- {
- Message clientFile = session.createClientMessage(true);
- clientFile.setBodyInputStream(createFakeLargeStream(SIZE));
- producer.send(clientFile);
-
- }
- session.commit();
- producer.close();
-
- session.start();
-
- ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
-
- // Wait the consumer to be complete with 10 messages before getting others
- long timeout = System.currentTimeMillis() + 10000;
- while (consumer.getBufferSize() < NUMBER_OF_MESSAGES && timeout > System.currentTimeMillis())
- {
- Thread.sleep(10);
- }
- assertEquals(NUMBER_OF_MESSAGES, consumer.getBufferSize());
-
- // Reads the messages, rollback.. read them again
- for (int trans = 0; trans < 2; trans++)
- {
-
- for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
- {
- ClientMessage msg = consumer.receive(10000);
- assertNotNull(msg);
-
- // it will ignore the buffer (not read it) on the first try
- if (trans == 0)
- {
- for (int byteRead = 0; byteRead < SIZE; byteRead++)
- {
- assertEquals(getSamplebyte(byteRead), msg.getBody().readByte());
- }
- }
-
- msg.acknowledge();
- }
- if (trans == 0)
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
- }
-
- assertEquals(0l, server.getPostOffice().getPagingManager().getGlobalSize());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
-
- }
- finally
- {
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- public void testSendStreamingSingleMessage() throws Exception
- {
- ClientSession session = null;
- MessagingServer server = null;
-
- final int SIZE = 10 * 1024 * 1024;
- try
- {
-
- server = createServer(true);
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
-
- sf.setMinLargeMessageSize(100 * 1024);
-
- session = sf.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- Message clientFile = session.createClientMessage(true);
- clientFile.setBodyInputStream(createFakeLargeStream(SIZE));
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- session.start();
-
- System.out.println("Sending");
- producer.send(clientFile);
-
- producer.close();
-
- System.out.println("Waiting");
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- ClientMessage msg2 = consumer.receive(10000);
-
- msg2.acknowledge();
-
- msg2.setOutputStream(createFakeOutputStream());
- assertTrue(msg2.waitOutputStreamCompletion(60000));
-
- // for (int i = 0; i < SIZE; i++)
- // {
- // byte value = msg2.getBody().readByte();
- // assertEquals("Error position " + i, (byte)'a', value);
- // }
-
- session.commit();
-
- assertEquals(0l, server.getPostOffice().getPagingManager().getGlobalSize());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
-
- }
- finally
- {
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- /** Receive messages but never reads them, leaving the buffer pending */
- public void testIgnoreStreaming() throws Exception
- {
- ClientSession session = null;
- MessagingServer server = null;
-
- final int SIZE = 10 * 1024;
- final int NUMBER_OF_MESSAGES = 1;
- try
- {
-
- server = createServer(true);
-
- server.start();
-
- ClientSessionFactory sf = createInVMFactory();
-
- sf.setMinLargeMessageSize(1024);
-
- session = sf.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
- {
- Message msg = session.createClientMessage(true);
- msg.setBodyInputStream(createFakeLargeStream(SIZE));
- msg.putIntProperty(new SimpleString("key"), i);
- producer.send(msg);
-
- System.out.println("Sent msg " + i);
- }
-
- session.start();
-
- System.out.println("Sending");
-
- producer.close();
-
- System.out.println("Waiting");
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
- {
- ClientMessage msg = consumer.receive(50000);
- assertNotNull(msg);
-
- assertEquals(i, msg.getProperty(new SimpleString("key")));
-
- msg.acknowledge();
- }
-
- consumer.close();
-
- session.commit();
-
- assertEquals(0l, server.getPostOffice().getPagingManager().getGlobalSize());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
-
- System.out.println("Thread done");
- }
- finally
- {
- try
- {
- session.close();
- }
- catch (Throwable ignored)
- {
- }
-
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- clearData();
- log.info("\n*********************************************************************************\n Starting " + getName() +
- "\n*********************************************************************************");
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- log.info("\n*********************************************************************************\nDone with " + getName() +
- "\n*********************************************************************************");
- super.tearDown();
- }
-
- protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception
- {
- Configuration config = createDefaultConfig();
-
- config.setPagingMaxGlobalSizeBytes(20 * 1024);
- config.setPagingGlobalWatermarkSize(10 * 1024);
-
- server = createServer(realFiles, config, new HashMap<String, AddressSettings>());
-
- server.start();
-
- final int numberOfBytes = 1024;
-
- final int numberOfBytesBigMessage = 400000;
-
- try
- {
- ClientSessionFactory sf = createInVMFactory();
-
- if (sendBlocking)
- {
- sf.setBlockOnNonPersistentSend(true);
- sf.setBlockOnPersistentSend(true);
- sf.setBlockOnAcknowledge(true);
- }
-
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- // printBuffer("body to be sent : " , body);
-
- ClientMessage message = null;
-
- MessagingBuffer body = null;
-
- for (int i = 0; i < 100; i++)
- {
- MessagingBuffer bodyLocal = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfBytes);
-
- for (int j = 1; j <= numberOfBytes; j++)
- {
- bodyLocal.writeInt(j);
- }
-
- if (i == 0)
- {
- body = bodyLocal;
- }
-
- message = session.createClientMessage(true);
- message.setBody(bodyLocal);
-
- producer.send(message);
- }
-
- ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
-
- producer.send(clientFile);
-
- session.close();
-
- if (realFiles)
- {
- server.stop();
-
- server = createServer(true, config, new HashMap<String, AddressSettings>());
- server.start();
-
- sf = createInVMFactory();
- }
-
- session = sf.createSession(null, null, false, true, true, false, 0);
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- session.start();
-
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(RECEIVE_WAIT_TIME);
-
- log.info("got message " + i);
-
- assertNotNull(message2);
-
- message2.acknowledge();
-
- assertNotNull(message2);
-
- try
- {
- assertEqualsByteArrays(body.writerIndex(), body.array(), message2.getBody().array());
- }
- catch (AssertionFailedError e)
- {
- log.info("Expected buffer:" + dumbBytesHex(body.array(), 40));
- log.info("Arriving buffer:" + dumbBytesHex(message2.getBody().array(), 40));
- throw e;
- }
- }
-
- consumer.close();
-
- session.close();
-
- session = sf.createSession(null, null, false, true, true, false, 0);
-
- readMessage(session, ADDRESS, numberOfBytesBigMessage);
-
- // printBuffer("message received : ", message2.getBody());
-
- session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java (from rev 6459, trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java 2009-04-16 23:06:20 UTC (rev 6460)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.stress.chunk;
+
+import org.jboss.messaging.tests.integration.chunkmessage.LargeMessageTestBase;
+
+/**
+ * A LargeMessageStressTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 27, 2008 5:07:05 PM
+ *
+ *
+ */
+public class LargeMessageStressTest extends LargeMessageTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testMessageChunkFilePersistenceOneHugeMessage() throws Exception
+ {
+ testChunks(false, false, true, true, false, false, false, true, 1, 200 * 1024l * 1024l + 1024l, 120000, 0, 10 * 1024 * 1024, 1024 * 1024);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java 2009-04-16 20:33:04 UTC (rev 6459)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java 2009-04-16 23:06:20 UTC (rev 6460)
@@ -1,62 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.stress.chunk;
-
-import org.jboss.messaging.tests.integration.chunkmessage.ChunkTestBase;
-
-/**
- * A MessageChunkSoakTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created Oct 27, 2008 5:07:05 PM
- *
- *
- */
-public class MessageChunkStressTest extends ChunkTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testMessageChunkFilePersistenceOneHugeMessage() throws Exception
- {
- testChunks(false, false, true, true, false, false, false, false, 1, 4l * 1024 * 1024l * 1024l + 1024l, 120000, 0, 10 * 1024 * 1024, 1024 * 1024);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
More information about the jboss-cvs-commits
mailing list