[jboss-cvs] JBoss Messaging SVN: r6639 - in trunk/tests/src/org/jboss/messaging/tests: integration/chunkmessage and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Apr 30 13:09:51 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-04-30 13:09:51 -0400 (Thu, 30 Apr 2009)
New Revision: 6639
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/
trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageCleanupTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/
Modified:
trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnector.java
trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnectorFactory.java
trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java
trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Removing verbose outputs and renaming package on testsuite
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java 2009-04-30 16:45:28 UTC (rev 6638)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java 2009-04-30 17:09:51 UTC (rev 6639)
@@ -1,207 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.chunkmessage;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnector;
-import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnectorFactory;
-
-/**
- * A LargeMessageCleanupTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class LargeMessageCleanupTest extends LargeMessageTestBase
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(LargeMessageCleanupTest.class);
-
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
-
- public void testCleanup() throws Exception
- {
- clearData();
-
- FileOutputStream fileOut = new FileOutputStream(new File(getLargeMessagesDir(), "1234.tmp"));
-
- fileOut.write(new byte[1024]); // anything
-
- fileOut.close();
-
- Configuration config = createDefaultConfig();
-
- server = createServer(true, config, new HashMap<String, AddressSettings>());
-
- server.start();
-
- try
- {
-
- File directoryLarge = new File(getLargeMessagesDir());
-
- assertEquals("The startup should have been deleted 1234.tmp", 0, directoryLarge.list().length);
- }
- finally
- {
- server.stop();
- }
- }
-
- public void testFailureOnSendingFile() throws Exception
- {
- clearData();
-
- Configuration config = createDefaultConfig();
-
- config.setPagingMaxGlobalSizeBytes(20 * 1024);
- config.setPagingGlobalWatermarkSize(10 * 1024);
-
- server = createServer(true, config, new HashMap<String, AddressSettings>());
-
- server.start();
-
- final int numberOfBytes = 2 * 1024 * 1024;
-
- ClientSession session = null;
-
- class LocalCallback implements MockConnector.MockCallback
- {
- AtomicInteger counter = new AtomicInteger(0);
-
- ClientSession session;
-
- public void onWrite(final MessagingBuffer buffer)
- {
- log.info("calling cb onwrite** ");
- if (counter.incrementAndGet() == 5)
- {
- RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
- RemotingServiceImpl remotingServiceImpl = (RemotingServiceImpl)server.getRemotingService();
- remotingServiceImpl.connectionException(conn.getID(),
- new MessagingException(MessagingException.NOT_CONNECTED, "blah!"));
- conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
- throw new IllegalStateException("blah");
- }
- }
- }
-
- LocalCallback callback = new LocalCallback();
-
- try
- {
- HashMap<String, Object> parameters = new HashMap<String, Object>();
- parameters.put("callback", callback);
-
- TransportConfiguration transport = new TransportConfiguration(MockConnectorFactory.class.getCanonicalName(),
- parameters);
-
- ClientSessionFactory mockFactory = new ClientSessionFactoryImpl(transport);
-
- mockFactory.setBlockOnNonPersistentSend(false);
- mockFactory.setBlockOnPersistentSend(false);
- mockFactory.setBlockOnAcknowledge(false);
-
- session = mockFactory.createSession(null, null, false, true, true, false, 0);
-
- callback.session = session;
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- ClientMessage clientLarge = createLargeClientMessage(session, numberOfBytes);
-
- try
- {
- producer.send(clientLarge);
-
- fail("Exception was expected!");
- }
- catch (Exception e)
- {
- }
-
- validateNoFilesOnLargeDir();
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Exception ignored)
- {
- ignored.printStackTrace();
- }
- }
-
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java 2009-04-30 16:45:28 UTC (rev 6638)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java 2009-04-30 17:09:51 UTC (rev 6639)
@@ -1,670 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.chunkmessage;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.jboss.messaging.core.buffers.ChannelBuffers;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.MessageHandler;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.tests.util.ServiceTestBase;
-import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * A LargeMessageTestBase
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created Oct 29, 2008 11:43:52 AM
- *
- *
- */
-public class LargeMessageTestBase extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(LargeMessageTestBase.class);
-
- protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
- protected MessagingServer server;
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void testChunks(final boolean isXA,
- final boolean rollbackFirstSend,
- final boolean useStreamOnConsume,
- final boolean realFiles,
- final boolean preAck,
- final boolean sendingBlocking,
- final boolean testBrowser,
- final boolean useMessageConsumer,
- final int numberOfMessages,
- final long numberOfBytes,
- final int waitOnConsumer,
- final long delayDelivery) throws Exception
- {
- testChunks(isXA,
- rollbackFirstSend,
- useStreamOnConsume,
- realFiles,
- preAck,
- sendingBlocking,
- testBrowser,
- useMessageConsumer,
- numberOfMessages,
- numberOfBytes,
- waitOnConsumer,
- delayDelivery,
- -1,
- 10 * 1024);
- }
-
- protected void testChunks(final boolean isXA,
- final boolean rollbackFirstSend,
- final boolean useStreamOnConsume,
- final boolean realFiles,
- final boolean preAck,
- final boolean sendingBlocking,
- final boolean testBrowser,
- final boolean useMessageConsumer,
- final int numberOfMessages,
- final long numberOfBytes,
- final int waitOnConsumer,
- final long delayDelivery,
- final int producerWindow,
- final int minSize) throws Exception
- {
- clearData();
-
- server = createServer(realFiles);
- server.start();
-
- try
- {
- ClientSessionFactory sf = createInVMFactory();
-
- if (sendingBlocking)
- {
- sf.setBlockOnNonPersistentSend(true);
- sf.setBlockOnPersistentSend(true);
- sf.setBlockOnAcknowledge(true);
- }
-
- if (producerWindow > 0)
- {
- sf.setProducerWindowSize(producerWindow);
- }
-
- sf.setMinLargeMessageSize(minSize);
-
- ClientSession session;
-
- Xid xid = null;
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- if (rollbackFirstSend)
- {
- sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
-
- validateNoFilesOnLargeDir();
- }
-
- sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
-
- session.close();
-
- if (realFiles)
- {
- server.stop();
-
- server = createServer(realFiles);
- server.start();
-
- sf = createInVMFactory();
- }
-
- session = sf.createSession(null, null, isXA, false, false, preAck, 0);
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- ClientConsumer consumer = null;
-
- for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
- {
-
- System.out.println("Iteration: " + iteration);
-
- session.stop();
-
- // first time with a browser
- consumer = session.createConsumer(ADDRESS, null, iteration == 0);
-
- if (useMessageConsumer)
- {
- final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
- final AtomicInteger errors = new AtomicInteger(0);
-
- MessageHandler handler = new MessageHandler()
- {
- int msgCounter;
-
- public void onMessage(final ClientMessage message)
- {
-
- try
- {
- System.out.println("Message on consumer: " + msgCounter);
-
- if (delayDelivery > 0)
- {
- long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
- assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
- System.currentTimeMillis() - originalTime >= delayDelivery);
- }
-
- if (!preAck)
- {
- message.acknowledge();
- }
-
- assertNotNull(message);
-
- if (delayDelivery <= 0)
- {
- // right now there is no guarantee of ordered delivered on multiple scheduledMessages with
- // the same
- // scheduled delivery time
- assertEquals(msgCounter,
- ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
- }
-
- if (useStreamOnConsume)
- {
- final AtomicLong bytesRead = new AtomicLong(0);
- message.saveToOutputStream(new OutputStream()
- {
-
- public void write(byte b[]) throws IOException
- {
- if (b[0] == getSamplebyte(bytesRead.get()))
- {
- bytesRead.addAndGet(b.length);
- System.out.println("Read position " + bytesRead.get() + " on consumer");
- if (bytesRead.get() == 1126400l)
- {
- System.out.println("I'm here");
- }
- }
- else
- {
- System.out.println("Received invalid packet at position " + bytesRead.get());
- }
- }
-
- @Override
- public void write(int b) throws IOException
- {
- if (b == getSamplebyte(bytesRead.get()))
- {
- bytesRead.incrementAndGet();
- }
- else
- {
- System.out.println("byte not as expected!");
- }
- }
- });
-
- assertEquals(numberOfBytes, bytesRead.get());
- }
- else
- {
-
- MessagingBuffer buffer = message.getBody();
- buffer.resetReaderIndex();
- assertEquals(numberOfBytes, buffer.writerIndex());
- for (long b = 0; b < numberOfBytes; b++)
- {
- if (b % (1024l * 1024l) == 0)
- {
- System.out.println("Read " + b + " bytes");
- }
-
- assertEquals(getSamplebyte(b), buffer.readByte());
- }
- }
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- System.out.println("Got an error");
- errors.incrementAndGet();
- }
- finally
- {
- latchDone.countDown();
- msgCounter++;
- }
- }
- };
-
- session.start();
-
- consumer.setMessageHandler(handler);
-
- assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
- assertEquals(0, errors.get());
-
- }
- else
- {
-
- session.start();
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- System.currentTimeMillis();
-
- ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
-
- assertNotNull(message);
-
- System.out.println("Message: " + i);
-
- System.currentTimeMillis();
-
- if (delayDelivery > 0)
- {
- long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
- assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
- System.currentTimeMillis() - originalTime >= delayDelivery);
- }
-
- if (!preAck)
- {
- message.acknowledge();
- }
-
- assertNotNull(message);
-
- if (delayDelivery <= 0)
- {
- // right now there is no guarantee of ordered delivered on multiple scheduledMessages with the same
- // scheduled delivery time
- assertEquals(i, ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
- }
-
- MessagingBuffer buffer = message.getBody();
- buffer.resetReaderIndex();
-
- if (useStreamOnConsume)
- {
- final AtomicLong bytesRead = new AtomicLong(0);
- message.saveToOutputStream(new OutputStream()
- {
-
- public void write(byte b[]) throws IOException
- {
- if (b[0] == getSamplebyte(bytesRead.get()))
- {
- bytesRead.addAndGet(b.length);
- }
- else
- {
- System.out.println("Received invalid packet at position " + bytesRead.get());
- }
-
- }
-
- @Override
- public void write(int b) throws IOException
- {
- if (bytesRead.get() % (1024l * 1024l) == 0)
- {
- System.out.println("Read " + bytesRead.get() + " bytes");
- }
- if (b == (byte)'a')
- {
- bytesRead.incrementAndGet();
- }
- else
- {
- System.out.println("byte not as expected!");
- }
- }
- });
-
- assertEquals(numberOfBytes, bytesRead.get());
- }
- else
- {
- for (long b = 0; b < numberOfBytes; b++)
- {
- if (b % (1024l * 1024l) == 0l)
- {
- System.out.println("Read " + b + " bytes");
- }
- assertEquals(getSamplebyte(b), buffer.readByte());
- }
- }
-
- }
-
- }
- consumer.close();
-
- if (iteration == 0)
- {
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
- }
- else
- {
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
- }
- }
-
- session.close();
-
- long globalSize = server.getPostOffice().getPagingManager().getGlobalSize();
- assertEquals(0l, globalSize);
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
- assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
-
- validateNoFilesOnLargeDir();
-
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- /**
- * @param useFile
- * @param numberOfMessages
- * @param numberOfIntegers
- * @param delayDelivery
- * @param testTime
- * @param session
- * @param producer
- * @throws FileNotFoundException
- * @throws IOException
- * @throws MessagingException
- */
- private void sendMessages(final int numberOfMessages,
- final long numberOfBytes,
- final long delayDelivery,
- final ClientSession session,
- final ClientProducer producer) throws Exception
- {
- System.out.println("NumberOfBytes = " + numberOfBytes);
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage message = session.createClientMessage(true);
-
- // If the test is using more than 1M, we will only use the Streaming, as it require too much memory from the
- // test
- if (numberOfBytes > 1024 * 1024 || i % 2 == 0)
- {
- System.out.println("Sending message (stream)" + i);
- message.setBodyInputStream(createFakeLargeStream(numberOfBytes));
- }
- else
- {
- System.out.println("Sending message (array)" + i);
- byte[] bytes = new byte[(int)numberOfBytes];
- for (int j = 0; j < bytes.length; j++)
- {
- bytes[j] = getSamplebyte(j);
- }
- message.getBody().writeBytes(bytes);
- }
- message.putIntProperty(new SimpleString("counter-message"), i);
- if (delayDelivery > 0)
- {
- long time = System.currentTimeMillis();
- message.putLongProperty(new SimpleString("original-time"), time);
- message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
-
- producer.send(message);
- }
- else
- {
- producer.send(message);
- }
- }
- }
-
- protected MessagingBuffer createLargeBuffer(final int numberOfIntegers)
- {
- MessagingBuffer body = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
-
- for (int i = 0; i < numberOfIntegers; i++)
- {
- body.writeInt(i);
- }
-
- return body;
-
- }
-
- protected ClientMessage createLargeClientMessage(final ClientSession session, final int numberOfBytes) throws Exception
- {
- return createLargeClientMessage(session, numberOfBytes, true);
- }
-
- protected ClientMessage createLargeClientMessage(final ClientSession session,
- final long numberOfBytes,
- final boolean persistent) throws Exception
- {
-
- ClientMessage clientMessage = session.createClientMessage(persistent);
-
- clientMessage.setBodyInputStream(createFakeLargeStream(numberOfBytes));
-
- return clientMessage;
- }
-
- /**
- * @param session
- * @param queueToRead
- * @param numberOfIntegers
- * @throws MessagingException
- * @throws FileNotFoundException
- * @throws IOException
- */
- protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfBytes) throws MessagingException,
- FileNotFoundException,
- IOException
- {
- session.start();
-
- ClientConsumer consumer = session.createConsumer(queueToRead);
-
- ClientMessage clientMessage = consumer.receive(5000);
-
- assertNotNull(clientMessage);
-
- clientMessage.acknowledge();
-
- session.commit();
-
- consumer.close();
- }
-
- /**
- * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
- */
- protected void validateNoFilesOnLargeDir() throws Exception
- {
- File largeMessagesFileDir = new File(getLargeMessagesDir());
-
- // Deleting the file is async... we keep looking for a period of the time until the file is really gone
- for (int i = 0; i < 100; i++)
- {
- if (largeMessagesFileDir.listFiles().length > 0)
- {
- Thread.sleep(10);
- }
- else
- {
- break;
- }
- }
-
- assertEquals(0, largeMessagesFileDir.listFiles().length);
- }
-
- protected OutputStream createFakeOutputStream() throws Exception
- {
-
- return new OutputStream()
- {
- private boolean closed = false;
-
- private int count;
-
- @Override
- public void close() throws IOException
- {
- super.close();
- closed = true;
- }
-
- @Override
- public void write(final int b) throws IOException
- {
- if (count++ % 1024 * 1024 == 0)
- {
- System.out.println("OutputStream received " + count + " bytes");
- }
- if (closed)
- {
- throw new IOException("Stream was closed");
- }
- }
-
- };
-
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java 2009-04-30 16:45:28 UTC (rev 6638)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java 2009-04-30 17:09:51 UTC (rev 6639)
@@ -43,7 +43,7 @@
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.integration.chunkmessage.LargeMessageTestBase;
+import org.jboss.messaging.tests.integration.largemessage.LargeMessageTestBase;
import org.jboss.messaging.utils.DataConstants;
import org.jboss.messaging.utils.SimpleString;
@@ -80,12 +80,12 @@
{
internalTestResendMessage(50000);
}
-
+
public void testResendLargeStreamMessage() throws Exception
{
internalTestResendMessage(150 * 1024);
}
-
+
public void internalTestResendMessage(long messageSize) throws Exception
{
ClientSession session = null;
@@ -101,7 +101,7 @@
session = sf.createSession(false, false, false);
session.createQueue(ADDRESS, ADDRESS, true);
-
+
SimpleString ADDRESS2 = ADDRESS.concat("-2");
session.createQueue(ADDRESS2, ADDRESS2, true);
@@ -115,18 +115,17 @@
producer.send(clientFile);
session.commit();
-
+
session.start();
-
+
ClientConsumer consumer = session.createConsumer(ADDRESS);
ClientConsumer consumer2 = session.createConsumer(ADDRESS2);
-
+
ClientMessage msg1 = consumer.receive(10000);
msg1.acknowledge();
producer2.send(msg1);
-
-
+
try
{
producer2.send(msg1);
@@ -137,23 +136,22 @@
}
session.commit();
-
+
ClientMessage msg2 = consumer2.receive(10000);
-
+
assertNotNull(msg2);
-
+
msg2.acknowledge();
-
+
session.commit();
-
+
assertEquals(messageSize, msg2.getBodySize());
-
-
- for (int i = 0 ; i < messageSize; i++)
+
+ for (int i = 0; i < messageSize; i++)
{
assertEquals(getSamplebyte(i), msg2.getBody().readByte());
}
-
+
session.close();
validateNoFilesOnLargeDir();
@@ -177,9 +175,23 @@
}
}
}
+
public void testFilePersistenceOneHugeMessage() throws Exception
{
- testChunks(false, false, true, true, false, false, false, false, 1, 100 * 1024l * 1024l, RECEIVE_WAIT_TIME, 0, 10 * 1024 * 1024, 1024 * 1024);
+ testChunks(false,
+ false,
+ true,
+ true,
+ false,
+ false,
+ false,
+ false,
+ 1,
+ 100 * 1024l * 1024l,
+ RECEIVE_WAIT_TIME,
+ 0,
+ 10 * 1024 * 1024,
+ 1024 * 1024);
}
public void testFilePersistenceOneMessageStreaming() throws Exception
@@ -194,7 +206,20 @@
public void testFilePersistenceOneHugeMessageConsumer() throws Exception
{
- testChunks(false, false, true, true, false, false, false, true, 1, 100 * 1024 * 1024, 120000, 0, 10 * 1024 * 1024, 1024 * 1024);
+ testChunks(false,
+ false,
+ true,
+ true,
+ false,
+ false,
+ false,
+ true,
+ 1,
+ 100 * 1024 * 1024,
+ 120000,
+ 0,
+ 10 * 1024 * 1024,
+ 1024 * 1024);
}
public void testFilePersistence() throws Exception
@@ -890,6 +915,109 @@
}
}
+ public void testReceiveMultipleMessages() throws Exception
+ {
+ ClientSession session = null;
+ MessagingServer server = null;
+
+ final int SIZE = 10 * 1024;
+ final int NUMBER_OF_MESSAGES = 1000;
+ try
+ {
+
+ server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setMinLargeMessageSize(1024);
+ sf.setConsumerWindowSize(1024 * 1024);
+
+ session = sf.createSession(null, null, false, false, false, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ Message clientFile = session.createClientMessage(true);
+ clientFile.setBodyInputStream(createFakeLargeStream(SIZE));
+ producer.send(clientFile);
+
+ }
+ session.commit();
+ producer.close();
+
+ session.start();
+
+ // Reads the messages, rollback.. read them again
+ for (int trans = 0; trans < 2; trans++)
+ {
+
+ ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
+
+ // Wait the consumer to be complete with 10 messages before getting others
+ long timeout = System.currentTimeMillis() + 10000;
+ while (consumer.getBufferSize() < 10 && timeout > System.currentTimeMillis())
+ {
+ Thread.sleep(10);
+ }
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ ClientMessage msg = consumer.receive(10000);
+ assertNotNull(msg);
+
+ // it will ignore the buffer (not read it) on the first try
+ if (trans == 0)
+ {
+ for (int byteRead = 0; byteRead < SIZE; byteRead++)
+ {
+ assertEquals(getSamplebyte(byteRead), msg.getBody().readByte());
+ }
+ }
+
+ msg.acknowledge();
+ }
+ if (trans == 0)
+ {
+ session.rollback();
+ }
+ else
+ {
+ session.commit();
+ }
+
+ consumer.close();
+ }
+
+ assertEquals(0l, server.getPostOffice().getPagingManager().getGlobalSize());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testSendStreamingSingleMessage() throws Exception
{
ClientSession session = null;
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageCleanupTest.java (from rev 6637, trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageCleanupTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageCleanupTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageCleanupTest.java 2009-04-30 17:09:51 UTC (rev 6639)
@@ -0,0 +1,207 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.largemessage;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.integration.largemessage.mock.MockConnector;
+import org.jboss.messaging.tests.integration.largemessage.mock.MockConnectorFactory;
+
+/**
+ * A LargeMessageCleanupTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class LargeMessageCleanupTest extends LargeMessageTestBase
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(LargeMessageCleanupTest.class);
+
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+ public void testCleanup() throws Exception
+ {
+ clearData();
+
+ FileOutputStream fileOut = new FileOutputStream(new File(getLargeMessagesDir(), "1234.tmp"));
+
+ fileOut.write(new byte[1024]); // anything
+
+ fileOut.close();
+
+ Configuration config = createDefaultConfig();
+
+ server = createServer(true, config, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ try
+ {
+
+ File directoryLarge = new File(getLargeMessagesDir());
+
+ assertEquals("The startup should have been deleted 1234.tmp", 0, directoryLarge.list().length);
+ }
+ finally
+ {
+ server.stop();
+ }
+ }
+
+ public void testFailureOnSendingFile() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(20 * 1024);
+ config.setPagingGlobalWatermarkSize(10 * 1024);
+
+ server = createServer(true, config, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int numberOfBytes = 2 * 1024 * 1024;
+
+ ClientSession session = null;
+
+ class LocalCallback implements MockConnector.MockCallback
+ {
+ AtomicInteger counter = new AtomicInteger(0);
+
+ ClientSession session;
+
+ public void onWrite(final MessagingBuffer buffer)
+ {
+ log.info("calling cb onwrite** ");
+ if (counter.incrementAndGet() == 5)
+ {
+ RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+ RemotingServiceImpl remotingServiceImpl = (RemotingServiceImpl)server.getRemotingService();
+ remotingServiceImpl.connectionException(conn.getID(),
+ new MessagingException(MessagingException.NOT_CONNECTED, "blah!"));
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+ throw new IllegalStateException("blah");
+ }
+ }
+ }
+
+ LocalCallback callback = new LocalCallback();
+
+ try
+ {
+ HashMap<String, Object> parameters = new HashMap<String, Object>();
+ parameters.put("callback", callback);
+
+ TransportConfiguration transport = new TransportConfiguration(MockConnectorFactory.class.getCanonicalName(),
+ parameters);
+
+ ClientSessionFactory mockFactory = new ClientSessionFactoryImpl(transport);
+
+ mockFactory.setBlockOnNonPersistentSend(false);
+ mockFactory.setBlockOnPersistentSend(false);
+ mockFactory.setBlockOnAcknowledge(false);
+
+ session = mockFactory.createSession(null, null, false, true, true, false, 0);
+
+ callback.session = session;
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage clientLarge = createLargeClientMessage(session, numberOfBytes);
+
+ try
+ {
+ producer.send(clientLarge);
+
+ fail("Exception was expected!");
+ }
+ catch (Exception e)
+ {
+ }
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+ }
+
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java (from rev 6637, trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/LargeMessageTestBase.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java 2009-04-30 17:09:51 UTC (rev 6639)
@@ -0,0 +1,670 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.largemessage;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A LargeMessageTestBase
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 29, 2008 11:43:52 AM
+ *
+ *
+ */
+public class LargeMessageTestBase extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(LargeMessageTestBase.class);
+
+ protected final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ protected MessagingServer server;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void testChunks(final boolean isXA,
+ final boolean rollbackFirstSend,
+ final boolean useStreamOnConsume,
+ final boolean realFiles,
+ final boolean preAck,
+ final boolean sendingBlocking,
+ final boolean testBrowser,
+ final boolean useMessageConsumer,
+ final int numberOfMessages,
+ final long numberOfBytes,
+ final int waitOnConsumer,
+ final long delayDelivery) throws Exception
+ {
+ testChunks(isXA,
+ rollbackFirstSend,
+ useStreamOnConsume,
+ realFiles,
+ preAck,
+ sendingBlocking,
+ testBrowser,
+ useMessageConsumer,
+ numberOfMessages,
+ numberOfBytes,
+ waitOnConsumer,
+ delayDelivery,
+ -1,
+ 10 * 1024);
+ }
+
+ protected void testChunks(final boolean isXA,
+ final boolean rollbackFirstSend,
+ final boolean useStreamOnConsume,
+ final boolean realFiles,
+ final boolean preAck,
+ final boolean sendingBlocking,
+ final boolean testBrowser,
+ final boolean useMessageConsumer,
+ final int numberOfMessages,
+ final long numberOfBytes,
+ final int waitOnConsumer,
+ final long delayDelivery,
+ final int producerWindow,
+ final int minSize) throws Exception
+ {
+ clearData();
+
+ server = createServer(realFiles);
+ server.start();
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ if (sendingBlocking)
+ {
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ }
+
+ if (producerWindow > 0)
+ {
+ sf.setProducerWindowSize(producerWindow);
+ }
+
+ sf.setMinLargeMessageSize(minSize);
+
+ ClientSession session;
+
+ Xid xid = null;
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ if (rollbackFirstSend)
+ {
+ sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+
+ validateNoFilesOnLargeDir();
+ }
+
+ sendMessages(numberOfMessages, numberOfBytes, delayDelivery, session, producer);
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+
+ session.close();
+
+ if (realFiles)
+ {
+ server.stop();
+
+ server = createServer(realFiles);
+ server.start();
+
+ sf = createInVMFactory();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ ClientConsumer consumer = null;
+
+ for (int iteration = testBrowser ? 0 : 1; iteration < 2; iteration++)
+ {
+
+ System.out.println("Iteration: " + iteration);
+
+ session.stop();
+
+ // first time with a browser
+ consumer = session.createConsumer(ADDRESS, null, iteration == 0);
+
+ if (useMessageConsumer)
+ {
+ final CountDownLatch latchDone = new CountDownLatch(numberOfMessages);
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ MessageHandler handler = new MessageHandler()
+ {
+ int msgCounter;
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ System.out.println("Message on consumer: " + msgCounter);
+
+ if (delayDelivery > 0)
+ {
+ long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
+ assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
+ System.currentTimeMillis() - originalTime >= delayDelivery);
+ }
+
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
+
+ assertNotNull(message);
+
+ if (delayDelivery <= 0)
+ {
+ // right now there is no guarantee of ordered delivered on multiple scheduledMessages with
+ // the same
+ // scheduled delivery time
+ assertEquals(msgCounter,
+ ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
+ }
+
+ if (useStreamOnConsume)
+ {
+ final AtomicLong bytesRead = new AtomicLong(0);
+ message.saveToOutputStream(new OutputStream()
+ {
+
+ public void write(byte b[]) throws IOException
+ {
+ if (b[0] == getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.addAndGet(b.length);
+ System.out.println("Read position " + bytesRead.get() + " on consumer");
+ if (bytesRead.get() == 1126400l)
+ {
+ System.out.println("I'm here");
+ }
+ }
+ else
+ {
+ System.out.println("Received invalid packet at position " + bytesRead.get());
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ if (b == getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.incrementAndGet();
+ }
+ else
+ {
+ System.out.println("byte not as expected!");
+ }
+ }
+ });
+
+ assertEquals(numberOfBytes, bytesRead.get());
+ }
+ else
+ {
+
+ MessagingBuffer buffer = message.getBody();
+ buffer.resetReaderIndex();
+ assertEquals(numberOfBytes, buffer.writerIndex());
+ for (long b = 0; b < numberOfBytes; b++)
+ {
+ if (b % (1024l * 1024l) == 0)
+ {
+ System.out.println("Read " + b + " bytes");
+ }
+
+ assertEquals(getSamplebyte(b), buffer.readByte());
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ System.out.println("Got an error");
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ latchDone.countDown();
+ msgCounter++;
+ }
+ }
+ };
+
+ session.start();
+
+ consumer.setMessageHandler(handler);
+
+ assertTrue(latchDone.await(waitOnConsumer, TimeUnit.SECONDS));
+ assertEquals(0, errors.get());
+
+ }
+ else
+ {
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ System.currentTimeMillis();
+
+ ClientMessage message = consumer.receive(waitOnConsumer + delayDelivery);
+
+ assertNotNull(message);
+
+ System.out.println("Message: " + i);
+
+ System.currentTimeMillis();
+
+ if (delayDelivery > 0)
+ {
+ long originalTime = (Long)message.getProperty(new SimpleString("original-time"));
+ assertTrue(System.currentTimeMillis() - originalTime + "<" + delayDelivery,
+ System.currentTimeMillis() - originalTime >= delayDelivery);
+ }
+
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
+
+ assertNotNull(message);
+
+ if (delayDelivery <= 0)
+ {
+ // right now there is no guarantee of ordered delivered on multiple scheduledMessages with the same
+ // scheduled delivery time
+ assertEquals(i, ((Integer)message.getProperty(new SimpleString("counter-message"))).intValue());
+ }
+
+ MessagingBuffer buffer = message.getBody();
+ buffer.resetReaderIndex();
+
+ if (useStreamOnConsume)
+ {
+ final AtomicLong bytesRead = new AtomicLong(0);
+ message.saveToOutputStream(new OutputStream()
+ {
+
+ public void write(byte b[]) throws IOException
+ {
+ if (b[0] == getSamplebyte(bytesRead.get()))
+ {
+ bytesRead.addAndGet(b.length);
+ }
+ else
+ {
+ System.out.println("Received invalid packet at position " + bytesRead.get());
+ }
+
+ }
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ if (bytesRead.get() % (1024l * 1024l) == 0)
+ {
+ System.out.println("Read " + bytesRead.get() + " bytes");
+ }
+ if (b == (byte)'a')
+ {
+ bytesRead.incrementAndGet();
+ }
+ else
+ {
+ System.out.println("byte not as expected!");
+ }
+ }
+ });
+
+ assertEquals(numberOfBytes, bytesRead.get());
+ }
+ else
+ {
+ for (long b = 0; b < numberOfBytes; b++)
+ {
+ if (b % (1024l * 1024l) == 0l)
+ {
+ System.out.println("Read " + b + " bytes");
+ }
+ assertEquals(getSamplebyte(b), buffer.readByte());
+ }
+ }
+
+ }
+
+ }
+ consumer.close();
+
+ if (iteration == 0)
+ {
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+ }
+ else
+ {
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ }
+
+ session.close();
+
+ long globalSize = server.getPostOffice().getPagingManager().getGlobalSize();
+ assertEquals(0l, globalSize);
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ /**
+ * @param useFile
+ * @param numberOfMessages
+ * @param numberOfIntegers
+ * @param delayDelivery
+ * @param testTime
+ * @param session
+ * @param producer
+ * @throws FileNotFoundException
+ * @throws IOException
+ * @throws MessagingException
+ */
+ private void sendMessages(final int numberOfMessages,
+ final long numberOfBytes,
+ final long delayDelivery,
+ final ClientSession session,
+ final ClientProducer producer) throws Exception
+ {
+ System.out.println("NumberOfBytes = " + numberOfBytes);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+
+ // If the test is using more than 1M, we will only use the Streaming, as it require too much memory from the
+ // test
+ if (numberOfBytes > 1024 * 1024 || i % 2 == 0)
+ {
+ System.out.println("Sending message (stream)" + i);
+ message.setBodyInputStream(createFakeLargeStream(numberOfBytes));
+ }
+ else
+ {
+ System.out.println("Sending message (array)" + i);
+ byte[] bytes = new byte[(int)numberOfBytes];
+ for (int j = 0; j < bytes.length; j++)
+ {
+ bytes[j] = getSamplebyte(j);
+ }
+ message.getBody().writeBytes(bytes);
+ }
+ message.putIntProperty(new SimpleString("counter-message"), i);
+ if (delayDelivery > 0)
+ {
+ long time = System.currentTimeMillis();
+ message.putLongProperty(new SimpleString("original-time"), time);
+ message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time + delayDelivery);
+
+ producer.send(message);
+ }
+ else
+ {
+ producer.send(message);
+ }
+ }
+ }
+
+ protected MessagingBuffer createLargeBuffer(final int numberOfIntegers)
+ {
+ MessagingBuffer body = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
+
+ for (int i = 0; i < numberOfIntegers; i++)
+ {
+ body.writeInt(i);
+ }
+
+ return body;
+
+ }
+
+ protected ClientMessage createLargeClientMessage(final ClientSession session, final int numberOfBytes) throws Exception
+ {
+ return createLargeClientMessage(session, numberOfBytes, true);
+ }
+
+ protected ClientMessage createLargeClientMessage(final ClientSession session,
+ final long numberOfBytes,
+ final boolean persistent) throws Exception
+ {
+
+ ClientMessage clientMessage = session.createClientMessage(persistent);
+
+ clientMessage.setBodyInputStream(createFakeLargeStream(numberOfBytes));
+
+ return clientMessage;
+ }
+
+ /**
+ * @param session
+ * @param queueToRead
+ * @param numberOfIntegers
+ * @throws MessagingException
+ * @throws FileNotFoundException
+ * @throws IOException
+ */
+ protected void readMessage(final ClientSession session, final SimpleString queueToRead, final int numberOfBytes) throws MessagingException,
+ FileNotFoundException,
+ IOException
+ {
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(queueToRead);
+
+ ClientMessage clientMessage = consumer.receive(5000);
+
+ assertNotNull(clientMessage);
+
+ clientMessage.acknowledge();
+
+ session.commit();
+
+ consumer.close();
+ }
+
+ /**
+ * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
+ */
+ protected void validateNoFilesOnLargeDir() throws Exception
+ {
+ File largeMessagesFileDir = new File(getLargeMessagesDir());
+
+ // Deleting the file is async... we keep looking for a period of the time until the file is really gone
+ for (int i = 0; i < 100; i++)
+ {
+ if (largeMessagesFileDir.listFiles().length > 0)
+ {
+ Thread.sleep(10);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ assertEquals(0, largeMessagesFileDir.listFiles().length);
+ }
+
+ protected OutputStream createFakeOutputStream() throws Exception
+ {
+
+ return new OutputStream()
+ {
+ private boolean closed = false;
+
+ private int count;
+
+ @Override
+ public void close() throws IOException
+ {
+ super.close();
+ closed = true;
+ }
+
+ @Override
+ public void write(final int b) throws IOException
+ {
+ if (count++ % 1024 * 1024 == 0)
+ {
+ //System.out.println("OutputStream received " + count + " bytes");
+ }
+ if (closed)
+ {
+ throw new IOException("Stream was closed");
+ }
+ }
+
+ };
+
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock (from rev 6637, trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock)
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnector.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnector.java 2009-04-30 16:08:56 UTC (rev 6637)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnector.java 2009-04-30 17:09:51 UTC (rev 6639)
@@ -20,7 +20,7 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.messaging.tests.integration.chunkmessage.mock;
+package org.jboss.messaging.tests.integration.largemessage.mock;
import java.util.Map;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnectorFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/mock/MockConnectorFactory.java 2009-04-30 16:08:56 UTC (rev 6637)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/mock/MockConnectorFactory.java 2009-04-30 17:09:51 UTC (rev 6639)
@@ -20,7 +20,7 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.messaging.tests.integration.chunkmessage.mock;
+package org.jboss.messaging.tests.integration.largemessage.mock;
import java.util.Map;
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java 2009-04-30 16:45:28 UTC (rev 6638)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/chunk/LargeMessageStressTest.java 2009-04-30 17:09:51 UTC (rev 6639)
@@ -22,7 +22,7 @@
package org.jboss.messaging.tests.stress.chunk;
-import org.jboss.messaging.tests.integration.chunkmessage.LargeMessageTestBase;
+import org.jboss.messaging.tests.integration.largemessage.LargeMessageTestBase;
/**
* A MessageChunkSoakTest
Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-04-30 16:45:28 UTC (rev 6638)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-04-30 17:09:51 UTC (rev 6639)
@@ -407,7 +407,6 @@
public void close() throws IOException
{
super.close();
- System.out.println("Sent " + count + " bytes over fakeOutputStream, while size = " + size);
closed = true;
}
More information about the jboss-cvs-commits
mailing list