[jboss-cvs] JBoss Messaging SVN: r6181 - in trunk/tests/src/org/jboss/messaging/tests/integration: client and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Mar 26 13:05:09 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-26 13:05:08 -0400 (Thu, 26 Mar 2009)
New Revision: 6181
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkCleanupTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageChunkTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
Log:
Moving MessageChunkTest to client... and separating specific failure tests to a messageChunk specific package
Added: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkCleanupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkCleanupTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkCleanupTest.java 2009-03-26 17:05:08 UTC (rev 6181)
@@ -0,0 +1,290 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.integration.chunkmessage;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientFileMessage;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnector;
+import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnectorFactory;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A ChunkCleanupTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ChunkCleanupTest extends ChunkTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ public void testCleanup() throws Exception
+ {
+ clearData();
+
+ createLargeFile(getLargeMessagesDir(), "1234.tmp", 13333);
+
+ Configuration config = createDefaultConfig();
+
+ messagingService = createService(true, config, new HashMap<String, AddressSettings>());
+
+ messagingService.start();
+
+ try
+ {
+
+ File directoryLarge = new File(getLargeMessagesDir());
+
+ assertEquals(0, directoryLarge.list().length);
+ }
+ finally
+ {
+ messagingService.stop();
+ }
+ }
+
+ public void testFailureOnSendingFile() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(20 * 1024);
+ config.setPagingGlobalWatermarkSize(10 * 1024);
+
+ messagingService = createService(true, config, new HashMap<String, AddressSettings>());
+
+ messagingService.start();
+
+ final int numberOfIntegersBigMessage = 150000;
+
+ ClientSession session = null;
+
+ class LocalCallback implements MockConnector.MockCallback
+ {
+
+ AtomicInteger counter = new AtomicInteger(0);
+
+ ClientSession session;
+
+ public void onWrite(final MessagingBuffer buffer)
+ {
+ if (counter.incrementAndGet() == 5)
+ {
+ RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+ RemotingServiceImpl remotingServiceImpl = (RemotingServiceImpl)messagingService.getServer()
+ .getRemotingService();
+ remotingServiceImpl.connectionException(conn.getID(),
+ new MessagingException(MessagingException.NOT_CONNECTED, "blah!"));
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+ throw new IllegalStateException("blah");
+ }
+ }
+
+ }
+
+ LocalCallback callback = new LocalCallback();
+
+ try
+ {
+ HashMap<String, Object> parameters = new HashMap<String, Object>();
+ parameters.put("callback", callback);
+
+ TransportConfiguration transport = new TransportConfiguration(MockConnectorFactory.class.getCanonicalName(),
+ parameters);
+
+ ClientSessionFactory mockFactory = new ClientSessionFactoryImpl(transport);
+
+ mockFactory.setBlockOnNonPersistentSend(false);
+ mockFactory.setBlockOnPersistentSend(false);
+ mockFactory.setBlockOnAcknowledge(false);
+
+ session = mockFactory.createSession(null, null, false, true, true, false, 0);
+
+ callback.session = session;
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientFileMessage clientLarge = createLargeClientMessage(session, numberOfIntegersBigMessage);
+
+ try
+ {
+ producer.send(clientLarge);
+ fail("Exception was expected!");
+ }
+ catch (Exception e)
+ {
+ }
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Exception ignored)
+ {
+ ignored.printStackTrace();
+ }
+ }
+
+ }
+
+ // Validate the functions to create and verify files
+ public void testFiles() throws Exception
+ {
+ clearData();
+
+ File file = createLargeFile(getTemporaryDir(), "test.tst", 13333);
+
+ checkFileRead(file, 13333);
+ }
+
+ public void testClearOnClientBuffer() throws Exception
+ {
+ clearData();
+
+ messagingService = createService(true);
+ messagingService.start();
+
+ final int numberOfIntegers = 10;
+ final int numberOfMessages = 100;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, true, false, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ File tmpData = createLargeFile(getTemporaryDir(), "someFile.dat", numberOfIntegers);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createFileMessage(true);
+ ((ClientFileMessage)message).setFile(tmpData);
+ message.putIntProperty(new SimpleString("counter-message"), i);
+ System.currentTimeMillis();
+ producer.send(message);
+ }
+
+ ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), ADDRESS);;
+
+ File clientfiles = new File(getClientLargeMessagesDir());
+
+ session.start();
+
+ ClientMessage msg = consumer.receive(1000);
+ msg.acknowledge();
+
+ for (int i = 0; i < 100; i++)
+ {
+ if (clientfiles.listFiles().length > 0)
+ {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
+ assertTrue(clientfiles.listFiles().length > 0);
+
+ session.close();
+
+ assertEquals(1, clientfiles.list().length); // 1 message was received, that should be kept
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-03-26 17:00:09 UTC (rev 6180)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2009-03-26 17:05:08 UTC (rev 6181)
@@ -1,955 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.chunkmessage;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import junit.framework.AssertionFailedError;
-
-import org.jboss.messaging.core.buffers.ChannelBuffers;
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientFileMessage;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
-import org.jboss.messaging.core.config.Configuration;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.server.impl.RemotingServiceImpl;
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.settings.impl.AddressSettings;
-import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnector;
-import org.jboss.messaging.tests.integration.chunkmessage.mock.MockConnectorFactory;
-import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.SimpleString;
-
-/**
- * A TestMessageChunk
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created 29-Sep-08 4:04:10 PM
- *
- *
- */
-public class MessageChunkTest extends ChunkTestBase
-{
-
- // Constants -----------------------------------------------------
-
- final static int RECEIVE_WAIT_TIME = 10000;
-
- // Attributes ----------------------------------------------------
-
- static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
- // Static --------------------------------------------------------
- private static final Logger log = Logger.getLogger(MessageChunkTest.class);
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testCleanup() throws Exception
- {
- clearData();
-
- createLargeFile(getLargeMessagesDir(), "1234.tmp", 13333);
-
- Configuration config = createDefaultConfig();
-
- messagingService = createService(true, config, new HashMap<String, AddressSettings>());
-
- messagingService.start();
-
- try
- {
-
- File directoryLarge = new File(getLargeMessagesDir());
-
- assertEquals(0, directoryLarge.list().length);
- }
- finally
- {
- messagingService.stop();
- }
- }
-
- public void testFailureOnSendingFile() throws Exception
- {
- clearData();
-
- Configuration config = createDefaultConfig();
-
- config.setPagingMaxGlobalSizeBytes(20 * 1024);
- config.setPagingGlobalWatermarkSize(10 * 1024);
-
- messagingService = createService(true, config, new HashMap<String, AddressSettings>());
-
- messagingService.start();
-
- final int numberOfIntegersBigMessage = 150000;
-
- ClientSession session = null;
-
- class LocalCallback implements MockConnector.MockCallback
- {
-
- AtomicInteger counter = new AtomicInteger(0);
-
- ClientSession session;
-
- public void onWrite(final MessagingBuffer buffer)
- {
- if (counter.incrementAndGet() == 5)
- {
- RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
- RemotingServiceImpl remotingServiceImpl = (RemotingServiceImpl)messagingService.getServer()
- .getRemotingService();
- remotingServiceImpl.connectionException(conn.getID(),
- new MessagingException(MessagingException.NOT_CONNECTED, "blah!"));
- conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
- throw new IllegalStateException("blah");
- }
- }
-
- }
-
- LocalCallback callback = new LocalCallback();
-
- try
- {
- HashMap<String, Object> parameters = new HashMap<String, Object>();
- parameters.put("callback", callback);
-
- TransportConfiguration transport = new TransportConfiguration(MockConnectorFactory.class.getCanonicalName(),
- parameters);
-
- ClientSessionFactory mockFactory = new ClientSessionFactoryImpl(transport);
-
- mockFactory.setBlockOnNonPersistentSend(false);
- mockFactory.setBlockOnPersistentSend(false);
- mockFactory.setBlockOnAcknowledge(false);
-
- session = mockFactory.createSession(null, null, false, true, true, false, 0);
-
- callback.session = session;
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- ClientFileMessage clientLarge = createLargeClientMessage(session, numberOfIntegersBigMessage);
-
- try
- {
- producer.send(clientLarge);
- fail("Exception was expected!");
- }
- catch (Exception e)
- {
- }
-
- validateNoFilesOnLargeDir();
-
- }
- finally
- {
- try
- {
- messagingService.stop();
- }
- catch (Exception ignored)
- {
- ignored.printStackTrace();
- }
- }
-
- }
-
- // Validate the functions to create and verify files
- public void testFiles() throws Exception
- {
- clearData();
-
- File file = createLargeFile(getTemporaryDir(), "test.tst", 13333);
-
- checkFileRead(file, 13333);
- }
-
- public void testClearOnClientBuffer() throws Exception
- {
- clearData();
-
- messagingService = createService(true);
- messagingService.start();
-
- final int numberOfIntegers = 10;
- final int numberOfMessages = 100;
-
- try
- {
- ClientSessionFactory sf = createInVMFactory();
-
- sf.setBlockOnNonPersistentSend(true);
- sf.setBlockOnPersistentSend(true);
- sf.setBlockOnAcknowledge(true);
-
- ClientSession session = sf.createSession(null, null, false, true, false, false, 0);
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- File tmpData = createLargeFile(getTemporaryDir(), "someFile.dat", numberOfIntegers);
-
- for (int i = 0; i < numberOfMessages; i++)
- {
- ClientMessage message = session.createFileMessage(true);
- ((ClientFileMessage)message).setFile(tmpData);
- message.putIntProperty(new SimpleString("counter-message"), i);
- System.currentTimeMillis();
- producer.send(message);
- }
-
- ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), ADDRESS);;
-
- File clientfiles = new File(getClientLargeMessagesDir());
-
- session.start();
-
- ClientMessage msg = consumer.receive(1000);
- msg.acknowledge();
-
- for (int i = 0; i < 100; i++)
- {
- if (clientfiles.listFiles().length > 0)
- {
- break;
- }
- Thread.sleep(100);
- }
-
- assertTrue(clientfiles.listFiles().length > 0);
-
- session.close();
-
- assertEquals(1, clientfiles.list().length); // 1 message was received, that should be kept
-
- validateNoFilesOnLargeDir();
-
- }
- finally
- {
- try
- {
- messagingService.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
- public void testMessageChunkFilePersistence() throws Exception
- {
- testChunks(false, true, false, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceXA() throws Exception
- {
- testChunks(true, true, false, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlocked() throws Exception
- {
- testChunks(false, true, false, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlockedXA() throws Exception
- {
- testChunks(true, true, false, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlockedPreACK() throws Exception
- {
- testChunks(false, true, false, true, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceBlockedPreACKXA() throws Exception
- {
- testChunks(true, true, false, true, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkFilePersistenceDelayed() throws Exception
- {
- testChunks(false, true, false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
- }
-
- public void testMessageChunkFilePersistenceDelayedXA() throws Exception
- {
- testChunks(true, true, false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
- }
-
- public void testMessageChunkNullPersistence() throws Exception
- {
- testChunks(false, false, false, false, false, true, 1, 50000, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkNullPersistenceXA() throws Exception
- {
- testChunks(true, false, false, false, false, true, 1, 50000, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testMessageChunkNullPersistenceDelayed() throws Exception
- {
- testChunks(false, false, false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
- }
-
- public void testMessageChunkNullPersistenceDelayedXA() throws Exception
- {
- testChunks(true, false, false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
- }
-
- public void testPageOnLargeMessage() throws Exception
- {
- testPageOnLargeMessage(true, false);
- }
-
- public void testPageOnLargeMessageNullPersistence() throws Exception
- {
- testPageOnLargeMessage(false, false);
-
- }
-
- public void testSendfileMessage() throws Exception
- {
- testChunks(false, true, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendfileMessageXA() throws Exception
- {
- testChunks(true, true, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendfileMessageOnNullPersistence() throws Exception
- {
- testChunks(false, false, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendfileMessageOnNullPersistenceXA() throws Exception
- {
- testChunks(true, false, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
- {
- testChunks(false, false, true, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendfileMessageOnNullPersistenceSmallMessageXA() throws Exception
- {
- testChunks(true, false, true, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendfileMessageSmallMessage() throws Exception
- {
- testChunks(false, true, true, false, false, true, 100, 4, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendfileMessageSmallMessageXA() throws Exception
- {
- testChunks(true, true, true, false, false, true, 100, 4, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendRegularMessageNullPersistence() throws Exception
- {
- testChunks(false, false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendRegularMessageNullPersistenceXA() throws Exception
- {
- testChunks(true, false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendRegularMessageNullPersistenceDelayed() throws Exception
- {
- testChunks(false, false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception
- {
- testChunks(true, false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testSendRegularMessagePersistence() throws Exception
- {
- testChunks(false, true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendRegularMessagePersistenceXA() throws Exception
- {
- testChunks(true, true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
- }
-
- public void testSendRegularMessagePersistenceDelayed() throws Exception
- {
- testChunks(false, true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testSendRegularMessagePersistenceDelayedXA() throws Exception
- {
- testChunks(false, true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
- }
-
- public void testTwoBindingsTwoStartedConsumers() throws Exception
- {
- // there are two bindings.. one is ACKed, the other is not, the server is restarted
- // The other binding is acked... The file must be deleted
-
- clearData();
-
- try
- {
-
- messagingService = createService(true);
-
- messagingService.start();
-
- SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(ADDRESS, queue[0], null, true);
- session.createQueue(ADDRESS, queue[1], null, true);
-
- int numberOfIntegers = 100000;
-
- Message clientFile = createLargeClientMessage(session, numberOfIntegers);
- // Message clientFile = createLargeClientMessage(session, numberOfIntegers);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- session.start();
-
- producer.send(clientFile);
-
- producer.close();
-
- ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), queue[1]);
- ClientMessage msg = consumer.receive(RECEIVE_WAIT_TIME);
- assertNull(consumer.receive(1000));
- assertNotNull(msg);
-
- msg.acknowledge();
- consumer.close();
-
- System.out.println("Stopping");
-
- session.stop();
-
- ClientConsumer consumer1 = session.createFileConsumer(new File(getClientLargeMessagesDir()), queue[0]);
-
- session.start();
-
- msg = consumer1.receive(RECEIVE_WAIT_TIME);
- assertNotNull(msg);
- msg.acknowledge();
- consumer1.close();
-
- session.commit();
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- messagingService.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
- public void testTwoBindingsAndRestart() throws Exception
- {
- testTwoBindings(true);
- }
-
- public void testTwoBindingsNoRestart() throws Exception
- {
- testTwoBindings(false);
- }
-
- public void testTwoBindings(final boolean restart) throws Exception
- {
- // there are two bindings.. one is ACKed, the other is not, the server is restarted
- // The other binding is acked... The file must be deleted
-
- clearData();
-
- try
- {
-
- messagingService = createService(true);
-
- messagingService.start();
-
- SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(ADDRESS, queue[0], null, true);
- session.createQueue(ADDRESS, queue[1], null, true);
-
- int numberOfIntegers = 100000;
-
- Message clientFile = createLargeClientMessage(session, numberOfIntegers);
- // Message clientFile = createLargeClientMessage(session, numberOfIntegers);
-
- ClientProducer producer = session.createProducer(ADDRESS);
- producer.send(clientFile);
-
- producer.close();
-
- readMessage(session, queue[1], numberOfIntegers);
-
- if (restart)
- {
- session.close();
-
- messagingService.stop();
-
- log.info("Restartning");
-
- messagingService = createService(true);
-
- messagingService.start();
-
- sf = createInVMFactory();
-
- session = sf.createSession(null, null, false, true, true, false, 0);
- }
-
- readMessage(session, queue[0], numberOfIntegers);
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- messagingService.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
- public void testSendRollbackXA() throws Exception
- {
- internalTestSendRollback(true);
- }
-
- public void testSendRollback() throws Exception
- {
- internalTestSendRollback(false);
- }
-
- private void internalTestSendRollback(final boolean isXA) throws Exception
- {
- clearData();
-
- messagingService = createService(true);
-
- messagingService.start();
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(isXA, false, false);
-
- session.createQueue(ADDRESS, ADDRESS, true);
-
- Xid xid = null;
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- Message clientFile = createLargeClientMessage(session, 50000, false);
-
- for (int i = 0; i < 1; i++)
- {
- producer.send(clientFile);
- }
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- session.rollback(xid);
- }
- else
- {
- session.rollback();
- }
-
- session.close();
-
- validateNoFilesOnLargeDir();
-
- messagingService.stop();
-
- }
-
- public void testSimpleRollback() throws Exception
- {
- simpleRollbackInternalTest(false);
- }
-
- public void testSimpleRollbackXA() throws Exception
- {
- simpleRollbackInternalTest(true);
- }
-
- public void simpleRollbackInternalTest(boolean isXA) throws Exception
- {
- // there are two bindings.. one is ACKed, the other is not, the server is restarted
- // The other binding is acked... The file must be deleted
-
- clearData();
-
- try
- {
-
- messagingService = createService(true);
-
- messagingService.start();
-
- ClientSessionFactory sf = createInVMFactory();
-
- ClientSession session = sf.createSession(isXA, false, false);
-
- Xid xid = null;
-
- if (isXA)
- {
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- int numberOfIntegers = 50000;
-
- session.start();
-
- log.info("Session started");
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- for (int n = 0; n < 10; n++)
- {
- Message clientFile = createLargeClientMessage(session, numberOfIntegers, n % 2 == 0);
-
- producer.send(clientFile);
-
- assertNull(consumer.receiveImmediate());
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.rollback();
- }
-
- producer.send(clientFile);
-
- assertNull(consumer.receiveImmediate());
-
- if (isXA)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.commit();
- }
-
- for (int i = 0; i < 2; i++)
- {
-
- ClientMessage clientMessage = consumer.receive(5000);
-
- assertNotNull(clientMessage);
-
- assertEquals(numberOfIntegers * 4, clientMessage.getBody().writerIndex());
-
- clientMessage.acknowledge();
-
- if (isXA)
- {
- if (i == 0)
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
- session.rollback(xid);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- else
- {
- session.end(xid, XAResource.TMSUCCESS);
- session.commit(xid, true);
- xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
- }
- }
- else
- {
- if (i == 0)
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
- }
- }
- }
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- messagingService.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- log.info("\n*********************************************************************************\n Starting " + this.getName() +
- "\n*********************************************************************************");
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- log.info("\n*********************************************************************************\nDone with " + this.getName() +
- "\n*********************************************************************************");
- super.tearDown();
- }
-
- protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception
- {
- clearData();
-
- Configuration config = createDefaultConfig();
-
- config.setPagingMaxGlobalSizeBytes(20 * 1024);
- config.setPagingGlobalWatermarkSize(10 * 1024);
-
- messagingService = createService(realFiles, config, new HashMap<String, AddressSettings>());
-
- messagingService.start();
-
- final int numberOfIntegers = 256;
-
- final int numberOfIntegersBigMessage = 100000;
-
- try
- {
- ClientSessionFactory sf = createInVMFactory();
-
- if (sendBlocking)
- {
- sf.setBlockOnNonPersistentSend(true);
- sf.setBlockOnPersistentSend(true);
- sf.setBlockOnAcknowledge(true);
- }
-
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- // printBuffer("body to be sent : " , body);
-
- ClientMessage message = null;
-
- MessagingBuffer body = null;
-
- for (int i = 0; i < 100; i++)
- {
- MessagingBuffer bodyLocal = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
-
- for (int j = 1; j <= numberOfIntegers; j++)
- {
- bodyLocal.writeInt(j);
- }
-
- if (i == 0)
- {
- body = bodyLocal;
- }
-
- message = session.createClientMessage(true);
- message.setBody(bodyLocal);
-
- producer.send(message);
- }
-
- ClientFileMessage clientFile = createLargeClientMessage(session, numberOfIntegersBigMessage);
-
- producer.send(clientFile);
-
- session.close();
-
- if (realFiles)
- {
- messagingService.stop();
-
- messagingService = createService(true, config, new HashMap<String, AddressSettings>());
- messagingService.start();
-
- sf = createInVMFactory();
- }
-
- session = sf.createSession(null, null, false, true, true, false, 0);
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- session.start();
-
- for (int i = 0; i < 100; i++)
- {
- ClientMessage message2 = consumer.receive(RECEIVE_WAIT_TIME);
-
- log.info("got message " + i);
-
- assertNotNull(message2);
-
- message2.acknowledge();
-
- assertNotNull(message2);
-
- try
- {
- assertEqualsByteArrays(body.writerIndex(), body.array(), message2.getBody().array());
- }
- catch (AssertionFailedError e)
- {
- log.info("Expected buffer:" + dumbBytesHex(body.array(), 40));
- log.info("Arriving buffer:" + dumbBytesHex(message2.getBody().array(), 40));
- throw e;
- }
- }
-
- consumer.close();
-
- session.close();
-
- session = sf.createSession(null, null, false, true, true, false, 0);
-
- readMessage(session, ADDRESS, numberOfIntegersBigMessage);
-
- // printBuffer("message received : ", message2.getBody());
-
- session.close();
- }
- finally
- {
- try
- {
- messagingService.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
-
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageChunkTest.java (from rev 6174, trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageChunkTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageChunkTest.java 2009-03-26 17:05:08 UTC (rev 6181)
@@ -0,0 +1,744 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.client;
+
+import java.io.File;
+import java.util.HashMap;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import junit.framework.AssertionFailedError;
+
+import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientFileMessage;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.Message;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.tests.integration.chunkmessage.ChunkTestBase;
+import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A TestMessageChunk
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created 29-Sep-08 4:04:10 PM
+ *
+ *
+ */
+public class MessageChunkTest extends ChunkTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ final static int RECEIVE_WAIT_TIME = 10000;
+
+ // Attributes ----------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ // Static --------------------------------------------------------
+ private static final Logger log = Logger.getLogger(MessageChunkTest.class);
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+ public void testMessageChunkFilePersistence() throws Exception
+ {
+ testChunks(false, true, false, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testMessageChunkFilePersistenceXA() throws Exception
+ {
+ testChunks(true, true, false, false, false, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testMessageChunkFilePersistenceBlocked() throws Exception
+ {
+ testChunks(false, true, false, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testMessageChunkFilePersistenceBlockedXA() throws Exception
+ {
+ testChunks(true, true, false, false, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testMessageChunkFilePersistenceBlockedPreACK() throws Exception
+ {
+ testChunks(false, true, false, true, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testMessageChunkFilePersistenceBlockedPreACKXA() throws Exception
+ {
+ testChunks(true, true, false, true, true, true, 100, 262144, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testMessageChunkFilePersistenceDelayed() throws Exception
+ {
+ testChunks(false, true, false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
+ }
+
+ public void testMessageChunkFilePersistenceDelayedXA() throws Exception
+ {
+ testChunks(true, true, false, false, false, false, 1, 50000, RECEIVE_WAIT_TIME, 2000);
+ }
+
+ public void testMessageChunkNullPersistence() throws Exception
+ {
+ testChunks(false, false, false, false, false, true, 1, 50000, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testMessageChunkNullPersistenceXA() throws Exception
+ {
+ testChunks(true, false, false, false, false, true, 1, 50000, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testMessageChunkNullPersistenceDelayed() throws Exception
+ {
+ testChunks(false, false, false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
+ }
+
+ public void testMessageChunkNullPersistenceDelayedXA() throws Exception
+ {
+ testChunks(true, false, false, false, false, false, 100, 50000, RECEIVE_WAIT_TIME, 100);
+ }
+
+ public void testPageOnLargeMessage() throws Exception
+ {
+ testPageOnLargeMessage(true, false);
+ }
+
+ public void testPageOnLargeMessageNullPersistence() throws Exception
+ {
+ testPageOnLargeMessage(false, false);
+
+ }
+
+ public void testSendfileMessage() throws Exception
+ {
+ testChunks(false, true, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendfileMessageXA() throws Exception
+ {
+ testChunks(true, true, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendfileMessageOnNullPersistence() throws Exception
+ {
+ testChunks(false, false, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendfileMessageOnNullPersistenceXA() throws Exception
+ {
+ testChunks(true, false, true, false, false, true, 100, 50000, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
+ {
+ testChunks(false, false, true, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendfileMessageOnNullPersistenceSmallMessageXA() throws Exception
+ {
+ testChunks(true, false, true, false, true, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendfileMessageSmallMessage() throws Exception
+ {
+ testChunks(false, true, true, false, false, true, 100, 4, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendfileMessageSmallMessageXA() throws Exception
+ {
+ testChunks(true, true, true, false, false, true, 100, 4, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendRegularMessageNullPersistence() throws Exception
+ {
+ testChunks(false, false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendRegularMessageNullPersistenceXA() throws Exception
+ {
+ testChunks(true, false, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendRegularMessageNullPersistenceDelayed() throws Exception
+ {
+ testChunks(false, false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ public void testSendRegularMessageNullPersistenceDelayedXA() throws Exception
+ {
+ testChunks(true, false, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ public void testSendRegularMessagePersistence() throws Exception
+ {
+ testChunks(false, true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendRegularMessagePersistenceXA() throws Exception
+ {
+ testChunks(true, true, false, false, false, true, 100, 100, RECEIVE_WAIT_TIME, 0);
+ }
+
+ public void testSendRegularMessagePersistenceDelayed() throws Exception
+ {
+ testChunks(false, true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ public void testSendRegularMessagePersistenceDelayedXA() throws Exception
+ {
+ testChunks(false, true, false, false, false, false, 100, 100, RECEIVE_WAIT_TIME, 1000);
+ }
+
+ public void testTwoBindingsTwoStartedConsumers() throws Exception
+ {
+ // there are two bindings.. one is ACKed, the other is not, the server is restarted
+ // The other binding is acked... The file must be deleted
+
+ clearData();
+
+ try
+ {
+
+ messagingService = createService(true);
+
+ messagingService.start();
+
+ SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, queue[0], null, true);
+ session.createQueue(ADDRESS, queue[1], null, true);
+
+ int numberOfIntegers = 100000;
+
+ Message clientFile = createLargeClientMessage(session, numberOfIntegers);
+ // Message clientFile = createLargeClientMessage(session, numberOfIntegers);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ session.start();
+
+ producer.send(clientFile);
+
+ producer.close();
+
+ ClientConsumer consumer = session.createFileConsumer(new File(getClientLargeMessagesDir()), queue[1]);
+ ClientMessage msg = consumer.receive(RECEIVE_WAIT_TIME);
+ assertNull(consumer.receive(1000));
+ assertNotNull(msg);
+
+ msg.acknowledge();
+ consumer.close();
+
+ System.out.println("Stopping");
+
+ session.stop();
+
+ ClientConsumer consumer1 = session.createFileConsumer(new File(getClientLargeMessagesDir()), queue[0]);
+
+ session.start();
+
+ msg = consumer1.receive(RECEIVE_WAIT_TIME);
+ assertNotNull(msg);
+ msg.acknowledge();
+ consumer1.close();
+
+ session.commit();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testTwoBindingsAndRestart() throws Exception
+ {
+ testTwoBindings(true);
+ }
+
+ public void testTwoBindingsNoRestart() throws Exception
+ {
+ testTwoBindings(false);
+ }
+
+ public void testTwoBindings(final boolean restart) throws Exception
+ {
+ // there are two bindings.. one is ACKed, the other is not, the server is restarted
+ // The other binding is acked... The file must be deleted
+
+ clearData();
+
+ try
+ {
+
+ messagingService = createService(true);
+
+ messagingService.start();
+
+ SimpleString queue[] = new SimpleString[] { new SimpleString("queue1"), new SimpleString("queue2") };
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, queue[0], null, true);
+ session.createQueue(ADDRESS, queue[1], null, true);
+
+ int numberOfIntegers = 100000;
+
+ Message clientFile = createLargeClientMessage(session, numberOfIntegers);
+ // Message clientFile = createLargeClientMessage(session, numberOfIntegers);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+ producer.send(clientFile);
+
+ producer.close();
+
+ readMessage(session, queue[1], numberOfIntegers);
+
+ if (restart)
+ {
+ session.close();
+
+ messagingService.stop();
+
+ log.info("Restartning");
+
+ messagingService = createService(true);
+
+ messagingService.start();
+
+ sf = createInVMFactory();
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+ }
+
+ readMessage(session, queue[0], numberOfIntegers);
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testSendRollbackXA() throws Exception
+ {
+ internalTestSendRollback(true);
+ }
+
+ public void testSendRollback() throws Exception
+ {
+ internalTestSendRollback(false);
+ }
+
+ private void internalTestSendRollback(final boolean isXA) throws Exception
+ {
+ clearData();
+
+ messagingService = createService(true);
+
+ messagingService.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(isXA, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ Xid xid = null;
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, 50000, false);
+
+ for (int i = 0; i < 1; i++)
+ {
+ producer.send(clientFile);
+ }
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.rollback(xid);
+ }
+ else
+ {
+ session.rollback();
+ }
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+
+ messagingService.stop();
+
+ }
+
+ public void testSimpleRollback() throws Exception
+ {
+ simpleRollbackInternalTest(false);
+ }
+
+ public void testSimpleRollbackXA() throws Exception
+ {
+ simpleRollbackInternalTest(true);
+ }
+
+ public void simpleRollbackInternalTest(boolean isXA) throws Exception
+ {
+ // there are two bindings.. one is ACKed, the other is not, the server is restarted
+ // The other binding is acked... The file must be deleted
+
+ clearData();
+
+ try
+ {
+
+ messagingService = createService(true);
+
+ messagingService.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(isXA, false, false);
+
+ Xid xid = null;
+
+ if (isXA)
+ {
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ int numberOfIntegers = 50000;
+
+ session.start();
+
+ log.info("Session started");
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ for (int n = 0; n < 10; n++)
+ {
+ Message clientFile = createLargeClientMessage(session, numberOfIntegers, n % 2 == 0);
+
+ producer.send(clientFile);
+
+ assertNull(consumer.receiveImmediate());
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.rollback();
+ }
+
+ producer.send(clientFile);
+
+ assertNull(consumer.receiveImmediate());
+
+ if (isXA)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.commit();
+ }
+
+ for (int i = 0; i < 2; i++)
+ {
+
+ ClientMessage clientMessage = consumer.receive(5000);
+
+ assertNotNull(clientMessage);
+
+ assertEquals(numberOfIntegers * 4, clientMessage.getBody().writerIndex());
+
+ clientMessage.acknowledge();
+
+ if (isXA)
+ {
+ if (i == 0)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.rollback(xid);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+ }
+ else
+ {
+ if (i == 0)
+ {
+ session.rollback();
+ }
+ else
+ {
+ session.commit();
+ }
+ }
+ }
+ }
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ log.info("\n*********************************************************************************\n Starting " + this.getName() +
+ "\n*********************************************************************************");
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ log.info("\n*********************************************************************************\nDone with " + this.getName() +
+ "\n*********************************************************************************");
+ super.tearDown();
+ }
+
+ protected void testPageOnLargeMessage(final boolean realFiles, final boolean sendBlocking) throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(20 * 1024);
+ config.setPagingGlobalWatermarkSize(10 * 1024);
+
+ messagingService = createService(realFiles, config, new HashMap<String, AddressSettings>());
+
+ messagingService.start();
+
+ final int numberOfIntegers = 256;
+
+ final int numberOfIntegersBigMessage = 100000;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ if (sendBlocking)
+ {
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+ }
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ // printBuffer("body to be sent : " , body);
+
+ ClientMessage message = null;
+
+ MessagingBuffer body = null;
+
+ for (int i = 0; i < 100; i++)
+ {
+ MessagingBuffer bodyLocal = ChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
+
+ for (int j = 1; j <= numberOfIntegers; j++)
+ {
+ bodyLocal.writeInt(j);
+ }
+
+ if (i == 0)
+ {
+ body = bodyLocal;
+ }
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+
+ producer.send(message);
+ }
+
+ ClientFileMessage clientFile = createLargeClientMessage(session, numberOfIntegersBigMessage);
+
+ producer.send(clientFile);
+
+ session.close();
+
+ if (realFiles)
+ {
+ messagingService.stop();
+
+ messagingService = createService(true, config, new HashMap<String, AddressSettings>());
+ messagingService.start();
+
+ sf = createInVMFactory();
+ }
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message2 = consumer.receive(RECEIVE_WAIT_TIME);
+
+ log.info("got message " + i);
+
+ assertNotNull(message2);
+
+ message2.acknowledge();
+
+ assertNotNull(message2);
+
+ try
+ {
+ assertEqualsByteArrays(body.writerIndex(), body.array(), message2.getBody().array());
+ }
+ catch (AssertionFailedError e)
+ {
+ log.info("Expected buffer:" + dumbBytesHex(body.array(), 40));
+ log.info("Arriving buffer:" + dumbBytesHex(message2.getBody().array(), 40));
+ throw e;
+ }
+ }
+
+ consumer.close();
+
+ session.close();
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ readMessage(session, ADDRESS, numberOfIntegersBigMessage);
+
+ // printBuffer("message received : ", message2.getBody());
+
+ session.close();
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/integration/client/MessageChunkTest.java
___________________________________________________________________
Name: svn:mergeinfo
+
More information about the jboss-cvs-commits
mailing list