Author: clebert.suconic(a)jboss.com
Date: 2009-11-06 19:33:24 -0500 (Fri, 06 Nov 2009)
New Revision: 8248
Added:
trunk/src/main/org/hornetq/core/message/BodyEncoder.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
Removed:
trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
trunk/src/main/org/hornetq/core/message/Message.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-211 - ClientProducer to send LargeMessages and
a few other tweaks
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-06 20:38:14
UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-07 00:33:24
UTC (rev 8248)
@@ -15,10 +15,12 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.LargeMessageBuffer;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.SimpleString;
@@ -228,5 +230,8 @@
"]";
}
-
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getBodyEncoder()
+ */
+
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-06
20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -17,12 +17,11 @@
import java.io.IOException;
import java.io.InputStream;
-import java.nio.ByteBuffer;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.Message;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.remoting.Channel;
@@ -347,47 +346,59 @@
{
final long bodySize = msg.getLargeBodySize();
- LargeMessageEncodingContext context = new DecodingContext(msg);
+ BodyEncoder context = msg.getBodyEncoder();
- for (int pos = 0; pos < bodySize;)
+ context.open();
+ try
{
- final boolean lastChunk;
- final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);
+ for (int pos = 0; pos < bodySize;)
+ {
+ final boolean lastChunk;
- final HornetQBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
+ final int chunkLength = Math.min((int)(bodySize - pos),
minLargeMessageSize);
- msg.encodeBody(bodyBuffer, context, chunkLength);
+ final HornetQBuffer bodyBuffer = ChannelBuffers.buffer(chunkLength);
- pos += chunkLength;
+ context.encode(bodyBuffer, chunkLength);
- lastChunk = pos >= bodySize;
+ pos += chunkLength;
- final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(bodyBuffer.array(),
-
!lastChunk,
-
lastChunk && sendBlocking);
+ lastChunk = pos >= bodySize;
- if (sendBlocking && lastChunk)
- {
- // When sending it blocking, only the last chunk will be blocking.
- channel.sendBlocking(chunk);
- }
- else
- {
- channel.send(chunk);
- }
+ final SessionSendContinuationMessage chunk = new
SessionSendContinuationMessage(bodyBuffer.array(),
+
!lastChunk,
+
lastChunk && sendBlocking);
- try
- {
- credits.acquireCredits(chunk.getRequiredBufferSize());
+ if (sendBlocking && lastChunk)
+ {
+ // When sending it blocking, only the last chunk will be blocking.
+ channel.sendBlocking(chunk);
+ }
+ else
+ {
+ channel.send(chunk);
+ }
+
+ try
+ {
+ credits.acquireCredits(chunk.getRequiredBufferSize());
+ }
+ catch (InterruptedException e)
+ {
+ }
}
- catch (InterruptedException e)
- {
- }
}
+ finally
+ {
+ context.close();
+ }
}
/**
+ * TODO: This method could be eliminated and
+ * combined with {@link ClientProducerImpl#largeMessageSendBuffered(boolean,
Message, ClientProducerCredits)}.
+ * All that's needed for this is ClientMessage returning the proper
BodyEncoder for streamed messages.
* @param sendBlocking
* @param input
* @throws HornetQException
@@ -477,35 +488,4 @@
}
// Inner Classes
--------------------------------------------------------------------------------
- class DecodingContext implements LargeMessageEncodingContext
- {
- private final Message message;
-
- private int lastPos = 0;
-
- public DecodingContext(Message message)
- {
- this.message = message;
- }
-
- public void open() throws Exception
- {
- }
-
- public void close() throws Exception
- {
- }
-
- public int write(ByteBuffer bufferRead) throws Exception
- {
- return -1;
- }
-
- public int write(HornetQBuffer bufferOut, int size)
- {
- bufferOut.writeBytes(message.getBody(), lastPos, size);
- lastPos += size;
- return size;
- }
- }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-11-06
20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/client/impl/LargeMessageBufferImpl.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -58,7 +58,7 @@
private final LinkedBlockingQueue<SessionReceiveContinuationMessage> packets =
new LinkedBlockingQueue<SessionReceiveContinuationMessage>();
- private SessionReceiveContinuationMessage currentPacket = null;
+ private volatile SessionReceiveContinuationMessage currentPacket = null;
private final long totalSize;
Copied: trunk/src/main/org/hornetq/core/message/BodyEncoder.java (from rev 8244,
trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java)
===================================================================
--- trunk/src/main/org/hornetq/core/message/BodyEncoder.java (rev
0)
+++ trunk/src/main/org/hornetq/core/message/BodyEncoder.java 2009-11-07 00:33:24 UTC (rev
8248)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+
+package org.hornetq.core.message;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Class used to encode message body into buffers.
+ * Used to send large streams over the wire
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ * Created Nov 2, 2009
+ */
+public interface BodyEncoder
+{
+ void open() throws HornetQException;
+
+ void close() throws HornetQException;
+
+ int encode(ByteBuffer bufferRead) throws HornetQException;
+
+ int encode(HornetQBuffer bufferOut, int size) throws HornetQException;
+}
Deleted: trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java 2009-11-06
20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/message/LargeMessageEncodingContext.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -1,20 +0,0 @@
-package org.hornetq.core.message;
-
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-
-import java.nio.ByteBuffer;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * Created Nov 2, 2009
- */
-public interface LargeMessageEncodingContext
-{
- void open() throws Exception;
-
- void close() throws Exception;
-
- int write(ByteBuffer bufferRead) throws Exception;
-
- int write(HornetQBuffer bufferOut, int size);
-}
Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java 2009-11-06 20:38:14 UTC (rev
8247)
+++ trunk/src/main/org/hornetq/core/message/Message.java 2009-11-07 00:33:24 UTC (rev
8248)
@@ -78,10 +78,10 @@
boolean isLargeMessage();
long getLargeBodySize();
+
+ /** Used to encode Body over the wire when using large messages */
+ BodyEncoder getBodyEncoder();
- // Used on Message chunk
- void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext context,
int size);
-
/** Set the InputStream used on a message that will be sent over a producer */
void setBodyInputStream(InputStream stream);
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-06 20:38:14 UTC
(rev 8247)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-07 00:33:24 UTC
(rev 8248)
@@ -19,13 +19,15 @@
import static org.hornetq.utils.DataConstants.SIZE_LONG;
import java.io.InputStream;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.Message;
import org.hornetq.core.message.PropertyConversionException;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -214,12 +216,6 @@
buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
}
- // Used on Message chunk side
- public void encodeBody(final HornetQBuffer bufferOut, final
LargeMessageEncodingContext context, final int size)
- {
- context.write(bufferOut, size);
- }
-
public void decode(final HornetQBuffer buffer)
{
decodeHeadersAndProperties(buffer);
@@ -663,6 +659,11 @@
{
this.body = body;
}
+
+ public BodyEncoder getBodyEncoder()
+ {
+ return new DecodingContext();
+ }
// Public --------------------------------------------------------
@@ -673,4 +674,36 @@
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
+
+
+ class DecodingContext implements BodyEncoder
+ {
+ private int lastPos = 0;
+
+ public DecodingContext()
+ {
+ }
+
+ public void open()
+ {
+ }
+
+ public void close()
+ {
+ }
+
+ public int encode(ByteBuffer bufferRead) throws HornetQException
+ {
+ HornetQBuffer buffer = ChannelBuffers.wrappedBuffer(bufferRead);
+ return encode(buffer, bufferRead.capacity());
+ }
+
+ public int encode(HornetQBuffer bufferOut, int size)
+ {
+ bufferOut.writeBytes(getBody(), lastPos, size);
+ lastPos += size;
+ return size;
+ }
+ }
+
}
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-06
20:38:14 UTC (rev 8247)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -15,9 +15,11 @@
import static org.hornetq.utils.DataConstants.SIZE_INT;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagingStore;
@@ -25,7 +27,7 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.server.impl.ServerMessageImpl;
/**
@@ -55,7 +57,7 @@
private SequentialFile file;
private long bodySize = -1;
-
+
private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -64,7 +66,7 @@
public FileLargeServerMessage(final JournalStorageManager storageManager)
{
- this.storageManager = storageManager;
+ this.storageManager = storageManager;
}
/**
@@ -72,16 +74,14 @@
* @param copy
* @param fileCopy
*/
- private FileLargeServerMessage(final FileLargeServerMessage copy,
- final SequentialFile fileCopy,
- final long newID)
+ private FileLargeServerMessage(final FileLargeServerMessage copy, final SequentialFile
fileCopy, final long newID)
{
super(copy);
this.linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
bodySize = copy.bodySize;
- setMessageID(newID);
+ setMessageID(newID);
}
// Public --------------------------------------------------------
@@ -92,7 +92,7 @@
public synchronized void addBytes(final byte[] bytes) throws Exception
{
validateFile();
-
+
if (!file.isOpen())
{
file.open();
@@ -103,14 +103,14 @@
bodySize += bytes.length;
}
- public void encodeBody(final HornetQBuffer bufferOut, LargeMessageEncodingContext
context, int size)
+ public void encodeBody(final HornetQBuffer bufferOut, BodyEncoder context, int size)
{
try
{
// This could maybe be optimized (maybe reading directly into bufferOut)
ByteBuffer bufferRead = ByteBuffer.allocate(size);
- int bytesRead = context.write(bufferRead);
+ int bytesRead = context.encode(bufferRead);
bufferRead.flip();
@@ -188,13 +188,13 @@
}
}
- public LargeMessageEncodingContext createNewContext()
+ public BodyEncoder getBodyEncoder()
{
return new DecodingContext();
}
private void checkDelete() throws Exception
- {
+ {
if (getRefCount() <= 0)
{
if (linkMessage != null)
@@ -301,9 +301,9 @@
SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
durable);
ServerMessage newMessage = new FileLargeServerMessage(linkMessage == null ? this
- :
(FileLargeServerMessage)linkMessage,
- newfile,
- newID);
+ :
(FileLargeServerMessage)linkMessage,
+ newfile,
+ newID);
return newMessage;
}
@@ -334,7 +334,7 @@
{
throw new RuntimeException("MessageID not set on LargeMessage");
}
-
+
file = storageManager.createFileForLargeMessage(getMessageID(), durable);
file.open();
@@ -372,29 +372,62 @@
// Inner classes -------------------------------------------------
- class DecodingContext implements LargeMessageEncodingContext
+ class DecodingContext implements BodyEncoder
{
private SequentialFile cFile;
-
- public void open() throws Exception
+
+ public void open() throws HornetQException
{
- cFile = file.copy();
- cFile.open();
+ try
+ {
+ cFile = file.copy();
+ cFile.open();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(),
e);
+ }
}
- public void close() throws Exception
+ public void close() throws HornetQException
{
- cFile.close();
+ try
+ {
+ cFile.close();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(),
e);
+ }
}
- public int write(ByteBuffer bufferRead) throws Exception
+ public int encode(ByteBuffer bufferRead) throws HornetQException
{
- return cFile.read(bufferRead);
+ try
+ {
+ return cFile.read(bufferRead);
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(),
e);
+ }
}
- public int write(HornetQBuffer bufferOut, int size)
+ public int encode(HornetQBuffer bufferOut, int size) throws HornetQException
{
- return -1;
+ // This could maybe be optimized (maybe reading directly into bufferOut)
+ ByteBuffer bufferRead = ByteBuffer.allocate(size);
+
+ int bytesRead = encode(bufferRead);
+
+ bufferRead.flip();
+
+ if (bytesRead > 0)
+ {
+ bufferOut.writeBytes(bufferRead.array(), 0, bytesRead);
+ }
+
+ return bytesRead;
}
}
}
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-06
20:38:14 UTC (rev 8247)
+++
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -16,11 +16,8 @@
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.message.LargeMessageEncodingContext;
import org.hornetq.core.server.impl.ServerMessageImpl;
-import java.nio.ByteBuffer;
-
/**
* A NullStorageLargeServerMessage
*
@@ -160,49 +157,6 @@
return getHeadersAndPropertiesEncodeSize();
}
- public LargeMessageEncodingContext createNewContext()
- {
- return new DecodingContext();
- }
-
- @Override
- public void encodeBody(HornetQBuffer bufferOut, LargeMessageEncodingContext context,
int size)
- {
- DecodingContext decodingContext = (DecodingContext) context;
- try
- {
- decodingContext.write(bufferOut, size);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- class DecodingContext implements LargeMessageEncodingContext
- {
- private int lastPos = 0;
-
- public void open() throws Exception
- {
- }
-
- public void close() throws Exception
- {
- }
-
- public int write(final ByteBuffer bufferRead) throws Exception
- {
- return -1;
- }
-
- public int write(final HornetQBuffer bufferRead, final int size)
- {
- bufferRead.writeBytes(getBody(), lastPos, size);
- lastPos += size;
- return size;
- }
- }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-06 20:38:14 UTC
(rev 8247)
+++ trunk/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-07 00:33:24 UTC
(rev 8248)
@@ -13,7 +13,7 @@
package org.hornetq.core.server;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
/**
* A LargeMessage
@@ -43,6 +43,4 @@
void incrementDelayDeletionCount();
void decrementDelayDeletionCount() throws Exception;
-
- LargeMessageEncodingContext createNewContext();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-06
20:38:14 UTC (rev 8247)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -24,12 +24,13 @@
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.impl.ClientConsumerImpl;
import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.QueueBinding;
@@ -632,7 +633,7 @@
/** The current position on the message being processed */
private long positionPendingLargeMessage;
- private LargeMessageEncodingContext context;
+ private BodyEncoder context;
public LargeMessageDeliverer(final LargeServerMessage message, final
MessageReference ref) throws Exception
{
@@ -672,7 +673,7 @@
largeMessage.getLargeBodySize(),
ref.getDeliveryCount());
- context = largeMessage.createNewContext();
+ context = largeMessage.getBodyEncoder();
context.open();
@@ -783,7 +784,7 @@
}
}
- private SessionReceiveContinuationMessage createChunkSend(final
LargeMessageEncodingContext context)
+ private SessionReceiveContinuationMessage createChunkSend(final BodyEncoder
context) throws HornetQException
{
SessionReceiveContinuationMessage chunk;
@@ -793,8 +794,7 @@
HornetQBuffer bodyBuffer = ChannelBuffers.buffer(localChunkLen);
- // pendingLargeMessage.encodeBody(bodyBuffer, positionPendingLargeMessage,
localChunkLen);
- largeMessage.encodeBody(bodyBuffer, context, localChunkLen);
+ context.encode(bodyBuffer, localChunkLen);
chunk = new SessionReceiveContinuationMessage(id,
bodyBuffer.array(),
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-06
20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -31,6 +31,8 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.persistence.impl.journal.FileLargeServerMessage;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -2080,6 +2082,69 @@
}
}
+ // The ClientProducer should be able to also send ServerLargeMessages as that's
done by the CoreBridge
+ public void testSendServerMessage() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(false);
+
+ ClientSession session = sf.createSession(false, false);
+
+ try
+ {
+ FileLargeServerMessage fileMessage = new
FileLargeServerMessage((JournalStorageManager)server.getStorageManager());
+
+ fileMessage.setMessageID(1005);
+
+ for (int i = 0 ; i < LARGE_MESSAGE_SIZE; i++)
+ {
+ fileMessage.addBytes(new byte[]{getSamplebyte(i)});
+ }
+
+ fileMessage.releaseResources();
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientProducer prod = session.createProducer(ADDRESS);
+
+ prod.send(fileMessage);
+
+ fileMessage.deleteFile();
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer(ADDRESS);
+
+ ClientMessage msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals(msg.getBodySize(), LARGE_MESSAGE_SIZE);
+
+ for (int i = 0 ; i < LARGE_MESSAGE_SIZE; i++)
+ {
+ assertEquals(getSamplebyte(i), msg.getBody().readByte());
+ }
+
+ msg.acknowledge();
+
+ session.commit();
+
+ }
+ finally
+ {
+ sf.close();
+ server.stop();
+ }
+ }
+
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-06
20:38:14 UTC (rev 8247)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -59,8 +59,7 @@
internaltestSimpleBridge(false, true);
}
- // Commented out by Clebert - I'm investigating this failure.. so I've set as
disabled
- public void disabled_testSimpleBridgeLargeMessageNullPersistence() throws Exception
+ public void testSimpleBridgeLargeMessageNullPersistence() throws Exception
{
internaltestSimpleBridge(true, false);
}
@@ -234,8 +233,7 @@
internalTestWithFilter(false, true);
}
- // Commented out by Clebert - I'm investigating this failure.. so I've set as
disabled
- public void disabled_testWithFilterLargeMessages() throws Exception
+ public void testWithFilterLargeMessages() throws Exception
{
internalTestWithFilter(true, false);
}
Added:
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
(rev 0)
+++
trunk/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.largemessage;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.persistence.impl.journal.FileLargeServerMessage;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A ServerLargeMessageTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ServerLargeMessageTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // The ClientProducer should be able to also send ServerLargeMessages as that's
done by the CoreBridge
+ public void testSendServerMessage() throws Exception
+ {
+ HornetQServer server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory sf = createFactory(false);
+
+ ClientSession session = sf.createSession(false, false);
+
+ try
+ {
+ FileLargeServerMessage fileMessage = new
FileLargeServerMessage((JournalStorageManager)server.getStorageManager());
+
+ fileMessage.setMessageID(1005);
+
+ for (int i = 0 ; i < 2 *
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+ {
+ fileMessage.addBytes(new byte[]{getSamplebyte(i)});
+ }
+
+ fileMessage.releaseResources();
+
+ session.createQueue("A", "A");
+
+ ClientProducer prod = session.createProducer("A");
+
+ prod.send(fileMessage);
+
+ fileMessage.deleteFile();
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer cons = session.createConsumer("A");
+
+ ClientMessage msg = cons.receive(5000);
+
+ assertNotNull(msg);
+
+ assertEquals(msg.getBodySize(), 2 *
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ for (int i = 0 ; i < 2 *
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE; i++)
+ {
+ assertEquals(getSamplebyte(i), msg.getBody().readByte());
+ }
+
+ msg.acknowledge();
+
+ session.commit();
+
+ }
+ finally
+ {
+ sf.close();
+ server.stop();
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java 2009-11-06
20:38:14 UTC (rev 8247)
+++
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/AIOJournalImplTest.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -17,7 +17,6 @@
import junit.framework.TestSuite;
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
@@ -44,6 +43,7 @@
super.setUp();
}
+ @Override
protected SequentialFileFactory getFileFactory() throws Exception
{
File file = new File(getTestDir());
Modified:
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java 2009-11-06
20:38:14 UTC (rev 8247)
+++
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/FakeJournalImplTest.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -25,9 +25,9 @@
*/
public class FakeJournalImplTest extends JournalImplTestUnit
{
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- return new FakeSequentialFileFactory();
- }
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new FakeSequentialFileFactory();
+ }
}
-
Modified:
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java 2009-11-06
20:38:14 UTC (rev 8247)
+++
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java 2009-11-07
00:33:24 UTC (rev 8248)
@@ -32,43 +32,44 @@
public abstract class JournalImplTestUnit extends JournalImplTestBase
{
private static final Logger log = Logger.getLogger(JournalImplTestUnit.class);
-
+
+ @Override
protected void tearDown() throws Exception
{
super.tearDown();
-
+
assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
}
-
+
public void testAddUpdateDeleteManyLargeFileSize() throws Exception
{
final int numberAdds = 10000;
-
+
final int numberUpdates = 5000;
-
+
final int numberDeletes = 3000;
-
+
long[] adds = new long[numberAdds];
-
+
for (int i = 0; i < numberAdds; i++)
{
adds[i] = i;
}
-
+
long[] updates = new long[numberUpdates];
-
+
for (int i = 0; i < numberUpdates; i++)
{
updates[i] = i;
}
-
+
long[] deletes = new long[numberDeletes];
-
+
for (int i = 0; i < numberDeletes; i++)
{
deletes[i] = i;
}
-
+
setup(10, 10 * 1024 * 1024, true);
createJournal();
startJournal();
@@ -80,38 +81,38 @@
createJournal();
startJournal();
loadAndCheck();
-
+
}
-
+
public void testAddUpdateDeleteManySmallFileSize() throws Exception
{
final int numberAdds = 10000;
-
+
final int numberUpdates = 5000;
-
+
final int numberDeletes = 3000;
-
+
long[] adds = new long[numberAdds];
-
+
for (int i = 0; i < numberAdds; i++)
{
adds[i] = i;
}
-
+
long[] updates = new long[numberUpdates];
-
+
for (int i = 0; i < numberUpdates; i++)
{
updates[i] = i;
}
-
+
long[] deletes = new long[numberDeletes];
-
+
for (int i = 0; i < numberDeletes; i++)
{
deletes[i] = i;
}
-
+
setup(10, 10 * 1024, true);
createJournal();
startJournal();
@@ -124,57 +125,53 @@
createJournal();
startJournal();
loadAndCheck();
-
+
}
-
+
public void testReclaimAndReload() throws Exception
{
setup(2, 10 * 1024 * 1024, false);
createJournal();
startJournal();
load();
-
+
long start = System.currentTimeMillis();
-
-
+
byte[] record = generateRecord(recordLength);
-
+
int NUMBER_OF_RECORDS = 1000;
for (int count = 0; count < NUMBER_OF_RECORDS; count++)
{
journal.appendAddRecord(count, (byte)0, record, false);
-
+
if (count >= NUMBER_OF_RECORDS / 2)
{
journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2, false);
}
-
+
if (count % 100 == 0)
{
log.debug("Done: " + count);
}
}
-
+
long end = System.currentTimeMillis();
-
- double rate = 1000 * ((double)NUMBER_OF_RECORDS) / (end - start);
-
+
+ double rate = 1000 * (double)NUMBER_OF_RECORDS / (end - start);
+
log.debug("Rate of " + rate + " adds/removes per sec");
-
+
log.debug("Reclaim status = " + debugJournal());
-
+
stopJournal();
createJournal();
startJournal();
journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>(), null);
-
+
assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
-
+
stopJournal();
}
-
-
-}
-
+}
Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/NIOJournalImplTest.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -29,23 +29,22 @@
*/
public class NIOJournalImplTest extends JournalImplTestUnit
{
- private static final Logger log = Logger.getLogger(NIOJournalImplTest.class);
-
- protected String journalDir = System.getProperty("user.home") + "/journal-test";
-
- protected SequentialFileFactory getFileFactory() throws Exception
- {
- File file = new File(journalDir);
-
- log.debug("deleting directory " + journalDir);
-
- deleteDirectory(file);
-
- file.mkdir();
-
- return new NIOSequentialFileFactory(journalDir);
- }
-
-
-}
+ private static final Logger log = Logger.getLogger(NIOJournalImplTest.class);
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(journalDir);
+
+ log.debug("deleting directory " + journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new NIOSequentialFileFactory(journalDir);
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-06 20:38:14 UTC (rev 8247)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-07 00:33:24 UTC (rev 8248)
@@ -22,7 +22,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.message.LargeMessageEncodingContext;
+import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.PropertyConversionException;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.postoffice.Binding;
@@ -458,7 +458,7 @@
}
- public void encodeBody(HornetQBuffer bufferOut, LargeMessageEncodingContext context, int size)
+ public void encodeBody(HornetQBuffer bufferOut, BodyEncoder context, int size)
{
// To change body of implemented methods use File | Settings | File Templates.
}
@@ -1116,6 +1116,15 @@
return false;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.message.Message#getBodyEncoder()
+ */
+ public BodyEncoder getBodyEncoder()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
class FakeFilter implements Filter