[jboss-cvs] JBoss Messaging SVN: r5482 - in trunk: src/main/org/jboss/messaging/core/client/impl and 15 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Dec 8 22:53:21 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-12-08 22:53:21 -0500 (Mon, 08 Dec 2008)
New Revision: 5482
Added:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
Removed:
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java
trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Chunk & Paging tweaks & PreCommit on Chunk
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientFileMessage.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -42,6 +42,8 @@
void setFile(File file);
+ void setLargeMessage(boolean largeMessage);
+
FileChannel getChannel() throws MessagingException;
void closeChannel() throws MessagingException;
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientMessage.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -46,6 +46,4 @@
void acknowledge() throws MessagingException;
boolean isLargeMessage();
-
- void setLargeMessage(boolean largeMessage);
}
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSessionFactory.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -38,13 +38,10 @@
boolean xa,
boolean autoCommitSends,
boolean autoCommitAcks,
- final boolean preAcknowledge,
+ boolean preAcknowledge,
int ackBatchSize) throws MessagingException;
- ClientSession createSession(final boolean xa,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean preAcknowledge) throws MessagingException;
+ ClientSession createSession(boolean xa, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge) throws MessagingException;
void setConsumerWindowSize(int size);
@@ -64,19 +61,19 @@
int getMinLargeMessageSize();
- void setMinLargeMessageSize(final int minLargeMessageSize);
+ void setMinLargeMessageSize(int minLargeMessageSize);
boolean isBlockOnPersistentSend();
- void setBlockOnPersistentSend(final boolean blocking);
+ void setBlockOnPersistentSend(boolean blocking);
boolean isBlockOnNonPersistentSend();
- void setBlockOnNonPersistentSend(final boolean blocking);
+ void setBlockOnNonPersistentSend(boolean blocking);
boolean isBlockOnAcknowledge();
- void setBlockOnAcknowledge(final boolean blocking);
+ void setBlockOnAcknowledge(boolean blocking);
boolean isAutoGroup();
@@ -95,10 +92,10 @@
long getCallTimeout();
int getMaxConnections();
-
-// TransportConfiguration getTransportConfiguration();
-//
-// TransportConfiguration getBackupTransportConfiguration();
-
+
+ // TransportConfiguration getTransportConfiguration();
+ //
+ // TransportConfiguration getBackupTransportConfiguration();
+
void close();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -28,7 +28,7 @@
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.Future;
@@ -183,11 +183,7 @@
{
boolean expired = m.isExpired();
- // Chunk messages will execute the flow control while receiving the chunks
- if (!m.isLargeMessage())
- {
- flowControl(m.getEncodeSize());
- }
+ flowControlBeforeConsumption(m);
if (expired)
{
@@ -337,61 +333,60 @@
}
}
- public void handleChunk(SessionSendChunkMessage chunk) throws Exception
+
+ public synchronized void handleLargeMessage(final byte[] header) throws Exception
{
- if (closed)
+ if (closing)
{
+ // This is ok - we just ignore the message
return;
}
- flowControl(chunk.getBody().length);
+ currentChunkMessage = createFileMessage(header);
+
+ // We won't call flow control at this point, as we will only flow control the header right before consumption
+
- if (chunk.getHeader() != null)
+ }
+
+ public synchronized void handleLargeMessageContinuation(SessionReceiveContinuationMessage chunk) throws Exception
+ {
+ if (closing)
{
- // The Header only comes on the first message, so a buffer has to be created on the client
- // to hold either a file or a big message
- MessagingBuffer header = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getHeader()));
+ return;
+ }
- currentChunkMessage = createFileMessage(header);
+ ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
- if (currentChunkMessage instanceof ClientFileMessage)
- {
- ClientFileMessage fileMessage = (ClientFileMessage)currentChunkMessage;
- addBytesBody(fileMessage, chunk.getBody());
- }
- else
- {
- MessagingBuffer initialBody = new ByteBufferWrapper(ByteBuffer.wrap(chunk.getBody()));
- currentChunkMessage.setBody(initialBody);
- }
+ flowControl(chunk.getBody().length);
+
+ if (isFileConsumer())
+ {
+ ClientFileMessage fileMessage = (ClientFileMessage)currentChunkMessage;
+ addBytesBody(fileMessage, chunk.getBody());
}
else
{
- // No header.. this is then a continuation of a previous message
- ByteBuffer body = ByteBuffer.wrap(chunk.getBody());
+ MessagingBuffer currentBody = currentChunkMessage.getBody();
+
+ final int currentBodySize = currentBody == null ? 0 : currentBody.limit();
- if (currentChunkMessage instanceof ClientFileMessage)
+ MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBodySize + body.limit()));
+
+ if (currentBody != null)
{
- ClientFileMessage fileMessage = (ClientFileMessage)currentChunkMessage;
- addBytesBody(fileMessage, chunk.getBody());
+ newBody.putBytes(currentBody.array());
}
- else
- {
- MessagingBuffer currentBody = currentChunkMessage.getBody();
+
+ newBody.putBytes(body.array());
- MessagingBuffer newBody = new ByteBufferWrapper(ByteBuffer.allocate(currentBody.limit() + body.limit()));
-
- newBody.putBytes(currentBody.array());
- newBody.putBytes(body.array());
-
- currentChunkMessage.setBody(newBody);
- }
+ currentChunkMessage.setBody(newBody);
}
if (!chunk.isContinues())
{
// Close the file that was being generated
- if (currentChunkMessage instanceof ClientFileMessage)
+ if (isFileConsumer())
{
((ClientFileMessage)currentChunkMessage).closeChannel();
}
@@ -542,7 +537,7 @@
{
boolean expired = message.isExpired();
- flowControl(message.getEncodeSize());
+ flowControlBeforeConsumption(message);
if (!expired)
{
@@ -558,6 +553,24 @@
}
}
+ /**
+ * @param message
+ * @throws MessagingException
+ */
+ private void flowControlBeforeConsumption(ClientMessage message) throws MessagingException
+ {
+ // Chunk messages will execute the flow control while receiving the chunks
+ if (!message.isLargeMessage())
+ {
+ flowControl(message.getEncodeSize());
+ }
+ else
+ {
+ // But the header is only flow controlled right before the consumption
+ flowControl(message.getPropertiesEncodeSize());
+ }
+ }
+
private void doCleanUp(final boolean sendCloseMessage) throws MessagingException
{
try
@@ -614,41 +627,52 @@
private ClientFileMessage cloneAsFileMessage(final ClientMessage message) throws Exception
{
- int propertiesSize = message.getPropertiesEncodeSize();
-
- MessagingBuffer bufferProperties = message.getBody().createNewBuffer(propertiesSize);
-
- // FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the
- // MessagingBuffer.
- // There is no direct access into the Properties, and I couldn't add a direct cast to this method without loose
- // abstraction
- message.encodeProperties(bufferProperties);
-
- bufferProperties.rewind();
-
- ClientFileMessageImpl cloneMessage = new ClientFileMessageImpl();
-
- cloneMessage.decodeProperties(bufferProperties);
-
- cloneMessage.setDeliveryCount(message.getDeliveryCount());
-
- cloneMessage.setLargeMessage(message.isLargeMessage());
-
- cloneMessage.setFile(new File(this.directory, cloneMessage.getMessageID() + "-" +
- this.session.getName() +
- "-" +
- this.getID() +
- ".jbm"));
-
- addBytesBody(cloneMessage, message.getBody().array());
-
- cloneMessage.closeChannel();
-
- return cloneMessage;
+ if (message instanceof ClientFileMessageImpl)
+ {
+ // nothing to be done
+ return (ClientFileMessage)message;
+ }
+ else
+ {
+ int propertiesSize = message.getPropertiesEncodeSize();
+
+ MessagingBuffer bufferProperties = message.getBody().createNewBuffer(propertiesSize);
+
+ // FIXME: Find a better way to clone this ClientMessageImpl as ClientFileMessageImpl without using the
+ // MessagingBuffer.
+ // There is no direct access into the Properties, and I couldn't add a direct cast to this method without loose
+ // abstraction
+ message.encodeProperties(bufferProperties);
+
+ bufferProperties.rewind();
+
+ ClientFileMessageImpl cloneMessage = new ClientFileMessageImpl();
+
+ cloneMessage.decodeProperties(bufferProperties);
+
+ cloneMessage.setDeliveryCount(message.getDeliveryCount());
+
+ cloneMessage.setLargeMessage(message.isLargeMessage());
+
+ cloneMessage.setFile(new File(this.directory, cloneMessage.getMessageID() + "-" +
+ this.session.getName() +
+ "-" +
+ this.getID() +
+ ".jbm"));
+
+ addBytesBody(cloneMessage, message.getBody().array());
+
+ cloneMessage.closeChannel();
+
+ return cloneMessage;
+ }
}
- private ClientMessage createFileMessage(final MessagingBuffer propertiesBuffer) throws Exception
+ private ClientMessage createFileMessage(final byte[] header) throws Exception
{
+
+ MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
+
if (isFileConsumer())
{
if (!this.directory.exists())
@@ -657,7 +681,7 @@
}
ClientFileMessageImpl message = new ClientFileMessageImpl();
- message.decodeProperties(propertiesBuffer);
+ message.decodeProperties(headerBuffer);
message.setFile(new File(this.directory, message.getMessageID() + "-" +
this.session.getName() +
"-" +
@@ -669,7 +693,7 @@
else
{
ClientMessageImpl message = new ClientMessageImpl();
- message.decodeProperties(propertiesBuffer);
+ message.decodeProperties(headerBuffer);
message.setLargeMessage(true);
return message;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -25,7 +25,7 @@
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
/**
*
@@ -39,8 +39,10 @@
long getID();
void handleMessage(ClientMessage message) throws Exception;
+
+ void handleLargeMessage(byte[] largeMessageHeader) throws Exception;
- void handleChunk(SessionSendChunkMessage chunk) throws Exception;
+ void handleLargeMessageContinuation(SessionReceiveContinuationMessage continuation) throws Exception;
void clear();
@@ -55,4 +57,5 @@
void acknowledge(ClientMessage message) throws MessagingException;
void flushAcks() throws MessagingException;
+
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientFileMessageImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -113,8 +113,16 @@
@Override
public MessagingBuffer getBody()
{
- // TODO: Throw an unsuported exception. (Make sure no tests are using this method first)
+ throw new UnsupportedOperationException("getBody is not supported on FileMessages.");
+ }
+ /**
+ * If a ClientFileMessage is Smaller then the MinLargeMessage configured on the SessionFactory (or JMSConnectionFactory), it will still be sent as any other message,
+ * and for that the file body (which should be small) will be read from the file an populated on the output buffer
+ *
+ * */
+ public void encodeBody(MessagingBuffer buffer)
+ {
FileChannel channel = null;
try
{
@@ -122,12 +130,12 @@
// for a better performance, users should be using the channels when using file
channel = newChannel();
- ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
+ ByteBuffer fileBuffer = ByteBuffer.allocate((int)channel.size());
channel.position(0);
- channel.read(buffer);
+ channel.read(fileBuffer);
- return new ByteBufferWrapper(buffer);
+ buffer.putBytes(fileBuffer.array(), 0, fileBuffer.limit());
}
catch (Exception e)
{
@@ -146,6 +154,9 @@
}
}
+ /**
+ * Read the file content from start to size.
+ */
@Override
public synchronized void encodeBody(final MessagingBuffer buffer, final long start, final int size)
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -32,7 +32,7 @@
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.SimpleString;
@@ -260,39 +260,38 @@
final int bodySize = msg.getBodySize();
- int chunkLength = minLargeMessageSize - headerSize;
+ SessionSendMessage initialChunk = new SessionSendMessage(headerBuffer.array(), false);
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
+ channel.send(initialChunk);
- msg.encodeBody(bodyBuffer, 0, chunkLength);
-
- SessionSendChunkMessage chunk = new SessionSendChunkMessage(-1,
- headerBuffer.array(),
- bodyBuffer.array(),
- chunkLength < bodySize,
- sendBlocking);
-
- if (sendBlocking)
+ for (int pos = 0; pos < bodySize;)
{
- channel.sendBlocking(chunk);
- }
- else
- {
- channel.send(chunk);
- }
-
- for (int pos = chunkLength; pos < bodySize;)
- {
- chunkLength = Math.min(bodySize - pos, minLargeMessageSize);
+ final int chunkLength;
+ final boolean lastChunk;
- bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
+
+ final int bytesToWrite = bodySize - pos;
+
+ if (bytesToWrite < minLargeMessageSize)
+ {
+ lastChunk = true;
+ chunkLength = bytesToWrite;
+ }
+ else
+ {
+ lastChunk = false;
+ chunkLength = minLargeMessageSize;
+ }
+
+ final MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate(chunkLength));
msg.encodeBody(bodyBuffer, pos, chunkLength);
- chunk = new SessionSendChunkMessage(-1, null, bodyBuffer.array(), pos + chunkLength < bodySize, sendBlocking);
+ final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.array(), !lastChunk, lastChunk && sendBlocking);
- if (sendBlocking)
+ if (sendBlocking && lastChunk)
{
+ // When sending it blocking, only the last chunk will be blocking.
channel.sendBlocking(chunk);
}
else
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -61,8 +61,8 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
@@ -639,14 +639,24 @@
consumer.handleMessage(message);
}
}
+
+ public void handleReceiveLargeMessage(final long consumerID, final byte[] headerBytes) throws Exception
+ {
+ ClientConsumerInternal consumer = consumers.get(consumerID);
- public void handleReceiveChunk(final long consumerID, final SessionSendChunkMessage chunk) throws Exception
+ if (consumer != null)
+ {
+ consumer.handleLargeMessage(headerBytes);
+ }
+ }
+
+ public void handleReceiveContinuation(final long consumerID, final SessionReceiveContinuationMessage continuation) throws Exception
{
ClientConsumerInternal consumer = consumers.get(consumerID);
if (consumer != null)
{
- consumer.handleChunk(chunk);
+ consumer.handleLargeMessageContinuation(continuation);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionInternal.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -16,7 +16,7 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
/**
* A ClientSessionInternal
@@ -39,9 +39,11 @@
void removeProducer(ClientProducerInternal producer);
+ void handleReceiveLargeMessage(final long consumerID, final byte[] headerBytes) throws Exception;
+
void handleReceiveMessage(long consumerID, ClientMessage message) throws Exception;
- void handleReceiveChunk(long consumerID, SessionSendChunkMessage chunk) throws Exception;
+ void handleReceiveContinuation(final long consumerID, final SessionReceiveContinuationMessage continuation) throws Exception;
void handleFailover(RemotingConnection backupConnection);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -23,7 +23,7 @@
package org.jboss.messaging.core.client.impl;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
import org.jboss.messaging.core.logging.Logger;
@@ -32,7 +32,7 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
/**
*
@@ -64,19 +64,26 @@
{
switch (type)
{
- case SESS_CHUNK_SEND:
+ case SESS_RECEIVE_CONTINUATION:
{
- SessionSendChunkMessage chunk = (SessionSendChunkMessage)packet;
- clientSession.handleReceiveChunk(chunk.getTargetID(), chunk);
+ SessionReceiveContinuationMessage continuation = (SessionReceiveContinuationMessage)packet;
+ clientSession.handleReceiveContinuation(continuation.getConsumerID(), continuation);
break;
}
case SESS_RECEIVE_MSG:
{
SessionReceiveMessage message = (SessionReceiveMessage) packet;
-
- clientSession.handleReceiveMessage(message.getConsumerID(), message.getClientMessage());
+ if (message.isLargeMessage())
+ {
+ clientSession.handleReceiveLargeMessage(message.getConsumerID(), message.getLargeMessageHeader());
+ }
+ else
+ {
+ clientSession.handleReceiveMessage(message.getConsumerID(), message.getClientMessage());
+ }
+
break;
}
case EXCEPTION:
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -395,6 +395,34 @@
{
return this.properties;
}
+
+ /**
+ * Constructs a <code>String</code> with all attributes
+ * in name = value format.
+ *
+ * @return a <code>String</code> representation
+ * of this object.
+ */
+ public String toString()
+ {
+ final String TAB = ", ";
+
+ StringBuffer retValue = new StringBuffer();
+
+ retValue.append("MessageImpl ( ")
+ .append("messageID = ").append(this.messageID).append(TAB)
+ .append("destination = ").append(this.destination).append(TAB)
+ .append("type = ").append(this.type).append(TAB)
+ .append("durable = ").append(this.durable).append(TAB)
+ .append("expiration = ").append(this.expiration).append(TAB)
+ .append("timestamp = ").append(this.timestamp).append(TAB)
+ .append("properties = ").append(this.properties).append(TAB)
+ .append("priority = ").append(this.priority).append(TAB)
+ .append("body = ").append(this.body).append(TAB)
+ .append(" )");
+
+ return retValue.toString();
+ }
// Private -------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -159,4 +159,5 @@
*
*/
void startGlobalDepage();
+
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -281,9 +281,8 @@
{
store.startDepaging(pagingSPI.getGlobalDepagerExecutor());
}
- globalMode.set(false);
}
-
+
/* (non-Javadoc)
* @see org.jboss.messaging.core.paging.PagingManager#getGlobalSize()
*/
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -27,6 +27,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -69,7 +70,7 @@
private final DecimalFormat format = new DecimalFormat("000000000");
- private final AtomicInteger pageUsedSize = new AtomicInteger(0);
+ private final AtomicInteger currentPageSize = new AtomicInteger(0);
private final SimpleString storeName;
@@ -90,7 +91,7 @@
// Bytes consumed by the queue on the memory
private final AtomicLong sizeInBytes = new AtomicLong();
- private volatile Runnable depageAction;
+ private final AtomicBoolean depaging = new AtomicBoolean(false);
private volatile int numberOfPages;
@@ -214,80 +215,6 @@
return storeName;
}
- /**
- * It returns a Page out of the Page System without reading it.
- * The method calling this method will remove the page and will start reading it outside of any locks.
- *
- * */
- public Page depage() throws Exception
- {
- writeLock.lock();
- currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
-
- try
- {
- if (numberOfPages == 0)
- {
- return null;
- }
- else
- {
- numberOfPages--;
-
- final Page returnPage;
-
- // We are out of old pages, all that is left now is the current page.
- // On that case we need to replace it by a new empty page, and return the current page immediately
- if (currentPageId == firstPageId)
- {
- firstPageId = Integer.MAX_VALUE;
-
- if (currentPage != null)
- {
- returnPage = currentPage;
- returnPage.close();
- currentPage = null;
- }
- else
- {
- // sanity check... it shouldn't happen!
- throw new IllegalStateException("CurrentPage is null");
- }
-
- // The current page is empty... what means we achieved the end of the pages
- if (returnPage.getNumberOfMessages() == 0)
- {
- returnPage.open();
- returnPage.delete();
-
- // This will trigger this Destination to exit the page mode,
- // and this will make JBM start using the journal again
- return null;
- }
- else
- {
- // We need to create a new page, as we can't lock the address until we finish depaging.
- openNewPage();
- }
-
- return returnPage;
- }
- else
- {
- returnPage = createPage(firstPageId++);
- }
-
- return returnPage;
- }
- }
- finally
- {
- currentPageLock.writeLock().unlock();
- writeLock.unlock();
- }
-
- }
-
public long addSize(final long size) throws Exception
{
final long maxSize = getMaxSizeBytes();
@@ -355,15 +282,16 @@
if (isTrace)
{
- log.trace("globalMode.get = " + pagingManager.isGlobalPageMode() +
- " currentGlobalSize = " +
- currentGlobalSize +
- " defaultPageSize = " +
- pagingManager.getDefaultPageSize() +
- " maxGlobalSize = " +
- maxGlobalSize +
- "maxGlobalSize - defaultPageSize = " +
- (maxGlobalSize - pagingManager.getDefaultPageSize()));
+
+ log.trace(" globalDepage = " + pagingManager.isGlobalPageMode() +
+ "\n currentGlobalSize = " +
+ currentGlobalSize +
+ "\n defaultPageSize = " +
+ pagingManager.getDefaultPageSize() +
+ "\n maxGlobalSize = " +
+ maxGlobalSize +
+ "\n maxGlobalSize - defaultPageSize = " +
+ (maxGlobalSize - pagingManager.getDefaultPageSize()));
}
if (pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
@@ -424,16 +352,16 @@
int bytesToWrite = fileFactory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
- if (pageUsedSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
+ if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
{
- // Make sure nothing is currently validating currentPaging
+ // Make sure nothing is currently validating or using currentPage
currentPageLock.writeLock().lock();
try
{
openNewPage();
- // openNewPage will set pageUsedSize to zero, we need to set it again
- pageUsedSize.addAndGet(bytesToWrite);
+ // openNewPage will set currentPageSize to zero, we need to set it again
+ currentPageSize.addAndGet(bytesToWrite);
}
finally
{
@@ -505,11 +433,14 @@
}
else
{
+ // startDepaging and clearDepage needs to be atomic.
+ // We can't use writeLock to this operation as writeLock would still be used by another thread, and still being a valid usage
synchronized (this)
{
- if (depageAction == null)
+ if (!depaging.get())
{
- depageAction = new DepageRunnable(executor);
+ depaging.set(true);
+ Runnable depageAction = new DepageRunnable(executor);
executor.execute(depageAction);
return true;
}
@@ -685,6 +616,82 @@
openNewPage();
}
+ /**
+ * It returns a Page out of the Page System without reading it.
+ * The method calling this method will remove the page and will start reading it outside of any locks.
+ *
+ * Observation: This method is used internally as part of the regular depage process, but externally is used only on tests,
+ * and that's why this method is part of the Testable Interface
+ * */
+ public Page depage() throws Exception
+ {
+ writeLock.lock();
+ currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
+
+ try
+ {
+ if (numberOfPages == 0)
+ {
+ return null;
+ }
+ else
+ {
+ numberOfPages--;
+
+ final Page returnPage;
+
+ // We are out of old pages, all that is left now is the current page.
+ // On that case we need to replace it by a new empty page, and return the current page immediately
+ if (currentPageId == firstPageId)
+ {
+ firstPageId = Integer.MAX_VALUE;
+
+ if (currentPage != null)
+ {
+ returnPage = currentPage;
+ returnPage.close();
+ currentPage = null;
+ }
+ else
+ {
+ // sanity check... it shouldn't happen!
+ throw new IllegalStateException("CurrentPage is null");
+ }
+
+ // The current page is empty... what means we achieved the end of the pages
+ if (returnPage.getNumberOfMessages() == 0)
+ {
+ returnPage.open();
+ returnPage.delete();
+
+ // This will trigger this Destination to exit the page mode,
+ // and this will make JBM start using the journal again
+ return null;
+ }
+ else
+ {
+ // We need to create a new page, as we can't lock the address until we finish depaging.
+ openNewPage();
+ }
+
+ return returnPage;
+ }
+ else
+ {
+ returnPage = createPage(firstPageId++);
+ }
+
+ return returnPage;
+ }
+ }
+ finally
+ {
+ currentPageLock.writeLock().unlock();
+ writeLock.unlock();
+ }
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -699,7 +706,7 @@
* If persistent messages are also used, it will update eventual PageTransactions
*/
- private boolean onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> data) throws Exception
+ private void onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> data) throws Exception
{
trace("Depaging....");
@@ -720,7 +727,7 @@
if (pageId <= lastPage.getLastId())
{
log.warn("Page " + pageId + " was already processed, ignoring the page");
- return true;
+ return;
}
}
@@ -804,29 +811,52 @@
ref.getQueue().addLast(ref);
}
- if (pagingManager.isGlobalPageMode())
- {
- // We use the Default Page Size when in global mode for the calculation of the Watermark
- return pagingManager.getGlobalSize() < pagingManager.getMaxGlobalSize() - pagingManager.getDefaultPageSize() && getMaxSizeBytes() <= 0 ||
- getAddressSize() < getMaxSizeBytes();
- }
- else
- {
- // If Max-size is not configured (-1) it will aways return true, as
- // this method was probably called by global-depage
- return getMaxSizeBytes() <= 0 || getAddressSize() < getMaxSizeBytes();
- }
+ }
+ /**
+ * @return
+ */
+ private boolean isFull(final long nextPageSize)
+ {
+ return getMaxSizeBytes() > 0 && getAddressSize() + nextPageSize > getMaxSizeBytes();
}
+ /**
+ * @param nextPageSize
+ * @return
+ */
+ private boolean isGlobalFull(final long nextPageSize)
+ {
+ return pagingManager.getMaxGlobalSize() > 0 && pagingManager.getGlobalSize() + nextPageSize > pagingManager.getMaxGlobalSize();
+ }
+
private long addAddressSize(final long delta)
{
return sizeInBytes.addAndGet(delta);
}
- private synchronized void clearDequeueThread()
+ /**
+ * startDepaging and clearDepage needs to be atomic.
+ * We can't use writeLock to this operation as writeLock would still be used by another thread, and still being a valid usage
+ * @return true if the depage status was cleared
+ */
+ private synchronized boolean clearDepage()
{
- depageAction = null;
+ final boolean pageFull = isFull(getPageSizeBytes());
+ final boolean globalFull = isGlobalFull(getPageSizeBytes());
+ if (pageFull || globalFull)
+ {
+ depaging.set(false);
+ if (!globalFull)
+ {
+ pagingManager.setGlobalPageMode(false);
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
private void openNewPage() throws Exception
@@ -851,7 +881,7 @@
currentPage = createPage(currentPageId);
- pageUsedSize.set(0);
+ currentPageSize.set(0);
currentPage.open();
}
@@ -912,7 +942,7 @@
* @return
* @throws Exception
*/
- private boolean readPage() throws Exception
+ private void readPage() throws Exception
{
Page page = depage();
@@ -925,18 +955,16 @@
lastPageRecord = null;
- return false;
+ return;
}
page.open();
List<PagedMessage> messages = page.read();
- boolean addressNotFull = onDepage(page.getPageId(), storeName, messages);
+ onDepage(page.getPageId(), storeName, messages);
page.delete();
-
- return addressNotFull;
}
// Inner classes -------------------------------------------------
@@ -956,14 +984,14 @@
{
if (running)
{
- boolean needMorePages = readPage();
- if (needMorePages)
+ if (!isFull(getPageSizeBytes()) && !isGlobalFull(getPageSizeBytes()))
{
- followingExecutor.execute(this);
+ readPage();
}
- else
+ // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed because the page was full
+ if (!clearDepage())
{
- clearDequeueThread();
+ followingExecutor.execute(this);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -187,7 +187,10 @@
public void deleteFile() throws MessagingException
{
- storageManager.deleteFile(file);
+ if (file != null)
+ {
+ storageManager.deleteFile(file);
+ }
}
@Override
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -502,6 +502,8 @@
storageManager.loadMessages(this, queues, resourceManager);
+ // This is necessary as if the server was previously stopped while a depage was being executed,
+ // it needs to resume the depage process on those destinations
pagingManager.startGlobalDepage();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -28,7 +28,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
@@ -40,11 +39,13 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REMOVE_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
@@ -113,10 +114,11 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionFailoverCompleteMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -357,14 +359,14 @@
{
return new ArrayList<FailureListener>(failureListeners);
}
-
+
public void setFailureListeners(final List<FailureListener> listeners)
{
this.failureListeners.clear();
-
+
this.failureListeners.addAll(listeners);
}
-
+
public Object getID()
{
return transportConnection.getID();
@@ -390,7 +392,7 @@
}
public void addFailureListener(final FailureListener listener)
- {
+ {
if (listener == null)
{
throw new IllegalStateException("FailureListener cannot be null");
@@ -418,7 +420,7 @@
{
return replicatingConnection;
}
-
+
public void setReplicatingConnection(final RemotingConnection connection)
{
this.replicatingConnection = connection;
@@ -552,7 +554,7 @@
try
{
boolean callNext = listener.connectionFailed(me);
-
+
if (!callNext)
{
break;
@@ -839,11 +841,16 @@
packet = new NullResponseMessage();
break;
}
- case SESS_CHUNK_SEND:
+ case SESS_RECEIVE_CONTINUATION:
{
- packet = new SessionSendChunkMessage();
+ packet = new SessionReceiveContinuationMessage();
break;
}
+ case SESS_SEND_CONTINUATION:
+ {
+ packet = new SessionSendContinuationMessage();
+ break;
+ }
case SESS_REPLICATE_DELIVERY:
{
packet = new SessionReplicateDeliveryMessage();
@@ -1152,9 +1159,9 @@
if (replicatingChannel != null)
{
DelayedResult result = new DelayedResult();
-
+
responseActions.add(result);
-
+
responseActionCount++;
replicatingChannel.send(packet);
@@ -1190,7 +1197,7 @@
break;
}
}
-
+
responseActionCount = 0;
}
}
@@ -1212,7 +1219,7 @@
// This will never get called concurrently by more than one thread
private int responseActionCount;
-
+
// TODO it's not ideal synchronizing this since it forms a contention point with replication
// but we need to do this to protect it w.r.t. the check on replicatingChannel
public void replicateResponseReceived()
@@ -1228,7 +1235,7 @@
if (result == null)
{
throw new IllegalStateException("Cannot find response action");
- }
+ }
}
}
@@ -1236,12 +1243,12 @@
if (result != null)
{
result.replicated();
-
- //TODO - we can optimise this not to lock every time - only if waiting for all replications to return
+
+ // TODO - we can optimise this not to lock every time - only if waiting for all replications to return
synchronized (replicationLock)
{
responseActionCount--;
-
+
if (responseActionCount == 0)
{
replicationLock.notify();
@@ -1249,34 +1256,34 @@
}
}
}
-
+
private void waitForAllReplicationResponse()
- {
+ {
synchronized (replicationLock)
{
if (replicatingChannel != null)
{
long toWait = 10000; // TODO don't hardcode timeout
-
+
long start = System.currentTimeMillis();
-
+
while (responseActionCount > 0 && toWait > 0)
- {
+ {
try
{
replicationLock.wait();
}
catch (InterruptedException e)
- {
+ {
}
-
+
long now = System.currentTimeMillis();
toWait -= now - start;
- start = now;
+ start = now;
}
-
+
if (toWait <= 0)
{
log.warn("Timed out waiting for replication responses to return");
@@ -1318,16 +1325,16 @@
public void transferConnection(final RemotingConnection newConnection)
{
// Needs to synchronize on the connection to make sure no packets from
- // the old connection get processed after transfer has occurred
+ // the old connection get processed after transfer has occurred
synchronized (connection.transferLock)
{
connection.channels.remove(id);
-
- //If we're reconnecting to a live node which is replicated then there will be a replicating channel
- //too. We need to then make sure that all replication responses come back since packets aren't
- //considered confirmed until response comes back and is processed. Otherwise responses to previous
- //message sends could come back after reconnection resulting in clients resending same message
- //since it wasn't confirmed yet.
+
+ // If we're reconnecting to a live node which is replicated then there will be a replicating channel
+ // too. We need to then make sure that all replication responses come back since packets aren't
+ // considered confirmed until response comes back and is processed. Otherwise responses to previous
+ // message sends could come back after reconnection resulting in clients resending same message
+ // since it wasn't confirmed yet.
waitForAllReplicationResponse();
// And switch it
@@ -1342,7 +1349,7 @@
rnewConnection.channels.put(id, this);
connection = rnewConnection;
- }
+ }
}
public void replayCommands(final int otherLastReceivedCommandID)
@@ -1350,7 +1357,7 @@
clearUpTo(otherLastReceivedCommandID);
for (final Packet packet : resendCache)
- {
+ {
doWrite(packet);
}
}
@@ -1621,7 +1628,7 @@
channel.replicatingChannelDead();
}
}
-
+
return true;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -137,15 +137,17 @@
public static final byte SESS_SEND = 76;
- public static final byte SESS_CONSUMER_CLOSE = 77;
+ public static final byte SESS_SEND_CONTINUATION = 77;
+ public static final byte SESS_CONSUMER_CLOSE = 78;
+
public static final byte SESS_RECEIVE_MSG = 79;
- public static final byte SESS_FAILOVER_COMPLETE = 80;
+ public static final byte SESS_RECEIVE_CONTINUATION = 80;
- public static final byte SESS_REPLICATE_DELIVERY = 81;
+ public static final byte SESS_FAILOVER_COMPLETE = 81;
- public static final byte SESS_CHUNK_SEND = 95;
+ public static final byte SESS_REPLICATE_DELIVERY = 82;
// Static --------------------------------------------------------
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -0,0 +1,117 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+
+/**
+ * A SessionContinuationMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Dec 5, 2008 10:08:40 AM
+ *
+ *
+ */
+public abstract class SessionContinuationMessage extends PacketImpl
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private byte[] body;
+
+ private boolean continues;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public SessionContinuationMessage(byte type,
+ final byte[] body,
+ final boolean continues)
+ {
+ super(type);
+ this.body = body;
+ this.continues = continues;
+ }
+
+ public SessionContinuationMessage(byte type)
+ {
+ super(type);
+ }
+
+ // Public --------------------------------------------------------
+
+ /**
+ * @return the body
+ */
+ public byte[] getBody()
+ {
+ return body;
+ }
+
+ /**
+ * @return the continues
+ */
+ public boolean isContinues()
+ {
+ return continues;
+ }
+
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
+ body.length +
+ DataConstants.SIZE_BOOLEAN;
+ }
+
+ @Override
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ buffer.putInt(body.length);
+ buffer.putBytes(body);
+ buffer.putBoolean(continues);
+ }
+
+ @Override
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ int size = buffer.getInt();
+ body = new byte[size];
+ buffer.getBytes(body);
+ continues = buffer.getBoolean();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Property changes on: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionContinuationMessage.java
___________________________________________________________________
Name: svn:mergeinfo
+
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveContinuationMessage.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+
+/**
+ * A SessionSendContinuationMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Dec 4, 2008 12:25:14 PM
+ *
+ *
+ */
+public class SessionReceiveContinuationMessage extends SessionContinuationMessage
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long consumerID;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * @param type
+ */
+ public SessionReceiveContinuationMessage()
+ {
+ super(SESS_RECEIVE_CONTINUATION);
+ }
+
+ /**
+ * @param type
+ * @param body
+ * @param continues
+ * @param requiresResponse
+ */
+ public SessionReceiveContinuationMessage(final long consumerID,
+ final byte[] body,
+ final boolean continues,
+ final boolean requiresResponse)
+ {
+ super(SESS_RECEIVE_CONTINUATION, body, continues);
+ this.consumerID = consumerID;
+ }
+
+ /**
+ * @return the consumerID
+ */
+ public long getConsumerID()
+ {
+ return consumerID;
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return super.getRequiredBufferSize() + DataConstants.SIZE_LONG;
+ }
+
+ @Override
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ super.encodeBody(buffer);
+ buffer.putLong(consumerID);
+ }
+
+ @Override
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ super.decodeBody(buffer);
+ consumerID = buffer.getLong();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReceiveMessage.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -46,6 +46,10 @@
private long consumerID;
+ private boolean largeMessage;
+
+ private byte[] largeMessageHeader;
+
private ClientMessage clientMessage;
private ServerMessage serverMessage;
@@ -56,6 +60,19 @@
// Constructors --------------------------------------------------
+ public SessionReceiveMessage(final long consumerID, final byte[] largeMessageHeader, final int deliveryCount)
+ {
+ super(SESS_RECEIVE_MSG);
+
+ this.consumerID = consumerID;
+
+ this.largeMessageHeader = largeMessageHeader;
+
+ this.deliveryCount = deliveryCount;
+
+ this.largeMessage = true;
+ }
+
public SessionReceiveMessage(final long consumerID, final ServerMessage message, final int deliveryCount)
{
super(SESS_RECEIVE_MSG);
@@ -67,6 +84,8 @@
this.clientMessage = null;
this.deliveryCount = deliveryCount;
+
+ this.largeMessage = false;
}
public SessionReceiveMessage()
@@ -91,22 +110,57 @@
return serverMessage;
}
+ public byte[] getLargeMessageHeader()
+ {
+ return largeMessageHeader;
+ }
+
+ /**
+ * @return the largeMessage
+ */
+ public boolean isLargeMessage()
+ {
+ return largeMessage;
+ }
+
public int getDeliveryCount()
{
return deliveryCount;
}
-
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_INT + serverMessage.getEncodeSize();
+ if (largeMessage)
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT +
+ DataConstants.SIZE_BOOLEAN +
+ DataConstants.SIZE_INT +
+ largeMessageHeader.length;
+ }
+ else
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG +
+ DataConstants.SIZE_INT +
+ DataConstants.SIZE_BOOLEAN +
+ serverMessage.getEncodeSize();
+ }
}
-
+
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putLong(consumerID);
buffer.putInt(deliveryCount);
- serverMessage.encode(buffer);
+ buffer.putBoolean(largeMessage);
+ if (largeMessage)
+ {
+ buffer.putInt(largeMessageHeader.length);
+ buffer.putBytes(largeMessageHeader);
+ }
+ else
+ {
+ serverMessage.encode(buffer);
+ }
}
public void decodeBody(final MessagingBuffer buffer)
@@ -117,11 +171,20 @@
deliveryCount = buffer.getInt();
- clientMessage = new ClientMessageImpl(deliveryCount);
+ largeMessage = buffer.getBoolean();
- clientMessage.decode(buffer);
-
- clientMessage.getBody().flip();
+ if (largeMessage)
+ {
+ int size = buffer.getInt();
+ largeMessageHeader = new byte[size];
+ buffer.getBytes(largeMessageHeader);
+ }
+ else
+ {
+ clientMessage = new ClientMessageImpl(deliveryCount);
+ clientMessage.decode(buffer);
+ clientMessage.getBody().flip();
+ }
}
// Package protected ---------------------------------------------
Deleted: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendChunkMessage.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -1,197 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.core.remoting.impl.wireformat;
-
-import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.DataConstants;
-
-/**
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
- * @version <tt>$Revision$</tt>
- */
-public class SessionSendChunkMessage extends PacketImpl
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long targetID;
-
- private byte[] header;
-
- private byte[] body;
-
- private boolean continues;
-
- private long messageID;
-
- private boolean requiresResponse;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public SessionSendChunkMessage(final long targetID,
- final byte[] header,
- final byte[] body,
- final boolean continues,
- final boolean requiresResponse)
- {
- super(SESS_CHUNK_SEND);
- this.targetID = targetID;
- this.header = header;
- this.body = body;
- this.continues = continues;
- this.requiresResponse = requiresResponse;
- }
-
- public SessionSendChunkMessage()
- {
- super(SESS_CHUNK_SEND);
- }
-
- // Public --------------------------------------------------------
-
- public long getTargetID()
- {
- return targetID;
- }
-
- public boolean isRequiresResponse()
- {
- return requiresResponse;
- }
-
- public byte[] getHeader()
- {
- return header;
- }
-
- public byte[] getBody()
- {
- return body;
- }
-
- public long getMessageID()
- {
- return messageID;
- }
-
- public void setMessageID(final long messageId)
- {
- messageID = messageId;
- }
-
- public boolean isContinues()
- {
- return continues;
- }
-
- @Override
- public int getRequiredBufferSize()
- {
- return DEFAULT_PACKET_SIZE + DataConstants.SIZE_LONG /* TargetID */+
- DataConstants.SIZE_INT /* HeaderLength */+
- (header != null ? header.length : 0) /* Header bytes */+
- DataConstants.SIZE_INT /* BodyLength */+
- body.length /* Body bytes */+
- DataConstants.SIZE_BOOLEAN /* hasContinuations */+
- DataConstants.SIZE_BOOLEAN /* requiresResponse */+
- DataConstants.SIZE_BOOLEAN /* has MessageId */+
- (messageID > 0 ? DataConstants.SIZE_LONG : 0);
- }
-
- @Override
- public void encodeBody(final MessagingBuffer buffer)
- {
- buffer.putLong(targetID);
-
- if (header != null)
- {
- buffer.putInt(header.length);
- buffer.putBytes(header);
- }
- else
- {
- buffer.putInt(0);
- }
-
- buffer.putInt(body.length);
- buffer.putBytes(body);
-
- buffer.putBoolean(continues);
-
- buffer.putBoolean(requiresResponse);
-
- buffer.putBoolean(messageID > 0);
-
- if (messageID > 0)
- {
- buffer.putLong(messageID);
- }
-
- }
-
- @Override
- public void decodeBody(final MessagingBuffer buffer)
- {
- targetID = buffer.getLong();
-
- final int headerLength = buffer.getInt();
-
- if (headerLength > 0)
- {
- header = new byte[headerLength];
- buffer.getBytes(header);
- }
- else
- {
- header = null;
- }
-
- final int bodyLength = buffer.getInt();
-
- body = new byte[bodyLength];
- buffer.getBytes(body);
-
- continues = buffer.getBoolean();
-
- requiresResponse = buffer.getBoolean();
-
- final boolean hasMessageID = buffer.getBoolean();
-
- if (hasMessageID)
- {
- messageID = buffer.getLong();
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-}
Added: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendContinuationMessage.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -0,0 +1,113 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.remoting.impl.wireformat;
+
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+
+
+/**
+ * A SessionSendContinuationMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Dec 4, 2008 12:25:14 PM
+ *
+ *
+ */
+public class SessionSendContinuationMessage extends SessionContinuationMessage
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private boolean requiresResponse;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * @param type
+ */
+ public SessionSendContinuationMessage()
+ {
+ super(SESS_SEND_CONTINUATION);
+ }
+
+ /**
+ * @param type
+ * @param body
+ * @param continues
+ * @param requiresResponse
+ */
+ public SessionSendContinuationMessage(final byte[] body,
+ final boolean continues,
+ final boolean requiresResponse)
+ {
+ super(SESS_SEND_CONTINUATION, body, continues);
+ this.requiresResponse = requiresResponse;
+ }
+
+
+ // Public --------------------------------------------------------
+
+ /**
+ * @return the requiresResponse
+ */
+ public boolean isRequiresResponse()
+ {
+ return requiresResponse;
+ }
+
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return super.getRequiredBufferSize() + DataConstants.SIZE_BOOLEAN;
+ }
+
+ @Override
+ public void encodeBody(final MessagingBuffer buffer)
+ {
+ super.encodeBody(buffer);
+ buffer.putBoolean(requiresResponse);
+ }
+
+ @Override
+ public void decodeBody(final MessagingBuffer buffer)
+ {
+ super.decodeBody(buffer);
+ requiresResponse = buffer.getBoolean();
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendMessage.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -18,11 +18,10 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.remoting.impl.wireformat;
-import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.ServerMessage;
@@ -32,6 +31,7 @@
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:csuconic at redhat.com">Clebert Suconic</a>
*
* @version <tt>$Revision$</tt>
*/
@@ -39,14 +39,20 @@
{
// Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(SessionSendMessage.class);
-
// Attributes ----------------------------------------------------
private Message clientMessage;
+
+ private boolean largeMessage;
+
+ /** Used only if largeMessage */
+ private byte[] largeMessageHeader;
+ /** We need to set the MessageID when replicating this on the server */
+ private long largeMessageId = -1;
+
private ServerMessage serverMessage;
-
+
private boolean requiresResponse;
// Static --------------------------------------------------------
@@ -57,74 +63,188 @@
{
super(SESS_SEND);
- this.clientMessage = message;
-
+ clientMessage = message;
+
this.requiresResponse = requiresResponse;
+
+ largeMessage = false;
}
-
+
+ public SessionSendMessage(final byte[] largeMessageHeader, final boolean requiresResponse)
+ {
+ super(SESS_SEND);
+
+ this.largeMessageHeader = largeMessageHeader;
+
+ this.requiresResponse = requiresResponse;
+
+ largeMessage = true;
+ }
+
public SessionSendMessage()
{
super(SESS_SEND);
}
-
+
// Public --------------------------------------------------------
+
+ public boolean isLargeMessage()
+ {
+ return largeMessage;
+ }
+
public Message getClientMessage()
{
return clientMessage;
}
-
+
public ServerMessage getServerMessage()
{
return serverMessage;
}
-
+
+ public byte[] getLargeMessageHeader()
+ {
+ return largeMessageHeader;
+ }
+
public boolean isRequiresResponse()
{
return requiresResponse;
}
+ /**
+ * @return the largeMessageId
+ */
+ public long getMessageID()
+ {
+ if (largeMessage)
+ {
+ return largeMessageId;
+ }
+ else
+ {
+ return serverMessage.getMessageID();
+ }
+ }
+
+ /**
+ * @param largeMessageId the largeMessageId to set
+ */
+ public void setMessageID(long id)
+ {
+ if (largeMessage)
+ {
+ this.largeMessageId = id;
+ }
+ else
+ {
+ serverMessage.setMessageID(id);
+ }
+ }
+
+ @Override
public void encodeBody(final MessagingBuffer buffer)
{
- if (clientMessage != null)
+ buffer.putBoolean(largeMessage);
+
+ if (largeMessage)
{
+ buffer.putInt(largeMessageHeader.length);
+ buffer.putBytes(largeMessageHeader);
+
+ if (largeMessageId > 0)
+ {
+ buffer.putBoolean(true);
+ buffer.putLong(largeMessageId);
+ }
+ else
+ {
+ buffer.putBoolean(false);
+ }
+ }
+ else if (clientMessage != null)
+ {
clientMessage.encode(buffer);
}
else
{
- //If we're replicating a buffer to a backup node then we encode the serverMessage not the clientMessage
+ // If we're replicating a buffer to a backup node then we encode the serverMessage not the clientMessage
serverMessage.encode(buffer);
}
-
+
buffer.putBoolean(requiresResponse);
}
-
+
+ @Override
public void decodeBody(final MessagingBuffer buffer)
{
- //TODO can be optimised
-
- serverMessage = new ServerMessageImpl();
-
- serverMessage.decode(buffer);
-
- serverMessage.getBody().flip();
-
- requiresResponse = buffer.getBoolean();
+ largeMessage = buffer.getBoolean();
+
+ if (largeMessage)
+ {
+ int largeMessageLength = buffer.getInt();
+
+ largeMessageHeader = new byte[largeMessageLength];
+
+ buffer.getBytes(largeMessageHeader);
+
+ final boolean largeMessageIDFilled = buffer.getBoolean();
+
+ if (largeMessageIDFilled)
+ {
+ this.largeMessageId = buffer.getLong();
+ }
+ else
+ {
+ this.largeMessageId = -1;
+ }
+ }
+ else
+ {
+ // TODO can be optimised
+
+ serverMessage = new ServerMessageImpl();
+
+ serverMessage.decode(buffer);
+
+ serverMessage.getBody().flip();
+
+ requiresResponse = buffer.getBoolean();
+ }
}
+ @Override
public int getRequiredBufferSize()
{
- if (clientMessage != null)
+ if (largeMessage)
{
- return BASIC_PACKET_SIZE + clientMessage.getEncodeSize() + DataConstants.SIZE_BOOLEAN;
+ return BASIC_PACKET_SIZE +
+ // IsLargeMessage
+ DataConstants.SIZE_BOOLEAN +
+ // BufferSize
+ DataConstants.SIZE_INT +
+ // Bytes sent
+ largeMessageHeader.length +
+ // LargeMessageID (if > 0) and a boolean statying if the largeMessageID is set
+ DataConstants.SIZE_BOOLEAN + (largeMessageId >= 0 ? DataConstants.SIZE_LONG : 0) +
+ DataConstants.SIZE_BOOLEAN;
}
+ else if (clientMessage != null)
+ {
+ return DataConstants.SIZE_BOOLEAN + BASIC_PACKET_SIZE +
+ clientMessage.getEncodeSize() +
+ DataConstants.SIZE_BOOLEAN;
+ }
else
{
- return BASIC_PACKET_SIZE + serverMessage.getEncodeSize() + DataConstants.SIZE_BOOLEAN;
+ return DataConstants.SIZE_BOOLEAN + BASIC_PACKET_SIZE +
+ serverMessage.getEncodeSize() +
+ DataConstants.SIZE_BOOLEAN;
}
}
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -37,7 +37,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -66,7 +66,7 @@
String getUsername();
String getPassword();
-
+
int getMinLargeMessageSize();
Object getConnectionID();
@@ -74,11 +74,11 @@
void removeConsumer(ServerConsumer consumer) throws Exception;
void close() throws Exception;
-
+
void promptDelivery(Queue queue);
void handleAcknowledge(final SessionAcknowledgeMessage packet);
-
+
void handleExpired(final SessionExpiredMessage packet);
void handleRollback(Packet packet);
@@ -110,15 +110,15 @@
void handleSetXATimeout(SessionXASetTimeoutMessage packet);
void handleAddDestination(SessionAddDestinationMessage packet);
-
+
void handleStart(Packet packet);
-
+
void handleStop(Packet packet);
void handleRemoveDestination(SessionRemoveDestinationMessage packet);
void handleCreateQueue(SessionCreateQueueMessage packet);
-
+
void handleDeleteQueue(SessionDeleteQueueMessage packet);
void handleCreateConsumer(SessionCreateConsumerMessage packet);
@@ -131,18 +131,20 @@
void handleReceiveConsumerCredits(SessionConsumerFlowCreditMessage packet);
- public void handleSendChunkMessage(SessionSendChunkMessage packet);
+ void handleSendContinuations(SessionSendContinuationMessage packet);
void handleSend(SessionSendMessage packet);
+ void handleSendLargeMessage(SessionSendMessage packet);
+
void handleFailedOver(Packet packet);
-
+
void handleClose(Packet packet);
-
+
void handleReplicatedDelivery(SessionReplicateDeliveryMessage packet);
-
+
int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
-
+
Channel getChannel();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -43,7 +43,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.NullResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
@@ -180,15 +180,15 @@
// Otherwise we could end up with a situation where a close comes in, then a delivery comes in,
// then close gets replicated to backup, then delivery gets replicated, but consumer is already
// closed!
-// lock.lock();
-// try
-// {
- setStarted(false);
-// }
-// finally
-// {
-// lock.unlock();
-// }
+ // lock.lock();
+ // try
+ // {
+ setStarted(false);
+ // }
+ // finally
+ // {
+ // lock.unlock();
+ // }
DelayedResult result = channel.replicatePacket(packet);
@@ -239,15 +239,15 @@
public void close() throws Exception
{
-// lock.lock();
-// try
-// {
- setStarted(false);
-// }
-// finally
-// {
-// lock.unlock();
-// }
+ // lock.lock();
+ // try
+ // {
+ setStarted(false);
+ // }
+ // finally
+ // {
+ // lock.unlock();
+ // }
doClose();
}
@@ -545,24 +545,26 @@
{
deliveringRefs.add(ref);
}
-
+
+ // TODO: get rid of the instanceof by something like message.isLargeMessage()
if (message instanceof ServerLargeMessage)
{
// TODO: How to inform the backup node about the LargeMessage being sent?
- largeMessageSender = new LargeMessageSender((ServerLargeMessage)message);
+ largeMessageSender = new LargeMessageSender((ServerLargeMessage)message, ref);
largeMessageSender.sendLargeMessage();
}
else
{
sendStandardMessage(ref, message);
- }
- if (preAcknowledge)
- {
- doAck(ref);
+ if (preAcknowledge)
+ {
+ doAck(ref);
+ }
}
+
return HandleStatus.HANDLED;
}
finally
@@ -589,7 +591,7 @@
if (result == null)
{
// Not replicated - just send now
-
+
channel.send(packet);
}
else
@@ -598,7 +600,7 @@
result.setResultRunner(new Runnable()
{
public void run()
- {
+ {
channel.send(packet);
}
});
@@ -617,16 +619,22 @@
/** The current message being processed */
private ServerLargeMessage pendingLargeMessage;
+ private final MessageReference ref;
+
+ private boolean sentFirstMessage = false;
+
/** The current position on the message being processed */
private long positionPendingLargeMessage;
- private SessionSendChunkMessage readAheadChunk;
+ private SessionReceiveContinuationMessage readAheadChunk;
- public LargeMessageSender(final ServerLargeMessage message)
+ public LargeMessageSender(final ServerLargeMessage message, final MessageReference ref)
{
pendingLargeMessage = (ServerLargeMessage)message;
sizePendingLargeMessage = pendingLargeMessage.getBodySize();
+
+ this.ref = ref;
}
public boolean sendLargeMessage()
@@ -645,20 +653,42 @@
return false;
}
+ if (!sentFirstMessage)
+ {
+
+ sentFirstMessage = true;
+
+ MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
+ pendingLargeMessage.encodeProperties(headerBuffer);
+
+ SessionReceiveMessage initialMessage = new SessionReceiveMessage(id,
+ headerBuffer.array(),
+ ref.getDeliveryCount() + 1);
+
+ channel.send(initialMessage);
+
+ if (availableCredits != null)
+ {
+ // RequiredBufferSize on this case represents the right number of bytes sent
+ availableCredits.addAndGet(-pendingLargeMessage.getPropertiesEncodeSize());
+ }
+ }
+
if (readAheadChunk != null)
{
int chunkLen = readAheadChunk.getBody().length;
-
+
positionPendingLargeMessage += chunkLen;
-
- channel.send(readAheadChunk);
-
- readAheadChunk = null;
-
+
if (availableCredits != null)
{
availableCredits.addAndGet(-chunkLen);
}
+
+ channel.send(readAheadChunk);
+
+ readAheadChunk = null;
+
}
while (positionPendingLargeMessage < sizePendingLargeMessage)
@@ -672,7 +702,7 @@
return false;
}
- SessionSendChunkMessage chunk = createChunkSend();
+ SessionReceiveContinuationMessage chunk = createChunkSend();
int chunkLen = chunk.getBody().length;
@@ -689,7 +719,20 @@
pendingLargeMessage.releaseResources();
ServerConsumerImpl.this.largeMessageSender = null;
+
+ if (preAcknowledge)
+ {
+ try
+ {
+ doAck(ref);
+ }
+ catch (Exception e)
+ {
+ log.warn("Error while ACKing reference " + ref, e);
+ }
+ }
+
return true;
}
finally
@@ -698,45 +741,20 @@
}
}
- private SessionSendChunkMessage createChunkSend()
+ private SessionReceiveContinuationMessage createChunkSend()
{
- SessionSendChunkMessage chunk;
+ SessionReceiveContinuationMessage chunk;
int localChunkLen = 0;
- if (positionPendingLargeMessage == 0)
- {
- int headerSize = pendingLargeMessage.getPropertiesEncodeSize();
+ localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
- localChunkLen = minLargeMessageSize - headerSize;
+ MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
- MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
- pendingLargeMessage.encodeProperties(headerBuffer);
+ pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
- pendingLargeMessage.encodeBody(bodyBuffer, 0, localChunkLen);
+ chunk = new SessionReceiveContinuationMessage(id, bodyBuffer.array(), positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
- chunk = new SessionSendChunkMessage(id,
- headerBuffer.array(),
- bodyBuffer.array(),
- localChunkLen < sizePendingLargeMessage,
- false);
- }
- else
- {
- localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
-
- MessagingBuffer bodyBuffer = new ByteBufferWrapper(ByteBuffer.allocate((int)localChunkLen));
-
- pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage, localChunkLen);
-
- chunk = new SessionSendChunkMessage(id,
- null,
- bodyBuffer.array(),
- positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage,
- false);
- }
-
return chunk;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -61,7 +61,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -1956,7 +1956,7 @@
result = channel.replicatePacket(packet);
// note we process start before response is back from the backup
-
+
setStarted(true);
}
finally
@@ -2157,68 +2157,40 @@
}
}
- public void handleSendChunkMessage(final SessionSendChunkMessage packet)
+ public void handleSendLargeMessage(final SessionSendMessage packet)
{
- if (packet.getMessageID() == 0)
- {
- packet.setMessageID(storageManager.generateUniqueID());
- }
- Packet response = null;
+ DelayedResult result = channel.replicatePacket(packet);
- // TODO: Replication on ChunkMessages
-
- try
+ if (packet.getMessageID() <= 0L)
{
- if (packet.getHeader() != null)
- {
- largeMessage = createLargeMessageStorage(packet.getMessageID(), packet.getHeader());
- }
+ // must generate message id here, so we know they are in sync on live and backup
+ long id = storageManager.generateUniqueID();
- largeMessage.addBytes(packet.getBody());
+ packet.setMessageID(id);
+ }
- if (!packet.isContinues())
- {
- final ServerLargeMessage message = largeMessage;
- largeMessage = null;
+ // With a send we must make sure it is replicated to backup before being processed on live
+ // or can end up with delivery being processed on backup before original send
- message.complete();
-
- send(message);
- }
-
- if (packet.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
+ if (result == null)
+ {
+ doSendLargeMessage(packet);
}
- catch (Exception e)
+ else
{
- log.error("Failed to send message", e);
-
- if (packet.isRequiresResponse())
+ result.setResultRunner(new Runnable()
{
- if (e instanceof MessagingException)
+ public void run()
{
- response = new MessagingExceptionMessage((MessagingException)e);
+ doSendLargeMessage(packet);
}
- else
- {
- response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
- }
- }
+ });
}
-
- channel.confirm(packet);
-
- if (response != null)
- {
- channel.send(response);
- }
-
+
}
-
+
public void handleSend(final SessionSendMessage packet)
{
// With a send we must make sure it is replicated to backup before being processed on live
@@ -2239,14 +2211,14 @@
lock = null;
}
- if (msg.getMessageID() == 0L)
+ if (packet.getMessageID() <= 0L)
{
// must generate message id here, so we know they are in sync on live and backup
long id = storageManager.generateUniqueID();
- msg.setMessageID(id);
+ packet.setMessageID(id);
}
-
+
if (channel.getReplicatingChannel() != null)
{
msg.putBooleanProperty(new SimpleString("clustered"), true);
@@ -2274,70 +2246,34 @@
});
}
}
-
- private void doSend(final SessionSendMessage packet)
+
+ public void handleSendContinuations(final SessionSendContinuationMessage packet)
{
- Packet response = null;
- try
- {
- ServerMessage message = packet.getServerMessage();
+ DelayedResult result = channel.replicatePacket(packet);
- if (message.getDestination().equals(managementAddress))
- {
- // It's a management message
+ // With a send we must make sure it is replicated to backup before being processed on live
+ // or can end up with delivery being processed on backup before original send
- handleManagementMessage(message);
- }
- else
- {
- send(message);
- }
-
- if (packet.isRequiresResponse())
- {
- response = new NullResponseMessage();
- }
+ if (result == null)
+ {
+ doSendContinuations(packet);
}
- catch (Exception e)
+ else
{
- log.error("Failed to send message", e);
-
- if (packet.isRequiresResponse())
+ result.setResultRunner(new Runnable()
{
- if (e instanceof MessagingException)
+ public void run()
{
- response = new MessagingExceptionMessage((MessagingException)e);
+ doSendContinuations(packet);
}
- else
- {
- response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
- }
- }
+ });
}
- channel.confirm(packet);
- if (response != null)
- {
- channel.send(response);
- }
}
- private void handleManagementMessage(final ServerMessage message) throws Exception
- {
- doSecurity(message);
-
- managementService.handleMessage(message);
-
- SimpleString replyTo = (SimpleString)message.getProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
- if (replyTo != null)
- {
- message.setDestination(replyTo);
- send(message);
- }
- }
-
+
public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
{
ServerConsumer consumer = consumers.get(packet.getConsumerID());
@@ -2360,25 +2296,25 @@
public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
{
boolean wasStarted = this.started;
-
+
if (wasStarted)
{
this.setStarted(false);
}
-
+
remotingConnection.removeFailureListener(this);
channel.transferConnection(newConnection);
-
+
RemotingConnection oldReplicatingConnection = newConnection.getReplicatingConnection();
-
+
if (oldReplicatingConnection != null)
{
oldReplicatingConnection.destroy();
}
-
+
newConnection.setReplicatingConnection(remotingConnection.getReplicatingConnection());
-
+
remotingConnection.setReplicatingConnection(null);
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
@@ -2393,12 +2329,12 @@
int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
channel.replayCommands(lastReceivedCommandID);
-
+
if (wasStarted)
{
this.setStarted(true);
}
-
+
return serverLastReceivedCommandID;
}
@@ -2415,7 +2351,7 @@
try
{
log.info("Connection timed out, so clearing up resources for session " + name);
-
+
for (Runnable runner : failureRunners)
{
try
@@ -2452,6 +2388,167 @@
// Private
// ----------------------------------------------------------------------------
+ private void doSendLargeMessage(final SessionSendMessage packet)
+ {
+ Packet response = null;
+
+ try
+ {
+ largeMessage = createLargeMessageStorage(packet.getMessageID(), packet.getLargeMessageHeader());
+
+ if (packet.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to send message", e);
+
+ if (packet.isRequiresResponse())
+ {
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+ }
+
+ channel.confirm(packet);
+
+ if (response != null)
+ {
+ channel.send(response);
+ }
+ }
+
+ private void doSend(final SessionSendMessage packet)
+ {
+ Packet response = null;
+
+ try
+ {
+ ServerMessage message = packet.getServerMessage();
+
+ if (message.getDestination().equals(managementAddress))
+ {
+ // It's a management message
+
+ handleManagementMessage(message);
+ }
+ else
+ {
+ send(message);
+ }
+
+ if (packet.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to send message", e);
+
+ if (packet.isRequiresResponse())
+ {
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+ }
+
+ channel.confirm(packet);
+
+ if (response != null)
+ {
+ channel.send(response);
+ }
+ }
+
+ /**
+ * @param packet
+ */
+ private void doSendContinuations(final SessionSendContinuationMessage packet)
+ {
+ Packet response = null;
+
+ try
+ {
+
+ if (largeMessage == null)
+ {
+ throw new MessagingException(MessagingException.ILLEGAL_STATE, "large-message not initialized on server");
+ }
+
+ largeMessage.addBytes(packet.getBody());
+
+ if (!packet.isContinues())
+ {
+ final ServerLargeMessage message = largeMessage;
+
+ largeMessage = null;
+
+ message.complete();
+
+ send(message);
+ }
+
+ if (packet.isRequiresResponse())
+ {
+ response = new NullResponseMessage();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to send message", e);
+
+ if (packet.isRequiresResponse())
+ {
+ if (e instanceof MessagingException)
+ {
+ response = new MessagingExceptionMessage((MessagingException)e);
+ }
+ else
+ {
+ response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
+ }
+ }
+ }
+
+ channel.confirm(packet);
+
+ if (response != null)
+ {
+ channel.send(response);
+ }
+ }
+
+
+ private void handleManagementMessage(final ServerMessage message) throws Exception
+ {
+ doSecurity(message);
+
+ managementService.handleMessage(message);
+
+ SimpleString replyTo = (SimpleString)message.getProperty(ClientMessageImpl.REPLYTO_HEADER_NAME);
+ if (replyTo != null)
+ {
+ message.setDestination(replyTo);
+ send(message);
+ }
+ }
+
+
private ServerLargeMessage createLargeMessageStorage(final long messageID, final byte[] header) throws Exception
{
ServerLargeMessage largeMessage = storageManager.createLargeMessage();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -15,7 +15,6 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ADD_DESTINATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CHUNK_SEND;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
@@ -30,6 +29,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_REPLICATE_DELIVERY;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_START;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
@@ -60,8 +60,8 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionExpiredMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendChunkMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionReplicateDeliveryMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendContinuationMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
@@ -72,8 +72,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXARollbackMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
-import org.jboss.messaging.core.server.ServerLargeMessage;
-import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.ServerSession;
/**
@@ -91,8 +89,7 @@
private final Channel channel;
- public ServerSessionPacketHandler(final ServerSession session,
- final Channel channel)
+ public ServerSessionPacketHandler(final ServerSession session, final Channel channel)
{
this.session = session;
@@ -128,7 +125,7 @@
case SESS_DELETE_QUEUE:
{
SessionDeleteQueueMessage request = (SessionDeleteQueueMessage)packet;
- session.handleDeleteQueue(request);
+ session.handleDeleteQueue(request);
break;
}
case SESS_QUEUEQUERY:
@@ -146,18 +143,18 @@
case SESS_ACKNOWLEDGE:
{
SessionAcknowledgeMessage message = (SessionAcknowledgeMessage)packet;
- session.handleAcknowledge(message);
+ session.handleAcknowledge(message);
break;
}
case SESS_EXPIRED:
{
SessionExpiredMessage message = (SessionExpiredMessage)packet;
- session.handleExpired(message);
+ session.handleExpired(message);
break;
}
case SESS_COMMIT:
{
- session.handleCommit(packet);
+ session.handleCommit(packet);
break;
}
case SESS_ROLLBACK:
@@ -220,7 +217,7 @@
}
case SESS_XA_INDOUBT_XIDS:
{
- session.handleGetInDoubtXids(packet);
+ session.handleGetInDoubtXids(packet);
break;
}
case SESS_XA_GET_TIMEOUT:
@@ -243,7 +240,7 @@
case SESS_REMOVE_DESTINATION:
{
SessionRemoveDestinationMessage message = (SessionRemoveDestinationMessage)packet;
- session.handleRemoveDestination(message);
+ session.handleRemoveDestination(message);
break;
}
case SESS_START:
@@ -281,13 +278,20 @@
case SESS_SEND:
{
SessionSendMessage message = (SessionSendMessage)packet;
- session.handleSend(message);
+ if (message.isLargeMessage())
+ {
+ session.handleSendLargeMessage(message);
+ }
+ else
+ {
+ session.handleSend(message);
+ }
break;
}
- case SESS_CHUNK_SEND:
+ case SESS_SEND_CONTINUATION:
{
- SessionSendChunkMessage message = (SessionSendChunkMessage)packet;
- session.handleSendChunkMessage(message);
+ SessionSendContinuationMessage message = (SessionSendContinuationMessage)packet;
+ session.handleSendContinuations(message);
break;
}
case SESS_REPLICATE_DELIVERY:
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/ChunkTestBase.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -85,6 +85,7 @@
protected void testChunks(final boolean realFiles,
final boolean useFile,
+ final boolean preAck,
final int numberOfMessages,
final int numberOfIntegers,
final boolean sendingBlocking,
@@ -93,6 +94,7 @@
{
testChunks(realFiles,
useFile,
+ preAck,
numberOfMessages,
numberOfIntegers,
sendingBlocking,
@@ -103,6 +105,7 @@
protected void testChunks(final boolean realFiles,
final boolean useFile,
+ final boolean preAck,
final int numberOfMessages,
final int numberOfIntegers,
final boolean sendingBlocking,
@@ -127,9 +130,11 @@
sf.setBlockOnAcknowledge(true);
}
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ ClientSession session = sf.createSession(null, null, false, true, false, preAck, 0);
session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+ long initialSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
ClientProducer producer = session.createProducer(ADDRESS);
@@ -201,7 +206,7 @@
sf = createInVMFactory();
}
- session = sf.createSession(null, null, false, true, true, false, 0);
+ session = sf.createSession(null, null, false, true, true, preAck, 0);
ClientConsumer consumer = null;
@@ -242,7 +247,10 @@
System.currentTimeMillis() - originalTime >= delayDelivery);
}
- message.acknowledge();
+ if (!preAck)
+ {
+ message.acknowledge();
+ }
assertNotNull(message);
@@ -272,7 +280,13 @@
session.close();
+ assertEquals(initialSize, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+ assertEquals(0, messagingService.getServer().getPostOffice().getBinding(ADDRESS).getQueue().getDeliveringCount());
+ assertEquals(0, messagingService.getServer().getPostOffice().getBinding(ADDRESS).getQueue().getMessageCount());
+
validateNoFilesOnLargeDir();
+
+
}
finally
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -206,22 +206,33 @@
public void testMessageChunkFilePersistence() throws Exception
{
- testChunks(true, false, 100, 262144, false, 1000, 0);
+ testChunks(true, false, false, 100, 262144, false, 1000, 0);
}
+ public void testMessageChunkFilePersistenceBlocked() throws Exception
+ {
+ testChunks(true, false, false, 100, 262144, true, 1000, 0);
+ }
+
+
+ public void testMessageChunkFilePersistenceBlockedPreCommit() throws Exception
+ {
+ testChunks(true, false, true, 100, 262144, true, 1000, 0);
+ }
+
public void testMessageChunkFilePersistenceDelayed() throws Exception
{
- testChunks(true, false, 1, 50000, false, 1000, 2000);
+ testChunks(true, false, false, 1, 50000, false, 1000, 2000);
}
public void testMessageChunkNullPersistence() throws Exception
{
- testChunks(false, false, 1, 50000, false, 1000, 0);
+ testChunks(false, false, false, 1, 50000, false, 1000, 0);
}
public void testMessageChunkNullPersistenceDelayed() throws Exception
{
- testChunks(false, false, 100, 50000, false, 10000, 100);
+ testChunks(false, false, false, 100, 50000, false, 10000, 100);
}
public void testPageOnLargeMessage() throws Exception
@@ -238,44 +249,44 @@
public void testSendfileMessage() throws Exception
{
- testChunks(true, true, 100, 50000, false, 1000, 0);
+ testChunks(true, true, false, 100, 50000, false, 1000, 0);
}
public void testSendfileMessageOnNullPersistence() throws Exception
{
- testChunks(false, true, 100, 50000, false, 1000, 0);
+ testChunks(false, true, false, 100, 50000, false, 1000, 0);
}
public void testSendfileMessageOnNullPersistenceSmallMessage() throws Exception
{
- testChunks(false, true, 100, 100, false, 1000, 0);
+ testChunks(false, true, false, 100, 100, true, 1000, 0);
}
public void testSendfileMessageSmallMessage() throws Exception
{
- testChunks(true, true, 100, 4, false, 1000, 0);
+ testChunks(true, true, false, 100, 4, false, 1000, 0);
}
public void testSendRegularMessageNullPersistence() throws Exception
{
- testChunks(false, false, 100, 100, false, 1000, 0);
+ testChunks(false, false, false, 100, 100, false, 1000, 0);
}
public void testSendRegularMessageNullPersistenceDelayed() throws Exception
{
- testChunks(false, false, 100, 100, false, 1000, 1000);
+ testChunks(false, false, false, 100, 100, false, 1000, 1000);
}
public void testSendRegularMessagePersistence() throws Exception
{
- testChunks(true, false, 100, 100, false, 1000, 0);
+ testChunks(true, false, false, 100, 100, false, 1000, 0);
}
public void testSendRegularMessagePersistenceDelayed() throws Exception
{
- testChunks(true, false, 100, 100, false, 1000, 1000);
+ testChunks(true, false, false, 100, 100, false, 1000, 1000);
}
public void testTwoBindingsTwoStartedConsumers() throws Exception
Added: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -0,0 +1,199 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.paging;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import junit.framework.AssertionFailedError;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A PagingServiceIntegrationTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Dec 5, 2008 8:25:58 PM
+ *
+ *
+ */
+public class PagingServiceIntegrationTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PagingServiceIntegrationTest.class);
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public void testSendReceivePaging() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setPagingMaxGlobalSizeBytes(100 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+ messagingService.start();
+
+ final int numberOfIntegers = 256;
+
+ final int numberOfMessages = 10000;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
+
+ ClientMessage message = null;
+
+ MessagingBuffer body = null;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ for (int j = 1; j <= numberOfIntegers; j++)
+ {
+ bodyLocal.putInt(j);
+ }
+ bodyLocal.flip();
+
+ if (i == 0)
+ {
+ body = bodyLocal;
+ }
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.close();
+
+ messagingService.stop();
+
+ messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+ messagingService.start();
+
+ sf = createInVMFactory();
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(10000);
+
+ assertNotNull(message2);
+
+ assertEquals(i, ((Integer)message2.getProperty(new SimpleString("id"))).intValue());
+
+ message2.acknowledge();
+
+ assertNotNull(message2);
+
+ try
+ {
+ assertEqualsByteArrays(body.limit(), body.array(), message2.getBody().array());
+ }
+ catch (AssertionFailedError e)
+ {
+ log.info("Expected buffer:" + dumbBytesHex(body.array(), 40));
+ log.info("Arriving buffer:" + dumbBytesHex(message2.getBody().array(), 40));
+ throw e;
+ }
+ }
+
+ consumer.close();
+
+ session.close();
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/soak/chunk/MessageChunkSoakTest.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -48,7 +48,7 @@
public void testMessageChunkFilePersistence1G() throws Exception
{
- testChunks(true, true, 2, 268435456, false, 300000, 0, true);
+ testChunks(true, true, false, 2, 268435456, false, 300000, 0, true);
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/chunk/MessageChunkStressTest.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -48,12 +48,12 @@
public void testMessageChunkFilePersistence100M() throws Exception
{
- testChunks(true, true, 10, 26214400, false, 120000, 0);
+ testChunks(true, true, false, 10, 26214400, false, 120000, 0);
}
public void testMessageChunkFilePersistence1M() throws Exception
{
- testChunks(true, true, 1000, 262144, false, 120000, 0);
+ testChunks(true, true, false, 1000, 262144, false, 120000, 0);
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -322,6 +322,8 @@
.array());
assertEquals(0, buffers2.size());
+
+ assertEquals(0, storeImpl.getAddressSize());
}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -78,16 +78,23 @@
// Protected -----------------------------------------------------
+
protected void clearData()
{
- deleteAndCreateDir(getJournalDir());
- deleteAndCreateDir(getBindingsDir());
- deleteAndCreateDir(getPageDir());
- deleteAndCreateDir(getLargeMessagesDir());
- deleteAndCreateDir(getClientLargeMessagesDir());
- deleteAndCreateDir(getTemporaryDir());
+ clearData(getTestDir());
}
+
+ protected void clearData(String testDir)
+ {
+ deleteAndCreateDir(getJournalDir(testDir));
+ deleteAndCreateDir(getBindingsDir(testDir));
+ deleteAndCreateDir(getPageDir(testDir));
+ deleteAndCreateDir(getLargeMessagesDir(testDir));
+ deleteAndCreateDir(getClientLargeMessagesDir(testDir));
+ deleteAndCreateDir(getTemporaryDir(testDir));
+ }
+
protected void deleteAndCreateDir(String directory)
{
File file = new File(directory);
@@ -155,6 +162,7 @@
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
+ configuration.setJournalDirectory(getJournalDir());
configuration.setJournalFileSize(100 * 1024);
configuration.setPagingDirectory(getPageDir());
configuration.setLargeMessagesDirectory(getLargeMessagesDir());
Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-12-09 03:02:02 UTC (rev 5481)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-12-09 03:53:21 UTC (rev 5482)
@@ -69,22 +69,11 @@
public static final String NETTY_ACCEPTOR_FACTORY = "org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory";
public static final String NETTY_CONNECTOR_FACTORY = "org.jboss.messaging.integration.transports.netty.NettyConnectorFactory";
+
// Attributes ----------------------------------------------------
private String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/jbm-unit-test";
- private String journalDir = testDir + "/journal";
-
- private String bindingsDir = testDir + "/bindings";
-
- private String pageDir = testDir + "/page";
-
- private String largeMessagesDir = testDir + "/large-msg";
-
- private String clientLargeMessagesDir = testDir + "/client-large-msg";
-
- private String temporaryDir = testDir + "/temporary";
-
// Static --------------------------------------------------------
public static String dumpBytes(byte[] bytes)
@@ -198,7 +187,7 @@
/**
* @return the testDir
*/
- public String getTestDir()
+ protected String getTestDir()
{
return testDir;
}
@@ -206,51 +195,96 @@
/**
* @return the journalDir
*/
- public String getJournalDir()
+ protected String getJournalDir()
{
- return journalDir;
+ return getJournalDir(testDir);
}
+ protected String getJournalDir(String testDir)
+ {
+ return testDir + "/journal";
+ }
+
/**
* @return the bindingsDir
*/
- public String getBindingsDir()
+ protected String getBindingsDir()
{
- return bindingsDir;
+ return getBindingsDir(testDir);
}
/**
+ * @return the bindingsDir
+ */
+ protected String getBindingsDir(String testDir)
+ {
+ return testDir + "/bindings";
+ }
+
+ /**
* @return the pageDir
*/
- public String getPageDir()
+ protected String getPageDir()
{
- return pageDir;
+ return getPageDir(testDir);
}
/**
+ * @return the pageDir
+ */
+ protected String getPageDir(String testDir)
+ {
+ return testDir + "/page";
+ }
+
+ /**
* @return the largeMessagesDir
*/
- public String getLargeMessagesDir()
+ protected String getLargeMessagesDir()
{
- return largeMessagesDir;
+ return getLargeMessagesDir(testDir);
}
/**
+ * @return the largeMessagesDir
+ */
+ protected String getLargeMessagesDir(String testDir)
+ {
+ return testDir + "/large-msg";
+ }
+
+ /**
* @return the clientLargeMessagesDir
*/
- public String getClientLargeMessagesDir()
+ protected String getClientLargeMessagesDir()
{
- return clientLargeMessagesDir;
+ return getClientLargeMessagesDir(testDir);
}
/**
+ * @return the clientLargeMessagesDir
+ */
+ protected String getClientLargeMessagesDir(String testDir)
+ {
+ return testDir + "/client-large-msg";
+ }
+
+ /**
* @return the temporaryDir
*/
- public String getTemporaryDir()
+ protected String getTemporaryDir()
{
- return temporaryDir;
+ return getTemporaryDir(testDir);
}
+ /**
+ * @return the temporaryDir
+ */
+ protected String getTemporaryDir(String testDir)
+ {
+ return testDir + "/temp";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list