[jboss-cvs] JBoss Messaging SVN: r7628 - in trunk: src/main/org/jboss/messaging/core/client/impl and 12 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 27 23:21:24 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-07-27 23:21:22 -0400 (Mon, 27 Jul 2009)
New Revision: 7628
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
trunk/src/main/org/jboss/messaging/core/client/LargeMessageBuffer.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControl.java
trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControlImpl.java
trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/BridgeTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/LargeMessageBufferTest.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1601 - Implementing Client cache for LargeMessages
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -62,6 +62,10 @@
long getClientFailureCheckPeriod();
void setClientFailureCheckPeriod(long clientFailureCheckPeriod);
+
+ boolean isCacheLargeMessagesClient();
+
+ void setCacheLargeMessagesClient(boolean cached);
long getConnectionTTL();
Modified: trunk/src/main/org/jboss/messaging/core/client/LargeMessageBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/LargeMessageBuffer.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/LargeMessageBuffer.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -42,6 +42,8 @@
void close();
+ void cancel();
+
void setOutputStream(final OutputStream output) throws MessagingException;
void saveBuffer(final OutputStream output) throws MessagingException;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.utils.Future;
import org.jboss.messaging.utils.TokenBucketLimiter;
+import org.jboss.messaging.utils.UUIDGenerator;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -70,13 +71,12 @@
private final Runner runner = new Runner();
- private final File directory;
-
private ClientMessageInternal currentChunkMessage;
-
+
private LargeMessageBufferImpl currentLargeMessageBuffer;
-
- // When receiving LargeMessages, the user may choose to not read the body, on this case we need to discard te body before moving to the next message.
+
+ // When receiving LargeMessages, the user may choose to not read the body, on this case we need to discard the body
+ // before moving to the next message.
private ClientMessageInternal largeMessageReceived;
private final TokenBucketLimiter rateLimiter;
@@ -92,7 +92,7 @@
private volatile boolean closed;
private volatile int creditsToSend;
-
+
private volatile boolean slowConsumerInitialCreditSent = false;
private volatile Exception lastException;
@@ -115,7 +115,6 @@
final TokenBucketLimiter rateLimiter,
final Executor executor,
final Channel channel,
- final File directory,
final boolean preAcknowledge)
{
this.id = id;
@@ -132,18 +131,16 @@
this.ackBatchSize = ackBatchSize;
- this.directory = directory;
-
this.preAcknowledge = preAcknowledge;
}
// ClientConsumer implementation
// -----------------------------------------------------------------
- public ClientMessage receive(long timeout) throws MessagingException
+ public ClientMessage receive(long timeout) throws MessagingException
{
checkClosed();
-
+
if (largeMessageReceived != null)
{
// Check if there are pending packets to be received
@@ -161,7 +158,7 @@
throw new MessagingException(MessagingException.ILLEGAL_STATE,
"Cannot call receive(...) - a MessageHandler is set");
}
-
+
if (clientWindowSize == 0)
{
startSlowConsumer();
@@ -218,20 +215,20 @@
if (m != null)
{
- //if we have already pre acked we cant expire
+ // if we have already pre acked we cant expire
boolean expired = !preAcknowledge && m.isExpired();
flowControlBeforeConsumption(m);
if (expired)
{
m.discardLargeBody();
-
+
session.expire(id, m.getMessageID());
if (clientWindowSize == 0)
{
startSlowConsumer();
- }
+ }
if (toWait > 0)
{
@@ -247,7 +244,7 @@
{
this.largeMessageReceived = m;
}
-
+
return m;
}
else
@@ -335,14 +332,6 @@
return closed;
}
- /* (non-Javadoc)
- * @see org.jboss.messaging.core.client.ClientConsumer#isLargeMessagesAsFiles()
- */
- public boolean isFileConsumer()
- {
- return directory != null;
- }
-
public void stop() throws MessagingException
{
waitForOnMessageToComplete();
@@ -389,7 +378,7 @@
ClientMessageInternal messageToHandle = message;
messageToHandle.onReceipt(this);
-
+
if (trace)
{
log.trace("Adding message " + message + " into buffer");
@@ -419,18 +408,26 @@
// This is ok - we just ignore the message
return;
}
-
+
// Flow control for the first packet, we will have others
flowControl(packet.getPacketSize(), false);
currentChunkMessage = new ClientMessageImpl();
-
+
currentChunkMessage.decodeProperties(ChannelBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
-
+
currentChunkMessage.setLargeMessage(true);
- currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60);
+ File largeMessageCache = null;
+
+ if (session.isCacheLargeMessageClient())
+ {
+ largeMessageCache = File.createTempFile("tmp-large-message-" + currentChunkMessage.getMessageID()+ "-", ".tmp");
+ largeMessageCache.deleteOnExit();
+ }
+ currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
+
currentChunkMessage.setBody(currentLargeMessageBuffer);
currentChunkMessage.setFlowControlSize(0);
@@ -444,7 +441,7 @@
{
return;
}
-
+
currentLargeMessageBuffer.addPacket(chunk);
}
@@ -502,17 +499,17 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
-
+
if (creditsToSend >= clientWindowSize)
{
-
+
if (clientWindowSize == 0 && discountSlowConsumer)
{
if (trace)
{
log.trace("Sending " + creditsToSend + " -1, for slow consumer");
}
-
+
slowConsumerInitialCreditSent = false;
// sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
@@ -520,7 +517,7 @@
final int credits = creditsToSend - 1;
creditsToSend = 0;
-
+
sendCredits(credits);
}
else
@@ -529,7 +526,7 @@
{
log.trace("Sending " + messageBytes + " from flow-control");
}
-
+
final int credits = creditsToSend;
creditsToSend = 0;
@@ -669,12 +666,12 @@
log.trace("Calling handler.onMessage");
}
theHandler.onMessage(message);
-
+
if (trace)
{
log.trace("Handler.onMessage done");
}
-
+
if (message.isLargeMessage())
{
message.discardLargeBody();
@@ -723,13 +720,12 @@
// Now we wait for any current handler runners to run.
waitForOnMessageToComplete();
-
+
if (currentLargeMessageBuffer != null)
{
- currentLargeMessageBuffer.close();
+ currentLargeMessageBuffer.cancel();
currentLargeMessageBuffer = null;
}
-
closed = true;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -58,8 +58,6 @@
void acknowledge(ClientMessage message) throws MessagingException;
void flushAcks() throws MessagingException;
-
- boolean isFileConsumer();
void stop() throws MessagingException;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -108,6 +108,8 @@
public static final int DEFAULT_THREAD_POOL_MAX_SIZE = -1;
public static final int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
+
+ public static final boolean DEFAULT_CACHE_LARGE_MESSAGE_CLIENT = false;
// Attributes
// -----------------------------------------------------------------------------------
@@ -129,6 +131,8 @@
private boolean readOnly;
// Settable attributes:
+
+ private boolean cacheLargeMessagesClient = DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
private List<Pair<TransportConfiguration, TransportConfiguration>> staticConnectors;
@@ -386,6 +390,17 @@
// ClientSessionFactory implementation------------------------------------------------------------
+ /**
+ * Reports whether sessions created by this factory are asked to cache large messages on the client.
+ *
+ * @return the current value of the cacheLargeMessagesClient setting
+ */
+ public synchronized boolean isCacheLargeMessagesClient()
+ {
+ return this.cacheLargeMessagesClient;
+ }
+
+ /**
+ * Stores the client-side large-message cache setting on this factory.
+ * NOTE(review): the value is stored without consulting the readOnly flag declared on this class;
+ * confirm whether this setter should apply the same write guard as the factory's other setters.
+ *
+ * @param cached the new value of the cacheLargeMessagesClient setting
+ */
+ public synchronized void setCacheLargeMessagesClient(boolean cached)
+ {
+ cacheLargeMessagesClient = cached;
+ }
+
+
public synchronized List<Pair<TransportConfiguration, TransportConfiguration>> getStaticConnectors()
{
return staticConnectors;
@@ -966,6 +981,7 @@
autoCommitAcks,
preAcknowledge,
ackBatchSize,
+ cacheLargeMessagesClient,
minLargeMessageSize,
blockOnAcknowledge,
autoGroup,
@@ -1007,4 +1023,5 @@
connectionManagerMap.values().toArray(connectionManagerArray);
}
+
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -153,6 +153,8 @@
private final boolean blockOnPersistentSend;
private final int minLargeMessageSize;
+
+ private final boolean cacheLargeMessageClient;
private final Channel channel;
@@ -185,6 +187,7 @@
final int producerMaxRate,
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final RemotingConnection remotingConnection,
final int version,
@@ -226,6 +229,8 @@
this.blockOnNonPersistentSend = blockOnNonPersistentSend;
this.blockOnPersistentSend = blockOnPersistentSend;
+
+ this.cacheLargeMessageClient = cacheLargeMessageClient;
this.minLargeMessageSize = minLargeMessageSize;
}
@@ -353,7 +358,7 @@
final int maxRate,
final boolean browseOnly) throws MessagingException
{
- return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly, null);
+ return internalCreateConsumer(queueName, filterString, windowSize, maxRate, browseOnly);
}
public ClientConsumer createConsumer(final String queueName,
@@ -568,6 +573,20 @@
// ClientSessionInternal implementation
// ------------------------------------------------------------
+
+ /**
+ * @return the minLargeMessageSize value this session was constructed with
+ */
+ public int getMinLargeMessageSize()
+ {
+ return this.minLargeMessageSize;
+ }
+
+ /**
+ * Reports the client-side large-message cache setting this session was constructed with.
+ *
+ * @return the cacheLargeMessageClient flag
+ */
+ public boolean isCacheLargeMessageClient()
+ {
+ return this.cacheLargeMessageClient;
+ }
+
public String getName()
{
return name;
@@ -1123,8 +1142,7 @@
final SimpleString filterString,
final int windowSize,
final int maxRate,
- final boolean browseOnly,
- final File directory) throws MessagingException
+ final boolean browseOnly) throws MessagingException
{
checkClosed();
@@ -1174,7 +1192,6 @@
: null,
executor,
channel,
- directory,
preAcknowledge);
addConsumer(consumer);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -29,7 +29,11 @@
String getName();
void acknowledge(long consumerID, long messageID) throws MessagingException;
-
+
+ boolean isCacheLargeMessageClient();
+
+ int getMinLargeMessageSize();
+
MessagingBuffer createBuffer(int size);
void expire(long consumerID, long messageID) throws MessagingException;
@@ -51,8 +55,8 @@
boolean handleFailover(RemotingConnection backupConnection);
RemotingConnection getConnection();
-
+
void cleanUp() throws Exception;
-
+
void returnBlocking();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManager.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -44,6 +44,7 @@
final boolean autoCommitAcks,
final boolean preAcknowledge,
final int ackBatchSize,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final boolean blockOnAcknowledge,
final boolean autoGroup,
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -256,6 +256,7 @@
final boolean autoCommitAcks,
final boolean preAcknowledge,
final int ackBatchSize,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final boolean blockOnAcknowledge,
final boolean autoGroup,
@@ -358,6 +359,7 @@
producerMaxRate,
blockOnNonPersistentSend,
blockOnPersistentSend,
+ cacheLargeMessageClient,
minLargeMessageSize,
connection,
response.getServerVersion(),
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/LargeMessageBufferImpl.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -22,10 +22,14 @@
package org.jboss.messaging.core.client.impl;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import java.util.concurrent.LinkedBlockingQueue;
@@ -34,6 +38,7 @@
import org.jboss.messaging.core.buffers.ChannelBuffer;
import org.jboss.messaging.core.client.LargeMessageBuffer;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.DataConstants;
@@ -56,6 +61,8 @@
// Attributes ----------------------------------------------------
+ private static final Logger log = Logger.getLogger(LargeMessageBufferImpl.class);
+
private final ClientConsumerInternal consumerInternal;
private final LinkedBlockingQueue<SessionReceiveContinuationMessage> packets = new LinkedBlockingQueue<SessionReceiveContinuationMessage>();
@@ -80,6 +87,8 @@
private Exception handledException;
+ private final FileCache fileCache;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -88,9 +97,25 @@
final long totalSize,
final int readTimeout)
{
+ this(consumerInternal, totalSize, readTimeout, null);
+ }
+
+ /**
+ * Creates a large-message buffer that can optionally keep a copy of the received body in a file.
+ *
+ * @param consumerInternal the consumer this buffer belongs to
+ * @param totalSize the total size of the large message
+ * @param readTimeout the read timeout used by this buffer
+ * @param cachedFile file backing the body cache, or null to disable the file cache
+ */
+ public LargeMessageBufferImpl(final ClientConsumerInternal consumerInternal,
+ final long totalSize,
+ final int readTimeout,
+ final File cachedFile)
+ {
this.consumerInternal = consumerInternal;
this.readTimeout = readTimeout;
this.totalSize = totalSize;
+ // Without a backing file there is no cache at all; the rest of the class tests fileCache for null.
+ this.fileCache = cachedFile == null ? null : new FileCache(cachedFile);
}
// Public --------------------------------------------------------
@@ -125,7 +150,7 @@
{
int flowControlCredit = 0;
boolean continues = false;
-
+
synchronized (this)
{
if (outStream != null)
@@ -137,6 +162,11 @@
streamEnded = true;
}
+ if (fileCache != null)
+ {
+ fileCache.cachePackage(packet.getBody());
+ }
+
outStream.write(packet.getBody());
flowControlCredit = packet.getPacketSize();
@@ -151,11 +181,25 @@
}
catch (Exception e)
{
+ log.warn(e.getMessage(), e);
handledException = e;
}
}
else
{
+ if (fileCache != null)
+ {
+ try
+ {
+ fileCache.cachePackage(packet.getBody());
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ handledException = e;
+ }
+ }
+
packets.offer(packet);
}
}
@@ -168,18 +212,28 @@
}
catch (Exception e)
{
+ log.warn(e.getMessage(), e);
handledException = e;
}
}
}
- public synchronized void close()
+ /**
+ * Marks the stream as ended and wakes up every thread waiting on this buffer's monitor.
+ * An empty continuation message is queued first; presumably this releases a reader blocked on the
+ * packet queue (the reading code is outside this hunk, so confirm before relying on it).
+ */
+ public synchronized void cancel()
{
packets.offer(new SessionReceiveContinuationMessage());
streamEnded = true;
+
notifyAll();
}
+ /**
+ * Releases the file cache's open handles, if this buffer was created with a file cache.
+ * Does nothing otherwise.
+ */
+ public synchronized void close()
+ {
+ if (fileCache == null)
+ {
+ return;
+ }
+
+ fileCache.close();
+ }
+
public void setOutputStream(final OutputStream output) throws MessagingException
{
@@ -289,14 +343,23 @@
*/
public byte getByte(final int index)
{
- checkForPacket(index);
- return currentPacket.getBody()[(int)(index - packetPosition)];
+ return getByte((long)index);
}
private byte getByte(final long index)
{
checkForPacket(index);
- return currentPacket.getBody()[(int)(index - packetPosition)];
+
+ //System.out.println("position = " + index + " , packetPosition = " + packetPosition + " filecache = " + fileCache);
+
+ if (fileCache != null && index < packetPosition)
+ {
+ return fileCache.getByteFromCache(index);
+ }
+ else
+ {
+ return currentPacket.getBody()[(int)(index - packetPosition)];
+ }
}
/* (non-Javadoc)
@@ -555,7 +618,15 @@
public void readerIndex(final int readerIndex)
{
- checkForPacket(readerIndex);
+ try
+ {
+ checkForPacket(readerIndex);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
this.readerIndex = readerIndex;
}
@@ -576,7 +647,15 @@
public void setIndex(final int readerIndex, final int writerIndex)
{
- checkForPacket(readerIndex);
+ try
+ {
+ checkForPacket(readerIndex);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
this.readerIndex = readerIndex;
}
@@ -620,7 +699,15 @@
public void resetReaderIndex()
{
- checkForPacket(0);
+ try
+ {
+ checkForPacket(0);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
}
public void markWriterIndex()
@@ -1186,11 +1273,15 @@
{
throw new IndexOutOfBoundsException();
}
- if (index < lastIndex)
+
+ if (fileCache == null)
{
- throw new IllegalAccessError("LargeMessage have read-only and one-way buffers");
+ if (index < lastIndex)
+ {
+ throw new IllegalAccessError("LargeMessage have read-only and one-way buffers");
+ }
+ lastIndex = index;
}
- lastIndex = index;
while (index >= packetLastPosition && !streamEnded)
{
@@ -1198,6 +1289,148 @@
}
}
+ /**
+ * @param body
+ */
// Inner classes -------------------------------------------------
+ private class FileCache
+ {
+
+ private final int BUFFER_SIZE = 10 * 1024;
+
+ public FileCache(File cachedFile)
+ {
+ this.cachedFile = cachedFile;
+ }
+
+ ByteBuffer readCache;
+
+ long readCachePositionStart = Integer.MAX_VALUE;
+
+ long readCachePositionEnd = -1;
+
+ private final File cachedFile;
+
+ private volatile RandomAccessFile cachedRAFile;
+
+ private volatile FileChannel cachedChannel;
+
+ private synchronized void readCache(long position)
+ {
+
+ try
+ {
+ if (position < readCachePositionStart || position > readCachePositionEnd)
+ {
+
+ checkOpen();
+
+ if (position > cachedChannel.size())
+ {
+ throw new ArrayIndexOutOfBoundsException("position > " + cachedChannel.size());
+ }
+
+ readCachePositionStart = (position / BUFFER_SIZE) * BUFFER_SIZE;
+
+ if (readCache == null)
+ {
+ readCache = ByteBuffer.allocate(BUFFER_SIZE);
+ }
+
+ readCache.clear();
+
+ readCachePositionEnd = readCachePositionStart + cachedChannel.read(readCache) -1;
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ finally
+ {
+ close();
+ }
+ }
+
+ public synchronized byte getByteFromCache(long position)
+ {
+ readCache(position);
+
+ return readCache.get((int)(position - readCachePositionStart));
+
+ }
+
+ public void cachePackage(byte[] body) throws Exception
+ {
+ checkOpen();
+
+ cachedChannel.position(cachedChannel.size());
+ cachedChannel.write(ByteBuffer.wrap(body));
+
+ close();
+ }
+
+ /**
+ * @throws FileNotFoundException
+ */
+ public void checkOpen() throws FileNotFoundException
+ {
+ if (cachedFile != null || !cachedChannel.isOpen())
+ {
+ this.cachedRAFile = new RandomAccessFile(cachedFile, "rw");
+
+ cachedChannel = cachedRAFile.getChannel();
+ }
+ }
+
+ public void close()
+ {
+ if (cachedChannel != null && cachedChannel.isOpen())
+ {
+ try
+ {
+ cachedChannel.close();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ cachedChannel = null;
+ }
+
+ if (cachedRAFile != null)
+ {
+ try
+ {
+ cachedRAFile.close();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ cachedRAFile = null;
+ }
+
+ }
+
+ protected void finalize()
+ {
+ close();
+ if (cachedFile != null && cachedFile.exists())
+ {
+ try
+ {
+ cachedFile.delete();
+ }
+ catch (Exception e)
+ {
+ log.warn("Exception during finalization for LargeMessage file cache", e);
+ }
+ }
+ }
+
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossConnectionFactory.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -46,9 +46,9 @@
*/
public class JBossConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory, Serializable/*
- * , Referenceable
- * http://jira.jboss.org/jira/browse/JBMESSAGING-395
- */
+ * , Referenceable
+ * http://jira.jboss.org/jira/browse/JBMESSAGING-395
+ */
{
// Constants ------------------------------------------------------------------------------------
@@ -67,9 +67,9 @@
private int dupsOKBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
private int transactionBatchSize = ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE;
-
+
private boolean readOnly;
-
+
// Constructors ---------------------------------------------------------------------------------
public JBossConnectionFactory()
@@ -102,7 +102,7 @@
{
this(connectorConfig, null);
}
-
+
// ConnectionFactory implementation -------------------------------------------------------------
public Connection createConnection() throws JMSException
@@ -239,7 +239,7 @@
public synchronized long getDiscoveryInitialWaitTimeout()
{
- return sessionFactory.getDiscoveryInitialWaitTimeout();
+ return sessionFactory.getDiscoveryInitialWaitTimeout();
}
public synchronized void setDiscoveryInitialWaitTimeout(long discoveryInitialWaitTimeout)
@@ -253,7 +253,7 @@
}
public synchronized void setClientID(String clientID)
- {
+ {
checkWrite();
this.clientID = clientID;
}
@@ -350,6 +350,19 @@
sessionFactory.setProducerMaxRate(producerMaxRate);
}
+ /**
+ * Forwards the client-side large-message cache setting to the underlying core session factory.
+ *
+ * @param cacheLargeMessagesClient the value handed to the session factory
+ */
+ public synchronized void setCacheLargeMessagesClient(boolean cacheLargeMessagesClient)
+ {
+ this.sessionFactory.setCacheLargeMessagesClient(cacheLargeMessagesClient);
+ }
+
+ /**
+ * @return the client-side large-message cache setting reported by the underlying core session factory
+ */
+ public synchronized boolean isCacheLargeMessagesClient()
+ {
+ return this.sessionFactory.isCacheLargeMessagesClient();
+ }
+
public synchronized int getMinLargeMessageSize()
{
return sessionFactory.getMinLargeMessageSize();
@@ -459,7 +472,7 @@
{
sessionFactory.setFailoverOnServerShutdown(failoverOnServerShutdown);
}
-
+
public synchronized boolean isUseGlobalPools()
{
return sessionFactory.isUseGlobalPools();
@@ -469,7 +482,7 @@
{
sessionFactory.setUseGlobalPools(useGlobalPools);
}
-
+
public synchronized int getScheduledThreadPoolMaxSize()
{
return sessionFactory.getScheduledThreadPoolMaxSize();
@@ -489,12 +502,12 @@
{
sessionFactory.setThreadPoolMaxSize(threadPoolMaxSize);
}
-
+
public ClientSessionFactory getCoreFactory()
{
return sessionFactory;
}
-
+
public void close()
{
sessionFactory.close();
@@ -510,7 +523,7 @@
final int type) throws JMSException
{
readOnly = true;
-
+
JBossConnection connection = new JBossConnection(username,
password,
type,
@@ -534,7 +547,6 @@
}
}
-
// Inner classes --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/JMSServerManager.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -150,6 +150,7 @@
long connectionTTL,
long callTimeout,
int maxConnections,
+ boolean cacheLargeMessagesClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -181,6 +182,7 @@
long connectionTTL,
long callTimeout,
int maxConnections,
+ boolean cacheLargeMessagesClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -133,6 +133,7 @@
int consumerMaxRate = getInteger(e, "consumer-max-rate", ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE, MINUS_ONE_OR_GT_ZERO);
int producerWindowSize = getInteger(e, "producer-window-size", ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE, GT_ZERO);
int producerMaxRate = getInteger(e, "producer-max-rate", ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE, MINUS_ONE_OR_GT_ZERO);
+ boolean cacheLargeMessagesClient = getBoolean(e, "cache-large-message-client", ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT);
int minLargeMessageSize = getInteger(e, "min-large-message-size", ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE, GT_ZERO);
boolean blockOnAcknowledge = getBoolean(e, "block-on-acknowledge", ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE);
boolean blockOnNonPersistentSend = getBoolean(e, "block-on-non-persistent-send", ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND);
@@ -235,6 +236,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessagesClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
@@ -267,6 +269,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessagesClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerManagerImpl.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -351,6 +351,7 @@
long connectionTTL,
long callTimeout,
int maxConnections,
+ boolean cacheLargeMessagesClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -383,6 +384,7 @@
cf.setConnectionTTL(connectionTTL);
cf.setCallTimeout(callTimeout);
cf.setMaxConnections(maxConnections);
+ cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cf.setMinLargeMessageSize(minLargeMessageSize);
cf.setConsumerWindowSize(consumerWindowSize);
cf.setConsumerMaxRate(consumerMaxRate);
@@ -417,6 +419,7 @@
long connectionTTL,
long callTimeout,
int maxConnections,
+ boolean cacheLargeMessagesClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -451,6 +454,7 @@
cf.setConnectionTTL(connectionTTL);
cf.setCallTimeout(callTimeout);
cf.setMaxConnections(maxConnections);
+ cf.setCacheLargeMessagesClient(cacheLargeMessagesClient);
cf.setMinLargeMessageSize(minLargeMessageSize);
cf.setConsumerWindowSize(consumerWindowSize);
cf.setConsumerMaxRate(consumerMaxRate);
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControl.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/JMSServerControl.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -103,6 +103,7 @@
long connectionTTL,
long callTimeout,
int maxConnections,
+ boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -135,6 +136,7 @@
@Parameter(name = "connectionTTL") long connectionTTL,
@Parameter(name = "callTimeout") long callTimeout,
@Parameter(name = "maxConnections") int maxConnections,
+ @Parameter(name = "cacheLargemessageClient") boolean cacheLargeMessageClient,
@Parameter(name = "minLargeMessageSize") int minLargeMessageSize,
@Parameter(name = "consumerWindowSize") int consumerWindowSize,
@Parameter(name = "consumerMaxRate") int consumerMaxRate,
@@ -179,6 +181,7 @@
long connectionTTL,
long callTimeout,
int maxConnections,
+ boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -212,6 +215,7 @@
@Parameter(name = "connectionTTL") long connectionTTL,
@Parameter(name = "callTimeout") long callTimeout,
@Parameter(name = "maxConnections") int maxConnections,
+ @Parameter(name = "cacheLargemessageClient") boolean cacheLargeMessageClient,
@Parameter(name = "minLargeMessageSize") int minLargeMessageSize,
@Parameter(name = "consumerWindowSize") int consumerWindowSize,
@Parameter(name = "consumerMaxRate") int consumerMaxRate,
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControlImpl.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/impl/JMSServerControlImpl.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -218,6 +218,7 @@
final long connectionTTL,
final long callTimeout,
final int maxConnections,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -254,6 +255,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
@@ -289,6 +291,7 @@
final long connectionTTL,
final long callTimeout,
final int maxConnections,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -327,6 +330,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
@@ -383,6 +387,7 @@
final long connectionTTL,
final long callTimeout,
final int maxConnections,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -417,6 +422,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
@@ -452,6 +458,7 @@
final long connectionTTL,
final long callTimeout,
final int maxConnections,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -486,6 +493,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
Modified: trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/src/main/org/jboss/messaging/jms/server/management/jmx/impl/ReplicationAwareJMSServerControlWrapper.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -76,6 +76,7 @@
final long connectionTTL,
final long callTimeout,
final int maxConnections,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -108,6 +109,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
@@ -141,6 +143,7 @@
final long connectionTTL,
final long callTimeout,
final int maxConnections,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -173,6 +176,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
@@ -338,6 +342,7 @@
final long connectionTTL,
final long callTimeout,
final int maxConnections,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -371,6 +376,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
@@ -404,6 +410,7 @@
final long connectionTTL,
final long callTimeout,
final int maxConnections,
+ final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int consumerWindowSize,
final int consumerMaxRate,
@@ -437,6 +444,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/CTSMiscellaneousTest.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -31,6 +31,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -101,6 +102,7 @@
DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
DEFAULT_MAX_CONNECTIONS,
+ DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/JMSTestCase.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -10,6 +10,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -76,6 +77,7 @@
DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
DEFAULT_MAX_CONNECTIONS,
+ DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
DEFAULT_CONSUMER_WINDOW_SIZE,
DEFAULT_CONSUMER_MAX_RATE,
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -30,6 +30,7 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
@@ -325,6 +326,7 @@
DEFAULT_CONNECTION_TTL,
DEFAULT_CALL_TIMEOUT,
DEFAULT_MAX_CONNECTIONS,
+ DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
DEFAULT_MIN_LARGE_MESSAGE_SIZE,
prefetchSize,
DEFAULT_CONSUMER_MAX_RATE,
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/BridgeTestBase.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/BridgeTestBase.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -190,6 +190,7 @@
cf.setReconnectAttempts(0);
cf.setBlockOnNonPersistentSend(true);
cf.setBlockOnPersistentSend(true);
+ cf.setCacheLargeMessagesClient(true);
return cf;
}
@@ -209,6 +210,7 @@
cf.setReconnectAttempts(0);
cf.setBlockOnNonPersistentSend(true);
cf.setBlockOnPersistentSend(true);
+ cf.setCacheLargeMessagesClient(true);
return cf;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/bridge/JMSBridgeReconnectionTest.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -47,10 +47,10 @@
// The bridge won't work with largeMessages & failures
// https://jira.jboss.org/jira/browse/JBMESSAGING-1601
-// public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_P_LargeMessage() throws Exception
-// {
-// testCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, true);
-// }
+ public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_P_LargeMessage() throws Exception
+ {
+ testCrashAndReconnectDestBasic(QualityOfServiceMode.ONCE_AND_ONLY_ONCE, true, true);
+ }
public void testCrashAndReconnectDestBasic_OnceAndOnlyOnce_NP() throws Exception
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlTest.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlTest.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -406,6 +406,7 @@
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
@@ -450,6 +451,7 @@
ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+ ClientSessionFactoryImpl.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -96,6 +96,7 @@
long connectionTTL,
long callTimeout,
int maxConnections,
+ boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -128,6 +129,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
@@ -161,6 +163,7 @@
long connectionTTL,
long callTimeout,
int maxConnections,
+ boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -193,6 +196,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
@@ -375,6 +379,7 @@
long connectionTTL,
long callTimeout,
int maxConnections,
+ boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -408,6 +413,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
@@ -442,6 +448,7 @@
long connectionTTL,
long callTimeout,
int maxConnections,
+ boolean cacheLargeMessageClient,
int minLargeMessageSize,
int consumerWindowSize,
int consumerMaxRate,
@@ -475,6 +482,7 @@
connectionTTL,
callTimeout,
maxConnections,
+ cacheLargeMessageClient,
minLargeMessageSize,
consumerWindowSize,
consumerMaxRate,
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/client/impl/LargeMessageBufferTest.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -25,6 +25,7 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
@@ -195,6 +196,44 @@
assertEquals(f1, readBuffer.readFloat());
}
+ private File getTestFile()
+ {
+ return new File(getTestDir(), "temp.file");
+ }
+
+ public void testReadDataOverCached() throws Exception
+ {
+ clearData();
+
+ ChannelBuffer dynamic = ChannelBuffers.dynamicBuffer(1);
+
+ String str1 = RandomUtil.randomString();
+ String str2 = RandomUtil.randomString();
+ Double d1 = RandomUtil.randomDouble();
+ float f1 = RandomUtil.randomFloat();
+
+ dynamic.writeUTF(str1);
+ dynamic.writeString(str2);
+ dynamic.writeDouble(d1);
+ dynamic.writeFloat(f1);
+
+ LargeMessageBufferImpl readBuffer = splitBuffer(3, dynamic.array(), getTestFile());
+
+ assertEquals(str1, readBuffer.readUTF());
+ assertEquals(str2, readBuffer.readString());
+ assertEquals(d1, readBuffer.readDouble());
+ assertEquals(f1, readBuffer.readFloat());
+
+ readBuffer.readerIndex(0);
+
+ assertEquals(str1, readBuffer.readUTF());
+ assertEquals(str2, readBuffer.readString());
+ assertEquals(d1, readBuffer.readDouble());
+ assertEquals(f1, readBuffer.readFloat());
+
+ readBuffer.close();
+ }
+
public void testReadPartialData() throws Exception
{
@@ -239,7 +278,7 @@
latchGo.await();
- buffer.close();
+ buffer.cancel();
t.join();
@@ -262,7 +301,9 @@
public void testStreamData() throws Exception
{
- final LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(), 1024 * 11 + 123, 1);
+ final LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(),
+ 1024 * 11 + 123,
+ 1);
final PipedOutputStream output = new PipedOutputStream();
final PipedInputStream input = new PipedInputStream(output);
@@ -373,11 +414,11 @@
assertEquals(0, errors.get());
}
-
+
public void testStreamDataWaitCompletionOnCompleteBuffer() throws Exception
{
final LargeMessageBufferImpl outBuffer = create15BytesSample();
-
+
outBuffer.saveBuffer(new OutputStream()
{
@Override
@@ -395,7 +436,6 @@
outBuffer.addPacket(new SessionReceiveContinuationMessage(-1, new byte[] { 0, 1, 2, 3, 4 }, true, false));
-
final CountDownLatch latchBytesWritten1 = new CountDownLatch(5);
final CountDownLatch latchBytesWritten2 = new CountDownLatch(10);
@@ -408,8 +448,7 @@
latchBytesWritten2.countDown();
}
});
-
-
+
latchBytesWritten1.await();
try
@@ -420,8 +459,7 @@
catch (IllegalAccessError ignored)
{
}
-
-
+
assertTrue("It waited too much", System.currentTimeMillis() - start < 30000);
}
@@ -462,8 +500,13 @@
private LargeMessageBufferImpl splitBuffer(int splitFactor, byte[] bytes) throws Exception
{
- LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(), bytes.length, 5);
+ return splitBuffer(splitFactor, bytes, null);
+ }
+ private LargeMessageBufferImpl splitBuffer(int splitFactor, byte[] bytes, File file) throws Exception
+ {
+ LargeMessageBufferImpl outBuffer = new LargeMessageBufferImpl(new FakeConsumerInternal(), bytes.length, 5, file);
+
ByteArrayInputStream input = new ByteArrayInputStream(bytes);
while (true)
@@ -700,6 +743,21 @@
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.client.ClientConsumer#getLargeMessageCacheDir()
+ */
+ public File getLargeMessageCacheDir()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.client.ClientConsumer#setLargeMessageCacheDir(java.io.File)
+ */
+ public void setLargeMessageCacheDir(File largeMessageCacheDir)
+ {
+ }
+
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -99,28 +99,6 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
- protected void clearData()
- {
- clearData(getTestDir());
- }
-
- protected void clearData(String testDir)
- {
- // Need to delete the root
-
- File file = new File(testDir);
- deleteDirectory(file);
- file.mkdirs();
-
- recreateDirectory(getJournalDir(testDir));
- recreateDirectory(getBindingsDir(testDir));
- recreateDirectory(getPageDir(testDir));
- recreateDirectory(getLargeMessagesDir(testDir));
- recreateDirectory(getClientLargeMessagesDir(testDir));
- recreateDirectory(getTemporaryDir(testDir));
- }
-
protected Configuration createConfigForJournal()
{
Configuration config = new ConfigurationImpl();
Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-07-27 17:17:58 UTC (rev 7627)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2009-07-28 03:21:22 UTC (rev 7628)
@@ -273,6 +273,28 @@
return testDir;
}
+ protected void clearData()
+ {
+ clearData(getTestDir());
+ }
+
+ protected void clearData(String testDir)
+ {
+ // Need to delete the root
+
+ File file = new File(testDir);
+ deleteDirectory(file);
+ file.mkdirs();
+
+ recreateDirectory(getJournalDir(testDir));
+ recreateDirectory(getBindingsDir(testDir));
+ recreateDirectory(getPageDir(testDir));
+ recreateDirectory(getLargeMessagesDir(testDir));
+ recreateDirectory(getClientLargeMessagesDir(testDir));
+ recreateDirectory(getTemporaryDir(testDir));
+ }
+
+
/**
* @return the journalDir
*/
More information about the jboss-cvs-commits
mailing list