[jboss-cvs] JBoss Messaging SVN: r5194 - in branches/Branch_Chunk_CRS2: src/main/org/jboss/messaging/core/journal/impl and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 28 19:58:25 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-28 19:58:25 -0400 (Tue, 28 Oct 2008)
New Revision: 5194
Modified:
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/Message.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Dealing with flow control on message chunks
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java 2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/client/impl/FileClientMessageImpl.java 2008-10-28 23:58:25 UTC (rev 5194)
@@ -57,15 +57,12 @@
{
super();
}
-
- public FileClientMessageImpl(boolean durable)
+ public FileClientMessageImpl(final boolean durable)
{
super(durable, null);
}
-
-
/**
* @param type
* @param durable
@@ -74,32 +71,28 @@
* @param priority
* @param body
*/
- public FileClientMessageImpl(byte type,
- boolean durable,
- long expiration,
- long timestamp,
- byte priority,
- MessagingBuffer body)
+ public FileClientMessageImpl(final byte type,
+ final boolean durable,
+ final long expiration,
+ final long timestamp,
+ final byte priority,
+ final MessagingBuffer body)
{
super(type, durable, expiration, timestamp, priority, body);
// TODO Auto-generated constructor stub
}
-
-
/**
* @param type
* @param durable
* @param body
*/
- public FileClientMessageImpl(byte type, boolean durable, MessagingBuffer body)
+ public FileClientMessageImpl(final byte type, final boolean durable, final MessagingBuffer body)
{
super(type, durable, body);
// TODO Auto-generated constructor stub
}
-
-
/**
* @param deliveryCount
*/
@@ -167,27 +160,27 @@
}
}
}
-
- public synchronized void encodeBody(MessagingBuffer buffer, int start, int size)
+
+ @Override
+ public synchronized void encodeBody(final MessagingBuffer buffer, final long start, final int size)
{
try
{
FileChannel channel = getChannel();
-
+
ByteBuffer bufferRead = ByteBuffer.allocate(size);
-
+
channel.position(start);
channel.read(bufferRead);
-
+
buffer.putBytes(bufferRead.array());
}
catch (Exception e)
{
throw new RuntimeException(e.getMessage(), e);
}
-
+
}
-
@Override
public void setBody(final MessagingBuffer body)
@@ -246,13 +239,13 @@
{
RandomAccessFile randomFile = new RandomAccessFile(getFile(), "rw");
randomFile.seek(0);
-
+
FileChannel channel = randomFile.getChannel();
return channel;
}
catch (IOException e)
{
- throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
+ throw new MessagingException(MessagingException.INTERNAL_ERROR, e.getMessage(), e);
}
}
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-10-28 23:58:25 UTC (rev 5194)
@@ -40,6 +40,11 @@
public NIOSequentialFileFactory(final String journalDir)
{
super(journalDir);
+
+ if (journalDir == null)
+ {
+ new Exception ("journalDir is null").printStackTrace();
+ }
}
// maxIO is ignored on NIO
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/Message.java 2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/Message.java 2008-10-28 23:58:25 UTC (rev 5194)
@@ -87,7 +87,7 @@
// Used on Message chunk
- void encodeBody(MessagingBuffer buffer, int start, int size);
+ void encodeBody(MessagingBuffer buffer, long start, int size);
void encodeBody(MessagingBuffer buffer);
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-10-28 23:58:25 UTC (rev 5194)
@@ -193,9 +193,9 @@
}
// Used on Message chunk
- public void encodeBody(MessagingBuffer buffer, int start, int size)
+ public void encodeBody(MessagingBuffer buffer, long start, int size)
{
- buffer.putBytes(body.array(), start, size);
+ buffer.putBytes(body.array(), (int)start, size);
}
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-28 23:58:25 UTC (rev 5194)
@@ -88,7 +88,7 @@
}
@Override
- public synchronized void encodeBody(final MessagingBuffer bufferOut, final int start, final int size)
+ public synchronized void encodeBody(final MessagingBuffer bufferOut, final long start, final int size)
{
new Exception ("Encode body");
validateFile();
@@ -220,6 +220,7 @@
{
if (file.isOpen())
{
+ System.out.println("Closing file " + file);
try
{
file.close();
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-28 23:58:25 UTC (rev 5194)
@@ -949,12 +949,15 @@
*/
private void cleanupIncompleteFiles() throws Exception
{
- List<String> tmpFiles = this.largeMessagesFactory.listFiles("tmp");
- for (String tmpFile: tmpFiles)
+ if (largeMessagesFactory != null)
{
- SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
- System.out.println("cleaning up file " + file);
- file.delete();
+ List<String> tmpFiles = this.largeMessagesFactory.listFiles("tmp");
+ for (String tmpFile : tmpFiles)
+ {
+ SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
+ System.out.println("cleaning up file " + file);
+ file.delete();
+ }
}
}
Modified: branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-10-28 23:58:25 UTC (rev 5194)
@@ -27,6 +27,7 @@
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -91,7 +92,7 @@
private final Lock lock = new ReentrantLock();
- private final Semaphore availableCredits;
+ private final AtomicInteger availableCredits;
private boolean started;
@@ -107,6 +108,13 @@
private final PostOffice postOffice;
private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
+
+ /** The current message being processed */
+ private volatile ServerLargeMessage pendingLargeMessage;
+
+ /** The current position on the message being processed */
+ private volatile long positionLargeMessage;
+
private final Channel channel;
@@ -140,7 +148,7 @@
if (enableFlowControl)
{
- availableCredits = new Semaphore(0);
+ availableCredits = new AtomicInteger(0);
}
else
{
@@ -170,7 +178,7 @@
public HandleStatus handle(final MessageReference ref) throws Exception
{
- if (availableCredits != null && availableCredits.availablePermits() <= 0)
+ if (availableCredits != null && availableCredits.get() <= 0)
{
return HandleStatus.BUSY;
}
@@ -189,6 +197,12 @@
try
{
+
+ if (pendingLargeMessage != null)
+ {
+ new Exception("Busy because of pendingLargeMessage").printStackTrace();
+ return HandleStatus.BUSY;
+ }
// If the consumer is stopped then we don't accept the message, it
// should go back into the
@@ -215,10 +229,13 @@
if (message instanceof ServerLargeMessage)
{
+// Todo: How to handle large-files on clustering
// if (result == null)
// {
// Not replicated - just send now
- sendChunks((ServerLargeMessage)message);
+ pendingLargeMessage = (ServerLargeMessage)message;
+ positionLargeMessage = 0;
+ sendChunks();
// }
// else
// {
@@ -245,7 +262,7 @@
if (availableCredits != null)
{
- availableCredits.release(message.getEncodeSize());
+ availableCredits.addAndGet(-message.getEncodeSize());
}
if (result == null)
@@ -369,10 +386,7 @@
{
if (availableCredits != null)
{
- int previous;
-
- previous = availableCredits.availablePermits();
- availableCredits.release(credits);
+ int previous = availableCredits.getAndAdd(credits);
if (previous <= 0 && previous + credits > 0)
{
@@ -460,59 +474,104 @@
// Private
// --------------------------------------------------------------------------------------
- /**
- * @param message
- * @throws MessagingException
- */
- private void sendChunks(ServerLargeMessage message) throws Exception
+ private boolean sendChunks()
{
- final int bodySize = message.getBodySize();
- int chunkLength = 0;
+ lock.lock();
- SessionSendChunkMessage chunk = null;
-
- for (int pos = 0; pos < bodySize; pos += chunkLength)
+ try
{
- if (pos == 0)
+ final long bodySize = pendingLargeMessage.getBodySize();
+
+ int chunkLength = 0;
+
+ SessionSendChunkMessage chunk = null;
+
+ for (; positionLargeMessage < bodySize; positionLargeMessage += chunkLength)
{
- int headerSize = message.getPropertiesEncodeSize();
-
- chunkLength = minLargeMessageSize - headerSize;
-
- MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(message.getPropertiesEncodeSize()));
- message.encodeProperties(headerBuffer);
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
- message.encodeBody(bodyBuffer, 0, chunkLength);
-
- chunk = new SessionSendChunkMessage(id,
- headerBuffer.array(),
- bodyBuffer.array(),
- chunkLength < bodySize,
- false);
+ if (availableCredits.get() <= 0)
+ {
+ System.out.println("Cancelling.. not enough credits");
+ return false;
+ }
+ else
+ {
+ System.out.println("good!!!");
+ }
+
+ if (positionLargeMessage == 0)
+ {
+ int headerSize = pendingLargeMessage.getPropertiesEncodeSize();
+
+ chunkLength = minLargeMessageSize - headerSize;
+
+ MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
+ pendingLargeMessage.encodeProperties(headerBuffer);
+
+ MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)chunkLength));
+ pendingLargeMessage.encodeBody(bodyBuffer, 0, chunkLength);
+
+ if (availableCredits != null)
+ {
+ availableCredits.addAndGet(-chunkLength);
+ }
+
+ chunk = new SessionSendChunkMessage(id,
+ headerBuffer.array(),
+ bodyBuffer.array(),
+ chunkLength < bodySize,
+ false);
+ }
+ else
+ {
+ chunkLength = (int)Math.min(bodySize - positionLargeMessage, minLargeMessageSize);
+
+ if (availableCredits != null)
+ {
+ int leftCredits = availableCredits.addAndGet(-chunkLength);
+// if (leftCredits < 0)
+// {
+// if (chunkLength > 0)
+// {
+// availableCredits.addAndGet(-leftCredits);
+// }
+// else
+// {
+// // sanity check only.. it shouldn't happen
+// // This next statement means, we didn't have enough credit to send anything, so we return the credits and give up sending
+// availableCredits.addAndGet(chunkLength);
+// return false;
+// }
+// }
+ }
+
+ MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)chunkLength));
+
+ pendingLargeMessage.encodeBody(bodyBuffer, positionLargeMessage, chunkLength);
+
+ chunk = new SessionSendChunkMessage(id,
+ null,
+ bodyBuffer.array(),
+ positionLargeMessage + chunkLength < bodySize,
+ false);
+ }
+
+ channel.send(chunk);
}
- else
- {
- chunkLength = Math.min(bodySize - pos, minLargeMessageSize);
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
+
+ pendingLargeMessage.releaseResources();
+ this.pendingLargeMessage = null;
+ this.positionLargeMessage = -1;
+
+ return true;
+ }
+ finally
+ {
+ lock.unlock();
+ }
- message.encodeBody(bodyBuffer, pos, chunkLength);
-
-
- chunk = new SessionSendChunkMessage(id, null, bodyBuffer.array(), pos + chunkLength < bodySize, false);
- }
-
- if (availableCredits != null)
- {
- availableCredits.acquire(chunk.getPacketSize());
- }
-
- channel.send(chunk);
- }
-
- message.releaseResources();
}
private void doClose() throws Exception
@@ -543,7 +602,18 @@
private void promptDelivery()
{
- session.promptDelivery(messageQueue);
+ if (pendingLargeMessage != null)
+ {
+ if (sendChunks())
+ {
+ // prompt Delivery only if chunk was finished
+ session.promptDelivery(messageQueue);
+ }
+ }
+ else
+ {
+ session.promptDelivery(messageQueue);
+ }
}
// Inner classes
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-28 23:58:25 UTC (rev 5194)
@@ -230,7 +230,7 @@
public void testMessageChunkNullPersistenceDelayed() throws Exception
{
- testInternal(false, false, 100, 50000, false, 1000, 100);
+ testInternal(false, false, 100, 50000, false, 10000, 100);
}
public void testMessageChunkFilePersistence() throws Exception
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java 2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java 2008-10-28 23:58:25 UTC (rev 5194)
@@ -21,70 +21,51 @@
*/
package org.jboss.messaging.tests.integration.scheduling;
+import java.util.Calendar;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.MessagingService;
-import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.impl.XidImpl;
import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.util.SimpleString;
import org.jboss.util.id.GUID;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-import java.io.File;
-import java.util.Calendar;
-
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
-public class ScheduledMessageTest extends UnitTestCase
+public class ScheduledMessageTest extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(ScheduledMessageTest.class);
- private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
-
- private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
-
- private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/journal";
-
- private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/bindings";
-
- private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/ScheduledMessageRecoveryTest/page";
-
private SimpleString atestq = new SimpleString("ascheduledtestq");
private SimpleString atestq2 = new SimpleString("ascheduledtestq2");
private MessagingService messagingService;
- private ConfigurationImpl configuration;
+ private Configuration configuration;
protected void setUp() throws Exception
{
- File file = new File(journalDir);
- File file2 = new File(bindingsDir);
- File file3 = new File(pageDir);
- deleteDirectory(file);
- file.mkdirs();
- deleteDirectory(file2);
- file2.mkdirs();
- deleteDirectory(file3);
- file3.mkdirs();
- configuration = new ConfigurationImpl();
+ super.clearData();
+ configuration = createDefaultConfig();
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
- configuration.setPagingDirectory(pageDir);
+ configuration.setPagingMaxGlobalSizeBytes(-1);
+ messagingService = createService(true, configuration);
+ messagingService.start();
}
protected void tearDown() throws Exception
@@ -101,9 +82,7 @@
// ignore
}
}
- new File(journalDir).delete();
- new File(bindingsDir).delete();
- new File(pageDir).delete();
+ clearData();
}
public void testRecoveredMessageDeliveredCorrectly() throws Exception
@@ -159,14 +138,8 @@
public void testPagedMessageDeliveredCorrectly() throws Exception
{
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- configuration.setPagingMaxGlobalSizeBytes(0);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
- // start the server
- messagingService.start();
// then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSessionFactory sessionFactory = createInVMFactory();
ClientSession session = sessionFactory.createSession(false, true, false, false);
session.createQueue(atestq, atestq, null, true, true);
ClientProducer producer = session.createProducer(atestq);
@@ -197,17 +170,11 @@
public void testPagedMessageDeliveredMultipleConsumersCorrectly() throws Exception
{
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- configuration.setPagingMaxGlobalSizeBytes(0);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
- // start the server
- messagingService.start();
QueueSettings qs = new QueueSettings();
qs.setRedeliveryDelay(5000l);
messagingService.getServer().getQueueSettingsRepository().addMatch(atestq2.toString(), qs);
// then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSessionFactory sessionFactory = createInVMFactory();
ClientSession session = sessionFactory.createSession(false, true, false, false);
session.createQueue(atestq, atestq, null, true, true);
session.createQueue(atestq, atestq2, null, true, true);
@@ -252,17 +219,11 @@
public void testPagedMessageDeliveredMultipleConsumersAfterRecoverCorrectly() throws Exception
{
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- configuration.setPagingMaxGlobalSizeBytes(0);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
- // start the server
- messagingService.start();
QueueSettings qs = new QueueSettings();
qs.setRedeliveryDelay(5000l);
messagingService.getServer().getQueueSettingsRepository().addMatch(atestq2.toString(), qs);
// then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSessionFactory sessionFactory = createInVMFactory();
ClientSession session = sessionFactory.createSession(false, true, false, false);
session.createQueue(atestq, atestq, null, true, true);
session.createQueue(atestq, atestq2, null, true, true);
@@ -288,9 +249,9 @@
session.close();
messagingService.stop();
messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+ messagingService = createService(true, configuration);
messagingService.start();
- sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ sessionFactory = createInVMFactory();
session = sessionFactory.createSession(false, true, true, false);
consumer = session.createConsumer(atestq);
consumer2 = session.createConsumer(atestq2);
@@ -316,13 +277,8 @@
public void testMessageDeliveredCorrectly(boolean recover) throws Exception
{
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
- // start the server
- messagingService.start();
- // then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ // then we create a client as normal
+ ClientSessionFactory sessionFactory = createInVMFactory();
ClientSession session = sessionFactory.createSession(false, true, false, false);
session.createQueue(atestq, atestq, null, true, true);
ClientProducer producer = session.createProducer(atestq);
@@ -344,9 +300,9 @@
session.close();
messagingService.stop();
messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+ messagingService = createService(true, configuration);
messagingService.start();
- sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ sessionFactory = createInVMFactory();
session = sessionFactory.createSession(false, true, true, false);
}
ClientConsumer consumer = session.createConsumer(atestq);
@@ -370,13 +326,7 @@
public void testScheduledMessagesDeliveredCorrectly(boolean recover) throws Exception
{
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
- // start the server
- messagingService.start();
- // then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSessionFactory sessionFactory = createInVMFactory();
ClientSession session = sessionFactory.createSession(false, true, false, false);
session.createQueue(atestq, atestq, null, true, true);
ClientProducer producer = session.createProducer(atestq);
@@ -403,10 +353,10 @@
session.close();
messagingService.stop();
messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+ messagingService = createService(true, configuration);
messagingService.start();
- sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ sessionFactory = createInVMFactory();
session = sessionFactory.createSession(false, true, true, false);
}
@@ -451,13 +401,7 @@
public void testScheduledMessagesDeliveredCorrectlyDifferentOrder(boolean recover) throws Exception
{
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
- // start the server
- messagingService.start();
- // then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSessionFactory sessionFactory = createInVMFactory();
ClientSession session = sessionFactory.createSession(false, true, false, false);
session.createQueue(atestq, atestq, null, true, true);
ClientProducer producer = session.createProducer(atestq);
@@ -485,10 +429,10 @@
session.close();
messagingService.stop();
messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+ messagingService = createService(true, configuration);
messagingService.start();
- sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ sessionFactory = createInVMFactory();
session = sessionFactory.createSession(false, true, true, false);
@@ -533,13 +477,7 @@
public void testScheduledAndNormalMessagesDeliveredCorrectly(boolean recover) throws Exception
{
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
- // start the server
- messagingService.start();
- // then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ ClientSessionFactory sessionFactory = createInVMFactory();
ClientSession session = sessionFactory.createSession(false, true, false, false);
session.createQueue(atestq, atestq, null, true, true);
ClientProducer producer = session.createProducer(atestq);
@@ -565,10 +503,10 @@
session.close();
messagingService.stop();
messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+ messagingService = createService(true, configuration);
messagingService.start();
- sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ sessionFactory = createInVMFactory();
session = sessionFactory.createSession(false, true, true, false);
}
@@ -609,13 +547,9 @@
{
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
Xid xid2 = new XidImpl("xa2".getBytes(), 1, new GUID().toString().getBytes());
- TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
- configuration.getAcceptorConfigurations().add(transportConfig);
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
- // start the server
- messagingService.start();
- // then we create a client as normal
- ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+
+
+ ClientSessionFactory sessionFactory = createInVMFactory();
ClientSession session = sessionFactory.createSession(true, false, false, false);
session.createQueue(atestq, atestq, null, true, false);
session.start(xid, XAResource.TMNOFLAGS);
@@ -639,10 +573,10 @@
session.close();
messagingService.stop();
messagingService = null;
- messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir, null);
+ messagingService = createService(true, configuration);
messagingService.start();
- sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ sessionFactory = createInVMFactory();
session = sessionFactory.createSession(true, false, false, false);
}
Modified: branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-10-28 17:40:46 UTC (rev 5193)
+++ branches/Branch_Chunk_CRS2/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-10-28 23:58:25 UTC (rev 5194)
@@ -138,6 +138,11 @@
return createService(realFiles, createDefaultConfig(), new HashMap<String, QueueSettings>());
}
+ protected MessagingService createService(final boolean realFiles, final Configuration configuration)
+ {
+ return createService(realFiles, configuration, new HashMap<String, QueueSettings>());
+ }
+
protected Configuration createDefaultConfig()
{
return createDefaultConfig(false);
@@ -164,6 +169,7 @@
configuration.setJournalMinFiles(2);
configuration.setJournalFileSize(100 * 1024);
configuration.setPagingDirectory(pageDir);
+ configuration.setLargeMessagesDirectory(largeMessagesDir);
configuration.getAcceptorConfigurations().clear();
More information about the jboss-cvs-commits
mailing list